diff --git a/README.md b/README.md
index 790e7f2..781a6e6 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
 Semantics:
-query anchor
+query anchor
 I want to add a query anchor field to Elasticsearch, i.e. which queries clicked through to this doc. One doc has multiple query anchors, and each query anchor carries two attributes, weight and dweight, which represent the query's click-distribution weight under the doc and the doc's click-distribution weight under the query, respectively. How should these two ES fields be designed?
 Two sets of query anchors (zh and en) are needed, because their analyzers differ.
@@ -89,6 +89,13 @@
 docker run -d --name es -p 9200:9200 elasticsearch:8.11.0

 # 4. Start the service
 ./run.sh

+# (Optional) Start the local embedding service (BGE-M3 / CN-CLIP, local model inference)
+# Provides: POST http://localhost:6005/embed/text
+#           POST http://localhost:6005/embed/image
+./scripts/start_embedding_service.sh
+#
+# See `embeddings/README.md` for details
+
 # 5. Call the text search API
 curl -X POST http://localhost:6002/search/ \
   -H "Content-Type: application/json" \
diff --git a/embeddings/README.md b/embeddings/README.md
new file mode 100644
index 0000000..3a1deca
--- /dev/null
+++ b/embeddings/README.md
@@ -0,0 +1,40 @@
## Embedding module (embeddings)

This directory is a self-contained embedding module. It contains:

- **HTTP clients**: `text_encoder.py` / `image_encoder.py` (called by the search/indexing modules)
- **Local model implementations**: `bge_model.py` / `clip_model.py`
- **Embedding service (FastAPI)**: `server.py`
- **Unified configuration**: `config.py`

### Service API

- `POST /embed/text`
  - Request: `["text1", "text2", ...]`
  - Response: `[[...], null, ...]` (aligned with the input by index; failures are `null`)

- `POST /embed/image`
  - Request: `["url_or_local_path_1", ...]`
  - Response: `[[...], null, ...]` (aligned with the input by index; failures are `null`)

### Starting the service

Start it with the repository script (default port 6005):

```bash
./scripts/start_embedding_service.sh
```

### Changing the configuration

Edit `embeddings/config.py`:

- `PORT`: service port (default 6005)
- `TEXT_MODEL_DIR`, `TEXT_DEVICE`, `TEXT_BATCH_SIZE`
- `IMAGE_MODEL_NAME`, `IMAGE_DEVICE`

### Legacy files

The old `vector_service/` directory and the `*_encoder__local.py` files are deprecated; everything is now implemented and maintained in this directory.

diff --git a/embeddings/__init__.py b/embeddings/__init__.py
index f57d672..8cd0723 100644
--- a/embeddings/__init__.py
+++ b/embeddings/__init__.py
@@ -1,9 +1,34 @@
-"""Embeddings package initialization."""
-
-from .text_encoder import BgeEncoder
-from .image_encoder import CLIPImageEncoder
-
-__all__ = [
-    'BgeEncoder',
-    'CLIPImageEncoder',
-]
+"""
+Embeddings module.
+
+Important: keep package import lightweight.
+
+Some callers do:
+  - `from embeddings import BgeEncoder`
+  - `from embeddings import CLIPImageEncoder`
+
+But the underlying implementations may import heavy optional deps (Pillow, torch, etc.).
+To avoid importing those at package import time (and to allow the embedding service to boot
+without importing client code), we provide small lazy factories here.
+"""
+
+
+class BgeEncoder(object):
+    """Lazy factory for `embeddings.text_encoder.BgeEncoder`."""
+
+    def __new__(cls, *args, **kwargs):
+        from .text_encoder import BgeEncoder as _Real
+
+        return _Real(*args, **kwargs)
+
+
+class CLIPImageEncoder(object):
+    """Lazy factory for `embeddings.image_encoder.CLIPImageEncoder`."""
+
+    def __new__(cls, *args, **kwargs):
+        from .image_encoder import CLIPImageEncoder as _Real
+
+        return _Real(*args, **kwargs)
+
+
+__all__ = ["BgeEncoder", "CLIPImageEncoder"]
diff --git a/embeddings/bge_model.py b/embeddings/bge_model.py
new file mode 100644
index 0000000..2eec3bd
--- /dev/null
+++ b/embeddings/bge_model.py
@@ -0,0 +1,81 @@
"""
BGE-M3 local text embedding implementation.

Internal model implementation used by the embedding service.
"""

import threading
from typing import List, Union

import numpy as np
from sentence_transformers import SentenceTransformer
from modelscope import snapshot_download


class BgeTextModel(object):
    """
    Thread-safe singleton text encoder using BGE-M3 model (local inference).
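
    Construction is guarded by a class-level lock, so concurrent callers share
    a single loaded model. Minimal usage sketch (the 1024-dim shape assumes
    the default BGE-M3 checkpoint):

        model = BgeTextModel()                      # loads the model once
        vecs = model.encode(["你好", "hello"], device="cpu")
        # vecs.shape == (2, 1024); rows are unit-norm because
        # normalize_embeddings defaults to True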
+ """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls, model_dir: str = "Xorbits/bge-m3"): + with cls._lock: + if cls._instance is None: + cls._instance = super(BgeTextModel, cls).__new__(cls) + cls._instance.model = SentenceTransformer(snapshot_download(model_dir)) + return cls._instance + + def encode( + self, + sentences: Union[str, List[str]], + normalize_embeddings: bool = True, + device: str = "cuda", + batch_size: int = 32, + ) -> np.ndarray: + if device == "gpu": + device = "cuda" + + # Try requested device, fallback to CPU if CUDA fails + try: + if device == "cuda": + import torch + + if torch.cuda.is_available(): + free_memory = ( + torch.cuda.get_device_properties(0).total_memory + - torch.cuda.memory_allocated() + ) + if free_memory < 1024 * 1024 * 1024: # 1GB + device = "cpu" + else: + device = "cpu" + + self.model = self.model.to(device) + embeddings = self.model.encode( + sentences, + normalize_embeddings=normalize_embeddings, + device=device, + show_progress_bar=False, + batch_size=batch_size, + ) + return embeddings + + except Exception: + if device != "cpu": + self.model = self.model.to("cpu") + embeddings = self.model.encode( + sentences, + normalize_embeddings=normalize_embeddings, + device="cpu", + show_progress_bar=False, + batch_size=batch_size, + ) + return embeddings + raise + + def encode_batch(self, texts: List[str], batch_size: int = 32, device: str = "cuda") -> np.ndarray: + return self.encode(texts, batch_size=batch_size, device=device) + + diff --git a/embeddings/clip_model.py b/embeddings/clip_model.py new file mode 100644 index 0000000..9bb1cf8 --- /dev/null +++ b/embeddings/clip_model.py @@ -0,0 +1,107 @@ +""" +CN-CLIP local image embedding implementation. + +Internal model implementation used by the embedding service. +""" + +import io +import threading +from typing import List, Optional, Union + +import numpy as np +import requests +import torch +from PIL import Image +from cn_clip.clip import load_from_name +import cn_clip.clip as clip + + +DEFAULT_MODEL_NAME = "ViT-H-14" +MODEL_DOWNLOAD_DIR = "/data/tw/uat/EsSearcher" + + +class ClipImageModel(object): + """ + Thread-safe singleton image encoder using cn_clip (local inference). 
+ """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls, model_name: str = DEFAULT_MODEL_NAME, device: Optional[str] = None): + with cls._lock: + if cls._instance is None: + cls._instance = super(ClipImageModel, cls).__new__(cls) + cls._instance._initialize_model(model_name, device) + return cls._instance + + def _initialize_model(self, model_name: str, device: Optional[str]): + self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu") + self.model, self.preprocess = load_from_name( + model_name, device=self.device, download_root=MODEL_DOWNLOAD_DIR + ) + self.model.eval() + self.model_name = model_name + + def validate_image(self, image_data: bytes) -> Image.Image: + image_stream = io.BytesIO(image_data) + image = Image.open(image_stream) + image.verify() + image_stream.seek(0) + image = Image.open(image_stream) + if image.mode != "RGB": + image = image.convert("RGB") + return image + + def download_image(self, url: str, timeout: int = 10) -> bytes: + if url.startswith(("http://", "https://")): + response = requests.get(url, timeout=timeout) + if response.status_code != 200: + raise ValueError("HTTP %s" % response.status_code) + return response.content + with open(url, "rb") as f: + return f.read() + + def preprocess_image(self, image: Image.Image, max_size: int = 1024) -> Image.Image: + if max(image.size) > max_size: + ratio = float(max_size) / float(max(image.size)) + new_size = tuple(int(dim * ratio) for dim in image.size) + image = image.resize(new_size, Image.Resampling.LANCZOS) + return image + + def encode_text(self, text): + text_data = clip.tokenize([text] if isinstance(text, str) else text).to(self.device) + with torch.no_grad(): + text_features = self.model.encode_text(text_data) + text_features /= text_features.norm(dim=-1, keepdim=True) + return text_features + + def encode_image(self, image: Image.Image) -> Optional[np.ndarray]: + if not isinstance(image, Image.Image): + raise ValueError("ClipImageModel.encode_image input must be a PIL.Image") + infer_data = self.preprocess(image).unsqueeze(0).to(self.device) + with torch.no_grad(): + image_features = self.model.encode_image(infer_data) + image_features /= image_features.norm(dim=-1, keepdim=True) + return image_features.cpu().numpy().astype("float32")[0] + + def encode_image_from_url(self, url: str) -> Optional[np.ndarray]: + image_data = self.download_image(url) + image = self.validate_image(image_data) + image = self.preprocess_image(image) + return self.encode_image(image) + + def encode_batch(self, images: List[Union[str, Image.Image]], batch_size: int = 8) -> List[Optional[np.ndarray]]: + results: List[Optional[np.ndarray]] = [] + for i in range(0, len(images), batch_size): + batch = images[i : i + batch_size] + for img in batch: + if isinstance(img, str): + results.append(self.encode_image_from_url(img)) + elif isinstance(img, Image.Image): + results.append(self.encode_image(img)) + else: + results.append(None) + return results + + diff --git a/embeddings/config.py b/embeddings/config.py new file mode 100644 index 0000000..a5fc5f7 --- /dev/null +++ b/embeddings/config.py @@ -0,0 +1,33 @@ +""" +Embedding module configuration. + +This module is intentionally a plain Python file (no env var parsing, no extra deps). 
+Edit values here to configure: +- server host/port +- local model settings (paths/devices/batch sizes) +""" + +from typing import Optional + + +class EmbeddingConfig(object): + # Server + HOST = "0.0.0.0" + PORT = 6005 + + # Text embeddings (BGE-M3) + TEXT_MODEL_DIR = "Xorbits/bge-m3" + TEXT_DEVICE = "cuda" # "cuda" or "cpu" (model may fall back to CPU if needed) + TEXT_BATCH_SIZE = 32 + + # Image embeddings (CN-CLIP) + IMAGE_MODEL_NAME = "ViT-H-14" + IMAGE_DEVICE = None # type: Optional[str] # "cuda" / "cpu" / None(auto) + + # Service behavior + IMAGE_BATCH_SIZE = 8 + + +CONFIG = EmbeddingConfig() + + diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py index f415b28..fec709e 100644 --- a/embeddings/image_encoder.py +++ b/embeddings/image_encoder.py @@ -1,7 +1,7 @@ """ Image embedding encoder using network service. -Generates embeddings via HTTP API service running on localhost:5001. +Generates embeddings via HTTP API service (default localhost:6005). """ import sys @@ -26,24 +26,25 @@ class CLIPImageEncoder: _instance = None _lock = threading.Lock() - def __new__(cls, service_url='http://localhost:5001'): + def __new__(cls, service_url: Optional[str] = None): with cls._lock: if cls._instance is None: cls._instance = super(CLIPImageEncoder, cls).__new__(cls) - logger.info(f"Creating CLIPImageEncoder instance with service URL: {service_url}") - cls._instance.service_url = service_url - cls._instance.endpoint = f"{service_url}/embedding/generate_image_embeddings" + resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL", "http://localhost:6005") + logger.info(f"Creating CLIPImageEncoder instance with service URL: {resolved_url}") + cls._instance.service_url = resolved_url + cls._instance.endpoint = f"{resolved_url}/embed/image" return cls._instance - def _call_service(self, request_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + def _call_service(self, request_data: List[str]) -> List[Any]: """ Call the embedding service API. 
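
        Implementation note: the list is POSTed as a plain JSON array to
        `{service_url}/embed/image`, and the service replies with an
        index-aligned array, so no per-item request IDs are needed.
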
Args: - request_data: List of dictionaries with id and pic_url fields + request_data: List of image URLs / local file paths Returns: - List of dictionaries with id, pic_url, embedding and error fields + List of embeddings (list[float]) or nulls (None), aligned to input order """ try: response = requests.post( @@ -77,26 +78,11 @@ class CLIPImageEncoder: Embedding vector or None if failed """ try: - # Prepare request data - request_data = [{ - "id": "image_0", - "pic_url": url - }] - - # Call service - response_data = self._call_service(request_data) - - # Process response - if response_data and len(response_data) > 0: - response_item = response_data[0] - if response_item.get("embedding"): - return np.array(response_item["embedding"], dtype=np.float32) - else: - logger.warning(f"No embedding for URL {url}, error: {response_item.get('error', 'Unknown error')}") - return None - else: - logger.warning(f"No response for URL {url}") - return None + response_data = self._call_service([url]) + if response_data and len(response_data) > 0 and response_data[0] is not None: + return np.array(response_data[0], dtype=np.float32) + logger.warning(f"No embedding for URL {url}") + return None except Exception as e: logger.error(f"Failed to process image from URL {url}: {str(e)}", exc_info=True) @@ -137,32 +123,17 @@ class CLIPImageEncoder: batch_urls = url_images[i:i + batch_size] batch_indices = url_indices[i:i + batch_size] - # Prepare request data - request_data = [] - for j, url in enumerate(batch_urls): - request_data.append({ - "id": f"image_{j}", - "pic_url": url - }) - try: # Call service - response_data = self._call_service(request_data) + response_data = self._call_service(batch_urls) - # Process response + # Process response (aligned list) batch_results = [] for j, url in enumerate(batch_urls): - response_item = None - for item in response_data: - if str(item.get("id")) == f"image_{j}": - response_item = item - break - - if response_item and response_item.get("embedding"): - batch_results.append(np.array(response_item["embedding"], dtype=np.float32)) + if response_data and j < len(response_data) and response_data[j] is not None: + batch_results.append(np.array(response_data[j], dtype=np.float32)) else: - error_msg = response_item.get("error", "Unknown error") if response_item else "No response" - logger.warning(f"Failed to encode URL {url}: {error_msg}") + logger.warning(f"Failed to encode URL {url}: no embedding") batch_results.append(None) # Insert results at the correct positions diff --git a/embeddings/image_encoder__local.py b/embeddings/image_encoder__local.py deleted file mode 100644 index 0ecaf6d..0000000 --- a/embeddings/image_encoder__local.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Image embedding encoder using CN-CLIP model. - -Generates 1024-dimensional vectors for images using the CN-CLIP ViT-H-14 model. -""" - -import sys -import os -import io -import requests -import torch -import numpy as np -from PIL import Image -import logging -import threading -from typing import List, Optional, Union -import cn_clip.clip as clip -from cn_clip.clip import load_from_name - - -# DEFAULT_MODEL_NAME = "ViT-L-14-336" # ["ViT-B-16", "ViT-L-14", "ViT-L-14-336", "ViT-H-14", "RN50"] -DEFAULT_MODEL_NAME = "ViT-H-14" -MODEL_DOWNLOAD_DIR = "/data/tw/uat/EsSearcher" - - -class CLIPImageEncoder: - """ - CLIP Image Encoder for generating image embeddings using cn_clip. - - Thread-safe singleton pattern. 
- """ - - _instance = None - _lock = threading.Lock() - - def __new__(cls, model_name=DEFAULT_MODEL_NAME, device=None): - with cls._lock: - if cls._instance is None: - cls._instance = super(CLIPImageEncoder, cls).__new__(cls) - print(f"[CLIPImageEncoder] Creating new instance with model: {model_name}") - cls._instance._initialize_model(model_name, device) - return cls._instance - - def _initialize_model(self, model_name, device): - """Initialize the CLIP model using cn_clip""" - try: - self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu") - self.model, self.preprocess = load_from_name( - model_name, - device=self.device, - download_root=MODEL_DOWNLOAD_DIR - ) - self.model.eval() - self.model_name = model_name - print(f"[CLIPImageEncoder] Model {model_name} initialized successfully on device {self.device}") - - except Exception as e: - print(f"[CLIPImageEncoder] Failed to initialize model: {str(e)}") - raise - - def validate_image(self, image_data: bytes) -> Image.Image: - """Validate image data and return PIL Image if valid""" - try: - image_stream = io.BytesIO(image_data) - image = Image.open(image_stream) - image.verify() - image_stream.seek(0) - image = Image.open(image_stream) - if image.mode != 'RGB': - image = image.convert('RGB') - return image - except Exception as e: - raise ValueError(f"Invalid image data: {str(e)}") - - def download_image(self, url: str, timeout: int = 10) -> bytes: - """Download image from URL""" - try: - if url.startswith(('http://', 'https://')): - response = requests.get(url, timeout=timeout) - if response.status_code != 200: - raise ValueError(f"HTTP {response.status_code}") - return response.content - else: - # Local file path - with open(url, 'rb') as f: - return f.read() - except Exception as e: - raise ValueError(f"Failed to download image from {url}: {str(e)}") - - def preprocess_image(self, image: Image.Image, max_size: int = 1024) -> Image.Image: - """Preprocess image for CLIP model""" - # Resize if too large - if max(image.size) > max_size: - ratio = max_size / max(image.size) - new_size = tuple(int(dim * ratio) for dim in image.size) - image = image.resize(new_size, Image.Resampling.LANCZOS) - return image - - def encode_text(self, text): - """Encode text to embedding vector using cn_clip""" - text_data = clip.tokenize([text] if type(text) == str else text).to(self.device) - with torch.no_grad(): - text_features = self.model.encode_text(text_data) - text_features /= text_features.norm(dim=-1, keepdim=True) - return text_features - - def encode_image(self, image: Image.Image) -> Optional[np.ndarray]: - """Encode image to embedding vector using cn_clip""" - if not isinstance(image, Image.Image): - raise ValueError("CLIPImageEncoder.encode_image Input must be a PIL.Image") - - try: - infer_data = self.preprocess(image).unsqueeze(0).to(self.device) - with torch.no_grad(): - image_features = self.model.encode_image(infer_data) - image_features /= image_features.norm(dim=-1, keepdim=True) - return image_features.cpu().numpy().astype('float32')[0] - except Exception as e: - print(f"Failed to process image. 
Reason: {str(e)}") - return None - - def encode_image_from_url(self, url: str) -> Optional[np.ndarray]: - """Complete pipeline: download, validate, preprocess and encode image from URL""" - try: - # Download image - image_data = self.download_image(url) - - # Validate image - image = self.validate_image(image_data) - - # Preprocess image - image = self.preprocess_image(image) - - # Encode image - embedding = self.encode_image(image) - - return embedding - - except Exception as e: - print(f"Error processing image from URL {url}: {str(e)}") - return None - - def encode_batch( - self, - images: List[Union[str, Image.Image]], - batch_size: int = 8 - ) -> List[Optional[np.ndarray]]: - """ - Encode a batch of images efficiently. - - Args: - images: List of image URLs or PIL Images - batch_size: Batch size for processing - - Returns: - List of embeddings (or None for failed images) - """ - results = [] - - for i in range(0, len(images), batch_size): - batch = images[i:i + batch_size] - batch_embeddings = [] - - for img in batch: - if isinstance(img, str): - # URL or file path - emb = self.encode_image_from_url(img) - elif isinstance(img, Image.Image): - # PIL Image - emb = self.encode_image(img) - else: - emb = None - - batch_embeddings.append(emb) - - results.extend(batch_embeddings) - - return results diff --git a/embeddings/server.py b/embeddings/server.py new file mode 100644 index 0000000..d26c4c5 --- /dev/null +++ b/embeddings/server.py @@ -0,0 +1,122 @@ +""" +Embedding service (FastAPI). + +API (simple list-in, list-out; aligned by index; failures -> null): +- POST /embed/text body: ["text1", "text2", ...] -> [[...], null, ...] +- POST /embed/image body: ["url_or_path1", ...] -> [[...], null, ...] +""" + +import threading +from typing import Any, Dict, List, Optional + +import numpy as np +from fastapi import FastAPI + +from embeddings.config import CONFIG +from embeddings.bge_model import BgeTextModel +from embeddings.clip_model import ClipImageModel + + +app = FastAPI(title="SearchEngine Embedding Service", version="1.0.0") + +_text_model = None +_image_model = None + +_text_init_lock = threading.Lock() +_image_init_lock = threading.Lock() + +_text_encode_lock = threading.Lock() +_image_encode_lock = threading.Lock() + + +def _get_text_model(): + global _text_model + if _text_model is None: + with _text_init_lock: + if _text_model is None: + _text_model = BgeTextModel(model_dir=CONFIG.TEXT_MODEL_DIR) + return _text_model + + +def _get_image_model(): + global _image_model + if _image_model is None: + with _image_init_lock: + if _image_model is None: + _image_model = ClipImageModel( + model_name=CONFIG.IMAGE_MODEL_NAME, + device=CONFIG.IMAGE_DEVICE, + ) + return _image_model + + +def _as_list(embedding: Optional[np.ndarray]) -> Optional[List[float]]: + if embedding is None: + return None + if not isinstance(embedding, np.ndarray): + embedding = np.array(embedding, dtype=np.float32) + if embedding.ndim != 1: + embedding = embedding.reshape(-1) + return embedding.astype(np.float32).tolist() + + +@app.get("/health") +def health() -> Dict[str, Any]: + return {"status": "ok"} + + +@app.post("/embed/text") +def embed_text(texts: List[str]) -> List[Optional[List[float]]]: + model = _get_text_model() + out: List[Optional[List[float]]] = [None] * len(texts) + + indexed_texts: List[tuple] = [] + for i, t in enumerate(texts): + if t is None: + continue + if not isinstance(t, str): + t = str(t) + t = t.strip() + if not t: + continue + indexed_texts.append((i, t)) + + if not indexed_texts: + return out 
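    # Encode every non-empty text in one batched call. Indexes skipped above
    # (None / empty inputs) and any failure below stay None, keeping the
    # response aligned 1:1 with the request.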
+ + batch_texts = [t for _, t in indexed_texts] + try: + with _text_encode_lock: + embs = model.encode_batch( + batch_texts, batch_size=int(CONFIG.TEXT_BATCH_SIZE), device=CONFIG.TEXT_DEVICE + ) + for j, (idx, _t) in enumerate(indexed_texts): + out[idx] = _as_list(embs[j]) + except Exception: + # keep Nones + pass + return out + + +@app.post("/embed/image") +def embed_image(images: List[str]) -> List[Optional[List[float]]]: + model = _get_image_model() + out: List[Optional[List[float]]] = [None] * len(images) + + with _image_encode_lock: + for i, url_or_path in enumerate(images): + try: + if url_or_path is None: + continue + if not isinstance(url_or_path, str): + url_or_path = str(url_or_path) + url_or_path = url_or_path.strip() + if not url_or_path: + continue + emb = model.encode_image_from_url(url_or_path) + out[i] = _as_list(emb) + except Exception: + out[i] = None + return out + + diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py index c4ff1b3..3cc1158 100644 --- a/embeddings/text_encoder.py +++ b/embeddings/text_encoder.py @@ -1,7 +1,7 @@ """ Text embedding encoder using network service. -Generates embeddings via HTTP API service running on localhost:5001. +Generates embeddings via HTTP API service (default localhost:6005). """ import sys @@ -11,6 +11,7 @@ import threading import numpy as np import pickle import redis +import os from datetime import timedelta from typing import List, Union, Dict, Any, Optional import logging @@ -33,13 +34,14 @@ class BgeEncoder: _instance = None _lock = threading.Lock() - def __new__(cls, service_url='http://localhost:5001'): + def __new__(cls, service_url: Optional[str] = None): with cls._lock: if cls._instance is None: cls._instance = super(BgeEncoder, cls).__new__(cls) - logger.info(f"Creating BgeEncoder instance with service URL: {service_url}") - cls._instance.service_url = service_url - cls._instance.endpoint = f"{service_url}/embedding/generate_embeddings" + resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL", "http://localhost:6005") + logger.info(f"Creating BgeEncoder instance with service URL: {resolved_url}") + cls._instance.service_url = resolved_url + cls._instance.endpoint = f"{resolved_url}/embed/text" # Initialize Redis cache try: @@ -62,15 +64,15 @@ class BgeEncoder: cls._instance.redis_client = None return cls._instance - def _call_service(self, request_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + def _call_service(self, request_data: List[str]) -> List[Any]: """ Call the embedding service API. 
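
        Implementation note: the texts are POSTed as a plain JSON array to
        `{service_url}/embed/text`; the response is an index-aligned array of
        vectors (or null for failed items), so no request IDs are involved.
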
Args: - request_data: List of dictionaries with id and text fields + request_data: List of texts Returns: - List of dictionaries with id and embedding fields + List of embeddings (list[float]) or nulls (None), aligned to input order """ try: response = requests.post( @@ -126,19 +128,7 @@ class BgeEncoder: uncached_texts.append(text) # Prepare request data for uncached texts (after cache check) - request_data = [] - for i, text in enumerate(uncached_texts): - request_item = { - "id": str(uncached_indices[i]), - "name_zh": text - } - - # Add English and Russian fields as empty for now - # Could be enhanced with language detection in the future - request_item["name_en"] = None - request_item["name_ru"] = None - - request_data.append(request_item) + request_data = list(uncached_texts) # If there are uncached texts, call service if uncached_texts: @@ -149,43 +139,27 @@ class BgeEncoder: # Process response for i, text in enumerate(uncached_texts): original_idx = uncached_indices[i] - # Find corresponding response by ID - response_item = None - for item in response_data: - if str(item.get("id")) == str(original_idx): - response_item = item - break - - if response_item: - # Try Chinese embedding first, then English, then Russian + if response_data and i < len(response_data): + embedding = response_data[i] + else: embedding = None - for lang in ["embedding_zh", "embedding_en", "embedding_ru"]: - if lang in response_item and response_item[lang] is not None: - embedding = response_item[lang] - break - if embedding is not None: - embedding_array = np.array(embedding, dtype=np.float32) - # Validate embedding from service - if invalid, treat as no result - if self._is_valid_embedding(embedding_array): - embeddings[original_idx] = embedding_array - # Cache the embedding - self._set_cached_embedding(text, 'en', embedding_array) - else: - logger.warning( - f"Invalid embedding returned from service for text {original_idx} " - f"(contains NaN/Inf or invalid shape), treating as no result. " - f"Text preview: {text[:50]}..." - ) - # 不生成兜底向量,保持为 None - embeddings[original_idx] = None + if embedding is not None: + embedding_array = np.array(embedding, dtype=np.float32) + # Validate embedding from service - if invalid, treat as no result + if self._is_valid_embedding(embedding_array): + embeddings[original_idx] = embedding_array + # Cache the embedding + self._set_cached_embedding(text, 'en', embedding_array) else: - logger.warning(f"No embedding found for text {original_idx}: {text[:50]}...") - # 不生成兜底向量,保持为 None + logger.warning( + f"Invalid embedding returned from service for text {original_idx} " + f"(contains NaN/Inf or invalid shape), treating as no result. " + f"Text preview: {text[:50]}..." + ) embeddings[original_idx] = None else: - logger.warning(f"No response found for text {original_idx}") - # 不生成兜底向量,保持为 None + logger.warning(f"No embedding found for text {original_idx}: {text[:50]}...") embeddings[original_idx] = None except Exception as e: diff --git a/embeddings/text_encoder__local.py b/embeddings/text_encoder__local.py deleted file mode 100644 index d2a893c..0000000 --- a/embeddings/text_encoder__local.py +++ /dev/null @@ -1,124 +0,0 @@ -""" -Text embedding encoder using BGE-M3 model. - -Generates 1024-dimensional vectors for text using the BGE-M3 multilingual model. 
-""" - -import sys -import torch -from sentence_transformers import SentenceTransformer -import time -import threading -from modelscope import snapshot_download -from transformers import AutoModel -import os -import numpy as np -from typing import List, Union - - -class BgeEncoder: - """ - Singleton text encoder using BGE-M3 model. - - Thread-safe singleton pattern ensures only one model instance exists. - """ - _instance = None - _lock = threading.Lock() - - def __new__(cls, model_dir='Xorbits/bge-m3'): - with cls._lock: - if cls._instance is None: - cls._instance = super(BgeEncoder, cls).__new__(cls) - print(f"[BgeEncoder] Creating a new instance with model directory: {model_dir}") - cls._instance.model = SentenceTransformer(snapshot_download(model_dir)) - print("[BgeEncoder] New instance has been created") - return cls._instance - - def encode( - self, - sentences: Union[str, List[str]], - normalize_embeddings: bool = True, - device: str = 'cuda', - batch_size: int = 32 - ) -> np.ndarray: - """ - Encode text into embeddings. - - Args: - sentences: Single string or list of strings to encode - normalize_embeddings: Whether to normalize embeddings - device: Device to use ('cuda' or 'cpu') - batch_size: Batch size for encoding - - Returns: - numpy array of shape (n, 1024) containing embeddings - """ - # Move model to specified device - if device == 'gpu': - device = 'cuda' - - # Try requested device, fallback to CPU if CUDA fails - try: - if device == 'cuda': - # Check CUDA memory first - import torch - if torch.cuda.is_available(): - # Check if we have enough memory (at least 1GB free) - free_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated() - if free_memory < 1024 * 1024 * 1024: # 1GB - print(f"[BgeEncoder] CUDA memory insufficient ({free_memory/1024/1024:.1f}MB free), falling back to CPU") - device = 'cpu' - else: - print(f"[BgeEncoder] CUDA not available, using CPU") - device = 'cpu' - - self.model = self.model.to(device) - - embeddings = self.model.encode( - sentences, - normalize_embeddings=normalize_embeddings, - device=device, - show_progress_bar=False, - batch_size=batch_size - ) - - return embeddings - - except Exception as e: - print(f"[BgeEncoder] Device {device} failed: {e}") - if device != 'cpu': - print(f"[BgeEncoder] Falling back to CPU") - try: - self.model = self.model.to('cpu') - embeddings = self.model.encode( - sentences, - normalize_embeddings=normalize_embeddings, - device='cpu', - show_progress_bar=False, - batch_size=batch_size - ) - return embeddings - except Exception as e2: - print(f"[BgeEncoder] CPU also failed: {e2}") - raise - else: - raise - - def encode_batch( - self, - texts: List[str], - batch_size: int = 32, - device: str = 'cuda' - ) -> np.ndarray: - """ - Encode a batch of texts efficiently. - - Args: - texts: List of texts to encode - batch_size: Batch size for processing - device: Device to use - - Returns: - numpy array of embeddings - """ - return self.encode(texts, batch_size=batch_size, device=device) diff --git a/scripts/start_embedding_service.sh b/scripts/start_embedding_service.sh new file mode 100755 index 0000000..117b1b1 --- /dev/null +++ b/scripts/start_embedding_service.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# +# Start Local Embedding Service +# +# This service exposes: +# - POST /embed/text +# - POST /embed/image +# +# Defaults are defined in `embeddings/config.py` +# +set -e + +cd "$(dirname "$0")/.." 
+
+# Load conda env if available (keep consistent with other scripts)
+if [ -f "/home/tw/miniconda3/etc/profile.d/conda.sh" ]; then
+    source /home/tw/miniconda3/etc/profile.d/conda.sh
+    conda activate searchengine
+fi
+
+EMBEDDING_SERVICE_HOST=$(python -c "from embeddings.config import CONFIG; print(CONFIG.HOST)")
+EMBEDDING_SERVICE_PORT=$(python -c "from embeddings.config import CONFIG; print(CONFIG.PORT)")
+
+echo "========================================"
+echo "Starting Local Embedding Service"
+echo "========================================"
+echo "Host: ${EMBEDDING_SERVICE_HOST}"
+echo "Port: ${EMBEDDING_SERVICE_PORT}"
+echo
+echo "Tips:"
+echo " - Use a single worker (GPU models cannot be safely duplicated across workers)."
+echo " - Clients can set EMBEDDING_SERVICE_URL=http://localhost:${EMBEDDING_SERVICE_PORT}"
+echo
+
+exec python -m uvicorn embeddings.server:app \
+    --host "${EMBEDDING_SERVICE_HOST}" \
+    --port "${EMBEDDING_SERVICE_PORT}" \
+    --workers 1