Commit f54b3854aa9bf20120f929c4436f13a18698eef0

Authored by tangwang
1 parent 737d4e6a

新增delete_spu_ids参数。目前总共3个参数:

tenant_id
spu_ids
delete_spu_ids

spu_ids里面的SPU,如果数据库中已标记删除(deleted字段为1),我这边也要从ES中删除对应文档。
delete_spu_ids里面的SPU,不查询数据库状态,直接从ES中删除。
为您的变更输入提交说明。以 '#' 开始的行将被忽略,而一个空的提交
api/routes/indexer.py
... ... @@ -25,6 +25,7 @@ class IndexSpusRequest(BaseModel):
25 25 """增量索引请求(按SPU列表索引)"""
26 26 tenant_id: str
27 27 spu_ids: List[str]
  28 + delete_spu_ids: List[str] = [] # 显式指定要删除的SPU ID列表(可选)
28 29  
29 30  
30 31 class GetDocumentsRequest(BaseModel):
... ... @@ -63,13 +64,28 @@ async def index_spus(request: IndexSpusRequest):
63 64 增量索引接口
64 65  
65 66 根据指定的SPU ID列表,将数据索引到ES。用于增量更新指定商品。
  67 +
  68 + 支持两种删除方式:
  69 + 1. **自动检测删除**:如果SPU在数据库中被标记为deleted=1,自动从ES中删除对应文档
  70 + 2. **显式删除**:通过delete_spu_ids参数显式指定要删除的SPU(无论数据库状态如何)
  71 +
  72 + 删除策略说明:
  73 + - 数据库是唯一真实来源(Single Source of Truth)
  74 + - 自动检测:查询数据库时发现deleted=1,自动从ES删除
  75 + - 显式删除:调用方明确知道哪些SPU要删除,直接删除(适用于批量删除场景)
