"""
Qwen3-Reranker GGUF backend using llama-cpp-python.

Reference:
- https://huggingface.co/DevQuasar/Qwen.Qwen3-Reranker-4B-GGUF
- https://huggingface.co/Qwen/Qwen3-Reranker-4B
- https://huggingface.co/ggml-org/Qwen3-Reranker-0.6B-Q8_0-GGUF
- https://huggingface.co/Qwen/Qwen3-Reranker-0.6B
"""

from __future__ import annotations

import logging
import math
import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger("reranker.backends.qwen3_gguf")

# Per-backend fallbacks used when the config does not name a repo/file/dir.
_BACKEND_DEFAULTS: Dict[str, Dict[str, str]] = {
    "qwen3_gguf": {
        "repo_id": "DevQuasar/Qwen.Qwen3-Reranker-4B-GGUF",
        "filename": "*Q8_0.gguf",
        "local_dir": "./models/reranker/qwen3-reranker-4b-gguf",
    },
    "qwen3_gguf_06b": {
        "repo_id": "ggml-org/Qwen3-Reranker-0.6B-Q8_0-GGUF",
        "filename": "qwen3-reranker-0.6b-q8_0.gguf",
        "local_dir": "./models/reranker/qwen3-reranker-0.6b-q8_0-gguf",
    },
}

# String spellings accepted as "true" for boolean settings.
_TRUTHY_STRINGS = frozenset({"1", "true", "yes", "y", "on"})

# Prompt layout from the Qwen3-Reranker model card. The request prefix is the
# pair template up to (and excluding) the document text.
_PAIR_TEMPLATE = "<Instruct>: {instruction}\n<Query>: {query}\n<Document>: {doc}"
_REQUEST_PREFIX_TEMPLATE = "<Instruct>: {instruction}\n<Query>: {query}\n<Document>: "


def _as_bool(value: Any) -> bool:
    """Coerce a config value to bool, parsing strings such as ``"false"``.

    Plain ``bool("false")`` is ``True``; string values are therefore matched
    against ``_TRUTHY_STRINGS``. Non-string values use ordinary truthiness.
    """
    if isinstance(value, str):
        return value.strip().lower() in _TRUTHY_STRINGS
    return bool(value)


def deduplicate_with_positions(texts: List[str]) -> Tuple[List[str], List[int]]:
    """Deduplicate texts globally while preserving first-seen order.

    Returns:
        ``(unique_texts, position_to_unique)`` where ``position_to_unique[i]``
        is the index into ``unique_texts`` of ``texts[i]``.
    """
    unique_texts: List[str] = []
    position_to_unique: List[int] = []
    seen: Dict[str, int] = {}
    for text in texts:
        idx = seen.get(text)
        if idx is None:
            idx = len(unique_texts)
            seen[text] = idx
            unique_texts.append(text)
        position_to_unique.append(idx)
    return unique_texts, position_to_unique


def _format_instruction(instruction: str, query: str, doc: str) -> str:
    """Render one (instruction, query, document) triple in the reranker layout."""
    return _PAIR_TEMPLATE.format(
        instruction=instruction,
        query=query,
        doc=doc,
    )


