"""
Rerank client: calls the external BGE rerank service and fuses ES scores with
rerank scores.

Flow:
1. Build the list of document texts to rerank from the ES hits.
2. POST to the rerank service ``/rerank`` to obtain one relevance score per
   document.
3. Extract the ES text / vector sub-clause scores, fuse them multiplicatively
   with the rerank scores and re-sort the hits.
"""

from typing import Dict, Any, List, Optional, Tuple
import logging
import math

from config.schema import CoarseRankFusionConfig, RerankFusionConfig
from providers import create_rerank_provider

logger = logging.getLogger(__name__)

# Legacy settings, kept only for signature compatibility; the multiplicative
# fusion formula no longer uses linear weights.
DEFAULT_WEIGHT_ES = 0.4
DEFAULT_WEIGHT_AI = 0.6
# Default rerank-service timeout (needs to be larger when many documents are
# sent; raise ``timeout_sec`` in the config when necessary).
DEFAULT_TIMEOUT_SEC = 15.0
# Weight of the weaker text signal in the dis-max style text score.
DEFAULT_TEXT_TIE_BREAKER = 0.25
# Debug previews of rerank documents are truncated to this many characters.
_DOC_PREVIEW_CHARS = 300
# Named queries whose name starts with this prefix count as translated-query clauses.
_TRANSLATION_QUERY_PREFIX = "base_query_trans_"


class _SafeDict(dict):
    """Mapping for ``str.format_map`` that renders unknown placeholders as ""."""

    def __missing__(self, key: str) -> str:
        return ""


def _render_query_template(template: Any, query: str) -> str:
    """Render the rerank query template; only ``{query}`` is populated.

    Uses the same lenient mapping as the document template so that an unknown
    placeholder renders as an empty string instead of raising ``KeyError``.
    """
    return str(template).format_map(_SafeDict(query=query))


def build_docs_from_hits(
    es_hits: List[Dict[str, Any]],
    language: str = "zh",
    doc_template: str = "{title}",
    debug_rows: Optional[List[Dict[str, Any]]] = None,
) -> List[str]:
    """
    Build the rerank-service document texts from ES hits (one per hit, same order).

    ``doc_template`` is rendered per hit with ``str.format_map``. Supported
    placeholders: {title} {brief} {vendor} {description} {category_path};
    any other placeholder renders as an empty string.

    Args:
        es_hits: ES hits; fields are read from each hit's ``_source``. A
            non-empty ``_style_rerank_suffix`` on the hit is appended to the
            title.
        language: Language code, "zh" or "en"; anything else falls back to "zh".
        doc_template: Template for the document text. The exact value
            "{title}" short-circuits to the title alone.
        debug_rows: When not None, one debug dict per hit is appended to it.

    Returns:
        A list of strings with the same length as ``es_hits``, used as the
        ``docs`` of POST /rerank.
    """
    lang = (language or "zh").strip().lower()
    if lang not in ("zh", "en"):
        lang = "zh"

    def pick_lang_text(obj: Any) -> str:
        # Dict values are looked up by language code (requested language,
        # then "zh", then "en"); any other value is stringified.
        if obj is None:
            return ""
        if isinstance(obj, dict):
            return str(obj.get(lang) or obj.get("zh") or obj.get("en") or "").strip()
        return str(obj).strip()

    only_title = doc_template == "{title}"
    # Only resolve the fields the template actually references.
    need_brief = "{brief}" in doc_template
    need_vendor = "{vendor}" in doc_template
    need_description = "{description}" in doc_template
    need_category_path = "{category_path}" in doc_template

    docs: List[str] = []
    for hit in es_hits:
        src = hit.get("_source") or {}
        title_suffix = str(hit.get("_style_rerank_suffix") or "").strip()
        title_str = pick_lang_text(src.get("title"))
        if title_suffix:
            title_str = f"{title_str} {title_suffix}".strip()

        values: Optional[_SafeDict] = None
        if only_title:
            doc_text = title_str
        else:
            values = _SafeDict(
                title=title_str,
                brief=pick_lang_text(src.get("brief")) if need_brief else "",
                vendor=pick_lang_text(src.get("vendor")) if need_vendor else "",
                description=pick_lang_text(src.get("description")) if need_description else "",
                category_path=pick_lang_text(src.get("category_path")) if need_category_path else "",
            )
            doc_text = str(doc_template).format_map(values)

        if debug_rows is not None:
            fields: Dict[str, Any] = {"title": title_str}
            if values is not None:
                fields["brief"] = values.get("brief") or None
                fields["vendor"] = values.get("vendor") or None
                fields["category_path"] = values.get("category_path") or None
            if len(doc_text) <= _DOC_PREVIEW_CHARS:
                preview = doc_text
            else:
                preview = f"{doc_text[:_DOC_PREVIEW_CHARS]}..."
            debug_rows.append({
                "doc_template": doc_template,
                "title_suffix": title_suffix or None,
                "fields": fields,
                "doc_preview": preview,
                "doc_length": len(doc_text),
            })

        docs.append(doc_text)
    return docs


