Commit 5b8f58c0c50469f08e984e1d83b369b7ee1f8f5d

Authored by tangwang
1 parent 208e079a

sugg

api/indexer_app.py
@@ -44,14 +44,21 @@ from utils import ESClient # noqa: E402 @@ -44,14 +44,21 @@ from utils import ESClient # noqa: E402
44 from utils.db_connector import create_db_connection # noqa: E402 44 from utils.db_connector import create_db_connection # noqa: E402
45 from indexer.incremental_service import IncrementalIndexerService # noqa: E402 45 from indexer.incremental_service import IncrementalIndexerService # noqa: E402
46 from indexer.bulk_indexing_service import BulkIndexingService # noqa: E402 46 from indexer.bulk_indexing_service import BulkIndexingService # noqa: E402
  47 +from suggestion import SuggestionIndexBuilder # noqa: E402
47 from .routes import indexer as indexer_routes # noqa: E402 48 from .routes import indexer as indexer_routes # noqa: E402
48 -from .service_registry import set_es_client, set_indexer_services # noqa: E402 49 +from .routes import suggestion_indexer as suggestion_indexer_routes # noqa: E402
  50 +from .service_registry import (
  51 + set_es_client,
  52 + set_indexer_services,
  53 + set_suggestion_builder,
  54 +) # noqa: E402
49 55
50 56
51 _es_client: Optional[ESClient] = None 57 _es_client: Optional[ESClient] = None
52 _config = None 58 _config = None
53 _incremental_service: Optional[IncrementalIndexerService] = None 59 _incremental_service: Optional[IncrementalIndexerService] = None
54 _bulk_indexing_service: Optional[BulkIndexingService] = None 60 _bulk_indexing_service: Optional[BulkIndexingService] = None
  61 +_suggestion_builder: Optional[SuggestionIndexBuilder] = None
55 62
56 63
57 def init_indexer_service(es_host: str = "http://localhost:9200"): 64 def init_indexer_service(es_host: str = "http://localhost:9200"):
@@ -61,7 +68,7 @@ def init_indexer_service(es_host: str = "http://localhost:9200"): @@ -61,7 +68,7 @@ def init_indexer_service(es_host: str = "http://localhost:9200"):
61 This mirrors the indexing-related initialization logic in api.app.init_service 68 This mirrors the indexing-related initialization logic in api.app.init_service
62 but without search-related components. 69 but without search-related components.
63 """ 70 """
64 - global _es_client, _config, _incremental_service, _bulk_indexing_service 71 + global _es_client, _config, _incremental_service, _bulk_indexing_service, _suggestion_builder
65 72
66 start_time = time.time() 73 start_time = time.time()
67 logger.info("Initializing Indexer service") 74 logger.info("Initializing Indexer service")
@@ -108,10 +115,12 @@ def init_indexer_service(es_host: str = "http://localhost:9200"): @@ -108,10 +115,12 @@ def init_indexer_service(es_host: str = "http://localhost:9200"):
108 115
109 _incremental_service = IncrementalIndexerService(db_engine) 116 _incremental_service = IncrementalIndexerService(db_engine)
110 _bulk_indexing_service = BulkIndexingService(db_engine, _es_client) 117 _bulk_indexing_service = BulkIndexingService(db_engine, _es_client)
  118 + _suggestion_builder = SuggestionIndexBuilder(es_client=_es_client, db_engine=db_engine)
111 set_indexer_services( 119 set_indexer_services(
112 incremental_service=_incremental_service, 120 incremental_service=_incremental_service,
113 bulk_indexing_service=_bulk_indexing_service, 121 bulk_indexing_service=_bulk_indexing_service,
114 ) 122 )
  123 + set_suggestion_builder(_suggestion_builder)
