7299bae6
tangwang
tests
|
1
2
|
from __future__ import annotations
|
e7a2c0b7
tangwang
img encode
|
3
4
|
import json
from pathlib import Path
|
7299bae6
tangwang
tests
|
5
6
7
8
|
from types import SimpleNamespace
from typing import Any, Dict, List
import numpy as np
|
2e3670ab
tangwang
index services
|
9
|
import pandas as pd
|
7299bae6
tangwang
tests
|
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
import pytest
from fastapi.testclient import TestClient
class _FakeSearcher:
    """Searcher stand-in that always answers with one fixed product hit."""

    def search(self, **kwargs):
        """Return a canned result object; only ``query`` is read from kwargs."""
        hit = {
            "spu_id": "spu-1",
            "title": "测试商品",
            "price": 99.0,
            "currency": "USD",
            "in_stock": True,
            "skus": [],
            "relevance_score": 1.2,
        }
        fields = {
            "results": [hit],
            "total": 1,
            "max_score": 1.2,
            "took_ms": 8,
            "facets": [],
            # Echo the query back, falling back to "" when none was passed.
            "query_info": {"normalized_query": kwargs.get("query", "")},
            "suggestions": [],
            "related_searches": [],
            "debug_info": None,
        }
        return SimpleNamespace(**fields)

    def search_by_image(self, **kwargs):
        """Delegate to :meth:`search` with the same keyword arguments."""
        return self.search(**kwargs)
class _FakeSuggestionService:
    """Suggestion-service stand-in that returns one fixed suggestion."""

    def search(self, **kwargs):
        """Echo ``query`` (required) and ``language`` (default ``"en"``) back."""
        lang = kwargs.get("language", "en")
        suggestion = {"text": "iphone 15", "score": 1.0}
        return {
            "query": kwargs["query"],
            "language": lang,
            "resolved_language": lang,
            "suggestions": [suggestion],
            "took_ms": 3,
        }
@pytest.fixture
def search_client(monkeypatch):
    """Yield a TestClient for ``api.app`` with its service accessors stubbed.

    ``init_service`` becomes a no-op, while ``get_searcher`` and
    ``get_suggestion_service`` hand out the fakes defined in this module.
    """
    import api.app as search_app

    stubs = {
        "init_service": lambda es_host="": None,
        "get_searcher": lambda: _FakeSearcher(),
        "get_suggestion_service": lambda: _FakeSuggestionService(),
    }
    for attr_name, stub in stubs.items():
        monkeypatch.setattr(search_app, attr_name, stub)

    with TestClient(search_app.app) as client:
        yield client
