"""Markdown and text reports for batch evaluation.""" from __future__ import annotations from typing import Any, Dict from .constants import RELEVANCE_GAIN_MAP, RELEVANCE_LV0, RELEVANCE_LV1, RELEVANCE_LV2, RELEVANCE_LV3 from .metrics import PRIMARY_METRIC_KEYS def _append_metric_block(lines: list[str], metrics: Dict[str, Any]) -> None: primary_keys = ( "Primary_Metric_Score", *PRIMARY_METRIC_KEYS, "ERR@10", ) included = set() for key in primary_keys: if key in metrics: lines.append(f"- {key}: {metrics[key]}") included.add(key) for key, value in sorted(metrics.items()): if key in included: continue lines.append(f"- {key}: {value}") def _label_level_code(label: str) -> str: grade = RELEVANCE_GAIN_MAP.get(label) return f"L{grade}" if grade is not None else "?" def _append_case_snapshot(lines: list[str], item: Dict[str, Any]) -> None: request_id = str(item.get("request_id") or "").strip() if request_id: lines.append(f"- Request ID: `{request_id}`") seq10 = str(item.get("top_label_sequence_top10") or "").strip() if seq10: lines.append(f"- Top-10 Labels: `{seq10}`") seq20 = str(item.get("top_label_sequence_top20") or "").strip() if seq20 and seq20 != seq10: lines.append(f"- Top-20 Labels: `{seq20}`") top_results = item.get("top_results") or [] if not top_results: return lines.append("- Case Snapshot:") for result in top_results[:5]: rank = int(result.get("rank") or 0) label = _label_level_code(str(result.get("label") or "")) spu_id = str(result.get("spu_id") or "") title = str(result.get("title") or "") title_zh = str(result.get("title_zh") or "") relevance_score = result.get("relevance_score") score_suffix = f" (rel={relevance_score})" if relevance_score not in (None, "") else "" lines.append(f" - #{rank} [{label}] spu={spu_id} {title}{score_suffix}") if title_zh: lines.append(f" zh: {title_zh}") def render_batch_report_markdown(payload: Dict[str, Any]) -> str: lines = [ "# Search Batch Evaluation", "", f"- Batch ID: {payload['batch_id']}", f"- Created at: {payload['created_at']}", f"- Tenant ID: {payload['tenant_id']}", f"- Query count: {len(payload['queries'])}", f"- Top K: {payload['top_k']}", "", "## Aggregate Metrics", "", ] metric_context = payload.get("metric_context") or {} if metric_context: lines.extend( [ f"- Primary metric: {metric_context.get('primary_metric', 'N/A')}", f"- Gain scheme (NDCG): {metric_context.get('gain_scheme', {})}", f"- Stop probabilities (ERR): {metric_context.get('stop_prob_scheme', {})}", "", ] ) _append_metric_block(lines, payload.get("aggregate_metrics") or {}) distribution = payload.get("aggregate_distribution") or {} if distribution: lines.extend( [ "", "## Label Distribution", "", f"- Fully Relevant: {distribution.get(RELEVANCE_LV3, 0)}", f"- Mostly Relevant: {distribution.get(RELEVANCE_LV2, 0)}", f"- Weakly Relevant: {distribution.get(RELEVANCE_LV1, 0)}", f"- Irrelevant: {distribution.get(RELEVANCE_LV0, 0)}", ] ) lines.extend(["", "## Per Query", ""]) for item in payload.get("per_query") or []: lines.append(f"### {item['query']}") lines.append("") _append_metric_block(lines, item.get("metrics") or {}) distribution = item.get("distribution") or {} lines.append(f"- Fully Relevant: {distribution.get(RELEVANCE_LV3, 0)}") lines.append(f"- Mostly Relevant: {distribution.get(RELEVANCE_LV2, 0)}") lines.append(f"- Weakly Relevant: {distribution.get(RELEVANCE_LV1, 0)}") lines.append(f"- Irrelevant: {distribution.get(RELEVANCE_LV0, 0)}") _append_case_snapshot(lines, item) lines.append("") return "\n".join(lines)