Blame view

embeddings/text_encoder__local.py 4.01 KB
325eec03   tangwang   1. Logging and configuration infrastructure; usage optimization
  """
  Text embedding encoder using BGE-M3 model.
  
  Generates 1024-dimensional vectors for text using the BGE-M3 multilingual model.
  """
  
  import threading
  from typing import List, Union

  import numpy as np
  import torch
  from modelscope import snapshot_download
  from sentence_transformers import SentenceTransformer
  
  
  class BgeEncoder:
      """
      Singleton text encoder using BGE-M3 model.
  
      Thread-safe singleton pattern ensures only one model instance exists.
      """
      _instance = None  # the single shared encoder instance (holds the loaded model)
      _lock = threading.Lock()  # guards instance creation across threads
  
      def __new__(cls, model_dir='Xorbits/bge-m3'):
          with cls._lock:
              if cls._instance is None:
                  print(f"[BgeEncoder] Creating a new instance with model directory: {model_dir}")
                  instance = super().__new__(cls)
                  # Download the model snapshot (cached after the first run) and load it
                  instance.model = SentenceTransformer(snapshot_download(model_dir))
                  # Publish the instance only after the model loads successfully, so a
                  # failed load does not leave a half-initialized singleton behind
                  cls._instance = instance
                  print("[BgeEncoder] New instance has been created")
          # Note: model_dir is ignored on every call after the first
          return cls._instance
  
      def encode(
          self,
          sentences: Union[str, List[str]],
          normalize_embeddings: bool = True,
          device: str = 'cuda',
          batch_size: int = 32
      ) -> np.ndarray:
          """
          Encode text into embeddings.
  
          Args:
              sentences: Single string or list of strings to encode
              normalize_embeddings: Whether to normalize embeddings
              device: Device to use ('cuda' or 'cpu'; 'gpu' is accepted as an alias for 'cuda')
              batch_size: Batch size for encoding
  
          Returns:
              numpy array of embeddings: shape (n, 1024) for a list of n strings,
              or (1024,) for a single string
          """
          # Accept 'gpu' as an alias for 'cuda'
          if device == 'gpu':
              device = 'cuda'

          # Try the requested device; fall back to CPU if CUDA is unusable
          try:
              if device == 'cuda':
                  if torch.cuda.is_available():
                      # Require at least 1GB of free device memory before using CUDA.
                      # mem_get_info reports free memory as seen by the driver, which,
                      # unlike total_memory - memory_allocated(), also accounts for
                      # other processes and the caching allocator.
                      free_memory, _ = torch.cuda.mem_get_info(0)
                      if free_memory < 1024 * 1024 * 1024:  # 1GB
                          print(f"[BgeEncoder] CUDA memory insufficient ({free_memory/1024/1024:.1f}MB free), falling back to CPU")
                          device = 'cpu'
                  else:
                      print("[BgeEncoder] CUDA not available, using CPU")
                      device = 'cpu'
  
              self.model = self.model.to(device)
  
              embeddings = self.model.encode(
                  sentences,
                  normalize_embeddings=normalize_embeddings,
                  device=device,
                  show_progress_bar=False,
                  batch_size=batch_size
              )
  
              return embeddings
  
          except Exception as e:
              print(f"[BgeEncoder] Device {device} failed: {e}")
              if device != 'cpu':
                  print(f"[BgeEncoder] Falling back to CPU")
                  try:
                      self.model = self.model.to('cpu')
                      embeddings = self.model.encode(
                          sentences,
                          normalize_embeddings=normalize_embeddings,
                          device='cpu',
                          show_progress_bar=False,
                          batch_size=batch_size
                      )
                      return embeddings
                  except Exception as e2:
                      print(f"[BgeEncoder] CPU also failed: {e2}")
                      raise
              else:
                  raise
  
      def encode_batch(
          self,
          texts: List[str],
          batch_size: int = 32,
          device: str = 'cuda'
      ) -> np.ndarray:
          """
          Encode a batch of texts efficiently.
  
          Args:
              texts: List of texts to encode
              batch_size: Batch size for processing
              device: Device to use
  
          Returns:
              numpy array of embeddings with shape (len(texts), 1024)
          """
          return self.encode(texts, batch_size=batch_size, device=device)
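

  # A minimal usage sketch (illustrative, not part of the original module): it
  # shows that repeated construction returns the same singleton and that encode()
  # produces 1024-dimensional BGE-M3 vectors; CPU is forced here so the demo runs
  # without a GPU. Assumes the snapshot_download above succeeds.
  if __name__ == '__main__':
      encoder_a = BgeEncoder()
      encoder_b = BgeEncoder()
      assert encoder_a is encoder_b  # singleton: one model instance per process

      # Single string -> 1-D vector of length 1024
      vec = encoder_a.encode("hello world", device='cpu')
      print(vec.shape)  # (1024,)

      # List of strings -> matrix of shape (n, 1024)
      mat = encoder_a.encode_batch(["first text", "second text"], device='cpu')
      print(mat.shape)  # (2, 1024)

      # With normalize_embeddings=True (the default), cosine similarity
      # reduces to a plain dot product between rows
      print(f"cosine similarity: {float(mat[0] @ mat[1]):.4f}")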