115 logger.info("Indexer services initialized (incremental + bulk)") 124 logger.info("Indexer services initialized (incremental + bulk)")
116 else: 125 else:
117 missing = [ 126 missing = [
@@ -242,6 +251,8 @@ async def health_check(): @@ -242,6 +251,8 @@ async def health_check():
242 251
243 # Mount the single source of truth indexer routes 252 # Mount the single source of truth indexer routes
244 app.include_router(indexer_routes.router) 253 app.include_router(indexer_routes.router)
  254 +# Mount suggestion indexing routes (full + incremental)
  255 +app.include_router(suggestion_indexer_routes.router)
245 256
246 257
247 if __name__ == "__main__": 258 if __name__ == "__main__":
api/routes/suggestion_indexer.py 0 → 100644
@@ -0,0 +1,134 @@ @@ -0,0 +1,134 @@
  1 +from typing import Any, Dict, Optional
  2 +
  3 +import asyncio
  4 +import logging
  5 +
  6 +from fastapi import APIRouter, HTTPException
  7 +from pydantic import BaseModel, Field
  8 +
  9 +from ..service_registry import get_suggestion_builder
  10 +
  11 +
  12 +logger = logging.getLogger(__name__)
  13 +
  14 +router = APIRouter(prefix="/suggestions", tags=["suggestions"])
  15 +
  16 +
  17 +class FullBuildRequest(BaseModel):
  18 + """全量构建 suggestion 索引"""
  19 +
  20 + tenant_id: str = Field(..., description="租户 ID")
  21 + days: int = Field(360, description="查询日志回溯天数")
  22 + batch_size: int = Field(500, description="商品扫描 batch 大小")
  23 + min_query_len: int = Field(1, description="最小查询长度过滤")
  24 + publish_alias: bool = Field(
  25 + True,
  26 + description="是否在构建完成后发布 alias 到新版本索引",
  27 + )
  28 + keep_versions: int = Field(
  29 + 2,
  30 + description="保留的最新版本索引数量",
  31 + )
  32 +
  33 +
  34 +class IncrementalBuildRequest(BaseModel):
  35 + """增量更新 suggestion 索引"""
  36 +
  37 + tenant_id: str = Field(..., description="租户 ID")
  38 + min_query_len: int = Field(1, description="最小查询长度过滤")
  39 + fallback_days: int = Field(
  40 + 7,
  41 + description="当没有增量水位线时,默认从最近多少天的查询日志开始补",
  42 + )
  43 + overlap_minutes: int = Field(
  44 + 30,
  45 + description="增量窗口向前重叠的分钟数,用于防止日志延迟写入导致的遗漏",
  46 + )
  47 + bootstrap_if_missing: bool = Field(
  48 + True,
  49 + description="当当前没有可用 suggestion 索引时,是否先做一次带 bootstrap_days 的全量构建",
  50 + )
  51 + bootstrap_days: int = Field(
  52 + 30,
  53 + description="bootstrap 全量构建时的查询日志回溯天数",
  54 + )
  55 + batch_size: int = Field(
  56 + 500,
  57 + description="当需要 bootstrap 全量构建时使用的商品扫描 batch 大小",
  58 + )
  59 +
  60 +
  61 +async def _run_in_executor(func, *args, **kwargs) -> Dict[str, Any]:
  62 + loop = asyncio.get_event_loop()
  63 + return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
  64 +
  65 +
  66 +@router.post("/full")
  67 +async def build_full_suggestions(request: FullBuildRequest) -> Dict[str, Any]:
  68 + """
  69 + 全量构建/重建指定租户的 suggestion 索引。
  70 +
  71 + 该接口会:
  72 + - 从商品索引 + 查询日志中构建候选词
  73 + - 写入新的 suggestion 索引(使用 versioned 命名)
  74 + - 根据配置决定是否切换 alias 并清理旧版本
  75 + """
  76 + builder = get_suggestion_builder()
  77 + if builder is None:
  78 + raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")
  79 +
  80 + try:
  81 + result = await _run_in_executor(
  82 + builder.rebuild_tenant_index,
  83 + tenant_id=request.tenant_id,
  84 + days=request.days,
  85 + batch_size=request.batch_size,
  86 + min_query_len=request.min_query_len,
  87 + publish_alias=request.publish_alias,
  88 + keep_versions=request.keep_versions,
  89 + )
  90 + return result
  91 + except Exception as e:
  92 + logger.error(
  93 + "Error in full suggestion rebuild for tenant_id=%s: %s",
  94 + request.tenant_id,
  95 + e,
  96 + exc_info=True,
  97 + )
  98 + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
  99 +
  100 +
  101 +@router.post("/incremental")
  102 +async def build_incremental_suggestions(request: IncrementalBuildRequest) -> Dict[str, Any]:
  103 + """
  104 + 增量更新指定租户的 suggestion 索引。
  105 +
  106 + - 从上次增量水位线(或全量构建时间)开始,到当前时间为止扫描查询日志
  107 + - 根据查询频次对 suggestion 索引做增量 upsert
  108 + - 如当前不存在任何 suggestion 索引且 bootstrap_if_missing=true,则会先做一次 bootstrap 全量构建
  109 + """
  110 + builder = get_suggestion_builder()
  111 + if builder is None:
  112 + raise HTTPException(status_code=503, detail="Suggestion builder is not initialized")
  113 +
  114 + try:
  115 + result = await _run_in_executor(
  116 + builder.incremental_update_tenant_index,
  117 + tenant_id=request.tenant_id,
  118 + min_query_len=request.min_query_len,
  119 + fallback_days=request.fallback_days,
  120 + overlap_minutes=request.overlap_minutes,
  121 + bootstrap_if_missing=request.bootstrap_if_missing,
  122 + bootstrap_days=request.bootstrap_days,
  123 + batch_size=request.batch_size,
  124 + )
  125 + return result
  126 + except Exception as e:
  127 + logger.error(
  128 + "Error in incremental suggestion update for tenant_id=%s: %s",
  129 + request.tenant_id,
  130 + e,
  131 + exc_info=True,
  132 + )
  133 + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
  134 +
api/service_registry.py
@@ -15,6 +15,7 @@ from typing import Any, Optional @@ -15,6 +15,7 @@ from typing import Any, Optional
15 _es_client: Optional[Any] = None 15 _es_client: Optional[Any] = None
16 _incremental_service: Optional[Any] = None 16 _incremental_service: Optional[Any] = None
17 _bulk_indexing_service: Optional[Any] = None 17 _bulk_indexing_service: Optional[Any] = None
  18 +_suggestion_builder: Optional[Any] = None
18 19
19 20
20 def set_es_client(es_client: Any) -> None: 21 def set_es_client(es_client: Any) -> None:
@@ -41,3 +42,12 @@ def get_incremental_service() -> Optional[Any]: @@ -41,3 +42,12 @@ def get_incremental_service() -> Optional[Any]:
41 def get_bulk_indexing_service() -> Optional[Any]: 42 def get_bulk_indexing_service() -> Optional[Any]:
42 return _bulk_indexing_service 43 return _bulk_indexing_service
43 44
  45 +
  46 +def set_suggestion_builder(builder: Any) -> None:
  47 + global _suggestion_builder
  48 + _suggestion_builder = builder
  49 +
  50 +
  51 +def get_suggestion_builder() -> Optional[Any]:
  52 + return _suggestion_builder
  53 +
@@ -63,7 +63,7 @@ When sorting on a field, scores are not computed. By setting track_scores to tru @@ -63,7 +63,7 @@ When sorting on a field, scores are not computed. By setting track_scores to tru
63 63
64 64
65 65
66 - 66 +provider backend 两者的关系,如何配合。
67 translator的设计 : 67 translator的设计 :
68 68
69 QueryParser 里面 并不是调用的6006,目前是把6006做了一个provider,然后translate的总体配置又有6006的baseurl,很混乱!!! 69 QueryParser 里面 并不是调用的6006,目前是把6006做了一个provider,然后translate的总体配置又有6006的baseurl,很混乱!!!
@@ -119,6 +119,9 @@ translation: @@ -119,6 +119,9 @@ translation:
119 119
120 120
121 121
  122 +suggest 索引,现在是全量脚本,要交给金伟
  123 +
  124 +
122 125
123 翻译,增加facebook/nllb-200-distilled-600M 126 翻译,增加facebook/nllb-200-distilled-600M
124 https://blog.csdn.net/qq_42746084/article/details/154947534 127 https://blog.csdn.net/qq_42746084/article/details/154947534
@@ -153,6 +156,16 @@ https://modelscope.cn/models/dengcao/Qwen3-Reranker-4B-GGUF/summary @@ -153,6 +156,16 @@ https://modelscope.cn/models/dengcao/Qwen3-Reranker-4B-GGUF/summary
153 156
154 157
155 158
  159 +reranker 补充:nvidia/llama-nemotron-rerank-1b-v2
  160 +encoder架构。
  161 +比较新。
  162 +性能更好。
  163 +亚马逊 电商搜索数据集比qwen-reranker-4b更好。
  164 +支持vLLM。
  165 +
  166 +
  167 +
  168 +
156 169
157 查看翻译的缓存情况 170 查看翻译的缓存情况
158 171
docs/suggestion索引构建.md 0 → 100644
@@ -0,0 +1,503 @@ @@ -0,0 +1,503 @@
  1 +
  2 +### 一、整体架构与索引设计
  3 +
  4 +#### 1. 索引命名 & 多租户约定
  5 +
  6 +- **每个租户的相关索引:**
  7 +
  8 +- **版本化 suggestion 索引 **
  9 +
  10 + - 单个版本索引名:
  11 +
  12 + \[
  13 + search\_suggestions\_tenant\_{tenant\_id}\_v{yyyyMMddHHmmss}
  14 + \]
  15 +
  16 + 示例:`search_suggestions_tenant_1_v20250318123045`
  17 +
  18 + - 对应的读 alias(线上查询默认走 alias):
  19 +
  20 + \[
  21 + search\_suggestions\_tenant\_{tenant\_id}\_current
  22 + \]
  23 +
  24 + 示例:`search_suggestions_tenant_1_current`
  25 +
  26 +- **元信息索引(所有租户共用)**
  27 +
  28 + - 名称:
  29 +
  30 + \[
  31 + search\_suggestions\_meta
  32 + \]
  33 +
  34 + - 用于记录每个租户的:
  35 + - `active_alias`(当前 alias 名)
  36 + - `active_index`(当前实际索引名)
  37 + - `last_full_build_at`
  38 + - `last_incremental_build_at`
  39 + - `last_incremental_watermark`
  40 + - `updated_at`
  41 +
  42 +
  43 +#### 2. suggestion 索引 mapping 结构
  44 +
  45 +出自 `suggestion/mapping.py` → `build_suggestion_mapping(index_languages)`:
  46 +
  47 +- **settings**
  48 +
  49 + ```json
  50 + {
  51 + "number_of_shards": 1,
  52 + "number_of_replicas": 0,
  53 + "refresh_interval": "30s",
  54 + "analysis": {
  55 + "analyzer": {
  56 + "index_ik": {
  57 + "type": "custom",
  58 + "tokenizer": "ik_max_word",
  59 + "filter": ["lowercase", "asciifolding"]
  60 + },
  61 + "query_ik": {
  62 + "type": "custom",
  63 + "tokenizer": "ik_smart",
  64 + "filter": ["lowercase", "asciifolding"]
  65 + }
  66 + }
  67 + }
  68 + }
  69 + ```
  70 +
  71 +- **mappings.properties**
  72 +
  73 + ```json
  74 + {
  75 + "tenant_id": { "type": "keyword" },
  76 + "lang": { "type": "keyword" },
  77 + "text": { "type": "keyword" }, // 显示给用户的原始文案
  78 + "text_norm": { "type": "keyword" }, // 归一化后文本:小写+空白规整
  79 + "sources": { "type": "keyword" }, // 来源集合:["title", "qanchor", "query_log"]
  80 + "title_doc_count": { "type": "integer" },
  81 + "qanchor_doc_count": { "type": "integer" },
  82 + "query_count_7d": { "type": "integer" },
  83 + "query_count_30d": { "type": "integer" },
  84 + "rank_score": { "type": "float" }, // 排序打分
  85 + "lang_confidence": { "type": "float" },
  86 + "lang_source": { "type": "keyword" }, // 语言来源:log_field / request_params / script / default
  87 + "lang_conflict": { "type": "boolean" }, // 是否存在多来源语言冲突
  88 + "status": { "type": "byte" }, // 1 = 有效
  89 + "updated_at": { "type": "date" },
  90 +
  91 + "completion": { // completion suggester 用
  92 + "properties": {
  93 + "<lang>": {
  94 + "type": "completion",
  95 + "analyzer": "...", // 见下文
  96 + "search_analyzer": "..." // zh 使用 query_ik
  97 + }
  98 + }
  99 + },
  100 +
  101 + "sat": { // search_as_you_type 用于 bool_prefix fallback
  102 + "properties": {
  103 + "<lang>": {
  104 + "type": "search_as_you_type",
  105 + "analyzer": "..."
  106 + }
  107 + }
  108 + }
  109 + }
  110 + ```
  111 +
  112 +
  113 +- **语言 → analyzer 映射(ANALYZER_BY_LANG)**(只列关键):
  114 +
  115 + - `"zh"`:`index_ik` / 搜索时 `query_ik`
  116 + - `"en"`:`english`
  117 + - 其他:按 ES 内建语言 analyzer 映射
  118 +
  119 +> 构建索引时,必须根据每个租户的 `index_languages`(如 `["en", "zh"]`)动态构造 `completion.<lang>` 与 `sat.<lang>` 两棵子结构。
  120 +> index_languages永远包括 en
  121 +>
  122 +
  123 +---
  124 +
  125 +### 二、全量构建(Full Rebuild)
  126 +
  127 +对应 `SuggestionIndexBuilder.rebuild_tenant_index(...)` 和 `_build_full_candidates(...)`。
  128 +
  129 +#### 1. 输入参数
  130 +
  131 +关键参数:
  132 +
  133 +- `tenant_id: str`
  134 +- `days: int`:查询日志回溯天数
  135 +- `recreate: bool`:仅 legacy 索引模式下生效,是否先删除旧索引
  136 +- `batch_size: int`:从商品索引扫描商品的分页大小
  137 +- `min_query_len: int`:过滤短查询
  138 +- `publish_alias: bool`:是否构建完成后切 alias(只在 versioned 模式下起作用)
  139 +- `keep_versions: int`:保留多少个最新版本索引
  140 +- `use_versioned_index: bool`:true 使用 `*_v{timestamp}` 版本索引;false 使用 legacy 索引
  141 +
  142 +#### 2. 每租户语言配置
  143 +
  144 +- 通过 `tenant_config`(`get_tenant_config_loader().get_tenant_config(tenant_id)`)拿到:
  145 + - `index_languages: List[str]`,默认 `["en", "zh"]`
  146 + - `primary_language: str`,默认 `"en"`
  147 +
  148 +Java 端需要自己的 tenant 配置源,但**要保持同样字段含义与默认值**。
  149 +
  150 +#### 3. 构建目标索引
  151 +
  152 +1. 如果 `use_versioned_index = true`:
  153 + - 新索引名:`get_suggestion_versioned_index_name(tenant_id)`,即带时间戳后缀
  154 +2. 否则(legacy 模式):
  155 + - 索引名:`get_suggestion_legacy_index_name(tenant_id)`
  156 + - 若 `recreate = true` 且索引已存在,先删除再重建
  157 +3. 若目标索引已经存在 → 抛错(防止误覆盖)
  158 +
  159 +4. 按上文 mapping 创建索引。
  160 +
  161 +#### 4. 构建候选词
  162 +
  163 +##### 4.1 从商品索引收集 title / qanchors(Step 1)
  164 +
  165 +对应 `_iter_products` 与 `_build_full_candidates` 的前半段。
  166 +
  167 +- 数据源:**商品 ES 索引**(SPU 索引)
  168 + - 获取当前租户商品索引名称:通过 `indexer.mapping_generator.get_tenant_index_name(tenant_id)` 获取
  169 + - 遍历店铺的所有商品:获取每个商品的 `"spu_id"`, `"title"`, `"qanchors"` 3个字段(按`spu_id`升序)
  170 +
  171 +
  172 +- 对每个商品文档:
  173 +
  174 + 1. 确定 `product_id`:
  175 + - `spu_id` 优先,否则 `id` / `_id`
  176 + 2. `title` 字段结构:**多语言对象**,如:
  177 +
  178 + ```json
  179 + "title": { "en": "Running Shoes", "zh": "跑步鞋" }
  180 + ```
  181 +
  182 + 3. 对于每个 `lang in index_languages`:
  183 +
  184 + - **title 处理**:
  185 + - 从 `title[lang]` 取出字符串
  186 + - 经过 `_prepare_title_for_suggest` 做长度与截断控制:
  187 + - 若整体长度 ≤ 120,直接使用
  188 + - 若过长:尝试按常见分隔符(中文逗号、英文逗号、分号、竖线、斜杠、括号、方括号等)截取第一段;仍然过长则硬截断到 120 字符,并去掉末尾多余分隔符。
  189 + - 将结果记为 `text_raw`。若为空,跳过。
  190 + - 归一化得到 `text_norm = _normalize_text(text_raw)`:
  191 + - `NFKC` 归一化
  192 + - `strip()`
  193 + - `lower()`
  194 + - 连续空白压缩为单个空格
  195 + - 若 `_looks_noise(text_norm)` 为真(长度为 0 或 > 120,或全部为非字母数字符号),跳过。
  196 + - 使用键 `(lang, text_norm)` 在候选表中查找 / 新建 `SuggestionCandidate`:
  197 + - 初始字段:
  198 + - `text = text_raw`
  199 + - `text_norm = text_norm`
  200 + - `lang = lang`
  201 + - 调用 `add_product("title", spu_id=product_id)`:
  202 + - 记录来源 `"title"`
  203 + - 将 `product_id` 加入 `title_spu_ids`
  204 +
  205 + - **qanchors 处理**:
  206 + - `qanchors` 字段同样为多语言对象:
  207 + ```json
  208 + "qanchors": { "en": "...", "zh": "..." }
  209 + ```
  210 + - 取 `q_raw = qanchors[lang]`
  211 + - 通过 `_split_qanchors(q_raw)` 拆分为若干字符串:
  212 + - 若原值为 list → 去空白后直接返回
  213 + - 若为字符串 → 按 `[,;|/\n\t]+` 拆分,去空白,保底返回原字符串
  214 + - 对每个 `q_text`:
  215 + - `text_norm = _normalize_text(q_text)`,再用 `_looks_noise` 过滤
  216 + - 同样按 `(lang, text_norm)` 合并为 `SuggestionCandidate`,调用 `add_product("qanchor", spu_id=product_id)`。
  217 +
  218 +##### 4.2 从查询日志收集用户 query(Step 2)
  219 +
  220 +对应 `_iter_query_log_rows` 与 `_build_full_candidates` 的后半段。
  221 +
  222 +- 时间窗口:
  223 + - `now = 当前 UTC`
  224 + - `since = now - days`
  225 + - `since_7d = now - 7天`
  226 +- 数据源:**MySQL 表 `shoplazza_search_log`**:
  227 + - 字段:
  228 + - `tenant_id`
  229 + - `query`
  230 + - `language`
  231 + - `request_params`
  232 + - `create_time`
  233 + - `deleted`(0 表示有效)
  234 + - 条件:
  235 + - `tenant_id = :tenant_id`
  236 + - `deleted = 0`
  237 + - `query IS NOT NULL AND query <> ''`
  238 + - `create_time >= :since AND create_time < :now`
  239 + - 结果按 `create_time ASC` 排序,`fetch_size` 分页流式遍历。
  240 +
  241 +- 对每一行:
  242 +
  243 + 1. `q = row.query.strip()`
  244 + - 若 `len(q) < min_query_len` 跳过。
  245 + 2. 语言解析 `_resolve_query_language(...)`:
  246 + - 输入:
  247 + - `query=q`
  248 + - `log_language = row.language`
  249 + - `request_params = row.request_params`(可能是 JSON 字符串,内部再解析 `language` 字段)
  250 + - `index_languages` & `primary_language`
  251 + - 决策顺序:
  252 + 1. **日志字段 request_params.language 有值**
  253 + 2. 用检测方法进行检测:`_detect_script_language(query)`
  254 + 3. 若都失败 → 回退到 `primary_language`,`lang_confidence=0.3`,`lang_source="default"`
  255 + - 若日志里的 `language` 与 `request_params` 解析出的语言不一致,则 `lang_conflict=True`。
  256 +
  257 + 3. 文本归一化:
  258 + - `text_norm = _normalize_text(q)`
  259 + - 若 `_looks_noise(text_norm)` → 跳过。
  260 +
  261 + 4. 合并到候选表:
  262 +
  263 + - key = `(lang, text_norm)`
  264 + - 若不存在,则新建 `SuggestionCandidate`:
  265 + - `text = q`
  266 + - `lang = lang`
  267 + - 其他字段默认。
  268 + - 更新:
  269 + - `lang_confidence = max(c.lang_confidence, conf)`
  270 + - `lang_source`:第一次从 `"default"` 变成具体来源,之后保持已有非 default 值
  271 + - `lang_conflict |= conflict`
  272 + - 根据 `create_time` 是否在 `since_7d` 之后判断 `is_7d`
  273 + - `add_query_log(is_7d)`:
  274 + - `query_count_30d += 1`
  275 + - 最近 7 天则 `query_count_7d += 1`
  276 +
  277 +#### 5. rank_score 计算与文档成型
  278 +
  279 +- **打分公式**(`_compute_rank_score` 与 `_compute_rank_score_from_candidate`):
  280 +
  281 + \[
  282 + rank\_score =
  283 + 1.8 \cdot \log(1 + query\_count\_{30d}) +
  284 + 1.2 \cdot \log(1 + query\_count\_{7d}) +
  285 + 1.0 \cdot \log(1 + qanchor\_doc\_count) +
  286 + 0.6 \cdot \log(1 + title\_doc\_count)
  287 + \]
  288 +
  289 +- 构造最终文档(`_candidate_to_doc`):
  290 +
  291 + - `_id = f"{tenant_id}|{lang}|{text_norm}"`
  292 + - `completion` 字段:
  293 +
  294 + ```json
  295 + "completion": {
  296 + "<lang>": {
  297 + "input": [text], // 原始展示文案
  298 + "weight": int(max(rank_score, 1.0) * 100)
  299 + }
  300 + }
  301 + ```
  302 +
  303 + - `sat` 字段:
  304 +
  305 + ```json
  306 + "sat": {
  307 + "<lang>": text
  308 + }
  309 + ```
  310 +
  311 + - 其余字段直接来自 `SuggestionCandidate` 聚合结果(见 mapping 部分)。
  312 +
  313 +- 将所有 doc 批量写入目标索引,然后执行 `refresh`。
  314 +
  315 +#### 6. alias 发布与旧版本清理
  316 +
  317 +- 若 `publish_alias = true` 且 `use_versioned_index = true`:
  318 +
  319 + 1. 通过 alias 名(`get_suggestion_alias_name`)查出当前挂载的索引列表 `current_indices`
  320 + 2. 构造 `update_aliases` 操作:
  321 + - 为所有旧索引 `remove` alias
  322 + - 为新索引 `add` alias
  323 + 3. 调用 `_cleanup_old_versions`:
  324 + - 通过通配符 `get_suggestion_versioned_index_pattern(tenant_id)` 列出所有版本索引
  325 + - 按名称排序,保留最新 `keep_versions` 个
  326 + - 删除其余版本索引,但保护当前新索引
  327 + 4. 更新 meta 索引:
  328 + - `active_alias`
  329 + - `active_index`
  330 +
  331 +#### 7. meta 索引更新
  332 +
  333 +- 无论是否发布 alias,都会更新 meta:
  334 +
  335 + ```json
  336 + {
  337 + "tenant_id": "<tenant_id>",
  338 + "last_full_build_at": <now_utc_iso>,
  339 + "last_incremental_watermark": <now_utc_iso>,
  340 + "active_index": <index_name>, // 若使用 versioned + alias
  341 + "active_alias": "<alias_name>" // ditto
  342 + }
  343 + ```
  344 +
  345 +- 更新行为是「读旧值 → merge patch → index 文档,refresh wait_for」。
  346 +
  347 +---
  348 +
  349 +### 三、增量构建(Incremental Update)
  350 +
  351 +对应 `incremental_update_tenant_index(...)` 与 `_build_incremental_deltas(...)`, `_build_incremental_actions(...)`。
  352 +
  353 +#### 1. 输入参数
  354 +
  355 +- `tenant_id: str`
  356 +- `min_query_len: int`:过滤短 query
  357 +- `fallback_days: int`:如果找不到上次水位线,从最近 N 天回补
  358 +- `overlap_minutes: int`:窗口向前重叠几分钟,防止日志延迟
  359 +- `bootstrap_if_missing: bool`:没有任何 suggestion 索引时是否先全量构建
  360 +- `bootstrap_days: int`:bootstrap 全量构建的日志回溯天数
  361 +- `batch_size: int`:仅在需要 bootstrap 全量构建时使用
  362 +
  363 +#### 2. 目标索引解析
  364 +
  365 +- `_resolve_incremental_target_index(tenant_id)`:
  366 +
  367 + 1. 首选:alias `get_suggestion_alias_name(tenant_id)` 所指向的索引(一般只会有一个)
  368 + 2. 若 alias 不存在:回退到 legacy 索引名 `get_suggestion_legacy_index_name(tenant_id)`,且索引存在
  369 + 3. 若都不存在 → 返回 `None`
  370 +
  371 +- 若 `target_index is None`:
  372 +
  373 + - 若 `bootstrap_if_missing = false` → 抛错,提示先跑全量或打开 bootstrap。
  374 + - 否则执行一次全量构建 `rebuild_tenant_index(...)`:
  375 + - 参数:`days=bootstrap_days`, `batch_size=batch_size`, `min_query_len`, `publish_alias=true`, `use_versioned_index=true`
  376 + - 返回 `{"mode": "incremental", "bootstrapped": true, "bootstrap_result": ...}`
  377 +
  378 +#### 3. 增量时间窗口计算
  379 +
  380 +- 从 meta 索引 `_get_meta(tenant_id)` 中读取:
  381 + - `last_incremental_watermark` 或 `last_full_build_at`
  382 +- 若是 ISO 字符串(可能带 `Z` 后缀)→ 解析为 UTC `since`。
  383 +- 若解析失败或不存在 → `since = now - fallback_days`
  384 +- 然后:
  385 + - `since = since - overlap_minutes`
  386 + - 但不得早于 `now - fallback_days`(防止 overlap 过大)
  387 +- 最终时间窗口:
  388 +
  389 + ```text
  390 + [since, now)
  391 + ```
  392 +
  393 +> 时间计算与 UTC 处理逻辑保持一致(字符串格式可兼容 ISO 8601)。
  394 +
  395 +#### 4. 构建 QueryDelta(按 query + 语言累积增量)
  396 +
  397 +- `_build_incremental_deltas(...)` 同样从表 `shoplazza_search_log` 读取,只是时间窗口换成增量窗口 `since` ~ `now`。
  398 +- 对每一条日志:
  399 + 1. `q = row.query.strip()`,若 `len(q) < min_query_len` → 跳过。
  400 + 2. 仍然通过 `_resolve_query_language` 计算 `(lang, conf, source, conflict)`。
  401 + 3. `text_norm = _normalize_text(q)`,`_looks_noise` 过滤。
  402 + 4. 键 `(lang, text_norm)`,合并为 `QueryDelta`:
  403 + - 初始:
  404 + - `tenant_id`
  405 + - `lang`
  406 + - `text = q`
  407 + - `text_norm`
  408 + - `lang_confidence = conf`
  409 + - `lang_source = source`
  410 + - `lang_conflict = conflict`
  411 + - 更新逻辑:
  412 + - `delta_30d += 1`
  413 + - 若 `create_time >= now - 7d` → `delta_7d += 1`
  414 + - 若新的 `conf > lang_confidence` → 更新 `lang_confidence` & `lang_source`
  415 + - `lang_conflict |= conflict`
  416 +
  417 +#### 5. 构建增量更新动作(Bulk Update with Scripted Upsert)
  418 +
  419 +- 将每个 `QueryDelta` 转换为 upsert 文档(`_delta_to_upsert_doc`):
  420 + - 使用与全量同一套打分公式,但只基于增量的 `delta_7d / delta_30d`。
  421 + - 设置:
  422 + - `sources = ["query_log"]`
  423 + - `title_doc_count = 0`
  424 + - `qanchor_doc_count = 0`
  425 + - `completion.<lang>.input = [text]`
  426 + - `completion.<lang>.weight = int(max(rank_score, 1.0) * 100)`
  427 + - `sat.<lang> = text`
  428 + - `status = 1`
  429 + - `updated_at = now_iso`(本次执行时间)
  430 +
  431 +- 为每个 delta 构造一个 bulk **update** 动作(`_build_incremental_actions`):
  432 +
  433 + - `_op_type = "update"`
  434 + - `_index = target_index`
  435 + - `_id = f"{tenant_id}|{lang}|{text_norm}"`
  436 + - `scripted_upsert = true`
  437 + - `script.lang = "painless"`
  438 + - `script.source` 为一段 Painless 脚本(见 `SuggestionIndexBuilder._build_incremental_update_script`),其要点:
  439 +
  440 + - 若文档不存在 → 直接 `ctx._source = params.upsert`
  441 + - 若存在:
  442 + - 初始化空字段
  443 + - `query_count_30d += params.delta_30d`
  444 + - `query_count_7d += params.delta_7d`
  445 + - 将 `"query_log"` 加入 `sources`
  446 + - `lang_conflict` 与 `params.lang_conflict` 取或
  447 + - 若 `params.lang_confidence > ctx._source.lang_confidence` 则更新 `lang_confidence` 和 `lang_source`
  448 + - 基于更新后的 `query_count_7d/30d` + `qanchor_doc_count` + `title_doc_count` 重新计算 `rank_score`
  449 + - `status = 1`
  450 + - `updated_at = params.now_iso`
  451 + - 同步更新 `text / lang / text_norm`
  452 + - 更新 `completion[lang]` 的 `input` 与 `weight`
  453 + - 更新 `sat[lang] = text`
  454 +
  455 + - `script.params` 包含:
  456 + - `delta_30d`, `delta_7d`, `lang_confidence`, `lang_source`, `lang_conflict`
  457 + - `now_iso`, `lang`, `text`, `text_norm`
  458 + - `completion_input`, `completion_weight`
  459 + - `upsert`(完整 upsert 文档)
  460 +
  461 +- 整个增量批次调 `bulk` / `bulk_actions`,执行完成后 `refresh(target_index)`。
  462 +
  463 +#### 6. 更新 meta 索引
  464 +
  465 +- 更新字段:
  466 +
  467 + ```json
  468 + {
  469 + "last_incremental_build_at": now_iso,
  470 + "last_incremental_watermark": now_iso,
  471 + "active_index": target_index,
  472 + "active_alias": "<alias_name>"
  473 + }
  474 + ```
  475 +
  476 +> 注意:即便外部不依赖 `active_alias`,最好仍然保持与现有 Python 实现一致,方便混合环境切换。
  477 +
  478 +---
  479 +
  480 +### 四、语言解析与噪音过滤
  481 +
  482 +#### 1. 文本归一化 `_normalize_text(value: str)`
  483 +
  484 +- `unicodedata.normalize("NFKC", value or "")`
  485 +- `strip()`
  486 +- 转小写
  487 +- 用正则 `\s+` 将多空白压缩成单个空格
  488 +
  489 +#### 2. 噪音检测 `_looks_noise(text_value: str)`
  490 +
  491 +- 任何一条为真即视为噪音,候选会被忽略:
  492 +
  493 + - 空字符串
  494 + - 长度 > 120
  495 + - `re.fullmatch(r"[\W_]+", text_value)`:全是符号/下划线,没有文字
  496 +
  497 +#### 3. 语言标准化 `_normalize_lang(lang)`
  498 +
  499 +- 输入 lang 为空 → None
  500 +- 转小写,`-` 替换成 `_`
  501 +- 若是 `"zh_tw"` / `"pt_br"` → 保留全量
  502 +- 其他 → 取 `_` 前缀(例如 `"en_US"` → `"en"`)
  503 +
frontend/index.html
@@ -281,7 +281,10 @@ @@ -281,7 +281,10 @@
281 return; 281 return;
282 } 282 }
283 283
284 - const url = new URL(SUGGEST_API); 284 + // SUGGEST_API 可能是相对路径(例如 "/search/suggestions"),
  285 + // 直接 new URL(SUGGEST_API) 在浏览器环境下会因缺少 base URL 报错。
  286 + // 显式指定 window.location.origin 作为 base,保证同源调用正常工作。
  287 + const url = new URL(SUGGEST_API, window.location.origin);
