Commit ff9efda02dde91b2f768206d8266fb3856f31884

Authored by tangwang
1 parent 200fdddf

suggest

1 1 # Elasticsearch Configuration
2   -ES_HOST=http://120.76.41.98:9200
3   -ES_USERNAME=essa
  2 +# ES_HOST=http://120.76.41.98:9200
  3 +# ES_USERNAME=essa
  4 +ES_HOST=http://localhost:9200
  5 +ES_USERNAME=saas
4 6 ES_PASSWORD=4hOaLaf41y2VuI8y
5 7  
6 8 # Redis Configuration (Optional) - AI 生产 10.200.16.14:6479
... ...
api/models.py
... ... @@ -197,14 +197,12 @@ class ImageSearchRequest(BaseModel):
197 197 class SearchSuggestRequest(BaseModel):
198 198 """搜索建议请求模型(框架,暂不实现)"""
199 199 query: str = Field(..., min_length=1, description="搜索查询字符串")
200   - size: int = Field(5, ge=1, le=20, description="返回建议数量")
  200 + size: int = Field(5, ge=1, le=50, description="返回建议数量")
201 201 types: List[Literal["query", "product", "category", "brand"]] = Field(
202 202 ["query"],
203 203 description="建议类型:query(查询建议), product(商品建议), category(类目建议), brand(品牌建议)"
204 204 )
205 205 language: Optional[str] = Field(None, description="请求语言(如 zh/en/ar/ru)")
206   - with_results: bool = Field(True, description="是否返回每条 suggestion 的直达商品结果")
207   - result_size: int = Field(3, ge=1, le=10, description="每条 suggestion 返回商品数量")
208 206  
209 207  
210 208 class FacetValue(BaseModel):
... ...
api/routes/search.py
... ... @@ -269,17 +269,15 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request):
269 269 @router.get("/suggestions", response_model=SearchSuggestResponse)
270 270 async def search_suggestions(
271 271 q: str = Query(..., min_length=1, description="搜索查询"),
272   - size: int = Query(10, ge=1, le=200, description="建议数量(1-200)"),
  272 + size: int = Query(10, ge=1, le=50, description="建议数量(1-50)"),
273 273 language: str = Query("en", description="请求语言,如 zh/en/ar/ru"),
274   - with_results: bool = Query(True, description="是否附带每条 suggestion 的直达商品"),
275   - result_size: int = Query(3, ge=1, le=10, description="每条 suggestion 直达商品数量"),
276 274 debug: bool = Query(False, description="是否返回调试信息"),
277 275 http_request: Request = None,
278 276 ):
279 277 """
280 278 获取搜索建议(自动补全)。
281 279  
282   - 获取搜索建议(自动补全,支持多语言与直达商品)。
  280 + 获取搜索建议(自动补全,支持多语言)。
283 281 """
284 282 # Extract tenant_id (required)
285 283 tenant_id = http_request.headers.get("X-Tenant-ID") if http_request else None
... ... @@ -305,8 +303,6 @@ async def search_suggestions(
305 303 query=q,
306 304 language=language,
307 305 size=size,
308   - with_results=with_results,
309   - result_size=result_size,
310 306 )
311 307 response = SearchSuggestResponse(
312 308 query=result["query"],
... ...
docs/Usage-Guide.md
... ... @@ -467,40 +467,41 @@ curl -X POST http://localhost:6002/search/image \
467 467  
468 468 ## 8. Suggestion 索引与接口使用
469 469  
470   -### 8.1 构建 Suggestion 索引(全量,多环境
  470 +### 8.1 构建 Suggestion 索引(Phase 2:全量 + 增量)
471 471  
472 472 Suggestion 索引会从:
473 473  
474 474 - ES 商品索引:`title.{lang}`, `qanchors.{lang}`
475 475 - MySQL 日志表:`shoplazza_search_log.query`(含 `language`、`request_params`)
476 476  
477   -聚合生成 `{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}`。
  477 +聚合生成版本化索引并发布 alias:
478 478  
479   -在项目根目录执行(以 UAT 环境、tenant_id=162 为例):
  479 +- 物理索引:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}_v<timestamp>`
  480 +- 读别名:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}_current`
480 481  
481   -```bash
482   -# 1. 切换到 UAT 配置(包含 ES_INDEX_NAMESPACE=uat_)
483   -cp .env.uat .env
484   -
485   -# 2. 启动索引服务(如尚未启动)
486   -./scripts/start_indexer.sh
  482 +在项目根目录执行(以 tenant_id=162 为例):
487 483  
488   -# 3. 为指定租户全量重建 suggestion 索引(会删除旧索引)
489   -python main.py build-suggestions \
490   - --tenant-id 162 \
491   - --es-host http://localhost:9200 \
  484 +```bash
  485 +# 1) 全量重建(版本化 + alias 发布)
  486 +./scripts/build_suggestions.sh 162 \
  487 + --mode full \
492 488 --days 30 \
493   - --recreate
494   -```
  489 + --publish-alias \
  490 + --keep-versions 2
495 491  
496   -UAT 环境下,索引名为:`uat_search_suggestions_tenant_162`;
497   -prod 环境下(ES_INDEX_NAMESPACE 为空),索引名为:`search_suggestions_tenant_162`。
  492 +# 2) 增量更新(watermark + overlap)
  493 +./scripts/build_suggestions.sh 162 \
  494 + --mode incremental \
  495 + --overlap-minutes 30
  496 +```
498 497  
499 498 可选参数:
500 499  
501 500 - `--days`:回溯日志天数(默认 30)
502 501 - `--batch-size`:扫描商品索引的批大小(默认 500)
503 502 - `--min-query-len`:参与 suggestion 的最小查询长度(默认 1)
  503 +- `--overlap-minutes`:增量窗口重叠分钟数(默认 30)
  504 +- `--bootstrap-if-missing`:增量模式下若缺少 active index 则自动全量初始化(默认 true)
504 505  
505 506 > 建议在商品索引构建完成、日志正常写入一段时间后执行一次全量构建,然后按天/小时增加增量构建任务。
506 507  
... ... @@ -510,11 +511,11 @@ prod 环境下(ES_INDEX_NAMESPACE 为空),索引名为:`search_suggestio
510 511  
511 512 ```bash
512 513 # UAT 环境(本地或 UAT 集群)
513   -curl "http://localhost:6002/search/suggestions?q=iph&size=5&language=en&with_results=true" \
  514 +curl "http://localhost:6002/search/suggestions?q=iph&size=5&language=en" \
514 515 -H "X-Tenant-ID: 162"
515 516  
516 517 # PROD 环境(域名 / 端口按实际部署调整)
517   -curl "https://api.yourdomain.com/search/suggestions?q=iph&size=5&language=en&with_results=true" \
  518 +curl "https://api.yourdomain.com/search/suggestions?q=iph&size=5&language=en" \
518 519 -H "X-Tenant-ID: 162"
519 520 ```
520 521  
... ...
docs/搜索API对接指南.md
... ... @@ -136,7 +136,7 @@ curl -X POST &quot;http://43.166.252.75:6002/search/&quot; \
136 136 | 接口 | HTTP Method | Endpoint | 说明 |
137 137 |------|------|------|------|
138 138 | 搜索 | POST | `/search/` | 执行搜索查询 |
139   -| 搜索建议 | GET | `/search/suggestions` | 搜索建议(自动补全/热词,多语言 + 结果直达) |
  139 +| 搜索建议 | GET | `/search/suggestions` | 搜索建议(自动补全/热词,多语言) |
140 140 | 即时搜索 | GET | `/search/instant` | 即时搜索预留接口(当前返回 `501 Not Implemented`) |
141 141 | 获取文档 | GET | `/search/{doc_id}` | 获取单个文档 |
142 142 | 全量索引 | POST | `/indexer/reindex` | 全量索引接口(导入数据,不删除索引,仅推荐自测使用) |
... ... @@ -545,17 +545,15 @@ response = requests.post(url, headers=headers, json={&quot;query&quot;: &quot;芭比娃娃&quot;})
545 545 ### 3.7 搜索建议接口
546 546  
547 547 - **端点**: `GET /search/suggestions`
548   -- **描述**: 返回搜索建议(自动补全/热词),支持多语言与“结果直达”(每条 suggestion 附带商品列表)
  548 +- **描述**: 返回搜索建议(自动补全/热词),支持多语言
549 549  
550 550 #### 查询参数
551 551  
552 552 | 参数 | 类型 | 必填 | 默认值 | 描述 |
553 553 |------|------|------|--------|------|
554 554 | `q` | string | Y | - | 查询字符串(至少 1 个字符) |
555   -| `size` | integer | N | 10 | 返回建议数量(1-200) |
  555 +| `size` | integer | N | 10 | 返回建议数量(1-50) |
556 556 | `language` | string | N | `en` | 请求语言,如 `zh` / `en` / `ar` / `ru`,用于路由到对应语种 suggestion 索引 |
557   -| `with_results` | bool | N | `true` | 是否为每条 suggestion 返回商品列表(结果直达) |
558   -| `result_size` | integer | N | 3 | 每条 suggestion 返回的商品数量(1-10) |
559 557 | `debug` | bool | N | `false` | 是否开启调试(目前主要用于排查 suggestion 排序与语言解析) |
560 558  
561 559 > **租户标识**:同 [3.1](#31-接口信息),通过请求头 `X-Tenant-ID` 或 query 参数 `tenant_id` 传递。
... ... @@ -576,16 +574,7 @@ response = requests.post(url, headers=headers, json={&quot;query&quot;: &quot;芭比娃娃&quot;})
576 574 "sources": ["query_log", "qanchor"],
577 575 "lang_source": "log_field",
578 576 "lang_confidence": 1.0,
579   - "lang_conflict": false,
580   - "products": [
581   - {
582   - "spu_id": "12345",
583   - "title": "iPhone 15 Pro Max",
584   - "price": 999.0,
585   - "image_url": "https://example.com/image.jpg",
586   - "score": 3.21
587   - }
588   - ]
  577 + "lang_conflict": false
589 578 }
590 579 ],
591 580 "took_ms": 12
... ... @@ -595,7 +584,7 @@ response = requests.post(url, headers=headers, json={&quot;query&quot;: &quot;芭比娃娃&quot;})
595 584 #### 请求示例
596 585  
597 586 ```bash
598   -curl "http://localhost:6002/search/suggestions?q=芭&size=5&language=zh&with_results=true" \
  587 +curl "http://localhost:6002/search/suggestions?q=芭&size=5&language=zh" \
