"""Contract tests for the search, indexer, embedding, translator and reranker APIs.

Each test replaces the real backends with the small in-memory fakes defined in
this module, then checks HTTP status codes and the response fields that the
assertions below name explicitly.
"""

from __future__ import annotations

import json
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict, List

import numpy as np
import pandas as pd
import pytest
from fastapi.testclient import TestClient

from translation.scenes import normalize_scene_name


# ---------------------------------------------------------------------------
# Search service
# ---------------------------------------------------------------------------


class _FakeSearcher:
    """Searcher stand-in that returns one canned product for every request."""

    def search(self, **kwargs):
        """Return a fixed single-hit result; ``query`` is echoed in ``query_info``."""
        return SimpleNamespace(
            results=[
                {
                    "spu_id": "spu-1",
                    "title": "测试商品",
                    "price": 99.0,
                    "currency": "USD",
                    "in_stock": True,
                    "skus": [],
                    "relevance_score": 1.2,
                }
            ],
            total=1,
            max_score=1.2,
            took_ms=8,
            facets=[],
            query_info={"normalized_query": kwargs.get("query", "")},
            suggestions=[],
            related_searches=[],
            debug_info=None,
        )

    def search_by_image(self, **kwargs):
        """Delegate to :meth:`search`, so image search yields the same canned hit."""
        return self.search(**kwargs)


class _FakeSuggestionService:
    """Suggestion service stand-in that always proposes ``"iphone 15"``."""

    def search(self, **kwargs):
        """Return one suggestion; ``query`` is required, ``language`` defaults to ``"en"``."""
        language = kwargs.get("language", "en")
        return {
            "query": kwargs["query"],
            "language": language,
            "resolved_language": language,
            "suggestions": [{"text": "iphone 15", "score": 1.0}],
            "took_ms": 3,
        }


@pytest.fixture
def search_client(monkeypatch):
    """Yield a ``TestClient`` for the search app with service accessors faked."""
    import api.app as search_app

    monkeypatch.setattr(search_app, "init_service", lambda es_host="": None)
    monkeypatch.setattr(search_app, "get_searcher", lambda: _FakeSearcher())
    monkeypatch.setattr(search_app, "get_suggestion_service", lambda: _FakeSuggestionService())

    with TestClient(search_app.app) as client:
        yield client


