"""SQLite persistence for evaluation corpus, labels, rerank scores, and run metadata.""" from __future__ import annotations import json import sqlite3 from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Sequence from .constants import VALID_LABELS from .utils import ensure_dir, safe_json_dumps, utc_now_iso @dataclass class QueryBuildResult: query: str tenant_id: str search_total: int search_depth: int rerank_corpus_size: int annotated_count: int output_json_path: Path def _compact_batch_metadata(metadata: Dict[str, Any]) -> Dict[str, Any]: return { "batch_id": metadata.get("batch_id"), "created_at": metadata.get("created_at"), "tenant_id": metadata.get("tenant_id"), "top_k": metadata.get("top_k"), "query_count": len(metadata.get("queries") or []), "aggregate_metrics": dict(metadata.get("aggregate_metrics") or {}), "metric_context": dict(metadata.get("metric_context") or {}), } class EvalStore: def __init__(self, db_path: Path): self.db_path = db_path ensure_dir(db_path.parent) self.conn = sqlite3.connect(str(db_path), check_same_thread=False) self.conn.row_factory = sqlite3.Row self._init_schema() def _init_schema(self) -> None: self.conn.executescript( """ CREATE TABLE IF NOT EXISTS corpus_docs ( tenant_id TEXT NOT NULL, spu_id TEXT NOT NULL, title_json TEXT, vendor_json TEXT, category_path_json TEXT, category_name_json TEXT, image_url TEXT, skus_json TEXT, tags_json TEXT, raw_json TEXT NOT NULL, updated_at TEXT NOT NULL, PRIMARY KEY (tenant_id, spu_id) ); CREATE TABLE IF NOT EXISTS rerank_scores ( tenant_id TEXT NOT NULL, query_text TEXT NOT NULL, spu_id TEXT NOT NULL, score REAL NOT NULL, model_name TEXT, updated_at TEXT NOT NULL, PRIMARY KEY (tenant_id, query_text, spu_id) ); CREATE TABLE IF NOT EXISTS relevance_labels ( tenant_id TEXT NOT NULL, query_text TEXT NOT NULL, spu_id TEXT NOT NULL, label TEXT NOT NULL, judge_model TEXT, raw_response TEXT, updated_at TEXT NOT NULL, PRIMARY KEY (tenant_id, query_text, spu_id) ); CREATE TABLE IF NOT EXISTS build_runs ( run_id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, query_text TEXT NOT NULL, output_json_path TEXT NOT NULL, metadata_json TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS batch_runs ( batch_id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, output_json_path TEXT NOT NULL, report_markdown_path TEXT NOT NULL, config_snapshot_path TEXT NOT NULL, metadata_json TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS query_profiles ( tenant_id TEXT NOT NULL, query_text TEXT NOT NULL, prompt_version TEXT NOT NULL, judge_model TEXT, profile_json TEXT NOT NULL, raw_response TEXT NOT NULL, updated_at TEXT NOT NULL, PRIMARY KEY (tenant_id, query_text, prompt_version) ); """ ) self.conn.commit() def upsert_corpus_docs(self, tenant_id: str, docs: Sequence[Dict[str, Any]]) -> None: now = utc_now_iso() rows = [] for doc in docs: rows.append( ( tenant_id, str(doc.get("spu_id") or ""), safe_json_dumps(doc.get("title")), safe_json_dumps(doc.get("vendor")), safe_json_dumps(doc.get("category_path")), safe_json_dumps(doc.get("category_name")), str(doc.get("image_url") or ""), safe_json_dumps(doc.get("skus") or []), safe_json_dumps(doc.get("tags") or []), safe_json_dumps(doc), now, ) ) self.conn.executemany( """ INSERT INTO corpus_docs ( tenant_id, spu_id, title_json, vendor_json, category_path_json, category_name_json, image_url, skus_json, tags_json, raw_json, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(tenant_id, spu_id) DO UPDATE SET title_json=excluded.title_json, vendor_json=excluded.vendor_json, category_path_json=excluded.category_path_json, category_name_json=excluded.category_name_json, image_url=excluded.image_url, skus_json=excluded.skus_json, tags_json=excluded.tags_json, raw_json=excluded.raw_json, updated_at=excluded.updated_at """, rows, ) self.conn.commit() def get_corpus_docs(self, tenant_id: str) -> List[Dict[str, Any]]: rows = self.conn.execute( "SELECT raw_json FROM corpus_docs WHERE tenant_id=? ORDER BY spu_id", (tenant_id,), ).fetchall() return [json.loads(row["raw_json"]) for row in rows] def get_corpus_docs_by_spu_ids(self, tenant_id: str, spu_ids: Sequence[str]) -> Dict[str, Dict[str, Any]]: keys = [str(spu_id) for spu_id in spu_ids if str(spu_id).strip()] if not keys: return {} placeholders = ",".join("?" for _ in keys) rows = self.conn.execute( f""" SELECT spu_id, raw_json FROM corpus_docs WHERE tenant_id=? AND spu_id IN ({placeholders}) """, [tenant_id, *keys], ).fetchall() return { str(row["spu_id"]): json.loads(row["raw_json"]) for row in rows } def has_corpus(self, tenant_id: str) -> bool: row = self.conn.execute( "SELECT COUNT(1) AS n FROM corpus_docs WHERE tenant_id=?", (tenant_id,), ).fetchone() return bool(row and row["n"] > 0) def get_rerank_scores(self, tenant_id: str, query_text: str) -> Dict[str, float]: rows = self.conn.execute( """ SELECT spu_id, score FROM rerank_scores WHERE tenant_id=? AND query_text=? """, (tenant_id, query_text), ).fetchall() return {str(row["spu_id"]): float(row["score"]) for row in rows} def upsert_rerank_scores( self, tenant_id: str, query_text: str, scores: Dict[str, float], model_name: str, ) -> None: now = utc_now_iso() rows = [ (tenant_id, query_text, spu_id, float(score), model_name, now) for spu_id, score in scores.items() ] self.conn.executemany( """ INSERT INTO rerank_scores (tenant_id, query_text, spu_id, score, model_name, updated_at) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET score=excluded.score, model_name=excluded.model_name, updated_at=excluded.updated_at """, rows, ) self.conn.commit() def get_labels(self, tenant_id: str, query_text: str) -> Dict[str, str]: rows = self.conn.execute( """ SELECT spu_id, label FROM relevance_labels WHERE tenant_id=? AND query_text=? """, (tenant_id, query_text), ).fetchall() return {str(row["spu_id"]): str(row["label"]) for row in rows} def upsert_labels( self, tenant_id: str, query_text: str, labels: Dict[str, str], judge_model: str, raw_response: str, ) -> None: now = utc_now_iso() rows = [] for spu_id, label in labels.items(): if label not in VALID_LABELS: raise ValueError(f"invalid label: {label}") rows.append((tenant_id, query_text, spu_id, label, judge_model, raw_response, now)) self.conn.executemany( """ INSERT INTO relevance_labels (tenant_id, query_text, spu_id, label, judge_model, raw_response, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET label=excluded.label, judge_model=excluded.judge_model, raw_response=excluded.raw_response, updated_at=excluded.updated_at """, rows, ) self.conn.commit() def get_query_profile(self, tenant_id: str, query_text: str, prompt_version: str) -> Optional[Dict[str, Any]]: row = self.conn.execute( """ SELECT profile_json FROM query_profiles WHERE tenant_id=? AND query_text=? AND prompt_version=? """, (tenant_id, query_text, prompt_version), ).fetchone() if not row: return None return json.loads(row["profile_json"]) def upsert_query_profile( self, tenant_id: str, query_text: str, prompt_version: str, judge_model: str, profile: Dict[str, Any], raw_response: str, ) -> None: self.conn.execute( """ INSERT OR REPLACE INTO query_profiles (tenant_id, query_text, prompt_version, judge_model, profile_json, raw_response, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( tenant_id, query_text, prompt_version, judge_model, safe_json_dumps(profile), raw_response, utc_now_iso(), ), ) self.conn.commit() def insert_build_run(self, run_id: str, tenant_id: str, query_text: str, output_json_path: Path, metadata: Dict[str, Any]) -> None: self.conn.execute( """ INSERT OR REPLACE INTO build_runs (run_id, tenant_id, query_text, output_json_path, metadata_json, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (run_id, tenant_id, query_text, str(output_json_path), safe_json_dumps(metadata), utc_now_iso()), ) self.conn.commit() def insert_batch_run( self, batch_id: str, tenant_id: str, output_json_path: Path, report_markdown_path: Path, config_snapshot_path: Path, metadata: Dict[str, Any], ) -> None: self.conn.execute( """ INSERT OR REPLACE INTO batch_runs (batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( batch_id, tenant_id, str(output_json_path), str(report_markdown_path), str(config_snapshot_path), safe_json_dumps(metadata), utc_now_iso(), ), ) self.conn.commit() def list_batch_runs(self, limit: int = 20) -> List[Dict[str, Any]]: rows = self.conn.execute( """ SELECT batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at FROM batch_runs ORDER BY created_at DESC LIMIT ? """, (limit,), ).fetchall() items: List[Dict[str, Any]] = [] for row in rows: metadata = json.loads(row["metadata_json"]) items.append( { "batch_id": row["batch_id"], "tenant_id": row["tenant_id"], "output_json_path": row["output_json_path"], "report_markdown_path": row["report_markdown_path"], "config_snapshot_path": row["config_snapshot_path"], "metadata": _compact_batch_metadata(metadata), "created_at": row["created_at"], } ) return items def get_batch_run(self, batch_id: str) -> Optional[Dict[str, Any]]: row = self.conn.execute( """ SELECT batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at FROM batch_runs WHERE batch_id = ? """, (batch_id,), ).fetchone() if row is None: return None return { "batch_id": row["batch_id"], "tenant_id": row["tenant_id"], "output_json_path": row["output_json_path"], "report_markdown_path": row["report_markdown_path"], "config_snapshot_path": row["config_snapshot_path"], "metadata": json.loads(row["metadata_json"]), "created_at": row["created_at"], } def list_query_label_stats(self, tenant_id: str) -> List[Dict[str, Any]]: rows = self.conn.execute( """ SELECT query_text, COUNT(*) AS total, SUM(CASE WHEN label='Fully Relevant' THEN 1 ELSE 0 END) AS exact_count, SUM(CASE WHEN label='Mostly Relevant' THEN 1 ELSE 0 END) AS high_relevant_count, SUM(CASE WHEN label='Weakly Relevant' THEN 1 ELSE 0 END) AS low_relevant_count, SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, MAX(updated_at) AS updated_at FROM relevance_labels WHERE tenant_id=? GROUP BY query_text ORDER BY query_text """, (tenant_id,), ).fetchall() return [ { "query": str(row["query_text"]), "total": int(row["total"]), "exact_count": int(row["exact_count"] or 0), "high_relevant_count": int(row["high_relevant_count"] or 0), "low_relevant_count": int(row["low_relevant_count"] or 0), "irrelevant_count": int(row["irrelevant_count"] or 0), "updated_at": row["updated_at"], } for row in rows ] def get_query_label_stats(self, tenant_id: str, query_text: str) -> Dict[str, Any]: row = self.conn.execute( """ SELECT COUNT(*) AS total, SUM(CASE WHEN label='Fully Relevant' THEN 1 ELSE 0 END) AS exact_count, SUM(CASE WHEN label='Mostly Relevant' THEN 1 ELSE 0 END) AS high_relevant_count, SUM(CASE WHEN label='Weakly Relevant' THEN 1 ELSE 0 END) AS low_relevant_count, SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, MAX(updated_at) AS updated_at FROM relevance_labels WHERE tenant_id=? AND query_text=? """, (tenant_id, query_text), ).fetchone() return { "query": query_text, "total": int((row["total"] or 0) if row else 0), "exact_count": int((row["exact_count"] or 0) if row else 0), "high_relevant_count": int((row["high_relevant_count"] or 0) if row else 0), "low_relevant_count": int((row["low_relevant_count"] or 0) if row else 0), "irrelevant_count": int((row["irrelevant_count"] or 0) if row else 0), "updated_at": row["updated_at"] if row else None, }