Blame view

embeddings/image_encoder.py 8.23 KB
950a640e   tangwang   embeddings
1
  """Image embedding client for the local embedding HTTP service."""
be52af70   tangwang   first commit
2
  
950a640e   tangwang   embeddings
3
4
5
  import logging
  from typing import Any, List, Optional, Union
  
be52af70   tangwang   first commit
6
  import numpy as np
950a640e   tangwang   embeddings
7
  import requests
be52af70   tangwang   first commit
8
  from PIL import Image
be52af70   tangwang   first commit
9
  
325eec03   tangwang   1. 日志、配置基础设施,使用优化
10
  logger = logging.getLogger(name=__name__)  # module-scoped logger for this encoder client
be52af70   tangwang   first commit
11
  
86d8358b   tangwang   config optimize
12
  from config.loader import get_app_config
7214c2e7   tangwang   mplemented**
13
  from config.services_config import get_embedding_image_base_url
7214c2e7   tangwang   mplemented**
14
  from embeddings.cache_keys import build_image_cache_key
4a37d233   tangwang   1. embedding cach...
15
  from embeddings.redis_embedding_cache import RedisEmbeddingCache
4650fcec   tangwang   日志优化、日志串联(uid rqid)
16
  from request_log_context import build_downstream_request_headers, build_request_log_extra
42e3aea6   tangwang   tidy
17
  
