spu_transformer.py 11.6 KB
"""
SPU data transformer for Shoplazza products.

Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus.
"""

import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional
from sqlalchemy import create_engine, text
from utils.db_connector import create_db_connection


class SPUTransformer:
    """Transform SPU and SKU data into SPU-level ES documents."""

    def __init__(
        self,
        db_engine: Any,
        tenant_id: str
    ):
        """
        Initialize SPU transformer.

        Args:
            db_engine: SQLAlchemy database engine
            tenant_id: Tenant ID for filtering data
        """
        self.db_engine = db_engine
        self.tenant_id = tenant_id

    def load_spu_data(self) -> pd.DataFrame:
        """
        Load SPU data from MySQL.

        Returns:
            DataFrame with SPU data
        """
        query = text("""
            SELECT 
                id, shop_id, shoplazza_id, handle, title, brief, description,
                spu, vendor, vendor_url, seo_title, seo_description, seo_keywords,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)
        
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})
        
        # Debug: Check if there's any data for this tenant_id
        if len(df) == 0:
            debug_query = text("""
                SELECT 
                    COUNT(*) as total_count,
                    SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count,
                    SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id
            """)
            with self.db_engine.connect() as conn:
                debug_df = pd.read_sql(debug_query, conn, params={"tenant_id": self.tenant_id})
            if not debug_df.empty:
                total = debug_df.iloc[0]['total_count']
                active = debug_df.iloc[0]['active_count']
                deleted = debug_df.iloc[0]['deleted_count']
                print(f"DEBUG: tenant_id={self.tenant_id}: total={total}, active={active}, deleted={deleted}")
            
            # Check what tenant_ids exist in the table
            tenant_check_query = text("""
                SELECT tenant_id, COUNT(*) as count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active
                FROM shoplazza_product_spu
                GROUP BY tenant_id
                ORDER BY tenant_id
                LIMIT 10
            """)
            with self.db_engine.connect() as conn:
                tenant_df = pd.read_sql(tenant_check_query, conn)
            if not tenant_df.empty:
                print(f"DEBUG: Available tenant_ids in shoplazza_product_spu:")
                for _, row in tenant_df.iterrows():
                    print(f"  tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}")
        
        return df

    def load_sku_data(self) -> pd.DataFrame:
        """
        Load SKU data from MySQL.

        Returns:
            DataFrame with SKU data
        """
        query = text("""
            SELECT 
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)
        
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})
        
        print(f"DEBUG: Loaded {len(df)} SKU records for tenant_id={self.tenant_id}")
        
        return df

    def transform_batch(self) -> List[Dict[str, Any]]:
        """
        Transform SPU and SKU data into ES documents.

        Returns:
            List of SPU-level ES documents
        """
        # Load data
        spu_df = self.load_spu_data()
        sku_df = self.load_sku_data()

        if spu_df.empty:
            return []

        # Group SKUs by SPU
        sku_groups = sku_df.groupby('spu_id')

        documents = []
        for _, spu_row in spu_df.iterrows():
            spu_id = spu_row['id']
            
            # Get SKUs for this SPU
            skus = sku_groups.get_group(spu_id) if spu_id in sku_groups.groups else pd.DataFrame()
            
            # Transform to ES document
            doc = self._transform_spu_to_doc(spu_row, skus)
            if doc:
                documents.append(doc)

        return documents

    def _transform_spu_to_doc(
        self,
        spu_row: pd.Series,
        skus: pd.DataFrame
    ) -> Optional[Dict[str, Any]]:
        """
        Transform a single SPU row and its SKUs into an ES document.

        Args:
            spu_row: SPU row from database
            skus: DataFrame with SKUs for this SPU

        Returns:
            ES document or None if transformation fails
        """
        doc = {}

        # Tenant ID (required)
        doc['tenant_id'] = str(self.tenant_id)

        # SPU ID
        doc['spu_id'] = str(spu_row['id'])

        # Handle
        if pd.notna(spu_row.get('handle')):
            doc['handle'] = str(spu_row['handle'])

        # Title
        if pd.notna(spu_row.get('title')):
            doc['title'] = str(spu_row['title'])

        # Brief
        if pd.notna(spu_row.get('brief')):
            doc['brief'] = str(spu_row['brief'])

        # Description
        if pd.notna(spu_row.get('description')):
            doc['description'] = str(spu_row['description'])

        # SEO fields
        if pd.notna(spu_row.get('seo_title')):
            doc['seo_title'] = str(spu_row['seo_title'])
        if pd.notna(spu_row.get('seo_description')):
            doc['seo_description'] = str(spu_row['seo_description'])
        if pd.notna(spu_row.get('seo_keywords')):
            doc['seo_keywords'] = str(spu_row['seo_keywords'])

        # Vendor
        if pd.notna(spu_row.get('vendor')):
            doc['vendor'] = str(spu_row['vendor'])

        # Tags
        if pd.notna(spu_row.get('tags')):
            doc['tags'] = str(spu_row['tags'])

        # Category
        if pd.notna(spu_row.get('category')):
            doc['category'] = str(spu_row['category'])

        # Image URL
        if pd.notna(spu_row.get('image_src')):
            image_src = str(spu_row['image_src'])
            if not image_src.startswith('http'):
                image_src = f"//{image_src}" if image_src.startswith('//') else image_src
            doc['image_url'] = image_src

        # Process SKUs
        skus_list = []
        prices = []
        compare_prices = []

        for _, sku_row in skus.iterrows():
            sku_data = self._transform_sku_row(sku_row)
            if sku_data:
                skus_list.append(sku_data)
                if 'price' in sku_data and sku_data['price'] is not None:
                    try:
                        prices.append(float(sku_data['price']))
                    except (ValueError, TypeError):
                        pass
                if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None:
                    try:
                        compare_prices.append(float(sku_data['compare_at_price']))
                    except (ValueError, TypeError):
                        pass

        doc['skus'] = skus_list

        # Calculate price ranges
        if prices:
            doc['min_price'] = float(min(prices))
            doc['max_price'] = float(max(prices))
        else:
            doc['min_price'] = 0.0
            doc['max_price'] = 0.0

        if compare_prices:
            doc['compare_at_price'] = float(max(compare_prices))
        else:
            doc['compare_at_price'] = None

        # Time fields - convert datetime to ISO format string for ES DATE type
        if pd.notna(spu_row.get('create_time')):
            create_time = spu_row['create_time']
            if hasattr(create_time, 'isoformat'):
                doc['create_time'] = create_time.isoformat()
            else:
                doc['create_time'] = str(create_time)
        
        if pd.notna(spu_row.get('update_time')):
            update_time = spu_row['update_time']
            if hasattr(update_time, 'isoformat'):
                doc['update_time'] = update_time.isoformat()
            else:
                doc['update_time'] = str(update_time)
        
        if pd.notna(spu_row.get('shoplazza_created_at')):
            shoplazza_created_at = spu_row['shoplazza_created_at']
            if hasattr(shoplazza_created_at, 'isoformat'):
                doc['shoplazza_created_at'] = shoplazza_created_at.isoformat()
            else:
                doc['shoplazza_created_at'] = str(shoplazza_created_at)
        
        if pd.notna(spu_row.get('shoplazza_updated_at')):
            shoplazza_updated_at = spu_row['shoplazza_updated_at']
            if hasattr(shoplazza_updated_at, 'isoformat'):
                doc['shoplazza_updated_at'] = shoplazza_updated_at.isoformat()
            else:
                doc['shoplazza_updated_at'] = str(shoplazza_updated_at)

        return doc

    def _transform_sku_row(self, sku_row: pd.Series) -> Optional[Dict[str, Any]]:
        """
        Transform a SKU row into a SKU object.

        Args:
            sku_row: SKU row from database

        Returns:
            SKU dictionary or None
        """
        sku_data = {}

        # SKU ID
        sku_data['sku_id'] = str(sku_row['id'])

        # Title
        if pd.notna(sku_row.get('title')):
            sku_data['title'] = str(sku_row['title'])

        # Price
        if pd.notna(sku_row.get('price')):
            try:
                sku_data['price'] = float(sku_row['price'])
            except (ValueError, TypeError):
                sku_data['price'] = None
        else:
            sku_data['price'] = None

        # Compare at price
        if pd.notna(sku_row.get('compare_at_price')):
            try:
                sku_data['compare_at_price'] = float(sku_row['compare_at_price'])
            except (ValueError, TypeError):
                sku_data['compare_at_price'] = None
        else:
            sku_data['compare_at_price'] = None

        # SKU
        if pd.notna(sku_row.get('sku')):
            sku_data['sku'] = str(sku_row['sku'])

        # Stock
        if pd.notna(sku_row.get('inventory_quantity')):
            try:
                sku_data['stock'] = int(sku_row['inventory_quantity'])
            except (ValueError, TypeError):
                sku_data['stock'] = 0
        else:
            sku_data['stock'] = 0

        # Options (from option1, option2, option3)
        options = {}
        if pd.notna(sku_row.get('option1')):
            options['option1'] = str(sku_row['option1'])
        if pd.notna(sku_row.get('option2')):
            options['option2'] = str(sku_row['option2'])
        if pd.notna(sku_row.get('option3')):
            options['option3'] = str(sku_row['option3'])
        
        if options:
            sku_data['options'] = options

        return sku_data