be52af70
tangwang
first commit
|
1
|
"""
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
2
|
Image embedding encoder using network service.
|
be52af70
tangwang
first commit
|
3
|
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
4
|
Generates embeddings via HTTP API service running on localhost:5001.
|
be52af70
tangwang
first commit
|
5
6
7
8
|
"""
import sys
import os
|
be52af70
tangwang
first commit
|
9
|
import requests
|
be52af70
tangwang
first commit
|
10
11
12
13
|
import numpy as np
from PIL import Image
import logging
import threading
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
14
|
from typing import List, Optional, Union, Dict, Any
|
be52af70
tangwang
first commit
|
15
|
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
16
|
logger = logging.getLogger(__name__)
|
be52af70
tangwang
first commit
|
17
18
19
20
|
class CLIPImageEncoder:
    """
    Image encoder that obtains image embeddings from a network service.

    The class is a process-wide singleton: the first construction creates the
    instance and records the service URL; every later construction returns
    that same instance and its ``service_url`` argument is ignored. Instance
    creation is guarded by a class-level lock.

    The service only accepts image URLs; PIL images are not supported and
    always yield ``None``.
    """
    _instance = None
    _lock = threading.Lock()

    # Path of the embedding endpoint, appended to the service base URL.
    EMBEDDING_PATH = "/embedding/generate_image_embeddings"
    # Timeout (seconds) applied to each HTTP request to the service.
    REQUEST_TIMEOUT_SECONDS = 60

    def __new__(cls, service_url='http://localhost:5001'):
        """
        Return the shared encoder instance, creating it on first use.

        Args:
            service_url: Base URL of the embedding service. Only honoured by
                the call that actually creates the instance.
        """
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                logger.info(f"Creating CLIPImageEncoder instance with service URL: {service_url}")
                cls._instance.service_url = service_url
                cls._instance.endpoint = cls._build_endpoint(service_url)
        return cls._instance

    @classmethod
    def _build_endpoint(cls, service_url: str) -> str:
        """Join the base URL and the endpoint path without doubling the slash."""
        return f"{service_url.rstrip('/')}{cls.EMBEDDING_PATH}"

    @staticmethod
    def _embedding_from_item(item: Optional[Dict[str, Any]]) -> Optional[np.ndarray]:
        """Return the item's embedding as a float32 array, or None if absent/empty."""
        if item and item.get("embedding"):
            return np.array(item["embedding"], dtype=np.float32)
        return None

    def _call_service(self, request_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Call the embedding service API.

        Args:
            request_data: List of dictionaries with id and pic_url fields

        Returns:
            List of dictionaries with id, pic_url, embedding and error fields

        Raises:
            requests.exceptions.RequestException: If the HTTP request fails or
                the service answers with an error status. The failure is
                logged before being re-raised.
        """
        try:
            response = requests.post(
                self.endpoint,
                json=request_data,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"CLIPImageEncoder service request failed: {e}", exc_info=True)
            raise

    def encode_image(self, image: "Image.Image") -> Optional[np.ndarray]:
        """
        Encode image to embedding vector using network service.

        Note: This method is kept for compatibility but the service only works
        with URLs, so it logs a warning and always returns None.
        """
        logger.warning("encode_image with PIL Image not supported by service, returning None")
        return None

    def encode_image_from_url(self, url: str) -> Optional[np.ndarray]:
        """
        Generate image embedding via network service using URL.

        Args:
            url: Image URL to process

        Returns:
            Embedding vector or None if failed. Any exception raised while
            calling the service or reading its response is logged and turned
            into a None result.
        """
        try:
            response_data = self._call_service([{
                "id": "image_0",
                "pic_url": url,
            }])

            if not response_data:
                logger.warning(f"No response for URL {url}")
                return None

            response_item = response_data[0]
            embedding = self._embedding_from_item(response_item)
            if embedding is None:
                logger.warning(f"No embedding for URL {url}, error: {response_item.get('error', 'Unknown error')}")
            return embedding

        except Exception as e:
            # Best-effort API: callers get None rather than an exception.
            logger.error(f"Failed to process image from URL {url}: {str(e)}", exc_info=True)
            return None

    def encode_batch(
        self,
        images: List[Union[str, "Image.Image"]],
        batch_size: int = 8
    ) -> List[Optional[np.ndarray]]:
        """
        Encode a batch of images efficiently via network service.

        Args:
            images: List of image URLs or PIL Images
            batch_size: Number of URLs sent per service request; must be >= 1

        Returns:
            List of embeddings aligned with ``images`` (None for PIL images,
            unsupported entries, and URLs that failed)

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        # A zero step makes range() fail and a negative one silently skips
        # every URL, so reject both up front with a clear message.
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        # Initialize results with None for all images
        results: List[Optional[np.ndarray]] = [None] * len(images)

        # Keep (original index, url) pairs; PIL images stay None because the
        # service only supports URLs.
        url_entries = []
        for i, img in enumerate(images):
            if isinstance(img, str):
                url_entries.append((i, img))
            elif isinstance(img, Image.Image):
                logger.warning(f"PIL Image at index {i} not supported by service, returning None")

        # Process URLs in batches
        for start in range(0, len(url_entries), batch_size):
            batch = url_entries[start:start + batch_size]
            request_data = [
                {"id": f"image_{j}", "pic_url": url}
                for j, (_, url) in enumerate(batch)
            ]

            try:
                response_data = self._call_service(request_data)

                # Index the response once by id; the first occurrence of an
                # id wins and non-dict items are skipped.
                items_by_id: Dict[str, Dict[str, Any]] = {}
                for item in response_data:
                    if isinstance(item, dict):
                        items_by_id.setdefault(str(item.get("id")), item)

                batch_results = []
                for j, (_, url) in enumerate(batch):
                    response_item = items_by_id.get(f"image_{j}")
                    embedding = self._embedding_from_item(response_item)
                    if embedding is None:
                        error_msg = response_item.get("error", "Unknown error") if response_item else "No response"
                        logger.warning(f"Failed to encode URL {url}: {error_msg}")
                    batch_results.append(embedding)

                # Write back only after the whole batch parsed successfully.
                for (original_index, _), embedding in zip(batch, batch_results):
                    results[original_index] = embedding

            except Exception as e:
                logger.error(f"Batch processing failed: {e}", exc_info=True)
                # Fill with None for this batch
                for original_index, _ in batch:
                    results[original_index] = None

        return results
|