950a640e
tangwang
embeddings
|
1
|
"""Image embedding client for the local embedding HTTP service."""
|
be52af70
tangwang
first commit
|
2
|
|
950a640e
tangwang
embeddings
|
3
4
5
|
import logging
from typing import Any, List, Optional, Union
|
be52af70
tangwang
first commit
|
6
|
import numpy as np
|
950a640e
tangwang
embeddings
|
7
|
import requests
|
be52af70
tangwang
first commit
|
8
|
from PIL import Image
|
be52af70
tangwang
first commit
|
9
|
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
10
|
logger = logging.getLogger(__name__)
|
be52af70
tangwang
first commit
|
11
|
|
86d8358b
tangwang
config optimize
|
12
|
from config.loader import get_app_config
|
7a013ca7
tangwang
多模态文本向量服务ok
|
13
14
|
from config.services_config import get_embedding_image_backend_config, get_embedding_image_base_url
from embeddings.cache_keys import build_clip_text_cache_key, build_image_cache_key
|
4a37d233
tangwang
1. embedding cach...
|
15
|
from embeddings.redis_embedding_cache import RedisEmbeddingCache
|
4650fcec
tangwang
日志优化、日志串联(uid rqid)
|
16
|
from request_log_context import build_downstream_request_headers, build_request_log_extra
|
42e3aea6
tangwang
tidy
|
17
|
|
be52af70
tangwang
first commit
|
18
19
20
|
class CLIPImageEncoder:
"""
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
21
|
Image Encoder for generating image embeddings using network service.
|
be52af70
tangwang
first commit
|
22
|
|
950a640e
tangwang
embeddings
|
23
|
This client is stateless and safe to instantiate per caller.
|
be52af70
tangwang
first commit
|
24
25
|
"""
|
950a640e
tangwang
embeddings
|
26
|
def __init__(self, service_url: Optional[str] = None):
    """
    Set up endpoints and Redis-backed caches for the embedding service.

    Args:
        service_url: Base URL of the embedding service. When falsy, the
            value returned by ``get_embedding_image_base_url()`` is used.
    """
    base_url = service_url if service_url else get_embedding_image_base_url()
    redis_settings = get_app_config().infrastructure.redis

    # Drop trailing slashes so the endpoint paths below join cleanly.
    self.service_url = str(base_url).rstrip("/")
    self.endpoint = f"{self.service_url}/embed/image"
    self.clip_text_endpoint = f"{self.service_url}/embed/clip_text"

    # Share the configured embedding cache prefix; the two caches below are
    # kept apart by namespace so image and CLIP-text keys cannot collide.
    # NOTE(review): str() on a None prefix would yield the literal "None" -
    # confirm the config always provides a string here.
    configured_prefix = str(redis_settings.embedding_cache_prefix).strip()
    self.cache_prefix = configured_prefix if configured_prefix else "embedding"

    logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url)

    self.cache = RedisEmbeddingCache(key_prefix=self.cache_prefix, namespace="image")
    self._clip_text_cache = RedisEmbeddingCache(
        key_prefix=self.cache_prefix,
        namespace="clip_text",
    )
