Commit 0ba0e0fce48d6561324575dd5cb2344b9b1faf6d

Authored by tangwang
1 parent 984f14f9

1. rerank漏斗配置优化

2. 新增 service_enabled_by_config():
reranker|reranker-fine|translator 如果在配置中被关闭,则 run.sh all 不启动该服务
@@ -154,7 +154,8 @@ class SearchRequest(BaseModel): @@ -154,7 +154,8 @@ class SearchRequest(BaseModel):
154 enable_rerank: Optional[bool] = Field( 154 enable_rerank: Optional[bool] = Field(
155 None, 155 None,
156 description=( 156 description=(
157 - "是否开启重排(调用外部重排服务对 ES 结果进行二次排序)。" 157 + "是否开启最终重排(调用外部 rerank 服务改写上一阶段顺序)。"
  158 + "关闭时仍保留 coarse/fine 流程,仅在 rerank 阶段保序透传。"
158 "不传则使用服务端配置 rerank.enabled(默认开启)。" 159 "不传则使用服务端配置 rerank.enabled(默认开启)。"
159 ) 160 )
160 ) 161 )
config/config.yaml
@@ -292,8 +292,8 @@ function_score: @@ -292,8 +292,8 @@ function_score:
292 # 粗排配置(仅融合 ES 文本/向量信号,不调用模型) 292 # 粗排配置(仅融合 ES 文本/向量信号,不调用模型)
293 coarse_rank: 293 coarse_rank:
294 enabled: true 294 enabled: true
295 - input_window: 700  
296 - output_window: 240 295 + input_window: 480
  296 + output_window: 160
297 fusion: 297 fusion:
298 es_bias: 10.0 298 es_bias: 10.0
299 es_exponent: 0.05 299 es_exponent: 0.05
@@ -309,6 +309,7 @@ coarse_rank: @@ -309,6 +309,7 @@ coarse_rank:
309 knn_exponent: 0.4 309 knn_exponent: 0.4
310 310
311 # 精排配置(轻量 reranker) 311 # 精排配置(轻量 reranker)
  312 +# enabled=false 时仍进入 fine 阶段,但保序透传,不调用 fine 模型服务
312 fine_rank: 313 fine_rank:
313 enabled: false 314 enabled: false
314 input_window: 160 315 input_window: 160
@@ -319,6 +320,7 @@ fine_rank: @@ -319,6 +320,7 @@ fine_rank:
319 service_profile: fine 320 service_profile: fine
320 321
321 # 重排配置(provider/URL 在 services.rerank) 322 # 重排配置(provider/URL 在 services.rerank)
  323 +# enabled=false 时仍进入 rerank 阶段,但保序透传,不调用最终 rerank 服务
322 rerank: 324 rerank:
323 enabled: true 325 enabled: true
324 rerank_window: 160 326 rerank_window: 160
@@ -510,7 +512,7 @@ services: @@ -510,7 +512,7 @@ services:
510 default: 512 default:
511 host: 0.0.0.0 513 host: 0.0.0.0
512 port: 6007 514 port: 6007
513 - backend: qwen3_vllm_score 515 + backend: bge
514 runtime_dir: ./.runtime/reranker/default 516 runtime_dir: ./.runtime/reranker/default
515 fine: 517 fine:
516 host: 0.0.0.0 518 host: 0.0.0.0
docs/issues/a 0 → 100644
docs/issues/issue-2026-04-12-test-env.md 0 → 100644
@@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
  1 +120.76.41.98 端口22 用户名和密码:
  2 +tw twtw@123 (有sudo权限)
  3 +这台机器上的目录/home/tw/saas-search 已经部署了本项目
  4 +请帮我运行项目
  5 +1. 帮我checkout一个test环境的分支,这个分支,把重排、翻译模型 都关闭掉,因为这台机gpu显存较小(embedding模型可以保留)
  6 +2. 在这个分支,把服务都启动起来
  7 +3. 使用docker,安装一个ES,参考本项目的文档 ES9*.md。因为这台机器已经有一个系统的elasticsearch,为了不相互干扰,将本项目依赖的es9安装到docker,并且在测试环境配置的es地址做适配的工作
  8 +
  9 +
  10 +1. 不是要禁用6005,而是6005端口已经有对应的文本服务了,直接用就行
  11 +2. 6005其实就是本项目的一个历史早期版本启动起来的,在另外一个目录:/home/tw/SearchEngine,请看他的启动配置
  12 +nohup bash scripts/start_embedding_service.sh > log.start_embedding_service.0412 2>&1 &
  13 +是这样启动起来的
  14 +看他配的文本是用的哪套方案、哪个模型,跟他对齐(我指的是当前的测试分支)
  15 +
  16 +
scripts/service_ctl.sh
@@ -20,6 +20,7 @@ CORE_SERVICES=("backend" "indexer" "frontend" "eval-web") @@ -20,6 +20,7 @@ CORE_SERVICES=("backend" "indexer" "frontend" "eval-web")
20 OPTIONAL_SERVICES=("tei" "cnclip" "embedding" "embedding-image" "translator" "reranker") 20 OPTIONAL_SERVICES=("tei" "cnclip" "embedding" "embedding-image" "translator" "reranker")
21 FULL_SERVICES=("${OPTIONAL_SERVICES[@]}" "${CORE_SERVICES[@]}") 21 FULL_SERVICES=("${OPTIONAL_SERVICES[@]}" "${CORE_SERVICES[@]}")
22 STOP_ORDER_SERVICES=("frontend" "eval-web" "indexer" "backend" "reranker" "translator" "embedding-image" "embedding" "cnclip" "tei") 22 STOP_ORDER_SERVICES=("frontend" "eval-web" "indexer" "backend" "reranker" "translator" "embedding-image" "embedding" "cnclip" "tei")
  23 +declare -Ag SERVICE_ENABLED_CACHE=()
23 24
24 all_services() { 25 all_services() {
25 echo "${FULL_SERVICES[@]}" 26 echo "${FULL_SERVICES[@]}"
@@ -33,6 +34,72 @@ config_python_bin() { @@ -33,6 +34,72 @@ config_python_bin() {
33 fi 34 fi
34 } 35 }
35 36
  37 +service_enabled_by_config() {
  38 + local service="$1"
  39 + case "${service}" in
  40 + reranker|reranker-fine|translator)
  41 + ;;
  42 + *)
  43 + return 0
  44 + ;;
  45 + esac
  46 +
  47 + if [ -n "${SERVICE_ENABLED_CACHE[${service}]+x}" ]; then
  48 + [ "${SERVICE_ENABLED_CACHE[${service}]}" = "1" ]
  49 + return
  50 + fi
  51 +
  52 + local pybin
  53 + pybin="$(config_python_bin)"
  54 +
  55 + local enabled
  56 + if ! enabled="$(
  57 + SERVICE_NAME="${service}" \
  58 + PYTHONPATH="${PROJECT_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" \
  59 + "${pybin}" - <<'PY'
  60 +from config.loader import get_app_config
  61 +import os
  62 +
  63 +service = os.environ["SERVICE_NAME"]
  64 +cfg = get_app_config()
  65 +
  66 +enabled = True
  67 +if service == "reranker":
  68 + enabled = bool(cfg.search.rerank.enabled)
  69 +elif service == "reranker-fine":
  70 + enabled = bool(cfg.search.fine_rank.enabled)
  71 +elif service == "translator":
  72 + capabilities = dict(cfg.services.translation.capabilities or {})
  73 + enabled = any(bool((value or {}).get("enabled", True)) for value in capabilities.values())
  74 +
  75 +print("1" if enabled else "0")
  76 +PY
  77 + )"; then
  78 + echo "[warn] failed to read config state for ${service}; defaulting to enabled" >&2
  79 + enabled="1"
  80 + fi
  81 +
  82 + SERVICE_ENABLED_CACHE["${service}"]="${enabled}"
  83 + [ "${enabled}" = "1" ]
  84 +}
  85 +
  86 +filter_disabled_targets() {
  87 + local targets="$1"
  88 + local verbose="${2:-quiet}"
  89 + local out=""
  90 + local svc
  91 +
  92 + for svc in ${targets}; do
  93 + if service_enabled_by_config "${svc}"; then
  94 + out="${out} ${svc}"
  95 + elif [ "${verbose}" = "verbose" ]; then
  96 + echo "[skip] ${svc} disabled by config" >&2
  97 + fi
  98 + done
  99 +
  100 + echo "${out# }"
  101 +}
  102 +
