Commit a99e62ba4e6b6e7493d2d0afc445d5794edaa616

Authored by tangwang
1 parent c51d254f

记录各阶段耗时

config/config.yaml
... ... @@ -188,7 +188,7 @@ services:
188 188 infer_batch_size: 64
189 189 sort_by_doc_length: true
190 190 length_sort_mode: "char" # char | token
191   - instruction: "Given a web search query, retrieve relevant passages that answer the query"
  191 + instruction: "Given a shopping query, rank product titles by relevance"
192 192  
193 193 # SPU配置(已启用,使用嵌套skus)
194 194 spu_config:
... ...
context/request_context.py
... ... @@ -19,7 +19,10 @@ class RequestContextStage(Enum):
19 19 QUERY_PARSING = "query_parsing"
20 20 BOOLEAN_PARSING = "boolean_parsing"
21 21 QUERY_BUILDING = "query_building"
22   - ELASTICSEARCH_SEARCH = "elasticsearch_search"
  22 + # ES 主召回查询
  23 + ELASTICSEARCH_SEARCH_PRIMARY = "elasticsearch_search_primary"
  24 + # ES 按 ID 回源分页详情回填
  25 + ELASTICSEARCH_PAGE_FILL = "elasticsearch_page_fill"
23 26 RESULT_PROCESSING = "result_processing"
24 27 RERANKING = "reranking"
25 28  
... ...
docs/TEI_SERVICE说明文档.md
... ... @@ -107,7 +107,7 @@ curl -sS http://127.0.0.1:8080/health
107 107 ```bash
108 108 curl -sS http://127.0.0.1:8080/embed \
109 109 -H "Content-Type: application/json" \
110   - -d '{"inputs":["Instruct: Given a web search query, retrieve relevant passages that answer the query\nQuery: What is the capital of China?"]}'
  110 + -d '{"inputs":["Instruct: Given a shopping query, rank product titles by relevance\nQuery: wireless bluetooth mouse"]}'
111 111 ```
112 112  
113 113 返回应为二维数组(每条输入对应一个向量)。
... ...
docs/性能测试报告.md
... ... @@ -93,12 +93,6 @@ source activate.sh
93 93 ./scripts/service_ctl.sh start embedding translator reranker backend
94 94 ```
95 95  
96   -如果 `backend` 未成功常驻,可临时手动启动:
97   -
98   -```bash
99   -.venv/bin/python main.py serve --host 0.0.0.0 --port 6002 --es-host http://localhost:9200
100   -```
101   -
102 96 ### 5.3 健康检查
103 97  
104 98 ```bash
... ... @@ -160,12 +154,38 @@ cd /data/saas-search
160 154  
161 155 ### 7.4 Reranker(rerank)
162 156  
  157 +测试方法(本节已按新口径重跑):
  158 +- `query` 固定为 `wireless mouse`
  159 +- 每次请求 `docs=386`
  160 +- 从 `1000` 个候选单词中随机采样,先随机确定句长(`15-40` 词),再据此生成每条 doc 句子
  161 +- 并发 `1/5/10/20`,每档 `20s`
  162 +- 结果文件:`perf_reports/2026-03-12/rerank_realistic/rerank_386docs.json`
  163 +
  164 +复现命令:
  165 +
  166 +```bash
  167 +.venv/bin/python scripts/perf_api_benchmark.py \
  168 + --scenario rerank \
  169 + --duration 20 \
  170 + --concurrency-list 1,5,10,20 \
  171 + --timeout 60 \
  172 + --rerank-dynamic-docs \
  173 + --rerank-doc-count 386 \
  174 + --rerank-vocab-size 1000 \
  175 + --rerank-sentence-min-words 15 \
  176 + --rerank-sentence-max-words 40 \
  177 + --rerank-query "wireless mouse" \
  178 + --rerank-seed 20260312 \
  179 + --reranker-base http://127.0.0.1:6007 \
  180 + --output perf_reports/2026-03-12/rerank_realistic/rerank_386docs.json
  181 +```
  182 +
