Commit 90de78aa7883f27b473ea2c3de7b48da325efdad

Authored by tangwang
1 parent a3734f13

enrich接口 因为接口迭代、跟缓存不兼容,bug修复

本次直接在 `indexer/product_enrich.py` 中修复根因,而不是再补一层判断。

根因有两个:缓存 key 按内容复用,但缓存值里还带着旧商品的
`id/title_input`;同时内部分析结果在历史上混用了 `tags` 和
`enriched_tags`。这样一旦命中旧缓存,`build_index_content_fields()`
会因为 `id` 对不上把结果丢掉,最后对外就变成全空。

现在的处理是:
- 内部分析结果统一用 `tags` 作为 LLM/缓存层字段。
- 对外只在 `build_index_content_fields()` 封装时映射成
  `enriched_tags`,`enriched_attributes` 里也统一产出
`name="enriched_tags"`。
- 读取缓存时会先做归一化:把旧缓存里的 `enriched_tags` 兼容成内部
  `tags`,并把命中的缓存结果重绑到当前请求商品的 `id/title_input`。
- 写缓存时也统一写成归一化后的内部结构,并且空内容不再写入缓存。
indexer/product_enrich.py
@@ -147,15 +147,40 @@ if _missing_prompt_langs:
147 # 多值字段分隔:英文逗号、中文逗号、顿号,及历史约定的 ; | / 与空白 147 # 多值字段分隔:英文逗号、中文逗号、顿号,及历史约定的 ; | / 与空白
148 _MULTI_VALUE_FIELD_SPLIT_RE = re.compile(r"[,、,;|/\n\t]+") 148 _MULTI_VALUE_FIELD_SPLIT_RE = re.compile(r"[,、,;|/\n\t]+")
149 _CORE_INDEX_LANGUAGES = ("zh", "en") 149 _CORE_INDEX_LANGUAGES = ("zh", "en")
150 -_ENRICHED_ATTRIBUTE_DIMENSIONS = (  
151 - "enriched_tags", 150 +_ANALYSIS_ATTRIBUTE_FIELD_MAP = (
  151 + ("tags", "enriched_tags"),
  152 + ("target_audience", "target_audience"),
  153 + ("usage_scene", "usage_scene"),
  154 + ("season", "season"),
  155 + ("key_attributes", "key_attributes"),
  156 + ("material", "material"),
  157 + ("features", "features"),
  158 +)
  159 +_ANALYSIS_RESULT_FIELDS = (
  160 + "title",
  161 + "category_path",
  162 + "tags",
  163 + "target_audience",
  164 + "usage_scene",
  165 + "season",
  166 + "key_attributes",
  167 + "material",
  168 + "features",
  169 + "anchor_text",
  170 +)
  171 +_ANALYSIS_MEANINGFUL_FIELDS = (
  172 + "tags",
152 "target_audience", 173 "target_audience",
153 "usage_scene", 174 "usage_scene",
154 "season", 175 "season",
155 "key_attributes", 176 "key_attributes",
156 "material", 177 "material",
157 "features", 178 "features",
  179 + "anchor_text",
158 ) 180 )
  181 +_ANALYSIS_FIELD_ALIASES = {
  182 + "tags": ("tags", "enriched_tags"),
  183 +}