66 76 """
67 77 try:
68 78 from ..app import get_incremental_service, get_es_client
69   - if not request.spu_ids:
70   - raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
71   - if len(request.spu_ids) > 100:
72   - raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
  79 +
  80 + # 验证请求参数
  81 + if not request.spu_ids and not request.delete_spu_ids:
  82 + raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")
  83 +
  84 + if request.spu_ids and len(request.spu_ids) > 100:
  85 + raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing")
  86 +
  87 + if request.delete_spu_ids and len(request.delete_spu_ids) > 100:
  88 + raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion")
73 89  
74 90 service = get_incremental_service()
75 91 if service is None:
... ... @@ -79,11 +95,12 @@ async def index_spus(request: IndexSpusRequest):
79 95 if es_client is None:
80 96 raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")
81 97  
82   - # 调用批量索引方法
  98 + # 调用批量索引方法(支持显式删除参数)
83 99 result = service.index_spus_to_es(
84 100 es_client=es_client,
85 101 tenant_id=request.tenant_id,
86   - spu_ids=request.spu_ids
  102 + spu_ids=request.spu_ids if request.spu_ids else [],
  103 + delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None
87 104 )
88 105  
89 106 return result
... ...
indexer/incremental_service.py
... ... @@ -68,21 +68,48 @@ class IncrementalIndexerService:
68 68 logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
69 69 raise
70 70  
71   - def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
72   - """加载单个SPU数据"""
73   - query = text("""
74   - SELECT
75   - id, shop_id, shoplazza_id, title, brief, description,
76   - spu, vendor, vendor_url,
77   - image_src, image_width, image_height, image_path, image_alt,
78   - tags, note, category, category_id, category_google_id,
79   - category_level, category_path,
80   - fake_sales, display_fake_sales,
81   - tenant_id, creator, create_time, updater, update_time, deleted
82   - FROM shoplazza_product_spu
83   - WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
84   - LIMIT 1
85   - """)
  71 + def _load_single_spu(self, tenant_id: str, spu_id: str, include_deleted: bool = False) -> Optional[pd.Series]:
  72 + """
  73 + 加载单个SPU数据
  74 +
  75 + Args:
  76 + tenant_id: 租户ID
  77 + spu_id: SPU ID
  78 + include_deleted: 是否包含已删除的记录(用于检查删除状态)
  79 +
  80 + Returns:
  81 + SPU数据Series,如果不存在返回None
  82 + """
  83 + if include_deleted:
  84 + # 查询所有记录(包括已删除的),用于检查删除状态
  85 + query = text("""
  86 + SELECT
  87 + id, shop_id, shoplazza_id, title, brief, description,
  88 + spu, vendor, vendor_url,
  89 + image_src, image_width, image_height, image_path, image_alt,
  90 + tags, note, category, category_id, category_google_id,
  91 + category_level, category_path,
  92 + fake_sales, display_fake_sales,
  93 + tenant_id, creator, create_time, updater, update_time, deleted
  94 + FROM shoplazza_product_spu
  95 + WHERE tenant_id = :tenant_id AND id = :spu_id
  96 + LIMIT 1
  97 + """)
  98 + else:
  99 + # 只查询未删除的记录
  100 + query = text("""
  101 + SELECT
  102 + id, shop_id, shoplazza_id, title, brief, description,
  103 + spu, vendor, vendor_url,
  104 + image_src, image_width, image_height, image_path, image_alt,
  105 + tags, note, category, category_id, category_google_id,
  106 + category_level, category_path,
  107 + fake_sales, display_fake_sales,
  108 + tenant_id, creator, create_time, updater, update_time, deleted
  109 + FROM shoplazza_product_spu
  110 + WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
  111 + LIMIT 1
  112 + """)
86 113  
87 114 with self.db_engine.connect() as conn:
88 115 df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})
... ... @@ -91,6 +118,116 @@ class IncrementalIndexerService:
91 118 return None
92 119  
93 120 return df.iloc[0]
  121 +
  122 + def check_spu_deleted(self, tenant_id: str, spu_id: str) -> bool:
  123 + """
  124 + 检查SPU是否在数据库中被标记为删除
  125 +
  126 + Args:
  127 + tenant_id: 租户ID
  128 + spu_id: SPU ID
  129 +
  130 + Returns:
  131 + True表示已删除或记录不存在(不存在时视为需要删除),False表示未删除
  132 + """
  133 + spu_row = self._load_single_spu(tenant_id, spu_id, include_deleted=True)
  134 + if spu_row is None:
  135 + # SPU不存在,视为需要删除
  136 + return True
  137 + # 检查deleted字段(可能是bit类型,需要转换为int或bool)
  138 + deleted = spu_row.get('deleted', 0)
  139 + # 处理bit类型:可能是b'\x00'或b'\x01',或者直接是0/1
  140 + if isinstance(deleted, bytes):
  141 + return deleted == b'\x01' or deleted == 1
  142 + return bool(deleted)
  143 +
  144 + def delete_spus_from_es(
  145 + self,
  146 + es_client,
  147 + tenant_id: str,
  148 + spu_ids: List[str],
  149 + index_name: str = DEFAULT_INDEX_NAME
  150 + ) -> Dict[str, Any]:
  151 + """
  152 + 从ES中批量删除SPU文档
  153 +
  154 + Args:
  155 + es_client: Elasticsearch客户端
  156 + tenant_id: 租户ID
  157 + spu_ids: 要删除的SPU ID列表
  158 + index_name: 索引名称
  159 +
  160 + Returns:
  161 + 包含删除结果的字典
  162 + """
  163 + if not spu_ids:
  164 + return {
  165 + "deleted": [],
  166 + "not_found": [],
  167 + "failed": [],
  168 + "total": 0,
  169 + "deleted_count": 0,
  170 + "not_found_count": 0,
  171 + "failed_count": 0
  172 + }
  173 +
  174 + deleted_list = []
  175 + not_found_list = []
  176 + failed_list = []
  177 +
  178 + logger.info(f"[IncrementalDeletion] Starting deletion for tenant_id={tenant_id}, spu_count={len(spu_ids)}")
  179 +
  180 + for spu_id in spu_ids:
  181 + try:
  182 + # 使用ES的delete API删除文档
  183 + # ES文档ID格式:通常是spu_id,但需要确认实际使用的ID格式
  184 + # 根据index_spus_to_es方法,使用的是spu_id作为文档ID
  185 + try:
  186 + response = es_client.client.delete(
  187 + index=index_name,
  188 + id=str(spu_id),
  189 + ignore=[404] # 忽略文档不存在的错误
  190 + )
  191 +
  192 + if response.get('result') == 'deleted':
  193 + deleted_list.append({"spu_id": spu_id, "status": "deleted"})
  194 + log_spu_processing(indexer_logger, tenant_id, spu_id, 'deleted')
  195 + elif response.get('result') == 'not_found':
  196 + not_found_list.append({"spu_id": spu_id, "status": "not_found"})
  197 + logger.debug(f"[IncrementalDeletion] SPU {spu_id} not found in ES")
  198 + else:
  199 + failed_list.append({"spu_id": spu_id, "error": f"Unexpected result: {response.get('result')}"})
  200 +
  201 + except Exception as e:
  202 + # 处理404错误(文档不存在)
  203 + if hasattr(e, 'status_code') and e.status_code == 404:
  204 + not_found_list.append({"spu_id": spu_id, "status": "not_found"})
  205 + else:
  206 + error_msg = str(e)
  207 + logger.error(f"[IncrementalDeletion] Error deleting SPU {spu_id}: {e}", exc_info=True)
  208 + failed_list.append({"spu_id": spu_id, "error": error_msg})
  209 + log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', error_msg)
  210 +
  211 + except Exception as e:
  212 + error_msg = str(e)
  213 + logger.error(f"[IncrementalDeletion] Unexpected error deleting SPU {spu_id}: {e}", exc_info=True)
  214 + failed_list.append({"spu_id": spu_id, "error": error_msg})
  215 +
  216 + logger.info(
  217 + f"[IncrementalDeletion] Completed for tenant_id={tenant_id}: "
  218 + f"total={len(spu_ids)}, deleted={len(deleted_list)}, "
  219 + f"not_found={len(not_found_list)}, failed={len(failed_list)}"
  220 + )
  221 +
  222 + return {
  223 + "deleted": deleted_list,
  224 + "not_found": not_found_list,
  225 + "failed": failed_list,
  226 + "total": len(spu_ids),
  227 + "deleted_count": len(deleted_list),
  228 + "not_found_count": len(not_found_list),
  229 + "failed_count": len(failed_list)
  230 + }
94 231  
95 232 def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
96 233 """加载指定SPU的所有SKU数据"""
... ... @@ -136,26 +273,34 @@ class IncrementalIndexerService:
136 273 tenant_id: str,
137 274 spu_ids: List[str],
138 275 index_name: str = DEFAULT_INDEX_NAME,
139   - batch_size: int = 100
  276 + batch_size: int = 100,
  277 + delete_spu_ids: List[str] = None
140 278 ) -> Dict[str, Any]:
141 279 """
142 280 批量索引SPU到ES(增量索引)
143 281  
  282 + 支持两种删除方式:
  283 + 1. 自动检测删除:根据数据库deleted字段自动检测并删除ES中的文档
  284 + 2. 显式删除:通过delete_spu_ids参数显式指定要删除的SPU
  285 +