163 183 | 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P95(ms) | Max(ms) |
164 184 |---:|---:|---:|---:|---:|---:|---:|
165   -| 1 | 802 | 100.0% | 40.06 | 24.87 | 37.45 | 49.63 |
166   -| 5 | 796 | 100.0% | 39.53 | 125.70 | 190.02 | 218.60 |
167   -| 10 | 853 | 100.0% | 41.89 | 235.87 | 315.37 | 402.27 |
168   -| 20 | 836 | 100.0% | 40.92 | 481.98 | 723.56 | 781.81 |
  185 +| 1 | 14 | 100.0% | 0.67 | 1498.64 | 1799.25 | 2160.96 |
  186 +| 5 | 15 | 100.0% | 0.62 | 8011.99 | 9725.61 | 9726.02 |
  187 +| 10 | 20 | 100.0% | 0.61 | 16217.12 | 18043.05 | 18050.04 |
  188 +| 20 | 20 | 100.0% | 0.60 | 33252.35 | 33456.74 | 33480.14 |
169 189  
170 190 ## 8. 指标解读与并发建议
171 191  
... ... @@ -174,7 +194,7 @@ cd /data/saas-search
174 194 - `backend_search`:吞吐约 `8 rps` 平台化,延迟随并发上升明显,属于重链路(检索+向量+重排)特征。
175 195 - `backend_suggest`:吞吐高且稳定(约 `200+ rps`),对并发更友好。
176 196 - `embed_text`:随并发提升吞吐持续增长,延迟平滑上升,扩展性较好。
177   -- `rerank`:吞吐在 `~40 rps` 附近平台化,延迟随并发线性抬升,符合模型推理瓶颈特征
  197 +- `rerank`:在 `docs=386` 的真实口径下,吞吐约 `0.6 rps`,延迟随并发显著抬升(并发20下 P95 约 `33.5s`),是当前最重的瓶颈
178 198  
179 199 ### 8.2 并发压测建议
180 200  
... ... @@ -232,6 +252,7 @@ cd /data/saas-search
232 252 - 压测脚本:`scripts/perf_api_benchmark.py`
233 253 - 本次结果:`perf_reports/2026-03-12/perf_matrix_report.json`
234 254 - Search 多租户补测:`perf_reports/2026-03-12/search_tenant_matrix/`
  255 +- Reranker 386 docs 口径补测:`perf_reports/2026-03-12/rerank_realistic/rerank_386docs.json`
235 256  
236 257 ## 12. Search 多租户补测(2026-03-12)
237 258  
... ...
reranker/README.md
... ... @@ -54,16 +54,16 @@ services:
54 54 length_sort_mode: "char" # char | token
55 55 enable_prefix_caching: true
56 56 enforce_eager: false
57   - instruction: "Given a web search query, retrieve relevant passages that answer the query"
  57 + instruction: "Given a shopping query, rank product titles by relevance"
58 58 qwen3_transformers:
59 59 model_name: "Qwen/Qwen3-Reranker-0.6B"
60   - instruction: "Given a web search query, retrieve relevant passages that answer the query"
  60 + instruction: "Given a shopping query, rank product titles by relevance"
61 61 max_length: 8192
62 62 batch_size: 64
63 63 use_fp16: true
64 64 tensor_parallel_size: 1
65 65 gpu_memory_utilization: 0.8
66   - instruction: "Given a web search query, retrieve relevant passages that answer the query"
  66 + instruction: "Given a shopping query, rank product titles by relevance"
