diff --git a/.env b/.env index 3d775d1..6fa43db 100644 --- a/.env +++ b/.env @@ -1,6 +1,8 @@ # Elasticsearch Configuration -ES_HOST=http://120.76.41.98:9200 -ES_USERNAME=essa +# ES_HOST=http://120.76.41.98:9200 +# ES_USERNAME=essa +ES_HOST=http://localhost:9200 +ES_USERNAME=saas ES_PASSWORD=4hOaLaf41y2VuI8y # Redis Configuration (Optional) - AI 生产 10.200.16.14:6479 diff --git a/api/models.py b/api/models.py index e1e2d2a..472d6e7 100644 --- a/api/models.py +++ b/api/models.py @@ -197,14 +197,12 @@ class ImageSearchRequest(BaseModel): class SearchSuggestRequest(BaseModel): """搜索建议请求模型(框架,暂不实现)""" query: str = Field(..., min_length=1, description="搜索查询字符串") - size: int = Field(5, ge=1, le=20, description="返回建议数量") + size: int = Field(5, ge=1, le=50, description="返回建议数量") types: List[Literal["query", "product", "category", "brand"]] = Field( ["query"], description="建议类型:query(查询建议), product(商品建议), category(类目建议), brand(品牌建议)" ) language: Optional[str] = Field(None, description="请求语言(如 zh/en/ar/ru)") - with_results: bool = Field(True, description="是否返回每条 suggestion 的直达商品结果") - result_size: int = Field(3, ge=1, le=10, description="每条 suggestion 返回商品数量") class FacetValue(BaseModel): diff --git a/api/routes/search.py b/api/routes/search.py index 1650953..5a9401f 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -269,17 +269,15 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request): @router.get("/suggestions", response_model=SearchSuggestResponse) async def search_suggestions( q: str = Query(..., min_length=1, description="搜索查询"), - size: int = Query(10, ge=1, le=200, description="建议数量(1-200)"), + size: int = Query(10, ge=1, le=50, description="建议数量(1-50)"), language: str = Query("en", description="请求语言,如 zh/en/ar/ru"), - with_results: bool = Query(True, description="是否附带每条 suggestion 的直达商品"), - result_size: int = Query(3, ge=1, le=10, description="每条 suggestion 直达商品数量"), debug: bool = Query(False, description="是否返回调试信息"), http_request: Request = None, ): """ 获取搜索建议(自动补全)。 - 获取搜索建议(自动补全,支持多语言与直达商品)。 + 获取搜索建议(自动补全,支持多语言)。 """ # Extract tenant_id (required) tenant_id = http_request.headers.get("X-Tenant-ID") if http_request else None @@ -305,8 +303,6 @@ async def search_suggestions( query=q, language=language, size=size, - with_results=with_results, - result_size=result_size, ) response = SearchSuggestResponse( query=result["query"], diff --git a/docs/Usage-Guide.md b/docs/Usage-Guide.md index 861c760..65ca0de 100644 --- a/docs/Usage-Guide.md +++ b/docs/Usage-Guide.md @@ -467,40 +467,41 @@ curl -X POST http://localhost:6002/search/image \ ## 8. Suggestion 索引与接口使用 -### 8.1 构建 Suggestion 索引(全量,多环境) +### 8.1 构建 Suggestion 索引(Phase 2:全量 + 增量) Suggestion 索引会从: - ES 商品索引:`title.{lang}`, `qanchors.{lang}` - MySQL 日志表:`shoplazza_search_log.query`(含 `language`、`request_params`) -聚合生成 `{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}`。 +聚合生成版本化索引并发布 alias: -在项目根目录执行(以 UAT 环境、tenant_id=162 为例): +- 物理索引:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}_v` +- 读别名:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}_current` -```bash -# 1. 切换到 UAT 配置(包含 ES_INDEX_NAMESPACE=uat_) -cp .env.uat .env - -# 2. 启动索引服务(如尚未启动) -./scripts/start_indexer.sh +在项目根目录执行(以 tenant_id=162 为例): -# 3. 为指定租户全量重建 suggestion 索引(会删除旧索引) -python main.py build-suggestions \ - --tenant-id 162 \ - --es-host http://localhost:9200 \ +```bash +# 1) 全量重建(版本化 + alias 发布) +./scripts/build_suggestions.sh 162 \ + --mode full \ --days 30 \ - --recreate -``` + --publish-alias \ + --keep-versions 2 -UAT 环境下,索引名为:`uat_search_suggestions_tenant_162`; -prod 环境下(ES_INDEX_NAMESPACE 为空),索引名为:`search_suggestions_tenant_162`。 +# 2) 增量更新(watermark + overlap) +./scripts/build_suggestions.sh 162 \ + --mode incremental \ + --overlap-minutes 30 +``` 可选参数: - `--days`:回溯日志天数(默认 30) - `--batch-size`:扫描商品索引的批大小(默认 500) - `--min-query-len`:参与 suggestion 的最小查询长度(默认 1) +- `--overlap-minutes`:增量窗口重叠分钟数(默认 30) +- `--bootstrap-if-missing`:增量模式下若缺少 active index 则自动全量初始化(默认 true) > 建议在商品索引构建完成、日志正常写入一段时间后执行一次全量构建,然后按天/小时增加增量构建任务。 @@ -510,11 +511,11 @@ prod 环境下(ES_INDEX_NAMESPACE 为空),索引名为:`search_suggestio ```bash # UAT 环境(本地或 UAT 集群) -curl "http://localhost:6002/search/suggestions?q=iph&size=5&language=en&with_results=true" \ +curl "http://localhost:6002/search/suggestions?q=iph&size=5&language=en" \ -H "X-Tenant-ID: 162" # PROD 环境(域名 / 端口按实际部署调整) -curl "https://api.yourdomain.com/search/suggestions?q=iph&size=5&language=en&with_results=true" \ +curl "https://api.yourdomain.com/search/suggestions?q=iph&size=5&language=en" \ -H "X-Tenant-ID: 162" ``` diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 5b2fbb9..00fb539 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -136,7 +136,7 @@ curl -X POST "http://43.166.252.75:6002/search/" \ | 接口 | HTTP Method | Endpoint | 说明 | |------|------|------|------| | 搜索 | POST | `/search/` | 执行搜索查询 | -| 搜索建议 | GET | `/search/suggestions` | 搜索建议(自动补全/热词,多语言 + 结果直达) | +| 搜索建议 | GET | `/search/suggestions` | 搜索建议(自动补全/热词,多语言) | | 即时搜索 | GET | `/search/instant` | 即时搜索预留接口(当前返回 `501 Not Implemented`) | | 获取文档 | GET | `/search/{doc_id}` | 获取单个文档 | | 全量索引 | POST | `/indexer/reindex` | 全量索引接口(导入数据,不删除索引,仅推荐自测使用) | @@ -545,17 +545,15 @@ response = requests.post(url, headers=headers, json={"query": "芭比娃娃"}) ### 3.7 搜索建议接口 - **端点**: `GET /search/suggestions` -- **描述**: 返回搜索建议(自动补全/热词),支持多语言与“结果直达”(每条 suggestion 附带商品列表)。 +- **描述**: 返回搜索建议(自动补全/热词),支持多语言。 #### 查询参数 | 参数 | 类型 | 必填 | 默认值 | 描述 | |------|------|------|--------|------| | `q` | string | Y | - | 查询字符串(至少 1 个字符) | -| `size` | integer | N | 10 | 返回建议数量(1-200) | +| `size` | integer | N | 10 | 返回建议数量(1-50) | | `language` | string | N | `en` | 请求语言,如 `zh` / `en` / `ar` / `ru`,用于路由到对应语种 suggestion 索引 | -| `with_results` | bool | N | `true` | 是否为每条 suggestion 返回商品列表(结果直达) | -| `result_size` | integer | N | 3 | 每条 suggestion 返回的商品数量(1-10) | | `debug` | bool | N | `false` | 是否开启调试(目前主要用于排查 suggestion 排序与语言解析) | > **租户标识**:同 [3.1](#31-接口信息),通过请求头 `X-Tenant-ID` 或 query 参数 `tenant_id` 传递。 @@ -576,16 +574,7 @@ response = requests.post(url, headers=headers, json={"query": "芭比娃娃"}) "sources": ["query_log", "qanchor"], "lang_source": "log_field", "lang_confidence": 1.0, - "lang_conflict": false, - "products": [ - { - "spu_id": "12345", - "title": "iPhone 15 Pro Max", - "price": 999.0, - "image_url": "https://example.com/image.jpg", - "score": 3.21 - } - ] + "lang_conflict": false } ], "took_ms": 12 @@ -595,7 +584,7 @@ response = requests.post(url, headers=headers, json={"query": "芭比娃娃"}) #### 请求示例 ```bash -curl "http://localhost:6002/search/suggestions?q=芭&size=5&language=zh&with_results=true" \ +curl "http://localhost:6002/search/suggestions?q=芭&size=5&language=zh" \ -H "X-Tenant-ID: 162" ``` diff --git a/docs/搜索API速查表.md b/docs/搜索API速查表.md index 61179ef..20a358e 100644 --- a/docs/搜索API速查表.md +++ b/docs/搜索API速查表.md @@ -282,7 +282,7 @@ POST /search/image "size": 20 } -GET /search/suggestions?q=芭&size=5&language=zh&with_results=true +GET /search/suggestions?q=芭&size=5&language=zh GET /search/instant?q=玩具&size=5 # 当前返回 501 Not Implemented diff --git a/frontend/index.html b/frontend/index.html index 0faf099..158bc0c 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -284,7 +284,6 @@ url.searchParams.set('q', query); url.searchParams.set('size', '40'); url.searchParams.set('language', getSelectedLang()); - url.searchParams.set('with_results', 'false'); // 同时通过 query 参数传 tenant_id,方便在代理层丢失 header 时仍能识别租户 url.searchParams.set('tenant_id', tenantId); diff --git a/main.py b/main.py index c62f55a..4bb7e9c 100755 --- a/main.py +++ b/main.py @@ -104,7 +104,7 @@ def cmd_search(args): def cmd_build_suggestions(args): - """Build suggestion index for a tenant.""" + """Build/update suggestion index for a tenant.""" # Initialize ES client with optional authentication es_username = os.getenv("ES_USERNAME") or ES_CONFIG.get("username") es_password = os.getenv("ES_PASSWORD") or ES_CONFIG.get("password") @@ -134,13 +134,27 @@ def cmd_build_suggestions(args): password=db_pass, ) builder = SuggestionIndexBuilder(es_client=es_client, db_engine=db_engine) - result = builder.rebuild_tenant_index( - tenant_id=args.tenant_id, - days=args.days, - recreate=args.recreate, - batch_size=args.batch_size, - min_query_len=args.min_query_len, - ) + if args.mode == "full": + result = builder.rebuild_tenant_index( + tenant_id=args.tenant_id, + days=args.days, + recreate=args.recreate, + batch_size=args.batch_size, + min_query_len=args.min_query_len, + publish_alias=args.publish_alias, + keep_versions=args.keep_versions, + use_versioned_index=not args.no_versioned_index, + ) + else: + result = builder.incremental_update_tenant_index( + tenant_id=args.tenant_id, + min_query_len=args.min_query_len, + fallback_days=args.incremental_fallback_days, + overlap_minutes=args.overlap_minutes, + bootstrap_if_missing=args.bootstrap_if_missing, + bootstrap_days=args.bootstrap_days, + batch_size=args.batch_size, + ) print(json.dumps(result, indent=2, ensure_ascii=False)) return 0 @@ -158,7 +172,7 @@ def main(): serve_parser = subparsers.add_parser('serve', help='Start API service (multi-tenant)') serve_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to') serve_parser.add_argument('--port', type=int, default=6002, help='Port to bind to') - serve_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host') + serve_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host') serve_parser.add_argument('--reload', action='store_true', help='Enable auto-reload') # Serve-indexer command @@ -168,14 +182,14 @@ def main(): ) serve_indexer_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to') serve_indexer_parser.add_argument('--port', type=int, default=6004, help='Port to bind to') - serve_indexer_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host') + serve_indexer_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host') serve_indexer_parser.add_argument('--reload', action='store_true', help='Enable auto-reload') # Search command search_parser = subparsers.add_parser('search', help='Test search from command line') search_parser.add_argument('query', help='Search query') search_parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)') - search_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host') + search_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host') search_parser.add_argument('--size', type=int, default=10, help='Number of results') search_parser.add_argument('--no-translation', action='store_true', help='Disable translation') search_parser.add_argument('--no-embedding', action='store_true', help='Disable embeddings') @@ -184,17 +198,64 @@ def main(): # Suggestion build command suggest_build_parser = subparsers.add_parser( 'build-suggestions', - help='Build tenant suggestion index (full rebuild)' + help='Build tenant suggestion index (full/incremental)' ) suggest_build_parser.add_argument('--tenant-id', required=True, help='Tenant ID') - suggest_build_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host') + suggest_build_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host') + suggest_build_parser.add_argument( + '--mode', + choices=['full', 'incremental'], + default='full', + help='Build mode: full rebuild or incremental update', + ) suggest_build_parser.add_argument('--days', type=int, default=360, help='Query log lookback days') suggest_build_parser.add_argument('--batch-size', type=int, default=500, help='Product scan batch size') suggest_build_parser.add_argument('--min-query-len', type=int, default=1, help='Minimum query length') suggest_build_parser.add_argument( + '--publish-alias', + action=argparse.BooleanOptionalAction, + default=True, + help='For full mode: publish alias to new versioned index (default: true)', + ) + suggest_build_parser.add_argument( + '--keep-versions', + type=int, + default=2, + help='For full mode: keep latest N versioned indices', + ) + suggest_build_parser.add_argument( + '--no-versioned-index', + action='store_true', + help='For full mode: write to legacy concrete index (not recommended)', + ) + suggest_build_parser.add_argument( '--recreate', action='store_true', - help='Delete and recreate suggestion index before build' + help='For legacy concrete index mode: delete and recreate target index before build', + ) + suggest_build_parser.add_argument( + '--incremental-fallback-days', + type=int, + default=7, + help='For incremental mode: default lookback days when no watermark', + ) + suggest_build_parser.add_argument( + '--overlap-minutes', + type=int, + default=30, + help='For incremental mode: overlap window to avoid late-arrival misses', + ) + suggest_build_parser.add_argument( + '--bootstrap-if-missing', + action=argparse.BooleanOptionalAction, + default=True, + help='For incremental mode: bootstrap with full build when active index is missing', + ) + suggest_build_parser.add_argument( + '--bootstrap-days', + type=int, + default=30, + help='For incremental mode bootstrap full build: query log lookback days', ) args = parser.parse_args() diff --git a/scripts/build_suggestions.sh b/scripts/build_suggestions.sh index a0095e4..ecd6456 100755 --- a/scripts/build_suggestions.sh +++ b/scripts/build_suggestions.sh @@ -3,14 +3,19 @@ # Convenience script to rebuild suggestion index for a tenant. # # Usage: -# ./scripts/build_suggestions.sh [--days 30] [--batch-size 500] [--min-query-len 1] [--es-host http://localhost:9200] +# # full rebuild + alias publish (default) +# ./scripts/build_suggestions.sh --mode full --days 30 +# +# # incremental update from watermark +# ./scripts/build_suggestions.sh --mode incremental # set -euo pipefail if [ $# -lt 1 ]; then echo "Usage: $0 [extra args...]" - echo "Example: $0 162 --days 30 --recreate" + echo "Example (full): $0 162 --mode full --days 30 --publish-alias" + echo "Example (incremental): $0 162 --mode incremental --overlap-minutes 30" exit 1 fi @@ -21,7 +26,11 @@ ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" cd "$ROOT_DIR" -python main.py build-suggestions \ +PY_BIN="${PYTHON_BIN:-$ROOT_DIR/.venv/bin/python}" +if [ ! -x "$PY_BIN" ]; then + PY_BIN="python3" +fi + +"$PY_BIN" main.py build-suggestions \ --tenant-id "$TENANT_ID" \ "$@" - diff --git a/scripts/rebuild_suggestions.sh b/scripts/rebuild_suggestions.sh new file mode 100755 index 0000000..3a62827 --- /dev/null +++ b/scripts/rebuild_suggestions.sh @@ -0,0 +1,80 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [ $# -lt 1 ]; then + echo "Usage: $0 [sample_query] [sample_language]" + echo "Example: $0 162 shi en" + exit 1 +fi + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +TENANT_ID="$1" +SAMPLE_Q="${2:-shi}" +SAMPLE_LANG="${3:-en}" +API_BASE="${API_BASE_URL:-http://localhost:6002}" + +cd "$ROOT_DIR" + +PY_BIN="${PYTHON_BIN:-$ROOT_DIR/.venv/bin/python}" +if [ ! -x "$PY_BIN" ]; then + PY_BIN="python3" +fi + +if [ -z "${ES_HOST:-}" ]; then + ES_HOST="$("$PY_BIN" - <<'PY' +from dotenv import dotenv_values +print(dotenv_values('.env').get('ES_HOST') or 'http://localhost:9200') +PY +)" +fi + +if [ -z "${ES_USERNAME:-}" ] || [ -z "${ES_PASSWORD:-}" ]; then + readarray -t _ES_CREDS < <("$PY_BIN" - <<'PY' +from dotenv import dotenv_values +cfg = dotenv_values('.env') +print(cfg.get('ES_USERNAME') or '') +print(cfg.get('ES_PASSWORD') or '') +PY +) + ES_USERNAME="${ES_USERNAME:-${_ES_CREDS[0]}}" + ES_PASSWORD="${ES_PASSWORD:-${_ES_CREDS[1]}}" +fi + +if [ -n "${ES_USERNAME:-}" ] && [ -n "${ES_PASSWORD:-}" ]; then + AUTH=(-u "${ES_USERNAME}:${ES_PASSWORD}") +else + AUTH=() +fi + +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_${TENANT_ID}_current" + +echo "[1/4] Full rebuild tenant=${TENANT_ID} (versioned + alias publish)" +"$PY_BIN" main.py build-suggestions \ + --tenant-id "$TENANT_ID" \ + --es-host "$ES_HOST" \ + --mode full \ + --days 365 \ + --batch-size 500 \ + --publish-alias \ + --keep-versions 2 + +echo "[2/4] Incremental update tenant=${TENANT_ID}" +"$PY_BIN" main.py build-suggestions \ + --tenant-id "$TENANT_ID" \ + --es-host "$ES_HOST" \ + --mode incremental \ + --overlap-minutes 30 + +echo "[3/4] ES count + sample" +curl -sS "${AUTH[@]}" "$ES_HOST/$ALIAS_NAME/_count?pretty" +echo +curl -sS "${AUTH[@]}" "$ES_HOST/$ALIAS_NAME/_search?pretty" -H 'Content-Type: application/json' -d '{ + "size": 5, + "query": {"match_all": {}}, + "_source": ["lang", "text", "rank_score", "sources", "query_count_30d"] +}' +echo + +echo "[4/4] API smoke test" +curl -sS "$API_BASE/search/suggestions?q=${SAMPLE_Q}&size=10&language=${SAMPLE_LANG}" -H "X-Tenant-ID: ${TENANT_ID}" +echo diff --git a/suggestion/ARCHITECTURE_V2.md b/suggestion/ARCHITECTURE_V2.md new file mode 100644 index 0000000..b84e142 --- /dev/null +++ b/suggestion/ARCHITECTURE_V2.md @@ -0,0 +1,304 @@ +# Suggestion 架构方案 V2(仅 Suggest,去除结果直达) + +## 0. 结论 + +本方案将 Suggest 设计为**独立高性能检索系统**,只返回建议词,不再返回商品卡片,也不做历史兼容。 + +- 只保留 `/search/suggestions` 的词级自动补全能力 +- 完全移除 `with_results/result_size/products[]` 链路 +- 多语言优先,支持高并发、低延迟、可持续演进 + +--- + +## 1. 当前实现的关键问题(基于现有代码审视) + +1. 在线链路曾包含“suggest -> 二次商品查询”,属于典型 N+1 放大,QPS 上升后延迟和 ES 负载都不稳定。 +2. `builder.py` 全量构建使用“大量 in-memory 聚合 + fetchall”,大租户下内存风险高。 +3. 查询参数上限过大(原 `size<=200`),不符合自动补全接口性能边界。 +4. 文档与实现长期混合(README 仍包含结果直达),导致认知不一致。 +5. 多语言归一化仍偏基础(仅 lower/空白折叠),对 Unicode、变音符、跨语系兼容不够。 + +--- + +## 2. 目标与 SLO + +### 2.1 业务目标 + +- 输入时实时返回高相关建议词(query suggestion) +- 多语言稳定(至少覆盖租户配置 `index_languages`) +- 支持词级排序和运营治理(黑白名单、降噪、降权) + +### 2.2 性能目标(建议) + +- P50 < 10ms,P95 < 25ms,P99 < 50ms(ES 查询耗时,不含网关) +- 单集群支持高并发(千级 QPS 可横向扩展) +- 数据新鲜度:增量 5-15 分钟可见 + +--- + +## 3. 总体架构 + +## 3.1 在线路径(单跳) + +Client -> API `/search/suggestions` -> ES `search_suggestions_v2` -> 返回 suggestions + +原则: + +- **单次 ES 查询完成主路径**(可选双召回融合,但仍在同一次 API 请求内完成) +- 不调用 `search_products`,不返回商品结果 +- 通过 `routing=tenant_id` 避免跨分片 fan-out + +## 3.2 离线路径(构建) + +数据源: + +- 商品字段:`title.{lang}`、`qanchors.{lang}` +- 搜索日志:`shoplazza_search_log`(含 `language/request_params`) +- 行为信号(可选增强):点击、加购、下单 + +产物: + +- Suggest 文档(`tenant_id + lang + text_norm` 唯一) +- completion + prefix 检索字段 +- 排序特征(热度、近期度、质量分) + +发布方式: + +- 写入新物理索引(版本化) +- 原子切换 alias(零停机) + +--- + +## 4. 索引设计(ES) + +## 4.1 索引组织 + +推荐两级策略: + +1. 默认:环境级共享索引(降低海量租户 index 数量) +2. 大租户:可升级为租户独享索引(隔离资源) + +统一通过 alias 暴露: + +- `search_suggestions_v2_current` + +## 4.2 Mapping(核心字段) + +```json +{ + "settings": { + "number_of_shards": 3, + "number_of_replicas": 1, + "refresh_interval": "30s" + }, + "mappings": { + "properties": { + "tenant_id": { "type": "keyword" }, + "lang": { "type": "keyword" }, + "text": { "type": "keyword" }, + "text_norm": { "type": "keyword" }, + "status": { "type": "byte" }, + "sources": { "type": "keyword" }, + + "query_count_7d": { "type": "integer" }, + "query_count_30d": { "type": "integer" }, + "ctr_30d": { "type": "float" }, + "order_rate_30d": { "type": "float" }, + "rank_score": { "type": "float" }, + + "suggest": { + "type": "completion", + "contexts": [ + { "name": "tenant", "type": "category" }, + { "name": "lang", "type": "category" } + ] + }, + + "sat": { + "properties": { + "zh": { "type": "search_as_you_type", "analyzer": "index_ik" }, + "en": { "type": "search_as_you_type", "analyzer": "english" }, + "ar": { "type": "search_as_you_type", "analyzer": "arabic" } + } + }, + + "updated_at": { "type": "date" } + } + } +} +``` + +说明: + +- `completion` 负责极速前缀命中(主召回) +- `search_as_you_type` 负责多词前缀和召回兜底 +- `contexts` 强制租户与语言隔离 + +--- + +## 5. 多语言策略 + +1. 语言归属优先级:`log.language > request_params.language > 脚本识别 > tenant.primary_language` +2. 统一归一化:NFKC、大小写折叠、空白折叠、标点清洗 +3. 分词器按语言配置: + - 中文:IK/ANSJ(与主索引保持一致) + - 拉丁语系:对应内置 analyzer + - 未覆盖语种:`standard + ICU folding` 兜底 +4. 保证写入语言必须在租户 `index_languages` 内 + +--- + +## 6. 在线检索策略(高性能) + +## 6.1 双通道召回(推荐) + +1. 通道 A:`completion suggester`(prefix,skip_duplicates) +2. 通道 B:`multi_match(type=bool_prefix)` on `search_as_you_type` +3. 融合去重:按 `text_norm` 去重,按最终分排序截断 + +## 6.2 查询约束 + +- 默认 `size=10`,最大 `size=50` +- `track_total_hits=false` +- `_source` 仅返回必要字段(`text/lang/rank_score/sources`) +- `routing=tenant_id` + +## 6.3 打分建议 + +```text +final_score = + es_score + + a1*log1p(query_count_30d) + + a2*log1p(query_count_7d) + + a3*ctr_30d + + a4*order_rate_30d + + a5*freshness_decay +``` + +--- + +## 7. 构建与发布 + +## 7.1 构建模式 + +- 每日全量:重建全量特征,清理脏词 +- 小时级增量:只处理新日志窗口 + +## 7.2 工程要求 + +- 禁止 `fetchall` 全量入内存,改为流式读取(分页/游标) +- ES 扫描采用 `search_after` 流式聚合 +- 批量写入采用 bulk(分块 + 重试 + 失败重放) + +## 7.3 发布策略 + +1. `search_suggestions_v2_YYYYMMDDHHmm` 写入完成 +2. 校验 count/抽样查询/核心词覆盖 +3. alias 原子切换到新索引 +4. 保留上一个版本用于快速回滚 + +--- + +## 8. API 契约(V2) + +请求: + +- `GET /search/suggestions` +- 参数:`q`、`language`、`size` +- Header:`X-Tenant-ID` + +响应: + +```json +{ + "query": "iph", + "language": "en", + "resolved_language": "en", + "suggestions": [ + { + "text": "iphone 15", + "lang": "en", + "score": 8.31, + "rank_score": 6.72, + "sources": ["query_log", "qanchor"] + } + ], + "took_ms": 12 +} +``` + +删除项(明确不支持): + +- `with_results` +- `result_size` +- `products[]` + +--- + +## 9. 观测与治理 + +核心监控: + +- QPS、P50/P95/P99、错误率 +- 空结果率(按语言、按租户) +- suggestion 覆盖率(top query 是否命中) +- 语言冲突率(log vs request_params) +- 噪声词比例、黑名单命中率 + +治理机制: + +- 黑名单:强制下线 +- 白名单:强制保留并可加权 +- 最小热度阈值:低频垃圾词过滤 +- 时间衰减:过期词自动下沉 + +--- + +## 10. 与官方最佳实践对齐(ES) + +本方案直接采用以下官方建议: + +1. `completion` 适合高性能自动补全,支持 `skip_duplicates` 与上下文过滤。 +2. `search_as_you_type + bool_prefix` 是官方推荐的 as-you-type 查询方式。 +3. `edge_ngram` 仅用于索引时分词,查询时应用普通 analyzer(`search_analyzer`)。 +4. 多语言场景使用 ICU Analysis 插件增强 Unicode 处理。 +5. 通过 `routing` 将租户请求路由到单分片,降低 fan-out。 + +--- + +## 11. 分阶段落地 + +1. Phase 1(本次):去除结果直达,稳定 Suggest 单能力 +2. Phase 2:流式增量构建 + alias 原子发布 +3. Phase 3:行为信号排序(CTR/CVR)+ 运营治理台 +4. Phase 4:大租户独享索引自动升降级 + +--- + +## 12. Phase 2 落地命令(当前仓库) + +全量重建(版本化索引 + alias 发布): + +```bash +python main.py build-suggestions \ + --tenant-id 162 \ + --mode full \ + --days 365 \ + --publish-alias \ + --keep-versions 2 +``` + +增量更新(基于 watermark): + +```bash +python main.py build-suggestions \ + --tenant-id 162 \ + --mode incremental \ + --overlap-minutes 30 +``` + +一键脚本(全量 + 增量 + ES/API 验证): + +```bash +./scripts/rebuild_suggestions.sh 162 +``` diff --git a/suggestion/README.md b/suggestion/README.md index 63de3a8..645b4c1 100644 --- a/suggestion/README.md +++ b/suggestion/README.md @@ -1,536 +1,46 @@ -# Suggestion 设计文档 +# Suggestion 模块说明(统一入口) -## 文档导航 +本文档是 suggestion 模块的统一入口,遵循 `docs/DEVELOPER_GUIDE.md` 的“单一入口、避免分叉”原则。 -- `README.md`(本文):完整方案设计(架构、索引、构建、查询、验证) -- `RUNBOOK.md`:日常运行手册(如何构建、如何回归、如何发布) -- `TROUBLESHOOTING.md`:故障排查手册(空结果、tenant 丢失、ES 401、版本未生效等) +## 1. 当前状态(Phase 2) -本文档定义 `search_suggestions` 独立索引方案,用于支持多语言自动补全(suggestion)与结果直达。 +- 仅保留 Suggest 自动补全能力 +- 不支持结果直达(`with_results` / `result_size` / `products[]` 已移除) +- 索引采用版本化发布: + - 物理索引:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}_v` + - 读别名:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}_current` +- 支持增量更新(watermark + overlap) -## 1. 背景与目标 +## 2. 文档导航(唯一推荐顺序) -当前搜索系统已具备多语言商品索引(`title.{lang}`、`qanchors.{lang}`)与主搜索能力。为了实现输入中实时下拉 suggestion,需要新增一套面向“词”的能力。 +1. `ARCHITECTURE_V2.md`:架构与设计原则 +2. `RUNBOOK.md`:构建/发布/验证流程 +3. `TROUBLESHOOTING.md`:常见问题排查 -核心目标: +## 3. 命令入口 -- 在不耦合主搜索链路的前提下,提供低延迟 suggestion(实时输入)。 -- 支持多语言,按请求语言路由到对应 suggestion 语种。 -- 支持“结果直达”:每条 suggestion 可附带候选商品列表(通过二次查询 `search_products` 完成)。 -- 支持后续词级排序演进(行为信号、运营控制、去噪治理)。 - -非目标(当前阶段): - -- 不做个性化推荐(用户级 personalization)。 -- 不引入复杂在线学习排序服务。 - -## 2. 总体架构 - -采用双索引架构(支持多环境 namespace 前缀): - -- 商品索引:`{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}` -- 建议词索引:`{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}` - -在线查询主路径: - -1. 仅查询 `{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}` 得到 suggestion 列表。 -2. 对每条 suggestion 进行“结果直达”的二次查询(`msearch`)到 `{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}`: - - 使用 suggestion 文本对 `title.{lang}` / `qanchors.{lang}` 执行 `term` / `match_phrase_prefix` 组合查询。 -3. 回填每条 suggestion 的商品卡片列表(例如每条 3~5 个)。 - -## 3. API 设计 - -建议保留并增强现有接口:`GET /search/suggestions` - -### 3.1 请求参数 - -- `q` (string, required): 用户输入前缀 -- `size` (int, optional, default=10, max=20): 返回 suggestion 数量 -- `language` (string, required): 请求语言(如 `zh`, `en`, `ar`, `ru`) -- `with_results` (bool, optional, default=true): 是否附带每条 suggestion 的直达商品 -- `result_size` (int, optional, default=3, max=10): 每条 suggestion 附带商品条数 -- `debug` (bool, optional, default=false): 是否返回调试信息 - -Header: - -- `X-Tenant-ID` (required) - -### 3.2 响应结构 - -```json -{ - "query": "iph", - "language": "en", - "suggestions": [ - { - "text": "iphone 15", - "lang": "en", - "score": 12.37, - "sources": ["query_log", "qanchor"], - "products": [ - { - "spu_id": "12345", - "title": "iPhone 15 Pro Max", - "price": 999.0, - "image_url": "https://..." - } - ] - } - ], - "took_ms": 14, - "debug_info": {} -} -``` - -## 4. 索引设计:`search_suggestions_tenant_{tenant_id}` - -文档粒度:`tenant_id + lang + text_norm` 唯一一条文档。 - -### 4.1 字段定义(建议) - -- `tenant_id` (`keyword`) -- `lang` (`keyword`) -- `text` (`keyword`):展示文本 -- `text_norm` (`keyword`):归一化文本(去重键) -- `sources` (`keyword[]`):来源集合,取值:`title` / `qanchor` / `query_log` -- `title_doc_count` (`integer`):来自 title 的命中文档数 -- `qanchor_doc_count` (`integer`):来自 qanchor 的命中文档数 -- `query_count_7d` (`integer`):7 天搜索词计数 -- `query_count_30d` (`integer`):30 天搜索词计数 -- `rank_score` (`float`):离线计算总分 -- `status` (`byte`):1=online, 0=offline -- `updated_at` (`date`) - -用于召回: - -- `completion` (`object`): - - `completion.{lang}`: `completion` 类型(按语言设置 analyzer) -- `sat` (`object`): - - `sat.{lang}`: `search_as_you_type`(增强多词前缀效果) - -可选字段(用于加速直达): - -- `top_spu_ids` (`keyword[]`):预计算商品候选 id - -### 4.2 Mapping 样例(简化) - -```json -{ - "settings": { - "number_of_shards": 1, - "number_of_replicas": 0 - }, - "mappings": { - "properties": { - "tenant_id": { "type": "keyword" }, - "lang": { "type": "keyword" }, - "text": { "type": "keyword" }, - "text_norm": { "type": "keyword" }, - "sources": { "type": "keyword" }, - "title_doc_count": { "type": "integer" }, - "qanchor_doc_count": { "type": "integer" }, - "query_count_7d": { "type": "integer" }, - "query_count_30d": { "type": "integer" }, - "rank_score": { "type": "float" }, - "status": { "type": "byte" }, - "updated_at": { "type": "date" }, - "completion": { - "properties": { - "zh": { "type": "completion", "analyzer": "index_ansj", "search_analyzer": "query_ansj" }, - "en": { "type": "completion", "analyzer": "english" }, - "ar": { "type": "completion", "analyzer": "arabic" }, - "ru": { "type": "completion", "analyzer": "russian" } - } - }, - "sat": { - "properties": { - "zh": { "type": "search_as_you_type", "analyzer": "index_ansj" }, - "en": { "type": "search_as_you_type", "analyzer": "english" }, - "ar": { "type": "search_as_you_type", "analyzer": "arabic" }, - "ru": { "type": "search_as_you_type", "analyzer": "russian" } - } - }, - "top_spu_ids": { "type": "keyword" } - } - } -} -``` - -说明:实际支持语种需与 `search_products` 已支持语种保持一致。 - -## 5. 全量建索引逻辑(核心) - -全量程序职责:扫描商品 `title/qanchors` 与搜索日志 `query`,聚合后写入 `search_suggestions`。 - -输入: - -- `{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}` 文档 -- MySQL 表:`shoplazza_search_log` - -输出: - -- `{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}` 全量文档 - -### 5.1 流程 - -1. 创建/重建 `{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}`。 -2. 遍历 `{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}`(`scroll` 或 `search_after`): - - 提取每个商品的 `title.{lang}`、`qanchors.{lang}`。 - - 归一化文本(NFKC、trim、lower、空白折叠)。 - - 产出候选词并累加: - - `title_doc_count += 1` - - `qanchor_doc_count += 1` - - `sources` 加来源。 -3. 读取日志: - - SQL 拉取 `tenant_id` 下时间窗数据(如 30 天)。 - - 对每条 `query` 解析语言归属(优先 `shoplazza_search_log.language`,其次 `request_params.language`,见第 6 节)。 - - 累加 `query_count_7d` / `query_count_30d`,`sources` 加 `query_log`。 -4. 清洗与过滤: - - 去空、去纯符号、长度阈值过滤。 - - 可选黑名单过滤(运营配置)。 -5. 计算 `rank_score`(见第 7 节)。 -6. 组装文档: - - 写 `completion.{lang}` + `sat.{lang}`。 - - `_id = md5(tenant_id|lang|text_norm)`。 -7. 批量写入(bulk upsert)。 - -### 5.2 伪代码 - -```python -for tenant_id in tenants: - agg = {} # key: (lang, text_norm) - - for doc in scan_es_products(tenant_id): - for lang in index_languages(tenant_id): - add_from_title(agg, doc.title.get(lang), lang, doc.spu_id) - add_from_qanchor(agg, doc.qanchors.get(lang), lang, doc.spu_id) - - for row in fetch_search_logs(tenant_id, days=30): - lang, conf = resolve_query_lang( - query=row.query, - log_language=row.language, - request_params_json=row.request_params, - tenant_id=tenant_id - ) - if not lang: - continue - add_from_query_log(agg, row.query, lang, row.create_time) - - docs = [] - for (lang, text_norm), item in agg.items(): - if not pass_filters(item): - continue - item.rank_score = compute_rank_score(item) - docs.append(to_suggestion_doc(tenant_id, lang, item)) - - bulk_upsert(index=f"{ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}", docs=docs) -``` - -## 6. 日志语言解析策略(已新增 language 字段) - -现状:`shoplazza_search_log` 已新增 `language` 字段,且 `request_params`(JSON)中也包含 `language`。 -因此全量程序不再以“纯离线识别”为主,而是采用“日志显式语言优先”的三级策略。 - -### 6.1 语言解析优先级 - -1. **一级:`shoplazza_search_log.language`(最高优先级)** - - 若值存在且合法,直接作为 query 归属语言。 -2. **二级:`request_params.language`(JSON 兜底)** - - 当表字段为空/非法时,解析 `request_params` JSON 中的 `language`。 -3. **三级:离线识别(最后兜底)** - - 仅在前两者都缺失时启用: - - 脚本直判(CJK/Arabic/Cyrillic) - - 轻量语言识别器(拉丁语) - -### 6.2 一致性校验(推荐) - -当 `shoplazza_search_log.language` 与 `request_params.language` 同时存在但不一致时: - -- 默认采用 `shoplazza_search_log.language` -- 记录 `lang_conflict=true` 用于审计 -- 输出监控指标(冲突率) - -### 6.3 置信度与约束 - -对于一级/二级来源: - -- `lang_confidence=1.0` -- `lang_source=log_field` 或 `lang_source=request_params` - -对于三级离线识别: - -- `confidence >= 0.8`:写入 top1 -- `0.5 <= confidence < 0.8`:写入 top1(必要时兼容 top2 降权) -- `< 0.5`:写入租户 `primary_language`(降权) - -统一约束: - -- 最终写入语言必须属于租户 `index_languages` - -建议额外存储: - -- `lang_confidence`(float) -- `lang_source`(`log_field`/`request_params`/`script`/`model`/`default`) -- `lang_conflict`(bool) - -便于后续质量审计与数据回溯。 - -## 7. 排序分数设计(离线) - -建议采用可解释线性组合: - -```text -rank_score = - w1 * log1p(query_count_30d) - + w2 * log1p(query_count_7d) - + w3 * log1p(qanchor_doc_count) - + w4 * log1p(title_doc_count) - + w5 * business_bonus -``` - -推荐初始权重(可配置): - -- `w1=1.8`, `w2=1.2`, `w3=1.0`, `w4=0.6`, `w5=0.3` - -说明: - -- 搜索日志信号优先级最高(最接近真实用户意图)。 -- `qanchor` 高于 `title`(更偏 query 风格)。 -- `business_bonus` 可接入销量、库存可售率等轻量业务信号。 - -## 8. 在线查询逻辑(suggestion) - -主路径只查 `search_suggestions`。 - -### 8.1 Suggestion 查询 DSL(示例) - -```json -{ - "size": 10, - "query": { - "function_score": { - "query": { - "bool": { - "filter": [ - { "term": { "lang": "en" } }, - { "term": { "status": 1 } } - ], - "should": [ - { - "multi_match": { - "query": "iph", - "type": "bool_prefix", - "fields": [ - "sat.en", - "sat.en._2gram", - "sat.en._3gram" - ] - } - } - ], - "minimum_should_match": 1 - } - }, - "field_value_factor": { - "field": "rank_score", - "factor": 1.0, - "modifier": "log1p", - "missing": 0 - }, - "boost_mode": "sum", - "score_mode": "sum" - } - }, - "_source": [ - "text", - "lang", - "rank_score", - "sources", - "top_spu_ids" - ] -} -``` - -可选:completion 方式(极低延迟)也可作为同接口内另一条召回通道,再与上面结果融合去重。 - -## 9. 结果直达(二次查询) - -`with_results=true` 时,对每条 suggestion 的 `text` 做二次查询到 `search_products_tenant_{tenant_id}`。 - -推荐使用 `msearch`,每条 suggestion 一个子查询: - -- `term`(精确)命中 `qanchors.{lang}.keyword`(若存在 keyword 子字段) -- `match_phrase_prefix` 命中 `title.{lang}` -- 可加权:`qanchors` 命中权重高于 `title` -- 每条 suggestion 返回 `result_size` 条商品 - -若未来希望进一步降在线复杂度,可改为离线写入 `top_spu_ids` 并在在线用 `mget` 回填。 - -## 10. 数据治理与运营控制 - -建议加入以下机制: - -- 黑名单词:人工屏蔽垃圾词、敏感词 -- 白名单词:活动词、品牌词强制保留 -- 最小阈值:低频词不过线(例如 `query_count_30d < 2` 且无 qanchor/title 支撑) -- 去重规则:`text_norm` 维度强去重 -- 更新策略:每日全量 + 每小时增量(后续) - -## 11. 实施里程碑 - -M1(快速上线): - -- 建 `search_suggestions` 索引 -- 全量程序:`title + qanchors + query_log` -- `/search/suggestions` 仅查 suggestion,不带直达 - -M2(增强): - -- 增加二次查询直达商品(`msearch`) -- 引入语言置信度审计报表 -- 加黑白名单与去噪配置 - -M3(优化): - -- completion + bool_prefix 双通道融合 -- 增量构建任务(小时级) -- 排序参数在线配置化 - -## 12. 关键风险与规避 - -- 日志语言字段质量问题导致错写:通过 `log_field > request_params > model` 三级策略与冲突审计规避 -- 高频噪声词上浮:黑名单 + 最小阈值 + 分数截断 -- 直达二次查询成本上升:控制 `size/result_size`,优先 `msearch` -- 多语言字段不一致:统一语言枚举与映射生成逻辑,避免手写散落 - ---- - -## 13. 实验与验证建议 - -以租户 `tenant_id=171` 为例,推荐如下验证流程(其它租户 / 环境同理,可通过 ES_INDEX_NAMESPACE 区分 prod / uat / test): - -### 13.1 构建索引 - -```bash -./scripts/build_suggestions.sh 171 --days 30 --recreate -``` - -期望 CLI 输出类似(prod 环境,ES_INDEX_NAMESPACE 为空): - -```json -{ - "tenant_id": "171", - "index_name": "search_suggestions_tenant_171", - "total_candidates": 61, - "indexed_docs": 61, - "bulk_result": { - "success": 61, - "failed": 0, - "errors": [] - } -} -``` - -含义: - -- `total_candidates`:聚合到的词候选总数(按 `(lang,text_norm)` 去重) -- `indexed_docs`:实际写入 ES 的文档数(通常与 `total_candidates` 相同) -- `bulk_result`:bulk 写入统计 - -### 13.2 检查索引结构 - -```bash -# prod / 本地环境:ES_INDEX_NAMESPACE 为空 -curl "http://localhost:9200/search_suggestions_tenant_171/_mapping?pretty" -curl "http://localhost:9200/search_suggestions_tenant_171/_count?pretty" -curl "http://localhost:9200/search_suggestions_tenant_171/_search?size=5&pretty" -d '{ - "query": { "match_all": {} } -}' - -# UAT 环境:假设 ES_INDEX_NAMESPACE=uat_ -curl "http://localhost:9200/uat_search_suggestions_tenant_171/_mapping?pretty" -curl "http://localhost:9200/uat_search_suggestions_tenant_171/_count?pretty" -curl "http://localhost:9200/uat_search_suggestions_tenant_171/_search?size=5&pretty" -d '{ - "query": { "match_all": {} } -}' -``` - -重点确认: - -- 是否存在 `lang/text/text_norm/sources/rank_score/completion/sat` 等字段。 -- 文档中 `lang` 是否只落在租户配置的 `index_languages` 范围内。 -- 常见 query(如你期望的热词)是否有对应文档,`query_count_*` 是否大致正确。 - -### 13.3 通过 API 验证 suggestion 行为 - -启动后端: - -```bash -python main.py serve --es-host http://localhost:9200 --port 6002 -``` - -示例调用(中文): +- 全量或增量构建: ```bash -curl "http://localhost:6002/search/suggestions?q=玩具&size=5&language=zh&with_results=true" \ - -H "X-Tenant-ID: 171" +./scripts/build_suggestions.sh --mode full +./scripts/build_suggestions.sh --mode incremental ``` -示例调用(英文): +- 一键重建 + 验证: ```bash -curl "http://localhost:6002/search/suggestions?q=iph&size=5&language=en&with_results=true" \ - -H "X-Tenant-ID: 171" +./scripts/rebuild_suggestions.sh ``` -预期: - -- `resolved_language` 与传入 `language` 一致或回落到租户主语言。 -- 返回若干 `suggestions[]`,每条包含: - - `text/lang/score/rank_score/sources` - - `products[]` 为直达商品(数量由 `result_size` 控制)。 - -如需进一步排查,可对比: - -- 某个 suggestion 的 `text` 与 `shoplazza_search_log.query` 的出现频次。 -- 该 suggestion 的 `products` 是否与主搜索接口 `POST /search/` 对同 query 的 topN 结果大体一致。 - -### 13.4 语言归属与多语言检查 - -挑选典型场景: +## 4. API 约定(简版) -- 纯中文 query(如商品中文标题)。 -- 纯英文 query(如品牌/型号)。 -- 混合或无明显语言的 query。 +- 端点:`GET /search/suggestions` +- 参数:`q`, `size`, `language` +- Header:`X-Tenant-ID` -验证点: - -- 文档 `lang` 与期望语言是否匹配。 -- `lang_source` 是否按优先级反映来源: - - `log_field` > `request_params` > `script/model/default` -- 如存在 `lang_conflict=true` 的案例,采样检查日志中 `language` 与 `request_params.language` 是否存在冲突。 - -## 14. 自动化测试建议 - -已提供基础单元测试(见 `tests/test_suggestions.py`): - -- 语言解析逻辑: - - `test_resolve_query_language_prefers_log_field` - - `test_resolve_query_language_uses_request_params_when_log_missing` - - `test_resolve_query_language_fallback_to_primary` -- 在线查询逻辑: - - `test_suggestion_service_basic_flow`:使用 `FakeESClient` 验证 suggestion + 结果直达商品整体流程。 - -推荐在本地环境中执行: +示例: ```bash -pytest tests/test_suggestions.py -q +curl "http://localhost:6002/search/suggestions?q=shi&size=10&language=en" \ + -H "X-Tenant-ID: 162" ``` - -后续可根据业务需要补充: - -- 排序正确性测试(构造不同 `query_count_*`、`title/qanchor_doc_count`)。 -- 多语言覆盖测试(zh/en/ar/ru 等,结合租户 `index_languages`)。 -- 简单性能回归(单次查询时延、QPS 与 P95/P99 录制)。 - -本设计优先保证可落地与可演进:先以独立 suggestion 索引跑通主能力,再逐步增强排序与在线性能。 diff --git a/suggestion/RUNBOOK.md b/suggestion/RUNBOOK.md index 7047646..502c6aa 100644 --- a/suggestion/RUNBOOK.md +++ b/suggestion/RUNBOOK.md @@ -1,20 +1,20 @@ -# Suggestion 运行手册(Runbook) +# Suggestion 运行手册(Phase 2) -本文档面向研发/测试/运维,提供 suggestion 功能的标准操作流程。 +本文档面向研发/测试/运维,提供 Suggestion 功能在 Phase 2 的标准操作流程。 ## 1. 适用范围 -- Suggestion 索引构建:`search_suggestions_tenant_{tenant_id}` +- Suggestion 全量构建(版本化索引 + alias 原子发布) +- Suggestion 增量更新(watermark) - Suggestion 查询接口:`GET /search/suggestions` -- 前端自动补全(`frontend/index.html`)联调 -## 2. 依赖前置 +## 2. 前置依赖 确保以下服务和配置可用: - Elasticsearch(开启鉴权时需提供账号密码) -- MySQL(表 `shoplazza_search_log` 可访问) -- API 服务(端口默认 6002) +- MySQL(可访问 `shoplazza_search_log`) +- API 服务(默认 `6002`) 建议环境变量: @@ -29,108 +29,103 @@ DB_USERNAME=... DB_PASSWORD=... ``` -## 3. 全量构建流程 +## 3. 全量构建(推荐发布流程) -### 3.1 构建指定租户 suggestion 索引 +### 3.1 执行 ```bash -./scripts/build_suggestions.sh 171 --days 365 --recreate +./scripts/build_suggestions.sh 162 \ + --mode full \ + --days 365 \ + --publish-alias \ + --keep-versions 2 ``` -说明: - -- `--days`:日志回溯窗口 -- `--recreate`:删除旧索引并重建 - ### 3.2 预期输出 -示例: - -```json -{ - "tenant_id": "171", - "index_name": "search_suggestions_tenant_171", - "total_candidates": 336, - "indexed_docs": 336, - "bulk_result": { - "success": 336, - "failed": 0, - "errors": [] - } -} -``` - -判定标准: +输出包含关键字段: -- `indexed_docs > 0` -- `bulk_result.failed = 0` +- `mode=full` +- `index_name=..._vYYYYMMDDHHMMSS` +- `alias_published=true` +- `alias_publish.current_index` 指向新索引 +- `bulk_result.failed=0` -## 4. ES 验证步骤 +## 4. 增量更新(watermark) -> 若 ES 开启鉴权,请使用 `-u "$ES_USERNAME:$ES_PASSWORD"`。 +### 4.1 执行 ```bash -curl -u "$ES_USERNAME:$ES_PASSWORD" \ - "$ES_HOST/search_suggestions_tenant_171/_count?pretty" - -curl -u "$ES_USERNAME:$ES_PASSWORD" \ - "$ES_HOST/search_suggestions_tenant_171/_mapping?pretty" - -curl -u "$ES_USERNAME:$ES_PASSWORD" \ - "$ES_HOST/search_suggestions_tenant_171/_search?pretty" -d '{ - "size": 10, - "query": {"match_all": {}}, - "_source": ["lang","text","sources","query_count_30d","rank_score"] - }' +./scripts/build_suggestions.sh 162 \ + --mode incremental \ + --overlap-minutes 30 ``` -重点检查: +### 4.2 预期输出 + +输出包含关键字段: -- 字段是否齐全(`lang/text/sat/completion/rank_score`) -- 文档是否覆盖预期语种(如 `zh/en`) +- `mode=incremental` +- `target_index`(当前 alias 对应索引) +- `query_window.since/until` +- `updated_terms` +- `bulk_result.failed=0` -## 5. API 回归步骤 +## 5. ES 验证步骤 -### 5.1 启动后端 +> 若 ES 开启鉴权,请附带 `-u "$ES_USERNAME:$ES_PASSWORD"`。 ```bash -bash scripts/start_backend.sh +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_162_current" + +curl "$ES_HOST/$ALIAS_NAME/_count?pretty" + +curl "$ES_HOST/$ALIAS_NAME/_search?pretty" -H 'Content-Type: application/json' -d '{ + "size": 10, + "query": {"match_all": {}}, + "_source": ["lang", "text", "rank_score", "sources", "query_count_30d"] +}' ``` -### 5.2 调用 suggestion 接口 +重点检查: + +- alias 是否可查(说明发布成功) +- 文档数 > 0 +- 关键字段存在:`lang/text/rank_score/completion/sat` + +## 6. API 回归步骤 ```bash -curl "http://localhost:6002/search/suggestions?q=shirt&size=5&language=en&with_results=false" \ - -H "X-Tenant-ID: 171" +curl "http://localhost:6002/search/suggestions?q=shirt&size=10&language=en" \ + -H "X-Tenant-ID: 162" -curl "http://localhost:6002/search/suggestions?q=2025&size=5&language=zh&with_results=false" \ - -H "X-Tenant-ID: 171" +curl "http://localhost:6002/search/suggestions?q=玩具&size=10&language=zh" \ + -H "X-Tenant-ID: 162" ``` 通过标准: - 接口返回 `200` -- `resolved_language` 合理 -- `suggestions` 非空(针对已知存在的 query) +- `suggestions` 非空(针对已知存在 query) +- `took_ms` 合理 -## 6. 前端联调步骤 +## 7. 一键验证脚本 -1. 打开 `http://localhost:6002/` -2. 选择租户(例如 `171`) -3. 输入已知前缀词(如 `shirt` / `Ekouaer` / `2025`) -4. 观察下拉 suggestion 是否出现 - -注意: +```bash +./scripts/rebuild_suggestions.sh 162 +``` -- 前端已同时透传: - - Header:`X-Tenant-ID` - - Query:`tenant_id` +该脚本执行: -## 7. 发布检查清单 +1. 全量重建并发布 alias +2. 增量更新 +3. ES `_count` 与样例 `_search` +4. API `/search/suggestions` 冒烟请求 -- [ ] 全量构建输出 `failed=0` -- [ ] ES `_count` 与 `indexed_docs` 一致 -- [ ] 关键 query(中/英)接口有返回 -- [ ] 前端下拉正常 -- [ ] 文档已更新(`README.md` / 本 Runbook / API 指南) +## 8. 发布检查清单 +- [ ] 全量构建 `bulk_result.failed=0` +- [ ] alias 指向新版本索引 +- [ ] 增量更新成功(`mode=incremental`) +- [ ] API 冒烟通过 +- [ ] 文档与脚本已同步 diff --git a/suggestion/TROUBLESHOOTING.md b/suggestion/TROUBLESHOOTING.md index 72faaa5..b0427b5 100644 --- a/suggestion/TROUBLESHOOTING.md +++ b/suggestion/TROUBLESHOOTING.md @@ -1,164 +1,95 @@ -# Suggestion 故障排查手册 +# Suggestion 故障排查手册(Phase 2) -本文档汇总 suggestion 常见问题与定位步骤。 - -## 1. `suggestions` 总是空数组 - -### 现象 - -```json -{"query":"shirt","language":"en","resolved_language":"en","suggestions":[],"took_ms":0} -``` +## 1. `/search/suggestions` 总是空数组 ### 排查步骤 -1. 确认索引存在且有数据: +1. 检查 alias 是否存在并有数据: ```bash -curl -u "$ES_USERNAME:$ES_PASSWORD" \ - "$ES_HOST/search_suggestions_tenant_171/_count?pretty" +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_162_current" +curl "$ES_HOST/$ALIAS_NAME/_count?pretty" ``` -2. 直接查 suggestion 索引样本: +2. 直接抽样查看: ```bash -curl -u "$ES_USERNAME:$ES_PASSWORD" \ - "$ES_HOST/search_suggestions_tenant_171/_search?pretty" -d '{ - "size": 20, - "query": {"match_all": {}}, - "_source": ["lang","text","rank_score"] - }' +curl "$ES_HOST/$ALIAS_NAME/_search?pretty" -H 'Content-Type: application/json' -d '{ + "size": 20, + "query": {"match_all": {}}, + "_source": ["lang", "text", "rank_score", "query_count_30d"] +}' ``` -3. 确认请求语种是否匹配(`language=en` 时,索引里应有 `lang=en` 文档)。 - -4. 检查服务版本是否为最新(重启后端): +3. 确认请求租户和语言: ```bash -bash scripts/stop.sh -bash scripts/start_backend.sh +curl "http://localhost:6002/search/suggestions?q=shirt&size=10&language=en" \ + -H "X-Tenant-ID: 162" ``` -### 已修复的历史问题 - -- **重复传 `size` 导致 ES 查询异常并被吞掉**: - - 症状:日志里出现 `Received multiple values for 'size'` - - 结果:接口返回空 hits(看起来像“无数据”) - - 处理:确保 query body 不再携带 `size`,仅通过 client 参数传 `size` - ## 2. 报错:`tenant_id is required` -### 现象 - -```json -{ - "error": "tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'", - "status_code": 400 -} -``` - -### 原因 - -- 请求缺少 `X-Tenant-ID`,且 URL 没有 `tenant_id`。 - -### 处理 - -- API 调用至少满足其一: - - Header:`X-Tenant-ID: 171` - - Query:`tenant_id=171` +请求缺少 `X-Tenant-ID`(或 query `tenant_id`)。 示例: ```bash -curl "http://localhost:6002/search/suggestions?q=shirt&size=5&language=en&with_results=false&tenant_id=171" +curl "http://localhost:6002/search/suggestions?q=shirt&size=10&language=en&tenant_id=162" ``` -## 3. ES 401:`missing authentication credentials` +## 3. 增量更新没有写入(`updated_terms=0`) -### 现象 +### 常见原因 -```json -{ - "type":"security_exception", - "reason":"missing authentication credentials ..." -} -``` - -### 原因 - -- ES 开启了安全认证,curl/脚本未带凭证。 +- watermark 时间窗内没有新日志 +- `overlap_minutes` 太小 +- `min_query_len` 过滤过严 ### 处理 ```bash -curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST/search_suggestions_tenant_171/_mapping?pretty" +./scripts/build_suggestions.sh 162 --mode incremental --overlap-minutes 60 ``` -或使用 API Key: +并在 MySQL 中确认窗口内日志存在。 -```bash -curl -H "Authorization: ApiKey " "$ES_HOST/search_suggestions_tenant_171/_mapping?pretty" -``` - -## 4. 构建脚本报 `Cannot connect to Elasticsearch` - -### 原因 - -- ES 地址不对,或账号密码未配置,或网络不可达。 +## 4. alias 未切到新索引 ### 检查 ```bash -echo "$ES_HOST" -echo "$ES_USERNAME" -curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST" +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_162_current" +curl "$ES_HOST/_alias/$ALIAS_NAME?pretty" ``` -## 5. 前端请求未携带租户参数 - -### 现象 - -- Network 中请求 URL 无 `tenant_id` -- Header 里无 `X-Tenant-ID` - ### 处理 -- 确认前端最新代码已生效(清缓存后强刷)。 -- 前端应同时透传: - - `X-Tenant-ID` - - `tenant_id` query 参数(兜底,避免代理丢 header) - -## 6. 关键 query(如 `shirt`)没有被索引 +重新执行全量发布: -### 检查路径 +```bash +./scripts/build_suggestions.sh 162 --mode full --publish-alias --keep-versions 2 +``` -1. MySQL 里确认日志存在并在回溯窗口内: +## 5. ES 401:`missing authentication credentials` -```sql -SELECT query, language, create_time -FROM shoplazza_search_log -WHERE tenant_id = 171 - AND query = 'shirt' -ORDER BY create_time DESC -LIMIT 20; -``` +ES 开启鉴权时,所有 curl 都需要 `-u "$ES_USERNAME:$ES_PASSWORD"` 或 API Key。 -2. 构建命令是否使用足够大的 `--days`(例如 365)。 -3. 检查 query 是否被清洗规则过滤(空白/符号/过长等)。 +## 6. 构建脚本报 `Cannot connect to Elasticsearch` -## 7. `Invalid HTTP request received.` +检查 `ES_HOST`、账号密码、网络连通性: -### 原因 +```bash +echo "$ES_HOST" +curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST" +``` -- 6002 端口上跑的进程不是当前 FastAPI 服务,或请求协议与服务不匹配。 +## 7. 首次增量执行失败:找不到 active index -### 处理 +说明该租户尚未完成全量构建。可直接启用 bootstrap(默认开启): ```bash -bash scripts/stop.sh -bash scripts/start_backend.sh -curl "http://localhost:6002/health" +./scripts/build_suggestions.sh 162 --mode incremental --bootstrap-if-missing ``` -若 `/health` 正常,再测试 `/search/suggestions`。 - +或先执行一次全量。 diff --git a/suggestion/builder.py b/suggestion/builder.py index d60b0f8..a7027f5 100644 --- a/suggestion/builder.py +++ b/suggestion/builder.py @@ -1,40 +1,66 @@ """ -Full suggestion index builder. +Suggestion index builder (Phase 2). -Build data from: -- ES product index fields: title.{lang}, qanchors.{lang} -- MySQL search logs: shoplazza_search_log.query (+ language metadata) +Capabilities: +- Full rebuild to versioned index +- Atomic alias publish +- Incremental update from query logs with watermark """ import json import logging import math import re +import unicodedata from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Iterator, List, Optional, Tuple from sqlalchemy import text +from config.env_config import ES_INDEX_NAMESPACE from config.tenant_config_loader import get_tenant_config_loader -from utils.es_client import ESClient from suggestion.mapping import build_suggestion_mapping -from config.env_config import ES_INDEX_NAMESPACE +from utils.es_client import ESClient logger = logging.getLogger(__name__) +def _index_prefix() -> str: + return ES_INDEX_NAMESPACE or "" + + +def get_suggestion_legacy_index_name(tenant_id: str) -> str: + """Legacy concrete index name (Phase1 compatibility).""" + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}" + + +def get_suggestion_alias_name(tenant_id: str) -> str: + """Read alias for suggestion index (Phase2 default search target).""" + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_current" + + +def get_suggestion_versioned_index_name(tenant_id: str, build_at: Optional[datetime] = None) -> str: + """Versioned suggestion index name.""" + ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S") + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v{ts}" + + +def get_suggestion_versioned_index_pattern(tenant_id: str) -> str: + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v*" + + +def get_suggestion_meta_index_name() -> str: + return f"{_index_prefix()}search_suggestions_meta" + + def get_suggestion_index_name(tenant_id: str) -> str: """ - 生成 suggestion 索引名称。 - - 命名规则: - {ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id} + Search target for suggestion query. - 通过 ES_INDEX_NAMESPACE 统一区分 prod/uat/test 等环境。 + Phase2 uses alias by default. """ - prefix = ES_INDEX_NAMESPACE or "" - return f"{prefix}search_suggestions_tenant_{tenant_id}" + return get_suggestion_alias_name(tenant_id) @dataclass @@ -50,17 +76,13 @@ class SuggestionCandidate: lang_confidence: float = 1.0 lang_source: str = "default" lang_conflict: bool = False - top_spu_scores: Dict[str, float] = field(default_factory=dict) - def add_product(self, source: str, spu_id: str, score: float) -> None: + def add_product(self, source: str, spu_id: str) -> None: self.sources.add(source) if source == "title": self.title_spu_ids.add(spu_id) elif source == "qanchor": self.qanchor_spu_ids.add(spu_id) - prev = self.top_spu_scores.get(spu_id) - if prev is None or score > prev: - self.top_spu_scores[spu_id] = score def add_query_log(self, is_7d: bool) -> None: self.sources.add("query_log") @@ -69,16 +91,39 @@ class SuggestionCandidate: self.query_count_7d += 1 +@dataclass +class QueryDelta: + tenant_id: str + lang: str + text: str + text_norm: str + delta_7d: int = 0 + delta_30d: int = 0 + lang_confidence: float = 1.0 + lang_source: str = "default" + lang_conflict: bool = False + + class SuggestionIndexBuilder: - """Build and rebuild suggestion index.""" + """Build and update suggestion index.""" def __init__(self, es_client: ESClient, db_engine: Any): self.es_client = es_client self.db_engine = db_engine @staticmethod + def _to_utc(dt: Any) -> Optional[datetime]: + if dt is None: + return None + if isinstance(dt, datetime): + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + return None + + @staticmethod def _normalize_text(value: str) -> str: - text_value = (value or "").strip().lower() + text_value = unicodedata.normalize("NFKC", (value or "")).strip().lower() text_value = re.sub(r"\s+", " ", text_value) return text_value @@ -114,7 +159,6 @@ class SuggestionIndexBuilder: token = str(lang).strip().lower().replace("-", "_") if not token: return None - # en_us -> en, zh_cn -> zh, keep explicit zh_tw / pt_br if token in {"zh_tw", "pt_br"}: return token return token.split("_")[0] @@ -138,19 +182,14 @@ class SuggestionIndexBuilder: @staticmethod def _detect_script_language(query: str) -> Tuple[Optional[str], float, str]: - # CJK unified if re.search(r"[\u4e00-\u9fff]", query): return "zh", 0.98, "script" - # Arabic if re.search(r"[\u0600-\u06FF]", query): return "ar", 0.98, "script" - # Cyrillic if re.search(r"[\u0400-\u04FF]", query): return "ru", 0.95, "script" - # Greek if re.search(r"[\u0370-\u03FF]", query): return "el", 0.95, "script" - # Latin fallback if re.search(r"[a-zA-Z]", query): return "en", 0.55, "model" return None, 0.0, "default" @@ -186,32 +225,34 @@ class SuggestionIndexBuilder: return primary, 0.3, "default", conflict @staticmethod - def _score_product_hit(source: Dict[str, Any]) -> float: - sales = float(source.get("sales") or 0.0) - inventory = float(source.get("total_inventory") or 0.0) - return math.log1p(max(sales, 0.0)) * 1.2 + math.log1p(max(inventory, 0.0)) * 0.4 - - @staticmethod - def _compute_rank_score(c: SuggestionCandidate) -> float: + def _compute_rank_score(query_count_30d: int, query_count_7d: int, qanchor_doc_count: int, title_doc_count: int) -> float: return ( - 1.8 * math.log1p(c.query_count_30d) - + 1.2 * math.log1p(c.query_count_7d) - + 1.0 * math.log1p(len(c.qanchor_spu_ids)) - + 0.6 * math.log1p(len(c.title_spu_ids)) + 1.8 * math.log1p(max(query_count_30d, 0)) + + 1.2 * math.log1p(max(query_count_7d, 0)) + + 1.0 * math.log1p(max(qanchor_doc_count, 0)) + + 0.6 * math.log1p(max(title_doc_count, 0)) ) - def _scan_products(self, tenant_id: str, batch_size: int = 500) -> List[Dict[str, Any]]: - """Scan all product docs from tenant index using search_after.""" + @classmethod + def _compute_rank_score_from_candidate(cls, c: SuggestionCandidate) -> float: + return cls._compute_rank_score( + query_count_30d=c.query_count_30d, + query_count_7d=c.query_count_7d, + qanchor_doc_count=len(c.qanchor_spu_ids), + title_doc_count=len(c.title_spu_ids), + ) + + def _iter_products(self, tenant_id: str, batch_size: int = 500) -> Iterator[Dict[str, Any]]: + """Stream product docs from tenant index using search_after.""" from indexer.mapping_generator import get_tenant_index_name index_name = get_tenant_index_name(tenant_id) - all_hits: List[Dict[str, Any]] = [] search_after: Optional[List[Any]] = None while True: body: Dict[str, Any] = { "size": batch_size, - "_source": ["spu_id", "title", "qanchors", "sales", "total_inventory"], + "_source": ["spu_id", "title", "qanchors"], "sort": [{"spu_id": "asc"}], "query": {"match_all": {}}, } @@ -222,50 +263,179 @@ class SuggestionIndexBuilder: hits = resp.get("hits", {}).get("hits", []) or [] if not hits: break - all_hits.extend(hits) + for hit in hits: + yield hit search_after = hits[-1].get("sort") if len(hits) < batch_size: break - return all_hits - - def _create_or_reset_index(self, tenant_id: str, index_languages: List[str], recreate: bool) -> str: - index_name = get_suggestion_index_name(tenant_id) - if recreate and self.es_client.index_exists(index_name): - logger.info("Deleting existing suggestion index: %s", index_name) - self.es_client.delete_index(index_name) - if not self.es_client.index_exists(index_name): - mapping = build_suggestion_mapping(index_languages=index_languages) - ok = self.es_client.create_index(index_name, mapping) - if not ok: - raise RuntimeError(f"Failed to create suggestion index: {index_name}") - return index_name - def rebuild_tenant_index( + def _iter_query_log_rows( self, tenant_id: str, - days: int = 365, - recreate: bool = True, - batch_size: int = 500, - min_query_len: int = 1, - ) -> Dict[str, Any]: - tenant_loader = get_tenant_config_loader() - tenant_cfg = tenant_loader.get_tenant_config(tenant_id) - index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] - primary_language: str = tenant_cfg.get("primary_language") or "en" + since: datetime, + until: datetime, + fetch_size: int = 2000, + ) -> Iterator[Any]: + """Stream search logs from MySQL with bounded time range.""" + query_sql = text( + """ + SELECT query, language, request_params, create_time + FROM shoplazza_search_log + WHERE tenant_id = :tenant_id + AND deleted = 0 + AND query IS NOT NULL + AND query <> '' + AND create_time >= :since_time + AND create_time < :until_time + ORDER BY create_time ASC + """ + ) + + with self.db_engine.connect().execution_options(stream_results=True) as conn: + result = conn.execute( + query_sql, + { + "tenant_id": int(tenant_id), + "since_time": since, + "until_time": until, + }, + ) + while True: + rows = result.fetchmany(fetch_size) + if not rows: + break + for row in rows: + yield row + + def _ensure_meta_index(self) -> str: + meta_index = get_suggestion_meta_index_name() + if self.es_client.index_exists(meta_index): + return meta_index + body = { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "refresh_interval": "1s", + }, + "mappings": { + "properties": { + "tenant_id": {"type": "keyword"}, + "active_alias": {"type": "keyword"}, + "active_index": {"type": "keyword"}, + "last_full_build_at": {"type": "date"}, + "last_incremental_build_at": {"type": "date"}, + "last_incremental_watermark": {"type": "date"}, + "updated_at": {"type": "date"}, + } + }, + } + if not self.es_client.create_index(meta_index, body): + raise RuntimeError(f"Failed to create suggestion meta index: {meta_index}") + return meta_index + + def _get_meta(self, tenant_id: str) -> Dict[str, Any]: + meta_index = self._ensure_meta_index() + try: + resp = self.es_client.client.get(index=meta_index, id=str(tenant_id)) + return resp.get("_source", {}) or {} + except Exception: + return {} + + def _upsert_meta(self, tenant_id: str, patch: Dict[str, Any]) -> None: + meta_index = self._ensure_meta_index() + current = self._get_meta(tenant_id) + now_iso = datetime.now(timezone.utc).isoformat() + merged = { + "tenant_id": str(tenant_id), + **current, + **patch, + "updated_at": now_iso, + } + self.es_client.client.index(index=meta_index, id=str(tenant_id), document=merged, refresh="wait_for") + + def _cleanup_old_versions(self, tenant_id: str, keep_versions: int, protected_indices: Optional[List[str]] = None) -> List[str]: + if keep_versions < 1: + keep_versions = 1 + protected = set(protected_indices or []) + pattern = get_suggestion_versioned_index_pattern(tenant_id) + all_indices = self.es_client.list_indices(pattern) + if len(all_indices) <= keep_versions: + return [] + + # Names are timestamp-ordered by suffix; keep newest N. + kept = set(sorted(all_indices)[-keep_versions:]) + dropped: List[str] = [] + for idx in sorted(all_indices): + if idx in kept or idx in protected: + continue + if self.es_client.delete_index(idx): + dropped.append(idx) + return dropped + + def _publish_alias(self, tenant_id: str, index_name: str, keep_versions: int = 2) -> Dict[str, Any]: + alias_name = get_suggestion_alias_name(tenant_id) + current_indices = self.es_client.get_alias_indices(alias_name) + + actions: List[Dict[str, Any]] = [] + for idx in current_indices: + actions.append({"remove": {"index": idx, "alias": alias_name}}) + actions.append({"add": {"index": index_name, "alias": alias_name}}) + + if not self.es_client.update_aliases(actions): + raise RuntimeError(f"Failed to publish alias {alias_name} -> {index_name}") + + dropped = self._cleanup_old_versions( + tenant_id=tenant_id, + keep_versions=keep_versions, + protected_indices=[index_name], + ) - index_name = self._create_or_reset_index(tenant_id, index_languages, recreate) + self._upsert_meta( + tenant_id, + { + "active_alias": alias_name, + "active_index": index_name, + }, + ) + + return { + "alias": alias_name, + "previous_indices": current_indices, + "current_index": index_name, + "dropped_old_indices": dropped, + } + + def _resolve_incremental_target_index(self, tenant_id: str) -> Optional[str]: + alias_name = get_suggestion_alias_name(tenant_id) + aliased = self.es_client.get_alias_indices(alias_name) + if aliased: + # alias should map to one index in this design + return sorted(aliased)[-1] + + legacy = get_suggestion_legacy_index_name(tenant_id) + if self.es_client.index_exists(legacy): + return legacy + return None + + def _build_full_candidates( + self, + tenant_id: str, + index_languages: List[str], + primary_language: str, + days: int, + batch_size: int, + min_query_len: int, + ) -> Dict[Tuple[str, str], SuggestionCandidate]: key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {} # Step 1: product title/qanchors - hits = self._scan_products(tenant_id, batch_size=batch_size) - for hit in hits: + for hit in self._iter_products(tenant_id, batch_size=batch_size): src = hit.get("_source", {}) or {} spu_id = str(src.get("spu_id") or "") if not spu_id: continue title_obj = src.get("title") or {} qanchor_obj = src.get("qanchors") or {} - product_score = self._score_product_hit(src) for lang in index_languages: title = "" @@ -279,7 +449,7 @@ class SuggestionIndexBuilder: if c is None: c = SuggestionCandidate(text=title, text_norm=text_norm, lang=lang) key_to_candidate[key] = c - c.add_product("title", spu_id=spu_id, score=product_score) + c.add_product("title", spu_id=spu_id) q_raw = None if isinstance(qanchor_obj, dict): @@ -293,30 +463,18 @@ class SuggestionIndexBuilder: if c is None: c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang) key_to_candidate[key] = c - c.add_product("qanchor", spu_id=spu_id, score=product_score + 0.6) + c.add_product("qanchor", spu_id=spu_id) # Step 2: query logs now = datetime.now(timezone.utc) - since_30d = now - timedelta(days=days) + since = now - timedelta(days=days) since_7d = now - timedelta(days=7) - query_sql = text( - """ - SELECT query, language, request_params, create_time - FROM shoplazza_search_log - WHERE tenant_id = :tenant_id - AND deleted = 0 - AND query IS NOT NULL - AND query <> '' - AND create_time >= :since_30d - """ - ) - with self.db_engine.connect() as conn: - rows = conn.execute(query_sql, {"tenant_id": int(tenant_id), "since_30d": since_30d}).fetchall() - for row in rows: + for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=now): q = str(row.query or "").strip() if len(q) < min_query_len: continue + lang, conf, source, conflict = self._resolve_query_language( query=q, log_language=getattr(row, "language", None), @@ -327,74 +485,396 @@ class SuggestionIndexBuilder: text_norm = self._normalize_text(q) if self._looks_noise(text_norm): continue + key = (lang, text_norm) c = key_to_candidate.get(key) if c is None: c = SuggestionCandidate(text=q, text_norm=text_norm, lang=lang) key_to_candidate[key] = c + c.lang_confidence = max(c.lang_confidence, conf) c.lang_source = source if c.lang_source == "default" else c.lang_source c.lang_conflict = c.lang_conflict or conflict - created_at = getattr(row, "create_time", None) - if created_at is None: - is_7d = False - else: - # DB datetime usually naive local time; compare conservatively - if isinstance(created_at, datetime) and created_at.tzinfo is None: - created_at = created_at.replace(tzinfo=timezone.utc) - is_7d = bool(created_at and created_at >= since_7d) + created_at = self._to_utc(getattr(row, "create_time", None)) + is_7d = bool(created_at and created_at >= since_7d) c.add_query_log(is_7d=is_7d) - # Step 3: build docs + return key_to_candidate + + def _candidate_to_doc(self, tenant_id: str, c: SuggestionCandidate, now_iso: str) -> Dict[str, Any]: + rank_score = self._compute_rank_score_from_candidate(c) + completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}} + sat_obj = {c.lang: c.text} + return { + "_id": f"{tenant_id}|{c.lang}|{c.text_norm}", + "tenant_id": str(tenant_id), + "lang": c.lang, + "text": c.text, + "text_norm": c.text_norm, + "sources": sorted(c.sources), + "title_doc_count": len(c.title_spu_ids), + "qanchor_doc_count": len(c.qanchor_spu_ids), + "query_count_7d": c.query_count_7d, + "query_count_30d": c.query_count_30d, + "rank_score": float(rank_score), + "lang_confidence": float(c.lang_confidence), + "lang_source": c.lang_source, + "lang_conflict": bool(c.lang_conflict), + "status": 1, + "updated_at": now_iso, + "completion": completion_obj, + "sat": sat_obj, + } + + def rebuild_tenant_index( + self, + tenant_id: str, + days: int = 365, + recreate: bool = False, + batch_size: int = 500, + min_query_len: int = 1, + publish_alias: bool = True, + keep_versions: int = 2, + use_versioned_index: bool = True, + ) -> Dict[str, Any]: + """ + Full rebuild. + + Phase2 default behavior: + - write to versioned index + - atomically publish alias + """ + tenant_loader = get_tenant_config_loader() + tenant_cfg = tenant_loader.get_tenant_config(tenant_id) + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] + primary_language: str = tenant_cfg.get("primary_language") or "en" + + if use_versioned_index: + index_name = get_suggestion_versioned_index_name(tenant_id) + else: + index_name = get_suggestion_legacy_index_name(tenant_id) + if recreate and self.es_client.index_exists(index_name): + logger.info("Deleting existing suggestion index: %s", index_name) + self.es_client.delete_index(index_name) + + if self.es_client.index_exists(index_name): + raise RuntimeError(f"Target suggestion index already exists: {index_name}") + + mapping = build_suggestion_mapping(index_languages=index_languages) + if not self.es_client.create_index(index_name, mapping): + raise RuntimeError(f"Failed to create suggestion index: {index_name}") + + key_to_candidate = self._build_full_candidates( + tenant_id=tenant_id, + index_languages=index_languages, + primary_language=primary_language, + days=days, + batch_size=batch_size, + min_query_len=min_query_len, + ) + now_iso = datetime.now(timezone.utc).isoformat() - docs: List[Dict[str, Any]] = [] - for (_, _), c in key_to_candidate.items(): - rank_score = self._compute_rank_score(c) - # keep top 20 product ids by score - top_spu_ids = [ - item[0] - for item in sorted(c.top_spu_scores.items(), key=lambda kv: kv[1], reverse=True)[:20] - ] - - completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}} - sat_obj = {c.lang: c.text} - doc_id = f"{tenant_id}|{c.lang}|{c.text_norm}" - docs.append( - { - "_id": doc_id, - "tenant_id": str(tenant_id), - "lang": c.lang, - "text": c.text, - "text_norm": c.text_norm, - "sources": sorted(c.sources), - "title_doc_count": len(c.title_spu_ids), - "qanchor_doc_count": len(c.qanchor_spu_ids), - "query_count_7d": c.query_count_7d, - "query_count_30d": c.query_count_30d, - "rank_score": float(rank_score), - "lang_confidence": float(c.lang_confidence), - "lang_source": c.lang_source, - "lang_conflict": bool(c.lang_conflict), - "top_spu_ids": top_spu_ids, - "status": 1, - "updated_at": now_iso, - "completion": completion_obj, - "sat": sat_obj, - } - ) + docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()] if docs: - result = self.es_client.bulk_index(index_name=index_name, docs=docs) + bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs) self.es_client.refresh(index_name) else: - result = {"success": 0, "failed": 0, "errors": []} + bulk_result = {"success": 0, "failed": 0, "errors": []} + + alias_publish: Optional[Dict[str, Any]] = None + if publish_alias and use_versioned_index: + alias_publish = self._publish_alias( + tenant_id=tenant_id, + index_name=index_name, + keep_versions=keep_versions, + ) + + now_utc = datetime.now(timezone.utc).isoformat() + meta_patch: Dict[str, Any] = { + "last_full_build_at": now_utc, + "last_incremental_watermark": now_utc, + } + if publish_alias and use_versioned_index: + meta_patch["active_index"] = index_name + meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id) + self._upsert_meta(tenant_id, meta_patch) return { + "mode": "full", "tenant_id": str(tenant_id), "index_name": index_name, + "alias_published": bool(alias_publish), + "alias_publish": alias_publish, "total_candidates": len(key_to_candidate), "indexed_docs": len(docs), - "bulk_result": result, + "bulk_result": bulk_result, } + def _build_incremental_deltas( + self, + tenant_id: str, + index_languages: List[str], + primary_language: str, + since: datetime, + until: datetime, + min_query_len: int, + ) -> Dict[Tuple[str, str], QueryDelta]: + now = datetime.now(timezone.utc) + since_7d = now - timedelta(days=7) + deltas: Dict[Tuple[str, str], QueryDelta] = {} + + for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=until): + q = str(row.query or "").strip() + if len(q) < min_query_len: + continue + + lang, conf, source, conflict = self._resolve_query_language( + query=q, + log_language=getattr(row, "language", None), + request_params=getattr(row, "request_params", None), + index_languages=index_languages, + primary_language=primary_language, + ) + text_norm = self._normalize_text(q) + if self._looks_noise(text_norm): + continue + + key = (lang, text_norm) + item = deltas.get(key) + if item is None: + item = QueryDelta( + tenant_id=str(tenant_id), + lang=lang, + text=q, + text_norm=text_norm, + lang_confidence=conf, + lang_source=source, + lang_conflict=conflict, + ) + deltas[key] = item + + created_at = self._to_utc(getattr(row, "create_time", None)) + item.delta_30d += 1 + if created_at and created_at >= since_7d: + item.delta_7d += 1 + + if conf > item.lang_confidence: + item.lang_confidence = conf + item.lang_source = source + item.lang_conflict = item.lang_conflict or conflict + + return deltas + + def _delta_to_upsert_doc(self, delta: QueryDelta, now_iso: str) -> Dict[str, Any]: + rank_score = self._compute_rank_score( + query_count_30d=delta.delta_30d, + query_count_7d=delta.delta_7d, + qanchor_doc_count=0, + title_doc_count=0, + ) + return { + "tenant_id": delta.tenant_id, + "lang": delta.lang, + "text": delta.text, + "text_norm": delta.text_norm, + "sources": ["query_log"], + "title_doc_count": 0, + "qanchor_doc_count": 0, + "query_count_7d": delta.delta_7d, + "query_count_30d": delta.delta_30d, + "rank_score": float(rank_score), + "lang_confidence": float(delta.lang_confidence), + "lang_source": delta.lang_source, + "lang_conflict": bool(delta.lang_conflict), + "status": 1, + "updated_at": now_iso, + "completion": { + delta.lang: { + "input": [delta.text], + "weight": int(max(rank_score, 1.0) * 100), + } + }, + "sat": {delta.lang: delta.text}, + } + + @staticmethod + def _build_incremental_update_script() -> str: + return """ + if (ctx._source == null || ctx._source.isEmpty()) { + ctx._source = params.upsert; + return; + } + + if (ctx._source.query_count_30d == null) { ctx._source.query_count_30d = 0; } + if (ctx._source.query_count_7d == null) { ctx._source.query_count_7d = 0; } + if (ctx._source.qanchor_doc_count == null) { ctx._source.qanchor_doc_count = 0; } + if (ctx._source.title_doc_count == null) { ctx._source.title_doc_count = 0; } + + ctx._source.query_count_30d += params.delta_30d; + ctx._source.query_count_7d += params.delta_7d; + + if (ctx._source.sources == null) { ctx._source.sources = new ArrayList(); } + if (!ctx._source.sources.contains('query_log')) { ctx._source.sources.add('query_log'); } + + if (ctx._source.lang_conflict == null) { ctx._source.lang_conflict = false; } + ctx._source.lang_conflict = ctx._source.lang_conflict || params.lang_conflict; + + if (ctx._source.lang_confidence == null || params.lang_confidence > ctx._source.lang_confidence) { + ctx._source.lang_confidence = params.lang_confidence; + ctx._source.lang_source = params.lang_source; + } + + int q30 = ctx._source.query_count_30d; + int q7 = ctx._source.query_count_7d; + int qa = ctx._source.qanchor_doc_count; + int td = ctx._source.title_doc_count; + + double score = 1.8 * Math.log(1 + q30) + + 1.2 * Math.log(1 + q7) + + 1.0 * Math.log(1 + qa) + + 0.6 * Math.log(1 + td); + ctx._source.rank_score = score; + ctx._source.status = 1; + ctx._source.updated_at = params.now_iso; + ctx._source.text = params.text; + ctx._source.lang = params.lang; + ctx._source.text_norm = params.text_norm; + + if (ctx._source.completion == null) { ctx._source.completion = new HashMap(); } + Map c = new HashMap(); + c.put('input', params.completion_input); + c.put('weight', params.completion_weight); + ctx._source.completion.put(params.lang, c); + + if (ctx._source.sat == null) { ctx._source.sat = new HashMap(); } + ctx._source.sat.put(params.lang, params.text); + """ + + def _build_incremental_actions(self, target_index: str, deltas: Dict[Tuple[str, str], QueryDelta]) -> List[Dict[str, Any]]: + now_iso = datetime.now(timezone.utc).isoformat() + script_source = self._build_incremental_update_script() + actions: List[Dict[str, Any]] = [] + + for delta in deltas.values(): + upsert_doc = self._delta_to_upsert_doc(delta=delta, now_iso=now_iso) + upsert_rank = float(upsert_doc.get("rank_score") or 0.0) + action = { + "_op_type": "update", + "_index": target_index, + "_id": f"{delta.tenant_id}|{delta.lang}|{delta.text_norm}", + "scripted_upsert": True, + "script": { + "lang": "painless", + "source": script_source, + "params": { + "delta_30d": int(delta.delta_30d), + "delta_7d": int(delta.delta_7d), + "lang_confidence": float(delta.lang_confidence), + "lang_source": delta.lang_source, + "lang_conflict": bool(delta.lang_conflict), + "now_iso": now_iso, + "lang": delta.lang, + "text": delta.text, + "text_norm": delta.text_norm, + "completion_input": [delta.text], + "completion_weight": int(max(upsert_rank, 1.0) * 100), + "upsert": upsert_doc, + }, + }, + "upsert": upsert_doc, + } + actions.append(action) + + return actions + + def incremental_update_tenant_index( + self, + tenant_id: str, + min_query_len: int = 1, + fallback_days: int = 7, + overlap_minutes: int = 30, + bootstrap_if_missing: bool = True, + bootstrap_days: int = 30, + batch_size: int = 500, + ) -> Dict[str, Any]: + tenant_loader = get_tenant_config_loader() + tenant_cfg = tenant_loader.get_tenant_config(tenant_id) + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] + primary_language: str = tenant_cfg.get("primary_language") or "en" + + target_index = self._resolve_incremental_target_index(tenant_id) + if not target_index: + if not bootstrap_if_missing: + raise RuntimeError( + f"No active suggestion index for tenant={tenant_id}. " + "Run full rebuild first or enable bootstrap_if_missing." + ) + full_result = self.rebuild_tenant_index( + tenant_id=tenant_id, + days=bootstrap_days, + batch_size=batch_size, + min_query_len=min_query_len, + publish_alias=True, + use_versioned_index=True, + ) + return { + "mode": "incremental", + "tenant_id": str(tenant_id), + "bootstrapped": True, + "bootstrap_result": full_result, + } + + meta = self._get_meta(tenant_id) + watermark_raw = meta.get("last_incremental_watermark") or meta.get("last_full_build_at") + now = datetime.now(timezone.utc) + default_since = now - timedelta(days=fallback_days) + since = None + if isinstance(watermark_raw, str) and watermark_raw.strip(): + try: + since = self._to_utc(datetime.fromisoformat(watermark_raw.replace("Z", "+00:00"))) + except Exception: + since = None + if since is None: + since = default_since + since = since - timedelta(minutes=max(overlap_minutes, 0)) + if since < default_since: + since = default_since + + deltas = self._build_incremental_deltas( + tenant_id=tenant_id, + index_languages=index_languages, + primary_language=primary_language, + since=since, + until=now, + min_query_len=min_query_len, + ) + + actions = self._build_incremental_actions(target_index=target_index, deltas=deltas) + bulk_result = self.es_client.bulk_actions(actions) + self.es_client.refresh(target_index) + + now_iso = now.isoformat() + self._upsert_meta( + tenant_id, + { + "last_incremental_build_at": now_iso, + "last_incremental_watermark": now_iso, + "active_index": target_index, + "active_alias": get_suggestion_alias_name(tenant_id), + }, + ) + + return { + "mode": "incremental", + "tenant_id": str(tenant_id), + "target_index": target_index, + "query_window": { + "since": since.isoformat(), + "until": now_iso, + "overlap_minutes": int(overlap_minutes), + }, + "updated_terms": len(deltas), + "bulk_result": bulk_result, + } diff --git a/suggestion/mapping.py b/suggestion/mapping.py index 86c597d..1fc0ad5 100644 --- a/suggestion/mapping.py +++ b/suggestion/mapping.py @@ -88,7 +88,6 @@ def build_suggestion_mapping(index_languages: List[str]) -> Dict[str, Any]: "lang_confidence": {"type": "float"}, "lang_source": {"type": "keyword"}, "lang_conflict": {"type": "boolean"}, - "top_spu_ids": {"type": "keyword"}, "status": {"type": "byte"}, "updated_at": {"type": "date"}, "completion": {"properties": completion_props}, @@ -96,4 +95,3 @@ def build_suggestion_mapping(index_languages: List[str]) -> Dict[str, Any]: } }, } - diff --git a/suggestion/service.py b/suggestion/service.py index 4816c04..3804cee 100644 --- a/suggestion/service.py +++ b/suggestion/service.py @@ -7,8 +7,11 @@ import time from typing import Any, Dict, List, Optional from config.tenant_config_loader import get_tenant_config_loader -from indexer.mapping_generator import get_tenant_index_name -from suggestion.builder import get_suggestion_index_name +from suggestion.builder import ( + get_suggestion_alias_name, + get_suggestion_index_name, + get_suggestion_legacy_index_name, +) from utils.es_client import ESClient logger = logging.getLogger(__name__) @@ -33,12 +36,29 @@ class SuggestionService: return primary return index_languages[0] + def _resolve_search_target(self, tenant_id: str) -> Optional[str]: + alias_name = get_suggestion_alias_name(tenant_id) + if self.es_client.alias_exists(alias_name): + return alias_name + + # Fallback for pre-Phase2 deployments + legacy = get_suggestion_legacy_index_name(tenant_id) + if self.es_client.index_exists(legacy): + return legacy + + # Last fallback: current naming helper + candidate = get_suggestion_index_name(tenant_id) + if self.es_client.index_exists(candidate): + return candidate + return None + def _completion_suggest( self, index_name: str, query: str, lang: str, size: int, + tenant_id: str, ) -> List[Dict[str, Any]]: """ Query ES completion suggester from `completion.`. @@ -68,7 +88,7 @@ class SuggestionService: ], } try: - resp = self.es_client.client.search(index=index_name, body=body) + resp = self.es_client.client.search(index=index_name, body=body, routing=str(tenant_id)) except Exception as e: # completion is an optimization path; never hard-fail the whole endpoint logger.warning("Completion suggest failed for index=%s field=%s: %s", index_name, field_name, e) @@ -95,69 +115,17 @@ class SuggestionService: ) return out - def _search_products_for_suggestion( - self, - tenant_id: str, - text_value: str, - lang: str, - result_size: int, - ) -> List[Dict[str, Any]]: - index_name = get_tenant_index_name(tenant_id) - title_field = f"title.{lang}" - qanchor_field = f"qanchors.{lang}" - - body = { - "_source": ["spu_id", "title", "min_price", "image_url", "sales", "total_inventory"], - "query": { - "bool": { - "should": [ - {"match_phrase": {qanchor_field: {"query": text_value, "boost": 3.0}}}, - {"match_phrase_prefix": {title_field: {"query": text_value, "boost": 2.0}}}, - {"match": {title_field: {"query": text_value, "boost": 1.0}}}, - ], - "minimum_should_match": 1, - } - }, - "sort": [{"_score": "desc"}, {"sales": "desc"}], - } - resp = self.es_client.search(index_name=index_name, body=body, size=result_size, from_=0) - hits = resp.get("hits", {}).get("hits", []) or [] - out: List[Dict[str, Any]] = [] - for hit in hits: - src = hit.get("_source", {}) or {} - title_obj = src.get("title") or {} - resolved_title = None - if isinstance(title_obj, dict): - resolved_title = title_obj.get(lang) or title_obj.get("en") or title_obj.get("zh") - if not resolved_title: - for v in title_obj.values(): - if v: - resolved_title = v - break - out.append( - { - "spu_id": src.get("spu_id"), - "title": resolved_title, - "price": src.get("min_price"), - "image_url": src.get("image_url"), - "score": hit.get("_score", 0.0), - } - ) - return out - def search( self, tenant_id: str, query: str, language: str, size: int = 10, - with_results: bool = True, - result_size: int = 3, ) -> Dict[str, Any]: start = time.time() resolved_lang = self._resolve_language(tenant_id, language) - index_name = get_suggestion_index_name(tenant_id) - if not self.es_client.index_exists(index_name): + index_name = self._resolve_search_target(tenant_id) + if not index_name: # On a fresh ES cluster the suggestion index might not be built yet. # Keep endpoint stable for frontend autocomplete: return empty list instead of 500. took_ms = int((time.time() - start) * 1000) @@ -171,6 +139,7 @@ class SuggestionService: sat_field = f"sat.{resolved_lang}" dsl = { + "track_total_hits": False, "query": { "function_score": { "query": { @@ -206,14 +175,19 @@ class SuggestionService: "lang", "rank_score", "sources", - "top_spu_ids", "lang_source", "lang_confidence", "lang_conflict", ], } # Recall path A: bool_prefix on search_as_you_type - es_resp = self.es_client.search(index_name=index_name, body=dsl, size=size, from_=0) + es_resp = self.es_client.search( + index_name=index_name, + body=dsl, + size=size, + from_=0, + routing=str(tenant_id), + ) hits = es_resp.get("hits", {}).get("hits", []) or [] # Recall path B: completion suggester (optional optimization) @@ -222,6 +196,7 @@ class SuggestionService: query=query, lang=resolved_lang, size=size, + tenant_id=tenant_id, ) suggestions: List[Dict[str, Any]] = [] @@ -256,17 +231,6 @@ class SuggestionService: "lang_confidence": src.get("lang_confidence"), "lang_conflict": src.get("lang_conflict", False), } - if with_results: - try: - item["products"] = self._search_products_for_suggestion( - tenant_id=tenant_id, - text_value=str(src.get("text") or ""), - lang=resolved_lang, - result_size=result_size, - ) - except Exception as e: - logger.warning("Failed to enrich suggestion products: %s", e) - item["products"] = [] suggestions.append(item) took_ms = int((time.time() - start) * 1000) @@ -277,4 +241,3 @@ class SuggestionService: "suggestions": suggestions[:size], "took_ms": took_ms, } - diff --git a/tests/test_suggestions.py b/tests/test_suggestions.py index 2e24e7a..01a0443 100644 --- a/tests/test_suggestions.py +++ b/tests/test_suggestions.py @@ -1,22 +1,80 @@ import json +from datetime import datetime, timedelta, timezone from typing import Any, Dict, List import pytest -from suggestion.builder import SuggestionIndexBuilder +from suggestion.builder import ( + QueryDelta, + SuggestionIndexBuilder, + get_suggestion_alias_name, +) from suggestion.service import SuggestionService class FakeESClient: - """Minimal fake ES client for SuggestionService tests.""" + """Lightweight fake ES client for suggestion unit tests.""" def __init__(self) -> None: self.calls: List[Dict[str, Any]] = [] + self.indices: set[str] = set() + self.aliases: Dict[str, List[str]] = {} + self.client = self # support service._completion_suggest -> self.es_client.client.search - def search(self, index_name: str, body: Dict[str, Any], size: int = 10, from_: int = 0) -> Dict[str, Any]: - self.calls.append({"index": index_name, "body": body, "size": size, "from": from_}) - # Suggestion index - if "search_suggestions_tenant_" in index_name: + def search( + self, + index_name: str = None, + body: Dict[str, Any] = None, + size: int = 10, + from_: int = 0, + routing: str = None, + index: str = None, + **kwargs, + ) -> Dict[str, Any]: + idx = index_name or index + body = body or {} + self.calls.append( + { + "op": "search", + "index": idx, + "body": body, + "size": size, + "from": from_, + "routing": routing, + } + ) + + # Completion suggest path + if "suggest" in body: + return { + "suggest": { + "s": [ + { + "text": "iph", + "offset": 0, + "length": 3, + "options": [ + { + "text": "iphone 15", + "_score": 6.3, + "_source": { + "text": "iphone 15", + "lang": "en", + "rank_score": 5.0, + "sources": ["query_log", "qanchor"], + "lang_source": "log_field", + "lang_confidence": 1.0, + "lang_conflict": False, + }, + } + ], + } + ] + } + } + + # bool_prefix path + if idx and "search_suggestions_tenant_" in idx: return { "hits": { "total": {"value": 1}, @@ -33,61 +91,68 @@ class FakeESClient: "lang_source": "log_field", "lang_confidence": 1.0, "lang_conflict": False, - "top_spu_ids": ["12345"], - }, - } - ], - } - } - # Product index - if "search_products_tenant_" in index_name: - return { - "hits": { - "total": {"value": 1}, - "max_score": 2.5, - "hits": [ - { - "_id": "12345", - "_score": 2.5, - "_source": { - "spu_id": "12345", - "title": {"en": "iPhone 15 Pro Max"}, - "min_price": 999.0, - "image_url": "https://example.com/image.jpg", - "sales": 100, - "total_inventory": 50, }, } ], } } + return {"hits": {"total": {"value": 0}, "max_score": 0.0, "hits": []}} - # For builder.bulk_index usage compatibility in full runs (not used in these unit tests) def bulk_index(self, index_name: str, docs: List[Dict[str, Any]]) -> Dict[str, Any]: - self.calls.append({"index": index_name, "bulk": True, "docs": docs}) + self.calls.append({"op": "bulk_index", "index": index_name, "docs": docs}) return {"success": len(docs), "failed": 0, "errors": []} + def bulk_actions(self, actions: List[Dict[str, Any]]) -> Dict[str, Any]: + self.calls.append({"op": "bulk_actions", "actions": actions}) + return {"success": len(actions), "failed": 0, "errors": []} + def index_exists(self, index_name: str) -> bool: - return False + return index_name in self.indices def delete_index(self, index_name: str) -> bool: - return True + if index_name in self.indices: + self.indices.remove(index_name) + return True + return False def create_index(self, index_name: str, body: Dict[str, Any]) -> bool: - self.calls.append({"index": index_name, "create": True, "body": body}) + self.calls.append({"op": "create_index", "index": index_name, "body": body}) + self.indices.add(index_name) return True def refresh(self, index_name: str) -> bool: + self.calls.append({"op": "refresh", "index": index_name}) + return True + + def alias_exists(self, alias_name: str) -> bool: + return alias_name in self.aliases and len(self.aliases[alias_name]) > 0 + + def get_alias_indices(self, alias_name: str) -> List[str]: + return list(self.aliases.get(alias_name, [])) + + def update_aliases(self, actions: List[Dict[str, Any]]) -> bool: + self.calls.append({"op": "update_aliases", "actions": actions}) + for action in actions: + if "remove" in action: + alias = action["remove"]["alias"] + index = action["remove"]["index"] + self.aliases[alias] = [x for x in self.aliases.get(alias, []) if x != index] + if "add" in action: + alias = action["add"]["alias"] + index = action["add"]["index"] + self.aliases[alias] = [index] return True + def list_indices(self, pattern: str) -> List[str]: + prefix = pattern.rstrip("*") + return sorted([x for x in self.indices if x.startswith(prefix)]) + @pytest.mark.unit -def test_resolve_query_language_prefers_log_field(monkeypatch): - """builder.resolve_query_language 应优先使用日志 language 字段。""" +def test_resolve_query_language_prefers_log_field(): fake_es = FakeESClient() builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) - # index_languages 里包含 en/zh,primary 设为 zh lang, conf, source, conflict = builder._resolve_query_language( query="iphone 15", log_language="en", @@ -103,7 +168,6 @@ def test_resolve_query_language_prefers_log_field(monkeypatch): @pytest.mark.unit def test_resolve_query_language_uses_request_params_when_log_missing(): - """当日志 language 为空时,应从 request_params.language 解析。""" fake_es = FakeESClient() builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) request_params = json.dumps({"language": "zh"}) @@ -122,10 +186,8 @@ def test_resolve_query_language_uses_request_params_when_log_missing(): @pytest.mark.unit def test_resolve_query_language_fallback_to_primary(): - """当无任何语言线索时(无 script 检测),应回落到租户 primary_language。""" fake_es = FakeESClient() builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) - # "123" 无 CJK/Latin 等 script,_detect_script_language 返回 None lang, conf, source, conflict = builder._resolve_query_language( query="123", log_language=None, @@ -139,16 +201,10 @@ def test_resolve_query_language_fallback_to_primary(): @pytest.mark.unit -def test_suggestion_service_basic_flow(monkeypatch): - """ - SuggestionService.search 应正确调用 ES 并返回 suggestion + products。 - 使用 FakeESClient 避免真实 ES 依赖。 - """ - # 覆盖 tenant_config_loader 以避免依赖外部 config.yaml 改动 +def test_suggestion_service_basic_flow_uses_alias_and_routing(): from config import tenant_config_loader as tcl loader = tcl.get_tenant_config_loader() - # 强制覆盖内部缓存配置 loader._config = { "default": {"primary_language": "en", "index_languages": ["en", "zh"]}, "tenants": { @@ -157,14 +213,15 @@ def test_suggestion_service_basic_flow(monkeypatch): } fake_es = FakeESClient() + alias_name = get_suggestion_alias_name("1") + fake_es.aliases[alias_name] = ["search_suggestions_tenant_1_v20260310190000"] + service = SuggestionService(es_client=fake_es) result = service.search( tenant_id="1", query="iph", language="en", size=5, - with_results=True, - result_size=2, ) assert result["resolved_language"] == "en" @@ -172,13 +229,119 @@ def test_suggestion_service_basic_flow(monkeypatch): assert result["took_ms"] >= 0 suggestions = result["suggestions"] assert len(suggestions) == 1 - s0 = suggestions[0] - assert s0["text"] == "iphone 15" - assert s0["lang"] == "en" - assert isinstance(s0.get("products"), list) - assert len(s0["products"]) >= 1 - p0 = s0["products"][0] - assert p0["spu_id"] == "12345" - assert "title" in p0 - assert "price" in p0 + assert suggestions[0]["text"] == "iphone 15" + + search_calls = [x for x in fake_es.calls if x.get("op") == "search"] + assert len(search_calls) >= 2 + assert any(x.get("routing") == "1" for x in search_calls) + assert any(x.get("index") == alias_name for x in search_calls) + + +@pytest.mark.unit +def test_publish_alias_and_cleanup_old_versions(monkeypatch): + fake_es = FakeESClient() + builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) + + tenant_id = "162" + alias_name = get_suggestion_alias_name(tenant_id) + fake_es.indices.update( + { + "search_suggestions_tenant_162_v20260310170000", + "search_suggestions_tenant_162_v20260310180000", + "search_suggestions_tenant_162_v20260310190000", + } + ) + fake_es.aliases[alias_name] = ["search_suggestions_tenant_162_v20260310180000"] + + monkeypatch.setattr(builder, "_upsert_meta", lambda tenant_id, patch: None) + + result = builder._publish_alias( + tenant_id=tenant_id, + index_name="search_suggestions_tenant_162_v20260310190000", + keep_versions=2, + ) + + assert result["current_index"] == "search_suggestions_tenant_162_v20260310190000" + assert fake_es.aliases[alias_name] == ["search_suggestions_tenant_162_v20260310190000"] + assert "search_suggestions_tenant_162_v20260310170000" not in fake_es.indices + + +@pytest.mark.unit +def test_incremental_bootstrap_when_no_active_index(monkeypatch): + fake_es = FakeESClient() + builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) + + from config import tenant_config_loader as tcl + + loader = tcl.get_tenant_config_loader() + loader._config = { + "default": {"primary_language": "en", "index_languages": ["en", "zh"]}, + "tenants": {"162": {"primary_language": "en", "index_languages": ["en", "zh"]}}, + } + + monkeypatch.setattr( + builder, + "rebuild_tenant_index", + lambda **kwargs: {"mode": "full", "tenant_id": kwargs["tenant_id"], "index_name": "v_idx"}, + ) + + result = builder.incremental_update_tenant_index(tenant_id="162", bootstrap_if_missing=True) + assert result["mode"] == "incremental" + assert result["bootstrapped"] is True + assert result["bootstrap_result"]["mode"] == "full" + + +@pytest.mark.unit +def test_incremental_updates_existing_index(monkeypatch): + fake_es = FakeESClient() + builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) + + from config import tenant_config_loader as tcl + + loader = tcl.get_tenant_config_loader() + loader._config = { + "default": {"primary_language": "en", "index_languages": ["en", "zh"]}, + "tenants": {"162": {"primary_language": "en", "index_languages": ["en", "zh"]}}, + } + + tenant_id = "162" + alias_name = get_suggestion_alias_name(tenant_id) + active_index = "search_suggestions_tenant_162_v20260310190000" + fake_es.aliases[alias_name] = [active_index] + + watermark = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat() + monkeypatch.setattr(builder, "_get_meta", lambda _tenant_id: {"last_incremental_watermark": watermark}) + monkeypatch.setattr(builder, "_upsert_meta", lambda tenant_id, patch: None) + + monkeypatch.setattr( + builder, + "_build_incremental_deltas", + lambda **kwargs: { + ("en", "iphone 15"): QueryDelta( + tenant_id=tenant_id, + lang="en", + text="iphone 15", + text_norm="iphone 15", + delta_7d=2, + delta_30d=3, + lang_confidence=1.0, + lang_source="log_field", + lang_conflict=False, + ) + }, + ) + + result = builder.incremental_update_tenant_index( + tenant_id=tenant_id, + bootstrap_if_missing=False, + overlap_minutes=10, + ) + + assert result["mode"] == "incremental" + assert result["target_index"] == active_index + assert result["updated_terms"] == 1 + assert result["bulk_result"]["failed"] == 0 + bulk_calls = [x for x in fake_es.calls if x.get("op") == "bulk_actions"] + assert len(bulk_calls) == 1 + assert len(bulk_calls[0]["actions"]) == 1 diff --git a/utils/es_client.py b/utils/es_client.py index 8799508..4d0e1d0 100644 --- a/utils/es_client.py +++ b/utils/es_client.py @@ -49,7 +49,7 @@ class ESClient: # Add authentication if provided if username and password: - client_config['http_auth'] = (username, password) + client_config['basic_auth'] = (username, password) # Merge additional kwargs client_config.update(kwargs) @@ -88,6 +88,54 @@ class ESClient: logger.error(f"Failed to create index '{index_name}': {e}", exc_info=True) return False + def put_alias(self, index_name: str, alias_name: str) -> bool: + """Add alias for an index.""" + try: + self.client.indices.put_alias(index=index_name, name=alias_name) + return True + except Exception as e: + logger.error( + "Failed to put alias '%s' for index '%s': %s", + alias_name, + index_name, + e, + exc_info=True, + ) + return False + + def alias_exists(self, alias_name: str) -> bool: + """Check if alias exists.""" + try: + return self.client.indices.exists_alias(name=alias_name) + except Exception as e: + logger.error("Failed to check alias exists '%s': %s", alias_name, e, exc_info=True) + return False + + def get_alias_indices(self, alias_name: str) -> List[str]: + """Get concrete indices behind alias.""" + try: + result = self.client.indices.get_alias(name=alias_name) + return sorted(list((result or {}).keys())) + except Exception: + return [] + + def update_aliases(self, actions: List[Dict[str, Any]]) -> bool: + """Atomically update aliases.""" + try: + self.client.indices.update_aliases(body={"actions": actions}) + return True + except Exception as e: + logger.error("Failed to update aliases: %s", e, exc_info=True) + return False + + def list_indices(self, pattern: str) -> List[str]: + """List indices by wildcard pattern.""" + try: + result = self.client.indices.get(index=pattern, allow_no_indices=True) + return sorted(list((result or {}).keys())) + except Exception: + return [] + def delete_index(self, index_name: str) -> bool: """ Delete an index. @@ -153,12 +201,37 @@ class ESClient: 'errors': [str(e)] } + def bulk_actions(self, actions: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Execute generic bulk actions. + + Args: + actions: elasticsearch.helpers.bulk compatible action list + """ + if not actions: + return {'success': 0, 'failed': 0, 'errors': []} + try: + success, failed = bulk(self.client, actions, raise_on_error=False) + return { + 'success': success, + 'failed': len(failed), + 'errors': failed + } + except Exception as e: + logger.error("Bulk actions failed: %s", e, exc_info=True) + return { + 'success': 0, + 'failed': len(actions), + 'errors': [str(e)], + } + def search( self, index_name: str, body: Dict[str, Any], size: int = 10, - from_: int = 0 + from_: int = 0, + routing: Optional[str] = None, ) -> Dict[str, Any]: """ Execute search query. @@ -189,7 +262,8 @@ class ESClient: index=index_name, body=body, size=size, - from_=from_ + from_=from_, + routing=routing, ) except Exception as e: logger.error(f"Search failed: {e}", exc_info=True) -- libgit2 0.21.2