# eval_search_quality.py 8.1 KB
#!/usr/bin/env python3
"""
Run search quality evaluation against real tenant indexes and emit JSON/Markdown reports.

Usage:
  source activate.sh
  python scripts/eval_search_quality.py
"""

from __future__ import annotations

import json
import sys
import zlib
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List

# Directory one level above the directory containing this script.  It is put
# at the front of sys.path (once) before the `api` / `context` imports below
# run — presumably so those packages resolve when the script is launched
# directly; confirm against the repository layout.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from api.app import get_searcher, init_service
from context import create_request_context


# Evaluation queries keyed by tenant id (as a string).  `main` runs every
# query in each list against the corresponding tenant, in the order given.
# The lists mix Chinese-only, English-only and combined Chinese/English
# query strings.
DEFAULT_QUERIES_BY_TENANT: Dict[str, List[str]] = {
    "0": [
        "连衣裙",
        "dress",
        "dress 连衣裙",
        "maxi dress 长裙",
        "波西米亚连衣裙",
        "T恤",
        "graphic tee 图案T恤",
        "shirt",
        "礼服衬衫",
        "hoodie 卫衣",
        "连帽卫衣",
        "sweatshirt",
        "牛仔裤",
        "jeans",
        "阔腿牛仔裤",
        "毛衣 sweater",
        "cardigan 开衫",
        "jacket 外套",
        "puffer jacket 羽绒服",
        "飞行员夹克",
    ],
    "162": [
        "连衣裙",
        "dress",
        "dress 连衣裙",
        "T恤",
        "shirt",
        "hoodie 卫衣",
        "牛仔裤",
        "jeans",
        "毛衣 sweater",
        "jacket 外套",
        "娃娃衣服",
        "芭比裙子",
        "连衣短裙芭比",
        "公主大裙",
        "晚礼服芭比",
        "毛衣熊",
        "服饰饰品",
        "鞋子",
        "军人套",
        "陆军套",
    ],
}


@dataclass
class RankedItem:
    """One row of a query's ranked result list, with per-result debug scores.

    Instances are built in ``_evaluate_query`` and converted to plain dicts
    with ``dataclasses.asdict`` for the report.  Every ``*_score`` field is
    read from the per-result debug entry through ``_to_float`` and is ``None``
    when the entry has no such value or the value is not convertible to float.
    """

    # 1-based position of the result in the returned list.
    rank: int
    # Result identifier, stringified; "" when the result has no `spu_id`.
    spu_id: str
    # Display strings selected by `_pick_text` with language "zh".
    title: str
    vendor: str
    # Score components copied from the debug entry under the same key names.
    # NOTE(review): their exact semantics are defined by the searcher, not here.
    es_score: float | None
    rerank_score: float | None
    text_score: float | None
    text_source_score: float | None
    text_translation_score: float | None
    text_fallback_score: float | None
    text_primary_score: float | None
    text_support_score: float | None
    knn_score: float | None
    fused_score: float | None
    # Raw `matched_queries` value from the debug entry, passed through as-is.
    matched_queries: Any


def _pick_text(value: Any, language: str = "zh") -> str:
    if value is None:
        return ""
    if isinstance(value, dict):
        return str(value.get(language) or value.get("zh") or value.get("en") or "").strip()
    return str(value).strip()


def _to_float(value: Any) -> float | None:
    try:
        if value is None:
            return None
        return float(value)
    except (TypeError, ValueError):
        return None


def _evaluate_query(searcher, tenant_id: str, query: str) -> Dict[str, Any]:
    """Run one debug-enabled search and collect its top 20 results.

    The search is issued with ``size=20``, ``from_=0``, ``language="zh"``,
    ``debug=True`` and ``enable_rerank=True``.  Each returned result is paired
    with its entry from ``debug_info["per_result"]`` (matched on ``spu_id``)
    so the individual score components can be reported next to it.

    Args:
        searcher: Object whose ``search`` method accepts the keyword arguments
            used below; ``main`` passes the value of ``get_searcher()``.
        tenant_id: Tenant to search in; also embedded in the request id.
        query: Query text to evaluate.

    Returns:
        Dict with keys ``query``, ``tenant_id``, ``total``, ``max_score``,
        ``took_ms``, ``query_analysis``, ``stage_timings`` and ``top20`` (a
        list of ``RankedItem`` fields as plain dicts, in result order).
    """
    # ``hash(str)`` is salted per interpreter process, so it would give the
    # same query a different request id on every run.  CRC32 of the UTF-8
    # bytes is stable, which lets a request id be matched across runs.
    # ``errors="replace"`` keeps this from raising on unencodable input.
    query_digest = zlib.crc32(query.encode("utf-8", "replace")) % 1000000
    context = create_request_context(
        reqid=f"eval-{tenant_id}-{query_digest}",
        uid="codex",
    )
    result = searcher.search(
        query=query,
        tenant_id=tenant_id,
        size=20,
        from_=0,
        context=context,
        debug=True,
        language="zh",
        enable_rerank=True,
    )

    # debug_info may be None/empty; normalise once instead of at every use.
    debug_info = result.debug_info or {}
    per_result_debug = debug_info.get("per_result") or []
    # Index debug entries by stringified spu_id; entries that are not dicts
    # or carry no spu_id are ignored.
    debug_by_spu_id = {
        str(item.get("spu_id")): item
        for item in per_result_debug
        if isinstance(item, dict) and item.get("spu_id") is not None
    }

    ranked_items: List[RankedItem] = []
    for rank, spu in enumerate(result.results[:20], 1):
        spu_id = str(getattr(spu, "spu_id", ""))
        # Results without a debug entry get all-None scores.
        debug_item = debug_by_spu_id.get(spu_id, {})
        ranked_items.append(
            RankedItem(
                rank=rank,
                spu_id=spu_id,
                title=_pick_text(getattr(spu, "title", None), language="zh"),
                vendor=_pick_text(getattr(spu, "vendor", None), language="zh"),
                es_score=_to_float(debug_item.get("es_score")),
                rerank_score=_to_float(debug_item.get("rerank_score")),
                text_score=_to_float(debug_item.get("text_score")),
                text_source_score=_to_float(debug_item.get("text_source_score")),
                text_translation_score=_to_float(debug_item.get("text_translation_score")),
                text_fallback_score=_to_float(debug_item.get("text_fallback_score")),
                text_primary_score=_to_float(debug_item.get("text_primary_score")),
                text_support_score=_to_float(debug_item.get("text_support_score")),
                knn_score=_to_float(debug_item.get("knn_score")),
                fused_score=_to_float(debug_item.get("fused_score")),
                matched_queries=debug_item.get("matched_queries"),
            )
        )

    return {
        "query": query,
        "tenant_id": tenant_id,
        "total": result.total,
        "max_score": result.max_score,
        "took_ms": result.took_ms,
        "query_analysis": debug_info.get("query_analysis") or {},
        "stage_timings": debug_info.get("stage_timings") or {},
        "top20": [asdict(item) for item in ranked_items],
    }


