Blame view

indexer/incremental_service.py 18.2 KB
3c1f8031   tangwang   api/routes/indexe...
1
  """增量数据获取服务"""
0064e946   tangwang   feat: 增量索引服务、租户配置...
2
3
  
  import pandas as pd
0064e946   tangwang   feat: 增量索引服务、租户配置...
4
  import logging
c55c5e47   tangwang   feat: 新增增量索引接口并重构...
5
6
  import time
  from typing import Dict, Any, Optional, List
0064e946   tangwang   feat: 增量索引服务、租户配置...
7
  from sqlalchemy import text
3c1f8031   tangwang   api/routes/indexe...
8
  from indexer.indexing_utils import load_category_mapping, create_document_transformer
c55c5e47   tangwang   feat: 新增增量索引接口并重构...
9
10
11
12
13
  from indexer.bulk_indexer import BulkIndexer
  from indexer.mapping_generator import DEFAULT_INDEX_NAME
  from indexer.indexer_logger import (
      get_indexer_logger, log_index_request, log_index_result, log_spu_processing
  )
0064e946   tangwang   feat: 增量索引服务、租户配置...
14
15
16
  
  # General-purpose module logger for service diagnostics
  logger = logging.getLogger(name=__name__)

  # Dedicated indexer logger; passed to log_index_request / log_index_result /
  # log_spu_processing throughout this module
  indexer_logger = get_indexer_logger()
0064e946   tangwang   feat: 增量索引服务、租户配置...
19
20
21
  
  
  class IncrementalIndexerService:
3c1f8031   tangwang   api/routes/indexe...
22
      """增量索引服务,提供SPU数据获取功能。"""
0064e946   tangwang   feat: 增量索引服务、租户配置...
23
24
  
      def __init__(self, db_engine: Any):
          """
          Create the incremental indexing service.

          Args:
              db_engine: database engine; handed to ``load_category_mapping`` here and
                  used via ``connect()`` by the row-loading methods of this class.
          """
          self.db_engine = db_engine

          # The category ID -> name mapping is global (shared by every tenant), so it
          # is loaded once up front instead of per request.
          mapping = load_category_mapping(db_engine)
          self.category_id_to_name = mapping
          logger.info(f"Preloaded {len(mapping)} category mappings")
0064e946   tangwang   feat: 增量索引服务、租户配置...
31
32
  
      def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
          """
          Build the Elasticsearch document for a single SPU.

          Loads the non-deleted SPU row together with its SKUs and options, then runs
          them through a document transformer created for the tenant.

          Args:
              tenant_id: tenant ID
              spu_id: SPU ID

          Returns:
              The ES document, or None when the SPU row is missing / flagged deleted,
              or when the transformer produces no document.

          Raises:
              Any exception raised while loading or transforming is logged and re-raised.
          """
          try:
              spu_row = self._load_single_spu(tenant_id, spu_id)
              if spu_row is None:
                  logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}")
                  return None

              # Child rows are loaded before the transformer is built (same order as before)
              sku_frame = self._load_skus_for_spu(tenant_id, spu_id)
              option_frame = self._load_options_for_spu(tenant_id, spu_id)

              transformer = create_document_transformer(
                  category_id_to_name=self.category_id_to_name,
                  tenant_id=tenant_id
              )
              document = transformer.transform_spu_to_doc(
                  tenant_id=tenant_id,
                  spu_row=spu_row,
                  skus=sku_frame,
                  options=option_frame
              )

              if document is None:
                  logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}")
              return document

          except Exception as e:
              logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
              raise
  
