#!/usr/bin/env python3 from __future__ import annotations import argparse import asyncio import json import math import random import statistics import time from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional import httpx @dataclass class RequestTemplate: method: str url: str json_body: Optional[Dict[str, Any]] = None headers: Optional[Dict[str, str]] = None def percentile(sorted_values: List[float], p: float) -> float: if not sorted_values: return 0.0 if p <= 0: return sorted_values[0] if p >= 100: return sorted_values[-1] rank = (len(sorted_values) - 1) * (p / 100.0) low = int(math.floor(rank)) high = int(math.ceil(rank)) if low == high: return sorted_values[low] weight = rank - low return sorted_values[low] * (1.0 - weight) + sorted_values[high] * weight def parse_csv_items(raw: str) -> List[str]: return [x.strip() for x in str(raw or "").split(",") if x.strip()] def parse_csv_ints(raw: str) -> List[int]: values: List[int] = [] seen = set() for item in parse_csv_items(raw): try: value = int(item) except ValueError as exc: raise ValueError(f"Invalid integer in CSV list: {item}") from exc if value <= 0: raise ValueError(f"Concurrency must be > 0, got {value}") if value in seen: continue seen.add(value) values.append(value) return values def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="DashScope /compatible-api/v1/reranks perf test") parser.add_argument("--duration", type=int, default=20, help="Duration seconds per concurrency; <=0 means no duration cap") parser.add_argument("--concurrency", type=int, default=1, help="Default concurrency if --concurrency-list is not set") parser.add_argument( "--concurrency-list", type=str, default="1,5,10,20", help="Comma-separated concurrency list (e.g. 1,5,10,20). If set, overrides --concurrency.", ) parser.add_argument("--max-requests", type=int, default=0, help="Stop after N requests per concurrency (0 means unlimited)") parser.add_argument("--timeout", type=float, default=90.0, help="Request timeout seconds") parser.add_argument("--max-errors", type=int, default=0, help="Stop current run when accumulated errors reach this value") parser.add_argument( "--base-url", type=str, default="https://dashscope.aliyuncs.com/compatible-api/v1", help="Base URL for DashScope compatible API", ) parser.add_argument("--api-key", type=str, default="", help="DashScope API key; if omitted, read from DASHSCOPE_API_KEY env") parser.add_argument("--output", type=str, default="", help="Optional output JSON path") parser.add_argument("--pause", type=float, default=0.0, help="Pause seconds between concurrency runs") parser.add_argument("--model", type=str, default="qwen3-rerank", help="Rerank model name") parser.add_argument("--rerank-dynamic-docs", action="store_true", help="Generate documents payload dynamically on every request") parser.add_argument("--rerank-doc-count", type=int, default=386, help="Document count per rerank request") parser.add_argument("--rerank-vocab-size", type=int, default=1000, help="Word pool size for synthetic document generation") parser.add_argument("--rerank-sentence-min-words", type=int, default=15, help="Minimum words per generated document") parser.add_argument("--rerank-sentence-max-words", type=int, default=40, help="Maximum words per generated document") parser.add_argument("--rerank-query", type=str, default="wireless mouse", help="Fixed query used for rerank dynamic docs mode") parser.add_argument("--rerank-seed", type=int, default=20260312, help="Base random seed for dynamic docs mode") parser.add_argument("--rerank-top-n", type=int, default=386, help="top_n for rerank requests; 0 means omit") parser.add_argument( "--rerank-instruct", type=str, default="Given a web search query, retrieve relevant passages that answer the query.", help="Instruct field for DashScope rerank", ) return parser.parse_args() def build_word_pool(vocab_size: int) -> List[str]: syllables = [ "al", "an", "ar", "as", "at", "ba", "be", "bi", "bo", "ca", "ce", "ci", "co", "da", "de", "di", "do", "el", "en", "er", "fa", "fe", "fi", "fo", "ga", "ge", "gi", "go", "ha", "he", "hi", "ho", "ia", "ie", "il", "in", "io", "is", "ka", "ke", "ki", "ko", "la", "le", "li", "lo", "ma", "me", "mi", "mo", ] word_pool: List[str] = [] for a in syllables: for b in syllables: word_pool.append(f"{a}{b}") if len(word_pool) >= vocab_size: return word_pool raise ValueError(f"Unable to generate enough synthetic words: requested={vocab_size}, got={len(word_pool)}") def build_rerank_dynamic_cfg(args: argparse.Namespace) -> Dict[str, Any]: min_words = int(args.rerank_sentence_min_words) max_words = int(args.rerank_sentence_max_words) doc_count = int(args.rerank_doc_count) vocab_size = int(args.rerank_vocab_size) if doc_count <= 0: raise ValueError(f"rerank-doc-count must be > 0, got {doc_count}") if vocab_size <= 0: raise ValueError(f"rerank-vocab-size must be > 0, got {vocab_size}") if min_words <= 0: raise ValueError(f"rerank-sentence-min-words must be > 0, got {min_words}") if max_words < min_words: raise ValueError( f"rerank-sentence-max-words must be >= rerank-sentence-min-words, got {max_words} < {min_words}" ) if args.rerank_seed < 0: raise ValueError(f"rerank-seed must be >= 0, got {args.rerank_seed}") if int(args.rerank_top_n) < 0: raise ValueError(f"rerank-top-n must be >= 0, got {args.rerank_top_n}") return { "model": args.model, "query": args.rerank_query, "doc_count": doc_count, "min_words": min_words, "max_words": max_words, "seed": int(args.rerank_seed), "top_n": int(args.rerank_top_n), "instruct": args.rerank_instruct, "word_pool": build_word_pool(vocab_size), } def build_random_rerank_payload(cfg: Dict[str, Any], rng: random.Random) -> Dict[str, Any]: word_pool: List[str] = cfg["word_pool"] documents: List[str] = [] for _ in range(cfg["doc_count"]): doc_len = rng.randint(cfg["min_words"], cfg["max_words"]) documents.append(" ".join(rng.choices(word_pool, k=doc_len))) payload = { "model": cfg["model"], "documents": documents, "query": cfg["query"], "instruct": cfg["instruct"], } if int(cfg.get("top_n", 0)) > 0: payload["top_n"] = int(cfg["top_n"]) return payload def build_static_template(base_url: str, api_key: str, args: argparse.Namespace) -> RequestTemplate: payload: Dict[str, Any] = { "model": args.model, "documents": [ "文本排序模型广泛用于搜索引擎和推荐系统中,它们根据文本相关性对候选文本进行排序", "量子计算是计算科学的一个前沿领域", "预训练语言模型的发展给文本排序模型带来了新的进展", ], "query": "什么是文本排序模型", "instruct": args.rerank_instruct, } if int(args.rerank_top_n) > 0: payload["top_n"] = int(args.rerank_top_n) return RequestTemplate( method="POST", url=f"{base_url.rstrip('/')}/reranks", json_body=payload, headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", }, ) def validate_response_payload(payload: Any) -> tuple[bool, str]: if not isinstance(payload, dict): return False, "invalid_payload_non_dict" if "results" not in payload: return False, "invalid_payload_missing_results" if not isinstance(payload["results"], list): return False, "invalid_payload_results_non_list" return True, "" async def run_single_concurrency( template: RequestTemplate, duration_sec: int, concurrency: int, max_requests: int, max_errors: int, timeout_sec: float, rerank_dynamic_cfg: Optional[Dict[str, Any]], ) -> Dict[str, Any]: latencies: List[float] = [] status_counter: Dict[int, int] = {} err_counter: Dict[str, int] = {} total_requests = 0 success_requests = 0 stop_flag = False lock = asyncio.Lock() start = time.perf_counter() timeout = httpx.Timeout(timeout=timeout_sec) limits = httpx.Limits(max_connections=max(concurrency * 2, 20), max_keepalive_connections=max(concurrency, 10)) async def worker(worker_id: int, client: httpx.AsyncClient) -> None: nonlocal total_requests, success_requests, stop_flag worker_rng: Optional[random.Random] = None if rerank_dynamic_cfg is not None: worker_rng = random.Random(int(rerank_dynamic_cfg["seed"]) + worker_id) while not stop_flag: elapsed = time.perf_counter() - start if duration_sec > 0 and elapsed >= duration_sec: break async with lock: if max_requests > 0 and total_requests >= max_requests: stop_flag = True break total_requests += 1 payload = template.json_body if rerank_dynamic_cfg is not None and worker_rng is not None: payload = build_random_rerank_payload(rerank_dynamic_cfg, worker_rng) t0 = time.perf_counter() ok = False status = 0 err = "" try: resp = await client.request( method=template.method, url=template.url, headers=template.headers, json=payload, ) status = int(resp.status_code) ok = 200 <= status < 300 if ok: try: body = resp.json() except Exception: ok = False err = "invalid_json_response" else: valid, reason = validate_response_payload(body) if not valid: ok = False err = reason or "invalid_payload" if not ok and not err: err = f"http_{status}" except Exception as e: err = type(e).__name__ latency_ms = (time.perf_counter() - t0) * 1000.0 async with lock: latencies.append(latency_ms) if status: status_counter[status] = status_counter.get(status, 0) + 1 if ok: success_requests += 1 else: err_counter[err or "unknown"] = err_counter.get(err or "unknown", 0) + 1 if max_errors > 0 and sum(err_counter.values()) >= max_errors: stop_flag = True async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: tasks = [asyncio.create_task(worker(i, client)) for i in range(concurrency)] await asyncio.gather(*tasks) elapsed = max(time.perf_counter() - start, 1e-9) lat_sorted = sorted(latencies) result = { "scenario": "rerank_dashscope", "concurrency": concurrency, "duration_sec": round(elapsed, 3), "total_requests": total_requests, "success_requests": success_requests, "failed_requests": max(total_requests - success_requests, 0), "success_rate": round((success_requests / total_requests) * 100.0, 2) if total_requests else 0.0, "throughput_rps": round(total_requests / elapsed, 2), "latency_ms": { "avg": round(statistics.mean(lat_sorted), 2) if lat_sorted else 0.0, "p50": round(percentile(lat_sorted, 50), 2), "p90": round(percentile(lat_sorted, 90), 2), "p95": round(percentile(lat_sorted, 95), 2), "p99": round(percentile(lat_sorted, 99), 2), "max": round(max(lat_sorted), 2) if lat_sorted else 0.0, }, "status_codes": dict(sorted(status_counter.items(), key=lambda x: x[0])), "errors": dict(sorted(err_counter.items(), key=lambda x: x[0])), } return result def format_summary(result: Dict[str, Any]) -> str: lat = result["latency_ms"] lines = [ f" === Scenario: {result['scenario']} @ concurrency={result['concurrency']} ===", "requests={total_requests} success={success_requests} fail={failed_requests} success_rate={success_rate}% rps={throughput_rps}".format(**result), f"latency(ms): avg={lat['avg']} p50={lat['p50']} p90={lat['p90']} p95={lat['p95']} p99={lat['p99']} max={lat['max']}", f"status_codes: {result['status_codes']}", ] if result["errors"]: lines.append(f"errors: {result['errors']}") return " ".join(lines) def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]: if not results: return {} total_requests = sum(x["total_requests"] for x in results) success_requests = sum(x["success_requests"] for x in results) failed_requests = sum(x["failed_requests"] for x in results) total_duration = sum(x["duration_sec"] for x in results) weighted_avg_latency = 0.0 if total_requests > 0: weighted_avg_latency = sum(x["latency_ms"]["avg"] * x["total_requests"] for x in results) / total_requests return { "scenario": "ALL", "total_requests": total_requests, "success_requests": success_requests, "failed_requests": failed_requests, "success_rate": round((success_requests / total_requests) * 100.0, 2) if total_requests else 0.0, "aggregate_rps": round(total_requests / max(total_duration, 1e-9), 2), "weighted_avg_latency_ms": round(weighted_avg_latency, 2), } async def main_async() -> int: import os args = parse_args() api_key = (args.api_key or os.getenv("DASHSCOPE_API_KEY") or "").strip() if not api_key: print("Missing API key. Set --api-key or DASHSCOPE_API_KEY.") return 2 try: concurrency_values = parse_csv_ints(args.concurrency_list) if args.concurrency_list else [args.concurrency] except ValueError as exc: print(str(exc)) return 2 if not concurrency_values: print("concurrency-list is empty after parsing.") return 2 try: rerank_dynamic_cfg = build_rerank_dynamic_cfg(args) if args.rerank_dynamic_docs else None except ValueError as exc: print(str(exc)) return 2 template = build_static_template(args.base_url, api_key, args) print("Load test config:") print(" scenario=rerank_dashscope") print(f" duration={args.duration}s") print(f" concurrency={args.concurrency}") print(f" concurrency_list={concurrency_values}") print(f" max_requests={args.max_requests}") print(f" timeout={args.timeout}s") print(f" max_errors={args.max_errors}") print(f" base_url={args.base_url}") print(f" model={args.model}") print(f" rerank_dynamic_docs={args.rerank_dynamic_docs}") if args.rerank_dynamic_docs: print(f" rerank_doc_count={args.rerank_doc_count}") print(f" rerank_vocab_size={args.rerank_vocab_size}") print(f" rerank_sentence_words=[{args.rerank_sentence_min_words},{args.rerank_sentence_max_words}]") print(f" rerank_query={args.rerank_query}") print(f" rerank_seed={args.rerank_seed}") print(f" rerank_top_n={args.rerank_top_n}") print(f" rerank_instruct={args.rerank_instruct}") else: print(" static_request_payload=demo_payload") results: List[Dict[str, Any]] = [] total_jobs = len(concurrency_values) for idx, c in enumerate(concurrency_values, start=1): print(f" [{idx}/{total_jobs}] running rerank_dashscope @ concurrency={c} ...") result = await run_single_concurrency( template=template, duration_sec=args.duration, concurrency=c, max_requests=args.max_requests, max_errors=args.max_errors, timeout_sec=args.timeout, rerank_dynamic_cfg=rerank_dynamic_cfg, ) print(format_summary(result)) results.append(result) if args.pause > 0 and idx < total_jobs: await asyncio.sleep(args.pause) final = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "config": { "scenario": "rerank_dashscope", "duration_sec": args.duration, "concurrency": args.concurrency, "concurrency_list": concurrency_values, "max_requests": args.max_requests, "timeout_sec": args.timeout, "max_errors": args.max_errors, "base_url": args.base_url, "model": args.model, "output": args.output or None, "rerank_dynamic_docs": args.rerank_dynamic_docs, "rerank_doc_count": args.rerank_doc_count, "rerank_vocab_size": args.rerank_vocab_size, "rerank_sentence_min_words": args.rerank_sentence_min_words, "rerank_sentence_max_words": args.rerank_sentence_max_words, "rerank_query": args.rerank_query, "rerank_seed": args.rerank_seed, "rerank_top_n": args.rerank_top_n, "rerank_instruct": args.rerank_instruct, }, "results": results, "overall": aggregate_results(results), } print(" === Overall ===") print(json.dumps(final["overall"], ensure_ascii=False, indent=2)) if args.output: out_path = Path(args.output) out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(final, ensure_ascii=False, indent=2), encoding="utf-8") print(f"Saved JSON report: {out_path}") return 0 def main() -> int: try: return asyncio.run(main_async()) except KeyboardInterrupt: print("Interrupted by user") return 130 if __name__ == "__main__": raise SystemExit(main())