Blame view

api/routes/suggestion_indexer.py 4.74 KB
5b8f58c0   tangwang   sugg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  from typing import Any, Dict, Optional
  
  import asyncio
  import logging
  
  from fastapi import APIRouter, HTTPException
  from pydantic import BaseModel, Field
  
  from ..service_registry import get_suggestion_builder
  
  
  logger = logging.getLogger(__name__)
  
  router = APIRouter(prefix="/suggestions", tags=["suggestions"])
  
  
  class FullBuildRequest(BaseModel):
      """Request body for a full (re)build of one tenant's suggestion index.

      Numeric fields carry lower bounds so that nonsensical values (negative
      look-back windows or counts, a zero scan batch size) are rejected during
      request validation instead of being passed through to the builder.
      """

      # Tenant whose suggestion index is rebuilt.
      tenant_id: str = Field(..., description="租户 ID")
      # Look-back window over the query logs, in days.
      days: int = Field(360, ge=0, description="查询日志回溯天数")
      # Batch size used when scanning products; a zero/negative size cannot make progress.
      batch_size: int = Field(500, ge=1, description="商品扫描 batch 大小")
      # Minimum query length filter; Chinese characters count as 2, all others as 1.
      min_query_len: int = Field(
          3,
          ge=0,
          description="最小查询长度过滤(中文字符按 2 计数,其余字符按 1 计数)",
      )
      # Whether to point the alias at the newly built versioned index once the build finishes.
      publish_alias: bool = Field(
          True,
          description="是否在构建完成后发布 alias 到新版本索引",
      )
      # Number of most recent versioned indices to keep.
      keep_versions: int = Field(
          2,
          ge=0,
          description="保留的最新版本索引数量",
      )
  
  
  class IncrementalBuildRequest(BaseModel):
      """Request body for an incremental update of one tenant's suggestion index.

      Numeric fields carry lower bounds so that nonsensical values (negative
      windows or lengths, a zero scan batch size) are rejected during request
      validation instead of being passed through to the builder.
      """

      # Tenant whose suggestion index is updated.
      tenant_id: str = Field(..., description="租户 ID")
      # Minimum query length filter; Chinese characters count as 2, all others as 1.
      min_query_len: int = Field(
          3,
          ge=0,
          description="最小查询长度过滤(中文字符按 2 计数,其余字符按 1 计数)",
      )
      # Days of query logs to backfill when no incremental watermark exists yet.
      fallback_days: int = Field(
          7,
          ge=0,
          description="当没有增量水位线时,默认从最近多少天的查询日志开始补",
      )
      # Minutes the incremental window is extended backwards, to catch log entries written late.
      overlap_minutes: int = Field(
          30,
          ge=0,
          description="增量窗口向前重叠的分钟数,用于防止日志延迟写入导致的遗漏",
      )
      # If no suggestion index is available, first run a full build covering bootstrap_days.
      bootstrap_if_missing: bool = Field(
          True,
          description="当当前没有可用 suggestion 索引时,是否先做一次带 bootstrap_days 的全量构建",
      )
      # Query-log look-back (days) for that bootstrap full build.
      bootstrap_days: int = Field(
          30,
          ge=0,
          description="bootstrap 全量构建时的查询日志回溯天数",
      )
      # Product scan batch size used only by the bootstrap full build; must be positive.
      batch_size: int = Field(
          500,
          ge=1,
          description="当需要 bootstrap 全量构建时使用的商品扫描 batch 大小",
      )
  
  
  async def _run_in_executor(func, *args, **kwargs) -> Dict[str, Any]:
      """Run ``func(*args, **kwargs)`` on the loop's default executor and await it.

      Keeps the (presumably blocking) builder calls off the event loop thread.

      Args:
          func: Callable to run in the executor.
          *args: Positional arguments forwarded to ``func``.
          **kwargs: Keyword arguments forwarded to ``func`` (``run_in_executor``
              itself only forwards positionals, hence the wrapping lambda).

      Returns:
          Whatever ``func`` returns; callers here expect a dict.
      """
      # Inside a coroutine the asyncio docs recommend get_running_loop() over
      # get_event_loop(), whose behavior depends on the event loop policy.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
  
  
  @router.post("/full")
  async def build_full_suggestions(request: FullBuildRequest) -> Dict[str, Any]:
      """
      Fully build (or rebuild) the suggestion index for the given tenant.

      This endpoint:
      - builds candidate terms from the product index plus the query logs
      - writes them into a new suggestion index (using versioned naming)
      - depending on ``publish_alias`` / ``keep_versions``, switches the alias
        and cleans up old versions

      Returns:
          The result dict produced by ``builder.rebuild_tenant_index``.

      Raises:
          HTTPException: 503 if the suggestion builder has not been initialized;
              500 if the rebuild raises (the error text is included in the
              response detail and the traceback is logged).
      """
      builder = get_suggestion_builder()
      if builder is None:
          raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")

      try:
          # Run the builder call in the default executor so it does not block
          # the event loop while the rebuild is in progress.
          result = await _run_in_executor(
              builder.rebuild_tenant_index,
              tenant_id=request.tenant_id,
              days=request.days,
              batch_size=request.batch_size,
              min_query_len=request.min_query_len,
              publish_alias=request.publish_alias,
              keep_versions=request.keep_versions,
          )
      except Exception as e:
          # logger.exception logs at ERROR level with the traceback attached.
          logger.exception(
              "Error in full suggestion rebuild for tenant_id=%s: %s",
              request.tenant_id,
              e,
          )
          # Chain the original error so server-side tracebacks show the root cause.
          raise HTTPException(
              status_code=500, detail=f"Internal server error: {str(e)}"
          ) from e
      return result
  
  
  @router.post("/incremental")
  async def build_incremental_suggestions(request: IncrementalBuildRequest) -> Dict[str, Any]:
      """
      Incrementally update the suggestion index for the given tenant.

      - Scans the query logs from the last incremental watermark (or the last
        full-build time) up to now
      - Upserts suggestion entries based on query frequency
      - If no suggestion index exists yet and ``bootstrap_if_missing`` is true,
        a bootstrap full build is performed first

      Returns:
          The result dict produced by ``builder.incremental_update_tenant_index``.

      Raises:
          HTTPException: 503 if the suggestion builder has not been initialized;
              500 if the update raises (the error text is included in the
              response detail and the traceback is logged).
      """
      builder = get_suggestion_builder()
      if builder is None:
          raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")

      try:
          # Run the builder call in the default executor so it does not block
          # the event loop while the update is in progress.
          result = await _run_in_executor(
              builder.incremental_update_tenant_index,
              tenant_id=request.tenant_id,
              min_query_len=request.min_query_len,
              fallback_days=request.fallback_days,
              overlap_minutes=request.overlap_minutes,
              bootstrap_if_missing=request.bootstrap_if_missing,
              bootstrap_days=request.bootstrap_days,
              batch_size=request.batch_size,
          )
      except Exception as e:
          # logger.exception logs at ERROR level with the traceback attached.
          logger.exception(
              "Error in incremental suggestion update for tenant_id=%s: %s",
              request.tenant_id,
              e,
          )
          # Chain the original error so server-side tracebacks show the root cause.
          raise HTTPException(
              status_code=500, detail=f"Internal server error: {str(e)}"
          ) from e
      return result