f54b3854   tangwang   pu_ids参数。目前总共3个参数:
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
      def _load_single_spu(self, tenant_id: str, spu_id: str, include_deleted: bool = False) -> Optional[pd.Series]:
          """
          Load a single SPU row.

          Both the "live only" and the "include deleted" variants share one column
          list, so the two queries cannot drift apart.

          Args:
              tenant_id: tenant ID
              spu_id: SPU ID (matched against ``shoplazza_product_spu.id``)
              include_deleted: when True, rows flagged ``deleted`` are returned too
                  (check_spu_deleted uses this to inspect the flag itself)

          Returns:
              The SPU row as a Series, or None if no matching row exists.
          """
          # Constant fragment selected by a bool -- no caller-supplied text is
          # interpolated; tenant_id / spu_id stay bound parameters.
          deleted_clause = "" if include_deleted else " AND deleted = 0"
          query = text(f"""
              SELECT
                  id, shop_id, shoplazza_id, title, brief, description,
                  spu, vendor, vendor_url,
                  image_src, image_width, image_height, image_path, image_alt,
                  tags, note, category, category_id, category_google_id,
                  category_level, category_path,
                  fake_sales, display_fake_sales,
                  tenant_id, creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_spu
              WHERE tenant_id = :tenant_id AND id = :spu_id{deleted_clause}
              LIMIT 1
          """)

          with self.db_engine.connect() as conn:
              df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})

          if df.empty:
              return None
          return df.iloc[0]
f54b3854   tangwang   pu_ids参数。目前总共3个参数:
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
      
      def check_spu_deleted(self, tenant_id: str, spu_id: str) -> bool:
          """
          检查SPU是否在数据库中被标记为删除
          
          Args:
              tenant_id: 租户ID
              spu_id: SPU ID
          
          Returns:
              True表示已删除,False表示未删除或不存在
          """
          spu_row = self._load_single_spu(tenant_id, spu_id, include_deleted=True)
          if spu_row is None:
              # SPU不存在,视为需要删除
              return True
          # 检查deleted字段(可能是bit类型,需要转换为int或bool)
          deleted = spu_row.get('deleted', 0)
          # 处理bit类型:可能是b'\x00'或b'\x01',或者直接是0/1
          if isinstance(deleted, bytes):
              return deleted == b'\x01' or deleted == 1
          return bool(deleted)
      
c797ba2b   tangwang   1. 增量索引接口,增加删除操作后...
144
      def _delete_spu_from_es(
f54b3854   tangwang   pu_ids参数。目前总共3个参数:
145
146
147
          self,
          es_client,
          tenant_id: str,
c797ba2b   tangwang   1. 增量索引接口,增加删除操作后...
148
149
150
          spu_id: str,
          index_name: str,
          log_prefix: str = ""
f54b3854   tangwang   pu_ids参数。目前总共3个参数:
151
152
      ) -> Dict[str, Any]:
          """
c797ba2b   tangwang   1. 增量索引接口,增加删除操作后...
153
          ES中删除单个SPU文档(通用方法)
f54b3854   tangwang   pu_ids参数。目前总共3个参数:
154
155
          
          Returns:
c797ba2b   tangwang   1. 增量索引接口,增加删除操作后...
156
              {"status": "deleted|not_found|failed", "msg": "错误信息(可选)"}
f54b3854   tangwang   pu_ids参数。目前总共3个参数:
157
          """
c797ba2b   tangwang   1. 增量索引接口,增加删除操作后...
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
          try:
              response = es_client.client.delete(
                  index=index_name,
                  id=str(spu_id),
                  ignore=[404]
              )
              
              result = response.get('result')
              if result == 'deleted':
                  log_spu_processing(indexer_logger, tenant_id, spu_id, 'deleted', log_prefix)
                  return {"status": "deleted"}
              elif result == 'not_found':
                  return {"status": "not_found"}
              else:
                  msg = f"Unexpected result: {result}"
                  log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', msg)
                  return {"status": "failed", "msg": msg}
                  
          except Exception as e:
              if hasattr(e, 'status_code') and e.status_code == 404:
                  return {"status": "not_found"}
              else:
                  msg = str(e)
                  logger.error(f"[IncrementalDeletion] Error deleting SPU {spu_id}: {e}", exc_info=True)
                  log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', msg)
                  return {"status": "failed", "msg": msg}
