ded6f29e
tangwang
补充suggestion模块
|
1
2
3
4
5
6
7
8
9
|
"""
Online suggestion query service.
"""
import logging
import time
from typing import Any, Dict, List, Optional
from config.tenant_config_loader import get_tenant_config_loader
|
00c8ddb9
tangwang
suggest rank opti...
|
10
|
from query.query_parser import simple_tokenize_query
|
5b8f58c0
tangwang
sugg
|
11
|
from suggestion.builder import get_suggestion_alias_name
|
ded6f29e
tangwang
补充suggestion模块
|
12
13
14
15
16
|
from utils.es_client import ESClient
logger = logging.getLogger(__name__)
|
00c8ddb9
tangwang
suggest rank opti...
|
17
18
19
20
21
22
23
24
25
26
27
|
def _suggestion_length_factor(text: str) -> float:
    """Return the query-time length penalty for a suggestion string.

    The text is tokenized with ``simple_tokenize_query`` and the factor is
    ``1 / sqrt(token_count)``, so longer suggestions are down-weighted.
    The token count is clamped to at least 1, so empty or ``None`` text
    yields a factor of 1.0.
    """
    tokens = simple_tokenize_query(str(text or ""))
    token_count = max(len(tokens), 1)
    return 1.0 / (token_count ** 0.5)
def _score_with_token_length_penalty(item: Dict[str, Any]) -> float:
    """Return the item's ``score`` scaled by the length factor of its ``text``.

    A missing or falsy ``score`` counts as 0.0; a missing or falsy ``text``
    is treated as the empty string.
    """
    raw_score = float(item.get("score") or 0.0)
    suggestion_text = str(item.get("text") or "")
    return raw_score * _suggestion_length_factor(suggestion_text)
