Commit daf66a51dbc4d9319f05454c1960a1fe44b71394

Authored by tangwang
1 parent 30f2a10b

Completed interface-level load-test scripts covering search, suggest, and the microservices (embedding/translate/rerank).

**New files**
- Main load-test script: [perf_api_benchmark.py](/data/saas-search/scripts/perf_api_benchmark.py:1)
- Custom case template: [perf_cases.json.example](/data/saas-search/scripts/perf_cases.json.example:1)

**Documentation updates**
- Added an "Interface-level load-test script" section to the API integration guide: [搜索API对接指南.md](/data/saas-search/docs/搜索API对接指南.md:2089)

**Supported scenarios**
- `backend_search` -> `POST /search/`
- `backend_suggest` -> `GET /search/suggestions`
- `embed_text` -> `POST /embed/text`
- `translate` -> `POST /translate`
- `rerank` -> `POST /rerank`
- `all` -> runs all of the above scenarios in sequence

**Commands you can run directly**
1. `./.venv/bin/python scripts/perf_api_benchmark.py --scenario
   backend_suggest --tenant-id 162 --duration 30 --concurrency 50`
2. `./.venv/bin/python scripts/perf_api_benchmark.py --scenario
   backend_search --tenant-id 162 --duration 30 --concurrency 20`
3. `./.venv/bin/python scripts/perf_api_benchmark.py --scenario all
   --tenant-id 162 --duration 60 --concurrency 30 --output
perf_reports/all.json`
4. `./.venv/bin/python scripts/perf_api_benchmark.py --scenario all
   --tenant-id 162 --cases-file scripts/perf_cases.json.example
--duration 60 --concurrency 40 --output perf_reports/custom_all.json`
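When a cases file is supplied, the script replaces a literal `${tenant_id}` placeholder in request headers with the `--tenant-id` value (this mirrors `load_cases_from_file` in the new script; the helper name below is illustrative, not part of the script). A minimal sketch:

```python
# Sketch of the ${tenant_id} substitution performed when loading a cases file.
# substitute_tenant is an illustrative name, not a function in the script.
from typing import Dict, Optional


def substitute_tenant(headers: Optional[Dict[str, str]], tenant_id: str) -> Optional[Dict[str, str]]:
    """Replace a literal ${tenant_id} placeholder in X-Tenant-ID, if present."""
    out = dict(headers or {})
    if str(out.get("X-Tenant-ID", "")).strip() == "${tenant_id}":
        out["X-Tenant-ID"] = tenant_id
    return out or None


headers = substitute_tenant({"X-Tenant-ID": "${tenant_id}"}, "162")
print(headers)  # {'X-Tenant-ID': '162'}
```

Headers without the placeholder pass through unchanged, so fixed tenant IDs in a cases file are respected.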

**Optional arguments**
- `--backend-base` `--embedding-base` `--translator-base`
  `--reranker-base`: point at your actual service addresses
- `--max-requests`: cap the total number of requests
- `--max-errors`: stop early once the error count reaches this threshold
- `--pause`: pause between scenarios in `all` mode
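In `all` mode the script also prints an overall summary whose average latency is weighted by each scenario's request count, as in the script's `aggregate_results`. A standalone sketch of that weighting (the function name and sample numbers are illustrative):

```python
# Weighted-average latency across scenarios: each scenario's average
# latency is weighted by its total request count, as in aggregate_results.
from typing import Dict, List


def weighted_avg_latency(results: List[Dict[str, float]]) -> float:
    total = sum(r["total_requests"] for r in results)
    if total == 0:
        return 0.0
    return round(sum(r["avg_ms"] * r["total_requests"] for r in results) / total, 2)


results = [
    {"total_requests": 300, "avg_ms": 20.0},  # e.g. suggest: fast, many requests
    {"total_requests": 100, "avg_ms": 80.0},  # e.g. search: slower, fewer requests
]
print(weighted_avg_latency(results))  # 35.0
```

Weighting by request count keeps a fast, high-volume scenario from being drowned out by a slow, low-volume one.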

**Verified locally**
- `backend_suggest` small-scale concurrency test passed (HTTP 200, 100% success rate)
- `backend_search` small-scale concurrency test passed (HTTP 200, 100% success rate)
- `translate` small-scale concurrency test passed (HTTP 200, 100% success rate)
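The per-scenario reports include p50/p90/p95/p99 latencies, which the script computes by linear interpolation between the closest ranks (its `percentile` helper). A standalone sketch with sample latencies:

```python
# Percentile by linear interpolation between closest ranks,
# matching the script's percentile() helper.
import math
from typing import List


def percentile(sorted_values: List[float], p: float) -> float:
    if not sorted_values:
        return 0.0
    if p <= 0:
        return sorted_values[0]
    if p >= 100:
        return sorted_values[-1]
    rank = (len(sorted_values) - 1) * (p / 100.0)
    low, high = int(math.floor(rank)), int(math.ceil(rank))
    if low == high:
        return sorted_values[low]
    weight = rank - low
    return sorted_values[low] * (1.0 - weight) + sorted_values[high] * weight


latencies = sorted([12.0, 18.0, 25.0, 40.0, 95.0])
print(percentile(latencies, 50))  # 25.0
print(percentile(latencies, 90))  # 73.0
```

For p90 over five samples the rank is 3.6, so the result interpolates 60/40 between the 4th and 5th values.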
docs/搜索API对接指南.md
@@ -2081,3 +2081,60 @@ curl "http://localhost:6006/health"
2081 | `hanlp_standard` ⚠️ TODO (not yet supported) | Chinese | Chinese query analyzer (for Chinese fields) |
2082 | `english` | English | Standard English analyzer (for English fields) |
2083 | `lowercase` | - | Lowercase normalizer (for keyword subfields) |
  2084 +
  2085 +---
  2086 +
  2087 +## 10. Interface-level load-test script
  2088 +
  2089 +The repository ships a unified load-test script, `scripts/perf_api_benchmark.py`, for concurrency testing of the following endpoints:
  2090 +
  2091 +- Backend search: `POST /search/`
  2092 +- Search suggestions: `GET /search/suggestions`
  2093 +- Embedding service: `POST /embed/text`
  2094 +- Translation service: `POST /translate`
  2095 +- Rerank service: `POST /rerank`
  2096 +
  2097 +### 10.1 Quick examples
  2098 +
  2099 +```bash
  2100 +# suggest load test (tenant 162)
  2101 +python scripts/perf_api_benchmark.py \
  2102 + --scenario backend_suggest \
  2103 + --tenant-id 162 \
  2104 + --duration 30 \
  2105 + --concurrency 50
  2106 +
  2107 +# search load test
  2108 +python scripts/perf_api_benchmark.py \
  2109 + --scenario backend_search \
  2110 + --tenant-id 162 \
  2111 + --duration 30 \
  2112 + --concurrency 20
  2113 +
  2114 +# full-pipeline load test (search + suggest + embedding + translate + rerank)
  2115 +python scripts/perf_api_benchmark.py \
  2116 + --scenario all \
  2117 + --tenant-id 162 \
  2118 + --duration 60 \
  2119 + --concurrency 30 \
  2120 + --output perf_reports/all.json
  2121 +```
  2122 +
  2123 +### 10.2 Custom cases
  2124 +
  2125 +Use `--cases-file` to override the default request templates. Example file:
  2126 +
  2127 +```bash
  2128 +scripts/perf_cases.json.example
  2129 +```
  2130 +
  2131 +Example run:
  2132 +
  2133 +```bash
  2134 +python scripts/perf_api_benchmark.py \
  2135 + --scenario all \
  2136 + --tenant-id 162 \
  2137 + --cases-file scripts/perf_cases.json.example \
  2138 + --duration 60 \
  2139 + --concurrency 40
  2140 +```
scripts/perf_api_benchmark.py 0 → 100755
@@ -0,0 +1,464 @@
  1 +#!/usr/bin/env python3
  2 +"""
  3 +API-level performance test script for search stack services.
  4 +
  5 +Default scenarios (aligned with docs/搜索API对接指南.md):
  6 +- backend_search POST /search/
  7 +- backend_suggest GET /search/suggestions
  8 +- embed_text POST /embed/text
  9 +- translate POST /translate
  10 +- rerank POST /rerank
  11 +
  12 +Examples:
  13 + python scripts/perf_api_benchmark.py --scenario backend_search --duration 30 --concurrency 20 --tenant-id 162
  14 + python scripts/perf_api_benchmark.py --scenario backend_suggest --duration 30 --concurrency 50 --tenant-id 162
  15 + python scripts/perf_api_benchmark.py --scenario all --duration 60 --concurrency 80 --tenant-id 162
  16 + python scripts/perf_api_benchmark.py --scenario all --cases-file scripts/perf_cases.json.example --output perf_result.json
  17 +"""
  18 +
  19 +from __future__ import annotations
  20 +
  21 +import argparse
  22 +import asyncio
  23 +import json
  24 +import math
  25 +import statistics
  26 +import time
  27 +from dataclasses import dataclass
  28 +from pathlib import Path
  29 +from typing import Any, Dict, List, Optional, Tuple
  30 +
  31 +import httpx
  32 +
  33 +
  34 +@dataclass
  35 +class RequestTemplate:
  36 + method: str
  37 + path: str
  38 + params: Optional[Dict[str, Any]] = None
  39 + json_body: Optional[Any] = None
  40 + headers: Optional[Dict[str, str]] = None
  41 +
  42 +
  43 +@dataclass
  44 +class Scenario:
  45 + name: str
  46 + templates: List[RequestTemplate]
  47 + timeout_sec: float
  48 +
  49 +
  50 +@dataclass
  51 +class RequestResult:
  52 + ok: bool
  53 + status_code: int
  54 + latency_ms: float
  55 + error: str = ""
  56 +
  57 +
  58 +def percentile(sorted_values: List[float], p: float) -> float:
  59 + if not sorted_values:
  60 + return 0.0
  61 + if p <= 0:
  62 + return sorted_values[0]
  63 + if p >= 100:
  64 + return sorted_values[-1]
  65 + rank = (len(sorted_values) - 1) * (p / 100.0)
  66 + low = int(math.floor(rank))
  67 + high = int(math.ceil(rank))
  68 + if low == high:
  69 + return sorted_values[low]
  70 + weight = rank - low
  71 + return sorted_values[low] * (1.0 - weight) + sorted_values[high] * weight
  72 +
  73 +
  74 +def make_default_templates(tenant_id: str) -> Dict[str, List[RequestTemplate]]:
  75 + return {
  76 + "backend_search": [
  77 + RequestTemplate(
  78 + method="POST",
  79 + path="/search/",
  80 + headers={"X-Tenant-ID": tenant_id},
  81 + json_body={"query": "wireless mouse", "size": 10, "language": "en"},
  82 + ),
  83 + RequestTemplate(
  84 + method="POST",
  85 + path="/search/",
  86 + headers={"X-Tenant-ID": tenant_id},
  87 + json_body={"query": "芭比娃娃", "size": 10, "language": "zh"},
  88 + ),
  89 + RequestTemplate(
  90 + method="POST",
  91 + path="/search/",
  92 + headers={"X-Tenant-ID": tenant_id},
  93 + json_body={"query": "f", "size": 10, "language": "en"},
  94 + ),
  95 + ],
  96 + "backend_suggest": [
  97 + RequestTemplate(
  98 + method="GET",
  99 + path="/search/suggestions",
  100 + headers={"X-Tenant-ID": tenant_id},
  101 + params={"q": "f", "size": 10, "language": "en"},
  102 + ),
  103 + RequestTemplate(
  104 + method="GET",
  105 + path="/search/suggestions",
  106 + headers={"X-Tenant-ID": tenant_id},
  107 + params={"q": "玩", "size": 10, "language": "zh"},
  108 + ),
  109 + RequestTemplate(
  110 + method="GET",
  111 + path="/search/suggestions",
  112 + headers={"X-Tenant-ID": tenant_id},
  113 + params={"q": "shi", "size": 10, "language": "en"},
  114 + ),
  115 + ],
  116 + "embed_text": [
  117 + RequestTemplate(
  118 + method="POST",
  119 + path="/embed/text",
  120 + json_body=["wireless mouse", "gaming keyboard", "barbie doll"],
  121 + )
  122 + ],
  123 + "translate": [
  124 + RequestTemplate(
  125 + method="POST",
  126 + path="/translate",
  127 + json_body={"text": "商品名称", "target_lang": "en", "source_lang": "zh", "model": "qwen"},
  128 + ),
  129 + RequestTemplate(
  130 + method="POST",
  131 + path="/translate",
  132 + json_body={"text": "Product title", "target_lang": "zh", "model": "qwen"},
  133 + ),
  134 + ],
  135 + "rerank": [
  136 + RequestTemplate(
  137 + method="POST",
  138 + path="/rerank",
  139 + json_body={
  140 + "query": "wireless mouse",
  141 + "docs": [
  142 + "Wireless ergonomic mouse with rechargeable battery",
  143 + "USB-C cable 1m",
  144 + "Gaming mouse 26000 DPI",
  145 + ],
  146 + "normalize": True,
  147 + },
  148 + )
  149 + ],
  150 + }
  151 +
  152 +
  153 +def load_cases_from_file(path: Path, tenant_id: str) -> Dict[str, List[RequestTemplate]]:
  154 + data = json.loads(path.read_text(encoding="utf-8"))
  155 + out: Dict[str, List[RequestTemplate]] = {}
  156 + for scenario_name, requests_data in (data.get("scenarios") or {}).items():
  157 + templates: List[RequestTemplate] = []
  158 + for item in requests_data:
  159 + headers = dict(item.get("headers") or {})
  160 + if "X-Tenant-ID" in headers and str(headers["X-Tenant-ID"]).strip() == "${tenant_id}":
  161 + headers["X-Tenant-ID"] = tenant_id
  162 + templates.append(
  163 + RequestTemplate(
  164 + method=str(item.get("method", "GET")).upper(),
  165 + path=str(item.get("path", "")).strip(),
  166 + params=item.get("params"),
  167 + json_body=item.get("json"),
  168 + headers=headers or None,
  169 + )
  170 + )
  171 + if templates:
  172 + out[scenario_name] = templates
  173 + return out
  174 +
  175 +
  176 +def build_scenarios(args: argparse.Namespace) -> Dict[str, Scenario]:
  177 + defaults = make_default_templates(args.tenant_id)
  178 + if args.cases_file:
  179 + custom = load_cases_from_file(Path(args.cases_file), tenant_id=args.tenant_id)
  180 + defaults.update(custom)
  181 +
  182 + scenario_base = {
  183 + "backend_search": args.backend_base,
  184 + "backend_suggest": args.backend_base,
  185 + "embed_text": args.embedding_base,
  186 + "translate": args.translator_base,
  187 + "rerank": args.reranker_base,
  188 + }
  189 +
  190 + scenarios: Dict[str, Scenario] = {}
  191 + for name, templates in defaults.items():
  192 + if name not in scenario_base:
  193 + continue
  194 + base = scenario_base[name].rstrip("/")
  195 + rewritten: List[RequestTemplate] = []
  196 + for t in templates:
  197 + path = t.path if t.path.startswith("/") else f"/{t.path}"
  198 + rewritten.append(
  199 + RequestTemplate(
  200 + method=t.method,
  201 + path=f"{base}{path}",
  202 + params=t.params,
  203 + json_body=t.json_body,
  204 + headers=t.headers,
  205 + )
  206 + )
  207 + scenarios[name] = Scenario(name=name, templates=rewritten, timeout_sec=args.timeout)
  208 + return scenarios
  209 +
  210 +
  211 +async def run_single_scenario(
  212 + scenario: Scenario,
  213 + duration_sec: int,
  214 + concurrency: int,
  215 + max_requests: int,
  216 + max_errors: int,
  217 +) -> Dict[str, Any]:
  218 + latencies: List[float] = []
  219 + status_counter: Dict[int, int] = {}
  220 + err_counter: Dict[str, int] = {}
  221 + total_requests = 0
  222 + success_requests = 0
  223 + stop_flag = False
  224 + lock = asyncio.Lock()
  225 + start = time.perf_counter()
  226 +
  227 + timeout = httpx.Timeout(timeout=scenario.timeout_sec)
  228 + limits = httpx.Limits(max_connections=max(concurrency * 2, 20), max_keepalive_connections=max(concurrency, 10))
  229 +
  230 + async def worker(worker_id: int, client: httpx.AsyncClient) -> None:
  231 + nonlocal total_requests, success_requests, stop_flag
  232 + idx = worker_id % len(scenario.templates)
  233 +
  234 + while not stop_flag:
  235 + elapsed = time.perf_counter() - start
  236 + if duration_sec > 0 and elapsed >= duration_sec:
  237 + break
  238 +
  239 + async with lock:
  240 + if max_requests > 0 and total_requests >= max_requests:
  241 + stop_flag = True
  242 + break
  243 + total_requests += 1
  244 +
  245 + tpl = scenario.templates[idx % len(scenario.templates)]
  246 + idx += 1
  247 +
  248 + t0 = time.perf_counter()
  249 + ok = False
  250 + status = 0
  251 + err = ""
  252 + try:
  253 + resp = await client.request(
  254 + method=tpl.method,
  255 + url=tpl.path,
  256 + params=tpl.params,
  257 + json=tpl.json_body,
  258 + headers=tpl.headers,
  259 + )
  260 + status = int(resp.status_code)
  261 + ok = 200 <= status < 300
  262 + if not ok:
  263 + err = f"http_{status}"
  264 + except Exception as e:
  265 + err = type(e).__name__
  266 + t1 = time.perf_counter()
  267 + cost_ms = (t1 - t0) * 1000.0
  268 +
  269 + async with lock:
  270 + latencies.append(cost_ms)
  271 + if status:
  272 + status_counter[status] = status_counter.get(status, 0) + 1
  273 + if ok:
  274 + success_requests += 1
  275 + else:
  276 + err_counter[err or "unknown"] = err_counter.get(err or "unknown", 0) + 1
  277 + total_err = sum(err_counter.values())
  278 + if max_errors > 0 and total_err >= max_errors:
  279 + stop_flag = True
  280 +
  281 + async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
  282 + tasks = [asyncio.create_task(worker(i, client)) for i in range(concurrency)]
  283 + await asyncio.gather(*tasks)
  284 +
  285 + elapsed = max(time.perf_counter() - start, 1e-9)
  286 + lat_sorted = sorted(latencies)
  287 +
  288 + result = {
  289 + "scenario": scenario.name,
  290 + "duration_sec": round(elapsed, 3),
  291 + "total_requests": total_requests,
  292 + "success_requests": success_requests,
  293 + "failed_requests": max(total_requests - success_requests, 0),
  294 + "success_rate": round((success_requests / total_requests) * 100.0, 2) if total_requests else 0.0,
  295 + "throughput_rps": round(total_requests / elapsed, 2),
  296 + "latency_ms": {
  297 + "avg": round(statistics.mean(lat_sorted), 2) if lat_sorted else 0.0,
  298 + "p50": round(percentile(lat_sorted, 50), 2),
  299 + "p90": round(percentile(lat_sorted, 90), 2),
  300 + "p95": round(percentile(lat_sorted, 95), 2),
  301 + "p99": round(percentile(lat_sorted, 99), 2),
  302 + "max": round(max(lat_sorted), 2) if lat_sorted else 0.0,
  303 + },
  304 + "status_codes": dict(sorted(status_counter.items(), key=lambda x: x[0])),
  305 + "errors": dict(sorted(err_counter.items(), key=lambda x: x[0])),
  306 + }
  307 + return result
  308 +
  309 +
  310 +def format_summary(result: Dict[str, Any]) -> str:
  311 + lines = []
  312 + lines.append(f"\n=== Scenario: {result['scenario']} ===")
  313 + lines.append(
  314 + "requests={total_requests} success={success_requests} fail={failed_requests} "
  315 + "success_rate={success_rate}% rps={throughput_rps}".format(**result)
  316 + )
  317 + lat = result["latency_ms"]
  318 + lines.append(
  319 + f"latency(ms): avg={lat['avg']} p50={lat['p50']} p90={lat['p90']} p95={lat['p95']} p99={lat['p99']} max={lat['max']}"
  320 + )
  321 + lines.append(f"status_codes: {result['status_codes']}")
  322 + if result["errors"]:
  323 + lines.append(f"errors: {result['errors']}")
  324 + return "\n".join(lines)
  325 +
  326 +
  327 +def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
  328 + if not results:
  329 + return {}
  330 + total_requests = sum(x["total_requests"] for x in results)
  331 + success_requests = sum(x["success_requests"] for x in results)
  332 + failed_requests = sum(x["failed_requests"] for x in results)
  333 + total_duration = sum(x["duration_sec"] for x in results)
  334 + weighted_avg_latency = 0.0
  335 + if total_requests > 0:
  336 + weighted_avg_latency = sum(x["latency_ms"]["avg"] * x["total_requests"] for x in results) / total_requests
  337 +
  338 + return {
  339 + "scenario": "ALL",
  340 + "total_requests": total_requests,
  341 + "success_requests": success_requests,
  342 + "failed_requests": failed_requests,
  343 + "success_rate": round((success_requests / total_requests) * 100.0, 2) if total_requests else 0.0,
  344 + "aggregate_rps": round(total_requests / max(total_duration, 1e-9), 2),
  345 + "weighted_avg_latency_ms": round(weighted_avg_latency, 2),
  346 + }
  347 +
  348 +
  349 +def parse_args() -> argparse.Namespace:
  350 + parser = argparse.ArgumentParser(description="Interface-level load test for search and related microservices")
  351 + parser.add_argument(
  352 + "--scenario",
  353 + type=str,
  354 + default="all",
  355 + help="Scenario: backend_search | backend_suggest | embed_text | translate | rerank | all",
  356 + )
  357 + parser.add_argument("--tenant-id", type=str, default="162", help="Tenant ID for backend search/suggest")
  358 + parser.add_argument("--duration", type=int, default=30, help="Duration seconds per scenario; <=0 means no duration cap")
  359 + parser.add_argument("--concurrency", type=int, default=20, help="Concurrent workers per scenario")
  360 + parser.add_argument("--max-requests", type=int, default=0, help="Stop after N requests per scenario (0 means unlimited)")
  361 + parser.add_argument("--timeout", type=float, default=10.0, help="Request timeout seconds")
  362 + parser.add_argument("--max-errors", type=int, default=0, help="Stop scenario when accumulated errors reach this value")
  363 +
  364 + parser.add_argument("--backend-base", type=str, default="http://127.0.0.1:6002", help="Base URL for backend search API")
  365 + parser.add_argument("--embedding-base", type=str, default="http://127.0.0.1:6005", help="Base URL for embedding service")
  366 + parser.add_argument("--translator-base", type=str, default="http://127.0.0.1:6006", help="Base URL for translation service")
  367 + parser.add_argument("--reranker-base", type=str, default="http://127.0.0.1:6007", help="Base URL for reranker service")
  368 +
  369 + parser.add_argument("--cases-file", type=str, default="", help="Optional JSON file to override/add request templates")
  370 + parser.add_argument("--output", type=str, default="", help="Optional output JSON path")
  371 + parser.add_argument("--pause", type=float, default=0.0, help="Pause seconds between scenarios in all mode")
  372 + return parser.parse_args()
  373 +
  374 +
  375 +async def main_async() -> int:
  376 + args = parse_args()
  377 + scenarios = build_scenarios(args)
  378 +
  379 + all_names = ["backend_search", "backend_suggest", "embed_text", "translate", "rerank"]
  380 + if args.scenario == "all":
  381 + run_names = [x for x in all_names if x in scenarios]
  382 + else:
  383 + if args.scenario not in scenarios:
  384 + print(f"Unknown scenario: {args.scenario}")
  385 + print(f"Available: {', '.join(sorted(scenarios.keys()))}")
  386 + return 2
  387 + run_names = [args.scenario]
  388 +
  389 + if not run_names:
  390 + print("No scenarios to run.")
  391 + return 2
  392 +
  393 + print("Load test config:")
  394 + print(f" scenario={args.scenario}")
  395 + print(f" tenant_id={args.tenant_id}")
  396 + print(f" duration={args.duration}s")
  397 + print(f" concurrency={args.concurrency}")
  398 + print(f" max_requests={args.max_requests}")
  399 + print(f" timeout={args.timeout}s")
  400 + print(f" max_errors={args.max_errors}")
  401 + print(f" backend_base={args.backend_base}")
  402 + print(f" embedding_base={args.embedding_base}")
  403 + print(f" translator_base={args.translator_base}")
  404 + print(f" reranker_base={args.reranker_base}")
  405 +
  406 + results: List[Dict[str, Any]] = []
  407 + for i, name in enumerate(run_names, start=1):
  408 + scenario = scenarios[name]
  409 + print(f"\n[{i}/{len(run_names)}] running {name} ...")
  410 + result = await run_single_scenario(
  411 + scenario=scenario,
  412 + duration_sec=args.duration,
  413 + concurrency=args.concurrency,
  414 + max_requests=args.max_requests,
  415 + max_errors=args.max_errors,
  416 + )
  417 + print(format_summary(result))
  418 + results.append(result)
  419 +
  420 + if args.pause > 0 and i < len(run_names):
  421 + await asyncio.sleep(args.pause)
  422 +
  423 + final = {
  424 + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
  425 + "config": {
  426 + "scenario": args.scenario,
  427 + "tenant_id": args.tenant_id,
  428 + "duration_sec": args.duration,
  429 + "concurrency": args.concurrency,
  430 + "max_requests": args.max_requests,
  431 + "timeout_sec": args.timeout,
  432 + "max_errors": args.max_errors,
  433 + "backend_base": args.backend_base,
  434 + "embedding_base": args.embedding_base,
  435 + "translator_base": args.translator_base,
  436 + "reranker_base": args.reranker_base,
  437 + "cases_file": args.cases_file or None,
  438 + },
  439 + "results": results,
  440 + "overall": aggregate_results(results),
  441 + }
  442 +
  443 + print("\n=== Overall ===")
  444 + print(json.dumps(final["overall"], ensure_ascii=False, indent=2))
  445 +
  446 + if args.output:
  447 + out_path = Path(args.output)
  448 + out_path.parent.mkdir(parents=True, exist_ok=True)
  449 + out_path.write_text(json.dumps(final, ensure_ascii=False, indent=2), encoding="utf-8")
  450 + print(f"Saved JSON report: {out_path}")
  451 +
  452 + return 0
  453 +
  454 +
  455 +def main() -> int:
  456 + try:
  457 + return asyncio.run(main_async())
  458 + except KeyboardInterrupt:
  459 + print("Interrupted by user")
  460 + return 130
  461 +
  462 +
  463 +if __name__ == "__main__":
  464 + raise SystemExit(main())
scripts/perf_cases.json.example 0 → 100644
@@ -0,0 +1,62 @@
  1 +{
  2 + "scenarios": {
  3 + "backend_search": [
  4 + {
  5 + "method": "POST",
  6 + "path": "/search/",
  7 + "headers": {"X-Tenant-ID": "${tenant_id}"},
  8 + "json": {"query": "wireless mouse", "size": 20, "language": "en", "enable_rerank": false}
  9 + },
  10 + {
  11 + "method": "POST",
  12 + "path": "/search/",
  13 + "headers": {"X-Tenant-ID": "${tenant_id}"},
  14 + "json": {"query": "芭比娃娃", "size": 20, "language": "zh", "enable_rerank": false}
  15 + }
  16 + ],
  17 + "backend_suggest": [
  18 + {
  19 + "method": "GET",
  20 + "path": "/search/suggestions",
  21 + "headers": {"X-Tenant-ID": "${tenant_id}"},
  22 + "params": {"q": "f", "size": 20, "language": "en"}
  23 + },
  24 + {
  25 + "method": "GET",
  26 + "path": "/search/suggestions",
  27 + "headers": {"X-Tenant-ID": "${tenant_id}"},
  28 + "params": {"q": "玩", "size": 20, "language": "zh"}
  29 + }
  30 + ],
  31 + "embed_text": [
  32 + {
  33 + "method": "POST",
  34 + "path": "/embed/text",
  35 + "json": ["wireless mouse", "gaming keyboard", "USB-C cable", "barbie doll"]
  36 + }
  37 + ],
  38 + "translate": [
  39 + {
  40 + "method": "POST",
  41 + "path": "/translate",
  42 + "json": {"text": "商品标题", "target_lang": "en", "source_lang": "zh", "model": "qwen"}
  43 + }
  44 + ],
  45 + "rerank": [
  46 + {
  47 + "method": "POST",
  48 + "path": "/rerank",
  49 + "json": {
  50 + "query": "wireless mouse",
  51 + "docs": [
  52 + "Wireless ergonomic mouse",
  53 + "Bluetooth gaming mouse",
  54 + "USB cable 1 meter",
  55 + "Mouse pad large size"
  56 + ],
  57 + "normalize": true
  58 + }
  59 + }
  60 + ]
  61 + }
  62 +}
suggestion/TROUBLESHOOTING.md
@@ -93,3 +93,44 @@ curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST"
93 ```
94
95 Or run a full rebuild once.
  96 +
  97 +## 8. A prefix like `q=F` returns nothing, even though products clearly have `F...` titles
  98 +
  99 +### Typical causes
  100 +
  101 +- The suggestion index only contains query_log entries and no product titles (e.g. the product documents lack `spu_id` but do have `id`).
  102 +- English titles are too long and get filtered as noise (the leading phrase is now extracted automatically, e.g. `Furby Furblets 2-Pack`).
  103 +
  104 +### Step-by-step checks
  105 +
  106 +1. Check whether the suggestion alias has `en` documents:
  107 +
  108 +```bash
  109 +ALIAS_NAME="${ES_INDEX_NAMESPACE:-}search_suggestions_tenant_162_current"
  110 +curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST/$ALIAS_NAME/_search?pretty" \
  111 + -H 'Content-Type: application/json' \
  112 + -d '{"size":0,"aggs":{"langs":{"terms":{"field":"lang","size":20}}}}'
  113 +```
  114 +
  115 +2. Check whether the `f` prefix exists under `en`:
  116 +
  117 +```bash
  118 +curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST/$ALIAS_NAME/_search?pretty" \
  119 + -H 'Content-Type: application/json' \
  120 + -d '{"size":20,"_source":["text","text_norm","lang"],"query":{"bool":{"filter":[{"term":{"lang":"en"}}],"must":[{"prefix":{"text_norm":"f"}}]}}}'
  121 +```
  122 +
  123 +3. Cross-check the product index to confirm the source data really contains `F...`:
  124 +
  125 +```bash
  126 +curl -u "$ES_USERNAME:$ES_PASSWORD" "$ES_HOST/search_products_tenant_162/_search?pretty" \
  127 + -H 'Content-Type: application/json' \
  128 + -d '{"size":20,"_source":["id","spu_id","title.en"],"query":{"match_phrase_prefix":{"title.en":"f"}}}'
  129 +```
  130 +
  131 +4. Rebuild, then test the API again:
  132 +
  133 +```bash
  134 +./scripts/rebuild_suggestions.sh 162 F en
  135 +curl "http://localhost:6002/search/suggestions?q=F&size=40&language=en&tenant_id=162"
  136 +```
suggestion/builder.py
@@ -128,6 +128,27 @@ class SuggestionIndexBuilder:
128 return text_value
129
130 @staticmethod
  131 + def _prepare_title_for_suggest(title: str, max_len: int = 120) -> str:
  132 + """
  133 + Keep title-derived suggestions concise:
  134 + - keep raw title when short enough
  135 + - for long titles, keep the leading phrase before common separators
  136 + - fallback to hard truncate
  137 + """
  138 + raw = str(title or "").strip()
  139 + if not raw:
  140 + return ""
  141 + if len(raw) <= max_len:
  142 + return raw
  143 +
  144 + head = re.split(r"[,,;;|/\\((\[【]", raw, maxsplit=1)[0].strip()
  145 + if 1 < len(head) <= max_len:
  146 + return head
  147 +
  148 + truncated = raw[:max_len].rstrip(" ,,;;|/\\-—–()()[]【】")
  149 + return truncated or raw[:max_len]
  150 +
  151 + @staticmethod
131 def _split_qanchors(value: Any) -> List[str]:
132 if value is None:
133 return []
@@ -252,8 +273,12 @@ class SuggestionIndexBuilder:
252 while True:
253 body: Dict[str, Any] = {
254 "size": batch_size,
255 - "_source": ["spu_id", "title", "qanchors"],
256 - "sort": [{"spu_id": "asc"}],
  276 + "_source": ["id", "spu_id", "title", "qanchors"],
  277 + # Prefer spu_id when present; fall back to id.keyword for current mappings.
  278 + "sort": [
  279 + {"spu_id": {"order": "asc", "missing": "_last"}},
  280 + {"id.keyword": {"order": "asc", "missing": "_last"}},
  281 + ],
257 "query": {"match_all": {}},
258 }
259 if search_after is not None:
@@ -431,8 +456,8 @@ class SuggestionIndexBuilder:
431 # Step 1: product title/qanchors
432 for hit in self._iter_products(tenant_id, batch_size=batch_size):
433 src = hit.get("_source", {}) or {}
434 - spu_id = str(src.get("spu_id") or "")
435 - if not spu_id:
  459 + product_id = str(src.get("spu_id") or src.get("id") or hit.get("_id") or "")
  460 + if not product_id:
436 continue
437 title_obj = src.get("title") or {}
438 qanchor_obj = src.get("qanchors") or {}
@@ -440,7 +465,7 @@ class SuggestionIndexBuilder:
440 for lang in index_languages:
441 title = ""
442 if isinstance(title_obj, dict):
443 - title = str(title_obj.get(lang) or "").strip()
  468 + title = self._prepare_title_for_suggest(title_obj.get(lang) or "")
444 if title:
445 text_norm = self._normalize_text(title)
446 if not self._looks_noise(text_norm):
@@ -449,7 +474,7 @@ class SuggestionIndexBuilder:
449 if c is None:
450 c = SuggestionCandidate(text=title, text_norm=text_norm, lang=lang)
451 key_to_candidate[key] = c
452 - c.add_product("title", spu_id=spu_id)
  477 + c.add_product("title", spu_id=product_id)
453
454 q_raw = None
455 if isinstance(qanchor_obj, dict):
@@ -463,7 +488,7 @@ class SuggestionIndexBuilder:
463 if c is None:
464 c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang)
465 key_to_candidate[key] = c
466 - c.add_product("qanchor", spu_id=spu_id)
  491 + c.add_product("qanchor", spu_id=product_id)
467
468 # Step 2: query logs
469 now = datetime.now(timezone.utc)
tests/test_suggestions.py
@@ -345,3 +345,93 @@ def test_incremental_updates_existing_index(monkeypatch):
345 bulk_calls = [x for x in fake_es.calls if x.get("op") == "bulk_actions"]
346 assert len(bulk_calls) == 1
347 assert len(bulk_calls[0]["actions"]) == 1
  348 +
  349 +
  350 +@pytest.mark.unit
  351 +def test_build_full_candidates_fallback_to_id_when_spu_id_missing(monkeypatch):
  352 + fake_es = FakeESClient()
  353 + builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)
  354 +
  355 + monkeypatch.setattr(
  356 + builder,
  357 + "_iter_products",
  358 + lambda tenant_id, batch_size=500: iter(
  359 + [
  360 + {
  361 + "_id": "521",
  362 + "_source": {
  363 + "id": "521",
  364 + "title": {"en": "Furby Toy"},
  365 + "qanchors": {"en": "furby"},
  366 + },
  367 + }
  368 + ]
  369 + ),
  370 + )
  371 + monkeypatch.setattr(builder, "_iter_query_log_rows", lambda **kwargs: iter([]))
  372 +
  373 + key_to_candidate = builder._build_full_candidates(
  374 + tenant_id="162",
  375 + index_languages=["en"],
  376 + primary_language="en",
  377 + days=365,
  378 + batch_size=100,
  379 + min_query_len=1,
  380 + )
  381 +
  382 + title_key = ("en", "furby toy")
  383 + qanchor_key = ("en", "furby")
  384 + assert title_key in key_to_candidate
  385 + assert qanchor_key in key_to_candidate
  386 + assert key_to_candidate[title_key].title_spu_ids == {"521"}
  387 + assert key_to_candidate[qanchor_key].qanchor_spu_ids == {"521"}
  388 +
  389 +
  390 +@pytest.mark.unit
  391 +def test_build_full_candidates_splits_long_title_for_suggest(monkeypatch):
  392 + fake_es = FakeESClient()
  393 + builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)
  394 +
  395 + long_title = (
  396 + "Furby Furblets 2-Pack, Mini Friends Ray-Vee & Hip-Bop, 45+ Sounds Each, "
  397 + "Music & Furbish Phrases, Electronic Plush Toys, Rainbow & Pink/Purple, "
  398 + "Ages 6+ (Amazon Exclusive)"
  399 + )
  400 + monkeypatch.setattr(
  401 + builder,
  402 + "_iter_products",
  403 + lambda tenant_id, batch_size=500: iter(
  404 + [{"_id": "521", "_source": {"id": "521", "title": {"en": long_title}, "qanchors": {}}}]
  405 + ),
  406 + )
  407 + monkeypatch.setattr(builder, "_iter_query_log_rows", lambda **kwargs: iter([]))
  408 +
  409 + key_to_candidate = builder._build_full_candidates(
  410 + tenant_id="162",
  411 + index_languages=["en"],
  412 + primary_language="en",
  413 + days=365,
  414 + batch_size=100,
  415 + min_query_len=1,
  416 + )
  417 +
  418 + key = ("en", "furby furblets 2-pack")
  419 + assert key in key_to_candidate
  420 + assert key_to_candidate[key].text == "Furby Furblets 2-Pack"
  421 +
  422 +
  423 +@pytest.mark.unit
  424 +def test_iter_products_requests_dual_sort_and_fields():
  425 + fake_es = FakeESClient()
  426 + builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)
  427 +
  428 + list(builder._iter_products(tenant_id="162", batch_size=10))
  429 +
  430 + search_calls = [x for x in fake_es.calls if x.get("op") == "search"]
  431 + assert len(search_calls) >= 1
  432 + body = search_calls[0]["body"]
  433 + sort = body.get("sort", [])
  434 + assert {"spu_id": {"order": "asc", "missing": "_last"}} in sort
  435 + assert {"id.keyword": {"order": "asc", "missing": "_last"}} in sort
  436 + assert "id" in body.get("_source", [])
  437 + assert "spu_id" in body.get("_source", [])