36 reranker_instance_for_service() { 103 reranker_instance_for_service() {
37 local service="$1" 104 local service="$1"
38 case "${service}" in 105 case "${service}" in
@@ -468,6 +535,16 @@ stop_monitor_daemon() { @@ -468,6 +535,16 @@ stop_monitor_daemon() {
468 535
469 start_monitor_daemon() { 536 start_monitor_daemon() {
470 local targets="$1" 537 local targets="$1"
  538 + if [ -z "${targets}" ]; then
  539 + if is_monitor_daemon_running; then
  540 + echo "[info] no enabled services to monitor; stopping monitor daemon"
  541 + stop_monitor_daemon
  542 + else
  543 + echo "[info] no enabled services to monitor"
  544 + fi
  545 + return 0
  546 + fi
  547 +
471 local pf 548 local pf
472 pf="$(monitor_pid_file)" 549 pf="$(monitor_pid_file)"
473 local tf 550 local tf
@@ -581,6 +658,10 @@ wait_for_startup_health() { @@ -581,6 +658,10 @@ wait_for_startup_health() {
581 start_one() { 658 start_one() {
582 local service="$1" 659 local service="$1"
583 cd "${PROJECT_ROOT}" 660 cd "${PROJECT_ROOT}"
  661 + if ! service_enabled_by_config "${service}"; then
  662 + echo "[skip] ${service} disabled by config"
  663 + return 0
  664 + fi
584 local cmd 665 local cmd
585 if ! cmd="$(service_start_cmd "${service}")"; then 666 if ! cmd="$(service_start_cmd "${service}")"; then
586 echo "[error] unknown service: ${service}" >&2 667 echo "[error] unknown service: ${service}" >&2
@@ -953,6 +1034,7 @@ main() { @@ -953,6 +1034,7 @@ main() {
953 1034
954 load_env_file "${PROJECT_ROOT}/.env" 1035 load_env_file "${PROJECT_ROOT}/.env"
955 local targets="" 1036 local targets=""
  1037 + local effective_targets=""
956 local monitor_was_running=0 1038 local monitor_was_running=0
957 local monitor_prev_targets="" 1039 local monitor_prev_targets=""
958 local auto_monitor_on_start="${SERVICE_CTL_AUTO_MONITOR_ON_START:-1}" 1040 local auto_monitor_on_start="${SERVICE_CTL_AUTO_MONITOR_ON_START:-1}"
@@ -976,12 +1058,23 @@ main() { @@ -976,12 +1058,23 @@ main() {
976 ;; 1058 ;;
977 esac 1059 esac
978 1060
  1061 + effective_targets="${targets}"
  1062 + case "${action}" in
  1063 + up|start|restart|monitor|monitor-start)
  1064 + effective_targets="$(filter_disabled_targets "${targets}" "verbose")"
  1065 + ;;
  1066 + esac
  1067 +
979 case "${action}" in 1068 case "${action}" in
980 up) 1069 up)
981 - for svc in ${targets}; do 1070 + if [ -z "${effective_targets}" ]; then
  1071 + echo "[info] no enabled services in target set"
  1072 + exit 0
  1073 + fi
  1074 + for svc in ${effective_targets}; do
982 start_one "${svc}" 1075 start_one "${svc}"
983 done 1076 done
984 - start_monitor_daemon "${targets}" 1077 + start_monitor_daemon "${effective_targets}"
985 ;; 1078 ;;
986 down) 1079 down)
987 stop_monitor_daemon 1080 stop_monitor_daemon
@@ -990,11 +1083,15 @@ main() { @@ -990,11 +1083,15 @@ main() {
990 done 1083 done
991 ;; 1084 ;;
992 start) 1085 start)
993 - for svc in ${targets}; do 1086 + if [ -z "${effective_targets}" ]; then
  1087 + echo "[info] no enabled services in target set"
  1088 + exit 0
  1089 + fi
  1090 + for svc in ${effective_targets}; do
994 start_one "${svc}" 1091 start_one "${svc}"
995 done 1092 done
996 if [ "${auto_monitor_on_start}" = "1" ]; then 1093 if [ "${auto_monitor_on_start}" = "1" ]; then
997 - start_monitor_daemon "$(merge_targets "$(monitor_current_targets)" "${targets}")" 1094 + start_monitor_daemon "$(merge_targets "$(monitor_current_targets)" "${effective_targets}")"
998 fi 1095 fi
999 ;; 1096 ;;
1000 stop) 1097 stop)
@@ -1025,16 +1122,17 @@ main() { @@ -1025,16 +1122,17 @@ main() {
1025 for svc in ${restart_stop_targets}; do 1122 for svc in ${restart_stop_targets}; do
1026 stop_one "${svc}" 1123 stop_one "${svc}"
1027 done 1124 done
1028 - for svc in ${targets}; do 1125 + for svc in ${effective_targets}; do
1029 start_one "${svc}" 1126 start_one "${svc}"
1030 done 1127 done
1031 if [ "${monitor_was_running}" -eq 1 ]; then 1128 if [ "${monitor_was_running}" -eq 1 ]; then
1032 monitor_prev_targets="$(normalize_targets "${monitor_prev_targets}")" 1129 monitor_prev_targets="$(normalize_targets "${monitor_prev_targets}")"
  1130 + monitor_prev_targets="$(filter_disabled_targets "${monitor_prev_targets}" "quiet")"
1033 monitor_prev_targets="$(apply_target_order monitor "${monitor_prev_targets}")" 1131 monitor_prev_targets="$(apply_target_order monitor "${monitor_prev_targets}")"
1034 - [ -z "${monitor_prev_targets}" ] && monitor_prev_targets="${targets}" 1132 + [ -z "${monitor_prev_targets}" ] && monitor_prev_targets="${effective_targets}"
1035 start_monitor_daemon "${monitor_prev_targets}" 1133 start_monitor_daemon "${monitor_prev_targets}"
1036 elif [ "${auto_monitor_on_start}" = "1" ]; then 1134 elif [ "${auto_monitor_on_start}" = "1" ]; then
1037 - start_monitor_daemon "$(merge_targets "$(monitor_current_targets)" "${targets}")" 1135 + start_monitor_daemon "$(merge_targets "$(monitor_current_targets)" "${effective_targets}")"
1038 fi 1136 fi
1039 ;; 1137 ;;
1040 status) 1138 status)
@@ -1044,10 +1142,14 @@ main() { @@ -1044,10 +1142,14 @@ main() {
1044 monitor_daemon_status 1142 monitor_daemon_status
1045 ;; 1143 ;;
1046 monitor) 1144 monitor)
1047 - monitor_services "${targets}" 1145 + if [ -z "${effective_targets}" ]; then
  1146 + echo "[info] no enabled services in target set"
  1147 + exit 0
  1148 + fi
  1149 + monitor_services "${effective_targets}"
1048 ;; 1150 ;;
1049 monitor-start) 1151 monitor-start)
1050 - start_monitor_daemon "${targets}" 1152 + start_monitor_daemon "${effective_targets}"
1051 ;; 1153 ;;
1052 monitor-stop) 1154 monitor-stop)
1053 stop_monitor_daemon 1155 stop_monitor_daemon
search/searcher.py
@@ -401,7 +401,9 @@ class Searcher: @@ -401,7 +401,9 @@ class Searcher:
401 language: Response / field selection language hint (e.g. zh, en) 401 language: Response / field selection language hint (e.g. zh, en)
402 sku_filter_dimension: SKU grouping dimensions for per-SPU variant pick 402 sku_filter_dimension: SKU grouping dimensions for per-SPU variant pick
403 enable_rerank: If None, use ``config.rerank.enabled``; if set, overrides 403 enable_rerank: If None, use ``config.rerank.enabled``; if set, overrides
404 - whether the rerank provider is invoked (subject to rerank window). 404 + whether the final rerank provider is invoked (subject to rank window).
  405 + When false, the ranking pipeline still runs and rerank stage becomes
  406 + pass-through.
405 rerank_query_template: Override for rerank query text template; None uses 407 rerank_query_template: Override for rerank query text template; None uses
406 ``config.rerank.rerank_query_template`` (e.g. ``"{query}"``). 408 ``config.rerank.rerank_query_template`` (e.g. ``"{query}"``).
407 rerank_doc_template: Override for per-hit document text passed to rerank; 409 rerank_doc_template: Override for per-hit document text passed to rerank;
@@ -430,15 +432,16 @@ class Searcher: @@ -430,15 +432,16 @@ class Searcher:
430 # 重排开关优先级:请求参数显式传值 > 服务端配置(默认开启) 432 # 重排开关优先级:请求参数显式传值 > 服务端配置(默认开启)
431 rerank_enabled_by_config = bool(rc.enabled) 433 rerank_enabled_by_config = bool(rc.enabled)
432 do_rerank = rerank_enabled_by_config if enable_rerank is None else bool(enable_rerank) 434 do_rerank = rerank_enabled_by_config if enable_rerank is None else bool(enable_rerank)
  435 + fine_enabled = bool(fine_cfg.enabled)
433 rerank_window = rc.rerank_window 436 rerank_window = rc.rerank_window
434 coarse_input_window = max(rerank_window, int(coarse_cfg.input_window)) 437 coarse_input_window = max(rerank_window, int(coarse_cfg.input_window))
435 coarse_output_window = max(rerank_window, int(coarse_cfg.output_window)) 438 coarse_output_window = max(rerank_window, int(coarse_cfg.output_window))
436 fine_input_window = max(rerank_window, int(fine_cfg.input_window)) 439 fine_input_window = max(rerank_window, int(fine_cfg.input_window))
437 fine_output_window = max(rerank_window, int(fine_cfg.output_window)) 440 fine_output_window = max(rerank_window, int(fine_cfg.output_window))
438 - # 若开启重排且请求范围在窗口内:从 ES 取前 rerank_window 条、重排后再按 from/size 分页;否则不重排,按原 from/size 查 ES  
439 - in_rerank_window = do_rerank and (from_ + size) <= rerank_window  
440 - es_fetch_from = 0 if in_rerank_window else from_  
441 - es_fetch_size = coarse_input_window if in_rerank_window else size 441 + # 多阶段排序窗口独立于最终 rerank 开关:即使关闭最终 rerank,也保留 coarse/fine 流程。
  442 + in_rank_window = (from_ + size) <= rerank_window
  443 + es_fetch_from = 0 if in_rank_window else from_
  444 + es_fetch_size = coarse_input_window if in_rank_window else size
442 445
443 es_score_normalization_factor: Optional[float] = None 446 es_score_normalization_factor: Optional[float] = None
444 initial_ranks_by_doc: Dict[str, int] = {} 447 initial_ranks_by_doc: Dict[str, int] = {}
@@ -455,7 +458,8 @@ class Searcher: @@ -455,7 +458,8 @@ class Searcher:
455 context.logger.info( 458 context.logger.info(
456 f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, " 459 f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, "
457 f"enable_rerank(request)={enable_rerank}, enable_rerank(config)={rerank_enabled_by_config}, " 460 f"enable_rerank(request)={enable_rerank}, enable_rerank(config)={rerank_enabled_by_config}, "
458 - f"enable_rerank(effective)={do_rerank}, in_rerank_window={in_rerank_window}, " 461 + f"fine_enabled(config)={fine_enabled}, "
  462 + f"enable_rerank(effective)={do_rerank}, in_rank_window={in_rank_window}, "
459 f"es_fetch=({es_fetch_from},{es_fetch_size}) | " 463 f"es_fetch=({es_fetch_from},{es_fetch_size}) | "
460 f"index_languages={index_langs} | " 464 f"index_languages={index_langs} | "
461 f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}", 465 f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}",
@@ -468,8 +472,9 @@ class Searcher: @@ -468,8 +472,9 @@ class Searcher:
468 'from_': from_, 472 'from_': from_,
469 'es_fetch_from': es_fetch_from, 473 'es_fetch_from': es_fetch_from,
470 'es_fetch_size': es_fetch_size, 474 'es_fetch_size': es_fetch_size,
471 - 'in_rerank_window': in_rerank_window, 475 + 'in_rank_window': in_rank_window,
472 'rerank_enabled_by_config': rerank_enabled_by_config, 476 'rerank_enabled_by_config': rerank_enabled_by_config,
  477 + 'fine_enabled': fine_enabled,
473 'enable_rerank_request': enable_rerank, 478 'enable_rerank_request': enable_rerank,
474 'rerank_query_template': effective_query_template, 479 'rerank_query_template': effective_query_template,
475 'rerank_doc_template': effective_doc_template, 480 'rerank_doc_template': effective_doc_template,
@@ -494,6 +499,7 @@ class Searcher: @@ -494,6 +499,7 @@ class Searcher:
494 context.metadata['feature_flags'] = { 499 context.metadata['feature_flags'] = {
495 'translation_enabled': enable_translation, 500 'translation_enabled': enable_translation,
496 'embedding_enabled': enable_embedding, 501 'embedding_enabled': enable_embedding,
  502 + 'fine_enabled': fine_enabled,
497 'rerank_enabled': do_rerank, 503 'rerank_enabled': do_rerank,
498 'style_intent_enabled': bool(self.style_intent_registry.enabled), 504 'style_intent_enabled': bool(self.style_intent_registry.enabled),
499 } 505 }
@@ -526,7 +532,7 @@ class Searcher: @@ -526,7 +532,7 @@ class Searcher:
526 f"语言: {parsed_query.detected_language} | " 532 f"语言: {parsed_query.detected_language} | "
527 f"关键词: {parsed_query.keywords_queries} | " 533 f"关键词: {parsed_query.keywords_queries} | "
528 f"文本向量: {'是' if parsed_query.query_vector is not None else '否'} | " 534 f"文本向量: {'是' if parsed_query.query_vector is not None else '否'} | "
529 - f"图片向量: {'是' if getattr(parsed_query, 'image_query_vector', None) is not None else '否'}", 535 + f"图片向量: {'是' if parsed_query.image_query_vector is not None else '否'}",
530 extra={'reqid': context.reqid, 'uid': context.uid} 536 extra={'reqid': context.reqid, 'uid': context.uid}
531 ) 537 )
532 except Exception as e: 538 except Exception as e:
@@ -545,17 +551,16 @@ class Searcher: @@ -545,17 +551,16 @@ class Searcher:
545 # Generate tenant-specific index name 551 # Generate tenant-specific index name
546 index_name = get_tenant_index_name(tenant_id) 552 index_name = get_tenant_index_name(tenant_id)
547 # index_name = "search_products" 553 # index_name = "search_products"
548 - 554 +
549 # No longer need to add tenant_id to filters since each tenant has its own index 555 # No longer need to add tenant_id to filters since each tenant has its own index
  556 + image_query_vector = None
  557 + if enable_embedding:
  558 + image_query_vector = parsed_query.image_query_vector
550 559
551 es_query = self.query_builder.build_query( 560 es_query = self.query_builder.build_query(
552 query_text=parsed_query.rewritten_query or parsed_query.query_normalized, 561 query_text=parsed_query.rewritten_query or parsed_query.query_normalized,
553 query_vector=parsed_query.query_vector if enable_embedding else None, 562 query_vector=parsed_query.query_vector if enable_embedding else None,
554 - image_query_vector=(  
555 - getattr(parsed_query, "image_query_vector", None)  
556 - if enable_embedding  
557 - else None  
558 - ), 563 + image_query_vector=image_query_vector,
559 filters=filters, 564 filters=filters,
560 range_filters=range_filters, 565 range_filters=range_filters,
561 facet_configs=facets, 566 facet_configs=facets,
@@ -563,7 +568,7 @@ class Searcher: @@ -563,7 +568,7 @@ class Searcher:
563 from_=es_fetch_from, 568 from_=es_fetch_from,
564 enable_knn=enable_embedding and ( 569 enable_knn=enable_embedding and (
565 parsed_query.query_vector is not None 570 parsed_query.query_vector is not None
566 - or getattr(parsed_query, "image_query_vector", None) is not None 571 + or image_query_vector is not None
567 ), 572 ),
568 min_score=min_score, 573 min_score=min_score,
569 parsed_query=parsed_query, 574 parsed_query=parsed_query,
@@ -587,8 +592,7 @@ class Searcher: @@ -587,8 +592,7 @@ class Searcher:
587 592
588 # In multi-stage rank window, first pass only needs score signals for coarse rank. 593 # In multi-stage rank window, first pass only needs score signals for coarse rank.
589 es_query_for_fetch = es_query 594 es_query_for_fetch = es_query
590 - rerank_prefetch_source = None  
591 - if in_rerank_window: 595 + if in_rank_window:
592 es_query_for_fetch = dict(es_query) 596 es_query_for_fetch = dict(es_query)
593 es_query_for_fetch["_source"] = False 597 es_query_for_fetch["_source"] = False
594 598
@@ -597,31 +601,28 @@ class Searcher: @@ -597,31 +601,28 @@ class Searcher:
597 601
598 # Store ES query in context 602 # Store ES query in context
599 context.store_intermediate_result('es_query', es_query) 603 context.store_intermediate_result('es_query', es_query)
600 - if in_rerank_window and rerank_prefetch_source is not None:  
601 - context.store_intermediate_result('es_query_rerank_prefetch_source', rerank_prefetch_source)  
602 # Serialize ES query to compute a compact size + stable digest for correlation 604 # Serialize ES query to compute a compact size + stable digest for correlation
603 es_query_compact = json.dumps(es_query_for_fetch, ensure_ascii=False, separators=(",", ":")) 605 es_query_compact = json.dumps(es_query_for_fetch, ensure_ascii=False, separators=(",", ":"))
604 es_query_digest = hashlib.sha256(es_query_compact.encode("utf-8")).hexdigest()[:16] 606 es_query_digest = hashlib.sha256(es_query_compact.encode("utf-8")).hexdigest()[:16]
605 knn_enabled = bool(enable_embedding and ( 607 knn_enabled = bool(enable_embedding and (
606 parsed_query.query_vector is not None 608 parsed_query.query_vector is not None
607 - or getattr(parsed_query, "image_query_vector", None) is not None 609 + or image_query_vector is not None
608 )) 610 ))
609 vector_dims = int(len(parsed_query.query_vector)) if parsed_query.query_vector is not None else 0 611 vector_dims = int(len(parsed_query.query_vector)) if parsed_query.query_vector is not None else 0
610 image_vector_dims = ( 612 image_vector_dims = (
611 - int(len(parsed_query.image_query_vector))  
612 - if getattr(parsed_query, "image_query_vector", None) is not None 613 + int(len(image_query_vector))
  614 + if image_query_vector is not None
613 else 0 615 else 0
614 ) 616 )
615 617
616 context.logger.info( 618 context.logger.info(
617 - "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | image_vector_dims: %s | facets: %s | rerank_prefetch_source: %s", 619 + "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | image_vector_dims: %s | facets: %s",
618 len(es_query_compact), 620 len(es_query_compact),
619 es_query_digest, 621 es_query_digest,
620 "yes" if knn_enabled else "no", 622 "yes" if knn_enabled else "no",
621 vector_dims, 623 vector_dims,
622 image_vector_dims, 624 image_vector_dims,
623 "yes" if facets else "no", 625 "yes" if facets else "no",
624 - rerank_prefetch_source,  
625 extra={'reqid': context.reqid, 'uid': context.uid} 626 extra={'reqid': context.reqid, 'uid': context.uid}
626 ) 627 )
627 _log_backend_verbose({ 628 _log_backend_verbose({
@@ -656,7 +657,7 @@ class Searcher: @@ -656,7 +657,7 @@ class Searcher:
656 body=body_for_es, 657 body=body_for_es,
657 size=es_fetch_size, 658 size=es_fetch_size,
658 from_=es_fetch_from, 659 from_=es_fetch_from,
659 - include_named_queries_score=bool(do_rerank and in_rerank_window), 660 + include_named_queries_score=bool(in_rank_window),
660 ) 661 )
661 662
662 # Store ES response in context 663 # Store ES response in context
@@ -698,10 +699,177 @@ class Searcher: @@ -698,10 +699,177 @@ class Searcher:
698 context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY) 699 context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY)
699 700
700 style_intent_decisions: Dict[str, SkuSelectionDecision] = {} 701 style_intent_decisions: Dict[str, SkuSelectionDecision] = {}
701 - if do_rerank and in_rerank_window: 702 + if in_rank_window:
702 from dataclasses import asdict 703 from dataclasses import asdict
703 from config.services_config import get_rerank_backend_config, get_rerank_service_url 704 from config.services_config import get_rerank_backend_config, get_rerank_service_url
704 from .rerank_client import coarse_resort_hits, run_lightweight_rerank, run_rerank 705 from .rerank_client import coarse_resort_hits, run_lightweight_rerank, run_rerank
  706 + coarse_fusion_debug = asdict(coarse_cfg.fusion)
  707 + stage_fusion_debug = asdict(rc.fusion)
  708 +
  709 + def _rank_map(stage_hits: List[Dict[str, Any]]) -> Dict[str, int]:
  710 + return {
  711 + str(hit.get("_id")): rank
  712 + for rank, hit in enumerate(stage_hits, 1)
  713 + if hit.get("_id") is not None
  714 + }
  715 +
  716 + def _stage_debug_info(
  717 + *,
  718 + enabled: bool,
  719 + applied: bool,
  720 + skipped_reason: Optional[str],
  721 + service_profile: Optional[str],
  722 + query_template: str,
  723 + doc_template: str,
  724 + docs_in: int,
  725 + docs_out: int,
  726 + top_n: int,
  727 + meta: Optional[Dict[str, Any]] = None,
  728 + backend: Optional[str] = None,
  729 + backend_model_name: Optional[str] = None,
  730 + service_url: Optional[str] = None,
  731 + model: Optional[str] = None,
  732 + fusion: Optional[Dict[str, Any]] = None,
  733 + ) -> Dict[str, Any]:
  734 + return {
  735 + "enabled": enabled,
  736 + "applied": applied,
  737 + "passthrough": not applied,
  738 + "skipped_reason": skipped_reason,
  739 + "service_profile": service_profile,
  740 + "service_url": service_url,
  741 + "backend": backend,
  742 + "model": model,
  743 + "backend_model_name": backend_model_name,
  744 + "query_template": query_template,
  745 + "doc_template": doc_template,
  746 + "query_text": str(query_template).format_map({"query": rerank_query}),
  747 + "docs_in": docs_in,
  748 + "docs_out": docs_out,
  749 + "top_n": top_n,
  750 + "meta": meta,
  751 + "fusion": fusion,
  752 + }
  753 +
  754 + def _run_optional_stage(
  755 + *,
  756 + stage: RequestContextStage,
  757 + stage_label: str,
  758 + enabled: bool,
  759 + stage_hits: List[Dict[str, Any]],
  760 + input_limit: int,
  761 + output_limit: int,
  762 + service_profile: Optional[str],
  763 + query_template: str,
  764 + doc_template: str,
  765 + top_n: int,
  766 + debug_key: Optional[str],
  767 + runner,
  768 + ) -> tuple[List[Dict[str, Any]], Dict[str, int], Optional[Dict[str, Any]]]:
  769 + context.start_stage(stage)
  770 + try:
  771 + input_hits = list(stage_hits[:input_limit])
  772 + output_hits = list(stage_hits[:output_limit])
  773 + applied = False
  774 + skip_reason: Optional[str] = None
  775 + meta: Optional[Dict[str, Any]] = None
  776 + debug_rows: Optional[List[Dict[str, Any]]] = None
  777 +
  778 + if enabled and input_hits:
  779 + output_hits_candidate, applied, meta, debug_rows = runner(input_hits)
  780 + if applied:
  781 + output_hits = list((output_hits_candidate or input_hits)[:output_limit])
  782 + else:
  783 + skip_reason = "service_returned_none"
  784 + else:
  785 + skip_reason = "disabled" if not enabled else "no_hits"
  786 +
  787 + ranks = _rank_map(output_hits) if debug else {}
  788 + stage_info = None
  789 + if debug:
  790 + if applied:
  791 + backend_name, backend_cfg = get_rerank_backend_config(service_profile)
  792 + stage_info = _stage_debug_info(
  793 + enabled=True,
  794 + applied=True,
  795 + skipped_reason=None,
  796 + service_profile=service_profile,
  797 + service_url=get_rerank_service_url(profile=service_profile),
  798 + backend=backend_name,
  799 + backend_model_name=backend_cfg.get("model_name"),
  800 + model=meta.get("model") if isinstance(meta, dict) else None,
  801 + query_template=query_template,
  802 + doc_template=doc_template,
  803 + docs_in=len(input_hits),
  804 + docs_out=len(output_hits),
  805 + top_n=top_n,
  806 + meta=meta,
  807 + fusion=stage_fusion_debug,
  808 + )
  809 + if debug_key is not None and debug_rows is not None:
  810 + context.store_intermediate_result(debug_key, debug_rows)
  811 + else:
  812 + stage_info = _stage_debug_info(
  813 + enabled=enabled,
  814 + applied=False,
  815 + skipped_reason=skip_reason,
  816 + service_profile=service_profile,
  817 + query_template=query_template,
  818 + doc_template=doc_template,
  819 + docs_in=len(input_hits),
  820 + docs_out=len(output_hits),
  821 + top_n=top_n,
  822 + fusion=stage_fusion_debug,
  823 + )
  824 +
  825 + if applied:
  826 + context.logger.info(
  827 + "%s完成 | docs=%s | top_n=%s | meta=%s",
  828 + stage_label,
  829 + len(output_hits),
  830 + top_n,
  831 + meta,
  832 + extra={'reqid': context.reqid, 'uid': context.uid}
  833 + )
  834 + else:
  835 + context.logger.info(
  836 + "%s透传 | reason=%s | docs=%s | top_n=%s",
  837 + stage_label,
  838 + skip_reason,
  839 + len(output_hits),
  840 + top_n,
  841 + extra={'reqid': context.reqid, 'uid': context.uid}
  842 + )
  843 + return output_hits, ranks, stage_info
  844 + except Exception as e:
  845 + output_hits = list(stage_hits[:output_limit])
  846 + ranks = _rank_map(output_hits) if debug else {}
  847 + stage_info = None
  848 + if debug:
  849 + stage_info = _stage_debug_info(
  850 + enabled=enabled,
  851 + applied=False,
  852 + skipped_reason="error",
  853 + service_profile=service_profile,
  854 + query_template=query_template,
  855 + doc_template=doc_template,
  856 + docs_in=min(len(stage_hits), input_limit),
  857 + docs_out=len(output_hits),
  858 + top_n=top_n,
  859 + meta={"error": str(e)},
  860 + fusion=stage_fusion_debug,
  861 + )
  862 + context.add_warning(f"{stage_label} failed: {e}")
  863 + context.logger.warning(
  864 + "调用%s服务失败 | error: %s",
  865 + stage_label,
  866 + e,
  867 + extra={'reqid': context.reqid, 'uid': context.uid},
  868 + exc_info=True,
  869 + )
  870 + return output_hits, ranks, stage_info
  871 + finally:
  872 + context.end_stage(stage)
705 873
706 rerank_query = parsed_query.text_for_rerank() if parsed_query else query 874 rerank_query = parsed_query.text_for_rerank() if parsed_query else query
707 hits = es_response.get("hits", {}).get("hits") or [] 875 hits = es_response.get("hits", {}).get("hits") or []
@@ -716,17 +884,12 @@ class Searcher: @@ -716,17 +884,12 @@ class Searcher:
716 hits = hits[:coarse_output_window] 884 hits = hits[:coarse_output_window]
717 es_response.setdefault("hits", {})["hits"] = hits 885 es_response.setdefault("hits", {})["hits"] = hits
718 if debug: 886 if debug:
719 - coarse_ranks_by_doc = {  
720 - str(hit.get("_id")): rank  
721 - for rank, hit in enumerate(hits, 1)  
722 - if hit.get("_id") is not None 887 + coarse_ranks_by_doc = _rank_map(hits)
  888 + coarse_debug_info = {
  889 + "docs_in": es_fetch_size,
  890 + "docs_out": len(hits),
  891 + "fusion": coarse_fusion_debug,
723 } 892 }
724 - if debug:  
725 - coarse_debug_info = {  
726 - "docs_in": es_fetch_size,  
727 - "docs_out": len(hits),  
728 - "fusion": asdict(coarse_cfg.fusion),  
729 - }  
730 context.store_intermediate_result("coarse_rank_scores", coarse_debug) 893 context.store_intermediate_result("coarse_rank_scores", coarse_debug)
731 context.logger.info( 894 context.logger.info(
732 "粗排完成 | docs_in=%s | docs_out=%s", 895 "粗排完成 | docs_in=%s | docs_out=%s",
@@ -777,72 +940,42 @@ class Searcher: @@ -777,72 +940,42 @@ class Searcher:
777 extra={'reqid': context.reqid, 'uid': context.uid} 940 extra={'reqid': context.reqid, 'uid': context.uid}
778 ) 941 )
779 942
780 - fine_scores: Optional[List[float]] = None  
781 - hits = es_response.get("hits", {}).get("hits") or []  
782 - if fine_cfg.enabled and hits:  
783 - context.start_stage(RequestContextStage.FINE_RANKING)  
784 - try:  
785 - fine_scores, fine_meta, fine_debug_rows = run_lightweight_rerank(  
786 - query=rerank_query,  
787 - es_hits=hits[:fine_input_window],  
788 - language=language,  
789 - timeout_sec=fine_cfg.timeout_sec,  
790 - rerank_query_template=fine_query_template,  
791 - rerank_doc_template=fine_doc_template,  
792 - top_n=fine_output_window,  
793 - debug=debug,  
794 - fusion=rc.fusion,  
795 - style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost,  
796 - service_profile=fine_cfg.service_profile,  
797 - )  
798 - if fine_scores is not None:  
799 - hits = hits[:fine_output_window]  
800 - es_response["hits"]["hits"] = hits  
801 - if debug:  
802 - fine_ranks_by_doc = {  
803 - str(hit.get("_id")): rank  
804 - for rank, hit in enumerate(hits, 1)  
805 - if hit.get("_id") is not None  
806 - }  
807 - fine_backend_name, fine_backend_cfg = get_rerank_backend_config(fine_cfg.service_profile)  
808 - fine_debug_info = {  
809 - "service_profile": fine_cfg.service_profile,  
810 - "service_url": get_rerank_service_url(profile=fine_cfg.service_profile),  
811 - "backend": fine_backend_name,  
812 - "model": fine_meta.get("model") if isinstance(fine_meta, dict) else None,  
813 - "backend_model_name": fine_backend_cfg.get("model_name"),  
814 - "query_template": fine_query_template,  
815 - "doc_template": fine_doc_template,  
816 - "query_text": str(fine_query_template).format_map({"query": rerank_query}),  
817 - "docs_in": min(len(fine_scores), fine_input_window),  
818 - "docs_out": len(hits),  
819 - "top_n": fine_output_window,  
820 - "meta": fine_meta,  
821 - "fusion": asdict(rc.fusion),  
822 - }  
823 - context.store_intermediate_result("fine_rank_scores", fine_debug_rows)  
824 - context.logger.info(  
825 - "精排完成 | docs=%s | top_n=%s | meta=%s",  
826 - len(hits),  
827 - fine_output_window,  
828 - fine_meta,  
829 - extra={'reqid': context.reqid, 'uid': context.uid}  
830 - )  
831 - except Exception as e:  
832 - context.add_warning(f"Fine rerank failed: {e}")  
833 - context.logger.warning(  
834 - f"调用精排服务失败 | error: {e}",  
835 - extra={'reqid': context.reqid, 'uid': context.uid},  
836 - exc_info=True,  
837 - )  
838 - finally:  
839 - context.end_stage(RequestContextStage.FINE_RANKING) 943 + def _run_fine_stage(stage_input: List[Dict[str, Any]]):
  944 + fine_scores, fine_meta, fine_debug_rows = run_lightweight_rerank(
  945 + query=rerank_query,
  946 + es_hits=stage_input,
  947 + language=language,
  948 + timeout_sec=fine_cfg.timeout_sec,
  949 + rerank_query_template=fine_query_template,
  950 + rerank_doc_template=fine_doc_template,
  951 + top_n=fine_output_window,
  952 + debug=debug,
  953 + fusion=rc.fusion,
  954 + style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost,
  955 + service_profile=fine_cfg.service_profile,
  956 + )
  957 + return stage_input, fine_scores is not None, fine_meta, fine_debug_rows
  958 +
  959 + hits, fine_ranks_by_doc, fine_debug_info = _run_optional_stage(
  960 + stage=RequestContextStage.FINE_RANKING,
  961 + stage_label="精排",
  962 + enabled=fine_enabled,
  963 + stage_hits=es_response.get("hits", {}).get("hits") or [],
  964 + input_limit=fine_input_window,
  965 + output_limit=fine_output_window,
  966 + service_profile=fine_cfg.service_profile,
  967 + query_template=fine_query_template,
  968 + doc_template=fine_doc_template,
  969 + top_n=fine_output_window,
  970 + debug_key="fine_rank_scores",
  971 + runner=_run_fine_stage,
  972 + )
  973 + es_response["hits"]["hits"] = hits
840 974
841 - context.start_stage(RequestContextStage.RERANKING)  
842 - try:  
843 - final_hits = es_response.get("hits", {}).get("hits") or []  
844 - final_input = final_hits[:rerank_window]  
845 - es_response["hits"]["hits"] = final_input 975 + def _run_rerank_stage(stage_input: List[Dict[str, Any]]):
  976 + nonlocal es_response
  977 +
  978 + es_response["hits"]["hits"] = stage_input
846 es_response, rerank_meta, fused_debug = run_rerank( 979 es_response, rerank_meta, fused_debug = run_rerank(
847 query=rerank_query, 980 query=rerank_query,
848 es_response=es_response, 981 es_response=es_response,
@@ -858,48 +991,31 @@ class Searcher: @@ -858,48 +991,31 @@ class Searcher:
858 service_profile=rc.service_profile, 991 service_profile=rc.service_profile,
859 style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost, 992 style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost,
860 ) 993 )
861 -  
862 - if rerank_meta is not None:  
863 - if debug:  
864 - rerank_ranks_by_doc = {  
865 - str(hit.get("_id")): rank  
866 - for rank, hit in enumerate(es_response.get("hits", {}).get("hits") or [], 1)  
867 - if hit.get("_id") is not None  
868 - }  
869 - rerank_backend_name, rerank_backend_cfg = get_rerank_backend_config(rc.service_profile)  
870 - rerank_debug_info = {  
871 - "service_profile": rc.service_profile,  
872 - "service_url": get_rerank_service_url(profile=rc.service_profile),  
873 - "backend": rerank_backend_name,  
874 - "model": rerank_meta.get("model") if isinstance(rerank_meta, dict) else None,  
875 - "backend_model_name": rerank_backend_cfg.get("model_name"),  
876 - "query_template": effective_query_template,  
877 - "doc_template": effective_doc_template,  
878 - "query_text": str(effective_query_template).format_map({"query": rerank_query}),  
879 - "docs_in": len(final_input),  
880 - "docs_out": len(es_response.get("hits", {}).get("hits") or []),  
881 - "top_n": from_ + size,  
882 - "meta": rerank_meta,  
883 - "fusion": asdict(rc.fusion),  
884 - }  
885 - context.store_intermediate_result("rerank_scores", fused_debug)  
886 - context.logger.info(  
887 - f"重排完成 | docs={len(es_response.get('hits', {}).get('hits') or [])} | "  
888 - f"top_n={from_ + size} | meta={rerank_meta}",  
889 - extra={'reqid': context.reqid, 'uid': context.uid}  
890 - )  
891 - except Exception as e:  
892 - context.add_warning(f"Rerank failed: {e}")  
893 - context.logger.warning(  
894 - f"调用重排服务失败 | error: {e}",  
895 - extra={'reqid': context.reqid, 'uid': context.uid},  
896 - exc_info=True, 994 + return (
  995 + es_response.get("hits", {}).get("hits") or [],
  996 + rerank_meta is not None,
  997 + rerank_meta,
  998 + fused_debug,
897 ) 999 )
898 - finally:  
899 - context.end_stage(RequestContextStage.RERANKING)  
900 1000
901 - # 当本次请求在重排窗口内时:已按多阶段排序产出前 rerank_window 条,需按请求的 from/size 做分页切片  
902 - if in_rerank_window: 1001 + hits, rerank_ranks_by_doc, rerank_debug_info = _run_optional_stage(
  1002 + stage=RequestContextStage.RERANKING,
  1003 + stage_label="重排",
  1004 + enabled=do_rerank,
  1005 + stage_hits=es_response.get("hits", {}).get("hits") or [],
  1006 + input_limit=rerank_window,
  1007 + output_limit=rerank_window,
  1008 + service_profile=rc.service_profile,
  1009 + query_template=effective_query_template,
  1010 + doc_template=effective_doc_template,
  1011 + top_n=from_ + size,
  1012 + debug_key="rerank_scores",
  1013 + runner=_run_rerank_stage,
  1014 + )
  1015 + es_response["hits"]["hits"] = hits
  1016 +
  1017 + # 当本次请求在排序窗口内时:已按多阶段排序产出前 rerank_window 条,需按请求的 from/size 做分页切片
  1018 + if in_rank_window:
903 hits = es_response.get("hits", {}).get("hits") or [] 1019 hits = es_response.get("hits", {}).get("hits") or []
904 sliced = hits[from_ : from_ + size] 1020 sliced = hits[from_ : from_ + size]
905 es_response.setdefault("hits", {})["hits"] = sliced 1021 es_response.setdefault("hits", {})["hits"] = sliced
@@ -961,12 +1077,12 @@ class Searcher: @@ -961,12 +1077,12 @@ class Searcher:
961 context.end_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) 1077 context.end_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL)
962 1078
963 context.logger.info( 1079 context.logger.info(
964 - f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", 1080 + f"排序窗口分页切片 | from={from_}, size={size}, 返回={len(sliced)}条",
965 extra={'reqid': context.reqid, 'uid': context.uid} 1081 extra={'reqid': context.reqid, 'uid': context.uid}
966 ) 1082 )
967 1083
968 # 非重排窗口:款式意图在 result_processing 之前执行,便于单独计时且与 ES 召回阶段衔接 1084 # 非重排窗口:款式意图在 result_processing 之前执行,便于单独计时且与 ES 召回阶段衔接
969 - if self._has_style_intent(parsed_query) and not in_rerank_window: 1085 + if self._has_style_intent(parsed_query) and not in_rank_window:
970 es_hits_pre = es_response.get("hits", {}).get("hits") or [] 1086 es_hits_pre = es_response.get("hits", {}).get("hits") or []
971 style_intent_decisions = self._apply_style_intent_to_hits( 1087 style_intent_decisions = self._apply_style_intent_to_hits(
972 es_hits_pre, 1088 es_hits_pre,
@@ -1259,7 +1375,7 @@ class Searcher: @@ -1259,7 +1375,7 @@ class Searcher:
1259 # Collect debug information if requested 1375 # Collect debug information if requested
1260 debug_info = None 1376 debug_info = None
1261 if debug: 1377 if debug:
1262 - query_tokens = getattr(parsed_query, "query_tokens", []) if parsed_query else [] 1378 + query_tokens = parsed_query.query_tokens if parsed_query else []
1263 token_count = len(query_tokens) 1379 token_count = len(query_tokens)
1264 text_knn_is_long = token_count >= 5 1380 text_knn_is_long = token_count >= 5
1265 text_knn_k = self.query_builder.knn_text_k_long if text_knn_is_long else self.query_builder.knn_text_k 1381 text_knn_k = self.query_builder.knn_text_k_long if text_knn_is_long else self.query_builder.knn_text_k
@@ -1279,7 +1395,7 @@ class Searcher: @@ -1279,7 +1395,7 @@ class Searcher:
1279 "translations": context.query_analysis.translations, 1395 "translations": context.query_analysis.translations,
1280 "keywords_queries": context.query_analysis.keywords_queries, 1396 "keywords_queries": context.query_analysis.keywords_queries,
1281 "has_vector": context.query_analysis.query_vector is not None, 1397 "has_vector": context.query_analysis.query_vector is not None,
1282 - "has_image_vector": getattr(parsed_query, "image_query_vector", None) is not None, 1398 + "has_image_vector": parsed_query.image_query_vector is not None,
1283 "query_tokens": query_tokens, 1399 "query_tokens": query_tokens,
1284 "intent_detection": context.get_intermediate_result("style_intent_profile"), 1400 "intent_detection": context.get_intermediate_result("style_intent_profile"),
1285 }, 1401 },
@@ -1298,9 +1414,10 @@ class Searcher: @@ -1298,9 +1414,10 @@ class Searcher:
1298 }, 1414 },
1299 "image_knn": { 1415 "image_knn": {
1300 "enabled": bool( 1416 "enabled": bool(
1301 - enable_embedding 1417 + self.image_embedding_field
  1418 + and enable_embedding
1302 and parsed_query 1419 and parsed_query
1303 - and getattr(parsed_query, "image_query_vector", None) is not None 1420 + and image_query_vector is not None
1304 ), 1421 ),
1305 "k": self.query_builder.knn_image_k, 1422 "k": self.query_builder.knn_image_k,
1306 "num_candidates": self.query_builder.knn_image_num_candidates, 1423 "num_candidates": self.query_builder.knn_image_num_candidates,
@@ -1311,9 +1428,8 @@ class Searcher: @@ -1311,9 +1428,8 @@ class Searcher:
1311 "es_query_context": { 1428 "es_query_context": {
1312 "es_fetch_from": es_fetch_from, 1429 "es_fetch_from": es_fetch_from,
1313 "es_fetch_size": es_fetch_size, 1430 "es_fetch_size": es_fetch_size,
1314 - "in_rerank_window": in_rerank_window,  
1315 - "rerank_prefetch_source": context.get_intermediate_result('es_query_rerank_prefetch_source'),  
1316 - "include_named_queries_score": bool(do_rerank and in_rerank_window), 1431 + "in_rank_window": in_rank_window,
  1432 + "include_named_queries_score": bool(in_rank_window),
1317 }, 1433 },
1318 "es_response": { 1434 "es_response": {
1319 "took_ms": es_response.get('took', 0), 1435 "took_ms": es_response.get('took', 0),
@@ -1369,10 +1485,10 @@ class Searcher: @@ -1369,10 +1485,10 @@ class Searcher:
1369 "retrieval_plan": debug_info["retrieval_plan"], 1485 "retrieval_plan": debug_info["retrieval_plan"],
1370 "ranking_windows": { 1486 "ranking_windows": {
1371 "es_fetch_size": es_fetch_size, 1487 "es_fetch_size": es_fetch_size,
1372 - "coarse_output_window": coarse_output_window if do_rerank and in_rerank_window else None,  
1373 - "fine_input_window": fine_input_window if do_rerank and in_rerank_window else None,  
1374 - "fine_output_window": fine_output_window if do_rerank and in_rerank_window else None,  
1375 - "rerank_window": rerank_window if do_rerank and in_rerank_window else None, 1488 + "coarse_output_window": coarse_output_window if in_rank_window else None,
  1489 + "fine_input_window": fine_input_window if in_rank_window else None,
  1490 + "fine_output_window": fine_output_window if in_rank_window else None,
  1491 + "rerank_window": rerank_window if in_rank_window else None,
1376 "page_from": from_, 1492 "page_from": from_,
1377 "page_size": size, 1493 "page_size": size,
1378 }, 1494 },
tests/test_embedding_pipeline.py
  1 +from dataclasses import asdict
1 from typing import Any, Dict, List, Optional 2 from typing import Any, Dict, List, Optional
2 3
3 import numpy as np 4 import numpy as np
tests/test_search_rerank_window.py
1 from __future__ import annotations 1 from __future__ import annotations
2 2
3 -from dataclasses import dataclass 3 +from dataclasses import dataclass, field
4 from pathlib import Path 4 from pathlib import Path
5 from types import SimpleNamespace 5 from types import SimpleNamespace
6 from typing import Any, Dict, List 6 from typing import Any, Dict, List
@@ -30,7 +30,10 @@ class _FakeParsedQuery: @@ -30,7 +30,10 @@ class _FakeParsedQuery:
30 rewritten_query: str 30 rewritten_query: str
31 detected_language: str = "en" 31 detected_language: str = "en"
32 translations: Dict[str, str] = None 32 translations: Dict[str, str] = None
  33 + keywords_queries: Dict[str, str] = field(default_factory=dict)
33 query_vector: Any = None 34 query_vector: Any = None
  35 + image_query_vector: Any = None
  36 + query_tokens: List[str] = field(default_factory=list)
34 style_intent_profile: Any = None 37 style_intent_profile: Any = None
35 38
36 def text_for_rerank(self) -> str: 39 def text_for_rerank(self) -> str:
@@ -89,6 +92,15 @@ class _FakeQueryParser: @@ -89,6 +92,15 @@ class _FakeQueryParser:
89 92
90 93
91 class _FakeQueryBuilder: 94 class _FakeQueryBuilder:
  95 + knn_text_k = 120
  96 + knn_text_k_long = 160
  97 + knn_text_num_candidates = 400
  98 + knn_text_num_candidates_long = 500
  99 + knn_text_boost = 20.0
  100 + knn_image_k = 120
  101 + knn_image_num_candidates = 400
  102 + knn_image_boost = 20.0
  103 +
92 def build_query(self, **kwargs): 104 def build_query(self, **kwargs):
93 return { 105 return {
94 "query": {"match_all": {}}, 106 "query": {"match_all": {}},
@@ -583,7 +595,7 @@ def test_searcher_rerank_prefetch_source_includes_sku_fields_when_style_intent_a @@ -583,7 +595,7 @@ def test_searcher_rerank_prefetch_source_includes_sku_fields_when_style_intent_a
583 } 595 }
584 596
585 597
586 -def test_searcher_skips_rerank_when_request_explicitly_false(monkeypatch): 598 +def test_searcher_keeps_previous_stage_order_when_request_explicitly_disables_rerank(monkeypatch):
587 es_client = _FakeESClient() 599 es_client = _FakeESClient()
588 searcher = _build_searcher(_build_search_config(rerank_enabled=True), es_client) 600 searcher = _build_searcher(_build_search_config(rerank_enabled=True), es_client)
589 context = create_request_context(reqid="t2", uid="u2") 601 context = create_request_context(reqid="t2", uid="u2")
@@ -593,28 +605,95 @@ def test_searcher_skips_rerank_when_request_explicitly_false(monkeypatch): @@ -593,28 +605,95 @@ def test_searcher_skips_rerank_when_request_explicitly_false(monkeypatch):
593 lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}), 605 lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}),
594 ) 606 )
595 607
596 - called: Dict[str, int] = {"count": 0} 608 + called: Dict[str, int] = {"count": 0, "fine": 0}
  609 +
  610 + def _fake_run_lightweight_rerank(**kwargs):
  611 + called["fine"] += 1
  612 + hits = kwargs["es_hits"]
  613 + for idx, hit in enumerate(hits):
  614 + hit["_fine_score"] = float(idx + 1)
  615 + hits.reverse()
  616 + return [hit["_fine_score"] for hit in hits], {"stage": "fine"}, []
