Blame view

indexer/spu_transformer.py 26.9 KB
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
1
2
3
  """
  SPU data transformer for Shoplazza products.
  
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
4
  Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus.
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
5
6
7
8
  """
  
  import pandas as pd
  import numpy as np
c973d288   tangwang   1. 类目字段处理
9
  import logging
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
10
11
12
  from typing import Dict, Any, List, Optional
  from sqlalchemy import create_engine, text
  from utils.db_connector import create_db_connection
33839b37   tangwang   属性值参与搜索:
13
  from config import ConfigLoader
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
14
  
c973d288   tangwang   1. 类目字段处理
15
16
17
  # Configure logger
  logger = logging.getLogger(__name__)
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
  
  class SPUTransformer:
      """Transform SPU and SKU data into SPU-level ES documents."""
  
      def __init__(
          self,
          db_engine: Any,
          tenant_id: str
      ):
          """
          Initialize SPU transformer.

          Besides storing the arguments, this loads the list of searchable
          option dimensions from configuration (falling back to all three
          option columns when the configuration cannot be loaded) and builds
          the category ID -> name mapping via _load_category_mapping().

          Args:
              db_engine: SQLAlchemy database engine
              tenant_id: Tenant ID for filtering data
          """
          self.db_engine = db_engine
          self.tenant_id = tenant_id

          # Load configuration to get searchable_option_dimensions
          try:
              config = ConfigLoader().load_config()
              self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions
          except Exception as e:
              # Deliberately best-effort: any configuration problem falls back
              # to the default dimensions. Reported through the module logger
              # (not print) so it lands in the same place as every other
              # message emitted by this class.
              logger.warning(f"Failed to load config, using default searchable_option_dimensions: {e}")
              self.searchable_option_dimensions = ['option1', 'option2', 'option3']

          # Load category ID to name mapping
          self.category_id_to_name = self._load_category_mapping()
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
48
  
