"""Markdown and text reports for batch evaluation."""
from __future__ import annotations
from typing import Any, Dict
from .constants import RELEVANCE_GAIN_MAP, RELEVANCE_LV0, RELEVANCE_LV1, RELEVANCE_LV2, RELEVANCE_LV3
from .metrics import PRIMARY_METRIC_KEYS
def _append_metric_block(lines: list[str], metrics: Dict[str, Any]) -> None:
    """Append one bullet line per metric to *lines*.

    Well-known metrics (the primary score, the configured primary metric
    keys, then ERR@10) are emitted first in a fixed order; every remaining
    metric follows in alphabetical order.
    """
    preferred = ("Primary_Metric_Score", *PRIMARY_METRIC_KEYS, "ERR@10")
    emitted: set[str] = set()
    for name in preferred:
        if name not in metrics:
            continue
        lines.append(f"- {name}: {metrics[name]}")
        emitted.add(name)
    # Everything not covered by the preferred ordering, sorted for stability.
    for name in sorted(name for name in metrics if name not in emitted):
        lines.append(f"- {name}: {metrics[name]}")
def _label_level_code(label: str) -> str:
    """Return a short level code ("L<grade>") for a relevance label.

    Unknown labels — those missing from RELEVANCE_GAIN_MAP — map to "?".
    """
    grade = RELEVANCE_GAIN_MAP.get(label)
    if grade is None:
        return "?"
    return f"L{grade}"
def _append_case_snapshot(lines: list[str], item: Dict[str, Any]) -> None:
    """Append per-query diagnostic details (request id, label sequences,
    and a snapshot of up to five top results) to *lines*."""
    req_id = str(item.get("request_id") or "").strip()
    if req_id:
        lines.append(f"- Request ID: `{req_id}`")
    labels_top10 = str(item.get("top_label_sequence_top10") or "").strip()
    if labels_top10:
        lines.append(f"- Top-10 Labels: `{labels_top10}`")
    labels_top20 = str(item.get("top_label_sequence_top20") or "").strip()
    # Only show the top-20 sequence when it differs from the top-10 one.
    if labels_top20 and labels_top20 != labels_top10:
        lines.append(f"- Top-20 Labels: `{labels_top20}`")
    results = item.get("top_results") or []
    if not results:
        return
    lines.append("- Case Snapshot:")
    for entry in results[:5]:
        rank = int(entry.get("rank") or 0)
        level = _label_level_code(str(entry.get("label") or ""))
        spu = str(entry.get("spu_id") or "")
        name = str(entry.get("title") or "")
        name_zh = str(entry.get("title_zh") or "")
        rel = entry.get("relevance_score")
        suffix = "" if rel in (None, "") else f" (rel={rel})"
        lines.append(f" - #{rank} [{level}] spu={spu} {name}{suffix}")
        if name_zh:
            lines.append(f" zh: {name_zh}")
def render_batch_report_markdown(payload: Dict[str, Any]) -> str:
    """Render a full batch-evaluation report as a Markdown string.

    The payload carries batch metadata (id, timestamps, tenant, queries,
    top_k), aggregate metrics, an optional aggregate label distribution,
    and a list of per-query result entries.
    """
    out = [
        "# Search Batch Evaluation",
        "",
        f"- Batch ID: {payload['batch_id']}",
        f"- Created at: {payload['created_at']}",
        f"- Tenant ID: {payload['tenant_id']}",
        f"- Query count: {len(payload['queries'])}",
        f"- Top K: {payload['top_k']}",
        "",
        "## Aggregate Metrics",
        "",
    ]
    context = payload.get("metric_context") or {}
    if context:
        out.append(f"- Primary metric: {context.get('primary_metric', 'N/A')}")
        out.append(f"- Gain scheme (NDCG): {context.get('gain_scheme', {})}")
        out.append(f"- Stop probabilities (ERR): {context.get('stop_prob_scheme', {})}")
        out.append("")
    _append_metric_block(out, payload.get("aggregate_metrics") or {})
    agg_dist = payload.get("aggregate_distribution") or {}
    if agg_dist:
        out.append("")
        out.append("## Label Distribution")
        out.append("")
        out.append(f"- Fully Relevant: {agg_dist.get(RELEVANCE_LV3, 0)}")
        out.append(f"- Mostly Relevant: {agg_dist.get(RELEVANCE_LV2, 0)}")
        out.append(f"- Weakly Relevant: {agg_dist.get(RELEVANCE_LV1, 0)}")
        out.append(f"- Irrelevant: {agg_dist.get(RELEVANCE_LV0, 0)}")
    out.extend(["", "## Per Query", ""])
    for entry in payload.get("per_query") or []:
        out.append(f"### {entry['query']}")
        out.append("")
        _append_metric_block(out, entry.get("metrics") or {})
        dist = entry.get("distribution") or {}
        out.append(f"- Fully Relevant: {dist.get(RELEVANCE_LV3, 0)}")
        out.append(f"- Mostly Relevant: {dist.get(RELEVANCE_LV2, 0)}")
        out.append(f"- Weakly Relevant: {dist.get(RELEVANCE_LV1, 0)}")
        out.append(f"- Irrelevant: {dist.get(RELEVANCE_LV0, 0)}")
        _append_case_snapshot(out, entry)
        out.append("")
    return "\n".join(out)