599 588 -H "X-Tenant-ID: 162"
600 589 ```
601 590  
... ...
docs/搜索API速查表.md
... ... @@ -282,7 +282,7 @@ POST /search/image
282 282 "size": 20
283 283 }
284 284  
285   -GET /search/suggestions?q=芭&size=5&language=zh&with_results=true
  285 +GET /search/suggestions?q=芭&size=5&language=zh
286 286  
287 287 GET /search/instant?q=玩具&size=5 # 当前返回 501 Not Implemented
288 288  
... ...
frontend/index.html
... ... @@ -284,7 +284,6 @@
284 284 url.searchParams.set('q', query);
285 285 url.searchParams.set('size', '40');
286 286 url.searchParams.set('language', getSelectedLang());
287   - url.searchParams.set('with_results', 'false');
288 287 // 同时通过 query 参数传 tenant_id,方便在代理层丢失 header 时仍能识别租户
289 288 url.searchParams.set('tenant_id', tenantId);
290 289  
... ...
... ... @@ -104,7 +104,7 @@ def cmd_search(args):
104 104  
105 105  
106 106 def cmd_build_suggestions(args):
107   - """Build suggestion index for a tenant."""
  107 + """Build/update suggestion index for a tenant."""
108 108 # Initialize ES client with optional authentication
109 109 es_username = os.getenv("ES_USERNAME") or ES_CONFIG.get("username")
110 110 es_password = os.getenv("ES_PASSWORD") or ES_CONFIG.get("password")
... ... @@ -134,13 +134,27 @@ def cmd_build_suggestions(args):
134 134 password=db_pass,
135 135 )
136 136 builder = SuggestionIndexBuilder(es_client=es_client, db_engine=db_engine)
137   - result = builder.rebuild_tenant_index(
138   - tenant_id=args.tenant_id,
139   - days=args.days,
140   - recreate=args.recreate,
141   - batch_size=args.batch_size,
142   - min_query_len=args.min_query_len,
143   - )
  137 + if args.mode == "full":
  138 + result = builder.rebuild_tenant_index(
  139 + tenant_id=args.tenant_id,
  140 + days=args.days,
  141 + recreate=args.recreate,
  142 + batch_size=args.batch_size,
  143 + min_query_len=args.min_query_len,
  144 + publish_alias=args.publish_alias,
  145 + keep_versions=args.keep_versions,
  146 + use_versioned_index=not args.no_versioned_index,
  147 + )
  148 + else:
  149 + result = builder.incremental_update_tenant_index(
  150 + tenant_id=args.tenant_id,
  151 + min_query_len=args.min_query_len,
  152 + fallback_days=args.incremental_fallback_days,
  153 + overlap_minutes=args.overlap_minutes,
  154 + bootstrap_if_missing=args.bootstrap_if_missing,
  155 + bootstrap_days=args.bootstrap_days,
  156 + batch_size=args.batch_size,
  157 + )
144 158 print(json.dumps(result, indent=2, ensure_ascii=False))
145 159 return 0
146 160  
... ... @@ -158,7 +172,7 @@ def main():
158 172 serve_parser = subparsers.add_parser('serve', help='Start API service (multi-tenant)')
159 173 serve_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
160 174 serve_parser.add_argument('--port', type=int, default=6002, help='Port to bind to')
161   - serve_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
  175 + serve_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host')
162 176 serve_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
163 177  
164 178 # Serve-indexer command
... ... @@ -168,14 +182,14 @@ def main():
168 182 )
169 183 serve_indexer_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
170 184 serve_indexer_parser.add_argument('--port', type=int, default=6004, help='Port to bind to')
171   - serve_indexer_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
  185 + serve_indexer_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host')
172 186 serve_indexer_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
173 187  
174 188 # Search command
175 189 search_parser = subparsers.add_parser('search', help='Test search from command line')
176 190 search_parser.add_argument('query', help='Search query')
177 191 search_parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
178   - search_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
  192 + search_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host')
179 193 search_parser.add_argument('--size', type=int, default=10, help='Number of results')
180 194 search_parser.add_argument('--no-translation', action='store_true', help='Disable translation')
181 195 search_parser.add_argument('--no-embedding', action='store_true', help='Disable embeddings')
... ... @@ -184,17 +198,64 @@ def main():
184 198 # Suggestion build command
185 199 suggest_build_parser = subparsers.add_parser(
186 200 'build-suggestions',
187   - help='Build tenant suggestion index (full rebuild)'
  201 + help='Build tenant suggestion index (full/incremental)'
188 202 )
189 203 suggest_build_parser.add_argument('--tenant-id', required=True, help='Tenant ID')
190   - suggest_build_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
  204 + suggest_build_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host')
  205 + suggest_build_parser.add_argument(
  206 + '--mode',
  207 + choices=['full', 'incremental'],
  208 + default='full',
  209 + help='Build mode: full rebuild or incremental update',
  210 + )
191 211 suggest_build_parser.add_argument('--days', type=int, default=360, help='Query log lookback days')
192 212 suggest_build_parser.add_argument('--batch-size', type=int, default=500, help='Product scan batch size')
193 213 suggest_build_parser.add_argument('--min-query-len', type=int, default=1, help='Minimum query length')
194 214 suggest_build_parser.add_argument(
  215 + '--publish-alias',
  216 + action=argparse.BooleanOptionalAction,
  217 + default=True,
  218 + help='For full mode: publish alias to new versioned index (default: true)',
  219 + )
  220 + suggest_build_parser.add_argument(
  221 + '--keep-versions',
  222 + type=int,
  223 + default=2,
  224 + help='For full mode: keep latest N versioned indices',
  225 + )
  226 + suggest_build_parser.add_argument(
  227 + '--no-versioned-index',
  228 + action='store_true',
  229 + help='For full mode: write to legacy concrete index (not recommended)',
  230 + )
  231 + suggest_build_parser.add_argument(
195 232 '--recreate',
196 233 action='store_true',
197   - help='Delete and recreate suggestion index before build'
  234 + help='For legacy concrete index mode: delete and recreate target index before build',
  235 + )
  236 + suggest_build_parser.add_argument(
  237 + '--incremental-fallback-days',
  238 + type=int,
  239 + default=7,
  240 + help='For incremental mode: default lookback days when no watermark',
  241 + )
  242 + suggest_build_parser.add_argument(
  243 + '--overlap-minutes',
  244 + type=int,
  245 + default=30,
  246 + help='For incremental mode: overlap window to avoid late-arrival misses',
  247 + )
  248 + suggest_build_parser.add_argument(
  249 + '--bootstrap-if-missing',
  250 + action=argparse.BooleanOptionalAction,
  251 + default=True,
  252 + help='For incremental mode: bootstrap with full build when active index is missing',
  253 + )
  254 + suggest_build_parser.add_argument(
  255 + '--bootstrap-days',
  256 + type=int,
  257 + default=30,
  258 + help='For incremental mode bootstrap full build: query log lookback days',
198 259 )
199 260  
200 261 args = parser.parse_args()
... ...
scripts/build_suggestions.sh
... ... @@ -3,14 +3,19 @@
3 3 # Convenience script to rebuild suggestion index for a tenant.
4 4 #
5 5 # Usage:
6   -# ./scripts/build_suggestions.sh <tenant_id> [--days 30] [--batch-size 500] [--min-query-len 1] [--es-host http://localhost:9200]
  6 +# # full rebuild + alias publish (default)
  7 +# ./scripts/build_suggestions.sh <tenant_id> --mode full --days 30
  8 +#
  9 +# # incremental update from watermark
  10 +# ./scripts/build_suggestions.sh <tenant_id> --mode incremental
7 11 #
8 12  
9 13 set -euo pipefail
10 14  
11 15 if [ $# -lt 1 ]; then
12 16 echo "Usage: $0 <tenant_id> [extra args...]"
13   - echo "Example: $0 162 --days 30 --recreate"
  17 + echo "Example (full): $0 162 --mode full --days 30 --publish-alias"
  18 + echo "Example (incremental): $0 162 --mode incremental --overlap-minutes 30"
14 19 exit 1
15 20 fi
16 21  
... ... @@ -21,7 +26,11 @@ ROOT_DIR=&quot;$(cd &quot;$(dirname &quot;${BASH_SOURCE[0]}&quot;)/..&quot; &amp;&amp; pwd)&quot;
21 26  
22 27 cd "$ROOT_DIR"
23 28  
24   -python main.py build-suggestions \
  29 +PY_BIN="${PYTHON_BIN:-$ROOT_DIR/.venv/bin/python}"
  30 +if [ ! -x "$PY_BIN" ]; then
  31 + PY_BIN="python3"
  32 +fi
  33 +
  34 +"$PY_BIN" main.py build-suggestions \
25 35 --tenant-id "$TENANT_ID" \
26 36 "$@"
27   -
... ...
scripts/rebuild_suggestions.sh 0 → 100755
... ... @@ -0,0 +1,80 @@
  1 +#!/usr/bin/env bash
  2 +set -euo pipefail
  3 +
  4 +if [ $# -lt 1 ]; then
  5 + echo "Usage: $0 <tenant_id> [sample_query] [sample_language]"
  6 + echo "Example: $0 162 shi en"
  7 + exit 1
  8 +fi
  9 +
  10 +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
  11 +TENANT_ID="$1"
  12 +SAMPLE_Q="${2:-shi}"
  13 +SAMPLE_LANG="${3:-en}"
  14 +API_BASE="${API_BASE_URL:-http://localhost:6002}"
  15 +
  16 +cd "$ROOT_DIR"
  17 +
  18 +PY_BIN="${PYTHON_BIN:-$ROOT_DIR/.venv/bin/python}"
  19 +if [ ! -x "$PY_BIN" ]; then
  20 + PY_BIN="python3"
  21 +fi
  22 +
  23 +if [ -z "${ES_HOST:-}" ]; then
  24 + ES_HOST="$("$PY_BIN" - <<'PY'
  25 +from dotenv import dotenv_values
  26 +print(dotenv_values('.env').get('ES_HOST') or 'http://localhost:9200')
  27 +PY
  28 +)"
  29 +fi
  30 +
  31 +if [ -z "${ES_USERNAME:-}" ] || [ -z "${ES_PASSWORD:-}" ]; then
  32 + readarray -t _ES_CREDS < <("$PY_BIN" - <<'PY'
  33 +from dotenv import dotenv_values
  34 +cfg = dotenv_values('.env')
  35 +print(cfg.get('ES_USERNAME') or '')
  36 +print(cfg.get('ES_PASSWORD') or '')
  37 +PY
  38 +)
  39 + ES_USERNAME="${ES_USERNAME:-${_ES_CREDS[0]}}"
  40 + ES_PASSWORD="${ES_PASSWORD:-${_ES_CREDS[1]}}"
  41 +fi
  42 +
  43 +if [ -n "${ES_USERNAME:-}" ] && [ -n "${ES_PASSWORD:-}" ]; then
  44 + AUTH=(-u "${ES_USERNAME}:${ES_PASSWORD}")
  45 +else
  46 + AUTH=()
  47 +fi
  48 +
  49 +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_${TENANT_ID}_current"
  50 +
  51 +echo "[1/4] Full rebuild tenant=${TENANT_ID} (versioned + alias publish)"
  52 +"$PY_BIN" main.py build-suggestions \
  53 + --tenant-id "$TENANT_ID" \
  54 + --es-host "$ES_HOST" \
  55 + --mode full \
  56 + --days 365 \
  57 + --batch-size 500 \
  58 + --publish-alias \
  59 + --keep-versions 2
  60 +
  61 +echo "[2/4] Incremental update tenant=${TENANT_ID}"
  62 +"$PY_BIN" main.py build-suggestions \
  63 + --tenant-id "$TENANT_ID" \
  64 + --es-host "$ES_HOST" \
  65 + --mode incremental \
  66 + --overlap-minutes 30
  67 +
  68 +echo "[3/4] ES count + sample"
  69 +curl -sS "${AUTH[@]}" "$ES_HOST/$ALIAS_NAME/_count?pretty"
  70 +echo
  71 +curl -sS "${AUTH[@]}" "$ES_HOST/$ALIAS_NAME/_search?pretty" -H 'Content-Type: application/json' -d '{
  72 + "size": 5,
  73 + "query": {"match_all": {}},
  74 + "_source": ["lang", "text", "rank_score", "sources", "query_count_30d"]
  75 +}'
  76 +echo
  77 +
  78 +echo "[4/4] API smoke test"
  79 +curl -sS "$API_BASE/search/suggestions?q=${SAMPLE_Q}&size=10&language=${SAMPLE_LANG}" -H "X-Tenant-ID: ${TENANT_ID}"
  80 +echo
... ...
suggestion/ARCHITECTURE_V2.md 0 → 100644
... ... @@ -0,0 +1,304 @@
  1 +# Suggestion 架构方案 V2(仅 Suggest,去除结果直达)
  2 +
  3 +## 0. 结论
  4 +
  5 +本方案将 Suggest 设计为**独立高性能检索系统**,只返回建议词,不再返回商品卡片,也不做历史兼容。
  6 +
  7 +- 只保留 `/search/suggestions` 的词级自动补全能力
  8 +- 完全移除 `with_results/result_size/products[]` 链路
  9 +- 多语言优先,支持高并发、低延迟、可持续演进
  10 +
  11 +---
  12 +
  13 +## 1. 当前实现的关键问题(基于现有代码审视)
  14 +
  15 +1. 在线链路曾包含“suggest -> 二次商品查询”,属于典型 N+1 放大,QPS 上升后延迟和 ES 负载都不稳定。
  16 +2. `builder.py` 全量构建使用“大量 in-memory 聚合 + fetchall”,大租户下内存风险高。
  17 +3. 查询参数上限过大(原 `size<=200`),不符合自动补全接口性能边界。
  18 +4. 文档与实现长期混合(README 仍包含结果直达),导致认知不一致。
  19 +5. 多语言归一化仍偏基础(仅 lower/空白折叠),对 Unicode、变音符、跨语系兼容不够。
  20 +
  21 +---
  22 +
  23 +## 2. 目标与 SLO
  24 +
  25 +### 2.1 业务目标
  26 +
  27 +- 输入时实时返回高相关建议词(query suggestion)
  28 +- 多语言稳定(至少覆盖租户配置 `index_languages`)
  29 +- 支持词级排序和运营治理(黑白名单、降噪、降权)
  30 +
  31 +### 2.2 性能目标(建议)
  32 +
  33 +- P50 < 10ms,P95 < 25ms,P99 < 50ms(ES 查询耗时,不含网关)
  34 +- 单集群支持高并发(千级 QPS 可横向扩展)
  35 +- 数据新鲜度:增量 5-15 分钟可见
  36 +
  37 +---
  38 +
  39 +## 3. 总体架构
  40 +
  41 +### 3.1 在线路径(单跳)
  42 +
  43 +Client -> API `/search/suggestions` -> ES `search_suggestions_v2` -> 返回 suggestions
  44 +
  45 +原则:
  46 +
  47 +- **单次 ES 查询完成主路径**(可选双召回融合,但仍在同一次 API 请求内完成)
  48 +- 不调用 `search_products`,不返回商品结果
  49 +- 通过 `routing=tenant_id` 避免跨分片 fan-out
  50 +
  51 +### 3.2 离线路径(构建)
  52 +
  53 +数据源:
  54 +
  55 +- 商品字段:`title.{lang}`、`qanchors.{lang}`
  56 +- 搜索日志:`shoplazza_search_log`(含 `language/request_params`)
  57 +- 行为信号(可选增强):点击、加购、下单
  58 +
  59 +产物:
  60 +
  61 +- Suggest 文档(`tenant_id + lang + text_norm` 唯一)
  62 +- completion + prefix 检索字段
  63 +- 排序特征(热度、近期度、质量分)
  64 +
  65 +发布方式:
  66 +
  67 +- 写入新物理索引(版本化)
  68 +- 原子切换 alias(零停机)
  69 +
  70 +---
  71 +
  72 +## 4. 索引设计(ES)
  73 +
  74 +### 4.1 索引组织
  75 +
  76 +推荐两级策略:
  77 +
  78 +1. 默认:环境级共享索引(降低海量租户 index 数量)
  79 +2. 大租户:可升级为租户独享索引(隔离资源)
  80 +
  81 +统一通过 alias 暴露:
  82 +
  83 +- `search_suggestions_v2_current`
  84 +
  85 +### 4.2 Mapping(核心字段)
  86 +
  87 +```json
  88 +{
  89 + "settings": {
  90 + "number_of_shards": 3,
  91 + "number_of_replicas": 1,
  92 + "refresh_interval": "30s"
  93 + },
  94 + "mappings": {
  95 + "properties": {
  96 + "tenant_id": { "type": "keyword" },
  97 + "lang": { "type": "keyword" },
  98 + "text": { "type": "keyword" },
  99 + "text_norm": { "type": "keyword" },
  100 + "status": { "type": "byte" },
  101 + "sources": { "type": "keyword" },
  102 +
  103 + "query_count_7d": { "type": "integer" },
  104 + "query_count_30d": { "type": "integer" },
  105 + "ctr_30d": { "type": "float" },
  106 + "order_rate_30d": { "type": "float" },
  107 + "rank_score": { "type": "float" },
  108 +
  109 + "suggest": {
  110 + "type": "completion",
  111 + "contexts": [
  112 + { "name": "tenant", "type": "category" },
  113 + { "name": "lang", "type": "category" }
  114 + ]
  115 + },
  116 +
  117 + "sat": {
  118 + "properties": {
  119 + "zh": { "type": "search_as_you_type", "analyzer": "index_ik" },
  120 + "en": { "type": "search_as_you_type", "analyzer": "english" },
  121 + "ar": { "type": "search_as_you_type", "analyzer": "arabic" }
  122 + }
  123 + },
  124 +
  125 + "updated_at": { "type": "date" }
  126 + }
  127 + }
  128 +}
  129 +```
  130 +
  131 +说明:
  132 +
  133 +- `completion` 负责极速前缀命中(主召回)
  134 +- `search_as_you_type` 负责多词前缀和召回兜底
  135 +- `contexts` 强制租户与语言隔离
  136 +
  137 +---
  138 +
  139 +## 5. 多语言策略
  140 +
  141 +1. 语言归属优先级:`log.language > request_params.language > 脚本识别 > tenant.primary_language`
  142 +2. 统一归一化:NFKC、大小写折叠、空白折叠、标点清洗
  143 +3. 分词器按语言配置:
  144 + - 中文:IK/ANSJ(与主索引保持一致)
  145 + - 拉丁语系:对应内置 analyzer
  146 + - 未覆盖语种:`standard + ICU folding` 兜底
  147 +4. 保证写入语言必须在租户 `index_languages` 内
  148 +
  149 +---
  150 +
  151 +## 6. 在线检索策略(高性能)
  152 +
  153 +### 6.1 双通道召回(推荐)
  154 +
  155 +1. 通道 A:`completion suggester`(prefix,skip_duplicates)
  156 +2. 通道 B:`multi_match(type=bool_prefix)` on `search_as_you_type`
  157 +3. 融合去重:按 `text_norm` 去重,按最终分排序截断
  158 +
  159 +### 6.2 查询约束
  160 +
  161 +- 默认 `size=10`,最大 `size=50`
  162 +- `track_total_hits=false`
  163 +- `_source` 仅返回必要字段(`text/lang/rank_score/sources`)
  164 +- `routing=tenant_id`
  165 +
  166 +### 6.3 打分建议
  167 +
  168 +```text
  169 +final_score =
  170 + es_score
  171 + + a1*log1p(query_count_30d)
  172 + + a2*log1p(query_count_7d)
  173 + + a3*ctr_30d
  174 + + a4*order_rate_30d
  175 + + a5*freshness_decay
  176 +```
  177 +
  178 +---
  179 +
  180 +## 7. 构建与发布
  181 +
  182 +### 7.1 构建模式
  183 +
  184 +- 每日全量:重建全量特征,清理脏词
  185 +- 小时级增量:只处理新日志窗口
  186 +
  187 +### 7.2 工程要求
  188 +
  189 +- 禁止 `fetchall` 全量入内存,改为流式读取(分页/游标)
  190 +- ES 扫描采用 `search_after` 流式聚合
  191 +- 批量写入采用 bulk(分块 + 重试 + 失败重放)
  192 +
  193 +### 7.3 发布策略
  194 +
  195 +1. `search_suggestions_v2_YYYYMMDDHHmm` 写入完成
  196 +2. 校验 count/抽样查询/核心词覆盖
  197 +3. alias 原子切换到新索引
  198 +4. 保留上一个版本用于快速回滚
  199 +
  200 +---
  201 +
  202 +## 8. API 契约(V2)
  203 +
  204 +请求:
  205 +
  206 +- `GET /search/suggestions`
  207 +- 参数:`q`、`language`、`size`
  208 +- Header:`X-Tenant-ID`
  209 +
  210 +响应:
  211 +
  212 +```json
  213 +{
  214 + "query": "iph",
  215 + "language": "en",
  216 + "resolved_language": "en",
  217 + "suggestions": [
  218 + {
  219 + "text": "iphone 15",
  220 + "lang": "en",
  221 + "score": 8.31,
  222 + "rank_score": 6.72,
  223 + "sources": ["query_log", "qanchor"]
  224 + }
  225 + ],
  226 + "took_ms": 12
  227 +}
  228 +```
  229 +
  230 +删除项(明确不支持):
  231 +
  232 +- `with_results`
  233 +- `result_size`
  234 +- `products[]`
  235 +
  236 +---
  237 +
  238 +## 9. 观测与治理
  239 +
  240 +核心监控:
  241 +
  242 +- QPS、P50/P95/P99、错误率
  243 +- 空结果率(按语言、按租户)
  244 +- suggestion 覆盖率(top query 是否命中)
  245 +- 语言冲突率(log vs request_params)
  246 +- 噪声词比例、黑名单命中率
  247 +
  248 +治理机制:
  249 +
  250 +- 黑名单:强制下线
  251 +- 白名单:强制保留并可加权
  252 +- 最小热度阈值:低频垃圾词过滤
  253 +- 时间衰减:过期词自动下沉
  254 +
  255 +---
  256 +
  257 +## 10. 与官方最佳实践对齐(ES)
  258 +
  259 +本方案直接采用以下官方建议:
  260 +
  261 +1. `completion` 适合高性能自动补全,支持 `skip_duplicates` 与上下文过滤。
  262 +2. `search_as_you_type + bool_prefix` 是官方推荐的 as-you-type 查询方式。
  263 +3. `edge_ngram` 仅用于索引时分词,查询时应用普通 analyzer(`search_analyzer`)。
  264 +4. 多语言场景使用 ICU Analysis 插件增强 Unicode 处理。
  265 +5. 通过 `routing` 将租户请求路由到单分片,降低 fan-out。
  266 +
  267 +---
  268 +
  269 +## 11. 分阶段落地
  270 +
  271 +1. Phase 1(本次):去除结果直达,稳定 Suggest 单能力
  272 +2. Phase 2:流式增量构建 + alias 原子发布
  273 +3. Phase 3:行为信号排序(CTR/CVR)+ 运营治理台
  274 +4. Phase 4:大租户独享索引自动升降级
  275 +
  276 +---
  277 +
  278 +## 12. Phase 2 落地命令(当前仓库)
  279 +
  280 +全量重建(版本化索引 + alias 发布):
  281 +
  282 +```bash
  283 +python main.py build-suggestions \
  284 + --tenant-id 162 \
  285 + --mode full \
  286 + --days 365 \
  287 + --publish-alias \
  288 + --keep-versions 2
  289 +```
  290 +
  291 +增量更新(基于 watermark):
  292 +
  293 +```bash
  294 +python main.py build-suggestions \
  295 + --tenant-id 162 \
  296 + --mode incremental \
  297 + --overlap-minutes 30
  298 +```
  299 +
  300 +一键脚本(全量 + 增量 + ES/API 验证):
  301 +
  302 +```bash
  303 +./scripts/rebuild_suggestions.sh 162
  304 +```
... ...
suggestion/README.md
1   -# Suggestion 设计文档
  1 +# Suggestion 模块说明(统一入口)
2 2  
3   -## 文档导航
  3 +本文档是 suggestion 模块的统一入口,遵循 `docs/DEVELOPER_GUIDE.md` 的“单一入口、避免分叉”原则。
4 4  
5   -- `README.md`(本文):完整方案设计(架构、索引、构建、查询、验证)
6   -- `RUNBOOK.md`:日常运行手册(如何构建、如何回归、如何发布)
7   -- `TROUBLESHOOTING.md`:故障排查手册(空结果、tenant 丢失、ES 401、版本未生效等)
  5 +## 1. 当前状态(Phase 2)
8 6  
9   -本文档定义 `search_suggestions` 独立索引方案,用于支持多语言自动补全(suggestion)与结果直达。
  7 +- 仅保留 Suggest 自动补全能力
  8 +- 不支持结果直达(`with_results` / `result_size` / `products[]` 已移除)
  9 +- 索引采用版本化发布:
  10 + - 物理索引:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}_v<timestamp>`
  11 + - 读别名:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}_current`
  12 +- 支持增量更新(watermark + overlap)
10 13  
11   -## 1. 背景与目标
  14 +## 2. 文档导航(唯一推荐顺序)
12 15  
13   -当前搜索系统已具备多语言商品索引(`title.{lang}`、`qanchors.{lang}`)与主搜索能力。为了实现输入中实时下拉 suggestion,需要新增一套面向“词”的能力。
  16 +1. `ARCHITECTURE_V2.md`:架构与设计原则
  17 +2. `RUNBOOK.md`:构建/发布/验证流程
  18 +3. `TROUBLESHOOTING.md`:常见问题排查
14 19  
15   -核心目标:
  20 +## 3. 命令入口
16 21  
17   -- 在不耦合主搜索链路的前提下,提供低延迟 suggestion(实时输入)。
18   -- 支持多语言,按请求语言路由到对应 suggestion 语种。
19   -- 支持“结果直达”:每条 suggestion 可附带候选商品列表(通过二次查询 `search_products` 完成)。
20   -- 支持后续词级排序演进(行为信号、运营控制、去噪治理)。
21   -
22   -非目标(当前阶段):
23   -
24   -- 不做个性化推荐(用户级 personalization)。
25   -- 不引入复杂在线学习排序服务。
26   -
27   -## 2. 总体架构
28   -
29   -采用双索引架构(支持多环境 namespace 前缀):
30   -
31   -- 商品索引:`{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}`
32   -- 建议词索引:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}`
33   -
34   -在线查询主路径:
35   -
36   -1. 仅查询 `{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}` 得到 suggestion 列表。
37   -2. 对每条 suggestion 进行“结果直达”的二次查询(`msearch`)到 `{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}`:
38   - - 使用 suggestion 文本对 `title.{lang}` / `qanchors.{lang}` 执行 `term` / `match_phrase_prefix` 组合查询。
39   -3. 回填每条 suggestion 的商品卡片列表(例如每条 3~5 个)。
40   -
41   -## 3. API 设计
42   -
43   -建议保留并增强现有接口:`GET /search/suggestions`
44   -
45   -### 3.1 请求参数
46   -
47   -- `q` (string, required): 用户输入前缀
48   -- `size` (int, optional, default=10, max=20): 返回 suggestion 数量
49   -- `language` (string, required): 请求语言(如 `zh`, `en`, `ar`, `ru`)
50   -- `with_results` (bool, optional, default=true): 是否附带每条 suggestion 的直达商品
51   -- `result_size` (int, optional, default=3, max=10): 每条 suggestion 附带商品条数
52   -- `debug` (bool, optional, default=false): 是否返回调试信息
53   -
54   -Header:
55   -
56   -- `X-Tenant-ID` (required)
57   -
58   -### 3.2 响应结构
59   -
60   -```json
61   -{
62   - "query": "iph",
63   - "language": "en",
64   - "suggestions": [
65   - {
66   - "text": "iphone 15",
67   - "lang": "en",
68   - "score": 12.37,
69   - "sources": ["query_log", "qanchor"],
70   - "products": [
71   - {
72   - "spu_id": "12345",
73   - "title": "iPhone 15 Pro Max",
74   - "price": 999.0,
75   - "image_url": "https://..."
76   - }
77   - ]
78   - }
79   - ],
80   - "took_ms": 14,
81   - "debug_info": {}
82   -}
83   -```
84   -
85   -## 4. 索引设计:`search_suggestions_tenant_{tenant_id}`
86   -
87   -文档粒度:`tenant_id + lang + text_norm` 唯一一条文档。
88   -
89   -### 4.1 字段定义(建议)
90   -
91   -- `tenant_id` (`keyword`)
92   -- `lang` (`keyword`)
93   -- `text` (`keyword`):展示文本
94   -- `text_norm` (`keyword`):归一化文本(去重键)
95   -- `sources` (`keyword[]`):来源集合,取值:`title` / `qanchor` / `query_log`
96   -- `title_doc_count` (`integer`):来自 title 的命中文档数
97   -- `qanchor_doc_count` (`integer`):来自 qanchor 的命中文档数
98   -- `query_count_7d` (`integer`):7 天搜索词计数
99   -- `query_count_30d` (`integer`):30 天搜索词计数
100   -- `rank_score` (`float`):离线计算总分
101   -- `status` (`byte`):1=online, 0=offline
102   -- `updated_at` (`date`)
103   -
104   -用于召回:
105   -
106   -- `completion` (`object`):
107   - - `completion.{lang}`: `completion` 类型(按语言设置 analyzer)
108   -- `sat` (`object`):
109   - - `sat.{lang}`: `search_as_you_type`(增强多词前缀效果)
110   -
111   -可选字段(用于加速直达):
112   -
113   -- `top_spu_ids` (`keyword[]`):预计算商品候选 id
114   -
115   -### 4.2 Mapping 样例(简化)
116   -
117   -```json
118   -{
119   - "settings": {
120   - "number_of_shards": 1,
121   - "number_of_replicas": 0
122   - },
123   - "mappings": {
124   - "properties": {
125   - "tenant_id": { "type": "keyword" },
126   - "lang": { "type": "keyword" },
127   - "text": { "type": "keyword" },
128   - "text_norm": { "type": "keyword" },
129   - "sources": { "type": "keyword" },
130   - "title_doc_count": { "type": "integer" },
131   - "qanchor_doc_count": { "type": "integer" },
132   - "query_count_7d": { "type": "integer" },
133   - "query_count_30d": { "type": "integer" },
134   - "rank_score": { "type": "float" },
135   - "status": { "type": "byte" },
136   - "updated_at": { "type": "date" },
137   - "completion": {
138   - "properties": {
139   - "zh": { "type": "completion", "analyzer": "index_ansj", "search_analyzer": "query_ansj" },
140   - "en": { "type": "completion", "analyzer": "english" },
141   - "ar": { "type": "completion", "analyzer": "arabic" },
142   - "ru": { "type": "completion", "analyzer": "russian" }
143   - }
144   - },
145   - "sat": {
146   - "properties": {
147   - "zh": { "type": "search_as_you_type", "analyzer": "index_ansj" },
148   - "en": { "type": "search_as_you_type", "analyzer": "english" },
149   - "ar": { "type": "search_as_you_type", "analyzer": "arabic" },
150   - "ru": { "type": "search_as_you_type", "analyzer": "russian" }
151   - }
152   - },
153   - "top_spu_ids": { "type": "keyword" }
154   - }
155   - }
156   -}
157   -```
158   -
159   -说明:实际支持语种需与 `search_products` 已支持语种保持一致。
160   -
161   -## 5. 全量建索引逻辑(核心)
162   -
163   -全量程序职责:扫描商品 `title/qanchors` 与搜索日志 `query`,聚合后写入 `search_suggestions`。
164   -
165   -输入:
166   -
167   -- `{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}` 文档
168   -- MySQL 表:`shoplazza_search_log`
169   -
170   -输出:
171   -
172   -- `{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}` 全量文档
173   -
174   -### 5.1 流程
175   -
176   -1. 创建/重建 `{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}`。
177   -2. 遍历 `{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}`(`scroll` 或 `search_after`):
178   - - 提取每个商品的 `title.{lang}`、`qanchors.{lang}`。
179   - - 归一化文本(NFKC、trim、lower、空白折叠)。
180   - - 产出候选词并累加:
181   - - `title_doc_count += 1`
182   - - `qanchor_doc_count += 1`
183   - - `sources` 加来源。
184   -3. 读取日志:
185   - - SQL 拉取 `tenant_id` 下时间窗数据(如 30 天)。
186   - - 对每条 `query` 解析语言归属(优先 `shoplazza_search_log.language`,其次 `request_params.language`,见第 6 节)。
187   - - 累加 `query_count_7d` / `query_count_30d`,`sources` 加 `query_log`。
188   -4. 清洗与过滤:
189   - - 去空、去纯符号、长度阈值过滤。
190   - - 可选黑名单过滤(运营配置)。
191   -5. 计算 `rank_score`(见第 7 节)。
192   -6. 组装文档:
193   - - 写 `completion.{lang}` + `sat.{lang}`。
194   - - `_id = md5(tenant_id|lang|text_norm)`。
195   -7. 批量写入(bulk upsert)。
196   -
197   -### 5.2 伪代码
198   -
199   -```python
200   -for tenant_id in tenants:
201   - agg = {} # key: (lang, text_norm)
202   -
203   - for doc in scan_es_products(tenant_id):
204   - for lang in index_languages(tenant_id):
205   - add_from_title(agg, doc.title.get(lang), lang, doc.spu_id)
206   - add_from_qanchor(agg, doc.qanchors.get(lang), lang, doc.spu_id)
207   -
208   - for row in fetch_search_logs(tenant_id, days=30):
209   - lang, conf = resolve_query_lang(
210   - query=row.query,
211   - log_language=row.language,
212   - request_params_json=row.request_params,
213   - tenant_id=tenant_id
214   - )
215   - if not lang:
216   - continue
217   - add_from_query_log(agg, row.query, lang, row.create_time)
218   -
219   - docs = []
220   - for (lang, text_norm), item in agg.items():
221   - if not pass_filters(item):
222   - continue
223   - item.rank_score = compute_rank_score(item)
224   - docs.append(to_suggestion_doc(tenant_id, lang, item))
225   -
226   - bulk_upsert(index=f"{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}", docs=docs)
227   -```
228   -
229   -## 6. 日志语言解析策略(已新增 language 字段)
230   -
231   -现状:`shoplazza_search_log` 已新增 `language` 字段,且 `request_params`(JSON)中也包含 `language`。
232   -因此全量程序不再以“纯离线识别”为主,而是采用“日志显式语言优先”的三级策略。
233   -
234   -### 6.1 语言解析优先级
235   -
236   -1. **一级:`shoplazza_search_log.language`(最高优先级)**
237   - - 若值存在且合法,直接作为 query 归属语言。
238   -2. **二级:`request_params.language`(JSON 兜底)**
239   - - 当表字段为空/非法时,解析 `request_params` JSON 中的 `language`。
240   -3. **三级:离线识别(最后兜底)**
241   - - 仅在前两者都缺失时启用:
242   - - 脚本直判(CJK/Arabic/Cyrillic)
243   - - 轻量语言识别器(拉丁语)
244   -
245   -### 6.2 一致性校验(推荐)
246   -
247   -当 `shoplazza_search_log.language` 与 `request_params.language` 同时存在但不一致时:
248   -
249   -- 默认采用 `shoplazza_search_log.language`
250   -- 记录 `lang_conflict=true` 用于审计
251   -- 输出监控指标(冲突率)
252   -
253   -### 6.3 置信度与约束
254   -
255   -对于一级/二级来源:
256   -
257   -- `lang_confidence=1.0`
258   -- `lang_source=log_field` 或 `lang_source=request_params`
259   -
260   -对于三级离线识别:
261   -
262   -- `confidence >= 0.8`:写入 top1
263   -- `0.5 <= confidence < 0.8`:写入 top1(必要时兼容 top2 降权)
264   -- `< 0.5`:写入租户 `primary_language`(降权)
265   -
266   -统一约束:
267   -
268   -- 最终写入语言必须属于租户 `index_languages`
269   -
270   -建议额外存储:
271   -
272   -- `lang_confidence`(float)
273   -- `lang_source`(`log_field`/`request_params`/`script`/`model`/`default`)
274   -- `lang_conflict`(bool)
275   -
276   -便于后续质量审计与数据回溯。
277   -
278   -## 7. 排序分数设计(离线)
279   -
280   -建议采用可解释线性组合:
281   -
282   -```text
283   -rank_score =
284   - w1 * log1p(query_count_30d)
285   - + w2 * log1p(query_count_7d)
286   - + w3 * log1p(qanchor_doc_count)
287   - + w4 * log1p(title_doc_count)
288   - + w5 * business_bonus
289   -```
290   -
291   -推荐初始权重(可配置):
292   -
293   -- `w1=1.8`, `w2=1.2`, `w3=1.0`, `w4=0.6`, `w5=0.3`
294   -
295   -说明:
296   -
297   -- 搜索日志信号优先级最高(最接近真实用户意图)。
298   -- `qanchor` 高于 `title`(更偏 query 风格)。
299   -- `business_bonus` 可接入销量、库存可售率等轻量业务信号。
300   -
301   -## 8. 在线查询逻辑(suggestion)
302   -
303   -主路径只查 `search_suggestions`。
304   -
305   -### 8.1 Suggestion 查询 DSL(示例)
306   -
307   -```json
308   -{
309   - "size": 10,
310   - "query": {
311   - "function_score": {
312   - "query": {
313   - "bool": {
314   - "filter": [
315   - { "term": { "lang": "en" } },
316   - { "term": { "status": 1 } }
317   - ],
318   - "should": [
319   - {
320   - "multi_match": {
321   - "query": "iph",
322   - "type": "bool_prefix",
323   - "fields": [
324   - "sat.en",
325   - "sat.en._2gram",
326   - "sat.en._3gram"
327   - ]
328   - }
329   - }
330   - ],
331   - "minimum_should_match": 1
332   - }
333   - },
334   - "field_value_factor": {
335   - "field": "rank_score",
336   - "factor": 1.0,
337   - "modifier": "log1p",
338   - "missing": 0
339   - },
340   - "boost_mode": "sum",
341   - "score_mode": "sum"
342   - }
343   - },
344   - "_source": [
345   - "text",
346   - "lang",
347   - "rank_score",
348   - "sources",
349   - "top_spu_ids"
350   - ]
351   -}
352   -```
353   -
354   -可选:completion 方式(极低延迟)也可作为同接口内另一条召回通道,再与上面结果融合去重。
355   -
356   -## 9. 结果直达(二次查询)
357   -
358   -`with_results=true` 时,对每条 suggestion 的 `text` 做二次查询到 `search_products_tenant_{tenant_id}`。
359   -
360   -推荐使用 `msearch`,每条 suggestion 一个子查询:
361   -
362   -- `term`(精确)命中 `qanchors.{lang}.keyword`(若存在 keyword 子字段)
363   -- `match_phrase_prefix` 命中 `title.{lang}`
364   -- 可加权:`qanchors` 命中权重高于 `title`
365   -- 每条 suggestion 返回 `result_size` 条商品
366   -
367   -若未来希望进一步降在线复杂度,可改为离线写入 `top_spu_ids` 并在在线用 `mget` 回填。
368   -
369   -## 10. 数据治理与运营控制
370   -
371   -建议加入以下机制:
372   -
373   -- 黑名单词:人工屏蔽垃圾词、敏感词
374   -- 白名单词:活动词、品牌词强制保留
375   -- 最小阈值:低频词不过线(例如 `query_count_30d < 2` 且无 qanchor/title 支撑)
376   -- 去重规则:`text_norm` 维度强去重
377   -- 更新策略:每日全量 + 每小时增量(后续)
378   -
379   -## 11. 实施里程碑
380   -
381   -M1(快速上线):
382   -
383   -- 建 `search_suggestions` 索引
384   -- 全量程序:`title + qanchors + query_log`
385   -- `/search/suggestions` 仅查 suggestion,不带直达
386   -
387   -M2(增强):
388   -
389   -- 增加二次查询直达商品(`msearch`)
390   -- 引入语言置信度审计报表
391   -- 加黑白名单与去噪配置
392   -
393   -M3(优化):
394   -
395   -- completion + bool_prefix 双通道融合
396   -- 增量构建任务(小时级)
397   -- 排序参数在线配置化
398   -
399   -## 12. 关键风险与规避
400   -
401   -- 日志语言字段质量问题导致错写:通过 `log_field > request_params > model` 三级策略与冲突审计规避
402   -- 高频噪声词上浮:黑名单 + 最小阈值 + 分数截断
403   -- 直达二次查询成本上升:控制 `size/result_size`,优先 `msearch`
404   -- 多语言字段不一致:统一语言枚举与映射生成逻辑,避免手写散落
405   -
406   ----
407   -
408   -## 13. 实验与验证建议
409   -
410   -以租户 `tenant_id=171` 为例,推荐如下验证流程(其它租户 / 环境同理,可通过 ES_INDEX_NAMESPACE 区分 prod / uat / test):
411   -
412   -### 13.1 构建索引
413   -
414   -```bash
415   -./scripts/build_suggestions.sh 171 --days 30 --recreate
416   -```
417   -
418   -期望 CLI 输出类似(prod 环境,ES_INDEX_NAMESPACE 为空):
419   -
420   -```json
421   -{
422   - "tenant_id": "171",
423   - "index_name": "search_suggestions_tenant_171",
424   - "total_candidates": 61,
425   - "indexed_docs": 61,
426   - "bulk_result": {
427   - "success": 61,
428   - "failed": 0,
429   - "errors": []
430   - }
431   -}
432   -```
433   -
434   -含义:
435   -
436   -- `total_candidates`:聚合到的词候选总数(按 `(lang,text_norm)` 去重)
437   -- `indexed_docs`:实际写入 ES 的文档数(通常与 `total_candidates` 相同)
438   -- `bulk_result`:bulk 写入统计
439   -
440   -### 13.2 检查索引结构
441   -
442   -```bash
443   -# prod / 本地环境:ES_INDEX_NAMESPACE 为空
444   -curl "http://localhost:9200/search_suggestions_tenant_171/_mapping?pretty"
445   -curl "http://localhost:9200/search_suggestions_tenant_171/_count?pretty"
446   -curl "http://localhost:9200/search_suggestions_tenant_171/_search?size=5&pretty" -d '{
447   - "query": { "match_all": {} }
448   -}'
449   -
450   -# UAT 环境:假设 ES_INDEX_NAMESPACE=uat_
451   -curl "http://localhost:9200/uat_search_suggestions_tenant_171/_mapping?pretty"
452   -curl "http://localhost:9200/uat_search_suggestions_tenant_171/_count?pretty"
453   -curl "http://localhost:9200/uat_search_suggestions_tenant_171/_search?size=5&pretty" -d '{
454   - "query": { "match_all": {} }
455   -}'
456   -```
457   -
458   -重点确认:
459   -
460   -- 是否存在 `lang/text/text_norm/sources/rank_score/completion/sat` 等字段。
461   -- 文档中 `lang` 是否只落在租户配置的 `index_languages` 范围内。
462   -- 常见 query(如你期望的热词)是否有对应文档,`query_count_*` 是否大致正确。
463   -
464   -### 13.3 通过 API 验证 suggestion 行为
465   -
466   -启动后端:
467   -
468   -```bash
469   -python main.py serve --es-host http://localhost:9200 --port 6002
470   -```
471   -
472   -示例调用(中文):
  22 +- 全量或增量构建:
