#!/usr/bin/env python3
"""
API-level performance test script for search stack services.

Default scenarios (aligned with docs/搜索API对接指南.md):
- backend_search   POST /search/
- backend_suggest  GET  /search/suggestions
- embed_text       POST /embed/text
- translate        POST /translate
- rerank           POST /rerank

Examples:
  python scripts/perf_api_benchmark.py --scenario backend_search --duration 30 --concurrency 20 --tenant-id 162
  python scripts/perf_api_benchmark.py --scenario backend_suggest --duration 30 --concurrency 50 --tenant-id 162
  python scripts/perf_api_benchmark.py --scenario all --duration 60 --concurrency 80 --tenant-id 162
  python scripts/perf_api_benchmark.py --scenario all --cases-file scripts/perf_cases.json.example --output perf_result.json
"""

from __future__ import annotations

import argparse
import asyncio
import json
import math
import statistics
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

try:
    import httpx
except ImportError:
    # Keep the pure helpers (percentile, validation, aggregation, ...) importable
    # without the HTTP client; main_async() reports the missing dependency
    # before any scenario is started.
    httpx = None  # type: ignore[assignment]


@dataclass
class RequestTemplate:
    """One HTTP request to replay during a scenario."""

    # HTTP verb, e.g. "GET" or "POST".
    method: str
    # Request path; build_scenarios() rewrites it into a full URL.
    path: str
    # Optional query-string parameters.
    params: Optional[Dict[str, Any]] = None
    # Optional JSON request body (dict or list).
    json_body: Optional[Any] = None
    # Optional request headers.
    headers: Optional[Dict[str, str]] = None


@dataclass
class Scenario:
    """A named group of request templates sharing one request timeout."""

    name: str
    templates: List[RequestTemplate]
    # Per-request timeout in seconds, passed to httpx.Timeout.
    timeout_sec: float


@dataclass
class RequestResult:
    """Outcome of a single request (not referenced elsewhere in this file)."""

    ok: bool
    status_code: int
    latency_ms: float
    error: str = ""


def _is_finite_number(v: Any) -> bool:
    """Return True for an int/float that is neither NaN nor infinite.

    Booleans are rejected explicitly even though ``bool`` subclasses ``int``.
    """
    if isinstance(v, bool):
        return False
    if isinstance(v, (int, float)):
        return math.isfinite(float(v))
    return False


def validate_response_payload(
    scenario_name: str,
    tpl: RequestTemplate,
    payload: Any,
) -> Tuple[bool, str]:
    """
    Lightweight payload validation for correctness-aware perf tests.

    Currently strict for embed_text to catch NaN/null vector regressions;
    every other scenario is accepted without inspection.

    Args:
        scenario_name: Name of the scenario the response belongs to.
        tpl: Template that produced the request; when its ``json_body`` is a
            list, the response must contain the same number of vectors.
        payload: Decoded JSON response body.

    Returns:
        ``(True, "")`` when the payload is acceptable, otherwise
        ``(False, reason)`` where ``reason`` is a short error key.
    """
    if scenario_name != "embed_text":
        return True, ""

    expected_len = len(tpl.json_body) if isinstance(tpl.json_body, list) else None
    if not isinstance(payload, list):
        return False, "invalid_payload_non_list"
    if expected_len is not None and len(payload) != expected_len:
        return False, "invalid_payload_length"
    if len(payload) == 0:
        return False, "invalid_payload_empty"

    for i, vec in enumerate(payload):
        if not isinstance(vec, list) or len(vec) == 0:
            return False, f"invalid_vector_{i}_shape"
        for x in vec:
            if not _is_finite_number(x):
                return False, f"invalid_vector_{i}_non_finite"
    return True, ""


def percentile(sorted_values: List[float], p: float) -> float:
    """Return the ``p``-th percentile of an ascending list.

    Uses linear interpolation between the two closest ranks. An empty list
    yields 0.0; ``p <= 0`` yields the first element and ``p >= 100`` the last.
    """
    if not sorted_values:
        return 0.0
    if p <= 0:
        return sorted_values[0]
    if p >= 100:
        return sorted_values[-1]
    rank = (len(sorted_values) - 1) * (p / 100.0)
    low = int(math.floor(rank))
    high = int(math.ceil(rank))
    if low == high:
        return sorted_values[low]
    weight = rank - low
    return sorted_values[low] * (1.0 - weight) + sorted_values[high] * weight


