"""Core orchestration: corpus, rerank, LLM labels, live/batch evaluation.""" from __future__ import annotations import json import logging import time from pathlib import Path from typing import Any, Dict, List, Sequence, Tuple import requests from elasticsearch.helpers import scan from api.app import get_app_config, get_es_client, init_service from indexer.mapping_generator import get_tenant_index_name from .clients import DashScopeLabelClient, RerankServiceClient, SearchServiceClient from .constants import ( DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO, DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO, DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK, DEFAULT_REBUILD_LLM_BATCH_SIZE, DEFAULT_REBUILD_MAX_LLM_BATCHES, DEFAULT_REBUILD_MIN_LLM_BATCHES, DEFAULT_RERANK_HIGH_SKIP_COUNT, DEFAULT_RERANK_HIGH_THRESHOLD, DEFAULT_SEARCH_RECALL_TOP_K, RELEVANCE_EXACT, RELEVANCE_GAIN_MAP, RELEVANCE_HIGH, RELEVANCE_IRRELEVANT, RELEVANCE_LOW, RELEVANCE_NON_IRRELEVANT, VALID_LABELS, ) from .metrics import aggregate_metrics, compute_query_metrics, label_distribution from .reports import render_batch_report_markdown from .store import EvalStore, QueryBuildResult from .utils import ( build_display_title, build_rerank_doc, compact_option_values, compact_product_payload, ensure_dir, sha1_text, utc_now_iso, utc_timestamp, zh_title_from_multilingual, ) _log = logging.getLogger("search_eval.framework") def _metric_context_payload() -> Dict[str, Any]: return { "primary_metric": "NDCG@10", "gain_scheme": dict(RELEVANCE_GAIN_MAP), "notes": [ "NDCG uses graded gains derived from the four relevance labels.", "Strong metrics treat Exact Match and High Relevant as strong business positives.", "Useful metrics treat any non-irrelevant item as useful recall coverage.", ], } def _zh_titles_from_debug_per_result(debug_info: Any) -> Dict[str, str]: """Map ``spu_id`` -> Chinese title from ``debug_info.per_result[].title_multilingual``.""" out: Dict[str, str] = {} if not isinstance(debug_info, dict): return out for entry in debug_info.get("per_result") or []: if not isinstance(entry, dict): continue spu_id = str(entry.get("spu_id") or "").strip() if not spu_id: continue zh = zh_title_from_multilingual(entry.get("title_multilingual")) if zh: out[spu_id] = zh return out class SearchEvaluationFramework: def __init__( self, tenant_id: str, artifact_root: Path | None = None, search_base_url: str | None = None, *, judge_model: str | None = None, enable_thinking: bool | None = None, use_dashscope_batch: bool | None = None, intent_model: str | None = None, intent_enable_thinking: bool | None = None, ): app_cfg = get_app_config() se = app_cfg.search_evaluation init_service(app_cfg.infrastructure.elasticsearch.host) self.tenant_id = str(tenant_id) self.artifact_root = ensure_dir(artifact_root if artifact_root is not None else se.artifact_root) self.store = EvalStore(self.artifact_root / "search_eval.sqlite3") sb = search_base_url if search_base_url is not None else se.search_base_url self.search_client = SearchServiceClient(sb, self.tenant_id) rerank_service_url = str( app_cfg.services.rerank.providers["http"]["instances"]["default"]["service_url"] ) self.rerank_client = RerankServiceClient(rerank_service_url) llm_cfg = app_cfg.services.translation.capabilities["llm"] api_key = app_cfg.infrastructure.secrets.dashscope_api_key if not api_key: raise RuntimeError("dashscope_api_key is required for search evaluation annotation") model = str(judge_model if judge_model is not None else se.judge_model) et = se.judge_enable_thinking if enable_thinking is None else enable_thinking use_batch = se.judge_dashscope_batch if use_dashscope_batch is None else use_dashscope_batch batch_window = se.judge_batch_completion_window batch_poll = float(se.judge_batch_poll_interval_sec) self.label_client = DashScopeLabelClient( model=model, base_url=str(llm_cfg["base_url"]), api_key=str(api_key), batch_completion_window=batch_window, batch_poll_interval_sec=batch_poll, enable_thinking=et, use_batch=use_batch, ) intent_m = str(intent_model if intent_model is not None else se.intent_model) intent_et = se.intent_enable_thinking if intent_enable_thinking is None else intent_enable_thinking self.intent_client = DashScopeLabelClient( model=intent_m, base_url=str(llm_cfg["base_url"]), api_key=str(api_key), batch_completion_window=batch_window, batch_poll_interval_sec=batch_poll, enable_thinking=bool(intent_et), use_batch=False, ) self._query_intent_cache: Dict[str, str] = {} def _ensure_query_intent_block(self, query: str) -> str: if query not in self._query_intent_cache: text, _raw = self.intent_client.query_intent(query) self._query_intent_cache[query] = str(text or "").strip() return self._query_intent_cache[query] def audit_live_query( self, query: str, *, top_k: int = 100, language: str = "en", auto_annotate: bool = False, ) -> Dict[str, Any]: live = self.evaluate_live_query(query=query, top_k=top_k, auto_annotate=auto_annotate, language=language) labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in live["results"] ] return { "query": query, "tenant_id": self.tenant_id, "top_k": top_k, "metrics": live["metrics"], "distribution": label_distribution(labels), "query_profile": None, "suspicious": [], "results": live["results"], } def queries_from_file(self, path: Path) -> List[str]: return [ line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip() and not line.strip().startswith("#") ] def corpus_docs(self, refresh: bool = False) -> List[Dict[str, Any]]: if not refresh and self.store.has_corpus(self.tenant_id): return self.store.get_corpus_docs(self.tenant_id) es_client = get_es_client().client index_name = get_tenant_index_name(self.tenant_id) docs: List[Dict[str, Any]] = [] for hit in scan( client=es_client, index=index_name, query={ "_source": [ "spu_id", "title", "vendor", "category_path", "category_name", "image_url", "skus", "tags", ], "query": {"match_all": {}}, }, size=500, preserve_order=False, clear_scroll=True, ): source = dict(hit.get("_source") or {}) source["spu_id"] = str(source.get("spu_id") or hit.get("_id") or "") docs.append(source) self.store.upsert_corpus_docs(self.tenant_id, docs) return docs def full_corpus_rerank( self, query: str, docs: Sequence[Dict[str, Any]], batch_size: int = 80, force_refresh: bool = False, ) -> List[Dict[str, Any]]: cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query) pending: List[Dict[str, Any]] = [doc for doc in docs if str(doc.get("spu_id")) not in cached] if pending: new_scores: Dict[str, float] = {} for start in range(0, len(pending), batch_size): batch = pending[start : start + batch_size] scores = self._rerank_batch_with_retry(query=query, docs=batch) if len(scores) != len(batch): raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs") for doc, score in zip(batch, scores): new_scores[str(doc.get("spu_id"))] = float(score) self.store.upsert_rerank_scores( self.tenant_id, query, new_scores, model_name="qwen3_vllm_score", ) cached.update(new_scores) ranked = [] for doc in docs: spu_id = str(doc.get("spu_id")) ranked.append({"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc}) ranked.sort(key=lambda item: item["score"], reverse=True) return ranked def full_corpus_rerank_outside_exclude( self, query: str, docs: Sequence[Dict[str, Any]], exclude_spu_ids: set[str], batch_size: int = 80, force_refresh: bool = False, ) -> List[Dict[str, Any]]: """Rerank all corpus docs whose spu_id is not in ``exclude_spu_ids``; excluded IDs are not scored via API.""" exclude_spu_ids = {str(x) for x in exclude_spu_ids} cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query) pending: List[Dict[str, Any]] = [ doc for doc in docs if str(doc.get("spu_id")) not in exclude_spu_ids and str(doc.get("spu_id")) and (force_refresh or str(doc.get("spu_id")) not in cached) ] if pending: new_scores: Dict[str, float] = {} for start in range(0, len(pending), batch_size): batch = pending[start : start + batch_size] scores = self._rerank_batch_with_retry(query=query, docs=batch) if len(scores) != len(batch): raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs") for doc, score in zip(batch, scores): new_scores[str(doc.get("spu_id"))] = float(score) self.store.upsert_rerank_scores( self.tenant_id, query, new_scores, model_name="qwen3_vllm_score", ) cached.update(new_scores) ranked: List[Dict[str, Any]] = [] for doc in docs: spu_id = str(doc.get("spu_id") or "") if not spu_id or spu_id in exclude_spu_ids: continue ranked.append( {"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc} ) ranked.sort(key=lambda item: item["score"], reverse=True) return ranked def _assign_fixed_rerank_scores( self, query: str, spu_ids: Sequence[str], *, score: float, force_refresh: bool = False, ) -> Dict[str, float]: """Persist a fixed rerank score for a deduplicated ``spu_id`` list.""" normalized_ids: List[str] = [] seen: set[str] = set() for spu_id in spu_ids: sid = str(spu_id or "").strip() if not sid or sid in seen: continue seen.add(sid) normalized_ids.append(sid) if not normalized_ids: return {} cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query) to_store: Dict[str, float] = {} for sid in normalized_ids: if force_refresh or sid not in cached or float(cached[sid]) != float(score): to_store[sid] = float(score) if to_store: self.store.upsert_rerank_scores( self.tenant_id, query, to_store, model_name="search_recall_pool_fixed", ) return {sid: float(score) for sid in normalized_ids} def _rerank_batch_with_retry(self, query: str, docs: Sequence[Dict[str, Any]]) -> List[float]: if not docs: return [] doc_texts = [build_rerank_doc(doc) for doc in docs] try: scores, _meta = self.rerank_client.rerank(query=query, docs=doc_texts, normalize=False) return scores except Exception: if len(docs) == 1: return [-1.0] if len(docs) <= 6: scores: List[float] = [] for doc in docs: scores.extend(self._rerank_batch_with_retry(query, [doc])) return scores mid = len(docs) // 2 left = self._rerank_batch_with_retry(query, docs[:mid]) right = self._rerank_batch_with_retry(query, docs[mid:]) return left + right def annotate_missing_labels( self, query: str, docs: Sequence[Dict[str, Any]], force_refresh: bool = False, ) -> Dict[str, str]: labels = {} if force_refresh else self.store.get_labels(self.tenant_id, query) missing_docs = [doc for doc in docs if str(doc.get("spu_id")) not in labels] if not missing_docs: return labels for start in range(0, len(missing_docs), self.label_client.batch_size): batch = missing_docs[start : start + self.label_client.batch_size] batch_pairs = self._classify_with_retry(query, batch, force_refresh=force_refresh) for sub_labels, raw_response, sub_batch in batch_pairs: to_store = {str(doc.get("spu_id")): label for doc, label in zip(sub_batch, sub_labels)} self.store.upsert_labels( self.tenant_id, query, to_store, judge_model=self.label_client.model, raw_response=raw_response, ) labels.update(to_store) time.sleep(0.1) return labels def _classify_with_retry( self, query: str, docs: Sequence[Dict[str, Any]], *, force_refresh: bool = False, ) -> List[Tuple[List[str], str, Sequence[Dict[str, Any]]]]: if not docs: return [] try: intent_block = self._ensure_query_intent_block(query) labels, raw_response = self.label_client.classify_batch( query, docs, query_intent_block=intent_block ) return [(labels, raw_response, docs)] except Exception: _log.exception( "[eval-rebuild] classify failed query=%r docs=%s; %s", query, len(docs), "splitting batch" if len(docs) > 1 else "single-doc failure", ) if len(docs) == 1: raise mid = len(docs) // 2 return self._classify_with_retry(query, docs[:mid], force_refresh=force_refresh) + self._classify_with_retry(query, docs[mid:], force_refresh=force_refresh) def _annotate_rebuild_batches( self, query: str, ordered_docs: Sequence[Dict[str, Any]], *, batch_size: int = DEFAULT_REBUILD_LLM_BATCH_SIZE, min_batches: int = DEFAULT_REBUILD_MIN_LLM_BATCHES, max_batches: int = DEFAULT_REBUILD_MAX_LLM_BATCHES, irrelevant_stop_ratio: float = DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO, irrelevant_low_combined_stop_ratio: float = DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO, stop_streak: int = DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK, force_refresh: bool = True, ) -> Tuple[Dict[str, str], List[Dict[str, Any]]]: """LLM-label ``ordered_docs`` in fixed-size batches along list order. **Early stop** (only after ``min_batches`` full batches have completed): Per batch, let *n* = batch size, and count labels among docs in that batch only. - *bad batch* iff **both** (strict ``>``): - ``#(Irrelevant)/n > irrelevant_stop_ratio`` (default 0.939), and - ``( #(Irrelevant) + #(Low Relevant) ) / n > irrelevant_low_combined_stop_ratio`` (default 0.959; weak relevance = ``RELEVANCE_LOW``). Maintain a streak of consecutive *bad* batches; any non-bad batch resets the streak to 0. Stop labeling when ``streak >= stop_streak`` (default 3) or when ``max_batches`` is reached or the ordered list is exhausted. Constants for defaults: ``eval_framework.constants`` (``DEFAULT_REBUILD_*``). """ batch_logs: List[Dict[str, Any]] = [] streak = 0 labels: Dict[str, str] = dict(self.store.get_labels(self.tenant_id, query)) total_ordered = len(ordered_docs) for batch_idx in range(max_batches): start = batch_idx * batch_size batch_docs = list(ordered_docs[start : start + batch_size]) if not batch_docs: break _log.info( "[eval-rebuild] query=%r starting llm_batch=%s/%s size=%s offset=%s", query, batch_idx + 1, max_batches, len(batch_docs), start, ) batch_pairs = self._classify_with_retry(query, batch_docs, force_refresh=force_refresh) for sub_labels, raw_response, sub_batch in batch_pairs: to_store = {str(doc.get("spu_id")): label for doc, label in zip(sub_batch, sub_labels)} self.store.upsert_labels( self.tenant_id, query, to_store, judge_model=self.label_client.model, raw_response=raw_response, ) labels.update(to_store) time.sleep(0.1) n = len(batch_docs) exact_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_EXACT) irrel_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_IRRELEVANT) low_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_LOW) exact_ratio = exact_n / n if n else 0.0 irrelevant_ratio = irrel_n / n if n else 0.0 low_ratio = low_n / n if n else 0.0 irrel_low_ratio = (irrel_n + low_n) / n if n else 0.0 log_entry = { "batch_index": batch_idx + 1, "size": n, "exact_ratio": round(exact_ratio, 6), "irrelevant_ratio": round(irrelevant_ratio, 6), "low_ratio": round(low_ratio, 6), "irrelevant_plus_low_ratio": round(irrel_low_ratio, 6), "offset_start": start, "offset_end": min(start + n, total_ordered), } batch_logs.append(log_entry) _log.info( "[eval-rebuild] query=%r llm_batch=%s/%s size=%s exact_ratio=%.4f irrelevant_ratio=%.4f " "irrel_plus_low_ratio=%.4f", query, batch_idx + 1, max_batches, n, exact_ratio, irrelevant_ratio, irrel_low_ratio, ) # Early-stop streak: only evaluated after min_batches (warm-up before trusting tail quality). if batch_idx + 1 >= min_batches: bad_batch = (irrelevant_ratio > irrelevant_stop_ratio) and ( irrel_low_ratio > irrelevant_low_combined_stop_ratio ) if bad_batch: streak += 1 else: streak = 0 if streak >= stop_streak: _log.info( "[eval-rebuild] query=%r early_stop after %s batches (%s consecutive batches: " "irrelevant>%s and irrel+low>%s)", query, batch_idx + 1, stop_streak, irrelevant_stop_ratio, irrelevant_low_combined_stop_ratio, ) break return labels, batch_logs def build_query_annotation_set( self, query: str, *, search_depth: int = 1000, rerank_depth: int = 10000, annotate_search_top_k: int = 120, annotate_rerank_top_k: int = 200, language: str = "en", force_refresh_rerank: bool = False, force_refresh_labels: bool = False, search_recall_top_k: int = DEFAULT_SEARCH_RECALL_TOP_K, rerank_high_threshold: float = DEFAULT_RERANK_HIGH_THRESHOLD, rerank_high_skip_count: int = DEFAULT_RERANK_HIGH_SKIP_COUNT, rebuild_llm_batch_size: int = DEFAULT_REBUILD_LLM_BATCH_SIZE, rebuild_min_batches: int = DEFAULT_REBUILD_MIN_LLM_BATCHES, rebuild_max_batches: int = DEFAULT_REBUILD_MAX_LLM_BATCHES, rebuild_irrelevant_stop_ratio: float = DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO, rebuild_irrel_low_combined_stop_ratio: float = DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO, rebuild_irrelevant_stop_streak: int = DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK, ) -> QueryBuildResult: """Build per-query annotation pool and write ``query_builds/*.json``. Normal mode unions search + rerank windows and fills missing labels once. **Rebuild mode** (``force_refresh_labels=True``): full recall pool + corpus rerank outside pool, optional skip for "easy" queries, then batched LLM labeling with **early stop**; see ``_build_query_annotation_set_rebuild`` and ``_annotate_rebuild_batches`` (docstring spells out the bad-batch / streak rule). Rebuild tuning knobs: ``rebuild_*`` and ``search_recall_top_k`` parameters below; CLI mirrors them under ``build --force-refresh-labels``. """ if force_refresh_labels: return self._build_query_annotation_set_rebuild( query=query, search_depth=search_depth, rerank_depth=rerank_depth, language=language, force_refresh_rerank=force_refresh_rerank, search_recall_top_k=search_recall_top_k, rerank_high_threshold=rerank_high_threshold, rerank_high_skip_count=rerank_high_skip_count, rebuild_llm_batch_size=rebuild_llm_batch_size, rebuild_min_batches=rebuild_min_batches, rebuild_max_batches=rebuild_max_batches, rebuild_irrelevant_stop_ratio=rebuild_irrelevant_stop_ratio, rebuild_irrel_low_combined_stop_ratio=rebuild_irrel_low_combined_stop_ratio, rebuild_irrelevant_stop_streak=rebuild_irrelevant_stop_streak, ) search_payload = self.search_client.search(query=query, size=search_depth, from_=0, language=language) search_results = list(search_payload.get("results") or []) corpus = self.corpus_docs(refresh=False) full_rerank = self.full_corpus_rerank( query=query, docs=corpus, force_refresh=force_refresh_rerank, ) rerank_depth_effective = min(rerank_depth, len(full_rerank)) pool_docs: Dict[str, Dict[str, Any]] = {} for doc in search_results[:annotate_search_top_k]: pool_docs[str(doc.get("spu_id"))] = doc for item in full_rerank[:annotate_rerank_top_k]: pool_docs[str(item["spu_id"])] = item["doc"] labels = self.annotate_missing_labels( query=query, docs=list(pool_docs.values()), force_refresh=force_refresh_labels, ) search_labeled_results: List[Dict[str, Any]] = [] for rank, doc in enumerate(search_results, start=1): spu_id = str(doc.get("spu_id")) label = labels.get(spu_id) search_labeled_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": None, "label": label, "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) rerank_top_results: List[Dict[str, Any]] = [] for rank, item in enumerate(full_rerank[:rerank_depth_effective], start=1): doc = item["doc"] spu_id = str(item["spu_id"]) rerank_top_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": round(float(item["score"]), 8), "label": labels.get(spu_id), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) top100_labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in search_labeled_results[:100] ] metrics = compute_query_metrics(top100_labels, ideal_labels=list(labels.values())) output_dir = ensure_dir(self.artifact_root / "query_builds") run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}" output_json_path = output_dir / f"{run_id}.json" payload = { "run_id": run_id, "created_at": utc_now_iso(), "tenant_id": self.tenant_id, "query": query, "config_meta": self.search_client.get_json("/admin/config/meta", timeout=20), "search_total": int(search_payload.get("total") or 0), "search_depth_requested": search_depth, "search_depth_effective": len(search_results), "rerank_depth_requested": rerank_depth, "rerank_depth_effective": rerank_depth_effective, "corpus_size": len(corpus), "annotation_pool": { "annotate_search_top_k": annotate_search_top_k, "annotate_rerank_top_k": annotate_rerank_top_k, "pool_size": len(pool_docs), }, "metrics_top100": metrics, "metric_context": _metric_context_payload(), "search_results": search_labeled_results, "full_rerank_top": rerank_top_results, } output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"]) return QueryBuildResult( query=query, tenant_id=self.tenant_id, search_total=int(search_payload.get("total") or 0), search_depth=len(search_results), rerank_corpus_size=len(corpus), annotated_count=len(pool_docs), output_json_path=output_json_path, ) def _build_query_annotation_set_rebuild( self, query: str, *, search_depth: int, rerank_depth: int, language: str, force_refresh_rerank: bool, search_recall_top_k: int, rerank_high_threshold: float, rerank_high_skip_count: int, rebuild_llm_batch_size: int, rebuild_min_batches: int, rebuild_max_batches: int, rebuild_irrelevant_stop_ratio: float, rebuild_irrel_low_combined_stop_ratio: float, rebuild_irrelevant_stop_streak: int, ) -> QueryBuildResult: search_size = max(int(search_depth), int(search_recall_top_k)) search_payload = self.search_client.search(query=query, size=search_size, from_=0, language=language) search_results = list(search_payload.get("results") or []) search_result_spu_ids = [str(doc.get("spu_id") or "").strip() for doc in search_results] recall_spu_ids: List[str] = [] seen_recall_spu_ids: set[str] = set() for spu_id in search_result_spu_ids[: int(search_recall_top_k)]: if not spu_id or spu_id in seen_recall_spu_ids: continue seen_recall_spu_ids.add(spu_id) recall_spu_ids.append(spu_id) recall_n = len(recall_spu_ids) pool_spu_ids = set(recall_spu_ids) corpus = self.corpus_docs(refresh=False) corpus_by_id = {str(d.get("spu_id")): d for d in corpus if str(d.get("spu_id") or "").strip()} self._assign_fixed_rerank_scores( query=query, spu_ids=recall_spu_ids, score=1.0, force_refresh=force_refresh_rerank, ) rerank_pending_n = sum( 1 for d in corpus if str(d.get("spu_id") or "").strip() and str(d.get("spu_id")) not in pool_spu_ids ) _log.info( "[eval-rebuild] query=%r phase=rerank_outside_pool docs≈%s (pool=%s, force_refresh_rerank=%s); " "this can take a long time with no further logs until LLM batches start", query, rerank_pending_n, len(pool_spu_ids), force_refresh_rerank, ) ranked_outside = self.full_corpus_rerank_outside_exclude( query=query, docs=corpus, exclude_spu_ids=pool_spu_ids, force_refresh=force_refresh_rerank, ) rerank_high_n = sum(1 for item in ranked_outside if float(item["score"]) > float(rerank_high_threshold)) rebuild_meta: Dict[str, Any] = { "mode": "rebuild_v1", "search_recall_top_k": search_recall_top_k, "recall_pool_size": len(pool_spu_ids), "pool_rerank_score_assigned": 1.0, "rerank_high_threshold": rerank_high_threshold, "rerank_high_count_outside_pool": rerank_high_n, "rerank_high_skip_count": rerank_high_skip_count, "rebuild_llm_batch_size": rebuild_llm_batch_size, "rebuild_min_batches": rebuild_min_batches, "rebuild_max_batches": rebuild_max_batches, "rebuild_irrelevant_stop_ratio": rebuild_irrelevant_stop_ratio, "rebuild_irrel_low_combined_stop_ratio": rebuild_irrel_low_combined_stop_ratio, "rebuild_irrelevant_stop_streak": rebuild_irrelevant_stop_streak, } batch_logs: List[Dict[str, Any]] = [] skipped = False skip_reason: str | None = None labels: Dict[str, str] = dict(self.store.get_labels(self.tenant_id, query)) llm_labeled_total = 0 if rerank_high_n > int(rerank_high_skip_count): skipped = True skip_reason = "too_many_high_rerank_scores" _log.info( "[eval-rebuild] query=%r skip: rerank_score>%s outside recall pool count=%s > %s " "(relevant tail too large / query too easy to satisfy)", query, rerank_high_threshold, rerank_high_n, rerank_high_skip_count, ) else: ordered_docs: List[Dict[str, Any]] = [] seen_ordered: set[str] = set() for sid in recall_spu_ids: if not sid or sid in seen_ordered: continue seen_ordered.add(sid) doc = corpus_by_id.get(sid) if doc is not None: ordered_docs.append(doc) for item in ranked_outside: sid = str(item["spu_id"]) if sid in seen_ordered: continue seen_ordered.add(sid) ordered_docs.append(item["doc"]) labels, batch_logs = self._annotate_rebuild_batches( query, ordered_docs, batch_size=rebuild_llm_batch_size, min_batches=rebuild_min_batches, max_batches=rebuild_max_batches, irrelevant_stop_ratio=rebuild_irrelevant_stop_ratio, irrelevant_low_combined_stop_ratio=rebuild_irrel_low_combined_stop_ratio, stop_streak=rebuild_irrelevant_stop_streak, force_refresh=True, ) llm_labeled_total = sum(int(entry.get("size") or 0) for entry in batch_logs) rebuild_meta["skipped"] = skipped rebuild_meta["skip_reason"] = skip_reason rebuild_meta["llm_batch_logs"] = batch_logs rebuild_meta["llm_labeled_total"] = llm_labeled_total rerank_depth_effective = min(int(rerank_depth), len(ranked_outside)) search_labeled_results: List[Dict[str, Any]] = [] for rank, search_doc in enumerate(search_results, start=1): spu_id = str(search_doc.get("spu_id") or "") doc = corpus_by_id.get(spu_id, search_doc) in_pool = spu_id in pool_spu_ids search_labeled_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": 1.0 if in_pool else None, "label": labels.get(spu_id), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) rerank_top_results: List[Dict[str, Any]] = [] for rank, item in enumerate(ranked_outside[:rerank_depth_effective], start=1): doc = item["doc"] spu_id = str(item["spu_id"]) rerank_top_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": round(float(item["score"]), 8), "label": labels.get(spu_id), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) top100_labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in search_labeled_results[:100] ] metrics = compute_query_metrics(top100_labels, ideal_labels=list(labels.values())) output_dir = ensure_dir(self.artifact_root / "query_builds") run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}" output_json_path = output_dir / f"{run_id}.json" pool_docs_count = len(pool_spu_ids) + len(ranked_outside) payload = { "run_id": run_id, "created_at": utc_now_iso(), "tenant_id": self.tenant_id, "query": query, "config_meta": self.search_client.get_json("/admin/config/meta", timeout=20), "search_total": int(search_payload.get("total") or 0), "search_depth_requested": search_depth, "search_depth_effective": len(search_results), "rerank_depth_requested": rerank_depth, "rerank_depth_effective": rerank_depth_effective, "corpus_size": len(corpus), "annotation_pool": { "rebuild": rebuild_meta, "ordered_union_size": pool_docs_count, }, "metrics_top100": metrics, "metric_context": _metric_context_payload(), "search_results": search_labeled_results, "full_rerank_top": rerank_top_results, } output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"]) return QueryBuildResult( query=query, tenant_id=self.tenant_id, search_total=int(search_payload.get("total") or 0), search_depth=len(search_results), rerank_corpus_size=len(corpus), annotated_count=llm_labeled_total if not skipped else 0, output_json_path=output_json_path, ) def evaluate_live_query( self, query: str, top_k: int = 100, auto_annotate: bool = False, language: str = "en", force_refresh_labels: bool = False, ) -> Dict[str, Any]: search_payload = self.search_client.search( query=query, size=max(top_k, 100), from_=0, language=language, debug=True ) zh_by_spu = _zh_titles_from_debug_per_result(search_payload.get("debug_info")) results = list(search_payload.get("results") or []) if auto_annotate: self.annotate_missing_labels(query=query, docs=results[:top_k], force_refresh=force_refresh_labels) labels = self.store.get_labels(self.tenant_id, query) recalled_spu_ids = {str(doc.get("spu_id")) for doc in results[:top_k]} labeled = [] unlabeled_hits = 0 for rank, doc in enumerate(results[:top_k], start=1): spu_id = str(doc.get("spu_id")) label = labels.get(spu_id) if label not in VALID_LABELS: unlabeled_hits += 1 primary_title = build_display_title(doc) title_zh = zh_by_spu.get(spu_id) or "" if not title_zh and isinstance(doc.get("title"), dict): title_zh = zh_title_from_multilingual(doc.get("title")) labeled.append( { "rank": rank, "spu_id": spu_id, "title": primary_title, "title_zh": title_zh if title_zh and title_zh != primary_title else "", "image_url": doc.get("image_url"), "label": label, "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) metric_labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in labeled ] ideal_labels = [ label if label in VALID_LABELS else RELEVANCE_IRRELEVANT for label in labels.values() ] label_stats = self.store.get_query_label_stats(self.tenant_id, query) rerank_scores = self.store.get_rerank_scores(self.tenant_id, query) relevant_missing_ids = [ spu_id for spu_id, label in labels.items() if label in RELEVANCE_NON_IRRELEVANT and spu_id not in recalled_spu_ids ] missing_docs_map = self.store.get_corpus_docs_by_spu_ids(self.tenant_id, relevant_missing_ids) missing_relevant = [] for spu_id in relevant_missing_ids: doc = missing_docs_map.get(spu_id) if not doc: continue miss_title = build_display_title(doc) miss_zh = zh_title_from_multilingual(doc.get("title")) if isinstance(doc.get("title"), dict) else "" missing_relevant.append( { "spu_id": spu_id, "label": labels[spu_id], "rerank_score": rerank_scores.get(spu_id), "title": miss_title, "title_zh": miss_zh if miss_zh and miss_zh != miss_title else "", "image_url": doc.get("image_url"), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) label_order = { RELEVANCE_EXACT: 0, RELEVANCE_HIGH: 1, RELEVANCE_LOW: 2, RELEVANCE_IRRELEVANT: 3, } missing_relevant.sort( key=lambda item: ( label_order.get(str(item.get("label")), 9), -(float(item.get("rerank_score")) if item.get("rerank_score") is not None else float("-inf")), str(item.get("title") or ""), ) ) tips: List[str] = [] if auto_annotate: tips.append("Single-query evaluation used cached labels and refreshed missing labels for recalled results.") else: tips.append("Single-query evaluation used the offline annotation cache only; recalled SPUs without cached labels were treated as Irrelevant.") if label_stats["total"] == 0: tips.append("This query has no offline annotation set yet. Build or refresh labels first if you want stable evaluation.") if unlabeled_hits: tips.append(f"{unlabeled_hits} recalled results were not in the annotation set and were counted as Irrelevant.") if not missing_relevant: tips.append("No cached judged useful products were missed by this recall set.") return { "query": query, "tenant_id": self.tenant_id, "top_k": top_k, "metrics": compute_query_metrics(metric_labels, ideal_labels=ideal_labels), "metric_context": _metric_context_payload(), "results": labeled, "missing_relevant": missing_relevant, "label_stats": { **label_stats, "unlabeled_hits_treated_irrelevant": unlabeled_hits, "recalled_hits": len(labeled), "missing_relevant_count": len(missing_relevant), "missing_exact_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_EXACT), "missing_high_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_HIGH), "missing_low_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_LOW), }, "tips": tips, "total": int(search_payload.get("total") or 0), } def batch_evaluate( self, queries: Sequence[str], *, top_k: int = 100, auto_annotate: bool = True, language: str = "en", force_refresh_labels: bool = False, ) -> Dict[str, Any]: per_query = [] total_q = len(queries) _log.info("[batch-eval] starting %s queries top_k=%s auto_annotate=%s", total_q, top_k, auto_annotate) for q_index, query in enumerate(queries, start=1): live = self.evaluate_live_query( query, top_k=top_k, auto_annotate=auto_annotate, language=language, force_refresh_labels=force_refresh_labels, ) labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in live["results"] ] per_query.append( { "query": live["query"], "tenant_id": live["tenant_id"], "top_k": live["top_k"], "metrics": live["metrics"], "distribution": label_distribution(labels), "total": live["total"], } ) m = live["metrics"] _log.info( "[batch-eval] (%s/%s) query=%r NDCG@10=%s Strong_Precision@10=%s total_hits=%s", q_index, total_q, query, m.get("NDCG@10"), m.get("Strong_Precision@10"), live.get("total"), ) aggregate = aggregate_metrics([item["metrics"] for item in per_query]) aggregate_distribution = { RELEVANCE_EXACT: sum(item["distribution"][RELEVANCE_EXACT] for item in per_query), RELEVANCE_HIGH: sum(item["distribution"][RELEVANCE_HIGH] for item in per_query), RELEVANCE_LOW: sum(item["distribution"][RELEVANCE_LOW] for item in per_query), RELEVANCE_IRRELEVANT: sum(item["distribution"][RELEVANCE_IRRELEVANT] for item in per_query), } batch_id = f"batch_{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + '|'.join(queries))[:10]}" report_dir = ensure_dir(self.artifact_root / "batch_reports") config_snapshot_path = report_dir / f"{batch_id}_config.json" config_snapshot = self.search_client.get_json("/admin/config", timeout=20) config_snapshot_path.write_text(json.dumps(config_snapshot, ensure_ascii=False, indent=2), encoding="utf-8") output_json_path = report_dir / f"{batch_id}.json" report_md_path = report_dir / f"{batch_id}.md" payload = { "batch_id": batch_id, "created_at": utc_now_iso(), "tenant_id": self.tenant_id, "queries": list(queries), "top_k": top_k, "aggregate_metrics": aggregate, "metric_context": _metric_context_payload(), "aggregate_distribution": aggregate_distribution, "per_query": per_query, "config_snapshot_path": str(config_snapshot_path), } output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") report_md_path.write_text(render_batch_report_markdown(payload), encoding="utf-8") self.store.insert_batch_run(batch_id, self.tenant_id, output_json_path, report_md_path, config_snapshot_path, payload) _log.info( "[batch-eval] finished batch_id=%s per_query=%s json=%s", batch_id, len(per_query), output_json_path, ) return payload