950a640e
tangwang
embeddings
|
1
|
"""Image embedding client for the local embedding HTTP service."""
|
be52af70
tangwang
first commit
|
2
|
|
950a640e
tangwang
embeddings
|
3
4
5
|
import logging
from typing import Any, List, Optional, Union
|
be52af70
tangwang
first commit
|
6
|
import numpy as np
|
950a640e
tangwang
embeddings
|
7
|
import requests
|
be52af70
tangwang
first commit
|
8
|
from PIL import Image
|
be52af70
tangwang
first commit
|
9
|
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
10
|
# Module-level logger named after this module (used by CLIPImageEncoder).
logger = logging.getLogger(__name__)
|
be52af70
tangwang
first commit
|
11
|
|
86d8358b
tangwang
config optimize
|
12
|
from config.loader import get_app_config
|
7214c2e7
tangwang
mplemented**
|
13
|
from config.services_config import get_embedding_image_base_url
|
7214c2e7
tangwang
mplemented**
|
14
|
from embeddings.cache_keys import build_image_cache_key
|
4a37d233
tangwang
1. embedding cach...
|
15
|
from embeddings.redis_embedding_cache import RedisEmbeddingCache
|
4650fcec
tangwang
日志优化、日志串联(uid rqid)
|
16
|
from request_log_context import build_downstream_request_headers, build_request_log_extra
|
42e3aea6
tangwang
tidy
|
17
|
|
be52af70
tangwang
first commit
|
18
19
20
|
class CLIPImageEncoder:
    """
    Image Encoder for generating image embeddings using network service.

    Embeddings are requested from ``POST {service_url}/embed/image`` and cached
    in Redis under the shared embedding key prefix, using a dedicated ``"image"``
    namespace. Only image URLs / local file paths are accepted; PIL images are
    rejected because the service only works with URLs.

    This client is stateless and safe to instantiate per caller.
    """

    # HTTP timeout (seconds) for a single embedding-service request.
    _REQUEST_TIMEOUT_SEC = 60
    # Batch size used by encode_image_urls when the caller passes None (or 0).
    _DEFAULT_BATCH_SIZE = 8

    def __init__(self, service_url: Optional[str] = None):
        """
        Args:
            service_url: Base URL of the embedding service. When omitted (or
                empty), the configured image embedding base URL is used. Any
                trailing slash is stripped.
        """
        resolved_url = service_url or get_embedding_image_base_url()
        redis_config = get_app_config().infrastructure.redis

        self.service_url = str(resolved_url).rstrip("/")
        self.endpoint = f"{self.service_url}/embed/image"
        # Reuse the shared embedding cache prefix, but a separate "image"
        # namespace, to avoid key collisions with other embedding types.
        # `or ""` keeps an unset (None) prefix from becoming the literal "None".
        self.cache_prefix = str(redis_config.embedding_cache_prefix or "").strip() or "embedding"
        logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url)
        self.cache = RedisEmbeddingCache(
            key_prefix=self.cache_prefix,
            namespace="image",
        )

    def _call_service(
        self,
        request_data: List[str],
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> List[Any]:
        """
        Call the embedding service API.

        Args:
            request_data: List of image URLs / local file paths, sent as the JSON body.
            normalize_embeddings: Sent as the ``normalize`` query parameter.
            priority: Sent as the ``priority`` query parameter; negative values
                are clamped to 0.
            request_id: Propagated into the downstream request headers and error logs.
            user_id: Propagated into the downstream request headers and error logs.

        Returns:
            The decoded JSON response. Expected to be a list of embeddings
            (list[float]) or nulls (None), aligned to input order; callers
            validate its shape.

        Raises:
            requests.exceptions.RequestException: on transport failures or
                non-2xx status codes. The failure is logged (with a preview of
                the response body, if any) before being re-raised.
        """
        response = None
        try:
            response = requests.post(
                self.endpoint,
                params={
                    "normalize": "true" if normalize_embeddings else "false",
                    "priority": max(0, int(priority)),
                },
                json=request_data,
                headers=build_downstream_request_headers(request_id=request_id, user_id=user_id),
                timeout=self._REQUEST_TIMEOUT_SEC,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            body_preview = ""
            if response is not None:
                try:
                    body_preview = (response.text or "")[:300]
                except Exception:
                    # Best effort only: failing to read the body must not mask
                    # the original request error.
                    body_preview = ""
            logger.error(
                "CLIPImageEncoder service request failed | status=%s body=%s error=%s",
                getattr(response, "status_code", "n/a"),
                body_preview,
                e,
                exc_info=True,
                extra=build_request_log_extra(request_id=request_id, user_id=user_id),
            )
            raise

    @staticmethod
    def _to_vector(embedding: Any, url: str) -> np.ndarray:
        """
        Convert one raw service embedding into a validated float32 vector.

        Args:
            embedding: One element of the service response.
            url: The URL the embedding belongs to (used in error messages).

        Returns:
            A non-empty 1-D float32 array containing only finite values.

        Raises:
            RuntimeError: if the embedding is missing (None), cannot be converted
                to float32, is not 1-D, is empty, or contains NaN/inf.
        """
        if embedding is None:
            raise RuntimeError(f"No image embedding returned for URL: {url}")
        try:
            vec = np.array(embedding, dtype=np.float32)
        except (TypeError, ValueError) as err:
            # Non-numeric or ragged payloads; surface them as RuntimeError like
            # every other invalid-embedding case.
            raise RuntimeError(f"Invalid image embedding returned for URL: {url}") from err
        if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
            raise RuntimeError(f"Invalid image embedding returned for URL: {url}")
        return vec

    def encode_image(self, image: Image.Image) -> np.ndarray:
        """
        Encode image to embedding vector using network service.

        Note: This method is kept for compatibility but the service only works with URLs.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("encode_image with PIL Image is not supported by embedding service")

    def encode_image_from_url(
        self,
        url: str,
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> np.ndarray:
        """
        Generate image embedding via network service using URL.

        The Redis cache is consulted first; on a miss the service is called and
        the resulting vector is cached.

        Args:
            url: Image URL / local file path to process. Surrounding whitespace
                is stripped, matching encode_batch, so both paths share cache keys.
            normalize_embeddings: Whether the service should normalize the vector;
                also part of the cache key.
            priority: Service scheduling priority (clamped to >= 0).
            request_id: Propagated to the service request and logs.
            user_id: Propagated to the service request and logs.

        Returns:
            Embedding vector (the cached value on a cache hit, otherwise a
            1-D float32 array).

        Raises:
            ValueError: if ``url`` is not a non-blank string.
            RuntimeError: if the service returns no embedding or an invalid one.
            requests.exceptions.RequestException: if the service request fails.
        """
        if not isinstance(url, str) or not url.strip():
            raise ValueError(f"Invalid image URL/path: {url!r}")
        url = url.strip()

        cache_key = build_image_cache_key(url, normalize=normalize_embeddings)
        cached = self.cache.get(cache_key)
        if cached is not None:
            return cached

        response_data = self._call_service(
            [url],
            normalize_embeddings=normalize_embeddings,
            priority=priority,
            request_id=request_id,
            user_id=user_id,
        )
        # A non-list payload (e.g. an error object) is treated like "nothing returned".
        if not isinstance(response_data, list) or len(response_data) != 1:
            raise RuntimeError(f"No image embedding returned for URL: {url}")
        vec = self._to_vector(response_data[0], url)
        self.cache.set(cache_key, vec)
        return vec

    def encode_batch(
        self,
        images: List[Union[str, Image.Image]],
        batch_size: int = 8,
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> List[np.ndarray]:
        """
        Encode a batch of images efficiently via network service.

        Cached vectors are reused. Each distinct uncached URL is sent to the
        service once, in chunks of ``batch_size``. Duplicate positions receive
        independent copies of the same vector.

        Args:
            images: List of image URLs / local file paths. PIL Images are rejected.
            batch_size: Maximum number of URLs per service request (must be >= 1).
            normalize_embeddings: Whether the service should normalize vectors;
                also part of the cache key.
            priority: Service scheduling priority (clamped to >= 0).
            request_id: Propagated to the service requests and logs.
            user_id: Propagated to the service requests and logs.

        Returns:
            List of embeddings aligned with ``images``.

        Raises:
            ValueError: if ``batch_size`` < 1 or an entry is not a non-blank string.
            NotImplementedError: if an entry is a PIL Image.
            RuntimeError: if the service response has the wrong shape or contains
                a missing/invalid embedding.
            requests.exceptions.RequestException: if a service request fails.
        """
        if batch_size < 1:
            # A zero step breaks range(); a negative one silently skips every
            # request and would return placeholders instead of embeddings.
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
        for i, img in enumerate(images):
            if isinstance(img, Image.Image):
                raise NotImplementedError(f"PIL Image at index {i} is not supported by service")
            if not isinstance(img, str) or not img.strip():
                raise ValueError(f"Invalid image URL/path at index {i}: {img!r}")

        urls = [img.strip() for img in images]  # every entry was validated as str above
        results: List[Optional[np.ndarray]] = [None] * len(urls)
        # Cache misses: URL (in first-seen order) -> every position holding it,
        # so each distinct URL is embedded only once per call.
        pending: dict[str, List[int]] = {}
        for pos, url in enumerate(urls):
            if url in pending:
                pending[url].append(pos)
                continue
            cached = self.cache.get(build_image_cache_key(url, normalize=normalize_embeddings))
            if cached is not None:
                results[pos] = cached
            else:
                pending[url] = [pos]

        pending_urls = list(pending)
        for start in range(0, len(pending_urls), batch_size):
            batch_urls = pending_urls[start : start + batch_size]
            response_data = self._call_service(
                batch_urls,
                normalize_embeddings=normalize_embeddings,
                priority=priority,
                request_id=request_id,
                user_id=user_id,
            )
            if not isinstance(response_data, list) or len(response_data) != len(batch_urls):
                got = (
                    len(response_data)
                    if isinstance(response_data, list)
                    else type(response_data).__name__
                )
                raise RuntimeError(
                    f"Image embedding response length mismatch: expected {len(batch_urls)}, "
                    f"got {got}"
                )
            for url, embedding in zip(batch_urls, response_data):
                vec = self._to_vector(embedding, url)
                self.cache.set(build_image_cache_key(url, normalize=normalize_embeddings), vec)
                first, *duplicates = pending[url]
                results[first] = vec
                for pos in duplicates:
                    # Separate arrays so mutating one result cannot change another.
                    results[pos] = vec.copy()

        # Every slot was filled either from the cache or from a service batch.
        return results  # type: ignore[return-value]

    def encode_image_urls(
        self,
        urls: List[str],
        batch_size: Optional[int] = None,
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> List[np.ndarray]:
        """
        Interface consistent with ClipImageModel / ClipAsServiceImageEncoder,
        called by the indexer's document_transformer.

        Args:
            urls: List of image URLs.
            batch_size: Batch size (default 8 when None or 0).
            normalize_embeddings: Whether the service should normalize vectors.
            priority: Service scheduling priority (clamped to >= 0).
            request_id: Propagated to the service requests and logs.
            user_id: Propagated to the service requests and logs.

        Returns:
            List of vectors with the same length as ``urls``.
        """
        return self.encode_batch(
            urls,
            batch_size=batch_size or self._DEFAULT_BATCH_SIZE,
            normalize_embeddings=normalize_embeddings,
            priority=priority,
            request_id=request_id,
            user_id=user_id,
        )
|