"""Core orchestration: corpus, rerank, LLM labels, live/batch evaluation.""" from __future__ import annotations import json import logging import time from pathlib import Path from typing import Any, Dict, List, Sequence, Tuple import requests from elasticsearch.helpers import scan from api.app import get_app_config, get_es_client, init_service from indexer.mapping_generator import get_tenant_index_name from .clients import DashScopeLabelClient, RerankServiceClient, SearchServiceClient from .constants import ( DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO, DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO, DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK, DEFAULT_REBUILD_LLM_BATCH_SIZE, DEFAULT_REBUILD_MAX_LLM_BATCHES, DEFAULT_REBUILD_MIN_LLM_BATCHES, DEFAULT_RERANK_HIGH_SKIP_COUNT, DEFAULT_RERANK_HIGH_THRESHOLD, DEFAULT_SEARCH_RECALL_TOP_K, RELEVANCE_EXACT, RELEVANCE_HIGH, RELEVANCE_IRRELEVANT, RELEVANCE_LOW, RELEVANCE_NON_IRRELEVANT, VALID_LABELS, ) from .metrics import aggregate_metrics, compute_query_metrics, label_distribution from .reports import render_batch_report_markdown from .store import EvalStore, QueryBuildResult from .utils import ( build_display_title, build_rerank_doc, compact_option_values, compact_product_payload, ensure_dir, sha1_text, utc_now_iso, utc_timestamp, zh_title_from_multilingual, ) _log = logging.getLogger("search_eval.framework") def _zh_titles_from_debug_per_result(debug_info: Any) -> Dict[str, str]: """Map ``spu_id`` -> Chinese title from ``debug_info.per_result[].title_multilingual``.""" out: Dict[str, str] = {} if not isinstance(debug_info, dict): return out for entry in debug_info.get("per_result") or []: if not isinstance(entry, dict): continue spu_id = str(entry.get("spu_id") or "").strip() if not spu_id: continue zh = zh_title_from_multilingual(entry.get("title_multilingual")) if zh: out[spu_id] = zh return out class SearchEvaluationFramework: def __init__( self, tenant_id: str, artifact_root: Path | None = None, search_base_url: str | None = 
None, *, judge_model: str | None = None, enable_thinking: bool | None = None, use_dashscope_batch: bool | None = None, intent_model: str | None = None, intent_enable_thinking: bool | None = None, ): app_cfg = get_app_config() se = app_cfg.search_evaluation init_service(app_cfg.infrastructure.elasticsearch.host) self.tenant_id = str(tenant_id) self.artifact_root = ensure_dir(artifact_root if artifact_root is not None else se.artifact_root) self.store = EvalStore(self.artifact_root / "search_eval.sqlite3") sb = search_base_url if search_base_url is not None else se.search_base_url self.search_client = SearchServiceClient(sb, self.tenant_id) rerank_service_url = str( app_cfg.services.rerank.providers["http"]["instances"]["default"]["service_url"] ) self.rerank_client = RerankServiceClient(rerank_service_url) llm_cfg = app_cfg.services.translation.capabilities["llm"] api_key = app_cfg.infrastructure.secrets.dashscope_api_key if not api_key: raise RuntimeError("dashscope_api_key is required for search evaluation annotation") model = str(judge_model if judge_model is not None else se.judge_model) et = se.judge_enable_thinking if enable_thinking is None else enable_thinking use_batch = se.judge_dashscope_batch if use_dashscope_batch is None else use_dashscope_batch batch_window = se.judge_batch_completion_window batch_poll = float(se.judge_batch_poll_interval_sec) self.label_client = DashScopeLabelClient( model=model, base_url=str(llm_cfg["base_url"]), api_key=str(api_key), batch_completion_window=batch_window, batch_poll_interval_sec=batch_poll, enable_thinking=et, use_batch=use_batch, ) intent_m = str(intent_model if intent_model is not None else se.intent_model) intent_et = se.intent_enable_thinking if intent_enable_thinking is None else intent_enable_thinking self.intent_client = DashScopeLabelClient( model=intent_m, base_url=str(llm_cfg["base_url"]), api_key=str(api_key), batch_completion_window=batch_window, batch_poll_interval_sec=batch_poll, 
enable_thinking=bool(intent_et), use_batch=False, ) self._query_intent_cache: Dict[str, str] = {} def _ensure_query_intent_block(self, query: str) -> str: if query not in self._query_intent_cache: text, _raw = self.intent_client.query_intent(query) self._query_intent_cache[query] = str(text or "").strip() return self._query_intent_cache[query] def audit_live_query( self, query: str, *, top_k: int = 100, language: str = "en", auto_annotate: bool = False, ) -> Dict[str, Any]: live = self.evaluate_live_query(query=query, top_k=top_k, auto_annotate=auto_annotate, language=language) labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in live["results"] ] return { "query": query, "tenant_id": self.tenant_id, "top_k": top_k, "metrics": live["metrics"], "distribution": label_distribution(labels), "query_profile": None, "suspicious": [], "results": live["results"], } def queries_from_file(self, path: Path) -> List[str]: return [ line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip() and not line.strip().startswith("#") ] def corpus_docs(self, refresh: bool = False) -> List[Dict[str, Any]]: if not refresh and self.store.has_corpus(self.tenant_id): return self.store.get_corpus_docs(self.tenant_id) es_client = get_es_client().client index_name = get_tenant_index_name(self.tenant_id) docs: List[Dict[str, Any]] = [] for hit in scan( client=es_client, index=index_name, query={ "_source": [ "spu_id", "title", "vendor", "category_path", "category_name", "image_url", "skus", "tags", ], "query": {"match_all": {}}, }, size=500, preserve_order=False, clear_scroll=True, ): source = dict(hit.get("_source") or {}) source["spu_id"] = str(source.get("spu_id") or hit.get("_id") or "") docs.append(source) self.store.upsert_corpus_docs(self.tenant_id, docs) return docs def full_corpus_rerank( self, query: str, docs: Sequence[Dict[str, Any]], batch_size: int = 80, force_refresh: bool = False, ) -> List[Dict[str, Any]]: 
        """Score every doc against ``query`` and return them sorted by score (desc).

        Scores are cached per (tenant, query) in the store; only uncached docs hit
        the rerank service. Docs that still have no score sort last (``-inf``).
        """
        cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query)
        pending: List[Dict[str, Any]] = [doc for doc in docs if str(doc.get("spu_id")) not in cached]
        if pending:
            new_scores: Dict[str, float] = {}
            for start in range(0, len(pending), batch_size):
                batch = pending[start : start + batch_size]
                scores = self._rerank_batch_with_retry(query=query, docs=batch)
                if len(scores) != len(batch):
                    raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs")
                for doc, score in zip(batch, scores):
                    new_scores[str(doc.get("spu_id"))] = float(score)
            self.store.upsert_rerank_scores(
                self.tenant_id,
                query,
                new_scores,
                model_name="qwen3_vllm_score",
            )
            cached.update(new_scores)
        ranked = []
        for doc in docs:
            spu_id = str(doc.get("spu_id"))
            # Missing score -> -inf so unscored docs sort to the bottom.
            ranked.append({"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc})
        ranked.sort(key=lambda item: item["score"], reverse=True)
        return ranked

    def full_corpus_rerank_outside_exclude(
        self,
        query: str,
        docs: Sequence[Dict[str, Any]],
        exclude_spu_ids: set[str],
        batch_size: int = 80,
        force_refresh: bool = False,
    ) -> List[Dict[str, Any]]:
        """Rerank all corpus docs whose spu_id is not in ``exclude_spu_ids``; excluded IDs are not scored via API."""
        exclude_spu_ids = {str(x) for x in exclude_spu_ids}
        cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query)
        pending: List[Dict[str, Any]] = [
            doc
            for doc in docs
            if str(doc.get("spu_id")) not in exclude_spu_ids
            and str(doc.get("spu_id"))
            and (force_refresh or str(doc.get("spu_id")) not in cached)
        ]
        if pending:
            new_scores: Dict[str, float] = {}
            for start in range(0, len(pending), batch_size):
                batch = pending[start : start + batch_size]
                scores = self._rerank_batch_with_retry(query=query, docs=batch)
                if len(scores) != len(batch):
                    raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs")
                for doc, score in zip(batch, scores):
                    new_scores[str(doc.get("spu_id"))] = float(score)
            self.store.upsert_rerank_scores(
                self.tenant_id,
                query,
                new_scores,
                model_name="qwen3_vllm_score",
            )
            cached.update(new_scores)
        ranked: List[Dict[str, Any]] = []
        for doc in docs:
            spu_id = str(doc.get("spu_id") or "")
            if not spu_id or spu_id in exclude_spu_ids:
                continue
            ranked.append(
                {"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc}
            )
        ranked.sort(key=lambda item: item["score"], reverse=True)
        return ranked

    def _rerank_batch_with_retry(self, query: str, docs: Sequence[Dict[str, Any]]) -> List[float]:
        """Score ``docs`` via the rerank service, bisecting the batch on failure.

        Small batches (<=6) retry one-by-one; a single doc that still fails is
        assigned -1.0 (deliberate best-effort: one bad doc must not abort a
        full-corpus pass).
        """
        if not docs:
            return []
        doc_texts = [build_rerank_doc(doc) for doc in docs]
        try:
            scores, _meta = self.rerank_client.rerank(query=query, docs=doc_texts, normalize=False)
            return scores
        except Exception:
            if len(docs) == 1:
                return [-1.0]
            if len(docs) <= 6:
                scores: List[float] = []
                for doc in docs:
                    scores.extend(self._rerank_batch_with_retry(query, [doc]))
                return scores
            mid = len(docs) // 2
            left = self._rerank_batch_with_retry(query, docs[:mid])
            right = self._rerank_batch_with_retry(query, docs[mid:])
            return left + right

    def annotate_missing_labels(
        self,
        query: str,
        docs: Sequence[Dict[str, Any]],
        force_refresh: bool = False,
    ) -> Dict[str, str]:
        """LLM-label every doc in ``docs`` that lacks a cached label; return the merged spu_id->label map.

        ``force_refresh=True`` ignores the cache and relabels everything. Labels
        are persisted per sub-batch as they arrive.
        """
        labels = {} if force_refresh else self.store.get_labels(self.tenant_id, query)
        missing_docs = [doc for doc in docs if str(doc.get("spu_id")) not in labels]
        if not missing_docs:
            return labels
        for start in range(0, len(missing_docs), self.label_client.batch_size):
            batch = missing_docs[start : start + self.label_client.batch_size]
            batch_pairs = self._classify_with_retry(query, batch, force_refresh=force_refresh)
            for sub_labels, raw_response, sub_batch in batch_pairs:
                to_store = {str(doc.get("spu_id")): label for doc, label in zip(sub_batch, sub_labels)}
                self.store.upsert_labels(
                    self.tenant_id,
                    query,
                    to_store,
                    judge_model=self.label_client.model,
                    raw_response=raw_response,
                )
                labels.update(to_store)
            # Brief pause between LLM batches -- presumably to be gentle on rate limits.
            time.sleep(0.1)
        return labels

    def _classify_with_retry(
        self,
        query: str,
        docs: Sequence[Dict[str, Any]],
        *,
        force_refresh: bool = False,
    ) -> List[Tuple[List[str], str, Sequence[Dict[str, Any]]]]:
        """Classify ``docs`` via the judge LLM, bisecting the batch on failure.

        Returns a list of ``(labels, raw_response, sub_batch)`` triples so callers
        can persist each sub-batch with its raw LLM response. A single doc that
        still fails re-raises.
        """
        if not docs:
            return []
        try:
            intent_block = self._ensure_query_intent_block(query)
            labels, raw_response = self.label_client.classify_batch(
                query, docs, query_intent_block=intent_block
            )
            return [(labels, raw_response, docs)]
        except Exception:
            if len(docs) == 1:
                raise
            mid = len(docs) // 2
            # Split-and-retry: each half is classified independently.
            return self._classify_with_retry(query, docs[:mid], force_refresh=force_refresh) + self._classify_with_retry(query, docs[mid:], force_refresh=force_refresh)

    def _annotate_rebuild_batches(
        self,
        query: str,
        ordered_docs: Sequence[Dict[str, Any]],
        *,
        batch_size: int = DEFAULT_REBUILD_LLM_BATCH_SIZE,
        min_batches: int = DEFAULT_REBUILD_MIN_LLM_BATCHES,
        max_batches: int = DEFAULT_REBUILD_MAX_LLM_BATCHES,
        irrelevant_stop_ratio: float = DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO,
        irrelevant_low_combined_stop_ratio: float = DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO,
        stop_streak: int = DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK,
        force_refresh: bool = True,
    ) -> Tuple[Dict[str, str], List[Dict[str, Any]]]:
        """LLM-label ``ordered_docs`` in fixed-size batches along list order.

        **Early stop** (only after ``min_batches`` full batches have completed):
        Per batch, let *n* = batch size, and count labels among docs in that batch only.

        - *bad batch* iff **both** (strict ``>``):
          - ``#(Irrelevant)/n > irrelevant_stop_ratio`` (default 0.939), and
          - ``( #(Irrelevant) + #(Low Relevant) ) / n > irrelevant_low_combined_stop_ratio``
            (default 0.959; weak relevance = ``RELEVANCE_LOW``).

        Maintain a streak of consecutive *bad* batches; any non-bad batch resets the
        streak to 0. Stop labeling when ``streak >= stop_streak`` (default 3) or when
        ``max_batches`` is reached or the ordered list is exhausted.

        Constants for defaults: ``eval_framework.constants`` (``DEFAULT_REBUILD_*``).
        """
        batch_logs: List[Dict[str, Any]] = []
        streak = 0
        # Seed with any labels already persisted for this query.
        labels: Dict[str, str] = dict(self.store.get_labels(self.tenant_id, query))
        total_ordered = len(ordered_docs)
        for batch_idx in range(max_batches):
            start = batch_idx * batch_size
            batch_docs = list(ordered_docs[start : start + batch_size])
            if not batch_docs:
                break
            batch_pairs = self._classify_with_retry(query, batch_docs, force_refresh=force_refresh)
            for sub_labels, raw_response, sub_batch in batch_pairs:
                to_store = {str(doc.get("spu_id")): label for doc, label in zip(sub_batch, sub_labels)}
                self.store.upsert_labels(
                    self.tenant_id,
                    query,
                    to_store,
                    judge_model=self.label_client.model,
                    raw_response=raw_response,
                )
                labels.update(to_store)
            time.sleep(0.1)
            # Per-batch label ratios drive both logging and the early-stop rule.
            n = len(batch_docs)
            exact_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_EXACT)
            irrel_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_IRRELEVANT)
            low_n = sum(1 for doc in batch_docs if labels.get(str(doc.get("spu_id"))) == RELEVANCE_LOW)
            exact_ratio = exact_n / n if n else 0.0
            irrelevant_ratio = irrel_n / n if n else 0.0
            low_ratio = low_n / n if n else 0.0
            irrel_low_ratio = (irrel_n + low_n) / n if n else 0.0
            log_entry = {
                "batch_index": batch_idx + 1,
                "size": n,
                "exact_ratio": round(exact_ratio, 6),
                "irrelevant_ratio": round(irrelevant_ratio, 6),
                "low_ratio": round(low_ratio, 6),
                "irrelevant_plus_low_ratio": round(irrel_low_ratio, 6),
                "offset_start": start,
                "offset_end": min(start + n, total_ordered),
            }
            batch_logs.append(log_entry)
            _log.info(
                "[eval-rebuild] query=%r llm_batch=%s/%s size=%s exact_ratio=%.4f irrelevant_ratio=%.4f "
                "irrel_plus_low_ratio=%.4f",
                query,
                batch_idx + 1,
                max_batches,
                n,
                exact_ratio,
                irrelevant_ratio,
                irrel_low_ratio,
            )
            # Early-stop streak: only evaluated after min_batches (warm-up before trusting tail quality).
            if batch_idx + 1 >= min_batches:
                bad_batch = (irrelevant_ratio > irrelevant_stop_ratio) and (
                    irrel_low_ratio > irrelevant_low_combined_stop_ratio
                )
                if bad_batch:
                    streak += 1
                else:
                    streak = 0
                if streak >= stop_streak:
                    _log.info(
                        "[eval-rebuild] query=%r early_stop after %s batches (%s consecutive batches: "
                        "irrelevant>%s and irrel+low>%s)",
                        query,
                        batch_idx + 1,
                        stop_streak,
                        irrelevant_stop_ratio,
                        irrelevant_low_combined_stop_ratio,
                    )
                    break
        return labels, batch_logs

    def build_query_annotation_set(
        self,
        query: str,
        *,
        search_depth: int = 1000,
        rerank_depth: int = 10000,
        annotate_search_top_k: int = 120,
        annotate_rerank_top_k: int = 200,
        language: str = "en",
        force_refresh_rerank: bool = False,
        force_refresh_labels: bool = False,
        search_recall_top_k: int = DEFAULT_SEARCH_RECALL_TOP_K,
        rerank_high_threshold: float = DEFAULT_RERANK_HIGH_THRESHOLD,
        rerank_high_skip_count: int = DEFAULT_RERANK_HIGH_SKIP_COUNT,
        rebuild_llm_batch_size: int = DEFAULT_REBUILD_LLM_BATCH_SIZE,
        rebuild_min_batches: int = DEFAULT_REBUILD_MIN_LLM_BATCHES,
        rebuild_max_batches: int = DEFAULT_REBUILD_MAX_LLM_BATCHES,
        rebuild_irrelevant_stop_ratio: float = DEFAULT_REBUILD_IRRELEVANT_STOP_RATIO,
        rebuild_irrel_low_combined_stop_ratio: float = DEFAULT_REBUILD_IRREL_LOW_COMBINED_STOP_RATIO,
        rebuild_irrelevant_stop_streak: int = DEFAULT_REBUILD_IRRELEVANT_STOP_STREAK,
    ) -> QueryBuildResult:
        """Build per-query annotation pool and write ``query_builds/*.json``.

        Normal mode unions search + rerank windows and fills missing labels once.

        **Rebuild mode** (``force_refresh_labels=True``): full recall pool + corpus
        rerank outside pool, optional skip for "easy" queries, then batched LLM
        labeling with **early stop**; see ``_build_query_annotation_set_rebuild``
        and ``_annotate_rebuild_batches`` (docstring spells out the bad-batch /
        streak rule). Rebuild tuning knobs: ``rebuild_*`` and ``search_recall_top_k``
        parameters below; CLI mirrors them under ``build --force-refresh-labels``.
        """
