rerank_client.py
30.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
"""
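Rerank client: calls the external BGE rerank service and fuses ES scores with rerank scores.
Flow:
1. Build the list of document texts to rerank from the ES hits
2. POST to the rerank service's /rerank endpoint to get a relevance score for each document
3. Extract the ES text / vector clause scores, fuse them multiplicatively with the rerank scores, and re-sort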
"""
from typing import Dict, Any, List, Optional, Tuple
import logging
from config.schema import CoarseRankFusionConfig, RerankFusionConfig
from providers import create_rerank_provider
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Legacy configuration values, kept only so existing signatures stay compatible;
# the current multiplicative fusion formula no longer uses linear weights.
DEFAULT_WEIGHT_ES = 0.4
DEFAULT_WEIGHT_AI = 0.6
# Default rerank service timeout in seconds (larger document batches need more
# time; raising timeout_sec in the config is recommended in that case).
DEFAULT_TIMEOUT_SEC = 15.0
def build_docs_from_hits(
    es_hits: List[Dict[str, Any]],
    language: str = "zh",
    doc_template: str = "{title}",
    debug_rows: Optional[List[Dict[str, Any]]] = None,
) -> List[str]:
    """Build the rerank input text for every ES hit, in hit order.

    ``doc_template`` is rendered once per hit. Supported placeholders are
    ``{title}``, ``{brief}``, ``{vendor}``, ``{description}`` and
    ``{category_path}``; any other placeholder renders as an empty string.
    When the hit carries a non-blank ``_style_rerank_suffix`` it is appended
    to the title, separated by a single space.

    Args:
        es_hits: ES hits; each hit's fields are read from ``_source``.
        language: Language code. Anything other than "zh" or "en" (after
            stripping and lower-casing) falls back to "zh".
        doc_template: Format template used to assemble each document.
        debug_rows: Optional list that receives one diagnostic dict per hit
            (template, suffix, resolved fields, a preview capped at 300
            characters, and the full document length).

    Returns:
        A list of strings with the same length and order as ``es_hits``.
    """
    lang_code = (language or "zh").strip().lower()
    if lang_code not in ("zh", "en"):
        lang_code = "zh"

    def _localized(field: Any) -> str:
        """Resolve a plain or per-language field value to stripped text."""
        if field is None:
            return ""
        if not isinstance(field, dict):
            return str(field).strip()
        # Requested language first, then zh, then en.
        chosen = field.get(lang_code) or field.get("zh") or field.get("en") or ""
        return str(chosen).strip()

    class _BlankOnMissing(dict):
        """Mapping that renders unknown template placeholders as empty text."""

        def __missing__(self, key: str) -> str:
            return ""

    title_only = doc_template == "{title}"
    optional_fields = ("brief", "vendor", "description", "category_path")
    # Only resolve the optional fields that the template actually references.
    wanted = {
        name: ("{" + name + "}") in doc_template for name in optional_fields
    }

    rendered: List[str] = []
    for hit in es_hits:
        source = hit.get("_source") or {}
        suffix = str(hit.get("_style_rerank_suffix") or "").strip()

        title = _localized(source.get("title"))
        if suffix:
            title = f"{title} {suffix}".strip()
        title = str(title).strip()

        if title_only:
            slots = None
            text = title
        else:
            slots = _BlankOnMissing(
                title=title,
                **{
                    name: _localized(source.get(name)) if wanted[name] else ""
                    for name in optional_fields
                },
            )
            text = str(doc_template).format_map(slots)

        if debug_rows is not None:
            if slots is None:
                fields: Dict[str, Any] = {"title": title}
            else:
                fields = {
                    "title": title,
                    "brief": slots.get("brief") or None,
                    "vendor": slots.get("vendor") or None,
                    "category_path": slots.get("category_path") or None,
                }
            debug_rows.append(
                {
                    "doc_template": doc_template,
                    "title_suffix": suffix or None,
                    "fields": fields,
                    "doc_preview": text if len(text) <= 300 else f"{text[:300]}...",
                    "doc_length": len(text),
                }
            )

        rendered.append(text)

    return rendered
def call_rerank_service(
    query: str,
    docs: List[str],
    timeout_sec: float = DEFAULT_TIMEOUT_SEC,
    top_n: Optional[int] = None,
    service_profile: Optional[str] = None,
) -> Tuple[Optional[List[float]], Optional[Dict[str, Any]]]:
    """Score ``docs`` against ``query`` through the configured rerank provider.

    The provider is obtained from ``create_rerank_provider`` for the given
    ``service_profile`` (the original note states that the provider and URL
    are read from services_config).

    Returns:
        ``([], {})`` when ``docs`` is empty (no provider is created),
        whatever the provider's ``rerank`` call returns on success, or
        ``(None, None)`` when creating the provider or calling it raises;
        the failure is logged as a warning with the traceback.
    """
    if not docs:
        return [], {}

    try:
        provider = create_rerank_provider(service_profile=service_profile)
        outcome = provider.rerank(
            query=query,
            docs=docs,
            timeout_sec=timeout_sec,
            top_n=top_n,
        )
    except Exception as exc:
        # Best-effort boundary: callers treat (None, None) as "rerank unavailable".
        logger.warning("Rerank request failed: %s", exc, exc_info=True)
        return None, None
    return outcome
def _to_score(value: Any) -> float:
try:
if value is None:
return 0.0
return float(value)
except (TypeError, ValueError):
return 0.0
def _extract_named_query_score(matched_queries: Any, name: str) -> float:
if isinstance(matched_queries, dict):
return _to_score(matched_queries.get(name))
if isinstance(matched_queries, list):
return 1.0 if name in matched_queries else 0.0
return 0.0
def _collect_knn_score_components(
    matched_queries: Any,
    fusion: RerankFusionConfig,
) -> Dict[str, float]:
    """Combine the text and image KNN named-query scores into one KNN score.

    The raw scores of the ``knn_query`` and ``image_knn_query`` named queries
    are scaled by ``fusion.knn_text_weight`` and ``fusion.knn_image_weight``.
    The larger weighted value is the primary signal, the other one the
    support signal, and the result is
    ``primary + fusion.knn_tie_breaker * support``.

    Returns:
        A dict with the raw, weighted, primary, support and combined scores.
    """
    raw_text = _extract_named_query_score(matched_queries, "knn_query")
    raw_image = _extract_named_query_score(matched_queries, "image_knn_query")

    scaled_text = raw_text * float(fusion.knn_text_weight)
    scaled_image = raw_image * float(fusion.knn_image_weight)

    dominant = max(scaled_text, scaled_image)
    # Whatever is left after removing the dominant signal is the support signal.
    secondary = sum((scaled_text, scaled_image)) - dominant
    combined = dominant + float(fusion.knn_tie_breaker) * secondary

    return {
        "text_knn_score": raw_text,
        "image_knn_score": raw_image,
        "weighted_text_knn_score": scaled_text,
        "weighted_image_knn_score": scaled_image,
        "primary_knn_score": dominant,
        "support_knn_score": secondary,
        "knn_score": combined,
    }
"""
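Raw inputs:
ES total score
source_score: the score of the named query base_query, taken from the matched_queries returned by ES (a dict supplies the actual score; in list form a matching name simply counts as 1.0).
translation_score: the score of the named queries whose name starts with base_query_trans_; for a dict the maximum is taken, and for a list the presence of any such name yields 1.0.
Intermediate values: the weighted scores of the original query and of the translated query
weighted_source : source_score (used as-is, no extra weight is applied)
weighted_translation : text_translation_weight * translation_score (configured through fusion.text_translation_weight)
Separating the primary signal from the support signal:
primary_text_score and support_text_score are derived by taking the stronger of the two paths (original-text retrieval vs. translated retrieval) as the primary signal
primary_text_score : max(weighted_source, weighted_translation)
support_text_score : weighted_source + weighted_translation - primary_text_score
Fusing the primary and support signals: dismax-style formula
Final text_score: primary signal + 0.25 * support signal
text_score : primary_text_score + 0.25 * support_text_score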
"""
def _collect_text_score_components(
    matched_queries: Any,
    fallback_es_score: float,
    *,
    translation_weight: float,
) -> Dict[str, float]:
    """Derive the text relevance score from the ES named-query matches.

    ``source_score`` is the score of the ``base_query`` named query.
    ``translation_score`` is the best score among named queries whose name
    starts with ``base_query_trans_`` (maximum for a dict, 1.0 for a list
    that contains any such name). The translation score is scaled by
    ``translation_weight``; the larger of the two weighted values is the
    primary signal and the result is ``primary + 0.25 * support``.

    When that result is not positive, ``fallback_es_score`` is used as the
    text score, the weighted source score and the primary score, and the
    support score is reset to 0.0.

    Returns:
        A dict with the source, translation, weighted, primary, support and
        final text scores.
    """
    translation_prefix = "base_query_trans_"
    source_score = _extract_named_query_score(matched_queries, "base_query")

    translation_score = 0.0
    if isinstance(matched_queries, dict):
        translated = [
            _to_score(raw)
            for query_name, raw in matched_queries.items()
            if isinstance(query_name, str)
            and query_name.startswith(translation_prefix)
        ]
        # Seeding with 0.0 keeps the "never below zero" floor of a running max.
        translation_score = max([0.0, *translated])
    elif isinstance(matched_queries, list):
        if any(
            isinstance(query_name, str)
            and query_name.startswith(translation_prefix)
            for query_name in matched_queries
        ):
            translation_score = 1.0

    weighted_source = source_score
    weighted_translation = float(translation_weight) * translation_score

    primary = max(weighted_source, weighted_translation)
    support = sum((weighted_source, weighted_translation)) - primary
    text_score = primary + 0.25 * support

    if text_score <= 0.0:
        # No usable named-query signal: fall back to the overall ES score.
        text_score = primary = weighted_source = fallback_es_score
        support = 0.0

    return {
        "source_score": source_score,
        "translation_score": translation_score,
        "weighted_source_score": weighted_source,
        "weighted_translation_score": weighted_translation,
        "primary_text_score": primary,
        "support_text_score": support,
        "text_score": text_score,
    }
def _format_debug_float(value: float) -> str:
return f"{float(value):.6g}"
def _build_hit_signal_bundle(
    hit: Dict[str, Any],
    fusion: CoarseRankFusionConfig | RerankFusionConfig,
) -> Dict[str, Any]:
    """Collect the ES, text and KNN signals of one hit.

    The raw ES score is taken from ``_raw_es_score`` when that key exists,
    otherwise from ``_original_score``, otherwise from ``_score``. The
    coerced value is written back to ``hit["_raw_es_score"]``.

    Returns:
        A dict with the document id, the raw ES score, the hit's
        ``matched_queries``, both component dicts, and the final text and
        KNN scores.
    """
    if "_raw_es_score" in hit:
        stored_score = hit["_raw_es_score"]
    elif "_original_score" in hit:
        stored_score = hit["_original_score"]
    else:
        stored_score = hit.get("_score")

    es_score = _to_score(stored_score)
    hit["_raw_es_score"] = es_score

    named_matches = hit.get("matched_queries")
    text_parts = _collect_text_score_components(
        named_matches,
        es_score,
        translation_weight=fusion.text_translation_weight,
    )
    knn_parts = _collect_knn_score_components(named_matches, fusion)

    bundle: Dict[str, Any] = {}
    bundle["doc_id"] = hit.get("_id")
    bundle["es_score"] = es_score
    bundle["matched_queries"] = named_matches
    bundle["text_components"] = text_parts
    bundle["knn_components"] = knn_parts
    bundle["text_score"] = text_parts["text_score"]
    bundle["knn_score"] = knn_parts["knn_score"]
    return bundle
def _build_formula_summary(
    term_rows: List[Dict[str, Any]],
    style_boost: float,
    final_score: float,
) -> str:
    """Render the fusion terms as one human-readable, ``" | "``-joined line.

    Each term is rendered as ``name=(raw+bias)^exponent=factor``. A
    ``style_boost`` segment is added only when the boost differs from 1.0,
    and the final score is always the last segment.
    """
    fmt = _format_debug_float
    pieces: List[str] = []
    for row in term_rows:
        pieces.append(
            "{name}=({raw}+{bias})^{exponent}={factor}".format(
                name=row["name"],
                raw=fmt(row["raw_score"]),
                bias=fmt(row["bias"]),
                exponent=fmt(row["exponent"]),
                factor=fmt(row["factor"]),
            )
        )
    if style_boost != 1.0:
        pieces.append("style_boost=" + fmt(style_boost))
    pieces.append("final=" + fmt(final_score))
    return " | ".join(pieces)
def _build_ltr_feature_block(
*,
signal_bundle: Dict[str, Any],
text_components: Dict[str, float],
knn_components: Dict[str, float],
rerank_score: Optional[float] = None,
fine_score: Optional[float] = None,
style_boost: float = 1.0,
stage_score: Optional[float] = None,
) -> Dict[str, Any]:
es_score = float(signal_bundle["es_score"])
text_score = float(signal_bundle["text_score"])
knn_score = float(signal_bundle["knn_score"])
source_score = float(text_components["source_score"])
translation_score = float(text_components["translation_score"])
text_knn_score = float(knn_components["text_knn_score"])
image_knn_score = float(knn_components["image_knn_score"])
return {
"es_score": es_score,
"text_score": text_score,
"knn_score": knn_score,
"rerank_score": None if rerank_score is None else float(rerank_score),
"fine_score": None if fine_score is None else float(fine_score),
"source_score": source_score,
"translation_score": translation_score,
"text_primary_score": float(text_components["primary_text_score"]),
"text_support_score": float(text_components["support_text_score"]),
"text_knn_score": text_knn_score,
"image_knn_score": image_knn_score,
"knn_primary_score": float(knn_components["primary_knn_score"]),
"knn_support_score": float(knn_components["support_knn_score"]),
"has_text_match": source_score > 0.0,
"has_translation_match": translation_score > 0.0,
"has_text_knn": text_knn_score > 0.0,
"has_image_knn": image_knn_score > 0.0,
"text_score_fallback_to_es": (
text_score == es_score and source_score <= 0.0 and translation_score <= 0.0
),
"style_boost": float(style_boost),
"has_style_boost": float(style_boost) > 1.0,
"stage_score": None if stage_score is None else float(stage_score),
}
def _compute_multiplicative_fusion(
    *,
    es_score: float,
    text_score: float,
    knn_score: float,
    fusion: RerankFusionConfig,
    rerank_score: Optional[float] = None,
    fine_score: Optional[float] = None,
    style_boost: float = 1.0,
) -> Dict[str, Any]:
    """Fuse the available signals into one multiplicative score.

    Every signal that is not ``None`` contributes the factor
    ``(max(score, 0) + bias) ** exponent`` with the bias and exponent taken
    from ``fusion``. The factors are multiplied in the order es, rerank,
    fine, text, knn and the product is finally multiplied by
    ``style_boost``.

    Returns:
        A dict with ``inputs`` (raw score per term), ``factors`` (factor per
        term plus ``style_boost``), the fused ``score`` and a readable
        ``summary`` of the formula.
    """
    candidates = (
        ("es_score", es_score, fusion.es_bias, fusion.es_exponent),
        ("rerank_score", rerank_score, fusion.rerank_bias, fusion.rerank_exponent),
        ("fine_score", fine_score, fusion.fine_bias, fusion.fine_exponent),
        ("text_score", text_score, fusion.text_bias, fusion.text_exponent),
        ("knn_score", knn_score, fusion.knn_bias, fusion.knn_exponent),
    )

    term_rows: List[Dict[str, Any]] = []
    for term_name, raw, bias, exponent in candidates:
        if raw is None:
            # Signal not available at this stage; it contributes no factor.
            continue
        term_factor = (max(float(raw), 0.0) + bias) ** exponent
        term_rows.append(
            {
                "name": term_name,
                "raw_score": float(raw),
                "bias": float(bias),
                "exponent": float(exponent),
                "factor": term_factor,
            }
        )

    inputs: Dict[str, float] = {row["name"]: row["raw_score"] for row in term_rows}
    factors: Dict[str, float] = {row["name"]: row["factor"] for row in term_rows}

    product = 1.0
    for row in term_rows:
        product *= row["factor"]
    product *= style_boost
    factors["style_boost"] = style_boost

    return {
        "inputs": inputs,
        "factors": factors,
        "score": product,
        "summary": _build_formula_summary(term_rows, style_boost, product),
    }
def _multiply_coarse_fusion_factors(
    es_score: float,
    text_score: float,
    knn_score: float,
    fusion: CoarseRankFusionConfig,
) -> Tuple[float, float, float, float]:
    """Compute the es/text/knn coarse factors and their product.

    Each factor is ``(max(score, 0) + bias) ** exponent`` using the matching
    bias and exponent from ``fusion``.

    Returns:
        ``(es_factor, text_factor, knn_factor, product_of_all_three)``.
    """

    def _factor(score: float, bias: float, exponent: float) -> float:
        """Clamp the score at zero, shift it by the bias and raise it."""
        return (max(score, 0.0) + bias) ** exponent

    es_factor = _factor(es_score, fusion.es_bias, fusion.es_exponent)
    text_factor = _factor(text_score, fusion.text_bias, fusion.text_exponent)
    knn_factor = _factor(knn_score, fusion.knn_bias, fusion.knn_exponent)
    combined = es_factor * text_factor * knn_factor
    return es_factor, text_factor, knn_factor, combined
def _has_selected_sku(hit: Dict[str, Any]) -> bool:
return bool(str(hit.get("_style_rerank_suffix") or "").strip())
def coarse_resort_hits(
    es_hits: List[Dict[str, Any]],
    fusion: Optional[CoarseRankFusionConfig] = None,
    debug: bool = False,
) -> List[Dict[str, Any]]:
    """Coarse-rank hits in place with the es/text/knn multiplicative fusion.

    Every hit receives ``_text_score``, ``_knn_score``, ``_text_knn_score``,
    ``_image_knn_score`` and ``_coarse_score`` (and ``_raw_es_score`` via the
    signal bundle). ``es_hits`` is then sorted in descending order of
    ``_coarse_score``.

    Args:
        es_hits: ES hits; mutated and re-ordered in place.
        fusion: Coarse fusion settings; a default ``CoarseRankFusionConfig``
            is used when omitted.
        debug: When true, one diagnostic dict per hit is collected.

    Returns:
        The per-hit debug rows in the original hit order, or an empty list
        when ``debug`` is false or there are no hits.
    """
    if not es_hits:
        return []

    config = fusion or CoarseRankFusionConfig()
    debug_entries: List[Dict[str, Any]] = []

    for hit in es_hits:
        bundle = _build_hit_signal_bundle(hit, config)
        text_parts = bundle["text_components"]
        knn_parts = bundle["knn_components"]
        es_score = bundle["es_score"]
        text_score = bundle["text_score"]
        knn_score = bundle["knn_score"]

        es_factor, text_factor, knn_factor, coarse_score = _multiply_coarse_fusion_factors(
            es_score=es_score,
            text_score=text_score,
            knn_score=knn_score,
            fusion=config,
        )

        hit["_text_score"] = text_score
        hit["_knn_score"] = knn_score
        hit["_text_knn_score"] = knn_parts["text_knn_score"]
        hit["_image_knn_score"] = knn_parts["image_knn_score"]
        hit["_coarse_score"] = coarse_score

        if not debug:
            continue

        ltr_features = _build_ltr_feature_block(
            signal_bundle=bundle,
            text_components=text_parts,
            knn_components=knn_parts,
            stage_score=coarse_score,
        )
        fell_back_to_es = (
            text_score == es_score
            and text_parts["source_score"] <= 0.0
            and text_parts["translation_score"] <= 0.0
        )
        debug_entries.append(
            {
                "doc_id": hit.get("_id"),
                "es_score": es_score,
                "text_score": text_score,
                "text_source_score": text_parts["source_score"],
                "text_translation_score": text_parts["translation_score"],
                "text_weighted_source_score": text_parts["weighted_source_score"],
                "text_weighted_translation_score": text_parts["weighted_translation_score"],
                "text_primary_score": text_parts["primary_text_score"],
                "text_support_score": text_parts["support_text_score"],
                "text_score_fallback_to_es": fell_back_to_es,
                "text_knn_score": knn_parts["text_knn_score"],
                "image_knn_score": knn_parts["image_knn_score"],
                "weighted_text_knn_score": knn_parts["weighted_text_knn_score"],
                "weighted_image_knn_score": knn_parts["weighted_image_knn_score"],
                "knn_primary_score": knn_parts["primary_knn_score"],
                "knn_support_score": knn_parts["support_knn_score"],
                "knn_score": knn_score,
                "coarse_es_factor": es_factor,
                "coarse_text_factor": text_factor,
                "coarse_knn_factor": knn_factor,
                "coarse_score": coarse_score,
                "matched_queries": bundle["matched_queries"],
                "ltr_features": ltr_features,
            }
        )

    def _coarse_key(candidate: Dict[str, Any]) -> Any:
        """Sort key: the coarse score, falling back to the ES ``_score``."""
        return candidate.get("_coarse_score", candidate.get("_score", 0.0))

    es_hits.sort(key=_coarse_key, reverse=True)
    return debug_entries
def fuse_scores_and_resort(
    es_hits: List[Dict[str, Any]],
    rerank_scores: List[float],
    fine_scores: Optional[List[float]] = None,
    weight_es: float = DEFAULT_WEIGHT_ES,
    weight_ai: float = DEFAULT_WEIGHT_AI,
    fusion: Optional[RerankFusionConfig] = None,
    style_intent_selected_sku_boost: float = 1.2,
    debug: bool = False,
    rerank_debug_rows: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Fuse ES and rerank scores multiplicatively and re-sort the hits.

    The original ``_score`` of a hit is never modified; the hits are sorted
    in place by descending fused score. The fusion form (bias / exponent
    come from ``fusion``) is::

        fused = (max(es,0)+b_es)^e_es
              * (max(rerank,0)+b_r)^e_r
              * (max(fine,0)+b_f)^e_f
              * (max(text,0)+b_t)^e_t
              * (max(knn,0)+b_k)^e_k
              * sku_boost

    ``sku_boost`` applies only to hits with a selected SKU (a non-blank
    ``_style_rerank_suffix``); other hits use 1.0. The original
    documentation notes that the boost is configurable through
    ``query.style_intent.selected_sku_boost``. The fine term is included
    only when a fine score is available for the hit.

    Keys written to every hit:
        - ``_original_score``: the hit's current ``_score``
        - ``_raw_es_score``: raw ES total score, reused by later stages so
          they do not depend on a possibly rewritten ``_score``
        - ``_rerank_score``: score returned by the rerank service
        - ``_fine_score``: fine score, only when one is available
        - ``_fused_score``: the fused score
        - ``_text_score`` / ``_knn_score``: text and KNN signal scores
        - ``_text_knn_score`` / ``_image_knn_score``: raw KNN scores
        - ``_style_intent_selected_sku_boost``: the boost that was applied

    Args:
        es_hits: ES hits; mutated and re-ordered in place.
        rerank_scores: Rerank scores, one per hit and in hit order.
        fine_scores: Optional fine scores, one per hit. A list whose length
            differs from ``es_hits`` is ignored (with a warning) and each
            hit's existing ``_fine_score`` is used instead, if present.
        weight_es: Kept for compatibility; currently unused.
        weight_ai: Kept for compatibility; currently unused.
        fusion: Fusion settings; a default ``RerankFusionConfig`` is used
            when omitted.
        style_intent_selected_sku_boost: Multiplier for selected-SKU hits.
        debug: When true, one diagnostic dict per hit is collected.
        rerank_debug_rows: Optional per-hit rerank input rows that are
            attached to the debug output as ``rerank_input``.

    Returns:
        The per-hit debug rows in the original hit order. The list is empty
        when ``debug`` is false, when there are no hits, or when
        ``rerank_scores`` does not match ``es_hits`` in length (in which
        case nothing is fused and a warning is logged).
    """
    hit_count = len(es_hits)
    if hit_count == 0:
        return []
    if len(rerank_scores) != hit_count:
        # Make the skipped fusion visible instead of failing silently.
        logger.warning(
            "Skipping score fusion: got %d rerank scores for %d hits",
            len(rerank_scores),
            hit_count,
        )
        return []

    config = fusion or RerankFusionConfig()

    # Loop-invariant: the fine score list is usable only if it lines up with the hits.
    use_fine_list = fine_scores is not None and len(fine_scores) == hit_count
    if fine_scores is not None and not use_fine_list:
        logger.warning(
            "Ignoring fine_scores: got %d scores for %d hits; "
            "using each hit's existing _fine_score when present",
            len(fine_scores),
            hit_count,
        )

    fused_debug: List[Dict[str, Any]] = []
    for idx, (hit, raw_rerank_score) in enumerate(zip(es_hits, rerank_scores)):
        signal_bundle = _build_hit_signal_bundle(hit, config)
        text_components = signal_bundle["text_components"]
        knn_components = signal_bundle["knn_components"]
        es_score = signal_bundle["es_score"]
        text_score = signal_bundle["text_score"]
        knn_score = signal_bundle["knn_score"]
        rerank_score = _to_score(raw_rerank_score)

        fine_score: Optional[float]
        if use_fine_list:
            fine_score = _to_score(fine_scores[idx])
        elif "_fine_score" in hit:
            fine_score = _to_score(hit.get("_fine_score"))
        else:
            # No fine score at all: the fine term is left out of the fusion.
            fine_score = None

        sku_selected = _has_selected_sku(hit)
        style_boost = style_intent_selected_sku_boost if sku_selected else 1.0

        fusion_result = _compute_multiplicative_fusion(
            es_score=es_score,
            rerank_score=rerank_score,
            fine_score=fine_score,
            text_score=text_score,
            knn_score=knn_score,
            fusion=config,
            style_boost=style_boost,
        )
        fused = fusion_result["score"]
        factors = fusion_result["factors"]

        hit["_original_score"] = hit.get("_score")
        hit["_rerank_score"] = rerank_score
        if fine_score is not None:
            hit["_fine_score"] = fine_score
        hit["_text_score"] = text_score
        hit["_knn_score"] = knn_score
        hit["_text_knn_score"] = knn_components["text_knn_score"]
        hit["_image_knn_score"] = knn_components["image_knn_score"]
        hit["_fused_score"] = fused
        hit["_style_intent_selected_sku_boost"] = style_boost

        if not debug:
            continue

        ltr_features = _build_ltr_feature_block(
            signal_bundle=signal_bundle,
            text_components=text_components,
            knn_components=knn_components,
            rerank_score=rerank_score,
            fine_score=fine_score,
            style_boost=style_boost,
            stage_score=fused,
        )
        debug_entry = {
            "doc_id": hit.get("_id"),
            "score": fused,
            "es_score": es_score,
            "rerank_score": rerank_score,
            "fine_score": fine_score,
            "text_score": text_score,
            "knn_score": knn_score,
            "fusion_inputs": fusion_result["inputs"],
            "fusion_factors": factors,
            "fusion_summary": fusion_result["summary"],
            "text_source_score": text_components["source_score"],
            "text_translation_score": text_components["translation_score"],
            "text_weighted_source_score": text_components["weighted_source_score"],
            "text_weighted_translation_score": text_components["weighted_translation_score"],
            "text_primary_score": text_components["primary_text_score"],
            "text_support_score": text_components["support_text_score"],
            "text_knn_score": knn_components["text_knn_score"],
            "image_knn_score": knn_components["image_knn_score"],
            "weighted_text_knn_score": knn_components["weighted_text_knn_score"],
            "weighted_image_knn_score": knn_components["weighted_image_knn_score"],
            "knn_primary_score": knn_components["primary_knn_score"],
            "knn_support_score": knn_components["support_knn_score"],
            "text_score_fallback_to_es": (
                text_score == es_score
                and text_components["source_score"] <= 0.0
                and text_components["translation_score"] <= 0.0
            ),
            "rerank_factor": factors.get("rerank_score"),
            "fine_factor": factors.get("fine_score"),
            "es_factor": factors.get("es_score"),
            "text_factor": factors.get("text_score"),
            "knn_factor": factors.get("knn_score"),
            "style_intent_selected_sku": sku_selected,
            "style_intent_selected_sku_boost": style_boost,
            "matched_queries": signal_bundle["matched_queries"],
            "fused_score": fused,
            "ltr_features": ltr_features,
        }
        if rerank_debug_rows is not None and idx < len(rerank_debug_rows):
            debug_entry["rerank_input"] = rerank_debug_rows[idx]
        fused_debug.append(debug_entry)

    es_hits.sort(
        key=lambda h: h.get("_fused_score", h.get("_score", 0.0)),
        reverse=True,
    )
    return fused_debug
def run_rerank(
    query: str,
    es_response: Dict[str, Any],
    language: str = "zh",
    timeout_sec: float = DEFAULT_TIMEOUT_SEC,
    weight_es: float = DEFAULT_WEIGHT_ES,
    weight_ai: float = DEFAULT_WEIGHT_AI,
    rerank_query_template: str = "{query}",
    rerank_doc_template: str = "{title}",
    top_n: Optional[int] = None,
    debug: bool = False,
    fusion: Optional[RerankFusionConfig] = None,
    style_intent_selected_sku_boost: float = 1.2,
    fine_scores: Optional[List[float]] = None,
    service_profile: Optional[str] = None,
) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]], List[Dict[str, Any]]]:
    """Run the full rerank flow on an ES response.

    Steps: read the hits from ``es_response`` -> build the rerank documents
    -> call the rerank service -> fuse the scores and re-sort the hits in
    place -> set ``hits.max_score`` to the best fused score.

    ``top_n`` is optional and is passed through to the rerank call (the
    original note says it lets a cloud backend rerank only page+size items).

    Returns:
        ``(es_response, meta, debug_rows)``. When there are no hits, or the
        service call fails or returns a score list whose length differs from
        the hits, the response is returned unchanged with ``None`` meta and
        no debug rows.
    """
    hits = es_response.get("hits", {}).get("hits") or []
    if not hits:
        return es_response, None, []

    rendered_query = str(rerank_query_template).format_map({"query": query})
    input_debug_rows: Optional[List[Dict[str, Any]]] = [] if debug else None

    documents = build_docs_from_hits(
        hits,
        language=language,
        doc_template=rerank_doc_template,
        debug_rows=input_debug_rows,
    )
    scores, meta = call_rerank_service(
        rendered_query,
        documents,
        timeout_sec=timeout_sec,
        top_n=top_n,
        service_profile=service_profile,
    )
    # Without one usable score per hit the ES ordering is left untouched.
    if scores is None or len(scores) != len(hits):
        return es_response, None, []

    fused_debug = fuse_scores_and_resort(
        hits,
        scores,
        fine_scores=fine_scores,
        weight_es=weight_es,
        weight_ai=weight_ai,
        fusion=fusion,
        style_intent_selected_sku_boost=style_intent_selected_sku_boost,
        debug=debug,
        rerank_debug_rows=input_debug_rows,
    )

    # Update max_score to the highest fused score (hits are now sorted).
    if hits:
        best_hit = hits[0]
        best_score = best_hit.get("_fused_score", best_hit.get("_score", 0.0)) or 0.0
        if "hits" in es_response:
            es_response["hits"]["max_score"] = best_score

    return es_response, meta, fused_debug
def run_lightweight_rerank(
    query: str,
    es_hits: List[Dict[str, Any]],
    language: str = "zh",
    timeout_sec: float = DEFAULT_TIMEOUT_SEC,
    rerank_query_template: str = "{query}",
    rerank_doc_template: str = "{title}",
    top_n: Optional[int] = None,
    debug: bool = False,
    fusion: Optional[RerankFusionConfig] = None,
    style_intent_selected_sku_boost: float = 1.2,
    service_profile: Optional[str] = "fine",
) -> Tuple[Optional[List[float]], Optional[Dict[str, Any]], List[Dict[str, Any]]]:
    """Call the lightweight reranker and rank hits by the resulting fusion.

    The service scores are used as the fine score of each hit and fused
    with the es/text/knn signals (no rerank term). Every hit receives
    ``_fine_score``, ``_fine_fused_score``, ``_text_score``, ``_knn_score``,
    ``_text_knn_score``, ``_image_knn_score`` and
    ``_style_intent_selected_sku_boost``; ``es_hits`` is then sorted in place
    by descending ``_fine_fused_score``.

    Returns:
        ``(scores, meta, debug_rows)``. ``([], {}, [])`` when there are no
        hits; ``(None, None, [])`` when the service call fails or returns a
        score list whose length differs from the hits. ``scores`` is the
        list returned by the service, in the original hit order.
    """
    if not es_hits:
        return [], {}, []

    rendered_query = str(rerank_query_template).format_map({"query": query})
    input_debug_rows: Optional[List[Dict[str, Any]]] = [] if debug else None

    documents = build_docs_from_hits(
        es_hits,
        language=language,
        doc_template=rerank_doc_template,
        debug_rows=input_debug_rows,
    )
    scores, meta = call_rerank_service(
        rendered_query,
        documents,
        timeout_sec=timeout_sec,
        top_n=top_n,
        service_profile=service_profile,
    )
    if scores is None or len(scores) != len(es_hits):
        return None, None, []

    config = fusion or RerankFusionConfig()
    stage_debug: List[Dict[str, Any]] = []

    for idx, hit in enumerate(es_hits):
        bundle = _build_hit_signal_bundle(hit, config)
        knn_parts = bundle["knn_components"]
        es_score = bundle["es_score"]
        text_score = bundle["text_score"]
        knn_score = bundle["knn_score"]
        fine_score = _to_score(scores[idx])

        sku_selected = _has_selected_sku(hit)
        style_boost = style_intent_selected_sku_boost if sku_selected else 1.0

        fusion_result = _compute_multiplicative_fusion(
            es_score=es_score,
            fine_score=fine_score,
            text_score=text_score,
            knn_score=knn_score,
            fusion=config,
            style_boost=style_boost,
        )
        stage_score = fusion_result["score"]
        factors = fusion_result["factors"]

        hit["_fine_score"] = fine_score
        hit["_fine_fused_score"] = stage_score
        hit["_text_score"] = text_score
        hit["_knn_score"] = knn_score
        hit["_text_knn_score"] = knn_parts["text_knn_score"]
        hit["_image_knn_score"] = knn_parts["image_knn_score"]
        hit["_style_intent_selected_sku_boost"] = style_boost

        if not debug:
            continue

        ltr_features = _build_ltr_feature_block(
            signal_bundle=bundle,
            text_components=bundle["text_components"],
            knn_components=knn_parts,
            fine_score=fine_score,
            style_boost=style_boost,
            stage_score=stage_score,
        )
        row: Dict[str, Any] = {
            "doc_id": hit.get("_id"),
            "score": stage_score,
            "fine_score": fine_score,
            "text_score": text_score,
            "knn_score": knn_score,
            "fusion_inputs": fusion_result["inputs"],
            "fusion_factors": factors,
            "fusion_summary": fusion_result["summary"],
            "es_score": es_score,
            "fine_factor": factors.get("fine_score"),
            "es_factor": factors.get("es_score"),
            "text_factor": factors.get("text_score"),
            "knn_factor": factors.get("knn_score"),
            "style_intent_selected_sku": sku_selected,
            "style_intent_selected_sku_boost": style_boost,
            "ltr_features": ltr_features,
        }
        if input_debug_rows is not None and idx < len(input_debug_rows):
            row["rerank_input"] = input_debug_rows[idx]
        stage_debug.append(row)

    def _fine_key(candidate: Dict[str, Any]) -> Any:
        """Sort key: the fine fused score, falling back to the fine score."""
        return candidate.get("_fine_fused_score", candidate.get("_fine_score", 0.0))

    es_hits.sort(key=_fine_key, reverse=True)
    return scores, meta, stage_debug