From b754fd41470f2a1dab70339d336383de0cd8af1c Mon Sep 17 00:00:00 2001 From: tangwang Date: Fri, 20 Mar 2026 11:59:57 +0800 Subject: [PATCH] 图片向量化支持优先级参数 --- config/config.yaml | 2 +- docs/TODO.txt | 7 +++---- docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md | 16 ++++++++++++++-- embeddings/README.md | 17 +++++++++++++++-- embeddings/image_encoder.py | 34 +++++++++++++++++++++++++++++----- embeddings/server.py | 315 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------- embeddings/text_encoder.py | 19 ++++++++++++++++--- perf_reports/README.md | 34 ++++++++++++++++++++++++++++++++++ query/query_parser.py | 2 +- scripts/perf_api_benchmark.py | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- scripts/perf_cases.json.example | 9 +++++++++ search/searcher.py | 2 +- tests/ci/test_service_api_contracts.py | 40 +++++++++++++++++++++++++++++++++++++--- tests/test_embedding_pipeline.py | 29 ++++++++++++++++++++++++++--- tests/test_embedding_service_limits.py | 2 ++ tests/test_embedding_service_priority.py | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 16 files changed, 609 insertions(+), 65 deletions(-) create mode 100644 perf_reports/README.md create mode 100644 tests/test_embedding_service_priority.py diff --git a/config/config.yaml b/config/config.yaml index 87b7b0d..70ee0d6 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -110,7 +110,7 @@ rerank: services: translation: service_url: "http://127.0.0.1:6006" - default_model: "llm" + default_model: "nllb-200-distilled-600m" default_scene: "general" timeout_sec: 10.0 cache: diff --git a/docs/TODO.txt b/docs/TODO.txt index ffbdc5e..a116c17 100644 --- a/docs/TODO.txt +++ b/docs/TODO.txt @@ -1,11 +1,11 @@ -先阅读图片和文本embedding相关的代码: -@embeddings/README.md @embeddings/server.py @docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md @embeddings/image_encoder.py @embeddings/text_encoder.py +先阅读文本embedding相关的代码: +@embeddings/README.md @embeddings/server.py @docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md @embeddings/text_encoder.py 目前有TEXT_MAX_INFLIGHT / IMAGE_MAX_INFLIGHT 准入限制,超限返回过载状态码。 -embedding服务(包括图片和文本的embedding),要支持 priority 查询参数,priority > 0:不计入上述 inflight、不会因准入被拒绝; +文本embedding服务,要支持 priority 查询参数,priority > 0:不计入上述 inflight、不会因准入被拒绝(图片embedding不需要支持,因为只有离线需要用到图片embedding) priority == 0(默认,适合做索引之类的离线任务):仍走原有 TEXT_MAX_INFLIGHT / IMAGE_MAX_INFLIGHT 准入;超限返回过载状态码。 priority > 0(或者==1)(适合在线请求):不会因准入被拒绝,但是仍然需要占用inflight,这样保证在线请求不被限制,并且在线请求很多的时候可以拒绝掉离线的请求。 @@ -16,7 +16,6 @@ priority > 0(或者==1)(适合在线请求):不会因准入被拒绝 - 配置体系的重构。 Referring to @docs/config-system-review-and-redesign.md , most of the modifications have been completed. Could you conduct a review to check what else needs improvement in the configuration documentation system? Are there any outstanding issues? diff --git a/docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md b/docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md index ad33930..a8d4545 100644 --- a/docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md +++ b/docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md @@ -38,6 +38,10 @@ - `TEXT_MAX_INFLIGHT` - `IMAGE_MAX_INFLIGHT` - 当超过处理能力时,服务会直接返回过载错误,而不是无限排队。 +- 文本与图片服务均支持 `priority` query 参数(图片不做队列插队,仅 admission 规则与文本一致): + - `priority=0`(默认):适合离线索引,仍分别受 `TEXT_MAX_INFLIGHT` / `IMAGE_MAX_INFLIGHT` admission control 约束。 + - `priority>0`(建议在线请求用 `1`):不会因 admission control 被拒绝,但仍会占用对应 text/image 的 inflight。 + - 文本服务端会优先处理高优先级文本请求;图片端不实现插队,顺序按请求到达处理即可。 - `GET /health` 会返回各自的 `limits`、`stats`、`cache_enabled` 等状态;`GET /ready` 用于就绪探针。 #### 7.1.1 `POST /embed/text` — 文本向量化 @@ -59,11 +63,15 @@ **完整 curl 示例**: ```bash -curl -X POST "http://localhost:6005/embed/text?normalize=true" \ +curl -X POST "http://localhost:6005/embed/text?normalize=true&priority=1" \ -H "Content-Type: application/json" \ -d '["芭比娃娃 儿童玩具", "纯棉T恤 短袖"]' ``` +说明: +- 在线 query / 实时请求:建议显式传 `priority=1` +- 离线索引 / 批量回填:保持默认 `priority=0` 即可 + #### 7.1.2 `POST /embed/image` — 图片向量化 将图片 URL 或路径转为向量,用于以图搜图。 @@ -85,11 +93,13 @@ curl -X POST "http://localhost:6005/embed/text?normalize=true" \ **完整 curl 示例**: ```bash -curl -X POST "http://localhost:6008/embed/image?normalize=true" \ +curl -X POST "http://localhost:6008/embed/image?normalize=true&priority=1" \ -H "Content-Type: application/json" \ -d '["https://oss.essa.cn/98532128-cf8e-456c-9e30-6f2a5ea0c19f.jpg"]' ``` +在线以图搜图等实时场景可传 `priority=1`;离线索引回填保持默认 `priority=0`。 + #### 7.1.3 `GET /health` — 健康检查 ```bash @@ -118,6 +128,8 @@ curl "http://localhost:6008/ready" - cache key 已区分 `normalize=true/false`,避免不同归一化策略命中同一条缓存。 - 当服务端发现请求是 **full-cache-hit** 时,会直接返回,不占用模型并发槽位。 - 当服务端发现超过 `TEXT_MAX_INFLIGHT` / `IMAGE_MAX_INFLIGHT` 时,会直接拒绝,而不是无限排队。 +- 其中 `POST /embed/text` 的 `priority=0` 会按上面的 inflight 规则直接拒绝;`priority>0` 不会被 admission 拒绝,但仍计入 inflight,并在服务端排队时优先于 `priority=0` 请求。 +- `POST /embed/image` 的 `priority=0` 受 `IMAGE_MAX_INFLIGHT` 约束;`priority>0` 不会被 admission 拒绝,但仍计入 inflight(无插队)。 #### 7.1.6 TEI 统一调优建议(主服务) diff --git a/embeddings/README.md b/embeddings/README.md index e89901d..66e9185 100644 --- a/embeddings/README.md +++ b/embeddings/README.md @@ -30,13 +30,13 @@ - 文本服务(默认 `6005`) - `POST /embed/text` - 请求体:`["文本1", "文本2", ...]` - - 可选 query 参数:`normalize=true|false` + - 可选 query 参数:`normalize=true|false`、`priority=0|1` - 返回:`[[...], [...], ...]` - 健康接口:`GET /health`、`GET /ready` - 图片服务(默认 `6008`) - `POST /embed/image` - 请求体:`["url或本地路径1", ...]` - - 可选 query 参数:`normalize=true|false` + - 可选 query 参数:`normalize=true|false`、`priority=0|1` - 返回:`[[...], [...], ...]` - 健康接口:`GET /health`、`GET /ready` @@ -61,6 +61,11 @@ - 图片服务可以配置得比文本更严格。 - 请求若是 full-cache-hit,会在服务端直接返回,不占用模型并发槽位。 - 超过处理能力时直接拒绝,比无限排队更稳定。 +- 文本服务支持 `priority`: + - `priority=0`(默认,适合离线索引)仍受 `TEXT_MAX_INFLIGHT` 限制,超限直接返回 overload。 + - `priority>0`(建议在线 query 用 `1`)不会因 admission control 被拒绝,但仍会计入 inflight。 + - 文本服务内部使用双队列调度,处理时会优先消费高优先级请求,避免在线请求长期排在离线批量任务后面。 +- 图片服务同样支持 `priority`(语义与文本一致,按 `IMAGE_MAX_INFLIGHT` 计数;不做队列插队,仅 admission 规则不同)。 ### 图片向量:clip-as-service(推荐) @@ -86,6 +91,14 @@ - `CLIP_AS_SERVICE_MODEL_NAME=CN-CLIP/ViT-L-14` - `scripts/start_cnclip_service.sh` 默认会读取同一个 `CLIP_AS_SERVICE_MODEL_NAME`,也可用 `CNCLIP_MODEL_NAME` 或 `--model-name` 临时覆盖 +### 性能与压测(沿用仓库脚本) + +- 接口级压测(与 `perf_reports/2026-03-12/matrix_report/` 等方法一致):`scripts/perf_api_benchmark.py` + - 示例:`python scripts/perf_api_benchmark.py --scenario embed_text --duration 30 --concurrency 20` + - 文本/图片向量可带 `priority`(与线上 admission 语义一致):`--embed-text-priority 1`、`--embed-image-priority 1` + - 自定义请求模板:`--cases-file scripts/perf_cases.json.example` +- 历史矩阵结果与说明见 `perf_reports/2026-03-12/matrix_report/summary.md`。 + ### 启动服务 使用仓库脚本启动: diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py index acb313a..861bfbc 100644 --- a/embeddings/image_encoder.py +++ b/embeddings/image_encoder.py @@ -35,7 +35,12 @@ class CLIPImageEncoder: namespace="image", ) - def _call_service(self, request_data: List[str], normalize_embeddings: bool = True) -> List[Any]: + def _call_service( + self, + request_data: List[str], + normalize_embeddings: bool = True, + priority: int = 0, + ) -> List[Any]: """ Call the embedding service API. @@ -48,7 +53,10 @@ class CLIPImageEncoder: try: response = requests.post( self.endpoint, - params={"normalize": "true" if normalize_embeddings else "false"}, + params={ + "normalize": "true" if normalize_embeddings else "false", + "priority": max(0, int(priority)), + }, json=request_data, timeout=60 ) @@ -66,7 +74,12 @@ class CLIPImageEncoder: """ raise NotImplementedError("encode_image with PIL Image is not supported by embedding service") - def encode_image_from_url(self, url: str, normalize_embeddings: bool = True) -> np.ndarray: + def encode_image_from_url( + self, + url: str, + normalize_embeddings: bool = True, + priority: int = 0, + ) -> np.ndarray: """ Generate image embedding via network service using URL. @@ -81,7 +94,11 @@ class CLIPImageEncoder: if cached is not None: return cached - response_data = self._call_service([url], normalize_embeddings=normalize_embeddings) + response_data = self._call_service( + [url], + normalize_embeddings=normalize_embeddings, + priority=priority, + ) if not response_data or len(response_data) != 1 or response_data[0] is None: raise RuntimeError(f"No image embedding returned for URL: {url}") vec = np.array(response_data[0], dtype=np.float32) @@ -95,6 +112,7 @@ class CLIPImageEncoder: images: List[Union[str, Image.Image]], batch_size: int = 8, normalize_embeddings: bool = True, + priority: int = 0, ) -> List[np.ndarray]: """ Encode a batch of images efficiently via network service. @@ -129,7 +147,11 @@ class CLIPImageEncoder: for i in range(0, len(pending_urls), batch_size): batch_urls = pending_urls[i : i + batch_size] - response_data = self._call_service(batch_urls, normalize_embeddings=normalize_embeddings) + response_data = self._call_service( + batch_urls, + normalize_embeddings=normalize_embeddings, + priority=priority, + ) if not response_data or len(response_data) != len(batch_urls): raise RuntimeError( f"Image embedding response length mismatch: expected {len(batch_urls)}, " @@ -153,6 +175,7 @@ class CLIPImageEncoder: urls: List[str], batch_size: Optional[int] = None, normalize_embeddings: bool = True, + priority: int = 0, ) -> List[np.ndarray]: """ 与 ClipImageModel / ClipAsServiceImageEncoder 一致的接口,供索引器 document_transformer 调用。 @@ -168,4 +191,5 @@ class CLIPImageEncoder: urls, batch_size=batch_size or 8, normalize_embeddings=normalize_embeddings, + priority=priority, ) diff --git a/embeddings/server.py b/embeddings/server.py index ee41def..8e80321 100644 --- a/embeddings/server.py +++ b/embeddings/server.py @@ -206,23 +206,24 @@ class _InflightLimiter: def __init__(self, name: str, limit: int): self.name = name self.limit = max(1, int(limit)) - self._sem = threading.BoundedSemaphore(self.limit) self._lock = threading.Lock() self._active = 0 self._rejected = 0 self._completed = 0 self._failed = 0 self._max_active = 0 + self._priority_bypass_total = 0 - def try_acquire(self) -> tuple[bool, int]: - if not self._sem.acquire(blocking=False): - with self._lock: + def try_acquire(self, *, bypass_limit: bool = False) -> tuple[bool, int]: + with self._lock: + if not bypass_limit and self._active >= self.limit: self._rejected += 1 active = self._active - return False, active - with self._lock: + return False, active self._active += 1 self._max_active = max(self._max_active, self._active) + if bypass_limit: + self._priority_bypass_total += 1 active = self._active return True, active @@ -234,7 +235,6 @@ class _InflightLimiter: else: self._failed += 1 active = self._active - self._sem.release() return active def snapshot(self) -> Dict[str, int]: @@ -246,9 +246,157 @@ class _InflightLimiter: "completed_total": self._completed, "failed_total": self._failed, "max_active": self._max_active, + "priority_bypass_total": self._priority_bypass_total, } +def _effective_priority(priority: int) -> int: + return 1 if int(priority) > 0 else 0 + + +def _priority_label(priority: int) -> str: + return "high" if _effective_priority(priority) > 0 else "normal" + + +@dataclass +class _TextDispatchTask: + normalized: List[str] + effective_normalize: bool + request_id: str + priority: int + created_at: float + done: threading.Event + result: Optional[_EmbedResult] = None + error: Optional[Exception] = None + + +_text_dispatch_high_queue: "deque[_TextDispatchTask]" = deque() +_text_dispatch_normal_queue: "deque[_TextDispatchTask]" = deque() +_text_dispatch_cv = threading.Condition() +_text_dispatch_workers: List[threading.Thread] = [] +_text_dispatch_worker_stop = False +_text_dispatch_worker_count = 0 + + +def _text_dispatch_queue_depth() -> Dict[str, int]: + with _text_dispatch_cv: + return { + "high": len(_text_dispatch_high_queue), + "normal": len(_text_dispatch_normal_queue), + "total": len(_text_dispatch_high_queue) + len(_text_dispatch_normal_queue), + } + + +def _pop_text_dispatch_task_locked() -> Optional["_TextDispatchTask"]: + if _text_dispatch_high_queue: + return _text_dispatch_high_queue.popleft() + if _text_dispatch_normal_queue: + return _text_dispatch_normal_queue.popleft() + return None + + +def _start_text_dispatch_workers() -> None: + global _text_dispatch_workers, _text_dispatch_worker_stop, _text_dispatch_worker_count + if _text_model is None: + return + target_worker_count = 1 if _text_backend_name == "local_st" else _TEXT_MAX_INFLIGHT + alive_workers = [worker for worker in _text_dispatch_workers if worker.is_alive()] + if len(alive_workers) == target_worker_count: + _text_dispatch_workers = alive_workers + _text_dispatch_worker_count = target_worker_count + return + _text_dispatch_worker_stop = False + _text_dispatch_worker_count = target_worker_count + _text_dispatch_workers = [] + for idx in range(target_worker_count): + worker = threading.Thread( + target=_text_dispatch_worker_loop, + args=(idx,), + name=f"embed-text-dispatch-{idx}", + daemon=True, + ) + worker.start() + _text_dispatch_workers.append(worker) + logger.info( + "Started text dispatch workers | backend=%s workers=%d", + _text_backend_name, + target_worker_count, + ) + + +def _stop_text_dispatch_workers() -> None: + global _text_dispatch_worker_stop + with _text_dispatch_cv: + _text_dispatch_worker_stop = True + _text_dispatch_cv.notify_all() + + +def _text_dispatch_worker_loop(worker_idx: int) -> None: + while True: + with _text_dispatch_cv: + while ( + not _text_dispatch_high_queue + and not _text_dispatch_normal_queue + and not _text_dispatch_worker_stop + ): + _text_dispatch_cv.wait() + if _text_dispatch_worker_stop: + return + task = _pop_text_dispatch_task_locked() + if task is None: + continue + try: + queue_wait_ms = (time.perf_counter() - task.created_at) * 1000.0 + logger.info( + "text dispatch start | worker=%d priority=%s inputs=%d queue_wait_ms=%.2f", + worker_idx, + _priority_label(task.priority), + len(task.normalized), + queue_wait_ms, + extra=_request_log_extra(task.request_id), + ) + task.result = _embed_text_impl( + task.normalized, + task.effective_normalize, + task.request_id, + task.priority, + ) + except Exception as exc: + task.error = exc + finally: + task.done.set() + + +def _submit_text_dispatch_and_wait( + normalized: List[str], + effective_normalize: bool, + request_id: str, + priority: int, +) -> _EmbedResult: + if not any(worker.is_alive() for worker in _text_dispatch_workers): + _start_text_dispatch_workers() + task = _TextDispatchTask( + normalized=normalized, + effective_normalize=effective_normalize, + request_id=request_id, + priority=_effective_priority(priority), + created_at=time.perf_counter(), + done=threading.Event(), + ) + with _text_dispatch_cv: + if task.priority > 0: + _text_dispatch_high_queue.append(task) + else: + _text_dispatch_normal_queue.append(task) + _text_dispatch_cv.notify() + task.done.wait() + if task.error is not None: + raise task.error + if task.result is None: + raise RuntimeError("Text dispatch worker returned empty result") + return task.result + + _text_request_limiter = _InflightLimiter(name="text", limit=_TEXT_MAX_INFLIGHT) _image_request_limiter = _InflightLimiter(name="image", limit=_IMAGE_MAX_INFLIGHT) _text_stats = _EndpointStats(name="text") @@ -261,6 +409,7 @@ _image_cache = RedisEmbeddingCache(key_prefix=_CACHE_PREFIX, namespace="image") class _SingleTextTask: text: str normalize: bool + priority: int created_at: float request_id: str done: threading.Event @@ -268,12 +417,30 @@ class _SingleTextTask: error: Optional[Exception] = None -_text_single_queue: "deque[_SingleTextTask]" = deque() +_text_single_high_queue: "deque[_SingleTextTask]" = deque() +_text_single_normal_queue: "deque[_SingleTextTask]" = deque() _text_single_queue_cv = threading.Condition() _text_batch_worker: Optional[threading.Thread] = None _text_batch_worker_stop = False +def _text_microbatch_queue_depth() -> Dict[str, int]: + with _text_single_queue_cv: + return { + "high": len(_text_single_high_queue), + "normal": len(_text_single_normal_queue), + "total": len(_text_single_high_queue) + len(_text_single_normal_queue), + } + + +def _pop_single_text_task_locked() -> Optional["_SingleTextTask"]: + if _text_single_high_queue: + return _text_single_high_queue.popleft() + if _text_single_normal_queue: + return _text_single_normal_queue.popleft() + return None + + def _compact_preview(text: str, max_chars: int) -> str: compact = " ".join((text or "").split()) if len(compact) <= max_chars: @@ -356,30 +523,41 @@ def _text_batch_worker_loop() -> None: max_batch = max(1, int(CONFIG.TEXT_BATCH_SIZE)) while True: with _text_single_queue_cv: - while not _text_single_queue and not _text_batch_worker_stop: + while ( + not _text_single_high_queue + and not _text_single_normal_queue + and not _text_batch_worker_stop + ): _text_single_queue_cv.wait() if _text_batch_worker_stop: return - batch: List[_SingleTextTask] = [_text_single_queue.popleft()] + first_task = _pop_single_text_task_locked() + if first_task is None: + continue + batch: List[_SingleTextTask] = [first_task] deadline = time.perf_counter() + _TEXT_MICROBATCH_WINDOW_SEC while len(batch) < max_batch: remaining = deadline - time.perf_counter() if remaining <= 0: break - if not _text_single_queue: + if not _text_single_high_queue and not _text_single_normal_queue: _text_single_queue_cv.wait(timeout=remaining) continue - while _text_single_queue and len(batch) < max_batch: - batch.append(_text_single_queue.popleft()) + while len(batch) < max_batch: + next_task = _pop_single_text_task_locked() + if next_task is None: + break + batch.append(next_task) try: queue_wait_ms = [(time.perf_counter() - task.created_at) * 1000.0 for task in batch] reqids = [task.request_id for task in batch] logger.info( - "text microbatch dispatch | size=%d queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s preview=%s", + "text microbatch dispatch | size=%d priority=%s queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s preview=%s", len(batch), + _priority_label(max(task.priority for task in batch)), min(queue_wait_ms) if queue_wait_ms else 0.0, max(queue_wait_ms) if queue_wait_ms else 0.0, reqids, @@ -423,22 +601,32 @@ def _text_batch_worker_loop() -> None: task.done.set() -def _encode_single_text_with_microbatch(text: str, normalize: bool, request_id: str) -> List[float]: +def _encode_single_text_with_microbatch( + text: str, + normalize: bool, + request_id: str, + priority: int, +) -> List[float]: task = _SingleTextTask( text=text, normalize=normalize, + priority=_effective_priority(priority), created_at=time.perf_counter(), request_id=request_id, done=threading.Event(), ) with _text_single_queue_cv: - _text_single_queue.append(task) + if task.priority > 0: + _text_single_high_queue.append(task) + else: + _text_single_normal_queue.append(task) _text_single_queue_cv.notify() if not task.done.wait(timeout=_TEXT_REQUEST_TIMEOUT_SEC): with _text_single_queue_cv: + queue = _text_single_high_queue if task.priority > 0 else _text_single_normal_queue try: - _text_single_queue.remove(task) + queue.remove(task) except ValueError: pass raise RuntimeError( @@ -489,6 +677,7 @@ def load_models(): f"Unsupported embedding backend: {backend_name}. " "Supported: tei, local_st" ) + _start_text_dispatch_workers() logger.info("Text backend loaded successfully: %s", _text_backend_name) except Exception as e: logger.error("Failed to load text model: %s", e, exc_info=True) @@ -532,6 +721,7 @@ def load_models(): @app.on_event("shutdown") def stop_workers() -> None: _stop_text_batch_worker() + _stop_text_dispatch_workers() def _normalize_vector(vec: np.ndarray) -> np.ndarray: @@ -602,6 +792,8 @@ def _try_full_image_cache_hit( def health() -> Dict[str, Any]: """Health check endpoint. Returns status and current throttling stats.""" ready = (not open_text_model or _text_model is not None) and (not open_image_model or _image_model is not None) + text_dispatch_depth = _text_dispatch_queue_depth() + text_microbatch_depth = _text_microbatch_queue_depth() return { "status": "ok" if ready else "degraded", "service_kind": _SERVICE_KIND, @@ -620,9 +812,18 @@ def health() -> Dict[str, Any]: "text": _text_stats.snapshot(), "image": _image_stats.snapshot(), }, + "text_dispatch": { + "workers": _text_dispatch_worker_count, + "workers_alive": sum(1 for worker in _text_dispatch_workers if worker.is_alive()), + "queue_depth": text_dispatch_depth["total"], + "queue_depth_high": text_dispatch_depth["high"], + "queue_depth_normal": text_dispatch_depth["normal"], + }, "text_microbatch": { "window_ms": round(_TEXT_MICROBATCH_WINDOW_SEC * 1000.0, 3), - "queue_depth": len(_text_single_queue), + "queue_depth": text_microbatch_depth["total"], + "queue_depth_high": text_microbatch_depth["high"], + "queue_depth_normal": text_microbatch_depth["normal"], "worker_alive": bool(_text_batch_worker is not None and _text_batch_worker.is_alive()), "request_timeout_sec": _TEXT_REQUEST_TIMEOUT_SEC, }, @@ -654,6 +855,7 @@ def _embed_text_impl( normalized: List[str], effective_normalize: bool, request_id: str, + priority: int = 0, ) -> _EmbedResult: if _text_model is None: raise RuntimeError("Text model not loaded") @@ -703,6 +905,7 @@ def _embed_text_impl( missing_texts[0], normalize=effective_normalize, request_id=request_id, + priority=priority, ) ] mode = "microbatch-single" @@ -777,6 +980,7 @@ async def embed_text( http_request: Request, response: Response, normalize: Optional[bool] = None, + priority: int = 0, ) -> List[Optional[List[float]]]: if _text_model is None: raise HTTPException(status_code=503, detail="Text embedding model not loaded in this service") @@ -784,6 +988,9 @@ async def embed_text( request_id = _resolve_request_id(http_request) response.headers["X-Request-ID"] = request_id + if priority < 0: + raise HTTPException(status_code=400, detail="priority must be >= 0") + effective_priority = _effective_priority(priority) effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) normalized: List[str] = [] for i, t in enumerate(texts): @@ -806,8 +1013,9 @@ async def embed_text( cache_misses=0, ) logger.info( - "embed_text response | backend=%s mode=cache-only inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", + "embed_text response | backend=%s mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", _text_backend_name, + _priority_label(effective_priority), len(normalized), effective_normalize, len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0, @@ -818,13 +1026,14 @@ async def embed_text( ) return cache_only.vectors - accepted, active = _text_request_limiter.try_acquire() + accepted, active = _text_request_limiter.try_acquire(bypass_limit=effective_priority > 0) if not accepted: _text_stats.record_rejected() logger.warning( - "embed_text rejected | client=%s backend=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + "embed_text rejected | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", _request_client(http_request), _text_backend_name, + _priority_label(effective_priority), len(normalized), effective_normalize, active, @@ -834,7 +1043,10 @@ async def embed_text( ) raise HTTPException( status_code=_OVERLOAD_STATUS_CODE, - detail=f"Text embedding service busy: active={active}, limit={_TEXT_MAX_INFLIGHT}", + detail=( + "Text embedding service busy for priority=0 requests: " + f"active={active}, limit={_TEXT_MAX_INFLIGHT}" + ), ) request_started = time.perf_counter() @@ -844,9 +1056,10 @@ async def embed_text( cache_misses = 0 try: logger.info( - "embed_text request | client=%s backend=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + "embed_text request | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", _request_client(http_request), _text_backend_name, + _priority_label(effective_priority), len(normalized), effective_normalize, active, @@ -855,13 +1068,20 @@ async def embed_text( extra=_request_log_extra(request_id), ) verbose_logger.info( - "embed_text detail | payload=%s normalize=%s backend=%s", + "embed_text detail | payload=%s normalize=%s backend=%s priority=%s", normalized, effective_normalize, _text_backend_name, + _priority_label(effective_priority), extra=_request_log_extra(request_id), ) - result = await run_in_threadpool(_embed_text_impl, normalized, effective_normalize, request_id) + result = await run_in_threadpool( + _submit_text_dispatch_and_wait, + normalized, + effective_normalize, + request_id, + effective_priority, + ) success = True backend_elapsed_ms = result.backend_elapsed_ms cache_hits = result.cache_hits @@ -875,9 +1095,10 @@ async def embed_text( cache_misses=cache_misses, ) logger.info( - "embed_text response | backend=%s mode=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f", + "embed_text response | backend=%s mode=%s priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f", _text_backend_name, result.mode, + _priority_label(effective_priority), len(normalized), effective_normalize, len(result.vectors[0]) if result.vectors and result.vectors[0] is not None else 0, @@ -888,8 +1109,9 @@ async def embed_text( extra=_request_log_extra(request_id), ) verbose_logger.info( - "embed_text result detail | count=%d first_vector=%s latency_ms=%.2f", + "embed_text result detail | count=%d priority=%s first_vector=%s latency_ms=%.2f", len(result.vectors), + _priority_label(effective_priority), result.vectors[0][: _VECTOR_PREVIEW_DIMS] if result.vectors and result.vectors[0] is not None else [], @@ -909,8 +1131,9 @@ async def embed_text( cache_misses=cache_misses, ) logger.error( - "embed_text failed | backend=%s inputs=%d normalize=%s latency_ms=%.2f error=%s", + "embed_text failed | backend=%s priority=%s inputs=%d normalize=%s latency_ms=%.2f error=%s", _text_backend_name, + _priority_label(effective_priority), len(normalized), effective_normalize, latency_ms, @@ -922,8 +1145,9 @@ async def embed_text( finally: remaining = _text_request_limiter.release(success=success) logger.info( - "embed_text finalize | success=%s active_after=%d", + "embed_text finalize | success=%s priority=%s active_after=%d", success, + _priority_label(effective_priority), remaining, extra=_request_log_extra(request_id), ) @@ -1019,6 +1243,7 @@ async def embed_image( http_request: Request, response: Response, normalize: Optional[bool] = None, + priority: int = 0, ) -> List[Optional[List[float]]]: if _image_model is None: raise HTTPException(status_code=503, detail="Image embedding model not loaded in this service") @@ -1026,6 +1251,10 @@ async def embed_image( request_id = _resolve_request_id(http_request) response.headers["X-Request-ID"] = request_id + if priority < 0: + raise HTTPException(status_code=400, detail="priority must be >= 0") + effective_priority = _effective_priority(priority) + effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) urls: List[str] = [] for i, url_or_path in enumerate(images): @@ -1048,7 +1277,8 @@ async def embed_image( cache_misses=0, ) logger.info( - "embed_image response | mode=cache-only inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", + "embed_image response | mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", + _priority_label(effective_priority), len(urls), effective_normalize, len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0, @@ -1059,12 +1289,13 @@ async def embed_image( ) return cache_only.vectors - accepted, active = _image_request_limiter.try_acquire() + accepted, active = _image_request_limiter.try_acquire(bypass_limit=effective_priority > 0) if not accepted: _image_stats.record_rejected() logger.warning( - "embed_image rejected | client=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + "embed_image rejected | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", _request_client(http_request), + _priority_label(effective_priority), len(urls), effective_normalize, active, @@ -1074,7 +1305,10 @@ async def embed_image( ) raise HTTPException( status_code=_OVERLOAD_STATUS_CODE, - detail=f"Image embedding service busy: active={active}, limit={_IMAGE_MAX_INFLIGHT}", + detail=( + "Image embedding service busy for priority=0 requests: " + f"active={active}, limit={_IMAGE_MAX_INFLIGHT}" + ), ) request_started = time.perf_counter() @@ -1084,8 +1318,9 @@ async def embed_image( cache_misses = 0 try: logger.info( - "embed_image request | client=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + "embed_image request | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", _request_client(http_request), + _priority_label(effective_priority), len(urls), effective_normalize, active, @@ -1094,9 +1329,10 @@ async def embed_image( extra=_request_log_extra(request_id), ) verbose_logger.info( - "embed_image detail | payload=%s normalize=%s", + "embed_image detail | payload=%s normalize=%s priority=%s", urls, effective_normalize, + _priority_label(effective_priority), extra=_request_log_extra(request_id), ) result = await run_in_threadpool(_embed_image_impl, urls, effective_normalize, request_id) @@ -1113,8 +1349,9 @@ async def embed_image( cache_misses=cache_misses, ) logger.info( - "embed_image response | mode=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f", + "embed_image response | mode=%s priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f", result.mode, + _priority_label(effective_priority), len(urls), effective_normalize, len(result.vectors[0]) if result.vectors and result.vectors[0] is not None else 0, @@ -1146,7 +1383,8 @@ async def embed_image( cache_misses=cache_misses, ) logger.error( - "embed_image failed | inputs=%d normalize=%s latency_ms=%.2f error=%s", + "embed_image failed | priority=%s inputs=%d normalize=%s latency_ms=%.2f error=%s", + _priority_label(effective_priority), len(urls), effective_normalize, latency_ms, @@ -1158,8 +1396,9 @@ async def embed_image( finally: remaining = _image_request_limiter.release(success=success) logger.info( - "embed_image finalize | success=%s active_after=%d", + "embed_image finalize | success=%s priority=%s active_after=%d", success, + _priority_label(effective_priority), remaining, extra=_request_log_extra(request_id), ) diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py index 048f7fa..6202931 100644 --- a/embeddings/text_encoder.py +++ b/embeddings/text_encoder.py @@ -35,7 +35,12 @@ class TextEmbeddingEncoder: expire_time=self.expire_time, ) - def _call_service(self, request_data: List[str], normalize_embeddings: bool = True) -> List[Any]: + def _call_service( + self, + request_data: List[str], + normalize_embeddings: bool = True, + priority: int = 0, + ) -> List[Any]: """ Call the embedding service API. @@ -48,7 +53,10 @@ class TextEmbeddingEncoder: try: response = requests.post( self.endpoint, - params={"normalize": "true" if normalize_embeddings else "false"}, + params={ + "normalize": "true" if normalize_embeddings else "false", + "priority": max(0, int(priority)), + }, json=request_data, timeout=60 ) @@ -62,6 +70,7 @@ class TextEmbeddingEncoder: self, sentences: Union[str, List[str]], normalize_embeddings: bool = True, + priority: int = 0, device: str = 'cpu', batch_size: int = 32 ) -> np.ndarray: @@ -100,7 +109,11 @@ class TextEmbeddingEncoder: # If there are uncached texts, call service if uncached_texts: - response_data = self._call_service(request_data, normalize_embeddings=normalize_embeddings) + response_data = self._call_service( + request_data, + normalize_embeddings=normalize_embeddings, + priority=priority, + ) # Process response for i, text in enumerate(uncached_texts): diff --git a/perf_reports/README.md b/perf_reports/README.md new file mode 100644 index 0000000..c918ef4 --- /dev/null +++ b/perf_reports/README.md @@ -0,0 +1,34 @@ +# 性能测试报告索引 + +本目录存放各次压测/矩阵的原始 JSON 与说明。**推荐复用**仓库脚本,避免重复造轮子: + +| 脚本 | 用途 | +|------|------| +| `scripts/perf_api_benchmark.py` | 搜索后端、向量、翻译、重排等 HTTP 接口压测;支持 `--embed-text-priority` / `--embed-image-priority` 与 `scripts/perf_cases.json.example` | + +历史矩阵示例(并发扫描): + +- `2026-03-12/matrix_report/summary.md` — 与 `summary.json` 同目录 + +## 2026-03-20 — 向量服务 `priority` 参数烟测 + +环境:本机 `127.0.0.1:6005`(文本)、`127.0.0.1:6008`(图片),命令与结果见同目录 JSON: + +| 报告文件 | 场景 | 说明 | +|----------|------|------| +| `2026-03-20_embed_text_p0.json` | `embed_text` | `priority=0`(默认),8s,并发 10 | +| `2026-03-20_embed_text_p1.json` | `embed_text` | `--embed-text-priority 1`,8s,并发 10 | +| `2026-03-20_embed_image_p0.json` | `embed_image` | `priority=0`,8s,并发 5 | +| `2026-03-20_embed_image_p1.json` | `embed_image` | `--embed-image-priority 1`,8s,并发 5 | + +复现示例: + +```bash +source activate.sh +python scripts/perf_api_benchmark.py --scenario embed_text --duration 8 --concurrency 10 --timeout 30 --output perf_reports/2026-03-20_embed_text_p0.json +python scripts/perf_api_benchmark.py --scenario embed_text --duration 8 --concurrency 10 --embed-text-priority 1 --output perf_reports/2026-03-20_embed_text_p1.json +python scripts/perf_api_benchmark.py --scenario embed_image --duration 8 --concurrency 5 --timeout 60 --output perf_reports/2026-03-20_embed_image_p0.json +python scripts/perf_api_benchmark.py --scenario embed_image --duration 8 --concurrency 5 --embed-image-priority 1 --output perf_reports/2026-03-20_embed_image_p1.json +``` + +说明:本次为 **8 秒 smoke**,与 `2026-03-12` 矩阵的时长/并发不可直接横向对比;仅验证 `priority` 参数下服务仍返回 200 且 payload 校验通过。 diff --git a/query/query_parser.py b/query/query_parser.py index a311d00..b86f60c 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -442,7 +442,7 @@ class QueryParser: # Submit encoding task to thread pool for async execution encoding_executor = ThreadPoolExecutor(max_workers=1) def _encode_query_vector() -> Optional[np.ndarray]: - arr = self.text_encoder.encode([query_text]) + arr = self.text_encoder.encode([query_text], priority=1) if arr is None or len(arr) == 0: return None vec = arr[0] diff --git a/scripts/perf_api_benchmark.py b/scripts/perf_api_benchmark.py index 23acfbe..f334734 100755 --- a/scripts/perf_api_benchmark.py +++ b/scripts/perf_api_benchmark.py @@ -15,6 +15,9 @@ Examples: python scripts/perf_api_benchmark.py --scenario backend_suggest --duration 30 --concurrency 50 --tenant-id 162 python scripts/perf_api_benchmark.py --scenario all --duration 60 --concurrency 80 --tenant-id 162 python scripts/perf_api_benchmark.py --scenario all --cases-file scripts/perf_cases.json.example --output perf_result.json + # Embedding admission / priority (query param `priority`; same semantics as embedding service): + python scripts/perf_api_benchmark.py --scenario embed_text --embed-text-priority 1 --duration 30 --concurrency 20 + python scripts/perf_api_benchmark.py --scenario embed_image --embed-image-priority 1 --duration 30 --concurrency 10 """ from __future__ import annotations @@ -72,9 +75,9 @@ def validate_response_payload( ) -> Tuple[bool, str]: """ Lightweight payload validation for correctness-aware perf tests. - Currently strict for embed_text to catch NaN/null vector regressions. + Strict for embed_text / embed_image to catch NaN/null vector regressions. """ - if scenario_name != "embed_text": + if scenario_name not in ("embed_text", "embed_image"): return True, "" expected_len = len(tpl.json_body) if isinstance(tpl.json_body, list) else None @@ -219,6 +222,43 @@ def load_cases_from_file(path: Path, tenant_id: str) -> Dict[str, List[RequestTe return out +def apply_embed_priority_params( + scenarios: Dict[str, Scenario], + embed_text_priority: int, + embed_image_priority: int, +) -> None: + """ + Merge default `priority` query param into embed templates when absent. + `scripts/perf_cases.json` may set per-request `params.priority` to override. + """ + mapping = { + "embed_text": max(0, int(embed_text_priority)), + "embed_image": max(0, int(embed_image_priority)), + } + for name, pri in mapping.items(): + if name not in scenarios: + continue + scen = scenarios[name] + new_templates: List[RequestTemplate] = [] + for t in scen.templates: + params = dict(t.params or {}) + params.setdefault("priority", str(pri)) + new_templates.append( + RequestTemplate( + method=t.method, + path=t.path, + params=params, + json_body=t.json_body, + headers=t.headers, + ) + ) + scenarios[name] = Scenario( + name=scen.name, + templates=new_templates, + timeout_sec=scen.timeout_sec, + ) + + def build_scenarios(args: argparse.Namespace) -> Dict[str, Scenario]: defaults = make_default_templates(args.tenant_id) if args.cases_file: @@ -252,6 +292,11 @@ def build_scenarios(args: argparse.Namespace) -> Dict[str, Scenario]: ) ) scenarios[name] = Scenario(name=name, templates=rewritten, timeout_sec=args.timeout) + apply_embed_priority_params( + scenarios, + embed_text_priority=args.embed_text_priority, + embed_image_priority=args.embed_image_priority, + ) return scenarios @@ -483,6 +528,18 @@ def parse_args() -> argparse.Namespace: default=0, help="Optional top_n for rerank requests in dynamic docs mode (0 means omit top_n).", ) + parser.add_argument( + "--embed-text-priority", + type=int, + default=0, + help="Default query param priority= for embed_text (0=offline admission; >0 bypasses rejection). Merged into params unless set in --cases-file.", + ) + parser.add_argument( + "--embed-image-priority", + type=int, + default=0, + help="Default query param priority= for embed_image (same semantics as embed-text-priority).", + ) return parser.parse_args() @@ -609,6 +666,8 @@ async def main_async() -> int: print(f" embedding_image_base={args.embedding_image_base}") print(f" translator_base={args.translator_base}") print(f" reranker_base={args.reranker_base}") + print(f" embed_text_priority={args.embed_text_priority}") + print(f" embed_image_priority={args.embed_image_priority}") if args.rerank_dynamic_docs: print(" rerank_dynamic_docs=True") print(f" rerank_doc_count={args.rerank_doc_count}") @@ -667,6 +726,8 @@ async def main_async() -> int: "rerank_query": args.rerank_query, "rerank_seed": args.rerank_seed, "rerank_top_n": args.rerank_top_n, + "embed_text_priority": args.embed_text_priority, + "embed_image_priority": args.embed_image_priority, }, "results": results, "overall": aggregate_results(results), diff --git a/scripts/perf_cases.json.example b/scripts/perf_cases.json.example index df4a5be..0291dcb 100644 --- a/scripts/perf_cases.json.example +++ b/scripts/perf_cases.json.example @@ -32,9 +32,18 @@ { "method": "POST", "path": "/embed/text", + "params": {"priority": "0"}, "json": ["wireless mouse", "gaming keyboard", "USB-C cable", "barbie doll"] } ], + "embed_image": [ + { + "method": "POST", + "path": "/embed/image", + "params": {"normalize": "true", "priority": "0"}, + "json": ["/data/saas-search/docs/image-dress1.png"] + } + ], "translate": [ { "method": "POST", diff --git a/search/searcher.py b/search/searcher.py index 6a29fbb..231ffeb 100644 --- a/search/searcher.py +++ b/search/searcher.py @@ -791,7 +791,7 @@ class Searcher: # Generate image embedding if self.image_encoder is None: raise RuntimeError("Image encoder is not initialized at startup") - image_vector = self.image_encoder.encode_image_from_url(image_url) + image_vector = self.image_encoder.encode_image_from_url(image_url, priority=1) if image_vector is None: raise ValueError(f"Failed to encode image: {image_url}") diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index 7428e7b..73f0159 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -540,7 +540,15 @@ def test_indexer_index_validation_max_delete_spu_ids(indexer_client: TestClient) class _FakeTextModel: - def encode_batch(self, texts, batch_size=32, device="cpu", normalize_embeddings=True): + """Matches TEI / server path: `_text_model.encode(...)` (not encode_batch).""" + + def encode( + self, + texts, + batch_size=32, + device="cpu", + normalize_embeddings=True, + ): return [np.array([0.1, 0.2, 0.3], dtype=np.float32) for _ in texts] @@ -549,6 +557,18 @@ class _FakeImageModel: return [np.array([0.3, 0.2, 0.1], dtype=np.float32) for _ in urls] +class _EmbeddingCacheMiss: + """Avoid Redis/module cache hits so contract tests exercise the encode path.""" + + redis_client = None + + def get(self, key): + return None + + def set(self, key, value): + return True + + @pytest.fixture def embedding_module(): import embeddings.server as emb_server @@ -556,17 +576,31 @@ def embedding_module(): emb_server.app.router.on_startup.clear() emb_server._text_model = _FakeTextModel() emb_server._image_model = _FakeImageModel() + emb_server._text_backend_name = "tei" + emb_server._text_cache = _EmbeddingCacheMiss() + emb_server._image_cache = _EmbeddingCacheMiss() yield emb_server def test_embedding_text_contract(embedding_module): - data = embedding_module.embed_text(["hello", "world"]) + """Contract via HTTP like production; route handlers require Request/Response.""" + from fastapi.testclient import TestClient + + with TestClient(embedding_module.app) as client: + resp = client.post("/embed/text", json=["hello", "world"]) + assert resp.status_code == 200 + data = resp.json() assert len(data) == 2 assert len(data[0]) == 3 def test_embedding_image_contract(embedding_module): - data = embedding_module.embed_image(["https://example.com/a.jpg"]) + from fastapi.testclient import TestClient + + with TestClient(embedding_module.app) as client: + resp = client.post("/embed/image", json=["https://example.com/a.jpg"]) + assert resp.status_code == 200 + data = resp.json() assert len(data[0]) == 3 diff --git a/tests/test_embedding_pipeline.py b/tests/test_embedding_pipeline.py index 3470fad..704910a 100644 --- a/tests/test_embedding_pipeline.py +++ b/tests/test_embedding_pipeline.py @@ -63,7 +63,11 @@ class _FakeTranslator: class _FakeQueryEncoder: + def __init__(self): + self.calls = [] + def encode(self, sentences, **kwargs): + self.calls.append({"sentences": sentences, "kwargs": dict(kwargs)}) if isinstance(sentences, str): sentences = [sentences] return np.array([np.array([0.11, 0.22, 0.33], dtype=np.float32) for _ in sentences], dtype=object) @@ -98,9 +102,7 @@ def _build_test_config() -> SearchConfig: rerank=RerankConfig(), spu_config=SPUConfig(enabled=True, spu_field="spu_id", inner_hits_size=3), es_index_name="test_products", - tenant_config={}, es_settings={}, - services={}, ) @@ -111,6 +113,7 @@ def test_text_embedding_encoder_response_alignment(monkeypatch): def _fake_post(url, json, timeout, **kwargs): assert url.endswith("/embed/text") assert json == ["hello", "world"] + assert kwargs["params"]["priority"] == 0 return _FakeResponse([[0.1, 0.2], [0.3, 0.4]]) monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post) @@ -172,6 +175,7 @@ def test_image_embedding_encoder_cache_hit(monkeypatch): def _fake_post(url, params, json, timeout, **kwargs): calls["count"] += 1 + assert params["priority"] == 0 return _FakeResponse([[0.1, 0.2]]) monkeypatch.setattr("embeddings.image_encoder.requests.post", _fake_post) @@ -184,16 +188,35 @@ def test_image_embedding_encoder_cache_hit(monkeypatch): assert np.allclose(out[1], np.array([0.1, 0.2], dtype=np.float32)) +def test_image_embedding_encoder_passes_priority(monkeypatch): + fake_cache = _FakeEmbeddingCache() + monkeypatch.setattr("embeddings.image_encoder.RedisEmbeddingCache", lambda **kwargs: fake_cache) + + def _fake_post(url, params, json, timeout, **kwargs): + assert params["priority"] == 1 + return _FakeResponse([[0.1, 0.2]]) + + monkeypatch.setattr("embeddings.image_encoder.requests.post", _fake_post) + + encoder = CLIPImageEncoder(service_url="http://127.0.0.1:6008") + out = encoder.encode_batch(["https://example.com/a.jpg"], priority=1) + assert len(out) == 1 + assert np.allclose(out[0], np.array([0.1, 0.2], dtype=np.float32)) + + def test_query_parser_generates_query_vector_with_encoder(): + encoder = _FakeQueryEncoder() parser = QueryParser( config=_build_test_config(), - text_encoder=_FakeQueryEncoder(), + text_encoder=encoder, translator=_FakeTranslator(), ) parsed = parser.parse("red dress", tenant_id="162", generate_vector=True) assert parsed.query_vector is not None assert parsed.query_vector.shape == (3,) + assert encoder.calls + assert encoder.calls[0]["kwargs"]["priority"] == 1 def test_query_parser_skips_query_vector_when_disabled(): diff --git a/tests/test_embedding_service_limits.py b/tests/test_embedding_service_limits.py index 7d14ab7..4b744cc 100644 --- a/tests/test_embedding_service_limits.py +++ b/tests/test_embedding_service_limits.py @@ -69,6 +69,8 @@ def test_health_exposes_limit_stats(monkeypatch): def test_embed_image_rejects_when_image_lane_is_full(monkeypatch): + # Ensure no cache hit (module-level Redis cache may contain this URL from other tests). + monkeypatch.setattr(embedding_server, "_image_cache", _FakeCache({})) limiter = embedding_server._InflightLimiter("image", 1) acquired, _ = limiter.try_acquire() assert acquired is True diff --git a/tests/test_embedding_service_priority.py b/tests/test_embedding_service_priority.py new file mode 100644 index 0000000..35cfd0d --- /dev/null +++ b/tests/test_embedding_service_priority.py @@ -0,0 +1,81 @@ +import threading + +import embeddings.server as emb_server + + +def test_text_inflight_limiter_priority_bypass(): + limiter = emb_server._InflightLimiter(name="text", limit=1) + + accepted, active = limiter.try_acquire() + assert accepted is True + assert active == 1 + + accepted, active = limiter.try_acquire() + assert accepted is False + assert active == 1 + + accepted, active = limiter.try_acquire(bypass_limit=True) + assert accepted is True + assert active == 2 + + snapshot = limiter.snapshot() + assert snapshot["priority_bypass_total"] == 1 + + limiter.release(success=True) + limiter.release(success=True) + + +def test_text_dispatch_prefers_high_priority_queue(): + high_task = emb_server._TextDispatchTask( + normalized=["online"], + effective_normalize=True, + request_id="high", + priority=1, + created_at=0.0, + done=threading.Event(), + ) + normal_task = emb_server._TextDispatchTask( + normalized=["offline"], + effective_normalize=True, + request_id="normal", + priority=0, + created_at=0.0, + done=threading.Event(), + ) + + with emb_server._text_dispatch_cv: + emb_server._text_dispatch_high_queue.clear() + emb_server._text_dispatch_normal_queue.clear() + emb_server._text_dispatch_normal_queue.append(normal_task) + emb_server._text_dispatch_high_queue.append(high_task) + + first = emb_server._pop_text_dispatch_task_locked() + second = emb_server._pop_text_dispatch_task_locked() + + emb_server._text_dispatch_high_queue.clear() + emb_server._text_dispatch_normal_queue.clear() + + assert first is high_task + assert second is normal_task + + +def test_image_inflight_limiter_priority_bypass(): + limiter = emb_server._InflightLimiter(name="image", limit=1) + + accepted, active = limiter.try_acquire() + assert accepted is True + assert active == 1 + + accepted, active = limiter.try_acquire() + assert accepted is False + assert active == 1 + + accepted, active = limiter.try_acquire(bypass_limit=True) + assert accepted is True + assert active == 2 + + snapshot = limiter.snapshot() + assert snapshot["priority_bypass_total"] == 1 + + limiter.release(success=True) + limiter.release(success=True) -- libgit2 0.21.2