597 617
598 def _fake_run_rerank(**kwargs): 618 def _fake_run_rerank(**kwargs):
599 called["count"] += 1 619 called["count"] += 1
600 return kwargs["es_response"], None, [] 620 return kwargs["es_response"], None, []
601 621
  622 + monkeypatch.setattr("search.rerank_client.run_lightweight_rerank", _fake_run_lightweight_rerank)
602 monkeypatch.setattr("search.rerank_client.run_rerank", _fake_run_rerank) 623 monkeypatch.setattr("search.rerank_client.run_rerank", _fake_run_rerank)
603 624
604 - searcher.search( 625 + result = searcher.search(
605 query="toy", 626 query="toy",
606 tenant_id="162", 627 tenant_id="162",
607 from_=20, 628 from_=20,
608 size=10, 629 size=10,
609 context=context, 630 context=context,
610 enable_rerank=False, 631 enable_rerank=False,
  632 + debug=True,
611 ) 633 )
612 634
613 assert called["count"] == 0 635 assert called["count"] == 0
614 - assert es_client.calls[0]["from_"] == 20  
615 - assert es_client.calls[0]["size"] == 10  
616 - assert es_client.calls[0]["include_named_queries_score"] is False  
617 - assert len(es_client.calls) == 1 636 + assert called["fine"] == 1
  637 + assert es_client.calls[0]["from_"] == 0
  638 + assert es_client.calls[0]["size"] == searcher.config.coarse_rank.input_window
  639 + assert es_client.calls[0]["include_named_queries_score"] is True
  640 + assert len(es_client.calls) == 3
  641 + assert es_client.calls[2]["body"]["query"]["ids"]["values"] == [str(i) for i in range(363, 353, -1)]
  642 + assert len(result.results) == 10
  643 + assert [item.spu_id for item in result.results[:3]] == ["363", "362", "361"]
  644 + assert result.debug_info["rerank"]["enabled"] is False
  645 + assert result.debug_info["rerank"]["applied"] is False
  646 + assert result.debug_info["rerank"]["skipped_reason"] == "disabled"
  647 + assert result.debug_info["per_result"][0]["ranking_funnel"]["rerank"]["rank"] == 21
  648 +
  649 +
  650 +def test_searcher_keeps_previous_stage_order_when_config_disables_rerank(monkeypatch):
  651 + es_client = _FakeESClient()
  652 + searcher = _build_searcher(_build_search_config(rerank_enabled=False), es_client)
  653 + context = create_request_context(reqid="t2b", uid="u2b")
  654 +
  655 + monkeypatch.setattr(
  656 + "search.searcher.get_tenant_config_loader",
  657 + lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}),
  658 + )
  659 +
  660 + called: Dict[str, int] = {"count": 0, "fine": 0}
  661 +
  662 + def _fake_run_lightweight_rerank(**kwargs):
  663 + called["fine"] += 1
  664 + hits = kwargs["es_hits"]
  665 + hits.reverse()
  666 + for idx, hit in enumerate(hits):
  667 + hit["_fine_score"] = float(len(hits) - idx)
  668 + return [hit["_fine_score"] for hit in hits], {"stage": "fine"}, []
  669 +
  670 + def _fake_run_rerank(**kwargs):
  671 + called["count"] += 1
  672 + return kwargs["es_response"], None, []
  673 +
  674 + monkeypatch.setattr("search.rerank_client.run_lightweight_rerank", _fake_run_lightweight_rerank)
  675 + monkeypatch.setattr("search.rerank_client.run_rerank", _fake_run_rerank)
  676 +
  677 + result = searcher.search(
  678 + query="toy",
  679 + tenant_id="162",
  680 + from_=0,
  681 + size=5,
  682 + context=context,
  683 + enable_rerank=None,
  684 + debug=True,
  685 + )
  686 +
  687 + assert called["count"] == 0
  688 + assert called["fine"] == 1
  689 + assert es_client.calls[0]["from_"] == 0
  690 + assert es_client.calls[0]["size"] == searcher.config.coarse_rank.input_window
  691 + assert es_client.calls[0]["include_named_queries_score"] is True
  692 + assert len(result.results) == 5
  693 + assert [item.spu_id for item in result.results] == ["383", "382", "381", "380", "379"]
  694 + assert result.debug_info["rerank"]["enabled"] is False
  695 + assert result.debug_info["rerank"]["applied"] is False
  696 + assert result.debug_info["rerank"]["skipped_reason"] == "disabled"
