Blame view

embeddings/cloud_text_encoder.py 4.6 KB
8c503501   tangwang   Add Aliyun-based embedding
  """
  Text embedding encoder using Aliyun DashScope API.
  
  Generates embeddings via Aliyun's text-embedding-v4 model.
  """
  
  import os
  import logging
  import threading
  import time
  import numpy as np
  from typing import List, Union
  from openai import OpenAI
  
  logger = logging.getLogger(__name__)
  
  
  class CloudTextEncoder:
      """
      Singleton text encoder using Aliyun DashScope API.
      
      Thread-safe singleton pattern ensures only one instance exists.
      Uses text-embedding-v4 model for generating embeddings.
      """
      _instance = None
      _lock = threading.Lock()
  
      def __new__(cls, api_key: str = None, base_url: str = None):
          with cls._lock:
              if cls._instance is None:
                  # Get API key from parameter or environment variable
                  api_key = api_key or os.getenv("DASHSCOPE_API_KEY")
                  if not api_key:
                      raise ValueError("DASHSCOPE_API_KEY must be set in environment or passed as parameter")

                  # Use the Beijing-region compatible-mode endpoint by default
                  base_url = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"

                  # Build and configure the instance before publishing it, so a
                  # failed first call does not leave a half-initialized singleton behind
                  instance = super(CloudTextEncoder, cls).__new__(cls)
                  instance.client = OpenAI(
                      api_key=api_key,
                      base_url=base_url
                  )
                  instance.model = "text-embedding-v4"
                  cls._instance = instance
                  logger.info(f"Created CloudTextEncoder instance with base_url: {base_url}")
                  
          return cls._instance
  
      def encode(
          self,
          sentences: Union[str, List[str]],
          normalize_embeddings: bool = True,
          device: str = 'cpu',
          batch_size: int = 32
      ) -> np.ndarray:
          """
          Encode text into embeddings via Aliyun DashScope API.
  
          Args:
              sentences: Single string or list of strings to encode
              normalize_embeddings: Whether to normalize embeddings (handled by API)
              device: Device parameter (ignored, for compatibility)
              batch_size: Batch size for processing (currently processes all at once)
  
          Returns:
              numpy array of shape (n, dimension) containing embeddings
          """
          # Convert single string to list
          if isinstance(sentences, str):
              sentences = [sentences]
          
          if not sentences:
              return np.array([])
  
          try:
              # Call DashScope API
              start_time = time.time()
              completion = self.client.embeddings.create(
                  model=self.model,
                  input=sentences
              )
              elapsed_time = time.time() - start_time
              
              logger.info(f"Generated embeddings for {len(sentences)} texts in {elapsed_time:.3f}s")
              
              # Extract embeddings from response
              embeddings = []
              for item in completion.data:
                  embeddings.append(item.embedding)
              
              return np.array(embeddings, dtype=np.float32)
              
          except Exception as e:
              logger.error(f"Failed to encode texts via DashScope API: {e}", exc_info=True)
              # Return zero embeddings as a fallback, sized to the model's
              # embedding dimension (1024 for text-embedding-v4)
              return np.zeros((len(sentences), self.get_embedding_dimension()), dtype=np.float32)
  
      def encode_batch(
          self,
          texts: List[str],
          batch_size: int = 32,
          device: str = 'cpu'
      ) -> np.ndarray:
          """
          Encode a batch of texts via Aliyun DashScope API.
  
          Args:
              texts: List of texts to encode
              batch_size: Batch size for processing
              device: Device parameter (ignored, for compatibility)
  
          Returns:
              numpy array of embeddings
          """
          if not texts:
              return np.array([])
          
          # Process in batches to avoid API limits
          all_embeddings = []
          
          for i in range(0, len(texts), batch_size):
              batch = texts[i:i + batch_size]
              embeddings = self.encode(batch, device=device)
              all_embeddings.append(embeddings)
              
              # Small delay to avoid rate limiting
              if i + batch_size < len(texts):
                  time.sleep(0.1)
          
          return np.vstack(all_embeddings) if all_embeddings else np.array([])
  
      def get_embedding_dimension(self) -> int:
          """
          Get the dimension of embeddings produced by this encoder.
          
          Returns:
              Embedding dimension (1024 for text-embedding-v4)
          """
          return 1024
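  
  
  if __name__ == "__main__":
      # Minimal usage sketch (added for illustration, not part of the original
      # module). Assumes DASHSCOPE_API_KEY is set in the environment and that
      # the compatible-mode endpoint configured above is reachable.
      encoder = CloudTextEncoder()
  
      # Repeated construction returns the same singleton instance
      assert encoder is CloudTextEncoder()
  
      # Encode a single string and a small batch
      single = encoder.encode("hello world")
      batch = encoder.encode_batch(["first sentence", "second sentence"], batch_size=2)
  
      # Expected shapes per get_embedding_dimension(): (1, 1024) and (2, 1024)
      print(single.shape, batch.shape)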