Commit 9f5994b421ab65556eab3b895d8d5c37ad3f3369

Authored by tangwang
1 parent efd435cf

reranker

api/routes/indexer.py
... ... @@ -5,6 +5,7 @@
5 5 """
6 6  
7 7 import asyncio
  8 +import re
8 9 from fastapi import APIRouter, HTTPException
9 10 from typing import Any, Dict, List
10 11 from pydantic import BaseModel, Field
... ... @@ -76,6 +77,25 @@ class BuildDocsFromDbRequest(BaseModel):
76 77 spu_ids: List[str] = Field(..., description="需要构建 doc 的 SPU ID 列表")
77 78  
78 79  
  80 +class EnrichContentItem(BaseModel):
  81 + """单条待生成内容理解字段的商品(仅需 spu_id + 标题)。"""
  82 + spu_id: str = Field(..., description="SPU ID")
  83 + title: str = Field(..., description="商品标题,用于 LLM 分析生成 qanchors / tags 等")
  84 +
  85 +
  86 +class EnrichContentRequest(BaseModel):
  87 + """
  88 + 内容理解字段生成请求:根据商品标题批量生成 qanchors、semantic_attributes、tags。
  89 + 供外部 indexer 在自行组织 doc 时调用,与翻译、向量化等微服务并列。
  90 + """
  91 + tenant_id: str = Field(..., description="租户 ID,用于缓存隔离")
  92 + items: List[EnrichContentItem] = Field(..., description="待分析的 SPU 列表(spu_id + title)")
  93 + languages: List[str] = Field(
  94 + default_factory=lambda: ["zh", "en"],
  95 + description="目标语言列表,需在支持范围内(zh/en/de/ru/fr),默认 zh, en",
  96 + )
  97 +
  98 +
79 99 @router.post("/reindex")
80 100 async def reindex_all(request: ReindexRequest):
81 101 """
... ... @@ -411,6 +431,152 @@ async def build_docs_from_db(request: BuildDocsFromDbRequest):
411 431 raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
412 432  
413 433  
  434 +def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]:
  435 + """
  436 + 同步执行内容理解:调用 process_products.analyze_products,按语言批量跑 LLM,
  437 + 再聚合成每 SPU 的 qanchors、semantic_attributes、tags。供 run_in_executor 调用。
  438 + """
  439 + from indexer.process_products import analyze_products, SUPPORTED_LANGS
  440 +
  441 + llm_langs = [lang for lang in languages if lang in SUPPORTED_LANGS]
  442 + if not llm_langs:
  443 + return [
  444 + {
  445 + "spu_id": it["spu_id"],
  446 + "qanchors": {},
  447 + "semantic_attributes": [],
  448 + "tags": [],
  449 + "error": "no supported languages (supported: %s)" % sorted(SUPPORTED_LANGS),
  450 + }
  451 + for it in items
  452 + ]
  453 +
  454 + products = [{"id": it["spu_id"], "title": (it.get("title") or "").strip()} for it in items]
  455 + dim_keys = [
  456 + "tags",
  457 + "target_audience",
  458 + "usage_scene",
  459 + "season",
  460 + "key_attributes",
  461 + "material",
  462 + "features",
  463 + ]
  464 +
  465 + # 按 spu_id 聚合:qanchors[lang], semantic_attributes[], tags[]
  466 + by_spu: Dict[str, Dict[str, Any]] = {}
  467 + for it in items:
  468 + sid = str(it["spu_id"])
  469 + by_spu[sid] = {"qanchors": {}, "semantic_attributes": [], "tags": []}
  470 +
  471 + for lang in llm_langs:
  472 + try:
  473 + rows = analyze_products(
  474 + products=products,
  475 + target_lang=lang,
  476 + batch_size=20,
  477 + tenant_id=tenant_id,
  478 + )
  479 + except Exception as e:
  480 + logger.warning("enrich-content analyze_products failed for lang=%s: %s", lang, e)
  481 + for it in items:
  482 + sid = str(it["spu_id"])
  483 + if "error" not in by_spu[sid]:
  484 + by_spu[sid]["error"] = str(e)
  485 + continue
  486 +
  487 + for row in rows:
  488 + spu_id = str(row.get("id") or "")
  489 + if spu_id not in by_spu:
  490 + continue
  491 + rec = by_spu[spu_id]
  492 + if row.get("error"):
  493 + rec["error"] = row["error"]
  494 + continue
  495 + anchor_text = str(row.get("anchor_text") or "").strip()
  496 + if anchor_text:
  497 + rec["qanchors"][lang] = anchor_text
  498 + for name in dim_keys:
  499 + raw = row.get(name)
  500 + if not raw:
  501 + continue
  502 + for part in re.split(r"[,;|/\n\t]+", str(raw)):
  503 + value = part.strip()
  504 + if not value:
  505 + continue
  506 + rec["semantic_attributes"].append({"lang": lang, "name": name, "value": value})
  507 + if name == "tags":
  508 + rec["tags"].append(value)
  509 +
  510 + # 去重 tags(保持顺序)
  511 + out = []
  512 + for it in items:
  513 + sid = str(it["spu_id"])
  514 + rec = by_spu[sid]
  515 + tags = list(dict.fromkeys(rec["tags"]))
  516 + out.append({
  517 + "spu_id": sid,
  518 + "qanchors": rec["qanchors"],
  519 + "semantic_attributes": rec["semantic_attributes"],
  520 + "tags": tags,
  521 + **({"error": rec["error"]} if rec.get("error") else {}),
  522 + })
  523 + return out
  524 +
  525 +
  526 +@router.post("/enrich-content")
  527 +async def enrich_content(request: EnrichContentRequest):
  528 + """
  529 + 内容理解字段生成接口:根据商品标题批量生成 qanchors、semantic_attributes、tags。
  530 +
  531 + 使用场景:
  532 + - 外部 indexer 采用「微服务组合」方式自己组织 doc 时,可调用本接口获取 LLM 生成的
  533 + 锚文本与语义属性,再与翻译、向量化结果合并写入 ES。
  534 + - 与 /indexer/build-docs 解耦,避免 build-docs 因 LLM 耗时过长而阻塞;调用方可
  535 + 先拿不含 qanchors/tags 的 doc,再异步或离线补齐本接口结果后更新 ES。
  536 +
  537 + 实现逻辑与 indexer.process_products.analyze_products 一致,支持多语言与 Redis 缓存。
  538 + """
  539 + try:
  540 + if not request.items:
  541 + raise HTTPException(status_code=400, detail="items cannot be empty")
  542 + if len(request.items) > 50:
  543 + raise HTTPException(
  544 + status_code=400,
  545 + detail="Maximum 50 items per request for enrich-content (LLM batch limit)",
  546 + )
  547 +
  548 + items_payload = [
  549 + {"spu_id": it.spu_id, "title": it.title or ""}
  550 + for it in request.items
  551 + ]
  552 + loop = asyncio.get_event_loop()
  553 + result = await loop.run_in_executor(
  554 + None,
  555 + lambda: _run_enrich_content(
  556 + tenant_id=request.tenant_id,
  557 + items=items_payload,
  558 + languages=request.languages or ["zh", "en"],
  559 + ),
  560 + )
  561 + return {
  562 + "tenant_id": request.tenant_id,
  563 + "results": result,
  564 + "total": len(result),
  565 + }
  566 + except HTTPException:
  567 + raise
  568 + except RuntimeError as e:
  569 + if "DASHSCOPE_API_KEY" in str(e) or "cannot call LLM" in str(e).lower():
  570 + raise HTTPException(
  571 + status_code=503,
  572 + detail="Content understanding service unavailable: DASHSCOPE_API_KEY not set",
  573 + )
  574 + raise HTTPException(status_code=500, detail=str(e))
  575 + except Exception as e:
  576 + logger.error(f"Error in enrich-content for tenant_id={request.tenant_id}: {e}", exc_info=True)
  577 + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
  578 +
  579 +
