""" SPU data transformer for Shoplazza products. Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested variants. """ import pandas as pd import numpy as np from typing import Dict, Any, List, Optional from sqlalchemy import create_engine, text from utils.db_connector import create_db_connection class SPUTransformer: """Transform SPU and SKU data into SPU-level ES documents.""" def __init__( self, db_engine: Any, tenant_id: str ): """ Initialize SPU transformer. Args: db_engine: SQLAlchemy database engine tenant_id: Tenant ID for filtering data """ self.db_engine = db_engine self.tenant_id = tenant_id def load_spu_data(self) -> pd.DataFrame: """ Load SPU data from MySQL. Returns: DataFrame with SPU data """ query = text(""" SELECT id, shop_id, shoplazza_id, handle, title, brief, description, spu, vendor, vendor_url, seo_title, seo_description, seo_keywords, image_src, image_width, image_height, image_path, image_alt, tags, note, category, shoplazza_created_at, shoplazza_updated_at, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND deleted = 0 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id}) return df def load_sku_data(self) -> pd.DataFrame: """ Load SKU data from MySQL. Returns: DataFrame with SKU data """ query = text(""" SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, shoplazza_image_id, title, sku, barcode, position, price, compare_at_price, cost_price, option1, option2, option3, inventory_quantity, weight, weight_unit, image_src, wholesale_price, note, extend, shoplazza_created_at, shoplazza_updated_at, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_sku WHERE tenant_id = :tenant_id AND deleted = 0 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id}) return df def transform_batch(self) -> List[Dict[str, Any]]: """ Transform SPU and SKU data into ES documents. Returns: List of SPU-level ES documents """ # Load data spu_df = self.load_spu_data() sku_df = self.load_sku_data() if spu_df.empty: return [] # Group SKUs by SPU sku_groups = sku_df.groupby('spu_id') documents = [] for _, spu_row in spu_df.iterrows(): spu_id = spu_row['id'] # Get SKUs for this SPU skus = sku_groups.get_group(spu_id) if spu_id in sku_groups.groups else pd.DataFrame() # Transform to ES document doc = self._transform_spu_to_doc(spu_row, skus) if doc: documents.append(doc) return documents def _transform_spu_to_doc( self, spu_row: pd.Series, skus: pd.DataFrame ) -> Optional[Dict[str, Any]]: """ Transform a single SPU row and its SKUs into an ES document. Args: spu_row: SPU row from database skus: DataFrame with SKUs for this SPU Returns: ES document or None if transformation fails """ doc = {} # Tenant ID (required) doc['tenant_id'] = str(self.tenant_id) # Product ID doc['product_id'] = str(spu_row['id']) # Handle if pd.notna(spu_row.get('handle')): doc['handle'] = str(spu_row['handle']) # Title if pd.notna(spu_row.get('title')): doc['title'] = str(spu_row['title']) # Brief if pd.notna(spu_row.get('brief')): doc['brief'] = str(spu_row['brief']) # Description if pd.notna(spu_row.get('description')): doc['description'] = str(spu_row['description']) # SEO fields if pd.notna(spu_row.get('seo_title')): doc['seo_title'] = str(spu_row['seo_title']) if pd.notna(spu_row.get('seo_description')): doc['seo_description'] = str(spu_row['seo_description']) if pd.notna(spu_row.get('seo_keywords')): doc['seo_keywords'] = str(spu_row['seo_keywords']) # Vendor if pd.notna(spu_row.get('vendor')): doc['vendor'] = str(spu_row['vendor']) doc['vendor_keyword'] = str(spu_row['vendor']) # Product type (from category or tags) if pd.notna(spu_row.get('category')): doc['product_type'] = str(spu_row['category']) doc['product_type_keyword'] = str(spu_row['category']) # Tags if pd.notna(spu_row.get('tags')): tags_str = str(spu_row['tags']) doc['tags'] = tags_str doc['tags_keyword'] = tags_str # Category if pd.notna(spu_row.get('category')): doc['category'] = str(spu_row['category']) doc['category_keyword'] = str(spu_row['category']) # Image URL if pd.notna(spu_row.get('image_src')): image_src = str(spu_row['image_src']) if not image_src.startswith('http'): image_src = f"//{image_src}" if image_src.startswith('//') else image_src doc['image_url'] = image_src # Process variants variants = [] prices = [] compare_prices = [] for _, sku_row in skus.iterrows(): variant = self._transform_sku_to_variant(sku_row) if variant: variants.append(variant) if 'price' in variant and variant['price'] is not None: try: prices.append(float(variant['price'])) except (ValueError, TypeError): pass if 'compare_at_price' in variant and variant['compare_at_price'] is not None: try: compare_prices.append(float(variant['compare_at_price'])) except (ValueError, TypeError): pass doc['variants'] = variants # Calculate price ranges if prices: doc['min_price'] = float(min(prices)) doc['max_price'] = float(max(prices)) else: doc['min_price'] = 0.0 doc['max_price'] = 0.0 if compare_prices: doc['compare_at_price'] = float(max(compare_prices)) else: doc['compare_at_price'] = None return doc def _transform_sku_to_variant(self, sku_row: pd.Series) -> Optional[Dict[str, Any]]: """ Transform a SKU row into a variant object. Args: sku_row: SKU row from database Returns: Variant dictionary or None """ variant = {} # Variant ID variant['variant_id'] = str(sku_row['id']) # Title if pd.notna(sku_row.get('title')): variant['title'] = str(sku_row['title']) # Price if pd.notna(sku_row.get('price')): try: variant['price'] = float(sku_row['price']) except (ValueError, TypeError): variant['price'] = None else: variant['price'] = None # Compare at price if pd.notna(sku_row.get('compare_at_price')): try: variant['compare_at_price'] = float(sku_row['compare_at_price']) except (ValueError, TypeError): variant['compare_at_price'] = None else: variant['compare_at_price'] = None # SKU if pd.notna(sku_row.get('sku')): variant['sku'] = str(sku_row['sku']) # Stock if pd.notna(sku_row.get('inventory_quantity')): try: variant['stock'] = int(sku_row['inventory_quantity']) except (ValueError, TypeError): variant['stock'] = 0 else: variant['stock'] = 0 # Options (from option1, option2, option3) options = {} if pd.notna(sku_row.get('option1')): options['option1'] = str(sku_row['option1']) if pd.notna(sku_row.get('option2')): options['option2'] = str(sku_row['option2']) if pd.notna(sku_row.get('option3')): options['option3'] = str(sku_row['option3']) if options: variant['options'] = options return variant