web_app.py
5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""FastAPI app for the search evaluation UI (static frontend + JSON APIs)."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from .api_models import BatchEvalRequest, SearchEvalRequest
from .constants import DEFAULT_QUERY_FILE
from .datasets import list_registered_datasets, resolve_dataset
from .framework import SearchEvaluationFramework
# Directory bundled next to this module that holds the frontend assets
# (index.html plus anything served under /static).
_STATIC_DIR = Path(__file__).resolve().parent / "static"
def create_web_app(framework: SearchEvaluationFramework, initial_dataset_id: str | None = None) -> FastAPI:
    """Build the FastAPI app serving the static frontend and the JSON APIs.

    Args:
        framework: Evaluation framework providing the store, ``tenant_id``,
            ``artifact_root`` and the live/batch evaluation entry points used
            by the route handlers below.
        initial_dataset_id: Dataset pre-selected in the UI. Falls back to
            ``"core_queries"`` when ``None`` or empty.

    Returns:
        A configured ``FastAPI`` instance with the UI and API routes mounted.
    """
    app = FastAPI(title="Search Evaluation UI", version="1.0.0")
    # Dataset the UI starts on; individual requests may override it via
    # their own dataset_id parameter/field.
    current_dataset_id = initial_dataset_id or "core_queries"
    app.mount(
        "/static",
        StaticFiles(directory=str(_STATIC_DIR)),
        name="static",
    )
    index_path = _STATIC_DIR / "index.html"

    @app.get("/", response_class=HTMLResponse)
    def home() -> str:
        # Re-read on every request so frontend edits show up without a restart.
        return index_path.read_text(encoding="utf-8")

    @app.get("/api/datasets")
    def api_datasets() -> Dict[str, Any]:
        """List enabled datasets, each with a label-coverage summary."""
        stats_by_query = {item["query"]: item for item in framework.store.list_query_label_stats(framework.tenant_id)}
        datasets = []
        for item in list_registered_datasets(enabled_only=True):
            snapshot = resolve_dataset(dataset_id=item.dataset_id, tenant_id=framework.tenant_id)
            # A query counts as labeled when it has at least one stored label.
            labeled_queries = sum(1 for query in snapshot.queries if (stats_by_query.get(query) or {}).get("total", 0) > 0)
            datasets.append(
                {
                    **snapshot.summary(),
                    "coverage_summary": {
                        "labeled_queries": labeled_queries,
                        # Avoid ZeroDivisionError for datasets with no queries.
                        "coverage_ratio": (labeled_queries / snapshot.query_count) if snapshot.query_count else 0.0,
                    },
                }
            )
        return {"datasets": datasets, "current_dataset_id": current_dataset_id}

    @app.get("/api/datasets/{dataset_id}/queries")
    def api_dataset_queries(dataset_id: str) -> Dict[str, Any]:
        """Return the query list of one explicitly-named, enabled dataset."""
        dataset = resolve_dataset(dataset_id=dataset_id, tenant_id=framework.tenant_id, require_enabled=True)
        return {"dataset": dataset.summary(), "queries": list(dataset.queries)}

    @app.get("/api/queries")
    def api_queries(dataset_id: str | None = None) -> Dict[str, Any]:
        """Return queries for ``dataset_id``, defaulting to the current dataset."""
        dataset = resolve_dataset(dataset_id=dataset_id or current_dataset_id, tenant_id=framework.tenant_id)
        return {"dataset": dataset.summary(), "queries": list(dataset.queries)}

    @app.post("/api/search-eval")
    def api_search_eval(request: SearchEvalRequest) -> Dict[str, Any]:
        """Evaluate a single live query against the resolved dataset."""
        dataset = resolve_dataset(
            dataset_id=request.dataset_id or current_dataset_id,
            tenant_id=framework.tenant_id,
            language=request.language,
        )
        return framework.evaluate_live_query(
            query=request.query,
            top_k=request.top_k,
            auto_annotate=request.auto_annotate,
            # Use the dataset's (possibly normalized) language, not the raw
            # request value, so evaluation and dataset agree.
            language=dataset.language,
            dataset=dataset,
        )

    @app.post("/api/batch-eval")
    def api_batch_eval(request: BatchEvalRequest) -> Dict[str, Any]:
        """Run a batch evaluation over explicit queries or the whole dataset.

        Raises:
            HTTPException: 400 when neither the request nor the dataset
                yields any queries to evaluate.
        """
        dataset = resolve_dataset(
            dataset_id=request.dataset_id or current_dataset_id,
            tenant_id=framework.tenant_id,
            language=request.language,
        )
        # Explicit queries win; otherwise fall back to the full dataset.
        queries = request.queries or list(dataset.queries)
        if not queries:
            raise HTTPException(status_code=400, detail="No queries provided")
        return framework.batch_evaluate(
            queries=queries,
            dataset=dataset,
            top_k=request.top_k,
            auto_annotate=request.auto_annotate,
            language=dataset.language,
            force_refresh_labels=request.force_refresh_labels,
        )

    @app.get("/api/history")
    def api_history(dataset_id: str | None = None, limit: int = 20) -> Dict[str, Any]:
        """List recent batch runs for a dataset (default: current dataset)."""
        effective_dataset_id = dataset_id or current_dataset_id
        return {
            "history": framework.store.list_batch_runs(limit=limit, dataset_id=effective_dataset_id),
            "dataset_id": effective_dataset_id,
        }

    @app.get("/api/history/{batch_id}/report")
    def api_history_report(batch_id: str) -> Dict[str, Any]:
        """Return the stored markdown report for one batch run.

        Raises:
            HTTPException: 404 for an unknown batch id or a missing report
                file; 403 when the stored path escapes the artifact root.
        """
        row = framework.store.get_batch_run(batch_id)
        if row is None:
            raise HTTPException(status_code=404, detail="Unknown batch_id")
        report_path = Path(row["report_markdown_path"]).resolve()
        root = framework.artifact_root.resolve()
        try:
            # Path-traversal guard: the stored path must live under the
            # artifact root (defends against a tampered DB row).
            report_path.relative_to(root)
        except ValueError as exc:
            # Chain the cause so tracebacks don't show a spurious
            # "during handling of the above exception" (B904 fix).
            raise HTTPException(status_code=403, detail="Report path is outside artifact root") from exc
        if not report_path.is_file():
            raise HTTPException(status_code=404, detail="Report file not found")
        return {
            "batch_id": row["batch_id"],
            "created_at": row["created_at"],
            "tenant_id": row["tenant_id"],
            "dataset": row["metadata"].get("dataset") or {},
            "report_markdown_path": str(report_path),
            "markdown": report_path.read_text(encoding="utf-8"),
        }

    return app