def call_rerank_service(
    query: str,
    docs: List[str],
    timeout_sec: float = DEFAULT_TIMEOUT_SEC,
    top_n: Optional[int] = None,
    service_profile: Optional[str] = None,
) -> Tuple[Optional[List[float]], Optional[Dict[str, Any]]]:
    """
    Call the rerank service (POST /rerank) and return ``(scores, meta)``.

    The provider is obtained from ``create_rerank_provider(service_profile=...)``
    (the original notes say provider and URL come from services_config).

    Returns:
        ``([], {})`` when ``docs`` is empty, ``(None, None)`` when the call
        raised (the failure is logged, not propagated), otherwise whatever the
        provider's ``rerank`` returns.
    """
    if not docs:
        return [], {}
    try:
        client = create_rerank_provider(service_profile=service_profile)
        return client.rerank(query=query, docs=docs, timeout_sec=timeout_sec, top_n=top_n)
    except Exception as e:
        # Deliberate best-effort boundary: callers fall back to the ES order.
        logger.warning("Rerank request failed: %s", e, exc_info=True)
        return None, None


def _to_score(value: Any) -> float:
    """Coerce ``value`` to a float score; None, unparsable and NaN become 0.0."""
    if value is None:
        return 0.0
    try:
        score = float(value)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    # NaN would propagate through the multiplicative fusion and make the
    # final sort order arbitrary, so treat it as "no signal".
    return 0.0 if math.isnan(score) else score


def _extract_named_query_score(matched_queries: Any, name: str) -> float:
    """Return the score of named query ``name`` from a hit's ``matched_queries``.

    A dict is read as name -> score; a list only carries names, so a listed
    name counts as 1.0. Anything else yields 0.0.
    """
    if isinstance(matched_queries, dict):
        return _to_score(matched_queries.get(name))
    if isinstance(matched_queries, list):
        return 1.0 if name in matched_queries else 0.0
    return 0.0


def _first_positive_named_score(
    matched_queries: Any,
    names: List[str],
) -> Tuple[float, Optional[str]]:
    """Return ``(score, name)`` of the first name in ``names`` with a positive score.

    When none is positive, the first name and its (non-positive) score are
    returned; with no names at all the result is ``(0.0, None)``.
    """
    for name in names:
        score = _extract_named_query_score(matched_queries, name)
        if score > 0.0:
            return score, name
    if names:
        return _extract_named_query_score(matched_queries, names[0]), names[0]
    return 0.0, None


def _resolve_named_query_score(
    matched_queries: Any,
    *,
    preferred_names: List[str],
    fallback_names: List[str],
) -> Tuple[float, Optional[str], float, Optional[str]]:
    """Pick a named-query score, preferring ``preferred_names`` over ``fallback_names``.

    Returns:
        ``(chosen_score, chosen_name, other_score, other_name)``. The preferred
        group is chosen when it has a positive score; otherwise the fallback
        group is chosen and the preferred group is reported as "other".
    """
    preferred_score, preferred_name = _first_positive_named_score(matched_queries, preferred_names)
    fallback_score, fallback_name = _first_positive_named_score(matched_queries, fallback_names)
    if preferred_score > 0.0:
        return preferred_score, preferred_name, fallback_score, fallback_name
    return fallback_score, fallback_name, preferred_score, preferred_name


def _dismax(components: List[float], tie_breaker: float) -> Tuple[float, float, float]:
    """Return ``(primary, support, combined)`` for a dis-max style combination.

    ``primary`` is the largest component, ``support`` is the sum of all
    components minus ``primary`` and ``combined = primary + tie_breaker * support``.
    """
    primary = max(components)
    support = sum(components) - primary
    return primary, support, primary + float(tie_breaker) * support


def _collect_knn_score_components(
    matched_queries: Any,
    fusion: CoarseRankFusionConfig | RerankFusionConfig,
) -> Dict[str, Any]:
    """Collect text/image KNN named-query scores and their dis-max combination.

    For each modality the exact named query (``exact_text_knn_query`` /
    ``exact_image_knn_query``) is preferred and the approximate one
    (``knn_query`` / ``image_knn_query``) is the fallback. The chosen scores
    are weighted by ``fusion.knn_text_weight`` / ``fusion.knn_image_weight``
    and combined as ``primary + fusion.knn_tie_breaker * support``.

    Returns:
        A dict with the raw, weighted and combined scores plus the names of
        the named queries the text/image scores were taken from.
    """
    text_knn_score, text_knn_source, _, _ = _resolve_named_query_score(
        matched_queries,
        preferred_names=["exact_text_knn_query"],
        fallback_names=["knn_query"],
    )
    image_knn_score, image_knn_source, _, _ = _resolve_named_query_score(
        matched_queries,
        preferred_names=["exact_image_knn_query"],
        fallback_names=["image_knn_query"],
    )

    weighted_text_knn_score = text_knn_score * float(fusion.knn_text_weight)
    weighted_image_knn_score = image_knn_score * float(fusion.knn_image_weight)
    primary_knn_score, support_knn_score, knn_score = _dismax(
        [weighted_text_knn_score, weighted_image_knn_score],
        fusion.knn_tie_breaker,
    )

    return {
        "text_knn_score": text_knn_score,
        "image_knn_score": image_knn_score,
        "exact_text_knn_score": _extract_named_query_score(matched_queries, "exact_text_knn_query"),
        "exact_image_knn_score": _extract_named_query_score(matched_queries, "exact_image_knn_query"),
        "approx_text_knn_score": _extract_named_query_score(matched_queries, "knn_query"),
        "approx_image_knn_score": _extract_named_query_score(matched_queries, "image_knn_query"),
        "text_knn_source": text_knn_source,
        "image_knn_source": image_knn_source,
        "approx_text_knn_source": "knn_query",
        "approx_image_knn_source": "image_knn_query",
        "weighted_text_knn_score": weighted_text_knn_score,
        "weighted_image_knn_score": weighted_image_knn_score,
        "primary_knn_score": primary_knn_score,
        "support_knn_score": support_knn_score,
        "knn_score": knn_score,
    }


