From 432d1c88efc7d32fd4ed509b3e6ad6d80acf4342 Mon Sep 17 00:00:00 2001 From: tangwang Date: Tue, 31 Mar 2026 17:12:01 +0800 Subject: [PATCH] 评估框架 --- config/config.yaml | 589 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- scripts/evaluation/README.md | 105 --------------------------------------------------------------------------------------------------------- scripts/evaluation/README_Requirement.md | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/evaluation/README_Requirement_zh.md | 123 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/evaluation/README_zh.md | 109 ------------------------------------------------------------------------------------------------------------- scripts/evaluation/build_annotation_set.py | 14 ++++++++++++++ scripts/evaluation/eval_framework.py | 1786 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/evaluation/serve_eval_web.py | 14 ++++++++++++++ scripts/evaluation/tune_fusion.py | 296 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 2580 insertions(+), 561 deletions(-) delete mode 100644 scripts/evaluation/README.md create mode 100644 scripts/evaluation/README_Requirement.md create mode 100644 scripts/evaluation/README_Requirement_zh.md delete mode 100644 scripts/evaluation/README_zh.md create mode 100644 scripts/evaluation/build_annotation_set.py create mode 100644 scripts/evaluation/eval_framework.py create mode 100644 scripts/evaluation/serve_eval_web.py create mode 100644 scripts/evaluation/tune_fusion.py diff --git a/config/config.yaml b/config/config.yaml index 83a2479..b2c1b0a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,35 +1,25 @@ -# Unified Configuration for Multi-Tenant Search Engine -# 统一配置文件,所有租户共用一套配置 -# 注意:索引结构由 mappings/search_products.json 定义,此文件只配置搜索行为 -# -# 约定:下列键为必填;进程环境变量可覆盖 infrastructure / runtime 中同名语义项 -#(如 ES_HOST、API_PORT 等),未设置环境变量时使用本文件中的值。 - -# Process / bind addresses (环境变量 APP_ENV、RUNTIME_ENV、ES_INDEX_NAMESPACE 可覆盖前两者的语义) runtime: - environment: "prod" - index_namespace: "" - api_host: "0.0.0.0" + environment: prod + index_namespace: '' + api_host: 0.0.0.0 api_port: 6002 - indexer_host: "0.0.0.0" + indexer_host: 0.0.0.0 indexer_port: 6004 - embedding_host: "0.0.0.0" + embedding_host: 0.0.0.0 embedding_port: 6005 embedding_text_port: 6005 embedding_image_port: 6008 - translator_host: "0.0.0.0" + translator_host: 0.0.0.0 translator_port: 6006 - reranker_host: "0.0.0.0" + reranker_host: 0.0.0.0 reranker_port: 6007 - -# 基础设施连接(敏感项优先读环境变量:ES_*、REDIS_*、DB_*、DASHSCOPE_API_KEY、DEEPL_AUTH_KEY) infrastructure: elasticsearch: - host: "http://localhost:9200" + host: http://localhost:9200 username: null password: null redis: - host: "localhost" + host: localhost port: 6479 snapshot_db: 0 password: null @@ -37,8 +27,8 @@ infrastructure: socket_connect_timeout: 1 retry_on_timeout: false cache_expire_days: 720 - embedding_cache_prefix: "embedding" - anchor_cache_prefix: "product_anchors" + embedding_cache_prefix: embedding + anchor_cache_prefix: product_anchors anchor_cache_expire_days: 30 database: host: null @@ -49,30 +39,16 @@ infrastructure: secrets: dashscope_api_key: null deepl_auth_key: null - -# Elasticsearch Index -es_index_name: "search_products" - -# 检索域 / 索引列表(可为空列表;每项字段均需显式给出) +es_index_name: search_products indexes: [] - -# Config assets assets: - query_rewrite_dictionary_path: "config/dictionaries/query_rewrite.dict" - -# Product content understanding (LLM enrich-content) configuration + query_rewrite_dictionary_path: config/dictionaries/query_rewrite.dict product_enrich: max_workers: 40 - -# ES Index Settings (基础设置) es_settings: number_of_shards: 1 number_of_replicas: 0 - refresh_interval: "30s" - -# 字段权重配置(用于搜索时的字段boost) -# 统一按“字段基名”配置;查询时按实际检索语言动态拼接 .{lang}。 -# 若需要按某个语言单独调权,也可以加显式 key(例如 title.de: 3.2)。 + refresh_interval: 30s field_boosts: title: 3.0 qanchors: 2.5 @@ -85,79 +61,64 @@ field_boosts: option1_values: 1.5 option2_values: 1.5 option3_values: 1.5 - -# Query Configuration(查询配置) query_config: - # 支持的语言 supported_languages: - - "zh" - - "en" - default_language: "en" - - # 功能开关(翻译开关由tenant_config控制) + - zh + - en + default_language: en enable_text_embedding: true enable_query_rewrite: true - - # 查询翻译模型(须与 services.translation.capabilities 中某项一致) - # 源语种在租户 index_languages 内:主召回可打在源语种字段,用下面三项。 - zh_to_en_model: "nllb-200-distilled-600m" # "opus-mt-zh-en" - en_to_zh_model: "nllb-200-distilled-600m" # "opus-mt-en-zh" - default_translation_model: "nllb-200-distilled-600m" - # zh_to_en_model: "deepl" - # en_to_zh_model: "deepl" - # default_translation_model: "deepl" - # 源语种不在 index_languages:翻译对可检索文本更关键,可单独指定(缺省则与上一组相同) - zh_to_en_model__source_not_in_index: "nllb-200-distilled-600m" - en_to_zh_model__source_not_in_index: "nllb-200-distilled-600m" - default_translation_model__source_not_in_index: "nllb-200-distilled-600m" - # zh_to_en_model__source_not_in_index: "deepl" - # en_to_zh_model__source_not_in_index: "deepl" - # default_translation_model__source_not_in_index: "deepl" - - # 查询解析阶段:翻译与 query 向量并发执行,共用同一等待预算(毫秒)。 - # 检测语言已在租户 index_languages 内:较短;不在索引语言内:较长(翻译对召回更关键)。 - translation_embedding_wait_budget_ms_source_in_index: 200 # 80 - translation_embedding_wait_budget_ms_source_not_in_index: 300 #200 - + zh_to_en_model: nllb-200-distilled-600m + en_to_zh_model: nllb-200-distilled-600m + default_translation_model: nllb-200-distilled-600m + zh_to_en_model__source_not_in_index: nllb-200-distilled-600m + en_to_zh_model__source_not_in_index: nllb-200-distilled-600m + default_translation_model__source_not_in_index: nllb-200-distilled-600m + translation_embedding_wait_budget_ms_source_in_index: 200 + translation_embedding_wait_budget_ms_source_not_in_index: 300 style_intent: enabled: true selected_sku_boost: 1.2 - color_dictionary_path: "config/dictionaries/style_intent_color.csv" - size_dictionary_path: "config/dictionaries/style_intent_size.csv" + color_dictionary_path: config/dictionaries/style_intent_color.csv + size_dictionary_path: config/dictionaries/style_intent_size.csv dimension_aliases: - color: ["color", "colors", "colour", "colours", "颜色", "色", "色系"] - size: ["size", "sizes", "sizing", "尺码", "尺寸", "码数", "号码", "码"] - + color: + - color + - colors + - colour + - colours + - 颜色 + - 色 + - 色系 + size: + - size + - sizes + - sizing + - 尺码 + - 尺寸 + - 码数 + - 号码 + - 码 product_title_exclusion: enabled: true - dictionary_path: "config/dictionaries/product_title_exclusion.tsv" - - # 动态多语言检索字段配置 - # multilingual_fields 会被拼成 title.{lang}/brief.{lang}/... 形式; - # shared_fields 为无语言后缀字段。 + dictionary_path: config/dictionaries/product_title_exclusion.tsv search_fields: multilingual_fields: - - "title" - - "qanchors" - - "category_path" - - "category_name_text" - - "brief" - - "description" - - "vendor" - shared_fields: - # - "tags" - # - "option1_values" - # - "option2_values" - # - "option3_values" + - title + - qanchors + - category_path + - category_name_text + - brief + - description + - vendor + shared_fields: null core_multilingual_fields: - - "title" - - "qanchors" - - "category_name_text" - - # 统一文本召回策略(主查询 + 翻译查询) + - title + - qanchors + - category_name_text text_query_strategy: - base_minimum_should_match: "60%" - translation_minimum_should_match: "60%" + base_minimum_should_match: 60% + translation_minimum_should_match: 60% translation_boost: 0.75 tie_breaker_base_query: 0.5 best_fields_boost: 2.0 @@ -169,67 +130,51 @@ query_config: title: 5.0 qanchors: 4.0 phrase_match_boost: 3.0 - - # Embedding字段名称 - text_embedding_field: "title_embedding" - image_embedding_field: "image_embedding.vector" - - # 返回字段配置(_source includes) - # null表示返回所有字段,[]表示不返回任何字段,列表表示只返回指定字段 - # 下列字段与 api/result_formatter.py(SpuResult 填充)及 search/searcher.py(SKU 排序/主图替换)一致 + text_embedding_field: title_embedding + image_embedding_field: image_embedding.vector source_fields: - - spu_id - - handle - - title - - brief - - description - - vendor - - category_name - - category_name_text - - category_path - - category_id - - category_level - - category1_name - - category2_name - - category3_name - - tags - - min_price - - compare_at_price - - image_url - - sku_prices - - sku_weights - - sku_weight_units - - total_inventory - - option1_name - - option1_values - - option2_name - - option2_values - - option3_name - - option3_values - - specifications - - skus - - # KNN:文本向量与多模态(图片)向量各自 boost 与召回(k / num_candidates) + - spu_id + - handle + - title + - brief + - description + - vendor + - category_name + - category_name_text + - category_path + - category_id + - category_level + - category1_name + - category2_name + - category3_name + - tags + - min_price + - compare_at_price + - image_url + - sku_prices + - sku_weights + - sku_weight_units + - total_inventory + - option1_name + - option1_values + - option2_name + - option2_values + - option3_name + - option3_values + - specifications + - skus knn_text_boost: 4 knn_image_boost: 4 - - # knn_text_num_candidates = k * 3.4 knn_text_k: 160 knn_text_num_candidates: 560 - knn_text_k_long: 400 knn_text_num_candidates_long: 1200 - knn_image_k: 400 knn_image_num_candidates: 1200 - -# Function Score配置(ES层打分规则) function_score: - score_mode: "sum" - boost_mode: "multiply" + score_mode: sum + boost_mode: multiply functions: [] - -# 粗排配置(仅融合 ES 文本/向量信号,不调用模型) coarse_rank: enabled: true input_window: 700 @@ -237,69 +182,52 @@ coarse_rank: fusion: text_bias: 0.1 text_exponent: 0.35 - # base_query_trans_* 相对 base_query 的权重(见 search/rerank_client 中文本 dismax 融合) - # 因为es的打分已经给了trans进行了折扣,所以这里不再继续折扣 text_translation_weight: 1.0 knn_text_weight: 1.0 knn_image_weight: 1.0 knn_tie_breaker: 0.1 knn_bias: 0.6 knn_exponent: 0.0 - -# 精排配置(轻量 reranker) fine_rank: enabled: false input_window: 160 output_window: 80 timeout_sec: 10.0 - rerank_query_template: "{query}" - rerank_doc_template: "{title}" - service_profile: "fine" - -# 重排配置(provider/URL 在 services.rerank) + rerank_query_template: '{query}' + rerank_doc_template: '{title}' + service_profile: fine rerank: enabled: true rerank_window: 160 timeout_sec: 15.0 weight_es: 0.4 weight_ai: 0.6 - rerank_query_template: "{query}" - rerank_doc_template: "{title}" - service_profile: "default" - # 乘法融合:fused = Π (max(score,0) + bias) ** exponent(rerank / text / knn 三项) - # 其中 knn_score 先做一层 dis_max: - # max(knn_text_weight * text_knn, knn_image_weight * image_knn) - # + knn_tie_breaker * 另一侧较弱信号 + rerank_query_template: '{query}' + rerank_doc_template: '{title}' + service_profile: default fusion: - rerank_bias: 0.00001 - rerank_exponent: 1.0 - fine_bias: 0.00001 + rerank_bias: 1.0e-05 + rerank_exponent: 1.15 + fine_bias: 1.0e-05 fine_exponent: 1.0 text_bias: 0.1 - text_exponent: 0.35 - # base_query_trans_* 相对 base_query 的权重(见 search/rerank_client 中文本 dismax 融合) - text_translation_weight: 1.0 + text_exponent: 0.25 + text_translation_weight: 0.8 knn_text_weight: 1.0 knn_image_weight: 1.0 knn_tie_breaker: 0.1 knn_bias: 0.6 knn_exponent: 0.0 - -# 可扩展服务/provider 注册表(单一配置源) services: translation: - service_url: "http://127.0.0.1:6006" - # default_model: "nllb-200-distilled-600m" - default_model: "nllb-200-distilled-600m" - default_scene: "general" + service_url: http://127.0.0.1:6006 + default_model: nllb-200-distilled-600m + default_scene: general timeout_sec: 10.0 cache: ttl_seconds: 62208000 sliding_expiration: true - # When false, cache keys are exact-match per request model only (ignores model_quality_tiers for lookups). enable_model_quality_tier_cache: true - # Higher tier = better quality. Multiple models may share one tier (同级). - # A request may reuse Redis keys from models with tier > A or tier == A (not from lower tiers). model_quality_tiers: deepl: 30 qwen-mt: 30 @@ -310,43 +238,43 @@ services: capabilities: qwen-mt: enabled: true - backend: "qwen_mt" - model: "qwen-mt-flash" - base_url: "https://dashscope-us.aliyuncs.com/compatible-mode/v1" + backend: qwen_mt + model: qwen-mt-flash + base_url: https://dashscope-us.aliyuncs.com/compatible-mode/v1 timeout_sec: 10.0 use_cache: true llm: enabled: true - backend: "llm" - model: "qwen-flash" - base_url: "https://dashscope-us.aliyuncs.com/compatible-mode/v1" + backend: llm + model: qwen-flash + base_url: https://dashscope-us.aliyuncs.com/compatible-mode/v1 timeout_sec: 30.0 use_cache: true deepl: enabled: true - backend: "deepl" - api_url: "https://api.deepl.com/v2/translate" + backend: deepl + api_url: https://api.deepl.com/v2/translate timeout_sec: 10.0 - glossary_id: "" + glossary_id: '' use_cache: true nllb-200-distilled-600m: enabled: true - backend: "local_nllb" - model_id: "facebook/nllb-200-distilled-600M" - model_dir: "./models/translation/facebook/nllb-200-distilled-600M" - ct2_model_dir: "./models/translation/facebook/nllb-200-distilled-600M/ctranslate2-float16" - ct2_compute_type: "float16" - ct2_conversion_quantization: "float16" + backend: local_nllb + model_id: facebook/nllb-200-distilled-600M + model_dir: ./models/translation/facebook/nllb-200-distilled-600M + ct2_model_dir: ./models/translation/facebook/nllb-200-distilled-600M/ctranslate2-float16 + ct2_compute_type: float16 + ct2_conversion_quantization: float16 ct2_auto_convert: true ct2_inter_threads: 4 ct2_intra_threads: 0 ct2_max_queued_batches: 32 - ct2_batch_type: "examples" - ct2_decoding_length_mode: "source" + ct2_batch_type: examples + ct2_decoding_length_mode: source ct2_decoding_length_extra: 8 ct2_decoding_length_min: 32 - device: "cuda" - torch_dtype: "float16" + device: cuda + torch_dtype: float16 batch_size: 64 max_input_length: 256 max_new_tokens: 64 @@ -354,19 +282,19 @@ services: use_cache: true opus-mt-zh-en: enabled: false - backend: "local_marian" - model_id: "Helsinki-NLP/opus-mt-zh-en" - model_dir: "./models/translation/Helsinki-NLP/opus-mt-zh-en" - ct2_model_dir: "./models/translation/Helsinki-NLP/opus-mt-zh-en/ctranslate2-float16" - ct2_compute_type: "float16" - ct2_conversion_quantization: "float16" + backend: local_marian + model_id: Helsinki-NLP/opus-mt-zh-en + model_dir: ./models/translation/Helsinki-NLP/opus-mt-zh-en + ct2_model_dir: ./models/translation/Helsinki-NLP/opus-mt-zh-en/ctranslate2-float16 + ct2_compute_type: float16 + ct2_conversion_quantization: float16 ct2_auto_convert: true ct2_inter_threads: 1 ct2_intra_threads: 0 ct2_max_queued_batches: 0 - ct2_batch_type: "examples" - device: "cuda" - torch_dtype: "float16" + ct2_batch_type: examples + device: cuda + torch_dtype: float16 batch_size: 16 max_input_length: 256 max_new_tokens: 256 @@ -374,181 +302,147 @@ services: use_cache: true opus-mt-en-zh: enabled: false - backend: "local_marian" - model_id: "Helsinki-NLP/opus-mt-en-zh" - model_dir: "./models/translation/Helsinki-NLP/opus-mt-en-zh" - ct2_model_dir: "./models/translation/Helsinki-NLP/opus-mt-en-zh/ctranslate2-float16" - ct2_compute_type: "float16" - ct2_conversion_quantization: "float16" + backend: local_marian + model_id: Helsinki-NLP/opus-mt-en-zh + model_dir: ./models/translation/Helsinki-NLP/opus-mt-en-zh + ct2_model_dir: ./models/translation/Helsinki-NLP/opus-mt-en-zh/ctranslate2-float16 + ct2_compute_type: float16 + ct2_conversion_quantization: float16 ct2_auto_convert: true ct2_inter_threads: 1 ct2_intra_threads: 0 ct2_max_queued_batches: 0 - ct2_batch_type: "examples" - device: "cuda" - torch_dtype: "float16" + ct2_batch_type: examples + device: cuda + torch_dtype: float16 batch_size: 16 max_input_length: 256 max_new_tokens: 256 num_beams: 1 use_cache: true embedding: - provider: "http" # http + provider: http providers: http: - text_base_url: "http://127.0.0.1:6005" - image_base_url: "http://127.0.0.1:6008" - # 服务内文本后端(embedding 进程启动时读取) - backend: "tei" # tei | local_st + text_base_url: http://127.0.0.1:6005 + image_base_url: http://127.0.0.1:6008 + backend: tei backends: tei: - base_url: "http://127.0.0.1:8080" + base_url: http://127.0.0.1:8080 timeout_sec: 20 - model_id: "Qwen/Qwen3-Embedding-0.6B" + model_id: Qwen/Qwen3-Embedding-0.6B local_st: - model_id: "Qwen/Qwen3-Embedding-0.6B" - device: "cuda" + model_id: Qwen/Qwen3-Embedding-0.6B + device: cuda batch_size: 32 normalize_embeddings: true - # 服务内图片后端(embedding 进程启动时读取;cnclip gRPC 与 6008 须同一 model_name) - # Chinese-CLIP:ViT-H-14 → 1024 维,ViT-L-14 → 768 维。须与 mappings/search_products.json 中 - # image_embedding.vector.dims 一致(当前索引为 1024 → 默认 ViT-H-14)。 - image_backend: "clip_as_service" # clip_as_service | local_cnclip + image_backend: clip_as_service image_backends: clip_as_service: - server: "grpc://127.0.0.1:51000" - model_name: "CN-CLIP/ViT-L-14" + server: grpc://127.0.0.1:51000 + model_name: CN-CLIP/ViT-L-14 batch_size: 8 normalize_embeddings: true local_cnclip: - model_name: "ViT-L-14" + model_name: ViT-L-14 device: null batch_size: 8 normalize_embeddings: true rerank: - provider: "http" + provider: http providers: http: instances: default: - base_url: "http://127.0.0.1:6007" - service_url: "http://127.0.0.1:6007/rerank" + base_url: http://127.0.0.1:6007 + service_url: http://127.0.0.1:6007/rerank fine: - base_url: "http://127.0.0.1:6009" - service_url: "http://127.0.0.1:6009/rerank" + base_url: http://127.0.0.1:6009 + service_url: http://127.0.0.1:6009/rerank request: max_docs: 1000 normalize: true - default_instance: "default" - # 命名实例:同一套 reranker 代码按实例名读取不同端口 / 后端 / runtime 目录。 + default_instance: default instances: default: - host: "0.0.0.0" + host: 0.0.0.0 port: 6007 - backend: "qwen3_vllm_score" - runtime_dir: "./.runtime/reranker/default" + backend: qwen3_vllm_score + runtime_dir: ./.runtime/reranker/default fine: - host: "0.0.0.0" + host: 0.0.0.0 port: 6009 - backend: "bge" - runtime_dir: "./.runtime/reranker/fine" + backend: bge + runtime_dir: ./.runtime/reranker/fine backends: bge: - model_name: "BAAI/bge-reranker-v2-m3" + model_name: BAAI/bge-reranker-v2-m3 device: null use_fp16: true batch_size: 80 max_length: 160 - cache_dir: "./model_cache" + cache_dir: ./model_cache enable_warmup: true jina_reranker_v3: - model_name: "jinaai/jina-reranker-v3" + model_name: jinaai/jina-reranker-v3 device: null - dtype: "float16" + dtype: float16 batch_size: 64 max_doc_length: 160 max_query_length: 64 sort_by_doc_length: true - cache_dir: "./model_cache" + cache_dir: ./model_cache trust_remote_code: true qwen3_vllm: - model_name: "Qwen/Qwen3-Reranker-0.6B" - engine: "vllm" + model_name: Qwen/Qwen3-Reranker-0.6B + engine: vllm max_model_len: 256 tensor_parallel_size: 1 - gpu_memory_utilization: 0.20 - dtype: "float16" + gpu_memory_utilization: 0.2 + dtype: float16 enable_prefix_caching: true enforce_eager: false infer_batch_size: 100 sort_by_doc_length: true - # standard=_format_instruction__standard(固定 yes/no system);compact=_format_instruction(instruction 作 system 且 user 内重复 Instruct) - instruction_format: standard # compact standard - # instruction: "Given a query, score the product for relevance" - # "rank products by given query" 比 “Given a query, score the product for relevance” 更好点 - # instruction: "rank products by given query, category match first" - # instruction: "Rank products by query relevance, prioritizing category match" - # instruction: "Rank products by query relevance, prioritizing category and style match" - # instruction: "Rank by query relevance, prioritize category & style" - # instruction: "Relevance ranking: category & style match first" - # instruction: "Score product relevance by query with category & style match prioritized" - # instruction: "Rank products by query with category & style match prioritized" - # instruction: "Given a fashion shopping query, retrieve relevant products that answer the query" - instruction: "rank products by given query" - # vLLM LLM.score()(跨编码打分)。独立高性能环境 .venv-reranker-score(vllm 0.18 固定版):./scripts/setup_reranker_venv.sh qwen3_vllm_score - # 与 qwen3_vllm 可共用同一 model_name / HF 缓存;venv 分离以便升级 vLLM 而不影响 generate 后端。 + instruction_format: standard + instruction: rank products by given query qwen3_vllm_score: - model_name: "Qwen/Qwen3-Reranker-0.6B" - # 官方 Hub 原版需 true;若改用已转换的 seq-cls 权重(如 tomaarsen/...-seq-cls)则设为 false + model_name: Qwen/Qwen3-Reranker-0.6B use_original_qwen3_hf_overrides: true - # vllm_runner: "auto" - # vllm_convert: "auto" - # 可选:在 use_original_qwen3_hf_overrides 为 true 时与内置 overrides 合并 - # hf_overrides: {} - engine: "vllm" + engine: vllm max_model_len: 172 tensor_parallel_size: 1 gpu_memory_utilization: 0.15 - dtype: "float16" + dtype: float16 enable_prefix_caching: true enforce_eager: false infer_batch_size: 80 sort_by_doc_length: true - # 默认 standard 与 vLLM 官方 Qwen3 reranker 前缀一致 - instruction_format: standard # compact standard - # instruction: "Rank products by query with category & style match prioritized" - instruction: "Rank products by query with category & style match prioritized" - # instruction: "Given a shopping query, rank products by relevance" + instruction_format: standard + instruction: Rank products by query with category & style match prioritized qwen3_transformers: - model_name: "Qwen/Qwen3-Reranker-0.6B" - instruction: "rank products by given query" - # instruction: "Score the product’s relevance to the given query" + model_name: Qwen/Qwen3-Reranker-0.6B + instruction: rank products by given query max_length: 8192 batch_size: 64 use_fp16: true - # sdpa:默认无需 flash-attn;若已安装 flash_attn 可改为 flash_attention_2 - attn_implementation: "sdpa" - # Packed Transformers backend: shared query prefix + custom position_ids/attention_mask. - # For 1 query + many short docs (for example 400 product titles), this usually reduces - # repeated prefix work and padding waste compared with pairwise batching. + attn_implementation: sdpa qwen3_transformers_packed: - model_name: "Qwen/Qwen3-Reranker-0.6B" - instruction: "Rank products by query with category & style match prioritized" + model_name: Qwen/Qwen3-Reranker-0.6B + instruction: Rank products by query with category & style match prioritized max_model_len: 256 max_doc_len: 160 max_docs_per_pack: 0 use_fp16: true sort_by_doc_length: true - # Packed mode relies on a custom 4D attention mask. "eager" is the safest default. - # If your torch/transformers stack validates it, you can benchmark "sdpa". - attn_implementation: "eager" + attn_implementation: eager qwen3_gguf: - repo_id: "DevQuasar/Qwen.Qwen3-Reranker-4B-GGUF" - filename: "*Q8_0.gguf" - cache_dir: "./model_cache" - local_dir: "./models/reranker/qwen3-reranker-4b-gguf" - instruction: "Rank products by query with category & style match prioritized" - # T4 16GB / 性能优先配置:全量层 offload,实测比保守配置明显更快 + repo_id: DevQuasar/Qwen.Qwen3-Reranker-4B-GGUF + filename: '*Q8_0.gguf' + cache_dir: ./model_cache + local_dir: ./models/reranker/qwen3-reranker-4b-gguf + instruction: Rank products by query with category & style match prioritized n_ctx: 512 n_batch: 512 n_ubatch: 512 @@ -562,17 +456,15 @@ services: use_mlock: false infer_batch_size: 8 sort_by_doc_length: true - length_sort_mode: "char" + length_sort_mode: char enable_warmup: true verbose: false qwen3_gguf_06b: - repo_id: "ggml-org/Qwen3-Reranker-0.6B-Q8_0-GGUF" - filename: "qwen3-reranker-0.6b-q8_0.gguf" - cache_dir: "./model_cache" - local_dir: "./models/reranker/qwen3-reranker-0.6b-q8_0-gguf" - instruction: "Rank products by query with category & style match prioritized" - # 0.6B GGUF / online rerank baseline: - # 实测 400 titles 单请求约 265s,因此它更适合作为低显存功能后备,不适合在线低延迟主路由。 + repo_id: ggml-org/Qwen3-Reranker-0.6B-Q8_0-GGUF + filename: qwen3-reranker-0.6b-q8_0.gguf + cache_dir: ./model_cache + local_dir: ./models/reranker/qwen3-reranker-0.6b-q8_0-gguf + instruction: Rank products by query with category & style match prioritized n_ctx: 256 n_batch: 256 n_ubatch: 256 @@ -586,54 +478,57 @@ services: use_mlock: false infer_batch_size: 32 sort_by_doc_length: true - length_sort_mode: "char" + length_sort_mode: char reuse_query_state: false enable_warmup: true verbose: false dashscope_rerank: - model_name: "qwen3-rerank" - # 按地域选择 endpoint: - # 中国: https://dashscope.aliyuncs.com/compatible-api/v1/reranks - # 新加坡: https://dashscope-intl.aliyuncs.com/compatible-api/v1/reranks - # 美国: https://dashscope-us.aliyuncs.com/compatible-api/v1/reranks - endpoint: "https://dashscope.aliyuncs.com/compatible-api/v1/reranks" - api_key_env: "RERANK_DASHSCOPE_API_KEY_CN" - timeout_sec: 10.0 # - top_n_cap: 0 # 0 表示 top_n=当前请求文档数;>0 则限制 top_n 上限 - batchsize: 64 # 0 关闭;>0 启用并发小包调度(top_n/top_n_cap 仍生效,分包后全局截断) - instruct: "Given a shopping query, rank product titles by relevance" + model_name: qwen3-rerank + endpoint: https://dashscope.aliyuncs.com/compatible-api/v1/reranks + api_key_env: RERANK_DASHSCOPE_API_KEY_CN + timeout_sec: 10.0 + top_n_cap: 0 + batchsize: 64 + instruct: Given a shopping query, rank product titles by relevance max_retries: 2 retry_backoff_sec: 0.2 - -# SPU配置(已启用,使用嵌套skus) spu_config: enabled: true - spu_field: "spu_id" + spu_field: spu_id inner_hits_size: 10 - # 配置哪些option维度参与检索(进索引、以及在线搜索) - # 格式为list,选择option1/option2/option3中的一个或多个 - searchable_option_dimensions: ['option1', 'option2', 'option3'] - -# 租户配置(Tenant Configuration) -# 每个租户可配置主语言 primary_language 与索引语言 index_languages(主市场语言,商家可勾选) -# 默认 index_languages: [en, zh],可配置为任意 SOURCE_LANG_CODE_MAP.keys() 的子集 + searchable_option_dimensions: + - option1 + - option2 + - option3 tenant_config: default: - primary_language: "en" - index_languages: ["en", "zh"] + primary_language: en + index_languages: + - en + - zh tenants: - "1": - primary_language: "zh" - index_languages: ["zh", "en"] - "2": - primary_language: "en" - index_languages: ["en", "zh"] - "3": - primary_language: "zh" - index_languages: ["zh", "en"] - "162": - primary_language: "zh" - index_languages: ["zh", "en"] - "170": - primary_language: "en" - index_languages: ["en", "zh"] + '1': + primary_language: zh + index_languages: + - zh + - en + '2': + primary_language: en + index_languages: + - en + - zh + '3': + primary_language: zh + index_languages: + - zh + - en + '162': + primary_language: zh + index_languages: + - zh + - en + '170': + primary_language: en + index_languages: + - en + - zh diff --git a/scripts/evaluation/README.md b/scripts/evaluation/README.md deleted file mode 100644 index 50efc48..0000000 --- a/scripts/evaluation/README.md +++ /dev/null @@ -1,105 +0,0 @@ -**Reference Materials:** - -1. Search Interface: - -```bash -export BASE_URL="${BASE_URL:-http://localhost:6002}" -export TENANT_ID="${TENANT_ID:-163}" # Change to your tenant ID -``` -```bash -curl -sS "$BASE_URL/search/" \ - -H "Content-Type: application/json" \ - -H "X-Tenant-ID: $TENANT_ID" \ - -d '{ - "query": "Barbie doll", - "size": 20, - "from": 0, - "language": "zh" - }' -``` - -response: -{ - "results": [ - { - "spu_id": "12345", - "title": "Barbie Fashion Doll", - "image_url": "https://example.com/image.jpg", - "specifications":[], - "skus":[{"sku_id":"... -... - -2. Reranking Service: -```bash -curl -X POST "http://localhost:6007/rerank" \ - -H "Content-Type: application/json" \ - -d '{ - "query": "toy Barbie", - "docs": ["12PCS 6 Types of Dolls with Bottles", "Cotton T-shirt Short Sleeve"], - "top_n":386, - "normalize": true - }' -``` - -3. Query by Specific Fields: `es_debug_search.py` - -**Main Tasks:** - -1. **Establish Evaluation Tooling:** - Note: To judge result quality, a unified evaluation tool must be used. Do not define keyword-matching rules per query to determine relevance—this is not scalable, prone to misjudgment, complicated, and difficult to extend to other search terms. - Therefore, build a search result evaluation tool and a multi-result comparison tool, to be called by the subsequent annotation set construction tool. The internal implementation may call an LLM to judge, clearly defining what counts as highly relevant, partially relevant, and irrelevant. - - Prompt: - ``` - You are an e-commerce search result relevance evaluation assistant. Based on the user query and each product's information, output the relevance level for the product. - - ## Relevance Level Criteria - Exact — Fully matches the user's search intent. - Partial — Primary intent satisfied (same category or similar use, basically aligns with search intent), but secondary attributes (e.g., color, style, size) deviate from or cannot be confirmed against user needs. - Irrelevant — Category or use case mismatched, primary intent not satisfied. - - 1. {title1} {option1_value1} {option2_value1} {option3_value1} - 2. {title2} {option1_value2} {option2_value2}, {option3_value2} - ... - 50. {title50} {option1_value50} {option2_value50} {option3_value50} - - ## Output Format - Strictly output {input_nums} lines, each line containing exactly one of Exact / Partial / Irrelevant. They must correspond sequentially to the 50 products above. Do not output any other information. - ``` - -2. **Test Set (Result Annotation) Construction:** - Source: `@queries/queries.txt` - - For each query: - 1. Retrieval: - - Use the search interface to retrieve 1k results. - - Traverse the entire product database, obtain the title of each SPU, call the reranking model to perform full ranking, and obtain the top 10k results. Note: Reranking model scores must be cached (local file cache is sufficient; key = query + title -> rerank_score). - 2. For the above results, split into batches and call the LLM to annotate the results. - 3. Consider how to store the results to facilitate future comparison, usage, and presentation. - -3. **Evaluation Tool Web Page:** - Design a search evaluation interactive page on port 6010. - Page theme: a search box at the top. When a search is issued, the page below shows overall metrics for this result set and the top 100 results (with pagination). - - Overall Metrics: - | Metric | Meaning | - |--------|---------| - | **P@5, P@10, P@20, P@50** | Precision at top K where only level 3 (Exact) counts as relevant | - | **P@5_2_3 ~ P@50_2_3** | Precision at top K where both level 2 (Partial) and level 3 (Exact) count as relevant | - | **MAP_3** | Mean Average Precision when only level 3 (Exact) is relevant (single query) | - | **MAP_2_3** | Mean Average Precision when both level 2 and level 3 are relevant | - - Results List: - Displayed row by row. For each row, on the left side show the annotation value (three levels, also color-coded). Display the image, title (en), and the first SKU's option1/2/3 values (shown in three lines, these three lines aligned horizontally with the image on the left). - - Leftmost part of the evaluation page: - Queries default to `queries/queries.txt`. Populate them in a list box on the left. Click any query to trigger its search. - -4. **Batch Evaluation Tool:** - Provide a batch execution script. - Additionally, create a batch evaluation page. Click a "Batch Evaluation" button to sequentially search for all queries, then aggregate overall metrics and generate a report. The report name should include a timestamp and some key information. Also record the main search program's `config.yaml` at that time. - Carefully design how to switch between the two modes (single query evaluation vs batch evaluation) on the same port, supporting these two different interactive contents. - Batch evaluation focuses on the aggregated metrics across all search terms. - It needs to record the test environment timestamp, the corresponding configuration file, and the results. All historical evaluation records should be saved, and for each evaluation result, it should be possible to look up the corresponding configuration file and associated metrics. - -The above is my overall design, but there may be gaps. You should understand my requirements at a higher level. You have sufficient freedom to adjust the design appropriately, drawing on best practices in automated search evaluation frameworks, to produce a superior design and implementation. \ No newline at end of file diff --git a/scripts/evaluation/README_Requirement.md b/scripts/evaluation/README_Requirement.md new file mode 100644 index 0000000..50efc48 --- /dev/null +++ b/scripts/evaluation/README_Requirement.md @@ -0,0 +1,105 @@ +**Reference Materials:** + +1. Search Interface: + +```bash +export BASE_URL="${BASE_URL:-http://localhost:6002}" +export TENANT_ID="${TENANT_ID:-163}" # Change to your tenant ID +``` +```bash +curl -sS "$BASE_URL/search/" \ + -H "Content-Type: application/json" \ + -H "X-Tenant-ID: $TENANT_ID" \ + -d '{ + "query": "Barbie doll", + "size": 20, + "from": 0, + "language": "zh" + }' +``` + +response: +{ + "results": [ + { + "spu_id": "12345", + "title": "Barbie Fashion Doll", + "image_url": "https://example.com/image.jpg", + "specifications":[], + "skus":[{"sku_id":"... +... + +2. Reranking Service: +```bash +curl -X POST "http://localhost:6007/rerank" \ + -H "Content-Type: application/json" \ + -d '{ + "query": "toy Barbie", + "docs": ["12PCS 6 Types of Dolls with Bottles", "Cotton T-shirt Short Sleeve"], + "top_n":386, + "normalize": true + }' +``` + +3. Query by Specific Fields: `es_debug_search.py` + +**Main Tasks:** + +1. **Establish Evaluation Tooling:** + Note: To judge result quality, a unified evaluation tool must be used. Do not define keyword-matching rules per query to determine relevance—this is not scalable, prone to misjudgment, complicated, and difficult to extend to other search terms. + Therefore, build a search result evaluation tool and a multi-result comparison tool, to be called by the subsequent annotation set construction tool. The internal implementation may call an LLM to judge, clearly defining what counts as highly relevant, partially relevant, and irrelevant. + + Prompt: + ``` + You are an e-commerce search result relevance evaluation assistant. Based on the user query and each product's information, output the relevance level for the product. + + ## Relevance Level Criteria + Exact — Fully matches the user's search intent. + Partial — Primary intent satisfied (same category or similar use, basically aligns with search intent), but secondary attributes (e.g., color, style, size) deviate from or cannot be confirmed against user needs. + Irrelevant — Category or use case mismatched, primary intent not satisfied. + + 1. {title1} {option1_value1} {option2_value1} {option3_value1} + 2. {title2} {option1_value2} {option2_value2}, {option3_value2} + ... + 50. {title50} {option1_value50} {option2_value50} {option3_value50} + + ## Output Format + Strictly output {input_nums} lines, each line containing exactly one of Exact / Partial / Irrelevant. They must correspond sequentially to the 50 products above. Do not output any other information. + ``` + +2. **Test Set (Result Annotation) Construction:** + Source: `@queries/queries.txt` + + For each query: + 1. Retrieval: + - Use the search interface to retrieve 1k results. + - Traverse the entire product database, obtain the title of each SPU, call the reranking model to perform full ranking, and obtain the top 10k results. Note: Reranking model scores must be cached (local file cache is sufficient; key = query + title -> rerank_score). + 2. For the above results, split into batches and call the LLM to annotate the results. + 3. Consider how to store the results to facilitate future comparison, usage, and presentation. + +3. **Evaluation Tool Web Page:** + Design a search evaluation interactive page on port 6010. + Page theme: a search box at the top. When a search is issued, the page below shows overall metrics for this result set and the top 100 results (with pagination). + + Overall Metrics: + | Metric | Meaning | + |--------|---------| + | **P@5, P@10, P@20, P@50** | Precision at top K where only level 3 (Exact) counts as relevant | + | **P@5_2_3 ~ P@50_2_3** | Precision at top K where both level 2 (Partial) and level 3 (Exact) count as relevant | + | **MAP_3** | Mean Average Precision when only level 3 (Exact) is relevant (single query) | + | **MAP_2_3** | Mean Average Precision when both level 2 and level 3 are relevant | + + Results List: + Displayed row by row. For each row, on the left side show the annotation value (three levels, also color-coded). Display the image, title (en), and the first SKU's option1/2/3 values (shown in three lines, these three lines aligned horizontally with the image on the left). + + Leftmost part of the evaluation page: + Queries default to `queries/queries.txt`. Populate them in a list box on the left. Click any query to trigger its search. + +4. **Batch Evaluation Tool:** + Provide a batch execution script. + Additionally, create a batch evaluation page. Click a "Batch Evaluation" button to sequentially search for all queries, then aggregate overall metrics and generate a report. The report name should include a timestamp and some key information. Also record the main search program's `config.yaml` at that time. + Carefully design how to switch between the two modes (single query evaluation vs batch evaluation) on the same port, supporting these two different interactive contents. + Batch evaluation focuses on the aggregated metrics across all search terms. + It needs to record the test environment timestamp, the corresponding configuration file, and the results. All historical evaluation records should be saved, and for each evaluation result, it should be possible to look up the corresponding configuration file and associated metrics. + +The above is my overall design, but there may be gaps. You should understand my requirements at a higher level. You have sufficient freedom to adjust the design appropriately, drawing on best practices in automated search evaluation frameworks, to produce a superior design and implementation. \ No newline at end of file diff --git a/scripts/evaluation/README_Requirement_zh.md b/scripts/evaluation/README_Requirement_zh.md new file mode 100644 index 0000000..cdaa439 --- /dev/null +++ b/scripts/evaluation/README_Requirement_zh.md @@ -0,0 +1,123 @@ +参考资料: + +1. 搜索接口: + +```bash +export BASE_URL="${BASE_URL:-http://localhost:6002}" +export TENANT_ID="${TENANT_ID:-163}" # 改成你的租户ID +``` +```bash +curl -sS "$BASE_URL/search/" \ + -H "Content-Type: application/json" \ + -H "X-Tenant-ID: $TENANT_ID" \ + -d '{ + "query": "芭比娃娃", + "size": 20, + "from": 0, + "language": "zh" + }' +``` + +response: +{ + "results": [ + { + "spu_id": "12345", + "title": "芭比时尚娃娃", + "image_url": "https://example.com/image.jpg", + "specifications":[], + "skus":[{"sku_id":" ... +... + +2. 重排服务: +curl -X POST "http://localhost:6007/rerank" \ + -H "Content-Type: application/json" \ + -d '{ + "query": "玩具 芭比", + "docs": ["12PCS 6 Types of Dolls with Bottles", "纯棉T恤 短袖"], + "top_n":386, + "normalize": true + }' + + +3. 基于指定字段查询:es_debug_search.py + + +主要任务: +1. 评估工具的建立: +注意判断结果好坏,要用统一的评估工具,不要对每个query设定关键词匹配的规则来判断是否符合要求,这样不可扩展,这种方式且容易有误判还是复杂,并且不好扩展到其他搜索词。 +因此要做一个搜索结果评估工具、多个结果对比的工具,供后面的标注集合构建工具调用。工具内部实现可以是调用大模型来判断,说清楚什么叫高相关、基本相关、不相关: + +prompt: +```bash +你是一个电商搜索结果相关性评估助手。请根据用户查询(query)和每个商品的信息,输出该商品的相关性等级。 + +## 相关性等级标准 +Exact 完全相关 — 完全匹配用户搜索需求。 +Partial 部分相关 — 主意图满足(同品类或相近用途,基本上符合搜索意图),但次要属性(如颜色、风格、尺码等)跟用户需求有偏差或无法确认。 +Irrelevant 不相关 — 品类或用途不符,主诉求未满足。 + +1. {title1} {option1_value1} {option2_value1} {option3_value1} +2. {title2} {option1_value2} {option2_value2}, {option3_value2} +... +50. {title50} {option1_value50} {option2_value50} {option3_value50} + +## 输出格式 +严格输出 {input_nums} 行,每行仅Exact / Partial / Irrelevant三者之一。按顺序对应上述 50 个商品。不要输出任何其他任何信息 +``` + + +2. 测试集(结果标注)建立: +@queries/queries.txt + +对其中每一个query: +1. 召回: +1)参考搜索接口 召回1k结果。 +2)遍历全库,得到每个spu的title,请求重排模型,进行全排序,得到top1w结果。注意重排模型打分一定要做缓存(本地文件缓存即可。query+title->rerank_score)。 +2. 对以上结果,拆分batch请求llm,进行结果标注。 +3. 请你思考如何存储结果、并利于以后的对比、使用、展示。 + + +3. 评估工具页面: +请你设计一个搜索评估交互页面。端口6010。 +页面主题:上方是搜索框,如果发起搜索,那么下方给出本次结果的总体指标以及top100结果(允许翻页) + +总体指标: +| 指标 | 含义 | +|------|------| +| **P@5, P@10, P@20, P@50** | 前 K 个结果中「仅 3 相关」的精确率 | +| **P@5_2_3 ~ P@50_2_3** | 前 K 个结果中「2 和 3 都算相关」的精确率 | +| **MAP_3** | 仅 3 相关时的 Average Precision(单 query) | +| **MAP_2_3** | 2 和 3 都相关时的 Average Precision | + +结果列表: +按行列下来,每行左侧给每个结果找到标注值(三个等级。对结果也可以颜色标记),展示图片,title.en+title.en+首个sku的option1/2/3_value(分三行展示,这三行和左侧的图片并列) + + +评测页面最左侧: +queries默认是queries/queries.txt,填入左侧列表框,点击其中任何一个发起搜索。 + +4. 批量评估工具 + +给一个批量执行脚本, + +这里要新增一个批量评估的页面。点击批量评估的按钮,对所有搜索词依次发起搜索,最后汇总总体的评估指标,生成报告,报告名称带上时间标记和一些关键信息。并且记录当时的主搜索程序的config.yaml。 +你需要精心地设计如何切换两种模式,通过同一个端口承载这两种不同交互的内容。 +批量评估关注的是所有搜索词总体的评估指标。 +需要记录测试环境时间以及当时的配置文件,以及对应的结果。要保存历次的评估记录,并能查到每一次评估结果对应的配置文件有相关的指标 + +以上是我的总体设计,但有不周全的地方。你要站在更高的层次理解我的需求,你有足够的自由可以适当调整设计,基于你所了解的自动化搜索评估框架的最佳实践,做出更优秀的设计和更好的实现。 + + + + + + + + + +1. 请仔细检验这个标注集的质量,如果质量不符合要求,那么你要优化工具,迭代直至标注集的结果质量足够高,可以以此为自动化工具来评估检索效果,对检索效果形成指导性意见。 +2. 在结果标注集的质量足够好,批量评估工具足够好用,并且经过你的试用,能判断出搜索质量好坏的情况下,开始真正的动手检索效果调优:基于这个50条query的结果标注集和批量评估工具,对融合公式进行调参。请你先精心地设计实验,设计几组参数,对几组参数分别修改config.yaml、重启(./restart.sh backend)、跑批量评估、收集结果。 +注意评估的过程中,如果发现工具不好用,发现日志不全,发现可以通过修改工具或者日志来提高效率,都可以先做这些,根据完善。 +注意你是代码的总负责人,你有任何权限来满足你进行检索效果调优的需要。你如果发现有其他可能带来更大提升的点,也可以进行实验,你甚至可以修改融合、重排漏斗的代码,来进行实验,以追求更好的结果指标。 +但是注意,因为收到性能和耗时的约束,不要调大reranker模型的输入条数、不要打开精排,耗时方面无法承受两轮reranker模型的调用。 diff --git a/scripts/evaluation/README_zh.md b/scripts/evaluation/README_zh.md deleted file mode 100644 index 02cee86..0000000 --- a/scripts/evaluation/README_zh.md +++ /dev/null @@ -1,109 +0,0 @@ -参考资料: - -1. 搜索接口: - -```bash -export BASE_URL="${BASE_URL:-http://localhost:6002}" -export TENANT_ID="${TENANT_ID:-163}" # 改成你的租户ID -``` -```bash -curl -sS "$BASE_URL/search/" \ - -H "Content-Type: application/json" \ - -H "X-Tenant-ID: $TENANT_ID" \ - -d '{ - "query": "芭比娃娃", - "size": 20, - "from": 0, - "language": "zh" - }' -``` - -response: -{ - "results": [ - { - "spu_id": "12345", - "title": "芭比时尚娃娃", - "image_url": "https://example.com/image.jpg", - "specifications":[], - "skus":[{"sku_id":" ... -... - -2. 重排服务: -curl -X POST "http://localhost:6007/rerank" \ - -H "Content-Type: application/json" \ - -d '{ - "query": "玩具 芭比", - "docs": ["12PCS 6 Types of Dolls with Bottles", "纯棉T恤 短袖"], - "top_n":386, - "normalize": true - }' - - -3. 基于指定字段查询:es_debug_search.py - - -主要任务: -1. 评估工具的建立: -注意判断结果好坏,要用统一的评估工具,不要对每个query设定关键词匹配的规则来判断是否符合要求,这样不可扩展,这种方式且容易有误判还是复杂,并且不好扩展到其他搜索词。 -因此要做一个搜索结果评估工具、多个结果对比的工具,供后面的标注集合构建工具调用。工具内部实现可以是调用大模型来判断,说清楚什么叫高相关、基本相关、不相关: - -prompt: -```bash -你是一个电商搜索结果相关性评估助手。请根据用户查询(query)和每个商品的信息,输出该商品的相关性等级。 - -## 相关性等级标准 -Exact 完全相关 — 完全匹配用户搜索需求。 -Partial 部分相关 — 主意图满足(同品类或相近用途,基本上符合搜索意图),但次要属性(如颜色、风格、尺码等)跟用户需求有偏差或无法确认。 -Irrelevant 不相关 — 品类或用途不符,主诉求未满足。 - -1. {title1} {option1_value1} {option2_value1} {option3_value1} -2. {title2} {option1_value2} {option2_value2}, {option3_value2} -... -50. {title50} {option1_value50} {option2_value50} {option3_value50} - -## 输出格式 -严格输出 {input_nums} 行,每行仅Exact / Partial / Irrelevant三者之一。按顺序对应上述 50 个商品。不要输出任何其他任何信息 -``` - - -2. 测试集(结果标注)建立: -@queries/queries.txt - -对其中每一个query: -1. 召回: -1)参考搜索接口 召回1k结果。 -2)遍历全库,得到每个spu的title,请求重排模型,进行全排序,得到top1w结果。注意重排模型打分一定要做缓存(本地文件缓存即可。query+title->rerank_score)。 -2. 对以上结果,拆分batch请求llm,进行结果标注。 -3. 请你思考如何存储结果、并利于以后的对比、使用、展示。 - - -3. 评估工具页面: -请你设计一个搜索评估交互页面。端口6010。 -页面主题:上方是搜索框,如果发起搜索,那么下方给出本次结果的总体指标以及top100结果(允许翻页) - -总体指标: -| 指标 | 含义 | -|------|------| -| **P@5, P@10, P@20, P@50** | 前 K 个结果中「仅 3 相关」的精确率 | -| **P@5_2_3 ~ P@50_2_3** | 前 K 个结果中「2 和 3 都算相关」的精确率 | -| **MAP_3** | 仅 3 相关时的 Average Precision(单 query) | -| **MAP_2_3** | 2 和 3 都相关时的 Average Precision | - -结果列表: -按行列下来,每行左侧给每个结果找到标注值(三个等级。对结果也可以颜色标记),展示图片,title.en+title.en+首个sku的option1/2/3_value(分三行展示,这三行和左侧的图片并列) - - -评测页面最左侧: -queries默认是queries/queries.txt,填入左侧列表框,点击其中任何一个发起搜索。 - -4. 批量评估工具 - -给一个批量执行脚本, - -这里要新增一个批量评估的页面。点击批量评估的按钮,对所有搜索词依次发起搜索,最后汇总总体的评估指标,生成报告,报告名称带上时间标记和一些关键信息。并且记录当时的主搜索程序的config.yaml。 -你需要精心地设计如何切换两种模式,通过同一个端口承载这两种不同交互的内容。 -批量评估关注的是所有搜索词总体的评估指标。 -需要记录测试环境时间以及当时的配置文件,以及对应的结果。要保存历次的评估记录,并能查到每一次评估结果对应的配置文件有相关的指标 - -以上是我的总体设计,但有不周全的地方。你要站在更高的层次理解我的需求,你有足够的自由可以适当调整设计,基于你所了解的自动化搜索评估框架的最佳实践,做出更优秀的设计和更好的实现。 \ No newline at end of file diff --git a/scripts/evaluation/build_annotation_set.py b/scripts/evaluation/build_annotation_set.py new file mode 100644 index 0000000..409906f --- /dev/null +++ b/scripts/evaluation/build_annotation_set.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 + +from pathlib import Path +import sys + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from scripts.evaluation.eval_framework import main + + +if __name__ == "__main__": + main() diff --git a/scripts/evaluation/eval_framework.py b/scripts/evaluation/eval_framework.py new file mode 100644 index 0000000..5b52530 --- /dev/null +++ b/scripts/evaluation/eval_framework.py @@ -0,0 +1,1786 @@ +#!/usr/bin/env python3 +""" +Search evaluation framework for pooled relevance annotation, live metrics, and reports. +""" + +from __future__ import annotations + +import argparse +import hashlib +import json +import math +import os +import re +import sqlite3 +import sys +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple + +import requests +from elasticsearch.helpers import scan +from fastapi import FastAPI, HTTPException +from fastapi.responses import HTMLResponse +from pydantic import BaseModel, Field + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from api.app import get_app_config, get_es_client, get_query_parser, init_service +from indexer.mapping_generator import get_tenant_index_name + + +RELEVANCE_EXACT = "Exact" +RELEVANCE_PARTIAL = "Partial" +RELEVANCE_IRRELEVANT = "Irrelevant" +VALID_LABELS = {RELEVANCE_EXACT, RELEVANCE_PARTIAL, RELEVANCE_IRRELEVANT} +DEFAULT_ARTIFACT_ROOT = PROJECT_ROOT / "artifacts" / "search_evaluation" +DEFAULT_QUERY_FILE = PROJECT_ROOT / "scripts" / "evaluation" / "queries" / "queries.txt" +JUDGE_PROMPT_VERSION = "v2_structured_20260331" + + +def utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def utc_timestamp() -> str: + return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + + +def ensure_dir(path: Path) -> Path: + path.mkdir(parents=True, exist_ok=True) + return path + + +def sha1_text(text: str) -> str: + return hashlib.sha1(text.encode("utf-8")).hexdigest() + + +def pick_text(value: Any, preferred_lang: str = "en") -> str: + if value is None: + return "" + if isinstance(value, dict): + return str( + value.get(preferred_lang) + or value.get("en") + or value.get("zh") + or next((v for v in value.values() if v), "") + ).strip() + return str(value).strip() + + +def safe_json_dumps(data: Any) -> str: + return json.dumps(data, ensure_ascii=False, separators=(",", ":")) + + +def compact_option_values(skus: Sequence[Dict[str, Any]]) -> Tuple[str, str, str]: + if not skus: + return "", "", "" + first = skus[0] or {} + return ( + str(first.get("option1_value") or "").strip(), + str(first.get("option2_value") or "").strip(), + str(first.get("option3_value") or "").strip(), + ) + + +def build_display_title(doc: Dict[str, Any]) -> str: + title = doc.get("title") + en = pick_text(title, "en") + zh = pick_text(title, "zh") + if en and zh and en != zh: + return f"{en} / {zh}" + return en or zh + + +def build_rerank_doc(doc: Dict[str, Any]) -> str: + title = build_display_title(doc) + return title[:400] + + +def build_label_doc_line(idx: int, doc: Dict[str, Any]) -> str: + title = build_display_title(doc) + option1, option2, option3 = compact_option_values(doc.get("skus") or []) + vendor = pick_text(doc.get("vendor"), "en") + category = pick_text(doc.get("category_path"), "en") or pick_text(doc.get("category_name"), "en") + tags = doc.get("tags") or [] + tags_text = ", ".join(str(tag) for tag in tags[:4] if tag) + parts = [title] + if option1: + parts.append(f"option1={option1}") + if option2: + parts.append(f"option2={option2}") + if option3: + parts.append(f"option3={option3}") + if vendor: + parts.append(f"vendor={vendor}") + if category: + parts.append(f"category={category}") + if tags_text: + parts.append(f"tags={tags_text}") + return f"{idx}. " + " | ".join(part for part in parts if part) + + +def compact_product_payload(doc: Dict[str, Any]) -> Dict[str, Any]: + return { + "spu_id": str(doc.get("spu_id") or ""), + "title": build_display_title(doc), + "image_url": doc.get("image_url"), + "vendor": pick_text(doc.get("vendor"), "en"), + "category": pick_text(doc.get("category_path"), "en") or pick_text(doc.get("category_name"), "en"), + "option_values": list(compact_option_values(doc.get("skus") or [])), + "tags": list((doc.get("tags") or [])[:6]), + } + + +def normalize_text(text: Any) -> str: + value = str(text or "").strip().lower() + value = re.sub(r"\s+", " ", value) + return value + + +def _extract_json_blob(text: str) -> Any: + cleaned = str(text or "").strip() + candidates: List[str] = [cleaned] + fence_matches = re.findall(r"```(?:json)?\s*(.*?)```", cleaned, flags=re.S | re.I) + candidates.extend(match.strip() for match in fence_matches if match.strip()) + + for candidate in candidates: + try: + return json.loads(candidate) + except Exception: + pass + + starts = [idx for idx, ch in enumerate(cleaned) if ch in "[{"] + ends = [idx for idx, ch in enumerate(cleaned) if ch in "]}"] + for start in starts: + for end in reversed(ends): + if end <= start: + continue + fragment = cleaned[start : end + 1] + try: + return json.loads(fragment) + except Exception: + continue + raise ValueError(f"failed to parse json from: {cleaned[:500]!r}") + + +@dataclass +class QueryBuildResult: + query: str + tenant_id: str + search_total: int + search_depth: int + rerank_corpus_size: int + annotated_count: int + output_json_path: Path + + +class EvalStore: + def __init__(self, db_path: Path): + self.db_path = db_path + ensure_dir(db_path.parent) + self.conn = sqlite3.connect(str(db_path), check_same_thread=False) + self.conn.row_factory = sqlite3.Row + self._init_schema() + + def _init_schema(self) -> None: + self.conn.executescript( + """ + CREATE TABLE IF NOT EXISTS corpus_docs ( + tenant_id TEXT NOT NULL, + spu_id TEXT NOT NULL, + title_json TEXT, + vendor_json TEXT, + category_path_json TEXT, + category_name_json TEXT, + image_url TEXT, + skus_json TEXT, + tags_json TEXT, + raw_json TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (tenant_id, spu_id) + ); + + CREATE TABLE IF NOT EXISTS rerank_scores ( + tenant_id TEXT NOT NULL, + query_text TEXT NOT NULL, + spu_id TEXT NOT NULL, + score REAL NOT NULL, + model_name TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (tenant_id, query_text, spu_id) + ); + + CREATE TABLE IF NOT EXISTS relevance_labels ( + tenant_id TEXT NOT NULL, + query_text TEXT NOT NULL, + spu_id TEXT NOT NULL, + label TEXT NOT NULL, + judge_model TEXT, + raw_response TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (tenant_id, query_text, spu_id) + ); + + CREATE TABLE IF NOT EXISTS build_runs ( + run_id TEXT PRIMARY KEY, + tenant_id TEXT NOT NULL, + query_text TEXT NOT NULL, + output_json_path TEXT NOT NULL, + metadata_json TEXT NOT NULL, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS batch_runs ( + batch_id TEXT PRIMARY KEY, + tenant_id TEXT NOT NULL, + output_json_path TEXT NOT NULL, + report_markdown_path TEXT NOT NULL, + config_snapshot_path TEXT NOT NULL, + metadata_json TEXT NOT NULL, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS query_profiles ( + tenant_id TEXT NOT NULL, + query_text TEXT NOT NULL, + prompt_version TEXT NOT NULL, + judge_model TEXT, + profile_json TEXT NOT NULL, + raw_response TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (tenant_id, query_text, prompt_version) + ); + """ + ) + self.conn.commit() + + def upsert_corpus_docs(self, tenant_id: str, docs: Sequence[Dict[str, Any]]) -> None: + now = utc_now_iso() + rows = [] + for doc in docs: + rows.append( + ( + tenant_id, + str(doc.get("spu_id") or ""), + safe_json_dumps(doc.get("title")), + safe_json_dumps(doc.get("vendor")), + safe_json_dumps(doc.get("category_path")), + safe_json_dumps(doc.get("category_name")), + str(doc.get("image_url") or ""), + safe_json_dumps(doc.get("skus") or []), + safe_json_dumps(doc.get("tags") or []), + safe_json_dumps(doc), + now, + ) + ) + self.conn.executemany( + """ + INSERT INTO corpus_docs ( + tenant_id, spu_id, title_json, vendor_json, category_path_json, category_name_json, + image_url, skus_json, tags_json, raw_json, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(tenant_id, spu_id) DO UPDATE SET + title_json=excluded.title_json, + vendor_json=excluded.vendor_json, + category_path_json=excluded.category_path_json, + category_name_json=excluded.category_name_json, + image_url=excluded.image_url, + skus_json=excluded.skus_json, + tags_json=excluded.tags_json, + raw_json=excluded.raw_json, + updated_at=excluded.updated_at + """, + rows, + ) + self.conn.commit() + + def get_corpus_docs(self, tenant_id: str) -> List[Dict[str, Any]]: + rows = self.conn.execute( + "SELECT raw_json FROM corpus_docs WHERE tenant_id=? ORDER BY spu_id", + (tenant_id,), + ).fetchall() + return [json.loads(row["raw_json"]) for row in rows] + + def get_corpus_docs_by_spu_ids(self, tenant_id: str, spu_ids: Sequence[str]) -> Dict[str, Dict[str, Any]]: + keys = [str(spu_id) for spu_id in spu_ids if str(spu_id).strip()] + if not keys: + return {} + placeholders = ",".join("?" for _ in keys) + rows = self.conn.execute( + f""" + SELECT spu_id, raw_json + FROM corpus_docs + WHERE tenant_id=? AND spu_id IN ({placeholders}) + """, + [tenant_id, *keys], + ).fetchall() + return { + str(row["spu_id"]): json.loads(row["raw_json"]) + for row in rows + } + + def has_corpus(self, tenant_id: str) -> bool: + row = self.conn.execute( + "SELECT COUNT(1) AS n FROM corpus_docs WHERE tenant_id=?", + (tenant_id,), + ).fetchone() + return bool(row and row["n"] > 0) + + def get_rerank_scores(self, tenant_id: str, query_text: str) -> Dict[str, float]: + rows = self.conn.execute( + """ + SELECT spu_id, score + FROM rerank_scores + WHERE tenant_id=? AND query_text=? + """, + (tenant_id, query_text), + ).fetchall() + return {str(row["spu_id"]): float(row["score"]) for row in rows} + + def upsert_rerank_scores( + self, + tenant_id: str, + query_text: str, + scores: Dict[str, float], + model_name: str, + ) -> None: + now = utc_now_iso() + rows = [ + (tenant_id, query_text, spu_id, float(score), model_name, now) + for spu_id, score in scores.items() + ] + self.conn.executemany( + """ + INSERT INTO rerank_scores (tenant_id, query_text, spu_id, score, model_name, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET + score=excluded.score, + model_name=excluded.model_name, + updated_at=excluded.updated_at + """, + rows, + ) + self.conn.commit() + + def get_labels(self, tenant_id: str, query_text: str) -> Dict[str, str]: + rows = self.conn.execute( + """ + SELECT spu_id, label + FROM relevance_labels + WHERE tenant_id=? AND query_text=? + """, + (tenant_id, query_text), + ).fetchall() + return {str(row["spu_id"]): str(row["label"]) for row in rows} + + def upsert_labels( + self, + tenant_id: str, + query_text: str, + labels: Dict[str, str], + judge_model: str, + raw_response: str, + ) -> None: + now = utc_now_iso() + rows = [] + for spu_id, label in labels.items(): + if label not in VALID_LABELS: + raise ValueError(f"invalid label: {label}") + rows.append((tenant_id, query_text, spu_id, label, judge_model, raw_response, now)) + self.conn.executemany( + """ + INSERT INTO relevance_labels (tenant_id, query_text, spu_id, label, judge_model, raw_response, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(tenant_id, query_text, spu_id) DO UPDATE SET + label=excluded.label, + judge_model=excluded.judge_model, + raw_response=excluded.raw_response, + updated_at=excluded.updated_at + """, + rows, + ) + self.conn.commit() + + def get_query_profile(self, tenant_id: str, query_text: str, prompt_version: str) -> Optional[Dict[str, Any]]: + row = self.conn.execute( + """ + SELECT profile_json + FROM query_profiles + WHERE tenant_id=? AND query_text=? AND prompt_version=? + """, + (tenant_id, query_text, prompt_version), + ).fetchone() + if not row: + return None + return json.loads(row["profile_json"]) + + def upsert_query_profile( + self, + tenant_id: str, + query_text: str, + prompt_version: str, + judge_model: str, + profile: Dict[str, Any], + raw_response: str, + ) -> None: + self.conn.execute( + """ + INSERT OR REPLACE INTO query_profiles + (tenant_id, query_text, prompt_version, judge_model, profile_json, raw_response, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + tenant_id, + query_text, + prompt_version, + judge_model, + safe_json_dumps(profile), + raw_response, + utc_now_iso(), + ), + ) + self.conn.commit() + + def insert_build_run(self, run_id: str, tenant_id: str, query_text: str, output_json_path: Path, metadata: Dict[str, Any]) -> None: + self.conn.execute( + """ + INSERT OR REPLACE INTO build_runs (run_id, tenant_id, query_text, output_json_path, metadata_json, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + (run_id, tenant_id, query_text, str(output_json_path), safe_json_dumps(metadata), utc_now_iso()), + ) + self.conn.commit() + + def insert_batch_run( + self, + batch_id: str, + tenant_id: str, + output_json_path: Path, + report_markdown_path: Path, + config_snapshot_path: Path, + metadata: Dict[str, Any], + ) -> None: + self.conn.execute( + """ + INSERT OR REPLACE INTO batch_runs + (batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + batch_id, + tenant_id, + str(output_json_path), + str(report_markdown_path), + str(config_snapshot_path), + safe_json_dumps(metadata), + utc_now_iso(), + ), + ) + self.conn.commit() + + def list_batch_runs(self, limit: int = 20) -> List[Dict[str, Any]]: + rows = self.conn.execute( + """ + SELECT batch_id, tenant_id, output_json_path, report_markdown_path, config_snapshot_path, metadata_json, created_at + FROM batch_runs + ORDER BY created_at DESC + LIMIT ? + """, + (limit,), + ).fetchall() + items: List[Dict[str, Any]] = [] + for row in rows: + items.append( + { + "batch_id": row["batch_id"], + "tenant_id": row["tenant_id"], + "output_json_path": row["output_json_path"], + "report_markdown_path": row["report_markdown_path"], + "config_snapshot_path": row["config_snapshot_path"], + "metadata": json.loads(row["metadata_json"]), + "created_at": row["created_at"], + } + ) + return items + + def list_query_label_stats(self, tenant_id: str) -> List[Dict[str, Any]]: + rows = self.conn.execute( + """ + SELECT + query_text, + COUNT(*) AS total, + SUM(CASE WHEN label='Exact' THEN 1 ELSE 0 END) AS exact_count, + SUM(CASE WHEN label='Partial' THEN 1 ELSE 0 END) AS partial_count, + SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, + MAX(updated_at) AS updated_at + FROM relevance_labels + WHERE tenant_id=? + GROUP BY query_text + ORDER BY query_text + """, + (tenant_id,), + ).fetchall() + return [ + { + "query": str(row["query_text"]), + "total": int(row["total"]), + "exact_count": int(row["exact_count"] or 0), + "partial_count": int(row["partial_count"] or 0), + "irrelevant_count": int(row["irrelevant_count"] or 0), + "updated_at": row["updated_at"], + } + for row in rows + ] + + def get_query_label_stats(self, tenant_id: str, query_text: str) -> Dict[str, Any]: + row = self.conn.execute( + """ + SELECT + COUNT(*) AS total, + SUM(CASE WHEN label='Exact' THEN 1 ELSE 0 END) AS exact_count, + SUM(CASE WHEN label='Partial' THEN 1 ELSE 0 END) AS partial_count, + SUM(CASE WHEN label='Irrelevant' THEN 1 ELSE 0 END) AS irrelevant_count, + MAX(updated_at) AS updated_at + FROM relevance_labels + WHERE tenant_id=? AND query_text=? + """, + (tenant_id, query_text), + ).fetchone() + return { + "query": query_text, + "total": int((row["total"] or 0) if row else 0), + "exact_count": int((row["exact_count"] or 0) if row else 0), + "partial_count": int((row["partial_count"] or 0) if row else 0), + "irrelevant_count": int((row["irrelevant_count"] or 0) if row else 0), + "updated_at": row["updated_at"] if row else None, + } + + +class SearchServiceClient: + def __init__(self, base_url: str, tenant_id: str): + self.base_url = base_url.rstrip("/") + self.tenant_id = str(tenant_id) + self.session = requests.Session() + + def search(self, query: str, size: int, from_: int = 0, language: str = "en") -> Dict[str, Any]: + response = self.session.post( + f"{self.base_url}/search/", + headers={"Content-Type": "application/json", "X-Tenant-ID": self.tenant_id}, + json={"query": query, "size": size, "from": from_, "language": language}, + timeout=120, + ) + response.raise_for_status() + return response.json() + + +class RerankServiceClient: + def __init__(self, service_url: str): + self.service_url = service_url.rstrip("/") + self.session = requests.Session() + + def rerank(self, query: str, docs: Sequence[str], normalize: bool = False, top_n: Optional[int] = None) -> Tuple[List[float], Dict[str, Any]]: + payload: Dict[str, Any] = { + "query": query, + "docs": list(docs), + "normalize": normalize, + } + if top_n is not None: + payload["top_n"] = int(top_n) + response = self.session.post(self.service_url, json=payload, timeout=180) + response.raise_for_status() + data = response.json() + return list(data.get("scores") or []), dict(data.get("meta") or {}) + + +class DashScopeLabelClient: + def __init__(self, model: str, base_url: str, api_key: str, batch_size: int = 40): + self.model = model + self.base_url = base_url.rstrip("/") + self.api_key = api_key + self.batch_size = int(batch_size) + self.session = requests.Session() + + def _chat(self, prompt: str) -> Tuple[str, str]: + response = self.session.post( + f"{self.base_url}/chat/completions", + headers={ + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + }, + json={ + "model": self.model, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0, + "top_p": 0.1, + }, + timeout=180, + ) + response.raise_for_status() + data = response.json() + content = str(((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip() + return content, safe_json_dumps(data) + + def extract_query_profile( + self, + query: str, + parser_hints: Dict[str, Any], + ) -> Tuple[Dict[str, Any], str]: + prompt = ( + "You are building a structured intent profile for e-commerce relevance judging.\n" + "Use the original user query as the source of truth. Parser hints may help, but if a hint conflicts with the original query, trust the original query.\n" + "Be conservative: only mark an attribute as required if the user explicitly asked for it.\n\n" + "Return JSON with this schema:\n" + "{\n" + ' "normalized_query_en": string,\n' + ' "primary_category": string,\n' + ' "allowed_categories": [string],\n' + ' "required_attributes": [\n' + ' {"name": string, "required_terms": [string], "conflicting_terms": [string], "match_mode": "explicit"}\n' + " ],\n" + ' "notes": [string]\n' + "}\n\n" + "Guidelines:\n" + "- Exact later will require explicit evidence for all required attributes.\n" + "- allowed_categories should contain only near-synonyms of the same product type, not substitutes. For example dress can allow midi dress/cocktail dress, but not skirt, top, jumpsuit, or outfit unless the query explicitly asks for them.\n" + "- If the query asks for dress/skirt/jeans/t-shirt, near but different product types are not Exact.\n" + "- If the query includes color, fit, silhouette, or length, include them as required_attributes.\n" + "- For fit words, include conflicting terms when obvious, e.g. fitted conflicts with oversized/loose; oversized conflicts with fitted/tight.\n" + "- For color, include conflicting colors only when clear from the query.\n\n" + f"Original query: {query}\n" + f"Parser hints JSON: {json.dumps(parser_hints, ensure_ascii=False)}\n" + ) + content, raw_response = self._chat(prompt) + payload = _extract_json_blob(content) + if not isinstance(payload, dict): + raise ValueError(f"unexpected query profile payload: {content!r}") + payload.setdefault("normalized_query_en", query) + payload.setdefault("primary_category", "") + payload.setdefault("allowed_categories", []) + payload.setdefault("required_attributes", []) + payload.setdefault("notes", []) + return payload, raw_response + + def classify_batch( + self, + query: str, + query_profile: Dict[str, Any], + docs: Sequence[Dict[str, Any]], + ) -> Tuple[List[str], str]: + numbered_docs = [build_label_doc_line(idx + 1, doc) for idx, doc in enumerate(docs)] + prompt = ( + "You are an e-commerce search relevance judge.\n" + "Judge each product against the structured query profile below.\n\n" + "Relevance rules:\n" + "- Exact: product type matches the target intent, and every explicit required attribute is positively supported by the title/options/tags/category. If an attribute is missing or only guessed, it is NOT Exact.\n" + "- Partial: main product type/use case matches, but some required attribute is missing, weaker, uncertain, or only approximately matched.\n" + "- Irrelevant: product type/use case mismatched, or an explicit required attribute clearly conflicts.\n" + "- Be conservative with Exact.\n" + "- Graphic/holiday/message tees are not Exact for a plain color/style tee query unless that graphic/theme was requested.\n" + "- Jumpsuit/romper/set is not Exact for dress/skirt/jeans queries.\n\n" + f"Original query: {query}\n" + f"Structured query profile JSON: {json.dumps(query_profile, ensure_ascii=False)}\n\n" + "Products:\n" + + "\n".join(numbered_docs) + + "\n\nReturn JSON only, with schema:\n" + '{"labels":[{"index":1,"label":"Exact","reason":"short phrase"}]}\n' + ) + content, raw_response = self._chat(prompt) + payload = _extract_json_blob(content) + if not isinstance(payload, dict) or not isinstance(payload.get("labels"), list): + raise ValueError(f"unexpected label payload: {content!r}") + labels_payload = payload["labels"] + labels: List[str] = [] + for item in labels_payload[: len(docs)]: + if not isinstance(item, dict): + continue + label = str(item.get("label") or "").strip() + if label in VALID_LABELS: + labels.append(label) + if len(labels) != len(docs) or any(label not in VALID_LABELS for label in labels): + raise ValueError(f"unexpected label output: {content!r}") + return labels, raw_response + + +def precision_at_k(labels: Sequence[str], k: int, relevant: Sequence[str]) -> float: + if k <= 0: + return 0.0 + sliced = list(labels[:k]) + if not sliced: + return 0.0 + hits = sum(1 for label in sliced if label in relevant) + return hits / float(min(k, len(sliced))) + + +def average_precision(labels: Sequence[str], relevant: Sequence[str]) -> float: + hit_count = 0 + precision_sum = 0.0 + for idx, label in enumerate(labels, start=1): + if label not in relevant: + continue + hit_count += 1 + precision_sum += hit_count / idx + if hit_count == 0: + return 0.0 + return precision_sum / hit_count + + +def compute_query_metrics(labels: Sequence[str]) -> Dict[str, float]: + metrics: Dict[str, float] = {} + for k in (5, 10, 20, 50): + metrics[f"P@{k}"] = round(precision_at_k(labels, k, [RELEVANCE_EXACT]), 6) + metrics[f"P@{k}_2_3"] = round(precision_at_k(labels, k, [RELEVANCE_EXACT, RELEVANCE_PARTIAL]), 6) + metrics["MAP_3"] = round(average_precision(labels, [RELEVANCE_EXACT]), 6) + metrics["MAP_2_3"] = round(average_precision(labels, [RELEVANCE_EXACT, RELEVANCE_PARTIAL]), 6) + return metrics + + +def aggregate_metrics(metric_items: Sequence[Dict[str, float]]) -> Dict[str, float]: + if not metric_items: + return {} + keys = sorted(metric_items[0].keys()) + return { + key: round(sum(float(item.get(key, 0.0)) for item in metric_items) / len(metric_items), 6) + for key in keys + } + + +def label_distribution(labels: Sequence[str]) -> Dict[str, int]: + return { + RELEVANCE_EXACT: sum(1 for label in labels if label == RELEVANCE_EXACT), + RELEVANCE_PARTIAL: sum(1 for label in labels if label == RELEVANCE_PARTIAL), + RELEVANCE_IRRELEVANT: sum(1 for label in labels if label == RELEVANCE_IRRELEVANT), + } + + +class SearchEvaluationFramework: + def __init__( + self, + tenant_id: str, + artifact_root: Path = DEFAULT_ARTIFACT_ROOT, + search_base_url: str = "http://localhost:6002", + ): + init_service(get_app_config().infrastructure.elasticsearch.host) + self.tenant_id = str(tenant_id) + self.artifact_root = ensure_dir(artifact_root) + self.store = EvalStore(self.artifact_root / "search_eval.sqlite3") + self.search_client = SearchServiceClient(search_base_url, self.tenant_id) + app_cfg = get_app_config() + rerank_service_url = str( + app_cfg.services.rerank.providers["http"]["instances"]["default"]["service_url"] + ) + self.rerank_client = RerankServiceClient(rerank_service_url) + llm_cfg = app_cfg.services.translation.capabilities["llm"] + api_key = app_cfg.infrastructure.secrets.dashscope_api_key + if not api_key: + raise RuntimeError("dashscope_api_key is required for search evaluation annotation") + self.label_client = DashScopeLabelClient( + model=str(llm_cfg["model"]), + base_url=str(llm_cfg["base_url"]), + api_key=str(api_key), + ) + self.query_parser = get_query_parser() + + def build_query_parser_hints(self, query: str) -> Dict[str, Any]: + parsed = self.query_parser.parse(query, generate_vector=False, target_languages=["en", "zh"]) + payload = parsed.to_dict() + payload["text_for_rerank"] = parsed.text_for_rerank() + return payload + + def get_query_profile(self, query: str, force_refresh: bool = False) -> Dict[str, Any]: + if not force_refresh: + cached = self.store.get_query_profile(self.tenant_id, query, JUDGE_PROMPT_VERSION) + if cached is not None: + return cached + parser_hints = self.build_query_parser_hints(query) + profile, raw_response = self.label_client.extract_query_profile(query, parser_hints) + profile["parser_hints"] = parser_hints + self.store.upsert_query_profile( + self.tenant_id, + query, + JUDGE_PROMPT_VERSION, + self.label_client.model, + profile, + raw_response, + ) + return profile + + @staticmethod + def _doc_evidence_text(doc: Dict[str, Any]) -> str: + pieces: List[str] = [ + build_display_title(doc), + pick_text(doc.get("vendor"), "en"), + pick_text(doc.get("category_path"), "en"), + pick_text(doc.get("category_name"), "en"), + ] + for sku in doc.get("skus") or []: + pieces.extend( + [ + str(sku.get("option1_value") or ""), + str(sku.get("option2_value") or ""), + str(sku.get("option3_value") or ""), + ] + ) + for tag in doc.get("tags") or []: + pieces.append(str(tag)) + return normalize_text(" | ".join(piece for piece in pieces if piece)) + + def _apply_rule_based_label_guardrails( + self, + label: str, + query_profile: Dict[str, Any], + doc: Dict[str, Any], + ) -> str: + if label not in VALID_LABELS: + return label + evidence = self._doc_evidence_text(doc) + category = normalize_text(query_profile.get("primary_category")) + allowed_categories = [normalize_text(item) for item in query_profile.get("allowed_categories") or [] if str(item).strip()] + + primary_category_match = True + if category: + primary_category_match = category in evidence + allowed_category_match = True + if allowed_categories: + allowed_category_match = any(signal in evidence for signal in allowed_categories) + + if label == RELEVANCE_EXACT and not primary_category_match: + if allowed_category_match: + label = RELEVANCE_PARTIAL + else: + return RELEVANCE_IRRELEVANT + + for attr in query_profile.get("required_attributes") or []: + if not isinstance(attr, dict): + continue + attr_name = normalize_text(attr.get("name")) + if attr_name not in {"color", "fit", "length", "type", "product_type", "material", "size", "gender", "style", "waist_style", "rise"}: + continue + required_terms = [normalize_text(item) for item in attr.get("required_terms") or [] if normalize_text(item)] + conflicting_terms = [normalize_text(item) for item in attr.get("conflicting_terms") or [] if normalize_text(item)] + if attr_name == "fit": + if any(term in {"oversized", "oversize"} for term in required_terms): + conflicting_terms.extend(["slim", "slimming", "fitted", "tight", "close-fitting"]) + if any(term in {"fitted", "slim fit", "tight"} for term in required_terms): + conflicting_terms.extend(["oversized", "oversize", "loose", "relaxed"]) + has_required = any(term in evidence for term in required_terms) if required_terms else True + has_conflict = any(term in evidence for term in conflicting_terms) + + if has_conflict: + return RELEVANCE_IRRELEVANT + if label == RELEVANCE_EXACT and not has_required: + label = RELEVANCE_PARTIAL + + if label == RELEVANCE_PARTIAL and not primary_category_match and not allowed_category_match: + return RELEVANCE_IRRELEVANT + + return label + + @staticmethod + def _result_item_to_doc(item: Dict[str, Any]) -> Dict[str, Any]: + option_values = list(item.get("option_values") or []) + while len(option_values) < 3: + option_values.append("") + product = dict(item.get("product") or {}) + return { + "spu_id": item.get("spu_id"), + "title": product.get("title") or item.get("title"), + "vendor": product.get("vendor"), + "category_path": product.get("category"), + "category_name": product.get("category"), + "image_url": item.get("image_url") or product.get("image_url"), + "tags": product.get("tags") or [], + "skus": [ + { + "option1_value": option_values[0], + "option2_value": option_values[1], + "option3_value": option_values[2], + } + ], + } + + def _collect_label_issues( + self, + label: str, + query_profile: Dict[str, Any], + doc: Dict[str, Any], + ) -> List[str]: + evidence = self._doc_evidence_text(doc) + issues: List[str] = [] + category = normalize_text(query_profile.get("primary_category")) + allowed_categories = [ + normalize_text(item) + for item in query_profile.get("allowed_categories") or [] + if str(item).strip() + ] + + primary_category_match = True if not category else category in evidence + allowed_category_match = False if allowed_categories else primary_category_match + if allowed_categories: + allowed_category_match = any(signal in evidence for signal in allowed_categories) + + if label == RELEVANCE_EXACT and not primary_category_match: + if allowed_category_match: + issues.append("Exact missing primary category evidence") + else: + issues.append("Exact has category mismatch") + + if label == RELEVANCE_PARTIAL and not primary_category_match and not allowed_category_match: + issues.append("Partial has category mismatch") + + for attr in query_profile.get("required_attributes") or []: + if not isinstance(attr, dict): + continue + attr_name = normalize_text(attr.get("name")) + if attr_name not in {"color", "fit", "length", "type", "product_type", "material", "size", "gender", "style"}: + continue + required_terms = [normalize_text(item) for item in attr.get("required_terms") or [] if normalize_text(item)] + conflicting_terms = [normalize_text(item) for item in attr.get("conflicting_terms") or [] if normalize_text(item)] + has_required = any(term in evidence for term in required_terms) if required_terms else True + has_conflict = any(term in evidence for term in conflicting_terms) + + if has_conflict and label != RELEVANCE_IRRELEVANT: + issues.append(f"{label} conflicts on {attr_name}") + if label == RELEVANCE_EXACT and not has_required: + issues.append(f"Exact missing {attr_name}") + return issues + + def audit_live_query( + self, + query: str, + *, + top_k: int = 100, + language: str = "en", + auto_annotate: bool = True, + ) -> Dict[str, Any]: + live = self.evaluate_live_query(query=query, top_k=top_k, auto_annotate=auto_annotate, language=language) + query_profile = self.get_query_profile(query, force_refresh=False) + suspicious: List[Dict[str, Any]] = [] + + for item in live["results"]: + doc = self._result_item_to_doc(item) + issues = self._collect_label_issues(item["label"] or "", query_profile, doc) + suggested_label = self._apply_rule_based_label_guardrails(item["label"] or "", query_profile, doc) + if suggested_label != (item["label"] or ""): + issues = list(issues) + [f"Suggested relabel: {item['label']} -> {suggested_label}"] + if issues: + suspicious.append( + { + "rank": item["rank"], + "spu_id": item["spu_id"], + "title": item["title"], + "label": item["label"], + "suggested_label": suggested_label, + "issues": issues, + } + ) + + labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in live["results"] + ] + return { + "query": query, + "tenant_id": self.tenant_id, + "top_k": top_k, + "metrics": live["metrics"], + "distribution": label_distribution(labels), + "query_profile": query_profile, + "suspicious": suspicious, + "results": live["results"], + } + + def queries_from_file(self, path: Path) -> List[str]: + return [ + line.strip() + for line in path.read_text(encoding="utf-8").splitlines() + if line.strip() and not line.strip().startswith("#") + ] + + def corpus_docs(self, refresh: bool = False) -> List[Dict[str, Any]]: + if not refresh and self.store.has_corpus(self.tenant_id): + return self.store.get_corpus_docs(self.tenant_id) + + es_client = get_es_client().client + index_name = get_tenant_index_name(self.tenant_id) + docs: List[Dict[str, Any]] = [] + for hit in scan( + client=es_client, + index=index_name, + query={ + "_source": [ + "spu_id", + "title", + "vendor", + "category_path", + "category_name", + "image_url", + "skus", + "tags", + ], + "query": {"match_all": {}}, + }, + size=500, + preserve_order=False, + clear_scroll=True, + ): + source = dict(hit.get("_source") or {}) + source["spu_id"] = str(source.get("spu_id") or hit.get("_id") or "") + docs.append(source) + self.store.upsert_corpus_docs(self.tenant_id, docs) + return docs + + def full_corpus_rerank( + self, + query: str, + docs: Sequence[Dict[str, Any]], + batch_size: int = 24, + force_refresh: bool = False, + ) -> List[Dict[str, Any]]: + cached = {} if force_refresh else self.store.get_rerank_scores(self.tenant_id, query) + pending: List[Dict[str, Any]] = [doc for doc in docs if str(doc.get("spu_id")) not in cached] + if pending: + new_scores: Dict[str, float] = {} + for start in range(0, len(pending), batch_size): + batch = pending[start : start + batch_size] + scores = self._rerank_batch_with_retry(query=query, docs=batch) + if len(scores) != len(batch): + raise RuntimeError(f"rerank returned {len(scores)} scores for {len(batch)} docs") + for doc, score in zip(batch, scores): + new_scores[str(doc.get("spu_id"))] = float(score) + self.store.upsert_rerank_scores( + self.tenant_id, + query, + new_scores, + model_name="qwen3_vllm_score", + ) + cached.update(new_scores) + + ranked = [] + for doc in docs: + spu_id = str(doc.get("spu_id")) + ranked.append({"spu_id": spu_id, "score": float(cached.get(spu_id, float("-inf"))), "doc": doc}) + ranked.sort(key=lambda item: item["score"], reverse=True) + return ranked + + def _rerank_batch_with_retry(self, query: str, docs: Sequence[Dict[str, Any]]) -> List[float]: + if not docs: + return [] + doc_texts = [build_rerank_doc(doc) for doc in docs] + try: + scores, _meta = self.rerank_client.rerank(query=query, docs=doc_texts, normalize=False) + return scores + except Exception: + if len(docs) == 1: + return [-1.0] + if len(docs) <= 6: + scores: List[float] = [] + for doc in docs: + scores.extend(self._rerank_batch_with_retry(query, [doc])) + return scores + mid = len(docs) // 2 + left = self._rerank_batch_with_retry(query, docs[:mid]) + right = self._rerank_batch_with_retry(query, docs[mid:]) + return left + right + + def annotate_missing_labels( + self, + query: str, + docs: Sequence[Dict[str, Any]], + force_refresh: bool = False, + ) -> Dict[str, str]: + query_profile = self.get_query_profile(query, force_refresh=force_refresh) + labels = {} if force_refresh else self.store.get_labels(self.tenant_id, query) + missing_docs = [doc for doc in docs if str(doc.get("spu_id")) not in labels] + if not missing_docs: + return labels + + for start in range(0, len(missing_docs), self.label_client.batch_size): + batch = missing_docs[start : start + self.label_client.batch_size] + batch_pairs = self._classify_with_retry(query, query_profile, batch) + for sub_labels, raw_response, sub_batch in batch_pairs: + to_store = { + str(doc.get("spu_id")): self._apply_rule_based_label_guardrails(label, query_profile, doc) + for doc, label in zip(sub_batch, sub_labels) + } + self.store.upsert_labels( + self.tenant_id, + query, + to_store, + judge_model=self.label_client.model, + raw_response=raw_response, + ) + labels.update(to_store) + time.sleep(0.1) + return labels + + def _classify_with_retry( + self, + query: str, + query_profile: Dict[str, Any], + docs: Sequence[Dict[str, Any]], + ) -> List[Tuple[List[str], str, Sequence[Dict[str, Any]]]]: + if not docs: + return [] + try: + labels, raw_response = self.label_client.classify_batch(query, query_profile, docs) + return [(labels, raw_response, docs)] + except Exception: + if len(docs) == 1: + raise + mid = len(docs) // 2 + return self._classify_with_retry(query, query_profile, docs[:mid]) + self._classify_with_retry(query, query_profile, docs[mid:]) + + def build_query_annotation_set( + self, + query: str, + *, + search_depth: int = 1000, + rerank_depth: int = 10000, + annotate_search_top_k: int = 120, + annotate_rerank_top_k: int = 200, + language: str = "en", + force_refresh_rerank: bool = False, + force_refresh_labels: bool = False, + ) -> QueryBuildResult: + search_payload = self.search_client.search(query=query, size=search_depth, from_=0, language=language) + search_results = list(search_payload.get("results") or []) + corpus = self.corpus_docs(refresh=False) + full_rerank = self.full_corpus_rerank( + query=query, + docs=corpus, + force_refresh=force_refresh_rerank, + ) + rerank_depth_effective = min(rerank_depth, len(full_rerank)) + + pool_docs: Dict[str, Dict[str, Any]] = {} + for doc in search_results[:annotate_search_top_k]: + pool_docs[str(doc.get("spu_id"))] = doc + for item in full_rerank[:annotate_rerank_top_k]: + pool_docs[str(item["spu_id"])] = item["doc"] + + query_profile = self.get_query_profile(query, force_refresh=force_refresh_labels) + labels = self.annotate_missing_labels( + query=query, + docs=list(pool_docs.values()), + force_refresh=force_refresh_labels, + ) + + search_labeled_results: List[Dict[str, Any]] = [] + for rank, doc in enumerate(search_results, start=1): + spu_id = str(doc.get("spu_id")) + label = labels.get(spu_id) + search_labeled_results.append( + { + "rank": rank, + "spu_id": spu_id, + "title": build_display_title(doc), + "image_url": doc.get("image_url"), + "rerank_score": None, + "label": label, + "option_values": list(compact_option_values(doc.get("skus") or [])), + "product": compact_product_payload(doc), + } + ) + + rerank_top_results: List[Dict[str, Any]] = [] + for rank, item in enumerate(full_rerank[:rerank_depth_effective], start=1): + doc = item["doc"] + spu_id = str(item["spu_id"]) + rerank_top_results.append( + { + "rank": rank, + "spu_id": spu_id, + "title": build_display_title(doc), + "image_url": doc.get("image_url"), + "rerank_score": round(float(item["score"]), 8), + "label": labels.get(spu_id), + "option_values": list(compact_option_values(doc.get("skus") or [])), + "product": compact_product_payload(doc), + } + ) + + top100_labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in search_labeled_results[:100] + ] + metrics = compute_query_metrics(top100_labels) + output_dir = ensure_dir(self.artifact_root / "query_builds") + run_id = f"{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + query)[:10]}" + output_json_path = output_dir / f"{run_id}.json" + payload = { + "run_id": run_id, + "created_at": utc_now_iso(), + "tenant_id": self.tenant_id, + "query": query, + "config_meta": requests.get("http://localhost:6002/admin/config/meta", timeout=20).json(), + "search_total": int(search_payload.get("total") or 0), + "search_depth_requested": search_depth, + "search_depth_effective": len(search_results), + "rerank_depth_requested": rerank_depth, + "rerank_depth_effective": rerank_depth_effective, + "corpus_size": len(corpus), + "annotation_pool": { + "annotate_search_top_k": annotate_search_top_k, + "annotate_rerank_top_k": annotate_rerank_top_k, + "pool_size": len(pool_docs), + }, + "query_profile": query_profile, + "metrics_top100": metrics, + "search_results": search_labeled_results, + "full_rerank_top": rerank_top_results, + } + output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + self.store.insert_build_run(run_id, self.tenant_id, query, output_json_path, payload["metrics_top100"]) + return QueryBuildResult( + query=query, + tenant_id=self.tenant_id, + search_total=int(search_payload.get("total") or 0), + search_depth=len(search_results), + rerank_corpus_size=len(corpus), + annotated_count=len(pool_docs), + output_json_path=output_json_path, + ) + + def evaluate_live_query( + self, + query: str, + top_k: int = 100, + auto_annotate: bool = True, + language: str = "en", + force_refresh_labels: bool = False, + ) -> Dict[str, Any]: + search_payload = self.search_client.search(query=query, size=max(top_k, 100), from_=0, language=language) + results = list(search_payload.get("results") or []) + if auto_annotate: + self.annotate_missing_labels(query=query, docs=results[:top_k], force_refresh=force_refresh_labels) + labels = self.store.get_labels(self.tenant_id, query) + labeled = [] + for rank, doc in enumerate(results[:top_k], start=1): + spu_id = str(doc.get("spu_id")) + labeled.append( + { + "rank": rank, + "spu_id": spu_id, + "title": build_display_title(doc), + "image_url": doc.get("image_url"), + "label": labels.get(spu_id), + "option_values": list(compact_option_values(doc.get("skus") or [])), + "product": compact_product_payload(doc), + } + ) + metric_labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in labeled + ] + return { + "query": query, + "tenant_id": self.tenant_id, + "top_k": top_k, + "metrics": compute_query_metrics(metric_labels), + "results": labeled, + "total": int(search_payload.get("total") or 0), + } + + def batch_evaluate( + self, + queries: Sequence[str], + *, + top_k: int = 100, + auto_annotate: bool = True, + language: str = "en", + force_refresh_labels: bool = False, + ) -> Dict[str, Any]: + per_query = [] + for query in queries: + live = self.evaluate_live_query( + query, + top_k=top_k, + auto_annotate=auto_annotate, + language=language, + force_refresh_labels=force_refresh_labels, + ) + labels = [ + item["label"] if item["label"] in VALID_LABELS else RELEVANCE_IRRELEVANT + for item in live["results"] + ] + per_query.append( + { + "query": live["query"], + "tenant_id": live["tenant_id"], + "top_k": live["top_k"], + "metrics": live["metrics"], + "distribution": label_distribution(labels), + "total": live["total"], + } + ) + aggregate = aggregate_metrics([item["metrics"] for item in per_query]) + aggregate_distribution = { + RELEVANCE_EXACT: sum(item["distribution"][RELEVANCE_EXACT] for item in per_query), + RELEVANCE_PARTIAL: sum(item["distribution"][RELEVANCE_PARTIAL] for item in per_query), + RELEVANCE_IRRELEVANT: sum(item["distribution"][RELEVANCE_IRRELEVANT] for item in per_query), + } + batch_id = f"batch_{utc_timestamp()}_{sha1_text(self.tenant_id + '|' + '|'.join(queries))[:10]}" + report_dir = ensure_dir(self.artifact_root / "batch_reports") + config_snapshot_path = report_dir / f"{batch_id}_config.json" + config_snapshot = requests.get("http://localhost:6002/admin/config", timeout=20).json() + config_snapshot_path.write_text(json.dumps(config_snapshot, ensure_ascii=False, indent=2), encoding="utf-8") + output_json_path = report_dir / f"{batch_id}.json" + report_md_path = report_dir / f"{batch_id}.md" + payload = { + "batch_id": batch_id, + "created_at": utc_now_iso(), + "tenant_id": self.tenant_id, + "queries": list(queries), + "top_k": top_k, + "aggregate_metrics": aggregate, + "aggregate_distribution": aggregate_distribution, + "per_query": per_query, + "config_snapshot_path": str(config_snapshot_path), + } + output_json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + report_md_path.write_text(render_batch_report_markdown(payload), encoding="utf-8") + self.store.insert_batch_run(batch_id, self.tenant_id, output_json_path, report_md_path, config_snapshot_path, payload) + return payload + + +def render_batch_report_markdown(payload: Dict[str, Any]) -> str: + lines = [ + "# Search Batch Evaluation", + "", + f"- Batch ID: {payload['batch_id']}", + f"- Created at: {payload['created_at']}", + f"- Tenant ID: {payload['tenant_id']}", + f"- Query count: {len(payload['queries'])}", + f"- Top K: {payload['top_k']}", + "", + "## Aggregate Metrics", + "", + ] + for key, value in sorted((payload.get("aggregate_metrics") or {}).items()): + lines.append(f"- {key}: {value}") + distribution = payload.get("aggregate_distribution") or {} + if distribution: + lines.extend( + [ + "", + "## Label Distribution", + "", + f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}", + f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}", + f"- Irrelevant: {distribution.get(RELEVANCE_IRRELEVANT, 0)}", + ] + ) + lines.extend(["", "## Per Query", ""]) + for item in payload.get("per_query") or []: + lines.append(f"### {item['query']}") + lines.append("") + for key, value in sorted((item.get("metrics") or {}).items()): + lines.append(f"- {key}: {value}") + distribution = item.get("distribution") or {} + lines.append(f"- Exact: {distribution.get(RELEVANCE_EXACT, 0)}") + lines.append(f"- Partial: {distribution.get(RELEVANCE_PARTIAL, 0)}") + lines.append(f"- Irrelevant: {distribution.get(RELEVANCE_IRRELEVANT, 0)}") + lines.append("") + return "\n".join(lines) + + +class SearchEvalRequest(BaseModel): + query: str + top_k: int = Field(default=100, ge=1, le=500) + auto_annotate: bool = True + language: str = "en" + + +class BatchEvalRequest(BaseModel): + queries: Optional[List[str]] = None + top_k: int = Field(default=100, ge=1, le=500) + auto_annotate: bool = True + language: str = "en" + force_refresh_labels: bool = False + + +def create_web_app(framework: SearchEvaluationFramework, query_file: Path = DEFAULT_QUERY_FILE) -> FastAPI: + app = FastAPI(title="Search Evaluation UI", version="1.0.0") + + @app.get("/", response_class=HTMLResponse) + def home() -> str: + return WEB_APP_HTML + + @app.get("/api/queries") + def api_queries() -> Dict[str, Any]: + return {"queries": framework.queries_from_file(query_file)} + + @app.post("/api/search-eval") + def api_search_eval(request: SearchEvalRequest) -> Dict[str, Any]: + return framework.evaluate_live_query( + query=request.query, + top_k=request.top_k, + auto_annotate=request.auto_annotate, + language=request.language, + ) + + @app.post("/api/batch-eval") + def api_batch_eval(request: BatchEvalRequest) -> Dict[str, Any]: + queries = request.queries or framework.queries_from_file(query_file) + if not queries: + raise HTTPException(status_code=400, detail="No queries provided") + return framework.batch_evaluate( + queries=queries, + top_k=request.top_k, + auto_annotate=request.auto_annotate, + language=request.language, + force_refresh_labels=request.force_refresh_labels, + ) + + @app.get("/api/history") + def api_history() -> Dict[str, Any]: + return {"history": framework.store.list_batch_runs(limit=20)} + + return app + + +WEB_APP_HTML = """ + + + + + + Search Evaluation + + + +
+ +
+

