incremental_service.py 8.39 KB
"""
增量数据获取服务。

提供单个SPU的数据获取接口,用于增量更新ES索引。
公共数据(分类映射、配置等)在服务启动时预加载,以提高性能。
"""

import pandas as pd
import numpy as np
import logging
from typing import Dict, Any, Optional
from sqlalchemy import text
from config import ConfigLoader
from config.tenant_config_loader import get_tenant_config_loader
from indexer.document_transformer import SPUDocumentTransformer

# Configure logger
logger = logging.getLogger(__name__)


class IncrementalIndexerService:
    """Incremental indexing service that builds the ES document for a single SPU.

    The category-id -> category-name mapping is loaded once in ``__init__`` and
    shared by all tenants; tenant configuration is fetched per call through the
    tenant config loader.
    """

    # Option columns searched when the search config cannot be loaded.
    _DEFAULT_SEARCHABLE_OPTION_DIMENSIONS = ('option1', 'option2', 'option3')

    def __init__(self, db_engine: Any):
        """
        Initialize the incremental indexing service.

        Args:
            db_engine: SQLAlchemy database engine used for all queries.
        """
        self.db_engine = db_engine

        # Preload the category mapping (global, shared by all tenants).
        self.category_id_to_name = self._load_category_mapping()
        logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings")

        # Tenant config loader; individual tenant configs are fetched on demand.
        self.tenant_config_loader = get_tenant_config_loader()

    @staticmethod
    def _normalize_category_id(raw: Any) -> Optional[str]:
        """Return *raw* as a canonical integer string, or None if it is not integral.

        Mirrors the original ``str(int(raw))`` conversion but reports failure
        (None, non-numeric strings, NaN/inf) as ``None`` instead of raising.
        """
        if raw is None:
            return None
        try:
            return str(int(raw))
        except (TypeError, ValueError, OverflowError):
            return None

    def _load_category_mapping(self) -> Dict[str, str]:
        """
        Load the category-id -> category-name mapping (global, shared by all tenants).

        Rows whose id cannot be converted to an integer, or whose name is
        missing/blank, are skipped with a warning so that one bad row does not
        truncate the whole mapping. If the query itself fails, the error is
        logged and whatever was collected so far (possibly nothing) is returned.

        Returns:
            Dictionary mapping category_id (as an integer string) to category_name.
        """
        query = text("""
            SELECT DISTINCT
                category_id,
                category
            FROM shoplazza_product_spu
            WHERE deleted = 0 AND category_id IS NOT NULL
        """)

        mapping = {}
        try:
            with self.db_engine.connect() as conn:
                result = conn.execute(query)
                for row in result:
                    category_id = self._normalize_category_id(row.category_id)
                    if category_id is None:
                        logger.warning(f"Invalid category ID {row.category_id!r}, skipping")
                        continue

                    category_name = row.category
                    if not isinstance(category_name, str) or not category_name.strip():
                        logger.warning(f"Category ID {category_id} has empty name, skipping")
                        continue

                    # NOTE: the query has no ORDER BY, so if one category_id appears
                    # with several names, whichever row comes last wins.
                    mapping[category_id] = category_name
        except Exception as e:
            logger.error(f"Failed to load category mapping: {e}", exc_info=True)

        return mapping

    def _load_search_settings(self) -> tuple:
        """
        Load the search settings needed by the document transformer.

        Settings are assigned progressively: if loading the config or building
        the translator fails, a warning is logged and every setting not yet
        assigned keeps its default.

        Returns:
            ``(searchable_option_dimensions, translator, translation_prompts)``,
            defaulting to ``['option1', 'option2', 'option3']``, ``None`` and ``{}``.
        """
        # NOTE(review): the config and Translator are rebuilt on every call even
        # though the module docstring says config is preloaded. Consider caching
        # them once the cost of ConfigLoader and the thread-safety of Translator
        # are confirmed.
        translator = None
        translation_prompts = {}
        searchable_option_dimensions = list(self._DEFAULT_SEARCHABLE_OPTION_DIMENSIONS)
        try:
            config = ConfigLoader().load_config()
            searchable_option_dimensions = config.spu_config.searchable_option_dimensions

            # Initialize translator if translation is enabled
            if config.query_config.enable_translation:
                # Imported only on this path, presumably so the translator
                # module is not required when translation is disabled.
                from query.translator import Translator
                translator = Translator(
                    api_key=config.query_config.translation_api_key,
                    use_cache=True,  # use the cache during indexing to avoid re-translating
                    glossary_id=config.query_config.translation_glossary_id,
                    translation_context=config.query_config.translation_context
                )
                translation_prompts = config.query_config.translation_prompts
        except Exception as e:
            logger.warning(f"Failed to load config, using default: {e}")

        return searchable_option_dimensions, translator, translation_prompts

    def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
        """
        Build the ES document for a single SPU.

        Args:
            tenant_id: Tenant ID.
            spu_id: SPU ID.

        Returns:
            The ES document dict, or None if the SPU does not exist, is deleted,
            or the transformer produced no document.

        Raises:
            Any exception from data loading or transformation is logged and re-raised.
        """
        try:
            spu_row = self._load_single_spu(tenant_id, spu_id)
            if spu_row is None:
                logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}")
                return None

            skus_df = self._load_skus_for_spu(tenant_id, spu_id)
            options_df = self._load_options_for_spu(tenant_id, spu_id)

            tenant_config = self.tenant_config_loader.get_tenant_config(tenant_id)

            (searchable_option_dimensions,
             translator,
             translation_prompts) = self._load_search_settings()

            transformer = SPUDocumentTransformer(
                category_id_to_name=self.category_id_to_name,
                searchable_option_dimensions=searchable_option_dimensions,
                tenant_config=tenant_config,
                translator=translator,
                translation_prompts=translation_prompts
            )

            doc = transformer.transform_spu_to_doc(
                tenant_id=tenant_id,
                spu_row=spu_row,
                skus=skus_df,
                options=options_df
            )

            if doc is None:
                logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}")
            return doc

        except Exception as e:
            logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
            raise

    def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
        """
        Load a single non-deleted SPU row.

        Args:
            tenant_id: Tenant ID.
            spu_id: SPU ID.

        Returns:
            The SPU row, or None if no matching non-deleted row exists.
        """
        query = text("""
            SELECT 
                id, shop_id, shoplazza_id, title, brief, description,
                spu, vendor, vendor_url,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category, category_id, category_google_id,
                category_level, category_path,
                fake_sales, display_fake_sales,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
            LIMIT 1
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})

        if df.empty:
            return None

        return df.iloc[0]

    def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """
        Load all non-deleted SKUs belonging to the given SPU.

        Args:
            tenant_id: Tenant ID.
            spu_id: SPU ID.

        Returns:
            DataFrame of SKU rows (empty if there are none).
        """
        query = text("""
            SELECT 
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})

        return df

    def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """
        Load all non-deleted options of the given SPU, ordered by position.

        Args:
            tenant_id: Tenant ID.
            spu_id: SPU ID.

        Returns:
            DataFrame of option rows (empty if there are none).
        """
        # `values` is backquoted because it is a reserved word in MySQL-style SQL.
        query = text("""
            SELECT 
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
            ORDER BY position
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})

        return df