text_encoder.py
"""
Text embedding encoder using BGE-M3 model.
Generates 1024-dimensional vectors for text using the BGE-M3 multilingual model.
"""
import threading
from typing import List, Union

import numpy as np
import torch
from modelscope import snapshot_download
from sentence_transformers import SentenceTransformer

class BgeEncoder:
    """
    Singleton text encoder using the BGE-M3 model.
    A thread-safe singleton pattern ensures only one model instance exists.
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, model_dir='Xorbits/bge-m3'):
        # Serialize construction so concurrent callers cannot create two
        # model instances; the model is loaded exactly once.
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                print(f"[BgeEncoder] Creating a new instance with model directory: {model_dir}")
                cls._instance.model = SentenceTransformer(snapshot_download(model_dir))
                print("[BgeEncoder] New instance has been created")
        return cls._instance
    def encode(
        self,
        sentences: Union[str, List[str]],
        normalize_embeddings: bool = True,
        device: str = 'cuda',
        batch_size: int = 32
    ) -> np.ndarray:
        """
        Encode text into embeddings.

        Args:
            sentences: Single string or list of strings to encode
            normalize_embeddings: Whether to L2-normalize the embeddings
            device: Device to use ('cuda' or 'cpu'; 'gpu' is accepted as an alias for 'cuda')
            batch_size: Batch size for encoding

        Returns:
            numpy array of shape (n, 1024) containing the embeddings
        """
        # Accept 'gpu' as an alias for 'cuda'
        if device == 'gpu':
            device = 'cuda'

        # Try the requested device; fall back to CPU if CUDA is unavailable,
        # short on memory, or fails during encoding.
        try:
            if device == 'cuda':
                if torch.cuda.is_available():
                    # Require at least 1 GB of free CUDA memory. mem_get_info()
                    # reports the device's actual free memory, unlike
                    # total_memory - memory_allocated(), which ignores
                    # allocations made by other processes.
                    free_memory, _ = torch.cuda.mem_get_info()
                    if free_memory < 1024 * 1024 * 1024:  # 1 GB
                        print(f"[BgeEncoder] CUDA memory insufficient ({free_memory / 1024 / 1024:.1f}MB free), falling back to CPU")
                        device = 'cpu'
                else:
                    print("[BgeEncoder] CUDA not available, using CPU")
                    device = 'cpu'

            self.model = self.model.to(device)
            embeddings = self.model.encode(
                sentences,
                normalize_embeddings=normalize_embeddings,
                device=device,
                show_progress_bar=False,
                batch_size=batch_size
            )
            return embeddings
        except Exception as e:
            print(f"[BgeEncoder] Device {device} failed: {e}")
            if device != 'cpu':
                print("[BgeEncoder] Falling back to CPU")
                try:
                    self.model = self.model.to('cpu')
                    embeddings = self.model.encode(
                        sentences,
                        normalize_embeddings=normalize_embeddings,
                        device='cpu',
                        show_progress_bar=False,
                        batch_size=batch_size
                    )
                    return embeddings
                except Exception as e2:
                    print(f"[BgeEncoder] CPU also failed: {e2}")
                    raise
            else:
                raise
    def encode_batch(
        self,
        texts: List[str],
        batch_size: int = 32,
        device: str = 'cuda'
    ) -> np.ndarray:
        """
        Encode a batch of texts efficiently.

        Args:
            texts: List of texts to encode
            batch_size: Batch size for processing
            device: Device to use

        Returns:
            numpy array of embeddings
        """
        return self.encode(texts, batch_size=batch_size, device=device)
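

# A minimal usage sketch, assuming the 'Xorbits/bge-m3' checkpoint is reachable
# through modelscope; the sample sentences below are illustrative only.
if __name__ == "__main__":
    encoder = BgeEncoder()
    # The singleton guarantees that repeated constructions return the same object.
    assert encoder is BgeEncoder()

    vectors = encoder.encode(["hello world", "你好，世界"], device='cpu')
    print(vectors.shape)  # expected: (2, 1024) for BGE-M3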