""" SPU data transformer for Shoplazza products. Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus. """ import pandas as pd import numpy as np from typing import Dict, Any, List, Optional from sqlalchemy import create_engine, text from utils.db_connector import create_db_connection from config import ConfigLoader class SPUTransformer: """Transform SPU and SKU data into SPU-level ES documents.""" def __init__( self, db_engine: Any, tenant_id: str ): """ Initialize SPU transformer. Args: db_engine: SQLAlchemy database engine tenant_id: Tenant ID for filtering data """ self.db_engine = db_engine self.tenant_id = tenant_id # Load configuration to get searchable_option_dimensions try: config_loader = ConfigLoader() config = config_loader.load_config() self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions except Exception as e: print(f"Warning: Failed to load config, using default searchable_option_dimensions: {e}") self.searchable_option_dimensions = ['option1', 'option2', 'option3'] def load_spu_data(self) -> pd.DataFrame: """ Load SPU data from MySQL. Returns: DataFrame with SPU data """ query = text(""" SELECT id, shop_id, shoplazza_id, title, brief, description, spu, vendor, vendor_url, image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND deleted = 0 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id}) # Debug: Check if there's any data for this tenant_id if len(df) == 0: debug_query = text(""" SELECT COUNT(*) as total_count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count, SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count FROM shoplazza_product_spu WHERE tenant_id = :tenant_id """) with self.db_engine.connect() as conn: debug_df = pd.read_sql(debug_query, conn, params={"tenant_id": self.tenant_id}) if not debug_df.empty: total = debug_df.iloc[0]['total_count'] active = debug_df.iloc[0]['active_count'] deleted = debug_df.iloc[0]['deleted_count'] print(f"DEBUG: tenant_id={self.tenant_id}: total={total}, active={active}, deleted={deleted}") # Check what tenant_ids exist in the table tenant_check_query = text(""" SELECT tenant_id, COUNT(*) as count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active FROM shoplazza_product_spu GROUP BY tenant_id ORDER BY tenant_id LIMIT 10 """) with self.db_engine.connect() as conn: tenant_df = pd.read_sql(tenant_check_query, conn) if not tenant_df.empty: print(f"DEBUG: Available tenant_ids in shoplazza_product_spu:") for _, row in tenant_df.iterrows(): print(f" tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}") return df def load_sku_data(self) -> pd.DataFrame: """ Load SKU data from MySQL. Returns: DataFrame with SKU data """ query = text(""" SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, shoplazza_image_id, title, sku, barcode, position, price, compare_at_price, cost_price, option1, option2, option3, inventory_quantity, weight, weight_unit, image_src, wholesale_price, note, extend, shoplazza_created_at, shoplazza_updated_at, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_sku WHERE tenant_id = :tenant_id AND deleted = 0 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id}) print(f"DEBUG: Loaded {len(df)} SKU records for tenant_id={self.tenant_id}") return df def load_option_data(self) -> pd.DataFrame: """ Load option data from MySQL. Returns: DataFrame with option data (name, position for each SPU) """ query = text(""" SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, position, name, `values`, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_option WHERE tenant_id = :tenant_id AND deleted = 0 ORDER BY spu_id, position """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id}) print(f"DEBUG: Loaded {len(df)} option records for tenant_id={self.tenant_id}") return df def transform_batch(self) -> List[Dict[str, Any]]: """ Transform SPU and SKU data into ES documents. Returns: List of SPU-level ES documents """ # Load data spu_df = self.load_spu_data() sku_df = self.load_sku_data() option_df = self.load_option_data() if spu_df.empty: return [] # Group SKUs by SPU sku_groups = sku_df.groupby('spu_id') # Group options by SPU option_groups = option_df.groupby('spu_id') if not option_df.empty else None documents = [] for _, spu_row in spu_df.iterrows(): spu_id = spu_row['id'] # Get SKUs for this SPU skus = sku_groups.get_group(spu_id) if spu_id in sku_groups.groups else pd.DataFrame() # Get options for this SPU options = option_groups.get_group(spu_id) if option_groups and spu_id in option_groups.groups else pd.DataFrame() # Transform to ES document doc = self._transform_spu_to_doc(spu_row, skus, options) if doc: documents.append(doc) return documents def _transform_spu_to_doc( self, spu_row: pd.Series, skus: pd.DataFrame, options: pd.DataFrame ) -> Optional[Dict[str, Any]]: """ Transform a single SPU row and its SKUs into an ES document. Args: spu_row: SPU row from database skus: DataFrame with SKUs for this SPU options: DataFrame with options for this SPU Returns: ES document or None if transformation fails """ doc = {} # Tenant ID (required) doc['tenant_id'] = str(self.tenant_id) # SPU ID doc['spu_id'] = str(spu_row['id']) # 文本相关性相关字段(中英文双语,暂时只填充中文) if pd.notna(spu_row.get('title')): doc['title_zh'] = str(spu_row['title']) doc['title_en'] = None # 暂时设为空 if pd.notna(spu_row.get('brief')): doc['brief_zh'] = str(spu_row['brief']) doc['brief_en'] = None if pd.notna(spu_row.get('description')): doc['description_zh'] = str(spu_row['description']) doc['description_en'] = None if pd.notna(spu_row.get('vendor')): doc['vendor_zh'] = str(spu_row['vendor']) doc['vendor_en'] = None # Tags if pd.notna(spu_row.get('tags')): # Tags是逗号分隔的字符串,需要转换为数组 tags_str = str(spu_row['tags']) doc['tags'] = [tag.strip() for tag in tags_str.split(',') if tag.strip()] # Category相关字段 if pd.notna(spu_row.get('category_path')): category_path = str(spu_row['category_path']) doc['category_path_zh'] = category_path doc['category_path_en'] = None # 暂时设为空 # 解析category_path获取多层级分类名称 path_parts = category_path.split('/') if len(path_parts) > 0: doc['category1_name'] = path_parts[0].strip() if len(path_parts) > 1: doc['category2_name'] = path_parts[1].strip() if len(path_parts) > 2: doc['category3_name'] = path_parts[2].strip() elif pd.notna(spu_row.get('category')): # 如果category_path为空,使用category字段作为category1_name的备选 category = str(spu_row['category']) doc['category_name_zh'] = category doc['category_name_en'] = None doc['category_name'] = category # 尝试从category字段解析多级分类 if '/' in category: path_parts = category.split('/') if len(path_parts) > 0: doc['category1_name'] = path_parts[0].strip() if len(path_parts) > 1: doc['category2_name'] = path_parts[1].strip() if len(path_parts) > 2: doc['category3_name'] = path_parts[2].strip() else: # 如果category不包含"/",直接作为category1_name doc['category1_name'] = category.strip() if pd.notna(spu_row.get('category')): # 确保category相关字段都被设置(如果前面没有设置) category_name = str(spu_row['category']) if 'category_name_zh' not in doc: doc['category_name_zh'] = category_name if 'category_name_en' not in doc: doc['category_name_en'] = None if 'category_name' not in doc: doc['category_name'] = category_name if pd.notna(spu_row.get('category_id')): doc['category_id'] = str(int(spu_row['category_id'])) if pd.notna(spu_row.get('category_level')): doc['category_level'] = int(spu_row['category_level']) # Option名称(从option表获取) if not options.empty: # 按position排序获取option名称 sorted_options = options.sort_values('position') if len(sorted_options) > 0 and pd.notna(sorted_options.iloc[0].get('name')): doc['option1_name'] = str(sorted_options.iloc[0]['name']) if len(sorted_options) > 1 and pd.notna(sorted_options.iloc[1].get('name')): doc['option2_name'] = str(sorted_options.iloc[1]['name']) if len(sorted_options) > 2 and pd.notna(sorted_options.iloc[2].get('name')): doc['option3_name'] = str(sorted_options.iloc[2]['name']) # Image URL if pd.notna(spu_row.get('image_src')): image_src = str(spu_row['image_src']) if not image_src.startswith('http'): image_src = f"//{image_src}" if image_src.startswith('//') else image_src doc['image_url'] = image_src # Process SKUs and build specifications skus_list = [] prices = [] compare_prices = [] sku_prices = [] sku_weights = [] sku_weight_units = [] total_inventory = 0 specifications = [] # 构建option名称映射(position -> name) option_name_map = {} if not options.empty: for _, opt_row in options.iterrows(): position = opt_row.get('position') name = opt_row.get('name') if pd.notna(position) and pd.notna(name): option_name_map[int(position)] = str(name) for _, sku_row in skus.iterrows(): sku_data = self._transform_sku_row(sku_row, option_name_map) if sku_data: skus_list.append(sku_data) # 收集价格信息 if 'price' in sku_data and sku_data['price'] is not None: try: price_val = float(sku_data['price']) prices.append(price_val) sku_prices.append(price_val) except (ValueError, TypeError): pass if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None: try: compare_prices.append(float(sku_data['compare_at_price'])) except (ValueError, TypeError): pass # 收集重量信息 if 'weight' in sku_data and sku_data['weight'] is not None: try: sku_weights.append(int(float(sku_data['weight']))) except (ValueError, TypeError): pass if 'weight_unit' in sku_data and sku_data['weight_unit']: sku_weight_units.append(str(sku_data['weight_unit'])) # 收集库存信息 if 'stock' in sku_data and sku_data['stock'] is not None: try: total_inventory += int(sku_data['stock']) except (ValueError, TypeError): pass # 构建specifications(从SKU的option值和option表的name) sku_id = str(sku_row['id']) if pd.notna(sku_row.get('option1')) and 1 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[1], 'value': str(sku_row['option1']) }) if pd.notna(sku_row.get('option2')) and 2 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[2], 'value': str(sku_row['option2']) }) if pd.notna(sku_row.get('option3')) and 3 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[3], 'value': str(sku_row['option3']) }) doc['skus'] = skus_list doc['specifications'] = specifications # 提取option值(根据配置的searchable_option_dimensions) # 从子SKU的option1_value, option2_value, option3_value中提取去重后的值 option1_values = [] option2_values = [] option3_values = [] for _, sku_row in skus.iterrows(): if pd.notna(sku_row.get('option1')): option1_values.append(str(sku_row['option1'])) if pd.notna(sku_row.get('option2')): option2_values.append(str(sku_row['option2'])) if pd.notna(sku_row.get('option3')): option3_values.append(str(sku_row['option3'])) # 去重并根据配置决定是否写入索引 if 'option1' in self.searchable_option_dimensions: doc['option1_values'] = list(set(option1_values)) if option1_values else [] else: doc['option1_values'] = [] if 'option2' in self.searchable_option_dimensions: doc['option2_values'] = list(set(option2_values)) if option2_values else [] else: doc['option2_values'] = [] if 'option3' in self.searchable_option_dimensions: doc['option3_values'] = list(set(option3_values)) if option3_values else [] else: doc['option3_values'] = [] # Calculate price ranges if prices: doc['min_price'] = float(min(prices)) doc['max_price'] = float(max(prices)) else: doc['min_price'] = 0.0 doc['max_price'] = 0.0 if compare_prices: doc['compare_at_price'] = float(max(compare_prices)) else: doc['compare_at_price'] = None # SKU扁平化字段 doc['sku_prices'] = sku_prices doc['sku_weights'] = sku_weights doc['sku_weight_units'] = list(set(sku_weight_units)) # 去重 doc['total_inventory'] = total_inventory # Image URL if pd.notna(spu_row.get('image_src')): image_src = str(spu_row['image_src']) if not image_src.startswith('http'): image_src = f"//{image_src}" if image_src.startswith('//') else image_src doc['image_url'] = image_src # Time fields - convert datetime to ISO format string for ES DATE type if pd.notna(spu_row.get('create_time')): create_time = spu_row['create_time'] if hasattr(create_time, 'isoformat'): doc['create_time'] = create_time.isoformat() else: doc['create_time'] = str(create_time) if pd.notna(spu_row.get('update_time')): update_time = spu_row['update_time'] if hasattr(update_time, 'isoformat'): doc['update_time'] = update_time.isoformat() else: doc['update_time'] = str(update_time) return doc def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Dict[int, str] = None) -> Optional[Dict[str, Any]]: """ Transform a SKU row into a SKU object. Args: sku_row: SKU row from database option_name_map: Mapping from position to option name Returns: SKU dictionary or None """ sku_data = {} # SKU ID sku_data['sku_id'] = str(sku_row['id']) # Price if pd.notna(sku_row.get('price')): try: sku_data['price'] = float(sku_row['price']) except (ValueError, TypeError): sku_data['price'] = None else: sku_data['price'] = None # Compare at price if pd.notna(sku_row.get('compare_at_price')): try: sku_data['compare_at_price'] = float(sku_row['compare_at_price']) except (ValueError, TypeError): sku_data['compare_at_price'] = None else: sku_data['compare_at_price'] = None # SKU Code if pd.notna(sku_row.get('sku')): sku_data['sku_code'] = str(sku_row['sku']) # Stock if pd.notna(sku_row.get('inventory_quantity')): try: sku_data['stock'] = int(sku_row['inventory_quantity']) except (ValueError, TypeError): sku_data['stock'] = 0 else: sku_data['stock'] = 0 # Weight if pd.notna(sku_row.get('weight')): try: sku_data['weight'] = float(sku_row['weight']) except (ValueError, TypeError): sku_data['weight'] = None else: sku_data['weight'] = None # Weight unit if pd.notna(sku_row.get('weight_unit')): sku_data['weight_unit'] = str(sku_row['weight_unit']) # Option values if pd.notna(sku_row.get('option1')): sku_data['option1_value'] = str(sku_row['option1']) if pd.notna(sku_row.get('option2')): sku_data['option2_value'] = str(sku_row['option2']) if pd.notna(sku_row.get('option3')): sku_data['option3_value'] = str(sku_row['option3']) # Image src if pd.notna(sku_row.get('image_src')): sku_data['image_src'] = str(sku_row['image_src']) return sku_data