indexer.py 6.68 KB
"""
索引API路由。

提供全量和增量索引接口,供外部Java程序调用。
"""

from fastapi import APIRouter, HTTPException
from typing import List
from pydantic import BaseModel
import logging

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/indexer", tags=["indexer"])


class ReindexRequest(BaseModel):
    """全量重建索引请求"""
    tenant_id: str
    recreate_index: bool = False
    batch_size: int = 500


class IndexSpusRequest(BaseModel):
    """增量索引请求(按SPU列表索引)"""
    tenant_id: str
    spu_ids: List[str]
    delete_spu_ids: List[str] = []  # 显式指定要删除的SPU ID列表(可选)


class GetDocumentsRequest(BaseModel):
    """查询文档请求(不写入ES)"""
    tenant_id: str
    spu_ids: List[str]


@router.post("/reindex")
async def reindex_all(request: ReindexRequest):
    """
    全量重建索引接口
    
    将指定租户的所有SPU数据重新索引到ES。支持删除旧索引并重建。
    """
    try:
        from ..app import get_bulk_indexing_service
        service = get_bulk_indexing_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
        return service.bulk_index(
            tenant_id=request.tenant_id,
            recreate_index=request.recreate_index,
            batch_size=request.batch_size
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in reindex for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/index")
async def index_spus(request: IndexSpusRequest):
    """
    增量索引接口
    
    根据指定的SPU ID列表,将数据索引到ES。用于增量更新指定商品。
    
    支持两种删除方式:
    1. **自动检测删除**:如果SPU在数据库中被标记为deleted=1,自动从ES中删除对应文档
    2. **显式删除**:通过delete_spu_ids参数显式指定要删除的SPU(无论数据库状态如何)
    
    删除策略说明:
    - 数据库是唯一真实来源(Single Source of Truth)
    - 自动检测:查询数据库时发现deleted=1,自动从ES删除
    - 显式删除:调用方明确知道哪些SPU要删除,直接删除(适用于批量删除场景)
    """
    try:
        from ..app import get_incremental_service, get_es_client
        
        # 验证请求参数
        if not request.spu_ids and not request.delete_spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")
        
        if request.spu_ids and len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing")
        
        if request.delete_spu_ids and len(request.delete_spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion")
        
        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
        
        es_client = get_es_client()
        if es_client is None:
            raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")
        
        # 调用批量索引方法(支持显式删除参数)
        result = service.index_spus_to_es(
            es_client=es_client,
            tenant_id=request.tenant_id,
            spu_ids=request.spu_ids if request.spu_ids else [],
            delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None
        )
        
        return result
        
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error indexing SPUs for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/documents")
async def get_documents(request: GetDocumentsRequest):
    """
    查询文档接口
    
    根据SPU ID列表获取ES文档数据(不写入ES)。用于查看、调试或验证SPU数据。
    """
    try:
        from ..app import get_incremental_service
        if not request.spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
        if len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
        success_list, failed_list = [], []
        for spu_id in request.spu_ids:
            try:
                doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
                (success_list if doc else failed_list).append({
                    "spu_id": spu_id,
                    "document": doc
                } if doc else {
                    "spu_id": spu_id,
                    "error": "SPU not found or deleted"
                })
            except Exception as e:
                failed_list.append({"spu_id": spu_id, "error": str(e)})
        return {
            "success": success_list,
            "failed": failed_list,
            "total": len(request.spu_ids),
            "success_count": len(success_list),
            "failed_count": len(failed_list)
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get("/health")
async def indexer_health_check():
    """检查索引服务健康状态"""
    try:
        from ..app import get_incremental_service
        from sqlalchemy import text
        service = get_incremental_service()
        if service is None:
            return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}
        try:
            with service.db_engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            db_status = "connected"
        except Exception as e:
            db_status = f"disconnected: {str(e)}"
        return {
            "status": "available",
            "database": db_status,
            "preloaded_data": {"category_mappings": len(service.category_id_to_name)}
        }
    except Exception as e:
        logger.error(f"Error checking indexer health: {e}", exc_info=True)
        return {"status": "error", "message": str(e)}