Commit 22ae00c78124374e37763045a0b9dd35915c9058

Authored by tangwang
1 parent 77ab67ad

product_annotator

api/routes/indexer.py
@@ -443,10 +443,10 @@ async def build_docs_from_db(request: BuildDocsFromDbRequest):
443 443
444 def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]: 444 def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]:
445 """ 445 """
446 - 同步执行内容理解:调用 process_products.analyze_products,按语言批量跑 LLM, 446 + 同步执行内容理解:调用 product_annotator.analyze_products,按语言批量跑 LLM,
447 再聚合成每 SPU 的 qanchors、semantic_attributes、tags。供 run_in_executor 调用。 447 再聚合成每 SPU 的 qanchors、semantic_attributes、tags。供 run_in_executor 调用。
448 """ 448 """
449 - from indexer.process_products import analyze_products, SUPPORTED_LANGS 449 + from indexer.product_annotator import analyze_products, SUPPORTED_LANGS
450 450
451 llm_langs = [lang for lang in languages if lang in SUPPORTED_LANGS] 451 llm_langs = [lang for lang in languages if lang in SUPPORTED_LANGS]
452 if not llm_langs: 452 if not llm_langs:
@@ -544,7 +544,7 @@ async def enrich_content(request: EnrichContentRequest):
544 - 与 /indexer/build-docs 解耦,避免 build-docs 因 LLM 耗时过长而阻塞;调用方可 544 - 与 /indexer/build-docs 解耦,避免 build-docs 因 LLM 耗时过长而阻塞;调用方可
545 先拿不含 qanchors/tags 的 doc,再异步或离线补齐本接口结果后更新 ES。 545 先拿不含 qanchors/tags 的 doc,再异步或离线补齐本接口结果后更新 ES。
546 546
547 - 实现逻辑与 indexer.process_products.analyze_products 一致,支持多语言与 Redis 缓存。 547 + 实现逻辑与 indexer.product_annotator.analyze_products 一致,支持多语言与 Redis 缓存。
548 """ 548 """
549 try: 549 try:
550 if not request.items: 550 if not request.items:
config/config.yaml
@@ -161,7 +161,7 @@ services:
161 base_url: "http://127.0.0.1:6007" 161 base_url: "http://127.0.0.1:6007"
162 service_url: "http://127.0.0.1:6007/rerank" 162 service_url: "http://127.0.0.1:6007/rerank"
163 # 服务内后端(reranker 进程启动时读取) 163 # 服务内后端(reranker 进程启动时读取)
164 - backend: "dashscope_rerank" # bge | qwen3_vllm | qwen3_transformers | dashscope_rerank 164 + backend: "qwen3_vllm" # bge | qwen3_vllm | qwen3_transformers | dashscope_rerank
165 backends: 165 backends:
166 bge: 166 bge:
167 model_name: "BAAI/bge-reranker-v2-m3" 167 model_name: "BAAI/bge-reranker-v2-m3"
docs/搜索API对接指南.md
@@ -1484,7 +1484,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \
1484 ### 5.8 内容理解字段生成接口 1484 ### 5.8 内容理解字段生成接口
1485 1485
1486 - **端点**: `POST /indexer/enrich-content` 1486 - **端点**: `POST /indexer/enrich-content`
1487 -- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.process_products` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。 1487 +- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.product_annotator` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。
1488 1488
1489 #### 请求参数 1489 #### 请求参数
1490 1490
indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md
@@ -82,14 +82,14 @@
82 82
83 --- 83 ---
84 84
85 -### 2. LLM 分析服务:`indexer/process_products.py` 85 +### 2. LLM 分析服务:`indexer/product_annotator.py`
86 86
87 #### 2.1 入口函数:`analyze_products` 87 #### 2.1 入口函数:`analyze_products`
88 88
89 -- **文件**:`indexer/process_products.py` 89 +- **文件**:`indexer/product_annotator.py`
90 - **函数签名**: 90 - **函数签名**:
91 91
92 -```365:392:/home/tw/saas-search/indexer/process_products.py 92 +```365:392:/home/tw/saas-search/indexer/product_annotator.py
93 def analyze_products( 93 def analyze_products(
94 products: List[Dict[str, str]], 94 products: List[Dict[str, str]],
95 target_lang: str = "zh", 95 target_lang: str = "zh",
@@ -108,7 +108,7 @@ def analyze_products(
108 108
109 - **支持的输出语言**(在同文件中定义): 109 - **支持的输出语言**(在同文件中定义):
110 110
111 -```54:62:/home/tw/saas-search/indexer/process_products.py 111 +```54:62:/home/tw/saas-search/indexer/product_annotator.py
112 LANG_LABELS: Dict[str, str] = { 112 LANG_LABELS: Dict[str, str] = {
113 "zh": "中文", 113 "zh": "中文",
114 "en": "英文", 114 "en": "英文",
@@ -148,7 +148,7 @@ SUPPORTED_LANGS = set(LANG_LABELS.keys())
148 148
149 - Prompt 中会明确要求“**所有输出内容使用目标语言**”,并给出中英文示例: 149 - Prompt 中会明确要求“**所有输出内容使用目标语言**”,并给出中英文示例:
150 150
151 -```65:81:/home/tw/saas-search/indexer/process_products.py 151 +```65:81:/home/tw/saas-search/indexer/product_annotator.py
152 def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: 152 def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str:
153 """创建LLM提示词(根据目标语言输出)""" 153 """创建LLM提示词(根据目标语言输出)"""
154 lang_label = LANG_LABELS.get(target_lang, "对应语言") 154 lang_label = LANG_LABELS.get(target_lang, "对应语言")
@@ -170,7 +170,7 @@ def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> st
170 170
171 - 返回格式固定为 Markdown 表格,首行头为: 171 - 返回格式固定为 Markdown 表格,首行头为:
172 172
173 -```89:91:/home/tw/saas-search/indexer/process_products.py 173 +```89:91:/home/tw/saas-search/indexer/product_annotator.py
174 | 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 | 174 | 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 |
175 |----|----|----|----|----|----|----|----|----|----|----|----| 175 |----|----|----|----|----|----|----|----|----|----|----|----|
176 ``` 176 ```
indexer/document_transformer.py
@@ -14,7 +14,7 @@ import logging
14 import re 14 import re
15 from typing import Dict, Any, Optional, List 15 from typing import Dict, Any, Optional, List
16 from config import ConfigLoader 16 from config import ConfigLoader
17 -from indexer.process_products import analyze_products, SUPPORTED_LANGS 17 +from indexer.product_annotator import analyze_products, SUPPORTED_LANGS
18 18
19 logger = logging.getLogger(__name__) 19 logger = logging.getLogger(__name__)
20 20
@@ -641,7 +641,7 @@ class SPUDocumentTransformer:
641 641
642 def _fill_llm_attributes(self, doc: Dict[str, Any], spu_row: pd.Series) -> None: 642 def _fill_llm_attributes(self, doc: Dict[str, Any], spu_row: pd.Series) -> None:
643 """ 643 """
644 - 调用 indexer.process_products.analyze_products,为当前 SPU 填充: 644 + 调用 indexer.product_annotator.analyze_products,为当前 SPU 填充:
645 - qanchors.{lang} 645 - qanchors.{lang}
646 - semantic_attributes (lang/name/value) 646 - semantic_attributes (lang/name/value)
647 """ 647 """
indexer/process_products.py renamed to indexer/product_annotator.py
@@ -43,20 +43,28 @@ OUTPUT_DIR = Path("output_logs")
43 OUTPUT_FILE = OUTPUT_DIR / "products_analyzed.csv" 43 OUTPUT_FILE = OUTPUT_DIR / "products_analyzed.csv"
44 LOG_DIR = OUTPUT_DIR / "logs" 44 LOG_DIR = OUTPUT_DIR / "logs"
45 45
46 -# 设置日志 46 +# 设置独立日志(不影响全局 indexer.log)
47 LOG_DIR.mkdir(parents=True, exist_ok=True) 47 LOG_DIR.mkdir(parents=True, exist_ok=True)
48 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 48 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
49 log_file = LOG_DIR / f"process_{timestamp}.log" 49 log_file = LOG_DIR / f"process_{timestamp}.log"
50 50
51 -logging.basicConfig(  
52 - level=logging.INFO,  
53 - format='%(asctime)s - %(levelname)s - %(message)s',  
54 - handlers=[  
55 - logging.FileHandler(log_file, encoding='utf-8'),  
56 - logging.StreamHandler()  
57 - ]  
58 -)  
59 -logger = logging.getLogger(__name__) 51 +logger = logging.getLogger("product_annotator")
  52 +logger.setLevel(logging.INFO)
  53 +
  54 +if not logger.handlers:
  55 + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
  56 +
  57 + file_handler = logging.FileHandler(log_file, encoding="utf-8")
  58 + file_handler.setFormatter(formatter)
  59 +
  60 + stream_handler = logging.StreamHandler()
  61 + stream_handler.setFormatter(formatter)
  62 +
  63 + logger.addHandler(file_handler)
  64 + logger.addHandler(stream_handler)
  65 +
  66 + # 避免日志向根 logger 传播,防止写入 logs/indexer.log 等其他文件
  67 + logger.propagate = False
60 68
61 69
62 # Redis 缓存(用于 anchors / 语义属性) 70 # Redis 缓存(用于 anchors / 语义属性)
scripts/service_ctl.sh
@@ -868,6 +868,7 @@ Special targets:
868 868
869 Examples: 869 Examples:
870 ./scripts/service_ctl.sh up all 870 ./scripts/service_ctl.sh up all
  871 + ./scripts/service_ctl.sh up tei cnclip embedding translator reranker
871 ./scripts/service_ctl.sh up backend indexer frontend 872 ./scripts/service_ctl.sh up backend indexer frontend
872 ./scripts/service_ctl.sh restart 873 ./scripts/service_ctl.sh restart
873 ./scripts/service_ctl.sh monitor-start all 874 ./scripts/service_ctl.sh monitor-start all
tests/ci/test_service_api_contracts.py
@@ -342,7 +342,7 @@ def test_indexer_build_docs_from_db_contract(indexer_client: TestClient):
342 342
343 343
344 def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch): 344 def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch):
345 - import indexer.process_products as process_products 345 + import indexer.product_annotator as process_products
346 346
347 def _fake_analyze_products( 347 def _fake_analyze_products(
348 products: List[Dict[str, str]], 348 products: List[Dict[str, str]],
tests/test_process_products_batching.py
@@ -2,7 +2,7 @@ from __future__ import annotations
2 2
3 from typing import Any, Dict, List 3 from typing import Any, Dict, List
4 4
5 -import indexer.process_products as process_products 5 +import indexer.product_annotator as process_products
6 6
7 7
8 def _mk_products(n: int) -> List[Dict[str, str]]: 8 def _mk_products(n: int) -> List[Dict[str, str]]: