Commit 5f7d7f091df41dc3d99151161f8ca8e9bf744ca4

Authored by tangwang
1 parent 28e57bb1

性能测试报告.md

api/models.py
... ... @@ -151,9 +151,12 @@ class SearchRequest(BaseModel):
151 151 min_score: Optional[float] = Field(None, ge=0, description="最小相关性分数阈值")
152 152 highlight: bool = Field(False, description="是否高亮搜索关键词(暂不实现)")
153 153 debug: bool = Field(False, description="是否返回调试信息")
154   - enable_rerank: bool = Field(
155   - False,
156   - description="是否开启重排(调用外部重排服务对 ES 结果进行二次排序)"
  154 + enable_rerank: Optional[bool] = Field(
  155 + None,
  156 + description=(
  157 + "是否开启重排(调用外部重排服务对 ES 结果进行二次排序)。"
  158 + "不传则使用服务端配置 rerank.enabled(默认开启)。"
  159 + )
157 160 )
158 161 rerank_query_template: Optional[str] = Field(
159 162 None,
... ...
api/routes/search.py
... ... @@ -133,6 +133,16 @@ async def search(request: SearchRequest, http_request: Request):
133 133  
134 134 # Include performance summary in response
135 135 performance_summary = context.get_summary() if context else None
  136 + stage_timings = {
  137 + k: round(v, 2) for k, v in context.performance_metrics.stage_timings.items()
  138 + }
  139 + total_ms = round(float(context.performance_metrics.total_duration or result.took_ms), 2)
  140 + context.logger.info(
  141 + "Before response | total_ms: %s | stage_timings_ms: %s",
  142 + total_ms,
  143 + stage_timings,
  144 + extra={'reqid': context.reqid, 'uid': context.uid}
  145 + )
136 146  
137 147 # Convert to response model
138 148 response = SearchResponse(
... ...
config/config.yaml
... ... @@ -113,6 +113,7 @@ function_score:
113 113  
114 114 # 重排配置(provider/URL 在 services.rerank)
115 115 rerank:
  116 + enabled: true
116 117 rerank_window: 1000
117 118 timeout_sec: 15.0
118 119 weight_es: 0.4
... ...
config/config_loader.py
... ... @@ -106,6 +106,7 @@ class RankingConfig:
106 106 @dataclass
107 107 class RerankConfig:
108 108 """重排配置(provider/URL 在 services.rerank)"""
  109 + enabled: bool = True
109 110 rerank_window: int = 1000
110 111 timeout_sec: float = 15.0
111 112 weight_es: float = 0.4
... ... @@ -310,6 +311,7 @@ class ConfigLoader:
310 311 # Parse Rerank (provider/URL in services.rerank)
311 312 rerank_data = config_data.get("rerank", {})
312 313 rerank = RerankConfig(
  314 + enabled=bool(rerank_data.get("enabled", True)),
313 315 rerank_window=int(rerank_data.get("rerank_window", 1000)),
314 316 timeout_sec=float(rerank_data.get("timeout_sec", 15.0)),
315 317 weight_es=float(rerank_data.get("weight_es", 0.4)),
... ... @@ -518,6 +520,7 @@ class ConfigLoader:
518 520 "functions": config.function_score.functions
519 521 },
520 522 "rerank": {
  523 + "enabled": config.rerank.enabled,
521 524 "rerank_window": config.rerank.rerank_window,
522 525 "timeout_sec": config.rerank.timeout_sec,
523 526 "weight_es": config.rerank.weight_es,
... ...
docs/性能测试报告.md 0 → 100644
... ... @@ -0,0 +1,230 @@
  1 +# 性能测试报告
  2 +
  3 +## 1. 文档目标
  4 +
  5 +本报告用于沉淀 `search / suggest / embedding / reranker` 四类接口的并发性能基线,并提供可复现的完整执行流程。
  6 +新同事可直接按本文命令重跑全流程,得到同结构结果文件并横向对比。
  7 +
  8 +## 2. 本次测试范围与方法
  9 +
  10 +测试范围:
  11 +- `backend_search` -> `POST /search/`
  12 +- `backend_suggest` -> `GET /search/suggestions`
  13 +- `embed_text` -> `POST /embed/text`
  14 +- `rerank` -> `POST /rerank`
  15 +
  16 +并发矩阵:
  17 +- `1 / 5 / 10 / 20`
  18 +
  19 +执行方式:
  20 +- 每组压测持续 `20s`
  21 +- 使用统一脚本 `scripts/perf_api_benchmark.py`
  22 +- 通过 `--scenario` 多值 + `--concurrency-list` 一次性跑完 `场景 x 并发`
  23 +
  24 +## 3. 压测工具优化说明(复用现有脚本)
  25 +
  26 +为了解决原脚本“一次只能跑一个场景+一个并发”的可用性问题,本次直接扩展现有脚本:
  27 +- `scripts/perf_api_benchmark.py`
  28 +
  29 +能力:
  30 +- 一条命令执行 `场景列表 x 并发列表` 全矩阵
  31 +- 输出单份 JSON 报告(含每组结果与 overall 汇总)
  32 +
  33 +示例:
  34 +
  35 +```bash
  36 +.venv/bin/python scripts/perf_api_benchmark.py \
  37 + --scenario backend_search,backend_suggest,embed_text,rerank \
  38 + --concurrency-list 1,5,10,20 \
  39 + --duration 20 \
  40 + --tenant-id 162 \
  41 + --output perf_reports/$(date +%F)/perf_matrix_report.json
  42 +```
  43 +
  44 +## 4. 测试环境快照(本次)
  45 +
  46 +时间:
  47 +- `2026-03-12 08:11:34 CST`
  48 +
  49 +代码版本:
  50 +- Git commit: `28e57bb`
  51 +- Python: `3.12.3`
  52 +
  53 +机器信息:
  54 +- OS: `Linux ai-db 6.8.0-71-generic`
  55 +- CPU: `Intel(R) Xeon(R) Platinum 8255C CPU @ 2.50GHz`
  56 +- vCPU: `8`
  57 +- 内存: `30Gi`(可用约 `15Gi`)
  58 +
  59 +服务健康:
  60 +- `GET http://127.0.0.1:6002/health` -> healthy
  61 +- `GET http://127.0.0.1:6005/health` -> embedding loaded (`tei`)
  62 +- `GET http://127.0.0.1:6006/health` -> translation healthy
  63 +- `GET http://127.0.0.1:6007/health` -> reranker loaded (`Qwen/Qwen3-Reranker-0.6B`)
  64 +
  65 +索引doc数/租户基本信息:
  66 +tenant_id = 162 :注意当前该租户总 doc 数只有53,reranker、suggest、search的性能指标跟租户的doc数高度相关。以后要补充一个大数据量租户的对照基线,以评估 doc 数增长后的性能变化。
  67 +```
  68 +curl -u 'saas:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_count?pretty' -H 'Content-Type: application/json' -d '{
  69 + "query": {
  70 + "match_all": {}
  71 + }
  72 +}'
  73 +```
  74 +
  75 +## 5. 执行前准备(可复现步骤)
  76 +
  77 +### 5.1 环境与依赖
  78 +
  79 +```bash
  80 +cd /data/saas-search
  81 +source activate.sh
  82 +.venv/bin/python --version
  83 +```
  84 +
  85 +### 5.2 启动服务
  86 +
  87 +推荐:
  88 +
  89 +```bash
  90 +./scripts/service_ctl.sh start embedding translator reranker backend
  91 +```
  92 +
  93 +如果 `backend` 未成功常驻,可临时手动启动:
  94 +
  95 +```bash
  96 +.venv/bin/python main.py serve --host 0.0.0.0 --port 6002 --es-host http://localhost:9200
  97 +```
  98 +
  99 +### 5.3 健康检查
  100 +
  101 +```bash
  102 +curl -sS http://127.0.0.1:6002/health
  103 +curl -sS http://127.0.0.1:6005/health
  104 +curl -sS http://127.0.0.1:6006/health
  105 +curl -sS http://127.0.0.1:6007/health
  106 +```
  107 +
  108 +## 6. 压测执行命令(本次实际)
  109 +
  110 +```bash
  111 +cd /data/saas-search
  112 +.venv/bin/python scripts/perf_api_benchmark.py \
  113 + --scenario backend_search,backend_suggest,embed_text,rerank \
  114 + --concurrency-list 1,5,10,20 \
  115 + --duration 20 \
  116 + --tenant-id 162 \
  117 + --backend-base http://127.0.0.1:6002 \
  118 + --embedding-base http://127.0.0.1:6005 \
  119 + --translator-base http://127.0.0.1:6006 \
  120 + --reranker-base http://127.0.0.1:6007 \
  121 + --output perf_reports/2026-03-12/perf_matrix_report.json
  122 +```
  123 +
  124 +产物文件:
  125 +- `perf_reports/2026-03-12/perf_matrix_report.json`
  126 +- `results[]` 中每条包含 `scenario + concurrency` 的单组结果
  127 +- `overall` 为本次执行总体汇总
  128 +
  129 +## 7. 结果总览(本次实测)
  130 +
  131 +### 7.1 Search(backend_search)
  132 +
  133 +| 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P95(ms) | Max(ms) |
  134 +|---:|---:|---:|---:|---:|---:|---:|
  135 +| 1 | 160 | 100.0% | 7.98 | 124.89 | 228.06 | 345.49 |
  136 +| 5 | 161 | 100.0% | 7.89 | 628.91 | 1271.49 | 1441.02 |
  137 +| 10 | 181 | 100.0% | 8.78 | 1129.23 | 1295.88 | 1330.96 |
  138 +| 20 | 161 | 100.0% | 7.63 | 2594.00 | 4706.44 | 4783.05 |
  139 +
  140 +### 7.2 Suggest(backend_suggest)
  141 +
  142 +| 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P95(ms) | Max(ms) |
  143 +|---:|---:|---:|---:|---:|---:|---:|
  144 +| 1 | 3502 | 100.0% | 175.09 | 5.68 | 8.70 | 15.98 |
  145 +| 5 | 4168 | 100.0% | 208.10 | 23.93 | 36.93 | 59.53 |
  146 +| 10 | 4152 | 100.0% | 207.25 | 48.05 | 59.45 | 127.20 |
  147 +| 20 | 4190 | 100.0% | 208.99 | 95.20 | 110.74 | 181.37 |
  148 +
  149 +### 7.3 Embedding(embed_text)
  150 +
  151 +| 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P95(ms) | Max(ms) |
  152 +|---:|---:|---:|---:|---:|---:|---:|
  153 +| 1 | 966 | 100.0% | 48.27 | 20.63 | 23.41 | 49.80 |
  154 +| 5 | 1796 | 100.0% | 89.57 | 55.55 | 69.62 | 109.84 |
  155 +| 10 | 2095 | 100.0% | 104.42 | 95.22 | 117.66 | 152.48 |
  156 +| 20 | 2393 | 100.0% | 118.70 | 167.37 | 212.21 | 318.70 |
  157 +
  158 +### 7.4 Reranker(rerank)
  159 +
  160 +| 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P95(ms) | Max(ms) |
  161 +|---:|---:|---:|---:|---:|---:|---:|
  162 +| 1 | 802 | 100.0% | 40.06 | 24.87 | 37.45 | 49.63 |
  163 +| 5 | 796 | 100.0% | 39.53 | 125.70 | 190.02 | 218.60 |
  164 +| 10 | 853 | 100.0% | 41.89 | 235.87 | 315.37 | 402.27 |
  165 +| 20 | 836 | 100.0% | 40.92 | 481.98 | 723.56 | 781.81 |
  166 +
  167 +## 8. 指标解读与并发建议
  168 +
  169 +### 8.1 关键观察
  170 +
  171 +- `backend_search`:吞吐约 `8 rps` 平台化,延迟随并发上升明显,属于重链路(检索+向量+重排)特征。
  172 +- `backend_suggest`:吞吐高且稳定(约 `200+ rps`),对并发更友好。
  173 +- `embed_text`:随并发提升吞吐持续增长,延迟平滑上升,扩展性较好。
  174 +- `rerank`:吞吐在 `~40 rps` 附近平台化,延迟随并发线性抬升,符合模型推理瓶颈特征。
  175 +
  176 +### 8.2 并发压测建议
  177 +
  178 +- 冒烟并发:`1/5`
  179 +- 常规回归:`1/5/10/20`
  180 +- 稳态评估:建议把 `duration` 提升到 `60~300s`
  181 +- 峰值评估:在确认 timeout 与 max_errors 策略后,追加 `30/50` 并发
  182 +
  183 +## 9. 如何复现“完整全过程”
  184 +
  185 +1. 准备环境(第 5 节)
  186 +2. 启动服务并通过健康检查
  187 +3. 执行矩阵命令(第 6 节)
  188 +4. 查看结果:
  189 + - 原始明细:`perf_reports/<date>/perf_matrix_report.json` 的 `results[]`
  190 + - 汇总结果:同文件中的 `overall`
  191 +5. 若需导出到周报或 PR,直接拷贝本报告第 7 节四张表
  192 +
  193 +## 10. 常见问题与排障
  194 +
  195 +### 10.1 backend 端口起来又掉
  196 +
  197 +现象:
  198 +- `service_ctl status backend` 显示 `running=no`
  199 +
  200 +处理:
  201 +- 先看 `logs/backend.log`
  202 +- 用手动命令前台启动,确认根因:
  203 +
  204 +```bash
  205 +.venv/bin/python main.py serve --host 0.0.0.0 --port 6002 --es-host http://localhost:9200
  206 +```
  207 +
  208 +### 10.2 压测脚本依赖缺失
  209 +
  210 +现象:
  211 +- 报 `ModuleNotFoundError: httpx`
  212 +
  213 +处理:
  214 +- 使用项目虚拟环境执行:
  215 +
  216 +```bash
  217 +.venv/bin/python scripts/perf_api_benchmark.py -h
  218 +```
  219 +
  220 +### 10.3 某场景成功率下降
  221 +
  222 +排查顺序:
  223 +1. 看 `errors` 字段(HTTP码、timeout、payload校验失败)
  224 +2. 检查对应服务健康与日志
  225 +3. 缩小并发重跑单场景定位阈值
  226 +
  227 +## 11. 关联文件
  228 +
  229 +- 压测脚本:`scripts/perf_api_benchmark.py`
  230 +- 本次结果:`perf_reports/2026-03-12/perf_matrix_report.json`
... ...
docs/搜索API对接指南.md
... ... @@ -201,7 +201,7 @@ response = requests.post(url, headers=headers, json={"query": "芭比娃娃"})
201 201 "min_score": 0.0,
202 202 "sku_filter_dimension": ["string"],
203 203 "debug": false,
204   - "enable_rerank": false,
  204 + "enable_rerank": null,
205 205 "rerank_query_template": "{query}",
206 206 "rerank_doc_template": "{title}",
207 207 "user_id": "string",
... ... @@ -225,7 +225,7 @@ response = requests.post(url, headers=headers, json={&quot;query&quot;: &quot;芭比娃娃&quot;})
225 225 | `min_score` | float | N | null | 最小相关性分数阈值 |
226 226 | `sku_filter_dimension` | array[string] | N | null | 子SKU筛选维度列表(见[SKU筛选维度](#35-sku筛选维度)) |
227 227 | `debug` | boolean | N | false | 是否返回调试信息 |
228   -| `enable_rerank` | boolean | N | false | 是否开启重排(调用外部重排服务对 ES 结果进行二次排序)。开启后若 `from+size<=rerank_window` 才会触发重排 |
  228 +| `enable_rerank` | boolean/null | N | null | 是否开启重排(调用外部重排服务对 ES 结果进行二次排序)。不传/传 null 使用服务端 `rerank.enabled`(默认开启)。开启后会先对 ES Top1000(`rerank_window`)重排,再按分页截取;若 `from+size>1000`,则不重排,直接按分页从 ES 返回 |
229 229 | `rerank_query_template` | string | N | null | 重排 query 模板(可选)。支持 `{query}` 占位符;不传则使用服务端配置 |
230 230 | `rerank_doc_template` | string | N | null | 重排 doc 模板(可选)。支持 `{title} {brief} {vendor} {description} {category_path}`;不传则使用服务端配置 |
231 231 | `user_id` | string | N | null | 用户ID(用于个性化,预留) |
... ...
perf_reports/2026-03-12/matrix_report/summary.md 0 → 100644
... ... @@ -0,0 +1,41 @@
  1 +# 性能测试矩阵结果
  2 +
  3 +- 生成时间: 2026-03-12 08:11:03
  4 +- 场景: backend_search, backend_suggest, embed_text, rerank
  5 +- 并发: 1, 5, 10, 20
  6 +
  7 +## backend_search
  8 +
  9 +| 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P50 | P90 | P95 | P99 | Max |
  10 +|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
  11 +| 1 | 160 | 100.0% | 7.98 | 124.89 | 109.37 | 162.61 | 228.06 | 329.57 | 345.49 |
  12 +| 5 | 161 | 100.0% | 7.89 | 628.91 | 541.87 | 726.7 | 1271.49 | 1285.88 | 1441.02 |
  13 +| 10 | 181 | 100.0% | 8.78 | 1129.23 | 1100.46 | 1251.53 | 1295.88 | 1320.78 | 1330.96 |
  14 +| 20 | 161 | 100.0% | 7.63 | 2594.0 | 2303.96 | 4681.35 | 4706.44 | 4727.58 | 4783.05 |
  15 +
  16 +## backend_suggest
  17 +
  18 +| 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P50 | P90 | P95 | P99 | Max |
  19 +|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
  20 +| 1 | 3502 | 100.0% | 175.09 | 5.68 | 4.36 | 8.42 | 8.7 | 10.43 | 15.98 |
  21 +| 5 | 4168 | 100.0% | 208.1 | 23.93 | 21.72 | 35.72 | 36.93 | 42.08 | 59.53 |
  22 +| 10 | 4152 | 100.0% | 207.25 | 48.05 | 46.72 | 55.72 | 59.45 | 70.74 | 127.2 |
  23 +| 20 | 4190 | 100.0% | 208.99 | 95.2 | 93.51 | 104.44 | 110.74 | 164.22 | 181.37 |
  24 +
  25 +## embed_text
  26 +
  27 +| 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P50 | P90 | P95 | P99 | Max |
  28 +|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
  29 +| 1 | 966 | 100.0% | 48.27 | 20.63 | 20.0 | 21.14 | 23.41 | 30.03 | 49.8 |
  30 +| 5 | 1796 | 100.0% | 89.57 | 55.55 | 54.43 | 66.64 | 69.62 | 75.85 | 109.84 |
  31 +| 10 | 2095 | 100.0% | 104.42 | 95.22 | 98.74 | 112.52 | 117.66 | 135.94 | 152.48 |
  32 +| 20 | 2393 | 100.0% | 118.7 | 167.37 | 169.0 | 198.66 | 212.21 | 251.56 | 318.7 |
  33 +
  34 +## rerank
  35 +
  36 +| 并发 | 请求数 | 成功率 | 吞吐(RPS) | Avg(ms) | P50 | P90 | P95 | P99 | Max |
  37 +|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
  38 +| 1 | 802 | 100.0% | 40.06 | 24.87 | 23.0 | 30.96 | 37.45 | 43.98 | 49.63 |
  39 +| 5 | 796 | 100.0% | 39.53 | 125.7 | 113.04 | 178.39 | 190.02 | 202.09 | 218.6 |
  40 +| 10 | 853 | 100.0% | 41.89 | 235.87 | 224.75 | 274.4 | 315.37 | 383.74 | 402.27 |
  41 +| 20 | 836 | 100.0% | 40.92 | 481.98 | 454.32 | 565.75 | 723.56 | 764.15 | 781.81 |
... ...
scripts/perf_api_benchmark.py
... ... @@ -398,13 +398,34 @@ def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
398 398 }
399 399  
400 400  
def parse_csv_items(raw: str) -> List[str]:
    """Split a comma-separated string into stripped, non-empty tokens."""
    items: List[str] = []
    for piece in str(raw or "").split(","):
        piece = piece.strip()
        if piece:
            items.append(piece)
    return items
  403 +
  404 +
def parse_csv_ints(raw: str) -> List[int]:
    """
    Parse a CSV string of positive integers, de-duplicated in first-seen order.

    Raises:
        ValueError: when a token is not an integer, or its value is <= 0.
    """
    ordered: List[int] = []
    unique: set = set()
    # Tokenize exactly like parse_csv_items: split on commas, strip, drop empties.
    for token in (t.strip() for t in str(raw or "").split(",")):
        if not token:
            continue
        try:
            value = int(token)
        except ValueError as exc:
            raise ValueError(f"Invalid integer in CSV list: {token}") from exc
        if value <= 0:
            raise ValueError(f"Concurrency must be > 0, got {value}")
        if value not in unique:
            unique.add(value)
            ordered.append(value)
    return ordered
  420 +
  421 +
401 422 def parse_args() -> argparse.Namespace:
402 423 parser = argparse.ArgumentParser(description="Interface-level load test for search and related microservices")
403 424 parser.add_argument(
404 425 "--scenario",
405 426 type=str,
406 427 default="all",
407   - help="Scenario: backend_search | backend_suggest | embed_text | translate | rerank | all",
  428 + help="Scenario: backend_search | backend_suggest | embed_text | translate | rerank | all | comma-separated list",
408 429 )
409 430 parser.add_argument("--tenant-id", type=str, default="162", help="Tenant ID for backend search/suggest")
410 431 parser.add_argument("--duration", type=int, default=30, help="Duration seconds per scenario; <=0 means no duration cap")
... ... @@ -421,6 +442,12 @@ def parse_args() -> argparse.Namespace:
421 442 parser.add_argument("--cases-file", type=str, default="", help="Optional JSON file to override/add request templates")
422 443 parser.add_argument("--output", type=str, default="", help="Optional output JSON path")
423 444 parser.add_argument("--pause", type=float, default=0.0, help="Pause seconds between scenarios in all mode")
  445 + parser.add_argument(
  446 + "--concurrency-list",
  447 + type=str,
  448 + default="",
  449 + help="Comma-separated concurrency list (e.g. 1,5,10,20). If set, overrides --concurrency.",
  450 + )
424 451 return parser.parse_args()
425 452  
426 453  
... ... @@ -432,21 +459,38 @@ async def main_async() -> int:
432 459 if args.scenario == "all":
433 460 run_names = [x for x in all_names if x in scenarios]
434 461 else:
435   - if args.scenario not in scenarios:
436   - print(f"Unknown scenario: {args.scenario}")
  462 + requested = parse_csv_items(args.scenario)
  463 + if not requested:
  464 + print("No scenario specified.")
  465 + return 2
  466 + unknown = [name for name in requested if name not in scenarios]
  467 + if unknown:
  468 + print(f"Unknown scenario(s): {', '.join(unknown)}")
437 469 print(f"Available: {', '.join(sorted(scenarios.keys()))}")
438 470 return 2
439   - run_names = [args.scenario]
  471 + run_names = requested
440 472  
441 473 if not run_names:
442 474 print("No scenarios to run.")
443 475 return 2
444 476  
  477 + concurrency_values = [args.concurrency]
  478 + if args.concurrency_list:
  479 + try:
  480 + concurrency_values = parse_csv_ints(args.concurrency_list)
  481 + except ValueError as exc:
  482 + print(str(exc))
  483 + return 2
  484 + if not concurrency_values:
  485 + print("concurrency-list is empty after parsing.")
  486 + return 2
  487 +
445 488 print("Load test config:")
446 489 print(f" scenario={args.scenario}")
447 490 print(f" tenant_id={args.tenant_id}")
448 491 print(f" duration={args.duration}s")
449 492 print(f" concurrency={args.concurrency}")
  493 + print(f" concurrency_list={concurrency_values}")
450 494 print(f" max_requests={args.max_requests}")
451 495 print(f" timeout={args.timeout}s")
452 496 print(f" max_errors={args.max_errors}")
... ... @@ -456,29 +500,36 @@ async def main_async() -&gt; int:
456 500 print(f" reranker_base={args.reranker_base}")
457 501  
458 502 results: List[Dict[str, Any]] = []
459   - for i, name in enumerate(run_names, start=1):
  503 + total_jobs = len(run_names) * len(concurrency_values)
  504 + job_idx = 0
  505 + for name in run_names:
460 506 scenario = scenarios[name]
461   - print(f"\n[{i}/{len(run_names)}] running {name} ...")
462   - result = await run_single_scenario(
463   - scenario=scenario,
464   - duration_sec=args.duration,
465   - concurrency=args.concurrency,
466   - max_requests=args.max_requests,
467   - max_errors=args.max_errors,
468   - )
469   - print(format_summary(result))
470   - results.append(result)
471   -
472   - if args.pause > 0 and i < len(run_names):
473   - await asyncio.sleep(args.pause)
  507 + for c in concurrency_values:
  508 + job_idx += 1
  509 + print(f"\n[{job_idx}/{total_tasks if False else total_jobs}] running {name} @ concurrency={c} ...")
  510 + result = await run_single_scenario(
  511 + scenario=scenario,
  512 + duration_sec=args.duration,
  513 + concurrency=c,
  514 + max_requests=args.max_requests,
  515 + max_errors=args.max_errors,
  516 + )
  517 + result["concurrency"] = c
  518 + print(format_summary(result))
  519 + results.append(result)
  520 +
  521 + if args.pause > 0 and job_idx < total_jobs:
  522 + await asyncio.sleep(args.pause)
474 523  
475 524 final = {
476 525 "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
477 526 "config": {
478 527 "scenario": args.scenario,
  528 + "run_names": run_names,
479 529 "tenant_id": args.tenant_id,
480 530 "duration_sec": args.duration,
481 531 "concurrency": args.concurrency,
  532 + "concurrency_list": concurrency_values,
482 533 "max_requests": args.max_requests,
483 534 "timeout_sec": args.timeout,
484 535 "max_errors": args.max_errors,
... ...
search/es_query_builder.py
... ... @@ -815,45 +815,6 @@ class ESQueryBuilder:
815 815  
816 816 return filter_clauses
817 817  
818   - def add_spu_collapse(
819   - self,
820   - es_query: Dict[str, Any],
821   - spu_field: str,
822   - inner_hits_size: int = 3
823   - ) -> Dict[str, Any]:
824   - """
825   - Add SPU aggregation/collapse to query.
826   -
827   - Args:
828   - es_query: Existing ES query
829   - spu_field: Field containing SPU ID
830   - inner_hits_size: Number of SKUs to return per SPU
831   -
832   - Returns:
833   - Modified ES query
834   - """
835   - # Add collapse
836   - es_query["collapse"] = {
837   - "field": spu_field,
838   - "inner_hits": {
839   - "_source": False,
840   - "name": "top_docs",
841   - "size": inner_hits_size
842   - }
843   - }
844   -
845   - # Add cardinality aggregation to count unique SPUs
846   - if "aggs" not in es_query:
847   - es_query["aggs"] = {}
848   -
849   - es_query["aggs"]["unique_count"] = {
850   - "cardinality": {
851   - "field": spu_field
852   - }
853   - }
854   -
855   - return es_query
856   -
857 818 def add_sorting(
858 819 self,
859 820 es_query: Dict[str, Any],
... ...
search/searcher.py
... ... @@ -9,6 +9,7 @@ import os
9 9 import time, json
10 10 import logging
11 11 import hashlib
  12 +from string import Formatter
12 13  
13 14 from utils.es_client import ESClient
14 15 from query import QueryParser, ParsedQuery
... ... @@ -157,6 +158,75 @@ class Searcher:
157 158 return
158 159 es_query["_source"] = {"includes": self.source_fields}
159 160  
  161 + def _resolve_rerank_source_filter(self, doc_template: str) -> Dict[str, Any]:
  162 + """
  163 + Build a lightweight _source filter for rerank prefetch.
  164 +
  165 + Only fetch fields required by rerank doc template to reduce ES payload.
  166 + """
  167 + field_map = {
  168 + "title": "title",
  169 + "brief": "brief",
  170 + "vendor": "vendor",
  171 + "description": "description",
  172 + "category_path": "category_path",
  173 + }
  174 + includes: set[str] = set()
  175 + template = str(doc_template or "{title}")
  176 + for _, field_name, _, _ in Formatter().parse(template):
  177 + if not field_name:
  178 + continue
  179 + key = field_name.split(".", 1)[0].split("!", 1)[0].split(":", 1)[0]
  180 + mapped = field_map.get(key)
  181 + if mapped:
  182 + includes.add(mapped)
  183 +
  184 + # Fallback to title-only to keep rerank docs usable.
  185 + if not includes:
  186 + includes.add("title")
  187 +
  188 + return {"includes": sorted(includes)}
  189 +
  190 + def _fetch_hits_by_ids(
  191 + self,
  192 + index_name: str,
  193 + doc_ids: List[str],
  194 + source_spec: Optional[Any],
  195 + ) -> tuple[Dict[str, Dict[str, Any]], int]:
  196 + """
  197 + Fetch page documents by IDs for final response fill.
  198 +
  199 + Returns:
  200 + (hits_by_id, es_took_ms)
  201 + """
  202 + if not doc_ids:
  203 + return {}, 0
  204 +
  205 + body: Dict[str, Any] = {
  206 + "query": {
  207 + "ids": {
  208 + "values": doc_ids,
  209 + }
  210 + }
  211 + }
  212 + if source_spec is not None:
  213 + body["_source"] = source_spec
  214 +
  215 + resp = self.es_client.search(
  216 + index_name=index_name,
  217 + body=body,
  218 + size=len(doc_ids),
  219 + from_=0,
  220 + )
  221 + hits = resp.get("hits", {}).get("hits") or []
  222 + hits_by_id: Dict[str, Dict[str, Any]] = {}
  223 + for hit in hits:
  224 + hid = hit.get("_id")
  225 + if hid is None:
  226 + continue
  227 + hits_by_id[str(hid)] = hit
  228 + return hits_by_id, int(resp.get("took", 0) or 0)
  229 +
160 230 def search(
161 231 self,
162 232 query: str,
... ... @@ -173,7 +243,7 @@ class Searcher:
173 243 debug: bool = False,
174 244 language: str = "en",
175 245 sku_filter_dimension: Optional[List[str]] = None,
176   - enable_rerank: bool = False,
  246 + enable_rerank: Optional[bool] = None,
177 247 rerank_query_template: Optional[str] = None,
178 248 rerank_doc_template: Optional[str] = None,
179 249 ) -> SearchResult:
... ... @@ -206,9 +276,13 @@ class Searcher:
206 276 index_langs = tenant_cfg.get("index_languages") or []
207 277 enable_translation = len(index_langs) > 0
208 278 enable_embedding = self.config.query_config.enable_text_embedding
209   - # 重排仅由请求参数 enable_rerank 控制,唯一实现为调用外部 BGE 重排服务
210   - do_rerank = bool(enable_rerank)
211   - rerank_window = self.config.rerank.rerank_window or 1000
  279 + rc = self.config.rerank
  280 + effective_query_template = rerank_query_template or rc.rerank_query_template
  281 + effective_doc_template = rerank_doc_template or rc.rerank_doc_template
  282 + # 重排开关优先级:请求参数显式传值 > 服务端配置(默认开启)
  283 + rerank_enabled_by_config = bool(rc.enabled)
  284 + do_rerank = rerank_enabled_by_config if enable_rerank is None else bool(enable_rerank)
  285 + rerank_window = rc.rerank_window or 1000
212 286 # 若开启重排且请求范围在窗口内:从 ES 取前 rerank_window 条、重排后再按 from/size 分页;否则不重排,按原 from/size 查 ES
213 287 in_rerank_window = do_rerank and (from_ + size) <= rerank_window
214 288 es_fetch_from = 0 if in_rerank_window else from_
... ... @@ -219,7 +293,9 @@ class Searcher:
219 293  
220 294 context.logger.info(
221 295 f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, "
222   - f"enable_rerank={do_rerank}, in_rerank_window={in_rerank_window}, es_fetch=({es_fetch_from},{es_fetch_size}) | "
  296 + f"enable_rerank(request)={enable_rerank}, enable_rerank(config)={rerank_enabled_by_config}, "
  297 + f"enable_rerank(effective)={do_rerank}, in_rerank_window={in_rerank_window}, "
  298 + f"es_fetch=({es_fetch_from},{es_fetch_size}) | "
223 299 f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}",
224 300 extra={'reqid': context.reqid, 'uid': context.uid}
225 301 )
... ... @@ -231,8 +307,10 @@ class Searcher:
231 307 'es_fetch_from': es_fetch_from,
232 308 'es_fetch_size': es_fetch_size,
233 309 'in_rerank_window': in_rerank_window,
234   - 'rerank_query_template': rerank_query_template,
235   - 'rerank_doc_template': rerank_doc_template,
  310 + 'rerank_enabled_by_config': rerank_enabled_by_config,
  311 + 'enable_rerank_request': enable_rerank,
  312 + 'rerank_query_template': effective_query_template,
  313 + 'rerank_doc_template': effective_doc_template,
236 314 'filters': filters,
237 315 'range_filters': range_filters,
238 316 'facets': facets,
... ... @@ -323,26 +401,40 @@ class Searcher:
323 401 if sort_by:
324 402 es_query = self.query_builder.add_sorting(es_query, sort_by, sort_order)
325 403  
  404 + # Keep requested response _source semantics for the final response fill.
  405 + response_source_spec = es_query.get("_source")
  406 +
  407 + # In rerank window, first pass only fetches minimal fields required by rerank template.
  408 + es_query_for_fetch = es_query
  409 + rerank_prefetch_source = None
  410 + if in_rerank_window:
  411 + rerank_prefetch_source = self._resolve_rerank_source_filter(effective_doc_template)
  412 + es_query_for_fetch = dict(es_query)
  413 + es_query_for_fetch["_source"] = rerank_prefetch_source
  414 +
326 415 # Extract size and from from body for ES client parameters
327   - body_for_es = {k: v for k, v in es_query.items() if k not in ['size', 'from']}
  416 + body_for_es = {k: v for k, v in es_query_for_fetch.items() if k not in ['size', 'from']}
328 417  
329 418 # Store ES query in context
330 419 context.store_intermediate_result('es_query', es_query)
  420 + if in_rerank_window and rerank_prefetch_source is not None:
  421 + context.store_intermediate_result('es_query_rerank_prefetch_source', rerank_prefetch_source)
331 422 context.store_intermediate_result('es_body_for_search', body_for_es)
332 423  
333 424 # Serialize ES query to compute a compact size + stable digest for correlation
334   - es_query_compact = json.dumps(es_query, ensure_ascii=False, separators=(",", ":"))
  425 + es_query_compact = json.dumps(es_query_for_fetch, ensure_ascii=False, separators=(",", ":"))
335 426 es_query_digest = hashlib.sha256(es_query_compact.encode("utf-8")).hexdigest()[:16]
336 427 knn_enabled = bool(enable_embedding and parsed_query.query_vector is not None)
337 428 vector_dims = int(len(parsed_query.query_vector)) if parsed_query.query_vector is not None else 0
338 429  
339 430 context.logger.info(
340   - "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | facets: %s",
  431 + "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | facets: %s | rerank_prefetch_source: %s",
341 432 len(es_query_compact),
342 433 es_query_digest,
343 434 "yes" if knn_enabled else "no",
344 435 vector_dims,
345 436 "yes" if facets else "no",
  437 + rerank_prefetch_source,
346 438 extra={'reqid': context.reqid, 'uid': context.uid}
347 439 )
348 440 _log_backend_verbose({
... ... @@ -355,7 +447,7 @@ class Searcher:
355 447 "knn_enabled": knn_enabled,
356 448 "vector_dims": vector_dims,
357 449 "has_facets": bool(facets),
358   - "query": es_query,
  450 + "query": es_query_for_fetch,
359 451 })
360 452 except Exception as e:
361 453 context.set_error(e)
... ... @@ -406,9 +498,6 @@ class Searcher:
406 498 from .rerank_client import run_rerank
407 499  
408 500 rerank_query = parsed_query.original_query if parsed_query else query
409   - rc = self.config.rerank
410   - effective_query_template = rerank_query_template or rc.rerank_query_template
411   - effective_doc_template = rerank_doc_template or rc.rerank_doc_template
412 501 es_response, rerank_meta, fused_debug = run_rerank(
413 502 query=rerank_query,
414 503 es_response=es_response,
... ... @@ -457,6 +546,41 @@ class Searcher:
457 546 es_response["hits"]["max_score"] = 0.0
458 547 else:
459 548 es_response["hits"]["max_score"] = 0.0
  549 +
  550 + # Page fill: fetch detailed fields only for final page hits.
  551 + if sliced:
  552 + if response_source_spec is False:
  553 + for hit in sliced:
  554 + hit.pop("_source", None)
  555 + context.logger.info(
  556 + "分页详情回填跳过 | 原查询 _source=false",
  557 + extra={'reqid': context.reqid, 'uid': context.uid}
  558 + )
  559 + else:
  560 + page_ids = [str(h.get("_id")) for h in sliced if h.get("_id") is not None]
  561 + details_by_id, fill_took = self._fetch_hits_by_ids(
  562 + index_name=index_name,
  563 + doc_ids=page_ids,
  564 + source_spec=response_source_spec,
  565 + )
  566 + filled = 0
  567 + for hit in sliced:
  568 + hid = hit.get("_id")
  569 + if hid is None:
  570 + continue
  571 + detail_hit = details_by_id.get(str(hid))
  572 + if detail_hit is None:
  573 + continue
  574 + if "_source" in detail_hit:
  575 + hit["_source"] = detail_hit.get("_source") or {}
  576 + filled += 1
  577 + if fill_took:
  578 + es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took)
  579 + context.logger.info(
  580 + f"分页详情回填 | ids={len(page_ids)} | filled={filled} | took={fill_took}ms",
  581 + extra={'reqid': context.reqid, 'uid': context.uid}
  582 + )
  583 +
460 584 context.logger.info(
461 585 f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条",
462 586 extra={'reqid': context.reqid, 'uid': context.uid}
... ...
tests/test_search_rerank_window.py 0 → 100644
... ... @@ -0,0 +1,314 @@
  1 +from __future__ import annotations
  2 +
  3 +from dataclasses import dataclass
  4 +from pathlib import Path
  5 +from types import SimpleNamespace
  6 +from typing import Any, Dict, List
  7 +
  8 +import yaml
  9 +
  10 +from config import (
  11 + ConfigLoader,
  12 + FunctionScoreConfig,
  13 + IndexConfig,
  14 + QueryConfig,
  15 + RankingConfig,
  16 + RerankConfig,
  17 + SPUConfig,
  18 + SearchConfig,
  19 +)
  20 +from context import create_request_context
  21 +from search.searcher import Searcher
  22 +
  23 +
  24 +@dataclass
  25 +class _FakeParsedQuery:
  26 + original_query: str
  27 + query_normalized: str
  28 + rewritten_query: str
  29 + detected_language: str = "en"
  30 + translations: Dict[str, str] = None
  31 + query_vector: Any = None
  32 + domain: str = "default"
  33 +
  34 + def to_dict(self) -> Dict[str, Any]:
  35 + return {
  36 + "original_query": self.original_query,
  37 + "query_normalized": self.query_normalized,
  38 + "rewritten_query": self.rewritten_query,
  39 + "detected_language": self.detected_language,
  40 + "translations": self.translations or {},
  41 + "domain": self.domain,
  42 + }
  43 +
  44 +
class _FakeQueryParser:
    """Parser stub: no translation, no vector — simply echoes the query."""

    def parse(self, query: str, tenant_id: str, generate_vector: bool, context: Any):
        echoed = _FakeParsedQuery(
            original_query=query,
            query_normalized=query,
            rewritten_query=query,
            translations={},
        )
        return echoed
  53 +
  54 +
  55 +class _FakeQueryBuilder:
  56 + def build_query(self, **kwargs):
  57 + return {
  58 + "query": {"match_all": {}},
  59 + "size": kwargs["size"],
  60 + "from": kwargs["from_"],
  61 + }
  62 +
  63 + def build_facets(self, facets: Any):
  64 + return {}
  65 +
  66 + def add_sorting(self, es_query: Dict[str, Any], sort_by: str, sort_order: str):
  67 + return es_query
  68 +
  69 +
  70 +class _FakeESClient:
  71 + def __init__(self, total_hits: int = 5000):
  72 + self.calls: List[Dict[str, Any]] = []
  73 + self.total_hits = total_hits
  74 +
  75 + @staticmethod
  76 + def _apply_source_filter(src: Dict[str, Any], source_spec: Any) -> Dict[str, Any]:
  77 + if source_spec is None:
  78 + return dict(src)
  79 + if source_spec is False:
  80 + return {}
  81 + if isinstance(source_spec, dict):
  82 + includes = source_spec.get("includes") or []
  83 + elif isinstance(source_spec, list):
  84 + includes = source_spec
  85 + else:
  86 + includes = []
  87 + if not includes:
  88 + return dict(src)
  89 + return {k: v for k, v in src.items() if k in set(includes)}
  90 +
  91 + @staticmethod
  92 + def _full_source(doc_id: str) -> Dict[str, Any]:
  93 + return {
  94 + "spu_id": doc_id,
  95 + "title": {"en": f"product-{doc_id}"},
  96 + "brief": {"en": f"brief-{doc_id}"},
  97 + "vendor": {"en": f"vendor-{doc_id}"},
  98 + "skus": [],
  99 + }
  100 +
  101 + def search(self, index_name: str, body: Dict[str, Any], size: int, from_: int):
  102 + self.calls.append(
  103 + {"index_name": index_name, "body": body, "size": size, "from_": from_}
  104 + )
  105 + ids_query = (((body or {}).get("query") or {}).get("ids") or {}).get("values")
  106 + source_spec = (body or {}).get("_source")
  107 +
  108 + if isinstance(ids_query, list):
  109 + # Return reversed order intentionally; caller should restore original ranking order.
  110 + ids = [str(i) for i in ids_query][::-1]
  111 + hits = []
  112 + for doc_id in ids:
  113 + src = self._apply_source_filter(self._full_source(doc_id), source_spec)
  114 + hit = {"_id": doc_id, "_score": 1.0}
  115 + if source_spec is not False:
  116 + hit["_source"] = src
  117 + hits.append(hit)
  118 + else:
  119 + end = min(from_ + size, self.total_hits)
  120 + hits = []
  121 + for i in range(from_, end):
  122 + doc_id = str(i)
  123 + src = self._apply_source_filter(self._full_source(doc_id), source_spec)
  124 + hit = {"_id": doc_id, "_score": float(self.total_hits - i)}
  125 + if source_spec is not False:
  126 + hit["_source"] = src
  127 + hits.append(hit)
  128 +
  129 + return {
  130 + "took": 8,
  131 + "hits": {
  132 + "total": {"value": self.total_hits},
  133 + "max_score": hits[0]["_score"] if hits else 0.0,
  134 + "hits": hits,
  135 + },
  136 + }
  137 +
  138 +
def _build_search_config(*, rerank_enabled: bool = True, rerank_window: int = 1000):
    """Assemble a minimal SearchConfig; only the rerank knobs vary per test."""
    minimal_index = IndexConfig(name="default", label="default", fields=["title.en"])
    query_cfg = QueryConfig(enable_text_embedding=False, enable_query_rewrite=False)
    rerank_cfg = RerankConfig(enabled=rerank_enabled, rerank_window=rerank_window)
    return SearchConfig(
        field_boosts={"title.en": 3.0},
        indexes=[minimal_index],
        query_config=query_cfg,
        ranking=RankingConfig(),
        function_score=FunctionScoreConfig(),
        rerank=rerank_cfg,
        spu_config=SPUConfig(enabled=False),
        es_index_name="test_products",
        tenant_config={},
        es_settings={},
        services={},
    )
  153 +
  154 +
def _build_searcher(config: SearchConfig, es_client: _FakeESClient) -> Searcher:
    """Wire a Searcher to the fake ES client/parser and swap in the fake builder."""
    instance = Searcher(
        es_client=es_client,
        config=config,
        query_parser=_FakeQueryParser(),
    )
    instance.query_builder = _FakeQueryBuilder()
    return instance
  163 +
  164 +
def test_config_loader_rerank_enabled_defaults_true(tmp_path: Path):
    """A YAML config that omits rerank.enabled must load with it True."""
    raw_config = {
        "es_index_name": "test_products",
        "field_boosts": {"title.en": 3.0},
        "indexes": [{"name": "default", "label": "default", "fields": ["title.en"]}],
        "query_config": {"supported_languages": ["en"], "default_language": "en"},
        "spu_config": {"enabled": False},
        "ranking": {"expression": "bm25()", "description": "test"},
        "function_score": {"score_mode": "sum", "boost_mode": "multiply", "functions": []},
        # Deliberately no "enabled" key here — that is what this test exercises.
        "rerank": {"rerank_window": 1000},
    }
    yaml_file = tmp_path / "config.yaml"
    yaml_file.write_text(yaml.safe_dump(raw_config), encoding="utf-8")

    loaded = ConfigLoader(yaml_file).load_config(validate=False)

    assert loaded.rerank.enabled is True
  183 +
  184 +
def test_searcher_reranks_top_window_by_default(monkeypatch):
    """With enable_rerank unset (None), the searcher prefetches the full
    rerank window from offset 0, reranks once, then hydrates only the
    requested page via an ids query — preserving rerank order."""
    fake_es = _FakeESClient()
    searcher = _build_searcher(_build_search_config(rerank_enabled=True), fake_es)
    ctx = create_request_context(reqid="t1", uid="u1")

    monkeypatch.setattr(
        "search.searcher.get_tenant_config_loader",
        lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}),
    )

    rerank_stats: Dict[str, Any] = {"count": 0, "docs": 0}

    def _record_rerank(**kwargs):
        rerank_stats["count"] += 1
        rerank_stats["docs"] = len(kwargs["es_response"]["hits"]["hits"])
        return kwargs["es_response"], None, []

    monkeypatch.setattr("search.rerank_client.run_rerank", _record_rerank)

    result = searcher.search(
        query="toy",
        tenant_id="162",
        from_=20,
        size=10,
        context=ctx,
        enable_rerank=None,
    )

    # Rerank ran exactly once, over the entire 1000-doc window.
    assert rerank_stats == {"count": 1, "docs": 1000}
    # Exactly two ES round-trips: window prefetch, then page hydration.
    first_call, second_call = fake_es.calls
    assert (first_call["from_"], first_call["size"]) == (0, 1000)
    assert first_call["body"]["_source"] == {"includes": ["title"]}
    assert (second_call["from_"], second_call["size"]) == (0, 10)
    assert second_call["body"]["query"]["ids"]["values"] == [str(i) for i in range(20, 30)]
    assert len(result.results) == 10
    assert result.results[0].spu_id == "20"
    assert result.results[0].brief == "brief-20"
  225 +
  226 +
def test_searcher_rerank_prefetch_source_follows_doc_template(monkeypatch):
    """The prefetch `_source` filter must include exactly the fields named
    in the caller-supplied rerank doc template (sorted)."""
    fake_es = _FakeESClient()
    searcher = _build_searcher(_build_search_config(rerank_enabled=True), fake_es)
    ctx = create_request_context(reqid="t1b", uid="u1b")

    monkeypatch.setattr(
        "search.searcher.get_tenant_config_loader",
        lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}),
    )
    monkeypatch.setattr(
        "search.rerank_client.run_rerank",
        lambda **kwargs: (kwargs["es_response"], None, []),
    )

    searcher.search(
        query="toy",
        tenant_id="162",
        from_=0,
        size=5,
        context=ctx,
        enable_rerank=None,
        rerank_doc_template="{title} {vendor} {brief}",
    )

    prefetch_body = fake_es.calls[0]["body"]
    assert prefetch_body["_source"] == {"includes": ["brief", "title", "vendor"]}
  249 +
  250 +
def test_searcher_skips_rerank_when_request_explicitly_false(monkeypatch):
    """enable_rerank=False must bypass rerank even though the server config
    enables it: a single ES call using the caller's original pagination."""
    fake_es = _FakeESClient()
    searcher = _build_searcher(_build_search_config(rerank_enabled=True), fake_es)
    ctx = create_request_context(reqid="t2", uid="u2")

    monkeypatch.setattr(
        "search.searcher.get_tenant_config_loader",
        lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}),
    )

    rerank_calls: Dict[str, int] = {"count": 0}

    def _count_rerank(**kwargs):
        rerank_calls["count"] += 1
        return kwargs["es_response"], None, []

    monkeypatch.setattr("search.rerank_client.run_rerank", _count_rerank)

    searcher.search(
        query="toy",
        tenant_id="162",
        from_=20,
        size=10,
        context=ctx,
        enable_rerank=False,
    )

    assert rerank_calls["count"] == 0
    # Exactly one ES call, with the original paging untouched.
    (only_call,) = fake_es.calls
    assert (only_call["from_"], only_call["size"]) == (20, 10)
  282 +
  283 +
def test_searcher_skips_rerank_when_page_exceeds_window(monkeypatch):
    """A page reaching past rerank_window (from=995, size=10, window=1000)
    must fall back to a direct ES query with no rerank call."""
    fake_es = _FakeESClient()
    searcher = _build_searcher(
        _build_search_config(rerank_enabled=True, rerank_window=1000), fake_es
    )
    ctx = create_request_context(reqid="t3", uid="u3")

    monkeypatch.setattr(
        "search.searcher.get_tenant_config_loader",
        lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en"]}),
    )

    rerank_calls: Dict[str, int] = {"count": 0}

    def _count_rerank(**kwargs):
        rerank_calls["count"] += 1
        return kwargs["es_response"], None, []

    monkeypatch.setattr("search.rerank_client.run_rerank", _count_rerank)

    searcher.search(
        query="toy",
        tenant_id="162",
        from_=995,
        size=10,
        context=ctx,
        enable_rerank=None,
    )

    assert rerank_calls["count"] == 0
    # Exactly one ES call with the caller's own pagination.
    (only_call,) = fake_es.calls
    assert (only_call["from_"], only_call["size"]) == (995, 10)
... ...
utils/es_client.py
... ... @@ -258,13 +258,23 @@ class ESClient:
258 258 body.pop("collapse", None)
259 259  
260 260 try:
261   - return self.client.search(
  261 + response = self.client.search(
262 262 index=index_name,
263 263 body=body,
264 264 size=size,
265 265 from_=from_,
266 266 routing=routing,
267 267 )
  268 + # elasticsearch-py 8.x returns ObjectApiResponse; normalize to mutable dict
  269 + # so caller can safely patch hits/took during post-processing.
  270 + if hasattr(response, "body"):
  271 + payload = response.body
  272 + if isinstance(payload, dict):
  273 + return dict(payload)
  274 + return payload
  275 + if isinstance(response, dict):
  276 + return response
  277 + return dict(response)
268 278 except Exception as e:
269 279 logger.error(f"Search failed: {e}", exc_info=True)
270 280 raise RuntimeError(f"Elasticsearch search failed for index '{index_name}': {e}") from e
... ...