class Qwen3GGUFRerankerBackend:
    """
    Qwen3-Reranker GGUF backend using llama.cpp through llama-cpp-python.

    Tuned for short-query / short-doc reranking on a single GPU.
    Config from services.rerank.backends.<name>.
    """

    def __init__(self, config: Dict[str, Any]) -> None:
        """Read settings, load the GGUF model and precompute prompt tokens.

        Raises:
            ValueError: if ``infer_batch_size``, ``n_ctx``, ``n_batch`` or
                ``n_ubatch`` is not positive.
            RuntimeError: if llama-cpp-python cannot be imported, if the
                context is too small for the fixed prompt, or if "yes"/"no"
                do not each tokenize to a single token.
        """
        self._config = config or {}
        cfg = self._config

        self._backend_name = str(cfg.get("_backend_name") or "qwen3_gguf").strip()
        defaults = _BACKEND_DEFAULTS.get(self._backend_name, _BACKEND_DEFAULTS["qwen3_gguf"])
        self._repo_id = str(cfg.get("repo_id") or defaults["repo_id"]).strip()
        self._filename = str(cfg.get("filename") or defaults["filename"]).strip()
        self._model_path = str(cfg.get("model_path") or "").strip()
        self._cache_dir = str(cfg.get("cache_dir") or "").strip() or None
        self._local_dir = str(cfg.get("local_dir") or defaults["local_dir"]).strip() or None
        self._instruction = str(
            cfg.get("instruction")
            or "Rank products by query with category & style match prioritized"
        )

        # Environment variables take precedence over the config file.
        self._infer_batch_size = int(
            os.getenv("RERANK_GGUF_INFER_BATCH_SIZE") or cfg.get("infer_batch_size", 8)
        )
        sort_by_doc_length = os.getenv("RERANK_GGUF_SORT_BY_DOC_LENGTH")
        if sort_by_doc_length is None:
            sort_by_doc_length = cfg.get("sort_by_doc_length", True)
        self._sort_by_doc_length = str(sort_by_doc_length).strip().lower() in _TRUTHY_STRINGS
        self._length_sort_mode = str(cfg.get("length_sort_mode") or "char").strip().lower()
        self._reuse_query_state = _as_bool(cfg.get("reuse_query_state", False))

        n_ctx = int(cfg.get("n_ctx", cfg.get("max_model_len", 384)))
        n_batch = int(cfg.get("n_batch", min(n_ctx, 384)))
        n_ubatch = int(cfg.get("n_ubatch", min(n_batch, 128)))
        n_gpu_layers = int(cfg.get("n_gpu_layers", 24))
        main_gpu = int(cfg.get("main_gpu", 0))
        n_threads = int(cfg.get("n_threads", 2))
        n_threads_batch = int(cfg.get("n_threads_batch", 4))
        flash_attn = _as_bool(cfg.get("flash_attn", True))
        offload_kqv = _as_bool(cfg.get("offload_kqv", True))
        use_mmap = _as_bool(cfg.get("use_mmap", True))
        use_mlock = _as_bool(cfg.get("use_mlock", False))
        verbose = _as_bool(cfg.get("verbose", False))
        enable_warmup = _as_bool(cfg.get("enable_warmup", True))

        if self._infer_batch_size <= 0:
            raise ValueError(f"infer_batch_size must be > 0, got {self._infer_batch_size}")
        if n_ctx <= 0:
            raise ValueError(f"n_ctx must be > 0, got {n_ctx}")
        if n_batch <= 0 or n_ubatch <= 0:
            raise ValueError(f"n_batch/n_ubatch must be > 0, got {n_batch}/{n_ubatch}")

        # Imported lazily so the module can be imported without the optional
        # dependency installed.
        try:
            from llama_cpp import Llama
        except Exception as exc:  # pragma: no cover - depends on optional dependency
            raise RuntimeError(
                f"{self._backend_name} backend requires llama-cpp-python. "
                f"Install the {self._backend_name} backend venv first via "
                f"scripts/setup_reranker_venv.sh {self._backend_name}."
            ) from exc

        self._llama_class = Llama
        self._n_ctx = n_ctx
        self._n_batch = n_batch
        self._n_ubatch = n_ubatch
        self._n_gpu_layers = n_gpu_layers
        self._enable_warmup = enable_warmup
        # Serializes every reset/eval/load_state sequence on the shared model.
        self._infer_lock = threading.Lock()

        logger.info(
            "[Qwen3_GGUF] Loading backend=%s repo=%s filename=%s model_path=%s "
            "n_ctx=%s n_batch=%s n_ubatch=%s n_gpu_layers=%s flash_attn=%s "
            "offload_kqv=%s reuse_query_state=%s",
            self._backend_name,
            self._repo_id,
            self._filename,
            self._model_path or None,
            n_ctx,
            n_batch,
            n_ubatch,
            n_gpu_layers,
            flash_attn,
            offload_kqv,
            self._reuse_query_state,
        )

        llm_kwargs = {
            "n_ctx": n_ctx,
            "n_batch": n_batch,
            "n_ubatch": n_ubatch,
            "n_gpu_layers": n_gpu_layers,
            "main_gpu": main_gpu,
            "n_threads": n_threads,
            "n_threads_batch": n_threads_batch,
            "logits_all": True,
            "offload_kqv": offload_kqv,
            "flash_attn": flash_attn,
            "use_mmap": use_mmap,
            "use_mlock": use_mlock,
            "verbose": verbose,
        }
        llm_kwargs = {key: value for key, value in llm_kwargs.items() if value is not None}

        self._llm = self._load_model(llm_kwargs)
        self._model_name = self._model_path or f"{self._repo_id}:{self._filename}"

        self._prefix = (
            "<|im_start|>system\n"
            "Judge whether the Document meets the requirements based on the Query and the Instruct provided. "
            'Note that the answer can only be "yes" or "no".'
            "<|im_end|>\n<|im_start|>user\n"
        )
        # The empty <think> block makes the next generated token the answer.
        self._suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
        self._prefix_tokens = self._tokenize(self._prefix, special=True)
        self._suffix_tokens = self._tokenize(self._suffix, special=True)
        self._request_prefix_template = _REQUEST_PREFIX_TEMPLATE

        # Token budget left for the instruction/query/document part.
        self._effective_max_len = (
            self._n_ctx - len(self._prefix_tokens) - len(self._suffix_tokens)
        )
        if self._effective_max_len <= 16:
            raise RuntimeError(
                f"n_ctx={self._n_ctx} is too small after prompt overhead; effective={self._effective_max_len}"
            )

        self._true_token = self._single_token_id("yes")
        self._false_token = self._single_token_id("no")

        if self._enable_warmup:
            self._warmup()

        logger.info(
            "[Qwen3_GGUF] Model ready | backend=%s model=%s effective_max_len=%s "
            "infer_batch_size=%s sort_by_doc_length=%s",
            self._backend_name,
            self._model_name,
            self._effective_max_len,
            self._infer_batch_size,
            self._sort_by_doc_length,
        )

    def _load_model(self, llm_kwargs: Dict[str, Any]) -> Any:
        """Load the model from, in order: explicit path, local dir glob, HF repo."""
        if self._model_path:
            return self._llama_class(model_path=self._model_path, **llm_kwargs)

        if self._local_dir:
            matches = sorted(
                path for path in Path(self._local_dir).glob(self._filename) if path.is_file()
            )
            if matches:
                local_model_path = str(matches[0].resolve())
                logger.info("[Qwen3_GGUF] Using local GGUF file: %s", local_model_path)
                return self._llama_class(model_path=local_model_path, **llm_kwargs)

        return self._llama_class.from_pretrained(
            repo_id=self._repo_id,
            filename=self._filename,
            local_dir=self._local_dir,
            cache_dir=self._cache_dir,
            **llm_kwargs,
        )

    def _tokenize(self, text: str, *, special: bool) -> List[int]:
        """Tokenize UTF-8 text without a BOS token."""
        return list(
            self._llm.tokenize(
                text.encode("utf-8"),
                add_bos=False,
                special=special,
            )
        )

    def _single_token_id(self, text: str) -> int:
        """Return the id of ``text``, requiring that it is exactly one token."""
        token_ids = self._tokenize(text, special=False)
        if len(token_ids) != 1:
            raise RuntimeError(f"Expected {text!r} to be one token, got {token_ids}")
        return int(token_ids[0])

    def _warmup(self) -> None:
        """Run one throwaway evaluation; failures are logged, not raised."""
        try:
            prompt = self._build_prompt_tokens("warmup query", "warmup document")
            with self._infer_lock:
                self._eval_logits(prompt)
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning("[Qwen3_GGUF] Warmup failed: %s", exc)

    def _build_request_prefix_tokens(self, query: str) -> List[int]:
        """Tokenize the instruction/query part that precedes the document text."""
        request_prefix = self._request_prefix_template.format(
            instruction=self._instruction,
            query=query,
        )
        return self._tokenize(request_prefix, special=False)

    def _build_prompt_tokens(self, query: str, doc: str) -> List[int]:
        """Build the full prompt, truncating the pair to the token budget."""
        pair = _format_instruction(self._instruction, query, doc)
        pair_tokens = self._tokenize(pair, special=False)
        pair_tokens = pair_tokens[: self._effective_max_len]
        return self._prefix_tokens + pair_tokens + self._suffix_tokens

    def _eval_logits(self, prompt_tokens: List[int]) -> List[float]:
        """Evaluate a prompt from a reset context and return the last logits row."""
        self._llm.reset()
        self._llm.eval(prompt_tokens)
        logits = self._llm.eval_logits
        if not logits:
            raise RuntimeError("llama.cpp returned empty logits")
        return list(logits[-1])

    @staticmethod
    def _yes_probability(true_logit: float, false_logit: float) -> float:
        """Two-way softmax over the yes/no logits, returning P(yes).

        The maximum is subtracted first so ``math.exp`` cannot overflow.
        """
        max_logit = max(true_logit, false_logit)
        true_exp = math.exp(true_logit - max_logit)
        false_exp = math.exp(false_logit - max_logit)
        return float(true_exp / (true_exp + false_exp))

    def _score_from_logits(self, logits: List[float]) -> float:
        """Turn a vocabulary logits row into P(yes) using the yes/no token ids."""
        return self._yes_probability(
            float(logits[self._true_token]),
            float(logits[self._false_token]),
        )

    def _score_prompt(self, prompt_tokens: List[int]) -> float:
        """Score one fully built prompt."""
        return self._score_from_logits(self._eval_logits(prompt_tokens))

    def _supports_query_state_reuse(self) -> bool:
        """True when reuse is enabled and the model exposes save/load_state."""
        return (
            self._reuse_query_state
            and hasattr(self._llm, "save_state")
            and hasattr(self._llm, "load_state")
        )

    def _build_query_state_locked(self, query: str) -> Tuple[Optional[Any], int]:
        """Evaluate the shared prompt prefix once and snapshot the model state.

        Must be called with ``_infer_lock`` held.

        Returns:
            ``(state, max_doc_tokens)``; ``(None, 0)`` when the prefix alone
            already exhausts the token budget.
        """
        request_prefix_tokens = self._build_request_prefix_tokens(query)
        max_doc_tokens = self._effective_max_len - len(request_prefix_tokens)
        if max_doc_tokens <= 0:
            return None, 0
        self._llm.reset()
        self._llm.eval(self._prefix_tokens + request_prefix_tokens)
        return self._llm.save_state(), max_doc_tokens

    def _score_doc_with_state_locked(
        self, state: Any, doc_tokens: List[int], max_doc_tokens: int
    ) -> float:
        """Score one document on top of a saved query state.

        Must be called with ``_infer_lock`` held. The document is tokenized
        separately from the prefix here, whereas ``_build_prompt_tokens``
        tokenizes the whole pair as one string.
        """
        self._llm.load_state(state)
        self._llm.eval(doc_tokens[:max_doc_tokens] + self._suffix_tokens)
        logits = self._llm.eval_logits
        if not logits:
            raise RuntimeError("llama.cpp returned empty logits")
        return self._score_from_logits(list(logits[-1]))

    def _estimate_doc_lengths(self, docs: List[str]) -> List[int]:
        """Return per-document lengths in tokens ("token" mode) or characters."""
        if self._length_sort_mode == "token":
            return [len(self._tokenize(text, special=False)) for text in docs]
        return [len(text) for text in docs]

    def _build_meta(
        self,
        *,
        total_docs: int,
        usable_docs: int,
        unique_docs: int,
        dedup_ratio: float,
        elapsed_ms: float,
        normalize: bool,
        inference_batches: int,
        reuse_query_state: bool,
    ) -> Dict[str, Any]:
        """Assemble the metadata dict so every return path has the same keys."""
        return {
            "input_docs": total_docs,
            "usable_docs": usable_docs,
            "unique_docs": unique_docs,
            "dedup_ratio": dedup_ratio,
            "elapsed_ms": round(elapsed_ms, 3),
            "model": self._model_name,
            "backend": self._backend_name,
            "normalize": normalize,
            "infer_batch_size": self._infer_batch_size,
            "inference_batches": inference_batches,
            "sort_by_doc_length": self._sort_by_doc_length,
            "length_sort_mode": self._length_sort_mode,
            "n_ctx": self._n_ctx,
            "n_batch": self._n_batch,
            "n_ubatch": self._n_ubatch,
            "n_gpu_layers": self._n_gpu_layers,
            "reuse_query_state": reuse_query_state,
        }

    def score_with_meta(
        self,
        query: str,
        docs: List[str],
        normalize: bool = True,
    ) -> Tuple[List[float], Dict[str, Any]]:
        """Score each document against the query.

        Args:
            query: Search query; ``None`` or blank yields all-zero scores.
            docs: Candidate documents. ``None`` or blank entries keep score 0.0.
            normalize: Echoed back in the metadata only. It does not alter the
                scores, which are always P(yes) in [0, 1].

        Returns:
            ``(scores, meta)`` with ``scores`` aligned to ``docs``.
        """
        start_ts = time.perf_counter()
        total_docs = len(docs) if docs else 0
        output_scores: List[float] = [0.0] * total_docs

        query = "" if query is None else str(query).strip()
        indexed: List[Tuple[int, str]] = []
        for i, doc in enumerate(docs or []):
            if doc is None:
                continue
            text = str(doc).strip()
            if not text:
                continue
            indexed.append((i, text))

        if not query or not indexed:
            return output_scores, self._build_meta(
                total_docs=total_docs,
                usable_docs=len(indexed),
                unique_docs=0,
                dedup_ratio=0.0,
                elapsed_ms=(time.perf_counter() - start_ts) * 1000.0,
                normalize=normalize,
                inference_batches=0,
                reuse_query_state=False,
            )

        indexed_texts = [text for _, text in indexed]
        unique_texts, position_to_unique = deduplicate_with_positions(indexed_texts)

        lengths = self._estimate_doc_lengths(unique_texts)
        order = list(range(len(unique_texts)))
        if self._sort_by_doc_length and len(unique_texts) > 1:
            order = sorted(order, key=lambda i: lengths[i])

        unique_scores: List[float] = [0.0] * len(unique_texts)
        # Standalone document tokens are only consumed by the state-reuse path.
        use_query_state = self._supports_query_state_reuse()
        unique_doc_tokens: List[List[int]] = (
            [self._tokenize(text, special=False) for text in unique_texts]
            if use_query_state
            else []
        )

        inference_batches = 0
        with self._infer_lock:
            query_state = None
            max_doc_tokens = self._effective_max_len
            if use_query_state:
                query_state, max_doc_tokens = self._build_query_state_locked(query)

            for start in range(0, len(order), self._infer_batch_size):
                batch_indices = order[start : start + self._infer_batch_size]
                inference_batches += 1
                for idx in batch_indices:
                    if query_state is not None:
                        unique_scores[idx] = self._score_doc_with_state_locked(
                            query_state,
                            unique_doc_tokens[idx],
                            max_doc_tokens,
                        )
                    else:
                        prompt = self._build_prompt_tokens(query, unique_texts[idx])
                        unique_scores[idx] = self._score_prompt(prompt)

        # Fan the unique scores back out to the original document positions.
        for (orig_idx, _), unique_idx in zip(indexed, position_to_unique):
            output_scores[orig_idx] = float(unique_scores[unique_idx])

        dedup_ratio = 1.0 - (len(unique_texts) / float(len(indexed)))
        meta = self._build_meta(
            total_docs=total_docs,
            usable_docs=len(indexed),
            unique_docs=len(unique_texts),
            dedup_ratio=round(dedup_ratio, 4),
            elapsed_ms=(time.perf_counter() - start_ts) * 1000.0,
            normalize=normalize,
            inference_batches=inference_batches,
            reuse_query_state=query_state is not None,
        )
        return output_scores, meta