Blame view

indexer/spu_transformer.py 10.7 KB
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
1
2
3
  """
  SPU data transformer for Shoplazza products.
  
cadc77b6   tangwang   索引字段名、变量名、API数据结构...
4
  Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus.
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
5
6
7
  """
  
  import pandas as pd
c973d288   tangwang   1. 类目字段处理
8
  import logging
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
9
  from typing import Dict, Any, List, Optional
3c1f8031   tangwang   api/routes/indexe...
10
11
  from sqlalchemy import text
  from indexer.indexing_utils import load_category_mapping, create_document_transformer
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
12
  
c973d288   tangwang   1. 类目字段处理
13
14
15
  # Configure logger
  logger = logging.getLogger(__name__)
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
16
17
18
19
  
  class SPUTransformer:
      """Transform SPU and SKU data into SPU-level ES documents."""
  
3c1f8031   tangwang   api/routes/indexe...
20
      def __init__(self, db_engine: Any, tenant_id: str):
          """
          Set up a transformer bound to one tenant.

          Args:
              db_engine: Engine used for all queries; it must provide ``connect()``
                  and is also passed to ``load_category_mapping``.
              tenant_id: Tenant whose rows are loaded and transformed.
          """
          self.db_engine = db_engine
          self.tenant_id = tenant_id

          # Category ID -> name mapping, loaded once and shared with the document transformer
          category_names = load_category_mapping(db_engine)
          self.category_id_to_name = category_names
          mapping_count = len(category_names)
          logger.info(f"Loaded {mapping_count} category ID to name mappings")

          # Document transformer that turns one SPU row (plus SKUs/options) into an ES document
          self.document_transformer = create_document_transformer(
              category_id_to_name=category_names,
              tenant_id=tenant_id,
          )
