From 506c39b751130ec541c23b7fdabb1f5d373a97f8 Mon Sep 17 00:00:00 2001 From: tangwang Date: Thu, 5 Feb 2026 14:13:41 +0800 Subject: [PATCH] feat(search): 统一重排逻辑,仅由 ai_search 控制并调用外部 BGE 重排服务 --- api/models.py | 4 ++++ api/routes/search.py | 2 ++ config/config.yaml | 11 +++++++---- config/config_loader.py | 30 +++++++++++++++++++----------- docs/搜索API对接指南.md | 4 +++- search/__init__.py | 2 -- search/rerank_client.py | 244 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ search/rerank_engine.py | 171 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- search/searcher.py | 93 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------- tests/conftest.py | 4 +--- 10 files changed, 362 insertions(+), 203 deletions(-) create mode 100644 search/rerank_client.py delete mode 100644 search/rerank_engine.py diff --git a/api/models.py b/api/models.py index 9c741b3..f4c9a0f 100644 --- a/api/models.py +++ b/api/models.py @@ -151,6 +151,10 @@ class SearchRequest(BaseModel): min_score: Optional[float] = Field(None, ge=0, description="最小相关性分数阈值") highlight: bool = Field(False, description="是否高亮搜索关键词(暂不实现)") debug: bool = Field(False, description="是否返回调试信息") + ai_search: bool = Field( + False, + description="是否开启 AI 搜索(调用本地重排服务对 ES 结果进行二次排序)" + ) # SKU筛选参数 sku_filter_dimension: Optional[List[str]] = Field( diff --git a/api/routes/search.py b/api/routes/search.py index 3b18bfc..b1cf4cf 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -84,6 +84,7 @@ async def search(request: SearchRequest, http_request: Request): f"min_score: {request.min_score} | " f"language: {request.language} | " f"debug: {request.debug} | " + f"ai_search: {request.ai_search} | " f"sku_filter_dimension: {request.sku_filter_dimension} | " f"filters: {request.filters} | " f"range_filters: {request.range_filters} | " @@ -111,6 +112,7 @@ async def search(request: SearchRequest, http_request: Request): debug=request.debug, language=request.language, sku_filter_dimension=request.sku_filter_dimension, + ai_search=request.ai_search, ) # Include performance summary in response diff --git a/config/config.yaml b/config/config.yaml index 7d9b404..7c041da 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -133,11 +133,14 @@ function_score: boost_mode: "multiply" functions: [] -# Rerank配置(本地重排,当前禁用) +# 重排配置(唯一实现:外部 BGE 重排服务,由请求参数 ai_search 控制是否执行) +# ai_search 且 from+size<=rerank_window 时:从 ES 取前 rerank_window 条、重排后再按 from/size 分页 rerank: - enabled: false - expression: "" - description: "Local reranking (disabled, use ES function_score instead)" + rerank_window: 1000 + # service_url: "http://127.0.0.1:6007/rerank" # 可选,不填则用默认端口 6007 + timeout_sec: 15.0 # 文档多时重排耗时长,可按需调大 + weight_es: 0.4 + weight_ai: 0.6 # SPU配置(已启用,使用嵌套skus) spu_config: diff --git a/config/config_loader.py b/config/config_loader.py index 2c8585b..72109f4 100644 --- a/config/config_loader.py +++ b/config/config_loader.py @@ -88,10 +88,14 @@ class RankingConfig: @dataclass class RerankConfig: - """本地重排配置(当前禁用)""" - enabled: bool = False - expression: str = "" - description: str = "" + """重排配置(唯一实现:调用外部 BGE 重排服务,由请求参数 ai_search 控制是否执行)""" + # 重排窗口:ai_search 且 from+size<=rerank_window 时,从 ES 取前 rerank_window 条重排后再分页 + rerank_window: int = 1000 + # 可选:重排服务 URL,为空时使用 reranker 模块默认端口 6007 + service_url: Optional[str] = None + timeout_sec: float = 15.0 + weight_es: float = 0.4 + weight_ai: float = 0.6 @dataclass @@ -263,12 +267,14 @@ class ConfigLoader: functions=fs_data.get("functions") or [] ) - # Parse Rerank configuration + # Parse Rerank configuration(唯一实现:外部重排服务,由 ai_search 控制) rerank_data = config_data.get("rerank", {}) rerank = RerankConfig( - enabled=rerank_data.get("enabled", False), - expression=rerank_data.get("expression") or "", - description=rerank_data.get("description") or "" + rerank_window=int(rerank_data.get("rerank_window", 1000)), + service_url=rerank_data.get("service_url") or None, + timeout_sec=float(rerank_data.get("timeout_sec", 15.0)), + weight_es=float(rerank_data.get("weight_es", 0.4)), + weight_ai=float(rerank_data.get("weight_ai", 0.6)), ) # Parse SPU config @@ -399,9 +405,11 @@ class ConfigLoader: "functions": config.function_score.functions }, "rerank": { - "enabled": config.rerank.enabled, - "expression": config.rerank.expression, - "description": config.rerank.description + "rerank_window": config.rerank.rerank_window, + "service_url": config.rerank.service_url, + "timeout_sec": config.rerank.timeout_sec, + "weight_es": config.rerank.weight_es, + "weight_ai": config.rerank.weight_ai, }, "spu_config": { "enabled": config.spu_config.enabled, diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 8a43a79..2a07055 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -167,6 +167,7 @@ curl -X POST "http://120.76.41.98:6002/search/" \ "min_score": 0.0, "sku_filter_dimension": ["string"], "debug": false, + "ai_search": false, "user_id": "string", "session_id": "string" } @@ -188,6 +189,7 @@ curl -X POST "http://120.76.41.98:6002/search/" \ | `min_score` | float | N | null | 最小相关性分数阈值 | | `sku_filter_dimension` | array[string] | N | null | 子SKU筛选维度列表(见[SKU筛选维度](#35-sku筛选维度)) | | `debug` | boolean | N | false | 是否返回调试信息 | +| `ai_search` | boolean | N | false | 是否开启 AI 搜索(调用本地重排服务对 ES 结果进行二次排序) | | `user_id` | string | N | null | 用户ID(用于个性化,预留) | | `session_id` | string | N | null | 会话ID(用于分析,预留) | @@ -787,7 +789,7 @@ curl "http://localhost:6002/search/12345" | `option3_name` | string | 选项3名称 | | `specifications` | array[object] | 规格列表(与ES specifications字段对应) | | `skus` | array | SKU 列表 | -| `relevance_score` | float | 相关性分数 | +| `relevance_score` | float | 相关性分数(默认为 ES 原始分数;当开启 AI 搜索时为融合后的最终分数) | ### 4.4 SkuResult字段说明 diff --git a/search/__init__.py b/search/__init__.py index 5655018..a2bf707 100644 --- a/search/__init__.py +++ b/search/__init__.py @@ -2,14 +2,12 @@ from .boolean_parser import BooleanParser, QueryNode from .es_query_builder import ESQueryBuilder -from .rerank_engine import RerankEngine from .searcher import Searcher, SearchResult __all__ = [ 'BooleanParser', 'QueryNode', 'ESQueryBuilder', - 'RerankEngine', 'Searcher', 'SearchResult', ] diff --git a/search/rerank_client.py b/search/rerank_client.py new file mode 100644 index 0000000..8f11820 --- /dev/null +++ b/search/rerank_client.py @@ -0,0 +1,244 @@ +""" +重排客户端:调用外部 BGE 重排服务,并对 ES 分数与重排分数进行融合。 + +流程: +1. 从 ES hits 构造用于重排的文档文本列表 +2. POST 请求到重排服务 /rerank,获取每条文档的 relevance 分数 +3. 将 ES 分数(归一化)与重排分数线性融合,写回 hit["_score"] 并重排序 +""" + +from typing import Dict, Any, List, Optional, Tuple +import logging + +logger = logging.getLogger(__name__) + +# 默认融合权重:ES 归一化分数权重、重排分数权重(相加为 1) +DEFAULT_WEIGHT_ES = 0.4 +DEFAULT_WEIGHT_AI = 0.6 +# 重排服务默认超时(文档较多时需更大,建议 config 中 timeout_sec 调大) +DEFAULT_TIMEOUT_SEC = 15.0 + + +def build_docs_from_hits( + es_hits: List[Dict[str, Any]], + language: str = "zh", +) -> List[str]: + """ + 从 ES 命中结果构造重排服务所需的文档文本列表(与 hits 一一对应)。 + + 文本由 title、brief、description、vendor、category_path 等多语言字段拼接, + 按 language 优先选取对应语言;若无内容则用 spu_id 兜底。 + + Args: + es_hits: ES 返回的 hits 列表,每项含 _source + language: 语言代码,如 "zh"、"en" + + Returns: + 与 es_hits 等长的字符串列表,用于 POST /rerank 的 docs + """ + lang = (language or "zh").strip().lower() + if lang not in ("zh", "en"): + lang = "zh" + + def pick_lang_text(obj: Any) -> str: + if obj is None: + return "" + if isinstance(obj, dict): + return str(obj.get(lang) or obj.get("zh") or obj.get("en") or "").strip() + return str(obj).strip() + + docs: List[str] = [] + for hit in es_hits: + src = hit.get("_source") or {} + parts: List[str] = [] + for key in ("title", "brief", "description", "vendor", "category_path"): + parts.append(pick_lang_text(src.get(key))) + text = " ".join(p for p in parts if p).strip() + if not text: + text = str(src.get("spu_id", "")) + docs.append(text) + return docs + + +def call_rerank_service( + query: str, + docs: List[str], + service_url: str, + timeout_sec: float = DEFAULT_TIMEOUT_SEC, +) -> Tuple[Optional[List[float]], Optional[Dict[str, Any]]]: + """ + 调用重排服务 POST /rerank,返回分数列表与 meta。 + + Args: + query: 搜索查询字符串 + docs: 文档文本列表(与 ES hits 顺序一致) + service_url: 完整 URL,如 http://127.0.0.1:6007/rerank + timeout_sec: 请求超时秒数 + + Returns: + (scores, meta):成功时 scores 与 docs 等长,meta 为服务返回的 meta; + 失败时返回 (None, None) + """ + if not docs: + return [], {} + try: + import requests + payload = {"query": (query or "").strip(), "docs": docs} + response = requests.post(service_url, json=payload, timeout=timeout_sec) + if response.status_code != 200: + logger.warning( + "Rerank service HTTP %s: %s", + response.status_code, + (response.text or "")[:200], + ) + return None, None + data = response.json() + scores = data.get("scores") + if not isinstance(scores, list): + return None, None + return scores, data.get("meta") or {} + except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectTimeout) as e: + logger.warning( + "Rerank request timed out after %.1fs (docs=%d); returning ES order. %s", + timeout_sec, len(docs), e, + ) + return None, None + except Exception as e: + logger.warning("Rerank request failed: %s", e, exc_info=True) + return None, None + + +def fuse_scores_and_resort( + es_hits: List[Dict[str, Any]], + rerank_scores: List[float], + weight_es: float = DEFAULT_WEIGHT_ES, + weight_ai: float = DEFAULT_WEIGHT_AI, +) -> List[Dict[str, Any]]: + """ + 将 ES 分数与重排分数线性融合,写回每条 hit 的 _score,并按融合分数降序重排。 + + 对每条 hit 会写入: + - _original_score: 原始 ES 分数 + - _ai_rerank_score: 重排服务返回的分数 + - _fused_score: 融合分数 + - _score: 置为融合分数(供后续 ResultFormatter 使用) + + Args: + es_hits: ES hits 列表(会被原地修改) + rerank_scores: 与 es_hits 等长的重排分数列表 + weight_es: ES 归一化分数权重 + weight_ai: 重排分数权重 + + Returns: + 每条文档的融合调试信息列表,用于 debug_info + """ + n = len(es_hits) + if n == 0 or len(rerank_scores) != n: + return [] + + # 收集 ES 原始分数 + es_scores: List[float] = [] + for hit in es_hits: + raw = hit.get("_score") + try: + es_scores.append(float(raw) if raw is not None else 0.0) + except (TypeError, ValueError): + es_scores.append(0.0) + + max_es = max(es_scores) if es_scores else 0.0 + fused_debug: List[Dict[str, Any]] = [] + + for idx, hit in enumerate(es_hits): + es_score = es_scores[idx] + ai_score_raw = rerank_scores[idx] + try: + ai_score = float(ai_score_raw) + except (TypeError, ValueError): + ai_score = 0.0 + + es_norm = (es_score / max_es) if max_es > 0 else 0.0 + fused = weight_es * es_norm + weight_ai * ai_score + + hit["_original_score"] = hit.get("_score") + hit["_ai_rerank_score"] = ai_score + hit["_fused_score"] = fused + hit["_score"] = fused + + fused_debug.append({ + "doc_id": hit.get("_id"), + "es_score": es_score, + "es_score_norm": es_norm, + "ai_rerank_score": ai_score, + "fused_score": fused, + }) + + # 按融合分数降序重排 + es_hits.sort( + key=lambda h: h.get("_fused_score", h.get("_score", 0.0)), + reverse=True, + ) + return fused_debug + + +def run_rerank( + query: str, + es_response: Dict[str, Any], + language: str = "zh", + service_url: Optional[str] = None, + timeout_sec: float = DEFAULT_TIMEOUT_SEC, + weight_es: float = DEFAULT_WEIGHT_ES, + weight_ai: float = DEFAULT_WEIGHT_AI, +) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]], List[Dict[str, Any]]]: + """ + 完整重排流程:从 es_response 取 hits -> 构造 docs -> 调服务 -> 融合分数并重排 -> 更新 max_score。 + + Args: + query: 搜索查询 + es_response: ES 原始响应(其中的 hits["hits"] 会被原地修改) + language: 文档文本使用的语言 + service_url: 重排服务 URL,为 None 时使用默认 127.0.0.1:6007 + timeout_sec: 请求超时 + weight_es: ES 分数权重 + weight_ai: 重排分数权重 + + Returns: + (es_response, rerank_meta, fused_debug): + - es_response: 已更新 hits 与 max_score 的响应(同一引用) + - rerank_meta: 重排服务返回的 meta,失败时为 None + - fused_debug: 每条文档的融合信息,供 debug 使用 + """ + try: + from reranker.config import CONFIG as RERANKER_CONFIG + except Exception: + RERANKER_CONFIG = None + + url = service_url + if not url and RERANKER_CONFIG is not None: + url = f"http://127.0.0.1:{RERANKER_CONFIG.PORT}/rerank" + if not url: + url = "http://127.0.0.1:6007/rerank" + + hits = es_response.get("hits", {}).get("hits") or [] + if not hits: + return es_response, None, [] + + docs = build_docs_from_hits(hits, language=language) + scores, meta = call_rerank_service(query, docs, url, timeout_sec=timeout_sec) + + if scores is None or len(scores) != len(hits): + return es_response, None, [] + + fused_debug = fuse_scores_and_resort( + hits, + scores, + weight_es=weight_es, + weight_ai=weight_ai, + ) + + # 更新 max_score 为融合后的最高分 + if hits: + top = hits[0].get("_fused_score", hits[0].get("_score", 0.0)) or 0.0 + if "hits" in es_response: + es_response["hits"]["max_score"] = top + + return es_response, meta, fused_debug diff --git a/search/rerank_engine.py b/search/rerank_engine.py deleted file mode 100644 index 258894f..0000000 --- a/search/rerank_engine.py +++ /dev/null @@ -1,171 +0,0 @@ -""" -Reranking engine for post-processing search result scoring. - -本地重排引擎,用于ES返回结果后的二次排序。 -当前状态:已禁用,优先使用ES的function_score。 - -Supports expression-based ranking with functions like: -- bm25(): Base BM25 text relevance score -- text_embedding_relevance(): KNN embedding similarity -- field_value(field): Use field value in scoring -- timeliness(date_field): Time decay function -""" - -import re -from typing import Dict, Any, List, Optional -import math - - -class RerankEngine: - """ - 本地重排引擎(当前禁用) - - 功能:对ES返回的结果进行二次打分和排序 - 用途:复杂的自定义排序逻辑、实时个性化等 - """ - - def __init__(self, ranking_expression: str, enabled: bool = False): - """ - Initialize rerank engine. - - Args: - ranking_expression: Ranking expression string - Example: "bm25() + 0.2*text_embedding_relevance() + general_score*2" - enabled: Whether local reranking is enabled (default: False) - """ - self.enabled = enabled - self.expression = ranking_expression - self.parsed_terms = [] - - if enabled: - self.parsed_terms = self._parse_expression(ranking_expression) - - def _parse_expression(self, expression: str) -> List[Dict[str, Any]]: - """ - Parse ranking expression into terms. - - Args: - expression: Ranking expression - - Returns: - List of term dictionaries - """ - terms = [] - - # Pattern to match: coefficient * function() or field_name - # Example: "0.2*text_embedding_relevance()" or "general_score*2" - pattern = r'([+-]?\s*\d*\.?\d*)\s*\*?\s*([a-zA-Z_]\w*(?:\([^)]*\))?)' - - for match in re.finditer(pattern, expression): - coef_str = match.group(1).strip() - func_str = match.group(2).strip() - - # Parse coefficient - if coef_str in ['', '+']: - coefficient = 1.0 - elif coef_str == '-': - coefficient = -1.0 - else: - try: - coefficient = float(coef_str) - except ValueError: - coefficient = 1.0 - - # Check if function or field - if '(' in func_str: - # Function call - func_name = func_str[:func_str.index('(')] - args_str = func_str[func_str.index('(') + 1:func_str.rindex(')')] - args = [arg.strip() for arg in args_str.split(',')] if args_str else [] - - terms.append({ - 'type': 'function', - 'name': func_name, - 'args': args, - 'coefficient': coefficient - }) - else: - # Field reference - terms.append({ - 'type': 'field', - 'name': func_str, - 'coefficient': coefficient - }) - - return terms - - def calculate_score( - self, - hit: Dict[str, Any], - base_score: float, - knn_score: Optional[float] = None - ) -> float: - """ - Calculate final score for a search result. - - Args: - hit: ES hit document - base_score: Base BM25 score - knn_score: KNN similarity score (if available) - - Returns: - Final calculated score - """ - if not self.enabled: - return base_score - - score = 0.0 - source = hit.get('_source', {}) - - for term in self.parsed_terms: - term_value = 0.0 - - if term['type'] == 'function': - func_name = term['name'] - - if func_name == 'bm25': - term_value = base_score - - elif func_name == 'text_embedding_relevance': - term_value = knn_score if knn_score is not None else 0.0 - - elif func_name == 'timeliness': - # Time decay function - if term['args']: - date_field = term['args'][0] - if date_field in source: - # Simple time decay (would need actual implementation) - term_value = 1.0 - else: - term_value = 1.0 - - elif func_name == 'field_value': - # Get field value - if term['args'] and term['args'][0] in source: - field_value = source[term['args'][0]] - try: - term_value = float(field_value) - except (ValueError, TypeError): - term_value = 0.0 - - elif term['type'] == 'field': - # Direct field reference - field_name = term['name'] - if field_name in source: - try: - term_value = float(source[field_name]) - except (ValueError, TypeError): - term_value = 0.0 - - score += term['coefficient'] * term_value - - return score - - def get_expression(self) -> str: - """Get ranking expression.""" - return self.expression - - def get_terms(self) -> List[Dict[str, Any]]: - """Get parsed expression terms.""" - return self.parsed_terms - diff --git a/search/searcher.py b/search/searcher.py index 9e81206..a0a85ff 100644 --- a/search/searcher.py +++ b/search/searcher.py @@ -13,7 +13,6 @@ from query import QueryParser, ParsedQuery from embeddings import CLIPImageEncoder from .boolean_parser import BooleanParser, QueryNode from .es_query_builder import ESQueryBuilder -from .rerank_engine import RerankEngine from config import SearchConfig from config.tenant_config_loader import get_tenant_config_loader from config.utils import get_match_fields_for_index @@ -99,7 +98,6 @@ class Searcher: # Initialize components self.boolean_parser = BooleanParser() - self.rerank_engine = RerankEngine(config.ranking.expression, enabled=False) # Get match fields from config self.match_fields = get_match_fields_for_index(config, "default") @@ -137,6 +135,7 @@ class Searcher: debug: bool = False, language: str = "en", sku_filter_dimension: Optional[List[str]] = None, + ai_search: bool = False, ) -> SearchResult: """ Execute search query (外部友好格式). @@ -168,15 +167,21 @@ class Searcher: index_langs = tenant_cfg.get("index_languages") or [] enable_translation = len(index_langs) > 0 enable_embedding = self.config.query_config.enable_text_embedding - enable_rerank = False # Temporarily disabled + # 重排仅由请求参数 ai_search 控制,唯一实现为调用外部 BGE 重排服务 + enable_rerank = bool(ai_search) + rerank_window = self.config.rerank.rerank_window or 1000 + # 若开启重排且请求范围在窗口内:从 ES 取前 rerank_window 条、重排后再按 from/size 分页;否则不重排,按原 from/size 查 ES + in_rerank_window = enable_rerank and (from_ + size) <= rerank_window + es_fetch_from = 0 if in_rerank_window else from_ + es_fetch_size = rerank_window if in_rerank_window else size # Start timing context.start_stage(RequestContextStage.TOTAL) context.logger.info( f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, " - f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, " - f"enable_rerank={enable_rerank}, min_score={min_score}", + f"enable_rerank={enable_rerank}, in_rerank_window={in_rerank_window}, es_fetch=({es_fetch_from},{es_fetch_size}) | " + f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}", extra={'reqid': context.reqid, 'uid': context.uid} ) @@ -184,6 +189,9 @@ class Searcher: context.metadata['search_params'] = { 'size': size, 'from_': from_, + 'es_fetch_from': es_fetch_from, + 'es_fetch_size': es_fetch_size, + 'in_rerank_window': in_rerank_window, 'filters': filters, 'range_filters': range_filters, 'facets': facets, @@ -287,8 +295,8 @@ class Searcher: filters=filters, range_filters=range_filters, facet_configs=facets, - size=size, - from_=from_, + size=es_fetch_size, + from_=es_fetch_from, enable_knn=enable_embedding and parsed_query.query_vector is not None, min_score=min_score, parsed_query=parsed_query @@ -336,12 +344,12 @@ class Searcher: # Step 4: Elasticsearch search context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH) try: - # Use tenant-specific index name + # Use tenant-specific index name(开启重排且在窗口内时已用 es_fetch_size/es_fetch_from) es_response = self.es_client.search( index_name=index_name, body=body_for_es, - size=size, - from_=from_ + size=es_fetch_size, + from_=es_fetch_from ) # Store ES response in context @@ -365,6 +373,69 @@ class Searcher: finally: context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH) + # Optional Step 4.5: AI reranking(仅当请求范围在重排窗口内时执行) + if enable_rerank and in_rerank_window: + context.start_stage(RequestContextStage.RERANKING) + try: + from .rerank_client import run_rerank + + rerank_query = parsed_query.original_query if parsed_query else query + rc = self.config.rerank + es_response, rerank_meta, fused_debug = run_rerank( + query=rerank_query, + es_response=es_response, + language=language, + service_url=rc.service_url, + timeout_sec=rc.timeout_sec, + weight_es=rc.weight_es, + weight_ai=rc.weight_ai, + ) + + if rerank_meta is not None: + try: + from reranker.config import CONFIG as RERANKER_CONFIG + rerank_url = f"http://127.0.0.1:{RERANKER_CONFIG.PORT}/rerank" + except Exception: + rerank_url = "http://127.0.0.1:6007/rerank" + context.metadata.setdefault("rerank_info", {}) + context.metadata["rerank_info"].update({ + "service_url": rerank_url, + "docs": len(es_response.get("hits", {}).get("hits") or []), + "meta": rerank_meta, + }) + context.store_intermediate_result("rerank_scores", fused_debug) + context.logger.info( + f"重排完成 | docs={len(fused_debug)} | meta={rerank_meta}", + extra={'reqid': context.reqid, 'uid': context.uid} + ) + except Exception as e: + context.add_warning(f"Rerank failed: {e}") + context.logger.warning( + f"调用重排服务失败 | error: {e}", + extra={'reqid': context.reqid, 'uid': context.uid}, + exc_info=True, + ) + finally: + context.end_stage(RequestContextStage.RERANKING) + + # 当本次请求在重排窗口内时:已从 ES 取了 rerank_window 条并可能已重排,需按请求的 from/size 做分页切片 + if in_rerank_window: + hits = es_response.get("hits", {}).get("hits") or [] + sliced = hits[from_ : from_ + size] + es_response.setdefault("hits", {})["hits"] = sliced + if sliced: + slice_max = max((h.get("_score") for h in sliced), default=0.0) + try: + es_response["hits"]["max_score"] = float(slice_max) + except (TypeError, ValueError): + es_response["hits"]["max_score"] = 0.0 + else: + es_response["hits"]["max_score"] = 0.0 + context.logger.info( + f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", + extra={'reqid': context.reqid, 'uid': context.uid} + ) + # Step 5: Result processing context.start_stage(RequestContextStage.RESULT_PROCESSING) try: @@ -379,7 +450,7 @@ class Searcher: total_value = total.get('value', 0) else: total_value = total - + # max_score 会在启用 AI 搜索时被更新为融合分数的最大值 max_score = es_response.get('hits', {}).get('max_score') or 0.0 # Format results using ResultFormatter diff --git a/tests/conftest.py b/tests/conftest.py index 49373c6..e192ef0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -191,9 +191,7 @@ def temp_config_file() -> Generator[str, None, None]: "functions": [] }, "rerank": { - "enabled": False, - "expression": "", - "description": "" + "rerank_window": 1000 } } -- libgit2 0.21.2