# test_embedding_service_priority.py (2.24 KB)
import threading

import embeddings.server as emb_server


def test_text_inflight_limiter_priority_bypass():
    """A saturated text limiter refuses normal acquires but admits bypasses.

    With ``limit=1`` the first acquire takes the only slot and a second plain
    acquire is refused without changing the in-flight count. An acquire with
    ``bypass_limit=True`` is admitted anyway, pushing the count to 2, and is
    recorded in the ``priority_bypass_total`` snapshot counter.
    """
    text_limiter = emb_server._InflightLimiter(name="text", limit=1)

    # (try_acquire kwargs, expected accepted flag, expected in-flight count)
    steps = (
        ({}, True, 1),
        ({}, False, 1),
        ({"bypass_limit": True}, True, 2),
    )
    for acquire_kwargs, want_accepted, want_active in steps:
        got_accepted, got_active = text_limiter.try_acquire(**acquire_kwargs)
        assert got_accepted is want_accepted
        assert got_active == want_active

    assert text_limiter.snapshot()["priority_bypass_total"] == 1

    # Return the two slots that were actually granted (normal + bypass).
    for _ in range(2):
        text_limiter.release(success=True)


def test_text_dispatch_prefers_high_priority_queue():
    """High-priority text tasks are popped before normal-priority ones.

    The normal task is appended before the high-priority task, so the high
    task coming out first shows that queue priority, not insertion order,
    decides dispatch order.

    The dispatch queues are module-level shared state. Their prior contents
    are saved before the test and restored in a ``finally`` block. This keeps
    the test from discarding tasks that were already queued, and from leaking
    its own tasks into other tests when a pop or an assertion fails.
    """
    high_task = emb_server._TextDispatchTask(
        normalized=["online"],
        effective_normalize=True,
        request_id="high",
        priority=1,
        created_at=0.0,
        done=threading.Event(),
    )
    normal_task = emb_server._TextDispatchTask(
        normalized=["offline"],
        effective_normalize=True,
        request_id="normal",
        priority=0,
        created_at=0.0,
        done=threading.Event(),
    )

    # Every queue access and the ``_locked`` pop helper happen while holding
    # ``_text_dispatch_cv``.
    with emb_server._text_dispatch_cv:
        saved_high = list(emb_server._text_dispatch_high_queue)
        saved_normal = list(emb_server._text_dispatch_normal_queue)
        try:
            emb_server._text_dispatch_high_queue.clear()
            emb_server._text_dispatch_normal_queue.clear()
            emb_server._text_dispatch_normal_queue.append(normal_task)
            emb_server._text_dispatch_high_queue.append(high_task)

            first = emb_server._pop_text_dispatch_task_locked()
            second = emb_server._pop_text_dispatch_task_locked()
        finally:
            # Restore the pre-test queue contents even if a pop raised.
            emb_server._text_dispatch_high_queue.clear()
            emb_server._text_dispatch_high_queue.extend(saved_high)
            emb_server._text_dispatch_normal_queue.clear()
            emb_server._text_dispatch_normal_queue.extend(saved_normal)

    assert first is high_task
    assert second is normal_task


def test_image_inflight_limiter_priority_bypass():
    """A saturated image limiter refuses normal acquires but admits bypasses.

    With ``limit=1`` the first acquire takes the only slot and a second plain
    acquire is refused without changing the in-flight count. An acquire with
    ``bypass_limit=True`` is admitted anyway, pushing the count to 2, and is
    recorded in the ``priority_bypass_total`` snapshot counter.
    """
    image_limiter = emb_server._InflightLimiter(name="image", limit=1)

    # (try_acquire kwargs, expected accepted flag, expected in-flight count)
    steps = (
        ({}, True, 1),
        ({}, False, 1),
        ({"bypass_limit": True}, True, 2),
    )
    for acquire_kwargs, want_accepted, want_active in steps:
        got_accepted, got_active = image_limiter.try_acquire(**acquire_kwargs)
        assert got_accepted is want_accepted
        assert got_active == want_active

    assert image_limiter.snapshot()["priority_bypass_total"] == 1

    # Return the two slots that were actually granted (normal + bypass).
    for _ in range(2):
        image_limiter.release(success=True)