diff --git a/scripts/evaluation/README.md b/scripts/evaluation/README.md index fc5fa2e..1f9cdab 100644 --- a/scripts/evaluation/README.md +++ b/scripts/evaluation/README.md @@ -19,12 +19,12 @@ The framework supports four related tasks: ## Files -- `eval_framework.py` - Search evaluation core implementation, CLI, FastAPI app, SQLite store, audit logic, and report generation. +- `eval_framework/` (Python package) + Modular layout: `framework.py` (orchestration), `store.py` (SQLite), `clients.py` (search/rerank/LLM), `prompts.py` (judge templates), `metrics.py`, `reports.py`, `web_app.py`, `cli.py`, and `static/` (evaluation UI HTML/CSS/JS). - `build_annotation_set.py` - Thin CLI entrypoint into `eval_framework.py`. + Thin CLI entrypoint into `eval_framework`. - `serve_eval_web.py` - Thin web entrypoint into `eval_framework.py`. + Thin web entrypoint into `eval_framework`. - `tune_fusion.py` Fusion experiment runner. It applies config variants, restarts backend, runs batch evaluation, and stores experiment reports. - `fusion_experiments_shortlist.json` diff --git a/scripts/evaluation/eval_framework.py b/scripts/evaluation/eval_framework.py deleted file mode 100644 index 834b566..0000000 --- a/scripts/evaluation/eval_framework.py +++ /dev/null @@ -1,2140 +0,0 @@ -#!/usr/bin/env python3 -""" -Search evaluation framework for pooled relevance annotation, live metrics, and reports. 
-""" - -from __future__ import annotations - -import argparse -import hashlib -import json -import math -import os -import re -import sqlite3 -import sys -import time -from dataclasses import dataclass -from datetime import datetime, timezone -from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple - -import requests -from elasticsearch.helpers import scan -from fastapi import FastAPI, HTTPException -from fastapi.responses import HTMLResponse -from pydantic import BaseModel, Field - -PROJECT_ROOT = Path(__file__).resolve().parents[2] -if str(PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(PROJECT_ROOT)) - -from api.app import get_app_config, get_es_client, get_query_parser, init_service -from indexer.mapping_generator import get_tenant_index_name - - -RELEVANCE_EXACT = "Exact" -RELEVANCE_PARTIAL = "Partial" -RELEVANCE_IRRELEVANT = "Irrelevant" -VALID_LABELS = {RELEVANCE_EXACT, RELEVANCE_PARTIAL, RELEVANCE_IRRELEVANT} -DEFAULT_ARTIFACT_ROOT = PROJECT_ROOT / "artifacts" / "search_evaluation" -DEFAULT_QUERY_FILE = PROJECT_ROOT / "scripts" / "evaluation" / "queries" / "queries.txt" -JUDGE_PROMPT_VERSION_SIMPLE = "v3_simple_20260331" -JUDGE_PROMPT_VERSION_COMPLEX = "v2_structured_20260331" -DEFAULT_LABELER_MODE = "simple" - - -def utc_now_iso() -> str: - return datetime.now(timezone.utc).isoformat() - - -def utc_timestamp() -> str: - return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") - - -def ensure_dir(path: Path) -> Path: - path.mkdir(parents=True, exist_ok=True) - return path - - -def sha1_text(text: str) -> str: - return hashlib.sha1(text.encode("utf-8")).hexdigest() - - -def pick_text(value: Any, preferred_lang: str = "en") -> str: - if value is None: - return "" - if isinstance(value, dict): - return str( - value.get(preferred_lang) - or value.get("en") - or value.get("zh") - or next((v for v in value.values() if v), "") - ).strip() - return str(value).strip() - - -def safe_json_dumps(data: Any) -> str: - 
return json.dumps(data, ensure_ascii=False, separators=(",", ":")) - - -def compact_option_values(skus: Sequence[Dict[str, Any]]) -> Tuple[str, str, str]: - if not skus: - return "", "", "" - first = skus[0] or {} - return ( - str(first.get("option1_value") or "").strip(), - str(first.get("option2_value") or "").strip(), - str(first.get("option3_value") or "").strip(), - ) - - -def build_display_title(doc: Dict[str, Any]) -> str: - title = doc.get("title") - en = pick_text(title, "en") - zh = pick_text(title, "zh") - if en and zh and en != zh: - return f"{en} / {zh}" - return en or zh - - -def build_rerank_doc(doc: Dict[str, Any]) -> str: - title = build_display_title(doc) - return title[:400] - - -def build_label_doc_line(idx: int, doc: Dict[str, Any]) -> str: - title = build_display_title(doc) - option1, option2, option3 = compact_option_values(doc.get("skus") or []) - vendor = pick_text(doc.get("vendor"), "en") - category = pick_text(doc.get("category_path"), "en") or pick_text(doc.get("category_name"), "en") - tags = doc.get("tags") or [] - tags_text = ", ".join(str(tag) for tag in tags[:4] if tag) - parts = [title] - if option1: - parts.append(f"option1={option1}") - if option2: - parts.append(f"option2={option2}") - if option3: - parts.append(f"option3={option3}") - if vendor: - parts.append(f"vendor={vendor}") - if category: - parts.append(f"category={category}") - if tags_text: - parts.append(f"tags={tags_text}") - return f"{idx}. 
" + " | ".join(part for part in parts if part) - - -def compact_product_payload(doc: Dict[str, Any]) -> Dict[str, Any]: - return { - "spu_id": str(doc.get("spu_id") or ""), - "title": build_display_title(doc), - "image_url": doc.get("image_url"), - "vendor": pick_text(doc.get("vendor"), "en"), - "category": pick_text(doc.get("category_path"), "en") or pick_text(doc.get("category_name"), "en"), - "option_values": list(compact_option_values(doc.get("skus") or [])), - "tags": list((doc.get("tags") or [])[:6]), - } - - -def normalize_text(text: Any) -> str: - value = str(text or "").strip().lower() - value = re.sub(r"\s+", " ", value) - return value - - -def _extract_json_blob(text: str) -> Any: - cleaned = str(text or "").strip() - candidates: List[str] = [cleaned] - fence_matches = re.findall(r"```(?:json)?\s*(.*?)```", cleaned, flags=re.S | re.I) - candidates.extend(match.strip() for match in fence_matches if match.strip()) - - for candidate in candidates: - try: - return json.loads(candidate) - except Exception: - pass - - starts = [idx for idx, ch in enumerate(cleaned) if ch in "[{"] - ends = [idx for idx, ch in enumerate(cleaned) if ch in "]}"] - for start in starts: - for end in reversed(ends): - if end <= start: - continue - fragment = cleaned[start : end + 1] - try: - return json.loads(fragment) - except Exception: - continue - raise ValueError(f"failed to parse json from: {cleaned[:500]!r}") - - -@dataclass -class QueryBuildResult: - query: str - tenant_id: str - search_total: int - search_depth: int - rerank_corpus_size: int - annotated_count: int - output_json_path: Path - - -class EvalStore: - def __init__(self, db_path: Path): - self.db_path = db_path - ensure_dir(db_path.parent) - self.conn = sqlite3.connect(str(db_path), check_same_thread=False) - self.conn.row_factory = sqlite3.Row - self._init_schema() - - def _init_schema(self) -> None: - self.conn.executescript( - """ - CREATE TABLE IF NOT EXISTS corpus_docs ( - tenant_id TEXT NOT NULL, - spu_id 
TEXT NOT NULL, - title_json TEXT, - vendor_json TEXT, - category_path_json TEXT, - category_name_json TEXT, - image_url TEXT, - skus_json TEXT, - tags_json TEXT, - raw_json TEXT NOT NULL, - updated_at TEXT NOT NULL, - PRIMARY KEY (tenant_id, spu_id) - ); - - CREATE TABLE IF NOT EXISTS rerank_scores ( - tenant_id TEXT NOT NULL, - query_text TEXT NOT NULL, - spu_id TEXT NOT NULL, - score REAL NOT NULL, - model_name TEXT, - updated_at TEXT NOT NULL, - PRIMARY KEY (tenant_id, query_text, spu_id) - ); - - CREATE TABLE IF NOT EXISTS relevance_labels ( - tenant_id TEXT NOT NULL, - query_text TEXT NOT NULL, - spu_id TEXT NOT NULL, - label TEXT NOT NULL, - judge_model TEXT, - raw_response TEXT, - updated_at TEXT NOT NULL, - PRIMARY KEY (tenant_id, query_text, spu_id) - ); - - CREATE TABLE IF NOT EXISTS build_runs ( - run_id TEXT PRIMARY KEY, - tenant_id TEXT NOT NULL, - query_text TEXT NOT NULL, - output_json_path TEXT NOT NULL, - metadata_json TEXT NOT NULL, - created_at TEXT NOT NULL - ); - - CREATE TABLE IF NOT EXISTS batch_runs ( - batch_id TEXT PRIMARY KEY, - tenant_id TEXT NOT NULL, - output_json_path TEXT NOT NULL, - report_markdown_path TEXT NOT NULL, - config_snapshot_path TEXT NOT NULL, - metadata_json TEXT NOT NULL, - created_at TEXT NOT NULL - ); - - CREATE TABLE IF NOT EXISTS query_profiles ( - tenant_id TEXT NOT NULL, - query_text TEXT NOT NULL, - prompt_version TEXT NOT NULL, - judge_model TEXT, - profile_json TEXT NOT NULL, - raw_response TEXT NOT NULL, - updated_at TEXT NOT NULL, - PRIMARY KEY (tenant_id, query_text, prompt_version) - ); - """ - ) - self.conn.commit() - - def upsert_corpus_docs(self, tenant_id: str, docs: Sequence[Dict[str, Any]]) -> None: - now = utc_now_iso() - rows = [] - for doc in docs: - rows.append( - ( - tenant_id, - str(doc.get("spu_id") or ""), - safe_json_dumps(doc.get("title")), - safe_json_dumps(doc.get("vendor")), - safe_json_dumps(doc.get("category_path")), - safe_json_dumps(doc.get("category_name")), - 
str(doc.get("image_url") or ""), - safe_json_dumps(doc.get("skus") or []), - safe_json_dumps(doc.get("tags") or []), - safe_json_dumps(doc), - now, - ) - ) - self.conn.executemany( - """ - INSERT INTO corpus_docs ( - tenant_id, spu_id, title_json, vendor_json, category_path_json, category_name_json, - image_url, skus_json, tags_json, raw_json, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(tenant_id, spu_id) DO UPDATE SET - title_json=excluded.title_json, - vendor_json=excluded.vendor_json, - category_path_json=excluded.category_path_json, - category_name_json=excluded.category_name_json, - image_url=excluded.image_url, - skus_json=excluded.skus_json, - tags_json=excluded.tags_json, - raw_json=excluded.raw_json, - updated_at=excluded.updated_at - """, - rows, - ) - self.conn.commit() - - def get_corpus_docs(self, tenant_id: str) -> List[Dict[str, Any]]: - rows = self.conn.execute( - "SELECT raw_json FROM corpus_docs WHERE tenant_id=? ORDER BY spu_id", - (tenant_id,), - ).fetchall() - return [json.loads(row["raw_json"]) for row in rows] - - def get_corpus_docs_by_spu_ids(self, tenant_id: str, spu_ids: Sequence[str]) -> Dict[str, Dict[str, Any]]: - keys = [str(spu_id) for spu_id in spu_ids if str(spu_id).strip()] - if not keys: - return {} - placeholders = ",".join("?" for _ in keys) - rows = self.conn.execute( - f""" - SELECT spu_id, raw_json - FROM corpus_docs - WHERE tenant_id=? AND spu_id IN ({placeholders}) - """, - [tenant_id, *keys], - ).fetchall() - return { - str(row["spu_id"]): json.loads(row["raw_json"]) - for row in rows - } - - def has_corpus(self, tenant_id: str) -> bool: - row = self.conn.execute( - "SELECT COUNT(1) AS n FROM corpus_docs WHERE tenant_id=?", - (tenant_id,), - ).fetchone() - return bool(row and row["n"] > 0) - - def get_rerank_scores(self, tenant_id: str, query_text: str) -> Dict[str, float]: - rows = self.conn.execute( - """ - SELECT spu_id, score - FROM rerank_scores - WHERE tenant_id=? AND query_text=? 
- """, - (tenant_id, query_text), - ).fetchall() - return {str(row["spu_id"]): float(row["score"]) for row in rows} - - def upsert_rerank_scores( - self, - tenant_id: str, - query_text: str, - scores: Dict[str, float], - model_name: str, - ) -> None: - now = utc_now_iso() - rows = [ - (tenant_id, query_text, spu_id, float(score), model_name, now) - for spu_id, score in scores.items() - ] - self.conn.executemany( - """ - INSERT INTO rerank_scores (tenant_id, query_text, spu_id, score, model_name, updated_at) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET - score=excluded.score, - model_name=excluded.model_name, - updated_at=excluded.updated_at - """, - rows, - ) - self.conn.commit() - - def get_labels(self, tenant_id: str, query_text: str) -> Dict[str, str]: - rows = self.conn.execute( - """ - SELECT spu_id, label - FROM relevance_labels - WHERE tenant_id=? AND query_text=? - """, - (tenant_id, query_text), - ).fetchall() - return {str(row["spu_id"]): str(row["label"]) for row in rows} - - def upsert_labels( - self, - tenant_id: str, - query_text: str, - labels: Dict[str, str], - judge_model: str, - raw_response: str, - ) -> None: - now = utc_now_iso() - rows = [] - for spu_id, label in labels.items(): - if label not in VALID_LABELS: - raise ValueError(f"invalid label: {label}") - rows.append((tenant_id, query_text, spu_id, label, judge_model, raw_response, now)) - self.conn.executemany( - """ - INSERT INTO relevance_labels (tenant_id, query_text, spu_id, label, judge_model, raw_response, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?) 
- ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET - label=excluded.label, - judge_model=excluded.judge_model, - raw_response=excluded.raw_response, - updated_at=excluded.updated_at - """, - rows, - ) - self.conn.commit() - - def get_query_profile(self, tenant_id: str, query_text: str, prompt_version: str) -> Optional[Dict[str, Any]]: - row = self.conn.execute( - """ - SELECT profile_json - FROM query_profiles - WHERE tenant_id=? AND query_text=? AND prompt_version=? - """, - (tenant_id, query_text, prompt_version), - ).fetchone() - if not row: - return None - return json.loads(row["profile_json"]) - - def upsert_query_profile( - self, - tenant_id: str, - query_text: str, - prompt_version: str, - judge_model: str, - profile: Dict[str, Any], - raw_response: str, - ) -> None: - self.conn.execute( - """ - INSERT OR REPLACE INTO query_profiles - (tenant_id, query_text, prompt_version, judge_model, profile_json, raw_response, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, - ( - tenant_id, - query_text, - prompt_version, - judge_model, - safe_json_dumps(profile), - raw_response, - utc_now_iso(), - ), - ) - self.conn.commit() - - def insert_build_run(self, run_id: str, tenant_id: str, query_text: str, output_json_path: Path, metadata: Dict[str, Any]) -> None: - self.conn.execute( - """ - INSERT OR REPLACE INTO build_runs (run_id, tenant_id, query_text, output_json_path, metadata_json, created_at) - VALUES (?, ?, ?, ?, ?, ?) - """, - (run_id, tenant_id, query_text, str(output_json_path), safe_json_dumps(metadata), utc_now_iso()), - ) - self.conn.commit() - - def insert_batch_run( - self, - batch_id: str, - tenant_id: str, - output_json_path: Path, - report_markdown_path: Path, - config_snapshot_path: Path, - metadata: Dict[str, Any], - ) -> None: - self.conn.execute( - """ - INSERT OR REPLACE INTO batch_runs - (batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?) 
- """, - ( - batch_id, - tenant_id, - str(output_json_path), - str(report_markdown_path), - str(config_snapshot_path), - safe_json_dumps(metadata), - utc_now_iso(), - ), - ) - self.conn.commit() - - def list_batch_runs(self, limit: int = 20) -> List[Dict[str, Any]]: - rows = self.conn.execute( - """ - SELECT batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at - FROM batch_runs - ORDER BY created_at DESC - LIMIT ? - """, - (limit,), - ).fetchall() - items: List[Dict[str, Any]] = [] - for row in rows: - items.append( - { - "batch_id": row["batch_id"], - "tenant_id": row["tenant_id"], - "output_json_path": row["output_json_path"], - "report_markdown_path": row["report_markdown_path"], - "config_snapshot_path": row["config_snapshot_path"], - "metadata": json.loads(row["metadata_json"]), - "created_at": row["created_at"], - } - ) - return items - - def get_batch_run(self, batch_id: str) -> Optional[Dict[str, Any]]: - row = self.conn.execute( - """ - SELECT batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at - FROM batch_runs - WHERE batch_id = ? - """, - (batch_id,), - ).fetchone() - if row is None: - return None - return { - "batch_id": row["batch_id"], - "tenant_id": row["tenant_id"], - "output_json_path": row["output_json_path"], - "report_markdown_path": row["report_markdown_path"], - "config_snapshot_path": row["config_snapshot_path"], - "metadata": json.loads(row["metadata_json"]), - "created_at": row["created_at"], - } - - def list_query_label_stats(self, tenant_id: str) -> List[Dict[str, Any]]: - rows = self.conn.execute( - """ - SELECT - query_text, - COUNT(*) AS total, - SUM(CASE WHEN label='Exact' THEN 1 ELSE 0 END) AS exact_count, - SUM(CASE WHEN label='Partial' THEN 1 ELSE 0 END) AS partial_count, - SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, - MAX(updated_at) AS updated_at - FROM relevance_labels - WHERE tenant_id=? 
- GROUP BY query_text - ORDER BY query_text - """, - (tenant_id,), - ).fetchall() - return [ - { - "query": str(row["query_text"]), - "total": int(row["total"]), - "exact_count": int(row["exact_count"] or 0), - "partial_count": int(row["partial_count"] or 0), - "irrelevant_count": int(row["irrelevant_count"] or 0), - "updated_at": row["updated_at"], - } - for row in rows - ] - - def get_query_label_stats(self, tenant_id: str, query_text: str) -> Dict[str, Any]: - row = self.conn.execute( - """ - SELECT - COUNT(*) AS total, - SUM(CASE WHEN label='Exact' THEN 1 ELSE 0 END) AS exact_count, - SUM(CASE WHEN label='Partial' THEN 1 ELSE 0 END) AS partial_count, - SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, - MAX(updated_at) AS updated_at - FROM relevance_labels - WHERE tenant_id=? AND query_text=? - """, - (tenant_id, query_text), - ).fetchone() - return { - "query": query_text, - "total": int((row["total"] or 0) if row else 0), - "exact_count": int((row["exact_count"] or 0) if row else 0), - "partial_count": int((row["partial_count"] or 0) if row else 0), - "irrelevant_count": int((row["irrelevant_count"] or 0) if row else 0), - "updated_at": row["updated_at"] if row else None, - } - - -class SearchServiceClient: - def __init__(self, base_url: str, tenant_id: str): - self.base_url = base_url.rstrip("/") - self.tenant_id = str(tenant_id) - self.session = requests.Session() - - def search(self, query: str, size: int, from_: int = 0, language: str = "en") -> Dict[str, Any]: - response = self.session.post( - f"{self.base_url}/search/", - headers={"Content-Type": "application/json", "X-Tenant-ID": self.tenant_id}, - json={"query": query, "size": size, "from": from_, "language": language}, - timeout=120, - ) - response.raise_for_status() - return response.json() - - -class RerankServiceClient: - def __init__(self, service_url: str): - self.service_url = service_url.rstrip("/") - self.session = requests.Session() - - def rerank(self, query: str, 
docs: Sequence[str], normalize: bool = False, top_n: Optional[int] = None) -> Tuple[List[float], Dict[str, Any]]: - payload: Dict[str, Any] = { - "query": query, - "docs": list(docs), - "normalize": normalize, - } - if top_n is not None: - payload["top_n"] = int(top_n) - response = self.session.post(self.service_url, json=payload, timeout=180) - response.raise_for_status() - data = response.json() - return list(data.get("scores") or []), dict(data.get("meta") or {}) - - -class DashScopeLabelClient: - def __init__(self, model: str, base_url: str, api_key: str, batch_size: int = 40): - self.model = model - self.base_url = base_url.rstrip("/") - self.api_key = api_key - self.batch_size = int(batch_size) - self.session = requests.Session() - - def _chat(self, prompt: str) -> Tuple[str, str]: - response = self.session.post( - f"{self.base_url}/chat/completions", - headers={ - "Authorization": f"Bearer {self.api_key}", - "Content-Type": "application/json", - }, - json={ - "model": self.model, - "messages": [{"role": "user", "content": prompt}], - "temperature": 0, - "top_p": 0.1, - }, - timeout=180, - ) - response.raise_for_status() - data = response.json() - content = str(((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip() - return content, safe_json_dumps(data) - - def classify_batch_simple( - self, - query: str, - docs: Sequence[Dict[str, Any]], - ) -> Tuple[List[str], str]: - numbered_docs = [build_label_doc_line(idx + 1, doc) for idx, doc in enumerate(docs)] - prompt = ( - "You are an e-commerce search result relevance evaluation assistant. 
" - "Based on the user query and each product's information, output the relevance level for each product.\n\n" - "## Relevance Level Criteria\n" - "Exact — Fully matches the user's search intent.\n" - "Partial — Primary intent satisfied (same category or similar use, basically aligns with search intent), " - "but secondary attributes such as color, style, size, fit, length, or material deviate from or cannot be confirmed.\n" - "Irrelevant — Category or use case mismatched, primary intent not satisfied.\n\n" - "Additional judging guidance:\n" - "- If the query clearly names a product type, product type matching has the highest priority. " - "Dress vs skirt vs jumpsuit, jeans vs pants, T-shirt vs blouse, cardigan vs sweater, boots vs shoes, " - "bra vs top, backpack vs bag are not interchangeable.\n" - "- When the query clearly specifies a concrete product type, a different product type should usually be Irrelevant, not Partial.\n" - "- If an attribute looks missing or uncertain, prefer Partial instead of Exact.\n" - "- Do not guess missing attributes.\n" - "- Graphic, slogan, holiday, memorial, or message tees are not Exact for a plain tee query unless that graphic/theme is requested.\n" - "- Be conservative with Exact.\n\n" - f"Query: {query}\n\n" - "Products:\n" - + "\n".join(numbered_docs) - + "\n\n## Output Format\n" - f"Strictly output {len(docs)} lines, each line containing exactly one of Exact / Partial / Irrelevant. " - "They must correspond sequentially to the products above. 
Do not output any other information.\n" - ) - content, raw_response = self._chat(prompt) - labels = [] - for line in str(content or "").splitlines(): - label = line.strip() - if label in VALID_LABELS: - labels.append(label) - if len(labels) != len(docs): - payload = _extract_json_blob(content) - if isinstance(payload, dict) and isinstance(payload.get("labels"), list): - labels = [] - for item in payload["labels"][: len(docs)]: - if isinstance(item, dict): - label = str(item.get("label") or "").strip() - else: - label = str(item).strip() - if label in VALID_LABELS: - labels.append(label) - if len(labels) != len(docs) or any(label not in VALID_LABELS for label in labels): - raise ValueError(f"unexpected simple label output: {content!r}") - return labels, raw_response - - def extract_query_profile( - self, - query: str, - parser_hints: Dict[str, Any], - ) -> Tuple[Dict[str, Any], str]: - prompt = ( - "You are building a structured intent profile for e-commerce relevance judging.\n" - "Use the original user query as the source of truth. Parser hints may help, but if a hint conflicts with the original query, trust the original query.\n" - "Be conservative: only mark an attribute as required if the user explicitly asked for it.\n\n" - "Return JSON with this schema:\n" - "{\n" - ' "normalized_query_en": string,\n' - ' "primary_category": string,\n' - ' "allowed_categories": [string],\n' - ' "required_attributes": [\n' - ' {"name": string, "required_terms": [string], "conflicting_terms": [string], "match_mode": "explicit"}\n' - " ],\n" - ' "notes": [string]\n' - "}\n\n" - "Guidelines:\n" - "- Exact later will require explicit evidence for all required attributes.\n" - "- allowed_categories should contain only near-synonyms of the same product type, not substitutes. 
For example dress can allow midi dress/cocktail dress, but not skirt, top, jumpsuit, or outfit unless the query explicitly asks for them.\n" - "- If the query asks for dress/skirt/jeans/t-shirt, near but different product types are not Exact.\n" - "- If the query includes color, fit, silhouette, or length, include them as required_attributes.\n" - "- For fit words, include conflicting terms when obvious, e.g. fitted conflicts with oversized/loose; oversized conflicts with fitted/tight.\n" - "- For color, include conflicting colors only when clear from the query.\n\n" - f"Original query: {query}\n" - f"Parser hints JSON: {json.dumps(parser_hints, ensure_ascii=False)}\n" - ) - content, raw_response = self._chat(prompt) - payload = _extract_json_blob(content) - if not isinstance(payload, dict): - raise ValueError(f"unexpected query profile payload: {content!r}") - payload.setdefault("normalized_query_en", query) - payload.setdefault("primary_category", "") - payload.setdefault("allowed_categories", []) - payload.setdefault("required_attributes", []) - payload.setdefault("notes", []) - return payload, raw_response - - def classify_batch_complex( - self, - query: str, - query_profile: Dict[str, Any], - docs: Sequence[Dict[str, Any]], - ) -> Tuple[List[str], str]: - numbered_docs = [build_label_doc_line(idx + 1, doc) for idx, doc in enumerate(docs)] - prompt = ( - "You are an e-commerce search relevance judge.\n" - "Judge each product against the structured query profile below.\n\n" - "Relevance rules:\n" - "- Exact: product type matches the target intent, and every explicit required attribute is positively supported by the title/options/tags/category. 
If an attribute is missing or only guessed, it is NOT Exact.\n" - "- Partial: main product type/use case matches, but some required attribute is missing, weaker, uncertain, or only approximately matched.\n" - "- Irrelevant: product type/use case mismatched, or an explicit required attribute clearly conflicts.\n" - "- Be conservative with Exact.\n" - "- Graphic/holiday/message tees are not Exact for a plain color/style tee query unless that graphic/theme was requested.\n" - "- Jumpsuit/romper/set is not Exact for dress/skirt/jeans queries.\n\n" - f"Original query: {query}\n" - f"Structured query profile JSON: {json.dumps(query_profile, ensure_ascii=False)}\n\n" - "Products:\n" - + "\n".join(numbered_docs) - + "\n\nReturn JSON only, with schema:\n" - '{"labels":[{"index":1,"label":"Exact","reason":"short phrase"}]}\n' - ) - content, raw_response = self._chat(prompt) - payload = _extract_json_blob(content) - if not isinstance(payload, dict) or not isinstance(payload.get("labels"), list): - raise ValueError(f"unexpected label payload: {content!r}") - labels_payload = payload["labels"] - labels: List[str] = [] - for item in labels_payload[: len(docs)]: - if not isinstance(item, dict): - continue - label = str(item.get("label") or "").strip() - if label in VALID_LABELS: - labels.append(label) - if len(labels) != len(docs) or any(label not in VALID_LABELS for label in labels): - raise ValueError(f"unexpected label output: {content!r}") - return labels, raw_response - - -def precision_at_k(labels: Sequence[str], k: int, relevant: Sequence[str]) -> float: - if k <= 0: - return 0.0 - sliced = list(labels[:k]) - if not sliced: - return 0.0 - hits = sum(1 for label in sliced if label in relevant) - return hits / float(min(k, len(sliced))) - - -def average_precision(labels: Sequence[str], relevant: Sequence[str]) -> float: - hit_count = 0 - precision_sum = 0.0 - for idx, label in enumerate(labels, start=1): - if label not in relevant: - continue - hit_count += 1 - precision_sum 
+= hit_count / idx - if hit_count == 0: - return 0.0 - return precision_sum / hit_count - - -def compute_query_metrics(labels: Sequence[str]) -> Dict[str, float]: - metrics: Dict[str, float] = {} - for k in (5, 10, 20, 50): - metrics[f"P@{k}"] = round(precision_at_k(labels, k, [RELEVANCE_EXACT]), 6) - metrics[f"P@{k}_2_3"] = round(precision_at_k(labels, k, [RELEVANCE_EXACT, RELEVANCE_PARTIAL]), 6) - metrics["MAP_3"] = round(average_precision(labels, [RELEVANCE_EXACT]), 6) - metrics["MAP_2_3"] = round(average_precision(labels, [RELEVANCE_EXACT, RELEVANCE_PARTIAL]), 6) - return metrics - - -def aggregate_metrics(metric_items: Sequence[Dict[str, float]]) -> Dict[str, float]: - if not metric_items: - return {} - keys = sorted(metric_items[0].keys()) - return { - key: round(sum(float(item.get(key, 0.0)) for item in metric_items) / len(metric_items), 6) - for key in keys - } - - -def label_distribution(labels: Sequence[str]) -> Dict[str, int]: - return { - RELEVANCE_EXACT: sum(1 for label in labels if label == RELEVANCE_EXACT), - RELEVANCE_PARTIAL: sum(1 for label in labels if label == RELEVANCE_PARTIAL), - RELEVANCE_IRRELEVANT: sum(1 for label in labels if label == RELEVANCE_IRRELEVANT), - } - - -class SearchEvaluationFramework: - def __init__( - self, - tenant_id: str, - artifact_root: Path = DEFAULT_ARTIFACT_ROOT, - search_base_url: str = "http://localhost:6002", - labeler_mode: str = DEFAULT_LABELER_MODE, - ): - init_service(get_app_config().infrastructure.elasticsearch.host) - self.tenant_id = str(tenant_id) - self.artifact_root = ensure_dir(artifact_root) - self.labeler_mode = str(labeler_mode).strip().lower() or DEFAULT_LABELER_MODE - self.store = EvalStore(self.artifact_root / "search_eval.sqlite3") - self.search_client = SearchServiceClient(search_base_url, self.tenant_id) - app_cfg = get_app_config() - rerank_service_url = str( - app_cfg.services.rerank.providers["http"]["instances"]["default"]["service_url"] - ) - self.rerank_client = 
RerankServiceClient(rerank_service_url) - llm_cfg = app_cfg.services.translation.capabilities["llm"] - api_key = app_cfg.infrastructure.secrets.dashscope_api_key - if not api_key: - raise RuntimeError("dashscope_api_key is required for search evaluation annotation") - self.label_client = DashScopeLabelClient( - model=str(llm_cfg["model"]), - base_url=str(llm_cfg["base_url"]), - api_key=str(api_key), - ) - self.query_parser = None - - def _get_query_parser(self): - if self.query_parser is None: - self.query_parser = get_query_parser() - return self.query_parser - - def build_query_parser_hints(self, query: str) -> Dict[str, Any]: - parsed = self._get_query_parser().parse(query, generate_vector=False, target_languages=["en", "zh"]) - payload = parsed.to_dict() - payload["text_for_rerank"] = parsed.text_for_rerank() - return payload - - def get_query_profile(self, query: str, force_refresh: bool = False) -> Dict[str, Any]: - if self.labeler_mode != "complex": - raise RuntimeError("query profiles are only used in complex labeler mode") - if not force_refresh: - cached = self.store.get_query_profile(self.tenant_id, query, JUDGE_PROMPT_VERSION_COMPLEX) - if cached is not None: - return cached - parser_hints = self.build_query_parser_hints(query) - profile, raw_response = self.label_client.extract_query_profile(query, parser_hints) - profile["parser_hints"] = parser_hints - self.store.upsert_query_profile( - self.tenant_id, - query, - JUDGE_PROMPT_VERSION_COMPLEX, - self.label_client.model, - profile, - raw_response, - ) - return profile - - @staticmethod - def _doc_evidence_text(doc: Dict[str, Any]) -> str: - pieces: List[str] = [ - build_display_title(doc), - pick_text(doc.get("vendor"), "en"), - pick_text(doc.get("category_path"), "en"), - pick_text(doc.get("category_name"), "en"), - ] - for sku in doc.get("skus") or []: - pieces.extend( - [ - str(sku.get("option1_value") or ""), - str(sku.get("option2_value") or ""), - str(sku.get("option3_value") or ""), - ] - ) - 
for tag in doc.get("tags") or []: - pieces.append(str(tag)) - return normalize_text(" | ".join(piece for piece in pieces if piece)) - - def _apply_rule_based_label_guardrails( - self, - label: str, - query_profile: Dict[str, Any], - doc: Dict[str, Any], - ) -> str: - if label not in VALID_LABELS: - return label - evidence = self._doc_evidence_text(doc) - category = normalize_text(query_profile.get("primary_category")) - allowed_categories = [normalize_text(item) for item in query_profile.get("allowed_categories") or [] if str(item).strip()] - - primary_category_match = True - if category: - primary_category_match = category in evidence - allowed_category_match = True - if allowed_categories: - allowed_category_match = any(signal in evidence for signal in allowed_categories) - - if label == RELEVANCE_EXACT and not primary_category_match: - if allowed_category_match: - label = RELEVANCE_PARTIAL - else: - return RELEVANCE_IRRELEVANT - - for attr in query_profile.get("required_attributes") or []: - if not isinstance(attr, dict): - continue - attr_name = normalize_text(attr.get("name")) - if attr_name not in {"color", "fit", "length", "type", "product_type", "material", "size", "gender", "style", "waist_style", "rise"}: - continue - required_terms = [normalize_text(item) for item in attr.get("required_terms") or [] if normalize_text(item)] - conflicting_terms = [normalize_text(item) for item in attr.get("conflicting_terms") or [] if normalize_text(item)] - if attr_name == "fit": - if any(term in {"oversized", "oversize"} for term in required_terms): - conflicting_terms.extend(["slim", "slimming", "fitted", "tight", "close-fitting"]) - if any(term in {"fitted", "slim fit", "tight"} for term in required_terms): - conflicting_terms.extend(["oversized", "oversize", "loose", "relaxed"]) - has_required = any(term in evidence for term in required_terms) if required_terms else True - has_conflict = any(term in evidence for term in conflicting_terms) - - if has_conflict: - 
return RELEVANCE_IRRELEVANT - if label == RELEVANCE_EXACT and not has_required: - label = RELEVANCE_PARTIAL - - if label == RELEVANCE_PARTIAL and not primary_category_match and not allowed_category_match: - return RELEVANCE_IRRELEVANT - - return label - - @staticmethod - def _result_item_to_doc(item: Dict[str, Any]) -> Dict[str, Any]: - option_values = list(item.get("option_values") or []) - while len(option_values) < 3: - option_values.append("") - product = dict(item.get("product") or {}) - return { - "spu_id": item.get("spu_id"), - "title": product.get("title") or item.get("title"), - "vendor": product.get("vendor"), - "category_path": product.get("category"), - "category_name": product.get("category"), - "image_url": item.get("image_url") or product.get("image_url"), - "tags": product.get("tags") or [], - "skus": [ - { - "option1_value": option_values[0], - "option2_value": option_values[1], - "option3_value": option_values[2], - } - ], - } - - def _collect_label_issues( - self, - label: str, - query_profile: Dict[str, Any], - doc: Dict[str, Any], - ) -> List[str]: - evidence = self._doc_evidence_text(doc) - issues: List[str] = [] - category = normalize_text(query_profile.get("primary_category")) - allowed_categories = [ - normalize_text(item) - for item in query_profile.get("allowed_categories") or [] - if str(item).strip() - ] - - primary_category_match = True if not category else category in evidence - allowed_category_match = False if allowed_categories else primary_category_match - if allowed_categories: - allowed_category_match = any(signal in evidence for signal in allowed_categories) - - if label == RELEVANCE_EXACT and not primary_category_match: - if allowed_category_match: - issues.append("Exact missing primary category evidence") - else: - issues.append("Exact has category mismatch") - - if label == RELEVANCE_PARTIAL and not primary_category_match and not allowed_category_match: - issues.append("Partial has category mismatch") - - for attr in 
query_profile.get("required_attributes") or []: - if not isinstance(attr, dict): - continue - attr_name = normalize_text(attr.get("name")) - if attr_name not in {"color", "fit", "length", "type", "product_type", "material", "size", "gender", "style"}: - continue - required_terms = [normalize_text(item) for item in attr.get("required_terms") or [] if normalize_text(item)] - conflicting_terms = [normalize_text(item) for item in attr.get("conflicting_terms") or [] if normalize_text(item)] - has_required = any(term in evidence for term in required_terms) if required_terms else True - has_conflict = any(term in evidence for term in conflicting_terms) - - if has_conflict and label != RELEVANCE_IRRELEVANT: - issues.append(f"{label} conflicts on {attr_name}") - if label == RELEVANCE_EXACT and not has_required: - issues.append(f"Exact missing {attr_name}") - return issues - - def audit_live_query( - self, - query: str, - *, - top_k: int = 100, - language: str = "en", - auto_annotate: bool = False, - ) -> Dict[str, Any]: - live = self.evaluate_live_query(query=query, top_k=top_k, auto_annotate=auto_annotate, language=language) - if self.labeler_mode != "complex": - labels = [ - item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT - for item in live["results"] - ] - return { - "query": query, - "tenant_id": self.tenant_id, - "top_k": top_k, - "metrics": live["metrics"], - "distribution": label_distribution(labels), - "query_profile": None, - "suspicious": [], - "results": live["results"], - } - query_profile = self.get_query_profile(query, force_refresh=False) - suspicious: List[Dict[str, Any]] = [] - - for item in live["results"]: - doc = self._result_item_to_doc(item) - issues = self._collect_label_issues(item["label"] or "", query_profile, doc) - suggested_label = self._apply_rule_based_label_guardrails(item["label"] or "", query_profile, doc) - if suggested_label != (item["label"] or ""): - issues = list(issues) + [f"Suggested relabel: {item['label']} 
-> {suggested_label}"] - if issues: - suspicious.append( - { - "rank": item["rank"], - "spu_id": item["spu_id"], - "title": item["title"], - "label": item["label"], - "suggested_label": suggested_label, - "issues": issues, - } - ) - - labels = [ - item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT - for item in live["results"] - ] - return { - "query": query, - "tenant_id": self.tenant_id, - "top_k": top_k, - "metrics": live["metrics"], - "distribution": label_distribution(labels), - "query_profile": query_profile, - "suspicious": suspicious, - "results": live["results"], - } - - def queries_from_file(self, path: Path) -> List[str]: - return [ - line.strip() - for line in path.read_text(encoding="utf-8").splitlines() - if line.strip() and not line.strip().startswith("#") - ] - - def corpus_docs(self, refresh: bool = False) -> List[Dict[str, Any]]: - if not refresh and self.store.has_corpus(self.tenant_id): - return self.store.get_corpus_docs(self.tenant_id) - - es_client = get_es_client().client - index_name = get_tenant_index_name(self.tenant_id) - docs: List[Dict[str, Any]] = [] - for hit in scan( - client=es_client, - index=index_name, - query={ - "_source": [ - "spu_id", - "title", - "vendor", - "category_path", - "category_name", - "image_url", - "skus", - "tags", - ], - "query": {"match_all": {}}, - }, - size=500, - preserve_order=False, - clear_scroll=True, - ): - source = dict(hit.get("_source") or {}) - source["spu_id"] = str(source.get("spu_id") or hit.get("_id") or "") - docs.append(source) - self.store.upsert_corpus_docs(self.tenant_id, docs) - return docs - - def full_corpus_rerank( - self, - query: str, - docs: Sequence[Dict[str, Any]], - batch_size: int = 24, - force_refresh: bool = False, - ) -> List[Dict[str, Any]]: - cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query) - pending: List[Dict[str, Any]] = [doc for doc in docs if str(doc.get("spu_id")) not in cached] - if pending: - new_scores: 
Dict[str, float] = {} - for start in range(0, len(pending), batch_size): - batch = pending[start : start + batch_size] - scores = self._rerank_batch_with_retry(query=query, docs=batch) - if len(scores) != len(batch): - raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs") - for doc, score in zip(batch, scores): - new_scores[str(doc.get("spu_id"))] = float(score) - self.store.upsert_rerank_scores( - self.tenant_id, - query, - new_scores, - model_name="qwen3_vllm_score", - ) - cached.update(new_scores) - - ranked = [] - for doc in docs: - spu_id = str(doc.get("spu_id")) - ranked.append({"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc}) - ranked.sort(key=lambda item: item["score"], reverse=True) - return ranked - - def _rerank_batch_with_retry(self, query: str, docs: Sequence[Dict[str, Any]]) -> List[float]: - if not docs: - return [] - doc_texts = [build_rerank_doc(doc) for doc in docs] - try: - scores, _meta = self.rerank_client.rerank(query=query, docs=doc_texts, normalize=False) - return scores - except Exception: - if len(docs) == 1: - return [-1.0] - if len(docs) <= 6: - scores: List[float] = [] - for doc in docs: - scores.extend(self._rerank_batch_with_retry(query, [doc])) - return scores - mid = len(docs) // 2 - left = self._rerank_batch_with_retry(query, docs[:mid]) - right = self._rerank_batch_with_retry(query, docs[mid:]) - return left + right - - def annotate_missing_labels( - self, - query: str, - docs: Sequence[Dict[str, Any]], - force_refresh: bool = False, - ) -> Dict[str, str]: - labels = {} if force_refresh else self.store.get_labels(self.tenant_id, query) - missing_docs = [doc for doc in docs if str(doc.get("spu_id")) not in labels] - if not missing_docs: - return labels - - for start in range(0, len(missing_docs), self.label_client.batch_size): - batch = missing_docs[start : start + self.label_client.batch_size] - batch_pairs = self._classify_with_retry(query, batch, 
force_refresh=force_refresh) - for sub_labels, raw_response, sub_batch in batch_pairs: - to_store = {str(doc.get("spu_id")): label for doc, label in zip(sub_batch, sub_labels)} - self.store.upsert_labels( - self.tenant_id, - query, - to_store, - judge_model=self.label_client.model, - raw_response=raw_response, - ) - labels.update(to_store) - time.sleep(0.1) - return labels - - def _classify_with_retry( - self, - query: str, - docs: Sequence[Dict[str, Any]], - *, - force_refresh: bool = False, - ) -> List[Tuple[List[str], str, Sequence[Dict[str, Any]]]]: - if not docs: - return [] - try: - if self.labeler_mode == "complex": - query_profile = self.get_query_profile(query, force_refresh=force_refresh) - labels, raw_response = self.label_client.classify_batch_complex(query, query_profile, docs) - labels = [ - self._apply_rule_based_label_guardrails(label, query_profile, doc) - for doc, label in zip(docs, labels) - ] - else: - labels, raw_response = self.label_client.classify_batch_simple(query, docs) - return [(labels, raw_response, docs)] - except Exception: - if len(docs) == 1: - raise - mid = len(docs) // 2 - return self._classify_with_retry(query, docs[:mid], force_refresh=force_refresh) + self._classify_with_retry(query, docs[mid:], force_refresh=force_refresh) - - def build_query_annotation_set( - self, - query: str, - *, - search_depth: int = 1000, - rerank_depth: int = 10000, - annotate_search_top_k: int = 120, - annotate_rerank_top_k: int = 200, - language: str = "en", - force_refresh_rerank: bool = False, - force_refresh_labels: bool = False, - ) -> QueryBuildResult: - search_payload = self.search_client.search(query=query, size=search_depth, from_=0, language=language) - search_results = list(search_payload.get("results") or []) - corpus = self.corpus_docs(refresh=False) - full_rerank = self.full_corpus_rerank( - query=query, - docs=corpus, - force_refresh=force_refresh_rerank, - ) - rerank_depth_effective = min(rerank_depth, len(full_rerank)) - - pool_docs: 
Dict[str, Dict[str, Any]] = {} - for doc in search_results[:annotate_search_top_k]: - pool_docs[str(doc.get("spu_id"))] = doc - for item in full_rerank[:annotate_rerank_top_k]: - pool_docs[str(item["spu_id"])] = item["doc"] - - labels = self.annotate_missing_labels( - query=query, - docs=list(pool_docs.values()), - force_refresh=force_refresh_labels, - ) - - search_labeled_results: List[Dict[str, Any]] = [] - for rank, doc in enumerate(search_results, start=1): - spu_id = str(doc.get("spu_id")) - label = labels.get(spu_id) - search_labeled_results.append( - { - "rank": rank, - "spu_id": spu_id, - "title": build_display_title(doc), - "image_url": doc.get("image_url"), - "rerank_score": None, - "label": label, - "option_values": list(compact_option_values(doc.get("skus") or [])), - "product": compact_product_payload(doc), - } - ) - - rerank_top_results: List[Dict[str, Any]] = [] - for rank, item in enumerate(full_rerank[:rerank_depth_effective], start=1): - doc = item["doc"] - spu_id = str(item["spu_id"]) - rerank_top_results.append( - { - "rank": rank, - "spu_id": spu_id, - "title": build_display_title(doc), - "image_url": doc.get("image_url"), - "rerank_score": round(float(item["score"]), 8), - "label": labels.get(spu_id), - "option_values": list(compact_option_values(doc.get("skus") or [])), - "product": compact_product_payload(doc), - } - ) - - top100_labels = [ - item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT - for item in search_labeled_results[:100] - ] - metrics = compute_query_metrics(top100_labels) - output_dir = ensure_dir(self.artifact_root / "query_builds") - run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}" - output_json_path = output_dir / f"{run_id}.json" - payload = { - "run_id": run_id, - "created_at": utc_now_iso(), - "tenant_id": self.tenant_id, - "query": query, - "config_meta": requests.get("http://localhost:6002/admin/config/meta", timeout=20).json(), - "search_total": 
int(search_payload.get("total") or 0), - "search_depth_requested": search_depth, - "search_depth_effective": len(search_results), - "rerank_depth_requested": rerank_depth, - "rerank_depth_effective": rerank_depth_effective, - "corpus_size": len(corpus), - "annotation_pool": { - "annotate_search_top_k": annotate_search_top_k, - "annotate_rerank_top_k": annotate_rerank_top_k, - "pool_size": len(pool_docs), - }, - "labeler_mode": self.labeler_mode, - "query_profile": self.get_query_profile(query, force_refresh=force_refresh_labels) if self.labeler_mode == "complex" else None, - "metrics_top100": metrics, - "search_results": search_labeled_results, - "full_rerank_top": rerank_top_results, - } - output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") - self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"]) - return QueryBuildResult( - query=query, - tenant_id=self.tenant_id, - search_total=int(search_payload.get("total") or 0), - search_depth=len(search_results), - rerank_corpus_size=len(corpus), - annotated_count=len(pool_docs), - output_json_path=output_json_path, - ) - - def evaluate_live_query( - self, - query: str, - top_k: int = 100, - auto_annotate: bool = False, - language: str = "en", - force_refresh_labels: bool = False, - ) -> Dict[str, Any]: - search_payload = self.search_client.search(query=query, size=max(top_k, 100), from_=0, language=language) - results = list(search_payload.get("results") or []) - if auto_annotate: - self.annotate_missing_labels(query=query, docs=results[:top_k], force_refresh=force_refresh_labels) - labels = self.store.get_labels(self.tenant_id, query) - recalled_spu_ids = {str(doc.get("spu_id")) for doc in results[:top_k]} - labeled = [] - unlabeled_hits = 0 - for rank, doc in enumerate(results[:top_k], start=1): - spu_id = str(doc.get("spu_id")) - label = labels.get(spu_id) - if label not in VALID_LABELS: - unlabeled_hits += 1 - labeled.append( 
- { - "rank": rank, - "spu_id": spu_id, - "title": build_display_title(doc), - "image_url": doc.get("image_url"), - "label": label, - "option_values": list(compact_option_values(doc.get("skus") or [])), - "product": compact_product_payload(doc), - } - ) - metric_labels = [ - item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT - for item in labeled - ] - label_stats = self.store.get_query_label_stats(self.tenant_id, query) - rerank_scores = self.store.get_rerank_scores(self.tenant_id, query) - relevant_missing_ids = [ - spu_id - for spu_id, label in labels.items() - if label in {RELEVANCE_EXACT, RELEVANCE_PARTIAL} and spu_id not in recalled_spu_ids - ] - missing_docs_map = self.store.get_corpus_docs_by_spu_ids(self.tenant_id, relevant_missing_ids) - missing_relevant = [] - for spu_id in relevant_missing_ids: - doc = missing_docs_map.get(spu_id) - if not doc: - continue - missing_relevant.append( - { - "spu_id": spu_id, - "label": labels[spu_id], - "rerank_score": rerank_scores.get(spu_id), - "title": build_display_title(doc), - "image_url": doc.get("image_url"), - "option_values": list(compact_option_values(doc.get("skus") or [])), - "product": compact_product_payload(doc), - } - ) - label_order = {RELEVANCE_EXACT: 0, RELEVANCE_PARTIAL: 1, RELEVANCE_IRRELEVANT: 2} - missing_relevant.sort( - key=lambda item: ( - label_order.get(str(item.get("label")), 9), - -(float(item.get("rerank_score")) if item.get("rerank_score") is not None else float("-inf")), - str(item.get("title") or ""), - ) - ) - tips: List[str] = [] - if auto_annotate: - tips.append("Single-query evaluation used cached labels and refreshed missing labels for recalled results.") - else: - tips.append("Single-query evaluation used the offline annotation cache only; recalled SPUs without cached labels were treated as Irrelevant.") - if label_stats["total"] == 0: - tips.append("This query has no offline annotation set yet. 
Build or refresh labels first if you want stable evaluation.") - if unlabeled_hits: - tips.append(f"{unlabeled_hits} recalled results were not in the annotation set and were counted as Irrelevant.") - if not missing_relevant: - tips.append("No cached Exact/Partial products were missed by this recall set.") - return { - "query": query, - "tenant_id": self.tenant_id, - "top_k": top_k, - "metrics": compute_query_metrics(metric_labels), - "results": labeled, - "missing_relevant": missing_relevant, - "label_stats": { - **label_stats, - "unlabeled_hits_treated_irrelevant": unlabeled_hits, - "recalled_hits": len(labeled), - "missing_relevant_count": len(missing_relevant), - "missing_exact_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_EXACT), - "missing_partial_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_PARTIAL), - }, - "tips": tips, - "total": int(search_payload.get("total") or 0), - } - - def batch_evaluate( - self, - queries: Sequence[str], - *, - top_k: int = 100, - auto_annotate: bool = True, - language: str = "en", - force_refresh_labels: bool = False, - ) -> Dict[str, Any]: - per_query = [] - for query in queries: - live = self.evaluate_live_query( - query, - top_k=top_k, - auto_annotate=auto_annotate, - language=language, - force_refresh_labels=force_refresh_labels, - ) - labels = [ - item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT - for item in live["results"] - ] - per_query.append( - { - "query": live["query"], - "tenant_id": live["tenant_id"], - "top_k": live["top_k"], - "metrics": live["metrics"], - "distribution": label_distribution(labels), - "total": live["total"], - } - ) - aggregate = aggregate_metrics([item["metrics"] for item in per_query]) - aggregate_distribution = { - RELEVANCE_EXACT: sum(item["distribution"][RELEVANCE_EXACT] for item in per_query), - RELEVANCE_PARTIAL: sum(item["distribution"][RELEVANCE_PARTIAL] for item in per_query), - RELEVANCE_IRRELEVANT: 
sum(item["distribution"][RELEVANCE_IRRELEVANT] for item in per_query), - } - batch_id = f"batch_{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + '|'.join(queries))[:10]}" - report_dir = ensure_dir(self.artifact_root / "batch_reports") - config_snapshot_path = report_dir / f"{batch_id}_config.json" - config_snapshot = requests.get("http://localhost:6002/admin/config", timeout=20).json() - config_snapshot_path.write_text(json.dumps(config_snapshot, ensure_ascii=False, indent=2), encoding="utf-8") - output_json_path = report_dir / f"{batch_id}.json" - report_md_path = report_dir / f"{batch_id}.md" - payload = { - "batch_id": batch_id, - "created_at": utc_now_iso(), - "tenant_id": self.tenant_id, - "queries": list(queries), - "top_k": top_k, - "aggregate_metrics": aggregate, - "aggregate_distribution": aggregate_distribution, - "per_query": per_query, - "config_snapshot_path": str(config_snapshot_path), - } - output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") - report_md_path.write_text(render_batch_report_markdown(payload), encoding="utf-8") - self.store.insert_batch_run(batch_id, self.tenant_id, output_json_path, report_md_path, config_snapshot_path, payload) - return payload - - -def render_batch_report_markdown(payload: Dict[str, Any]) -> str: - lines = [ - "# Search Batch Evaluation", - "", - f"- Batch ID: {payload['batch_id']}", - f"- Created at: {payload['created_at']}", - f"- Tenant ID: {payload['tenant_id']}", - f"- Query count: {len(payload['queries'])}", - f"- Top K: {payload['top_k']}", - "", - "## Aggregate Metrics", - "", - ] - for key, value in sorted((payload.get("aggregate_metrics") or {}).items()): - lines.append(f"- {key}: {value}") - distribution = payload.get("aggregate_distribution") or {} - if distribution: - lines.extend( - [ - "", - "## Label Distribution", - "", - f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}", - f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}", - f"- Irrelevant: 
{distribution.get(RELEVANCE_IRRELEVANT, 0)}", - ] - ) - lines.extend(["", "## Per Query", ""]) - for item in payload.get("per_query") or []: - lines.append(f"### {item['query']}") - lines.append("") - for key, value in sorted((item.get("metrics") or {}).items()): - lines.append(f"- {key}: {value}") - distribution = item.get("distribution") or {} - lines.append(f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}") - lines.append(f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}") - lines.append(f"- Irrelevant: {distribution.get(RELEVANCE_IRRELEVANT, 0)}") - lines.append("") - return "\n".join(lines) - - -class SearchEvalRequest(BaseModel): - query: str - top_k: int = Field(default=100, ge=1, le=500) - auto_annotate: bool = False - language: str = "en" - - -class BatchEvalRequest(BaseModel): - queries: Optional[List[str]] = None - top_k: int = Field(default=100, ge=1, le=500) - auto_annotate: bool = False - language: str = "en" - force_refresh_labels: bool = False - - -def create_web_app(framework: SearchEvaluationFramework, query_file: Path = DEFAULT_QUERY_FILE) -> FastAPI: - app = FastAPI(title="Search Evaluation UI", version="1.0.0") - - @app.get("/", response_class=HTMLResponse) - def home() -> str: - return WEB_APP_HTML - - @app.get("/api/queries") - def api_queries() -> Dict[str, Any]: - return {"queries": framework.queries_from_file(query_file)} - - @app.post("/api/search-eval") - def api_search_eval(request: SearchEvalRequest) -> Dict[str, Any]: - return framework.evaluate_live_query( - query=request.query, - top_k=request.top_k, - auto_annotate=request.auto_annotate, - language=request.language, - ) - - @app.post("/api/batch-eval") - def api_batch_eval(request: BatchEvalRequest) -> Dict[str, Any]: - queries = request.queries or framework.queries_from_file(query_file) - if not queries: - raise HTTPException(status_code=400, detail="No queries provided") - return framework.batch_evaluate( - queries=queries, - top_k=request.top_k, - 
auto_annotate=request.auto_annotate, - language=request.language, - force_refresh_labels=request.force_refresh_labels, - ) - - @app.get("/api/history") - def api_history() -> Dict[str, Any]: - return {"history": framework.store.list_batch_runs(limit=20)} - - @app.get("/api/history/{batch_id}/report") - def api_history_report(batch_id: str) -> Dict[str, Any]: - row = framework.store.get_batch_run(batch_id) - if row is None: - raise HTTPException(status_code=404, detail="Unknown batch_id") - report_path = Path(row["report_markdown_path"]).resolve() - root = framework.artifact_root.resolve() - try: - report_path.relative_to(root) - except ValueError: - raise HTTPException(status_code=403, detail="Report path is outside artifact root") - if not report_path.is_file(): - raise HTTPException(status_code=404, detail="Report file not found") - return { - "batch_id": row["batch_id"], - "created_at": row["created_at"], - "tenant_id": row["tenant_id"], - "report_markdown_path": str(report_path), - "markdown": report_path.read_text(encoding="utf-8"), - } - - return app - - -WEB_APP_HTML = """ - - - - - - Search Evaluation - - - -
- -
-

Search Evaluation

-

Single-query evaluation and batch evaluation share the same service on port 6010.

-
- - - -
-
-
-

Metrics

-
-
-
-

Top Results

-
-
-
-

Missed Exact / Partial

-
-
-
-

Notes

-
-
-
-
- - - - - - -""" - - -def build_cli_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Search evaluation annotation builder and web UI") - sub = parser.add_subparsers(dest="command", required=True) - - build = sub.add_parser("build", help="Build pooled annotation set for queries") - build.add_argument("--tenant-id", default="163") - build.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) - build.add_argument("--search-depth", type=int, default=1000) - build.add_argument("--rerank-depth", type=int, default=10000) - build.add_argument("--annotate-search-top-k", type=int, default=120) - build.add_argument("--annotate-rerank-top-k", type=int, default=200) - build.add_argument("--language", default="en") - build.add_argument("--force-refresh-rerank", action="store_true") - build.add_argument("--force-refresh-labels", action="store_true") - build.add_argument("--labeler-mode", default=DEFAULT_LABELER_MODE, choices=["simple", "complex"]) - - batch = sub.add_parser("batch", help="Run batch evaluation against live search") - batch.add_argument("--tenant-id", default="163") - batch.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) - batch.add_argument("--top-k", type=int, default=100) - batch.add_argument("--language", default="en") - batch.add_argument("--force-refresh-labels", action="store_true") - batch.add_argument("--labeler-mode", default=DEFAULT_LABELER_MODE, choices=["simple", "complex"]) - - audit = sub.add_parser("audit", help="Audit annotation quality for queries") - audit.add_argument("--tenant-id", default="163") - audit.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) - audit.add_argument("--top-k", type=int, default=100) - audit.add_argument("--language", default="en") - audit.add_argument("--limit-suspicious", type=int, default=5) - audit.add_argument("--force-refresh-labels", action="store_true") - audit.add_argument("--labeler-mode", default=DEFAULT_LABELER_MODE, 
choices=["simple", "complex"]) - - serve = sub.add_parser("serve", help="Serve evaluation web UI on port 6010") - serve.add_argument("--tenant-id", default="163") - serve.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) - serve.add_argument("--host", default="0.0.0.0") - serve.add_argument("--port", type=int, default=6010) - serve.add_argument("--labeler-mode", default=DEFAULT_LABELER_MODE, choices=["simple", "complex"]) - - return parser - - -def run_build(args: argparse.Namespace) -> None: - framework = SearchEvaluationFramework(tenant_id=args.tenant_id, labeler_mode=args.labeler_mode) - queries = framework.queries_from_file(Path(args.queries_file)) - summary = [] - for query in queries: - result = framework.build_query_annotation_set( - query=query, - search_depth=args.search_depth, - rerank_depth=args.rerank_depth, - annotate_search_top_k=args.annotate_search_top_k, - annotate_rerank_top_k=args.annotate_rerank_top_k, - language=args.language, - force_refresh_rerank=args.force_refresh_rerank, - force_refresh_labels=args.force_refresh_labels, - ) - summary.append( - { - "query": result.query, - "search_total": result.search_total, - "search_depth": result.search_depth, - "rerank_corpus_size": result.rerank_corpus_size, - "annotated_count": result.annotated_count, - "output_json_path": str(result.output_json_path), - } - ) - print( - f"[build] query={result.query!r} search_total={result.search_total} " - f"search_depth={result.search_depth} corpus={result.rerank_corpus_size} " - f"annotated={result.annotated_count} output={result.output_json_path}" - ) - out_path = ensure_dir(framework.artifact_root / "query_builds") / f"build_summary_{utc_timestamp()}.json" - out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") - print(f"[done] summary={out_path}") - - -def run_batch(args: argparse.Namespace) -> None: - framework = SearchEvaluationFramework(tenant_id=args.tenant_id, labeler_mode=args.labeler_mode) - queries = 
framework.queries_from_file(Path(args.queries_file)) - payload = framework.batch_evaluate( - queries=queries, - top_k=args.top_k, - auto_annotate=True, - language=args.language, - force_refresh_labels=args.force_refresh_labels, - ) - print(f"[done] batch_id={payload['batch_id']} aggregate_metrics={payload['aggregate_metrics']}") - - -def run_audit(args: argparse.Namespace) -> None: - framework = SearchEvaluationFramework(tenant_id=args.tenant_id, labeler_mode=args.labeler_mode) - queries = framework.queries_from_file(Path(args.queries_file)) - audit_items = [] - for query in queries: - item = framework.audit_live_query( - query=query, - top_k=args.top_k, - language=args.language, - auto_annotate=not args.force_refresh_labels, - ) - if args.force_refresh_labels: - live_payload = framework.search_client.search(query=query, size=max(args.top_k, 100), from_=0, language=args.language) - framework.annotate_missing_labels( - query=query, - docs=list(live_payload.get("results") or [])[: args.top_k], - force_refresh=True, - ) - item = framework.audit_live_query( - query=query, - top_k=args.top_k, - language=args.language, - auto_annotate=False, - ) - audit_items.append( - { - "query": query, - "metrics": item["metrics"], - "distribution": item["distribution"], - "suspicious_count": len(item["suspicious"]), - "suspicious_examples": item["suspicious"][: args.limit_suspicious], - } - ) - print( - f"[audit] query={query!r} suspicious={len(item['suspicious'])} metrics={item['metrics']}" - ) - - summary = { - "created_at": utc_now_iso(), - "tenant_id": args.tenant_id, - "top_k": args.top_k, - "query_count": len(queries), - "total_suspicious": sum(item["suspicious_count"] for item in audit_items), - "queries": audit_items, - } - out_path = ensure_dir(framework.artifact_root / "audits") / f"audit_{utc_timestamp()}.json" - out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") - print(f"[done] audit={out_path}") - - -def run_serve(args: 
argparse.Namespace) -> None: - framework = SearchEvaluationFramework(tenant_id=args.tenant_id, labeler_mode=args.labeler_mode) - app = create_web_app(framework, Path(args.queries_file)) - import uvicorn - - uvicorn.run(app, host=args.host, port=args.port, log_level="info") - - -def main() -> None: - parser = build_cli_parser() - args = parser.parse_args() - if args.command == "build": - run_build(args) - return - if args.command == "batch": - run_batch(args) - return - if args.command == "audit": - run_audit(args) - return - if args.command == "serve": - run_serve(args) - return - raise SystemExit(f"unknown command: {args.command}") - - -if __name__ == "__main__": - main() diff --git a/scripts/evaluation/eval_framework/__init__.py b/scripts/evaluation/eval_framework/__init__.py new file mode 100644 index 0000000..acbcaff --- /dev/null +++ b/scripts/evaluation/eval_framework/__init__.py @@ -0,0 +1,59 @@ +""" +Search evaluation framework: pooled relevance annotation, live metrics, batch reports. + +Importing this package ensures the project root is on ``sys.path`` (for ``api.*`` imports). 
+""" + +from __future__ import annotations + +from .utils import ensure_project_on_path + +ensure_project_on_path() + +from .constants import ( # noqa: E402 + DEFAULT_ARTIFACT_ROOT, + DEFAULT_LABELER_MODE, + DEFAULT_QUERY_FILE, + JUDGE_PROMPT_VERSION_COMPLEX, + JUDGE_PROMPT_VERSION_SIMPLE, + PROJECT_ROOT, + RELEVANCE_EXACT, + RELEVANCE_IRRELEVANT, + RELEVANCE_PARTIAL, + VALID_LABELS, +) +from .framework import SearchEvaluationFramework # noqa: E402 +from .store import EvalStore, QueryBuildResult # noqa: E402 +from .cli import build_cli_parser, main # noqa: E402 +from .web_app import create_web_app # noqa: E402 +from .reports import render_batch_report_markdown # noqa: E402 +from .utils import ( # noqa: E402 + ensure_dir, + sha1_text, + utc_now_iso, + utc_timestamp, +) + +__all__ = [ + "DEFAULT_ARTIFACT_ROOT", + "DEFAULT_LABELER_MODE", + "DEFAULT_QUERY_FILE", + "EvalStore", + "JUDGE_PROMPT_VERSION_COMPLEX", + "JUDGE_PROMPT_VERSION_SIMPLE", + "PROJECT_ROOT", + "QueryBuildResult", + "RELEVANCE_EXACT", + "RELEVANCE_IRRELEVANT", + "RELEVANCE_PARTIAL", + "SearchEvaluationFramework", + "VALID_LABELS", + "build_cli_parser", + "create_web_app", + "ensure_dir", + "main", + "render_batch_report_markdown", + "sha1_text", + "utc_now_iso", + "utc_timestamp", +] diff --git a/scripts/evaluation/eval_framework/__main__.py b/scripts/evaluation/eval_framework/__main__.py new file mode 100644 index 0000000..9ae637f --- /dev/null +++ b/scripts/evaluation/eval_framework/__main__.py @@ -0,0 +1,4 @@ +from .cli import main + +if __name__ == "__main__": + main() diff --git a/scripts/evaluation/eval_framework/api_models.py b/scripts/evaluation/eval_framework/api_models.py new file mode 100644 index 0000000..80b22e5 --- /dev/null +++ b/scripts/evaluation/eval_framework/api_models.py @@ -0,0 +1,22 @@ +"""Pydantic request bodies for the evaluation FastAPI app.""" + +from __future__ import annotations + +from typing import List, Optional + +from pydantic import BaseModel, Field + + +class 
SearchEvalRequest(BaseModel): + query: str + top_k: int = Field(default=100, ge=1, le=500) + auto_annotate: bool = False + language: str = "en" + + +class BatchEvalRequest(BaseModel): + queries: Optional[List[str]] = None + top_k: int = Field(default=100, ge=1, le=500) + auto_annotate: bool = False + language: str = "en" + force_refresh_labels: bool = False diff --git a/scripts/evaluation/eval_framework/cli.py b/scripts/evaluation/eval_framework/cli.py new file mode 100644 index 0000000..c3a55bb --- /dev/null +++ b/scripts/evaluation/eval_framework/cli.py @@ -0,0 +1,179 @@ +"""CLI: build annotations, batch eval, audit, serve web UI.""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from .constants import DEFAULT_LABELER_MODE, DEFAULT_QUERY_FILE +from .framework import SearchEvaluationFramework +from .utils import ensure_dir, utc_now_iso, utc_timestamp +from .web_app import create_web_app + + +def build_cli_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Search evaluation annotation builder and web UI") + sub = parser.add_subparsers(dest="command", required=True) + + build = sub.add_parser("build", help="Build pooled annotation set for queries") + build.add_argument("--tenant-id", default="163") + build.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + build.add_argument("--search-depth", type=int, default=1000) + build.add_argument("--rerank-depth", type=int, default=10000) + build.add_argument("--annotate-search-top-k", type=int, default=120) + build.add_argument("--annotate-rerank-top-k", type=int, default=200) + build.add_argument("--language", default="en") + build.add_argument("--force-refresh-rerank", action="store_true") + build.add_argument("--force-refresh-labels", action="store_true") + build.add_argument("--labeler-mode", default=DEFAULT_LABELER_MODE, choices=["simple", "complex"]) + + batch = sub.add_parser("batch", help="Run batch evaluation against 
live search") + batch.add_argument("--tenant-id", default="163") + batch.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + batch.add_argument("--top-k", type=int, default=100) + batch.add_argument("--language", default="en") + batch.add_argument("--force-refresh-labels", action="store_true") + batch.add_argument("--labeler-mode", default=DEFAULT_LABELER_MODE, choices=["simple", "complex"]) + + audit = sub.add_parser("audit", help="Audit annotation quality for queries") + audit.add_argument("--tenant-id", default="163") + audit.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + audit.add_argument("--top-k", type=int, default=100) + audit.add_argument("--language", default="en") + audit.add_argument("--limit-suspicious", type=int, default=5) + audit.add_argument("--force-refresh-labels", action="store_true") + audit.add_argument("--labeler-mode", default=DEFAULT_LABELER_MODE, choices=["simple", "complex"]) + + serve = sub.add_parser("serve", help="Serve evaluation web UI on port 6010") + serve.add_argument("--tenant-id", default="163") + serve.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + serve.add_argument("--host", default="0.0.0.0") + serve.add_argument("--port", type=int, default=6010) + serve.add_argument("--labeler-mode", default=DEFAULT_LABELER_MODE, choices=["simple", "complex"]) + + return parser + + +def run_build(args: argparse.Namespace) -> None: + framework = SearchEvaluationFramework(tenant_id=args.tenant_id, labeler_mode=args.labeler_mode) + queries = framework.queries_from_file(Path(args.queries_file)) + summary = [] + for query in queries: + result = framework.build_query_annotation_set( + query=query, + search_depth=args.search_depth, + rerank_depth=args.rerank_depth, + annotate_search_top_k=args.annotate_search_top_k, + annotate_rerank_top_k=args.annotate_rerank_top_k, + language=args.language, + force_refresh_rerank=args.force_refresh_rerank, + force_refresh_labels=args.force_refresh_labels, + ) + 
summary.append( + { + "query": result.query, + "search_total": result.search_total, + "search_depth": result.search_depth, + "rerank_corpus_size": result.rerank_corpus_size, + "annotated_count": result.annotated_count, + "output_json_path": str(result.output_json_path), + } + ) + print( + f"[build] query={result.query!r} search_total={result.search_total} " + f"search_depth={result.search_depth} corpus={result.rerank_corpus_size} " + f"annotated={result.annotated_count} output={result.output_json_path}" + ) + out_path = ensure_dir(framework.artifact_root / "query_builds") / f"build_summary_{utc_timestamp()}.json" + out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"[done] summary={out_path}") + + +def run_batch(args: argparse.Namespace) -> None: + framework = SearchEvaluationFramework(tenant_id=args.tenant_id, labeler_mode=args.labeler_mode) + queries = framework.queries_from_file(Path(args.queries_file)) + payload = framework.batch_evaluate( + queries=queries, + top_k=args.top_k, + auto_annotate=True, + language=args.language, + force_refresh_labels=args.force_refresh_labels, + ) + print(f"[done] batch_id={payload['batch_id']} aggregate_metrics={payload['aggregate_metrics']}") + + +def run_audit(args: argparse.Namespace) -> None: + framework = SearchEvaluationFramework(tenant_id=args.tenant_id, labeler_mode=args.labeler_mode) + queries = framework.queries_from_file(Path(args.queries_file)) + audit_items = [] + for query in queries: + item = framework.audit_live_query( + query=query, + top_k=args.top_k, + language=args.language, + auto_annotate=not args.force_refresh_labels, + ) + if args.force_refresh_labels: + live_payload = framework.search_client.search(query=query, size=max(args.top_k, 100), from_=0, language=args.language) + framework.annotate_missing_labels( + query=query, + docs=list(live_payload.get("results") or [])[: args.top_k], + force_refresh=True, + ) + item = framework.audit_live_query( + query=query, 
+ top_k=args.top_k, + language=args.language, + auto_annotate=False, + ) + audit_items.append( + { + "query": query, + "metrics": item["metrics"], + "distribution": item["distribution"], + "suspicious_count": len(item["suspicious"]), + "suspicious_examples": item["suspicious"][: args.limit_suspicious], + } + ) + print( + f"[audit] query={query!r} suspicious={len(item['suspicious'])} metrics={item['metrics']}" + ) + + summary = { + "created_at": utc_now_iso(), + "tenant_id": args.tenant_id, + "top_k": args.top_k, + "query_count": len(queries), + "total_suspicious": sum(item["suspicious_count"] for item in audit_items), + "queries": audit_items, + } + out_path = ensure_dir(framework.artifact_root / "audits") / f"audit_{utc_timestamp()}.json" + out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"[done] audit={out_path}") + + +def run_serve(args: argparse.Namespace) -> None: + framework = SearchEvaluationFramework(tenant_id=args.tenant_id, labeler_mode=args.labeler_mode) + app = create_web_app(framework, Path(args.queries_file)) + import uvicorn + + uvicorn.run(app, host=args.host, port=args.port, log_level="info") + + +def main() -> None: + parser = build_cli_parser() + args = parser.parse_args() + if args.command == "build": + run_build(args) + return + if args.command == "batch": + run_batch(args) + return + if args.command == "audit": + run_audit(args) + return + if args.command == "serve": + run_serve(args) + return + raise SystemExit(f"unknown command: {args.command}") diff --git a/scripts/evaluation/eval_framework/clients.py b/scripts/evaluation/eval_framework/clients.py new file mode 100644 index 0000000..a7a5065 --- /dev/null +++ b/scripts/evaluation/eval_framework/clients.py @@ -0,0 +1,149 @@ +"""HTTP clients for search API, reranker, and DashScope chat (relevance labeling).""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Sequence, Tuple + +import requests + +from 
.constants import VALID_LABELS +from .prompts import ( + classify_batch_complex_prompt, + classify_batch_simple_prompt, + extract_query_profile_prompt, +) +from .utils import build_label_doc_line, extract_json_blob, safe_json_dumps + + +class SearchServiceClient: + def __init__(self, base_url: str, tenant_id: str): + self.base_url = base_url.rstrip("/") + self.tenant_id = str(tenant_id) + self.session = requests.Session() + + def search(self, query: str, size: int, from_: int = 0, language: str = "en") -> Dict[str, Any]: + response = self.session.post( + f"{self.base_url}/search/", + headers={"Content-Type": "application/json", "X-Tenant-ID": self.tenant_id}, + json={"query": query, "size": size, "from": from_, "language": language}, + timeout=120, + ) + response.raise_for_status() + return response.json() + + +class RerankServiceClient: + def __init__(self, service_url: str): + self.service_url = service_url.rstrip("/") + self.session = requests.Session() + + def rerank(self, query: str, docs: Sequence[str], normalize: bool = False, top_n: Optional[int] = None) -> Tuple[List[float], Dict[str, Any]]: + payload: Dict[str, Any] = { + "query": query, + "docs": list(docs), + "normalize": normalize, + } + if top_n is not None: + payload["top_n"] = int(top_n) + response = self.session.post(self.service_url, json=payload, timeout=180) + response.raise_for_status() + data = response.json() + return list(data.get("scores") or []), dict(data.get("meta") or {}) + + +class DashScopeLabelClient: + def __init__(self, model: str, base_url: str, api_key: str, batch_size: int = 40): + self.model = model + self.base_url = base_url.rstrip("/") + self.api_key = api_key + self.batch_size = int(batch_size) + self.session = requests.Session() + + def _chat(self, prompt: str) -> Tuple[str, str]: + response = self.session.post( + f"{self.base_url}/chat/completions", + headers={ + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + }, + json={ + "model": 
self.model, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0, + "top_p": 0.1, + }, + timeout=180, + ) + response.raise_for_status() + data = response.json() + content = str(((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip() + return content, safe_json_dumps(data) + + def classify_batch_simple( + self, + query: str, + docs: Sequence[Dict[str, Any]], + ) -> Tuple[List[str], str]: + numbered_docs = [build_label_doc_line(idx + 1, doc) for idx, doc in enumerate(docs)] + prompt = classify_batch_simple_prompt(query, numbered_docs) + content, raw_response = self._chat(prompt) + labels = [] + for line in str(content or "").splitlines(): + label = line.strip() + if label in VALID_LABELS: + labels.append(label) + if len(labels) != len(docs): + payload = extract_json_blob(content) + if isinstance(payload, dict) and isinstance(payload.get("labels"), list): + labels = [] + for item in payload["labels"][: len(docs)]: + if isinstance(item, dict): + label = str(item.get("label") or "").strip() + else: + label = str(item).strip() + if label in VALID_LABELS: + labels.append(label) + if len(labels) != len(docs) or any(label not in VALID_LABELS for label in labels): + raise ValueError(f"unexpected simple label output: {content!r}") + return labels, raw_response + + def extract_query_profile( + self, + query: str, + parser_hints: Dict[str, Any], + ) -> Tuple[Dict[str, Any], str]: + prompt = extract_query_profile_prompt(query, parser_hints) + content, raw_response = self._chat(prompt) + payload = extract_json_blob(content) + if not isinstance(payload, dict): + raise ValueError(f"unexpected query profile payload: {content!r}") + payload.setdefault("normalized_query_en", query) + payload.setdefault("primary_category", "") + payload.setdefault("allowed_categories", []) + payload.setdefault("required_attributes", []) + payload.setdefault("notes", []) + return payload, raw_response + + def classify_batch_complex( + self, + query: str, 
+ query_profile: Dict[str, Any], + docs: Sequence[Dict[str, Any]], + ) -> Tuple[List[str], str]: + numbered_docs = [build_label_doc_line(idx + 1, doc) for idx, doc in enumerate(docs)] + prompt = classify_batch_complex_prompt(query, query_profile, numbered_docs) + content, raw_response = self._chat(prompt) + payload = extract_json_blob(content) + if not isinstance(payload, dict) or not isinstance(payload.get("labels"), list): + raise ValueError(f"unexpected label payload: {content!r}") + labels_payload = payload["labels"] + labels: List[str] = [] + for item in labels_payload[: len(docs)]: + if not isinstance(item, dict): + continue + label = str(item.get("label") or "").strip() + if label in VALID_LABELS: + labels.append(label) + if len(labels) != len(docs) or any(label not in VALID_LABELS for label in labels): + raise ValueError(f"unexpected label output: {content!r}") + return labels, raw_response diff --git a/scripts/evaluation/eval_framework/constants.py b/scripts/evaluation/eval_framework/constants.py new file mode 100644 index 0000000..ad6496d --- /dev/null +++ b/scripts/evaluation/eval_framework/constants.py @@ -0,0 +1,19 @@ +"""Paths and shared constants for search evaluation.""" + +from pathlib import Path + +_PKG_DIR = Path(__file__).resolve().parent +_SCRIPTS_EVAL_DIR = _PKG_DIR.parent +PROJECT_ROOT = _SCRIPTS_EVAL_DIR.parents[1] + +RELEVANCE_EXACT = "Exact" +RELEVANCE_PARTIAL = "Partial" +RELEVANCE_IRRELEVANT = "Irrelevant" +VALID_LABELS = {RELEVANCE_EXACT, RELEVANCE_PARTIAL, RELEVANCE_IRRELEVANT} + +DEFAULT_ARTIFACT_ROOT = PROJECT_ROOT / "artifacts" / "search_evaluation" +DEFAULT_QUERY_FILE = _SCRIPTS_EVAL_DIR / "queries" / "queries.txt" + +JUDGE_PROMPT_VERSION_SIMPLE = "v3_simple_20260331" +JUDGE_PROMPT_VERSION_COMPLEX = "v2_structured_20260331" +DEFAULT_LABELER_MODE = "simple" diff --git a/scripts/evaluation/eval_framework/framework.py b/scripts/evaluation/eval_framework/framework.py new file mode 100644 index 0000000..0e0c5a0 --- /dev/null +++ 
b/scripts/evaluation/eval_framework/framework.py @@ -0,0 +1,719 @@ +"""Core orchestration: corpus, rerank, LLM labels, live/batch evaluation.""" + +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import Any, Dict, List, Sequence, Tuple + +import requests +from elasticsearch.helpers import scan + +from api.app import get_app_config, get_es_client, get_query_parser, init_service +from indexer.mapping_generator import get_tenant_index_name + +from .clients import DashScopeLabelClient, RerankServiceClient, SearchServiceClient +from .constants import ( + DEFAULT_ARTIFACT_ROOT, + DEFAULT_LABELER_MODE, + JUDGE_PROMPT_VERSION_COMPLEX, + RELEVANCE_EXACT, + RELEVANCE_IRRELEVANT, + RELEVANCE_PARTIAL, + VALID_LABELS, +) +from .metrics import aggregate_metrics, compute_query_metrics, label_distribution +from .reports import render_batch_report_markdown +from .store import EvalStore, QueryBuildResult +from .utils import ( + build_display_title, + build_rerank_doc, + compact_option_values, + compact_product_payload, + ensure_dir, + normalize_text, + pick_text, + sha1_text, + utc_now_iso, + utc_timestamp, +) + + +class SearchEvaluationFramework: + def __init__( + self, + tenant_id: str, + artifact_root: Path = DEFAULT_ARTIFACT_ROOT, + search_base_url: str = "http://localhost:6002", + labeler_mode: str = DEFAULT_LABELER_MODE, + ): + init_service(get_app_config().infrastructure.elasticsearch.host) + self.tenant_id = str(tenant_id) + self.artifact_root = ensure_dir(artifact_root) + self.labeler_mode = str(labeler_mode).strip().lower() or DEFAULT_LABELER_MODE + self.store = EvalStore(self.artifact_root / "search_eval.sqlite3") + self.search_client = SearchServiceClient(search_base_url, self.tenant_id) + app_cfg = get_app_config() + rerank_service_url = str( + app_cfg.services.rerank.providers["http"]["instances"]["default"]["service_url"] + ) + self.rerank_client = RerankServiceClient(rerank_service_url) + llm_cfg = 
app_cfg.services.translation.capabilities["llm"] + api_key = app_cfg.infrastructure.secrets.dashscope_api_key + if not api_key: + raise RuntimeError("dashscope_api_key is required for search evaluation annotation") + self.label_client = DashScopeLabelClient( + model=str(llm_cfg["model"]), + base_url=str(llm_cfg["base_url"]), + api_key=str(api_key), + ) + self.query_parser = None + + def _get_query_parser(self): + if self.query_parser is None: + self.query_parser = get_query_parser() + return self.query_parser + + def build_query_parser_hints(self, query: str) -> Dict[str, Any]: + parsed = self._get_query_parser().parse(query, generate_vector=False, target_languages=["en", "zh"]) + payload = parsed.to_dict() + payload["text_for_rerank"] = parsed.text_for_rerank() + return payload + + def get_query_profile(self, query: str, force_refresh: bool = False) -> Dict[str, Any]: + if self.labeler_mode != "complex": + raise RuntimeError("query profiles are only used in complex labeler mode") + if not force_refresh: + cached = self.store.get_query_profile(self.tenant_id, query, JUDGE_PROMPT_VERSION_COMPLEX) + if cached is not None: + return cached + parser_hints = self.build_query_parser_hints(query) + profile, raw_response = self.label_client.extract_query_profile(query, parser_hints) + profile["parser_hints"] = parser_hints + self.store.upsert_query_profile( + self.tenant_id, + query, + JUDGE_PROMPT_VERSION_COMPLEX, + self.label_client.model, + profile, + raw_response, + ) + return profile + + @staticmethod + def _doc_evidence_text(doc: Dict[str, Any]) -> str: + pieces: List[str] = [ + build_display_title(doc), + pick_text(doc.get("vendor"), "en"), + pick_text(doc.get("category_path"), "en"), + pick_text(doc.get("category_name"), "en"), + ] + for sku in doc.get("skus") or []: + pieces.extend( + [ + str(sku.get("option1_value") or ""), + str(sku.get("option2_value") or ""), + str(sku.get("option3_value") or ""), + ] + ) + for tag in doc.get("tags") or []: + 
pieces.append(str(tag)) + return normalize_text(" | ".join(piece for piece in pieces if piece)) + + def _apply_rule_based_label_guardrails( + self, + label: str, + query_profile: Dict[str, Any], + doc: Dict[str, Any], + ) -> str: + if label not in VALID_LABELS: + return label + evidence = self._doc_evidence_text(doc) + category = normalize_text(query_profile.get("primary_category")) + allowed_categories = [normalize_text(item) for item in query_profile.get("allowed_categories") or [] if str(item).strip()] + + primary_category_match = True + if category: + primary_category_match = category in evidence + allowed_category_match = True + if allowed_categories: + allowed_category_match = any(signal in evidence for signal in allowed_categories) + + if label == RELEVANCE_EXACT and not primary_category_match: + if allowed_category_match: + label = RELEVANCE_PARTIAL + else: + return RELEVANCE_IRRELEVANT + + for attr in query_profile.get("required_attributes") or []: + if not isinstance(attr, dict): + continue + attr_name = normalize_text(attr.get("name")) + if attr_name not in {"color", "fit", "length", "type", "product_type", "material", "size", "gender", "style", "waist_style", "rise"}: + continue + required_terms = [normalize_text(item) for item in attr.get("required_terms") or [] if normalize_text(item)] + conflicting_terms = [normalize_text(item) for item in attr.get("conflicting_terms") or [] if normalize_text(item)] + if attr_name == "fit": + if any(term in {"oversized", "oversize"} for term in required_terms): + conflicting_terms.extend(["slim", "slimming", "fitted", "tight", "close-fitting"]) + if any(term in {"fitted", "slim fit", "tight"} for term in required_terms): + conflicting_terms.extend(["oversized", "oversize", "loose", "relaxed"]) + has_required = any(term in evidence for term in required_terms) if required_terms else True + has_conflict = any(term in evidence for term in conflicting_terms) + + if has_conflict: + return RELEVANCE_IRRELEVANT + if label == 
RELEVANCE_EXACT and not has_required: + label = RELEVANCE_PARTIAL + + if label == RELEVANCE_PARTIAL and not primary_category_match and not allowed_category_match: + return RELEVANCE_IRRELEVANT + + return label + + @staticmethod + def _result_item_to_doc(item: Dict[str, Any]) -> Dict[str, Any]: + option_values = list(item.get("option_values") or []) + while len(option_values) < 3: + option_values.append("") + product = dict(item.get("product") or {}) + return { + "spu_id": item.get("spu_id"), + "title": product.get("title") or item.get("title"), + "vendor": product.get("vendor"), + "category_path": product.get("category"), + "category_name": product.get("category"), + "image_url": item.get("image_url") or product.get("image_url"), + "tags": product.get("tags") or [], + "skus": [ + { + "option1_value": option_values[0], + "option2_value": option_values[1], + "option3_value": option_values[2], + } + ], + } + + def _collect_label_issues( + self, + label: str, + query_profile: Dict[str, Any], + doc: Dict[str, Any], + ) -> List[str]: + evidence = self._doc_evidence_text(doc) + issues: List[str] = [] + category = normalize_text(query_profile.get("primary_category")) + allowed_categories = [ + normalize_text(item) + for item in query_profile.get("allowed_categories") or [] + if str(item).strip() + ] + + primary_category_match = True if not category else category in evidence + allowed_category_match = False if allowed_categories else primary_category_match + if allowed_categories: + allowed_category_match = any(signal in evidence for signal in allowed_categories) + + if label == RELEVANCE_EXACT and not primary_category_match: + if allowed_category_match: + issues.append("Exact missing primary category evidence") + else: + issues.append("Exact has category mismatch") + + if label == RELEVANCE_PARTIAL and not primary_category_match and not allowed_category_match: + issues.append("Partial has category mismatch") + + for attr in query_profile.get("required_attributes") or []: + 
if not isinstance(attr, dict): + continue + attr_name = normalize_text(attr.get("name")) + if attr_name not in {"color", "fit", "length", "type", "product_type", "material", "size", "gender", "style"}: + continue + required_terms = [normalize_text(item) for item in attr.get("required_terms") or [] if normalize_text(item)] + conflicting_terms = [normalize_text(item) for item in attr.get("conflicting_terms") or [] if normalize_text(item)] + has_required = any(term in evidence for term in required_terms) if required_terms else True + has_conflict = any(term in evidence for term in conflicting_terms) + + if has_conflict and label != RELEVANCE_IRRELEVANT: + issues.append(f"{label} conflicts on {attr_name}") + if label == RELEVANCE_EXACT and not has_required: + issues.append(f"Exact missing {attr_name}") + return issues + + def audit_live_query( + self, + query: str, + *, + top_k: int = 100, + language: str = "en", + auto_annotate: bool = False, + ) -> Dict[str, Any]: + live = self.evaluate_live_query(query=query, top_k=top_k, auto_annotate=auto_annotate, language=language) + if self.labeler_mode != "complex": + labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in live["results"] + ] + return { + "query": query, + "tenant_id": self.tenant_id, + "top_k": top_k, + "metrics": live["metrics"], + "distribution": label_distribution(labels), + "query_profile": None, + "suspicious": [], + "results": live["results"], + } + query_profile = self.get_query_profile(query, force_refresh=False) + suspicious: List[Dict[str, Any]] = [] + + for item in live["results"]: + doc = self._result_item_to_doc(item) + issues = self._collect_label_issues(item["label"] or "", query_profile, doc) + suggested_label = self._apply_rule_based_label_guardrails(item["label"] or "", query_profile, doc) + if suggested_label != (item["label"] or ""): + issues = list(issues) + [f"Suggested relabel: {item['label']} -> {suggested_label}"] + if issues: + 
suspicious.append( + { + "rank": item["rank"], + "spu_id": item["spu_id"], + "title": item["title"], + "label": item["label"], + "suggested_label": suggested_label, + "issues": issues, + } + ) + + labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in live["results"] + ] + return { + "query": query, + "tenant_id": self.tenant_id, + "top_k": top_k, + "metrics": live["metrics"], + "distribution": label_distribution(labels), + "query_profile": query_profile, + "suspicious": suspicious, + "results": live["results"], + } + + def queries_from_file(self, path: Path) -> List[str]: + return [ + line.strip() + for line in path.read_text(encoding="utf-8").splitlines() + if line.strip() and not line.strip().startswith("#") + ] + + def corpus_docs(self, refresh: bool = False) -> List[Dict[str, Any]]: + if not refresh and self.store.has_corpus(self.tenant_id): + return self.store.get_corpus_docs(self.tenant_id) + + es_client = get_es_client().client + index_name = get_tenant_index_name(self.tenant_id) + docs: List[Dict[str, Any]] = [] + for hit in scan( + client=es_client, + index=index_name, + query={ + "_source": [ + "spu_id", + "title", + "vendor", + "category_path", + "category_name", + "image_url", + "skus", + "tags", + ], + "query": {"match_all": {}}, + }, + size=500, + preserve_order=False, + clear_scroll=True, + ): + source = dict(hit.get("_source") or {}) + source["spu_id"] = str(source.get("spu_id") or hit.get("_id") or "") + docs.append(source) + self.store.upsert_corpus_docs(self.tenant_id, docs) + return docs + + def full_corpus_rerank( + self, + query: str, + docs: Sequence[Dict[str, Any]], + batch_size: int = 24, + force_refresh: bool = False, + ) -> List[Dict[str, Any]]: + cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query) + pending: List[Dict[str, Any]] = [doc for doc in docs if str(doc.get("spu_id")) not in cached] + if pending: + new_scores: Dict[str, float] = {} + for start in 
range(0, len(pending), batch_size): + batch = pending[start : start + batch_size] + scores = self._rerank_batch_with_retry(query=query, docs=batch) + if len(scores) != len(batch): + raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs") + for doc, score in zip(batch, scores): + new_scores[str(doc.get("spu_id"))] = float(score) + self.store.upsert_rerank_scores( + self.tenant_id, + query, + new_scores, + model_name="qwen3_vllm_score", + ) + cached.update(new_scores) + + ranked = [] + for doc in docs: + spu_id = str(doc.get("spu_id")) + ranked.append({"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc}) + ranked.sort(key=lambda item: item["score"], reverse=True) + return ranked + + def _rerank_batch_with_retry(self, query: str, docs: Sequence[Dict[str, Any]]) -> List[float]: + if not docs: + return [] + doc_texts = [build_rerank_doc(doc) for doc in docs] + try: + scores, _meta = self.rerank_client.rerank(query=query, docs=doc_texts, normalize=False) + return scores + except Exception: + if len(docs) == 1: + return [-1.0] + if len(docs) <= 6: + scores: List[float] = [] + for doc in docs: + scores.extend(self._rerank_batch_with_retry(query, [doc])) + return scores + mid = len(docs) // 2 + left = self._rerank_batch_with_retry(query, docs[:mid]) + right = self._rerank_batch_with_retry(query, docs[mid:]) + return left + right + + def annotate_missing_labels( + self, + query: str, + docs: Sequence[Dict[str, Any]], + force_refresh: bool = False, + ) -> Dict[str, str]: + labels = {} if force_refresh else self.store.get_labels(self.tenant_id, query) + missing_docs = [doc for doc in docs if str(doc.get("spu_id")) not in labels] + if not missing_docs: + return labels + + for start in range(0, len(missing_docs), self.label_client.batch_size): + batch = missing_docs[start : start + self.label_client.batch_size] + batch_pairs = self._classify_with_retry(query, batch, force_refresh=force_refresh) + for sub_labels, 
raw_response, sub_batch in batch_pairs: + to_store = {str(doc.get("spu_id")): label for doc, label in zip(sub_batch, sub_labels)} + self.store.upsert_labels( + self.tenant_id, + query, + to_store, + judge_model=self.label_client.model, + raw_response=raw_response, + ) + labels.update(to_store) + time.sleep(0.1) + return labels + + def _classify_with_retry( + self, + query: str, + docs: Sequence[Dict[str, Any]], + *, + force_refresh: bool = False, + ) -> List[Tuple[List[str], str, Sequence[Dict[str, Any]]]]: + if not docs: + return [] + try: + if self.labeler_mode == "complex": + query_profile = self.get_query_profile(query, force_refresh=force_refresh) + labels, raw_response = self.label_client.classify_batch_complex(query, query_profile, docs) + labels = [ + self._apply_rule_based_label_guardrails(label, query_profile, doc) + for doc, label in zip(docs, labels) + ] + else: + labels, raw_response = self.label_client.classify_batch_simple(query, docs) + return [(labels, raw_response, docs)] + except Exception: + if len(docs) == 1: + raise + mid = len(docs) // 2 + return self._classify_with_retry(query, docs[:mid], force_refresh=force_refresh) + self._classify_with_retry(query, docs[mid:], force_refresh=force_refresh) + + def build_query_annotation_set( + self, + query: str, + *, + search_depth: int = 1000, + rerank_depth: int = 10000, + annotate_search_top_k: int = 120, + annotate_rerank_top_k: int = 200, + language: str = "en", + force_refresh_rerank: bool = False, + force_refresh_labels: bool = False, + ) -> QueryBuildResult: + search_payload = self.search_client.search(query=query, size=search_depth, from_=0, language=language) + search_results = list(search_payload.get("results") or []) + corpus = self.corpus_docs(refresh=False) + full_rerank = self.full_corpus_rerank( + query=query, + docs=corpus, + force_refresh=force_refresh_rerank, + ) + rerank_depth_effective = min(rerank_depth, len(full_rerank)) + + pool_docs: Dict[str, Dict[str, Any]] = {} + for doc in 
search_results[:annotate_search_top_k]: + pool_docs[str(doc.get("spu_id"))] = doc + for item in full_rerank[:annotate_rerank_top_k]: + pool_docs[str(item["spu_id"])] = item["doc"] + + labels = self.annotate_missing_labels( + query=query, + docs=list(pool_docs.values()), + force_refresh=force_refresh_labels, + ) + + search_labeled_results: List[Dict[str, Any]] = [] + for rank, doc in enumerate(search_results, start=1): + spu_id = str(doc.get("spu_id")) + label = labels.get(spu_id) + search_labeled_results.append( + { + "rank": rank, + "spu_id": spu_id, + "title": build_display_title(doc), + "image_url": doc.get("image_url"), + "rerank_score": None, + "label": label, + "option_values": list(compact_option_values(doc.get("skus") or [])), + "product": compact_product_payload(doc), + } + ) + + rerank_top_results: List[Dict[str, Any]] = [] + for rank, item in enumerate(full_rerank[:rerank_depth_effective], start=1): + doc = item["doc"] + spu_id = str(item["spu_id"]) + rerank_top_results.append( + { + "rank": rank, + "spu_id": spu_id, + "title": build_display_title(doc), + "image_url": doc.get("image_url"), + "rerank_score": round(float(item["score"]), 8), + "label": labels.get(spu_id), + "option_values": list(compact_option_values(doc.get("skus") or [])), + "product": compact_product_payload(doc), + } + ) + + top100_labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in search_labeled_results[:100] + ] + metrics = compute_query_metrics(top100_labels) + output_dir = ensure_dir(self.artifact_root / "query_builds") + run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}" + output_json_path = output_dir / f"{run_id}.json" + payload = { + "run_id": run_id, + "created_at": utc_now_iso(), + "tenant_id": self.tenant_id, + "query": query, + "config_meta": requests.get("http://localhost:6002/admin/config/meta", timeout=20).json(), + "search_total": int(search_payload.get("total") or 0), + "search_depth_requested": 
search_depth, + "search_depth_effective": len(search_results), + "rerank_depth_requested": rerank_depth, + "rerank_depth_effective": rerank_depth_effective, + "corpus_size": len(corpus), + "annotation_pool": { + "annotate_search_top_k": annotate_search_top_k, + "annotate_rerank_top_k": annotate_rerank_top_k, + "pool_size": len(pool_docs), + }, + "labeler_mode": self.labeler_mode, + "query_profile": self.get_query_profile(query, force_refresh=force_refresh_labels) if self.labeler_mode == "complex" else None, + "metrics_top100": metrics, + "search_results": search_labeled_results, + "full_rerank_top": rerank_top_results, + } + output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"]) + return QueryBuildResult( + query=query, + tenant_id=self.tenant_id, + search_total=int(search_payload.get("total") or 0), + search_depth=len(search_results), + rerank_corpus_size=len(corpus), + annotated_count=len(pool_docs), + output_json_path=output_json_path, + ) + + def evaluate_live_query( + self, + query: str, + top_k: int = 100, + auto_annotate: bool = False, + language: str = "en", + force_refresh_labels: bool = False, + ) -> Dict[str, Any]: + search_payload = self.search_client.search(query=query, size=max(top_k, 100), from_=0, language=language) + results = list(search_payload.get("results") or []) + if auto_annotate: + self.annotate_missing_labels(query=query, docs=results[:top_k], force_refresh=force_refresh_labels) + labels = self.store.get_labels(self.tenant_id, query) + recalled_spu_ids = {str(doc.get("spu_id")) for doc in results[:top_k]} + labeled = [] + unlabeled_hits = 0 + for rank, doc in enumerate(results[:top_k], start=1): + spu_id = str(doc.get("spu_id")) + label = labels.get(spu_id) + if label not in VALID_LABELS: + unlabeled_hits += 1 + labeled.append( + { + "rank": rank, + "spu_id": spu_id, + "title": 
build_display_title(doc), + "image_url": doc.get("image_url"), + "label": label, + "option_values": list(compact_option_values(doc.get("skus") or [])), + "product": compact_product_payload(doc), + } + ) + metric_labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in labeled + ] + label_stats = self.store.get_query_label_stats(self.tenant_id, query) + rerank_scores = self.store.get_rerank_scores(self.tenant_id, query) + relevant_missing_ids = [ + spu_id + for spu_id, label in labels.items() + if label in {RELEVANCE_EXACT, RELEVANCE_PARTIAL} and spu_id not in recalled_spu_ids + ] + missing_docs_map = self.store.get_corpus_docs_by_spu_ids(self.tenant_id, relevant_missing_ids) + missing_relevant = [] + for spu_id in relevant_missing_ids: + doc = missing_docs_map.get(spu_id) + if not doc: + continue + missing_relevant.append( + { + "spu_id": spu_id, + "label": labels[spu_id], + "rerank_score": rerank_scores.get(spu_id), + "title": build_display_title(doc), + "image_url": doc.get("image_url"), + "option_values": list(compact_option_values(doc.get("skus") or [])), + "product": compact_product_payload(doc), + } + ) + label_order = {RELEVANCE_EXACT: 0, RELEVANCE_PARTIAL: 1, RELEVANCE_IRRELEVANT: 2} + missing_relevant.sort( + key=lambda item: ( + label_order.get(str(item.get("label")), 9), + -(float(item.get("rerank_score")) if item.get("rerank_score") is not None else float("-inf")), + str(item.get("title") or ""), + ) + ) + tips: List[str] = [] + if auto_annotate: + tips.append("Single-query evaluation used cached labels and refreshed missing labels for recalled results.") + else: + tips.append("Single-query evaluation used the offline annotation cache only; recalled SPUs without cached labels were treated as Irrelevant.") + if label_stats["total"] == 0: + tips.append("This query has no offline annotation set yet. 
Build or refresh labels first if you want stable evaluation.") + if unlabeled_hits: + tips.append(f"{unlabeled_hits} recalled results were not in the annotation set and were counted as Irrelevant.") + if not missing_relevant: + tips.append("No cached Exact/Partial products were missed by this recall set.") + return { + "query": query, + "tenant_id": self.tenant_id, + "top_k": top_k, + "metrics": compute_query_metrics(metric_labels), + "results": labeled, + "missing_relevant": missing_relevant, + "label_stats": { + **label_stats, + "unlabeled_hits_treated_irrelevant": unlabeled_hits, + "recalled_hits": len(labeled), + "missing_relevant_count": len(missing_relevant), + "missing_exact_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_EXACT), + "missing_partial_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_PARTIAL), + }, + "tips": tips, + "total": int(search_payload.get("total") or 0), + } + + def batch_evaluate( + self, + queries: Sequence[str], + *, + top_k: int = 100, + auto_annotate: bool = True, + language: str = "en", + force_refresh_labels: bool = False, + ) -> Dict[str, Any]: + per_query = [] + for query in queries: + live = self.evaluate_live_query( + query, + top_k=top_k, + auto_annotate=auto_annotate, + language=language, + force_refresh_labels=force_refresh_labels, + ) + labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in live["results"] + ] + per_query.append( + { + "query": live["query"], + "tenant_id": live["tenant_id"], + "top_k": live["top_k"], + "metrics": live["metrics"], + "distribution": label_distribution(labels), + "total": live["total"], + } + ) + aggregate = aggregate_metrics([item["metrics"] for item in per_query]) + aggregate_distribution = { + RELEVANCE_EXACT: sum(item["distribution"][RELEVANCE_EXACT] for item in per_query), + RELEVANCE_PARTIAL: sum(item["distribution"][RELEVANCE_PARTIAL] for item in per_query), + RELEVANCE_IRRELEVANT: 
sum(item["distribution"][RELEVANCE_IRRELEVANT] for item in per_query), + } + batch_id = f"batch_{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + '|'.join(queries))[:10]}" + report_dir = ensure_dir(self.artifact_root / "batch_reports") + config_snapshot_path = report_dir / f"{batch_id}_config.json" + config_snapshot = requests.get("http://localhost:6002/admin/config", timeout=20).json() + config_snapshot_path.write_text(json.dumps(config_snapshot, ensure_ascii=False, indent=2), encoding="utf-8") + output_json_path = report_dir / f"{batch_id}.json" + report_md_path = report_dir / f"{batch_id}.md" + payload = { + "batch_id": batch_id, + "created_at": utc_now_iso(), + "tenant_id": self.tenant_id, + "queries": list(queries), + "top_k": top_k, + "aggregate_metrics": aggregate, + "aggregate_distribution": aggregate_distribution, + "per_query": per_query, + "config_snapshot_path": str(config_snapshot_path), + } + output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + report_md_path.write_text(render_batch_report_markdown(payload), encoding="utf-8") + self.store.insert_batch_run(batch_id, self.tenant_id, output_json_path, report_md_path, config_snapshot_path, payload) + return payload + diff --git a/scripts/evaluation/eval_framework/metrics.py b/scripts/evaluation/eval_framework/metrics.py new file mode 100644 index 0000000..b6f5681 --- /dev/null +++ b/scripts/evaluation/eval_framework/metrics.py @@ -0,0 +1,58 @@ +"""IR metrics for labeled result lists.""" + +from __future__ import annotations + +from typing import Dict, Sequence + +from .constants import RELEVANCE_EXACT, RELEVANCE_IRRELEVANT, RELEVANCE_PARTIAL + + +def precision_at_k(labels: Sequence[str], k: int, relevant: Sequence[str]) -> float: + if k <= 0: + return 0.0 + sliced = list(labels[:k]) + if not sliced: + return 0.0 + hits = sum(1 for label in sliced if label in relevant) + return hits / float(min(k, len(sliced))) + + +def average_precision(labels: 
Sequence[str], relevant: Sequence[str]) -> float: + hit_count = 0 + precision_sum = 0.0 + for idx, label in enumerate(labels, start=1): + if label not in relevant: + continue + hit_count += 1 + precision_sum += hit_count / idx + if hit_count == 0: + return 0.0 + return precision_sum / hit_count + + +def compute_query_metrics(labels: Sequence[str]) -> Dict[str, float]: + metrics: Dict[str, float] = {} + for k in (5, 10, 20, 50): + metrics[f"P@{k}"] = round(precision_at_k(labels, k, [RELEVANCE_EXACT]), 6) + metrics[f"P@{k}_2_3"] = round(precision_at_k(labels, k, [RELEVANCE_EXACT, RELEVANCE_PARTIAL]), 6) + metrics["MAP_3"] = round(average_precision(labels, [RELEVANCE_EXACT]), 6) + metrics["MAP_2_3"] = round(average_precision(labels, [RELEVANCE_EXACT, RELEVANCE_PARTIAL]), 6) + return metrics + + +def aggregate_metrics(metric_items: Sequence[Dict[str, float]]) -> Dict[str, float]: + if not metric_items: + return {} + keys = sorted(metric_items[0].keys()) + return { + key: round(sum(float(item.get(key, 0.0)) for item in metric_items) / len(metric_items), 6) + for key in keys + } + + +def label_distribution(labels: Sequence[str]) -> Dict[str, int]: + return { + RELEVANCE_EXACT: sum(1 for label in labels if label == RELEVANCE_EXACT), + RELEVANCE_PARTIAL: sum(1 for label in labels if label == RELEVANCE_PARTIAL), + RELEVANCE_IRRELEVANT: sum(1 for label in labels if label == RELEVANCE_IRRELEVANT), + } diff --git a/scripts/evaluation/eval_framework/prompts.py b/scripts/evaluation/eval_framework/prompts.py new file mode 100644 index 0000000..8b6731b --- /dev/null +++ b/scripts/evaluation/eval_framework/prompts.py @@ -0,0 +1,89 @@ +"""LLM prompt templates for relevance judging (keep wording changes here).""" + +from __future__ import annotations + +import json +from typing import Any, Dict, Sequence + + +def classify_batch_simple_prompt(query: str, numbered_doc_lines: Sequence[str]) -> str: + lines = "\n".join(numbered_doc_lines) + n = len(numbered_doc_lines) + return ( + "You 
# NOTE(review): reconstructed from a whitespace-mangled diff hunk covering
# ``eval_framework/prompts.py`` and the head of ``eval_framework/reports.py``
# (the tail of ``render_batch_report_markdown`` continues on the next mangled
# line). Sentence joins at the original chunk breaks are best-effort — TODO
# confirm against the pre-mangling patch before relying on exact prompt text.

import json
from typing import Any, Dict, Sequence

# Inlined label constants; the packaged modules import them from ``.constants``.
RELEVANCE_EXACT = "Exact"
RELEVANCE_PARTIAL = "Partial"
RELEVANCE_IRRELEVANT = "Irrelevant"


def classify_batch_simple_prompt(query: str, numbered_doc_lines: Sequence[str]) -> str:
    """Single-shot judging prompt: one label line per numbered product."""
    lines = "\n".join(numbered_doc_lines)
    n = len(numbered_doc_lines)
    return (
        "You are an e-commerce search result relevance evaluation assistant. "
        "Based on the user query and each product's information, output the relevance level for each product.\n\n"
        "## Relevance Level Criteria\n"
        "Exact — Fully matches the user's search intent.\n"
        "Partial — Primary intent satisfied (same category or similar use, basically aligns with search intent), "
        "but secondary attributes such as color, style, size, fit, length, or material deviate from or cannot be confirmed.\n"
        "Irrelevant — Category or use case mismatched, primary intent not satisfied.\n\n"
        "Additional judging guidance:\n"
        "- If the query clearly names a product type, product type matching has the highest priority. "
        "Dress vs skirt vs jumpsuit, jeans vs pants, T-shirt vs blouse, cardigan vs sweater, boots vs shoes, "
        "bra vs top, backpack vs bag are not interchangeable.\n"
        "- When the query clearly specifies a concrete product type, a different product type should usually be Irrelevant, not Partial.\n"
        "- If an attribute looks missing or uncertain, prefer Partial instead of Exact.\n"
        "- Do not guess missing attributes.\n"
        "- Graphic, slogan, holiday, memorial, or message tees are not Exact for a plain tee query unless that graphic/theme is requested.\n"
        "- Be conservative with Exact.\n\n"
        f"Query: {query}\n\n"
        "Products:\n"
        f"{lines}\n\n"
        "## Output Format\n"
        f"Strictly output {n} lines, each line containing exactly one of Exact / Partial / Irrelevant. "
        "They must correspond sequentially to the products above. Do not output any other information.\n"
    )


def extract_query_profile_prompt(query: str, parser_hints: Dict[str, Any]) -> str:
    """Prompt that asks the LLM for a structured intent profile (JSON)."""
    hints_json = json.dumps(parser_hints, ensure_ascii=False)
    return (
        "You are building a structured intent profile for e-commerce relevance judging.\n"
        "Use the original user query as the source of truth. Parser hints may help, but if a hint conflicts with the original query, trust the original query.\n"
        "Be conservative: only mark an attribute as required if the user explicitly asked for it.\n\n"
        "Return JSON with this schema:\n"
        "{\n"
        '  "normalized_query_en": string,\n'
        '  "primary_category": string,\n'
        '  "allowed_categories": [string],\n'
        '  "required_attributes": [\n'
        '    {"name": string, "required_terms": [string], "conflicting_terms": [string], "match_mode": "explicit"}\n'
        "  ],\n"
        '  "notes": [string]\n'
        "}\n\n"
        "Guidelines:\n"
        "- Exact later will require explicit evidence for all required attributes.\n"
        "- allowed_categories should contain only near-synonyms of the same product type, not substitutes. "
        "For example dress can allow midi dress/cocktail dress, but not skirt, top, jumpsuit, or outfit unless the query explicitly asks for them.\n"
        "- If the query asks for dress/skirt/jeans/t-shirt, near but different product types are not Exact.\n"
        "- If the query includes color, fit, silhouette, or length, include them as required_attributes.\n"
        "- For fit words, include conflicting terms when obvious, e.g. fitted conflicts with oversized/loose; oversized conflicts with fitted/tight.\n"
        "- For color, include conflicting colors only when clear from the query.\n\n"
        f"Original query: {query}\n"
        f"Parser hints JSON: {hints_json}\n"
    )


def classify_batch_complex_prompt(
    query: str,
    query_profile: Dict[str, Any],
    numbered_doc_lines: Sequence[str],
) -> str:
    """Judging prompt that scores products against a structured profile; expects JSON back."""
    lines = "\n".join(numbered_doc_lines)
    profile_json = json.dumps(query_profile, ensure_ascii=False)
    return (
        "You are an e-commerce search relevance judge.\n"
        "Judge each product against the structured query profile below.\n\n"
        "Relevance rules:\n"
        "- Exact: product type matches the target intent, and every explicit required attribute is positively supported by the title/options/tags/category. "
        "If an attribute is missing or only guessed, it is NOT Exact.\n"
        "- Partial: main product type/use case matches, but some required attribute is missing, weaker, uncertain, or only approximately matched.\n"
        "- Irrelevant: product type/use case mismatched, or an explicit required attribute clearly conflicts.\n"
        "- Be conservative with Exact.\n"
        "- Graphic/holiday/message tees are not Exact for a plain color/style tee query unless that graphic/theme was requested.\n"
        "- Jumpsuit/romper/set is not Exact for dress/skirt/jeans queries.\n\n"
        f"Original query: {query}\n"
        f"Structured query profile JSON: {profile_json}\n\n"
        "Products:\n"
        f"{lines}\n\n"
        "Return JSON only, with schema:\n"
        '{"labels":[{"index":1,"label":"Exact","reason":"short phrase"}]}\n'
    )


def render_batch_report_markdown(payload: Dict[str, Any]) -> str:
    """Render a batch-evaluation payload as a Markdown report.

    Sections: header metadata, sorted aggregate metrics, optional aggregate
    label distribution, then one ``###`` section per query with its metrics
    and label counts. (Restored in full here; its original tail sits on the
    following mangled line.)
    """
    lines = [
        "# Search Batch Evaluation",
        "",
        f"- Batch ID: {payload['batch_id']}",
        f"- Created at: {payload['created_at']}",
        f"- Tenant ID: {payload['tenant_id']}",
        f"- Query count: {len(payload['queries'])}",
        f"- Top K: {payload['top_k']}",
        "",
        "## Aggregate Metrics",
        "",
    ]
    for key, value in sorted((payload.get("aggregate_metrics") or {}).items()):
        lines.append(f"- {key}: {value}")
    distribution = payload.get("aggregate_distribution") or {}
    if distribution:
        lines.extend(
            [
                "",
                "## Label Distribution",
                "",
                f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}",
                f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}",
                f"- Irrelevant: {distribution.get(RELEVANCE_IRRELEVANT, 0)}",
            ]
        )
    lines.extend(["", "## Per Query", ""])
    for item in payload.get("per_query") or []:
        lines.append(f"### {item['query']}")
        lines.append("")
        for key, value in sorted((item.get("metrics") or {}).items()):
            lines.append(f"- {key}: {value}")
        distribution = item.get("distribution") or {}
        lines.append(f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}")
        lines.append(f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}")
        lines.append(f"- Irrelevant: {distribution.get(RELEVANCE_IRRELEVANT, 0)}")
        lines.append("")
    return "\n".join(lines)
{distribution.get(RELEVANCE_IRRELEVANT, 0)}", + ] + ) + lines.extend(["", "## Per Query", ""]) + for item in payload.get("per_query") or []: + lines.append(f"### {item['query']}") + lines.append("") + for key, value in sorted((item.get("metrics") or {}).items()): + lines.append(f"- {key}: {value}") + distribution = item.get("distribution") or {} + lines.append(f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}") + lines.append(f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}") + lines.append(f"- Irrelevant: {distribution.get(RELEVANCE_IRRELEVANT, 0)}") + lines.append("") + return "\n".join(lines) diff --git a/scripts/evaluation/eval_framework/static/eval_web.css b/scripts/evaluation/eval_framework/static/eval_web.css new file mode 100644 index 0000000..fbb75ad --- /dev/null +++ b/scripts/evaluation/eval_framework/static/eval_web.css @@ -0,0 +1,91 @@ +:root { + --bg: #f5f3ed; + --panel: #fffdf8; + --ink: #1f2a24; + --muted: #6b756e; + --line: #ddd4c6; + --accent: #0f766e; + --exact: #0f766e; + --partial: #b7791f; + --irrelevant: #b42318; + } + body { margin: 0; font-family: "IBM Plex Sans", "Segoe UI", sans-serif; color: var(--ink); background: + radial-gradient(circle at top left, #f0e6d6 0, transparent 28%), + linear-gradient(180deg, #f9f6f0 0%, #f0ece3 100%); } + .app { display: grid; grid-template-columns: 280px 1fr; min-height: 100vh; } + .sidebar { border-right: 1px solid var(--line); padding: 20px; background: rgba(255,255,255,0.55); backdrop-filter: blur(10px); } + .main { padding: 24px; } + h1, h2 { margin: 0 0 12px; } + .muted { color: var(--muted); } + .query-list { max-height: 60vh; overflow: auto; border: 1px solid var(--line); background: var(--panel); border-radius: 14px; padding: 8px; } + .query-item { + display: block; width: 100%; border: 0; background: transparent; text-align: left; + padding: 10px 12px; border-radius: 10px; cursor: pointer; + color: var(--ink); font-size: 15px; font-weight: 500; + } + .query-item:hover { background: #eef6f4; 
} + .toolbar { display: flex; gap: 12px; flex-wrap: wrap; align-items: center; margin-bottom: 16px; } + input[type=text] { flex: 1 1 420px; padding: 12px 14px; border-radius: 14px; border: 1px solid var(--line); font-size: 15px; } + button { border: 0; background: var(--accent); color: white; padding: 12px 16px; border-radius: 14px; cursor: pointer; font-weight: 600; } + button.secondary { background: #d9e6e3; color: #12433d; } + .grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(170px, 1fr)); gap: 12px; margin-bottom: 16px; } + .metric { background: var(--panel); border: 1px solid var(--line); border-radius: 16px; padding: 14px; } + .metric .label { font-size: 12px; color: var(--muted); text-transform: uppercase; letter-spacing: 0.04em; } + .metric .value { font-size: 24px; font-weight: 700; margin-top: 4px; } + .results { display: grid; gap: 10px; } + .result { display: grid; grid-template-columns: 110px 100px 1fr; gap: 14px; align-items: center; background: var(--panel); border: 1px solid var(--line); border-radius: 18px; padding: 12px; } + .badge { display: inline-block; padding: 8px 10px; border-radius: 999px; color: white; font-weight: 700; text-align: center; } + .Exact { background: var(--exact); } + .Partial { background: var(--partial); } + .Irrelevant { background: var(--irrelevant); } + .Unknown { background: #637381; } + .thumb { width: 100px; height: 100px; object-fit: cover; border-radius: 14px; background: #e7e1d4; } + .title { font-size: 16px; font-weight: 700; margin-bottom: 8px; } + .options { color: var(--muted); line-height: 1.5; font-size: 14px; } + .section { margin-bottom: 28px; } + .history { font-size: 13px; line-height: 1.5; } + .history-list { max-height: 42vh; overflow: auto; display: flex; flex-direction: column; gap: 8px; margin-top: 8px; } + .history-item { + display: block; width: 100%; border: 1px solid var(--line); background: var(--panel); + text-align: left; padding: 10px 12px; border-radius: 12px; cursor: 
pointer; + color: var(--ink); font-size: 13px; transition: background 0.15s, border-color 0.15s, box-shadow 0.15s; + } + .history-item:hover { background: #eef6f4; border-color: #b8d4cd; } + .history-item:focus-visible { outline: 2px solid var(--accent); outline-offset: 2px; } + .history-item .hid { font-weight: 700; font-size: 12px; word-break: break-all; color: #12433d; } + .history-item .hmeta { color: var(--muted); font-size: 11px; margin-top: 4px; } + .history-item .hstats { margin-top: 6px; font-size: 12px; color: var(--ink); line-height: 1.45; } + .history-item .hstats span { color: var(--muted); } + .report-modal-root { + position: fixed; inset: 0; z-index: 200; display: none; align-items: center; justify-content: center; + padding: 16px; box-sizing: border-box; + } + .report-modal-root.is-open { display: flex; } + .report-modal-backdrop { position: absolute; inset: 0; background: rgba(31, 42, 36, 0.45); backdrop-filter: blur(4px); } + .report-modal-dialog { + position: relative; z-index: 1; width: min(920px, 100%); max-height: min(92vh, 900px); display: flex; flex-direction: column; + background: var(--panel); border: 1px solid var(--line); border-radius: 18px; + box-shadow: 0 24px 48px rgba(31, 42, 36, 0.18); + } + .report-modal-head { + flex: 0 0 auto; display: flex; align-items: flex-start; justify-content: space-between; gap: 12px; + padding: 16px 18px; border-bottom: 1px solid var(--line); + } + .report-modal-head h3 { margin: 0; font-size: 15px; font-weight: 700; word-break: break-all; } + .report-modal-head .head-actions { display: flex; gap: 8px; flex-shrink: 0; } + .report-modal-head button { padding: 8px 12px; font-size: 13px; border-radius: 10px; } + .report-modal-meta { flex: 0 0 auto; padding: 10px 18px; font-size: 12px; border-bottom: 1px solid var(--line); background: rgba(255,253,248,0.9); } + .report-modal-body { + flex: 1 1 auto; overflow: auto; padding: 18px 22px 22px; + font-size: 14px; line-height: 1.55; + } + .batch-report-md h1 { 
font-size: 1.35rem; margin: 0 0 0.75rem; color: #12433d; } + .batch-report-md h2 { font-size: 1.05rem; margin: 1.35rem 0 0.6rem; padding-bottom: 0.35rem; border-bottom: 1px solid var(--line); color: #1a5249; } + .batch-report-md h2:first-of-type { margin-top: 0; } + .batch-report-md h3 { font-size: 0.95rem; margin: 1rem 0 0.4rem; color: var(--ink); font-weight: 700; } + .batch-report-md ul { margin: 0.35rem 0 0.5rem; padding-left: 1.25rem; } + .batch-report-md li { margin: 0.2rem 0; } + .batch-report-md code { font-size: 0.88em; background: #e8e4d8; padding: 0.12em 0.35em; border-radius: 4px; } + .report-modal-body.report-modal-loading, .report-modal-body.report-modal-error { color: var(--muted); font-style: italic; } + .tips { background: var(--panel); border: 1px solid var(--line); border-radius: 16px; padding: 14px; line-height: 1.6; } + .tip { margin-bottom: 6px; color: var(--muted); } diff --git a/scripts/evaluation/eval_framework/static/eval_web.js b/scripts/evaluation/eval_framework/static/eval_web.js new file mode 100644 index 0000000..f4d1276 --- /dev/null +++ b/scripts/evaluation/eval_framework/static/eval_web.js @@ -0,0 +1,181 @@ + async function fetchJSON(url, options) { + const res = await fetch(url, options); + if (!res.ok) throw new Error(await res.text()); + return await res.json(); + } + function renderMetrics(metrics) { + const root = document.getElementById('metrics'); + root.innerHTML = ''; + Object.entries(metrics || {}).forEach(([key, value]) => { + const card = document.createElement('div'); + card.className = 'metric'; + card.innerHTML = `
${key}
${value}
`; + root.appendChild(card); + }); + } + function renderResults(results, rootId='results', showRank=true) { + const mount = document.getElementById(rootId); + mount.innerHTML = ''; + (results || []).forEach(item => { + const label = item.label || 'Unknown'; + const box = document.createElement('div'); + box.className = 'result'; + box.innerHTML = ` +
${label}
${showRank ? `#${item.rank || '-'}` : (item.rerank_score != null ? `rerank=${item.rerank_score.toFixed ? item.rerank_score.toFixed(4) : item.rerank_score}` : 'not recalled')}
+ +
+
${item.title || ''}
+
+
${(item.option_values || [])[0] || ''}
+
${(item.option_values || [])[1] || ''}
+
${(item.option_values || [])[2] || ''}
+
+
`; + mount.appendChild(box); + }); + if (!(results || []).length) { + mount.innerHTML = '
None.
'; + } + } + function renderTips(data) { + const root = document.getElementById('tips'); + const tips = [...(data.tips || [])]; + const stats = data.label_stats || {}; + tips.unshift(`Cached labels for query: ${stats.total || 0}. Recalled hits: ${stats.recalled_hits || 0}. Missed Exact: ${stats.missing_exact_count || 0}. Missed Partial: ${stats.missing_partial_count || 0}.`); + root.innerHTML = tips.map(text => `
${text}
`).join(''); + } + async function loadQueries() { + const data = await fetchJSON('/api/queries'); + const root = document.getElementById('queryList'); + root.innerHTML = ''; + data.queries.forEach(query => { + const btn = document.createElement('button'); + btn.className = 'query-item'; + btn.textContent = query; + btn.onclick = () => { + document.getElementById('queryInput').value = query; + runSingle(); + }; + root.appendChild(btn); + }); + } + function fmtMetric(m, key, digits) { + const v = m && m[key]; + if (v == null || Number.isNaN(Number(v))) return null; + const n = Number(v); + return n.toFixed(digits); + } + function historySummaryHtml(meta) { + const m = meta && meta.aggregate_metrics; + const nq = (meta && meta.queries && meta.queries.length) || (meta && meta.per_query && meta.per_query.length) || null; + const parts = []; + if (nq != null) parts.push(`Queries ${nq}`); + const p10 = fmtMetric(m, 'P@10', 3); + const p52 = fmtMetric(m, 'P@5_2_3', 3); + const map3 = fmtMetric(m, 'MAP_3', 3); + if (p10) parts.push(`P@10 ${p10}`); + if (p52) parts.push(`P@5_2_3 ${p52}`); + if (map3) parts.push(`MAP_3 ${map3}`); + if (!parts.length) return ''; + return `
${parts.join(' · ')}
`; + } + async function loadHistory() { + const data = await fetchJSON('/api/history'); + const root = document.getElementById('history'); + root.classList.remove('muted'); + const items = data.history || []; + if (!items.length) { + root.innerHTML = 'No history yet.'; + return; + } + root.innerHTML = `
`; + const list = root.querySelector('.history-list'); + items.forEach(item => { + const btn = document.createElement('button'); + btn.type = 'button'; + btn.className = 'history-item'; + btn.setAttribute('aria-label', `Open report ${item.batch_id}`); + const sum = historySummaryHtml(item.metadata); + btn.innerHTML = `
${item.batch_id}
+
${item.created_at} · tenant ${item.tenant_id}
${sum}`; + btn.onclick = () => openBatchReport(item.batch_id); + list.appendChild(btn); + }); + } + let _lastReportPath = ''; + function closeReportModal() { + const el = document.getElementById('reportModal'); + el.classList.remove('is-open'); + el.setAttribute('aria-hidden', 'true'); + document.getElementById('reportModalBody').innerHTML = ''; + document.getElementById('reportModalMeta').textContent = ''; + } + async function openBatchReport(batchId) { + const el = document.getElementById('reportModal'); + const body = document.getElementById('reportModalBody'); + const metaEl = document.getElementById('reportModalMeta'); + const titleEl = document.getElementById('reportModalTitle'); + el.classList.add('is-open'); + el.setAttribute('aria-hidden', 'false'); + titleEl.textContent = batchId; + metaEl.textContent = ''; + body.className = 'report-modal-body batch-report-md report-modal-loading'; + body.textContent = 'Loading report…'; + try { + const rep = await fetchJSON('/api/history/' + encodeURIComponent(batchId) + '/report'); + _lastReportPath = rep.report_markdown_path || ''; + metaEl.textContent = rep.report_markdown_path || ''; + const raw = marked.parse(rep.markdown || '', { gfm: true }); + const safe = DOMPurify.sanitize(raw, { USE_PROFILES: { html: true } }); + body.className = 'report-modal-body batch-report-md'; + body.innerHTML = safe; + } catch (e) { + body.className = 'report-modal-body report-modal-error'; + body.textContent = (e && e.message) ? 
e.message : String(e); + } + } + document.getElementById('reportModal').addEventListener('click', (ev) => { + if (ev.target && ev.target.getAttribute('data-close-report') === '1') closeReportModal(); + }); + document.addEventListener('keydown', (ev) => { + if (ev.key === 'Escape') closeReportModal(); + }); + document.getElementById('reportCopyPath').addEventListener('click', async () => { + if (!_lastReportPath) return; + try { + await navigator.clipboard.writeText(_lastReportPath); + } catch (_) {} + }); + async function runSingle() { + const query = document.getElementById('queryInput').value.trim(); + if (!query) return; + document.getElementById('status').textContent = `Evaluating "${query}"...`; + const data = await fetchJSON('/api/search-eval', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({query, top_k: 100, auto_annotate: false}) + }); + document.getElementById('status').textContent = `Done. total=${data.total}`; + renderMetrics(data.metrics); + renderResults(data.results, 'results', true); + renderResults(data.missing_relevant, 'missingRelevant', false); + renderTips(data); + loadHistory(); + } + async function runBatch() { + document.getElementById('status').textContent = 'Running batch evaluation...'; + const data = await fetchJSON('/api/batch-eval', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({top_k: 100, auto_annotate: false}) + }); + document.getElementById('status').textContent = `Batch done. report=${data.batch_id}`; + renderMetrics(data.aggregate_metrics); + renderResults([], 'results', true); + renderResults([], 'missingRelevant', false); + document.getElementById('tips').innerHTML = '
Batch evaluation uses cached labels only unless force refresh is requested via CLI/API.
'; + loadHistory(); + } + loadQueries(); + loadHistory(); + diff --git a/scripts/evaluation/eval_framework/static/index.html b/scripts/evaluation/eval_framework/static/index.html new file mode 100644 index 0000000..42273f2 --- /dev/null +++ b/scripts/evaluation/eval_framework/static/index.html @@ -0,0 +1,70 @@ + + + + + + Search Evaluation + + + + +
+ +
+

Search Evaluation

+

Single-query evaluation and batch evaluation share the same service on port 6010.

+
+ + + +
+
+
+

Metrics

+
+
+
+

Top Results

+
+
+
+

Missed Exact / Partial

+
+
+
+

Notes

+
+
+
+
+ + + + + + + + + \ No newline at end of file diff --git a/scripts/evaluation/eval_framework/store.py b/scripts/evaluation/eval_framework/store.py new file mode 100644 index 0000000..8c16787 --- /dev/null +++ b/scripts/evaluation/eval_framework/store.py @@ -0,0 +1,426 @@ +"""SQLite persistence for evaluation corpus, labels, rerank scores, and run metadata.""" + +from __future__ import annotations + +import json +import sqlite3 +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional, Sequence + +from .constants import VALID_LABELS +from .utils import ensure_dir, safe_json_dumps, utc_now_iso + + +@dataclass +class QueryBuildResult: + query: str + tenant_id: str + search_total: int + search_depth: int + rerank_corpus_size: int + annotated_count: int + output_json_path: Path + + +class EvalStore: + def __init__(self, db_path: Path): + self.db_path = db_path + ensure_dir(db_path.parent) + self.conn = sqlite3.connect(str(db_path), check_same_thread=False) + self.conn.row_factory = sqlite3.Row + self._init_schema() + + def _init_schema(self) -> None: + self.conn.executescript( + """ + CREATE TABLE IF NOT EXISTS corpus_docs ( + tenant_id TEXT NOT NULL, + spu_id TEXT NOT NULL, + title_json TEXT, + vendor_json TEXT, + category_path_json TEXT, + category_name_json TEXT, + image_url TEXT, + skus_json TEXT, + tags_json TEXT, + raw_json TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (tenant_id, spu_id) + ); + + CREATE TABLE IF NOT EXISTS rerank_scores ( + tenant_id TEXT NOT NULL, + query_text TEXT NOT NULL, + spu_id TEXT NOT NULL, + score REAL NOT NULL, + model_name TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (tenant_id, query_text, spu_id) + ); + + CREATE TABLE IF NOT EXISTS relevance_labels ( + tenant_id TEXT NOT NULL, + query_text TEXT NOT NULL, + spu_id TEXT NOT NULL, + label TEXT NOT NULL, + judge_model TEXT, + raw_response TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (tenant_id, query_text, spu_id) + ); + 
+ CREATE TABLE IF NOT EXISTS build_runs ( + run_id TEXT PRIMARY KEY, + tenant_id TEXT NOT NULL, + query_text TEXT NOT NULL, + output_json_path TEXT NOT NULL, + metadata_json TEXT NOT NULL, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS batch_runs ( + batch_id TEXT PRIMARY KEY, + tenant_id TEXT NOT NULL, + output_json_path TEXT NOT NULL, + report_markdown_path TEXT NOT NULL, + config_snapshot_path TEXT NOT NULL, + metadata_json TEXT NOT NULL, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS query_profiles ( + tenant_id TEXT NOT NULL, + query_text TEXT NOT NULL, + prompt_version TEXT NOT NULL, + judge_model TEXT, + profile_json TEXT NOT NULL, + raw_response TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (tenant_id, query_text, prompt_version) + ); + """ + ) + self.conn.commit() + + def upsert_corpus_docs(self, tenant_id: str, docs: Sequence[Dict[str, Any]]) -> None: + now = utc_now_iso() + rows = [] + for doc in docs: + rows.append( + ( + tenant_id, + str(doc.get("spu_id") or ""), + safe_json_dumps(doc.get("title")), + safe_json_dumps(doc.get("vendor")), + safe_json_dumps(doc.get("category_path")), + safe_json_dumps(doc.get("category_name")), + str(doc.get("image_url") or ""), + safe_json_dumps(doc.get("skus") or []), + safe_json_dumps(doc.get("tags") or []), + safe_json_dumps(doc), + now, + ) + ) + self.conn.executemany( + """ + INSERT INTO corpus_docs ( + tenant_id, spu_id, title_json, vendor_json, category_path_json, category_name_json, + image_url, skus_json, tags_json, raw_json, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(tenant_id, spu_id) DO UPDATE SET + title_json=excluded.title_json, + vendor_json=excluded.vendor_json, + category_path_json=excluded.category_path_json, + category_name_json=excluded.category_name_json, + image_url=excluded.image_url, + skus_json=excluded.skus_json, + tags_json=excluded.tags_json, + raw_json=excluded.raw_json, + updated_at=excluded.updated_at + """, + rows, + ) + self.conn.commit() + + def get_corpus_docs(self, tenant_id: str) -> List[Dict[str, Any]]: + rows = self.conn.execute( + "SELECT raw_json FROM corpus_docs WHERE tenant_id=? ORDER BY spu_id", + (tenant_id,), + ).fetchall() + return [json.loads(row["raw_json"]) for row in rows] + + def get_corpus_docs_by_spu_ids(self, tenant_id: str, spu_ids: Sequence[str]) -> Dict[str, Dict[str, Any]]: + keys = [str(spu_id) for spu_id in spu_ids if str(spu_id).strip()] + if not keys: + return {} + placeholders = ",".join("?" for _ in keys) + rows = self.conn.execute( + f""" + SELECT spu_id, raw_json + FROM corpus_docs + WHERE tenant_id=? AND spu_id IN ({placeholders}) + """, + [tenant_id, *keys], + ).fetchall() + return { + str(row["spu_id"]): json.loads(row["raw_json"]) + for row in rows + } + + def has_corpus(self, tenant_id: str) -> bool: + row = self.conn.execute( + "SELECT COUNT(1) AS n FROM corpus_docs WHERE tenant_id=?", + (tenant_id,), + ).fetchone() + return bool(row and row["n"] > 0) + + def get_rerank_scores(self, tenant_id: str, query_text: str) -> Dict[str, float]: + rows = self.conn.execute( + """ + SELECT spu_id, score + FROM rerank_scores + WHERE tenant_id=? AND query_text=? 
+ """, + (tenant_id, query_text), + ).fetchall() + return {str(row["spu_id"]): float(row["score"]) for row in rows} + + def upsert_rerank_scores( + self, + tenant_id: str, + query_text: str, + scores: Dict[str, float], + model_name: str, + ) -> None: + now = utc_now_iso() + rows = [ + (tenant_id, query_text, spu_id, float(score), model_name, now) + for spu_id, score in scores.items() + ] + self.conn.executemany( + """ + INSERT INTO rerank_scores (tenant_id, query_text, spu_id, score, model_name, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET + score=excluded.score, + model_name=excluded.model_name, + updated_at=excluded.updated_at + """, + rows, + ) + self.conn.commit() + + def get_labels(self, tenant_id: str, query_text: str) -> Dict[str, str]: + rows = self.conn.execute( + """ + SELECT spu_id, label + FROM relevance_labels + WHERE tenant_id=? AND query_text=? + """, + (tenant_id, query_text), + ).fetchall() + return {str(row["spu_id"]): str(row["label"]) for row in rows} + + def upsert_labels( + self, + tenant_id: str, + query_text: str, + labels: Dict[str, str], + judge_model: str, + raw_response: str, + ) -> None: + now = utc_now_iso() + rows = [] + for spu_id, label in labels.items(): + if label not in VALID_LABELS: + raise ValueError(f"invalid label: {label}") + rows.append((tenant_id, query_text, spu_id, label, judge_model, raw_response, now)) + self.conn.executemany( + """ + INSERT INTO relevance_labels (tenant_id, query_text, spu_id, label, judge_model, raw_response, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET + label=excluded.label, + judge_model=excluded.judge_model, + raw_response=excluded.raw_response, + updated_at=excluded.updated_at + """, + rows, + ) + self.conn.commit() + + def get_query_profile(self, tenant_id: str, query_text: str, prompt_version: str) -> Optional[Dict[str, Any]]: + row = self.conn.execute( + """ + SELECT profile_json + FROM query_profiles + WHERE tenant_id=? AND query_text=? AND prompt_version=? + """, + (tenant_id, query_text, prompt_version), + ).fetchone() + if not row: + return None + return json.loads(row["profile_json"]) + + def upsert_query_profile( + self, + tenant_id: str, + query_text: str, + prompt_version: str, + judge_model: str, + profile: Dict[str, Any], + raw_response: str, + ) -> None: + self.conn.execute( + """ + INSERT OR REPLACE INTO query_profiles + (tenant_id, query_text, prompt_version, judge_model, profile_json, raw_response, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + tenant_id, + query_text, + prompt_version, + judge_model, + safe_json_dumps(profile), + raw_response, + utc_now_iso(), + ), + ) + self.conn.commit() + + def insert_build_run(self, run_id: str, tenant_id: str, query_text: str, output_json_path: Path, metadata: Dict[str, Any]) -> None: + self.conn.execute( + """ + INSERT OR REPLACE INTO build_runs (run_id, tenant_id, query_text, output_json_path, metadata_json, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + (run_id, tenant_id, query_text, str(output_json_path), safe_json_dumps(metadata), utc_now_iso()), + ) + self.conn.commit() + + def insert_batch_run( + self, + batch_id: str, + tenant_id: str, + output_json_path: Path, + report_markdown_path: Path, + config_snapshot_path: Path, + metadata: Dict[str, Any], + ) -> None: + self.conn.execute( + """ + INSERT OR REPLACE INTO batch_runs + (batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + batch_id, + tenant_id, + str(output_json_path), + str(report_markdown_path), + str(config_snapshot_path), + safe_json_dumps(metadata), + utc_now_iso(), + ), + ) + self.conn.commit() + + def list_batch_runs(self, limit: int = 20) -> List[Dict[str, Any]]: + rows = self.conn.execute( + """ + SELECT batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at + FROM batch_runs + ORDER BY created_at DESC + LIMIT ? + """, + (limit,), + ).fetchall() + items: List[Dict[str, Any]] = [] + for row in rows: + items.append( + { + "batch_id": row["batch_id"], + "tenant_id": row["tenant_id"], + "output_json_path": row["output_json_path"], + "report_markdown_path": row["report_markdown_path"], + "config_snapshot_path": row["config_snapshot_path"], + "metadata": json.loads(row["metadata_json"]), + "created_at": row["created_at"], + } + ) + return items + + def get_batch_run(self, batch_id: str) -> Optional[Dict[str, Any]]: + row = self.conn.execute( + """ + SELECT batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at + FROM batch_runs + WHERE batch_id = ? + """, + (batch_id,), + ).fetchone() + if row is None: + return None + return { + "batch_id": row["batch_id"], + "tenant_id": row["tenant_id"], + "output_json_path": row["output_json_path"], + "report_markdown_path": row["report_markdown_path"], + "config_snapshot_path": row["config_snapshot_path"], + "metadata": json.loads(row["metadata_json"]), + "created_at": row["created_at"], + } + + def list_query_label_stats(self, tenant_id: str) -> List[Dict[str, Any]]: + rows = self.conn.execute( + """ + SELECT + query_text, + COUNT(*) AS total, + SUM(CASE WHEN label='Exact' THEN 1 ELSE 0 END) AS exact_count, + SUM(CASE WHEN label='Partial' THEN 1 ELSE 0 END) AS partial_count, + SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, + MAX(updated_at) AS updated_at + FROM relevance_labels + WHERE tenant_id=? 
+ GROUP BY query_text + ORDER BY query_text + """, + (tenant_id,), + ).fetchall() + return [ + { + "query": str(row["query_text"]), + "total": int(row["total"]), + "exact_count": int(row["exact_count"] or 0), + "partial_count": int(row["partial_count"] or 0), + "irrelevant_count": int(row["irrelevant_count"] or 0), + "updated_at": row["updated_at"], + } + for row in rows + ] + + def get_query_label_stats(self, tenant_id: str, query_text: str) -> Dict[str, Any]: + row = self.conn.execute( + """ + SELECT + COUNT(*) AS total, + SUM(CASE WHEN label='Exact' THEN 1 ELSE 0 END) AS exact_count, + SUM(CASE WHEN label='Partial' THEN 1 ELSE 0 END) AS partial_count, + SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, + MAX(updated_at) AS updated_at + FROM relevance_labels + WHERE tenant_id=? AND query_text=? + """, + (tenant_id, query_text), + ).fetchone() + return { + "query": query_text, + "total": int((row["total"] or 0) if row else 0), + "exact_count": int((row["exact_count"] or 0) if row else 0), + "partial_count": int((row["partial_count"] or 0) if row else 0), + "irrelevant_count": int((row["irrelevant_count"] or 0) if row else 0), + "updated_at": row["updated_at"] if row else None, + } diff --git a/scripts/evaluation/eval_framework/utils.py b/scripts/evaluation/eval_framework/utils.py new file mode 100644 index 0000000..0425097 --- /dev/null +++ b/scripts/evaluation/eval_framework/utils.py @@ -0,0 +1,145 @@ +"""Small helpers: time, JSON, document text, LLM output parsing.""" + +from __future__ import annotations + +import hashlib +import json +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Sequence, Tuple + +from .constants import PROJECT_ROOT + + +def utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def utc_timestamp() -> str: + return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + + +def ensure_dir(path: Path) -> Path: + 
path.mkdir(parents=True, exist_ok=True) + return path + + +def sha1_text(text: str) -> str: + return hashlib.sha1(text.encode("utf-8")).hexdigest() + + +def pick_text(value: Any, preferred_lang: str = "en") -> str: + if value is None: + return "" + if isinstance(value, dict): + return str( + value.get(preferred_lang) + or value.get("en") + or value.get("zh") + or next((v for v in value.values() if v), "") + ).strip() + return str(value).strip() + + +def safe_json_dumps(data: Any) -> str: + return json.dumps(data, ensure_ascii=False, separators=(",", ":")) + + +def compact_option_values(skus: Sequence[Dict[str, Any]]) -> Tuple[str, str, str]: + if not skus: + return "", "", "" + first = skus[0] or {} + return ( + str(first.get("option1_value") or "").strip(), + str(first.get("option2_value") or "").strip(), + str(first.get("option3_value") or "").strip(), + ) + + +def build_display_title(doc: Dict[str, Any]) -> str: + title = doc.get("title") + en = pick_text(title, "en") + zh = pick_text(title, "zh") + if en and zh and en != zh: + return f"{en} / {zh}" + return en or zh + + +def build_rerank_doc(doc: Dict[str, Any]) -> str: + title = build_display_title(doc) + return title[:400] + + +def build_label_doc_line(idx: int, doc: Dict[str, Any]) -> str: + title = build_display_title(doc) + option1, option2, option3 = compact_option_values(doc.get("skus") or []) + vendor = pick_text(doc.get("vendor"), "en") + category = pick_text(doc.get("category_path"), "en") or pick_text(doc.get("category_name"), "en") + tags = doc.get("tags") or [] + tags_text = ", ".join(str(tag) for tag in tags[:4] if tag) + parts = [title] + if option1: + parts.append(f"option1={option1}") + if option2: + parts.append(f"option2={option2}") + if option3: + parts.append(f"option3={option3}") + if vendor: + parts.append(f"vendor={vendor}") + if category: + parts.append(f"category={category}") + if tags_text: + parts.append(f"tags={tags_text}") + return f"{idx}. 
" + " | ".join(part for part in parts if part) + + +def compact_product_payload(doc: Dict[str, Any]) -> Dict[str, Any]: + return { + "spu_id": str(doc.get("spu_id") or ""), + "title": build_display_title(doc), + "image_url": doc.get("image_url"), + "vendor": pick_text(doc.get("vendor"), "en"), + "category": pick_text(doc.get("category_path"), "en") or pick_text(doc.get("category_name"), "en"), + "option_values": list(compact_option_values(doc.get("skus") or [])), + "tags": list((doc.get("tags") or [])[:6]), + } + + +def normalize_text(text: Any) -> str: + value = str(text or "").strip().lower() + value = re.sub(r"\s+", " ", value) + return value + + +def extract_json_blob(text: str) -> Any: + cleaned = str(text or "").strip() + candidates: List[str] = [cleaned] + fence_matches = re.findall(r"```(?:json)?\s*(.*?)```", cleaned, flags=re.S | re.I) + candidates.extend(match.strip() for match in fence_matches if match.strip()) + + for candidate in candidates: + try: + return json.loads(candidate) + except Exception: + pass + + starts = [idx for idx, ch in enumerate(cleaned) if ch in "[{"] + ends = [idx for idx, ch in enumerate(cleaned) if ch in "]}"] + for start in starts: + for end in reversed(ends): + if end <= start: + continue + fragment = cleaned[start : end + 1] + try: + return json.loads(fragment) + except Exception: + continue + raise ValueError(f"failed to parse json from: {cleaned[:500]!r}") + + +def ensure_project_on_path() -> None: + import sys + + if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) diff --git a/scripts/evaluation/eval_framework/web_app.py b/scripts/evaluation/eval_framework/web_app.py new file mode 100644 index 0000000..3a176f8 --- /dev/null +++ b/scripts/evaluation/eval_framework/web_app.py @@ -0,0 +1,85 @@ +"""FastAPI app for the search evaluation UI (static frontend + JSON APIs).""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict + +from fastapi import FastAPI, 
HTTPException +from fastapi.responses import HTMLResponse +from fastapi.staticfiles import StaticFiles + +from .api_models import BatchEvalRequest, SearchEvalRequest +from .constants import DEFAULT_QUERY_FILE +from .framework import SearchEvaluationFramework + +_STATIC_DIR = Path(__file__).resolve().parent / "static" + + +def create_web_app(framework: SearchEvaluationFramework, query_file: Path = DEFAULT_QUERY_FILE) -> FastAPI: + app = FastAPI(title="Search Evaluation UI", version="1.0.0") + + app.mount( + "/static", + StaticFiles(directory=str(_STATIC_DIR)), + name="static", + ) + + index_path = _STATIC_DIR / "index.html" + + @app.get("/", response_class=HTMLResponse) + def home() -> str: + return index_path.read_text(encoding="utf-8") + + @app.get("/api/queries") + def api_queries() -> Dict[str, Any]: + return {"queries": framework.queries_from_file(query_file)} + + @app.post("/api/search-eval") + def api_search_eval(request: SearchEvalRequest) -> Dict[str, Any]: + return framework.evaluate_live_query( + query=request.query, + top_k=request.top_k, + auto_annotate=request.auto_annotate, + language=request.language, + ) + + @app.post("/api/batch-eval") + def api_batch_eval(request: BatchEvalRequest) -> Dict[str, Any]: + queries = request.queries or framework.queries_from_file(query_file) + if not queries: + raise HTTPException(status_code=400, detail="No queries provided") + return framework.batch_evaluate( + queries=queries, + top_k=request.top_k, + auto_annotate=request.auto_annotate, + language=request.language, + force_refresh_labels=request.force_refresh_labels, + ) + + @app.get("/api/history") + def api_history() -> Dict[str, Any]: + return {"history": framework.store.list_batch_runs(limit=20)} + + @app.get("/api/history/{batch_id}/report") + def api_history_report(batch_id: str) -> Dict[str, Any]: + row = framework.store.get_batch_run(batch_id) + if row is None: + raise HTTPException(status_code=404, detail="Unknown batch_id") + report_path = 
Path(row["report_markdown_path"]).resolve() + root = framework.artifact_root.resolve() + try: + report_path.relative_to(root) + except ValueError: + raise HTTPException(status_code=403, detail="Report path is outside artifact root") + if not report_path.is_file(): + raise HTTPException(status_code=404, detail="Report file not found") + return { + "batch_id": row["batch_id"], + "created_at": row["created_at"], + "tenant_id": row["tenant_id"], + "report_markdown_path": str(report_path), + "markdown": report_path.read_text(encoding="utf-8"), + } + + return app -- libgit2 0.21.2