From 775168412f4c1d83386da3d642ff77bd74ca645a Mon Sep 17 00:00:00 2001
From: tangwang
Date: Tue, 17 Mar 2026 14:05:05 +0800
Subject: [PATCH] tidy embeddings

---
 docs/DEVELOPER_GUIDE.md                            |   2 +-
 docs/MySQL到ES文档映射说明.md                      |   4 ++--
 embeddings/README.md                               |   2 +-
 embeddings/qwen3_model.py                          |  75 ---------------------------------------------------------------------------
 embeddings/server.py                               |   8 ++++----
 embeddings/tei_model.py                            | 126 ------------------------------------------------------------------------------------------------------------------------------
 embeddings/text_embedding_sentence_transformers.py |  63 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 embeddings/text_embedding_tei.py                   | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 embeddings/text_encoder.py                         |  27 +--------------------------
 indexer/incremental_service.py                     |   2 +-
 indexer/product_enrich.py                          |  28 ++++++++++++++++++++++++++++
 11 files changed, 220 insertions(+), 236 deletions(-)
 delete mode 100644 embeddings/qwen3_model.py
 delete mode 100644 embeddings/tei_model.py
 create mode 100644 embeddings/text_embedding_sentence_transformers.py
 create mode 100644 embeddings/text_embedding_tei.py

diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md
index c02103d..67166ee 100644
--- a/docs/DEVELOPER_GUIDE.md
+++ b/docs/DEVELOPER_GUIDE.md
@@ -315,7 +315,7 @@ services:
 
 **Rerank backend protocol (in-service)**: every backend loaded inside the reranker service must implement `score_with_meta(query, docs, normalize=True) -> (scores: List[float], meta: dict)`. The returned `scores[i]` corresponds one-to-one with `docs[i]`; `meta` contains at least `input_docs`, `usable_docs`, and `elapsed_ms`. The external HTTP contract is fixed: `POST /rerank` takes a request body of `{ "query": str, "docs": [str] }` and returns `{ "scores": [float], "meta": object }`; `GET /health` returns `status`, `model`, `backend`, and so on.
 
-**Embedding backend protocol (in-service)**: text backends must support `encode_batch(texts, batch_size, device) -> List[ndarray]`, aligned one-to-one with `texts`; image backends implement `embeddings/protocols.ImageEncoderProtocol`: `encode_image_urls(urls, batch_size) -> List[Optional[ndarray]]`, the same length as `urls`.
+**Embedding backend protocol (in-service)**: text backends must support `encode(sentences: Union[str, List[str]], batch_size, device) -> ndarray | List[ndarray]`; single and batched inputs are handled uniformly through this one interface. Image backends implement `embeddings/protocols.ImageEncoderProtocol`: `encode_image_urls(urls, batch_size) -> List[Optional[ndarray]]`, the same length as `urls`.
 
 **Config quick reference**:
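To make the unified text-backend contract above concrete, here is a minimal typing sketch of it (the `TextEncoderProtocol` name is hypothetical — the repo only defines `ImageEncoderProtocol` in `embeddings/protocols.py`; the signature mirrors the backends in this patch):

```python
from typing import List, Protocol, Union
import numpy as np

class TextEncoderProtocol(Protocol):
    """Hypothetical typing sketch of the text-backend contract above."""

    def encode(
        self,
        sentences: Union[str, List[str]],
        normalize_embeddings: bool = True,
        device: str = "cuda",
        batch_size: int = 32,
    ) -> np.ndarray:
        """Return one vector per input sentence, aligned by index."""
        ...
```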
diff --git a/docs/MySQL到ES文档映射说明.md b/docs/MySQL到ES文档映射说明.md
index 31b530f..ff4cae2 100644
--- a/docs/MySQL到ES文档映射说明.md
+++ b/docs/MySQL到ES文档映射说明.md
@@ -679,7 +679,7 @@ if enable_embedding and encoder and documents:
             title_doc_indices.append(i)
 
     if title_texts:
-        embeddings = encoder.encode_batch(title_texts, batch_size=32)
+        embeddings = encoder.encode(title_texts, batch_size=32)
         for j, emb in enumerate(embeddings):
             doc_idx = title_doc_indices[j]
             if isinstance(emb, np.ndarray):
@@ -731,7 +731,7 @@ if enable_embedding and encoder and documents:
 7. **Batch-generate embeddings** (if enabled)
    - Collect the title texts of all documents
-   - Generate embeddings with a batch call to `encoder.encode_batch()`
+   - Generate embeddings with a batch call to `encoder.encode()` (passing a list[str])
    - Fill the results back into the corresponding documents
 
 8. **Bulk-write to ES**

diff --git a/embeddings/README.md b/embeddings/README.md
index 730f515..934ac3d 100644
--- a/embeddings/README.md
+++ b/embeddings/README.md
@@ -10,7 +10,7 @@
 This directory is a complete "embedding module" that contains:
 
 - **HTTP clients**: `text_encoder.py` / `image_encoder.py` (called by the search/indexing modules)
-- **Local model implementations**: `qwen3_model.py` / `clip_model.py`
+- **Local model implementations**: `text_embedding_sentence_transformers.py` / `clip_model.py`
 - **clip-as-service client**: `clip_as_service_encoder.py` (image vectors, recommended)
 - **Embedding service (FastAPI)**: `server.py`
 - **Unified configuration**: `config.py`
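For context, the fill-back step both doc hunks describe, as a self-contained sketch (the `title_embedding` field name is illustrative; `encoder` is any object satisfying the protocol above):

```python
import numpy as np

def attach_title_embeddings(documents, title_texts, title_doc_indices, encoder):
    """Batch-embed titles and write the vectors back onto their documents."""
    embeddings = encoder.encode(title_texts, batch_size=32)
    for j, emb in enumerate(embeddings):
        doc_idx = title_doc_indices[j]
        if isinstance(emb, np.ndarray):
            # ES dense_vector fields take plain float lists, not ndarrays.
            documents[doc_idx]["title_embedding"] = emb.astype(np.float32).tolist()
```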
diff --git a/embeddings/qwen3_model.py b/embeddings/qwen3_model.py
deleted file mode 100644
index 9e858f3..0000000
--- a/embeddings/qwen3_model.py
+++ /dev/null
@@ -1,75 +0,0 @@
-"""
-Qwen3-Embedding-0.6B local text embedding implementation.
-
-Internal model implementation used by the embedding service.
-"""
-
-import threading
-from typing import List, Union
-
-import numpy as np
-from sentence_transformers import SentenceTransformer
-import torch
-
-
-class Qwen3TextModel(object):
-    """
-    Thread-safe singleton text encoder using Qwen3-Embedding-0.6B (local inference).
-    """
-
-    _instance = None
-    _lock = threading.Lock()
-
-    def __new__(cls, model_id: str = "Qwen/Qwen3-Embedding-0.6B"):
-        with cls._lock:
-            if cls._instance is None:
-                cls._instance = super(Qwen3TextModel, cls).__new__(cls)
-                cls._instance.model = SentenceTransformer(model_id, trust_remote_code=True)
-                cls._instance._current_device = None
-                cls._instance._encode_lock = threading.Lock()
-        return cls._instance
-
-    def _ensure_device(self, device: str) -> str:
-        target = (device or "cpu").strip().lower()
-        if target == "gpu":
-            target = "cuda"
-        if target == "cuda" and not torch.cuda.is_available():
-            target = "cpu"
-        if target != self._current_device:
-            self.model = self.model.to(target)
-            self._current_device = target
-        return target
-
-    def encode(
-        self,
-        sentences: Union[str, List[str]],
-        normalize_embeddings: bool = True,
-        device: str = "cuda",
-        batch_size: int = 32,
-    ) -> np.ndarray:
-        # SentenceTransformer + CUDA inference is not thread-safe in our usage;
-        # keep one in-flight encode call while avoiding repeated .to(device) hops.
-        with self._encode_lock:
-            run_device = self._ensure_device(device)
-            embeddings = self.model.encode(
-                sentences,
-                normalize_embeddings=normalize_embeddings,
-                device=run_device,
-                show_progress_bar=False,
-                batch_size=batch_size,
-            )
-        return embeddings
-
-    def encode_batch(
-        self,
-        texts: List[str],
-        batch_size: int = 32,
-        device: str = "cuda",
-        normalize_embeddings: bool = True,
-    ) -> np.ndarray:
-        return self.encode(
-            texts,
-            batch_size=batch_size,
-            device=device,
-            normalize_embeddings=normalize_embeddings,
-        )

diff --git a/embeddings/server.py b/embeddings/server.py
index 4aafcd8..9aa8579 100644
--- a/embeddings/server.py
+++ b/embeddings/server.py
@@ -83,7 +83,7 @@ def _preview_inputs(items: List[str], max_items: int, max_chars: int) -> List[Di
 
 def _encode_local_st(texts: List[str], normalize_embeddings: bool) -> Any:
     with _text_encode_lock:
-        return _text_model.encode_batch(
+        return _text_model.encode(
             texts,
             batch_size=int(CONFIG.TEXT_BATCH_SIZE),
             device=CONFIG.TEXT_DEVICE,
@@ -198,7 +198,7 @@ def load_models():
     backend_name, backend_cfg = get_embedding_backend_config()
     _text_backend_name = backend_name
     if backend_name == "tei":
-        from embeddings.tei_model import TEITextModel
+        from embeddings.text_embedding_tei import TEITextModel
 
         base_url = (
             os.getenv("TEI_BASE_URL")
@@ -216,7 +216,7 @@ def load_models():
             timeout_sec=timeout_sec,
         )
     elif backend_name == "local_st":
-        from embeddings.qwen3_model import Qwen3TextModel
+        from embeddings.text_embedding_sentence_transformers import Qwen3TextModel
 
         model_id = (
             os.getenv("TEXT_MODEL_ID")
@@ -342,7 +342,7 @@ def embed_text(texts: List[str], normalize: Optional[bool] = None) -> List[Optio
             return out
         embs = _encode_local_st(normalized, normalize_embeddings=False)
     else:
-        embs = _text_model.encode_batch(
+        embs = _text_model.encode(
             normalized,
             batch_size=int(CONFIG.TEXT_BATCH_SIZE),
             device=CONFIG.TEXT_DEVICE,
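Since both backends now expose the same `encode()` entry point, backend choice stays a construction-time concern. A simplified sketch of the dispatch in `load_models()` (the `EMBEDDING_BACKEND` variable and fallback defaults are illustrative; the real code reads `get_embedding_backend_config()`):

```python
import os

def build_text_backend():
    """Construct a text backend the same way load_models() selects one (sketch)."""
    backend_name = os.getenv("EMBEDDING_BACKEND", "local_st")  # illustrative env var
    if backend_name == "tei":
        from embeddings.text_embedding_tei import TEITextModel
        return TEITextModel(base_url=os.getenv("TEI_BASE_URL", "http://localhost:8080"))
    from embeddings.text_embedding_sentence_transformers import Qwen3TextModel
    return Qwen3TextModel(model_id=os.getenv("TEXT_MODEL_ID", "Qwen/Qwen3-Embedding-0.6B"))
```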
diff --git a/embeddings/tei_model.py b/embeddings/tei_model.py
deleted file mode 100644
index 529445a..0000000
--- a/embeddings/tei_model.py
+++ /dev/null
@@ -1,126 +0,0 @@
-"""TEI text embedding backend client."""
-
-from __future__ import annotations
-
-from typing import Any, List, Union
-
-import numpy as np
-import requests
-
-
-class TEITextModel:
-    """
-    Text embedding backend implemented via Hugging Face TEI HTTP API.
-
-    Expected TEI endpoint:
-        POST {base_url}/embed
-        body: {"inputs": ["text1", "text2", ...]}
-        response: [[...], [...], ...]
-    """
-
-    def __init__(self, base_url: str, timeout_sec: int = 60):
-        if not base_url or not str(base_url).strip():
-            raise ValueError("TEI base_url must not be empty")
-        self.base_url = str(base_url).rstrip("/")
-        self.endpoint = f"{self.base_url}/embed"
-        self.timeout_sec = int(timeout_sec)
-        self._health_check()
-
-    def _health_check(self) -> None:
-        health_url = f"{self.base_url}/health"
-        response = requests.get(health_url, timeout=5)
-        response.raise_for_status()
-        # Probe one tiny embedding at startup so runtime requests do not fail later
-        # with opaque "Invalid TEI embedding" errors.
-        probe_resp = requests.post(
-            self.endpoint,
-            json={"inputs": ["health check"]},
-            timeout=min(self.timeout_sec, 15),
-        )
-        probe_resp.raise_for_status()
-        self._parse_payload(probe_resp.json(), expected_len=1)
-
-    @staticmethod
-    def _normalize(embedding: np.ndarray) -> np.ndarray:
-        norm = np.linalg.norm(embedding)
-        if norm <= 0:
-            raise RuntimeError("TEI returned zero-norm embedding")
-        return embedding / norm
-
-    def encode(
-        self,
-        sentences: Union[str, List[str]],
-        normalize_embeddings: bool = True,
-        device: str = "cuda",
-        batch_size: int = 32,
-    ) -> np.ndarray:
-        if isinstance(sentences, str):
-            sentences = [sentences]
-        return self.encode_batch(
-            texts=sentences,
-            batch_size=batch_size,
-            device=device,
-            normalize_embeddings=normalize_embeddings,
-        )
-
-    def encode_batch(
-        self,
-        texts: List[str],
-        batch_size: int = 32,
-        device: str = "cuda",
-        normalize_embeddings: bool = True,
-    ) -> np.ndarray:
-        del batch_size  # TEI performs its own batching.
-        del device  # Not used by HTTP backend.
-
-        if texts is None or len(texts) == 0:
-            return np.array([], dtype=object)
-        for i, t in enumerate(texts):
-            if not isinstance(t, str) or not t.strip():
-                raise ValueError(f"Invalid input text at index {i}: {t!r}")
-
-        response = requests.post(
-            self.endpoint,
-            json={"inputs": texts},
-            timeout=self.timeout_sec,
-        )
-        response.raise_for_status()
-        payload = response.json()
-        vectors = self._parse_payload(payload, expected_len=len(texts))
-        if normalize_embeddings:
-            vectors = [self._normalize(vec) for vec in vectors]
-        return np.array(vectors, dtype=object)
-
-    def _parse_payload(self, payload: Any, expected_len: int) -> List[np.ndarray]:
-        if not isinstance(payload, list) or len(payload) != expected_len:
-            got = 0 if payload is None else (len(payload) if isinstance(payload, list) else "non-list")
-            raise RuntimeError(
-                f"TEI response length mismatch: expected {expected_len}, got {got}. "
-                f"Response type={type(payload).__name__}"
-            )
-
-        vectors: List[np.ndarray] = []
-        for i, item in enumerate(payload):
-            emb = item.get("embedding") if isinstance(item, dict) else item
-            try:
-                vec = np.asarray(emb, dtype=np.float32)
-            except (TypeError, ValueError) as exc:
-                raise RuntimeError(
-                    f"Invalid TEI embedding at index {i}: cannot convert to float array "
-                    f"(item_type={type(item).__name__})"
-                ) from exc
-
-            if vec.ndim != 1 or vec.size == 0:
-                raise RuntimeError(
-                    f"Invalid TEI embedding at index {i}: shape={vec.shape}, size={vec.size}"
-                )
-            if not np.isfinite(vec).all():
-                preview = vec[:8].tolist()
-                raise RuntimeError(
-                    f"Invalid TEI embedding at index {i}: contains non-finite values, "
-                    f"preview={preview}. This often indicates TEI backend/model runtime issues "
-                    f"(for example an incompatible dtype or model config)."
-                )
-            vectors.append(vec)
-        return vectors
-

diff --git a/embeddings/text_embedding_sentence_transformers.py b/embeddings/text_embedding_sentence_transformers.py
new file mode 100644
index 0000000..c3e23a6
--- /dev/null
+++ b/embeddings/text_embedding_sentence_transformers.py
@@ -0,0 +1,63 @@
+"""
+Qwen3-Embedding-0.6B local text embedding implementation.
+
+Internal model implementation used by the embedding service.
+"""
+
+import threading
+from typing import List, Union
+
+import numpy as np
+from sentence_transformers import SentenceTransformer
+import torch
+
+
+class Qwen3TextModel(object):
+    """
+    Thread-safe singleton text encoder using Qwen3-Embedding-0.6B (local inference).
+    """
+
+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls, model_id: str = "Qwen/Qwen3-Embedding-0.6B"):
+        with cls._lock:
+            if cls._instance is None:
+                cls._instance = super(Qwen3TextModel, cls).__new__(cls)
+                cls._instance.model = SentenceTransformer(model_id, trust_remote_code=True)
+                cls._instance._current_device = None
+                cls._instance._encode_lock = threading.Lock()
+        return cls._instance
+
+    def _ensure_device(self, device: str) -> str:
+        target = (device or "cpu").strip().lower()
+        if target == "gpu":
+            target = "cuda"
+        if target == "cuda" and not torch.cuda.is_available():
+            target = "cpu"
+        if target != self._current_device:
+            self.model = self.model.to(target)
+            self._current_device = target
+        return target
+
+    def encode(
+        self,
+        sentences: Union[str, List[str]],
+        normalize_embeddings: bool = True,
+        device: str = "cuda",
+        batch_size: int = 32,
+    ) -> np.ndarray:
+
+        # SentenceTransformer + CUDA inference is not thread-safe in our usage;
+        # keep one in-flight encode call while avoiding repeated .to(device) hops.
+        with self._encode_lock:
+            run_device = self._ensure_device(device)
+            embeddings = self.model.encode(
+                sentences,
+                normalize_embeddings=normalize_embeddings,
+                device=run_device,
+                show_progress_bar=False,
+                batch_size=batch_size,
+            )
+        return embeddings
+
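A short usage sketch for the relocated local backend (illustrative; assumes the Qwen3 weights are available locally or via the Hugging Face hub):

```python
import numpy as np
from embeddings.text_embedding_sentence_transformers import Qwen3TextModel

model = Qwen3TextModel()            # singleton: a second call returns the same object
assert model is Qwen3TextModel()

# Embeddings are L2-normalized by default, so cosine similarity is a plain dot
# product; device="cuda" silently falls back to CPU when no GPU is available.
embs = model.encode(["red floral dress", "khaki cargo pants"], device="cuda")
print(embs.shape, float(np.dot(embs[0], embs[1])))
```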
+ """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls, model_id: str = "Qwen/Qwen3-Embedding-0.6B"): + with cls._lock: + if cls._instance is None: + cls._instance = super(Qwen3TextModel, cls).__new__(cls) + cls._instance.model = SentenceTransformer(model_id, trust_remote_code=True) + cls._instance._current_device = None + cls._instance._encode_lock = threading.Lock() + return cls._instance + + def _ensure_device(self, device: str) -> str: + target = (device or "cpu").strip().lower() + if target == "gpu": + target = "cuda" + if target == "cuda" and not torch.cuda.is_available(): + target = "cpu" + if target != self._current_device: + self.model = self.model.to(target) + self._current_device = target + return target + + def encode( + self, + sentences: Union[str, List[str]], + normalize_embeddings: bool = True, + device: str = "cuda", + batch_size: int = 32, + ) -> np.ndarray: + + # SentenceTransformer + CUDA inference is not thread-safe in our usage; + # keep one in-flight encode call while avoiding repeated .to(device) hops. + with self._encode_lock: + run_device = self._ensure_device(device) + embeddings = self.model.encode( + sentences, + normalize_embeddings=normalize_embeddings, + device=run_device, + show_progress_bar=False, + batch_size=batch_size, + ) + return embeddings + diff --git a/embeddings/text_embedding_tei.py b/embeddings/text_embedding_tei.py new file mode 100644 index 0000000..908fac9 --- /dev/null +++ b/embeddings/text_embedding_tei.py @@ -0,0 +1,119 @@ +"""TEI text embedding backend client.""" + +from __future__ import annotations + +from typing import Any, List, Union + +import numpy as np +import requests + + +class TEITextModel: + """ + Text embedding backend implemented via Hugging Face TEI HTTP API. + + Expected TEI endpoint: + POST {base_url}/embed + body: {"inputs": ["text1", "text2", ...]} + response: [[...], [...], ...] + """ + + def __init__(self, base_url: str, timeout_sec: int = 60): + if not base_url or not str(base_url).strip(): + raise ValueError("TEI base_url must not be empty") + self.base_url = str(base_url).rstrip("/") + self.endpoint = f"{self.base_url}/embed" + self.timeout_sec = int(timeout_sec) + self._health_check() + + def _health_check(self) -> None: + health_url = f"{self.base_url}/health" + response = requests.get(health_url, timeout=5) + response.raise_for_status() + # Probe one tiny embedding at startup so runtime requests do not fail later + # with opaque "Invalid TEI embedding" errors. + probe_resp = requests.post( + self.endpoint, + json={"inputs": ["health check"]}, + timeout=min(self.timeout_sec, 15), + ) + probe_resp.raise_for_status() + self._parse_payload(probe_resp.json(), expected_len=1) + + @staticmethod + def _normalize(embedding: np.ndarray) -> np.ndarray: + norm = np.linalg.norm(embedding) + if norm <= 0: + raise RuntimeError("TEI returned zero-norm embedding") + return embedding / norm + + def encode( + self, + sentences: Union[str, List[str]], + normalize_embeddings: bool = True, + device: str = "cuda", + batch_size: int = 32, + ) -> np.ndarray: + """ + Encode a single sentence or a list of sentences. 
diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py
index 04004c9..ee54a46 100644
--- a/embeddings/text_encoder.py
+++ b/embeddings/text_encoder.py
@@ -135,33 +135,8 @@ class TextEmbeddingEncoder:
             else:
                 raise ValueError(f"No embedding found for text index {original_idx}: {text[:50]}...")
 
-        # Return a numpy array (dtype=object); elements are np.ndarray or None
+        # Return a numpy array (dtype=object); every element is a valid np.ndarray vector
         return np.array(embeddings, dtype=object)
-
-    def encode_batch(
-        self,
-        texts: List[str],
-        batch_size: int = 32,
-        device: str = 'cpu',
-        normalize_embeddings: bool = True,
-    ) -> np.ndarray:
-        """
-        Encode a batch of texts efficiently via network service.
-
-        Args:
-            texts: List of texts to encode
-            batch_size: Batch size for processing
-            device: Device parameter ignored for service compatibility
-
-        Returns:
-            numpy array of embeddings
-        """
-        return self.encode(
-            texts,
-            batch_size=batch_size,
-            device=device,
-            normalize_embeddings=normalize_embeddings,
-        )
 
     def _is_valid_embedding(self, embedding: np.ndarray) -> bool:
         """
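Call sites migrate one-for-one; a hedged sketch of a hypothetical caller, reflecting the new failure mode documented in the comment change above:

```python
from typing import List, Optional
import numpy as np

def embed_titles_or_none(encoder, title_texts: List[str]) -> Optional[np.ndarray]:
    """Migrated call site: encode() replaces encode_batch() with identical arguments."""
    try:
        # Elements are now guaranteed to be valid np.ndarray vectors; a missing
        # embedding raises ValueError instead of surfacing as a None element.
        return encoder.encode(title_texts, batch_size=32)
    except ValueError:
        return None
```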
diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py
index 257403e..a00d880 100644
--- a/indexer/incremental_service.py
+++ b/indexer/incremental_service.py
@@ -641,7 +641,7 @@ class IncrementalIndexerService:
                     title_doc_indices.append(i)
 
             if title_texts:
-                embeddings = encoder.encode_batch(title_texts, batch_size=32)
+                embeddings = encoder.encode(title_texts, batch_size=32)
                 if embeddings is None or len(embeddings) != len(title_texts):
                     raise RuntimeError(
                         f"[IncrementalIndexing] Batch embedding length mismatch for tenant_id={tenant_id}: "

diff --git a/indexer/product_enrich.py b/indexer/product_enrich.py
index 1e14d44..b046a56 100644
--- a/indexer/product_enrich.py
+++ b/indexer/product_enrich.py
@@ -96,6 +96,9 @@ except Exception as e:
     logger.warning(f"Failed to initialize Redis for anchors cache: {e}")
     _anchor_redis = None
 
+# Chinese version of the system prompt (do not delete):
+# "你是一名电商平台的商品标注员,你的工作是对输入的每个商品进行理解、分析和标注,"
+# "并按要求格式返回 Markdown 表格。所有输出内容必须为中文。"
 
 SYSTEM_MESSAGES = (
     "You are a product annotator for an e-commerce platform. "
@@ -163,6 +166,31 @@ def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> st
     """
     lang_name = SOURCE_LANG_CODE_MAP.get(target_lang, target_lang)
 
+# Chinese version of the prompt (do not delete)
+# prompt = """请对输入的每条商品标题,分析并提取以下信息:
+
+# 1. 商品标题:将输入商品名称翻译为自然、完整的中文商品标题
+# 2. 品类路径:从大类到细分品类,用">"分隔(例如:服装>女装>裤子>工装裤)
+# 3. 细分标签:商品的风格、特点、功能等(例如:碎花,收腰,法式)
+# 4. 适用人群:性别/年龄段等(例如:年轻女性)
+# 5. 使用场景
+# 6. 适用季节
+# 7. 关键属性
+# 8. 材质说明
+# 9. 功能特点
+# 10. 商品卖点:分析和提取一句话核心卖点,用于推荐理由
+# 11. 锚文本:生成一组能够代表该商品、并可能被用户用于搜索的词语或短语。这些词语应覆盖用户需求的各个维度,如品类、细分标签、功能特性、需求场景等等。
+
+# 输入商品列表:
+
+# """
+# prompt_tail = """
+# 请严格按照以下markdown表格格式返回,每列内部的多值内容都用逗号分隔,不要添加任何其他说明:
+
+# | 序号 | 商品标题 | 品类路径 | 细分标签 | 适用人群 | 使用场景 | 适用季节 | 关键属性 | 材质说明 | 功能特点 | 商品卖点 | 锚文本 |
+# |----|----|----|----|----|----|----|----|----|----|----|----|
+# """
+
     prompt = """Please analyze each input product title and extract the following information:
 
 1. Product title: a natural English product name derived from the input title
-- 
libgit2 0.21.2