"""
SPU document transformer - shared transformation logic.

Holds the document-building logic used by both full and incremental
indexing so that the two code paths do not duplicate it.
"""
import pandas as pd
import logging
from typing import Dict, Any, Optional, List
from config import ConfigLoader

logger = logging.getLogger(__name__)

# Try to import translator (optional dependency)
try:
    from query.translator import Translator
    TRANSLATOR_AVAILABLE = True
except ImportError:
    TRANSLATOR_AVAILABLE = False
    Translator = None


class SPUDocumentTransformer:
    """Convert SPU, SKU and option rows into Elasticsearch document dicts."""

    # Free-text SPU columns indexed per language as ``<field>_zh`` / ``<field>_en``.
    _TEXT_FIELDS = ('title', 'brief', 'description', 'vendor')

    def __init__(
        self,
        category_id_to_name: Dict[str, str],
        searchable_option_dimensions: List[str],
        tenant_config: Optional[Dict[str, Any]] = None,
        translator: Optional[Any] = None,
        translation_prompts: Optional[Dict[str, str]] = None
    ):
        """
        Initialise the transformer.

        Args:
            category_id_to_name: Mapping from category ID to category name.
            searchable_option_dimensions: Option columns (e.g. ``'option1'``)
                whose values are written to ``optionN_values``.
            tenant_config: Tenant settings; read keys are ``primary_language``,
                ``translate_to_en`` and ``translate_to_zh``.
            translator: Optional translator; when given (and truthy) its
                ``translate`` method is called synchronously for text fields.
            translation_prompts: Optional prompt strings keyed by names such as
                ``product_title_en`` / ``default_en``.
        """
        self.category_id_to_name = category_id_to_name
        self.searchable_option_dimensions = searchable_option_dimensions
        self.tenant_config = tenant_config or {}
        self.translator = translator
        self.translation_prompts = translation_prompts or {}

    # ------------------------------------------------------------------ #
    # Small shared helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _coerce_scalar(value: Any, cast, default: Any) -> Any:
        """Return ``cast(value)``; ``default`` when value is NA or cannot be cast."""
        if pd.isna(value):
            return default
        try:
            return cast(value)
        except (ValueError, TypeError, OverflowError):
            return default

    @staticmethod
    def _dedupe(values: List[Any]) -> List[Any]:
        """Remove duplicates while keeping first-seen order (deterministic output)."""
        return list(dict.fromkeys(values))

    @staticmethod
    def _to_iso_string(value: Any) -> str:
        """Render a datetime-like value as ISO-8601; fall back to ``str()``."""
        return value.isoformat() if hasattr(value, 'isoformat') else str(value)

    # ------------------------------------------------------------------ #
    # Public entry point
    # ------------------------------------------------------------------ #

    def transform_spu_to_doc(
        self,
        tenant_id: str,
        spu_row: pd.Series,
        skus: pd.DataFrame,
        options: pd.DataFrame
    ) -> Optional[Dict[str, Any]]:
        """
        Convert one SPU row plus its SKUs and options into an ES document.

        Args:
            tenant_id: Tenant ID.
            spu_row: SPU row data.
            skus: SKU rows belonging to this SPU.
            options: Option rows belonging to this SPU.

        Returns:
            The ES document dict.
        """
        doc: Dict[str, Any] = {}

        # Tenant ID (required)
        doc['tenant_id'] = str(tenant_id)

        # SPU ID
        spu_id = spu_row['id']
        doc['spu_id'] = str(spu_id)

        # A missing title is logged but does not abort the document.
        if pd.isna(spu_row.get('title')) or not str(spu_row['title']).strip():
            logger.error("SPU %s has no title, this may cause search issues", spu_id)

        # Language / translation settings for this tenant.
        primary_lang = self.tenant_config.get('primary_language', 'zh')
        translate_to_en = self.tenant_config.get('translate_to_en', True)
        translate_to_zh = self.tenant_config.get('translate_to_zh', False)
        self._fill_text_fields(doc, spu_row, primary_lang, translate_to_en, translate_to_zh)

        # Tags: comma-separated string -> list of non-empty, stripped tags.
        if pd.notna(spu_row.get('tags')):
            doc['tags'] = [tag.strip() for tag in str(spu_row['tags']).split(',') if tag.strip()]

        self._fill_category_fields(doc, spu_row)
        self._fill_option_names(doc, options)
        self._fill_image_url(doc, spu_row)

        # Sales come from the ``fake_sales`` column; unparsable values -> 0.
        doc['sales'] = self._coerce_scalar(spu_row.get('fake_sales'), int, 0)

        (skus_list, prices, compare_prices, sku_prices, sku_weights,
         sku_weight_units, total_inventory, specifications) = self._process_skus(skus, options)
        doc['skus'] = skus_list
        doc['specifications'] = specifications

        # Option values, filtered by searchable_option_dimensions.
        self._fill_option_values(doc, skus)

        # Price ranges
        doc['min_price'] = float(min(prices)) if prices else 0.0
        doc['max_price'] = float(max(prices)) if prices else 0.0
        doc['compare_at_price'] = float(max(compare_prices)) if compare_prices else None

        # Flattened SKU fields
        doc['sku_prices'] = sku_prices
        doc['sku_weights'] = sku_weights
        doc['sku_weight_units'] = self._dedupe(sku_weight_units)
        doc['total_inventory'] = total_inventory

        # Time fields - ISO strings for ES DATE type.
        for time_field in ('create_time', 'update_time'):
            value = spu_row.get(time_field)
            if pd.notna(value):
                doc[time_field] = self._to_iso_string(value)

        return doc

    # ------------------------------------------------------------------ #
    # Text fields / translation
    # ------------------------------------------------------------------ #

    def _select_prompt(self, field: str, target_lang: str) -> Optional[str]:
        """
        Pick the translation prompt for ``field`` translated into ``target_lang``.

        Titles prefer ``product_title_<lang>`` and fall back to ``default_<lang>``;
        other fields use ``default_<lang>``. Prompts for the other language are
        never used, so a prompt written for one target language cannot steer a
        translation into the other.
        """
        prompts = self.translation_prompts
        default_prompt = prompts.get(f'default_{target_lang}')
        if field == 'title':
            return prompts.get(f'product_title_{target_lang}') or default_prompt
        return default_prompt

    def _fill_text_fields(
        self,
        doc: Dict[str, Any],
        spu_row: pd.Series,
        primary_lang: str,
        translate_to_en: bool,
        translate_to_zh: bool
    ):
        """
        Fill ``<field>_zh`` / ``<field>_en`` for title, brief, description, vendor.

        The source text goes into the primary-language field. The other field is
        filled by a synchronous translator call when translation is enabled for
        the tenant and a translator is configured; otherwise it is ``None``.
        Any primary language other than ``'zh'`` is stored under ``_en``.
        """
        primary_suffix = '_zh' if primary_lang == 'zh' else '_en'
        secondary_suffix = '_en' if primary_lang == 'zh' else '_zh'
        target_lang = 'en' if primary_lang == 'zh' else 'zh'
        translation_enabled = (
            (primary_lang == 'zh' and translate_to_en)
            or (primary_lang == 'en' and translate_to_zh)
        )

        for field in self._TEXT_FIELDS:
            primary_key = f'{field}{primary_suffix}'
            secondary_key = f'{field}{secondary_suffix}'

            raw = spu_row.get(field)
            if pd.isna(raw):
                doc[primary_key] = None
                doc[secondary_key] = None
                continue

            text = str(raw)
            doc[primary_key] = text

            translated = None
            if translation_enabled and self.translator:
                translated = self.translator.translate(
                    text,
                    target_lang=target_lang,
                    source_lang=primary_lang,
                    prompt=self._select_prompt(field, target_lang)
                )
            # Empty/falsy translator results are normalised to None.
            doc[secondary_key] = translated or None

    # ------------------------------------------------------------------ #
    # Category / option / image fields
    # ------------------------------------------------------------------ #

    @staticmethod
    def _fill_category_levels(doc: Dict[str, Any], names: List[str]):
        """Write up to three hierarchical names as category1/2/3_name."""
        for level, name in enumerate(names[:3], start=1):
            doc[f'category{level}_name'] = name

    def _fill_category_fields(self, doc: Dict[str, Any], spu_row: pd.Series):
        """
        Fill category fields.

        ``category_path`` (comma-separated category IDs) is preferred and is
        mapped to names via ``category_id_to_name``; unknown IDs are logged and
        kept as-is. If it yields no names, the ``category`` column is used
        instead (split on ``/`` for levels).
        """
        category_names: List[str] = []
        if pd.notna(spu_row.get('category_path')):
            category_path = str(spu_row['category_path'])
            category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()]
            for cid in category_ids:
                if cid in self.category_id_to_name:
                    category_names.append(self.category_id_to_name[cid])
                else:
                    logger.error(
                        "Category ID %s not found in mapping for SPU %s (title: %s), category_path=%s",
                        cid, spu_row['id'], spu_row.get('title', 'N/A'), category_path
                    )
                    category_names.append(cid)  # Fall back to the raw ID.

        category = spu_row.get('category')
        has_category = pd.notna(category)

        if category_names:
            doc['category_path_zh'] = '/'.join(category_names)
            doc['category_path_en'] = None  # No English category path yet.
            self._fill_category_levels(doc, category_names)
        elif has_category:
            # Also reached when category_path is present but yields no IDs.
            category_str = str(category)
            doc['category_name_zh'] = category_str
            doc['category_name_en'] = None
            doc['category_name'] = category_str
            if '/' in category_str:
                self._fill_category_levels(doc, [part.strip() for part in category_str.split('/')])
            else:
                doc['category1_name'] = category_str.strip()

        if has_category:
            # Make sure the category_name* fields exist even on the path branch.
            category_str = str(category)
            doc.setdefault('category_name_zh', category_str)
            doc.setdefault('category_name_en', None)
            doc.setdefault('category_name', category_str)

        category_id = spu_row.get('category_id')
        if pd.notna(category_id):
            try:
                # Numeric IDs (possibly upcast to float by pandas) -> "123".
                doc['category_id'] = str(int(category_id))
            except (ValueError, TypeError, OverflowError):
                doc['category_id'] = str(category_id)

        category_level = spu_row.get('category_level')
        if pd.notna(category_level):
            try:
                doc['category_level'] = int(category_level)
            except (ValueError, TypeError, OverflowError):
                logger.warning(
                    "Invalid category_level %r for SPU %s; field skipped",
                    category_level, spu_row.get('id')
                )

    def _fill_option_names(self, doc: Dict[str, Any], options: pd.DataFrame):
        """Set option1/2/3_name from the first three options ordered by position."""
        if options.empty:
            return
        sorted_options = options.sort_values('position')
        for slot, (_, option_row) in enumerate(sorted_options.head(3).iterrows(), start=1):
            name = option_row.get('name')
            if pd.notna(name):
                doc[f'option{slot}_name'] = str(name)

    def _fill_image_url(self, doc: Dict[str, Any], spu_row: pd.Series):
        """
        Copy ``image_src`` into ``image_url``.

        Values are stored verbatim. A previous normalisation step had an
        inverted condition and only ever turned protocol-relative URLs
        ("//host/x") into "////host/x", so it was removed.
        """
        if pd.notna(spu_row.get('image_src')):
            doc['image_url'] = str(spu_row['image_src'])

    # ------------------------------------------------------------------ #
    # SKU processing
    # ------------------------------------------------------------------ #

    def _process_skus(
        self,
        skus: pd.DataFrame,
        options: pd.DataFrame
    ) -> tuple:
        """
        Transform SKU rows and collect per-SPU aggregates.

        Returns:
            ``(skus_list, prices, compare_prices, sku_prices, sku_weights,
            sku_weight_units, total_inventory, specifications)``. Weights are
            truncated to ``int``; ``specifications`` holds one
            ``{'sku_id', 'name', 'value'}`` entry per SKU option value whose
            position has a named option.
        """
        skus_list = []
        prices = []
        compare_prices = []
        sku_prices = []
        sku_weights = []
        sku_weight_units = []
        total_inventory = 0
        specifications = []

        # Option name by position (1-based) from the option table.
        option_name_map: Dict[int, str] = {}
        if not options.empty:
            for _, opt_row in options.iterrows():
                position = opt_row.get('position')
                name = opt_row.get('name')
                if pd.notna(position) and pd.notna(name):
                    option_name_map[int(position)] = str(name)

        for _, sku_row in skus.iterrows():
            sku_data = self._transform_sku_row(sku_row, option_name_map)
            if not sku_data:
                continue
            skus_list.append(sku_data)

            # Prices
            price = sku_data.get('price')
            if price is not None:
                try:
                    price_val = float(price)
                except (ValueError, TypeError):
                    pass
                else:
                    prices.append(price_val)
                    sku_prices.append(price_val)

            compare_at = sku_data.get('compare_at_price')
            if compare_at is not None:
                try:
                    compare_prices.append(float(compare_at))
                except (ValueError, TypeError):
                    pass

            # Weights (truncated to int) and units
            weight = sku_data.get('weight')
            if weight is not None:
                try:
                    sku_weights.append(int(float(weight)))
                except (ValueError, TypeError, OverflowError):
                    pass

            if sku_data.get('weight_unit'):
                sku_weight_units.append(str(sku_data['weight_unit']))

            # Inventory
            stock = sku_data.get('stock')
            if stock is not None:
                try:
                    total_inventory += int(stock)
                except (ValueError, TypeError, OverflowError):
                    pass

            # Specifications: SKU option values paired with option names.
            sku_id = str(sku_row['id'])
            for position in (1, 2, 3):
                value = sku_row.get(f'option{position}')
                if pd.notna(value) and position in option_name_map:
                    specifications.append({
                        'sku_id': sku_id,
                        'name': option_name_map[position],
                        'value': str(value)
                    })

        return (skus_list, prices, compare_prices, sku_prices, sku_weights,
                sku_weight_units, total_inventory, specifications)

    def _fill_option_values(self, doc: Dict[str, Any], skus: pd.DataFrame):
        """
        Fill option1/2/3_values with the distinct SKU option values.

        Only dimensions listed in ``searchable_option_dimensions`` are
        populated; the others are written as empty lists.
        """
        collected: Dict[int, List[str]] = {1: [], 2: [], 3: []}
        for _, sku_row in skus.iterrows():
            for position, bucket in collected.items():
                value = sku_row.get(f'option{position}')
                if pd.notna(value):
                    bucket.append(str(value))

        for position, values in collected.items():
            column = f'option{position}'
            if column in self.searchable_option_dimensions:
                doc[f'{column}_values'] = self._dedupe(values)
            else:
                doc[f'{column}_values'] = []

    def _transform_sku_row(
        self,
        sku_row: pd.Series,
        option_name_map: Optional[Dict[int, str]] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Convert one SKU row into a SKU dict.

        Args:
            sku_row: SKU row data.
            option_name_map: Position -> option name mapping (currently unused
                here; kept for interface compatibility).

        Returns:
            SKU dict with ``sku_id``, ``price``, ``compare_at_price``, ``stock``
            and ``weight`` always present (None/0 when missing or unparsable),
            plus ``sku_code``, ``weight_unit``, ``optionN_value`` and
            ``image_src`` when the source column has a value.
        """
        sku_data: Dict[str, Any] = {}

        sku_data['sku_id'] = str(sku_row['id'])
        sku_data['price'] = self._coerce_scalar(sku_row.get('price'), float, None)
        sku_data['compare_at_price'] = self._coerce_scalar(sku_row.get('compare_at_price'), float, None)

        if pd.notna(sku_row.get('sku')):
            sku_data['sku_code'] = str(sku_row['sku'])

        sku_data['stock'] = self._coerce_scalar(sku_row.get('inventory_quantity'), int, 0)
        sku_data['weight'] = self._coerce_scalar(sku_row.get('weight'), float, None)

        if pd.notna(sku_row.get('weight_unit')):
            sku_data['weight_unit'] = str(sku_row['weight_unit'])

        for position in (1, 2, 3):
            value = sku_row.get(f'option{position}')
            if pd.notna(value):
                sku_data[f'option{position}_value'] = str(value)

        if pd.notna(sku_row.get('image_src')):
            sku_data['image_src'] = str(sku_row['image_src'])

        return sku_data