# suggestion_indexer.py (4.74 KB)
from typing import Any, Dict, Optional

import asyncio
import logging

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field

from ..service_registry import get_suggestion_builder


# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)

# Router grouping the suggestion-index build endpoints; every route declared
# on it is served under the "/suggestions" prefix and tagged "suggestions".
router = APIRouter(prefix="/suggestions", tags=["suggestions"])


class FullBuildRequest(BaseModel):
    """Request body for a full build of a tenant's suggestion index."""

    # Tenant whose suggestion index is built (required, no default).
    tenant_id: str = Field(..., description="租户 ID")

    # How many days of query logs to look back over.
    days: int = Field(default=360, description="查询日志回溯天数")

    # Batch size used when scanning products.
    batch_size: int = Field(default=500, description="商品扫描 batch 大小")

    # Minimum query length filter; per the field description, a Chinese
    # character counts as 2 and any other character counts as 1.
    min_query_len: int = Field(
        default=3,
        description="最小查询长度过滤(中文字符按 2 计数,其余字符按 1 计数)",
    )

    # Whether to publish the alias to the new index version after the build.
    publish_alias: bool = Field(
        default=True,
        description="是否在构建完成后发布 alias 到新版本索引",
    )

    # Number of most recent index versions to keep.
    keep_versions: int = Field(default=2, description="保留的最新版本索引数量")


class IncrementalBuildRequest(BaseModel):
    """Request body for an incremental update of a tenant's suggestion index."""

    # Tenant whose suggestion index is updated (required, no default).
    tenant_id: str = Field(..., description="租户 ID")

    # Minimum query length filter; per the field description, a Chinese
    # character counts as 2 and any other character counts as 1.
    min_query_len: int = Field(
        default=3,
        description="最小查询长度过滤(中文字符按 2 计数,其余字符按 1 计数)",
    )

    # When no incremental watermark exists, how many recent days of query
    # logs to start from.
    fallback_days: int = Field(
        default=7,
        description="当没有增量水位线时,默认从最近多少天的查询日志开始补",
    )

    # Minutes by which the incremental window overlaps backwards, to avoid
    # missing log entries that were written late.
    overlap_minutes: int = Field(
        default=30,
        description="增量窗口向前重叠的分钟数,用于防止日志延迟写入导致的遗漏",
    )

    # Whether to run a full build (using bootstrap_days) first when no usable
    # suggestion index exists yet.
    bootstrap_if_missing: bool = Field(
        default=True,
        description="当当前没有可用 suggestion 索引时,是否先做一次带 bootstrap_days 的全量构建",
    )

    # Query-log look-back window, in days, for that bootstrap full build.
    bootstrap_days: int = Field(
        default=30,
        description="bootstrap 全量构建时的查询日志回溯天数",
    )

    # Product scan batch size used when a bootstrap full build is needed.
    batch_size: int = Field(
        default=500,
        description="当需要 bootstrap 全量构建时使用的商品扫描 batch 大小",
    )


async def _run_in_executor(func, *args, **kwargs) -> Dict[str, Any]:
    """Run a synchronous callable in the loop's default executor and await it.

    Args:
        func: Callable to invoke off the event loop thread.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns.

    Raises:
        Any exception raised by ``func`` is re-raised when the result is awaited.
    """
    # Inside a coroutine a loop is always running; get_running_loop() returns
    # exactly that loop and, unlike get_event_loop(), never creates one.
    loop = asyncio.get_running_loop()
    # run_in_executor() only forwards positional arguments, so the call is
    # wrapped in a closure to carry the keyword arguments as well.
    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))


@router.post("/full")
async def build_full_suggestions(request: FullBuildRequest) -> Dict[str, Any]:
    """
    Fully build or rebuild the suggestion index of the given tenant.

    The work is delegated to ``builder.rebuild_tenant_index``, which is
    expected to:
    - build candidate terms from the product index plus the query logs
    - write them into a new suggestion index (versioned naming)
    - switch the alias and clean up old versions, depending on the request

    The builder call goes through ``_run_in_executor`` so it runs in the
    default executor instead of on the event loop.

    Args:
        request: Full-build parameters; every field is forwarded to the builder.

    Returns:
        The result returned by ``builder.rebuild_tenant_index``.

    Raises:
        HTTPException: 503 if no suggestion builder is available; 500 if the
            build raises any exception (the message is included in the detail).
    """
    builder = get_suggestion_builder()
    if builder is None:
        raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")

    try:
        return await _run_in_executor(
            builder.rebuild_tenant_index,
            tenant_id=request.tenant_id,
            days=request.days,
            batch_size=request.batch_size,
            min_query_len=request.min_query_len,
            publish_alias=request.publish_alias,
            keep_versions=request.keep_versions,
        )
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(
            "Error in full suggestion rebuild for tenant_id=%s: %s",
            request.tenant_id,
            e,
        )
        # Chain explicitly so the original failure is kept as __cause__.
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e


@router.post("/incremental")
async def build_incremental_suggestions(request: IncrementalBuildRequest) -> Dict[str, Any]:
    """
    Incrementally update the suggestion index of the given tenant.

    The work is delegated to ``builder.incremental_update_tenant_index``,
    which is expected to:
    - scan the query logs from the last incremental watermark (or the time of
      the full build) up to now
    - upsert into the suggestion index according to query frequency
    - run a bootstrap full build first when no suggestion index exists and
      ``bootstrap_if_missing`` is true

    The builder call goes through ``_run_in_executor`` so it runs in the
    default executor instead of on the event loop.

    Args:
        request: Incremental-update parameters; every field is forwarded to
            the builder.

    Returns:
        The result returned by ``builder.incremental_update_tenant_index``.

    Raises:
        HTTPException: 503 if no suggestion builder is available; 500 if the
            update raises any exception (the message is included in the detail).
    """
    builder = get_suggestion_builder()
    if builder is None:
        raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")

    try:
        return await _run_in_executor(
            builder.incremental_update_tenant_index,
            tenant_id=request.tenant_id,
            min_query_len=request.min_query_len,
            fallback_days=request.fallback_days,
            overlap_minutes=request.overlap_minutes,
            bootstrap_if_missing=request.bootstrap_if_missing,
            bootstrap_days=request.bootstrap_days,
            batch_size=request.batch_size,
        )
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(
            "Error in incremental suggestion update for tenant_id=%s: %s",
            request.tenant_id,
            e,
        )
        # Chain explicitly so the original failure is kept as __cause__.
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e