From ded6f29efd71c7de27104bc5a9f08d9e5c48dfc7 Mon Sep 17 00:00:00 2001 From: tangwang Date: Mon, 2 Mar 2026 20:35:05 +0800 Subject: [PATCH] 补充suggestion模块 --- api/app.py | 13 ++++++++++++- api/models.py | 5 +++++ api/routes/search.py | 84 +++++++++++++++++++++++++++++++++++++++++++++++------------------------------------- main.py | 47 +++++++++++++++++++++++++++++++++++++++++++++++ suggestion/README.md | 402 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ suggestion/__init__.py | 14 ++++++++++++++ suggestion/builder.py | 390 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ suggestion/mapping.py | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ suggestion/service.py | 181 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 1197 insertions(+), 38 deletions(-) create mode 100644 suggestion/README.md create mode 100644 suggestion/__init__.py create mode 100644 suggestion/builder.py create mode 100644 suggestion/mapping.py create mode 100644 suggestion/service.py diff --git a/api/app.py b/api/app.py index ab3f9cb..7c15979 100644 --- a/api/app.py +++ b/api/app.py @@ -48,12 +48,14 @@ from 
config import ConfigLoader from utils import ESClient from search import Searcher from query import QueryParser +from suggestion import SuggestionService from .service_registry import set_es_client # Global instances _es_client: Optional[ESClient] = None _searcher: Optional[Searcher] = None _query_parser: Optional[QueryParser] = None +_suggestion_service: Optional[SuggestionService] = None _config = None @@ -64,7 +66,7 @@ def init_service(es_host: str = "http://localhost:9200"): Args: es_host: Elasticsearch host URL """ - global _es_client, _searcher, _query_parser, _config + global _es_client, _searcher, _query_parser, _suggestion_service, _config start_time = time.time() logger.info("Initializing search service (multi-tenant)") @@ -98,6 +100,8 @@ def init_service(es_host: str = "http://localhost:9200"): logger.info("Initializing searcher...") _searcher = Searcher(_es_client, _config, _query_parser) + logger.info("Initializing suggestion service...") + _suggestion_service = SuggestionService(_es_client) elapsed = time.time() - start_time logger.info(f"Search service ready! 
(took {elapsed:.2f}s) | Index: {_config.es_index_name}") @@ -126,6 +130,13 @@ def get_query_parser() -> QueryParser: return _query_parser +def get_suggestion_service() -> SuggestionService: + """Get suggestion service instance.""" + if _suggestion_service is None: + raise RuntimeError("Service not initialized") + return _suggestion_service + + def get_config(): """Get global config instance.""" if _config is None: diff --git a/api/models.py b/api/models.py index 484152e..94028fa 100644 --- a/api/models.py +++ b/api/models.py @@ -202,6 +202,9 @@ class SearchSuggestRequest(BaseModel): ["query"], description="建议类型:query(查询建议), product(商品建议), category(类目建议), brand(品牌建议)" ) + language: Optional[str] = Field(None, description="请求语言(如 zh/en/ar/ru)") + with_results: bool = Field(True, description="是否返回每条 suggestion 的直达商品结果") + result_size: int = Field(3, ge=1, le=10, description="每条 suggestion 返回商品数量") class FacetValue(BaseModel): @@ -310,6 +313,8 @@ class SearchResponse(BaseModel): class SearchSuggestResponse(BaseModel): """搜索建议响应模型(框架,暂不实现)""" query: str = Field(..., description="原始查询") + language: Optional[str] = Field(None, description="请求语言") + resolved_language: Optional[str] = Field(None, description="服务端解析后的语言") suggestions: List[Dict[str, Any]] = Field(..., description="建议列表") took_ms: int = Field(..., description="耗时(毫秒)") diff --git a/api/routes/search.py b/api/routes/search.py index a1f7b05..e137da1 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -269,48 +269,58 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request): @router.get("/suggestions", response_model=SearchSuggestResponse) async def search_suggestions( q: str = Query(..., min_length=1, description="搜索查询"), - size: int = Query(5, ge=1, le=20, description="建议数量"), - types: str = Query("query", description="建议类型(逗号分隔): query, product, category, brand") + size: int = Query(10, ge=1, le=20, description="建议数量"), + language: str = Query("en", description="请求语言,如 
zh/en/ar/ru"), + with_results: bool = Query(True, description="是否附带每条 suggestion 的直达商品"), + result_size: int = Query(3, ge=1, le=10, description="每条 suggestion 直达商品数量"), + debug: bool = Query(False, description="是否返回调试信息"), + http_request: Request = None, ): """ 获取搜索建议(自动补全)。 - 功能说明: - - 查询建议(query):基于历史搜索和热门搜索 - - 商品建议(product):匹配的商品 - - 类目建议(category):匹配的类目 - - 品牌建议(brand):匹配的品牌 - - 注意:此功能暂未实现,仅返回框架响应。 + 获取搜索建议(自动补全,支持多语言与直达商品)。 """ - import time - start_time = time.time() - - # TODO: 实现搜索建议逻辑 - # 1. 从搜索历史中获取建议 - # 2. 从商品标题中匹配前缀 - # 3. 从类目、品牌中匹配 - - # 临时返回空结果 - suggestions = [] - - # 示例结构(暂不实现): - # suggestions = [ - # { - # "text": "芭比娃娃", - # "type": "query", - # "highlight": "比娃娃", - # "popularity": 850 - # } - # ] - - took_ms = int((time.time() - start_time) * 1000) - - return SearchSuggestResponse( - query=q, - suggestions=suggestions, - took_ms=took_ms - ) + # Extract tenant_id (required) + tenant_id = http_request.headers.get("X-Tenant-ID") if http_request else None + if not tenant_id and http_request: + from urllib.parse import parse_qs + query_string = http_request.url.query + if query_string: + params = parse_qs(query_string) + tenant_id = params.get("tenant_id", [None])[0] + + if not tenant_id: + raise HTTPException( + status_code=400, + detail="tenant_id is required. 
Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'", + ) + + try: + from api.app import get_suggestion_service + + service = get_suggestion_service() + result = service.search( + tenant_id=tenant_id, + query=q, + language=language, + size=size, + with_results=with_results, + result_size=result_size, + ) + response = SearchSuggestResponse( + query=result["query"], + language=result.get("language"), + resolved_language=result.get("resolved_language"), + suggestions=result["suggestions"], + took_ms=result["took_ms"], + ) + if debug: + # keep response_model stable; debug info stays inside suggestions payload for now + return response + return response + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) @router.get("/instant", response_model=SearchResponse) diff --git a/main.py b/main.py index f1614f6..4c0ae91 100755 --- a/main.py +++ b/main.py @@ -18,8 +18,11 @@ import uvicorn sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config import ConfigLoader +from config.env_config import DB_CONFIG from utils import ESClient from search import Searcher +from suggestion import SuggestionIndexBuilder +from utils.db_connector import create_db_connection def cmd_serve(args): @@ -97,6 +100,32 @@ def cmd_search(args): return 0 +def cmd_build_suggestions(args): + """Build suggestion index for a tenant.""" + es_client = ESClient(hosts=[args.es_host]) + if not es_client.ping(): + print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}") + return 1 + + db_engine = create_db_connection( + host=DB_CONFIG["host"], + port=DB_CONFIG["port"], + database=DB_CONFIG["database"], + username=DB_CONFIG["username"], + password=DB_CONFIG["password"], + ) + builder = SuggestionIndexBuilder(es_client=es_client, db_engine=db_engine) + result = builder.rebuild_tenant_index( + tenant_id=args.tenant_id, + days=args.days, + recreate=args.recreate, + batch_size=args.batch_size, + min_query_len=args.min_query_len, + ) + 
print(json.dumps(result, indent=2, ensure_ascii=False)) + return 0 + + def main(): """Main CLI entry point.""" parser = argparse.ArgumentParser( @@ -133,6 +162,22 @@ def main(): search_parser.add_argument('--no-embedding', action='store_true', help='Disable embeddings') search_parser.add_argument('--json', action='store_true', help='Output JSON') + # Suggestion build command + suggest_build_parser = subparsers.add_parser( + 'build-suggestions', + help='Build tenant suggestion index (full rebuild)' + ) + suggest_build_parser.add_argument('--tenant-id', required=True, help='Tenant ID') + suggest_build_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host') + suggest_build_parser.add_argument('--days', type=int, default=30, help='Query log lookback days') + suggest_build_parser.add_argument('--batch-size', type=int, default=500, help='Product scan batch size') + suggest_build_parser.add_argument('--min-query-len', type=int, default=1, help='Minimum query length') + suggest_build_parser.add_argument( + '--recreate', + action='store_true', + help='Delete and recreate suggestion index before build' + ) + args = parser.parse_args() if not args.command: @@ -146,6 +191,8 @@ def main(): return cmd_serve_indexer(args) elif args.command == 'search': return cmd_search(args) + elif args.command == 'build-suggestions': + return cmd_build_suggestions(args) else: print(f"Unknown command: {args.command}") return 1 diff --git a/suggestion/README.md b/suggestion/README.md new file mode 100644 index 0000000..595d7a3 --- /dev/null +++ b/suggestion/README.md @@ -0,0 +1,402 @@ +# Suggestion 设计文档 + +本文档定义 `search_suggestions` 独立索引方案,用于支持多语言自动补全(suggestion)与结果直达。 + +## 1. 
背景与目标 + +当前搜索系统已具备多语言商品索引(`title.{lang}`、`qanchors.{lang}`)与主搜索能力。为了实现输入中实时下拉 suggestion,需要新增一套面向“词”的能力。 + +核心目标: + +- 在不耦合主搜索链路的前提下,提供低延迟 suggestion(实时输入)。 +- 支持多语言,按请求语言路由到对应 suggestion 语种。 +- 支持“结果直达”:每条 suggestion 可附带候选商品列表(通过二次查询 `search_products` 完成)。 +- 支持后续词级排序演进(行为信号、运营控制、去噪治理)。 + +非目标(当前阶段): + +- 不做个性化推荐(用户级 personalization)。 +- 不引入复杂在线学习排序服务。 + +## 2. 总体架构 + +采用双索引架构: + +- 商品索引:`search_products_tenant_{tenant_id}` +- 建议词索引:`search_suggestions_tenant_{tenant_id}` + +在线查询主路径: + +1. 仅查询 `search_suggestions_tenant_{tenant_id}` 得到 suggestion 列表。 +2. 对每条 suggestion 进行“结果直达”的二次查询(`msearch`)到 `search_products_tenant_{tenant_id}`: + - 使用 suggestion 文本对 `title.{lang}` / `qanchors.{lang}` 执行 `term` / `match_phrase_prefix` 组合查询。 +3. 回填每条 suggestion 的商品卡片列表(例如每条 3~5 个)。 + +## 3. API 设计 + +建议保留并增强现有接口:`GET /search/suggestions` + +### 3.1 请求参数 + +- `q` (string, required): 用户输入前缀 +- `size` (int, optional, default=10, max=20): 返回 suggestion 数量 +- `language` (string, required): 请求语言(如 `zh`, `en`, `ar`, `ru`) +- `with_results` (bool, optional, default=true): 是否附带每条 suggestion 的直达商品 +- `result_size` (int, optional, default=3, max=10): 每条 suggestion 附带商品条数 +- `debug` (bool, optional, default=false): 是否返回调试信息 + +Header: + +- `X-Tenant-ID` (required) + +### 3.2 响应结构 + +```json +{ + "query": "iph", + "language": "en", + "suggestions": [ + { + "text": "iphone 15", + "lang": "en", + "score": 12.37, + "sources": ["query_log", "qanchor"], + "products": [ + { + "spu_id": "12345", + "title": "iPhone 15 Pro Max", + "price": 999.0, + "image_url": "https://..." + } + ] + } + ], + "took_ms": 14, + "debug_info": {} +} +``` + +## 4. 
索引设计:`search_suggestions_tenant_{tenant_id}` + +文档粒度:`tenant_id + lang + text_norm` 唯一一条文档。 + +### 4.1 字段定义(建议) + +- `tenant_id` (`keyword`) +- `lang` (`keyword`) +- `text` (`keyword`):展示文本 +- `text_norm` (`keyword`):归一化文本(去重键) +- `sources` (`keyword[]`):来源集合,取值:`title` / `qanchor` / `query_log` +- `title_doc_count` (`integer`):来自 title 的命中文档数 +- `qanchor_doc_count` (`integer`):来自 qanchor 的命中文档数 +- `query_count_7d` (`integer`):7 天搜索词计数 +- `query_count_30d` (`integer`):30 天搜索词计数 +- `rank_score` (`float`):离线计算总分 +- `status` (`byte`):1=online, 0=offline +- `updated_at` (`date`) + +用于召回: + +- `completion` (`object`): + - `completion.{lang}`: `completion` 类型(按语言设置 analyzer) +- `sat` (`object`): + - `sat.{lang}`: `search_as_you_type`(增强多词前缀效果) + +可选字段(用于加速直达): + +- `top_spu_ids` (`keyword[]`):预计算商品候选 id + +### 4.2 Mapping 样例(简化) + +```json +{ + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "properties": { + "tenant_id": { "type": "keyword" }, + "lang": { "type": "keyword" }, + "text": { "type": "keyword" }, + "text_norm": { "type": "keyword" }, + "sources": { "type": "keyword" }, + "title_doc_count": { "type": "integer" }, + "qanchor_doc_count": { "type": "integer" }, + "query_count_7d": { "type": "integer" }, + "query_count_30d": { "type": "integer" }, + "rank_score": { "type": "float" }, + "status": { "type": "byte" }, + "updated_at": { "type": "date" }, + "completion": { + "properties": { + "zh": { "type": "completion", "analyzer": "index_ansj", "search_analyzer": "query_ansj" }, + "en": { "type": "completion", "analyzer": "english" }, + "ar": { "type": "completion", "analyzer": "arabic" }, + "ru": { "type": "completion", "analyzer": "russian" } + } + }, + "sat": { + "properties": { + "zh": { "type": "search_as_you_type", "analyzer": "index_ansj" }, + "en": { "type": "search_as_you_type", "analyzer": "english" }, + "ar": { "type": "search_as_you_type", "analyzer": "arabic" }, + "ru": { "type": "search_as_you_type", "analyzer": 
"russian" } + } + }, + "top_spu_ids": { "type": "keyword" } + } + } +} +``` + +说明:实际支持语种需与 `search_products` 已支持语种保持一致。 + +## 5. 全量建索引逻辑(核心) + +全量程序职责:扫描商品 `title/qanchors` 与搜索日志 `query`,聚合后写入 `search_suggestions`。 + +输入: + +- `search_products_tenant_{tenant_id}` 文档 +- MySQL 表:`shoplazza_search_log` + +输出: + +- `search_suggestions_tenant_{tenant_id}` 全量文档 + +### 5.1 流程 + +1. 创建/重建 `search_suggestions_tenant_{tenant_id}`。 +2. 遍历 `search_products_tenant_{tenant_id}`(`scroll` 或 `search_after`): + - 提取每个商品的 `title.{lang}`、`qanchors.{lang}`。 + - 归一化文本(NFKC、trim、lower、空白折叠)。 + - 产出候选词并累加: + - `title_doc_count += 1` + - `qanchor_doc_count += 1` + - `sources` 加来源。 +3. 读取日志: + - SQL 拉取 `tenant_id` 下时间窗数据(如 30 天)。 + - 对每条 `query` 解析语言归属(优先 `shoplazza_search_log.language`,其次 `request_params.language`,见第 6 节)。 + - 累加 `query_count_7d` / `query_count_30d`,`sources` 加 `query_log`。 +4. 清洗与过滤: + - 去空、去纯符号、长度阈值过滤。 + - 可选黑名单过滤(运营配置)。 +5. 计算 `rank_score`(见第 7 节)。 +6. 组装文档: + - 写 `completion.{lang}` + `sat.{lang}`。 + - `_id = md5(tenant_id|lang|text_norm)`。 +7. 批量写入(bulk upsert)。 + +### 5.2 伪代码 + +```python +for tenant_id in tenants: + agg = {} # key: (lang, text_norm) + + for doc in scan_es_products(tenant_id): + for lang in index_languages(tenant_id): + add_from_title(agg, doc.title.get(lang), lang, doc.spu_id) + add_from_qanchor(agg, doc.qanchors.get(lang), lang, doc.spu_id) + + for row in fetch_search_logs(tenant_id, days=30): + lang, conf = resolve_query_lang( + query=row.query, + log_language=row.language, + request_params_json=row.request_params, + tenant_id=tenant_id + ) + if not lang: + continue + add_from_query_log(agg, row.query, lang, row.create_time) + + docs = [] + for (lang, text_norm), item in agg.items(): + if not pass_filters(item): + continue + item.rank_score = compute_rank_score(item) + docs.append(to_suggestion_doc(tenant_id, lang, item)) + + bulk_upsert(index=f"search_suggestions_tenant_{tenant_id}", docs=docs) +``` + +## 6. 
日志语言解析策略(已新增 language 字段) + +现状:`shoplazza_search_log` 已新增 `language` 字段,且 `request_params`(JSON)中也包含 `language`。 +因此全量程序不再以“纯离线识别”为主,而是采用“日志显式语言优先”的三级策略。 + +### 6.1 语言解析优先级 + +1. **一级:`shoplazza_search_log.language`(最高优先级)** + - 若值存在且合法,直接作为 query 归属语言。 +2. **二级:`request_params.language`(JSON 兜底)** + - 当表字段为空/非法时,解析 `request_params` JSON 中的 `language`。 +3. **三级:离线识别(最后兜底)** + - 仅在前两者都缺失时启用: + - 脚本直判(CJK/Arabic/Cyrillic) + - 轻量语言识别器(拉丁语) + +### 6.2 一致性校验(推荐) + +当 `shoplazza_search_log.language` 与 `request_params.language` 同时存在但不一致时: + +- 默认采用 `shoplazza_search_log.language` +- 记录 `lang_conflict=true` 用于审计 +- 输出监控指标(冲突率) + +### 6.3 置信度与约束 + +对于一级/二级来源: + +- `lang_confidence=1.0` +- `lang_source=log_field` 或 `lang_source=request_params` + +对于三级离线识别: + +- `confidence >= 0.8`:写入 top1 +- `0.5 <= confidence < 0.8`:写入 top1(必要时兼容 top2 降权) +- `< 0.5`:写入租户 `primary_language`(降权) + +统一约束: + +- 最终写入语言必须属于租户 `index_languages` + +建议额外存储: + +- `lang_confidence`(float) +- `lang_source`(`log_field`/`request_params`/`script`/`model`/`default`) +- `lang_conflict`(bool) + +便于后续质量审计与数据回溯。 + +## 7. 排序分数设计(离线) + +建议采用可解释线性组合: + +```text +rank_score = + w1 * log1p(query_count_30d) + + w2 * log1p(query_count_7d) + + w3 * log1p(qanchor_doc_count) + + w4 * log1p(title_doc_count) + + w5 * business_bonus +``` + +推荐初始权重(可配置): + +- `w1=1.8`, `w2=1.2`, `w3=1.0`, `w4=0.6`, `w5=0.3` + +说明: + +- 搜索日志信号优先级最高(最接近真实用户意图)。 +- `qanchor` 高于 `title`(更偏 query 风格)。 +- `business_bonus` 可接入销量、库存可售率等轻量业务信号。 + +## 8. 
在线查询逻辑(suggestion) + +主路径只查 `search_suggestions`。 + +### 8.1 Suggestion 查询 DSL(示例) + +```json +{ + "size": 10, + "query": { + "function_score": { + "query": { + "bool": { + "filter": [ + { "term": { "lang": "en" } }, + { "term": { "status": 1 } } + ], + "should": [ + { + "multi_match": { + "query": "iph", + "type": "bool_prefix", + "fields": [ + "sat.en", + "sat.en._2gram", + "sat.en._3gram" + ] + } + } + ], + "minimum_should_match": 1 + } + }, + "field_value_factor": { + "field": "rank_score", + "factor": 1.0, + "modifier": "log1p", + "missing": 0 + }, + "boost_mode": "sum", + "score_mode": "sum" + } + }, + "_source": [ + "text", + "lang", + "rank_score", + "sources", + "top_spu_ids" + ] +} +``` + +可选:completion 方式(极低延迟)也可作为同接口内另一条召回通道,再与上面结果融合去重。 + +## 9. 结果直达(二次查询) + +`with_results=true` 时,对每条 suggestion 的 `text` 做二次查询到 `search_products_tenant_{tenant_id}`。 + +推荐使用 `msearch`,每条 suggestion 一个子查询: + +- `term`(精确)命中 `qanchors.{lang}.keyword`(若存在 keyword 子字段) +- `match_phrase_prefix` 命中 `title.{lang}` +- 可加权:`qanchors` 命中权重高于 `title` +- 每条 suggestion 返回 `result_size` 条商品 + +若未来希望进一步降在线复杂度,可改为离线写入 `top_spu_ids` 并在在线用 `mget` 回填。 + +## 10. 数据治理与运营控制 + +建议加入以下机制: + +- 黑名单词:人工屏蔽垃圾词、敏感词 +- 白名单词:活动词、品牌词强制保留 +- 最小阈值:低频词不过线(例如 `query_count_30d < 2` 且无 qanchor/title 支撑) +- 去重规则:`text_norm` 维度强去重 +- 更新策略:每日全量 + 每小时增量(后续) + +## 11. 实施里程碑 + +M1(快速上线): + +- 建 `search_suggestions` 索引 +- 全量程序:`title + qanchors + query_log` +- `/search/suggestions` 仅查 suggestion,不带直达 + +M2(增强): + +- 增加二次查询直达商品(`msearch`) +- 引入语言置信度审计报表 +- 加黑白名单与去噪配置 + +M3(优化): + +- completion + bool_prefix 双通道融合 +- 增量构建任务(小时级) +- 排序参数在线配置化 + +## 12. 
关键风险与规避 + +- 日志语言字段质量问题导致错写:通过 `log_field > request_params > model` 三级策略与冲突审计规避 +- 高频噪声词上浮:黑名单 + 最小阈值 + 分数截断 +- 直达二次查询成本上升:控制 `size/result_size`,优先 `msearch` +- 多语言字段不一致:统一语言枚举与映射生成逻辑,避免手写散落 + +--- + +本设计优先保证可落地与可演进:先以独立 suggestion 索引跑通主能力,再逐步增强排序与在线性能。 diff --git a/suggestion/__init__.py b/suggestion/__init__.py new file mode 100644 index 0000000..47be21c --- /dev/null +++ b/suggestion/__init__.py @@ -0,0 +1,14 @@ +""" +Suggestion module. + +Contains: +- Suggestion index mapping builder +- Full rebuild indexer (product + query logs) +- Online suggestion query service +""" + +from .builder import SuggestionIndexBuilder +from .service import SuggestionService + +__all__ = ["SuggestionIndexBuilder", "SuggestionService"] + diff --git a/suggestion/builder.py b/suggestion/builder.py new file mode 100644 index 0000000..f5b06b6 --- /dev/null +++ b/suggestion/builder.py @@ -0,0 +1,390 @@ +""" +Full suggestion index builder. + +Build data from: +- ES product index fields: title.{lang}, qanchors.{lang} +- MySQL search logs: shoplazza_search_log.query (+ language metadata) +""" + +import json +import logging +import math +import re +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional, Tuple + +from sqlalchemy import text + +from config.tenant_config_loader import get_tenant_config_loader +from utils.es_client import ESClient +from suggestion.mapping import build_suggestion_mapping + +logger = logging.getLogger(__name__) + + +def get_suggestion_index_name(tenant_id: str) -> str: + return f"search_suggestions_tenant_{tenant_id}" + + +@dataclass +class SuggestionCandidate: + text: str + text_norm: str + lang: str + sources: set = field(default_factory=set) + title_spu_ids: set = field(default_factory=set) + qanchor_spu_ids: set = field(default_factory=set) + query_count_7d: int = 0 + query_count_30d: int = 0 + lang_confidence: float = 1.0 + lang_source: str = "default" + lang_conflict: bool = 
False + top_spu_scores: Dict[str, float] = field(default_factory=dict) + + def add_product(self, source: str, spu_id: str, score: float) -> None: + self.sources.add(source) + if source == "title": + self.title_spu_ids.add(spu_id) + elif source == "qanchor": + self.qanchor_spu_ids.add(spu_id) + prev = self.top_spu_scores.get(spu_id) + if prev is None or score > prev: + self.top_spu_scores[spu_id] = score + + def add_query_log(self, is_7d: bool) -> None: + self.sources.add("query_log") + self.query_count_30d += 1 + if is_7d: + self.query_count_7d += 1 + + +class SuggestionIndexBuilder: + """Build and rebuild suggestion index.""" + + def __init__(self, es_client: ESClient, db_engine: Any): + self.es_client = es_client + self.db_engine = db_engine + + @staticmethod + def _normalize_text(value: str) -> str: + text_value = (value or "").strip().lower() + text_value = re.sub(r"\s+", " ", text_value) + return text_value + + @staticmethod + def _split_qanchors(value: Any) -> List[str]: + if value is None: + return [] + if isinstance(value, list): + return [str(x).strip() for x in value if str(x).strip()] + raw = str(value).strip() + if not raw: + return [] + parts = re.split(r"[,;|/\n\t]+", raw) + out = [p.strip() for p in parts if p and p.strip()] + if not out: + return [raw] + return out + + @staticmethod + def _looks_noise(text_value: str) -> bool: + if not text_value: + return True + if len(text_value) > 120: + return True + if re.fullmatch(r"[\W_]+", text_value): + return True + return False + + @staticmethod + def _normalize_lang(lang: Optional[str]) -> Optional[str]: + if not lang: + return None + token = str(lang).strip().lower().replace("-", "_") + if not token: + return None + # en_us -> en, zh_cn -> zh, keep explicit zh_tw / pt_br + if token in {"zh_tw", "pt_br"}: + return token + return token.split("_")[0] + + @staticmethod + def _parse_request_params_language(raw: Any) -> Optional[str]: + if raw is None: + return None + if isinstance(raw, dict): + return 
raw.get("language") + text_raw = str(raw).strip() + if not text_raw: + return None + try: + obj = json.loads(text_raw) + if isinstance(obj, dict): + return obj.get("language") + except Exception: + return None + return None + + @staticmethod + def _detect_script_language(query: str) -> Tuple[Optional[str], float, str]: + # CJK unified + if re.search(r"[\u4e00-\u9fff]", query): + return "zh", 0.98, "script" + # Arabic + if re.search(r"[\u0600-\u06FF]", query): + return "ar", 0.98, "script" + # Cyrillic + if re.search(r"[\u0400-\u04FF]", query): + return "ru", 0.95, "script" + # Greek + if re.search(r"[\u0370-\u03FF]", query): + return "el", 0.95, "script" + # Latin fallback + if re.search(r"[a-zA-Z]", query): + return "en", 0.55, "model" + return None, 0.0, "default" + + def _resolve_query_language( + self, + query: str, + log_language: Optional[str], + request_params: Any, + index_languages: List[str], + primary_language: str, + ) -> Tuple[str, float, str, bool]: + """Resolve lang with priority: log field > request_params > script/model.""" + langs_set = set(index_languages or []) + primary = self._normalize_lang(primary_language) or "en" + if primary not in langs_set and langs_set: + primary = index_languages[0] + + log_lang = self._normalize_lang(log_language) + req_lang = self._normalize_lang(self._parse_request_params_language(request_params)) + conflict = bool(log_lang and req_lang and log_lang != req_lang) + + if log_lang and (not langs_set or log_lang in langs_set): + return log_lang, 1.0, "log_field", conflict + + if req_lang and (not langs_set or req_lang in langs_set): + return req_lang, 1.0, "request_params", conflict + + detected_lang, conf, source = self._detect_script_language(query) + if detected_lang and (not langs_set or detected_lang in langs_set): + return detected_lang, conf, source, conflict + + return primary, 0.3, "default", conflict + + @staticmethod + def _score_product_hit(source: Dict[str, Any]) -> float: + sales = 
float(source.get("sales") or 0.0) + inventory = float(source.get("total_inventory") or 0.0) + return math.log1p(max(sales, 0.0)) * 1.2 + math.log1p(max(inventory, 0.0)) * 0.4 + + @staticmethod + def _compute_rank_score(c: SuggestionCandidate) -> float: + return ( + 1.8 * math.log1p(c.query_count_30d) + + 1.2 * math.log1p(c.query_count_7d) + + 1.0 * math.log1p(len(c.qanchor_spu_ids)) + + 0.6 * math.log1p(len(c.title_spu_ids)) + ) + + def _scan_products(self, tenant_id: str, batch_size: int = 500) -> List[Dict[str, Any]]: + """Scan all product docs from tenant index using search_after.""" + from indexer.mapping_generator import get_tenant_index_name + + index_name = get_tenant_index_name(tenant_id) + all_hits: List[Dict[str, Any]] = [] + search_after: Optional[List[Any]] = None + + while True: + body: Dict[str, Any] = { + "size": batch_size, + "_source": ["spu_id", "title", "qanchors", "sales", "total_inventory"], + "sort": [{"spu_id": "asc"}], + "query": {"match_all": {}}, + } + if search_after is not None: + body["search_after"] = search_after + + resp = self.es_client.client.search(index=index_name, body=body) + hits = resp.get("hits", {}).get("hits", []) or [] + if not hits: + break + all_hits.extend(hits) + search_after = hits[-1].get("sort") + if len(hits) < batch_size: + break + return all_hits + + def _create_or_reset_index(self, tenant_id: str, index_languages: List[str], recreate: bool) -> str: + index_name = get_suggestion_index_name(tenant_id) + if recreate and self.es_client.index_exists(index_name): + logger.info("Deleting existing suggestion index: %s", index_name) + self.es_client.delete_index(index_name) + if not self.es_client.index_exists(index_name): + mapping = build_suggestion_mapping(index_languages=index_languages) + ok = self.es_client.create_index(index_name, mapping) + if not ok: + raise RuntimeError(f"Failed to create suggestion index: {index_name}") + return index_name + + def rebuild_tenant_index( + self, + tenant_id: str, + days: int = 
30, + recreate: bool = True, + batch_size: int = 500, + min_query_len: int = 1, + ) -> Dict[str, Any]: + tenant_loader = get_tenant_config_loader() + tenant_cfg = tenant_loader.get_tenant_config(tenant_id) + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] + primary_language: str = tenant_cfg.get("primary_language") or "en" + + index_name = self._create_or_reset_index(tenant_id, index_languages, recreate) + key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {} + + # Step 1: product title/qanchors + hits = self._scan_products(tenant_id, batch_size=batch_size) + for hit in hits: + src = hit.get("_source", {}) or {} + spu_id = str(src.get("spu_id") or "") + if not spu_id: + continue + title_obj = src.get("title") or {} + qanchor_obj = src.get("qanchors") or {} + product_score = self._score_product_hit(src) + + for lang in index_languages: + title = "" + if isinstance(title_obj, dict): + title = str(title_obj.get(lang) or "").strip() + if title: + text_norm = self._normalize_text(title) + if not self._looks_noise(text_norm): + key = (lang, text_norm) + c = key_to_candidate.get(key) + if c is None: + c = SuggestionCandidate(text=title, text_norm=text_norm, lang=lang) + key_to_candidate[key] = c + c.add_product("title", spu_id=spu_id, score=product_score) + + q_raw = None + if isinstance(qanchor_obj, dict): + q_raw = qanchor_obj.get(lang) + for q_text in self._split_qanchors(q_raw): + text_norm = self._normalize_text(q_text) + if self._looks_noise(text_norm): + continue + key = (lang, text_norm) + c = key_to_candidate.get(key) + if c is None: + c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang) + key_to_candidate[key] = c + c.add_product("qanchor", spu_id=spu_id, score=product_score + 0.6) + + # Step 2: query logs + now = datetime.now(timezone.utc) + since_30d = now - timedelta(days=days) + since_7d = now - timedelta(days=7) + query_sql = text( + """ + SELECT query, language, request_params, create_time + FROM 
shoplazza_search_log + WHERE tenant_id = :tenant_id + AND deleted = 0 + AND query IS NOT NULL + AND query <> '' + AND create_time >= :since_30d + """ + ) + with self.db_engine.connect() as conn: + rows = conn.execute(query_sql, {"tenant_id": int(tenant_id), "since_30d": since_30d}).fetchall() + + for row in rows: + q = str(row.query or "").strip() + if len(q) < min_query_len: + continue + lang, conf, source, conflict = self._resolve_query_language( + query=q, + log_language=getattr(row, "language", None), + request_params=getattr(row, "request_params", None), + index_languages=index_languages, + primary_language=primary_language, + ) + text_norm = self._normalize_text(q) + if self._looks_noise(text_norm): + continue + key = (lang, text_norm) + c = key_to_candidate.get(key) + if c is None: + c = SuggestionCandidate(text=q, text_norm=text_norm, lang=lang) + key_to_candidate[key] = c + c.lang_confidence = max(c.lang_confidence, conf) + c.lang_source = source if c.lang_source == "default" else c.lang_source + c.lang_conflict = c.lang_conflict or conflict + + created_at = getattr(row, "create_time", None) + if created_at is None: + is_7d = False + else: + # DB datetime usually naive local time; compare conservatively + if isinstance(created_at, datetime) and created_at.tzinfo is None: + created_at = created_at.replace(tzinfo=timezone.utc) + is_7d = bool(created_at and created_at >= since_7d) + c.add_query_log(is_7d=is_7d) + + # Step 3: build docs + now_iso = datetime.now(timezone.utc).isoformat() + docs: List[Dict[str, Any]] = [] + for (_, _), c in key_to_candidate.items(): + rank_score = self._compute_rank_score(c) + # keep top 20 product ids by score + top_spu_ids = [ + item[0] + for item in sorted(c.top_spu_scores.items(), key=lambda kv: kv[1], reverse=True)[:20] + ] + + completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}} + sat_obj = {c.lang: c.text} + doc_id = f"{tenant_id}|{c.lang}|{c.text_norm}" + docs.append( + { + "_id": 
doc_id, + "tenant_id": str(tenant_id), + "lang": c.lang, + "text": c.text, + "text_norm": c.text_norm, + "sources": sorted(c.sources), + "title_doc_count": len(c.title_spu_ids), + "qanchor_doc_count": len(c.qanchor_spu_ids), + "query_count_7d": c.query_count_7d, + "query_count_30d": c.query_count_30d, + "rank_score": float(rank_score), + "lang_confidence": float(c.lang_confidence), + "lang_source": c.lang_source, + "lang_conflict": bool(c.lang_conflict), + "top_spu_ids": top_spu_ids, + "status": 1, + "updated_at": now_iso, + "completion": completion_obj, + "sat": sat_obj, + } + ) + + if docs: + result = self.es_client.bulk_index(index_name=index_name, docs=docs) + self.es_client.refresh(index_name) + else: + result = {"success": 0, "failed": 0, "errors": []} + + return { + "tenant_id": str(tenant_id), + "index_name": index_name, + "total_candidates": len(key_to_candidate), + "indexed_docs": len(docs), + "bulk_result": result, + } + diff --git a/suggestion/mapping.py b/suggestion/mapping.py new file mode 100644 index 0000000..86c597d --- /dev/null +++ b/suggestion/mapping.py @@ -0,0 +1,99 @@ +""" +Mapping generator for suggestion indices. 
+""" + +from typing import Dict, Any, List + + +ANALYZER_BY_LANG: Dict[str, str] = { + "zh": "index_ansj", + "en": "english", + "ar": "arabic", + "hy": "armenian", + "eu": "basque", + "pt_br": "brazilian", + "bg": "bulgarian", + "ca": "catalan", + "cjk": "cjk", + "cs": "czech", + "da": "danish", + "nl": "dutch", + "fi": "finnish", + "fr": "french", + "gl": "galician", + "de": "german", + "el": "greek", + "hi": "hindi", + "hu": "hungarian", + "id": "indonesian", + "it": "italian", + "no": "norwegian", + "fa": "persian", + "pt": "portuguese", + "ro": "romanian", + "ru": "russian", + "es": "spanish", + "sv": "swedish", + "tr": "turkish", + "th": "thai", +} + + +def _completion_field(lang: str) -> Dict[str, Any]: + analyzer = ANALYZER_BY_LANG.get(lang, "standard") + if lang == "zh": + return { + "type": "completion", + "analyzer": analyzer, + "search_analyzer": "query_ansj", + } + return {"type": "completion", "analyzer": analyzer} + + +def _sat_field(lang: str) -> Dict[str, Any]: + analyzer = ANALYZER_BY_LANG.get(lang, "standard") + return {"type": "search_as_you_type", "analyzer": analyzer} + + +def build_suggestion_mapping(index_languages: List[str]) -> Dict[str, Any]: + """Build index settings+mappings for suggestion index.""" + langs = [x for x in (index_languages or []) if x] + if not langs: + langs = ["en", "zh"] + + completion_props: Dict[str, Any] = {} + sat_props: Dict[str, Any] = {} + for lang in langs: + completion_props[lang] = _completion_field(lang) + sat_props[lang] = _sat_field(lang) + + return { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "refresh_interval": "30s", + }, + "mappings": { + "properties": { + "tenant_id": {"type": "keyword"}, + "lang": {"type": "keyword"}, + "text": {"type": "keyword"}, + "text_norm": {"type": "keyword"}, + "sources": {"type": "keyword"}, + "title_doc_count": {"type": "integer"}, + "qanchor_doc_count": {"type": "integer"}, + "query_count_7d": {"type": "integer"}, + "query_count_30d": {"type": 
class SuggestionService:
    """Online search-as-you-type suggestion service.

    Queries the per-tenant suggestion index (built offline by
    ``suggestion.builder``) and optionally enriches each suggestion with a
    few matching products from the tenant's main product index.
    """

    def __init__(self, es_client: ESClient):
        """
        Args:
            es_client: Shared Elasticsearch client wrapper.
        """
        self.es_client = es_client

    def _resolve_language(self, tenant_id: str, language: str) -> str:
        """Resolve a caller-supplied language tag to a tenant-indexed language.

        The tag is lower-cased and "-" is replaced with "_"; compound tags
        collapse to their base language, except "zh_tw"/"pt_br" which keep
        their regional form (they have dedicated analyzers). Falls back to
        the tenant's primary language, then the first indexed language.

        Args:
            tenant_id: Tenant identifier used to load config.
            language: Raw language tag from the request (may be empty/None).

        Returns:
            A language code guaranteed to be in the tenant's index_languages.
        """
        cfg = get_tenant_config_loader().get_tenant_config(tenant_id)
        index_languages = cfg.get("index_languages") or ["en", "zh"]
        primary = cfg.get("primary_language") or "en"
        lang = (language or "").strip().lower().replace("-", "_")
        if lang in {"zh_tw", "pt_br"}:
            # Regional variants with dedicated analyzer config stay intact.
            normalized = lang
        else:
            normalized = lang.split("_")[0] if lang else ""
        if normalized in index_languages:
            return normalized
        if primary in index_languages:
            return primary
        return index_languages[0]

    def _search_products_for_suggestion(
        self,
        tenant_id: str,
        text_value: str,
        lang: str,
        result_size: int,
    ) -> List[Dict[str, Any]]:
        """Fetch up to ``result_size`` products matching a suggestion text.

        Matching strategy, strongest first: exact phrase on query anchors
        (boost 3), phrase-prefix on title (boost 2), plain title match
        (boost 1). Ties on _score are broken by sales, descending.

        Args:
            tenant_id: Tenant whose product index is searched.
            text_value: The suggestion text to match products against.
            lang: Resolved language code selecting the per-language subfields.
            result_size: Maximum number of products to return.

        Returns:
            List of product dicts with spu_id/title/price/image_url/score.
        """
        index_name = get_tenant_index_name(tenant_id)
        title_field = f"title.{lang}"
        qanchor_field = f"qanchors.{lang}"

        body = {
            "size": result_size,
            "_source": ["spu_id", "title", "min_price", "image_url", "sales", "total_inventory"],
            "query": {
                "bool": {
                    "should": [
                        {"match_phrase": {qanchor_field: {"query": text_value, "boost": 3.0}}},
                        {"match_phrase_prefix": {title_field: {"query": text_value, "boost": 2.0}}},
                        {"match": {title_field: {"query": text_value, "boost": 1.0}}},
                    ],
                    "minimum_should_match": 1,
                }
            },
            "sort": [{"_score": "desc"}, {"sales": "desc"}],
        }
        resp = self.es_client.search(index_name=index_name, body=body, size=result_size, from_=0)
        hits = resp.get("hits", {}).get("hits", []) or []
        out: List[Dict[str, Any]] = []
        for hit in hits:
            src = hit.get("_source", {}) or {}
            title_obj = src.get("title") or {}
            resolved_title = None
            if isinstance(title_obj, dict):
                # Prefer the requested language, then en/zh, then any non-empty value.
                resolved_title = title_obj.get(lang) or title_obj.get("en") or title_obj.get("zh")
                if not resolved_title:
                    for v in title_obj.values():
                        if v:
                            resolved_title = v
                            break
            out.append(
                {
                    "spu_id": src.get("spu_id"),
                    "title": resolved_title,
                    "price": src.get("min_price"),
                    "image_url": src.get("image_url"),
                    "score": hit.get("_score", 0.0),
                }
            )
        return out

    def search(
        self,
        tenant_id: str,
        query: str,
        language: str,
        size: int = 10,
        with_results: bool = True,
        result_size: int = 3,
    ) -> Dict[str, Any]:
        """Return type-ahead suggestions for *query* in a tenant's index.

        Scoring: a ``bool_prefix`` multi_match over the search_as_you_type
        field (plus its _2gram/_3gram subfields) filtered to the resolved
        language and active docs (status=1), with ``log1p(rank_score)`` added
        to the relevance score via function_score.

        Args:
            tenant_id: Tenant identifier.
            query: User-typed prefix text.
            language: Raw language tag; resolved via :meth:`_resolve_language`.
            size: Max number of suggestions.
            with_results: If True, attach up to ``result_size`` matching
                products to each suggestion (one extra ES query per
                suggestion; enrichment failures degrade to an empty list).
            result_size: Products per suggestion when ``with_results``.

        Returns:
            Dict with query, language, resolved_language, suggestions, took_ms.
        """
        # Use a monotonic clock for latency: time.time() is wall-clock and can
        # jump (NTP/DST), producing negative or inflated took_ms values.
        start = time.monotonic()
        resolved_lang = self._resolve_language(tenant_id, language)
        index_name = get_suggestion_index_name(tenant_id)

        sat_field = f"sat.{resolved_lang}"
        dsl = {
            "size": size,
            "query": {
                "function_score": {
                    "query": {
                        "bool": {
                            "filter": [
                                {"term": {"lang": resolved_lang}},
                                {"term": {"status": 1}},
                            ],
                            "should": [
                                {
                                    "multi_match": {
                                        "query": query,
                                        "type": "bool_prefix",
                                        "fields": [sat_field, f"{sat_field}._2gram", f"{sat_field}._3gram"],
                                    }
                                }
                            ],
                            "minimum_should_match": 1,
                        }
                    },
                    "field_value_factor": {
                        "field": "rank_score",
                        "factor": 1.0,
                        "modifier": "log1p",
                        "missing": 0.0,
                    },
                    "boost_mode": "sum",
                    "score_mode": "sum",
                }
            },
            "_source": [
                "text",
                "lang",
                "rank_score",
                "sources",
                "top_spu_ids",
                "lang_source",
                "lang_confidence",
                "lang_conflict",
            ],
        }
        es_resp = self.es_client.search(index_name=index_name, body=dsl, size=size, from_=0)
        hits = es_resp.get("hits", {}).get("hits", []) or []

        suggestions: List[Dict[str, Any]] = []
        for hit in hits:
            src = hit.get("_source", {}) or {}
            item = {
                "text": src.get("text"),
                "lang": src.get("lang"),
                "score": hit.get("_score", 0.0),
                "rank_score": src.get("rank_score"),
                "sources": src.get("sources", []),
                "lang_source": src.get("lang_source"),
                "lang_confidence": src.get("lang_confidence"),
                "lang_conflict": src.get("lang_conflict", False),
            }
            if with_results:
                # Best-effort enrichment: a failed product lookup must not
                # fail the whole suggestion response.
                try:
                    item["products"] = self._search_products_for_suggestion(
                        tenant_id=tenant_id,
                        text_value=str(src.get("text") or ""),
                        lang=resolved_lang,
                        result_size=result_size,
                    )
                except Exception as e:
                    logger.warning("Failed to enrich suggestion products: %s", e)
                    item["products"] = []
            suggestions.append(item)

        took_ms = int((time.monotonic() - start) * 1000)
        return {
            "query": query,
            "language": language,
            "resolved_language": resolved_lang,
            "suggestions": suggestions,
            "took_ms": took_ms,
        }