"""
SPU document transformer: shared conversion logic.

Holds the document-conversion logic used by both full and incremental indexing
so it is not duplicated. The output document structure is meant to match
mappings/search_products.json and the index field spec v2 (索引字段说明v2), and is
consumed by search/searcher and search/es_query_builder.

- Multilingual fields: title, brief, description, vendor, category_path,
  category_name_text
- Nested: specifications, skus; vector: title_embedding (image_embedding is
  filled by the Java indexer or by an external process)
"""

import pandas as pd
import numpy as np
import logging
from typing import Dict, Any, Optional, List
from config import ConfigLoader

logger = logging.getLogger(__name__)

# Try to import translator (optional dependency)
try:
    from query.translator import Translator
    TRANSLATOR_AVAILABLE = True
except ImportError:
    TRANSLATOR_AVAILABLE = False
    Translator = None


class SPUDocumentTransformer:
    """Convert SPU, SKU and Option rows into Elasticsearch documents."""

    # Option slots supported by the index (option1_* .. option3_*).
    _OPTION_POSITIONS = (1, 2, 3)

    def __init__(
        self,
        category_id_to_name: Dict[str, str],
        searchable_option_dimensions: List[str],
        tenant_config: Optional[Dict[str, Any]] = None,
        translator: Optional[Any] = None,
        translation_prompts: Optional[Dict[str, str]] = None,
        encoder: Optional[Any] = None,
        enable_title_embedding: bool = True
    ):
        """
        Initialize the transformer.

        Args:
            category_id_to_name: Mapping from category ID (as string) to category name.
            searchable_option_dimensions: Option dimensions ("option1".."option3")
                whose values are written to the ``optionN_values`` fields; other
                dimensions get an empty list.
            tenant_config: Tenant configuration; ``primary_language`` and
                ``index_languages`` are read from it.
            translator: Optional translator; when given, text fields are translated
                via its ``translate_for_indexing`` method.
            translation_prompts: Optional prompt configuration, looked up with keys
                such as ``product_title_zh``, ``product_title_en``, ``default_zh``,
                ``default_en``.
            encoder: Optional text encoder used to produce ``title_embedding``.
            enable_title_embedding: Whether to compute ``title_embedding`` (default
                True; an ``encoder`` is also required).
        """
        self.category_id_to_name = category_id_to_name
        self.searchable_option_dimensions = searchable_option_dimensions
        self.tenant_config = tenant_config or {}
        self.translator = translator
        self.translation_prompts = translation_prompts or {}
        self.encoder = encoder
        self.enable_title_embedding = enable_title_embedding

    # ------------------------------------------------------------------ #
    # Small conversion helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _safe_cast(value: Any, cast) -> Any:
        """Return ``cast(value)``, or None when value is missing (None/NaN) or not convertible."""
        if value is None or pd.isna(value):
            return None
        try:
            return cast(value)
        except (ValueError, TypeError, OverflowError):
            return None

    # ------------------------------------------------------------------ #
    # Entry point
    # ------------------------------------------------------------------ #

    def transform_spu_to_doc(
        self,
        tenant_id: str,
        spu_row: pd.Series,
        skus: pd.DataFrame,
        options: pd.DataFrame
    ) -> Optional[Dict[str, Any]]:
        """
        Convert one SPU row and its SKUs into an ES document.

        Args:
            tenant_id: Tenant ID.
            spu_row: SPU row.
            skus: SKU rows of this SPU.
            options: Option rows of this SPU.

        Returns:
            The ES document dict. (Declared Optional for compatibility; the current
            implementation always returns a dict.)
        """
        doc: Dict[str, Any] = {}

        # Tenant ID (required)
        doc['tenant_id'] = str(tenant_id)

        # SPU ID
        spu_id = spu_row['id']
        doc['spu_id'] = str(spu_id)

        # A missing title is logged but does not stop indexing.
        if pd.isna(spu_row.get('title')) or not str(spu_row['title']).strip():
            logger.error(f"SPU {spu_id} has no title, this may cause search issues")

        primary_lang = self.tenant_config.get('primary_language', 'en')

        # Multilingual text fields (translated when a translator is configured)
        self._fill_text_fields(doc, spu_row, primary_lang)

        # Must run after _fill_text_fields: the embedding reads doc['title'].
        if self.enable_title_embedding and self.encoder:
            self._fill_title_embedding(doc)

        # Tags: comma-separated string -> list of stripped, non-empty tags
        if pd.notna(spu_row.get('tags')):
            tags_str = str(spu_row['tags'])
            doc['tags'] = [tag.strip() for tag in tags_str.split(',') if tag.strip()]

        self._fill_category_fields(doc, spu_row)
        self._fill_option_names(doc, options)
        self._fill_image_url(doc, spu_row)

        # Sales come from fake_sales; missing or unparsable -> 0
        sales = self._safe_cast(spu_row.get('fake_sales'), int)
        doc['sales'] = sales if sales is not None else 0

        # SKUs, specifications and SKU-derived aggregates
        (skus_list, prices, compare_prices, sku_prices, sku_weights,
         sku_weight_units, total_inventory, specifications) = self._process_skus(skus, options)
        doc['skus'] = skus_list
        doc['specifications'] = specifications

        # option values, gated by searchable_option_dimensions
        self._fill_option_values(doc, skus)

        # Price range over the SKUs that have a parsable price
        doc['min_price'] = float(min(prices)) if prices else 0.0
        doc['max_price'] = float(max(prices)) if prices else 0.0

        # Prefer the SPU-level compare_at_price (per field spec v2); otherwise the SKU maximum
        spu_compare_price = self._safe_cast(spu_row.get('compare_at_price'), float)
        if spu_compare_price is not None:
            doc['compare_at_price'] = spu_compare_price
        else:
            doc['compare_at_price'] = float(max(compare_prices)) if compare_prices else None

        # Flattened SKU fields
        doc['sku_prices'] = sku_prices
        doc['sku_weights'] = sku_weights
        # De-duplicate in first-seen order (deterministic, unlike list(set(...)))
        doc['sku_weight_units'] = list(dict.fromkeys(sku_weight_units))
        doc['total_inventory'] = total_inventory

        # Time fields: datetime-like values -> ISO 8601 strings for the ES date type
        for time_field in ('create_time', 'update_time'):
            value = spu_row.get(time_field)
            if pd.notna(value):
                doc[time_field] = value.isoformat() if hasattr(value, 'isoformat') else str(value)

        return doc

    # ------------------------------------------------------------------ #
    # Text fields
    # ------------------------------------------------------------------ #

    def _fill_text_fields(
        self,
        doc: Dict[str, Any],
        spu_row: pd.Series,
        primary_lang: str
    ):
        """
        Fill the multilingual text fields: title, brief, description, vendor.

        Each non-blank field becomes ``{lang: text}``. The object always contains
        ``primary_lang`` with the original text. It also contains every other
        language from the tenant's ``index_languages`` (default ``["en", "zh"]``)
        for which the translator returned a non-blank translation.
        """
        index_langs = self.tenant_config.get("index_languages") or ["en", "zh"]

        # The prompt family follows the primary language, as the title path always did.
        family, other = ('zh', 'en') if primary_lang == 'zh' else ('en', 'zh')
        prompts = self.translation_prompts
        title_prompt = prompts.get(f'product_title_{family}') or prompts.get(f'default_{family}')
        # Previously brief/description/vendor always preferred default_zh, even for non-zh
        # shops; now they prefer the primary language's family and fall back to the other.
        default_prompt = prompts.get(f'default_{family}') or prompts.get(f'default_{other}')

        for field_name, prompt in (
            ('title', title_prompt),
            ('brief', default_prompt),
            ('description', default_prompt),
            ('vendor', default_prompt),
        ):
            raw = spu_row.get(field_name)
            if pd.isna(raw):
                continue
            text = str(raw)
            if not text.strip():
                # Blank text is never written, so skip the (potentially costly) translator call.
                continue
            translations = self._translate_for_index(text, primary_lang, prompt, index_langs)
            doc[field_name] = self._build_lang_obj(text, primary_lang, translations, index_langs)

    def _translate_for_index(
        self,
        text: str,
        primary_lang: str,
        prompt: Optional[str],
        index_langs: List[str],
    ) -> Dict[str, Optional[str]]:
        """Return ``{lang: translation}`` for text, or {} when no translator is configured."""
        if not self.translator:
            return {}
        return self.translator.translate_for_indexing(
            text,
            shop_language=primary_lang,
            source_lang=primary_lang,
            prompt=prompt,
            index_languages=index_langs,
        ) or {}

    @staticmethod
    def _build_lang_obj(
        source_text: str,
        primary_lang: str,
        translations: Dict[str, Optional[str]],
        index_langs: List[str],
    ) -> Dict[str, str]:
        """Build ``{primary_lang: source_text, <lang>: translation, ...}`` for the index languages."""
        obj: Dict[str, str] = {primary_lang: source_text}
        for lang in index_langs:
            if lang == primary_lang:
                continue
            val = translations.get(lang)
            if val and str(val).strip():
                obj[lang] = str(val)
        return obj

    # ------------------------------------------------------------------ #
    # Category fields
    # ------------------------------------------------------------------ #

    def _fill_category_fields(self, doc: Dict[str, Any], spu_row: pd.Series):
        """
        Fill category-related fields.

        Data-quality fallback: if any ID in ``category_path`` is missing from the
        category mapping, the category is treated as invalid. NO category field is
        written in that case, including category_id and category_level, and the SPU
        is indexed as if it had no category. Only an error is logged; indexing
        continues.
        """
        primary_lang = self.tenant_config.get('primary_language', 'en')

        if pd.notna(spu_row.get('category_path')):
            category_path = str(spu_row['category_path'])
            # category_path is a comma-separated list of category IDs
            category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()]

            missing_ids = [cid for cid in category_ids if cid not in self.category_id_to_name]
            if missing_ids:
                logger.error(
                    f"Category ID(s) not found in mapping for SPU {spu_row.get('id')} "
                    f"(title: {spu_row.get('title', 'N/A')}), missing_ids={missing_ids}, "
                    f"category_path={category_path}. Treating as no-category."
                )
                return

            category_names = [self.category_id_to_name[cid] for cid in category_ids]
            if category_names:
                category_path_str = '/'.join(category_names)
                doc['category_path'] = {primary_lang: category_path_str}
                # Mirrors category_name_text.<lang> used by the query side for category search
                doc['category_name_text'] = {primary_lang: category_path_str}
                self._set_category_levels(doc, category_names)
        elif pd.notna(spu_row.get('category')):
            # No category_path: fall back to the free-text category field
            category = str(spu_row['category'])
            doc['category_name_text'] = {primary_lang: category}
            doc['category_name'] = category
            if '/' in category:
                # "a/b/c" -> category1/2/3_name
                self._set_category_levels(doc, [part.strip() for part in category.split('/')])
            else:
                doc['category1_name'] = category.strip()

        if pd.notna(spu_row.get('category')):
            # Make sure category_name(_text) exist even when category_path was used
            category_name = str(spu_row['category'])
            doc.setdefault('category_name_text', {primary_lang: category_name})
            doc.setdefault('category_name', category_name)

        # A malformed ID/level must not abort the whole document: skip the field instead.
        raw_category_id = spu_row.get('category_id')
        if pd.notna(raw_category_id):
            category_id = self._safe_cast(raw_category_id, int)
            if category_id is None:
                logger.warning(
                    f"Invalid category_id {raw_category_id!r} for SPU {spu_row.get('id')}, skipping field"
                )
            else:
                doc['category_id'] = str(category_id)

        raw_category_level = spu_row.get('category_level')
        if pd.notna(raw_category_level):
            category_level = self._safe_cast(raw_category_level, int)
            if category_level is None:
                logger.warning(
                    f"Invalid category_level {raw_category_level!r} for SPU {spu_row.get('id')}, skipping field"
                )
            else:
                doc['category_level'] = category_level

    @staticmethod
    def _set_category_levels(doc: Dict[str, Any], names: List[str]) -> None:
        """Write the first three names as category1_name..category3_name."""
        for level, name in enumerate(names[:3], start=1):
            doc[f'category{level}_name'] = name

    # ------------------------------------------------------------------ #
    # Options / image
    # ------------------------------------------------------------------ #

    def _fill_option_names(self, doc: Dict[str, Any], options: pd.DataFrame):
        """Write option1_name..option3_name from the option rows ordered by ``position``."""
        if options.empty:
            return
        first_rows = options.sort_values('position').head(len(self._OPTION_POSITIONS))
        for position, (_, opt_row) in zip(self._OPTION_POSITIONS, first_rows.iterrows()):
            name = opt_row.get('name')
            if pd.notna(name):
                doc[f'option{position}_name'] = str(name)

    def _fill_image_url(self, doc: Dict[str, Any], spu_row: pd.Series):
        """
        Fill ``image_url`` from ``image_src``.

        Values that start with neither ``http`` nor ``//`` are made
        protocol-relative by prefixing ``//``. Existing ``//host`` values are left
        alone, so they never become ``////host``. Blank values are skipped instead
        of producing a bare "//".
        """
        if pd.isna(spu_row.get('image_src')):
            return
        image_src = str(spu_row['image_src']).strip()
        if not image_src:
            return
        if not image_src.startswith(('http', '//')):
            image_src = f"//{image_src}"
        doc['image_url'] = image_src

    # ------------------------------------------------------------------ #
    # SKUs
    # ------------------------------------------------------------------ #

    @staticmethod
    def _build_option_name_map(options: pd.DataFrame) -> Dict[int, str]:
        """Map option position (int) -> option name, skipping rows with missing or non-numeric data."""
        option_name_map: Dict[int, str] = {}
        if options.empty:
            return option_name_map
        for _, opt_row in options.iterrows():
            position = opt_row.get('position')
            name = opt_row.get('name')
            if pd.isna(position) or pd.isna(name):
                continue
            try:
                option_name_map[int(position)] = str(name)
            except (ValueError, TypeError, OverflowError):
                logger.warning(f"Skipping option with invalid position {position!r} (name: {name})")
        return option_name_map

    def _process_skus(
        self,
        skus: pd.DataFrame,
        options: pd.DataFrame
    ) -> tuple:
        """
        Convert SKU rows and collect SPU-level aggregates.

        Returns:
            Tuple ``(skus_list, prices, compare_prices, sku_prices, sku_weights,
            sku_weight_units, total_inventory, specifications)``:

            - skus_list: nested SKU objects (see ``_transform_sku_row``)
            - prices / sku_prices: parsable SKU prices (same values in both lists)
            - compare_prices: parsable SKU compare_at_price values
            - sku_weights: SKU weights truncated to int
            - sku_weight_units: non-empty weight units (not de-duplicated here)
            - total_inventory: sum of SKU stock
            - specifications: ``{sku_id, name, value}`` for every SKU option value
              whose position has a name in the option table
        """
        skus_list: List[Dict[str, Any]] = []
        prices: List[float] = []
        compare_prices: List[float] = []
        sku_prices: List[float] = []
        sku_weights: List[int] = []
        sku_weight_units: List[str] = []
        total_inventory = 0
        specifications: List[Dict[str, str]] = []

        option_name_map = self._build_option_name_map(options)

        for _, sku_row in skus.iterrows():
            sku_data = self._transform_sku_row(sku_row, option_name_map)
            if not sku_data:
                continue
            skus_list.append(sku_data)

            price = self._safe_cast(sku_data.get('price'), float)
            if price is not None:
                prices.append(price)
                sku_prices.append(price)

            compare_price = self._safe_cast(sku_data.get('compare_at_price'), float)
            if compare_price is not None:
                compare_prices.append(compare_price)

            # Weights are indexed as whole numbers (truncated toward zero).
            weight = self._safe_cast(sku_data.get('weight'), lambda v: int(float(v)))
            if weight is not None:
                sku_weights.append(weight)

            if sku_data.get('weight_unit'):
                sku_weight_units.append(str(sku_data['weight_unit']))

            stock = self._safe_cast(sku_data.get('stock'), int)
            if stock is not None:
                total_inventory += stock

            # Pair each SKU option value with the option name at the same position.
            sku_id = sku_data['sku_id']
            for position in self._OPTION_POSITIONS:
                value = sku_row.get(f'option{position}')
                if pd.notna(value) and position in option_name_map:
                    specifications.append({
                        'sku_id': sku_id,
                        'name': option_name_map[position],
                        'value': str(value),
                    })

        return (skus_list, prices, compare_prices, sku_prices, sku_weights,
                sku_weight_units, total_inventory, specifications)

    def _fill_option_values(self, doc: Dict[str, Any], skus: pd.DataFrame):
        """
        Write option1_values..option3_values.

        Values are stringified and de-duplicated in first-seen order. A dimension
        not listed in ``searchable_option_dimensions`` is written as an empty list.
        """
        collected: Dict[str, List[str]] = {f'option{p}': [] for p in self._OPTION_POSITIONS}
        for _, sku_row in skus.iterrows():
            for dimension, bucket in collected.items():
                value = sku_row.get(dimension)
                if pd.notna(value):
                    bucket.append(str(value))

        for dimension, values in collected.items():
            if dimension in self.searchable_option_dimensions:
                doc[f'{dimension}_values'] = list(dict.fromkeys(values))
            else:
                doc[f'{dimension}_values'] = []

    def _transform_sku_row(
        self,
        sku_row: pd.Series,
        option_name_map: Optional[Dict[int, str]] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Convert one SKU row into a nested ``skus`` object.

        Args:
            sku_row: SKU row.
            option_name_map: position -> option name. Accepted for interface
                compatibility; not used by the current implementation.

        Returns:
            SKU dict. ``price``/``compare_at_price``/``weight`` are None when missing
            or unparsable. ``stock`` defaults to 0. ``sku_code``, ``weight_unit``,
            ``optionN_value`` and ``image_src`` are present only when set.
        """
        sku_data: Dict[str, Any] = {'sku_id': str(sku_row['id'])}

        sku_data['price'] = self._safe_cast(sku_row.get('price'), float)
        sku_data['compare_at_price'] = self._safe_cast(sku_row.get('compare_at_price'), float)

        if pd.notna(sku_row.get('sku')):
            sku_data['sku_code'] = str(sku_row['sku'])

        stock = self._safe_cast(sku_row.get('inventory_quantity'), int)
        sku_data['stock'] = stock if stock is not None else 0

        sku_data['weight'] = self._safe_cast(sku_row.get('weight'), float)
        if pd.notna(sku_row.get('weight_unit')):
            sku_data['weight_unit'] = str(sku_row['weight_unit'])

        for position in self._OPTION_POSITIONS:
            value = sku_row.get(f'option{position}')
            if pd.notna(value):
                sku_data[f'option{position}_value'] = str(value)

        if pd.notna(sku_row.get('image_src')):
            sku_data['image_src'] = str(sku_row['image_src'])

        return sku_data

    # ------------------------------------------------------------------ #
    # Embedding
    # ------------------------------------------------------------------ #

    def _fill_title_embedding(self, doc: Dict[str, Any]) -> None:
        """
        Fill ``title_embedding`` from the document's multilingual title.

        Text preference: ``title.en``, then ``title.zh``, then the first non-blank
        value in any other language. If there is no usable title, the encoder
        returns nothing usable, or the encoder raises, the problem is logged and
        the field is left unset.

        Args:
            doc: ES document dict (``title`` is expected to be filled already).
        """
        title_obj = doc.get("title") or {}
        title_text = None
        if isinstance(title_obj, dict):
            title_text = title_obj.get("en") or title_obj.get("zh")
            if not title_text:
                title_text = next(
                    (str(v) for v in title_obj.values() if v and str(v).strip()),
                    None,
                )

        if not title_text or not title_text.strip():
            logger.debug(f"No title text available for embedding, SPU: {doc.get('spu_id')}")
            return

        try:
            # NOTE(review): assumes encode(text) returns a sequence of embeddings, one per
            # input text, whose items are numpy arrays — confirm against the encoder.
            embeddings = self.encoder.encode(title_text)
            if embeddings is not None and len(embeddings) > 0:
                embedding = embeddings[0]
                if not isinstance(embedding, np.ndarray):
                    logger.warning(f"Embedding is None/invalid for title: {title_text[:50]}...")
                    return
                # ES needs a plain list, not an ndarray
                doc['title_embedding'] = embedding.tolist()
                logger.debug(f"Generated title_embedding for SPU: {doc.get('spu_id')}, title: {title_text[:50]}...")
            else:
                logger.warning(f"Failed to generate embedding for title: {title_text[:50]}...")
        except Exception as e:
            # Best-effort: an encoder failure must not block indexing of the document.
            logger.error(f"Error generating title_embedding for SPU {doc.get('spu_id')}: {e}", exc_info=True)