"""HTTP contract tests for the search, indexer, embedding, translator and reranker apps.

Each fixture builds a ``TestClient`` around one FastAPI app after replacing the
app's backing services with the in-process fakes defined in this module.  All
patching goes through ``monkeypatch`` so that it is reverted when the fixture
is torn down and cannot leak into other tests of the same session.
"""

from __future__ import annotations

from types import SimpleNamespace
from typing import Any, Dict, List

import numpy as np
import pandas as pd
import pytest
from fastapi.testclient import TestClient


def _disable_startup_handlers(monkeypatch, app) -> None:
    """Replace ``app.router.on_startup`` with an empty list until teardown.

    The original handler list is left untouched and is put back by
    ``monkeypatch`` once the requesting fixture finishes.
    """
    monkeypatch.setattr(app.router, "on_startup", [])


# ---------------------------------------------------------------------------
# Search API
# ---------------------------------------------------------------------------


class _FakeSearcher:
    """Searcher stand-in that always returns the same single hit."""

    def search(self, **kwargs):
        """Return a fixed one-result payload; only ``query`` is echoed back."""
        return SimpleNamespace(
            results=[
                {
                    "spu_id": "spu-1",
                    "title": "测试商品",
                    "price": 99.0,
                    "currency": "USD",
                    "in_stock": True,
                    "skus": [],
                    "relevance_score": 1.2,
                }
            ],
            total=1,
            max_score=1.2,
            took_ms=8,
            facets=[],
            query_info={"normalized_query": kwargs.get("query", "")},
            suggestions=[],
            related_searches=[],
            debug_info=None,
        )

    def search_by_image(self, **kwargs):
        """Delegate to :meth:`search` so image search yields the same payload."""
        return self.search(**kwargs)


class _FakeSuggestionService:
    """Suggestion service stand-in that returns one fixed suggestion."""

    def search(self, **kwargs):
        """Echo ``query``/``language`` and return a single canned suggestion."""
        language = kwargs.get("language", "en")
        return {
            "query": kwargs["query"],
            "language": language,
            "resolved_language": language,
            "suggestions": [{"text": "iphone 15", "score": 1.0}],
            "took_ms": 3,
        }


@pytest.fixture
def search_client(monkeypatch):
    """Yield a ``TestClient`` for ``api.app`` wired to the fake search services."""
    import api.app as search_app

    monkeypatch.setattr(search_app, "init_service", lambda es_host="": None)
    monkeypatch.setattr(search_app, "get_searcher", lambda: _FakeSearcher())
    monkeypatch.setattr(
        search_app, "get_suggestion_service", lambda: _FakeSuggestionService()
    )

    with TestClient(search_app.app) as client:
        yield client


