Blame view

embeddings/image_encoder.py 13 KB
950a640e   tangwang   embeddings
1
  """Image embedding client for the local embedding HTTP service."""
be52af70   tangwang   first commit
2
  
950a640e   tangwang   embeddings
3
4
5
  import logging
  from typing import Any, List, Optional, Union
  
be52af70   tangwang   first commit
6
  import numpy as np
950a640e   tangwang   embeddings
7
  import requests
be52af70   tangwang   first commit
8
  from PIL import Image
be52af70   tangwang   first commit
9
  
325eec03   tangwang   1. 日志、配置基础设施,使用优化
10
  logger = logging.getLogger(__name__)
be52af70   tangwang   first commit
11
  
86d8358b   tangwang   config optimize
12
  from config.loader import get_app_config
7a013ca7   tangwang   多模态文本向量服务ok
13
14
  from config.services_config import get_embedding_image_backend_config, get_embedding_image_base_url
  from embeddings.cache_keys import build_clip_text_cache_key, build_image_cache_key
5a01af3c   tangwang   多模态hashkey调整:1. 加...
15
  from embeddings.config import CONFIG
4a37d233   tangwang   1. embedding cach...
16
  from embeddings.redis_embedding_cache import RedisEmbeddingCache
4650fcec   tangwang   日志优化、日志串联(uid rqid)
17
  from request_log_context import build_downstream_request_headers, build_request_log_extra
42e3aea6   tangwang   tidy
18
  
be52af70   tangwang   first commit
19
20
21
  
  class CLIPImageEncoder:
      """
325eec03   tangwang   1. 日志、配置基础设施,使用优化
22
      Image Encoder for generating image embeddings using network service.
be52af70   tangwang   first commit
23
  
950a640e   tangwang   embeddings
24
      This client is stateless and safe to instantiate per caller.
be52af70   tangwang   first commit
25
26
      """
  
950a640e   tangwang   embeddings
27
      def __init__(self, service_url: Optional[str] = None):
          """
          Bind the client to the image embedding HTTP service and set up its Redis caches.

          Args:
              service_url: Base URL of the embedding service. When falsy, the URL from
                  ``get_embedding_image_base_url()`` is used. A trailing ``/`` is removed.
          """
          resolved_url = service_url or get_embedding_image_base_url()
          redis_config = get_app_config().infrastructure.redis

          self.service_url = str(resolved_url).rstrip("/")
          self.endpoint = f"{self.service_url}/embed/image"
          self.clip_text_endpoint = f"{self.service_url}/embed/clip_text"

          # Reuse the shared embedding cache prefix; images and CLIP texts are kept in
          # separate namespaces ("image" / "clip_text") below to avoid key collisions.
          # A None prefix must fall back to the default rather than become "None".
          raw_prefix = redis_config.embedding_cache_prefix
          prefix = "" if raw_prefix is None else str(raw_prefix).strip()
          self.cache_prefix = prefix or "embedding"

          # Part of every cache key, so switching models does not serve stale vectors.
          self._mm_model_name = CONFIG.MULTIMODAL_MODEL_NAME
          logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url)

          self.cache = RedisEmbeddingCache(
              key_prefix=self.cache_prefix,
              namespace="image",
          )
          self._clip_text_cache = RedisEmbeddingCache(
              key_prefix=self.cache_prefix,
              namespace="clip_text",
          )
be52af70   tangwang   first commit
45
  
