From 7a013ca7b2030bf2b14b92eeb9f5282fea304300 Mon Sep 17 00:00:00 2001 From: tangwang Date: Thu, 26 Mar 2026 20:46:24 +0800 Subject: [PATCH] 多模态文本向量服务ok --- docs/CNCLIP_SERVICE说明文档.md | 18 +++++++++++++++--- docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md | 43 ++++++++++++++++++++++++++++++++++++------- embeddings/README.md | 10 ++++++---- embeddings/cache_keys.py | 11 +++++++++-- embeddings/clip_as_service_encoder.py | 38 ++++++++++++++++++++++++++++++++++++++ embeddings/clip_model.py | 25 +++++++++++++++++++++++++ embeddings/image_encoder.py | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- embeddings/protocols.py | 14 ++++++++++++++ embeddings/server.py | 345 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------------------------------------------ scripts/es_debug_search.py | 40 ++++++---------------------------------- scripts/service_ctl.sh | 2 +- tests/ci/test_service_api_contracts.py | 15 +++++++++++++++ 12 files changed, 493 insertions(+), 179 deletions(-) diff --git a/docs/CNCLIP_SERVICE说明文档.md b/docs/CNCLIP_SERVICE说明文档.md index 5629688..826318b 100644 --- a/docs/CNCLIP_SERVICE说明文档.md +++ b/docs/CNCLIP_SERVICE说明文档.md @@ -4,9 +4,11 @@ ## 1. 作用与边界 -- `cnclip` 是独立 gRPC 服务(默认 `grpc://127.0.0.1:51000`)。 -- 图片 embedding 服务(默认 `6008`)在 `USE_CLIP_AS_SERVICE=true` 时调用它完成 `/embed/image`。 -- `cnclip` 不负责文本向量;文本向量由 TEI(8080)负责。 +- `cnclip` 是独立 gRPC 服务(默认 `grpc://127.0.0.1:51000`),底层为 **Chinese-CLIP**:**图像与文本在同一向量空间**(图文可互检)。 +- 图片 embedding 服务(默认 `6008`)在 `image_backend: clip_as_service` 时通过 gRPC 调用它完成: + - `POST /embed/image`:图片 URL / 本地路径 → 图向量; + - `POST /embed/clip_text`:**自然语言短语** → 文本塔向量(与上图向量同空间,用于 `image_embedding` 检索、以文搜图等)。 +- **语义检索用的文本向量**(`title_embedding`、query 语义召回)仍由 **TEI(8080)+ `POST /embed/text`(6005)** 负责,与 CN-CLIP 不是同一模型、不是同一向量空间;请勿混淆。 ## 2. 代码与脚本入口 @@ -138,6 +140,8 @@ cat third-party/clip-as-service/server/torch-flow-temp.yml ### 7.3 发送一次编码请求(触发模型加载) +**gRPC 直连(文本或图片 URL 混传时由 client 自动区分):** + ```bash PYTHONPATH="third-party/clip-as-service/client:${PYTHONPATH}" \ NO_VERSION_CHECK=1 \ @@ -151,6 +155,14 @@ PY 预期 `shape` 为 `(1, 1024)`。 +**HTTP(推荐业务侧):图片走 6008 `/embed/image`,纯文本走 `/embed/clip_text`(勿把 `http://` 图 URL 发到 `clip_text`):** + +```bash +curl -sS -X POST "http://127.0.0.1:6008/embed/clip_text?normalize=true&priority=1" \ + -H "Content-Type: application/json" \ + -d '["纯棉T恤", "芭比娃娃"]' +``` + ### 7.4 GPU 验证 ```bash diff --git a/docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md b/docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md index 274f6eb..de8b10e 100644 --- a/docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md +++ b/docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md @@ -9,7 +9,7 @@ | 服务 | 默认端口 | Base URL | 说明 | |------|----------|----------|------| | 向量服务(文本) | 6005 | `http://localhost:6005` | 文本向量化,用于 query/doc 语义检索 | -| 向量服务(图片) | 6008 | `http://localhost:6008` | 图片向量化,用于以图搜图 | +| 向量服务(图片 / 多模态 CN-CLIP) | 6008 | `http://localhost:6008` | 图片向量 `/embed/image`;同空间文本向量 `/embed/clip_text`(以文搜图等) | | 翻译服务 | 6006 | `http://localhost:6006` | 多语言翻译(云端与本地模型统一入口) | | 重排服务 | 6007 | `http://localhost:6007` | 对检索结果进行二次排序 | @@ -100,7 +100,35 @@ curl -X POST "http://localhost:6008/embed/image?normalize=true&priority=1" \ 在线以图搜图等实时场景可传 `priority=1`;离线索引回填保持默认 `priority=0`。 -#### 7.1.3 `GET /health` — 健康检查 +#### 7.1.3 `POST /embed/clip_text` — CN-CLIP 文本多模态向量(与图片同空间) + +将**自然语言短语**编码为向量,与 `POST /embed/image` 输出的图向量**处于同一向量空间**(Chinese-CLIP 文本塔 / 图塔),用于 **以文搜图**、与 ES `image_embedding` 对齐的 KNN 等。 + +与 **7.1.1** 的 `POST /embed/text`(TEI/BGE,语义检索)**不是同一模型、不是同一空间**,请勿混用。 + +**请求体**(JSON 数组,每项为字符串;**不要**传入 `http://` / `https://` 图片 URL,图片请用 `/embed/image`): + +```json +["纯棉短袖T恤", "芭比娃娃连衣裙"] +``` + +**响应**(JSON 数组,与输入一一对应): + +```json +[[0.01, -0.02, ...], [0.03, 0.01, ...], ...] +``` + +**curl 示例**: + +```bash +curl -X POST "http://localhost:6008/embed/clip_text?normalize=true&priority=1" \ + -H "Content-Type: application/json" \ + -d '["纯棉短袖", "street tee"]' +``` + +说明:与 `/embed/image` 共用图片侧限流与 `IMAGE_MAX_INFLIGHT`;Redis 缓存键 namespace 为 `clip_text`,与 TEI 文本缓存区分。 + +#### 7.1.4 `GET /health` — 健康检查 ```bash curl "http://localhost:6005/health" @@ -110,18 +138,18 @@ curl "http://localhost:6008/health" 返回中会包含: - `service_kind`:`text` / `image` / `all` -- `cache_enabled`:text/image Redis 缓存是否可用 +- `cache_enabled`:text/image/clip_text Redis 缓存是否可用 - `limits`:当前 inflight limit、active、rejected_total 等 - `stats`:request_total、cache_hits、cache_misses、avg_latency_ms 等 -#### 7.1.4 `GET /ready` — 就绪检查 +#### 7.1.5 `GET /ready` — 就绪检查 ```bash curl "http://localhost:6005/ready" curl "http://localhost:6008/ready" ``` -#### 7.1.5 缓存与限流说明 +#### 7.1.6 缓存与限流说明 - 文本与图片都会先查 Redis 向量缓存。 - Redis 中 value 仍是 **BF16 bytes**,读取后恢复成 `float32` 返回。 @@ -130,8 +158,9 @@ curl "http://localhost:6008/ready" - 当服务端发现超过 `TEXT_MAX_INFLIGHT` / `IMAGE_MAX_INFLIGHT` 时,会直接拒绝,而不是无限排队。 - 其中 `POST /embed/text` 的 `priority=0` 会按上面的 inflight 规则直接拒绝;`priority>0` 不会被 admission 拒绝,但仍计入 inflight,并在服务端排队时优先于 `priority=0` 请求。 - `POST /embed/image` 的 `priority=0` 受 `IMAGE_MAX_INFLIGHT` 约束;`priority>0` 不会被 admission 拒绝,但仍计入 inflight(无插队)。 +- `POST /embed/clip_text` 与 `/embed/image` 共用同一后端与 `IMAGE_MAX_INFLIGHT`(计入图片侧并发)。 -#### 7.1.6 TEI 统一调优建议(主服务) +#### 7.1.7 TEI 统一调优建议(主服务) 使用单套主服务即可同时兼顾: - 在线 query 向量化(低延迟,常见 `batch=1~4`) @@ -147,7 +176,7 @@ curl "http://localhost:6008/ready" 默认端口: - TEI: `http://127.0.0.1:8080` - 文本向量服务(`/embed/text`): `http://127.0.0.1:6005` -- 图片向量服务(`/embed/image`): `http://127.0.0.1:6008` +- 图片向量服务(`/embed/image`、`/embed/clip_text`): `http://127.0.0.1:6008` 当前主 TEI 启动默认值(已按 T4/短文本场景调优): - `TEI_MAX_BATCH_TOKENS=4096` diff --git a/embeddings/README.md b/embeddings/README.md index c150750..e0df5d5 100644 --- a/embeddings/README.md +++ b/embeddings/README.md @@ -16,7 +16,7 @@ - **clip-as-service 客户端**:`clip_as_service_encoder.py`(图片向量,推荐) - **向量化服务(FastAPI)**:`server.py` - **统一配置**:`config.py` -- **接口契约**:`protocols.ImageEncoderProtocol`(图片编码统一为 `encode_image_urls(urls, batch_size, normalize_embeddings)`,本地 CN-CLIP 与 clip-as-service 均实现该接口) +- **接口契约**:`protocols.ImageEncoderProtocol`(`encode_image_urls` + `encode_clip_texts`;本地 CN-CLIP 与 clip-as-service 均实现) 说明:历史上的云端 embedding 试验实现(DashScope)已从主仓库移除。当前默认部署为**文本服务 6005** 与**图片服务 6008** 两条独立链路;`all` 模式仅作为单进程调试入口。 @@ -36,8 +36,9 @@ - 返回:`[[...], [...], ...]` - 健康接口:`GET /health`、`GET /ready` - 图片服务(默认 `6008`) - - `POST /embed/image` - - 请求体:`["url或本地路径1", ...]` + - `POST /embed/image`:图片 URL 或本地路径 + - `POST /embed/clip_text`:自然语言(CN-CLIP 文本塔,与 `/embed/image` 同向量空间;勿传 `http://` 图 URL) + - 请求体:`["...", ...]` 字符串数组 - 可选 query 参数:`normalize=true|false`、`priority=0|1` - 返回:`[[...], [...], ...]` - 健康接口:`GET /health`、`GET /ready` @@ -51,8 +52,9 @@ - client 侧:`text_encoder.py` / `image_encoder.py` - service 侧:`server.py` - 当前主 key 格式: - - 文本:`embedding:embed:norm{0|1}:{text}` + - 文本(TEI):`embedding:embed:norm{0|1}:{text}` - 图片:`embedding:image:embed:norm{0|1}:{url_or_path}` + - CN-CLIP 文本:`embedding:clip_text:clip_mm:norm{0|1}:{text}` - 当前实现不再兼容历史 key 规则,只保留这一套格式,减少代码路径和缓存歧义。 ### 压力隔离与拒绝策略 diff --git a/embeddings/cache_keys.py b/embeddings/cache_keys.py index cbe2e3a..1680263 100644 --- a/embeddings/cache_keys.py +++ b/embeddings/cache_keys.py @@ -1,8 +1,9 @@ """Shared cache key helpers for embedding inputs. Current canonical raw-key format: -- text: ``embed:norm1:`` / ``embed:norm0:`` -- image: ``embed:norm1:`` / ``embed:norm0:`` +- text (TEI/BGE): ``embed:norm1:`` / ``embed:norm0:`` +- image (CLIP): ``embed:norm1:`` / ``embed:norm0:`` +- clip_text (CN-CLIP 文本,与图同空间): ``clip_mm:norm1:`` / ``clip_mm:norm0:`` `RedisEmbeddingCache` adds the configured key prefix and optional namespace on top. """ @@ -18,3 +19,9 @@ def build_text_cache_key(text: str, *, normalize: bool) -> str: def build_image_cache_key(url: str, *, normalize: bool) -> str: normalized_url = str(url or "").strip() return f"embed:norm{1 if normalize else 0}:{normalized_url}" + + +def build_clip_text_cache_key(text: str, *, normalize: bool) -> str: + """CN-CLIP / multimodal text (same vector space as /embed/image).""" + normalized_text = str(text or "").strip() + return f"clip_mm:norm{1 if normalize else 0}:{normalized_text}" diff --git a/embeddings/clip_as_service_encoder.py b/embeddings/clip_as_service_encoder.py index de41b98..3f786f0 100644 --- a/embeddings/clip_as_service_encoder.py +++ b/embeddings/clip_as_service_encoder.py @@ -127,3 +127,41 @@ class ClipAsServiceImageEncoder: if not results: raise RuntimeError("clip-as-service returned empty result for single image URL") return results[0] + + def encode_clip_texts( + self, + texts: List[str], + batch_size: Optional[int] = None, + normalize_embeddings: bool = True, + ) -> List[np.ndarray]: + """ + CN-CLIP 文本塔:与 encode_image_urls 输出同一向量空间(图文检索 / image_embedding)。 + + 仅传入自然语言字符串;HTTP 侧见 ``POST /embed/clip_text``。 + """ + if not texts: + return [] + bs = batch_size if batch_size is not None else self._batch_size + arr = self._client.encode( + texts, + batch_size=bs, + show_progress=self._show_progress, + ) + if arr is None or not hasattr(arr, "shape"): + raise RuntimeError("clip-as-service encode (text) returned empty result") + if len(arr) != len(texts): + raise RuntimeError( + f"clip-as-service text encode length mismatch: expected {len(texts)}, got {len(arr)}" + ) + out: List[np.ndarray] = [] + for row in arr: + vec = np.asarray(row, dtype=np.float32) + if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all(): + raise RuntimeError("clip-as-service returned invalid text embedding vector") + if normalize_embeddings: + norm = float(np.linalg.norm(vec)) + if not np.isfinite(norm) or norm <= 0.0: + raise RuntimeError("clip-as-service returned zero/invalid norm vector") + vec = vec / norm + out.append(vec) + return out diff --git a/embeddings/clip_model.py b/embeddings/clip_model.py index d01dd71..b35766f 100644 --- a/embeddings/clip_model.py +++ b/embeddings/clip_model.py @@ -131,3 +131,28 @@ class ClipImageModel(object): else: raise ValueError(f"Unsupported image input type: {type(img)!r}") return results + + def encode_clip_texts( + self, + texts: List[str], + batch_size: Optional[int] = None, + normalize_embeddings: bool = True, + ) -> List[np.ndarray]: + """ + CN-CLIP 文本塔向量,与 encode_image 同空间;供 ``POST /embed/clip_text`` 使用。 + """ + if not texts: + return [] + bs = batch_size or 8 + out: List[np.ndarray] = [] + for i in range(0, len(texts), bs): + batch = texts[i : i + bs] + text_data = clip.tokenize(batch).to(self.device) + with torch.no_grad(): + feats = self.model.encode_text(text_data) + if normalize_embeddings: + feats = feats / feats.norm(dim=-1, keepdim=True) + arr = feats.cpu().numpy().astype("float32") + for row in arr: + out.append(np.asarray(row, dtype=np.float32)) + return out diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py index bd754f2..f7f058f 100644 --- a/embeddings/image_encoder.py +++ b/embeddings/image_encoder.py @@ -10,8 +10,8 @@ from PIL import Image logger = logging.getLogger(__name__) from config.loader import get_app_config -from config.services_config import get_embedding_image_base_url -from embeddings.cache_keys import build_image_cache_key +from config.services_config import get_embedding_image_backend_config, get_embedding_image_base_url +from embeddings.cache_keys import build_clip_text_cache_key, build_image_cache_key from embeddings.redis_embedding_cache import RedisEmbeddingCache from request_log_context import build_downstream_request_headers, build_request_log_extra @@ -28,6 +28,7 @@ class CLIPImageEncoder: redis_config = get_app_config().infrastructure.redis self.service_url = str(resolved_url).rstrip("/") self.endpoint = f"{self.service_url}/embed/image" + self.clip_text_endpoint = f"{self.service_url}/embed/clip_text" # Reuse embedding cache prefix, but separate namespace for images to avoid collisions. self.cache_prefix = str(redis_config.embedding_cache_prefix).strip() or "embedding" logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url) @@ -35,6 +36,10 @@ class CLIPImageEncoder: key_prefix=self.cache_prefix, namespace="image", ) + self._clip_text_cache = RedisEmbeddingCache( + key_prefix=self.cache_prefix, + namespace="clip_text", + ) def _call_service( self, @@ -84,6 +89,108 @@ class CLIPImageEncoder: ) raise + def _clip_text_via_grpc( + self, + request_data: List[str], + normalize_embeddings: bool, + ) -> List[Any]: + """旧版 6008 无 ``/embed/clip_text`` 时走 gRPC(需 ``image_backend: clip_as_service``)。""" + backend, cfg = get_embedding_image_backend_config() + if backend != "clip_as_service": + raise RuntimeError( + "POST /embed/clip_text 返回 404:请重启图片向量服务(6008)以加载新路由;" + "或配置 services.embedding.image_backend=clip_as_service 并启动 grpc cnclip。" + ) + from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder + from embeddings.config import CONFIG + + enc = ClipAsServiceImageEncoder( + server=str(cfg.get("server") or CONFIG.CLIP_AS_SERVICE_SERVER), + batch_size=int(cfg.get("batch_size") or CONFIG.IMAGE_BATCH_SIZE), + ) + arrs = enc.encode_clip_texts( + request_data, + batch_size=len(request_data), + normalize_embeddings=normalize_embeddings, + ) + return [v.tolist() for v in arrs] + + def _call_clip_text_service( + self, + request_data: List[str], + normalize_embeddings: bool = True, + priority: int = 1, + request_id: Optional[str] = None, + user_id: Optional[str] = None, + ) -> List[Any]: + response = None + try: + response = requests.post( + self.clip_text_endpoint, + params={ + "normalize": "true" if normalize_embeddings else "false", + "priority": max(0, int(priority)), + }, + json=request_data, + headers=build_downstream_request_headers(request_id=request_id, user_id=user_id), + timeout=60, + ) + if response.status_code == 404: + logger.warning( + "POST %s returned 404; using clip-as-service gRPC fallback (restart 6008 after deploy to use HTTP)", + self.clip_text_endpoint, + ) + return self._clip_text_via_grpc(request_data, normalize_embeddings) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + body_preview = "" + if response is not None: + try: + body_preview = (response.text or "")[:300] + except Exception: + body_preview = "" + logger.error( + "CLIPImageEncoder clip_text request failed | status=%s body=%s error=%s", + getattr(response, "status_code", "n/a"), + body_preview, + e, + exc_info=True, + extra=build_request_log_extra(request_id=request_id, user_id=user_id), + ) + raise + + def encode_clip_text( + self, + text: str, + normalize_embeddings: bool = True, + priority: int = 1, + request_id: Optional[str] = None, + user_id: Optional[str] = None, + ) -> np.ndarray: + """ + CN-CLIP 文本塔(与 ``/embed/image`` 同向量空间),对应服务端 ``POST /embed/clip_text``。 + """ + cache_key = build_clip_text_cache_key(text, normalize=normalize_embeddings) + cached = self._clip_text_cache.get(cache_key) + if cached is not None: + return cached + + response_data = self._call_clip_text_service( + [text.strip()], + normalize_embeddings=normalize_embeddings, + priority=priority, + request_id=request_id, + user_id=user_id, + ) + if not response_data or len(response_data) != 1 or response_data[0] is None: + raise RuntimeError(f"No CLIP text embedding returned for: {text[:80]!r}") + vec = np.array(response_data[0], dtype=np.float32) + if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all(): + raise RuntimeError("Invalid CLIP text embedding returned") + self._clip_text_cache.set(cache_key, vec) + return vec + def encode_image(self, image: Image.Image) -> np.ndarray: """ Encode image to embedding vector using network service. diff --git a/embeddings/protocols.py b/embeddings/protocols.py index 3edb9a8..53c4ebb 100644 --- a/embeddings/protocols.py +++ b/embeddings/protocols.py @@ -27,3 +27,17 @@ class ImageEncoderProtocol(Protocol): of returning partial None placeholders. """ ... + + def encode_clip_texts( + self, + texts: List[str], + batch_size: Optional[int] = None, + normalize_embeddings: bool = True, + ) -> List[np.ndarray]: + """ + Encode natural-language strings with the CLIP/CN-CLIP text tower (same space as images). + + Returns: + List of vectors, same length as texts. + """ + ... diff --git a/embeddings/server.py b/embeddings/server.py index 9e4d294..1feff47 100644 --- a/embeddings/server.py +++ b/embeddings/server.py @@ -2,8 +2,9 @@ Embedding service (FastAPI). API (simple list-in, list-out; aligned by index): -- POST /embed/text body: ["text1", "text2", ...] -> [[...], ...] -- POST /embed/image body: ["url_or_path1", ...] -> [[...], ...] +- POST /embed/text body: ["text1", "text2", ...] -> [[...], ...] (TEI/BGE,语义检索 title_embedding) +- POST /embed/image body: ["url_or_path1", ...] -> [[...], ...] (CN-CLIP 图向量) +- POST /embed/clip_text body: ["短语1", "短语2", ...] -> [[...], ...] (CN-CLIP 文本塔,与 /embed/image 同空间) """ import logging @@ -22,7 +23,7 @@ from fastapi.concurrency import run_in_threadpool from config.env_config import REDIS_CONFIG from config.services_config import get_embedding_backend_config -from embeddings.cache_keys import build_image_cache_key, build_text_cache_key +from embeddings.cache_keys import build_clip_text_cache_key, build_image_cache_key, build_text_cache_key from embeddings.config import CONFIG from embeddings.protocols import ImageEncoderProtocol from embeddings.redis_embedding_cache import RedisEmbeddingCache @@ -373,6 +374,7 @@ _text_stats = _EndpointStats(name="text") _image_stats = _EndpointStats(name="image") _text_cache = RedisEmbeddingCache(key_prefix=_CACHE_PREFIX, namespace="") _image_cache = RedisEmbeddingCache(key_prefix=_CACHE_PREFIX, namespace="image") +_clip_text_cache = RedisEmbeddingCache(key_prefix=_CACHE_PREFIX, namespace="clip_text") @dataclass @@ -752,13 +754,20 @@ def _try_full_text_cache_hit( ) -def _try_full_image_cache_hit( - urls: List[str], +def _try_full_image_lane_cache_hit( + items: List[str], effective_normalize: bool, + *, + lane: str, ) -> Optional[_EmbedResult]: out: List[Optional[List[float]]] = [] - for url in urls: - cached = _image_cache.get(build_image_cache_key(url, normalize=effective_normalize)) + for item in items: + if lane == "image": + ck = build_image_cache_key(item, normalize=effective_normalize) + cached = _image_cache.get(ck) + else: + ck = build_clip_text_cache_key(item, normalize=effective_normalize) + cached = _clip_text_cache.get(ck) if cached is None: return None vec = _as_list(cached, normalize=False) @@ -774,6 +783,108 @@ def _try_full_image_cache_hit( ) +def _embed_image_lane_impl( + items: List[str], + effective_normalize: bool, + request_id: str, + user_id: str, + *, + lane: str, +) -> _EmbedResult: + if _image_model is None: + raise RuntimeError("Image model not loaded") + + out: List[Optional[List[float]]] = [None] * len(items) + missing_indices: List[int] = [] + missing_items: List[str] = [] + missing_keys: List[str] = [] + cache_hits = 0 + for idx, item in enumerate(items): + if lane == "image": + ck = build_image_cache_key(item, normalize=effective_normalize) + cached = _image_cache.get(ck) + else: + ck = build_clip_text_cache_key(item, normalize=effective_normalize) + cached = _clip_text_cache.get(ck) + if cached is not None: + vec = _as_list(cached, normalize=False) + if vec is not None: + out[idx] = vec + cache_hits += 1 + continue + missing_indices.append(idx) + missing_items.append(item) + missing_keys.append(ck) + + if not missing_items: + logger.info( + "%s lane cache-only | inputs=%d normalize=%s dim=%d cache_hits=%d", + lane, + len(items), + effective_normalize, + len(out[0]) if out and out[0] is not None else 0, + cache_hits, + extra=build_request_log_extra(request_id=request_id, user_id=user_id), + ) + return _EmbedResult( + vectors=out, + cache_hits=cache_hits, + cache_misses=0, + backend_elapsed_ms=0.0, + mode="cache-only", + ) + + backend_t0 = time.perf_counter() + with _image_encode_lock: + if lane == "image": + vectors = _image_model.encode_image_urls( + missing_items, + batch_size=CONFIG.IMAGE_BATCH_SIZE, + normalize_embeddings=effective_normalize, + ) + else: + vectors = _image_model.encode_clip_texts( + missing_items, + batch_size=CONFIG.IMAGE_BATCH_SIZE, + normalize_embeddings=effective_normalize, + ) + if vectors is None or len(vectors) != len(missing_items): + raise RuntimeError( + f"{lane} lane length mismatch: expected {len(missing_items)}, " + f"got {0 if vectors is None else len(vectors)}" + ) + + for pos, ck, vec in zip(missing_indices, missing_keys, vectors): + out_vec = _as_list(vec, normalize=effective_normalize) + if out_vec is None: + raise RuntimeError(f"{lane} lane empty embedding at position {pos}") + out[pos] = out_vec + if lane == "image": + _image_cache.set(ck, np.asarray(out_vec, dtype=np.float32)) + else: + _clip_text_cache.set(ck, np.asarray(out_vec, dtype=np.float32)) + + backend_elapsed_ms = (time.perf_counter() - backend_t0) * 1000.0 + logger.info( + "%s lane backend-batch | inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d backend_elapsed_ms=%.2f", + lane, + len(items), + effective_normalize, + len(out[0]) if out and out[0] is not None else 0, + cache_hits, + len(missing_items), + backend_elapsed_ms, + extra=build_request_log_extra(request_id=request_id, user_id=user_id), + ) + return _EmbedResult( + vectors=out, + cache_hits=cache_hits, + cache_misses=len(missing_items), + backend_elapsed_ms=backend_elapsed_ms, + mode="backend-batch", + ) + + @app.get("/health") def health() -> Dict[str, Any]: """Health check endpoint. Returns status and current throttling stats.""" @@ -789,6 +900,7 @@ def health() -> Dict[str, Any]: "cache_enabled": { "text": _text_cache.redis_client is not None, "image": _image_cache.redis_client is not None, + "clip_text": _clip_text_cache.redis_client is not None, }, "limits": { "text": _text_request_limiter.snapshot(), @@ -1148,102 +1260,29 @@ async def embed_text( reset_request_log_context(log_tokens) -def _embed_image_impl( - urls: List[str], - effective_normalize: bool, - request_id: str, - user_id: str, -) -> _EmbedResult: - if _image_model is None: - raise RuntimeError("Image model not loaded") - - out: List[Optional[List[float]]] = [None] * len(urls) - missing_indices: List[int] = [] - missing_urls: List[str] = [] - missing_cache_keys: List[str] = [] - cache_hits = 0 - for idx, url in enumerate(urls): - cache_key = build_image_cache_key(url, normalize=effective_normalize) - cached = _image_cache.get(cache_key) - if cached is not None: - vec = _as_list(cached, normalize=False) - if vec is not None: - out[idx] = vec - cache_hits += 1 - continue - missing_indices.append(idx) - missing_urls.append(url) - missing_cache_keys.append(cache_key) - - if not missing_urls: - logger.info( - "image backend done | mode=cache-only inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 backend_elapsed_ms=0.00", - len(urls), - effective_normalize, - len(out[0]) if out and out[0] is not None else 0, - cache_hits, - extra=build_request_log_extra(request_id, user_id), - ) - return _EmbedResult( - vectors=out, - cache_hits=cache_hits, - cache_misses=0, - backend_elapsed_ms=0.0, - mode="cache-only", - ) - - backend_t0 = time.perf_counter() - with _image_encode_lock: - vectors = _image_model.encode_image_urls( - missing_urls, - batch_size=CONFIG.IMAGE_BATCH_SIZE, - normalize_embeddings=effective_normalize, - ) - if vectors is None or len(vectors) != len(missing_urls): - raise RuntimeError( - f"Image model response length mismatch: expected {len(missing_urls)}, " - f"got {0 if vectors is None else len(vectors)}" - ) +def _parse_string_inputs(raw: List[Any], *, kind: str, empty_detail: str) -> List[str]: + out: List[str] = [] + for i, x in enumerate(raw): + if not isinstance(x, str): + raise HTTPException(status_code=400, detail=f"Invalid {kind} at index {i}: must be string") + s = x.strip() + if not s: + raise HTTPException(status_code=400, detail=f"Invalid {kind} at index {i}: {empty_detail}") + out.append(s) + return out - for pos, cache_key, vec in zip(missing_indices, missing_cache_keys, vectors): - out_vec = _as_list(vec, normalize=effective_normalize) - if out_vec is None: - raise RuntimeError(f"Image model returned empty embedding for position {pos}") - out[pos] = out_vec - _image_cache.set(cache_key, np.asarray(out_vec, dtype=np.float32)) - - backend_elapsed_ms = (time.perf_counter() - backend_t0) * 1000.0 - - logger.info( - "image backend done | mode=backend-batch inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d backend_elapsed_ms=%.2f", - len(urls), - effective_normalize, - len(out[0]) if out and out[0] is not None else 0, - cache_hits, - len(missing_urls), - backend_elapsed_ms, - extra=build_request_log_extra(request_id, user_id), - ) - return _EmbedResult( - vectors=out, - cache_hits=cache_hits, - cache_misses=len(missing_urls), - backend_elapsed_ms=backend_elapsed_ms, - mode="backend-batch", - ) - -@app.post("/embed/image") -async def embed_image( - images: List[str], +async def _run_image_lane_embed( + *, + route: str, + lane: str, + items: List[str], http_request: Request, response: Response, - normalize: Optional[bool] = None, - priority: int = 0, + normalize: Optional[bool], + priority: int, + preview_chars: int, ) -> List[Optional[List[float]]]: - if _image_model is None: - raise HTTPException(status_code=503, detail="Image embedding model not loaded in this service") - request_id = _resolve_request_id(http_request) user_id = _resolve_user_id(http_request) _, _, log_tokens = bind_request_log_context(request_id, user_id) @@ -1255,24 +1294,16 @@ async def embed_image( cache_hits = 0 cache_misses = 0 limiter_acquired = False + items_in: List[str] = list(items) try: if priority < 0: raise HTTPException(status_code=400, detail="priority must be >= 0") effective_priority = _effective_priority(priority) - effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) - urls: List[str] = [] - for i, url_or_path in enumerate(images): - if not isinstance(url_or_path, str): - raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path") - s = url_or_path.strip() - if not s: - raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path") - urls.append(s) cache_check_started = time.perf_counter() - cache_only = _try_full_image_cache_hit(urls, effective_normalize) + cache_only = _try_full_image_lane_cache_hit(items, effective_normalize, lane=lane) if cache_only is not None: latency_ms = (time.perf_counter() - cache_check_started) * 1000.0 _image_stats.record_completed( @@ -1283,9 +1314,10 @@ async def embed_image( cache_misses=0, ) logger.info( - "embed_image response | mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", + "%s response | mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d first_vector=%s latency_ms=%.2f", + route, _priority_label(effective_priority), - len(urls), + len(items), effective_normalize, len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0, cache_only.cache_hits, @@ -1299,14 +1331,15 @@ async def embed_image( if not accepted: _image_stats.record_rejected() logger.warning( - "embed_image rejected | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + "%s rejected | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + route, _request_client(http_request), _priority_label(effective_priority), - len(urls), + len(items), effective_normalize, active, _IMAGE_MAX_INFLIGHT, - _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS), + _preview_inputs(items, _LOG_PREVIEW_COUNT, preview_chars), extra=build_request_log_extra(request_id, user_id), ) raise HTTPException( @@ -1318,24 +1351,33 @@ async def embed_image( ) limiter_acquired = True logger.info( - "embed_image request | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + "%s request | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + route, _request_client(http_request), _priority_label(effective_priority), - len(urls), + len(items), effective_normalize, active, _IMAGE_MAX_INFLIGHT, - _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS), + _preview_inputs(items, _LOG_PREVIEW_COUNT, preview_chars), extra=build_request_log_extra(request_id, user_id), ) verbose_logger.info( - "embed_image detail | payload=%s normalize=%s priority=%s", - urls, + "%s detail | payload=%s normalize=%s priority=%s", + route, + items, effective_normalize, _priority_label(effective_priority), extra=build_request_log_extra(request_id, user_id), ) - result = await run_in_threadpool(_embed_image_impl, urls, effective_normalize, request_id, user_id) + result = await run_in_threadpool( + _embed_image_lane_impl, + items, + effective_normalize, + request_id, + user_id, + lane=lane, + ) success = True backend_elapsed_ms = result.backend_elapsed_ms cache_hits = result.cache_hits @@ -1349,10 +1391,11 @@ async def embed_image( cache_misses=cache_misses, ) logger.info( - "embed_image response | mode=%s priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f", + "%s response | mode=%s priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f", + route, result.mode, _priority_label(effective_priority), - len(urls), + len(items), effective_normalize, len(result.vectors[0]) if result.vectors and result.vectors[0] is not None else 0, cache_hits, @@ -1362,7 +1405,8 @@ async def embed_image( extra=build_request_log_extra(request_id, user_id), ) verbose_logger.info( - "embed_image result detail | count=%d first_vector=%s latency_ms=%.2f", + "%s result detail | count=%d first_vector=%s latency_ms=%.2f", + route, len(result.vectors), result.vectors[0][: _VECTOR_PREVIEW_DIMS] if result.vectors and result.vectors[0] is not None @@ -1383,24 +1427,73 @@ async def embed_image( cache_misses=cache_misses, ) logger.error( - "embed_image failed | priority=%s inputs=%d normalize=%s latency_ms=%.2f error=%s", + "%s failed | priority=%s inputs=%d normalize=%s latency_ms=%.2f error=%s", + route, _priority_label(effective_priority), - len(urls), + len(items_in), effective_normalize, latency_ms, e, exc_info=True, extra=build_request_log_extra(request_id, user_id), ) - raise HTTPException(status_code=502, detail=f"Image embedding backend failure: {e}") from e + raise HTTPException(status_code=502, detail=f"{route} backend failure: {e}") from e finally: if limiter_acquired: remaining = _image_request_limiter.release(success=success) logger.info( - "embed_image finalize | success=%s priority=%s active_after=%d", + "%s finalize | success=%s priority=%s active_after=%d", + route, success, _priority_label(effective_priority), remaining, extra=build_request_log_extra(request_id, user_id), ) reset_request_log_context(log_tokens) + + +@app.post("/embed/image") +async def embed_image( + images: List[str], + http_request: Request, + response: Response, + normalize: Optional[bool] = None, + priority: int = 0, +) -> List[Optional[List[float]]]: + if _image_model is None: + raise HTTPException(status_code=503, detail="Image embedding model not loaded in this service") + items = _parse_string_inputs(images, kind="image", empty_detail="empty URL/path") + return await _run_image_lane_embed( + route="embed_image", + lane="image", + items=items, + http_request=http_request, + response=response, + normalize=normalize, + priority=priority, + preview_chars=_LOG_IMAGE_PREVIEW_CHARS, + ) + + +@app.post("/embed/clip_text") +async def embed_clip_text( + texts: List[str], + http_request: Request, + response: Response, + normalize: Optional[bool] = None, + priority: int = 0, +) -> List[Optional[List[float]]]: + """CN-CLIP 文本塔,与 ``POST /embed/image`` 同向量空间。""" + if _image_model is None: + raise HTTPException(status_code=503, detail="Image embedding model not loaded in this service") + items = _parse_string_inputs(texts, kind="text", empty_detail="empty string") + return await _run_image_lane_embed( + route="embed_clip_text", + lane="clip_text", + items=items, + http_request=http_request, + response=response, + normalize=normalize, + priority=priority, + preview_chars=_LOG_TEXT_PREVIEW_CHARS, + ) diff --git a/scripts/es_debug_search.py b/scripts/es_debug_search.py index f37ac5f..8097150 100644 --- a/scripts/es_debug_search.py +++ b/scripts/es_debug_search.py @@ -412,7 +412,7 @@ def _looks_like_image_ref(url: str) -> bool: def _encode_clip_query_vector(query: str) -> List[float]: """ - 与索引中 image_embedding 同空间:图走 HTTP /embed/image;文本走 clip-as-service gRPC encode。 + 与索引中 image_embedding 同空间:图走 ``POST /embed/image``;文本走 ``POST /embed/clip_text``(6008)。 """ import numpy as np @@ -420,40 +420,14 @@ def _encode_clip_query_vector(query: str) -> List[float]: if not q: raise ValueError("empty query") - from config.services_config import get_embedding_image_backend_config - - backend, cfg = get_embedding_image_backend_config() + from embeddings.image_encoder import CLIPImageEncoder + enc = CLIPImageEncoder() if _looks_like_image_ref(q): - from embeddings.image_encoder import CLIPImageEncoder - - enc = CLIPImageEncoder() vec = enc.encode_image_from_url(q, normalize_embeddings=True, priority=1) - return vec.astype(np.float32).flatten().tolist() - - if backend != "clip_as_service": - raise RuntimeError( - "mode 5 纯文本查询需要 CN-CLIP 文本向量(与 clip-as-service 同空间)。" - "当前 image_backend 为 local_cnclip,本脚本不加载本地模型。" - "请将 config 中 services.embedding.image_backend 设为 clip_as_service 并启动 grpc " - "(默认 51000),或输入图片 URL/路径(将调用 POST /embed/image 到 6008)。" - ) - - from embeddings.clip_as_service_encoder import _ensure_clip_client_path - - _ensure_clip_client_path() - from clip_client import Client - - server = str(cfg.get("server") or "grpc://127.0.0.1:51000").strip() - normalize = bool(cfg.get("normalize_embeddings", True)) - client = Client(server) - arr = client.encode([q], batch_size=1, show_progress=False) - vec = np.asarray(arr[0], dtype=np.float32).flatten() - if normalize: - n = float(np.linalg.norm(vec)) - if np.isfinite(n) and n > 0.0: - vec = vec / n - return vec.tolist() + else: + vec = enc.encode_clip_text(q, normalize_embeddings=True, priority=1) + return vec.astype(np.float32).flatten().tolist() def search_title_knn(es: Any, index_name: str, query: str, size: int) -> List[Dict[str, Any]]: @@ -467,7 +441,6 @@ def search_title_knn(es: Any, index_name: str, query: str, size: int) -> List[Di qv = vec.astype("float32").flatten().tolist() num_cand = max(size * 10, 100) body: Dict[str, Any] = { - "size": size, "knn": { "field": "title_embedding", "query_vector": qv, @@ -484,7 +457,6 @@ def search_image_knn(es: Any, index_name: str, query: str, size: int) -> List[Di num_cand = max(size * 10, 100) field = "image_embedding.vector" body: Dict[str, Any] = { - "size": size, "knn": { "field": field, "query_vector": qv, diff --git a/scripts/service_ctl.sh b/scripts/service_ctl.sh index 46e1932..d816c04 100755 --- a/scripts/service_ctl.sh +++ b/scripts/service_ctl.sh @@ -871,7 +871,7 @@ Special targets: Examples: ./scripts/service_ctl.sh up all - ./scripts/service_ctl.sh up tei cnclip embedding translator reranker + ./scripts/service_ctl.sh up tei cnclip embedding embedding-image translator reranker ./scripts/service_ctl.sh up backend indexer frontend ./scripts/service_ctl.sh restart ./scripts/service_ctl.sh monitor-start all diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index 73f0159..80ba7b5 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -556,6 +556,9 @@ class _FakeImageModel: def encode_image_urls(self, urls, batch_size=8, normalize_embeddings=True): return [np.array([0.3, 0.2, 0.1], dtype=np.float32) for _ in urls] + def encode_clip_texts(self, texts, batch_size=8, normalize_embeddings=True): + return [np.array([0.31, 0.21, 0.11], dtype=np.float32) for _ in texts] + class _EmbeddingCacheMiss: """Avoid Redis/module cache hits so contract tests exercise the encode path.""" @@ -579,6 +582,7 @@ def embedding_module(): emb_server._text_backend_name = "tei" emb_server._text_cache = _EmbeddingCacheMiss() emb_server._image_cache = _EmbeddingCacheMiss() + emb_server._clip_text_cache = _EmbeddingCacheMiss() yield emb_server @@ -604,6 +608,17 @@ def test_embedding_image_contract(embedding_module): assert len(data[0]) == 3 +def test_embedding_clip_text_contract(embedding_module): + from fastapi.testclient import TestClient + + with TestClient(embedding_module.app) as client: + resp = client.post("/embed/clip_text", json=["纯棉短袖", "street tee"]) + assert resp.status_code == 200 + data = resp.json() + assert len(data) == 2 + assert len(data[0]) == 3 + + class _FakeTranslator: model = "qwen-mt" supports_batch = True -- libgit2 0.21.2