Blame view

tests/test_embedding_service_priority.py 2.37 KB
b754fd41   tangwang   图片向量化支持优先级参数
1
2
3
4
  import threading
  
  import embeddings.server as emb_server
  
99b72698   tangwang   测试回归钩子梳理
5
6
7
8
  import pytest
  
  pytestmark = [pytest.mark.embedding, pytest.mark.regression]
  
b754fd41   tangwang   图片向量化支持优先级参数
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
  
  def test_text_inflight_limiter_priority_bypass():
      """Exercise the bypass path of a text ``_InflightLimiter`` with ``limit=1``.

      The test asserts, in order:

      * the first ``try_acquire()`` is accepted and reports 1 active request;
      * a second plain ``try_acquire()`` is rejected and still reports 1;
      * ``try_acquire(bypass_limit=True)`` is accepted and reports 2;
      * ``snapshot()["priority_bypass_total"]`` equals 1 afterwards.
      """
      text_limiter = emb_server._InflightLimiter(name="text", limit=1)

      # The first caller takes the only regular slot.
      ok, in_flight = text_limiter.try_acquire()
      assert ok is True
      assert in_flight == 1

      # At the limit, a regular caller is turned away and the count is unchanged.
      ok, in_flight = text_limiter.try_acquire()
      assert ok is False
      assert in_flight == 1

      # A bypassing caller is admitted even though the limit is already reached.
      ok, in_flight = text_limiter.try_acquire(bypass_limit=True)
      assert ok is True
      assert in_flight == 2

      stats = text_limiter.snapshot()
      assert stats["priority_bypass_total"] == 1

      # Hand back both acquired slots (the regular one and the bypass one).
      for _ in range(2):
          text_limiter.release(success=True)
  
  
  def test_text_dispatch_prefers_high_priority_queue():
      """Check that a queued high-priority text task is popped before a normal one.

      Two ``_TextDispatchTask`` objects are built (``priority=1`` and
      ``priority=0``). While holding ``emb_server._text_dispatch_cv`` the test
      empties both module-level queues, appends the normal task first and the
      high-priority task second, then calls ``_pop_text_dispatch_task_locked()``
      twice. The first pop must return the high-priority task and the second
      the normal one.

      The queues are module-level state, so they are cleared again in a
      ``finally`` block: if a pop raises, the fake tasks must not stay queued
      and leak into other tests.
      """
      high_task = emb_server._TextDispatchTask(
          normalized=["online"],
          effective_normalize=True,
          request_id="high",
          user_id="u-high",
          priority=1,
          created_at=0.0,
          done=threading.Event(),
      )
      normal_task = emb_server._TextDispatchTask(
          normalized=["offline"],
          effective_normalize=True,
          request_id="normal",
          user_id="u-normal",
          priority=0,
          created_at=0.0,
          done=threading.Event(),
      )

      with emb_server._text_dispatch_cv:
          # Start from empty queues so only the two tasks above are present.
          emb_server._text_dispatch_high_queue.clear()
          emb_server._text_dispatch_normal_queue.clear()
          try:
              # Enqueue the normal task first so arrival order alone would
              # favour it; the pop order asserted below must come from priority.
              emb_server._text_dispatch_normal_queue.append(normal_task)
              emb_server._text_dispatch_high_queue.append(high_task)

              first = emb_server._pop_text_dispatch_task_locked()
              second = emb_server._pop_text_dispatch_task_locked()
          finally:
              # Always leave the shared queues empty, even when a pop fails.
              emb_server._text_dispatch_high_queue.clear()
              emb_server._text_dispatch_normal_queue.clear()

      assert first is high_task
      assert second is normal_task
  
  
  def test_image_inflight_limiter_priority_bypass():
      """Exercise the bypass path of an image ``_InflightLimiter`` with ``limit=1``.

      The test asserts, in order:

      * the first ``try_acquire()`` is accepted and reports 1 active request;
      * a second plain ``try_acquire()`` is rejected and still reports 1;
      * ``try_acquire(bypass_limit=True)`` is accepted and reports 2;
      * ``snapshot()["priority_bypass_total"]`` equals 1 afterwards.
      """
      image_limiter = emb_server._InflightLimiter(name="image", limit=1)

      # The first caller takes the only regular slot.
      ok, in_flight = image_limiter.try_acquire()
      assert ok is True
      assert in_flight == 1

      # At the limit, a regular caller is turned away and the count is unchanged.
      ok, in_flight = image_limiter.try_acquire()
      assert ok is False
      assert in_flight == 1

      # A bypassing caller is admitted even though the limit is already reached.
      ok, in_flight = image_limiter.try_acquire(bypass_limit=True)
      assert ok is True
      assert in_flight == 2

      stats = image_limiter.snapshot()
      assert stats["priority_bypass_total"] == 1

      # Hand back both acquired slots (the regular one and the bypass one).
      for _ in range(2):
          image_limiter.release(success=True)