b754fd41   tangwang   图片向量化支持优先级参数
46
47
48
49
50
      def _call_service(
          self,
          request_data: List[str],
          normalize_embeddings: bool = True,
          priority: int = 0,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> List[Any]:
          """
          POST image references to ``/embed/image`` and return the decoded JSON body.

          Args:
              request_data: Image URLs / local file paths, sent as the JSON body.
              normalize_embeddings: Sent as the ``normalize`` query parameter.
              priority: Sent as the ``priority`` query parameter, clamped to >= 0.
              request_id: Propagated into downstream headers and log records.
              user_id: Propagated into downstream headers and log records.

          Returns:
              The parsed response: embeddings (list[float]) or None entries, aligned
              to the input order.

          Raises:
              requests.exceptions.RequestException: On transport failure or a non-2xx
                  status; the failure is logged (with a body preview) before re-raising.
          """
          query_params = {
              "normalize": "true" if normalize_embeddings else "false",
              "priority": max(0, int(priority)),
          }
          outbound_headers = build_downstream_request_headers(request_id=request_id, user_id=user_id)

          resp = None
          try:
              resp = requests.post(
                  self.endpoint,
                  params=query_params,
                  json=request_data,
                  headers=outbound_headers,
                  timeout=60,
              )
              resp.raise_for_status()
              return resp.json()
          except requests.exceptions.RequestException as exc:
              preview = ""
              if resp is not None:
                  try:
                      preview = (resp.text or "")[:300]
                  except Exception:
                      # Reading the body must never mask the original request error.
                      pass
              logger.error(
                  "CLIPImageEncoder service request failed | status=%s body=%s error=%s",
                  getattr(resp, "status_code", "n/a"),
                  preview,
                  exc,
                  exc_info=True,
                  extra=build_request_log_extra(request_id=request_id, user_id=user_id),
              )
              raise
be52af70   tangwang   first commit
93
  
7a013ca7   tangwang   多模态文本向量服务ok
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
      def _clip_text_via_grpc(
          self,
          request_data: List[str],
          normalize_embeddings: bool,
      ) -> List[Any]:
          """
          Fallback for older image services (port 6008) without ``/embed/clip_text``: use gRPC.

          Requires ``services.embedding.image_backend: clip_as_service``.

          Args:
              request_data: Texts to encode with the CN-CLIP text tower.
              normalize_embeddings: Forwarded to ``encode_clip_texts``.

          Returns:
              One embedding (list[float]) per input text, in input order; ``[]`` for
              empty input.

          Raises:
              RuntimeError: When the configured image backend is not ``clip_as_service``.
          """
          backend, cfg = get_embedding_image_backend_config()
          if backend != "clip_as_service":
              raise RuntimeError(
                  "POST /embed/clip_text 返回 404:请重启图片向量服务(6008)以加载新路由;"
                  "或配置 services.embedding.image_backend=clip_as_service 并启动 grpc cnclip。"
              )
          if not request_data:
              # Nothing to encode; avoid building a client and passing batch_size=0.
              return []

          # Imported here rather than at module level, presumably so the
          # clip-as-service client is only loaded on this fallback path.
          from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder

          backend_cfg = cfg or {}
          encoder = ClipAsServiceImageEncoder(
              server=str(backend_cfg.get("server") or CONFIG.CLIP_AS_SERVICE_SERVER),
              batch_size=int(backend_cfg.get("batch_size") or CONFIG.IMAGE_BATCH_SIZE),
          )
          # Encode everything in one call; callers already bound the request size.
          vectors = encoder.encode_clip_texts(
              request_data,
              batch_size=len(request_data),
              normalize_embeddings=normalize_embeddings,
          )
          return [v.tolist() for v in vectors]
  
      def _call_clip_text_service(
          self,
          request_data: List[str],
          normalize_embeddings: bool = True,
          priority: int = 1,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> List[Any]:
          """
          POST texts to ``/embed/clip_text`` and return the decoded JSON body.

          A 404 from the service (route not deployed yet) is logged as a warning and
          handled by ``_clip_text_via_grpc`` instead.

          Args:
              request_data: Texts to encode, sent as the JSON body.
              normalize_embeddings: Sent as the ``normalize`` query parameter.
              priority: Sent as the ``priority`` query parameter, clamped to >= 0.
              request_id: Propagated into downstream headers and log records.
              user_id: Propagated into downstream headers and log records.

          Returns:
              Embeddings (or None entries) aligned to ``request_data``.

          Raises:
              requests.exceptions.RequestException: On transport failure or a non-2xx,
                  non-404 status; logged before re-raising.
          """
          query_params = {
              "normalize": "true" if normalize_embeddings else "false",
              "priority": max(0, int(priority)),
          }
          outbound_headers = build_downstream_request_headers(request_id=request_id, user_id=user_id)

          resp = None
          try:
              resp = requests.post(
                  self.clip_text_endpoint,
                  params=query_params,
                  json=request_data,
                  headers=outbound_headers,
                  timeout=60,
              )
              route_missing = resp.status_code == 404
              if route_missing:
                  logger.warning(
                      "POST %s returned 404; using clip-as-service gRPC fallback (restart 6008 after deploy to use HTTP)",
                      self.clip_text_endpoint,
                  )
                  return self._clip_text_via_grpc(request_data, normalize_embeddings)
              resp.raise_for_status()
              return resp.json()
          except requests.exceptions.RequestException as exc:
              preview = ""
              if resp is not None:
                  try:
                      preview = (resp.text or "")[:300]
                  except Exception:
                      # Reading the body must never mask the original request error.
                      pass
              logger.error(
                  "CLIPImageEncoder clip_text request failed | status=%s body=%s error=%s",
                  getattr(resp, "status_code", "n/a"),
                  preview,
                  exc,
                  exc_info=True,
                  extra=build_request_log_extra(request_id=request_id, user_id=user_id),
              )
              raise
  
      def encode_clip_text(
          self,
          text: str,
          normalize_embeddings: bool = True,
          priority: int = 1,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> np.ndarray:
          """
          Encode text with the CN-CLIP text tower (same vector space as ``/embed/image``).

          Backed by the service's ``POST /embed/clip_text``. Results are cached in Redis
          under the ``clip_text`` namespace.

          Args:
              text: Text to encode. Leading/trailing whitespace is stripped once and the
                  stripped text is used for both the cache key and the service request,
                  so whitespace variants share a single cache entry.
              normalize_embeddings: Request normalized output; part of the cache key.
              priority: Service scheduling priority.
              request_id: Propagated to downstream headers and logs.
              user_id: Propagated to downstream headers and logs.

          Returns:
              A 1-D float32 embedding vector.

          Raises:
              RuntimeError: If no embedding is returned, or it is empty, not 1-D, or
                  contains non-finite values.
          """
          normalized_text = text.strip()
          # Key on exactly what is embedded; keying on the raw text fragmented the cache.
          cache_key = build_clip_text_cache_key(
              normalized_text, normalize=normalize_embeddings, model_name=self._mm_model_name
          )
          cached = self._clip_text_cache.get(cache_key)
          if cached is not None:
              return cached

          response_data = self._call_clip_text_service(
              [normalized_text],
              normalize_embeddings=normalize_embeddings,
              priority=priority,
              request_id=request_id,
              user_id=user_id,
          )
          if not response_data or len(response_data) != 1 or response_data[0] is None:
              raise RuntimeError(f"No CLIP text embedding returned for: {text[:80]!r}")
          vec = np.array(response_data[0], dtype=np.float32)
          if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
              raise RuntimeError("Invalid CLIP text embedding returned")
          self._clip_text_cache.set(cache_key, vec)
          return vec
  
ed948666   tangwang   tidy
198
      def encode_image(self, image: Image.Image) -> np.ndarray:
          """
          Unsupported: the embedding service works only with image URLs / paths.

          Retained for API compatibility. Use ``encode_image_from_url`` or
          ``encode_batch`` with URLs instead.

          Raises:
              NotImplementedError: Always.
          """
          reason = "encode_image with PIL Image is not supported by embedding service"
          raise NotImplementedError(reason)
be52af70   tangwang   first commit
205
  
b754fd41   tangwang   图片向量化支持优先级参数
206
207
208
209
210
      def encode_image_from_url(
          self,
          url: str,
          normalize_embeddings: bool = True,
          priority: int = 0,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> np.ndarray:
          """
          Generate one image embedding via the network service, using the Redis cache.

          Args:
              url: Image URL or local path. Surrounding whitespace is stripped.
              normalize_embeddings: Request normalized output; part of the cache key.
              priority: Service scheduling priority.
              request_id: Propagated to downstream headers and logs.
              user_id: Propagated to downstream headers and logs.

          Returns:
              A 1-D float32 embedding vector.

          Raises:
              ValueError: If ``url`` is not a non-blank string.
              RuntimeError: If no embedding is returned, or it is empty, not 1-D, or
                  contains non-finite values.
          """
          if not isinstance(url, str) or not url.strip():
              raise ValueError(f"Invalid image URL/path: {url!r}")
          # Normalize the same way ``encode_batch`` does so both entry points share cache
          # keys and send identical requests for the same image.
          url = url.strip()

          cache_key = build_image_cache_key(
              url, normalize=normalize_embeddings, model_name=self._mm_model_name
          )
          cached = self.cache.get(cache_key)
          if cached is not None:
              return cached

          response_data = self._call_service(
              [url],
              normalize_embeddings=normalize_embeddings,
              priority=priority,
              request_id=request_id,
              user_id=user_id,
          )
          if not response_data or len(response_data) != 1 or response_data[0] is None:
              raise RuntimeError(f"No image embedding returned for URL: {url}")
          vec = np.array(response_data[0], dtype=np.float32)
          if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
              raise RuntimeError(f"Invalid image embedding returned for URL: {url}")
          self.cache.set(cache_key, vec)
          return vec
be52af70   tangwang   first commit
244
245
246
247
  
      def encode_batch(
          self,
          images: List[Union[str, Image.Image]],
          batch_size: int = 8,
          normalize_embeddings: bool = True,
          priority: int = 0,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> List[np.ndarray]:
          """
          Encode a batch of image URLs / local paths via the network service.

          Cache hits are served from Redis. Only misses are sent to the service, in
          chunks of at most ``batch_size``, and fresh embeddings are written back.

          Args:
              images: Image URLs or local file paths (whitespace is stripped). PIL
                  images are rejected.
              batch_size: Maximum number of URLs per service request; must be >= 1.
              normalize_embeddings: Request normalized output; part of the cache key.
              priority: Service scheduling priority.
              request_id: Propagated to downstream headers and logs.
              user_id: Propagated to downstream headers and logs.

          Returns:
              One 1-D float32 embedding per input, in input order.

          Raises:
              NotImplementedError: If any element is a PIL image.
              ValueError: If an element is not a non-blank string, or ``batch_size < 1``.
              RuntimeError: If a service response is mis-sized or holds a missing or
                  invalid embedding.
          """
          for i, img in enumerate(images):
              if isinstance(img, Image.Image):
                  raise NotImplementedError(f"PIL Image at index {i} is not supported by service")
              if not isinstance(img, str) or not img.strip():
                  raise ValueError(f"Invalid image URL/path at index {i}: {img!r}")
          # range() with step 0 raises an opaque error, and a negative step silently
          # skips every request, returning empty placeholder vectors.
          if batch_size < 1:
              raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

          results: List[np.ndarray] = []
          pending_urls: List[str] = []
          pending_keys: List[Any] = []
          pending_positions: List[int] = []

          normalized_urls = [str(u).strip() for u in images]
          for pos, url in enumerate(normalized_urls):
              cache_key = build_image_cache_key(
                  url, normalize=normalize_embeddings, model_name=self._mm_model_name
              )
              cached = self.cache.get(cache_key)
              if cached is not None:
                  results.append(cached)
                  continue
              results.append(np.array([], dtype=np.float32))  # placeholder, filled below
              pending_positions.append(pos)
              pending_urls.append(url)
              # Remember the key so it is not rebuilt when writing back to the cache.
              pending_keys.append(cache_key)

          for start in range(0, len(pending_urls), batch_size):
              batch_urls = pending_urls[start : start + batch_size]
              response_data = self._call_service(
                  batch_urls,
                  normalize_embeddings=normalize_embeddings,
                  priority=priority,
                  request_id=request_id,
                  user_id=user_id,
              )
              if not response_data or len(response_data) != len(batch_urls):
                  raise RuntimeError(
                      f"Image embedding response length mismatch: expected {len(batch_urls)}, "
                      f"got {0 if response_data is None else len(response_data)}"
                  )
              for offset, url in enumerate(batch_urls):
                  embedding = response_data[offset]
                  if embedding is None:
                      raise RuntimeError(f"No image embedding returned for URL: {url}")
                  vec = np.array(embedding, dtype=np.float32)
                  if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
                      raise RuntimeError(f"Invalid image embedding returned for URL: {url}")
                  self.cache.set(pending_keys[start + offset], vec)
                  results[pending_positions[start + offset]] = vec

          return results
e7a2c0b7   tangwang   img encode
318
319
320
321
322
  
      def encode_image_urls(
          self,
          urls: List[str],
          batch_size: Optional[int] = None,
          normalize_embeddings: bool = True,
          priority: int = 0,
          request_id: Optional[str] = None,
          user_id: Optional[str] = None,
      ) -> List[np.ndarray]:
          """
          Encode image URLs with the same interface as ClipImageModel / ClipAsServiceImageEncoder.

          Called by the indexer's ``document_transformer``. Delegates to ``encode_batch``.

          Args:
              urls: Image URLs.
              batch_size: Batch size; any falsy value (None or 0) means 8.
              normalize_embeddings: Request normalized output.
              priority: Service scheduling priority.
              request_id: Propagated to downstream headers and logs.
              user_id: Propagated to downstream headers and logs.

          Returns:
              A list of embeddings with the same length as ``urls``.
          """
          effective_batch_size = batch_size if batch_size else 8
          return self.encode_batch(
              urls,
              batch_size=effective_batch_size,
              normalize_embeddings=normalize_embeddings,
              priority=priority,
              request_id=request_id,
              user_id=user_id,
          )