414 580 @router.post("/documents")
415 581 async def get_documents(request: GetDocumentsRequest):
416 582 """
... ...
config/config.yaml
... ... @@ -184,6 +184,9 @@ services:
184 184 dtype: "float16"
185 185 enable_prefix_caching: true
186 186 enforce_eager: false
  187 + infer_batch_size: 64
  188 + sort_by_doc_length: true
  189 + length_sort_mode: "char" # char | token
187 190 instruction: "Given a web search query, retrieve relevant passages that answer the query"
188 191  
189 192 # SPU配置(已启用,使用嵌套skus)
... ...
docs/CNCLIP_SERVICE说明文档.md
... ... @@ -147,42 +147,7 @@ curl -sS -X POST "http://127.0.0.1:6005/embed/image" \
147 147  
148 148 返回应为向量数组(非空)。
149 149  
150   -## 8. 常见问题与排障
151   -
152   -### 8.1 “为什么 cnclip 没用 GPU?”
153   -
154   -先看启动命令是否显式写了:
155   -
156   -```bash
157   -CNCLIP_DEVICE=cpu ...
158   -```
159   -
160   -如果是,这就是预期行为,会强制 CPU。
161   -
162   -### 8.2 “我要求 cuda,但 service_ctl 说 mode mismatch”
163   -
164   -说明当前已有 CPU 模式实例在跑。按提示重启:
165   -
166   -```bash
167   -./scripts/service_ctl.sh stop cnclip
168   -CNCLIP_DEVICE=cuda ./scripts/service_ctl.sh start cnclip
169   -```
170   -
171   -### 8.3 进程被 `Killed`
172   -
173   -常见于资源竞争(尤其单卡同时运行 TEI/reranker/cnclip 时的启动峰值)。建议:
174   -
175   -1. 先起 `tei` 与 `cnclip`,确认稳定后再起 `reranker`。
176   -2. 适当下调 `reranker` 显存预算(`config/config.yaml` 的 `gpu_memory_utilization`)。
177   -3. 必要时将 CN-CLIP 模型从 `ViT-H-14` 调整到更小规模。
178   -
179   -### 8.4 gRPC 连接失败(`Connection refused`)
180   -
181   -- 检查端口:`lsof -i :51000`
182   -- 看服务日志:`tail -f logs/cnclip_service.log`
183   -- 确认客户端使用 `grpc://` 协议而非 `http://`
184   -
185   -## 9. 相关文档
  150 +## 8. 相关文档
186 151  
187 152 - 开发总览:`docs/QUICKSTART.md`
188 153 - TEI 专项:`docs/TEI_SERVICE说明文档.md`
... ...
docs/DEVELOPER_GUIDE.md
... ... @@ -324,7 +324,7 @@ services:
324 324 ### 7.6 新增后端清单(以 Qwen3-Reranker 为例)
325 325  
326 326 1. **实现协议**:在 `reranker/backends/qwen3_vllm.py` 中实现类,提供 `score_with_meta(query, docs, normalize) -> (scores, meta)`,输出与 docs 等长且顺序一致。
327   -2. **配置**:在 `config/config.yaml` 的 `services.rerank.backends` 下增加 `qwen3_vllm` 块(model_name、engine、max_model_len、gpu_memory_utilization 等);支持环境变量 `RERANK_BACKEND=qwen3_vllm`。
  327 +2. **配置**:在 `config/config.yaml` 的 `services.rerank.backends` 下增加 `qwen3_vllm` 块(model_name、engine、max_model_len、gpu_memory_utilization、`infer_batch_size`、`sort_by_doc_length`、`length_sort_mode` 等);支持环境变量 `RERANK_BACKEND=qwen3_vllm`。
328 328 3. **注册**:在 `reranker/backends/__init__.py` 的 `get_rerank_backend(name, config)` 中增加 `qwen3_vllm` 分支。
329 329 4. **服务启动**:`reranker/server.py` 启动时根据配置调用 `get_rerank_backend(backend_name, backend_cfg)` 得到实例。
330 330 5. **调用方**:无需修改;仅部署时启动使用新后端的 reranker 服务即可。
... ...
docs/搜索API对接指南.md
... ... @@ -31,23 +31,26 @@
31 31 - 4.5 [多语言字段说明](#45-多语言字段说明)
32 32  
33 33 5. [索引接口](#索引接口)
34   - - 5.0 [为租户创建索引](#50-为租户创建索引)
35   - - 5.1 [全量索引接口](#51-全量索引接口)
36   - - 5.2 [增量索引接口](#52-增量索引接口)
37   - - 5.3 [查询文档接口](#53-查询文档接口)
38   - - 5.4 [索引健康检查接口](#54-索引健康检查接口)
39   - - 5.5 [文档构建接口(正式对接)](#55-文档构建接口正式对接推荐)
40   - - 5.6 [文档构建接口(测试/自测)](#56-文档构建接口测试--自测)
  34 + - 5.0 [支撑外部 indexer 的三种方式](#50-支撑外部-indexer-的三种方式)
  35 + - 5.1 [为租户创建索引](#51-为租户创建索引)
  36 + - 5.2 [全量索引接口](#52-全量索引接口)
  37 + - 5.3 [增量索引接口](#53-增量索引接口)
  38 + - 5.4 [查询文档接口](#54-查询文档接口)
  39 + - 5.5 [索引健康检查接口](#55-索引健康检查接口)
  40 + - 5.6 [文档构建接口(正式对接)](#56-文档构建接口正式对接推荐)
  41 + - 5.7 [文档构建接口(测试/自测)](#57-文档构建接口测试--自测)
  42 + - 5.8 [内容理解字段生成接口](#58-内容理解字段生成接口)
41 43  
42 44 6. [管理接口](#管理接口)
43 45 - 6.1 [健康检查](#61-健康检查)
44 46 - 6.2 [获取配置](#62-获取配置)
45 47 - 6.3 [索引统计](#63-索引统计)
46 48  
47   -7. [微服务接口(向量、重排、翻译)](#7-微服务接口向量重排翻译)
  49 +7. [微服务接口(向量、重排、翻译、内容理解)](#7-微服务接口向量重排翻译)
48 50 - 7.1 [向量服务(Embedding)](#71-向量服务embedding)
49 51 - 7.2 [重排服务(Reranker)](#72-重排服务reranker)
50 52 - 7.3 [翻译服务(Translation)](#73-翻译服务translation)
  53 + - 7.4 [内容理解字段生成(Indexer 服务内)](#74-内容理解字段生成indexer-服务内)
51 54  
52 55 8. [常见场景示例](#8-常见场景示例)
53 56 - 8.1 [基础搜索与排序](#81-基础搜索与排序)
... ... @@ -144,12 +147,13 @@ curl -X POST "http://43.166.252.75:6002/search/" \
144 147 | 查询文档 | POST | `/indexer/documents` | 查询SPU文档数据(不写入ES) |
145 148 | 构建ES文档(正式对接) | POST | `/indexer/build-docs` | 基于上游提供的 MySQL 行数据构建 ES doc,不写入 ES,供 Java 等调用后自行写入 |
146 149 | 构建ES文档(测试用) | POST | `/indexer/build-docs-from-db` | 仅在测试/调试时使用,根据 `tenant_id + spu_ids` 内部查库并构建 ES doc |
  150 +| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、semantic_attributes、tags,供微服务组合方式使用 |
147 151 | 索引健康检查 | GET | `/indexer/health` | 检查索引服务状态 |
148 152 | 健康检查 | GET | `/admin/health` | 服务健康检查 |
149 153 | 获取配置 | GET | `/admin/config` | 获取租户配置 |
150 154 | 索引统计 | GET | `/admin/stats` | 获取租户索引统计信息(需 tenant_id) |
151 155  
152   -**微服务(独立端口,外部可直连)**:
  156 +**微服务(独立端口或 Indexer 内,外部可直连)**:
153 157  
154 158 | 服务 | 端口 | 接口 | 说明 |
155 159 |------|------|------|------|
... ... @@ -157,6 +161,7 @@ curl -X POST "http://43.166.252.75:6002/search/" \
157 161 | 向量服务 | 6005 | `POST /embed/image` | 图片向量化 |
158 162 | 翻译服务 | 6006 | `POST /translate` | 文本翻译(Qwen/DeepL) |
159 163 | 重排服务 | 6007 | `POST /rerank` | 检索结果重排 |
  164 +| 内容理解(Indexer 内) | 6004 | `POST /indexer/enrich-content` | 根据商品标题生成 qanchors、tags 等,供 indexer 微服务组合方式使用 |
160 165  
161 166 ---
162 167  
... ... @@ -856,9 +861,23 @@ curl "http://localhost:6002/search/12345" -H "X-Tenant-ID: 162"
856 861 | 查询文档 | POST | `/indexer/documents` | 按 SPU ID 列表查询 ES 文档,不写入 ES |
857 862 | 构建 ES 文档(正式) | POST | `/indexer/build-docs` | 由上游提供 MySQL 行数据,返回 ES-ready 文档,不写 ES |
858 863 | 构建 ES 文档(测试) | POST | `/indexer/build-docs-from-db` | 由本服务查库并构建文档,仅测试/调试用 |
  864 +| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、semantic_attributes、tags(供微服务组合方式使用) |
859 865 | 索引健康检查 | GET | `/indexer/health` | 检查索引服务与数据库连接状态 |
860 866  
861   -### 5.0 为租户创建索引
  867 +#### 5.0 支撑外部 indexer 的三种方式
  868 +
  869 +本服务对**外部 indexer 程序**(如 Java 索引系统)提供三种对接方式,可按需选择:
  870 +
  871 +| 方式 | 说明 | 适用场景 |
  872 +|------|------|----------|
  873 +| **1)doc 填充接口** | 调用 `POST /indexer/build-docs` 或 `POST /indexer/build-docs-from-db`,由本服务基于 MySQL 行数据构建完整 ES 文档(含多语言、向量、规格等),**不写入 ES**,由调用方自行写入。 | 希望一站式拿到 ES-ready doc,由己方控制写 ES 的时机与索引名。 |
  874 +| **2)微服务组合** | 单独调用**翻译**、**向量化**、**内容理解字段生成**等接口,由 indexer 程序自己组装 doc 并写入 ES。翻译与向量化为独立微服务(见第 7 节);内容理解为 Indexer 服务内接口 `POST /indexer/enrich-content`。 | 需要灵活编排、或希望将 LLM/向量等耗时步骤与主链路解耦(如异步补齐 qanchors/tags)。 |
  875 +| **3)本服务直接写 ES** | 调用全量索引 `POST /indexer/reindex`、增量索引 `POST /indexer/index`(指定 SPU ID 列表),由本服务从 MySQL 拉数并直接写入 ES。 | 自建运维、联调或不需要由 Java 写 ES 的场景。 |
  876 +
  877 +- **方式 1** 与 **方式 2** 下,ES 的写入方均为外部 indexer(或 Java),职责清晰。
  878 +- **方式 3** 下,本服务同时负责读库、构建 doc 与写 ES。
  879 +
  880 +### 5.1 为租户创建索引
862 881  
863 882 为租户创建索引需要两个步骤:
864 883  
... ... @@ -894,7 +913,7 @@ export ES_INDEX_NAMESPACE=uat_
894 913  
895 914 ---
896 915  
897   -### 5.1 全量索引接口
  916 +### 5.2 全量索引接口
898 917  
899 918 - **端点**: `POST /indexer/reindex`
900 919 - **描述**: 全量索引,将指定租户的所有SPU数据导入到ES索引(不会删除现有索引)。**推荐仅用于自测/运维场景**;生产环境下更推荐由 Java 等上游控制调度与写 ES。
... ... @@ -953,7 +972,7 @@ tail -f logs/api.log
953 972 tail -f logs/*.log
954 973 ```
955 974  
956   -> ⚠️ **重要提示**:如需 **创建索引结构**,请参考 [5.0 为租户创建索引](#50-为租户创建索引) 章节,使用 `./scripts/create_tenant_index.sh <tenant_id>`。创建后需要调用 `/indexer/reindex` 导入数据。
  975 +> ⚠️ **重要提示**:如需 **创建索引结构**,请参考 [5.1 为租户创建索引](#51-为租户创建索引) 章节,使用 `./scripts/create_tenant_index.sh <tenant_id>`。创建后需要调用 `/indexer/reindex` 导入数据。
957 976  
958 977 **查看索引日志**:
959 978  
... ... @@ -1000,7 +1019,7 @@ cat logs/indexer.log | jq &#39;select(.spu_id == &quot;123&quot;)&#39;
1000 1019 cat logs/indexer.log | jq 'select(.operation == "request_complete") | {timestamp, index_type, tenant_id, total_count, success_count, failed_count, elapsed_time}'
1001 1020 ```
1002 1021  
1003   -### 5.2 增量索引接口
  1022 +### 5.3 增量索引接口
1004 1023  
1005 1024 - **端点**: `POST /indexer/index`
1006 1025 - **描述**: 增量索引接口,根据指定的SPU ID列表进行索引,直接将数据写入ES。用于增量更新指定商品。**推荐仅作为内部/调试入口**;正式对接建议改用 `/indexer/build-docs`,由上游写 ES。
... ... @@ -1158,7 +1177,7 @@ curl -X POST &quot;http://localhost:6004/indexer/index&quot; \
1158 1177  
1159 1178 日志查询方式请参考[5.1节查看索引日志](#51-全量重建索引接口)部分。
1160 1179  
1161   -### 5.3 查询文档接口
  1180 +### 5.4 查询文档接口
1162 1181  
1163 1182 - **端点**: `POST /indexer/documents`
1164 1183 - **描述**: 查询文档接口,根据SPU ID列表获取ES文档数据(**不写入ES**)。用于查看、调试或验证SPU数据。
... ... @@ -1251,7 +1270,7 @@ curl -X POST &quot;http://localhost:6004/indexer/documents&quot; \
1251 1270 - `/indexer/documents`:用于查看、调试或验证SPU数据,不修改ES索引
1252 1271 - `/indexer/index`:用于实际的增量索引操作,将更新的SPU数据同步到ES
1253 1272  
1254   -### 5.4 索引健康检查接口
  1273 +### 5.5 索引健康检查接口
1255 1274  
1256 1275 - **端点**: `GET /indexer/health`
1257 1276 - **描述**: 检查索引服务健康状态(与 `api/routes/indexer.py` 中 `indexer_health_check` 一致)
... ... @@ -1280,9 +1299,9 @@ curl -X POST &quot;http://localhost:6004/indexer/documents&quot; \
1280 1299 curl -X GET "http://localhost:6004/indexer/health"
1281 1300 ```
1282 1301  
1283   -### 5.5 文档构建接口(正式对接推荐)
  1302 +### 5.6 文档构建接口(正式对接推荐)
1284 1303  
1285   -#### 5.5.1 `POST /indexer/build-docs`
  1304 +#### 5.6.1 `POST /indexer/build-docs`
1286 1305  
1287 1306 - **描述**:
1288 1307 基于调用方(通常是 Java 索引程序)提供的 **MySQL 行数据** 构建 ES 文档(doc),**不写入 ES**。
... ... @@ -1427,9 +1446,9 @@ curl -X POST &quot;http://localhost:6004/indexer/build-docs&quot; \
1427 1446 3. 调用 `/indexer/build-docs` 获取 ES-ready `docs`;
1428 1447 4. Java 使用自己的 ES 客户端写入 `search_products_tenant_{tenant_id}`。
1429 1448  
1430   -### 5.6 文档构建接口(测试 / 自测)
  1449 +### 5.7 文档构建接口(测试 / 自测)
1431 1450  
1432   -#### 5.6.1 `POST /indexer/build-docs-from-db`
  1451 +#### 5.7.1 `POST /indexer/build-docs-from-db`
1433 1452  
1434 1453 - **描述**:
1435 1454 仅用于测试/调试:调用方只提供 `tenant_id` 和 `spu_ids`,由 indexer 服务内部从 MySQL 查询 SPU/SKU/Option,然后调用与 `/indexer/build-docs` 相同的文档构建逻辑,返回 ES-ready doc。**生产环境请使用 `/indexer/build-docs`,由上游查库并写 ES。**
... ... @@ -1462,6 +1481,86 @@ curl -X POST &quot;http://127.0.0.1:6004/indexer/build-docs-from-db&quot; \
1462 1481  
1463 1482 返回结构与 `/indexer/build-docs` 相同,可直接用于对比 ES 实际文档或调试字段映射问题。
1464 1483  
  1484 +### 5.8 内容理解字段生成接口
  1485 +
  1486 +- **端点**: `POST /indexer/enrich-content`
  1487 +- **描述**: 根据商品标题批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。内部逻辑与 `indexer.process_products` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。
  1488 +
  1489 +#### 请求参数
  1490 +
  1491 +```json
  1492 +{
  1493 + "tenant_id": "170",
  1494 + "items": [
  1495 + { "spu_id": "223167", "title": "纯棉短袖T恤 夏季男装" },
  1496 + { "spu_id": "223168", "title": "12PCS Dolls with Bottles" }
  1497 + ],
  1498 + "languages": ["zh", "en"]
  1499 +}
  1500 +```
  1501 +
  1502 +| 参数 | 类型 | 必填 | 默认值 | 说明 |
  1503 +|------|------|------|--------|------|
  1504 +| `tenant_id` | string | Y | - | 租户 ID,用于缓存隔离 |
  1505 +| `items` | array | Y | - | 待分析列表,每项含 `spu_id`、`title`;**单次最多 50 条** |
  1506 +| `languages` | array[string] | N | `["zh", "en"]` | 目标语言,需在支持范围内:`zh`、`en`、`de`、`ru`、`fr` |
  1507 +
  1508 +#### 响应格式
  1509 +
  1510 +```json
  1511 +{
  1512 + "tenant_id": "170",
  1513 + "total": 2,
  1514 + "results": [
  1515 + {
  1516 + "spu_id": "223167",
  1517 + "qanchors": {
  1518 + "zh": "短袖T恤,纯棉,男装,夏季",
  1519 + "en": "cotton t-shirt, short sleeve, men, summer"
  1520 + },
  1521 + "semantic_attributes": [
  1522 + { "lang": "zh", "name": "tags", "value": "纯棉" },
  1523 + { "lang": "zh", "name": "usage_scene", "value": "日常" },
  1524 + { "lang": "en", "name": "tags", "value": "cotton" }
  1525 + ],
  1526 + "tags": ["纯棉", "短袖", "男装", "cotton", "short sleeve"]
  1527 + },
  1528 + {
  1529 + "spu_id": "223168",
  1530 + "qanchors": { "en": "dolls, toys, 12pcs" },
  1531 + "semantic_attributes": [],
  1532 + "tags": ["dolls", "toys"]
  1533 + }
  1534 + ]
  1535 +}
  1536 +```
  1537 +
  1538 +| 字段 | 类型 | 说明 |
  1539 +|------|------|------|
  1540 +| `results` | array | 与请求 `items` 一一对应,每项含 `spu_id`、`qanchors`、`semantic_attributes`、`tags` |
  1541 +| `results[].qanchors` | object | 按语言键的锚文本(逗号分隔短语),可写入 ES 文档的 `qanchors.{lang}` |
  1542 +| `results[].semantic_attributes` | array | 语义属性列表,每项为 `{ "lang", "name", "value" }`,可写入 ES 的 `semantic_attributes` nested 字段 |
  1543 +| `results[].tags` | array | 从语义属性中抽取的 `name=tags` 的 value 集合,可与业务原有 `tags` 合并后写入 ES 的 `tags` 字段 |
  1544 +| `results[].error` | string | 若该条处理失败(如 LLM 异常),会在此字段返回错误信息 |
  1545 +
  1546 +**错误响应**:
  1547 +- `400`: `items` 为空或超过 50 条
  1548 +- `503`: 未配置 `DASHSCOPE_API_KEY`,内容理解服务不可用
  1549 +
  1550 +#### 请求示例
  1551 +
  1552 +```bash
  1553 +curl -X POST "http://localhost:6004/indexer/enrich-content" \
  1554 + -H "Content-Type: application/json" \
  1555 + -d '{
  1556 + "tenant_id": "170",
  1557 + "items": [
  1558 + { "spu_id": "223167", "title": "纯棉短袖T恤 夏季男装" }
  1559 + ],
  1560 + "languages": ["zh", "en"]
  1561 + }'
  1562 +```
  1563 +
1465 1564 ---
1466 1565  
1467 1566 ## 管理接口
... ... @@ -1614,6 +1713,7 @@ START_TEI=0 ./scripts/service_ctl.sh restart embedding
1614 1713 - **启动**: `./scripts/start_reranker.sh`
1615 1714  
1616 1715 说明:默认后端为 `qwen3_vllm`(`Qwen/Qwen3-Reranker-0.6B`),需要可用 GPU 显存。
  1716 +补充:`docs` 的请求大小与模型推理 `batch size` 解耦。即使一次传入 1000 条文档,服务端也会按 `services.rerank.backends.qwen3_vllm.infer_batch_size` 自动拆分;若 `sort_by_doc_length=true`,会先按文档长度排序后分批,减少 padding,再按原输入顺序返回分数。`length_sort_mode` 可选 `char`(更快)或 `token`(更精确)。
1617 1717  
1618 1718 #### 7.2.1 `POST /rerank` — 结果重排
1619 1719  
... ... @@ -1746,6 +1846,16 @@ curl -X POST &quot;http://localhost:6006/translate&quot; \
1746 1846 curl "http://localhost:6006/health"
1747 1847 ```
1748 1848  
  1849 +### 7.4 内容理解字段生成(Indexer 服务内)
  1850 +
  1851 +内容理解字段生成接口部署在 **Indexer 服务**(默认端口 6004)内,与「翻译、向量化」等独立端口微服务并列,供采用**微服务组合**方式的 indexer 调用。
  1852 +
  1853 +- **Base URL**: Indexer 服务地址,如 `http://localhost:6004`
  1854 +- **路径**: `POST /indexer/enrich-content`
  1855 +- **说明**: 根据商品标题批量生成 `qanchors`、`semantic_attributes`、`tags`,用于拼装 ES 文档。内部使用大模型(需配置 `DASHSCOPE_API_KEY`),支持多语言与 Redis 缓存;单次最多 50 条,建议批量调用以提升效率。
  1856 +
  1857 +请求/响应格式、示例及错误码见 [5.8 内容理解字段生成接口](#58-内容理解字段生成接口)。
  1858 +
1749 1859 ---
1750 1860  
1751 1861 ## 8. 常见场景示例
... ...
perf_reports/20260311/reranker_1000docs/report.md 0 → 100644
... ... @@ -0,0 +1,38 @@
  1 +# Reranker 1000-doc Performance Report (2026-03-11)
  2 +
  3 +Workload profile:
  4 +- backend: `qwen3_vllm` (`Qwen/Qwen3-Reranker-0.6B`)
  5 +- query: short e-commerce text (<100 tokens)
  6 +- docs/request: 1000 short titles/title+brief
  7 +- options: `sort_by_doc_length=true`, `length_sort_mode=char`
  8 +
  9 +## Results
  10 +
  11 +| infer_batch_size | concurrency | requests | rps | avg_ms | p95_ms | p99_ms |
  12 +|---:|---:|---:|---:|---:|---:|---:|
  13 +| 24 | 1 | 4 | 0.34 | 2962.42 | 4758.59 | 4969.54 |
  14 +| 32 | 1 | 4 | 0.56 | 1756.63 | 2285.96 | 2389.21 |
  15 +| 48 | 1 | 4 | 0.63 | 1570.78 | 2111.8 | 2206.45 |
  16 +| 64 | 1 | 4 | 0.68 | 1448.87 | 2014.51 | 2122.44 |
  17 +| 80 | 1 | 3 | 0.64 | 1546.78 | 2091.47 | 2157.13 |
  18 +| 96 | 1 | 3 | 0.64 | 1534.44 | 2202.48 | 2288.99 |
  19 +| 96 | 1 | 4 | 0.52 | 1914.41 | 2215.05 | 2216.08 |
  20 +| 24 | 4 | 8 | 0.46 | 8614.9 | 9886.68 | 9887.0 |
  21 +| 32 | 4 | 8 | 0.62 | 6432.39 | 6594.11 | 6595.41 |
  22 +| 48 | 4 | 8 | 0.72 | 5451.5 | 5495.01 | 5500.29 |
  23 +| 64 | 4 | 8 | 0.76 | 5217.79 | 5329.15 | 5332.3 |
  24 +| 80 | 4 | 6 | 0.49 | 7069.9 | 9198.54 | 9208.77 |
  25 +| 96 | 4 | 6 | 0.76 | 4302.86 | 5139.19 | 5149.08 |
  26 +| 96 | 4 | 8 | 0.81 | 4852.78 | 5038.37 | 5058.76 |
  27 +
  28 +## Decision
  29 +
  30 +- For latency-sensitive online rerank (single request with 1000 docs), `infer_batch_size=64` gives the best c=1 latency in this run set.
  31 +- For higher concurrency c=4, `infer_batch_size=96` has slightly higher throughput, but degrades c=1 latency noticeably.
  32 +- Default kept at **`infer_batch_size=64`** as balanced/safer baseline for mixed traffic.
  33 +
  34 +## Reproduce
  35 +
  36 +```bash
  37 +./scripts/benchmark_reranker_1000docs.sh
  38 +```
... ...
reranker/DEPLOYMENT_AND_TUNING.md 0 → 100644
... ... @@ -0,0 +1,210 @@
  1 +# Reranker 部署与性能调优手册(Qwen3-vLLM)
  2 +
  3 +本文档沉淀当前项目在电商搜索重排场景下的可复用实践,覆盖:
  4 +
  5 +- 环境准备与安装部署
  6 +- `qwen3_vllm` 配置项与优化思路
  7 +- 1000-doc 场景压测流程
  8 +- 关键结论与推荐默认参数
  9 +- 常见故障排查
  10 +
  11 +适用范围:
  12 +
  13 +- 重排后端:`services.rerank.backend: qwen3_vllm`
  14 +- 模型:`Qwen/Qwen3-Reranker-0.6B`
  15 +- 场景:query 较短(通常 < 100 tokens),doc 为商品标题或标题+简短描述,单请求 docs 约 1000 条
  16 +
  17 +## 1. 环境基线
  18 +
  19 +当前验证环境(2026-03-11):
  20 +
  21 +- GPU:`Tesla T4 16GB`
  22 +- Driver / CUDA:`570.158.01 / 12.8`
  23 +- Python:`3.12.3`
  24 +- 关键依赖:`vllm==0.17.0`、`torch==2.10.0+cu128`、`transformers==4.57.6`、`fastapi==0.135.1`、`uvicorn==0.41.0`
  25 +
  26 +## 2. 环境准备与安装
  27 +
  28 +### 2.1 准备 reranker 独立虚拟环境
  29 +
  30 +```bash
  31 +./scripts/setup_reranker_venv.sh
  32 +```
  33 +
  34 +### 2.2 基础检查
  35 +
  36 +```bash
  37 +nvidia-smi
  38 +./.venv-reranker/bin/python -c "import torch; print(torch.cuda.is_available())"
  39 +./.venv-reranker/bin/python -c "import vllm, transformers; print(vllm.__version__, transformers.__version__)"
  40 +```
  41 +
  42 +## 3. 部署与运行
  43 +
  44 +### 3.1 配置(`config/config.yaml`)
  45 +
  46 +当前推荐基线:
  47 +
  48 +```yaml
  49 +services:
  50 + rerank:
  51 + backend: "qwen3_vllm"
  52 + backends:
  53 + qwen3_vllm:
  54 + model_name: "Qwen/Qwen3-Reranker-0.6B"
  55 + engine: "vllm"
  56 + max_model_len: 256
  57 + tensor_parallel_size: 1
  58 + gpu_memory_utilization: 0.36
  59 + dtype: "float16"
  60 + enable_prefix_caching: true
  61 + enforce_eager: false
  62 + infer_batch_size: 64
  63 + sort_by_doc_length: true
  64 + length_sort_mode: "char" # char | token
  65 +```
  66 +
  67 +### 3.2 启停命令
  68 +
  69 +推荐统一使用:
  70 +
  71 +```bash
  72 +./scripts/service_ctl.sh start reranker
  73 +./scripts/service_ctl.sh status reranker
  74 +./scripts/service_ctl.sh stop reranker
  75 +```
  76 +
  77 +也可直接运行原生脚本:
  78 +
  79 +```bash
  80 +./scripts/start_reranker.sh
  81 +./scripts/stop_reranker.sh
  82 +```
  83 +
  84 +健康检查:
  85 +
  86 +```bash
  87 +curl -sS http://127.0.0.1:6007/health
  88 +```
  89 +
  90 +## 4. 核心优化点(已落地)
  91 +
  92 +### 4.1 请求大小与推理 batch 解耦
  93 +
  94 +- 调用方一次可传入 1000 docs(业务需求)
  95 +- 服务端按 `infer_batch_size` 自动拆批推理(模型效率需求)
  96 +
  97 +### 4.2 先排序再分批,降低 padding 浪费
  98 +
  99 +- `sort_by_doc_length: true`:按长度排序后再分批
  100 +- `length_sort_mode: "char"`:短文本场景下开销更低,默认推荐
  101 +- `length_sort_mode: "token"`:长度估计更精确,但有额外 tokenizer 开销
  102 +
  103 +### 4.3 全局去重后回填
  104 +
  105 +- 对 docs 进行全局去重(非“仅相邻去重”)
  106 +- 推理后按原请求顺序回填 scores,保证接口契约稳定
  107 +
  108 +### 4.4 启动稳定性修复
  109 +
  110 +- `service_ctl.sh` 对 reranker 使用独立启动路径
  111 +- 增加“稳定健康检查”(连续健康探测)避免“刚 healthy 即退出”的假阳性
  112 +
  113 +## 5. 性能调优流程(标准流程)
  114 +
  115 +### 5.1 使用一键压测脚本
  116 +
  117 +```bash
  118 +./scripts/benchmark_reranker_1000docs.sh
  119 +```
  120 +
  121 +输出目录:
  122 +
  123 +- `perf_reports/<date>/reranker_1000docs/*.json`
  124 +- `perf_reports/<date>/reranker_1000docs/report.md`
  125 +
  126 +### 5.2 常用参数扫描
  127 +
  128 +默认扫描:
  129 +
  130 +- `infer_batch_size`: `24 32 48 64`
  131 +- 并发组:`c=1`(看单请求延迟)、`c=4`(看并发吞吐与尾延迟)
  132 +
  133 +可通过环境变量覆盖:
  134 +
  135 +- `BATCH_SIZES`
  136 +- `C1_REQUESTS`
  137 +- `C4_REQUESTS`
  138 +- `TENANT_ID`
  139 +- `RERANK_BASE`
  140 +
  141 +### 5.3 运行时临时覆盖服务参数
  142 +
  143 +用于“同机对比不同参数”:
  144 +
  145 +- `RERANK_VLLM_INFER_BATCH_SIZE`
  146 +- `RERANK_VLLM_SORT_BY_DOC_LENGTH`
  147 +- `RERANK_VLLM_LENGTH_SORT_MODE`
  148 +
  149 +## 6. 本轮关键结论(2026-03-11)
  150 +
  151 +基于报告:
  152 +
  153 +- `perf_reports/20260311/reranker_1000docs/report.md`
  154 +
  155 +结论:
  156 +
  157 +- 对在线重排更重要的单请求延迟(`c=1`)指标,`infer_batch_size=64` 最优
  158 +- `infer_batch_size=96` 在更高并发下吞吐略高,但会牺牲单请求延迟稳定性
  159 +- 当前默认选择 `infer_batch_size=64` 作为平衡点
  160 +
  161 +## 7. 生产建议
  162 +
  163 +- 默认保持:`infer_batch_size: 64`、`sort_by_doc_length: true`、`length_sort_mode: "char"`
  164 +- 满足以下条件时可考虑提高到 `96`:业务以吞吐优先、可接受更高单请求延迟、已通过同机同数据压测验证收益
  165 +- 每次改动后都必须复跑 `benchmark_reranker_1000docs.sh` 并归档结果
  166 +
  167 +## 8. 故障排查
  168 +
  169 +### 8.1 `start reranker` 显示 healthy 后很快退出
  170 +
  171 +排查顺序:
  172 +
  173 +```bash
  174 +./scripts/service_ctl.sh status reranker
  175 +tail -n 200 logs/reranker.log
  176 +lsof -i :6007 -P -n
  177 +```
  178 +
  179 +说明:
  180 +
  181 +- 该类问题已在 `scripts/service_ctl.sh` 中通过 reranker 独立启动分支与稳定健康检查进行修复
  182 +- 若仍出现,优先检查 GPU 资源争用、vLLM 初始化失败或端口冲突
  183 +
  184 +### 8.2 启动慢
  185 +
  186 +现象:
  187 +
  188 +- 首次启动会加载模型、torch.compile、CUDA graph capture,耗时较长
  189 +
  190 +建议:
  191 +
  192 +- 启动时预留足够健康检查窗口
  193 +- 避免频繁重启(编译缓存在 `.runtime/reranker` 下可复用)
  194 +
  195 +### 8.3 OOM 或显存不足
  196 +
  197 +优先调整:
  198 +
  199 +- 降低 `gpu_memory_utilization`
  200 +- 降低 `infer_batch_size`
  201 +- 检查是否有其他进程占用同卡
  202 +
  203 +## 9. 变更与验证清单
  204 +
  205 +每次 reranker 调优改动后,至少完成:
  206 +
  207 +- 配置变更已同步 `config/config.yaml`
  208 +- 文档变更已同步 `reranker/README.md` 与本手册
  209 +- 压测报告已归档到 `perf_reports/<date>/reranker_1000docs/`
  210 +- 启停验证通过:`./scripts/service_ctl.sh start reranker`、`./scripts/service_ctl.sh status reranker`、`curl http://127.0.0.1:6007/health`
... ...
reranker/README.md
1 1 # Reranker 模块
2 2  
3   -**请求示例**见 `docs/QUICKSTART.md` §3.5。扩展规范见 `docs/DEVELOPER_GUIDE.md` §7。
  3 +**请求示例**见 `docs/QUICKSTART.md` §3.5。扩展规范见 `docs/DEVELOPER_GUIDE.md` §7。部署与调优实战见 `reranker/DEPLOYMENT_AND_TUNING.md`。
4 4  
5 5 ---
6 6  
... ... @@ -48,7 +48,13 @@ services:
48 48 enable_warmup: true
49 49 qwen3_vllm:
50 50 model_name: "Qwen/Qwen3-Reranker-0.6B"
51   - max_model_len: 8192
  51 + max_model_len: 256
  52 + infer_batch_size: 64
  53 + sort_by_doc_length: true
  54 + length_sort_mode: "char" # char | token
  55 + enable_prefix_caching: true
  56 + enforce_eager: false
  57 + instruction: "Given a web search query, retrieve relevant passages that answer the query"
52 58 qwen3_transformers:
53 59 model_name: "Qwen/Qwen3-Reranker-0.6B"
54 60 instruction: "Given a web search query, retrieve relevant passages that answer the query"
... ... @@ -57,7 +63,6 @@ services:
57 63 use_fp16: true
58 64 tensor_parallel_size: 1
59 65 gpu_memory_utilization: 0.8
60   - enable_prefix_caching: true
61 66 instruction: "Given a web search query, retrieve relevant passages that answer the query"
62 67 ```
63 68  
... ... @@ -69,6 +74,12 @@ services:
69 74 ```
70 75 该脚本会使用隔离环境 `.venv-reranker`;首次请先执行 `./scripts/setup_reranker_venv.sh`。
71 76  
  77 +## 性能压测(1000 docs)
  78 +```bash
  79 +./scripts/benchmark_reranker_1000docs.sh
  80 +```
  81 +输出目录:`perf_reports/<date>/reranker_1000docs/`。
  82 +
72 83 ## API
73 84 ### Health
74 85 ```
... ... @@ -118,5 +129,7 @@ uvicorn reranker.server:app --host 0.0.0.0 --port 6007 --log-level info
118 129 ## Notes
119 130 - 无请求级缓存;输入按字符串去重后推理,再按原始顺序回填分数。
120 131 - 空或 null 的 doc 跳过并计为 0。
  132 +- **Qwen3-vLLM 分批策略**:`docs` 请求体可为 1000+,服务端会按 `infer_batch_size` 拆分;当 `sort_by_doc_length=true` 时,会先按文档长度排序后分批,减少 padding 开销,最终再按输入顺序回填分数。`length_sort_mode` 支持 `char`(默认,更快)与 `token`(更精确)。
  133 +- 运行时可用环境变量临时覆盖批量参数:`RERANK_VLLM_INFER_BATCH_SIZE`、`RERANK_VLLM_SORT_BY_DOC_LENGTH`、`RERANK_VLLM_LENGTH_SORT_MODE`。
121 134 - **Qwen3-vLLM**:参考 [Qwen3-Reranker-0.6B](https://huggingface.co/Qwen/Qwen3-Reranker-0.6B),需 GPU 与较多显存;与 BGE 相比适合长文本、高吞吐场景(vLLM 前缀缓存)。
122 135 - **Qwen3-Transformers**:官方 Transformers Usage 方式,无需 vLLM;适合 CPU 或小显存,可选 `attn_implementation: "flash_attention_2"` 加速。
... ...
reranker/backends/batching_utils.py 0 → 100644
... ... @@ -0,0 +1,41 @@
  1 +"""Utilities for reranker batching and deduplication."""
  2 +
  3 +from __future__ import annotations
  4 +
  5 +from typing import Iterable, List, Sequence, Tuple
  6 +
  7 +
  8 +def deduplicate_with_positions(texts: Sequence[str]) -> Tuple[List[str], List[int]]:
  9 + """
  10 + Deduplicate texts globally while preserving first-seen order.
  11 +
  12 + Returns:
  13 + unique_texts: deduplicated texts in first-seen order
  14 + position_to_unique: mapping from each original position to unique index
  15 + """
  16 + unique_texts: List[str] = []
  17 + position_to_unique: List[int] = []
  18 + seen: dict[str, int] = {}
  19 +
  20 + for text in texts:
  21 + idx = seen.get(text)
  22 + if idx is None:
  23 + idx = len(unique_texts)
  24 + seen[text] = idx
  25 + unique_texts.append(text)
  26 + position_to_unique.append(idx)
  27 +
  28 + return unique_texts, position_to_unique
  29 +
  30 +
  31 +def sort_indices_by_length(lengths: Sequence[int]) -> List[int]:
  32 + """Return stable ascending indices by lengths."""
  33 + return sorted(range(len(lengths)), key=lambda i: lengths[i])
  34 +
  35 +
  36 +def iter_batches(indices: Sequence[int], batch_size: int) -> Iterable[List[int]]:
  37 + """Yield consecutive batches from indices."""
  38 + if batch_size <= 0:
  39 + raise ValueError(f"batch_size must be > 0, got {batch_size}")
  40 + for i in range(0, len(indices), batch_size):
  41 + yield list(indices[i : i + batch_size])
... ...
reranker/backends/qwen3_vllm.py
... ... @@ -9,9 +9,16 @@ from __future__ import annotations
9 9  
10 10 import logging
11 11 import math
  12 +import os
12 13 import threading
13 14 import time
14   -from typing import Any, Dict, List, Optional, Tuple
  15 +from typing import Any, Dict, List, Tuple
  16 +
  17 +from reranker.backends.batching_utils import (
  18 + deduplicate_with_positions,
  19 + iter_batches,
  20 + sort_indices_by_length,
  21 +)
15 22  
16 23 logger = logging.getLogger("reranker.backends.qwen3_vllm")
17 24  
... ... @@ -60,10 +67,23 @@ class Qwen3VLLMRerankerBackend:
60 67 self._config.get("instruction")
61 68 or "Given a web search query, retrieve relevant passages that answer the query"
62 69 )
  70 + infer_batch_size = os.getenv("RERANK_VLLM_INFER_BATCH_SIZE") or self._config.get("infer_batch_size", 64)
  71 + sort_by_doc_length = os.getenv("RERANK_VLLM_SORT_BY_DOC_LENGTH")
  72 + if sort_by_doc_length is None:
  73 + sort_by_doc_length = self._config.get("sort_by_doc_length", True)
  74 + length_sort_mode = os.getenv("RERANK_VLLM_LENGTH_SORT_MODE") or self._config.get("length_sort_mode", "char")
  75 +
  76 + self._infer_batch_size = int(infer_batch_size)
  77 + self._sort_by_doc_length = str(sort_by_doc_length).strip().lower() in {"1", "true", "yes", "y", "on"}
  78 + self._length_sort_mode = str(length_sort_mode).strip().lower()
63 79 if not torch.cuda.is_available():
64 80 raise RuntimeError("qwen3_vllm backend requires CUDA GPU, but torch.cuda.is_available() is False")
65 81 if dtype not in {"float16", "half", "auto"}:
66 82 raise ValueError(f"Unsupported dtype for qwen3_vllm: {dtype!r}. Use float16/half/auto.")
  83 + if self._infer_batch_size <= 0:
  84 + raise ValueError(f"infer_batch_size must be > 0, got {self._infer_batch_size}")
  85 + if self._length_sort_mode not in {"char", "token"}:
  86 + raise ValueError(f"length_sort_mode must be 'char' or 'token', got {self._length_sort_mode!r}")
67 87  
68 88 logger.info(
69 89 "[Qwen3_VLLM] Loading model %s (max_model_len=%s, tp=%s, gpu_mem=%.2f, dtype=%s, prefix_caching=%s)",
... ... @@ -169,6 +189,30 @@ class Qwen3VLLMRerankerBackend:
169 189 scores.append(float(score))
170 190 return scores
171 191  
  192 + def _estimate_doc_lengths(self, docs: List[str]) -> List[int]:
  193 + """
  194 + Estimate token lengths for sorting documents into similar-length batches.
  195 + Falls back to character length when tokenizer length output is unavailable.
  196 + """
  197 + if not docs:
  198 + return []
  199 + if self._length_sort_mode == "char":
  200 + return [len(text) for text in docs]
  201 + try:
  202 + enc = self._tokenizer(
  203 + docs,
  204 + add_special_tokens=False,
  205 + truncation=True,
  206 + max_length=self._max_prompt_len,
  207 + return_length=True,
  208 + )
  209 + lengths = enc.get("length")
  210 + if isinstance(lengths, list) and len(lengths) == len(docs):
  211 + return [int(x) for x in lengths]
  212 + except Exception as exc:
  213 + logger.debug("Length estimation fallback to char length: %s", exc)
  214 + return [len(text) for text in docs]
  215 +
172 216 def score_with_meta(
173 217 self,
174 218 query: str,
... ... @@ -200,22 +244,35 @@ class Qwen3VLLMRerankerBackend:
200 244 "model": self._model_name,
201 245 "backend": "qwen3_vllm",
202 246 "normalize": normalize,
  247 + "infer_batch_size": self._infer_batch_size,
  248 + "inference_batches": 0,
  249 + "sort_by_doc_length": self._sort_by_doc_length,
  250 + "length_sort_mode": self._length_sort_mode,
203 251 }
204 252  
205   - # Deduplicate by text, keep mapping to original indices
206   - unique_texts: List[str] = []
207   - position_to_unique: List[int] = []
208   - prev: Optional[str] = None
209   - for _idx, text in indexed:
210   - if text != prev:
211   - unique_texts.append(text)
212   - prev = text
213   - position_to_unique.append(len(unique_texts) - 1)
214   -
215   - pairs = [(query, t) for t in unique_texts]
216   - with self._infer_lock:
  253 + # Deduplicate globally by text, keep mapping to original indices.
  254 + indexed_texts = [text for _, text in indexed]
  255 + unique_texts, position_to_unique = deduplicate_with_positions(indexed_texts)
  256 +
  257 + lengths = self._estimate_doc_lengths(unique_texts)
  258 + order = list(range(len(unique_texts)))
  259 + if self._sort_by_doc_length and len(unique_texts) > 1:
  260 + order = sort_indices_by_length(lengths)
  261 +
  262 + unique_scores: List[float] = [0.0] * len(unique_texts)
  263 + inference_batches = 0
  264 + for batch_indices in iter_batches(order, self._infer_batch_size):
  265 + inference_batches += 1
  266 + pairs = [(query, unique_texts[i]) for i in batch_indices]
217 267 prompts = self._process_inputs(pairs)
218   - unique_scores = self._compute_scores(prompts)
  268 + with self._infer_lock:
  269 + batch_scores = self._compute_scores(prompts)
  270 + if len(batch_scores) != len(batch_indices):
  271 + raise RuntimeError(
  272 + f"Reranker score size mismatch: expected {len(batch_indices)}, got {len(batch_scores)}"
  273 + )
  274 + for idx, score in zip(batch_indices, batch_scores):
  275 + unique_scores[idx] = float(score)
219 276  
220 277 for (orig_idx, _), unique_idx in zip(indexed, position_to_unique):
221 278 # Score is already P(yes) in [0,1] from yes/(yes+no)
... ... @@ -235,5 +292,9 @@ class Qwen3VLLMRerankerBackend:
235 292 "model": self._model_name,
236 293 "backend": "qwen3_vllm",
237 294 "normalize": normalize,
  295 + "infer_batch_size": self._infer_batch_size,
  296 + "inference_batches": inference_batches,
  297 + "sort_by_doc_length": self._sort_by_doc_length,
  298 + "length_sort_mode": self._length_sort_mode,
238 299 }
239 300 return output_scores, meta
... ...
scripts/benchmark_reranker_1000docs.sh 0 → 100755
... ... @@ -0,0 +1,131 @@
  1 +#!/bin/bash
  2 +#
  3 +# Benchmark reranker for e-commerce short-text workload:
  4 +# - query <= ~100 tokens
  5 +# - docs are short title / title+brief
  6 +# - one request contains ~1000 docs
  7 +#
  8 +# Outputs JSON reports under perf_reports/<date>/reranker_1000docs/
  9 +#
  10 +# Usage:
  11 +# ./scripts/benchmark_reranker_1000docs.sh
  12 +# Optional env:
  13 +# BATCH_SIZES="24 32 48 64"
  14 +# C1_REQUESTS=4
  15 +# C4_REQUESTS=8
  16 +# TENANT_ID=162
  17 +#
  18 +set -euo pipefail
  19 +
  20 +PROJECT_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
  21 +cd "${PROJECT_ROOT}"
  22 +
  23 +TENANT_ID="${TENANT_ID:-162}"
  24 +BATCH_SIZES="${BATCH_SIZES:-24 32 48 64}"
  25 +C1_REQUESTS="${C1_REQUESTS:-4}"
  26 +C4_REQUESTS="${C4_REQUESTS:-8}"
  27 +TIMEOUT_SEC="${TIMEOUT_SEC:-240}"
  28 +RERANK_BASE="${RERANK_BASE:-http://127.0.0.1:6007}"
  29 +
  30 +DATE_TAG="$(date +%Y%m%d)"
  31 +OUT_DIR="perf_reports/${DATE_TAG}/reranker_1000docs"
  32 +TMP_CASES="/tmp/rerank_1000_shortdocs_cases.json"
  33 +mkdir -p "${OUT_DIR}"
  34 +
  35 +cleanup() {
  36 + ./scripts/stop_reranker.sh >/dev/null 2>&1 || true
  37 +}
  38 +trap cleanup EXIT
  39 +
  40 +cat > "${TMP_CASES}" <<'JSON'
  41 +{
  42 + "scenarios": {
  43 + "rerank": [
  44 + {
  45 + "method": "POST",
  46 + "path": "/rerank",
  47 + "json": {
  48 + "query": "wireless ergonomic gaming mouse for office use with rechargeable battery and bluetooth",
  49 + "docs": [],
  50 + "normalize": true
  51 + }
  52 + }
  53 + ]
  54 + }
  55 +}
  56 +JSON
  57 +
  58 +python3 - <<'PY'
  59 +import json
  60 +from pathlib import Path
  61 +
  62 +p = Path("/tmp/rerank_1000_shortdocs_cases.json")
  63 +d = json.loads(p.read_text(encoding="utf-8"))
  64 +docs = []
  65 +for i in range(1000):
  66 + if i % 3 == 0:
  67 + doc = f"wireless mouse model {i} ergonomic grip 2.4g bluetooth"
  68 + elif i % 3 == 1:
  69 + doc = f"gaming mouse {i} rgb lightweight high precision sensor"
  70 + else:
  71 + doc = f"office mouse {i} rechargeable silent click compact"
  72 + if i % 5 == 0:
  73 + doc += " with usb receiver"
  74 + if i % 7 == 0:
  75 + doc += " long battery life"
  76 + docs.append(doc)
  77 +
  78 +d["scenarios"]["rerank"][0]["json"]["docs"] = docs
  79 +p.write_text(json.dumps(d, ensure_ascii=False), encoding="utf-8")
  80 +print(f"[info] generated docs={len(docs)} at {p}")
  81 +PY
  82 +
  83 +run_bench() {
  84 + local bs="$1"
  85 + local c="$2"
  86 + local req="$3"
  87 + local out="${OUT_DIR}/rerank_bs${bs}_c${c}_r${req}.json"
  88 + .venv/bin/python scripts/perf_api_benchmark.py \
  89 + --scenario rerank \
  90 + --tenant-id "${TENANT_ID}" \
  91 + --reranker-base "${RERANK_BASE}" \
  92 + --cases-file "${TMP_CASES}" \
  93 + --concurrency "${c}" \
  94 + --max-requests "${req}" \
  95 + --timeout "${TIMEOUT_SEC}" \
  96 + --output "${out}" >/dev/null
  97 + python3 - <<PY
  98 +import json
  99 +p="${out}"
  100 +d=json.load(open(p))
  101 +r=d["results"][0]
  102 +lat=r["latency_ms"]
  103 +print(f"[result] bs=${bs} c=${c} req=${req} avg={lat['avg']}ms p95={lat['p95']}ms rps={r['throughput_rps']}")
  104 +PY
  105 +}
  106 +
  107 +for bs in ${BATCH_SIZES}; do
  108 + echo "[info] benchmarking infer_batch_size=${bs}"
  109 + cleanup
  110 + RERANK_VLLM_INFER_BATCH_SIZE="${bs}" \
  111 + RERANK_VLLM_SORT_BY_DOC_LENGTH="true" \
  112 + RERANK_VLLM_LENGTH_SORT_MODE="char" \
  113 + nohup ./scripts/start_reranker.sh >"${OUT_DIR}/start_bs${bs}.log" 2>&1 &
  114 +
  115 + for i in $(seq 1 180); do
  116 + if curl -sf "${RERANK_BASE}/health" >/dev/null 2>&1; then
  117 + break
  118 + fi
  119 + sleep 1
  120 + if [ "${i}" -eq 180 ]; then
  121 + echo "[error] reranker startup timeout for bs=${bs}" >&2
  122 + tail -n 80 "${OUT_DIR}/start_bs${bs}.log" >&2 || true
  123 + exit 1
  124 + fi
  125 + done
  126 +
  127 + run_bench "${bs}" 1 "${C1_REQUESTS}"
  128 + run_bench "${bs}" 4 "${C4_REQUESTS}"
  129 +done
  130 +
  131 +echo "[info] benchmark done: ${OUT_DIR}"
... ...
scripts/service_ctl.sh
... ... @@ -98,6 +98,28 @@ wait_for_health() {
98 98 return 1
99 99 }
100 100  
  101 +wait_for_stable_health() {
  102 + local service="$1"
  103 + local checks="${2:-3}"
  104 + local interval_sec="${3:-1}"
  105 + local port
  106 + port="$(get_port "${service}")"
  107 + local path="/health"
  108 +
  109 + local i=0
  110 + while [ "${i}" -lt "${checks}" ]; do
  111 + if ! is_running_by_port "${service}"; then
  112 + return 1
  113 + fi
  114 + if ! curl -sf "http://127.0.0.1:${port}${path}" >/dev/null 2>&1; then
  115 + return 1
  116 + fi
  117 + i=$((i + 1))
  118 + sleep "${interval_sec}"
  119 + done
  120 + return 0
  121 +}
  122 +
101 123 is_running_by_pid() {
102 124 local service="$1"
103 125 local pf
... ... @@ -185,18 +207,12 @@ start_one() {
185 207 return 1
186 208 fi
187 209 ;;
188   - backend|indexer|frontend|embedding|translator|reranker)
  210 + backend|indexer|frontend|embedding|translator)
189 211 echo "[start] ${service}"
190 212 nohup bash -lc "${cmd}" > "${lf}" 2>&1 &
191 213 local pid=$!
192 214 echo "${pid}" > "${pf}"
193   - # Some services (notably reranker with vLLM backend) can take longer
194   - # to load models / compile graphs on first start. Give them a longer
195   - # health check window to avoid false negatives.
196 215 local retries=30
197   - if [ "${service}" = "reranker" ]; then
198   - retries=90
199   - fi
200 216 if wait_for_health "${service}" "${retries}"; then
201 217 echo "[ok] ${service} healthy (pid=${pid}, log=${lf})"
202 218 else
... ... @@ -204,6 +220,25 @@ start_one() {
204 220 return 1
205 221 fi
206 222 ;;
  223 + reranker)
  224 + echo "[start] ${service}"
  225 + # Start reranker directly so pid file points to the script process that
  226 + # will exec uvicorn, avoiding extra shell wrapper lifecycle edge-cases.
  227 + nohup "${cmd}" > "${lf}" 2>&1 &
  228 + local pid=$!
  229 + echo "${pid}" > "${pf}"
  230 + if wait_for_health "${service}" 90; then
  231 + if wait_for_stable_health "${service}" 5 1; then
  232 + echo "[ok] ${service} healthy (pid=${pid}, log=${lf})"
  233 + else
  234 + echo "[error] ${service} became unavailable right after startup, inspect ${lf}" >&2
  235 + return 1
  236 + fi
  237 + else
  238 + echo "[error] ${service} health check timeout, inspect ${lf}" >&2
  239 + return 1
  240 + fi
  241 + ;;
207 242 *)
208 243 echo "[warn] ${service} unsupported start path"
209 244 ;;
... ...
tests/test_reranker_batching_utils.py 0 → 100644
... ... @@ -0,0 +1,32 @@
  1 +import pytest
  2 +
  3 +from reranker.backends.batching_utils import (
  4 + deduplicate_with_positions,
  5 + iter_batches,
  6 + sort_indices_by_length,
  7 +)
  8 +
  9 +
  10 +def test_deduplicate_with_positions_global_not_adjacent():
  11 + texts = ["a", "b", "a", "c", "b", "a"]
  12 + unique, mapping = deduplicate_with_positions(texts)
  13 + assert unique == ["a", "b", "c"]
  14 + assert mapping == [0, 1, 0, 2, 1, 0]
  15 +
  16 +
  17 +def test_sort_indices_by_length_stable():
  18 + lengths = [5, 2, 2, 9, 4]
  19 + order = sort_indices_by_length(lengths)
  20 + # Stable sort: index 1 remains ahead of index 2 when lengths are equal.
  21 + assert order == [1, 2, 4, 0, 3]
  22 +
  23 +
  24 +def test_iter_batches():
  25 + indices = list(range(10))
  26 + batches = list(iter_batches(indices, 4))
  27 + assert batches == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
  28 +
  29 +
  30 +def test_iter_batches_invalid_batch_size():
  31 + with pytest.raises(ValueError, match="batch_size must be > 0"):
  32 + list(iter_batches([0, 1], 0))
... ...