c973d288   tangwang   1. 类目字段处理
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
      def _load_category_mapping(self) -> Dict[str, str]:
          """
          Build the category ID -> category name lookup from the SPU table.

          Reads every distinct (category_id, category) pair from non-deleted
          rows of shoplazza_product_spu. The query is not restricted to
          self.tenant_id. Pairs whose name is empty or whitespace-only are
          skipped with a warning; if one ID appears with several names, the
          last row read wins.

          Returns:
              Dictionary keyed by the category ID rendered as an integer
              string, mapping to the category name as stored in the table
          """
          sql = text("""
              SELECT DISTINCT
                  category_id,
                  category
              FROM shoplazza_product_spu
              WHERE deleted = 0 AND category_id IS NOT NULL
          """)

          id_to_name = {}
          with self.db_engine.connect() as conn:
              for record in conn.execute(sql):
                  # Normalise the ID through int() so that e.g. 12 and 12.0
                  # produce the same "12" key.
                  cid = str(int(record.category_id))
                  name = record.category

                  if name and name.strip():
                      id_to_name[cid] = name
                  else:
                      logger.warning(f"Category ID {cid} has empty name, skipping")

          logger.info(f"Loaded {len(id_to_name)} category ID to name mappings")

          # Dump the full mapping at DEBUG level to help diagnose lookups
          if id_to_name:
              logger.debug("Category ID mappings:")
              for cid in sorted(id_to_name):
                  logger.debug(f"  {cid} -> {id_to_name[cid]}")

          return id_to_name
      
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
87
88
89
90
91
92
93
94
95
      def load_spu_data(self) -> pd.DataFrame:
          """
          Load SPU data from MySQL.

          Selects the non-deleted rows of shoplazza_product_spu for
          self.tenant_id and logs basic coverage statistics. When no rows are
          found a warning is logged and, only if DEBUG logging is enabled,
          additional diagnostic queries are run (see _log_spu_diagnostics).

          Returns:
              DataFrame with SPU data (empty when the tenant has no active SPUs)
          """
          query = text("""
              SELECT 
                  id, shop_id, shoplazza_id, title, brief, description,
                  spu, vendor, vendor_url,
                  image_src, image_width, image_height, image_path, image_alt,
                  tags, note, category, category_id, category_google_id,
                  category_level, category_path,
                  fake_sales, display_fake_sales,
                  tenant_id, creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id AND deleted = 0
          """)

          with self.db_engine.connect() as conn:
              df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

          total = len(df)
          logger.info(f"Loaded {total} SPU records for tenant_id={self.tenant_id}")

          if total == 0:
              logger.warning(f"No SPU data found for tenant_id={self.tenant_id}")
              # The diagnostics only ever produce DEBUG output, so skip the
              # extra queries (one of which scans all tenants) unless that
              # output will actually be emitted.
              if logger.isEnabledFor(logging.DEBUG):
                  self._log_spu_diagnostics()
              return df

          # Statistics
          has_category_path = df['category_path'].notna().sum()
          has_category = df['category'].notna().sum()
          has_title = df['title'].notna().sum()

          logger.info(f"SPU data statistics:")
          logger.info(f"  - Has title: {has_title}/{total} ({100*has_title/total:.1f}%)")
          logger.info(f"  - Has category_path: {has_category_path}/{total} ({100*has_category_path/total:.1f}%)")
          logger.info(f"  - Has category: {has_category}/{total} ({100*has_category/total:.1f}%)")

          # Warn if too many SPUs don't have category_path
          if has_category_path < total * 0.5:
              logger.warning(f"Only {100*has_category_path/total:.1f}% of SPUs have category_path, data quality may be low")

          return df

      def _log_spu_diagnostics(self) -> None:
          """
          Log DEBUG-level hints explaining why no active SPU rows were found.

          Reports total/active/deleted row counts for self.tenant_id and the
          first 10 tenant_ids present in shoplazza_product_spu. This is purely
          diagnostic: a failing query is logged and swallowed so it can never
          turn an empty (but successful) load into an error.
          """
          debug_query = text("""
              SELECT 
                  COUNT(*) as total_count,
                  SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count,
                  SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id
          """)
          tenant_check_query = text("""
              SELECT tenant_id, COUNT(*) as count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active
              FROM shoplazza_product_spu
              GROUP BY tenant_id
              ORDER BY tenant_id
              LIMIT 10
          """)

          try:
              with self.db_engine.connect() as conn:
                  debug_df = pd.read_sql(debug_query, conn, params={"tenant_id": self.tenant_id})
                  tenant_df = pd.read_sql(tenant_check_query, conn)
          except Exception as e:
              logger.debug(f"SPU diagnostic queries failed: {e}", exc_info=True)
              return

          # Does this tenant have rows at all (e.g. everything soft-deleted)?
          if not debug_df.empty:
              counts = debug_df.iloc[0]
              logger.debug(
                  f"tenant_id={self.tenant_id}: total={counts['total_count']}, "
                  f"active={counts['active_count']}, deleted={counts['deleted_count']}"
              )

          # Which tenant_ids exist in the table (helps spot a wrong tenant_id)
          if not tenant_df.empty:
              logger.debug(f"Available tenant_ids in shoplazza_product_spu:")
              for _, row in tenant_df.iterrows():
                  logger.debug(f"  tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}")
  
      def load_sku_data(self) -> pd.DataFrame:
          """
          Fetch the tenant's non-deleted SKU rows from MySQL.

          After loading, logs how many rows have a value in the price,
          inventory and option1-3 columns, and warns when fewer than 95% of
          the rows carry a price.

          Returns:
              DataFrame with one row per SKU of self.tenant_id
          """
          sku_sql = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  shoplazza_image_id, title, sku, barcode, position,
                  price, compare_at_price, cost_price,
                  option1, option2, option3,
                  inventory_quantity, weight, weight_unit, image_src,
                  wholesale_price, note, extend,
                  shoplazza_created_at, shoplazza_updated_at, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_sku
              WHERE tenant_id = :tenant_id AND deleted = 0
          """)

          with self.db_engine.connect() as conn:
              df = pd.read_sql(sku_sql, conn, params={"tenant_id": self.tenant_id})

          row_count = len(df)
          logger.info(f"Loaded {row_count} SKU records for tenant_id={self.tenant_id}")

          # Nothing to summarise for an empty result
          if row_count == 0:
              return df

          # (label used in the log line, column inspected), in reporting order
          tracked_columns = (
              ('price', 'price'),
              ('inventory', 'inventory_quantity'),
              ('option1', 'option1'),
              ('option2', 'option2'),
              ('option3', 'option3'),
          )
          filled_counts = {
              label: df[column].notna().sum()
              for label, column in tracked_columns
          }

          logger.info(f"SKU data statistics:")
          for label, filled in filled_counts.items():
              logger.info(f"  - Has {label}: {filled}/{row_count} ({100*filled/row_count:.1f}%)")

          # Warn about data quality issues
          priced = filled_counts['price']
          if priced < row_count * 0.95:
              logger.warning(f"Only {100*priced/row_count:.1f}% of SKUs have price")

          return df
  
5dcddc06   tangwang   索引重构
210
211
212
213
214
215
216
217
218
219
      def load_option_data(self) -> pd.DataFrame:
          """
          Fetch the tenant's non-deleted option definitions from MySQL.

          Rows come back ordered by spu_id and position. After loading, logs
          how many distinct SPUs have options and how many rows have a name,
          warning when any row lacks one.

          Returns:
              DataFrame with option data (name, position for each SPU)
          """
          option_sql = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  position, name, `values`, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_option
              WHERE tenant_id = :tenant_id AND deleted = 0
              ORDER BY spu_id, position
          """)

          with self.db_engine.connect() as conn:
              df = pd.read_sql(option_sql, conn, params={"tenant_id": self.tenant_id})

          row_count = len(df)
          logger.info(f"Loaded {row_count} option records for tenant_id={self.tenant_id}")

          # Nothing to summarise for an empty result
          if row_count == 0:
              return df

          spu_count = df['spu_id'].nunique()
          named = df['name'].notna().sum()

          logger.info(f"Option data statistics:")
          logger.info(f"  - Unique SPUs with options: {spu_count}")
          logger.info(f"  - Has name: {named}/{row_count} ({100*named/row_count:.1f}%)")

          # Warn about missing option names
          missing = row_count - named
          if missing > 0:
              logger.warning(f"{missing} option records are missing names")

          return df
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
248
249
250
251
252
253
254
      def transform_batch(self) -> List[Dict[str, Any]]:
          """
          Transform SPU and SKU data into ES documents.

          Loads the tenant's SPU rows and, only if there are any, its SKU and
          option rows; then builds one document per SPU with
          _transform_spu_to_doc(). An SPU whose transformation raises is
          logged and counted as an error without aborting the batch.

          Returns:
              List of SPU-level ES documents (empty when the tenant has no SPUs)
          """
          logger.info(f"Starting data transformation for tenant_id={self.tenant_id}")

          # Load SPUs first: without them there is nothing to attach SKUs or
          # options to, so the other two queries can be skipped entirely.
          spu_df = self.load_spu_data()
          if spu_df.empty:
              logger.warning("No SPU data to transform")
              return []

          sku_df = self.load_sku_data()
          option_df = self.load_option_data()

          # Group SKUs by SPU; keep the key lookup in a local for the loop
          sku_groups = sku_df.groupby('spu_id')
          sku_keys = sku_groups.groups
          logger.info(f"Grouped SKUs into {len(sku_keys)} SPU groups")

          # Group options by SPU (explicit None marks "no option rows at all")
          option_groups = option_df.groupby('spu_id') if not option_df.empty else None
          option_keys = option_groups.groups if option_groups is not None else {}
          if option_groups is not None:
              logger.info(f"Grouped options into {len(option_keys)} SPU groups")

          documents = []
          skipped_count = 0
          error_count = 0

          for _, spu_row in spu_df.iterrows():
              spu_id = spu_row['id']

              try:
                  # SKUs / options belonging to this SPU (empty frame if none)
                  skus = sku_groups.get_group(spu_id) if spu_id in sku_keys else pd.DataFrame()
                  options = option_groups.get_group(spu_id) if spu_id in option_keys else pd.DataFrame()

                  # Warn if SPU has no SKUs
                  if skus.empty:
                      logger.warning(f"SPU {spu_id} (title: {spu_row.get('title', 'N/A')}) has no SKUs")

                  # Transform to ES document
                  doc = self._transform_spu_to_doc(spu_row, skus, options)
                  if doc:
                      documents.append(doc)
                  else:
                      skipped_count += 1
                      logger.warning(f"SPU {spu_id} transformation returned None, skipped")
              except Exception as e:
                  # Per-SPU boundary: one bad record must not fail the batch
                  error_count += 1
                  logger.error(f"Error transforming SPU {spu_id}: {e}", exc_info=True)

          logger.info(f"Transformation complete:")
          logger.info(f"  - Total SPUs: {len(spu_df)}")
          logger.info(f"  - Successfully transformed: {len(documents)}")
          logger.info(f"  - Skipped: {skipped_count}")
          logger.info(f"  - Errors: {error_count}")

          return documents
  
      def _transform_spu_to_doc(
          self,
          spu_row: pd.Series,
5dcddc06   tangwang   索引重构
315
316
          skus: pd.DataFrame,
          options: pd.DataFrame
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
317
318
319
320
321
322
323
      ) -> Optional[Dict[str, Any]]:
          """
          Transform a single SPU row and its SKUs into an ES document.
  
          Args:
              spu_row: SPU row from database
              skus: DataFrame with SKUs for this SPU
5dcddc06   tangwang   索引重构
324
              options: DataFrame with options for this SPU
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
325
326
327
328
329
330
331
332
333
  
          Returns:
              ES document or None if transformation fails
          """
          doc = {}
  
          # Tenant ID (required)
          doc['tenant_id'] = str(self.tenant_id)
  
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
334
          # SPU ID
