diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index f775f4f..d9e56cb 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -242,6 +242,7 @@ class SPUDocumentTransformer: - qanchors.{lang} - enriched_tags.{lang} - enriched_attributes[].value.{lang} + - enriched_taxonomy_attributes[].value.{lang} 设计目标: - 尽可能攒批调用 LLM; @@ -296,6 +297,8 @@ class SPUDocumentTransformer: doc["enriched_tags"] = enrichment["enriched_tags"] if enrichment.get("enriched_attributes"): doc["enriched_attributes"] = enrichment["enriched_attributes"] + if enrichment.get("enriched_taxonomy_attributes"): + doc["enriched_taxonomy_attributes"] = enrichment["enriched_taxonomy_attributes"] except Exception as e: logger.warning("Failed to apply enrichment to doc (spu_id=%s): %s", doc.get("spu_id"), e) diff --git a/indexer/product_enrich.py b/indexer/product_enrich.py index c3397fb..da14fc1 100644 --- a/indexer/product_enrich.py +++ b/indexer/product_enrich.py @@ -14,6 +14,7 @@ import time import hashlib import uuid import threading +from dataclasses import dataclass, field from collections import OrderedDict from datetime import datetime from concurrent.futures import ThreadPoolExecutor @@ -30,6 +31,9 @@ from indexer.product_enrich_prompts import ( USER_INSTRUCTION_TEMPLATE, LANGUAGE_MARKDOWN_TABLE_HEADERS, SHARED_ANALYSIS_INSTRUCTION, + TAXONOMY_LANGUAGE_MARKDOWN_TABLE_HEADERS, + TAXONOMY_MARKDOWN_TABLE_HEADERS_EN, + TAXONOMY_SHARED_ANALYSIS_INSTRUCTION, ) # 配置 @@ -147,7 +151,7 @@ if _missing_prompt_langs: # 多值字段分隔:英文逗号、中文逗号、顿号,及历史约定的 ; | / 与空白 _MULTI_VALUE_FIELD_SPLIT_RE = re.compile(r"[,、,;|/\n\t]+") _CORE_INDEX_LANGUAGES = ("zh", "en") -_ANALYSIS_ATTRIBUTE_FIELD_MAP = ( +_CONTENT_ANALYSIS_ATTRIBUTE_FIELD_MAP = ( ("tags", "enriched_tags"), ("target_audience", "target_audience"), ("usage_scene", "usage_scene"), @@ -156,7 +160,7 @@ _ANALYSIS_ATTRIBUTE_FIELD_MAP = ( ("material", "material"), ("features", "features"), ) -_ANALYSIS_RESULT_FIELDS = ( +_CONTENT_ANALYSIS_RESULT_FIELDS = ( "title", "category_path", "tags", @@ -168,7 +172,7 @@ _ANALYSIS_RESULT_FIELDS = ( "features", "anchor_text", ) -_ANALYSIS_MEANINGFUL_FIELDS = ( +_CONTENT_ANALYSIS_MEANINGFUL_FIELDS = ( "tags", "target_audience", "usage_scene", @@ -178,9 +182,89 @@ _ANALYSIS_MEANINGFUL_FIELDS = ( "features", "anchor_text", ) -_ANALYSIS_FIELD_ALIASES = { +_CONTENT_ANALYSIS_FIELD_ALIASES = { "tags": ("tags", "enriched_tags"), } +_CONTENT_ANALYSIS_QUALITY_FIELDS = ("title", "category_path", "anchor_text") +_TAXONOMY_ANALYSIS_ATTRIBUTE_FIELD_MAP = ( + ("product_type", "Product Type"), + ("target_gender", "Target Gender"), + ("age_group", "Age Group"), + ("season", "Season"), + ("fit", "Fit"), + ("silhouette", "Silhouette"), + ("neckline", "Neckline"), + ("sleeve_length_type", "Sleeve Length Type"), + ("sleeve_style", "Sleeve Style"), + ("strap_type", "Strap Type"), + ("rise_waistline", "Rise / Waistline"), + ("leg_shape", "Leg Shape"), + ("skirt_shape", "Skirt Shape"), + ("length_type", "Length Type"), + ("closure_type", "Closure Type"), + ("design_details", "Design Details"), + ("fabric", "Fabric"), + ("material_composition", "Material Composition"), + ("fabric_properties", "Fabric Properties"), + ("clothing_features", "Clothing Features"), + ("functional_benefits", "Functional Benefits"), + ("color", "Color"), + ("color_family", "Color Family"), + ("print_pattern", "Print / Pattern"), + ("occasion_end_use", "Occasion / End Use"), + ("style_aesthetic", "Style Aesthetic"), +) +_TAXONOMY_ANALYSIS_RESULT_FIELDS = tuple( + field_name for field_name, _ in _TAXONOMY_ANALYSIS_ATTRIBUTE_FIELD_MAP +) + + +@dataclass(frozen=True) +class AnalysisSchema: + name: str + shared_instruction: str + markdown_table_headers: Dict[str, List[str]] + result_fields: Tuple[str, ...] + meaningful_fields: Tuple[str, ...] + field_aliases: Dict[str, Tuple[str, ...]] = field(default_factory=dict) + fallback_headers: Optional[List[str]] = None + quality_fields: Tuple[str, ...] = () + + def get_headers(self, target_lang: str) -> Optional[List[str]]: + headers = self.markdown_table_headers.get(target_lang) + if headers: + return headers + if self.fallback_headers: + return self.fallback_headers + return None + + +_ANALYSIS_SCHEMAS: Dict[str, AnalysisSchema] = { + "content": AnalysisSchema( + name="content", + shared_instruction=SHARED_ANALYSIS_INSTRUCTION, + markdown_table_headers=LANGUAGE_MARKDOWN_TABLE_HEADERS, + result_fields=_CONTENT_ANALYSIS_RESULT_FIELDS, + meaningful_fields=_CONTENT_ANALYSIS_MEANINGFUL_FIELDS, + field_aliases=_CONTENT_ANALYSIS_FIELD_ALIASES, + quality_fields=_CONTENT_ANALYSIS_QUALITY_FIELDS, + ), + "taxonomy": AnalysisSchema( + name="taxonomy", + shared_instruction=TAXONOMY_SHARED_ANALYSIS_INSTRUCTION, + markdown_table_headers=TAXONOMY_LANGUAGE_MARKDOWN_TABLE_HEADERS, + result_fields=_TAXONOMY_ANALYSIS_RESULT_FIELDS, + meaningful_fields=_TAXONOMY_ANALYSIS_RESULT_FIELDS, + fallback_headers=TAXONOMY_MARKDOWN_TABLE_HEADERS_EN, + ), +} + + +def _get_analysis_schema(analysis_kind: str) -> AnalysisSchema: + schema = _ANALYSIS_SCHEMAS.get(analysis_kind) + if schema is None: + raise ValueError(f"Unsupported analysis_kind: {analysis_kind}") + return schema def split_multi_value_field(text: Optional[str]) -> List[str]: @@ -235,12 +319,12 @@ def _get_product_id(product: Dict[str, Any]) -> str: return str(product.get("id") or product.get("spu_id") or "").strip() -def _get_analysis_field_aliases(field_name: str) -> Tuple[str, ...]: - return _ANALYSIS_FIELD_ALIASES.get(field_name, (field_name,)) +def _get_analysis_field_aliases(field_name: str, schema: AnalysisSchema) -> Tuple[str, ...]: + return schema.field_aliases.get(field_name, (field_name,)) -def _get_analysis_field_value(row: Dict[str, Any], field_name: str) -> Any: - for alias in _get_analysis_field_aliases(field_name): +def _get_analysis_field_value(row: Dict[str, Any], field_name: str, schema: AnalysisSchema) -> Any: + for alias in _get_analysis_field_aliases(field_name, schema): if alias in row: return row.get(alias) return None @@ -261,6 +345,7 @@ def _has_meaningful_value(value: Any) -> bool: def _make_empty_analysis_result( product: Dict[str, Any], target_lang: str, + schema: AnalysisSchema, error: Optional[str] = None, ) -> Dict[str, Any]: result = { @@ -268,7 +353,7 @@ def _make_empty_analysis_result( "lang": target_lang, "title_input": str(product.get("title") or "").strip(), } - for field in _ANALYSIS_RESULT_FIELDS: + for field in schema.result_fields: result[field] = "" if error: result["error"] = error @@ -279,42 +364,59 @@ def _normalize_analysis_result( result: Dict[str, Any], product: Dict[str, Any], target_lang: str, + schema: AnalysisSchema, ) -> Dict[str, Any]: - normalized = _make_empty_analysis_result(product, target_lang) + normalized = _make_empty_analysis_result(product, target_lang, schema) if not isinstance(result, dict): return normalized normalized["lang"] = str(result.get("lang") or target_lang).strip() or target_lang - normalized["title"] = str(result.get("title") or "").strip() - normalized["category_path"] = str(result.get("category_path") or "").strip() normalized["title_input"] = str( product.get("title") or result.get("title_input") or "" ).strip() - for field in _ANALYSIS_RESULT_FIELDS: - if field in {"title", "category_path"}: - continue - normalized[field] = str(_get_analysis_field_value(result, field) or "").strip() + for field in schema.result_fields: + normalized[field] = str(_get_analysis_field_value(result, field, schema) or "").strip() if result.get("error"): normalized["error"] = str(result.get("error")) return normalized -def _has_meaningful_analysis_content(result: Dict[str, Any]) -> bool: - return any(_has_meaningful_value(result.get(field)) for field in _ANALYSIS_MEANINGFUL_FIELDS) +def _has_meaningful_analysis_content(result: Dict[str, Any], schema: AnalysisSchema) -> bool: + return any(_has_meaningful_value(result.get(field)) for field in schema.meaningful_fields) + + +def _append_analysis_attributes( + target: List[Dict[str, Any]], + row: Dict[str, Any], + lang: str, + schema: AnalysisSchema, + field_map: Tuple[Tuple[str, str], ...], +) -> None: + for source_name, output_name in field_map: + raw = _get_analysis_field_value(row, source_name, schema) + if not raw: + continue + _append_named_lang_phrase_map( + target, + name=output_name, + lang=lang, + raw_value=raw, + ) def _apply_index_content_row(result: Dict[str, Any], row: Dict[str, Any], lang: str) -> None: if not row or row.get("error"): return - anchor_text = str(_get_analysis_field_value(row, "anchor_text") or "").strip() + content_schema = _get_analysis_schema("content") + anchor_text = str(_get_analysis_field_value(row, "anchor_text", content_schema) or "").strip() if anchor_text: _append_lang_phrase_map(result["qanchors"], lang=lang, raw_value=anchor_text) - for source_name, output_name in _ANALYSIS_ATTRIBUTE_FIELD_MAP: - raw = _get_analysis_field_value(row, source_name) + for source_name, output_name in _CONTENT_ANALYSIS_ATTRIBUTE_FIELD_MAP: + raw = _get_analysis_field_value(row, source_name, content_schema) if not raw: continue _append_named_lang_phrase_map( @@ -327,6 +429,19 @@ def _apply_index_content_row(result: Dict[str, Any], row: Dict[str, Any], lang: _append_lang_phrase_map(result["enriched_tags"], lang=lang, raw_value=raw) +def _apply_index_taxonomy_row(result: Dict[str, Any], row: Dict[str, Any], lang: str) -> None: + if not row or row.get("error"): + return + + _append_analysis_attributes( + result["enriched_taxonomy_attributes"], + row=row, + lang=lang, + schema=_get_analysis_schema("taxonomy"), + field_map=_TAXONOMY_ANALYSIS_ATTRIBUTE_FIELD_MAP, + ) + + def _normalize_index_content_item(item: Dict[str, Any]) -> Dict[str, str]: item_id = _get_product_id(item) return { @@ -355,6 +470,7 @@ def build_index_content_fields( - `qanchors` - `enriched_tags` - `enriched_attributes` + - `enriched_taxonomy_attributes` - 可选 `error` 其中: @@ -371,6 +487,7 @@ def build_index_content_fields( "qanchors": {}, "enriched_tags": {}, "enriched_attributes": [], + "enriched_taxonomy_attributes": [], } for item in normalized_items } @@ -398,6 +515,33 @@ def build_index_content_fields( continue _apply_index_content_row(results_by_id[item_id], row=row, lang=lang) + try: + taxonomy_rows = analyze_products( + products=normalized_items, + target_lang=lang, + batch_size=BATCH_SIZE, + tenant_id=tenant_id, + analysis_kind="taxonomy", + ) + except Exception as e: + logger.warning( + "build_index_content_fields taxonomy enrichment failed for lang=%s: %s", + lang, + e, + ) + for item in normalized_items: + results_by_id[item["id"]].setdefault("error", str(e)) + continue + + for row in taxonomy_rows or []: + item_id = str(row.get("id") or "").strip() + if not item_id or item_id not in results_by_id: + continue + if row.get("error"): + results_by_id[item_id].setdefault("error", row["error"]) + continue + _apply_index_taxonomy_row(results_by_id[item_id], row=row, lang=lang) + return [results_by_id[item["id"]] for item in normalized_items] @@ -463,52 +607,89 @@ def _build_prompt_input_text(product: Dict[str, Any]) -> str: return _truncate_by_words(candidate, PROMPT_INPUT_MAX_WORDS) -def _make_anchor_cache_key( +def _make_analysis_cache_key( product: Dict[str, Any], target_lang: str, + analysis_kind: str, ) -> str: - """构造缓存 key,仅由 prompt 实际输入文本内容 + 目标语言决定。""" + """构造缓存 key,仅由分析类型、prompt 实际输入文本内容与目标语言决定。""" prompt_input = _build_prompt_input_text(product) h = hashlib.md5(prompt_input.encode("utf-8")).hexdigest() - return f"{ANCHOR_CACHE_PREFIX}:{target_lang}:{prompt_input[:4]}{h}" + return f"{ANCHOR_CACHE_PREFIX}:{analysis_kind}:{target_lang}:{prompt_input[:4]}{h}" -def _get_cached_anchor_result( +def _make_anchor_cache_key( product: Dict[str, Any], target_lang: str, +) -> str: + return _make_analysis_cache_key(product, target_lang, analysis_kind="content") + + +def _get_cached_analysis_result( + product: Dict[str, Any], + target_lang: str, + analysis_kind: str, ) -> Optional[Dict[str, Any]]: if not _anchor_redis: return None + schema = _get_analysis_schema(analysis_kind) try: - key = _make_anchor_cache_key(product, target_lang) + key = _make_analysis_cache_key(product, target_lang, analysis_kind) raw = _anchor_redis.get(key) if not raw: return None - result = _normalize_analysis_result(json.loads(raw), product=product, target_lang=target_lang) - if not _has_meaningful_analysis_content(result): + result = _normalize_analysis_result( + json.loads(raw), + product=product, + target_lang=target_lang, + schema=schema, + ) + if not _has_meaningful_analysis_content(result, schema): return None return result except Exception as e: - logger.warning(f"Failed to get anchor cache: {e}") + logger.warning("Failed to get %s analysis cache: %s", analysis_kind, e) return None -def _set_cached_anchor_result( +def _get_cached_anchor_result( + product: Dict[str, Any], + target_lang: str, +) -> Optional[Dict[str, Any]]: + return _get_cached_analysis_result(product, target_lang, analysis_kind="content") + + +def _set_cached_analysis_result( product: Dict[str, Any], target_lang: str, result: Dict[str, Any], + analysis_kind: str, ) -> None: if not _anchor_redis: return + schema = _get_analysis_schema(analysis_kind) try: - normalized = _normalize_analysis_result(result, product=product, target_lang=target_lang) - if not _has_meaningful_analysis_content(normalized): + normalized = _normalize_analysis_result( + result, + product=product, + target_lang=target_lang, + schema=schema, + ) + if not _has_meaningful_analysis_content(normalized, schema): return - key = _make_anchor_cache_key(product, target_lang) + key = _make_analysis_cache_key(product, target_lang, analysis_kind) ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600 _anchor_redis.setex(key, ttl, json.dumps(normalized, ensure_ascii=False)) except Exception as e: - logger.warning(f"Failed to set anchor cache: {e}") + logger.warning("Failed to set %s analysis cache: %s", analysis_kind, e) + + +def _set_cached_anchor_result( + product: Dict[str, Any], + target_lang: str, + result: Dict[str, Any], +) -> None: + _set_cached_analysis_result(product, target_lang, result, analysis_kind="content") def _build_assistant_prefix(headers: List[str]) -> str: @@ -517,8 +698,8 @@ def _build_assistant_prefix(headers: List[str]) -> str: return f"{header_line}\n{separator_line}\n" -def _build_shared_context(products: List[Dict[str, str]]) -> str: - shared_context = SHARED_ANALYSIS_INSTRUCTION +def _build_shared_context(products: List[Dict[str, str]], schema: AnalysisSchema) -> str: + shared_context = schema.shared_instruction for idx, product in enumerate(products, 1): prompt_input = _build_prompt_input_text(product) shared_context += f"{idx}. {prompt_input}\n" @@ -550,16 +731,19 @@ def reset_logged_shared_context_keys() -> None: def create_prompt( products: List[Dict[str, str]], target_lang: str = "zh", -) -> Tuple[str, str, str]: + analysis_kind: str = "content", +) -> Tuple[Optional[str], Optional[str], Optional[str]]: """根据目标语言创建共享上下文、本地化输出要求和 Partial Mode assistant 前缀。""" - markdown_table_headers = LANGUAGE_MARKDOWN_TABLE_HEADERS.get(target_lang) + schema = _get_analysis_schema(analysis_kind) + markdown_table_headers = schema.get_headers(target_lang) if not markdown_table_headers: logger.warning( - "Unsupported target_lang for markdown table headers: %s", + "Unsupported target_lang for markdown table headers: kind=%s lang=%s", + analysis_kind, target_lang, ) return None, None, None - shared_context = _build_shared_context(products) + shared_context = _build_shared_context(products, schema) language_label = SOURCE_LANG_CODE_MAP.get(target_lang, target_lang) user_prompt = USER_INSTRUCTION_TEMPLATE.format(language=language_label).strip() assistant_prefix = _build_assistant_prefix(markdown_table_headers) @@ -592,6 +776,7 @@ def call_llm( user_prompt: str, assistant_prefix: str, target_lang: str = "zh", + analysis_kind: str = "content", ) -> Tuple[str, str]: """调用大模型 API(带重试机制),使用 Partial Mode 强制 markdown 表格前缀。""" headers = { @@ -631,8 +816,9 @@ def call_llm( if _mark_shared_context_logged_once(shared_context_key): logger.info(f"\n{'=' * 80}") logger.info( - "LLM Shared Context [model=%s, shared_key=%s, chars=%s] (logged once per process key)", + "LLM Shared Context [model=%s, kind=%s, shared_key=%s, chars=%s] (logged once per process key)", MODEL_NAME, + analysis_kind, shared_context_key, len(shared_context), ) @@ -641,8 +827,9 @@ def call_llm( verbose_logger.info(f"\n{'=' * 80}") verbose_logger.info( - "LLM Request [model=%s, lang=%s, shared_key=%s, tail_key=%s]:", + "LLM Request [model=%s, kind=%s, lang=%s, shared_key=%s, tail_key=%s]:", MODEL_NAME, + analysis_kind, target_lang, shared_context_key, localized_tail_key, @@ -654,7 +841,8 @@ def call_llm( verbose_logger.info(f"\nAssistant Prefix:\n{assistant_prefix}") logger.info( - "\nLLM Request Variant [lang=%s, shared_key=%s, tail_key=%s, prompt_chars=%s, prefix_chars=%s]", + "\nLLM Request Variant [kind=%s, lang=%s, shared_key=%s, tail_key=%s, prompt_chars=%s, prefix_chars=%s]", + analysis_kind, target_lang, shared_context_key, localized_tail_key, @@ -685,8 +873,9 @@ def call_llm( usage = result.get("usage") or {} verbose_logger.info( - "\nLLM Response [model=%s, lang=%s, shared_key=%s, tail_key=%s]:", + "\nLLM Response [model=%s, kind=%s, lang=%s, shared_key=%s, tail_key=%s]:", MODEL_NAME, + analysis_kind, target_lang, shared_context_key, localized_tail_key, @@ -697,7 +886,8 @@ def call_llm( full_markdown = _merge_partial_response(assistant_prefix, generated_content) logger.info( - "\nLLM Response Summary [lang=%s, shared_key=%s, tail_key=%s, generated_chars=%s, completion_tokens=%s, prompt_tokens=%s, total_tokens=%s]", + "\nLLM Response Summary [kind=%s, lang=%s, shared_key=%s, tail_key=%s, generated_chars=%s, completion_tokens=%s, prompt_tokens=%s, total_tokens=%s]", + analysis_kind, target_lang, shared_context_key, localized_tail_key, @@ -742,8 +932,12 @@ def call_llm( session.close() -def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: +def parse_markdown_table( + markdown_content: str, + analysis_kind: str = "content", +) -> List[Dict[str, str]]: """解析markdown表格内容""" + schema = _get_analysis_schema(analysis_kind) lines = markdown_content.strip().split("\n") data = [] data_started = False @@ -768,22 +962,15 @@ def parse_markdown_table(markdown_content: str) -> List[Dict[str, str]]: # 解析数据行 parts = [p.strip() for p in line.split("|")] - parts = [p for p in parts if p] # 移除空字符串 + if parts and parts[0] == "": + parts = parts[1:] + if parts and parts[-1] == "": + parts = parts[:-1] if len(parts) >= 2: - row = { - "seq_no": parts[0], - "title": parts[1], # 商品标题(按目标语言) - "category_path": parts[2] if len(parts) > 2 else "", # 品类路径 - "tags": parts[3] if len(parts) > 3 else "", # 细分标签 - "target_audience": parts[4] if len(parts) > 4 else "", # 适用人群 - "usage_scene": parts[5] if len(parts) > 5 else "", # 使用场景 - "season": parts[6] if len(parts) > 6 else "", # 适用季节 - "key_attributes": parts[7] if len(parts) > 7 else "", # 关键属性 - "material": parts[8] if len(parts) > 8 else "", # 材质说明 - "features": parts[9] if len(parts) > 9 else "", # 功能特点 - "anchor_text": parts[10] if len(parts) > 10 else "", # 锚文本 - } + row = {"seq_no": parts[0]} + for field_index, field_name in enumerate(schema.result_fields, start=1): + row[field_name] = parts[field_index] if len(parts) > field_index else "" data.append(row) return data @@ -794,31 +981,45 @@ def _log_parsed_result_quality( parsed_results: List[Dict[str, str]], target_lang: str, batch_num: int, + analysis_kind: str, ) -> None: + schema = _get_analysis_schema(analysis_kind) expected = len(batch_data) actual = len(parsed_results) if actual != expected: logger.warning( - "Parsed row count mismatch for batch=%s lang=%s: expected=%s actual=%s", + "Parsed row count mismatch for kind=%s batch=%s lang=%s: expected=%s actual=%s", + analysis_kind, batch_num, target_lang, expected, actual, ) - missing_anchor = sum(1 for item in parsed_results if not str(item.get("anchor_text") or "").strip()) - missing_category = sum(1 for item in parsed_results if not str(item.get("category_path") or "").strip()) - missing_title = sum(1 for item in parsed_results if not str(item.get("title") or "").strip()) + if not schema.quality_fields: + logger.info( + "Parsed Quality Summary [kind=%s, batch=%s, lang=%s]: rows=%s/%s", + analysis_kind, + batch_num, + target_lang, + actual, + expected, + ) + return + missing_summary = ", ".join( + f"missing_{field}=" + f"{sum(1 for item in parsed_results if not str(item.get(field) or '').strip())}" + for field in schema.quality_fields + ) logger.info( - "Parsed Quality Summary [batch=%s, lang=%s]: rows=%s/%s, missing_title=%s, missing_category=%s, missing_anchor=%s", + "Parsed Quality Summary [kind=%s, batch=%s, lang=%s]: rows=%s/%s, %s", + analysis_kind, batch_num, target_lang, actual, expected, - missing_title, - missing_category, - missing_anchor, + missing_summary, ) @@ -826,29 +1027,39 @@ def process_batch( batch_data: List[Dict[str, str]], batch_num: int, target_lang: str = "zh", + analysis_kind: str = "content", ) -> List[Dict[str, Any]]: """处理一个批次的数据""" + schema = _get_analysis_schema(analysis_kind) logger.info(f"\n{'#' * 80}") - logger.info(f"Processing Batch {batch_num} ({len(batch_data)} items)") + logger.info( + "Processing Batch %s (%s items, kind=%s)", + batch_num, + len(batch_data), + analysis_kind, + ) # 创建提示词 shared_context, user_prompt, assistant_prefix = create_prompt( batch_data, target_lang=target_lang, + analysis_kind=analysis_kind, ) # 如果提示词创建失败(例如不支持的 target_lang),本次批次整体失败,不再继续调用 LLM if shared_context is None or user_prompt is None or assistant_prefix is None: logger.error( - "Failed to create prompt for batch %s, target_lang=%s; " + "Failed to create prompt for batch %s, kind=%s, target_lang=%s; " "marking entire batch as failed without calling LLM", batch_num, + analysis_kind, target_lang, ) return [ _make_empty_analysis_result( item, target_lang, + schema, error=f"prompt_creation_failed: unsupported target_lang={target_lang}", ) for item in batch_data @@ -861,11 +1072,18 @@ def process_batch( user_prompt, assistant_prefix, target_lang=target_lang, + analysis_kind=analysis_kind, ) # 解析结果 - parsed_results = parse_markdown_table(raw_response) - _log_parsed_result_quality(batch_data, parsed_results, target_lang, batch_num) + parsed_results = parse_markdown_table(raw_response, analysis_kind=analysis_kind) + _log_parsed_result_quality( + batch_data, + parsed_results, + target_lang, + batch_num, + analysis_kind, + ) logger.info(f"\nParsed Results ({len(parsed_results)} items):") logger.info(json.dumps(parsed_results, ensure_ascii=False, indent=2)) @@ -879,10 +1097,12 @@ def process_batch( parsed_item, product=source_product, target_lang=target_lang, + schema=schema, ) results_with_ids.append(result) logger.info( - "Mapped: seq=%s -> original_id=%s", + "Mapped: kind=%s seq=%s -> original_id=%s", + analysis_kind, parsed_item.get("seq_no"), source_product.get("id"), ) @@ -890,6 +1110,7 @@ def process_batch( # 保存批次 JSON 日志到独立文件 batch_log = { "batch_num": batch_num, + "analysis_kind": analysis_kind, "timestamp": datetime.now().isoformat(), "input_products": batch_data, "raw_response": raw_response, @@ -900,7 +1121,10 @@ def process_batch( # 并发写 batch json 日志时,保证文件名唯一避免覆盖 batch_call_id = uuid.uuid4().hex[:12] - batch_log_file = LOG_DIR / f"batch_{batch_num:04d}_{timestamp}_{batch_call_id}.json" + batch_log_file = ( + LOG_DIR + / f"batch_{analysis_kind}_{batch_num:04d}_{timestamp}_{batch_call_id}.json" + ) with open(batch_log_file, "w", encoding="utf-8") as f: json.dump(batch_log, f, ensure_ascii=False, indent=2) @@ -912,7 +1136,7 @@ def process_batch( logger.error(f"Error processing batch {batch_num}: {str(e)}", exc_info=True) # 返回空结果,保持ID映射 return [ - _make_empty_analysis_result(item, target_lang, error=str(e)) + _make_empty_analysis_result(item, target_lang, schema, error=str(e)) for item in batch_data ] @@ -922,6 +1146,7 @@ def analyze_products( target_lang: str = "zh", batch_size: Optional[int] = None, tenant_id: Optional[str] = None, + analysis_kind: str = "content", ) -> List[Dict[str, Any]]: """ 库调用入口:根据输入+语言,返回锚文本及各维度信息。 @@ -937,6 +1162,7 @@ def analyze_products( if not products: return [] + _get_analysis_schema(analysis_kind) results_by_index: List[Optional[Dict[str, Any]]] = [None] * len(products) uncached_items: List[Tuple[int, Dict[str, str]]] = [] @@ -946,11 +1172,11 @@ def analyze_products( uncached_items.append((idx, product)) continue - cached = _get_cached_anchor_result(product, target_lang) + cached = _get_cached_analysis_result(product, target_lang, analysis_kind) if cached: logger.info( f"[analyze_products] Cache hit for title='{title[:50]}...', " - f"lang={target_lang}" + f"kind={analysis_kind}, lang={target_lang}" ) results_by_index[idx] = cached continue @@ -979,9 +1205,14 @@ def analyze_products( for batch_num, batch_slice, batch in batch_jobs: logger.info( f"[analyze_products] Processing batch {batch_num}/{total_batches}, " - f"size={len(batch)}, target_lang={target_lang}" + f"size={len(batch)}, kind={analysis_kind}, target_lang={target_lang}" + ) + batch_results = process_batch( + batch, + batch_num=batch_num, + target_lang=target_lang, + analysis_kind=analysis_kind, ) - batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang) for (original_idx, product), item in zip(batch_slice, batch_results): results_by_index[original_idx] = item @@ -992,7 +1223,7 @@ def analyze_products( # 不缓存错误结果,避免放大临时故障 continue try: - _set_cached_anchor_result(product, target_lang, item) + _set_cached_analysis_result(product, target_lang, item, analysis_kind) except Exception: # 已在内部记录 warning pass @@ -1000,10 +1231,11 @@ def analyze_products( max_workers = min(CONTENT_UNDERSTANDING_MAX_WORKERS, len(batch_jobs)) logger.info( "[analyze_products] Using ThreadPoolExecutor for uncached batches: " - "max_workers=%s, total_batches=%s, bs=%s, target_lang=%s", + "max_workers=%s, total_batches=%s, bs=%s, kind=%s, target_lang=%s", max_workers, total_batches, bs, + analysis_kind, target_lang, ) @@ -1013,7 +1245,11 @@ def analyze_products( future_by_batch_num: Dict[int, Any] = {} for batch_num, _batch_slice, batch in batch_jobs: future_by_batch_num[batch_num] = executor.submit( - process_batch, batch, batch_num=batch_num, target_lang=target_lang + process_batch, + batch, + batch_num=batch_num, + target_lang=target_lang, + analysis_kind=analysis_kind, ) # 按 batch_num 回填,确保输出稳定(results_by_index 是按原始 input index 映射的) @@ -1028,7 +1264,7 @@ def analyze_products( # 不缓存错误结果,避免放大临时故障 continue try: - _set_cached_anchor_result(product, target_lang, item) + _set_cached_analysis_result(product, target_lang, item, analysis_kind) except Exception: # 已在内部记录 warning pass diff --git a/indexer/product_enrich_prompts.py b/indexer/product_enrich_prompts.py index b6314ec..704ad09 100644 --- a/indexer/product_enrich_prompts.py +++ b/indexer/product_enrich_prompts.py @@ -33,6 +33,110 @@ Input product list: USER_INSTRUCTION_TEMPLATE = """Please strictly return a Markdown table following the given columns in the specified language. For any column containing multiple values, separate them with commas. Do not add any other explanation. Language: {language}""" +TAXONOMY_SHARED_ANALYSIS_INSTRUCTION = """Analyze each input product text and fill the columns below using an apparel attribute taxonomy. + +Output columns: +1. Product Type: concise ecommerce apparel category label, not a full marketing title +2. Target Gender: intended gender only if clearly implied +3. Age Group: only if clearly implied, e.g. adults, kids, teens, toddlers, babies +4. Season: season(s) or all-season suitability only if supported +5. Fit: body closeness, e.g. slim, regular, relaxed, oversized, fitted +6. Silhouette: overall garment shape, e.g. straight, A-line, boxy, tapered, bodycon, wide-leg +7. Neckline: neckline type when applicable, e.g. crew neck, V-neck, hooded, collared, square neck +8. Sleeve Length Type: sleeve length only, e.g. sleeveless, short sleeve, long sleeve, three-quarter sleeve +9. Sleeve Style: sleeve design only, e.g. puff sleeve, raglan sleeve, batwing sleeve, bell sleeve +10. Strap Type: strap design when applicable, e.g. spaghetti strap, wide strap, halter strap, adjustable strap +11. Rise / Waistline: waist placement when applicable, e.g. high rise, mid rise, low rise, empire waist +12. Leg Shape: for bottoms only, e.g. straight leg, wide leg, flare leg, tapered leg, skinny leg +13. Skirt Shape: for skirts only, e.g. A-line, pleated, pencil, mermaid +14. Length Type: design length only, not size, e.g. cropped, regular, longline, mini, midi, maxi, ankle length, full length +15. Closure Type: fastening method when applicable, e.g. zipper, button, drawstring, elastic waist, hook-and-loop +16. Design Details: construction or visual details, e.g. ruched, ruffled, pleated, cut-out, layered, distressed, split hem +17. Fabric: fabric type only, e.g. denim, knit, chiffon, jersey, fleece, cotton twill +18. Material Composition: fiber content or blend only if stated, e.g. cotton, polyester, spandex, linen blend, 95% cotton 5% elastane +19. Fabric Properties: inherent fabric traits, e.g. stretch, breathable, lightweight, soft-touch, water-resistant +20. Clothing Features: product features, e.g. lined, reversible, hooded, packable, padded, pocketed +21. Functional Benefits: wearer benefits, e.g. moisture-wicking, thermal insulation, UV protection, easy care, supportive compression +22. Color: specific color name when available +23. Color Family: normalized broad retail color group, e.g. black, white, blue, green, red, pink, beige, brown, gray +24. Print / Pattern: surface pattern when applicable, e.g. solid, striped, plaid, floral, graphic, animal print +25. Occasion / End Use: likely use occasion only if supported, e.g. office, casual wear, streetwear, lounge, workout, outdoor +26. Style Aesthetic: overall style only if supported, e.g. minimalist, streetwear, athleisure, smart casual, romantic, playful + +Rules: +- Keep the same row order and row count as input. +- Infer only from the provided product text. +- Leave blank if not applicable or not reasonably supported. +- Use concise, standardized ecommerce wording. +- Do not combine different attribute dimensions in one field. +- If multiple values are needed, use the delimiter required by the localization setting. + +Input product list: +""" + +TAXONOMY_MARKDOWN_TABLE_HEADERS_EN = [ + "No.", + "Product Type", + "Target Gender", + "Age Group", + "Season", + "Fit", + "Silhouette", + "Neckline", + "Sleeve Length Type", + "Sleeve Style", + "Strap Type", + "Rise / Waistline", + "Leg Shape", + "Skirt Shape", + "Length Type", + "Closure Type", + "Design Details", + "Fabric", + "Material Composition", + "Fabric Properties", + "Clothing Features", + "Functional Benefits", + "Color", + "Color Family", + "Print / Pattern", + "Occasion / End Use", + "Style Aesthetic", +] + +TAXONOMY_LANGUAGE_MARKDOWN_TABLE_HEADERS: Dict[str, Dict[str, Any]] = { + "en": TAXONOMY_MARKDOWN_TABLE_HEADERS_EN, + "zh": [ + "序号", + "品类", + "目标性别", + "年龄段", + "适用季节", + "版型", + "廓形", + "领型", + "袖长类型", + "袖型", + "肩带设计", + "腰型", + "裤型", + "裙型", + "长度类型", + "闭合方式", + "设计细节", + "面料", + "成分", + "面料特性", + "服装特征", + "功能", + "主颜色", + "色系", + "印花 / 图案", + "适用场景", + "风格", + ], +} + LANGUAGE_MARKDOWN_TABLE_HEADERS: Dict[str, Dict[str, Any]] = { "en": [ "No.", diff --git a/tests/test_llm_enrichment_batch_fill.py b/tests/test_llm_enrichment_batch_fill.py index 8cb006e..eee5b20 100644 --- a/tests/test_llm_enrichment_batch_fill.py +++ b/tests/test_llm_enrichment_batch_fill.py @@ -19,10 +19,13 @@ def test_fill_llm_attributes_batch_uses_product_enrich_helper(monkeypatch): "zh": [f"zh-anchor-{item['id']}"], "en": [f"en-anchor-{item['id']}"], }, - "tags": {"zh": ["t1", "t2"], "en": ["t1", "t2"]}, + "enriched_tags": {"zh": ["t1", "t2"], "en": ["t1", "t2"]}, "enriched_attributes": [ {"name": "tags", "value": {"zh": ["t1"], "en": ["t1"]}}, ], + "enriched_taxonomy_attributes": [ + {"name": "Product Type", "value": {"zh": ["连衣裙"], "en": ["dress"]}}, + ], } for item in items ] @@ -54,6 +57,10 @@ def test_fill_llm_attributes_batch_uses_product_enrich_helper(monkeypatch): assert docs[0]["qanchors"]["zh"] == ["zh-anchor-0"] assert docs[0]["qanchors"]["en"] == ["en-anchor-0"] - assert docs[0]["tags"]["zh"] == ["t1", "t2"] - assert docs[0]["tags"]["en"] == ["t1", "t2"] + assert docs[0]["enriched_tags"]["zh"] == ["t1", "t2"] + assert docs[0]["enriched_tags"]["en"] == ["t1", "t2"] assert {"name": "tags", "value": {"zh": ["t1"], "en": ["t1"]}} in docs[0]["enriched_attributes"] + assert { + "name": "Product Type", + "value": {"zh": ["连衣裙"], "en": ["dress"]}, + } in docs[0]["enriched_taxonomy_attributes"] diff --git a/tests/test_process_products_batching.py b/tests/test_process_products_batching.py index 319ce88..d39f1d3 100644 --- a/tests/test_process_products_batching.py +++ b/tests/test_process_products_batching.py @@ -13,7 +13,13 @@ def test_analyze_products_caps_batch_size_to_20(monkeypatch): monkeypatch.setattr(process_products, "API_KEY", "fake-key") seen_batch_sizes: List[int] = [] - def _fake_process_batch(batch_data: List[Dict[str, str]], batch_num: int, target_lang: str = "zh"): + def _fake_process_batch( + batch_data: List[Dict[str, str]], + batch_num: int, + target_lang: str = "zh", + analysis_kind: str = "content", + ): + assert analysis_kind == "content" seen_batch_sizes.append(len(batch_data)) return [ { @@ -35,7 +41,7 @@ def test_analyze_products_caps_batch_size_to_20(monkeypatch): ] monkeypatch.setattr(process_products, "process_batch", _fake_process_batch) - monkeypatch.setattr(process_products, "_set_cached_anchor_result", lambda *args, **kwargs: None) + monkeypatch.setattr(process_products, "_set_cached_analysis_result", lambda *args, **kwargs: None) out = process_products.analyze_products( products=_mk_products(45), @@ -53,7 +59,13 @@ def test_analyze_products_uses_min_batch_size_1(monkeypatch): monkeypatch.setattr(process_products, "API_KEY", "fake-key") seen_batch_sizes: List[int] = [] - def _fake_process_batch(batch_data: List[Dict[str, str]], batch_num: int, target_lang: str = "zh"): + def _fake_process_batch( + batch_data: List[Dict[str, str]], + batch_num: int, + target_lang: str = "zh", + analysis_kind: str = "content", + ): + assert analysis_kind == "content" seen_batch_sizes.append(len(batch_data)) return [ { @@ -75,7 +87,7 @@ def test_analyze_products_uses_min_batch_size_1(monkeypatch): ] monkeypatch.setattr(process_products, "process_batch", _fake_process_batch) - monkeypatch.setattr(process_products, "_set_cached_anchor_result", lambda *args, **kwargs: None) + monkeypatch.setattr(process_products, "_set_cached_analysis_result", lambda *args, **kwargs: None) out = process_products.analyze_products( products=_mk_products(3), diff --git a/tests/test_product_enrich_partial_mode.py b/tests/test_product_enrich_partial_mode.py index 756cfbb..2382ff8 100644 --- a/tests/test_product_enrich_partial_mode.py +++ b/tests/test_product_enrich_partial_mode.py @@ -74,6 +74,28 @@ def test_create_prompt_splits_shared_context_and_localized_tail(): assert prefix_en.startswith("| No. | Product title | Category path |") +def test_create_prompt_supports_taxonomy_analysis_kind(): + products = [{"id": "1", "title": "linen dress"}] + + shared_zh, user_zh, prefix_zh = product_enrich.create_prompt( + products, + target_lang="zh", + analysis_kind="taxonomy", + ) + shared_fr, user_fr, prefix_fr = product_enrich.create_prompt( + products, + target_lang="fr", + analysis_kind="taxonomy", + ) + + assert "apparel attribute taxonomy" in shared_zh + assert "1. linen dress" in shared_zh + assert "Language: Chinese" in user_zh + assert "Language: French" in user_fr + assert prefix_zh.startswith("| 序号 | 品类 | 目标性别 |") + assert prefix_fr.startswith("| No. | Product Type | Target Gender |") + + def test_call_llm_logs_shared_context_once_and_verbose_contains_full_requests(): payloads = [] response_bodies = [ @@ -228,6 +250,38 @@ def test_process_batch_reads_result_and_validates_expected_fields(): assert row["anchor_text"] == "法式收腰连衣裙" +def test_process_batch_reads_taxonomy_result_with_schema_specific_fields(): + merged_markdown = """| 序号 | 品类 | 目标性别 | 年龄段 | 适用季节 | 版型 | 廓形 | 领型 | 袖长类型 | 袖型 | 肩带设计 | 腰型 | 裤型 | 裙型 | 长度类型 | 闭合方式 | 设计细节 | 面料 | 成分 | 面料特性 | 服装特征 | 功能 | 主颜色 | 色系 | 印花 / 图案 | 适用场景 | 风格 | +|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----| +| 1 | 连衣裙 | 女 | 成人 | 春季,夏季 | 修身 | A字 | V领 | 无袖 | | 细肩带 | 高腰 | | A字裙 | 中长款 | 拉链 | 褶皱 | 梭织 | 聚酯纤维,氨纶 | 轻薄,透气 | 有内衬 | 易打理 | 酒红色 | 红色 | 纯色 | 约会,度假 | 浪漫 | +""" + + with mock.patch.object( + product_enrich, + "call_llm", + return_value=(merged_markdown, json.dumps({"choices": [{"message": {"content": "stub"}}]})), + ): + results = product_enrich.process_batch( + [{"id": "sku-1", "title": "dress"}], + batch_num=1, + target_lang="zh", + analysis_kind="taxonomy", + ) + + assert len(results) == 1 + row = results[0] + assert row["id"] == "sku-1" + assert row["lang"] == "zh" + assert row["title_input"] == "dress" + assert row["product_type"] == "连衣裙" + assert row["target_gender"] == "女" + assert row["age_group"] == "成人" + assert row["sleeve_length_type"] == "无袖" + assert row["material_composition"] == "聚酯纤维,氨纶" + assert row["occasion_end_use"] == "约会,度假" + assert row["style_aesthetic"] == "浪漫" + + def test_analyze_products_uses_product_level_cache_across_batch_requests(): cache_store = {} process_calls = [] @@ -241,13 +295,16 @@ def test_analyze_products_uses_product_level_cache_across_batch_requests(): product.get("image_url", ""), ) - def fake_get_cached_anchor_result(product, target_lang): + def fake_get_cached_analysis_result(product, target_lang, analysis_kind="content"): + assert analysis_kind == "content" return cache_store.get(_cache_key(product, target_lang)) - def fake_set_cached_anchor_result(product, target_lang, result): + def fake_set_cached_analysis_result(product, target_lang, result, analysis_kind="content"): + assert analysis_kind == "content" cache_store[_cache_key(product, target_lang)] = result - def fake_process_batch(batch_data, batch_num, target_lang="zh"): + def fake_process_batch(batch_data, batch_num, target_lang="zh", analysis_kind="content"): + assert analysis_kind == "content" process_calls.append( { "batch_num": batch_num, @@ -281,12 +338,12 @@ def test_analyze_products_uses_product_level_cache_across_batch_requests(): with mock.patch.object(product_enrich, "API_KEY", "fake-key"), mock.patch.object( product_enrich, - "_get_cached_anchor_result", - side_effect=fake_get_cached_anchor_result, + "_get_cached_analysis_result", + side_effect=fake_get_cached_analysis_result, ), mock.patch.object( product_enrich, - "_set_cached_anchor_result", - side_effect=fake_set_cached_anchor_result, + "_set_cached_analysis_result", + side_effect=fake_set_cached_analysis_result, ), mock.patch.object( product_enrich, "process_batch", @@ -342,11 +399,12 @@ def test_analyze_products_reuses_cached_content_with_current_product_identity(): with mock.patch.object(product_enrich, "API_KEY", "fake-key"), mock.patch.object( product_enrich, - "_get_cached_anchor_result", - wraps=lambda product, target_lang: product_enrich._normalize_analysis_result( + "_get_cached_analysis_result", + wraps=lambda product, target_lang, analysis_kind="content": product_enrich._normalize_analysis_result( cached_result, product=product, target_lang=target_lang, + schema=product_enrich._get_analysis_schema("content"), ), ), mock.patch.object( product_enrich, @@ -379,7 +437,47 @@ def test_analyze_products_reuses_cached_content_with_current_product_identity(): def test_build_index_content_fields_maps_internal_tags_to_enriched_tags_output(): - def fake_analyze_products(products, target_lang="zh", batch_size=None, tenant_id=None): + def fake_analyze_products( + products, + target_lang="zh", + batch_size=None, + tenant_id=None, + analysis_kind="content", + ): + if analysis_kind == "taxonomy": + return [ + { + "id": products[0]["id"], + "lang": target_lang, + "title_input": products[0]["title"], + "product_type": f"{target_lang}-dress", + "target_gender": f"{target_lang}-women", + "age_group": "", + "season": f"{target_lang}-summer", + "fit": "", + "silhouette": "", + "neckline": "", + "sleeve_length_type": "", + "sleeve_style": "", + "strap_type": "", + "rise_waistline": "", + "leg_shape": "", + "skirt_shape": "", + "length_type": "", + "closure_type": "", + "design_details": "", + "fabric": "", + "material_composition": "", + "fabric_properties": "", + "clothing_features": "", + "functional_benefits": "", + "color": "", + "color_family": "", + "print_pattern": "", + "occasion_end_use": "", + "style_aesthetic": "", + } + ] return [ { "id": products[0]["id"], @@ -423,6 +521,20 @@ def test_build_index_content_fields_maps_internal_tags_to_enriched_tags_output() }, {"name": "target_audience", "value": {"zh": ["zh-audience"], "en": ["en-audience"]}}, ], + "enriched_taxonomy_attributes": [ + { + "name": "Product Type", + "value": {"zh": ["zh-dress"], "en": ["en-dress"]}, + }, + { + "name": "Target Gender", + "value": {"zh": ["zh-women"], "en": ["en-women"]}, + }, + { + "name": "Season", + "value": {"zh": ["zh-summer"], "en": ["en-summer"]}, + }, + ], } ] -- libgit2 0.21.2