# incremental_service.py
"""增量数据获取服务"""
import pandas as pd
import logging
import time
from typing import Dict, Any, Optional, List
from sqlalchemy import text
from indexer.indexing_utils import load_category_mapping, create_document_transformer
from indexer.bulk_indexer import BulkIndexer
from indexer.mapping_generator import DEFAULT_INDEX_NAME
from indexer.indexer_logger import (
get_indexer_logger, log_index_request, log_index_result, log_spu_processing
)
# Configure logger
logger = logging.getLogger(__name__)
# Indexer专用日志器
indexer_logger = get_indexer_logger()
class IncrementalIndexerService:
"""增量索引服务,提供SPU数据获取功能。"""
def __init__(self, db_engine: Any):
"""初始化增量索引服务"""
self.db_engine = db_engine
# 预加载分类映射(全局,所有租户共享)
self.category_id_to_name = load_category_mapping(db_engine)
logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings")
    def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
        """Build the ES document for a single SPU, or return None if it does not exist."""
        try:
            # Load the SPU row
            spu_row = self._load_single_spu(tenant_id, spu_id)
            if spu_row is None:
                logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}")
                return None

            # Load the SKU rows
            skus_df = self._load_skus_for_spu(tenant_id, spu_id)

            # Load the option rows
            options_df = self._load_options_for_spu(tenant_id, spu_id)

            # Create the document transformer
            transformer = create_document_transformer(
                category_id_to_name=self.category_id_to_name,
                tenant_id=tenant_id
            )

            # Transform into an ES document
            doc = transformer.transform_spu_to_doc(
                tenant_id=tenant_id,
                spu_row=spu_row,
                skus=skus_df,
                options=options_df
            )
            if doc is None:
                logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}")
                return None

            return doc
        except Exception as e:
            logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
            raise
    def _load_single_spu(self, tenant_id: str, spu_id: str, include_deleted: bool = False) -> Optional[pd.Series]:
        """
        Load a single SPU row.

        Args:
            tenant_id: Tenant ID
            spu_id: SPU ID
            include_deleted: Whether to include soft-deleted records (used to check deletion status)

        Returns:
            The SPU row as a Series, or None if it does not exist
        """
        if include_deleted:
            # Query all records (including soft-deleted ones) so the deletion status can be checked
            query = text("""
                SELECT
                    id, shop_id, shoplazza_id, title, brief, description,
                    spu, vendor, vendor_url,
                    image_src, image_width, image_height, image_path, image_alt,
                    tags, note, category, category_id, category_google_id,
                    category_level, category_path,
                    fake_sales, display_fake_sales,
                    tenant_id, creator, create_time, updater, update_time, deleted
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id AND id = :spu_id
                LIMIT 1
            """)
        else:
            # Only query records that are not soft-deleted
            query = text("""
                SELECT
                    id, shop_id, shoplazza_id, title, brief, description,
                    spu, vendor, vendor_url,
                    image_src, image_width, image_height, image_path, image_alt,
                    tags, note, category, category_id, category_google_id,
                    category_level, category_path,
                    fake_sales, display_fake_sales,
                    tenant_id, creator, create_time, updater, update_time, deleted
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
                LIMIT 1
            """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})

        if df.empty:
            return None
        return df.iloc[0]
    def check_spu_deleted(self, tenant_id: str, spu_id: str) -> bool:
        """
        Check whether an SPU is marked as deleted in the database.

        Args:
            tenant_id: Tenant ID
            spu_id: SPU ID

        Returns:
            True if the SPU is deleted (or does not exist), False otherwise
        """
        spu_row = self._load_single_spu(tenant_id, spu_id, include_deleted=True)
        if spu_row is None:
            # The SPU does not exist at all, so treat it as deleted
            return True

        # Check the `deleted` field (it may be a BIT column, so convert to int/bool)
        deleted = spu_row.get('deleted', 0)
        # Handle the BIT type: the value may be b'\x00' / b'\x01', or a plain 0/1
        if isinstance(deleted, bytes):
            return deleted == b'\x01' or deleted == 1
        return bool(deleted)
    def delete_spus_from_es(
        self,
        es_client,
        tenant_id: str,
        spu_ids: List[str],
        index_name: str = DEFAULT_INDEX_NAME
    ) -> Dict[str, Any]:
        """
        Delete SPU documents from ES in bulk.

        Args:
            es_client: Elasticsearch client
            tenant_id: Tenant ID
            spu_ids: List of SPU IDs to delete
            index_name: Index name

        Returns:
            A dict describing the deletion results
        """
        if not spu_ids:
            return {
                "deleted": [],
                "not_found": [],
                "failed": [],
                "total": 0,
                "deleted_count": 0,
                "not_found_count": 0,
                "failed_count": 0
            }

        deleted_list = []
        not_found_list = []
        failed_list = []

        logger.info(f"[IncrementalDeletion] Starting deletion for tenant_id={tenant_id}, spu_count={len(spu_ids)}")

        for spu_id in spu_ids:
            try:
                # Delete the document through the ES delete API.
                # The ES document ID matches what index_spus_to_es uses, i.e. the spu_id itself.
                try:
                    response = es_client.client.delete(
                        index=index_name,
                        id=str(spu_id),
                        ignore=[404]  # Ignore "document not found" errors
                    )
                    if response.get('result') == 'deleted':
                        deleted_list.append({"spu_id": spu_id, "status": "deleted"})
                        log_spu_processing(indexer_logger, tenant_id, spu_id, 'deleted')
                    elif response.get('result') == 'not_found':
                        not_found_list.append({"spu_id": spu_id, "status": "not_found"})
                        logger.debug(f"[IncrementalDeletion] SPU {spu_id} not found in ES")
                    else:
                        failed_list.append({"spu_id": spu_id, "error": f"Unexpected result: {response.get('result')}"})
                except Exception as e:
                    # Handle a 404 raised as an exception (document does not exist)
                    if hasattr(e, 'status_code') and e.status_code == 404:
                        not_found_list.append({"spu_id": spu_id, "status": "not_found"})
                    else:
                        error_msg = str(e)
                        logger.error(f"[IncrementalDeletion] Error deleting SPU {spu_id}: {e}", exc_info=True)
                        failed_list.append({"spu_id": spu_id, "error": error_msg})
                        log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', error_msg)
            except Exception as e:
                error_msg = str(e)
                logger.error(f"[IncrementalDeletion] Unexpected error deleting SPU {spu_id}: {e}", exc_info=True)
                failed_list.append({"spu_id": spu_id, "error": error_msg})

        logger.info(
            f"[IncrementalDeletion] Completed for tenant_id={tenant_id}: "
            f"total={len(spu_ids)}, deleted={len(deleted_list)}, "
            f"not_found={len(not_found_list)}, failed={len(failed_list)}"
        )

        return {
            "deleted": deleted_list,
            "not_found": not_found_list,
            "failed": failed_list,
            "total": len(spu_ids),
            "deleted_count": len(deleted_list),
            "not_found_count": len(not_found_list),
            "failed_count": len(failed_list)
        }
    def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Load all SKU rows for a given SPU."""
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
        """)
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})
        return df

    def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Load all option rows for a given SPU."""
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
            ORDER BY position
        """)
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})
        return df
    def index_spus_to_es(
        self,
        es_client,
        tenant_id: str,
        spu_ids: List[str],
        index_name: str = DEFAULT_INDEX_NAME,
        batch_size: int = 100,
        delete_spu_ids: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Bulk index SPUs into ES (incremental indexing).

        Two deletion mechanisms are supported:
        1. Automatic detection: documents are removed from ES when the database `deleted` flag is set.
        2. Explicit deletion: SPUs listed in delete_spu_ids are removed from ES.

        Args:
            es_client: Elasticsearch client
            tenant_id: Tenant ID
            spu_ids: List of SPU IDs to index
            index_name: Index name
            batch_size: Batch size for bulk writes to ES
            delete_spu_ids: Optional list of SPU IDs to delete explicitly

        Returns:
            A dict with success/failure lists and deletion results
        """
        start_time = time.time()
        total_count = len(spu_ids)
        success_list = []
        failed_list = []
        documents = []
        deleted_list = []
        auto_deleted_list = []

        # Log the start of the request
        log_index_request(
            indexer_logger,
            index_type='incremental',
            tenant_id=tenant_id,
            request_params={
                'spu_count': total_count,
                'delete_count': len(delete_spu_ids) if delete_spu_ids else 0,
                'index_name': index_name,
                'batch_size': batch_size
            }
        )
        logger.info(
            f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, "
            f"spu_count={total_count}, delete_count={len(delete_spu_ids) if delete_spu_ids else 0}"
        )

        # Step 0: handle explicit deletion requests
        if delete_spu_ids:
            logger.info(f"[IncrementalIndexing] Processing explicit deletions: {len(delete_spu_ids)} SPUs")
            delete_result = self.delete_spus_from_es(
                es_client=es_client,
                tenant_id=tenant_id,
                spu_ids=delete_spu_ids,
                index_name=index_name
            )
            deleted_list = delete_result.get('deleted', [])
            logger.info(f"[IncrementalIndexing] Explicitly deleted {len(deleted_list)} SPUs from ES")

        # Step 1: fetch every SPU document and auto-detect deletions
        for spu_id in spu_ids:
            try:
                log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching')

                # First check whether the SPU is marked as deleted in the database
                is_deleted = self.check_spu_deleted(tenant_id, spu_id)
                if is_deleted:
                    # The SPU is deleted, so remove the corresponding document from ES
                    logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES")
                    try:
                        response = es_client.client.delete(
                            index=index_name,
                            id=str(spu_id),
                            ignore=[404]
                        )
                        if response.get('result') == 'deleted':
                            auto_deleted_list.append({
                                "spu_id": spu_id,
                                "status": "auto_deleted",
                                "reason": "deleted in database"
                            })
                            log_spu_processing(indexer_logger, tenant_id, spu_id, 'auto_deleted', "deleted in database")
                        elif response.get('result') == 'not_found':
                            # The document is not in ES, which also counts as success (it may have been deleted already)
                            auto_deleted_list.append({
                                "spu_id": spu_id,
                                "status": "auto_deleted",
                                "reason": "deleted in database (not found in ES)"
                            })
                    except Exception as e:
                        error_msg = f"Failed to delete from ES: {str(e)}"
                        logger.error(f"[IncrementalIndexing] Error deleting SPU {spu_id} from ES: {e}", exc_info=True)
                        failed_list.append({
                            "spu_id": spu_id,
                            "error": error_msg
                        })
                    continue

                # The SPU is not deleted, so fetch its document normally
                doc = self.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
                if doc is None:
                    # This should not happen because the deleted flag was already checked,
                    # but handle it anyway for robustness.
                    error_msg = "SPU not found (unexpected)"
                    logger.warning(f"[IncrementalIndexing] SPU {spu_id} not found after deleted check")
                    log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
                    failed_list.append({
                        "spu_id": spu_id,
                        "error": error_msg
                    })
                    continue

                log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming')
                documents.append(doc)
            except Exception as e:
                error_msg = str(e)
                logger.error(f"[IncrementalIndexing] Error processing SPU {spu_id}: {e}", exc_info=True)
                log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
                failed_list.append({
                    "spu_id": spu_id,
                    "error": error_msg
                })

        logger.info(f"[IncrementalIndexing] Transformed {len(documents)}/{total_count} documents")
        # Step 2: bulk write the documents to ES
        if documents:
            try:
                logger.info(f"[IncrementalIndexing] Indexing {len(documents)} documents to ES (batch_size={batch_size})")
                indexer = BulkIndexer(es_client, index_name, batch_size=batch_size, max_retries=3)
                bulk_results = indexer.index_documents(
                    documents,
                    id_field="spu_id",
                    show_progress=False
                )

                # Update the success list based on the ES response.
                # BulkIndexer only returns aggregate statistics, so partial failures
                # cannot be mapped back to individual documents here.
                es_success_count = bulk_results.get('success', 0)
                es_failed_count = bulk_results.get('failed', 0)

                if es_failed_count == 0:
                    # Everything succeeded
                    for doc in documents:
                        success_list.append({
                            "spu_id": doc.get('spu_id'),
                            "status": "indexed"
                        })
                else:
                    # Some documents failed. As a simplification, every submitted document is
                    # still marked as indexed; a precise mapping would require the per-item
                    # errors from the ES bulk response and should be improved later.
                    logger.warning(f"[IncrementalIndexing] ES bulk index had {es_failed_count} failures")
                    for doc in documents:
                        success_list.append({
                            "spu_id": doc.get('spu_id'),
                            "status": "indexed"
                        })
                    # Log ES errors for diagnosis (individual spu_ids are not available here)
                    if bulk_results.get('errors'):
                        logger.error(f"[IncrementalIndexing] ES errors: {bulk_results['errors'][:5]}")
            except Exception as e:
                error_msg = f"ES bulk index failed: {str(e)}"
                logger.error(f"[IncrementalIndexing] {error_msg}", exc_info=True)
                # All documents failed
                for doc in documents:
                    failed_list.append({
                        "spu_id": doc.get('spu_id'),
                        "error": error_msg
                    })
                documents = []  # Clear to avoid reprocessing
        else:
            logger.warning(f"[IncrementalIndexing] No documents to index for tenant_id={tenant_id}")
        elapsed_time = time.time() - start_time
        success_count = len(success_list)
        failed_count = len(failed_list)

        # Compute deletion statistics before logging the final result
        explicit_deleted_count = len(deleted_list) if delete_spu_ids else 0
        auto_deleted_count = len(auto_deleted_list)
        total_deleted_count = explicit_deleted_count + auto_deleted_count

        # Log the final result (including deletion statistics)
        log_index_result(
            indexer_logger,
            index_type='incremental',
            tenant_id=tenant_id,
            total_count=total_count,
            success_count=success_count,
            failed_count=failed_count,
            elapsed_time=elapsed_time,
            index_name=index_name,
            errors=[item.get('error') for item in failed_list[:10]] if failed_list else None,
            deleted_count=total_deleted_count,
            explicit_deleted_count=explicit_deleted_count,
            auto_deleted_count=auto_deleted_count
        )

        logger.info(
            f"[IncrementalIndexing] Completed for tenant_id={tenant_id}: "
            f"total={total_count}, success={success_count}, failed={failed_count}, "
            f"explicit_deleted={explicit_deleted_count}, auto_deleted={auto_deleted_count}, "
            f"elapsed={elapsed_time:.2f}s"
        )

        return {
            "success": success_list,
            "failed": failed_list,
            "deleted": {
                "explicit": deleted_list if delete_spu_ids else [],
                "auto": auto_deleted_list,
                "total_count": total_deleted_count,
                "explicit_count": explicit_deleted_count,
                "auto_count": auto_deleted_count
            },
            "total": total_count,
            "success_count": success_count,
            "failed_count": failed_count,
            "elapsed_time": elapsed_time,
            "index_name": index_name,
            "tenant_id": tenant_id
        }
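

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal example of how this service could
# be wired up. The database URL, the EsClientWrapper class, the tenant and SPU
# IDs below are hypothetical placeholders, not names defined elsewhere in this
# codebase; the only assumption taken from the code above is that the client
# object exposes the raw Elasticsearch client as `es_client.client`.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from sqlalchemy import create_engine
    from elasticsearch import Elasticsearch

    class EsClientWrapper:
        """Hypothetical wrapper exposing a raw client, matching how es_client.client is used above."""

        def __init__(self, hosts):
            self.client = Elasticsearch(hosts)

    # Placeholder connection settings; replace with real configuration.
    engine = create_engine("mysql+pymysql://user:password@localhost:3306/shop")
    es_client = EsClientWrapper(["http://localhost:9200"])

    service = IncrementalIndexerService(engine)

    # Index two SPUs and explicitly delete a third; the IDs are illustrative.
    result = service.index_spus_to_es(
        es_client=es_client,
        tenant_id="tenant-001",
        spu_ids=["1001", "1002"],
        delete_spu_ids=["1003"],
    )
    print(result["success_count"], result["failed_count"], result["deleted"]["total_count"])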