473 23  
474 24 ```bash
475   -curl "http://localhost:6002/search/suggestions?q=玩具&size=5&language=zh&with_results=true" \
476   - -H "X-Tenant-ID: 171"
  25 +./scripts/build_suggestions.sh <tenant_id> --mode full
  26 +./scripts/build_suggestions.sh <tenant_id> --mode incremental
477 27 ```
478 28  
479   -示例调用(英文)
  29 +- 一键重建 + 验证:
480 30  
481 31 ```bash
482   -curl "http://localhost:6002/search/suggestions?q=iph&size=5&language=en&with_results=true" \
483   - -H "X-Tenant-ID: 171"
  32 +./scripts/rebuild_suggestions.sh <tenant_id>
484 33 ```
485 34  
486   -预期:
487   -
488   -- `resolved_language` 与传入 `language` 一致或回落到租户主语言。
489   -- 返回若干 `suggestions[]`,每条包含:
490   - - `text/lang/score/rank_score/sources`
491   - - `products[]` 为直达商品(数量由 `result_size` 控制)。
492   -
493   -如需进一步排查,可对比:
494   -
495   -- 某个 suggestion 的 `text` 与 `shoplazza_search_log.query` 的出现频次。
496   -- 该 suggestion 的 `products` 是否与主搜索接口 `POST /search/` 对同 query 的 topN 结果大体一致。
497   -
498   -### 13.4 语言归属与多语言检查
499   -
500   -挑选典型场景:
  35 +## 4. API 约定(简版)
