# qwen3_model.py
"""
Qwen3-Embedding-0.6B local text embedding implementation.

Internal model implementation used by the embedding service.
"""

import threading
from typing import List, Union

import numpy as np
from sentence_transformers import SentenceTransformer
import torch


class Qwen3TextModel(object):
    """
    Process-wide singleton text encoder backed by a SentenceTransformer model
    (Qwen3-Embedding-0.6B by default, local inference).

    The first construction loads the model; every later construction returns
    that same instance, and a ``model_id`` passed to a later call is ignored.
    Calls to :meth:`encode` are serialized with a per-instance lock.
    """

    # The single shared instance, created lazily by __new__.
    _instance = None
    # Guards creation of the singleton.
    _lock = threading.Lock()

    def __new__(cls, model_id: str = "Qwen/Qwen3-Embedding-0.6B"):
        """
        Return the shared instance, loading the model on first use.

        Args:
            model_id: Model name or path handed to ``SentenceTransformer``.
                Only the call that actually creates the singleton uses it.

        Returns:
            The shared ``Qwen3TextModel`` instance.

        Raises:
            Any exception raised while loading the model. In that case no
            instance is stored, so a later call retries the load.
        """
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                # NOTE(review): trust_remote_code=True permits code shipped
                # with the model repository to run at load time; model_id
                # should only ever point at a trusted source.
                instance.model = SentenceTransformer(model_id, trust_remote_code=True)
                # No device chosen yet; the first encode() call moves the model.
                instance._current_device = None
                instance._encode_lock = threading.Lock()
                # Publish only after every attribute is set, so a failed load
                # never leaves a half-initialized singleton behind.
                cls._instance = instance
        return cls._instance

    def _ensure_device(self, device: str) -> str:
        """
        Resolve ``device`` to a usable device string and move the model to it.

        Resolution rules:
          * ``None``, empty or whitespace-only input means ``"cpu"``.
          * Matching is case-insensitive and ignores surrounding whitespace.
          * ``"gpu"`` / ``"gpu:N"`` are aliases for ``"cuda"`` / ``"cuda:N"``.
          * Any CUDA device falls back to ``"cpu"`` when CUDA is unavailable.

        The model is only moved when the resolved device differs from the one
        it was last moved to. Callers are expected to hold ``_encode_lock``,
        since this mutates ``self.model`` and ``self._current_device``.

        Args:
            device: Requested device name.

        Returns:
            The device string the model now lives on.
        """
        target = (device or "").strip().lower() or "cpu"
        if target == "gpu" or target.startswith("gpu:"):
            target = "cuda" + target[len("gpu"):]
        wants_cuda = target == "cuda" or target.startswith("cuda:")
        if wants_cuda and not torch.cuda.is_available():
            target = "cpu"
        if target != self._current_device:
            self.model = self.model.to(target)
            self._current_device = target
        return target

    def encode(
        self,
        sentences: Union[str, List[str]],
        normalize_embeddings: bool = True,
        device: str = "cuda",
        batch_size: int = 32,
    ) -> np.ndarray:
        """
        Embed one sentence or a list of sentences.

        Args:
            sentences: A single string or a list of strings to embed.
            normalize_embeddings: Forwarded to the underlying model's
                ``encode``.
            device: Requested device; resolved by :meth:`_ensure_device`
                (falls back to CPU when CUDA is unavailable).
            batch_size: Forwarded to the underlying model's ``encode``.

        Returns:
            Whatever the underlying model's ``encode`` returns for the input.
        """
        # SentenceTransformer + CUDA inference is not thread-safe in our usage;
        # keep one in-flight encode call while avoiding repeated .to(device) hops.
        with self._encode_lock:
            run_device = self._ensure_device(device)
            embeddings = self.model.encode(
                sentences,
                normalize_embeddings=normalize_embeddings,
                device=run_device,
                show_progress_bar=False,
                batch_size=batch_size,
            )
            return embeddings

    def encode_batch(
        self,
        texts: List[str],
        batch_size: int = 32,
        device: str = "cuda",
        normalize_embeddings: bool = True,
    ) -> np.ndarray:
        """
        Embed a list of texts; a thin convenience wrapper around :meth:`encode`.

        Args:
            texts: Strings to embed.
            batch_size: Forwarded to :meth:`encode`.
            device: Forwarded to :meth:`encode`.
            normalize_embeddings: Forwarded to :meth:`encode`.

        Returns:
            The result of :meth:`encode` for ``texts``.
        """
        return self.encode(
            texts,
            batch_size=batch_size,
            device=device,
            normalize_embeddings=normalize_embeddings,
        )