Commit d54b04679a319440b1e54ea8b6d33e51da4fd726

Authored by tangwang
1 parent 316c97c4

feat: 为商品索引补充 qanchors 与语义属性

- 新增 indexer/process_products.analyze_products 接口,封装对 DashScope LLM 的调用逻辑,支持 zh/en/de/ru/fr 多语言输出,并结构化返回 anchor_text、tags、usage_scene、target_audience、season、key_attributes、material、features 等字段,既可脚本批处理也可在索引阶段按需调用。
- 在 SPUDocumentTransformer 中引入 _fill_llm_attributes,按租户 index_languages 与支持语言的交集,对每个 SPU/语言调用 analyze_products,默认开启 LLM 增强:成功时为 doc 填充 qanchors.{lang}(query 风格锚文本)以及 nested semantic_attributes(lang/name/value) 语义维度信息,失败时仅打 warn 日志并优雅降级,不影响主索引链路。
- 扩展 search_products.json mapping,在商品文档上新增 nested 字段 semantic_attributes(lang/name/value),以通用三元组形式承载 LLM 抽取的场景、人群、材质、风格等可变维度,为后续按语义维度做过滤和分面聚合提供统一的结构化载体。
- 编写 indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md 设计文档,系统梳理 qanchors 与 semantic_attributes 的字段含义、索引与多语言策略、与 suggestion 构建器的集成方式以及在搜索过滤/分面中的推荐用法,方便后续维护与功能扩展。

