Commit b754fd41470f2a1dab70339d336383de0cd8af1c

Authored by tangwang
1 parent 16204531

Image vectorization: support a priority parameter

config/config.yaml
... ... @@ -110,7 +110,7 @@ rerank:
110 110 services:
111 111 translation:
112 112 service_url: "http://127.0.0.1:6006"
113   - default_model: "llm"
  113 + default_model: "nllb-200-distilled-600m"
114 114 default_scene: "general"
115 115 timeout_sec: 10.0
116 116 cache:
... ...
docs/TODO.txt
1 1  
2 2  
3 3  
4   -First, read the code related to image and text embeddings:
5   -@embeddings/README.md @embeddings/server.py @docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md @embeddings/image_encoder.py @embeddings/text_encoder.py
  4 +First, read the code related to text embeddings:
  5 +@embeddings/README.md @embeddings/server.py @docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md @embeddings/text_encoder.py
6 6 There are currently TEXT_MAX_INFLIGHT / IMAGE_MAX_INFLIGHT admission limits; requests over the limit get an overload status code.
7 7  
8   -The embedding service (both image and text embeddings) must support a priority query parameter. priority > 0: not subject to the inflight admission above, never rejected by it;
  8 +The text embedding service must support a priority query parameter. priority > 0: not subject to the inflight admission above, never rejected by it (the image embedding service does not need this, since only offline jobs use image embeddings)
9 9 priority == 0 (default, suited to offline jobs such as indexing): still goes through the existing TEXT_MAX_INFLIGHT / IMAGE_MAX_INFLIGHT admission; over the limit, an overload status code is returned.
10 10 priority > 0 (or == 1) (suited to online requests): never rejected by admission, but still occupies an inflight slot; this guarantees online requests are not throttled, and when online traffic is heavy, offline requests can be rejected.
11 11  
... ... @@ -16,7 +16,6 @@ priority > 0 (or == 1) (suited to online requests): never rejected by admission,
16 16  
17 17  
18 18  
19   -
20 19 Refactoring of the configuration system.
21 20  
22 21 Referring to @docs/config-system-review-and-redesign.md, most of the modifications have been completed. Could you review the configuration documentation system to check what still needs improvement? Are there any outstanding issues?
... ...
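The admission rule spelled out in this TODO can be sketched as a small counter-based limiter, mirroring the semantics of `_InflightLimiter.try_acquire` in the `embeddings/server.py` diff below. The class here is a simplified, illustrative stand-in, not the repo's code:

```python
import threading


class InflightLimiter:
    """Sketch of the priority admission rule: priority == 0 requests are
    rejected once `limit` requests are active; priority > 0 requests are
    never rejected but still count toward the active total."""

    def __init__(self, limit: int):
        self.limit = max(1, int(limit))
        self._lock = threading.Lock()
        self.active = 0

    def try_acquire(self, priority: int = 0) -> bool:
        with self._lock:
            if priority <= 0 and self.active >= self.limit:
                return False  # overload: offline (priority=0) traffic is shed first
            self.active += 1  # high priority bypasses the check but is still counted
            return True

    def release(self) -> None:
        with self._lock:
            self.active -= 1
```

Because priority > 0 still increments `active`, heavy online traffic automatically starves out new priority=0 requests, which is exactly the behavior the TODO asks for.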
docs/搜索API对接指南-07-微服务接口(Embedding-Reranker-Translation).md
... ... @@ -38,6 +38,10 @@
38 38 - `TEXT_MAX_INFLIGHT`
39 39 - `IMAGE_MAX_INFLIGHT`
40 40 - When processing capacity is exceeded, the service returns an overload error immediately instead of queueing without bound.
  41 +- Both the text and image services accept a `priority` query parameter (the image service does no queue jumping; only its admission rule matches the text service's):
  42 +  - `priority=0` (default): suited to offline indexing; still subject to `TEXT_MAX_INFLIGHT` / `IMAGE_MAX_INFLIGHT` admission control respectively.
  43 +  - `priority>0` (online requests should use `1`): never rejected by admission control, but still occupies the corresponding text/image inflight slot.
  44 +  - The text service processes high-priority text requests first; the image service does no queue jumping and simply handles requests in arrival order.
41 45 - `GET /health` returns each service's `limits`, `stats`, `cache_enabled`, and related state; `GET /ready` is the readiness probe.
42 46  
43 47 #### 7.1.1 `POST /embed/text` — text vectorization
... ... @@ -59,11 +63,15 @@
59 63 **Full curl example**:
60 64  
61 65 ```bash
62   -curl -X POST "http://localhost:6005/embed/text?normalize=true" \
  66 +curl -X POST "http://localhost:6005/embed/text?normalize=true&priority=1" \
63 67 -H "Content-Type: application/json" \
64 68 -d '["芭比娃娃 儿童玩具", "纯棉T恤 短袖"]'
65 69 ```
66 70  
  71 +Notes:
  72 +- Online queries / real-time requests: pass `priority=1` explicitly
  73 +- Offline indexing / bulk backfill: keep the default `priority=0`
  74 +
67 75 #### 7.1.2 `POST /embed/image` — image vectorization
68 76  
69 77 Converts image URLs or paths into vectors, used for image-to-image search.
... ... @@ -85,11 +93,13 @@ curl -X POST "http://localhost:6005/embed/text?normalize=true" \
85 93 **Full curl example**:
86 94  
87 95 ```bash
88   -curl -X POST "http://localhost:6008/embed/image?normalize=true" \
  96 +curl -X POST "http://localhost:6008/embed/image?normalize=true&priority=1" \
89 97 -H "Content-Type: application/json" \
90 98 -d '["https://oss.essa.cn/98532128-cf8e-456c-9e30-6f2a5ea0c19f.jpg"]'
91 99 ```
92 100  
  101 +Real-time scenarios such as online image-to-image search can pass `priority=1`; offline index backfill keeps the default `priority=0`.
  102 +
93 103 #### 7.1.3 `GET /health` — health check
94 104  
95 105 ```bash
... ... @@ -118,6 +128,8 @@ curl "http://localhost:6008/ready"
118 128 - The cache key already distinguishes `normalize=true/false`, so different normalization strategies never hit the same cache entry.
119 129 - When the server detects a **full-cache-hit** request, it returns directly without occupying a model concurrency slot.
120 130 - When the server detects that `TEXT_MAX_INFLIGHT` / `IMAGE_MAX_INFLIGHT` is exceeded, it rejects directly instead of queueing without bound.
  131 +- For `POST /embed/text`, `priority=0` is rejected directly under the inflight rules above; `priority>0` is never rejected by admission but still counts toward inflight, and is served ahead of `priority=0` requests when the server queues.
  132 +- For `POST /embed/image`, `priority=0` is bound by `IMAGE_MAX_INFLIGHT`; `priority>0` is never rejected by admission but still counts toward inflight (no queue jumping).
121 133  
122 134 #### 7.1.6 Unified TEI tuning advice (main service)
123 135  
... ...
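Since `priority=0` callers are expected to absorb overload rejections, an offline indexer would typically wrap its calls in a backoff-and-retry loop rather than failing the batch. A minimal sketch, under stated assumptions: the `503` default and the helper name are illustrative, not part of the service's API; substitute whatever overload status code the service actually returns.

```python
import time


def call_with_overload_retry(send, max_retries=5, base_delay=0.5, overload_status=503):
    """Retry a priority=0 call on overload with exponential backoff.

    `send` is any zero-argument callable returning (status_code, body),
    e.g. a closure around an HTTP POST to /embed/text.
    """
    delay = base_delay
    for attempt in range(max_retries + 1):
        status, body = send()
        if status != overload_status:
            return status, body  # success or a non-overload error: stop retrying
        if attempt == max_retries:
            break
        time.sleep(delay)  # back off before the next attempt
        delay *= 2
    return status, body
```

Online (`priority=1`) callers should not need this loop, since they are exempt from admission rejection by design.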
embeddings/README.md
... ... @@ -30,13 +30,13 @@
30 30 - Text service (default `6005`)
31 31 - `POST /embed/text`
32 32 - Request body: `["text1", "text2", ...]`
33   - - Optional query params: `normalize=true|false`
  33 + - Optional query params: `normalize=true|false`, `priority=0|1`
34 34 - Returns: `[[...], [...], ...]`
35 35 - Health endpoints: `GET /health`, `GET /ready`
36 36 - Image service (default `6008`)
37 37 - `POST /embed/image`
38 38 - Request body: `["url or local path 1", ...]`
39   - - Optional query params: `normalize=true|false`
  39 + - Optional query params: `normalize=true|false`, `priority=0|1`
40 40 - Returns: `[[...], [...], ...]`
41 41 - Health endpoints: `GET /health`, `GET /ready`
42 42  
... ... @@ -61,6 +61,11 @@
61 61 - The image service can be configured more strictly than the text service.
62 62 - Full-cache-hit requests return directly on the server side and do not occupy a model concurrency slot.
63 63 - Rejecting outright when capacity is exceeded is more stable than unbounded queueing.
  64 +- The text service supports `priority`:
  65 + - `priority=0` (default, suited to offline indexing) is still bound by `TEXT_MAX_INFLIGHT` and returns overload directly when over the limit.
  66 + - `priority>0` (use `1` for online queries) is never rejected by admission control but still counts toward inflight.
  67 + - Internally the text service schedules with two queues and consumes high-priority requests first, so online requests never sit indefinitely behind offline batch jobs.
  68 +- The image service also supports `priority` (same semantics as text, counted against `IMAGE_MAX_INFLIGHT`; no queue jumping, only the admission rule changes).
64 69  
65 70 ### 图片向量:clip-as-service(推荐)
66 71  
... ... @@ -86,6 +91,14 @@
86 91 - `CLIP_AS_SERVICE_MODEL_NAME=CN-CLIP/ViT-L-14`
87 92 - `scripts/start_cnclip_service.sh` reads the same `CLIP_AS_SERVICE_MODEL_NAME` by default; it can also be temporarily overridden with `CNCLIP_MODEL_NAME` or `--model-name`
88 93  
  94 +### Performance and load testing (reusing repo scripts)
  95 +
  96 +- Endpoint-level load testing (same methodology as `perf_reports/2026-03-12/matrix_report/` etc.): `scripts/perf_api_benchmark.py`
  97 + - Example: `python scripts/perf_api_benchmark.py --scenario embed_text --duration 30 --concurrency 20`
  98 + - Text/image embedding runs can carry a `priority` (matching the production admission semantics): `--embed-text-priority 1`, `--embed-image-priority 1`
  99 + - Custom request templates: `--cases-file scripts/perf_cases.json.example`
  100 +- Historical matrix results and notes: `perf_reports/2026-03-12/matrix_report/summary.md`.
  101 +
89 102 ### Starting the services
90 103 
91 104 Start the services with the repo scripts:
... ...
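The dual-queue scheduling the README describes reduces to two FIFO queues whose consumer always drains the high-priority one first. A stripped-down sketch (the real worker additionally blocks on a `threading.Condition` and micro-batches; names here are illustrative):

```python
from collections import deque

# Two FIFO queues: one for priority>0 tasks, one for priority=0 tasks.
high: deque = deque()
normal: deque = deque()


def submit(task, priority: int = 0) -> None:
    """Enqueue a task into the queue matching its priority."""
    (high if priority > 0 else normal).append(task)


def pop_next():
    """Always serve high-priority tasks first; FIFO within each queue."""
    if high:
        return high.popleft()
    if normal:
        return normal.popleft()
    return None
```

Within each queue ordering stays first-in-first-out, so offline jobs are delayed but never reordered among themselves.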
embeddings/image_encoder.py
... ... @@ -35,7 +35,12 @@ class CLIPImageEncoder:
35 35 namespace="image",
36 36 )
37 37  
38   - def _call_service(self, request_data: List[str], normalize_embeddings: bool = True) -> List[Any]:
  38 + def _call_service(
  39 + self,
  40 + request_data: List[str],
  41 + normalize_embeddings: bool = True,
  42 + priority: int = 0,
  43 + ) -> List[Any]:
39 44 """
40 45 Call the embedding service API.
41 46  
... ... @@ -48,7 +53,10 @@ class CLIPImageEncoder:
48 53 try:
49 54 response = requests.post(
50 55 self.endpoint,
51   - params={"normalize": "true" if normalize_embeddings else "false"},
  56 + params={
  57 + "normalize": "true" if normalize_embeddings else "false",
  58 + "priority": max(0, int(priority)),
  59 + },
52 60 json=request_data,
53 61 timeout=60
54 62 )
... ... @@ -66,7 +74,12 @@ class CLIPImageEncoder:
66 74 """
67 75 raise NotImplementedError("encode_image with PIL Image is not supported by embedding service")
68 76  
69   - def encode_image_from_url(self, url: str, normalize_embeddings: bool = True) -> np.ndarray:
  77 + def encode_image_from_url(
  78 + self,
  79 + url: str,
  80 + normalize_embeddings: bool = True,
  81 + priority: int = 0,
  82 + ) -> np.ndarray:
70 83 """
71 84 Generate image embedding via network service using URL.
72 85  
... ... @@ -81,7 +94,11 @@ class CLIPImageEncoder:
81 94 if cached is not None:
82 95 return cached
83 96  
84   - response_data = self._call_service([url], normalize_embeddings=normalize_embeddings)
  97 + response_data = self._call_service(
  98 + [url],
  99 + normalize_embeddings=normalize_embeddings,
  100 + priority=priority,
  101 + )
85 102 if not response_data or len(response_data) != 1 or response_data[0] is None:
86 103 raise RuntimeError(f"No image embedding returned for URL: {url}")
87 104 vec = np.array(response_data[0], dtype=np.float32)
... ... @@ -95,6 +112,7 @@ class CLIPImageEncoder:
95 112 images: List[Union[str, Image.Image]],
96 113 batch_size: int = 8,
97 114 normalize_embeddings: bool = True,
  115 + priority: int = 0,
98 116 ) -> List[np.ndarray]:
99 117 """
100 118 Encode a batch of images efficiently via network service.
... ... @@ -129,7 +147,11 @@ class CLIPImageEncoder:
129 147  
130 148 for i in range(0, len(pending_urls), batch_size):
131 149 batch_urls = pending_urls[i : i + batch_size]
132   - response_data = self._call_service(batch_urls, normalize_embeddings=normalize_embeddings)
  150 + response_data = self._call_service(
  151 + batch_urls,
  152 + normalize_embeddings=normalize_embeddings,
  153 + priority=priority,
  154 + )
133 155 if not response_data or len(response_data) != len(batch_urls):
134 156 raise RuntimeError(
135 157 f"Image embedding response length mismatch: expected {len(batch_urls)}, "
... ... @@ -153,6 +175,7 @@ class CLIPImageEncoder:
153 175 urls: List[str],
154 176 batch_size: Optional[int] = None,
155 177 normalize_embeddings: bool = True,
  178 + priority: int = 0,
156 179 ) -> List[np.ndarray]:
157 180 """
158 181 Interface identical to ClipImageModel / ClipAsServiceImageEncoder, called by the indexer's document_transformer.
... ... @@ -168,4 +191,5 @@ class CLIPImageEncoder:
168 191 urls,
169 192 batch_size=batch_size or 8,
170 193 normalize_embeddings=normalize_embeddings,
  194 + priority=priority,
171 195 )
... ...
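For callers, the visible change in `image_encoder.py` is one extra query parameter. A small standalone sketch of the parameter construction that `_call_service` now performs (illustrative helper only; the real method also posts the JSON body and handles errors):

```python
def build_embed_params(normalize_embeddings: bool = True, priority: int = 0) -> dict:
    """Mirror the query params _call_service sends: normalize is serialized
    as a lowercase string, and priority is clamped to be non-negative."""
    return {
        "normalize": "true" if normalize_embeddings else "false",
        "priority": max(0, int(priority)),
    }
```

Clamping on the client side keeps accidental negative priorities from tripping the server's `priority must be >= 0` validation.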
embeddings/server.py
... ... @@ -206,23 +206,24 @@ class _InflightLimiter:
206 206 def __init__(self, name: str, limit: int):
207 207 self.name = name
208 208 self.limit = max(1, int(limit))
209   - self._sem = threading.BoundedSemaphore(self.limit)
210 209 self._lock = threading.Lock()
211 210 self._active = 0
212 211 self._rejected = 0
213 212 self._completed = 0
214 213 self._failed = 0
215 214 self._max_active = 0
  215 + self._priority_bypass_total = 0
216 216  
217   - def try_acquire(self) -> tuple[bool, int]:
218   - if not self._sem.acquire(blocking=False):
219   - with self._lock:
  217 + def try_acquire(self, *, bypass_limit: bool = False) -> tuple[bool, int]:
  218 + with self._lock:
  219 + if not bypass_limit and self._active >= self.limit:
220 220 self._rejected += 1
221 221 active = self._active
222   - return False, active
223   - with self._lock:
  222 + return False, active
224 223 self._active += 1
225 224 self._max_active = max(self._max_active, self._active)
  225 + if bypass_limit:
  226 + self._priority_bypass_total += 1
226 227 active = self._active
227 228 return True, active
228 229  
... ... @@ -234,7 +235,6 @@ class _InflightLimiter:
234 235 else:
235 236 self._failed += 1
236 237 active = self._active
237   - self._sem.release()
238 238 return active
239 239  
240 240 def snapshot(self) -> Dict[str, int]:
... ... @@ -246,9 +246,157 @@ class _InflightLimiter:
246 246 "completed_total": self._completed,
247 247 "failed_total": self._failed,
248 248 "max_active": self._max_active,
  249 + "priority_bypass_total": self._priority_bypass_total,
249 250 }
250 251  
251 252  
  253 +def _effective_priority(priority: int) -> int:
  254 + return 1 if int(priority) > 0 else 0
  255 +
  256 +
  257 +def _priority_label(priority: int) -> str:
  258 + return "high" if _effective_priority(priority) > 0 else "normal"
  259 +
  260 +
  261 +@dataclass
  262 +class _TextDispatchTask:
  263 + normalized: List[str]
  264 + effective_normalize: bool
  265 + request_id: str
  266 + priority: int
  267 + created_at: float
  268 + done: threading.Event
  269 + result: Optional[_EmbedResult] = None
  270 + error: Optional[Exception] = None
  271 +
  272 +
  273 +_text_dispatch_high_queue: "deque[_TextDispatchTask]" = deque()
  274 +_text_dispatch_normal_queue: "deque[_TextDispatchTask]" = deque()
  275 +_text_dispatch_cv = threading.Condition()
  276 +_text_dispatch_workers: List[threading.Thread] = []
  277 +_text_dispatch_worker_stop = False
  278 +_text_dispatch_worker_count = 0
  279 +
  280 +
  281 +def _text_dispatch_queue_depth() -> Dict[str, int]:
  282 + with _text_dispatch_cv:
  283 + return {
  284 + "high": len(_text_dispatch_high_queue),
  285 + "normal": len(_text_dispatch_normal_queue),
  286 + "total": len(_text_dispatch_high_queue) + len(_text_dispatch_normal_queue),
  287 + }
  288 +
  289 +
  290 +def _pop_text_dispatch_task_locked() -> Optional["_TextDispatchTask"]:
  291 + if _text_dispatch_high_queue:
  292 + return _text_dispatch_high_queue.popleft()
  293 + if _text_dispatch_normal_queue:
  294 + return _text_dispatch_normal_queue.popleft()
  295 + return None
  296 +
  297 +
  298 +def _start_text_dispatch_workers() -> None:
  299 + global _text_dispatch_workers, _text_dispatch_worker_stop, _text_dispatch_worker_count
  300 + if _text_model is None:
  301 + return
  302 + target_worker_count = 1 if _text_backend_name == "local_st" else _TEXT_MAX_INFLIGHT
  303 + alive_workers = [worker for worker in _text_dispatch_workers if worker.is_alive()]
  304 + if len(alive_workers) == target_worker_count:
  305 + _text_dispatch_workers = alive_workers
  306 + _text_dispatch_worker_count = target_worker_count
  307 + return
  308 + _text_dispatch_worker_stop = False
  309 + _text_dispatch_worker_count = target_worker_count
  310 + _text_dispatch_workers = []
  311 + for idx in range(target_worker_count):
  312 + worker = threading.Thread(
  313 + target=_text_dispatch_worker_loop,
  314 + args=(idx,),
  315 + name=f"embed-text-dispatch-{idx}",
  316 + daemon=True,
  317 + )
  318 + worker.start()
  319 + _text_dispatch_workers.append(worker)
  320 + logger.info(
  321 + "Started text dispatch workers | backend=%s workers=%d",
  322 + _text_backend_name,
  323 + target_worker_count,
  324 + )
  325 +
  326 +
  327 +def _stop_text_dispatch_workers() -> None:
  328 + global _text_dispatch_worker_stop
  329 + with _text_dispatch_cv:
  330 + _text_dispatch_worker_stop = True
  331 + _text_dispatch_cv.notify_all()
  332 +
  333 +
  334 +def _text_dispatch_worker_loop(worker_idx: int) -> None:
  335 + while True:
  336 + with _text_dispatch_cv:
  337 + while (
  338 + not _text_dispatch_high_queue
  339 + and not _text_dispatch_normal_queue
  340 + and not _text_dispatch_worker_stop
  341 + ):
  342 + _text_dispatch_cv.wait()
  343 + if _text_dispatch_worker_stop:
  344 + return
  345 + task = _pop_text_dispatch_task_locked()
  346 + if task is None:
  347 + continue
  348 + try:
  349 + queue_wait_ms = (time.perf_counter() - task.created_at) * 1000.0
  350 + logger.info(
  351 + "text dispatch start | worker=%d priority=%s inputs=%d queue_wait_ms=%.2f",
  352 + worker_idx,
  353 + _priority_label(task.priority),
  354 + len(task.normalized),
  355 + queue_wait_ms,
  356 + extra=_request_log_extra(task.request_id),
  357 + )
  358 + task.result = _embed_text_impl(
  359 + task.normalized,
  360 + task.effective_normalize,
  361 + task.request_id,
  362 + task.priority,
  363 + )
  364 + except Exception as exc:
  365 + task.error = exc
  366 + finally:
  367 + task.done.set()
  368 +
  369 +
  370 +def _submit_text_dispatch_and_wait(
  371 + normalized: List[str],
  372 + effective_normalize: bool,
  373 + request_id: str,
  374 + priority: int,
  375 +) -> _EmbedResult:
  376 + if not any(worker.is_alive() for worker in _text_dispatch_workers):
  377 + _start_text_dispatch_workers()
  378 + task = _TextDispatchTask(
  379 + normalized=normalized,
  380 + effective_normalize=effective_normalize,
  381 + request_id=request_id,
  382 + priority=_effective_priority(priority),
  383 + created_at=time.perf_counter(),
  384 + done=threading.Event(),
  385 + )
  386 + with _text_dispatch_cv:
  387 + if task.priority > 0:
  388 + _text_dispatch_high_queue.append(task)
  389 + else:
  390 + _text_dispatch_normal_queue.append(task)
  391 + _text_dispatch_cv.notify()
  392 + task.done.wait()
  393 + if task.error is not None:
  394 + raise task.error
  395 + if task.result is None:
  396 + raise RuntimeError("Text dispatch worker returned empty result")
  397 + return task.result
  398 +
  399 +
252 400 _text_request_limiter = _InflightLimiter(name="text", limit=_TEXT_MAX_INFLIGHT)
253 401 _image_request_limiter = _InflightLimiter(name="image", limit=_IMAGE_MAX_INFLIGHT)
254 402 _text_stats = _EndpointStats(name="text")
... ... @@ -261,6 +409,7 @@ _image_cache = RedisEmbeddingCache(key_prefix=_CACHE_PREFIX, namespace="image")
261 409 class _SingleTextTask:
262 410 text: str
263 411 normalize: bool
  412 + priority: int
264 413 created_at: float
265 414 request_id: str
266 415 done: threading.Event
... ... @@ -268,12 +417,30 @@ class _SingleTextTask:
268 417 error: Optional[Exception] = None
269 418  
270 419  
271   -_text_single_queue: "deque[_SingleTextTask]" = deque()
  420 +_text_single_high_queue: "deque[_SingleTextTask]" = deque()
  421 +_text_single_normal_queue: "deque[_SingleTextTask]" = deque()
272 422 _text_single_queue_cv = threading.Condition()
273 423 _text_batch_worker: Optional[threading.Thread] = None
274 424 _text_batch_worker_stop = False
275 425  
276 426  
  427 +def _text_microbatch_queue_depth() -> Dict[str, int]:
  428 + with _text_single_queue_cv:
  429 + return {
  430 + "high": len(_text_single_high_queue),
  431 + "normal": len(_text_single_normal_queue),
  432 + "total": len(_text_single_high_queue) + len(_text_single_normal_queue),
  433 + }
  434 +
  435 +
  436 +def _pop_single_text_task_locked() -> Optional["_SingleTextTask"]:
  437 + if _text_single_high_queue:
  438 + return _text_single_high_queue.popleft()
  439 + if _text_single_normal_queue:
  440 + return _text_single_normal_queue.popleft()
  441 + return None
  442 +
  443 +
277 444 def _compact_preview(text: str, max_chars: int) -> str:
278 445 compact = " ".join((text or "").split())
279 446 if len(compact) <= max_chars:
... ... @@ -356,30 +523,41 @@ def _text_batch_worker_loop() -> None:
356 523 max_batch = max(1, int(CONFIG.TEXT_BATCH_SIZE))
357 524 while True:
358 525 with _text_single_queue_cv:
359   - while not _text_single_queue and not _text_batch_worker_stop:
  526 + while (
  527 + not _text_single_high_queue
  528 + and not _text_single_normal_queue
  529 + and not _text_batch_worker_stop
  530 + ):
360 531 _text_single_queue_cv.wait()
361 532 if _text_batch_worker_stop:
362 533 return
363 534  
364   - batch: List[_SingleTextTask] = [_text_single_queue.popleft()]
  535 + first_task = _pop_single_text_task_locked()
  536 + if first_task is None:
  537 + continue
  538 + batch: List[_SingleTextTask] = [first_task]
365 539 deadline = time.perf_counter() + _TEXT_MICROBATCH_WINDOW_SEC
366 540  
367 541 while len(batch) < max_batch:
368 542 remaining = deadline - time.perf_counter()
369 543 if remaining <= 0:
370 544 break
371   - if not _text_single_queue:
  545 + if not _text_single_high_queue and not _text_single_normal_queue:
372 546 _text_single_queue_cv.wait(timeout=remaining)
373 547 continue
374   - while _text_single_queue and len(batch) < max_batch:
375   - batch.append(_text_single_queue.popleft())
  548 + while len(batch) < max_batch:
  549 + next_task = _pop_single_text_task_locked()
  550 + if next_task is None:
  551 + break
  552 + batch.append(next_task)
376 553  
377 554 try:
378 555 queue_wait_ms = [(time.perf_counter() - task.created_at) * 1000.0 for task in batch]
379 556 reqids = [task.request_id for task in batch]
380 557 logger.info(
381   - "text microbatch dispatch | size=%d queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s preview=%s",
  558 + "text microbatch dispatch | size=%d priority=%s queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s preview=%s",
382 559 len(batch),
  560 + _priority_label(max(task.priority for task in batch)),
383 561 min(queue_wait_ms) if queue_wait_ms else 0.0,
384 562 max(queue_wait_ms) if queue_wait_ms else 0.0,
385 563 reqids,
... ... @@ -423,22 +601,32 @@ def _text_batch_worker_loop() -> None:
423 601 task.done.set()
424 602  
425 603  
426   -def _encode_single_text_with_microbatch(text: str, normalize: bool, request_id: str) -> List[float]:
  604 +def _encode_single_text_with_microbatch(
  605 + text: str,
  606 + normalize: bool,
  607 + request_id: str,
  608 + priority: int,
  609 +) -> List[float]:
427 610 task = _SingleTextTask(
428 611 text=text,
429 612 normalize=normalize,
  613 + priority=_effective_priority(priority),
430 614 created_at=time.perf_counter(),
431 615 request_id=request_id,
432 616 done=threading.Event(),
433 617 )
434 618 with _text_single_queue_cv:
435   - _text_single_queue.append(task)
  619 + if task.priority > 0:
  620 + _text_single_high_queue.append(task)
  621 + else:
  622 + _text_single_normal_queue.append(task)
436 623 _text_single_queue_cv.notify()
437 624  
438 625 if not task.done.wait(timeout=_TEXT_REQUEST_TIMEOUT_SEC):
439 626 with _text_single_queue_cv:
  627 + queue = _text_single_high_queue if task.priority > 0 else _text_single_normal_queue
440 628 try:
441   - _text_single_queue.remove(task)
  629 + queue.remove(task)
442 630 except ValueError:
443 631 pass
444 632 raise RuntimeError(
... ... @@ -489,6 +677,7 @@ def load_models():
489 677 f"Unsupported embedding backend: {backend_name}. "
490 678 "Supported: tei, local_st"
491 679 )
  680 + _start_text_dispatch_workers()
492 681 logger.info("Text backend loaded successfully: %s", _text_backend_name)
493 682 except Exception as e:
494 683 logger.error("Failed to load text model: %s", e, exc_info=True)
... ... @@ -532,6 +721,7 @@ def load_models():
532 721 @app.on_event("shutdown")
533 722 def stop_workers() -> None:
534 723 _stop_text_batch_worker()
  724 + _stop_text_dispatch_workers()
535 725  
536 726  
537 727 def _normalize_vector(vec: np.ndarray) -> np.ndarray:
... ... @@ -602,6 +792,8 @@ def _try_full_image_cache_hit(
602 792 def health() -> Dict[str, Any]:
603 793 """Health check endpoint. Returns status and current throttling stats."""
604 794 ready = (not open_text_model or _text_model is not None) and (not open_image_model or _image_model is not None)
  795 + text_dispatch_depth = _text_dispatch_queue_depth()
  796 + text_microbatch_depth = _text_microbatch_queue_depth()
605 797 return {
606 798 "status": "ok" if ready else "degraded",
607 799 "service_kind": _SERVICE_KIND,
... ... @@ -620,9 +812,18 @@ def health() -> Dict[str, Any]:
620 812 "text": _text_stats.snapshot(),
621 813 "image": _image_stats.snapshot(),
622 814 },
  815 + "text_dispatch": {
  816 + "workers": _text_dispatch_worker_count,
  817 + "workers_alive": sum(1 for worker in _text_dispatch_workers if worker.is_alive()),
  818 + "queue_depth": text_dispatch_depth["total"],
  819 + "queue_depth_high": text_dispatch_depth["high"],
  820 + "queue_depth_normal": text_dispatch_depth["normal"],
  821 + },
623 822 "text_microbatch": {
624 823 "window_ms": round(_TEXT_MICROBATCH_WINDOW_SEC * 1000.0, 3),
625   - "queue_depth": len(_text_single_queue),
  824 + "queue_depth": text_microbatch_depth["total"],
  825 + "queue_depth_high": text_microbatch_depth["high"],
  826 + "queue_depth_normal": text_microbatch_depth["normal"],
626 827 "worker_alive": bool(_text_batch_worker is not None and _text_batch_worker.is_alive()),
627 828 "request_timeout_sec": _TEXT_REQUEST_TIMEOUT_SEC,
628 829 },
... ... @@ -654,6 +855,7 @@ def _embed_text_impl(
654 855 normalized: List[str],
655 856 effective_normalize: bool,
656 857 request_id: str,
  858 + priority: int = 0,
657 859 ) -> _EmbedResult:
658 860 if _text_model is None:
659 861 raise RuntimeError("Text model not loaded")
... ... @@ -703,6 +905,7 @@ def _embed_text_impl(
703 905 missing_texts[0],
704 906 normalize=effective_normalize,
705 907 request_id=request_id,
  908 + priority=priority,
706 909 )
707 910 ]
708 911 mode = "microbatch-single"
... ... @@ -777,6 +980,7 @@ async def embed_text(
777 980 http_request: Request,
778 981 response: Response,
779 982 normalize: Optional[bool] = None,
  983 + priority: int = 0,
780 984 ) -> List[Optional[List[float]]]:
781 985 if _text_model is None:
782 986 raise HTTPException(status_code=503, detail="Text embedding model not loaded in this service")
... ... @@ -784,6 +988,9 @@ async def embed_text(
784 988 request_id = _resolve_request_id(http_request)
785 989 response.headers["X-Request-ID"] = request_id
786 990  
  991 + if priority < 0:
  992 + raise HTTPException(status_code=400, detail="priority must be >= 0")
  993 + effective_priority = _effective_priority(priority)
787 994 effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize)
788 995 normalized: List[str] = []
789 996 for i, t in enumerate(texts):
... ... @@ -806,8 +1013,9 @@ async def embed_text(
806 1013 cache_misses=0,
807 1014 )
808 1015 logger.info(
809   - "embed_text response | backend=%s mode=cache-only inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f",
  1016 + "embed_text response | backend=%s mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f",
810 1017 _text_backend_name,
  1018 + _priority_label(effective_priority),
811 1019 len(normalized),
812 1020 effective_normalize,
813 1021 len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0,
... ... @@ -818,13 +1026,14 @@ async def embed_text(
818 1026 )
819 1027 return cache_only.vectors
820 1028  
821   - accepted, active = _text_request_limiter.try_acquire()
  1029 + accepted, active = _text_request_limiter.try_acquire(bypass_limit=effective_priority > 0)
822 1030 if not accepted:
823 1031 _text_stats.record_rejected()
824 1032 logger.warning(
825   - "embed_text rejected | client=%s backend=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
  1033 + "embed_text rejected | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
826 1034 _request_client(http_request),
827 1035 _text_backend_name,
  1036 + _priority_label(effective_priority),
828 1037 len(normalized),
829 1038 effective_normalize,
830 1039 active,
... ... @@ -834,7 +1043,10 @@ async def embed_text(
834 1043 )
835 1044 raise HTTPException(
836 1045 status_code=_OVERLOAD_STATUS_CODE,
837   - detail=f"Text embedding service busy: active={active}, limit={_TEXT_MAX_INFLIGHT}",
  1046 + detail=(
  1047 + "Text embedding service busy for priority=0 requests: "
  1048 + f"active={active}, limit={_TEXT_MAX_INFLIGHT}"
  1049 + ),
838 1050 )
839 1051  
840 1052 request_started = time.perf_counter()
... ... @@ -844,9 +1056,10 @@ async def embed_text(
844 1056 cache_misses = 0
845 1057 try:
846 1058 logger.info(
847   - "embed_text request | client=%s backend=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
  1059 + "embed_text request | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
848 1060 _request_client(http_request),
849 1061 _text_backend_name,
  1062 + _priority_label(effective_priority),
850 1063 len(normalized),
851 1064 effective_normalize,
852 1065 active,
... ... @@ -855,13 +1068,20 @@ async def embed_text(
855 1068 extra=_request_log_extra(request_id),
856 1069 )
857 1070 verbose_logger.info(
858   - "embed_text detail | payload=%s normalize=%s backend=%s",
  1071 + "embed_text detail | payload=%s normalize=%s backend=%s priority=%s",
859 1072 normalized,
860 1073 effective_normalize,
861 1074 _text_backend_name,
  1075 + _priority_label(effective_priority),
862 1076 extra=_request_log_extra(request_id),
863 1077 )
864   - result = await run_in_threadpool(_embed_text_impl, normalized, effective_normalize, request_id)
  1078 + result = await run_in_threadpool(
  1079 + _submit_text_dispatch_and_wait,
  1080 + normalized,
  1081 + effective_normalize,
  1082 + request_id,
  1083 + effective_priority,
  1084 + )
865 1085 success = True
866 1086 backend_elapsed_ms = result.backend_elapsed_ms
867 1087 cache_hits = result.cache_hits
... ... @@ -875,9 +1095,10 @@ async def embed_text(
875 1095 cache_misses=cache_misses,
876 1096 )
877 1097 logger.info(
878   - "embed_text response | backend=%s mode=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f",
  1098 + "embed_text response | backend=%s mode=%s priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f",
879 1099 _text_backend_name,
880 1100 result.mode,
  1101 + _priority_label(effective_priority),
881 1102 len(normalized),
882 1103 effective_normalize,
883 1104 len(result.vectors[0]) if result.vectors and result.vectors[0] is not None else 0,
... ... @@ -888,8 +1109,9 @@ async def embed_text(
888 1109 extra=_request_log_extra(request_id),
889 1110 )
890 1111 verbose_logger.info(
891   - "embed_text result detail | count=%d first_vector=%s latency_ms=%.2f",
  1112 + "embed_text result detail | count=%d priority=%s first_vector=%s latency_ms=%.2f",
892 1113 len(result.vectors),
  1114 + _priority_label(effective_priority),
893 1115 result.vectors[0][: _VECTOR_PREVIEW_DIMS]
894 1116 if result.vectors and result.vectors[0] is not None
895 1117 else [],
... ... @@ -909,8 +1131,9 @@ async def embed_text(
909 1131 cache_misses=cache_misses,
910 1132 )
911 1133 logger.error(
912   - "embed_text failed | backend=%s inputs=%d normalize=%s latency_ms=%.2f error=%s",
  1134 + "embed_text failed | backend=%s priority=%s inputs=%d normalize=%s latency_ms=%.2f error=%s",
913 1135 _text_backend_name,
  1136 + _priority_label(effective_priority),
914 1137 len(normalized),
915 1138 effective_normalize,
916 1139 latency_ms,
... ... @@ -922,8 +1145,9 @@ async def embed_text(
922 1145 finally:
923 1146 remaining = _text_request_limiter.release(success=success)
924 1147 logger.info(
925   - "embed_text finalize | success=%s active_after=%d",
  1148 + "embed_text finalize | success=%s priority=%s active_after=%d",
926 1149 success,
  1150 + _priority_label(effective_priority),
927 1151 remaining,
928 1152 extra=_request_log_extra(request_id),
929 1153 )
... ... @@ -1019,6 +1243,7 @@ async def embed_image(
1019 1243 http_request: Request,
1020 1244 response: Response,
1021 1245 normalize: Optional[bool] = None,
  1246 + priority: int = 0,
1022 1247 ) -> List[Optional[List[float]]]:
1023 1248 if _image_model is None:
1024 1249 raise HTTPException(status_code=503, detail="Image embedding model not loaded in this service")
... ... @@ -1026,6 +1251,10 @@ async def embed_image(
1026 1251 request_id = _resolve_request_id(http_request)
1027 1252 response.headers["X-Request-ID"] = request_id
1028 1253  
  1254 + if priority < 0:
  1255 + raise HTTPException(status_code=400, detail="priority must be >= 0")
  1256 + effective_priority = _effective_priority(priority)
  1257 +
1029 1258 effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize)
1030 1259 urls: List[str] = []
1031 1260 for i, url_or_path in enumerate(images):
... ... @@ -1048,7 +1277,8 @@ async def embed_image(
1048 1277 cache_misses=0,
1049 1278 )
1050 1279 logger.info(
1051   - "embed_image response | mode=cache-only inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f",
  1280 + "embed_image response | mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f",
  1281 + _priority_label(effective_priority),
1052 1282 len(urls),
1053 1283 effective_normalize,
1054 1284 len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0,
... ... @@ -1059,12 +1289,13 @@ async def embed_image(
1059 1289 )
1060 1290 return cache_only.vectors
1061 1291  
1062   - accepted, active = _image_request_limiter.try_acquire()
  1292 + accepted, active = _image_request_limiter.try_acquire(bypass_limit=effective_priority > 0)
1063 1293 if not accepted:
1064 1294 _image_stats.record_rejected()
1065 1295 logger.warning(
1066   - "embed_image rejected | client=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
  1296 + "embed_image rejected | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
1067 1297 _request_client(http_request),
  1298 + _priority_label(effective_priority),
1068 1299 len(urls),
1069 1300 effective_normalize,
1070 1301 active,
... ... @@ -1074,7 +1305,10 @@ async def embed_image(
1074 1305 )
1075 1306 raise HTTPException(
1076 1307 status_code=_OVERLOAD_STATUS_CODE,
1077   - detail=f"Image embedding service busy: active={active}, limit={_IMAGE_MAX_INFLIGHT}",
  1308 + detail=(
  1309 + "Image embedding service busy for priority=0 requests: "
  1310 + f"active={active}, limit={_IMAGE_MAX_INFLIGHT}"
  1311 + ),
1078 1312 )
1079 1313  
1080 1314 request_started = time.perf_counter()
... ... @@ -1084,8 +1318,9 @@ async def embed_image(
1084 1318 cache_misses = 0
1085 1319 try:
1086 1320 logger.info(
1087   - "embed_image request | client=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
  1321 + "embed_image request | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
1088 1322 _request_client(http_request),
  1323 + _priority_label(effective_priority),
1089 1324 len(urls),
1090 1325 effective_normalize,
1091 1326 active,
... ... @@ -1094,9 +1329,10 @@ async def embed_image(
1094 1329 extra=_request_log_extra(request_id),
1095 1330 )
1096 1331 verbose_logger.info(
1097   - "embed_image detail | payload=%s normalize=%s",
  1332 + "embed_image detail | payload=%s normalize=%s priority=%s",
1098 1333 urls,
1099 1334 effective_normalize,
  1335 + _priority_label(effective_priority),
1100 1336 extra=_request_log_extra(request_id),
1101 1337 )
1102 1338 result = await run_in_threadpool(_embed_image_impl, urls, effective_normalize, request_id)
... ... @@ -1113,8 +1349,9 @@ async def embed_image(
1113 1349 cache_misses=cache_misses,
1114 1350 )
1115 1351 logger.info(
1116   - "embed_image response | mode=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f",
  1352 + "embed_image response | mode=%s priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=%d first_vector=%s latency_ms=%.2f",
1117 1353 result.mode,
  1354 + _priority_label(effective_priority),
1118 1355 len(urls),
1119 1356 effective_normalize,
1120 1357 len(result.vectors[0]) if result.vectors and result.vectors[0] is not None else 0,
... ... @@ -1146,7 +1383,8 @@ async def embed_image(
1146 1383 cache_misses=cache_misses,
1147 1384 )
1148 1385 logger.error(
1149   - "embed_image failed | inputs=%d normalize=%s latency_ms=%.2f error=%s",
  1386 + "embed_image failed | priority=%s inputs=%d normalize=%s latency_ms=%.2f error=%s",
  1387 + _priority_label(effective_priority),
1150 1388 len(urls),
1151 1389 effective_normalize,
1152 1390 latency_ms,
... ... @@ -1158,8 +1396,9 @@ async def embed_image(
1158 1396 finally:
1159 1397 remaining = _image_request_limiter.release(success=success)
1160 1398 logger.info(
1161   - "embed_image finalize | success=%s active_after=%d",
  1399 + "embed_image finalize | success=%s priority=%s active_after=%d",
1162 1400 success,
  1401 + _priority_label(effective_priority),
1163 1402 remaining,
1164 1403 extra=_request_log_extra(request_id),
1165 1404 )
... ...
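The hunks above call `_image_request_limiter.try_acquire(bypass_limit=effective_priority > 0)`, and the new priority tests later in this commit read `snapshot()["priority_bypass_total"]`, but the `_InflightLimiter` change itself is not shown in this diff. A minimal sketch of a limiter with those call shapes (the internals are assumptions; a priority > 0 request still occupies an inflight slot but is never rejected, matching the semantics described in `docs/TODO.txt`):

```python
import threading
from typing import Dict, Tuple


class InflightLimiter:
    """Hypothetical bypass-aware admission limiter (internals assumed)."""

    def __init__(self, name: str, limit: int) -> None:
        self.name = name
        self.limit = limit
        self._active = 0
        self._priority_bypass_total = 0
        self._lock = threading.Lock()

    def try_acquire(self, bypass_limit: bool = False) -> Tuple[bool, int]:
        """Return (accepted, active). priority=0 callers are rejected at the
        limit; bypass_limit=True callers are admitted anyway but still count."""
        with self._lock:
            if self._active >= self.limit:
                if not bypass_limit:
                    return False, self._active
                # Over the limit but admitted: record the bypass for /health stats.
                self._priority_bypass_total += 1
            self._active += 1
            return True, self._active

    def release(self, success: bool = True) -> int:
        with self._lock:
            self._active -= 1
            return self._active

    def snapshot(self) -> Dict[str, int]:
        with self._lock:
            return {
                "active": self._active,
                "priority_bypass_total": self._priority_bypass_total,
            }
```

This reproduces the behavior asserted by `tests/test_embedding_service_priority.py`: with `limit=1`, a second plain acquire is refused at `active=1`, while a bypass acquire succeeds and raises `active` to 2.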
embeddings/text_encoder.py
... ... @@ -35,7 +35,12 @@ class TextEmbeddingEncoder:
35 35 expire_time=self.expire_time,
36 36 )
37 37  
38   - def _call_service(self, request_data: List[str], normalize_embeddings: bool = True) -> List[Any]:
  38 + def _call_service(
  39 + self,
  40 + request_data: List[str],
  41 + normalize_embeddings: bool = True,
  42 + priority: int = 0,
  43 + ) -> List[Any]:
39 44 """
40 45 Call the embedding service API.
41 46  
... ... @@ -48,7 +53,10 @@ class TextEmbeddingEncoder:
48 53 try:
49 54 response = requests.post(
50 55 self.endpoint,
51   - params={"normalize": "true" if normalize_embeddings else "false"},
  56 + params={
  57 + "normalize": "true" if normalize_embeddings else "false",
  58 + "priority": max(0, int(priority)),
  59 + },
52 60 json=request_data,
53 61 timeout=60
54 62 )
... ... @@ -62,6 +70,7 @@ class TextEmbeddingEncoder:
62 70 self,
63 71 sentences: Union[str, List[str]],
64 72 normalize_embeddings: bool = True,
  73 + priority: int = 0,
65 74 device: str = 'cpu',
66 75 batch_size: int = 32
67 76 ) -> np.ndarray:
... ... @@ -100,7 +109,11 @@ class TextEmbeddingEncoder:
100 109  
101 110 # If there are uncached texts, call service
102 111 if uncached_texts:
103   - response_data = self._call_service(request_data, normalize_embeddings=normalize_embeddings)
  112 + response_data = self._call_service(
  113 + request_data,
  114 + normalize_embeddings=normalize_embeddings,
  115 + priority=priority,
  116 + )
104 117  
105 118 # Process response
106 119 for i, text in enumerate(uncached_texts):
... ...
perf_reports/README.md 0 → 100644
... ... @@ -0,0 +1,34 @@
  1 +# Performance Test Report Index
  2 +
  3 +This directory holds the raw JSON and notes for each load-test / matrix run. **Prefer reusing** the repository scripts instead of reinventing the wheel:
  4 +
  5 +| Script | Purpose |
  6 +|------|------|
  7 +| `scripts/perf_api_benchmark.py` | Load tests for the search backend, embedding, translation, and rerank HTTP endpoints; supports `--embed-text-priority` / `--embed-image-priority` and `scripts/perf_cases.json.example` |
  8 +
  9 +Historical matrix example (concurrency sweep):
 10 +
 11 +- `2026-03-12/matrix_report/summary.md` (alongside `summary.json` in the same directory)
 12 +
 13 +## 2026-03-20: Embedding service `priority` parameter smoke test
 14 +
 15 +Environment: local `127.0.0.1:6005` (text) and `127.0.0.1:6008` (image). Commands and results are in the JSON files in this directory:
 16 +
 17 +| Report file | Scenario | Notes |
 18 +|----------|------|------|
 19 +| `2026-03-20_embed_text_p0.json` | `embed_text` | `priority=0` (default), 8s, concurrency 10 |
 20 +| `2026-03-20_embed_text_p1.json` | `embed_text` | `--embed-text-priority 1`, 8s, concurrency 10 |
 21 +| `2026-03-20_embed_image_p0.json` | `embed_image` | `priority=0`, 8s, concurrency 5 |
 22 +| `2026-03-20_embed_image_p1.json` | `embed_image` | `--embed-image-priority 1`, 8s, concurrency 5 |
 23 +
 24 +Reproduction example:
 25 +
 26 +```bash
 27 +source activate.sh
 28 +python scripts/perf_api_benchmark.py --scenario embed_text --duration 8 --concurrency 10 --timeout 30 --output perf_reports/2026-03-20_embed_text_p0.json
 29 +python scripts/perf_api_benchmark.py --scenario embed_text --duration 8 --concurrency 10 --embed-text-priority 1 --output perf_reports/2026-03-20_embed_text_p1.json
 30 +python scripts/perf_api_benchmark.py --scenario embed_image --duration 8 --concurrency 5 --timeout 60 --output perf_reports/2026-03-20_embed_image_p0.json
 31 +python scripts/perf_api_benchmark.py --scenario embed_image --duration 8 --concurrency 5 --embed-image-priority 1 --output perf_reports/2026-03-20_embed_image_p1.json
 32 +```
 33 +
 34 +Note: this is an **8-second smoke** run, so its duration/concurrency are not directly comparable with the `2026-03-12` matrix; it only verifies that the service still returns 200 with the `priority` parameter and that payload validation passes.
... ...
query/query_parser.py
... ... @@ -442,7 +442,7 @@ class QueryParser:
442 442 # Submit encoding task to thread pool for async execution
443 443 encoding_executor = ThreadPoolExecutor(max_workers=1)
444 444 def _encode_query_vector() -> Optional[np.ndarray]:
445   - arr = self.text_encoder.encode([query_text])
  445 + arr = self.text_encoder.encode([query_text], priority=1)
446 446 if arr is None or len(arr) == 0:
447 447 return None
448 448 vec = arr[0]
... ...
scripts/perf_api_benchmark.py
... ... @@ -15,6 +15,9 @@ Examples:
15 15 python scripts/perf_api_benchmark.py --scenario backend_suggest --duration 30 --concurrency 50 --tenant-id 162
16 16 python scripts/perf_api_benchmark.py --scenario all --duration 60 --concurrency 80 --tenant-id 162
17 17 python scripts/perf_api_benchmark.py --scenario all --cases-file scripts/perf_cases.json.example --output perf_result.json
  18 + # Embedding admission / priority (query param `priority`; same semantics as embedding service):
  19 + python scripts/perf_api_benchmark.py --scenario embed_text --embed-text-priority 1 --duration 30 --concurrency 20
  20 + python scripts/perf_api_benchmark.py --scenario embed_image --embed-image-priority 1 --duration 30 --concurrency 10
18 21 """
19 22  
20 23 from __future__ import annotations
... ... @@ -72,9 +75,9 @@ def validate_response_payload(
72 75 ) -> Tuple[bool, str]:
73 76 """
74 77 Lightweight payload validation for correctness-aware perf tests.
75   - Currently strict for embed_text to catch NaN/null vector regressions.
  78 + Strict for embed_text / embed_image to catch NaN/null vector regressions.
76 79 """
77   - if scenario_name != "embed_text":
  80 + if scenario_name not in ("embed_text", "embed_image"):
78 81 return True, ""
79 82  
80 83 expected_len = len(tpl.json_body) if isinstance(tpl.json_body, list) else None
... ... @@ -219,6 +222,43 @@ def load_cases_from_file(path: Path, tenant_id: str) -&gt; Dict[str, List[RequestTe
219 222 return out
220 223  
221 224  
  225 +def apply_embed_priority_params(
  226 + scenarios: Dict[str, Scenario],
  227 + embed_text_priority: int,
  228 + embed_image_priority: int,
  229 +) -> None:
  230 + """
  231 + Merge default `priority` query param into embed templates when absent.
  232 + `scripts/perf_cases.json` may set per-request `params.priority` to override.
  233 + """
  234 + mapping = {
  235 + "embed_text": max(0, int(embed_text_priority)),
  236 + "embed_image": max(0, int(embed_image_priority)),
  237 + }
  238 + for name, pri in mapping.items():
  239 + if name not in scenarios:
  240 + continue
  241 + scen = scenarios[name]
  242 + new_templates: List[RequestTemplate] = []
  243 + for t in scen.templates:
  244 + params = dict(t.params or {})
  245 + params.setdefault("priority", str(pri))
  246 + new_templates.append(
  247 + RequestTemplate(
  248 + method=t.method,
  249 + path=t.path,
  250 + params=params,
  251 + json_body=t.json_body,
  252 + headers=t.headers,
  253 + )
  254 + )
  255 + scenarios[name] = Scenario(
  256 + name=scen.name,
  257 + templates=new_templates,
  258 + timeout_sec=scen.timeout_sec,
  259 + )
  260 +
  261 +
222 262 def build_scenarios(args: argparse.Namespace) -> Dict[str, Scenario]:
223 263 defaults = make_default_templates(args.tenant_id)
224 264 if args.cases_file:
... ... @@ -252,6 +292,11 @@ def build_scenarios(args: argparse.Namespace) -&gt; Dict[str, Scenario]:
252 292 )
253 293 )
254 294 scenarios[name] = Scenario(name=name, templates=rewritten, timeout_sec=args.timeout)
  295 + apply_embed_priority_params(
  296 + scenarios,
  297 + embed_text_priority=args.embed_text_priority,
  298 + embed_image_priority=args.embed_image_priority,
  299 + )
255 300 return scenarios
256 301  
257 302  
... ... @@ -483,6 +528,18 @@ def parse_args() -&gt; argparse.Namespace:
483 528 default=0,
484 529 help="Optional top_n for rerank requests in dynamic docs mode (0 means omit top_n).",
485 530 )
  531 + parser.add_argument(
  532 + "--embed-text-priority",
  533 + type=int,
  534 + default=0,
  535 + help="Default query param priority= for embed_text (0=offline admission; >0 bypasses rejection). Merged into params unless set in --cases-file.",
  536 + )
  537 + parser.add_argument(
  538 + "--embed-image-priority",
  539 + type=int,
  540 + default=0,
  541 + help="Default query param priority= for embed_image (same semantics as embed-text-priority).",
  542 + )
486 543 return parser.parse_args()
487 544  
488 545  
... ... @@ -609,6 +666,8 @@ async def main_async() -&gt; int:
609 666 print(f" embedding_image_base={args.embedding_image_base}")
610 667 print(f" translator_base={args.translator_base}")
611 668 print(f" reranker_base={args.reranker_base}")
  669 + print(f" embed_text_priority={args.embed_text_priority}")
  670 + print(f" embed_image_priority={args.embed_image_priority}")
612 671 if args.rerank_dynamic_docs:
613 672 print(" rerank_dynamic_docs=True")
614 673 print(f" rerank_doc_count={args.rerank_doc_count}")
... ... @@ -667,6 +726,8 @@ async def main_async() -&gt; int:
667 726 "rerank_query": args.rerank_query,
668 727 "rerank_seed": args.rerank_seed,
669 728 "rerank_top_n": args.rerank_top_n,
  729 + "embed_text_priority": args.embed_text_priority,
  730 + "embed_image_priority": args.embed_image_priority,
670 731 },
671 732 "results": results,
672 733 "overall": aggregate_results(results),
... ...
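`apply_embed_priority_params` merges the CLI default via `dict.setdefault`, so an explicit per-case `params.priority` from `--cases-file` always wins over `--embed-text-priority` / `--embed-image-priority`. A standalone distillation of that merge rule (the helper name here is illustrative, not from the script):

```python
from typing import Dict, Optional


def merge_priority(case_params: Optional[Dict[str, str]], cli_priority: int) -> Dict[str, str]:
    """Mirror of the merge rule above: the CLI value is only a fallback;
    an explicit per-case value is left untouched. Negative input clamps to 0."""
    params = dict(case_params or {})
    params.setdefault("priority", str(max(0, int(cli_priority))))
    return params


# Case file sets nothing -> CLI default applies
print(merge_priority({}, 1))                 # {'priority': '1'}
# Case file pins priority=0 -> CLI flag does not override it
print(merge_priority({"priority": "0"}, 1))  # {'priority': '0'}
```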
scripts/perf_cases.json.example
... ... @@ -32,9 +32,18 @@
32 32 {
33 33 "method": "POST",
34 34 "path": "/embed/text",
  35 + "params": {"priority": "0"},
35 36 "json": ["wireless mouse", "gaming keyboard", "USB-C cable", "barbie doll"]
36 37 }
37 38 ],
  39 + "embed_image": [
  40 + {
  41 + "method": "POST",
  42 + "path": "/embed/image",
  43 + "params": {"normalize": "true", "priority": "0"},
  44 + "json": ["/data/saas-search/docs/image-dress1.png"]
  45 + }
  46 + ],
38 47 "translate": [
39 48 {
40 49 "method": "POST",
... ...
search/searcher.py
... ... @@ -791,7 +791,7 @@ class Searcher:
791 791 # Generate image embedding
792 792 if self.image_encoder is None:
793 793 raise RuntimeError("Image encoder is not initialized at startup")
794   - image_vector = self.image_encoder.encode_image_from_url(image_url)
  794 + image_vector = self.image_encoder.encode_image_from_url(image_url, priority=1)
795 795  
796 796 if image_vector is None:
797 797 raise ValueError(f"Failed to encode image: {image_url}")
... ...
tests/ci/test_service_api_contracts.py
... ... @@ -540,7 +540,15 @@ def test_indexer_index_validation_max_delete_spu_ids(indexer_client: TestClient)
540 540  
541 541  
542 542 class _FakeTextModel:
543   - def encode_batch(self, texts, batch_size=32, device="cpu", normalize_embeddings=True):
  543 + """Matches TEI / server path: `_text_model.encode(...)` (not encode_batch)."""
  544 +
  545 + def encode(
  546 + self,
  547 + texts,
  548 + batch_size=32,
  549 + device="cpu",
  550 + normalize_embeddings=True,
  551 + ):
544 552 return [np.array([0.1, 0.2, 0.3], dtype=np.float32) for _ in texts]
545 553  
546 554  
... ... @@ -549,6 +557,18 @@ class _FakeImageModel:
549 557 return [np.array([0.3, 0.2, 0.1], dtype=np.float32) for _ in urls]
550 558  
551 559  
  560 +class _EmbeddingCacheMiss:
  561 + """Avoid Redis/module cache hits so contract tests exercise the encode path."""
  562 +
  563 + redis_client = None
  564 +
  565 + def get(self, key):
  566 + return None
  567 +
  568 + def set(self, key, value):
  569 + return True
  570 +
  571 +
552 572 @pytest.fixture
553 573 def embedding_module():
554 574 import embeddings.server as emb_server
... ... @@ -556,17 +576,31 @@ def embedding_module():
556 576 emb_server.app.router.on_startup.clear()
557 577 emb_server._text_model = _FakeTextModel()
558 578 emb_server._image_model = _FakeImageModel()
  579 + emb_server._text_backend_name = "tei"
  580 + emb_server._text_cache = _EmbeddingCacheMiss()
  581 + emb_server._image_cache = _EmbeddingCacheMiss()
559 582 yield emb_server
560 583  
561 584  
562 585 def test_embedding_text_contract(embedding_module):
563   - data = embedding_module.embed_text(["hello", "world"])
  586 + """Contract via HTTP like production; route handlers require Request/Response."""
  587 + from fastapi.testclient import TestClient
  588 +
  589 + with TestClient(embedding_module.app) as client:
  590 + resp = client.post("/embed/text", json=["hello", "world"])
  591 + assert resp.status_code == 200
  592 + data = resp.json()
564 593 assert len(data) == 2
565 594 assert len(data[0]) == 3
566 595  
567 596  
568 597 def test_embedding_image_contract(embedding_module):
569   - data = embedding_module.embed_image(["https://example.com/a.jpg"])
  598 + from fastapi.testclient import TestClient
  599 +
  600 + with TestClient(embedding_module.app) as client:
  601 + resp = client.post("/embed/image", json=["https://example.com/a.jpg"])
  602 + assert resp.status_code == 200
  603 + data = resp.json()
570 604 assert len(data[0]) == 3
571 605  
572 606  
... ...
tests/test_embedding_pipeline.py
... ... @@ -63,7 +63,11 @@ class _FakeTranslator:
63 63  
64 64  
65 65 class _FakeQueryEncoder:
  66 + def __init__(self):
  67 + self.calls = []
  68 +
66 69 def encode(self, sentences, **kwargs):
  70 + self.calls.append({"sentences": sentences, "kwargs": dict(kwargs)})
67 71 if isinstance(sentences, str):
68 72 sentences = [sentences]
69 73 return np.array([np.array([0.11, 0.22, 0.33], dtype=np.float32) for _ in sentences], dtype=object)
... ... @@ -98,9 +102,7 @@ def _build_test_config() -&gt; SearchConfig:
98 102 rerank=RerankConfig(),
99 103 spu_config=SPUConfig(enabled=True, spu_field="spu_id", inner_hits_size=3),
100 104 es_index_name="test_products",
101   - tenant_config={},
102 105 es_settings={},
103   - services={},
104 106 )
105 107  
106 108  
... ... @@ -111,6 +113,7 @@ def test_text_embedding_encoder_response_alignment(monkeypatch):
111 113 def _fake_post(url, json, timeout, **kwargs):
112 114 assert url.endswith("/embed/text")
113 115 assert json == ["hello", "world"]
  116 + assert kwargs["params"]["priority"] == 0
114 117 return _FakeResponse([[0.1, 0.2], [0.3, 0.4]])
115 118  
116 119 monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post)
... ... @@ -172,6 +175,7 @@ def test_image_embedding_encoder_cache_hit(monkeypatch):
172 175  
173 176 def _fake_post(url, params, json, timeout, **kwargs):
174 177 calls["count"] += 1
  178 + assert params["priority"] == 0
175 179 return _FakeResponse([[0.1, 0.2]])
176 180  
177 181 monkeypatch.setattr("embeddings.image_encoder.requests.post", _fake_post)
... ... @@ -184,16 +188,35 @@ def test_image_embedding_encoder_cache_hit(monkeypatch):
184 188 assert np.allclose(out[1], np.array([0.1, 0.2], dtype=np.float32))
185 189  
186 190  
  191 +def test_image_embedding_encoder_passes_priority(monkeypatch):
  192 + fake_cache = _FakeEmbeddingCache()
  193 + monkeypatch.setattr("embeddings.image_encoder.RedisEmbeddingCache", lambda **kwargs: fake_cache)
  194 +
  195 + def _fake_post(url, params, json, timeout, **kwargs):
  196 + assert params["priority"] == 1
  197 + return _FakeResponse([[0.1, 0.2]])
  198 +
  199 + monkeypatch.setattr("embeddings.image_encoder.requests.post", _fake_post)
  200 +
  201 + encoder = CLIPImageEncoder(service_url="http://127.0.0.1:6008")
  202 + out = encoder.encode_batch(["https://example.com/a.jpg"], priority=1)
  203 + assert len(out) == 1
  204 + assert np.allclose(out[0], np.array([0.1, 0.2], dtype=np.float32))
  205 +
  206 +
187 207 def test_query_parser_generates_query_vector_with_encoder():
  208 + encoder = _FakeQueryEncoder()
188 209 parser = QueryParser(
189 210 config=_build_test_config(),
190   - text_encoder=_FakeQueryEncoder(),
  211 + text_encoder=encoder,
191 212 translator=_FakeTranslator(),
192 213 )
193 214  
194 215 parsed = parser.parse("red dress", tenant_id="162", generate_vector=True)
195 216 assert parsed.query_vector is not None
196 217 assert parsed.query_vector.shape == (3,)
  218 + assert encoder.calls
  219 + assert encoder.calls[0]["kwargs"]["priority"] == 1
197 220  
198 221  
199 222 def test_query_parser_skips_query_vector_when_disabled():
... ...
tests/test_embedding_service_limits.py
... ... @@ -69,6 +69,8 @@ def test_health_exposes_limit_stats(monkeypatch):
69 69  
70 70  
71 71 def test_embed_image_rejects_when_image_lane_is_full(monkeypatch):
  72 + # Ensure no cache hit (module-level Redis cache may contain this URL from other tests).
  73 + monkeypatch.setattr(embedding_server, "_image_cache", _FakeCache({}))
72 74 limiter = embedding_server._InflightLimiter("image", 1)
73 75 acquired, _ = limiter.try_acquire()
74 76 assert acquired is True
... ...
tests/test_embedding_service_priority.py 0 → 100644
... ... @@ -0,0 +1,81 @@
  1 +import threading
  2 +
  3 +import embeddings.server as emb_server
  4 +
  5 +
  6 +def test_text_inflight_limiter_priority_bypass():
  7 + limiter = emb_server._InflightLimiter(name="text", limit=1)
  8 +
  9 + accepted, active = limiter.try_acquire()
  10 + assert accepted is True
  11 + assert active == 1
  12 +
  13 + accepted, active = limiter.try_acquire()
  14 + assert accepted is False
  15 + assert active == 1
  16 +
  17 + accepted, active = limiter.try_acquire(bypass_limit=True)
  18 + assert accepted is True
  19 + assert active == 2
  20 +
  21 + snapshot = limiter.snapshot()
  22 + assert snapshot["priority_bypass_total"] == 1
  23 +
  24 + limiter.release(success=True)
  25 + limiter.release(success=True)
  26 +
  27 +
  28 +def test_text_dispatch_prefers_high_priority_queue():
  29 + high_task = emb_server._TextDispatchTask(
  30 + normalized=["online"],
  31 + effective_normalize=True,
  32 + request_id="high",
  33 + priority=1,
  34 + created_at=0.0,
  35 + done=threading.Event(),
  36 + )
  37 + normal_task = emb_server._TextDispatchTask(
  38 + normalized=["offline"],
  39 + effective_normalize=True,
  40 + request_id="normal",
  41 + priority=0,
  42 + created_at=0.0,
  43 + done=threading.Event(),
  44 + )
  45 +
  46 + with emb_server._text_dispatch_cv:
  47 + emb_server._text_dispatch_high_queue.clear()
  48 + emb_server._text_dispatch_normal_queue.clear()
  49 + emb_server._text_dispatch_normal_queue.append(normal_task)
  50 + emb_server._text_dispatch_high_queue.append(high_task)
  51 +
  52 + first = emb_server._pop_text_dispatch_task_locked()
  53 + second = emb_server._pop_text_dispatch_task_locked()
  54 +
  55 + emb_server._text_dispatch_high_queue.clear()
  56 + emb_server._text_dispatch_normal_queue.clear()
  57 +
  58 + assert first is high_task
  59 + assert second is normal_task
  60 +
  61 +
  62 +def test_image_inflight_limiter_priority_bypass():
  63 + limiter = emb_server._InflightLimiter(name="image", limit=1)
  64 +
  65 + accepted, active = limiter.try_acquire()
  66 + assert accepted is True
  67 + assert active == 1
  68 +
  69 + accepted, active = limiter.try_acquire()
  70 + assert accepted is False
  71 + assert active == 1
  72 +
  73 + accepted, active = limiter.try_acquire(bypass_limit=True)
  74 + assert accepted is True
  75 + assert active == 2
  76 +
  77 + snapshot = limiter.snapshot()
  78 + assert snapshot["priority_bypass_total"] == 1
  79 +
  80 + limiter.release(success=True)
  81 + limiter.release(success=True)
... ...
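`test_text_dispatch_prefers_high_priority_queue` above exercises `_pop_text_dispatch_task_locked` draining the high-priority queue before the normal one, but the dispatcher itself is outside this diff. The scheduling rule it implies can be sketched as two FIFO lanes with strict priority between them (the names and state layout here are assumptions based on the test):

```python
from collections import deque
from typing import Deque, Optional

# Assumed dispatcher state: one lane per priority class, each FIFO internally.
high_queue: Deque[str] = deque()
normal_queue: Deque[str] = deque()


def pop_dispatch_task() -> Optional[str]:
    """Strict priority between lanes: drain priority>0 tasks first,
    then fall back to the offline (priority=0) lane; None when idle."""
    if high_queue:
        return high_queue.popleft()
    if normal_queue:
        return normal_queue.popleft()
    return None
```

With an "offline" task enqueued before an "online" one, the online task is still popped first, matching the ordering the test asserts.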