Commit be3f0d46fbb9b7b7bf7876e621be93bbd99dd6c1

Authored by tangwang
1 parent 9f5994b4

/indexer/enrich-content

api/routes/indexer.py
... ... @@ -30,7 +30,7 @@ class IndexSpusRequest(BaseModel):
30 30 """增量索引请求(按SPU列表索引)"""
31 31 tenant_id: str
32 32 spu_ids: List[str]
33   - delete_spu_ids: List[str] = [] # 显式指定要删除的SPU ID列表(可选)
  33 + delete_spu_ids: List[str] = Field(default_factory=list) # 显式指定要删除的SPU ID列表(可选)
34 34  
35 35  
36 36 class GetDocumentsRequest(BaseModel):
... ... @@ -225,6 +225,7 @@ async def build_docs(request: BuildDocsRequest):
225 225 import pandas as pd
226 226  
227 227 docs: List[Dict[str, Any]] = []
  228 + doc_spu_rows: List[pd.Series] = []
228 229 failed: List[Dict[str, Any]] = []
229 230  
230 231 for item in request.items:
... ... @@ -240,6 +241,7 @@ async def build_docs(request: BuildDocsRequest):
240 241 spu_row=spu_row,
241 242 skus=skus_df,
242 243 options=options_df,
  244 + fill_llm_attributes=False,
243 245 )
244 246  
245 247 if doc is None:
... ... @@ -279,6 +281,7 @@ async def build_docs(request: BuildDocsRequest):
279 281 doc["title_embedding"] = emb0.tolist()
280 282  
281 283 docs.append(doc)
  284 + doc_spu_rows.append(spu_row)
282 285 except Exception as e:
283 286 failed.append(
284 287 {
... ... @@ -287,6 +290,13 @@ async def build_docs(request: BuildDocsRequest):
287 290 }
288 291 )
289 292  
  293 + # 批量填充 LLM 字段(尽量攒批,每次最多 20 条;失败仅 warning,不影响 build-docs 主功能)
  294 + try:
  295 + if docs and doc_spu_rows:
  296 + transformer.fill_llm_attributes_batch(docs, doc_spu_rows)
  297 + except Exception as e:
  298 + logger.warning("Batch LLM fill failed in build-docs (tenant_id=%s): %s", request.tenant_id, e)
  299 +
