""" 索引API路由。 提供全量和增量索引接口,供外部Java程序调用。 """ from fastapi import APIRouter, HTTPException from typing import List from pydantic import BaseModel import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/indexer", tags=["indexer"]) class ReindexRequest(BaseModel): """全量重建索引请求""" tenant_id: str recreate_index: bool = False batch_size: int = 500 class IndexSpusRequest(BaseModel): """增量索引请求(按SPU列表索引)""" tenant_id: str spu_ids: List[str] delete_spu_ids: List[str] = [] # 显式指定要删除的SPU ID列表(可选) class GetDocumentsRequest(BaseModel): """查询文档请求(不写入ES)""" tenant_id: str spu_ids: List[str] @router.post("/reindex") async def reindex_all(request: ReindexRequest): """ 全量重建索引接口 将指定租户的所有SPU数据重新索引到ES。支持删除旧索引并重建。 """ try: from ..app import get_bulk_indexing_service service = get_bulk_indexing_service() if service is None: raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized") return service.bulk_index( tenant_id=request.tenant_id, recreate_index=request.recreate_index, batch_size=request.batch_size ) except HTTPException: raise except Exception as e: logger.error(f"Error in reindex for tenant_id={request.tenant_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.post("/index") async def index_spus(request: IndexSpusRequest): """ 增量索引接口 根据指定的SPU ID列表,将数据索引到ES。用于增量更新指定商品。 支持两种删除方式: 1. **自动检测删除**:如果SPU在数据库中被标记为deleted=1,自动从ES中删除对应文档 2. **显式删除**:通过delete_spu_ids参数显式指定要删除的SPU(无论数据库状态如何) 删除策略说明: - 数据库是唯一真实来源(Single Source of Truth) - 自动检测:查询数据库时发现deleted=1,自动从ES删除 - 显式删除:调用方明确知道哪些SPU要删除,直接删除(适用于批量删除场景) 响应格式: - spu_ids: spu_ids对应的响应列表,每个元素包含spu_id和status(indexed/deleted/failed) - delete_spu_ids: delete_spu_ids对应的响应列表,每个元素包含spu_id和status(deleted/not_found/failed) - failed状态的元素会包含msg字段说明失败原因 - 最后给出总体统计:total, success_count, failed_count等 """ try: from ..app import get_incremental_service, get_es_client # 验证请求参数 if not request.spu_ids and not request.delete_spu_ids: raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty") if request.spu_ids and len(request.spu_ids) > 100: raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing") if request.delete_spu_ids and len(request.delete_spu_ids) > 100: raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion") service = get_incremental_service() if service is None: raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized") es_client = get_es_client() if es_client is None: raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized") # 调用批量索引方法(支持显式删除参数) result = service.index_spus_to_es( es_client=es_client, tenant_id=request.tenant_id, spu_ids=request.spu_ids if request.spu_ids else [], delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None ) return result except HTTPException: raise except Exception as e: logger.error(f"Error indexing SPUs for tenant_id={request.tenant_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.post("/documents") async def get_documents(request: GetDocumentsRequest): """ 查询文档接口 根据SPU ID列表获取ES文档数据(不写入ES)。用于查看、调试或验证SPU数据。 """ try: from ..app import get_incremental_service if not request.spu_ids: raise HTTPException(status_code=400, detail="spu_ids cannot be empty") if len(request.spu_ids) > 100: raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request") 
@router.post("/documents")
async def get_documents(request: GetDocumentsRequest):
    """
    Document lookup endpoint.

    Builds the ES documents for the given SPU IDs without writing them to ES.
    Useful for inspecting, debugging, or validating SPU data.
    """
    try:
        from ..app import get_incremental_service

        if not request.spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
        if len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")

        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        success_list, failed_list = [], []
        for spu_id in request.spu_ids:
            try:
                doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
                if doc:
                    success_list.append({"spu_id": spu_id, "document": doc})
                else:
                    failed_list.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
            except Exception as e:
                failed_list.append({"spu_id": spu_id, "error": str(e)})

        return {
            "success": success_list,
            "failed": failed_list,
            "total": len(request.spu_ids),
            "success_count": len(success_list),
            "failed_count": len(failed_list)
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get("/health")
async def indexer_health_check():
    """Check the health of the indexer service."""
    try:
        from ..app import get_incremental_service
        from sqlalchemy import text

        service = get_incremental_service()
        if service is None:
            return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}

        # Probe database connectivity with a trivial query.
        try:
            with service.db_engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            db_status = "connected"
        except Exception as e:
            db_status = f"disconnected: {str(e)}"

        return {
            "status": "available",
            "database": db_status,
            "preloaded_data": {"category_mappings": len(service.category_id_to_name)}
        }
    except Exception as e:
        logger.error(f"Error checking indexer health: {e}", exc_info=True)
        return {"status": "error", "message": str(e)}
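
# Example (sketch, commented out): wiring this router into a FastAPI app and probing
# the health endpoint with the test client. The application setup below is an
# illustrative assumption; the real wiring lives in ..app.
#
#   from fastapi import FastAPI
#   from fastapi.testclient import TestClient
#
#   app = FastAPI()
#   app.include_router(router)
#
#   client = TestClient(app)
#   print(client.get("/indexer/health").json())  # {"status": ..., "database": ..., "preloaded_data": ...}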