c973d288   tangwang   1. 类目字段处理
335
336
337
338
339
340
          spu_id = spu_row['id']
          doc['spu_id'] = str(spu_id)
          
          # Validate required fields
          if pd.isna(spu_row.get('title')) or not str(spu_row['title']).strip():
              logger.error(f"SPU {spu_id} has no title, this may cause search issues")
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
341
  
5dcddc06   tangwang   索引重构
342
          # 文本相关性相关字段(中英文双语,暂时只填充中文)
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
343
          if pd.notna(spu_row.get('title')):
5dcddc06   tangwang   索引重构
344
345
              doc['title_zh'] = str(spu_row['title'])
          doc['title_en'] = None  # 暂时设为空
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
346
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
347
          if pd.notna(spu_row.get('brief')):
5dcddc06   tangwang   索引重构
348
349
              doc['brief_zh'] = str(spu_row['brief'])
          doc['brief_en'] = None
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
350
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
351
          if pd.notna(spu_row.get('description')):
5dcddc06   tangwang   索引重构
352
353
              doc['description_zh'] = str(spu_row['description'])
          doc['description_en'] = None
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
354
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
355
          if pd.notna(spu_row.get('vendor')):
5dcddc06   tangwang   索引重构
356
357
              doc['vendor_zh'] = str(spu_row['vendor'])
          doc['vendor_en'] = None
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
358
359
360
  
          # Tags
          if pd.notna(spu_row.get('tags')):
