"""增量数据获取服务""" import pandas as pd import logging import time from typing import Dict, Any, Optional, List from sqlalchemy import text from indexer.indexing_utils import load_category_mapping, create_document_transformer from indexer.bulk_indexer import BulkIndexer from indexer.mapping_generator import DEFAULT_INDEX_NAME from indexer.indexer_logger import ( get_indexer_logger, log_index_request, log_index_result, log_spu_processing ) # Configure logger logger = logging.getLogger(__name__) # Indexer专用日志器 indexer_logger = get_indexer_logger() class IncrementalIndexerService: """增量索引服务,提供SPU数据获取功能。""" def __init__(self, db_engine: Any): """初始化增量索引服务""" self.db_engine = db_engine # 预加载分类映射(全局,所有租户共享) self.category_id_to_name = load_category_mapping(db_engine) logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings") def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]: """获取SPU的ES文档数据""" try: # 加载SPU数据 spu_row = self._load_single_spu(tenant_id, spu_id) if spu_row is None: logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}") return None # 加载SKU数据 skus_df = self._load_skus_for_spu(tenant_id, spu_id) # 加载Option数据 options_df = self._load_options_for_spu(tenant_id, spu_id) # 创建文档转换器 transformer = create_document_transformer( category_id_to_name=self.category_id_to_name, tenant_id=tenant_id ) # 转换为ES文档 doc = transformer.transform_spu_to_doc( tenant_id=tenant_id, spu_row=spu_row, skus=skus_df, options=options_df ) if doc is None: logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}") return None return doc except Exception as e: logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True) raise def _load_single_spu(self, tenant_id: str, spu_id: str, include_deleted: bool = False) -> Optional[pd.Series]: """ 加载单个SPU数据 Args: tenant_id: 租户ID spu_id: SPU ID include_deleted: 是否包含已删除的记录(用于检查删除状态) Returns: SPU数据Series,如果不存在返回None """ if include_deleted: # 查询所有记录(包括已删除的),用于检查删除状态 query = text(""" SELECT id, shop_id, shoplazza_id, title, brief, description, spu, vendor, vendor_url, image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND id = :spu_id LIMIT 1 """) else: # 只查询未删除的记录 query = text(""" SELECT id, shop_id, shoplazza_id, title, brief, description, spu, vendor, vendor_url, image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0 LIMIT 1 """) with self.db_engine.connect() as conn: df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) if df.empty: return None return df.iloc[0] def check_spu_deleted(self, tenant_id: str, spu_id: str) -> bool: """ 检查SPU是否在数据库中被标记为删除 Args: tenant_id: 租户ID spu_id: SPU ID Returns: True表示已删除,False表示未删除或不存在 """ spu_row = self._load_single_spu(tenant_id, spu_id, include_deleted=True) if spu_row is None: # SPU不存在,视为需要删除 return True # 检查deleted字段(可能是bit类型,需要转换为int或bool) deleted = spu_row.get('deleted', 0) # 处理bit类型:可能是b'\x00'或b'\x01',或者直接是0/1 if isinstance(deleted, bytes): return deleted == b'\x01' or deleted == 
    def _delete_spu_from_es(
        self,
        es_client,
        tenant_id: str,
        spu_id: str,
        index_name: str,
        log_prefix: str = ""
    ) -> Dict[str, Any]:
        """
        Delete a single SPU document from ES (shared helper).

        Returns:
            {"status": "deleted|not_found|failed", "msg": "error message (optional)"}
        """
        try:
            response = es_client.client.delete(
                index=index_name,
                id=str(spu_id),
                ignore=[404]
            )
            result = response.get('result')

            if result == 'deleted':
                log_spu_processing(indexer_logger, tenant_id, spu_id, 'deleted', log_prefix)
                return {"status": "deleted"}
            elif result == 'not_found':
                return {"status": "not_found"}
            else:
                msg = f"Unexpected result: {result}"
                log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', msg)
                return {"status": "failed", "msg": msg}

        except Exception as e:
            if hasattr(e, 'status_code') and e.status_code == 404:
                return {"status": "not_found"}
            else:
                msg = str(e)
                logger.error(f"[IncrementalDeletion] Error deleting SPU {spu_id}: {e}", exc_info=True)
                log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', msg)
                return {"status": "failed", "msg": msg}

    def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Load all SKU rows for the given SPU."""
        query = text("""
            SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                   shoplazza_image_id, title, sku, barcode, position, price,
                   compare_at_price, cost_price, option1, option2, option3,
                   inventory_quantity, weight, weight_unit, image_src,
                   wholesale_price, note, extend, shoplazza_created_at,
                   shoplazza_updated_at, tenant_id, creator, create_time,
                   updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})

        return df

    def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Load all option rows for the given SPU."""
        query = text("""
            SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                   position, name, `values`, tenant_id, creator, create_time,
                   updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
            ORDER BY position
        """)

        with self.db_engine.connect() as conn:
            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id})

        return df
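
    # Illustrative shape of the per-SPU result entries built by index_spus_to_es()
    # below (the ids and messages are placeholder values):
    #   spu_results:    [{"spu_id": "1001", "status": "indexed"},
    #                    {"spu_id": "1002", "status": "failed", "msg": "..."}]
    #   delete_results: [{"spu_id": "1003", "status": "deleted"},
    #                    {"spu_id": "1004", "status": "not_found"}]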
    def index_spus_to_es(
        self,
        es_client,
        tenant_id: str,
        spu_ids: List[str],
        index_name: str = DEFAULT_INDEX_NAME,
        batch_size: int = 100,
        delete_spu_ids: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Bulk index SPUs into ES (incremental indexing).

        Two deletion paths are supported:
        1. Auto-detected deletion: documents whose DB rows have the `deleted` flag set are removed from ES.
        2. Explicit deletion: SPUs listed in `delete_spu_ids` are removed from ES.

        Args:
            es_client: Elasticsearch client
            tenant_id: Tenant ID
            spu_ids: List of SPU IDs to index
            index_name: Index name
            batch_size: Batch size for bulk writes to ES
            delete_spu_ids: SPU IDs to delete explicitly (optional)

        Returns:
            A dict containing the success/failure lists plus the deletion results.
        """
        start_time = time.time()
        total_count = len(spu_ids)
        delete_count = len(delete_spu_ids) if delete_spu_ids else 0

        # Result list for spu_ids (status: indexed, deleted, failed)
        spu_results = []
        # Result list for delete_spu_ids (status: deleted, not_found, failed)
        delete_results = []
        documents = []

        # Log the start of the request
        log_index_request(
            indexer_logger,
            index_type='incremental',
            tenant_id=tenant_id,
            request_params={
                'spu_count': total_count,
                'delete_count': delete_count,
                'index_name': index_name,
                'batch_size': batch_size
            }
        )

        logger.info(
            f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, "
            f"spu_count={total_count}, delete_count={delete_count}"
        )

        # Step 0: handle explicit deletion requests (delete_spu_ids)
        if delete_spu_ids:
            logger.info(f"[IncrementalIndexing] Processing explicit deletions: {len(delete_spu_ids)} SPUs")
            for spu_id in delete_spu_ids:
                result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "explicit")
                delete_results.append({"spu_id": spu_id, **result})

        # Step 1: handle indexing requests (spu_ids), auto-detecting deletions along the way
        for spu_id in spu_ids:
            try:
                log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching')

                # First check whether the SPU is marked as deleted in the database
                if self.check_spu_deleted(tenant_id, spu_id):
                    # The SPU is deleted; remove the corresponding document from ES
                    logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES")
                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "auto")
                    # Normalize the status: deleted and not_found both count as deleted, failed stays failed
                    status = "deleted" if result["status"] != "failed" else "failed"
                    spu_results.append({
                        "spu_id": spu_id,
                        "status": status,
                        **({"msg": result["msg"]} if status == "failed" else {})
                    })
                    continue

                # The SPU is not deleted; fetch its document as usual
                doc = self.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)

                if doc is None:
                    # This should not happen because the deleted flag was already checked,
                    # but handle it anyway for robustness
                    error_msg = "SPU not found (unexpected)"
                    logger.warning(f"[IncrementalIndexing] SPU {spu_id} not found after deleted check")
                    log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
                    spu_results.append({
                        "spu_id": spu_id,
                        "status": "failed",
                        "msg": error_msg
                    })
                    continue

                log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming')
                documents.append((spu_id, doc))  # Keep the spu_id -> doc association

            except Exception as e:
                error_msg = str(e)
                logger.error(f"[IncrementalIndexing] Error processing SPU {spu_id}: {e}", exc_info=True)
                log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
                spu_results.append({
                    "spu_id": spu_id,
                    "status": "failed",
                    "msg": error_msg
                })

        logger.info(f"[IncrementalIndexing] Transformed {len(documents)}/{total_count} documents")

        # Step 2: bulk write to ES
        if documents:
            try:
                # Extract the doc list for bulk writing
                doc_list = [doc for _, doc in documents]

                logger.info(f"[IncrementalIndexing] Indexing {len(doc_list)} documents to ES (batch_size={batch_size})")
                indexer = BulkIndexer(es_client, index_name, batch_size=batch_size, max_retries=3)
                bulk_results = indexer.index_documents(
                    doc_list,
                    id_field="spu_id",
                    show_progress=False
                )

                # Update spu_results based on the ES response
                es_success_count = bulk_results.get('success', 0)
                es_failed_count = bulk_results.get('failed', 0)

                # BulkIndexer only returns aggregate statistics, so we assume:
                # - if the ES success count equals the document count, every document succeeded
                # - otherwise the failed documents may show up in the ES error details,
                #   but they cannot be mapped back to specific SPUs
                # Simplified handling: mark the documents written to ES as indexed
                if es_failed_count == 0:
                    # All documents succeeded
                    for spu_id, doc in documents:
                        spu_results.append({
                            "spu_id": spu_id,
                            "status": "indexed"
                        })
                else:
                    # Some documents failed; mark the processed documents as succeeded even though
                    # an unknown subset of them actually failed
                    logger.warning(f"[IncrementalIndexing] ES bulk index had {es_failed_count} failures")
                    for spu_id, doc in documents:
                        # The exact failures cannot be identified, so assume success for all
                        # (this should be improved)
                        spu_results.append({
                            "spu_id": spu_id,
                            "status": "indexed"
                        })

                # Log ES errors, if any
                if bulk_results.get('errors'):
                    logger.error(f"[IncrementalIndexing] ES errors: {bulk_results['errors'][:5]}")

            except Exception as e:
                error_msg = f"ES bulk index failed: {str(e)}"
                logger.error(f"[IncrementalIndexing] {error_msg}", exc_info=True)

                # All documents are considered failed
                for spu_id, doc in documents:
                    # Check whether the SPU is already in spu_results (it may already be marked failed)
                    existing = next((r for r in spu_results if r.get('spu_id') == spu_id), None)
                    if existing:
                        # Update the existing entry
                        existing['status'] = 'failed'
                        existing['msg'] = error_msg
                    else:
                        spu_results.append({
                            "spu_id": spu_id,
                            "status": "failed",
                            "msg": error_msg
                        })
        else:
            logger.warning(f"[IncrementalIndexing] No documents to index for tenant_id={tenant_id}")

        elapsed_time = time.time() - start_time

        # Aggregate the results (simplified)
        total_processed = total_count + delete_count
        total_success = len([r for r in spu_results + delete_results
                             if r.get('status') in ('indexed', 'deleted', 'not_found')])
        total_failed = len([r for r in spu_results + delete_results if r.get('status') == 'failed'])

        # Log the final result
        deleted_count = len([r for r in spu_results + delete_results if r.get('status') == 'deleted'])
        log_index_result(
            indexer_logger,
            index_type='incremental',
            tenant_id=tenant_id,
            total_count=total_processed,
            success_count=total_success,
            failed_count=total_failed,
            elapsed_time=elapsed_time,
            index_name=index_name,
            errors=[r.get('msg') for r in spu_results + delete_results if r.get('status') == 'failed'][:10],
            deleted_count=deleted_count
        )

        logger.info(
            f"[IncrementalIndexing] Completed for tenant_id={tenant_id}: "
            f"total={total_processed}, success={total_success}, failed={total_failed}, "
            f"elapsed={elapsed_time:.2f}s"
        )

        return {
            "spu_ids": spu_results,             # Result list for spu_ids
            "delete_spu_ids": delete_results,   # Result list for delete_spu_ids
            "total": total_processed,
            "success_count": total_success,
            "failed_count": total_failed,
            "elapsed_time": elapsed_time,
            "index_name": index_name,
            "tenant_id": tenant_id
        }
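

if __name__ == "__main__":
    # Hedged usage sketch (not part of the original service): shows how the class is
    # typically wired up. The connection URL below is a placeholder, and a real
    # deployment also needs the project's ES client wrapper, which this module only
    # assumes exposes the raw elasticsearch-py client as `es_client.client`.
    from sqlalchemy import create_engine

    engine = create_engine("mysql+pymysql://user:password@localhost:3306/shop")  # placeholder DSN
    service = IncrementalIndexerService(engine)

    # Fetch a single ES document without writing it, e.g. to inspect the transformed output.
    doc = service.get_spu_document(tenant_id="tenant-001", spu_id="1001")  # placeholder ids
    print(doc)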