"""Tests for in-flight limiting and text dispatch priority in embeddings.server."""

import threading

import embeddings.server as emb_server


def _assert_limiter_priority_bypass(name):
    """Check that ``bypass_limit=True`` admits a request past a full limiter.

    Builds an ``_InflightLimiter`` with ``limit=1`` and verifies:
    - the first normal acquire succeeds;
    - a second normal acquire is rejected without changing the active count;
    - a bypass acquire succeeds anyway and is counted in the
      ``priority_bypass_total`` snapshot field.

    Both successfully acquired slots are released at the end.
    """
    limiter = emb_server._InflightLimiter(name=name, limit=1)

    accepted, active = limiter.try_acquire()
    assert accepted is True
    assert active == 1

    # At the limit: a normal acquire is refused and the count stays at 1.
    accepted, active = limiter.try_acquire()
    assert accepted is False
    assert active == 1

    # A priority request may exceed the limit, pushing the count to 2.
    accepted, active = limiter.try_acquire(bypass_limit=True)
    assert accepted is True
    assert active == 2

    snapshot = limiter.snapshot()
    assert snapshot["priority_bypass_total"] == 1

    # Release the two slots that were actually acquired.
    limiter.release(success=True)
    limiter.release(success=True)


def test_text_inflight_limiter_priority_bypass():
    """The text limiter lets priority requests bypass a full limit."""
    _assert_limiter_priority_bypass("text")


def test_text_dispatch_prefers_high_priority_queue():
    """A high-priority task is popped before a normal one.

    The normal task is enqueued first, so the order of the pops shows that
    the high-priority queue is drained before the normal queue.
    """
    high_task = emb_server._TextDispatchTask(
        normalized=["online"],
        effective_normalize=True,
        request_id="high",
        priority=1,
        created_at=0.0,
        done=threading.Event(),
    )
    normal_task = emb_server._TextDispatchTask(
        normalized=["offline"],
        effective_normalize=True,
        request_id="normal",
        priority=0,
        created_at=0.0,
        done=threading.Event(),
    )

    high_queue = emb_server._text_dispatch_high_queue
    normal_queue = emb_server._text_dispatch_normal_queue

    # The "_locked" pop helper is called while holding the dispatch condition.
    with emb_server._text_dispatch_cv:
        # Start from empty queues so only this test's tasks are visible.
        high_queue.clear()
        normal_queue.clear()
        try:
            normal_queue.append(normal_task)
            high_queue.append(high_task)
            first = emb_server._pop_text_dispatch_task_locked()
            second = emb_server._pop_text_dispatch_task_locked()
        finally:
            # These queues are shared module state: always empty them, even if
            # a pop raises, so the test's tasks cannot leak into other tests.
            high_queue.clear()
            normal_queue.clear()

    # Assert after releasing the lock so a failure never happens while the
    # shared condition is held.
    assert first is high_task
    assert second is normal_task


def test_image_inflight_limiter_priority_bypass():
    """The image limiter lets priority requests bypass a full limit."""
    _assert_limiter_priority_bypass("image")