def _render_markdown(report: Dict[str, Any]) -> str:
    lines: List[str] = []
    lines.append(f"# Search Quality Evaluation")
    lines.append("")
    lines.append(f"- Generated at: {report['generated_at']}")
    lines.append(f"- Queries per tenant: {report['queries_per_tenant']}")
    lines.append("")
    for tenant_id, entries in report["tenants"].items():
        lines.append(f"## Tenant {tenant_id}")
        lines.append("")
        for entry in entries:
            qa = entry.get("query_analysis") or {}
            lines.append(f"### Query: {entry['query']}")
            lines.append("")
            lines.append(
                f"- total={entry['total']} max_score={entry['max_score']:.6f} took_ms={entry['took_ms']}"
            )
            lines.append(
                f"- detected_language={qa.get('detected_language')} search_langs={qa.get('search_langs')} supplemental_search_langs={qa.get('supplemental_search_langs')}"
            )
            lines.append(f"- query_text_by_lang={qa.get('query_text_by_lang')}")
            lines.append("")
            lines.append("| rank | spu_id | title | fused | rerank | text | text_src | text_trans | text_fb | knn | es | matched_queries |")
            lines.append("| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |")
            for item in entry.get("top20", []):
                title = str(item.get("title", "")).replace("|", "/")
                matched = json.dumps(item.get("matched_queries"), ensure_ascii=False)
                matched = matched.replace("|", "/")
                lines.append(
                    f"| {item.get('rank')} | {item.get('spu_id')} | {title} | "
                    f"{item.get('fused_score')} | {item.get('rerank_score')} | {item.get('text_score')} | "
                    f"{item.get('text_source_score')} | {item.get('text_translation_score')} | "
                    f"{item.get('text_fallback_score')} | {item.get('knn_score')} | {item.get('es_score')} | {matched} |"
                )
            lines.append("")
    return "\n".join(lines)


def main() -> None:
    """Evaluate every default query and write JSON and Markdown reports.

    Calls ``init_service("http://localhost:9200")``, runs each query in
    ``DEFAULT_QUERIES_BY_TENANT`` through ``_evaluate_query`` and writes
    ``search_eval_<timestamp>.json`` and ``search_eval_<timestamp>.md`` into
    ``artifacts/search_eval`` (relative to the current working directory,
    created if missing).  Progress and the output paths are printed to stdout.
    """
    init_service("http://localhost:9200")
    searcher = get_searcher()

    tenants_report: Dict[str, List[Dict[str, Any]]] = {}
    for tenant_id, queries in DEFAULT_QUERIES_BY_TENANT.items():
        tenant_entries: List[Dict[str, Any]] = []
        for query in queries:
            print(f"[eval] tenant={tenant_id} query={query}")
            tenant_entries.append(_evaluate_query(searcher, tenant_id, query))
        tenants_report[tenant_id] = tenant_entries

    # Read the clock once so the report's ``generated_at`` and the timestamp
    # in the file names always describe the same instant.
    generated_at = datetime.now(timezone.utc)
    report = {
        "generated_at": generated_at.isoformat(),
        "queries_per_tenant": {tenant: len(queries) for tenant, queries in DEFAULT_QUERIES_BY_TENANT.items()},
        "tenants": tenants_report,
    }

    out_dir = Path("artifacts/search_eval")
    out_dir.mkdir(parents=True, exist_ok=True)
    timestamp = generated_at.strftime("%Y%m%dT%H%M%SZ")
    json_path = out_dir / f"search_eval_{timestamp}.json"
    md_path = out_dir / f"search_eval_{timestamp}.md"
    json_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
    md_path.write_text(_render_markdown(report), encoding="utf-8")
    print(f"[done] json={json_path}")
    print(f"[done] md={md_path}")


# Run the evaluation only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()