#!/usr/bin/env python3 """ API-level performance test script for search stack services. Default scenarios (aligned with docs/搜索API对接指南.md): - backend_search POST /search/ - backend_suggest GET /search/suggestions - embed_text POST /embed/text - translate POST /translate - rerank POST /rerank Examples: python scripts/perf_api_benchmark.py --scenario backend_search --duration 30 --concurrency 20 --tenant-id 162 python scripts/perf_api_benchmark.py --scenario backend_suggest --duration 30 --concurrency 50 --tenant-id 162 python scripts/perf_api_benchmark.py --scenario all --duration 60 --concurrency 80 --tenant-id 162 python scripts/perf_api_benchmark.py --scenario all --cases-file scripts/perf_cases.json.example --output perf_result.json """ from __future__ import annotations import argparse import asyncio import json import math import random import statistics import time from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import httpx @dataclass class RequestTemplate: method: str path: str params: Optional[Dict[str, Any]] = None json_body: Optional[Any] = None headers: Optional[Dict[str, str]] = None @dataclass class Scenario: name: str templates: List[RequestTemplate] timeout_sec: float @dataclass class RequestResult: ok: bool status_code: int latency_ms: float error: str = "" def _is_finite_number(v: Any) -> bool: if isinstance(v, bool): return False if isinstance(v, (int, float)): return math.isfinite(float(v)) return False def validate_response_payload( scenario_name: str, tpl: RequestTemplate, payload: Any, ) -> Tuple[bool, str]: """ Lightweight payload validation for correctness-aware perf tests. Currently strict for embed_text to catch NaN/null vector regressions. """ if scenario_name != "embed_text": return True, "" expected_len = len(tpl.json_body) if isinstance(tpl.json_body, list) else None if not isinstance(payload, list): return False, "invalid_payload_non_list" if expected_len is not None and len(payload) != expected_len: return False, "invalid_payload_length" if len(payload) == 0: return False, "invalid_payload_empty" for i, vec in enumerate(payload): if not isinstance(vec, list) or len(vec) == 0: return False, f"invalid_vector_{i}_shape" for x in vec: if not _is_finite_number(x): return False, f"invalid_vector_{i}_non_finite" return True, "" def percentile(sorted_values: List[float], p: float) -> float: if not sorted_values: return 0.0 if p <= 0: return sorted_values[0] if p >= 100: return sorted_values[-1] rank = (len(sorted_values) - 1) * (p / 100.0) low = int(math.floor(rank)) high = int(math.ceil(rank)) if low == high: return sorted_values[low] weight = rank - low return sorted_values[low] * (1.0 - weight) + sorted_values[high] * weight def make_default_templates(tenant_id: str) -> Dict[str, List[RequestTemplate]]: return { "backend_search": [ RequestTemplate( method="POST", path="/search/", headers={"X-Tenant-ID": tenant_id}, json_body={"query": "wireless mouse", "size": 10, "language": "en"}, ), RequestTemplate( method="POST", path="/search/", headers={"X-Tenant-ID": tenant_id}, json_body={"query": "芭比娃娃", "size": 10, "language": "zh"}, ), RequestTemplate( method="POST", path="/search/", headers={"X-Tenant-ID": tenant_id}, json_body={"query": "f", "size": 10, "language": "en"}, ), ], "backend_suggest": [ RequestTemplate( method="GET", path="/search/suggestions", headers={"X-Tenant-ID": tenant_id}, params={"q": "f", "size": 10, "language": "en"}, ), RequestTemplate( method="GET", path="/search/suggestions", headers={"X-Tenant-ID": tenant_id}, params={"q": "玩", "size": 10, "language": "zh"}, ), RequestTemplate( method="GET", path="/search/suggestions", headers={"X-Tenant-ID": tenant_id}, params={"q": "shi", "size": 10, "language": "en"}, ), ], "embed_text": [ RequestTemplate( method="POST", path="/embed/text", json_body=["wireless mouse", "gaming keyboard", "barbie doll"], ) ], "translate": [ RequestTemplate( method="POST", path="/translate", json_body={"text": "商品名称", "target_lang": "en", "source_lang": "zh", "model": "qwen"}, ), RequestTemplate( method="POST", path="/translate", json_body={"text": "Product title", "target_lang": "zh", "model": "qwen"}, ), ], "rerank": [ RequestTemplate( method="POST", path="/rerank", json_body={ "query": "wireless mouse", "docs": [ "Wireless ergonomic mouse with rechargeable battery", "USB-C cable 1m", "Gaming mouse 26000 DPI", ], "normalize": True, }, ) ], } def load_cases_from_file(path: Path, tenant_id: str) -> Dict[str, List[RequestTemplate]]: data = json.loads(path.read_text(encoding="utf-8")) out: Dict[str, List[RequestTemplate]] = {} for scenario_name, requests_data in (data.get("scenarios") or {}).items(): templates: List[RequestTemplate] = [] for item in requests_data: headers = dict(item.get("headers") or {}) if "X-Tenant-ID" in headers and str(headers["X-Tenant-ID"]).strip() == "${tenant_id}": headers["X-Tenant-ID"] = tenant_id templates.append( RequestTemplate( method=str(item.get("method", "GET")).upper(), path=str(item.get("path", "")).strip(), params=item.get("params"), json_body=item.get("json"), headers=headers or None, ) ) if templates: out[scenario_name] = templates return out def build_scenarios(args: argparse.Namespace) -> Dict[str, Scenario]: defaults = make_default_templates(args.tenant_id) if args.cases_file: custom = load_cases_from_file(Path(args.cases_file), tenant_id=args.tenant_id) defaults.update(custom) scenario_base = { "backend_search": args.backend_base, "backend_suggest": args.backend_base, "embed_text": args.embedding_base, "translate": args.translator_base, "rerank": args.reranker_base, } scenarios: Dict[str, Scenario] = {} for name, templates in defaults.items(): if name not in scenario_base: continue base = scenario_base[name].rstrip("/") rewritten: List[RequestTemplate] = [] for t in templates: path = t.path if t.path.startswith("/") else f"/{t.path}" rewritten.append( RequestTemplate( method=t.method, path=f"{base}{path}", params=t.params, json_body=t.json_body, headers=t.headers, ) ) scenarios[name] = Scenario(name=name, templates=rewritten, timeout_sec=args.timeout) return scenarios async def run_single_scenario( scenario: Scenario, duration_sec: int, concurrency: int, max_requests: int, max_errors: int, rerank_dynamic_cfg: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: latencies: List[float] = [] status_counter: Dict[int, int] = {} err_counter: Dict[str, int] = {} total_requests = 0 success_requests = 0 stop_flag = False lock = asyncio.Lock() start = time.perf_counter() timeout = httpx.Timeout(timeout=scenario.timeout_sec) limits = httpx.Limits(max_connections=max(concurrency * 2, 20), max_keepalive_connections=max(concurrency, 10)) async def worker(worker_id: int, client: httpx.AsyncClient) -> None: nonlocal total_requests, success_requests, stop_flag idx = worker_id % len(scenario.templates) worker_rng: Optional[random.Random] = None if rerank_dynamic_cfg is not None: worker_rng = random.Random(int(rerank_dynamic_cfg["seed"]) + worker_id) while not stop_flag: elapsed = time.perf_counter() - start if duration_sec > 0 and elapsed >= duration_sec: break async with lock: if max_requests > 0 and total_requests >= max_requests: stop_flag = True break total_requests += 1 tpl = scenario.templates[idx % len(scenario.templates)] idx += 1 t0 = time.perf_counter() ok = False status = 0 err = "" try: req_json_body = tpl.json_body if rerank_dynamic_cfg is not None and worker_rng is not None: req_json_body = build_random_rerank_payload(rerank_dynamic_cfg, worker_rng) resp = await client.request( method=tpl.method, url=tpl.path, params=tpl.params, json=req_json_body, headers=tpl.headers, ) status = int(resp.status_code) ok = 200 <= status < 300 if ok: try: payload = resp.json() except Exception: ok = False err = "invalid_json_response" else: valid, reason = validate_response_payload( scenario_name=scenario.name, tpl=tpl, payload=payload, ) if not valid: ok = False err = reason or "invalid_payload" if not ok and not err: err = f"http_{status}" except Exception as e: err = type(e).__name__ t1 = time.perf_counter() cost_ms = (t1 - t0) * 1000.0 async with lock: latencies.append(cost_ms) if status: status_counter[status] = status_counter.get(status, 0) + 1 if ok: success_requests += 1 else: err_counter[err or "unknown"] = err_counter.get(err or "unknown", 0) + 1 total_err = sum(err_counter.values()) if max_errors > 0 and total_err >= max_errors: stop_flag = True async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: tasks = [asyncio.create_task(worker(i, client)) for i in range(concurrency)] await asyncio.gather(*tasks) elapsed = max(time.perf_counter() - start, 1e-9) lat_sorted = sorted(latencies) result = { "scenario": scenario.name, "duration_sec": round(elapsed, 3), "total_requests": total_requests, "success_requests": success_requests, "failed_requests": max(total_requests - success_requests, 0), "success_rate": round((success_requests / total_requests) * 100.0, 2) if total_requests else 0.0, "throughput_rps": round(total_requests / elapsed, 2), "latency_ms": { "avg": round(statistics.mean(lat_sorted), 2) if lat_sorted else 0.0, "p50": round(percentile(lat_sorted, 50), 2), "p90": round(percentile(lat_sorted, 90), 2), "p95": round(percentile(lat_sorted, 95), 2), "p99": round(percentile(lat_sorted, 99), 2), "max": round(max(lat_sorted), 2) if lat_sorted else 0.0, }, "status_codes": dict(sorted(status_counter.items(), key=lambda x: x[0])), "errors": dict(sorted(err_counter.items(), key=lambda x: x[0])), } return result def format_summary(result: Dict[str, Any]) -> str: lines = [] lines.append(f"\\n=== Scenario: {result['scenario']} ===") lines.append( "requests={total_requests} success={success_requests} fail={failed_requests} " "success_rate={success_rate}% rps={throughput_rps}".format(**result) ) lat = result["latency_ms"] lines.append( f"latency(ms): avg={lat['avg']} p50={lat['p50']} p90={lat['p90']} p95={lat['p95']} p99={lat['p99']} max={lat['max']}" ) lines.append(f"status_codes: {result['status_codes']}") if result["errors"]: lines.append(f"errors: {result['errors']}") return "\\n".join(lines) def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]: if not results: return {} total_requests = sum(x["total_requests"] for x in results) success_requests = sum(x["success_requests"] for x in results) failed_requests = sum(x["failed_requests"] for x in results) total_duration = sum(x["duration_sec"] for x in results) weighted_avg_latency = 0.0 if total_requests > 0: weighted_avg_latency = sum(x["latency_ms"]["avg"] * x["total_requests"] for x in results) / total_requests return { "scenario": "ALL", "total_requests": total_requests, "success_requests": success_requests, "failed_requests": failed_requests, "success_rate": round((success_requests / total_requests) * 100.0, 2) if total_requests else 0.0, "aggregate_rps": round(total_requests / max(total_duration, 1e-9), 2), "weighted_avg_latency_ms": round(weighted_avg_latency, 2), } def parse_csv_items(raw: str) -> List[str]: return [x.strip() for x in str(raw or "").split(",") if x.strip()] def parse_csv_ints(raw: str) -> List[int]: values: List[int] = [] seen = set() for item in parse_csv_items(raw): try: value = int(item) except ValueError as exc: raise ValueError(f"Invalid integer in CSV list: {item}") from exc if value <= 0: raise ValueError(f"Concurrency must be > 0, got {value}") if value in seen: continue seen.add(value) values.append(value) return values def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Interface-level load test for search and related microservices") parser.add_argument( "--scenario", type=str, default="all", help="Scenario: backend_search | backend_suggest | embed_text | translate | rerank | all | comma-separated list", ) parser.add_argument("--tenant-id", type=str, default="162", help="Tenant ID for backend search/suggest") parser.add_argument("--duration", type=int, default=30, help="Duration seconds per scenario; <=0 means no duration cap") parser.add_argument("--concurrency", type=int, default=20, help="Concurrent workers per scenario") parser.add_argument("--max-requests", type=int, default=0, help="Stop after N requests per scenario (0 means unlimited)") parser.add_argument("--timeout", type=float, default=10.0, help="Request timeout seconds") parser.add_argument("--max-errors", type=int, default=0, help="Stop scenario when accumulated errors reach this value") parser.add_argument("--backend-base", type=str, default="http://127.0.0.1:6002", help="Base URL for backend search API") parser.add_argument("--embedding-base", type=str, default="http://127.0.0.1:6005", help="Base URL for embedding service") parser.add_argument("--translator-base", type=str, default="http://127.0.0.1:6006", help="Base URL for translation service") parser.add_argument("--reranker-base", type=str, default="http://127.0.0.1:6007", help="Base URL for reranker service") parser.add_argument("--cases-file", type=str, default="", help="Optional JSON file to override/add request templates") parser.add_argument("--output", type=str, default="", help="Optional output JSON path") parser.add_argument("--pause", type=float, default=0.0, help="Pause seconds between scenarios in all mode") parser.add_argument( "--concurrency-list", type=str, default="", help="Comma-separated concurrency list (e.g. 1,5,10,20). If set, overrides --concurrency.", ) parser.add_argument( "--rerank-dynamic-docs", action="store_true", help="For rerank scenario, generate docs payload dynamically on every request.", ) parser.add_argument("--rerank-doc-count", type=int, default=386, help="Doc count per rerank request when dynamic docs are enabled") parser.add_argument("--rerank-vocab-size", type=int, default=1000, help="Word pool size for rerank dynamic docs generation") parser.add_argument("--rerank-sentence-min-words", type=int, default=15, help="Minimum words per generated doc sentence") parser.add_argument("--rerank-sentence-max-words", type=int, default=40, help="Maximum words per generated doc sentence") parser.add_argument("--rerank-query", type=str, default="wireless mouse", help="Fixed query used for rerank dynamic docs mode") parser.add_argument("--rerank-seed", type=int, default=20260312, help="Base random seed for rerank dynamic docs mode") parser.add_argument( "--rerank-top-n", type=int, default=0, help="Optional top_n for rerank requests in dynamic docs mode (0 means omit top_n).", ) return parser.parse_args() def build_rerank_dynamic_cfg(args: argparse.Namespace) -> Dict[str, Any]: min_words = int(args.rerank_sentence_min_words) max_words = int(args.rerank_sentence_max_words) doc_count = int(args.rerank_doc_count) vocab_size = int(args.rerank_vocab_size) if doc_count <= 0: raise ValueError(f"rerank-doc-count must be > 0, got {doc_count}") if vocab_size <= 0: raise ValueError(f"rerank-vocab-size must be > 0, got {vocab_size}") if min_words <= 0: raise ValueError(f"rerank-sentence-min-words must be > 0, got {min_words}") if max_words < min_words: raise ValueError( f"rerank-sentence-max-words must be >= rerank-sentence-min-words, got {max_words} < {min_words}" ) if args.rerank_seed < 0: raise ValueError(f"rerank-seed must be >= 0, got {args.rerank_seed}") if int(args.rerank_top_n) < 0: raise ValueError(f"rerank-top-n must be >= 0, got {args.rerank_top_n}") # Use deterministic, letter-only pseudo words to avoid long tokenization of numeric strings. syllables = [ "al", "an", "ar", "as", "at", "ba", "be", "bi", "bo", "ca", "ce", "ci", "co", "da", "de", "di", "do", "el", "en", "er", "fa", "fe", "fi", "fo", "ga", "ge", "gi", "go", "ha", "he", "hi", "ho", "ia", "ie", "il", "in", "io", "is", "ka", "ke", "ki", "ko", "la", "le", "li", "lo", "ma", "me", "mi", "mo", ] word_pool: List[str] = [] for a in syllables: for b in syllables: word_pool.append(f"{a}{b}") if len(word_pool) >= vocab_size: break if len(word_pool) >= vocab_size: break if len(word_pool) < vocab_size: raise ValueError(f"Unable to generate enough synthetic words: requested={vocab_size}, got={len(word_pool)}") return { "query": args.rerank_query, "doc_count": doc_count, "min_words": min_words, "max_words": max_words, "seed": int(args.rerank_seed), "normalize": True, "top_n": int(args.rerank_top_n), "word_pool": word_pool, } def build_random_rerank_payload( cfg: Dict[str, Any], rng: random.Random, ) -> Dict[str, Any]: word_pool: List[str] = cfg["word_pool"] docs = [] for _ in range(cfg["doc_count"]): doc_len = rng.randint(cfg["min_words"], cfg["max_words"]) docs.append(" ".join(rng.choices(word_pool, k=doc_len))) return { "query": cfg["query"], "docs": docs, "normalize": bool(cfg.get("normalize", True)), **({"top_n": int(cfg["top_n"])} if int(cfg.get("top_n", 0)) > 0 else {}), } async def main_async() -> int: args = parse_args() scenarios = build_scenarios(args) all_names = ["backend_search", "backend_suggest", "embed_text", "translate", "rerank"] if args.scenario == "all": run_names = [x for x in all_names if x in scenarios] else: requested = parse_csv_items(args.scenario) if not requested: print("No scenario specified.") return 2 unknown = [name for name in requested if name not in scenarios] if unknown: print(f"Unknown scenario(s): {', '.join(unknown)}") print(f"Available: {', '.join(sorted(scenarios.keys()))}") return 2 run_names = requested if not run_names: print("No scenarios to run.") return 2 rerank_dynamic_cfg: Optional[Dict[str, Any]] = None if args.rerank_dynamic_docs: try: rerank_dynamic_cfg = build_rerank_dynamic_cfg(args) except ValueError as exc: print(str(exc)) return 2 concurrency_values = [args.concurrency] if args.concurrency_list: try: concurrency_values = parse_csv_ints(args.concurrency_list) except ValueError as exc: print(str(exc)) return 2 if not concurrency_values: print("concurrency-list is empty after parsing.") return 2 print("Load test config:") print(f" scenario={args.scenario}") print(f" tenant_id={args.tenant_id}") print(f" duration={args.duration}s") print(f" concurrency={args.concurrency}") print(f" concurrency_list={concurrency_values}") print(f" max_requests={args.max_requests}") print(f" timeout={args.timeout}s") print(f" max_errors={args.max_errors}") print(f" backend_base={args.backend_base}") print(f" embedding_base={args.embedding_base}") print(f" translator_base={args.translator_base}") print(f" reranker_base={args.reranker_base}") if args.rerank_dynamic_docs: print(" rerank_dynamic_docs=True") print(f" rerank_doc_count={args.rerank_doc_count}") print(f" rerank_vocab_size={args.rerank_vocab_size}") print(f" rerank_sentence_words=[{args.rerank_sentence_min_words},{args.rerank_sentence_max_words}]") print(f" rerank_query={args.rerank_query}") print(f" rerank_seed={args.rerank_seed}") print(f" rerank_top_n={args.rerank_top_n}") results: List[Dict[str, Any]] = [] total_jobs = len(run_names) * len(concurrency_values) job_idx = 0 for name in run_names: scenario = scenarios[name] for c in concurrency_values: job_idx += 1 print(f"\\n[{job_idx}/{total_jobs}] running {name} @ concurrency={c} ...") result = await run_single_scenario( scenario=scenario, duration_sec=args.duration, concurrency=c, max_requests=args.max_requests, max_errors=args.max_errors, rerank_dynamic_cfg=rerank_dynamic_cfg if name == "rerank" else None, ) result["concurrency"] = c print(format_summary(result)) results.append(result) if args.pause > 0 and job_idx < total_jobs: await asyncio.sleep(args.pause) final = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "config": { "scenario": args.scenario, "run_names": run_names, "tenant_id": args.tenant_id, "duration_sec": args.duration, "concurrency": args.concurrency, "concurrency_list": concurrency_values, "max_requests": args.max_requests, "timeout_sec": args.timeout, "max_errors": args.max_errors, "backend_base": args.backend_base, "embedding_base": args.embedding_base, "translator_base": args.translator_base, "reranker_base": args.reranker_base, "cases_file": args.cases_file or None, "rerank_dynamic_docs": args.rerank_dynamic_docs, "rerank_doc_count": args.rerank_doc_count, "rerank_vocab_size": args.rerank_vocab_size, "rerank_sentence_min_words": args.rerank_sentence_min_words, "rerank_sentence_max_words": args.rerank_sentence_max_words, "rerank_query": args.rerank_query, "rerank_seed": args.rerank_seed, "rerank_top_n": args.rerank_top_n, }, "results": results, "overall": aggregate_results(results), } print("\\n=== Overall ===") print(json.dumps(final["overall"], ensure_ascii=False, indent=2)) if args.output: out_path = Path(args.output) out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(final, ensure_ascii=False, indent=2), encoding="utf-8") print(f"Saved JSON report: {out_path}") return 0 def main() -> int: try: return asyncio.run(main_async()) except KeyboardInterrupt: print("Interrupted by user") return 130 if __name__ == "__main__": raise SystemExit(main())