290 300 return {
291 301 "tenant_id": request.tenant_id,
292 302 "docs": docs,
... ...
indexer/document_transformer.py
... ... @@ -71,7 +71,8 @@ class SPUDocumentTransformer:
71 71 tenant_id: str,
72 72 spu_row: pd.Series,
73 73 skus: pd.DataFrame,
74   - options: pd.DataFrame
  74 + options: pd.DataFrame,
  75 + fill_llm_attributes: bool = True,
75 76 ) -> Optional[Dict[str, Any]]:
76 77 """
77 78 将单个SPU行和其SKUs转换为ES文档。
... ... @@ -181,10 +182,112 @@ class SPUDocumentTransformer:
181 182 doc['update_time'] = str(update_time)
182 183  
183 184 # 基于 LLM 的锚文本与语义属性(默认开启,失败时仅记录日志)
184   - self._fill_llm_attributes(doc, spu_row)
  185 + # 注意:批处理场景(build-docs / bulk / incremental)应优先在外层攒批,
  186 + # 再调用 fill_llm_attributes_batch(),避免逐条调用 LLM。
  187 + if fill_llm_attributes:
  188 + self._fill_llm_attributes(doc, spu_row)
185 189  
186 190 return doc
187 191  
  192 + def fill_llm_attributes_batch(self, docs: List[Dict[str, Any]], spu_rows: List[pd.Series]) -> None:
  193 + """
  194 + 批量调用 LLM,为一批 doc 填充:
  195 + - qanchors.{lang}
  196 + - semantic_attributes (lang/name/value)
  197 +
  198 + 设计目标:
  199 + - 尽可能攒批调用 LLM;
  200 + - 单次 LLM 调用最多 20 条(由 analyze_products 内部强制 cap 并自动拆批)。
  201 + """
  202 + if not docs or not spu_rows or len(docs) != len(spu_rows):
  203 + return
  204 +
  205 + try:
  206 + index_langs = self.tenant_config.get("index_languages") or ["en", "zh"]
  207 + except Exception:
  208 + index_langs = ["en", "zh"]
  209 + llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS]
  210 + if not llm_langs:
  211 + return
  212 +
  213 + # 只对有 title 的 SPU 参与 LLM;其余跳过
  214 + id_to_idx: Dict[str, int] = {}
  215 + products: List[Dict[str, str]] = []
  216 + for i, row in enumerate(spu_rows):
  217 + raw_id = row.get("id")
  218 + spu_id = "" if raw_id is None else str(raw_id).strip()
  219 + title = str(row.get("title") or "").strip()
  220 + if not spu_id or not title:
  221 + continue
  222 + id_to_idx[spu_id] = i
  223 + products.append({"id": spu_id, "title": title})
  224 + if not products:
  225 + return
  226 +
  227 + tenant_id = str(docs[0].get("tenant_id") or "").strip() or None
  228 +
  229 + dim_keys = [
  230 + "tags",
  231 + "target_audience",
  232 + "usage_scene",
  233 + "season",
  234 + "key_attributes",
  235 + "material",
  236 + "features",
  237 + ]
  238 +
  239 + for lang in llm_langs:
  240 + try:
  241 + rows = analyze_products(
  242 + products=products,
  243 + target_lang=lang,
  244 + batch_size=20,
  245 + tenant_id=tenant_id,
  246 + )
  247 + except Exception as e:
  248 + logger.warning("LLM batch attribute fill failed (lang=%s): %s", lang, e)
  249 + continue
  250 +
  251 + for row in rows or []:
  252 + spu_id = str(row.get("id") or "").strip()
  253 + if not spu_id:
  254 + continue
  255 + idx = id_to_idx.get(spu_id)
  256 + if idx is None:
  257 + continue
  258 + self._apply_llm_row(docs[idx], row=row, lang=lang, dim_keys=dim_keys)
  259 +
  260 + def _apply_llm_row(self, doc: Dict[str, Any], row: Dict[str, Any], lang: str, dim_keys: List[str]) -> None:
  261 + """将单条 LLM 输出 row 按既定结构写入 doc(不抛异常)。"""
  262 + try:
  263 + if row.get("error"):
  264 + return
  265 +
  266 + semantic_list = doc.get("semantic_attributes") or []
  267 + qanchors_obj = doc.get("qanchors") or {}
  268 +
  269 + anchor_text = str(row.get("anchor_text") or "").strip()
  270 + if anchor_text:
  271 + qanchors_obj[lang] = anchor_text
  272 +
  273 + for name in dim_keys:
  274 + raw = row.get(name)
  275 + if not raw:
  276 + continue
  277 + parts = re.split(r"[,;|/\n\t]+", str(raw))
  278 + for part in parts:
  279 + value = part.strip()
  280 + if not value:
  281 + continue
  282 + semantic_list.append({"lang": lang, "name": name, "value": value})
  283 +
  284 + if qanchors_obj:
  285 + doc["qanchors"] = qanchors_obj
  286 + if semantic_list:
  287 + doc["semantic_attributes"] = semantic_list
  288 + except Exception as e:
  289 + logger.warning("Failed to apply LLM row to doc (spu_id=%s, lang=%s): %s", doc.get("spu_id"), lang, e)
  290 +
188 291 def _fill_text_fields(
189 292 self,
190 293 doc: Dict[str, Any],
... ...
indexer/incremental_service.py
... ... @@ -588,6 +588,7 @@ class IncrementalIndexerService:
588 588 transformer, encoder, enable_embedding = self._get_transformer_bundle(tenant_id)
589 589  
590 590 # 按输入顺序处理 active SPUs
  591 + doc_spu_rows: List[pd.Series] = []
591 592 for spu_id in spu_ids:
592 593 try:
593 594 spu_id_int = int(spu_id)
... ... @@ -606,6 +607,7 @@ class IncrementalIndexerService:
606 607 spu_row=spu_row,
607 608 skus=skus_for_spu,
608 609 options=opts_for_spu,
  610 + fill_llm_attributes=False,
609 611 )
610 612 if doc is None:
611 613 error_msg = "SPU transform returned None"
... ... @@ -614,6 +616,14 @@ class IncrementalIndexerService:
614 616 continue
615 617  
616 618 documents.append((spu_id, doc))
  619 + doc_spu_rows.append(spu_row)
  620 +
  621 + # 批量填充 LLM 字段(尽量攒批,每次最多 20 条;失败仅 warning,不影响主流程)
  622 + try:
  623 + if documents and doc_spu_rows:
  624 + transformer.fill_llm_attributes_batch([d for _, d in documents], doc_spu_rows)
  625 + except Exception as e:
  626 + logger.warning("[IncrementalIndexing] Batch LLM fill failed: %s", e)
617 627  
618 628 # 批量生成 embedding(保持翻译逻辑不变;embedding 走缓存)
619 629 if enable_embedding and encoder and documents:
... ...
indexer/process_products.py
... ... @@ -646,7 +646,11 @@ def analyze_products(
646 646 )
647 647 return [cached]
648 648  
649   - bs = batch_size or BATCH_SIZE
  649 + # call_llm 一次处理上限固定为 BATCH_SIZE(默认 20):
  650 + # - 尽可能攒批处理;
  651 + # - 即便调用方传入更大的 batch_size,也会自动按上限拆批。
  652 + req_bs = BATCH_SIZE if batch_size is None else int(batch_size)
  653 + bs = max(1, min(req_bs, BATCH_SIZE))
650 654 all_results: List[Dict[str, Any]] = []
651 655 total_batches = (len(products) + bs - 1) // bs
652 656  
... ...
indexer/spu_transformer.py
... ... @@ -219,7 +219,8 @@ class SPUTransformer:
219 219 if option_groups:
220 220 logger.info(f"Grouped options into {len(option_groups)} SPU groups")
221 221  
222   - documents = []
  222 + documents: List[Dict[str, Any]] = []
  223 + doc_spu_rows: List[pd.Series] = []
223 224 skipped_count = 0
224 225 error_count = 0
225 226  
... ... @@ -242,10 +243,12 @@ class SPUTransformer:
242 243 tenant_id=self.tenant_id,
243 244 spu_row=spu_row,
244 245 skus=skus,
245   - options=options
  246 + options=options,
  247 + fill_llm_attributes=False,
246 248 )
247 249 if doc:
248 250 documents.append(doc)
  251 + doc_spu_rows.append(spu_row)
249 252 else:
250 253 skipped_count += 1
251 254 logger.warning(f"SPU {spu_id} transformation returned None, skipped")
... ... @@ -253,6 +256,13 @@ class SPUTransformer:
253 256 error_count += 1
254 257 logger.error(f"Error transforming SPU {spu_id}: {e}", exc_info=True)
255 258  
  259 + # 批量填充 LLM 字段(尽量攒批,每次最多 20 条;失败仅 warning,不影响主流程)
  260 + try:
  261 + if documents and doc_spu_rows:
  262 + self.document_transformer.fill_llm_attributes_batch(documents, doc_spu_rows)
  263 + except Exception as e:
  264 + logger.warning("Batch LLM fill failed in transform_batch: %s", e)
  265 +
256 266 logger.info(f"Transformation complete:")
257 267 logger.info(f" - Total SPUs: {len(spu_df)}")
258 268 logger.info(f" - Successfully transformed: {len(documents)}")
... ...
tests/ci/test_service_api_contracts.py
... ... @@ -341,6 +341,60 @@ def test_indexer_build_docs_from_db_contract(indexer_client: TestClient):
341 341 assert data["docs"][0]["spu_id"] == "1001"
342 342  
343 343  
  344 +def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch):
  345 + import indexer.process_products as process_products
  346 +
  347 + def _fake_analyze_products(
  348 + products: List[Dict[str, str]],
  349 + target_lang: str = "zh",
  350 + batch_size: int | None = None,
  351 + tenant_id: str | None = None,
  352 + ):
  353 + assert batch_size == 20
  354 + return [
  355 + {
  356 + "id": p["id"],
  357 + "lang": target_lang,
  358 + "title_input": p["title"],
  359 + "title": p["title"],
  360 + "category_path": "",
  361 + "tags": "tag1,tag2",
  362 + "target_audience": "",
  363 + "usage_scene": "",
  364 + "season": "",
  365 + "key_attributes": "",
  366 + "material": "",
  367 + "features": "",
  368 + "selling_points": "",
  369 + "anchor_text": f"{target_lang}-anchor-{p['id']}",
  370 + }
  371 + for p in products
  372 + ]
  373 +
  374 + monkeypatch.setattr(process_products, "analyze_products", _fake_analyze_products)
  375 +
  376 + response = indexer_client.post(
  377 + "/indexer/enrich-content",
  378 + json={
  379 + "tenant_id": "162",
  380 + "items": [
  381 + {"spu_id": "1001", "title": "T-shirt"},
  382 + {"spu_id": "1002", "title": "Toy"},
  383 + ],
  384 + "languages": ["zh", "en"],
  385 + },
  386 + )
  387 + assert response.status_code == 200
  388 + data = response.json()
  389 + assert data["tenant_id"] == "162"
  390 + assert data["total"] == 2
  391 + assert len(data["results"]) == 2
  392 + assert data["results"][0]["spu_id"] == "1001"
  393 + assert data["results"][0]["qanchors"]["zh"] == "zh-anchor-1001"
  394 + assert data["results"][0]["qanchors"]["en"] == "en-anchor-1001"
  395 + assert "tag1" in data["results"][0]["tags"]
  396 +
  397 +
344 398 def test_indexer_documents_contract(indexer_client: TestClient):
345 399 """POST /indexer/documents: tenant_id + spu_ids, returns success/failed lists (no ES write)."""
346 400 response = indexer_client.post(
... ... @@ -455,6 +509,18 @@ def test_indexer_build_docs_from_db_validation_max_spu_ids(indexer_client: TestC
455 509 assert response.status_code == 400
456 510  
457 511  
  512 +def test_indexer_enrich_content_validation_max_items(indexer_client: TestClient):
  513 + response = indexer_client.post(
  514 + "/indexer/enrich-content",
  515 + json={
  516 + "tenant_id": "162",
  517 + "items": [{"spu_id": str(i), "title": "x"} for i in range(51)],
  518 + "languages": ["zh"],
  519 + },
  520 + )
  521 + assert response.status_code == 400
  522 +
  523 +
458 524 def test_indexer_documents_validation_max_spu_ids(indexer_client: TestClient):
459 525 """POST /indexer/documents: 400 when spu_ids > 100."""
460 526 response = indexer_client.post(
... ...
tests/test_llm_enrichment_batch_fill.py 0 → 100644
... ... @@ -0,0 +1,64 @@
  1 +from __future__ import annotations
  2 +
  3 +from typing import Any, Dict, List
  4 +
  5 +import pandas as pd
  6 +
  7 +from indexer.document_transformer import SPUDocumentTransformer
  8 +
  9 +
  10 +def test_fill_llm_attributes_batch_calls_analyze_in_batches(monkeypatch):
  11 + seen_calls: List[Dict[str, Any]] = []
  12 +
  13 + def _fake_analyze_products(products, target_lang="zh", batch_size=None, tenant_id=None):
  14 + # should always request batch_size=20 and pass full list; internal splitter handles >20
  15 + seen_calls.append(
  16 + {
  17 + "n": len(products),
  18 + "target_lang": target_lang,
  19 + "batch_size": batch_size,
  20 + "tenant_id": tenant_id,
  21 + }
  22 + )
  23 + return [
  24 + {
  25 + "id": p["id"],
  26 + "lang": target_lang,
  27 + "title_input": p["title"],
  28 + "tags": "t1,t2",
  29 + "anchor_text": f"{target_lang}-anchor-{p['id']}",
  30 + }
  31 + for p in products
  32 + ]
  33 +
  34 + import indexer.document_transformer as doc_tr
  35 +
  36 + monkeypatch.setattr(doc_tr, "analyze_products", _fake_analyze_products)
  37 +
  38 + transformer = SPUDocumentTransformer(
  39 + category_id_to_name={},
  40 + searchable_option_dimensions=[],
  41 + tenant_config={"index_languages": ["zh", "en"], "primary_language": "zh"},
  42 + translator=None,
  43 + encoder=None,
  44 + enable_title_embedding=False,
  45 + image_encoder=None,
  46 + enable_image_embedding=False,
  47 + )
  48 +
  49 + docs: List[Dict[str, Any]] = []
  50 + rows: List[pd.Series] = []
  51 + for i in range(45):
  52 + docs.append({"tenant_id": "162", "spu_id": str(i)})
  53 + rows.append(pd.Series({"id": i, "title": f"title-{i}"}))
  54 +
  55 + transformer.fill_llm_attributes_batch(docs, rows)
  56 +
  57 + # called once per language, with full list; analyze_products handles splitting
  58 + assert seen_calls == [
  59 + {"n": 45, "target_lang": "zh", "batch_size": 20, "tenant_id": "162"},
  60 + {"n": 45, "target_lang": "en", "batch_size": 20, "tenant_id": "162"},
  61 + ]
  62 +
  63 + assert docs[0]["qanchors"]["zh"] == "zh-anchor-0"
  64 + assert docs[0]["qanchors"]["en"] == "en-anchor-0"
... ...
tests/test_process_products_batching.py 0 → 100644
... ... @@ -0,0 +1,89 @@
  1 +from __future__ import annotations
  2 +
  3 +from typing import Any, Dict, List
  4 +
  5 +import indexer.process_products as process_products
  6 +
  7 +
  8 +def _mk_products(n: int) -> List[Dict[str, str]]:
  9 + return [{"id": str(i), "title": f"title-{i}"} for i in range(n)]
  10 +
  11 +
  12 +def test_analyze_products_caps_batch_size_to_20(monkeypatch):
  13 + monkeypatch.setattr(process_products, "API_KEY", "fake-key")
  14 + seen_batch_sizes: List[int] = []
  15 +
  16 + def _fake_process_batch(batch_data: List[Dict[str, str]], batch_num: int, target_lang: str = "zh"):
  17 + seen_batch_sizes.append(len(batch_data))
  18 + return [
  19 + {
  20 + "id": item["id"],
  21 + "lang": target_lang,
  22 + "title_input": item["title"],
  23 + "title": "",
  24 + "category_path": "",
  25 + "tags": "",
  26 + "target_audience": "",
  27 + "usage_scene": "",
  28 + "season": "",
  29 + "key_attributes": "",
  30 + "material": "",
  31 + "features": "",
  32 + "selling_points": "",
  33 + "anchor_text": "",
  34 + }
  35 + for item in batch_data
  36 + ]
  37 +
  38 + monkeypatch.setattr(process_products, "process_batch", _fake_process_batch)
  39 + monkeypatch.setattr(process_products, "_set_cached_anchor_result", lambda *args, **kwargs: None)
  40 +
  41 + out = process_products.analyze_products(
  42 + products=_mk_products(45),
  43 + target_lang="zh",
  44 + batch_size=200,
  45 + tenant_id="162",
  46 + )
  47 +
  48 + assert len(out) == 45
  49 + assert seen_batch_sizes == [20, 20, 5]
  50 +
  51 +
  52 +def test_analyze_products_uses_min_batch_size_1(monkeypatch):
  53 + monkeypatch.setattr(process_products, "API_KEY", "fake-key")
  54 + seen_batch_sizes: List[int] = []
  55 +
  56 + def _fake_process_batch(batch_data: List[Dict[str, str]], batch_num: int, target_lang: str = "zh"):
  57 + seen_batch_sizes.append(len(batch_data))
  58 + return [
  59 + {
  60 + "id": item["id"],
  61 + "lang": target_lang,
  62 + "title_input": item["title"],
  63 + "title": "",
  64 + "category_path": "",
  65 + "tags": "",
  66 + "target_audience": "",
  67 + "usage_scene": "",
  68 + "season": "",
  69 + "key_attributes": "",
  70 + "material": "",
  71 + "features": "",
  72 + "selling_points": "",
  73 + "anchor_text": "",
  74 + }
  75 + for item in batch_data
  76 + ]
  77 +
  78 + monkeypatch.setattr(process_products, "process_batch", _fake_process_batch)
  79 + monkeypatch.setattr(process_products, "_set_cached_anchor_result", lambda *args, **kwargs: None)
  80 +
  81 + out = process_products.analyze_products(
  82 + products=_mk_products(3),
  83 + target_lang="zh",
  84 + batch_size=0,
  85 + tenant_id="162",
  86 + )
  87 +
  88 + assert len(out) == 3
  89 + assert seen_batch_sizes == [1, 1, 1]
... ...