# bge_model.py 2.44 KB
"""
BGE-M3 local text embedding implementation.

Internal model implementation used by the embedding service.
"""

import logging
import threading
from typing import List, Union

import numpy as np
from modelscope import snapshot_download
from sentence_transformers import SentenceTransformer


class BgeTextModel:
    """
    Process-wide singleton text encoder using the BGE-M3 model (local inference).

    Construction is guarded by a class-level lock so the model is loaded at
    most once, even when several threads instantiate the class concurrently.
    Every ``BgeTextModel(...)`` call returns the same instance; ``model_dir``
    is only honoured by the call that actually loads the model.

    Note: only construction is synchronised. ``encode`` moves the shared model
    between devices without taking the lock.
    """

    _instance = None
    _lock = threading.Lock()

    # Minimum CUDA headroom (bytes) required before encoding on the GPU: 1 GiB.
    _MIN_FREE_CUDA_BYTES = 1024 * 1024 * 1024

    def __new__(cls, model_dir: str = "Xorbits/bge-m3"):
        """
        Return the shared instance, loading the model on first use.

        Args:
            model_dir: Model identifier handed to ``snapshot_download``.
                Ignored once the singleton exists.

        Raises:
            Exception: Whatever the model download/load raises. In that case
                no instance is cached, so a later call retries the load.
        """
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance.model = cls._load_model(model_dir)
                # Publish only after the model loaded successfully; otherwise a
                # failed load would leave a cached instance without ``model``.
                cls._instance = instance
        return cls._instance

    @staticmethod
    def _load_model(model_dir: str):
        """Download (or reuse) the model snapshot and build the encoder from it."""
        return SentenceTransformer(snapshot_download(model_dir))

    @classmethod
    def _resolve_device(cls, device: str) -> str:
        """
        Pick the device to run on, downgrading ``"cuda"`` to ``"cpu"`` when
        CUDA is unavailable or has less headroom than ``_MIN_FREE_CUDA_BYTES``.

        Any device other than ``"cuda"`` is returned unchanged.
        """
        if device != "cuda":
            return device

        import torch

        if not torch.cuda.is_available():
            return "cpu"

        # Query total and allocated memory for the SAME device (the current
        # one). This is a heuristic: ``memory_allocated`` only counts tensors
        # owned by this process.
        index = torch.cuda.current_device()
        headroom = (
            torch.cuda.get_device_properties(index).total_memory
            - torch.cuda.memory_allocated(index)
        )
        if headroom < cls._MIN_FREE_CUDA_BYTES:
            return "cpu"
        return "cuda"

    def _encode_on(
        self,
        sentences: Union[str, List[str]],
        device: str,
        normalize_embeddings: bool,
        batch_size: int,
    ) -> np.ndarray:
        """Move the shared model to ``device`` and encode ``sentences`` there."""
        self.model = self.model.to(device)
        return self.model.encode(
            sentences,
            normalize_embeddings=normalize_embeddings,
            device=device,
            show_progress_bar=False,
            batch_size=batch_size,
        )

    def encode(
        self,
        sentences: Union[str, List[str]],
        normalize_embeddings: bool = True,
        device: str = "cuda",
        batch_size: int = 32,
    ) -> np.ndarray:
        """
        Encode one sentence or a list of sentences into embeddings.

        Args:
            sentences: A single string or a list of strings to embed.
            normalize_embeddings: Forwarded to the underlying model's ``encode``.
            device: Requested device. ``"gpu"`` is accepted as an alias for
                ``"cuda"``. A ``"cuda"`` request is downgraded to ``"cpu"`` when
                CUDA is unavailable or short on memory.
            batch_size: Forwarded to the underlying model's ``encode``.

        Returns:
            Whatever the underlying model's ``encode`` returns for ``sentences``.

        Raises:
            Exception: Re-raised when encoding fails on the CPU. A failure on
                any other device is logged and retried once on the CPU.
        """
        if device == "gpu":
            device = "cuda"

        target = device
        try:
            # Device probing stays inside the try so that a failing CUDA query
            # also falls back to the CPU instead of aborting the call.
            target = self._resolve_device(device)
            return self._encode_on(sentences, target, normalize_embeddings, batch_size)
        except Exception:
            if target == "cpu":
                raise
            logging.getLogger(__name__).warning(
                "Encoding on device %r failed; retrying on CPU",
                target,
                exc_info=True,
            )
            return self._encode_on(sentences, "cpu", normalize_embeddings, batch_size)

    def encode_batch(self, texts: List[str], batch_size: int = 32, device: str = "cuda") -> np.ndarray:
        """Encode a list of texts; thin wrapper around ``encode`` with default normalisation."""
        return self.encode(texts, batch_size=batch_size, device=device)