5dcddc06   tangwang   索引重构
361
362
363
364
365
366
367
              # Tags是逗号分隔的字符串,需要转换为数组
              tags_str = str(spu_row['tags'])
              doc['tags'] = [tag.strip() for tag in tags_str.split(',') if tag.strip()]
  
          # Category相关字段
          if pd.notna(spu_row.get('category_path')):
              category_path = str(spu_row['category_path'])
5dcddc06   tangwang   索引重构
368
              
c973d288   tangwang   1. 类目字段处理
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
              # 解析category_path - 这是逗号分隔的类目ID列表
              category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()]
              
              # 将ID映射为名称
              category_names = []
              missing_category_ids = []
              for cid in category_ids:
                  if cid in self.category_id_to_name:
                      category_names.append(self.category_id_to_name[cid])
                  else:
                      # 如果找不到映射,记录错误并使用ID作为备选
                      logger.error(f"Category ID {cid} not found in mapping for SPU {spu_row['id']} (title: {spu_row.get('title', 'N/A')}), category_path={category_path}")
                      missing_category_ids.append(cid)
                      category_names.append(cid)  # 使用ID作为备选
              
              # 构建类目路径字符串(用于搜索)
              if category_names:
                  category_path_str = '/'.join(category_names)
                  doc['category_path_zh'] = category_path_str
                  doc['category_path_en'] = None  # 暂时设为空
                  
                  # 填充分层类目名称
                  if len(category_names) > 0:
                      doc['category1_name'] = category_names[0]
                  if len(category_names) > 1:
                      doc['category2_name'] = category_names[1]
                  if len(category_names) > 2:
                      doc['category3_name'] = category_names[2]