be52af70   tangwang   first commit
18
19
20
  
  class CLIPImageEncoder:
      """
      Image Encoder for generating image embeddings using network service.

      Image URLs / local file paths are sent to ``POST {service_url}/embed/image``;
      resulting vectors are cached through ``RedisEmbeddingCache`` under the configured
      embedding cache prefix, in a dedicated ``"image"`` namespace.

      This client is stateless and safe to instantiate per caller.
      """

      # Fallback HTTP timeout (seconds); kept at class level so subclasses that
      # override __init__ without calling super() still have a value.
      timeout: float = 60.0

      def __init__(self, service_url: Optional[str] = None, timeout: float = 60.0):
          """
          Args:
              service_url: Base URL of the embedding service. When falsy, the value from
                  ``get_embedding_image_base_url()`` is used.
              timeout: Per-request HTTP timeout in seconds (default 60, the value that
                  was previously hard-coded).
          """
          resolved_url = service_url or get_embedding_image_base_url()
          redis_config = get_app_config().infrastructure.redis

          self.service_url = str(resolved_url).rstrip("/")
          self.endpoint = f"{self.service_url}/embed/image"
          self.timeout = float(timeout)
          # Reuse embedding cache prefix, but separate namespace for images to avoid collisions.
          # `or ""` stops a missing (None) prefix from turning into the literal string "None".
          self.cache_prefix = str(redis_config.embedding_cache_prefix or "").strip() or "embedding"
          logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url)

          self.cache = RedisEmbeddingCache(
              key_prefix=self.cache_prefix,
              namespace="image",
          )

      def _call_service(
          self,
          request_data: List[str],
          normalize_embeddings: bool = True,
          priority: int = 0,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> List[Any]:
          """
          Call the embedding service API.

          Args:
              request_data: List of image URLs / local file paths (sent as the JSON body).
              normalize_embeddings: Sent as the ``normalize`` query parameter.
              priority: Sent as the ``priority`` query parameter, clamped to >= 0.
              request_id: Propagated into downstream headers and error log records.
              user_id: Propagated into downstream headers and error log records.

          Returns:
              List of embeddings (list[float]) or nulls (None), aligned to input order.

          Raises:
              requests.exceptions.RequestException: Transport / HTTP errors (logged, then re-raised).
              RuntimeError: The service answered with JSON that is not a list.
          """
          response = None
          try:
              response = requests.post(
                  self.endpoint,
                  params={
                      "normalize": "true" if normalize_embeddings else "false",
                      "priority": max(0, int(priority)),
                  },
                  json=request_data,
                  headers=build_downstream_request_headers(request_id=request_id, user_id=user_id),
                  timeout=self.timeout,
              )
              response.raise_for_status()
              payload = response.json()
          except requests.exceptions.RequestException as e:
              body_preview = ""
              if response is not None:
                  try:
                      body_preview = (response.text or "")[:300]
                  except Exception:
                      body_preview = ""
              logger.error(
                  "CLIPImageEncoder service request failed | status=%s body=%s error=%s",
                  getattr(response, "status_code", "n/a"),
                  body_preview,
                  e,
                  exc_info=True,
                  extra=build_request_log_extra(request_id=request_id, user_id=user_id),
              )
              raise
          if not isinstance(payload, list):
              # A non-list payload (e.g. an error object) cannot be aligned with the inputs;
              # fail clearly here instead of with a KeyError/TypeError in the callers.
              raise RuntimeError(
                  f"Unexpected image embedding response type: {type(payload).__name__}"
              )
          return payload

      @staticmethod
      def _to_vector(raw: Any, url: str) -> np.ndarray:
          """
          Convert one raw embedding from the service into a validated float32 vector.

          Raises:
              RuntimeError: ``raw`` is None, cannot be converted to float32, is not 1-D,
                  is empty, or contains NaN/inf.
          """
          if raw is None:
              raise RuntimeError(f"No image embedding returned for URL: {url}")
          try:
              vec = np.array(raw, dtype=np.float32)
          except (TypeError, ValueError) as exc:
              # Ragged or non-numeric payloads: report them like any other invalid embedding.
              raise RuntimeError(f"Invalid image embedding returned for URL: {url}") from exc
          if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
              raise RuntimeError(f"Invalid image embedding returned for URL: {url}")
          return vec

      def encode_image(self, image: Image.Image) -> np.ndarray:
          """
          Encode image to embedding vector using network service.

          Note: This method is kept for compatibility but the service only works with URLs.

          Raises:
              NotImplementedError: Always.
          """
          raise NotImplementedError("encode_image with PIL Image is not supported by embedding service")

      def encode_image_from_url(
          self,
          url: str,
          normalize_embeddings: bool = True,
          priority: int = 0,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> np.ndarray:
          """
          Generate image embedding via network service using URL.

          Args:
              url: Image URL / local path to process (surrounding whitespace is stripped,
                  as in ``encode_batch``, so both entry points share cache entries).
              normalize_embeddings: Whether the service should normalize the vector.
              priority: Service-side priority (clamped to >= 0).
              request_id: Propagated for downstream headers / logging.
              user_id: Propagated for downstream headers / logging.

          Returns:
              Embedding vector (the cached value on a cache hit).

          Raises:
              RuntimeError: The service returned no embedding or an invalid one.
          """
          if isinstance(url, str):
              url = url.strip()
          cache_key = build_image_cache_key(url, normalize=normalize_embeddings)
          cached = self.cache.get(cache_key)
          if cached is not None:
              return cached

          response_data = self._call_service(
              [url],
              normalize_embeddings=normalize_embeddings,
              priority=priority,
              request_id=request_id,
              user_id=user_id,
          )
          if len(response_data) != 1:
              raise RuntimeError(f"No image embedding returned for URL: {url}")
          vec = self._to_vector(response_data[0], url)
          self.cache.set(cache_key, vec)
          return vec

      def encode_batch(
          self,
          images: List[Union[str, Image.Image]],
          batch_size: int = 8,
          normalize_embeddings: bool = True,
          priority: int = 0,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> List[np.ndarray]:
          """
          Encode a batch of images efficiently via network service.

          Cached entries are served from Redis; only cache misses are sent to the
          service, in chunks of ``batch_size``.

          Args:
              images: List of image URLs / local paths (PIL Images are rejected).
              batch_size: Maximum number of URLs per service request; must be >= 1.
              normalize_embeddings: Whether the service should normalize the vectors.
              priority: Service-side priority (clamped to >= 0).
              request_id: Propagated for downstream headers / logging.
              user_id: Propagated for downstream headers / logging.

          Returns:
              List of embeddings, aligned with ``images``.

          Raises:
              ValueError: ``batch_size`` < 1, or an entry is not a non-blank string.
              NotImplementedError: An entry is a PIL Image.
              RuntimeError: The service response is misaligned or contains an invalid embedding.
          """
          # A negative batch_size used to make range() empty and silently return
          # placeholder arrays; zero already failed inside range().
          if batch_size < 1:
              raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
          for i, img in enumerate(images):
              if isinstance(img, Image.Image):
                  raise NotImplementedError(f"PIL Image at index {i} is not supported by service")
              if not isinstance(img, str) or not img.strip():
                  raise ValueError(f"Invalid image URL/path at index {i}: {img!r}")

          urls = [img.strip() for img in images]  # type: ignore[union-attr]
          cache_keys = [build_image_cache_key(u, normalize=normalize_embeddings) for u in urls]

          # Every slot is filled below, either from the cache or from the service.
          results: List[Optional[np.ndarray]] = [None] * len(urls)
          pending: List[int] = []  # positions that missed the cache
          for pos, key in enumerate(cache_keys):
              cached = self.cache.get(key)
              if cached is None:
                  pending.append(pos)
              else:
                  results[pos] = cached

          for start in range(0, len(pending), batch_size):
              chunk = pending[start : start + batch_size]
              batch_urls = [urls[pos] for pos in chunk]
              response_data = self._call_service(
                  batch_urls,
                  normalize_embeddings=normalize_embeddings,
                  priority=priority,
                  request_id=request_id,
                  user_id=user_id,
              )
              if len(response_data) != len(batch_urls):
                  raise RuntimeError(
                      f"Image embedding response length mismatch: expected {len(batch_urls)}, "
                      f"got {len(response_data)}"
                  )
              for pos, raw in zip(chunk, response_data):
                  vec = self._to_vector(raw, urls[pos])
                  self.cache.set(cache_keys[pos], vec)
                  results[pos] = vec

          return results  # type: ignore[return-value]

      def encode_image_urls(
          self,
          urls: List[str],
          batch_size: Optional[int] = None,
          normalize_embeddings: bool = True,
          priority: int = 0,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> List[np.ndarray]:
          """
          Interface matching ClipImageModel / ClipAsServiceImageEncoder, called by the
          indexer's document_transformer.

          Args:
              urls: List of image URLs.
              batch_size: Batch size (a falsy value, including None, means 8).
              normalize_embeddings: Whether the service should normalize the vectors.
              priority: Service-side priority (clamped to >= 0).
              request_id: Propagated for downstream headers / logging.
              user_id: Propagated for downstream headers / logging.

          Returns:
              List of vectors with the same length as ``urls``.
          """
          return self.encode_batch(
              urls,
              batch_size=batch_size or 8,
              normalize_embeddings=normalize_embeddings,
              priority=priority,
              request_id=request_id,
              user_id=user_id,
          )