|
be52af70
tangwang
first commit
|
43
|
|
b754fd41
tangwang
图片向量化支持优先级参数
|
44
45
46
47
48
|
def _call_service(
    self,
    request_data: List[str],
    normalize_embeddings: bool = True,
    priority: int = 0,
    request_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> List[Any]:
    """
    POST a batch of image URLs / paths to the image embedding endpoint.

    Args:
        request_data: List of image URLs / local file paths, sent as the JSON body.
        normalize_embeddings: Sent as the ``normalize`` query parameter
            (``"true"`` / ``"false"``).
        priority: Sent as the ``priority`` query parameter; negative values
            are clamped to 0.
        request_id: Forwarded to the downstream headers and the log context.
        user_id: Forwarded to the downstream headers and the log context.

    Returns:
        The decoded JSON body: a list of embeddings (list[float]) or nulls
        (None), aligned to input order.

    Raises:
        requests.exceptions.RequestException: Logged with status and a body
            preview, then re-raised unchanged.
    """
    response = None
    try:
        query = {
            "normalize": "true" if normalize_embeddings else "false",
            "priority": max(0, int(priority)),
        }
        outbound_headers = build_downstream_request_headers(
            request_id=request_id,
            user_id=user_id,
        )
        response = requests.post(
            self.endpoint,
            params=query,
            json=request_data,
            headers=outbound_headers,
            timeout=60,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as exc:
        # Capture at most 300 characters of the body for the log; reading the
        # body is best-effort and must never mask the original failure.
        snippet = ""
        if response is not None:
            try:
                raw_body = response.text
                snippet = raw_body[:300] if raw_body else ""
            except Exception:
                snippet = ""
        # response stays None when the POST itself failed, hence the default.
        status = getattr(response, "status_code", "n/a")
        logger.error(
            "CLIPImageEncoder service request failed | status=%s body=%s error=%s",
            status,
            snippet,
            exc,
            exc_info=True,
            extra=build_request_log_extra(request_id=request_id, user_id=user_id),
        )
        raise
|
be52af70
tangwang
first commit
|
91
|
|
7a013ca7
tangwang
多模态文本向量服务ok
|
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
def _clip_text_via_grpc(
    self,
    request_data: List[str],
    normalize_embeddings: bool,
) -> List[Any]:
    """
    Encode CLIP texts over gRPC when the legacy service (6008) has no
    ``/embed/clip_text`` route (requires ``image_backend: clip_as_service``).

    Args:
        request_data: Texts to encode; sent as a single batch.
        normalize_embeddings: Passed through to the gRPC encoder.

    Returns:
        One plain list per input text (each vector converted via ``tolist()``).

    Raises:
        RuntimeError: If the configured image backend is not ``clip_as_service``.
    """
    backend, cfg = get_embedding_image_backend_config()
    if backend != "clip_as_service":
        raise RuntimeError(
            "POST /embed/clip_text 返回 404:请重启图片向量服务(6008)以加载新路由;"
            "或配置 services.embedding.image_backend=clip_as_service 并启动 grpc cnclip。"
        )

    # Imported here rather than at module level - presumably so the gRPC
    # client is only required when this fallback path is actually taken.
    from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder
    from embeddings.config import CONFIG

    # Values from the backend config win; CONFIG supplies the defaults.
    grpc_server = str(cfg.get("server") or CONFIG.CLIP_AS_SERVICE_SERVER)
    grpc_batch_size = int(cfg.get("batch_size") or CONFIG.IMAGE_BATCH_SIZE)
    encoder = ClipAsServiceImageEncoder(server=grpc_server, batch_size=grpc_batch_size)

    vectors = encoder.encode_clip_texts(
        request_data,
        batch_size=len(request_data),
        normalize_embeddings=normalize_embeddings,
    )
    as_lists: List[Any] = []
    for vector in vectors:
        as_lists.append(vector.tolist())
    return as_lists
def _call_clip_text_service(
    self,
    request_data: List[str],
    normalize_embeddings: bool = True,
    priority: int = 1,
    request_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> List[Any]:
    """
    POST texts to the CLIP text endpoint, falling back to gRPC on HTTP 404.

    Args:
        request_data: Texts to embed, sent as the JSON body.
        normalize_embeddings: Sent as the ``normalize`` query parameter
            (``"true"`` / ``"false"``).
        priority: Sent as the ``priority`` query parameter; negative values
            are clamped to 0.
        request_id: Forwarded to the downstream headers and the log context.
        user_id: Forwarded to the downstream headers and the log context.

    Returns:
        The decoded JSON body, or the result of ``_clip_text_via_grpc`` when
        the endpoint answers 404.

    Raises:
        requests.exceptions.RequestException: Logged with status and a body
            preview, then re-raised unchanged.
    """
    response = None
    try:
        query = {
            "normalize": "true" if normalize_embeddings else "false",
            "priority": max(0, int(priority)),
        }
        outbound_headers = build_downstream_request_headers(
            request_id=request_id,
            user_id=user_id,
        )
        response = requests.post(
            self.clip_text_endpoint,
            params=query,
            json=request_data,
            headers=outbound_headers,
            timeout=60,
        )
        # A 404 means the route is missing on the running service, so it is
        # handled before raise_for_status() would turn it into an error.
        route_missing = response.status_code == 404
        if route_missing:
            logger.warning(
                "POST %s returned 404; using clip-as-service gRPC fallback (restart 6008 after deploy to use HTTP)",
                self.clip_text_endpoint,
            )
            return self._clip_text_via_grpc(request_data, normalize_embeddings)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as exc:
        # Capture at most 300 characters of the body for the log; reading the
        # body is best-effort and must never mask the original failure.
        snippet = ""
        if response is not None:
            try:
                raw_body = response.text
                snippet = raw_body[:300] if raw_body else ""
            except Exception:
                snippet = ""
        # response stays None when the POST itself failed, hence the default.
        status = getattr(response, "status_code", "n/a")
        logger.error(
            "CLIPImageEncoder clip_text request failed | status=%s body=%s error=%s",
            status,
            snippet,
            exc,
            exc_info=True,
            extra=build_request_log_extra(request_id=request_id, user_id=user_id),
        )
        raise
def encode_clip_text(
    self,
    text: str,
    normalize_embeddings: bool = True,
    priority: int = 1,
    request_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> np.ndarray:
    """
    Encode text with the CN-CLIP text tower (same vector space as
    ``/embed/image``) via the service's ``POST /embed/clip_text`` endpoint.

    Args:
        text: Text to embed; surrounding whitespace is stripped before use.
        normalize_embeddings: Forwarded to the service and part of the cache key.
        priority: Forwarded to the service request.
        request_id: Forwarded for downstream headers / log context.
        user_id: Forwarded for downstream headers / log context.

    Returns:
        A 1-D float32 vector, taken from the cache when present.

    Raises:
        RuntimeError: If the service returns no embedding, or one that is
            not a non-empty, finite 1-D vector.
    """
    # Normalize once and use the same value for the cache key and the
    # request, so inputs differing only in surrounding whitespace share one
    # cache entry for what is an identical service call.
    query_text = text.strip()
    cache_key = build_clip_text_cache_key(query_text, normalize=normalize_embeddings)
    cached = self._clip_text_cache.get(cache_key)
    if cached is not None:
        return cached

    response_data = self._call_clip_text_service(
        [query_text],
        normalize_embeddings=normalize_embeddings,
        priority=priority,
        request_id=request_id,
        user_id=user_id,
    )
    if not response_data or len(response_data) != 1 or response_data[0] is None:
        raise RuntimeError(f"No CLIP text embedding returned for: {text[:80]!r}")

    vec = np.array(response_data[0], dtype=np.float32)
    # Only cache vectors that are 1-D, non-empty and free of NaN/inf.
    if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
        raise RuntimeError("Invalid CLIP text embedding returned")
    self._clip_text_cache.set(cache_key, vec)
    return vec
|
ed948666
tangwang
tidy
|
194
|
def encode_image(self, image: Image.Image) -> np.ndarray:
    """
    Unsupported entry point for in-memory PIL images.

    Kept for interface compatibility only: the embedding service works with
    URLs, so this method always raises.

    Raises:
        NotImplementedError: Always.
    """
    reason = "encode_image with PIL Image is not supported by embedding service"
    raise NotImplementedError(reason)
|
be52af70
tangwang
first commit
|
201
|
|
b754fd41
tangwang
图片向量化支持优先级参数
|
202
203
204
205
206
|
def encode_image_from_url(
    self,
    url: str,
    normalize_embeddings: bool = True,
    priority: int = 0,
    request_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> np.ndarray:
    """
    Return the embedding for a single image URL, using the cache when possible.

    Args:
        url: Image URL to process.
        normalize_embeddings: Forwarded to the service and part of the cache key.
        priority: Forwarded to the service request.
        request_id: Forwarded for downstream headers / log context.
        user_id: Forwarded for downstream headers / log context.

    Returns:
        Embedding vector as a 1-D float32 array.

    Raises:
        RuntimeError: If the service returns no embedding, or one that is
            not a non-empty, finite 1-D vector.
    """
    key = build_image_cache_key(url, normalize=normalize_embeddings)
    hit = self.cache.get(key)
    if hit is not None:
        return hit

    payload = self._call_service(
        [url],
        normalize_embeddings=normalize_embeddings,
        priority=priority,
        request_id=request_id,
        user_id=user_id,
    )
    # Exactly one non-null entry is expected for the single URL sent.
    has_single_embedding = bool(payload) and len(payload) == 1 and payload[0] is not None
    if not has_single_embedding:
        raise RuntimeError(f"No image embedding returned for URL: {url}")

    vector = np.array(payload[0], dtype=np.float32)
    # Only cache vectors that are 1-D, non-empty and free of NaN/inf.
    is_usable = vector.ndim == 1 and vector.size > 0 and np.isfinite(vector).all()
    if not is_usable:
        raise RuntimeError(f"Invalid image embedding returned for URL: {url}")

    self.cache.set(key, vector)
    return vector
|
be52af70
tangwang
first commit
|
238
239
240
241
|
def encode_batch(
    self,
    images: List[Union[str, Image.Image]],
    batch_size: int = 8,
    normalize_embeddings: bool = True,
    priority: int = 0,
    request_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> List[np.ndarray]:
    """
    Encode a batch of images via the network service, using the cache first.

    Args:
        images: List of image URLs / paths. PIL Images are rejected.
        batch_size: Maximum number of uncached URLs sent per service
            request; must be >= 1.
        normalize_embeddings: Forwarded to the service and part of the cache key.
        priority: Forwarded to the service request.
        request_id: Forwarded for downstream headers / log context.
        user_id: Forwarded for downstream headers / log context.

    Returns:
        List of embeddings (1-D float32 arrays), aligned to ``images``.

    Raises:
        ValueError: If ``batch_size`` is below 1, or an entry is not a
            non-blank string.
        NotImplementedError: If an entry is a PIL Image.
        RuntimeError: If the service response length does not match the
            request, or an embedding is missing or invalid.
    """
    # Reject a non-positive batch size before doing any work: a zero step
    # makes range() raise an opaque error, and a negative step makes the
    # request loop run zero times, silently returning empty placeholders.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

    # Validate every entry up front so no request is sent for a bad batch.
    normalized_urls: List[str] = []
    for i, img in enumerate(images):
        if isinstance(img, Image.Image):
            raise NotImplementedError(f"PIL Image at index {i} is not supported by service")
        if not isinstance(img, str) or not img.strip():
            raise ValueError(f"Invalid image URL/path at index {i}: {img!r}")
        normalized_urls.append(img.strip())

    # Fill results from the cache; remember where the misses belong.
    results: List[np.ndarray] = []
    pending_urls: List[str] = []
    pending_positions: List[int] = []
    for pos, url in enumerate(normalized_urls):
        cached = self.cache.get(build_image_cache_key(url, normalize=normalize_embeddings))
        if cached is not None:
            results.append(cached)
            continue
        results.append(np.array([], dtype=np.float32))  # placeholder, overwritten below
        pending_positions.append(pos)
        pending_urls.append(url)

    for start in range(0, len(pending_urls), batch_size):
        batch_urls = pending_urls[start : start + batch_size]
        batch_positions = pending_positions[start : start + batch_size]
        response_data = self._call_service(
            batch_urls,
            normalize_embeddings=normalize_embeddings,
            priority=priority,
            request_id=request_id,
            user_id=user_id,
        )
        if not response_data or len(response_data) != len(batch_urls):
            raise RuntimeError(
                f"Image embedding response length mismatch: expected {len(batch_urls)}, "
                f"got {0 if response_data is None else len(response_data)}"
            )
        for pos, url, embedding in zip(batch_positions, batch_urls, response_data):
            if embedding is None:
                raise RuntimeError(f"No image embedding returned for URL: {url}")
            vec = np.array(embedding, dtype=np.float32)
            # Only cache vectors that are 1-D, non-empty and free of NaN/inf.
            if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
                raise RuntimeError(f"Invalid image embedding returned for URL: {url}")
            self.cache.set(build_image_cache_key(url, normalize=normalize_embeddings), vec)
            results[pos] = vec

    return results
|
e7a2c0b7
tangwang
img encode
|
305
306
307
308
309
|
def encode_image_urls(
    self,
    urls: List[str],
    batch_size: Optional[int] = None,
    normalize_embeddings: bool = True,
    priority: int = 0,
    request_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> List[np.ndarray]:
    """
    Interface matching ClipImageModel / ClipAsServiceImageEncoder, intended
    for the indexer's document_transformer.

    Args:
        urls: List of image URLs.
        batch_size: Batch size; a falsy value (None or 0) means 8.
        normalize_embeddings: Forwarded to ``encode_batch``.
        priority: Forwarded to ``encode_batch``.
        request_id: Forwarded to ``encode_batch``.
        user_id: Forwarded to ``encode_batch``.

    Returns:
        List of vectors with the same length as ``urls``.
    """
    effective_batch_size = batch_size if batch_size else 8
    forwarded = {
        "batch_size": effective_batch_size,
        "normalize_embeddings": normalize_embeddings,
        "priority": priority,
        "request_id": request_id,
        "user_id": user_id,
    }
    return self.encode_batch(urls, **forwarded)
|