Blame view

tests/test_embedding_pipeline.py 5.12 KB
950a640e   tangwang   embeddings
1
2
3
4
  import pickle
  from typing import Any, Dict, List, Optional
  
  import numpy as np
ed948666   tangwang   tidy
5
  import pytest
950a640e   tangwang   embeddings
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
  
  from config import (
      FunctionScoreConfig,
      IndexConfig,
      QueryConfig,
      RankingConfig,
      RerankConfig,
      SPUConfig,
      SearchConfig,
  )
  from embeddings.text_encoder import TextEmbeddingEncoder
  from query import QueryParser
  
  
  class _FakeRedis:
      def __init__(self):
          self.store: Dict[str, bytes] = {}
  
      def ping(self):
          return True
  
      def get(self, key: str):
          return self.store.get(key)
  
      def setex(self, key: str, _expire, value: bytes):
          self.store[key] = value
          return True
  
      def expire(self, key: str, _expire):
          return key in self.store
  
      def delete(self, key: str):
          self.store.pop(key, None)
          return True
  
  
  class _FakeResponse:
      def __init__(self, payload: List[Optional[List[float]]]):
          self._payload = payload
  
      def raise_for_status(self):
          return None
  
      def json(self):
          return self._payload
  
  
  class _FakeTranslator:
      def translate(
          self,
          text: str,
          target_lang: str,
          source_lang: Optional[str] = None,
          prompt: Optional[str] = None,
      ) -> str:
          return f"{text}-{target_lang}"
  
  
  class _FakeQueryEncoder:
      def encode(self, sentences, **kwargs):
          if isinstance(sentences, str):
              sentences = [sentences]
          return np.array([np.array([0.11, 0.22, 0.33], dtype=np.float32) for _ in sentences], dtype=object)
  
  
  def _build_test_config() -> SearchConfig:
      return SearchConfig(
          field_boosts={"title.en": 3.0},
          indexes=[IndexConfig(name="default", label="default", fields=["title.en"], boost=1.0)],
          query_config=QueryConfig(
              supported_languages=["en", "zh"],
              default_language="en",
              enable_text_embedding=True,
              enable_query_rewrite=False,
950a640e   tangwang   embeddings
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
              rewrite_dictionary={},
              translation_prompts={"query_zh": "e-commerce domain", "query_en": "e-commerce domain"},
              text_embedding_field="title_embedding",
              image_embedding_field=None,
          ),
          ranking=RankingConfig(expression="bm25()", description="test"),
          function_score=FunctionScoreConfig(),
          rerank=RerankConfig(),
          spu_config=SPUConfig(enabled=True, spu_field="spu_id", inner_hits_size=3),
          es_index_name="test_products",
          tenant_config={},
          es_settings={},
          services={},
      )
  
  
  def test_text_embedding_encoder_response_alignment(monkeypatch):
      fake_redis = _FakeRedis()
      monkeypatch.setattr("embeddings.text_encoder.redis.Redis", lambda **kwargs: fake_redis)
  
      def _fake_post(url, json, timeout):
          assert url.endswith("/embed/text")
          assert json == ["hello", "world"]
ed948666   tangwang   tidy
103
          return _FakeResponse([[0.1, 0.2], [0.3, 0.4]])
950a640e   tangwang   embeddings
104
105
106
107
108
109
110
111
112
  
      monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post)
  
      encoder = TextEmbeddingEncoder(service_url="http://127.0.0.1:6005")
      out = encoder.encode(["hello", "world"])
  
      assert len(out) == 2
      assert isinstance(out[0], np.ndarray)
      assert out[0].shape == (2,)
ed948666   tangwang   tidy
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
      assert isinstance(out[1], np.ndarray)
      assert out[1].shape == (2,)
  
  
  def test_text_embedding_encoder_raises_on_missing_vector(monkeypatch):
      fake_redis = _FakeRedis()
      monkeypatch.setattr("embeddings.text_encoder.redis.Redis", lambda **kwargs: fake_redis)
  
      def _fake_post(url, json, timeout):
          return _FakeResponse([[0.1, 0.2], None])
  
      monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post)
  
      encoder = TextEmbeddingEncoder(service_url="http://127.0.0.1:6005")
      with pytest.raises(ValueError):
          encoder.encode(["hello", "world"])
950a640e   tangwang   embeddings
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
  
  
  def test_text_embedding_encoder_cache_hit(monkeypatch):
      fake_redis = _FakeRedis()
      cached = np.array([0.9, 0.8], dtype=np.float32)
      fake_redis.store["embedding:generic:cached-text"] = pickle.dumps(cached)
      monkeypatch.setattr("embeddings.text_encoder.redis.Redis", lambda **kwargs: fake_redis)
  
      calls = {"count": 0}
  
      def _fake_post(url, json, timeout):
          calls["count"] += 1
          return _FakeResponse([[0.3, 0.4]])
  
      monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post)
  
      encoder = TextEmbeddingEncoder(service_url="http://127.0.0.1:6005")
      out = encoder.encode(["cached-text", "new-text"])
  
      assert calls["count"] == 1
      assert np.allclose(out[0], cached)
      assert np.allclose(out[1], np.array([0.3, 0.4], dtype=np.float32))
  
  
  def test_query_parser_generates_query_vector_with_encoder():
      parser = QueryParser(
          config=_build_test_config(),
          text_encoder=_FakeQueryEncoder(),
          translator=_FakeTranslator(),
      )
  
      parsed = parser.parse("red dress", tenant_id="162", generate_vector=True)
      assert parsed.query_vector is not None
      assert parsed.query_vector.shape == (3,)
  
  
  def test_query_parser_skips_query_vector_when_disabled():
      parser = QueryParser(
          config=_build_test_config(),
          text_encoder=_FakeQueryEncoder(),
          translator=_FakeTranslator(),
      )
  
      parsed = parser.parse("red dress", tenant_id="162", generate_vector=False)
      assert parsed.query_vector is None