144 286 Args:
145 287 es_client: Elasticsearch客户端
146 288 tenant_id: 租户ID
147   - spu_ids: SPU ID列表
  289 + spu_ids: SPU ID列表(要索引的)
148 290 index_name: 索引名称
149 291 batch_size: 批量写入ES的批次大小
  292 + delete_spu_ids: 显式指定要删除的SPU ID列表(可选)
150 293  
151 294 Returns:
152   - 包含成功/失败列表的字典
  295 + 包含成功/失败列表的字典,以及删除结果
153 296 """
154 297 start_time = time.time()
155 298 total_count = len(spu_ids)
156 299 success_list = []
157 300 failed_list = []
158 301 documents = []
  302 + deleted_list = []
  303 + auto_deleted_list = []
159 304  
160 305 # 记录请求开始
161 306 log_index_request(
... ... @@ -164,21 +309,76 @@ class IncrementalIndexerService:
164 309 tenant_id=tenant_id,
165 310 request_params={
166 311 'spu_count': total_count,
  312 + 'delete_count': len(delete_spu_ids) if delete_spu_ids else 0,
167 313 'index_name': index_name,
168 314 'batch_size': batch_size
169 315 }
170 316 )
171 317  
172   - logger.info(f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, spu_count={total_count}")
  318 + logger.info(
  319 + f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, "
  320 + f"spu_count={total_count}, delete_count={len(delete_spu_ids) if delete_spu_ids else 0}"
  321 + )
173 322  
174   - # 步骤1: 获取所有SPU文档
  323 + # 步骤0: 处理显式删除请求
  324 + if delete_spu_ids:
  325 + logger.info(f"[IncrementalIndexing] Processing explicit deletions: {len(delete_spu_ids)} SPUs")
  326 + delete_result = self.delete_spus_from_es(
  327 + es_client=es_client,
  328 + tenant_id=tenant_id,
  329 + spu_ids=delete_spu_ids,
  330 + index_name=index_name
  331 + )
  332 + deleted_list = delete_result.get('deleted', [])
  333 + logger.info(f"[IncrementalIndexing] Explicitly deleted {len(deleted_list)} SPUs from ES")
  334 +
  335 + # 步骤1: 获取所有SPU文档,并自动检测删除
175 336 for spu_id in spu_ids:
176 337 try:
177 338 log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching')
  339 +
  340 + # 先检查SPU是否在数据库中被标记为删除
  341 + is_deleted = self.check_spu_deleted(tenant_id, spu_id)
  342 + if is_deleted:
  343 + # SPU已删除,从ES中删除对应文档
  344 + logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES")
  345 + try:
  346 + response = es_client.client.delete(
  347 + index=index_name,
  348 + id=str(spu_id),
  349 + ignore=[404]
  350 + )
  351 + if response.get('result') == 'deleted':
  352 + auto_deleted_list.append({
  353 + "spu_id": spu_id,
  354 + "status": "auto_deleted",
  355 + "reason": "deleted in database"
  356 + })
  357 + log_spu_processing(indexer_logger, tenant_id, spu_id, 'auto_deleted', "deleted in database")
  358 + elif response.get('result') == 'not_found':
  359 + # ES中不存在,也算成功(可能已经被删除过了)
  360 + auto_deleted_list.append({
  361 + "spu_id": spu_id,
  362 + "status": "auto_deleted",
  363 + "reason": "deleted in database (not found in ES)"
  364 + })
  365 + except Exception as e:
  366 + error_msg = f"Failed to delete from ES: {str(e)}"
  367 + logger.error(f"[IncrementalIndexing] Error deleting SPU {spu_id} from ES: {e}", exc_info=True)
  368 + failed_list.append({
  369 + "spu_id": spu_id,
  370 + "error": error_msg
  371 + })
  372 + continue
  373 +
  374 + # SPU未删除,正常获取文档
178 375 doc = self.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
179 376  
180 377 if doc is None:
181   - error_msg = "SPU not found or deleted"
  378 + # 这种情况不应该发生,因为我们已经检查了deleted字段
  379 + # 但为了健壮性,仍然处理
  380 + error_msg = "SPU not found (unexpected)"
  381 + logger.warning(f"[IncrementalIndexing] SPU {spu_id} not found after deleted check")
182 382 log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
183 383 failed_list.append({
184 384 "spu_id": spu_id,
... ... @@ -260,7 +460,7 @@ class IncrementalIndexerService:
260 460 success_count = len(success_list)
261 461 failed_count = len(failed_list)
262 462  
263   - # 记录最终结果
  463 + # 记录最终结果(包含删除统计)
264 464 log_index_result(
265 465 indexer_logger,
266 466 index_type='incremental',
... ... @@ -270,18 +470,34 @@ class IncrementalIndexerService:
270 470 failed_count=failed_count,
271 471 elapsed_time=elapsed_time,
272 472 index_name=index_name,
273   - errors=[item.get('error') for item in failed_list[:10]] if failed_list else None
  473 + errors=[item.get('error') for item in failed_list[:10]] if failed_list else None,
  474 + deleted_count=total_deleted_count,
  475 + explicit_deleted_count=explicit_deleted_count,
  476 + auto_deleted_count=auto_deleted_count
274 477 )
275 478  
  479 + # 统计删除数量
  480 + explicit_deleted_count = len(deleted_list) if delete_spu_ids else 0
  481 + auto_deleted_count = len(auto_deleted_list)
  482 + total_deleted_count = explicit_deleted_count + auto_deleted_count
  483 +
276 484 logger.info(
277 485 f"[IncrementalIndexing] Completed for tenant_id={tenant_id}: "
278 486 f"total={total_count}, success={success_count}, failed={failed_count}, "
  487 + f"explicit_deleted={explicit_deleted_count}, auto_deleted={auto_deleted_count}, "
279 488 f"elapsed={elapsed_time:.2f}s"
280 489 )
281 490  
282 491 return {
283 492 "success": success_list,
284 493 "failed": failed_list,
  494 + "deleted": {
  495 + "explicit": deleted_list if delete_spu_ids else [],
  496 + "auto": auto_deleted_list,
  497 + "total_count": total_deleted_count,
  498 + "explicit_count": explicit_deleted_count,
  499 + "auto_count": auto_deleted_count
  500 + },
285 501 "total": total_count,
286 502 "success_count": success_count,
287 503 "failed_count": failed_count,
... ...
indexer/indexer_logger.py
... ... @@ -150,7 +150,10 @@ def log_index_result(
150 150 failed_count: int,
151 151 elapsed_time: float,
152 152 index_name: Optional[str] = None,
153   - errors: Optional[list] = None
  153 + errors: Optional[list] = None,
  154 + deleted_count: Optional[int] = None,
  155 + explicit_deleted_count: Optional[int] = None,
  156 + auto_deleted_count: Optional[int] = None
154 157 ):
155 158 """
156 159 记录索引结果
... ... @@ -165,23 +168,43 @@ def log_index_result(
165 168 elapsed_time: 耗时(秒)
166 169 index_name: 索引名称
167 170 errors: 错误列表
  171 + deleted_count: 删除总数(可选)
  172 + explicit_deleted_count: 显式删除数(可选)
  173 + auto_deleted_count: 自动删除数(可选)
168 174 """
169   - logger.info(
  175 + message = (
170 176 f"Index request completed: type={index_type}, tenant_id={tenant_id}, "
171   - f"total={total_count}, success={success_count}, failed={failed_count}, "
172   - f"elapsed={elapsed_time:.2f}s",
173   - extra={
174   - 'index_type': index_type,
175   - 'tenant_id': tenant_id,
176   - 'operation': 'request_complete',
177   - 'total_count': total_count,
178   - 'success_count': success_count,
179   - 'failed_count': failed_count,
180   - 'elapsed_time': elapsed_time,
181   - 'index_name': index_name,
182   - 'error': json.dumps(errors[:10], ensure_ascii=False) if errors else None
183   - }
  177 + f"total={total_count}, success={success_count}, failed={failed_count}"
184 178 )
  179 +
  180 + if deleted_count is not None:
  181 + message += f", deleted={deleted_count}"
  182 + if explicit_deleted_count is not None:
  183 + message += f" (explicit={explicit_deleted_count}, auto={auto_deleted_count or 0})"
  184 +
  185 + message += f", elapsed={elapsed_time:.2f}s"
  186 +
  187 + extra_data = {
  188 + 'index_type': index_type,
  189 + 'tenant_id': tenant_id,
  190 + 'operation': 'request_complete',
  191 + 'total_count': total_count,
  192 + 'success_count': success_count,
  193 + 'failed_count': failed_count,
  194 + 'elapsed_time': elapsed_time,
  195 + 'index_name': index_name,
  196 + 'error': json.dumps(errors[:10], ensure_ascii=False) if errors else None
  197 + }
  198 +
  199 + # 添加删除统计信息(如果提供)
  200 + if deleted_count is not None:
  201 + extra_data['deleted_count'] = deleted_count
  202 + if explicit_deleted_count is not None:
  203 + extra_data['explicit_deleted_count'] = explicit_deleted_count
  204 + if auto_deleted_count is not None:
  205 + extra_data['auto_deleted_count'] = auto_deleted_count
  206 +
  207 + logger.info(message, extra=extra_data)
185 208  
186 209  
187 210 def log_spu_processing(
... ...