#!/usr/bin/env python3
from __future__ import annotations

import argparse
import copy
import csv
import json
import math
import random
import re
import shutil
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Sequence

import numpy as np
import requests
import yaml

try:
    from sklearn.gaussian_process import GaussianProcessRegressor
    from sklearn.gaussian_process.kernels import ConstantKernel, Matern, WhiteKernel
except Exception:  # noqa: BLE001
    GaussianProcessRegressor = None  # type: ignore[assignment]
    ConstantKernel = None  # type: ignore[assignment]
    Matern = None  # type: ignore[assignment]
    WhiteKernel = None  # type: ignore[assignment]

PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from scripts.evaluation.eval_framework import (  # noqa: E402
    DEFAULT_ARTIFACT_ROOT,
    DEFAULT_QUERY_FILE,
    ensure_dir,
    utc_now_iso,
    utc_timestamp,
)
from scripts.evaluation.eval_framework.datasets import resolve_dataset

CONFIG_PATH = PROJECT_ROOT / "config" / "config.yaml"
LOG_DIR = PROJECT_ROOT / "logs"


@dataclass
class ExperimentSpec:
    name: str
    description: str
    params: Dict[str, Any]


@dataclass
class ParameterSpec:
    name: str
    lower: float
    upper: float
    scale: str = "linear"
    round_digits: int = 6

    def __post_init__(self) -> None:
        if self.lower >= self.upper:
            raise ValueError(f"invalid bounds for {self.name}: {self.lower} >= {self.upper}")
        if self.scale not in {"linear", "log"}:
            raise ValueError(f"unsupported scale={self.scale!r} for {self.name}")
        if self.scale == "log" and (self.lower <= 0 or self.upper <= 0):
            raise ValueError(f"log-scaled parameter {self.name} must have positive bounds")

    @property
    def transformed_lower(self) -> float:
        return math.log10(self.lower) if self.scale == "log" else self.lower

    @property
    def transformed_upper(self) -> float:
        return math.log10(self.upper) if self.scale == "log" else self.upper

    @property
    def transformed_span(self) -> float:
        return self.transformed_upper - self.transformed_lower

    def transform(self, value: float) -> float:
        clipped = min(max(float(value), self.lower), self.upper)
        return math.log10(clipped) if self.scale == "log" else clipped

    def inverse_transform(self, value: float) -> float:
        raw = (10 ** value) if self.scale == "log" else value
        raw = min(max(float(raw), self.lower), self.upper)
        return round(raw, self.round_digits)

    def sample_uniform(self, rng: random.Random) -> float:
        draw = rng.uniform(self.transformed_lower, self.transformed_upper)
        return self.inverse_transform(draw)


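# Illustrative note (the numbers are hypothetical, not from any shipped search
# space): a ParameterSpec with lower=0.01, upper=10.0, scale="log" is sampled
# uniformly in log10 space, i.e. a draw in [-2.0, 1.0], then mapped back through
# 10 ** draw, clipped to the bounds, and rounded to round_digits.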
@dataclass
class SearchSpace:
    target_path: str
    baseline: Dict[str, float]
    parameters: List[ParameterSpec]
    seed_experiments: List[ExperimentSpec]
    init_random: int = 6
    candidate_pool_size: int = 256
    explore_probability: float = 0.25
    local_jitter_probability: float = 0.45
    elite_fraction: float = 0.35
    min_normalized_distance: float = 0.14

    @property
    def parameter_names(self) -> List[str]:
        return [item.name for item in self.parameters]

    def fill_params(self, params: Dict[str, Any]) -> Dict[str, float]:
        merged = {name: float(self.baseline[name]) for name in self.parameter_names}
        for name, value in params.items():
            if name not in merged:
                raise KeyError(f"unknown parameter in search space: {name}")
            merged[name] = float(value)
        return {
            spec.name: spec.inverse_transform(spec.transform(float(merged[spec.name])))
            for spec in self.parameters
        }

    def sample_random(self, rng: random.Random) -> Dict[str, float]:
        return {spec.name: spec.sample_uniform(rng) for spec in self.parameters}

    def vectorize(self, params: Dict[str, Any]) -> np.ndarray:
        merged = self.fill_params(params)
        return np.array([spec.transform(float(merged[spec.name])) for spec in self.parameters], dtype=float)

    def from_vector(self, vector: Sequence[float]) -> Dict[str, float]:
        return {
            spec.name: spec.inverse_transform(float(vector[idx]))
            for idx, spec in enumerate(self.parameters)
        }

    def normalized_vector(self, params: Dict[str, Any]) -> np.ndarray:
        vector = self.vectorize(params)
        parts: List[float] = []
        for idx, spec in enumerate(self.parameters):
            parts.append((vector[idx] - spec.transformed_lower) / max(spec.transformed_span, 1e-9))
        return np.array(parts, dtype=float)

    def canonical_key(self, params: Dict[str, Any]) -> str:
        return json.dumps(self.fill_params(params), ensure_ascii=False, sort_keys=True)


@dataclass
class CandidateProposal:
    name: str
    description: str
    params: Dict[str, float]
    source: str


def load_yaml(path: Path) -> Dict[str, Any]:
    return yaml.safe_load(path.read_text(encoding="utf-8"))


def write_yaml(path: Path, payload: Dict[str, Any]) -> None:
    path.write_text(
        yaml.safe_dump(payload, sort_keys=False, allow_unicode=True),
        encoding="utf-8",
    )


def get_nested_value(payload: Dict[str, Any], dotted_path: str) -> Any:
    current: Any = payload
    for part in dotted_path.split("."):
        current = current[part]
    return current


def set_nested_value(payload: Dict[str, Any], dotted_path: str, value: Any) -> None:
    current = payload
    parts = dotted_path.split(".")
    for part in parts[:-1]:
        current = current[part]
    current[parts[-1]] = value


def apply_target_params(base_config: Dict[str, Any], target_path: str, params: Dict[str, Any]) -> Dict[str, Any]:
    candidate = copy.deepcopy(base_config)
    for key, value in params.items():
        set_nested_value(candidate, f"{target_path}.{key}", value)
    return candidate


def read_queries(path: Path) -> List[str]:
    return [
        line.strip()
        for line in path.read_text(encoding="utf-8").splitlines()
        if line.strip() and not line.strip().startswith("#")
    ]


def run_restart(targets: Sequence[str]) -> None:
    cmd = ["./restart.sh", *targets]
    subprocess.run(cmd, cwd=PROJECT_ROOT, check=True, timeout=900)


def bytes_to_gib(value: int) -> float:
    return float(value) / float(1024 ** 3)


def get_free_disk_bytes(path: Path) -> int:
    return int(shutil.disk_usage(path).free)


def iter_log_cleanup_candidates() -> List[Path]:
    if not LOG_DIR.is_dir():
        return []
    items: List[Path] = []
    seen: set[str] = set()
    for path in LOG_DIR.rglob("*"):
        try:
            if not path.is_file():
                continue
            resolved = path.resolve()
            key = str(resolved)
            if key in seen:
                continue
            seen.add(key)
            items.append(resolved)
        except FileNotFoundError:
            continue
    items.sort(key=lambda item: item.stat().st_size if item.exists() else 0, reverse=True)
    return items


def truncate_file(path: Path) -> int:
    if not path.exists() or not path.is_file():
        return 0
    size = int(path.stat().st_size)
    if size <= 0:
        return 0
    with path.open("w", encoding="utf-8"):
        pass
    return size


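# ensure_disk_headroom below is called before every expensive step; when free space
# drops under --min-free-gb and auto-truncation is enabled it empties the largest
# files under logs/ (largest first) until the headroom target is met, otherwise it
# raises a RuntimeError.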
def ensure_disk_headroom(
    *,
    min_free_gb: float,
    auto_truncate_logs: bool,
    context: str,
) -> None:
    required_bytes = int(min_free_gb * (1024 ** 3))
    free_bytes = get_free_disk_bytes(PROJECT_ROOT)
    if free_bytes >= required_bytes:
        return
    print(
        f"[disk] low free space before {context}: "
        f"free={bytes_to_gib(free_bytes):.2f}GiB required={min_free_gb:.2f}GiB"
    )
    if not auto_truncate_logs:
        raise RuntimeError(
            f"insufficient disk headroom before {context}: "
            f"free={bytes_to_gib(free_bytes):.2f}GiB required={min_free_gb:.2f}GiB"
        )
    reclaimed_bytes = 0
    for candidate in iter_log_cleanup_candidates():
        try:
            reclaimed = truncate_file(candidate)
        except Exception as exc:  # noqa: BLE001
            print(f"[disk] skip truncate {candidate}: {exc}")
            continue
        if reclaimed <= 0:
            continue
        reclaimed_bytes += reclaimed
        free_bytes = get_free_disk_bytes(PROJECT_ROOT)
        print(
            f"[disk] truncated {candidate} reclaimed={bytes_to_gib(reclaimed):.2f}GiB "
            f"free_now={bytes_to_gib(free_bytes):.2f}GiB"
        )
        if free_bytes >= required_bytes:
            return
    raise RuntimeError(
        f"insufficient disk headroom after log truncation before {context}: "
        f"free={bytes_to_gib(free_bytes):.2f}GiB required={min_free_gb:.2f}GiB "
        f"reclaimed={bytes_to_gib(reclaimed_bytes):.2f}GiB"
    )


def wait_for_backend(base_url: str, timeout_sec: float = 300.0) -> Dict[str, Any]:
    deadline = time.time() + timeout_sec
    last_error: Any = None
    while time.time() < deadline:
        try:
            response = requests.get(f"{base_url.rstrip('/')}/health", timeout=10)
            response.raise_for_status()
            payload = response.json()
            if str(payload.get("status")) == "healthy":
                return payload
            last_error = payload
        except Exception as exc:  # noqa: BLE001
            last_error = str(exc)
        time.sleep(2.0)
    raise RuntimeError(f"backend did not become healthy: {last_error}")


def wait_for_eval_web(base_url: str, timeout_sec: float = 90.0) -> Dict[str, Any]:
    url = f"{base_url.rstrip('/')}/api/history"
    deadline = time.time() + timeout_sec
    last_error: Any = None
    while time.time() < deadline:
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            payload = response.json()
            if isinstance(payload, dict) and "history" in payload:
                return payload
            last_error = payload
        except Exception as exc:  # noqa: BLE001
            last_error = str(exc)
        time.sleep(2.0)
    raise RuntimeError(f"eval-web did not become healthy: {last_error}")


def ensure_eval_web(eval_web_base_url: str) -> Dict[str, Any]:
    try:
        return wait_for_eval_web(eval_web_base_url, timeout_sec=20.0)
    except Exception:  # noqa: BLE001
        run_restart(["eval-web"])
        return wait_for_eval_web(eval_web_base_url, timeout_sec=120.0)


def verify_backend_config(base_url: str, target_path: str, expected: Dict[str, Any], tol: float = 1e-6) -> bool:
    response = requests.get(f"{base_url.rstrip('/')}/admin/config", timeout=20)
    response.raise_for_status()
    payload = response.json()
    candidate_paths = [target_path]
    if not target_path.startswith("search."):
        candidate_paths.append(f"search.{target_path}")
    if target_path.startswith("search."):
        candidate_paths.append(target_path[len("search."):])
    live_block = None
    for path in candidate_paths:
        try:
            maybe_block = get_nested_value(payload, path)
        except Exception:  # noqa: BLE001
            continue
        if isinstance(maybe_block, dict):
            live_block = maybe_block
            break
    if live_block is None:
        raise RuntimeError(
            f"unable to resolve backend config path {target_path!r}; "
            f"tried={candidate_paths!r} top_level_keys={sorted(payload.keys())[:20]!r}"
        )
    for key, expected_value in expected.items():
        live_value = live_block[key]
        if isinstance(expected_value, (int, float)):
            if abs(float(live_value) - float(expected_value)) > tol:
                raise RuntimeError(
                    f"backend config mismatch for {target_path}.{key}: "
                    f"expected={expected_value} live={live_value}"
                )
        elif live_value != expected_value:
            raise RuntimeError(
                f"backend config mismatch for {target_path}.{key}: expected={expected_value!r} live={live_value!r}"
            )
    return True


def run_batch_eval(
    *,
    tenant_id: str,
    dataset_id: str | None,
    queries_file: Path,
    top_k: int,
    language: str,
    force_refresh_labels: bool,
) -> Dict[str, Any]:
    cmd = [
        str(PROJECT_ROOT / ".venv" / "bin" / "python"),
        "scripts/evaluation/build_annotation_set.py",
        "batch",
        "--tenant-id",
        str(tenant_id),
        "--top-k",
        str(top_k),
        "--language",
        language,
    ]
    if dataset_id:
        cmd.extend(["--dataset-id", dataset_id])
    else:
        cmd.extend(["--queries-file", str(queries_file)])
    if force_refresh_labels:
        cmd.append("--force-refresh-labels")
    completed = subprocess.run(
        cmd,
        cwd=PROJECT_ROOT,
        check=True,
        capture_output=True,
        text=True,
        timeout=7200,
    )
    output = (completed.stdout or "") + "\n" + (completed.stderr or "")
    batch_ids = re.findall(r"batch_id=([A-Za-z0-9_]+)", output)
    if not batch_ids:
        raise RuntimeError(f"failed to parse batch output: {output[-2000:]}")
    batch_id = batch_ids[-1]
    pattern = f"datasets/*/batch_reports/{batch_id}/report.json"
    matches = sorted(DEFAULT_ARTIFACT_ROOT.glob(pattern))
    batch_json_path = matches[0] if matches else (DEFAULT_ARTIFACT_ROOT / "batch_reports" / f"{batch_id}.json")
    if not batch_json_path.is_file():
        raise RuntimeError(f"batch json not found after eval: {batch_json_path}")
    payload = json.loads(batch_json_path.read_text(encoding="utf-8"))
    report_path = batch_json_path.with_name("report.md")
    if not report_path.is_file():
        report_path = DEFAULT_ARTIFACT_ROOT / "batch_reports" / f"{batch_id}.md"
    return {
        "batch_id": batch_id,
        "payload": payload,
        "raw_output": output,
        "batch_json_path": str(batch_json_path),
        "batch_report_path": str(report_path),
    }


def resolve_batch_json_path(path_like: str) -> Path:
    path = Path(path_like)
    if not path.is_absolute():
        path = (PROJECT_ROOT / path).resolve()
    if path.suffix == ".json":
        return path
    if path.suffix == ".md":
        candidate = path.with_suffix(".json")
        if candidate.is_file():
            return candidate
    if path.is_file():
        return path
    candidate = path.parent / f"{path.name}.json"
    if candidate.is_file():
        return candidate
    raise FileNotFoundError(f"cannot resolve batch json from: {path_like}")


def load_batch_payload(path_like: str) -> Dict[str, Any]:
    path = resolve_batch_json_path(path_like)
    return json.loads(path.read_text(encoding="utf-8"))


def load_experiments(path: Path) -> List[ExperimentSpec]:
    payload = json.loads(path.read_text(encoding="utf-8"))
    items = payload["experiments"] if isinstance(payload, dict) else payload
    experiments: List[ExperimentSpec] = []
    for item in items:
        experiments.append(
            ExperimentSpec(
                name=str(item["name"]),
                description=str(item.get("description") or ""),
                params=dict(item.get("params") or {}),
            )
        )
    return experiments


def load_search_space(path: Path) -> SearchSpace:
    payload = load_yaml(path)
    parameters = [
        ParameterSpec(
            name=str(name),
            lower=float(spec["min"]),
            upper=float(spec["max"]),
            scale=str(spec.get("scale", "linear")),
            round_digits=int(spec.get("round", 6)),
        )
        for name, spec in dict(payload["parameters"]).items()
    ]
    baseline = {str(key): float(value) for key, value in dict(payload["baseline"]).items()}
    seed_experiments = [
        ExperimentSpec(
            name=str(item["name"]),
            description=str(item.get("description") or ""),
            params={str(k): float(v) for k, v in dict(item.get("params") or {}).items()},
        )
        for item in list(payload.get("seed_experiments") or [])
    ]
    optimizer = dict(payload.get("optimizer") or {})
    return SearchSpace(
        target_path=str(payload["target_path"]),
        baseline=baseline,
        parameters=parameters,
        seed_experiments=seed_experiments,
        init_random=int(optimizer.get("init_random", 6)),
        candidate_pool_size=int(optimizer.get("candidate_pool_size", 256)),
        explore_probability=float(optimizer.get("explore_probability", 0.25)),
        local_jitter_probability=float(optimizer.get("local_jitter_probability", 0.45)),
        elite_fraction=float(optimizer.get("elite_fraction", 0.35)),
        min_normalized_distance=float(optimizer.get("min_normalized_distance", 0.14)),
    )


def load_existing_trials(run_dir: Path) -> List[Dict[str, Any]]:
    path = run_dir / "trials.jsonl"
    if not path.is_file():
        return []
    trials: List[Dict[str, Any]] = []
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if line:
            trials.append(json.loads(line))
    return trials


def append_trial(run_dir: Path, trial: Dict[str, Any]) -> None:
    path = run_dir / "trials.jsonl"
    with path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(trial, ensure_ascii=False) + "\n")


def live_success_trials(trials: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [
        item
        for item in trials
        if item.get("status") == "ok" and not bool(item.get("is_seed"))
    ]


def all_success_trials(trials: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [item for item in trials if item.get("status") == "ok"]


def score_of(trial: Dict[str, Any], metric: str) -> float:
    return float((trial.get("aggregate_metrics") or {}).get(metric, trial.get("score", 0.0)) or 0.0)


def next_trial_name(trials: Sequence[Dict[str, Any]], prefix: str) -> str:
    return f"{prefix}_{len(trials) + 1:03d}"


def normal_pdf(x: float) -> float:
    return math.exp(-0.5 * x * x) / math.sqrt(2.0 * math.pi)


def normal_cdf(x: float) -> float:
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def expected_improvement(mu: float, sigma: float, best: float, xi: float = 0.002) -> float:
    if sigma <= 1e-12:
        return max(mu - best - xi, 0.0)
    z = (mu - best - xi) / sigma
    return (mu - best - xi) * normal_cdf(z) + sigma * normal_pdf(z)


def normalized_distance(space: SearchSpace, left: Dict[str, Any], right: Dict[str, Any]) -> float:
    lv = space.normalized_vector(left)
    rv = space.normalized_vector(right)
    return float(np.linalg.norm(lv - rv) / math.sqrt(len(space.parameters)))


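# fit_surrogate below fits a Gaussian process with a Matérn-5/2 kernel plus a
# white-noise term; candidates are scored with the expected-improvement
# acquisition implemented above: with z = (mu - best - xi) / sigma,
#   EI = (mu - best - xi) * Phi(z) + sigma * phi(z),
# where Phi and phi are the standard normal CDF and PDF (normal_cdf / normal_pdf).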
def fit_surrogate(space: SearchSpace, trials: Sequence[Dict[str, Any]], metric: str, seed: int) -> Any:
    if GaussianProcessRegressor is None or len(trials) < 4:
        return None
    X = np.array([space.vectorize(item["params"]) for item in trials], dtype=float)
    y = np.array([score_of(item, metric) for item in trials], dtype=float)
    if len(np.unique(np.round(y, 8))) < 2:
        return None
    try:
        kernel = (
            ConstantKernel(1.0, (1e-3, 1e3))
            * Matern(length_scale=np.ones(len(space.parameters)), length_scale_bounds=(1e-2, 1e2), nu=2.5)
            + WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-8, 1e-1))
        )
        gp = GaussianProcessRegressor(
            kernel=kernel,
            normalize_y=True,
            n_restarts_optimizer=2,
            random_state=seed,
        )
        gp.fit(X, y)
        return gp
    except Exception:  # noqa: BLE001
        return None


def build_sampling_spread(space: SearchSpace, elite_vectors: np.ndarray) -> np.ndarray:
    spans = np.array([spec.transformed_span for spec in space.parameters], dtype=float)
    floor = np.maximum(spans * 0.05, 0.015)
    ceiling = np.maximum(spans * 0.5, floor)
    if elite_vectors.shape[0] <= 1:
        return np.minimum(np.maximum(spans * 0.18, floor), ceiling)
    elite_std = elite_vectors.std(axis=0)
    elite_range = elite_vectors.max(axis=0) - elite_vectors.min(axis=0)
    spread = np.maximum(elite_std * 1.8, elite_range * 0.5)
    return np.minimum(np.maximum(spread, floor), ceiling)


def sample_local_candidate(
    space: SearchSpace,
    rng: random.Random,
    center: np.ndarray,
    spread: np.ndarray,
) -> Dict[str, float]:
    draw = []
    for idx, spec in enumerate(space.parameters):
        value = rng.gauss(float(center[idx]), float(spread[idx]))
        value = min(max(value, spec.transformed_lower), spec.transformed_upper)
        draw.append(value)
    return space.from_vector(draw)


def sample_crossover_candidate(
    space: SearchSpace,
    rng: random.Random,
    left: np.ndarray,
    right: np.ndarray,
) -> Dict[str, float]:
    draw = []
    for idx, spec in enumerate(space.parameters):
        mix = rng.random()
        value = float(left[idx]) * mix + float(right[idx]) * (1.0 - mix)
        jitter = spec.transformed_span * 0.04
        value += rng.uniform(-jitter, jitter)
        value = min(max(value, spec.transformed_lower), spec.transformed_upper)
        draw.append(value)
    return space.from_vector(draw)


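# propose_candidates works in stages: unevaluated seed_experiments are proposed
# first; until init_random non-seed successes exist it falls back to uniform random
# draws; afterwards it builds a candidate pool from global exploration, Gaussian
# jitter around the elite fraction, and elite crossover, ranks the pool by the
# GP/EI surrogate when one can be fitted (otherwise shuffles it), and drops
# candidates closer than min_normalized_distance to anything already chosen.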
def propose_candidates(
    *,
    space: SearchSpace,
    trials: Sequence[Dict[str, Any]],
    metric: str,
    batch_size: int,
    rng: random.Random,
    init_random: int,
    candidate_pool_size: int,
) -> List[CandidateProposal]:
    existing_keys = {space.canonical_key(item["params"]) for item in trials if item.get("params")}
    proposals: List[CandidateProposal] = []
    for seed in space.seed_experiments:
        params = space.fill_params(seed.params)
        key = space.canonical_key(params)
        if key not in existing_keys:
            proposals.append(
                CandidateProposal(
                    name=seed.name,
                    description=seed.description,
                    params=params,
                    source="seed_experiment",
                )
            )
            existing_keys.add(key)
        if len(proposals) >= batch_size:
            return proposals
    successes = live_success_trials(trials)
    if len(successes) < init_random:
        while len(proposals) < batch_size:
            params = space.sample_random(rng)
            key = space.canonical_key(params)
            if key in existing_keys:
                continue
            proposals.append(
                CandidateProposal(
                    name=f"random_{len(successes) + len(proposals) + 1:03d}",
                    description="global random exploration",
                    params=params,
                    source="random",
                )
            )
            existing_keys.add(key)
        return proposals
    ranked = sorted(successes, key=lambda item: score_of(item, metric), reverse=True)
    elite_count = max(2, min(len(ranked), int(math.ceil(len(ranked) * space.elite_fraction))))
    elites = ranked[:elite_count]
    elite_vectors = np.array([space.vectorize(item["params"]) for item in elites], dtype=float)
    spread = build_sampling_spread(space, elite_vectors)
    gp = fit_surrogate(space, successes, metric, seed=rng.randint(1, 10_000_000))
    best_score = score_of(ranked[0], metric)
    best_vector = space.vectorize(ranked[0]["params"])
    pool: List[Dict[str, Any]] = []
    pool_keys = set(existing_keys)
    attempts = 0
    max_attempts = max(candidate_pool_size * 12, 200)
    while len(pool) < candidate_pool_size and attempts < max_attempts:
        attempts += 1
        roll = rng.random()
        if roll < space.explore_probability:
            params = space.sample_random(rng)
            source = "global_explore"
        elif roll < space.explore_probability + space.local_jitter_probability:
            center = elite_vectors[rng.randrange(len(elite_vectors))]
            params = sample_local_candidate(space, rng, center=center, spread=spread)
            source = "elite_jitter"
        else:
            if len(elite_vectors) >= 2:
                left = elite_vectors[rng.randrange(len(elite_vectors))]
                right = elite_vectors[rng.randrange(len(elite_vectors))]
                params = sample_crossover_candidate(space, rng, left=left, right=right)
                source = "elite_crossover"
            else:
                params = sample_local_candidate(space, rng, center=best_vector, spread=spread)
                source = "best_jitter"
        key = space.canonical_key(params)
        if key in pool_keys:
            continue
        pool_keys.add(key)
        pool.append({"params": params, "source": source})
    if not pool:
        return proposals
    if gp is not None:
        X = np.array([space.vectorize(item["params"]) for item in pool], dtype=float)
        mu, sigma = gp.predict(X, return_std=True)
        for idx, item in enumerate(pool):
            item["acquisition"] = expected_improvement(float(mu[idx]), float(sigma[idx]), best_score)
            item["uncertainty"] = float(sigma[idx])
            item["predicted_score"] = float(mu[idx])
        pool.sort(
            key=lambda item: (
                float(item.get("acquisition") or 0.0),
                float(item.get("uncertainty") or 0.0),
                float(item.get("predicted_score") or 0.0),
            ),
            reverse=True,
        )
    else:
        rng.shuffle(pool)
    chosen_params = [item.params for item in proposals]
    chosen: List[CandidateProposal] = []
    for item in pool:
        params = item["params"]
        if any(normalized_distance(space, params, other) < space.min_normalized_distance for other in chosen_params):
            continue
        chosen_params.append(params)
        chosen.append(
            CandidateProposal(
                name=f"bo_{len(successes) + len(proposals) + len(chosen) + 1:03d}",
                description=(
                    f"{item['source']} predicted={item.get('predicted_score', 'n/a')} "
                    f"ei={item.get('acquisition', 'n/a')}"
                ),
                params=params,
                source=str(item["source"]),
            )
        )
        if len(proposals) + len(chosen) >= batch_size:
            break
    proposals.extend(chosen)
    if len(proposals) < batch_size:
        while len(proposals) < batch_size:
            params = space.sample_random(rng)
            key = space.canonical_key(params)
            if key in existing_keys:
                continue
            proposals.append(
                CandidateProposal(
                    name=f"fallback_{len(successes) + len(proposals) + 1:03d}",
                    description="fallback random exploration",
                    params=params,
                    source="fallback_random",
                )
            )
            existing_keys.add(key)
    return proposals


def compare_query_deltas(
    baseline_payload: Dict[str, Any] | None,
    best_payload: Dict[str, Any] | None,
    metric: str,
    limit: int = 8,
) -> Dict[str, List[Dict[str, Any]]]:
    if not baseline_payload or not best_payload:
        return {"gains": [], "losses": []}
    base = {
        str(item["query"]): float(item["metrics"].get(metric, 0.0))
        for item in baseline_payload.get("per_query") or []
    }
    cur = {
        str(item["query"]): float(item["metrics"].get(metric, 0.0))
        for item in best_payload.get("per_query") or []
    }
    rows: List[Dict[str, Any]] = []
    for query, score in cur.items():
        if query not in base:
            continue
        rows.append(
            {
                "query": query,
                "baseline": round(base[query], 6),
                "current": round(score, 6),
                "delta": round(score - base[query], 6),
            }
        )
    rows.sort(key=lambda item: item["delta"], reverse=True)
    gains = [item for item in rows[:limit] if item["delta"] > 0]
    losses = [item for item in rows[-limit:] if item["delta"] < 0]
    losses.sort(key=lambda item: item["delta"])
    return {"gains": gains, "losses": losses}


def render_markdown(
    *,
    run_id: str,
    created_at: str,
    tenant_id: str,
    dataset_id: str,
    dataset_name: str,
    query_count: int,
    top_k: int,
    metric: str,
    trials: Sequence[Dict[str, Any]],
) -> str:
    successes = sorted(all_success_trials(trials), key=lambda item: score_of(item, metric), reverse=True)
    live_successes = sorted(live_success_trials(trials), key=lambda item: score_of(item, metric), reverse=True)
    best = successes[0] if successes else None
    baseline = next((item for item in successes if item.get("is_seed")), None)
    best_payload = load_batch_payload(best["batch_json_path"]) if best and best.get("batch_json_path") else None
    baseline_payload = (
        load_batch_payload(baseline["batch_json_path"])
        if baseline and baseline.get("batch_json_path")
        else None
    )
    delta_summary = compare_query_deltas(baseline_payload, best_payload, metric) if best else {"gains": [], "losses": []}
    lines = [
        "# Fusion Tuning Report",
        "",
        f"- Run ID: {run_id}",
        f"- Created at: {created_at}",
        f"- Tenant ID: {tenant_id}",
        f"- Dataset ID: {dataset_id}",
        f"- Dataset Name: {dataset_name}",
        f"- Query count: {query_count}",
        f"- Top K: {top_k}",
        f"- Score metric: {metric}",
        f"- Successful live evals: {len(live_successes)}",
        "",
        "## Leaderboard",
        "",
        "| Rank | Name | Source | Score | Primary | NDCG@20 | ERR@10 | Gain Recall@20 | Batch |",
"|---|---|---|---:|---:|---:|---:|---:|---|", ] for idx, item in enumerate(successes, start=1): metrics = item.get("aggregate_metrics") or {} lines.append( "| " + " | ".join( [ str(idx), str(item.get("name") or ""), str(item.get("source") or ""), f"{score_of(item, metric):.6f}", str(metrics.get("Primary_Metric_Score", "")), str(metrics.get("NDCG@20", "")), str(metrics.get("ERR@10", "")), str(metrics.get("Gain_Recall@20", "")), str(item.get("batch_id") or ""), ] ) + " |" ) if best: lines.extend( [ "", "## Best Params", "", f"- Name: {best['name']}", f"- Source: {best['source']}", f"- Score: {score_of(best, metric):.6f}", f"- Params: `{json.dumps(best['params'], ensure_ascii=False, sort_keys=True)}`", f"- Batch report: {best.get('batch_report_path') or ''}", ] ) if delta_summary["gains"] or delta_summary["losses"]: lines.extend(["", "## Best vs Baseline", ""]) if delta_summary["gains"]: lines.append("### Top Gains") lines.append("") for item in delta_summary["gains"]: lines.append( f"- {item['query']}: {item['baseline']:.6f} -> {item['current']:.6f} ({item['delta']:+.6f})" ) if delta_summary["losses"]: lines.append("") lines.append("### Top Losses") lines.append("") for item in delta_summary["losses"]: lines.append( f"- {item['query']}: {item['baseline']:.6f} -> {item['current']:.6f} ({item['delta']:+.6f})" ) failures = [item for item in trials if item.get("status") != "ok"] if failures: lines.extend(["", "## Failures", ""]) for item in failures: lines.append(f"- {item.get('name')}: {item.get('error')}") return "\n".join(lines) + "\n" def write_leaderboard_csv(run_dir: Path, metric: str, trials: Sequence[Dict[str, Any]], parameter_names: Sequence[str]) -> None: path = run_dir / "leaderboard.csv" successes = sorted(all_success_trials(trials), key=lambda item: score_of(item, metric), reverse=True) with path.open("w", encoding="utf-8", newline="") as handle: writer = csv.writer(handle) writer.writerow( [ "rank", "name", "source", "score", "Primary_Metric_Score", "NDCG@20", "ERR@10", "Gain_Recall@20", "batch_id", *parameter_names, ] ) for idx, item in enumerate(successes, start=1): metrics = item.get("aggregate_metrics") or {} row = [ idx, item.get("name") or "", item.get("source") or "", f"{score_of(item, metric):.6f}", metrics.get("Primary_Metric_Score", ""), metrics.get("NDCG@20", ""), metrics.get("ERR@10", ""), metrics.get("Gain_Recall@20", ""), item.get("batch_id") or "", ] row.extend(item.get("params", {}).get(name, "") for name in parameter_names) writer.writerow(row) def persist_run_summary( *, run_dir: Path, run_id: str, tenant_id: str, dataset_id: str, dataset_name: str, query_count: int, top_k: int, metric: str, trials: Sequence[Dict[str, Any]], parameter_names: Sequence[str], ) -> None: summary = { "run_id": run_id, "created_at": utc_now_iso(), "tenant_id": tenant_id, "dataset_id": dataset_id, "dataset_name": dataset_name, "query_count": query_count, "top_k": top_k, "score_metric": metric, "trials": list(trials), } (run_dir / "summary.json").write_text( json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8", ) (run_dir / "summary.md").write_text( render_markdown( run_id=run_id, created_at=summary["created_at"], tenant_id=tenant_id, dataset_id=dataset_id, dataset_name=dataset_name, query_count=query_count, top_k=top_k, metric=metric, trials=trials, ), encoding="utf-8", ) write_leaderboard_csv(run_dir, metric, trials, parameter_names) def run_experiment_mode(args: argparse.Namespace) -> None: dataset = resolve_dataset( dataset_id=getattr(args, "dataset_id", None), 
def run_experiment_mode(args: argparse.Namespace) -> None:
    dataset = resolve_dataset(
        dataset_id=getattr(args, "dataset_id", None),
        query_file=Path(args.queries_file).resolve() if getattr(args, "queries_file", None) else None,
        tenant_id=str(args.tenant_id),
        language=str(args.language),
    )
    args.dataset_id = dataset.dataset_id
    args.queries_file = str(dataset.query_file)
    args.tenant_id = dataset.tenant_id
    args.language = dataset.language
    queries_file = dataset.query_file
    queries = list(dataset.queries)
    base_config_text = CONFIG_PATH.read_text(encoding="utf-8")
    base_config = load_yaml(CONFIG_PATH)
    experiments = load_experiments(Path(args.experiments_file))
    tuning_dir = ensure_dir(DEFAULT_ARTIFACT_ROOT / "tuning_runs")
    run_id = args.run_name or f"tuning_{utc_timestamp()}"
    run_dir = ensure_dir(tuning_dir / run_id)
    results: List[Dict[str, Any]] = []
    try:
        for experiment in experiments:
            params = dict(experiment.params)
            target_path = args.target_path or "coarse_rank.fusion"
            candidate = apply_target_params(base_config, target_path, params)
            write_yaml(CONFIG_PATH, candidate)
            candidate_config_path = ensure_dir(run_dir / "configs") / f"{experiment.name}_config.yaml"
            write_yaml(candidate_config_path, candidate)
            ensure_disk_headroom(
                min_free_gb=args.min_free_gb,
                auto_truncate_logs=args.auto_truncate_logs,
                context=f"restart {experiment.name}",
            )
            run_restart(args.restart_targets)
            health = wait_for_backend(args.search_base_url)
            if args.heal_eval_web:
                ensure_eval_web(args.eval_web_base_url)
            ensure_disk_headroom(
                min_free_gb=args.min_free_gb,
                auto_truncate_logs=args.auto_truncate_logs,
                context=f"batch eval {experiment.name}",
            )
            batch_result = run_batch_eval(
                tenant_id=args.tenant_id,
                dataset_id=args.dataset_id,
                queries_file=queries_file,
                top_k=args.top_k,
                language=args.language,
                force_refresh_labels=bool(args.force_refresh_labels_first_pass and not results),
            )
            ensure_disk_headroom(
                min_free_gb=args.min_free_gb,
                auto_truncate_logs=args.auto_truncate_logs,
                context=f"persist {experiment.name}",
            )
            payload = batch_result["payload"]
            aggregate_metrics = dict(payload["aggregate_metrics"])
            results.append(
                {
                    "name": experiment.name,
                    "description": experiment.description,
                    "params": params,
                    "aggregate_metrics": aggregate_metrics,
                    "score": float(aggregate_metrics.get(args.score_metric, 0.0)),
                    "batch_id": batch_result["batch_id"],
                    "batch_json_path": batch_result["batch_json_path"],
                    "batch_report_path": batch_result["batch_report_path"],
                    "candidate_config_path": str(candidate_config_path),
                    "backend_health": health,
                    "status": "ok",
                    "source": "experiments_file",
                }
            )
            print(
                f"[tune] {experiment.name} score={aggregate_metrics.get(args.score_metric)} "
                f"metrics={aggregate_metrics}"
            )
    finally:
        if args.apply_best and results:
            best = max(results, key=lambda item: score_of(item, args.score_metric))
            best_config = apply_target_params(base_config, args.target_path or "coarse_rank.fusion", best["params"])
            write_yaml(CONFIG_PATH, best_config)
            run_restart(args.restart_targets)
            wait_for_backend(args.search_base_url)
            if args.heal_eval_web:
                ensure_eval_web(args.eval_web_base_url)
        else:
            CONFIG_PATH.write_text(base_config_text, encoding="utf-8")
            run_restart(args.restart_targets)
            wait_for_backend(args.search_base_url)
            if args.heal_eval_web:
                ensure_eval_web(args.eval_web_base_url)
        persist_run_summary(
            run_dir=run_dir,
            run_id=run_id,
            tenant_id=str(args.tenant_id),
            dataset_id=str(args.dataset_id),
            dataset_name=dataset.display_name,
            query_count=len(queries),
            top_k=args.top_k,
            metric=args.score_metric,
            trials=results,
            parameter_names=list(results[0]["params"].keys()) if results else [],
        )
        print(f"[done] summary_json={run_dir / 'summary.json'}")
        print(f"[done] summary_md={run_dir / 'summary.md'}")


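# run_optimize_mode drives the adaptive search.  It reads --search-space, a YAML
# file whose shape load_search_space expects to be roughly the following (parameter
# names and numbers here are purely illustrative, not the shipped
# coarse_rank_fusion_space.yaml):
#
#   target_path: coarse_rank.fusion
#   baseline:
#     bm25_weight: 1.0
#     vector_weight: 1.0
#   parameters:
#     bm25_weight: {min: 0.1, max: 3.0, scale: linear, round: 4}
#     vector_weight: {min: 0.01, max: 10.0, scale: log, round: 4}
#   seed_experiments:
#     - name: baseline_copy
#       description: re-evaluate the current weights
#       params: {bm25_weight: 1.0, vector_weight: 1.0}
#   optimizer:
#     init_random: 6
#     candidate_pool_size: 256
#     explore_probability: 0.25
#     local_jitter_probability: 0.45
#     elite_fraction: 0.35
#     min_normalized_distance: 0.14
#
# Runs are resumable: every finished trial is appended to <run_dir>/trials.jsonl and
# --resume-run <run_dir> reloads them, so only the remaining --max-evals budget
# (counting non-seed successes) is spent.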
def run_optimize_mode(args: argparse.Namespace) -> None:
    dataset = resolve_dataset(
        dataset_id=getattr(args, "dataset_id", None),
        query_file=Path(args.queries_file).resolve() if getattr(args, "queries_file", None) else None,
        tenant_id=str(args.tenant_id),
        language=str(args.language),
    )
    args.dataset_id = dataset.dataset_id
    args.queries_file = str(dataset.query_file)
    args.tenant_id = dataset.tenant_id
    args.language = dataset.language
    queries_file = dataset.query_file
    queries = list(dataset.queries)
    base_config_text = CONFIG_PATH.read_text(encoding="utf-8")
    base_config = load_yaml(CONFIG_PATH)
    search_space_path = Path(args.search_space)
    space = load_search_space(search_space_path)
    rng = random.Random(args.random_seed)
    tuning_dir = ensure_dir(DEFAULT_ARTIFACT_ROOT / "tuning_runs")
    run_dir = (
        Path(args.resume_run).resolve()
        if args.resume_run
        else ensure_dir(tuning_dir / (args.run_name or f"coarse_fusion_bo_{utc_timestamp()}"))
    )
    run_id = run_dir.name
    ensure_dir(run_dir / "configs")
    ensure_dir(run_dir / "logs")
    if not (run_dir / "search_space.yaml").exists():
        (run_dir / "search_space.yaml").write_text(search_space_path.read_text(encoding="utf-8"), encoding="utf-8")
    trials = load_existing_trials(run_dir)
    if args.seed_report:
        baseline_params = space.fill_params(space.baseline)
        baseline_key = space.canonical_key(baseline_params)
        if baseline_key not in {space.canonical_key(item["params"]) for item in trials if item.get("params")}:
            payload = load_batch_payload(args.seed_report)
            payload_dataset_id = str(((payload.get("dataset") or {}).get("dataset_id")) or "")
            if payload_dataset_id and payload_dataset_id != str(args.dataset_id):
                raise RuntimeError(
                    f"seed report dataset mismatch: expected={args.dataset_id} actual={payload_dataset_id}"
                )
            trial = {
                "trial_id": next_trial_name(trials, "trial"),
                "name": "seed_baseline",
                "description": f"seeded from {args.seed_report}",
                "source": "seed_report",
                "is_seed": True,
                "status": "ok",
                "created_at": utc_now_iso(),
                "params": baseline_params,
                "score": float(payload["aggregate_metrics"].get(args.score_metric, 0.0)),
                "aggregate_metrics": dict(payload["aggregate_metrics"]),
                "batch_id": payload["batch_id"],
                "batch_json_path": str(resolve_batch_json_path(args.seed_report)),
                "batch_report_path": str(resolve_batch_json_path(args.seed_report).with_suffix(".md")),
            }
            append_trial(run_dir, trial)
            trials.append(trial)
    init_random = args.init_random if args.init_random is not None else space.init_random
    candidate_pool_size = args.candidate_pool_size if args.candidate_pool_size is not None else space.candidate_pool_size
    try:
        live_done = len(live_success_trials(trials))
        while live_done < args.max_evals:
            remaining = args.max_evals - live_done
            current_batch_size = min(args.batch_size, remaining)
            proposals = propose_candidates(
                space=space,
                trials=trials,
                metric=args.score_metric,
                batch_size=current_batch_size,
                rng=rng,
                init_random=init_random,
                candidate_pool_size=candidate_pool_size,
            )
            if not proposals:
                raise RuntimeError("optimizer failed to produce new candidate proposals")
            for proposal in proposals:
                force_refresh_labels = bool(
                    args.force_refresh_labels_first_pass
                    and live_done == 0
                    and not any(t.get("is_seed") for t in trials)
                )
                trial_id = next_trial_name(trials, "trial")
                candidate_config = apply_target_params(base_config, space.target_path, proposal.params)
                candidate_config_path = run_dir / "configs" / f"{trial_id}_{proposal.name}.yaml"
                trial_log_path = run_dir / "logs" / f"{trial_id}_{proposal.name}.log"
                write_yaml(CONFIG_PATH, candidate_config)
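                # The candidate params were just written into the live config; a copy is
                # kept under <run_dir>/configs, the backend is restarted, and
                # verify_backend_config checks the values actually took effect before the
                # batch eval is started.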
                write_yaml(candidate_config_path, candidate_config)
                print(
                    f"[tune] start {proposal.name} source={proposal.source} "
                    f"params={json.dumps(proposal.params, ensure_ascii=False, sort_keys=True)}"
                )
                try:
                    ensure_disk_headroom(
                        min_free_gb=args.min_free_gb,
                        auto_truncate_logs=args.auto_truncate_logs,
                        context=f"restart {proposal.name}",
                    )
                    run_restart(args.restart_targets)
                    backend_health = wait_for_backend(args.search_base_url)
                    verify_backend_config(args.search_base_url, space.target_path, proposal.params)
                    if args.heal_eval_web:
                        ensure_eval_web(args.eval_web_base_url)
                    ensure_disk_headroom(
                        min_free_gb=args.min_free_gb,
                        auto_truncate_logs=args.auto_truncate_logs,
                        context=f"batch eval {proposal.name}",
                    )
                    batch_result = run_batch_eval(
                        tenant_id=args.tenant_id,
                        dataset_id=args.dataset_id,
                        queries_file=queries_file,
                        top_k=args.top_k,
                        language=args.language,
                        force_refresh_labels=force_refresh_labels,
                    )
                    ensure_disk_headroom(
                        min_free_gb=args.min_free_gb,
                        auto_truncate_logs=args.auto_truncate_logs,
                        context=f"persist {proposal.name}",
                    )
                    payload = batch_result["payload"]
                    trial_log_path.write_text(batch_result["raw_output"], encoding="utf-8")
                    aggregate_metrics = dict(payload["aggregate_metrics"])
                    trial = {
                        "trial_id": trial_id,
                        "name": proposal.name,
                        "description": proposal.description,
                        "source": proposal.source,
                        "is_seed": False,
                        "status": "ok",
                        "created_at": utc_now_iso(),
                        "params": proposal.params,
                        "score": float(aggregate_metrics.get(args.score_metric, 0.0)),
                        "aggregate_metrics": aggregate_metrics,
                        "batch_id": batch_result["batch_id"],
                        "batch_json_path": batch_result["batch_json_path"],
                        "batch_report_path": batch_result["batch_report_path"],
                        "candidate_config_path": str(candidate_config_path),
                        "trial_log_path": str(trial_log_path),
                        "backend_health": backend_health,
                    }
                    print(
                        f"[tune] done {proposal.name} "
                        f"{args.score_metric}={trial['score']:.6f} "
                        f"Primary={aggregate_metrics.get('Primary_Metric_Score')}"
                    )
                except Exception as exc:  # noqa: BLE001
                    trial = {
                        "trial_id": trial_id,
                        "name": proposal.name,
                        "description": proposal.description,
                        "source": proposal.source,
                        "is_seed": False,
                        "status": "error",
                        "created_at": utc_now_iso(),
                        "params": proposal.params,
                        "error": str(exc),
                        "candidate_config_path": str(candidate_config_path),
                        "trial_log_path": str(trial_log_path),
                    }
                    print(f"[tune] error {proposal.name}: {exc}")
                    ensure_disk_headroom(
                        min_free_gb=args.min_free_gb,
                        auto_truncate_logs=args.auto_truncate_logs,
                        context=f"error-persist {proposal.name}",
                    )
                append_trial(run_dir, trial)
                trials.append(trial)
                ensure_disk_headroom(
                    min_free_gb=args.min_free_gb,
                    auto_truncate_logs=args.auto_truncate_logs,
                    context=f"summary {proposal.name}",
                )
                persist_run_summary(
                    run_dir=run_dir,
                    run_id=run_id,
                    tenant_id=str(args.tenant_id),
                    dataset_id=str(args.dataset_id),
                    dataset_name=dataset.display_name,
                    query_count=len(queries),
                    top_k=args.top_k,
                    metric=args.score_metric,
                    trials=trials,
                    parameter_names=space.parameter_names,
                )
                if trial.get("status") == "ok":
                    live_done += 1
                if live_done >= args.max_evals:
                    break
    finally:
        if args.apply_best:
            successes = all_success_trials(trials)
            best_live = max(successes, key=lambda item: score_of(item, args.score_metric)) if successes else None
            if best_live:
                best_config = apply_target_params(base_config, space.target_path, best_live["params"])
                write_yaml(CONFIG_PATH, best_config)
                run_restart(args.restart_targets)
                wait_for_backend(args.search_base_url)
                if args.heal_eval_web:
                    ensure_eval_web(args.eval_web_base_url)
        else:
            CONFIG_PATH.write_text(base_config_text, encoding="utf-8")
            run_restart(args.restart_targets)
            wait_for_backend(args.search_base_url)
            if args.heal_eval_web:
                ensure_eval_web(args.eval_web_base_url)
        persist_run_summary(
            run_dir=run_dir,
            run_id=run_id,
            tenant_id=str(args.tenant_id),
            dataset_id=str(args.dataset_id),
            dataset_name=dataset.display_name,
            query_count=len(queries),
            top_k=args.top_k,
            metric=args.score_metric,
            trials=trials,
            parameter_names=space.parameter_names,
        )
        print(f"[done] run_dir={run_dir}")
        print(f"[done] summary_json={run_dir / 'summary.json'}")
        print(f"[done] summary_md={run_dir / 'summary.md'}")
        print(f"[done] leaderboard_csv={run_dir / 'leaderboard.csv'}")


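# Typical invocations (the script path is wherever this file lives under
# scripts/evaluation/, and the flag values shown are simply the build_parser
# defaults; both lines are illustrative):
#   .venv/bin/python scripts/evaluation/<this_script>.py --mode optimize --max-evals 12 --apply-best
#   .venv/bin/python scripts/evaluation/<this_script>.py --mode experiments --experiments-file experiments.json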
def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Tune coarse/fusion params against the live backend with adaptive Bayesian-style search."
    )
    parser.add_argument("--mode", choices=["optimize", "experiments"], default="optimize")
    parser.add_argument("--tenant-id", default="163")
    parser.add_argument("--dataset-id", default="core_queries")
    parser.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE))
    parser.add_argument("--top-k", type=int, default=100)
    parser.add_argument("--language", default="en")
    parser.add_argument("--search-base-url", default="http://127.0.0.1:6002")
    parser.add_argument("--eval-web-base-url", default="http://127.0.0.1:6010")
    parser.add_argument("--score-metric", default="Primary_Metric_Score")
    parser.add_argument("--restart-targets", nargs="+", default=["backend"])
    parser.add_argument("--heal-eval-web", action=argparse.BooleanOptionalAction, default=True)
    parser.add_argument("--force-refresh-labels-first-pass", action="store_true")
    parser.add_argument("--apply-best", action="store_true")
    parser.add_argument("--run-name", default=None)
    parser.add_argument("--experiments-file")
    parser.add_argument("--target-path", default="coarse_rank.fusion")
    parser.add_argument(
        "--search-space",
        default=str(PROJECT_ROOT / "scripts" / "evaluation" / "tuning" / "coarse_rank_fusion_space.yaml"),
    )
    parser.add_argument("--seed-report", default=None)
    parser.add_argument("--resume-run", default=None)
    parser.add_argument("--max-evals", type=int, default=12)
    parser.add_argument("--batch-size", type=int, default=3)
    parser.add_argument("--init-random", type=int, default=None)
    parser.add_argument("--candidate-pool-size", type=int, default=None)
    parser.add_argument("--random-seed", type=int, default=20260415)
    parser.add_argument("--min-free-gb", type=float, default=5.0)
    parser.add_argument("--auto-truncate-logs", action=argparse.BooleanOptionalAction, default=True)
    return parser


def main() -> None:
    args = build_parser().parse_args()
    if args.mode == "experiments":
        if not args.experiments_file:
            raise SystemExit("--experiments-file is required when --mode=experiments")
        run_experiment_mode(args)
        return
    run_optimize_mode(args)


if __name__ == "__main__":
    main()