#!/usr/bin/env python3
"""Benchmark local translation models with products_analyzed.csv."""
from __future__ import annotations

import argparse
import concurrent.futures
import copy
import csv
import json
import math
import platform
import resource
import statistics
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Sequence

import torch
import transformers

PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from config.services_config import get_translation_config  # noqa: E402
from translation.service import TranslationService  # noqa: E402
from translation.settings import get_translation_capability  # noqa: E402

DEFAULT_BATCH_SIZES = [1, 4, 8, 16, 32, 64]
DEFAULT_CONCURRENCIES = [1, 2, 4, 8, 16, 64]

SCENARIOS: List[Dict[str, str]] = [
    {
        "name": "nllb-200-distilled-600m zh->en",
        "model": "nllb-200-distilled-600m",
        "source_lang": "zh",
        "target_lang": "en",
        "column": "title_cn",
        "scene": "sku_name",
    },
    {
        "name": "nllb-200-distilled-600m en->zh",
        "model": "nllb-200-distilled-600m",
        "source_lang": "en",
        "target_lang": "zh",
        "column": "title",
        "scene": "sku_name",
    },
    {
        "name": "opus-mt-zh-en zh->en",
        "model": "opus-mt-zh-en",
        "source_lang": "zh",
        "target_lang": "en",
        "column": "title_cn",
        "scene": "sku_name",
    },
    {
        "name": "opus-mt-en-zh en->zh",
        "model": "opus-mt-en-zh",
        "source_lang": "en",
        "target_lang": "zh",
        "column": "title",
        "scene": "sku_name",
    },
]
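# Each scenario above is executed in its own child process by run_all_scenarios() (via --single),
# so only one model's weights are resident in a given process at a time.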


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Benchmark local translation models")
    parser.add_argument("--csv-path", default="products_analyzed.csv", help="Benchmark dataset CSV path")
    parser.add_argument("--limit", type=int, default=0, help="Limit rows for baseline or single-case run; 0 means all")
    parser.add_argument("--output-dir", default="", help="Directory for JSON/Markdown reports")
    parser.add_argument("--single", action="store_true", help="Run a single scenario in-process")
    parser.add_argument("--model", default="", help="Model name for --single mode")
    parser.add_argument("--source-lang", default="", help="Source language for --single mode")
    parser.add_argument("--target-lang", default="", help="Target language for --single mode")
    parser.add_argument("--column", default="", help="CSV column to benchmark for --single mode")
    parser.add_argument("--scene", default="sku_name", help="Scene passed to translation service")
    parser.add_argument("--batch-size", type=int, default=0, help="Override configured batch size")
    parser.add_argument("--device-override", default="", help="Override configured device, for example cpu or cuda")
    parser.add_argument("--torch-dtype-override", default="", help="Override configured torch dtype, for example float32 or float16")
    parser.add_argument("--max-new-tokens", type=int, default=0, help="Override configured max_new_tokens")
    parser.add_argument("--num-beams", type=int, default=0, help="Override configured num_beams")
    parser.add_argument("--attn-implementation", default="", help="Override attention implementation, for example sdpa")
    parser.add_argument("--ct2-inter-threads", type=int, default=-1, help="Override CTranslate2 inter_threads")
    parser.add_argument("--ct2-intra-threads", type=int, default=-1, help="Override CTranslate2 intra_threads")
    parser.add_argument(
        "--ct2-max-queued-batches",
        type=int,
        default=-1,
        help="Override CTranslate2 max_queued_batches",
    )
    parser.add_argument(
        "--ct2-batch-type",
        default="",
        help="Override CTranslate2 batch_type, for example examples or tokens",
    )
    parser.add_argument(
        "--ct2-decoding-length-mode",
        default="",
        help="Override CTranslate2 decoding length mode, for example fixed or source",
    )
    parser.add_argument(
        "--ct2-decoding-length-extra",
        type=int,
        default=0,
        help="Extra tokens added when ct2 decoding length mode is source",
    )
    parser.add_argument(
        "--ct2-decoding-length-min",
        type=int,
        default=0,
        help="Minimum decoding length when ct2 decoding length mode is source",
    )
    parser.add_argument("--warmup-batches", type=int, default=1, help="Warmup batches before measuring")
    parser.add_argument("--disable-cache", action="store_true", help="Disable translation cache during benchmarks")
    parser.add_argument(
        "--suite",
        choices=["baseline", "extended"],
        default="baseline",
        help="baseline keeps the previous all-scenarios summary; extended adds batch/concurrency/matrix sweeps",
    )
    parser.add_argument(
        "--batch-size-list",
        default="",
        help="Comma-separated batch sizes for extended suite; default 1,4,8,16,32,64",
    )
    parser.add_argument(
        "--concurrency-list",
        default="",
        help="Comma-separated concurrency levels for extended suite; default 1,2,4,8,16,64",
    )
    parser.add_argument(
        "--serial-items-per-case",
        type=int,
        default=512,
        help="Items per batch-size case in extended suite",
    )
    parser.add_argument(
        "--concurrency-requests-per-case",
        type=int,
        default=128,
        help="Requests per concurrency or matrix case in extended suite",
    )
    parser.add_argument(
        "--concurrency-batch-size",
        type=int,
        default=1,
        help="Batch size used by the dedicated concurrency sweep",
    )
    parser.add_argument(
        "--max-batch-concurrency-product",
        type=int,
        default=128,
        help="Skip matrix cases where batch_size * concurrency exceeds this value; 0 disables the limit",
    )
    return parser.parse_args()


def parse_csv_ints(raw: str, fallback: Sequence[int]) -> List[int]:
    if not raw.strip():
        return list(fallback)
    values: List[int] = []
    for item in raw.split(","):
        stripped = item.strip()
        if not stripped:
            continue
        value = int(stripped)
        if value <= 0:
            raise ValueError(f"Expected positive integer, got {value}")
        values.append(value)
    if not values:
        raise ValueError("Parsed empty integer list")
    return values


def load_texts(csv_path: Path, column: str, limit: int) -> List[str]:
    texts: List[str] = []
    with csv_path.open("r", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        for row in reader:
            value = (row.get(column) or "").strip()
            if value:
                texts.append(value)
            if limit > 0 and len(texts) >= limit:
                break
    if not texts:
        raise ValueError(f"No non-empty texts found in column '{column}' from {csv_path}")
    return texts


def batched(values: Sequence[str], batch_size: int) -> Iterable[List[str]]:
    for start in range(0, len(values), batch_size):
        yield list(values[start:start + batch_size])


def percentile(values: List[float], p: float) -> float:
    if not values:
        return 0.0
    ordered = sorted(values)
    if len(values) == 1:
        return float(ordered[0])
    idx = (len(ordered) - 1) * p
    lower = math.floor(idx)
    upper = math.ceil(idx)
    if lower == upper:
        return float(ordered[lower])
    return float(ordered[lower] + (ordered[upper] - ordered[lower]) * (idx - lower))
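
# Worked example for the interpolation above (illustration only, not executed by the benchmark):
# percentile([10.0, 20.0, 30.0, 40.0], 0.95) == 38.5, because idx = 3 * 0.95 = 2.85
# interpolates between ordered[2] = 30.0 and ordered[3] = 40.0.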


def resolve_output_dir(output_dir: str) -> Path:
    if output_dir:
        path = Path(output_dir)
    else:
        path = PROJECT_ROOT / "perf_reports" / datetime.now().strftime("%Y%m%d") / "translation_local_models"
    path.mkdir(parents=True, exist_ok=True)
    return path


def build_environment_info() -> Dict[str, Any]:
    gpu_name = None
    gpu_total_mem_gb = None
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        props = torch.cuda.get_device_properties(0)
        gpu_total_mem_gb = round(props.total_memory / (1024 ** 3), 2)
    return {
        "python": platform.python_version(),
        "torch": torch.__version__,
        "transformers": transformers.__version__,
        "cuda_available": torch.cuda.is_available(),
        "gpu_name": gpu_name,
        "gpu_total_mem_gb": gpu_total_mem_gb,
        "platform": platform.platform(),
    }


def scenario_from_args(args: argparse.Namespace) -> Dict[str, str]:
    return {
        "name": f"{args.model} {args.source_lang}->{args.target_lang}",
        "model": args.model,
        "source_lang": args.source_lang,
        "target_lang": args.target_lang,
        "column": args.column,
        "scene": args.scene,
    }


def build_config_and_capability(
    args: argparse.Namespace,
    *,
    batch_size_override: int | None = None,
) -> tuple[Dict[str, Any], Dict[str, Any]]:
    config = copy.deepcopy(get_translation_config())
    for name, cfg in config["capabilities"].items():
        cfg["enabled"] = name == args.model
    config["default_model"] = args.model
    capability = get_translation_capability(config, args.model, require_enabled=False)
    if args.device_override:
        capability["device"] = args.device_override
    if args.torch_dtype_override:
        capability["torch_dtype"] = args.torch_dtype_override
    if batch_size_override is not None:
        capability["batch_size"] = batch_size_override
    elif args.batch_size:
        capability["batch_size"] = args.batch_size
    if args.max_new_tokens:
        capability["max_new_tokens"] = args.max_new_tokens
    if args.num_beams:
        capability["num_beams"] = args.num_beams
    if args.attn_implementation:
        capability["attn_implementation"] = args.attn_implementation
    if args.ct2_inter_threads >= 0:
        capability["ct2_inter_threads"] = args.ct2_inter_threads
    if args.ct2_intra_threads >= 0:
        capability["ct2_intra_threads"] = args.ct2_intra_threads
    if args.ct2_max_queued_batches >= 0:
        capability["ct2_max_queued_batches"] = args.ct2_max_queued_batches
    if args.ct2_batch_type:
        capability["ct2_batch_type"] = args.ct2_batch_type
    if args.ct2_decoding_length_mode:
        capability["ct2_decoding_length_mode"] = args.ct2_decoding_length_mode
    if args.ct2_decoding_length_extra:
        capability["ct2_decoding_length_extra"] = args.ct2_decoding_length_extra
    if args.ct2_decoding_length_min:
        capability["ct2_decoding_length_min"] = args.ct2_decoding_length_min
    if args.disable_cache:
        capability["use_cache"] = False
    config["capabilities"][args.model] = capability
    return config, capability


def ensure_cuda_stats_reset() -> None:
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()


def build_memory_metrics() -> Dict[str, Any]:
    peak_gpu_mem_gb = None
    peak_gpu_reserved_gb = None
    if torch.cuda.is_available():
        peak_gpu_mem_gb = round(torch.cuda.max_memory_allocated() / (1024 ** 3), 3)
        peak_gpu_reserved_gb = round(torch.cuda.max_memory_reserved() / (1024 ** 3), 3)
    max_rss_mb = round(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024, 2)
    return {
        "max_rss_mb": max_rss_mb,
        "peak_gpu_memory_gb": peak_gpu_mem_gb,
        "peak_gpu_reserved_gb": peak_gpu_reserved_gb,
    }


def make_request_payload(batch: Sequence[str]) -> str | List[str]:
    if len(batch) == 1:
        return batch[0]
    return list(batch)
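

# Single-stream case: warm-up replays the first batches before timing, then the timed loop
# translates every batch sequentially, recording per-request latency and aggregate throughput.
# Warmed batches are timed again, so they may be answered from the translation cache unless
# --disable-cache is passed (actual caching behaviour depends on the service backend).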
def benchmark_serial_case(
    *,
    service: TranslationService,
    backend: Any,
    scenario: Dict[str, str],
    capability: Dict[str, Any],
    texts: List[str],
    batch_size: int,
    warmup_batches: int,
) -> Dict[str, Any]:
    backend.batch_size = batch_size
    measured_batches = list(batched(texts, batch_size))
    warmup_count = min(max(warmup_batches, 0), len(measured_batches))
    for batch in measured_batches[:warmup_count]:
        service.translate(
            text=make_request_payload(batch),
            source_lang=scenario["source_lang"],
            target_lang=scenario["target_lang"],
            model=scenario["model"],
            scene=scenario["scene"],
        )

    batch_latencies_ms: List[float] = []
    success_count = 0
    failure_count = 0
    output_chars = 0
    total_input_chars = sum(len(text) for text in texts)

    start = time.perf_counter()
    for batch in measured_batches:
        batch_start = time.perf_counter()
        outputs = service.translate(
            text=make_request_payload(batch),
            source_lang=scenario["source_lang"],
            target_lang=scenario["target_lang"],
            model=scenario["model"],
            scene=scenario["scene"],
        )
        elapsed_ms = (time.perf_counter() - batch_start) * 1000
        batch_latencies_ms.append(elapsed_ms)
        if isinstance(outputs, list):
            result_items = outputs
        else:
            result_items = [outputs]
        for item in result_items:
            if item is None:
                failure_count += 1
            else:
                success_count += 1
                output_chars += len(item)
    translate_seconds = time.perf_counter() - start

    total_items = len(texts)
    memory = build_memory_metrics()
    return {
        "mode": "serial_batch",
        "batch_size": batch_size,
        "concurrency": 1,
        "rows": total_items,
        "requests": len(measured_batches),
        "input_chars": total_input_chars,
        "load_seconds": 0.0,
        "translate_seconds": round(translate_seconds, 4),
        "total_seconds": round(translate_seconds, 4),
        "batch_count": len(batch_latencies_ms),
        "request_latency_p50_ms": round(percentile(batch_latencies_ms, 0.50), 2),
        "request_latency_p95_ms": round(percentile(batch_latencies_ms, 0.95), 2),
        "request_latency_max_ms": round(max(batch_latencies_ms), 2),
        "avg_request_latency_ms": round(statistics.fmean(batch_latencies_ms), 2),
        "avg_item_latency_ms": round((translate_seconds / total_items) * 1000, 3),
        "requests_per_second": round(len(measured_batches) / translate_seconds, 2),
        "items_per_second": round(total_items / translate_seconds, 2),
        "input_chars_per_second": round(total_input_chars / translate_seconds, 2),
        "output_chars_per_second": round(output_chars / translate_seconds, 2),
        "success_count": success_count,
        "failure_count": failure_count,
        "success_rate": round(success_count / total_items, 6),
        "device": str(getattr(backend, "device", capability.get("device", "unknown"))),
        "torch_dtype": str(getattr(backend, "torch_dtype", capability.get("torch_dtype", "unknown"))),
        "configured_batch_size": int(capability.get("batch_size") or batch_size),
        "used_batch_size": batch_size,
        "warmup_batches": warmup_count,
        **memory,
    }
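

# Concurrency case: fixed-size request batches are submitted to a ThreadPoolExecutor with
# `concurrency` workers; latency is measured per request inside each worker, while throughput
# is derived from the overall wall-clock time of the pool.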
def benchmark_concurrency_case(
    *,
    service: TranslationService,
    backend: Any,
    scenario: Dict[str, str],
    capability: Dict[str, Any],
    texts: List[str],
    batch_size: int,
    concurrency: int,
    requests_per_case: int,
    warmup_batches: int,
) -> Dict[str, Any]:
    backend.batch_size = batch_size
    required_items = batch_size * requests_per_case
    case_texts = texts[:required_items]
    request_batches = list(batched(case_texts, batch_size))
    if not request_batches:
        raise ValueError("No request batches prepared for concurrency benchmark")
    warmup_count = min(max(warmup_batches, 0), len(request_batches))
    for batch in request_batches[:warmup_count]:
        service.translate(
            text=make_request_payload(batch),
            source_lang=scenario["source_lang"],
            target_lang=scenario["target_lang"],
            model=scenario["model"],
            scene=scenario["scene"],
        )

    request_latencies_ms: List[float] = []
    success_count = 0
    failure_count = 0
    output_chars = 0
    total_input_chars = sum(len(text) for text in case_texts)

    def worker(batch: List[str]) -> tuple[float, int, int, int]:
        started = time.perf_counter()
        outputs = service.translate(
            text=make_request_payload(batch),
            source_lang=scenario["source_lang"],
            target_lang=scenario["target_lang"],
            model=scenario["model"],
            scene=scenario["scene"],
        )
        elapsed_ms = (time.perf_counter() - started) * 1000
        if isinstance(outputs, list):
            result_items = outputs
        else:
            result_items = [outputs]
        local_success = 0
        local_failure = 0
        local_output_chars = 0
        for item in result_items:
            if item is None:
                local_failure += 1
            else:
                local_success += 1
                local_output_chars += len(item)
        return elapsed_ms, local_success, local_failure, local_output_chars

    wall_start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(worker, batch) for batch in request_batches]
        for future in concurrent.futures.as_completed(futures):
            latency_ms, local_success, local_failure, local_output_chars = future.result()
            request_latencies_ms.append(latency_ms)
            success_count += local_success
            failure_count += local_failure
            output_chars += local_output_chars
    wall_seconds = time.perf_counter() - wall_start

    total_items = len(case_texts)
    memory = build_memory_metrics()
    return {
        "mode": "concurrency",
        "batch_size": batch_size,
        "concurrency": concurrency,
        "rows": total_items,
        "requests": len(request_batches),
        "input_chars": total_input_chars,
        "load_seconds": 0.0,
        "translate_seconds": round(wall_seconds, 4),
        "total_seconds": round(wall_seconds, 4),
        "batch_count": len(request_latencies_ms),
        "request_latency_p50_ms": round(percentile(request_latencies_ms, 0.50), 2),
        "request_latency_p95_ms": round(percentile(request_latencies_ms, 0.95), 2),
        "request_latency_max_ms": round(max(request_latencies_ms), 2),
        "avg_request_latency_ms": round(statistics.fmean(request_latencies_ms), 2),
        "avg_item_latency_ms": round((wall_seconds / total_items) * 1000, 3),
        "requests_per_second": round(len(request_batches) / wall_seconds, 2),
        "items_per_second": round(total_items / wall_seconds, 2),
        "input_chars_per_second": round(total_input_chars / wall_seconds, 2),
        "output_chars_per_second": round(output_chars / wall_seconds, 2),
        "success_count": success_count,
        "failure_count": failure_count,
        "success_rate": round(success_count / total_items, 6),
        "device": str(getattr(backend, "device", capability.get("device", "unknown"))),
        "torch_dtype": str(getattr(backend, "torch_dtype", capability.get("torch_dtype", "unknown"))),
        "configured_batch_size": int(capability.get("batch_size") or batch_size),
        "used_batch_size": batch_size,
        "warmup_batches": warmup_count,
        **memory,
    }


def benchmark_single_scenario(args: argparse.Namespace) -> Dict[str, Any]:
    csv_path = (PROJECT_ROOT / args.csv_path).resolve() if not Path(args.csv_path).is_absolute() else Path(args.csv_path)
    scenario = scenario_from_args(args)
    config, capability = build_config_and_capability(args)
    configured_batch_size = int(capability.get("batch_size") or 1)
    batch_size = configured_batch_size
    texts = load_texts(csv_path, args.column, args.limit)

    ensure_cuda_stats_reset()
    load_start = time.perf_counter()
    service = TranslationService(config)
    backend = service.get_backend(args.model)
    load_seconds = time.perf_counter() - load_start

    runtime = benchmark_serial_case(
        service=service,
        backend=backend,
        scenario=scenario,
        capability=capability,
        texts=texts,
        batch_size=batch_size,
        warmup_batches=args.warmup_batches,
    )
    runtime["load_seconds"] = round(load_seconds, 4)
    runtime["total_seconds"] = round(runtime["load_seconds"] + runtime["translate_seconds"], 4)
    return {
        "scenario": scenario,
        "dataset": {
            "csv_path": str(csv_path),
            "rows": len(texts),
            "input_chars": sum(len(text) for text in texts),
        },
        "runtime": runtime,
    }
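

# Extended suite for one scenario: load enough rows for the largest case, then run a serial
# batch-size sweep, a concurrency sweep at a fixed request batch size, and a batch x concurrency
# matrix (combinations whose product exceeds --max-batch-concurrency-product are skipped).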
def benchmark_extended_scenario(args: argparse.Namespace) -> Dict[str, Any]:
    csv_path = (PROJECT_ROOT / args.csv_path).resolve() if not Path(args.csv_path).is_absolute() else Path(args.csv_path)
    scenario = scenario_from_args(args)
    batch_sizes = parse_csv_ints(args.batch_size_list, DEFAULT_BATCH_SIZES)
    concurrencies = parse_csv_ints(args.concurrency_list, DEFAULT_CONCURRENCIES)
    largest_batch = max(batch_sizes + [args.concurrency_batch_size])
    largest_concurrency = max(concurrencies)
    max_product = args.max_batch_concurrency_product
    required_items = max(
        args.limit or 0,
        max(args.serial_items_per_case, largest_batch),
        args.concurrency_requests_per_case * args.concurrency_batch_size,
        largest_batch * args.concurrency_requests_per_case,
    )
    texts = load_texts(csv_path, args.column, required_items)

    config, capability = build_config_and_capability(args)
    ensure_cuda_stats_reset()
    load_start = time.perf_counter()
    service = TranslationService(config)
    backend = service.get_backend(args.model)
    load_seconds = time.perf_counter() - load_start

    batch_sweep: List[Dict[str, Any]] = []
    concurrency_sweep: List[Dict[str, Any]] = []
    matrix_results: List[Dict[str, Any]] = []

    for batch_size in batch_sizes:
        case_texts = texts[: max(batch_size, args.serial_items_per_case)]
        batch_sweep.append(
            benchmark_serial_case(
                service=service,
                backend=backend,
                scenario=scenario,
                capability=capability,
                texts=case_texts,
                batch_size=batch_size,
                warmup_batches=args.warmup_batches,
            )
        )

    for concurrency in concurrencies:
        concurrency_sweep.append(
            benchmark_concurrency_case(
                service=service,
                backend=backend,
                scenario=scenario,
                capability=capability,
                texts=texts,
                batch_size=args.concurrency_batch_size,
                concurrency=concurrency,
                requests_per_case=args.concurrency_requests_per_case,
                warmup_batches=args.warmup_batches,
            )
        )

    for batch_size in batch_sizes:
        for concurrency in concurrencies:
            if max_product > 0 and batch_size * concurrency > max_product:
                continue
            matrix_results.append(
                benchmark_concurrency_case(
                    service=service,
                    backend=backend,
                    scenario=scenario,
                    capability=capability,
                    texts=texts,
                    batch_size=batch_size,
                    concurrency=concurrency,
                    requests_per_case=args.concurrency_requests_per_case,
                    warmup_batches=args.warmup_batches,
                )
            )

    for collection in (batch_sweep, concurrency_sweep, matrix_results):
        for idx, item in enumerate(collection):
            item["load_seconds"] = round(load_seconds if idx == 0 else 0.0, 4)
            item["total_seconds"] = round(item["load_seconds"] + item["translate_seconds"], 4)

    return {
        "scenario": scenario,
        "dataset": {
            "csv_path": str(csv_path),
            "rows_loaded": len(texts),
        },
        "config": {
            "batch_sizes": batch_sizes,
            "concurrencies": concurrencies,
            "serial_items_per_case": args.serial_items_per_case,
            "concurrency_requests_per_case": args.concurrency_requests_per_case,
            "concurrency_batch_size": args.concurrency_batch_size,
            "max_batch_concurrency_product": max_product,
            "cache_disabled": bool(args.disable_cache),
        },
        "runtime_defaults": {
            "device": str(getattr(backend, "device", capability.get("device", "unknown"))),
            "torch_dtype": str(getattr(backend, "torch_dtype", capability.get("torch_dtype", "unknown"))),
            "configured_batch_size": int(capability.get("batch_size") or 1),
            "load_seconds": round(load_seconds, 4),
        },
        "batch_sweep": batch_sweep,
        "concurrency_sweep": concurrency_sweep,
        "matrix": matrix_results,
    }
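

# Driver for the full scenario list: each scenario runs in a child process that prints a single
# "JSON_RESULT=" line; the parent scans stdout for that marker and aggregates the payloads.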
def run_all_scenarios(args: argparse.Namespace) -> Dict[str, Any]:
    report = {
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "suite": args.suite,
        "environment": build_environment_info(),
        "scenarios": [],
    }
    for scenario in SCENARIOS:
        cmd = [
            sys.executable,
            str(Path(__file__).resolve()),
            "--single",
            "--csv-path", args.csv_path,
            "--model", scenario["model"],
            "--source-lang", scenario["source_lang"],
            "--target-lang", scenario["target_lang"],
            "--column", scenario["column"],
            "--scene", scenario["scene"],
            "--warmup-batches", str(args.warmup_batches),
            "--suite", args.suite,
            "--serial-items-per-case", str(args.serial_items_per_case),
            "--concurrency-requests-per-case", str(args.concurrency_requests_per_case),
            "--concurrency-batch-size", str(args.concurrency_batch_size),
            "--max-batch-concurrency-product", str(args.max_batch_concurrency_product),
        ]
        if args.limit:
            cmd.extend(["--limit", str(args.limit)])
        if args.batch_size:
            cmd.extend(["--batch-size", str(args.batch_size)])
        if args.batch_size_list:
            cmd.extend(["--batch-size-list", args.batch_size_list])
        if args.concurrency_list:
            cmd.extend(["--concurrency-list", args.concurrency_list])
        if args.device_override:
            cmd.extend(["--device-override", args.device_override])
        if args.torch_dtype_override:
            cmd.extend(["--torch-dtype-override", args.torch_dtype_override])
        if args.max_new_tokens:
            cmd.extend(["--max-new-tokens", str(args.max_new_tokens)])
        if args.num_beams:
            cmd.extend(["--num-beams", str(args.num_beams)])
        if args.attn_implementation:
            cmd.extend(["--attn-implementation", args.attn_implementation])
        if args.ct2_inter_threads >= 0:
            cmd.extend(["--ct2-inter-threads", str(args.ct2_inter_threads)])
        if args.ct2_intra_threads >= 0:
            cmd.extend(["--ct2-intra-threads", str(args.ct2_intra_threads)])
        if args.ct2_max_queued_batches >= 0:
            cmd.extend(["--ct2-max-queued-batches", str(args.ct2_max_queued_batches)])
        if args.ct2_batch_type:
            cmd.extend(["--ct2-batch-type", args.ct2_batch_type])
        if args.ct2_decoding_length_mode:
            cmd.extend(["--ct2-decoding-length-mode", args.ct2_decoding_length_mode])
        if args.ct2_decoding_length_extra:
            cmd.extend(["--ct2-decoding-length-extra", str(args.ct2_decoding_length_extra)])
        if args.ct2_decoding_length_min:
            cmd.extend(["--ct2-decoding-length-min", str(args.ct2_decoding_length_min)])
        if args.disable_cache:
            cmd.append("--disable-cache")

        completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
        result_line = ""
        for line in reversed(completed.stdout.splitlines()):
            if line.startswith("JSON_RESULT="):
                result_line = line
                break
        if not result_line:
            raise RuntimeError(f"Scenario output missing JSON_RESULT marker:\n{completed.stdout}\n{completed.stderr}")
        payload = json.loads(result_line.split("=", 1)[1])
        payload["scenario"]["name"] = scenario["name"]
        report["scenarios"].append(payload)
    return report


def render_baseline_markdown_report(report: Dict[str, Any]) -> str:
    lines = [
        "# Local Translation Model Benchmark",
        "",
        f"- Generated at: `{report['generated_at']}`",
        f"- Suite: `{report['suite']}`",
        f"- Python: `{report['environment']['python']}`",
        f"- Torch: `{report['environment']['torch']}`",
        f"- Transformers: `{report['environment']['transformers']}`",
        f"- CUDA: `{report['environment']['cuda_available']}`",
    ]
    if report["environment"]["gpu_name"]:
        lines.append(f"- GPU: `{report['environment']['gpu_name']}` ({report['environment']['gpu_total_mem_gb']} GiB)")
    lines.extend(
        [
            "",
            "| Scenario | Items/s | Avg item ms | Req p50 ms | Req p95 ms | Load s | Peak GPU GiB | Success |",
            "|---|---:|---:|---:|---:|---:|---:|---:|",
        ]
    )
    for item in report["scenarios"]:
        runtime = item["runtime"]
        lines.append(
            "| {name} | {items_per_second} | {avg_item_latency_ms} | {request_latency_p50_ms} "
            "| {request_latency_p95_ms} | {load_seconds} | {peak_gpu_memory_gb} | {success_rate} |".format(
                name=item["scenario"]["name"],
                items_per_second=runtime["items_per_second"],
                avg_item_latency_ms=runtime["avg_item_latency_ms"],
                request_latency_p50_ms=runtime["request_latency_p50_ms"],
                request_latency_p95_ms=runtime["request_latency_p95_ms"],
                load_seconds=runtime["load_seconds"],
                peak_gpu_memory_gb=runtime["peak_gpu_memory_gb"],
                success_rate=runtime["success_rate"],
            )
        )
    lines.append("")
    for item in report["scenarios"]:
        runtime = item["runtime"]
        dataset = item["dataset"]
        lines.extend(
            [
                f"## {item['scenario']['name']}",
                "",
                f"- Dataset rows: `{dataset['rows']}` from column `{item['scenario']['column']}`",
                f"- Direction: `{item['scenario']['source_lang']} -> {item['scenario']['target_lang']}`",
                f"- Batch size: configured `{runtime['configured_batch_size']}`, used `{runtime['used_batch_size']}`",
                f"- Load time: `{runtime['load_seconds']} s`",
                f"- Translate time: `{runtime['translate_seconds']} s`",
                f"- Throughput: `{runtime['items_per_second']} items/s`, `{runtime['input_chars_per_second']} input chars/s`",
                f"- Latency: avg item `{runtime['avg_item_latency_ms']} ms`, req p50 `{runtime['request_latency_p50_ms']} ms`, req p95 `{runtime['request_latency_p95_ms']} ms`, req max `{runtime['request_latency_max_ms']} ms`",
                f"- Memory: max RSS `{runtime['max_rss_mb']} MB`, peak GPU allocated `{runtime['peak_gpu_memory_gb']} GiB`, peak GPU reserved `{runtime['peak_gpu_reserved_gb']} GiB`",
                f"- Success: `{runtime['success_count']}/{dataset['rows']}`",
                "",
            ]
        )
    return "\n".join(lines)


def render_case_table(
    title: str,
    rows: Sequence[Dict[str, Any]],
    *,
    include_batch: bool,
    include_concurrency: bool,
) -> List[str]:
    headers = ["Rows", "Requests", "Items/s", "Req/s", "Avg req ms", "Req p50 ms", "Req p95 ms", "Peak GPU GiB"]
    prefix_headers: List[str] = []
    if include_batch:
        prefix_headers.append("Batch")
    if include_concurrency:
        prefix_headers.append("Concurrency")
    headers = prefix_headers + headers
    lines = [f"### {title}", ""]
    lines.append("| " + " | ".join(headers) + " |")
    lines.append("|" + "|".join(["---:"] * len(headers)) + "|")
    for item in rows:
        values: List[str] = []
        if include_batch:
            values.append(str(item["batch_size"]))
        if include_concurrency:
            values.append(str(item["concurrency"]))
        values.extend(
            [
                str(item["rows"]),
                str(item["requests"]),
                str(item["items_per_second"]),
                str(item["requests_per_second"]),
                str(item["avg_request_latency_ms"]),
                str(item["request_latency_p50_ms"]),
                str(item["request_latency_p95_ms"]),
                str(item["peak_gpu_memory_gb"]),
            ]
        )
        lines.append("| " + " | ".join(values) + " |")
    lines.append("")
    return lines


def render_extended_markdown_report(report: Dict[str, Any]) -> str:
    lines = [
        "# Local Translation Model Extended Benchmark",
        "",
        f"- Generated at: `{report['generated_at']}`",
        f"- Suite: `{report['suite']}`",
        f"- Python: `{report['environment']['python']}`",
        f"- Torch: `{report['environment']['torch']}`",
        f"- Transformers: `{report['environment']['transformers']}`",
        f"- CUDA: `{report['environment']['cuda_available']}`",
    ]
    if report["environment"]["gpu_name"]:
        lines.append(f"- GPU: `{report['environment']['gpu_name']}` ({report['environment']['gpu_total_mem_gb']} GiB)")
    lines.extend(
        [
            "",
            "## Reading Guide",
            "",
            "- `batch_sweep`: single stream only (`concurrency=1`), used to compare bulk translation efficiency across batch sizes.",
            "- `concurrency_sweep`: fixed request batch size, used to compare online request latency and throughput as concurrency rises.",
            "- `matrix`: combined `batch_size x concurrency` runs, filtered by `batch_size * concurrency <= limit` when configured.",
            "",
        ]
    )
    for item in report["scenarios"]:
        lines.extend(
            [
                f"## {item['scenario']['name']}",
                "",
                f"- Direction: `{item['scenario']['source_lang']} -> {item['scenario']['target_lang']}`",
                f"- Column: `{item['scenario']['column']}`",
                f"- Loaded rows: `{item['dataset']['rows_loaded']}`",
                f"- Load time: `{item['runtime_defaults']['load_seconds']} s`",
                f"- Device: `{item['runtime_defaults']['device']}`",
                f"- DType: `{item['runtime_defaults']['torch_dtype']}`",
                f"- Cache disabled: `{item['config']['cache_disabled']}`",
                "",
            ]
        )
        lines.extend(render_case_table("Batch Sweep (`concurrency=1`)", item["batch_sweep"], include_batch=True, include_concurrency=False))
        lines.extend(
            render_case_table(
                f"Concurrency Sweep (`batch_size={item['config']['concurrency_batch_size']}`)",
                item["concurrency_sweep"],
                include_batch=False,
                include_concurrency=True,
            )
        )
        lines.extend(render_case_table("Batch x Concurrency Matrix", item["matrix"], include_batch=True, include_concurrency=True))
    return "\n".join(lines)


def render_markdown_report(report: Dict[str, Any]) -> str:
    if report["suite"] == "extended":
        return render_extended_markdown_report(report)
    return render_baseline_markdown_report(report)


def main() -> None:
    args = parse_args()
    if args.single:
        if args.suite == "extended":
            result = benchmark_extended_scenario(args)
        else:
            result = benchmark_single_scenario(args)
        print("JSON_RESULT=" + json.dumps(result, ensure_ascii=False))
        return

    report = run_all_scenarios(args)
    output_dir = resolve_output_dir(args.output_dir)
    timestamp = datetime.now().strftime("%H%M%S")
    suffix = "extended" if args.suite == "extended" else "baseline"
    json_path = output_dir / f"translation_local_models_{suffix}_{timestamp}.json"
    md_path = output_dir / f"translation_local_models_{suffix}_{timestamp}.md"
    json_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
    md_path.write_text(render_markdown_report(report), encoding="utf-8")
    print(f"JSON report: {json_path}")
    print(f"Markdown report: {md_path}")
    for item in report["scenarios"]:
        if args.suite == "extended":
            best_batch = max(item["batch_sweep"], key=lambda x: x["items_per_second"])
            best_concurrency = max(item["concurrency_sweep"], key=lambda x: x["items_per_second"])
            print(
                f"{item['scenario']['name']}: "
                f"best_batch={best_batch['batch_size']} ({best_batch['items_per_second']} items/s) | "
                f"best_concurrency={best_concurrency['concurrency']} ({best_concurrency['items_per_second']} items/s @ batch={best_concurrency['batch_size']})"
            )
        else:
            runtime = item["runtime"]
            print(
                f"{item['scenario']['name']}: "
                f"{runtime['items_per_second']} items/s | "
                f"avg_item={runtime['avg_item_latency_ms']} ms | "
                f"p95_req={runtime['request_latency_p95_ms']} ms | "
                f"load={runtime['load_seconds']} s"
            )


if __name__ == "__main__":
    main()
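
# Example invocations (the script filename below is illustrative; all flags are defined in parse_args):
#   python benchmark_translation_local_models.py --suite baseline --limit 200
#   python benchmark_translation_local_models.py --suite extended --batch-size-list 1,8,32 --concurrency-list 1,4,16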