"""
Text embedding encoder using BGE-M3 model.

Generates 1024-dimensional vectors for text using the BGE-M3 multilingual model.
"""

import sys
import torch
from sentence_transformers import SentenceTransformer
import time
import threading
from modelscope import snapshot_download
from transformers import AutoModel
import os
import numpy as np
from typing import List, Union


class BgeEncoder:
    """
    Singleton text encoder using BGE-M3 model.

    Construction is guarded by a class-level lock so that only one model
    instance is ever created per process. Every call to ``BgeEncoder(...)``
    after the first successful one returns that same instance; the
    ``model_dir`` argument of later calls is ignored.

    Note that only construction is synchronized: ``encode`` moves the shared
    model between devices without taking the lock.
    """

    _instance = None
    _lock = threading.Lock()

    # Minimum amount of free CUDA memory (1 GiB) required to encode on the GPU.
    _MIN_FREE_CUDA_BYTES = 1024 * 1024 * 1024

    def __new__(cls, model_dir='Xorbits/bge-m3'):
        """
        Return the shared encoder, loading the model on first use.

        Args:
            model_dir: Model identifier passed to ``snapshot_download``.
                Only honored by the call that actually creates the instance.

        Returns:
            The process-wide ``BgeEncoder`` instance.

        Raises:
            Exception: Whatever ``snapshot_download`` or
                ``SentenceTransformer`` raises while loading. In that case no
                instance is published, so a later call retries the load.
        """
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                print(f"[BgeEncoder] Creating a new instance with model directory: {model_dir}")
                instance.model = SentenceTransformer(snapshot_download(model_dir))
                # Publish only after the model loaded successfully; otherwise
                # a failed load would leave a singleton without a ``model``
                # attribute that every later caller would receive.
                cls._instance = instance
                print("[BgeEncoder] New instance has been created")
            return cls._instance

    @staticmethod
    def _free_cuda_bytes(device) -> int:
        """Return the number of bytes usable by this process on ``device``."""
        cuda_device = torch.device(device)
        index = cuda_device.index
        if index is None:
            # A bare 'cuda' means "the current device".
            index = torch.cuda.current_device()
        # Driver-level free memory accounts for other processes on the GPU.
        free_bytes, _total_bytes = torch.cuda.mem_get_info(index)
        # Blocks cached by this process's allocator are reported as used by
        # the driver but can be reused for new tensors.
        reusable_cache = (
            torch.cuda.memory_reserved(index) - torch.cuda.memory_allocated(index)
        )
        return free_bytes + reusable_cache

    @classmethod
    def _resolve_device(cls, device):
        """Return ``device``, or ``'cpu'`` when CUDA is requested but unusable."""
        if not str(device).startswith('cuda'):
            return device
        if not torch.cuda.is_available():
            print("[BgeEncoder] CUDA not available, using CPU")
            return 'cpu'
        free_memory = cls._free_cuda_bytes(device)
        if free_memory < cls._MIN_FREE_CUDA_BYTES:
            print(f"[BgeEncoder] CUDA memory insufficient ({free_memory/1024/1024:.1f}MB free), falling back to CPU")
            return 'cpu'
        return device

    def _encode_on(self, sentences, device, normalize_embeddings, batch_size):
        """Move the model to ``device`` and encode ``sentences`` there."""
        self.model = self.model.to(device)
        return self.model.encode(
            sentences,
            normalize_embeddings=normalize_embeddings,
            device=device,
            show_progress_bar=False,
            batch_size=batch_size,
        )

    def encode(
        self,
        sentences: Union[str, List[str]],
        normalize_embeddings: bool = True,
        device: str = 'cuda',
        batch_size: int = 32
    ) -> np.ndarray:
        """
        Encode text into embeddings.

        The requested device is tried first. If CUDA is requested but is
        unavailable or has less than 1 GiB free, or if encoding on the
        requested device raises, encoding is retried on the CPU.

        Args:
            sentences: Single string or list of strings to encode
            normalize_embeddings: Whether to normalize embeddings
            device: Device to use ('cuda', 'cuda:N' or 'cpu'); 'gpu' is
                accepted as an alias for 'cuda'
            batch_size: Batch size for encoding

        Returns:
            numpy array of shape (n, 1024) containing embeddings, as returned
            by ``SentenceTransformer.encode``

        Raises:
            Exception: The error from the CPU attempt when encoding fails on
                the CPU as well (or when the CPU was the requested device).
        """
        if device == 'gpu':
            device = 'cuda'

        # Broad catch is deliberate: any failure on the accelerator (OOM,
        # driver error, ...) should degrade to the CPU instead of aborting.
        try:
            device = self._resolve_device(device)
            return self._encode_on(sentences, device, normalize_embeddings, batch_size)
        except Exception as e:
            print(f"[BgeEncoder] Device {device} failed: {e}")
            if device == 'cpu':
                # Nothing left to fall back to.
                raise
            print("[BgeEncoder] Falling back to CPU")
            try:
                return self._encode_on(sentences, 'cpu', normalize_embeddings, batch_size)
            except Exception as e2:
                print(f"[BgeEncoder] CPU also failed: {e2}")
                raise

    def encode_batch(
        self,
        texts: List[str],
        batch_size: int = 32,
        device: str = 'cuda'
    ) -> np.ndarray:
        """
        Encode a batch of texts efficiently.

        Thin wrapper around ``encode`` using its default normalization.

        Args:
            texts: List of texts to encode
            batch_size: Batch size for processing
            device: Device to use

        Returns:
            numpy array of embeddings
        """
        return self.encode(texts, batch_size=batch_size, device=device)