950a640e
tangwang
embeddings
|
1
|
"""Image embedding client for the local embedding HTTP service."""
|
be52af70
tangwang
first commit
|
2
|
|
950a640e
tangwang
embeddings
|
3
4
5
|
import logging
from typing import Any, List, Optional, Union
|
be52af70
tangwang
first commit
|
6
|
import numpy as np
|
950a640e
tangwang
embeddings
|
7
|
import requests
|
be52af70
tangwang
first commit
|
8
|
from PIL import Image
|
be52af70
tangwang
first commit
|
9
|
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
10
|
# Module-level logger; error records emitted by the encoder attach request/user
# context through ``build_request_log_extra``.
logger = logging.getLogger(__name__)
|
be52af70
tangwang
first commit
|
11
|
|
86d8358b
tangwang
config optimize
|
12
|
from config.loader import get_app_config
|
7a013ca7
tangwang
多模态文本向量服务ok
|
13
14
|
from config.services_config import get_embedding_image_backend_config, get_embedding_image_base_url
from embeddings.cache_keys import build_clip_text_cache_key, build_image_cache_key
|
5a01af3c
tangwang
多模态hashkey调整:1. 加...
|
15
|
from embeddings.config import CONFIG
|
4a37d233
tangwang
1. embedding cach...
|
16
|
from embeddings.redis_embedding_cache import RedisEmbeddingCache
|
4650fcec
tangwang
日志优化、日志串联(uid rqid)
|
17
|
from request_log_context import build_downstream_request_headers, build_request_log_extra
|
42e3aea6
tangwang
tidy
|
18
|
|
be52af70
tangwang
first commit
|
19
20
21
|
class CLIPImageEncoder:
    """
    Image encoder that generates embeddings through the network embedding service.

    Image URLs / local paths are encoded via ``POST /embed/image``; texts can be
    encoded with the CN-CLIP text tower (same vector space as the images) via
    ``POST /embed/clip_text``. Results are cached in Redis under the ``image``
    and ``clip_text`` namespaces of the configured embedding cache prefix.

    Instances keep no per-request state, so it is fine to create one per caller.
    """

    # Default per-request HTTP timeout in seconds (overridden in ``__init__``).
    timeout: float = 60.0

    def __init__(self, service_url: Optional[str] = None, timeout: float = 60.0):
        """
        Args:
            service_url: Base URL of the embedding service. Defaults to the
                configured image-embedding base URL; a trailing "/" is removed.
            timeout: Timeout in seconds applied to every HTTP request.
        """
        resolved_url = service_url or get_embedding_image_base_url()
        redis_config = get_app_config().infrastructure.redis

        self.service_url = str(resolved_url).rstrip("/")
        self.endpoint = f"{self.service_url}/embed/image"
        self.clip_text_endpoint = f"{self.service_url}/embed/clip_text"
        self.timeout = timeout
        # Reuse the embedding cache prefix, but with separate namespaces for images
        # and CLIP texts to avoid collisions. A missing (None) prefix must fall back
        # to the default instead of becoming the literal string "None".
        self.cache_prefix = str(redis_config.embedding_cache_prefix or "").strip() or "embedding"
        self._mm_model_name = CONFIG.MULTIMODAL_MODEL_NAME
        logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url)
        self.cache = RedisEmbeddingCache(
            key_prefix=self.cache_prefix,
            namespace="image",
        )
        self._clip_text_cache = RedisEmbeddingCache(
            key_prefix=self.cache_prefix,
            namespace="clip_text",
        )

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _request_params(normalize_embeddings: bool, priority: int) -> dict:
        """Build the query parameters shared by both endpoints (priority clamped to >= 0)."""
        return {
            "normalize": "true" if normalize_embeddings else "false",
            "priority": max(0, int(priority)),
        }

    def _post(
        self,
        endpoint: str,
        request_data: List[str],
        normalize_embeddings: bool,
        priority: int,
        request_id: Optional[str],
        user_id: Optional[str],
    ) -> "requests.Response":
        """POST ``request_data`` as JSON to ``endpoint`` with downstream tracing headers."""
        return requests.post(
            endpoint,
            params=self._request_params(normalize_embeddings, priority),
            json=request_data,
            headers=build_downstream_request_headers(request_id=request_id, user_id=user_id),
            timeout=self.timeout,
        )

    @staticmethod
    def _log_request_failure(
        message: str,
        response: Any,
        error: Exception,
        request_id: Optional[str],
        user_id: Optional[str],
    ) -> None:
        """
        Log a failed service request with its status and a 300-char body preview.

        Must be called from inside an ``except`` block so ``exc_info=True`` picks
        up the active exception.
        """
        body_preview = ""
        if response is not None:
            try:
                body_preview = (response.text or "")[:300]
            except Exception:
                # Reading the body is best-effort; never mask the original error.
                body_preview = ""
        logger.error(
            message,
            getattr(response, "status_code", "n/a"),
            body_preview,
            error,
            exc_info=True,
            extra=build_request_log_extra(request_id=request_id, user_id=user_id),
        )

    @staticmethod
    def _to_vector(raw: Any, invalid_message: str) -> np.ndarray:
        """Convert one raw embedding to a 1-D float32 array; reject empty or non-finite ones."""
        vec = np.array(raw, dtype=np.float32)
        if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
            raise RuntimeError(invalid_message)
        return vec

    @classmethod
    def _single_vector(cls, response_data: Any, missing_message: str, invalid_message: str) -> np.ndarray:
        """Extract and validate the only embedding of a one-item response."""
        # A non-list body (e.g. an error object) is treated like a missing embedding.
        if not isinstance(response_data, list) or len(response_data) != 1 or response_data[0] is None:
            raise RuntimeError(missing_message)
        return cls._to_vector(response_data[0], invalid_message)

    # ------------------------------------------------------------------
    # Service calls
    # ------------------------------------------------------------------

    def _call_service(
        self,
        request_data: List[str],
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> List[Any]:
        """
        Call the image embedding endpoint (``POST /embed/image``).

        Args:
            request_data: List of image URLs / local file paths
            normalize_embeddings: Sent as the ``normalize`` query parameter.
            priority: Scheduling priority; negative values are sent as 0.
            request_id / user_id: Propagated as downstream tracing headers.

        Returns:
            The decoded JSON body; expected to be a list of embeddings
            (list[float]) or nulls (None), aligned to input order.

        Raises:
            requests.exceptions.RequestException: On transport/HTTP/JSON errors
            (logged before re-raising).
        """
        response = None
        try:
            response = self._post(
                self.endpoint, request_data, normalize_embeddings, priority, request_id, user_id
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self._log_request_failure(
                "CLIPImageEncoder service request failed | status=%s body=%s error=%s",
                response,
                e,
                request_id,
                user_id,
            )
            raise

    def _clip_text_via_grpc(
        self,
        request_data: List[str],
        normalize_embeddings: bool,
    ) -> List[Any]:
        """
        Fallback for legacy image-embedding services (port 6008) that lack
        ``/embed/clip_text``: encode the texts through clip-as-service gRPC.

        Requires ``image_backend: clip_as_service``; otherwise raises RuntimeError
        asking for a service restart or a backend change.
        """
        backend, cfg = get_embedding_image_backend_config()
        if backend != "clip_as_service":
            raise RuntimeError(
                "POST /embed/clip_text 返回 404:请重启图片向量服务(6008)以加载新路由;"
                "或配置 services.embedding.image_backend=clip_as_service 并启动 grpc cnclip。"
            )
        # Imported lazily so the gRPC client is only needed when this fallback runs.
        from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder

        enc = ClipAsServiceImageEncoder(
            server=str(cfg.get("server") or CONFIG.CLIP_AS_SERVICE_SERVER),
            batch_size=int(cfg.get("batch_size") or CONFIG.IMAGE_BATCH_SIZE),
        )
        arrs = enc.encode_clip_texts(
            request_data,
            # Encode everything in one batch; never pass 0 for an empty request.
            batch_size=max(1, len(request_data)),
            normalize_embeddings=normalize_embeddings,
        )
        return [v.tolist() for v in arrs]

    def _call_clip_text_service(
        self,
        request_data: List[str],
        normalize_embeddings: bool = True,
        priority: int = 1,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> List[Any]:
        """
        Call the CN-CLIP text endpoint (``POST /embed/clip_text``).

        A 404 means the running service predates this route; in that case the
        request is served through the clip-as-service gRPC fallback instead.

        Returns:
            The decoded JSON body (or the gRPC result), expected to be a list of
            embeddings aligned to ``request_data``.

        Raises:
            requests.exceptions.RequestException: On transport/HTTP/JSON errors
            (logged before re-raising).
            RuntimeError: From the gRPC fallback when it is not configured.
        """
        response = None
        try:
            response = self._post(
                self.clip_text_endpoint, request_data, normalize_embeddings, priority, request_id, user_id
            )
            if response.status_code == 404:
                logger.warning(
                    "POST %s returned 404; using clip-as-service gRPC fallback (restart 6008 after deploy to use HTTP)",
                    self.clip_text_endpoint,
                )
                return self._clip_text_via_grpc(request_data, normalize_embeddings)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self._log_request_failure(
                "CLIPImageEncoder clip_text request failed | status=%s body=%s error=%s",
                response,
                e,
                request_id,
                user_id,
            )
            raise

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def encode_clip_text(
        self,
        text: str,
        normalize_embeddings: bool = True,
        priority: int = 1,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> np.ndarray:
        """
        Encode ``text`` with the CN-CLIP text tower (same vector space as
        ``/embed/image``), served by ``POST /embed/clip_text``.

        Surrounding whitespace is stripped; the stripped text is used both for
        the request and for the Redis cache key, so equivalent inputs share a key.

        Raises:
            RuntimeError: If no embedding or an invalid (empty / non-1-D /
            non-finite) embedding is returned.
        """
        query = text.strip()
        cache_key = build_clip_text_cache_key(
            query, normalize=normalize_embeddings, model_name=self._mm_model_name
        )
        cached = self._clip_text_cache.get(cache_key)
        if cached is not None:
            return cached
        response_data = self._call_clip_text_service(
            [query],
            normalize_embeddings=normalize_embeddings,
            priority=priority,
            request_id=request_id,
            user_id=user_id,
        )
        vec = self._single_vector(
            response_data,
            f"No CLIP text embedding returned for: {text[:80]!r}",
            "Invalid CLIP text embedding returned",
        )
        self._clip_text_cache.set(cache_key, vec)
        return vec

    def encode_image(self, image: Image.Image) -> np.ndarray:
        """
        Encode a PIL image to an embedding vector.

        Note: kept for interface compatibility only; the service accepts URLs /
        paths, so this always raises NotImplementedError.
        """
        raise NotImplementedError("encode_image with PIL Image is not supported by embedding service")

    def encode_image_from_url(
        self,
        url: str,
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> np.ndarray:
        """
        Generate an image embedding via the network service using a URL.

        Args:
            url: Image URL / local path; surrounding whitespace is stripped, as
                in ``encode_batch``, so both paths share cache keys.
            normalize_embeddings: Request L2-normalized output (part of the cache key).
            priority: Scheduling priority forwarded to the service.
            request_id / user_id: Propagated as downstream tracing headers.

        Returns:
            Embedding vector (1-D float32 array), served from Redis when cached.

        Raises:
            RuntimeError: If no embedding or an invalid embedding is returned.
        """
        target = url.strip()
        cache_key = build_image_cache_key(
            target, normalize=normalize_embeddings, model_name=self._mm_model_name
        )
        cached = self.cache.get(cache_key)
        if cached is not None:
            return cached
        response_data = self._call_service(
            [target],
            normalize_embeddings=normalize_embeddings,
            priority=priority,
            request_id=request_id,
            user_id=user_id,
        )
        vec = self._single_vector(
            response_data,
            f"No image embedding returned for URL: {url}",
            f"Invalid image embedding returned for URL: {url}",
        )
        self.cache.set(cache_key, vec)
        return vec

    def encode_batch(
        self,
        images: List[Union[str, Image.Image]],
        batch_size: int = 8,
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> List[np.ndarray]:
        """
        Encode a batch of image URLs / paths efficiently via the network service.

        Cached embeddings are served from Redis; only cache misses are sent to
        the service, in chunks of ``batch_size``.

        Args:
            images: List of image URLs / local paths. PIL Images are rejected
                with NotImplementedError; empty or non-string entries with ValueError.
            batch_size: Maximum number of uncached URLs per service request;
                values below 1 are treated as 1.

        Returns:
            List of embeddings aligned to ``images``.

        Raises:
            RuntimeError: On a response-length mismatch or a missing/invalid embedding.
        """
        for i, img in enumerate(images):
            if isinstance(img, Image.Image):
                raise NotImplementedError(f"PIL Image at index {i} is not supported by service")
            if not isinstance(img, str) or not img.strip():
                raise ValueError(f"Invalid image URL/path at index {i}: {img!r}")

        # A zero step would crash range(); a negative one would skip every request
        # and return the empty placeholders as if they were embeddings.
        step = max(1, int(batch_size))

        results: List[np.ndarray] = []
        pending_positions: List[int] = []
        pending_urls: List[str] = []
        pending_keys: List[str] = []
        for pos, img in enumerate(images):
            url = img.strip()
            cache_key = build_image_cache_key(
                url, normalize=normalize_embeddings, model_name=self._mm_model_name
            )
            cached = self.cache.get(cache_key)
            if cached is not None:
                results.append(cached)
                continue
            results.append(np.array([], dtype=np.float32))  # placeholder, filled below
            pending_positions.append(pos)
            pending_urls.append(url)
            pending_keys.append(cache_key)

        for start in range(0, len(pending_urls), step):
            batch_urls = pending_urls[start : start + step]
            response_data = self._call_service(
                batch_urls,
                normalize_embeddings=normalize_embeddings,
                priority=priority,
                request_id=request_id,
                user_id=user_id,
            )
            if not isinstance(response_data, list) or len(response_data) != len(batch_urls):
                got = len(response_data) if isinstance(response_data, list) else 0
                raise RuntimeError(
                    f"Image embedding response length mismatch: expected {len(batch_urls)}, "
                    f"got {got}"
                )
            for offset, (url, embedding) in enumerate(zip(batch_urls, response_data)):
                if embedding is None:
                    raise RuntimeError(f"No image embedding returned for URL: {url}")
                vec = self._to_vector(embedding, f"Invalid image embedding returned for URL: {url}")
                index = start + offset
                self.cache.set(pending_keys[index], vec)
                results[pending_positions[index]] = vec

        return results

    def encode_image_urls(
        self,
        urls: List[str],
        batch_size: Optional[int] = None,
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> List[np.ndarray]:
        """
        Same interface as ClipImageModel / ClipAsServiceImageEncoder; called by
        the indexer's document_transformer.

        Args:
            urls: List of image URLs
            batch_size: Batch size (default 8 when None or 0)
        Returns:
            List of vectors with the same length as ``urls``
        """
        return self.encode_batch(
            urls,
            batch_size=batch_size or 8,
            normalize_embeddings=normalize_embeddings,
            priority=priority,
            request_id=request_id,
            user_id=user_id,
        )
|