Commit 6f7840cfeb5d4f57eb81674aff77a1a0d60094ab

Authored by tangwang
1 parent 137455af

refactor: rename product annotator to enrich and expand multilingual prompts

- Rename indexer/product_annotator.py to indexer/product_enrich.py and remove CSV-based CLI entrypoint, keeping only in-memory analyze_products API
- Introduce dedicated product_enrich logging with separate verbose log file for full LLM requests/responses
- Change indexer and /indexer/enrich-content API wiring to use indexer.product_enrich instead of indexer.product_annotator, updating tests and docs accordingly
- Switch translate_prompts to share SUPPORTED_INDEX_LANGUAGES from tenant_config_loader and reuse that mapping for language code → display name
- Remove hard SUPPORTED_LANGS constraint from LLM content-enrichment flow, driving languages directly from tenant/indexer configuration
- Redesign LLM prompt generation to support multi-round, multi-language tables: first round in English, subsequent rounds translate the entire table (headers + cells) into target languages using English instructions
api/routes/indexer.py
... ... @@ -443,23 +443,12 @@ async def build_docs_from_db(request: BuildDocsFromDbRequest):
443 443  
444 444 def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]:
445 445 """
446   - 同步执行内容理解:调用 product_annotator.analyze_products,按语言批量跑 LLM,
  446 + 同步执行内容理解:调用 product_enrich.analyze_products,按语言批量跑 LLM,
447 447 再聚合成每 SPU 的 qanchors、semantic_attributes、tags。供 run_in_executor 调用。
448 448 """
449   - from indexer.product_annotator import analyze_products, SUPPORTED_LANGS
450   -
451   - llm_langs = [lang for lang in languages if lang in SUPPORTED_LANGS]
452   - if not llm_langs:
453   - return [
454   - {
455   - "spu_id": it["spu_id"],
456   - "qanchors": {},
457   - "semantic_attributes": [],
458   - "tags": [],
459   - "error": "no supported languages (supported: %s)" % sorted(SUPPORTED_LANGS),
460   - }
461   - for it in items
462   - ]
  449 + from indexer.product_enrich import analyze_products
  450 +
  451 + llm_langs = list(dict.fromkeys(languages)) or ["en"]
