spu_transformer.py 8.9 KB
"""
SPU data transformer for Shoplazza products.

Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested variants.
"""

import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional
from sqlalchemy import create_engine, text
from utils.db_connector import create_db_connection


class SPUTransformer:
    """Transform SPU and SKU data into SPU-level ES documents."""

    def __init__(
        self,
        db_engine: Any,
        tenant_id: str
    ):
        """
        Initialize SPU transformer.

        Args:
            db_engine: SQLAlchemy database engine
            tenant_id: Tenant ID for filtering data
        """
        self.db_engine = db_engine
        self.tenant_id = tenant_id

    def load_spu_data(self) -> pd.DataFrame:
        """
        Load SPU data from MySQL.

        Returns:
            DataFrame with SPU data
        """
        query = text("""
            SELECT 
                id, shop_id, shoplazza_id, handle, title, brief, description,
                spu, vendor, vendor_url, seo_title, seo_description, seo_keywords,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)
        
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})
        
        return df

    def load_sku_data(self) -> pd.DataFrame:
        """
        Load SKU data from MySQL.

        Returns:
            DataFrame with SKU data
        """
        query = text("""
            SELECT 
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)
        
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})
        
        return df

    def transform_batch(self) -> List[Dict[str, Any]]:
        """
        Transform SPU and SKU data into ES documents.

        Returns:
            List of SPU-level ES documents
        """
        # Load data
        spu_df = self.load_spu_data()
        sku_df = self.load_sku_data()

        if spu_df.empty:
            return []

        # Group SKUs by SPU
        sku_groups = sku_df.groupby('spu_id')

        documents = []
        for _, spu_row in spu_df.iterrows():
            spu_id = spu_row['id']
            
            # Get SKUs for this SPU
            skus = sku_groups.get_group(spu_id) if spu_id in sku_groups.groups else pd.DataFrame()
            
            # Transform to ES document
            doc = self._transform_spu_to_doc(spu_row, skus)
            if doc:
                documents.append(doc)

        return documents

    def _transform_spu_to_doc(
        self,
        spu_row: pd.Series,
        skus: pd.DataFrame
    ) -> Optional[Dict[str, Any]]:
        """
        Transform a single SPU row and its SKUs into an ES document.

        Args:
            spu_row: SPU row from database
            skus: DataFrame with SKUs for this SPU

        Returns:
            ES document or None if transformation fails
        """
        doc = {}

        # Tenant ID (required)
        doc['tenant_id'] = str(self.tenant_id)

        # Product ID
        doc['product_id'] = str(spu_row['id'])

        # Handle
        if pd.notna(spu_row.get('handle')):
            doc['handle'] = str(spu_row['handle'])

        # Title
        if pd.notna(spu_row.get('title')):
            doc['title'] = str(spu_row['title'])

        # Brief
        if pd.notna(spu_row.get('brief')):
            doc['brief'] = str(spu_row['brief'])

        # Description
        if pd.notna(spu_row.get('description')):
            doc['description'] = str(spu_row['description'])

        # SEO fields
        if pd.notna(spu_row.get('seo_title')):
            doc['seo_title'] = str(spu_row['seo_title'])
        if pd.notna(spu_row.get('seo_description')):
            doc['seo_description'] = str(spu_row['seo_description'])
        if pd.notna(spu_row.get('seo_keywords')):
            doc['seo_keywords'] = str(spu_row['seo_keywords'])

        # Vendor
        if pd.notna(spu_row.get('vendor')):
            doc['vendor'] = str(spu_row['vendor'])
            doc['vendor_keyword'] = str(spu_row['vendor'])

        # Product type (from category or tags)
        if pd.notna(spu_row.get('category')):
            doc['product_type'] = str(spu_row['category'])
            doc['product_type_keyword'] = str(spu_row['category'])

        # Tags
        if pd.notna(spu_row.get('tags')):
            tags_str = str(spu_row['tags'])
            doc['tags'] = tags_str
            doc['tags_keyword'] = tags_str

        # Category
        if pd.notna(spu_row.get('category')):
            doc['category'] = str(spu_row['category'])
            doc['category_keyword'] = str(spu_row['category'])

        # Image URL
        if pd.notna(spu_row.get('image_src')):
            image_src = str(spu_row['image_src'])
            if not image_src.startswith('http'):
                image_src = f"//{image_src}" if image_src.startswith('//') else image_src
            doc['image_url'] = image_src

        # Process variants
        variants = []
        prices = []
        compare_prices = []

        for _, sku_row in skus.iterrows():
            variant = self._transform_sku_to_variant(sku_row)
            if variant:
                variants.append(variant)
                if 'price' in variant and variant['price'] is not None:
                    try:
                        prices.append(float(variant['price']))
                    except (ValueError, TypeError):
                        pass
                if 'compare_at_price' in variant and variant['compare_at_price'] is not None:
                    try:
                        compare_prices.append(float(variant['compare_at_price']))
                    except (ValueError, TypeError):
                        pass

        doc['variants'] = variants

        # Calculate price ranges
        if prices:
            doc['min_price'] = float(min(prices))
            doc['max_price'] = float(max(prices))
        else:
            doc['min_price'] = 0.0
            doc['max_price'] = 0.0

        if compare_prices:
            doc['compare_at_price'] = float(max(compare_prices))
        else:
            doc['compare_at_price'] = None

        return doc

    def _transform_sku_to_variant(self, sku_row: pd.Series) -> Optional[Dict[str, Any]]:
        """
        Transform a SKU row into a variant object.

        Args:
            sku_row: SKU row from database

        Returns:
            Variant dictionary or None
        """
        variant = {}

        # Variant ID
        variant['variant_id'] = str(sku_row['id'])

        # Title
        if pd.notna(sku_row.get('title')):
            variant['title'] = str(sku_row['title'])

        # Price
        if pd.notna(sku_row.get('price')):
            try:
                variant['price'] = float(sku_row['price'])
            except (ValueError, TypeError):
                variant['price'] = None
        else:
            variant['price'] = None

        # Compare at price
        if pd.notna(sku_row.get('compare_at_price')):
            try:
                variant['compare_at_price'] = float(sku_row['compare_at_price'])
            except (ValueError, TypeError):
                variant['compare_at_price'] = None
        else:
            variant['compare_at_price'] = None

        # SKU
        if pd.notna(sku_row.get('sku')):
            variant['sku'] = str(sku_row['sku'])

        # Stock
        if pd.notna(sku_row.get('inventory_quantity')):
            try:
                variant['stock'] = int(sku_row['inventory_quantity'])
            except (ValueError, TypeError):
                variant['stock'] = 0
        else:
            variant['stock'] = 0

        # Options (from option1, option2, option3)
        options = {}
        if pd.notna(sku_row.get('option1')):
            options['option1'] = str(sku_row['option1'])
        if pd.notna(sku_row.get('option2')):
            options['option2'] = str(sku_row['option2'])
        if pd.notna(sku_row.get('option3')):
            options['option3'] = str(sku_row['option3'])
        
        if options:
            variant['options'] = options

        return variant