import asyncio
import functools
import logging
from typing import Any, Callable, Dict, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field

from ..service_registry import get_suggestion_builder

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/suggestions", tags=["suggestions"])


class FullBuildRequest(BaseModel):
    """Request body for a full (re)build of a tenant's suggestion index."""

    tenant_id: str = Field(..., description="租户 ID")
    # Query-log lookback window, in days.
    days: int = Field(360, ge=0, description="查询日志回溯天数")
    # Batch size used when scanning the product index.
    batch_size: int = Field(500, ge=1, description="商品扫描 batch 大小")
    # Minimum query length filter; Chinese characters count as 2, all others as 1.
    min_query_len: int = Field(
        3,
        ge=0,
        description="最小查询长度过滤(中文字符按 2 计数,其余字符按 1 计数)",
    )
    # Whether to publish the alias to the new versioned index after the build.
    publish_alias: bool = Field(
        True,
        description="是否在构建完成后发布 alias 到新版本索引",
    )
    # Number of most recent versioned indices to keep.
    keep_versions: int = Field(
        2,
        ge=0,
        description="保留的最新版本索引数量",
    )


class IncrementalBuildRequest(BaseModel):
    """Request body for an incremental update of a tenant's suggestion index."""

    tenant_id: str = Field(..., description="租户 ID")
    # Minimum query length filter; Chinese characters count as 2, all others as 1.
    min_query_len: int = Field(
        3,
        ge=0,
        description="最小查询长度过滤(中文字符按 2 计数,其余字符按 1 计数)",
    )
    # Days of query logs to backfill when no incremental watermark exists yet.
    fallback_days: int = Field(
        7,
        ge=0,
        description="当没有增量水位线时,默认从最近多少天的查询日志开始补",
    )
    # Minutes the incremental window overlaps backwards, to catch late-written logs.
    overlap_minutes: int = Field(
        30,
        ge=0,
        description="增量窗口向前重叠的分钟数,用于防止日志延迟写入导致的遗漏",
    )
    # If no suggestion index is available, first run a full build using bootstrap_days.
    bootstrap_if_missing: bool = Field(
        True,
        description="当当前没有可用 suggestion 索引时,是否先做一次带 bootstrap_days 的全量构建",
    )
    # Query-log lookback window (days) for the bootstrap full build.
    bootstrap_days: int = Field(
        30,
        ge=0,
        description="bootstrap 全量构建时的查询日志回溯天数",
    )
    # Product-scan batch size used only when a bootstrap full build is needed.
    batch_size: int = Field(
        500,
        ge=1,
        description="当需要 bootstrap 全量构建时使用的商品扫描 batch 大小",
    )


async def _run_in_executor(
    func: Callable[..., Dict[str, Any]], *args: Any, **kwargs: Any
) -> Dict[str, Any]:
    """Run ``func(*args, **kwargs)`` on the event loop's default executor and await it.

    The builder methods are presumably blocking (they are dispatched off the event
    loop here), so running them in the default thread pool keeps the loop responsive.
    """
    # get_running_loop() is the documented way to obtain the loop from inside a
    # coroutine; functools.partial binds the call without a closure.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


def _require_builder():
    """Return the registered suggestion builder, or raise HTTP 503 if it is not set up."""
    builder = get_suggestion_builder()
    if builder is None:
        raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")
    return builder


def _internal_error(message: str, tenant_id: str, exc: Exception) -> HTTPException:
    """Log ``exc`` with its traceback and return the HTTP 500 to raise for it.

    ``message`` is a %-style format string taking ``tenant_id`` and ``exc``.
    """
    logger.error(message, tenant_id, exc, exc_info=True)
    # NOTE(review): the raw exception text is exposed to the client; consider a
    # generic detail if these endpoints are reachable by untrusted callers.
    return HTTPException(status_code=500, detail=f"Internal server error: {str(exc)}")


@router.post("/full")
async def build_full_suggestions(request: FullBuildRequest) -> Dict[str, Any]:
    """
    Fully build / rebuild the suggestion index for the given tenant.

    This endpoint:
    - builds candidate terms from the product index plus query logs
    - writes a new suggestion index (using versioned naming)
    - depending on configuration, switches the alias and cleans up old versions

    Returns the builder's result dict. Raises HTTP 503 if the builder is not
    initialized and HTTP 500 if the rebuild fails.
    """
    builder = _require_builder()
    try:
        return await _run_in_executor(
            builder.rebuild_tenant_index,
            tenant_id=request.tenant_id,
            days=request.days,
            batch_size=request.batch_size,
            min_query_len=request.min_query_len,
            publish_alias=request.publish_alias,
            keep_versions=request.keep_versions,
        )
    except Exception as e:
        # Boundary handler: any builder failure becomes a logged 500, with the
        # original exception chained as the cause.
        raise _internal_error(
            "Error in full suggestion rebuild for tenant_id=%s: %s",
            request.tenant_id,
            e,
        ) from e


@router.post("/incremental")
async def build_incremental_suggestions(request: IncrementalBuildRequest) -> Dict[str, Any]:
    """
    Incrementally update the suggestion index for the given tenant.

    - Scans query logs from the last incremental watermark (or the last full
      build time) up to now.
    - Upserts suggestions incrementally based on query frequency.
    - If no suggestion index exists and bootstrap_if_missing=true, a bootstrap
      full build is performed first.

    Returns the builder's result dict. Raises HTTP 503 if the builder is not
    initialized and HTTP 500 if the update fails.
    """
    builder = _require_builder()
    try:
        return await _run_in_executor(
            builder.incremental_update_tenant_index,
            tenant_id=request.tenant_id,
            min_query_len=request.min_query_len,
            fallback_days=request.fallback_days,
            overlap_minutes=request.overlap_minutes,
            bootstrap_if_missing=request.bootstrap_if_missing,
            bootstrap_days=request.bootstrap_days,
            batch_size=request.batch_size,
        )
    except Exception as e:
        # Boundary handler: any builder failure becomes a logged 500, with the
        # original exception chained as the cause.
        raise _internal_error(
            "Error in incremental suggestion update for tenant_id=%s: %s",
            request.tenant_id,
            e,
        ) from e