c973d288   tangwang   1. 类目字段处理
33
      
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
34
35
36
37
38
39
40
41
42
      def load_spu_data(self) -> pd.DataFrame:
          """
          Load the non-deleted SPU rows of this tenant from MySQL.

          Runs one parameterized query against ``shoplazza_product_spu`` filtered by
          ``self.tenant_id`` and ``deleted = 0`` and logs fill-rate statistics for
          ``title``, ``category_path`` and ``category``. When no rows come back, a
          warning is logged and, only if DEBUG logging is enabled, additional
          diagnostic queries are executed via ``_log_missing_spu_diagnostics``.

          Returns:
              DataFrame with SPU data (empty when the tenant has no active SPUs)
          """
          query = text("""
              SELECT 
                  id, shop_id, shoplazza_id, title, brief, description,
                  spu, vendor, vendor_url,
                  image_src, image_width, image_height, image_path, image_alt,
                  tags, note, category, category_id, category_google_id,
                  category_level, category_path,
                  fake_sales, display_fake_sales,
                  tenant_id, creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id AND deleted = 0
          """)

          with self.db_engine.connect() as conn:
              df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

          total = len(df)
          logger.info(f"Loaded {total} SPU records for tenant_id={self.tenant_id}")

          if total == 0:
              logger.warning(f"No SPU data found for tenant_id={self.tenant_id}")
              # The diagnostics only feed logger.debug(); skip their queries
              # (one is a GROUP BY over the whole table) when DEBUG is disabled.
              if logger.isEnabledFor(logging.DEBUG):
                  self._log_missing_spu_diagnostics()
              return df

          # Statistics
          has_category_path = df['category_path'].notna().sum()
          has_category = df['category'].notna().sum()
          has_title = df['title'].notna().sum()

          logger.info("SPU data statistics:")
          logger.info(f"  - Has title: {has_title}/{total} ({100*has_title/total:.1f}%)")
          logger.info(f"  - Has category_path: {has_category_path}/{total} ({100*has_category_path/total:.1f}%)")
          logger.info(f"  - Has category: {has_category}/{total} ({100*has_category/total:.1f}%)")

          # Warn if fewer than half of the SPUs have category_path
          if has_category_path < total * 0.5:
              logger.warning(f"Only {100*has_category_path/total:.1f}% of SPUs have category_path, data quality may be low")

          return df

      def _log_missing_spu_diagnostics(self) -> None:
          """Log, at DEBUG level, row counts that help explain an empty SPU load."""
          # Total / active / deleted row counts for this tenant_id
          debug_query = text("""
              SELECT 
                  COUNT(*) as total_count,
                  SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count,
                  SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id
          """)
          with self.db_engine.connect() as conn:
              debug_df = pd.read_sql(debug_query, conn, params={"tenant_id": self.tenant_id})
          if not debug_df.empty:
              counts = debug_df.iloc[0]
              total = counts['total_count']
              active = counts['active_count']
              deleted = counts['deleted_count']
              logger.debug(f"tenant_id={self.tenant_id}: total={total}, active={active}, deleted={deleted}")

          # Check what tenant_ids exist in the table (first 10, ordered by tenant_id)
          tenant_check_query = text("""
              SELECT tenant_id, COUNT(*) as count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active
              FROM shoplazza_product_spu
              GROUP BY tenant_id
              ORDER BY tenant_id
              LIMIT 10
          """)
          with self.db_engine.connect() as conn:
              tenant_df = pd.read_sql(tenant_check_query, conn)
          if not tenant_df.empty:
              logger.debug("Available tenant_ids in shoplazza_product_spu:")
              for _, row in tenant_df.iterrows():
                  logger.debug(f"  tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}")
  
      def load_sku_data(self) -> pd.DataFrame:
          """
          Fetch the non-deleted SKU rows of this tenant from MySQL.

          Queries ``shoplazza_product_sku`` for ``self.tenant_id`` with
          ``deleted = 0`` and logs how many rows have price, inventory and
          option values filled in.

          Returns:
              DataFrame with SKU data
          """
          sku_sql = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  shoplazza_image_id, title, sku, barcode, position,
                  price, compare_at_price, cost_price,
                  option1, option2, option3,
                  inventory_quantity, weight, weight_unit, image_src,
                  wholesale_price, note, extend,
                  shoplazza_created_at, shoplazza_updated_at, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_sku
              WHERE tenant_id = :tenant_id AND deleted = 0
          """)

          with self.db_engine.connect() as connection:
              sku_frame = pd.read_sql(sku_sql, connection, params={"tenant_id": self.tenant_id})

          row_count = len(sku_frame)
          logger.info(f"Loaded {row_count} SKU records for tenant_id={self.tenant_id}")

          # Nothing to summarize for an empty result
          if row_count == 0:
              return sku_frame

          # (label used in the log line, column whose non-null values are counted)
          tracked_columns = (
              ("price", "price"),
              ("inventory", "inventory_quantity"),
              ("option1", "option1"),
              ("option2", "option2"),
              ("option3", "option3"),
          )
          # Count everything first so all statistics exist before any line is logged
          filled = {
              label: sku_frame[column].notna().sum()
              for label, column in tracked_columns
          }

          logger.info("SKU data statistics:")
          for label, count in filled.items():
              logger.info(f"  - Has {label}: {count}/{row_count} ({100*count/row_count:.1f}%)")

          # Warn when fewer than 95% of the SKUs carry a price
          priced = filled["price"]
          if priced < row_count * 0.95:
              logger.warning(f"Only {100*priced/row_count:.1f}% of SKUs have price")

          return sku_frame
  
5dcddc06   tangwang   索引重构
157
158
159
160
161
162
163
164
165
166
      def load_option_data(self) -> pd.DataFrame:
          """
          Fetch the non-deleted option rows of this tenant from MySQL.

          Queries ``shoplazza_product_option`` for ``self.tenant_id`` with
          ``deleted = 0``, ordered by ``spu_id`` and ``position``, and logs how
          many SPUs have options and how many option rows have a name.

          Returns:
              DataFrame with option data (name, position for each SPU)
          """
          option_sql = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  position, name, `values`, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_option
              WHERE tenant_id = :tenant_id AND deleted = 0
              ORDER BY spu_id, position
          """)

          with self.db_engine.connect() as connection:
              option_frame = pd.read_sql(option_sql, connection, params={"tenant_id": self.tenant_id})

          row_count = len(option_frame)
          logger.info(f"Loaded {row_count} option records for tenant_id={self.tenant_id}")

          # Nothing to summarize for an empty result
          if row_count == 0:
              return option_frame

          spu_count = option_frame['spu_id'].nunique()
          named_count = option_frame['name'].notna().sum()

          logger.info("Option data statistics:")
          logger.info(f"  - Unique SPUs with options: {spu_count}")
          logger.info(f"  - Has name: {named_count}/{row_count} ({100*named_count/row_count:.1f}%)")

          # Report how many option rows lack a name
          if named_count < row_count:
              unnamed_count = row_count - named_count
              logger.warning(f"{unnamed_count} option records are missing names")

          return option_frame
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
195
196
197
198
199
200
201
      def transform_batch(self) -> List[Dict[str, Any]]:
          """
          Build SPU-level ES documents for this tenant.

          Loads SPU, SKU and option rows, groups SKUs and options by ``spu_id``
          and passes each SPU row with its related rows to
          ``self.document_transformer.transform_spu_to_doc``. An SPU whose
          transformation returns a falsy value is counted as skipped; one whose
          transformation raises is logged with its traceback and counted as an
          error. Neither aborts the batch.

          Returns:
              List of SPU-level ES documents
          """
          logger.info(f"Starting data transformation for tenant_id={self.tenant_id}")

          # Load data
          spu_df = self.load_spu_data()
          sku_df = self.load_sku_data()
          option_df = self.load_option_data()

          if spu_df.empty:
              logger.warning("No SPU data to transform")
              return []

          # Group SKUs by SPU
          skus_by_spu = sku_df.groupby('spu_id')
          logger.info(f"Grouped SKUs into {len(skus_by_spu)} SPU groups")

          # Group options by SPU (None when there are no option rows at all)
          if option_df.empty:
              options_by_spu = None
          else:
              options_by_spu = option_df.groupby('spu_id')
          if options_by_spu:
              logger.info(f"Grouped options into {len(options_by_spu)} SPU groups")

          documents = []
          skipped_total = 0
          failed_total = 0

          for _, spu_row in spu_df.iterrows():
              spu_id = spu_row['id']

              try:
                  # SKUs belonging to this SPU, or an empty frame when it has none
                  if spu_id in skus_by_spu.groups:
                      skus = skus_by_spu.get_group(spu_id)
                  else:
                      skus = pd.DataFrame()

                  # Options belonging to this SPU, or an empty frame when it has none
                  if options_by_spu and spu_id in options_by_spu.groups:
                      options = options_by_spu.get_group(spu_id)
                  else:
                      options = pd.DataFrame()

                  if skus.empty:
                      logger.warning(f"SPU {spu_id} (title: {spu_row.get('title', 'N/A')}) has no SKUs")

                  # Transform to ES document
                  doc = self.document_transformer.transform_spu_to_doc(
                      tenant_id=self.tenant_id,
                      spu_row=spu_row,
                      skus=skus,
                      options=options,
                  )
                  if not doc:
                      skipped_total += 1
                      logger.warning(f"SPU {spu_id} transformation returned None, skipped")
                  else:
                      documents.append(doc)
              except Exception as exc:
                  # One failing SPU must not abort the batch; record it and move on
                  failed_total += 1
                  logger.error(f"Error transforming SPU {spu_id}: {exc}", exc_info=True)

          logger.info("Transformation complete:")
          logger.info(f"  - Total SPUs: {len(spu_df)}")
          logger.info(f"  - Successfully transformed: {len(documents)}")
          logger.info(f"  - Skipped: {skipped_total}")
          logger.info(f"  - Errors: {failed_total}")

          return documents
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...