def _collect_text_score_components(
    matched_queries: Any,
    fallback_es_score: float,
    *,
    translation_weight: float,
    tie_breaker: float = DEFAULT_TEXT_TIE_BREAKER,
) -> Dict[str, float]:
    """Derive the text relevance score of a hit from its named-query scores.

    Raw inputs:
        source_score: score of the ``base_query`` named query (dict form uses
            the actual score; in list form a listed name counts as 1.0).
        translation_score: for named queries whose name starts with
            ``base_query_trans_``: the maximum score in dict form, or 1.0 in
            list form when any such name is present.

    Intermediate values:
        weighted_source = source_score
        weighted_translation = translation_weight * translation_score

    The stronger of the two (original-query retrieval vs. translated-query
    retrieval) is the primary signal and the other the supporting signal:
        primary_text_score = max(weighted_source, weighted_translation)
        support_text_score = weighted_source + weighted_translation - primary_text_score
        text_score = primary_text_score + tie_breaker * support_text_score

    When the resulting ``text_score`` is not positive, ``fallback_es_score``
    is used as text score, weighted source score and primary score instead.

    Args:
        matched_queries: The hit's ``matched_queries`` (dict, list or other).
        fallback_es_score: ES total score used when no text signal is found.
        translation_weight: Weight applied to ``translation_score``.
        tie_breaker: Weight of the supporting signal (default 0.25).
    """
    source_score = _extract_named_query_score(matched_queries, "base_query")

    translation_score = 0.0
    if isinstance(matched_queries, dict):
        translation_score = max(
            0.0,
            max(
                (
                    _to_score(score)
                    for query_name, score in matched_queries.items()
                    if isinstance(query_name, str)
                    and query_name.startswith(_TRANSLATION_QUERY_PREFIX)
                ),
                default=0.0,
            ),
        )
    elif isinstance(matched_queries, list):
        has_translation = any(
            isinstance(query_name, str) and query_name.startswith(_TRANSLATION_QUERY_PREFIX)
            for query_name in matched_queries
        )
        translation_score = 1.0 if has_translation else 0.0

    weighted_source = source_score
    weighted_translation = float(translation_weight) * translation_score
    primary_text_score, support_text_score, text_score = _dismax(
        [weighted_source, weighted_translation],
        tie_breaker,
    )

    if text_score <= 0.0:
        # No usable named-query signal: fall back to the ES total score.
        text_score = fallback_es_score
        weighted_source = fallback_es_score
        primary_text_score = fallback_es_score
        support_text_score = 0.0

    return {
        "source_score": source_score,
        "translation_score": translation_score,
        "weighted_source_score": weighted_source,
        "weighted_translation_score": weighted_translation,
        "primary_text_score": primary_text_score,
        "support_text_score": support_text_score,
        "text_score": text_score,
    }


def _format_debug_float(value: float) -> str:
    """Format a number with up to six significant digits for debug summaries."""
    return f"{float(value):.6g}"


def _is_text_fallback_to_es(
    text_score: float,
    es_score: float,
    text_components: Dict[str, float],
) -> bool:
    """Return True when the text score equals the ES score and no text named query matched."""
    return (
        text_score == es_score
        and text_components["source_score"] <= 0.0
        and text_components["translation_score"] <= 0.0
    )


def _build_hit_signal_bundle(
    hit: Dict[str, Any],
    fusion: CoarseRankFusionConfig | RerankFusionConfig,
) -> Dict[str, Any]:
    """Extract the ES, text and KNN signals of one hit.

    The ES score is read from ``_raw_es_score``, else ``_original_score``,
    else ``_score``, and is written back to ``hit["_raw_es_score"]`` so later
    calls reuse the same value.
    """
    raw_es_score = _to_score(hit.get("_raw_es_score", hit.get("_original_score", hit.get("_score"))))
    hit["_raw_es_score"] = raw_es_score
    matched_queries = hit.get("matched_queries")
    text_components = _collect_text_score_components(
        matched_queries,
        raw_es_score,
        translation_weight=fusion.text_translation_weight,
    )
    knn_components = _collect_knn_score_components(matched_queries, fusion)
    return {
        "doc_id": hit.get("_id"),
        "es_score": raw_es_score,
        "matched_queries": matched_queries,
        "text_components": text_components,
        "knn_components": knn_components,
        "text_score": text_components["text_score"],
        "knn_score": knn_components["knn_score"],
    }