159 184
160 185
161 def split_multi_value_field(text: Optional[str]) -> List[str]: 186 def split_multi_value_field(text: Optional[str]) -> List[str]:
@@ -195,25 +220,104 @@ def _append_enriched_attribute(
195 target.append({"name": name, "value": {lang: value}}) 220 target.append({"name": name, "value": {lang: value}})
196 221
197 222
  223 +def _get_product_id(product: Dict[str, Any]) -> str:
  224 + return str(product.get("id") or product.get("spu_id") or "").strip()
  225 +
  226 +
  227 +def _get_analysis_field_aliases(field_name: str) -> Tuple[str, ...]:
  228 + return _ANALYSIS_FIELD_ALIASES.get(field_name, (field_name,))
  229 +
  230 +
  231 +def _get_analysis_field_value(row: Dict[str, Any], field_name: str) -> Any:
  232 + for alias in _get_analysis_field_aliases(field_name):
  233 + if alias in row:
  234 + return row.get(alias)
  235 + return None
  236 +
  237 +
  238 +def _has_meaningful_value(value: Any) -> bool:
  239 + if value is None:
  240 + return False
  241 + if isinstance(value, str):
  242 + return bool(value.strip())
  243 + if isinstance(value, dict):
  244 + return any(_has_meaningful_value(v) for v in value.values())
  245 + if isinstance(value, list):
  246 + return any(_has_meaningful_value(v) for v in value)
  247 + return bool(value)
  248 +
  249 +
  250 +def _make_empty_analysis_result(
  251 + product: Dict[str, Any],
  252 + target_lang: str,
  253 + error: Optional[str] = None,
  254 +) -> Dict[str, Any]:
  255 + result = {
  256 + "id": _get_product_id(product),
  257 + "lang": target_lang,
  258 + "title_input": str(product.get("title") or "").strip(),
  259 + }
  260 + for field in _ANALYSIS_RESULT_FIELDS:
  261 + result[field] = ""
  262 + if error:
  263 + result["error"] = error
  264 + return result
  265 +
  266 +
  267 +def _normalize_analysis_result(
  268 + result: Dict[str, Any],
  269 + product: Dict[str, Any],
  270 + target_lang: str,
  271 +) -> Dict[str, Any]:
  272 + normalized = _make_empty_analysis_result(product, target_lang)
  273 + if not isinstance(result, dict):
  274 + return normalized
  275 +
  276 + normalized["lang"] = str(result.get("lang") or target_lang).strip() or target_lang
  277 + normalized["title"] = str(result.get("title") or "").strip()
  278 + normalized["category_path"] = str(result.get("category_path") or "").strip()
  279 + normalized["title_input"] = str(
  280 + product.get("title") or result.get("title_input") or ""
  281 + ).strip()
  282 +
  283 + for field in _ANALYSIS_RESULT_FIELDS:
  284 + if field in {"title", "category_path"}:
  285 + continue
  286 + normalized[field] = str(_get_analysis_field_value(result, field) or "").strip()
  287 +
  288 + if result.get("error"):
  289 + normalized["error"] = str(result.get("error"))
  290 + return normalized
  291 +
  292 +
  293 +def _has_meaningful_analysis_content(result: Dict[str, Any]) -> bool:
  294 + return any(_has_meaningful_value(result.get(field)) for field in _ANALYSIS_MEANINGFUL_FIELDS)
  295 +
  296 +
198 def _apply_index_content_row(result: Dict[str, Any], row: Dict[str, Any], lang: str) -> None: 297 def _apply_index_content_row(result: Dict[str, Any], row: Dict[str, Any], lang: str) -> None:
199 if not row or row.get("error"): 298 if not row or row.get("error"):
200 return 299 return
201 300
202 - anchor_text = str(row.get("anchor_text") or "").strip() 301 + anchor_text = str(_get_analysis_field_value(row, "anchor_text") or "").strip()
203 if anchor_text: 302 if anchor_text:
204 _append_lang_phrase_map(result["qanchors"], lang=lang, raw_value=anchor_text) 303 _append_lang_phrase_map(result["qanchors"], lang=lang, raw_value=anchor_text)
205 304
206 - for name in _ENRICHED_ATTRIBUTE_DIMENSIONS:  
207 - raw = row.get(name) 305 + for source_name, output_name in _ANALYSIS_ATTRIBUTE_FIELD_MAP:
  306 + raw = _get_analysis_field_value(row, source_name)
208 if not raw: 307 if not raw:
209 continue 308 continue
210 - _append_enriched_attribute(result["enriched_attributes"], name=name, lang=lang, raw_value=raw)  
211 - if name == "enriched_tags": 309 + _append_enriched_attribute(
  310 + result["enriched_attributes"],
  311 + name=output_name,
  312 + lang=lang,
  313 + raw_value=raw,
  314 + )
  315 + if output_name == "enriched_tags":
212 _append_lang_phrase_map(result["enriched_tags"], lang=lang, raw_value=raw) 316 _append_lang_phrase_map(result["enriched_tags"], lang=lang, raw_value=raw)
213 317
214 318
215 def _normalize_index_content_item(item: Dict[str, Any]) -> Dict[str, str]: 319 def _normalize_index_content_item(item: Dict[str, Any]) -> Dict[str, str]:
216 - item_id = str(item.get("id") or item.get("spu_id") or "").strip() 320 + item_id = _get_product_id(item)
217 return { 321 return {
218 "id": item_id, 322 "id": item_id,
219 "title": str(item.get("title") or "").strip(), 323 "title": str(item.get("title") or "").strip(),
@@ -369,7 +473,10 @@ def _get_cached_anchor_result(
369 raw = _anchor_redis.get(key) 473 raw = _anchor_redis.get(key)
370 if not raw: 474 if not raw:
371 return None 475 return None
372 - return json.loads(raw) 476 + result = _normalize_analysis_result(json.loads(raw), product=product, target_lang=target_lang)
  477 + if not _has_meaningful_analysis_content(result):
  478 + return None
  479 + return result
373 except Exception as e: 480 except Exception as e:
374 logger.warning(f"Failed to get anchor cache: {e}") 481 logger.warning(f"Failed to get anchor cache: {e}")
375 return None 482 return None
@@ -383,9 +490,12 @@ def _set_cached_anchor_result(
383 if not _anchor_redis: 490 if not _anchor_redis:
384 return 491 return
385 try: 492 try:
  493 + normalized = _normalize_analysis_result(result, product=product, target_lang=target_lang)
  494 + if not _has_meaningful_analysis_content(normalized):
  495 + return
386 key = _make_anchor_cache_key(product, target_lang) 496 key = _make_anchor_cache_key(product, target_lang)
387 ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600 497 ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600
388 - _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False)) 498 + _anchor_redis.setex(key, ttl, json.dumps(normalized, ensure_ascii=False))
389 except Exception as e: 499 except Exception as e:
390 logger.warning(f"Failed to set anchor cache: {e}") 500 logger.warning(f"Failed to set anchor cache: {e}")
391 501
@@ -654,7 +764,7 @@ def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]:
654 "seq_no": parts[0], 764 "seq_no": parts[0],
655 "title": parts[1], # 商品标题(按目标语言) 765 "title": parts[1], # 商品标题(按目标语言)
656 "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 766 "category_path": parts[2] if len(parts) > 2 else "", # 品类路径
657 - "enriched_tags": parts[3] if len(parts) > 3 else "", # 细分标签 767 + "tags": parts[3] if len(parts) > 3 else "", # 细分标签
658 "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 768 "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群
659 "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景 769 "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景
660 "season": parts[6] if len(parts) > 6 else "", # 适用季节 770 "season": parts[6] if len(parts) > 6 else "", # 适用季节
@@ -705,7 +815,7 @@ def process_batch(
705 batch_data: List[Dict[str, str]], 815 batch_data: List[Dict[str, str]],
706 batch_num: int, 816 batch_num: int,
707 target_lang: str = "zh", 817 target_lang: str = "zh",
708 -) -> List[Dict[str, str]]: 818 +) -> List[Dict[str, Any]]:
709 """处理一个批次的数据""" 819 """处理一个批次的数据"""
710 logger.info(f"\n{'#' * 80}") 820 logger.info(f"\n{'#' * 80}")
711 logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)") 821 logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)")
@@ -725,22 +835,11 @@ def process_batch(
725 target_lang, 835 target_lang,
726 ) 836 )
727 return [ 837 return [
728 - {  
729 - "id": item["id"],  
730 - "lang": target_lang,  
731 - "title_input": item.get("title", ""),  
732 - "title": "",  
733 - "category_path": "",  
734 - "enriched_tags": "",  
735 - "target_audience": "",  
736 - "usage_scene": "",  
737 - "season": "",  
738 - "key_attributes": "",  
739 - "material": "",  
740 - "features": "",  
741 - "anchor_text": "",  
742 - "error": f"prompt_creation_failed: unsupported target_lang={target_lang}",  
743 - } 838 + _make_empty_analysis_result(
  839 + item,
  840 + target_lang,
  841 + error=f"prompt_creation_failed: unsupported target_lang={target_lang}",
  842 + )
744 for item in batch_data 843 for item in batch_data
745 ] 844 ]
746 845
@@ -764,24 +863,18 @@ def process_batch(
764 results_with_ids = [] 863 results_with_ids = []
765 for i, parsed_item in enumerate(parsed_results): 864 for i, parsed_item in enumerate(parsed_results):
766 if i < len(batch_data): 865 if i < len(batch_data):
767 - original_id = batch_data[i]["id"]  
768 - result = {  
769 - "id": original_id,  
770 - "lang": target_lang,  
771 - "title_input": batch_data[i]["title"], # 原始输入标题  
772 - "title": parsed_item.get("title", ""), # 模型生成的标题  
773 - "category_path": parsed_item.get("category_path", ""), # 品类路径  
774 - "enriched_tags": parsed_item.get("enriched_tags", ""), # 细分标签  
775 - "target_audience": parsed_item.get("target_audience", ""), # 适用人群  
776 - "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景  
777 - "season": parsed_item.get("season", ""), # 适用季节  
778 - "key_attributes": parsed_item.get("key_attributes", ""), # 关键属性  
779 - "material": parsed_item.get("material", ""), # 材质说明  
780 - "features": parsed_item.get("features", ""), # 功能特点  
781 - "anchor_text": parsed_item.get("anchor_text", ""), # 锚文本  
782 - } 866 + source_product = batch_data[i]
  867 + result = _normalize_analysis_result(
  868 + parsed_item,
  869 + product=source_product,
  870 + target_lang=target_lang,
  871 + )
783 results_with_ids.append(result) 872 results_with_ids.append(result)
784 - logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}") 873 + logger.info(
  874 + "Mapped: seq=%s -> original_id=%s",
  875 + parsed_item.get("seq_no"),
  876 + source_product.get("id"),
  877 + )
