diff --git a/indexer/product_enrich.py b/indexer/product_enrich.py index 56ff167..4611005 100644 --- a/indexer/product_enrich.py +++ b/indexer/product_enrich.py @@ -147,15 +147,40 @@ if _missing_prompt_langs: # 多值字段分隔:英文逗号、中文逗号、顿号,及历史约定的 ; | / 与空白 _MULTI_VALUE_FIELD_SPLIT_RE = re.compile(r"[,、,;|/\n\t]+") _CORE_INDEX_LANGUAGES = ("zh", "en") -_ENRICHED_ATTRIBUTE_DIMENSIONS = ( - "enriched_tags", +_ANALYSIS_ATTRIBUTE_FIELD_MAP = ( + ("tags", "enriched_tags"), + ("target_audience", "target_audience"), + ("usage_scene", "usage_scene"), + ("season", "season"), + ("key_attributes", "key_attributes"), + ("material", "material"), + ("features", "features"), +) +_ANALYSIS_RESULT_FIELDS = ( + "title", + "category_path", + "tags", + "target_audience", + "usage_scene", + "season", + "key_attributes", + "material", + "features", + "anchor_text", +) +_ANALYSIS_MEANINGFUL_FIELDS = ( + "tags", "target_audience", "usage_scene", "season", "key_attributes", "material", "features", + "anchor_text", ) +_ANALYSIS_FIELD_ALIASES = { + "tags": ("tags", "enriched_tags"), +} def split_multi_value_field(text: Optional[str]) -> List[str]: @@ -195,25 +220,104 @@ def _append_enriched_attribute( target.append({"name": name, "value": {lang: value}}) +def _get_product_id(product: Dict[str, Any]) -> str: + return str(product.get("id") or product.get("spu_id") or "").strip() + + +def _get_analysis_field_aliases(field_name: str) -> Tuple[str, ...]: + return _ANALYSIS_FIELD_ALIASES.get(field_name, (field_name,)) + + +def _get_analysis_field_value(row: Dict[str, Any], field_name: str) -> Any: + for alias in _get_analysis_field_aliases(field_name): + if alias in row: + return row.get(alias) + return None + + +def _has_meaningful_value(value: Any) -> bool: + if value is None: + return False + if isinstance(value, str): + return bool(value.strip()) + if isinstance(value, dict): + return any(_has_meaningful_value(v) for v in value.values()) + if isinstance(value, list): + return any(_has_meaningful_value(v) for v in value) + return bool(value) + + +def _make_empty_analysis_result( + product: Dict[str, Any], + target_lang: str, + error: Optional[str] = None, +) -> Dict[str, Any]: + result = { + "id": _get_product_id(product), + "lang": target_lang, + "title_input": str(product.get("title") or "").strip(), + } + for field in _ANALYSIS_RESULT_FIELDS: + result[field] = "" + if error: + result["error"] = error + return result + + +def _normalize_analysis_result( + result: Dict[str, Any], + product: Dict[str, Any], + target_lang: str, +) -> Dict[str, Any]: + normalized = _make_empty_analysis_result(product, target_lang) + if not isinstance(result, dict): + return normalized + + normalized["lang"] = str(result.get("lang") or target_lang).strip() or target_lang + normalized["title"] = str(result.get("title") or "").strip() + normalized["category_path"] = str(result.get("category_path") or "").strip() + normalized["title_input"] = str( + product.get("title") or result.get("title_input") or "" + ).strip() + + for field in _ANALYSIS_RESULT_FIELDS: + if field in {"title", "category_path"}: + continue + normalized[field] = str(_get_analysis_field_value(result, field) or "").strip() + + if result.get("error"): + normalized["error"] = str(result.get("error")) + return normalized + + +def _has_meaningful_analysis_content(result: Dict[str, Any]) -> bool: + return any(_has_meaningful_value(result.get(field)) for field in _ANALYSIS_MEANINGFUL_FIELDS) + + def _apply_index_content_row(result: Dict[str, Any], row: Dict[str, Any], lang: str) -> None: if not row or row.get("error"): return - anchor_text = str(row.get("anchor_text") or "").strip() + anchor_text = str(_get_analysis_field_value(row, "anchor_text") or "").strip() if anchor_text: _append_lang_phrase_map(result["qanchors"], lang=lang, raw_value=anchor_text) - for name in _ENRICHED_ATTRIBUTE_DIMENSIONS: - raw = row.get(name) + for source_name, output_name in _ANALYSIS_ATTRIBUTE_FIELD_MAP: + raw = _get_analysis_field_value(row, source_name) if not raw: continue - _append_enriched_attribute(result["enriched_attributes"], name=name, lang=lang, raw_value=raw) - if name == "enriched_tags": + _append_enriched_attribute( + result["enriched_attributes"], + name=output_name, + lang=lang, + raw_value=raw, + ) + if output_name == "enriched_tags": _append_lang_phrase_map(result["enriched_tags"], lang=lang, raw_value=raw) def _normalize_index_content_item(item: Dict[str, Any]) -> Dict[str, str]: - item_id = str(item.get("id") or item.get("spu_id") or "").strip() + item_id = _get_product_id(item) return { "id": item_id, "title": str(item.get("title") or "").strip(), @@ -369,7 +473,10 @@ def _get_cached_anchor_result( raw = _anchor_redis.get(key) if not raw: return None - return json.loads(raw) + result = _normalize_analysis_result(json.loads(raw), product=product, target_lang=target_lang) + if not _has_meaningful_analysis_content(result): + return None + return result except Exception as e: logger.warning(f"Failed to get anchor cache: {e}") return None @@ -383,9 +490,12 @@ def _set_cached_anchor_result( if not _anchor_redis: return try: + normalized = _normalize_analysis_result(result, product=product, target_lang=target_lang) + if not _has_meaningful_analysis_content(normalized): + return key = _make_anchor_cache_key(product, target_lang) ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600 - _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False)) + _anchor_redis.setex(key, ttl, json.dumps(normalized, ensure_ascii=False)) except Exception as e: logger.warning(f"Failed to set anchor cache: {e}") @@ -654,7 +764,7 @@ def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: "seq_no": parts[0], "title": parts[1], # 商品标题(按目标语言) "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 - "enriched_tags": parts[3] if len(parts) > 3 else "", # 细分标签 + "tags": parts[3] if len(parts) > 3 else "", # 细分标签 "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景 "season": parts[6] if len(parts) > 6 else "", # 适用季节 @@ -705,7 +815,7 @@ def process_batch( batch_data: List[Dict[str, str]], batch_num: int, target_lang: str = "zh", -) -> List[Dict[str, str]]: +) -> List[Dict[str, Any]]: """处理一个批次的数据""" logger.info(f"\n{'#' * 80}") logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)") @@ -725,22 +835,11 @@ def process_batch( target_lang, ) return [ - { - "id": item["id"], - "lang": target_lang, - "title_input": item.get("title", ""), - "title": "", - "category_path": "", - "enriched_tags": "", - "target_audience": "", - "usage_scene": "", - "season": "", - "key_attributes": "", - "material": "", - "features": "", - "anchor_text": "", - "error": f"prompt_creation_failed: unsupported target_lang={target_lang}", - } + _make_empty_analysis_result( + item, + target_lang, + error=f"prompt_creation_failed: unsupported target_lang={target_lang}", + ) for item in batch_data ] @@ -764,24 +863,18 @@ def process_batch( results_with_ids = [] for i, parsed_item in enumerate(parsed_results): if i < len(batch_data): - original_id = batch_data[i]["id"] - result = { - "id": original_id, - "lang": target_lang, - "title_input": batch_data[i]["title"], # 原始输入标题 - "title": parsed_item.get("title", ""), # 模型生成的标题 - "category_path": parsed_item.get("category_path", ""), # 品类路径 - "enriched_tags": parsed_item.get("enriched_tags", ""), # 细分标签 - "target_audience": parsed_item.get("target_audience", ""), # 适用人群 - "usage_scene": parsed_item.get("usage_scene", ""), # 使用场景 - "season": parsed_item.get("season", ""), # 适用季节 - "key_attributes": parsed_item.get("key_attributes", ""), # 关键属性 - "material": parsed_item.get("material", ""), # 材质说明 - "features": parsed_item.get("features", ""), # 功能特点 - "anchor_text": parsed_item.get("anchor_text", ""), # 锚文本 - } + source_product = batch_data[i] + result = _normalize_analysis_result( + parsed_item, + product=source_product, + target_lang=target_lang, + ) results_with_ids.append(result) - logger.info(f"Mapped: seq={parsed_item['seq_no']} -> original_id={original_id}") + logger.info( + "Mapped: seq=%s -> original_id=%s", + parsed_item.get("seq_no"), + source_product.get("id"), + ) # 保存批次 JSON 日志到独立文件 batch_log = { @@ -808,22 +901,7 @@ def process_batch( logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True) # 返回空结果,保持ID映射 return [ - { - "id": item["id"], - "lang": target_lang, - "title_input": item["title"], - "title": "", - "category_path": "", - "enriched_tags": "", - "target_audience": "", - "usage_scene": "", - "season": "", - "key_attributes": "", - "material": "", - "features": "", - "anchor_text": "", - "error": str(e), - } + _make_empty_analysis_result(item, target_lang, error=str(e)) for item in batch_data ] diff --git a/tests/test_product_enrich_partial_mode.py b/tests/test_product_enrich_partial_mode.py index 705cec5..cb00eec 100644 --- a/tests/test_product_enrich_partial_mode.py +++ b/tests/test_product_enrich_partial_mode.py @@ -322,6 +322,109 @@ def test_analyze_products_uses_product_level_cache_across_batch_requests(): assert third[1]["anchor_text"] == "anchor:shirt" +def test_analyze_products_reuses_cached_content_with_current_product_identity(): + cached_result = { + "id": "1165", + "lang": "zh", + "title_input": "old-title", + "title": "法式连衣裙", + "category_path": "女装>连衣裙", + "enriched_tags": "法式,收腰", + "target_audience": "年轻女性", + "usage_scene": "通勤,约会", + "season": "春季,夏季", + "key_attributes": "中长款", + "material": "聚酯纤维", + "features": "透气", + "anchor_text": "法式收腰连衣裙", + } + products = [{"id": "69960", "title": "dress"}] + + with mock.patch.object(product_enrich, "API_KEY", "fake-key"), mock.patch.object( + product_enrich, + "_get_cached_anchor_result", + wraps=lambda product, target_lang: product_enrich._normalize_analysis_result( + cached_result, + product=product, + target_lang=target_lang, + ), + ), mock.patch.object( + product_enrich, + "process_batch", + side_effect=AssertionError("process_batch should not be called on cache hit"), + ): + result = product_enrich.analyze_products( + products, + target_lang="zh", + tenant_id="170", + ) + + assert result == [ + { + "id": "69960", + "lang": "zh", + "title_input": "dress", + "title": "法式连衣裙", + "category_path": "女装>连衣裙", + "tags": "法式,收腰", + "target_audience": "年轻女性", + "usage_scene": "通勤,约会", + "season": "春季,夏季", + "key_attributes": "中长款", + "material": "聚酯纤维", + "features": "透气", + "anchor_text": "法式收腰连衣裙", + } + ] + + +def test_build_index_content_fields_maps_internal_tags_to_enriched_tags_output(): + def fake_analyze_products(products, target_lang="zh", batch_size=None, tenant_id=None): + return [ + { + "id": products[0]["id"], + "lang": target_lang, + "title_input": products[0]["title"], + "title": products[0]["title"], + "category_path": "玩具>滑行玩具", + "tags": f"{target_lang}-tag1,{target_lang}-tag2", + "target_audience": f"{target_lang}-audience", + "usage_scene": "", + "season": "", + "key_attributes": "", + "material": "", + "features": "", + "anchor_text": f"{target_lang}-anchor", + } + ] + + with mock.patch.object( + product_enrich, + "analyze_products", + side_effect=fake_analyze_products, + ): + result = product_enrich.build_index_content_fields( + items=[{"spu_id": "69960", "title": "dress"}], + tenant_id="170", + ) + + assert result == [ + { + "id": "69960", + "qanchors": {"zh": ["zh-anchor"], "en": ["en-anchor"]}, + "enriched_tags": {"zh": ["zh-tag1", "zh-tag2"], "en": ["en-tag1", "en-tag2"]}, + "enriched_attributes": [ + {"name": "enriched_tags", "value": {"zh": "zh-tag1"}}, + {"name": "enriched_tags", "value": {"zh": "zh-tag2"}}, + {"name": "target_audience", "value": {"zh": "zh-audience"}}, + {"name": "enriched_tags", "value": {"en": "en-tag1"}}, + {"name": "enriched_tags", "value": {"en": "en-tag2"}}, + {"name": "target_audience", "value": {"en": "en-audience"}}, + ], + } + ] + + def test_anchor_cache_key_depends_on_product_input_not_identifiers(): product_a = { "id": "1", -- libgit2 0.21.2