def test_search_api_contract(search_client: TestClient):
    """POST /search/: 200 with ``total`` and the canned result."""
    response = search_client.post(
        "/search/",
        headers={"X-Tenant-ID": "162"},
        json={"query": "toy", "size": 5},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 1
    assert data["results"][0]["spu_id"] == "spu-1"


def test_image_search_api_contract(search_client: TestClient):
    """POST /search/image: 200 with the canned result."""
    response = search_client.post(
        "/search/image",
        headers={"X-Tenant-ID": "162"},
        json={"image_url": "https://example.com/a.jpg", "size": 3},
    )
    assert response.status_code == 200
    assert response.json()["results"][0]["spu_id"] == "spu-1"


def test_suggestion_api_contract(search_client: TestClient):
    """GET /search/suggestions: 200, echoes the query and returns one suggestion."""
    response = search_client.get(
        "/search/suggestions?q=iph&size=5&language=en",
        headers={"X-Tenant-ID": "162"},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["query"] == "iph"
    assert len(data["suggestions"]) == 1


def test_instant_search_not_implemented(search_client: TestClient):
    """GET /search/instant: 501."""
    response = search_client.get("/search/instant?q=iph&size=5")
    assert response.status_code == 501


def test_admin_stats_contract(search_client: TestClient, monkeypatch):
    """GET /admin/stats: 200 with tenant id, index name, doc count and size in MB."""
    import api.app as search_app

    class _FakeIndices:
        @staticmethod
        def exists(index: str) -> bool:
            return index.endswith("search_products_tenant_162")

        @staticmethod
        def stats(index: str):
            # 2 MiB store size; the test expects the endpoint to report 2.0 MB.
            return {
                "indices": {
                    index: {
                        "total": {
                            "store": {
                                "size_in_bytes": 2 * 1024 * 1024,
                            }
                        }
                    }
                }
            }

    class _FakeClient:
        indices = _FakeIndices()

        @staticmethod
        def count(index: str):
            assert index.endswith("search_products_tenant_162")
            return {"count": 123}

    monkeypatch.setattr(search_app, "get_es_client", lambda: SimpleNamespace(client=_FakeClient()))

    response = search_client.get("/admin/stats", headers={"X-Tenant-ID": "162"})
    assert response.status_code == 200
    data = response.json()
    assert data["tenant_id"] == "162"
    assert data["index_name"].endswith("search_products_tenant_162")
    assert data["document_count"] == 123
    assert data["size_mb"] == 2.0


# ---------------------------------------------------------------------------
# Indexer service
# ---------------------------------------------------------------------------


class _FakeBulkService:
    """Bulk indexing stand-in that echoes its arguments and reports success."""

    def bulk_index(self, tenant_id: str, recreate_index: bool, batch_size: int):
        return {
            "tenant_id": tenant_id,
            "recreate_index": recreate_index,
            "batch_size": batch_size,
            "success": True,
        }


class _FakeTransformer:
    """Transformer stand-in that builds a minimal doc from the SPU row."""

    def transform_spu_to_doc(self, tenant_id: str, spu_row, skus, options, **kwargs):
        return {
            "tenant_id": tenant_id,
            "spu_id": str(spu_row.get("id", "0")),
            "title": {"zh": str(spu_row.get("title", ""))},
        }


class _FakeDbConnection:
    """Minimal fake for indexer health check: connect().execute(text('SELECT 1'))."""

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def execute(self, stmt):
        pass


class _FakeDbEngine:
    """Engine stand-in whose ``connect()`` hands out a no-op connection."""

    def connect(self):
        return _FakeDbConnection()


class _FakeIncrementalService:
    """Incremental indexing stand-in that never touches a database or ES."""

    def __init__(self):
        self.db_engine = _FakeDbEngine()
        self.category_id_to_name = {}

    def index_spus_to_es(self, es_client, tenant_id: str, spu_ids: List[str], delete_spu_ids=None):
        """Report each ``spu_ids`` entry as indexed and each delete id as deleted."""
        out = {
            "tenant_id": tenant_id,
            "spu_ids": [{"spu_id": s, "status": "indexed"} for s in spu_ids],
            "delete_spu_ids": [],
            "total": len(spu_ids),
            "success_count": len(spu_ids),
            "failed_count": 0,
        }
        if delete_spu_ids:
            out["delete_spu_ids"] = [{"spu_id": s, "status": "deleted"} for s in delete_spu_ids]
            out["total"] += len(delete_spu_ids)
            out["success_count"] += len(delete_spu_ids)
        return out

    def get_spu_document(self, tenant_id: str, spu_id: str):
        """Return a fixed document for any SPU id."""
        return {
            "tenant_id": tenant_id,
            "spu_id": spu_id,
            "title": {"zh": "Fake doc"},
        }

    def _get_transformer_bundle(self, tenant_id: str):
        return _FakeTransformer(), None, False

    def _load_spus_for_spu_ids(self, tenant_id: str, spu_ids: List[str], include_deleted: bool = False):
        """Return one fake row per id (ids are cast to ``int``); empty frame for no ids."""
        if not spu_ids:
            return pd.DataFrame()
        return pd.DataFrame([{"id": int(s), "title": "Fake", "tenant_id": tenant_id} for s in spu_ids])

    def _load_skus_for_spu_ids(self, tenant_id: str, spu_ids: List[str]):
        return pd.DataFrame()

    def _load_options_for_spu_ids(self, tenant_id: str, spu_ids: List[str]):
        return pd.DataFrame()


@pytest.fixture
def indexer_client(monkeypatch):
    """Yield a ``TestClient`` for the indexer app with all services faked."""
    import api.indexer_app as indexer_app
    import api.routes.indexer as indexer_routes

    # Drop registered startup handlers so entering the client does not run them.
    indexer_app.app.router.on_startup.clear()
    monkeypatch.setattr(indexer_app, "init_indexer_service", lambda es_host="": None)
    monkeypatch.setattr(indexer_routes, "get_bulk_indexing_service", lambda: _FakeBulkService())
    monkeypatch.setattr(indexer_routes, "get_incremental_service", lambda: _FakeIncrementalService())
    monkeypatch.setattr(indexer_routes, "get_es_client", lambda: object())

    with TestClient(indexer_app.app) as client:
        yield client


def test_indexer_reindex_contract(indexer_client: TestClient):
    """POST /indexer/reindex: 200 with ``success`` true."""
    response = indexer_client.post(
        "/indexer/reindex",
        json={"tenant_id": "162", "batch_size": 100},
    )
    assert response.status_code == 200
    assert response.json()["success"] is True


def test_indexer_incremental_contract(indexer_client: TestClient):
    """POST /indexer/index: 200 with one success per requested SPU."""
    response = indexer_client.post(
        "/indexer/index",
        json={"tenant_id": "162", "spu_ids": ["1001", "1002"]},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["success_count"] == 2


def test_indexer_build_docs_contract(indexer_client: TestClient):
    """POST /indexer/build-docs: 200 with one doc built from the supplied SPU."""
    response = indexer_client.post(
        "/indexer/build-docs",
        json={
            "tenant_id": "162",
            "items": [{"spu": {"id": 1, "title": "T-shirt"}, "skus": [], "options": []}],
        },
    )
    assert response.status_code == 200
    data = response.json()
    assert data["success_count"] == 1
    assert data["docs"][0]["spu_id"] == "1"


def test_indexer_build_docs_show_request_response(indexer_client: TestClient):
    """
    Call the build-docs endpoint (SPU details in, ES doc out) and print the full
    request and response, so the returned ES document fields can be checked
    for completeness.

    Run: pytest tests/ci/test_service_api_contracts.py::test_indexer_build_docs_show_request_response -v -s
    """
    request_body = {
        "tenant_id": "162",
        "items": [
            {
                "spu": {"id": 1, "title": "T-shirt", "brief": "A simple T-shirt"},
                "skus": [],
                "options": [],
            }
        ],
    }
    banner = "=" * 60

    print("\n" + banner)
    print("【请求】POST /indexer/build-docs")
    print(banner)
    print(json.dumps(request_body, ensure_ascii=False, indent=2))

    response = indexer_client.post("/indexer/build-docs", json=request_body)
    data = response.json()

    print("\n" + banner)
    print("【响应】status_code =", response.status_code)
    print(banner)
    print(json.dumps(data, ensure_ascii=False, indent=2, default=str))

    if data.get("docs"):
        doc = data["docs"][0]
        doc_keys = sorted(doc.keys())
        print("\n" + banner)
        print(f"【返回 doc 顶层字段】共 {len(doc_keys)} 个")
        print(banner)
        for k in doc_keys:
            print("  ", k)

        # Compare against the top-level fields of the ES mapping, when the
        # mapping file is present two directories above this test file.
        mapping_path = Path(__file__).resolve().parents[2] / "mappings" / "search_products.json"
        if mapping_path.exists():
            # Explicit encoding: JSON is UTF-8 and must not depend on the locale.
            with open(mapping_path, encoding="utf-8") as f:
                mapping = json.load(f)
            expected_top_level = set(mapping.get("mappings", {}).get("properties", {}).keys())
            returned = set(doc_keys)
            missing = expected_top_level - returned
            extra = returned - expected_top_level
            print("\n" + banner)
            print("【与 mappings/search_products.json 对比】")
            print(banner)
            print("  mapping 中应有、当前 doc 未返回:", sorted(missing) if missing else "(无)")
            print("  当前 doc 多出:", sorted(extra) if extra else "(无)")

    assert response.status_code == 200
    assert data["success_count"] == 1
    assert data["docs"][0]["spu_id"] == "1"


def test_indexer_build_docs_from_db_contract(indexer_client: TestClient):
    """POST /indexer/build-docs-from-db: tenant_id + spu_ids, returns same shape as build-docs."""
    response = indexer_client.post(
        "/indexer/build-docs-from-db",
        json={"tenant_id": "162", "spu_ids": ["1001", "1002"]},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["tenant_id"] == "162"
    assert "docs" in data
    assert data["success_count"] == 2
    assert len(data["docs"]) == 2
    assert data["docs"][0]["spu_id"] == "1001"


def test_indexer_enrich_content_contract(indexer_client: TestClient, monkeypatch):
    """POST /indexer/enrich-content: per-language anchors and tags for each item."""
    import indexer.product_enrich as process_products

    def _fake_analyze_products(
        products: List[Dict[str, str]],
        target_lang: str = "zh",
        batch_size: int | None = None,
        tenant_id: str | None = None,
    ):
        # The endpoint is expected to call the analyzer with batch_size=20.
        assert batch_size == 20
        return [
            {
                "id": p["id"],
                "lang": target_lang,
                "title_input": p["title"],
                "title": p["title"],
                "category_path": "",
                "tags": "tag1,tag2",
                "target_audience": "",
                "usage_scene": "",
                "season": "",
                "key_attributes": "",
                "material": "",
                "features": "",
                "anchor_text": f"{target_lang}-anchor-{p['id']}",
            }
            for p in products
        ]

    monkeypatch.setattr(process_products, "analyze_products", _fake_analyze_products)

    response = indexer_client.post(
        "/indexer/enrich-content",
        json={
            "tenant_id": "162",
            "items": [
                {"spu_id": "1001", "title": "T-shirt"},
                {"spu_id": "1002", "title": "Toy"},
            ],
            "languages": ["zh", "en"],
        },
    )
    assert response.status_code == 200
    data = response.json()
    assert data["tenant_id"] == "162"
    assert data["total"] == 2
    assert len(data["results"]) == 2
    assert data["results"][0]["spu_id"] == "1001"
    assert data["results"][0]["qanchors"]["zh"] == "zh-anchor-1001"
    assert data["results"][0]["qanchors"]["en"] == "en-anchor-1001"
    assert "tag1" in data["results"][0]["tags"]


def test_indexer_documents_contract(indexer_client: TestClient):
    """POST /indexer/documents: tenant_id + spu_ids, returns success/failed lists (no ES write)."""
    response = indexer_client.post(
        "/indexer/documents",
        json={"tenant_id": "162", "spu_ids": ["1001", "1002"]},
    )
    assert response.status_code == 200
    data = response.json()
    assert "success" in data and "failed" in data
    assert data["total"] == 2
    assert data["success_count"] == 2
    assert data["failed_count"] == 0
    assert len(data["success"]) == 2
    assert data["success"][0]["spu_id"] == "1001"
    assert "document" in data["success"][0]
    assert data["success"][0]["document"]["title"]["zh"] == "Fake doc"


def test_indexer_health_contract(indexer_client: TestClient):
    """GET /indexer/health: returns status and database/preloaded_data."""
    response = indexer_client.get("/indexer/health")
    assert response.status_code == 200
    data = response.json()
    assert "status" in data
    assert data["status"] in ("available", "unavailable", "error")
    assert "database" in data or "message" in data
    if "preloaded_data" in data:
        assert "category_mappings" in data["preloaded_data"]


def test_indexer_incremental_with_delete_spu_ids(indexer_client: TestClient):
    """POST /indexer/index with delete_spu_ids: explicit delete path."""
    response = indexer_client.post(
        "/indexer/index",
        json={
            "tenant_id": "162",
            "spu_ids": ["1001"],
            "delete_spu_ids": ["2001", "2002"],
        },
    )
    assert response.status_code == 200
    data = response.json()
    assert data["success_count"] == 3
    assert len(data["spu_ids"]) == 1
    assert len(data["delete_spu_ids"]) == 2
    assert data["delete_spu_ids"][0]["status"] == "deleted"


def test_indexer_index_validation_both_empty(indexer_client: TestClient):
    """POST /indexer/index: 400 when spu_ids and delete_spu_ids both empty."""
    response = indexer_client.post(
        "/indexer/index",
        json={"tenant_id": "162", "spu_ids": [], "delete_spu_ids": []},
    )
    assert response.status_code == 400


def test_indexer_index_validation_max_spu_ids(indexer_client: TestClient):
    """POST /indexer/index: 400 when spu_ids > 100."""
    response = indexer_client.post(
        "/indexer/index",
        json={"tenant_id": "162", "spu_ids": [str(i) for i in range(101)], "delete_spu_ids": []},
    )
    assert response.status_code == 400


def test_indexer_build_docs_validation_empty_items(indexer_client: TestClient):
    """POST /indexer/build-docs: 400 when items empty."""
    response = indexer_client.post(
        "/indexer/build-docs",
        json={"tenant_id": "162", "items": []},
    )
    assert response.status_code == 400


def test_indexer_documents_validation_empty_spu_ids(indexer_client: TestClient):
    """POST /indexer/documents: 400 when spu_ids empty."""
    response = indexer_client.post(
        "/indexer/documents",
        json={"tenant_id": "162", "spu_ids": []},
    )
    assert response.status_code == 400


def test_indexer_build_docs_from_db_validation_empty_spu_ids(indexer_client: TestClient):
    """POST /indexer/build-docs-from-db: 400 when spu_ids empty."""
    response = indexer_client.post(
        "/indexer/build-docs-from-db",
        json={"tenant_id": "162", "spu_ids": []},
    )
    assert response.status_code == 400


def test_indexer_build_docs_validation_max_items(indexer_client: TestClient):
    """POST /indexer/build-docs: 400 when items > 200."""
    response = indexer_client.post(
        "/indexer/build-docs",
        json={
            "tenant_id": "162",
            "items": [{"spu": {"id": i, "title": "x"}, "skus": [], "options": []} for i in range(201)],
        },
    )
    assert response.status_code == 400


def test_indexer_build_docs_from_db_validation_max_spu_ids(indexer_client: TestClient):
    """POST /indexer/build-docs-from-db: 400 when spu_ids > 200."""
    response = indexer_client.post(
        "/indexer/build-docs-from-db",
        json={"tenant_id": "162", "spu_ids": [str(i) for i in range(201)]},
    )
    assert response.status_code == 400


def test_indexer_enrich_content_validation_max_items(indexer_client: TestClient):
    """POST /indexer/enrich-content: 400 when 51 items are sent."""
    response = indexer_client.post(
        "/indexer/enrich-content",
        json={
            "tenant_id": "162",
            "items": [{"spu_id": str(i), "title": "x"} for i in range(51)],
            "languages": ["zh"],
        },
    )
    assert response.status_code == 400


def test_indexer_documents_validation_max_spu_ids(indexer_client: TestClient):
    """POST /indexer/documents: 400 when spu_ids > 100."""
    response = indexer_client.post(
        "/indexer/documents",
        json={"tenant_id": "162", "spu_ids": [str(i) for i in range(101)]},
    )
    assert response.status_code == 400


def test_indexer_index_validation_max_delete_spu_ids(indexer_client: TestClient):
    """POST /indexer/index: 400 when delete_spu_ids > 100."""
    response = indexer_client.post(
        "/indexer/index",
        json={"tenant_id": "162", "spu_ids": [], "delete_spu_ids": [str(i) for i in range(101)]},
    )
    assert response.status_code == 400


# ---------------------------------------------------------------------------
# Embedding service
# ---------------------------------------------------------------------------


class _FakeTextModel:
    """Text encoder stand-in: one fixed 3-dim float32 vector per input text."""

    def encode_batch(self, texts, batch_size=32, device="cpu", normalize_embeddings=True):
        return [np.array([0.1, 0.2, 0.3], dtype=np.float32) for _ in texts]


class _FakeImageModel:
    """Image encoder stand-in: one fixed 3-dim float32 vector per input URL."""

    def encode_image_urls(self, urls, batch_size=8, normalize_embeddings=True):
        return [np.array([0.3, 0.2, 0.1], dtype=np.float32) for _ in urls]


@pytest.fixture
def embedding_module(monkeypatch):
    """Yield the embedding server module with both models replaced by fakes.

    The models are installed through ``monkeypatch`` so the module globals are
    restored on teardown instead of leaking into later tests. ``raising=False``
    keeps the plain-assignment behaviour when the attribute is not yet defined.
    """
    import embeddings.server as emb_server

    # Drop registered startup handlers, as the original fixture did.
    emb_server.app.router.on_startup.clear()
    monkeypatch.setattr(emb_server, "_text_model", _FakeTextModel(), raising=False)
    monkeypatch.setattr(emb_server, "_image_model", _FakeImageModel(), raising=False)
    yield emb_server


def test_embedding_text_contract(embedding_module):
    """``embed_text``: one 3-dim vector per input text."""
    data = embedding_module.embed_text(["hello", "world"])
    assert len(data) == 2
    assert len(data[0]) == 3


def test_embedding_image_contract(embedding_module):
    """``embed_image``: a 3-dim vector for the input URL."""
    data = embedding_module.embed_image(["https://example.com/a.jpg"])
    assert len(data[0]) == 3


# ---------------------------------------------------------------------------
# Translator service
# ---------------------------------------------------------------------------


class _FakeTranslator:
    """Translator stand-in that appends ``-<target_lang>`` to the input."""

    model = "qwen-mt"
    supports_batch = True

    def translate(
        self,
        text: str | List[str],
        target_lang: str,
        source_lang: str | None = None,
        scene: str | None = None,
    ):
        del source_lang, scene
        if isinstance(text, list):
            return [f"{item}-{target_lang}" for item in text]
        return f"{text}-{target_lang}"


class _FailingTranslator:
    """Translator stand-in whose ``translate`` always returns ``None``."""

    model = "qwen-mt"
    supports_batch = True

    def translate(
        self,
        text: str | List[str],
        target_lang: str,
        source_lang: str | None = None,
        scene: str | None = None,
    ):
        del text, target_lang, source_lang, scene
        return None


class _FakeTranslationService:
    """Translation service stand-in wrapping a single translator backend.

    Shared by the translator fixture and the failure test so both use one
    definition of the fake config and model lists.
    """

    def __init__(self, translator):
        self._translator = translator
        self.config = {
            "default_model": "qwen-mt",
            "default_scene": "general",
            "capabilities": {
                "qwen-mt": {
                    "enabled": True,
                    "backend": "qwen_mt",
                    "model": "qwen-mt-flash",
                    "base_url": "https://example.com",
                    "timeout_sec": 10.0,
                    "use_cache": True,
                }
            },
            "cache": {
                "enabled": True,
                "key_prefix": "trans:v2",
                "ttl_seconds": 60,
                "sliding_expiration": True,
                "key_include_scene": True,
                "key_include_source_lang": True,
            },
        }
        self.available_models = ["qwen-mt"]
        self.loaded_models = ["qwen-mt"]

    def get_backend(self, model=None):
        """Return the wrapped translator regardless of ``model``."""
        del model
        return self._translator

    def translate(self, **kwargs):
        """Forward to the wrapped translator after discarding any ``model`` kwarg."""
        kwargs.pop("model", None)
        return self._translator.translate(**kwargs)


@pytest.fixture
def translator_client(monkeypatch):
    """Yield a ``TestClient`` for the translator app backed by ``_FakeTranslator``."""
    import api.translator_app as translator_app

    translator_app.app.router.on_startup.clear()
    monkeypatch.setattr(
        translator_app,
        "get_translation_service",
        lambda: _FakeTranslationService(_FakeTranslator()),
    )

    with TestClient(translator_app.app) as client:
        yield client


def test_translator_api_contract(translator_client: TestClient):
    """POST /translate: 200 with the fake translation."""
    response = translator_client.post(
        "/translate",
        json={"text": "商品名称", "target_lang": "en", "source_lang": "zh"},
    )
    assert response.status_code == 200
    assert response.json()["translated_text"] == "商品名称-en"


def test_translator_api_failure_returns_500(monkeypatch):
    """POST /translate: 500 with ``"Translation failed"`` when the backend returns None."""
    import api.translator_app as translator_app

    translator_app.app.router.on_startup.clear()
    monkeypatch.setattr(
        translator_app,
        "get_translation_service",
        lambda: _FakeTranslationService(_FailingTranslator()),
    )

    with TestClient(translator_app.app) as client:
        response = client.post(
            "/translate",
            json={"text": "商品名称", "target_lang": "en", "source_lang": "zh"},
        )
        assert response.status_code == 500
        assert response.json()["detail"] == "Translation failed"


def test_translator_health_contract(translator_client: TestClient):
    """GET /health: 200, healthy, and lists the loaded models."""
    response = translator_client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"
    assert response.json()["loaded_models"] == ["qwen-mt"]


# ---------------------------------------------------------------------------
# Reranker service
# ---------------------------------------------------------------------------


class _FakeReranker:
    """Reranker stand-in scoring docs 1.0, 2.0, ... in input order."""

    _model_name = "fake-reranker"

    def score_with_meta(self, query: str, docs: List[str], normalize: bool = True):
        scores = [float(i + 1) for i in range(len(docs))]
        meta: Dict[str, Any] = {"input_docs": len(docs), "unique_docs": len(set(docs))}
        return scores, meta


@pytest.fixture
def reranker_client(monkeypatch):
    """Yield a ``TestClient`` for the reranker app with the reranker faked.

    The module globals are set through ``monkeypatch`` so they are restored on
    teardown instead of leaking into later tests. ``raising=False`` keeps the
    plain-assignment behaviour when the attribute is not yet defined.
    """
    import reranker.server as reranker_server

    reranker_server.app.router.on_startup.clear()
    monkeypatch.setattr(reranker_server, "_reranker", _FakeReranker(), raising=False)
    monkeypatch.setattr(reranker_server, "_backend_name", "fake", raising=False)

    with TestClient(reranker_server.app) as client:
        yield client


def test_reranker_api_contract(reranker_client: TestClient):
    """POST /rerank: 200 with the fake scores and meta."""
    response = reranker_client.post(
        "/rerank",
        json={"query": "wireless mouse", "docs": ["doc-a", "doc-b"]},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["scores"] == [1.0, 2.0]
    assert data["meta"]["input_docs"] == 2


def test_reranker_health_contract(reranker_client: TestClient):
    """GET /health: 200 with status ``"ok"``."""
    response = reranker_client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "ok"