From 9de5ef4988cef11a85d55e9eed44075d74a71a49 Mon Sep 17 00:00:00 2001
From: tangwang
Date: Wed, 25 Mar 2026 16:11:12 +0800
Subject: [PATCH] qwen3_vllm_score: task="score" backend using either the
 original weights + hf_overrides or a pre-converted seq-cls model from
 HuggingFace; generate() backend prompt adjusted

---
 config/config.yaml                    | 24 ++++++++++++++++++++++--
 reranker/backends/__init__.py         |  5 ++++-
 reranker/backends/qwen3_vllm.py       |  4 ++--
 reranker/backends/qwen3_vllm_score.py | 260 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 reranker/server.py                    |  2 +-
 scripts/lib/reranker_backend_env.sh   |  4 ++--
 scripts/start_reranker.sh             |  6 +++---
 7 files changed, 294 insertions(+), 11 deletions(-)
 create mode 100644 reranker/backends/qwen3_vllm_score.py

diff --git a/config/config.yaml b/config/config.yaml
index 79f7a79..e5dbf4c 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -231,7 +231,7 @@ rerank:
   text_bias: 0.1
   text_exponent: 0.35
   knn_bias: 0.6
-  knn_exponent: 0.2
+  knn_exponent: 0.0
 
 # Extensible service/provider registry (single source of configuration)
 services:
@@ -381,7 +381,7 @@ services:
     max_docs: 1000
     normalize: true
     # In-service backend (read when the reranker process starts)
-    backend: "qwen3_vllm" # bge | qwen3_vllm | qwen3_transformers | qwen3_gguf | qwen3_gguf_06b | dashscope_rerank
+    backend: "qwen3_vllm_score" # bge | qwen3_vllm | qwen3_vllm_score | qwen3_transformers | qwen3_gguf | qwen3_gguf_06b | dashscope_rerank
     backends:
       bge:
        model_name: "BAAI/bge-reranker-v2-m3"
@@ -411,6 +411,26 @@ services:
        # instruction: "Relevance ranking: category & style match first"
        # instruction: "Score product relevance by query with category & style match prioritized"
        instruction: "Rank products by query with category & style match prioritized"
+      # vLLM LLM.score() (cross-encoder scoring); shares .venv-reranker and the same model weights as qwen3_vllm (vLLM 0.17+ uses runner/convert=auto; older versions used task=score)
+      qwen3_vllm_score:
+        model_name: "Qwen/Qwen3-Reranker-0.6B"
+        # The original Hub weights need true; set to false when using pre-converted seq-cls weights (e.g. tomaarsen/...-seq-cls)
+        use_original_qwen3_hf_overrides: true
+        # Optional: keep aligned with vLLM; usually leave as auto
+        # vllm_runner: "auto"
+        # vllm_convert: "auto"
+        # Optional: merged with the built-in overrides when use_original_qwen3_hf_overrides is true
+        # hf_overrides: {}
+        engine: "vllm"
+        max_model_len: 160
+        tensor_parallel_size: 1
+        gpu_memory_utilization: 0.20
+        dtype: "float16"
+        enable_prefix_caching: true
+        enforce_eager: false
+        infer_batch_size: 100
+        sort_by_doc_length: true
+        instruction: "Rank products by query with category & style match prioritized"
       qwen3_transformers:
        model_name: "Qwen/Qwen3-Reranker-0.6B"
        instruction: "rank products by given query"
diff --git a/reranker/backends/__init__.py b/reranker/backends/__init__.py
index 2daf354..f0d4499 100644
--- a/reranker/backends/__init__.py
+++ b/reranker/backends/__init__.py
@@ -43,6 +43,9 @@ def get_rerank_backend(name: str, config: Dict[str, Any]) -> RerankBackendProtoc
     if name == "qwen3_vllm":
         from reranker.backends.qwen3_vllm import Qwen3VLLMRerankerBackend
         return Qwen3VLLMRerankerBackend(config)
+    if name == "qwen3_vllm_score":
+        from reranker.backends.qwen3_vllm_score import Qwen3VLLMScoreRerankerBackend
+        return Qwen3VLLMScoreRerankerBackend(config)
     if name == "qwen3_transformers":
         from reranker.backends.qwen3_transformers import Qwen3TransformersRerankerBackend
         return Qwen3TransformersRerankerBackend(config)
@@ -60,7 +63,7 @@ def get_rerank_backend(name: str, config: Dict[str, Any]) -> RerankBackendProtoc
         from reranker.backends.dashscope_rerank import DashScopeRerankBackend
         return DashScopeRerankBackend(config)
     raise ValueError(
-        f"Unknown rerank backend: {name!r}. Supported: bge, qwen3_vllm, qwen3_transformers, qwen3_gguf, qwen3_gguf_06b, dashscope_rerank"
+        f"Unknown rerank backend: {name!r}. Supported: bge, qwen3_vllm, qwen3_vllm_score, qwen3_transformers, qwen3_gguf, qwen3_gguf_06b, dashscope_rerank"
     )

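Usage sketch (illustrative, not lines from the patch): how the registry entry above is exercised. The backend name and config keys mirror services.rerank.backends.qwen3_vllm_score in config.yaml; the query and document strings are made-up examples.

    from reranker.backends import get_rerank_backend

    cfg = {
        "model_name": "Qwen/Qwen3-Reranker-0.6B",
        "use_original_qwen3_hf_overrides": True,
        "max_model_len": 160,
        "gpu_memory_utilization": 0.20,
        "infer_batch_size": 100,
        "instruction": "Rank products by query with category & style match prioritized",
    }
    backend = get_rerank_backend("qwen3_vllm_score", cfg)  # loads the vLLM engine; requires a CUDA GPU
    scores, meta = backend.score_with_meta("red running shoes", ["Nike Pegasus 40", "cast iron skillet"])
    print(scores, meta["backend"])  # one score per input doc; meta["backend"] == "qwen3_vllm_score"
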
diff --git a/reranker/backends/qwen3_vllm.py b/reranker/backends/qwen3_vllm.py
index 5dbe952..4903700 100644
--- a/reranker/backends/qwen3_vllm.py
+++ b/reranker/backends/qwen3_vllm.py
@@ -50,11 +50,11 @@ def _format_instruction(instruction: str, query: str, doc: str) -> List[Dict[str
     return [
         {
             "role": "system",
-            "content": "Judge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be \"yes\" or \"no\".",
+            "content": instruction,
         },
         {
             "role": "user",
-            "content": f"<Instruct>: {instruction}\n\n<Query>: {query}\n\n<Document>: {doc}",
+            "content": f"<Query>: {query}\n\n<Document>: {doc}",
         },
     ]

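Before the new module, an illustrative sketch (not part of the patch) of the (query_text, document_text) pair that the score path builds. It uses the default prefix/suffix and templates defined in qwen3_vllm_score.py below; the instruction, query, and product strings are examples only. Each such pair is what llm.score() receives.

    prefix = (
        "<|im_start|>system\n"
        "Judge whether the Document meets the requirements based on the Query and the Instruct "
        'provided. Note that the answer can only be "yes" or "no".'
        "<|im_end|>\n<|im_start|>user\n"
    )
    suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
    query_text = "{prefix}<Instruct>: {instruction}\n<Query>: {query}\n".format(
        prefix=prefix,
        instruction="Rank products by query with category & style match prioritized",
        query="red running shoes",
    )
    document_text = "<Document>: {doc}{suffix}".format(doc="Nike Pegasus 40", suffix=suffix)
    # llm.score([query_text], [document_text]) then returns one relevance score for the pair.
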
+ """ + + def __init__(self, config: Dict[str, Any]) -> None: + self._config = config or {} + model_name = str(self._config.get("model_name") or "Qwen/Qwen3-Reranker-0.6B") + max_model_len = int(self._config.get("max_model_len", 2048)) + tensor_parallel_size = int(self._config.get("tensor_parallel_size", 1)) + gpu_memory_utilization = float(self._config.get("gpu_memory_utilization", 0.4)) + enable_prefix_caching = bool(self._config.get("enable_prefix_caching", False)) + enforce_eager = bool(self._config.get("enforce_eager", True)) + dtype = str(self._config.get("dtype", "float16")).strip().lower() + use_hf_overrides = self._config.get("use_original_qwen3_hf_overrides") + if use_hf_overrides is None: + use_hf_overrides = True + use_hf_overrides = bool(use_hf_overrides) + + self._instruction = str( + self._config.get("instruction") + or "Given a query, score the product for relevance" + ) + self._prefix = str(self._config.get("prompt_prefix") or _DEFAULT_PREFIX) + self._suffix = str(self._config.get("prompt_suffix") or _DEFAULT_SUFFIX) + self._query_template = str(self._config.get("query_template") or _DEFAULT_QUERY_TEMPLATE) + self._document_template = str( + self._config.get("document_template") or _DEFAULT_DOCUMENT_TEMPLATE + ) + + infer_batch_size = os.getenv("RERANK_VLLM_INFER_BATCH_SIZE") or self._config.get( + "infer_batch_size", 64 + ) + sort_by_doc_length = os.getenv("RERANK_VLLM_SORT_BY_DOC_LENGTH") + if sort_by_doc_length is None: + sort_by_doc_length = self._config.get("sort_by_doc_length", True) + + self._infer_batch_size = int(infer_batch_size) + self._sort_by_doc_length = str(sort_by_doc_length).strip().lower() in { + "1", + "true", + "yes", + "y", + "on", + } + + if not torch.cuda.is_available(): + raise RuntimeError( + "qwen3_vllm_score backend requires CUDA GPU, but torch.cuda.is_available() is False" + ) + if dtype not in {"float16", "half", "auto"}: + raise ValueError( + f"Unsupported dtype for qwen3_vllm_score: {dtype!r}. Use float16/half/auto." + ) + if self._infer_batch_size <= 0: + raise ValueError(f"infer_batch_size must be > 0, got {self._infer_batch_size}") + + runner = str(self._config.get("vllm_runner") or "auto").strip().lower() + convert = str(self._config.get("vllm_convert") or "auto").strip().lower() + if runner not in {"auto", "generate", "pooling", "draft"}: + raise ValueError(f"Invalid vllm_runner: {runner!r}") + if convert not in {"auto", "none", "embed", "classify"}: + raise ValueError(f"Invalid vllm_convert: {convert!r}") + + logger.info( + "[Qwen3_VLLM_SCORE] Loading model %s (LLM.score API, runner=%s, convert=%s, " + "hf_overrides=%s, max_model_len=%s, tp=%s, gpu_mem=%.2f, dtype=%s, prefix_caching=%s)", + model_name, + runner, + convert, + use_hf_overrides, + max_model_len, + tensor_parallel_size, + gpu_memory_utilization, + dtype, + enable_prefix_caching, + ) + + # vLLM 0.17+ uses runner/convert instead of LLM(..., task="score"). With the official + # Qwen3 reranker hf_overrides, architecture becomes *ForSequenceClassification -> pooling+classify. 
+        llm_kwargs: Dict[str, Any] = {
+            "model": model_name,
+            "runner": runner,
+            "convert": convert,
+            "tensor_parallel_size": tensor_parallel_size,
+            "max_model_len": max_model_len,
+            "gpu_memory_utilization": gpu_memory_utilization,
+            "enable_prefix_caching": enable_prefix_caching,
+            "enforce_eager": enforce_eager,
+            "dtype": dtype,
+        }
+        hf_overrides: Dict[str, Any] = dict(self._config.get("hf_overrides") or {})
+        if use_hf_overrides:
+            hf_overrides = {
+                **hf_overrides,
+                "architectures": ["Qwen3ForSequenceClassification"],
+                "classifier_from_token": ["no", "yes"],
+                "is_original_qwen3_reranker": True,
+            }
+        if hf_overrides:
+            llm_kwargs["hf_overrides"] = hf_overrides
+
+        self._llm = LLM(**llm_kwargs)
+        # vLLM score path: single-process safety (mirrors generate backend until verified).
+        self._infer_lock = threading.Lock()
+
+        self._model_name = model_name
+        logger.info("[Qwen3_VLLM_SCORE] Model ready | model=%s", model_name)
+
+    def _format_pair(self, query: str, doc: str) -> Tuple[str, str]:
+        q_text = self._query_template.format(
+            prefix=self._prefix,
+            instruction=self._instruction,
+            query=query,
+        )
+        d_text = self._document_template.format(doc=doc, suffix=self._suffix)
+        return q_text, d_text
+
+    def _score_batch(self, pairs: List[Tuple[str, str]]) -> List[float]:
+        if not pairs:
+            return []
+        queries: List[str] = []
+        documents: List[str] = []
+        for q, d in pairs:
+            qt, dt = self._format_pair(q, d)
+            queries.append(qt)
+            documents.append(dt)
+        with self._infer_lock:
+            outputs = self._llm.score(queries, documents, use_tqdm=False)
+        scores: List[float] = []
+        for out in outputs:
+            so = out.outputs
+            scores.append(float(so.score))
+        return scores
+
+    @staticmethod
+    def _estimate_doc_lengths(docs: List[str]) -> List[int]:
+        if not docs:
+            return []
+        return [len(text) for text in docs]
+
+    def score_with_meta(
+        self,
+        query: str,
+        docs: List[str],
+        normalize: bool = True,
+    ) -> Tuple[List[float], Dict[str, Any]]:
+        start_ts = time.time()
+        total_docs = len(docs) if docs else 0
+        output_scores: List[float] = [0.0] * total_docs
+
+        query = "" if query is None else str(query).strip()
+        indexed: List[Tuple[int, str]] = []
+        for i, doc in enumerate(docs or []):
+            if doc is None:
+                continue
+            text = str(doc).strip()
+            if not text:
+                continue
+            indexed.append((i, text))
+
+        if not query or not indexed:
+            elapsed_ms = (time.time() - start_ts) * 1000.0
+            return output_scores, {
+                "input_docs": total_docs,
+                "usable_docs": len(indexed),
+                "unique_docs": 0,
+                "dedup_ratio": 0.0,
+                "elapsed_ms": round(elapsed_ms, 3),
+                "model": self._model_name,
+                "backend": "qwen3_vllm_score",
+                "normalize": normalize,
+                "infer_batch_size": self._infer_batch_size,
+                "inference_batches": 0,
+                "sort_by_doc_length": self._sort_by_doc_length,
+            }
+
+        indexed_texts = [text for _, text in indexed]
+        unique_texts, position_to_unique = deduplicate_with_positions(indexed_texts)
+
+        lengths = self._estimate_doc_lengths(unique_texts)
+        order = list(range(len(unique_texts)))
+        if self._sort_by_doc_length and len(unique_texts) > 1:
+            order = sorted(order, key=lambda i: lengths[i])
+
+        unique_scores: List[float] = [0.0] * len(unique_texts)
+        inference_batches = 0
+        for start in range(0, len(order), self._infer_batch_size):
+            batch_indices = order[start : start + self._infer_batch_size]
+            inference_batches += 1
+            pairs = [(query, unique_texts[i]) for i in batch_indices]
+            batch_scores = self._score_batch(pairs)
+            if len(batch_scores) != len(batch_indices):
+                raise RuntimeError(
+                    f"Reranker score size mismatch: expected {len(batch_indices)}, got {len(batch_scores)}"
+                )
+            for idx, score in zip(batch_indices, batch_scores):
+                unique_scores[idx] = float(score)
+
+        for (orig_idx, _), unique_idx in zip(indexed, position_to_unique):
+            output_scores[orig_idx] = float(unique_scores[unique_idx])
+
+        elapsed_ms = (time.time() - start_ts) * 1000.0
+        dedup_ratio = 0.0
+        if indexed:
+            dedup_ratio = 1.0 - (len(unique_texts) / float(len(indexed)))
+
+        meta = {
+            "input_docs": total_docs,
+            "usable_docs": len(indexed),
+            "unique_docs": len(unique_texts),
+            "dedup_ratio": round(dedup_ratio, 4),
+            "elapsed_ms": round(elapsed_ms, 3),
+            "model": self._model_name,
+            "backend": "qwen3_vllm_score",
+            "normalize": normalize,
+            "infer_batch_size": self._infer_batch_size,
+            "inference_batches": inference_batches,
+            "sort_by_doc_length": self._sort_by_doc_length,
+        }
+        return output_scores, meta
diff --git a/reranker/server.py b/reranker/server.py
index 5faccfc..4b60af3 100644
--- a/reranker/server.py
+++ b/reranker/server.py
@@ -7,7 +7,7 @@ Request: { "query": "...", "docs": ["doc1", "doc2", ...], "normalize": optional
 Response: { "scores": [float], "meta": {...} }
 
 Backend selected via config: services.rerank.backend
-(bge | qwen3_vllm | qwen3_transformers | qwen3_gguf | qwen3_gguf_06b | dashscope_rerank), env RERANK_BACKEND.
+(bge | qwen3_vllm | qwen3_vllm_score | qwen3_transformers | qwen3_gguf | qwen3_gguf_06b | dashscope_rerank), env RERANK_BACKEND.
 """
 
 import logging
diff --git a/scripts/lib/reranker_backend_env.sh b/scripts/lib/reranker_backend_env.sh
index 4aec319..f5812b3 100644
--- a/scripts/lib/reranker_backend_env.sh
+++ b/scripts/lib/reranker_backend_env.sh
@@ -38,7 +38,7 @@ reranker_backend_venv_dir() {
   local backend="$2"
 
   case "${backend}" in
-    qwen3_vllm) printf '%s/.venv-reranker\n' "${project_root}" ;;
+    qwen3_vllm|qwen3_vllm_score) printf '%s/.venv-reranker\n' "${project_root}" ;;
     qwen3_gguf) printf '%s/.venv-reranker-gguf\n' "${project_root}" ;;
     qwen3_gguf_06b) printf '%s/.venv-reranker-gguf-06b\n' "${project_root}" ;;
     qwen3_transformers) printf '%s/.venv-reranker-transformers\n' "${project_root}" ;;
@@ -53,7 +53,7 @@ reranker_backend_requirements_file() {
   local backend="$2"
 
   case "${backend}" in
-    qwen3_vllm) printf '%s/requirements_reranker_qwen3_vllm.txt\n' "${project_root}" ;;
+    qwen3_vllm|qwen3_vllm_score) printf '%s/requirements_reranker_qwen3_vllm.txt\n' "${project_root}" ;;
     qwen3_gguf) printf '%s/requirements_reranker_qwen3_gguf.txt\n' "${project_root}" ;;
     qwen3_gguf_06b) printf '%s/requirements_reranker_qwen3_gguf_06b.txt\n' "${project_root}" ;;
     qwen3_transformers) printf '%s/requirements_reranker_qwen3_transformers.txt\n' "${project_root}" ;;
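Client sketch (illustrative, not part of the patch) for the request/response contract documented in reranker/server.py above. The endpoint URL is hypothetical; host, port, and path are configured outside this patch.

    import json
    from urllib import request

    payload = {"query": "red running shoes", "docs": ["Nike Pegasus 40", "cast iron skillet"], "normalize": True}
    req = request.Request(
        "http://localhost:8000/rerank",  # hypothetical endpoint; adjust to the deployed service
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with request.urlopen(req) as resp:
        body = json.loads(resp.read())
    print(body["scores"], body["meta"]["backend"])  # scores aligned with docs; meta describes the backend
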
"${PYTHON_BIN}" - <<'PY' @@ -62,7 +62,7 @@ except Exception: raise SystemExit(1) PY then - echo "ERROR: qwen3_vllm backend requires vllm + CUDA runtime in ${RERANKER_VENV}." >&2 + echo "ERROR: ${RERANK_BACKEND} backend requires vllm + CUDA runtime in ${RERANKER_VENV}." >&2 echo "Please run: ./scripts/setup_reranker_venv.sh ${RERANK_BACKEND} and verify CUDA is available." >&2 exit 1 fi -- libgit2 0.21.2