"""Image embedding client for the local embedding HTTP service."""

import logging
from typing import Any, Dict, List, Optional, Union

import numpy as np
import requests
from PIL import Image

from config.loader import get_app_config
from config.services_config import get_embedding_image_base_url
from embeddings.cache_keys import build_image_cache_key
from embeddings.redis_embedding_cache import RedisEmbeddingCache

logger = logging.getLogger(__name__)

# Default number of URLs sent per service request.
_DEFAULT_BATCH_SIZE = 8


class CLIPImageEncoder:
    """
    Image Encoder for generating image embeddings using network service.

    Results are cached through ``RedisEmbeddingCache`` (namespace ``"image"``),
    keyed by the whitespace-stripped image URL/path and the normalization flag.
    This client is stateless and safe to instantiate per caller.
    """

    def __init__(self, service_url: Optional[str] = None):
        """
        Args:
            service_url: Base URL of the embedding service. Falls back to
                ``get_embedding_image_base_url()`` when not given.
        """
        resolved_url = service_url or get_embedding_image_base_url()
        redis_config = get_app_config().infrastructure.redis
        self.service_url = str(resolved_url).rstrip("/")
        self.endpoint = f"{self.service_url}/embed/image"
        # Reuse embedding cache prefix, but separate namespace for images to avoid collisions.
        self.cache_prefix = str(redis_config.embedding_cache_prefix).strip() or "embedding"
        logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url)
        self.cache = RedisEmbeddingCache(
            key_prefix=self.cache_prefix,
            namespace="image",
        )

    def _call_service(self, request_data: List[str], normalize_embeddings: bool = True) -> List[Any]:
        """
        Call the embedding service API.

        Args:
            request_data: List of image URLs / local file paths, sent as the JSON body.
            normalize_embeddings: Forwarded as the ``normalize`` query parameter.

        Returns:
            List of embeddings (list[float]) or nulls (None), aligned to input
            order. Callers still verify the length.

        Raises:
            requests.exceptions.RequestException: On transport or HTTP errors.
            RuntimeError: If the response body is not a JSON list.
        """
        try:
            response = requests.post(
                self.endpoint,
                params={"normalize": "true" if normalize_embeddings else "false"},
                json=request_data,
                timeout=60,
            )
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            logger.error("CLIPImageEncoder service request failed: %s", e, exc_info=True)
            raise
        # Guard against error payloads (e.g. a JSON object) that would otherwise be
        # indexed/len()'d as if they were a list of embeddings.
        if not isinstance(data, list):
            raise RuntimeError(
                f"Unexpected image embedding response type: {type(data).__name__}"
            )
        return data

    @staticmethod
    def _to_embedding(raw: Any, url: str) -> np.ndarray:
        """
        Convert one raw service result into a validated 1-D float32 vector.

        Raises:
            RuntimeError: If ``raw`` is None, cannot be converted to float32,
                or is not a non-empty 1-D vector of finite values.
        """
        if raw is None:
            raise RuntimeError(f"No image embedding returned for URL: {url}")
        try:
            vec = np.array(raw, dtype=np.float32)
        except (TypeError, ValueError) as exc:
            # Ragged or non-numeric payloads: report like any other invalid embedding.
            raise RuntimeError(f"Invalid image embedding returned for URL: {url}") from exc
        if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all():
            raise RuntimeError(f"Invalid image embedding returned for URL: {url}")
        return vec

    def encode_image(self, image: Image.Image) -> np.ndarray:
        """
        Encode image to embedding vector using network service.

        Note: This method is kept for compatibility but the service only works with URLs.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("encode_image with PIL Image is not supported by embedding service")

    def encode_image_from_url(self, url: str, normalize_embeddings: bool = True) -> np.ndarray:
        """
        Generate image embedding via network service using URL.

        The URL is whitespace-stripped before use, matching ``encode_batch``, so
        both entry points share cache keys and send identical payloads.

        Args:
            url: Image URL or local file path to process.
            normalize_embeddings: Whether the service should normalize the
                vector; also part of the cache key.

        Returns:
            Embedding vector (1-D float32).

        Raises:
            ValueError: If ``url`` is not a non-empty string.
            RuntimeError: If the service returns no embedding or an invalid one.
        """
        if not isinstance(url, str) or not url.strip():
            raise ValueError(f"Invalid image URL/path: {url!r}")
        url = url.strip()

        cache_key = build_image_cache_key(url, normalize=normalize_embeddings)
        cached = self.cache.get(cache_key)
        if cached is not None:
            return cached

        response_data = self._call_service([url], normalize_embeddings=normalize_embeddings)
        if len(response_data) != 1:
            raise RuntimeError(f"No image embedding returned for URL: {url}")
        vec = self._to_embedding(response_data[0], url)
        self.cache.set(cache_key, vec)
        return vec

    def encode_batch(
        self,
        images: List[Union[str, Image.Image]],
        batch_size: int = _DEFAULT_BATCH_SIZE,
        normalize_embeddings: bool = True,
    ) -> List[np.ndarray]:
        """
        Encode a batch of images efficiently via network service.

        Cached embeddings are reused. Uncached URLs are deduplicated and sent to
        the service in chunks of ``batch_size``.

        Args:
            images: List of image URLs or local file paths. PIL Images are rejected.
            batch_size: Maximum number of URLs per service request; must be >= 1.
            normalize_embeddings: Whether the service should normalize vectors;
                also part of the cache key.

        Returns:
            List of embeddings, one per input, in input order.

        Raises:
            ValueError: If ``batch_size`` < 1 or an entry is not a non-empty string.
            NotImplementedError: If an entry is a PIL Image.
            RuntimeError: If the service response is missing or invalid.
        """
        # A non-positive step would make range() fail (0) or silently yield
        # nothing (negative), leaving results unfilled.
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

        for i, img in enumerate(images):
            if isinstance(img, Image.Image):
                raise NotImplementedError(f"PIL Image at index {i} is not supported by service")
            if not isinstance(img, str) or not img.strip():
                raise ValueError(f"Invalid image URL/path at index {i}: {img!r}")

        urls = [img.strip() for img in images]  # type: ignore[union-attr]
        results: List[Optional[np.ndarray]] = [None] * len(urls)
        # Uncached URL -> every input position holding it, so each distinct URL
        # is requested from the service only once.
        pending: Dict[str, List[int]] = {}
        for pos, url in enumerate(urls):
            cached = self.cache.get(build_image_cache_key(url, normalize=normalize_embeddings))
            if cached is not None:
                results[pos] = cached
            else:
                pending.setdefault(url, []).append(pos)

        pending_urls = list(pending)
        for start in range(0, len(pending_urls), batch_size):
            batch_urls = pending_urls[start : start + batch_size]
            response_data = self._call_service(batch_urls, normalize_embeddings=normalize_embeddings)
            if len(response_data) != len(batch_urls):
                raise RuntimeError(
                    f"Image embedding response length mismatch: expected {len(batch_urls)}, "
                    f"got {len(response_data)}"
                )
            for url, raw in zip(batch_urls, response_data):
                vec = self._to_embedding(raw, url)
                self.cache.set(build_image_cache_key(url, normalize=normalize_embeddings), vec)
                positions = pending[url]
                results[positions[0]] = vec
                # Duplicates get their own copy so in-place edits by callers don't alias.
                for pos in positions[1:]:
                    results[pos] = vec.copy()

        # Every slot has been filled: either from cache or from the service loop above.
        return results  # type: ignore[return-value]

    def encode_image_urls(
        self,
        urls: List[str],
        batch_size: Optional[int] = None,
        normalize_embeddings: bool = True,
    ) -> List[np.ndarray]:
        """
        Interface consistent with ClipImageModel / ClipAsServiceImageEncoder,
        called by the indexer's document_transformer.

        Args:
            urls: List of image URLs.
            batch_size: Batch size (defaults to 8 when None or 0).
            normalize_embeddings: Whether the service should normalize vectors.

        Returns:
            List of vectors with the same length as ``urls``.
        """
        return self.encode_batch(
            urls,
            batch_size=batch_size or _DEFAULT_BATCH_SIZE,
            normalize_embeddings=normalize_embeddings,
        )