def test_search_api_contract(search_client: TestClient):
    """Assert POST /search/ answers 200 with total 1 and first hit ``spu-1``."""
    tenant_headers = {"X-Tenant-ID": "162"}
    payload = {"query": "toy", "size": 5}

    response = search_client.post("/search/", headers=tenant_headers, json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["total"] == 1
    first_hit = body["results"][0]
    assert first_hit["spu_id"] == "spu-1"
def test_image_search_api_contract(search_client: TestClient):
    """Assert POST /search/image answers 200 and the first hit is ``spu-1``."""
    payload = {"image_url": "https://example.com/a.jpg", "size": 3}

    response = search_client.post(
        "/search/image",
        json=payload,
        headers={"X-Tenant-ID": "162"},
    )

    assert response.status_code == 200
    first_hit = response.json()["results"][0]
    assert first_hit["spu_id"] == "spu-1"
def test_suggestion_api_contract(search_client: TestClient):
    """Assert GET /search/suggestions echoes the query and returns one suggestion."""
    url = "/search/suggestions?q=iph&size=5&language=en"

    response = search_client.get(url, headers={"X-Tenant-ID": "162"})

    assert response.status_code == 200
    body = response.json()
    assert body["query"] == "iph"
    assert len(body["suggestions"]) == 1
|
26b910bd
tangwang
refactor service ...
|
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
def test_instant_search_not_implemented(search_client: TestClient):
    """Assert GET /search/instant answers 501."""
    url = "/search/instant?q=iph&size=5"
    status = search_client.get(url).status_code
    assert status == 501
def test_admin_stats_contract(search_client: TestClient, monkeypatch):
    """Assert GET /admin/stats reports tenant, index name, doc count and size.

    ``get_es_client`` is replaced with a fake whose index stats report a
    2 MiB store and whose ``count`` returns 123 documents.
    """
    import api.app as search_app

    index_suffix = "search_products_tenant_162"

    class _FakeIndices:
        """Fake ``indices`` namespace: only the tenant-162 index exists."""

        @staticmethod
        def exists(index: str) -> bool:
            return index.endswith(index_suffix)

        @staticmethod
        def stats(index: str):
            store = {"size_in_bytes": 2 * 1024 * 1024}
            return {"indices": {index: {"total": {"store": store}}}}

    class _FakeClient:
        """Fake low-level client exposing ``indices`` and ``count``."""

        indices = _FakeIndices()

        @staticmethod
        def count(index: str):
            # Guard that the route asks about the tenant-specific index.
            assert index.endswith(index_suffix)
            return {"count": 123}

    def _fake_es_client():
        return SimpleNamespace(client=_FakeClient())

    monkeypatch.setattr(search_app, "get_es_client", _fake_es_client)

    response = search_client.get("/admin/stats", headers={"X-Tenant-ID": "162"})

    assert response.status_code == 200
    body = response.json()
    assert body["tenant_id"] == "162"
    assert body["index_name"].endswith(index_suffix)
    assert body["document_count"] == 123
    assert body["size_mb"] == 2.0
|
7299bae6
tangwang
tests
|
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
class _FakeBulkService:
    """Bulk-indexing stand-in that reports success without doing any work."""

    def bulk_index(self, tenant_id: str, recreate_index: bool, batch_size: int):
        """Echo the arguments back together with ``success=True``."""
        summary = dict(
            tenant_id=tenant_id,
            recreate_index=recreate_index,
            batch_size=batch_size,
        )
        summary["success"] = True
        return summary
class _FakeTransformer:
    """Transformer stand-in that builds a minimal document from an SPU row."""

    def transform_spu_to_doc(self, tenant_id: str, spu_row, skus, options):
        """Return tenant id, stringified ``id`` and a zh title; skus/options are ignored."""
        raw_id = spu_row.get("id", "0")
        raw_title = spu_row.get("title", "")
        return {
            "tenant_id": tenant_id,
            "spu_id": str(raw_id),
            "title": {"zh": str(raw_title)},
        }
|
2e3670ab
tangwang
index services
|
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
class _FakeDbConnection:
    """Do-nothing connection usable as ``with engine.connect() as c: c.execute(...)``."""

    def __enter__(self):
        """Enter the context and hand back this same object."""
        return self

    def __exit__(self, *args):
        """Leave the context; returns None so exceptions are not suppressed."""
        return None

    def execute(self, stmt):
        """Accept any statement and ignore it."""
        return None
class _FakeDbEngine:
    """Engine stand-in whose ``connect`` hands out fake connections."""

    def connect(self):
        """Return a fresh ``_FakeDbConnection``."""
        connection = _FakeDbConnection()
        return connection
|
7299bae6
tangwang
tests
|
181
|
class _FakeIncrementalService:
    """In-memory replacement for the incremental indexing service."""

    def __init__(self):
        """Attach a fake DB engine and an empty category-name mapping."""
        self.category_id_to_name = {}
        self.db_engine = _FakeDbEngine()

    def index_spus_to_es(self, es_client, tenant_id: str, spu_ids: List[str], delete_spu_ids=None):
        """Report every requested SPU as indexed and every delete id as deleted."""
        indexed = [{"spu_id": s, "status": "indexed"} for s in spu_ids]
        deleted = []
        if delete_spu_ids:
            deleted = [{"spu_id": s, "status": "deleted"} for s in delete_spu_ids]
        handled = len(indexed) + len(deleted)
        return {
            "tenant_id": tenant_id,
            "spu_ids": indexed,
            "delete_spu_ids": deleted,
            "total": handled,
            "success_count": handled,
            "failed_count": 0,
        }

    def get_spu_document(self, tenant_id: str, spu_id: str):
        """Return a fixed document carrying the given tenant and SPU ids."""
        document = {"tenant_id": tenant_id, "spu_id": spu_id}
        document["title"] = {"zh": "Fake doc"}
        return document

    def _get_transformer_bundle(self, tenant_id: str):
        """Return a ``(_FakeTransformer(), None, False)`` triple."""
        transformer = _FakeTransformer()
        return transformer, None, False

    def _load_spus_for_spu_ids(self, tenant_id: str, spu_ids: List[str], include_deleted: bool = False):
        """Build one fake SPU row per id (ids are cast to int); empty frame if none."""
        if spu_ids:
            rows = [{"id": int(s), "title": "Fake", "tenant_id": tenant_id} for s in spu_ids]
            return pd.DataFrame(rows)
        return pd.DataFrame()

    def _load_skus_for_spu_ids(self, tenant_id: str, spu_ids: List[str]):
        """Always return an empty frame."""
        return pd.DataFrame()

    def _load_options_for_spu_ids(self, tenant_id: str, spu_ids: List[str]):
        """Always return an empty frame."""
        return pd.DataFrame()
|
7299bae6
tangwang
tests
|
222
223
224
225
226
227
|
@pytest.fixture
def indexer_client(monkeypatch):
    """Yield a TestClient for ``api.indexer_app`` with fake indexing services.

    The app's startup handlers are emptied for the duration of the test and
    put back on teardown, so the shared app object is not left permanently
    altered for tests that run later. ``init_indexer_service`` becomes a
    no-op and the route-level service/ES accessors return fakes.
    """
    import api.indexer_app as indexer_app
    import api.routes.indexer as indexer_routes

    startup_hooks = indexer_app.app.router.on_startup
    saved_hooks = list(startup_hooks)
    startup_hooks.clear()

    monkeypatch.setattr(indexer_app, "init_indexer_service", lambda es_host="": None)
    monkeypatch.setattr(indexer_routes, "get_bulk_indexing_service", lambda: _FakeBulkService())
    monkeypatch.setattr(indexer_routes, "get_incremental_service", lambda: _FakeIncrementalService())
    monkeypatch.setattr(indexer_routes, "get_es_client", lambda: object())

    try:
        with TestClient(indexer_app.app) as client:
            yield client
    finally:
        # Restore in place so the router keeps using the same list object.
        startup_hooks[:] = saved_hooks
def test_indexer_reindex_contract(indexer_client: TestClient):
    """Assert POST /indexer/reindex answers 200 with ``success`` set to True."""
    payload = {"tenant_id": "162", "batch_size": 100}

    response = indexer_client.post("/indexer/reindex", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["success"] is True
def test_indexer_incremental_contract(indexer_client: TestClient):
    """Assert POST /indexer/index with two spu_ids reports two successes."""
    payload = {"tenant_id": "162", "spu_ids": ["1001", "1002"]}

    response = indexer_client.post("/indexer/index", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["success_count"] == 2
def test_indexer_build_docs_contract(indexer_client: TestClient):
    """Assert POST /indexer/build-docs turns one item into one doc with spu_id "1"."""
    item = {"spu": {"id": 1, "title": "T-shirt"}, "skus": [], "options": []}
    payload = {"tenant_id": "162", "items": [item]}

    response = indexer_client.post("/indexer/build-docs", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["success_count"] == 1
    first_doc = body["docs"][0]
    assert first_doc["spu_id"] == "1"
|
e7a2c0b7
tangwang
img encode
|
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
|
def test_indexer_build_docs_show_request_response(indexer_client: TestClient):
    """
    Call the build-docs endpoint (SPU details in, ES docs out) and print the
    complete request and response, so the fields of the returned ES document
    can be checked by eye.

    When ``mappings/search_products.json`` exists two directories above this
    file, the doc's top-level keys are also compared with the mapping's
    top-level properties and the differences are printed (not asserted).

    Run: pytest tests/ci/test_service_api_contracts.py::test_indexer_build_docs_show_request_response -v -s
    """
    banner = "=" * 60
    request_body = {
        "tenant_id": "162",
        "items": [
            {
                "spu": {"id": 1, "title": "T-shirt", "brief": "A simple T-shirt"},
                "skus": [],
                "options": [],
            }
        ],
    }
    print("\n" + banner)
    print("【请求】POST /indexer/build-docs")
    print(banner)
    print(json.dumps(request_body, ensure_ascii=False, indent=2))

    response = indexer_client.post("/indexer/build-docs", json=request_body)
    data = response.json()

    print("\n" + banner)
    print("【响应】status_code =", response.status_code)
    print(banner)
    # default=str keeps the dump from failing on values json cannot encode.
    print(json.dumps(data, ensure_ascii=False, indent=2, default=str))

    if data.get("docs"):
        doc = data["docs"][0]
        doc_keys = sorted(doc.keys())
        print("\n" + banner)
        print("【返回 doc 顶层字段】共 {} 个".format(len(doc_keys)))
        print(banner)
        for k in doc_keys:
            print(" ", k)

        # Compare against the top-level fields of the ES mapping.
        mapping_path = Path(__file__).resolve().parents[2] / "mappings" / "search_products.json"
        if mapping_path.exists():
            # Decode explicitly as UTF-8 so the result does not depend on the
            # platform's default locale encoding.
            with open(mapping_path, encoding="utf-8") as f:
                mapping = json.load(f)
            expected_top_level = set(mapping.get("mappings", {}).get("properties", {}).keys())
            returned = set(doc_keys)
            missing = expected_top_level - returned
            extra = returned - expected_top_level
            print("\n" + banner)
            print("【与 mappings/search_products.json 对比】")
            print(banner)
            print(" mapping 中应有、当前 doc 未返回:", sorted(missing) if missing else "(无)")
            print(" 当前 doc 多出:", sorted(extra) if extra else "(无)")

    assert response.status_code == 200
    assert data["success_count"] == 1
    assert data["docs"][0]["spu_id"] == "1"
|
2e3670ab
tangwang
index services
|
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
|
def test_indexer_build_docs_from_db_contract(indexer_client: TestClient):
    """Assert POST /indexer/build-docs-from-db returns one doc per requested spu_id."""
    payload = {"tenant_id": "162", "spu_ids": ["1001", "1002"]}

    response = indexer_client.post("/indexer/build-docs-from-db", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["tenant_id"] == "162"
    assert "docs" in body
    assert body["success_count"] == 2
    docs = body["docs"]
    assert len(docs) == 2
    assert docs[0]["spu_id"] == "1001"
def test_indexer_documents_contract(indexer_client: TestClient):
    """Assert POST /indexer/documents returns success/failed lists with embedded documents."""
    payload = {"tenant_id": "162", "spu_ids": ["1001", "1002"]}

    response = indexer_client.post("/indexer/documents", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert "success" in body and "failed" in body
    assert body["total"] == 2
    assert body["success_count"] == 2
    assert body["failed_count"] == 0
    succeeded = body["success"]
    assert len(succeeded) == 2
    first = succeeded[0]
    assert first["spu_id"] == "1001"
    assert "document" in first
    assert first["document"]["title"]["zh"] == "Fake doc"
def test_indexer_health_contract(indexer_client: TestClient):
    """Assert GET /indexer/health returns a known status plus database or message info."""
    response = indexer_client.get("/indexer/health")

    assert response.status_code == 200
    body = response.json()
    assert "status" in body
    allowed_statuses = ("available", "unavailable", "error")
    assert body["status"] in allowed_statuses
    assert "database" in body or "message" in body
    # preloaded_data is optional; when present it must list category mappings.
    if "preloaded_data" in body:
        assert "category_mappings" in body["preloaded_data"]
def test_indexer_incremental_with_delete_spu_ids(indexer_client: TestClient):
    """Assert POST /indexer/index counts explicit deletes alongside indexed ids."""
    payload = {
        "tenant_id": "162",
        "spu_ids": ["1001"],
        "delete_spu_ids": ["2001", "2002"],
    }

    response = indexer_client.post("/indexer/index", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["success_count"] == 3
    assert len(body["spu_ids"]) == 1
    deleted = body["delete_spu_ids"]
    assert len(deleted) == 2
    assert deleted[0]["status"] == "deleted"
def test_indexer_index_validation_both_empty(indexer_client: TestClient):
    """Assert POST /indexer/index answers 400 when both id lists are empty."""
    payload = {"tenant_id": "162", "spu_ids": [], "delete_spu_ids": []}
    status = indexer_client.post("/indexer/index", json=payload).status_code
    assert status == 400
def test_indexer_index_validation_max_spu_ids(indexer_client: TestClient):
    """Assert POST /indexer/index answers 400 when given 101 spu_ids."""
    too_many = list(map(str, range(101)))
    payload = {"tenant_id": "162", "spu_ids": too_many, "delete_spu_ids": []}
    status = indexer_client.post("/indexer/index", json=payload).status_code
    assert status == 400
def test_indexer_build_docs_validation_empty_items(indexer_client: TestClient):
    """Assert POST /indexer/build-docs answers 400 when ``items`` is empty."""
    payload = {"tenant_id": "162", "items": []}
    status = indexer_client.post("/indexer/build-docs", json=payload).status_code
    assert status == 400
def test_indexer_documents_validation_empty_spu_ids(indexer_client: TestClient):
    """Assert POST /indexer/documents answers 400 when ``spu_ids`` is empty."""
    payload = {"tenant_id": "162", "spu_ids": []}
    status = indexer_client.post("/indexer/documents", json=payload).status_code
    assert status == 400
def test_indexer_build_docs_from_db_validation_empty_spu_ids(indexer_client: TestClient):
    """Assert POST /indexer/build-docs-from-db answers 400 when ``spu_ids`` is empty."""
    payload = {"tenant_id": "162", "spu_ids": []}
    status = indexer_client.post("/indexer/build-docs-from-db", json=payload).status_code
    assert status == 400
def test_indexer_build_docs_validation_max_items(indexer_client: TestClient):
    """Assert POST /indexer/build-docs answers 400 when given 201 items."""
    too_many_items = [
        {"spu": {"id": i, "title": "x"}, "skus": [], "options": []}
        for i in range(201)
    ]
    payload = {"tenant_id": "162", "items": too_many_items}
    status = indexer_client.post("/indexer/build-docs", json=payload).status_code
    assert status == 400
def test_indexer_build_docs_from_db_validation_max_spu_ids(indexer_client: TestClient):
    """Assert POST /indexer/build-docs-from-db answers 400 when given 201 spu_ids."""
    too_many = list(map(str, range(201)))
    payload = {"tenant_id": "162", "spu_ids": too_many}
    status = indexer_client.post("/indexer/build-docs-from-db", json=payload).status_code
    assert status == 400
def test_indexer_documents_validation_max_spu_ids(indexer_client: TestClient):
    """Assert POST /indexer/documents answers 400 when given 101 spu_ids."""
    too_many = list(map(str, range(101)))
    payload = {"tenant_id": "162", "spu_ids": too_many}
    status = indexer_client.post("/indexer/documents", json=payload).status_code
    assert status == 400
def test_indexer_index_validation_max_delete_spu_ids(indexer_client: TestClient):
    """Assert POST /indexer/index answers 400 when given 101 delete_spu_ids."""
    too_many = list(map(str, range(101)))
    payload = {"tenant_id": "162", "spu_ids": [], "delete_spu_ids": too_many}
    status = indexer_client.post("/indexer/index", json=payload).status_code
    assert status == 400
|
7299bae6
tangwang
tests
|
476
|
class _FakeTextModel:
    """Text-embedding stand-in that emits the same 3-d vector for every input."""

    def encode_batch(self, texts, batch_size=32, device="cpu", normalize_embeddings=True):
        """Return one float32 ``[0.1, 0.2, 0.3]`` array per text; options are ignored."""
        vectors = []
        for _ in texts:
            vectors.append(np.array([0.1, 0.2, 0.3], dtype=np.float32))
        return vectors
class _FakeImageModel:
    """Image-embedding stand-in that emits the same 3-d vector for every URL."""

    def encode_image_urls(self, urls, batch_size=8):
        """Return one float32 ``[0.3, 0.2, 0.1]`` array per URL; ``batch_size`` is ignored."""
        vectors = []
        for _ in urls:
            vectors.append(np.array([0.3, 0.2, 0.1], dtype=np.float32))
        return vectors
@pytest.fixture
def embedding_module():
    """Yield ``embeddings.server`` with fake text and image models installed.

    The startup handlers are emptied and ``_text_model`` / ``_image_model``
    are swapped for fakes only while the test runs. On teardown everything is
    put back, so the fakes do not leak into tests that run afterwards.
    """
    import embeddings.server as emb_server

    missing = object()
    patched_names = ("_text_model", "_image_model")
    saved_attrs = {name: getattr(emb_server, name, missing) for name in patched_names}
    startup_hooks = emb_server.app.router.on_startup
    saved_hooks = list(startup_hooks)

    startup_hooks.clear()
    emb_server._text_model = _FakeTextModel()
    emb_server._image_model = _FakeImageModel()
    try:
        yield emb_server
    finally:
        # Restore in place so the router keeps using the same list object.
        startup_hooks[:] = saved_hooks
        for name, value in saved_attrs.items():
            if value is missing:
                # The attribute did not exist before the fixture ran.
                delattr(emb_server, name)
            else:
                setattr(emb_server, name, value)
|
7299bae6
tangwang
tests
|
494
495
|
|
950a640e
tangwang
embeddings
|
496
497
|
def test_embedding_text_contract(embedding_module):
    """Assert ``embed_text`` on two strings yields two vectors of length 3."""
    vectors = embedding_module.embed_text(["hello", "world"])
    assert len(vectors) == 2
    first_vector = vectors[0]
    assert len(first_vector) == 3
|
950a640e
tangwang
embeddings
|
502
503
504
|
def test_embedding_image_contract(embedding_module):
    """Assert ``embed_image`` on one URL yields a first vector of length 3."""
    urls = ["https://example.com/a.jpg"]
    vectors = embedding_module.embed_image(urls)
    first_vector = vectors[0]
    assert len(first_vector) == 3
|
7299bae6
tangwang
tests
|
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
|
class _FakeTranslator:
    """Translator stand-in that tags the input text with the target language."""

    # Fixed attributes carried by the fake.
    model = "qwen"
    use_cache = True

    def translate(self, text: str, target_lang: str, source_lang: str | None = None, prompt: str | None = None):
        """Return ``"<text>-<target_lang>"``; ``source_lang`` and ``prompt`` are ignored."""
        return "{}-{}".format(text, target_lang)
@pytest.fixture
def translator_client(monkeypatch):
    """Yield a TestClient for ``api.translator_app`` backed by a fake translator.

    The app's startup handlers are emptied for the duration of the test and
    put back on teardown, so the shared app object is not left permanently
    altered for tests that run later.
    """
    import api.translator_app as translator_app

    startup_hooks = translator_app.app.router.on_startup
    saved_hooks = list(startup_hooks)
    startup_hooks.clear()

    monkeypatch.setattr(translator_app, "get_translator", lambda model="qwen": _FakeTranslator())

    try:
        with TestClient(translator_app.app) as client:
            yield client
    finally:
        # Restore in place so the router keeps using the same list object.
        startup_hooks[:] = saved_hooks
def test_translator_api_contract(translator_client: TestClient):
    """Assert POST /translate answers 200 with the text tagged by target language."""
    payload = {"text": "商品名称", "target_lang": "en", "source_lang": "zh"}

    response = translator_client.post("/translate", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["translated_text"] == "商品名称-en"
def test_translator_health_contract(translator_client: TestClient):
    """Assert GET /health on the translator app answers 200 with status "healthy"."""
    response = translator_client.get("/health")
    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "healthy"
class _FakeReranker:
    """Reranker stand-in that scores documents by their 1-based position."""

    _model_name = "fake-reranker"

    def score_with_meta(self, query: str, docs: List[str], normalize: bool = True):
        """Return ``(scores, meta)``; ``query`` and ``normalize`` are ignored.

        ``scores`` is ``[1.0, 2.0, ...]`` in input order and ``meta`` counts
        the total and the distinct documents.
        """
        scores = [float(position) for position, _ in enumerate(docs, start=1)]
        meta: Dict[str, Any] = {
            "input_docs": len(docs),
            "unique_docs": len(set(docs)),
        }
        return scores, meta
@pytest.fixture
def reranker_client():
    """Yield a TestClient for ``reranker.server`` with a fake reranker installed.

    The startup handlers are emptied and ``_reranker`` / ``_backend_name`` are
    overwritten only while the test runs. On teardown everything is put back,
    so the fake does not leak into tests that run afterwards.
    """
    import reranker.server as reranker_server

    missing = object()
    patched_names = ("_reranker", "_backend_name")
    saved_attrs = {name: getattr(reranker_server, name, missing) for name in patched_names}
    startup_hooks = reranker_server.app.router.on_startup
    saved_hooks = list(startup_hooks)

    startup_hooks.clear()
    reranker_server._reranker = _FakeReranker()
    reranker_server._backend_name = "fake"
    try:
        with TestClient(reranker_server.app) as client:
            yield client
    finally:
        # Restore in place so the router keeps using the same list object.
        startup_hooks[:] = saved_hooks
        for name, value in saved_attrs.items():
            if value is missing:
                # The attribute did not exist before the fixture ran.
                delattr(reranker_server, name)
            else:
                setattr(reranker_server, name, value)
def test_reranker_api_contract(reranker_client: TestClient):
    """Assert POST /rerank answers 200 with positional scores and an input count."""
    payload = {"query": "wireless mouse", "docs": ["doc-a", "doc-b"]}

    response = reranker_client.post("/rerank", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["scores"] == [1.0, 2.0]
    assert body["meta"]["input_docs"] == 2
def test_reranker_health_contract(reranker_client: TestClient):
    """Assert GET /health on the reranker app answers 200 with status "ok"."""
    response = reranker_client.get("/health")
    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ok"
|