From 4747e2f40cb1e479219bcc25f420216e04d20092 Mon Sep 17 00:00:00 2001 From: tangwang Date: Thu, 19 Mar 2026 12:27:05 +0800 Subject: [PATCH] embedding performance The instability is very likely real overload, but `lsof -i :6005 | wc -l = 75` alone does not prove it. What does matter is the live shape of the service: it is a single `uvicorn` worker on port `6005`, and the code had one shared process handling both text and image requests, with image work serialized behind a single lock. Under bursty image traffic, requests could pile up and sit blocked with almost no useful tracing, which matches the “only blocking observed” symptom. --- CLAUDE.md | 4 ++-- config/config.yaml | 2 +- docs/CNCLIP_SERVICE说明文档.md | 22 +++++++++++++++++++++- docs/工作总结-微服务性能优化与架构.md | 4 ++-- embeddings/README.md | 4 +++- embeddings/clip_model.py | 2 +- embeddings/config.py | 3 ++- embeddings/server.py | 544 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------------------------------------------------------- perf_reports/20260319/nllb_t4_longtext_reassessment.md | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/benchmark_translation_longtext_single.py | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/start_cnclip_service.sh | 55 ++++++++++++++++++++++++++++++++++++++++++++----------- scripts/start_embedding_service.sh | 6 +++++- tests/test_embedding_service_limits.py | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ translation/backends/local_ctranslate2.py | 20 +++++++++++++++----- translation/backends/local_seq2seq.py | 20 +++++++++++++++----- 15 files changed, 956 insertions(+), 106 deletions(-) create mode 100644 perf_reports/20260319/nllb_t4_longtext_reassessment.md create mode 100644 scripts/benchmark_translation_longtext_single.py create mode 100644 tests/test_embedding_service_limits.py diff --git a/CLAUDE.md b/CLAUDE.md index 70cf005..dfa1da7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -195,7 +195,7 @@ The system uses centralized configuration through `config/config.yaml`: - Configurable caching to avoid recomputation **Image Embedding** (`embeddings/clip_encoder.py`): -- Uses CN-CLIP model (ViT-H-14) +- Uses a configurable CN-CLIP model (default managed in `embeddings/config.py`) - Downloads and preprocesses images from URLs - Supports both local and remote image processing - Generates 1024-dimensional vectors @@ -563,7 +563,7 @@ GET /admin/stats # Index statistics - **Usage**: Semantic search combined with BM25 relevance **Image Search Pipeline**: -- **Model**: CN-CLIP (ViT-H-14) +- **Model**: CN-CLIP (configured in `embeddings/config.py`) - **Processing**: URL download → preprocessing → vectorization - **Storage**: Nested structure with vector + original URL - **Application**: Visual similarity search for products diff --git a/config/config.yaml b/config/config.yaml index 99178ec..34c227d 100644 --- 
a/config/config.yaml +++ b/config/config.yaml @@ -148,7 +148,7 @@ services: ct2_decoding_length_min: 32 device: "cuda" torch_dtype: "float16" - batch_size: 16 + batch_size: 64 max_input_length: 256 max_new_tokens: 64 num_beams: 1 diff --git a/docs/CNCLIP_SERVICE说明文档.md b/docs/CNCLIP_SERVICE说明文档.md index 650630a..adb7484 100644 --- a/docs/CNCLIP_SERVICE说明文档.md +++ b/docs/CNCLIP_SERVICE说明文档.md @@ -59,7 +59,26 @@ cd /data/saas-search ./scripts/start_cnclip_service.sh --device cpu ``` -### 5.3 停止服务 +### 5.3 模型配置与覆盖 + +- 仓库默认模型取自 `embeddings/config.py` 的 `CLIP_AS_SERVICE_MODEL_NAME`。 +- `scripts/start_cnclip_service.sh` 会自动读取这个配置,因此修改默认模型时不需要再去脚本里找硬编码。 +- 覆盖优先级:`--model-name` > `CNCLIP_MODEL_NAME` > `CLIP_AS_SERVICE_MODEL_NAME` / `embeddings/config.py`。 + +查看当前默认模型: + +```bash +python3 -c "from embeddings.config import CONFIG; print(CONFIG.CLIP_AS_SERVICE_MODEL_NAME)" +``` + +临时覆盖模型: + +```bash +./scripts/start_cnclip_service.sh --model-name CN-CLIP/ViT-L-14 +CNCLIP_MODEL_NAME=CN-CLIP/ViT-H-14 ./scripts/service_ctl.sh start cnclip +``` + +### 5.4 停止服务 ```bash ./scripts/stop_cnclip_service.sh @@ -110,6 +129,7 @@ cat third-party/clip-as-service/server/torch-flow-temp.yml - GPU 模式:`device: 'cuda'` - CPU 模式:`device: 'cpu'` +- 模型名:`name: '<当前实际模型名>'` ### 7.2.1 日志与 PID 文件 diff --git a/docs/工作总结-微服务性能优化与架构.md b/docs/工作总结-微服务性能优化与架构.md index 2ba53c4..221d1f2 100644 --- a/docs/工作总结-微服务性能优化与架构.md +++ b/docs/工作总结-微服务性能优化与架构.md @@ -67,13 +67,13 @@ instruction: "Given a shopping query, rank product titles by relevance" ### 3. 图片向量(Image Embedding) -**方案**:**clip-as-service**(CN-CLIP,ViT-H-14),由独立服务提供图片向量化能力。 +**方案**:**clip-as-service**(CN-CLIP,模型由配置控制),由独立服务提供图片向量化能力。 **具体内容**: - **端口**:clip-as-service 默认 **51000**(`CNCLIP_PORT`);文本走 TEI(8080),图片走 clip-as-service。 - **API**:embedding 服务(6005)统一暴露 `POST /embed/text` 与 `POST /embed/image`;图片请求由 `embeddings/server.py` 按配置调用实现 `ImageEncoderProtocol` 的后端(clip-as-service 或本地 CN-CLIP)。 - **环境与启停**:CN-CLIP 使用独立虚拟环境 `.venv-cnclip`;启动 `scripts/start_cnclip_service.sh`,或 `./scripts/service_ctl.sh start cnclip`;设备可通过 `CNCLIP_DEVICE=cuda`(默认)或 `cpu` 指定。 -- **配置**:图片后端在 `config/config.yaml` 的 `services.embedding` 下配置(若存在 image 相关 backend);clip-as-service 的 flow 配置在 `third-party/clip-as-service/server/torch-flow-temp.yml`。 +- **配置**:图片后端在 `config/config.yaml` 的 `services.embedding` 下配置(若存在 image 相关 backend);clip-as-service 默认模型由 `embeddings/config.py` 的 `CLIP_AS_SERVICE_MODEL_NAME` 控制,flow 配置在 `third-party/clip-as-service/server/torch-flow-temp.yml`。 详见:`docs/CNCLIP_SERVICE说明文档.md`、`embeddings/README.md`。 diff --git a/embeddings/README.md b/embeddings/README.md index 934ac3d..0dac9e7 100644 --- a/embeddings/README.md +++ b/embeddings/README.md @@ -58,6 +58,8 @@ 3. 
**配置**(`embeddings/config.py` 或环境变量): - `USE_CLIP_AS_SERVICE=true`(默认) - `CLIP_AS_SERVICE_SERVER=grpc://127.0.0.1:51000` + - `CLIP_AS_SERVICE_MODEL_NAME=CN-CLIP/ViT-L-14` + - `scripts/start_cnclip_service.sh` 默认会读取同一个 `CLIP_AS_SERVICE_MODEL_NAME`,也可用 `CNCLIP_MODEL_NAME` 或 `--model-name` 临时覆盖 ### 启动服务 @@ -80,6 +82,6 @@ TEI_DEVICE=cpu ./scripts/start_tei_service.sh - `PORT`: 服务端口(默认 6005) - `TEXT_MODEL_ID`, `TEXT_DEVICE`, `TEXT_BATCH_SIZE`, `TEXT_NORMALIZE_EMBEDDINGS` - `IMAGE_NORMALIZE_EMBEDDINGS`(默认 true) -- `USE_CLIP_AS_SERVICE`, `CLIP_AS_SERVICE_SERVER`:图片向量(clip-as-service) +- `USE_CLIP_AS_SERVICE`, `CLIP_AS_SERVICE_SERVER`, `CLIP_AS_SERVICE_MODEL_NAME`:图片向量(clip-as-service) - `IMAGE_MODEL_NAME`, `IMAGE_DEVICE`:本地 CN-CLIP(当 `USE_CLIP_AS_SERVICE=false` 时) - TEI 相关:`TEI_DEVICE`、`TEI_VERSION`、`TEI_MAX_BATCH_TOKENS`、`TEI_MAX_CLIENT_BATCH_SIZE`、`TEI_HEALTH_TIMEOUT_SEC` diff --git a/embeddings/clip_model.py b/embeddings/clip_model.py index 835fd14..657c67e 100644 --- a/embeddings/clip_model.py +++ b/embeddings/clip_model.py @@ -16,7 +16,7 @@ from cn_clip.clip import load_from_name import cn_clip.clip as clip -DEFAULT_MODEL_NAME = "ViT-H-14" +DEFAULT_MODEL_NAME = "ViT-L-14" # "ViT-H-14", "ViT-L-14-336" MODEL_DOWNLOAD_DIR = "/data/" diff --git a/embeddings/config.py b/embeddings/config.py index 1df70aa..4b7b9e8 100644 --- a/embeddings/config.py +++ b/embeddings/config.py @@ -30,9 +30,10 @@ class EmbeddingConfig(object): # Option A: clip-as-service (Jina CLIP server, recommended) USE_CLIP_AS_SERVICE = os.getenv("USE_CLIP_AS_SERVICE", "true").lower() in ("1", "true", "yes") CLIP_AS_SERVICE_SERVER = os.getenv("CLIP_AS_SERVICE_SERVER", "grpc://127.0.0.1:51000") + CLIP_AS_SERVICE_MODEL_NAME = os.getenv("CLIP_AS_SERVICE_MODEL_NAME", "CN-CLIP/ViT-L-14") # Option B: local CN-CLIP (when USE_CLIP_AS_SERVICE=false) - IMAGE_MODEL_NAME = "ViT-H-14" + IMAGE_MODEL_NAME = os.getenv("IMAGE_MODEL_NAME", "ViT-L-14") IMAGE_DEVICE = None # type: Optional[str] # "cuda" / "cpu" / None(auto) # Service behavior diff --git a/embeddings/server.py b/embeddings/server.py index 9aa8579..bd244a0 100644 --- a/embeddings/server.py +++ b/embeddings/server.py @@ -8,23 +8,100 @@ API (simple list-in, list-out; aligned by index): import logging import os +import pathlib import threading import time +import uuid from collections import deque from dataclasses import dataclass +from logging.handlers import TimedRotatingFileHandler from typing import Any, Dict, List, Optional import numpy as np -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Request, Response +from fastapi.concurrency import run_in_threadpool +from config.services_config import get_embedding_backend_config from embeddings.config import CONFIG from embeddings.protocols import ImageEncoderProtocol -from config.services_config import get_embedding_backend_config - -logger = logging.getLogger(__name__) app = FastAPI(title="saas-search Embedding Service", version="1.0.0") + +class _DefaultRequestIdFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + if not hasattr(record, "reqid"): + record.reqid = "-1" + return True + + +def configure_embedding_logging() -> None: + root_logger = logging.getLogger() + if getattr(root_logger, "_embedding_logging_configured", False): + return + + log_dir = pathlib.Path("logs") + verbose_dir = log_dir / "verbose" + log_dir.mkdir(exist_ok=True) + verbose_dir.mkdir(parents=True, exist_ok=True) + + log_level = os.getenv("LOG_LEVEL", "INFO").upper() + numeric_level = 
getattr(logging, log_level, logging.INFO) + formatter = logging.Formatter( + "%(asctime)s | reqid:%(reqid)s | %(name)s | %(levelname)s | %(message)s" + ) + request_filter = _DefaultRequestIdFilter() + + root_logger.setLevel(numeric_level) + + file_handler = TimedRotatingFileHandler( + filename=log_dir / "embedding_api.log", + when="midnight", + interval=1, + backupCount=30, + encoding="utf-8", + ) + file_handler.setLevel(numeric_level) + file_handler.setFormatter(formatter) + file_handler.addFilter(request_filter) + root_logger.addHandler(file_handler) + + error_handler = TimedRotatingFileHandler( + filename=log_dir / "embedding_api_error.log", + when="midnight", + interval=1, + backupCount=30, + encoding="utf-8", + ) + error_handler.setLevel(logging.ERROR) + error_handler.setFormatter(formatter) + error_handler.addFilter(request_filter) + root_logger.addHandler(error_handler) + + verbose_logger = logging.getLogger("embedding.verbose") + verbose_logger.setLevel(numeric_level) + verbose_logger.handlers.clear() + verbose_logger.propagate = False + + verbose_handler = TimedRotatingFileHandler( + filename=verbose_dir / "embedding_verbose.log", + when="midnight", + interval=1, + backupCount=30, + encoding="utf-8", + ) + verbose_handler.setLevel(numeric_level) + verbose_handler.setFormatter(formatter) + verbose_handler.addFilter(request_filter) + verbose_logger.addHandler(verbose_handler) + + root_logger._embedding_logging_configured = True # type: ignore[attr-defined] + + +configure_embedding_logging() +logger = logging.getLogger(__name__) +verbose_logger = logging.getLogger("embedding.verbose") + # Models are loaded at startup, not lazily _text_model: Optional[Any] = None _image_model: Optional[ImageEncoderProtocol] = None @@ -35,12 +112,78 @@ open_image_model = os.getenv("EMBEDDING_ENABLE_IMAGE_MODEL", "true").lower() in _text_encode_lock = threading.Lock() _image_encode_lock = threading.Lock() +_TEXT_MICROBATCH_WINDOW_SEC = max( + 0.0, float(os.getenv("TEXT_MICROBATCH_WINDOW_MS", "4")) / 1000.0 +) +_TEXT_REQUEST_TIMEOUT_SEC = max( + 1.0, float(os.getenv("TEXT_REQUEST_TIMEOUT_SEC", "30")) +) +_TEXT_MAX_INFLIGHT = max(1, int(os.getenv("TEXT_MAX_INFLIGHT", "32"))) +_IMAGE_MAX_INFLIGHT = max(1, int(os.getenv("IMAGE_MAX_INFLIGHT", "1"))) +_OVERLOAD_STATUS_CODE = int(os.getenv("EMBEDDING_OVERLOAD_STATUS_CODE", "503")) +_LOG_PREVIEW_COUNT = max(1, int(os.getenv("EMBEDDING_LOG_PREVIEW_COUNT", "3"))) +_LOG_TEXT_PREVIEW_CHARS = max(32, int(os.getenv("EMBEDDING_LOG_TEXT_PREVIEW_CHARS", "120"))) +_LOG_IMAGE_PREVIEW_CHARS = max(32, int(os.getenv("EMBEDDING_LOG_IMAGE_PREVIEW_CHARS", "180"))) +_VECTOR_PREVIEW_DIMS = max(1, int(os.getenv("EMBEDDING_VECTOR_PREVIEW_DIMS", "6"))) + + +class _InflightLimiter: + def __init__(self, name: str, limit: int): + self.name = name + self.limit = max(1, int(limit)) + self._sem = threading.BoundedSemaphore(self.limit) + self._lock = threading.Lock() + self._active = 0 + self._rejected = 0 + self._completed = 0 + self._failed = 0 + self._max_active = 0 + + def try_acquire(self) -> tuple[bool, int]: + if not self._sem.acquire(blocking=False): + with self._lock: + self._rejected += 1 + active = self._active + return False, active + with self._lock: + self._active += 1 + self._max_active = max(self._max_active, self._active) + active = self._active + return True, active + + def release(self, *, success: bool) -> int: + with self._lock: + self._active = max(0, self._active - 1) + if success: + self._completed += 1 + else: + self._failed += 1 + active = self._active + 
self._sem.release() + return active + + def snapshot(self) -> Dict[str, int]: + with self._lock: + return { + "limit": self.limit, + "active": self._active, + "rejected_total": self._rejected, + "completed_total": self._completed, + "failed_total": self._failed, + "max_active": self._max_active, + } + + +_text_request_limiter = _InflightLimiter(name="text", limit=_TEXT_MAX_INFLIGHT) +_image_request_limiter = _InflightLimiter(name="image", limit=_IMAGE_MAX_INFLIGHT) + @dataclass class _SingleTextTask: text: str normalize: bool created_at: float + request_id: str done: threading.Event result: Optional[List[float]] = None error: Optional[Exception] = None @@ -50,15 +193,6 @@ _text_single_queue: "deque[_SingleTextTask]" = deque() _text_single_queue_cv = threading.Condition() _text_batch_worker: Optional[threading.Thread] = None _text_batch_worker_stop = False -_TEXT_MICROBATCH_WINDOW_SEC = max( - 0.0, float(os.getenv("TEXT_MICROBATCH_WINDOW_MS", "4")) / 1000.0 -) -_TEXT_REQUEST_TIMEOUT_SEC = max( - 1.0, float(os.getenv("TEXT_REQUEST_TIMEOUT_SEC", "30")) -) -_LOG_PREVIEW_COUNT = max(1, int(os.getenv("EMBEDDING_LOG_PREVIEW_COUNT", "3"))) -_LOG_TEXT_PREVIEW_CHARS = max(32, int(os.getenv("EMBEDDING_LOG_TEXT_PREVIEW_CHARS", "120"))) -_LOG_IMAGE_PREVIEW_CHARS = max(32, int(os.getenv("EMBEDDING_LOG_IMAGE_PREVIEW_CHARS", "180"))) def _compact_preview(text: str, max_chars: int) -> str: @@ -81,6 +215,29 @@ def _preview_inputs(items: List[str], max_items: int, max_chars: int) -> List[Di return previews +def _preview_vector(vec: Optional[List[float]], max_dims: int = _VECTOR_PREVIEW_DIMS) -> List[float]: + if not vec: + return [] + return [round(float(v), 6) for v in vec[:max_dims]] + + +def _request_log_extra(request_id: str) -> Dict[str, str]: + return {"reqid": request_id} + + +def _resolve_request_id(http_request: Request) -> str: + header_value = http_request.headers.get("X-Request-ID") + if header_value and header_value.strip(): + return header_value.strip()[:32] + return str(uuid.uuid4())[:8] + + +def _request_client(http_request: Request) -> str: + client = getattr(http_request, "client", None) + host = getattr(client, "host", None) + return str(host or "-") + + def _encode_local_st(texts: List[str], normalize_embeddings: bool) -> Any: with _text_encode_lock: return _text_model.encode( @@ -139,6 +296,21 @@ def _text_batch_worker_loop() -> None: batch.append(_text_single_queue.popleft()) try: + queue_wait_ms = [(time.perf_counter() - task.created_at) * 1000.0 for task in batch] + reqids = [task.request_id for task in batch] + logger.info( + "text microbatch dispatch | size=%d queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s preview=%s", + len(batch), + min(queue_wait_ms) if queue_wait_ms else 0.0, + max(queue_wait_ms) if queue_wait_ms else 0.0, + reqids, + _preview_inputs( + [task.text for task in batch], + _LOG_PREVIEW_COUNT, + _LOG_TEXT_PREVIEW_CHARS, + ), + ) + batch_t0 = time.perf_counter() embs = _encode_local_st([task.text for task in batch], normalize_embeddings=False) if embs is None or len(embs) != len(batch): raise RuntimeError( @@ -150,7 +322,21 @@ def _text_batch_worker_loop() -> None: if vec is None: raise RuntimeError("Text model returned empty embedding in micro-batch") task.result = vec + logger.info( + "text microbatch done | size=%d reqids=%s dim=%d backend_elapsed_ms=%.2f", + len(batch), + reqids, + len(batch[0].result) if batch and batch[0].result is not None else 0, + (time.perf_counter() - batch_t0) * 1000.0, + ) except Exception as exc: + logger.error( + "text 
microbatch failed | size=%d reqids=%s error=%s", + len(batch), + [task.request_id for task in batch], + exc, + exc_info=True, + ) for task in batch: task.error = exc finally: @@ -158,11 +344,12 @@ def _text_batch_worker_loop() -> None: task.done.set() -def _encode_single_text_with_microbatch(text: str, normalize: bool) -> List[float]: +def _encode_single_text_with_microbatch(text: str, normalize: bool, request_id: str) -> List[float]: task = _SingleTextTask( text=text, normalize=normalize, created_at=time.perf_counter(), + request_id=request_id, done=threading.Event(), ) with _text_single_queue_cv: @@ -192,7 +379,6 @@ def load_models(): logger.info("Loading embedding models at startup...") - # Load text model if open_text_model: try: backend_name, backend_cfg = get_embedding_backend_config() @@ -233,17 +419,19 @@ def load_models(): ) logger.info("Text backend loaded successfully: %s", _text_backend_name) except Exception as e: - logger.error(f"Failed to load text model: {e}", exc_info=True) + logger.error("Failed to load text model: %s", e, exc_info=True) raise - - # Load image model: clip-as-service (recommended) or local CN-CLIP if open_image_model: try: if CONFIG.USE_CLIP_AS_SERVICE: from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder - logger.info(f"Loading image encoder via clip-as-service: {CONFIG.CLIP_AS_SERVICE_SERVER}") + logger.info( + "Loading image encoder via clip-as-service: %s (configured model: %s)", + CONFIG.CLIP_AS_SERVICE_SERVER, + CONFIG.CLIP_AS_SERVICE_MODEL_NAME, + ) _image_model = ClipAsServiceImageEncoder( server=CONFIG.CLIP_AS_SERVICE_SERVER, batch_size=CONFIG.IMAGE_BATCH_SIZE, @@ -252,7 +440,11 @@ def load_models(): else: from embeddings.clip_model import ClipImageModel - logger.info(f"Loading local image model: {CONFIG.IMAGE_MODEL_NAME} (device: {CONFIG.IMAGE_DEVICE})") + logger.info( + "Loading local image model: %s (device: %s)", + CONFIG.IMAGE_MODEL_NAME, + CONFIG.IMAGE_DEVICE, + ) _image_model = ClipImageModel( model_name=CONFIG.IMAGE_MODEL_NAME, device=CONFIG.IMAGE_DEVICE, @@ -292,55 +484,56 @@ def _as_list(embedding: Optional[np.ndarray], normalize: bool = False) -> Option @app.get("/health") def health() -> Dict[str, Any]: - """Health check endpoint. Returns status and model loading state.""" + """Health check endpoint. 
Returns status and current throttling stats.""" return { "status": "ok", "text_model_loaded": _text_model is not None, "text_backend": _text_backend_name, "image_model_loaded": _image_model is not None, + "limits": { + "text": _text_request_limiter.snapshot(), + "image": _image_request_limiter.snapshot(), + }, + "text_microbatch": { + "window_ms": round(_TEXT_MICROBATCH_WINDOW_SEC * 1000.0, 3), + "queue_depth": len(_text_single_queue), + "worker_alive": bool(_text_batch_worker is not None and _text_batch_worker.is_alive()), + "request_timeout_sec": _TEXT_REQUEST_TIMEOUT_SEC, + }, } -@app.post("/embed/text") -def embed_text(texts: List[str], normalize: Optional[bool] = None) -> List[Optional[List[float]]]: +def _embed_text_impl( + normalized: List[str], + effective_normalize: bool, + request_id: str, +) -> List[Optional[List[float]]]: if _text_model is None: raise RuntimeError("Text model not loaded") - effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) - normalized: List[str] = [] - for i, t in enumerate(texts): - if not isinstance(t, str): - raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: must be string") - s = t.strip() - if not s: - raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string") - normalized.append(s) - - logger.info( - "embed_text request | backend=%s inputs=%d normalize=%s preview=%s", - _text_backend_name, - len(normalized), - effective_normalize, - _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS), - ) t0 = time.perf_counter() try: - # local_st backend uses in-process torch model, keep serialized encode for safety; - # TEI backend is an HTTP client and supports concurrent requests. if _text_backend_name == "local_st": if len(normalized) == 1 and _text_batch_worker is not None: - out = [_encode_single_text_with_microbatch(normalized[0], normalize=effective_normalize)] - elapsed_ms = (time.perf_counter() - t0) * 1000.0 + out = [ + _encode_single_text_with_microbatch( + normalized[0], + normalize=effective_normalize, + request_id=request_id, + ) + ] logger.info( - "embed_text done | backend=%s mode=microbatch-single inputs=%d normalize=%s dim=%d elapsed_ms=%.2f", + "text backend done | backend=%s mode=microbatch-single inputs=%d normalize=%s dim=%d backend_elapsed_ms=%.2f", _text_backend_name, len(normalized), effective_normalize, len(out[0]) if out and out[0] is not None else 0, - elapsed_ms, + (time.perf_counter() - t0) * 1000.0, + extra=_request_log_extra(request_id), ) return out embs = _encode_local_st(normalized, normalize_embeddings=False) + mode = "direct-batch" else: embs = _text_model.encode( normalized, @@ -348,55 +541,154 @@ def embed_text(texts: List[str], normalize: Optional[bool] = None) -> List[Optio device=CONFIG.TEXT_DEVICE, normalize_embeddings=effective_normalize, ) + mode = "backend-batch" except Exception as e: - logger.error("Text embedding backend failure: %s", e, exc_info=True) - raise HTTPException( - status_code=502, - detail=f"Text embedding backend failure: {e}", - ) from e + logger.error( + "Text embedding backend failure: %s", + e, + exc_info=True, + extra=_request_log_extra(request_id), + ) + raise RuntimeError(f"Text embedding backend failure: {e}") from e + if embs is None or len(embs) != len(normalized): raise RuntimeError( f"Text model response length mismatch: expected {len(normalized)}, " f"got {0 if embs is None else len(embs)}" ) + out: List[Optional[List[float]]] = [] for i, emb in enumerate(embs): vec = 
_as_list(emb, normalize=effective_normalize) if vec is None: raise RuntimeError(f"Text model returned empty embedding for index {i}") out.append(vec) - elapsed_ms = (time.perf_counter() - t0) * 1000.0 + logger.info( - "embed_text done | backend=%s inputs=%d normalize=%s dim=%d elapsed_ms=%.2f", + "text backend done | backend=%s mode=%s inputs=%d normalize=%s dim=%d backend_elapsed_ms=%.2f", _text_backend_name, + mode, len(normalized), effective_normalize, len(out[0]) if out and out[0] is not None else 0, - elapsed_ms, + (time.perf_counter() - t0) * 1000.0, + extra=_request_log_extra(request_id), ) return out -@app.post("/embed/image") -def embed_image(images: List[str], normalize: Optional[bool] = None) -> List[Optional[List[float]]]: - if _image_model is None: - raise RuntimeError("Image model not loaded") - effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) - urls: List[str] = [] - for i, url_or_path in enumerate(images): - if not isinstance(url_or_path, str): - raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path") - s = url_or_path.strip() +@app.post("/embed/text") +async def embed_text( + texts: List[str], + http_request: Request, + response: Response, + normalize: Optional[bool] = None, +) -> List[Optional[List[float]]]: + request_id = _resolve_request_id(http_request) + response.headers["X-Request-ID"] = request_id + + effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) + normalized: List[str] = [] + for i, t in enumerate(texts): + if not isinstance(t, str): + raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: must be string") + s = t.strip() if not s: - raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path") - urls.append(s) + raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string") + normalized.append(s) - logger.info( - "embed_image request | inputs=%d normalize=%s preview=%s", - len(urls), - effective_normalize, - _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS), - ) + accepted, active = _text_request_limiter.try_acquire() + if not accepted: + logger.warning( + "embed_text rejected | client=%s backend=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + _request_client(http_request), + _text_backend_name, + len(normalized), + effective_normalize, + active, + _TEXT_MAX_INFLIGHT, + _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS), + extra=_request_log_extra(request_id), + ) + raise HTTPException( + status_code=_OVERLOAD_STATUS_CODE, + detail=f"Text embedding service busy: active={active}, limit={_TEXT_MAX_INFLIGHT}", + ) + + request_started = time.perf_counter() + success = False + try: + logger.info( + "embed_text request | client=%s backend=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + _request_client(http_request), + _text_backend_name, + len(normalized), + effective_normalize, + active, + _TEXT_MAX_INFLIGHT, + _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS), + extra=_request_log_extra(request_id), + ) + verbose_logger.info( + "embed_text detail | payload=%s normalize=%s backend=%s", + normalized, + effective_normalize, + _text_backend_name, + extra=_request_log_extra(request_id), + ) + out = await run_in_threadpool(_embed_text_impl, normalized, effective_normalize, request_id) + success = True + latency_ms = (time.perf_counter() - request_started) * 1000.0 
+ logger.info( + "embed_text response | backend=%s inputs=%d normalize=%s dim=%d first_vector=%s latency_ms=%.2f", + _text_backend_name, + len(normalized), + effective_normalize, + len(out[0]) if out and out[0] is not None else 0, + _preview_vector(out[0] if out else None), + latency_ms, + extra=_request_log_extra(request_id), + ) + verbose_logger.info( + "embed_text result detail | count=%d first_vector=%s latency_ms=%.2f", + len(out), + out[0][: _VECTOR_PREVIEW_DIMS] if out and out[0] is not None else [], + latency_ms, + extra=_request_log_extra(request_id), + ) + return out + except HTTPException: + raise + except Exception as e: + latency_ms = (time.perf_counter() - request_started) * 1000.0 + logger.error( + "embed_text failed | backend=%s inputs=%d normalize=%s latency_ms=%.2f error=%s", + _text_backend_name, + len(normalized), + effective_normalize, + latency_ms, + e, + exc_info=True, + extra=_request_log_extra(request_id), + ) + raise HTTPException(status_code=502, detail=str(e)) from e + finally: + remaining = _text_request_limiter.release(success=success) + logger.info( + "embed_text finalize | success=%s active_after=%d", + success, + remaining, + extra=_request_log_extra(request_id), + ) + + +def _embed_image_impl( + urls: List[str], + effective_normalize: bool, + request_id: str, +) -> List[Optional[List[float]]]: + if _image_model is None: + raise RuntimeError("Image model not loaded") t0 = time.perf_counter() with _image_encode_lock: @@ -410,18 +702,120 @@ def embed_image(images: List[str], normalize: Optional[bool] = None) -> List[Opt f"Image model response length mismatch: expected {len(urls)}, " f"got {0 if vectors is None else len(vectors)}" ) + out: List[Optional[List[float]]] = [] for i, vec in enumerate(vectors): out_vec = _as_list(vec, normalize=effective_normalize) if out_vec is None: raise RuntimeError(f"Image model returned empty embedding for index {i}") out.append(out_vec) - elapsed_ms = (time.perf_counter() - t0) * 1000.0 + logger.info( - "embed_image done | inputs=%d normalize=%s dim=%d elapsed_ms=%.2f", + "image backend done | inputs=%d normalize=%s dim=%d backend_elapsed_ms=%.2f", len(urls), effective_normalize, len(out[0]) if out and out[0] is not None else 0, - elapsed_ms, + (time.perf_counter() - t0) * 1000.0, + extra=_request_log_extra(request_id), ) return out + + +@app.post("/embed/image") +async def embed_image( + images: List[str], + http_request: Request, + response: Response, + normalize: Optional[bool] = None, +) -> List[Optional[List[float]]]: + request_id = _resolve_request_id(http_request) + response.headers["X-Request-ID"] = request_id + + effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) + urls: List[str] = [] + for i, url_or_path in enumerate(images): + if not isinstance(url_or_path, str): + raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path") + s = url_or_path.strip() + if not s: + raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path") + urls.append(s) + + accepted, active = _image_request_limiter.try_acquire() + if not accepted: + logger.warning( + "embed_image rejected | client=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + _request_client(http_request), + len(urls), + effective_normalize, + active, + _IMAGE_MAX_INFLIGHT, + _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS), + extra=_request_log_extra(request_id), + ) + raise HTTPException( + 
status_code=_OVERLOAD_STATUS_CODE, + detail=f"Image embedding service busy: active={active}, limit={_IMAGE_MAX_INFLIGHT}", + ) + + request_started = time.perf_counter() + success = False + try: + logger.info( + "embed_image request | client=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + _request_client(http_request), + len(urls), + effective_normalize, + active, + _IMAGE_MAX_INFLIGHT, + _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS), + extra=_request_log_extra(request_id), + ) + verbose_logger.info( + "embed_image detail | payload=%s normalize=%s", + urls, + effective_normalize, + extra=_request_log_extra(request_id), + ) + out = await run_in_threadpool(_embed_image_impl, urls, effective_normalize, request_id) + success = True + latency_ms = (time.perf_counter() - request_started) * 1000.0 + logger.info( + "embed_image response | inputs=%d normalize=%s dim=%d first_vector=%s latency_ms=%.2f", + len(urls), + effective_normalize, + len(out[0]) if out and out[0] is not None else 0, + _preview_vector(out[0] if out else None), + latency_ms, + extra=_request_log_extra(request_id), + ) + verbose_logger.info( + "embed_image result detail | count=%d first_vector=%s latency_ms=%.2f", + len(out), + out[0][: _VECTOR_PREVIEW_DIMS] if out and out[0] is not None else [], + latency_ms, + extra=_request_log_extra(request_id), + ) + return out + except HTTPException: + raise + except Exception as e: + latency_ms = (time.perf_counter() - request_started) * 1000.0 + logger.error( + "embed_image failed | inputs=%d normalize=%s latency_ms=%.2f error=%s", + len(urls), + effective_normalize, + latency_ms, + e, + exc_info=True, + extra=_request_log_extra(request_id), + ) + raise HTTPException(status_code=502, detail=f"Image embedding backend failure: {e}") from e + finally: + remaining = _image_request_limiter.release(success=success) + logger.info( + "embed_image finalize | success=%s active_after=%d", + success, + remaining, + extra=_request_log_extra(request_id), + ) diff --git a/perf_reports/20260319/nllb_t4_longtext_reassessment.md b/perf_reports/20260319/nllb_t4_longtext_reassessment.md new file mode 100644 index 0000000..8d4c9d2 --- /dev/null +++ b/perf_reports/20260319/nllb_t4_longtext_reassessment.md @@ -0,0 +1,97 @@ +# NLLB T4 Long-Text Reassessment + +Date: 2026-03-19 +Model: `nllb-200-distilled-600m` +Backend: `CTranslate2 + float16` +Direction: `zh -> en` + +## Why This Reassessment Exists + +Earlier notes mixed two different ideas: + +- `batch_size=64` was the highest-throughput point in the original product-title sweeps. +- `batch_size=16` was only a more conservative default candidate when trying to balance throughput with tail latency for online use. + +That distinction was not carried forward clearly enough. We re-checked the current long-text segmented workload instead of reusing the product-title conclusion mechanically. 
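
For future re-checks, the benchmark script added alongside this report (`scripts/benchmark_translation_longtext_single.py`) can drive the same 16-vs-64 comparison reproducibly. The sketch below is illustrative only — it is not how the log-derived numbers in the next section were collected — and it assumes the command is run from the repository root with the translator environment active and that the script's JSON summary is the last line on stdout.

```python
# Hedged sketch: sweep candidate batch sizes with the benchmark script added in
# this patch. Flag names and JSON keys match the script's argparse/payload
# definitions; the invocation details (interpreter, run count) are illustrative.
import json
import subprocess

for batch_size in (16, 64):
    proc = subprocess.run(
        [
            "python3", "scripts/benchmark_translation_longtext_single.py",
            "--model", "nllb-200-distilled-600m",
            "--source-lang", "zh",
            "--target-lang", "en",
            "--batch-size", str(batch_size),
            "--runs", "3",
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    # The script prints a single JSON object; take the last stdout line to be safe.
    summary = json.loads(proc.stdout.strip().splitlines()[-1])
    print(
        f"batch_size={batch_size} "
        f"segments={summary['segment_count']} "
        f"p50={summary['latency_p50_ms']}ms p95={summary['latency_p95_ms']}ms"
    )
```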
+ +## Current Long-Text Workload Observed in Logs + +The clearest apples-to-apples evidence came from repeated uncached requests of the same long Chinese input: + +- input length: about `3944` to `3966` chars +- segmented into `60` pieces +- target language: `en` +- source language: `zh` + +### Log-Derived Comparison + +`batch_size=16` samples from [`logs/translator-2026-03-19.log`](/data/saas-search/logs/translator-2026-03-19.log): + +- `reqid=181f00ae` -> `1586.87 ms` +- `reqid=d6c1213f` -> `1732.95 ms` +- `reqid=26f8acd1` -> `4745.32 ms` + +`batch_size=64` samples from the same log: + +- `reqid=28262f1e` -> `752.96 ms` +- `reqid=737fc848` -> `815.66 ms` +- `reqid=8d05fa20` -> `835.25 ms` +- `reqid=e29d2629` -> `3927.87 ms` +- `reqid=c2b1df14` -> `4049.31 ms` + +### Summary + +For this `~3950 char / 60 segment` workload: + +- `batch_size=16` + - median end-to-end latency: `1732.95 ms` + - median `segmentation_summary -> response`: `1672 ms` +- `batch_size=64` + - median end-to-end latency: `835.25 ms` + - median `segmentation_summary -> response`: `782 ms` + +This means the steady-state inference portion was cut by about half after moving from `16` to `64`. + +## Important Environment Finding + +This machine was not in an isolated benchmark state while re-checking: + +- the single T4 was shared with translator, embedding, CN-CLIP, and reranker processes +- `nvidia-smi` showed about `15157 / 16384 MiB` in use during the re-check + +That explains the multi-second outliers in both the `16` and `64` groups. The outliers mainly appeared before the segmentation summary log, so they should be treated as shared-GPU contention noise, not pure model execution time. + +## Current Config Drift + +During this review, the live config had already been moved again to `batch_size=256`. + +That larger value is not yet backed by the same quality of evidence: + +- for `60` segments, `256` cannot improve on `64` in any meaningful way because both already fit the whole request into one inference batch +- for much larger requests such as `11847` chars and `180` segments, `256` may help, but we do not yet have a clean isolated comparison against `64` +- on a shared T4, larger batches also reduce memory headroom and make benchmarking less stable + +## Recommendation + +For the current shared-T4 deployment, keep the general NLLB default at: + +- `batch_size=64` +- `ct2_inter_threads=4` +- `ct2_max_queued_batches=32` +- `ct2_batch_type=examples` +- `max_new_tokens=64` +- `ct2_decoding_length_mode=source` +- `ct2_decoding_length_extra=8` +- `ct2_decoding_length_min=32` + +Treat `batch_size=128` or `256` as workload-specific experiments, not as the default baseline. + +## Best Practices Going Forward + +- Benchmark long-text segmented translation separately from product-title translation. +- Use uncached repeated requests with the same long sample when checking single-request latency. +- Split latency analysis into: + - `request -> segmentation summary` + - `segmentation summary -> response` +- Do not treat shared-GPU results as a clean config ranking. +- Before promoting a larger batch like `128` or `256` to default, re-run in a translator-only GPU window. 
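
As a sanity check on the Summary and Config Drift reasoning above, the quoted medians and the per-request batch counts can be recomputed directly from the figures in this report. A small sketch using only the sampled latencies and the 60-segment workload (the raw log parsing is not reproduced here):

```python
# Recompute the medians quoted in the Summary section from the sampled
# end-to-end latencies, and the number of inference batches a 60-segment
# request needs at each candidate batch_size (backing the 64-vs-256 argument).
import math
import statistics

samples_ms = {
    16: [1586.87, 1732.95, 4745.32],
    64: [752.96, 815.66, 835.25, 3927.87, 4049.31],
}
for batch_size, values in samples_ms.items():
    print(f"batch_size={batch_size}: median={statistics.median(values):.2f} ms")

segments = 60
for batch_size in (16, 64, 256):
    batches = math.ceil(segments / batch_size)
    print(f"batch_size={batch_size}: {batches} inference batch(es) for {segments} segments")
```

The second loop is the whole 64-vs-256 argument in one place: 16 needs 4 inference batches for this workload, while 64 and 256 both collapse it into a single batch, so 256 has no further batching win to offer here.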
diff --git a/scripts/benchmark_translation_longtext_single.py b/scripts/benchmark_translation_longtext_single.py new file mode 100644 index 0000000..334faa8 --- /dev/null +++ b/scripts/benchmark_translation_longtext_single.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +"""Benchmark a single long-text translation request for local models.""" + +from __future__ import annotations + +import argparse +import copy +import json +import logging +import statistics +import time +from pathlib import Path + +import torch + +PROJECT_ROOT = Path(__file__).resolve().parent.parent + +import sys + +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from config.services_config import get_translation_config # noqa: E402 +from translation.service import TranslationService # noqa: E402 +from translation.text_splitter import compute_safe_input_token_limit # noqa: E402 + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Benchmark a long-text translation request") + parser.add_argument("--model", default="nllb-200-distilled-600m") + parser.add_argument("--source-lang", default="zh") + parser.add_argument("--target-lang", default="en") + parser.add_argument("--scene", default="sku_name") + parser.add_argument("--source-md", default="docs/搜索API对接指南.md") + parser.add_argument("--paragraph-min-chars", type=int, default=250) + parser.add_argument("--target-doc-chars", type=int, default=4500) + parser.add_argument("--min-doc-chars", type=int, default=2400) + parser.add_argument("--runs", type=int, default=3) + parser.add_argument("--batch-size", type=int, default=64) + parser.add_argument("--ct2-inter-threads", type=int, default=4) + parser.add_argument("--ct2-max-queued-batches", type=int, default=32) + parser.add_argument("--ct2-batch-type", default="examples") + parser.add_argument("--max-new-tokens", type=int, default=64) + parser.add_argument("--ct2-decoding-length-mode", default="source") + parser.add_argument("--ct2-decoding-length-extra", type=int, default=8) + parser.add_argument("--ct2-decoding-length-min", type=int, default=32) + return parser.parse_args() + + +def build_long_document(args: argparse.Namespace) -> str: + source_path = (PROJECT_ROOT / args.source_md).resolve() + text = source_path.read_text(encoding="utf-8") + paragraphs = [] + for raw in text.split("\n\n"): + normalized = " ".join(line.strip() for line in raw.splitlines() if line.strip()) + if len(normalized) >= args.paragraph_min_chars and not normalized.startswith("```"): + paragraphs.append(normalized) + + parts = [] + total = 0 + for paragraph in paragraphs: + parts.append(paragraph) + total += len(paragraph) + 2 + if total >= args.target_doc_chars: + break + document = "\n\n".join(parts) + if len(document) < args.min_doc_chars: + raise ValueError( + f"Prepared long document is too short: {len(document)} chars < {args.min_doc_chars}" + ) + return document + + +def build_service(args: argparse.Namespace) -> TranslationService: + config = copy.deepcopy(get_translation_config()) + for name, capability in config["capabilities"].items(): + capability["enabled"] = name == args.model + + capability = config["capabilities"][args.model] + capability["use_cache"] = False + capability["batch_size"] = args.batch_size + capability["ct2_inter_threads"] = args.ct2_inter_threads + capability["ct2_max_queued_batches"] = args.ct2_max_queued_batches + capability["ct2_batch_type"] = args.ct2_batch_type + capability["max_new_tokens"] = args.max_new_tokens + 
capability["ct2_decoding_length_mode"] = args.ct2_decoding_length_mode + capability["ct2_decoding_length_extra"] = args.ct2_decoding_length_extra + capability["ct2_decoding_length_min"] = args.ct2_decoding_length_min + config["default_model"] = args.model + return TranslationService(config) + + +def percentile(values: list[float], p: float) -> float: + if not values: + return 0.0 + ordered = sorted(values) + if len(ordered) == 1: + return float(ordered[0]) + index = min(len(ordered) - 1, max(0, round((len(ordered) - 1) * p))) + return float(ordered[index]) + + +def main() -> None: + args = parse_args() + logging.getLogger().setLevel(logging.WARNING) + + document = build_long_document(args) + load_started = time.perf_counter() + service = build_service(args) + backend = service.get_backend(args.model) + load_seconds = time.perf_counter() - load_started + + safe_input_limit = compute_safe_input_token_limit( + max_input_length=backend.max_input_length, + max_new_tokens=backend.max_new_tokens, + decoding_length_mode=backend.ct2_decoding_length_mode, + decoding_length_extra=backend.ct2_decoding_length_extra, + ) + segments = backend._split_text_if_needed( + document, + target_lang=args.target_lang, + source_lang=args.source_lang, + ) + + # Warm up once before measurements. + _ = service.translate( + document, + source_lang=args.source_lang, + target_lang=args.target_lang, + model=args.model, + scene=args.scene, + ) + if torch.cuda.is_available(): + torch.cuda.synchronize() + + latencies_ms: list[float] = [] + output_chars = 0 + for _ in range(args.runs): + started = time.perf_counter() + output = service.translate( + document, + source_lang=args.source_lang, + target_lang=args.target_lang, + model=args.model, + scene=args.scene, + ) + if torch.cuda.is_available(): + torch.cuda.synchronize() + latencies_ms.append((time.perf_counter() - started) * 1000) + output_chars += len(output or "") + + total_seconds = sum(latencies_ms) / 1000.0 + payload = { + "model": args.model, + "source_lang": args.source_lang, + "target_lang": args.target_lang, + "doc_chars": len(document), + "runs": args.runs, + "load_seconds": round(load_seconds, 3), + "batch_size": backend.batch_size, + "ct2_inter_threads": backend.ct2_inter_threads, + "ct2_max_queued_batches": backend.ct2_max_queued_batches, + "ct2_batch_type": backend.ct2_batch_type, + "max_new_tokens": backend.max_new_tokens, + "ct2_decoding_length_mode": backend.ct2_decoding_length_mode, + "ct2_decoding_length_extra": backend.ct2_decoding_length_extra, + "ct2_decoding_length_min": backend.ct2_decoding_length_min, + "safe_input_limit": safe_input_limit, + "segment_count": len(segments), + "segment_char_lengths": { + "min": min(len(segment) for segment in segments), + "max": max(len(segment) for segment in segments), + "avg": round(statistics.fmean(len(segment) for segment in segments), 1), + }, + "latency_avg_ms": round(statistics.fmean(latencies_ms), 2), + "latency_p50_ms": round(percentile(latencies_ms, 0.50), 2), + "latency_p95_ms": round(percentile(latencies_ms, 0.95), 2), + "latency_max_ms": round(max(latencies_ms), 2), + "input_chars_per_second": round((len(document) * args.runs) / total_seconds, 2), + "output_chars_per_second": round(output_chars / total_seconds, 2), + } + print(json.dumps(payload, ensure_ascii=False)) + + +if __name__ == "__main__": + main() diff --git a/scripts/start_cnclip_service.sh b/scripts/start_cnclip_service.sh index c3823cb..0c22084 100755 --- a/scripts/start_cnclip_service.sh +++ b/scripts/start_cnclip_service.sh @@ -12,7 +12,7 
@@ # 选项: # --port PORT 服务端口(默认:51000) # --device DEVICE 设备类型:cuda 或 cpu(默认:cuda) -# --model-name NAME 模型名称(默认:CN-CLIP/ViT-H-14) +# --model-name NAME 模型名称(默认读取 embeddings/config.py) # --replicas NUM 副本数(默认:1) # --help 显示帮助信息 # @@ -31,15 +31,31 @@ YELLOW='\033[1;33m' BLUE='\033[0;34m' NC='\033[0m' # No Color +# 项目路径(以仓库实际路径为准,避免写死 /data/tw/...) +PROJECT_ROOT="$(cd "$(dirname "$0")/.." && pwd)" + +resolve_default_model_name() { + local python_bin + local resolved_model_name + for python_bin in python3 python; do + if command -v "${python_bin}" >/dev/null 2>&1; then + if resolved_model_name="$(PYTHONPATH="${PROJECT_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" "${python_bin}" -c "from embeddings.config import CONFIG; print(CONFIG.CLIP_AS_SERVICE_MODEL_NAME)" 2>/dev/null)"; then + if [ -n "${resolved_model_name}" ]; then + echo "${resolved_model_name}" + return 0 + fi + fi + fi + done + echo "CN-CLIP/ViT-L-14" +} + # 默认配置 DEFAULT_PORT=51000 DEFAULT_DEVICE="cuda" -DEFAULT_MODEL_NAME="CN-CLIP/ViT-H-14" -# DEFAULT_MODEL_NAME="CN-CLIP/ViT-L-14-336" +DEFAULT_MODEL_NAME="$(resolve_default_model_name)" DEFAULT_REPLICAS=1 # 副本数 -# 项目路径(以仓库实际路径为准,避免写死 /data/tw/...) -PROJECT_ROOT="$(cd "$(dirname "$0")/.." && pwd)" CLIP_SERVER_DIR="${PROJECT_ROOT}/third-party/clip-as-service/server" LOG_DIR="${PROJECT_ROOT}/logs" PID_FILE="${LOG_DIR}/cnclip.pid" @@ -64,20 +80,37 @@ show_help() { echo " $0 # 使用默认配置启动" echo " $0 --port 52000 --device cuda # 指定 CUDA 模式,端口 52000" echo " $0 --port 52000 --device cpu # 显式使用 CPU 模式" + echo " $0 --model-name CN-CLIP/ViT-L-14 # 临时覆盖模型" echo " $0 --replicas 2 # 启动2个副本(需8-10GB显存)" echo "" + echo "说明:" + echo " - 默认模型取自 embeddings/config.py 的 CLIP_AS_SERVICE_MODEL_NAME" + echo " - 也可通过环境变量 CNCLIP_MODEL_NAME 覆盖,再由 --model-name 最终覆盖" + echo "" echo "支持的模型:" - echo " - CN-CLIP/ViT-B-16 基础版本,速度快" - echo " - CN-CLIP/ViT-L-14 平衡版本" - echo " - CN-CLIP/ViT-L-14-336 高分辨率版本" - echo " - CN-CLIP/ViT-H-14 大型版本,精度高(默认)" - echo " - CN-CLIP/RN50 ResNet-50 版本" + local supported_models=( + "CN-CLIP/ViT-B-16|基础版本,速度快" + "CN-CLIP/ViT-L-14|平衡版本" + "CN-CLIP/ViT-L-14-336|高分辨率版本" + "CN-CLIP/ViT-H-14|大型版本,精度高" + "CN-CLIP/RN50|ResNet-50 版本" + ) + local item model desc suffix + for item in "${supported_models[@]}"; do + model="${item%%|*}" + desc="${item#*|}" + suffix="" + if [ "${model}" = "${DEFAULT_MODEL_NAME}" ]; then + suffix="(当前默认)" + fi + echo " - ${model} ${desc}${suffix}" + done } # 解析命令行参数 PORT="${CNCLIP_PORT:-${DEFAULT_PORT}}" DEVICE=${DEFAULT_DEVICE} -MODEL_NAME=${DEFAULT_MODEL_NAME} +MODEL_NAME="${CNCLIP_MODEL_NAME:-${DEFAULT_MODEL_NAME}}" REPLICAS=${DEFAULT_REPLICAS} while [[ $# -gt 0 ]]; do diff --git a/scripts/start_embedding_service.sh b/scripts/start_embedding_service.sh index 67d49f3..0eb7252 100755 --- a/scripts/start_embedding_service.sh +++ b/scripts/start_embedding_service.sh @@ -30,6 +30,7 @@ DEFAULT_EMBEDDING_SERVICE_HOST=$("${PYTHON_BIN}" -c "from embeddings.config impo DEFAULT_EMBEDDING_SERVICE_PORT=$("${PYTHON_BIN}" -c "from embeddings.config import CONFIG; print(CONFIG.PORT)") USE_CLIP_AS_SERVICE=$("${PYTHON_BIN}" -c "from embeddings.config import CONFIG; print('1' if CONFIG.USE_CLIP_AS_SERVICE else '0')") CLIP_AS_SERVICE_SERVER=$("${PYTHON_BIN}" -c "from embeddings.config import CONFIG; print(CONFIG.CLIP_AS_SERVICE_SERVER)") +CLIP_AS_SERVICE_MODEL_NAME=$("${PYTHON_BIN}" -c "from embeddings.config import CONFIG; print(CONFIG.CLIP_AS_SERVICE_MODEL_NAME)") TEXT_BACKEND=$("${PYTHON_BIN}" -c "from config.services_config import get_embedding_backend_config; 
print(get_embedding_backend_config()[0])") TEI_BASE_URL=$("${PYTHON_BIN}" -c "import os; from config.services_config import get_embedding_backend_config; from embeddings.config import CONFIG; _, cfg = get_embedding_backend_config(); print(os.getenv('TEI_BASE_URL') or cfg.get('base_url') or CONFIG.TEI_BASE_URL)") ENABLE_IMAGE_MODEL="${EMBEDDING_ENABLE_IMAGE_MODEL:-true}" @@ -84,14 +85,17 @@ echo "Python: ${PYTHON_BIN}" echo "Host: ${EMBEDDING_SERVICE_HOST}" echo "Port: ${EMBEDDING_SERVICE_PORT}" echo "Text backend: ${TEXT_BACKEND}" +echo "Text max inflight: ${TEXT_MAX_INFLIGHT:-32}" if [[ "${TEXT_BACKEND}" == "tei" ]]; then echo "TEI URL: ${TEI_BASE_URL}" fi if [[ "${IMAGE_MODEL_ENABLED}" == "0" ]]; then echo "Image backend: disabled" elif [[ "${USE_CLIP_AS_SERVICE}" == "1" ]]; then - echo "Image backend: clip-as-service (${CLIP_AS_SERVICE_SERVER})" + echo "Image backend: clip-as-service (${CLIP_AS_SERVICE_SERVER}, model=${CLIP_AS_SERVICE_MODEL_NAME})" fi +echo "Image max inflight: ${IMAGE_MAX_INFLIGHT:-1}" +echo "Logs: logs/embedding_api.log, logs/embedding_api_error.log, logs/verbose/embedding_verbose.log" echo echo "Tips:" echo " - Use a single worker (GPU models cannot be safely duplicated across workers)." diff --git a/tests/test_embedding_service_limits.py b/tests/test_embedding_service_limits.py new file mode 100644 index 0000000..2daa28d --- /dev/null +++ b/tests/test_embedding_service_limits.py @@ -0,0 +1,93 @@ +import asyncio + +import numpy as np +import pytest + +import embeddings.server as embedding_server + + +class _DummyClient: + host = "127.0.0.1" + + +class _DummyRequest: + def __init__(self, headers=None): + self.headers = headers or {} + self.client = _DummyClient() + + +class _DummyResponse: + def __init__(self): + self.headers = {} + + +class _FakeTextModel: + def encode(self, texts, batch_size, device, normalize_embeddings): + assert texts == ["hello world"] + assert normalize_embeddings is False + return [np.array([1.0, 2.0, 3.0], dtype=np.float32)] + + +def test_health_exposes_limit_stats(monkeypatch): + monkeypatch.setattr( + embedding_server, + "_text_request_limiter", + embedding_server._InflightLimiter("text", 2), + ) + monkeypatch.setattr( + embedding_server, + "_image_request_limiter", + embedding_server._InflightLimiter("image", 1), + ) + + payload = embedding_server.health() + + assert payload["status"] == "ok" + assert payload["limits"]["text"]["limit"] == 2 + assert payload["limits"]["image"]["limit"] == 1 + assert "queue_depth" in payload["text_microbatch"] + + +def test_embed_image_rejects_when_image_lane_is_full(monkeypatch): + limiter = embedding_server._InflightLimiter("image", 1) + acquired, _ = limiter.try_acquire() + assert acquired is True + monkeypatch.setattr(embedding_server, "_image_request_limiter", limiter) + + response = _DummyResponse() + with pytest.raises(embedding_server.HTTPException) as exc_info: + asyncio.run( + embedding_server.embed_image( + ["https://example.com/a.jpg"], + _DummyRequest(), + response, + ) + ) + + assert exc_info.value.status_code == embedding_server._OVERLOAD_STATUS_CODE + assert "busy" in exc_info.value.detail + assert limiter.snapshot()["rejected_total"] == 1 + + +def test_embed_text_returns_request_id_and_vector(monkeypatch): + monkeypatch.setattr( + embedding_server, + "_text_request_limiter", + embedding_server._InflightLimiter("text", 2), + ) + monkeypatch.setattr(embedding_server, "_text_model", _FakeTextModel()) + monkeypatch.setattr(embedding_server, "_text_backend_name", "tei") + + request = 
_DummyRequest(headers={"X-Request-ID": "req-123456"}) + response = _DummyResponse() + result = asyncio.run( + embedding_server.embed_text( + ["hello world"], + request, + response, + normalize=False, + ) + ) + + assert response.headers["X-Request-ID"] == "req-123456" + assert result == [[1.0, 2.0, 3.0]] diff --git a/translation/backends/local_ctranslate2.py b/translation/backends/local_ctranslate2.py index 7d1f5d0..58de075 100644 --- a/translation/backends/local_ctranslate2.py +++ b/translation/backends/local_ctranslate2.py @@ -353,14 +353,24 @@ class LocalCTranslate2TranslationBackend: source_lang: Optional[str] = None, ) -> List[str]: limit = self._effective_input_token_limit(target_lang, source_lang) - return split_text_for_translation( - text, - max_tokens=limit, - token_length_fn=lambda value: self._token_count( + token_count_cache: Dict[str, int] = {} + + def _cached_token_count(value: str) -> int: + cached = token_count_cache.get(value) + if cached is not None: + return cached + count = self._token_count( value, target_lang=target_lang, source_lang=source_lang, - ), + ) + token_count_cache[value] = count + return count + + return split_text_for_translation( + text, + max_tokens=limit, + token_length_fn=_cached_token_count, ) def _log_segmentation_summary( diff --git a/translation/backends/local_seq2seq.py b/translation/backends/local_seq2seq.py index 6a517c1..893055d 100644 --- a/translation/backends/local_seq2seq.py +++ b/translation/backends/local_seq2seq.py @@ -203,14 +203,24 @@ class LocalSeq2SeqTranslationBackend: source_lang: Optional[str] = None, ) -> List[str]: limit = self._effective_input_token_limit(target_lang, source_lang) - return split_text_for_translation( - text, - max_tokens=limit, - token_length_fn=lambda value: self._token_count( + token_count_cache: Dict[str, int] = {} + + def _cached_token_count(value: str) -> int: + cached = token_count_cache.get(value) + if cached is not None: + return cached + count = self._token_count( value, target_lang=target_lang, source_lang=source_lang, - ), + ) + token_count_cache[value] = count + return count + + return split_text_for_translation( + text, + max_tokens=limit, + token_length_fn=_cached_token_count, ) def _log_segmentation_summary( -- libgit2 0.21.2
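
Caller-side note: with this patch, `POST /embed/text` and `POST /embed/image` shed load with HTTP 503 (the default `EMBEDDING_OVERLOAD_STATUS_CODE`) instead of queueing indefinitely, and every response carries an `X-Request-ID` header for log correlation. A minimal client sketch, assuming the default port 6005 and the third-party `requests` library; the retry policy and timeouts are illustrative, not part of the service contract.

```python
# Hedged sketch of a caller that respects the new overload behavior:
# retry 503 with a simple backoff, and surface the X-Request-ID echoed by the service.
import time
import uuid

import requests

EMBEDDING_URL = "http://127.0.0.1:6005"  # default PORT from embeddings/config.py


def embed_texts(texts, max_retries=3, backoff_sec=0.5):
    request_id = uuid.uuid4().hex[:8]
    for attempt in range(max_retries + 1):
        resp = requests.post(
            f"{EMBEDDING_URL}/embed/text",
            json=texts,  # body is a JSON array of strings, aligned by index
            headers={"X-Request-ID": request_id},
            timeout=30,
        )
        if resp.status_code == 503 and attempt < max_retries:
            # Service is shedding load (in-flight limit reached); back off and retry.
            time.sleep(backoff_sec * (attempt + 1))
            continue
        resp.raise_for_status()
        print("served request", resp.headers.get("X-Request-ID"))
        return resp.json()  # list of vectors, one per input text


if __name__ == "__main__":
    vectors = embed_texts(["hello world"])
    print(len(vectors), len(vectors[0]))
```

The current limiter state is also visible without sending traffic: `GET /health` now reports per-lane `limits` (active, rejected_total, max_active) and the text micro-batch queue depth, which is a cheaper first check than inferring overload from `lsof` counts.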