"""
全量索引服务。

提供全量索引功能,将指定租户的所有SPU数据导入到ES。
"""

import logging
import time
from typing import Any, Dict

from sqlalchemy import Engine

from indexer.bulk_indexer import BulkIndexer
from indexer.mapping_generator import DEFAULT_INDEX_NAME, delete_index_if_exists, load_mapping
from indexer.spu_transformer import SPUTransformer
from utils.es_client import ESClient

logger = logging.getLogger(__name__)


class BulkIndexingService:
    """全量索引服务,提供批量导入功能。"""

    def __init__(self, db_engine: Engine, es_client: ESClient):
        """
        Initialize the bulk indexing service.

        Args:
            db_engine: SQLAlchemy database engine
            es_client: Elasticsearch client
        """
        self.db_engine = db_engine
        self.es_client = es_client
        self.index_name = DEFAULT_INDEX_NAME
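        # Note: all tenants write into the same DEFAULT_INDEX_NAME; per-tenant
        # separation is presumably handled at the document level (an assumption
        # based on the fixed index name here, not confirmed by this module).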

    def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]:
        """
        Run a full reindex of one tenant's SPU data.

        Args:
            tenant_id: Tenant whose SPU data should be indexed.
            recreate_index: If True, drop and recreate the index before importing.
            batch_size: Number of documents per bulk request.

        Returns:
            A summary dict with document counts, elapsed time, and index metadata.

        Raises:
            RuntimeError: If the index cannot be deleted or created.
        """
        start_time = time.time()

        try:
            # 1. Load the index mapping.
            logger.info(f"[BulkIndexing] Loading mapping for tenant_id={tenant_id}")
            mapping = load_mapping()

            # 2. Prepare the index (drop and recreate, or create if missing).
            if recreate_index:
                logger.info(f"[BulkIndexing] Recreating index: {self.index_name}")
                if self.es_client.index_exists(self.index_name):
                    if delete_index_if_exists(self.es_client, self.index_name):
                        logger.info(f"[BulkIndexing] Deleted existing index: {self.index_name}")
                    else:
                        raise Exception(f"Failed to delete index: {self.index_name}")

            if not self.es_client.index_exists(self.index_name):
                logger.info(f"[BulkIndexing] Creating index: {self.index_name}")
                if not self.es_client.create_index(self.index_name, mapping):
                    raise Exception(f"Failed to create index: {self.index_name}")
                logger.info(f"[BulkIndexing] Created index: {self.index_name}")
            else:
                logger.info(f"[BulkIndexing] Index already exists: {self.index_name}")

            # 3. Transform the tenant's SPU rows into ES documents.
            logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}")
            transformer = SPUTransformer(self.db_engine, tenant_id)
            documents = transformer.transform_batch()

            if not documents:
                logger.warning(f"[BulkIndexing] No documents to index for tenant_id={tenant_id}")
                return {
                    "success": True,
                    "total": 0,
                    "indexed": 0,
                    "failed": 0,
                    "elapsed_time": time.time() - start_time,
                    "message": "No documents to index"
                }

            logger.info(f"[BulkIndexing] Transformed {len(documents)} documents")

            # 4. Bulk-import the documents into Elasticsearch.
            logger.info(f"[BulkIndexing] Indexing {len(documents)} documents (batch_size={batch_size})")
            indexer = BulkIndexer(self.es_client, self.index_name, batch_size=batch_size)
            results = indexer.index_documents(
                documents,
                id_field="spu_id",
                show_progress=False  # suppress progress output for API-driven runs
            )

            elapsed_time = time.time() - start_time

            logger.info(
                f"[BulkIndexing] Completed for tenant_id={tenant_id}: "
                f"indexed={results['success']}, failed={results['failed']}, "
                f"elapsed={elapsed_time:.2f}s"
            )

            return {
                "success": results['failed'] == 0,
                "total": len(documents),
                "indexed": results['success'],
                "failed": results['failed'],
                "elapsed_time": elapsed_time,
                "index_name": self.index_name,
                "tenant_id": tenant_id
            }

        except Exception as e:
            logger.error(f"[BulkIndexing] Failed for tenant_id={tenant_id}: {e}", exc_info=True)
            raise
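
# Example usage (a minimal sketch, not part of the module's public API).
# The database DSN, the ESClient constructor arguments, and this module's
# import path are assumptions; adjust them to the actual project layout.
#
#     from sqlalchemy import create_engine
#     from utils.es_client import ESClient
#     from bulk_indexing_service import BulkIndexingService
#
#     engine = create_engine("postgresql://user:pass@localhost/shop")  # hypothetical DSN
#     es = ESClient()  # assumes a no-argument constructor
#     service = BulkIndexingService(engine, es)
#     result = service.bulk_index(tenant_id="tenant-001", recreate_index=True)
#     print(f"indexed={result['indexed']}, failed={result['failed']}")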