""" 增量数据获取服务。 提供单个SPU的数据获取接口,用于增量更新ES索引。 公共数据(分类映射、配置等)在服务启动时预加载,以提高性能。 """ import pandas as pd import numpy as np import logging from typing import Dict, Any, Optional from sqlalchemy import text from config import ConfigLoader from config.tenant_config_loader import get_tenant_config_loader from indexer.document_transformer import SPUDocumentTransformer # Configure logger logger = logging.getLogger(__name__) class IncrementalIndexerService: """增量索引服务,提供单个SPU数据获取功能。""" def __init__(self, db_engine: Any): """ 初始化增量索引服务。 Args: db_engine: SQLAlchemy database engine """ self.db_engine = db_engine # 预加载分类映射(全局,所有租户共享) self.category_id_to_name = self._load_category_mapping() logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings") # 租户配置加载器(延迟加载,按需获取租户配置) self.tenant_config_loader = get_tenant_config_loader() def _load_category_mapping(self) -> Dict[str, str]: """ 加载分类ID到名称的映射(全局,所有租户共享)。 Returns: Dictionary mapping category_id to category_name """ query = text(""" SELECT DISTINCT category_id, category FROM shoplazza_product_spu WHERE deleted = 0 AND category_id IS NOT NULL """) mapping = {} try: with self.db_engine.connect() as conn: result = conn.execute(query) for row in result: category_id = str(int(row.category_id)) category_name = row.category if not category_name or not category_name.strip(): logger.warning(f"Category ID {category_id} has empty name, skipping") continue mapping[category_id] = category_name except Exception as e: logger.error(f"Failed to load category mapping: {e}", exc_info=True) return mapping def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]: """ 获取单个SPU的ES文档数据。 Args: tenant_id: 租户ID spu_id: SPU ID Returns: ES文档字典,如果SPU不存在或已删除则返回None """ try: # 加载SPU数据 spu_row = self._load_single_spu(tenant_id, spu_id) if spu_row is None: logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}") return None # 加载SKU数据 skus_df = self._load_skus_for_spu(tenant_id, spu_id) # 加载Option数据 options_df = self._load_options_for_spu(tenant_id, spu_id) # 获取租户配置 tenant_config = self.tenant_config_loader.get_tenant_config(tenant_id) # 加载搜索配置 translator = None translation_prompts = {} searchable_option_dimensions = ['option1', 'option2', 'option3'] try: config_loader = ConfigLoader() config = config_loader.load_config() searchable_option_dimensions = config.spu_config.searchable_option_dimensions # Initialize translator if translation is enabled if config.query_config.enable_translation: from query.translator import Translator translator = Translator( api_key=config.query_config.translation_api_key, use_cache=True, # 索引时使用缓存避免重复翻译 glossary_id=config.query_config.translation_glossary_id, translation_context=config.query_config.translation_context ) translation_prompts = config.query_config.translation_prompts except Exception as e: logger.warning(f"Failed to load config, using default: {e}") # 创建文档转换器 transformer = SPUDocumentTransformer( category_id_to_name=self.category_id_to_name, searchable_option_dimensions=searchable_option_dimensions, tenant_config=tenant_config, translator=translator, translation_prompts=translation_prompts ) # 转换为ES文档 doc = transformer.transform_spu_to_doc( tenant_id=tenant_id, spu_row=spu_row, skus=skus_df, options=options_df ) if doc is None: logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}") return None return doc except Exception as e: logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True) raise def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]: """ 加载单个SPU数据。 Args: tenant_id: 租户ID spu_id: SPU ID Returns: SPU行数据,如果不存在则返回None """ query = text(""" SELECT id, shop_id, shoplazza_id, title, brief, description, spu, vendor, vendor_url, image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0 LIMIT 1 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) if df.empty: return None return df.iloc[0] def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame: """ 加载指定SPU的所有SKU数据。 Args: tenant_id: 租户ID spu_id: SPU ID Returns: SKU数据DataFrame """ query = text(""" SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, shoplazza_image_id, title, sku, barcode, position, price, compare_at_price, cost_price, option1, option2, option3, inventory_quantity, weight, weight_unit, image_src, wholesale_price, note, extend, shoplazza_created_at, shoplazza_updated_at, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_sku WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) return df def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame: """ 加载指定SPU的所有Option数据。 Args: tenant_id: 租户ID spu_id: SPU ID Returns: Option数据DataFrame """ query = text(""" SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, position, name, `values`, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_option WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0 ORDER BY position """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) return df