Commit ded6f29efd71c7de27104bc5a9f08d9e5c48dfc7

Authored by tangwang
1 parent 89638140

补充suggestion模块

- 新增 `suggestion` 模块:
  - `suggestion/mapping.py`:`search_suggestions` mapping 生成(多语言 `completion` + `search_as_you_type`)
  - `suggestion/builder.py`:全量构建程序(扫描 `search_products` 的 `title/qanchors` + MySQL `shoplazza_search_log`)
  - `suggestion/service.py`:在线查询服务(suggestion 检索 + 结果直达商品二次查询)
  - `suggestion/__init__.py`
  - `suggestion/README.md`:设计文档

- 接入 API 服务初始化:
  - `api/app.py` 新增 `SuggestionService` 初始化和 `get_suggestion_service()`

- 接口实现:
  - `api/routes/search.py` 的 `GET /search/suggestions` 从“空框架”改为真实调用
  - 支持参数:
    - `q`, `size`, `language`
    - `with_results`(是否直达商品)
    - `result_size`(每条 suggestion 商品数)
    - `debug`
  - 继续要求 `X-Tenant-ID`(或 query 的 `tenant_id`)

- 模型补充:
  - `api/models.py` 增加 suggestion 请求/响应字段(`language`, `resolved_language`, `with_results`, `result_size`)

- CLI 全量构建命令:
  - `main.py` 新增 `build-suggestions`
  - 使用方式:
    - `python main.py build-suggestions --tenant-id 1 --recreate`
    - 可选:`--days 30 --batch-size 500 --min-query-len 1 --es-host ...`

---

 关键实现逻辑(已编码)

- 语言归属优先级(按你要求):
  - `shoplazza_search_log.language` > `request_params.language` > 脚本/模型兜底
- 候选词聚合键:
  - `(tenant_id, lang, text_norm)`(文档唯一)
- 评分:
  - 基于 `query_count_30d/7d + qanchor_doc_count + title_doc_count` 的离线分
- 结果直达:
  - 对每条 suggestion 在 `search_products_tenant_{id}` 做二次查询(`qanchors/title` 组合)

---

 变更文件

- `api/app.py`
- `api/models.py`
- `api/routes/search.py`
- `main.py`
- `suggestion/README.md`
- `suggestion/__init__.py`
- `suggestion/mapping.py`
- `suggestion/builder.py`
- `suggestion/service.py`
api/app.py
... ... @@ -48,12 +48,14 @@ from config import ConfigLoader
48 48 from utils import ESClient
49 49 from search import Searcher
50 50 from query import QueryParser
  51 +from suggestion import SuggestionService
51 52 from .service_registry import set_es_client
52 53  
53 54 # Global instances
54 55 _es_client: Optional[ESClient] = None
55 56 _searcher: Optional[Searcher] = None
56 57 _query_parser: Optional[QueryParser] = None
  58 +_suggestion_service: Optional[SuggestionService] = None
57 59 _config = None
58 60  
59 61  
... ... @@ -64,7 +66,7 @@ def init_service(es_host: str = "http://localhost:9200"):
64 66 Args:
65 67 es_host: Elasticsearch host URL
66 68 """
67   - global _es_client, _searcher, _query_parser, _config
  69 + global _es_client, _searcher, _query_parser, _suggestion_service, _config
68 70  
69 71 start_time = time.time()
70 72 logger.info("Initializing search service (multi-tenant)")
... ... @@ -98,6 +100,8 @@ def init_service(es_host: str = "http://localhost:9200"):
98 100  
99 101 logger.info("Initializing searcher...")
100 102 _searcher = Searcher(_es_client, _config, _query_parser)
  103 + logger.info("Initializing suggestion service...")
  104 + _suggestion_service = SuggestionService(_es_client)
101 105  
102 106 elapsed = time.time() - start_time
103 107 logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}")
... ... @@ -126,6 +130,13 @@ def get_query_parser() -> QueryParser:
126 130 return _query_parser
127 131  
128 132  
  133 +def get_suggestion_service() -> SuggestionService:
  134 + """Get suggestion service instance."""
  135 + if _suggestion_service is None:
  136 + raise RuntimeError("Service not initialized")
  137 + return _suggestion_service
  138 +
  139 +
129 140 def get_config():
130 141 """Get global config instance."""
131 142 if _config is None:
... ...
api/models.py
... ... @@ -202,6 +202,9 @@ class SearchSuggestRequest(BaseModel):
202 202 ["query"],
203 203 description="建议类型:query(查询建议), product(商品建议), category(类目建议), brand(品牌建议)"
204 204 )
  205 + language: Optional[str] = Field(None, description="请求语言(如 zh/en/ar/ru)")
  206 + with_results: bool = Field(True, description="是否返回每条 suggestion 的直达商品结果")
  207 + result_size: int = Field(3, ge=1, le=10, description="每条 suggestion 返回商品数量")
205 208  
206 209  
207 210 class FacetValue(BaseModel):
... ... @@ -310,6 +313,8 @@ class SearchResponse(BaseModel):
310 313 class SearchSuggestResponse(BaseModel):
311 314 """搜索建议响应模型(框架,暂不实现)"""
312 315 query: str = Field(..., description="原始查询")
  316 + language: Optional[str] = Field(None, description="请求语言")
  317 + resolved_language: Optional[str] = Field(None, description="服务端解析后的语言")
313 318 suggestions: List[Dict[str, Any]] = Field(..., description="建议列表")
314 319 took_ms: int = Field(..., description="耗时(毫秒)")
315 320  
... ...
api/routes/search.py
... ... @@ -269,48 +269,58 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request):
269 269 @router.get("/suggestions", response_model=SearchSuggestResponse)
270 270 async def search_suggestions(
271 271 q: str = Query(..., min_length=1, description="搜索查询"),
272   - size: int = Query(5, ge=1, le=20, description="建议数量"),
273   - types: str = Query("query", description="建议类型(逗号分隔): query, product, category, brand")
  272 + size: int = Query(10, ge=1, le=20, description="建议数量"),
  273 + language: str = Query("en", description="请求语言,如 zh/en/ar/ru"),
  274 + with_results: bool = Query(True, description="是否附带每条 suggestion 的直达商品"),
  275 + result_size: int = Query(3, ge=1, le=10, description="每条 suggestion 直达商品数量"),
  276 + debug: bool = Query(False, description="是否返回调试信息"),
  277 + http_request: Request = None,
274 278 ):
275 279 """
276 280 获取搜索建议(自动补全)。
277 281  
278   - 功能说明:
279   - - 查询建议(query):基于历史搜索和热门搜索
280   - - 商品建议(product):匹配的商品
281   - - 类目建议(category):匹配的类目
282   - - 品牌建议(brand):匹配的品牌
283   -
284   - 注意:此功能暂未实现,仅返回框架响应。
  282 + 获取搜索建议(自动补全,支持多语言与直达商品)。