501 36  
502   -- 纯中文 query(如商品中文标题)。
503   -- 纯英文 query(如品牌/型号)。
504   -- 混合或无明显语言的 query。
  37 +- 端点:`GET /search/suggestions`
  38 +- 参数:`q`, `size`, `language`
  39 +- Header:`X-Tenant-ID`
505 40  
506   -验证点:
507   -
508   -- 文档 `lang` 与期望语言是否匹配。
509   -- `lang_source` 是否按优先级反映来源:
510   - - `log_field` > `request_params` > `script/model/default`
511   -- 如存在 `lang_conflict=true` 的案例,采样检查日志中 `language` 与 `request_params.language` 是否存在冲突。
512   -
513   -## 14. 自动化测试建议
514   -
515   -已提供基础单元测试(见 `tests/test_suggestions.py`):
516   -
517   -- 语言解析逻辑:
518   - - `test_resolve_query_language_prefers_log_field`
519   - - `test_resolve_query_language_uses_request_params_when_log_missing`
520   - - `test_resolve_query_language_fallback_to_primary`
521   -- 在线查询逻辑:
522   - - `test_suggestion_service_basic_flow`:使用 `FakeESClient` 验证 suggestion + 结果直达商品整体流程。
523   -
524   -推荐在本地环境中执行:
  41 +示例:
525 42  
526 43 ```bash
527   -pytest tests/test_suggestions.py -q
  44 +curl "http://localhost:6002/search/suggestions?q=shi&size=10&language=en" \
  45 + -H "X-Tenant-ID: 162"
528 46 ```
529   -
530   -后续可根据业务需要补充:
531   -
532   -- 排序正确性测试(构造不同 `query_count_*`、`title/qanchor_doc_count`)。
533   -- 多语言覆盖测试(zh/en/ar/ru 等,结合租户 `index_languages`)。
534   -- 简单性能回归(单次查询时延、QPS 与 P95/P99 录制)。
535   -
536   -本设计优先保证可落地与可演进:先以独立 suggestion 索引跑通主能力,再逐步增强排序与在线性能。
... ...
suggestion/RUNBOOK.md
1   -# Suggestion 运行手册(Runbook
  1 +# Suggestion 运行手册(Phase 2)
2 2  
3   -本文档面向研发/测试/运维,提供 suggestion 功能的标准操作流程。
  3 +本文档面向研发/测试/运维,提供 Suggestion 功能在 Phase 2 的标准操作流程。
4 4  
5 5 ## 1. 适用范围
6 6  
7   -- Suggestion 索引构建:`search_suggestions_tenant_{tenant_id}`
  7 +- Suggestion 全量构建(版本化索引 + alias 原子发布)
  8 +- Suggestion 增量更新(watermark)
8 9 - Suggestion 查询接口:`GET /search/suggestions`
9   -- 前端自动补全(`frontend/index.html`)联调
10 10  
11   -## 2. 依赖前置
  11 +## 2. 前置依赖
12 12  
13 13 确保以下服务和配置可用:
14 14  
15 15 - Elasticsearch(开启鉴权时需提供账号密码)
16   -- MySQL(表 `shoplazza_search_log` 可访问)
17   -- API 服务(端口默认 6002)
  16 +- MySQL(可访问 `shoplazza_search_log`)
  17 +- API 服务(默认 `6002`)
18 18  
19 19 建议环境变量:
20 20  
... ... @@ -29,108 +29,103 @@ DB_USERNAME=...
29 29 DB_PASSWORD=...
30 30 ```
31 31  
32   -## 3. 全量构建流程
  32 +## 3. 全量构建(推荐发布流程)
33 33  
34   -### 3.1 构建指定租户 suggestion 索引
  34 +### 3.1 执行
35 35  
36 36 ```bash
37   -./scripts/build_suggestions.sh 171 --days 365 --recreate
  37 +./scripts/build_suggestions.sh 162 \
  38 + --mode full \
  39 + --days 365 \
  40 + --publish-alias \
  41 + --keep-versions 2
38 42 ```
39 43  
40   -说明:
41   -
42   -- `--days`:日志回溯窗口
43   -- `--recreate`:删除旧索引并重建
44   -
45 44 ### 3.2 预期输出
46 45  
47   -示例:
48   -
49   -```json
50   -{
51   - "tenant_id": "171",
52   - "index_name": "search_suggestions_tenant_171",
53   - "total_candidates": 336,
54   - "indexed_docs": 336,
55   - "bulk_result": {
56   - "success": 336,
57   - "failed": 0,
58   - "errors": []
59   - }
60   -}
61   -```
62   -
63   -判定标准:
  46 +输出包含关键字段:
64 47  
65   -- `indexed_docs > 0`
66   -- `bulk_result.failed = 0`
  48 +- `mode=full`
  49 +- `index_name=..._vYYYYMMDDHHMMSS`
  50 +- `alias_published=true`
  51 +- `alias_publish.current_index` 指向新索引
  52 +- `bulk_result.failed=0`
67 53  
68   -## 4. ES 验证步骤
  54 +## 4. 增量更新(watermark)
69 55  
70   -> 若 ES 开启鉴权,请使用 `-u "$ES_USERNAME:$ES_PASSWORD"`。
  56 +### 4.1 执行
71 57  
72 58 ```bash
73   -curl -u "$ES_USERNAME:$ES_PASSWORD" \
74   - "$ES_HOST/search_suggestions_tenant_171/_count?pretty"
75   -
76   -curl -u "$ES_USERNAME:$ES_PASSWORD" \
77   - "$ES_HOST/search_suggestions_tenant_171/_mapping?pretty"
78   -
79   -curl -u "$ES_USERNAME:$ES_PASSWORD" \
80   - "$ES_HOST/search_suggestions_tenant_171/_search?pretty" -d '{
81   - "size": 10,
82   - "query": {"match_all": {}},
83   - "_source": ["lang","text","sources","query_count_30d","rank_score"]
84   - }'
  59 +./scripts/build_suggestions.sh 162 \
  60 + --mode incremental \
  61 + --overlap-minutes 30
85 62 ```
86 63  
87   -重点检查:
  64 +### 4.2 预期输出
  65 +
  66 +输出包含关键字段:
88 67  
89   -- 字段是否齐全(`lang/text/sat/completion/rank_score`)
90   -- 文档是否覆盖预期语种(如 `zh/en`)
  68 +- `mode=incremental`
  69 +- `target_index`(当前 alias 对应索引)
  70 +- `query_window.since/until`
  71 +- `updated_terms`
  72 +- `bulk_result.failed=0`
91 73  
92   -## 5. API 回归步骤
  74 +## 5. ES 验证步骤
93 75  
94   -### 5.1 启动后端
  76 +> 若 ES 开启鉴权,请附带 `-u "$ES_USERNAME:$ES_PASSWORD"`。
95 77  
96 78 ```bash
97   -bash scripts/start_backend.sh
  79 +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_162_current"
  80 +
  81 +curl "$ES_HOST/$ALIAS_NAME/_count?pretty"
  82 +
  83 +curl "$ES_HOST/$ALIAS_NAME/_search?pretty" -H 'Content-Type: application/json' -d '{
  84 + "size": 10,
  85 + "query": {"match_all": {}},
  86 + "_source": ["lang", "text", "rank_score", "sources", "query_count_30d"]
  87 +}'
98 88 ```
99 89  
100   -### 5.2 调用 suggestion 接口
  90 +重点检查:
  91 +
  92 +- alias 是否可查(说明发布成功)
  93 +- 文档数 > 0
  94 +- 关键字段存在:`lang/text/rank_score/completion/sat`
  95 +
  96 +## 6. API 回归步骤
101 97  
102 98 ```bash
103   -curl "http://localhost:6002/search/suggestions?q=shirt&size=5&language=en&with_results=false" \
104   - -H "X-Tenant-ID: 171"
  99 +curl "http://localhost:6002/search/suggestions?q=shirt&size=10&language=en" \
  100 + -H "X-Tenant-ID: 162"
105 101  
106   -curl "http://localhost:6002/search/suggestions?q=2025&size=5&language=zh&with_results=false" \
107   - -H "X-Tenant-ID: 171"
  102 +curl "http://localhost:6002/search/suggestions?q=玩具&size=10&language=zh" \
  103 + -H "X-Tenant-ID: 162"
108 104 ```
109 105  
110 106 通过标准:
111 107  
112 108 - 接口返回 `200`
113   -- `resolved_language` 合理
114   -- `suggestions` 非空(针对已知存在的 query)
  109 +- `suggestions` 非空(针对已知存在 query)
  110 +- `took_ms` 合理
115 111  
116   -## 6. 前端联调步骤
  112 +## 7. 一键验证脚本
117 113  
118   -1. 打开 `http://localhost:6002/`
119   -2. 选择租户(例如 `171`)
120   -3. 输入已知前缀词(如 `shirt` / `Ekouaer` / `2025`)
121   -4. 观察下拉 suggestion 是否出现
122   -
123   -注意:
  114 +```bash
  115 +./scripts/rebuild_suggestions.sh 162
  116 +```
124 117  
125   -- 前端已同时透传:
126   - - Header:`X-Tenant-ID`
127   - - Query:`tenant_id`
  118 +该脚本执行:
128 119  
129   -## 7. 发布检查清单
  120 +1. 全量重建并发布 alias
  121 +2. 增量更新
  122 +3. ES `_count` 与样例 `_search`
  123 +4. API `/search/suggestions` 冒烟请求
130 124  
131   -- [ ] 全量构建输出 `failed=0`
132   -- [ ] ES `_count` 与 `indexed_docs` 一致
133   -- [ ] 关键 query(中/英)接口有返回
134   -- [ ] 前端下拉正常
135   -- [ ] 文档已更新(`README.md` / 本 Runbook / API 指南)
  125 +## 8. 发布检查清单
136 126  
  127 +- [ ] 全量构建 `bulk_result.failed=0`
  128 +- [ ] alias 指向新版本索引
  129 +- [ ] 增量更新成功(`mode=incremental`)
  130 +- [ ] API 冒烟通过
  131 +- [ ] 文档与脚本已同步
... ...
suggestion/TROUBLESHOOTING.md
1   -# Suggestion 故障排查手册
  1 +# Suggestion 故障排查手册(Phase 2)
2 2  
3   -本文档汇总 suggestion 常见问题与定位步骤。
4   -
5   -## 1. `suggestions` 总是空数组
6   -
7   -### 现象
8   -
9   -```json
10   -{"query":"shirt","language":"en","resolved_language":"en","suggestions":[],"took_ms":0}
11   -```
  3 +## 1. `/search/suggestions` 总是空数组
12 4  
13 5 ### 排查步骤
14 6  
15   -1. 确认索引存在且有数据:
  7 +1. 检查 alias 是否存在并有数据:
16 8  
17 9 ```bash
18   -curl -u "$ES_USERNAME:$ES_PASSWORD" \
19   - "$ES_HOST/search_suggestions_tenant_171/_count?pretty"
  10 +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_162_current"
  11 +curl "$ES_HOST/$ALIAS_NAME/_count?pretty"
20 12 ```
21 13  
22   -2. 直接查 suggestion 索引样本
  14 +2. 直接抽样查看:
23 15  
24 16 ```bash
25   -curl -u "$ES_USERNAME:$ES_PASSWORD" \
26   - "$ES_HOST/search_suggestions_tenant_171/_search?pretty" -d '{
27   - "size": 20,
28   - "query": {"match_all": {}},
29   - "_source": ["lang","text","rank_score"]
30   - }'
  17 +curl "$ES_HOST/$ALIAS_NAME/_search?pretty" -H 'Content-Type: application/json' -d '{
  18 + "size": 20,
  19 + "query": {"match_all": {}},
  20 + "_source": ["lang", "text", "rank_score", "query_count_30d"]
  21 +}'
31 22 ```
32 23  
33   -3. 确认请求语种是否匹配(`language=en` 时,索引里应有 `lang=en` 文档)。
34   -
35   -4. 检查服务版本是否为最新(重启后端):
  24 +3. 确认请求租户和语言:
36 25  
37 26 ```bash
38   -bash scripts/stop.sh
39   -bash scripts/start_backend.sh
  27 +curl "http://localhost:6002/search/suggestions?q=shirt&size=10&language=en" \
  28 + -H "X-Tenant-ID: 162"
40 29 ```
41 30  
42   -### 已修复的历史问题
43   -
44   -- **重复传 `size` 导致 ES 查询异常并被吞掉**:
45   - - 症状:日志里出现 `Received multiple values for 'size'`
46   - - 结果:接口返回空 hits(看起来像“无数据”)
47   - - 处理:确保 query body 不再携带 `size`,仅通过 client 参数传 `size`
48   -
49 31 ## 2. 报错:`tenant_id is required`
50 32  
51   -### 现象
52   -
53   -```json
54   -{
55   - "error": "tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'",
56   - "status_code": 400
57   -}
58   -```
59   -
60   -### 原因
61   -
62   -- 请求缺少 `X-Tenant-ID`,且 URL 没有 `tenant_id`。
63   -
64   -### 处理
65   -
66   -- API 调用至少满足其一:
67   - - Header:`X-Tenant-ID: 171`
68   - - Query:`tenant_id=171`
  33 +请求缺少 `X-Tenant-ID`(或 query `tenant_id`)。