0064e946   tangwang   feat: 增量索引服务、租户配置...
184
185
  
      def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
          """
          Load every non-deleted SKU row that belongs to one SPU.

          Args:
              tenant_id: tenant ID
              spu_id: parent SPU ID (matched against ``shoplazza_product_sku.spu_id``)

          Returns:
              DataFrame with one row per SKU (possibly empty). NOTE: the query has no
              ORDER BY, so row order is whatever the database returns.
          """
          bind_params = {"tenant_id": tenant_id, "spu_id": spu_id}
          sku_query = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  shoplazza_image_id, title, sku, barcode, position,
                  price, compare_at_price, cost_price,
                  option1, option2, option3,
                  inventory_quantity, weight, weight_unit, image_src,
                  wholesale_price, note, extend,
                  shoplazza_created_at, shoplazza_updated_at, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_sku
              WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
          """)

          with self.db_engine.connect() as connection:
              return pd.read_sql(sku_query, connection, params=bind_params)
  
      def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
          """
          Load every non-deleted option row of one SPU, ordered by ``position``.

          Args:
              tenant_id: tenant ID
              spu_id: parent SPU ID (matched against ``shoplazza_product_option.spu_id``)

          Returns:
              DataFrame with one row per option (possibly empty).
          """
          bind_params = {"tenant_id": tenant_id, "spu_id": spu_id}
          # `values` is back-quoted because it is a reserved word in MySQL
          option_query = text("""
              SELECT 
                  id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                  position, name, `values`, tenant_id,
                  creator, create_time, updater, update_time, deleted
              FROM shoplazza_product_option
              WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
              ORDER BY position
          """)

          with self.db_engine.connect() as connection:
              return pd.read_sql(option_query, connection, params=bind_params)
  
c55c5e47   tangwang   feat: 新增增量索引接口并重构...
223
224
225
226
227
228
      def index_spus_to_es(
          self,
          es_client,
          tenant_id: str,
          spu_ids: List[str],
          index_name: str = DEFAULT_INDEX_NAME,
          batch_size: int = 100,
          delete_spu_ids: Optional[List[str]] = None
      ) -> Dict[str, Any]:
          """
          Incrementally index a batch of SPUs into Elasticsearch.

          Two deletion paths are supported:
          1. Automatic: an SPU in ``spu_ids`` whose DB row is missing or flagged
             ``deleted`` is removed from ES instead of being indexed.
          2. Explicit: every SPU in ``delete_spu_ids`` is removed from ES.

          Args:
              es_client: Elasticsearch client wrapper (raw client exposed as ``.client``)
              tenant_id: tenant ID
              spu_ids: SPU IDs to (re)index; None is treated as an empty list
              index_name: target index name
              batch_size: bulk write batch size passed to BulkIndexer
              delete_spu_ids: SPU IDs to delete explicitly (optional)

          Returns:
              Dict with:
              - "spu_ids": per-SPU results for ``spu_ids`` (status indexed/deleted/failed)
              - "delete_spu_ids": per-SPU results for ``delete_spu_ids``
                (status deleted/not_found/failed)
              - "total", "success_count", "failed_count", "elapsed_time",
                "index_name", "tenant_id"
          """
          start_time = time.time()
          # Normalise None to empty lists so the counters and loops below are safe
          spu_ids = list(spu_ids) if spu_ids else []
          delete_spu_ids = list(delete_spu_ids) if delete_spu_ids else []
          total_count = len(spu_ids)
          delete_count = len(delete_spu_ids)

          # Record the start of the request
          log_index_request(
              indexer_logger,
              index_type='incremental',
              tenant_id=tenant_id,
              request_params={
                  'spu_count': total_count,
                  'delete_count': delete_count,
                  'index_name': index_name,
                  'batch_size': batch_size
              }
          )

          logger.info(
              f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, "
              f"spu_count={total_count}, delete_count={delete_count}"
          )

          # Step 0: explicit deletions (statuses: deleted / not_found / failed)
          delete_results: List[Dict[str, Any]] = []
          if delete_spu_ids:
              logger.info(f"[IncrementalIndexing] Processing explicit deletions: {len(delete_spu_ids)} SPUs")
              for spu_id in delete_spu_ids:
                  result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "explicit")
                  delete_results.append({"spu_id": spu_id, **result})

          # Step 1: load + transform spu_ids, auto-deleting SPUs that are gone in the DB
          spu_results, documents = self._collect_spu_documents(es_client, tenant_id, spu_ids, index_name)
          logger.info(f"[IncrementalIndexing] Transformed {len(documents)}/{total_count} documents")

          # Step 2: bulk write the collected documents
          if documents:
              spu_results.extend(self._bulk_index_documents(es_client, index_name, batch_size, documents))
          else:
              logger.warning(f"[IncrementalIndexing] No documents to index for tenant_id={tenant_id}")

          elapsed_time = time.time() - start_time

          # Aggregate statistics over both result lists
          all_results = spu_results + delete_results
          total_processed = total_count + delete_count
          total_success = sum(1 for r in all_results if r.get('status') in ('indexed', 'deleted', 'not_found'))
          total_failed = sum(1 for r in all_results if r.get('status') == 'failed')
          deleted_count = sum(1 for r in all_results if r.get('status') == 'deleted')

          log_index_result(
              indexer_logger,
              index_type='incremental',
              tenant_id=tenant_id,
              total_count=total_processed,
              success_count=total_success,
              failed_count=total_failed,
              elapsed_time=elapsed_time,
              index_name=index_name,
              errors=[r.get('msg') for r in all_results if r.get('status') == 'failed'][:10],
              deleted_count=deleted_count
          )

          logger.info(
              f"[IncrementalIndexing] Completed for tenant_id={tenant_id}: "
              f"total={total_processed}, success={total_success}, failed={total_failed}, "
              f"elapsed={elapsed_time:.2f}s"
          )

          return {
              "spu_ids": spu_results,  # results for spu_ids
              "delete_spu_ids": delete_results,  # results for delete_spu_ids
              "total": total_processed,
              "success_count": total_success,
              "failed_count": total_failed,
              "elapsed_time": elapsed_time,
              "index_name": index_name,
              "tenant_id": tenant_id
          }

      def _collect_spu_documents(
          self,
          es_client,
          tenant_id: str,
          spu_ids: List[str],
          index_name: str
      ) -> tuple:
          """
          Build ES documents for ``spu_ids``, deleting from ES those gone in the DB.

          Returns:
              (spu_results, documents): result dicts for SPUs already settled here
              (status "deleted" or "failed"), and (spu_id, doc) pairs still to be
              bulk-indexed.
          """
          spu_results: List[Dict[str, Any]] = []
          # (spu_id, doc) pairs keep the input id next to its document so bulk
          # outcomes can be attributed back to the requested SPU.
          documents: List[Any] = []

          for spu_id in spu_ids:
              try:
                  log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching')

                  # Missing or soft-deleted in the DB -> remove from ES instead
                  if self.check_spu_deleted(tenant_id, spu_id):
                      logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES")
                      result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "auto")
                      # deleted and not_found both mean "absent from ES" -> report deleted
                      if result["status"] == "failed":
                          spu_results.append({"spu_id": spu_id, "status": "failed", "msg": result["msg"]})
                      else:
                          spu_results.append({"spu_id": spu_id, "status": "deleted"})
                      continue

                  doc = self.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
                  if doc is None:
                      # Not expected after the deleted check (get_spu_document also
                      # returns None when the transformer yields nothing); kept for robustness.
                      error_msg = "SPU not found (unexpected)"
                      logger.warning(f"[IncrementalIndexing] SPU {spu_id} not found after deleted check")
                      log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
                      spu_results.append({"spu_id": spu_id, "status": "failed", "msg": error_msg})
                      continue

                  log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming')
                  documents.append((spu_id, doc))

              except Exception as e:
                  error_msg = str(e)
                  logger.error(f"[IncrementalIndexing] Error processing SPU {spu_id}: {e}", exc_info=True)
                  log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
                  spu_results.append({"spu_id": spu_id, "status": "failed", "msg": error_msg})

          return spu_results, documents

      def _bulk_index_documents(
          self,
          es_client,
          index_name: str,
          batch_size: int,
          documents: List[Any]
      ) -> List[Dict[str, Any]]:
          """
          Bulk-write (spu_id, doc) pairs to ES and return one result per pair, in order.

          Failures are attributed per document when the bulk error items carry an
          ``_id``. If every document failed according to the failure count, all are
          reported failed. Only when failures cannot be attributed at all are the
          documents reported as indexed, and that case is logged as a warning.
          """
          try:
              doc_list = [doc for _, doc in documents]
              logger.info(f"[IncrementalIndexing] Indexing {len(doc_list)} documents to ES (batch_size={batch_size})")
              indexer = BulkIndexer(es_client, index_name, batch_size=batch_size, max_retries=3)
              bulk_results = indexer.index_documents(
                  doc_list,
                  id_field="spu_id",
                  show_progress=False
              )
          except Exception as e:
              error_msg = f"ES bulk index failed: {str(e)}"
              logger.error(f"[IncrementalIndexing] {error_msg}", exc_info=True)
              # The whole bulk call failed: every document failed
              return [{"spu_id": spu_id, "status": "failed", "msg": error_msg} for spu_id, _ in documents]

          indexed = [{"spu_id": spu_id, "status": "indexed"} for spu_id, _ in documents]
          es_failed_count = bulk_results.get('failed', 0)
          if es_failed_count == 0:
              return indexed

          logger.warning(f"[IncrementalIndexing] ES bulk index had {es_failed_count} failures")
          errors = bulk_results.get('errors') or []
          if not isinstance(errors, (list, tuple)):
              errors = [errors]
          if errors:
              logger.error(f"[IncrementalIndexing] ES errors: {list(errors[:5])}")

          failed_by_id = self._failed_ids_from_bulk_errors(errors)
          if failed_by_id:
              results: List[Dict[str, Any]] = []
              for spu_id, doc in documents:
                  # The ES _id comes from the document's own spu_id field (id_field above)
                  doc_id = doc.get("spu_id", spu_id) if isinstance(doc, dict) else spu_id
                  reason = failed_by_id.get(str(doc_id))
                  if reason is None:
                      reason = failed_by_id.get(str(spu_id))
                  if reason is None:
                      results.append({"spu_id": spu_id, "status": "indexed"})
                  else:
                      results.append({"spu_id": spu_id, "status": "failed", "msg": f"ES bulk index error: {reason}"})
              return results

          if isinstance(es_failed_count, int) and es_failed_count >= len(documents):
              # Every submitted document failed, so attribution is unnecessary
              error_msg = f"ES bulk index failed for all {len(documents)} documents"
              return [{"spu_id": spu_id, "status": "failed", "msg": error_msg} for spu_id, _ in documents]

          # TODO(review): BulkIndexer should expose per-document outcomes so partial
          # failures can always be attributed; until then, keep the previous reporting.
          logger.warning(
              "[IncrementalIndexing] Could not attribute bulk failures to specific SPUs; "
              "reporting remaining documents as indexed"
          )
          return indexed

      @staticmethod
      def _failed_ids_from_bulk_errors(errors: List[Any]) -> Dict[str, str]:
          """
          Best-effort mapping of ES document ``_id`` -> error text from bulk error items.

          BulkIndexer's error format is not visible from this module. This helper
          recognises the per-item shape of the ES bulk API / elasticsearch-py
          helpers (``{"index": {"_id": ..., "error": ...}}``) and flat dicts that
          carry ``_id`` directly. Entries in any other shape are ignored.
          """
          failed: Dict[str, str] = {}
          for item in errors:
              if not isinstance(item, dict):
                  continue
              detail = item
              if "_id" not in detail and len(detail) == 1:
                  # Unwrap {"<op_type>": {...}} as returned per bulk item
                  detail = next(iter(detail.values()))
              if not isinstance(detail, dict) or detail.get("_id") is None:
                  continue
              failed[str(detail["_id"])] = str(detail.get("error", detail))
          return failed
0064e946   tangwang   feat: 增量索引服务、租户配置...