diff --git a/indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md b/indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md
new file mode 100644
index 0000000..d88b23e
--- /dev/null
+++ b/indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md
@@ -0,0 +1,416 @@
## Design and Indexing Logic for `qanchors` and `semantic_attributes`

This document covers:

- **The purpose and provenance of the anchor-text field `qanchors.{lang}`**
- **The structure, uses, and write path of the semantic-attribute field `semantic_attributes`**
- **The multilingual strategy (zh / en / de / ru / fr)**
- **How LLM calls are integrated into the indexing stage**

The feature is enabled by default and needs no extra switch; when the upstream LLM is unavailable it automatically degrades to "no anchors / no semantic attributes" without affecting the main indexing flow.

---

### 1. Field design overview

#### 1.1 `qanchors.{lang}`: query-oriented anchor text

- **Mapping location**: the `qanchors` object in `mappings/search_products.json`.
- **Structure** (identical to `title.{lang}`):

```140:182:/home/tw/SearchEngine/mappings/search_products.json
"qanchors": {
  "type": "object",
  "properties": {
    "zh": { "type": "text", "analyzer": "index_ansj", "search_analyzer": "query_ansj" },
    "en": { "type": "text", "analyzer": "english" },
    "de": { "type": "text", "analyzer": "german" },
    "ru": { "type": "text", "analyzer": "russian" },
    "fr": { "type": "text", "analyzer": "french" },
    ...
  }
}
```

- **Semantics**:
  Carries words and phrases that are closer to how users actually search (query-style anchors), including:
  - category plus sub-category expressions;
  - usage scenes (commuting, dates, vacation, office outfit, etc.);
  - target audiences (young women, plus size, teen boys, etc.);
  - materials / key attributes / functional features.

- **Where it is used**:
  - Main search: an additional full-text field participating in BM25 recall and scoring (a weight can be assigned in `search/query_config.py`);
  - Suggestions: `suggestion/builder.py` splits `qanchors.{lang}` into candidate terms (`source="qanchor"`, weighted higher than `title`).

#### 1.2 `semantic_attributes`: generic semantic attributes for filtering and faceting

- **Mapping location**: `mappings/search_products.json`, an appended nested field.
- **Structure**:

```1392:1410:/home/tw/SearchEngine/mappings/search_products.json
"semantic_attributes": {
  "type": "nested",
  "properties": {
    "lang":  { "type": "keyword" },  // language: zh / en / de / ru / fr
    "name":  { "type": "keyword" },  // dimension name: usage_scene / target_audience / material / ...
    "value": { "type": "keyword" }   // dimension value: 通勤 / office / Baumwolle ...
  }
}
```

- **Semantics**:
  - Normalizes every dimension the LLM outputs into a `name/value/lang` triple;
  - Dimension names are stable while values vary freely, so new semantic dimensions can be added later without touching the mapping.

- **Currently supported dimension names** (a fixed list in `document_transformer.py`):
  - `tags`: fine-grained / style tags;
  - `target_audience`: intended audience;
  - `usage_scene`: usage scenes;
  - `season`: applicable seasons;
  - `key_attributes`: key attributes;
  - `material`: material description;
  - `features`: functional features.

- **Where it is used**:
  - Filtering by semantic dimension:
    - e.g. only products whose target audience is "young women";
    - e.g. `usage_scene` contains "office" or "通勤" (commute).
  - Faceting / rendering filter options (see the sketch after this list):
    - e.g. show the distribution of all `usage_scene` values in the current result set for the front end to tick;
    - e.g. show every `material` value together with its document count.
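As a concrete illustration of the faceting case, here is a minimal sketch of a nested `terms` aggregation over `semantic_attributes`. The field names follow the mapping above; the index name `search_products` and the Python `elasticsearch` client usage are assumptions for illustration:

```python
from elasticsearch import Elasticsearch  # any ES client works; this one is assumed

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

# Distribution of all zh usage_scene values in the current result set.
body = {
    "size": 0,
    "aggs": {
        "sem": {
            "nested": {"path": "semantic_attributes"},
            "aggs": {
                "scene": {
                    # Restrict nested docs to the dimension we want to facet on.
                    "filter": {
                        "bool": {
                            "must": [
                                {"term": {"semantic_attributes.lang": "zh"}},
                                {"term": {"semantic_attributes.name": "usage_scene"}},
                            ]
                        }
                    },
                    "aggs": {
                        "values": {"terms": {"field": "semantic_attributes.value", "size": 50}}
                    },
                }
            },
        }
    },
}

resp = es.search(index="search_products", body=body)
for bucket in resp["aggregations"]["sem"]["scene"]["values"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])  # e.g. 通勤 123
```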

---

### 2. The LLM analysis service: `indexer/process_products.py`

#### 2.1 Entry point: `analyze_products`

- **File**: `indexer/process_products.py`
- **Signature**:

```365:392:/home/tw/SearchEngine/indexer/process_products.py
def analyze_products(
    products: List[Dict[str, str]],
    target_lang: str = "zh",
    batch_size: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """
    Library entry point: given inputs plus a language, return anchor text and per-dimension info.

    Args:
        products: [{"id": "...", "title": "..."}]
        target_lang: output language; must be in SUPPORTED_LANGS
        batch_size: batch size; defaults to the global BATCH_SIZE
    """
    ...
```

- **Supported output languages** (defined in the same file):

```54:62:/home/tw/SearchEngine/indexer/process_products.py
LANG_LABELS: Dict[str, str] = {
    "zh": "中文",
    "en": "英文",
    "de": "德文",
    "ru": "俄文",
    "fr": "法文",
}
SUPPORTED_LANGS = set(LANG_LABELS.keys())
```

- **Return structure** (one record per product):

```python
{
    "id": "<original product id>",
    "lang": "<target language code>",
    "title_input": "<original input title>",
    "title": "<title in the target language>",
    "category_path": "<category path joined by '>'>",
    "tags": "<comma-separated fine-grained tags>",
    "target_audience": "<comma-separated audiences>",
    "usage_scene": "<comma-separated usage scenes>",
    "season": "<comma-separated seasons>",
    "key_attributes": "<comma-separated key attributes>",
    "material": "<comma-separated materials>",
    "features": "<comma-separated features>",
    "selling_points": "<one-sentence selling point>",
    "anchor_text": "<comma-separated anchor phrases>",
    # on error the record also carries:
    # "error": "<exception message>"
}
```

> Note: the multi-valued table fields (tags / scenes / audiences / materials, etc.) are by convention **comma-separated**; the indexing side later re-splits them into phrases with the regex `[,;|/\n\t]+`.
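A minimal usage sketch of this entry point (it assumes `DASHSCOPE_API_KEY` is exported; the printed values are illustrative, since the actual output depends on the model):

```python
from indexer.process_products import analyze_products

rows = analyze_products(
    products=[{"id": "SPU-1001", "title": "法式碎花连衣裙 收腰显瘦 夏季新款"}],  # sample input
    target_lang="zh",
)
for row in rows:
    if row.get("error"):             # failed batches come back with an "error" field
        print("failed:", row["error"])
        continue
    print(row["anchor_text"])        # e.g. "碎花连衣裙,法式连衣裙,收腰连衣裙,夏季裙装"
    print(row["usage_scene"])        # comma-separated, e.g. "约会,度假"
```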

#### 2.2 Prompt design and language control

- The prompt explicitly requires that **all output be in the target language** (the prompt template itself is a functional string literal written in Chinese; the `{lang_label}` placeholder controls the output language):

```65:81:/home/tw/SearchEngine/indexer/process_products.py
def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str:
    """Build the LLM prompt (output in the target language)."""
    lang_label = LANG_LABELS.get(target_lang, "对应语言")
    prompt = f"""请对输入的每条商品标题,分析并提取以下信息,所有输出内容请使用{lang_label}:

1. 商品标题:将输入商品名称翻译为{lang_label}
2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤)
3. 细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式)
4. 适用人群:性别/年龄段等(例如:年轻女性)
5. 使用场景
6. 适用季节
7. 关键属性
8. 材质说明
9. 功能特点
10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由
11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。
"""
```

- The response format is fixed as a Markdown table whose header rows are:

```89:91:/home/tw/SearchEngine/indexer/process_products.py
| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 |
|----|----|----|----|----|----|----|----|----|----|----|----|
```

`parse_markdown_table` parses each data row into fields by column order.
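To make the column mapping concrete, here is a small sketch of `parse_markdown_table` applied to one well-formed data row; the sample row itself is invented for illustration:

```python
from indexer.process_products import parse_markdown_table

sample = """
| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 |
|----|----|----|----|----|----|----|----|----|----|----|----|
| 1 | 法式碎花连衣裙 | 服装>女装>连衣裙 | 碎花,法式 | 年轻女性 | 约会,度假 | 夏季 | 收腰 | 棉 | 透气 | 一裙两穿的法式优雅 | 碎花连衣裙,法式裙 |
"""

rows = parse_markdown_table(sample)
print(rows[0]["category_path"])  # 服装>女装>连衣裙
print(rows[0]["anchor_text"])    # 碎花连衣裙,法式裙  (still comma-separated at this stage)
```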

---

### 3. Indexing-stage integration: `SPUDocumentTransformer._fill_llm_attributes`

#### 3.1 When it runs

At the end of `SPUDocumentTransformer.transform_spu_to_doc(...)`, after all base fields (multilingual text, categories, SKU/specifications, price, stock, etc.) have been filled, the transformer calls:

```96:101:/home/tw/SearchEngine/indexer/document_transformer.py
    # Text fields (translation, etc.)
    self._fill_text_fields(doc, spu_row, primary_lang)

    # Title embedding
    if self.enable_title_embedding and self.encoder:
        self._fill_title_embedding(doc)
    ...
    # Time fields
    ...

    # LLM-based anchor text and semantic attributes (on by default; failures are only logged)
    self._fill_llm_attributes(doc, spu_row)
```

In other words, **every SPU document attempts to add `qanchors` and `semantic_attributes` by default**.

#### 3.2 Language selection

Inside `_fill_llm_attributes`:

```148:164:/home/tw/SearchEngine/indexer/document_transformer.py
    try:
        index_langs = self.tenant_config.get("index_languages") or ["en", "zh"]
    except Exception:
        index_langs = ["en", "zh"]

    # Only call the LLM for supported languages
    llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS]
    if not llm_langs:
        return
```

- `tenant_config.index_languages` declares which languages the tenant wants indexed;
- the set of languages actually sent to the LLM is `index_languages ∩ SUPPORTED_LANGS`;
- currently `SUPPORTED_LANGS == {"zh", "en", "de", "ru", "fr"}`.

This guarantees that:

- a tenant that only indexes `zh` only runs the Chinese pass;
- a tenant indexing both `en` and `de` runs one LLM pass per language;
- any entry in `index_languages` that is not yet supported (e.g. `es`) is silently ignored.

#### 3.3 Calling the LLM and writing the fields

The core logic (abridged):

```164:210:/home/tw/SearchEngine/indexer/document_transformer.py
    spu_id = str(spu_row.get("id") or "").strip()
    title = str(spu_row.get("title") or "").strip()
    if not spu_id or not title:
        return

    semantic_list = doc.get("semantic_attributes") or []
    qanchors_obj = doc.get("qanchors") or {}

    dim_keys = [
        "tags",
        "target_audience",
        "usage_scene",
        "season",
        "key_attributes",
        "material",
        "features",
    ]

    for lang in llm_langs:
        try:
            rows = analyze_products(
                products=[{"id": spu_id, "title": title}],
                target_lang=lang,
                batch_size=1,
            )
        except Exception as e:
            logger.warning("LLM attribute fill failed for SPU %s, lang=%s: %s", spu_id, lang, e)
            continue

        if not rows:
            continue
        row = rows[0] or {}

        # qanchors.{lang}
        anchor_text = str(row.get("anchor_text") or "").strip()
        if anchor_text:
            qanchors_obj[lang] = anchor_text

        # Semantic attributes
        for name in dim_keys:
            raw = row.get(name)
            if not raw:
                continue
            parts = re.split(r"[,;|/\n\t]+", str(raw))
            for part in parts:
                value = part.strip()
                if not value:
                    continue
                semantic_list.append(
                    {
                        "lang": lang,
                        "name": name,
                        "value": value,
                    }
                )

    if qanchors_obj:
        doc["qanchors"] = qanchors_obj
    if semantic_list:
        doc["semantic_attributes"] = semantic_list
```

Key points:

- each language triggers **a separate call** to `analyze_products` with the same SPU's original title;
- the returned `anchor_text` is written to `qanchors.{lang}` as-is, still a comma-separated phrase list; the suggestion builder splits it later;
- each dimension field (tags / usage_scene / ...) is loosely tokenized with the shared regex; empty strings are dropped and each remaining value is appended to the nested array as a `(lang, name, value)` triple;
- a dimension that is empty for a given language is skipped and contributes no entries.

#### 3.4 Fault tolerance & degradation

When any of the following holds:

- the row has no `title`;
- `tenant_config.index_languages` has no intersection with `SUPPORTED_LANGS`;
- `DASHSCOPE_API_KEY` is not configured or the LLM request fails;

`_fill_llm_attributes` skips the SPU (logging a `warning` for LLM errors) and **never raises**. Indexing continues; the SPU simply gets no `qanchors` / `semantic_attributes` in that round.

This guarantees that when the LLM is unavailable, the whole indexing service behaves like a plain "traditional index" instead of breaking.

---

### 4. Usage recommendations for search and suggestions

#### 4.1 Main search (Search API)

In `search/query_config.py`, or wherever the ES query is built, you can:

- add `qanchors.{lang}` as an extra `should` field with a slightly higher weight, e.g.:

```json
{
  "multi_match": {
    "query": "<user query>",
    "fields": [
      "title.zh^3.0",
      "brief.zh^1.5",
      "description.zh^1.0",
      "vendor.zh^1.5",
      "category_path.zh^1.5",
      "category_name_text.zh^1.5",
      "tags^1.0",
      "qanchors.zh^2.0"  // suggested addition
    ]
  }
}
```

- when the user filters by dimension (e.g. "commute scene + summer + cotton only"), add nested queries to the filter:

```json
{
  "nested": {
    "path": "semantic_attributes",
    "query": {
      "bool": {
        "must": [
          { "term": { "semantic_attributes.lang": "zh" } },
          { "term": { "semantic_attributes.name": "usage_scene" } },
          { "term": { "semantic_attributes.value": "通勤" } }
        ]
      }
    }
  }
}
```

Multiple dimensions are combined with multiple nested clauses (the AND/OR logic mirrors the `specifications` design); see the sketch below.
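A minimal sketch of that combination for the "commute + summer + cotton" filter above, written as the Python dict an ES query builder would emit; the helper name `term_clause` is illustrative, not existing code:

```python
def term_clause(name: str, value: str) -> dict:
    """One nested clause matching a single (lang, name, value) triple."""
    return {
        "nested": {
            "path": "semantic_attributes",
            "query": {
                "bool": {
                    "must": [
                        {"term": {"semantic_attributes.lang": "zh"}},
                        {"term": {"semantic_attributes.name": name}},
                        {"term": {"semantic_attributes.value": value}},
                    ]
                }
            },
        }
    }

# AND across dimensions: one nested clause per dimension, all in bool.filter.
query = {
    "bool": {
        "filter": [
            term_clause("usage_scene", "通勤"),
            term_clause("season", "夏季"),
            term_clause("material", "棉"),
        ]
    }
}
# OR within a single dimension would instead match several values inside
# one nested clause (e.g. a "terms" query on semantic_attributes.value).
```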

#### 4.2 Suggestions

The existing `suggestion/builder.py` already supports extracting candidates from `qanchors.{lang}`:

```249:287:/home/tw/SearchEngine/suggestion/builder.py
    # Step 1: product title/qanchors
    hits = self._scan_products(tenant_id, batch_size=batch_size)
    ...
    title_obj = src.get("title") or {}
    qanchor_obj = src.get("qanchors") or {}
    ...
    for lang in index_languages:
        ...
        q_raw = None
        if isinstance(qanchor_obj, dict):
            q_raw = qanchor_obj.get(lang)
        for q_text in self._split_qanchors(q_raw):
            text_norm = self._normalize_text(q_text)
            if self._looks_noise(text_norm):
                continue
            key = (lang, text_norm)
            c = key_to_candidate.get(key)
            if c is None:
                c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang)
                key_to_candidate[key] = c
            c.add_product("qanchor", spu_id=spu_id, score=product_score + 0.6)
```

- `_split_qanchors` uses the same separator set as the indexing side, so anchors are split into individual candidates whether the LLM separated them with commas, semicolons, or newlines, as long as the convention holds;
- `add_product("qanchor", ...)`:
  - marks the candidate's source as `qanchor`;
  - at ranking time a `qanchor` hit outweighs a plain `title` hit.

---

### 5. Summary and extension points

1. **Roles**:
   - `qanchors.{lang}`: brings documents closer to real user queries; used for recall and suggestions;
   - `semantic_attributes`: carries LLM-extracted semantic dimensions in structured form; used for filters / facets.
2. **Multilingual alignment**:
   - fully reuses the tenant-level `index_languages` configuration;
   - anchor text and semantic attributes are generated per language and never mixed across languages.
3. **On by default / automatic degradation**:
   - the indexing pipeline is always available;
   - when the LLM or its configuration fails, only the enrichment features are missing; base search capability is unaffected.
4. **Future extensions**:
   - new dimension names (e.g. `style`, `benefit`) can be added to `dim_keys`, as long as matching columns are added to the prompt and the parsing logic;
   - `semantic_attributes` can gain extra fields (e.g. `confidence`, `source`) for finer-grained control (the current mapping is the simple version).

If a unified query-level DSL over `semantic_attributes` is needed (filter/facet rules analogous to `specifications`), the recommendation is to add a section to `docs/搜索API对接指南.md` and encapsulate the construction logic in `search/es_query_builder.py`, so that the front end never assembles nested queries by hand. A hypothetical sketch of such a builder follows.
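The function name `build_semantic_filter` and its input shape below are assumptions, not an existing API; values within one dimension are OR-ed, dimensions are AND-ed:

```python
def build_semantic_filter(lang: str, selections: dict) -> dict:
    """Translate a front-end selection like
    {"usage_scene": ["通勤", "office"], "season": ["夏季"]}
    into an ES bool filter: values within a dimension are OR-ed via a
    "terms" query, dimensions are AND-ed via one nested clause each."""
    clauses = []
    for name, values in selections.items():
        clauses.append({
            "nested": {
                "path": "semantic_attributes",
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"semantic_attributes.lang": lang}},
                            {"term": {"semantic_attributes.name": name}},
                            {"terms": {"semantic_attributes.value": values}},
                        ]
                    }
                },
            }
        })
    return {"bool": {"filter": clauses}}


# Example: commute-scene OR office-scene products, summer only.
print(build_semantic_filter("zh", {"usage_scene": ["通勤", "office"], "season": ["夏季"]}))
```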
"https://dashscope.aliyuncs.com/compatible-mode/v1" +MODEL_NAME = "qwen-max" +API_KEY = os.environ.get("DASHSCOPE_API_KEY") +MAX_RETRIES = 3 +RETRY_DELAY = 5 # 秒 +REQUEST_TIMEOUT = 180 # 秒 + +# 禁用代理 +os.environ['NO_PROXY'] = '*' +os.environ['no_proxy'] = '*' + +# 文件路径 +INPUT_FILE = "saas_170_products.csv" +OUTPUT_DIR = Path("output_logs") +OUTPUT_FILE = OUTPUT_DIR / "products_analyzed.csv" +LOG_DIR = OUTPUT_DIR / "logs" + +# 设置日志 +LOG_DIR.mkdir(parents=True, exist_ok=True) +timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") +log_file = LOG_DIR / f"process_{timestamp}.log" + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_file, encoding='utf-8'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +LANG_LABELS: Dict[str, str] = { + "zh": "中文", + "en": "英文", + "de": "德文", + "ru": "俄文", + "fr": "法文", +} + +SUPPORTED_LANGS = set(LANG_LABELS.keys()) + + +def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: + """创建LLM提示词(根据目标语言输出)""" + lang_label = LANG_LABELS.get(target_lang, "对应语言") + prompt = f"""请对输入的每条商品标题,分析并提取以下信息,所有输出内容请使用{lang_label}: + +1. 商品标题:将输入商品名称翻译为{lang_label} +2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤) +3. 细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式) +4. 适用人群:性别/年龄段等(例如:年轻女性) +5. 使用场景 +6. 适用季节 +7. 关键属性 +8. 材质说明 +9. 功能特点 +10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由 +11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。 + +输入商品列表: + +""" + + prompt_tail = """ +请严格按照以下markdown表格格式返回,每列内部的多值内容都用逗号分隔,不要添加任何其他说明: + +| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 | +|----|----|----|----|----|----|----|----|----|----|----|----| +""" + + for idx, product in enumerate(products, 1): + prompt += f'{idx}. 
{product["title"]}\n' + prompt += prompt_tail + + return prompt + + +def call_llm(prompt: str) -> Tuple[str, str]: + """调用大模型API(带重试机制)""" + headers = { + "Authorization": f"Bearer {API_KEY}", + "Content-Type": "application/json" + } + + payload = { + "model": MODEL_NAME, + "messages": [ + { + "role": "system", + "content": "你是一名电商平台的商品标注员,你的工作是对输入的每个商品进行理解、分析和标注,按要求格式返回Markdown表格。" + }, + { + "role": "user", + "content": prompt + } + ], + "temperature": 0.3, + "top_p": 0.8 + } + + request_data = { + "headers": {k: v for k, v in headers.items() if k != "Authorization"}, + "payload": payload + } + + logger.info(f"\n{'='*80}") + logger.info(f"LLM Request (Model: {MODEL_NAME}):") + logger.info(json.dumps(request_data, ensure_ascii=False, indent=2)) + logger.info(f"\nPrompt:\n{prompt}") + + # 创建session,禁用代理 + session = requests.Session() + session.trust_env = False # 忽略系统代理设置 + + try: + # 重试机制 + for attempt in range(MAX_RETRIES): + try: + response = session.post( + f"{API_BASE_URL}/chat/completions", + headers=headers, + json=payload, + timeout=REQUEST_TIMEOUT, + proxies={"http": None, "https": None} # 明确禁用代理 + ) + + response.raise_for_status() + result = response.json() + + logger.info(f"\nLLM Response:") + logger.info(json.dumps(result, ensure_ascii=False, indent=2)) + + content = result["choices"][0]["message"]["content"] + logger.info(f"\nExtracted Content:\n{content}") + + return content, json.dumps(result, ensure_ascii=False) + + except requests.exceptions.ProxyError as e: + logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Proxy error - {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + except requests.exceptions.RequestException as e: + logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Request error - {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + except Exception as e: + logger.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES}: {str(e)}") + if attempt < MAX_RETRIES - 1: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + raise + + finally: + session.close() + + +def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: + """解析markdown表格内容""" + lines = markdown_content.strip().split('\n') + data = [] + data_started = False + + for line in lines: + line = line.strip() + if not line: + continue + + # 跳过表头 + if line.startswith('|'): + # 跳过分隔行 + if set(line.replace('|', '').strip()) <= {'-', ':'}: + data_started = True + continue + + # 跳过表头行 + if not data_started: + if '序号' in line or '商品中文标题' in line: + continue + data_started = True + continue + + # 解析数据行 + parts = [p.strip() for p in line.split('|')] + parts = [p for p in parts if p] # 移除空字符串 + + if len(parts) >= 2: + row = { + "seq_no": parts[0], + "title": parts[1], # 商品标题(按目标语言) + "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 + "tags": parts[3] if len(parts) > 3 else "", # 细分标签 + "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 + "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景 + "season": parts[6] if len(parts) > 6 else "", # 适用季节 + "key_attributes": parts[7] if len(parts) > 7 else "", # 关键属性 + "material": parts[8] if len(parts) > 8 else "", # 材质说明 + "features": parts[9] if len(parts) > 9 else "", # 功能特点 + "selling_points": parts[10] if len(parts) > 10 else "", # 商品卖点 + "anchor_text": parts[11] if len(parts) > 11 else "" # 锚文本 + } + 

def process_batch(
    batch_data: List[Dict[str, str]],
    batch_num: int,
    target_lang: str = "zh"
) -> List[Dict[str, str]]:
    """Process one batch of products."""
    logger.info(f"\n{'#'*80}")
    logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)")

    # Build the prompt
    prompt = create_prompt(batch_data, target_lang=target_lang)

    # Call the LLM
    try:
        raw_response, full_response_json = call_llm(prompt)

        # Parse the result table
        parsed_results = parse_markdown_table(raw_response)

        logger.info(f"\nParsed Results ({len(parsed_results)} items):")
        logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2))

        # Map rows back to the original IDs (by position)
        results_with_ids = []
        for i, parsed_item in enumerate(parsed_results):
            if i < len(batch_data):
                original_id = batch_data[i]["id"]
                result = {
                    "id": original_id,
                    "lang": target_lang,
                    "title_input": batch_data[i]["title"],                  # original input title
                    "title": parsed_item.get("title", ""),                  # model-generated title
                    "category_path": parsed_item.get("category_path", ""),  # category path
                    "tags": parsed_item.get("tags", ""),                    # fine-grained tags
                    "target_audience": parsed_item.get("target_audience", ""),
                    "usage_scene": parsed_item.get("usage_scene", ""),
                    "season": parsed_item.get("season", ""),
                    "key_attributes": parsed_item.get("key_attributes", ""),
                    "material": parsed_item.get("material", ""),
                    "features": parsed_item.get("features", ""),
                    "selling_points": parsed_item.get("selling_points", ""),
                    "anchor_text": parsed_item.get("anchor_text", "")
                }
                results_with_ids.append(result)
                logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}")

        # Persist the batch log
        batch_log = {
            "batch_num": batch_num,
            "timestamp": datetime.now().isoformat(),
            "input_products": batch_data,
            "raw_response": raw_response,
            "full_response_json": full_response_json,
            "parsed_results": parsed_results,
            "final_results": results_with_ids
        }

        batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}.json"
        with open(batch_log_file, 'w', encoding='utf-8') as f:
            json.dump(batch_log, f, ensure_ascii=False, indent=2)

        logger.info(f"Batch log saved to: {batch_log_file}")

        return results_with_ids

    except Exception as e:
        logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True)
        # Return empty results while preserving the ID mapping
        return [{
            "id": item["id"],
            "lang": target_lang,
            "title_input": item["title"],
            "title": "",
            "category_path": "",
            "tags": "",
            "target_audience": "",
            "usage_scene": "",
            "season": "",
            "key_attributes": "",
            "material": "",
            "features": "",
            "selling_points": "",
            "anchor_text": "",
            "error": str(e),
        } for item in batch_data]


def read_products(input_file: str) -> List[Dict[str, str]]:
    """Read products from a CSV file."""
    products = []
    with open(input_file, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            products.append({
                "id": row["id"],
                "title": row["title"]
            })
    return products


def write_results(results: List[Dict[str, str]], output_file: Path):
    """Write results to a CSV file."""
    output_file.parent.mkdir(parents=True, exist_ok=True)

    fieldnames = [
        "id",
        "lang",
        "title_input",
        "title",
        "category_path",
        "tags",
        "target_audience",
        "usage_scene",
        "season",
        "key_attributes",
        "material",
        "features",
        "selling_points",
        "anchor_text",
    ]

    with open(output_file, 'w', encoding='utf-8', newline='') as f:
        # extrasaction='ignore' keeps the optional "error" key of failed rows
        # from raising ValueError in DictWriter
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(results)

    logger.info(f"\nResults written to: {output_file}")

def main():
    """Entry point for standalone script usage."""
    if not API_KEY:
        logger.error("Error: DASHSCOPE_API_KEY environment variable not set!")
        return

    logger.info("Starting product analysis process")
    logger.info(f"Input file: {INPUT_FILE}")
    logger.info(f"Output file: {OUTPUT_FILE}")
    logger.info(f"Batch size: {BATCH_SIZE}")
    logger.info(f"Model: {MODEL_NAME}")

    # Read the product data
    logger.info(f"\nReading products from {INPUT_FILE}...")
    products = read_products(INPUT_FILE)
    logger.info(f"Total products to process: {len(products)}")

    # Process in batches
    all_results = []
    total_batches = (len(products) + BATCH_SIZE - 1) // BATCH_SIZE

    for i in range(0, len(products), BATCH_SIZE):
        batch_num = i // BATCH_SIZE + 1
        batch = products[i:i + BATCH_SIZE]

        logger.info(f"\nProgress: Batch {batch_num}/{total_batches}")

        results = process_batch(batch, batch_num, target_lang="zh")
        all_results.extend(results)

        # Write after every batch so progress survives interruptions
        write_results(all_results, OUTPUT_FILE)
        logger.info(f"Progress saved: {len(all_results)}/{len(products)} items completed")

    logger.info(f"\n{'='*80}")
    logger.info("Processing completed!")
    logger.info(f"Total processed: {len(all_results)} items")
    logger.info(f"Output file: {OUTPUT_FILE}")
    logger.info(f"Log file: {log_file}")


def analyze_products(
    products: List[Dict[str, str]],
    target_lang: str = "zh",
    batch_size: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """
    Library entry point: given inputs plus a language, return anchor text and per-dimension info.

    Args:
        products: [{"id": "...", "title": "..."}]
        target_lang: output language; must be in SUPPORTED_LANGS
        batch_size: batch size; defaults to the global BATCH_SIZE
    """
    if not API_KEY:
        raise RuntimeError("DASHSCOPE_API_KEY is not set, cannot call LLM")

    if target_lang not in SUPPORTED_LANGS:
        raise ValueError(f"Unsupported target_lang={target_lang}, supported={sorted(SUPPORTED_LANGS)}")

    if not products:
        return []

    bs = batch_size or BATCH_SIZE
    all_results: List[Dict[str, Any]] = []
    total_batches = (len(products) + bs - 1) // bs

    for i in range(0, len(products), bs):
        batch_num = i // bs + 1
        batch = products[i:i + bs]
        logger.info(
            f"[analyze_products] Processing batch {batch_num}/{total_batches}, "
            f"size={len(batch)}, target_lang={target_lang}"
        )
        batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang)
        all_results.extend(batch_results)

    return all_results


if __name__ == "__main__":
    main()
diff --git a/mappings/search_products.json b/mappings/search_products.json
index 105575b..9b1706e 100644
--- a/mappings/search_products.json
+++ b/mappings/search_products.json
@@ -1388,6 +1388,20 @@
           "index": false
         }
       }
+      },
+      "semantic_attributes": {
+        "type": "nested",
+        "properties": {
+          "lang": {
+            "type": "keyword"
+          },
+          "name": {
+            "type": "keyword"
+          },
+          "value": {
+            "type": "keyword"
+          }
+        }
       }
     }
   }
--
libgit2 0.21.2