def make_default_templates(tenant_id: str) -> Dict[str, List[RequestTemplate]]:
    """Build the built-in request templates, keyed by scenario name.

    ``tenant_id`` is sent as the ``X-Tenant-ID`` header on the backend
    search and suggest requests; the other scenarios carry no headers.
    """
    return {
        "backend_search": [
            RequestTemplate(
                method="POST",
                path="/search/",
                headers={"X-Tenant-ID": tenant_id},
                json_body={"query": "wireless mouse", "size": 10, "language": "en"},
            ),
            RequestTemplate(
                method="POST",
                path="/search/",
                headers={"X-Tenant-ID": tenant_id},
                json_body={"query": "芭比娃娃", "size": 10, "language": "zh"},
            ),
            RequestTemplate(
                method="POST",
                path="/search/",
                headers={"X-Tenant-ID": tenant_id},
                json_body={"query": "f", "size": 10, "language": "en"},
            ),
        ],
        "backend_suggest": [
            RequestTemplate(
                method="GET",
                path="/search/suggestions",
                headers={"X-Tenant-ID": tenant_id},
                params={"q": "f", "size": 10, "language": "en"},
            ),
            RequestTemplate(
                method="GET",
                path="/search/suggestions",
                headers={"X-Tenant-ID": tenant_id},
                params={"q": "玩", "size": 10, "language": "zh"},
            ),
            RequestTemplate(
                method="GET",
                path="/search/suggestions",
                headers={"X-Tenant-ID": tenant_id},
                params={"q": "shi", "size": 10, "language": "en"},
            ),
        ],
        "embed_text": [
            RequestTemplate(
                method="POST",
                path="/embed/text",
                json_body=["wireless mouse", "gaming keyboard", "barbie doll"],
            )
        ],
        "translate": [
            RequestTemplate(
                method="POST",
                path="/translate",
                json_body={"text": "商品名称", "target_lang": "en", "source_lang": "zh", "model": "qwen"},
            ),
            RequestTemplate(
                method="POST",
                path="/translate",
                json_body={"text": "Product title", "target_lang": "zh", "model": "qwen"},
            ),
        ],
        "rerank": [
            RequestTemplate(
                method="POST",
                path="/rerank",
                json_body={
                    "query": "wireless mouse",
                    "docs": [
                        "Wireless ergonomic mouse with rechargeable battery",
                        "USB-C cable 1m",
                        "Gaming mouse 26000 DPI",
                    ],
                    "normalize": True,
                },
            )
        ],
    }


def load_cases_from_file(path: Path, tenant_id: str) -> Dict[str, List[RequestTemplate]]:
    """Load request templates from a JSON cases file.

    The file is read as UTF-8 JSON with a top-level ``"scenarios"`` object
    mapping scenario names to lists of request objects. Each request object
    may carry ``method`` (default ``"GET"``), ``path``, ``params``, ``json``
    and ``headers``. An ``X-Tenant-ID`` header whose value is exactly
    ``${tenant_id}`` is replaced by ``tenant_id``.

    Returns:
        Mapping of scenario name to its templates; scenarios with an empty
        request list are omitted.
    """
    data = json.loads(path.read_text(encoding="utf-8"))
    out: Dict[str, List[RequestTemplate]] = {}
    for scenario_name, requests_data in (data.get("scenarios") or {}).items():
        templates: List[RequestTemplate] = []
        for item in requests_data:
            # Copy so the placeholder substitution never mutates parsed JSON.
            headers = dict(item.get("headers") or {})
            if "X-Tenant-ID" in headers and str(headers["X-Tenant-ID"]).strip() == "${tenant_id}":
                headers["X-Tenant-ID"] = tenant_id
            templates.append(
                RequestTemplate(
                    method=str(item.get("method", "GET")).upper(),
                    path=str(item.get("path", "")).strip(),
                    params=item.get("params"),
                    json_body=item.get("json"),
                    headers=headers or None,
                )
            )
        if templates:
            out[scenario_name] = templates
    return out