285 url.searchParams.set('q', query); 288 url.searchParams.set('q', query);
286 url.searchParams.set('size', '40'); 289 url.searchParams.set('size', '40');
287 url.searchParams.set('language', getSelectedLang()); 290 url.searchParams.set('language', getSelectedLang());
frontend/static/js/app.js
@@ -30,7 +30,7 @@ function getTenantId() { @@ -30,7 +30,7 @@ function getTenantId() {
30 if (tenantSelect) { 30 if (tenantSelect) {
31 return tenantSelect.value.trim(); 31 return tenantSelect.value.trim();
32 } 32 }
33 - return '170'; // Default fallback 33 + return '';
34 } 34 }
35 35
36 // Get sku_filter_dimension (as list) from input 36 // Get sku_filter_dimension (as list) from input
@@ -138,12 +138,10 @@ def cmd_build_suggestions(args): @@ -138,12 +138,10 @@ def cmd_build_suggestions(args):
138 result = builder.rebuild_tenant_index( 138 result = builder.rebuild_tenant_index(
139 tenant_id=args.tenant_id, 139 tenant_id=args.tenant_id,
140 days=args.days, 140 days=args.days,
141 - recreate=args.recreate,  
142 batch_size=args.batch_size, 141 batch_size=args.batch_size,
143 min_query_len=args.min_query_len, 142 min_query_len=args.min_query_len,
144 publish_alias=args.publish_alias, 143 publish_alias=args.publish_alias,
145 keep_versions=args.keep_versions, 144 keep_versions=args.keep_versions,
146 - use_versioned_index=not args.no_versioned_index,  
147 ) 145 )
148 else: 146 else:
149 result = builder.incremental_update_tenant_index( 147 result = builder.incremental_update_tenant_index(
@@ -224,16 +222,6 @@ def main(): @@ -224,16 +222,6 @@ def main():
224 help='For full mode: keep latest N versioned indices', 222 help='For full mode: keep latest N versioned indices',
225 ) 223 )
226 suggest_build_parser.add_argument( 224 suggest_build_parser.add_argument(
227 - '--no-versioned-index',  
228 - action='store_true',  
229 - help='For full mode: write to legacy concrete index (not recommended)',  
230 - )  
231 - suggest_build_parser.add_argument(  
232 - '--recreate',  
233 - action='store_true',  
234 - help='For legacy concrete index mode: delete and recreate target index before build',  
235 - )  
236 - suggest_build_parser.add_argument(  
237 '--incremental-fallback-days', 225 '--incremental-fallback-days',
238 type=int, 226 type=int,
239 default=7, 227 default=7,
suggestion/builder.py
@@ -30,13 +30,8 @@ def _index_prefix() -> str: @@ -30,13 +30,8 @@ def _index_prefix() -> str:
30 return ES_INDEX_NAMESPACE or "" 30 return ES_INDEX_NAMESPACE or ""
31 31
32 32
33 -def get_suggestion_legacy_index_name(tenant_id: str) -> str:  
34 - """Legacy concrete index name (Phase1 compatibility)."""  
35 - return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}"  
36 -  
37 -  
38 def get_suggestion_alias_name(tenant_id: str) -> str: 33 def get_suggestion_alias_name(tenant_id: str) -> str:
39 - """Read alias for suggestion index (Phase2 default search target).""" 34 + """Read alias for suggestion index (single source of truth)."""
40 return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_current" 35 return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_current"
41 36
42 37
@@ -54,15 +49,6 @@ def get_suggestion_meta_index_name() -> str: @@ -54,15 +49,6 @@ def get_suggestion_meta_index_name() -> str:
54 return f"{_index_prefix()}search_suggestions_meta" 49 return f"{_index_prefix()}search_suggestions_meta"
55 50
56 51
57 -def get_suggestion_index_name(tenant_id: str) -> str:  
58 - """  
59 - Search target for suggestion query.  
60 -  
61 - Phase2 uses alias by default.  
62 - """  
63 - return get_suggestion_alias_name(tenant_id)  
64 -  
65 -  
66 @dataclass 52 @dataclass
67 class SuggestionCandidate: 53 class SuggestionCandidate:
68 text: str 54 text: str
@@ -431,15 +417,12 @@ class SuggestionIndexBuilder: @@ -431,15 +417,12 @@ class SuggestionIndexBuilder:
431 } 417 }
432 418
433 def _resolve_incremental_target_index(self, tenant_id: str) -> Optional[str]: 419 def _resolve_incremental_target_index(self, tenant_id: str) -> Optional[str]:
  420 + """Resolve active suggestion index for incremental updates (alias only)."""
