"""Core orchestration: corpus, rerank, LLM labels, live/batch evaluation.""" from __future__ import annotations import json import time from pathlib import Path from typing import Any, Dict, List, Sequence, Tuple import requests from elasticsearch.helpers import scan from api.app import get_app_config, get_es_client, init_service from indexer.mapping_generator import get_tenant_index_name from .clients import DashScopeLabelClient, RerankServiceClient, SearchServiceClient from .constants import ( DEFAULT_ARTIFACT_ROOT, DEFAULT_JUDGE_BATCH_COMPLETION_WINDOW, DEFAULT_JUDGE_BATCH_POLL_INTERVAL_SEC, DEFAULT_JUDGE_DASHSCOPE_BATCH, DEFAULT_JUDGE_ENABLE_THINKING, DEFAULT_JUDGE_MODEL, DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO, DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO, DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK, DEFAULT_REBUILD_LLM_BATCH_SIZE, DEFAULT_REBUILD_MAX_LLM_BATCHES, DEFAULT_REBUILD_MIN_LLM_BATCHES, DEFAULT_RERANK_HIGH_SKIP_COUNT, DEFAULT_RERANK_HIGH_THRESHOLD, DEFAULT_SEARCH_RECALL_TOP_K, RELEVANCE_EXACT, RELEVANCE_HIGH, RELEVANCE_IRRELEVANT, RELEVANCE_LOW, RELEVANCE_NON_IRRELEVANT, VALID_LABELS, ) from .metrics import aggregate_metrics, compute_query_metrics, label_distribution from .reports import render_batch_report_markdown from .store import EvalStore, QueryBuildResult from .utils import ( build_display_title, build_rerank_doc, compact_option_values, compact_product_payload, ensure_dir, sha1_text, utc_now_iso, utc_timestamp, zh_title_from_multilingual, ) def _zh_titles_from_debug_per_result(debug_info: Any) -> Dict[str, str]: """Map ``spu_id`` -> Chinese title from ``debug_info.per_result[].title_multilingual``.""" out: Dict[str, str] = {} if not isinstance(debug_info, dict): return out for entry in debug_info.get("per_result") or []: if not isinstance(entry, dict): continue spu_id = str(entry.get("spu_id") or "").strip() if not spu_id: continue zh = zh_title_from_multilingual(entry.get("title_multilingual")) if zh: out[spu_id] = zh return out class SearchEvaluationFramework: def __init__( self, tenant_id: str, artifact_root: Path = DEFAULT_ARTIFACT_ROOT, search_base_url: str = "http://localhost:6002", *, judge_model: str | None = None, enable_thinking: bool | None = None, use_dashscope_batch: bool | None = None, ): init_service(get_app_config().infrastructure.elasticsearch.host) self.tenant_id = str(tenant_id) self.artifact_root = ensure_dir(artifact_root) self.store = EvalStore(self.artifact_root / "search_eval.sqlite3") self.search_client = SearchServiceClient(search_base_url, self.tenant_id) app_cfg = get_app_config() rerank_service_url = str( app_cfg.services.rerank.providers["http"]["instances"]["default"]["service_url"] ) self.rerank_client = RerankServiceClient(rerank_service_url) llm_cfg = app_cfg.services.translation.capabilities["llm"] api_key = app_cfg.infrastructure.secrets.dashscope_api_key if not api_key: raise RuntimeError("dashscope_api_key is required for search evaluation annotation") model = str(judge_model or DEFAULT_JUDGE_MODEL) et = DEFAULT_JUDGE_ENABLE_THINKING if enable_thinking is None else enable_thinking use_batch = DEFAULT_JUDGE_DASHSCOPE_BATCH if use_dashscope_batch is None else use_dashscope_batch batch_window = DEFAULT_JUDGE_BATCH_COMPLETION_WINDOW batch_poll = float(DEFAULT_JUDGE_BATCH_POLL_INTERVAL_SEC) self.label_client = DashScopeLabelClient( model=model, base_url=str(llm_cfg["base_url"]), api_key=str(api_key), batch_completion_window=batch_window, batch_poll_interval_sec=batch_poll, enable_thinking=et, use_batch=use_batch, ) def audit_live_query( self, query: str, *, top_k: int = 100, language: str = "en", auto_annotate: bool = False, ) -> Dict[str, Any]: live = self.evaluate_live_query(query=query, top_k=top_k, auto_annotate=auto_annotate, language=language) labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in live["results"] ] return { "query": query, "tenant_id": self.tenant_id, "top_k": top_k, "metrics": live["metrics"], "distribution": label_distribution(labels), "query_profile": None, "suspicious": [], "results": live["results"], } def queries_from_file(self, path: Path) -> List[str]: return [ line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip() and not line.strip().startswith("#") ] def corpus_docs(self, refresh: bool = False) -> List[Dict[str, Any]]: if not refresh and self.store.has_corpus(self.tenant_id): return self.store.get_corpus_docs(self.tenant_id) es_client = get_es_client().client index_name = get_tenant_index_name(self.tenant_id) docs: List[Dict[str, Any]] = [] for hit in scan( client=es_client, index=index_name, query={ "_source": [ "spu_id", "title", "vendor", "category_path", "category_name", "image_url", "skus", "tags", ], "query": {"match_all": {}}, }, size=500, preserve_order=False, clear_scroll=True, ): source = dict(hit.get("_source") or {}) source["spu_id"] = str(source.get("spu_id") or hit.get("_id") or "") docs.append(source) self.store.upsert_corpus_docs(self.tenant_id, docs) return docs def full_corpus_rerank( self, query: str, docs: Sequence[Dict[str, Any]], batch_size: int = 80, force_refresh: bool = False, ) -> List[Dict[str, Any]]: cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query) pending: List[Dict[str, Any]] = [doc for doc in docs if str(doc.get("spu_id")) not in cached] if pending: new_scores: Dict[str, float] = {} for start in range(0, len(pending), batch_size): batch = pending[start : start + batch_size] scores = self._rerank_batch_with_retry(query=query, docs=batch) if len(scores) != len(batch): raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs") for doc, score in zip(batch, scores): new_scores[str(doc.get("spu_id"))] = float(score) self.store.upsert_rerank_scores( self.tenant_id, query, new_scores, model_name="qwen3_vllm_score", ) cached.update(new_scores) ranked = [] for doc in docs: spu_id = str(doc.get("spu_id")) ranked.append({"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc}) ranked.sort(key=lambda item: item["score"], reverse=True) return ranked def full_corpus_rerank_outside_exclude( self, query: str, docs: Sequence[Dict[str, Any]], exclude_spu_ids: set[str], batch_size: int = 80, force_refresh: bool = False, ) -> List[Dict[str, Any]]: """Rerank all corpus docs whose spu_id is not in ``exclude_spu_ids``; excluded IDs are not scored via API.""" exclude_spu_ids = {str(x) for x in exclude_spu_ids} cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query) pending: List[Dict[str, Any]] = [ doc for doc in docs if str(doc.get("spu_id")) not in exclude_spu_ids and str(doc.get("spu_id")) and (force_refresh or str(doc.get("spu_id")) not in cached) ] if pending: new_scores: Dict[str, float] = {} for start in range(0, len(pending), batch_size): batch = pending[start : start + batch_size] scores = self._rerank_batch_with_retry(query=query, docs=batch) if len(scores) != len(batch): raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs") for doc, score in zip(batch, scores): new_scores[str(doc.get("spu_id"))] = float(score) self.store.upsert_rerank_scores( self.tenant_id, query, new_scores, model_name="qwen3_vllm_score", ) cached.update(new_scores) ranked: List[Dict[str, Any]] = [] for doc in docs: spu_id = str(doc.get("spu_id") or "") if not spu_id or spu_id in exclude_spu_ids: continue ranked.append( {"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc} ) ranked.sort(key=lambda item: item["score"], reverse=True) return ranked def _rerank_batch_with_retry(self, query: str, docs: Sequence[Dict[str, Any]]) -> List[float]: if not docs: return [] doc_texts = [build_rerank_doc(doc) for doc in docs] try: scores, _meta = self.rerank_client.rerank(query=query, docs=doc_texts, normalize=False) return scores except Exception: if len(docs) == 1: return [-1.0] if len(docs) <= 6: scores: List[float] = [] for doc in docs: scores.extend(self._rerank_batch_with_retry(query, [doc])) return scores mid = len(docs) // 2 left = self._rerank_batch_with_retry(query, docs[:mid]) right = self._rerank_batch_with_retry(query, docs[mid:]) return left + right def annotate_missing_labels( self, query: str, docs: Sequence[Dict[str, Any]], force_refresh: bool = False, ) -> Dict[str, str]: labels = {} if force_refresh else self.store.get_labels(self.tenant_id, query) missing_docs = [doc for doc in docs if str(doc.get("spu_id")) not in labels] if not missing_docs: return labels for start in range(0, len(missing_docs), self.label_client.batch_size): batch = missing_docs[start : start + self.label_client.batch_size] batch_pairs = self._classify_with_retry(query, batch, force_refresh=force_refresh) for sub_labels, raw_response, sub_batch in batch_pairs: to_store = {str(doc.get("spu_id")): label for doc, label in zip(sub_batch, sub_labels)} self.store.upsert_labels( self.tenant_id, query, to_store, judge_model=self.label_client.model, raw_response=raw_response, ) labels.update(to_store) time.sleep(0.1) return labels def _classify_with_retry( self, query: str, docs: Sequence[Dict[str, Any]], *, force_refresh: bool = False, ) -> List[Tuple[List[str], str, Sequence[Dict[str, Any]]]]: if not docs: return [] try: labels, raw_response = self.label_client.classify_batch(query, docs) return [(labels, raw_response, docs)] except Exception: if len(docs) == 1: raise mid = len(docs) // 2 return self._classify_with_retry(query, docs[:mid], force_refresh=force_refresh) + self._classify_with_retry(query, docs[mid:], force_refresh=force_refresh) def _annotate_rebuild_batches( self, query: str, ordered_docs: Sequence[Dict[str, Any]], *, batch_size: int = DEFAULT_REBUILD_LLM_BATCH_SIZE, min_batches: int = DEFAULT_REBUILD_MIN_LLM_BATCHES, max_batches: int = DEFAULT_REBUILD_MAX_LLM_BATCHES, irrelevant_stop_ratio: float = DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO, irrelevant_low_combined_stop_ratio: float = DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO, stop_streak: int = DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK, force_refresh: bool = True, ) -> Tuple[Dict[str, str], List[Dict[str, Any]]]: """LLM-label ``ordered_docs`` in fixed-size batches along list order. **Early stop** (only after ``min_batches`` full batches have completed): Per batch, let *n* = batch size, and count labels among docs in that batch only. - *bad batch* iff **both** (strict ``>``): - ``#(Irrelevant)/n > irrelevant_stop_ratio`` (default 0.939), and - ``( #(Irrelevant) + #(Low Relevant) ) / n > irrelevant_low_combined_stop_ratio`` (default 0.959; weak relevance = ``RELEVANCE_LOW``). Maintain a streak of consecutive *bad* batches; any non-bad batch resets the streak to 0. Stop labeling when ``streak >= stop_streak`` (default 3) or when ``max_batches`` is reached or the ordered list is exhausted. Constants for defaults: ``eval_framework.constants`` (``DEFAULT_REBUILD_*``). """ batch_logs: List[Dict[str, Any]] = [] streak = 0 labels: Dict[str, str] = dict(self.store.get_labels(self.tenant_id, query)) total_ordered = len(ordered_docs) for batch_idx in range(max_batches): start = batch_idx * batch_size batch_docs = list(ordered_docs[start : start + batch_size]) if not batch_docs: break batch_pairs = self._classify_with_retry(query, batch_docs, force_refresh=force_refresh) for sub_labels, raw_response, sub_batch in batch_pairs: to_store = {str(doc.get("spu_id")): label for doc, label in zip(sub_batch, sub_labels)} self.store.upsert_labels( self.tenant_id, query, to_store, judge_model=self.label_client.model, raw_response=raw_response, ) labels.update(to_store) time.sleep(0.1) n = len(batch_docs) exact_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_EXACT) irrel_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_IRRELEVANT) low_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_LOW) exact_ratio = exact_n / n if n else 0.0 irrelevant_ratio = irrel_n / n if n else 0.0 low_ratio = low_n / n if n else 0.0 irrel_low_ratio = (irrel_n + low_n) / n if n else 0.0 log_entry = { "batch_index": batch_idx + 1, "size": n, "exact_ratio": round(exact_ratio, 6), "irrelevant_ratio": round(irrelevant_ratio, 6), "low_ratio": round(low_ratio, 6), "irrelevant_plus_low_ratio": round(irrel_low_ratio, 6), "offset_start": start, "offset_end": min(start + n, total_ordered), } batch_logs.append(log_entry) print( f"[eval-rebuild] query={query!r} llm_batch={batch_idx + 1}/{max_batches} " f"size={n} exact_ratio={exact_ratio:.4f} irrelevant_ratio={irrelevant_ratio:.4f} " f"irrel_plus_low_ratio={irrel_low_ratio:.4f}", flush=True, ) # Early-stop streak: only evaluated after min_batches (warm-up before trusting tail quality). if batch_idx + 1 >= min_batches: bad_batch = (irrelevant_ratio > irrelevant_stop_ratio) and ( irrel_low_ratio > irrelevant_low_combined_stop_ratio ) if bad_batch: streak += 1 else: streak = 0 if streak >= stop_streak: print( f"[eval-rebuild] query={query!r} early_stop after {batch_idx + 1} batches " f"({stop_streak} consecutive batches: irrelevant>{irrelevant_stop_ratio} " f"and irrel+low>{irrelevant_low_combined_stop_ratio})", flush=True, ) break return labels, batch_logs def build_query_annotation_set( self, query: str, *, search_depth: int = 1000, rerank_depth: int = 10000, annotate_search_top_k: int = 120, annotate_rerank_top_k: int = 200, language: str = "en", force_refresh_rerank: bool = False, force_refresh_labels: bool = False, search_recall_top_k: int = DEFAULT_SEARCH_RECALL_TOP_K, rerank_high_threshold: float = DEFAULT_RERANK_HIGH_THRESHOLD, rerank_high_skip_count: int = DEFAULT_RERANK_HIGH_SKIP_COUNT, rebuild_llm_batch_size: int = DEFAULT_REBUILD_LLM_BATCH_SIZE, rebuild_min_batches: int = DEFAULT_REBUILD_MIN_LLM_BATCHES, rebuild_max_batches: int = DEFAULT_REBUILD_MAX_LLM_BATCHES, rebuild_irrelevant_stop_ratio: float = DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO, rebuild_irrel_low_combined_stop_ratio: float = DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO, rebuild_irrelevant_stop_streak: int = DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK, ) -> QueryBuildResult: """Build per-query annotation pool and write ``query_builds/*.json``. Normal mode unions search + rerank windows and fills missing labels once. **Rebuild mode** (``force_refresh_labels=True``): full recall pool + corpus rerank outside pool, optional skip for "easy" queries, then batched LLM labeling with **early stop**; see ``_build_query_annotation_set_rebuild`` and ``_annotate_rebuild_batches`` (docstring spells out the bad-batch / streak rule). Rebuild tuning knobs: ``rebuild_*`` and ``search_recall_top_k`` parameters below; CLI mirrors them under ``build --force-refresh-labels``. """ if force_refresh_labels: return self._build_query_annotation_set_rebuild( query=query, search_depth=search_depth, rerank_depth=rerank_depth, language=language, force_refresh_rerank=force_refresh_rerank, search_recall_top_k=search_recall_top_k, rerank_high_threshold=rerank_high_threshold, rerank_high_skip_count=rerank_high_skip_count, rebuild_llm_batch_size=rebuild_llm_batch_size, rebuild_min_batches=rebuild_min_batches, rebuild_max_batches=rebuild_max_batches, rebuild_irrelevant_stop_ratio=rebuild_irrelevant_stop_ratio, rebuild_irrel_low_combined_stop_ratio=rebuild_irrel_low_combined_stop_ratio, rebuild_irrelevant_stop_streak=rebuild_irrelevant_stop_streak, ) search_payload = self.search_client.search(query=query, size=search_depth, from_=0, language=language) search_results = list(search_payload.get("results") or []) corpus = self.corpus_docs(refresh=False) full_rerank = self.full_corpus_rerank( query=query, docs=corpus, force_refresh=force_refresh_rerank, ) rerank_depth_effective = min(rerank_depth, len(full_rerank)) pool_docs: Dict[str, Dict[str, Any]] = {} for doc in search_results[:annotate_search_top_k]: pool_docs[str(doc.get("spu_id"))] = doc for item in full_rerank[:annotate_rerank_top_k]: pool_docs[str(item["spu_id"])] = item["doc"] labels = self.annotate_missing_labels( query=query, docs=list(pool_docs.values()), force_refresh=force_refresh_labels, ) search_labeled_results: List[Dict[str, Any]] = [] for rank, doc in enumerate(search_results, start=1): spu_id = str(doc.get("spu_id")) label = labels.get(spu_id) search_labeled_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": None, "label": label, "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) rerank_top_results: List[Dict[str, Any]] = [] for rank, item in enumerate(full_rerank[:rerank_depth_effective], start=1): doc = item["doc"] spu_id = str(item["spu_id"]) rerank_top_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": round(float(item["score"]), 8), "label": labels.get(spu_id), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) top100_labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in search_labeled_results[:100] ] metrics = compute_query_metrics(top100_labels) output_dir = ensure_dir(self.artifact_root / "query_builds") run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}" output_json_path = output_dir / f"{run_id}.json" payload = { "run_id": run_id, "created_at": utc_now_iso(), "tenant_id": self.tenant_id, "query": query, "config_meta": requests.get("http://localhost:6002/admin/config/meta", timeout=20).json(), "search_total": int(search_payload.get("total") or 0), "search_depth_requested": search_depth, "search_depth_effective": len(search_results), "rerank_depth_requested": rerank_depth, "rerank_depth_effective": rerank_depth_effective, "corpus_size": len(corpus), "annotation_pool": { "annotate_search_top_k": annotate_search_top_k, "annotate_rerank_top_k": annotate_rerank_top_k, "pool_size": len(pool_docs), }, "metrics_top100": metrics, "search_results": search_labeled_results, "full_rerank_top": rerank_top_results, } output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"]) return QueryBuildResult( query=query, tenant_id=self.tenant_id, search_total=int(search_payload.get("total") or 0), search_depth=len(search_results), rerank_corpus_size=len(corpus), annotated_count=len(pool_docs), output_json_path=output_json_path, ) def _build_query_annotation_set_rebuild( self, query: str, *, search_depth: int, rerank_depth: int, language: str, force_refresh_rerank: bool, search_recall_top_k: int, rerank_high_threshold: float, rerank_high_skip_count: int, rebuild_llm_batch_size: int, rebuild_min_batches: int, rebuild_max_batches: int, rebuild_irrelevant_stop_ratio: float, rebuild_irrel_low_combined_stop_ratio: float, rebuild_irrelevant_stop_streak: int, ) -> QueryBuildResult: search_size = max(int(search_depth), int(search_recall_top_k)) search_payload = self.search_client.search(query=query, size=search_size, from_=0, language=language) search_results = list(search_payload.get("results") or []) recall_n = min(int(search_recall_top_k), len(search_results)) pool_search_docs = search_results[:recall_n] pool_spu_ids = {str(d.get("spu_id")) for d in pool_search_docs if str(d.get("spu_id") or "").strip()} corpus = self.corpus_docs(refresh=False) corpus_by_id = {str(d.get("spu_id")): d for d in corpus if str(d.get("spu_id") or "").strip()} ranked_outside = self.full_corpus_rerank_outside_exclude( query=query, docs=corpus, exclude_spu_ids=pool_spu_ids, force_refresh=force_refresh_rerank, ) rerank_high_n = sum(1 for item in ranked_outside if float(item["score"]) > float(rerank_high_threshold)) rebuild_meta: Dict[str, Any] = { "mode": "rebuild_v1", "search_recall_top_k": search_recall_top_k, "recall_pool_size": len(pool_spu_ids), "pool_rerank_score_assigned": 1.0, "rerank_high_threshold": rerank_high_threshold, "rerank_high_count_outside_pool": rerank_high_n, "rerank_high_skip_count": rerank_high_skip_count, "rebuild_llm_batch_size": rebuild_llm_batch_size, "rebuild_min_batches": rebuild_min_batches, "rebuild_max_batches": rebuild_max_batches, "rebuild_irrelevant_stop_ratio": rebuild_irrelevant_stop_ratio, "rebuild_irrel_low_combined_stop_ratio": rebuild_irrel_low_combined_stop_ratio, "rebuild_irrelevant_stop_streak": rebuild_irrelevant_stop_streak, } batch_logs: List[Dict[str, Any]] = [] skipped = False skip_reason: str | None = None labels: Dict[str, str] = dict(self.store.get_labels(self.tenant_id, query)) llm_labeled_total = 0 if rerank_high_n > int(rerank_high_skip_count): skipped = True skip_reason = "too_many_high_rerank_scores" print( f"[eval-rebuild] query={query!r} skip: rerank_score>{rerank_high_threshold} " f"outside recall pool count={rerank_high_n} > {rerank_high_skip_count} " f"(relevant tail too large / query too easy to satisfy)", flush=True, ) else: ordered_docs: List[Dict[str, Any]] = [] seen_ordered: set[str] = set() for doc in pool_search_docs: sid = str(doc.get("spu_id") or "") if not sid or sid in seen_ordered: continue seen_ordered.add(sid) ordered_docs.append(corpus_by_id.get(sid, doc)) for item in ranked_outside: sid = str(item["spu_id"]) if sid in seen_ordered: continue seen_ordered.add(sid) ordered_docs.append(item["doc"]) labels, batch_logs = self._annotate_rebuild_batches( query, ordered_docs, batch_size=rebuild_llm_batch_size, min_batches=rebuild_min_batches, max_batches=rebuild_max_batches, irrelevant_stop_ratio=rebuild_irrelevant_stop_ratio, irrelevant_low_combined_stop_ratio=rebuild_irrel_low_combined_stop_ratio, stop_streak=rebuild_irrelevant_stop_streak, force_refresh=True, ) llm_labeled_total = sum(int(entry.get("size") or 0) for entry in batch_logs) rebuild_meta["skipped"] = skipped rebuild_meta["skip_reason"] = skip_reason rebuild_meta["llm_batch_logs"] = batch_logs rebuild_meta["llm_labeled_total"] = llm_labeled_total rerank_depth_effective = min(int(rerank_depth), len(ranked_outside)) search_labeled_results: List[Dict[str, Any]] = [] for rank, doc in enumerate(search_results, start=1): spu_id = str(doc.get("spu_id")) in_pool = rank <= recall_n search_labeled_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": 1.0 if in_pool else None, "label": labels.get(spu_id), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) rerank_top_results: List[Dict[str, Any]] = [] for rank, item in enumerate(ranked_outside[:rerank_depth_effective], start=1): doc = item["doc"] spu_id = str(item["spu_id"]) rerank_top_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": round(float(item["score"]), 8), "label": labels.get(spu_id), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) top100_labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in search_labeled_results[:100] ] metrics = compute_query_metrics(top100_labels) output_dir = ensure_dir(self.artifact_root / "query_builds") run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}" output_json_path = output_dir / f"{run_id}.json" pool_docs_count = len(pool_spu_ids) + len(ranked_outside) payload = { "run_id": run_id, "created_at": utc_now_iso(), "tenant_id": self.tenant_id, "query": query, "config_meta": requests.get("http://localhost:6002/admin/config/meta", timeout=20).json(), "search_total": int(search_payload.get("total") or 0), "search_depth_requested": search_depth, "search_depth_effective": len(search_results), "rerank_depth_requested": rerank_depth, "rerank_depth_effective": rerank_depth_effective, "corpus_size": len(corpus), "annotation_pool": { "rebuild": rebuild_meta, "ordered_union_size": pool_docs_count, }, "metrics_top100": metrics, "search_results": search_labeled_results, "full_rerank_top": rerank_top_results, } output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"]) return QueryBuildResult( query=query, tenant_id=self.tenant_id, search_total=int(search_payload.get("total") or 0), search_depth=len(search_results), rerank_corpus_size=len(corpus), annotated_count=llm_labeled_total if not skipped else 0, output_json_path=output_json_path, ) def evaluate_live_query( self, query: str, top_k: int = 100, auto_annotate: bool = False, language: str = "en", force_refresh_labels: bool = False, ) -> Dict[str, Any]: search_payload = self.search_client.search( query=query, size=max(top_k, 100), from_=0, language=language, debug=True ) zh_by_spu = _zh_titles_from_debug_per_result(search_payload.get("debug_info")) results = list(search_payload.get("results") or []) if auto_annotate: self.annotate_missing_labels(query=query, docs=results[:top_k], force_refresh=force_refresh_labels) labels = self.store.get_labels(self.tenant_id, query) recalled_spu_ids = {str(doc.get("spu_id")) for doc in results[:top_k]} labeled = [] unlabeled_hits = 0 for rank, doc in enumerate(results[:top_k], start=1): spu_id = str(doc.get("spu_id")) label = labels.get(spu_id) if label not in VALID_LABELS: unlabeled_hits += 1 primary_title = build_display_title(doc) title_zh = zh_by_spu.get(spu_id) or "" if not title_zh and isinstance(doc.get("title"), dict): title_zh = zh_title_from_multilingual(doc.get("title")) labeled.append( { "rank": rank, "spu_id": spu_id, "title": primary_title, "title_zh": title_zh if title_zh and title_zh != primary_title else "", "image_url": doc.get("image_url"), "label": label, "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) metric_labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in labeled ] label_stats = self.store.get_query_label_stats(self.tenant_id, query) rerank_scores = self.store.get_rerank_scores(self.tenant_id, query) relevant_missing_ids = [ spu_id for spu_id, label in labels.items() if label in RELEVANCE_NON_IRRELEVANT and spu_id not in recalled_spu_ids ] missing_docs_map = self.store.get_corpus_docs_by_spu_ids(self.tenant_id, relevant_missing_ids) missing_relevant = [] for spu_id in relevant_missing_ids: doc = missing_docs_map.get(spu_id) if not doc: continue miss_title = build_display_title(doc) miss_zh = zh_title_from_multilingual(doc.get("title")) if isinstance(doc.get("title"), dict) else "" missing_relevant.append( { "spu_id": spu_id, "label": labels[spu_id], "rerank_score": rerank_scores.get(spu_id), "title": miss_title, "title_zh": miss_zh if miss_zh and miss_zh != miss_title else "", "image_url": doc.get("image_url"), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) label_order = { RELEVANCE_EXACT: 0, RELEVANCE_HIGH: 1, RELEVANCE_LOW: 2, RELEVANCE_IRRELEVANT: 3, } missing_relevant.sort( key=lambda item: ( label_order.get(str(item.get("label")), 9), -(float(item.get("rerank_score")) if item.get("rerank_score") is not None else float("-inf")), str(item.get("title") or ""), ) ) tips: List[str] = [] if auto_annotate: tips.append("Single-query evaluation used cached labels and refreshed missing labels for recalled results.") else: tips.append("Single-query evaluation used the offline annotation cache only; recalled SPUs without cached labels were treated as Irrelevant.") if label_stats["total"] == 0: tips.append("This query has no offline annotation set yet. Build or refresh labels first if you want stable evaluation.") if unlabeled_hits: tips.append(f"{unlabeled_hits} recalled results were not in the annotation set and were counted as Irrelevant.") if not missing_relevant: tips.append("No cached non-irrelevant products were missed by this recall set.") return { "query": query, "tenant_id": self.tenant_id, "top_k": top_k, "metrics": compute_query_metrics(metric_labels), "results": labeled, "missing_relevant": missing_relevant, "label_stats": { **label_stats, "unlabeled_hits_treated_irrelevant": unlabeled_hits, "recalled_hits": len(labeled), "missing_relevant_count": len(missing_relevant), "missing_exact_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_EXACT), "missing_high_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_HIGH), "missing_low_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_LOW), }, "tips": tips, "total": int(search_payload.get("total") or 0), } def batch_evaluate( self, queries: Sequence[str], *, top_k: int = 100, auto_annotate: bool = True, language: str = "en", force_refresh_labels: bool = False, ) -> Dict[str, Any]: per_query = [] for query in queries: live = self.evaluate_live_query( query, top_k=top_k, auto_annotate=auto_annotate, language=language, force_refresh_labels=force_refresh_labels, ) labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in live["results"] ] per_query.append( { "query": live["query"], "tenant_id": live["tenant_id"], "top_k": live["top_k"], "metrics": live["metrics"], "distribution": label_distribution(labels), "total": live["total"], } ) aggregate = aggregate_metrics([item["metrics"] for item in per_query]) aggregate_distribution = { RELEVANCE_EXACT: sum(item["distribution"][RELEVANCE_EXACT] for item in per_query), RELEVANCE_HIGH: sum(item["distribution"][RELEVANCE_HIGH] for item in per_query), RELEVANCE_LOW: sum(item["distribution"][RELEVANCE_LOW] for item in per_query), RELEVANCE_IRRELEVANT: sum(item["distribution"][RELEVANCE_IRRELEVANT] for item in per_query), } batch_id = f"batch_{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + '|'.join(queries))[:10]}" report_dir = ensure_dir(self.artifact_root / "batch_reports") config_snapshot_path = report_dir / f"{batch_id}_config.json" config_snapshot = requests.get("http://localhost:6002/admin/config", timeout=20).json() config_snapshot_path.write_text(json.dumps(config_snapshot, ensure_ascii=False, indent=2), encoding="utf-8") output_json_path = report_dir / f"{batch_id}.json" report_md_path = report_dir / f"{batch_id}.md" payload = { "batch_id": batch_id, "created_at": utc_now_iso(), "tenant_id": self.tenant_id, "queries": list(queries), "top_k": top_k, "aggregate_metrics": aggregate, "aggregate_distribution": aggregate_distribution, "per_query": per_query, "config_snapshot_path": str(config_snapshot_path), } output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") report_md_path.write_text(render_batch_report_markdown(payload), encoding="utf-8") self.store.insert_batch_run(batch_id, self.tenant_id, output_json_path, report_md_path, config_snapshot_path, payload) return payload