def build_scenarios(args: argparse.Namespace) -> Dict[str, Scenario]:
    """Combine default and custom templates into runnable scenarios.

    Templates from ``args.cases_file`` (when given) replace the defaults of
    the same scenario name. Each template path is prefixed with the base URL
    of its service. Scenario names without a known base URL (i.e. anything
    other than the five built-in names) are skipped.
    """
    defaults = make_default_templates(args.tenant_id)
    if args.cases_file:
        custom = load_cases_from_file(Path(args.cases_file), tenant_id=args.tenant_id)
        defaults.update(custom)

    scenario_base = {
        "backend_search": args.backend_base,
        "backend_suggest": args.backend_base,
        "embed_text": args.embedding_base,
        "translate": args.translator_base,
        "rerank": args.reranker_base,
    }

    scenarios: Dict[str, Scenario] = {}
    for name, templates in defaults.items():
        if name not in scenario_base:
            continue
        # Normalise so that base + path always has exactly one "/" between them.
        base = scenario_base[name].rstrip("/")
        rewritten: List[RequestTemplate] = []
        for t in templates:
            path = t.path if t.path.startswith("/") else f"/{t.path}"
            rewritten.append(
                RequestTemplate(
                    method=t.method,
                    path=f"{base}{path}",
                    params=t.params,
                    json_body=t.json_body,
                    headers=t.headers,
                )
            )
        scenarios[name] = Scenario(name=name, templates=rewritten, timeout_sec=args.timeout)
    return scenarios


async def run_single_scenario(
    scenario: Scenario,
    duration_sec: int,
    concurrency: int,
    max_requests: int,
    max_errors: int,
) -> Dict[str, Any]:
    """Run one scenario with ``concurrency`` workers and summarise the outcome.

    Each worker cycles through the scenario's templates (starting at an
    offset derived from its worker id) until a stop condition is met.

    Args:
        scenario: Scenario to execute; its template paths are used as URLs.
        duration_sec: Workers stop starting new requests once this many
            seconds have elapsed; ``<= 0`` disables the cap.
        concurrency: Number of worker tasks.
        max_requests: Stop once this many requests were started; ``<= 0``
            disables the cap.
        max_errors: Stop once this many failed requests were recorded;
            ``<= 0`` disables the cap.

    Returns:
        A dict with request counts, success rate, throughput, latency
        statistics (ms), status-code histogram and error histogram.
    """
    latencies: List[float] = []
    status_counter: Dict[int, int] = {}
    err_counter: Dict[str, int] = {}
    total_requests = 0
    success_requests = 0
    stop_flag = False
    lock = asyncio.Lock()
    start = time.perf_counter()

    timeout = httpx.Timeout(timeout=scenario.timeout_sec)
    limits = httpx.Limits(
        max_connections=max(concurrency * 2, 20),
        max_keepalive_connections=max(concurrency, 10),
    )

    async def worker(worker_id: int, client: httpx.AsyncClient) -> None:
        nonlocal total_requests, success_requests, stop_flag
        # Stagger the starting template so workers do not all send the same one.
        idx = worker_id % len(scenario.templates)

        while not stop_flag:
            elapsed = time.perf_counter() - start
            if duration_sec > 0 and elapsed >= duration_sec:
                break

            # Reserve a request slot under the lock so max_requests is exact.
            async with lock:
                if max_requests > 0 and total_requests >= max_requests:
                    stop_flag = True
                    break
                total_requests += 1

            tpl = scenario.templates[idx % len(scenario.templates)]
            idx += 1

            t0 = time.perf_counter()
            ok = False
            status = 0
            err = ""
            try:
                resp = await client.request(
                    method=tpl.method,
                    url=tpl.path,
                    params=tpl.params,
                    json=tpl.json_body,
                    headers=tpl.headers,
                )
                status = int(resp.status_code)
                ok = 200 <= status < 300
                if ok:
                    try:
                        payload = resp.json()
                    except Exception:
                        ok = False
                        err = "invalid_json_response"
                    else:
                        valid, reason = validate_response_payload(
                            scenario_name=scenario.name,
                            tpl=tpl,
                            payload=payload,
                        )
                        if not valid:
                            ok = False
                            err = reason or "invalid_payload"
                if not ok and not err:
                    err = f"http_{status}"
            except Exception as e:
                # Any failure (timeout, connection error, ...) counts as an
                # error keyed by exception class; never leave a stale ok=True.
                ok = False
                err = type(e).__name__
            # NOTE: the measured latency includes JSON decoding and payload
            # validation, because the clock is read after both.
            t1 = time.perf_counter()
            cost_ms = (t1 - t0) * 1000.0

            async with lock:
                latencies.append(cost_ms)
                if status:
                    status_counter[status] = status_counter.get(status, 0) + 1
                if ok:
                    success_requests += 1
                else:
                    err_key = err or "unknown"
                    err_counter[err_key] = err_counter.get(err_key, 0) + 1
                total_err = sum(err_counter.values())
                if max_errors > 0 and total_err >= max_errors:
                    stop_flag = True

    async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
        tasks = [asyncio.create_task(worker(i, client)) for i in range(concurrency)]
        await asyncio.gather(*tasks)

    # Clamp to avoid a division by zero when nothing ran.
    elapsed = max(time.perf_counter() - start, 1e-9)
    lat_sorted = sorted(latencies)

    result = {
        "scenario": scenario.name,
        "duration_sec": round(elapsed, 3),
        "total_requests": total_requests,
        "success_requests": success_requests,
        "failed_requests": max(total_requests - success_requests, 0),
        "success_rate": round((success_requests / total_requests) * 100.0, 2) if total_requests else 0.0,
        "throughput_rps": round(total_requests / elapsed, 2),
        "latency_ms": {
            "avg": round(statistics.mean(lat_sorted), 2) if lat_sorted else 0.0,
            "p50": round(percentile(lat_sorted, 50), 2),
            "p90": round(percentile(lat_sorted, 90), 2),
            "p95": round(percentile(lat_sorted, 95), 2),
            "p99": round(percentile(lat_sorted, 99), 2),
            "max": round(max(lat_sorted), 2) if lat_sorted else 0.0,
        },
        "status_codes": dict(sorted(status_counter.items(), key=lambda x: x[0])),
        "errors": dict(sorted(err_counter.items(), key=lambda x: x[0])),
    }
    return result


