1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
1
2
3
|
"""
SPU data transformer for Shoplazza products.

Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus.
"""
import pandas as pd
import numpy as np
|
c973d288
tangwang
1. 类目字段处理
|
9
|
import logging
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
10
11
12
|
from typing import Dict, Any, List, Optional
from sqlalchemy import create_engine, text
from utils.db_connector import create_db_connection
|
33839b37
tangwang
属性值参与搜索:
|
13
|
from config import ConfigLoader
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
14
|
|
c973d288
tangwang
1. 类目字段处理
|
15
16
17
|
# Module-level logger used by SPUTransformer; named after this module so its
# records can be configured through the standard logging hierarchy.
logger = logging.getLogger(__name__)
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
class SPUTransformer:
    """Transform SPU and SKU data into SPU-level ES documents."""

    def __init__(
        self,
        db_engine: Any,
        tenant_id: str
    ):
        """
        Initialize SPU transformer.

        Loads the searchable option dimensions from the project config (falling
        back to all three option columns if that fails) and the category
        ID -> name mapping from the database.

        Args:
            db_engine: SQLAlchemy database engine
            tenant_id: Tenant ID for filtering data
        """
        self.db_engine = db_engine
        self.tenant_id = tenant_id

        # Load configuration to get searchable_option_dimensions.
        try:
            config_loader = ConfigLoader()
            config = config_loader.load_config()
            self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions
        except Exception as e:
            # Best-effort fallback; reported through the module logger (instead
            # of print) so the warning reaches the configured log handlers.
            logger.warning(f"Failed to load config, using default searchable_option_dimensions: {e}")
            self.searchable_option_dimensions = ['option1', 'option2', 'option3']

        # Load category ID to name mapping (used to resolve category_path).
        self.category_id_to_name = self._load_category_mapping()

    def _load_category_mapping(self) -> Dict[str, str]:
        """
        Load category ID to name mapping from database.

        Rows whose category_id is not convertible to an integer, or whose
        category name is empty/blank, are skipped with a warning instead of
        aborting construction of the transformer.

        Returns:
            Dictionary mapping category_id (decimal string) to category_name
        """
        # NOTE(review): this query is not filtered by tenant_id, so the mapping
        # is built from every tenant's SPUs. If category IDs are tenant-scoped,
        # a later row can overwrite an earlier name for the same ID — confirm.
        query = text("""
            SELECT DISTINCT
                category_id,
                category
            FROM shoplazza_product_spu
            WHERE deleted = 0 AND category_id IS NOT NULL
        """)

        mapping: Dict[str, str] = {}
        with self.db_engine.connect() as conn:
            result = conn.execute(query)
            for row in result:
                try:
                    category_id = str(int(row.category_id))
                except (TypeError, ValueError):
                    logger.warning(f"Category ID {row.category_id!r} is not an integer, skipping")
                    continue

                category_name = row.category
                if category_name is None or not str(category_name).strip():
                    logger.warning(f"Category ID {category_id} has empty name, skipping")
                    continue

                mapping[category_id] = str(category_name)

        logger.info(f"Loaded {len(mapping)} category ID to name mappings")

        # Log all category mappings for debugging
        if mapping:
            logger.debug("Category ID mappings:")
            for cid, name in sorted(mapping.items()):
                logger.debug(f" {cid} -> {name}")

        return mapping

    def load_spu_data(self) -> pd.DataFrame:
        """
        Load SPU data from MySQL.

        Logs coverage statistics for key columns; when the tenant has no active
        SPUs, logs diagnostic row counts to help locate the data.

        Returns:
            DataFrame with SPU data (non-deleted rows of this tenant)
        """
        query = text("""
            SELECT
                id, shop_id, shoplazza_id, title, brief, description,
                spu, vendor, vendor_url,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category, category_id, category_google_id,
                category_level, category_path,
                fake_sales, display_fake_sales,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        logger.info(f"Loaded {len(df)} SPU records for tenant_id={self.tenant_id}")

        if len(df) > 0:
            total = len(df)
            has_category_path = df['category_path'].notna().sum()
            has_category = df['category'].notna().sum()
            has_title = df['title'].notna().sum()

            logger.info("SPU data statistics:")
            logger.info(f" - Has title: {has_title}/{total} ({100*has_title/total:.1f}%)")
            logger.info(f" - Has category_path: {has_category_path}/{total} ({100*has_category_path/total:.1f}%)")
            logger.info(f" - Has category: {has_category}/{total} ({100*has_category/total:.1f}%)")

            # Warn if too many SPUs don't have category_path
            if has_category_path < total * 0.5:
                logger.warning(f"Only {100*has_category_path/total:.1f}% of SPUs have category_path, data quality may be low")
        else:
            logger.warning(f"No SPU data found for tenant_id={self.tenant_id}")
            self._log_empty_tenant_diagnostics()

        return df

    def _log_empty_tenant_diagnostics(self) -> None:
        """Log SPU row counts for this tenant and a sample of tenant_ids present in the table."""
        debug_query = text("""
            SELECT
                COUNT(*) as total_count,
                SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count,
                SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id
        """)
        tenant_check_query = text("""
            SELECT tenant_id, COUNT(*) as count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active
            FROM shoplazza_product_spu
            GROUP BY tenant_id
            ORDER BY tenant_id
            LIMIT 10
        """)

        # Both diagnostic queries share one connection.
        with self.db_engine.connect() as conn:
            debug_df = pd.read_sql(debug_query, conn, params={"tenant_id": self.tenant_id})
            tenant_df = pd.read_sql(tenant_check_query, conn)

        if not debug_df.empty:
            counts = debug_df.iloc[0]
            logger.debug(
                f"tenant_id={self.tenant_id}: total={counts['total_count']}, "
                f"active={counts['active_count']}, deleted={counts['deleted_count']}"
            )

        if not tenant_df.empty:
            logger.debug("Available tenant_ids in shoplazza_product_spu:")
            for _, row in tenant_df.iterrows():
                logger.debug(f" tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}")

    def load_sku_data(self) -> pd.DataFrame:
        """
        Load SKU data from MySQL.

        Returns:
            DataFrame with SKU data (non-deleted rows of this tenant)
        """
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        logger.info(f"Loaded {len(df)} SKU records for tenant_id={self.tenant_id}")

        if len(df) > 0:
            total = len(df)
            has_price = df['price'].notna().sum()
            has_inventory = df['inventory_quantity'].notna().sum()

            logger.info("SKU data statistics:")
            logger.info(f" - Has price: {has_price}/{total} ({100*has_price/total:.1f}%)")
            logger.info(f" - Has inventory: {has_inventory}/{total} ({100*has_inventory/total:.1f}%)")
            for column in ('option1', 'option2', 'option3'):
                has_option = df[column].notna().sum()
                logger.info(f" - Has {column}: {has_option}/{total} ({100*has_option/total:.1f}%)")

            # Warn about data quality issues
            if has_price < total * 0.95:
                logger.warning(f"Only {100*has_price/total:.1f}% of SKUs have price")

        return df

    def load_option_data(self) -> pd.DataFrame:
        """
        Load option data from MySQL.

        Returns:
            DataFrame with option data (name, position for each SPU),
            ordered by spu_id and position
        """
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND deleted = 0
            ORDER BY spu_id, position
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        logger.info(f"Loaded {len(df)} option records for tenant_id={self.tenant_id}")

        if len(df) > 0:
            total = len(df)
            unique_spus_with_options = df['spu_id'].nunique()
            has_name = df['name'].notna().sum()

            logger.info("Option data statistics:")
            logger.info(f" - Unique SPUs with options: {unique_spus_with_options}")
            logger.info(f" - Has name: {has_name}/{total} ({100*has_name/total:.1f}%)")

            # Warn about missing option names
            if has_name < total:
                missing = total - has_name
                logger.warning(f"{missing} option records are missing names")

        return df

    def transform_batch(self) -> List[Dict[str, Any]]:
        """
        Transform SPU and SKU data into ES documents.

        Loads SPU, SKU and option rows for the tenant, groups SKUs and options
        by spu_id, and converts every SPU into one document. An exception
        while transforming one SPU is logged and counted; it does not abort
        the batch.

        Returns:
            List of SPU-level ES documents
        """
        logger.info(f"Starting data transformation for tenant_id={self.tenant_id}")

        # Load data
        spu_df = self.load_spu_data()
        sku_df = self.load_sku_data()
        option_df = self.load_option_data()

        if spu_df.empty:
            logger.warning("No SPU data to transform")
            return []

        # Group SKUs and options by SPU once, as plain dicts for O(1) lookup.
        sku_groups = {spu_key: group for spu_key, group in sku_df.groupby('spu_id')}
        logger.info(f"Grouped SKUs into {len(sku_groups)} SPU groups")

        option_groups = (
            {spu_key: group for spu_key, group in option_df.groupby('spu_id')}
            if not option_df.empty else {}
        )
        if option_groups:
            logger.info(f"Grouped options into {len(option_groups)} SPU groups")

        documents = []
        skipped_count = 0
        error_count = 0

        for _, spu_row in spu_df.iterrows():
            spu_id = spu_row['id']

            try:
                skus = sku_groups.get(spu_id)
                if skus is None:
                    skus = pd.DataFrame()
                options = option_groups.get(spu_id)
                if options is None:
                    options = pd.DataFrame()

                if skus.empty:
                    logger.warning(f"SPU {spu_id} (title: {spu_row.get('title', 'N/A')}) has no SKUs")

                doc = self._transform_spu_to_doc(spu_row, skus, options)
                if doc:
                    documents.append(doc)
                else:
                    skipped_count += 1
                    logger.warning(f"SPU {spu_id} transformation returned None, skipped")
            except Exception as e:
                error_count += 1
                logger.error(f"Error transforming SPU {spu_id}: {e}", exc_info=True)

        logger.info("Transformation complete:")
        logger.info(f" - Total SPUs: {len(spu_df)}")
        logger.info(f" - Successfully transformed: {len(documents)}")
        logger.info(f" - Skipped: {skipped_count}")
        logger.info(f" - Errors: {error_count}")

        return documents

    def _transform_spu_to_doc(
        self,
        spu_row: pd.Series,
        skus: pd.DataFrame,
        options: pd.DataFrame
    ) -> Optional[Dict[str, Any]]:
        """
        Transform a single SPU row and its SKUs into an ES document.

        Args:
            spu_row: SPU row from database
            skus: DataFrame with SKUs for this SPU (may be empty)
            options: DataFrame with options for this SPU (may be empty)

        Returns:
            ES document. The current implementation always returns a dict; the
            Optional return type is kept because callers check for None.
        """
        doc: Dict[str, Any] = {}

        # Tenant ID (required)
        doc['tenant_id'] = str(self.tenant_id)

        # SPU ID
        spu_id = spu_row['id']
        doc['spu_id'] = str(spu_id)

        # Validate required fields
        if pd.isna(spu_row.get('title')) or not str(spu_row['title']).strip():
            logger.error(f"SPU {spu_id} has no title, this may cause search issues")

        # Text-relevance fields (Chinese/English); only Chinese is populated for now.
        for column in ('title', 'brief', 'description', 'vendor'):
            value = spu_row.get(column)
            if pd.notna(value):
                doc[f'{column}_zh'] = str(value)
                doc[f'{column}_en'] = None

        # Tags are stored as a comma-separated string; index them as an array.
        raw_tags = spu_row.get('tags')
        if pd.notna(raw_tags):
            doc['tags'] = [tag.strip() for tag in str(raw_tags).split(',') if tag.strip()]

        self._add_category_fields(doc, spu_row)
        self._add_option_names(doc, options)

        # Image URL
        image_url = self._normalize_image_url(spu_row.get('image_src'))
        if image_url is not None:
            doc['image_url'] = image_url

        # Sales (fake_sales); missing or unparsable values index as 0.
        doc['sales'] = self._to_int(spu_row.get('fake_sales'), default=0)

        # Process SKUs and build specifications
        option_name_map = self._build_option_name_map(options)

        skus_list = []
        prices = []
        compare_prices = []
        sku_weights = []
        sku_weight_units = []
        total_inventory = 0
        specifications = []
        # Raw SKU option values per position (1..3), in SKU order, before dedup.
        option_values: Dict[int, List[str]] = {1: [], 2: [], 3: []}

        for _, sku_row in skus.iterrows():
            sku_data = self._transform_sku_row(sku_row, option_name_map)
            if sku_data:
                skus_list.append(sku_data)

                # Prices are already float-or-None after _transform_sku_row.
                if sku_data.get('price') is not None:
                    prices.append(float(sku_data['price']))
                if sku_data.get('compare_at_price') is not None:
                    compare_prices.append(float(sku_data['compare_at_price']))

                # Weights are indexed as integers (fraction truncated).
                if sku_data.get('weight') is not None:
                    try:
                        sku_weights.append(int(float(sku_data['weight'])))
                    except (ValueError, TypeError):
                        pass
                if sku_data.get('weight_unit'):
                    sku_weight_units.append(str(sku_data['weight_unit']))

                if sku_data.get('stock') is not None:
                    total_inventory += int(sku_data['stock'])

            # Specifications pair each SKU option value with its option name
            # (looked up by position); values are collected for *_values too.
            sku_id = str(sku_row['id'])
            for position in (1, 2, 3):
                raw_value = sku_row.get(f'option{position}')
                if pd.isna(raw_value):
                    continue
                value = str(raw_value)
                option_values[position].append(value)
                if position in option_name_map:
                    specifications.append({
                        'sku_id': sku_id,
                        'name': option_name_map[position],
                        'value': value
                    })

        doc['skus'] = skus_list
        doc['specifications'] = specifications

        # Deduplicated option values in first-seen order (deterministic across
        # runs, unlike set()); non-searchable dimensions index as empty lists.
        for position in (1, 2, 3):
            dimension = f'option{position}'
            if dimension in self.searchable_option_dimensions:
                doc[f'{dimension}_values'] = list(dict.fromkeys(option_values[position]))
            else:
                doc[f'{dimension}_values'] = []

        # Calculate price ranges
        doc['min_price'] = float(min(prices)) if prices else 0.0
        doc['max_price'] = float(max(prices)) if prices else 0.0
        doc['compare_at_price'] = float(max(compare_prices)) if compare_prices else None

        # Flattened SKU fields
        doc['sku_prices'] = prices
        doc['sku_weights'] = sku_weights
        doc['sku_weight_units'] = list(dict.fromkeys(sku_weight_units))  # dedup, stable order
        doc['total_inventory'] = total_inventory

        # Time fields - convert datetime to ISO format string for ES DATE type
        for field in ('create_time', 'update_time'):
            value = spu_row.get(field)
            if pd.notna(value):
                doc[field] = value.isoformat() if hasattr(value, 'isoformat') else str(value)

        return doc

    def _add_category_fields(self, doc: Dict[str, Any], spu_row: pd.Series) -> None:
        """
        Populate category fields on *doc* from an SPU row.

        category_path (comma-separated category IDs resolved via
        self.category_id_to_name) takes precedence. If it is missing or yields
        no IDs (e.g. an empty string), the free-text ``category`` column is
        used instead, split on "/" into up to three levels.
        """
        category_names: List[str] = []
        raw_path = spu_row.get('category_path')
        if pd.notna(raw_path):
            category_path = str(raw_path)
            category_names = self._resolve_category_path(category_path, spu_row)
            if category_names:
                # Category path string (used for search)
                doc['category_path_zh'] = '/'.join(category_names)
                doc['category_path_en'] = None  # not populated yet
                # Hierarchical category names
                for level, name in enumerate(category_names[:3], start=1):
                    doc[f'category{level}_name'] = name

        raw_category = spu_row.get('category')
        has_category = pd.notna(raw_category)

        if not category_names and has_category:
            # No usable category_path: derive the levels from ``category``.
            category = str(raw_category)
            doc['category_name_zh'] = category
            doc['category_name_en'] = None
            doc['category_name'] = category
            if '/' in category:
                for level, part in enumerate(category.split('/')[:3], start=1):
                    doc[f'category{level}_name'] = part.strip()
            else:
                doc['category1_name'] = category.strip()

        if has_category:
            # Make sure the category name fields are always present.
            category_name = str(raw_category)
            doc.setdefault('category_name_zh', category_name)
            doc.setdefault('category_name_en', None)
            doc.setdefault('category_name', category_name)

        if pd.notna(spu_row.get('category_id')):
            doc['category_id'] = str(int(spu_row['category_id']))
        if pd.notna(spu_row.get('category_level')):
            doc['category_level'] = int(spu_row['category_level'])

    def _resolve_category_path(self, category_path: str, spu_row: pd.Series) -> List[str]:
        """Map each comma-separated ID in *category_path* to its name; unknown IDs are logged and kept as-is."""
        names: List[str] = []
        for cid in (part.strip() for part in category_path.split(',')):
            if not cid:
                continue
            name = self.category_id_to_name.get(cid)
            if name is None:
                # Unknown ID: log it and fall back to the raw ID as the name.
                logger.error(f"Category ID {cid} not found in mapping for SPU {spu_row['id']} (title: {spu_row.get('title', 'N/A')}), category_path={category_path}")
                name = cid
            names.append(name)
        return names

    @staticmethod
    def _add_option_names(doc: Dict[str, Any], options: pd.DataFrame) -> None:
        """Set option1_name..option3_name from the first three options ordered by position."""
        if options.empty:
            return
        first_three = options.sort_values('position').head(3)
        for index, (_, opt_row) in enumerate(first_three.iterrows(), start=1):
            name = opt_row.get('name')
            if pd.notna(name):
                doc[f'option{index}_name'] = str(name)

    @staticmethod
    def _build_option_name_map(options: pd.DataFrame) -> Dict[int, str]:
        """Return a mapping of option position -> option name, skipping rows missing either."""
        option_name_map: Dict[int, str] = {}
        if options.empty:
            return option_name_map
        for _, opt_row in options.iterrows():
            position = opt_row.get('position')
            name = opt_row.get('name')
            if pd.notna(position) and pd.notna(name):
                option_name_map[int(position)] = str(name)
        return option_name_map

    @staticmethod
    def _normalize_image_url(raw_src: Any) -> Optional[str]:
        """
        Normalize an SPU image_src into the indexed image URL.

        Returns None for missing values. Absolute (http/https) URLs,
        protocol-relative URLs ("//host/...") and empty strings are returned
        unchanged; previously a "//host/..." value was prefixed again, producing
        "////host/...". Any other value is assumed to be a scheme-less
        "host/path" and is made protocol-relative.
        """
        if pd.isna(raw_src):
            return None
        src = str(raw_src)
        # NOTE(review): values starting with a single "/" are treated as
        # site-relative paths and left untouched — confirm against the data.
        if not src or src.startswith(('http', '/')):
            return src
        return f"//{src}"

    @staticmethod
    def _to_float(value: Any) -> Optional[float]:
        """Convert *value* to float; missing or unparsable values become None."""
        if pd.isna(value):
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Convert *value* to int; missing or unparsable values become *default*."""
        if pd.isna(value):
            return default
        try:
            return int(value)
        except (ValueError, TypeError):
            return default

    def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Optional[Dict[int, str]] = None) -> Optional[Dict[str, Any]]:
        """
        Transform a SKU row into a SKU object.

        Args:
            sku_row: SKU row from database
            option_name_map: Mapping from position to option name (accepted for
                interface compatibility; not used when building the SKU object)

        Returns:
            SKU dictionary (current implementation never returns None)
        """
        sku_data: Dict[str, Any] = {}

        # SKU ID
        sku_data['sku_id'] = str(sku_row['id'])

        # Prices: float, or None when missing/unparsable
        sku_data['price'] = self._to_float(sku_row.get('price'))
        sku_data['compare_at_price'] = self._to_float(sku_row.get('compare_at_price'))

        # SKU Code
        if pd.notna(sku_row.get('sku')):
            sku_data['sku_code'] = str(sku_row['sku'])

        # Stock: int, 0 when missing/unparsable
        sku_data['stock'] = self._to_int(sku_row.get('inventory_quantity'), default=0)

        # Weight: float, or None when missing/unparsable
        sku_data['weight'] = self._to_float(sku_row.get('weight'))

        # Weight unit
        if pd.notna(sku_row.get('weight_unit')):
            sku_data['weight_unit'] = str(sku_row['weight_unit'])

        # Option values
        for position in (1, 2, 3):
            raw_value = sku_row.get(f'option{position}')
            if pd.notna(raw_value):
                sku_data[f'option{position}_value'] = str(raw_value)

        # Image src
        if pd.notna(sku_row.get('image_src')):
            sku_data['image_src'] = str(sku_row['image_src'])

        return sku_data
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
|
|