def _store_signal_scores(hit: Dict[str, Any], signal_bundle: Dict[str, Any]) -> None:
    """Write the text/KNN scores of ``signal_bundle`` onto ``hit`` (in place)."""
    knn_components = signal_bundle["knn_components"]
    hit["_text_score"] = signal_bundle["text_score"]
    hit["_knn_score"] = signal_bundle["knn_score"]
    hit["_text_knn_score"] = knn_components["text_knn_score"]
    hit["_image_knn_score"] = knn_components["image_knn_score"]
    hit["_exact_text_knn_score"] = knn_components["exact_text_knn_score"]
    hit["_exact_image_knn_score"] = knn_components["exact_image_knn_score"]


def _text_debug_fields(text_components: Dict[str, float]) -> Dict[str, Any]:
    """Return the text component entries shared by the debug rows."""
    return {
        "text_source_score": text_components["source_score"],
        "text_translation_score": text_components["translation_score"],
        "text_weighted_source_score": text_components["weighted_source_score"],
        "text_weighted_translation_score": text_components["weighted_translation_score"],
        "text_primary_score": text_components["primary_text_score"],
        "text_support_score": text_components["support_text_score"],
    }


def _knn_debug_fields(knn_components: Dict[str, Any]) -> Dict[str, Any]:
    """Return the KNN component entries shared by the debug rows."""
    return {
        "text_knn_score": knn_components["text_knn_score"],
        "image_knn_score": knn_components["image_knn_score"],
        "exact_text_knn_score": knn_components["exact_text_knn_score"],
        "exact_image_knn_score": knn_components["exact_image_knn_score"],
        "approx_text_knn_score": knn_components["approx_text_knn_score"],
        "approx_image_knn_score": knn_components["approx_image_knn_score"],
        "text_knn_source": knn_components["text_knn_source"],
        "image_knn_source": knn_components["image_knn_source"],
        "weighted_text_knn_score": knn_components["weighted_text_knn_score"],
        "weighted_image_knn_score": knn_components["weighted_image_knn_score"],
        "knn_primary_score": knn_components["primary_knn_score"],
        "knn_support_score": knn_components["support_knn_score"],
    }


def _build_formula_summary(
    term_rows: List[Dict[str, Any]],
    style_boost: float,
    final_score: float,
) -> str:
    """Render the fusion terms as ``name=(raw+bias)^exponent=factor | ... | final=...``.

    The ``style_boost`` segment is only included when the boost differs from 1.0.
    """
    segments = [
        (
            f"{row['name']}=("
            f"{_format_debug_float(row['raw_score'])}"
            f"+{_format_debug_float(row['bias'])})"
            f"^{_format_debug_float(row['exponent'])}"
            f"={_format_debug_float(row['factor'])}"
        )
        for row in term_rows
    ]
    if style_boost != 1.0:
        segments.append(f"style_boost={_format_debug_float(style_boost)}")
    segments.append(f"final={_format_debug_float(final_score)}")
    return " | ".join(segments)


def _build_ltr_feature_block(
    *,
    signal_bundle: Dict[str, Any],
    text_components: Dict[str, float],
    knn_components: Dict[str, float],
    rerank_score: Optional[float] = None,
    fine_score: Optional[float] = None,
    style_boost: float = 1.0,
    stage_score: Optional[float] = None,
) -> Dict[str, Any]:
    """Flatten one hit's signals into a feature dict for the debug output.

    Optional scores (``rerank_score``, ``fine_score``, ``stage_score``) are
    reported as None when not supplied; all other numbers are cast to float.
    """
    es_score = float(signal_bundle["es_score"])
    text_score = float(signal_bundle["text_score"])
    knn_score = float(signal_bundle["knn_score"])
    source_score = float(text_components["source_score"])
    translation_score = float(text_components["translation_score"])
    text_knn_score = float(knn_components["text_knn_score"])
    image_knn_score = float(knn_components["image_knn_score"])
    return {
        "es_score": es_score,
        "text_score": text_score,
        "knn_score": knn_score,
        "rerank_score": None if rerank_score is None else float(rerank_score),
        "fine_score": None if fine_score is None else float(fine_score),
        "source_score": source_score,
        "translation_score": translation_score,
        "text_primary_score": float(text_components["primary_text_score"]),
        "text_support_score": float(text_components["support_text_score"]),
        "text_knn_score": text_knn_score,
        "image_knn_score": image_knn_score,
        "exact_text_knn_score": float(knn_components["exact_text_knn_score"]),
        "exact_image_knn_score": float(knn_components["exact_image_knn_score"]),
        "approx_text_knn_score": float(knn_components["approx_text_knn_score"]),
        "approx_image_knn_score": float(knn_components["approx_image_knn_score"]),
        "knn_primary_score": float(knn_components["primary_knn_score"]),
        "knn_support_score": float(knn_components["support_knn_score"]),
        "has_text_match": source_score > 0.0,
        "has_translation_match": translation_score > 0.0,
        "has_text_knn": text_knn_score > 0.0,
        "has_image_knn": image_knn_score > 0.0,
        "text_score_fallback_to_es": _is_text_fallback_to_es(text_score, es_score, text_components),
        "style_boost": float(style_boost),
        "has_style_boost": float(style_boost) > 1.0,
        "stage_score": None if stage_score is None else float(stage_score),
    }