def test_search_api_contract(search_client: TestClient):
    """POST /search/: returns the fake searcher's total and first result."""
    response = search_client.post(
        "/search/",
        headers={"X-Tenant-ID": "162"},
        json={"query": "toy", "size": 5},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 1
    assert data["results"][0]["spu_id"] == "spu-1"


def test_image_search_api_contract(search_client: TestClient):
    """POST /search/image: returns the fake searcher's first result."""
    response = search_client.post(
        "/search/image",
        headers={"X-Tenant-ID": "162"},
        json={"image_url": "https://example.com/a.jpg", "size": 3},
    )
    assert response.status_code == 200
    assert response.json()["results"][0]["spu_id"] == "spu-1"


def test_suggestion_api_contract(search_client: TestClient):
    """GET /search/suggestions: echoes the query and returns one suggestion."""
    response = search_client.get(
        "/search/suggestions?q=iph&size=5&language=en",
        headers={"X-Tenant-ID": "162"},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["query"] == "iph"
    assert len(data["suggestions"]) == 1


# ---------------------------------------------------------------------------
# Indexer API
# ---------------------------------------------------------------------------


class _FakeBulkService:
    """Bulk indexing stand-in that reports success and echoes its arguments."""

    def bulk_index(self, tenant_id: str, recreate_index: bool, batch_size: int):
        """Return the received arguments together with ``success=True``."""
        return {
            "tenant_id": tenant_id,
            "recreate_index": recreate_index,
            "batch_size": batch_size,
            "success": True,
        }


class _FakeTransformer:
    """Transformer stand-in that builds a minimal document from an SPU row."""

    def transform_spu_to_doc(self, tenant_id: str, spu_row, skus, options):
        """Build a doc from ``spu_row``'s ``id``/``title``; ``skus``/``options`` are ignored."""
        return {
            "tenant_id": tenant_id,
            "spu_id": str(spu_row.get("id", "0")),
            "title": {"zh": str(spu_row.get("title", ""))},
        }


class _FakeDbConnection:
    """Minimal fake for indexer health check: connect().execute(text('SELECT 1'))."""

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def execute(self, stmt):
        pass


class _FakeDbEngine:
    """Engine stand-in whose ``connect()`` hands out a ``_FakeDbConnection``."""

    def connect(self):
        return _FakeDbConnection()


class _FakeIncrementalService:
    """Incremental indexing stand-in returning deterministic, DB-free results."""

    def __init__(self):
        self.db_engine = _FakeDbEngine()
        self.category_id_to_name = {}

    def index_spus_to_es(
        self, es_client, tenant_id: str, spu_ids: List[str], delete_spu_ids=None
    ):
        """Report every id in ``spu_ids`` as indexed and every delete id as deleted."""
        out = {
            "tenant_id": tenant_id,
            "spu_ids": [{"spu_id": s, "status": "indexed"} for s in spu_ids],
            "delete_spu_ids": [],
            "total": len(spu_ids),
            "success_count": len(spu_ids),
            "failed_count": 0,
        }
        if delete_spu_ids:
            # Deletions count towards both the total and the success tally.
            out["delete_spu_ids"] = [
                {"spu_id": s, "status": "deleted"} for s in delete_spu_ids
            ]
            out["total"] += len(delete_spu_ids)
            out["success_count"] += len(delete_spu_ids)
        return out

    def get_spu_document(self, tenant_id: str, spu_id: str):
        """Return a canned document carrying the requested ids."""
        return {
            "tenant_id": tenant_id,
            "spu_id": spu_id,
            "title": {"zh": "Fake doc"},
        }

    def _get_transformer_bundle(self, tenant_id: str):
        """Return ``(_FakeTransformer(), None, False)``.

        NOTE(review): the meaning of the 2nd and 3rd tuple items is defined by
        the real service, not visible here - confirm against the route code.
        """
        return _FakeTransformer(), None, False

    def _load_spus_for_spu_ids(
        self, tenant_id: str, spu_ids: List[str], include_deleted: bool = False
    ):
        """Return one fake SPU row per id, or an empty frame for no ids."""
        if not spu_ids:
            return pd.DataFrame()
        return pd.DataFrame(
            [{"id": int(s), "title": "Fake", "tenant_id": tenant_id} for s in spu_ids]
        )

    def _load_skus_for_spu_ids(self, tenant_id: str, spu_ids: List[str]):
        """Return an empty frame: the fake has no SKUs."""
        return pd.DataFrame()

    def _load_options_for_spu_ids(self, tenant_id: str, spu_ids: List[str]):
        """Return an empty frame: the fake has no options."""
        return pd.DataFrame()


@pytest.fixture
def indexer_client(monkeypatch):
    """Yield a ``TestClient`` for ``api.indexer_app`` wired to the fake indexer services."""
    import api.indexer_app as indexer_app
    import api.routes.indexer as indexer_routes

    monkeypatch.setattr(indexer_app, "init_indexer_service", lambda es_host="": None)
    monkeypatch.setattr(
        indexer_routes, "get_bulk_indexing_service", lambda: _FakeBulkService()
    )
    monkeypatch.setattr(
        indexer_routes, "get_incremental_service", lambda: _FakeIncrementalService()
    )
    monkeypatch.setattr(indexer_routes, "get_es_client", lambda: object())

    with TestClient(indexer_app.app) as client:
        yield client


def test_indexer_reindex_contract(indexer_client: TestClient):
    """POST /indexer/reindex: responds 200 with ``success`` set to true."""
    response = indexer_client.post(
        "/indexer/reindex",
        json={"tenant_id": "162", "batch_size": 100},
    )
    assert response.status_code == 200
    assert response.json()["success"] is True


def test_indexer_incremental_contract(indexer_client: TestClient):
    """POST /indexer/index: reports one success per submitted spu_id."""
    response = indexer_client.post(
        "/indexer/index",
        json={"tenant_id": "162", "spu_ids": ["1001", "1002"]},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["success_count"] == 2


def test_indexer_build_docs_contract(indexer_client: TestClient):
    """POST /indexer/build-docs: builds one doc per submitted item."""
    response = indexer_client.post(
        "/indexer/build-docs",
        json={
            "tenant_id": "162",
            "items": [{"spu": {"id": 1, "title": "T-shirt"}, "skus": [], "options": []}],
        },
    )
    assert response.status_code == 200
    data = response.json()
    assert data["success_count"] == 1
    assert data["docs"][0]["spu_id"] == "1"


def test_indexer_build_docs_from_db_contract(indexer_client: TestClient):
    """POST /indexer/build-docs-from-db: tenant_id + spu_ids, returns same shape as build-docs."""
    response = indexer_client.post(
        "/indexer/build-docs-from-db",
        json={"tenant_id": "162", "spu_ids": ["1001", "1002"]},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["tenant_id"] == "162"
    assert "docs" in data
    assert data["success_count"] == 2
    assert len(data["docs"]) == 2
    assert data["docs"][0]["spu_id"] == "1001"


def test_indexer_documents_contract(indexer_client: TestClient):
    """POST /indexer/documents: tenant_id + spu_ids, returns success/failed lists (no ES write)."""
    response = indexer_client.post(
        "/indexer/documents",
        json={"tenant_id": "162", "spu_ids": ["1001", "1002"]},
    )
    assert response.status_code == 200
    data = response.json()
    assert "success" in data and "failed" in data
    assert data["total"] == 2
    assert data["success_count"] == 2
    assert data["failed_count"] == 0
    assert len(data["success"]) == 2
    assert data["success"][0]["spu_id"] == "1001"
    assert "document" in data["success"][0]
    assert data["success"][0]["document"]["title"]["zh"] == "Fake doc"


def test_indexer_health_contract(indexer_client: TestClient):
    """GET /indexer/health: returns status and database/preloaded_data."""
    response = indexer_client.get("/indexer/health")
    assert response.status_code == 200
    data = response.json()
    assert "status" in data
    assert data["status"] in ("available", "unavailable", "error")
    assert "database" in data or "message" in data
    if "preloaded_data" in data:
        assert "category_mappings" in data["preloaded_data"]


def test_indexer_incremental_with_delete_spu_ids(indexer_client: TestClient):
    """POST /indexer/index with delete_spu_ids: explicit delete path."""
    response = indexer_client.post(
        "/indexer/index",
        json={
            "tenant_id": "162",
            "spu_ids": ["1001"],
            "delete_spu_ids": ["2001", "2002"],
        },
    )
    assert response.status_code == 200
    data = response.json()
    assert data["success_count"] == 3
    assert len(data["spu_ids"]) == 1
    assert len(data["delete_spu_ids"]) == 2
    assert data["delete_spu_ids"][0]["status"] == "deleted"


def test_indexer_index_validation_both_empty(indexer_client: TestClient):
    """POST /indexer/index: 400 when spu_ids and delete_spu_ids both empty."""
    response = indexer_client.post(
        "/indexer/index",
        json={"tenant_id": "162", "spu_ids": [], "delete_spu_ids": []},
    )
    assert response.status_code == 400


def test_indexer_index_validation_max_spu_ids(indexer_client: TestClient):
    """POST /indexer/index: 400 when spu_ids > 100."""
    response = indexer_client.post(
        "/indexer/index",
        json={
            "tenant_id": "162",
            "spu_ids": [str(i) for i in range(101)],
            "delete_spu_ids": [],
        },
    )
    assert response.status_code == 400


def test_indexer_build_docs_validation_empty_items(indexer_client: TestClient):
    """POST /indexer/build-docs: 400 when items empty."""
    response = indexer_client.post(
        "/indexer/build-docs",
        json={"tenant_id": "162", "items": []},
    )
    assert response.status_code == 400


def test_indexer_documents_validation_empty_spu_ids(indexer_client: TestClient):
    """POST /indexer/documents: 400 when spu_ids empty."""
    response = indexer_client.post(
        "/indexer/documents",
        json={"tenant_id": "162", "spu_ids": []},
    )
    assert response.status_code == 400


def test_indexer_build_docs_from_db_validation_empty_spu_ids(indexer_client: TestClient):
    """POST /indexer/build-docs-from-db: 400 when spu_ids empty."""
    response = indexer_client.post(
        "/indexer/build-docs-from-db",
        json={"tenant_id": "162", "spu_ids": []},
    )
    assert response.status_code == 400


def test_indexer_build_docs_validation_max_items(indexer_client: TestClient):
    """POST /indexer/build-docs: 400 when items > 200."""
    response = indexer_client.post(
        "/indexer/build-docs",
        json={
            "tenant_id": "162",
            "items": [
                {"spu": {"id": i, "title": "x"}, "skus": [], "options": []}
                for i in range(201)
            ],
        },
    )
    assert response.status_code == 400


def test_indexer_build_docs_from_db_validation_max_spu_ids(indexer_client: TestClient):
    """POST /indexer/build-docs-from-db: 400 when spu_ids > 200."""
    response = indexer_client.post(
        "/indexer/build-docs-from-db",
        json={"tenant_id": "162", "spu_ids": [str(i) for i in range(201)]},
    )
    assert response.status_code == 400


def test_indexer_documents_validation_max_spu_ids(indexer_client: TestClient):
    """POST /indexer/documents: 400 when spu_ids > 100."""
    response = indexer_client.post(
        "/indexer/documents",
        json={"tenant_id": "162", "spu_ids": [str(i) for i in range(101)]},
    )
    assert response.status_code == 400


def test_indexer_index_validation_max_delete_spu_ids(indexer_client: TestClient):
    """POST /indexer/index: 400 when delete_spu_ids > 100."""
    response = indexer_client.post(
        "/indexer/index",
        json={
            "tenant_id": "162",
            "spu_ids": [],
            "delete_spu_ids": [str(i) for i in range(101)],
        },
    )
    assert response.status_code == 400


# ---------------------------------------------------------------------------
# Embedding API
# ---------------------------------------------------------------------------


class _FakeTextModel:
    """Text encoder stand-in returning the same 3-element vector per text."""

    def encode_batch(self, texts, batch_size=32, device="cpu"):
        return [np.array([0.1, 0.2, 0.3], dtype=np.float32) for _ in texts]


class _FakeImageModel:
    """Image encoder stand-in returning the same 3-element vector per URL."""

    def encode_image_urls(self, urls, batch_size=8):
        return [np.array([0.3, 0.2, 0.1], dtype=np.float32) for _ in urls]


@pytest.fixture
def embedding_client(monkeypatch):
    """Yield a ``TestClient`` for ``embeddings.server`` backed by the fake encoders.

    Startup handlers and the module-level model globals are patched through
    ``monkeypatch`` so they are restored on teardown instead of staying
    replaced for the remainder of the test session.
    """
    import embeddings.server as emb_server

    _disable_startup_handlers(monkeypatch, emb_server.app)
    # raising=False mirrors plain assignment: set the global even if absent.
    monkeypatch.setattr(emb_server, "_text_model", _FakeTextModel(), raising=False)
    monkeypatch.setattr(emb_server, "_image_model", _FakeImageModel(), raising=False)

    with TestClient(emb_server.app) as client:
        yield client


def test_embedding_text_contract(embedding_client: TestClient):
    """POST /embed/text: returns one 3-element vector per input text."""
    response = embedding_client.post("/embed/text", json=["hello", "world"])
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2
    assert len(data[0]) == 3


def test_embedding_image_contract(embedding_client: TestClient):
    """POST /embed/image: returns a 3-element vector for the input URL."""
    response = embedding_client.post("/embed/image", json=["https://example.com/a.jpg"])
    assert response.status_code == 200
    assert len(response.json()[0]) == 3


# ---------------------------------------------------------------------------
# Translator API
# ---------------------------------------------------------------------------


class _FakeTranslator:
    """Translator stand-in that appends ``-<target_lang>`` to the input text."""

    model = "qwen"
    use_cache = True

    def translate(
        self,
        text: str,
        target_lang: str,
        source_lang: str | None = None,
        prompt: str | None = None,
    ):
        return f"{text}-{target_lang}"


@pytest.fixture
def translator_client(monkeypatch):
    """Yield a ``TestClient`` for ``api.translator_app`` backed by the fake translator.

    Startup handlers are disabled through ``monkeypatch`` so they are restored
    on teardown.
    """
    import api.translator_app as translator_app

    _disable_startup_handlers(monkeypatch, translator_app.app)
    monkeypatch.setattr(
        translator_app, "get_translator", lambda model="qwen": _FakeTranslator()
    )

    with TestClient(translator_app.app) as client:
        yield client


def test_translator_api_contract(translator_client: TestClient):
    """POST /translate: returns the fake translator's output as ``translated_text``."""
    response = translator_client.post(
        "/translate",
        json={"text": "商品名称", "target_lang": "en", "source_lang": "zh"},
    )
    assert response.status_code == 200
    assert response.json()["translated_text"] == "商品名称-en"


def test_translator_health_contract(translator_client: TestClient):
    """GET /health on the translator app: reports ``healthy``."""
    response = translator_client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"


# ---------------------------------------------------------------------------
# Reranker API
# ---------------------------------------------------------------------------


class _FakeReranker:
    """Reranker stand-in that scores documents 1.0, 2.0, ... in input order."""

    _model_name = "fake-reranker"

    def score_with_meta(self, query: str, docs: List[str], normalize: bool = True):
        """Return position-based scores plus input/unique document counts."""
        scores = [float(rank) for rank in range(1, len(docs) + 1)]
        meta: Dict[str, Any] = {
            "input_docs": len(docs),
            "unique_docs": len(set(docs)),
        }
        return scores, meta


@pytest.fixture
def reranker_client(monkeypatch):
    """Yield a ``TestClient`` for ``reranker.server`` backed by the fake reranker.

    Startup handlers and the module-level reranker globals are patched through
    ``monkeypatch`` so they are restored on teardown instead of staying
    replaced for the remainder of the test session.
    """
    import reranker.server as reranker_server

    _disable_startup_handlers(monkeypatch, reranker_server.app)
    # raising=False mirrors plain assignment: set the global even if absent.
    monkeypatch.setattr(reranker_server, "_reranker", _FakeReranker(), raising=False)
    monkeypatch.setattr(reranker_server, "_backend_name", "fake", raising=False)

    with TestClient(reranker_server.app) as client:
        yield client


def test_reranker_api_contract(reranker_client: TestClient):
    """POST /rerank: returns the fake reranker's scores and meta."""
    response = reranker_client.post(
        "/rerank",
        json={"query": "wireless mouse", "docs": ["doc-a", "doc-b"]},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["scores"] == [1.0, 2.0]
    assert data["meta"]["input_docs"] == 2


def test_reranker_health_contract(reranker_client: TestClient):
    """GET /health on the reranker app: reports ``ok``."""
    response = reranker_client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "ok"