From 6f7840cfeb5d4f57eb81674aff77a1a0d60094ab Mon Sep 17 00:00:00 2001 From: tangwang Date: Tue, 17 Mar 2026 10:50:52 +0800 Subject: [PATCH] refactor: rename product annotator to enrich and expand multilingual prompts --- api/routes/indexer.py | 21 +++++---------------- api/translator_app.py | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------- config/config.yaml | 2 +- config/tenant_config_loader.py | 12 ++++++++---- config/translate_prompts.py | 42 +----------------------------------------- docs/DEVELOPER_GUIDE.md | 7 ++++++- docs/工作总结-微服务性能优化与架构.md | 4 ++-- docs/搜索API对接指南.md | 6 +++++- docs/翻译模块说明.md | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ indexer/document_transformer.py | 15 ++++++--------- indexer/product_annotator.py | 685 ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- indexer/product_enrich.py | 516 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ providers/translation.py | 45 +++++++++++++++++++++++++++++++++++++++------ query/deepl_provider.py | 30 +++++++++++++++++++++++++++--- query/llm_translate.py | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------ query/qwen_mt_translate.py | 39 +++++++++++++++++++++++++++++++++------ tests/ci/test_service_api_contracts.py | 2 +- tests/test_process_products_batching.py | 2 +- 18 files changed, 889 insertions(+), 798 deletions(-) delete mode 100644 indexer/product_annotator.py create mode 100644 indexer/product_enrich.py diff --git a/api/routes/indexer.py b/api/routes/indexer.py index 57bef0c..d50f578 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -443,23 +443,12 @@ async def build_docs_from_db(request: BuildDocsFromDbRequest): def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]: """ - 同步执行内容理解:调用 product_annotator.analyze_products,按语言批量跑 LLM, + 同步执行内容理解:调用 product_enrich.analyze_products,按语言批量跑 LLM, 再聚合成每 SPU 的 qanchors、semantic_attributes、tags。供 run_in_executor 调用。 """ - from indexer.product_annotator import analyze_products, SUPPORTED_LANGS - - llm_langs = [lang for lang in languages if lang in SUPPORTED_LANGS] - if not llm_langs: - return [ - { - "spu_id": it["spu_id"], - "qanchors": {}, - "semantic_attributes": [], - "tags": [], - "error": "no supported languages (supported: %s)" % sorted(SUPPORTED_LANGS), - } - for it in items - ] + from 
indexer.product_enrich import analyze_products + + llm_langs = list(dict.fromkeys(languages)) or ["en"] products = [{"id": it["spu_id"], "title": (it.get("title") or "").strip()} for it in items] dim_keys = [ @@ -544,7 +533,7 @@ async def enrich_content(request: EnrichContentRequest): - 与 /indexer/build-docs 解耦,避免 build-docs 因 LLM 耗时过长而阻塞;调用方可 先拿不含 qanchors/tags 的 doc,再异步或离线补齐本接口结果后更新 ES。 - 实现逻辑与 indexer.product_annotator.analyze_products 一致,支持多语言与 Redis 缓存。 + 实现逻辑与 indexer.product_enrich.analyze_products 一致,支持多语言与 Redis 缓存。 """ try: if not request.items: diff --git a/api/translator_app.py b/api/translator_app.py index 096c3c2..57a5f97 100644 --- a/api/translator_app.py +++ b/api/translator_app.py @@ -88,7 +88,7 @@ import sys import logging import argparse import uvicorn -from typing import Optional, Dict +from typing import Dict, List, Optional, Sequence, Union from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware @@ -162,7 +162,7 @@ def get_translator(model: str = "qwen") -> object: # Request/Response models class TranslationRequest(BaseModel): """Translation request model.""" - text: str = Field(..., description="Text to translate") + text: Union[str, List[str]] = Field(..., description="Text to translate (string or list of strings)") target_lang: str = Field(..., description="Target language code (zh, en, ru, etc.)") source_lang: Optional[str] = Field(None, description="Source language code (optional, auto-detect if not provided)") model: Optional[str] = Field(None, description="Translation model: qwen-mt | deepl | llm") @@ -183,10 +183,13 @@ class TranslationRequest(BaseModel): class TranslationResponse(BaseModel): """Translation response model.""" - text: str = Field(..., description="Original text") + text: Union[str, List[str]] = Field(..., description="Original text (string or list)") target_lang: str = Field(..., description="Target language code") source_lang: 
Optional[str] = Field(None, description="Source language code (detected or provided)") - translated_text: str = Field(..., description="Translated text") + translated_text: Union[str, List[Optional[str]]] = Field( + ..., + description="Translated text (string or list; list elements may be null on failure)", + ) status: str = Field(..., description="Translation status") model: str = Field(..., description="Translation model used") @@ -260,11 +263,19 @@ async def translate(request: TranslationRequest): Supports both Qwen (default) and DeepL models via the 'model' parameter. """ - if not request.text or not request.text.strip(): - raise HTTPException( - status_code=400, - detail="Text cannot be empty" - ) + # 允许 text 为字符串或字符串列表 + if isinstance(request.text, list): + if not request.text: + raise HTTPException( + status_code=400, + detail="Text list cannot be empty" + ) + else: + if not request.text or not request.text.strip(): + raise HTTPException( + status_code=400, + detail="Text cannot be empty" + ) if not request.target_lang: raise HTTPException( @@ -283,24 +294,96 @@ async def translate(request: TranslationRequest): try: # Get translator instance for the specified model translator = get_translator(model=model) - - # Translate using the fixed prompt + raw_text = request.text + + # 如果是列表,并且底层 provider 声明支持 batch,则直接传 list + if isinstance(raw_text, list) and getattr(translator, "supports_batch", False): + try: + translated_list = translator.translate( + text=raw_text, + target_lang=request.target_lang, + source_lang=request.source_lang, + context=request.context, + prompt=request.prompt, + ) + except Exception as exc: + logger.error("Batch translation failed: %s", exc, exc_info=True) + # 回退到逐条拆分逻辑 + translated_list = None + + if translated_list is not None: + # 规范化为 List[Optional[str]],并保证长度对应 + if not isinstance(translated_list, list): + raise HTTPException( + status_code=500, + detail="Batch translation provider returned non-list result", + ) + normalized: 
List[Optional[str]] = [] + for idx, item in enumerate(raw_text): + if idx < len(translated_list): + val = translated_list[idx] + else: + val = None + # 失败语义:失败位置为 None + normalized.append(val) + + return TranslationResponse( + text=raw_text, + target_lang=request.target_lang, + source_lang=request.source_lang, + translated_text=normalized, + status="success", + model=str(getattr(translator, "model", model)), + ) + + # 否则:统一走逐条拆分逻辑(包括不支持 batch 的 provider) + if isinstance(raw_text, list): + results: List[Optional[str]] = [] + for item in raw_text: + if item is None or not str(item).strip(): + # 空元素不视为失败,直接返回原值 + results.append(item) # type: ignore[arg-type] + continue + try: + out = translator.translate( + text=str(item), + target_lang=request.target_lang, + source_lang=request.source_lang, + context=request.context, + prompt=request.prompt, + ) + except Exception as exc: + logger.warning("Per-item translation failed: %s", exc, exc_info=True) + out = None + # 失败语义:该元素为 None + results.append(out) + + return TranslationResponse( + text=raw_text, + target_lang=request.target_lang, + source_lang=request.source_lang, + translated_text=results, + status="success", + model=str(getattr(translator, "model", model)), + ) + + # 单文本模式:保持原有严格失败语义 translated_text = translator.translate( - text=request.text, + text=raw_text, target_lang=request.target_lang, source_lang=request.source_lang, context=request.context, prompt=request.prompt, ) - + if translated_text is None: raise HTTPException( status_code=500, detail="Translation failed" ) - + return TranslationResponse( - text=request.text, + text=raw_text, target_lang=request.target_lang, source_lang=request.source_lang, translated_text=translated_text, diff --git a/config/config.yaml b/config/config.yaml index b18090e..ffed5e8 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -224,7 +224,7 @@ spu_config: # 租户配置(Tenant Configuration) # 每个租户可配置主语言 primary_language 与索引语言 index_languages(主市场语言,商家可勾选) -# 默认 index_languages: 
[en, zh],可配置为任意 SUPPORTED_INDEX_LANGUAGES 的子集 +# 默认 index_languages: [en, zh],可配置为任意 SOURCE_LANG_CODE_MAP.keys() 的子集 tenant_config: default: primary_language: "en" diff --git a/config/tenant_config_loader.py b/config/tenant_config_loader.py index fd1867d..4450963 100644 --- a/config/tenant_config_loader.py +++ b/config/tenant_config_loader.py @@ -11,7 +11,8 @@ from typing import Dict, Any, Optional, List logger = logging.getLogger(__name__) # 支持的索引语言:code -> display name(供商家勾选主市场语言等场景使用) -SUPPORTED_INDEX_LANGUAGES: Dict[str, str] = { +# 语言代码与展示名的双向映射(供翻译/LLM 提示等统一使用) +SOURCE_LANG_CODE_MAP: Dict[str, str] = { "en": "English", "zh": "Chinese", "zh_tw": "Traditional Chinese", @@ -51,6 +52,9 @@ SUPPORTED_INDEX_LANGUAGES: Dict[str, str] = { "bg": "Bulgarian", } +TARGET_LANG_CODE_MAP: Dict[str, str] = {v: k for k, v in SOURCE_LANG_CODE_MAP.items()} + + def normalize_index_languages(value: Any, primary_language: str = "en") -> List[str]: """ 将 index_languages 配置规范化为合法语言代码列表。 @@ -67,7 +71,7 @@ def normalize_index_languages(value: Any, primary_language: str = "en") -> List[ code = (item or "").strip().lower() if not code or code in seen: continue - if code in SUPPORTED_INDEX_LANGUAGES: + if code in SOURCE_LANG_CODE_MAP: valid.append(code) seen.add(code) return valid @@ -91,11 +95,11 @@ def resolve_index_languages( to_en = bool(tenant_config.get("translate_to_en")) to_zh = bool(tenant_config.get("translate_to_zh")) langs: List[str] = [] - if primary and primary in SUPPORTED_INDEX_LANGUAGES: + if primary and primary in SOURCE_LANG_CODE_MAP: langs.append(primary) for code in ("en", "zh"): if code not in langs and ((code == "en" and to_en) or (code == "zh" and to_zh)): - if code in SUPPORTED_INDEX_LANGUAGES: + if code in SOURCE_LANG_CODE_MAP: langs.append(code) return langs if langs else list(default_index_languages) diff --git a/config/translate_prompts.py b/config/translate_prompts.py index d1e8f92..2637f8e 100644 --- a/config/translate_prompts.py +++ 
b/config/translate_prompts.py @@ -1,44 +1,4 @@ -SOURCE_LANG_CODE_MAP = { - "en": "English", - "zh": "Chinese", - "zh_tw": "Traditional Chinese", - "ru": "Russian", - "ja": "Japanese", - "ko": "Korean", - "es": "Spanish", - "fr": "French", - "pt": "Portuguese", - "de": "German", - "it": "Italian", - "th": "Thai", - "vi": "Vietnamese", - "id": "Indonesian", - "ms": "Malay", - "ar": "Arabic", - "hi": "Hindi", - "he": "Hebrew", - "my": "Burmese", - "ta": "Tamil", - "ur": "Urdu", - "bn": "Bengali", - "pl": "Polish", - "nl": "Dutch", - "ro": "Romanian", - "tr": "Turkish", - "km": "Khmer", - "lo": "Lao", - "yue": "Cantonese", - "cs": "Czech", - "el": "Greek", - "sv": "Swedish", - "hu": "Hungarian", - "da": "Danish", - "fi": "Finnish", - "uk": "Ukrainian", - "bg": "Bulgarian", -} - -TARGET_LANG_CODE_MAP = {v: k for k, v in SOURCE_LANG_CODE_MAP.items()} +from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP TRANSLATION_PROMPTS = { "general": { diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index 13f5f6b..0ce4514 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -92,7 +92,7 @@ MySQL (店匠 SPU/SKU) | indexer | 6004 | 索引 API(reindex/build-docs 等) | ✓ | | frontend | 6003 | 调试 UI | ✓ | | embedding | 6005 | 向量服务(文本/图片) | 可选 | -| translator | 6006 | 翻译服务 | 可选 | +| translator | 6006 | 翻译服务(`POST /translate` 支持单条或批量 list;批量失败用 `null` 占位) | 可选 | | reranker | 6007 | 重排服务 | 可选 | - 启动:`./run.sh` 仅启动 backend / indexer / frontend;需全功能时通过环境变量或脚本另行启动 embedding / translator / reranker。 @@ -170,6 +170,11 @@ docs/ # 文档(含本指南) - **原则**:业务代码只依赖 Provider 接口,不依赖具体 URL 或后端类型;新增调用方式(如新 Provider 类型)在对应 `providers/.py` 中实现并在工厂中注册。 - **详见**:本指南 §7.2;[QUICKSTART.md](./QUICKSTART.md) §3。 +补充约定(翻译 provider): + +- `translate(text=...)` 支持 `str` 与 `List[str]` 两种输入;当输入为列表时,输出必须与输入 **等长且顺序对应**,失败位置为 `None`(HTTP JSON 表现为 `null`)。 +- provider 可暴露 `supports_batch: bool`(property)用于标识其是否支持直接批量调用;上层在处理 `text` 为列表时可优先走 batch,否则逐条拆分调用。 + ### 4.9 
suggestion - **职责**:建议索引的构建与检索:从 ES 商品索引与 MySQL 日志等构建 suggestion 索引;搜索 API 的 `/search/suggestions` 使用本模块。 diff --git a/docs/工作总结-微服务性能优化与架构.md b/docs/工作总结-微服务性能优化与架构.md index 503cbf7..2659c33 100644 --- a/docs/工作总结-微服务性能优化与架构.md +++ b/docs/工作总结-微服务性能优化与架构.md @@ -101,9 +101,9 @@ instruction: "Given a shopping query, rank product titles by relevance" **具体内容**: - **接口**:`POST /indexer/enrich-content`(Indexer 服务端口 **6004**)。请求体为 `items` 数组,每项含 `spu_id`、`title`(必填)及可选多语言标题等;单次请求最多 **50 条**,建议批量调用。响应 `results` 与 `items` 一一对应,每项含 `spu_id`、`qanchors`(按语言键,如 `qanchors.zh`、`qanchors.en`,逗号分隔短语)、`semantic_attributes`、`tags`。 -- **索引侧**:微服务组合方式下,调用方先拿不含 qanchors/tags 的 doc,再调用本接口补齐后写入 ES 的 `qanchors.{lang}` 等字段;索引 transformer(`indexer/document_transformer.py`、`indexer/product_annotator.py`)内也可在构建 doc 时调用内容理解逻辑,写入 `qanchors.{lang}`。 +- **索引侧**:微服务组合方式下,调用方先拿不含 qanchors/tags 的 doc,再调用本接口补齐后写入 ES 的 `qanchors.{lang}` 等字段;索引 transformer(`indexer/document_transformer.py`、`indexer/product_enrich.py`)内也可在构建 doc 时调用内容理解逻辑,写入 `qanchors.{lang}`。 - **Suggest 侧**:`suggestion/builder.py` 从 ES 商品索引读取 `_source: ["id", "spu_id", "title", "qanchors"]`,对 `qanchors.{lang}` 用 `_split_qanchors` 拆成词条,以 `source="qanchor"` 加入候选,排序时 `qanchor` 权重大于纯 title(`add_product("qanchor", ...)`);suggest 配置中 `sources: ["query_log", "qanchor"]` 表示候选来源包含 qanchor。 -- **实现与依赖**:内容理解内部使用大模型(需 `DASHSCOPE_API_KEY`),支持多语言与 Redis 缓存(如 `product_anchors`);逻辑与 `indexer/product_annotator` 一致。 +- **实现与依赖**:内容理解内部使用大模型(需 `DASHSCOPE_API_KEY`),支持多语言与 Redis 缓存(如 `product_anchors`);逻辑与 `indexer/product_enrich` 一致。 **状态**:内容理解字段已接入索引与 suggest 链路;依赖内容理解(qanchors/tags)的**全量数据尚未全部完成一轮**,后续需持续跑满并校验效果。 diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 3463299..5621b4f 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -1484,7 +1484,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ ### 5.8 内容理解字段生成接口 - **端点**: `POST /indexer/enrich-content` -- **描述**: 根据商品内容信息批量生成
**qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.product_annotator` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。 +- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.product_enrich` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。 #### 请求参数 @@ -1862,6 +1862,10 @@ curl "http://localhost:6007/health" } ``` +> **失败语义(批量)**:当 `text` 为列表时,如果其中某条翻译失败,对应位置返回 `null`(即 `translated_text[i] = null`),并保持数组长度与顺序不变;接口整体仍返回 `status="success"`,用于避免“部分失败”导致整批请求失败。 + +> **实现提示(可忽略)**:服务端会尽可能使用底层翻译 provider 的批量能力(若支持),否则自动拆分逐条翻译;无论采用哪种方式,上述批量契约保持一致。 + **完整 curl 示例**: 中文 → 英文: diff --git a/docs/翻译模块说明.md b/docs/翻译模块说明.md index 91bd643..5be5c3e 100644 --- a/docs/翻译模块说明.md +++ b/docs/翻译模块说明.md @@ -24,3 +24,83 @@ TRANSLATION_MODEL=qwen # 或 deepl ## Provider 配置 Provider 与 URL 在 `config/config.yaml` 的 `services.translation`。详见 [QUICKSTART.md](./QUICKSTART.md) §3 与 [DEVELOPER_GUIDE.md](./DEVELOPER_GUIDE.md) §7.2。 + +## HTTP 接口契约(translator service,端口 6006) + +服务默认监听 `http://localhost:6006`,提供: + +- `POST /translate`: 文本翻译(支持 `qwen/qwen-mt`、`deepl`、`llm`) +- `GET /health`: 健康检查 + +### `POST /translate` + +**请求体**: + +```json +{ + "text": "商品名称", + "target_lang": "en", + "source_lang": "zh", + "model": "qwen", + "context": "sku_name", + "prompt": null +} +``` + +- `text` 支持两种形式: + - 单条:`string` + - 批量:`string[]`(等长返回,顺序对应) + +**响应体**(单条): + +```json +{ + "text": "商品名称", + "target_lang": "en", + "source_lang": "zh", + "translated_text": "Product name", + "status": "success", + "model": "qwen" +} +``` + +**响应体**(批量): + +```json +{ + "text": ["商品名称1", "商品名称2"], + "target_lang": "en", + "source_lang": "zh", + "translated_text": ["Product name 1", null], + "status": "success", + "model": "qwen" +} +``` + +批量模式下,**单条失败用 `null` 占位**(即 `translated_text[i] = null`),保证长度与顺序一一对应,避免部分失败导致整批报错。 + +--- + 
+## 开发者接口约定(Provider / 代码调用) + +除 HTTP 微服务外,代码侧(如 query/indexer)通常通过 `providers.translation.create_translation_provider()` 获取翻译 provider 实例并调用 `translate()`。 + +### 输入输出形状(Shape) + +- `translate(text=...)` 支持: + - **单条**:`text: str` → 返回 `Optional[str]` + - **批量**:`text: List[str]` → 返回 `List[Optional[str]]` +- **批量语义**:返回列表必须与输入 **等长且顺序对应**;某条翻译失败时,对应位置为 `None`(HTTP JSON 中表现为 `null`)。 + +### 批量能力标识(supports_batch) + +不同 provider 对批量的实现方式可能不同(例如:真正一次请求传多条,或内部循环逐条翻译并保持 shape)。 + +为便于上层(如 `api/translator_app.py`)做最优调用,provider 可暴露: + +- `supports_batch: bool`(property) + +上层在收到 `text` 为列表时: + +- **若 `supports_batch=True`**:可以直接将列表传给 `translate(text=[...])` +- **若 `supports_batch=False`**:上层会逐条拆分调用(仍保证输出列表一一对应、失败为 `null`) diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index a7d7e7b..993fc96 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -14,7 +14,7 @@ import logging import re from typing import Dict, Any, Optional, List from config import ConfigLoader -from indexer.product_annotator import analyze_products, SUPPORTED_LANGS +from indexer.product_enrich import analyze_products logger = logging.getLogger(__name__) @@ -225,9 +225,8 @@ class SPUDocumentTransformer: index_langs = self.tenant_config.get("index_languages") or ["en", "zh"] except Exception: index_langs = ["en", "zh"] - llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS] - if not llm_langs: - return + # 不再限制为固定 SUPPORTED_LANGS,直接按照租户配置的 index_languages 调用 + llm_langs = list(dict.fromkeys(index_langs)) # 去重并保持顺序 # 只对有 title 的 SPU 参与 LLM;其余跳过 id_to_idx: Dict[str, int] = {} @@ -651,7 +650,7 @@ class SPUDocumentTransformer: def _fill_llm_attributes(self, doc: Dict[str, Any], spu_row: pd.Series) -> None: """ - 调用 indexer.product_annotator.analyze_products,为当前 SPU 填充: + 调用 indexer.product_enrich.analyze_products,为当前 SPU 填充: - qanchors.{lang} - semantic_attributes (lang/name/value) """ @@ -660,10 +659,8 @@ class 
SPUDocumentTransformer: except Exception: index_langs = ["en", "zh"] - # 只在支持的语言集合内调用 - llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS] - if not llm_langs: - return + # 不再限制为固定 SUPPORTED_LANGS,直接按照租户配置的 index_languages 调用 + llm_langs = list(dict.fromkeys(index_langs)) # 去重并保持顺序 spu_id = str(spu_row.get("id") or "").strip() title = str(spu_row.get("title") or "").strip() diff --git a/indexer/product_annotator.py b/indexer/product_annotator.py deleted file mode 100644 index 6660820..0000000 --- a/indexer/product_annotator.py +++ /dev/null @@ -1,685 +0,0 @@ -#!/usr/bin/env python3 -""" -商品品类分析脚本 -批量读取商品标题,调用大模型进行品类分析,并保存结果 -""" - -import csv -import os -import json -import logging -import time -import hashlib -from datetime import datetime -from typing import List, Dict, Tuple, Any, Optional - -import redis -import requests -from pathlib import Path -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry - -from config.env_config import REDIS_CONFIG - -# 配置 -BATCH_SIZE = 20 -# 华北2(北京):https://dashscope.aliyuncs.com/compatible-mode/v1 -# 新加坡:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 -# 美国(弗吉尼亚):https://dashscope-us.aliyuncs.com/compatible-mode/v1 -API_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" -MODEL_NAME = "qwen-flash" -API_KEY = os.environ.get("DASHSCOPE_API_KEY") -MAX_RETRIES = 3 -RETRY_DELAY = 5 # 秒 -REQUEST_TIMEOUT = 180 # 秒 - -# 文件路径 -INPUT_FILE = "saas_170_products.csv" -OUTPUT_DIR = Path("output_logs") -OUTPUT_FILE = OUTPUT_DIR / "products_analyzed.csv" -LOG_DIR = OUTPUT_DIR / "logs" - -# 设置独立日志(不影响全局 indexer.log) -LOG_DIR.mkdir(parents=True, exist_ok=True) -timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") -log_file = LOG_DIR / f"process_{timestamp}.log" - -logger = logging.getLogger("product_annotator") -logger.setLevel(logging.INFO) - -if not logger.handlers: - formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - - file_handler = 
logging.FileHandler(log_file, encoding="utf-8") - file_handler.setFormatter(formatter) - - stream_handler = logging.StreamHandler() - stream_handler.setFormatter(formatter) - - logger.addHandler(file_handler) - logger.addHandler(stream_handler) - - # 避免日志向根 logger 传播,防止写入 logs/indexer.log 等其他文件 - logger.propagate = False - - -# Redis 缓存(用于 anchors / 语义属性) -ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors") -ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30)) -_anchor_redis: Optional[redis.Redis] = None - -try: - _anchor_redis = redis.Redis( - host=REDIS_CONFIG.get("host", "localhost"), - port=REDIS_CONFIG.get("port", 6479), - password=REDIS_CONFIG.get("password"), - decode_responses=True, - socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), - socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), - retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), - health_check_interval=10, - ) - _anchor_redis.ping() - logger.info("Redis cache initialized for product anchors and semantic attributes") -except Exception as e: - logger.warning(f"Failed to initialize Redis for anchors cache: {e}") - _anchor_redis = None - - -LANG_LABELS: Dict[str, str] = { - "zh": "中文", - "en": "英文", - "de": "德文", - "ru": "俄文", - "fr": "法文", -} - -SUPPORTED_LANGS = set(LANG_LABELS.keys()) - -SYSTEM_MESSAGES: Dict[str, str] = { - "zh": ( - "你是一名电商平台的商品标注员,你的工作是对输入的每个商品进行理解、分析和标注," - "并按要求格式返回 Markdown 表格。所有输出内容必须为中文。" - ), - "en": ( - "You are a product annotator for an e-commerce platform. " - "For each input product, you must understand, analyze and label it, " - "and return a Markdown table strictly following the requested format. " - "All output must be in English." - ), - "de": ( - "Du bist ein Produktannotator für eine E‑Commerce‑Plattform. " - "Du sollst jedes Eingabeprodukt verstehen, analysieren und beschriften " - "und eine Markdown-Tabelle im geforderten Format zurückgeben. 
" - "Alle Ausgaben müssen auf Deutsch sein." - ), - "ru": ( - "Вы — разметчик товаров для платформы электронной коммерции. " - "Ваша задача — понимать, анализировать и размечать каждый товар " - "и возвращать таблицу Markdown в требуемом формате. " - "Весь вывод должен быть на русском языке." - ), - "fr": ( - "Vous êtes annotateur de produits pour une plateforme e‑commerce. " - "Pour chaque produit en entrée, vous devez le comprendre, l’analyser et l’annoter, " - "puis renvoyer un tableau Markdown au format demandé. " - "Toute la sortie doit être en français." - ), -} - - -def _make_anchor_cache_key( - title: str, - target_lang: str, - tenant_id: Optional[str] = None, -) -> str: - """构造 anchors/语义属性的缓存 key。""" - base = (tenant_id or "global").strip() - h = hashlib.md5(title.encode("utf-8")).hexdigest() - return f"{ANCHOR_CACHE_PREFIX}:{base}:{target_lang}:{h}" - - -def _get_cached_anchor_result( - title: str, - target_lang: str, - tenant_id: Optional[str] = None, -) -> Optional[Dict[str, Any]]: - if not _anchor_redis: - return None - try: - key = _make_anchor_cache_key(title, target_lang, tenant_id) - raw = _anchor_redis.get(key) - if not raw: - return None - return json.loads(raw) - except Exception as e: - logger.warning(f"Failed to get anchor cache: {e}") - return None - - -def _set_cached_anchor_result( - title: str, - target_lang: str, - result: Dict[str, Any], - tenant_id: Optional[str] = None, -) -> None: - if not _anchor_redis: - return - try: - key = _make_anchor_cache_key(title, target_lang, tenant_id) - ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600 - _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False)) - except Exception as e: - logger.warning(f"Failed to set anchor cache: {e}") - - -def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: - """根据目标语言创建 LLM 提示词和表头说明。""" - if target_lang == "en": - prompt = """Please analyze each input product title and extract the following information: - -1. 
Product title: a natural English product name derived from the input title -2. Category path: from broad to fine-grained category, separated by ">" (e.g. Clothing>Women>Dresses>Work Dress) -3. Fine-grained tags: style / features / attributes (e.g. floral, waist-cinching, French style) -4. Target audience: gender / age group, etc. (e.g. young women) -5. Usage scene -6. Applicable season -7. Key attributes -8. Material description -9. Functional features -10. Selling point: one concise key selling sentence for recommendation -11. Anchor text: a set of words or phrases that could be used by users as search queries for this product, covering category, fine-grained tags, functional attributes, usage scenes, etc. - -Input product list: - -""" - prompt_tail = """ -Please strictly return a Markdown table in the following format. For any column that can contain multiple values, separate values with commas. Do not add any other explanations: - -| No. | Product title | Category path | Fine-grained tags | Target audience | Usage scene | Season | Key attributes | Material | Features | Selling point | Anchor text | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - elif target_lang == "de": - prompt = """Bitte analysiere jeden eingegebenen Produkttitel und extrahiere die folgenden Informationen: - -1. Produkttitel: ein natürlicher deutscher Produkttitel basierend auf dem Eingangstitel -2. Kategoriepfad: von Oberkategorie bis Feinkategorie, getrennt durch ">" (z. B. Kleidung>Damen>Kleider>Businesskleid) -3. Feinkörnige Tags: Stil / Merkmale / Eigenschaften (z. B. Blumenmuster, tailliert, französischer Stil) -4. Zielgruppe: Geschlecht / Altersgruppe usw. (z. B. junge Frauen) -5. Einsatzszenario -6. Geeignete Saison -7. Wichtige Attribute -8. Materialbeschreibung -9. Funktionale Merkmale -10. Verkaufsargument: ein prägnanter, einzeiliger Haupt-Selling-Point für Empfehlungen -11. 
Ankertexte: eine Menge von Wörtern oder Phrasen, die Nutzer als Suchanfragen für dieses Produkt verwenden könnten und die Kategorie, feine Tags, Funktion und Nutzungsszenarien abdecken. - -Eingabeliste der Produkte: - -""" - prompt_tail = """ -Gib bitte strikt eine Markdown-Tabelle im folgenden Format zurück. Mehrere Werte in einer Spalte werden durch Kommas getrennt. Füge keine weiteren Erklärungen hinzu: - -| Nr. | Produkttitel | Kategoriepfad | Feintags | Zielgruppe | Einsatzszenario | Saison | Wichtige Attribute | Material | Merkmale | Verkaufsargument | Ankertexte | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - elif target_lang == "ru": - prompt = """Пожалуйста, проанализируйте каждый входной заголовок товара и извлеките следующую информацию: - -1. Заголовок товара: естественное русскоязычное название товара на основе исходного заголовка -2. Путь категории: от широкой до узкой категории, разделённый символом ">" (например: Одежда>Женская одежда>Платья>Деловое платье) -3. Детализированные теги: стиль / особенности / характеристики (например: цветочный принт, приталенный, французский стиль) -4. Целевая аудитория: пол / возрастная группа и т. п. (например: молодые женщины) -5. Сценарий использования -6. Подходящий сезон -7. Ключевые характеристики -8. Описание материала -9. Функциональные особенности -10. Торговое преимущество: одно краткое ключевое предложение для рекомендаций -11. Якорные запросы: набор слов или фраз, которые пользователи могут использовать в качестве поисковых запросов для этого товара, покрывающих категорию, детализированные теги, функциональные характеристики, сценарии использования и т. д. - -Список входных товаров: - -""" - prompt_tail = """ -Пожалуйста, строго верните Markdown‑таблицу в следующем формате. Для колонок с несколькими значениями разделяйте значения запятыми. 
Не добавляйте никаких дополнительных пояснений: - -| № | Заголовок товара | Путь категории | Детализированные теги | Целевая аудитория | Сценарий использования | Сезон | Ключевые характеристики | Материал | Особенности | Торговое преимущество | Якорные запросы | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - elif target_lang == "fr": - prompt = """Veuillez analyser chaque titre de produit en entrée et extraire les informations suivantes : - -1. Titre du produit : un titre de produit naturel en français basé sur le titre d’origine -2. Chemin de catégorie : de la catégorie la plus large à la plus fine, séparées par ">" (par ex. Vêtements>Femme>Robes>Robe de travail) -3. Tags détaillés : style / caractéristiques / attributs (par ex. fleuri, cintré, style français) -4. Public cible : sexe / tranche d’âge, etc. (par ex. jeunes femmes) -5. Scénario d’utilisation -6. Saison adaptée -7. Attributs clés -8. Description du matériau -9. Caractéristiques fonctionnelles -10. Argument de vente : une phrase concise résumant le principal atout pour la recommandation -11. Texte d’ancrage : un ensemble de mots ou d’expressions que les utilisateurs pourraient saisir comme requêtes de recherche pour ce produit, couvrant la catégorie, les tags détaillés, les fonctions, les scénarios d’usage, etc. - -Liste des produits en entrée : - -""" - prompt_tail = """ -Veuillez strictement renvoyer un tableau Markdown au format suivant. Pour toute colonne pouvant contenir plusieurs valeurs, séparez‑les par des virgules. N’ajoutez aucune autre explication : - -| N° | Titre du produit | Chemin de catégorie | Tags détaillés | Public cible | Scénario d’utilisation | Saison | Attributs clés | Matériau | Caractéristiques | Argument de vente | Texte d’ancrage | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - else: - # 默认中文版本 - prompt = """请对输入的每条商品标题,分析并提取以下信息: - -1. 商品标题:将输入商品名称翻译为自然、完整的中文商品标题 -2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤) -3. 
细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式) -4. 适用人群:性别/年龄段等(例如:年轻女性) -5. 使用场景 -6. 适用季节 -7. 关键属性 -8. 材质说明 -9. 功能特点 -10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由 -11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。 - -输入商品列表: - -""" - prompt_tail = """ -请严格按照以下markdown表格格式返回,每列内部的多值内容都用逗号分隔,不要添加任何其他说明: - -| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - - for idx, product in enumerate(products, 1): - prompt += f'{idx}. {product["title"]}\n' - prompt += prompt_tail - - return prompt - - -def call_llm(prompt: str, target_lang: str = "zh") -> Tuple[str, str]: - """调用大模型API(带重试机制),按目标语言选择系统提示词。""" - headers = { - "Authorization": f"Bearer {API_KEY}", - "Content-Type": "application/json" - } - - payload = { - "model": MODEL_NAME, - "messages": [ - { - "role": "system", - "content": SYSTEM_MESSAGES.get(target_lang, SYSTEM_MESSAGES["zh"]) - }, - { - "role": "user", - "content": prompt - } - ], - "temperature": 0.3, - "top_p": 0.8 - } - - request_data = { - "headers": {k: v for k, v in headers.items() if k != "Authorization"}, - "payload": payload - } - - logger.info(f"\n{'='*80}") - logger.info(f"LLM Request (Model: {MODEL_NAME}):") - logger.info(json.dumps(request_data, ensure_ascii=False, indent=2)) - logger.info(f"\nPrompt:\n{prompt}") - - # 创建session,禁用代理 - session = requests.Session() - session.trust_env = False # 忽略系统代理设置 - - try: - # 重试机制 - for attempt in range(MAX_RETRIES): - try: - response = session.post( - f"{API_BASE_URL}/chat/completions", - headers=headers, - json=payload, - timeout=REQUEST_TIMEOUT, - proxies={"http": None, "https": None} # 明确禁用代理 - ) - - response.raise_for_status() - result = response.json() - - logger.info(f"\nLLM Response:") - logger.info(json.dumps(result, ensure_ascii=False, indent=2)) - - content = result["choices"][0]["message"]["content"] - logger.info(f"\nExtracted Content:\n{content}") - - return content, json.dumps(result, 
ensure_ascii=False) - - except requests.exceptions.ProxyError as e: - logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Proxy error - {str(e)}") - if attempt < MAX_RETRIES - 1: - logger.info(f"Retrying in {RETRY_DELAY} seconds...") - time.sleep(RETRY_DELAY) - else: - raise - - except requests.exceptions.RequestException as e: - logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Request error - {str(e)}") - if attempt < MAX_RETRIES - 1: - logger.info(f"Retrying in {RETRY_DELAY} seconds...") - time.sleep(RETRY_DELAY) - else: - raise - - except Exception as e: - logger.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES}: {str(e)}") - if attempt < MAX_RETRIES - 1: - logger.info(f"Retrying in {RETRY_DELAY} seconds...") - time.sleep(RETRY_DELAY) - else: - raise - - finally: - session.close() - - -def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: - """解析markdown表格内容""" - lines = markdown_content.strip().split('\n') - data = [] - data_started = False - - for line in lines: - line = line.strip() - if not line: - continue - - # 表格行处理 - if line.startswith('|'): - # 分隔行(---- 或 :---: 等;允许空格,如 "| ---- | ---- |") - sep_chars = line.replace('|', '').strip().replace(' ', '') - if sep_chars and set(sep_chars) <= {'-', ':'}: - data_started = True - continue - - # 首个表头行:无论语言如何,统一跳过 - if not data_started: - # 等待下一行数据行 - continue - - # 解析数据行 - parts = [p.strip() for p in line.split('|')] - parts = [p for p in parts if p] # 移除空字符串 - - if len(parts) >= 2: - row = { - "seq_no": parts[0], - "title": parts[1], # 商品标题(按目标语言) - "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 - "tags": parts[3] if len(parts) > 3 else "", # 细分标签 - "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 - "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景 - "season": parts[6] if len(parts) > 6 else "", # 适用季节 - "key_attributes": parts[7] if len(parts) > 7 else "", # 关键属性 - "material": parts[8] if len(parts) > 8 else "", # 材质说明 - "features": 
parts[9] if len(parts) > 9 else "", # 功能特点 - "selling_points": parts[10] if len(parts) > 10 else "", # 商品卖点 - "anchor_text": parts[11] if len(parts) > 11 else "" # 锚文本 - } - data.append(row) - - return data - - -def process_batch( - batch_data: List[Dict[str, str]], - batch_num: int, - target_lang: str = "zh" -) -> List[Dict[str, str]]: - """处理一个批次的数据""" - logger.info(f"\n{'#'*80}") - logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)") - - # 创建提示词 - prompt = create_prompt(batch_data, target_lang=target_lang) - - # 调用LLM - try: - raw_response, full_response_json = call_llm(prompt, target_lang=target_lang) - - # 解析结果 - parsed_results = parse_markdown_table(raw_response) - - logger.info(f"\nParsed Results ({len(parsed_results)} items):") - logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2)) - - # 映射回原始ID - results_with_ids = [] - for i, parsed_item in enumerate(parsed_results): - if i < len(batch_data): - original_id = batch_data[i]["id"] - result = { - "id": original_id, - "lang": target_lang, - "title_input": batch_data[i]["title"], # 原始输入标题 - "title": parsed_item.get("title", ""), # 模型生成的标题 - "category_path": parsed_item.get("category_path", ""), # 品类路径 - "tags": parsed_item.get("tags", ""), # 细分标签 - "target_audience": parsed_item.get("target_audience", ""), # 适用人群 - "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景 - "season": parsed_item.get("season", ""), # 适用季节 - "key_attributes": parsed_item.get("key_attributes", ""), # 关键属性 - "material": parsed_item.get("material", ""), # 材质说明 - "features": parsed_item.get("features", ""), # 功能特点 - "selling_points": parsed_item.get("selling_points", ""), # 商品卖点 - "anchor_text": parsed_item.get("anchor_text", "") # 锚文本 - } - results_with_ids.append(result) - logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}") - - # 保存日志 - batch_log = { - "batch_num": batch_num, - "timestamp": datetime.now().isoformat(), - "input_products": batch_data, - "raw_response": 
raw_response, - "full_response_json": full_response_json, - "parsed_results": parsed_results, - "final_results": results_with_ids - } - - batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}.json" - with open(batch_log_file, 'w', encoding='utf-8') as f: - json.dump(batch_log, f, ensure_ascii=False, indent=2) - - logger.info(f"Batch log saved to: {batch_log_file}") - - return results_with_ids - - except Exception as e: - logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True) - # 返回空结果,保持ID映射 - return [{ - "id": item["id"], - "lang": target_lang, - "title_input": item["title"], - "title": "", - "category_path": "", - "tags": "", - "target_audience": "", - "usage_scene": "", - "season": "", - "key_attributes": "", - "material": "", - "features": "", - "selling_points": "", - "anchor_text": "", - "error": str(e), - } for item in batch_data] - - -def read_products(input_file: str) -> List[Dict[str, str]]: - """读取CSV文件""" - products = [] - with open(input_file, 'r', encoding='utf-8') as f: - reader = csv.DictReader(f) - for row in reader: - products.append({ - "id": row["id"], - "title": row["title"] - }) - return products - - -def write_results(results: List[Dict[str, str]], output_file: Path): - """写入结果到CSV文件""" - output_file.parent.mkdir(parents=True, exist_ok=True) - - fieldnames = [ - "id", - "lang", - "title_input", - "title", - "category_path", - "tags", - "target_audience", - "usage_scene", - "season", - "key_attributes", - "material", - "features", - "selling_points", - "anchor_text", - ] - - with open(output_file, 'w', encoding='utf-8', newline='') as f: - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(results) - - logger.info(f"\nResults written to: {output_file}") - - -def main(): - """主函数""" - if not API_KEY: - logger.error("Error: DASHSCOPE_API_KEY environment variable not set!") - return - - logger.info(f"Starting product analysis process") - logger.info(f"Input file: 
{INPUT_FILE}") - logger.info(f"Output file: {OUTPUT_FILE}") - logger.info(f"Batch size: {BATCH_SIZE}") - logger.info(f"Model: {MODEL_NAME}") - - # 读取产品数据 - logger.info(f"\nReading products from {INPUT_FILE}...") - products = read_products(INPUT_FILE) - logger.info(f"Total products to process: {len(products)}") - - # 分批处理 - all_results = [] - total_batches = (len(products) + BATCH_SIZE - 1) // BATCH_SIZE - - for i in range(0, len(products), BATCH_SIZE): - batch_num = i // BATCH_SIZE + 1 - batch = products[i:i + BATCH_SIZE] - - logger.info(f"\nProgress: Batch {batch_num}/{total_batches}") - - results = process_batch(batch, batch_num, target_lang="zh") - all_results.extend(results) - - # 每处理完一个批次就写入一次(断点续传) - write_results(all_results, OUTPUT_FILE) - logger.info(f"Progress saved: {len(all_results)}/{len(products)} items completed") - - logger.info(f"\n{'='*80}") - logger.info(f"Processing completed!") - logger.info(f"Total processed: {len(all_results)} items") - logger.info(f"Output file: {OUTPUT_FILE}") - logger.info(f"Log file: {log_file}") - - -if __name__ == "__main__": - main() - - -def analyze_products( - products: List[Dict[str, str]], - target_lang: str = "zh", - batch_size: Optional[int] = None, - tenant_id: Optional[str] = None, -) -> List[Dict[str, Any]]: - """ - 库调用入口:根据输入+语言,返回锚文本及各维度信息。 - - Args: - products: [{"id": "...", "title": "..."}] - target_lang: 输出语言,需在 SUPPORTED_LANGS 内 - batch_size: 批大小,默认使用全局 BATCH_SIZE - """ - if not API_KEY: - raise RuntimeError("DASHSCOPE_API_KEY is not set, cannot call LLM") - - if target_lang not in SUPPORTED_LANGS: - raise ValueError(f"Unsupported target_lang={target_lang}, supported={sorted(SUPPORTED_LANGS)}") - - if not products: - return [] - - # 简单路径:索引阶段通常 batch_size=1,这里优先做单条缓存命中 - if len(products) == 1: - p = products[0] - title = str(p.get("title") or "").strip() - if title: - cached = _get_cached_anchor_result(title, target_lang, tenant_id=tenant_id) - if cached: - logger.info( - f"[analyze_products] Cache hit 
for title='{title[:50]}...', " - f"lang={target_lang}, tenant_id={tenant_id or 'global'}" - ) - return [cached] - - # call_llm 一次处理上限固定为 BATCH_SIZE(默认 20): - # - 尽可能攒批处理; - # - 即便调用方传入更大的 batch_size,也会自动按上限拆批。 - req_bs = BATCH_SIZE if batch_size is None else int(batch_size) - bs = max(1, min(req_bs, BATCH_SIZE)) - all_results: List[Dict[str, Any]] = [] - total_batches = (len(products) + bs - 1) // bs - - for i in range(0, len(products), bs): - batch_num = i // bs + 1 - batch = products[i:i + bs] - logger.info( - f"[analyze_products] Processing batch {batch_num}/{total_batches}, " - f"size={len(batch)}, target_lang={target_lang}" - ) - batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang) - all_results.extend(batch_results) - - # 写入缓存 - for item in batch_results: - title_input = str(item.get("title_input") or "").strip() - if not title_input: - continue - if item.get("error"): - # 不缓存错误结果,避免放大临时故障 - continue - try: - _set_cached_anchor_result(title_input, target_lang, item, tenant_id=tenant_id) - except Exception: - # 已在内部记录 warning - pass - - return all_results diff --git a/indexer/product_enrich.py b/indexer/product_enrich.py new file mode 100644 index 0000000..1e14d44 --- /dev/null +++ b/indexer/product_enrich.py @@ -0,0 +1,516 @@ +#!/usr/bin/env python3 +""" +商品内容理解与属性补充模块(product_enrich) + +提供基于 LLM 的商品锚文本 / 语义属性 / 标签等分析能力, +供 indexer 与 API 在内存中调用(不再负责 CSV 读写)。 +""" + +import os +import json +import logging +import time +import hashlib +from datetime import datetime +from typing import List, Dict, Tuple, Any, Optional + +import redis +import requests +from pathlib import Path + +from config.env_config import REDIS_CONFIG +from config.tenant_config_loader import SOURCE_LANG_CODE_MAP + +# 配置 +BATCH_SIZE = 20 +# 华北2(北京):https://dashscope.aliyuncs.com/compatible-mode/v1 +# 新加坡:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 +# 美国(弗吉尼亚):https://dashscope-us.aliyuncs.com/compatible-mode/v1 +API_BASE_URL = 
"https://dashscope-us.aliyuncs.com/compatible-mode/v1" +MODEL_NAME = "qwen-flash" +API_KEY = os.environ.get("DASHSCOPE_API_KEY") +MAX_RETRIES = 3 +RETRY_DELAY = 5 # 秒 +REQUEST_TIMEOUT = 180 # 秒 + +# 日志路径 +OUTPUT_DIR = Path("output_logs") +LOG_DIR = OUTPUT_DIR / "logs" + +# 设置独立日志(不影响全局 indexer.log) +LOG_DIR.mkdir(parents=True, exist_ok=True) +timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") +log_file = LOG_DIR / f"product_enrich_{timestamp}.log" +verbose_log_file = LOG_DIR / "product_enrich_verbose.log" + +# 主日志 logger:执行流程、批次信息等 +logger = logging.getLogger("product_enrich") +logger.setLevel(logging.INFO) + +if not logger.handlers: + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + + file_handler = logging.FileHandler(log_file, encoding="utf-8") + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + # 避免日志向根 logger 传播,防止写入 logs/indexer.log 等其他文件 + logger.propagate = False + +# 详尽日志 logger:专门记录 LLM 请求与响应 +verbose_logger = logging.getLogger("product_enrich_verbose") +verbose_logger.setLevel(logging.INFO) + +if not verbose_logger.handlers: + verbose_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + verbose_file_handler = logging.FileHandler(verbose_log_file, encoding="utf-8") + verbose_file_handler.setFormatter(verbose_formatter) + verbose_logger.addHandler(verbose_file_handler) + verbose_logger.propagate = False + + +# Redis 缓存(用于 anchors / 语义属性) +ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors") +ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30)) +_anchor_redis: Optional[redis.Redis] = None + +try: + _anchor_redis = redis.Redis( + host=REDIS_CONFIG.get("host", "localhost"), + port=REDIS_CONFIG.get("port", 6479), + password=REDIS_CONFIG.get("password"), + decode_responses=True, + 
socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), + socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), + retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), + health_check_interval=10, + ) + _anchor_redis.ping() + logger.info("Redis cache initialized for product anchors and semantic attributes") +except Exception as e: + logger.warning(f"Failed to initialize Redis for anchors cache: {e}") + _anchor_redis = None + + +SYSTEM_MESSAGES = ( + "You are a product annotator for an e-commerce platform. " + "For each input product, you must understand, analyze and label it, " + "and return a Markdown table strictly following the requested format. " + "All output must be in English." +) + + +def _make_anchor_cache_key( + title: str, + target_lang: str, + tenant_id: Optional[str] = None, +) -> str: + """构造 anchors/语义属性的缓存 key。""" + base = (tenant_id or "global").strip() + h = hashlib.md5(title.encode("utf-8")).hexdigest() + return f"{ANCHOR_CACHE_PREFIX}:{base}:{target_lang}:{h}" + + +def _get_cached_anchor_result( + title: str, + target_lang: str, + tenant_id: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + if not _anchor_redis: + return None + try: + key = _make_anchor_cache_key(title, target_lang, tenant_id) + raw = _anchor_redis.get(key) + if not raw: + return None + return json.loads(raw) + except Exception as e: + logger.warning(f"Failed to get anchor cache: {e}") + return None + + +def _set_cached_anchor_result( + title: str, + target_lang: str, + result: Dict[str, Any], + tenant_id: Optional[str] = None, +) -> None: + if not _anchor_redis: + return + try: + key = _make_anchor_cache_key(title, target_lang, tenant_id) + ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600 + _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False)) + except Exception as e: + logger.warning(f"Failed to set anchor cache: {e}") + + +def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: + """根据目标语言创建 LLM 提示词和表头说明。 + + 约定: 
+ - 提示词始终使用英文; + - 当 target_lang == "en" 时,直接要求用英文分析并输出英文表头; + - 当 target_lang 为其他语言时,视作“多轮对话”的后续轮次: + * 默认上一轮已经用英文完成了分析; + * 当前轮只需要在保持结构和含义不变的前提下,将整张表格翻译为目标语言, + 包含表头与所有单元格内容。 + """ + lang_name = SOURCE_LANG_CODE_MAP.get(target_lang, target_lang) + + prompt = """Please analyze each input product title and extract the following information: + +1. Product title: a natural English product name derived from the input title +2. Category path: from broad to fine-grained category, separated by ">" (e.g. Clothing>Women>Dresses>Work Dress) +3. Fine-grained tags: style / features / attributes (e.g. floral, waist-cinching, French style) +4. Target audience: gender / age group, etc. (e.g. young women) +5. Usage scene +6. Applicable season +7. Key attributes +8. Material description +9. Functional features +10. Selling point: one concise key selling sentence for recommendation +11. Anchor text: a set of words or phrases that could be used by users as search queries for this product, covering category, fine-grained tags, functional attributes, usage scenes, etc. + +Input product list: + +""" + + for idx, product in enumerate(products, 1): + prompt += f'{idx}. {product["title"]}\n' + + if target_lang == "en": + # 英文首轮:直接要求英文表头 + 英文内容 + prompt += """ +Please strictly return a Markdown table in the following format. For any column that can contain multiple values, separate values with commas. Do not add any other explanations: + +| No. | Product title | Category path | Fine-grained tags | Target audience | Usage scene | Season | Key attributes | Material | Features | Selling point | Anchor text | +|----|----|----|----|----|----|----|----|----|----|----|----| +""" + else: + # 非英文语言:视作“下一轮对话”,只做翻译,要求表头与内容全部用目标语言 + prompt += f""" +Now we will output the same table in {lang_name}. + +IMPORTANT: +- Assume you have already generated the full table in English in a previous round. 
+- In this round, you must output exactly the same table structure and content, + but fully translated into {lang_name}, including ALL column headers and ALL cell values. +- Do NOT change the meaning, fields, or the number/order of rows and columns. +- Keep valid Markdown table syntax. + +Please return ONLY the Markdown table in {lang_name}, without any extra explanations. +""" + + return prompt + + +def call_llm(prompt: str, target_lang: str = "zh") -> Tuple[str, str]: + """调用大模型API(带重试机制),按目标语言选择系统提示词。""" + headers = { + "Authorization": f"Bearer {API_KEY}", + "Content-Type": "application/json", + } + + payload = { + "model": MODEL_NAME, + "messages": [ + { + "role": "system", + "content": SYSTEM_MESSAGES, + }, + { + "role": "user", + "content": prompt, + }, + ], + "temperature": 0.3, + "top_p": 0.8, + } + + request_data = { + "headers": {k: v for k, v in headers.items() if k != "Authorization"}, + "payload": payload, + } + + # 主日志 + 详尽日志:LLM Request + logger.info(f"\n{'=' * 80}") + logger.info(f"LLM Request (Model: {MODEL_NAME}):") + logger.info(json.dumps(request_data, ensure_ascii=False, indent=2)) + logger.info(f"\nPrompt:\n{prompt}") + + verbose_logger.info(f"\n{'=' * 80}") + verbose_logger.info(f"LLM Request (Model: {MODEL_NAME}):") + verbose_logger.info(json.dumps(request_data, ensure_ascii=False, indent=2)) + verbose_logger.info(f"\nPrompt:\n{prompt}") + + # 创建session,禁用代理 + session = requests.Session() + session.trust_env = False # 忽略系统代理设置 + + try: + # 重试机制 + for attempt in range(MAX_RETRIES): + try: + response = session.post( + f"{API_BASE_URL}/chat/completions", + headers=headers, + json=payload, + timeout=REQUEST_TIMEOUT, + proxies={"http": None, "https": None}, # 明确禁用代理 + ) + + response.raise_for_status() + result = response.json() + + # 主日志 + 详尽日志:LLM Response + logger.info(f"\nLLM Response:") + logger.info(json.dumps(result, ensure_ascii=False, indent=2)) + + verbose_logger.info(f"\nLLM Response:") + verbose_logger.info(json.dumps(result, 
ensure_ascii=False, indent=2)) + + content = result["choices"][0]["message"]["content"] + logger.info(f"\nExtracted Content:\n{content}") + verbose_logger.info(f"\nExtracted Content:\n{content}") + + return content, json.dumps(result, ensure_ascii=False) + + except requests.exceptions.ProxyError as e: + logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Proxy error - {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + except requests.exceptions.RequestException as e: + logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Request error - {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + except Exception as e: + logger.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES}: {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + finally: + session.close() + + +def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: + """解析markdown表格内容""" + lines = markdown_content.strip().split("\n") + data = [] + data_started = False + + for line in lines: + line = line.strip() + if not line: + continue + + # 表格行处理 + if line.startswith("|"): + # 分隔行(---- 或 :---: 等;允许空格,如 "| ---- | ---- |") + sep_chars = line.replace("|", "").strip().replace(" ", "") + if sep_chars and set(sep_chars) <= {"-", ":"}: + data_started = True + continue + + # 首个表头行:无论语言如何,统一跳过 + if not data_started: + # 等待下一行数据行 + continue + + # 解析数据行 + parts = [p.strip() for p in line.split("|")] + parts = [p for p in parts if p] # 移除空字符串 + + if len(parts) >= 2: + row = { + "seq_no": parts[0], + "title": parts[1], # 商品标题(按目标语言) + "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 + "tags": parts[3] if len(parts) > 3 else "", # 细分标签 + "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 + "usage_scene": 
parts[5] if len(parts) > 5 else "", # 使用场景 + "season": parts[6] if len(parts) > 6 else "", # 适用季节 + "key_attributes": parts[7] if len(parts) > 7 else "", # 关键属性 + "material": parts[8] if len(parts) > 8 else "", # 材质说明 + "features": parts[9] if len(parts) > 9 else "", # 功能特点 + "selling_points": parts[10] if len(parts) > 10 else "", # 商品卖点 + "anchor_text": parts[11] if len(parts) > 11 else "", # 锚文本 + } + data.append(row) + + return data + + +def process_batch( + batch_data: List[Dict[str, str]], + batch_num: int, + target_lang: str = "zh", +) -> List[Dict[str, str]]: + """处理一个批次的数据""" + logger.info(f"\n{'#' * 80}") + logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)") + + # 创建提示词 + prompt = create_prompt(batch_data, target_lang=target_lang) + + # 调用LLM + try: + raw_response, full_response_json = call_llm(prompt, target_lang=target_lang) + + # 解析结果 + parsed_results = parse_markdown_table(raw_response) + + logger.info(f"\nParsed Results ({len(parsed_results)} items):") + logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2)) + + # 映射回原始ID + results_with_ids = [] + for i, parsed_item in enumerate(parsed_results): + if i < len(batch_data): + original_id = batch_data[i]["id"] + result = { + "id": original_id, + "lang": target_lang, + "title_input": batch_data[i]["title"], # 原始输入标题 + "title": parsed_item.get("title", ""), # 模型生成的标题 + "category_path": parsed_item.get("category_path", ""), # 品类路径 + "tags": parsed_item.get("tags", ""), # 细分标签 + "target_audience": parsed_item.get("target_audience", ""), # 适用人群 + "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景 + "season": parsed_item.get("season", ""), # 适用季节 + "key_attributes": parsed_item.get("key_attributes", ""), # 关键属性 + "material": parsed_item.get("material", ""), # 材质说明 + "features": parsed_item.get("features", ""), # 功能特点 + "selling_points": parsed_item.get("selling_points", ""), # 商品卖点 + "anchor_text": parsed_item.get("anchor_text", ""), # 锚文本 + } + 
results_with_ids.append(result) + logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}") + + # 保存批次 JSON 日志到独立文件 + batch_log = { + "batch_num": batch_num, + "timestamp": datetime.now().isoformat(), + "input_products": batch_data, + "raw_response": raw_response, + "full_response_json": full_response_json, + "parsed_results": parsed_results, + "final_results": results_with_ids, + } + + batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}.json" + with open(batch_log_file, "w", encoding="utf-8") as f: + json.dump(batch_log, f, ensure_ascii=False, indent=2) + + logger.info(f"Batch log saved to: {batch_log_file}") + + return results_with_ids + + except Exception as e: + logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True) + # 返回空结果,保持ID映射 + return [ + { + "id": item["id"], + "lang": target_lang, + "title_input": item["title"], + "title": "", + "category_path": "", + "tags": "", + "target_audience": "", + "usage_scene": "", + "season": "", + "key_attributes": "", + "material": "", + "features": "", + "selling_points": "", + "anchor_text": "", + "error": str(e), + } + for item in batch_data + ] + + +def analyze_products( + products: List[Dict[str, str]], + target_lang: str = "zh", + batch_size: Optional[int] = None, + tenant_id: Optional[str] = None, +) -> List[Dict[str, Any]]: + """ + 库调用入口:根据输入+语言,返回锚文本及各维度信息。 + + Args: + products: [{"id": "...", "title": "..."}] + target_lang: 输出语言 + batch_size: 批大小,默认使用全局 BATCH_SIZE + """ + if not API_KEY: + raise RuntimeError("DASHSCOPE_API_KEY is not set, cannot call LLM") + + if not products: + return [] + + # 简单路径:索引阶段通常 batch_size=1,这里优先做单条缓存命中 + if len(products) == 1: + p = products[0] + title = str(p.get("title") or "").strip() + if title: + cached = _get_cached_anchor_result(title, target_lang, tenant_id=tenant_id) + if cached: + logger.info( + f"[analyze_products] Cache hit for title='{title[:50]}...', " + f"lang={target_lang}, tenant_id={tenant_id or 'global'}" + ) + 
return [cached] + + # call_llm 一次处理上限固定为 BATCH_SIZE(默认 20): + # - 尽可能攒批处理; + # - 即便调用方传入更大的 batch_size,也会自动按上限拆批。 + req_bs = BATCH_SIZE if batch_size is None else int(batch_size) + bs = max(1, min(req_bs, BATCH_SIZE)) + all_results: List[Dict[str, Any]] = [] + total_batches = (len(products) + bs - 1) // bs + + for i in range(0, len(products), bs): + batch_num = i // bs + 1 + batch = products[i : i + bs] + logger.info( + f"[analyze_products] Processing batch {batch_num}/{total_batches}, " + f"size={len(batch)}, target_lang={target_lang}" + ) + batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang) + all_results.extend(batch_results) + + # 写入缓存 + for item in batch_results: + title_input = str(item.get("title_input") or "").strip() + if not title_input: + continue + if item.get("error"): + # 不缓存错误结果,避免放大临时故障 + continue + try: + _set_cached_anchor_result(title_input, target_lang, item, tenant_id=tenant_id) + except Exception: + # 已在内部记录 warning + pass + + return all_results + diff --git a/providers/translation.py b/providers/translation.py index e2b1db9..c904931 100644 --- a/providers/translation.py +++ b/providers/translation.py @@ -2,7 +2,7 @@ from __future__ import annotations import logging -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Sequence, Union import requests from config.services_config import get_translation_config, get_translation_base_url @@ -23,16 +23,49 @@ class HttpTranslationProvider: self.model = model or "qwen" self.timeout_sec = float(timeout_sec or 10.0) + @property + def supports_batch(self) -> bool: + """ + Whether this provider supports list input natively. 
+ + 当前实现中,我们已经在 `_translate_once` 内处理了 list, + 所以可以直接视为支持 batch。 + """ + return True + def _translate_once( self, - text: str, + text: Union[str, Sequence[str]], target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, - ) -> Optional[str]: + ) -> Union[Optional[str], List[Optional[str]]]: + # 允许 text 为单个字符串或字符串列表 + if isinstance(text, (list, tuple)): + # 上游约定:列表输入时,输出列表一一对应;失败位置为 None + results: List[Optional[str]] = [] + for item in text: + if item is None or not str(item).strip(): + # 空字符串/None 不视为失败,原样返回以保持语义 + results.append(item) # type: ignore[arg-type] + continue + try: + single = self._translate_once( + text=str(item), + target_lang=target_lang, + source_lang=source_lang, + context=context, + prompt=prompt, + ) + results.append(single) # type: ignore[arg-type] + except Exception: + # 理论上不会进入,因为内部已捕获;兜底保持长度一致 + results.append(None) + return results + if not text or not str(text).strip(): - return text + return text # type: ignore[return-value] try: url = f"{self.base_url}/translate" payload = { @@ -62,12 +95,12 @@ class HttpTranslationProvider: def translate( self, - text: str, + text: Union[str, Sequence[str]], target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, - ) -> Optional[str]: + ) -> Union[Optional[str], List[Optional[str]]]: return self._translate_once( text=text, target_lang=target_lang, diff --git a/query/deepl_provider.py b/query/deepl_provider.py index 916778b..a134d68 100644 --- a/query/deepl_provider.py +++ b/query/deepl_provider.py @@ -10,7 +10,7 @@ from __future__ import annotations import logging import os import re -from typing import Dict, Optional, Tuple +from typing import Dict, List, Optional, Sequence, Tuple, Union import requests from config.services_config import get_translation_config @@ -88,6 +88,14 @@ class DeepLProvider: if not self.api_key: logger.warning("DEEPL_AUTH_KEY not set; DeepL translation is 
unavailable") + @property + def supports_batch(self) -> bool: + """ + DeepL HTTP API 本身支持一次传多条 text,这里先返回 False, + 由上层逐条拆分,后续如果要真正批量,可调整实现。 + """ + return False + def _resolve_request_context( self, target_lang: str, @@ -108,12 +116,28 @@ class DeepLProvider: def translate( self, - text: str, + text: Union[str, Sequence[str]], target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, - ) -> Optional[str]: + ) -> Union[Optional[str], List[Optional[str]]]: + if isinstance(text, (list, tuple)): + results: List[Optional[str]] = [] + for item in text: + if item is None or not str(item).strip(): + results.append(item) # type: ignore[arg-type] + continue + out = self.translate( + text=str(item), + target_lang=target_lang, + source_lang=source_lang, + context=context, + prompt=prompt, + ) + results.append(out) + return results + if not self.api_key: return None diff --git a/query/llm_translate.py b/query/llm_translate.py index f9b3efd..f0af1e0 100644 --- a/query/llm_translate.py +++ b/query/llm_translate.py @@ -11,13 +11,14 @@ from __future__ import annotations import logging import os import time -from typing import Optional +from typing import List, Optional, Sequence, Union from openai import OpenAI from config.env_config import DASHSCOPE_API_KEY from config.services_config import get_translation_config -from config.translate_prompts import TRANSLATION_PROMPTS, SOURCE_LANG_CODE_MAP +from config.translate_prompts import TRANSLATION_PROMPTS +from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP logger = logging.getLogger(__name__) @@ -96,6 +97,12 @@ class LLMTranslatorProvider: ) self.client = self._create_client() + @property + def supports_batch(self) -> bool: + """Whether this provider efficiently supports list input.""" + # 我们在 translate 中已经原生支持 list,所以这里返回 True + return True + def _create_client(self) -> Optional[OpenAI]: api_key = DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") 
if not api_key: @@ -107,7 +114,7 @@ class LLMTranslatorProvider: logger.error("Failed to initialize llm translation client: %s", exc, exc_info=True) return None - def translate( + def _translate_single( self, text: str, target_lang: str, @@ -148,7 +155,14 @@ class LLMTranslatorProvider: if not content: logger.warning("[llm] Empty result | src=%s tgt=%s latency=%.1fms", src, tgt, latency_ms) return None - logger.info("[llm] Success | src=%s tgt=%s src_text=%s response=%s latency=%.1fms", src, tgt, text, content, latency_ms) + logger.info( + "[llm] Success | src=%s tgt=%s src_text=%s response=%s latency=%.1fms", + src, + tgt, + text, + content, + latency_ms, + ) return content except Exception as exc: latency_ms = (time.time() - start) * 1000 @@ -162,16 +176,56 @@ class LLMTranslatorProvider: ) return None + def translate( + self, + text: Union[str, Sequence[str]], + target_lang: str, + source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None, + ) -> Union[Optional[str], List[Optional[str]]]: + """ + Translate a single string or a list of strings. + + - If input is a list, returns a list of the same length. + - Per-item failures are returned as None. 
+ """ + if isinstance(text, (list, tuple)): + results: List[Optional[str]] = [] + for item in text: + # 保证一一对应,即使某个元素为空也占位 + if item is None: + results.append(None) + continue + results.append( + self._translate_single( + text=str(item), + target_lang=target_lang, + source_lang=source_lang, + context=context, + prompt=prompt, + ) + ) + return results + + return self._translate_single( + text=str(text), + target_lang=target_lang, + source_lang=source_lang, + context=context, + prompt=prompt, + ) + def llm_translate( - text: str, + text: Union[str, Sequence[str]], target_lang: str, *, source_lang: Optional[str] = None, source_lang_label: Optional[str] = None, target_lang_label: Optional[str] = None, timeout_sec: Optional[float] = None, -) -> Optional[str]: +) -> Union[Optional[str], List[Optional[str]]]: provider = LLMTranslatorProvider(timeout_sec=timeout_sec or 30.0) return provider.translate( text=text, diff --git a/query/qwen_mt_translate.py b/query/qwen_mt_translate.py index aec4e85..48273c5 100644 --- a/query/qwen_mt_translate.py +++ b/query/qwen_mt_translate.py @@ -7,14 +7,14 @@ import logging import os import re import time -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Sequence, Union import redis from openai import OpenAI from config.env_config import DASHSCOPE_API_KEY, REDIS_CONFIG from config.services_config import get_translation_cache_config -from config.translate_prompts import SOURCE_LANG_CODE_MAP +from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP logger = logging.getLogger(__name__) @@ -62,6 +62,16 @@ class Translator: if self.use_cache and bool(cache_cfg.get("enabled", True)): self.redis_client = self._init_redis_client() + @property + def supports_batch(self) -> bool: + """ + 标记该 provider 已支持列表输入。 + + 当前实现为循环单条调用(带缓存),不是真正的并行批量请求, + 但对上层来说可以直接传 list,返回 list。 + """ + return True + @staticmethod def _normalize_model(model: str) -> str: m = (model or "qwen").strip().lower() @@ 
-117,14 +127,31 @@ class Translator: def translate( self, - text: str, + text: Union[str, Sequence[str]], target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, - ) -> Optional[str]: - if not text or not text.strip(): - return text + ) -> Union[Optional[str], List[Optional[str]]]: + if isinstance(text, (list, tuple)): + results: List[Optional[str]] = [] + for item in text: + if item is None or not str(item).strip(): + results.append(item) # type: ignore[arg-type] + continue + # 对于 batch,这里沿用单条的缓存与规则,逐条调用 + out = self.translate( + text=str(item), + target_lang=target_lang, + source_lang=source_lang, + context=context, + prompt=prompt, + ) + results.append(out) + return results + + if not text or not str(text).strip(): + return text # type: ignore[return-value] tgt = (target_lang or "").strip().lower() src = (source_lang or "").strip().lower() or None diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index c5c52bf..3668818 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -342,7 +342,7 @@ def test_indexer_build_docs_from_db_contract(indexer_client: TestClient): def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch): - import indexer.product_annotator as process_products + import indexer.product_enrich as process_products def _fake_analyze_products( products: List[Dict[str, str]], diff --git a/tests/test_process_products_batching.py b/tests/test_process_products_batching.py index d491f43..a02f8da 100644 --- a/tests/test_process_products_batching.py +++ b/tests/test_process_products_batching.py @@ -2,7 +2,7 @@ from __future__ import annotations from typing import Any, Dict, List -import indexer.product_annotator as process_products +import indexer.product_enrich as process_products def _mk_products(n: int) -> List[Dict[str, str]]: -- libgit2 0.21.2