Blame view

indexer/spu_transformer.py 13.7 KB
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
1
2
3
  """
  SPU data transformer for Shoplazza products.
  
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
4
  Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus.
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
5
6
7
8
  """
  
  import pandas as pd
  import numpy as np
c973d288   tangwang   1. 类目字段处理
9
  import logging
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
10
11
12
  from typing import Dict, Any, List, Optional
  from sqlalchemy import create_engine, text
  from utils.db_connector import create_db_connection
33839b37   tangwang   属性值参与搜索:
13
  from config import ConfigLoader
0064e946   tangwang   feat: 增量索引服务、租户配置...
14
15
  from config.tenant_config_loader import get_tenant_config_loader
  from indexer.document_transformer import SPUDocumentTransformer
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
16
  
c973d288   tangwang   1. 类目字段处理
17
18
19
  # Configure logger
  logger = logging.getLogger(__name__)
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  
  class SPUTransformer:
      """Transform SPU and SKU data into SPU-level ES documents."""
  
      def __init__(
          self,
          db_engine: Any,
          tenant_id: str
      ):
          """
          Initialize SPU transformer.

          Loads the global configuration (searchable option dimensions and,
          when enabled, a translator), the category ID to name mapping and the
          tenant configuration, then builds the SPUDocumentTransformer that
          produces the ES documents.

          Config loading and translator construction are guarded separately,
          so a translator failure does not discard the configured
          searchable option dimensions.

          Args:
              db_engine: SQLAlchemy database engine
              tenant_id: Tenant ID for filtering data
          """
          self.db_engine = db_engine
          self.tenant_id = tenant_id

          translator = None
          translation_prompts = {}

          # Load configuration to get searchable_option_dimensions
          try:
              config = ConfigLoader().load_config()
              self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions
          except Exception as e:
              logger.warning(f"Failed to load config, using default: {e}")
              self.searchable_option_dimensions = ['option1', 'option2', 'option3']
              # Without a usable config there is nothing to build a translator from.
              config = None

          # Initialize translator if translation is enabled. This is best-effort:
          # on failure indexing continues without translation.
          if config is not None:
              try:
                  query_config = config.query_config
                  if query_config.enable_translation:
                      from query.translator import Translator
                      translator = Translator(
                          api_key=query_config.translation_api_key,
                          use_cache=True,  # use the cache while indexing to avoid repeated translations
                          glossary_id=query_config.translation_glossary_id,
                          translation_context=query_config.translation_context
                      )
                      translation_prompts = query_config.translation_prompts
              except Exception as e:
                  logger.warning(f"Failed to initialize translator, continuing without full translation setup: {e}")

          # Load category ID to name mapping
          self.category_id_to_name = self._load_category_mapping()

          # Load tenant config
          tenant_config_loader = get_tenant_config_loader()
          tenant_config = tenant_config_loader.get_tenant_config(tenant_id)

          # Initialize document transformer
          self.document_transformer = SPUDocumentTransformer(
              category_id_to_name=self.category_id_to_name,
              searchable_option_dimensions=self.searchable_option_dimensions,
              tenant_config=tenant_config,
              translator=translator,
              translation_prompts=translation_prompts
          )
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
76
  
