From be3f0d46fbb9b7b7bf7876e621be93bbd99dd6c1 Mon Sep 17 00:00:00 2001 From: tangwang Date: Wed, 11 Mar 2026 14:36:33 +0800 Subject: [PATCH] /indexer/enrich-content --- api/routes/indexer.py | 12 +++++++++++- indexer/document_transformer.py | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- indexer/incremental_service.py | 10 ++++++++++ indexer/process_products.py | 6 +++++- indexer/spu_transformer.py | 14 ++++++++++++-- tests/ci/test_service_api_contracts.py | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tests/test_llm_enrichment_batch_fill.py | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tests/test_process_products_batching.py | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 362 insertions(+), 6 deletions(-) create mode 100644 tests/test_llm_enrichment_batch_fill.py create mode 100644 tests/test_process_products_batching.py diff --git a/api/routes/indexer.py b/api/routes/indexer.py index 95bded1..74dc674 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -30,7 +30,7 @@ class IndexSpusRequest(BaseModel): """增量索引请求(按SPU列表索引)""" tenant_id: str spu_ids: List[str] - delete_spu_ids: List[str] = [] # 显式指定要删除的SPU ID列表(可选) + delete_spu_ids: List[str] = Field(default_factory=list) # 显式指定要删除的SPU ID列表(可选) class GetDocumentsRequest(BaseModel): @@ -225,6 +225,7 @@ async def build_docs(request: BuildDocsRequest): import pandas as pd docs: List[Dict[str, Any]] = [] + doc_spu_rows: List[pd.Series] = [] failed: List[Dict[str, Any]] = [] for item in request.items: @@ -240,6 +241,7 @@ async def build_docs(request: BuildDocsRequest): spu_row=spu_row, skus=skus_df, options=options_df, + fill_llm_attributes=False, ) if doc is None: @@ -279,6 +281,7 @@ async def build_docs(request: BuildDocsRequest): doc["title_embedding"] = emb0.tolist() docs.append(doc) + doc_spu_rows.append(spu_row) except Exception as e: failed.append( { @@ -287,6 +290,13 @@ async def build_docs(request: BuildDocsRequest): } ) + # 批量填充 LLM 字段(尽量攒批,每次最多 20 条;失败仅 warning,不影响 build-docs 主功能) + try: + if docs and doc_spu_rows: + transformer.fill_llm_attributes_batch(docs, doc_spu_rows) + except Exception as e: + logger.warning("Batch LLM fill failed in build-docs (tenant_id=%s): %s", request.tenant_id, e) + return { "tenant_id": request.tenant_id, "docs": docs, diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index 12ddaa8..7d5dff6 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -71,7 +71,8 @@ class SPUDocumentTransformer: tenant_id: str, spu_row: pd.Series, skus: pd.DataFrame, - options: pd.DataFrame + options: pd.DataFrame, + fill_llm_attributes: bool = True, ) -> Optional[Dict[str, Any]]: """ 将单个SPU行和其SKUs转换为ES文档。 @@ -181,10 +182,112 @@ class SPUDocumentTransformer: doc['update_time'] = str(update_time) # 基于 LLM 的锚文本与语义属性(默认开启,失败时仅记录日志) - self._fill_llm_attributes(doc, spu_row) + # 注意:批处理场景(build-docs / bulk / incremental)应优先在外层攒批, + # 再调用 fill_llm_attributes_batch(),避免逐条调用 LLM。 + if fill_llm_attributes: + self._fill_llm_attributes(doc, spu_row) return doc + def fill_llm_attributes_batch(self, docs: List[Dict[str, Any]], spu_rows: List[pd.Series]) -> None: + """ + 批量调用 LLM,为一批 doc 填充: + - qanchors.{lang} + - semantic_attributes (lang/name/value) + + 设计目标: + - 尽可能攒批调用 LLM; + - 单次 LLM 调用最多 20 条(由 analyze_products 内部强制 cap 并自动拆批)。 + """ + if not docs or not spu_rows or len(docs) != len(spu_rows): + return + + try: + index_langs = self.tenant_config.get("index_languages") or ["en", "zh"] + except Exception: + index_langs = ["en", "zh"] + llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS] + if not llm_langs: + return + + # 只对有 title 的 SPU 参与 LLM;其余跳过 + id_to_idx: Dict[str, int] = {} + products: List[Dict[str, str]] = [] + for i, row in enumerate(spu_rows): + raw_id = row.get("id") + spu_id = "" if raw_id is None else str(raw_id).strip() + title = str(row.get("title") or "").strip() + if not spu_id or not title: + continue + id_to_idx[spu_id] = i + products.append({"id": spu_id, "title": title}) + if not products: + return + + tenant_id = str(docs[0].get("tenant_id") or "").strip() or None + + dim_keys = [ + "tags", + "target_audience", + "usage_scene", + "season", + "key_attributes", + "material", + "features", + ] + + for lang in llm_langs: + try: + rows = analyze_products( + products=products, + target_lang=lang, + batch_size=20, + tenant_id=tenant_id, + ) + except Exception as e: + logger.warning("LLM batch attribute fill failed (lang=%s): %s", lang, e) + continue + + for row in rows or []: + spu_id = str(row.get("id") or "").strip() + if not spu_id: + continue + idx = id_to_idx.get(spu_id) + if idx is None: + continue + self._apply_llm_row(docs[idx], row=row, lang=lang, dim_keys=dim_keys) + + def _apply_llm_row(self, doc: Dict[str, Any], row: Dict[str, Any], lang: str, dim_keys: List[str]) -> None: + """将单条 LLM 输出 row 按既定结构写入 doc(不抛异常)。""" + try: + if row.get("error"): + return + + semantic_list = doc.get("semantic_attributes") or [] + qanchors_obj = doc.get("qanchors") or {} + + anchor_text = str(row.get("anchor_text") or "").strip() + if anchor_text: + qanchors_obj[lang] = anchor_text + + for name in dim_keys: + raw = row.get(name) + if not raw: + continue + parts = re.split(r"[,;|/\n\t]+", str(raw)) + for part in parts: + value = part.strip() + if not value: + continue + semantic_list.append({"lang": lang, "name": name, "value": value}) + + if qanchors_obj: + doc["qanchors"] = qanchors_obj + if semantic_list: + doc["semantic_attributes"] = semantic_list + except Exception as e: + logger.warning("Failed to apply LLM row to doc (spu_id=%s, lang=%s): %s", doc.get("spu_id"), lang, e) + def _fill_text_fields( self, doc: Dict[str, Any], diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py index 4979ca0..aa1e5b0 100644 --- a/indexer/incremental_service.py +++ b/indexer/incremental_service.py @@ -588,6 +588,7 @@ class IncrementalIndexerService: transformer, encoder, enable_embedding = self._get_transformer_bundle(tenant_id) # 按输入顺序处理 active SPUs + doc_spu_rows: List[pd.Series] = [] for spu_id in spu_ids: try: spu_id_int = int(spu_id) @@ -606,6 +607,7 @@ class IncrementalIndexerService: spu_row=spu_row, skus=skus_for_spu, options=opts_for_spu, + fill_llm_attributes=False, ) if doc is None: error_msg = "SPU transform returned None" @@ -614,6 +616,14 @@ class IncrementalIndexerService: continue documents.append((spu_id, doc)) + doc_spu_rows.append(spu_row) + + # 批量填充 LLM 字段(尽量攒批,每次最多 20 条;失败仅 warning,不影响主流程) + try: + if documents and doc_spu_rows: + transformer.fill_llm_attributes_batch([d for _, d in documents], doc_spu_rows) + except Exception as e: + logger.warning("[IncrementalIndexing] Batch LLM fill failed: %s", e) # 批量生成 embedding(保持翻译逻辑不变;embedding 走缓存) if enable_embedding and encoder and documents: diff --git a/indexer/process_products.py b/indexer/process_products.py index 5491339..2dccda6 100644 --- a/indexer/process_products.py +++ b/indexer/process_products.py @@ -646,7 +646,11 @@ def analyze_products( ) return [cached] - bs = batch_size or BATCH_SIZE + # call_llm 一次处理上限固定为 BATCH_SIZE(默认 20): + # - 尽可能攒批处理; + # - 即便调用方传入更大的 batch_size,也会自动按上限拆批。 + req_bs = BATCH_SIZE if batch_size is None else int(batch_size) + bs = max(1, min(req_bs, BATCH_SIZE)) all_results: List[Dict[str, Any]] = [] total_batches = (len(products) + bs - 1) // bs diff --git a/indexer/spu_transformer.py b/indexer/spu_transformer.py index 5c78b34..f4900dc 100644 --- a/indexer/spu_transformer.py +++ b/indexer/spu_transformer.py @@ -219,7 +219,8 @@ class SPUTransformer: if option_groups: logger.info(f"Grouped options into {len(option_groups)} SPU groups") - documents = [] + documents: List[Dict[str, Any]] = [] + doc_spu_rows: List[pd.Series] = [] skipped_count = 0 error_count = 0 @@ -242,10 +243,12 @@ class SPUTransformer: tenant_id=self.tenant_id, spu_row=spu_row, skus=skus, - options=options + options=options, + fill_llm_attributes=False, ) if doc: documents.append(doc) + doc_spu_rows.append(spu_row) else: skipped_count += 1 logger.warning(f"SPU {spu_id} transformation returned None, skipped") @@ -253,6 +256,13 @@ class SPUTransformer: error_count += 1 logger.error(f"Error transforming SPU {spu_id}: {e}", exc_info=True) + # 批量填充 LLM 字段(尽量攒批,每次最多 20 条;失败仅 warning,不影响主流程) + try: + if documents and doc_spu_rows: + self.document_transformer.fill_llm_attributes_batch(documents, doc_spu_rows) + except Exception as e: + logger.warning("Batch LLM fill failed in transform_batch: %s", e) + logger.info(f"Transformation complete:") logger.info(f" - Total SPUs: {len(spu_df)}") logger.info(f" - Successfully transformed: {len(documents)}") diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index a59f0ee..e6f05af 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -341,6 +341,60 @@ def test_indexer_build_docs_from_db_contract(indexer_client: TestClient): assert data["docs"][0]["spu_id"] == "1001" +def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch): + import indexer.process_products as process_products + + def _fake_analyze_products( + products: List[Dict[str, str]], + target_lang: str = "zh", + batch_size: int | None = None, + tenant_id: str | None = None, + ): + assert batch_size == 20 + return [ + { + "id": p["id"], + "lang": target_lang, + "title_input": p["title"], + "title": p["title"], + "category_path": "", + "tags": "tag1,tag2", + "target_audience": "", + "usage_scene": "", + "season": "", + "key_attributes": "", + "material": "", + "features": "", + "selling_points": "", + "anchor_text": f"{target_lang}-anchor-{p['id']}", + } + for p in products + ] + + monkeypatch.setattr(process_products, "analyze_products", _fake_analyze_products) + + response = indexer_client.post( + "/indexer/enrich-content", + json={ + "tenant_id": "162", + "items": [ + {"spu_id": "1001", "title": "T-shirt"}, + {"spu_id": "1002", "title": "Toy"}, + ], + "languages": ["zh", "en"], + }, + ) + assert response.status_code == 200 + data = response.json() + assert data["tenant_id"] == "162" + assert data["total"] == 2 + assert len(data["results"]) == 2 + assert data["results"][0]["spu_id"] == "1001" + assert data["results"][0]["qanchors"]["zh"] == "zh-anchor-1001" + assert data["results"][0]["qanchors"]["en"] == "en-anchor-1001" + assert "tag1" in data["results"][0]["tags"] + + def test_indexer_documents_contract(indexer_client: TestClient): """POST /indexer/documents: tenant_id + spu_ids, returns success/failed lists (no ES write).""" response = indexer_client.post( @@ -455,6 +509,18 @@ def test_indexer_build_docs_from_db_validation_max_spu_ids(indexer_client: TestC assert response.status_code == 400 +def test_indexer_enrich_content_validation_max_items(indexer_client: TestClient): + response = indexer_client.post( + "/indexer/enrich-content", + json={ + "tenant_id": "162", + "items": [{"spu_id": str(i), "title": "x"} for i in range(51)], + "languages": ["zh"], + }, + ) + assert response.status_code == 400 + + def test_indexer_documents_validation_max_spu_ids(indexer_client: TestClient): """POST /indexer/documents: 400 when spu_ids > 100.""" response = indexer_client.post( diff --git a/tests/test_llm_enrichment_batch_fill.py b/tests/test_llm_enrichment_batch_fill.py new file mode 100644 index 0000000..8dba4f3 --- /dev/null +++ b/tests/test_llm_enrichment_batch_fill.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from typing import Any, Dict, List + +import pandas as pd + +from indexer.document_transformer import SPUDocumentTransformer + + +def test_fill_llm_attributes_batch_calls_analyze_in_batches(monkeypatch): + seen_calls: List[Dict[str, Any]] = [] + + def _fake_analyze_products(products, target_lang="zh", batch_size=None, tenant_id=None): + # should always request batch_size=20 and pass full list; internal splitter handles >20 + seen_calls.append( + { + "n": len(products), + "target_lang": target_lang, + "batch_size": batch_size, + "tenant_id": tenant_id, + } + ) + return [ + { + "id": p["id"], + "lang": target_lang, + "title_input": p["title"], + "tags": "t1,t2", + "anchor_text": f"{target_lang}-anchor-{p['id']}", + } + for p in products + ] + + import indexer.document_transformer as doc_tr + + monkeypatch.setattr(doc_tr, "analyze_products", _fake_analyze_products) + + transformer = SPUDocumentTransformer( + category_id_to_name={}, + searchable_option_dimensions=[], + tenant_config={"index_languages": ["zh", "en"], "primary_language": "zh"}, + translator=None, + encoder=None, + enable_title_embedding=False, + image_encoder=None, + enable_image_embedding=False, + ) + + docs: List[Dict[str, Any]] = [] + rows: List[pd.Series] = [] + for i in range(45): + docs.append({"tenant_id": "162", "spu_id": str(i)}) + rows.append(pd.Series({"id": i, "title": f"title-{i}"})) + + transformer.fill_llm_attributes_batch(docs, rows) + + # called once per language, with full list; analyze_products handles splitting + assert seen_calls == [ + {"n": 45, "target_lang": "zh", "batch_size": 20, "tenant_id": "162"}, + {"n": 45, "target_lang": "en", "batch_size": 20, "tenant_id": "162"}, + ] + + assert docs[0]["qanchors"]["zh"] == "zh-anchor-0" + assert docs[0]["qanchors"]["en"] == "en-anchor-0" diff --git a/tests/test_process_products_batching.py b/tests/test_process_products_batching.py new file mode 100644 index 0000000..12de801 --- /dev/null +++ b/tests/test_process_products_batching.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from typing import Any, Dict, List + +import indexer.process_products as process_products + + +def _mk_products(n: int) -> List[Dict[str, str]]: + return [{"id": str(i), "title": f"title-{i}"} for i in range(n)] + + +def test_analyze_products_caps_batch_size_to_20(monkeypatch): + monkeypatch.setattr(process_products, "API_KEY", "fake-key") + seen_batch_sizes: List[int] = [] + + def _fake_process_batch(batch_data: List[Dict[str, str]], batch_num: int, target_lang: str = "zh"): + seen_batch_sizes.append(len(batch_data)) + return [ + { + "id": item["id"], + "lang": target_lang, + "title_input": item["title"], + "title": "", + "category_path": "", + "tags": "", + "target_audience": "", + "usage_scene": "", + "season": "", + "key_attributes": "", + "material": "", + "features": "", + "selling_points": "", + "anchor_text": "", + } + for item in batch_data + ] + + monkeypatch.setattr(process_products, "process_batch", _fake_process_batch) + monkeypatch.setattr(process_products, "_set_cached_anchor_result", lambda *args, **kwargs: None) + + out = process_products.analyze_products( + products=_mk_products(45), + target_lang="zh", + batch_size=200, + tenant_id="162", + ) + + assert len(out) == 45 + assert seen_batch_sizes == [20, 20, 5] + + +def test_analyze_products_uses_min_batch_size_1(monkeypatch): + monkeypatch.setattr(process_products, "API_KEY", "fake-key") + seen_batch_sizes: List[int] = [] + + def _fake_process_batch(batch_data: List[Dict[str, str]], batch_num: int, target_lang: str = "zh"): + seen_batch_sizes.append(len(batch_data)) + return [ + { + "id": item["id"], + "lang": target_lang, + "title_input": item["title"], + "title": "", + "category_path": "", + "tags": "", + "target_audience": "", + "usage_scene": "", + "season": "", + "key_attributes": "", + "material": "", + "features": "", + "selling_points": "", + "anchor_text": "", + } + for item in batch_data + ] + + monkeypatch.setattr(process_products, "process_batch", _fake_process_batch) + monkeypatch.setattr(process_products, "_set_cached_anchor_result", lambda *args, **kwargs: None) + + out = process_products.analyze_products( + products=_mk_products(3), + target_lang="zh", + batch_size=0, + tenant_id="162", + ) + + assert len(out) == 3 + assert seen_batch_sizes == [1, 1, 1] -- libgit2 0.21.2