463 452  
464 453 products = [{"id": it["spu_id"], "title": (it.get("title") or "").strip()} for it in items]
465 454 dim_keys = [
... ... @@ -544,7 +533,7 @@ async def enrich_content(request: EnrichContentRequest):
544 533 - 与 /indexer/build-docs 解耦,避免 build-docs 因 LLM 耗时过长而阻塞;调用方可
545 534 先拿不含 qanchors/tags 的 doc,再异步或离线补齐本接口结果后更新 ES。
546 535  
547   - 实现逻辑与 indexer.product_annotator.analyze_products 一致,支持多语言与 Redis 缓存。
  536 + 实现逻辑与 indexer.product_enrich.analyze_products 一致,支持多语言与 Redis 缓存。
548 537 """
549 538 try:
550 539 if not request.items:
... ...
api/translator_app.py
... ... @@ -88,7 +88,7 @@ import sys
88 88 import logging
89 89 import argparse
90 90 import uvicorn
91   -from typing import Optional, Dict
  91 +from typing import Dict, List, Optional, Sequence, Union
92 92 from fastapi import FastAPI, HTTPException
93 93 from fastapi.responses import JSONResponse
94 94 from fastapi.middleware.cors import CORSMiddleware
... ... @@ -162,7 +162,7 @@ def get_translator(model: str = "qwen") -> object:
162 162 # Request/Response models
163 163 class TranslationRequest(BaseModel):
164 164 """Translation request model."""
165   - text: str = Field(..., description="Text to translate")
  165 + text: Union[str, List[str]] = Field(..., description="Text to translate (string or list of strings)")
166 166 target_lang: str = Field(..., description="Target language code (zh, en, ru, etc.)")
167 167 source_lang: Optional[str] = Field(None, description="Source language code (optional, auto-detect if not provided)")
168 168 model: Optional[str] = Field(None, description="Translation model: qwen-mt | deepl | llm")
... ... @@ -183,10 +183,13 @@ class TranslationRequest(BaseModel):
183 183  
184 184 class TranslationResponse(BaseModel):
185 185 """Translation response model."""
186   - text: str = Field(..., description="Original text")
  186 + text: Union[str, List[str]] = Field(..., description="Original text (string or list)")
187 187 target_lang: str = Field(..., description="Target language code")
188 188 source_lang: Optional[str] = Field(None, description="Source language code (detected or provided)")
189   - translated_text: str = Field(..., description="Translated text")
  189 + translated_text: Union[str, List[Optional[str]]] = Field(
  190 + ...,
  191 + description="Translated text (string or list; list elements may be null on failure)",
  192 + )
190 193 status: str = Field(..., description="Translation status")
191 194 model: str = Field(..., description="Translation model used")
192 195  
... ... @@ -260,11 +263,19 @@ async def translate(request: TranslationRequest):
260 263  
261 264 Supports both Qwen (default) and DeepL models via the 'model' parameter.
262 265 """
263   - if not request.text or not request.text.strip():
264   - raise HTTPException(
265   - status_code=400,
266   - detail="Text cannot be empty"
267   - )
  266 + # 允许 text 为字符串或字符串列表
  267 + if isinstance(request.text, list):
  268 + if not request.text:
  269 + raise HTTPException(
  270 + status_code=400,
  271 + detail="Text list cannot be empty"
  272 + )
  273 + else:
  274 + if not request.text or not request.text.strip():
  275 + raise HTTPException(
  276 + status_code=400,
  277 + detail="Text cannot be empty"
  278 + )
268 279  
269 280 if not request.target_lang:
270 281 raise HTTPException(
... ... @@ -283,24 +294,96 @@ async def translate(request: TranslationRequest):
283 294 try:
284 295 # Get translator instance for the specified model
285 296 translator = get_translator(model=model)
286   -
287   - # Translate using the fixed prompt
  297 + raw_text = request.text
  298 +
  299 + # 如果是列表,并且底层 provider 声明支持 batch,则直接传 list
  300 + if isinstance(raw_text, list) and getattr(translator, "supports_batch", False):
  301 + try:
  302 + translated_list = translator.translate(
  303 + text=raw_text,
  304 + target_lang=request.target_lang,
  305 + source_lang=request.source_lang,
  306 + context=request.context,
  307 + prompt=request.prompt,
  308 + )
  309 + except Exception as exc:
  310 + logger.error("Batch translation failed: %s", exc, exc_info=True)
  311 + # 回退到逐条拆分逻辑
  312 + translated_list = None
  313 +
  314 + if translated_list is not None:
  315 + # 规范化为 List[Optional[str]],并保证长度对应
  316 + if not isinstance(translated_list, list):
  317 + raise HTTPException(
  318 + status_code=500,
  319 + detail="Batch translation provider returned non-list result",
  320 + )
  321 + normalized: List[Optional[str]] = []
  322 + for idx, item in enumerate(raw_text):
  323 + if idx < len(translated_list):
  324 + val = translated_list[idx]
  325 + else:
  326 + val = None
  327 + # 失败语义:失败位置为 None
  328 + normalized.append(val)
  329 +
  330 + return TranslationResponse(
  331 + text=raw_text,
  332 + target_lang=request.target_lang,
  333 + source_lang=request.source_lang,
  334 + translated_text=normalized,
  335 + status="success",
  336 + model=str(getattr(translator, "model", model)),
  337 + )
  338 +
  339 + # 否则:统一走逐条拆分逻辑(包括不支持 batch 的 provider)
  340 + if isinstance(raw_text, list):
  341 + results: List[Optional[str]] = []
  342 + for item in raw_text:
  343 + if item is None or not str(item).strip():
  344 + # 空元素不视为失败,直接返回原值
  345 + results.append(item) # type: ignore[arg-type]
  346 + continue
  347 + try:
  348 + out = translator.translate(
  349 + text=str(item),
  350 + target_lang=request.target_lang,
  351 + source_lang=request.source_lang,
  352 + context=request.context,
  353 + prompt=request.prompt,
  354 + )
  355 + except Exception as exc:
  356 + logger.warning("Per-item translation failed: %s", exc, exc_info=True)
  357 + out = None
  358 + # 失败语义:该元素为 None
  359 + results.append(out)
  360 +
  361 + return TranslationResponse(
  362 + text=raw_text,
  363 + target_lang=request.target_lang,
  364 + source_lang=request.source_lang,
  365 + translated_text=results,
  366 + status="success",
  367 + model=str(getattr(translator, "model", model)),
  368 + )
  369 +
  370 + # 单文本模式:保持原有严格失败语义
288 371 translated_text = translator.translate(
289   - text=request.text,
  372 + text=raw_text,
290 373 target_lang=request.target_lang,
291 374 source_lang=request.source_lang,
292 375 context=request.context,
293 376 prompt=request.prompt,
294 377 )
295   -
  378 +
296 379 if translated_text is None:
297 380 raise HTTPException(
298 381 status_code=500,
299 382 detail="Translation failed"
300 383 )
301   -
  384 +
302 385 return TranslationResponse(
303   - text=request.text,
  386 + text=raw_text,
304 387 target_lang=request.target_lang,
305 388 source_lang=request.source_lang,
306 389 translated_text=translated_text,
... ...
config/config.yaml
... ... @@ -224,7 +224,7 @@ spu_config:
224 224  
225 225 # 租户配置(Tenant Configuration)
226 226 # 每个租户可配置主语言 primary_language 与索引语言 index_languages(主市场语言,商家可勾选)
227   -# 默认 index_languages: [en, zh],可配置为任意 SUPPORTED_INDEX_LANGUAGES 的子集
  227 +# 默认 index_languages: [en, zh],可配置为任意 SOURCE_LANG_CODE_MAP.keys() 的子集
228 228 tenant_config:
229 229 default:
230 230 primary_language: "en"
... ...
config/tenant_config_loader.py
... ... @@ -11,7 +11,8 @@ from typing import Dict, Any, Optional, List
11 11 logger = logging.getLogger(__name__)
12 12  
13 13 # 支持的索引语言:code -> display name(供商家勾选主市场语言等场景使用)
14   -SUPPORTED_INDEX_LANGUAGES: Dict[str, str] = {
  14 +# 语言代码与展示名的双向映射(供翻译/LLM 提示等统一使用)
  15 +SOURCE_LANG_CODE_MAP: Dict[str, str] = {
15 16 "en": "English",
16 17 "zh": "Chinese",
17 18 "zh_tw": "Traditional Chinese",
... ... @@ -51,6 +52,9 @@ SUPPORTED_INDEX_LANGUAGES: Dict[str, str] = {
51 52 "bg": "Bulgarian",
52 53 }
53 54  
  55 +TARGET_LANG_CODE_MAP: Dict[str, str] = {v: k for k, v in SOURCE_LANG_CODE_MAP.items()}
  56 +
  57 +
54 58 def normalize_index_languages(value: Any, primary_language: str = "en") -> List[str]:
55 59 """
56 60 将 index_languages 配置规范化为合法语言代码列表。
... ... @@ -67,7 +71,7 @@ def normalize_index_languages(value: Any, primary_language: str = "en") -> List[
67 71 code = (item or "").strip().lower()
68 72 if not code or code in seen:
69 73 continue
70   - if code in SUPPORTED_INDEX_LANGUAGES:
  74 + if code in SOURCE_LANG_CODE_MAP:
71 75 valid.append(code)
72 76 seen.add(code)
73 77 return valid
... ... @@ -91,11 +95,11 @@ def resolve_index_languages(
91 95 to_en = bool(tenant_config.get("translate_to_en"))
92 96 to_zh = bool(tenant_config.get("translate_to_zh"))
93 97 langs: List[str] = []
94   - if primary and primary in SUPPORTED_INDEX_LANGUAGES:
  98 + if primary and primary in SOURCE_LANG_CODE_MAP:
95 99 langs.append(primary)
96 100 for code in ("en", "zh"):
97 101 if code not in langs and ((code == "en" and to_en) or (code == "zh" and to_zh)):
98   - if code in SUPPORTED_INDEX_LANGUAGES:
  102 + if code in SOURCE_LANG_CODE_MAP:
99 103 langs.append(code)
100 104 return langs if langs else list(default_index_languages)
101 105  
... ...
config/translate_prompts.py
1   -SOURCE_LANG_CODE_MAP = {
2   - "en": "English",
3   - "zh": "Chinese",
4   - "zh_tw": "Traditional Chinese",
5   - "ru": "Russian",
6   - "ja": "Japanese",
7   - "ko": "Korean",
8   - "es": "Spanish",
9   - "fr": "French",
10   - "pt": "Portuguese",
11   - "de": "German",
12   - "it": "Italian",
13   - "th": "Thai",
14   - "vi": "Vietnamese",
15   - "id": "Indonesian",
16   - "ms": "Malay",
17   - "ar": "Arabic",
18   - "hi": "Hindi",
19   - "he": "Hebrew",
20   - "my": "Burmese",
21   - "ta": "Tamil",
22   - "ur": "Urdu",
23   - "bn": "Bengali",
24   - "pl": "Polish",
25   - "nl": "Dutch",
26   - "ro": "Romanian",
27   - "tr": "Turkish",
28   - "km": "Khmer",
29   - "lo": "Lao",
30   - "yue": "Cantonese",
31   - "cs": "Czech",
32   - "el": "Greek",
33   - "sv": "Swedish",
34   - "hu": "Hungarian",
35   - "da": "Danish",
36   - "fi": "Finnish",
37   - "uk": "Ukrainian",
38   - "bg": "Bulgarian",
39   -}
40   -
41   -TARGET_LANG_CODE_MAP = {v: k for k, v in SOURCE_LANG_CODE_MAP.items()}
  1 +from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP
42 2  
43 3 TRANSLATION_PROMPTS = {
44 4 "general": {
... ...
docs/DEVELOPER_GUIDE.md
... ... @@ -92,7 +92,7 @@ MySQL (店匠 SPU/SKU)
92 92 | indexer | 6004 | 索引 API(reindex/build-docs 等) | ✓ |
93 93 | frontend | 6003 | 调试 UI | ✓ |
94 94 | embedding | 6005 | 向量服务(文本/图片) | 可选 |
95   -| translator | 6006 | 翻译服务 | 可选 |
  95 +| translator | 6006 | 翻译服务(`POST /translate` 支持单条或批量 list;批量失败用 `null` 占位) | 可选 |
96 96 | reranker | 6007 | 重排服务 | 可选 |
97 97  
98 98 - 启动:`./run.sh` 仅启动 backend / indexer / frontend;需全功能时通过环境变量或脚本另行启动 embedding / translator / reranker。
... ... @@ -170,6 +170,11 @@ docs/ # 文档(含本指南)
170 170 - **原则**:业务代码只依赖 Provider 接口,不依赖具体 URL 或后端类型;新增调用方式(如新 Provider 类型)在对应 `providers/<capability>.py` 中实现并在工厂中注册。
171 171 - **详见**:本指南 §7.2;[QUICKSTART.md](./QUICKSTART.md) §3。
172 172  
  173 +补充约定(翻译 provider):
  174 +
  175 +- `translate(text=...)` 支持 `str` 与 `List[str]` 两种输入;当输入为列表时,输出必须与输入 **等长且顺序对应**,失败位置为 `None`(HTTP JSON 表现为 `null`)。
  176 +- provider 可暴露 `supports_batch: bool`(property)用于标识其是否支持直接批量调用;上层在处理 `text` 为列表时可优先走 batch,否则逐条拆分调用。
  177 +
173 178 ### 4.9 suggestion
174 179  
175 180 - **职责**:建议索引的构建与检索:从 ES 商品索引与 MySQL 日志等构建 suggestion 索引;搜索 API 的 `/search/suggestions` 使用本模块。
... ...
docs/工作总结-微服务性能优化与架构.md
... ... @@ -101,9 +101,9 @@ instruction: "Given a shopping query, rank product titles by relevance"
101 101  
102 102 **具体内容**:
103 103 - **接口**:`POST /indexer/enrich-content`(Indexer 服务端口 **6004**)。请求体为 `items` 数组,每项含 `spu_id`、`title`(必填)及可选多语言标题等;单次请求最多 **50 条**,建议批量调用。响应 `results` 与 `items` 一一对应,每项含 `spu_id`、`qanchors`(按语言键,如 `qanchors.zh`、`qanchors.en`,逗号分隔短语)、`semantic_attributes`、`tags`。
104   -- **索引侧**:微服务组合方式下,调用方先拿不含 qanchors/tags 的 doc,再调用本接口补齐后写入 ES 的 `qanchors.{lang}` 等字段;索引 transformer(`indexer/document_transformer.py`、`indexer/product_annotator.py`)内也可在构建 doc 时调用内容理解逻辑,写入 `qanchors.{lang}`。
   104 +- **索引侧**:微服务组合方式下,调用方先拿不含 qanchors/tags 的 doc,再调用本接口补齐后写入 ES 的 `qanchors.{lang}` 等字段;索引 transformer(`indexer/document_transformer.py`、`indexer/product_enrich.py`)内也可在构建 doc 时调用内容理解逻辑,写入 `qanchors.{lang}`。
105 105 - **Suggest 侧**:`suggestion/builder.py` 从 ES 商品索引读取 `_source: ["id", "spu_id", "title", "qanchors"]`,对 `qanchors.{lang}` 用 `_split_qanchors` 拆成词条,以 `source="qanchor"` 加入候选,排序时 `qanchor` 权重大于纯 title(`add_product("qanchor", ...)`);suggest 配置中 `sources: ["query_log", "qanchor"]` 表示候选来源包含 qanchor。
106   -- **实现与依赖**:内容理解内部使用大模型(需 `DASHSCOPE_API_KEY`),支持多语言与 Redis 缓存(如 `product_anchors`);逻辑与 `indexer/product_annotator` 一致。
  106 +- **实现与依赖**:内容理解内部使用大模型(需 `DASHSCOPE_API_KEY`),支持多语言与 Redis 缓存(如 `product_anchors`);逻辑与 `indexer/product_enrich` 一致。
107 107  
108 108 **状态**:内容理解字段已接入索引与 suggest 链路;依赖内容理解(qanchors/tags)的**全量数据尚未全部完成一轮**,后续需持续跑满并校验效果。
109 109  
... ...
docs/搜索API对接指南.md
... ... @@ -1484,7 +1484,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \
1484 1484 ### 5.8 内容理解字段生成接口
1485 1485  
1486 1486 - **端点**: `POST /indexer/enrich-content`
1487   -- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.product_annotator` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。
  1487 +- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.product_enrich` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。
1488 1488  
1489 1489 #### 请求参数
1490 1490  
... ... @@ -1862,6 +1862,10 @@ curl "http://localhost:6007/health"
1862 1862 }
1863 1863 ```
1864 1864  
  1865 +> **失败语义(批量)**:当 `text` 为列表时,如果其中某条翻译失败,对应位置返回 `null`(即 `translated_text[i] = null`),并保持数组长度与顺序不变;接口整体仍返回 `status="success"`,用于避免“部分失败”导致整批请求失败。
  1866 +
  1867 +> **实现提示(可忽略)**:服务端会尽可能使用底层翻译 provider 的批量能力(若支持),否则自动拆分逐条翻译;无论采用哪种方式,上述批量契约保持一致。
  1868 +
1865 1869 **完整 curl 示例**:
1866 1870  
1867 1871 中文 → 英文:
... ...
docs/翻译模块说明.md
... ... @@ -24,3 +24,83 @@ TRANSLATION_MODEL=qwen # 或 deepl
24 24 ## Provider 配置
25 25  
26 26 Provider 与 URL 在 `config/config.yaml` 的 `services.translation`。详见 [QUICKSTART.md](./QUICKSTART.md) §3 与 [DEVELOPER_GUIDE.md](./DEVELOPER_GUIDE.md) §7.2。
  27 +
  28 +## HTTP 接口契约(translator service,端口 6006)
  29 +
  30 +服务默认监听 `http://localhost:6006`,提供:
  31 +
  32 +- `POST /translate`: 文本翻译(支持 `qwen/qwen-mt`、`deepl`、`llm`)
  33 +- `GET /health`: 健康检查
  34 +
  35 +### `POST /translate`
  36 +
  37 +**请求体**:
  38 +
  39 +```json
  40 +{
  41 + "text": "商品名称",
  42 + "target_lang": "en",
  43 + "source_lang": "zh",
  44 + "model": "qwen",
  45 + "context": "sku_name",
  46 + "prompt": null
  47 +}
  48 +```
  49 +
  50 +- `text` 支持两种形式:
  51 + - 单条:`string`
  52 + - 批量:`string[]`(等长返回,顺序对应)
  53 +
  54 +**响应体**(单条):
  55 +
  56 +```json
  57 +{
  58 + "text": "商品名称",
  59 + "target_lang": "en",
  60 + "source_lang": "zh",
  61 + "translated_text": "Product name",
  62 + "status": "success",
  63 + "model": "qwen"
  64 +}
  65 +```
  66 +
  67 +**响应体**(批量):
  68 +
  69 +```json
  70 +{
  71 + "text": ["商品名称1", "商品名称2"],
  72 + "target_lang": "en",
  73 + "source_lang": "zh",
  74 + "translated_text": ["Product name 1", null],
  75 + "status": "success",
  76 + "model": "qwen"
  77 +}
  78 +```
  79 +
  80 +批量模式下,**单条失败用 `null` 占位**(即 `translated_text[i] = null`),保证长度与顺序一一对应,避免部分失败导致整批报错。
  81 +
  82 +---
  83 +
  84 +## 开发者接口约定(Provider / 代码调用)
  85 +
  86 +除 HTTP 微服务外,代码侧(如 query/indexer)通常通过 `providers.translation.create_translation_provider()` 获取翻译 provider 实例并调用 `translate()`。
  87 +
  88 +### 输入输出形状(Shape)
  89 +
  90 +- `translate(text=...)` 支持:
  91 + - **单条**:`text: str` → 返回 `Optional[str]`
  92 + - **批量**:`text: List[str]` → 返回 `List[Optional[str]]`
  93 +- **批量语义**:返回列表必须与输入 **等长且顺序对应**;某条翻译失败时,对应位置为 `None`(HTTP JSON 中表现为 `null`)。
  94 +
  95 +### 批量能力标识(supports_batch)
  96 +
  97 +不同 provider 对批量的实现方式可能不同(例如:真正一次请求传多条,或内部循环逐条翻译并保持 shape)。
  98 +
  99 +为便于上层(如 `api/translator_app.py`)做最优调用,provider 可暴露:
  100 +
  101 +- `supports_batch: bool`(property)
  102 +
  103 +上层在收到 `text` 为列表时:
  104 +
  105 +- **若 `supports_batch=True`**:可以直接将列表传给 `translate(text=[...])`
  106 +- **若 `supports_batch=False`**:上层会逐条拆分调用(仍保证输出列表一一对应、失败为 `null`)
... ...
indexer/document_transformer.py
... ... @@ -14,7 +14,7 @@ import logging
14 14 import re
15 15 from typing import Dict, Any, Optional, List
16 16 from config import ConfigLoader
17   -from indexer.product_annotator import analyze_products, SUPPORTED_LANGS
  17 +from indexer.product_enrich import analyze_products
18 18  
19 19 logger = logging.getLogger(__name__)
20 20  
... ... @@ -225,9 +225,8 @@ class SPUDocumentTransformer:
225 225 index_langs = self.tenant_config.get("index_languages") or ["en", "zh"]
226 226 except Exception:
227 227 index_langs = ["en", "zh"]
228   - llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS]
229   - if not llm_langs:
230   - return
  228 + # 不再限制为固定 SUPPORTED_LANGS,直接按照租户配置的 index_languages 调用
  229 + llm_langs = list(dict.fromkeys(index_langs)) # 去重并保持顺序
231 230  
232 231 # 只对有 title 的 SPU 参与 LLM;其余跳过
233 232 id_to_idx: Dict[str, int] = {}
... ... @@ -651,7 +650,7 @@ class SPUDocumentTransformer:
651 650  
652 651 def _fill_llm_attributes(self, doc: Dict[str, Any], spu_row: pd.Series) -> None:
653 652 """
654   - 调用 indexer.product_annotator.analyze_products,为当前 SPU 填充:
  653 + 调用 indexer.product_enrich.analyze_products,为当前 SPU 填充:
655 654 - qanchors.{lang}
656 655 - semantic_attributes (lang/name/value)
657 656 """
... ... @@ -660,10 +659,8 @@ class SPUDocumentTransformer:
660 659 except Exception:
661 660 index_langs = ["en", "zh"]
662 661  
663   - # 只在支持的语言集合内调用
664   - llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS]
665   - if not llm_langs:
666   - return
  662 + # 不再限制为固定 SUPPORTED_LANGS,直接按照租户配置的 index_languages 调用
  663 + llm_langs = list(dict.fromkeys(index_langs)) # 去重并保持顺序
667 664  
668 665 spu_id = str(spu_row.get("id") or "").strip()
669 666 title = str(spu_row.get("title") or "").strip()
... ...
indexer/product_annotator.py deleted
... ... @@ -1,685 +0,0 @@
1   -#!/usr/bin/env python3
2   -"""
3   -商品品类分析脚本
4   -批量读取商品标题,调用大模型进行品类分析,并保存结果
5   -"""
6   -
7   -import csv
8   -import os
9   -import json
10   -import logging
11   -import time
12   -import hashlib
13   -from datetime import datetime
14   -from typing import List, Dict, Tuple, Any, Optional
15   -
16   -import redis
17   -import requests
18   -from pathlib import Path
19   -from requests.adapters import HTTPAdapter
20   -from urllib3.util.retry import Retry
21   -
22   -from config.env_config import REDIS_CONFIG
23   -
24   -# 配置
25   -BATCH_SIZE = 20
26   -# 华北2(北京):https://dashscope.aliyuncs.com/compatible-mode/v1
27   -# 新加坡:https://dashscope-intl.aliyuncs.com/compatible-mode/v1
28   -# 美国(弗吉尼亚):https://dashscope-us.aliyuncs.com/compatible-mode/v1
29   -API_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1"
30   -MODEL_NAME = "qwen-flash"
31   -API_KEY = os.environ.get("DASHSCOPE_API_KEY")
32   -MAX_RETRIES = 3
33   -RETRY_DELAY = 5 # 秒
34   -REQUEST_TIMEOUT = 180 # 秒
35   -
36   -# 文件路径
37   -INPUT_FILE = "saas_170_products.csv"
38   -OUTPUT_DIR = Path("output_logs")
39   -OUTPUT_FILE = OUTPUT_DIR / "products_analyzed.csv"
40   -LOG_DIR = OUTPUT_DIR / "logs"
41   -
42   -# 设置独立日志(不影响全局 indexer.log)
43   -LOG_DIR.mkdir(parents=True, exist_ok=True)
44   -timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
45   -log_file = LOG_DIR / f"process_{timestamp}.log"
46   -
47   -logger = logging.getLogger("product_annotator")
48   -logger.setLevel(logging.INFO)
49   -
50   -if not logger.handlers:
51   - formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
52   -
53   - file_handler = logging.FileHandler(log_file, encoding="utf-8")
54   - file_handler.setFormatter(formatter)
55   -
56   - stream_handler = logging.StreamHandler()
57   - stream_handler.setFormatter(formatter)
58   -
59   - logger.addHandler(file_handler)
60   - logger.addHandler(stream_handler)
61   -
62   - # 避免日志向根 logger 传播,防止写入 logs/indexer.log 等其他文件
63   - logger.propagate = False
64   -
65   -
66   -# Redis 缓存(用于 anchors / 语义属性)
67   -ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors")
68   -ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30))
69   -_anchor_redis: Optional[redis.Redis] = None
70   -
71   -try:
72   - _anchor_redis = redis.Redis(
73   - host=REDIS_CONFIG.get("host", "localhost"),
74   - port=REDIS_CONFIG.get("port", 6479),
75   - password=REDIS_CONFIG.get("password"),
76   - decode_responses=True,
77   - socket_timeout=REDIS_CONFIG.get("socket_timeout", 1),
78   - socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1),
79   - retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False),
80   - health_check_interval=10,
81   - )
82   - _anchor_redis.ping()
83   - logger.info("Redis cache initialized for product anchors and semantic attributes")
84   -except Exception as e:
85   - logger.warning(f"Failed to initialize Redis for anchors cache: {e}")
86   - _anchor_redis = None
87   -
88   -
89   -LANG_LABELS: Dict[str, str] = {
90   - "zh": "中文",
91   - "en": "英文",
92   - "de": "德文",
93   - "ru": "俄文",
94   - "fr": "法文",
95   -}
96   -
97   -SUPPORTED_LANGS = set(LANG_LABELS.keys())
98   -
99   -SYSTEM_MESSAGES: Dict[str, str] = {
100   - "zh": (
101   - "你是一名电商平台的商品标注员,你的工作是对输入的每个商品进行理解、分析和标注,"
102   - "并按要求格式返回 Markdown 表格。所有输出内容必须为中文。"
103   - ),
104   - "en": (
105   - "You are a product annotator for an e-commerce platform. "
106   - "For each input product, you must understand, analyze and label it, "
107   - "and return a Markdown table strictly following the requested format. "
108   - "All output must be in English."
109   - ),
110   - "de": (
111   - "Du bist ein Produktannotator für eine E‑Commerce‑Plattform. "
112   - "Du sollst jedes Eingabeprodukt verstehen, analysieren und beschriften "
113   - "und eine Markdown-Tabelle im geforderten Format zurückgeben. "
114   - "Alle Ausgaben müssen auf Deutsch sein."
115   - ),
116   - "ru": (
117   - "Вы — разметчик товаров для платформы электронной коммерции. "
118   - "Ваша задача — понимать, анализировать и размечать каждый товар "
119   - "и возвращать таблицу Markdown в требуемом формате. "
120   - "Весь вывод должен быть на русском языке."
121   - ),
122   - "fr": (
123   - "Vous êtes annotateur de produits pour une plateforme e‑commerce. "
124   - "Pour chaque produit en entrée, vous devez le comprendre, l’analyser et l’annoter, "
125   - "puis renvoyer un tableau Markdown au format demandé. "
126   - "Toute la sortie doit être en français."
127   - ),
128   -}
129   -
130   -
131   -def _make_anchor_cache_key(
132   - title: str,
133   - target_lang: str,
134   - tenant_id: Optional[str] = None,
135   -) -> str:
136   - """构造 anchors/语义属性的缓存 key。"""
137   - base = (tenant_id or "global").strip()
138   - h = hashlib.md5(title.encode("utf-8")).hexdigest()
139   - return f"{ANCHOR_CACHE_PREFIX}:{base}:{target_lang}:{h}"
140   -
141   -
142   -def _get_cached_anchor_result(
143   - title: str,
144   - target_lang: str,
145   - tenant_id: Optional[str] = None,
146   -) -> Optional[Dict[str, Any]]:
147   - if not _anchor_redis:
148   - return None
149   - try:
150   - key = _make_anchor_cache_key(title, target_lang, tenant_id)
151   - raw = _anchor_redis.get(key)
152   - if not raw:
153   - return None
154   - return json.loads(raw)
155   - except Exception as e:
156   - logger.warning(f"Failed to get anchor cache: {e}")
157   - return None
158   -
159   -
160   -def _set_cached_anchor_result(
161   - title: str,
162   - target_lang: str,
163   - result: Dict[str, Any],
164   - tenant_id: Optional[str] = None,
165   -) -> None:
166   - if not _anchor_redis:
167   - return
168   - try:
169   - key = _make_anchor_cache_key(title, target_lang, tenant_id)
170   - ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600
171   - _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False))
172   - except Exception as e:
173   - logger.warning(f"Failed to set anchor cache: {e}")
174   -
175   -
176   -def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str:
177   - """根据目标语言创建 LLM 提示词和表头说明。"""
178   - if target_lang == "en":
179   - prompt = """Please analyze each input product title and extract the following information:
180   -
181   -1. Product title: a natural English product name derived from the input title
182   -2. Category path: from broad to fine-grained category, separated by ">" (e.g. Clothing>Women>Dresses>Work Dress)
183   -3. Fine-grained tags: style / features / attributes (e.g. floral, waist-cinching, French style)
184   -4. Target audience: gender / age group, etc. (e.g. young women)
185   -5. Usage scene
186   -6. Applicable season
187   -7. Key attributes
188   -8. Material description
189   -9. Functional features
190   -10. Selling point: one concise key selling sentence for recommendation
191   -11. Anchor text: a set of words or phrases that could be used by users as search queries for this product, covering category, fine-grained tags, functional attributes, usage scenes, etc.
192   -
193   -Input product list:
194   -
195   -"""
196   - prompt_tail = """
197   -Please strictly return a Markdown table in the following format. For any column that can contain multiple values, separate values with commas. Do not add any other explanations:
198   -
199   -| No. | Product title | Category path | Fine-grained tags | Target audience | Usage scene | Season | Key attributes | Material | Features | Selling point | Anchor text |
200   -|----|----|----|----|----|----|----|----|----|----|----|----|
201   -"""
202   - elif target_lang == "de":
203   - prompt = """Bitte analysiere jeden eingegebenen Produkttitel und extrahiere die folgenden Informationen:
204   -
205   -1. Produkttitel: ein natürlicher deutscher Produkttitel basierend auf dem Eingangstitel
206   -2. Kategoriepfad: von Oberkategorie bis Feinkategorie, getrennt durch ">" (z. B. Kleidung>Damen>Kleider>Businesskleid)
207   -3. Feinkörnige Tags: Stil / Merkmale / Eigenschaften (z. B. Blumenmuster, tailliert, französischer Stil)
208   -4. Zielgruppe: Geschlecht / Altersgruppe usw. (z. B. junge Frauen)
209   -5. Einsatzszenario
210   -6. Geeignete Saison
211   -7. Wichtige Attribute
212   -8. Materialbeschreibung
213   -9. Funktionale Merkmale
214   -10. Verkaufsargument: ein prägnanter, einzeiliger Haupt-Selling-Point für Empfehlungen
215   -11. Ankertexte: eine Menge von Wörtern oder Phrasen, die Nutzer als Suchanfragen für dieses Produkt verwenden könnten und die Kategorie, feine Tags, Funktion und Nutzungsszenarien abdecken.
216   -
217   -Eingabeliste der Produkte:
218   -
219   -"""
220   - prompt_tail = """
221   -Gib bitte strikt eine Markdown-Tabelle im folgenden Format zurück. Mehrere Werte in einer Spalte werden durch Kommas getrennt. Füge keine weiteren Erklärungen hinzu:
222   -
223   -| Nr. | Produkttitel | Kategoriepfad | Feintags | Zielgruppe | Einsatzszenario | Saison | Wichtige Attribute | Material | Merkmale | Verkaufsargument | Ankertexte |
224   -|----|----|----|----|----|----|----|----|----|----|----|----|
225   -"""
226   - elif target_lang == "ru":
227   - prompt = """Пожалуйста, проанализируйте каждый входной заголовок товара и извлеките следующую информацию:
228   -
229   -1. Заголовок товара: естественное русскоязычное название товара на основе исходного заголовка
230   -2. Путь категории: от широкой до узкой категории, разделённый символом ">" (например: Одежда>Женская одежда>Платья>Деловое платье)
231   -3. Детализированные теги: стиль / особенности / характеристики (например: цветочный принт, приталенный, французский стиль)
232   -4. Целевая аудитория: пол / возрастная группа и т. п. (например: молодые женщины)
233   -5. Сценарий использования
234   -6. Подходящий сезон
235   -7. Ключевые характеристики
236   -8. Описание материала
237   -9. Функциональные особенности
238   -10. Торговое преимущество: одно краткое ключевое предложение для рекомендаций
239   -11. Якорные запросы: набор слов или фраз, которые пользователи могут использовать в качестве поисковых запросов для этого товара, покрывающих категорию, детализированные теги, функциональные характеристики, сценарии использования и т. д.
240   -
241   -Список входных товаров:
242   -
243   -"""
244   - prompt_tail = """
245   -Пожалуйста, строго верните Markdown‑таблицу в следующем формате. Для колонок с несколькими значениями разделяйте значения запятыми. Не добавляйте никаких дополнительных пояснений:
246   -
247   -| № | Заголовок товара | Путь категории | Детализированные теги | Целевая аудитория | Сценарий использования | Сезон | Ключевые характеристики | Материал | Особенности | Торговое преимущество | Якорные запросы |
248   -|----|----|----|----|----|----|----|----|----|----|----|----|
249   -"""
250   - elif target_lang == "fr":
251   - prompt = """Veuillez analyser chaque titre de produit en entrée et extraire les informations suivantes :
252   -
253   -1. Titre du produit : un titre de produit naturel en français basé sur le titre d’origine
254   -2. Chemin de catégorie : de la catégorie la plus large à la plus fine, séparées par ">" (par ex. Vêtements>Femme>Robes>Robe de travail)
255   -3. Tags détaillés : style / caractéristiques / attributs (par ex. fleuri, cintré, style français)
256   -4. Public cible : sexe / tranche d’âge, etc. (par ex. jeunes femmes)
257   -5. Scénario d’utilisation
258   -6. Saison adaptée
259   -7. Attributs clés
260   -8. Description du matériau
261   -9. Caractéristiques fonctionnelles
262   -10. Argument de vente : une phrase concise résumant le principal atout pour la recommandation
263   -11. Texte d’ancrage : un ensemble de mots ou d’expressions que les utilisateurs pourraient saisir comme requêtes de recherche pour ce produit, couvrant la catégorie, les tags détaillés, les fonctions, les scénarios d’usage, etc.
264   -
265   -Liste des produits en entrée :
266   -
267   -"""
268   - prompt_tail = """
269   -Veuillez strictement renvoyer un tableau Markdown au format suivant. Pour toute colonne pouvant contenir plusieurs valeurs, séparez‑les par des virgules. N’ajoutez aucune autre explication :
270   -
271   -| N° | Titre du produit | Chemin de catégorie | Tags détaillés | Public cible | Scénario d’utilisation | Saison | Attributs clés | Matériau | Caractéristiques | Argument de vente | Texte d’ancrage |
272   -|----|----|----|----|----|----|----|----|----|----|----|----|
273   -"""
274   - else:
275   - # 默认中文版本
276   - prompt = """请对输入的每条商品标题,分析并提取以下信息:
277   -
278   -1. 商品标题:将输入商品名称翻译为自然、完整的中文商品标题
279   -2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤)
280   -3. 细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式)
281   -4. 适用人群:性别/年龄段等(例如:年轻女性)
282   -5. 使用场景
283   -6. 适用季节
284   -7. 关键属性
285   -8. 材质说明
286   -9. 功能特点
287   -10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由
288   -11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。
289   -
290   -输入商品列表:
291   -
292   -"""
293   - prompt_tail = """
294   -请严格按照以下markdown表格格式返回,每列内部的多值内容都用逗号分隔,不要添加任何其他说明:
295   -
296   -| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 |
297   -|----|----|----|----|----|----|----|----|----|----|----|----|
298   -"""
299   -
300   - for idx, product in enumerate(products, 1):
301   - prompt += f'{idx}. {product["title"]}\n'
302   - prompt += prompt_tail
303   -
304   - return prompt
305   -
306   -
307   -def call_llm(prompt: str, target_lang: str = "zh") -> Tuple[str, str]:
308   - """调用大模型API(带重试机制),按目标语言选择系统提示词。"""
309   - headers = {
310   - "Authorization": f"Bearer {API_KEY}",
311   - "Content-Type": "application/json"
312   - }
313   -
314   - payload = {
315   - "model": MODEL_NAME,
316   - "messages": [
317   - {
318   - "role": "system",
319   - "content": SYSTEM_MESSAGES.get(target_lang, SYSTEM_MESSAGES["zh"])
320   - },
321   - {
322   - "role": "user",
323   - "content": prompt
324   - }
325   - ],
326   - "temperature": 0.3,
327   - "top_p": 0.8
328   - }
329   -
330   - request_data = {
331   - "headers": {k: v for k, v in headers.items() if k != "Authorization"},
332   - "payload": payload
333   - }
334   -
335   - logger.info(f"\n{'='*80}")
336   - logger.info(f"LLM Request (Model: {MODEL_NAME}):")
337   - logger.info(json.dumps(request_data, ensure_ascii=False, indent=2))
338   - logger.info(f"\nPrompt:\n{prompt}")
339   -
340   - # 创建session,禁用代理
341   - session = requests.Session()
342   - session.trust_env = False # 忽略系统代理设置
343   -
344   - try:
345   - # 重试机制
346   - for attempt in range(MAX_RETRIES):
347   - try:
348   - response = session.post(
349   - f"{API_BASE_URL}/chat/completions",
350   - headers=headers,
351   - json=payload,
352   - timeout=REQUEST_TIMEOUT,
353   - proxies={"http": None, "https": None} # 明确禁用代理
354   - )
355   -
356   - response.raise_for_status()
357   - result = response.json()
358   -
359   - logger.info(f"\nLLM Response:")
360   - logger.info(json.dumps(result, ensure_ascii=False, indent=2))
361   -
362   - content = result["choices"][0]["message"]["content"]
363   - logger.info(f"\nExtracted Content:\n{content}")
364   -
365   - return content, json.dumps(result, ensure_ascii=False)
366   -
367   - except requests.exceptions.ProxyError as e:
368   - logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Proxy error - {str(e)}")
369   - if attempt < MAX_RETRIES - 1:
370   - logger.info(f"Retrying in {RETRY_DELAY} seconds...")
371   - time.sleep(RETRY_DELAY)
372   - else:
373   - raise
374   -
375   - except requests.exceptions.RequestException as e:
376   - logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Request error - {str(e)}")
377   - if attempt < MAX_RETRIES - 1:
378   - logger.info(f"Retrying in {RETRY_DELAY} seconds...")
379   - time.sleep(RETRY_DELAY)
380   - else:
381   - raise
382   -
383   - except Exception as e:
384   - logger.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES}: {str(e)}")
385   - if attempt < MAX_RETRIES - 1:
386   - logger.info(f"Retrying in {RETRY_DELAY} seconds...")
387   - time.sleep(RETRY_DELAY)
388   - else:
389   - raise
390   -
391   - finally:
392   - session.close()
393   -
394   -
395   -def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]:
396   - """解析markdown表格内容"""
397   - lines = markdown_content.strip().split('\n')
398   - data = []
399   - data_started = False
400   -
401   - for line in lines:
402   - line = line.strip()
403   - if not line:
404   - continue
405   -
406   - # 表格行处理
407   - if line.startswith('|'):
408   - # 分隔行(---- 或 :---: 等;允许空格,如 "| ---- | ---- |")
409   - sep_chars = line.replace('|', '').strip().replace(' ', '')
410   - if sep_chars and set(sep_chars) <= {'-', ':'}:
411   - data_started = True
412   - continue
413   -
414   - # 首个表头行:无论语言如何,统一跳过
415   - if not data_started:
416   - # 等待下一行数据行
417   - continue
418   -
419   - # 解析数据行
420   - parts = [p.strip() for p in line.split('|')]
421   - parts = [p for p in parts if p] # 移除空字符串
422   -
423   - if len(parts) >= 2:
424   - row = {
425   - "seq_no": parts[0],
426   - "title": parts[1], # 商品标题(按目标语言)
427   - "category_path": parts[2] if len(parts) > 2 else "", # 品类路径
428   - "tags": parts[3] if len(parts) > 3 else "", # 细分标签
429   - "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群
430   - "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景
431   - "season": parts[6] if len(parts) > 6 else "", # 适用季节
432   - "key_attributes": parts[7] if len(parts) > 7 else "", # 关键属性
433   - "material": parts[8] if len(parts) > 8 else "", # 材质说明
434   - "features": parts[9] if len(parts) > 9 else "", # 功能特点
435   - "selling_points": parts[10] if len(parts) > 10 else "", # 商品卖点
436   - "anchor_text": parts[11] if len(parts) > 11 else "" # 锚文本
437   - }
438   - data.append(row)
439   -
440   - return data
441   -
442   -
443   -def process_batch(
444   - batch_data: List[Dict[str, str]],
445   - batch_num: int,
446   - target_lang: str = "zh"
447   -) -> List[Dict[str, str]]:
448   - """处理一个批次的数据"""
449   - logger.info(f"\n{'#'*80}")
450   - logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)")
451   -
452   - # 创建提示词
453   - prompt = create_prompt(batch_data, target_lang=target_lang)
454   -
455   - # 调用LLM
456   - try:
457   - raw_response, full_response_json = call_llm(prompt, target_lang=target_lang)
458   -
459   - # 解析结果
460   - parsed_results = parse_markdown_table(raw_response)
461   -
462   - logger.info(f"\nParsed Results ({len(parsed_results)} items):")
463   - logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2))
464   -
465   - # 映射回原始ID
466   - results_with_ids = []
467   - for i, parsed_item in enumerate(parsed_results):
468   - if i < len(batch_data):
469   - original_id = batch_data[i]["id"]
470   - result = {
471   - "id": original_id,
472   - "lang": target_lang,
473   - "title_input": batch_data[i]["title"], # 原始输入标题
474   - "title": parsed_item.get("title", ""), # 模型生成的标题
475   - "category_path": parsed_item.get("category_path", ""), # 品类路径
476   - "tags": parsed_item.get("tags", ""), # 细分标签
477   - "target_audience": parsed_item.get("target_audience", ""), # 适用人群
478   - "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景
479   - "season": parsed_item.get("season", ""), # 适用季节
480   - "key_attributes": parsed_item.get("key_attributes", ""), # 关键属性
481   - "material": parsed_item.get("material", ""), # 材质说明
482   - "features": parsed_item.get("features", ""), # 功能特点
483   - "selling_points": parsed_item.get("selling_points", ""), # 商品卖点
484   - "anchor_text": parsed_item.get("anchor_text", "") # 锚文本
485   - }
486   - results_with_ids.append(result)
487   - logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}")
488   -
489   - # 保存日志
490   - batch_log = {
491   - "batch_num": batch_num,
492   - "timestamp": datetime.now().isoformat(),
493   - "input_products": batch_data,
494   - "raw_response": raw_response,
495   - "full_response_json": full_response_json,
496   - "parsed_results": parsed_results,
497   - "final_results": results_with_ids
498   - }
499   -
500   - batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}.json"
501   - with open(batch_log_file, 'w', encoding='utf-8') as f:
502   - json.dump(batch_log, f, ensure_ascii=False, indent=2)
503   -
504   - logger.info(f"Batch log saved to: {batch_log_file}")
505   -
506   - return results_with_ids
507   -
508   - except Exception as e:
509   - logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True)
510   - # 返回空结果,保持ID映射
511   - return [{
512   - "id": item["id"],
513   - "lang": target_lang,
514   - "title_input": item["title"],
515   - "title": "",
516   - "category_path": "",
517   - "tags": "",
518   - "target_audience": "",
519   - "usage_scene": "",
520   - "season": "",
521   - "key_attributes": "",
522   - "material": "",
523   - "features": "",
524   - "selling_points": "",
525   - "anchor_text": "",
526   - "error": str(e),
527   - } for item in batch_data]
528   -
529   -
530   -def read_products(input_file: str) -> List[Dict[str, str]]:
531   - """读取CSV文件"""
532   - products = []
533   - with open(input_file, 'r', encoding='utf-8') as f:
534   - reader = csv.DictReader(f)
535   - for row in reader:
536   - products.append({
537   - "id": row["id"],
538   - "title": row["title"]
539   - })
540   - return products
541   -
542   -
543   -def write_results(results: List[Dict[str, str]], output_file: Path):
544   - """写入结果到CSV文件"""
545   - output_file.parent.mkdir(parents=True, exist_ok=True)
546   -
547   - fieldnames = [
548   - "id",
549   - "lang",
550   - "title_input",
551   - "title",
552   - "category_path",
553   - "tags",
554   - "target_audience",
555   - "usage_scene",
556   - "season",
557   - "key_attributes",
558   - "material",
559   - "features",
560   - "selling_points",
561   - "anchor_text",
562   - ]
563   -
564   - with open(output_file, 'w', encoding='utf-8', newline='') as f:
565   - writer = csv.DictWriter(f, fieldnames=fieldnames)
566   - writer.writeheader()
567   - writer.writerows(results)
568   -
569   - logger.info(f"\nResults written to: {output_file}")
570   -
571   -
572   -def main():
573   - """主函数"""
574   - if not API_KEY:
575   - logger.error("Error: DASHSCOPE_API_KEY environment variable not set!")
576   - return
577   -
578   - logger.info(f"Starting product analysis process")
579   - logger.info(f"Input file: {INPUT_FILE}")
580   - logger.info(f"Output file: {OUTPUT_FILE}")
581   - logger.info(f"Batch size: {BATCH_SIZE}")
582   - logger.info(f"Model: {MODEL_NAME}")
583   -
584   - # 读取产品数据
585   - logger.info(f"\nReading products from {INPUT_FILE}...")
586   - products = read_products(INPUT_FILE)
587   - logger.info(f"Total products to process: {len(products)}")
588   -
589   - # 分批处理
590   - all_results = []
591   - total_batches = (len(products) + BATCH_SIZE - 1) // BATCH_SIZE
592   -
593   - for i in range(0, len(products), BATCH_SIZE):
594   - batch_num = i // BATCH_SIZE + 1
595   - batch = products[i:i + BATCH_SIZE]
596   -
597   - logger.info(f"\nProgress: Batch {batch_num}/{total_batches}")
598   -
599   - results = process_batch(batch, batch_num, target_lang="zh")
600   - all_results.extend(results)
601   -
602   - # 每处理完一个批次就写入一次(断点续传)
603   - write_results(all_results, OUTPUT_FILE)
604   - logger.info(f"Progress saved: {len(all_results)}/{len(products)} items completed")
605   -
606   - logger.info(f"\n{'='*80}")
607   - logger.info(f"Processing completed!")
608   - logger.info(f"Total processed: {len(all_results)} items")
609   - logger.info(f"Output file: {OUTPUT_FILE}")
610   - logger.info(f"Log file: {log_file}")
611   -
612   -
613   -if __name__ == "__main__":
614   - main()
615   -
616   -
617   -def analyze_products(
618   - products: List[Dict[str, str]],
619   - target_lang: str = "zh",
620   - batch_size: Optional[int] = None,
621   - tenant_id: Optional[str] = None,
622   -) -> List[Dict[str, Any]]:
623   - """
624   - 库调用入口:根据输入+语言,返回锚文本及各维度信息。
625   -
626   - Args:
627   - products: [{"id": "...", "title": "..."}]
628   - target_lang: 输出语言,需在 SUPPORTED_LANGS 内
629   - batch_size: 批大小,默认使用全局 BATCH_SIZE
630   - """
631   - if not API_KEY:
632   - raise RuntimeError("DASHSCOPE_API_KEY is not set, cannot call LLM")
633   -
634   - if target_lang not in SUPPORTED_LANGS:
635   - raise ValueError(f"Unsupported target_lang={target_lang}, supported={sorted(SUPPORTED_LANGS)}")
636   -
637   - if not products:
638   - return []
639   -
640   - # 简单路径:索引阶段通常 batch_size=1,这里优先做单条缓存命中
641   - if len(products) == 1:
642   - p = products[0]
643   - title = str(p.get("title") or "").strip()
644   - if title:
645   - cached = _get_cached_anchor_result(title, target_lang, tenant_id=tenant_id)
646   - if cached:
647   - logger.info(
648   - f"[analyze_products] Cache hit for title='{title[:50]}...', "
649   - f"lang={target_lang}, tenant_id={tenant_id or 'global'}"
650   - )
651   - return [cached]
652   -
653   - # call_llm 一次处理上限固定为 BATCH_SIZE(默认 20):
654   - # - 尽可能攒批处理;
655   - # - 即便调用方传入更大的 batch_size,也会自动按上限拆批。
656   - req_bs = BATCH_SIZE if batch_size is None else int(batch_size)
657   - bs = max(1, min(req_bs, BATCH_SIZE))
658   - all_results: List[Dict[str, Any]] = []
659   - total_batches = (len(products) + bs - 1) // bs
660   -
661   - for i in range(0, len(products), bs):
662   - batch_num = i // bs + 1
663   - batch = products[i:i + bs]
664   - logger.info(
665   - f"[analyze_products] Processing batch {batch_num}/{total_batches}, "
666   - f"size={len(batch)}, target_lang={target_lang}"
667   - )
668   - batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang)
669   - all_results.extend(batch_results)
670   -
671   - # 写入缓存
672   - for item in batch_results:
673   - title_input = str(item.get("title_input") or "").strip()
674   - if not title_input:
675   - continue
676   - if item.get("error"):
677   - # 不缓存错误结果,避免放大临时故障
678   - continue
679   - try:
680   - _set_cached_anchor_result(title_input, target_lang, item, tenant_id=tenant_id)
681   - except Exception:
682   - # 已在内部记录 warning
683   - pass
684   -
685   - return all_results
indexer/product_enrich.py 0 → 100644
... ... @@ -0,0 +1,516 @@
  1 +#!/usr/bin/env python3
  2 +"""
  3 +商品内容理解与属性补充模块(product_enrich)
  4 +
  5 +提供基于 LLM 的商品锚文本 / 语义属性 / 标签等分析能力,
  6 +供 indexer 与 API 在内存中调用(不再负责 CSV 读写)。
  7 +"""
  8 +
  9 +import os
  10 +import json
  11 +import logging
  12 +import time
  13 +import hashlib
  14 +from datetime import datetime
  15 +from typing import List, Dict, Tuple, Any, Optional
  16 +
  17 +import redis
  18 +import requests
  19 +from pathlib import Path
  20 +
  21 +from config.env_config import REDIS_CONFIG
  22 +from config.tenant_config_loader import SOURCE_LANG_CODE_MAP
  23 +
# --- Configuration ---
BATCH_SIZE = 20
# Regional DashScope endpoints:
#   China North 2 (Beijing): https://dashscope.aliyuncs.com/compatible-mode/v1
#   Singapore:               https://dashscope-intl.aliyuncs.com/compatible-mode/v1
#   US (Virginia):           https://dashscope-us.aliyuncs.com/compatible-mode/v1
API_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1"
MODEL_NAME = "qwen-flash"
API_KEY = os.environ.get("DASHSCOPE_API_KEY")
MAX_RETRIES = 3
RETRY_DELAY = 5  # seconds
REQUEST_TIMEOUT = 180  # seconds

# Log file locations
OUTPUT_DIR = Path("output_logs")
LOG_DIR = OUTPUT_DIR / "logs"

# Dedicated logging setup (does not touch the global indexer.log)
LOG_DIR.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = LOG_DIR / f"product_enrich_{timestamp}.log"
verbose_log_file = LOG_DIR / "product_enrich_verbose.log"

# Main logger: execution flow, batch progress, etc.
logger = logging.getLogger("product_enrich")
logger.setLevel(logging.INFO)

if not logger.handlers:
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setFormatter(formatter)

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(stream_handler)

    # Stop propagation to the root logger so records never end up in
    # logs/indexer.log or other shared log files.
    logger.propagate = False

# Verbose logger: dedicated to full LLM requests and responses
verbose_logger = logging.getLogger("product_enrich_verbose")
verbose_logger.setLevel(logging.INFO)

if not verbose_logger.handlers:
    verbose_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    verbose_file_handler = logging.FileHandler(verbose_log_file, encoding="utf-8")
    verbose_file_handler.setFormatter(verbose_formatter)
    verbose_logger.addHandler(verbose_file_handler)
    verbose_logger.propagate = False


# Redis cache (for anchors / semantic attributes)
ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors")
ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30))
_anchor_redis: Optional[redis.Redis] = None

try:
    _anchor_redis = redis.Redis(
        host=REDIS_CONFIG.get("host", "localhost"),
        port=REDIS_CONFIG.get("port", 6479),
        password=REDIS_CONFIG.get("password"),
        decode_responses=True,
        socket_timeout=REDIS_CONFIG.get("socket_timeout", 1),
        socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1),
        retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False),
        health_check_interval=10,
    )
    _anchor_redis.ping()
    logger.info("Redis cache initialized for product anchors and semantic attributes")
except Exception as e:
    # Cache is best-effort: on any init failure we simply run without it.
    logger.warning(f"Failed to initialize Redis for anchors cache: {e}")
    _anchor_redis = None


# Fixed English system prompt used for every round.
# NOTE(review): this says "All output must be in English", while create_prompt's
# non-English rounds ask for a fully translated table — confirm the model obeys
# the user prompt over the system prompt for those languages.
SYSTEM_MESSAGES = (
    "You are a product annotator for an e-commerce platform. "
    "For each input product, you must understand, analyze and label it, "
    "and return a Markdown table strictly following the requested format. "
    "All output must be in English."
)
  106 +
  107 +
def _make_anchor_cache_key(
    title: str,
    target_lang: str,
    tenant_id: Optional[str] = None,
) -> str:
    """Build the Redis key for one (tenant, language, title) enrichment entry."""
    scope = (tenant_id or "global").strip()
    digest = hashlib.md5(title.encode("utf-8")).hexdigest()
    return ":".join((ANCHOR_CACHE_PREFIX, scope, target_lang, digest))
  117 +
  118 +
def _get_cached_anchor_result(
    title: str,
    target_lang: str,
    tenant_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Look up a cached enrichment row; None on miss, disabled cache, or error."""
    if not _anchor_redis:
        return None
    try:
        cache_key = _make_anchor_cache_key(title, target_lang, tenant_id)
        payload = _anchor_redis.get(cache_key)
        return json.loads(payload) if payload else None
    except Exception as e:
        # Cache failures must never break enrichment itself.
        logger.warning(f"Failed to get anchor cache: {e}")
        return None
  135 +
  136 +
def _set_cached_anchor_result(
    title: str,
    target_lang: str,
    result: Dict[str, Any],
    tenant_id: Optional[str] = None,
) -> None:
    """Best-effort write of one enrichment row to Redis; errors are only logged."""
    if not _anchor_redis:
        return
    try:
        cache_key = _make_anchor_cache_key(title, target_lang, tenant_id)
        expire_seconds = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600
        _anchor_redis.setex(cache_key, expire_seconds, json.dumps(result, ensure_ascii=False))
    except Exception as e:
        logger.warning(f"Failed to set anchor cache: {e}")
  151 +
  152 +
def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str:
    """Assemble the LLM prompt for one batch of product titles.

    Conventions:
    - Instructions are always written in English.
    - target_lang == "en": the model analyzes and emits an English table directly.
    - Any other language is framed as a follow-up "translation round": the model
      must produce the same table fully rendered in the target language,
      headers and cell values included, without changing structure or meaning.
    """
    display_name = SOURCE_LANG_CODE_MAP.get(target_lang, target_lang)

    segments: List[str] = []
    segments.append("""Please analyze each input product title and extract the following information:

1. Product title: a natural English product name derived from the input title
2. Category path: from broad to fine-grained category, separated by ">" (e.g. Clothing>Women>Dresses>Work Dress)
3. Fine-grained tags: style / features / attributes (e.g. floral, waist-cinching, French style)
4. Target audience: gender / age group, etc. (e.g. young women)
5. Usage scene
6. Applicable season
7. Key attributes
8. Material description
9. Functional features
10. Selling point: one concise key selling sentence for recommendation
11. Anchor text: a set of words or phrases that could be used by users as search queries for this product, covering category, fine-grained tags, functional attributes, usage scenes, etc.

Input product list:

""")

    # Numbered product listing, one title per line.
    for seq, item in enumerate(products, 1):
        segments.append(f"{seq}. {item['title']}\n")

    if target_lang == "en":
        # First (English) round: English headers + English content.
        segments.append("""
Please strictly return a Markdown table in the following format. For any column that can contain multiple values, separate values with commas. Do not add any other explanations:

| No. | Product title | Category path | Fine-grained tags | Target audience | Usage scene | Season | Key attributes | Material | Features | Selling point | Anchor text |
|----|----|----|----|----|----|----|----|----|----|----|----|
""")
    else:
        # Non-English round: framed as a translation of the English table;
        # headers and cells must all be in the target language.
        segments.append(f"""
Now we will output the same table in {display_name}.

IMPORTANT:
- Assume you have already generated the full table in English in a previous round.
- In this round, you must output exactly the same table structure and content,
  but fully translated into {display_name}, including ALL column headers and ALL cell values.
- Do NOT change the meaning, fields, or the number/order of rows and columns.
- Keep valid Markdown table syntax.

Please return ONLY the Markdown table in {display_name}, without any extra explanations.
""")

    return "".join(segments)
  211 +
  212 +
def call_llm(prompt: str, target_lang: str = "zh") -> Tuple[str, str]:
    """Call the DashScope chat-completions API with retries.

    The system prompt is now the fixed English SYSTEM_MESSAGES string;
    ``target_lang`` is kept for interface compatibility but no longer selects
    a per-language system prompt (language handling lives in the user prompt).

    Returns:
        (content, full_response_json): the assistant message text and the full
        response serialized as a JSON string.

    Raises:
        requests.exceptions.RequestException (or any other error) after
        MAX_RETRIES failed attempts.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }

    payload = {
        "model": MODEL_NAME,
        "messages": [
            {
                "role": "system",
                "content": SYSTEM_MESSAGES,
            },
            {
                "role": "user",
                "content": prompt,
            },
        ],
        "temperature": 0.3,
        "top_p": 0.8,
    }

    # Copy of the request with the Authorization header stripped, safe to log.
    request_data = {
        "headers": {k: v for k, v in headers.items() if k != "Authorization"},
        "payload": payload,
    }

    # Main log + verbose log: LLM request
    logger.info(f"\n{'=' * 80}")
    logger.info(f"LLM Request (Model: {MODEL_NAME}):")
    logger.info(json.dumps(request_data, ensure_ascii=False, indent=2))
    logger.info(f"\nPrompt:\n{prompt}")

    verbose_logger.info(f"\n{'=' * 80}")
    verbose_logger.info(f"LLM Request (Model: {MODEL_NAME}):")
    verbose_logger.info(json.dumps(request_data, ensure_ascii=False, indent=2))
    verbose_logger.info(f"\nPrompt:\n{prompt}")

    # Dedicated session with proxies disabled
    session = requests.Session()
    session.trust_env = False  # ignore system proxy settings

    try:
        # Retry loop
        for attempt in range(MAX_RETRIES):
            try:
                response = session.post(
                    f"{API_BASE_URL}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=REQUEST_TIMEOUT,
                    proxies={"http": None, "https": None},  # explicitly disable proxies
                )

                response.raise_for_status()
                result = response.json()

                # Main log + verbose log: LLM response
                logger.info(f"\nLLM Response:")
                logger.info(json.dumps(result, ensure_ascii=False, indent=2))

                verbose_logger.info(f"\nLLM Response:")
                verbose_logger.info(json.dumps(result, ensure_ascii=False, indent=2))

                content = result["choices"][0]["message"]["content"]
                logger.info(f"\nExtracted Content:\n{content}")
                verbose_logger.info(f"\nExtracted Content:\n{content}")

                return content, json.dumps(result, ensure_ascii=False)

            except requests.exceptions.ProxyError as e:
                logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Proxy error - {str(e)}")
                if attempt < MAX_RETRIES - 1:
                    logger.info(f"Retrying in {RETRY_DELAY} seconds...")
                    time.sleep(RETRY_DELAY)
                else:
                    raise

            except requests.exceptions.RequestException as e:
                logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Request error - {str(e)}")
                if attempt < MAX_RETRIES - 1:
                    logger.info(f"Retrying in {RETRY_DELAY} seconds...")
                    time.sleep(RETRY_DELAY)
                else:
                    raise

            except Exception as e:
                # Also covers malformed response bodies (missing "choices", etc.)
                logger.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES}: {str(e)}")
                if attempt < MAX_RETRIES - 1:
                    logger.info(f"Retrying in {RETRY_DELAY} seconds...")
                    time.sleep(RETRY_DELAY)
                else:
                    raise

    finally:
        session.close()
  310 +
  311 +
def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]:
    """Parse the Markdown table returned by the LLM into row dicts.

    Lines before the separator row (``|----|...``) are treated as the header
    and skipped; every subsequent ``|``-delimited line becomes one data row.

    Fix over the previous version: only the empty strings produced by the
    leading/trailing pipes are dropped. Interior empty cells are preserved,
    otherwise a single empty cell would shift every later column one position
    left and attribute values to the wrong keys.

    Args:
        markdown_content: raw model output containing a Markdown table.

    Returns:
        One dict per data row with fixed keys (seq_no, title, category_path,
        ...). Missing trailing cells default to "".
    """
    lines = markdown_content.strip().split("\n")
    data: List[Dict[str, str]] = []
    data_started = False

    for line in lines:
        line = line.strip()
        if not line:
            continue

        # Only table rows are relevant; any prose lines are ignored.
        if line.startswith("|"):
            # Separator row (---- or :---: etc.; spaces allowed, e.g. "| ---- | ---- |")
            sep_chars = line.replace("|", "").strip().replace(" ", "")
            if sep_chars and set(sep_chars) <= {"-", ":"}:
                data_started = True
                continue

            # First header row: always skipped, regardless of language.
            if not data_started:
                continue

            # Data row: strip cells, then drop ONLY the artifacts of the
            # leading/trailing "|" so interior empty cells keep their slot.
            parts = [p.strip() for p in line.split("|")]
            if parts and parts[0] == "":
                parts = parts[1:]
            if parts and parts[-1] == "":
                parts = parts[:-1]

            if len(parts) >= 2:
                row = {
                    "seq_no": parts[0],
                    "title": parts[1],  # product title (in target language)
                    "category_path": parts[2] if len(parts) > 2 else "",
                    "tags": parts[3] if len(parts) > 3 else "",
                    "target_audience": parts[4] if len(parts) > 4 else "",
                    "usage_scene": parts[5] if len(parts) > 5 else "",
                    "season": parts[6] if len(parts) > 6 else "",
                    "key_attributes": parts[7] if len(parts) > 7 else "",
                    "material": parts[8] if len(parts) > 8 else "",
                    "features": parts[9] if len(parts) > 9 else "",
                    "selling_points": parts[10] if len(parts) > 10 else "",
                    "anchor_text": parts[11] if len(parts) > 11 else "",
                }
                data.append(row)

    return data
  358 +
  359 +
def process_batch(
    batch_data: List[Dict[str, str]],
    batch_num: int,
    target_lang: str = "zh",
) -> List[Dict[str, str]]:
    """Run one batch end-to-end: prompt -> LLM -> parse -> map back to IDs.

    Parsed rows are mapped to input products by position (row i -> input i);
    the model's own sequence numbers are only logged, not trusted for mapping.
    A JSON log of the full batch (input, raw/parsed/final output) is written
    to LOG_DIR as a side effect. On any failure the whole batch degrades to
    empty rows carrying an "error" field, preserving the ID mapping.
    """
    logger.info(f"\n{'#' * 80}")
    logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)")

    # Build the prompt
    prompt = create_prompt(batch_data, target_lang=target_lang)

    # Call the LLM
    try:
        raw_response, full_response_json = call_llm(prompt, target_lang=target_lang)

        # Parse the returned Markdown table
        parsed_results = parse_markdown_table(raw_response)

        logger.info(f"\nParsed Results ({len(parsed_results)} items):")
        logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2))

        # Map rows back to the original product IDs (positional mapping)
        results_with_ids = []
        for i, parsed_item in enumerate(parsed_results):
            if i < len(batch_data):
                original_id = batch_data[i]["id"]
                result = {
                    "id": original_id,
                    "lang": target_lang,
                    "title_input": batch_data[i]["title"],  # original input title
                    "title": parsed_item.get("title", ""),  # model-generated title
                    "category_path": parsed_item.get("category_path", ""),  # category path
                    "tags": parsed_item.get("tags", ""),  # fine-grained tags
                    "target_audience": parsed_item.get("target_audience", ""),  # target audience
                    "usage_scene": parsed_item.get("usage_scene", ""),  # usage scene
                    "season": parsed_item.get("season", ""),  # applicable season
                    "key_attributes": parsed_item.get("key_attributes", ""),  # key attributes
                    "material": parsed_item.get("material", ""),  # material description
                    "features": parsed_item.get("features", ""),  # functional features
                    "selling_points": parsed_item.get("selling_points", ""),  # selling points
                    "anchor_text": parsed_item.get("anchor_text", ""),  # anchor text
                }
                results_with_ids.append(result)
                logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}")

        # Persist the batch JSON log to its own file
        batch_log = {
            "batch_num": batch_num,
            "timestamp": datetime.now().isoformat(),
            "input_products": batch_data,
            "raw_response": raw_response,
            "full_response_json": full_response_json,
            "parsed_results": parsed_results,
            "final_results": results_with_ids,
        }

        batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}.json"
        with open(batch_log_file, "w", encoding="utf-8") as f:
            json.dump(batch_log, f, ensure_ascii=False, indent=2)

        logger.info(f"Batch log saved to: {batch_log_file}")

        return results_with_ids

    except Exception as e:
        logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True)
        # Degrade to empty rows, keeping the ID mapping intact
        return [
            {
                "id": item["id"],
                "lang": target_lang,
                "title_input": item["title"],
                "title": "",
                "category_path": "",
                "tags": "",
                "target_audience": "",
                "usage_scene": "",
                "season": "",
                "key_attributes": "",
                "material": "",
                "features": "",
                "selling_points": "",
                "anchor_text": "",
                "error": str(e),
            }
            for item in batch_data
        ]
  448 +
  449 +
def analyze_products(
    products: List[Dict[str, str]],
    target_lang: str = "zh",
    batch_size: Optional[int] = None,
    tenant_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Library entry point: given products and a target language, return anchor
    text and per-dimension enrichment info.

    Args:
        products: [{"id": "...", "title": "..."}]
        target_lang: output language code
        batch_size: batch size; defaults to (and is capped at) module BATCH_SIZE
        tenant_id: optional tenant scope for the Redis cache

    Returns:
        One result dict per input product, in input order. Failed batches
        yield rows with empty fields and an "error" key.

    Raises:
        RuntimeError: if DASHSCOPE_API_KEY is not configured (only when there
        is actual work to do; empty input returns [] without needing a key).
    """
    if not products:
        return []

    if not API_KEY:
        raise RuntimeError("DASHSCOPE_API_KEY is not set, cannot call LLM")

    # Fast path: the indexer usually calls with a single product, so try a
    # single-item cache hit before spending an LLM call.
    if len(products) == 1:
        p = products[0]
        title = str(p.get("title") or "").strip()
        if title:
            cached = _get_cached_anchor_result(title, target_lang, tenant_id=tenant_id)
            if cached:
                logger.info(
                    f"[analyze_products] Cache hit for title='{title[:50]}...', "
                    f"lang={target_lang}, tenant_id={tenant_id or 'global'}"
                )
                # The cache is keyed by title only, so the stored row may carry
                # the id of a *different* product that shares the same title.
                # Rebind the identity fields to the current input before returning.
                result = dict(cached)
                result["id"] = p.get("id")
                result["title_input"] = p.get("title")
                return [result]

    # call_llm handles at most BATCH_SIZE items per request (default 20):
    # - accumulate into batches where possible;
    # - caller-supplied sizes larger than the cap are clamped and split.
    req_bs = BATCH_SIZE if batch_size is None else int(batch_size)
    bs = max(1, min(req_bs, BATCH_SIZE))
    all_results: List[Dict[str, Any]] = []
    total_batches = (len(products) + bs - 1) // bs

    for i in range(0, len(products), bs):
        batch_num = i // bs + 1
        batch = products[i : i + bs]
        logger.info(
            f"[analyze_products] Processing batch {batch_num}/{total_batches}, "
            f"size={len(batch)}, target_lang={target_lang}"
        )
        batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang)
        all_results.extend(batch_results)

        # Populate the cache with successful rows only.
        for item in batch_results:
            title_input = str(item.get("title_input") or "").strip()
            if not title_input:
                continue
            if item.get("error"):
                # Never cache error rows; that would amplify transient failures.
                continue
            try:
                _set_cached_anchor_result(title_input, target_lang, item, tenant_id=tenant_id)
            except Exception:
                # Already logged as a warning inside the helper.
                pass

    return all_results
  516 +
... ...
providers/translation.py
... ... @@ -2,7 +2,7 @@
2 2 from __future__ import annotations
3 3  
4 4 import logging
5   -from typing import Any, Dict, Optional
  5 +from typing import Any, Dict, List, Optional, Sequence, Union
6 6 import requests
7 7  
8 8 from config.services_config import get_translation_config, get_translation_base_url
... ... @@ -23,16 +23,49 @@ class HttpTranslationProvider:
23 23 self.model = model or "qwen"
24 24 self.timeout_sec = float(timeout_sec or 10.0)
25 25  
  26 + @property
  27 + def supports_batch(self) -> bool:
  28 + """
  29 + Whether this provider supports list input natively.
  30 +
  31 + 当前实现中,我们已经在 `_translate_once` 内处理了 list,
  32 + 所以可以直接视为支持 batch。
  33 + """
  34 + return True
  35 +
26 36 def _translate_once(
27 37 self,
28   - text: str,
  38 + text: Union[str, Sequence[str]],
29 39 target_lang: str,
30 40 source_lang: Optional[str] = None,
31 41 context: Optional[str] = None,
32 42 prompt: Optional[str] = None,
33   - ) -> Optional[str]:
  43 + ) -> Union[Optional[str], List[Optional[str]]]:
  44 + # 允许 text 为单个字符串或字符串列表
  45 + if isinstance(text, (list, tuple)):
  46 + # 上游约定:列表输入时,输出列表一一对应;失败位置为 None
  47 + results: List[Optional[str]] = []
  48 + for item in text:
  49 + if item is None or not str(item).strip():
  50 + # 空字符串/None 不视为失败,原样返回以保持语义
  51 + results.append(item) # type: ignore[arg-type]
  52 + continue
  53 + try:
  54 + single = self._translate_once(
  55 + text=str(item),
  56 + target_lang=target_lang,
  57 + source_lang=source_lang,
  58 + context=context,
  59 + prompt=prompt,
  60 + )
  61 + results.append(single) # type: ignore[arg-type]
  62 + except Exception:
  63 + # 理论上不会进入,因为内部已捕获;兜底保持长度一致
  64 + results.append(None)
  65 + return results
  66 +
34 67 if not text or not str(text).strip():
35   - return text
  68 + return text # type: ignore[return-value]
36 69 try:
37 70 url = f"{self.base_url}/translate"
38 71 payload = {
... ... @@ -62,12 +95,12 @@ class HttpTranslationProvider:
62 95  
63 96 def translate(
64 97 self,
65   - text: str,
  98 + text: Union[str, Sequence[str]],
66 99 target_lang: str,
67 100 source_lang: Optional[str] = None,
68 101 context: Optional[str] = None,
69 102 prompt: Optional[str] = None,
70   - ) -> Optional[str]:
  103 + ) -> Union[Optional[str], List[Optional[str]]]:
71 104 return self._translate_once(
72 105 text=text,
73 106 target_lang=target_lang,
... ...
query/deepl_provider.py
... ... @@ -10,7 +10,7 @@ from __future__ import annotations
10 10 import logging
11 11 import os
12 12 import re
13   -from typing import Dict, Optional, Tuple
  13 +from typing import Dict, List, Optional, Sequence, Tuple, Union
14 14  
15 15 import requests
16 16 from config.services_config import get_translation_config
... ... @@ -88,6 +88,14 @@ class DeepLProvider:
88 88 if not self.api_key:
89 89 logger.warning("DEEPL_AUTH_KEY not set; DeepL translation is unavailable")
90 90  
  91 + @property
  92 + def supports_batch(self) -> bool:
  93 + """
  94 + DeepL HTTP API 本身支持一次传多条 text;这里先返回 False,由上层逐条拆分
  95 + (translate 仍兜底接受 list 输入并逐条调用)。后续若要真正批量,可调整实现。
  96 + """
  97 + return False
  98 +
91 99 def _resolve_request_context(
92 100 self,
93 101 target_lang: str,
... ... @@ -108,12 +116,28 @@ class DeepLProvider:
108 116  
109 117 def translate(
110 118 self,
111   - text: str,
  119 + text: Union[str, Sequence[str]],
112 120 target_lang: str,
113 121 source_lang: Optional[str] = None,
114 122 context: Optional[str] = None,
115 123 prompt: Optional[str] = None,
116   - ) -> Optional[str]:
  124 + ) -> Union[Optional[str], List[Optional[str]]]:
  125 + if isinstance(text, (list, tuple)):
  126 + results: List[Optional[str]] = []
  127 + for item in text:
  128 + if item is None or not str(item).strip():
  129 + results.append(item) # type: ignore[arg-type]
  130 + continue
  131 + out = self.translate(
  132 + text=str(item),
  133 + target_lang=target_lang,
  134 + source_lang=source_lang,
  135 + context=context,
  136 + prompt=prompt,
  137 + )
  138 + results.append(out)
  139 + return results
  140 +
117 141 if not self.api_key:
118 142 return None
119 143  
... ...
query/llm_translate.py
... ... @@ -11,13 +11,14 @@ from __future__ import annotations
11 11 import logging
12 12 import os
13 13 import time
14   -from typing import Optional
  14 +from typing import List, Optional, Sequence, Union
15 15  
16 16 from openai import OpenAI
17 17  
18 18 from config.env_config import DASHSCOPE_API_KEY
19 19 from config.services_config import get_translation_config
20   -from config.translate_prompts import TRANSLATION_PROMPTS, SOURCE_LANG_CODE_MAP
  20 +from config.translate_prompts import TRANSLATION_PROMPTS
  21 +from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP
21 22  
22 23  
23 24 logger = logging.getLogger(__name__)
... ... @@ -96,6 +97,12 @@ class LLMTranslatorProvider:
96 97 )
97 98 self.client = self._create_client()
98 99  
  100 + @property
  101 + def supports_batch(self) -> bool:
  102 + """Whether this provider efficiently supports list input."""
  103 + # 我们在 translate 中已经原生支持 list,所以这里返回 True
  104 + return True
  105 +
99 106 def _create_client(self) -> Optional[OpenAI]:
100 107 api_key = DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY")
101 108 if not api_key:
... ... @@ -107,7 +114,7 @@ class LLMTranslatorProvider:
107 114 logger.error("Failed to initialize llm translation client: %s", exc, exc_info=True)
108 115 return None
109 116  
110   - def translate(
  117 + def _translate_single(
111 118 self,
112 119 text: str,
113 120 target_lang: str,
... ... @@ -148,7 +155,14 @@ class LLMTranslatorProvider:
148 155 if not content:
149 156 logger.warning("[llm] Empty result | src=%s tgt=%s latency=%.1fms", src, tgt, latency_ms)
150 157 return None
151   - logger.info("[llm] Success | src=%s tgt=%s src_text=%s response=%s latency=%.1fms", src, tgt, text, content, latency_ms)
  158 + logger.info(
  159 + "[llm] Success | src=%s tgt=%s src_text=%s response=%s latency=%.1fms",
  160 + src,
  161 + tgt,
  162 + text,
  163 + content,
  164 + latency_ms,
  165 + )
152 166 return content
153 167 except Exception as exc:
154 168 latency_ms = (time.time() - start) * 1000
... ... @@ -162,16 +176,56 @@ class LLMTranslatorProvider:
162 176 )
163 177 return None
164 178  
  179 + def translate(
  180 + self,
  181 + text: Union[str, Sequence[str]],
  182 + target_lang: str,
  183 + source_lang: Optional[str] = None,
  184 + context: Optional[str] = None,
  185 + prompt: Optional[str] = None,
  186 + ) -> Union[Optional[str], List[Optional[str]]]:
  187 + """
  188 + Translate a single string or a list of strings.
  189 +
  190 + - If input is a list, returns a list of the same length.
  191 + - Per-item failures are returned as None.
  192 + """
  193 + if isinstance(text, (list, tuple)):
  194 + results: List[Optional[str]] = []
  195 + for item in text:
  196 + # 保证一一对应,即使某个元素为空也占位
  197 + if item is None:
  198 + results.append(None)
  199 + continue
  200 + results.append(
  201 + self._translate_single(
  202 + text=str(item),
  203 + target_lang=target_lang,
  204 + source_lang=source_lang,
  205 + context=context,
  206 + prompt=prompt,
  207 + )
  208 + )
  209 + return results
  210 +
  211 + return self._translate_single(
  212 + text=str(text),
  213 + target_lang=target_lang,
  214 + source_lang=source_lang,
  215 + context=context,
  216 + prompt=prompt,
  217 + )
  218 +
165 219  
166 220 def llm_translate(
167   - text: str,
  221 + text: Union[str, Sequence[str]],
168 222 target_lang: str,
169 223 *,
170 224 source_lang: Optional[str] = None,
171 225 source_lang_label: Optional[str] = None,
172 226 target_lang_label: Optional[str] = None,
173 227 timeout_sec: Optional[float] = None,
174   -) -> Optional[str]:
  228 +) -> Union[Optional[str], List[Optional[str]]]:
175 229 provider = LLMTranslatorProvider(timeout_sec=timeout_sec or 30.0)
176 230 return provider.translate(
177 231 text=text,
... ...
query/qwen_mt_translate.py
... ... @@ -7,14 +7,14 @@ import logging
7 7 import os
8 8 import re
9 9 import time
10   -from typing import Dict, List, Optional
  10 +from typing import Dict, List, Optional, Sequence, Union
11 11  
12 12 import redis
13 13 from openai import OpenAI
14 14  
15 15 from config.env_config import DASHSCOPE_API_KEY, REDIS_CONFIG
16 16 from config.services_config import get_translation_cache_config
17   -from config.translate_prompts import SOURCE_LANG_CODE_MAP
  17 +from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP
18 18  
19 19 logger = logging.getLogger(__name__)
20 20  
... ... @@ -62,6 +62,16 @@ class Translator:
62 62 if self.use_cache and bool(cache_cfg.get("enabled", True)):
63 63 self.redis_client = self._init_redis_client()
64 64  
  65 + @property
  66 + def supports_batch(self) -> bool:
  67 + """
  68 + 标记该 provider 已支持列表输入。
  69 +
  70 + 当前实现为循环单条调用(带缓存),不是真正的并行批量请求,
  71 + 但对上层来说可以直接传 list,返回 list。
  72 + """
  73 + return True
  74 +
65 75 @staticmethod
66 76 def _normalize_model(model: str) -> str:
67 77 m = (model or "qwen").strip().lower()
... ... @@ -117,14 +127,31 @@ class Translator:
117 127  
118 128 def translate(
119 129 self,
120   - text: str,
  130 + text: Union[str, Sequence[str]],
121 131 target_lang: str,
122 132 source_lang: Optional[str] = None,
123 133 context: Optional[str] = None,
124 134 prompt: Optional[str] = None,
125   - ) -> Optional[str]:
126   - if not text or not text.strip():
127   - return text
  135 + ) -> Union[Optional[str], List[Optional[str]]]:
  136 + if isinstance(text, (list, tuple)):
  137 + results: List[Optional[str]] = []
  138 + for item in text:
  139 + if item is None or not str(item).strip():
  140 + results.append(item) # type: ignore[arg-type]
  141 + continue
  142 + # 对于 batch,这里沿用单条的缓存与规则,逐条调用
  143 + out = self.translate(
  144 + text=str(item),
  145 + target_lang=target_lang,
  146 + source_lang=source_lang,
  147 + context=context,
  148 + prompt=prompt,
  149 + )
  150 + results.append(out)
  151 + return results
  152 +
  153 + if not text or not str(text).strip():
  154 + return text # type: ignore[return-value]
128 155  
129 156 tgt = (target_lang or "").strip().lower()
130 157 src = (source_lang or "").strip().lower() or None
... ...
tests/ci/test_service_api_contracts.py
... ... @@ -342,7 +342,7 @@ def test_indexer_build_docs_from_db_contract(indexer_client: TestClient):
342 342  
343 343  
344 344 def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch):
345   - import indexer.product_annotator as process_products
  345 + import indexer.product_enrich as process_products
346 346  
347 347 def _fake_analyze_products(
348 348 products: List[Dict[str, str]],
... ...
tests/test_process_products_batching.py
... ... @@ -2,7 +2,7 @@ from __future__ import annotations
2 2  
3 3 from typing import Any, Dict, List
4 4  
5   -import indexer.product_annotator as process_products
  5 +import indexer.product_enrich as process_products
6 6  
7 7  
8 8 def _mk_products(n: int) -> List[Dict[str, str]]:
... ...