69 34  
70 35 示例:
71 36  
72 37 ```bash
73   -curl "http://localhost:6002/search/suggestions?q=shirt&size=5&language=en&with_results=false&tenant_id=171"
  38 +curl "http://localhost:6002/search/suggestions?q=shirt&size=10&language=en&tenant_id=162"
74 39 ```
75 40  
76   -## 3. ES 401:`missing authentication credentials`
  41 +## 3. 增量更新没有写入(`updated_terms=0`)
77 42  
78   -### 现象
  43 +### 常见原因
79 44  
80   -```json
81   -{
82   - "type":"security_exception",
83   - "reason":"missing authentication credentials ..."
84   -}
85   -```
86   -
87   -### 原因
88   -
89   -- ES 开启了安全认证,curl/脚本未带凭证。
  45 +- watermark 时间窗内没有新日志
  46 +- `overlap_minutes` 太小
  47 +- `min_query_len` 过滤过严
90 48  
91 49 ### 处理
92 50  
93 51 ```bash
94   -curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST/search_suggestions_tenant_171/_mapping?pretty"
  52 +./scripts/build_suggestions.sh 162 --mode incremental --overlap-minutes 60
95 53 ```
96 54  
97   -或使用 API Key:
  55 +并在 MySQL 中确认窗口内日志存在。
98 56  
99   -```bash
100   -curl -H "Authorization: ApiKey <base64_key>" "$ES_HOST/search_suggestions_tenant_171/_mapping?pretty"
101   -```
102   -
103   -## 4. 构建脚本报 `Cannot connect to Elasticsearch`
104   -
105   -### 原因
106   -
107   -- ES 地址不对,或账号密码未配置,或网络不可达。
  57 +## 4. alias 未切到新索引
108 58  
109 59 ### 检查
110 60  
111 61 ```bash
112   -echo "$ES_HOST"
113   -echo "$ES_USERNAME"
114   -curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST"
  62 +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_162_current"
  63 +curl "$ES_HOST/_alias/$ALIAS_NAME?pretty"
115 64 ```
116 65  
117   -## 5. 前端请求未携带租户参数
118   -
119   -### 现象
120   -
121   -- Network 中请求 URL 无 `tenant_id`
122   -- Header 里无 `X-Tenant-ID`
123   -
124 66 ### 处理
125 67  
126   -- 确认前端最新代码已生效(清缓存后强刷)。
127   -- 前端应同时透传:
128   - - `X-Tenant-ID`
129   - - `tenant_id` query 参数(兜底,避免代理丢 header)
130   -
131   -## 6. 关键 query(如 `shirt`)没有被索引
  68 +重新执行全量发布:
132 69  
133   -### 检查路径
  70 +```bash
  71 +./scripts/build_suggestions.sh 162 --mode full --publish-alias --keep-versions 2
  72 +```
134 73  
135   -1. MySQL 里确认日志存在并在回溯窗口内:
  74 +## 5. ES 401:`missing authentication credentials`
136 75  
137   -```sql
138   -SELECT query, language, create_time
139   -FROM shoplazza_search_log
140   -WHERE tenant_id = 171
141   - AND query = 'shirt'
142   -ORDER BY create_time DESC
143   -LIMIT 20;
144   -```
  76 +ES 开启鉴权时,所有 curl 都需要 `-u "$ES_USERNAME:$ES_PASSWORD"` 或 API Key。
145 77  
146   -2. 构建命令是否使用足够大的 `--days`(例如 365)。
147   -3. 检查 query 是否被清洗规则过滤(空白/符号/过长等)。
  78 +## 6. 构建脚本报 `Cannot connect to Elasticsearch`
148 79  
149   -## 7. `Invalid HTTP request received.`
  80 +检查 `ES_HOST`、账号密码、网络连通性:
150 81  
151   -### 原因
  82 +```bash
  83 +echo "$ES_HOST"
  84 +curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST"
  85 +```
152 86  
153   -- 6002 端口上跑的进程不是当前 FastAPI 服务,或请求协议与服务不匹配。
  87 +## 7. 首次增量执行失败:找不到 active index
154 88  
155   -### 处理
  89 +说明该租户尚未完成全量构建。可直接启用 bootstrap(默认开启):
156 90  
157 91 ```bash
158   -bash scripts/stop.sh
159   -bash scripts/start_backend.sh
160   -curl "http://localhost:6002/health"
  92 +./scripts/build_suggestions.sh 162 --mode incremental --bootstrap-if-missing
161 93 ```
162 94  
163   -若 `/health` 正常,再测试 `/search/suggestions`。
164   -
  95 +或先执行一次全量。
... ...
suggestion/builder.py
1 1 """
2   -Full suggestion index builder.
  2 +Suggestion index builder (Phase 2).
3 3  
4   -Build data from:
5   -- ES product index fields: title.{lang}, qanchors.{lang}
6   -- MySQL search logs: shoplazza_search_log.query (+ language metadata)
  4 +Capabilities:
  5 +- Full rebuild to versioned index
  6 +- Atomic alias publish
  7 +- Incremental update from query logs with watermark
7 8 """
8 9  
9 10 import json
10 11 import logging
11 12 import math
12 13 import re
  14 +import unicodedata
13 15 from dataclasses import dataclass, field
14 16 from datetime import datetime, timedelta, timezone
15   -from typing import Any, Dict, List, Optional, Tuple
  17 +from typing import Any, Dict, Iterator, List, Optional, Tuple
16 18  
17 19 from sqlalchemy import text
18 20  
  21 +from config.env_config import ES_INDEX_NAMESPACE
19 22 from config.tenant_config_loader import get_tenant_config_loader
20   -from utils.es_client import ESClient
21 23 from suggestion.mapping import build_suggestion_mapping
22   -from config.env_config import ES_INDEX_NAMESPACE
  24 +from utils.es_client import ESClient
23 25  
24 26 logger = logging.getLogger(__name__)
25 27  
26 28  
  29 +def _index_prefix() -> str:
  30 + return ES_INDEX_NAMESPACE or ""
  31 +
  32 +
  33 +def get_suggestion_legacy_index_name(tenant_id: str) -> str:
  34 + """Legacy concrete index name (Phase1 compatibility)."""
  35 + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}"
  36 +
  37 +
  38 +def get_suggestion_alias_name(tenant_id: str) -> str:
  39 + """Read alias for suggestion index (Phase2 default search target)."""
  40 + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_current"
  41 +
  42 +
  43 +def get_suggestion_versioned_index_name(tenant_id: str, build_at: Optional[datetime] = None) -> str:
  44 + """Versioned suggestion index name."""
  45 + ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S")
  46 + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v{ts}"
  47 +
  48 +
  49 +def get_suggestion_versioned_index_pattern(tenant_id: str) -> str:
  50 + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v*"
  51 +
  52 +
  53 +def get_suggestion_meta_index_name() -> str:
  54 + return f"{_index_prefix()}search_suggestions_meta"
  55 +
  56 +
27 57 def get_suggestion_index_name(tenant_id: str) -> str:
28 58 """
29   - 生成 suggestion 索引名称。
30   -
31   - 命名规则:
32   - {ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}
  59 + Search target for suggestion query.
33 60  
34   - 通过 ES_INDEX_NAMESPACE 统一区分 prod/uat/test 等环境。
  61 + Phase2 uses alias by default.
35 62 """
36   - prefix = ES_INDEX_NAMESPACE or ""
37   - return f"{prefix}search_suggestions_tenant_{tenant_id}"
  63 + return get_suggestion_alias_name(tenant_id)
38 64  
39 65  
40 66 @dataclass
... ... @@ -50,17 +76,13 @@ class SuggestionCandidate:
50 76 lang_confidence: float = 1.0
51 77 lang_source: str = "default"
52 78 lang_conflict: bool = False
53   - top_spu_scores: Dict[str, float] = field(default_factory=dict)
54 79  
55   - def add_product(self, source: str, spu_id: str, score: float) -> None:
  80 + def add_product(self, source: str, spu_id: str) -> None:
56 81 self.sources.add(source)
57 82 if source == "title":
58 83 self.title_spu_ids.add(spu_id)
59 84 elif source == "qanchor":
60 85 self.qanchor_spu_ids.add(spu_id)
61   - prev = self.top_spu_scores.get(spu_id)
62   - if prev is None or score > prev:
63   - self.top_spu_scores[spu_id] = score
64 86  
65 87 def add_query_log(self, is_7d: bool) -> None:
66 88 self.sources.add("query_log")
... ... @@ -69,16 +91,39 @@ class SuggestionCandidate:
69 91 self.query_count_7d += 1
70 92  
71 93  
  94 +@dataclass
  95 +class QueryDelta:
  96 + tenant_id: str
  97 + lang: str
  98 + text: str
  99 + text_norm: str
  100 + delta_7d: int = 0
  101 + delta_30d: int = 0
  102 + lang_confidence: float = 1.0
  103 + lang_source: str = "default"
  104 + lang_conflict: bool = False
  105 +
  106 +
72 107 class SuggestionIndexBuilder:
73   - """Build and rebuild suggestion index."""
  108 + """Build and update suggestion index."""
74 109  
75 110 def __init__(self, es_client: ESClient, db_engine: Any):
76 111 self.es_client = es_client
77 112 self.db_engine = db_engine
78 113  
79 114 @staticmethod
  115 + def _to_utc(dt: Any) -> Optional[datetime]:
  116 + if dt is None:
  117 + return None
  118 + if isinstance(dt, datetime):
  119 + if dt.tzinfo is None:
  120 + return dt.replace(tzinfo=timezone.utc)
  121 + return dt.astimezone(timezone.utc)
  122 + return None
  123 +
  124 + @staticmethod
80 125 def _normalize_text(value: str) -> str:
81   - text_value = (value or "").strip().lower()
  126 + text_value = unicodedata.normalize("NFKC", (value or "")).strip().lower()
82 127 text_value = re.sub(r"\s+", " ", text_value)
83 128 return text_value
84 129  
... ... @@ -114,7 +159,6 @@ class SuggestionIndexBuilder:
114 159 token = str(lang).strip().lower().replace("-", "_")
115 160 if not token:
116 161 return None
117   - # en_us -> en, zh_cn -> zh, keep explicit zh_tw / pt_br
118 162 if token in {"zh_tw", "pt_br"}:
119 163 return token
120 164 return token.split("_")[0]
... ... @@ -138,19 +182,14 @@ class SuggestionIndexBuilder:
138 182  
139 183 @staticmethod
140 184 def _detect_script_language(query: str) -> Tuple[Optional[str], float, str]:
141   - # CJK unified
142 185 if re.search(r"[\u4e00-\u9fff]", query):
143 186 return "zh", 0.98, "script"
144   - # Arabic
145 187 if re.search(r"[\u0600-\u06FF]", query):
146 188 return "ar", 0.98, "script"
147   - # Cyrillic
148 189 if re.search(r"[\u0400-\u04FF]", query):
149 190 return "ru", 0.95, "script"
150   - # Greek
151 191 if re.search(r"[\u0370-\u03FF]", query):
152 192 return "el", 0.95, "script"
153   - # Latin fallback
154 193 if re.search(r"[a-zA-Z]", query):
155 194 return "en", 0.55, "model"
156 195 return None, 0.0, "default"
... ... @@ -186,32 +225,34 @@ class SuggestionIndexBuilder:
186 225 return primary, 0.3, "default", conflict
187 226  
188 227 @staticmethod
189   - def _score_product_hit(source: Dict[str, Any]) -> float:
190   - sales = float(source.get("sales") or 0.0)
191   - inventory = float(source.get("total_inventory") or 0.0)
192   - return math.log1p(max(sales, 0.0)) * 1.2 + math.log1p(max(inventory, 0.0)) * 0.4
193   -
194   - @staticmethod
195   - def _compute_rank_score(c: SuggestionCandidate) -> float:
  228 + def _compute_rank_score(query_count_30d: int, query_count_7d: int, qanchor_doc_count: int, title_doc_count: int) -> float:
196 229 return (
197   - 1.8 * math.log1p(c.query_count_30d)
198   - + 1.2 * math.log1p(c.query_count_7d)
199   - + 1.0 * math.log1p(len(c.qanchor_spu_ids))
200   - + 0.6 * math.log1p(len(c.title_spu_ids))
  230 + 1.8 * math.log1p(max(query_count_30d, 0))
  231 + + 1.2 * math.log1p(max(query_count_7d, 0))
  232 + + 1.0 * math.log1p(max(qanchor_doc_count, 0))
  233 + + 0.6 * math.log1p(max(title_doc_count, 0))
201 234 )
202 235  
203   - def _scan_products(self, tenant_id: str, batch_size: int = 500) -> List[Dict[str, Any]]:
204   - """Scan all product docs from tenant index using search_after."""
  236 + @classmethod
  237 + def _compute_rank_score_from_candidate(cls, c: SuggestionCandidate) -> float:
  238 + return cls._compute_rank_score(
  239 + query_count_30d=c.query_count_30d,
  240 + query_count_7d=c.query_count_7d,
  241 + qanchor_doc_count=len(c.qanchor_spu_ids),
  242 + title_doc_count=len(c.title_spu_ids),
  243 + )
  244 +
  245 + def _iter_products(self, tenant_id: str, batch_size: int = 500) -> Iterator[Dict[str, Any]]:
  246 + """Stream product docs from tenant index using search_after."""
205 247 from indexer.mapping_generator import get_tenant_index_name
206 248  
207 249 index_name = get_tenant_index_name(tenant_id)
208   - all_hits: List[Dict[str, Any]] = []
209 250 search_after: Optional[List[Any]] = None
210 251  
211 252 while True:
212 253 body: Dict[str, Any] = {
213 254 "size": batch_size,
214   - "_source": ["spu_id", "title", "qanchors", "sales", "total_inventory"],
  255 + "_source": ["spu_id", "title", "qanchors"],
215 256 "sort": [{"spu_id": "asc"}],
216 257 "query": {"match_all": {}},
217 258 }
... ... @@ -222,50 +263,179 @@ class SuggestionIndexBuilder:
222 263 hits = resp.get("hits", {}).get("hits", []) or []
223 264 if not hits:
224 265 break
225   - all_hits.extend(hits)
  266 + for hit in hits:
  267 + yield hit
