diff --git a/api/routes/indexer.py b/api/routes/indexer.py index 57baf69..ae20cc1 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -80,7 +80,7 @@ class BuildDocsFromDbRequest(BaseModel): class EnrichContentItem(BaseModel): """单条待生成内容理解字段的商品。""" spu_id: str = Field(..., description="SPU ID") - title: str = Field(..., description="商品标题,用于 LLM 分析生成 qanchors / tags 等") + title: str = Field(..., description="商品标题,用于 LLM 分析生成 qanchors / enriched_tags 等") image_url: Optional[str] = Field(None, description="商品主图 URL(预留给多模态/内容理解扩展)") brief: Optional[str] = Field(None, description="商品简介/短描述") description: Optional[str] = Field(None, description="商品详情/长描述") @@ -93,10 +93,6 @@ class EnrichContentRequest(BaseModel): """ tenant_id: str = Field(..., description="租户 ID,用于请求路由与结果归属,不参与缓存键") items: List[EnrichContentItem] = Field(..., description="待分析的 SPU 列表(spu_id + title,可附带 brief/description/image_url)") - languages: List[str] = Field( - default_factory=lambda: ["zh", "en"], - description="目标语言列表,需在支持范围内(zh/en/de/ru/fr),默认 zh, en", - ) @router.post("/reindex") @@ -444,92 +440,25 @@ async def build_docs_from_db(request: BuildDocsFromDbRequest): raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") -def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]: +def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]]) -> List[Dict[str, Any]]: """ - 同步执行内容理解:调用 product_enrich.analyze_products,按语言批量跑 LLM, - 再聚合成每 SPU 的 qanchors、enriched_attributes、tags。供 run_in_executor 调用。 + 同步执行内容理解,返回与 ES mapping 对齐的字段结构。 + 语言策略由 product_enrich 内部统一决定,路由层不参与。 """ - from indexer.product_enrich import analyze_products, split_multi_value_field + from indexer.product_enrich import build_index_content_fields - llm_langs = list(dict.fromkeys(languages)) or ["en"] - - products = [ + results = build_index_content_fields(items=items, tenant_id=tenant_id) + return [ { - "id": it["spu_id"], - "title": 
(it.get("title") or "").strip(), - "brief": (it.get("brief") or "").strip(), - "description": (it.get("description") or "").strip(), - "image_url": (it.get("image_url") or "").strip(), + "spu_id": item["id"], + "qanchors": item["qanchors"], + "enriched_attributes": item["enriched_attributes"], + "enriched_tags": item["enriched_tags"], + **({"error": item["error"]} if item.get("error") else {}), } - for it in items - ] - dim_keys = [ - "tags", - "target_audience", - "usage_scene", - "season", - "key_attributes", - "material", - "features", + for item in results ] - # 按 spu_id 聚合:qanchors[lang], enriched_attributes[], tags[] - by_spu: Dict[str, Dict[str, Any]] = {} - for it in items: - sid = str(it["spu_id"]) - by_spu[sid] = {"qanchors": {}, "enriched_attributes": [], "tags": []} - - for lang in llm_langs: - try: - rows = analyze_products( - products=products, - target_lang=lang, - batch_size=20, - tenant_id=tenant_id, - ) - except Exception as e: - logger.warning("enrich-content analyze_products failed for lang=%s: %s", lang, e) - for it in items: - sid = str(it["spu_id"]) - if "error" not in by_spu[sid]: - by_spu[sid]["error"] = str(e) - continue - - for row in rows: - spu_id = str(row.get("id") or "") - if spu_id not in by_spu: - continue - rec = by_spu[spu_id] - if row.get("error"): - rec["error"] = row["error"] - continue - anchor_text = str(row.get("anchor_text") or "").strip() - if anchor_text: - rec["qanchors"][lang] = anchor_text - for name in dim_keys: - raw = row.get(name) - if not raw: - continue - for value in split_multi_value_field(str(raw)): - rec["enriched_attributes"].append({"lang": lang, "name": name, "value": value}) - if name == "tags": - rec["tags"].append(value) - - # 去重 tags(保持顺序) - out = [] - for it in items: - sid = str(it["spu_id"]) - rec = by_spu[sid] - tags = list(dict.fromkeys(rec["tags"])) - out.append({ - "spu_id": sid, - "qanchors": rec["qanchors"], - "enriched_attributes": rec["enriched_attributes"], - "tags": tags, - **({"error": 
rec["error"]} if rec.get("error") else {}), - }) - return out - @router.post("/enrich-content") async def enrich_content(request: EnrichContentRequest): @@ -540,7 +469,7 @@ async def enrich_content(request: EnrichContentRequest): - 外部 indexer 采用「微服务组合」方式自己组织 doc 时,可调用本接口获取 LLM 生成的 锚文本与语义属性,再与翻译、向量化结果合并写入 ES。 - 与 /indexer/build-docs 解耦,避免 build-docs 因 LLM 耗时过长而阻塞;调用方可 - 先拿不含 qanchors/tags 的 doc,再异步或离线补齐本接口结果后更新 ES。 + 先拿不含 qanchors/enriched_tags 的 doc,再异步或离线补齐本接口结果后更新 ES。 实现逻辑与 indexer.product_enrich.analyze_products 一致,支持多语言与 Redis 缓存。 """ @@ -568,8 +497,7 @@ async def enrich_content(request: EnrichContentRequest): None, lambda: _run_enrich_content( tenant_id=request.tenant_id, - items=items_payload, - languages=request.languages or ["zh", "en"], + items=items_payload ), ) return { diff --git a/docs/suggestion索引构建.md b/docs/suggestion索引构建.md index 0f3b4d5..73a3099 100644 --- a/docs/suggestion索引构建.md +++ b/docs/suggestion索引构建.md @@ -169,7 +169,7 @@ ##### 4.1 从商品索引收集 title / qanchors / tags(Step 1) - - 遍历店铺的所有商品:获取每个商品的 `"spu_id"`, `"title"`, `"qanchors"`, `"tags"`(按 `spu_id`、`id.keyword` 升序,便于 `search_after` 稳定分页) + - 遍历店铺的所有商品:获取每个商品的 `"spu_id"`, `"title"`, `"qanchors"`, `"enriched_tags"`(按 `spu_id`、`id.keyword` 升序,便于 `search_after` 稳定分页) - 对每个商品文档: @@ -207,7 +207,7 @@ - **qanchors 处理**: - `qanchors` 字段同样为多语言对象: ```json - "qanchors": { "en": "...", "zh": "..." } + "qanchors": { "en": ["slim fit", "sporty casual"], "zh": ["修身", "显瘦"] } ``` - 取 `q_raw = qanchors[lang]` - 通过 `_split_qanchors(q_raw)` 拆分为若干字符串: @@ -217,10 +217,14 @@ - `text_norm = _normalize_text(q_text)`,再用 `_looks_noise` 过滤 - 同样按 `(lang, text_norm)` 合并为 `SuggestionCandidate`,调用 `add_product("qanchor", spu_id=product_id)`。 - 4. 
**tags 处理**(与 `index_languages` 循环并列,每个商品只做一次): - - `tags` 可为字符串数组,或逗号等分隔的单个字符串;经 `_iter_product_tags` 展开为若干条。 - - 每条 tag **无语言字段**:使用 `query.query_parser.detect_text_language_for_suggestions`(与 `QueryParser` 相同的 `LanguageDetector`)判定语言,并约束在租户的 `index_languages` 内。 - - 通过 `_looks_noise` 后按 `(detected_lang, text_norm)` 合并,调用 `add_product("tag", spu_id=product_id)`。 + 4. **enriched_tags 处理**(与 `index_languages` 循环并列): + - `enriched_tags` 现为多语言对象,例如: + ```json + "enriched_tags": { "en": ["Classic", "ribbed neckline"], "zh": ["辣妹风"] } + ``` + - 优先读取 `enriched_tags[lang]`,每个值可为字符串数组,或逗号等分隔的单个字符串;经 `_iter_product_tags` 展开为若干条。 + - 对历史旧数据,若 `enriched_tags` 仍是单层字符串 / 数组,则继续走语言检测兜底,并约束在租户的 `index_languages` 内。 + - 通过 `_looks_noise` 后按 `(lang, text_norm)` 合并,调用 `add_product("tag", spu_id=product_id)`。 ##### 4.2 从查询日志收集用户 query(Step 2) diff --git a/docs/搜索API对接指南-05-索引接口(Indexer).md b/docs/搜索API对接指南-05-索引接口(Indexer).md index c3129fa..1605e6f 100644 --- a/docs/搜索API对接指南-05-索引接口(Indexer).md +++ b/docs/搜索API对接指南-05-索引接口(Indexer).md @@ -13,7 +13,7 @@ | 查询文档 | POST | `/indexer/documents` | 按 SPU ID 列表查询 ES 文档,不写入 ES | | 构建 ES 文档(正式) | POST | `/indexer/build-docs` | 由上游提供 MySQL 行数据,返回 ES-ready 文档,不写 ES | | 构建 ES 文档(测试) | POST | `/indexer/build-docs-from-db` | 由本服务查库并构建文档,仅测试/调试用 | -| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、semantic_attributes、tags(供微服务组合方式使用) | +| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、enriched_attributes、enriched_tags(供微服务组合方式使用) | | 索引健康检查 | GET | `/indexer/health` | 检查索引服务与数据库连接状态 | #### 5.0 支撑外部 indexer 的三种方式 @@ -510,7 +510,6 @@ curl -X POST "http://localhost:6004/indexer/build-docs" \ { "spu": { "id": 10001, - "tenant_id": "162", "title": "测试T恤 纯棉短袖", "brief": "舒适纯棉,多色可选", "description": "这是一款适合日常穿着的纯棉T恤,透气吸汗。", @@ -521,7 +520,7 @@ curl -X POST "http://localhost:6004/indexer/build-docs" \ "category_path": "服装/上衣/T恤", "fake_sales": 1280, "image_src": "https://oss.essa.cn/98532128-cf8e-456c-9e30-6f2a5ea0c19f.jpg", - 
"tags": "T恤,纯棉,短袖,夏季", + "enriched_tags": ["T恤", "纯棉"], "create_time": "2024-01-01T00:00:00Z", "update_time": "2024-01-01T00:00:00Z" }, @@ -570,7 +569,7 @@ curl -X POST "http://localhost:6004/indexer/build-docs" \ "tenant_id": "170", "spu_id": "223167", "title": { "en": "...", "zh": "..." }, - "tags": ["Floerns", "Clothing", "Shoes & Jewelry"], + "enriched_tags": ["Floerns", "Clothing", "Shoes & Jewelry"], "skus": [ { "sku_id": "3988393", @@ -649,7 +648,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ ### 5.8 内容理解字段生成接口 - **端点**: `POST /indexer/enrich-content` -- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**enriched_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。内部逻辑与 `indexer.product_enrich` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。 +- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**enriched_attributes**(语义属性)、**enriched_tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。接口只暴露商品内容输入,语言选择、分析维度与最终字段结构统一由 `indexer.product_enrich` 内部决定;当前返回结果与 `search_products` mapping 保持一致。单次请求在线程池中执行,避免阻塞其他接口。 #### 请求参数 @@ -669,8 +668,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ "title": "12PCS Dolls with Bottles", "image_url": "https://example.com/images/223168.jpg" } - ], - "languages": ["zh", "en"] + ] } ``` @@ -678,7 +676,6 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ |------|------|------|--------|------| | `tenant_id` | string | Y | - | 租户 ID。目前仅用于记录日志,不产生实际作用| | `items` | array | Y | - | 待分析列表;**单次最多 50 条** | -| `languages` | array[string] | N | `["zh", "en"]` | 目标语言,需在支持范围内:`zh`、`en`、`de`、`ru`、`fr` | `items[]` 字段说明: @@ -696,6 +693,12 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ - `tenant_id`、`spu_id` 只用于请求归属与结果回填,不参与缓存键。 - 因此,输入内容不变时可跨请求直接命中缓存;任一输入字段变化时,会自然落到新的缓存 key。 +语言说明: + +- 接口不接受语言控制参数。 +- 返回哪些语言、返回哪些语义维度,统一由 `indexer.product_enrich` 内部逻辑决定。 +- 当前为了与 `search_products` mapping 对齐,返回结果只包含核心索引语言 
`zh`、`en`。 + 批量请求建议: - **全量**:强烈建议 尽可能 **20 个 SPU/doc** 攒成一个批次后再请求一次。 - **增量**:可按时效要求设置时间窗口(例如 **5 分钟**),在窗口内尽可能攒到 **20 个**;达到 20 或窗口到期就发送一次请求。 @@ -711,21 +714,28 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ { "spu_id": "223167", "qanchors": { - "zh": "短袖T恤,纯棉,男装,夏季", - "en": "cotton t-shirt, short sleeve, men, summer" + "zh": ["短袖T恤", "纯棉", "男装", "夏季"], + "en": ["cotton t-shirt", "short sleeve", "men", "summer"] + }, + "enriched_tags": { + "zh": ["纯棉", "短袖", "男装"], + "en": ["cotton", "short sleeve", "men"] }, "enriched_attributes": [ - { "lang": "zh", "name": "tags", "value": "纯棉" }, - { "lang": "zh", "name": "usage_scene", "value": "日常" }, - { "lang": "en", "name": "tags", "value": "cotton" } - ], - "tags": ["纯棉", "短袖", "男装", "cotton", "short sleeve"] + { "name": "enriched_tags", "value": { "zh": "纯棉" } }, + { "name": "usage_scene", "value": { "zh": "日常" } }, + { "name": "enriched_tags", "value": { "en": "cotton" } } + ] }, { "spu_id": "223168", - "qanchors": { "en": "dolls, toys, 12pcs" }, - "enriched_attributes": [], - "tags": ["dolls", "toys"] + "qanchors": { + "en": ["dolls", "toys", "12pcs"] + }, + "enriched_tags": { + "en": ["dolls", "toys"] + }, + "enriched_attributes": [] } ] } @@ -733,10 +743,10 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ | 字段 | 类型 | 说明 | |------|------|------| -| `results` | array | 与请求 `items` 一一对应,每项含 `spu_id`、`qanchors`、`enriched_attributes`、`tags` | -| `results[].qanchors` | object | 按语言键的锚文本(逗号分隔短语),可写入 ES 文档的 `qanchors.{lang}` | -| `results[].enriched_attributes` | array | 语义属性列表,每项为 `{ "lang", "name", "value" }`,可写入 ES 的 `enriched_attributes` nested 字段 | -| `results[].tags` | array | 从语义属性中抽取的 `name=tags` 的 value 集合,可与业务原有 `tags` 合并后写入 ES 的 `tags` 字段 | +| `results` | array | 与请求 `items` 一一对应,每项含 `spu_id`、`qanchors`、`enriched_attributes`、`enriched_tags` | +| `results[].qanchors` | object | 与 ES `qanchors` 字段同结构,按语言键返回短语数组 | +| `results[].enriched_tags` | object | 与 ES 
`enriched_tags` 字段同结构,按语言键返回标签数组 | +| `results[].enriched_attributes` | array | 与 ES `enriched_attributes` nested 字段同结构,每项为 `{ "name", "value": { "zh"?: "...", "en"?: "..." } }` | | `results[].error` | string | 若该条处理失败(如 LLM 异常),会在此字段返回错误信息 | **错误响应**: @@ -758,8 +768,7 @@ curl -X POST "http://localhost:6004/indexer/enrich-content" \ "description": "100%棉,圆领版型,适合日常通勤与休闲穿搭。", "image_url": "https://example.com/images/223167.jpg" } - ], - "languages": ["zh", "en"] + ] }' ``` diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index d220417..7ddc5ee 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -11,9 +11,8 @@ SPU文档转换器 - 公共转换逻辑。 import pandas as pd import numpy as np import logging -import re from typing import Dict, Any, Optional, List -from indexer.product_enrich import analyze_products, split_multi_value_field +from indexer.product_enrich import build_index_content_fields logger = logging.getLogger(__name__) @@ -75,6 +74,39 @@ class SPUDocumentTransformer: ) return translations + def _build_core_language_text_object( + self, + text: Optional[str], + source_lang: str, + scene: str = "general", + ) -> Dict[str, str]: + """ + 构建与 mapping 中 core_language_text(_with_keyword) 对齐的对象。 + 当前核心语言固定为 zh/en。 + """ + if not text or not str(text).strip(): + return {} + + source_text = str(text).strip() + obj: Dict[str, str] = {} + + if source_lang in CORE_INDEX_LANGUAGES: + obj[source_lang] = source_text + + if self.translator: + translations = self._translate_index_languages( + text=source_text, + source_lang=source_lang, + index_languages=CORE_INDEX_LANGUAGES, + scene=scene, + ) + for lang in CORE_INDEX_LANGUAGES: + val = translations.get(lang) + if val and str(val).strip(): + obj[lang] = str(val).strip() + + return obj + def transform_spu_to_doc( self, tenant_id: str, @@ -118,10 +150,16 @@ class SPUDocumentTransformer: if self.enable_title_embedding and self.encoder: self._fill_title_embedding(doc) - # Tags + # 
Tags:统一转成与 mapping 一致的 core-language object if pd.notna(spu_row.get('tags')): tags_str = str(spu_row['tags']) - doc['tags'] = split_multi_value_field(tags_str) + tags_obj = self._build_core_language_text_object( + tags_str, + source_lang=primary_lang, + scene="general", + ) + if tags_obj: + doc['tags'] = tags_obj # Category相关字段 self._fill_category_fields(doc, spu_row) @@ -202,7 +240,8 @@ class SPUDocumentTransformer: """ 批量调用 LLM,为一批 doc 填充: - qanchors.{lang} - - enriched_attributes (lang/name/value) + - tags.{lang} + - enriched_attributes[].value.{lang} 设计目标: - 尽可能攒批调用 LLM; @@ -211,16 +250,8 @@ class SPUDocumentTransformer: if not docs or not spu_rows or len(docs) != len(spu_rows): return - try: - index_langs = self.tenant_config.get("index_languages") or ["en", "zh"] - except Exception: - index_langs = ["en", "zh"] - # 不再限制为固定 SUPPORTED_LANGS,直接按照租户配置的 index_languages 调用 - llm_langs = list(dict.fromkeys(index_langs)) # 去重并保持顺序 - - # 只对有 title 的 SPU 参与 LLM;其余跳过 id_to_idx: Dict[str, int] = {} - products: List[Dict[str, str]] = [] + items: List[Dict[str, str]] = [] for i, row in enumerate(spu_rows): raw_id = row.get("id") spu_id = "" if raw_id is None else str(raw_id).strip() @@ -228,69 +259,45 @@ class SPUDocumentTransformer: if not spu_id or not title: continue id_to_idx[spu_id] = i - products.append({"id": spu_id, "title": title}) - if not products: + items.append( + { + "id": spu_id, + "title": title, + "brief": str(row.get("brief") or "").strip(), + "description": str(row.get("description") or "").strip(), + "image_url": str(row.get("image_src") or "").strip(), + } + ) + if not items: return tenant_id = str(docs[0].get("tenant_id") or "").strip() or None + try: + results = build_index_content_fields(items=items, tenant_id=tenant_id) + except Exception as e: + logger.warning("LLM batch attribute fill failed: %s", e) + return - dim_keys = [ - "tags", - "target_audience", - "usage_scene", - "season", - "key_attributes", - "material", - "features", - ] - - for lang 
in llm_langs: - try: - rows = analyze_products( - products=products, - target_lang=lang, - batch_size=20, - tenant_id=tenant_id, - ) - except Exception as e: - logger.warning("LLM batch attribute fill failed (lang=%s): %s", lang, e) + for result in results: + spu_id = str(result.get("id") or "").strip() + if not spu_id: continue + idx = id_to_idx.get(spu_id) + if idx is None: + continue + self._apply_content_enrichment(docs[idx], result) - for row in rows or []: - spu_id = str(row.get("id") or "").strip() - if not spu_id: - continue - idx = id_to_idx.get(spu_id) - if idx is None: - continue - self._apply_llm_row(docs[idx], row=row, lang=lang, dim_keys=dim_keys) - - def _apply_llm_row(self, doc: Dict[str, Any], row: Dict[str, Any], lang: str, dim_keys: List[str]) -> None: - """将单条 LLM 输出 row 按既定结构写入 doc(不抛异常)。""" + def _apply_content_enrichment(self, doc: Dict[str, Any], enrichment: Dict[str, Any]) -> None: + """将 product_enrich 产出的 ES-ready 内容字段写入 doc。""" try: - if row.get("error"): - return - - semantic_list = doc.get("enriched_attributes") or [] - qanchors_obj = doc.get("qanchors") or {} - - anchor_text = str(row.get("anchor_text") or "").strip() - if anchor_text: - qanchors_obj[lang] = anchor_text - - for name in dim_keys: - raw = row.get(name) - if not raw: - continue - for value in split_multi_value_field(str(raw)): - semantic_list.append({"lang": lang, "name": name, "value": value}) - - if qanchors_obj: - doc["qanchors"] = qanchors_obj - if semantic_list: - doc["enriched_attributes"] = semantic_list + if enrichment.get("qanchors"): + doc["qanchors"] = enrichment["qanchors"] + if enrichment.get("enriched_tags"): + doc["enriched_tags"] = enrichment["enriched_tags"] + if enrichment.get("enriched_attributes"): + doc["enriched_attributes"] = enrichment["enriched_attributes"] except Exception as e: - logger.warning("Failed to apply LLM row to doc (spu_id=%s, lang=%s): %s", doc.get("spu_id"), lang, e) + logger.warning("Failed to apply enrichment to doc (spu_id=%s): %s", doc.get("spu_id"), e) 
def _fill_text_fields( self, @@ -544,6 +551,23 @@ class SPUDocumentTransformer: if pd.notna(position) and pd.notna(name): option_name_map[int(position)] = str(name) + primary_lang = self.tenant_config.get('primary_language', 'en') + + def _build_specification(name: str, raw_value: Any, sku_id: str) -> Optional[Dict[str, Any]]: + value = "" if raw_value is None else str(raw_value).strip() + if not value: + return None + return { + 'sku_id': sku_id, + 'name': name, + 'value_keyword': value, + 'value_text': self._build_core_language_text_object( + value, + source_lang=primary_lang, + scene="general", + ) or normalize_core_text_field_value(value, primary_lang), + } + for _, sku_row in skus.iterrows(): sku_data = self._transform_sku_row(sku_row, option_name_map) if sku_data: @@ -584,23 +608,17 @@ class SPUDocumentTransformer: # 构建specifications(从SKU的option值和option表的name) sku_id = str(sku_row['id']) if pd.notna(sku_row.get('option1')) and 1 in option_name_map: - specifications.append({ - 'sku_id': sku_id, - 'name': option_name_map[1], - 'value': str(sku_row['option1']) - }) + spec = _build_specification(option_name_map[1], sku_row['option1'], sku_id) + if spec: + specifications.append(spec) if pd.notna(sku_row.get('option2')) and 2 in option_name_map: - specifications.append({ - 'sku_id': sku_id, - 'name': option_name_map[2], - 'value': str(sku_row['option2']) - }) + spec = _build_specification(option_name_map[2], sku_row['option2'], sku_id) + if spec: + specifications.append(spec) if pd.notna(sku_row.get('option3')) and 3 in option_name_map: - specifications.append({ - 'sku_id': sku_id, - 'name': option_name_map[3], - 'value': str(sku_row['option3']) - }) + spec = _build_specification(option_name_map[3], sku_row['option3'], sku_id) + if spec: + specifications.append(spec) return skus_list, prices, compare_prices, sku_prices, sku_weights, sku_weight_units, total_inventory, specifications @@ -636,82 +654,36 @@ class SPUDocumentTransformer: def _fill_llm_attributes(self, 
doc: Dict[str, Any], spu_row: pd.Series) -> None: """ - 调用 indexer.product_enrich.analyze_products,为当前 SPU 填充: + 调用 indexer.product_enrich 的高层内容理解入口,为当前 SPU 填充: - qanchors.{lang} - - enriched_attributes (lang/name/value) + - tags.{lang} + - enriched_attributes[].value.{lang} """ - try: - index_langs = self.tenant_config.get("index_languages") or ["en", "zh"] - except Exception: - index_langs = ["en", "zh"] - - # 不再限制为固定 SUPPORTED_LANGS,直接按照租户配置的 index_languages 调用 - llm_langs = list(dict.fromkeys(index_langs)) # 去重并保持顺序 - spu_id = str(spu_row.get("id") or "").strip() title = str(spu_row.get("title") or "").strip() if not spu_id or not title: return - semantic_list = doc.get("enriched_attributes") or [] - qanchors_obj = doc.get("qanchors") or {} - - dim_keys = [ - "tags", - "target_audience", - "usage_scene", - "season", - "key_attributes", - "material", - "features", - ] - tenant_id = doc.get("tenant_id") + try: + results = build_index_content_fields( + items=[ + { + "id": spu_id, + "title": title, + "brief": str(spu_row.get("brief") or "").strip(), + "description": str(spu_row.get("description") or "").strip(), + "image_url": str(spu_row.get("image_src") or "").strip(), + } + ], + tenant_id=str(tenant_id), + ) + except Exception as e: + logger.warning("LLM attribute fill failed for SPU %s: %s", spu_id, e) + return - for lang in llm_langs: - try: - rows = analyze_products( - products=[{"id": spu_id, "title": title}], - target_lang=lang, - batch_size=1, - tenant_id=str(tenant_id), - ) - except Exception as e: - logger.warning( - "LLM attribute fill failed for SPU %s, lang=%s: %s", - spu_id, - lang, - e, - ) - continue - - if not rows: - continue - row = rows[0] or {} - - # qanchors.{lang} - anchor_text = str(row.get("anchor_text") or "").strip() - if anchor_text: - qanchors_obj[lang] = anchor_text - - # 语义属性:按各维度拆分为短语 - for name in dim_keys: - raw = row.get(name) - if not raw: - continue - for value in split_multi_value_field(str(raw)): - semantic_list.append( - { - 
"lang": lang, - "name": name, - "value": value, - } - ) - - if qanchors_obj: - doc["qanchors"] = qanchors_obj - if semantic_list: - doc["enriched_attributes"] = semantic_list + if results: + self._apply_content_enrichment(doc, results[0]) def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Dict[int, str] = None) -> Optional[Dict[str, Any]]: """ diff --git a/indexer/product_enrich.py b/indexer/product_enrich.py index 48544f1..56ff167 100644 --- a/indexer/product_enrich.py +++ b/indexer/product_enrich.py @@ -146,6 +146,16 @@ if _missing_prompt_langs: # 多值字段分隔:英文逗号、中文逗号、顿号,及历史约定的 ; | / 与空白 _MULTI_VALUE_FIELD_SPLIT_RE = re.compile(r"[,、,;|/\n\t]+") +_CORE_INDEX_LANGUAGES = ("zh", "en") +_ENRICHED_ATTRIBUTE_DIMENSIONS = ( + "enriched_tags", + "target_audience", + "usage_scene", + "season", + "key_attributes", + "material", + "features", +) def split_multi_value_field(text: Optional[str]) -> List[str]: @@ -158,6 +168,124 @@ def split_multi_value_field(text: Optional[str]) -> List[str]: return [p.strip() for p in _MULTI_VALUE_FIELD_SPLIT_RE.split(s) if p.strip()] +def _append_lang_phrase_map(target: Dict[str, List[str]], lang: str, raw_value: Any) -> None: + parts = split_multi_value_field(raw_value) + if not parts: + return + existing = target.get(lang) or [] + merged = list(dict.fromkeys([str(x).strip() for x in existing if str(x).strip()] + parts)) + if merged: + target[lang] = merged + + +def _append_enriched_attribute( + target: List[Dict[str, Any]], + name: str, + lang: str, + raw_value: Any, +) -> None: + for value in split_multi_value_field(raw_value): + if any( + item.get("name") == name + and isinstance(item.get("value"), dict) + and item["value"].get(lang) == value + for item in target + ): + continue + target.append({"name": name, "value": {lang: value}}) + + +def _apply_index_content_row(result: Dict[str, Any], row: Dict[str, Any], lang: str) -> None: + if not row or row.get("error"): + return + + anchor_text = str(row.get("anchor_text") or 
"").strip() + if anchor_text: + _append_lang_phrase_map(result["qanchors"], lang=lang, raw_value=anchor_text) + + for name in _ENRICHED_ATTRIBUTE_DIMENSIONS: + raw = row.get(name) + if not raw: + continue + _append_enriched_attribute(result["enriched_attributes"], name=name, lang=lang, raw_value=raw) + if name == "enriched_tags": + _append_lang_phrase_map(result["enriched_tags"], lang=lang, raw_value=raw) + + +def _normalize_index_content_item(item: Dict[str, Any]) -> Dict[str, str]: + item_id = str(item.get("id") or item.get("spu_id") or "").strip() + return { + "id": item_id, + "title": str(item.get("title") or "").strip(), + "brief": str(item.get("brief") or "").strip(), + "description": str(item.get("description") or "").strip(), + "image_url": str(item.get("image_url") or "").strip(), + } + + +def build_index_content_fields( + items: List[Dict[str, Any]], + tenant_id: Optional[str] = None, +) -> List[Dict[str, Any]]: + """ + 高层入口:生成与 ES mapping 对齐的内容理解字段。 + + 输入项需包含: + - `id` 或 `spu_id` + - `title` + - 可选 `brief` / `description` / `image_url` + + 返回项结构: + - `id` + - `qanchors` + - `enriched_tags` + - `enriched_attributes` + - 可选 `error` + + 其中: + - `qanchors.{lang}` 为短语数组 + - `enriched_tags.{lang}` 为标签数组 + """ + normalized_items = [_normalize_index_content_item(item) for item in items] + if not normalized_items: + return [] + + results_by_id: Dict[str, Dict[str, Any]] = { + item["id"]: { + "id": item["id"], + "qanchors": {}, + "enriched_tags": {}, + "enriched_attributes": [], + } + for item in normalized_items + } + + for lang in _CORE_INDEX_LANGUAGES: + try: + rows = analyze_products( + products=normalized_items, + target_lang=lang, + batch_size=BATCH_SIZE, + tenant_id=tenant_id, + ) + except Exception as e: + logger.warning("build_index_content_fields failed for lang=%s: %s", lang, e) + for item in normalized_items: + results_by_id[item["id"]].setdefault("error", str(e)) + continue + + for row in rows or []: + item_id = str(row.get("id") or "").strip() + if 
not item_id or item_id not in results_by_id: + continue + if row.get("error"): + results_by_id[item_id].setdefault("error", row["error"]) + continue + _apply_index_content_row(results_by_id[item_id], row=row, lang=lang) + + return [results_by_id[item["id"]] for item in normalized_items] + + def _normalize_space(text: str) -> str: return re.sub(r"\s+", " ", (text or "").strip()) @@ -526,7 +654,7 @@ def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: "seq_no": parts[0], "title": parts[1], # 商品标题(按目标语言) "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 - "tags": parts[3] if len(parts) > 3 else "", # 细分标签 + "enriched_tags": parts[3] if len(parts) > 3 else "", # 细分标签 "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景 "season": parts[6] if len(parts) > 6 else "", # 适用季节 @@ -603,7 +731,7 @@ def process_batch( "title_input": item.get("title", ""), "title": "", "category_path": "", - "tags": "", + "enriched_tags": "", "target_audience": "", "usage_scene": "", "season": "", @@ -643,7 +771,7 @@ def process_batch( "title_input": batch_data[i]["title"], # 原始输入标题 "title": parsed_item.get("title", ""), # 模型生成的标题 "category_path": parsed_item.get("category_path", ""), # 品类路径 - "tags": parsed_item.get("tags", ""), # 细分标签 + "enriched_tags": parsed_item.get("enriched_tags", ""), # 细分标签 "target_audience": parsed_item.get("target_audience", ""), # 适用人群 "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景 "season": parsed_item.get("season", ""), # 适用季节 @@ -686,7 +814,7 @@ def process_batch( "title_input": item["title"], "title": "", "category_path": "", - "tags": "", + "enriched_tags": "", "target_audience": "", "usage_scene": "", "season": "", diff --git a/mappings/README.md b/mappings/README.md index ea70518..929b615 100644 --- a/mappings/README.md +++ b/mappings/README.md @@ -34,8 +34,8 @@ 当前字段大致分为几类: -- 
全语言字段:`title`、`keywords`、`brief`、`description`、`vendor`、`category_path`、`category_name_text`、`specifications.value` -- 核心索引语言字段:`qanchors`、`tags`、`option1_values`、`option2_values`、`option3_values`、`enriched_attributes.value` +- 全语言字段:`title`、`keywords`、`brief`、`description`、`vendor`、`category_path`、`category_name_text` +- 核心索引语言字段:`qanchors`、`enriched_tags`、`option1_values`、`option2_values`、`option3_values`、`enriched_attributes.value` - 复合嵌套字段:`image_embedding`、`specifications`、`enriched_attributes`、`skus` - 其他标量字段:`tenant_id`、`spu_id`、价格、库存、类目等 @@ -63,11 +63,12 @@ 典型字段: - `qanchors` -- `tags` +- `enriched_tags` - `option1_values` - `option2_values` - `option3_values` - `enriched_attributes.value` +- `specifications.value_text` 以 `category_path` 和 `option*_values` 为例,核心语言灌入结果应至少包含: @@ -118,7 +119,6 @@ - `vendor` - `category_path` - `category_name_text` -- `specifications.value` 灌入规则: @@ -151,7 +151,7 @@ } ``` -示例:规格值 `specifications.value` +示例:规格值 `specifications.value_text` / `specifications.value_keyword` ```json { @@ -159,16 +159,21 @@ { "sku_id": "sku-red-s", "name": "color", - "value": { + "value_keyword": "красный", + "value_text": { "zh": "红色", - "en": "red", - "ru": "красный" + "en": "red" } } ] } ``` +其中: + +- `specifications.value_keyword` 保存原始规格值,用于精确过滤 / 分面 +- `specifications.value_text` 保存 `zh/en` 两个核心索引语言版本,用于检索召回 + ### 原始语言为中文或英文时 如果原始语言就是核心索引语言之一,不需要额外再写第三份语言字段。 @@ -210,7 +215,7 @@ - 标量字段:直接写固定值,例如 `tenant_id`、`spu_id`、`min_price` - 核心索引语言字段:只生成 `zh/en` - 全语言字段:生成 `zh/en`,再按原始语言补一个对应语种字段 -- 嵌套字段:对每个元素内部重复应用同样规则,例如 `specifications[].value` +- 嵌套字段:对每个元素内部重复应用同样规则,例如 `specifications[].value_text`、`enriched_attributes[].value` ### 推荐灌入流程 diff --git a/mappings/generate_search_products_mapping.py b/mappings/generate_search_products_mapping.py index ea9b24c..e102630 100644 --- a/mappings/generate_search_products_mapping.py +++ b/mappings/generate_search_products_mapping.py @@ -194,8 +194,7 @@ FIELD_SPECS = [ ), text_field("category_path", 
"all_language_text_with_keyword"), text_field("category_name_text", "all_language_text_with_keyword"), - text_field("qanchors", "core_language_text"), - text_field("tags", "core_language_text_with_keyword"), + text_field("tags", "all_language_text_with_keyword"), scalar_field("category_id", "keyword"), scalar_field("category_name", "keyword"), scalar_field("category_level", "integer"), @@ -209,6 +208,8 @@ FIELD_SPECS = [ scalar_field("value_keyword", "keyword"), text_field("value_text", "core_language_text_with_keyword"), ), + text_field("qanchors", "core_language_text"), + text_field("enriched_tags", "core_language_text_with_keyword"), nested_field( "enriched_attributes", scalar_field("name", "keyword"), diff --git a/suggestion/builder.py b/suggestion/builder.py index b376ab7..e418427 100644 --- a/suggestion/builder.py +++ b/suggestion/builder.py @@ -166,6 +166,29 @@ class SuggestionIndexBuilder: out = [p.strip() for p in parts if p and p.strip()] return out if out else [s] + def _iter_multilang_product_tags( + self, + raw: Any, + index_languages: List[str], + primary_language: str, + ) -> List[Tuple[str, str]]: + if isinstance(raw, dict): + pairs: List[Tuple[str, str]] = [] + for lang in index_languages: + for tag in self._iter_product_tags(raw.get(lang)): + pairs.append((lang, tag)) + return pairs + + pairs = [] + for tag in self._iter_product_tags(raw): + tag_lang, _, _ = detect_text_language_for_suggestions( + tag, + index_languages=index_languages, + primary_language=primary_language, + ) + pairs.append((tag_lang, tag)) + return pairs + @staticmethod def _looks_noise(text_value: str) -> bool: if not text_value: @@ -487,12 +510,11 @@ class SuggestionIndexBuilder: key_to_candidate[key] = c c.add_product("qanchor", spu_id=product_id) - for tag in self._iter_product_tags(src.get("tags")): - tag_lang, _, _ = detect_text_language_for_suggestions( - tag, - index_languages=index_languages, - primary_language=primary_language, - ) + for tag_lang, tag in 
self._iter_multilang_product_tags( + src.get("tags"), + index_languages=index_languages, + primary_language=primary_language, + ): text_norm = self._normalize_text(tag) if self._looks_noise(text_norm): continue diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index 80ba7b5..13ee04a 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -345,33 +345,25 @@ def test_indexer_build_docs_from_db_contract(indexer_client: TestClient): def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch): import indexer.product_enrich as process_products - def _fake_analyze_products( - products: List[Dict[str, str]], - target_lang: str = "zh", - batch_size: int | None = None, - tenant_id: str | None = None, - ): - assert batch_size == 20 + def _fake_build_index_content_fields(items: List[Dict[str, str]], tenant_id: str | None = None): + assert tenant_id == "162" return [ { - "id": p["id"], - "lang": target_lang, - "title_input": p["title"], - "title": p["title"], - "category_path": "", - "tags": "tag1,tag2", - "target_audience": "", - "usage_scene": "", - "season": "", - "key_attributes": "", - "material": "", - "features": "", - "anchor_text": f"{target_lang}-anchor-{p['id']}", + "id": p["spu_id"], + "qanchors": { + "zh": [f"zh-anchor-{p['spu_id']}"], + "en": [f"en-anchor-{p['spu_id']}"], + }, + "enriched_tags": {"zh": ["tag1", "tag2"], "en": ["tag1", "tag2"]}, + "enriched_attributes": [ + {"name": "enriched_tags", "value": {"zh": "tag1"}}, + {"name": "enriched_tags", "value": {"en": "tag1"}}, + ], } - for p in products + for p in items ] - monkeypatch.setattr(process_products, "analyze_products", _fake_analyze_products) + monkeypatch.setattr(process_products, "build_index_content_fields", _fake_build_index_content_fields) response = indexer_client.post( "/indexer/enrich-content", @@ -381,7 +373,6 @@ def test_indexer_enrich_content_contract(indexer_client: TestClient, 
monkeypatch {"spu_id": "1001", "title": "T-shirt"}, {"spu_id": "1002", "title": "Toy"}, ], - "languages": ["zh", "en"], }, ) assert response.status_code == 200 @@ -390,9 +381,14 @@ def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch assert data["total"] == 2 assert len(data["results"]) == 2 assert data["results"][0]["spu_id"] == "1001" - assert data["results"][0]["qanchors"]["zh"] == "zh-anchor-1001" - assert data["results"][0]["qanchors"]["en"] == "en-anchor-1001" - assert "tag1" in data["results"][0]["tags"] + assert data["results"][0]["qanchors"]["zh"] == ["zh-anchor-1001"] + assert data["results"][0]["qanchors"]["en"] == ["en-anchor-1001"] + assert data["results"][0]["enriched_tags"]["zh"] == ["tag1", "tag2"] + assert data["results"][0]["enriched_tags"]["en"] == ["tag1", "tag2"] + assert data["results"][0]["enriched_attributes"][0] == { + "name": "enriched_tags", + "value": {"zh": "tag1"}, + } def test_indexer_documents_contract(indexer_client: TestClient): @@ -515,7 +511,6 @@ def test_indexer_enrich_content_validation_max_items(indexer_client: TestClient) json={ "tenant_id": "162", "items": [{"spu_id": str(i), "title": "x"} for i in range(51)], - "languages": ["zh"], }, ) assert response.status_code == 400 diff --git a/tests/test_llm_enrichment_batch_fill.py b/tests/test_llm_enrichment_batch_fill.py index 8dba4f3..3e05e82 100644 --- a/tests/test_llm_enrichment_batch_fill.py +++ b/tests/test_llm_enrichment_batch_fill.py @@ -7,33 +7,30 @@ import pandas as pd from indexer.document_transformer import SPUDocumentTransformer -def test_fill_llm_attributes_batch_calls_analyze_in_batches(monkeypatch): +def test_fill_llm_attributes_batch_uses_product_enrich_helper(monkeypatch): seen_calls: List[Dict[str, Any]] = [] - def _fake_analyze_products(products, target_lang="zh", batch_size=None, tenant_id=None): - # should always request batch_size=20 and pass full list; internal splitter handles >20 - seen_calls.append( - { - "n": len(products), - 
"target_lang": target_lang, - "batch_size": batch_size, - "tenant_id": tenant_id, - } - ) + def _fake_build_index_content_fields(items, tenant_id=None): + seen_calls.append({"n": len(items), "tenant_id": tenant_id}) return [ { - "id": p["id"], - "lang": target_lang, - "title_input": p["title"], - "tags": "t1,t2", - "anchor_text": f"{target_lang}-anchor-{p['id']}", + "id": item["id"], + "qanchors": { + "zh": [f"zh-anchor-{item['id']}"], + "en": [f"en-anchor-{item['id']}"], + }, + "tags": {"zh": ["t1", "t2"], "en": ["t1", "t2"]}, + "enriched_attributes": [ + {"name": "tags", "value": {"zh": "t1"}}, + {"name": "tags", "value": {"en": "t1"}}, + ], } - for p in products + for item in items ] import indexer.document_transformer as doc_tr - monkeypatch.setattr(doc_tr, "analyze_products", _fake_analyze_products) + monkeypatch.setattr(doc_tr, "build_index_content_fields", _fake_build_index_content_fields) transformer = SPUDocumentTransformer( category_id_to_name={}, @@ -54,11 +51,11 @@ def test_fill_llm_attributes_batch_calls_analyze_in_batches(monkeypatch): transformer.fill_llm_attributes_batch(docs, rows) - # called once per language, with full list; analyze_products handles splitting - assert seen_calls == [ - {"n": 45, "target_lang": "zh", "batch_size": 20, "tenant_id": "162"}, - {"n": 45, "target_lang": "en", "batch_size": 20, "tenant_id": "162"}, - ] + assert seen_calls == [{"n": 45, "tenant_id": "162"}] - assert docs[0]["qanchors"]["zh"] == "zh-anchor-0" - assert docs[0]["qanchors"]["en"] == "en-anchor-0" + assert docs[0]["qanchors"]["zh"] == ["zh-anchor-0"] + assert docs[0]["qanchors"]["en"] == ["en-anchor-0"] + assert docs[0]["tags"]["zh"] == ["t1", "t2"] + assert docs[0]["tags"]["en"] == ["t1", "t2"] + assert {"name": "tags", "value": {"zh": "t1"}} in docs[0]["enriched_attributes"] + assert {"name": "tags", "value": {"en": "t1"}} in docs[0]["enriched_attributes"] diff --git a/tests/test_suggestions.py b/tests/test_suggestions.py index 8b97b96..f9ba07a 100644 --- 
a/tests/test_suggestions.py +++ b/tests/test_suggestions.py @@ -403,10 +403,13 @@ def test_build_full_candidates_tags_and_qanchor_phrases(monkeypatch): "spu_id": "900", "title": {"en": "Tee", "zh": "T恤"}, "qanchors": { - "en": "slim fit, sporty casual", - "zh": "修身, 显瘦", + "en": ["slim fit", "sporty casual"], + "zh": ["修身", "显瘦"], + }, + "tags": { + "en": ["Classic", "ribbed neckline"], + "zh": ["辣妹风"], }, - "tags": ["Classic", "辣妹风", "ribbed neckline"], }, } ] -- libgit2 0.21.2