incremental_service.py 4.66 KB
"""增量数据获取服务"""

import pandas as pd
import logging
from typing import Dict, Any, Optional
from sqlalchemy import text
from indexer.indexing_utils import load_category_mapping, create_document_transformer

# Configure logger
logger = logging.getLogger(__name__)


class IncrementalIndexerService:
    """增量索引服务,提供SPU数据获取功能。"""

    def __init__(self, db_engine: Any):
        """初始化增量索引服务"""
        self.db_engine = db_engine
        
        # 预加载分类映射(全局,所有租户共享)
        self.category_id_to_name = load_category_mapping(db_engine)
        logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings")

    def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
        """获取SPU的ES文档数据"""
        try:
            # 加载SPU数据
            spu_row = self._load_single_spu(tenant_id, spu_id)
            if spu_row is None:
                logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}")
                return None

            # 加载SKU数据
            skus_df = self._load_skus_for_spu(tenant_id, spu_id)

            # 加载Option数据
            options_df = self._load_options_for_spu(tenant_id, spu_id)

            # 创建文档转换器
            transformer = create_document_transformer(
                category_id_to_name=self.category_id_to_name,
                tenant_id=tenant_id
            )
            
            # 转换为ES文档
            doc = transformer.transform_spu_to_doc(
                tenant_id=tenant_id,
                spu_row=spu_row,
                skus=skus_df,
                options=options_df
            )
            
            if doc is None:
                logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}")
                return None

            return doc

        except Exception as e:
            logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
            raise

    def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
        """加载单个SPU数据"""
        query = text("""
            SELECT 
                id, shop_id, shoplazza_id, title, brief, description,
                spu, vendor, vendor_url,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category, category_id, category_google_id,
                category_level, category_path,
                fake_sales, display_fake_sales,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
            LIMIT 1
        """)
        
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})
        
        if df.empty:
            return None
        
        return df.iloc[0]

    def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """加载指定SPU的所有SKU数据"""
        query = text("""
            SELECT 
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
        """)
        
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})
        
        return df

    def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """加载指定SPU的所有Option数据"""
        query = text("""
            SELECT 
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
            ORDER BY position
        """)
        
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})
        
        return df