618 697
619 698
620 def test_searcher_skips_rerank_when_page_exceeds_window(monkeypatch): 699 def test_searcher_skips_rerank_when_page_exceeds_window(monkeypatch):
@@ -919,7 +998,8 @@ def test_searcher_promotes_sku_by_embedding_when_query_has_no_direct_option_matc @@ -919,7 +998,8 @@ def test_searcher_promotes_sku_by_embedding_when_query_has_no_direct_option_matc
919 998
920 def test_searcher_debug_info_uses_initial_es_max_score_for_normalization(monkeypatch): 999 def test_searcher_debug_info_uses_initial_es_max_score_for_normalization(monkeypatch):
921 es_client = _FakeESClient(total_hits=3) 1000 es_client = _FakeESClient(total_hits=3)
922 - searcher = _build_searcher(_build_search_config(rerank_enabled=False), es_client) 1001 + cfg = _build_search_config(rerank_enabled=False)
  1002 + searcher = _build_searcher(cfg, es_client)
923 context = create_request_context(reqid="dbg", uid="u-dbg") 1003 context = create_request_context(reqid="dbg", uid="u-dbg")
924 1004
925 monkeypatch.setattr( 1005 monkeypatch.setattr(
@@ -939,7 +1019,8 @@ def test_searcher_debug_info_uses_initial_es_max_score_for_normalization(monkeyp @@ -939,7 +1019,8 @@ def test_searcher_debug_info_uses_initial_es_max_score_for_normalization(monkeyp
939 1019
940 assert result.debug_info["query_analysis"]["index_languages"] == ["en", "zh"] 1020 assert result.debug_info["query_analysis"]["index_languages"] == ["en", "zh"]
941 assert result.debug_info["query_analysis"]["query_tokens"] == [] 1021 assert result.debug_info["query_analysis"]["query_tokens"] == []
942 - assert result.debug_info["es_query_context"]["es_fetch_size"] == 2 1022 + expected_es_fetch = max(cfg.rerank.rerank_window, cfg.coarse_rank.input_window)
  1023 + assert result.debug_info["es_query_context"]["es_fetch_size"] == expected_es_fetch
943 assert result.debug_info["es_response"]["es_score_normalization_factor"] == 3.0 1024 assert result.debug_info["es_response"]["es_score_normalization_factor"] == 3.0
944 assert result.debug_info["per_result"][0]["initial_rank"] == 1 1025 assert result.debug_info["per_result"][0]["initial_rank"] == 1
945 assert result.debug_info["per_result"][0]["final_rank"] == 1 1026 assert result.debug_info["per_result"][0]["final_rank"] == 1
@@ -970,6 +1051,12 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable @@ -970,6 +1051,12 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable
970 lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}), 1051 lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}),
971 ) 1052 )
972 1053
  1054 + fine_called: Dict[str, int] = {"count": 0}
  1055 +
  1056 + def _fake_run_lightweight_rerank(**kwargs):
  1057 + fine_called["count"] += 1
  1058 + return [], {"stage": "fine"}, []
  1059 +