a10a89a3   tangwang   构造测试数据用于测试分类 和 三种...
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
          elif pd.notna(spu_row.get('category')):
              # 如果category_path为空,使用category字段作为category1_name的备选
              category = str(spu_row['category'])
              doc['category_name_zh'] = category
              doc['category_name_en'] = None
              doc['category_name'] = category
              
              # 尝试从category字段解析多级分类
              if '/' in category:
                  path_parts = category.split('/')
                  if len(path_parts) > 0:
                      doc['category1_name'] = path_parts[0].strip()
                  if len(path_parts) > 1:
                      doc['category2_name'] = path_parts[1].strip()
                  if len(path_parts) > 2:
                      doc['category3_name'] = path_parts[2].strip()
              else:
                  # 如果category不包含"/",直接作为category1_name
                  doc['category1_name'] = category.strip()
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
416
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
417
          if pd.notna(spu_row.get('category')):
a10a89a3   tangwang   构造测试数据用于测试分类 和 三种...
418
              # 确保category相关字段都被设置(如果前面没有设置)
5dcddc06   tangwang   索引重构
419
              category_name = str(spu_row['category'])
a10a89a3   tangwang   构造测试数据用于测试分类 和 三种...
420
421
422
423
424
425
              if 'category_name_zh' not in doc:
                  doc['category_name_zh'] = category_name
              if 'category_name_en' not in doc:
                  doc['category_name_en'] = None
              if 'category_name' not in doc:
                  doc['category_name'] = category_name
