indexer.py 4.05 KB
"""
增量索引API路由。

提供单个SPU数据获取接口,用于增量更新ES索引。
"""

from fastapi import APIRouter, HTTPException, Query, Request
from typing import Optional
import logging

from ..models import ErrorResponse

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/indexer", tags=["indexer"])


@router.get("/spu/{spu_id}")
async def get_spu_document(
    spu_id: str,
    tenant_id: str = Query(..., description="租户ID"),
    request: Request = None
):
    """
    获取单个SPU的ES文档数据(用于增量索引更新)。

    功能说明:
    - 根据 tenant_id 和 spu_id 查询MySQL数据库
    - 返回该SPU的完整ES文档数据(JSON格式)
    - 外部Java程序可以调用此接口获取数据后推送到ES

    参数:
    - spu_id: SPU ID(路径参数)
    - tenant_id: 租户ID(查询参数,必需)

    返回:
    - 成功:返回ES文档JSON对象
    - SPU不存在或已删除:返回404
    - 其他错误:返回500

    示例请求:
    ```
    GET /indexer/spu/123?tenant_id=1
    ```

    示例响应:
    ```json
    {
        "tenant_id": "1",
        "spu_id": "123",
        "title_zh": "商品标题",
        "brief_zh": "商品简介",
        "description_zh": "商品描述",
        "vendor_zh": "供应商",
        "tags": ["标签1", "标签2"],
        "category_path_zh": "类目1/类目2/类目3",
        "category1_name": "类目1",
        "category2_name": "类目2",
        "category3_name": "类目3",
        "category_id": "100",
        "category_level": 3,
        "min_price": 99.99,
        "max_price": 199.99,
        "compare_at_price": 299.99,
        "sales": 1000,
        "total_inventory": 500,
        "skus": [...],
        "specifications": [...],
        ...
    }
    ```
    """
    try:
        from ..app import get_incremental_service
        
        # 获取增量服务实例
        service = get_incremental_service()
        if service is None:
            raise HTTPException(
                status_code=503,
                detail="Incremental indexer service is not initialized. Please check database connection."
            )

        # 获取SPU文档
        doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
        
        if doc is None:
            raise HTTPException(
                status_code=404,
                detail=f"SPU {spu_id} not found for tenant_id={tenant_id} or has been deleted"
            )

        return doc

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get("/health")
async def indexer_health_check():
    """
    检查增量索引服务健康状态。

    返回:
    - 服务是否可用
    - 数据库连接状态
    - 预加载数据状态
    """
    try:
        from ..app import get_incremental_service
        
        service = get_incremental_service()
        if service is None:
            return {
                "status": "unavailable",
                "message": "Incremental indexer service is not initialized",
                "database": "unknown",
                "preloaded_data": {
                    "category_mappings": 0
                }
            }

        # 检查数据库连接
        try:
            from sqlalchemy import text
            with service.db_engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            db_status = "connected"
        except Exception as e:
            db_status = f"disconnected: {str(e)}"

        return {
            "status": "available",
            "database": db_status,
            "preloaded_data": {
                "category_mappings": len(service.category_id_to_name)
            }
        }

    except Exception as e:
        logger.error(f"Error checking indexer health: {e}", exc_info=True)
        return {
            "status": "error",
            "message": str(e)
        }