# text_encoder__local.py 4.01 KB
"""
Text embedding encoder using BGE-M3 model.

Generates 1024-dimensional vectors for text using the BGE-M3 multilingual model.
"""

import sys
import torch
from sentence_transformers import SentenceTransformer
import time
import threading
from modelscope import snapshot_download
from transformers import AutoModel
import os
import numpy as np
from typing import List, Union


class BgeEncoder:
    """
    Singleton text encoder using BGE-M3 model.

    Construction is guarded by a class-level lock so that only one model
    instance is created per process. The ``model_dir`` argument is honoured
    only by the call that actually creates the instance; later calls return
    the existing instance unchanged.

    Note: ``encode`` takes no lock and moves the shared model between
    devices, so concurrent calls requesting different devices can interfere
    with each other.
    """
    _instance = None
    _lock = threading.Lock()

    # Minimum amount of usable CUDA memory required before encoding on GPU.
    _MIN_FREE_CUDA_BYTES = 1024 * 1024 * 1024  # 1GB

    def __new__(cls, model_dir='Xorbits/bge-m3'):
        """
        Return the shared encoder, loading the model on first use.

        Args:
            model_dir: Model identifier passed to ``snapshot_download``.
                Ignored once an instance already exists.

        Raises:
            Exception: Whatever ``snapshot_download`` / ``SentenceTransformer``
                raise. In that case no instance is cached, so a later call
                retries the load instead of returning a half-built object.
        """
        with cls._lock:
            if cls._instance is None:
                instance = super(BgeEncoder, cls).__new__(cls)
                print(f"[BgeEncoder] Creating a new instance with model directory: {model_dir}")
                instance.model = SentenceTransformer(snapshot_download(model_dir))
                # Publish only after the model loaded successfully; otherwise a
                # failed load would leave a permanent instance without `.model`.
                cls._instance = instance
                print("[BgeEncoder] New instance has been created")
        return cls._instance

    @classmethod
    def _resolve_device(cls, device):
        """Return `device`, or 'cpu' when a CUDA device is unavailable or short on memory."""
        if not str(device).startswith('cuda'):
            return device

        if not torch.cuda.is_available():
            print("[BgeEncoder] CUDA not available, using CPU")
            return 'cpu'

        cuda_device = torch.device(device)
        # Driver-level free memory accounts for other processes on the GPU.
        driver_free, _total = torch.cuda.mem_get_info(cuda_device)
        # Blocks cached by PyTorch's allocator are reported as used by the
        # driver but can be reused by this process, so count them as free.
        reusable_cache = (
            torch.cuda.memory_reserved(cuda_device)
            - torch.cuda.memory_allocated(cuda_device)
        )
        free_memory = driver_free + reusable_cache
        if free_memory < cls._MIN_FREE_CUDA_BYTES:
            print(f"[BgeEncoder] CUDA memory insufficient ({free_memory/1024/1024:.1f}MB free), falling back to CPU")
            return 'cpu'
        return device

    def _encode_on(self, device, sentences, normalize_embeddings, batch_size):
        """Move the model to `device` and encode `sentences` there."""
        self.model = self.model.to(device)
        return self.model.encode(
            sentences,
            normalize_embeddings=normalize_embeddings,
            device=device,
            show_progress_bar=False,
            batch_size=batch_size
        )

    def encode(
        self,
        sentences: Union[str, List[str]],
        normalize_embeddings: bool = True,
        device: str = 'cuda',
        batch_size: int = 32
    ) -> np.ndarray:
        """
        Encode text into embeddings.

        A CUDA device is used only if CUDA is available and at least 1GB of
        memory is usable on it; otherwise encoding runs on CPU. If encoding
        on the chosen non-CPU device fails, it is retried once on CPU.

        Args:
            sentences: Single string or list of strings to encode
            normalize_embeddings: Whether to normalize embeddings
            device: Device to use ('cuda', 'cuda:N', 'cpu'; 'gpu' is accepted
                as an alias for 'cuda')
            batch_size: Batch size for encoding

        Returns:
            numpy array of shape (n, 1024) containing embeddings

        Raises:
            Exception: Re-raised from the underlying model when encoding fails
                on CPU (either directly or after the CPU fallback).
        """
        if device == 'gpu':
            device = 'cuda'

        # Try requested device, fallback to CPU if it fails. The catch is
        # deliberately broad: CUDA problems surface as several exception types
        # (RuntimeError, out-of-memory errors, AssertionError on CPU-only builds).
        try:
            device = self._resolve_device(device)
            return self._encode_on(device, sentences, normalize_embeddings, batch_size)
        except Exception as e:
            print(f"[BgeEncoder] Device {device} failed: {e}")
            if device == 'cpu':
                raise

            print("[BgeEncoder] Falling back to CPU")
            try:
                return self._encode_on('cpu', sentences, normalize_embeddings, batch_size)
            except Exception as e2:
                print(f"[BgeEncoder] CPU also failed: {e2}")
                raise

    def encode_batch(
        self,
        texts: List[str],
        batch_size: int = 32,
        device: str = 'cuda'
    ) -> np.ndarray:
        """
        Encode a batch of texts efficiently.

        Thin wrapper around ``encode`` that uses its default
        ``normalize_embeddings`` setting.

        Args:
            texts: List of texts to encode
            batch_size: Batch size for processing
            device: Device to use

        Returns:
            numpy array of embeddings
        """
        return self.encode(texts, batch_size=batch_size, device=device)