5dcddc06   tangwang   索引重构
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
  
          if pd.notna(spu_row.get('category_id')):
              doc['category_id'] = str(int(spu_row['category_id']))
  
          if pd.notna(spu_row.get('category_level')):
              doc['category_level'] = int(spu_row['category_level'])
  
          # Option名称(从option表获取)
          if not options.empty:
              # 按position排序获取option名称
              sorted_options = options.sort_values('position')
              if len(sorted_options) > 0 and pd.notna(sorted_options.iloc[0].get('name')):
                  doc['option1_name'] = str(sorted_options.iloc[0]['name'])
              if len(sorted_options) > 1 and pd.notna(sorted_options.iloc[1].get('name')):
                  doc['option2_name'] = str(sorted_options.iloc[1]['name'])
              if len(sorted_options) > 2 and pd.notna(sorted_options.iloc[2].get('name')):
                  doc['option3_name'] = str(sorted_options.iloc[2]['name'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
443
444
445
446
447
448
449
450
  
          # Image URL
          if pd.notna(spu_row.get('image_src')):
              image_src = str(spu_row['image_src'])
              if not image_src.startswith('http'):
                  image_src = f"//{image_src}" if image_src.startswith('//') else image_src
              doc['image_url'] = image_src
  
13320ac6   tangwang   分面接口修改:
451
452
453
454
455
456
457
458
459
          # Sales (fake_sales)
          if pd.notna(spu_row.get('fake_sales')):
              try:
                  doc['sales'] = int(spu_row['fake_sales'])
              except (ValueError, TypeError):
                  doc['sales'] = 0
          else:
              doc['sales'] = 0
  
5dcddc06   tangwang   索引重构
460
          # Process SKUs and build specifications
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
461
          skus_list = []
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
462
463
          prices = []
          compare_prices = []
5dcddc06   tangwang   索引重构
464
465
466
467
468
469
470
471
472
473
474
475
476
477
          sku_prices = []
          sku_weights = []
          sku_weight_units = []
          total_inventory = 0
          specifications = []
  
          # 构建option名称映射(position -> name)
          option_name_map = {}
          if not options.empty:
              for _, opt_row in options.iterrows():
                  position = opt_row.get('position')
                  name = opt_row.get('name')
                  if pd.notna(position) and pd.notna(name):
                      option_name_map[int(position)] = str(name)
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
478
479
  
          for _, sku_row in skus.iterrows():
5dcddc06   tangwang   索引重构
480
              sku_data = self._transform_sku_row(sku_row, option_name_map)
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
481
482
              if sku_data:
                  skus_list.append(sku_data)
5dcddc06   tangwang   索引重构
483
484
                  
                  # 收集价格信息
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
485
                  if 'price' in sku_data and sku_data['price'] is not None:
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
486
                      try:
5dcddc06   tangwang   索引重构
487
488
489
                          price_val = float(sku_data['price'])
                          prices.append(price_val)
                          sku_prices.append(price_val)
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
490
491
                      except (ValueError, TypeError):
                          pass
5dcddc06   tangwang   索引重构
492
                  
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
493
                  if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None:
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
494
                      try:
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
495
                          compare_prices.append(float(sku_data['compare_at_price']))
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
496
497
                      except (ValueError, TypeError):
                          pass
5dcddc06   tangwang   索引重构
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
                  
                  # 收集重量信息
                  if 'weight' in sku_data and sku_data['weight'] is not None:
                      try:
                          sku_weights.append(int(float(sku_data['weight'])))
                      except (ValueError, TypeError):
                          pass
                  
                  if 'weight_unit' in sku_data and sku_data['weight_unit']:
                      sku_weight_units.append(str(sku_data['weight_unit']))
                  
                  # 收集库存信息
                  if 'stock' in sku_data and sku_data['stock'] is not None:
                      try:
                          total_inventory += int(sku_data['stock'])
                      except (ValueError, TypeError):
                          pass
                  
                  # 构建specifications(从SKU的option值和option表的name)
                  sku_id = str(sku_row['id'])
                  if pd.notna(sku_row.get('option1')) and 1 in option_name_map:
                      specifications.append({
                          'sku_id': sku_id,
                          'name': option_name_map[1],
                          'value': str(sku_row['option1'])
                      })
                  if pd.notna(sku_row.get('option2')) and 2 in option_name_map:
                      specifications.append({
                          'sku_id': sku_id,
                          'name': option_name_map[2],
                          'value': str(sku_row['option2'])
                      })
                  if pd.notna(sku_row.get('option3')) and 3 in option_name_map:
                      specifications.append({
                          'sku_id': sku_id,
                          'name': option_name_map[3],
                          'value': str(sku_row['option3'])
                      })
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
536
  
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
537
          doc['skus'] = skus_list
5dcddc06   tangwang   索引重构
538
          doc['specifications'] = specifications
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
539
  
33839b37   tangwang   属性值参与搜索:
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
          # 提取option值(根据配置的searchable_option_dimensions)
          # 从子SKU的option1_value, option2_value, option3_value中提取去重后的值
          option1_values = []
          option2_values = []
          option3_values = []
          
          for _, sku_row in skus.iterrows():
              if pd.notna(sku_row.get('option1')):
                  option1_values.append(str(sku_row['option1']))
              if pd.notna(sku_row.get('option2')):
                  option2_values.append(str(sku_row['option2']))
              if pd.notna(sku_row.get('option3')):
                  option3_values.append(str(sku_row['option3']))
          
          # 去重并根据配置决定是否写入索引
          if 'option1' in self.searchable_option_dimensions:
              doc['option1_values'] = list(set(option1_values)) if option1_values else []
          else:
              doc['option1_values'] = []
          
          if 'option2' in self.searchable_option_dimensions:
              doc['option2_values'] = list(set(option2_values)) if option2_values else []
          else:
              doc['option2_values'] = []
          
          if 'option3' in self.searchable_option_dimensions:
              doc['option3_values'] = list(set(option3_values)) if option3_values else []
          else:
              doc['option3_values'] = []
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
570
571
572
573
574
575
576
577
578
579
580
581
582
          # Calculate price ranges
          if prices:
              doc['min_price'] = float(min(prices))
              doc['max_price'] = float(max(prices))
          else:
              doc['min_price'] = 0.0
              doc['max_price'] = 0.0
  
          if compare_prices:
              doc['compare_at_price'] = float(max(compare_prices))
          else:
              doc['compare_at_price'] = None
  
5dcddc06   tangwang   索引重构
583
584
585
586
587
588
589
590
591
592
593
594
595
          # SKU扁平化字段
          doc['sku_prices'] = sku_prices
          doc['sku_weights'] = sku_weights
          doc['sku_weight_units'] = list(set(sku_weight_units))  # 去重
          doc['total_inventory'] = total_inventory
  
          # Image URL
          if pd.notna(spu_row.get('image_src')):
              image_src = str(spu_row['image_src'])
              if not image_src.startswith('http'):
                  image_src = f"//{image_src}" if image_src.startswith('//') else image_src
              doc['image_url'] = image_src
  
cd3799c6   tangwang   tenant2 1w测试数据 mo...
596
597
598
599
600
601
602
603
604
605
606
607
608
609
          # Time fields - convert datetime to ISO format string for ES DATE type
          if pd.notna(spu_row.get('create_time')):
              create_time = spu_row['create_time']
              if hasattr(create_time, 'isoformat'):
                  doc['create_time'] = create_time.isoformat()
              else:
                  doc['create_time'] = str(create_time)
          
          if pd.notna(spu_row.get('update_time')):
              update_time = spu_row['update_time']
              if hasattr(update_time, 'isoformat'):
                  doc['update_time'] = update_time.isoformat()
              else:
                  doc['update_time'] = str(update_time)
cd3799c6   tangwang   tenant2 1w测试数据 mo...
610
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
611
612
          return doc
  
5dcddc06   tangwang   索引重构
613
      def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Dict[int, str] = None) -> Optional[Dict[str, Any]]:
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
614
          """
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
615
          Transform a SKU row into a SKU object.
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
616
617
618
  
          Args:
              sku_row: SKU row from database
5dcddc06   tangwang   索引重构
619
              option_name_map: Mapping from position to option name
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
620
621
  
          Returns:
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
622
              SKU dictionary or None
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
623
          """
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
624
          sku_data = {}
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
625
  
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
626
627
          # SKU ID
          sku_data['sku_id'] = str(sku_row['id'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
628
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
629
630
631
          # Price
          if pd.notna(sku_row.get('price')):
              try:
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
632
                  sku_data['price'] = float(sku_row['price'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
633
              except (ValueError, TypeError):
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
634
                  sku_data['price'] = None
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
635
          else:
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
636
              sku_data['price'] = None
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
637
638
639
640
  
          # Compare at price
          if pd.notna(sku_row.get('compare_at_price')):
              try:
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
641
                  sku_data['compare_at_price'] = float(sku_row['compare_at_price'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
642
              except (ValueError, TypeError):
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
643
                  sku_data['compare_at_price'] = None
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
644
          else:
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
645
              sku_data['compare_at_price'] = None
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
646
  
5dcddc06   tangwang   索引重构
647
          # SKU Code
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
648
          if pd.notna(sku_row.get('sku')):
5dcddc06   tangwang   索引重构
649
              sku_data['sku_code'] = str(sku_row['sku'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
650
651
652
653
  
          # Stock
          if pd.notna(sku_row.get('inventory_quantity')):
              try:
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
654
                  sku_data['stock'] = int(sku_row['inventory_quantity'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
655
              except (ValueError, TypeError):
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
656
                  sku_data['stock'] = 0
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
657
          else:
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
658
              sku_data['stock'] = 0
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
659
  
5dcddc06   tangwang   索引重构
660
661
662
663
664
665
666
667
668
669
670
671
672
673
          # Weight
          if pd.notna(sku_row.get('weight')):
              try:
                  sku_data['weight'] = float(sku_row['weight'])
              except (ValueError, TypeError):
                  sku_data['weight'] = None
          else:
              sku_data['weight'] = None
  
          # Weight unit
          if pd.notna(sku_row.get('weight_unit')):
              sku_data['weight_unit'] = str(sku_row['weight_unit'])
  
          # Option values
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
674
          if pd.notna(sku_row.get('option1')):
5dcddc06   tangwang   索引重构
675
              sku_data['option1_value'] = str(sku_row['option1'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
676
          if pd.notna(sku_row.get('option2')):
5dcddc06   tangwang   索引重构
677
              sku_data['option2_value'] = str(sku_row['option2'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
678
          if pd.notna(sku_row.get('option3')):
5dcddc06   tangwang   索引重构
679
              sku_data['option3_value'] = str(sku_row['option3'])
a10a89a3   tangwang   构造测试数据用于测试分类 和 三种...
680
          
5dcddc06   tangwang   索引重构
681
682
683
          # Image src
          if pd.notna(sku_row.get('image_src')):
              sku_data['image_src'] = str(sku_row['image_src'])
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
684
  
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
685
          return sku_data
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...