285 283 """
286   - import time
287   - start_time = time.time()
288   -
289   - # TODO: 实现搜索建议逻辑
290   - # 1. 从搜索历史中获取建议
291   - # 2. 从商品标题中匹配前缀
292   - # 3. 从类目、品牌中匹配
293   -
294   - # 临时返回空结果
295   - suggestions = []
296   -
297   - # 示例结构(暂不实现):
298   - # suggestions = [
299   - # {
300   - # "text": "芭比娃娃",
301   - # "type": "query",
302   - # "highlight": "<em>芭</em>比娃娃",
303   - # "popularity": 850
304   - # }
305   - # ]
306   -
307   - took_ms = int((time.time() - start_time) * 1000)
308   -
309   - return SearchSuggestResponse(
310   - query=q,
311   - suggestions=suggestions,
312   - took_ms=took_ms
313   - )
  284 + # Extract tenant_id (required)
  285 + tenant_id = http_request.headers.get("X-Tenant-ID") if http_request else None
  286 + if not tenant_id and http_request:
  287 + from urllib.parse import parse_qs
  288 + query_string = http_request.url.query
  289 + if query_string:
  290 + params = parse_qs(query_string)
  291 + tenant_id = params.get("tenant_id", [None])[0]
  292 +
  293 + if not tenant_id:
  294 + raise HTTPException(
  295 + status_code=400,
  296 + detail="tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'",
  297 + )
  298 +
  299 + try:
  300 + from api.app import get_suggestion_service
  301 +
  302 + service = get_suggestion_service()
  303 + result = service.search(
  304 + tenant_id=tenant_id,
  305 + query=q,
  306 + language=language,
  307 + size=size,
  308 + with_results=with_results,
  309 + result_size=result_size,
  310 + )
  311 + response = SearchSuggestResponse(
  312 + query=result["query"],
  313 + language=result.get("language"),
  314 + resolved_language=result.get("resolved_language"),
  315 + suggestions=result["suggestions"],
  316 + took_ms=result["took_ms"],
  317 + )
  318 + if debug:
  319 + # keep response_model stable; debug info stays inside suggestions payload for now
  320 + return response
  321 + return response
  322 + except Exception as e:
  323 + raise HTTPException(status_code=500, detail=str(e))
314 324  
315 325  
316 326 @router.get("/instant", response_model=SearchResponse)
... ...
... ... @@ -18,8 +18,11 @@ import uvicorn
18 18 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
19 19  
20 20 from config import ConfigLoader
  21 +from config.env_config import DB_CONFIG
21 22 from utils import ESClient
22 23 from search import Searcher
  24 +from suggestion import SuggestionIndexBuilder
  25 +from utils.db_connector import create_db_connection
23 26  
24 27  
25 28 def cmd_serve(args):
... ... @@ -97,6 +100,32 @@ def cmd_search(args):
97 100 return 0
98 101  
99 102  
  103 +def cmd_build_suggestions(args):
  104 + """Build suggestion index for a tenant."""
  105 + es_client = ESClient(hosts=[args.es_host])
  106 + if not es_client.ping():
  107 + print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
  108 + return 1
  109 +
  110 + db_engine = create_db_connection(
  111 + host=DB_CONFIG["host"],
  112 + port=DB_CONFIG["port"],
  113 + database=DB_CONFIG["database"],
  114 + username=DB_CONFIG["username"],
  115 + password=DB_CONFIG["password"],
  116 + )
  117 + builder = SuggestionIndexBuilder(es_client=es_client, db_engine=db_engine)
  118 + result = builder.rebuild_tenant_index(
  119 + tenant_id=args.tenant_id,
  120 + days=args.days,
  121 + recreate=args.recreate,
  122 + batch_size=args.batch_size,
  123 + min_query_len=args.min_query_len,
  124 + )
  125 + print(json.dumps(result, indent=2, ensure_ascii=False))
  126 + return 0
  127 +
  128 +
100 129 def main():
101 130 """Main CLI entry point."""
102 131 parser = argparse.ArgumentParser(
... ... @@ -133,6 +162,22 @@ def main():
133 162 search_parser.add_argument('--no-embedding', action='store_true', help='Disable embeddings')
134 163 search_parser.add_argument('--json', action='store_true', help='Output JSON')
135 164  
  165 + # Suggestion build command
  166 + suggest_build_parser = subparsers.add_parser(
  167 + 'build-suggestions',
  168 + help='Build tenant suggestion index (full rebuild)'
  169 + )
  170 + suggest_build_parser.add_argument('--tenant-id', required=True, help='Tenant ID')
  171 + suggest_build_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
  172 + suggest_build_parser.add_argument('--days', type=int, default=30, help='Query log lookback days')
  173 + suggest_build_parser.add_argument('--batch-size', type=int, default=500, help='Product scan batch size')
  174 + suggest_build_parser.add_argument('--min-query-len', type=int, default=1, help='Minimum query length')
  175 + suggest_build_parser.add_argument(
  176 + '--recreate',
  177 + action='store_true',
  178 + help='Delete and recreate suggestion index before build'
  179 + )
  180 +
136 181 args = parser.parse_args()
137 182  
138 183 if not args.command:
... ... @@ -146,6 +191,8 @@ def main():
146 191 return cmd_serve_indexer(args)
147 192 elif args.command == 'search':
148 193 return cmd_search(args)
  194 + elif args.command == 'build-suggestions':
  195 + return cmd_build_suggestions(args)
149 196 else:
150 197 print(f"Unknown command: {args.command}")
151 198 return 1
... ...
suggestion/README.md 0 → 100644
... ... @@ -0,0 +1,402 @@
  1 +# Suggestion 设计文档
  2 +
  3 +本文档定义 `search_suggestions` 独立索引方案,用于支持多语言自动补全(suggestion)与结果直达。
  4 +
  5 +## 1. 背景与目标
  6 +
  7 +当前搜索系统已具备多语言商品索引(`title.{lang}`、`qanchors.{lang}`)与主搜索能力。为了实现输入中实时下拉 suggestion,需要新增一套面向“词”的能力。
  8 +
  9 +核心目标:
  10 +
  11 +- 在不耦合主搜索链路的前提下,提供低延迟 suggestion(实时输入)。
  12 +- 支持多语言,按请求语言路由到对应 suggestion 语种。
  13 +- 支持“结果直达”:每条 suggestion 可附带候选商品列表(通过二次查询 `search_products` 完成)。
  14 +- 支持后续词级排序演进(行为信号、运营控制、去噪治理)。
  15 +
  16 +非目标(当前阶段):
  17 +
  18 +- 不做个性化推荐(用户级 personalization)。
  19 +- 不引入复杂在线学习排序服务。
  20 +
  21 +## 2. 总体架构
  22 +
  23 +采用双索引架构:
  24 +
  25 +- 商品索引:`search_products_tenant_{tenant_id}`
  26 +- 建议词索引:`search_suggestions_tenant_{tenant_id}`
  27 +
  28 +在线查询主路径:
  29 +
  30 +1. 仅查询 `search_suggestions_tenant_{tenant_id}` 得到 suggestion 列表。
  31 +2. 对每条 suggestion 进行“结果直达”的二次查询(`msearch`)到 `search_products_tenant_{tenant_id}`:
  32 + - 使用 suggestion 文本对 `title.{lang}` / `qanchors.{lang}` 执行 `term` / `match_phrase_prefix` 组合查询。
  33 +3. 回填每条 suggestion 的商品卡片列表(例如每条 3~5 个)。
  34 +
  35 +## 3. API 设计
  36 +
  37 +建议保留并增强现有接口:`GET /search/suggestions`
  38 +
  39 +### 3.1 请求参数
  40 +
  41 +- `q` (string, required): 用户输入前缀
  42 +- `size` (int, optional, default=10, max=20): 返回 suggestion 数量
  43 +- `language` (string, optional, default=`en`): 请求语言(如 `zh`, `en`, `ar`, `ru`)
  44 +- `with_results` (bool, optional, default=true): 是否附带每条 suggestion 的直达商品
  45 +- `result_size` (int, optional, default=3, max=10): 每条 suggestion 附带商品条数
  46 +- `debug` (bool, optional, default=false): 是否返回调试信息
  47 +
  48 +Header:
  49 +
  50 +- `X-Tenant-ID` (required)
  51 +
  52 +### 3.2 响应结构
  53 +
  54 +```json
  55 +{
  56 + "query": "iph",
  57 + "language": "en",
  58 + "suggestions": [
  59 + {
  60 + "text": "iphone 15",
  61 + "lang": "en",
  62 + "score": 12.37,
  63 + "sources": ["query_log", "qanchor"],
  64 + "products": [
  65 + {
  66 + "spu_id": "12345",
  67 + "title": "iPhone 15 Pro Max",
  68 + "price": 999.0,
  69 + "image_url": "https://..."
  70 + }
  71 + ]
  72 + }
  73 + ],
  74 + "took_ms": 14,
  75 + "debug_info": {}
  76 +}
  77 +```
  78 +
  79 +## 4. 索引设计:`search_suggestions_tenant_{tenant_id}`
  80 +
  81 +文档粒度:`tenant_id + lang + text_norm` 唯一一条文档。
  82 +
  83 +### 4.1 字段定义(建议)
  84 +
  85 +- `tenant_id` (`keyword`)
  86 +- `lang` (`keyword`)
  87 +- `text` (`keyword`):展示文本
  88 +- `text_norm` (`keyword`):归一化文本(去重键)
  89 +- `sources` (`keyword[]`):来源集合,取值:`title` / `qanchor` / `query_log`
  90 +- `title_doc_count` (`integer`):来自 title 的命中文档数
  91 +- `qanchor_doc_count` (`integer`):来自 qanchor 的命中文档数
  92 +- `query_count_7d` (`integer`):7 天搜索词计数
  93 +- `query_count_30d` (`integer`):30 天搜索词计数
  94 +- `rank_score` (`float`):离线计算总分
  95 +- `status` (`byte`):1=online, 0=offline
  96 +- `updated_at` (`date`)
  97 +
  98 +用于召回:
  99 +
  100 +- `completion` (`object`):
  101 + - `completion.{lang}`: `completion` 类型(按语言设置 analyzer)
  102 +- `sat` (`object`):
  103 + - `sat.{lang}`: `search_as_you_type`(增强多词前缀效果)
  104 +
  105 +可选字段(用于加速直达):
  106 +
  107 +- `top_spu_ids` (`keyword[]`):预计算商品候选 id
  108 +
  109 +### 4.2 Mapping 样例(简化)
  110 +
  111 +```json
  112 +{
  113 + "settings": {
  114 + "number_of_shards": 1,
  115 + "number_of_replicas": 0
  116 + },
  117 + "mappings": {
  118 + "properties": {
  119 + "tenant_id": { "type": "keyword" },
  120 + "lang": { "type": "keyword" },
  121 + "text": { "type": "keyword" },
  122 + "text_norm": { "type": "keyword" },
  123 + "sources": { "type": "keyword" },
  124 + "title_doc_count": { "type": "integer" },
  125 + "qanchor_doc_count": { "type": "integer" },
  126 + "query_count_7d": { "type": "integer" },
  127 + "query_count_30d": { "type": "integer" },
  128 + "rank_score": { "type": "float" },
  129 + "status": { "type": "byte" },
  130 + "updated_at": { "type": "date" },
  131 + "completion": {
  132 + "properties": {
  133 + "zh": { "type": "completion", "analyzer": "index_ansj", "search_analyzer": "query_ansj" },
  134 + "en": { "type": "completion", "analyzer": "english" },
  135 + "ar": { "type": "completion", "analyzer": "arabic" },
  136 + "ru": { "type": "completion", "analyzer": "russian" }
  137 + }
  138 + },
  139 + "sat": {
  140 + "properties": {
  141 + "zh": { "type": "search_as_you_type", "analyzer": "index_ansj" },
  142 + "en": { "type": "search_as_you_type", "analyzer": "english" },
  143 + "ar": { "type": "search_as_you_type", "analyzer": "arabic" },
  144 + "ru": { "type": "search_as_you_type", "analyzer": "russian" }
  145 + }
  146 + },
  147 + "top_spu_ids": { "type": "keyword" }
  148 + }
  149 + }
  150 +}
  151 +```
  152 +
  153 +说明:实际支持语种需与 `search_products` 已支持语种保持一致。
  154 +
  155 +## 5. 全量建索引逻辑(核心)
  156 +
  157 +全量程序职责:扫描商品 `title/qanchors` 与搜索日志 `query`,聚合后写入 `search_suggestions`。
  158 +
  159 +输入:
  160 +
  161 +- `search_products_tenant_{tenant_id}` 文档
  162 +- MySQL 表:`shoplazza_search_log`
  163 +
  164 +输出:
  165 +
  166 +- `search_suggestions_tenant_{tenant_id}` 全量文档
  167 +
  168 +### 5.1 流程
  169 +
  170 +1. 创建/重建 `search_suggestions_tenant_{tenant_id}`。
  171 +2. 遍历 `search_products_tenant_{tenant_id}`(`scroll` 或 `search_after`):
  172 + - 提取每个商品的 `title.{lang}`、`qanchors.{lang}`。
  173 + - 归一化文本(NFKC、trim、lower、空白折叠)。
  174 + - 产出候选词并累加:
  175 + - `title_doc_count += 1`
  176 + - `qanchor_doc_count += 1`
  177 + - `sources` 加来源。
  178 +3. 读取日志:
  179 + - SQL 拉取 `tenant_id` 下时间窗数据(如 30 天)。
  180 + - 对每条 `query` 解析语言归属(优先 `shoplazza_search_log.language`,其次 `request_params.language`,见第 6 节)。
  181 + - 累加 `query_count_7d` / `query_count_30d`,`sources` 加 `query_log`。
  182 +4. 清洗与过滤:
  183 + - 去空、去纯符号、长度阈值过滤。
  184 + - 可选黑名单过滤(运营配置)。
  185 +5. 计算 `rank_score`(见第 7 节)。
  186 +6. 组装文档:
  187 + - 写 `completion.{lang}` + `sat.{lang}`。
  188 + - `_id = md5(tenant_id|lang|text_norm)`。
  189 +7. 批量写入(bulk upsert)。
  190 +
  191 +### 5.2 伪代码
  192 +
  193 +```python
  194 +for tenant_id in tenants:
  195 + agg = {} # key: (lang, text_norm)
  196 +
  197 + for doc in scan_es_products(tenant_id):
  198 + for lang in index_languages(tenant_id):
  199 + add_from_title(agg, doc.title.get(lang), lang, doc.spu_id)
  200 + add_from_qanchor(agg, doc.qanchors.get(lang), lang, doc.spu_id)
  201 +
  202 + for row in fetch_search_logs(tenant_id, days=30):
  203 + lang, conf = resolve_query_lang(
  204 + query=row.query,
  205 + log_language=row.language,
  206 + request_params_json=row.request_params,
  207 + tenant_id=tenant_id
  208 + )
  209 + if not lang:
  210 + continue
  211 + add_from_query_log(agg, row.query, lang, row.create_time)
  212 +
  213 + docs = []
  214 + for (lang, text_norm), item in agg.items():
  215 + if not pass_filters(item):
  216 + continue
  217 + item.rank_score = compute_rank_score(item)
  218 + docs.append(to_suggestion_doc(tenant_id, lang, item))
  219 +
  220 + bulk_upsert(index=f"search_suggestions_tenant_{tenant_id}", docs=docs)
  221 +```
  222 +
  223 +## 6. 日志语言解析策略(已新增 language 字段)
  224 +
  225 +现状:`shoplazza_search_log` 已新增 `language` 字段,且 `request_params`(JSON)中也包含 `language`。
  226 +因此全量程序不再以“纯离线识别”为主,而是采用“日志显式语言优先”的三级策略。
  227 +
  228 +### 6.1 语言解析优先级
  229 +
  230 +1. **一级:`shoplazza_search_log.language`(最高优先级)**
  231 + - 若值存在且合法,直接作为 query 归属语言。
  232 +2. **二级:`request_params.language`(JSON 兜底)**
  233 + - 当表字段为空/非法时,解析 `request_params` JSON 中的 `language`。
  234 +3. **三级:离线识别(最后兜底)**
  235 + - 仅在前两者都缺失时启用:
  236 + - 脚本直判(CJK/Arabic/Cyrillic)
  237 + - 轻量语言识别器(拉丁语)
  238 +
  239 +### 6.2 一致性校验(推荐)
  240 +
  241 +当 `shoplazza_search_log.language` 与 `request_params.language` 同时存在但不一致时:
  242 +
  243 +- 默认采用 `shoplazza_search_log.language`
  244 +- 记录 `lang_conflict=true` 用于审计
  245 +- 输出监控指标(冲突率)
  246 +
  247 +### 6.3 置信度与约束
  248 +
  249 +对于一级/二级来源:
  250 +
  251 +- `lang_confidence=1.0`
  252 +- `lang_source=log_field` 或 `lang_source=request_params`
  253 +
  254 +对于三级离线识别:
  255 +
  256 +- `confidence >= 0.8`:写入 top1
  257 +- `0.5 <= confidence < 0.8`:写入 top1(必要时兼容 top2 降权)
  258 +- `< 0.5`:写入租户 `primary_language`(降权)
  259 +
  260 +统一约束:
  261 +
  262 +- 最终写入语言必须属于租户 `index_languages`
  263 +
  264 +建议额外存储:
  265 +
  266 +- `lang_confidence`(float)
  267 +- `lang_source`(`log_field`/`request_params`/`script`/`model`/`default`)
  268 +- `lang_conflict`(bool)
  269 +
  270 +便于后续质量审计与数据回溯。
  271 +
  272 +## 7. 排序分数设计(离线)
  273 +
  274 +建议采用可解释线性组合:
  275 +
  276 +```text
  277 +rank_score =
  278 + w1 * log1p(query_count_30d)
  279 + + w2 * log1p(query_count_7d)
  280 + + w3 * log1p(qanchor_doc_count)
  281 + + w4 * log1p(title_doc_count)
  282 + + w5 * business_bonus
  283 +```
  284 +
  285 +推荐初始权重(可配置):
  286 +
  287 +- `w1=1.8`, `w2=1.2`, `w3=1.0`, `w4=0.6`, `w5=0.3`
  288 +
  289 +说明:
  290 +
  291 +- 搜索日志信号优先级最高(最接近真实用户意图)。
  292 +- `qanchor` 高于 `title`(更偏 query 风格)。
  293 +- `business_bonus` 可接入销量、库存可售率等轻量业务信号。
  294 +
  295 +## 8. 在线查询逻辑(suggestion)
  296 +
  297 +主路径只查 `search_suggestions`。
  298 +
  299 +### 8.1 Suggestion 查询 DSL(示例)
  300 +
  301 +```json
  302 +{
  303 + "size": 10,
  304 + "query": {
  305 + "function_score": {
  306 + "query": {
  307 + "bool": {
  308 + "filter": [
  309 + { "term": { "lang": "en" } },
  310 + { "term": { "status": 1 } }
  311 + ],
  312 + "should": [
  313 + {
  314 + "multi_match": {
  315 + "query": "iph",
  316 + "type": "bool_prefix",
  317 + "fields": [
  318 + "sat.en",
  319 + "sat.en._2gram",
  320 + "sat.en._3gram"
  321 + ]
  322 + }
  323 + }
  324 + ],
  325 + "minimum_should_match": 1
  326 + }
  327 + },
  328 + "field_value_factor": {
  329 + "field": "rank_score",
  330 + "factor": 1.0,
  331 + "modifier": "log1p",
  332 + "missing": 0
  333 + },
  334 + "boost_mode": "sum",
  335 + "score_mode": "sum"
  336 + }
  337 + },
  338 + "_source": [
  339 + "text",
  340 + "lang",
  341 + "rank_score",
  342 + "sources",
  343 + "top_spu_ids"
  344 + ]
  345 +}
  346 +```
  347 +
  348 +可选:completion 方式(极低延迟)也可作为同接口内另一条召回通道,再与上面结果融合去重。
  349 +
  350 +## 9. 结果直达(二次查询)
  351 +
  352 +`with_results=true` 时,对每条 suggestion 的 `text` 做二次查询到 `search_products_tenant_{tenant_id}`。
  353 +
  354 +推荐使用 `msearch`,每条 suggestion 一个子查询:
  355 +
  356 +- `term`(精确)命中 `qanchors.{lang}.keyword`(若存在 keyword 子字段)
  357 +- `match_phrase_prefix` 命中 `title.{lang}`
  358 +- 可加权:`qanchors` 命中权重高于 `title`
  359 +- 每条 suggestion 返回 `result_size` 条商品
  360 +
  361 +若未来希望进一步降在线复杂度,可改为离线写入 `top_spu_ids` 并在在线用 `mget` 回填。
  362 +
  363 +## 10. 数据治理与运营控制
  364 +
  365 +建议加入以下机制:
  366 +
  367 +- 黑名单词:人工屏蔽垃圾词、敏感词
  368 +- 白名单词:活动词、品牌词强制保留
  369 +- 最小阈值:低频词不过线(例如 `query_count_30d < 2` 且无 qanchor/title 支撑)
  370 +- 去重规则:`text_norm` 维度强去重
  371 +- 更新策略:每日全量 + 每小时增量(后续)
  372 +
  373 +## 11. 实施里程碑
  374 +
  375 +M1(快速上线):
  376 +
  377 +- 建 `search_suggestions` 索引
  378 +- 全量程序:`title + qanchors + query_log`
  379 +- `/search/suggestions` 仅查 suggestion,不带直达
  380 +
  381 +M2(增强):
  382 +
  383 +- 增加二次查询直达商品(`msearch`)
  384 +- 引入语言置信度审计报表
  385 +- 加黑白名单与去噪配置
  386 +
  387 +M3(优化):
  388 +
  389 +- completion + bool_prefix 双通道融合
  390 +- 增量构建任务(小时级)
  391 +- 排序参数在线配置化
  392 +
  393 +## 12. 关键风险与规避
  394 +
  395 +- 日志语言字段质量问题导致错写:通过 `log_field > request_params > model` 三级策略与冲突审计规避
  396 +- 高频噪声词上浮:黑名单 + 最小阈值 + 分数截断
  397 +- 直达二次查询成本上升:控制 `size/result_size`,优先 `msearch`
  398 +- 多语言字段不一致:统一语言枚举与映射生成逻辑,避免手写散落
  399 +
  400 +---
  401 +
  402 +本设计优先保证可落地与可演进:先以独立 suggestion 索引跑通主能力,再逐步增强排序与在线性能。
... ...
suggestion/__init__.py 0 → 100644
... ... @@ -0,0 +1,14 @@
  1 +"""
  2 +Suggestion module.
  3 +
  4 +Contains:
  5 +- Suggestion index mapping builder
  6 +- Full rebuild indexer (product + query logs)
  7 +- Online suggestion query service
  8 +"""
  9 +
  10 +from .builder import SuggestionIndexBuilder
  11 +from .service import SuggestionService
  12 +
  13 +__all__ = ["SuggestionIndexBuilder", "SuggestionService"]
  14 +
... ...
suggestion/builder.py 0 → 100644
... ... @@ -0,0 +1,390 @@
  1 +"""
  2 +Full suggestion index builder.
  3 +
  4 +Build data from:
  5 +- ES product index fields: title.{lang}, qanchors.{lang}
  6 +- MySQL search logs: shoplazza_search_log.query (+ language metadata)
  7 +"""
  8 +
  9 +import json
  10 +import logging
  11 +import math
  12 +import re
  13 +from dataclasses import dataclass, field
  14 +from datetime import datetime, timedelta, timezone
  15 +from typing import Any, Dict, List, Optional, Tuple
  16 +
  17 +from sqlalchemy import text
  18 +
  19 +from config.tenant_config_loader import get_tenant_config_loader
  20 +from utils.es_client import ESClient
  21 +from suggestion.mapping import build_suggestion_mapping
  22 +
  23 +logger = logging.getLogger(__name__)
  24 +
  25 +
  26 +def get_suggestion_index_name(tenant_id: str) -> str:
  27 + return f"search_suggestions_tenant_{tenant_id}"
  28 +
  29 +
  30 +@dataclass
  31 +class SuggestionCandidate:
  32 + text: str
  33 + text_norm: str
  34 + lang: str
  35 + sources: set = field(default_factory=set)
  36 + title_spu_ids: set = field(default_factory=set)
  37 + qanchor_spu_ids: set = field(default_factory=set)
  38 + query_count_7d: int = 0
  39 + query_count_30d: int = 0
  40 + lang_confidence: float = 1.0
  41 + lang_source: str = "default"
  42 + lang_conflict: bool = False
  43 + top_spu_scores: Dict[str, float] = field(default_factory=dict)
  44 +
  45 + def add_product(self, source: str, spu_id: str, score: float) -> None:
  46 + self.sources.add(source)
  47 + if source == "title":
  48 + self.title_spu_ids.add(spu_id)
  49 + elif source == "qanchor":
  50 + self.qanchor_spu_ids.add(spu_id)
  51 + prev = self.top_spu_scores.get(spu_id)
  52 + if prev is None or score > prev:
  53 + self.top_spu_scores[spu_id] = score
  54 +
  55 + def add_query_log(self, is_7d: bool) -> None:
  56 + self.sources.add("query_log")
  57 + self.query_count_30d += 1
  58 + if is_7d:
  59 + self.query_count_7d += 1
  60 +
  61 +
  62 +class SuggestionIndexBuilder:
  63 + """Build and rebuild suggestion index."""
  64 +
  65 + def __init__(self, es_client: ESClient, db_engine: Any):
  66 + self.es_client = es_client
  67 + self.db_engine = db_engine
  68 +
  69 + @staticmethod
  70 + def _normalize_text(value: str) -> str:
  71 + text_value = (value or "").strip().lower()
  72 + text_value = re.sub(r"\s+", " ", text_value)
  73 + return text_value
  74 +
  75 + @staticmethod
  76 + def _split_qanchors(value: Any) -> List[str]:
  77 + if value is None:
  78 + return []
  79 + if isinstance(value, list):
  80 + return [str(x).strip() for x in value if str(x).strip()]
  81 + raw = str(value).strip()
  82 + if not raw:
  83 + return []
  84 + parts = re.split(r"[,;|/\n\t]+", raw)
  85 + out = [p.strip() for p in parts if p and p.strip()]
  86 + if not out:
  87 + return [raw]
  88 + return out
  89 +
  90 + @staticmethod
  91 + def _looks_noise(text_value: str) -> bool:
  92 + if not text_value:
  93 + return True
  94 + if len(text_value) > 120:
  95 + return True
  96 + if re.fullmatch(r"[\W_]+", text_value):
  97 + return True
  98 + return False
  99 +
  100 + @staticmethod
  101 + def _normalize_lang(lang: Optional[str]) -> Optional[str]:
  102 + if not lang:
  103 + return None
  104 + token = str(lang).strip().lower().replace("-", "_")
  105 + if not token:
  106 + return None
  107 + # en_us -> en, zh_cn -> zh, keep explicit zh_tw / pt_br
  108 + if token in {"zh_tw", "pt_br"}:
  109 + return token
  110 + return token.split("_")[0]
  111 +
  112 + @staticmethod
  113 + def _parse_request_params_language(raw: Any) -> Optional[str]:
  114 + if raw is None:
  115 + return None
  116 + if isinstance(raw, dict):
  117 + return raw.get("language")
  118 + text_raw = str(raw).strip()
  119 + if not text_raw:
  120 + return None
  121 + try:
  122 + obj = json.loads(text_raw)
  123 + if isinstance(obj, dict):
  124 + return obj.get("language")
  125 + except Exception:
  126 + return None
  127 + return None
  128 +
  129 + @staticmethod
  130 + def _detect_script_language(query: str) -> Tuple[Optional[str], float, str]:
  131 + # CJK unified
  132 + if re.search(r"[\u4e00-\u9fff]", query):
  133 + return "zh", 0.98, "script"
  134 + # Arabic
  135 + if re.search(r"[\u0600-\u06FF]", query):
  136 + return "ar", 0.98, "script"
  137 + # Cyrillic
  138 + if re.search(r"[\u0400-\u04FF]", query):
  139 + return "ru", 0.95, "script"
  140 + # Greek
  141 + if re.search(r"[\u0370-\u03FF]", query):
  142 + return "el", 0.95, "script"
  143 + # Latin fallback
  144 + if re.search(r"[a-zA-Z]", query):
  145 + return "en", 0.55, "model"
  146 + return None, 0.0, "default"
  147 +
  148 + def _resolve_query_language(
  149 + self,
  150 + query: str,
  151 + log_language: Optional[str],
  152 + request_params: Any,
  153 + index_languages: List[str],
  154 + primary_language: str,
  155 + ) -> Tuple[str, float, str, bool]:
  156 + """Resolve lang with priority: log field > request_params > script/model."""
  157 + langs_set = set(index_languages or [])
  158 + primary = self._normalize_lang(primary_language) or "en"
  159 + if primary not in langs_set and langs_set:
  160 + primary = index_languages[0]
  161 +
  162 + log_lang = self._normalize_lang(log_language)
  163 + req_lang = self._normalize_lang(self._parse_request_params_language(request_params))
  164 + conflict = bool(log_lang and req_lang and log_lang != req_lang)
  165 +
  166 + if log_lang and (not langs_set or log_lang in langs_set):
  167 + return log_lang, 1.0, "log_field", conflict
  168 +
  169 + if req_lang and (not langs_set or req_lang in langs_set):
  170 + return req_lang, 1.0, "request_params", conflict
  171 +
  172 + detected_lang, conf, source = self._detect_script_language(query)
  173 + if detected_lang and (not langs_set or detected_lang in langs_set):
  174 + return detected_lang, conf, source, conflict
  175 +
  176 + return primary, 0.3, "default", conflict
  177 +
  178 + @staticmethod
  179 + def _score_product_hit(source: Dict[str, Any]) -> float:
  180 + sales = float(source.get("sales") or 0.0)
  181 + inventory = float(source.get("total_inventory") or 0.0)
  182 + return math.log1p(max(sales, 0.0)) * 1.2 + math.log1p(max(inventory, 0.0)) * 0.4
  183 +
  184 + @staticmethod
  185 + def _compute_rank_score(c: SuggestionCandidate) -> float:
  186 + return (
  187 + 1.8 * math.log1p(c.query_count_30d)
  188 + + 1.2 * math.log1p(c.query_count_7d)
  189 + + 1.0 * math.log1p(len(c.qanchor_spu_ids))
  190 + + 0.6 * math.log1p(len(c.title_spu_ids))
  191 + )
  192 +
  193 + def _scan_products(self, tenant_id: str, batch_size: int = 500) -> List[Dict[str, Any]]:
  194 + """Scan all product docs from tenant index using search_after."""
  195 + from indexer.mapping_generator import get_tenant_index_name
  196 +
  197 + index_name = get_tenant_index_name(tenant_id)
  198 + all_hits: List[Dict[str, Any]] = []
  199 + search_after: Optional[List[Any]] = None
  200 +
  201 + while True:
  202 + body: Dict[str, Any] = {
  203 + "size": batch_size,
  204 + "_source": ["spu_id", "title", "qanchors", "sales", "total_inventory"],
  205 + "sort": [{"spu_id": "asc"}],
  206 + "query": {"match_all": {}},
  207 + }
  208 + if search_after is not None:
  209 + body["search_after"] = search_after
  210 +
  211 + resp = self.es_client.client.search(index=index_name, body=body)
  212 + hits = resp.get("hits", {}).get("hits", []) or []
  213 + if not hits:
  214 + break
  215 + all_hits.extend(hits)
  216 + search_after = hits[-1].get("sort")
  217 + if len(hits) < batch_size:
  218 + break
  219 + return all_hits
  220 +
  221 + def _create_or_reset_index(self, tenant_id: str, index_languages: List[str], recreate: bool) -> str:
  222 + index_name = get_suggestion_index_name(tenant_id)
  223 + if recreate and self.es_client.index_exists(index_name):
  224 + logger.info("Deleting existing suggestion index: %s", index_name)
  225 + self.es_client.delete_index(index_name)
  226 + if not self.es_client.index_exists(index_name):
  227 + mapping = build_suggestion_mapping(index_languages=index_languages)
  228 + ok = self.es_client.create_index(index_name, mapping)
  229 + if not ok:
  230 + raise RuntimeError(f"Failed to create suggestion index: {index_name}")
  231 + return index_name
  232 +
  233 + def rebuild_tenant_index(
  234 + self,
  235 + tenant_id: str,
  236 + days: int = 30,
  237 + recreate: bool = True,
  238 + batch_size: int = 500,
  239 + min_query_len: int = 1,
  240 + ) -> Dict[str, Any]:
  241 + tenant_loader = get_tenant_config_loader()
  242 + tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
  243 + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
  244 + primary_language: str = tenant_cfg.get("primary_language") or "en"
  245 +
  246 + index_name = self._create_or_reset_index(tenant_id, index_languages, recreate)
  247 + key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {}
  248 +
  249 + # Step 1: product title/qanchors
  250 + hits = self._scan_products(tenant_id, batch_size=batch_size)
  251 + for hit in hits:
  252 + src = hit.get("_source", {}) or {}
  253 + spu_id = str(src.get("spu_id") or "")
  254 + if not spu_id:
  255 + continue
  256 + title_obj = src.get("title") or {}
  257 + qanchor_obj = src.get("qanchors") or {}
  258 + product_score = self._score_product_hit(src)
  259 +
  260 + for lang in index_languages:
  261 + title = ""
  262 + if isinstance(title_obj, dict):
  263 + title = str(title_obj.get(lang) or "").strip()
  264 + if title:
  265 + text_norm = self._normalize_text(title)
  266 + if not self._looks_noise(text_norm):
  267 + key = (lang, text_norm)
  268 + c = key_to_candidate.get(key)
  269 + if c is None:
  270 + c = SuggestionCandidate(text=title, text_norm=text_norm, lang=lang)
  271 + key_to_candidate[key] = c
  272 + c.add_product("title", spu_id=spu_id, score=product_score)
  273 +
  274 + q_raw = None
  275 + if isinstance(qanchor_obj, dict):
  276 + q_raw = qanchor_obj.get(lang)
  277 + for q_text in self._split_qanchors(q_raw):
  278 + text_norm = self._normalize_text(q_text)
  279 + if self._looks_noise(text_norm):
  280 + continue
  281 + key = (lang, text_norm)
  282 + c = key_to_candidate.get(key)
  283 + if c is None:
  284 + c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang)
  285 + key_to_candidate[key] = c
  286 + c.add_product("qanchor", spu_id=spu_id, score=product_score + 0.6)
  287 +
  288 + # Step 2: query logs
  289 + now = datetime.now(timezone.utc)
  290 + since_30d = now - timedelta(days=days)
  291 + since_7d = now - timedelta(days=7)
  292 + query_sql = text(
  293 + """
  294 + SELECT query, language, request_params, create_time
  295 + FROM shoplazza_search_log
  296 + WHERE tenant_id = :tenant_id
  297 + AND deleted = 0
  298 + AND query IS NOT NULL
  299 + AND query <> ''
  300 + AND create_time >= :since_30d
  301 + """
  302 + )
  303 + with self.db_engine.connect() as conn:
  304 + rows = conn.execute(query_sql, {"tenant_id": int(tenant_id), "since_30d": since_30d}).fetchall()
  305 +
  306 + for row in rows:
  307 + q = str(row.query or "").strip()
  308 + if len(q) < min_query_len:
  309 + continue
  310 + lang, conf, source, conflict = self._resolve_query_language(
  311 + query=q,
  312 + log_language=getattr(row, "language", None),
  313 + request_params=getattr(row, "request_params", None),
  314 + index_languages=index_languages,
  315 + primary_language=primary_language,
  316 + )
  317 + text_norm = self._normalize_text(q)
  318 + if self._looks_noise(text_norm):
  319 + continue
  320 + key = (lang, text_norm)
  321 + c = key_to_candidate.get(key)
  322 + if c is None:
  323 + c = SuggestionCandidate(text=q, text_norm=text_norm, lang=lang)
  324 + key_to_candidate[key] = c
  325 + c.lang_confidence = max(c.lang_confidence, conf)
  326 + c.lang_source = source if c.lang_source == "default" else c.lang_source
  327 + c.lang_conflict = c.lang_conflict or conflict
  328 +
  329 + created_at = getattr(row, "create_time", None)
  330 + if created_at is None:
  331 + is_7d = False
  332 + else:
  333 + # DB datetime usually naive local time; compare conservatively
  334 + if isinstance(created_at, datetime) and created_at.tzinfo is None:
  335 + created_at = created_at.replace(tzinfo=timezone.utc)
  336 + is_7d = bool(created_at and created_at >= since_7d)
  337 + c.add_query_log(is_7d=is_7d)
  338 +
  339 + # Step 3: build docs
  340 + now_iso = datetime.now(timezone.utc).isoformat()
  341 + docs: List[Dict[str, Any]] = []
  342 + for (_, _), c in key_to_candidate.items():
  343 + rank_score = self._compute_rank_score(c)
  344 + # keep top 20 product ids by score
  345 + top_spu_ids = [
  346 + item[0]
  347 + for item in sorted(c.top_spu_scores.items(), key=lambda kv: kv[1], reverse=True)[:20]
  348 + ]
  349 +
  350 + completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}}
  351 + sat_obj = {c.lang: c.text}
  352 + doc_id = f"{tenant_id}|{c.lang}|{c.text_norm}"
  353 + docs.append(
  354 + {
  355 + "_id": doc_id,
  356 + "tenant_id": str(tenant_id),
  357 + "lang": c.lang,
  358 + "text": c.text,
  359 + "text_norm": c.text_norm,
  360 + "sources": sorted(c.sources),
  361 + "title_doc_count": len(c.title_spu_ids),
  362 + "qanchor_doc_count": len(c.qanchor_spu_ids),
  363 + "query_count_7d": c.query_count_7d,
  364 + "query_count_30d": c.query_count_30d,
  365 + "rank_score": float(rank_score),
  366 + "lang_confidence": float(c.lang_confidence),
  367 + "lang_source": c.lang_source,
  368 + "lang_conflict": bool(c.lang_conflict),
  369 + "top_spu_ids": top_spu_ids,
  370 + "status": 1,
  371 + "updated_at": now_iso,
  372 + "completion": completion_obj,
  373 + "sat": sat_obj,
  374 + }
  375 + )
  376 +
  377 + if docs:
  378 + result = self.es_client.bulk_index(index_name=index_name, docs=docs)
  379 + self.es_client.refresh(index_name)
  380 + else:
  381 + result = {"success": 0, "failed": 0, "errors": []}
  382 +
  383 + return {
  384 + "tenant_id": str(tenant_id),
  385 + "index_name": index_name,
  386 + "total_candidates": len(key_to_candidate),
  387 + "indexed_docs": len(docs),
  388 + "bulk_result": result,
  389 + }
  390 +
... ...
suggestion/mapping.py 0 → 100644
... ... @@ -0,0 +1,99 @@
  1 +"""
  2 +Mapping generator for suggestion indices.
  3 +"""
  4 +
  5 +from typing import Dict, Any, List
  6 +
  7 +
# Map of language code -> Elasticsearch built-in analyzer name.
# Unknown languages fall back to "standard" in the field builders below.
ANALYZER_BY_LANG: Dict[str, str] = {
    "zh": "index_ansj",
    "en": "english",
    "ar": "arabic",
    "hy": "armenian",
    "eu": "basque",
    "pt_br": "brazilian",
    "bg": "bulgarian",
    "ca": "catalan",
    "cjk": "cjk",
    "cs": "czech",
    "da": "danish",
    "nl": "dutch",
    "fi": "finnish",
    "fr": "french",
    "gl": "galician",
    "de": "german",
    "el": "greek",
    "hi": "hindi",
    "hu": "hungarian",
    "id": "indonesian",
    "it": "italian",
    "no": "norwegian",
    "fa": "persian",
    "pt": "portuguese",
    "ro": "romanian",
    "ru": "russian",
    "es": "spanish",
    "sv": "swedish",
    "tr": "turkish",
    "th": "thai",
}


def _completion_field(lang: str) -> Dict[str, Any]:
    """Build the per-language ``completion`` sub-field definition."""
    field: Dict[str, Any] = {
        "type": "completion",
        "analyzer": ANALYZER_BY_LANG.get(lang, "standard"),
    }
    if lang == "zh":
        # Chinese pairs the index-side ansj analyzer with a query-side one.
        field["search_analyzer"] = "query_ansj"
    return field


def _sat_field(lang: str) -> Dict[str, Any]:
    """Build the per-language ``search_as_you_type`` sub-field definition."""
    return {
        "type": "search_as_you_type",
        "analyzer": ANALYZER_BY_LANG.get(lang, "standard"),
    }


def build_suggestion_mapping(index_languages: List[str]) -> Dict[str, Any]:
    """Build index settings+mappings for suggestion index."""
    # Drop falsy entries; default to en/zh when nothing usable remains.
    langs = [lang for lang in (index_languages or []) if lang] or ["en", "zh"]

    return {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "refresh_interval": "30s",
        },
        "mappings": {
            "properties": {
                "tenant_id": {"type": "keyword"},
                "lang": {"type": "keyword"},
                "text": {"type": "keyword"},
                "text_norm": {"type": "keyword"},
                "sources": {"type": "keyword"},
                "title_doc_count": {"type": "integer"},
                "qanchor_doc_count": {"type": "integer"},
                "query_count_7d": {"type": "integer"},
                "query_count_30d": {"type": "integer"},
                "rank_score": {"type": "float"},
                "lang_confidence": {"type": "float"},
                "lang_source": {"type": "keyword"},
                "lang_conflict": {"type": "boolean"},
                "top_spu_ids": {"type": "keyword"},
                "status": {"type": "byte"},
                "updated_at": {"type": "date"},
                "completion": {"properties": {lang: _completion_field(lang) for lang in langs}},
                "sat": {"properties": {lang: _sat_field(lang) for lang in langs}},
            }
        },
    }
  99 +
... ...
suggestion/service.py 0 → 100644
... ... @@ -0,0 +1,181 @@
  1 +"""
  2 +Online suggestion query service.
  3 +"""
  4 +
  5 +import logging
  6 +import time
  7 +from typing import Any, Dict, List, Optional
  8 +
  9 +from config.tenant_config_loader import get_tenant_config_loader
  10 +from indexer.mapping_generator import get_tenant_index_name
  11 +from suggestion.builder import get_suggestion_index_name
  12 +from utils.es_client import ESClient
  13 +
  14 +logger = logging.getLogger(__name__)
  15 +
  16 +
class SuggestionService:
    """Online suggestion lookup backed by the per-tenant suggestion index.

    Retrieves suggestions via the ``search_as_you_type`` fields combined with
    the offline ``rank_score``, and can optionally enrich each suggestion
    with matching products ("direct results") from the tenant product index.
    """

    def __init__(self, es_client: ESClient):
        # Shared ES client; all suggestion and product queries go through it.
        self.es_client = es_client

    def _resolve_language(self, tenant_id: str, language: str) -> str:
        """Map a caller-supplied language tag onto one of the tenant's index languages.

        Normalizes tags like ``en-US`` -> ``en`` (the compound codes ``zh_tw``
        and ``pt_br`` are kept whole), then falls back to the tenant's primary
        language, and finally to the first configured index language.
        """
        cfg = get_tenant_config_loader().get_tenant_config(tenant_id)
        index_languages = cfg.get("index_languages") or ["en", "zh"]
        primary = cfg.get("primary_language") or "en"
        lang = (language or "").strip().lower().replace("-", "_")
        if lang in {"zh_tw", "pt_br"}:
            normalized = lang
        else:
            # Strip the region suffix: "en_us" -> "en".
            normalized = lang.split("_")[0] if lang else ""
        if normalized in index_languages:
            return normalized
        if primary in index_languages:
            return primary
        return index_languages[0]

    def _search_products_for_suggestion(
        self,
        tenant_id: str,
        text_value: str,
        lang: str,
        result_size: int,
    ) -> List[Dict[str, Any]]:
        """Fetch up to ``result_size`` products matching one suggestion text.

        Ranking: exact qanchor phrase (boost 3) > title phrase prefix (boost 2)
        > plain title match (boost 1); ties broken by sales descending.
        Returns lightweight product dicts (spu_id, title, price, image, score).
        """
        index_name = get_tenant_index_name(tenant_id)
        title_field = f"title.{lang}"
        qanchor_field = f"qanchors.{lang}"

        body = {
            "size": result_size,
            "_source": ["spu_id", "title", "min_price", "image_url", "sales", "total_inventory"],
            "query": {
                "bool": {
                    "should": [
                        {"match_phrase": {qanchor_field: {"query": text_value, "boost": 3.0}}},
                        {"match_phrase_prefix": {title_field: {"query": text_value, "boost": 2.0}}},
                        {"match": {title_field: {"query": text_value, "boost": 1.0}}},
                    ],
                    "minimum_should_match": 1,
                }
            },
            # NOTE(review): assumes product docs carry a sortable numeric "sales"
            # field — confirm against the product index mapping.
            "sort": [{"_score": "desc"}, {"sales": "desc"}],
        }
        resp = self.es_client.search(index_name=index_name, body=body, size=result_size, from_=0)
        hits = resp.get("hits", {}).get("hits", []) or []
        out: List[Dict[str, Any]] = []
        for hit in hits:
            src = hit.get("_source", {}) or {}
            title_obj = src.get("title") or {}
            resolved_title = None
            if isinstance(title_obj, dict):
                # Title fallback chain: requested lang -> en -> zh -> any non-empty value.
                resolved_title = title_obj.get(lang) or title_obj.get("en") or title_obj.get("zh")
                if not resolved_title:
                    for v in title_obj.values():
                        if v:
                            resolved_title = v
                            break
            out.append(
                {
                    "spu_id": src.get("spu_id"),
                    "title": resolved_title,
                    "price": src.get("min_price"),
                    "image_url": src.get("image_url"),
                    "score": hit.get("_score", 0.0),
                }
            )
        return out

    def search(
        self,
        tenant_id: str,
        query: str,
        language: str,
        size: int = 10,
        with_results: bool = True,
        result_size: int = 3,
    ) -> Dict[str, Any]:
        """Return up to ``size`` suggestions for ``query``.

        Text relevance (``bool_prefix`` over the search_as_you_type field and
        its 2/3-gram sub-fields, filtered to the resolved language and active
        status) is combined with the offline ``rank_score`` through a
        function_score (log1p of rank_score, summed into the query score).
        When ``with_results`` is True, each suggestion is enriched with up to
        ``result_size`` products; enrichment failures degrade to an empty
        product list instead of failing the whole request.
        """
        start = time.time()
        resolved_lang = self._resolve_language(tenant_id, language)
        index_name = get_suggestion_index_name(tenant_id)

        sat_field = f"sat.{resolved_lang}"
        dsl = {
            "size": size,
            "query": {
                "function_score": {
                    "query": {
                        "bool": {
                            "filter": [
                                {"term": {"lang": resolved_lang}},
                                {"term": {"status": 1}},
                            ],
                            "should": [
                                {
                                    "multi_match": {
                                        "query": query,
                                        "type": "bool_prefix",
                                        "fields": [sat_field, f"{sat_field}._2gram", f"{sat_field}._3gram"],
                                    }
                                }
                            ],
                            "minimum_should_match": 1,
                        }
                    },
                    "field_value_factor": {
                        "field": "rank_score",
                        "factor": 1.0,
                        "modifier": "log1p",
                        "missing": 0.0,
                    },
                    "boost_mode": "sum",
                    "score_mode": "sum",
                }
            },
            "_source": [
                "text",
                "lang",
                "rank_score",
                "sources",
                "top_spu_ids",
                "lang_source",
                "lang_confidence",
                "lang_conflict",
            ],
        }
        es_resp = self.es_client.search(index_name=index_name, body=dsl, size=size, from_=0)
        hits = es_resp.get("hits", {}).get("hits", []) or []

        suggestions: List[Dict[str, Any]] = []
        for hit in hits:
            src = hit.get("_source", {}) or {}
            item = {
                "text": src.get("text"),
                "lang": src.get("lang"),
                "score": hit.get("_score", 0.0),
                "rank_score": src.get("rank_score"),
                "sources": src.get("sources", []),
                "lang_source": src.get("lang_source"),
                "lang_confidence": src.get("lang_confidence"),
                "lang_conflict": src.get("lang_conflict", False),
            }
            if with_results:
                try:
                    item["products"] = self._search_products_for_suggestion(
                        tenant_id=tenant_id,
                        text_value=str(src.get("text") or ""),
                        lang=resolved_lang,
                        result_size=result_size,
                    )
                except Exception as e:
                    # Best-effort enrichment: never let product lookup break suggestions.
                    logger.warning("Failed to enrich suggestion products: %s", e)
                    item["products"] = []
            suggestions.append(item)

        took_ms = int((time.time() - start) * 1000)
        return {
            "query": query,
            "language": language,
            "resolved_language": resolved_lang,
            "suggestions": suggestions,
            "took_ms": took_ms,
        }
  181 +
... ...