67 67 ```
68 68  
69 69 - 服务端口、请求限制等仍在 `reranker/config.py`(或环境变量 `RERANKER_PORT`、`RERANKER_HOST`)。
... ...
reranker/backends/qwen3_transformers.py
... ... @@ -42,7 +42,7 @@ class Qwen3TransformersRerankerBackend:
42 42 model_name = str(self._config.get("model_name") or "Qwen/Qwen3-Reranker-0.6B")
43 43 self._instruction = str(
44 44 self._config.get("instruction")
45   - or "Given a web search query, retrieve relevant passages that answer the query"
  45 + or "Given a shopping query, rank product titles by relevance"
46 46 )
47 47 max_length = int(self._config.get("max_length", 8192))
48 48 batch_size = int(self._config.get("batch_size", 64))
... ...
reranker/backends/qwen3_vllm.py
... ... @@ -65,7 +65,7 @@ class Qwen3VLLMRerankerBackend:
65 65 dtype = str(self._config.get("dtype", "float16")).strip().lower()
66 66 self._instruction = str(
67 67 self._config.get("instruction")
68   - or "Given a web search query, retrieve relevant passages that answer the query"
  68 + or "Given a shopping query, rank product titles by relevance"
69 69 )
70 70 infer_batch_size = os.getenv("RERANK_VLLM_INFER_BATCH_SIZE") or self._config.get("infer_batch_size", 64)
71 71 sort_by_doc_length = os.getenv("RERANK_VLLM_SORT_BY_DOC_LENGTH")
... ...
scripts/perf_api_benchmark.py
... ... @@ -22,6 +22,7 @@ import argparse
22 22 import asyncio
23 23 import json
24 24 import math
  25 +import random
25 26 import statistics
26 27 import time
27 28 from dataclasses import dataclass
... ... @@ -251,6 +252,7 @@ async def run_single_scenario(
251 252 concurrency: int,
252 253 max_requests: int,
253 254 max_errors: int,
  255 + rerank_dynamic_cfg: Optional[Dict[str, Any]] = None,
254 256 ) -> Dict[str, Any]:
255 257 latencies: List[float] = []
256 258 status_counter: Dict[int, int] = {}
... ... @@ -267,6 +269,9 @@ async def run_single_scenario(
267 269 async def worker(worker_id: int, client: httpx.AsyncClient) -> None:
268 270 nonlocal total_requests, success_requests, stop_flag
269 271 idx = worker_id % len(scenario.templates)
  272 + worker_rng: Optional[random.Random] = None
  273 + if rerank_dynamic_cfg is not None:
  274 + worker_rng = random.Random(int(rerank_dynamic_cfg["seed"]) + worker_id)
270 275  
271 276 while not stop_flag:
272 277 elapsed = time.perf_counter() - start
... ... @@ -287,11 +292,14 @@ async def run_single_scenario(
287 292 status = 0
288 293 err = ""
289 294 try:
  295 + req_json_body = tpl.json_body
  296 + if rerank_dynamic_cfg is not None and worker_rng is not None:
  297 + req_json_body = build_random_rerank_payload(rerank_dynamic_cfg, worker_rng)
290 298 resp = await client.request(
291 299 method=tpl.method,
292 300 url=tpl.path,
293 301 params=tpl.params,
294   - json=tpl.json_body,
  302 + json=req_json_body,
295 303 headers=tpl.headers,
296 304 )
297 305 status = int(resp.status_code)
... ... @@ -448,9 +456,83 @@ def parse_args() -> argparse.Namespace:
448 456 default="",
449 457 help="Comma-separated concurrency list (e.g. 1,5,10,20). If set, overrides --concurrency.",
450 458 )
  459 + parser.add_argument(
  460 + "--rerank-dynamic-docs",
  461 + action="store_true",
  462 + help="For rerank scenario, generate docs payload dynamically on every request.",
  463 + )
  464 + parser.add_argument("--rerank-doc-count", type=int, default=386, help="Doc count per rerank request when dynamic docs are enabled")
  465 + parser.add_argument("--rerank-vocab-size", type=int, default=1000, help="Word pool size for rerank dynamic docs generation")
  466 + parser.add_argument("--rerank-sentence-min-words", type=int, default=15, help="Minimum words per generated doc sentence")
  467 + parser.add_argument("--rerank-sentence-max-words", type=int, default=40, help="Maximum words per generated doc sentence")
  468 + parser.add_argument("--rerank-query", type=str, default="wireless mouse", help="Fixed query used for rerank dynamic docs mode")
  469 + parser.add_argument("--rerank-seed", type=int, default=20260312, help="Base random seed for rerank dynamic docs mode")
451 470 return parser.parse_args()
452 471  
453 472  
  473 +def build_rerank_dynamic_cfg(args: argparse.Namespace) -> Dict[str, Any]:
  474 + min_words = int(args.rerank_sentence_min_words)
  475 + max_words = int(args.rerank_sentence_max_words)
  476 + doc_count = int(args.rerank_doc_count)
  477 + vocab_size = int(args.rerank_vocab_size)
  478 + if doc_count <= 0:
  479 + raise ValueError(f"rerank-doc-count must be > 0, got {doc_count}")
  480 + if vocab_size <= 0:
  481 + raise ValueError(f"rerank-vocab-size must be > 0, got {vocab_size}")
  482 + if min_words <= 0:
  483 + raise ValueError(f"rerank-sentence-min-words must be > 0, got {min_words}")
  484 + if max_words < min_words:
  485 + raise ValueError(
  486 + f"rerank-sentence-max-words must be >= rerank-sentence-min-words, got {max_words} < {min_words}"
  487 + )
  488 + if args.rerank_seed < 0:
  489 + raise ValueError(f"rerank-seed must be >= 0, got {args.rerank_seed}")
  490 +
  491 + # Use deterministic, letter-only pseudo words to avoid long tokenization of numeric strings.
  492 + syllables = [
  493 + "al", "an", "ar", "as", "at", "ba", "be", "bi", "bo", "ca",
  494 + "ce", "ci", "co", "da", "de", "di", "do", "el", "en", "er",
  495 + "fa", "fe", "fi", "fo", "ga", "ge", "gi", "go", "ha", "he",
  496 + "hi", "ho", "ia", "ie", "il", "in", "io", "is", "ka", "ke",
  497 + "ki", "ko", "la", "le", "li", "lo", "ma", "me", "mi", "mo",
  498 + ]
  499 + word_pool: List[str] = []
  500 + for a in syllables:
  501 + for b in syllables:
  502 + word_pool.append(f"{a}{b}")
  503 + if len(word_pool) >= vocab_size:
  504 + break
  505 + if len(word_pool) >= vocab_size:
  506 + break
  507 + if len(word_pool) < vocab_size:
  508 + raise ValueError(f"Unable to generate enough synthetic words: requested={vocab_size}, got={len(word_pool)}")
  509 + return {
  510 + "query": args.rerank_query,
  511 + "doc_count": doc_count,
  512 + "min_words": min_words,
  513 + "max_words": max_words,
  514 + "seed": int(args.rerank_seed),
  515 + "normalize": True,
  516 + "word_pool": word_pool,
  517 + }
  518 +
  519 +
  520 +def build_random_rerank_payload(
  521 + cfg: Dict[str, Any],
  522 + rng: random.Random,
  523 +) -> Dict[str, Any]:
  524 + word_pool: List[str] = cfg["word_pool"]
  525 + docs = []
  526 + for _ in range(cfg["doc_count"]):
  527 + doc_len = rng.randint(cfg["min_words"], cfg["max_words"])
  528 + docs.append(" ".join(rng.choices(word_pool, k=doc_len)))
  529 + return {
  530 + "query": cfg["query"],
  531 + "docs": docs,
  532 + "normalize": bool(cfg.get("normalize", True)),
  533 + }
  534 +
  535 +
454 536 async def main_async() -> int:
455 537 args = parse_args()
456 538 scenarios = build_scenarios(args)
... ... @@ -474,6 +556,14 @@ async def main_async() -&gt; int:
474 556 print("No scenarios to run.")
475 557 return 2
476 558  
  559 + rerank_dynamic_cfg: Optional[Dict[str, Any]] = None
  560 + if args.rerank_dynamic_docs:
  561 + try:
  562 + rerank_dynamic_cfg = build_rerank_dynamic_cfg(args)
  563 + except ValueError as exc:
  564 + print(str(exc))
  565 + return 2
  566 +
477 567 concurrency_values = [args.concurrency]
478 568 if args.concurrency_list:
479 569 try:
... ... @@ -498,6 +588,13 @@ async def main_async() -&gt; int:
498 588 print(f" embedding_base={args.embedding_base}")
499 589 print(f" translator_base={args.translator_base}")
500 590 print(f" reranker_base={args.reranker_base}")
  591 + if args.rerank_dynamic_docs:
  592 + print(" rerank_dynamic_docs=True")
  593 + print(f" rerank_doc_count={args.rerank_doc_count}")
  594 + print(f" rerank_vocab_size={args.rerank_vocab_size}")
  595 + print(f" rerank_sentence_words=[{args.rerank_sentence_min_words},{args.rerank_sentence_max_words}]")
  596 + print(f" rerank_query={args.rerank_query}")
  597 + print(f" rerank_seed={args.rerank_seed}")
501 598  
502 599 results: List[Dict[str, Any]] = []
503 600 total_jobs = len(run_names) * len(concurrency_values)
... ... @@ -513,6 +610,7 @@ async def main_async() -&gt; int:
513 610 concurrency=c,
514 611 max_requests=args.max_requests,
515 612 max_errors=args.max_errors,
  613 + rerank_dynamic_cfg=rerank_dynamic_cfg if name == "rerank" else None,
516 614 )
517 615 result["concurrency"] = c
518 616 print(format_summary(result))
... ... @@ -538,6 +636,13 @@ async def main_async() -&gt; int:
538 636 "translator_base": args.translator_base,
539 637 "reranker_base": args.reranker_base,
540 638 "cases_file": args.cases_file or None,
  639 + "rerank_dynamic_docs": args.rerank_dynamic_docs,
  640 + "rerank_doc_count": args.rerank_doc_count,
  641 + "rerank_vocab_size": args.rerank_vocab_size,
  642 + "rerank_sentence_min_words": args.rerank_sentence_min_words,
  643 + "rerank_sentence_max_words": args.rerank_sentence_max_words,
  644 + "rerank_query": args.rerank_query,
  645 + "rerank_seed": args.rerank_seed,
541 646 },
542 647 "results": results,
543 648 "overall": aggregate_results(results),
... ...
search/searcher.py
... ... @@ -459,8 +459,8 @@ class Searcher:
459 459 finally:
460 460 context.end_stage(RequestContextStage.QUERY_BUILDING)
461 461  
462   - # Step 4: Elasticsearch search
463   - context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
  462 + # Step 4: Elasticsearch search (primary recall)
  463 + context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY)
464 464 try:
465 465 # Use tenant-specific index name(开启重排且在窗口内时已用 es_fetch_size/es_fetch_from)
466 466 es_response = self.es_client.search(
... ... @@ -489,7 +489,7 @@ class Searcher:
489 489 )
490 490 raise
491 491 finally:
492   - context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
  492 + context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY)
493 493  
494 494 # Optional Step 4.5: AI reranking(仅当请求范围在重排窗口内时执行)
495 495 if do_rerank and in_rerank_window:
... ... @@ -557,29 +557,33 @@ class Searcher:
557 557 extra={'reqid': context.reqid, 'uid': context.uid}
558 558 )
559 559 else:
560   - page_ids = [str(h.get("_id")) for h in sliced if h.get("_id") is not None]
561   - details_by_id, fill_took = self._fetch_hits_by_ids(
562   - index_name=index_name,
563   - doc_ids=page_ids,
564   - source_spec=response_source_spec,
565   - )
566   - filled = 0
567   - for hit in sliced:
568   - hid = hit.get("_id")
569   - if hid is None:
570   - continue
571   - detail_hit = details_by_id.get(str(hid))
572   - if detail_hit is None:
573   - continue
574   - if "_source" in detail_hit:
575   - hit["_source"] = detail_hit.get("_source") or {}
576   - filled += 1
577   - if fill_took:
578   - es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took)
579   - context.logger.info(
580   - f"分页详情回填 | ids={len(page_ids)} | filled={filled} | took={fill_took}ms",
581   - extra={'reqid': context.reqid, 'uid': context.uid}
582   - )
  560 + context.start_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL)
  561 + try:
  562 + page_ids = [str(h.get("_id")) for h in sliced if h.get("_id") is not None]
  563 + details_by_id, fill_took = self._fetch_hits_by_ids(
  564 + index_name=index_name,
  565 + doc_ids=page_ids,
  566 + source_spec=response_source_spec,
  567 + )
  568 + filled = 0
  569 + for hit in sliced:
  570 + hid = hit.get("_id")
  571 + if hid is None:
  572 + continue
  573 + detail_hit = details_by_id.get(str(hid))
  574 + if detail_hit is None:
  575 + continue
  576 + if "_source" in detail_hit:
  577 + hit["_source"] = detail_hit.get("_source") or {}
  578 + filled += 1
  579 + if fill_took:
  580 + es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took)
  581 + context.logger.info(
  582 + f"分页详情回填 | ids={len(page_ids)} | filled={filled} | took={fill_took}ms",
  583 + extra={'reqid': context.reqid, 'uid': context.uid}
  584 + )
  585 + finally:
  586 + context.end_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL)
583 587  
584 588 context.logger.info(
585 589 f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条",
... ...
tests/conftest.py
... ... @@ -191,7 +191,7 @@ def temp_config_file() -&gt; Generator[str, None, None]:
191 191 "functions": []
192 192 },
193 193 "rerank": {
194   - "rerank_window": 400
  194 + "rerank_window": 386
195 195 }
196 196 }
197 197  
... ...