# spu_transformer.py
"""
SPU data transformer for Shoplazza products.

Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus.
"""

import pandas as pd
import numpy as np
import logging
from typing import Dict, Any, List, Optional
from sqlalchemy import create_engine, text
from utils.db_connector import create_db_connection
from config import ConfigLoader
from config.tenant_config_loader import get_tenant_config_loader
from indexer.document_transformer import SPUDocumentTransformer

# Configure logger
logger = logging.getLogger(__name__)


class SPUTransformer:
    """Transform SPU and SKU data into SPU-level ES documents.

    Loads SPU, SKU and option rows for a single tenant from MySQL
    (tables ``shoplazza_product_spu`` / ``_sku`` / ``_option``), then
    delegates per-SPU document construction to ``SPUDocumentTransformer``.
    """

    def __init__(
        self,
        db_engine: Any,
        tenant_id: str
    ):
        """
        Initialize SPU transformer.

        Args:
            db_engine: SQLAlchemy database engine
            tenant_id: Tenant ID for filtering data
        """
        self.db_engine = db_engine
        self.tenant_id = tenant_id

        # Load configuration to get searchable_option_dimensions
        translator = None
        translation_prompts: Dict[str, Any] = {}
        try:
            config_loader = ConfigLoader()
            config = config_loader.load_config()
            self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions

            # Initialize translator if translation is enabled.
            # Nested try: a translator failure must NOT discard the
            # searchable_option_dimensions already loaded from config above
            # (the previous code fell through to the hard-coded defaults).
            if config.query_config.enable_translation:
                try:
                    from query.translator import Translator
                    translator = Translator(
                        api_key=config.query_config.translation_api_key,
                        use_cache=True,  # cache at index time to avoid re-translating
                        glossary_id=config.query_config.translation_glossary_id,
                        translation_context=config.query_config.translation_context
                    )
                    translation_prompts = config.query_config.translation_prompts
                except Exception as e:
                    logger.warning(f"Failed to initialize translator, continuing without translation: {e}")
                    translator = None
                    translation_prompts = {}
        except Exception as e:
            logger.warning(f"Failed to load config, using default: {e}")
            self.searchable_option_dimensions = ['option1', 'option2', 'option3']

        # Load category ID to name mapping
        self.category_id_to_name = self._load_category_mapping()

        # Load tenant config
        tenant_config_loader = get_tenant_config_loader()
        tenant_config = tenant_config_loader.get_tenant_config(tenant_id)

        # Initialize document transformer
        self.document_transformer = SPUDocumentTransformer(
            category_id_to_name=self.category_id_to_name,
            searchable_option_dimensions=self.searchable_option_dimensions,
            tenant_config=tenant_config,
            translator=translator,
            translation_prompts=translation_prompts
        )

    @staticmethod
    def _pct(count: int, total: int) -> str:
        """Format a coverage counter as 'count/total (xx.x%)'; safe for total == 0."""
        if total == 0:
            return f"{count}/{total} (0.0%)"
        return f"{count}/{total} ({100 * count / total:.1f}%)"

    def _load_category_mapping(self) -> Dict[str, str]:
        """
        Load category ID to name mapping from database.

        Best-effort: rows with a non-numeric id or an empty name are
        skipped with a warning instead of aborting the whole load.

        Returns:
            Dictionary mapping category_id (canonical integer string) to category_name
        """
        query = text("""
            SELECT DISTINCT
                category_id,
                category
            FROM shoplazza_product_spu
            WHERE deleted = 0 AND category_id IS NOT NULL
        """)

        mapping: Dict[str, str] = {}
        with self.db_engine.connect() as conn:
            result = conn.execute(query)
            for row in result:
                try:
                    # category_id may arrive as Decimal/float from the driver;
                    # normalize to a canonical integer-string key.
                    category_id = str(int(row.category_id))
                except (TypeError, ValueError) as e:
                    logger.warning(f"Skipping category row with non-numeric id {row.category_id!r}: {e}")
                    continue
                category_name = row.category

                if not category_name or not category_name.strip():
                    logger.warning(f"Category ID {category_id} has empty name, skipping")
                    continue

                mapping[category_id] = category_name

        logger.info(f"Loaded {len(mapping)} category ID to name mappings")

        # Log all category mappings for debugging
        if mapping:
            logger.debug("Category ID mappings:")
            for cid, name in sorted(mapping.items()):
                logger.debug(f"  {cid} -> {name}")

        return mapping

    def load_spu_data(self) -> pd.DataFrame:
        """
        Load SPU data from MySQL.

        Returns:
            DataFrame with SPU data (active rows for this tenant only)
        """
        query = text("""
            SELECT 
                id, shop_id, shoplazza_id, title, brief, description,
                spu, vendor, vendor_url,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category, category_id, category_google_id,
                category_level, category_path,
                fake_sales, display_fake_sales,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        logger.info(f"Loaded {len(df)} SPU records for tenant_id={self.tenant_id}")

        # Statistics on field coverage; empty result triggers diagnostics.
        total = len(df)
        if total > 0:
            has_category_path = df['category_path'].notna().sum()
            has_category = df['category'].notna().sum()
            has_title = df['title'].notna().sum()

            logger.info("SPU data statistics:")
            logger.info(f"  - Has title: {self._pct(has_title, total)}")
            logger.info(f"  - Has category_path: {self._pct(has_category_path, total)}")
            logger.info(f"  - Has category: {self._pct(has_category, total)}")

            # Warn if too many SPUs don't have category_path
            if has_category_path < total * 0.5:
                logger.warning(f"Only {100*has_category_path/total:.1f}% of SPUs have category_path, data quality may be low")
        else:
            logger.warning(f"No SPU data found for tenant_id={self.tenant_id}")
            self._log_empty_spu_diagnostics()

        return df

    def _log_empty_spu_diagnostics(self) -> None:
        """Log debug info explaining why no SPU rows matched this tenant."""
        # Check whether any data exists for this tenant_id (incl. deleted rows)
        debug_query = text("""
            SELECT 
                COUNT(*) as total_count,
                SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count,
                SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id
        """)
        with self.db_engine.connect() as conn:
            debug_df = pd.read_sql(debug_query, conn, params={"tenant_id": self.tenant_id})
        if not debug_df.empty:
            total = debug_df.iloc[0]['total_count']
            active = debug_df.iloc[0]['active_count']
            deleted = debug_df.iloc[0]['deleted_count']
            logger.debug(f"tenant_id={self.tenant_id}: total={total}, active={active}, deleted={deleted}")

        # Check what tenant_ids exist in the table
        tenant_check_query = text("""
            SELECT tenant_id, COUNT(*) as count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active
            FROM shoplazza_product_spu
            GROUP BY tenant_id
            ORDER BY tenant_id
            LIMIT 10
        """)
        with self.db_engine.connect() as conn:
            tenant_df = pd.read_sql(tenant_check_query, conn)
        if not tenant_df.empty:
            logger.debug("Available tenant_ids in shoplazza_product_spu:")
            for _, row in tenant_df.iterrows():
                logger.debug(f"  tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}")

    def load_sku_data(self) -> pd.DataFrame:
        """
        Load SKU data from MySQL.

        Returns:
            DataFrame with SKU data (active rows for this tenant only)
        """
        query = text("""
            SELECT 
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        logger.info(f"Loaded {len(df)} SKU records for tenant_id={self.tenant_id}")

        # Statistics on field coverage
        total = len(df)
        if total > 0:
            has_price = df['price'].notna().sum()
            has_inventory = df['inventory_quantity'].notna().sum()
            has_option1 = df['option1'].notna().sum()
            has_option2 = df['option2'].notna().sum()
            has_option3 = df['option3'].notna().sum()

            logger.info("SKU data statistics:")
            logger.info(f"  - Has price: {self._pct(has_price, total)}")
            logger.info(f"  - Has inventory: {self._pct(has_inventory, total)}")
            logger.info(f"  - Has option1: {self._pct(has_option1, total)}")
            logger.info(f"  - Has option2: {self._pct(has_option2, total)}")
            logger.info(f"  - Has option3: {self._pct(has_option3, total)}")

            # Warn about data quality issues
            if has_price < total * 0.95:
                logger.warning(f"Only {100*has_price/total:.1f}% of SKUs have price")

        return df

    def load_option_data(self) -> pd.DataFrame:
        """
        Load option data from MySQL.

        Returns:
            DataFrame with option data (name, position for each SPU),
            ordered by spu_id then option position
        """
        query = text("""
            SELECT 
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND deleted = 0
            ORDER BY spu_id, position
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        logger.info(f"Loaded {len(df)} option records for tenant_id={self.tenant_id}")

        # Statistics on field coverage
        total = len(df)
        if total > 0:
            unique_spus_with_options = df['spu_id'].nunique()
            has_name = df['name'].notna().sum()

            logger.info("Option data statistics:")
            logger.info(f"  - Unique SPUs with options: {unique_spus_with_options}")
            logger.info(f"  - Has name: {self._pct(has_name, total)}")

            # Warn about missing option names
            if has_name < total:
                missing = total - has_name
                logger.warning(f"{missing} option records are missing names")

        return df

    def transform_batch(self) -> List[Dict[str, Any]]:
        """
        Transform SPU and SKU data into ES documents.

        Loads all three tables, groups SKUs/options per SPU, and produces
        one document per SPU. Per-SPU failures are logged and counted but
        do not abort the batch.

        Returns:
            List of SPU-level ES documents
        """
        logger.info(f"Starting data transformation for tenant_id={self.tenant_id}")

        # Load data
        spu_df = self.load_spu_data()
        sku_df = self.load_sku_data()
        option_df = self.load_option_data()

        if spu_df.empty:
            logger.warning("No SPU data to transform")
            return []

        # Group SKUs by SPU
        sku_groups = sku_df.groupby('spu_id')
        logger.info(f"Grouped SKUs into {len(sku_groups)} SPU groups")

        # Group options by SPU. Explicit None check: do not rely on the
        # truthiness of a pandas GroupBy object.
        option_groups = option_df.groupby('spu_id') if not option_df.empty else None
        if option_groups is not None:
            logger.info(f"Grouped options into {len(option_groups)} SPU groups")

        documents: List[Dict[str, Any]] = []
        skipped_count = 0
        error_count = 0

        for idx, spu_row in spu_df.iterrows():
            spu_id = spu_row['id']

            try:
                # Get SKUs for this SPU
                skus = sku_groups.get_group(spu_id) if spu_id in sku_groups.groups else pd.DataFrame()

                # Get options for this SPU
                if option_groups is not None and spu_id in option_groups.groups:
                    options = option_groups.get_group(spu_id)
                else:
                    options = pd.DataFrame()

                # Warn if SPU has no SKUs
                if skus.empty:
                    logger.warning(f"SPU {spu_id} (title: {spu_row.get('title', 'N/A')}) has no SKUs")

                # Transform to ES document
                doc = self.document_transformer.transform_spu_to_doc(
                    tenant_id=self.tenant_id,
                    spu_row=spu_row,
                    skus=skus,
                    options=options
                )
                if doc:
                    documents.append(doc)
                else:
                    skipped_count += 1
                    logger.warning(f"SPU {spu_id} transformation returned None, skipped")
            except Exception as e:
                error_count += 1
                logger.error(f"Error transforming SPU {spu_id}: {e}", exc_info=True)

        logger.info("Transformation complete:")
        logger.info(f"  - Total SPUs: {len(spu_df)}")
        logger.info(f"  - Successfully transformed: {len(documents)}")
        logger.info(f"  - Skipped: {skipped_count}")
        logger.info(f"  - Errors: {error_count}")

        return documents