diff --git a/api/routes/indexer.py b/api/routes/indexer.py
index 5eabbaf..2ae2cc3 100644
--- a/api/routes/indexer.py
+++ b/api/routes/indexer.py
@@ -25,6 +25,7 @@ class IndexSpusRequest(BaseModel):
     """Incremental index request (index by SPU list)."""
     tenant_id: str
     spu_ids: List[str]
+    delete_spu_ids: List[str] = []  # Explicit list of SPU IDs to delete (optional)
 
 
 class GetDocumentsRequest(BaseModel):
@@ -63,13 +64,28 @@ async def index_spus(request: IndexSpusRequest):
    Incremental indexing endpoint.
 
    Indexes data into ES for the given SPU IDs; used to incrementally update specific products.
+
+    Two deletion modes are supported:
+    1. **Auto-detected deletion**: if an SPU is marked deleted=1 in the database, its document is removed from ES automatically.
+    2. **Explicit deletion**: SPUs listed in delete_spu_ids are removed regardless of their database state.
+
+    Deletion policy:
+    - The database is the Single Source of Truth.
+    - Auto-detection: when a database lookup finds deleted=1, the document is removed from ES.
+    - Explicit deletion: the caller knows exactly which SPUs to remove and deletes them directly (suited to bulk-deletion scenarios).
    """
    try:
        from ..app import get_incremental_service, get_es_client
-        if not request.spu_ids:
-            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
-        if len(request.spu_ids) > 100:
-            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
+
+        # Validate request parameters
+        if not request.spu_ids and not request.delete_spu_ids:
+            raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")
+
+        if request.spu_ids and len(request.spu_ids) > 100:
+            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing")
+
+        if request.delete_spu_ids and len(request.delete_spu_ids) > 100:
+            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion")
 
        service = get_incremental_service()
        if service is None:
@@ -79,11 +95,12 @@ async def index_spus(request: IndexSpusRequest):
        if es_client is None:
            raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")
 
-        # Invoke the bulk index method
+        # Invoke the bulk index method (now with explicit-deletion support)
        result = service.index_spus_to_es(
            es_client=es_client,
            tenant_id=request.tenant_id,
-            spu_ids=request.spu_ids
+            spu_ids=request.spu_ids if request.spu_ids else [],
+            delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None
        )
 
        return result
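Note: a minimal sketch of how a caller might exercise the new request field. The endpoint path, port, and ID values are assumptions for illustration; the route decorator sits outside this hunk.

    import requests  # any HTTP client works; requests is assumed here

    resp = requests.post(
        "http://localhost:8000/indexer/index_spus",  # hypothetical path
        json={
            "tenant_id": "tenant-001",
            "spu_ids": ["1001", "1002"],   # upsert these; rows with deleted=1 are auto-removed
            "delete_spu_ids": ["2001"],    # remove these from ES unconditionally
        },
    )
    print(resp.json()["deleted"])  # {"explicit": [...], "auto": [...], "total_count": ...}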
diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py
index 662d65b..5f8dcf2 100644
--- a/indexer/incremental_service.py
+++ b/indexer/incremental_service.py
@@ -68,21 +68,48 @@
            logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
            raise
 
-    def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
-        """Load a single SPU row."""
-        query = text("""
-            SELECT
-                id, shop_id, shoplazza_id, title, brief, description,
-                spu, vendor, vendor_url,
-                image_src, image_width, image_height, image_path, image_alt,
-                tags, note, category, category_id, category_google_id,
-                category_level, category_path,
-                fake_sales, display_fake_sales,
-                tenant_id, creator, create_time, updater, update_time, deleted
-            FROM shoplazza_product_spu
-            WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
-            LIMIT 1
-        """)
+    def _load_single_spu(self, tenant_id: str, spu_id: str, include_deleted: bool = False) -> Optional[pd.Series]:
+        """
+        Load a single SPU row.
+
+        Args:
+            tenant_id: tenant ID
+            spu_id: SPU ID
+            include_deleted: whether to include soft-deleted rows (used for deletion-state checks)
+
+        Returns:
+            The SPU row as a Series, or None if it does not exist.
+        """
+        if include_deleted:
+            # Fetch the row regardless of its deleted flag, so callers can inspect deletion state
+            query = text("""
+                SELECT
+                    id, shop_id, shoplazza_id, title, brief, description,
+                    spu, vendor, vendor_url,
+                    image_src, image_width, image_height, image_path, image_alt,
+                    tags, note, category, category_id, category_google_id,
+                    category_level, category_path,
+                    fake_sales, display_fake_sales,
+                    tenant_id, creator, create_time, updater, update_time, deleted
+                FROM shoplazza_product_spu
+                WHERE tenant_id = :tenant_id AND id = :spu_id
+                LIMIT 1
+            """)
+        else:
+            # Fetch only rows that are not soft-deleted
+            query = text("""
+                SELECT
+                    id, shop_id, shoplazza_id, title, brief, description,
+                    spu, vendor, vendor_url,
+                    image_src, image_width, image_height, image_path, image_alt,
+                    tags, note, category, category_id, category_google_id,
+                    category_level, category_path,
+                    fake_sales, display_fake_sales,
+                    tenant_id, creator, create_time, updater, update_time, deleted
+                FROM shoplazza_product_spu
+                WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
+                LIMIT 1
+            """)
 
        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})
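Note: MySQL BIT(1) columns commonly surface through PyMySQL/SQLAlchemy as one-byte bytes values (b'\x00' / b'\x01') rather than ints, which is exactly what the deletion check in the next hunk has to normalize. A standalone sketch of that coercion, with the helper name being illustrative:

    def normalize_bit_flag(value) -> bool:
        # Coerce a MySQL BIT(1) value -- bytes b'\x00'/b'\x01', int, or None -- to bool.
        if isinstance(value, (bytes, bytearray)):
            return value != b"\x00"
        return bool(value)

    assert normalize_bit_flag(b"\x01") is True
    assert normalize_bit_flag(b"\x00") is False
    assert normalize_bit_flag(1) is True
    assert normalize_bit_flag(None) is False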
f"not_found={len(not_found_list)}, failed={len(failed_list)}" + ) + + return { + "deleted": deleted_list, + "not_found": not_found_list, + "failed": failed_list, + "total": len(spu_ids), + "deleted_count": len(deleted_list), + "not_found_count": len(not_found_list), + "failed_count": len(failed_list) + } def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame: """加载指定SPU的所有SKU数据""" @@ -136,26 +273,34 @@ class IncrementalIndexerService: tenant_id: str, spu_ids: List[str], index_name: str = DEFAULT_INDEX_NAME, - batch_size: int = 100 + batch_size: int = 100, + delete_spu_ids: List[str] = None ) -> Dict[str, Any]: """ 批量索引SPU到ES(增量索引) + 支持两种删除方式: + 1. 自动检测删除:根据数据库deleted字段自动检测并删除ES中的文档 + 2. 显式删除:通过delete_spu_ids参数显式指定要删除的SPU + Args: es_client: Elasticsearch客户端 tenant_id: 租户ID - spu_ids: SPU ID列表 + spu_ids: SPU ID列表(要索引的) index_name: 索引名称 batch_size: 批量写入ES的批次大小 + delete_spu_ids: 显式指定要删除的SPU ID列表(可选) Returns: - 包含成功/失败列表的字典 + 包含成功/失败列表的字典,以及删除结果 """ start_time = time.time() total_count = len(spu_ids) success_list = [] failed_list = [] documents = [] + deleted_list = [] + auto_deleted_list = [] # 记录请求开始 log_index_request( @@ -164,21 +309,76 @@ class IncrementalIndexerService: tenant_id=tenant_id, request_params={ 'spu_count': total_count, + 'delete_count': len(delete_spu_ids) if delete_spu_ids else 0, 'index_name': index_name, 'batch_size': batch_size } ) - logger.info(f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, spu_count={total_count}") + logger.info( + f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, " + f"spu_count={total_count}, delete_count={len(delete_spu_ids) if delete_spu_ids else 0}" + ) - # 步骤1: 获取所有SPU文档 + # 步骤0: 处理显式删除请求 + if delete_spu_ids: + logger.info(f"[IncrementalIndexing] Processing explicit deletions: {len(delete_spu_ids)} SPUs") + delete_result = self.delete_spus_from_es( + es_client=es_client, + tenant_id=tenant_id, + spu_ids=delete_spu_ids, + index_name=index_name + ) + deleted_list = delete_result.get('deleted', []) + logger.info(f"[IncrementalIndexing] Explicitly deleted {len(deleted_list)} SPUs from ES") + + # 步骤1: 获取所有SPU文档,并自动检测删除 for spu_id in spu_ids: try: log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching') + + # 先检查SPU是否在数据库中被标记为删除 + is_deleted = self.check_spu_deleted(tenant_id, spu_id) + if is_deleted: + # SPU已删除,从ES中删除对应文档 + logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES") + try: + response = es_client.client.delete( + index=index_name, + id=str(spu_id), + ignore=[404] + ) + if response.get('result') == 'deleted': + auto_deleted_list.append({ + "spu_id": spu_id, + "status": "auto_deleted", + "reason": "deleted in database" + }) + log_spu_processing(indexer_logger, tenant_id, spu_id, 'auto_deleted', "deleted in database") + elif response.get('result') == 'not_found': + # ES中不存在,也算成功(可能已经被删除过了) + auto_deleted_list.append({ + "spu_id": spu_id, + "status": "auto_deleted", + "reason": "deleted in database (not found in ES)" + }) + except Exception as e: + error_msg = f"Failed to delete from ES: {str(e)}" + logger.error(f"[IncrementalIndexing] Error deleting SPU {spu_id} from ES: {e}", exc_info=True) + failed_list.append({ + "spu_id": spu_id, + "error": error_msg + }) + continue + + # SPU未删除,正常获取文档 doc = self.get_spu_document(tenant_id=tenant_id, spu_id=spu_id) if doc is None: - error_msg = "SPU not found or deleted" + # 这种情况不应该发生,因为我们已经检查了deleted字段 + # 但为了健壮性,仍然处理 + error_msg = "SPU not found (unexpected)" + logger.warning(f"[IncrementalIndexing] SPU 
{spu_id} not found after deleted check") log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg) failed_list.append({ "spu_id": spu_id, @@ -260,7 +460,7 @@ class IncrementalIndexerService: success_count = len(success_list) failed_count = len(failed_list) - # 记录最终结果 + # 记录最终结果(包含删除统计) log_index_result( indexer_logger, index_type='incremental', @@ -270,18 +470,34 @@ class IncrementalIndexerService: failed_count=failed_count, elapsed_time=elapsed_time, index_name=index_name, - errors=[item.get('error') for item in failed_list[:10]] if failed_list else None + errors=[item.get('error') for item in failed_list[:10]] if failed_list else None, + deleted_count=total_deleted_count, + explicit_deleted_count=explicit_deleted_count, + auto_deleted_count=auto_deleted_count ) + # 统计删除数量 + explicit_deleted_count = len(deleted_list) if delete_spu_ids else 0 + auto_deleted_count = len(auto_deleted_list) + total_deleted_count = explicit_deleted_count + auto_deleted_count + logger.info( f"[IncrementalIndexing] Completed for tenant_id={tenant_id}: " f"total={total_count}, success={success_count}, failed={failed_count}, " + f"explicit_deleted={explicit_deleted_count}, auto_deleted={auto_deleted_count}, " f"elapsed={elapsed_time:.2f}s" ) return { "success": success_list, "failed": failed_list, + "deleted": { + "explicit": deleted_list if delete_spu_ids else [], + "auto": auto_deleted_list, + "total_count": total_deleted_count, + "explicit_count": explicit_deleted_count, + "auto_count": auto_deleted_count + }, "total": total_count, "success_count": success_count, "failed_count": failed_count, diff --git a/indexer/indexer_logger.py b/indexer/indexer_logger.py index 62ad63d..c5ac700 100644 --- a/indexer/indexer_logger.py +++ b/indexer/indexer_logger.py @@ -150,7 +150,10 @@ def log_index_result( failed_count: int, elapsed_time: float, index_name: Optional[str] = None, - errors: Optional[list] = None + errors: Optional[list] = None, + deleted_count: Optional[int] = None, + explicit_deleted_count: Optional[int] = None, + auto_deleted_count: Optional[int] = None ): """ 记录索引结果 @@ -165,23 +168,43 @@ def log_index_result( elapsed_time: 耗时(秒) index_name: 索引名称 errors: 错误列表 + deleted_count: 删除总数(可选) + explicit_deleted_count: 显式删除数(可选) + auto_deleted_count: 自动删除数(可选) """ - logger.info( + message = ( f"Index request completed: type={index_type}, tenant_id={tenant_id}, " - f"total={total_count}, success={success_count}, failed={failed_count}, " - f"elapsed={elapsed_time:.2f}s", - extra={ - 'index_type': index_type, - 'tenant_id': tenant_id, - 'operation': 'request_complete', - 'total_count': total_count, - 'success_count': success_count, - 'failed_count': failed_count, - 'elapsed_time': elapsed_time, - 'index_name': index_name, - 'error': json.dumps(errors[:10], ensure_ascii=False) if errors else None - } + f"total={total_count}, success={success_count}, failed={failed_count}" ) + + if deleted_count is not None: + message += f", deleted={deleted_count}" + if explicit_deleted_count is not None: + message += f" (explicit={explicit_deleted_count}, auto={auto_deleted_count or 0})" + + message += f", elapsed={elapsed_time:.2f}s" + + extra_data = { + 'index_type': index_type, + 'tenant_id': tenant_id, + 'operation': 'request_complete', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'elapsed_time': elapsed_time, + 'index_name': index_name, + 'error': json.dumps(errors[:10], ensure_ascii=False) if errors else None + } + + # 添加删除统计信息(如果提供) + if deleted_count is not 
-- 
libgit2 0.21.2
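Note: with these changes the return value of index_spus_to_es (and therefore the endpoint response) gains a "deleted" section. Roughly this shape, with values illustrative and the success-entry shape unchanged from the previous contract:

    {
        "success": [...],   # per-SPU success entries, shape unchanged
        "failed": [],
        "deleted": {
            "explicit": [{"spu_id": "2001", "status": "deleted"}],
            "auto": [{"spu_id": "1002", "status": "auto_deleted", "reason": "deleted in database"}],
            "total_count": 2,
            "explicit_count": 1,
            "auto_count": 1
        },
        "total": 2,
        "success_count": 1,
        "failed_count": 0
    }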