Made-with: Cursor
indexer/ANCHORS_AND_SEMANTIC_ATTRIBUTES.md 0 → 100644
@@ -0,0 +1,416 @@ @@ -0,0 +1,416 @@
  1 +## qanchors 与 semantic_attributes 设计与索引逻辑说明
  2 +
  3 +本文档详细说明:
  4 +
  5 +- **锚文本字段 `qanchors.{lang}` 的作用与来源**
  6 +- **语义属性字段 `semantic_attributes` 的结构、用途与写入流程**
  7 +- **多语言支持策略(zh / en / de / ru / fr)**
  8 +- **索引阶段与 LLM 调用的集成方式**
  9 +
  10 +本设计已默认开启,无需额外开关;在上游 LLM 不可用时会自动降级为“无锚点/语义属性”,不影响主索引流程。
  11 +
  12 +---
  13 +
  14 +### 1. 字段设计概览
  15 +
  16 +#### 1.1 `qanchors.{lang}`:面向查询的锚文本
  17 +
  18 +- **Mapping 位置**:`mappings/search_products.json` 中的 `qanchors` 对象。
  19 +- **结构**(与 `title.{lang}` 一致):
  20 +
  21 +```140:182:/home/tw/SearchEngine/mappings/search_products.json
  22 +"qanchors": {
  23 + "type": "object",
  24 + "properties": {
  25 + "zh": { "type": "text", "analyzer": "index_ansj", "search_analyzer": "query_ansj" },
  26 + "en": { "type": "text", "analyzer": "english" },
  27 + "de": { "type": "text", "analyzer": "german" },
  28 + "ru": { "type": "text", "analyzer": "russian" },
  29 + "fr": { "type": "text", "analyzer": "french" },
  30 + ...
  31 + }
  32 +}
  33 +```
  34 +
  35 +- **语义**:
  36 + 用于承载“更接近用户自然搜索行为”的词/短语(query-style anchors),包括:
  37 + - 品类 + 细分类别表达;
  38 + - 使用场景(通勤、约会、度假、office outfit 等);
  39 + - 适用人群(年轻女性、plus size、teen boys 等);
  40 + - 材质 / 关键属性 / 功能特点等。
  41 +
  42 +- **使用场景**:
  43 + - 主搜索:作为额外的全文字段参与 BM25 召回与打分(可在 `search/query_config.py` 中给一定权重);
  44 + - Suggestion:`suggestion/builder.py` 会从 `qanchors.{lang}` 中拆分词条作为候选(`source="qanchor"`,权重大于 `title`)。
  45 +
  46 +#### 1.2 `semantic_attributes`:面向过滤/分面的通用语义属性
  47 +
  48 +- **Mapping 位置**:`mappings/search_products.json`,追加的 nested 字段。
  49 +- **结构**:
  50 +
  51 +```1392:1410:/home/tw/SearchEngine/mappings/search_products.json
  52 +"semantic_attributes": {
  53 + "type": "nested",
  54 + "properties": {
  55 + "lang": { "type": "keyword" }, // 语言:zh / en / de / ru / fr
  56 + "name": { "type": "keyword" }, // 维度名:usage_scene / target_audience / material / ...
  57 + "value": { "type": "keyword" } // 维度值:通勤 / office / Baumwolle ...
  58 + }
  59 +}
  60 +```
  61 +
  62 +- **语义**:
  63 + - 将 LLM 输出的各维度信息统一规约到 `name/value/lang` 三元组;
  64 + - 维度名稳定、值内容可变,便于后续扩展新的语义维度而不需要修改 mapping。
  65 +
  66 +- **当前支持的维度名**(在 `document_transformer.py` 中固定列表):
  67 + - `tags`:细分标签/风格标签;
  68 + - `target_audience`:适用人群;
  69 + - `usage_scene`:使用场景;
  70 + - `season`:适用季节;
  71 + - `key_attributes`:关键属性;
  72 + - `material`:材质说明;
  73 + - `features`:功能特点。
  74 +
  75 +- **使用场景**:
  76 + - 按语义维度过滤:
  77 + - 例:只要“适用人群=年轻女性”的商品;
  78 + - 例:`usage_scene` 包含 “office” 或 “通勤”。
  79 + - 按语义维度分面 / 展示筛选项:
  80 + - 例:展示当前结果中所有 `usage_scene` 的分布,供前端勾选;
  81 + - 例:展示所有 `material` 值 + 命中文档数。
  82 +
  83 +---
  84 +
  85 +### 2. LLM 分析服务:`indexer/process_products.py`
  86 +
  87 +#### 2.1 入口函数:`analyze_products`
  88 +
  89 +- **文件**:`indexer/process_products.py`
  90 +- **函数签名**:
  91 +
  92 +```365:392:/home/tw/SearchEngine/indexer/process_products.py
  93 +def analyze_products(
  94 + products: List[Dict[str, str]],
  95 + target_lang: str = "zh",
  96 + batch_size: Optional[int] = None,
  97 +) -> List[Dict[str, Any]]:
  98 + """
  99 + 库调用入口:根据输入+语言,返回锚文本及各维度信息。
  100 +
  101 + Args:
  102 + products: [{"id": "...", "title": "..."}]
  103 + target_lang: 输出语言,需在 SUPPORTED_LANGS 内
  104 + batch_size: 批大小,默认使用全局 BATCH_SIZE
  105 + """
  106 + ...
  107 +```
  108 +
  109 +- **支持的输出语言**(在同文件中定义):
  110 +
  111 +```54:62:/home/tw/SearchEngine/indexer/process_products.py
  112 +LANG_LABELS: Dict[str, str] = {
  113 + "zh": "中文",
  114 + "en": "英文",
  115 + "de": "德文",
  116 + "ru": "俄文",
  117 + "fr": "法文",
  118 +}
  119 +SUPPORTED_LANGS = set(LANG_LABELS.keys())
  120 +```
  121 +
  122 +- **返回结构**(每个商品一条记录):
  123 +
  124 +```python
  125 +{
  126 + "id": "<SPU_ID>",
  127 + "lang": "<zh|en|de|ru|fr>",
  128 + "title_input": "<原始输入标题>",
  129 + "title": "<目标语言的标题>",
  130 + "category_path": "<LLM 生成的品类路径>",
  131 + "tags": "<逗号分隔的细分标签>",
  132 + "target_audience": "<逗号分隔的适用人群>",
  133 + "usage_scene": "<逗号分隔的使用场景>",
  134 + "season": "<逗号分隔的适用季节>",
  135 + "key_attributes": "<逗号分隔的关键属性>",
  136 + "material": "<逗号分隔的材质说明>",
  137 + "features": "<逗号分隔的功能特点>",
  138 + "selling_points": "<一句话卖点>",
  139 + "anchor_text": "<逗号分隔的锚文本短语>",
  140 + # 若发生错误,还会附带:
  141 + # "error": "<异常信息>"
  142 +}
  143 +```
  144 +
  145 +> 注意:表格中的多值字段(标签/场景/人群/材质等)约定为**使用逗号分隔**,后续索引端会统一按正则 `[,;|/\\n\\t]+` 再拆分为短语。
  146 +
  147 +#### 2.2 Prompt 设计与语言控制
  148 +
  149 +- Prompt 中会明确要求“**所有输出内容使用目标语言**”,并给出中英文示例:
  150 +
  151 +```65:81:/home/tw/SearchEngine/indexer/process_products.py
  152 +def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str:
  153 + """创建LLM提示词(根据目标语言输出)"""
  154 + lang_label = LANG_LABELS.get(target_lang, "对应语言")
  155 + prompt = f"""请对输入的每条商品标题,分析并提取以下信息,所有输出内容请使用{lang_label}:
  156 +
  157 +1. 商品标题:将输入商品名称翻译为{lang_label}
  158 +2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤)
  159 +3. 细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式)
  160 +4. 适用人群:性别/年龄段等(例如:年轻女性)
  161 +5. 使用场景
  162 +6. 适用季节
  163 +7. 关键属性
  164 +8. 材质说明
  165 +9. 功能特点
  166 +10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由
  167 +11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。
  168 +"""
  169 +```
  170 +
  171 +- 返回格式固定为 Markdown 表格,首行头为:
  172 +
  173 +```89:91:/home/tw/SearchEngine/indexer/process_products.py
  174 +| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 |
  175 +|----|----|----|----|----|----|----|----|----|----|----|----|
  176 +```
  177 +
  178 +`parse_markdown_table` 会按表格列顺序解析成字段。
  179 +
  180 +---
  181 +
  182 +### 3. 索引阶段集成:`SPUDocumentTransformer._fill_llm_attributes`
  183 +
  184 +#### 3.1 调用时机
  185 +
  186 +在 `SPUDocumentTransformer.transform_spu_to_doc(...)` 的末尾,在所有基础字段(多语言文本、类目、SKU/规格、价格、库存等)填充完成后,会调用:
  187 +
  188 +```96:101:/home/tw/SearchEngine/indexer/document_transformer.py
  189 + # 文本字段处理(翻译等)
  190 + self._fill_text_fields(doc, spu_row, primary_lang)
  191 +
  192 + # 标题向量化
  193 + if self.enable_title_embedding and self.encoder:
  194 + self._fill_title_embedding(doc)
  195 + ...
  196 + # 时间字段
  197 + ...
  198 +
  199 + # 基于 LLM 的锚文本与语义属性(默认开启,失败时仅记录日志)
  200 + self._fill_llm_attributes(doc, spu_row)
  201 +```
  202 +
  203 +也就是说,**每个 SPU 文档默认会尝试补充 qanchors 与 semantic_attributes**。
  204 +
  205 +#### 3.2 语言选择策略
  206 +
  207 +在 `_fill_llm_attributes` 内部:
  208 +
  209 +```148:164:/home/tw/SearchEngine/indexer/document_transformer.py
  210 + try:
  211 + index_langs = self.tenant_config.get("index_languages") or ["en", "zh"]
  212 + except Exception:
  213 + index_langs = ["en", "zh"]
  214 +
  215 + # 只在支持的语言集合内调用
  216 + llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS]
  217 + if not llm_langs:
  218 + return
  219 +```
  220 +
  221 +- `tenant_config.index_languages` 决定该租户希望在索引中支持哪些语言;
  222 +- 实际调用 LLM 的语言集合 = `index_languages ∩ SUPPORTED_LANGS`;
  223 +- 当前 SUPPORTED_LANGS:`{"zh", "en", "de", "ru", "fr"}`。
  224 +
  225 +这保证了:
  226 +
  227 +- 如果租户只索引 `zh`,就只跑中文;
  228 +- 如果租户同时索引 `en` + `de`,就为这两种语言各跑一次 LLM;
  229 +- 如果 `index_languages` 里包含暂不支持的语言(例如 `es`),会被自动忽略。
  230 +
  231 +#### 3.3 调用 LLM 并写入字段
  232 +
  233 +核心逻辑(简化描述):
  234 +
  235 +```164:210:/home/tw/SearchEngine/indexer/document_transformer.py
  236 + spu_id = str(spu_row.get("id") or "").strip()
  237 + title = str(spu_row.get("title") or "").strip()
  238 + if not spu_id or not title:
  239 + return
  240 +
  241 + semantic_list = doc.get("semantic_attributes") or []
  242 + qanchors_obj = doc.get("qanchors") or {}
  243 +
  244 + dim_keys = [
  245 + "tags",
  246 + "target_audience",
  247 + "usage_scene",
  248 + "season",
  249 + "key_attributes",
  250 + "material",
  251 + "features",
  252 + ]
  253 +
  254 + for lang in llm_langs:
  255 + try:
  256 + rows = analyze_products(
  257 + products=[{"id": spu_id, "title": title}],
  258 + target_lang=lang,
  259 + batch_size=1,
  260 + )
  261 + except Exception as e:
  262 + logger.warning("LLM attribute fill failed for SPU %s, lang=%s: %s", spu_id, lang, e)
  263 + continue
  264 +
  265 + if not rows:
  266 + continue
  267 + row = rows[0] or {}
  268 +
  269 + # qanchors.{lang}
  270 + anchor_text = str(row.get("anchor_text") or "").strip()
  271 + if anchor_text:
  272 + qanchors_obj[lang] = anchor_text
  273 +
  274 + # 语义属性
  275 + for name in dim_keys:
  276 + raw = row.get(name)
  277 + if not raw:
  278 + continue
  279 + parts = re.split(r"[,;|/\n\t]+", str(raw))
  280 + for part in parts:
  281 + value = part.strip()
  282 + if not value:
  283 + continue
  284 + semantic_list.append(
  285 + {
  286 + "lang": lang,
  287 + "name": name,
  288 + "value": value,
  289 + }
  290 + )
  291 +
  292 + if qanchors_obj:
  293 + doc["qanchors"] = qanchors_obj
  294 + if semantic_list:
  295 + doc["semantic_attributes"] = semantic_list
  296 +```
  297 +
  298 +要点:
  299 +
  300 +- 每种语言**单独调用一次** `analyze_products`,传入同一 SPU 的原始标题;
  301 +- 将返回的 `anchor_text` 直接写入 `qanchors.{lang}`,其内部仍是逗号分隔短语,后续 suggestion builder 会再拆分;
  302 +- 对各维度字段(tags/usage_scene/...)用统一正则进行“松散拆词”,过滤空串后,以 `(lang,name,value)` 三元组追加到 nested 数组;
  303 +- 如果某个维度在该语言下为空,则跳过,不写入任何条目。
  304 +
  305 +#### 3.4 容错 & 降级策略
  306 +
  307 +- 如果:
  308 + - 没有 `title`;
  309 + - 或者 `tenant_config.index_languages` 与 `SUPPORTED_LANGS` 没有交集;
  310 + - 或 `DASHSCOPE_API_KEY` 未配置 / LLM 请求报错;
  311 +- 则 `_fill_llm_attributes` 会在日志中输出 `warning`,**不会抛异常**,索引流程继续,只是该 SPU 在这一轮不会得到 `qanchors` / `semantic_attributes`。
  312 +
  313 +这保证了整个索引服务在 LLM 不可用时表现为一个普通的“传统索引”,而不会中断。
  314 +
  315 +---
  316 +
  317 +### 4. 查询与 Suggestion 中的使用建议
  318 +
  319 +#### 4.1 主搜索(Search API)
  320 +
  321 +在 `search/query_config.py` 或构建 ES 查询时,可以:
  322 +
  323 +- 将 `qanchors.{lang}` 作为额外的 `should` 字段参与匹配,并给一个略高的权重,例如:
  324 +
  325 +```json
  326 +{
  327 + "multi_match": {
  328 + "query": "<user_query>",
  329 + "fields": [
  330 + "title.zh^3.0",
  331 + "brief.zh^1.5",
  332 + "description.zh^1.0",
  333 + "vendor.zh^1.5",
  334 + "category_path.zh^1.5",
  335 + "category_name_text.zh^1.5",
  336 + "tags^1.0",
  337 + "qanchors.zh^2.0" // 建议新增
  338 + ]
  339 + }
  340 +}
  341 +```
  342 +
  343 +- 当用户做维度过滤时(例如“只看通勤场景 + 夏季 + 棉质”),可以在 filter 中增加 nested 查询:
  344 +
  345 +```json
  346 +{
  347 + "nested": {
  348 + "path": "semantic_attributes",
  349 + "query": {
  350 + "bool": {
  351 + "must": [
  352 + { "term": { "semantic_attributes.lang": "zh" } },
  353 + { "term": { "semantic_attributes.name": "usage_scene" } },
  354 + { "term": { "semantic_attributes.value": "通勤" } }
  355 + ]
  356 + }
  357 + }
  358 + }
  359 +}
  360 +```
  361 +
  362 +多个维度可以通过多个 nested 子句组合(AND/OR 逻辑与 `specifications` 的设计类似)。
  363 +
  364 +#### 4.2 Suggestion(联想词)
  365 +
  366 +现有 `suggestion/builder.py` 已经支持从 `qanchors.{lang}` 中提取候选:
  367 +
  368 +```249:287:/home/tw/SearchEngine/suggestion/builder.py
  369 + # Step 1: product title/qanchors
  370 + hits = self._scan_products(tenant_id, batch_size=batch_size)
  371 + ...
  372 + title_obj = src.get("title") or {}
  373 + qanchor_obj = src.get("qanchors") or {}
  374 + ...
  375 + for lang in index_languages:
  376 + ...
  377 + q_raw = None
  378 + if isinstance(qanchor_obj, dict):
  379 + q_raw = qanchor_obj.get(lang)
  380 + for q_text in self._split_qanchors(q_raw):
  381 + text_norm = self._normalize_text(q_text)
  382 + if self._looks_noise(text_norm):
  383 + continue
  384 + key = (lang, text_norm)
  385 + c = key_to_candidate.get(key)
  386 + if c is None:
  387 + c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang)
  388 + key_to_candidate[key] = c
  389 + c.add_product("qanchor", spu_id=spu_id, score=product_score + 0.6)
  390 +```
  391 +
  392 +- `_split_qanchors` 使用与索引端一致的分隔符集合,确保:
  393 + - 无论 LLM 用逗号、分号还是换行分隔,只要符合约定,都能被拆成单独候选词;
  394 +- `add_product("qanchor", ...)` 会:
  395 + - 将来源标记为 `qanchor`;
  396 + - 在排序打分时,`qanchor` 命中会比纯 `title` 更有权重。
  397 +
  398 +---
  399 +
  400 +### 5. 总结与扩展方向
  401 +
  402 +1. **功能定位**:
  403 + - `qanchors.{lang}`:更好地贴近用户真实查询词,用于召回与 suggestion;
  404 + - `semantic_attributes`:以结构化形式承载 LLM 抽取的语义维度,用于 filter / facet。
  405 +2. **多语言对齐**:
  406 + - 完全复用租户级 `index_languages` 配置;
  407 + - 对每种语言单独生成锚文本与语义属性,不互相混用。
  408 +3. **默认开启 / 自动降级**:
  409 + - 索引流程始终可用;
  410 + - 当 LLM/配置异常时,只是“缺少增强特征”,不影响基础搜索能力。
  411 +4. **未来扩展**:
  412 + - 可以在 `dim_keys` 中新增维度名(如 `style`, `benefit` 等),只要在 prompt 与解析逻辑中增加对应列即可;
  413 + - 可以为 `semantic_attributes` 增加额外字段(如 `confidence`、`source`),用于更精细的控制(当前 mapping 为简单版)。
  414 +
  415 +如需在查询层面增加基于 `semantic_attributes` 的统一 DSL(类似 `specifications` 的过滤/分面规则),推荐在 `docs/搜索API对接指南.md` 中新增一节,并在 `search/es_query_builder.py` 里封装构造逻辑,避免前端直接拼 nested 查询。
  416 +