434 alias_name = get_suggestion_alias_name(tenant_id) 421 alias_name = get_suggestion_alias_name(tenant_id)
435 aliased = self.es_client.get_alias_indices(alias_name) 422 aliased = self.es_client.get_alias_indices(alias_name)
436 if aliased: 423 if aliased:
437 # alias should map to one index in this design 424 # alias should map to one index in this design
438 return sorted(aliased)[-1] 425 return sorted(aliased)[-1]
439 -  
440 - legacy = get_suggestion_legacy_index_name(tenant_id)  
441 - if self.es_client.index_exists(legacy):  
442 - return legacy  
443 return None 426 return None
444 427
445 def _build_full_candidates( 428 def _build_full_candidates(
@@ -556,12 +539,10 @@ class SuggestionIndexBuilder: @@ -556,12 +539,10 @@ class SuggestionIndexBuilder:
556 self, 539 self,
557 tenant_id: str, 540 tenant_id: str,
558 days: int = 365, 541 days: int = 365,
559 - recreate: bool = False,  
560 batch_size: int = 500, 542 batch_size: int = 500,
561 min_query_len: int = 1, 543 min_query_len: int = 1,
562 publish_alias: bool = True, 544 publish_alias: bool = True,
563 keep_versions: int = 2, 545 keep_versions: int = 2,
564 - use_versioned_index: bool = True,  
565 ) -> Dict[str, Any]: 546 ) -> Dict[str, Any]:
566 """ 547 """
567 Full rebuild. 548 Full rebuild.
@@ -575,13 +556,8 @@ class SuggestionIndexBuilder: @@ -575,13 +556,8 @@ class SuggestionIndexBuilder:
575 index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] 556 index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
576 primary_language: str = tenant_cfg.get("primary_language") or "en" 557 primary_language: str = tenant_cfg.get("primary_language") or "en"
577 558
578 - if use_versioned_index:  
579 - index_name = get_suggestion_versioned_index_name(tenant_id)  
580 - else:  
581 - index_name = get_suggestion_legacy_index_name(tenant_id)  
582 - if recreate and self.es_client.index_exists(index_name):  
583 - logger.info("Deleting existing suggestion index: %s", index_name)  
584 - self.es_client.delete_index(index_name) 559 + # Always write to a fresh versioned index; legacy concrete index is no longer supported.
  560 + index_name = get_suggestion_versioned_index_name(tenant_id)
