""" 全量索引服务。 提供全量索引功能,将指定租户的所有SPU数据导入到ES。 """ import logging from typing import Dict, Any from sqlalchemy import Engine from utils.es_client import ESClient from indexer.spu_transformer import SPUTransformer from indexer.bulk_indexer import BulkIndexer from indexer.mapping_generator import load_mapping, delete_index_if_exists, DEFAULT_INDEX_NAME from indexer.indexer_logger import ( get_indexer_logger, log_index_request, log_index_result, log_bulk_index_batch ) logger = logging.getLogger(__name__) # Indexer专用日志器 indexer_logger = get_indexer_logger() class BulkIndexingService: """全量索引服务,提供批量导入功能。""" def __init__(self, db_engine: Engine, es_client: ESClient): """ 初始化全量索引服务。 Args: db_engine: SQLAlchemy database engine es_client: Elasticsearch client """ self.db_engine = db_engine self.es_client = es_client self.index_name = DEFAULT_INDEX_NAME def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]: """执行全量索引""" import time start_time = time.time() # 记录请求开始 log_index_request( indexer_logger, index_type='bulk', tenant_id=tenant_id, request_params={ 'recreate_index': recreate_index, 'batch_size': batch_size, 'index_name': self.index_name } ) try: # 1. 加载mapping logger.info(f"[BulkIndexing] Loading mapping for tenant_id={tenant_id}") indexer_logger.info( f"Loading mapping for bulk index", extra={ 'index_type': 'bulk', 'tenant_id': tenant_id, 'operation': 'load_mapping', 'index_name': self.index_name } ) mapping = load_mapping() # 2. 处理索引(删除并重建或创建) if recreate_index: logger.info(f"[BulkIndexing] Recreating index: {self.index_name}") indexer_logger.info( f"Recreating index: {self.index_name}", extra={ 'index_type': 'bulk', 'tenant_id': tenant_id, 'operation': 'recreate_index', 'index_name': self.index_name } ) if self.es_client.index_exists(self.index_name): if delete_index_if_exists(self.es_client, self.index_name): logger.info(f"[BulkIndexing] Deleted existing index: {self.index_name}") else: raise Exception(f"Failed to delete index: {self.index_name}") if not self.es_client.index_exists(self.index_name): logger.info(f"[BulkIndexing] Creating index: {self.index_name}") indexer_logger.info( f"Creating index: {self.index_name}", extra={ 'index_type': 'bulk', 'tenant_id': tenant_id, 'operation': 'create_index', 'index_name': self.index_name } ) if not self.es_client.create_index(self.index_name, mapping): raise Exception(f"Failed to create index: {self.index_name}") logger.info(f"[BulkIndexing] Created index: {self.index_name}") else: logger.info(f"[BulkIndexing] Index already exists: {self.index_name}") # 3. 
            # 3. Transform the data
            logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}")
            indexer_logger.info(
                "Transforming SPU data",
                extra={
                    'index_type': 'bulk',
                    'tenant_id': tenant_id,
                    'operation': 'transform_data',
                    'index_name': self.index_name
                }
            )
            transformer = SPUTransformer(self.db_engine, tenant_id)
            documents = transformer.transform_batch()

            if not documents:
                logger.warning(f"[BulkIndexing] No documents to index for tenant_id={tenant_id}")
                elapsed_time = time.time() - start_time
                log_index_result(
                    indexer_logger,
                    index_type='bulk',
                    tenant_id=tenant_id,
                    total_count=0,
                    success_count=0,
                    failed_count=0,
                    elapsed_time=elapsed_time,
                    index_name=self.index_name
                )
                return {
                    "success": True,
                    "total": 0,
                    "indexed": 0,
                    "failed": 0,
                    "elapsed_time": elapsed_time,
                    "message": "No documents to index",
                    "index_name": self.index_name,
                    "tenant_id": tenant_id
                }

            logger.info(f"[BulkIndexing] Transformed {len(documents)} documents")
            indexer_logger.info(
                f"Transformed {len(documents)} documents",
                extra={
                    'index_type': 'bulk',
                    'tenant_id': tenant_id,
                    'operation': 'transform_complete',
                    'total_count': len(documents),
                    'index_name': self.index_name
                }
            )

            # 4. Bulk import
            logger.info(f"[BulkIndexing] Indexing {len(documents)} documents (batch_size={batch_size})")
            indexer = BulkIndexer(self.es_client, self.index_name, batch_size=batch_size, max_retries=3)
            results = indexer.index_documents(
                documents,
                id_field="spu_id",
                show_progress=False  # don't print progress when called via the API
            )

            elapsed_time = time.time() - start_time

            # Log the final result
            log_index_result(
                indexer_logger,
                index_type='bulk',
                tenant_id=tenant_id,
                total_count=len(documents),
                success_count=results['success'],
                failed_count=results['failed'],
                elapsed_time=elapsed_time,
                index_name=self.index_name,
                errors=results.get('errors', [])
            )

            logger.info(
                f"[BulkIndexing] Completed for tenant_id={tenant_id}: "
                f"indexed={results['success']}, failed={results['failed']}, "
                f"elapsed={elapsed_time:.2f}s"
            )

            return {
                "success": results['failed'] == 0,
                "total": len(documents),
                "indexed": results['success'],
                "failed": results['failed'],
                "elapsed_time": elapsed_time,
                "index_name": self.index_name,
                "tenant_id": tenant_id
            }

        except Exception as e:
            elapsed_time = time.time() - start_time
            error_msg = str(e)
            logger.error(f"[BulkIndexing] Failed for tenant_id={tenant_id}: {e}", exc_info=True)
            indexer_logger.error(
                f"Bulk index failed: {error_msg}",
                extra={
                    'index_type': 'bulk',
                    'tenant_id': tenant_id,
                    'operation': 'request_failed',
                    'error': error_msg,
                    'elapsed_time': elapsed_time,
                    'index_name': self.index_name
                },
                exc_info=True
            )
            raise
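

# ------------------------------------------------------------------
# Usage sketch (not part of the service): a minimal example of how a
# caller might wire up BulkIndexingService. The connection URL, the
# ESClient constructor arguments, and the tenant id below are
# hypothetical placeholders; substitute your own.
#
#     from sqlalchemy import create_engine
#     from utils.es_client import ESClient
#
#     engine = create_engine("mysql+pymysql://user:pass@localhost/shop")  # hypothetical DSN
#     es = ESClient("http://localhost:9200")  # assumed constructor signature
#     service = BulkIndexingService(engine, es)
#
#     result = service.bulk_index(tenant_id="tenant_001",
#                                 recreate_index=True,
#                                 batch_size=500)
#     print(f"indexed={result['indexed']}, failed={result['failed']}")
# ------------------------------------------------------------------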