def _make_term_row(name: str, raw_score: float, bias: Any, exponent: Any) -> Dict[str, Any]:
    """Build one fusion term: ``factor = (max(raw_score, 0) + bias) ** exponent``."""
    return {
        "name": name,
        "raw_score": float(raw_score),
        "bias": float(bias),
        "exponent": float(exponent),
        "factor": (max(float(raw_score), 0.0) + bias) ** exponent,
    }


def _maybe_append_weighted_knn_terms(
    *,
    term_rows: List[Dict[str, Any]],
    fusion: CoarseRankFusionConfig | RerankFusionConfig,
    knn_components: Optional[Dict[str, Any]],
) -> None:
    """Append per-modality weighted KNN terms to ``term_rows`` when enabled.

    A term is added only when the config has a non-zero ``knn_text_exponent``
    / ``knn_image_exponent``; its bias is ``knn_text_bias`` / ``knn_image_bias``
    when the config defines it, otherwise ``fusion.knn_bias``.
    """
    if not knn_components:
        return

    for term_name, exponent_attr, bias_attr in (
        ("weighted_text_knn_score", "knn_text_exponent", "knn_text_bias"),
        ("weighted_image_knn_score", "knn_image_exponent", "knn_image_bias"),
    ):
        exponent = float(getattr(fusion, exponent_attr, 0.0))
        if exponent == 0.0:
            continue
        bias = float(getattr(fusion, bias_attr, fusion.knn_bias))
        raw_score = _to_score(knn_components.get(term_name))
        term_rows.append(_make_term_row(term_name, raw_score, bias, exponent))


def _compute_multiplicative_fusion(
    *,
    es_score: float,
    text_score: float,
    knn_score: float,
    fusion: CoarseRankFusionConfig | RerankFusionConfig,
    knn_components: Optional[Dict[str, Any]] = None,
    rerank_score: Optional[float] = None,
    fine_score: Optional[float] = None,
    style_boost: float = 1.0,
) -> Dict[str, Any]:
    """Fuse the signals as a product of ``(max(score, 0) + bias) ** exponent`` terms.

    Terms, in order: es, rerank and fine (only for ``RerankFusionConfig`` and
    only when the score is not None), text, knn, then the optional weighted
    text/image KNN terms. The product is finally multiplied by ``style_boost``.

    Returns:
        A dict with ``inputs`` (raw score per term), ``factors`` (factor per
        term plus ``style_boost``), ``score`` (the fused value) and
        ``summary`` (human-readable formula).
    """
    term_rows: List[Dict[str, Any]] = []

    def _add_term(name: str, raw_score: Optional[float], bias: float, exponent: float) -> None:
        # A missing score means the signal does not take part in the product.
        if raw_score is None:
            return
        term_rows.append(_make_term_row(name, raw_score, bias, exponent))

    _add_term("es_score", es_score, fusion.es_bias, fusion.es_exponent)
    if isinstance(fusion, RerankFusionConfig):
        _add_term("rerank_score", rerank_score, fusion.rerank_bias, fusion.rerank_exponent)
        _add_term("fine_score", fine_score, fusion.fine_bias, fusion.fine_exponent)
    _add_term("text_score", text_score, fusion.text_bias, fusion.text_exponent)
    _add_term("knn_score", knn_score, fusion.knn_bias, fusion.knn_exponent)
    _maybe_append_weighted_knn_terms(term_rows=term_rows, fusion=fusion, knn_components=knn_components)

    fused = 1.0
    factors: Dict[str, float] = {}
    inputs: Dict[str, float] = {}
    for row in term_rows:
        fused *= row["factor"]
        factors[row["name"]] = row["factor"]
        inputs[row["name"]] = row["raw_score"]
    fused *= style_boost
    factors["style_boost"] = style_boost

    return {
        "inputs": inputs,
        "factors": factors,
        "score": fused,
        "summary": _build_formula_summary(term_rows, style_boost, fused),
    }


def _has_selected_sku(hit: Dict[str, Any]) -> bool:
    """Return True when the hit carries a non-blank ``_style_rerank_suffix``."""
    return bool(str(hit.get("_style_rerank_suffix") or "").strip())


