diff --git a/artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.cmd b/artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.cmd new file mode 100644 index 0000000..66a1d19 --- /dev/null +++ b/artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.cmd @@ -0,0 +1 @@ +python scripts/evaluation/tune_fusion.py --mode optimize --run-name coarse_fusion_long_001 --search-space scripts/evaluation/tuning/coarse_rank_fusion_space.yaml --seed-report artifacts/search_evaluation/batch_reports/batch_20260415T150754Z_00b6a8aa3d.md --tenant-id 163 --queries-file scripts/evaluation/queries/queries.txt --top-k 100 --language en --search-base-url http://127.0.0.1:6002 --eval-web-base-url http://127.0.0.1:6010 --max-evals 400 --batch-size 3 --candidate-pool-size 512 --random-seed 20260416 diff --git a/artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.pid b/artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.pid new file mode 100644 index 0000000..d3496cd --- /dev/null +++ b/artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.pid @@ -0,0 +1 @@ +2218620 diff --git a/config/config.yaml b/config/config.yaml index 65936a5..00acb01 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -256,9 +256,11 @@ coarse_rank: knn_text_weight: 1.0 knn_image_weight: 2.0 knn_tie_breaker: 0.3 - knn_bias: 0.0 + knn_bias: 0.2 knn_exponent: 5.6 + knn_text_bias: 0.2 knn_text_exponent: 0.0 + knn_image_bias: 0.2 knn_image_exponent: 0.0 fine_rank: enabled: false # false 时保序透传 @@ -649,4 +651,4 @@ tenant_config: primary_language: en index_languages: - en - - zh + - zh \ No newline at end of file diff --git a/docs/caches-inventory.md b/docs/caches-inventory.md index 3838279..39457b2 100644 --- a/docs/caches-inventory.md +++ b/docs/caches-inventory.md @@ -96,9 +96,22 @@ | `scripts/redis/redis_cache_prefix_stats.py` | 按前缀统计 key 数量与 **MEMORY USAGE**(可多 DB) | | `scripts/redis/redis_memory_heavy_keys.py` | 扫描占用内存最大的 key,辅助排查「统计与总内存不一致」 | | `scripts/redis/monitor_eviction.py` | 实时监控 **eviction** 相关事件,用于容量与驱逐策略排查 | +| `scripts/redis/purge_caches.py` | 一键清空业务缓存:embedding(含 `:image:` / `:clip_text:`)、anchors、translation;**默认跳过 `trans:deepl*`**(可 dry-run 预览) | 使用前需加载项目配置(如 `source activate.sh`)以保证 `REDIS_CONFIG` 与生产一致。脚本注释中给出了 **`redis-cli` 手工统计**示例(按前缀 `wc -l`、`MEMORY STATS` 等)。 +### 快速清空(排除 `trans:deepl*`) + +```bash +source activate.sh + +# 先预览会删多少 key(推荐) +python scripts/redis/purge_caches.py --dry-run + +# 真正删除(默认 db=0) +python scripts/redis/purge_caches.py +``` + --- ## 六、总表(Redis 与各层缓存) @@ -106,8 +119,8 @@ | 缓存名称 | 业务模块 | 存储 | Key 前缀 / 命名模式 | 过期时间 | 过期策略 | 值摘要 | 配置键 / 环境变量 | |----------|----------|------|---------------------|----------|----------|--------|-------------------| | 文本向量 | 检索 / 索引 / Embedding 服务 | Redis db≈0 | `{embedding_cache_prefix}:*`(逻辑键以 `embed:norm…` 开头) | `cache_expire_days`(默认 720 天) | 写入 TTL + 命中滑动续期 | BF16 字节向量 | `infrastructure.redis.*`;`REDIS_EMBEDDING_CACHE_PREFIX`、`REDIS_CACHE_EXPIRE_DAYS` | -| 图像向量(CLIP 图) | 图搜 / 多模态 | 同上 | `{prefix}:image:*` | 同上 | 同上 | BF16 字节 | 同上 | -| CLIP 文本塔向量 | 图搜文本侧 | 同上 | `{prefix}:clip_text:*` | 同上 | 同上 | BF16 字节 | 同上 | +| 图像向量(CLIP 图) | 图搜 / 多模态 | 同上 | `{embedding_cache_prefix}:image:*`(其中 `{embedding_cache_prefix}` 默认 `embedding`) | 同上 | 同上 | BF16 字节 | 同上 | +| CLIP 文本塔向量 | 图搜文本侧 | 同上 | `{embedding_cache_prefix}:clip_text:*`(其中 `{embedding_cache_prefix}` 默认 `embedding`) | 同上 | 同上 | BF16 字节 | 同上 | | 翻译译文 | 查询翻译、翻译服务 | 同上 | `trans:{model}:{lang}:*` | `services.translation.cache.ttl_seconds`(默认 720 天) | 可配置滑动(`sliding_expiration`) | UTF-8 字符串 | `services.translation.cache.*`;各能力 `use_cache` | | 商品分析 / Anchors | 索引富化、LLM 内容理解 | 同上 | `{anchor_cache_prefix}:{kind}:{hash}:{lang}:*` | `anchor_cache_expire_days`(默认 30 天) | 固定 TTL,不滑动 | JSON 字符串 | `anchor_cache_prefix`、`anchor_cache_expire_days`;`REDIS_ANCHOR_*` | | 应用配置 | 全栈 | 进程内存 | N/A(单例) | 进程生命周期 | `reload_app_config` 清除 | `AppConfig` 对象 | `config/loader.py` | diff --git a/docs/issues/issue-2026-04-14-粗排流程放入ES-TODO-env b/docs/issues/issue-2026-04-14-粗排流程放入ES-TODO-env deleted file mode 100644 index 336fe02..0000000 --- a/docs/issues/issue-2026-04-14-粗排流程放入ES-TODO-env +++ /dev/null @@ -1,25 +0,0 @@ -需求: -目前160条结果(rerank_window: 160)会进入重排,重排中 文本和图片向量的相关性,都会作为融合公式的因子之一(粗排和reranker都有): -knn_score -text_knn -image_knn -text_factor -knn_factor -但是文本向量召回和图片向量召回,是使用 KNN 索引召回的方式,并不是所有结果都有这两个得分,这两项得分都有为0的。 -为了解决这个问题,有一个方法是对最终能进入重排的 160 条,看其中还有哪些分别缺失文本和图片向量召回的得分,再通过某种方式让 ES 去算,或者从 ES 把向量拉回来,自己算,或者在召回的时候请求 ES 的时候,就通过某种设定,确保前面的若干条都带有这两个分数,不知道有哪些方法,我感觉这些方法都不太好,请你思考一下 - -考虑的一个方案: -想在“第一次 ES 搜索”里,只对 topN 补向量精算,考虑 rescore 或 retriever.rescorer的方案(官方明确支持多段 rescore/支持 score_mode: multiply,甚至示例里就有 function_score/script_score 放进 rescore 的写法。) -这意味着你完全可以: -初检仍然用现在的 lexical + text knn + image knn 召回候选 -对 window_size=160 做 rescore -用 exact script_score 给 top160 补 text/image vector 分 -顺手把你现在本地 coarse 融合迁回 ES - -export ES_AUTH="saas:4hOaLaf41y2VuI8y" -export ES="http://127.0.0.1:9200" -"index":"search_products_tenant_163" - -有个细节暴露出来了:dotProduct() 这类向量函数在 script_score 评分上下文能用,但在 script_fields 取字段上下文里不认。所以如果我们要把 exact 分顺手回传给 rerank,用 script_fields 的话得自己写数组循环,不能直接调向量内建函数。 - -重排打分公式需要的base_query base_query_trans_zh knn_query image_knn_query还能不能拿到?请你考虑,尽量想想如何得到这些打分,如果实在拿不到去想替代的办法比如简化打分公式。 diff --git a/docs/issues/issue-2026-04-14-粗排流程放入ES-TODO-env.md b/docs/issues/issue-2026-04-14-粗排流程放入ES-TODO-env.md new file mode 100644 index 0000000..336fe02 --- /dev/null +++ b/docs/issues/issue-2026-04-14-粗排流程放入ES-TODO-env.md @@ -0,0 +1,25 @@ +需求: +目前160条结果(rerank_window: 160)会进入重排,重排中 文本和图片向量的相关性,都会作为融合公式的因子之一(粗排和reranker都有): +knn_score +text_knn +image_knn +text_factor +knn_factor +但是文本向量召回和图片向量召回,是使用 KNN 索引召回的方式,并不是所有结果都有这两个得分,这两项得分都有为0的。 +为了解决这个问题,有一个方法是对最终能进入重排的 160 条,看其中还有哪些分别缺失文本和图片向量召回的得分,再通过某种方式让 ES 去算,或者从 ES 把向量拉回来,自己算,或者在召回的时候请求 ES 的时候,就通过某种设定,确保前面的若干条都带有这两个分数,不知道有哪些方法,我感觉这些方法都不太好,请你思考一下 + +考虑的一个方案: +想在“第一次 ES 搜索”里,只对 topN 补向量精算,考虑 rescore 或 retriever.rescorer的方案(官方明确支持多段 rescore/支持 score_mode: multiply,甚至示例里就有 function_score/script_score 放进 rescore 的写法。) +这意味着你完全可以: +初检仍然用现在的 lexical + text knn + image knn 召回候选 +对 window_size=160 做 rescore +用 exact script_score 给 top160 补 text/image vector 分 +顺手把你现在本地 coarse 融合迁回 ES + +export ES_AUTH="saas:4hOaLaf41y2VuI8y" +export ES="http://127.0.0.1:9200" +"index":"search_products_tenant_163" + +有个细节暴露出来了:dotProduct() 这类向量函数在 script_score 评分上下文能用,但在 script_fields 取字段上下文里不认。所以如果我们要把 exact 分顺手回传给 rerank,用 script_fields 的话得自己写数组循环,不能直接调向量内建函数。 + +重排打分公式需要的base_query base_query_trans_zh knn_query image_knn_query还能不能拿到?请你考虑,尽量想想如何得到这些打分,如果实在拿不到去想替代的办法比如简化打分公式。 diff --git a/docs/issues/issue-2026-04-16-bayes寻参-TODO.md b/docs/issues/issue-2026-04-16-bayes寻参-TODO.md new file mode 100644 index 0000000..66c1c0c --- /dev/null +++ b/docs/issues/issue-2026-04-16-bayes寻参-TODO.md @@ -0,0 +1,136 @@ + +我以前经过过一轮调参,是基于54个评测样本(queries.txt),过程中发现的最优的参数是这一组: +0.641241 {'es_bias': '7.214', 'es_exponent': '0.2025', 'text_bias': '4.0', 'text_exponent': '1.584', 'text_translation_weight': '1.4441', 'knn_text_weight': '0.1', 'knn_image_weight': '5.6232', 'knn_tie_breaker': + '0.021', 'knn_bias': '0.0019', 'knn_exponent': '11.8477', 'knn_text_bias': '2.3125', 'knn_text_exponent': '1.1547', 'knn_image_bias': '0.9641', 'knn_image_exponent': '5.8671'} + +这一组参数分布比较极端,text_bias太大(文本项得分事0~1的,加上4被稀释的很大),图片的exponent太大,不过在这个数据集上面确实是最好的,我觉得有过拟合的可能,因此要扩大数据集,先扩展标注集,然后使用扩展的标注集,继续进行寻参。 +因为标注任务和寻参任务耗时都比较长,请你写好一个脚本,内部先启动标注任务,然后再启动寻参任务,把任务跑起来,程序已经正常跑起来了、运转正常,你才可以退出,以后等程序跑完了应该能拿到寻参结果,下次你可以结合程序执行的结果进行结论分析。 + + +以前的一轮调参: +我当时的调参需求: + +请对coarse_rank fusion公式进行调参: + 目前的baseline是这一组,Primary_Metric_Score: 0.637642: + coarse_rank: + ... + fusion: + es_bias: 10.0 + es_exponent: 0.05 + text_bias: 0.1 + text_exponent: 0.35 + text_translation_weight: 1.0 + knn_text_weight: 1.0 + knn_image_weight: 2.0 + knn_tie_breaker: 0.3 + knn_bias: 0.2 + knn_exponent: 5.6 + knn_text_bias: 0.2 + knn_text_exponent: 0.0 + knn_image_bias: 0.2 + knn_image_exponent: 0.0 + 评估指标在/data/saas-search/artifacts/search_evaluation/batch_reports/batch_20260415T150754Z_00b6a8aa3d.md + 请以这个为基准,发散思维,进行宽一点的范围调参。因为每次重启、评测都需要几分钟,请写一个调参的框架,基于框架、设定好多组参数、写好脚本,每轮收集效果自动的调整参数分布(因为笛卡尔积、多种参数调参成本太高,因此考虑贝叶斯调参等方法通 + 过多轮迭代通过脚本自动收敛) + 每次调整参数后需要重启backend (有时候重启backend后eval-web好像也挂了,如果有可以追查原因并修复) + ./restart.sh backend + 注意:请你调试脚本、进行一轮分析,最终要沉淀一套调参脚本,我下次可以重新跑(还是针对这组参数调参),能自动迭代(调整参数分布)、收集每组参数的指标、调优到最优的参数组合。 + + + +你给出的调参脚本: + + 长时间自动调参脚本已经配好,核心文件是: + + - 调参主程序:scripts/evaluation/tune_fusion.py + - 宽范围搜索空间:scripts/evaluation/tuning/coarse_rank_fusion_space.yaml + - 后台长跑启动脚本:scripts/evaluation/start_coarse_fusion_tuning_long.sh + - 续跑脚本:scripts/evaluation/resume_coarse_fusion_tuning_long.sh + - 使用说明:scripts/evaluation/tuning/README.md + + 这套方案不是笛卡尔积穷举,而是“种子实验 + 随机探索 + 基于历史最优分布的自适应采样”反复迭代。搜索范围已经放宽,新增了多组偏离 baseline 很远的 seed,也保留了 baseline 附近抖动,适合长时间慢慢收敛。 + + 直接启动一轮: + + ./scripts/evaluation/start_coarse_fusion_tuning_long.sh + + 如果想跑更久一点: + + MAX_EVALS=48 BATCH_SIZE=3 CANDIDATE_POOL_SIZE=512 RUN_NAME=coarse_fusion_long_001 \ + ./scripts/evaluation/start_coarse_fusion_tuning_long.sh + + 看日志: + + tail -f artifacts/search_evaluation/tuning_launches/.log + + 看结果目录: + + ls artifacts/search_evaluation/tuning_runs// + + 续跑: + + ./scripts/evaluation/resume_coarse_fusion_tuning_long.sh + + 每轮会自动做这些事: + + - 生成一批候选 coarse_rank.fusion 参数 + - 写入配置并重启 backend + - 检查 eval-web,必要时自愈拉起 + - 跑评测并收集 Primary_Metric_Score + - 更新 trials.jsonl、leaderboard.csv、summary.md + - 基于历史结果调整下一轮采样分布 + + 你先跑一轮长任务。跑完后把 run_name 或 scripts/evaluation/tuning/README.md 里对应实验目录发我,我再基于结果做下一步分析和收缩搜索空间。 + + + + +› 我运行了:MAX_EVALS=400 BATCH_SIZE=3 CANDIDATE_POOL_SIZE=512 RUN_NAME=coarse_fusion_long_001 ./scripts/evaluation/start_coarse_fusion_tuning_long.sh + + run_name=coarse_fusion_long_001 + pid=2218620 + log=artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.log + pid_file=artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.pid + cmd_file=artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.cmd + run_dir=artifacts/search_evaluation/tuning_runs/coarse_fusion_long_001 + + tail -f artifacts/search_evaluation/tuning_launches/coarse_fusion_long_001.log + cat artifacts/search_evaluation/tuning_runs/coarse_fusion_long_001/leaderboard.csv + + +• 我先看这次长跑的日志和当前排行榜,确认任务是否正常推进、是否有异常中断,以及目前最优参数落在哪个方向。 + +• Explored + └ Read coarse_fusion_long_001.log, leaderboard.csv + List coarse_fusion_long_001 + Read summary.md + +─── + +• 结果已经足够看出明显趋势了。我再补一层聚合分析,看看 top 区间里哪些参数方向反复出现,避免只盯着单个最优点。 + +• Ran ps -p 2218620 -o pid=,etimes=,cmd= + └ (no output) + +• Ran python - <<'PY' + │ import csv, statistics, math + │ from pathlib import Path + │ … +15 lines + └ rows 385 + + … +61 lines (ctrl + t to view transcript) + knn_image_bias: mean=0.5631 median=0.2000 min=0.0010 max=4.0000 + knn_image_exponent: mean=4.6316 median=5.8680 min=0.0000 max=6.0000 + +• Ran python - <<'PY' + │ import csv + │ from pathlib import Path + │ … +19 lines + └ baseline_score 0.637642 + + … +30 lines (ctrl + t to view transcript) + 10 bo_234 0.641241 {'es_bias': '7.214', 'es_exponent': '0.2025', 'text_bias': '4.0', 'text_exponent': '1.584', 'text_translation_weight': '1.4441', 'knn_text_weight': '0.1', 'knn_image_weight': '5.6232', 'knn_tie_breaker': + '0.021', 'knn_bias': '0.0019', 'knn_exponent': '11.8477', 'knn_text_bias': '2.3125', 'knn_text_exponent': '1.1547', 'knn_image_bias': '0.9641', 'knn_image_exponent': '5.8671'} + +这一次因为外部原因(磁盘满)终止了,以上是最好的一组参数。 + diff --git a/scripts/evaluation/resume_coarse_fusion_tuning_long.sh b/scripts/evaluation/resume_coarse_fusion_tuning_long.sh new file mode 100755 index 0000000..8171454 --- /dev/null +++ b/scripts/evaluation/resume_coarse_fusion_tuning_long.sh @@ -0,0 +1,76 @@ +#!/bin/bash + +set -euo pipefail + +if [ "$#" -lt 1 ]; then + echo "usage: $0 [extra tune_fusion args...]" >&2 + exit 1 +fi + +cd "$(dirname "$0")/../.." +source ./activate.sh + +TARGET="$1" +shift + +if [ -d "${TARGET}" ]; then + RUN_DIR="${TARGET}" + RUN_NAME="$(basename "${RUN_DIR}")" +else + RUN_NAME="${TARGET}" + RUN_DIR="artifacts/search_evaluation/tuning_runs/${RUN_NAME}" +fi + +if [ ! -d "${RUN_DIR}" ]; then + echo "run dir not found: ${RUN_DIR}" >&2 + exit 1 +fi + +MAX_EVALS="${MAX_EVALS:-36}" +BATCH_SIZE="${BATCH_SIZE:-3}" +CANDIDATE_POOL_SIZE="${CANDIDATE_POOL_SIZE:-512}" + +LAUNCH_DIR="artifacts/search_evaluation/tuning_launches" +mkdir -p "${LAUNCH_DIR}" +LOG_PATH="${LAUNCH_DIR}/${RUN_NAME}.resume.log" +PID_PATH="${LAUNCH_DIR}/${RUN_NAME}.resume.pid" +CMD_PATH="${LAUNCH_DIR}/${RUN_NAME}.resume.cmd" + +CMD=( + python + scripts/evaluation/tune_fusion.py + --mode optimize + --resume-run "${RUN_DIR}" + --search-space "${RUN_DIR}/search_space.yaml" + --seed-report artifacts/search_evaluation/batch_reports/batch_20260415T150754Z_00b6a8aa3d.md + --tenant-id 163 + --queries-file scripts/evaluation/queries/queries.txt + --top-k 100 + --language en + --search-base-url http://127.0.0.1:6002 + --eval-web-base-url http://127.0.0.1:6010 + --max-evals "${MAX_EVALS}" + --batch-size "${BATCH_SIZE}" + --candidate-pool-size "${CANDIDATE_POOL_SIZE}" +) + +if [ "$#" -gt 0 ]; then + CMD+=("$@") +fi + +printf '%q ' "${CMD[@]}" > "${CMD_PATH}" +printf '\n' >> "${CMD_PATH}" + +nohup "${CMD[@]}" > "${LOG_PATH}" 2>&1 & +PID=$! +echo "${PID}" > "${PID_PATH}" + +echo "run_name=${RUN_NAME}" +echo "pid=${PID}" +echo "log=${LOG_PATH}" +echo "pid_file=${PID_PATH}" +echo "cmd_file=${CMD_PATH}" +echo "run_dir=${RUN_DIR}" +echo +echo "tail -f ${LOG_PATH}" +echo "cat ${RUN_DIR}/leaderboard.csv" diff --git a/scripts/evaluation/run_coarse_fusion_tuning.sh b/scripts/evaluation/run_coarse_fusion_tuning.sh new file mode 100755 index 0000000..d3b3df9 --- /dev/null +++ b/scripts/evaluation/run_coarse_fusion_tuning.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +set -euo pipefail + +cd "$(dirname "$0")/../.." +source ./activate.sh + +python scripts/evaluation/tune_fusion.py \ + --mode optimize \ + --search-space scripts/evaluation/tuning/coarse_rank_fusion_space.yaml \ + --seed-report artifacts/search_evaluation/batch_reports/batch_20260415T150754Z_00b6a8aa3d.md \ + --tenant-id 163 \ + --queries-file scripts/evaluation/queries/queries.txt \ + --top-k 100 \ + --language en \ + --search-base-url http://127.0.0.1:6002 \ + --eval-web-base-url http://127.0.0.1:6010 \ + "$@" diff --git a/scripts/evaluation/start_coarse_fusion_tuning_long.sh b/scripts/evaluation/start_coarse_fusion_tuning_long.sh new file mode 100755 index 0000000..1560923 --- /dev/null +++ b/scripts/evaluation/start_coarse_fusion_tuning_long.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +set -euo pipefail + +cd "$(dirname "$0")/../.." +source ./activate.sh + +RUN_NAME="${RUN_NAME:-coarse_fusion_long_$(date -u +%Y%m%dT%H%M%SZ)}" +MAX_EVALS="${MAX_EVALS:-36}" +BATCH_SIZE="${BATCH_SIZE:-3}" +CANDIDATE_POOL_SIZE="${CANDIDATE_POOL_SIZE:-512}" +RANDOM_SEED="${RANDOM_SEED:-20260416}" + +LAUNCH_DIR="artifacts/search_evaluation/tuning_launches" +mkdir -p "${LAUNCH_DIR}" +LOG_PATH="${LAUNCH_DIR}/${RUN_NAME}.log" +PID_PATH="${LAUNCH_DIR}/${RUN_NAME}.pid" +CMD_PATH="${LAUNCH_DIR}/${RUN_NAME}.cmd" + +CMD=( + python + scripts/evaluation/tune_fusion.py + --mode optimize + --run-name "${RUN_NAME}" + --search-space scripts/evaluation/tuning/coarse_rank_fusion_space.yaml + --seed-report artifacts/search_evaluation/batch_reports/batch_20260415T150754Z_00b6a8aa3d.md + --tenant-id 163 + --queries-file scripts/evaluation/queries/queries.txt + --top-k 100 + --language en + --search-base-url http://127.0.0.1:6002 + --eval-web-base-url http://127.0.0.1:6010 + --max-evals "${MAX_EVALS}" + --batch-size "${BATCH_SIZE}" + --candidate-pool-size "${CANDIDATE_POOL_SIZE}" + --random-seed "${RANDOM_SEED}" +) + +if [ "$#" -gt 0 ]; then + CMD+=("$@") +fi + +printf '%q ' "${CMD[@]}" > "${CMD_PATH}" +printf '\n' >> "${CMD_PATH}" + +nohup "${CMD[@]}" > "${LOG_PATH}" 2>&1 & +PID=$! +echo "${PID}" > "${PID_PATH}" + +echo "run_name=${RUN_NAME}" +echo "pid=${PID}" +echo "log=${LOG_PATH}" +echo "pid_file=${PID_PATH}" +echo "cmd_file=${CMD_PATH}" +echo "run_dir=artifacts/search_evaluation/tuning_runs/${RUN_NAME}" +echo +echo "tail -f ${LOG_PATH}" +echo "cat artifacts/search_evaluation/tuning_runs/${RUN_NAME}/leaderboard.csv" diff --git a/scripts/evaluation/tune_fusion.py b/scripts/evaluation/tune_fusion.py index f7d2a87..9f6f112 100644 --- a/scripts/evaluation/tune_fusion.py +++ b/scripts/evaluation/tune_fusion.py @@ -4,23 +4,37 @@ from __future__ import annotations import argparse import copy +import csv import json +import math +import random import re +import shutil import subprocess import sys import time from dataclasses import dataclass from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Sequence +import numpy as np import requests import yaml +try: + from sklearn.gaussian_process import GaussianProcessRegressor + from sklearn.gaussian_process.kernels import ConstantKernel, Matern, WhiteKernel +except Exception: # noqa: BLE001 + GaussianProcessRegressor = None # type: ignore[assignment] + ConstantKernel = None # type: ignore[assignment] + Matern = None # type: ignore[assignment] + WhiteKernel = None # type: ignore[assignment] + PROJECT_ROOT = Path(__file__).resolve().parents[2] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) -from scripts.evaluation.eval_framework import ( +from scripts.evaluation.eval_framework import ( # noqa: E402 DEFAULT_ARTIFACT_ROOT, DEFAULT_QUERY_FILE, ensure_dir, @@ -30,6 +44,7 @@ from scripts.evaluation.eval_framework import ( CONFIG_PATH = PROJECT_ROOT / "config" / "config.yaml" +LOG_DIR = PROJECT_ROOT / "logs" @dataclass @@ -39,6 +54,108 @@ class ExperimentSpec: params: Dict[str, Any] +@dataclass +class ParameterSpec: + name: str + lower: float + upper: float + scale: str = "linear" + round_digits: int = 6 + + def __post_init__(self) -> None: + if self.lower >= self.upper: + raise ValueError(f"invalid bounds for {self.name}: {self.lower} >= {self.upper}") + if self.scale not in {"linear", "log"}: + raise ValueError(f"unsupported scale={self.scale!r} for {self.name}") + if self.scale == "log" and (self.lower <= 0 or self.upper <= 0): + raise ValueError(f"log-scaled parameter {self.name} must have positive bounds") + + @property + def transformed_lower(self) -> float: + return math.log10(self.lower) if self.scale == "log" else self.lower + + @property + def transformed_upper(self) -> float: + return math.log10(self.upper) if self.scale == "log" else self.upper + + @property + def transformed_span(self) -> float: + return self.transformed_upper - self.transformed_lower + + def transform(self, value: float) -> float: + clipped = min(max(float(value), self.lower), self.upper) + return math.log10(clipped) if self.scale == "log" else clipped + + def inverse_transform(self, value: float) -> float: + raw = (10 ** value) if self.scale == "log" else value + raw = min(max(float(raw), self.lower), self.upper) + return round(raw, self.round_digits) + + def sample_uniform(self, rng: random.Random) -> float: + draw = rng.uniform(self.transformed_lower, self.transformed_upper) + return self.inverse_transform(draw) + + +@dataclass +class SearchSpace: + target_path: str + baseline: Dict[str, float] + parameters: List[ParameterSpec] + seed_experiments: List[ExperimentSpec] + init_random: int = 6 + candidate_pool_size: int = 256 + explore_probability: float = 0.25 + local_jitter_probability: float = 0.45 + elite_fraction: float = 0.35 + min_normalized_distance: float = 0.14 + + @property + def parameter_names(self) -> List[str]: + return [item.name for item in self.parameters] + + def fill_params(self, params: Dict[str, Any]) -> Dict[str, float]: + merged = {name: float(self.baseline[name]) for name in self.parameter_names} + for name, value in params.items(): + if name not in merged: + raise KeyError(f"unknown parameter in search space: {name}") + merged[name] = float(value) + return { + spec.name: spec.inverse_transform(spec.transform(float(merged[spec.name]))) + for spec in self.parameters + } + + def sample_random(self, rng: random.Random) -> Dict[str, float]: + return {spec.name: spec.sample_uniform(rng) for spec in self.parameters} + + def vectorize(self, params: Dict[str, Any]) -> np.ndarray: + merged = self.fill_params(params) + return np.array([spec.transform(float(merged[spec.name])) for spec in self.parameters], dtype=float) + + def from_vector(self, vector: Sequence[float]) -> Dict[str, float]: + return { + spec.name: spec.inverse_transform(float(vector[idx])) + for idx, spec in enumerate(self.parameters) + } + + def normalized_vector(self, params: Dict[str, Any]) -> np.ndarray: + vector = self.vectorize(params) + parts: List[float] = [] + for idx, spec in enumerate(self.parameters): + parts.append((vector[idx] - spec.transformed_lower) / max(spec.transformed_span, 1e-9)) + return np.array(parts, dtype=float) + + def canonical_key(self, params: Dict[str, Any]) -> str: + return json.dumps(self.fill_params(params), ensure_ascii=False, sort_keys=True) + + +@dataclass +class CandidateProposal: + name: str + description: str + params: Dict[str, float] + source: str + + def load_yaml(path: Path) -> Dict[str, Any]: return yaml.safe_load(path.read_text(encoding="utf-8")) @@ -50,6 +167,13 @@ def write_yaml(path: Path, payload: Dict[str, Any]) -> None: ) +def get_nested_value(payload: Dict[str, Any], dotted_path: str) -> Any: + current: Any = payload + for part in dotted_path.split("."): + current = current[part] + return current + + def set_nested_value(payload: Dict[str, Any], dotted_path: str, value: Any) -> None: current = payload parts = dotted_path.split(".") @@ -58,16 +182,115 @@ def set_nested_value(payload: Dict[str, Any], dotted_path: str, value: Any) -> N current[parts[-1]] = value -def apply_params(base_config: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]: +def apply_target_params(base_config: Dict[str, Any], target_path: str, params: Dict[str, Any]) -> Dict[str, Any]: candidate = copy.deepcopy(base_config) - for dotted_path, value in params.items(): - set_nested_value(candidate, dotted_path, value) + for key, value in params.items(): + set_nested_value(candidate, f"{target_path}.{key}", value) return candidate +def read_queries(path: Path) -> List[str]: + return [ + line.strip() + for line in path.read_text(encoding="utf-8").splitlines() + if line.strip() and not line.strip().startswith("#") + ] + + +def run_restart(targets: Sequence[str]) -> None: + cmd = ["./restart.sh", *targets] + subprocess.run(cmd, cwd=PROJECT_ROOT, check=True, timeout=900) + + +def bytes_to_gib(value: int) -> float: + return float(value) / float(1024 ** 3) + + +def get_free_disk_bytes(path: Path) -> int: + return int(shutil.disk_usage(path).free) + + +def iter_log_cleanup_candidates() -> List[Path]: + if not LOG_DIR.is_dir(): + return [] + items: List[Path] = [] + seen: set[str] = set() + for path in LOG_DIR.rglob("*"): + try: + if not path.is_file(): + continue + resolved = path.resolve() + key = str(resolved) + if key in seen: + continue + seen.add(key) + items.append(resolved) + except FileNotFoundError: + continue + items.sort(key=lambda item: item.stat().st_size if item.exists() else 0, reverse=True) + return items + + +def truncate_file(path: Path) -> int: + if not path.exists() or not path.is_file(): + return 0 + size = int(path.stat().st_size) + if size <= 0: + return 0 + with path.open("w", encoding="utf-8"): + pass + return size + + +def ensure_disk_headroom( + *, + min_free_gb: float, + auto_truncate_logs: bool, + context: str, +) -> None: + required_bytes = int(min_free_gb * (1024 ** 3)) + free_bytes = get_free_disk_bytes(PROJECT_ROOT) + if free_bytes >= required_bytes: + return + + print( + f"[disk] low free space before {context}: " + f"free={bytes_to_gib(free_bytes):.2f}GiB required={min_free_gb:.2f}GiB" + ) + if not auto_truncate_logs: + raise RuntimeError( + f"insufficient disk headroom before {context}: " + f"free={bytes_to_gib(free_bytes):.2f}GiB required={min_free_gb:.2f}GiB" + ) + + reclaimed_bytes = 0 + for candidate in iter_log_cleanup_candidates(): + try: + reclaimed = truncate_file(candidate) + except Exception as exc: # noqa: BLE001 + print(f"[disk] skip truncate {candidate}: {exc}") + continue + if reclaimed <= 0: + continue + reclaimed_bytes += reclaimed + free_bytes = get_free_disk_bytes(PROJECT_ROOT) + print( + f"[disk] truncated {candidate} reclaimed={bytes_to_gib(reclaimed):.2f}GiB " + f"free_now={bytes_to_gib(free_bytes):.2f}GiB" + ) + if free_bytes >= required_bytes: + return + + raise RuntimeError( + f"insufficient disk headroom after log truncation before {context}: " + f"free={bytes_to_gib(free_bytes):.2f}GiB required={min_free_gb:.2f}GiB " + f"reclaimed={bytes_to_gib(reclaimed_bytes):.2f}GiB" + ) + + def wait_for_backend(base_url: str, timeout_sec: float = 300.0) -> Dict[str, Any]: deadline = time.time() + timeout_sec - last_error = None + last_error: Any = None while time.time() < deadline: try: response = requests.get(f"{base_url.rstrip('/')}/health", timeout=10) @@ -82,16 +305,69 @@ def wait_for_backend(base_url: str, timeout_sec: float = 300.0) -> Dict[str, Any raise RuntimeError(f"backend did not become healthy: {last_error}") -def run_restart() -> None: - subprocess.run(["./restart.sh", "backend"], cwd=PROJECT_ROOT, check=True, timeout=600) +def wait_for_eval_web(base_url: str, timeout_sec: float = 90.0) -> Dict[str, Any]: + url = f"{base_url.rstrip('/')}/api/history" + deadline = time.time() + timeout_sec + last_error: Any = None + while time.time() < deadline: + try: + response = requests.get(url, timeout=10) + response.raise_for_status() + payload = response.json() + if isinstance(payload, dict) and "history" in payload: + return payload + last_error = payload + except Exception as exc: # noqa: BLE001 + last_error = str(exc) + time.sleep(2.0) + raise RuntimeError(f"eval-web did not become healthy: {last_error}") + + +def ensure_eval_web(eval_web_base_url: str) -> Dict[str, Any]: + try: + return wait_for_eval_web(eval_web_base_url, timeout_sec=20.0) + except Exception: # noqa: BLE001 + run_restart(["eval-web"]) + return wait_for_eval_web(eval_web_base_url, timeout_sec=120.0) -def read_queries(path: Path) -> List[str]: - return [ - line.strip() - for line in path.read_text(encoding="utf-8").splitlines() - if line.strip() and not line.strip().startswith("#") - ] +def verify_backend_config(base_url: str, target_path: str, expected: Dict[str, Any], tol: float = 1e-6) -> bool: + response = requests.get(f"{base_url.rstrip('/')}/admin/config", timeout=20) + response.raise_for_status() + payload = response.json() + candidate_paths = [target_path] + if not target_path.startswith("search."): + candidate_paths.append(f"search.{target_path}") + if target_path.startswith("search."): + candidate_paths.append(target_path[len("search."):]) + + live_block = None + for path in candidate_paths: + try: + maybe_block = get_nested_value(payload, path) + except Exception: # noqa: BLE001 + continue + if isinstance(maybe_block, dict): + live_block = maybe_block + break + if live_block is None: + raise RuntimeError( + f"unable to resolve backend config path {target_path!r}; " + f"tried={candidate_paths!r} top_level_keys={sorted(payload.keys())[:20]!r}" + ) + for key, expected_value in expected.items(): + live_value = live_block[key] + if isinstance(expected_value, (int, float)): + if abs(float(live_value) - float(expected_value)) > tol: + raise RuntimeError( + f"backend config mismatch for {target_path}.{key}: " + f"expected={expected_value} live={live_value}" + ) + elif live_value != expected_value: + raise RuntimeError( + f"backend config mismatch for {target_path}.{key}: expected={expected_value!r} live={live_value!r}" + ) + return True def run_batch_eval( @@ -126,95 +402,580 @@ def run_batch_eval( timeout=7200, ) output = (completed.stdout or "") + "\n" + (completed.stderr or "") - match = re.search(r"batch_id=([A-Za-z0-9_]+)\s+aggregate_metrics=(\{.*\})", output) - if not match: + batch_ids = re.findall(r"batch_id=([A-Za-z0-9_]+)", output) + if not batch_ids: raise RuntimeError(f"failed to parse batch output: {output[-2000:]}") - batch_id = match.group(1) - aggregate_metrics = json.loads(match.group(2).replace("'", '"')) + batch_id = batch_ids[-1] + batch_json_path = DEFAULT_ARTIFACT_ROOT / "batch_reports" / f"{batch_id}.json" + if not batch_json_path.is_file(): + raise RuntimeError(f"batch json not found after eval: {batch_json_path}") + payload = json.loads(batch_json_path.read_text(encoding="utf-8")) return { "batch_id": batch_id, - "aggregate_metrics": aggregate_metrics, + "payload": payload, "raw_output": output, + "batch_json_path": str(batch_json_path), + "batch_report_path": str(DEFAULT_ARTIFACT_ROOT / "batch_reports" / f"{batch_id}.md"), + } + + +def resolve_batch_json_path(path_like: str) -> Path: + path = Path(path_like) + if not path.is_absolute(): + path = (PROJECT_ROOT / path).resolve() + if path.suffix == ".json": + return path + if path.suffix == ".md": + candidate = path.with_suffix(".json") + if candidate.is_file(): + return candidate + if path.is_file(): + return path + candidate = path.parent / f"{path.name}.json" + if candidate.is_file(): + return candidate + raise FileNotFoundError(f"cannot resolve batch json from: {path_like}") + + +def load_batch_payload(path_like: str) -> Dict[str, Any]: + path = resolve_batch_json_path(path_like) + return json.loads(path.read_text(encoding="utf-8")) + + +def load_experiments(path: Path) -> List[ExperimentSpec]: + payload = json.loads(path.read_text(encoding="utf-8")) + items = payload["experiments"] if isinstance(payload, dict) else payload + experiments: List[ExperimentSpec] = [] + for item in items: + experiments.append( + ExperimentSpec( + name=str(item["name"]), + description=str(item.get("description") or ""), + params=dict(item.get("params") or {}), + ) + ) + return experiments + + +def load_search_space(path: Path) -> SearchSpace: + payload = load_yaml(path) + parameters = [ + ParameterSpec( + name=str(name), + lower=float(spec["min"]), + upper=float(spec["max"]), + scale=str(spec.get("scale", "linear")), + round_digits=int(spec.get("round", 6)), + ) + for name, spec in dict(payload["parameters"]).items() + ] + baseline = {str(key): float(value) for key, value in dict(payload["baseline"]).items()} + seed_experiments = [ + ExperimentSpec( + name=str(item["name"]), + description=str(item.get("description") or ""), + params={str(k): float(v) for k, v in dict(item.get("params") or {}).items()}, + ) + for item in list(payload.get("seed_experiments") or []) + ] + optimizer = dict(payload.get("optimizer") or {}) + return SearchSpace( + target_path=str(payload["target_path"]), + baseline=baseline, + parameters=parameters, + seed_experiments=seed_experiments, + init_random=int(optimizer.get("init_random", 6)), + candidate_pool_size=int(optimizer.get("candidate_pool_size", 256)), + explore_probability=float(optimizer.get("explore_probability", 0.25)), + local_jitter_probability=float(optimizer.get("local_jitter_probability", 0.45)), + elite_fraction=float(optimizer.get("elite_fraction", 0.35)), + min_normalized_distance=float(optimizer.get("min_normalized_distance", 0.14)), + ) + + +def load_existing_trials(run_dir: Path) -> List[Dict[str, Any]]: + path = run_dir / "trials.jsonl" + if not path.is_file(): + return [] + trials: List[Dict[str, Any]] = [] + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if line: + trials.append(json.loads(line)) + return trials + + +def append_trial(run_dir: Path, trial: Dict[str, Any]) -> None: + path = run_dir / "trials.jsonl" + with path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(trial, ensure_ascii=False) + "\n") + + +def live_success_trials(trials: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]: + return [ + item + for item in trials + if item.get("status") == "ok" and not bool(item.get("is_seed")) + ] + + +def all_success_trials(trials: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]: + return [item for item in trials if item.get("status") == "ok"] + + +def score_of(trial: Dict[str, Any], metric: str) -> float: + return float((trial.get("aggregate_metrics") or {}).get(metric, trial.get("score", 0.0)) or 0.0) + + +def next_trial_name(trials: Sequence[Dict[str, Any]], prefix: str) -> str: + return f"{prefix}_{len(trials) + 1:03d}" + + +def normal_pdf(x: float) -> float: + return math.exp(-0.5 * x * x) / math.sqrt(2.0 * math.pi) + + +def normal_cdf(x: float) -> float: + return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0))) + + +def expected_improvement(mu: float, sigma: float, best: float, xi: float = 0.002) -> float: + if sigma <= 1e-12: + return max(mu - best - xi, 0.0) + z = (mu - best - xi) / sigma + return (mu - best - xi) * normal_cdf(z) + sigma * normal_pdf(z) + + +def normalized_distance(space: SearchSpace, left: Dict[str, Any], right: Dict[str, Any]) -> float: + lv = space.normalized_vector(left) + rv = space.normalized_vector(right) + return float(np.linalg.norm(lv - rv) / math.sqrt(len(space.parameters))) + + +def fit_surrogate(space: SearchSpace, trials: Sequence[Dict[str, Any]], metric: str, seed: int) -> Any: + if GaussianProcessRegressor is None or len(trials) < 4: + return None + X = np.array([space.vectorize(item["params"]) for item in trials], dtype=float) + y = np.array([score_of(item, metric) for item in trials], dtype=float) + if len(np.unique(np.round(y, 8))) < 2: + return None + try: + kernel = ( + ConstantKernel(1.0, (1e-3, 1e3)) + * Matern(length_scale=np.ones(len(space.parameters)), length_scale_bounds=(1e-2, 1e2), nu=2.5) + + WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-8, 1e-1)) + ) + gp = GaussianProcessRegressor( + kernel=kernel, + normalize_y=True, + n_restarts_optimizer=2, + random_state=seed, + ) + gp.fit(X, y) + return gp + except Exception: # noqa: BLE001 + return None + + +def build_sampling_spread(space: SearchSpace, elite_vectors: np.ndarray) -> np.ndarray: + spans = np.array([spec.transformed_span for spec in space.parameters], dtype=float) + floor = np.maximum(spans * 0.05, 0.015) + ceiling = np.maximum(spans * 0.5, floor) + if elite_vectors.shape[0] <= 1: + return np.minimum(np.maximum(spans * 0.18, floor), ceiling) + elite_std = elite_vectors.std(axis=0) + elite_range = elite_vectors.max(axis=0) - elite_vectors.min(axis=0) + spread = np.maximum(elite_std * 1.8, elite_range * 0.5) + return np.minimum(np.maximum(spread, floor), ceiling) + + +def sample_local_candidate( + space: SearchSpace, + rng: random.Random, + center: np.ndarray, + spread: np.ndarray, +) -> Dict[str, float]: + draw = [] + for idx, spec in enumerate(space.parameters): + value = rng.gauss(float(center[idx]), float(spread[idx])) + value = min(max(value, spec.transformed_lower), spec.transformed_upper) + draw.append(value) + return space.from_vector(draw) + + +def sample_crossover_candidate( + space: SearchSpace, + rng: random.Random, + left: np.ndarray, + right: np.ndarray, +) -> Dict[str, float]: + draw = [] + for idx, spec in enumerate(space.parameters): + mix = rng.random() + value = float(left[idx]) * mix + float(right[idx]) * (1.0 - mix) + jitter = spec.transformed_span * 0.04 + value += rng.uniform(-jitter, jitter) + value = min(max(value, spec.transformed_lower), spec.transformed_upper) + draw.append(value) + return space.from_vector(draw) + + +def propose_candidates( + *, + space: SearchSpace, + trials: Sequence[Dict[str, Any]], + metric: str, + batch_size: int, + rng: random.Random, + init_random: int, + candidate_pool_size: int, +) -> List[CandidateProposal]: + existing_keys = {space.canonical_key(item["params"]) for item in trials if item.get("params")} + proposals: List[CandidateProposal] = [] + + for seed in space.seed_experiments: + params = space.fill_params(seed.params) + key = space.canonical_key(params) + if key not in existing_keys: + proposals.append( + CandidateProposal( + name=seed.name, + description=seed.description, + params=params, + source="seed_experiment", + ) + ) + existing_keys.add(key) + if len(proposals) >= batch_size: + return proposals + + successes = live_success_trials(trials) + if len(successes) < init_random: + while len(proposals) < batch_size: + params = space.sample_random(rng) + key = space.canonical_key(params) + if key in existing_keys: + continue + proposals.append( + CandidateProposal( + name=f"random_{len(successes) + len(proposals) + 1:03d}", + description="global random exploration", + params=params, + source="random", + ) + ) + existing_keys.add(key) + return proposals + + ranked = sorted(successes, key=lambda item: score_of(item, metric), reverse=True) + elite_count = max(2, min(len(ranked), int(math.ceil(len(ranked) * space.elite_fraction)))) + elites = ranked[:elite_count] + elite_vectors = np.array([space.vectorize(item["params"]) for item in elites], dtype=float) + spread = build_sampling_spread(space, elite_vectors) + gp = fit_surrogate(space, successes, metric, seed=rng.randint(1, 10_000_000)) + best_score = score_of(ranked[0], metric) + best_vector = space.vectorize(ranked[0]["params"]) + + pool: List[Dict[str, Any]] = [] + pool_keys = set(existing_keys) + attempts = 0 + max_attempts = max(candidate_pool_size * 12, 200) + while len(pool) < candidate_pool_size and attempts < max_attempts: + attempts += 1 + roll = rng.random() + if roll < space.explore_probability: + params = space.sample_random(rng) + source = "global_explore" + elif roll < space.explore_probability + space.local_jitter_probability: + center = elite_vectors[rng.randrange(len(elite_vectors))] + params = sample_local_candidate(space, rng, center=center, spread=spread) + source = "elite_jitter" + else: + if len(elite_vectors) >= 2: + left = elite_vectors[rng.randrange(len(elite_vectors))] + right = elite_vectors[rng.randrange(len(elite_vectors))] + params = sample_crossover_candidate(space, rng, left=left, right=right) + source = "elite_crossover" + else: + params = sample_local_candidate(space, rng, center=best_vector, spread=spread) + source = "best_jitter" + key = space.canonical_key(params) + if key in pool_keys: + continue + pool_keys.add(key) + pool.append({"params": params, "source": source}) + + if not pool: + return proposals + + if gp is not None: + X = np.array([space.vectorize(item["params"]) for item in pool], dtype=float) + mu, sigma = gp.predict(X, return_std=True) + for idx, item in enumerate(pool): + item["acquisition"] = expected_improvement(float(mu[idx]), float(sigma[idx]), best_score) + item["uncertainty"] = float(sigma[idx]) + item["predicted_score"] = float(mu[idx]) + pool.sort( + key=lambda item: ( + float(item.get("acquisition") or 0.0), + float(item.get("uncertainty") or 0.0), + float(item.get("predicted_score") or 0.0), + ), + reverse=True, + ) + else: + rng.shuffle(pool) + + chosen_params = [item.params for item in proposals] + chosen: List[CandidateProposal] = [] + for item in pool: + params = item["params"] + if any(normalized_distance(space, params, other) < space.min_normalized_distance for other in chosen_params): + continue + chosen_params.append(params) + chosen.append( + CandidateProposal( + name=f"bo_{len(successes) + len(proposals) + len(chosen) + 1:03d}", + description=( + f"{item['source']} predicted={item.get('predicted_score', 'n/a')} " + f"ei={item.get('acquisition', 'n/a')}" + ), + params=params, + source=str(item["source"]), + ) + ) + if len(proposals) + len(chosen) >= batch_size: + break + + proposals.extend(chosen) + if len(proposals) < batch_size: + while len(proposals) < batch_size: + params = space.sample_random(rng) + key = space.canonical_key(params) + if key in existing_keys: + continue + proposals.append( + CandidateProposal( + name=f"fallback_{len(successes) + len(proposals) + 1:03d}", + description="fallback random exploration", + params=params, + source="fallback_random", + ) + ) + existing_keys.add(key) + return proposals + + +def compare_query_deltas( + baseline_payload: Dict[str, Any] | None, + best_payload: Dict[str, Any] | None, + metric: str, + limit: int = 8, +) -> Dict[str, List[Dict[str, Any]]]: + if not baseline_payload or not best_payload: + return {"gains": [], "losses": []} + base = { + str(item["query"]): float(item["metrics"].get(metric, 0.0)) + for item in baseline_payload.get("per_query") or [] + } + cur = { + str(item["query"]): float(item["metrics"].get(metric, 0.0)) + for item in best_payload.get("per_query") or [] } + rows: List[Dict[str, Any]] = [] + for query, score in cur.items(): + if query not in base: + continue + rows.append( + { + "query": query, + "baseline": round(base[query], 6), + "current": round(score, 6), + "delta": round(score - base[query], 6), + } + ) + rows.sort(key=lambda item: item["delta"], reverse=True) + gains = [item for item in rows[:limit] if item["delta"] > 0] + losses = [item for item in rows[-limit:] if item["delta"] < 0] + losses.sort(key=lambda item: item["delta"]) + return {"gains": gains, "losses": losses} + +def render_markdown( + *, + run_id: str, + created_at: str, + tenant_id: str, + query_count: int, + top_k: int, + metric: str, + trials: Sequence[Dict[str, Any]], +) -> str: + successes = sorted(all_success_trials(trials), key=lambda item: score_of(item, metric), reverse=True) + live_successes = sorted(live_success_trials(trials), key=lambda item: score_of(item, metric), reverse=True) + best = successes[0] if successes else None + baseline = next((item for item in successes if item.get("is_seed")), None) + best_payload = load_batch_payload(best["batch_json_path"]) if best and best.get("batch_json_path") else None + baseline_payload = ( + load_batch_payload(baseline["batch_json_path"]) + if baseline and baseline.get("batch_json_path") + else None + ) + delta_summary = compare_query_deltas(baseline_payload, best_payload, metric) if best else {"gains": [], "losses": []} -def render_markdown(summary: Dict[str, Any]) -> str: lines = [ "# Fusion Tuning Report", "", - f"- Created at: {summary['created_at']}", - f"- Tenant ID: {summary['tenant_id']}", - f"- Query count: {summary['query_count']}", - f"- Top K: {summary['top_k']}", - f"- Score metric: {summary['score_metric']}", + f"- Run ID: {run_id}", + f"- Created at: {created_at}", + f"- Tenant ID: {tenant_id}", + f"- Query count: {query_count}", + f"- Top K: {top_k}", + f"- Score metric: {metric}", + f"- Successful live evals: {len(live_successes)}", "", - "## Experiments", + "## Leaderboard", "", - "| Rank | Name | Score | Primary | NDCG@20 | ERR@10 | Strong@10 | Gain Recall@20 | Config |", - "|---|---|---:|---:|---:|---:|---:|---:|---|", + "| Rank | Name | Source | Score | Primary | NDCG@20 | ERR@10 | Gain Recall@20 | Batch |", + "|---|---|---|---:|---:|---:|---:|---:|---|", ] - for idx, item in enumerate(summary["experiments"], start=1): - metrics = item["aggregate_metrics"] + for idx, item in enumerate(successes, start=1): + metrics = item.get("aggregate_metrics") or {} lines.append( "| " + " | ".join( [ str(idx), - item["name"], - str(item["score"]), + str(item.get("name") or ""), + str(item.get("source") or ""), + f"{score_of(item, metric):.6f}", str(metrics.get("Primary_Metric_Score", "")), str(metrics.get("NDCG@20", "")), str(metrics.get("ERR@10", "")), - str(metrics.get("Strong_Precision@10", "")), str(metrics.get("Gain_Recall@20", "")), - item["config_snapshot_path"], + str(item.get("batch_id") or ""), ] ) + " |" ) - lines.extend(["", "## Details", ""]) - for item in summary["experiments"]: - lines.append(f"### {item['name']}") - lines.append("") - lines.append(f"- Description: {item['description']}") - lines.append(f"- Score: {item['score']}") - lines.append(f"- Params: `{json.dumps(item['params'], ensure_ascii=False, sort_keys=True)}`") - lines.append(f"- Batch report: {item['batch_report_path']}") - lines.append("") - return "\n".join(lines) + if best: + lines.extend( + [ + "", + "## Best Params", + "", + f"- Name: {best['name']}", + f"- Source: {best['source']}", + f"- Score: {score_of(best, metric):.6f}", + f"- Params: `{json.dumps(best['params'], ensure_ascii=False, sort_keys=True)}`", + f"- Batch report: {best.get('batch_report_path') or ''}", + ] + ) -def load_experiments(path: Path) -> List[ExperimentSpec]: - payload = json.loads(path.read_text(encoding="utf-8")) - items = payload["experiments"] if isinstance(payload, dict) else payload - experiments: List[ExperimentSpec] = [] - for item in items: - experiments.append( - ExperimentSpec( - name=str(item["name"]), - description=str(item.get("description") or ""), - params=dict(item.get("params") or {}), - ) + if delta_summary["gains"] or delta_summary["losses"]: + lines.extend(["", "## Best vs Baseline", ""]) + if delta_summary["gains"]: + lines.append("### Top Gains") + lines.append("") + for item in delta_summary["gains"]: + lines.append( + f"- {item['query']}: {item['baseline']:.6f} -> {item['current']:.6f} ({item['delta']:+.6f})" + ) + if delta_summary["losses"]: + lines.append("") + lines.append("### Top Losses") + lines.append("") + for item in delta_summary["losses"]: + lines.append( + f"- {item['query']}: {item['baseline']:.6f} -> {item['current']:.6f} ({item['delta']:+.6f})" + ) + + failures = [item for item in trials if item.get("status") != "ok"] + if failures: + lines.extend(["", "## Failures", ""]) + for item in failures: + lines.append(f"- {item.get('name')}: {item.get('error')}") + + return "\n".join(lines) + "\n" + + +def write_leaderboard_csv(run_dir: Path, metric: str, trials: Sequence[Dict[str, Any]], parameter_names: Sequence[str]) -> None: + path = run_dir / "leaderboard.csv" + successes = sorted(all_success_trials(trials), key=lambda item: score_of(item, metric), reverse=True) + with path.open("w", encoding="utf-8", newline="") as handle: + writer = csv.writer(handle) + writer.writerow( + [ + "rank", + "name", + "source", + "score", + "Primary_Metric_Score", + "NDCG@20", + "ERR@10", + "Gain_Recall@20", + "batch_id", + *parameter_names, + ] ) - return experiments + for idx, item in enumerate(successes, start=1): + metrics = item.get("aggregate_metrics") or {} + row = [ + idx, + item.get("name") or "", + item.get("source") or "", + f"{score_of(item, metric):.6f}", + metrics.get("Primary_Metric_Score", ""), + metrics.get("NDCG@20", ""), + metrics.get("ERR@10", ""), + metrics.get("Gain_Recall@20", ""), + item.get("batch_id") or "", + ] + row.extend(item.get("params", {}).get(name, "") for name in parameter_names) + writer.writerow(row) -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Run fusion tuning experiments against the live backend") - parser.add_argument("--tenant-id", default="163") - parser.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) - parser.add_argument("--top-k", type=int, default=100) - parser.add_argument("--language", default="en") - parser.add_argument("--experiments-file", required=True) - parser.add_argument("--search-base-url", default="http://127.0.0.1:6002") - parser.add_argument("--score-metric", default="Primary_Metric_Score") - parser.add_argument("--apply-best", action="store_true") - parser.add_argument("--force-refresh-labels-first-pass", action="store_true") - return parser +def persist_run_summary( + *, + run_dir: Path, + run_id: str, + tenant_id: str, + query_count: int, + top_k: int, + metric: str, + trials: Sequence[Dict[str, Any]], + parameter_names: Sequence[str], +) -> None: + summary = { + "run_id": run_id, + "created_at": utc_now_iso(), + "tenant_id": tenant_id, + "query_count": query_count, + "top_k": top_k, + "score_metric": metric, + "trials": list(trials), + } + (run_dir / "summary.json").write_text( + json.dumps(summary, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + (run_dir / "summary.md").write_text( + render_markdown( + run_id=run_id, + created_at=summary["created_at"], + tenant_id=tenant_id, + query_count=query_count, + top_k=top_k, + metric=metric, + trials=trials, + ), + encoding="utf-8", + ) + write_leaderboard_csv(run_dir, metric, trials, parameter_names) -def main() -> None: - args = build_parser().parse_args() +def run_experiment_mode(args: argparse.Namespace) -> None: queries_file = Path(args.queries_file) queries = read_queries(queries_file) base_config_text = CONFIG_PATH.read_text(encoding="utf-8") @@ -222,19 +983,33 @@ def main() -> None: experiments = load_experiments(Path(args.experiments_file)) tuning_dir = ensure_dir(DEFAULT_ARTIFACT_ROOT / "tuning_runs") - run_id = f"tuning_{utc_timestamp()}" + run_id = args.run_name or f"tuning_{utc_timestamp()}" run_dir = ensure_dir(tuning_dir / run_id) results: List[Dict[str, Any]] = [] try: for experiment in experiments: - candidate = apply_params(base_config, experiment.params) + params = dict(experiment.params) + target_path = args.target_path or "coarse_rank.fusion" + candidate = apply_target_params(base_config, target_path, params) write_yaml(CONFIG_PATH, candidate) - candidate_config_path = run_dir / f"{experiment.name}_config.yaml" + candidate_config_path = ensure_dir(run_dir / "configs") / f"{experiment.name}_config.yaml" write_yaml(candidate_config_path, candidate) - run_restart() + ensure_disk_headroom( + min_free_gb=args.min_free_gb, + auto_truncate_logs=args.auto_truncate_logs, + context=f"restart {experiment.name}", + ) + run_restart(args.restart_targets) health = wait_for_backend(args.search_base_url) + if args.heal_eval_web: + ensure_eval_web(args.eval_web_base_url) + ensure_disk_headroom( + min_free_gb=args.min_free_gb, + auto_truncate_logs=args.auto_truncate_logs, + context=f"batch eval {experiment.name}", + ) batch_result = run_batch_eval( tenant_id=args.tenant_id, queries_file=queries_file, @@ -242,21 +1017,27 @@ def main() -> None: language=args.language, force_refresh_labels=bool(args.force_refresh_labels_first_pass and not results), ) - aggregate_metrics = dict(batch_result["aggregate_metrics"]) + ensure_disk_headroom( + min_free_gb=args.min_free_gb, + auto_truncate_logs=args.auto_truncate_logs, + context=f"persist {experiment.name}", + ) + payload = batch_result["payload"] + aggregate_metrics = dict(payload["aggregate_metrics"]) results.append( { "name": experiment.name, "description": experiment.description, - "params": experiment.params, + "params": params, "aggregate_metrics": aggregate_metrics, "score": float(aggregate_metrics.get(args.score_metric, 0.0)), "batch_id": batch_result["batch_id"], - "batch_report_path": str( - DEFAULT_ARTIFACT_ROOT / "batch_reports" / f"{batch_result['batch_id']}.md" - ), - "config_snapshot_path": str(candidate_config_path), + "batch_json_path": batch_result["batch_json_path"], + "batch_report_path": batch_result["batch_report_path"], + "candidate_config_path": str(candidate_config_path), "backend_health": health, - "batch_stdout": batch_result["raw_output"], + "status": "ok", + "source": "experiments_file", } ) print( @@ -265,32 +1046,285 @@ def main() -> None: ) finally: if args.apply_best and results: - best = max(results, key=lambda item: item["score"]) - best_config = apply_params(base_config, best["params"]) + best = max(results, key=lambda item: score_of(item, args.score_metric)) + best_config = apply_target_params(base_config, args.target_path or "coarse_rank.fusion", best["params"]) write_yaml(CONFIG_PATH, best_config) - run_restart() + run_restart(args.restart_targets) wait_for_backend(args.search_base_url) + if args.heal_eval_web: + ensure_eval_web(args.eval_web_base_url) else: CONFIG_PATH.write_text(base_config_text, encoding="utf-8") - run_restart() + run_restart(args.restart_targets) wait_for_backend(args.search_base_url) + if args.heal_eval_web: + ensure_eval_web(args.eval_web_base_url) - results.sort(key=lambda item: item["score"], reverse=True) - summary = { - "run_id": run_id, - "created_at": utc_now_iso(), - "tenant_id": args.tenant_id, - "query_count": len(queries), - "top_k": args.top_k, - "score_metric": args.score_metric, - "experiments": results, - } - summary_json_path = run_dir / "summary.json" - summary_md_path = run_dir / "summary.md" - summary_json_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") - summary_md_path.write_text(render_markdown(summary), encoding="utf-8") - print(f"[done] summary_json={summary_json_path}") - print(f"[done] summary_md={summary_md_path}") + persist_run_summary( + run_dir=run_dir, + run_id=run_id, + tenant_id=str(args.tenant_id), + query_count=len(queries), + top_k=args.top_k, + metric=args.score_metric, + trials=results, + parameter_names=list(results[0]["params"].keys()) if results else [], + ) + print(f"[done] summary_json={run_dir / 'summary.json'}") + print(f"[done] summary_md={run_dir / 'summary.md'}") + + +def run_optimize_mode(args: argparse.Namespace) -> None: + queries_file = Path(args.queries_file) + queries = read_queries(queries_file) + base_config_text = CONFIG_PATH.read_text(encoding="utf-8") + base_config = load_yaml(CONFIG_PATH) + search_space_path = Path(args.search_space) + space = load_search_space(search_space_path) + rng = random.Random(args.random_seed) + + tuning_dir = ensure_dir(DEFAULT_ARTIFACT_ROOT / "tuning_runs") + run_dir = ( + Path(args.resume_run).resolve() + if args.resume_run + else ensure_dir(tuning_dir / (args.run_name or f"coarse_fusion_bo_{utc_timestamp()}")) + ) + run_id = run_dir.name + ensure_dir(run_dir / "configs") + ensure_dir(run_dir / "logs") + if not (run_dir / "search_space.yaml").exists(): + (run_dir / "search_space.yaml").write_text(search_space_path.read_text(encoding="utf-8"), encoding="utf-8") + + trials = load_existing_trials(run_dir) + if args.seed_report: + baseline_params = space.fill_params(space.baseline) + baseline_key = space.canonical_key(baseline_params) + if baseline_key not in {space.canonical_key(item["params"]) for item in trials if item.get("params")}: + payload = load_batch_payload(args.seed_report) + trial = { + "trial_id": next_trial_name(trials, "trial"), + "name": "seed_baseline", + "description": f"seeded from {args.seed_report}", + "source": "seed_report", + "is_seed": True, + "status": "ok", + "created_at": utc_now_iso(), + "params": baseline_params, + "score": float(payload["aggregate_metrics"].get(args.score_metric, 0.0)), + "aggregate_metrics": dict(payload["aggregate_metrics"]), + "batch_id": payload["batch_id"], + "batch_json_path": str(resolve_batch_json_path(args.seed_report)), + "batch_report_path": str(resolve_batch_json_path(args.seed_report).with_suffix(".md")), + } + append_trial(run_dir, trial) + trials.append(trial) + + init_random = args.init_random if args.init_random is not None else space.init_random + candidate_pool_size = args.candidate_pool_size if args.candidate_pool_size is not None else space.candidate_pool_size + + try: + live_done = len(live_success_trials(trials)) + while live_done < args.max_evals: + remaining = args.max_evals - live_done + current_batch_size = min(args.batch_size, remaining) + proposals = propose_candidates( + space=space, + trials=trials, + metric=args.score_metric, + batch_size=current_batch_size, + rng=rng, + init_random=init_random, + candidate_pool_size=candidate_pool_size, + ) + if not proposals: + raise RuntimeError("optimizer failed to produce new candidate proposals") + + for proposal in proposals: + force_refresh_labels = bool(args.force_refresh_labels_first_pass and live_done == 0 and not any(t.get("is_seed") for t in trials)) + trial_id = next_trial_name(trials, "trial") + candidate_config = apply_target_params(base_config, space.target_path, proposal.params) + candidate_config_path = run_dir / "configs" / f"{trial_id}_{proposal.name}.yaml" + trial_log_path = run_dir / "logs" / f"{trial_id}_{proposal.name}.log" + write_yaml(CONFIG_PATH, candidate_config) + write_yaml(candidate_config_path, candidate_config) + print( + f"[tune] start {proposal.name} source={proposal.source} " + f"params={json.dumps(proposal.params, ensure_ascii=False, sort_keys=True)}" + ) + try: + ensure_disk_headroom( + min_free_gb=args.min_free_gb, + auto_truncate_logs=args.auto_truncate_logs, + context=f"restart {proposal.name}", + ) + run_restart(args.restart_targets) + backend_health = wait_for_backend(args.search_base_url) + verify_backend_config(args.search_base_url, space.target_path, proposal.params) + if args.heal_eval_web: + ensure_eval_web(args.eval_web_base_url) + ensure_disk_headroom( + min_free_gb=args.min_free_gb, + auto_truncate_logs=args.auto_truncate_logs, + context=f"batch eval {proposal.name}", + ) + batch_result = run_batch_eval( + tenant_id=args.tenant_id, + queries_file=queries_file, + top_k=args.top_k, + language=args.language, + force_refresh_labels=force_refresh_labels, + ) + ensure_disk_headroom( + min_free_gb=args.min_free_gb, + auto_truncate_logs=args.auto_truncate_logs, + context=f"persist {proposal.name}", + ) + payload = batch_result["payload"] + trial_log_path.write_text(batch_result["raw_output"], encoding="utf-8") + aggregate_metrics = dict(payload["aggregate_metrics"]) + trial = { + "trial_id": trial_id, + "name": proposal.name, + "description": proposal.description, + "source": proposal.source, + "is_seed": False, + "status": "ok", + "created_at": utc_now_iso(), + "params": proposal.params, + "score": float(aggregate_metrics.get(args.score_metric, 0.0)), + "aggregate_metrics": aggregate_metrics, + "batch_id": batch_result["batch_id"], + "batch_json_path": batch_result["batch_json_path"], + "batch_report_path": batch_result["batch_report_path"], + "candidate_config_path": str(candidate_config_path), + "trial_log_path": str(trial_log_path), + "backend_health": backend_health, + } + print( + f"[tune] done {proposal.name} " + f"{args.score_metric}={trial['score']:.6f} " + f"Primary={aggregate_metrics.get('Primary_Metric_Score')}" + ) + except Exception as exc: # noqa: BLE001 + trial = { + "trial_id": trial_id, + "name": proposal.name, + "description": proposal.description, + "source": proposal.source, + "is_seed": False, + "status": "error", + "created_at": utc_now_iso(), + "params": proposal.params, + "error": str(exc), + "candidate_config_path": str(candidate_config_path), + "trial_log_path": str(trial_log_path), + } + print(f"[tune] error {proposal.name}: {exc}") + ensure_disk_headroom( + min_free_gb=args.min_free_gb, + auto_truncate_logs=args.auto_truncate_logs, + context=f"error-persist {proposal.name}", + ) + append_trial(run_dir, trial) + trials.append(trial) + ensure_disk_headroom( + min_free_gb=args.min_free_gb, + auto_truncate_logs=args.auto_truncate_logs, + context=f"summary {proposal.name}", + ) + persist_run_summary( + run_dir=run_dir, + run_id=run_id, + tenant_id=str(args.tenant_id), + query_count=len(queries), + top_k=args.top_k, + metric=args.score_metric, + trials=trials, + parameter_names=space.parameter_names, + ) + if trial.get("status") == "ok": + live_done += 1 + if live_done >= args.max_evals: + break + finally: + if args.apply_best: + successes = all_success_trials(trials) + best_live = max(successes, key=lambda item: score_of(item, args.score_metric)) if successes else None + if best_live: + best_config = apply_target_params(base_config, space.target_path, best_live["params"]) + write_yaml(CONFIG_PATH, best_config) + run_restart(args.restart_targets) + wait_for_backend(args.search_base_url) + if args.heal_eval_web: + ensure_eval_web(args.eval_web_base_url) + else: + CONFIG_PATH.write_text(base_config_text, encoding="utf-8") + run_restart(args.restart_targets) + wait_for_backend(args.search_base_url) + if args.heal_eval_web: + ensure_eval_web(args.eval_web_base_url) + + persist_run_summary( + run_dir=run_dir, + run_id=run_id, + tenant_id=str(args.tenant_id), + query_count=len(queries), + top_k=args.top_k, + metric=args.score_metric, + trials=trials, + parameter_names=space.parameter_names, + ) + print(f"[done] run_dir={run_dir}") + print(f"[done] summary_json={run_dir / 'summary.json'}") + print(f"[done] summary_md={run_dir / 'summary.md'}") + print(f"[done] leaderboard_csv={run_dir / 'leaderboard.csv'}") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Tune coarse/fusion params against the live backend with adaptive Bayesian-style search." + ) + parser.add_argument("--mode", choices=["optimize", "experiments"], default="optimize") + parser.add_argument("--tenant-id", default="163") + parser.add_argument("--queries-file", default=str(DEFAULT_QUERY_FILE)) + parser.add_argument("--top-k", type=int, default=100) + parser.add_argument("--language", default="en") + parser.add_argument("--search-base-url", default="http://127.0.0.1:6002") + parser.add_argument("--eval-web-base-url", default="http://127.0.0.1:6010") + parser.add_argument("--score-metric", default="Primary_Metric_Score") + parser.add_argument("--restart-targets", nargs="+", default=["backend"]) + parser.add_argument("--heal-eval-web", action=argparse.BooleanOptionalAction, default=True) + parser.add_argument("--force-refresh-labels-first-pass", action="store_true") + parser.add_argument("--apply-best", action="store_true") + parser.add_argument("--run-name", default=None) + + parser.add_argument("--experiments-file") + parser.add_argument("--target-path", default="coarse_rank.fusion") + + parser.add_argument( + "--search-space", + default=str(PROJECT_ROOT / "scripts" / "evaluation" / "tuning" / "coarse_rank_fusion_space.yaml"), + ) + parser.add_argument("--seed-report", default=None) + parser.add_argument("--resume-run", default=None) + parser.add_argument("--max-evals", type=int, default=12) + parser.add_argument("--batch-size", type=int, default=3) + parser.add_argument("--init-random", type=int, default=None) + parser.add_argument("--candidate-pool-size", type=int, default=None) + parser.add_argument("--random-seed", type=int, default=20260415) + parser.add_argument("--min-free-gb", type=float, default=5.0) + parser.add_argument("--auto-truncate-logs", action=argparse.BooleanOptionalAction, default=True) + return parser + + +def main() -> None: + args = build_parser().parse_args() + if args.mode == "experiments": + if not args.experiments_file: + raise SystemExit("--experiments-file is required when --mode=experiments") + run_experiment_mode(args) + return + run_optimize_mode(args) if __name__ == "__main__": diff --git a/scripts/evaluation/tuning/README.md b/scripts/evaluation/tuning/README.md new file mode 100644 index 0000000..2351258 --- /dev/null +++ b/scripts/evaluation/tuning/README.md @@ -0,0 +1,71 @@ +# Coarse Fusion 长跑调参 + +## 启动一轮长跑 + +```bash +./scripts/evaluation/start_coarse_fusion_tuning_long.sh +``` + +可用环境变量: + +```bash +MAX_EVALS=48 BATCH_SIZE=3 CANDIDATE_POOL_SIZE=512 \ +RUN_NAME=coarse_fusion_long_001 \ +./scripts/evaluation/start_coarse_fusion_tuning_long.sh +``` + +启动后会打印: + +- `run_name` +- `pid` +- `log` +- `run_dir` + +默认搜索空间: + +- `scripts/evaluation/tuning/coarse_rank_fusion_space.yaml` + +默认 baseline seed: + +- `artifacts/search_evaluation/batch_reports/batch_20260415T150754Z_00b6a8aa3d.md` + +## 查看进度 + +```bash +tail -f artifacts/search_evaluation/tuning_launches/.log +cat artifacts/search_evaluation/tuning_runs//leaderboard.csv +sed -n '1,200p' artifacts/search_evaluation/tuning_runs//summary.md +``` + +实时记录文件: + +- `trials.jsonl` +- `leaderboard.csv` +- `summary.json` +- `summary.md` + +## 续跑 + +```bash +./scripts/evaluation/resume_coarse_fusion_tuning_long.sh +``` + +也可直接传完整目录: + +```bash +./scripts/evaluation/resume_coarse_fusion_tuning_long.sh \ + artifacts/search_evaluation/tuning_runs/ +``` + +## 停止 + +```bash +kill "$(cat artifacts/search_evaluation/tuning_launches/.pid)" +``` + +## 说明 + +- 每轮会自动写入 `config/config.yaml` +- 每轮会自动执行 `./restart.sh backend` +- 如果 `eval-web` 因 backend 重启不可用,调参器会尝试补拉起 `eval-web` +- 默认不 `apply-best`,跑完后会恢复 baseline 配置 diff --git a/scripts/evaluation/tuning/coarse_rank_fusion_space.yaml b/scripts/evaluation/tuning/coarse_rank_fusion_space.yaml new file mode 100644 index 0000000..5d9ed38 --- /dev/null +++ b/scripts/evaluation/tuning/coarse_rank_fusion_space.yaml @@ -0,0 +1,153 @@ +target_path: coarse_rank.fusion + +baseline: + es_bias: 10.0 + es_exponent: 0.05 + text_bias: 0.1 + text_exponent: 0.35 + text_translation_weight: 1.0 + knn_text_weight: 1.0 + knn_image_weight: 2.0 + knn_tie_breaker: 0.3 + knn_bias: 0.2 + knn_exponent: 5.6 + knn_text_bias: 0.2 + knn_text_exponent: 0.0 + knn_image_bias: 0.2 + knn_image_exponent: 0.0 + +parameters: + es_bias: {min: 0.3, max: 80.0, scale: log, round: 4} + es_exponent: {min: 0.0, max: 0.4, scale: linear, round: 4} + text_bias: {min: 0.001, max: 4.0, scale: log, round: 4} + text_exponent: {min: 0.02, max: 1.6, scale: linear, round: 4} + text_translation_weight: {min: 0.1, max: 2.5, scale: linear, round: 4} + knn_text_weight: {min: 0.1, max: 4.0, scale: linear, round: 4} + knn_image_weight: {min: 0.1, max: 6.0, scale: linear, round: 4} + knn_tie_breaker: {min: 0.0, max: 1.0, scale: linear, round: 4} + knn_bias: {min: 0.001, max: 4.0, scale: log, round: 4} + knn_exponent: {min: 0.05, max: 12.0, scale: log, round: 4} + knn_text_bias: {min: 0.001, max: 4.0, scale: log, round: 4} + knn_text_exponent: {min: 0.0, max: 6.0, scale: linear, round: 4} + knn_image_bias: {min: 0.001, max: 4.0, scale: log, round: 4} + knn_image_exponent: {min: 0.0, max: 6.0, scale: linear, round: 4} + +seed_experiments: + - name: seed_knn_soften + description: 压低 knn 全局指数,先验证当前 5.6 是否过猛 + params: + text_exponent: 0.42 + knn_image_weight: 1.2 + knn_bias: 0.35 + knn_exponent: 1.4 + - name: seed_text_guard + description: 提升 lexical 稳定性,抑制翻译与 image knn 过度主导 + params: + text_exponent: 0.62 + text_translation_weight: 0.75 + knn_image_weight: 1.0 + knn_tie_breaker: 0.15 + knn_exponent: 2.2 + - name: seed_semantic_balanced + description: 让 text/image knn 都参与,但降低 image 偏置和总指数 + params: + text_exponent: 0.32 + knn_text_weight: 1.4 + knn_image_weight: 1.8 + knn_tie_breaker: 0.45 + knn_bias: 0.18 + knn_exponent: 3.0 + - name: seed_component_exp + description: 打开 knn_text/image 子项指数,观察全局 knn_exponent 是否可下放 + params: + knn_bias: 0.15 + knn_exponent: 1.6 + knn_text_exponent: 0.8 + knn_image_exponent: 0.4 + - name: seed_es_relax + description: 增强 es 因子的区分度,验证 coarse 是否过分压平 lexical 分数 + params: + es_bias: 3.0 + es_exponent: 0.11 + text_exponent: 0.48 + knn_exponent: 2.6 + - name: seed_image_heavy + description: 刻意放大 image knn 做对照,看哪些 query 会明显受损 + params: + text_exponent: 0.22 + knn_text_weight: 0.9 + knn_image_weight: 3.4 + knn_tie_breaker: 0.55 + knn_bias: 0.12 + knn_exponent: 3.8 + - name: seed_high_knn_global + description: 沿着 baseline 继续上探更强 knn 全局指数,验证 5.6 是否仍偏保守 + params: + text_exponent: 0.28 + knn_text_weight: 1.1 + knn_image_weight: 2.6 + knn_tie_breaker: 0.4 + knn_bias: 0.12 + knn_exponent: 7.2 + - name: seed_text_knn_split + description: 提高 text knn,压低 image knn,同时打开 text/image 子项指数 + params: + text_exponent: 0.38 + knn_text_weight: 2.0 + knn_image_weight: 0.8 + knn_tie_breaker: 0.2 + knn_bias: 0.08 + knn_exponent: 4.8 + knn_text_exponent: 1.1 + knn_image_exponent: 0.15 + - name: seed_image_split + description: 保持较高 image 权重,但把非线性拆到 image 子项而不是全局 knn + params: + text_exponent: 0.26 + knn_text_weight: 0.9 + knn_image_weight: 3.0 + knn_tie_breaker: 0.35 + knn_bias: 0.08 + knn_exponent: 3.4 + knn_text_exponent: 0.2 + knn_image_exponent: 1.0 + - name: seed_es_text_sharpen + description: 提升 es 与 lexical 区分度,测试 coarse 是否需要更强文本排序稳定性 + params: + es_bias: 2.0 + es_exponent: 0.16 + text_bias: 0.03 + text_exponent: 0.78 + text_translation_weight: 0.9 + knn_bias: 0.1 + knn_exponent: 5.0 + - name: seed_translation_discount + description: 明显削弱 translation 命中,验证抽象 query 是否过度依赖翻译通路 + params: + text_exponent: 0.44 + text_translation_weight: 0.45 + knn_text_weight: 1.2 + knn_image_weight: 1.7 + knn_tie_breaker: 0.25 + knn_exponent: 5.4 + - name: seed_near_baseline_jitter + description: 贴近 baseline 做小扰动,优先寻找可行增益而不是只测极端方向 + params: + es_bias: 8.0 + es_exponent: 0.06 + text_bias: 0.06 + text_exponent: 0.31 + text_translation_weight: 1.1 + knn_text_weight: 1.1 + knn_image_weight: 2.2 + knn_tie_breaker: 0.34 + knn_bias: 0.16 + knn_exponent: 5.9 + +optimizer: + init_random: 8 + candidate_pool_size: 512 + explore_probability: 0.28 + local_jitter_probability: 0.42 + elite_fraction: 0.35 + min_normalized_distance: 0.12 diff --git a/scripts/service_ctl.sh b/scripts/service_ctl.sh index d9cf873..9d89cc6 100755 --- a/scripts/service_ctl.sh +++ b/scripts/service_ctl.sh @@ -213,6 +213,7 @@ health_path_for_service() { local service="$1" case "${service}" in backend|indexer|embedding|embedding-image|translator|reranker|reranker-fine|tei) echo "/health" ;; + eval-web) echo "/api/history" ;; *) echo "" ;; esac } @@ -469,7 +470,7 @@ monitor_services() { if [ "${recent_count}" -ge "${max_restarts_per_hour}" ]; then monitor_log_event "${svc}" "error" "restart suppressed by hourly cap (${max_restarts_per_hour}/hour)" if [ -x "${wechat_alert_py}" ] || [ -f "${wechat_alert_py}" ]; then - python "${wechat_alert_py}" \ + "$(config_python_bin)" "${wechat_alert_py}" \ --service "${svc}" \ --level "error" \ --message "监控检测到服务连续多次健康检查失败,且已达到每小时最大重启次数上限(${max_restarts_per_hour} 次/小时),请及时排查。" @@ -479,7 +480,7 @@ monitor_services() { monitor_log_event "${svc}" "error" "triggering restart after ${fail_streak[${svc}]} consecutive failures" if [ -x "${wechat_alert_py}" ] || [ -f "${wechat_alert_py}" ]; then - python "${wechat_alert_py}" \ + "$(config_python_bin)" "${wechat_alert_py}" \ --service "${svc}" \ --level "error" \ --message "监控检测到服务连续 ${fail_streak[${svc}]} 次健康检查失败,正在尝试自动重启。" @@ -494,7 +495,7 @@ monitor_services() { restart_history["${svc}"]="${restart_history[${svc}]:-} ${now}" monitor_log_event "${svc}" "error" "restart failed, inspect $(log_file "${svc}")" if [ -x "${wechat_alert_py}" ] || [ -f "${wechat_alert_py}" ]; then - python "${wechat_alert_py}" \ + "$(config_python_bin)" "${wechat_alert_py}" \ --service "${svc}" \ --level "error" \ --message "监控检测到服务连续 ${fail_streak[${svc}]} 次健康检查失败,自动重启尝试失败,请尽快登录服务器查看日志:$(log_file "${svc}")." @@ -609,7 +610,13 @@ is_running_by_port() { local service="$1" local port port="$(get_port "${service}")" - [ -n "${port}" ] && lsof -ti:"${port}" >/dev/null 2>&1 + [ -n "${port}" ] && lsof -nP -iTCP:"${port}" -sTCP:LISTEN -t >/dev/null 2>&1 +} + +list_listen_pids_by_port() { + local port="$1" + [ -n "${port}" ] || return 0 + lsof -nP -iTCP:"${port}" -sTCP:LISTEN -t 2>/dev/null || true } is_running_tei_container() { @@ -794,14 +801,14 @@ stop_one() { port="$(get_port "${service}")" if [ -n "${port}" ]; then local pids - pids="$(lsof -ti:${port} 2>/dev/null || true)" + pids="$(list_listen_pids_by_port "${port}")" if [ -n "${pids}" ]; then echo "[stop] ${service} port=${port} pids=${pids}" for pid in ${pids}; do kill -TERM "${pid}" 2>/dev/null || true done sleep 1 - pids="$(lsof -ti:${port} 2>/dev/null || true)" + pids="$(list_listen_pids_by_port "${port}")" for pid in ${pids}; do kill -KILL "${pid}" 2>/dev/null || true done @@ -854,7 +861,7 @@ status_one() { pid_info="$(cat "$(pid_file "${service}")" 2>/dev/null || echo "-")" elif is_running_by_port "${service}"; then running="yes" - pid_info="$(lsof -ti:${port} 2>/dev/null | tr '\n' ',' | sed 's/,$//' || echo "-")" + pid_info="$(list_listen_pids_by_port "${port}" | tr '\n' ',' | sed 's/,$//' || echo "-")" fi if [ "${running}" = "yes" ]; then -- libgit2 0.21.2