226 268 search_after = hits[-1].get("sort")
227 269 if len(hits) < batch_size:
228 270 break
229   - return all_hits
230   -
231   - def _create_or_reset_index(self, tenant_id: str, index_languages: List[str], recreate: bool) -> str:
232   - index_name = get_suggestion_index_name(tenant_id)
233   - if recreate and self.es_client.index_exists(index_name):
234   - logger.info("Deleting existing suggestion index: %s", index_name)
235   - self.es_client.delete_index(index_name)
236   - if not self.es_client.index_exists(index_name):
237   - mapping = build_suggestion_mapping(index_languages=index_languages)
238   - ok = self.es_client.create_index(index_name, mapping)
239   - if not ok:
240   - raise RuntimeError(f"Failed to create suggestion index: {index_name}")
241   - return index_name
242 271  
243   - def rebuild_tenant_index(
  272 + def _iter_query_log_rows(
244 273 self,
245 274 tenant_id: str,
246   - days: int = 365,
247   - recreate: bool = True,
248   - batch_size: int = 500,
249   - min_query_len: int = 1,
250   - ) -> Dict[str, Any]:
251   - tenant_loader = get_tenant_config_loader()
252   - tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
253   - index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
254   - primary_language: str = tenant_cfg.get("primary_language") or "en"
  275 + since: datetime,
  276 + until: datetime,
  277 + fetch_size: int = 2000,
  278 + ) -> Iterator[Any]:
  279 + """Stream search logs from MySQL with bounded time range."""
  280 + query_sql = text(
  281 + """
  282 + SELECT query, language, request_params, create_time
  283 + FROM shoplazza_search_log
  284 + WHERE tenant_id = :tenant_id
  285 + AND deleted = 0
  286 + AND query IS NOT NULL
  287 + AND query <> ''
  288 + AND create_time >= :since_time
  289 + AND create_time < :until_time
  290 + ORDER BY create_time ASC
  291 + """
  292 + )
  293 +
  294 + with self.db_engine.connect().execution_options(stream_results=True) as conn:
  295 + result = conn.execute(
  296 + query_sql,
  297 + {
  298 + "tenant_id": int(tenant_id),
  299 + "since_time": since,
  300 + "until_time": until,
  301 + },
  302 + )
  303 + while True:
  304 + rows = result.fetchmany(fetch_size)
  305 + if not rows:
  306 + break
  307 + for row in rows:
  308 + yield row
  309 +
  310 + def _ensure_meta_index(self) -> str:
  311 + meta_index = get_suggestion_meta_index_name()
  312 + if self.es_client.index_exists(meta_index):
  313 + return meta_index
  314 + body = {
  315 + "settings": {
  316 + "number_of_shards": 1,
  317 + "number_of_replicas": 0,
  318 + "refresh_interval": "1s",
  319 + },
  320 + "mappings": {
  321 + "properties": {
  322 + "tenant_id": {"type": "keyword"},
  323 + "active_alias": {"type": "keyword"},
  324 + "active_index": {"type": "keyword"},
  325 + "last_full_build_at": {"type": "date"},
  326 + "last_incremental_build_at": {"type": "date"},
  327 + "last_incremental_watermark": {"type": "date"},
  328 + "updated_at": {"type": "date"},
  329 + }
  330 + },
  331 + }
  332 + if not self.es_client.create_index(meta_index, body):
  333 + raise RuntimeError(f"Failed to create suggestion meta index: {meta_index}")
  334 + return meta_index
  335 +
  336 + def _get_meta(self, tenant_id: str) -> Dict[str, Any]:
  337 + meta_index = self._ensure_meta_index()
  338 + try:
  339 + resp = self.es_client.client.get(index=meta_index, id=str(tenant_id))
  340 + return resp.get("_source", {}) or {}
  341 + except Exception:
  342 + return {}
  343 +
  344 + def _upsert_meta(self, tenant_id: str, patch: Dict[str, Any]) -> None:
  345 + meta_index = self._ensure_meta_index()
  346 + current = self._get_meta(tenant_id)
  347 + now_iso = datetime.now(timezone.utc).isoformat()
  348 + merged = {
  349 + "tenant_id": str(tenant_id),
  350 + **current,
  351 + **patch,
  352 + "updated_at": now_iso,
  353 + }
  354 + self.es_client.client.index(index=meta_index, id=str(tenant_id), document=merged, refresh="wait_for")
  355 +
  356 + def _cleanup_old_versions(self, tenant_id: str, keep_versions: int, protected_indices: Optional[List[str]] = None) -> List[str]:
  357 + if keep_versions < 1:
  358 + keep_versions = 1
  359 + protected = set(protected_indices or [])
  360 + pattern = get_suggestion_versioned_index_pattern(tenant_id)
  361 + all_indices = self.es_client.list_indices(pattern)
  362 + if len(all_indices) <= keep_versions:
  363 + return []
  364 +
  365 + # Names are timestamp-ordered by suffix; keep newest N.
  366 + kept = set(sorted(all_indices)[-keep_versions:])
  367 + dropped: List[str] = []
  368 + for idx in sorted(all_indices):
  369 + if idx in kept or idx in protected:
  370 + continue
  371 + if self.es_client.delete_index(idx):
  372 + dropped.append(idx)
  373 + return dropped
  374 +
  375 + def _publish_alias(self, tenant_id: str, index_name: str, keep_versions: int = 2) -> Dict[str, Any]:
  376 + alias_name = get_suggestion_alias_name(tenant_id)
  377 + current_indices = self.es_client.get_alias_indices(alias_name)
  378 +
  379 + actions: List[Dict[str, Any]] = []
  380 + for idx in current_indices:
  381 + actions.append({"remove": {"index": idx, "alias": alias_name}})
  382 + actions.append({"add": {"index": index_name, "alias": alias_name}})
  383 +
  384 + if not self.es_client.update_aliases(actions):
  385 + raise RuntimeError(f"Failed to publish alias {alias_name} -> {index_name}")
  386 +
  387 + dropped = self._cleanup_old_versions(
  388 + tenant_id=tenant_id,
  389 + keep_versions=keep_versions,
  390 + protected_indices=[index_name],
  391 + )
255 392  
256   - index_name = self._create_or_reset_index(tenant_id, index_languages, recreate)
  393 + self._upsert_meta(
  394 + tenant_id,
  395 + {
  396 + "active_alias": alias_name,
  397 + "active_index": index_name,
  398 + },
  399 + )
  400 +
  401 + return {
  402 + "alias": alias_name,
  403 + "previous_indices": current_indices,
  404 + "current_index": index_name,
  405 + "dropped_old_indices": dropped,
  406 + }
  407 +
  408 + def _resolve_incremental_target_index(self, tenant_id: str) -> Optional[str]:
  409 + alias_name = get_suggestion_alias_name(tenant_id)
  410 + aliased = self.es_client.get_alias_indices(alias_name)
  411 + if aliased:
  412 + # alias should map to one index in this design
  413 + return sorted(aliased)[-1]
  414 +
  415 + legacy = get_suggestion_legacy_index_name(tenant_id)
  416 + if self.es_client.index_exists(legacy):
  417 + return legacy
  418 + return None
  419 +
  420 + def _build_full_candidates(
  421 + self,
  422 + tenant_id: str,
  423 + index_languages: List[str],
  424 + primary_language: str,
  425 + days: int,
  426 + batch_size: int,
  427 + min_query_len: int,
  428 + ) -> Dict[Tuple[str, str], SuggestionCandidate]:
257 429 key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {}
258 430  
259 431 # Step 1: product title/qanchors
260   - hits = self._scan_products(tenant_id, batch_size=batch_size)
261   - for hit in hits:
  432 + for hit in self._iter_products(tenant_id, batch_size=batch_size):
262 433 src = hit.get("_source", {}) or {}
263 434 spu_id = str(src.get("spu_id") or "")
264 435 if not spu_id:
265 436 continue
266 437 title_obj = src.get("title") or {}
267 438 qanchor_obj = src.get("qanchors") or {}
268   - product_score = self._score_product_hit(src)
269 439  
270 440 for lang in index_languages:
271 441 title = ""
... ... @@ -279,7 +449,7 @@ class SuggestionIndexBuilder:
279 449 if c is None:
280 450 c = SuggestionCandidate(text=title, text_norm=text_norm, lang=lang)
281 451 key_to_candidate[key] = c
282   - c.add_product("title", spu_id=spu_id, score=product_score)
  452 + c.add_product("title", spu_id=spu_id)
283 453  
284 454 q_raw = None
285 455 if isinstance(qanchor_obj, dict):
... ... @@ -293,30 +463,18 @@ class SuggestionIndexBuilder:
293 463 if c is None:
294 464 c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang)
295 465 key_to_candidate[key] = c
296   - c.add_product("qanchor", spu_id=spu_id, score=product_score + 0.6)
  466 + c.add_product("qanchor", spu_id=spu_id)
297 467  
298 468 # Step 2: query logs
299 469 now = datetime.now(timezone.utc)
300   - since_30d = now - timedelta(days=days)
  470 + since = now - timedelta(days=days)
301 471 since_7d = now - timedelta(days=7)
302   - query_sql = text(
303   - """
304   - SELECT query, language, request_params, create_time
305   - FROM shoplazza_search_log
306   - WHERE tenant_id = :tenant_id
307   - AND deleted = 0
308   - AND query IS NOT NULL
309   - AND query <> ''
310   - AND create_time >= :since_30d
311   - """
312   - )
313   - with self.db_engine.connect() as conn:
314   - rows = conn.execute(query_sql, {"tenant_id": int(tenant_id), "since_30d": since_30d}).fetchall()
315 472  
316   - for row in rows:
  473 + for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=now):
317 474 q = str(row.query or "").strip()
318 475 if len(q) < min_query_len:
319 476 continue
  477 +
320 478 lang, conf, source, conflict = self._resolve_query_language(
321 479 query=q,
322 480 log_language=getattr(row, "language", None),
... ... @@ -327,74 +485,396 @@ class SuggestionIndexBuilder:
327 485 text_norm = self._normalize_text(q)
328 486 if self._looks_noise(text_norm):
329 487 continue
  488 +
330 489 key = (lang, text_norm)
331 490 c = key_to_candidate.get(key)
332 491 if c is None:
333 492 c = SuggestionCandidate(text=q, text_norm=text_norm, lang=lang)
334 493 key_to_candidate[key] = c
  494 +
335 495 c.lang_confidence = max(c.lang_confidence, conf)
336 496 c.lang_source = source if c.lang_source == "default" else c.lang_source
337 497 c.lang_conflict = c.lang_conflict or conflict
338 498  
339   - created_at = getattr(row, "create_time", None)
340   - if created_at is None:
341   - is_7d = False
342   - else:
343   - # DB datetime usually naive local time; compare conservatively
344   - if isinstance(created_at, datetime) and created_at.tzinfo is None:
345   - created_at = created_at.replace(tzinfo=timezone.utc)
346   - is_7d = bool(created_at and created_at >= since_7d)
  499 + created_at = self._to_utc(getattr(row, "create_time", None))
  500 + is_7d = bool(created_at and created_at >= since_7d)
347 501 c.add_query_log(is_7d=is_7d)
348 502  
349   - # Step 3: build docs
  503 + return key_to_candidate
  504 +
  505 + def _candidate_to_doc(self, tenant_id: str, c: SuggestionCandidate, now_iso: str) -> Dict[str, Any]:
  506 + rank_score = self._compute_rank_score_from_candidate(c)
  507 + completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}}
  508 + sat_obj = {c.lang: c.text}
  509 + return {
  510 + "_id": f"{tenant_id}|{c.lang}|{c.text_norm}",
  511 + "tenant_id": str(tenant_id),
  512 + "lang": c.lang,
  513 + "text": c.text,
  514 + "text_norm": c.text_norm,
  515 + "sources": sorted(c.sources),
  516 + "title_doc_count": len(c.title_spu_ids),
  517 + "qanchor_doc_count": len(c.qanchor_spu_ids),
  518 + "query_count_7d": c.query_count_7d,
  519 + "query_count_30d": c.query_count_30d,
  520 + "rank_score": float(rank_score),
  521 + "lang_confidence": float(c.lang_confidence),
  522 + "lang_source": c.lang_source,
  523 + "lang_conflict": bool(c.lang_conflict),
  524 + "status": 1,
  525 + "updated_at": now_iso,
  526 + "completion": completion_obj,
  527 + "sat": sat_obj,
  528 + }
  529 +
  530 + def rebuild_tenant_index(
  531 + self,
  532 + tenant_id: str,
  533 + days: int = 365,
  534 + recreate: bool = False,
  535 + batch_size: int = 500,
  536 + min_query_len: int = 1,
  537 + publish_alias: bool = True,
  538 + keep_versions: int = 2,
  539 + use_versioned_index: bool = True,
  540 + ) -> Dict[str, Any]:
  541 + """
  542 + Full rebuild.
  543 +
  544 + Phase2 default behavior:
  545 + - write to versioned index
  546 + - atomically publish alias
  547 + """
  548 + tenant_loader = get_tenant_config_loader()
  549 + tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
  550 + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
  551 + primary_language: str = tenant_cfg.get("primary_language") or "en"
  552 +
  553 + if use_versioned_index:
  554 + index_name = get_suggestion_versioned_index_name(tenant_id)
  555 + else:
  556 + index_name = get_suggestion_legacy_index_name(tenant_id)
  557 + if recreate and self.es_client.index_exists(index_name):
  558 + logger.info("Deleting existing suggestion index: %s", index_name)
  559 + self.es_client.delete_index(index_name)
  560 +
  561 + if self.es_client.index_exists(index_name):
  562 + raise RuntimeError(f"Target suggestion index already exists: {index_name}")
  563 +
  564 + mapping = build_suggestion_mapping(index_languages=index_languages)
  565 + if not self.es_client.create_index(index_name, mapping):
  566 + raise RuntimeError(f"Failed to create suggestion index: {index_name}")
  567 +
  568 + key_to_candidate = self._build_full_candidates(
  569 + tenant_id=tenant_id,
  570 + index_languages=index_languages,
  571 + primary_language=primary_language,
  572 + days=days,
  573 + batch_size=batch_size,
  574 + min_query_len=min_query_len,
  575 + )
  576 +
350 577 now_iso = datetime.now(timezone.utc).isoformat()
351   - docs: List[Dict[str, Any]] = []
352   - for (_, _), c in key_to_candidate.items():
353   - rank_score = self._compute_rank_score(c)
354   - # keep top 20 product ids by score
355   - top_spu_ids = [
356   - item[0]
357   - for item in sorted(c.top_spu_scores.items(), key=lambda kv: kv[1], reverse=True)[:20]
358   - ]
359   -
360   - completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}}
361   - sat_obj = {c.lang: c.text}
362   - doc_id = f"{tenant_id}|{c.lang}|{c.text_norm}"
363   - docs.append(
364   - {
365   - "_id": doc_id,
366   - "tenant_id": str(tenant_id),
367   - "lang": c.lang,
368   - "text": c.text,
369   - "text_norm": c.text_norm,
370   - "sources": sorted(c.sources),
371   - "title_doc_count": len(c.title_spu_ids),
372   - "qanchor_doc_count": len(c.qanchor_spu_ids),
373   - "query_count_7d": c.query_count_7d,
374   - "query_count_30d": c.query_count_30d,
375   - "rank_score": float(rank_score),
376   - "lang_confidence": float(c.lang_confidence),
377   - "lang_source": c.lang_source,
378   - "lang_conflict": bool(c.lang_conflict),
379   - "top_spu_ids": top_spu_ids,
380   - "status": 1,
381   - "updated_at": now_iso,
382   - "completion": completion_obj,
383   - "sat": sat_obj,
384   - }
385   - )
  578 + docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()]
386 579  
387 580 if docs:
388   - result = self.es_client.bulk_index(index_name=index_name, docs=docs)
  581 + bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs)
389 582 self.es_client.refresh(index_name)
390 583 else:
391   - result = {"success": 0, "failed": 0, "errors": []}
  584 + bulk_result = {"success": 0, "failed": 0, "errors": []}
  585 +
  586 + alias_publish: Optional[Dict[str, Any]] = None
  587 + if publish_alias and use_versioned_index:
  588 + alias_publish = self._publish_alias(
  589 + tenant_id=tenant_id,
  590 + index_name=index_name,
  591 + keep_versions=keep_versions,
  592 + )
  593 +
  594 + now_utc = datetime.now(timezone.utc).isoformat()
  595 + meta_patch: Dict[str, Any] = {
  596 + "last_full_build_at": now_utc,
  597 + "last_incremental_watermark": now_utc,
  598 + }
  599 + if publish_alias and use_versioned_index:
  600 + meta_patch["active_index"] = index_name
  601 + meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id)
  602 + self._upsert_meta(tenant_id, meta_patch)