Search Evaluation

+

Single-query evaluation and batch evaluation share the same service on port 6010.

+
+ + + +
+
+
+

Metrics

+
+
+
+

Top Results

+
+
+
+
+ + + +""" + + +def build_cli_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Search evaluation annotation builder and web UI") + sub = parser.add_subparsers(dest="command", required=True) + + build = sub.add_parser("build", help="Build pooled annotation set for queries") + build.add_argument("--tenant-id", default="163") + build.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + build.add_argument("--search-depth", type=int, default=1000) + build.add_argument("--rerank-depth", type=int, default=10000) + build.add_argument("--annotate-search-top-k", type=int, default=120) + build.add_argument("--annotate-rerank-top-k", type=int, default=200) + build.add_argument("--language", default="en") + build.add_argument("--force-refresh-rerank", action="store_true") + build.add_argument("--force-refresh-labels", action="store_true") + + batch = sub.add_parser("batch", help="Run batch evaluation against live search") + batch.add_argument("--tenant-id", default="163") + batch.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + batch.add_argument("--top-k", type=int, default=100) + batch.add_argument("--language", default="en") + batch.add_argument("--force-refresh-labels", action="store_true") + + audit = sub.add_parser("audit", help="Audit annotation quality for queries") + audit.add_argument("--tenant-id", default="163") + audit.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + audit.add_argument("--top-k", type=int, default=100) + audit.add_argument("--language", default="en") + audit.add_argument("--limit-suspicious", type=int, default=5) + audit.add_argument("--force-refresh-labels", action="store_true") + + serve = sub.add_parser("serve", help="Serve evaluation web UI on port 6010") + serve.add_argument("--tenant-id", default="163") + serve.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + serve.add_argument("--host", default="0.0.0.0") + serve.add_argument("--port", type=int, default=6010) + + return parser + + +def run_build(args: argparse.Namespace) -> None: + framework = SearchEvaluationFramework(tenant_id=args.tenant_id) + queries = framework.queries_from_file(Path(args.queries_file)) + summary = [] + for query in queries: + result = framework.build_query_annotation_set( + query=query, + search_depth=args.search_depth, + rerank_depth=args.rerank_depth, + annotate_search_top_k=args.annotate_search_top_k, + annotate_rerank_top_k=args.annotate_rerank_top_k, + language=args.language, + force_refresh_rerank=args.force_refresh_rerank, + force_refresh_labels=args.force_refresh_labels, + ) + summary.append( + { + "query": result.query, + "search_total": result.search_total, + "search_depth": result.search_depth, + "rerank_corpus_size": result.rerank_corpus_size, + "annotated_count": result.annotated_count, + "output_json_path": str(result.output_json_path), + } + ) + print( + f"[build] query={result.query!r} search_total={result.search_total} " + f"search_depth={result.search_depth} corpus={result.rerank_corpus_size} " + f"annotated={result.annotated_count} output={result.output_json_path}" + ) + out_path = ensure_dir(framework.artifact_root / "query_builds") / f"build_summary_{utc_timestamp()}.json" + out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"[done] summary={out_path}") + + +def run_batch(args: argparse.Namespace) -> None: + framework = SearchEvaluationFramework(tenant_id=args.tenant_id) + queries = framework.queries_from_file(Path(args.queries_file)) + payload = framework.batch_evaluate( + queries=queries, + top_k=args.top_k, + auto_annotate=True, + language=args.language, + force_refresh_labels=args.force_refresh_labels, + ) + print(f"[done] batch_id={payload['batch_id']} aggregate_metrics={payload['aggregate_metrics']}") + + +def run_audit(args: argparse.Namespace) -> None: + framework = SearchEvaluationFramework(tenant_id=args.tenant_id) + queries = framework.queries_from_file(Path(args.queries_file)) + audit_items = [] + for query in queries: + item = framework.audit_live_query( + query=query, + top_k=args.top_k, + language=args.language, + auto_annotate=not args.force_refresh_labels, + ) + if args.force_refresh_labels: + live_payload = framework.search_client.search(query=query, size=max(args.top_k, 100), from_=0, language=args.language) + framework.annotate_missing_labels( + query=query, + docs=list(live_payload.get("results") or [])[: args.top_k], + force_refresh=True, + ) + item = framework.audit_live_query( + query=query, + top_k=args.top_k, + language=args.language, + auto_annotate=False, + ) + audit_items.append( + { + "query": query, + "metrics": item["metrics"], + "distribution": item["distribution"], + "suspicious_count": len(item["suspicious"]), + "suspicious_examples": item["suspicious"][: args.limit_suspicious], + } + ) + print( + f"[audit] query={query!r} suspicious={len(item['suspicious'])} metrics={item['metrics']}" + ) + + summary = { + "created_at": utc_now_iso(), + "tenant_id": args.tenant_id, + "top_k": args.top_k, + "query_count": len(queries), + "total_suspicious": sum(item["suspicious_count"] for item in audit_items), + "queries": audit_items, + } + out_path = ensure_dir(framework.artifact_root / "audits") / f"audit_{utc_timestamp()}.json" + out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"[done] audit={out_path}") + + +def run_serve(args: argparse.Namespace) -> None: + framework = SearchEvaluationFramework(tenant_id=args.tenant_id) + app = create_web_app(framework, Path(args.queries_file)) + import uvicorn + + uvicorn.run(app, host=args.host, port=args.port, log_level="info") + + +def main() -> None: + parser = build_cli_parser() + args = parser.parse_args() + if args.command == "build": + run_build(args) + return + if args.command == "batch": + run_batch(args) + return + if args.command == "audit": + run_audit(args) + return + if args.command == "serve": + run_serve(args) + return + raise SystemExit(f"unknown command: {args.command}") + + +if __name__ == "__main__": + main() diff --git a/scripts/evaluation/serve_eval_web.py b/scripts/evaluation/serve_eval_web.py new file mode 100644 index 0000000..409906f --- /dev/null +++ b/scripts/evaluation/serve_eval_web.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 + +from pathlib import Path +import sys + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from scripts.evaluation.eval_framework import main + + +if __name__ == "__main__": + main() diff --git a/scripts/evaluation/tune_fusion.py b/scripts/evaluation/tune_fusion.py new file mode 100644 index 0000000..de40a49 --- /dev/null +++ b/scripts/evaluation/tune_fusion.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import copy +import json +import re +import subprocess +import sys +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List + +import requests +import yaml + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from scripts.evaluation.eval_framework import ( + DEFAULT_ARTIFACT_ROOT, + DEFAULT_QUERY_FILE, + ensure_dir, + utc_now_iso, + utc_timestamp, +) + + +CONFIG_PATH = PROJECT_ROOT / "config" / "config.yaml" + + +@dataclass +class ExperimentSpec: + name: str + description: str + params: Dict[str, Any] + + +def load_yaml(path: Path) -> Dict[str, Any]: + return yaml.safe_load(path.read_text(encoding="utf-8")) + + +def write_yaml(path: Path, payload: Dict[str, Any]) -> None: + path.write_text( + yaml.safe_dump(payload, sort_keys=False, allow_unicode=True), + encoding="utf-8", + ) + + +def set_nested_value(payload: Dict[str, Any], dotted_path: str, value: Any) -> None: + current = payload + parts = dotted_path.split(".") + for part in parts[:-1]: + current = current[part] + current[parts[-1]] = value + + +def apply_params(base_config: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]: + candidate = copy.deepcopy(base_config) + for dotted_path, value in params.items(): + set_nested_value(candidate, dotted_path, value) + return candidate + + +def wait_for_backend(base_url: str, timeout_sec: float = 300.0) -> Dict[str, Any]: + deadline = time.time() + timeout_sec + last_error = None + while time.time() < deadline: + try: + response = requests.get(f"{base_url.rstrip('/')}/health", timeout=10) + response.raise_for_status() + payload = response.json() + if str(payload.get("status")) == "healthy": + return payload + last_error = payload + except Exception as exc: # noqa: BLE001 + last_error = str(exc) + time.sleep(2.0) + raise RuntimeError(f"backend did not become healthy: {last_error}") + + +def run_restart() -> None: + subprocess.run(["./restart.sh", "backend"], cwd=PROJECT_ROOT, check=True, timeout=600) + + +def read_queries(path: Path) -> List[str]: + return [ + line.strip() + for line in path.read_text(encoding="utf-8").splitlines() + if line.strip() and not line.strip().startswith("#") + ] + + +def run_batch_eval( + *, + tenant_id: str, + queries_file: Path, + top_k: int, + language: str, + force_refresh_labels: bool, +) -> Dict[str, Any]: + cmd = [ + str(PROJECT_ROOT / ".venv" / "bin" / "python"), + "scripts/evaluation/build_annotation_set.py", + "batch", + "--tenant-id", + str(tenant_id), + "--queries-file", + str(queries_file), + "--top-k", + str(top_k), + "--language", + language, + ] + if force_refresh_labels: + cmd.append("--force-refresh-labels") + completed = subprocess.run( + cmd, + cwd=PROJECT_ROOT, + check=True, + capture_output=True, + text=True, + timeout=7200, + ) + output = (completed.stdout or "") + "\n" + (completed.stderr or "") + match = re.search(r"batch_id=([A-Za-z0-9_]+)\s+aggregate_metrics=(\{.*\})", output) + if not match: + raise RuntimeError(f"failed to parse batch output: {output[-2000:]}") + batch_id = match.group(1) + aggregate_metrics = json.loads(match.group(2).replace("'", '"')) + return { + "batch_id": batch_id, + "aggregate_metrics": aggregate_metrics, + "raw_output": output, + } + + +def render_markdown(summary: Dict[str, Any]) -> str: + lines = [ + "# Fusion Tuning Report", + "", + f"- Created at: {summary['created_at']}", + f"- Tenant ID: {summary['tenant_id']}", + f"- Query count: {summary['query_count']}", + f"- Top K: {summary['top_k']}", + f"- Score metric: {summary['score_metric']}", + "", + "## Experiments", + "", + "| Rank | Name | Score | MAP_3 | MAP_2_3 | P@5 | P@10 | Config |", + "|---|---|---:|---:|---:|---:|---:|---|", + ] + for idx, item in enumerate(summary["experiments"], start=1): + metrics = item["aggregate_metrics"] + lines.append( + "| " + + " | ".join( + [ + str(idx), + item["name"], + str(item["score"]), + str(metrics.get("MAP_3", "")), + str(metrics.get("MAP_2_3", "")), + str(metrics.get("P@5", "")), + str(metrics.get("P@10", "")), + item["config_snapshot_path"], + ] + ) + + " |" + ) + lines.extend(["", "## Details", ""]) + for item in summary["experiments"]: + lines.append(f"### {item['name']}") + lines.append("") + lines.append(f"- Description: {item['description']}") + lines.append(f"- Score: {item['score']}") + lines.append(f"- Params: `{json.dumps(item['params'], ensure_ascii=False, sort_keys=True)}`") + lines.append(f"- Batch report: {item['batch_report_path']}") + lines.append("") + return "\n".join(lines) + + +def load_experiments(path: Path) -> List[ExperimentSpec]: + payload = json.loads(path.read_text(encoding="utf-8")) + items = payload["experiments"] if isinstance(payload, dict) else payload + experiments: List[ExperimentSpec] = [] + for item in items: + experiments.append( + ExperimentSpec( + name=str(item["name"]), + description=str(item.get("description") or ""), + params=dict(item.get("params") or {}), + ) + ) + return experiments + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Run fusion tuning experiments against the live backend") + parser.add_argument("--tenant-id", default="163") + parser.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + parser.add_argument("--top-k", type=int, default=100) + parser.add_argument("--language", default="en") + parser.add_argument("--experiments-file", required=True) + parser.add_argument("--search-base-url", default="http://127.0.0.1:6002") + parser.add_argument("--score-metric", default="MAP_3") + parser.add_argument("--apply-best", action="store_true") + parser.add_argument("--force-refresh-labels-first-pass", action="store_true") + return parser + + +def main() -> None: + args = build_parser().parse_args() + queries_file = Path(args.queries_file) + queries = read_queries(queries_file) + base_config_text = CONFIG_PATH.read_text(encoding="utf-8") + base_config = load_yaml(CONFIG_PATH) + experiments = load_experiments(Path(args.experiments_file)) + + tuning_dir = ensure_dir(DEFAULT_ARTIFACT_ROOT / "tuning_runs") + run_id = f"tuning_{utc_timestamp()}" + run_dir = ensure_dir(tuning_dir / run_id) + results: List[Dict[str, Any]] = [] + + try: + for experiment in experiments: + candidate = apply_params(base_config, experiment.params) + write_yaml(CONFIG_PATH, candidate) + candidate_config_path = run_dir / f"{experiment.name}_config.yaml" + write_yaml(candidate_config_path, candidate) + + run_restart() + health = wait_for_backend(args.search_base_url) + batch_result = run_batch_eval( + tenant_id=args.tenant_id, + queries_file=queries_file, + top_k=args.top_k, + language=args.language, + force_refresh_labels=bool(args.force_refresh_labels_first_pass and not results), + ) + aggregate_metrics = dict(batch_result["aggregate_metrics"]) + results.append( + { + "name": experiment.name, + "description": experiment.description, + "params": experiment.params, + "aggregate_metrics": aggregate_metrics, + "score": float(aggregate_metrics.get(args.score_metric, 0.0)), + "batch_id": batch_result["batch_id"], + "batch_report_path": str( + DEFAULT_ARTIFACT_ROOT / "batch_reports" / f"{batch_result['batch_id']}.md" + ), + "config_snapshot_path": str(candidate_config_path), + "backend_health": health, + "batch_stdout": batch_result["raw_output"], + } + ) + print( + f"[tune] {experiment.name} score={aggregate_metrics.get(args.score_metric)} " + f"metrics={aggregate_metrics}" + ) + finally: + if args.apply_best and results: + best = max(results, key=lambda item: item["score"]) + best_config = apply_params(base_config, best["params"]) + write_yaml(CONFIG_PATH, best_config) + run_restart() + wait_for_backend(args.search_base_url) + else: + CONFIG_PATH.write_text(base_config_text, encoding="utf-8") + run_restart() + wait_for_backend(args.search_base_url) + + results.sort(key=lambda item: item["score"], reverse=True) + summary = { + "run_id": run_id, + "created_at": utc_now_iso(), + "tenant_id": args.tenant_id, + "query_count": len(queries), + "top_k": args.top_k, + "score_metric": args.score_metric, + "experiments": results, + } + summary_json_path = run_dir / "summary.json" + summary_md_path = run_dir / "summary.md" + summary_json_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") + summary_md_path.write_text(render_markdown(summary), encoding="utf-8") + print(f"[done] summary_json={summary_json_path}") + print(f"[done] summary_md={summary_md_path}") + + +if __name__ == "__main__": + main() -- libgit2 0.21.2