diff --git a/api/routes/indexer.py b/api/routes/indexer.py index 74dc674..57bef0c 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -443,10 +443,10 @@ async def build_docs_from_db(request: BuildDocsFromDbRequest): def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]: """ - 同步执行内容理解:调用 process_products.analyze_products,按语言批量跑 LLM, + 同步执行内容理解:调用 product_annotator.analyze_products,按语言批量跑 LLM, 再聚合成每 SPU 的 qanchors、semantic_attributes、tags。供 run_in_executor 调用。 """ - from indexer.process_products import analyze_products, SUPPORTED_LANGS + from indexer.product_annotator import analyze_products, SUPPORTED_LANGS llm_langs = [lang for lang in languages if lang in SUPPORTED_LANGS] if not llm_langs: @@ -544,7 +544,7 @@ async def enrich_content(request: EnrichContentRequest): - 与 /indexer/build-docs 解耦,避免 build-docs 因 LLM 耗时过长而阻塞;调用方可 先拿不含 qanchors/tags 的 doc,再异步或离线补齐本接口结果后更新 ES。 - 实现逻辑与 indexer.process_products.analyze_products 一致,支持多语言与 Redis 缓存。 + 实现逻辑与 indexer.product_annotator.analyze_products 一致,支持多语言与 Redis 缓存。 """ try: if not request.items: diff --git a/config/config.yaml b/config/config.yaml index 8d29fd7..e48bf88 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -161,7 +161,7 @@ services: base_url: "http://127.0.0.1:6007" service_url: "http://127.0.0.1:6007/rerank" # 服务内后端(reranker 进程启动时读取) - backend: "dashscope_rerank" # bge | qwen3_vllm | qwen3_transformers | dashscope_rerank + backend: "qwen3_vllm" # bge | qwen3_vllm | qwen3_transformers | dashscope_rerank backends: bge: model_name: "BAAI/bge-reranker-v2-m3" diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 3bb2470..47e0e07 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -1484,7 +1484,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ ### 5.8 内容理解字段生成接口 - **端点**: `POST /indexer/enrich-content` -- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.process_products` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。 +- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.product_annotator` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。 #### 请求参数 diff --git a/indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md b/indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md index 25abe52..9b4f5fb 100644 --- a/indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md +++ b/indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md @@ -82,14 +82,14 @@ --- -### 2. LLM 分析服务:`indexer/process_products.py` +### 2. LLM 分析服务:`indexer/product_annotator.py` #### 2.1 入口函数:`analyze_products` -- **文件**:`indexer/process_products.py` +- **文件**:`indexer/product_annotator.py` - **函数签名**: -```365:392:/home/tw/saas-search/indexer/process_products.py +```365:392:/home/tw/saas-search/indexer/product_annotator.py def analyze_products( products: List[Dict[str, str]], target_lang: str = "zh", @@ -108,7 +108,7 @@ def analyze_products( - **支持的输出语言**(在同文件中定义): -```54:62:/home/tw/saas-search/indexer/process_products.py +```54:62:/home/tw/saas-search/indexer/product_annotator.py LANG_LABELS: Dict[str, str] = { "zh": "中文", "en": "英文", @@ -148,7 +148,7 @@ SUPPORTED_LANGS = set(LANG_LABELS.keys()) - Prompt 中会明确要求“**所有输出内容使用目标语言**”,并给出中英文示例: -```65:81:/home/tw/saas-search/indexer/process_products.py +```65:81:/home/tw/saas-search/indexer/product_annotator.py def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: """创建LLM提示词(根据目标语言输出)""" lang_label = LANG_LABELS.get(target_lang, "对应语言") @@ -170,7 +170,7 @@ def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> st - 返回格式固定为 Markdown 表格,首行头为: -```89:91:/home/tw/saas-search/indexer/process_products.py +```89:91:/home/tw/saas-search/indexer/product_annotator.py | 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 | |----|----|----|----|----|----|----|----|----|----|----|----| ``` diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index 7d5dff6..83cacd3 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -14,7 +14,7 @@ import logging import re from typing import Dict, Any, Optional, List from config import ConfigLoader -from indexer.process_products import analyze_products, SUPPORTED_LANGS +from indexer.product_annotator import analyze_products, SUPPORTED_LANGS logger = logging.getLogger(__name__) @@ -641,7 +641,7 @@ class SPUDocumentTransformer: def _fill_llm_attributes(self, doc: Dict[str, Any], spu_row: pd.Series) -> None: """ - 调用 indexer.process_products.analyze_products,为当前 SPU 填充: + 调用 indexer.product_annotator.analyze_products,为当前 SPU 填充: - qanchors.{lang} - semantic_attributes (lang/name/value) """ diff --git a/indexer/process_products.py b/indexer/process_products.py deleted file mode 100644 index 2dccda6..0000000 --- a/indexer/process_products.py +++ /dev/null @@ -1,681 +0,0 @@ -#!/usr/bin/env python3 -""" -商品品类分析脚本 -批量读取商品标题,调用大模型进行品类分析,并保存结果 -""" - -import csv -import os -import json -import logging -import time -import hashlib -from datetime import datetime -from typing import List, Dict, Tuple, Any, Optional - -import redis -import requests -from pathlib import Path -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry - -from config.env_config import REDIS_CONFIG - -# 配置 -BATCH_SIZE = 20 -# 华北2(北京):https://dashscope.aliyuncs.com/compatible-mode/v1 -# 新加坡:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 -# 美国(弗吉尼亚):https://dashscope-us.aliyuncs.com/compatible-mode/v1 -API_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" -MODEL_NAME = "qwen-flash" -API_KEY = os.environ.get("DASHSCOPE_API_KEY") -MAX_RETRIES = 3 -RETRY_DELAY = 5 # 秒 -REQUEST_TIMEOUT = 180 # 秒 - -# 禁用代理 -os.environ['NO_PROXY'] = '*' -os.environ['no_proxy'] = '*' - -# 文件路径 -INPUT_FILE = "saas_170_products.csv" -OUTPUT_DIR = Path("output_logs") -OUTPUT_FILE = OUTPUT_DIR / "products_analyzed.csv" -LOG_DIR = OUTPUT_DIR / "logs" - -# 设置日志 -LOG_DIR.mkdir(parents=True, exist_ok=True) -timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") -log_file = LOG_DIR / f"process_{timestamp}.log" - -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(log_file, encoding='utf-8'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - - -# Redis 缓存(用于 anchors / 语义属性) -ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors") -ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30)) -_anchor_redis: Optional[redis.Redis] = None - -try: - _anchor_redis = redis.Redis( - host=REDIS_CONFIG.get("host", "localhost"), - port=REDIS_CONFIG.get("port", 6479), - password=REDIS_CONFIG.get("password"), - decode_responses=True, - socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), - socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), - retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), - health_check_interval=10, - ) - _anchor_redis.ping() - logger.info("Redis cache initialized for product anchors and semantic attributes") -except Exception as e: - logger.warning(f"Failed to initialize Redis for anchors cache: {e}") - _anchor_redis = None - - -LANG_LABELS: Dict[str, str] = { - "zh": "中文", - "en": "英文", - "de": "德文", - "ru": "俄文", - "fr": "法文", -} - -SUPPORTED_LANGS = set(LANG_LABELS.keys()) - -SYSTEM_MESSAGES: Dict[str, str] = { - "zh": ( - "你是一名电商平台的商品标注员,你的工作是对输入的每个商品进行理解、分析和标注," - "并按要求格式返回 Markdown 表格。所有输出内容必须为中文。" - ), - "en": ( - "You are a product annotator for an e-commerce platform. " - "For each input product, you must understand, analyze and label it, " - "and return a Markdown table strictly following the requested format. " - "All output must be in English." - ), - "de": ( - "Du bist ein Produktannotator für eine E‑Commerce‑Plattform. " - "Du sollst jedes Eingabeprodukt verstehen, analysieren und beschriften " - "und eine Markdown-Tabelle im geforderten Format zurückgeben. " - "Alle Ausgaben müssen auf Deutsch sein." - ), - "ru": ( - "Вы — разметчик товаров для платформы электронной коммерции. " - "Ваша задача — понимать, анализировать и размечать каждый товар " - "и возвращать таблицу Markdown в требуемом формате. " - "Весь вывод должен быть на русском языке." - ), - "fr": ( - "Vous êtes annotateur de produits pour une plateforme e‑commerce. " - "Pour chaque produit en entrée, vous devez le comprendre, l’analyser et l’annoter, " - "puis renvoyer un tableau Markdown au format demandé. " - "Toute la sortie doit être en français." - ), -} - - -def _make_anchor_cache_key( - title: str, - target_lang: str, - tenant_id: Optional[str] = None, -) -> str: - """构造 anchors/语义属性的缓存 key。""" - base = (tenant_id or "global").strip() - h = hashlib.md5(title.encode("utf-8")).hexdigest() - return f"{ANCHOR_CACHE_PREFIX}:{base}:{target_lang}:{h}" - - -def _get_cached_anchor_result( - title: str, - target_lang: str, - tenant_id: Optional[str] = None, -) -> Optional[Dict[str, Any]]: - if not _anchor_redis: - return None - try: - key = _make_anchor_cache_key(title, target_lang, tenant_id) - raw = _anchor_redis.get(key) - if not raw: - return None - return json.loads(raw) - except Exception as e: - logger.warning(f"Failed to get anchor cache: {e}") - return None - - -def _set_cached_anchor_result( - title: str, - target_lang: str, - result: Dict[str, Any], - tenant_id: Optional[str] = None, -) -> None: - if not _anchor_redis: - return - try: - key = _make_anchor_cache_key(title, target_lang, tenant_id) - ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600 - _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False)) - except Exception as e: - logger.warning(f"Failed to set anchor cache: {e}") - - -def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: - """根据目标语言创建 LLM 提示词和表头说明。""" - if target_lang == "en": - prompt = """Please analyze each input product title and extract the following information: - -1. Product title: a natural English product name derived from the input title -2. Category path: from broad to fine-grained category, separated by ">" (e.g. Clothing>Women>Dresses>Work Dress) -3. Fine-grained tags: style / features / attributes (e.g. floral, waist-cinching, French style) -4. Target audience: gender / age group, etc. (e.g. young women) -5. Usage scene -6. Applicable season -7. Key attributes -8. Material description -9. Functional features -10. Selling point: one concise key selling sentence for recommendation -11. Anchor text: a set of words or phrases that could be used by users as search queries for this product, covering category, fine-grained tags, functional attributes, usage scenes, etc. - -Input product list: - -""" - prompt_tail = """ -Please strictly return a Markdown table in the following format. For any column that can contain multiple values, separate values with commas. Do not add any other explanations: - -| No. | Product title | Category path | Fine-grained tags | Target audience | Usage scene | Season | Key attributes | Material | Features | Selling point | Anchor text | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - elif target_lang == "de": - prompt = """Bitte analysiere jeden eingegebenen Produkttitel und extrahiere die folgenden Informationen: - -1. Produkttitel: ein natürlicher deutscher Produkttitel basierend auf dem Eingangstitel -2. Kategoriepfad: von Oberkategorie bis Feinkategorie, getrennt durch ">" (z. B. Kleidung>Damen>Kleider>Businesskleid) -3. Feinkörnige Tags: Stil / Merkmale / Eigenschaften (z. B. Blumenmuster, tailliert, französischer Stil) -4. Zielgruppe: Geschlecht / Altersgruppe usw. (z. B. junge Frauen) -5. Einsatzszenario -6. Geeignete Saison -7. Wichtige Attribute -8. Materialbeschreibung -9. Funktionale Merkmale -10. Verkaufsargument: ein prägnanter, einzeiliger Haupt-Selling-Point für Empfehlungen -11. Ankertexte: eine Menge von Wörtern oder Phrasen, die Nutzer als Suchanfragen für dieses Produkt verwenden könnten und die Kategorie, feine Tags, Funktion und Nutzungsszenarien abdecken. - -Eingabeliste der Produkte: - -""" - prompt_tail = """ -Gib bitte strikt eine Markdown-Tabelle im folgenden Format zurück. Mehrere Werte in einer Spalte werden durch Kommas getrennt. Füge keine weiteren Erklärungen hinzu: - -| Nr. | Produkttitel | Kategoriepfad | Feintags | Zielgruppe | Einsatzszenario | Saison | Wichtige Attribute | Material | Merkmale | Verkaufsargument | Ankertexte | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - elif target_lang == "ru": - prompt = """Пожалуйста, проанализируйте каждый входной заголовок товара и извлеките следующую информацию: - -1. Заголовок товара: естественное русскоязычное название товара на основе исходного заголовка -2. Путь категории: от широкой до узкой категории, разделённый символом ">" (например: Одежда>Женская одежда>Платья>Деловое платье) -3. Детализированные теги: стиль / особенности / характеристики (например: цветочный принт, приталенный, французский стиль) -4. Целевая аудитория: пол / возрастная группа и т. п. (например: молодые женщины) -5. Сценарий использования -6. Подходящий сезон -7. Ключевые характеристики -8. Описание материала -9. Функциональные особенности -10. Торговое преимущество: одно краткое ключевое предложение для рекомендаций -11. Якорные запросы: набор слов или фраз, которые пользователи могут использовать в качестве поисковых запросов для этого товара, покрывающих категорию, детализированные теги, функциональные характеристики, сценарии использования и т. д. - -Список входных товаров: - -""" - prompt_tail = """ -Пожалуйста, строго верните Markdown‑таблицу в следующем формате. Для колонок с несколькими значениями разделяйте значения запятыми. Не добавляйте никаких дополнительных пояснений: - -| № | Заголовок товара | Путь категории | Детализированные теги | Целевая аудитория | Сценарий использования | Сезон | Ключевые характеристики | Материал | Особенности | Торговое преимущество | Якорные запросы | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - elif target_lang == "fr": - prompt = """Veuillez analyser chaque titre de produit en entrée et extraire les informations suivantes : - -1. Titre du produit : un titre de produit naturel en français basé sur le titre d’origine -2. Chemin de catégorie : de la catégorie la plus large à la plus fine, séparées par ">" (par ex. Vêtements>Femme>Robes>Robe de travail) -3. Tags détaillés : style / caractéristiques / attributs (par ex. fleuri, cintré, style français) -4. Public cible : sexe / tranche d’âge, etc. (par ex. jeunes femmes) -5. Scénario d’utilisation -6. Saison adaptée -7. Attributs clés -8. Description du matériau -9. Caractéristiques fonctionnelles -10. Argument de vente : une phrase concise résumant le principal atout pour la recommandation -11. Texte d’ancrage : un ensemble de mots ou d’expressions que les utilisateurs pourraient saisir comme requêtes de recherche pour ce produit, couvrant la catégorie, les tags détaillés, les fonctions, les scénarios d’usage, etc. - -Liste des produits en entrée : - -""" - prompt_tail = """ -Veuillez strictement renvoyer un tableau Markdown au format suivant. Pour toute colonne pouvant contenir plusieurs valeurs, séparez‑les par des virgules. N’ajoutez aucune autre explication : - -| N° | Titre du produit | Chemin de catégorie | Tags détaillés | Public cible | Scénario d’utilisation | Saison | Attributs clés | Matériau | Caractéristiques | Argument de vente | Texte d’ancrage | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - else: - # 默认中文版本 - prompt = """请对输入的每条商品标题,分析并提取以下信息: - -1. 商品标题:将输入商品名称翻译为自然、完整的中文商品标题 -2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤) -3. 细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式) -4. 适用人群:性别/年龄段等(例如:年轻女性) -5. 使用场景 -6. 适用季节 -7. 关键属性 -8. 材质说明 -9. 功能特点 -10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由 -11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。 - -输入商品列表: - -""" - prompt_tail = """ -请严格按照以下markdown表格格式返回,每列内部的多值内容都用逗号分隔,不要添加任何其他说明: - -| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 | -|----|----|----|----|----|----|----|----|----|----|----|----| -""" - - for idx, product in enumerate(products, 1): - prompt += f'{idx}. {product["title"]}\n' - prompt += prompt_tail - - return prompt - - -def call_llm(prompt: str, target_lang: str = "zh") -> Tuple[str, str]: - """调用大模型API(带重试机制),按目标语言选择系统提示词。""" - headers = { - "Authorization": f"Bearer {API_KEY}", - "Content-Type": "application/json" - } - - payload = { - "model": MODEL_NAME, - "messages": [ - { - "role": "system", - "content": SYSTEM_MESSAGES.get(target_lang, SYSTEM_MESSAGES["zh"]) - }, - { - "role": "user", - "content": prompt - } - ], - "temperature": 0.3, - "top_p": 0.8 - } - - request_data = { - "headers": {k: v for k, v in headers.items() if k != "Authorization"}, - "payload": payload - } - - logger.info(f"\n{'='*80}") - logger.info(f"LLM Request (Model: {MODEL_NAME}):") - logger.info(json.dumps(request_data, ensure_ascii=False, indent=2)) - logger.info(f"\nPrompt:\n{prompt}") - - # 创建session,禁用代理 - session = requests.Session() - session.trust_env = False # 忽略系统代理设置 - - try: - # 重试机制 - for attempt in range(MAX_RETRIES): - try: - response = session.post( - f"{API_BASE_URL}/chat/completions", - headers=headers, - json=payload, - timeout=REQUEST_TIMEOUT, - proxies={"http": None, "https": None} # 明确禁用代理 - ) - - response.raise_for_status() - result = response.json() - - logger.info(f"\nLLM Response:") - logger.info(json.dumps(result, ensure_ascii=False, indent=2)) - - content = result["choices"][0]["message"]["content"] - logger.info(f"\nExtracted Content:\n{content}") - - return content, json.dumps(result, ensure_ascii=False) - - except requests.exceptions.ProxyError as e: - logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Proxy error - {str(e)}") - if attempt < MAX_RETRIES - 1: - logger.info(f"Retrying in {RETRY_DELAY} seconds...") - time.sleep(RETRY_DELAY) - else: - raise - - except requests.exceptions.RequestException as e: - logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Request error - {str(e)}") - if attempt < MAX_RETRIES - 1: - logger.info(f"Retrying in {RETRY_DELAY} seconds...") - time.sleep(RETRY_DELAY) - else: - raise - - except Exception as e: - logger.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES}: {str(e)}") - if attempt < MAX_RETRIES - 1: - logger.info(f"Retrying in {RETRY_DELAY} seconds...") - time.sleep(RETRY_DELAY) - else: - raise - - finally: - session.close() - - -def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: - """解析markdown表格内容""" - lines = markdown_content.strip().split('\n') - data = [] - data_started = False - - for line in lines: - line = line.strip() - if not line: - continue - - # 表格行处理 - if line.startswith('|'): - # 分隔行(---- 或 :---: 等;允许空格,如 "| ---- | ---- |") - sep_chars = line.replace('|', '').strip().replace(' ', '') - if sep_chars and set(sep_chars) <= {'-', ':'}: - data_started = True - continue - - # 首个表头行:无论语言如何,统一跳过 - if not data_started: - # 等待下一行数据行 - continue - - # 解析数据行 - parts = [p.strip() for p in line.split('|')] - parts = [p for p in parts if p] # 移除空字符串 - - if len(parts) >= 2: - row = { - "seq_no": parts[0], - "title": parts[1], # 商品标题(按目标语言) - "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 - "tags": parts[3] if len(parts) > 3 else "", # 细分标签 - "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 - "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景 - "season": parts[6] if len(parts) > 6 else "", # 适用季节 - "key_attributes": parts[7] if len(parts) > 7 else "", # 关键属性 - "material": parts[8] if len(parts) > 8 else "", # 材质说明 - "features": parts[9] if len(parts) > 9 else "", # 功能特点 - "selling_points": parts[10] if len(parts) > 10 else "", # 商品卖点 - "anchor_text": parts[11] if len(parts) > 11 else "" # 锚文本 - } - data.append(row) - - return data - - -def process_batch( - batch_data: List[Dict[str, str]], - batch_num: int, - target_lang: str = "zh" -) -> List[Dict[str, str]]: - """处理一个批次的数据""" - logger.info(f"\n{'#'*80}") - logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)") - - # 创建提示词 - prompt = create_prompt(batch_data, target_lang=target_lang) - - # 调用LLM - try: - raw_response, full_response_json = call_llm(prompt, target_lang=target_lang) - - # 解析结果 - parsed_results = parse_markdown_table(raw_response) - - logger.info(f"\nParsed Results ({len(parsed_results)} items):") - logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2)) - - # 映射回原始ID - results_with_ids = [] - for i, parsed_item in enumerate(parsed_results): - if i < len(batch_data): - original_id = batch_data[i]["id"] - result = { - "id": original_id, - "lang": target_lang, - "title_input": batch_data[i]["title"], # 原始输入标题 - "title": parsed_item.get("title", ""), # 模型生成的标题 - "category_path": parsed_item.get("category_path", ""), # 品类路径 - "tags": parsed_item.get("tags", ""), # 细分标签 - "target_audience": parsed_item.get("target_audience", ""), # 适用人群 - "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景 - "season": parsed_item.get("season", ""), # 适用季节 - "key_attributes": parsed_item.get("key_attributes", ""), # 关键属性 - "material": parsed_item.get("material", ""), # 材质说明 - "features": parsed_item.get("features", ""), # 功能特点 - "selling_points": parsed_item.get("selling_points", ""), # 商品卖点 - "anchor_text": parsed_item.get("anchor_text", "") # 锚文本 - } - results_with_ids.append(result) - logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}") - - # 保存日志 - batch_log = { - "batch_num": batch_num, - "timestamp": datetime.now().isoformat(), - "input_products": batch_data, - "raw_response": raw_response, - "full_response_json": full_response_json, - "parsed_results": parsed_results, - "final_results": results_with_ids - } - - batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}.json" - with open(batch_log_file, 'w', encoding='utf-8') as f: - json.dump(batch_log, f, ensure_ascii=False, indent=2) - - logger.info(f"Batch log saved to: {batch_log_file}") - - return results_with_ids - - except Exception as e: - logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True) - # 返回空结果,保持ID映射 - return [{ - "id": item["id"], - "lang": target_lang, - "title_input": item["title"], - "title": "", - "category_path": "", - "tags": "", - "target_audience": "", - "usage_scene": "", - "season": "", - "key_attributes": "", - "material": "", - "features": "", - "selling_points": "", - "anchor_text": "", - "error": str(e), - } for item in batch_data] - - -def read_products(input_file: str) -> List[Dict[str, str]]: - """读取CSV文件""" - products = [] - with open(input_file, 'r', encoding='utf-8') as f: - reader = csv.DictReader(f) - for row in reader: - products.append({ - "id": row["id"], - "title": row["title"] - }) - return products - - -def write_results(results: List[Dict[str, str]], output_file: Path): - """写入结果到CSV文件""" - output_file.parent.mkdir(parents=True, exist_ok=True) - - fieldnames = [ - "id", - "lang", - "title_input", - "title", - "category_path", - "tags", - "target_audience", - "usage_scene", - "season", - "key_attributes", - "material", - "features", - "selling_points", - "anchor_text", - ] - - with open(output_file, 'w', encoding='utf-8', newline='') as f: - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(results) - - logger.info(f"\nResults written to: {output_file}") - - -def main(): - """主函数""" - if not API_KEY: - logger.error("Error: DASHSCOPE_API_KEY environment variable not set!") - return - - logger.info(f"Starting product analysis process") - logger.info(f"Input file: {INPUT_FILE}") - logger.info(f"Output file: {OUTPUT_FILE}") - logger.info(f"Batch size: {BATCH_SIZE}") - logger.info(f"Model: {MODEL_NAME}") - - # 读取产品数据 - logger.info(f"\nReading products from {INPUT_FILE}...") - products = read_products(INPUT_FILE) - logger.info(f"Total products to process: {len(products)}") - - # 分批处理 - all_results = [] - total_batches = (len(products) + BATCH_SIZE - 1) // BATCH_SIZE - - for i in range(0, len(products), BATCH_SIZE): - batch_num = i // BATCH_SIZE + 1 - batch = products[i:i + BATCH_SIZE] - - logger.info(f"\nProgress: Batch {batch_num}/{total_batches}") - - results = process_batch(batch, batch_num, target_lang="zh") - all_results.extend(results) - - # 每处理完一个批次就写入一次(断点续传) - write_results(all_results, OUTPUT_FILE) - logger.info(f"Progress saved: {len(all_results)}/{len(products)} items completed") - - logger.info(f"\n{'='*80}") - logger.info(f"Processing completed!") - logger.info(f"Total processed: {len(all_results)} items") - logger.info(f"Output file: {OUTPUT_FILE}") - logger.info(f"Log file: {log_file}") - - -if __name__ == "__main__": - main() - - -def analyze_products( - products: List[Dict[str, str]], - target_lang: str = "zh", - batch_size: Optional[int] = None, - tenant_id: Optional[str] = None, -) -> List[Dict[str, Any]]: - """ - 库调用入口:根据输入+语言,返回锚文本及各维度信息。 - - Args: - products: [{"id": "...", "title": "..."}] - target_lang: 输出语言,需在 SUPPORTED_LANGS 内 - batch_size: 批大小,默认使用全局 BATCH_SIZE - """ - if not API_KEY: - raise RuntimeError("DASHSCOPE_API_KEY is not set, cannot call LLM") - - if target_lang not in SUPPORTED_LANGS: - raise ValueError(f"Unsupported target_lang={target_lang}, supported={sorted(SUPPORTED_LANGS)}") - - if not products: - return [] - - # 简单路径:索引阶段通常 batch_size=1,这里优先做单条缓存命中 - if len(products) == 1: - p = products[0] - title = str(p.get("title") or "").strip() - if title: - cached = _get_cached_anchor_result(title, target_lang, tenant_id=tenant_id) - if cached: - logger.info( - f"[analyze_products] Cache hit for title='{title[:50]}...', " - f"lang={target_lang}, tenant_id={tenant_id or 'global'}" - ) - return [cached] - - # call_llm 一次处理上限固定为 BATCH_SIZE(默认 20): - # - 尽可能攒批处理; - # - 即便调用方传入更大的 batch_size,也会自动按上限拆批。 - req_bs = BATCH_SIZE if batch_size is None else int(batch_size) - bs = max(1, min(req_bs, BATCH_SIZE)) - all_results: List[Dict[str, Any]] = [] - total_batches = (len(products) + bs - 1) // bs - - for i in range(0, len(products), bs): - batch_num = i // bs + 1 - batch = products[i:i + bs] - logger.info( - f"[analyze_products] Processing batch {batch_num}/{total_batches}, " - f"size={len(batch)}, target_lang={target_lang}" - ) - batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang) - all_results.extend(batch_results) - - # 写入缓存 - for item in batch_results: - title_input = str(item.get("title_input") or "").strip() - if not title_input: - continue - if item.get("error"): - # 不缓存错误结果,避免放大临时故障 - continue - try: - _set_cached_anchor_result(title_input, target_lang, item, tenant_id=tenant_id) - except Exception: - # 已在内部记录 warning - pass - - return all_results diff --git a/indexer/product_annotator.py b/indexer/product_annotator.py new file mode 100644 index 0000000..a94eb67 --- /dev/null +++ b/indexer/product_annotator.py @@ -0,0 +1,689 @@ +#!/usr/bin/env python3 +""" +商品品类分析脚本 +批量读取商品标题,调用大模型进行品类分析,并保存结果 +""" + +import csv +import os +import json +import logging +import time +import hashlib +from datetime import datetime +from typing import List, Dict, Tuple, Any, Optional + +import redis +import requests +from pathlib import Path +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from config.env_config import REDIS_CONFIG + +# 配置 +BATCH_SIZE = 20 +# 华北2(北京):https://dashscope.aliyuncs.com/compatible-mode/v1 +# 新加坡:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 +# 美国(弗吉尼亚):https://dashscope-us.aliyuncs.com/compatible-mode/v1 +API_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" +MODEL_NAME = "qwen-flash" +API_KEY = os.environ.get("DASHSCOPE_API_KEY") +MAX_RETRIES = 3 +RETRY_DELAY = 5 # 秒 +REQUEST_TIMEOUT = 180 # 秒 + +# 禁用代理 +os.environ['NO_PROXY'] = '*' +os.environ['no_proxy'] = '*' + +# 文件路径 +INPUT_FILE = "saas_170_products.csv" +OUTPUT_DIR = Path("output_logs") +OUTPUT_FILE = OUTPUT_DIR / "products_analyzed.csv" +LOG_DIR = OUTPUT_DIR / "logs" + +# 设置独立日志(不影响全局 indexer.log) +LOG_DIR.mkdir(parents=True, exist_ok=True) +timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") +log_file = LOG_DIR / f"process_{timestamp}.log" + +logger = logging.getLogger("product_annotator") +logger.setLevel(logging.INFO) + +if not logger.handlers: + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + + file_handler = logging.FileHandler(log_file, encoding="utf-8") + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + # 避免日志向根 logger 传播,防止写入 logs/indexer.log 等其他文件 + logger.propagate = False + + +# Redis 缓存(用于 anchors / 语义属性) +ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors") +ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30)) +_anchor_redis: Optional[redis.Redis] = None + +try: + _anchor_redis = redis.Redis( + host=REDIS_CONFIG.get("host", "localhost"), + port=REDIS_CONFIG.get("port", 6479), + password=REDIS_CONFIG.get("password"), + decode_responses=True, + socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), + socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), + retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), + health_check_interval=10, + ) + _anchor_redis.ping() + logger.info("Redis cache initialized for product anchors and semantic attributes") +except Exception as e: + logger.warning(f"Failed to initialize Redis for anchors cache: {e}") + _anchor_redis = None + + +LANG_LABELS: Dict[str, str] = { + "zh": "中文", + "en": "英文", + "de": "德文", + "ru": "俄文", + "fr": "法文", +} + +SUPPORTED_LANGS = set(LANG_LABELS.keys()) + +SYSTEM_MESSAGES: Dict[str, str] = { + "zh": ( + "你是一名电商平台的商品标注员,你的工作是对输入的每个商品进行理解、分析和标注," + "并按要求格式返回 Markdown 表格。所有输出内容必须为中文。" + ), + "en": ( + "You are a product annotator for an e-commerce platform. " + "For each input product, you must understand, analyze and label it, " + "and return a Markdown table strictly following the requested format. " + "All output must be in English." + ), + "de": ( + "Du bist ein Produktannotator für eine E‑Commerce‑Plattform. " + "Du sollst jedes Eingabeprodukt verstehen, analysieren und beschriften " + "und eine Markdown-Tabelle im geforderten Format zurückgeben. " + "Alle Ausgaben müssen auf Deutsch sein." + ), + "ru": ( + "Вы — разметчик товаров для платформы электронной коммерции. " + "Ваша задача — понимать, анализировать и размечать каждый товар " + "и возвращать таблицу Markdown в требуемом формате. " + "Весь вывод должен быть на русском языке." + ), + "fr": ( + "Vous êtes annotateur de produits pour une plateforme e‑commerce. " + "Pour chaque produit en entrée, vous devez le comprendre, l’analyser et l’annoter, " + "puis renvoyer un tableau Markdown au format demandé. " + "Toute la sortie doit être en français." + ), +} + + +def _make_anchor_cache_key( + title: str, + target_lang: str, + tenant_id: Optional[str] = None, +) -> str: + """构造 anchors/语义属性的缓存 key。""" + base = (tenant_id or "global").strip() + h = hashlib.md5(title.encode("utf-8")).hexdigest() + return f"{ANCHOR_CACHE_PREFIX}:{base}:{target_lang}:{h}" + + +def _get_cached_anchor_result( + title: str, + target_lang: str, + tenant_id: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + if not _anchor_redis: + return None + try: + key = _make_anchor_cache_key(title, target_lang, tenant_id) + raw = _anchor_redis.get(key) + if not raw: + return None + return json.loads(raw) + except Exception as e: + logger.warning(f"Failed to get anchor cache: {e}") + return None + + +def _set_cached_anchor_result( + title: str, + target_lang: str, + result: Dict[str, Any], + tenant_id: Optional[str] = None, +) -> None: + if not _anchor_redis: + return + try: + key = _make_anchor_cache_key(title, target_lang, tenant_id) + ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600 + _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False)) + except Exception as e: + logger.warning(f"Failed to set anchor cache: {e}") + + +def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: + """根据目标语言创建 LLM 提示词和表头说明。""" + if target_lang == "en": + prompt = """Please analyze each input product title and extract the following information: + +1. Product title: a natural English product name derived from the input title +2. Category path: from broad to fine-grained category, separated by ">" (e.g. Clothing>Women>Dresses>Work Dress) +3. Fine-grained tags: style / features / attributes (e.g. floral, waist-cinching, French style) +4. Target audience: gender / age group, etc. (e.g. young women) +5. Usage scene +6. Applicable season +7. Key attributes +8. Material description +9. Functional features +10. Selling point: one concise key selling sentence for recommendation +11. Anchor text: a set of words or phrases that could be used by users as search queries for this product, covering category, fine-grained tags, functional attributes, usage scenes, etc. + +Input product list: + +""" + prompt_tail = """ +Please strictly return a Markdown table in the following format. For any column that can contain multiple values, separate values with commas. Do not add any other explanations: + +| No. | Product title | Category path | Fine-grained tags | Target audience | Usage scene | Season | Key attributes | Material | Features | Selling point | Anchor text | +|----|----|----|----|----|----|----|----|----|----|----|----| +""" + elif target_lang == "de": + prompt = """Bitte analysiere jeden eingegebenen Produkttitel und extrahiere die folgenden Informationen: + +1. Produkttitel: ein natürlicher deutscher Produkttitel basierend auf dem Eingangstitel +2. Kategoriepfad: von Oberkategorie bis Feinkategorie, getrennt durch ">" (z. B. Kleidung>Damen>Kleider>Businesskleid) +3. Feinkörnige Tags: Stil / Merkmale / Eigenschaften (z. B. Blumenmuster, tailliert, französischer Stil) +4. Zielgruppe: Geschlecht / Altersgruppe usw. (z. B. junge Frauen) +5. Einsatzszenario +6. Geeignete Saison +7. Wichtige Attribute +8. Materialbeschreibung +9. Funktionale Merkmale +10. Verkaufsargument: ein prägnanter, einzeiliger Haupt-Selling-Point für Empfehlungen +11. Ankertexte: eine Menge von Wörtern oder Phrasen, die Nutzer als Suchanfragen für dieses Produkt verwenden könnten und die Kategorie, feine Tags, Funktion und Nutzungsszenarien abdecken. + +Eingabeliste der Produkte: + +""" + prompt_tail = """ +Gib bitte strikt eine Markdown-Tabelle im folgenden Format zurück. Mehrere Werte in einer Spalte werden durch Kommas getrennt. Füge keine weiteren Erklärungen hinzu: + +| Nr. | Produkttitel | Kategoriepfad | Feintags | Zielgruppe | Einsatzszenario | Saison | Wichtige Attribute | Material | Merkmale | Verkaufsargument | Ankertexte | +|----|----|----|----|----|----|----|----|----|----|----|----| +""" + elif target_lang == "ru": + prompt = """Пожалуйста, проанализируйте каждый входной заголовок товара и извлеките следующую информацию: + +1. Заголовок товара: естественное русскоязычное название товара на основе исходного заголовка +2. Путь категории: от широкой до узкой категории, разделённый символом ">" (например: Одежда>Женская одежда>Платья>Деловое платье) +3. Детализированные теги: стиль / особенности / характеристики (например: цветочный принт, приталенный, французский стиль) +4. Целевая аудитория: пол / возрастная группа и т. п. (например: молодые женщины) +5. Сценарий использования +6. Подходящий сезон +7. Ключевые характеристики +8. Описание материала +9. Функциональные особенности +10. Торговое преимущество: одно краткое ключевое предложение для рекомендаций +11. Якорные запросы: набор слов или фраз, которые пользователи могут использовать в качестве поисковых запросов для этого товара, покрывающих категорию, детализированные теги, функциональные характеристики, сценарии использования и т. д. + +Список входных товаров: + +""" + prompt_tail = """ +Пожалуйста, строго верните Markdown‑таблицу в следующем формате. Для колонок с несколькими значениями разделяйте значения запятыми. Не добавляйте никаких дополнительных пояснений: + +| № | Заголовок товара | Путь категории | Детализированные теги | Целевая аудитория | Сценарий использования | Сезон | Ключевые характеристики | Материал | Особенности | Торговое преимущество | Якорные запросы | +|----|----|----|----|----|----|----|----|----|----|----|----| +""" + elif target_lang == "fr": + prompt = """Veuillez analyser chaque titre de produit en entrée et extraire les informations suivantes : + +1. Titre du produit : un titre de produit naturel en français basé sur le titre d’origine +2. Chemin de catégorie : de la catégorie la plus large à la plus fine, séparées par ">" (par ex. Vêtements>Femme>Robes>Robe de travail) +3. Tags détaillés : style / caractéristiques / attributs (par ex. fleuri, cintré, style français) +4. Public cible : sexe / tranche d’âge, etc. (par ex. jeunes femmes) +5. Scénario d’utilisation +6. Saison adaptée +7. Attributs clés +8. Description du matériau +9. Caractéristiques fonctionnelles +10. Argument de vente : une phrase concise résumant le principal atout pour la recommandation +11. Texte d’ancrage : un ensemble de mots ou d’expressions que les utilisateurs pourraient saisir comme requêtes de recherche pour ce produit, couvrant la catégorie, les tags détaillés, les fonctions, les scénarios d’usage, etc. + +Liste des produits en entrée : + +""" + prompt_tail = """ +Veuillez strictement renvoyer un tableau Markdown au format suivant. Pour toute colonne pouvant contenir plusieurs valeurs, séparez‑les par des virgules. N’ajoutez aucune autre explication : + +| N° | Titre du produit | Chemin de catégorie | Tags détaillés | Public cible | Scénario d’utilisation | Saison | Attributs clés | Matériau | Caractéristiques | Argument de vente | Texte d’ancrage | +|----|----|----|----|----|----|----|----|----|----|----|----| +""" + else: + # 默认中文版本 + prompt = """请对输入的每条商品标题,分析并提取以下信息: + +1. 商品标题:将输入商品名称翻译为自然、完整的中文商品标题 +2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤) +3. 细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式) +4. 适用人群:性别/年龄段等(例如:年轻女性) +5. 使用场景 +6. 适用季节 +7. 关键属性 +8. 材质说明 +9. 功能特点 +10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由 +11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。 + +输入商品列表: + +""" + prompt_tail = """ +请严格按照以下markdown表格格式返回,每列内部的多值内容都用逗号分隔,不要添加任何其他说明: + +| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 | +|----|----|----|----|----|----|----|----|----|----|----|----| +""" + + for idx, product in enumerate(products, 1): + prompt += f'{idx}. {product["title"]}\n' + prompt += prompt_tail + + return prompt + + +def call_llm(prompt: str, target_lang: str = "zh") -> Tuple[str, str]: + """调用大模型API(带重试机制),按目标语言选择系统提示词。""" + headers = { + "Authorization": f"Bearer {API_KEY}", + "Content-Type": "application/json" + } + + payload = { + "model": MODEL_NAME, + "messages": [ + { + "role": "system", + "content": SYSTEM_MESSAGES.get(target_lang, SYSTEM_MESSAGES["zh"]) + }, + { + "role": "user", + "content": prompt + } + ], + "temperature": 0.3, + "top_p": 0.8 + } + + request_data = { + "headers": {k: v for k, v in headers.items() if k != "Authorization"}, + "payload": payload + } + + logger.info(f"\n{'='*80}") + logger.info(f"LLM Request (Model: {MODEL_NAME}):") + logger.info(json.dumps(request_data, ensure_ascii=False, indent=2)) + logger.info(f"\nPrompt:\n{prompt}") + + # 创建session,禁用代理 + session = requests.Session() + session.trust_env = False # 忽略系统代理设置 + + try: + # 重试机制 + for attempt in range(MAX_RETRIES): + try: + response = session.post( + f"{API_BASE_URL}/chat/completions", + headers=headers, + json=payload, + timeout=REQUEST_TIMEOUT, + proxies={"http": None, "https": None} # 明确禁用代理 + ) + + response.raise_for_status() + result = response.json() + + logger.info(f"\nLLM Response:") + logger.info(json.dumps(result, ensure_ascii=False, indent=2)) + + content = result["choices"][0]["message"]["content"] + logger.info(f"\nExtracted Content:\n{content}") + + return content, json.dumps(result, ensure_ascii=False) + + except requests.exceptions.ProxyError as e: + logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Proxy error - {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + except requests.exceptions.RequestException as e: + logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Request error - {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + except Exception as e: + logger.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES}: {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + finally: + session.close() + + +def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: + """解析markdown表格内容""" + lines = markdown_content.strip().split('\n') + data = [] + data_started = False + + for line in lines: + line = line.strip() + if not line: + continue + + # 表格行处理 + if line.startswith('|'): + # 分隔行(---- 或 :---: 等;允许空格,如 "| ---- | ---- |") + sep_chars = line.replace('|', '').strip().replace(' ', '') + if sep_chars and set(sep_chars) <= {'-', ':'}: + data_started = True + continue + + # 首个表头行:无论语言如何,统一跳过 + if not data_started: + # 等待下一行数据行 + continue + + # 解析数据行 + parts = [p.strip() for p in line.split('|')] + parts = [p for p in parts if p] # 移除空字符串 + + if len(parts) >= 2: + row = { + "seq_no": parts[0], + "title": parts[1], # 商品标题(按目标语言) + "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 + "tags": parts[3] if len(parts) > 3 else "", # 细分标签 + "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 + "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景 + "season": parts[6] if len(parts) > 6 else "", # 适用季节 + "key_attributes": parts[7] if len(parts) > 7 else "", # 关键属性 + "material": parts[8] if len(parts) > 8 else "", # 材质说明 + "features": parts[9] if len(parts) > 9 else "", # 功能特点 + "selling_points": parts[10] if len(parts) > 10 else "", # 商品卖点 + "anchor_text": parts[11] if len(parts) > 11 else "" # 锚文本 + } + data.append(row) + + return data + + +def process_batch( + batch_data: List[Dict[str, str]], + batch_num: int, + target_lang: str = "zh" +) -> List[Dict[str, str]]: + """处理一个批次的数据""" + logger.info(f"\n{'#'*80}") + logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)") + + # 创建提示词 + prompt = create_prompt(batch_data, target_lang=target_lang) + + # 调用LLM + try: + raw_response, full_response_json = call_llm(prompt, target_lang=target_lang) + + # 解析结果 + parsed_results = parse_markdown_table(raw_response) + + logger.info(f"\nParsed Results ({len(parsed_results)} items):") + logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2)) + + # 映射回原始ID + results_with_ids = [] + for i, parsed_item in enumerate(parsed_results): + if i < len(batch_data): + original_id = batch_data[i]["id"] + result = { + "id": original_id, + "lang": target_lang, + "title_input": batch_data[i]["title"], # 原始输入标题 + "title": parsed_item.get("title", ""), # 模型生成的标题 + "category_path": parsed_item.get("category_path", ""), # 品类路径 + "tags": parsed_item.get("tags", ""), # 细分标签 + "target_audience": parsed_item.get("target_audience", ""), # 适用人群 + "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景 + "season": parsed_item.get("season", ""), # 适用季节 + "key_attributes": parsed_item.get("key_attributes", ""), # 关键属性 + "material": parsed_item.get("material", ""), # 材质说明 + "features": parsed_item.get("features", ""), # 功能特点 + "selling_points": parsed_item.get("selling_points", ""), # 商品卖点 + "anchor_text": parsed_item.get("anchor_text", "") # 锚文本 + } + results_with_ids.append(result) + logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}") + + # 保存日志 + batch_log = { + "batch_num": batch_num, + "timestamp": datetime.now().isoformat(), + "input_products": batch_data, + "raw_response": raw_response, + "full_response_json": full_response_json, + "parsed_results": parsed_results, + "final_results": results_with_ids + } + + batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}.json" + with open(batch_log_file, 'w', encoding='utf-8') as f: + json.dump(batch_log, f, ensure_ascii=False, indent=2) + + logger.info(f"Batch log saved to: {batch_log_file}") + + return results_with_ids + + except Exception as e: + logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True) + # 返回空结果,保持ID映射 + return [{ + "id": item["id"], + "lang": target_lang, + "title_input": item["title"], + "title": "", + "category_path": "", + "tags": "", + "target_audience": "", + "usage_scene": "", + "season": "", + "key_attributes": "", + "material": "", + "features": "", + "selling_points": "", + "anchor_text": "", + "error": str(e), + } for item in batch_data] + + +def read_products(input_file: str) -> List[Dict[str, str]]: + """读取CSV文件""" + products = [] + with open(input_file, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + for row in reader: + products.append({ + "id": row["id"], + "title": row["title"] + }) + return products + + +def write_results(results: List[Dict[str, str]], output_file: Path): + """写入结果到CSV文件""" + output_file.parent.mkdir(parents=True, exist_ok=True) + + fieldnames = [ + "id", + "lang", + "title_input", + "title", + "category_path", + "tags", + "target_audience", + "usage_scene", + "season", + "key_attributes", + "material", + "features", + "selling_points", + "anchor_text", + ] + + with open(output_file, 'w', encoding='utf-8', newline='') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(results) + + logger.info(f"\nResults written to: {output_file}") + + +def main(): + """主函数""" + if not API_KEY: + logger.error("Error: DASHSCOPE_API_KEY environment variable not set!") + return + + logger.info(f"Starting product analysis process") + logger.info(f"Input file: {INPUT_FILE}") + logger.info(f"Output file: {OUTPUT_FILE}") + logger.info(f"Batch size: {BATCH_SIZE}") + logger.info(f"Model: {MODEL_NAME}") + + # 读取产品数据 + logger.info(f"\nReading products from {INPUT_FILE}...") + products = read_products(INPUT_FILE) + logger.info(f"Total products to process: {len(products)}") + + # 分批处理 + all_results = [] + total_batches = (len(products) + BATCH_SIZE - 1) // BATCH_SIZE + + for i in range(0, len(products), BATCH_SIZE): + batch_num = i // BATCH_SIZE + 1 + batch = products[i:i + BATCH_SIZE] + + logger.info(f"\nProgress: Batch {batch_num}/{total_batches}") + + results = process_batch(batch, batch_num, target_lang="zh") + all_results.extend(results) + + # 每处理完一个批次就写入一次(断点续传) + write_results(all_results, OUTPUT_FILE) + logger.info(f"Progress saved: {len(all_results)}/{len(products)} items completed") + + logger.info(f"\n{'='*80}") + logger.info(f"Processing completed!") + logger.info(f"Total processed: {len(all_results)} items") + logger.info(f"Output file: {OUTPUT_FILE}") + logger.info(f"Log file: {log_file}") + + +if __name__ == "__main__": + main() + + +def analyze_products( + products: List[Dict[str, str]], + target_lang: str = "zh", + batch_size: Optional[int] = None, + tenant_id: Optional[str] = None, +) -> List[Dict[str, Any]]: + """ + 库调用入口:根据输入+语言,返回锚文本及各维度信息。 + + Args: + products: [{"id": "...", "title": "..."}] + target_lang: 输出语言,需在 SUPPORTED_LANGS 内 + batch_size: 批大小,默认使用全局 BATCH_SIZE + """ + if not API_KEY: + raise RuntimeError("DASHSCOPE_API_KEY is not set, cannot call LLM") + + if target_lang not in SUPPORTED_LANGS: + raise ValueError(f"Unsupported target_lang={target_lang}, supported={sorted(SUPPORTED_LANGS)}") + + if not products: + return [] + + # 简单路径:索引阶段通常 batch_size=1,这里优先做单条缓存命中 + if len(products) == 1: + p = products[0] + title = str(p.get("title") or "").strip() + if title: + cached = _get_cached_anchor_result(title, target_lang, tenant_id=tenant_id) + if cached: + logger.info( + f"[analyze_products] Cache hit for title='{title[:50]}...', " + f"lang={target_lang}, tenant_id={tenant_id or 'global'}" + ) + return [cached] + + # call_llm 一次处理上限固定为 BATCH_SIZE(默认 20): + # - 尽可能攒批处理; + # - 即便调用方传入更大的 batch_size,也会自动按上限拆批。 + req_bs = BATCH_SIZE if batch_size is None else int(batch_size) + bs = max(1, min(req_bs, BATCH_SIZE)) + all_results: List[Dict[str, Any]] = [] + total_batches = (len(products) + bs - 1) // bs + + for i in range(0, len(products), bs): + batch_num = i // bs + 1 + batch = products[i:i + bs] + logger.info( + f"[analyze_products] Processing batch {batch_num}/{total_batches}, " + f"size={len(batch)}, target_lang={target_lang}" + ) + batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang) + all_results.extend(batch_results) + + # 写入缓存 + for item in batch_results: + title_input = str(item.get("title_input") or "").strip() + if not title_input: + continue + if item.get("error"): + # 不缓存错误结果,避免放大临时故障 + continue + try: + _set_cached_anchor_result(title_input, target_lang, item, tenant_id=tenant_id) + except Exception: + # 已在内部记录 warning + pass + + return all_results diff --git a/scripts/service_ctl.sh b/scripts/service_ctl.sh index 084fcfb..bdf7f06 100755 --- a/scripts/service_ctl.sh +++ b/scripts/service_ctl.sh @@ -868,6 +868,7 @@ Special targets: Examples: ./scripts/service_ctl.sh up all + ./scripts/service_ctl.sh up tei cnclip embedding translator reranker ./scripts/service_ctl.sh up backend indexer frontend ./scripts/service_ctl.sh restart ./scripts/service_ctl.sh monitor-start all diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index 330f5ae..c5c52bf 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -342,7 +342,7 @@ def test_indexer_build_docs_from_db_contract(indexer_client: TestClient): def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch): - import indexer.process_products as process_products + import indexer.product_annotator as process_products def _fake_analyze_products( products: List[Dict[str, str]], diff --git a/tests/test_process_products_batching.py b/tests/test_process_products_batching.py index 12de801..d491f43 100644 --- a/tests/test_process_products_batching.py +++ b/tests/test_process_products_batching.py @@ -2,7 +2,7 @@ from __future__ import annotations from typing import Any, Dict, List -import indexer.process_products as process_products +import indexer.product_annotator as process_products def _mk_products(n: int) -> List[Dict[str, str]]: -- libgit2 0.21.2