973 def _fake_run_rerank(**kwargs): 1060 def _fake_run_rerank(**kwargs):
974 hits = kwargs["es_response"]["hits"]["hits"] 1061 hits = kwargs["es_response"]["hits"]["hits"]
975 hits.reverse() 1062 hits.reverse()
@@ -994,6 +1081,7 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable @@ -994,6 +1081,7 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable
994 ) 1081 )
995 return kwargs["es_response"], {"model": "final-reranker"}, fused_debug 1082 return kwargs["es_response"], {"model": "final-reranker"}, fused_debug
996 1083
  1084 + monkeypatch.setattr("search.rerank_client.run_lightweight_rerank", _fake_run_lightweight_rerank)
997 monkeypatch.setattr("search.rerank_client.run_rerank", _fake_run_rerank) 1085 monkeypatch.setattr("search.rerank_client.run_rerank", _fake_run_rerank)
998 1086
999 result = searcher.search( 1087 result = searcher.search(
@@ -1008,7 +1096,12 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable @@ -1008,7 +1096,12 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable
1008 1096
1009 per_result = {row["spu_id"]: row for row in result.debug_info["per_result"]} 1097 per_result = {row["spu_id"]: row for row in result.debug_info["per_result"]}
1010 moved = per_result["4"]["ranking_funnel"] 1098 moved = per_result["4"]["ranking_funnel"]
1011 - assert moved["fine_rank"]["rank"] is None 1099 + assert fine_called["count"] == 0
  1100 + assert result.debug_info["fine_rank"]["enabled"] is False
  1101 + assert result.debug_info["fine_rank"]["applied"] is False
  1102 + assert result.debug_info["fine_rank"]["skipped_reason"] == "disabled"
  1103 + assert moved["fine_rank"]["rank"] == 5
  1104 + assert moved["fine_rank"]["rank_change"] == 0
1012 assert moved["rerank"]["rank"] == 1 1105 assert moved["rerank"]["rank"] == 1
1013 assert moved["rerank"]["rank_change"] == 4 1106 assert moved["rerank"]["rank_change"] == 4
1014 assert moved["final_page"]["rank_change"] == 0 1107 assert moved["final_page"]["rank_change"] == 0