c973d288   tangwang   1. 类目字段处理
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
      def _load_category_mapping(self) -> Dict[str, str]:
          """
          Build the category ID -> category name lookup from the SPU table.

          Reads the distinct (category_id, category) pairs of all non-deleted
          SPU rows that have a category_id. IDs are normalized to the string
          form of their integer value. Rows whose name is empty or only
          whitespace are skipped with a warning.

          Returns:
              Dictionary mapping category_id (as string) to category name
          """
          query = text("""
              SELECT DISTINCT
                  category_id,
                  category
              FROM shoplazza_product_spu
              WHERE deleted = 0 AND category_id IS NOT NULL
          """)

          id_to_name = {}
          with self.db_engine.connect() as conn:
              for record in conn.execute(query):
                  category_id = str(int(record.category_id))
                  name = record.category

                  if name and name.strip():
                      id_to_name[category_id] = name
                  else:
                      logger.warning(f"Category ID {category_id} has empty name, skipping")

          logger.info(f"Loaded {len(id_to_name)} category ID to name mappings")

          # Dump every mapping at debug level, ordered by ID string
          if id_to_name:
              logger.debug("Category ID mappings:")
              for category_id in sorted(id_to_name):
                  logger.debug(f"  {category_id} -> {id_to_name[category_id]}")

          return id_to_name
      
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
115
116
117
118
119
120
121
122
123
      def load_spu_data(self) -> pd.DataFrame:
          """
          Load SPU data from MySQL.

          Selects all non-deleted rows of shoplazza_product_spu for this
          transformer's tenant and logs simple coverage statistics (title,
          category_path, category). When no rows are found a warning is logged;
          the extra diagnostic queries are only executed when DEBUG logging is
          enabled, because their results are only ever logged at DEBUG level.

          Returns:
              DataFrame with SPU data (empty when the tenant has no active SPUs)
          """
          query = text("""
              SELECT
                  id, shop_id, shoplazza_id, title, brief, description,
                  spu, vendor, vendor_url,
                  image_src, image_width, image_height, image_path, image_alt,
                  tags, note, category, category_id, category_google_id,
                  category_level, category_path,
                  fake_sales, display_fake_sales,
                  tenant_id, creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id AND deleted = 0
          """)

          with self.db_engine.connect() as conn:
              df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

          total = len(df)
          logger.info(f"Loaded {total} SPU records for tenant_id={self.tenant_id}")

          if total == 0:
              logger.warning(f"No SPU data found for tenant_id={self.tenant_id}")
              # The diagnostics scan the whole table; skip them unless their
              # DEBUG output would actually be emitted.
              if logger.isEnabledFor(logging.DEBUG):
                  self._log_missing_tenant_diagnostics()
              return df

          # Statistics
          has_category_path = df['category_path'].notna().sum()
          has_category = df['category'].notna().sum()
          has_title = df['title'].notna().sum()

          logger.info("SPU data statistics:")
          logger.info(f"  - Has title: {has_title}/{total} ({100*has_title/total:.1f}%)")
          logger.info(f"  - Has category_path: {has_category_path}/{total} ({100*has_category_path/total:.1f}%)")
          logger.info(f"  - Has category: {has_category}/{total} ({100*has_category/total:.1f}%)")

          # Warn if too many SPUs don't have category_path
          if has_category_path < total * 0.5:
              logger.warning(f"Only {100*has_category_path/total:.1f}% of SPUs have category_path, data quality may be low")

          return df

      def _log_missing_tenant_diagnostics(self) -> None:
          """Log at DEBUG level why no active SPUs were found for this tenant."""
          # Row counts (total / active / deleted) for the requested tenant
          debug_query = text("""
              SELECT
                  COUNT(*) as total_count,
                  SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count,
                  SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id
          """)
          # First 10 tenant_ids present in the table, to spot a wrong tenant_id
          tenant_check_query = text("""
              SELECT tenant_id, COUNT(*) as count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active
              FROM shoplazza_product_spu
              GROUP BY tenant_id
              ORDER BY tenant_id
              LIMIT 10
          """)

          with self.db_engine.connect() as conn:
              debug_df = pd.read_sql(debug_query, conn, params={"tenant_id": self.tenant_id})
              tenant_df = pd.read_sql(tenant_check_query, conn)

          if not debug_df.empty:
              counts = debug_df.iloc[0]
              total = counts['total_count']
              active = counts['active_count']
              deleted = counts['deleted_count']
              logger.debug(f"tenant_id={self.tenant_id}: total={total}, active={active}, deleted={deleted}")

          if not tenant_df.empty:
              logger.debug("Available tenant_ids in shoplazza_product_spu:")
              for _, row in tenant_df.iterrows():
                  logger.debug(f"  tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}")
  
      def load_sku_data(self) -> pd.DataFrame:
          """
          Fetch this tenant's non-deleted SKU rows from MySQL.

          After loading, the number of rows is logged together with the
          non-null coverage of price, inventory_quantity and option1-3.
          A warning is logged when fewer than 95% of the SKUs have a price.

          Returns:
              DataFrame with SKU data
          """
          query = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  shoplazza_image_id, title, sku, barcode, position,
                  price, compare_at_price, cost_price,
                  option1, option2, option3,
                  inventory_quantity, weight, weight_unit, image_src,
                  wholesale_price, note, extend,
                  shoplazza_created_at, shoplazza_updated_at, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_sku
              WHERE tenant_id = :tenant_id AND deleted = 0
          """)

          with self.db_engine.connect() as conn:
              df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

          row_count = len(df)
          logger.info(f"Loaded {row_count} SKU records for tenant_id={self.tenant_id}")

          if row_count == 0:
              return df

          # (label used in the log line, column inspected); price must stay first
          tracked = (
              ('price', 'price'),
              ('inventory', 'inventory_quantity'),
              ('option1', 'option1'),
              ('option2', 'option2'),
              ('option3', 'option3'),
          )
          # Count every column before logging anything
          filled = [(label, df[column].notna().sum()) for label, column in tracked]

          logger.info("SKU data statistics:")
          for label, count in filled:
              logger.info(f"  - Has {label}: {count}/{row_count} ({100*count/row_count:.1f}%)")

          # Warn about data quality issues
          priced = filled[0][1]
          if priced < row_count * 0.95:
              logger.warning(f"Only {100*priced/row_count:.1f}% of SKUs have price")

          return df
  
5dcddc06   tangwang   索引重构
238
239
240
241
242
243
244
245
246
247
      def load_option_data(self) -> pd.DataFrame:
          """
          Fetch this tenant's non-deleted option rows from MySQL.

          Rows are ordered by spu_id and position. After loading, the row count,
          the number of distinct SPUs with options and the non-null coverage
          of the name column are logged. A warning is logged when any row has no name.

          Returns:
              DataFrame with option data (name, position for each SPU)
          """
          query = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  position, name, `values`, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_option
              WHERE tenant_id = :tenant_id AND deleted = 0
              ORDER BY spu_id, position
          """)

          with self.db_engine.connect() as conn:
              df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

          row_count = len(df)
          logger.info(f"Loaded {row_count} option records for tenant_id={self.tenant_id}")

          if row_count == 0:
              return df

          # Statistics
          spu_count = df['spu_id'].nunique()
          named = df['name'].notna().sum()

          logger.info("Option data statistics:")
          logger.info(f"  - Unique SPUs with options: {spu_count}")
          logger.info(f"  - Has name: {named}/{row_count} ({100*named/row_count:.1f}%)")

          # Warn about missing option names
          unnamed = row_count - named
          if unnamed > 0:
              logger.warning(f"{unnamed} option records are missing names")

          return df
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
276
277
278
279
280
281
282
      def transform_batch(self) -> List[Dict[str, Any]]:
          """
          Build the ES documents for every SPU of this tenant.

          Loads SPU, SKU and option data, groups SKUs and options by spu_id and
          passes each SPU row with its SKUs and options to the document
          transformer. An SPU whose transformation returns a falsy value is
          counted as skipped. An SPU whose transformation raises is logged and
          counted as an error, and processing continues with the next SPU.

          Returns:
              List of SPU-level ES documents (empty when there is no SPU data)
          """
          logger.info(f"Starting data transformation for tenant_id={self.tenant_id}")

          # Load data
          spu_df = self.load_spu_data()
          sku_df = self.load_sku_data()
          option_df = self.load_option_data()

          if spu_df.empty:
              logger.warning("No SPU data to transform")
              return []

          # Group SKUs by SPU
          sku_groups = sku_df.groupby('spu_id')
          logger.info(f"Grouped SKUs into {len(sku_groups)} SPU groups")

          # Group options by SPU (None when there are no option rows at all)
          if option_df.empty:
              option_groups = None
          else:
              option_groups = option_df.groupby('spu_id')
          if option_groups:
              logger.info(f"Grouped options into {len(option_groups)} SPU groups")

          docs = []
          skipped = 0
          failed = 0

          for _, spu_row in spu_df.iterrows():
              spu_id = spu_row['id']

              try:
                  # SKUs of this SPU, or an empty frame when it has none
                  if spu_id in sku_groups.groups:
                      skus = sku_groups.get_group(spu_id)
                  else:
                      skus = pd.DataFrame()

                  # Options of this SPU, or an empty frame when it has none
                  if option_groups and spu_id in option_groups.groups:
                      options = option_groups.get_group(spu_id)
                  else:
                      options = pd.DataFrame()

                  if skus.empty:
                      logger.warning(f"SPU {spu_id} (title: {spu_row.get('title', 'N/A')}) has no SKUs")

                  # Transform to ES document
                  doc = self.document_transformer.transform_spu_to_doc(
                      tenant_id=self.tenant_id,
                      spu_row=spu_row,
                      skus=skus,
                      options=options
                  )
                  if not doc:
                      skipped += 1
                      logger.warning(f"SPU {spu_id} transformation returned None, skipped")
                  else:
                      docs.append(doc)
              except Exception as e:
                  failed += 1
                  logger.error(f"Error transforming SPU {spu_id}: {e}", exc_info=True)

          logger.info("Transformation complete:")
          logger.info(f"  - Total SPUs: {len(spu_df)}")
          logger.info(f"  - Successfully transformed: {len(docs)}")
          logger.info(f"  - Skipped: {skipped}")
          logger.info(f"  - Errors: {failed}")

          return docs
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...