Blame view

search/rerank_client.py 8.84 KB
506c39b7   tangwang   feat(search): 统一重...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  """
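  Rerank client: calls the external BGE rerank service and fuses ES scores with rerank scores.
  
  Flow:
  1. Build the list of document texts to rerank from the ES hits
  2. POST to the rerank service's /rerank endpoint to get a relevance score for each document
  3. Linearly fuse the (normalized) ES score with the rerank score, write it back to hit["_score"], and re-sort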
  """
  
  from typing import Dict, Any, List, Optional, Tuple
  import logging
  
  logger = logging.getLogger(__name__)
  
  # Default fusion weights: weight of the normalized ES score and weight of the
  # rerank score (the two sum to 1).
  DEFAULT_WEIGHT_ES = 0.4
  DEFAULT_WEIGHT_AI = 0.6
  # Default rerank-service timeout in seconds (more documents need a larger
  # value; raising timeout_sec in the config is recommended in that case).
  DEFAULT_TIMEOUT_SEC = 15.0
  
  
  def build_docs_from_hits(
      es_hits: List[Dict[str, Any]],
      language: str = "zh",
      doc_template: str = "{title}",
  ) -> List[str]:
      """
      Build the document texts sent to the rerank service, one per ES hit.

      Each hit's ``_source`` fields are rendered through ``doc_template``.
      Supported placeholders: {title} {brief} {vendor} {description}
      {category_path}. Unknown placeholders render as an empty string.
      Placeholders carrying a conversion or format spec (``{brief!s}``,
      ``{vendor:>6}``) are recognised as well.

      Args:
          es_hits: ES hits list; each item is expected to carry ``_source``.
              Items that are not dicts (or whose ``_source`` is not a dict)
              are rendered as if every field were empty, so the output stays
              aligned with the input.
          language: Language code, "zh" or "en"; anything else falls back
              to "zh".
          doc_template: ``str.format``-style template. ``None``, a malformed
              template, or one using positional fields falls back to
              "{title}" (a warning is logged for the latter two).

      Returns:
          A list of strings with the same length and order as ``es_hits``,
          usable as ``docs`` in POST /rerank.
      """
      # Function-scope import, in line with the file's other lazy imports.
      import string

      lang = (language or "zh").strip().lower()
      if lang not in ("zh", "en"):
          lang = "zh"

      def pick_lang_text(obj: Any) -> str:
          # Multilingual fields are dicts keyed by language code; prefer the
          # requested language, then zh, then en.
          if obj is None:
              return ""
          if isinstance(obj, dict):
              return str(obj.get(lang) or obj.get("zh") or obj.get("en") or "").strip()
          return str(obj).strip()

      class _SafeDict(dict):
          # Any placeholder without a value renders as "".
          def __missing__(self, key: str) -> str:
              return ""

      supported = ("title", "brief", "vendor", "description", "category_path")
      template = "{title}" if doc_template is None else str(doc_template)

      # Discover the referenced fields with the real format-string parser so
      # "{brief!s}" / "{vendor:>6}" count as uses of brief / vendor, and so a
      # broken template is detected once, before the per-hit loop.
      try:
          referenced = set()
          for _literal, field_name, _spec, _conv in string.Formatter().parse(template):
              if field_name is None:
                  continue
              base = field_name.partition(".")[0].partition("[")[0]
              if not base or base.isdigit():
                  raise ValueError("positional fields are not supported")
              referenced.add(base)
      except ValueError as exc:
          logger.warning(
              "Invalid rerank doc_template %r (%s); falling back to '{title}'",
              template, exc,
          )
          template = "{title}"
          referenced = {"title"}

      wanted = [name for name in supported if name in referenced]
      only_title = template == "{title}"

      docs: List[str] = []
      for hit in es_hits or []:
          src = hit.get("_source") if isinstance(hit, dict) else None
          if not isinstance(src, dict):
              src = {}
          if only_title:
              # Fast path for the default template: no formatting needed.
              docs.append(pick_lang_text(src.get("title")))
          else:
              values = _SafeDict(
                  (name, pick_lang_text(src.get(name))) for name in wanted
              )
              docs.append(template.format_map(values))
      return docs
  
  
  def call_rerank_service(
      query: str,
      docs: List[str],
      service_url: str,
      timeout_sec: float = DEFAULT_TIMEOUT_SEC,
  ) -> Tuple[Optional[List[float]], Optional[Dict[str, Any]]]:
      """
      Call the rerank service (POST /rerank) and return its scores and meta.

      Args:
          query: Search query string (stripped before sending; None is sent
              as an empty string).
          docs: Document texts, in the same order as the ES hits.
          service_url: Full URL, e.g. http://127.0.0.1:6007/rerank
          timeout_sec: Request timeout in seconds.

      Returns:
          (scores, meta): on success ``scores`` has the same length as
          ``docs`` and ``meta`` is the service's "meta" value (``{}`` when
          absent). An empty ``docs`` short-circuits to ``([], {})`` without
          any request. On any failure (missing ``requests`` package, non-200
          status, timeout, unusable body, length mismatch) returns
          ``(None, None)``; this function logs a warning and does not raise.
      """
      if not docs:
          return [], {}

      # Import before the request try-block: the except clause below names
      # requests.exceptions, which would itself fail with UnboundLocalError
      # if the import had raised inside that same try.
      try:
          import requests
      except ImportError as e:
          logger.warning("Rerank request failed: %s", e, exc_info=True)
          return None, None

      try:
          payload = {"query": (query or "").strip(), "docs": docs}
          response = requests.post(service_url, json=payload, timeout=timeout_sec)
          if response.status_code != 200:
              logger.warning(
                  "Rerank service HTTP %s: %s",
                  response.status_code,
                  (response.text or "")[:200],
              )
              return None, None
          data = response.json()
          scores = data.get("scores") if isinstance(data, dict) else None
          if not isinstance(scores, list):
              logger.warning("Rerank service response has no 'scores' list")
              return None, None
          if len(scores) != len(docs):
              # Scores are matched to docs by position, so a mismatch is unusable.
              logger.warning(
                  "Rerank service returned %d scores for %d docs",
                  len(scores), len(docs),
              )
              return None, None
          return scores, data.get("meta") or {}
      except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectTimeout) as e:
          logger.warning(
              "Rerank request timed out after %.1fs (docs=%d); returning ES order. %s",
              timeout_sec, len(docs), e,
          )
          return None, None
      except Exception as e:
          # Best-effort boundary: reranking must never break the search itself.
          logger.warning("Rerank request failed: %s", e, exc_info=True)
          return None, None
  
  
  def fuse_scores_and_resort(
      es_hits: List[Dict[str, Any]],
      rerank_scores: List[float],
      weight_es: float = DEFAULT_WEIGHT_ES,
      weight_ai: float = DEFAULT_WEIGHT_AI,
  ) -> List[Dict[str, Any]]:
      """
      Linearly fuse ES scores with rerank scores and re-sort the hits in place.

      The ES score is normalized by the maximum ES score in the list (0.0 when
      that maximum is not positive), then
      ``fused = weight_es * es_norm + weight_ai * rerank_score``.

      Each hit gets these keys written:
      - _original_score: the value ``_score`` held before fusion
      - _ai_rerank_score: the rerank score as a float
      - _fused_score: the fused score
      - _score: set to the fused score (for the downstream ResultFormatter)

      Scores that are missing, non-numeric, NaN or infinite are treated as
      0.0 so one bad value cannot corrupt the ordering.

      Args:
          es_hits: ES hits list (modified and sorted in place).
          rerank_scores: Rerank scores, same length and order as ``es_hits``.
          weight_es: Weight of the normalized ES score.
          weight_ai: Weight of the rerank score.

      Returns:
          One debug dict per hit (doc_id, es_score, es_score_norm,
          ai_rerank_score, fused_score), in the order the hits had before
          sorting. Returns ``[]`` and leaves the hits untouched when
          ``es_hits`` is empty or the lengths differ.
      """
      # Function-scope import, in line with the file's other lazy imports.
      import math

      n = len(es_hits)
      if n == 0 or len(rerank_scores) != n:
          return []

      def to_finite(value: Any) -> float:
          # float() accepts "nan"/"inf"; NaN in a sort key gives an undefined
          # order, so collapse every unusable value to 0.0.
          try:
              number = float(value)
          except (TypeError, ValueError):
              return 0.0
          return number if math.isfinite(number) else 0.0

      es_scores = [to_finite(hit.get("_score")) for hit in es_hits]
      max_es = max(es_scores)
      fused_debug: List[Dict[str, Any]] = []

      for hit, es_score, ai_score_raw in zip(es_hits, es_scores, rerank_scores):
          ai_score = to_finite(ai_score_raw)
          es_norm = (es_score / max_es) if max_es > 0 else 0.0
          fused = weight_es * es_norm + weight_ai * ai_score

          # Read the raw _score before it is overwritten just below.
          hit["_original_score"] = hit.get("_score")
          hit["_ai_rerank_score"] = ai_score
          hit["_fused_score"] = fused
          hit["_score"] = fused

          fused_debug.append({
              "doc_id": hit.get("_id"),
              "es_score": es_score,
              "es_score_norm": es_norm,
              "ai_rerank_score": ai_score,
              "fused_score": fused,
          })

      # Descending by fused score; list.sort is stable, so ties keep ES order.
      es_hits.sort(
          key=lambda h: h.get("_fused_score", h.get("_score", 0.0)),
          reverse=True,
      )
      return fused_debug
  
  
  def run_rerank(
      query: str,
      es_response: Dict[str, Any],
      language: str = "zh",
      service_url: Optional[str] = None,
      timeout_sec: float = DEFAULT_TIMEOUT_SEC,
      weight_es: float = DEFAULT_WEIGHT_ES,
      weight_ai: float = DEFAULT_WEIGHT_AI,
      rerank_query_template: str = "{query}",
      rerank_doc_template: str = "{title}",
  ) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]], List[Dict[str, Any]]]:
      """
      Full rerank flow: take hits from es_response -> build docs -> call the
      service -> fuse scores and re-sort -> update max_score.

      Args:
          query: Search query.
          es_response: Raw ES response; its hits["hits"] list is modified
              in place.
          language: Language used for the document texts.
          service_url: Rerank service URL. When empty, the port is taken from
              reranker.config.CONFIG.PORT if that can be read, otherwise
              http://127.0.0.1:6007/rerank is used.
          timeout_sec: Request timeout in seconds.
          weight_es: Weight of the ES score.
          weight_ai: Weight of the rerank score.
          rerank_query_template: Template for the query sent to the service;
              supports {query}. Unknown placeholders render as empty, and a
              template that cannot be formatted falls back to the raw query.
          rerank_doc_template: Template passed to build_docs_from_hits for
              the document texts.

      Returns:
          (es_response, rerank_meta, fused_debug):
          - es_response: the same object, with hits and max_score updated
            when reranking succeeded.
          - rerank_meta: the meta returned by the rerank service; None when
            there are no hits or the call failed.
          - fused_debug: per-document fusion info for debugging; [] when
            nothing was reranked.
      """
      url = service_url
      if not url:
          # Only touch the reranker config when no URL was supplied.
          try:
              from reranker.config import CONFIG as RERANKER_CONFIG
              url = f"http://127.0.0.1:{RERANKER_CONFIG.PORT}/rerank"
          except Exception:
              url = "http://127.0.0.1:6007/rerank"

      # "hits" may be present but None, so do not rely on the .get default.
      hits_section = es_response.get("hits") or {}
      hits = hits_section.get("hits") or []
      if not hits:
          return es_response, None, []

      class _SafeDict(dict):
          # Unknown placeholders render as "", as the doc template does.
          def __missing__(self, key: str) -> str:
              return ""

      # Apply query template (supports {query})
      try:
          query_text = str(rerank_query_template).format_map(_SafeDict(query=query))
      except (ValueError, LookupError, AttributeError, TypeError) as e:
          logger.warning(
              "Invalid rerank_query_template %r (%s); using raw query",
              rerank_query_template, e,
          )
          query_text = query

      docs = build_docs_from_hits(hits, language=language, doc_template=rerank_doc_template)
      scores, meta = call_rerank_service(query_text, docs, url, timeout_sec=timeout_sec)

      # On failure or a length mismatch, keep the ES order untouched.
      if scores is None or len(scores) != len(hits):
          return es_response, None, []

      fused_debug = fuse_scores_and_resort(
          hits,
          scores,
          weight_es=weight_es,
          weight_ai=weight_ai,
      )

      # hits is non-empty and sorted descending, so the first hit holds the
      # highest fused score; hits_section is es_response["hits"] itself.
      top = hits[0].get("_fused_score", hits[0].get("_score", 0.0)) or 0.0
      hits_section["max_score"] = top

      return es_response, meta, fused_debug