"""增量数据获取服务""" import pandas as pd import logging from typing import Dict, Any, Optional from sqlalchemy import text from indexer.indexing_utils import load_category_mapping, create_document_transformer # Configure logger logger = logging.getLogger(__name__) class IncrementalIndexerService: """增量索引服务,提供SPU数据获取功能。""" def __init__(self, db_engine: Any): """初始化增量索引服务""" self.db_engine = db_engine # 预加载分类映射(全局,所有租户共享) self.category_id_to_name = load_category_mapping(db_engine) logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings") def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]: """获取SPU的ES文档数据""" try: # 加载SPU数据 spu_row = self._load_single_spu(tenant_id, spu_id) if spu_row is None: logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}") return None # 加载SKU数据 skus_df = self._load_skus_for_spu(tenant_id, spu_id) # 加载Option数据 options_df = self._load_options_for_spu(tenant_id, spu_id) # 创建文档转换器 transformer = create_document_transformer( category_id_to_name=self.category_id_to_name, tenant_id=tenant_id ) # 转换为ES文档 doc = transformer.transform_spu_to_doc( tenant_id=tenant_id, spu_row=spu_row, skus=skus_df, options=options_df ) if doc is None: logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}") return None return doc except Exception as e: logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True) raise def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]: """加载单个SPU数据""" query = text(""" SELECT id, shop_id, shoplazza_id, title, brief, description, spu, vendor, vendor_url, image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0 LIMIT 1 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) if df.empty: return None return df.iloc[0] def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame: """加载指定SPU的所有SKU数据""" query = text(""" SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, shoplazza_image_id, title, sku, barcode, position, price, compare_at_price, cost_price, option1, option2, option3, inventory_quantity, weight, weight_unit, image_src, wholesale_price, note, extend, shoplazza_created_at, shoplazza_updated_at, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_sku WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) return df def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame: """加载指定SPU的所有Option数据""" query = text(""" SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, position, name, `values`, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_option WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0 ORDER BY position """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) return df