"""CLI: build annotations, batch eval, audit, serve web UI.""" from __future__ import annotations import argparse import json import logging import shutil from pathlib import Path from typing import Any, Dict from config.loader import get_app_config from .datasets import audits_dir, query_builds_dir, resolve_dataset from .framework import SearchEvaluationFramework from .logging_setup import setup_eval_logging from .utils import ensure_dir, utc_now_iso, utc_timestamp from .web_app import create_web_app _cli_log = logging.getLogger("search_eval.cli") def _reset_build_artifacts(dataset_id: str) -> None: artifact_root = get_app_config().search_evaluation.artifact_root removed = [] dataset_query_builds = query_builds_dir(artifact_root, dataset_id) dataset_audits = audits_dir(artifact_root, dataset_id) if dataset_query_builds.exists(): shutil.rmtree(dataset_query_builds) removed.append(str(dataset_query_builds)) if dataset_audits.exists(): shutil.rmtree(dataset_audits) removed.append(str(dataset_audits)) if removed: _cli_log.info("[build] reset dataset artifacts for %s: %s", dataset_id, ", ".join(removed)) else: _cli_log.info("[build] no previous dataset artifacts to reset under %s for dataset=%s", artifact_root, dataset_id) def add_judge_llm_args(p: argparse.ArgumentParser) -> None: p.add_argument( "--judge-model", default=None, metavar="MODEL", help="Judge LLM model (default: config.yaml search_evaluation.judge_model).", ) p.add_argument( "--enable-thinking", action=argparse.BooleanOptionalAction, default=None, help="enable_thinking for DashScope (default: search_evaluation.judge_enable_thinking).", ) p.add_argument( "--dashscope-batch", action=argparse.BooleanOptionalAction, default=None, help="DashScope Batch File API vs sync chat (default: search_evaluation.judge_dashscope_batch).", ) def add_intent_llm_args(p: argparse.ArgumentParser) -> None: p.add_argument( "--intent-model", default=None, metavar="MODEL", help="Query-intent LLM model before relevance judging (default: search_evaluation.intent_model).", ) p.add_argument( "--intent-enable-thinking", action=argparse.BooleanOptionalAction, default=None, help="enable_thinking for intent model (default: search_evaluation.intent_enable_thinking).", ) def framework_kwargs_from_args(args: argparse.Namespace) -> Dict[str, Any]: kw: Dict[str, Any] = {} if args.judge_model is not None: kw["judge_model"] = args.judge_model if args.enable_thinking is not None: kw["enable_thinking"] = args.enable_thinking if args.dashscope_batch is not None: kw["use_dashscope_batch"] = args.dashscope_batch if getattr(args, "intent_model", None) is not None: kw["intent_model"] = args.intent_model if getattr(args, "intent_enable_thinking", None) is not None: kw["intent_enable_thinking"] = args.intent_enable_thinking return kw def _apply_search_evaluation_cli_defaults(args: argparse.Namespace) -> None: """Fill None CLI defaults from ``config.yaml`` ``search_evaluation`` (via ``get_app_config()``).""" se = get_app_config().search_evaluation if getattr(args, "dataset_id", None) in (None, "") and getattr(args, "queries_file", None) in (None, ""): args.dataset_id = se.default_dataset_id if getattr(args, "tenant_id", None) in (None, ""): args.tenant_id = se.default_tenant_id if getattr(args, "queries_file", None) in (None, ""): args.queries_file = str(se.queries_file) if getattr(args, "language", None) in (None, ""): args.language = se.default_language if args.command == "serve": if getattr(args, "host", None) in (None, ""): args.host = se.web_host if getattr(args, "port", None) is None: args.port = se.web_port if args.command == "batch": if getattr(args, "top_k", None) is None: args.top_k = se.batch_top_k if args.command == "audit": if getattr(args, "top_k", None) is None: args.top_k = se.audit_top_k if getattr(args, "limit_suspicious", None) is None: args.limit_suspicious = se.audit_limit_suspicious if args.command == "build": if getattr(args, "search_depth", None) is None: args.search_depth = se.build_search_depth if getattr(args, "rerank_depth", None) is None: args.rerank_depth = se.build_rerank_depth if getattr(args, "annotate_search_top_k", None) is None: args.annotate_search_top_k = se.annotate_search_top_k if getattr(args, "annotate_rerank_top_k", None) is None: args.annotate_rerank_top_k = se.annotate_rerank_top_k if getattr(args, "search_recall_top_k", None) is None: args.search_recall_top_k = se.search_recall_top_k if getattr(args, "rerank_high_threshold", None) is None: args.rerank_high_threshold = se.rerank_high_threshold if getattr(args, "rerank_high_skip_count", None) is None: args.rerank_high_skip_count = se.rerank_high_skip_count if getattr(args, "rebuild_llm_batch_size", None) is None: args.rebuild_llm_batch_size = se.rebuild_llm_batch_size if getattr(args, "rebuild_min_batches", None) is None: args.rebuild_min_batches = se.rebuild_min_llm_batches if getattr(args, "rebuild_max_batches", None) is None: args.rebuild_max_batches = se.rebuild_max_llm_batches if getattr(args, "rebuild_irrelevant_stop_ratio", None) is None: args.rebuild_irrelevant_stop_ratio = se.rebuild_irrelevant_stop_ratio if getattr(args, "rebuild_irrel_low_combined_stop_ratio", None) is None: args.rebuild_irrel_low_combined_stop_ratio = se.rebuild_irrel_low_combined_stop_ratio if getattr(args, "rebuild_irrelevant_stop_streak", None) is None: args.rebuild_irrelevant_stop_streak = se.rebuild_irrelevant_stop_streak def _resolve_dataset_from_args(args: argparse.Namespace, *, require_enabled: bool = False): queries_file = getattr(args, "queries_file", None) query_path = Path(str(queries_file)).resolve() if queries_file not in (None, "") else None dataset = resolve_dataset( dataset_id=getattr(args, "dataset_id", None), query_file=query_path, tenant_id=getattr(args, "tenant_id", None), language=getattr(args, "language", None), require_enabled=require_enabled, ) args.dataset_id = dataset.dataset_id args.queries_file = str(dataset.query_file) args.tenant_id = dataset.tenant_id args.language = dataset.language return dataset def build_cli_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Search evaluation annotation builder and web UI") sub = parser.add_subparsers(dest="command", required=True) build = sub.add_parser("build", help="Build pooled annotation set for queries") build.add_argument( "--tenant-id", default=None, help="Tenant id (default: search_evaluation.default_tenant_id in config.yaml).", ) build.add_argument("--dataset-id", default=None, help="Named evaluation dataset id from config.yaml.") build.add_argument( "--queries-file", default=None, help="Legacy override for query list file. Prefer --dataset-id.", ) build.add_argument( "--search-depth", type=int, default=None, help="Default: search_evaluation.build_search_depth.", ) build.add_argument( "--rerank-depth", type=int, default=None, help="Default: search_evaluation.build_rerank_depth.", ) build.add_argument( "--annotate-search-top-k", type=int, default=None, help="Default: search_evaluation.annotate_search_top_k.", ) build.add_argument( "--annotate-rerank-top-k", type=int, default=None, help="Default: search_evaluation.annotate_rerank_top_k.", ) build.add_argument( "--search-recall-top-k", type=int, default=None, help="Rebuild mode only: top-K search hits enter recall pool with score 1 (default when --force-refresh-labels: 200).", ) build.add_argument( "--rerank-high-threshold", type=float, default=None, help="Rebuild only: count rerank scores above this on non-pool docs (default 0.5).", ) build.add_argument( "--rerank-high-skip-count", type=int, default=None, help="Rebuild only: skip query if more than this many non-pool docs have rerank score > threshold (default 1000).", ) build.add_argument("--rebuild-llm-batch-size", type=int, default=None, help="Rebuild only: LLM batch size (default 50).") build.add_argument("--rebuild-min-batches", type=int, default=None, help="Rebuild only: min LLM batches before early stop (default 10).") build.add_argument("--rebuild-max-batches", type=int, default=None, help="Rebuild only: max LLM batches (default 40).") build.add_argument( "--rebuild-irrelevant-stop-ratio", type=float, default=None, help="Rebuild only: bad batch requires irrelevant_ratio > this (default: search_evaluation.rebuild_irrelevant_stop_ratio).", ) build.add_argument( "--rebuild-irrel-low-combined-stop-ratio", type=float, default=None, help="Rebuild only: bad batch requires (irrelevant+low)/n > this (default 0.959).", ) build.add_argument( "--rebuild-irrelevant-stop-streak", type=int, default=None, help="Rebuild only: consecutive bad batches (both thresholds strict >) before early stop (default 3).", ) build.add_argument( "--language", default=None, help="Default: search_evaluation.default_language.", ) build.add_argument( "--reset-artifacts", action="store_true", help="Delete dataset-specific query_builds/audits before starting. Shared SQLite cache is preserved.", ) build.add_argument("--force-refresh-rerank", action="store_true") build.add_argument("--force-refresh-labels", action="store_true") add_judge_llm_args(build) add_intent_llm_args(build) batch = sub.add_parser("batch", help="Run batch evaluation against live search") batch.add_argument("--tenant-id", default=None, help="Default: search_evaluation.default_tenant_id.") batch.add_argument("--dataset-id", default=None, help="Named evaluation dataset id from config.yaml.") batch.add_argument("--queries-file", default=None, help="Legacy override for query list file. Prefer --dataset-id.") batch.add_argument("--top-k", type=int, default=None, help="Default: search_evaluation.batch_top_k.") batch.add_argument("--language", default=None, help="Default: search_evaluation.default_language.") batch.add_argument("--force-refresh-labels", action="store_true") add_judge_llm_args(batch) add_intent_llm_args(batch) audit = sub.add_parser("audit", help="Audit annotation quality for queries") audit.add_argument("--tenant-id", default=None, help="Default: search_evaluation.default_tenant_id.") audit.add_argument("--dataset-id", default=None, help="Named evaluation dataset id from config.yaml.") audit.add_argument("--queries-file", default=None, help="Legacy override for query list file. Prefer --dataset-id.") audit.add_argument("--top-k", type=int, default=None, help="Default: search_evaluation.audit_top_k.") audit.add_argument("--language", default=None, help="Default: search_evaluation.default_language.") audit.add_argument( "--limit-suspicious", type=int, default=None, help="Default: search_evaluation.audit_limit_suspicious.", ) audit.add_argument("--force-refresh-labels", action="store_true") add_judge_llm_args(audit) add_intent_llm_args(audit) serve = sub.add_parser("serve", help="Serve evaluation web UI on port 6010") serve.add_argument("--tenant-id", default=None, help="Default: search_evaluation.default_tenant_id.") serve.add_argument("--dataset-id", default=None, help="Initial evaluation dataset id from config.yaml.") serve.add_argument("--queries-file", default=None, help="Legacy initial query file override. Prefer --dataset-id.") serve.add_argument("--host", default=None, help="Default: search_evaluation.web_host.") serve.add_argument("--port", type=int, default=None, help="Default: search_evaluation.web_port.") add_judge_llm_args(serve) add_intent_llm_args(serve) return parser def run_build(args: argparse.Namespace) -> None: dataset = _resolve_dataset_from_args(args) if args.reset_artifacts: _reset_build_artifacts(dataset.dataset_id) framework = SearchEvaluationFramework(tenant_id=args.tenant_id, **framework_kwargs_from_args(args)) queries = list(dataset.queries) summary = [] rebuild_kwargs = {} if args.force_refresh_labels: rebuild_kwargs = { "search_recall_top_k": args.search_recall_top_k, "rerank_high_threshold": args.rerank_high_threshold, "rerank_high_skip_count": args.rerank_high_skip_count, "rebuild_llm_batch_size": args.rebuild_llm_batch_size, "rebuild_min_batches": args.rebuild_min_batches, "rebuild_max_batches": args.rebuild_max_batches, "rebuild_irrelevant_stop_ratio": args.rebuild_irrelevant_stop_ratio, "rebuild_irrel_low_combined_stop_ratio": args.rebuild_irrel_low_combined_stop_ratio, "rebuild_irrelevant_stop_streak": args.rebuild_irrelevant_stop_streak, } total_q = len(queries) for q_index, query in enumerate(queries, start=1): _cli_log.info("[build] (%s/%s) starting query=%r", q_index, total_q, query) try: result = framework.build_query_annotation_set( query=query, dataset=dataset, search_depth=args.search_depth, rerank_depth=args.rerank_depth, annotate_search_top_k=args.annotate_search_top_k, annotate_rerank_top_k=args.annotate_rerank_top_k, language=args.language, force_refresh_rerank=args.force_refresh_rerank, force_refresh_labels=args.force_refresh_labels, **rebuild_kwargs, ) except Exception: _cli_log.exception("[build] failed query=%r index=%s/%s", query, q_index, total_q) raise summary.append( { "query": result.query, "search_total": result.search_total, "search_depth": result.search_depth, "rerank_corpus_size": result.rerank_corpus_size, "annotated_count": result.annotated_count, "output_json_path": str(result.output_json_path), } ) _cli_log.info( "[build] query=%r search_total=%s search_depth=%s corpus=%s annotated=%s output=%s", result.query, result.search_total, result.search_depth, result.rerank_corpus_size, result.annotated_count, result.output_json_path, ) out_path = ensure_dir(framework.artifact_root / "query_builds") / f"build_summary_{utc_timestamp()}.json" out_path = query_builds_dir(framework.artifact_root, dataset.dataset_id) / f"build_summary_{utc_timestamp()}.json" out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") _cli_log.info("[done] summary=%s", out_path) def run_batch(args: argparse.Namespace) -> None: dataset = _resolve_dataset_from_args(args, require_enabled=True) framework = SearchEvaluationFramework(tenant_id=args.tenant_id, **framework_kwargs_from_args(args)) queries = list(dataset.queries) _cli_log.info("[batch] dataset_id=%s queries_file=%s count=%s", dataset.dataset_id, args.queries_file, len(queries)) try: payload = framework.batch_evaluate( queries=queries, dataset=dataset, top_k=args.top_k, auto_annotate=True, language=args.language, force_refresh_labels=args.force_refresh_labels, ) except Exception: _cli_log.exception("[batch] failed while evaluating query list from %s", args.queries_file) raise _cli_log.info("[done] batch_id=%s aggregate_metrics=%s", payload["batch_id"], payload["aggregate_metrics"]) def run_audit(args: argparse.Namespace) -> None: dataset = _resolve_dataset_from_args(args, require_enabled=True) framework = SearchEvaluationFramework(tenant_id=args.tenant_id, **framework_kwargs_from_args(args)) queries = list(dataset.queries) audit_items = [] for query in queries: item = framework.audit_live_query( query=query, top_k=args.top_k, language=args.language, auto_annotate=not args.force_refresh_labels, ) if args.force_refresh_labels: live_payload = framework.search_client.search(query=query, size=max(args.top_k, 100), from_=0, language=args.language) framework.annotate_missing_labels( query=query, docs=list(live_payload.get("results") or [])[: args.top_k], force_refresh=True, ) item = framework.audit_live_query( query=query, top_k=args.top_k, language=args.language, auto_annotate=False, ) audit_items.append( { "query": query, "metrics": item["metrics"], "distribution": item["distribution"], "suspicious_count": len(item["suspicious"]), "suspicious_examples": item["suspicious"][: args.limit_suspicious], } ) _cli_log.info( "[audit] query=%r suspicious=%s metrics=%s", query, len(item["suspicious"]), item["metrics"], ) summary = { "created_at": utc_now_iso(), "tenant_id": args.tenant_id, "dataset": dataset.summary(), "top_k": args.top_k, "query_count": len(queries), "total_suspicious": sum(item["suspicious_count"] for item in audit_items), "queries": audit_items, } out_path = audits_dir(framework.artifact_root, dataset.dataset_id) / f"audit_{utc_timestamp()}.json" out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") _cli_log.info("[done] audit=%s", out_path) def run_serve(args: argparse.Namespace) -> None: dataset = _resolve_dataset_from_args(args, require_enabled=True) framework = SearchEvaluationFramework(tenant_id=args.tenant_id, **framework_kwargs_from_args(args)) app = create_web_app(framework, initial_dataset_id=dataset.dataset_id) import uvicorn uvicorn.run(app, host=args.host, port=args.port, log_level="info") def main() -> None: se = get_app_config().search_evaluation log_file = setup_eval_logging(se.eval_log_dir) parser = build_cli_parser() args = parser.parse_args() _apply_search_evaluation_cli_defaults(args) logging.getLogger("search_eval").info( "CLI start command=%s tenant_id=%s log_file=%s", args.command, getattr(args, "tenant_id", ""), log_file.resolve(), ) if args.command == "build": run_build(args) return if args.command == "batch": run_batch(args) return if args.command == "audit": run_audit(args) return if args.command == "serve": run_serve(args) return raise SystemExit(f"unknown command: {args.command}")