""" if force_refresh_labels: return self._build_query_annotation_set_rebuild( query=query, search_depth=search_depth, rerank_depth=rerank_depth, language=language, force_refresh_rerank=force_refresh_rerank, search_recall_top_k=search_recall_top_k, rerank_high_threshold=rerank_high_threshold, rerank_high_skip_count=rerank_high_skip_count, rebuild_llm_batch_size=rebuild_llm_batch_size, rebuild_min_batches=rebuild_min_batches, rebuild_max_batches=rebuild_max_batches, rebuild_irrelevant_stop_ratio=rebuild_irrelevant_stop_ratio, rebuild_irrel_low_combined_stop_ratio=rebuild_irrel_low_combined_stop_ratio, rebuild_irrelevant_stop_streak=rebuild_irrelevant_stop_streak, ) search_payload = self.search_client.search(query=query, size=search_depth, from_=0, language=language) search_results = list(search_payload.get("results") or []) corpus = self.corpus_docs(refresh=False) full_rerank = self.full_corpus_rerank( query=query, docs=corpus, force_refresh=force_refresh_rerank, ) rerank_depth_effective = min(rerank_depth, len(full_rerank)) pool_docs: Dict[str, Dict[str, Any]] = {} for doc in search_results[:annotate_search_top_k]: pool_docs[str(doc.get("spu_id"))] = doc for item in full_rerank[:annotate_rerank_top_k]: pool_docs[str(item["spu_id"])] = item["doc"] labels = self.annotate_missing_labels( query=query, docs=list(pool_docs.values()), force_refresh=force_refresh_labels, ) search_labeled_results: List[Dict[str, Any]] = [] for rank, doc in enumerate(search_results, start=1): spu_id = str(doc.get("spu_id")) label = labels.get(spu_id) search_labeled_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": None, "label": label, "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) rerank_top_results: List[Dict[str, Any]] = [] for rank, item in enumerate(full_rerank[:rerank_depth_effective], start=1): doc = item["doc"] spu_id = 
str(item["spu_id"]) rerank_top_results.append( { "rank": rank, "spu_id": spu_id, "title": build_display_title(doc), "image_url": doc.get("image_url"), "rerank_score": round(float(item["score"]), 8), "label": labels.get(spu_id), "option_values": list(compact_option_values(doc.get("skus") or [])), "product": compact_product_payload(doc), } ) top100_labels = [ item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT for item in search_labeled_results[:100] ] metrics = compute_query_metrics(top100_labels) output_dir = ensure_dir(self.artifact_root / "query_builds") run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}" output_json_path = output_dir / f"{run_id}.json" payload = { "run_id": run_id, "created_at": utc_now_iso(), "tenant_id": self.tenant_id, "query": query, "config_meta": self.search_client.get_json("/admin/config/meta", timeout=20), "search_total": int(search_payload.get("total") or 0), "search_depth_requested": search_depth, "search_depth_effective": len(search_results), "rerank_depth_requested": rerank_depth, "rerank_depth_effective": rerank_depth_effective, "corpus_size": len(corpus), "annotation_pool": { "annotate_search_top_k": annotate_search_top_k, "annotate_rerank_top_k": annotate_rerank_top_k, "pool_size": len(pool_docs), }, "metrics_top100": metrics, "search_results": search_labeled_results, "full_rerank_top": rerank_top_results, } output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"]) return QueryBuildResult( query=query, tenant_id=self.tenant_id, search_total=int(search_payload.get("total") or 0), search_depth=len(search_results), rerank_corpus_size=len(corpus), annotated_count=len(pool_docs), output_json_path=output_json_path, ) def _build_query_annotation_set_rebuild( self, query: str, *, search_depth: int, rerank_depth: int, language: str, force_refresh_rerank: 
        search_recall_top_k: int,
        rerank_high_threshold: float,
        rerank_high_skip_count: int,
        rebuild_llm_batch_size: int,
        rebuild_min_batches: int,
        rebuild_max_batches: int,
        rebuild_irrelevant_stop_ratio: float,
        rebuild_irrel_low_combined_stop_ratio: float,
        rebuild_irrelevant_stop_streak: int,
    ) -> QueryBuildResult:
        # Recall pool = top search_recall_top_k search hits; the rest of the corpus
        # is reranked separately (pool docs get a fixed pseudo-score of 1.0).
        search_size = max(int(search_depth), int(search_recall_top_k))
        search_payload = self.search_client.search(query=query, size=search_size, from_=0, language=language)
        search_results = list(search_payload.get("results") or [])
        recall_n = min(int(search_recall_top_k), len(search_results))
        pool_search_docs = search_results[:recall_n]
        pool_spu_ids = {str(d.get("spu_id")) for d in pool_search_docs if str(d.get("spu_id") or "").strip()}
        corpus = self.corpus_docs(refresh=False)
        corpus_by_id = {str(d.get("spu_id")): d for d in corpus if str(d.get("spu_id") or "").strip()}
        rerank_pending_n = sum(
            1 for d in corpus if str(d.get("spu_id") or "").strip() and str(d.get("spu_id")) not in pool_spu_ids
        )
        _log.info(
            "[eval-rebuild] query=%r phase=rerank_outside_pool docs≈%s (pool=%s, force_refresh_rerank=%s); "
            "this can take a long time with no further logs until LLM batches start",
            query,
            rerank_pending_n,
            len(pool_spu_ids),
            force_refresh_rerank,
        )
        ranked_outside = self.full_corpus_rerank_outside_exclude(
            query=query,
            docs=corpus,
            exclude_spu_ids=pool_spu_ids,
            force_refresh=force_refresh_rerank,
        )
        # High-scoring docs OUTSIDE the recall pool signal an "easy" query (see skip below).
        rerank_high_n = sum(1 for item in ranked_outside if float(item["score"]) > float(rerank_high_threshold))
        rebuild_meta: Dict[str, Any] = {
            "mode": "rebuild_v1",
            "search_recall_top_k": search_recall_top_k,
            "recall_pool_size": len(pool_spu_ids),
            "pool_rerank_score_assigned": 1.0,
            "rerank_high_threshold": rerank_high_threshold,
            "rerank_high_count_outside_pool": rerank_high_n,
            "rerank_high_skip_count": rerank_high_skip_count,
            "rebuild_llm_batch_size": rebuild_llm_batch_size,
            "rebuild_min_batches": rebuild_min_batches,
            "rebuild_max_batches": rebuild_max_batches,
            "rebuild_irrelevant_stop_ratio": rebuild_irrelevant_stop_ratio,
            "rebuild_irrel_low_combined_stop_ratio": rebuild_irrel_low_combined_stop_ratio,
            "rebuild_irrelevant_stop_streak": rebuild_irrelevant_stop_streak,
        }
        batch_logs: List[Dict[str, Any]] = []
        skipped = False
        skip_reason: str | None = None
        labels: Dict[str, str] = dict(self.store.get_labels(self.tenant_id, query))
        llm_labeled_total = 0
        if rerank_high_n > int(rerank_high_skip_count):
            # Too many relevant-looking docs outside the pool: labeling the tail
            # would be expensive and uninformative, so skip LLM labeling entirely.
            skipped = True
            skip_reason = "too_many_high_rerank_scores"
            _log.info(
                "[eval-rebuild] query=%r skip: rerank_score>%s outside recall pool count=%s > %s "
                "(relevant tail too large / query too easy to satisfy)",
                query,
                rerank_high_threshold,
                rerank_high_n,
                rerank_high_skip_count,
            )
        else:
            # Label order: recall pool first (search order), then reranked tail (score order).
            ordered_docs: List[Dict[str, Any]] = []
            seen_ordered: set[str] = set()
            for doc in pool_search_docs:
                sid = str(doc.get("spu_id") or "")
                if not sid or sid in seen_ordered:
                    continue
                seen_ordered.add(sid)
                ordered_docs.append(corpus_by_id.get(sid, doc))
            for item in ranked_outside:
                sid = str(item["spu_id"])
                if sid in seen_ordered:
                    continue
                seen_ordered.add(sid)
                ordered_docs.append(item["doc"])
            labels, batch_logs = self._annotate_rebuild_batches(
                query,
                ordered_docs,
                batch_size=rebuild_llm_batch_size,
                min_batches=rebuild_min_batches,
                max_batches=rebuild_max_batches,
                irrelevant_stop_ratio=rebuild_irrelevant_stop_ratio,
                irrelevant_low_combined_stop_ratio=rebuild_irrel_low_combined_stop_ratio,
                stop_streak=rebuild_irrelevant_stop_streak,
                force_refresh=True,
            )
            llm_labeled_total = sum(int(entry.get("size") or 0) for entry in batch_logs)
        rebuild_meta["skipped"] = skipped
        rebuild_meta["skip_reason"] = skip_reason
        rebuild_meta["llm_batch_logs"] = batch_logs
        rebuild_meta["llm_labeled_total"] = llm_labeled_total
        rerank_depth_effective = min(int(rerank_depth), len(ranked_outside))
        search_labeled_results: List[Dict[str, Any]] = []
        for rank, doc in enumerate(search_results, start=1):
            spu_id = str(doc.get("spu_id"))
            in_pool = rank <= recall_n
            search_labeled_results.append(
                {
                    "rank": rank,
                    "spu_id": spu_id,
                    "title": build_display_title(doc),
                    "image_url": doc.get("image_url"),
                    "rerank_score": 1.0 if in_pool else None,
                    "label": labels.get(spu_id),
                    "option_values": list(compact_option_values(doc.get("skus") or [])),
                    "product": compact_product_payload(doc),
                }
            )
        rerank_top_results: List[Dict[str, Any]] = []
        for rank, item in enumerate(ranked_outside[:rerank_depth_effective], start=1):
            doc = item["doc"]
            spu_id = str(item["spu_id"])
            rerank_top_results.append(
                {
                    "rank": rank,
                    "spu_id": spu_id,
                    "title": build_display_title(doc),
                    "image_url": doc.get("image_url"),
                    "rerank_score": round(float(item["score"]), 8),
                    "label": labels.get(spu_id),
                    "option_values": list(compact_option_values(doc.get("skus") or [])),
                    "product": compact_product_payload(doc),
                }
            )
        top100_labels = [
            item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT
            for item in search_labeled_results[:100]
        ]
        metrics = compute_query_metrics(top100_labels)
        output_dir = ensure_dir(self.artifact_root / "query_builds")
        run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}"
        output_json_path = output_dir / f"{run_id}.json"
        pool_docs_count = len(pool_spu_ids) + len(ranked_outside)
        payload = {
            "run_id": run_id,
            "created_at": utc_now_iso(),
            "tenant_id": self.tenant_id,
            "query": query,
            "config_meta": self.search_client.get_json("/admin/config/meta", timeout=20),
            "search_total": int(search_payload.get("total") or 0),
            "search_depth_requested": search_depth,
            "search_depth_effective": len(search_results),
            "rerank_depth_requested": rerank_depth,
            "rerank_depth_effective": rerank_depth_effective,
            "corpus_size": len(corpus),
            "annotation_pool": {
                "rebuild": rebuild_meta,
                "ordered_union_size": pool_docs_count,
            },
            "metrics_top100": metrics,
            "search_results": search_labeled_results,
            "full_rerank_top": rerank_top_results,
        }
        output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"])
        return QueryBuildResult(
            query=query,
            tenant_id=self.tenant_id,
            search_total=int(search_payload.get("total") or 0),
            search_depth=len(search_results),
            rerank_corpus_size=len(corpus),
            annotated_count=llm_labeled_total if not skipped else 0,
            output_json_path=output_json_path,
        )

    def evaluate_live_query(
        self,
        query: str,
        top_k: int = 100,
        auto_annotate: bool = False,
        language: str = "en",
        force_refresh_labels: bool = False,
    ) -> Dict[str, Any]:
        """Evaluate a live search against cached labels; optionally annotate missing ones.

        Recalled SPUs without a cached label are treated as Irrelevant for metrics.
        Also reports cached non-irrelevant products that this recall set missed.
        """
        # Always fetch at least 100 so the standard top-100 metrics stay comparable.
        search_payload = self.search_client.search(
            query=query, size=max(top_k, 100), from_=0, language=language, debug=True
        )
        zh_by_spu = _zh_titles_from_debug_per_result(search_payload.get("debug_info"))
        results = list(search_payload.get("results") or [])
        if auto_annotate:
            self.annotate_missing_labels(query=query, docs=results[:top_k], force_refresh=force_refresh_labels)
        labels = self.store.get_labels(self.tenant_id, query)
        recalled_spu_ids = {str(doc.get("spu_id")) for doc in results[:top_k]}
        labeled = []
        unlabeled_hits = 0
        for rank, doc in enumerate(results[:top_k], start=1):
            spu_id = str(doc.get("spu_id"))
            label = labels.get(spu_id)
            if label not in VALID_LABELS:
                unlabeled_hits += 1
            primary_title = build_display_title(doc)
            # Prefer the zh title from debug_info; fall back to the doc's multilingual title.
            title_zh = zh_by_spu.get(spu_id) or ""
            if not title_zh and isinstance(doc.get("title"), dict):
                title_zh = zh_title_from_multilingual(doc.get("title"))
            labeled.append(
                {
                    "rank": rank,
                    "spu_id": spu_id,
                    "title": primary_title,
                    "title_zh": title_zh if title_zh and title_zh != primary_title else "",
                    "image_url": doc.get("image_url"),
                    "label": label,
                    "option_values": list(compact_option_values(doc.get("skus") or [])),
                    "product": compact_product_payload(doc),
                }
            )
        metric_labels = [
            item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT
            for item in labeled
        ]
        label_stats = self.store.get_query_label_stats(self.tenant_id, query)
        rerank_scores = self.store.get_rerank_scores(self.tenant_id, query)
        # Cached non-irrelevant SPUs that were NOT recalled in the top_k window.
        relevant_missing_ids = [
            spu_id
            for spu_id, label in labels.items()
            if label in RELEVANCE_NON_IRRELEVANT and spu_id not in recalled_spu_ids
        ]
        missing_docs_map = self.store.get_corpus_docs_by_spu_ids(self.tenant_id, relevant_missing_ids)
        missing_relevant = []
        for spu_id in relevant_missing_ids:
            doc = missing_docs_map.get(spu_id)
            if not doc:
                continue
            miss_title = build_display_title(doc)
            miss_zh = zh_title_from_multilingual(doc.get("title")) if isinstance(doc.get("title"), dict) else ""
            missing_relevant.append(
                {
                    "spu_id": spu_id,
                    "label": labels[spu_id],
                    "rerank_score": rerank_scores.get(spu_id),
                    "title": miss_title,
                    "title_zh": miss_zh if miss_zh and miss_zh != miss_title else "",
                    "image_url": doc.get("image_url"),
                    "option_values": list(compact_option_values(doc.get("skus") or [])),
                    "product": compact_product_payload(doc),
                }
            )
        # Sort: strongest label first, then rerank score desc (unscored last), then title.
        label_order = {
            RELEVANCE_EXACT: 0,
            RELEVANCE_HIGH: 1,
            RELEVANCE_LOW: 2,
            RELEVANCE_IRRELEVANT: 3,
        }
        missing_relevant.sort(
            key=lambda item: (
                label_order.get(str(item.get("label")), 9),
                -(float(item.get("rerank_score")) if item.get("rerank_score") is not None else float("-inf")),
                str(item.get("title") or ""),
            )
        )
        tips: List[str] = []
        if auto_annotate:
            tips.append("Single-query evaluation used cached labels and refreshed missing labels for recalled results.")
        else:
            tips.append("Single-query evaluation used the offline annotation cache only; recalled SPUs without cached labels were treated as Irrelevant.")
        if label_stats["total"] == 0:
            tips.append("This query has no offline annotation set yet. Build or refresh labels first if you want stable evaluation.")
        if unlabeled_hits:
            tips.append(f"{unlabeled_hits} recalled results were not in the annotation set and were counted as Irrelevant.")
        if not missing_relevant:
            tips.append("No cached non-irrelevant products were missed by this recall set.")
        return {
            "query": query,
            "tenant_id": self.tenant_id,
            "top_k": top_k,
            "metrics": compute_query_metrics(metric_labels),
            "results": labeled,
            "missing_relevant": missing_relevant,
            "label_stats": {
                **label_stats,
                "unlabeled_hits_treated_irrelevant": unlabeled_hits,
                "recalled_hits": len(labeled),
                "missing_relevant_count": len(missing_relevant),
                "missing_exact_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_EXACT),
                "missing_high_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_HIGH),
                "missing_low_count": sum(1 for item in missing_relevant if item["label"] == RELEVANCE_LOW),
            },
            "tips": tips,
            "total": int(search_payload.get("total") or 0),
        }

    def batch_evaluate(
        self,
        queries: Sequence[str],
        *,
        top_k: int = 100,
        auto_annotate: bool = True,
        language: str = "en",
        force_refresh_labels: bool = False,
    ) -> Dict[str, Any]:
        """Evaluate each query live, aggregate metrics, and write JSON + Markdown reports.

        Artifacts go to ``<artifact_root>/batch_reports``: per-batch JSON payload,
        rendered Markdown report, and a snapshot of the live search config. The
        run is also recorded in the store. Returns the JSON payload.
        """
        per_query = []
        total_q = len(queries)
        _log.info("[batch-eval] starting %s queries top_k=%s auto_annotate=%s", total_q, top_k, auto_annotate)
        for q_index, query in enumerate(queries, start=1):
            live = self.evaluate_live_query(
                query,
                top_k=top_k,
                auto_annotate=auto_annotate,
                language=language,
                force_refresh_labels=force_refresh_labels,
            )
            labels = [
                item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT
                for item in live["results"]
            ]
            per_query.append(
                {
                    "query": live["query"],
                    "tenant_id": live["tenant_id"],
                    "top_k": live["top_k"],
                    "metrics": live["metrics"],
                    "distribution": label_distribution(labels),
                    "total": live["total"],
                }
            )
            m = live["metrics"]
            _log.info(
                "[batch-eval] (%s/%s) query=%r P@10=%s MAP_3=%s total_hits=%s",
                q_index,
                total_q,
                query,
                m.get("P@10"),
                m.get("MAP_3"),
                live.get("total"),
            )
        aggregate = aggregate_metrics([item["metrics"] for item in per_query])
        # Label counts summed across all queries (each distribution has every label key).
        aggregate_distribution = {
            RELEVANCE_EXACT: sum(item["distribution"][RELEVANCE_EXACT] for item in per_query),
            RELEVANCE_HIGH: sum(item["distribution"][RELEVANCE_HIGH] for item in per_query),
            RELEVANCE_LOW: sum(item["distribution"][RELEVANCE_LOW] for item in per_query),
            RELEVANCE_IRRELEVANT: sum(item["distribution"][RELEVANCE_IRRELEVANT] for item in per_query),
        }
        batch_id = f"batch_{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + '|'.join(queries))[:10]}"
        report_dir = ensure_dir(self.artifact_root / "batch_reports")
        # Snapshot the live search config so the run is reproducible/auditable.
        config_snapshot_path = report_dir / f"{batch_id}_config.json"
        config_snapshot = self.search_client.get_json("/admin/config", timeout=20)
        config_snapshot_path.write_text(json.dumps(config_snapshot, ensure_ascii=False, indent=2), encoding="utf-8")
        output_json_path = report_dir / f"{batch_id}.json"
        report_md_path = report_dir / f"{batch_id}.md"
        payload = {
            "batch_id": batch_id,
            "created_at": utc_now_iso(),
            "tenant_id": self.tenant_id,
            "queries": list(queries),
            "top_k": top_k,
            "aggregate_metrics": aggregate,
            "aggregate_distribution": aggregate_distribution,
            "per_query": per_query,
            "config_snapshot_path": str(config_snapshot_path),
        }
        output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        report_md_path.write_text(render_batch_report_markdown(payload), encoding="utf-8")
        self.store.insert_batch_run(batch_id, self.tenant_id, output_json_path, report_md_path, config_snapshot_path, payload)
        _log.info(
            "[batch-eval] finished batch_id=%s per_query=%s json=%s",
            batch_id,
            len(per_query),
            output_json_path,
        )
        return payload