585 561
586 if self.es_client.index_exists(index_name): 562 if self.es_client.index_exists(index_name):
587 raise RuntimeError(f"Target suggestion index already exists: {index_name}") 563 raise RuntimeError(f"Target suggestion index already exists: {index_name}")
@@ -609,7 +585,7 @@ class SuggestionIndexBuilder: @@ -609,7 +585,7 @@ class SuggestionIndexBuilder:
609 bulk_result = {"success": 0, "failed": 0, "errors": []} 585 bulk_result = {"success": 0, "failed": 0, "errors": []}
610 586
611 alias_publish: Optional[Dict[str, Any]] = None 587 alias_publish: Optional[Dict[str, Any]] = None
612 - if publish_alias and use_versioned_index: 588 + if publish_alias:
613 alias_publish = self._publish_alias( 589 alias_publish = self._publish_alias(
614 tenant_id=tenant_id, 590 tenant_id=tenant_id,
615 index_name=index_name, 591 index_name=index_name,
@@ -621,7 +597,7 @@ class SuggestionIndexBuilder: @@ -621,7 +597,7 @@ class SuggestionIndexBuilder:
621 "last_full_build_at": now_utc, 597 "last_full_build_at": now_utc,
622 "last_incremental_watermark": now_utc, 598 "last_incremental_watermark": now_utc,
623 } 599 }
624 - if publish_alias and use_versioned_index: 600 + if publish_alias:
625 meta_patch["active_index"] = index_name 601 meta_patch["active_index"] = index_name
626 meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id) 602 meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id)
627 self._upsert_meta(tenant_id, meta_patch) 603 self._upsert_meta(tenant_id, meta_patch)
suggestion/service.py
@@ -7,11 +7,7 @@ import time @@ -7,11 +7,7 @@ import time
7 from typing import Any, Dict, List, Optional 7 from typing import Any, Dict, List, Optional
8 8
9 from config.tenant_config_loader import get_tenant_config_loader 9 from config.tenant_config_loader import get_tenant_config_loader
10 -from suggestion.builder import (  
11 - get_suggestion_alias_name,  
12 - get_suggestion_index_name,  
13 - get_suggestion_legacy_index_name,  
14 -) 10 +from suggestion.builder import get_suggestion_alias_name
15 from utils.es_client import ESClient 11 from utils.es_client import ESClient
16 12
17 logger = logging.getLogger(__name__) 13 logger = logging.getLogger(__name__)
@@ -40,16 +36,6 @@ class SuggestionService: @@ -40,16 +36,6 @@ class SuggestionService:
40 alias_name = get_suggestion_alias_name(tenant_id) 36 alias_name = get_suggestion_alias_name(tenant_id)
41 if self.es_client.alias_exists(alias_name): 37 if self.es_client.alias_exists(alias_name):
42 return alias_name 38 return alias_name
43 -  
44 - # Fallback for pre-Phase2 deployments  
45 - legacy = get_suggestion_legacy_index_name(tenant_id)  
46 - if self.es_client.index_exists(legacy):  
47 - return legacy  
48 -  
49 - # Last fallback: current naming helper  
50 - candidate = get_suggestion_index_name(tenant_id)  
51 - if self.es_client.index_exists(candidate):  
52 - return candidate  
53 return None 39 return None
54 40
55 def _completion_suggest( 41 def _completion_suggest(