|
ded6f29e
tangwang
补充suggestion模块
|
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
class SuggestionService:
    """Answer online suggestion (autocomplete) queries from Elasticsearch.

    Two recall paths are used against the tenant's suggestion alias:

    * path A - the completion suggester on ``completion.<lang>``;
    * path B - a ``bool_prefix`` ``multi_match`` on ``sat.<lang>`` and its
      ``_2gram`` / ``_3gram`` sub-fields, boosted by ``rank_score``.

    Results of both paths are de-duplicated by normalized text, re-ranked with
    a token-length penalty and truncated to the requested size.
    """

    # `_source` fields requested by both recall paths, kept in one place so the
    # items produced by the two paths always expose the same keys.
    _SOURCE_FIELDS = (
        "text",
        "lang",
        "rank_score",
        "sources",
        "lang_source",
        "lang_confidence",
        "lang_conflict",
    )

    def __init__(self, es_client: ESClient):
        """Keep a reference to the ES client wrapper used for all queries."""
        self.es_client = es_client

    # ------------------------------------------------------------------ #
    # Small private helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _elapsed_ms(started: float) -> int:
        """Return whole milliseconds elapsed since ``started``.

        ``started`` must come from ``time.perf_counter()``. A monotonic clock
        is used so that wall-clock adjustments cannot produce negative or
        inflated durations.
        """
        return int((time.perf_counter() - started) * 1000)

    @staticmethod
    def _norm_text(value: Any) -> str:
        """Normalize suggestion text for de-duplication (strip + lowercase)."""
        return str(value or "").strip().lower()

    @classmethod
    def _append_unique(
        cls,
        suggestions: List[Dict[str, Any]],
        seen_text_norm: set,
        items: List[Dict[str, Any]],
    ) -> None:
        """Append copies of ``items`` to ``suggestions``, skipping duplicates.

        Items whose normalized text is empty or already present in
        ``seen_text_norm`` are dropped; both arguments are updated in place.
        """
        for item in items:
            norm = cls._norm_text(item.get("text"))
            if not norm or norm in seen_text_norm:
                continue
            seen_text_norm.add(norm)
            suggestions.append(dict(item))

    @staticmethod
    def _finalize_suggestion_list(items: List[Dict[str, Any]], limit: int) -> List[Dict[str, Any]]:
        """Sort by (length-penalized score, rank_score) descending and truncate."""
        ranked = sorted(
            items,
            key=lambda x: (
                _score_with_token_length_penalty(x),
                float(x.get("rank_score") or 0.0),
            ),
            reverse=True,
        )
        return ranked[:limit]

    @staticmethod
    def _build_response(
        query: str,
        language: str,
        resolved_lang: str,
        suggestions: List[Dict[str, Any]],
        took_ms: int,
    ) -> Dict[str, Any]:
        """Assemble the response envelope returned by :meth:`search`."""
        return {
            "query": query,
            "language": language,
            "resolved_language": resolved_lang,
            "suggestions": suggestions,
            "took_ms": took_ms,
        }

    # ------------------------------------------------------------------ #
    # Resolution helpers
    # ------------------------------------------------------------------ #
    def _resolve_language(self, tenant_id: str, language: str) -> str:
        """Map the requested language onto one of the tenant's index languages.

        The request value is lower-cased and ``-`` is replaced by ``_``.
        ``zh_tw`` and ``pt_br`` are kept as-is; any other value is reduced to
        the part before the first ``_``. If the result is not an index
        language, the tenant's primary language is used when it is indexed,
        otherwise the first index language.

        Tenant config defaults: ``index_languages`` -> ``["en", "zh"]``,
        ``primary_language`` -> ``"en"``.
        """
        cfg = get_tenant_config_loader().get_tenant_config(tenant_id)
        index_languages = cfg.get("index_languages") or ["en", "zh"]
        primary = cfg.get("primary_language") or "en"

        lang = (language or "").strip().lower().replace("-", "_")
        if lang in {"zh_tw", "pt_br"}:
            normalized = lang
        else:
            normalized = lang.split("_")[0] if lang else ""

        if normalized in index_languages:
            return normalized
        if primary in index_languages:
            return primary
        return index_languages[0]

    def _resolve_search_target(self, tenant_id: str) -> Optional[str]:
        """Return the tenant's suggestion alias name, or ``None`` if it does not exist."""
        alias_name = get_suggestion_alias_name(tenant_id)
        if self.es_client.alias_exists(alias_name):
            return alias_name
        return None

    # ------------------------------------------------------------------ #
    # Recall path A
    # ------------------------------------------------------------------ #
    def _completion_suggest(
        self,
        index_name: str,
        query: str,
        lang: str,
        size: int,
        tenant_id: str,
    ) -> List[Dict[str, Any]]:
        """
        Query ES completion suggester from `completion.<lang>`.

        The request is routed by ``str(tenant_id)`` and asks ES to skip
        duplicate suggestions. Any exception raised by the ES call is logged
        as a warning and turned into an empty list.

        Returns items in the same shape as search hits -> dicts with
        "text"/"lang"/"score"/"rank_score"/"sources" plus the "lang_source",
        "lang_confidence" and "lang_conflict" fields.
        """
        field_name = f"completion.{lang}"
        body = {
            "suggest": {
                "s": {
                    "prefix": query,
                    "completion": {
                        "field": field_name,
                        "size": size,
                        "skip_duplicates": True,
                    },
                }
            },
            "_source": list(self._SOURCE_FIELDS),
        }
        try:
            resp = self.es_client.client.search(index=index_name, body=body, routing=str(tenant_id))
        except Exception as e:
            # completion is an optimization path; never hard-fail the whole endpoint
            logger.warning("Completion suggest failed for index=%s field=%s: %s", index_name, field_name, e)
            return []

        entries = (resp.get("suggest", {}) or {}).get("s", []) or []
        if not entries:
            return []

        options = entries[0].get("options", []) or []
        out: List[Dict[str, Any]] = []
        for opt in options:
            src = opt.get("_source", {}) or {}
            out.append(
                {
                    # Fall back to the option's own text / the requested lang
                    # when the stored document does not carry them.
                    "text": src.get("text") or opt.get("text"),
                    "lang": src.get("lang") or lang,
                    "score": opt.get("_score", 0.0),
                    "rank_score": src.get("rank_score"),
                    "sources": src.get("sources", []),
                    "lang_source": src.get("lang_source"),
                    "lang_confidence": src.get("lang_confidence"),
                    "lang_conflict": src.get("lang_conflict", False),
                }
            )
        return out

    # ------------------------------------------------------------------ #
    # Public entry point
    # ------------------------------------------------------------------ #
    def search(
        self,
        tenant_id: str,
        query: str,
        language: str,
        size: int = 10,
    ) -> Dict[str, Any]:
        """Return ranked suggestions for ``query`` in the tenant's suggestion index.

        Args:
            tenant_id: Tenant whose suggestion alias is queried; also used as
                the ES routing value.
            query: Raw user input. It is stripped before being sent to ES,
                but echoed back unmodified in the response.
            language: Requested language; resolved via ``_resolve_language``.
            size: Maximum number of suggestions to return.

        Returns:
            A dict with keys ``query``, ``language``, ``resolved_language``,
            ``suggestions`` (list of item dicts) and ``took_ms``.
            ``suggestions`` is empty when the suggestion alias does not exist.

        Raises:
            Whatever ``es_client.search`` raises for the bool_prefix query;
            only the completion path is best-effort.
        """
        start = time.perf_counter()
        query_text = str(query or "").strip()
        resolved_lang = self._resolve_language(tenant_id, language)

        index_name = self._resolve_search_target(tenant_id)
        if not index_name:
            # On a fresh ES cluster the suggestion index might not be built yet.
            # Keep endpoint stable for frontend autocomplete: return empty list instead of 500.
            return self._build_response(query, language, resolved_lang, [], self._elapsed_ms(start))

        # Recall path A: completion suggester (fast path, usually enough for short prefix typing)
        t_completion_start = time.perf_counter()
        completion_items = self._completion_suggest(
            index_name=index_name,
            query=query_text,
            lang=resolved_lang,
            size=size,
            tenant_id=tenant_id,
        )
        completion_ms = self._elapsed_ms(t_completion_start)

        suggestions: List[Dict[str, Any]] = []
        seen_text_norm: set = set()
        self._append_unique(suggestions, seen_text_norm, completion_items)

        # Fast path: avoid a second ES query for short prefixes or when completion already full.
        if len(query_text) <= 2 or len(suggestions) >= size:
            took_ms = self._elapsed_ms(start)
            logger.info(
                "suggest completion-fast-return | tenant=%s lang=%s q=%s completion=%d took_ms=%d completion_ms=%d",
                tenant_id,
                resolved_lang,
                query_text,
                len(suggestions),
                took_ms,
                completion_ms,
            )
            return self._build_response(
                query,
                language,
                resolved_lang,
                self._finalize_suggestion_list(suggestions, size),
                took_ms,
            )

        # Recall path B: bool_prefix on search_as_you_type (fallback to top up recall)
        sat_field = f"sat.{resolved_lang}"
        dsl = {
            "track_total_hits": False,
            "query": {
                "function_score": {
                    "query": {
                        "bool": {
                            "filter": [
                                {"term": {"lang": resolved_lang}},
                                {"term": {"status": 1}},
                            ],
                            "should": [
                                {
                                    "multi_match": {
                                        "query": query_text,
                                        "type": "bool_prefix",
                                        "fields": [sat_field, f"{sat_field}._2gram", f"{sat_field}._3gram"],
                                    }
                                }
                            ],
                            "minimum_should_match": 1,
                        }
                    },
                    # Add log1p(rank_score) on top of the text-match score.
                    "field_value_factor": {
                        "field": "rank_score",
                        "factor": 1.0,
                        "modifier": "log1p",
                        "missing": 0.0,
                    },
                    "boost_mode": "sum",
                    "score_mode": "sum",
                }
            },
            "_source": list(self._SOURCE_FIELDS),
        }

        t_sat_start = time.perf_counter()
        es_resp = self.es_client.search(
            index_name=index_name,
            body=dsl,
            size=size,
            from_=0,
            routing=str(tenant_id),
        )
        sat_ms = self._elapsed_ms(t_sat_start)

        # Null-safe at every level: a missing/None response or "hits" value
        # degrades to "no hits" instead of raising AttributeError.
        hits = ((es_resp or {}).get("hits") or {}).get("hits") or []
        sat_items: List[Dict[str, Any]] = []
        for hit in hits:
            src = hit.get("_source", {}) or {}
            sat_items.append(
                {
                    "text": src.get("text"),
                    "lang": src.get("lang"),
                    "score": hit.get("_score", 0.0),
                    "rank_score": src.get("rank_score"),
                    "sources": src.get("sources", []),
                    "lang_source": src.get("lang_source"),
                    "lang_confidence": src.get("lang_confidence"),
                    "lang_conflict": src.get("lang_conflict", False),
                }
            )
        self._append_unique(suggestions, seen_text_norm, sat_items)

        took_ms = self._elapsed_ms(start)
        logger.info(
            "suggest completion+sat-return | tenant=%s lang=%s q=%s completion=%d sat_hits=%d took_ms=%d completion_ms=%d sat_ms=%d",
            tenant_id,
            resolved_lang,
            query_text,
            len(completion_items),
            len(hits),
            took_ms,
            completion_ms,
            sat_ms,
        )
        return self._build_response(
            query,
            language,
            resolved_lang,
            self._finalize_suggestion_list(suggestions, size),
            took_ms,
        )
|