def coarse_resort_hits(
    es_hits: List[Dict[str, Any]],
    fusion: Optional[CoarseRankFusionConfig] = None,
    debug: bool = False,
) -> List[Dict[str, Any]]:
    """Coarse rank with es/text/knn multiplicative fusion.

    Each hit gets ``_coarse_score`` plus the text/KNN score fields written in
    place, and ``es_hits`` is sorted in place by ``_coarse_score`` descending.

    Returns:
        One debug row per hit (in the pre-sort order) when ``debug`` is true,
        otherwise an empty list.
    """
    if not es_hits:
        return []

    f = fusion or CoarseRankFusionConfig()
    coarse_debug: List[Dict[str, Any]] = []
    for hit in es_hits:
        signal_bundle = _build_hit_signal_bundle(hit, f)
        es_score = signal_bundle["es_score"]
        text_components = signal_bundle["text_components"]
        knn_components = signal_bundle["knn_components"]
        text_score = signal_bundle["text_score"]
        knn_score = signal_bundle["knn_score"]
        fusion_result = _compute_multiplicative_fusion(
            es_score=es_score,
            text_score=text_score,
            knn_score=knn_score,
            fusion=f,
            knn_components=knn_components,
        )
        coarse_score = fusion_result["score"]

        _store_signal_scores(hit, signal_bundle)
        hit["_coarse_score"] = coarse_score

        if debug:
            factors = fusion_result["factors"]
            ltr_features = _build_ltr_feature_block(
                signal_bundle=signal_bundle,
                text_components=text_components,
                knn_components=knn_components,
                stage_score=coarse_score,
            )
            coarse_debug.append(
                {
                    "doc_id": hit.get("_id"),
                    "es_score": es_score,
                    "text_score": text_score,
                    **_text_debug_fields(text_components),
                    "text_score_fallback_to_es": _is_text_fallback_to_es(
                        text_score, es_score, text_components
                    ),
                    **_knn_debug_fields(knn_components),
                    "knn_score": knn_score,
                    "fusion_inputs": fusion_result["inputs"],
                    "fusion_factors": factors,
                    "fusion_summary": fusion_result["summary"],
                    "coarse_es_factor": factors.get("es_score"),
                    "coarse_text_factor": factors.get("text_score"),
                    "coarse_knn_factor": factors.get("knn_score"),
                    "coarse_text_knn_factor": factors.get("weighted_text_knn_score"),
                    "coarse_image_knn_factor": factors.get("weighted_image_knn_score"),
                    "coarse_score": coarse_score,
                    "matched_queries": signal_bundle["matched_queries"],
                    "ltr_features": ltr_features,
                }
            )

    es_hits.sort(key=lambda h: h.get("_coarse_score", h.get("_score", 0.0)), reverse=True)
    return coarse_debug