785 878
786 # 保存批次 JSON 日志到独立文件 879 # 保存批次 JSON 日志到独立文件
787 batch_log = { 880 batch_log = {
@@ -808,22 +901,7 @@ def process_batch(
808 logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True) 901 logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True)
809 # 返回空结果,保持ID映射 902 # 返回空结果,保持ID映射
810 return [ 903 return [
811 - {  
812 - "id": item["id"],  
813 - "lang": target_lang,  
814 - "title_input": item["title"],  
815 - "title": "",  
816 - "category_path": "",  
817 - "enriched_tags": "",  
818 - "target_audience": "",  
819 - "usage_scene": "",  
820 - "season": "",  
821 - "key_attributes": "",  
822 - "material": "",  
823 - "features": "",  
824 - "anchor_text": "",  
825 - "error": str(e),  
826 - } 904 + _make_empty_analysis_result(item, target_lang, error=str(e))
827 for item in batch_data 905 for item in batch_data
828 ] 906 ]
829 907
tests/test_product_enrich_partial_mode.py
@@ -322,6 +322,109 @@ def test_analyze_products_uses_product_level_cache_across_batch_requests():
322 assert third[1]["anchor_text"] == "anchor:shirt" 322 assert third[1]["anchor_text"] == "anchor:shirt"
323 323
324 324
  325 +def test_analyze_products_reuses_cached_content_with_current_product_identity():
  326 + cached_result = {
  327 + "id": "1165",
  328 + "lang": "zh",
  329 + "title_input": "old-title",
  330 + "title": "法式连衣裙",
  331 + "category_path": "女装>连衣裙",
  332 + "enriched_tags": "法式,收腰",
  333 + "target_audience": "年轻女性",
  334 + "usage_scene": "通勤,约会",
  335 + "season": "春季,夏季",
  336 + "key_attributes": "中长款",
  337 + "material": "聚酯纤维",
  338 + "features": "透气",
  339 + "anchor_text": "法式收腰连衣裙",
  340 + }
  341 + products = [{"id": "69960", "title": "dress"}]
  342 +
  343 + with mock.patch.object(product_enrich, "API_KEY", "fake-key"), mock.patch.object(
  344 + product_enrich,
  345 + "_get_cached_anchor_result",
  346 + wraps=lambda product, target_lang: product_enrich._normalize_analysis_result(
  347 + cached_result,
  348 + product=product,
  349 + target_lang=target_lang,
  350 + ),
  351 + ), mock.patch.object(
  352 + product_enrich,
  353 + "process_batch",
  354 + side_effect=AssertionError("process_batch should not be called on cache hit"),
  355 + ):
  356 + result = product_enrich.analyze_products(
  357 + products,
  358 + target_lang="zh",
  359 + tenant_id="170",
  360 + )
  361 +
  362 + assert result == [
  363 + {
  364 + "id": "69960",
  365 + "lang": "zh",
  366 + "title_input": "dress",
  367 + "title": "法式连衣裙",
  368 + "category_path": "女装>连衣裙",
  369 + "tags": "法式,收腰",
  370 + "target_audience": "年轻女性",
  371 + "usage_scene": "通勤,约会",
  372 + "season": "春季,夏季",
  373 + "key_attributes": "中长款",
  374 + "material": "聚酯纤维",
  375 + "features": "透气",
  376 + "anchor_text": "法式收腰连衣裙",
  377 + }
  378 + ]
  379 +
  380 +
  381 +def test_build_index_content_fields_maps_internal_tags_to_enriched_tags_output():
  382 + def fake_analyze_products(products, target_lang="zh", batch_size=None, tenant_id=None):
  383 + return [
  384 + {
  385 + "id": products[0]["id"],
  386 + "lang": target_lang,
  387 + "title_input": products[0]["title"],
  388 + "title": products[0]["title"],
  389 + "category_path": "玩具>滑行玩具",
  390 + "tags": f"{target_lang}-tag1,{target_lang}-tag2",
  391 + "target_audience": f"{target_lang}-audience",
  392 + "usage_scene": "",
  393 + "season": "",
  394 + "key_attributes": "",
  395 + "material": "",
  396 + "features": "",
  397 + "anchor_text": f"{target_lang}-anchor",
  398 + }
  399 + ]
  400 +
  401 + with mock.patch.object(
  402 + product_enrich,
  403 + "analyze_products",
  404 + side_effect=fake_analyze_products,
  405 + ):
  406 + result = product_enrich.build_index_content_fields(
  407 + items=[{"spu_id": "69960", "title": "dress"}],
  408 + tenant_id="170",
  409 + )
  410 +
  411 + assert result == [
  412 + {
  413 + "id": "69960",
  414 + "qanchors": {"zh": ["zh-anchor"], "en": ["en-anchor"]},
  415 + "enriched_tags": {"zh": ["zh-tag1", "zh-tag2"], "en": ["en-tag1", "en-tag2"]},
  416 + "enriched_attributes": [
  417 + {"name": "enriched_tags", "value": {"zh": "zh-tag1"}},
  418 + {"name": "enriched_tags", "value": {"zh": "zh-tag2"}},
  419 + {"name": "target_audience", "value": {"zh": "zh-audience"}},
  420 + {"name": "enriched_tags", "value": {"en": "en-tag1"}},
  421 + {"name": "enriched_tags", "value": {"en": "en-tag2"}},
  422 + {"name": "target_audience", "value": {"en": "en-audience"}},
  423 + ],
  424 + }
  425 + ]
  426 +
  427 +
325 def test_anchor_cache_key_depends_on_product_input_not_identifiers(): 428 def test_anchor_cache_key_depends_on_product_input_not_identifiers():
326 product_a = { 429 product_a = {
327 "id": "1", 430 "id": "1",