392 603  
393 604 return {
  605 + "mode": "full",
394 606 "tenant_id": str(tenant_id),
395 607 "index_name": index_name,
  608 + "alias_published": bool(alias_publish),
  609 + "alias_publish": alias_publish,
396 610 "total_candidates": len(key_to_candidate),
397 611 "indexed_docs": len(docs),
398   - "bulk_result": result,
  612 + "bulk_result": bulk_result,
399 613 }
400 614  
  615 + def _build_incremental_deltas(
  616 + self,
  617 + tenant_id: str,
  618 + index_languages: List[str],
  619 + primary_language: str,
  620 + since: datetime,
  621 + until: datetime,
  622 + min_query_len: int,
  623 + ) -> Dict[Tuple[str, str], QueryDelta]:
  624 + now = datetime.now(timezone.utc)
  625 + since_7d = now - timedelta(days=7)
  626 + deltas: Dict[Tuple[str, str], QueryDelta] = {}
  627 +
  628 + for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=until):
  629 + q = str(row.query or "").strip()
  630 + if len(q) < min_query_len:
  631 + continue
  632 +
  633 + lang, conf, source, conflict = self._resolve_query_language(
  634 + query=q,
  635 + log_language=getattr(row, "language", None),
  636 + request_params=getattr(row, "request_params", None),
  637 + index_languages=index_languages,
  638 + primary_language=primary_language,
  639 + )
  640 + text_norm = self._normalize_text(q)
  641 + if self._looks_noise(text_norm):
  642 + continue
  643 +
  644 + key = (lang, text_norm)
  645 + item = deltas.get(key)
  646 + if item is None:
  647 + item = QueryDelta(
  648 + tenant_id=str(tenant_id),
  649 + lang=lang,
  650 + text=q,
  651 + text_norm=text_norm,
  652 + lang_confidence=conf,
  653 + lang_source=source,
  654 + lang_conflict=conflict,
  655 + )
  656 + deltas[key] = item
  657 +
  658 + created_at = self._to_utc(getattr(row, "create_time", None))
  659 + item.delta_30d += 1
  660 + if created_at and created_at >= since_7d:
  661 + item.delta_7d += 1
  662 +
  663 + if conf > item.lang_confidence:
  664 + item.lang_confidence = conf
  665 + item.lang_source = source
  666 + item.lang_conflict = item.lang_conflict or conflict
  667 +
  668 + return deltas
  669 +
  670 + def _delta_to_upsert_doc(self, delta: QueryDelta, now_iso: str) -> Dict[str, Any]:
  671 + rank_score = self._compute_rank_score(
  672 + query_count_30d=delta.delta_30d,
  673 + query_count_7d=delta.delta_7d,
  674 + qanchor_doc_count=0,
  675 + title_doc_count=0,
  676 + )
  677 + return {
  678 + "tenant_id": delta.tenant_id,
  679 + "lang": delta.lang,
  680 + "text": delta.text,
  681 + "text_norm": delta.text_norm,
  682 + "sources": ["query_log"],
  683 + "title_doc_count": 0,
  684 + "qanchor_doc_count": 0,
  685 + "query_count_7d": delta.delta_7d,
  686 + "query_count_30d": delta.delta_30d,
  687 + "rank_score": float(rank_score),
  688 + "lang_confidence": float(delta.lang_confidence),
  689 + "lang_source": delta.lang_source,
  690 + "lang_conflict": bool(delta.lang_conflict),
  691 + "status": 1,
  692 + "updated_at": now_iso,
  693 + "completion": {
  694 + delta.lang: {
  695 + "input": [delta.text],
  696 + "weight": int(max(rank_score, 1.0) * 100),
  697 + }
  698 + },
  699 + "sat": {delta.lang: delta.text},
  700 + }
  701 +
  702 + @staticmethod
  703 + def _build_incremental_update_script() -> str:
  704 + return """
  705 + if (ctx._source == null || ctx._source.isEmpty()) {
  706 + ctx._source = params.upsert;
  707 + return;
  708 + }
  709 +
  710 + if (ctx._source.query_count_30d == null) { ctx._source.query_count_30d = 0; }
  711 + if (ctx._source.query_count_7d == null) { ctx._source.query_count_7d = 0; }
  712 + if (ctx._source.qanchor_doc_count == null) { ctx._source.qanchor_doc_count = 0; }
  713 + if (ctx._source.title_doc_count == null) { ctx._source.title_doc_count = 0; }
  714 +
  715 + ctx._source.query_count_30d += params.delta_30d;
  716 + ctx._source.query_count_7d += params.delta_7d;
  717 +
  718 + if (ctx._source.sources == null) { ctx._source.sources = new ArrayList(); }
  719 + if (!ctx._source.sources.contains('query_log')) { ctx._source.sources.add('query_log'); }
  720 +
  721 + if (ctx._source.lang_conflict == null) { ctx._source.lang_conflict = false; }
  722 + ctx._source.lang_conflict = ctx._source.lang_conflict || params.lang_conflict;
  723 +
  724 + if (ctx._source.lang_confidence == null || params.lang_confidence > ctx._source.lang_confidence) {
  725 + ctx._source.lang_confidence = params.lang_confidence;
  726 + ctx._source.lang_source = params.lang_source;
  727 + }
  728 +
  729 + int q30 = ctx._source.query_count_30d;
  730 + int q7 = ctx._source.query_count_7d;
  731 + int qa = ctx._source.qanchor_doc_count;
  732 + int td = ctx._source.title_doc_count;
  733 +
  734 + double score = 1.8 * Math.log(1 + q30)
  735 + + 1.2 * Math.log(1 + q7)
  736 + + 1.0 * Math.log(1 + qa)
  737 + + 0.6 * Math.log(1 + td);
  738 + ctx._source.rank_score = score;
  739 + ctx._source.status = 1;
  740 + ctx._source.updated_at = params.now_iso;
  741 + ctx._source.text = params.text;
  742 + ctx._source.lang = params.lang;
  743 + ctx._source.text_norm = params.text_norm;
  744 +
  745 + if (ctx._source.completion == null) { ctx._source.completion = new HashMap(); }
  746 + Map c = new HashMap();
  747 + c.put('input', params.completion_input);
  748 + c.put('weight', params.completion_weight);
  749 + ctx._source.completion.put(params.lang, c);
  750 +
  751 + if (ctx._source.sat == null) { ctx._source.sat = new HashMap(); }
  752 + ctx._source.sat.put(params.lang, params.text);
  753 + """
  754 +
  755 + def _build_incremental_actions(self, target_index: str, deltas: Dict[Tuple[str, str], QueryDelta]) -> List[Dict[str, Any]]:
  756 + now_iso = datetime.now(timezone.utc).isoformat()
  757 + script_source = self._build_incremental_update_script()
  758 + actions: List[Dict[str, Any]] = []
  759 +
  760 + for delta in deltas.values():
  761 + upsert_doc = self._delta_to_upsert_doc(delta=delta, now_iso=now_iso)
  762 + upsert_rank = float(upsert_doc.get("rank_score") or 0.0)
  763 + action = {
  764 + "_op_type": "update",
  765 + "_index": target_index,
  766 + "_id": f"{delta.tenant_id}|{delta.lang}|{delta.text_norm}",
  767 + "scripted_upsert": True,
  768 + "script": {
  769 + "lang": "painless",
  770 + "source": script_source,
  771 + "params": {
  772 + "delta_30d": int(delta.delta_30d),
  773 + "delta_7d": int(delta.delta_7d),
  774 + "lang_confidence": float(delta.lang_confidence),
  775 + "lang_source": delta.lang_source,
  776 + "lang_conflict": bool(delta.lang_conflict),
  777 + "now_iso": now_iso,
  778 + "lang": delta.lang,
  779 + "text": delta.text,
  780 + "text_norm": delta.text_norm,
  781 + "completion_input": [delta.text],
  782 + "completion_weight": int(max(upsert_rank, 1.0) * 100),
  783 + "upsert": upsert_doc,
  784 + },
  785 + },
  786 + "upsert": upsert_doc,
  787 + }
  788 + actions.append(action)
  789 +
  790 + return actions
  791 +
  792 + def incremental_update_tenant_index(
  793 + self,
  794 + tenant_id: str,
  795 + min_query_len: int = 1,
  796 + fallback_days: int = 7,
  797 + overlap_minutes: int = 30,
  798 + bootstrap_if_missing: bool = True,
  799 + bootstrap_days: int = 30,
  800 + batch_size: int = 500,
  801 + ) -> Dict[str, Any]:
  802 + tenant_loader = get_tenant_config_loader()
  803 + tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
  804 + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
  805 + primary_language: str = tenant_cfg.get("primary_language") or "en"
  806 +
  807 + target_index = self._resolve_incremental_target_index(tenant_id)
  808 + if not target_index:
  809 + if not bootstrap_if_missing:
  810 + raise RuntimeError(
  811 + f"No active suggestion index for tenant={tenant_id}. "
  812 + "Run full rebuild first or enable bootstrap_if_missing."
  813 + )
  814 + full_result = self.rebuild_tenant_index(
  815 + tenant_id=tenant_id,
  816 + days=bootstrap_days,
  817 + batch_size=batch_size,
  818 + min_query_len=min_query_len,
  819 + publish_alias=True,
  820 + use_versioned_index=True,
  821 + )
  822 + return {
  823 + "mode": "incremental",
  824 + "tenant_id": str(tenant_id),
  825 + "bootstrapped": True,
  826 + "bootstrap_result": full_result,
  827 + }
  828 +
  829 + meta = self._get_meta(tenant_id)
  830 + watermark_raw = meta.get("last_incremental_watermark") or meta.get("last_full_build_at")
  831 + now = datetime.now(timezone.utc)
  832 + default_since = now - timedelta(days=fallback_days)
  833 + since = None
  834 + if isinstance(watermark_raw, str) and watermark_raw.strip():
  835 + try:
  836 + since = self._to_utc(datetime.fromisoformat(watermark_raw.replace("Z", "+00:00")))
  837 + except Exception:
  838 + since = None
  839 + if since is None:
  840 + since = default_since
  841 + since = since - timedelta(minutes=max(overlap_minutes, 0))
  842 + if since < default_since:
  843 + since = default_since
  844 +
  845 + deltas = self._build_incremental_deltas(
  846 + tenant_id=tenant_id,
  847 + index_languages=index_languages,
  848 + primary_language=primary_language,
  849 + since=since,
  850 + until=now,
  851 + min_query_len=min_query_len,
  852 + )
  853 +
  854 + actions = self._build_incremental_actions(target_index=target_index, deltas=deltas)
  855 + bulk_result = self.es_client.bulk_actions(actions)
  856 + self.es_client.refresh(target_index)
  857 +
  858 + now_iso = now.isoformat()
  859 + self._upsert_meta(
  860 + tenant_id,
  861 + {
  862 + "last_incremental_build_at": now_iso,
  863 + "last_incremental_watermark": now_iso,
  864 + "active_index": target_index,
  865 + "active_alias": get_suggestion_alias_name(tenant_id),
  866 + },
  867 + )
  868 +
  869 + return {
  870 + "mode": "incremental",
  871 + "tenant_id": str(tenant_id),
  872 + "target_index": target_index,
  873 + "query_window": {
  874 + "since": since.isoformat(),
  875 + "until": now_iso,
  876 + "overlap_minutes": int(overlap_minutes),
  877 + },
  878 + "updated_terms": len(deltas),
  879 + "bulk_result": bulk_result,
  880 + }
... ...
suggestion/mapping.py
... ... @@ -88,7 +88,6 @@ def build_suggestion_mapping(index_languages: List[str]) -> Dict[str, Any]:
88 88 "lang_confidence": {"type": "float"},
89 89 "lang_source": {"type": "keyword"},
90 90 "lang_conflict": {"type": "boolean"},
91   - "top_spu_ids": {"type": "keyword"},
92 91 "status": {"type": "byte"},
93 92 "updated_at": {"type": "date"},
94 93 "completion": {"properties": completion_props},
... ... @@ -96,4 +95,3 @@ def build_suggestion_mapping(index_languages: List[str]) -&gt; Dict[str, Any]:
96 95 }
97 96 },
98 97 }
99   -
... ...
suggestion/service.py
... ... @@ -7,8 +7,11 @@ import time
7 7 from typing import Any, Dict, List, Optional
8 8  
9 9 from config.tenant_config_loader import get_tenant_config_loader
10   -from indexer.mapping_generator import get_tenant_index_name
11   -from suggestion.builder import get_suggestion_index_name
  10 +from suggestion.builder import (
  11 + get_suggestion_alias_name,
  12 + get_suggestion_index_name,
  13 + get_suggestion_legacy_index_name,
  14 +)
12 15 from utils.es_client import ESClient
13 16  
14 17 logger = logging.getLogger(__name__)
... ... @@ -33,12 +36,29 @@ class SuggestionService:
33 36 return primary
34 37 return index_languages[0]
35 38  
  39 + def _resolve_search_target(self, tenant_id: str) -> Optional[str]:
  40 + alias_name = get_suggestion_alias_name(tenant_id)
  41 + if self.es_client.alias_exists(alias_name):
  42 + return alias_name
  43 +
  44 + # Fallback for pre-Phase2 deployments
  45 + legacy = get_suggestion_legacy_index_name(tenant_id)
  46 + if self.es_client.index_exists(legacy):
  47 + return legacy
  48 +
  49 + # Last fallback: current naming helper
  50 + candidate = get_suggestion_index_name(tenant_id)
  51 + if self.es_client.index_exists(candidate):
  52 + return candidate
  53 + return None
  54 +
