"""
SPU data transformer for Shoplazza products.

Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus.
"""

import logging
from typing import Any, Dict, List, Optional

import pandas as pd
from sqlalchemy import text

from indexer.indexing_utils import load_category_mapping, create_document_transformer

# Configure logger
logger = logging.getLogger(__name__)


def _log_coverage(label: str, present: Any, total: int) -> None:
    """Log how many of ``total`` rows have a non-null value for ``label``.

    Callers must only invoke this with ``total > 0`` (the percentage divides
    by ``total``).
    """
    logger.info(f" - {label}: {present}/{total} ({100*present/total:.1f}%)")


class SPUTransformer:
    """Transform SPU and SKU data into SPU-level ES documents."""

    def __init__(self, db_engine: Any, tenant_id: str):
        """Store the connection settings and build the document transformer.

        Args:
            db_engine: Engine object exposing ``connect()`` as a context
                manager; it is also handed to ``load_category_mapping``.
            tenant_id: Tenant whose rows are loaded and transformed.
        """
        self.db_engine = db_engine
        self.tenant_id = tenant_id

        # Load category ID to name mapping
        self.category_id_to_name = load_category_mapping(db_engine)
        logger.info(f"Loaded {len(self.category_id_to_name)} category ID to name mappings")

        # Initialize document transformer
        self.document_transformer = create_document_transformer(
            category_id_to_name=self.category_id_to_name,
            tenant_id=tenant_id
        )

    def _read_tenant_rows(self, query: Any) -> pd.DataFrame:
        """Run ``query`` with ``:tenant_id`` bound to this transformer's tenant."""
        with self.db_engine.connect() as conn:
            return pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

    def _log_empty_spu_diagnostics(self) -> None:
        """Emit DEBUG-level hints explaining why no active SPU rows were found.

        The two queries below exist only to feed ``logger.debug`` calls, and
        the second one aggregates over every tenant in the table, so they are
        skipped entirely unless DEBUG logging is actually enabled.
        """
        if not logger.isEnabledFor(logging.DEBUG):
            return

        # Debug: Check if there's any data for this tenant_id
        debug_query = text("""
            SELECT
                COUNT(*) as total_count,
                SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count,
                SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id
        """)
        debug_df = self._read_tenant_rows(debug_query)
        if not debug_df.empty:
            counts = debug_df.iloc[0]
            total = counts['total_count']
            active = counts['active_count']
            deleted = counts['deleted_count']
            logger.debug(f"tenant_id={self.tenant_id}: total={total}, active={active}, deleted={deleted}")

        # Check what tenant_ids exist in the table
        tenant_check_query = text("""
            SELECT tenant_id, COUNT(*) as count,
                   SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active
            FROM shoplazza_product_spu
            GROUP BY tenant_id
            ORDER BY tenant_id
            LIMIT 10
        """)
        with self.db_engine.connect() as conn:
            tenant_df = pd.read_sql(tenant_check_query, conn)
        if not tenant_df.empty:
            logger.debug("Available tenant_ids in shoplazza_product_spu:")
            for _, row in tenant_df.iterrows():
                logger.debug(f" tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}")

    def load_spu_data(self) -> pd.DataFrame:
        """
        Load SPU data from MySQL.

        Only rows with ``deleted = 0`` for this tenant are selected. Coverage
        statistics are logged for non-empty results; for an empty result a
        warning is logged and, at DEBUG level, extra diagnostics are queried.

        Returns:
            DataFrame with SPU data
        """
        query = text("""
            SELECT
                id, shop_id, shoplazza_id, title, brief, description,
                spu, vendor, vendor_url,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category, category_id, category_google_id,
                category_level, category_path,
                fake_sales, display_fake_sales,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)
        df = self._read_tenant_rows(query)
        total = len(df)
        logger.info(f"Loaded {total} SPU records for tenant_id={self.tenant_id}")

        if total == 0:
            logger.warning(f"No SPU data found for tenant_id={self.tenant_id}")
            self._log_empty_spu_diagnostics()
            return df

        # Statistics
        has_category_path = df['category_path'].notna().sum()
        has_category = df['category'].notna().sum()
        has_title = df['title'].notna().sum()

        logger.info("SPU data statistics:")
        _log_coverage("Has title", has_title, total)
        _log_coverage("Has category_path", has_category_path, total)
        _log_coverage("Has category", has_category, total)

        # Warn if too many SPUs don't have category_path
        if has_category_path < total * 0.5:
            logger.warning(f"Only {100*has_category_path/total:.1f}% of SPUs have category_path, data quality may be low")

        return df

    def load_sku_data(self) -> pd.DataFrame:
        """
        Load SKU data from MySQL.

        Only rows with ``deleted = 0`` for this tenant are selected; coverage
        statistics are logged when the result is non-empty.

        Returns:
            DataFrame with SKU data
        """
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)
        df = self._read_tenant_rows(query)
        total = len(df)
        logger.info(f"Loaded {total} SKU records for tenant_id={self.tenant_id}")

        # Statistics
        if total > 0:
            has_price = df['price'].notna().sum()
            has_inventory = df['inventory_quantity'].notna().sum()
            has_option1 = df['option1'].notna().sum()
            has_option2 = df['option2'].notna().sum()
            has_option3 = df['option3'].notna().sum()

            logger.info("SKU data statistics:")
            _log_coverage("Has price", has_price, total)
            _log_coverage("Has inventory", has_inventory, total)
            _log_coverage("Has option1", has_option1, total)
            _log_coverage("Has option2", has_option2, total)
            _log_coverage("Has option3", has_option3, total)

            # Warn about data quality issues
            if has_price < total * 0.95:
                logger.warning(f"Only {100*has_price/total:.1f}% of SKUs have price")

        return df

    def load_option_data(self) -> pd.DataFrame:
        """
        Load option data from MySQL.

        Only rows with ``deleted = 0`` for this tenant are selected, ordered by
        ``spu_id`` then ``position``.

        Returns:
            DataFrame with option data (name, position for each SPU)
        """
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND deleted = 0
            ORDER BY spu_id, position
        """)
        df = self._read_tenant_rows(query)
        total = len(df)
        logger.info(f"Loaded {total} option records for tenant_id={self.tenant_id}")

        # Statistics
        if total > 0:
            unique_spus_with_options = df['spu_id'].nunique()
            has_name = df['name'].notna().sum()

            logger.info("Option data statistics:")
            logger.info(f" - Unique SPUs with options: {unique_spus_with_options}")
            _log_coverage("Has name", has_name, total)

            # Warn about missing option names
            if has_name < total:
                missing = total - has_name
                logger.warning(f"{missing} option records are missing names")

        return df

    @staticmethod
    def _rows_for_spu(groups: Any, spu_id: Any) -> pd.DataFrame:
        """Return the rows grouped under ``spu_id``, or an empty DataFrame.

        ``groups`` is a ``groupby('spu_id')`` result, or ``None`` when there
        was nothing to group. The ``None`` test is explicit so the lookup does
        not depend on the truth value of a pandas GroupBy object.
        """
        if groups is None or spu_id not in groups.groups:
            return pd.DataFrame()
        return groups.get_group(spu_id)

    def transform_batch(self) -> List[Dict[str, Any]]:
        """
        Transform SPU and SKU data into ES documents.

        Loads SPU, SKU and option rows, groups SKUs and options by ``spu_id``
        and passes each SPU row with its groups to
        ``self.document_transformer.transform_spu_to_doc``. A failure on one
        SPU is logged with its traceback and counted; it does not abort the
        batch.

        Returns:
            List of SPU-level ES documents
        """
        logger.info(f"Starting data transformation for tenant_id={self.tenant_id}")

        # Load data
        spu_df = self.load_spu_data()
        sku_df = self.load_sku_data()
        option_df = self.load_option_data()

        if spu_df.empty:
            logger.warning("No SPU data to transform")
            return []

        # Group SKUs by SPU
        sku_groups = sku_df.groupby('spu_id')
        logger.info(f"Grouped SKUs into {len(sku_groups)} SPU groups")

        # Group options by SPU
        option_groups = None if option_df.empty else option_df.groupby('spu_id')
        if option_groups is not None:
            logger.info(f"Grouped options into {len(option_groups)} SPU groups")

        documents: List[Dict[str, Any]] = []
        skipped_count = 0
        error_count = 0

        for _, spu_row in spu_df.iterrows():
            spu_id = spu_row['id']

            try:
                # Get SKUs and options for this SPU
                skus = self._rows_for_spu(sku_groups, spu_id)
                options = self._rows_for_spu(option_groups, spu_id)

                # Warn if SPU has no SKUs
                if skus.empty:
                    logger.warning(f"SPU {spu_id} (title: {spu_row.get('title', 'N/A')}) has no SKUs")

                # Transform to ES document
                doc = self.document_transformer.transform_spu_to_doc(
                    tenant_id=self.tenant_id,
                    spu_row=spu_row,
                    skus=skus,
                    options=options
                )

                # Any falsy result (not only None) is counted as skipped.
                if doc:
                    documents.append(doc)
                else:
                    skipped_count += 1
                    logger.warning(f"SPU {spu_id} transformation returned None, skipped")
            except Exception as e:
                # Per-record boundary: record the failure and keep going.
                error_count += 1
                logger.error(f"Error transforming SPU {spu_id}: {e}", exc_info=True)

        logger.info("Transformation complete:")
        logger.info(f" - Total SPUs: {len(spu_df)}")
        logger.info(f" - Successfully transformed: {len(documents)}")
        logger.info(f" - Skipped: {skipped_count}")
        logger.info(f" - Errors: {error_count}")

        return documents