Blame view

indexer/incremental_service.py 8.39 KB
0064e946   tangwang   feat: 增量索引服务、租户配置...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
  """
  增量数据获取服务。
  
  提供单个SPU的数据获取接口,用于增量更新ES索引。
  公共数据(分类映射、配置等)在服务启动时预加载,以提高性能。
  """
  
  import pandas as pd
  import numpy as np
  import logging
  from typing import Dict, Any, Optional
  from sqlalchemy import text
  from config import ConfigLoader
  from config.tenant_config_loader import get_tenant_config_loader
  from indexer.document_transformer import SPUDocumentTransformer
  
  # Module-level logger, named after this module (__name__) so that its
  # output can be filtered and configured per module by the logging setup.
  logger = logging.getLogger(__name__)
  
  
  class IncrementalIndexerService:
      """增量索引服务,提供单个SPU数据获取功能。"""
  
      def __init__(self, db_engine: Any):
          """
          Set up the incremental indexer service.

          Stores the engine, eagerly loads the category id -> name mapping
          through it, and obtains the tenant config loader.

          Args:
              db_engine: SQLAlchemy database engine
          """
          self.db_engine = db_engine

          # The category mapping is fetched once at construction time; the
          # query behind it is not filtered by tenant, so all tenants share it.
          category_names = self._load_category_mapping()
          self.category_id_to_name = category_names
          logger.info(f"Preloaded {len(category_names)} category mappings")

          # Only the loader is obtained here; per-tenant configuration is
          # requested from it later, when a document is built.
          self.tenant_config_loader = get_tenant_config_loader()
  
      def _load_category_mapping(self) -> Dict[str, str]:
          """
          Load the category id -> category name mapping.

          The query is not filtered by tenant, so the result is shared by all
          tenants. Loading is best-effort: a database error is logged and
          whatever was collected so far (possibly nothing) is returned instead
          of raising.

          Returns:
              Dictionary mapping category_id (normalized with ``str(int(...))``)
              to category_name. Rows with a blank name, or with a category_id
              that cannot be converted to an integer, are skipped with a
              warning.
          """
          query = text("""
              SELECT DISTINCT
                  category_id,
                  category
              FROM shoplazza_product_spu
              WHERE deleted = 0 AND category_id IS NOT NULL
          """)

          mapping: Dict[str, str] = {}
          try:
              with self.db_engine.connect() as conn:
                  for row in conn.execute(query):
                      # Guard the conversion per row: without this, a single
                      # malformed id would jump to the outer handler, abort
                      # the loop and leave the mapping silently truncated.
                      try:
                          category_id = str(int(row.category_id))
                      except (TypeError, ValueError, OverflowError):
                          logger.warning(
                              f"Category has non-integer ID {row.category_id!r}, skipping"
                          )
                          continue

                      category_name = row.category
                      if not category_name or not category_name.strip():
                          logger.warning(f"Category ID {category_id} has empty name, skipping")
                          continue

                      mapping[category_id] = category_name
          except Exception as e:
              # Deliberately broad: a failure here must not prevent the
              # service from being constructed; the error is logged with its
              # traceback and the (possibly partial) mapping is returned.
              logger.error(f"Failed to load category mapping: {e}", exc_info=True)

          return mapping
  
      def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
          """
          Build the ES document for a single SPU.

          Args:
              tenant_id: Tenant ID
              spu_id: SPU ID

          Returns:
              The ES document dict, or None when the SPU row is not found
              (missing or deleted) or when the transformer returns None.

          Raises:
              Exception: any error outside the config-loading step is logged
                  with its traceback and re-raised unchanged.
          """
          try:
              # Without an SPU row there is nothing to index.
              spu_record = self._load_single_spu(tenant_id, spu_id)
              if spu_record is None:
                  logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}")
                  return None

              # Related rows and the tenant-level configuration.
              sku_frame = self._load_skus_for_spu(tenant_id, spu_id)
              option_frame = self._load_options_for_spu(tenant_id, spu_id)
              tenant_settings = self.tenant_config_loader.get_tenant_config(tenant_id)

              # Defaults that stay in effect if the search config cannot be
              # loaded. Values assigned before a failure inside the try below
              # are kept; only the remaining ones fall back to these defaults.
              option_dimensions = ['option1', 'option2', 'option3']
              translator = None
              prompts = {}
              # NOTE(review): config and translator are rebuilt on every call;
              # confirm whether that is intentional before caching them.
              try:
                  app_config = ConfigLoader().load_config()
                  option_dimensions = app_config.spu_config.searchable_option_dimensions

                  # A translator is only built when translation is enabled.
                  query_settings = app_config.query_config
                  if query_settings.enable_translation:
                      from query.translator import Translator
                      translator = Translator(
                          api_key=query_settings.translation_api_key,
                          # Cache during indexing to avoid repeated translations.
                          use_cache=True,
                          glossary_id=query_settings.translation_glossary_id,
                          translation_context=query_settings.translation_context,
                      )
                      prompts = query_settings.translation_prompts
              except Exception as exc:
                  logger.warning(f"Failed to load config, using default: {exc}")

              # Build the transformer and convert the loaded rows in one go.
              doc = SPUDocumentTransformer(
                  category_id_to_name=self.category_id_to_name,
                  searchable_option_dimensions=option_dimensions,
                  tenant_config=tenant_settings,
                  translator=translator,
                  translation_prompts=prompts,
              ).transform_spu_to_doc(
                  tenant_id=tenant_id,
                  spu_row=spu_record,
                  skus=sku_frame,
                  options=option_frame,
              )

              # A None result is logged and then returned as-is.
              if doc is None:
                  logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}")
              return doc

          except Exception as exc:
              logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {exc}", exc_info=True)
              raise
  
      def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
          """
          Fetch one non-deleted SPU row for the given tenant.

          Args:
              tenant_id: Tenant ID
              spu_id: SPU ID

          Returns:
              The first matching row as a pandas Series, or None when the
              query returns no rows.
          """
          spu_sql = text("""
              SELECT 
                  id, shop_id, shoplazza_id, title, brief, description,
                  spu, vendor, vendor_url,
                  image_src, image_width, image_height, image_path, image_alt,
                  tags, note, category, category_id, category_google_id,
                  category_level, category_path,
                  fake_sales, display_fake_sales,
                  tenant_id, creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
              LIMIT 1
          """)
          bind_params = {"tenant_id": tenant_id, "spu_id": spu_id}

          # The connection is only held for the duration of the read.
          with self.db_engine.connect() as connection:
              frame = pd.read_sql(spu_sql, connection, params=bind_params)

          return None if frame.empty else frame.iloc[0]
  
      def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
          """
          Fetch every non-deleted SKU row belonging to the given SPU.

          Args:
              tenant_id: Tenant ID
              spu_id: SPU ID

          Returns:
              DataFrame with one row per matching SKU, as returned by
              ``pd.read_sql`` for the query below.
          """
          sku_sql = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  shoplazza_image_id, title, sku, barcode, position,
                  price, compare_at_price, cost_price,
                  option1, option2, option3,
                  inventory_quantity, weight, weight_unit, image_src,
                  wholesale_price, note, extend,
                  shoplazza_created_at, shoplazza_updated_at, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_sku
              WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
          """)
          bind_params = {"tenant_id": tenant_id, "spu_id": spu_id}

          # The connection is only held for the duration of the read.
          with self.db_engine.connect() as connection:
              return pd.read_sql(sku_sql, connection, params=bind_params)
  
      def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
          """
          Fetch every non-deleted Option row belonging to the given SPU.

          Args:
              tenant_id: Tenant ID
              spu_id: SPU ID

          Returns:
              DataFrame with one row per matching option, ordered by the
              ``position`` column.
          """
          option_sql = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  position, name, `values`, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_option
              WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
              ORDER BY position
          """)
          bind_params = {"tenant_id": tenant_id, "spu_id": spu_id}

          # The connection is only held for the duration of the read.
          with self.db_engine.connect() as connection:
              return pd.read_sql(option_sql, connection, params=bind_params)