Blame view

indexer/incremental_service.py 4.66 KB
3c1f8031   tangwang   api/routes/indexe...
1
  """增量数据获取服务"""
0064e946   tangwang   feat: 增量索引服务、租户配置...
2
3
  
  import pandas as pd
0064e946   tangwang   feat: 增量索引服务、租户配置...
4
5
6
  import logging
  from typing import Dict, Any, Optional
  from sqlalchemy import text
3c1f8031   tangwang   api/routes/indexe...
7
  from indexer.indexing_utils import load_category_mapping, create_document_transformer
0064e946   tangwang   feat: 增量索引服务、租户配置...
8
9
10
11
12
13
  
  # Module-level logger named after this module; this statement only obtains
  # the logger and does not attach handlers or set a level.
  logger = logging.getLogger(__name__)
  
  
  class IncrementalIndexerService:
      """Incremental indexing service that fetches the data for a single SPU.

      For one (tenant, SPU) pair the service reads the SPU row together with
      its SKU and option rows from the database and passes them to a document
      transformer, whose result is returned as the ES document.
      """

      def __init__(self, db_engine: Any):
          """Initialize the incremental indexing service.

          Args:
              db_engine: Database engine. It is passed to
                  ``load_category_mapping`` and its ``connect()`` method is used
                  as a context manager for every query issued by this service.
          """
          self.db_engine = db_engine

          # Preload the category mapping once (global, shared by all tenants).
          self.category_id_to_name = load_category_mapping(db_engine)
          logger.info("Preloaded %d category mappings", len(self.category_id_to_name))

      def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
          """Build the ES document for one SPU.

          Args:
              tenant_id: Tenant the SPU belongs to.
              spu_id: Value matched against the ``id`` column of the SPU table
                  and the ``spu_id`` column of the SKU and option tables.

          Returns:
              The document returned by the transformer, or ``None`` when no
              non-deleted SPU row matches or when the transformer returns
              ``None``.

          Raises:
              Exception: Any error raised while loading or transforming the
                  data is logged with its traceback and re-raised unchanged.
          """
          try:
              # Load the SPU row first; without it there is nothing to index.
              spu_row = self._load_single_spu(tenant_id, spu_id)
              if spu_row is None:
                  logger.warning("SPU %s not found for tenant_id=%s", spu_id, tenant_id)
                  return None

              # Load the SKU and option rows that belong to the SPU.
              skus_df = self._load_skus_for_spu(tenant_id, spu_id)
              options_df = self._load_options_for_spu(tenant_id, spu_id)

              # Create the document transformer for this tenant.
              transformer = create_document_transformer(
                  category_id_to_name=self.category_id_to_name,
                  tenant_id=tenant_id,
              )

              # Convert the loaded rows into the ES document.
              doc = transformer.transform_spu_to_doc(
                  tenant_id=tenant_id,
                  spu_row=spu_row,
                  skus=skus_df,
                  options=options_df,
              )

              if doc is None:
                  logger.warning("Failed to transform SPU %s for tenant_id=%s", spu_id, tenant_id)
                  return None

              return doc

          except Exception as e:
              # logger.exception logs at ERROR level and attaches the traceback.
              logger.exception(
                  "Error getting SPU document for tenant_id=%s, spu_id=%s: %s",
                  tenant_id,
                  spu_id,
                  e,
              )
              raise

      def _read_dataframe(self, query: Any, tenant_id: str, spu_id: str) -> pd.DataFrame:
          """Run *query* with the tenant/SPU bind parameters and return its rows.

          Shared by the three loaders below so the connection handling and the
          bind-parameter names (``:tenant_id``, ``:spu_id``) live in one place.
          A connection is obtained from the engine for each call.
          """
          with self.db_engine.connect() as conn:
              return pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})

      def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
          """Load a single SPU row.

          Returns:
              The first matching non-deleted row as a ``pandas.Series``, or
              ``None`` when the query returns no rows.
          """
          query = text("""
              SELECT 
                  id, shop_id, shoplazza_id, title, brief, description,
                  spu, vendor, vendor_url,
                  image_src, image_width, image_height, image_path, image_alt,
                  tags, note, category, category_id, category_google_id,
                  category_level, category_path,
                  fake_sales, display_fake_sales,
                  tenant_id, creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
              LIMIT 1
          """)

          df = self._read_dataframe(query, tenant_id, spu_id)
          if df.empty:
              return None

          return df.iloc[0]

      def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
          """Load all non-deleted SKU rows of the given SPU.

          Returns:
              A ``pandas.DataFrame`` with one row per SKU; it is empty when the
              query returns no rows.
          """
          query = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  shoplazza_image_id, title, sku, barcode, position,
                  price, compare_at_price, cost_price,
                  option1, option2, option3,
                  inventory_quantity, weight, weight_unit, image_src,
                  wholesale_price, note, extend,
                  shoplazza_created_at, shoplazza_updated_at, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_sku
              WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
          """)

          return self._read_dataframe(query, tenant_id, spu_id)

      def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
          """Load all non-deleted option rows of the given SPU, ordered by position.

          Returns:
              A ``pandas.DataFrame`` with one row per option; it is empty when
              the query returns no rows.
          """
          # `values` is back-quoted in the SQL below; presumably the target
          # database is MySQL-compatible, where VALUES is a reserved word.
          query = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  position, name, `values`, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_option
              WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
              ORDER BY position
          """)

          return self._read_dataframe(query, tenant_id, spu_id)