def fuse_scores_and_resort(
    es_hits: List[Dict[str, Any]],
    rerank_scores: List[float],
    fine_scores: Optional[List[float]] = None,
    weight_es: float = DEFAULT_WEIGHT_ES,
    weight_ai: float = DEFAULT_WEIGHT_AI,
    fusion: Optional[RerankFusionConfig] = None,
    style_intent_selected_sku_boost: float = 1.2,
    debug: bool = False,
    rerank_debug_rows: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """
    Fuse ES scores with rerank scores multiplicatively (the original ``_score``
    is not modified) and sort the hits by the fused score, descending.

    Fusion form (bias / exponent come from ``fusion``)::

        fused = (max(es,0)+b_es)^e_es
              * (max(rerank,0)+b_r)^e_r
              * (max(fine,0)+b_f)^e_f
              * (max(text,0)+b_t)^e_t
              * (max(knn,0)+b_k)^e_k
              * sku_boost

    ``sku_boost`` only applies to hits that have a selected SKU (a non-blank
    ``_style_rerank_suffix``); it defaults to 1.2 (the original notes say it
    is configured via ``query.style_intent.selected_sku_boost``).

    Written onto every hit:
    - _original_score: the hit's ``_score`` at the time of the call
    - _raw_es_score: raw ES total score (later stages reuse it instead of
      ``_score``, which may have been rewritten)
    - _rerank_score: score returned by the rerank service
    - _fine_score: fine score, when one is available for the hit
    - _fused_score: fused score
    - _text_score: text relevance score (prefers the ``base_query`` named query)
    - _knn_score: KNN score (prefers exact named queries, falls back to the
      approximate ones)

    Args:
        es_hits: ES hits (modified and sorted in place).
        rerank_scores: Rerank scores, same length and order as ``es_hits``.
        fine_scores: Optional fine scores; used only when the length matches
            ``es_hits``, otherwise a ``_fine_score`` already on the hit is used.
        weight_es: Kept for compatibility, currently unused.
        weight_ai: Kept for compatibility, currently unused.
        fusion: Fusion config; defaults to ``RerankFusionConfig()``.
        style_intent_selected_sku_boost: Boost for hits with a selected SKU.
        debug: When true, one debug row per hit is returned.
        rerank_debug_rows: Optional per-hit rerank input rows that are attached
            to the debug rows as ``rerank_input``.

    Returns:
        Debug rows (pre-sort order) when ``debug`` is true, else an empty list.
        Also an empty list, without touching the hits, when ``es_hits`` is
        empty or the number of rerank scores does not match.
    """
    n = len(es_hits)
    if n == 0:
        return []
    if len(rerank_scores) != n:
        logger.warning(
            "Skip score fusion: got %d rerank scores for %d hits",
            len(rerank_scores),
            n,
        )
        return []

    f = fusion or RerankFusionConfig()
    # Loop-invariant: the explicit list is only trusted when it lines up with the hits.
    use_fine_list = fine_scores is not None and len(fine_scores) == n
    fused_debug: List[Dict[str, Any]] = []

    for idx, hit in enumerate(es_hits):
        signal_bundle = _build_hit_signal_bundle(hit, f)
        es_score = signal_bundle["es_score"]
        text_components = signal_bundle["text_components"]
        knn_components = signal_bundle["knn_components"]
        text_score = signal_bundle["text_score"]
        knn_score = signal_bundle["knn_score"]
        rerank_score = _to_score(rerank_scores[idx])

        fine_score: Optional[float]
        if use_fine_list:
            fine_score = _to_score(fine_scores[idx])
        elif "_fine_score" in hit:
            fine_score = _to_score(hit.get("_fine_score"))
        else:
            fine_score = None

        sku_selected = _has_selected_sku(hit)
        style_boost = style_intent_selected_sku_boost if sku_selected else 1.0
        fusion_result = _compute_multiplicative_fusion(
            es_score=es_score,
            rerank_score=rerank_score,
            fine_score=fine_score,
            text_score=text_score,
            knn_score=knn_score,
            fusion=f,
            knn_components=knn_components,
            style_boost=style_boost,
        )
        fused = fusion_result["score"]

        hit["_original_score"] = hit.get("_score")
        hit["_rerank_score"] = rerank_score
        if fine_score is not None:
            hit["_fine_score"] = fine_score
        _store_signal_scores(hit, signal_bundle)
        hit["_fused_score"] = fused
        hit["_style_intent_selected_sku_boost"] = style_boost

        if debug:
            factors = fusion_result["factors"]
            ltr_features = _build_ltr_feature_block(
                signal_bundle=signal_bundle,
                text_components=text_components,
                knn_components=knn_components,
                rerank_score=rerank_score,
                fine_score=fine_score,
                style_boost=style_boost,
                stage_score=fused,
            )
            debug_entry = {
                "doc_id": hit.get("_id"),
                "score": fused,
                "es_score": es_score,
                "rerank_score": rerank_score,
                "fine_score": fine_score,
                "text_score": text_score,
                "knn_score": knn_score,
                "fusion_inputs": fusion_result["inputs"],
                "fusion_factors": factors,
                "fusion_summary": fusion_result["summary"],
                **_text_debug_fields(text_components),
                **_knn_debug_fields(knn_components),
                "text_score_fallback_to_es": _is_text_fallback_to_es(
                    text_score, es_score, text_components
                ),
                "rerank_factor": factors.get("rerank_score"),
                "fine_factor": factors.get("fine_score"),
                "es_factor": factors.get("es_score"),
                "text_factor": factors.get("text_score"),
                "knn_factor": factors.get("knn_score"),
                "text_knn_factor": factors.get("weighted_text_knn_score"),
                "image_knn_factor": factors.get("weighted_image_knn_score"),
                "style_intent_selected_sku": sku_selected,
                "style_intent_selected_sku_boost": style_boost,
                "matched_queries": signal_bundle["matched_queries"],
                "fused_score": fused,
                "ltr_features": ltr_features,
            }
            if rerank_debug_rows is not None and idx < len(rerank_debug_rows):
                debug_entry["rerank_input"] = rerank_debug_rows[idx]
            fused_debug.append(debug_entry)

    es_hits.sort(
        key=lambda h: h.get("_fused_score", h.get("_score", 0.0)),
        reverse=True,
    )
    return fused_debug


def run_rerank(
    query: str,
    es_response: Dict[str, Any],
    language: str = "zh",
    timeout_sec: float = DEFAULT_TIMEOUT_SEC,
    weight_es: float = DEFAULT_WEIGHT_ES,
    weight_ai: float = DEFAULT_WEIGHT_AI,
    rerank_query_template: str = "{query}",
    rerank_doc_template: str = "{title}",
    top_n: Optional[int] = None,
    debug: bool = False,
    fusion: Optional[RerankFusionConfig] = None,
    style_intent_selected_sku_boost: float = 1.2,
    fine_scores: Optional[List[float]] = None,
    service_profile: Optional[str] = None,
) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Full rerank flow: take the hits from ``es_response`` -> build docs -> call
    the service -> fuse scores and re-sort -> update ``max_score``.

    ``top_n`` is optional; when given it is passed through to the rerank call
    (the original notes say cloud backends use it for partial reranking by
    page + size).

    Returns:
        ``(es_response, meta, debug_rows)``. When there are no hits, the
        service call failed, or the number of scores does not match the hits,
        the response is returned unchanged with ``meta=None`` and no debug rows.
    """
    hits = (es_response.get("hits") or {}).get("hits") or []
    if not hits:
        return es_response, None, []

    query_text = _render_query_template(rerank_query_template, query)
    rerank_debug_rows: Optional[List[Dict[str, Any]]] = [] if debug else None
    docs = build_docs_from_hits(
        hits,
        language=language,
        doc_template=rerank_doc_template,
        debug_rows=rerank_debug_rows,
    )
    scores, meta = call_rerank_service(
        query_text,
        docs,
        timeout_sec=timeout_sec,
        top_n=top_n,
        service_profile=service_profile,
    )

    if scores is None or len(scores) != len(hits):
        return es_response, None, []

    fused_debug = fuse_scores_and_resort(
        hits,
        scores,
        fine_scores=fine_scores,
        weight_es=weight_es,
        weight_ai=weight_ai,
        fusion=fusion,
        style_intent_selected_sku_boost=style_intent_selected_sku_boost,
        debug=debug,
        rerank_debug_rows=rerank_debug_rows,
    )

    # ``hits`` is the response's own list (sorted in place above), so its first
    # element now carries the highest fused score.
    top = hits[0].get("_fused_score", hits[0].get("_score", 0.0)) or 0.0
    es_response["hits"]["max_score"] = top

    return es_response, meta, fused_debug


def run_lightweight_rerank(
    query: str,
    es_hits: List[Dict[str, Any]],
    language: str = "zh",
    timeout_sec: float = DEFAULT_TIMEOUT_SEC,
    rerank_query_template: str = "{query}",
    rerank_doc_template: str = "{title}",
    top_n: Optional[int] = None,
    debug: bool = False,
    fusion: Optional[RerankFusionConfig] = None,
    style_intent_selected_sku_boost: float = 1.2,
    service_profile: Optional[str] = "fine",
) -> Tuple[Optional[List[float]], Optional[Dict[str, Any]], List[Dict[str, Any]]]:
    """Call lightweight reranker and rank by lightweight-model fusion.

    The returned scores are used as the ``fine`` term of the multiplicative
    fusion. Each hit gets ``_fine_score``, ``_fine_fused_score`` and the
    text/KNN score fields written in place, and ``es_hits`` is sorted in place
    by ``_fine_fused_score`` descending.

    Returns:
        ``(scores, meta, debug_rows)``; ``([], {}, [])`` for empty input and
        ``(None, None, [])`` when the service call failed or returned a
        different number of scores than hits.
    """
    if not es_hits:
        return [], {}, []

    query_text = _render_query_template(rerank_query_template, query)
    rerank_debug_rows: Optional[List[Dict[str, Any]]] = [] if debug else None
    docs = build_docs_from_hits(
        es_hits,
        language=language,
        doc_template=rerank_doc_template,
        debug_rows=rerank_debug_rows,
    )
    scores, meta = call_rerank_service(
        query_text,
        docs,
        timeout_sec=timeout_sec,
        top_n=top_n,
        service_profile=service_profile,
    )
    if scores is None or len(scores) != len(es_hits):
        return None, None, []

    f = fusion or RerankFusionConfig()
    debug_rows: List[Dict[str, Any]] = []
    for idx, hit in enumerate(es_hits):
        signal_bundle = _build_hit_signal_bundle(hit, f)
        text_score = signal_bundle["text_score"]
        knn_score = signal_bundle["knn_score"]
        knn_components = signal_bundle["knn_components"]
        fine_score = _to_score(scores[idx])
        sku_selected = _has_selected_sku(hit)
        style_boost = style_intent_selected_sku_boost if sku_selected else 1.0
        fusion_result = _compute_multiplicative_fusion(
            es_score=signal_bundle["es_score"],
            fine_score=fine_score,
            text_score=text_score,
            knn_score=knn_score,
            fusion=f,
            knn_components=knn_components,
            style_boost=style_boost,
        )

        hit["_fine_score"] = fine_score
        hit["_fine_fused_score"] = fusion_result["score"]
        _store_signal_scores(hit, signal_bundle)
        hit["_style_intent_selected_sku_boost"] = style_boost

        if debug:
            factors = fusion_result["factors"]
            ltr_features = _build_ltr_feature_block(
                signal_bundle=signal_bundle,
                text_components=signal_bundle["text_components"],
                knn_components=knn_components,
                fine_score=fine_score,
                style_boost=style_boost,
                stage_score=fusion_result["score"],
            )
            row: Dict[str, Any] = {
                "doc_id": hit.get("_id"),
                "score": fusion_result["score"],
                "fine_score": fine_score,
                "text_score": text_score,
                "knn_score": knn_score,
                "fusion_inputs": fusion_result["inputs"],
                "fusion_factors": factors,
                "fusion_summary": fusion_result["summary"],
                "es_score": signal_bundle["es_score"],
                "fine_factor": factors.get("fine_score"),
                "es_factor": factors.get("es_score"),
                "text_factor": factors.get("text_score"),
                "knn_factor": factors.get("knn_score"),
                "text_knn_factor": factors.get("weighted_text_knn_score"),
                "image_knn_factor": factors.get("weighted_image_knn_score"),
                "style_intent_selected_sku": sku_selected,
                "style_intent_selected_sku_boost": style_boost,
                "ltr_features": ltr_features,
            }
            if rerank_debug_rows is not None and idx < len(rerank_debug_rows):
                row["rerank_input"] = rerank_debug_rows[idx]
            debug_rows.append(row)

    es_hits.sort(key=lambda h: h.get("_fine_fused_score", h.get("_fine_score", 0.0)), reverse=True)
    return scores, meta, debug_rows