indexer/document_transformer.py
@@ -11,8 +11,10 @@ SPU文档转换器 - 公共转换逻辑。 @@ -11,8 +11,10 @@ SPU文档转换器 - 公共转换逻辑。
11 import pandas as pd 11 import pandas as pd
12 import numpy as np 12 import numpy as np
13 import logging 13 import logging
  14 +import re
14 from typing import Dict, Any, Optional, List 15 from typing import Dict, Any, Optional, List
15 from config import ConfigLoader 16 from config import ConfigLoader
  17 +from indexer.process_products import analyze_products, SUPPORTED_LANGS
16 18
17 logger = logging.getLogger(__name__) 19 logger = logging.getLogger(__name__)
18 20
@@ -168,6 +170,9 @@ class SPUDocumentTransformer: @@ -168,6 +170,9 @@ class SPUDocumentTransformer:
168 else: 170 else:
169 doc['update_time'] = str(update_time) 171 doc['update_time'] = str(update_time)
170 172
  173 + # 基于 LLM 的锚文本与语义属性(默认开启,失败时仅记录日志)
  174 + self._fill_llm_attributes(doc, spu_row)
  175 +
171 return doc 176 return doc
172 177
173 def _fill_text_fields( 178 def _fill_text_fields(
@@ -473,6 +478,88 @@ class SPUDocumentTransformer: @@ -473,6 +478,88 @@ class SPUDocumentTransformer:
473 else: 478 else:
474 doc['option3_values'] = [] 479 doc['option3_values'] = []
475 480
  481 + def _fill_llm_attributes(self, doc: Dict[str, Any], spu_row: pd.Series) -> None:
  482 + """
  483 + 调用 indexer.process_products.analyze_products,为当前 SPU 填充:
  484 + - qanchors.{lang}
  485 + - semantic_attributes (lang/name/value)
  486 + """
  487 + try:
  488 + index_langs = self.tenant_config.get("index_languages") or ["en", "zh"]
  489 + except Exception:
  490 + index_langs = ["en", "zh"]
  491 +
  492 + # 只在支持的语言集合内调用
  493 + llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS]
  494 + if not llm_langs:
  495 + return
  496 +
  497 + spu_id = str(spu_row.get("id") or "").strip()
  498 + title = str(spu_row.get("title") or "").strip()
  499 + if not spu_id or not title:
  500 + return
  501 +
  502 + semantic_list = doc.get("semantic_attributes") or []
  503 + qanchors_obj = doc.get("qanchors") or {}
  504 +
  505 + dim_keys = [
  506 + "tags",
  507 + "target_audience",
  508 + "usage_scene",
  509 + "season",
  510 + "key_attributes",
  511 + "material",
  512 + "features",
  513 + ]
  514 +
  515 + for lang in llm_langs:
  516 + try:
  517 + rows = analyze_products(
  518 + products=[{"id": spu_id, "title": title}],
  519 + target_lang=lang,
  520 + batch_size=1,
  521 + )
  522 + except Exception as e:
  523 + logger.warning(
  524 + "LLM attribute fill failed for SPU %s, lang=%s: %s",
  525 + spu_id,
  526 + lang,
  527 + e,
  528 + )
  529 + continue
  530 +
  531 + if not rows:
  532 + continue
  533 + row = rows[0] or {}
  534 +
  535 + # qanchors.{lang}
  536 + anchor_text = str(row.get("anchor_text") or "").strip()
  537 + if anchor_text:
  538 + qanchors_obj[lang] = anchor_text
  539 +
  540 + # 语义属性:按各维度拆分为短语
  541 + for name in dim_keys:
  542 + raw = row.get(name)
  543 + if not raw:
  544 + continue
  545 + parts = re.split(r"[,;|/\n\t]+", str(raw))
  546 + for part in parts:
  547 + value = part.strip()
  548 + if not value:
  549 + continue
  550 + semantic_list.append(
  551 + {
  552 + "lang": lang,
  553 + "name": name,
  554 + "value": value,
  555 + }
  556 + )
  557 +
  558 + if qanchors_obj:
  559 + doc["qanchors"] = qanchors_obj
  560 + if semantic_list:
  561 + doc["semantic_attributes"] = semantic_list
  562 +
476 def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Dict[int, str] = None) -> Optional[Dict[str, Any]]: 563 def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Dict[int, str] = None) -> Optional[Dict[str, Any]]:
477 """ 564 """
478 将SKU行转换为SKU对象。 565 将SKU行转换为SKU对象。
indexer/process_products.py 0 → 100644
@@ -0,0 +1,447 @@ @@ -0,0 +1,447 @@
  1 +#!/usr/bin/env python3
  2 +"""
  3 +商品品类分析脚本
  4 +批量读取商品标题,调用大模型进行品类分析,并保存结果
  5 +"""
  6 +
  7 +import csv
  8 +import os
  9 +import json
  10 +import logging
  11 +import time
  12 +from datetime import datetime
  13 +from typing import List, Dict, Tuple, Any, Optional
  14 +import requests
  15 +from pathlib import Path
  16 +from requests.adapters import HTTPAdapter
  17 +from urllib3.util.retry import Retry
  18 +
  19 +# 配置
  20 +BATCH_SIZE = 20
  21 +API_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
  22 +MODEL_NAME = "qwen-max"
  23 +API_KEY = os.environ.get("DASHSCOPE_API_KEY")
  24 +MAX_RETRIES = 3
  25 +RETRY_DELAY = 5 # 秒
  26 +REQUEST_TIMEOUT = 180 # 秒
  27 +
  28 +# 禁用代理
  29 +os.environ['NO_PROXY'] = '*'
  30 +os.environ['no_proxy'] = '*'
  31 +
  32 +# 文件路径
  33 +INPUT_FILE = "saas_170_products.csv"
  34 +OUTPUT_DIR = Path("output_logs")
  35 +OUTPUT_FILE = OUTPUT_DIR / "products_analyzed.csv"
  36 +LOG_DIR = OUTPUT_DIR / "logs"
  37 +
  38 +# 设置日志
  39 +LOG_DIR.mkdir(parents=True, exist_ok=True)
  40 +timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  41 +log_file = LOG_DIR / f"process_{timestamp}.log"
  42 +
  43 +logging.basicConfig(
  44 + level=logging.INFO,
  45 + format='%(asctime)s - %(levelname)s - %(message)s',
  46 + handlers=[
  47 + logging.FileHandler(log_file, encoding='utf-8'),
  48 + logging.StreamHandler()
  49 + ]
  50 +)
  51 +logger = logging.getLogger(__name__)
  52 +
  53 +
  54 +LANG_LABELS: Dict[str, str] = {
  55 + "zh": "中文",
  56 + "en": "英文",
  57 + "de": "德文",
  58 + "ru": "俄文",
  59 + "fr": "法文",
  60 +}
  61 +
  62 +SUPPORTED_LANGS = set(LANG_LABELS.keys())
  63 +
  64 +
  65 +def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str:
  66 + """创建LLM提示词(根据目标语言输出)"""
  67 + lang_label = LANG_LABELS.get(target_lang, "对应语言")
  68 + prompt = f"""请对输入的每条商品标题,分析并提取以下信息,所有输出内容请使用{lang_label}:
  69 +
  70 +1. 商品标题:将输入商品名称翻译为{lang_label}
  71 +2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤)
  72 +3. 细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式)
  73 +4. 适用人群:性别/年龄段等(例如:年轻女性)
  74 +5. 使用场景
  75 +6. 适用季节
  76 +7. 关键属性
  77 +8. 材质说明
  78 +9. 功能特点
  79 +10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由
  80 +11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。
  81 +
  82 +输入商品列表:
  83 +
  84 +"""
  85 +
  86 + prompt_tail = """
  87 +请严格按照以下markdown表格格式返回,每列内部的多值内容都用逗号分隔,不要添加任何其他说明:
  88 +
  89 +| 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 |
  90 +|----|----|----|----|----|----|----|----|----|----|----|----|
  91 +"""
  92 +
  93 + for idx, product in enumerate(products, 1):
  94 + prompt += f'{idx}. {product["title"]}\n'
  95 + prompt += prompt_tail
  96 +
  97 + return prompt
  98 +
  99 +
  100 +def call_llm(prompt: str) -> Tuple[str, str]:
  101 + """调用大模型API(带重试机制)"""
  102 + headers = {
  103 + "Authorization": f"Bearer {API_KEY}",
  104 + "Content-Type": "application/json"
  105 + }
  106 +
  107 + payload = {
  108 + "model": MODEL_NAME,
  109 + "messages": [
  110 + {
  111 + "role": "system",
  112 + "content": "你是一名电商平台的商品标注员,你的工作是对输入的每个商品进行理解、分析和标注,按要求格式返回Markdown表格。"
  113 + },
  114 + {
  115 + "role": "user",
  116 + "content": prompt
  117 + }
  118 + ],
  119 + "temperature": 0.3,
  120 + "top_p": 0.8
  121 + }
  122 +
  123 + request_data = {
  124 + "headers": {k: v for k, v in headers.items() if k != "Authorization"},
  125 + "payload": payload
  126 + }
  127 +
  128 + logger.info(f"\n{'='*80}")
  129 + logger.info(f"LLM Request (Model: {MODEL_NAME}):")
  130 + logger.info(json.dumps(request_data, ensure_ascii=False, indent=2))
  131 + logger.info(f"\nPrompt:\n{prompt}")
  132 +
  133 + # 创建session,禁用代理
  134 + session = requests.Session()
  135 + session.trust_env = False # 忽略系统代理设置
  136 +
  137 + try:
  138 + # 重试机制
  139 + for attempt in range(MAX_RETRIES):
  140 + try:
  141 + response = session.post(
  142 + f"{API_BASE_URL}/chat/completions",
  143 + headers=headers,
  144 + json=payload,
  145 + timeout=REQUEST_TIMEOUT,
  146 + proxies={"http": None, "https": None} # 明确禁用代理
  147 + )
  148 +
  149 + response.raise_for_status()
  150 + result = response.json()
  151 +
  152 + logger.info(f"\nLLM Response:")
  153 + logger.info(json.dumps(result, ensure_ascii=False, indent=2))
  154 +
  155 + content = result["choices"][0]["message"]["content"]
  156 + logger.info(f"\nExtracted Content:\n{content}")
  157 +
  158 + return content, json.dumps(result, ensure_ascii=False)
  159 +
  160 + except requests.exceptions.ProxyError as e:
  161 + logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Proxy error - {str(e)}")
  162 + if attempt < MAX_RETRIES - 1:
  163 + logger.info(f"Retrying in {RETRY_DELAY} seconds...")
  164 + time.sleep(RETRY_DELAY)
  165 + else:
  166 + raise
  167 +
  168 + except requests.exceptions.RequestException as e:
  169 + logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES}: Request error - {str(e)}")
  170 + if attempt < MAX_RETRIES - 1:
  171 + logger.info(f"Retrying in {RETRY_DELAY} seconds...")
  172 + time.sleep(RETRY_DELAY)
  173 + else:
  174 + raise
  175 +
  176 + except Exception as e:
  177 + logger.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES}: {str(e)}")
  178 + if attempt < MAX_RETRIES - 1:
  179 + logger.info(f"Retrying in {RETRY_DELAY} seconds...")
  180 + time.sleep(RETRY_DELAY)
  181 + else:
  182 + raise
  183 +
  184 + finally:
  185 + session.close()
  186 +
  187 +
  188 +def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]:
  189 + """解析markdown表格内容"""
  190 + lines = markdown_content.strip().split('\n')
  191 + data = []
  192 + data_started = False
  193 +
  194 + for line in lines:
  195 + line = line.strip()
  196 + if not line:
  197 + continue
  198 +
  199 + # 跳过表头
  200 + if line.startswith('|'):
  201 + # 跳过分隔行
  202 + if set(line.replace('|', '').strip()) <= {'-', ':'}:
  203 + data_started = True
  204 + continue
  205 +
  206 + # 跳过表头行
  207 + if not data_started:
  208 + if '序号' in line or '商品中文标题' in line:
  209 + continue
  210 + data_started = True
  211 + continue
  212 +
  213 + # 解析数据行
  214 + parts = [p.strip() for p in line.split('|')]
  215 + parts = [p for p in parts if p] # 移除空字符串
  216 +
  217 + if len(parts) >= 2:
  218 + row = {
  219 + "seq_no": parts[0],
  220 + "title": parts[1], # 商品标题(按目标语言)
  221 + "category_path": parts[2] if len(parts) > 2 else "", # 品类路径
  222 + "tags": parts[3] if len(parts) > 3 else "", # 细分标签
  223 + "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群
  224 + "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景
  225 + "season": parts[6] if len(parts) > 6 else "", # 适用季节
  226 + "key_attributes": parts[7] if len(parts) > 7 else "", # 关键属性
  227 + "material": parts[8] if len(parts) > 8 else "", # 材质说明
  228 + "features": parts[9] if len(parts) > 9 else "", # 功能特点
  229 + "selling_points": parts[10] if len(parts) > 10 else "", # 商品卖点
  230 + "anchor_text": parts[11] if len(parts) > 11 else "" # 锚文本
  231 + }
  232 + data.append(row)
  233 +
  234 + return data
  235 +
  236 +
  237 +def process_batch(
  238 + batch_data: List[Dict[str, str]],
  239 + batch_num: int,
  240 + target_lang: str = "zh"
  241 +) -> List[Dict[str, str]]:
  242 + """处理一个批次的数据"""
  243 + logger.info(f"\n{'#'*80}")
  244 + logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)")
  245 +
  246 + # 创建提示词
  247 + prompt = create_prompt(batch_data, target_lang=target_lang)
  248 +
  249 + # 调用LLM
  250 + try:
  251 + raw_response, full_response_json = call_llm(prompt)
  252 +
  253 + # 解析结果
  254 + parsed_results = parse_markdown_table(raw_response)
  255 +
  256 + logger.info(f"\nParsed Results ({len(parsed_results)} items):")
  257 + logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2))
  258 +
  259 + # 映射回原始ID
  260 + results_with_ids = []
  261 + for i, parsed_item in enumerate(parsed_results):
  262 + if i < len(batch_data):
  263 + original_id = batch_data[i]["id"]
  264 + result = {
  265 + "id": original_id,
  266 + "lang": target_lang,
  267 + "title_input": batch_data[i]["title"], # 原始输入标题
  268 + "title": parsed_item.get("title", ""), # 模型生成的标题
  269 + "category_path": parsed_item.get("category_path", ""), # 品类路径
  270 + "tags": parsed_item.get("tags", ""), # 细分标签
  271 + "target_audience": parsed_item.get("target_audience", ""), # 适用人群
  272 + "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景
  273 + "season": parsed_item.get("season", ""), # 适用季节
  274 + "key_attributes": parsed_item.get("key_attributes", ""), # 关键属性
  275 + "material": parsed_item.get("material", ""), # 材质说明
  276 + "features": parsed_item.get("features", ""), # 功能特点
  277 + "selling_points": parsed_item.get("selling_points", ""), # 商品卖点
  278 + "anchor_text": parsed_item.get("anchor_text", "") # 锚文本
  279 + }
  280 + results_with_ids.append(result)
  281 + logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}")
  282 +
  283 + # 保存日志
  284 + batch_log = {
  285 + "batch_num": batch_num,
  286 + "timestamp": datetime.now().isoformat(),
  287 + "input_products": batch_data,
  288 + "raw_response": raw_response,
  289 + "full_response_json": full_response_json,
  290 + "parsed_results": parsed_results,
  291 + "final_results": results_with_ids
  292 + }
  293 +
  294 + batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}.json"
  295 + with open(batch_log_file, 'w', encoding='utf-8') as f:
  296 + json.dump(batch_log, f, ensure_ascii=False, indent=2)
  297 +
  298 + logger.info(f"Batch log saved to: {batch_log_file}")
  299 +
  300 + return results_with_ids
  301 +
  302 + except Exception as e:
  303 + logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True)
  304 + # 返回空结果,保持ID映射
  305 + return [{
  306 + "id": item["id"],
  307 + "lang": target_lang,
  308 + "title_input": item["title"],
  309 + "title": "",
  310 + "category_path": "",
  311 + "tags": "",
  312 + "target_audience": "",
  313 + "usage_scene": "",
  314 + "season": "",
  315 + "key_attributes": "",
  316 + "material": "",
  317 + "features": "",
  318 + "selling_points": "",
  319 + "anchor_text": "",
  320 + "error": str(e),
  321 + } for item in batch_data]
  322 +
  323 +
  324 +def read_products(input_file: str) -> List[Dict[str, str]]:
  325 + """读取CSV文件"""
  326 + products = []
  327 + with open(input_file, 'r', encoding='utf-8') as f:
  328 + reader = csv.DictReader(f)
  329 + for row in reader:
  330 + products.append({
  331 + "id": row["id"],
  332 + "title": row["title"]
  333 + })
  334 + return products
  335 +
  336 +
  337 +def write_results(results: List[Dict[str, str]], output_file: Path):
  338 + """写入结果到CSV文件"""
  339 + output_file.parent.mkdir(parents=True, exist_ok=True)
  340 +
  341 + fieldnames = [
  342 + "id",
  343 + "lang",
  344 + "title_input",
  345 + "title",
  346 + "category_path",
  347 + "tags",
  348 + "target_audience",
  349 + "usage_scene",
  350 + "season",
  351 + "key_attributes",
  352 + "material",
  353 + "features",
  354 + "selling_points",
  355 + "anchor_text",
  356 + ]
  357 +
  358 + with open(output_file, 'w', encoding='utf-8', newline='') as f:
  359 + writer = csv.DictWriter(f, fieldnames=fieldnames)
  360 + writer.writeheader()
  361 + writer.writerows(results)
  362 +
  363 + logger.info(f"\nResults written to: {output_file}")
  364 +
  365 +
  366 +def main():
  367 + """主函数"""
  368 + if not API_KEY:
  369 + logger.error("Error: DASHSCOPE_API_KEY environment variable not set!")
  370 + return
  371 +
  372 + logger.info(f"Starting product analysis process")
  373 + logger.info(f"Input file: {INPUT_FILE}")
  374 + logger.info(f"Output file: {OUTPUT_FILE}")
  375 + logger.info(f"Batch size: {BATCH_SIZE}")
  376 + logger.info(f"Model: {MODEL_NAME}")
  377 +
  378 + # 读取产品数据
  379 + logger.info(f"\nReading products from {INPUT_FILE}...")
  380 + products = read_products(INPUT_FILE)
  381 + logger.info(f"Total products to process: {len(products)}")
  382 +
  383 + # 分批处理
  384 + all_results = []
  385 + total_batches = (len(products) + BATCH_SIZE - 1) // BATCH_SIZE
  386 +
  387 + for i in range(0, len(products), BATCH_SIZE):
  388 + batch_num = i // BATCH_SIZE + 1
  389 + batch = products[i:i + BATCH_SIZE]
  390 +
  391 + logger.info(f"\nProgress: Batch {batch_num}/{total_batches}")
  392 +
  393 + results = process_batch(batch, batch_num, target_lang="zh")
  394 + all_results.extend(results)
  395 +
  396 + # 每处理完一个批次就写入一次(断点续传)
  397 + write_results(all_results, OUTPUT_FILE)
  398 + logger.info(f"Progress saved: {len(all_results)}/{len(products)} items completed")
  399 +
  400 + logger.info(f"\n{'='*80}")
  401 + logger.info(f"Processing completed!")
  402 + logger.info(f"Total processed: {len(all_results)} items")
  403 + logger.info(f"Output file: {OUTPUT_FILE}")
  404 + logger.info(f"Log file: {log_file}")
  405 +
  406 +
  407 +if __name__ == "__main__":
  408 + main()
  409 +
  410 +
  411 +def analyze_products(
  412 + products: List[Dict[str, str]],
  413 + target_lang: str = "zh",
  414 + batch_size: Optional[int] = None,
  415 +) -> List[Dict[str, Any]]:
  416 + """
  417 + 库调用入口:根据输入+语言,返回锚文本及各维度信息。
  418 +
  419 + Args:
  420 + products: [{"id": "...", "title": "..."}]
  421 + target_lang: 输出语言,需在 SUPPORTED_LANGS 内
  422 + batch_size: 批大小,默认使用全局 BATCH_SIZE
  423 + """
  424 + if not API_KEY:
  425 + raise RuntimeError("DASHSCOPE_API_KEY is not set, cannot call LLM")
  426 +
  427 + if target_lang not in SUPPORTED_LANGS:
  428 + raise ValueError(f"Unsupported target_lang={target_lang}, supported={sorted(SUPPORTED_LANGS)}")
  429 +
  430 + if not products:
  431 + return []
  432 +
  433 + bs = batch_size or BATCH_SIZE
  434 + all_results: List[Dict[str, Any]] = []
  435 + total_batches = (len(products) + bs - 1) // bs
  436 +
  437 + for i in range(0, len(products), bs):
  438 + batch_num = i // bs + 1
  439 + batch = products[i:i + bs]
  440 + logger.info(
  441 + f"[analyze_products] Processing batch {batch_num}/{total_batches}, "
  442 + f"size={len(batch)}, target_lang={target_lang}"
  443 + )
  444 + batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang)
  445 + all_results.extend(batch_results)
  446 +
  447 + return all_results
mappings/search_products.json
@@ -1388,6 +1388,20 @@ @@ -1388,6 +1388,20 @@
1388 "index": false 1388 "index": false
1389 } 1389 }
1390 } 1390 }
  1391 + },
  1392 + "semantic_attributes": {
  1393 + "type": "nested",
  1394 + "properties": {
  1395 + "lang": {
  1396 + "type": "keyword"
  1397 + },
  1398 + "name": {
  1399 + "type": "keyword"
  1400 + },
  1401 + "value": {
  1402 + "type": "keyword"
  1403 + }
  1404 + }
1391 } 1405 }
1392 } 1406 }
1393 } 1407 }