be52af70
tangwang
first commit
|
1
|
"""
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
2
|
Image embedding encoder using network service.
|
be52af70
tangwang
first commit
|
3
|
|
7bfb9946
tangwang
向量化模块
|
4
|
Generates embeddings via HTTP API service (default localhost:6005).
|
be52af70
tangwang
first commit
|
5
6
7
8
|
"""
import sys
import os
|
be52af70
tangwang
first commit
|
9
|
import requests
|
be52af70
tangwang
first commit
|
10
11
12
13
|
import numpy as np
from PIL import Image
import logging
import threading
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
14
|
from typing import List, Optional, Union, Dict, Any
|
be52af70
tangwang
first commit
|
15
|
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
16
|
logger = logging.getLogger(__name__)
|
be52af70
tangwang
first commit
|
17
18
19
20
|
class CLIPImageEncoder:
    """
    Image encoder that obtains embeddings from a remote HTTP embedding service.

    The class is a process-wide singleton: instance creation is guarded by a
    class-level lock and the service URL is resolved only once, when the first
    instance is created. The service is called with image URLs / file paths;
    in-memory PIL images cannot be encoded and always yield ``None``.
    """

    _instance = None
    _lock = threading.Lock()

    # Timeout (seconds) applied to every request sent to the embedding service.
    REQUEST_TIMEOUT_SECONDS = 60

    def __new__(cls, service_url: Optional[str] = None):
        """
        Return the shared encoder instance, creating it on first use.

        Args:
            service_url: Base URL of the embedding service. When omitted (or
                empty), the ``EMBEDDING_SERVICE_URL`` environment variable is
                used, falling back to ``http://localhost:6005``. Only the
                first call configures the instance; a different URL passed
                later is ignored and a warning is logged.

        Returns:
            The singleton ``CLIPImageEncoder`` instance.
        """
        with cls._lock:
            if cls._instance is None:
                # Configure the object fully before publishing it, so a
                # failure here never leaves a half-initialised singleton.
                instance = super(CLIPImageEncoder, cls).__new__(cls)
                resolved_url = service_url or os.getenv(
                    "EMBEDDING_SERVICE_URL", "http://localhost:6005"
                )
                logger.info(
                    "Creating CLIPImageEncoder instance with service URL: %s",
                    resolved_url,
                )
                instance.service_url = resolved_url
                instance.endpoint = cls._build_endpoint(resolved_url)
                cls._instance = instance
            elif service_url and service_url != cls._instance.service_url:
                logger.warning(
                    "CLIPImageEncoder already initialised with service URL %s; "
                    "ignoring service_url=%s",
                    cls._instance.service_url,
                    service_url,
                )
        return cls._instance

    @staticmethod
    def _build_endpoint(service_url: str) -> str:
        """Return the image-embedding endpoint for *service_url*.

        Trailing slashes on the base URL are dropped so the result never
        contains a double slash before ``embed/image``.
        """
        return f"{service_url.rstrip('/')}/embed/image"

    def _call_service(self, request_data: List[str]) -> List[Any]:
        """
        Call the embedding service API.

        Args:
            request_data: List of image URLs / local file paths

        Returns:
            List of embeddings (list[float]) or nulls (None), aligned to input order

        Raises:
            requests.exceptions.RequestException: If the request fails or the
                service answers with an HTTP error status. The error is logged
                before being re-raised.
        """
        try:
            response = requests.post(
                self.endpoint,
                json=request_data,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error("CLIPImageEncoder service request failed: %s", e, exc_info=True)
            raise

    def encode_image(self, image: "Image.Image") -> Optional[np.ndarray]:
        """
        Encode image to embedding vector using network service.

        Note: This method is kept for compatibility but the service only works
        with URLs, so it logs a warning and always returns None.
        """
        logger.warning("encode_image with PIL Image not supported by service, returning None")
        return None

    def encode_image_from_url(self, url: str) -> Optional[np.ndarray]:
        """
        Generate image embedding via network service using URL.

        Args:
            url: Image URL to process

        Returns:
            Embedding vector (float32 array) or None if failed
        """
        try:
            response_data = self._call_service([url])
            if response_data and response_data[0] is not None:
                return np.array(response_data[0], dtype=np.float32)
            logger.warning("No embedding for URL %s", url)
            return None
        except Exception as e:
            # Best-effort API: any failure is logged and reported as None.
            logger.error("Failed to process image from URL %s: %s", url, e, exc_info=True)
            return None

    def encode_batch(
        self,
        images: List[Union[str, "Image.Image"]],
        batch_size: int = 8
    ) -> List[Optional[np.ndarray]]:
        """
        Encode a batch of images via network service.

        Args:
            images: List of image URLs or PIL Images. Only string entries are
                sent to the service; every other entry yields None.
            batch_size: Number of URLs sent per service request (must be >= 1)

        Returns:
            List of embeddings (or None for failed images), aligned to the
            order of ``images``

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            # A zero step would make range() fail with an opaque message and a
            # negative one would silently skip every URL.
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

        # Initialize results with None for all images
        results: List[Optional[np.ndarray]] = [None] * len(images)

        # Keep only URL inputs, remembering where each belongs in the output
        url_images = []
        url_indices = []
        for i, img in enumerate(images):
            if isinstance(img, str):
                url_images.append(img)
                url_indices.append(i)
            elif isinstance(img, Image.Image):
                logger.warning("PIL Image at index %d not supported by service, returning None", i)
            else:
                logger.warning(
                    "Unsupported input type %s at index %d, returning None",
                    type(img).__name__,
                    i,
                )

        # Process URLs in batches
        for start in range(0, len(url_images), batch_size):
            batch_urls = url_images[start:start + batch_size]
            batch_indices = url_indices[start:start + batch_size]

            try:
                response_data = self._call_service(batch_urls)

                # Convert the whole (aligned) response first so that a failure
                # part-way through leaves this batch uniformly None.
                batch_results = []
                for j, url in enumerate(batch_urls):
                    if response_data and j < len(response_data) and response_data[j] is not None:
                        batch_results.append(np.array(response_data[j], dtype=np.float32))
                    else:
                        logger.warning("Failed to encode URL %s: no embedding", url)
                        batch_results.append(None)

                # Insert results at the correct positions
                for index, result in zip(batch_indices, batch_results):
                    results[index] = result
            except Exception as e:
                logger.error("Batch processing failed: %s", e, exc_info=True)
                # Fill with None for this batch
                for index in batch_indices:
                    results[index] = None

        return results
|