diff --git a/api/routes/indexer.py b/api/routes/indexer.py index bcc8078..95bded1 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -5,6 +5,7 @@ """ import asyncio +import re from fastapi import APIRouter, HTTPException from typing import Any, Dict, List from pydantic import BaseModel, Field @@ -76,6 +77,25 @@ class BuildDocsFromDbRequest(BaseModel): spu_ids: List[str] = Field(..., description="需要构建 doc 的 SPU ID 列表") +class EnrichContentItem(BaseModel): + """单条待生成内容理解字段的商品(仅需 spu_id + 标题)。""" + spu_id: str = Field(..., description="SPU ID") + title: str = Field(..., description="商品标题,用于 LLM 分析生成 qanchors / tags 等") + + +class EnrichContentRequest(BaseModel): + """ + 内容理解字段生成请求:根据商品标题批量生成 qanchors、semantic_attributes、tags。 + 供外部 indexer 在自行组织 doc 时调用,与翻译、向量化等微服务并列。 + """ + tenant_id: str = Field(..., description="租户 ID,用于缓存隔离") + items: List[EnrichContentItem] = Field(..., description="待分析的 SPU 列表(spu_id + title)") + languages: List[str] = Field( + default_factory=lambda: ["zh", "en"], + description="目标语言列表,需在支持范围内(zh/en/de/ru/fr),默认 zh, en", + ) + + @router.post("/reindex") async def reindex_all(request: ReindexRequest): """ @@ -411,6 +431,152 @@ async def build_docs_from_db(request: BuildDocsFromDbRequest): raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") +def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]: + """ + 同步执行内容理解:调用 process_products.analyze_products,按语言批量跑 LLM, + 再聚合成每 SPU 的 qanchors、semantic_attributes、tags。供 run_in_executor 调用。 + """ + from indexer.process_products import analyze_products, SUPPORTED_LANGS + + llm_langs = [lang for lang in languages if lang in SUPPORTED_LANGS] + if not llm_langs: + return [ + { + "spu_id": it["spu_id"], + "qanchors": {}, + "semantic_attributes": [], + "tags": [], + "error": "no supported languages (supported: %s)" % sorted(SUPPORTED_LANGS), + } + for it in items + ] + + products = [{"id": it["spu_id"], "title": (it.get("title") or "").strip()} for it in items] + dim_keys = [ + "tags", + "target_audience", + "usage_scene", + "season", + "key_attributes", + "material", + "features", + ] + + # 按 spu_id 聚合:qanchors[lang], semantic_attributes[], tags[] + by_spu: Dict[str, Dict[str, Any]] = {} + for it in items: + sid = str(it["spu_id"]) + by_spu[sid] = {"qanchors": {}, "semantic_attributes": [], "tags": []} + + for lang in llm_langs: + try: + rows = analyze_products( + products=products, + target_lang=lang, + batch_size=20, + tenant_id=tenant_id, + ) + except Exception as e: + logger.warning("enrich-content analyze_products failed for lang=%s: %s", lang, e) + for it in items: + sid = str(it["spu_id"]) + if "error" not in by_spu[sid]: + by_spu[sid]["error"] = str(e) + continue + + for row in rows: + spu_id = str(row.get("id") or "") + if spu_id not in by_spu: + continue + rec = by_spu[spu_id] + if row.get("error"): + rec["error"] = row["error"] + continue + anchor_text = str(row.get("anchor_text") or "").strip() + if anchor_text: + rec["qanchors"][lang] = anchor_text + for name in dim_keys: + raw = row.get(name) + if not raw: + continue + for part in re.split(r"[,;|/\n\t]+", str(raw)): + value = part.strip() + if not value: + continue + rec["semantic_attributes"].append({"lang": lang, "name": name, "value": value}) + if name == "tags": + rec["tags"].append(value) + + # 去重 tags(保持顺序) + out = [] + for it in items: + sid = str(it["spu_id"]) + rec = by_spu[sid] + tags = list(dict.fromkeys(rec["tags"])) + out.append({ + "spu_id": sid, + "qanchors": rec["qanchors"], + "semantic_attributes": rec["semantic_attributes"], + "tags": tags, + **({"error": rec["error"]} if rec.get("error") else {}), + }) + return out + + +@router.post("/enrich-content") +async def enrich_content(request: EnrichContentRequest): + """ + 内容理解字段生成接口:根据商品标题批量生成 qanchors、semantic_attributes、tags。 + + 使用场景: + - 外部 indexer 采用「微服务组合」方式自己组织 doc 时,可调用本接口获取 LLM 生成的 + 锚文本与语义属性,再与翻译、向量化结果合并写入 ES。 + - 与 /indexer/build-docs 解耦,避免 build-docs 因 LLM 耗时过长而阻塞;调用方可 + 先拿不含 qanchors/tags 的 doc,再异步或离线补齐本接口结果后更新 ES。 + + 实现逻辑与 indexer.process_products.analyze_products 一致,支持多语言与 Redis 缓存。 + """ + try: + if not request.items: + raise HTTPException(status_code=400, detail="items cannot be empty") + if len(request.items) > 50: + raise HTTPException( + status_code=400, + detail="Maximum 50 items per request for enrich-content (LLM batch limit)", + ) + + items_payload = [ + {"spu_id": it.spu_id, "title": it.title or ""} + for it in request.items + ] + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, + lambda: _run_enrich_content( + tenant_id=request.tenant_id, + items=items_payload, + languages=request.languages or ["zh", "en"], + ), + ) + return { + "tenant_id": request.tenant_id, + "results": result, + "total": len(result), + } + except HTTPException: + raise + except RuntimeError as e: + if "DASHSCOPE_API_KEY" in str(e) or "cannot call LLM" in str(e).lower(): + raise HTTPException( + status_code=503, + detail="Content understanding service unavailable: DASHSCOPE_API_KEY not set", + ) + raise HTTPException(status_code=500, detail=str(e)) + except Exception as e: + logger.error(f"Error in enrich-content for tenant_id={request.tenant_id}: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + + @router.post("/documents") async def get_documents(request: GetDocumentsRequest): """ diff --git a/config/config.yaml b/config/config.yaml index 4624e83..71b874a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -184,6 +184,9 @@ services: dtype: "float16" enable_prefix_caching: true enforce_eager: false + infer_batch_size: 64 + sort_by_doc_length: true + length_sort_mode: "char" # char | token instruction: "Given a web search query, retrieve relevant passages that answer the query" # SPU配置(已启用,使用嵌套skus) diff --git a/docs/CNCLIP_SERVICE说明文档.md b/docs/CNCLIP_SERVICE说明文档.md index c176aff..8f9c3ab 100644 --- a/docs/CNCLIP_SERVICE说明文档.md +++ b/docs/CNCLIP_SERVICE说明文档.md @@ -147,42 +147,7 @@ curl -sS -X POST "http://127.0.0.1:6005/embed/image" \ 返回应为向量数组(非空)。 -## 8. 常见问题与排障 - -### 8.1 “为什么 cnclip 没用 GPU?” - -先看启动命令是否显式写了: - -```bash -CNCLIP_DEVICE=cpu ... -``` - -如果是,这就是预期行为,会强制 CPU。 - -### 8.2 “我要求 cuda,但 service_ctl 说 mode mismatch” - -说明当前已有 CPU 模式实例在跑。按提示重启: - -```bash -./scripts/service_ctl.sh stop cnclip -CNCLIP_DEVICE=cuda ./scripts/service_ctl.sh start cnclip -``` - -### 8.3 进程被 `Killed` - -常见于资源竞争(尤其单卡同时运行 TEI/reranker/cnclip 时的启动峰值)。建议: - -1. 先起 `tei` 与 `cnclip`,确认稳定后再起 `reranker`。 -2. 适当下调 `reranker` 显存预算(`config/config.yaml` 的 `gpu_memory_utilization`)。 -3. 必要时将 CN-CLIP 模型从 `ViT-H-14` 调整到更小规模。 - -### 8.4 gRPC 连接失败(`Connection refused`) - -- 检查端口:`lsof -i :51000` -- 看服务日志:`tail -f logs/cnclip_service.log` -- 确认客户端使用 `grpc://` 协议而非 `http://` - -## 9. 相关文档 +## 8. 相关文档 - 开发总览:`docs/QUICKSTART.md` - TEI 专项:`docs/TEI_SERVICE说明文档.md` diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index 96fad44..020c54d 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -324,7 +324,7 @@ services: ### 7.6 新增后端清单(以 Qwen3-Reranker 为例) 1. **实现协议**:在 `reranker/backends/qwen3_vllm.py` 中实现类,提供 `score_with_meta(query, docs, normalize) -> (scores, meta)`,输出与 docs 等长且顺序一致。 -2. **配置**:在 `config/config.yaml` 的 `services.rerank.backends` 下增加 `qwen3_vllm` 块(model_name、engine、max_model_len、gpu_memory_utilization 等);支持环境变量 `RERANK_BACKEND=qwen3_vllm`。 +2. **配置**:在 `config/config.yaml` 的 `services.rerank.backends` 下增加 `qwen3_vllm` 块(model_name、engine、max_model_len、gpu_memory_utilization、`infer_batch_size`、`sort_by_doc_length`、`length_sort_mode` 等);支持环境变量 `RERANK_BACKEND=qwen3_vllm`。 3. **注册**:在 `reranker/backends/__init__.py` 的 `get_rerank_backend(name, config)` 中增加 `qwen3_vllm` 分支。 4. **服务启动**:`reranker/server.py` 启动时根据配置调用 `get_rerank_backend(backend_name, backend_cfg)` 得到实例。 5. **调用方**:无需修改;仅部署时启动使用新后端的 reranker 服务即可。 diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 10d9396..d409026 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -31,23 +31,26 @@ - 4.5 [多语言字段说明](#45-多语言字段说明) 5. [索引接口](#索引接口) - - 5.0 [为租户创建索引](#50-为租户创建索引) - - 5.1 [全量索引接口](#51-全量索引接口) - - 5.2 [增量索引接口](#52-增量索引接口) - - 5.3 [查询文档接口](#53-查询文档接口) - - 5.4 [索引健康检查接口](#54-索引健康检查接口) - - 5.5 [文档构建接口(正式对接)](#55-文档构建接口正式对接推荐) - - 5.6 [文档构建接口(测试/自测)](#56-文档构建接口测试--自测) + - 5.0 [支撑外部 indexer 的三种方式](#50-支撑外部-indexer-的三种方式) + - 5.1 [为租户创建索引](#51-为租户创建索引) + - 5.2 [全量索引接口](#52-全量索引接口) + - 5.3 [增量索引接口](#53-增量索引接口) + - 5.4 [查询文档接口](#54-查询文档接口) + - 5.5 [索引健康检查接口](#55-索引健康检查接口) + - 5.6 [文档构建接口(正式对接)](#56-文档构建接口正式对接推荐) + - 5.7 [文档构建接口(测试/自测)](#57-文档构建接口测试--自测) + - 5.8 [内容理解字段生成接口](#58-内容理解字段生成接口) 6. [管理接口](#管理接口) - 6.1 [健康检查](#61-健康检查) - 6.2 [获取配置](#62-获取配置) - 6.3 [索引统计](#63-索引统计) -7. [微服务接口(向量、重排、翻译)](#7-微服务接口向量重排翻译) +7. [微服务接口(向量、重排、翻译、内容理解)](#7-微服务接口向量重排翻译) - 7.1 [向量服务(Embedding)](#71-向量服务embedding) - 7.2 [重排服务(Reranker)](#72-重排服务reranker) - 7.3 [翻译服务(Translation)](#73-翻译服务translation) + - 7.4 [内容理解字段生成(Indexer 服务内)](#74-内容理解字段生成indexer-服务内) 8. [常见场景示例](#8-常见场景示例) - 8.1 [基础搜索与排序](#81-基础搜索与排序) @@ -144,12 +147,13 @@ curl -X POST "http://43.166.252.75:6002/search/" \ | 查询文档 | POST | `/indexer/documents` | 查询SPU文档数据(不写入ES) | | 构建ES文档(正式对接) | POST | `/indexer/build-docs` | 基于上游提供的 MySQL 行数据构建 ES doc,不写入 ES,供 Java 等调用后自行写入 | | 构建ES文档(测试用) | POST | `/indexer/build-docs-from-db` | 仅在测试/调试时使用,根据 `tenant_id + spu_ids` 内部查库并构建 ES doc | +| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、semantic_attributes、tags,供微服务组合方式使用 | | 索引健康检查 | GET | `/indexer/health` | 检查索引服务状态 | | 健康检查 | GET | `/admin/health` | 服务健康检查 | | 获取配置 | GET | `/admin/config` | 获取租户配置 | | 索引统计 | GET | `/admin/stats` | 获取租户索引统计信息(需 tenant_id) | -**微服务(独立端口,外部可直连)**: +**微服务(独立端口或 Indexer 内,外部可直连)**: | 服务 | 端口 | 接口 | 说明 | |------|------|------|------| @@ -157,6 +161,7 @@ curl -X POST "http://43.166.252.75:6002/search/" \ | 向量服务 | 6005 | `POST /embed/image` | 图片向量化 | | 翻译服务 | 6006 | `POST /translate` | 文本翻译(Qwen/DeepL) | | 重排服务 | 6007 | `POST /rerank` | 检索结果重排 | +| 内容理解(Indexer 内) | 6004 | `POST /indexer/enrich-content` | 根据商品标题生成 qanchors、tags 等,供 indexer 微服务组合方式使用 | --- @@ -856,9 +861,23 @@ curl "http://localhost:6002/search/12345" -H "X-Tenant-ID: 162" | 查询文档 | POST | `/indexer/documents` | 按 SPU ID 列表查询 ES 文档,不写入 ES | | 构建 ES 文档(正式) | POST | `/indexer/build-docs` | 由上游提供 MySQL 行数据,返回 ES-ready 文档,不写 ES | | 构建 ES 文档(测试) | POST | `/indexer/build-docs-from-db` | 由本服务查库并构建文档,仅测试/调试用 | +| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、semantic_attributes、tags(供微服务组合方式使用) | | 索引健康检查 | GET | `/indexer/health` | 检查索引服务与数据库连接状态 | -### 5.0 为租户创建索引 +#### 5.0 支撑外部 indexer 的三种方式 + +本服务对**外部 indexer 程序**(如 Java 索引系统)提供三种对接方式,可按需选择: + +| 方式 | 说明 | 适用场景 | +|------|------|----------| +| **1)doc 填充接口** | 调用 `POST /indexer/build-docs` 或 `POST /indexer/build-docs-from-db`,由本服务基于 MySQL 行数据构建完整 ES 文档(含多语言、向量、规格等),**不写入 ES**,由调用方自行写入。 | 希望一站式拿到 ES-ready doc,由己方控制写 ES 的时机与索引名。 | +| **2)微服务组合** | 单独调用**翻译**、**向量化**、**内容理解字段生成**等接口,由 indexer 程序自己组装 doc 并写入 ES。翻译与向量化为独立微服务(见第 7 节);内容理解为 Indexer 服务内接口 `POST /indexer/enrich-content`。 | 需要灵活编排、或希望将 LLM/向量等耗时步骤与主链路解耦(如异步补齐 qanchors/tags)。 | +| **3)本服务直接写 ES** | 调用全量索引 `POST /indexer/reindex`、增量索引 `POST /indexer/index`(指定 SPU ID 列表),由本服务从 MySQL 拉数并直接写入 ES。 | 自建运维、联调或不需要由 Java 写 ES 的场景。 | + +- **方式 1** 与 **方式 2** 下,ES 的写入方均为外部 indexer(或 Java),职责清晰。 +- **方式 3** 下,本服务同时负责读库、构建 doc 与写 ES。 + +### 5.1 为租户创建索引 为租户创建索引需要两个步骤: @@ -894,7 +913,7 @@ export ES_INDEX_NAMESPACE=uat_ --- -### 5.1 全量索引接口 +### 5.2 全量索引接口 - **端点**: `POST /indexer/reindex` - **描述**: 全量索引,将指定租户的所有SPU数据导入到ES索引(不会删除现有索引)。**推荐仅用于自测/运维场景**;生产环境下更推荐由 Java 等上游控制调度与写 ES。 @@ -953,7 +972,7 @@ tail -f logs/api.log tail -f logs/*.log ``` -> ⚠️ **重要提示**:如需 **创建索引结构**,请参考 [5.0 为租户创建索引](#50-为租户创建索引) 章节,使用 `./scripts/create_tenant_index.sh `。创建后需要调用 `/indexer/reindex` 导入数据。 +> ⚠️ **重要提示**:如需 **创建索引结构**,请参考 [5.1 为租户创建索引](#51-为租户创建索引) 章节,使用 `./scripts/create_tenant_index.sh `。创建后需要调用 `/indexer/reindex` 导入数据。 **查看索引日志**: @@ -1000,7 +1019,7 @@ cat logs/indexer.log | jq 'select(.spu_id == "123")' cat logs/indexer.log | jq 'select(.operation == "request_complete") | {timestamp, index_type, tenant_id, total_count, success_count, failed_count, elapsed_time}' ``` -### 5.2 增量索引接口 +### 5.3 增量索引接口 - **端点**: `POST /indexer/index` - **描述**: 增量索引接口,根据指定的SPU ID列表进行索引,直接将数据写入ES。用于增量更新指定商品。**推荐仅作为内部/调试入口**;正式对接建议改用 `/indexer/build-docs`,由上游写 ES。 @@ -1158,7 +1177,7 @@ curl -X POST "http://localhost:6004/indexer/index" \ 日志查询方式请参考[5.1节查看索引日志](#51-全量重建索引接口)部分。 -### 5.3 查询文档接口 +### 5.4 查询文档接口 - **端点**: `POST /indexer/documents` - **描述**: 查询文档接口,根据SPU ID列表获取ES文档数据(**不写入ES**)。用于查看、调试或验证SPU数据。 @@ -1251,7 +1270,7 @@ curl -X POST "http://localhost:6004/indexer/documents" \ - `/indexer/documents`:用于查看、调试或验证SPU数据,不修改ES索引 - `/indexer/index`:用于实际的增量索引操作,将更新的SPU数据同步到ES -### 5.4 索引健康检查接口 +### 5.5 索引健康检查接口 - **端点**: `GET /indexer/health` - **描述**: 检查索引服务健康状态(与 `api/routes/indexer.py` 中 `indexer_health_check` 一致) @@ -1280,9 +1299,9 @@ curl -X POST "http://localhost:6004/indexer/documents" \ curl -X GET "http://localhost:6004/indexer/health" ``` -### 5.5 文档构建接口(正式对接推荐) +### 5.6 文档构建接口(正式对接推荐) -#### 5.5.1 `POST /indexer/build-docs` +#### 5.6.1 `POST /indexer/build-docs` - **描述**: 基于调用方(通常是 Java 索引程序)提供的 **MySQL 行数据** 构建 ES 文档(doc),**不写入 ES**。 @@ -1427,9 +1446,9 @@ curl -X POST "http://localhost:6004/indexer/build-docs" \ 3. 调用 `/indexer/build-docs` 获取 ES-ready `docs`; 4. Java 使用自己的 ES 客户端写入 `search_products_tenant_{tenant_id}`。 -### 5.6 文档构建接口(测试 / 自测) +### 5.7 文档构建接口(测试 / 自测) -#### 5.6.1 `POST /indexer/build-docs-from-db` +#### 5.7.1 `POST /indexer/build-docs-from-db` - **描述**: 仅用于测试/调试:调用方只提供 `tenant_id` 和 `spu_ids`,由 indexer 服务内部从 MySQL 查询 SPU/SKU/Option,然后调用与 `/indexer/build-docs` 相同的文档构建逻辑,返回 ES-ready doc。**生产环境请使用 `/indexer/build-docs`,由上游查库并写 ES。** @@ -1462,6 +1481,86 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ 返回结构与 `/indexer/build-docs` 相同,可直接用于对比 ES 实际文档或调试字段映射问题。 +### 5.8 内容理解字段生成接口 + +- **端点**: `POST /indexer/enrich-content` +- **描述**: 根据商品标题批量生成 **qanchors**(锚文本)、**semantic_attributes**(语义属性)、**tags**(细分标签),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。内部逻辑与 `indexer.process_products` 一致,支持多语言与 Redis 缓存;单次请求在线程池中执行,避免阻塞其他接口。 + +#### 请求参数 + +```json +{ + "tenant_id": "170", + "items": [ + { "spu_id": "223167", "title": "纯棉短袖T恤 夏季男装" }, + { "spu_id": "223168", "title": "12PCS Dolls with Bottles" } + ], + "languages": ["zh", "en"] +} +``` + +| 参数 | 类型 | 必填 | 默认值 | 说明 | +|------|------|------|--------|------| +| `tenant_id` | string | Y | - | 租户 ID,用于缓存隔离 | +| `items` | array | Y | - | 待分析列表,每项含 `spu_id`、`title`;**单次最多 50 条** | +| `languages` | array[string] | N | `["zh", "en"]` | 目标语言,需在支持范围内:`zh`、`en`、`de`、`ru`、`fr` | + +#### 响应格式 + +```json +{ + "tenant_id": "170", + "total": 2, + "results": [ + { + "spu_id": "223167", + "qanchors": { + "zh": "短袖T恤,纯棉,男装,夏季", + "en": "cotton t-shirt, short sleeve, men, summer" + }, + "semantic_attributes": [ + { "lang": "zh", "name": "tags", "value": "纯棉" }, + { "lang": "zh", "name": "usage_scene", "value": "日常" }, + { "lang": "en", "name": "tags", "value": "cotton" } + ], + "tags": ["纯棉", "短袖", "男装", "cotton", "short sleeve"] + }, + { + "spu_id": "223168", + "qanchors": { "en": "dolls, toys, 12pcs" }, + "semantic_attributes": [], + "tags": ["dolls", "toys"] + } + ] +} +``` + +| 字段 | 类型 | 说明 | +|------|------|------| +| `results` | array | 与请求 `items` 一一对应,每项含 `spu_id`、`qanchors`、`semantic_attributes`、`tags` | +| `results[].qanchors` | object | 按语言键的锚文本(逗号分隔短语),可写入 ES 文档的 `qanchors.{lang}` | +| `results[].semantic_attributes` | array | 语义属性列表,每项为 `{ "lang", "name", "value" }`,可写入 ES 的 `semantic_attributes` nested 字段 | +| `results[].tags` | array | 从语义属性中抽取的 `name=tags` 的 value 集合,可与业务原有 `tags` 合并后写入 ES 的 `tags` 字段 | +| `results[].error` | string | 若该条处理失败(如 LLM 异常),会在此字段返回错误信息 | + +**错误响应**: +- `400`: `items` 为空或超过 50 条 +- `503`: 未配置 `DASHSCOPE_API_KEY`,内容理解服务不可用 + +#### 请求示例 + +```bash +curl -X POST "http://localhost:6004/indexer/enrich-content" \ + -H "Content-Type: application/json" \ + -d '{ + "tenant_id": "170", + "items": [ + { "spu_id": "223167", "title": "纯棉短袖T恤 夏季男装" } + ], + "languages": ["zh", "en"] + }' +``` + --- ## 管理接口 @@ -1614,6 +1713,7 @@ START_TEI=0 ./scripts/service_ctl.sh restart embedding - **启动**: `./scripts/start_reranker.sh` 说明:默认后端为 `qwen3_vllm`(`Qwen/Qwen3-Reranker-0.6B`),需要可用 GPU 显存。 +补充:`docs` 的请求大小与模型推理 `batch size` 解耦。即使一次传入 1000 条文档,服务端也会按 `services.rerank.backends.qwen3_vllm.infer_batch_size` 自动拆分;若 `sort_by_doc_length=true`,会先按文档长度排序后分批,减少 padding,再按原输入顺序返回分数。`length_sort_mode` 可选 `char`(更快)或 `token`(更精确)。 #### 7.2.1 `POST /rerank` — 结果重排 @@ -1746,6 +1846,16 @@ curl -X POST "http://localhost:6006/translate" \ curl "http://localhost:6006/health" ``` +### 7.4 内容理解字段生成(Indexer 服务内) + +内容理解字段生成接口部署在 **Indexer 服务**(默认端口 6004)内,与「翻译、向量化」等独立端口微服务并列,供采用**微服务组合**方式的 indexer 调用。 + +- **Base URL**: Indexer 服务地址,如 `http://localhost:6004` +- **路径**: `POST /indexer/enrich-content` +- **说明**: 根据商品标题批量生成 `qanchors`、`semantic_attributes`、`tags`,用于拼装 ES 文档。内部使用大模型(需配置 `DASHSCOPE_API_KEY`),支持多语言与 Redis 缓存;单次最多 50 条,建议批量调用以提升效率。 + +请求/响应格式、示例及错误码见 [5.8 内容理解字段生成接口](#58-内容理解字段生成接口)。 + --- ## 8. 常见场景示例 diff --git a/perf_reports/20260311/reranker_1000docs/report.md b/perf_reports/20260311/reranker_1000docs/report.md new file mode 100644 index 0000000..ba410d8 --- /dev/null +++ b/perf_reports/20260311/reranker_1000docs/report.md @@ -0,0 +1,38 @@ +# Reranker 1000-doc Performance Report (2026-03-11) + +Workload profile: +- backend: `qwen3_vllm` (`Qwen/Qwen3-Reranker-0.6B`) +- query: short e-commerce text (<100 tokens) +- docs/request: 1000 short titles/title+brief +- options: `sort_by_doc_length=true`, `length_sort_mode=char` + +## Results + +| infer_batch_size | concurrency | requests | rps | avg_ms | p95_ms | p99_ms | +|---:|---:|---:|---:|---:|---:|---:| +| 24 | 1 | 4 | 0.34 | 2962.42 | 4758.59 | 4969.54 | +| 32 | 1 | 4 | 0.56 | 1756.63 | 2285.96 | 2389.21 | +| 48 | 1 | 4 | 0.63 | 1570.78 | 2111.8 | 2206.45 | +| 64 | 1 | 4 | 0.68 | 1448.87 | 2014.51 | 2122.44 | +| 80 | 1 | 3 | 0.64 | 1546.78 | 2091.47 | 2157.13 | +| 96 | 1 | 3 | 0.64 | 1534.44 | 2202.48 | 2288.99 | +| 96 | 1 | 4 | 0.52 | 1914.41 | 2215.05 | 2216.08 | +| 24 | 4 | 8 | 0.46 | 8614.9 | 9886.68 | 9887.0 | +| 32 | 4 | 8 | 0.62 | 6432.39 | 6594.11 | 6595.41 | +| 48 | 4 | 8 | 0.72 | 5451.5 | 5495.01 | 5500.29 | +| 64 | 4 | 8 | 0.76 | 5217.79 | 5329.15 | 5332.3 | +| 80 | 4 | 6 | 0.49 | 7069.9 | 9198.54 | 9208.77 | +| 96 | 4 | 6 | 0.76 | 4302.86 | 5139.19 | 5149.08 | +| 96 | 4 | 8 | 0.81 | 4852.78 | 5038.37 | 5058.76 | + +## Decision + +- For latency-sensitive online rerank (single request with 1000 docs), `infer_batch_size=64` gives the best c=1 latency in this run set. +- For higher concurrency c=4, `infer_batch_size=96` has slightly higher throughput, but degrades c=1 latency noticeably. +- Default kept at **`infer_batch_size=64`** as balanced/safer baseline for mixed traffic. + +## Reproduce + +```bash +./scripts/benchmark_reranker_1000docs.sh +``` diff --git a/reranker/DEPLOYMENT_AND_TUNING.md b/reranker/DEPLOYMENT_AND_TUNING.md new file mode 100644 index 0000000..48a82e0 --- /dev/null +++ b/reranker/DEPLOYMENT_AND_TUNING.md @@ -0,0 +1,210 @@ +# Reranker 部署与性能调优手册(Qwen3-vLLM) + +本文档沉淀当前项目在电商搜索重排场景下的可复用实践,覆盖: + +- 环境准备与安装部署 +- `qwen3_vllm` 配置项与优化思路 +- 1000-doc 场景压测流程 +- 关键结论与推荐默认参数 +- 常见故障排查 + +适用范围: + +- 重排后端:`services.rerank.backend: qwen3_vllm` +- 模型:`Qwen/Qwen3-Reranker-0.6B` +- 场景:query 较短(通常 < 100 tokens),doc 为商品标题或标题+简短描述,单请求 docs 约 1000 条 + +## 1. 环境基线 + +当前验证环境(2026-03-11): + +- GPU:`Tesla T4 16GB` +- Driver / CUDA:`570.158.01 / 12.8` +- Python:`3.12.3` +- 关键依赖:`vllm==0.17.0`、`torch==2.10.0+cu128`、`transformers==4.57.6`、`fastapi==0.135.1`、`uvicorn==0.41.0` + +## 2. 环境准备与安装 + +### 2.1 准备 reranker 独立虚拟环境 + +```bash +./scripts/setup_reranker_venv.sh +``` + +### 2.2 基础检查 + +```bash +nvidia-smi +./.venv-reranker/bin/python -c "import torch; print(torch.cuda.is_available())" +./.venv-reranker/bin/python -c "import vllm, transformers; print(vllm.__version__, transformers.__version__)" +``` + +## 3. 部署与运行 + +### 3.1 配置(`config/config.yaml`) + +当前推荐基线: + +```yaml +services: + rerank: + backend: "qwen3_vllm" + backends: + qwen3_vllm: + model_name: "Qwen/Qwen3-Reranker-0.6B" + engine: "vllm" + max_model_len: 256 + tensor_parallel_size: 1 + gpu_memory_utilization: 0.36 + dtype: "float16" + enable_prefix_caching: true + enforce_eager: false + infer_batch_size: 64 + sort_by_doc_length: true + length_sort_mode: "char" # char | token +``` + +### 3.2 启停命令 + +推荐统一使用: + +```bash +./scripts/service_ctl.sh start reranker +./scripts/service_ctl.sh status reranker +./scripts/service_ctl.sh stop reranker +``` + +也可直接运行原生脚本: + +```bash +./scripts/start_reranker.sh +./scripts/stop_reranker.sh +``` + +健康检查: + +```bash +curl -sS http://127.0.0.1:6007/health +``` + +## 4. 核心优化点(已落地) + +### 4.1 请求大小与推理 batch 解耦 + +- 调用方一次可传入 1000 docs(业务需求) +- 服务端按 `infer_batch_size` 自动拆批推理(模型效率需求) + +### 4.2 先排序再分批,降低 padding 浪费 + +- `sort_by_doc_length: true`:按长度排序后再分批 +- `length_sort_mode: "char"`:短文本场景下开销更低,默认推荐 +- `length_sort_mode: "token"`:长度估计更精确,但有额外 tokenizer 开销 + +### 4.3 全局去重后回填 + +- 对 docs 进行全局去重(非“仅相邻去重”) +- 推理后按原请求顺序回填 scores,保证接口契约稳定 + +### 4.4 启动稳定性修复 + +- `service_ctl.sh` 对 reranker 使用独立启动路径 +- 增加“稳定健康检查”(连续健康探测)避免“刚 healthy 即退出”的假阳性 + +## 5. 性能调优流程(标准流程) + +### 5.1 使用一键压测脚本 + +```bash +./scripts/benchmark_reranker_1000docs.sh +``` + +输出目录: + +- `perf_reports//reranker_1000docs/*.json` +- `perf_reports//reranker_1000docs/report.md` + +### 5.2 常用参数扫描 + +默认扫描: + +- `infer_batch_size`: `24 32 48 64` +- 并发组:`c=1`(看单请求延迟)、`c=4`(看并发吞吐与尾延迟) + +可通过环境变量覆盖: + +- `BATCH_SIZES` +- `C1_REQUESTS` +- `C4_REQUESTS` +- `TENANT_ID` +- `RERANK_BASE` + +### 5.3 运行时临时覆盖服务参数 + +用于“同机对比不同参数”: + +- `RERANK_VLLM_INFER_BATCH_SIZE` +- `RERANK_VLLM_SORT_BY_DOC_LENGTH` +- `RERANK_VLLM_LENGTH_SORT_MODE` + +## 6. 本轮关键结论(2026-03-11) + +基于报告: + +- `perf_reports/20260311/reranker_1000docs/report.md` + +结论: + +- 对在线重排更重要的单请求延迟(`c=1`)指标,`infer_batch_size=64` 最优 +- `infer_batch_size=96` 在更高并发下吞吐略高,但会牺牲单请求延迟稳定性 +- 当前默认选择 `infer_batch_size=64` 作为平衡点 + +## 7. 生产建议 + +- 默认保持:`infer_batch_size: 64`、`sort_by_doc_length: true`、`length_sort_mode: "char"` +- 满足以下条件时可考虑提高到 `96`:业务以吞吐优先、可接受更高单请求延迟、已通过同机同数据压测验证收益 +- 每次改动后都必须复跑 `benchmark_reranker_1000docs.sh` 并归档结果 + +## 8. 故障排查 + +### 8.1 `start reranker` 显示 healthy 后很快退出 + +排查顺序: + +```bash +./scripts/service_ctl.sh status reranker +tail -n 200 logs/reranker.log +lsof -i :6007 -P -n +``` + +说明: + +- 该类问题已在 `scripts/service_ctl.sh` 中通过 reranker 独立启动分支与稳定健康检查进行修复 +- 若仍出现,优先检查 GPU 资源争用、vLLM 初始化失败或端口冲突 + +### 8.2 启动慢 + +现象: + +- 首次启动会加载模型、torch.compile、CUDA graph capture,耗时较长 + +建议: + +- 启动时预留足够健康检查窗口 +- 避免频繁重启(编译缓存在 `.runtime/reranker` 下可复用) + +### 8.3 OOM 或显存不足 + +优先调整: + +- 降低 `gpu_memory_utilization` +- 降低 `infer_batch_size` +- 检查是否有其他进程占用同卡 + +## 9. 变更与验证清单 + +每次 reranker 调优改动后,至少完成: + +- 配置变更已同步 `config/config.yaml` +- 文档变更已同步 `reranker/README.md` 与本手册 +- 压测报告已归档到 `perf_reports//reranker_1000docs/` +- 启停验证通过:`./scripts/service_ctl.sh start reranker`、`./scripts/service_ctl.sh status reranker`、`curl http://127.0.0.1:6007/health` diff --git a/reranker/README.md b/reranker/README.md index c217140..f8534d6 100644 --- a/reranker/README.md +++ b/reranker/README.md @@ -1,6 +1,6 @@ # Reranker 模块 -**请求示例**见 `docs/QUICKSTART.md` §3.5。扩展规范见 `docs/DEVELOPER_GUIDE.md` §7。 +**请求示例**见 `docs/QUICKSTART.md` §3.5。扩展规范见 `docs/DEVELOPER_GUIDE.md` §7。部署与调优实战见 `reranker/DEPLOYMENT_AND_TUNING.md`。 --- @@ -48,7 +48,13 @@ services: enable_warmup: true qwen3_vllm: model_name: "Qwen/Qwen3-Reranker-0.6B" - max_model_len: 8192 + max_model_len: 256 + infer_batch_size: 64 + sort_by_doc_length: true + length_sort_mode: "char" # char | token + enable_prefix_caching: true + enforce_eager: false + instruction: "Given a web search query, retrieve relevant passages that answer the query" qwen3_transformers: model_name: "Qwen/Qwen3-Reranker-0.6B" instruction: "Given a web search query, retrieve relevant passages that answer the query" @@ -57,7 +63,6 @@ services: use_fp16: true tensor_parallel_size: 1 gpu_memory_utilization: 0.8 - enable_prefix_caching: true instruction: "Given a web search query, retrieve relevant passages that answer the query" ``` @@ -69,6 +74,12 @@ services: ``` 该脚本会使用隔离环境 `.venv-reranker`;首次请先执行 `./scripts/setup_reranker_venv.sh`。 +## 性能压测(1000 docs) +```bash +./scripts/benchmark_reranker_1000docs.sh +``` +输出目录:`perf_reports//reranker_1000docs/`。 + ## API ### Health ``` @@ -118,5 +129,7 @@ uvicorn reranker.server:app --host 0.0.0.0 --port 6007 --log-level info ## Notes - 无请求级缓存;输入按字符串去重后推理,再按原始顺序回填分数。 - 空或 null 的 doc 跳过并计为 0。 +- **Qwen3-vLLM 分批策略**:`docs` 请求体可为 1000+,服务端会按 `infer_batch_size` 拆分;当 `sort_by_doc_length=true` 时,会先按文档长度排序后分批,减少 padding 开销,最终再按输入顺序回填分数。`length_sort_mode` 支持 `char`(默认,更快)与 `token`(更精确)。 +- 运行时可用环境变量临时覆盖批量参数:`RERANK_VLLM_INFER_BATCH_SIZE`、`RERANK_VLLM_SORT_BY_DOC_LENGTH`、`RERANK_VLLM_LENGTH_SORT_MODE`。 - **Qwen3-vLLM**:参考 [Qwen3-Reranker-0.6B](https://huggingface.co/Qwen/Qwen3-Reranker-0.6B),需 GPU 与较多显存;与 BGE 相比适合长文本、高吞吐场景(vLLM 前缀缓存)。 - **Qwen3-Transformers**:官方 Transformers Usage 方式,无需 vLLM;适合 CPU 或小显存,可选 `attn_implementation: "flash_attention_2"` 加速。 diff --git a/reranker/backends/batching_utils.py b/reranker/backends/batching_utils.py new file mode 100644 index 0000000..ed48f96 --- /dev/null +++ b/reranker/backends/batching_utils.py @@ -0,0 +1,41 @@ +"""Utilities for reranker batching and deduplication.""" + +from __future__ import annotations + +from typing import Iterable, List, Sequence, Tuple + + +def deduplicate_with_positions(texts: Sequence[str]) -> Tuple[List[str], List[int]]: + """ + Deduplicate texts globally while preserving first-seen order. + + Returns: + unique_texts: deduplicated texts in first-seen order + position_to_unique: mapping from each original position to unique index + """ + unique_texts: List[str] = [] + position_to_unique: List[int] = [] + seen: dict[str, int] = {} + + for text in texts: + idx = seen.get(text) + if idx is None: + idx = len(unique_texts) + seen[text] = idx + unique_texts.append(text) + position_to_unique.append(idx) + + return unique_texts, position_to_unique + + +def sort_indices_by_length(lengths: Sequence[int]) -> List[int]: + """Return stable ascending indices by lengths.""" + return sorted(range(len(lengths)), key=lambda i: lengths[i]) + + +def iter_batches(indices: Sequence[int], batch_size: int) -> Iterable[List[int]]: + """Yield consecutive batches from indices.""" + if batch_size <= 0: + raise ValueError(f"batch_size must be > 0, got {batch_size}") + for i in range(0, len(indices), batch_size): + yield list(indices[i : i + batch_size]) diff --git a/reranker/backends/qwen3_vllm.py b/reranker/backends/qwen3_vllm.py index f6ebce4..7a08450 100644 --- a/reranker/backends/qwen3_vllm.py +++ b/reranker/backends/qwen3_vllm.py @@ -9,9 +9,16 @@ from __future__ import annotations import logging import math +import os import threading import time -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Tuple + +from reranker.backends.batching_utils import ( + deduplicate_with_positions, + iter_batches, + sort_indices_by_length, +) logger = logging.getLogger("reranker.backends.qwen3_vllm") @@ -60,10 +67,23 @@ class Qwen3VLLMRerankerBackend: self._config.get("instruction") or "Given a web search query, retrieve relevant passages that answer the query" ) + infer_batch_size = os.getenv("RERANK_VLLM_INFER_BATCH_SIZE") or self._config.get("infer_batch_size", 64) + sort_by_doc_length = os.getenv("RERANK_VLLM_SORT_BY_DOC_LENGTH") + if sort_by_doc_length is None: + sort_by_doc_length = self._config.get("sort_by_doc_length", True) + length_sort_mode = os.getenv("RERANK_VLLM_LENGTH_SORT_MODE") or self._config.get("length_sort_mode", "char") + + self._infer_batch_size = int(infer_batch_size) + self._sort_by_doc_length = str(sort_by_doc_length).strip().lower() in {"1", "true", "yes", "y", "on"} + self._length_sort_mode = str(length_sort_mode).strip().lower() if not torch.cuda.is_available(): raise RuntimeError("qwen3_vllm backend requires CUDA GPU, but torch.cuda.is_available() is False") if dtype not in {"float16", "half", "auto"}: raise ValueError(f"Unsupported dtype for qwen3_vllm: {dtype!r}. Use float16/half/auto.") + if self._infer_batch_size <= 0: + raise ValueError(f"infer_batch_size must be > 0, got {self._infer_batch_size}") + if self._length_sort_mode not in {"char", "token"}: + raise ValueError(f"length_sort_mode must be 'char' or 'token', got {self._length_sort_mode!r}") logger.info( "[Qwen3_VLLM] Loading model %s (max_model_len=%s, tp=%s, gpu_mem=%.2f, dtype=%s, prefix_caching=%s)", @@ -169,6 +189,30 @@ class Qwen3VLLMRerankerBackend: scores.append(float(score)) return scores + def _estimate_doc_lengths(self, docs: List[str]) -> List[int]: + """ + Estimate token lengths for sorting documents into similar-length batches. + Falls back to character length when tokenizer length output is unavailable. + """ + if not docs: + return [] + if self._length_sort_mode == "char": + return [len(text) for text in docs] + try: + enc = self._tokenizer( + docs, + add_special_tokens=False, + truncation=True, + max_length=self._max_prompt_len, + return_length=True, + ) + lengths = enc.get("length") + if isinstance(lengths, list) and len(lengths) == len(docs): + return [int(x) for x in lengths] + except Exception as exc: + logger.debug("Length estimation fallback to char length: %s", exc) + return [len(text) for text in docs] + def score_with_meta( self, query: str, @@ -200,22 +244,35 @@ class Qwen3VLLMRerankerBackend: "model": self._model_name, "backend": "qwen3_vllm", "normalize": normalize, + "infer_batch_size": self._infer_batch_size, + "inference_batches": 0, + "sort_by_doc_length": self._sort_by_doc_length, + "length_sort_mode": self._length_sort_mode, } - # Deduplicate by text, keep mapping to original indices - unique_texts: List[str] = [] - position_to_unique: List[int] = [] - prev: Optional[str] = None - for _idx, text in indexed: - if text != prev: - unique_texts.append(text) - prev = text - position_to_unique.append(len(unique_texts) - 1) - - pairs = [(query, t) for t in unique_texts] - with self._infer_lock: + # Deduplicate globally by text, keep mapping to original indices. + indexed_texts = [text for _, text in indexed] + unique_texts, position_to_unique = deduplicate_with_positions(indexed_texts) + + lengths = self._estimate_doc_lengths(unique_texts) + order = list(range(len(unique_texts))) + if self._sort_by_doc_length and len(unique_texts) > 1: + order = sort_indices_by_length(lengths) + + unique_scores: List[float] = [0.0] * len(unique_texts) + inference_batches = 0 + for batch_indices in iter_batches(order, self._infer_batch_size): + inference_batches += 1 + pairs = [(query, unique_texts[i]) for i in batch_indices] prompts = self._process_inputs(pairs) - unique_scores = self._compute_scores(prompts) + with self._infer_lock: + batch_scores = self._compute_scores(prompts) + if len(batch_scores) != len(batch_indices): + raise RuntimeError( + f"Reranker score size mismatch: expected {len(batch_indices)}, got {len(batch_scores)}" + ) + for idx, score in zip(batch_indices, batch_scores): + unique_scores[idx] = float(score) for (orig_idx, _), unique_idx in zip(indexed, position_to_unique): # Score is already P(yes) in [0,1] from yes/(yes+no) @@ -235,5 +292,9 @@ class Qwen3VLLMRerankerBackend: "model": self._model_name, "backend": "qwen3_vllm", "normalize": normalize, + "infer_batch_size": self._infer_batch_size, + "inference_batches": inference_batches, + "sort_by_doc_length": self._sort_by_doc_length, + "length_sort_mode": self._length_sort_mode, } return output_scores, meta diff --git a/scripts/benchmark_reranker_1000docs.sh b/scripts/benchmark_reranker_1000docs.sh new file mode 100755 index 0000000..8f06e85 --- /dev/null +++ b/scripts/benchmark_reranker_1000docs.sh @@ -0,0 +1,131 @@ +#!/bin/bash +# +# Benchmark reranker for e-commerce short-text workload: +# - query <= ~100 tokens +# - docs are short title / title+brief +# - one request contains ~1000 docs +# +# Outputs JSON reports under perf_reports//reranker_1000docs/ +# +# Usage: +# ./scripts/benchmark_reranker_1000docs.sh +# Optional env: +# BATCH_SIZES="24 32 48 64" +# C1_REQUESTS=4 +# C4_REQUESTS=8 +# TENANT_ID=162 +# +set -euo pipefail + +PROJECT_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "${PROJECT_ROOT}" + +TENANT_ID="${TENANT_ID:-162}" +BATCH_SIZES="${BATCH_SIZES:-24 32 48 64}" +C1_REQUESTS="${C1_REQUESTS:-4}" +C4_REQUESTS="${C4_REQUESTS:-8}" +TIMEOUT_SEC="${TIMEOUT_SEC:-240}" +RERANK_BASE="${RERANK_BASE:-http://127.0.0.1:6007}" + +DATE_TAG="$(date +%Y%m%d)" +OUT_DIR="perf_reports/${DATE_TAG}/reranker_1000docs" +TMP_CASES="/tmp/rerank_1000_shortdocs_cases.json" +mkdir -p "${OUT_DIR}" + +cleanup() { + ./scripts/stop_reranker.sh >/dev/null 2>&1 || true +} +trap cleanup EXIT + +cat > "${TMP_CASES}" <<'JSON' +{ + "scenarios": { + "rerank": [ + { + "method": "POST", + "path": "/rerank", + "json": { + "query": "wireless ergonomic gaming mouse for office use with rechargeable battery and bluetooth", + "docs": [], + "normalize": true + } + } + ] + } +} +JSON + +python3 - <<'PY' +import json +from pathlib import Path + +p = Path("/tmp/rerank_1000_shortdocs_cases.json") +d = json.loads(p.read_text(encoding="utf-8")) +docs = [] +for i in range(1000): + if i % 3 == 0: + doc = f"wireless mouse model {i} ergonomic grip 2.4g bluetooth" + elif i % 3 == 1: + doc = f"gaming mouse {i} rgb lightweight high precision sensor" + else: + doc = f"office mouse {i} rechargeable silent click compact" + if i % 5 == 0: + doc += " with usb receiver" + if i % 7 == 0: + doc += " long battery life" + docs.append(doc) + +d["scenarios"]["rerank"][0]["json"]["docs"] = docs +p.write_text(json.dumps(d, ensure_ascii=False), encoding="utf-8") +print(f"[info] generated docs={len(docs)} at {p}") +PY + +run_bench() { + local bs="$1" + local c="$2" + local req="$3" + local out="${OUT_DIR}/rerank_bs${bs}_c${c}_r${req}.json" + .venv/bin/python scripts/perf_api_benchmark.py \ + --scenario rerank \ + --tenant-id "${TENANT_ID}" \ + --reranker-base "${RERANK_BASE}" \ + --cases-file "${TMP_CASES}" \ + --concurrency "${c}" \ + --max-requests "${req}" \ + --timeout "${TIMEOUT_SEC}" \ + --output "${out}" >/dev/null + python3 - <"${OUT_DIR}/start_bs${bs}.log" 2>&1 & + + for i in $(seq 1 180); do + if curl -sf "${RERANK_BASE}/health" >/dev/null 2>&1; then + break + fi + sleep 1 + if [ "${i}" -eq 180 ]; then + echo "[error] reranker startup timeout for bs=${bs}" >&2 + tail -n 80 "${OUT_DIR}/start_bs${bs}.log" >&2 || true + exit 1 + fi + done + + run_bench "${bs}" 1 "${C1_REQUESTS}" + run_bench "${bs}" 4 "${C4_REQUESTS}" +done + +echo "[info] benchmark done: ${OUT_DIR}" diff --git a/scripts/service_ctl.sh b/scripts/service_ctl.sh index f2734e4..6593349 100755 --- a/scripts/service_ctl.sh +++ b/scripts/service_ctl.sh @@ -98,6 +98,28 @@ wait_for_health() { return 1 } +wait_for_stable_health() { + local service="$1" + local checks="${2:-3}" + local interval_sec="${3:-1}" + local port + port="$(get_port "${service}")" + local path="/health" + + local i=0 + while [ "${i}" -lt "${checks}" ]; do + if ! is_running_by_port "${service}"; then + return 1 + fi + if ! curl -sf "http://127.0.0.1:${port}${path}" >/dev/null 2>&1; then + return 1 + fi + i=$((i + 1)) + sleep "${interval_sec}" + done + return 0 +} + is_running_by_pid() { local service="$1" local pf @@ -185,18 +207,12 @@ start_one() { return 1 fi ;; - backend|indexer|frontend|embedding|translator|reranker) + backend|indexer|frontend|embedding|translator) echo "[start] ${service}" nohup bash -lc "${cmd}" > "${lf}" 2>&1 & local pid=$! echo "${pid}" > "${pf}" - # Some services (notably reranker with vLLM backend) can take longer - # to load models / compile graphs on first start. Give them a longer - # health check window to avoid false negatives. local retries=30 - if [ "${service}" = "reranker" ]; then - retries=90 - fi if wait_for_health "${service}" "${retries}"; then echo "[ok] ${service} healthy (pid=${pid}, log=${lf})" else @@ -204,6 +220,25 @@ start_one() { return 1 fi ;; + reranker) + echo "[start] ${service}" + # Start reranker directly so pid file points to the script process that + # will exec uvicorn, avoiding extra shell wrapper lifecycle edge-cases. + nohup "${cmd}" > "${lf}" 2>&1 & + local pid=$! + echo "${pid}" > "${pf}" + if wait_for_health "${service}" 90; then + if wait_for_stable_health "${service}" 5 1; then + echo "[ok] ${service} healthy (pid=${pid}, log=${lf})" + else + echo "[error] ${service} became unavailable right after startup, inspect ${lf}" >&2 + return 1 + fi + else + echo "[error] ${service} health check timeout, inspect ${lf}" >&2 + return 1 + fi + ;; *) echo "[warn] ${service} unsupported start path" ;; diff --git a/tests/test_reranker_batching_utils.py b/tests/test_reranker_batching_utils.py new file mode 100644 index 0000000..709d327 --- /dev/null +++ b/tests/test_reranker_batching_utils.py @@ -0,0 +1,32 @@ +import pytest + +from reranker.backends.batching_utils import ( + deduplicate_with_positions, + iter_batches, + sort_indices_by_length, +) + + +def test_deduplicate_with_positions_global_not_adjacent(): + texts = ["a", "b", "a", "c", "b", "a"] + unique, mapping = deduplicate_with_positions(texts) + assert unique == ["a", "b", "c"] + assert mapping == [0, 1, 0, 2, 1, 0] + + +def test_sort_indices_by_length_stable(): + lengths = [5, 2, 2, 9, 4] + order = sort_indices_by_length(lengths) + # Stable sort: index 1 remains ahead of index 2 when lengths are equal. + assert order == [1, 2, 4, 0, 3] + + +def test_iter_batches(): + indices = list(range(10)) + batches = list(iter_batches(indices, 4)) + assert batches == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]] + + +def test_iter_batches_invalid_batch_size(): + with pytest.raises(ValueError, match="batch_size must be > 0"): + list(iter_batches([0, 1], 0)) -- libgit2 0.21.2