From a99e62ba4e6b6e7493d2d0afc445d5794edaa616 Mon Sep 17 00:00:00 2001 From: tangwang Date: Thu, 12 Mar 2026 11:42:49 +0800 Subject: [PATCH] 记录各阶段耗时 --- config/config.yaml | 2 +- context/request_context.py | 5 ++++- docs/TEI_SERVICE说明文档.md | 2 +- docs/性能测试报告.md | 43 ++++++++++++++++++++++++++++++++----------- reranker/README.md | 6 +++--- reranker/backends/qwen3_transformers.py | 2 +- reranker/backends/qwen3_vllm.py | 2 +- scripts/perf_api_benchmark.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- search/searcher.py | 56 ++++++++++++++++++++++++++++++-------------------------- tests/conftest.py | 2 +- 10 files changed, 180 insertions(+), 47 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 5467cac..ae3abb4 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -188,7 +188,7 @@ services: infer_batch_size: 64 sort_by_doc_length: true length_sort_mode: "char" # char | token - instruction: "Given a web search query, retrieve relevant passages that answer the query" + instruction: "Given a shopping query, rank product titles by relevance" # SPU配置(已启用,使用嵌套skus) spu_config: diff --git a/context/request_context.py b/context/request_context.py index 708afb3..e3837a7 100644 --- a/context/request_context.py +++ b/context/request_context.py @@ -19,7 +19,10 @@ class RequestContextStage(Enum): QUERY_PARSING = "query_parsing" BOOLEAN_PARSING = "boolean_parsing" QUERY_BUILDING = "query_building" - ELASTICSEARCH_SEARCH = "elasticsearch_search" + # ES 主召回查询 + ELASTICSEARCH_SEARCH_PRIMARY = "elasticsearch_search_primary" + # ES 按 ID 回源分页详情回填 + ELASTICSEARCH_PAGE_FILL = "elasticsearch_page_fill" RESULT_PROCESSING = "result_processing" RERANKING = "reranking" diff --git a/docs/TEI_SERVICE说明文档.md b/docs/TEI_SERVICE说明文档.md index f825b5f..8271483 100644 --- a/docs/TEI_SERVICE说明文档.md +++ b/docs/TEI_SERVICE说明文档.md @@ -107,7 +107,7 @@ curl -sS http://127.0.0.1:8080/health ```bash curl -sS http://127.0.0.1:8080/embed \ -H "Content-Type: application/json" \ - -d '{"inputs":["Instruct: Given a web search query, retrieve relevant passages that answer the query\nQuery: What is the capital of China?"]}' + -d '{"inputs":["Instruct: Given a shopping query, rank product titles by relevance\nQuery: What is the capital of China?"]}' ``` 返回应为二维数组(每条输入对应一个向量)。 diff --git a/docs/性能测试报告.md b/docs/性能测试报告.md index 74390c2..c50264e 100644 --- a/docs/性能测试报告.md +++ b/docs/性能测试报告.md @@ -93,12 +93,6 @@ source activate.sh ./scripts/service_ctl.sh start embedding translator reranker backend ``` -如果 `backend` 未成功常驻,可临时手动启动: - -```bash -.venv/bin/python main.py serve --host 0.0.0.0 --port 6002 --es-host http://localhost:9200 -``` - ### 5.3 健康检查 ```bash @@ -160,12 +154,38 @@ cd /data/saas-search ### 7.4 Reranker(rerank) +测试方法(本节已按新口径重跑): +- `query` 固定为 `wireless mouse` +- 每次请求 `docs=386` +- 从 `1000` 个候选单词中随机采样,先随机句长 `15-40`,再生成每条 doc 句子 +- 并发 `1/5/10/20`,每档 `20s` +- 结果文件:`perf_reports/2026-03-12/rerank_realistic/rerank_386docs.json` + +复现命令: + +```bash +.venv/bin/python scripts/perf_api_benchmark.py \ + --scenario rerank \ + --duration 20 \ + --concurrency-list 1,5,10,20 \ + --timeout 60 \ + --rerank-dynamic-docs \ + --rerank-doc-count 386 \ + --rerank-vocab-size 1000 \ + --rerank-sentence-min-words 15 \ + --rerank-sentence-max-words 40 \ + --rerank-query "wireless mouse" \ + --rerank-seed 20260312 \ + --reranker-base http://127.0.0.1:6007 \ + --output perf_reports/2026-03-12/rerank_realistic/rerank_386docs.json +``` + | 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P95(ms) | Max(ms) | |---:|---:|---:|---:|---:|---:|---:| -| 1 | 802 | 100.0% | 40.06 | 24.87 | 37.45 | 49.63 | -| 5 | 796 | 100.0% | 39.53 | 125.70 | 190.02 | 218.60 | -| 10 | 853 | 100.0% | 41.89 | 235.87 | 315.37 | 402.27 | -| 20 | 836 | 100.0% | 40.92 | 481.98 | 723.56 | 781.81 | +| 1 | 14 | 100.0% | 0.67 | 1498.64 | 1799.25 | 2160.96 | +| 5 | 15 | 100.0% | 0.62 | 8011.99 | 9725.61 | 9726.02 | +| 10 | 20 | 100.0% | 0.61 | 16217.12 | 18043.05 | 18050.04 | +| 20 | 20 | 100.0% | 0.60 | 33252.35 | 33456.74 | 33480.14 | ## 8. 指标解读与并发建议 @@ -174,7 +194,7 @@ cd /data/saas-search - `backend_search`:吞吐约 `8 rps` 平台化,延迟随并发上升明显,属于重链路(检索+向量+重排)特征。 - `backend_suggest`:吞吐高且稳定(约 `200+ rps`),对并发更友好。 - `embed_text`:随并发提升吞吐持续增长,延迟平滑上升,扩展性较好。 -- `rerank`:吞吐在 `~40 rps` 附近平台化,延迟随并发线性抬升,符合模型推理瓶颈特征。 +- `rerank`:在 `docs=386` 的真实口径下,吞吐约 `0.6 rps`,延迟随并发显著抬升(并发20下 P95 约 `33.5s`),是当前最重瓶颈。 ### 8.2 并发压测建议 @@ -232,6 +252,7 @@ cd /data/saas-search - 压测脚本:`scripts/perf_api_benchmark.py` - 本次结果:`perf_reports/2026-03-12/perf_matrix_report.json` - Search 多租户补测:`perf_reports/2026-03-12/search_tenant_matrix/` +- Reranker 386 docs 口径补测:`perf_reports/2026-03-12/rerank_realistic/rerank_386docs.json` ## 12. Search 多租户补测(2026-03-12) diff --git a/reranker/README.md b/reranker/README.md index f8534d6..878c527 100644 --- a/reranker/README.md +++ b/reranker/README.md @@ -54,16 +54,16 @@ services: length_sort_mode: "char" # char | token enable_prefix_caching: true enforce_eager: false - instruction: "Given a web search query, retrieve relevant passages that answer the query" + instruction: "Given a shopping query, rank product titles by relevance" qwen3_transformers: model_name: "Qwen/Qwen3-Reranker-0.6B" - instruction: "Given a web search query, retrieve relevant passages that answer the query" + instruction: "Given a shopping query, rank product titles by relevance" max_length: 8192 batch_size: 64 use_fp16: true tensor_parallel_size: 1 gpu_memory_utilization: 0.8 - instruction: "Given a web search query, retrieve relevant passages that answer the query" + instruction: "Given a shopping query, rank product titles by relevance" ``` - 服务端口、请求限制等仍在 `reranker/config.py`(或环境变量 `RERANKER_PORT`、`RERANKER_HOST`)。 diff --git a/reranker/backends/qwen3_transformers.py b/reranker/backends/qwen3_transformers.py index beb2b18..7955167 100644 --- a/reranker/backends/qwen3_transformers.py +++ b/reranker/backends/qwen3_transformers.py @@ -42,7 +42,7 @@ class Qwen3TransformersRerankerBackend: model_name = str(self._config.get("model_name") or "Qwen/Qwen3-Reranker-0.6B") self._instruction = str( self._config.get("instruction") - or "Given a web search query, retrieve relevant passages that answer the query" + or "Given a shopping query, rank product titles by relevance" ) max_length = int(self._config.get("max_length", 8192)) batch_size = int(self._config.get("batch_size", 64)) diff --git a/reranker/backends/qwen3_vllm.py b/reranker/backends/qwen3_vllm.py index 7a08450..9ca830d 100644 --- a/reranker/backends/qwen3_vllm.py +++ b/reranker/backends/qwen3_vllm.py @@ -65,7 +65,7 @@ class Qwen3VLLMRerankerBackend: dtype = str(self._config.get("dtype", "float16")).strip().lower() self._instruction = str( self._config.get("instruction") - or "Given a web search query, retrieve relevant passages that answer the query" + or "Given a shopping query, rank product titles by relevance" ) infer_batch_size = os.getenv("RERANK_VLLM_INFER_BATCH_SIZE") or self._config.get("infer_batch_size", 64) sort_by_doc_length = os.getenv("RERANK_VLLM_SORT_BY_DOC_LENGTH") diff --git a/scripts/perf_api_benchmark.py b/scripts/perf_api_benchmark.py index 510181b..27039aa 100755 --- a/scripts/perf_api_benchmark.py +++ b/scripts/perf_api_benchmark.py @@ -22,6 +22,7 @@ import argparse import asyncio import json import math +import random import statistics import time from dataclasses import dataclass @@ -251,6 +252,7 @@ async def run_single_scenario( concurrency: int, max_requests: int, max_errors: int, + rerank_dynamic_cfg: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: latencies: List[float] = [] status_counter: Dict[int, int] = {} @@ -267,6 +269,9 @@ async def run_single_scenario( async def worker(worker_id: int, client: httpx.AsyncClient) -> None: nonlocal total_requests, success_requests, stop_flag idx = worker_id % len(scenario.templates) + worker_rng: Optional[random.Random] = None + if rerank_dynamic_cfg is not None: + worker_rng = random.Random(int(rerank_dynamic_cfg["seed"]) + worker_id) while not stop_flag: elapsed = time.perf_counter() - start @@ -287,11 +292,14 @@ async def run_single_scenario( status = 0 err = "" try: + req_json_body = tpl.json_body + if rerank_dynamic_cfg is not None and worker_rng is not None: + req_json_body = build_random_rerank_payload(rerank_dynamic_cfg, worker_rng) resp = await client.request( method=tpl.method, url=tpl.path, params=tpl.params, - json=tpl.json_body, + json=req_json_body, headers=tpl.headers, ) status = int(resp.status_code) @@ -448,9 +456,83 @@ def parse_args() -> argparse.Namespace: default="", help="Comma-separated concurrency list (e.g. 1,5,10,20). If set, overrides --concurrency.", ) + parser.add_argument( + "--rerank-dynamic-docs", + action="store_true", + help="For rerank scenario, generate docs payload dynamically on every request.", + ) + parser.add_argument("--rerank-doc-count", type=int, default=386, help="Doc count per rerank request when dynamic docs are enabled") + parser.add_argument("--rerank-vocab-size", type=int, default=1000, help="Word pool size for rerank dynamic docs generation") + parser.add_argument("--rerank-sentence-min-words", type=int, default=15, help="Minimum words per generated doc sentence") + parser.add_argument("--rerank-sentence-max-words", type=int, default=40, help="Maximum words per generated doc sentence") + parser.add_argument("--rerank-query", type=str, default="wireless mouse", help="Fixed query used for rerank dynamic docs mode") + parser.add_argument("--rerank-seed", type=int, default=20260312, help="Base random seed for rerank dynamic docs mode") return parser.parse_args() +def build_rerank_dynamic_cfg(args: argparse.Namespace) -> Dict[str, Any]: + min_words = int(args.rerank_sentence_min_words) + max_words = int(args.rerank_sentence_max_words) + doc_count = int(args.rerank_doc_count) + vocab_size = int(args.rerank_vocab_size) + if doc_count <= 0: + raise ValueError(f"rerank-doc-count must be > 0, got {doc_count}") + if vocab_size <= 0: + raise ValueError(f"rerank-vocab-size must be > 0, got {vocab_size}") + if min_words <= 0: + raise ValueError(f"rerank-sentence-min-words must be > 0, got {min_words}") + if max_words < min_words: + raise ValueError( + f"rerank-sentence-max-words must be >= rerank-sentence-min-words, got {max_words} < {min_words}" + ) + if args.rerank_seed < 0: + raise ValueError(f"rerank-seed must be >= 0, got {args.rerank_seed}") + + # Use deterministic, letter-only pseudo words to avoid long tokenization of numeric strings. + syllables = [ + "al", "an", "ar", "as", "at", "ba", "be", "bi", "bo", "ca", + "ce", "ci", "co", "da", "de", "di", "do", "el", "en", "er", + "fa", "fe", "fi", "fo", "ga", "ge", "gi", "go", "ha", "he", + "hi", "ho", "ia", "ie", "il", "in", "io", "is", "ka", "ke", + "ki", "ko", "la", "le", "li", "lo", "ma", "me", "mi", "mo", + ] + word_pool: List[str] = [] + for a in syllables: + for b in syllables: + word_pool.append(f"{a}{b}") + if len(word_pool) >= vocab_size: + break + if len(word_pool) >= vocab_size: + break + if len(word_pool) < vocab_size: + raise ValueError(f"Unable to generate enough synthetic words: requested={vocab_size}, got={len(word_pool)}") + return { + "query": args.rerank_query, + "doc_count": doc_count, + "min_words": min_words, + "max_words": max_words, + "seed": int(args.rerank_seed), + "normalize": True, + "word_pool": word_pool, + } + + +def build_random_rerank_payload( + cfg: Dict[str, Any], + rng: random.Random, +) -> Dict[str, Any]: + word_pool: List[str] = cfg["word_pool"] + docs = [] + for _ in range(cfg["doc_count"]): + doc_len = rng.randint(cfg["min_words"], cfg["max_words"]) + docs.append(" ".join(rng.choices(word_pool, k=doc_len))) + return { + "query": cfg["query"], + "docs": docs, + "normalize": bool(cfg.get("normalize", True)), + } + + async def main_async() -> int: args = parse_args() scenarios = build_scenarios(args) @@ -474,6 +556,14 @@ async def main_async() -> int: print("No scenarios to run.") return 2 + rerank_dynamic_cfg: Optional[Dict[str, Any]] = None + if args.rerank_dynamic_docs: + try: + rerank_dynamic_cfg = build_rerank_dynamic_cfg(args) + except ValueError as exc: + print(str(exc)) + return 2 + concurrency_values = [args.concurrency] if args.concurrency_list: try: @@ -498,6 +588,13 @@ async def main_async() -> int: print(f" embedding_base={args.embedding_base}") print(f" translator_base={args.translator_base}") print(f" reranker_base={args.reranker_base}") + if args.rerank_dynamic_docs: + print(" rerank_dynamic_docs=True") + print(f" rerank_doc_count={args.rerank_doc_count}") + print(f" rerank_vocab_size={args.rerank_vocab_size}") + print(f" rerank_sentence_words=[{args.rerank_sentence_min_words},{args.rerank_sentence_max_words}]") + print(f" rerank_query={args.rerank_query}") + print(f" rerank_seed={args.rerank_seed}") results: List[Dict[str, Any]] = [] total_jobs = len(run_names) * len(concurrency_values) @@ -513,6 +610,7 @@ async def main_async() -> int: concurrency=c, max_requests=args.max_requests, max_errors=args.max_errors, + rerank_dynamic_cfg=rerank_dynamic_cfg if name == "rerank" else None, ) result["concurrency"] = c print(format_summary(result)) @@ -538,6 +636,13 @@ async def main_async() -> int: "translator_base": args.translator_base, "reranker_base": args.reranker_base, "cases_file": args.cases_file or None, + "rerank_dynamic_docs": args.rerank_dynamic_docs, + "rerank_doc_count": args.rerank_doc_count, + "rerank_vocab_size": args.rerank_vocab_size, + "rerank_sentence_min_words": args.rerank_sentence_min_words, + "rerank_sentence_max_words": args.rerank_sentence_max_words, + "rerank_query": args.rerank_query, + "rerank_seed": args.rerank_seed, }, "results": results, "overall": aggregate_results(results), diff --git a/search/searcher.py b/search/searcher.py index 2afa588..091fa97 100644 --- a/search/searcher.py +++ b/search/searcher.py @@ -459,8 +459,8 @@ class Searcher: finally: context.end_stage(RequestContextStage.QUERY_BUILDING) - # Step 4: Elasticsearch search - context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH) + # Step 4: Elasticsearch search (primary recall) + context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY) try: # Use tenant-specific index name(开启重排且在窗口内时已用 es_fetch_size/es_fetch_from) es_response = self.es_client.search( @@ -489,7 +489,7 @@ class Searcher: ) raise finally: - context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH) + context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY) # Optional Step 4.5: AI reranking(仅当请求范围在重排窗口内时执行) if do_rerank and in_rerank_window: @@ -557,29 +557,33 @@ class Searcher: extra={'reqid': context.reqid, 'uid': context.uid} ) else: - page_ids = [str(h.get("_id")) for h in sliced if h.get("_id") is not None] - details_by_id, fill_took = self._fetch_hits_by_ids( - index_name=index_name, - doc_ids=page_ids, - source_spec=response_source_spec, - ) - filled = 0 - for hit in sliced: - hid = hit.get("_id") - if hid is None: - continue - detail_hit = details_by_id.get(str(hid)) - if detail_hit is None: - continue - if "_source" in detail_hit: - hit["_source"] = detail_hit.get("_source") or {} - filled += 1 - if fill_took: - es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took) - context.logger.info( - f"分页详情回填 | ids={len(page_ids)} | filled={filled} | took={fill_took}ms", - extra={'reqid': context.reqid, 'uid': context.uid} - ) + context.start_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) + try: + page_ids = [str(h.get("_id")) for h in sliced if h.get("_id") is not None] + details_by_id, fill_took = self._fetch_hits_by_ids( + index_name=index_name, + doc_ids=page_ids, + source_spec=response_source_spec, + ) + filled = 0 + for hit in sliced: + hid = hit.get("_id") + if hid is None: + continue + detail_hit = details_by_id.get(str(hid)) + if detail_hit is None: + continue + if "_source" in detail_hit: + hit["_source"] = detail_hit.get("_source") or {} + filled += 1 + if fill_took: + es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took) + context.logger.info( + f"分页详情回填 | ids={len(page_ids)} | filled={filled} | took={fill_took}ms", + extra={'reqid': context.reqid, 'uid': context.uid} + ) + finally: + context.end_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) context.logger.info( f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", diff --git a/tests/conftest.py b/tests/conftest.py index a861c60..f8a80df 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -191,7 +191,7 @@ def temp_config_file() -> Generator[str, None, None]: "functions": [] }, "rerank": { - "rerank_window": 400 + "rerank_window": 386 } } -- libgit2 0.21.2