diff --git a/api/models.py b/api/models.py index 9b9d384..4c767b7 100644 --- a/api/models.py +++ b/api/models.py @@ -154,7 +154,8 @@ class SearchRequest(BaseModel): enable_rerank: Optional[bool] = Field( None, description=( - "是否开启重排(调用外部重排服务对 ES 结果进行二次排序)。" + "是否开启最终重排(调用外部 rerank 服务改写上一阶段顺序)。" + "关闭时仍保留 coarse/fine 流程,仅在 rerank 阶段保序透传。" "不传则使用服务端配置 rerank.enabled(默认开启)。" ) ) diff --git a/config/config.yaml b/config/config.yaml index 2ef3790..5a94a65 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -292,8 +292,8 @@ function_score: # 粗排配置(仅融合 ES 文本/向量信号,不调用模型) coarse_rank: enabled: true - input_window: 700 - output_window: 240 + input_window: 480 + output_window: 160 fusion: es_bias: 10.0 es_exponent: 0.05 @@ -309,6 +309,7 @@ coarse_rank: knn_exponent: 0.4 # 精排配置(轻量 reranker) +# enabled=false 时仍进入 fine 阶段,但保序透传,不调用 fine 模型服务 fine_rank: enabled: false input_window: 160 @@ -319,6 +320,7 @@ fine_rank: service_profile: fine # 重排配置(provider/URL 在 services.rerank) +# enabled=false 时仍进入 rerank 阶段,但保序透传,不调用最终 rerank 服务 rerank: enabled: true rerank_window: 160 @@ -510,7 +512,7 @@ services: default: host: 0.0.0.0 port: 6007 - backend: qwen3_vllm_score + backend: bge runtime_dir: ./.runtime/reranker/default fine: host: 0.0.0.0 diff --git a/docs/issues/a b/docs/issues/a new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/docs/issues/a diff --git a/docs/issues/issue-2026-04-12-test-env.md b/docs/issues/issue-2026-04-12-test-env.md new file mode 100644 index 0000000..60c86e4 --- /dev/null +++ b/docs/issues/issue-2026-04-12-test-env.md @@ -0,0 +1,16 @@ +[REDACTED-HOST] 端口22 用户名和密码: +tw [REDACTED-PASSWORD] (有sudo权限,凭据已从版本库移除,请联系运维获取并尽快轮换) +这台机器上的目录/home/tw/saas-search 已经部署了本项目 +请帮我运行项目 +1. 帮我checkout一个test环境的分支,这个分支,把重排、翻译模型 都关闭掉,因为这台机gpu显存较小(embedding模型可以保留) +2. 在这个分支,把服务都启动起来 +3. 使用docker,安装一个ES,参考本项目的文档 ES9*.md。因为这台机器已经有一个系统的elasticsearch,为了不相互干扰,将本项目依赖的es9安装到docker,并且在测试环境配置的es地址做适配的工作 + + +1. 不是要禁用6005,而是6005端口已经有对应的文本服务了,直接用就行 +2. 
6005其实就是本项目的一个历史早期版本启动起来的,在另外一个目录:/home/tw/SearchEngine,请看他的启动配置 +nohup bash scripts/start_embedding_service.sh > log.start_embedding_service.0412 2>&1 & +是这样启动起来的 +看他陪的文本是用的哪套方案、哪个模型,跟他对齐(我指的是当前的测试分支) + + diff --git a/scripts/service_ctl.sh b/scripts/service_ctl.sh index 5fb26a3..d9cf873 100755 --- a/scripts/service_ctl.sh +++ b/scripts/service_ctl.sh @@ -20,6 +20,7 @@ CORE_SERVICES=("backend" "indexer" "frontend" "eval-web") OPTIONAL_SERVICES=("tei" "cnclip" "embedding" "embedding-image" "translator" "reranker") FULL_SERVICES=("${OPTIONAL_SERVICES[@]}" "${CORE_SERVICES[@]}") STOP_ORDER_SERVICES=("frontend" "eval-web" "indexer" "backend" "reranker" "translator" "embedding-image" "embedding" "cnclip" "tei") +declare -Ag SERVICE_ENABLED_CACHE=() all_services() { echo "${FULL_SERVICES[@]}" @@ -33,6 +34,72 @@ config_python_bin() { fi } +service_enabled_by_config() { + local service="$1" + case "${service}" in + reranker|reranker-fine|translator) + ;; + *) + return 0 + ;; + esac + + if [ -n "${SERVICE_ENABLED_CACHE[${service}]+x}" ]; then + [ "${SERVICE_ENABLED_CACHE[${service}]}" = "1" ] + return + fi + + local pybin + pybin="$(config_python_bin)" + + local enabled + if ! 
enabled="$( + SERVICE_NAME="${service}" \ + PYTHONPATH="${PROJECT_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" \ + "${pybin}" - <<'PY' +from config.loader import get_app_config +import os + +service = os.environ["SERVICE_NAME"] +cfg = get_app_config() + +enabled = True +if service == "reranker": + enabled = bool(cfg.search.rerank.enabled) +elif service == "reranker-fine": + enabled = bool(cfg.search.fine_rank.enabled) +elif service == "translator": + capabilities = dict(cfg.services.translation.capabilities or {}) + enabled = any(bool((value or {}).get("enabled", True)) for value in capabilities.values()) + +print("1" if enabled else "0") +PY + )"; then + echo "[warn] failed to read config state for ${service}; defaulting to enabled" >&2 + enabled="1" + fi + + SERVICE_ENABLED_CACHE["${service}"]="${enabled}" + [ "${enabled}" = "1" ] +} + +filter_disabled_targets() { + local targets="$1" + local verbose="${2:-quiet}" + local out="" + local svc + + for svc in ${targets}; do + if service_enabled_by_config "${svc}"; then + out="${out} ${svc}" + elif [ "${verbose}" = "verbose" ]; then + echo "[skip] ${svc} disabled by config" >&2 + fi + done + + echo "${out# }" +} + reranker_instance_for_service() { local service="$1" case "${service}" in @@ -468,6 +535,16 @@ stop_monitor_daemon() { start_monitor_daemon() { local targets="$1" + if [ -z "${targets}" ]; then + if is_monitor_daemon_running; then + echo "[info] no enabled services to monitor; stopping monitor daemon" + stop_monitor_daemon + else + echo "[info] no enabled services to monitor" + fi + return 0 + fi + local pf pf="$(monitor_pid_file)" local tf @@ -581,6 +658,10 @@ wait_for_startup_health() { start_one() { local service="$1" cd "${PROJECT_ROOT}" + if ! service_enabled_by_config "${service}"; then + echo "[skip] ${service} disabled by config" + return 0 + fi local cmd if ! 
cmd="$(service_start_cmd "${service}")"; then echo "[error] unknown service: ${service}" >&2 @@ -953,6 +1034,7 @@ main() { load_env_file "${PROJECT_ROOT}/.env" local targets="" + local effective_targets="" local monitor_was_running=0 local monitor_prev_targets="" local auto_monitor_on_start="${SERVICE_CTL_AUTO_MONITOR_ON_START:-1}" @@ -976,12 +1058,23 @@ main() { ;; esac + effective_targets="${targets}" + case "${action}" in + up|start|restart|monitor|monitor-start) + effective_targets="$(filter_disabled_targets "${targets}" "verbose")" + ;; + esac + case "${action}" in up) - for svc in ${targets}; do + if [ -z "${effective_targets}" ]; then + echo "[info] no enabled services in target set" + exit 0 + fi + for svc in ${effective_targets}; do start_one "${svc}" done - start_monitor_daemon "${targets}" + start_monitor_daemon "${effective_targets}" ;; down) stop_monitor_daemon @@ -990,11 +1083,15 @@ main() { done ;; start) - for svc in ${targets}; do + if [ -z "${effective_targets}" ]; then + echo "[info] no enabled services in target set" + exit 0 + fi + for svc in ${effective_targets}; do start_one "${svc}" done if [ "${auto_monitor_on_start}" = "1" ]; then - start_monitor_daemon "$(merge_targets "$(monitor_current_targets)" "${targets}")" + start_monitor_daemon "$(merge_targets "$(monitor_current_targets)" "${effective_targets}")" fi ;; stop) @@ -1025,16 +1122,17 @@ main() { for svc in ${restart_stop_targets}; do stop_one "${svc}" done - for svc in ${targets}; do + for svc in ${effective_targets}; do start_one "${svc}" done if [ "${monitor_was_running}" -eq 1 ]; then monitor_prev_targets="$(normalize_targets "${monitor_prev_targets}")" + monitor_prev_targets="$(filter_disabled_targets "${monitor_prev_targets}" "quiet")" monitor_prev_targets="$(apply_target_order monitor "${monitor_prev_targets}")" - [ -z "${monitor_prev_targets}" ] && monitor_prev_targets="${targets}" + [ -z "${monitor_prev_targets}" ] && monitor_prev_targets="${effective_targets}" 
start_monitor_daemon "${monitor_prev_targets}" elif [ "${auto_monitor_on_start}" = "1" ]; then - start_monitor_daemon "$(merge_targets "$(monitor_current_targets)" "${targets}")" + start_monitor_daemon "$(merge_targets "$(monitor_current_targets)" "${effective_targets}")" fi ;; status) @@ -1044,10 +1142,14 @@ main() { monitor_daemon_status ;; monitor) - monitor_services "${targets}" + if [ -z "${effective_targets}" ]; then + echo "[info] no enabled services in target set" + exit 0 + fi + monitor_services "${effective_targets}" ;; monitor-start) - start_monitor_daemon "${targets}" + start_monitor_daemon "${effective_targets}" ;; monitor-stop) stop_monitor_daemon diff --git a/search/searcher.py b/search/searcher.py index 5d3c6fa..d3537c3 100644 --- a/search/searcher.py +++ b/search/searcher.py @@ -401,7 +401,9 @@ class Searcher: language: Response / field selection language hint (e.g. zh, en) sku_filter_dimension: SKU grouping dimensions for per-SPU variant pick enable_rerank: If None, use ``config.rerank.enabled``; if set, overrides - whether the rerank provider is invoked (subject to rerank window). + whether the final rerank provider is invoked (subject to rank window). + When false, the ranking pipeline still runs and rerank stage becomes + pass-through. rerank_query_template: Override for rerank query text template; None uses ``config.rerank.rerank_query_template`` (e.g. ``"{query}"``). 
rerank_doc_template: Override for per-hit document text passed to rerank; @@ -430,15 +432,16 @@ class Searcher: # 重排开关优先级:请求参数显式传值 > 服务端配置(默认开启) rerank_enabled_by_config = bool(rc.enabled) do_rerank = rerank_enabled_by_config if enable_rerank is None else bool(enable_rerank) + fine_enabled = bool(fine_cfg.enabled) rerank_window = rc.rerank_window coarse_input_window = max(rerank_window, int(coarse_cfg.input_window)) coarse_output_window = max(rerank_window, int(coarse_cfg.output_window)) fine_input_window = max(rerank_window, int(fine_cfg.input_window)) fine_output_window = max(rerank_window, int(fine_cfg.output_window)) - # 若开启重排且请求范围在窗口内:从 ES 取前 rerank_window 条、重排后再按 from/size 分页;否则不重排,按原 from/size 查 ES - in_rerank_window = do_rerank and (from_ + size) <= rerank_window - es_fetch_from = 0 if in_rerank_window else from_ - es_fetch_size = coarse_input_window if in_rerank_window else size + # 多阶段排序窗口独立于最终 rerank 开关:即使关闭最终 rerank,也保留 coarse/fine 流程。 + in_rank_window = (from_ + size) <= rerank_window + es_fetch_from = 0 if in_rank_window else from_ + es_fetch_size = coarse_input_window if in_rank_window else size es_score_normalization_factor: Optional[float] = None initial_ranks_by_doc: Dict[str, int] = {} @@ -455,7 +458,8 @@ class Searcher: context.logger.info( f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, " f"enable_rerank(request)={enable_rerank}, enable_rerank(config)={rerank_enabled_by_config}, " - f"enable_rerank(effective)={do_rerank}, in_rerank_window={in_rerank_window}, " + f"fine_enabled(config)={fine_enabled}, " + f"enable_rerank(effective)={do_rerank}, in_rank_window={in_rank_window}, " f"es_fetch=({es_fetch_from},{es_fetch_size}) | " f"index_languages={index_langs} | " f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}", @@ -468,8 +472,9 @@ class Searcher: 'from_': from_, 'es_fetch_from': es_fetch_from, 'es_fetch_size': es_fetch_size, - 'in_rerank_window': in_rerank_window, + 
'in_rank_window': in_rank_window, 'rerank_enabled_by_config': rerank_enabled_by_config, + 'fine_enabled': fine_enabled, 'enable_rerank_request': enable_rerank, 'rerank_query_template': effective_query_template, 'rerank_doc_template': effective_doc_template, @@ -494,6 +499,7 @@ class Searcher: context.metadata['feature_flags'] = { 'translation_enabled': enable_translation, 'embedding_enabled': enable_embedding, + 'fine_enabled': fine_enabled, 'rerank_enabled': do_rerank, 'style_intent_enabled': bool(self.style_intent_registry.enabled), } @@ -526,7 +532,7 @@ class Searcher: f"语言: {parsed_query.detected_language} | " f"关键词: {parsed_query.keywords_queries} | " f"文本向量: {'是' if parsed_query.query_vector is not None else '否'} | " - f"图片向量: {'是' if getattr(parsed_query, 'image_query_vector', None) is not None else '否'}", + f"图片向量: {'是' if parsed_query.image_query_vector is not None else '否'}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: @@ -545,17 +551,16 @@ class Searcher: # Generate tenant-specific index name index_name = get_tenant_index_name(tenant_id) # index_name = "search_products" - + # No longer need to add tenant_id to filters since each tenant has its own index + image_query_vector = None + if enable_embedding: + image_query_vector = parsed_query.image_query_vector es_query = self.query_builder.build_query( query_text=parsed_query.rewritten_query or parsed_query.query_normalized, query_vector=parsed_query.query_vector if enable_embedding else None, - image_query_vector=( - getattr(parsed_query, "image_query_vector", None) - if enable_embedding - else None - ), + image_query_vector=image_query_vector, filters=filters, range_filters=range_filters, facet_configs=facets, @@ -563,7 +568,7 @@ class Searcher: from_=es_fetch_from, enable_knn=enable_embedding and ( parsed_query.query_vector is not None - or getattr(parsed_query, "image_query_vector", None) is not None + or image_query_vector is not None ), min_score=min_score, 
parsed_query=parsed_query, @@ -587,8 +592,7 @@ class Searcher: # In multi-stage rank window, first pass only needs score signals for coarse rank. es_query_for_fetch = es_query - rerank_prefetch_source = None - if in_rerank_window: + if in_rank_window: es_query_for_fetch = dict(es_query) es_query_for_fetch["_source"] = False @@ -597,31 +601,28 @@ class Searcher: # Store ES query in context context.store_intermediate_result('es_query', es_query) - if in_rerank_window and rerank_prefetch_source is not None: - context.store_intermediate_result('es_query_rerank_prefetch_source', rerank_prefetch_source) # Serialize ES query to compute a compact size + stable digest for correlation es_query_compact = json.dumps(es_query_for_fetch, ensure_ascii=False, separators=(",", ":")) es_query_digest = hashlib.sha256(es_query_compact.encode("utf-8")).hexdigest()[:16] knn_enabled = bool(enable_embedding and ( parsed_query.query_vector is not None - or getattr(parsed_query, "image_query_vector", None) is not None + or image_query_vector is not None )) vector_dims = int(len(parsed_query.query_vector)) if parsed_query.query_vector is not None else 0 image_vector_dims = ( - int(len(parsed_query.image_query_vector)) - if getattr(parsed_query, "image_query_vector", None) is not None + int(len(image_query_vector)) + if image_query_vector is not None else 0 ) context.logger.info( - "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | image_vector_dims: %s | facets: %s | rerank_prefetch_source: %s", + "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | image_vector_dims: %s | facets: %s", len(es_query_compact), es_query_digest, "yes" if knn_enabled else "no", vector_dims, image_vector_dims, "yes" if facets else "no", - rerank_prefetch_source, extra={'reqid': context.reqid, 'uid': context.uid} ) _log_backend_verbose({ @@ -656,7 +657,7 @@ class Searcher: body=body_for_es, size=es_fetch_size, from_=es_fetch_from, - 
include_named_queries_score=bool(do_rerank and in_rerank_window), + include_named_queries_score=bool(in_rank_window), ) # Store ES response in context @@ -698,10 +699,177 @@ class Searcher: context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY) style_intent_decisions: Dict[str, SkuSelectionDecision] = {} - if do_rerank and in_rerank_window: + if in_rank_window: from dataclasses import asdict from config.services_config import get_rerank_backend_config, get_rerank_service_url from .rerank_client import coarse_resort_hits, run_lightweight_rerank, run_rerank + coarse_fusion_debug = asdict(coarse_cfg.fusion) + stage_fusion_debug = asdict(rc.fusion) + + def _rank_map(stage_hits: List[Dict[str, Any]]) -> Dict[str, int]: + return { + str(hit.get("_id")): rank + for rank, hit in enumerate(stage_hits, 1) + if hit.get("_id") is not None + } + + def _stage_debug_info( + *, + enabled: bool, + applied: bool, + skipped_reason: Optional[str], + service_profile: Optional[str], + query_template: str, + doc_template: str, + docs_in: int, + docs_out: int, + top_n: int, + meta: Optional[Dict[str, Any]] = None, + backend: Optional[str] = None, + backend_model_name: Optional[str] = None, + service_url: Optional[str] = None, + model: Optional[str] = None, + fusion: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + return { + "enabled": enabled, + "applied": applied, + "passthrough": not applied, + "skipped_reason": skipped_reason, + "service_profile": service_profile, + "service_url": service_url, + "backend": backend, + "model": model, + "backend_model_name": backend_model_name, + "query_template": query_template, + "doc_template": doc_template, + "query_text": str(query_template).format_map({"query": rerank_query}), + "docs_in": docs_in, + "docs_out": docs_out, + "top_n": top_n, + "meta": meta, + "fusion": fusion, + } + + def _run_optional_stage( + *, + stage: RequestContextStage, + stage_label: str, + enabled: bool, + stage_hits: List[Dict[str, Any]], + 
input_limit: int, + output_limit: int, + service_profile: Optional[str], + query_template: str, + doc_template: str, + top_n: int, + debug_key: Optional[str], + runner, + ) -> tuple[List[Dict[str, Any]], Dict[str, int], Optional[Dict[str, Any]]]: + context.start_stage(stage) + try: + input_hits = list(stage_hits[:input_limit]) + output_hits = list(stage_hits[:output_limit]) + applied = False + skip_reason: Optional[str] = None + meta: Optional[Dict[str, Any]] = None + debug_rows: Optional[List[Dict[str, Any]]] = None + + if enabled and input_hits: + output_hits_candidate, applied, meta, debug_rows = runner(input_hits) + if applied: + output_hits = list((output_hits_candidate or input_hits)[:output_limit]) + else: + skip_reason = "service_returned_none" + else: + skip_reason = "disabled" if not enabled else "no_hits" + + ranks = _rank_map(output_hits) if debug else {} + stage_info = None + if debug: + if applied: + backend_name, backend_cfg = get_rerank_backend_config(service_profile) + stage_info = _stage_debug_info( + enabled=True, + applied=True, + skipped_reason=None, + service_profile=service_profile, + service_url=get_rerank_service_url(profile=service_profile), + backend=backend_name, + backend_model_name=backend_cfg.get("model_name"), + model=meta.get("model") if isinstance(meta, dict) else None, + query_template=query_template, + doc_template=doc_template, + docs_in=len(input_hits), + docs_out=len(output_hits), + top_n=top_n, + meta=meta, + fusion=stage_fusion_debug, + ) + if debug_key is not None and debug_rows is not None: + context.store_intermediate_result(debug_key, debug_rows) + else: + stage_info = _stage_debug_info( + enabled=enabled, + applied=False, + skipped_reason=skip_reason, + service_profile=service_profile, + query_template=query_template, + doc_template=doc_template, + docs_in=len(input_hits), + docs_out=len(output_hits), + top_n=top_n, + fusion=stage_fusion_debug, + ) + + if applied: + context.logger.info( + "%s完成 | docs=%s | top_n=%s | 
meta=%s", + stage_label, + len(output_hits), + top_n, + meta, + extra={'reqid': context.reqid, 'uid': context.uid} + ) + else: + context.logger.info( + "%s透传 | reason=%s | docs=%s | top_n=%s", + stage_label, + skip_reason, + len(output_hits), + top_n, + extra={'reqid': context.reqid, 'uid': context.uid} + ) + return output_hits, ranks, stage_info + except Exception as e: + output_hits = list(stage_hits[:output_limit]) + ranks = _rank_map(output_hits) if debug else {} + stage_info = None + if debug: + stage_info = _stage_debug_info( + enabled=enabled, + applied=False, + skipped_reason="error", + service_profile=service_profile, + query_template=query_template, + doc_template=doc_template, + docs_in=min(len(stage_hits), input_limit), + docs_out=len(output_hits), + top_n=top_n, + meta={"error": str(e)}, + fusion=stage_fusion_debug, + ) + context.add_warning(f"{stage_label} failed: {e}") + context.logger.warning( + "调用%s服务失败 | error: %s", + stage_label, + e, + extra={'reqid': context.reqid, 'uid': context.uid}, + exc_info=True, + ) + return output_hits, ranks, stage_info + finally: + context.end_stage(stage) rerank_query = parsed_query.text_for_rerank() if parsed_query else query hits = es_response.get("hits", {}).get("hits") or [] @@ -716,17 +884,12 @@ class Searcher: hits = hits[:coarse_output_window] es_response.setdefault("hits", {})["hits"] = hits if debug: - coarse_ranks_by_doc = { - str(hit.get("_id")): rank - for rank, hit in enumerate(hits, 1) - if hit.get("_id") is not None + coarse_ranks_by_doc = _rank_map(hits) + coarse_debug_info = { + "docs_in": es_fetch_size, + "docs_out": len(hits), + "fusion": coarse_fusion_debug, } - if debug: - coarse_debug_info = { - "docs_in": es_fetch_size, - "docs_out": len(hits), - "fusion": asdict(coarse_cfg.fusion), - } context.store_intermediate_result("coarse_rank_scores", coarse_debug) context.logger.info( "粗排完成 | docs_in=%s | docs_out=%s", @@ -777,72 +940,42 @@ class Searcher: extra={'reqid': context.reqid, 'uid': 
context.uid} ) - fine_scores: Optional[List[float]] = None - hits = es_response.get("hits", {}).get("hits") or [] - if fine_cfg.enabled and hits: - context.start_stage(RequestContextStage.FINE_RANKING) - try: - fine_scores, fine_meta, fine_debug_rows = run_lightweight_rerank( - query=rerank_query, - es_hits=hits[:fine_input_window], - language=language, - timeout_sec=fine_cfg.timeout_sec, - rerank_query_template=fine_query_template, - rerank_doc_template=fine_doc_template, - top_n=fine_output_window, - debug=debug, - fusion=rc.fusion, - style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost, - service_profile=fine_cfg.service_profile, - ) - if fine_scores is not None: - hits = hits[:fine_output_window] - es_response["hits"]["hits"] = hits - if debug: - fine_ranks_by_doc = { - str(hit.get("_id")): rank - for rank, hit in enumerate(hits, 1) - if hit.get("_id") is not None - } - fine_backend_name, fine_backend_cfg = get_rerank_backend_config(fine_cfg.service_profile) - fine_debug_info = { - "service_profile": fine_cfg.service_profile, - "service_url": get_rerank_service_url(profile=fine_cfg.service_profile), - "backend": fine_backend_name, - "model": fine_meta.get("model") if isinstance(fine_meta, dict) else None, - "backend_model_name": fine_backend_cfg.get("model_name"), - "query_template": fine_query_template, - "doc_template": fine_doc_template, - "query_text": str(fine_query_template).format_map({"query": rerank_query}), - "docs_in": min(len(fine_scores), fine_input_window), - "docs_out": len(hits), - "top_n": fine_output_window, - "meta": fine_meta, - "fusion": asdict(rc.fusion), - } - context.store_intermediate_result("fine_rank_scores", fine_debug_rows) - context.logger.info( - "精排完成 | docs=%s | top_n=%s | meta=%s", - len(hits), - fine_output_window, - fine_meta, - extra={'reqid': context.reqid, 'uid': context.uid} - ) - except Exception as e: - context.add_warning(f"Fine rerank failed: {e}") - context.logger.warning( - 
f"调用精排服务失败 | error: {e}", - extra={'reqid': context.reqid, 'uid': context.uid}, - exc_info=True, - ) - finally: - context.end_stage(RequestContextStage.FINE_RANKING) + def _run_fine_stage(stage_input: List[Dict[str, Any]]): + fine_scores, fine_meta, fine_debug_rows = run_lightweight_rerank( + query=rerank_query, + es_hits=stage_input, + language=language, + timeout_sec=fine_cfg.timeout_sec, + rerank_query_template=fine_query_template, + rerank_doc_template=fine_doc_template, + top_n=fine_output_window, + debug=debug, + fusion=rc.fusion, + style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost, + service_profile=fine_cfg.service_profile, + ) + return stage_input, fine_scores is not None, fine_meta, fine_debug_rows + + hits, fine_ranks_by_doc, fine_debug_info = _run_optional_stage( + stage=RequestContextStage.FINE_RANKING, + stage_label="精排", + enabled=fine_enabled, + stage_hits=es_response.get("hits", {}).get("hits") or [], + input_limit=fine_input_window, + output_limit=fine_output_window, + service_profile=fine_cfg.service_profile, + query_template=fine_query_template, + doc_template=fine_doc_template, + top_n=fine_output_window, + debug_key="fine_rank_scores", + runner=_run_fine_stage, + ) + es_response["hits"]["hits"] = hits - context.start_stage(RequestContextStage.RERANKING) - try: - final_hits = es_response.get("hits", {}).get("hits") or [] - final_input = final_hits[:rerank_window] - es_response["hits"]["hits"] = final_input + def _run_rerank_stage(stage_input: List[Dict[str, Any]]): + nonlocal es_response + + es_response["hits"]["hits"] = stage_input es_response, rerank_meta, fused_debug = run_rerank( query=rerank_query, es_response=es_response, @@ -858,48 +991,31 @@ class Searcher: service_profile=rc.service_profile, style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost, ) - - if rerank_meta is not None: - if debug: - rerank_ranks_by_doc = { - str(hit.get("_id")): rank - for rank, hit in 
enumerate(es_response.get("hits", {}).get("hits") or [], 1) - if hit.get("_id") is not None - } - rerank_backend_name, rerank_backend_cfg = get_rerank_backend_config(rc.service_profile) - rerank_debug_info = { - "service_profile": rc.service_profile, - "service_url": get_rerank_service_url(profile=rc.service_profile), - "backend": rerank_backend_name, - "model": rerank_meta.get("model") if isinstance(rerank_meta, dict) else None, - "backend_model_name": rerank_backend_cfg.get("model_name"), - "query_template": effective_query_template, - "doc_template": effective_doc_template, - "query_text": str(effective_query_template).format_map({"query": rerank_query}), - "docs_in": len(final_input), - "docs_out": len(es_response.get("hits", {}).get("hits") or []), - "top_n": from_ + size, - "meta": rerank_meta, - "fusion": asdict(rc.fusion), - } - context.store_intermediate_result("rerank_scores", fused_debug) - context.logger.info( - f"重排完成 | docs={len(es_response.get('hits', {}).get('hits') or [])} | " - f"top_n={from_ + size} | meta={rerank_meta}", - extra={'reqid': context.reqid, 'uid': context.uid} - ) - except Exception as e: - context.add_warning(f"Rerank failed: {e}") - context.logger.warning( - f"调用重排服务失败 | error: {e}", - extra={'reqid': context.reqid, 'uid': context.uid}, - exc_info=True, + return ( + es_response.get("hits", {}).get("hits") or [], + rerank_meta is not None, + rerank_meta, + fused_debug, ) - finally: - context.end_stage(RequestContextStage.RERANKING) - # 当本次请求在重排窗口内时:已按多阶段排序产出前 rerank_window 条,需按请求的 from/size 做分页切片 - if in_rerank_window: + hits, rerank_ranks_by_doc, rerank_debug_info = _run_optional_stage( + stage=RequestContextStage.RERANKING, + stage_label="重排", + enabled=do_rerank, + stage_hits=es_response.get("hits", {}).get("hits") or [], + input_limit=rerank_window, + output_limit=rerank_window, + service_profile=rc.service_profile, + query_template=effective_query_template, + doc_template=effective_doc_template, + top_n=from_ + size, + 
debug_key="rerank_scores", + runner=_run_rerank_stage, + ) + es_response["hits"]["hits"] = hits + + # 当本次请求在排序窗口内时:已按多阶段排序产出前 rerank_window 条,需按请求的 from/size 做分页切片 + if in_rank_window: hits = es_response.get("hits", {}).get("hits") or [] sliced = hits[from_ : from_ + size] es_response.setdefault("hits", {})["hits"] = sliced @@ -961,12 +1077,12 @@ class Searcher: context.end_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) context.logger.info( - f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", + f"排序窗口分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", extra={'reqid': context.reqid, 'uid': context.uid} ) # 非重排窗口:款式意图在 result_processing 之前执行,便于单独计时且与 ES 召回阶段衔接 - if self._has_style_intent(parsed_query) and not in_rerank_window: + if self._has_style_intent(parsed_query) and not in_rank_window: es_hits_pre = es_response.get("hits", {}).get("hits") or [] style_intent_decisions = self._apply_style_intent_to_hits( es_hits_pre, @@ -1259,7 +1375,7 @@ class Searcher: # Collect debug information if requested debug_info = None if debug: - query_tokens = getattr(parsed_query, "query_tokens", []) if parsed_query else [] + query_tokens = parsed_query.query_tokens if parsed_query else [] token_count = len(query_tokens) text_knn_is_long = token_count >= 5 text_knn_k = self.query_builder.knn_text_k_long if text_knn_is_long else self.query_builder.knn_text_k @@ -1279,7 +1395,7 @@ class Searcher: "translations": context.query_analysis.translations, "keywords_queries": context.query_analysis.keywords_queries, "has_vector": context.query_analysis.query_vector is not None, - "has_image_vector": getattr(parsed_query, "image_query_vector", None) is not None, + "has_image_vector": parsed_query.image_query_vector is not None, "query_tokens": query_tokens, "intent_detection": context.get_intermediate_result("style_intent_profile"), }, @@ -1298,9 +1414,10 @@ class Searcher: }, "image_knn": { "enabled": bool( - enable_embedding + self.image_embedding_field + and enable_embedding and 
parsed_query - and getattr(parsed_query, "image_query_vector", None) is not None + and image_query_vector is not None ), "k": self.query_builder.knn_image_k, "num_candidates": self.query_builder.knn_image_num_candidates, @@ -1311,9 +1428,8 @@ class Searcher: "es_query_context": { "es_fetch_from": es_fetch_from, "es_fetch_size": es_fetch_size, - "in_rerank_window": in_rerank_window, - "rerank_prefetch_source": context.get_intermediate_result('es_query_rerank_prefetch_source'), - "include_named_queries_score": bool(do_rerank and in_rerank_window), + "in_rank_window": in_rank_window, + "include_named_queries_score": bool(in_rank_window), }, "es_response": { "took_ms": es_response.get('took', 0), @@ -1369,10 +1485,10 @@ class Searcher: "retrieval_plan": debug_info["retrieval_plan"], "ranking_windows": { "es_fetch_size": es_fetch_size, - "coarse_output_window": coarse_output_window if do_rerank and in_rerank_window else None, - "fine_input_window": fine_input_window if do_rerank and in_rerank_window else None, - "fine_output_window": fine_output_window if do_rerank and in_rerank_window else None, - "rerank_window": rerank_window if do_rerank and in_rerank_window else None, + "coarse_output_window": coarse_output_window if in_rank_window else None, + "fine_input_window": fine_input_window if in_rank_window else None, + "fine_output_window": fine_output_window if in_rank_window else None, + "rerank_window": rerank_window if in_rank_window else None, "page_from": from_, "page_size": size, }, diff --git a/tests/test_embedding_pipeline.py b/tests/test_embedding_pipeline.py index 01f587c..95dd7e0 100644 --- a/tests/test_embedding_pipeline.py +++ b/tests/test_embedding_pipeline.py @@ -1,3 +1,4 @@ +from dataclasses import asdict from typing import Any, Dict, List, Optional import numpy as np diff --git a/tests/test_search_rerank_window.py b/tests/test_search_rerank_window.py index 5002b89..c0b776b 100644 --- a/tests/test_search_rerank_window.py +++ 
b/tests/test_search_rerank_window.py @@ -1,6 +1,6 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path from types import SimpleNamespace from typing import Any, Dict, List @@ -30,7 +30,10 @@ class _FakeParsedQuery: rewritten_query: str detected_language: str = "en" translations: Dict[str, str] = None + keywords_queries: Dict[str, str] = field(default_factory=dict) query_vector: Any = None + image_query_vector: Any = None + query_tokens: List[str] = field(default_factory=list) style_intent_profile: Any = None def text_for_rerank(self) -> str: @@ -89,6 +92,15 @@ class _FakeQueryParser: class _FakeQueryBuilder: + knn_text_k = 120 + knn_text_k_long = 160 + knn_text_num_candidates = 400 + knn_text_num_candidates_long = 500 + knn_text_boost = 20.0 + knn_image_k = 120 + knn_image_num_candidates = 400 + knn_image_boost = 20.0 + def build_query(self, **kwargs): return { "query": {"match_all": {}}, @@ -583,7 +595,7 @@ def test_searcher_rerank_prefetch_source_includes_sku_fields_when_style_intent_a } -def test_searcher_skips_rerank_when_request_explicitly_false(monkeypatch): +def test_searcher_keeps_previous_stage_order_when_request_explicitly_disables_rerank(monkeypatch): es_client = _FakeESClient() searcher = _build_searcher(_build_search_config(rerank_enabled=True), es_client) context = create_request_context(reqid="t2", uid="u2") @@ -593,28 +605,95 @@ def test_searcher_skips_rerank_when_request_explicitly_false(monkeypatch): lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}), ) - called: Dict[str, int] = {"count": 0} + called: Dict[str, int] = {"count": 0, "fine": 0} + + def _fake_run_lightweight_rerank(**kwargs): + called["fine"] += 1 + hits = kwargs["es_hits"] + for idx, hit in enumerate(hits): + hit["_fine_score"] = float(idx + 1) + hits.reverse() + return [hit["_fine_score"] for hit in hits], {"stage": "fine"}, [] def _fake_run_rerank(**kwargs): 
called["count"] += 1 return kwargs["es_response"], None, [] + monkeypatch.setattr("search.rerank_client.run_lightweight_rerank", _fake_run_lightweight_rerank) monkeypatch.setattr("search.rerank_client.run_rerank", _fake_run_rerank) - searcher.search( + result = searcher.search( query="toy", tenant_id="162", from_=20, size=10, context=context, enable_rerank=False, + debug=True, ) assert called["count"] == 0 - assert es_client.calls[0]["from_"] == 20 - assert es_client.calls[0]["size"] == 10 - assert es_client.calls[0]["include_named_queries_score"] is False - assert len(es_client.calls) == 1 + assert called["fine"] == 1 + assert es_client.calls[0]["from_"] == 0 + assert es_client.calls[0]["size"] == searcher.config.coarse_rank.input_window + assert es_client.calls[0]["include_named_queries_score"] is True + assert len(es_client.calls) == 3 + assert es_client.calls[2]["body"]["query"]["ids"]["values"] == [str(i) for i in range(363, 353, -1)] + assert len(result.results) == 10 + assert [item.spu_id for item in result.results[:3]] == ["363", "362", "361"] + assert result.debug_info["rerank"]["enabled"] is False + assert result.debug_info["rerank"]["applied"] is False + assert result.debug_info["rerank"]["skipped_reason"] == "disabled" + assert result.debug_info["per_result"][0]["ranking_funnel"]["rerank"]["rank"] == 21 + + +def test_searcher_keeps_previous_stage_order_when_config_disables_rerank(monkeypatch): + es_client = _FakeESClient() + searcher = _build_searcher(_build_search_config(rerank_enabled=False), es_client) + context = create_request_context(reqid="t2b", uid="u2b") + + monkeypatch.setattr( + "search.searcher.get_tenant_config_loader", + lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}), + ) + + called: Dict[str, int] = {"count": 0, "fine": 0} + + def _fake_run_lightweight_rerank(**kwargs): + called["fine"] += 1 + hits = kwargs["es_hits"] + hits.reverse() + for idx, hit in enumerate(hits): + hit["_fine_score"] = 
float(len(hits) - idx) + return [hit["_fine_score"] for hit in hits], {"stage": "fine"}, [] + + def _fake_run_rerank(**kwargs): + called["count"] += 1 + return kwargs["es_response"], None, [] + + monkeypatch.setattr("search.rerank_client.run_lightweight_rerank", _fake_run_lightweight_rerank) + monkeypatch.setattr("search.rerank_client.run_rerank", _fake_run_rerank) + + result = searcher.search( + query="toy", + tenant_id="162", + from_=0, + size=5, + context=context, + enable_rerank=None, + debug=True, + ) + + assert called["count"] == 0 + assert called["fine"] == 1 + assert es_client.calls[0]["from_"] == 0 + assert es_client.calls[0]["size"] == searcher.config.coarse_rank.input_window + assert es_client.calls[0]["include_named_queries_score"] is True + assert len(result.results) == 5 + assert [item.spu_id for item in result.results] == ["383", "382", "381", "380", "379"] + assert result.debug_info["rerank"]["enabled"] is False + assert result.debug_info["rerank"]["applied"] is False + assert result.debug_info["rerank"]["skipped_reason"] == "disabled" def test_searcher_skips_rerank_when_page_exceeds_window(monkeypatch): @@ -919,7 +998,8 @@ def test_searcher_promotes_sku_by_embedding_when_query_has_no_direct_option_matc def test_searcher_debug_info_uses_initial_es_max_score_for_normalization(monkeypatch): es_client = _FakeESClient(total_hits=3) - searcher = _build_searcher(_build_search_config(rerank_enabled=False), es_client) + cfg = _build_search_config(rerank_enabled=False) + searcher = _build_searcher(cfg, es_client) context = create_request_context(reqid="dbg", uid="u-dbg") monkeypatch.setattr( @@ -939,7 +1019,8 @@ def test_searcher_debug_info_uses_initial_es_max_score_for_normalization(monkeyp assert result.debug_info["query_analysis"]["index_languages"] == ["en", "zh"] assert result.debug_info["query_analysis"]["query_tokens"] == [] - assert result.debug_info["es_query_context"]["es_fetch_size"] == 2 + expected_es_fetch = max(cfg.rerank.rerank_window, 
cfg.coarse_rank.input_window) + assert result.debug_info["es_query_context"]["es_fetch_size"] == expected_es_fetch assert result.debug_info["es_response"]["es_score_normalization_factor"] == 3.0 assert result.debug_info["per_result"][0]["initial_rank"] == 1 assert result.debug_info["per_result"][0]["final_rank"] == 1 @@ -970,6 +1051,12 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}), ) + fine_called: Dict[str, int] = {"count": 0} + + def _fake_run_lightweight_rerank(**kwargs): + fine_called["count"] += 1 + return [], {"stage": "fine"}, [] + def _fake_run_rerank(**kwargs): hits = kwargs["es_response"]["hits"]["hits"] hits.reverse() @@ -994,6 +1081,7 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable ) return kwargs["es_response"], {"model": "final-reranker"}, fused_debug + monkeypatch.setattr("search.rerank_client.run_lightweight_rerank", _fake_run_lightweight_rerank) monkeypatch.setattr("search.rerank_client.run_rerank", _fake_run_rerank) result = searcher.search( @@ -1008,7 +1096,12 @@ def test_searcher_rerank_rank_change_falls_back_to_coarse_rank_when_fine_disable per_result = {row["spu_id"]: row for row in result.debug_info["per_result"]} moved = per_result["4"]["ranking_funnel"] - assert moved["fine_rank"]["rank"] is None + assert fine_called["count"] == 0 + assert result.debug_info["fine_rank"]["enabled"] is False + assert result.debug_info["fine_rank"]["applied"] is False + assert result.debug_info["fine_rank"]["skipped_reason"] == "disabled" + assert moved["fine_rank"]["rank"] == 5 + assert moved["fine_rank"]["rank_change"] == 0 assert moved["rerank"]["rank"] == 1 assert moved["rerank"]["rank_change"] == 4 assert moved["final_page"]["rank_change"] == 0 -- libgit2 0.21.2