""" 增量索引API路由。 提供单个SPU数据获取接口,用于增量更新ES索引。 """ from fastapi import APIRouter, HTTPException, Query, Request from typing import Optional import logging from ..models import ErrorResponse logger = logging.getLogger(__name__) router = APIRouter(prefix="/indexer", tags=["indexer"]) @router.get("/spu/{spu_id}") async def get_spu_document( spu_id: str, tenant_id: str = Query(..., description="租户ID"), request: Request = None ): """ 获取单个SPU的ES文档数据(用于增量索引更新)。 功能说明: - 根据 tenant_id 和 spu_id 查询MySQL数据库 - 返回该SPU的完整ES文档数据(JSON格式) - 外部Java程序可以调用此接口获取数据后推送到ES 参数: - spu_id: SPU ID(路径参数) - tenant_id: 租户ID(查询参数,必需) 返回: - 成功:返回ES文档JSON对象 - SPU不存在或已删除:返回404 - 其他错误:返回500 示例请求: ``` GET /indexer/spu/123?tenant_id=1 ``` 示例响应: ```json { "tenant_id": "1", "spu_id": "123", "title_zh": "商品标题", "brief_zh": "商品简介", "description_zh": "商品描述", "vendor_zh": "供应商", "tags": ["标签1", "标签2"], "category_path_zh": "类目1/类目2/类目3", "category1_name": "类目1", "category2_name": "类目2", "category3_name": "类目3", "category_id": "100", "category_level": 3, "min_price": 99.99, "max_price": 199.99, "compare_at_price": 299.99, "sales": 1000, "total_inventory": 500, "skus": [...], "specifications": [...], ... } ``` """ try: from ..app import get_incremental_service # 获取增量服务实例 service = get_incremental_service() if service is None: raise HTTPException( status_code=503, detail="Incremental indexer service is not initialized. Please check database connection." ) # 获取SPU文档 doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id) if doc is None: raise HTTPException( status_code=404, detail=f"SPU {spu_id} not found for tenant_id={tenant_id} or has been deleted" ) return doc except HTTPException: raise except Exception as e: logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.get("/health") async def indexer_health_check(): """ 检查增量索引服务健康状态。 返回: - 服务是否可用 - 数据库连接状态 - 预加载数据状态 """ try: from ..app import get_incremental_service service = get_incremental_service() if service is None: return { "status": "unavailable", "message": "Incremental indexer service is not initialized", "database": "unknown", "preloaded_data": { "category_mappings": 0 } } # 检查数据库连接 try: from sqlalchemy import text with service.db_engine.connect() as conn: conn.execute(text("SELECT 1")) db_status = "connected" except Exception as e: db_status = f"disconnected: {str(e)}" return { "status": "available", "database": db_status, "preloaded_data": { "category_mappings": len(service.category_id_to_name) } } except Exception as e: logger.error(f"Error checking indexer health: {e}", exc_info=True) return { "status": "error", "message": str(e) }