""" 索引API路由。 提供全量和增量索引接口,供外部Java程序调用。 """ from fastapi import APIRouter, HTTPException from typing import List from pydantic import BaseModel import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/indexer", tags=["indexer"]) class ReindexRequest(BaseModel): """全量重建索引请求""" tenant_id: str recreate_index: bool = False batch_size: int = 500 class IndexSpusRequest(BaseModel): """增量索引请求(按SPU列表索引)""" tenant_id: str spu_ids: List[str] class GetDocumentsRequest(BaseModel): """查询文档请求(不写入ES)""" tenant_id: str spu_ids: List[str] @router.post("/reindex") async def reindex_all(request: ReindexRequest): """ 全量重建索引接口 将指定租户的所有SPU数据重新索引到ES。支持删除旧索引并重建。 """ try: from ..app import get_bulk_indexing_service service = get_bulk_indexing_service() if service is None: raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized") return service.bulk_index( tenant_id=request.tenant_id, recreate_index=request.recreate_index, batch_size=request.batch_size ) except HTTPException: raise except Exception as e: logger.error(f"Error in reindex for tenant_id={request.tenant_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.post("/index") async def index_spus(request: IndexSpusRequest): """ 增量索引接口 根据指定的SPU ID列表,将数据索引到ES。用于增量更新指定商品。 """ try: from ..app import get_incremental_service, get_es_client if not request.spu_ids: raise HTTPException(status_code=400, detail="spu_ids cannot be empty") if len(request.spu_ids) > 100: raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request") service = get_incremental_service() if service is None: raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized") es_client = get_es_client() if es_client is None: raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized") # 调用批量索引方法 result = service.index_spus_to_es( es_client=es_client, tenant_id=request.tenant_id, spu_ids=request.spu_ids ) return result except HTTPException: raise except Exception as e: logger.error(f"Error indexing SPUs for tenant_id={request.tenant_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.post("/documents") async def get_documents(request: GetDocumentsRequest): """ 查询文档接口 根据SPU ID列表获取ES文档数据(不写入ES)。用于查看、调试或验证SPU数据。 """ try: from ..app import get_incremental_service if not request.spu_ids: raise HTTPException(status_code=400, detail="spu_ids cannot be empty") if len(request.spu_ids) > 100: raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request") service = get_incremental_service() if service is None: raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized") success_list, failed_list = [], [] for spu_id in request.spu_ids: try: doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id) (success_list if doc else failed_list).append({ "spu_id": spu_id, "document": doc } if doc else { "spu_id": spu_id, "error": "SPU not found or deleted" }) except Exception as e: failed_list.append({"spu_id": spu_id, "error": str(e)}) return { "success": success_list, "failed": failed_list, "total": len(request.spu_ids), "success_count": len(success_list), "failed_count": len(failed_list) } except HTTPException: raise except Exception as e: logger.error(f"Error getting documents for tenant_id={request.tenant_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.get("/health") async def indexer_health_check(): """检查索引服务健康状态""" try: from ..app import get_incremental_service from sqlalchemy import text service = get_incremental_service() if service is None: return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}} try: with service.db_engine.connect() as conn: conn.execute(text("SELECT 1")) db_status = "connected" except Exception as e: db_status = f"disconnected: {str(e)}" return { "status": "available", "database": db_status, "preloaded_data": {"category_mappings": len(service.category_id_to_name)} } except Exception as e: logger.error(f"Error checking indexer health: {e}", exc_info=True) return {"status": "error", "message": str(e)}