# Search Evaluation
# Single-query evaluation and batch evaluation share the same service on port 6010.
#!/usr/bin/env python3 """ Search evaluation framework for pooled relevance annotation, live metrics, and reports. """ from __future__ import annotations import argparse import hashlib import json import math import os import re import sqlite3 import sys import time from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple import requests from elasticsearch.helpers import scan from fastapi import FastAPI, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel, Field PROJECT_ROOT = Path(__file__).resolve().parents[2] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from api.app import get_app_config, get_es_client, get_query_parser, init_service from indexer.mapping_generator import get_tenant_index_name RELEVANCE_EXACT = "Exact" RELEVANCE_PARTIAL = "Partial" RELEVANCE_IRRELEVANT = "Irrelevant" VALID_LABELS = {RELEVANCE_EXACT, RELEVANCE_PARTIAL, RELEVANCE_IRRELEVANT} DEFAULT_ARTIFACT_ROOT = PROJECT_ROOT / "artifacts" / "search_evaluation" DEFAULT_QUERY_FILE = PROJECT_ROOT / "scripts" / "evaluation" / "queries" / "queries.txt" JUDGE_PROMPT_VERSION = "v2_structured_20260331" def utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def utc_timestamp() -> str: return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") def ensure_dir(path: Path) -> Path: path.mkdir(parents=True, exist_ok=True) return path def sha1_text(text: str) -> str: return hashlib.sha1(text.encode("utf-8")).hexdigest() def pick_text(value: Any, preferred_lang: str = "en") -> str: if value is None: return "" if isinstance(value, dict): return str( value.get(preferred_lang) or value.get("en") or value.get("zh") or next((v for v in value.values() if v), "") ).strip() return str(value).strip() def safe_json_dumps(data: Any) -> str: return json.dumps(data, ensure_ascii=False, separators=(",", ":")) def 
compact_option_values(skus: Sequence[Dict[str, Any]]) -> Tuple[str, str, str]: if not skus: return "", "", "" first = skus[0] or {} return ( str(first.get("option1_value") or "").strip(), str(first.get("option2_value") or "").strip(), str(first.get("option3_value") or "").strip(), ) def build_display_title(doc: Dict[str, Any]) -> str: title = doc.get("title") en = pick_text(title, "en") zh = pick_text(title, "zh") if en and zh and en != zh: return f"{en} / {zh}" return en or zh def build_rerank_doc(doc: Dict[str, Any]) -> str: title = build_display_title(doc) return title[:400] def build_label_doc_line(idx: int, doc: Dict[str, Any]) -> str: title = build_display_title(doc) option1, option2, option3 = compact_option_values(doc.get("skus") or []) vendor = pick_text(doc.get("vendor"), "en") category = pick_text(doc.get("category_path"), "en") or pick_text(doc.get("category_name"), "en") tags = doc.get("tags") or [] tags_text = ", ".join(str(tag) for tag in tags[:4] if tag) parts = [title] if option1: parts.append(f"option1={option1}") if option2: parts.append(f"option2={option2}") if option3: parts.append(f"option3={option3}") if vendor: parts.append(f"vendor={vendor}") if category: parts.append(f"category={category}") if tags_text: parts.append(f"tags={tags_text}") return f"{idx}. 
" + " | ".join(part for part in parts if part) def compact_product_payload(doc: Dict[str, Any]) -> Dict[str, Any]: return { "spu_id": str(doc.get("spu_id") or ""), "title": build_display_title(doc), "image_url": doc.get("image_url"), "vendor": pick_text(doc.get("vendor"), "en"), "category": pick_text(doc.get("category_path"), "en") or pick_text(doc.get("category_name"), "en"), "option_values": list(compact_option_values(doc.get("skus") or [])), "tags": list((doc.get("tags") or [])[:6]), } def normalize_text(text: Any) -> str: value = str(text or "").strip().lower() value = re.sub(r"\s+", " ", value) return value def _extract_json_blob(text: str) -> Any: cleaned = str(text or "").strip() candidates: List[str] = [cleaned] fence_matches = re.findall(r"```(?:json)?\s*(.*?)```", cleaned, flags=re.S | re.I) candidates.extend(match.strip() for match in fence_matches if match.strip()) for candidate in candidates: try: return json.loads(candidate) except Exception: pass starts = [idx for idx, ch in enumerate(cleaned) if ch in "[{"] ends = [idx for idx, ch in enumerate(cleaned) if ch in "]}"] for start in starts: for end in reversed(ends): if end <= start: continue fragment = cleaned[start : end + 1] try: return json.loads(fragment) except Exception: continue raise ValueError(f"failed to parse json from: {cleaned[:500]!r}") @dataclass class QueryBuildResult: query: str tenant_id: str search_total: int search_depth: int rerank_corpus_size: int annotated_count: int output_json_path: Path class EvalStore: def __init__(self, db_path: Path): self.db_path = db_path ensure_dir(db_path.parent) self.conn = sqlite3.connect(str(db_path), check_same_thread=False) self.conn.row_factory = sqlite3.Row self._init_schema() def _init_schema(self) -> None: self.conn.executescript( """ CREATE TABLE IF NOT EXISTS corpus_docs ( tenant_id TEXT NOT NULL, spu_id TEXT NOT NULL, title_json TEXT, vendor_json TEXT, category_path_json TEXT, category_name_json TEXT, image_url TEXT, skus_json TEXT, 
tags_json TEXT, raw_json TEXT NOT NULL, updated_at TEXT NOT NULL, PRIMARY KEY (tenant_id, spu_id) ); CREATE TABLE IF NOT EXISTS rerank_scores ( tenant_id TEXT NOT NULL, query_text TEXT NOT NULL, spu_id TEXT NOT NULL, score REAL NOT NULL, model_name TEXT, updated_at TEXT NOT NULL, PRIMARY KEY (tenant_id, query_text, spu_id) ); CREATE TABLE IF NOT EXISTS relevance_labels ( tenant_id TEXT NOT NULL, query_text TEXT NOT NULL, spu_id TEXT NOT NULL, label TEXT NOT NULL, judge_model TEXT, raw_response TEXT, updated_at TEXT NOT NULL, PRIMARY KEY (tenant_id, query_text, spu_id) ); CREATE TABLE IF NOT EXISTS build_runs ( run_id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, query_text TEXT NOT NULL, output_json_path TEXT NOT NULL, metadata_json TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS batch_runs ( batch_id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, output_json_path TEXT NOT NULL, report_markdown_path TEXT NOT NULL, config_snapshot_path TEXT NOT NULL, metadata_json TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS query_profiles ( tenant_id TEXT NOT NULL, query_text TEXT NOT NULL, prompt_version TEXT NOT NULL, judge_model TEXT, profile_json TEXT NOT NULL, raw_response TEXT NOT NULL, updated_at TEXT NOT NULL, PRIMARY KEY (tenant_id, query_text, prompt_version) ); """ ) self.conn.commit() def upsert_corpus_docs(self, tenant_id: str, docs: Sequence[Dict[str, Any]]) -> None: now = utc_now_iso() rows = [] for doc in docs: rows.append( ( tenant_id, str(doc.get("spu_id") or ""), safe_json_dumps(doc.get("title")), safe_json_dumps(doc.get("vendor")), safe_json_dumps(doc.get("category_path")), safe_json_dumps(doc.get("category_name")), str(doc.get("image_url") or ""), safe_json_dumps(doc.get("skus") or []), safe_json_dumps(doc.get("tags") or []), safe_json_dumps(doc), now, ) ) self.conn.executemany( """ INSERT INTO corpus_docs ( tenant_id, spu_id, title_json, vendor_json, category_path_json, category_name_json, image_url, skus_json, 
tags_json, raw_json, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(tenant_id, spu_id) DO UPDATE SET title_json=excluded.title_json, vendor_json=excluded.vendor_json, category_path_json=excluded.category_path_json, category_name_json=excluded.category_name_json, image_url=excluded.image_url, skus_json=excluded.skus_json, tags_json=excluded.tags_json, raw_json=excluded.raw_json, updated_at=excluded.updated_at """, rows, ) self.conn.commit() def get_corpus_docs(self, tenant_id: str) -> List[Dict[str, Any]]: rows = self.conn.execute( "SELECT raw_json FROM corpus_docs WHERE tenant_id=? ORDER BY spu_id", (tenant_id,), ).fetchall() return [json.loads(row["raw_json"]) for row in rows] def get_corpus_docs_by_spu_ids(self, tenant_id: str, spu_ids: Sequence[str]) -> Dict[str, Dict[str, Any]]: keys = [str(spu_id) for spu_id in spu_ids if str(spu_id).strip()] if not keys: return {} placeholders = ",".join("?" for _ in keys) rows = self.conn.execute( f""" SELECT spu_id, raw_json FROM corpus_docs WHERE tenant_id=? AND spu_id IN ({placeholders}) """, [tenant_id, *keys], ).fetchall() return { str(row["spu_id"]): json.loads(row["raw_json"]) for row in rows } def has_corpus(self, tenant_id: str) -> bool: row = self.conn.execute( "SELECT COUNT(1) AS n FROM corpus_docs WHERE tenant_id=?", (tenant_id,), ).fetchone() return bool(row and row["n"] > 0) def get_rerank_scores(self, tenant_id: str, query_text: str) -> Dict[str, float]: rows = self.conn.execute( """ SELECT spu_id, score FROM rerank_scores WHERE tenant_id=? AND query_text=? 
""", (tenant_id, query_text), ).fetchall() return {str(row["spu_id"]): float(row["score"]) for row in rows} def upsert_rerank_scores( self, tenant_id: str, query_text: str, scores: Dict[str, float], model_name: str, ) -> None: now = utc_now_iso() rows = [ (tenant_id, query_text, spu_id, float(score), model_name, now) for spu_id, score in scores.items() ] self.conn.executemany( """ INSERT INTO rerank_scores (tenant_id, query_text, spu_id, score, model_name, updated_at) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET score=excluded.score, model_name=excluded.model_name, updated_at=excluded.updated_at """, rows, ) self.conn.commit() def get_labels(self, tenant_id: str, query_text: str) -> Dict[str, str]: rows = self.conn.execute( """ SELECT spu_id, label FROM relevance_labels WHERE tenant_id=? AND query_text=? """, (tenant_id, query_text), ).fetchall() return {str(row["spu_id"]): str(row["label"]) for row in rows} def upsert_labels( self, tenant_id: str, query_text: str, labels: Dict[str, str], judge_model: str, raw_response: str, ) -> None: now = utc_now_iso() rows = [] for spu_id, label in labels.items(): if label not in VALID_LABELS: raise ValueError(f"invalid label: {label}") rows.append((tenant_id, query_text, spu_id, label, judge_model, raw_response, now)) self.conn.executemany( """ INSERT INTO relevance_labels (tenant_id, query_text, spu_id, label, judge_model, raw_response, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET label=excluded.label, judge_model=excluded.judge_model, raw_response=excluded.raw_response, updated_at=excluded.updated_at """, rows, ) self.conn.commit() def get_query_profile(self, tenant_id: str, query_text: str, prompt_version: str) -> Optional[Dict[str, Any]]: row = self.conn.execute( """ SELECT profile_json FROM query_profiles WHERE tenant_id=? AND query_text=? AND prompt_version=? 
""", (tenant_id, query_text, prompt_version), ).fetchone() if not row: return None return json.loads(row["profile_json"]) def upsert_query_profile( self, tenant_id: str, query_text: str, prompt_version: str, judge_model: str, profile: Dict[str, Any], raw_response: str, ) -> None: self.conn.execute( """ INSERT OR REPLACE INTO query_profiles (tenant_id, query_text, prompt_version, judge_model, profile_json, raw_response, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( tenant_id, query_text, prompt_version, judge_model, safe_json_dumps(profile), raw_response, utc_now_iso(), ), ) self.conn.commit() def insert_build_run(self, run_id: str, tenant_id: str, query_text: str, output_json_path: Path, metadata: Dict[str, Any]) -> None: self.conn.execute( """ INSERT OR REPLACE INTO build_runs (run_id, tenant_id, query_text, output_json_path, metadata_json, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (run_id, tenant_id, query_text, str(output_json_path), safe_json_dumps(metadata), utc_now_iso()), ) self.conn.commit() def insert_batch_run( self, batch_id: str, tenant_id: str, output_json_path: Path, report_markdown_path: Path, config_snapshot_path: Path, metadata: Dict[str, Any], ) -> None: self.conn.execute( """ INSERT OR REPLACE INTO batch_runs (batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( batch_id, tenant_id, str(output_json_path), str(report_markdown_path), str(config_snapshot_path), safe_json_dumps(metadata), utc_now_iso(), ), ) self.conn.commit() def list_batch_runs(self, limit: int = 20) -> List[Dict[str, Any]]: rows = self.conn.execute( """ SELECT batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at FROM batch_runs ORDER BY created_at DESC LIMIT ? 
""", (limit,), ).fetchall() items: List[Dict[str, Any]] = [] for row in rows: items.append( { "batch_id": row["batch_id"], "tenant_id": row["tenant_id"], "output_json_path": row["output_json_path"], "report_markdown_path": row["report_markdown_path"], "config_snapshot_path": row["config_snapshot_path"], "metadata": json.loads(row["metadata_json"]), "created_at": row["created_at"], } ) return items def list_query_label_stats(self, tenant_id: str) -> List[Dict[str, Any]]: rows = self.conn.execute( """ SELECT query_text, COUNT(*) AS total, SUM(CASE WHEN label='Exact' THEN 1 ELSE 0 END) AS exact_count, SUM(CASE WHEN label='Partial' THEN 1 ELSE 0 END) AS partial_count, SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, MAX(updated_at) AS updated_at FROM relevance_labels WHERE tenant_id=? GROUP BY query_text ORDER BY query_text """, (tenant_id,), ).fetchall() return [ { "query": str(row["query_text"]), "total": int(row["total"]), "exact_count": int(row["exact_count"] or 0), "partial_count": int(row["partial_count"] or 0), "irrelevant_count": int(row["irrelevant_count"] or 0), "updated_at": row["updated_at"], } for row in rows ] def get_query_label_stats(self, tenant_id: str, query_text: str) -> Dict[str, Any]: row = self.conn.execute( """ SELECT COUNT(*) AS total, SUM(CASE WHEN label='Exact' THEN 1 ELSE 0 END) AS exact_count, SUM(CASE WHEN label='Partial' THEN 1 ELSE 0 END) AS partial_count, SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, MAX(updated_at) AS updated_at FROM relevance_labels WHERE tenant_id=? AND query_text=? 
""", (tenant_id, query_text), ).fetchone() return { "query": query_text, "total": int((row["total"] or 0) if row else 0), "exact_count": int((row["exact_count"] or 0) if row else 0), "partial_count": int((row["partial_count"] or 0) if row else 0), "irrelevant_count": int((row["irrelevant_count"] or 0) if row else 0), "updated_at": row["updated_at"] if row else None, } class SearchServiceClient: def __init__(self, base_url: str, tenant_id: str): self.base_url = base_url.rstrip("/") self.tenant_id = str(tenant_id) self.session = requests.Session() def search(self, query: str, size: int, from_: int = 0, language: str = "en") -> Dict[str, Any]: response = self.session.post( f"{self.base_url}/search/", headers={"Content-Type": "application/json", "X-Tenant-ID": self.tenant_id}, json={"query": query, "size": size, "from": from_, "language": language}, timeout=120, ) response.raise_for_status() return response.json() class RerankServiceClient: def __init__(self, service_url: str): self.service_url = service_url.rstrip("/") self.session = requests.Session() def rerank(self, query: str, docs: Sequence[str], normalize: bool = False, top_n: Optional[int] = None) -> Tuple[List[float], Dict[str, Any]]: payload: Dict[str, Any] = { "query": query, "docs": list(docs), "normalize": normalize, } if top_n is not None: payload["top_n"] = int(top_n) response = self.session.post(self.service_url, json=payload, timeout=180) response.raise_for_status() data = response.json() return list(data.get("scores") or []), dict(data.get("meta") or {}) class DashScopeLabelClient: def __init__(self, model: str, base_url: str, api_key: str, batch_size: int = 40): self.model = model self.base_url = base_url.rstrip("/") self.api_key = api_key self.batch_size = int(batch_size) self.session = requests.Session() def _chat(self, prompt: str) -> Tuple[str, str]: response = self.session.post( f"{self.base_url}/chat/completions", headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": 
"application/json", }, json={ "model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0, "top_p": 0.1, }, timeout=180, ) response.raise_for_status() data = response.json() content = str(((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip() return content, safe_json_dumps(data) def extract_query_profile( self, query: str, parser_hints: Dict[str, Any], ) -> Tuple[Dict[str, Any], str]: prompt = ( "You are building a structured intent profile for e-commerce relevance judging.\n" "Use the original user query as the source of truth. Parser hints may help, but if a hint conflicts with the original query, trust the original query.\n" "Be conservative: only mark an attribute as required if the user explicitly asked for it.\n\n" "Return JSON with this schema:\n" "{\n" ' "normalized_query_en": string,\n' ' "primary_category": string,\n' ' "allowed_categories": [string],\n' ' "required_attributes": [\n' ' {"name": string, "required_terms": [string], "conflicting_terms": [string], "match_mode": "explicit"}\n' " ],\n" ' "notes": [string]\n' "}\n\n" "Guidelines:\n" "- Exact later will require explicit evidence for all required attributes.\n" "- allowed_categories should contain only near-synonyms of the same product type, not substitutes. For example dress can allow midi dress/cocktail dress, but not skirt, top, jumpsuit, or outfit unless the query explicitly asks for them.\n" "- If the query asks for dress/skirt/jeans/t-shirt, near but different product types are not Exact.\n" "- If the query includes color, fit, silhouette, or length, include them as required_attributes.\n" "- For fit words, include conflicting terms when obvious, e.g. 
fitted conflicts with oversized/loose; oversized conflicts with fitted/tight.\n" "- For color, include conflicting colors only when clear from the query.\n\n" f"Original query: {query}\n" f"Parser hints JSON: {json.dumps(parser_hints, ensure_ascii=False)}\n" ) content, raw_response = self._chat(prompt) payload = _extract_json_blob(content) if not isinstance(payload, dict): raise ValueError(f"unexpected query profile payload: {content!r}") payload.setdefault("normalized_query_en", query) payload.setdefault("primary_category", "") payload.setdefault("allowed_categories", []) payload.setdefault("required_attributes", []) payload.setdefault("notes", []) return payload, raw_response def classify_batch( self, query: str, query_profile: Dict[str, Any], docs: Sequence[Dict[str, Any]], ) -> Tuple[List[str], str]: numbered_docs = [build_label_doc_line(idx + 1, doc) for idx, doc in enumerate(docs)] prompt = ( "You are an e-commerce search relevance judge.\n" "Judge each product against the structured query profile below.\n\n" "Relevance rules:\n" "- Exact: product type matches the target intent, and every explicit required attribute is positively supported by the title/options/tags/category. 
If an attribute is missing or only guessed, it is NOT Exact.\n" "- Partial: main product type/use case matches, but some required attribute is missing, weaker, uncertain, or only approximately matched.\n" "- Irrelevant: product type/use case mismatched, or an explicit required attribute clearly conflicts.\n" "- Be conservative with Exact.\n" "- Graphic/holiday/message tees are not Exact for a plain color/style tee query unless that graphic/theme was requested.\n" "- Jumpsuit/romper/set is not Exact for dress/skirt/jeans queries.\n\n" f"Original query: {query}\n" f"Structured query profile JSON: {json.dumps(query_profile, ensure_ascii=False)}\n\n" "Products:\n" + "\n".join(numbered_docs) + "\n\nReturn JSON only, with schema:\n" '{"labels":[{"index":1,"label":"Exact","reason":"short phrase"}]}\n' ) content, raw_response = self._chat(prompt) payload = _extract_json_blob(content) if not isinstance(payload, dict) or not isinstance(payload.get("labels"), list): raise ValueError(f"unexpected label payload: {content!r}") labels_payload = payload["labels"] labels: List[str] = [] for item in labels_payload[: len(docs)]: if not isinstance(item, dict): continue label = str(item.get("label") or "").strip() if label in VALID_LABELS: labels.append(label) if len(labels) != len(docs) or any(label not in VALID_LABELS for label in labels): raise ValueError(f"unexpected label output: {content!r}") return labels, raw_response def precision_at_k(labels: Sequence[str], k: int, relevant: Sequence[str]) -> float: if k <= 0: return 0.0 sliced = list(labels[:k]) if not sliced: return 0.0 hits = sum(1 for label in sliced if label in relevant) return hits / float(min(k, len(sliced))) def average_precision(labels: Sequence[str], relevant: Sequence[str]) -> float: hit_count = 0 precision_sum = 0.0 for idx, label in enumerate(labels, start=1): if label not in relevant: continue hit_count += 1 precision_sum += hit_count / idx if hit_count == 0: return 0.0 return precision_sum / hit_count def 
compute_query_metrics(labels: Sequence[str]) -> Dict[str, float]: metrics: Dict[str, float] = {} for k in (5, 10, 20, 50): metrics[f"P@{k}"] = round(precision_at_k(labels, k, [RELEVANCE_EXACT]), 6) metrics[f"P@{k}_2_3"] = round(precision_at_k(labels, k, [RELEVANCE_EXACT, RELEVANCE_PARTIAL]), 6) metrics["MAP_3"] = round(average_precision(labels, [RELEVANCE_EXACT]), 6) metrics["MAP_2_3"] = round(average_precision(labels, [RELEVANCE_EXACT, RELEVANCE_PARTIAL]), 6) return metrics def aggregate_metrics(metric_items: Sequence[Dict[str, float]]) -> Dict[str, float]: if not metric_items: return {} keys = sorted(metric_items[0].keys()) return { key: round(sum(float(item.get(key, 0.0)) for item in metric_items) / len(metric_items), 6) for key in keys } def label_distribution(labels: Sequence[str]) -> Dict[str, int]: return { RELEVANCE_EXACT: sum(1 for label in labels if label == RELEVANCE_EXACT), RELEVANCE_PARTIAL: sum(1 for label in labels if label == RELEVANCE_PARTIAL), RELEVANCE_IRRELEVANT: sum(1 for label in labels if label == RELEVANCE_IRRELEVANT), } class SearchEvaluationFramework: def __init__( self, tenant_id: str, artifact_root: Path = DEFAULT_ARTIFACT_ROOT, search_base_url: str = "http://localhost:6002", ): init_service(get_app_config().infrastructure.elasticsearch.host) self.tenant_id = str(tenant_id) self.artifact_root = ensure_dir(artifact_root) self.store = EvalStore(self.artifact_root / "search_eval.sqlite3") self.search_client = SearchServiceClient(search_base_url, self.tenant_id) app_cfg = get_app_config() rerank_service_url = str( app_cfg.services.rerank.providers["http"]["instances"]["default"]["service_url"] ) self.rerank_client = RerankServiceClient(rerank_service_url) llm_cfg = app_cfg.services.translation.capabilities["llm"] api_key = app_cfg.infrastructure.secrets.dashscope_api_key if not api_key: raise RuntimeError("dashscope_api_key is required for search evaluation annotation") self.label_client = DashScopeLabelClient( model=str(llm_cfg["model"]), 
base_url=str(llm_cfg["base_url"]), api_key=str(api_key), ) self.query_parser = get_query_parser() def build_query_parser_hints(self, query: str) -> Dict[str, Any]: parsed = self.query_parser.parse(query, generate_vector=False, target_languages=["en", "zh"]) payload = parsed.to_dict() payload["text_for_rerank"] = parsed.text_for_rerank() return payload def get_query_profile(self, query: str, force_refresh: bool = False) -> Dict[str, Any]: if not force_refresh: cached = self.store.get_query_profile(self.tenant_id, query, JUDGE_PROMPT_VERSION) if cached is not None: return cached parser_hints = self.build_query_parser_hints(query) profile, raw_response = self.label_client.extract_query_profile(query, parser_hints) profile["parser_hints"] = parser_hints self.store.upsert_query_profile( self.tenant_id, query, JUDGE_PROMPT_VERSION, self.label_client.model, profile, raw_response, ) return profile @staticmethod def _doc_evidence_text(doc: Dict[str, Any]) -> str: pieces: List[str] = [ build_display_title(doc), pick_text(doc.get("vendor"), "en"), pick_text(doc.get("category_path"), "en"), pick_text(doc.get("category_name"), "en"), ] for sku in doc.get("skus") or []: pieces.extend( [ str(sku.get("option1_value") or ""), str(sku.get("option2_value") or ""), str(sku.get("option3_value") or ""), ] ) for tag in doc.get("tags") or []: pieces.append(str(tag)) return normalize_text(" | ".join(piece for piece in pieces if piece)) def _apply_rule_based_label_guardrails( self, label: str, query_profile: Dict[str, Any], doc: Dict[str, Any], ) -> str: if label not in VALID_LABELS: return label evidence = self._doc_evidence_text(doc) category = normalize_text(query_profile.get("primary_category")) allowed_categories = [normalize_text(item) for item in query_profile.get("allowed_categories") or [] if str(item).strip()] primary_category_match = True if category: primary_category_match = category in evidence allowed_category_match = True if allowed_categories: allowed_category_match = 
any(signal in evidence for signal in allowed_categories) if label == RELEVANCE_EXACT and not primary_category_match: if allowed_category_match: label = RELEVANCE_PARTIAL else: return RELEVANCE_IRRELEVANT for attr in query_profile.get("required_attributes") or []: if not isinstance(attr, dict): continue attr_name = normalize_text(attr.get("name")) if attr_name not in {"color", "fit", "length", "type", "product_type", "material", "size", "gender", "style", "waist_style", "rise"}: continue required_terms = [normalize_text(item) for item in attr.get("required_terms") or [] if normalize_text(item)] conflicting_terms = [normalize_text(item) for item in attr.get("conflicting_terms") or [] if normalize_text(item)] if attr_name == "fit": if any(term in {"oversized", "oversize"} for term in required_terms): conflicting_terms.extend(["slim", "slimming", "fitted", "tight", "close-fitting"]) if any(term in {"fitted", "slim fit", "tight"} for term in required_terms): conflicting_terms.extend(["oversized", "oversize", "loose", "relaxed"]) has_required = any(term in evidence for term in required_terms) if required_terms else True has_conflict = any(term in evidence for term in conflicting_terms) if has_conflict: return RELEVANCE_IRRELEVANT if label == RELEVANCE_EXACT and not has_required: label = RELEVANCE_PARTIAL if label == RELEVANCE_PARTIAL and not primary_category_match and not allowed_category_match: return RELEVANCE_IRRELEVANT return label @staticmethod def _result_item_to_doc(item: Dict[str, Any]) -> Dict[str, Any]: option_values = list(item.get("option_values") or []) while len(option_values) < 3: option_values.append("") product = dict(item.get("product") or {}) return { "spu_id": item.get("spu_id"), "title": product.get("title") or item.get("title"), "vendor": product.get("vendor"), "category_path": product.get("category"), "category_name": product.get("category"), "image_url": item.get("image_url") or product.get("image_url"), "tags": product.get("tags") or [], "skus": 
[ { "option1_value": option_values[0], "option2_value": option_values[1], "option3_value": option_values[2], } ], } def _collect_label_issues( self, label: str, query_profile: Dict[str, Any], doc: Dict[str, Any], ) -> List[str]: evidence = self._doc_evidence_text(doc) issues: List[str] = [] category = normalize_text(query_profile.get("primary_category")) allowed_categories = [ normalize_text(item) for item in query_profile.get("allowed_categories") or [] if str(item).strip() ] primary_category_match = True if not category else category in evidence allowed_category_match = False if allowed_categories else primary_category_match if allowed_categories: allowed_category_match = any(signal in evidence for signal in allowed_categories) if label == RELEVANCE_EXACT and not primary_category_match: if allowed_category_match: issues.append("Exact missing primary category evidence") else: issues.append("Exact has category mismatch") if label == RELEVANCE_PARTIAL and not primary_category_match and not allowed_category_match: issues.append("Partial has category mismatch") for attr in query_profile.get("required_attributes") or []: if not isinstance(attr, dict): continue attr_name = normalize_text(attr.get("name")) if attr_name not in {"color", "fit", "length", "type", "product_type", "material", "size", "gender", "style"}: continue required_terms = [normalize_text(item) for item in attr.get("required_terms") or [] if normalize_text(item)] conflicting_terms = [normalize_text(item) for item in attr.get("conflicting_terms") or [] if normalize_text(item)] has_required = any(term in evidence for term in required_terms) if required_terms else True has_conflict = any(term in evidence for term in conflicting_terms) if has_conflict and label != RELEVANCE_IRRELEVANT: issues.append(f"{label} conflicts on {attr_name}") if label == RELEVANCE_EXACT and not has_required: issues.append(f"Exact missing {attr_name}") return issues def audit_live_query( self, query: str, *, top_k: int = 100, 
        language: str = "en",
        auto_annotate: bool = True,
    ) -> Dict[str, Any]:
        # Tail of a diagnostic method whose `def` line sits above this chunk:
        # it runs a live evaluation, then cross-checks every stored label
        # against the rule-based guardrails and collects the disagreements.
        live = self.evaluate_live_query(query=query, top_k=top_k, auto_annotate=auto_annotate, language=language)
        query_profile = self.get_query_profile(query, force_refresh=False)
        suspicious: List[Dict[str, Any]] = []
        for item in live["results"]:
            doc = self._result_item_to_doc(item)
            # `or ""` guards against unlabeled items (label is None).
            issues = self._collect_label_issues(item["label"] or "", query_profile, doc)
            suggested_label = self._apply_rule_based_label_guardrails(item["label"] or "", query_profile, doc)
            if suggested_label != (item["label"] or ""):
                # Copy before appending so the helper's return value is not mutated.
                issues = list(issues) + [f"Suggested relabel: {item['label']} -> {suggested_label}"]
            if issues:
                suspicious.append(
                    {
                        "rank": item["rank"],
                        "spu_id": item["spu_id"],
                        "title": item["title"],
                        "label": item["label"],
                        "suggested_label": suggested_label,
                        "issues": issues,
                    }
                )
        # Anything outside the three valid labels is counted as Irrelevant.
        labels = [
            item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT
            for item in live["results"]
        ]
        return {
            "query": query,
            "tenant_id": self.tenant_id,
            "top_k": top_k,
            "metrics": live["metrics"],
            "distribution": label_distribution(labels),
            "query_profile": query_profile,
            "suspicious": suspicious,
            "results": live["results"],
        }

    def queries_from_file(self, path: Path) -> List[str]:
        """Load evaluation queries from a text file, skipping blanks and `#` comment lines."""
        return [
            line.strip()
            for line in path.read_text(encoding="utf-8").splitlines()
            if line.strip() and not line.strip().startswith("#")
        ]

    def corpus_docs(self, refresh: bool = False) -> List[Dict[str, Any]]:
        """Return the tenant's full product corpus, scanning Elasticsearch on cache miss.

        The scan result is persisted via ``self.store`` so later calls are served
        from the local cache unless ``refresh`` is True.
        """
        if not refresh and self.store.has_corpus(self.tenant_id):
            return self.store.get_corpus_docs(self.tenant_id)
        es_client = get_es_client().client
        index_name = get_tenant_index_name(self.tenant_id)
        docs: List[Dict[str, Any]] = []
        for hit in scan(
            client=es_client,
            index=index_name,
            query={
                # Fetch only the fields the evaluation pipeline consumes.
                "_source": [
                    "spu_id",
                    "title",
                    "vendor",
                    "category_path",
                    "category_name",
                    "image_url",
                    "skus",
                    "tags",
                ],
                "query": {"match_all": {}},
            },
            size=500,
            preserve_order=False,
            clear_scroll=True,
        ):
            source = dict(hit.get("_source") or {})
            # Fall back to the ES document _id when the source carries no spu_id.
            source["spu_id"] = str(source.get("spu_id") or hit.get("_id") or "")
            docs.append(source)
        self.store.upsert_corpus_docs(self.tenant_id, docs)
        return docs

    def full_corpus_rerank(
        self,
        query: str,
        docs: Sequence[Dict[str, Any]],
        batch_size: int = 24,
        force_refresh: bool = False,
    ) -> List[Dict[str, Any]]:
        """Score every doc against ``query`` with the reranker and sort descending.

        Scores are cached per (tenant, query) in the store; only docs without a
        cached score are sent to the rerank service. Docs that still have no
        score sort last (score ``-inf``).
        """
        cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query)
        pending: List[Dict[str, Any]] = [doc for doc in docs if str(doc.get("spu_id")) not in cached]
        if pending:
            new_scores: Dict[str, float] = {}
            for start in range(0, len(pending), batch_size):
                batch = pending[start : start + batch_size]
                scores = self._rerank_batch_with_retry(query=query, docs=batch)
                # The retry helper must return exactly one score per doc.
                if len(scores) != len(batch):
                    raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs")
                for doc, score in zip(batch, scores):
                    new_scores[str(doc.get("spu_id"))] = float(score)
            self.store.upsert_rerank_scores(
                self.tenant_id,
                query,
                new_scores,
                model_name="qwen3_vllm_score",
            )
            cached.update(new_scores)
        ranked = []
        for doc in docs:
            spu_id = str(doc.get("spu_id"))
            ranked.append({"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc})
        ranked.sort(key=lambda item: item["score"], reverse=True)
        return ranked

    def _rerank_batch_with_retry(self, query: str, docs: Sequence[Dict[str, Any]]) -> List[float]:
        """Rerank ``docs``, bisecting the batch on failure to isolate bad docs.

        A single doc that still fails gets the sentinel score ``-1.0`` instead
        of raising, so one bad document cannot abort a full-corpus rerank.
        """
        if not docs:
            return []
        doc_texts = [build_rerank_doc(doc) for doc in docs]
        try:
            scores, _meta = self.rerank_client.rerank(query=query, docs=doc_texts, normalize=False)
            return scores
        except Exception:
            if len(docs) == 1:
                # Best-effort: mark the lone failing doc rather than propagate.
                return [-1.0]
            if len(docs) <= 6:
                # Small batch: retry one doc at a time.
                scores: List[float] = []
                for doc in docs:
                    scores.extend(self._rerank_batch_with_retry(query, [doc]))
                return scores
            # Large batch: split in half and retry each side.
            mid = len(docs) // 2
            left = self._rerank_batch_with_retry(query, docs[:mid])
            right = self._rerank_batch_with_retry(query, docs[mid:])
            return left + right

    def annotate_missing_labels(
        self,
        query: str,
        docs: Sequence[Dict[str, Any]],
        force_refresh: bool = False,
    ) -> Dict[str, str]:
        """Ensure every doc has a relevance label, calling the LLM judge for gaps.

        Returns the merged ``spu_id -> label`` mapping (cached plus newly judged).
        Judge outputs are post-processed by the rule-based guardrails before
        being persisted.
        """
        query_profile = self.get_query_profile(query, force_refresh=force_refresh)
        labels = {} if force_refresh else self.store.get_labels(self.tenant_id, query)
        missing_docs = [doc for doc in docs if str(doc.get("spu_id")) not in labels]
        if not missing_docs:
            return labels
        for start in range(0, len(missing_docs), self.label_client.batch_size):
            batch = missing_docs[start : start + self.label_client.batch_size]
            # May come back as several (labels, raw_response, sub_batch) tuples
            # if the classify call had to split the batch on failure.
            batch_pairs = self._classify_with_retry(query, query_profile, batch)
            for sub_labels, raw_response, sub_batch in batch_pairs:
                to_store = {
                    str(doc.get("spu_id")): self._apply_rule_based_label_guardrails(label, query_profile, doc)
                    for doc, label in zip(sub_batch, sub_labels)
                }
                self.store.upsert_labels(
                    self.tenant_id,
                    query,
                    to_store,
                    judge_model=self.label_client.model,
                    raw_response=raw_response,
                )
                labels.update(to_store)
            # Brief pause between judge batches — presumably to throttle the
            # labeling service; TODO confirm against its rate limits.
            time.sleep(0.1)
        return labels

    def _classify_with_retry(
        self,
        query: str,
        query_profile: Dict[str, Any],
        docs: Sequence[Dict[str, Any]],
    ) -> List[Tuple[List[str], str, Sequence[Dict[str, Any]]]]:
        """Classify a batch, recursively bisecting on failure.

        Returns (labels, raw_response, sub_batch) tuples so the caller can
        persist each successful sub-batch with its raw judge output. A single
        failing doc re-raises — unlike the rerank retry, there is no sentinel.
        """
        if not docs:
            return []
        try:
            labels, raw_response = self.label_client.classify_batch(query, query_profile, docs)
            return [(labels, raw_response, docs)]
        except Exception:
            if len(docs) == 1:
                raise
            mid = len(docs) // 2
            return self._classify_with_retry(query, query_profile, docs[:mid]) + self._classify_with_retry(query, query_profile, docs[mid:])

    def build_query_annotation_set(
        self,
        query: str,
        *,
        search_depth: int = 1000,
        rerank_depth: int = 10000,
        annotate_search_top_k: int = 120,
        annotate_rerank_top_k: int = 200,
        language: str = "en",
        force_refresh_rerank: bool = False,
        force_refresh_labels: bool = False,
    ) -> QueryBuildResult:
        """Build a pooled annotation set for ``query`` and write it as a JSON artifact.

        The pool is the union of the top search results and the top full-corpus
        rerank results; everything in the pool gets a relevance label.
        """
        search_payload = self.search_client.search(query=query, size=search_depth, from_=0, language=language)
        search_results = list(search_payload.get("results") or [])
        corpus = self.corpus_docs(refresh=False)
        full_rerank = self.full_corpus_rerank(
            query=query,
            docs=corpus,
            force_refresh=force_refresh_rerank,
        )
        # Clamp the requested depth to what the rerank actually produced.
        rerank_depth_effective = min(rerank_depth,
            len(full_rerank))
        # Annotation pool = union of top search hits and top rerank hits,
        # keyed by spu_id so duplicates are labeled once.
        pool_docs: Dict[str, Dict[str, Any]] = {}
        for doc in search_results[:annotate_search_top_k]:
            pool_docs[str(doc.get("spu_id"))] = doc
        for item in full_rerank[:annotate_rerank_top_k]:
            pool_docs[str(item["spu_id"])] = item["doc"]
        query_profile = self.get_query_profile(query, force_refresh=force_refresh_labels)
        labels = self.annotate_missing_labels(
            query=query,
            docs=list(pool_docs.values()),
            force_refresh=force_refresh_labels,
        )
        # Search results in retrieval order; rerank_score is None here because
        # these rows come from the search service, not the reranker.
        search_labeled_results: List[Dict[str, Any]] = []
        for rank, doc in enumerate(search_results, start=1):
            spu_id = str(doc.get("spu_id"))
            label = labels.get(spu_id)
            search_labeled_results.append(
                {
                    "rank": rank,
                    "spu_id": spu_id,
                    "title": build_display_title(doc),
                    "image_url": doc.get("image_url"),
                    "rerank_score": None,
                    "label": label,
                    "option_values": list(compact_option_values(doc.get("skus") or [])),
                    "product": compact_product_payload(doc),
                }
            )
        # Top of the full-corpus rerank, with the cached model score attached.
        rerank_top_results: List[Dict[str, Any]] = []
        for rank, item in enumerate(full_rerank[:rerank_depth_effective], start=1):
            doc = item["doc"]
            spu_id = str(item["spu_id"])
            rerank_top_results.append(
                {
                    "rank": rank,
                    "spu_id": spu_id,
                    "title": build_display_title(doc),
                    "image_url": doc.get("image_url"),
                    "rerank_score": round(float(item["score"]), 8),
                    "label": labels.get(spu_id),
                    "option_values": list(compact_option_values(doc.get("skus") or [])),
                    "product": compact_product_payload(doc),
                }
            )
        # Metrics are computed over the top 100 search positions only;
        # missing/unknown labels count as Irrelevant.
        top100_labels = [
            item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT
            for item in search_labeled_results[:100]
        ]
        metrics = compute_query_metrics(top100_labels)
        output_dir = ensure_dir(self.artifact_root / "query_builds")
        run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}"
        output_json_path = output_dir / f"{run_id}.json"
        payload = {
            "run_id": run_id,
            "created_at": utc_now_iso(),
            "tenant_id": self.tenant_id,
            "query": query,
            # Config snapshot of the search service. NOTE(review): hard-coded
            # localhost URL; a failure here raises after annotation work is done.
            "config_meta": requests.get("http://localhost:6002/admin/config/meta", timeout=20).json(),
            "search_total": int(search_payload.get("total") or 0),
            "search_depth_requested": search_depth,
            "search_depth_effective": len(search_results),
            "rerank_depth_requested": rerank_depth,
            "rerank_depth_effective": rerank_depth_effective,
            "corpus_size": len(corpus),
            "annotation_pool": {
                "annotate_search_top_k": annotate_search_top_k,
                "annotate_rerank_top_k": annotate_rerank_top_k,
                "pool_size": len(pool_docs),
            },
            "query_profile": query_profile,
            "metrics_top100": metrics,
            "search_results": search_labeled_results,
            "full_rerank_top": rerank_top_results,
        }
        output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"])
        return QueryBuildResult(
            query=query,
            tenant_id=self.tenant_id,
            search_total=int(search_payload.get("total") or 0),
            search_depth=len(search_results),
            rerank_corpus_size=len(corpus),
            annotated_count=len(pool_docs),
            output_json_path=output_json_path,
        )

    def evaluate_live_query(
        self,
        query: str,
        top_k: int = 100,
        auto_annotate: bool = True,
        language: str = "en",
        force_refresh_labels: bool = False,
    ) -> Dict[str, Any]:
        """Run a live search for ``query`` and return labeled top-k results plus metrics."""
        # Always fetch at least 100 results so top-100 metrics stay comparable.
        search_payload = self.search_client.search(query=query, size=max(top_k, 100), from_=0, language=language)
        results = list(search_payload.get("results") or [])
        if auto_annotate:
            self.annotate_missing_labels(query=query, docs=results[:top_k], force_refresh=force_refresh_labels)
        labels = self.store.get_labels(self.tenant_id, query)
        labeled = []
        for rank, doc in enumerate(results[:top_k], start=1):
            spu_id = str(doc.get("spu_id"))
            labeled.append(
                {
                    "rank": rank,
                    "spu_id": spu_id,
                    "title": build_display_title(doc),
                    "image_url": doc.get("image_url"),
                    "label": labels.get(spu_id),
                    "option_values": list(compact_option_values(doc.get("skus") or [])),
                    "product": compact_product_payload(doc),
                }
            )
        # Unknown/missing labels count as Irrelevant for metric purposes.
        metric_labels = [
            item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT
            for item in labeled
        ]
        return {
            "query": query,
            "tenant_id": self.tenant_id,
            "top_k": top_k,
"metrics": compute_query_metrics(metric_labels), "results": labeled, "total": int(search_payload.get("total") or 0), } def batch_evaluate( self, queries: Sequence[str], *, top_k: int = 100, auto_annotate: bool = True, language: str = "en", force_refresh_labels: bool = False, ) -> Dict[str, Any]: per_query = [] for query in queries: live = self.evaluate_live_query( query, top_k=top_k, auto_annotate=auto_annotate, language=language, force_refresh_labels=force_refresh_labels, ) labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in live["results"] ] per_query.append( { "query": live["query"], "tenant_id": live["tenant_id"], "top_k": live["top_k"], "metrics": live["metrics"], "distribution": label_distribution(labels), "total": live["total"], } ) aggregate = aggregate_metrics([item["metrics"] for item in per_query]) aggregate_distribution = { RELEVANCE_EXACT: sum(item["distribution"][RELEVANCE_EXACT] for item in per_query), RELEVANCE_PARTIAL: sum(item["distribution"][RELEVANCE_PARTIAL] for item in per_query), RELEVANCE_IRRELEVANT: sum(item["distribution"][RELEVANCE_IRRELEVANT] for item in per_query), } batch_id = f"batch_{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + '|'.join(queries))[:10]}" report_dir = ensure_dir(self.artifact_root / "batch_reports") config_snapshot_path = report_dir / f"{batch_id}_config.json" config_snapshot = requests.get("http://localhost:6002/admin/config", timeout=20).json() config_snapshot_path.write_text(json.dumps(config_snapshot, ensure_ascii=False, indent=2), encoding="utf-8") output_json_path = report_dir / f"{batch_id}.json" report_md_path = report_dir / f"{batch_id}.md" payload = { "batch_id": batch_id, "created_at": utc_now_iso(), "tenant_id": self.tenant_id, "queries": list(queries), "top_k": top_k, "aggregate_metrics": aggregate, "aggregate_distribution": aggregate_distribution, "per_query": per_query, "config_snapshot_path": str(config_snapshot_path), } 
        output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        report_md_path.write_text(render_batch_report_markdown(payload), encoding="utf-8")
        self.store.insert_batch_run(batch_id, self.tenant_id, output_json_path, report_md_path, config_snapshot_path, payload)
        return payload


def render_batch_report_markdown(payload: Dict[str, Any]) -> str:
    """Render a batch-evaluation payload as a human-readable Markdown report."""
    lines = [
        "# Search Batch Evaluation",
        "",
        f"- Batch ID: {payload['batch_id']}",
        f"- Created at: {payload['created_at']}",
        f"- Tenant ID: {payload['tenant_id']}",
        f"- Query count: {len(payload['queries'])}",
        f"- Top K: {payload['top_k']}",
        "",
        "## Aggregate Metrics",
        "",
    ]
    # Sorted so the metric ordering is stable across runs.
    for key, value in sorted((payload.get("aggregate_metrics") or {}).items()):
        lines.append(f"- {key}: {value}")
    distribution = payload.get("aggregate_distribution") or {}
    if distribution:
        lines.extend(
            [
                "",
                "## Label Distribution",
                "",
                f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}",
                f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}",
                f"- Irrelevant: {distribution.get(RELEVANCE_IRRELEVANT, 0)}",
            ]
        )
    lines.extend(["", "## Per Query", ""])
    for item in payload.get("per_query") or []:
        lines.append(f"### {item['query']}")
        lines.append("")
        for key, value in sorted((item.get("metrics") or {}).items()):
            lines.append(f"- {key}: {value}")
        # Rebinds `distribution` to the per-query counts from here on.
        distribution = item.get("distribution") or {}
        lines.append(f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}")
        lines.append(f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}")
        lines.append(f"- Irrelevant: {distribution.get(RELEVANCE_IRRELEVANT, 0)}")
        lines.append("")
    return "\n".join(lines)


class SearchEvalRequest(BaseModel):
    # Request body for POST /api/search-eval (single-query evaluation).
    query: str
    top_k: int = Field(default=100, ge=1, le=500)
    auto_annotate: bool = True
    language: str = "en"


class BatchEvalRequest(BaseModel):
    # Request body for POST /api/batch-eval; queries=None falls back to the
    # query file configured on the web app.
    queries: Optional[List[str]] = None
    top_k: int = Field(default=100, ge=1, le=500)
    auto_annotate: bool = True
    language: str = "en"
    force_refresh_labels: bool = False


def create_web_app(framework: SearchEvaluationFramework,
                   query_file: Path = DEFAULT_QUERY_FILE) -> FastAPI:
    """Build the FastAPI app exposing the evaluation UI and its JSON endpoints."""
    app = FastAPI(title="Search Evaluation UI", version="1.0.0")

    @app.get("/", response_class=HTMLResponse)
    def home() -> str:
        # Single-page UI; the markup is the module-level WEB_APP_HTML string.
        return WEB_APP_HTML

    @app.get("/api/queries")
    def api_queries() -> Dict[str, Any]:
        # Queries are re-read from disk on every call, so edits to the file
        # take effect without a restart.
        return {"queries": framework.queries_from_file(query_file)}

    @app.post("/api/search-eval")
    def api_search_eval(request: SearchEvalRequest) -> Dict[str, Any]:
        return framework.evaluate_live_query(
            query=request.query,
            top_k=request.top_k,
            auto_annotate=request.auto_annotate,
            language=request.language,
        )

    @app.post("/api/batch-eval")
    def api_batch_eval(request: BatchEvalRequest) -> Dict[str, Any]:
        queries = request.queries or framework.queries_from_file(query_file)
        if not queries:
            raise HTTPException(status_code=400, detail="No queries provided")
        return framework.batch_evaluate(
            queries=queries,
            top_k=request.top_k,
            auto_annotate=request.auto_annotate,
            language=request.language,
            force_refresh_labels=request.force_refresh_labels,
        )

    @app.get("/api/history")
    def api_history() -> Dict[str, Any]:
        return {"history": framework.store.list_batch_runs(limit=20)}

    return app


WEB_APP_HTML = """
Single-query evaluation and batch evaluation share the same service on port 6010.