1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
1
2
3
|
"""
SPU data transformer for Shoplazza products.
|
cadc77b6
tangwang
索引字段名、变量名、API数据结构...
|
4
|
Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested skus.
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
5
6
7
8
|
"""
import pandas as pd
import numpy as np
|
c973d288
tangwang
1. 类目字段处理
|
9
|
import logging
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
10
11
12
|
from typing import Dict, Any, List, Optional
from sqlalchemy import create_engine, text
from utils.db_connector import create_db_connection
|
33839b37
tangwang
属性值参与搜索:
|
13
|
from config import ConfigLoader
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
14
15
|
from config.tenant_config_loader import get_tenant_config_loader
from indexer.document_transformer import SPUDocumentTransformer
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
16
|
|
c973d288
tangwang
1. 类目字段处理
|
17
18
19
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
class SPUTransformer:
    """Transform SPU and SKU data into SPU-level ES documents.

    The transformer reads SPU, SKU and option rows for a single tenant from
    the database and delegates the per-SPU conversion to
    ``SPUDocumentTransformer``.
    """

    def __init__(
        self,
        db_engine: Any,
        tenant_id: str
    ):
        """
        Initialize SPU transformer.

        Args:
            db_engine: SQLAlchemy database engine
            tenant_id: Tenant ID for filtering data
        """
        self.db_engine = db_engine
        self.tenant_id = tenant_id

        translator = None
        translation_prompts = {}

        # Step 1: load configuration to get searchable_option_dimensions.
        # Only a failure of this step falls back to the default dimensions.
        config = None
        try:
            config = ConfigLoader().load_config()
            self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions
        except Exception as e:
            logger.warning(f"Failed to load config, using default: {e}")
            self.searchable_option_dimensions = ['option1', 'option2', 'option3']
            config = None  # do not build a translator from a broken config

        # Step 2: initialize the translator if translation is enabled.
        # This is guarded separately so that a translator failure does not
        # discard the option dimensions that were loaded successfully above.
        if config is not None:
            try:
                if config.query_config.enable_translation:
                    from query.translator import Translator
                    translator = Translator(
                        api_key=config.query_config.translation_api_key,
                        use_cache=True,  # use the cache while indexing to avoid repeated translation
                        glossary_id=config.query_config.translation_glossary_id,
                        translation_context=config.query_config.translation_context
                    )
                    translation_prompts = config.query_config.translation_prompts
            except Exception as e:
                logger.warning(f"Failed to initialize translator, indexing without translation: {e}")

        # Load category ID to name mapping
        self.category_id_to_name = self._load_category_mapping()

        # Load tenant config
        tenant_config_loader = get_tenant_config_loader()
        tenant_config = tenant_config_loader.get_tenant_config(tenant_id)

        # Initialize document transformer
        self.document_transformer = SPUDocumentTransformer(
            category_id_to_name=self.category_id_to_name,
            searchable_option_dimensions=self.searchable_option_dimensions,
            tenant_config=tenant_config,
            translator=translator,
            translation_prompts=translation_prompts
        )

    @staticmethod
    def _log_coverage(label: str, count: Any, total: int) -> None:
        """Log how many of ``total`` rows have a non-null value for ``label``.

        Callers must only invoke this with ``total > 0``.
        """
        logger.info(f" - Has {label}: {count}/{total} ({100*count/total:.1f}%)")

    def _load_category_mapping(self) -> Dict[str, str]:
        """
        Load category ID to name mapping from database.

        Rows whose category name is empty or whitespace-only are skipped with
        a warning. The query is not restricted to ``self.tenant_id``.

        Returns:
            Dictionary mapping category_id (as a string) to category_name
        """
        query = text("""
            SELECT DISTINCT
                category_id,
                category
            FROM shoplazza_product_spu
            WHERE deleted = 0 AND category_id IS NOT NULL
        """)

        mapping = {}
        with self.db_engine.connect() as conn:
            for row in conn.execute(query):
                # Normalize the ID to its integer string form for use as key.
                category_id = str(int(row.category_id))
                category_name = row.category
                if not category_name or not category_name.strip():
                    logger.warning(f"Category ID {category_id} has empty name, skipping")
                    continue
                mapping[category_id] = category_name

        logger.info(f"Loaded {len(mapping)} category ID to name mappings")

        # Log all category mappings for debugging
        if mapping:
            logger.debug("Category ID mappings:")
            for cid, name in sorted(mapping.items()):
                logger.debug(f" {cid} -> {name}")

        return mapping

    def load_spu_data(self) -> pd.DataFrame:
        """
        Load SPU data from MySQL.

        Logs coverage statistics for the loaded rows. When nothing is found
        for the tenant, a warning is logged and, if DEBUG logging is enabled,
        extra diagnostic queries are run to help explain why.

        Returns:
            DataFrame with SPU data
        """
        query = text("""
            SELECT
                id, shop_id, shoplazza_id, title, brief, description,
                spu, vendor, vendor_url,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category, category_id, category_google_id,
                category_level, category_path,
                fake_sales, display_fake_sales,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        total = len(df)
        logger.info(f"Loaded {total} SPU records for tenant_id={self.tenant_id}")

        if total > 0:
            # Statistics
            has_category_path = df['category_path'].notna().sum()
            has_category = df['category'].notna().sum()
            has_title = df['title'].notna().sum()

            logger.info("SPU data statistics:")
            self._log_coverage("title", has_title, total)
            self._log_coverage("category_path", has_category_path, total)
            self._log_coverage("category", has_category, total)

            # Warn if too many SPUs don't have category_path
            if has_category_path < total * 0.5:
                logger.warning(f"Only {100*has_category_path/total:.1f}% of SPUs have category_path, data quality may be low")
        else:
            logger.warning(f"No SPU data found for tenant_id={self.tenant_id}")
            # The diagnostics below only produce DEBUG output, so skip the
            # extra queries (one scans the whole table) unless they are shown.
            if logger.isEnabledFor(logging.DEBUG):
                self._log_missing_tenant_diagnostics()

        return df

    def _log_missing_tenant_diagnostics(self) -> None:
        """Log, at DEBUG level, row counts that help explain an empty SPU load."""
        # Check if there's any data for this tenant_id, deleted or not
        debug_query = text("""
            SELECT
                COUNT(*) as total_count,
                SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_count,
                SUM(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) as deleted_count
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id
        """)
        with self.db_engine.connect() as conn:
            debug_df = pd.read_sql(debug_query, conn, params={"tenant_id": self.tenant_id})
        if not debug_df.empty:
            counts = debug_df.iloc[0]
            total = counts['total_count']
            active = counts['active_count']
            deleted = counts['deleted_count']
            logger.debug(f"tenant_id={self.tenant_id}: total={total}, active={active}, deleted={deleted}")

        # Check what tenant_ids exist in the table
        tenant_check_query = text("""
            SELECT tenant_id, COUNT(*) as count, SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active
            FROM shoplazza_product_spu
            GROUP BY tenant_id
            ORDER BY tenant_id
            LIMIT 10
        """)
        with self.db_engine.connect() as conn:
            tenant_df = pd.read_sql(tenant_check_query, conn)
        if not tenant_df.empty:
            logger.debug("Available tenant_ids in shoplazza_product_spu:")
            for _, row in tenant_df.iterrows():
                logger.debug(f" tenant_id={row['tenant_id']}: total={row['count']}, active={row['active']}")

    def load_sku_data(self) -> pd.DataFrame:
        """
        Load SKU data from MySQL.

        Logs coverage statistics for price, inventory and the three option
        columns, and warns when fewer than 95% of SKUs have a price.

        Returns:
            DataFrame with SKU data
        """
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND deleted = 0
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        total = len(df)
        logger.info(f"Loaded {total} SKU records for tenant_id={self.tenant_id}")

        if total > 0:
            # Statistics: (log label, column name), in log order
            tracked_columns = (
                ("price", "price"),
                ("inventory", "inventory_quantity"),
                ("option1", "option1"),
                ("option2", "option2"),
                ("option3", "option3"),
            )
            counts = {label: df[column].notna().sum() for label, column in tracked_columns}

            logger.info("SKU data statistics:")
            for label, _ in tracked_columns:
                self._log_coverage(label, counts[label], total)

            # Warn about data quality issues
            has_price = counts["price"]
            if has_price < total * 0.95:
                logger.warning(f"Only {100*has_price/total:.1f}% of SKUs have price")

        return df

    def load_option_data(self) -> pd.DataFrame:
        """
        Load option data from MySQL.

        Rows are ordered by ``spu_id`` and ``position``. Logs how many SPUs
        have options and warns when option rows lack a name.

        Returns:
            DataFrame with option data (name, position for each SPU)
        """
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND deleted = 0
            ORDER BY spu_id, position
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id})

        total = len(df)
        logger.info(f"Loaded {total} option records for tenant_id={self.tenant_id}")

        if total > 0:
            # Statistics
            unique_spus_with_options = df['spu_id'].nunique()
            has_name = df['name'].notna().sum()

            logger.info("Option data statistics:")
            logger.info(f" - Unique SPUs with options: {unique_spus_with_options}")
            self._log_coverage("name", has_name, total)

            # Warn about missing option names
            if has_name < total:
                missing = total - has_name
                logger.warning(f"{missing} option records are missing names")

        return df

    def transform_batch(self) -> List[Dict[str, Any]]:
        """
        Transform SPU and SKU data into ES documents.

        SPUs whose transformation returns a falsy document are counted as
        skipped; SPUs whose transformation raises are logged and counted as
        errors. Neither aborts the batch.

        Returns:
            List of SPU-level ES documents
        """
        logger.info(f"Starting data transformation for tenant_id={self.tenant_id}")

        # Load SPUs first: without them there is nothing to index, so the SKU
        # and option queries can be skipped entirely.
        spu_df = self.load_spu_data()
        if spu_df.empty:
            logger.warning("No SPU data to transform")
            return []

        sku_df = self.load_sku_data()
        option_df = self.load_option_data()

        # Group SKUs by SPU; fetch the key map once instead of once per row.
        sku_groups = sku_df.groupby('spu_id')
        sku_group_keys = sku_groups.groups
        logger.info(f"Grouped SKUs into {len(sku_groups)} SPU groups")

        # Group options by SPU (there may be no option rows at all)
        if option_df.empty:
            option_groups = None
            option_group_keys = {}
        else:
            option_groups = option_df.groupby('spu_id')
            option_group_keys = option_groups.groups
        if option_group_keys:
            logger.info(f"Grouped options into {len(option_groups)} SPU groups")

        documents = []
        skipped_count = 0
        error_count = 0

        for _, spu_row in spu_df.iterrows():
            spu_id = spu_row['id']
            try:
                # Get SKUs for this SPU (fresh empty frame when there are none)
                if spu_id in sku_group_keys:
                    skus = sku_groups.get_group(spu_id)
                else:
                    skus = pd.DataFrame()

                # Get options for this SPU
                if spu_id in option_group_keys:
                    options = option_groups.get_group(spu_id)
                else:
                    options = pd.DataFrame()

                # Warn if SPU has no SKUs
                if skus.empty:
                    logger.warning(f"SPU {spu_id} (title: {spu_row.get('title', 'N/A')}) has no SKUs")

                # Transform to ES document
                doc = self.document_transformer.transform_spu_to_doc(
                    tenant_id=self.tenant_id,
                    spu_row=spu_row,
                    skus=skus,
                    options=options
                )

                if doc:
                    documents.append(doc)
                else:
                    skipped_count += 1
                    logger.warning(f"SPU {spu_id} transformation returned None, skipped")
            except Exception as e:
                # One bad SPU must not abort the whole batch.
                error_count += 1
                logger.error(f"Error transforming SPU {spu_id}: {e}", exc_info=True)

        logger.info("Transformation complete:")
        logger.info(f" - Total SPUs: {len(spu_df)}")
        logger.info(f" - Successfully transformed: {len(documents)}")
        logger.info(f" - Skipped: {skipped_count}")
        logger.info(f" - Errors: {error_count}")

        return documents
|
1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
|
|