def format_summary(result: Dict[str, Any]) -> str:
    """Render one scenario result as a multi-line, human-readable summary.

    The text starts with a blank line followed by the scenario header; the
    ``errors`` line is only present when at least one error was recorded.
    """
    lines = []
    lines.append(f"\n=== Scenario: {result['scenario']} ===")
    lines.append(
        "requests={total_requests} success={success_requests} fail={failed_requests} "
        "success_rate={success_rate}% rps={throughput_rps}".format(**result)
    )
    lat = result["latency_ms"]
    lines.append(
        f"latency(ms): avg={lat['avg']} p50={lat['p50']} p90={lat['p90']} p95={lat['p95']} p99={lat['p99']} max={lat['max']}"
    )
    lines.append(f"status_codes: {result['status_codes']}")
    if result["errors"]:
        lines.append(f"errors: {result['errors']}")
    return "\n".join(lines)


def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Fold per-scenario results into one overall summary.

    Durations are summed (scenarios run one after another), and the average
    latency is weighted by each scenario's request count. An empty input
    yields an empty dict.
    """
    if not results:
        return {}
    total_requests = sum(x["total_requests"] for x in results)
    success_requests = sum(x["success_requests"] for x in results)
    failed_requests = sum(x["failed_requests"] for x in results)
    total_duration = sum(x["duration_sec"] for x in results)
    weighted_avg_latency = 0.0
    if total_requests > 0:
        weighted_avg_latency = sum(x["latency_ms"]["avg"] * x["total_requests"] for x in results) / total_requests

    return {
        "scenario": "ALL",
        "total_requests": total_requests,
        "success_requests": success_requests,
        "failed_requests": failed_requests,
        "success_rate": round((success_requests / total_requests) * 100.0, 2) if total_requests else 0.0,
        "aggregate_rps": round(total_requests / max(total_duration, 1e-9), 2),
        "weighted_avg_latency_ms": round(weighted_avg_latency, 2),
    }


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse and validate command-line options.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The parsed namespace.

    Raises:
        SystemExit: On invalid options, including a concurrency below 1 or a
            configuration in which no stop condition is enabled.
    """
    parser = argparse.ArgumentParser(description="Interface-level load test for search and related microservices")
    parser.add_argument(
        "--scenario",
        type=str,
        default="all",
        help="Scenario: backend_search | backend_suggest | embed_text | translate | rerank | all",
    )
    parser.add_argument("--tenant-id", type=str, default="162", help="Tenant ID for backend search/suggest")
    parser.add_argument("--duration", type=int, default=30, help="Duration seconds per scenario; <=0 means no duration cap")
    parser.add_argument("--concurrency", type=int, default=20, help="Concurrent workers per scenario")
    parser.add_argument("--max-requests", type=int, default=0, help="Stop after N requests per scenario (0 means unlimited)")
    parser.add_argument("--timeout", type=float, default=10.0, help="Request timeout seconds")
    parser.add_argument("--max-errors", type=int, default=0, help="Stop scenario when accumulated errors reach this value")

    parser.add_argument("--backend-base", type=str, default="http://127.0.0.1:6002", help="Base URL for backend search API")
    parser.add_argument("--embedding-base", type=str, default="http://127.0.0.1:6005", help="Base URL for embedding service")
    parser.add_argument("--translator-base", type=str, default="http://127.0.0.1:6006", help="Base URL for translation service")
    parser.add_argument("--reranker-base", type=str, default="http://127.0.0.1:6007", help="Base URL for reranker service")

    parser.add_argument("--cases-file", type=str, default="", help="Optional JSON file to override/add request templates")
    parser.add_argument("--output", type=str, default="", help="Optional output JSON path")
    parser.add_argument("--pause", type=float, default=0.0, help="Pause seconds between scenarios in all mode")

    args = parser.parse_args(argv)

    # Zero workers would "succeed" without sending a single request.
    if args.concurrency < 1:
        parser.error("--concurrency must be >= 1")
    # Without any cap the workers loop forever and an interrupt discards
    # every collected result, so refuse to start.
    if args.duration <= 0 and args.max_requests <= 0 and args.max_errors <= 0:
        parser.error(
            "no stop condition: set --duration, --max-requests or --max-errors to a positive value"
        )
    return args