36 55 def _completion_suggest(
37 56 self,
38 57 index_name: str,
39 58 query: str,
40 59 lang: str,
41 60 size: int,
  61 + tenant_id: str,
42 62 ) -> List[Dict[str, Any]]:
43 63 """
44 64 Query ES completion suggester from `completion.<lang>`.
... ... @@ -68,7 +88,7 @@ class SuggestionService:
68 88 ],
69 89 }
70 90 try:
71   - resp = self.es_client.client.search(index=index_name, body=body)
  91 + resp = self.es_client.client.search(index=index_name, body=body, routing=str(tenant_id))
72 92 except Exception as e:
73 93 # completion is an optimization path; never hard-fail the whole endpoint
74 94 logger.warning("Completion suggest failed for index=%s field=%s: %s", index_name, field_name, e)
... ... @@ -95,69 +115,17 @@ class SuggestionService:
95 115 )
96 116 return out
97 117  
98   - def _search_products_for_suggestion(
99   - self,
100   - tenant_id: str,
101   - text_value: str,
102   - lang: str,
103   - result_size: int,
104   - ) -> List[Dict[str, Any]]:
105   - index_name = get_tenant_index_name(tenant_id)
106   - title_field = f"title.{lang}"
107   - qanchor_field = f"qanchors.{lang}"
108   -
109   - body = {
110   - "_source": ["spu_id", "title", "min_price", "image_url", "sales", "total_inventory"],
111   - "query": {
112   - "bool": {
113   - "should": [
114   - {"match_phrase": {qanchor_field: {"query": text_value, "boost": 3.0}}},
115   - {"match_phrase_prefix": {title_field: {"query": text_value, "boost": 2.0}}},
116   - {"match": {title_field: {"query": text_value, "boost": 1.0}}},
117   - ],
118   - "minimum_should_match": 1,
119   - }
120   - },
121   - "sort": [{"_score": "desc"}, {"sales": "desc"}],
122   - }
123   - resp = self.es_client.search(index_name=index_name, body=body, size=result_size, from_=0)
124   - hits = resp.get("hits", {}).get("hits", []) or []
125   - out: List[Dict[str, Any]] = []
126   - for hit in hits:
127   - src = hit.get("_source", {}) or {}
128   - title_obj = src.get("title") or {}
129   - resolved_title = None
130   - if isinstance(title_obj, dict):
131   - resolved_title = title_obj.get(lang) or title_obj.get("en") or title_obj.get("zh")
132   - if not resolved_title:
133   - for v in title_obj.values():
134   - if v:
135   - resolved_title = v
136   - break
137   - out.append(
138   - {
139   - "spu_id": src.get("spu_id"),
140   - "title": resolved_title,
141   - "price": src.get("min_price"),
142   - "image_url": src.get("image_url"),
143   - "score": hit.get("_score", 0.0),
144   - }
145   - )
146   - return out
147   -
148 118 def search(
149 119 self,
150 120 tenant_id: str,
151 121 query: str,
152 122 language: str,
153 123 size: int = 10,
154   - with_results: bool = True,
155   - result_size: int = 3,
156 124 ) -> Dict[str, Any]:
157 125 start = time.time()
158 126 resolved_lang = self._resolve_language(tenant_id, language)
159   - index_name = get_suggestion_index_name(tenant_id)
160   - if not self.es_client.index_exists(index_name):
  127 + index_name = self._resolve_search_target(tenant_id)
  128 + if not index_name:
161 129 # On a fresh ES cluster the suggestion index might not be built yet.
162 130 # Keep endpoint stable for frontend autocomplete: return empty list instead of 500.
163 131 took_ms = int((time.time() - start) * 1000)
... ... @@ -171,6 +139,7 @@ class SuggestionService:
171 139  
172 140 sat_field = f"sat.{resolved_lang}"
173 141 dsl = {
  142 + "track_total_hits": False,
174 143 "query": {
175 144 "function_score": {
176 145 "query": {
... ... @@ -206,14 +175,19 @@ class SuggestionService:
206 175 "lang",
207 176 "rank_score",
208 177 "sources",
209   - "top_spu_ids",
210 178 "lang_source",
211 179 "lang_confidence",
212 180 "lang_conflict",
213 181 ],
214 182 }
215 183 # Recall path A: bool_prefix on search_as_you_type
216   - es_resp = self.es_client.search(index_name=index_name, body=dsl, size=size, from_=0)
  184 + es_resp = self.es_client.search(
  185 + index_name=index_name,
  186 + body=dsl,
  187 + size=size,
  188 + from_=0,
  189 + routing=str(tenant_id),
  190 + )
217 191 hits = es_resp.get("hits", {}).get("hits", []) or []
218 192  
219 193 # Recall path B: completion suggester (optional optimization)
... ... @@ -222,6 +196,7 @@ class SuggestionService:
222 196 query=query,
223 197 lang=resolved_lang,
224 198 size=size,
  199 + tenant_id=tenant_id,
225 200 )
226 201  
227 202 suggestions: List[Dict[str, Any]] = []
... ... @@ -256,17 +231,6 @@ class SuggestionService:
256 231 "lang_confidence": src.get("lang_confidence"),
257 232 "lang_conflict": src.get("lang_conflict", False),
258 233 }
259   - if with_results:
260   - try:
261   - item["products"] = self._search_products_for_suggestion(
262   - tenant_id=tenant_id,
263   - text_value=str(src.get("text") or ""),
264   - lang=resolved_lang,
265   - result_size=result_size,
266   - )
267   - except Exception as e:
268   - logger.warning("Failed to enrich suggestion products: %s", e)
269   - item["products"] = []
270 234 suggestions.append(item)
271 235  
272 236 took_ms = int((time.time() - start) * 1000)
... ... @@ -277,4 +241,3 @@ class SuggestionService:
277 241 "suggestions": suggestions[:size],
278 242 "took_ms": took_ms,
279 243 }
280   -
... ...
tests/test_suggestions.py
1 1 import json
  2 +from datetime import datetime, timedelta, timezone
2 3 from typing import Any, Dict, List
3 4  
4 5 import pytest
5 6  
6   -from suggestion.builder import SuggestionIndexBuilder
  7 +from suggestion.builder import (
  8 + QueryDelta,
  9 + SuggestionIndexBuilder,
  10 + get_suggestion_alias_name,
  11 +)
7 12 from suggestion.service import SuggestionService
8 13  
9 14  
10 15 class FakeESClient:
11   - """Minimal fake ES client for SuggestionService tests."""
  16 + """Lightweight fake ES client for suggestion unit tests."""
12 17  
13 18 def __init__(self) -> None:
14 19 self.calls: List[Dict[str, Any]] = []
  20 + self.indices: set[str] = set()
  21 + self.aliases: Dict[str, List[str]] = {}
  22 + self.client = self # support service._completion_suggest -> self.es_client.client.search
15 23  
16   - def search(self, index_name: str, body: Dict[str, Any], size: int = 10, from_: int = 0) -> Dict[str, Any]:
17   - self.calls.append({"index": index_name, "body": body, "size": size, "from": from_})
18   - # Suggestion index
19   - if "search_suggestions_tenant_" in index_name:
  24 + def search(
  25 + self,
  26 + index_name: str = None,
  27 + body: Dict[str, Any] = None,
  28 + size: int = 10,
  29 + from_: int = 0,
  30 + routing: str = None,
  31 + index: str = None,
  32 + **kwargs,
  33 + ) -> Dict[str, Any]:
  34 + idx = index_name or index
  35 + body = body or {}
  36 + self.calls.append(
  37 + {
  38 + "op": "search",
  39 + "index": idx,
  40 + "body": body,
  41 + "size": size,
  42 + "from": from_,
  43 + "routing": routing,
  44 + }
  45 + )
  46 +
  47 + # Completion suggest path
  48 + if "suggest" in body:
  49 + return {
  50 + "suggest": {
  51 + "s": [
  52 + {
  53 + "text": "iph",
  54 + "offset": 0,
  55 + "length": 3,
  56 + "options": [
  57 + {
  58 + "text": "iphone 15",
  59 + "_score": 6.3,
  60 + "_source": {
  61 + "text": "iphone 15",
  62 + "lang": "en",
  63 + "rank_score": 5.0,
  64 + "sources": ["query_log", "qanchor"],
  65 + "lang_source": "log_field",
  66 + "lang_confidence": 1.0,
  67 + "lang_conflict": False,
  68 + },
  69 + }
  70 + ],
  71 + }
  72 + ]
  73 + }
  74 + }
  75 +
  76 + # bool_prefix path
  77 + if idx and "search_suggestions_tenant_" in idx:
20 78 return {
21 79 "hits": {
22 80 "total": {"value": 1},
... ... @@ -33,61 +91,68 @@ class FakeESClient:
33 91 "lang_source": "log_field",
34 92 "lang_confidence": 1.0,
35 93 "lang_conflict": False,
36   - "top_spu_ids": ["12345"],
37   - },
38   - }
39   - ],
40   - }
41   - }
42   - # Product index
43   - if "search_products_tenant_" in index_name:
44   - return {
45   - "hits": {
46   - "total": {"value": 1},
47   - "max_score": 2.5,
48   - "hits": [
49   - {
50   - "_id": "12345",
51   - "_score": 2.5,
52   - "_source": {
53   - "spu_id": "12345",
54   - "title": {"en": "iPhone 15 Pro Max"},
55   - "min_price": 999.0,
56   - "image_url": "https://example.com/image.jpg",
57   - "sales": 100,
58   - "total_inventory": 50,
59 94 },
60 95 }
61 96 ],
62 97 }
63 98 }
  99 +
64 100 return {"hits": {"total": {"value": 0}, "max_score": 0.0, "hits": []}}
65 101  
66   - # For builder.bulk_index usage compatibility in full runs (not used in these unit tests)
67 102 def bulk_index(self, index_name: str, docs: List[Dict[str, Any]]) -> Dict[str, Any]:
68   - self.calls.append({"index": index_name, "bulk": True, "docs": docs})
  103 + self.calls.append({"op": "bulk_index", "index": index_name, "docs": docs})
69 104 return {"success": len(docs), "failed": 0, "errors": []}
70 105  
  106 + def bulk_actions(self, actions: List[Dict[str, Any]]) -> Dict[str, Any]:
  107 + self.calls.append({"op": "bulk_actions", "actions": actions})
  108 + return {"success": len(actions), "failed": 0, "errors": []}
  109 +
71 110 def index_exists(self, index_name: str) -> bool:
72   - return False
  111 + return index_name in self.indices
73 112  
74 113 def delete_index(self, index_name: str) -> bool:
75   - return True
  114 + if index_name in self.indices:
  115 + self.indices.remove(index_name)
  116 + return True
  117 + return False
76 118  
77 119 def create_index(self, index_name: str, body: Dict[str, Any]) -> bool:
78   - self.calls.append({"index": index_name, "create": True, "body": body})
  120 + self.calls.append({"op": "create_index", "index": index_name, "body": body})
  121 + self.indices.add(index_name)
79 122 return True
80 123  
81 124 def refresh(self, index_name: str) -> bool:
  125 + self.calls.append({"op": "refresh", "index": index_name})
  126 + return True
  127 +
  128 + def alias_exists(self, alias_name: str) -> bool:
  129 + return alias_name in self.aliases and len(self.aliases[alias_name]) > 0
  130 +
  131 + def get_alias_indices(self, alias_name: str) -> List[str]:
  132 + return list(self.aliases.get(alias_name, []))
  133 +
  134 + def update_aliases(self, actions: List[Dict[str, Any]]) -> bool:
  135 + self.calls.append({"op": "update_aliases", "actions": actions})
  136 + for action in actions:
  137 + if "remove" in action:
  138 + alias = action["remove"]["alias"]
  139 + index = action["remove"]["index"]
  140 + self.aliases[alias] = [x for x in self.aliases.get(alias, []) if x != index]
  141 + if "add" in action:
  142 + alias = action["add"]["alias"]
  143 + index = action["add"]["index"]
  144 + self.aliases[alias] = [index]
82 145 return True
83 146  
  147 + def list_indices(self, pattern: str) -> List[str]:
  148 + prefix = pattern.rstrip("*")
  149 + return sorted([x for x in self.indices if x.startswith(prefix)])
  150 +
84 151  
85 152 @pytest.mark.unit
86   -def test_resolve_query_language_prefers_log_field(monkeypatch):
87   - """builder.resolve_query_language 应优先使用日志 language 字段。"""
  153 +def test_resolve_query_language_prefers_log_field():
88 154 fake_es = FakeESClient()
89 155 builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)
90   - # index_languages 里包含 en/zh,primary 设为 zh
91 156 lang, conf, source, conflict = builder._resolve_query_language(
92 157 query="iphone 15",
93 158 log_language="en",
... ... @@ -103,7 +168,6 @@ def test_resolve_query_language_prefers_log_field(monkeypatch):
103 168  
104 169 @pytest.mark.unit
105 170 def test_resolve_query_language_uses_request_params_when_log_missing():
106   - """当日志 language 为空时,应从 request_params.language 解析。"""
107 171 fake_es = FakeESClient()
108 172 builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)
109 173 request_params = json.dumps({"language": "zh"})
... ... @@ -122,10 +186,8 @@ def test_resolve_query_language_uses_request_params_when_log_missing():
122 186  
123 187 @pytest.mark.unit
124 188 def test_resolve_query_language_fallback_to_primary():
125   - """当无任何语言线索时(无 script 检测),应回落到租户 primary_language。"""
126 189 fake_es = FakeESClient()
127 190 builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)
128   - # "123" 无 CJK/Latin 等 script,_detect_script_language 返回 None
129 191 lang, conf, source, conflict = builder._resolve_query_language(
130 192 query="123",
131 193 log_language=None,
... ... @@ -139,16 +201,10 @@ def test_resolve_query_language_fallback_to_primary():
139 201  
140 202  
141 203 @pytest.mark.unit
142   -def test_suggestion_service_basic_flow(monkeypatch):
143   - """
144   - SuggestionService.search 应正确调用 ES 并返回 suggestion + products。
145   - 使用 FakeESClient 避免真实 ES 依赖。
146   - """
147   - # 覆盖 tenant_config_loader 以避免依赖外部 config.yaml 改动
  204 +def test_suggestion_service_basic_flow_uses_alias_and_routing():
148 205 from config import tenant_config_loader as tcl
149 206  
150 207 loader = tcl.get_tenant_config_loader()
151   - # 强制覆盖内部缓存配置
152 208 loader._config = {
153 209 "default": {"primary_language": "en", "index_languages": ["en", "zh"]},
154 210 "tenants": {
... ... @@ -157,14 +213,15 @@ def test_suggestion_service_basic_flow(monkeypatch):
157 213 }
158 214  
159 215 fake_es = FakeESClient()
  216 + alias_name = get_suggestion_alias_name("1")
  217 + fake_es.aliases[alias_name] = ["search_suggestions_tenant_1_v20260310190000"]
  218 +
160 219 service = SuggestionService(es_client=fake_es)
161 220 result = service.search(
162 221 tenant_id="1",
163 222 query="iph",
164 223 language="en",
165 224 size=5,
166   - with_results=True,
167   - result_size=2,
168 225 )
169 226  
170 227 assert result["resolved_language"] == "en"
... ... @@ -172,13 +229,119 @@ def test_suggestion_service_basic_flow(monkeypatch):
172 229 assert result["took_ms"] >= 0
173 230 suggestions = result["suggestions"]
174 231 assert len(suggestions) == 1
175   - s0 = suggestions[0]
176   - assert s0["text"] == "iphone 15"
177   - assert s0["lang"] == "en"
178   - assert isinstance(s0.get("products"), list)
179   - assert len(s0["products"]) >= 1
180   - p0 = s0["products"][0]
181   - assert p0["spu_id"] == "12345"
182   - assert "title" in p0
183   - assert "price" in p0
  232 + assert suggestions[0]["text"] == "iphone 15"
  233 +
  234 + search_calls = [x for x in fake_es.calls if x.get("op") == "search"]
  235 + assert len(search_calls) >= 2
  236 + assert any(x.get("routing") == "1" for x in search_calls)
  237 + assert any(x.get("index") == alias_name for x in search_calls)
  238 +
  239 +
@pytest.mark.unit
def test_publish_alias_and_cleanup_old_versions(monkeypatch):
    """_publish_alias flips the alias to the new version and prunes old indices."""
    fake_es = FakeESClient()
    builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)

    tenant_id = "162"
    alias_name = get_suggestion_alias_name(tenant_id)
    versions = [
        "search_suggestions_tenant_162_v20260310170000",
        "search_suggestions_tenant_162_v20260310180000",
        "search_suggestions_tenant_162_v20260310190000",
    ]
    fake_es.indices.update(versions)
    fake_es.aliases[alias_name] = [versions[1]]

    # Meta persistence is not under test here.
    monkeypatch.setattr(builder, "_upsert_meta", lambda tenant_id, patch: None)

    result = builder._publish_alias(
        tenant_id=tenant_id,
        index_name=versions[2],
        keep_versions=2,
    )

    assert result["current_index"] == versions[2]
    assert fake_es.aliases[alias_name] == [versions[2]]
    # The oldest version beyond keep_versions=2 must have been deleted.
    assert versions[0] not in fake_es.indices
  267 +
  268 +
@pytest.mark.unit
def test_incremental_bootstrap_when_no_active_index(monkeypatch):
    """With no aliased index yet, incremental update should bootstrap a full rebuild."""
    fake_es = FakeESClient()
    builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)

    from config import tenant_config_loader as tcl

    lang_cfg = {"primary_language": "en", "index_languages": ["en", "zh"]}
    loader = tcl.get_tenant_config_loader()
    loader._config = {"default": dict(lang_cfg), "tenants": {"162": dict(lang_cfg)}}

    def fake_rebuild(**kwargs):
        return {"mode": "full", "tenant_id": kwargs["tenant_id"], "index_name": "v_idx"}

    monkeypatch.setattr(builder, "rebuild_tenant_index", fake_rebuild)

    result = builder.incremental_update_tenant_index(tenant_id="162", bootstrap_if_missing=True)

    assert result["mode"] == "incremental"
    assert result["bootstrapped"] is True
    assert result["bootstrap_result"]["mode"] == "full"
  292 +
  293 +
@pytest.mark.unit
def test_incremental_updates_existing_index(monkeypatch):
    """An active aliased index should receive delta updates via bulk actions."""
    fake_es = FakeESClient()
    builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)

    from config import tenant_config_loader as tcl

    loader = tcl.get_tenant_config_loader()
    loader._config = {
        "default": {"primary_language": "en", "index_languages": ["en", "zh"]},
        "tenants": {"162": {"primary_language": "en", "index_languages": ["en", "zh"]}},
    }

    tenant_id = "162"
    active_index = "search_suggestions_tenant_162_v20260310190000"
    fake_es.aliases[get_suggestion_alias_name(tenant_id)] = [active_index]

    one_hour_ago = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
    monkeypatch.setattr(builder, "_get_meta", lambda _tenant_id: {"last_incremental_watermark": one_hour_ago})
    monkeypatch.setattr(builder, "_upsert_meta", lambda tenant_id, patch: None)

    delta = QueryDelta(
        tenant_id=tenant_id,
        lang="en",
        text="iphone 15",
        text_norm="iphone 15",
        delta_7d=2,
        delta_30d=3,
        lang_confidence=1.0,
        lang_source="log_field",
        lang_conflict=False,
    )
    monkeypatch.setattr(builder, "_build_incremental_deltas", lambda **kwargs: {("en", "iphone 15"): delta})

    result = builder.incremental_update_tenant_index(
        tenant_id=tenant_id,
        bootstrap_if_missing=False,
        overlap_minutes=10,
    )

    assert result["mode"] == "incremental"
    assert result["target_index"] == active_index
    assert result["updated_terms"] == 1
    assert result["bulk_result"]["failed"] == 0

    bulk_calls = [c for c in fake_es.calls if c.get("op") == "bulk_actions"]
    assert len(bulk_calls) == 1
    assert len(bulk_calls[0]["actions"]) == 1
... ...
utils/es_client.py
... ... @@ -49,7 +49,7 @@ class ESClient:
49 49  
50 50 # Add authentication if provided
51 51 if username and password:
52   - client_config['http_auth'] = (username, password)
  52 + client_config['basic_auth'] = (username, password)
53 53  
54 54 # Merge additional kwargs
55 55 client_config.update(kwargs)
... ... @@ -88,6 +88,54 @@ class ESClient:
88 88 logger.error(f"Failed to create index '{index_name}': {e}", exc_info=True)
89 89 return False
90 90  
  91 + def put_alias(self, index_name: str, alias_name: str) -> bool:
  92 + """Add alias for an index."""
  93 + try:
  94 + self.client.indices.put_alias(index=index_name, name=alias_name)
  95 + return True
  96 + except Exception as e:
  97 + logger.error(
  98 + "Failed to put alias '%s' for index '%s': %s",
  99 + alias_name,
  100 + index_name,
  101 + e,
  102 + exc_info=True,
  103 + )
  104 + return False
  105 +
  106 + def alias_exists(self, alias_name: str) -> bool:
  107 + """Check if alias exists."""
  108 + try:
  109 + return self.client.indices.exists_alias(name=alias_name)
  110 + except Exception as e:
  111 + logger.error("Failed to check alias exists '%s': %s", alias_name, e, exc_info=True)
  112 + return False
  113 +
  114 + def get_alias_indices(self, alias_name: str) -> List[str]:
  115 + """Get concrete indices behind alias."""
  116 + try:
  117 + result = self.client.indices.get_alias(name=alias_name)
  118 + return sorted(list((result or {}).keys()))
  119 + except Exception:
  120 + return []
  121 +
  122 + def update_aliases(self, actions: List[Dict[str, Any]]) -> bool:
  123 + """Atomically update aliases."""
  124 + try:
  125 + self.client.indices.update_aliases(body={"actions": actions})
  126 + return True
  127 + except Exception as e:
  128 + logger.error("Failed to update aliases: %s", e, exc_info=True)
  129 + return False
  130 +
  131 + def list_indices(self, pattern: str) -> List[str]:
  132 + """List indices by wildcard pattern."""
  133 + try:
  134 + result = self.client.indices.get(index=pattern, allow_no_indices=True)
  135 + return sorted(list((result or {}).keys()))
  136 + except Exception:
  137 + return []
  138 +
91 139 def delete_index(self, index_name: str) -> bool:
92 140 """
93 141 Delete an index.
... ... @@ -153,12 +201,37 @@ class ESClient:
153 201 'errors': [str(e)]
154 202 }
155 203  
  204 + def bulk_actions(self, actions: List[Dict[str, Any]]) -> Dict[str, Any]:
  205 + """
  206 + Execute generic bulk actions.
  207 +
  208 + Args:
  209 + actions: elasticsearch.helpers.bulk compatible action list
  210 + """
  211 + if not actions:
  212 + return {'success': 0, 'failed': 0, 'errors': []}
  213 + try:
  214 + success, failed = bulk(self.client, actions, raise_on_error=False)
  215 + return {
  216 + 'success': success,
  217 + 'failed': len(failed),
  218 + 'errors': failed
  219 + }
  220 + except Exception as e:
  221 + logger.error("Bulk actions failed: %s", e, exc_info=True)
  222 + return {
  223 + 'success': 0,
  224 + 'failed': len(actions),
  225 + 'errors': [str(e)],
  226 + }
  227 +
156 228 def search(
157 229 self,
158 230 index_name: str,
159 231 body: Dict[str, Any],
160 232 size: int = 10,
161   - from_: int = 0
  233 + from_: int = 0,
  234 + routing: Optional[str] = None,
162 235 ) -> Dict[str, Any]:
163 236 """
164 237 Execute search query.
... ... @@ -189,7 +262,8 @@ class ESClient:
189 262 index=index_name,
190 263 body=body,
191 264 size=size,
192   - from_=from_
  265 + from_=from_,
  266 + routing=routing,
193 267 )
194 268 except Exception as e:
195 269 logger.error(f"Search failed: {e}", exc_info=True)
... ...