suggestion_indexer.py
4.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from typing import Any, Dict, Optional
import asyncio
import logging
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from ..service_registry import get_suggestion_builder
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router for the suggestion-index build endpoints: every route declared below is
# mounted under the "/suggestions" prefix and tagged "suggestions".
router = APIRouter(tags=["suggestions"], prefix="/suggestions")
class FullBuildRequest(BaseModel):
    """Request body for a full build of a tenant's suggestion index.

    Numeric fields carry lower bounds so that obviously invalid values
    (negative look-back windows, a non-positive batch size, retaining zero
    versions) are rejected by request validation instead of being passed on
    to the builder.
    """

    tenant_id: str = Field(..., description="租户 ID")
    # Query-log look-back window, in days.
    days: int = Field(360, ge=0, description="查询日志回溯天数")
    # Batch size used when scanning the product index; must be positive.
    batch_size: int = Field(500, ge=1, description="商品扫描 batch 大小")
    # Minimum query length filter; CJK characters count as 2, all others as 1.
    min_query_len: int = Field(
        3,
        ge=0,
        description="最小查询长度过滤(中文字符按 2 计数,其余字符按 1 计数)",
    )
    # Whether to point the alias at the newly built versioned index when done.
    publish_alias: bool = Field(
        True,
        description="是否在构建完成后发布 alias 到新版本索引",
    )
    # Number of newest versioned indices to retain; must be at least 1.
    keep_versions: int = Field(
        2,
        ge=1,
        description="保留的最新版本索引数量",
    )
class IncrementalBuildRequest(BaseModel):
    """Request body for an incremental update of a tenant's suggestion index.

    Numeric fields carry lower bounds so that negative windows or a
    non-positive batch size are rejected by request validation instead of
    being passed on to the builder.
    """

    tenant_id: str = Field(..., description="租户 ID")
    # Minimum query length filter; CJK characters count as 2, all others as 1.
    min_query_len: int = Field(
        3,
        ge=0,
        description="最小查询长度过滤(中文字符按 2 计数,其余字符按 1 计数)",
    )
    # When no incremental watermark exists yet, how many recent days of query
    # logs to start from.
    fallback_days: int = Field(
        7,
        ge=0,
        description="当没有增量水位线时,默认从最近多少天的查询日志开始补",
    )
    # Minutes by which the incremental window overlaps backwards, to avoid
    # missing log entries that were written late.
    overlap_minutes: int = Field(
        30,
        ge=0,
        description="增量窗口向前重叠的分钟数,用于防止日志延迟写入导致的遗漏",
    )
    # If no usable suggestion index exists, first run a full build over
    # bootstrap_days of query logs.
    bootstrap_if_missing: bool = Field(
        True,
        description="当当前没有可用 suggestion 索引时,是否先做一次带 bootstrap_days 的全量构建",
    )
    # Query-log look-back window, in days, for that bootstrap full build.
    bootstrap_days: int = Field(
        30,
        ge=0,
        description="bootstrap 全量构建时的查询日志回溯天数",
    )
    # Product-scan batch size used when a bootstrap full build is needed;
    # must be positive.
    batch_size: int = Field(
        500,
        ge=1,
        description="当需要 bootstrap 全量构建时使用的商品扫描 batch 大小",
    )
async def _run_in_executor(func, *args, **kwargs) -> Dict[str, Any]:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
@router.post("/full")
async def build_full_suggestions(request: FullBuildRequest) -> Dict[str, Any]:
    """
    Fully build / rebuild the suggestion index for the given tenant.

    This endpoint:
    - builds candidate terms from the product index plus query logs
    - writes them into a new suggestion index (using versioned naming)
    - depending on the request, switches the alias and cleans up old versions

    Raises:
        HTTPException: 503 if the suggestion builder is not initialized;
            500 if the rebuild raises any exception.
    """
    builder = get_suggestion_builder()
    if builder is None:
        raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")

    try:
        # Run the synchronous builder call off the event-loop thread.
        return await _run_in_executor(
            builder.rebuild_tenant_index,
            tenant_id=request.tenant_id,
            days=request.days,
            batch_size=request.batch_size,
            min_query_len=request.min_query_len,
            publish_alias=request.publish_alias,
            keep_versions=request.keep_versions,
        )
    except Exception as e:
        # logger.exception logs at ERROR level with the traceback attached.
        logger.exception(
            "Error in full suggestion rebuild for tenant_id=%s: %s",
            request.tenant_id,
            e,
        )
        # Chain explicitly so the original error is recorded as __cause__.
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
@router.post("/incremental")
async def build_incremental_suggestions(request: IncrementalBuildRequest) -> Dict[str, Any]:
    """
    Incrementally update the suggestion index for the given tenant.

    - scans query logs from the last incremental watermark (or the last full
      build time) up to now
    - applies incremental upserts to the suggestion index based on query
      frequency
    - if no suggestion index exists yet and bootstrap_if_missing=true, first
      performs a bootstrap full build

    Raises:
        HTTPException: 503 if the suggestion builder is not initialized;
            500 if the update raises any exception.
    """
    builder = get_suggestion_builder()
    if builder is None:
        raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")

    try:
        # Run the synchronous builder call off the event-loop thread.
        return await _run_in_executor(
            builder.incremental_update_tenant_index,
            tenant_id=request.tenant_id,
            min_query_len=request.min_query_len,
            fallback_days=request.fallback_days,
            overlap_minutes=request.overlap_minutes,
            bootstrap_if_missing=request.bootstrap_if_missing,
            bootstrap_days=request.bootstrap_days,
            batch_size=request.batch_size,
        )
    except Exception as e:
        # logger.exception logs at ERROR level with the traceback attached.
        logger.exception(
            "Error in incremental suggestion update for tenant_id=%s: %s",
            request.tenant_id,
            e,
        )
        # Chain explicitly so the original error is recorded as __cause__.
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e