async def main_async() -> int:
    """Run the selected scenarios sequentially and report the results.

    Returns:
        Process exit code: 0 on completion, 2 when the requested scenario is
        unknown, nothing is runnable, or httpx is not installed.
    """
    args = parse_args()
    if httpx is None:
        print("Missing dependency: httpx is required (pip install httpx)")
        return 2

    scenarios = build_scenarios(args)

    all_names = ["backend_search", "backend_suggest", "embed_text", "translate", "rerank"]
    if args.scenario == "all":
        run_names = [x for x in all_names if x in scenarios]
    else:
        if args.scenario not in scenarios:
            print(f"Unknown scenario: {args.scenario}")
            print(f"Available: {', '.join(sorted(scenarios.keys()))}")
            return 2
        run_names = [args.scenario]

    if not run_names:
        print("No scenarios to run.")
        return 2

    print("Load test config:")
    print(f"  scenario={args.scenario}")
    print(f"  tenant_id={args.tenant_id}")
    print(f"  duration={args.duration}s")
    print(f"  concurrency={args.concurrency}")
    print(f"  max_requests={args.max_requests}")
    print(f"  timeout={args.timeout}s")
    print(f"  max_errors={args.max_errors}")
    print(f"  backend_base={args.backend_base}")
    print(f"  embedding_base={args.embedding_base}")
    print(f"  translator_base={args.translator_base}")
    print(f"  reranker_base={args.reranker_base}")

    results: List[Dict[str, Any]] = []
    for i, name in enumerate(run_names, start=1):
        scenario = scenarios[name]
        print(f"\n[{i}/{len(run_names)}] running {name} ...")
        result = await run_single_scenario(
            scenario=scenario,
            duration_sec=args.duration,
            concurrency=args.concurrency,
            max_requests=args.max_requests,
            max_errors=args.max_errors,
        )
        print(format_summary(result))
        results.append(result)

        # Optional cool-down between scenarios, skipped after the last one.
        if args.pause > 0 and i < len(run_names):
            await asyncio.sleep(args.pause)

    final = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        "config": {
            "scenario": args.scenario,
            "tenant_id": args.tenant_id,
            "duration_sec": args.duration,
            "concurrency": args.concurrency,
            "max_requests": args.max_requests,
            "timeout_sec": args.timeout,
            "max_errors": args.max_errors,
            "backend_base": args.backend_base,
            "embedding_base": args.embedding_base,
            "translator_base": args.translator_base,
            "reranker_base": args.reranker_base,
            "cases_file": args.cases_file or None,
        },
        "results": results,
        "overall": aggregate_results(results),
    }

    print("\n=== Overall ===")
    print(json.dumps(final["overall"], ensure_ascii=False, indent=2))

    if args.output:
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(final, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"Saved JSON report: {out_path}")

    return 0


def main() -> int:
    """Synchronous entry point; returns 130 when interrupted with Ctrl-C."""
    try:
        return asyncio.run(main_async())
    except KeyboardInterrupt:
        print("Interrupted by user")
        return 130


if __name__ == "__main__":
    raise SystemExit(main())