#!/usr/bin/env python3 """ Test script for base configuration. Tests data ingestion, search API, response format, and tenant isolation. """ import sys import os import argparse import requests import json from pathlib import Path # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) def test_search_api(base_url: str, tenant_id: str, query: str = "耳机"): """ Test search API. Args: base_url: API base URL tenant_id: Tenant ID query: Search query Returns: Response JSON or None if failed """ url = f"{base_url}/search/" headers = { "X-Tenant-ID": tenant_id, "Content-Type": "application/json" } payload = { "query": query, "size": 10, "from": 0 } print(f"\nTesting search API:") print(f" URL: {url}") print(f" Query: {query}") print(f" Tenant ID: {tenant_id}") try: response = requests.post(url, json=payload, headers=headers, timeout=30) response.raise_for_status() data = response.json() print(f" Status: {response.status_code}") print(f" Total: {data.get('total', 0)}") print(f" Results: {len(data.get('results', []))}") return data except Exception as e: print(f" ERROR: {e}") return None def validate_response_format(data: dict): """ Validate response format. Args: data: Response data Returns: List of validation errors (empty if valid) """ errors = [] # Check for results field (not hits) if 'hits' in data: errors.append("Response contains 'hits' field (should be 'results')") if 'results' not in data: errors.append("Response missing 'results' field") else: results = data['results'] if not isinstance(results, list): errors.append("'results' should be a list") else: # Validate first result structure if results: result = results[0] required_fields = ['product_id', 'title', 'variants', 'relevance_score'] for field in required_fields: if field not in result: errors.append(f"Result missing required field: {field}") # Check for ES internal fields es_internal_fields = ['_id', '_score', '_source'] for field in es_internal_fields: if field in result: errors.append(f"Result contains ES internal field: {field}") # Validate variants if 'variants' in result: variants = result['variants'] if not isinstance(variants, list): errors.append("'variants' should be a list") elif variants: variant = variants[0] variant_required = ['variant_id', 'price', 'sku', 'stock'] for field in variant_required: if field not in variant: errors.append(f"Variant missing required field: {field}") # Check for suggestions and related_searches if 'suggestions' not in data: errors.append("Response missing 'suggestions' field") if 'related_searches' not in data: errors.append("Response missing 'related_searches' field") return errors def test_facets(base_url: str, tenant_id: str): """ Test facets aggregation. Args: base_url: API base URL tenant_id: Tenant ID Returns: Response JSON or None if failed """ url = f"{base_url}/search/" headers = { "X-Tenant-ID": tenant_id, "Content-Type": "application/json" } payload = { "query": "商品", "size": 10, "facets": ["category_keyword", "vendor_keyword"] } print(f"\nTesting facets:") print(f" Facets: {payload['facets']}") try: response = requests.post(url, json=payload, headers=headers, timeout=30) response.raise_for_status() data = response.json() if 'facets' in data and data['facets']: print(f" Facets returned: {len(data['facets'])}") for facet in data['facets']: print(f" - {facet.get('field')}: {len(facet.get('values', []))} values") else: print(" WARNING: No facets returned") return data except Exception as e: print(f" ERROR: {e}") return None def test_tenant_isolation(base_url: str, tenant_id_1: str, tenant_id_2: str): """ Test tenant isolation. Args: base_url: API base URL tenant_id_1: First tenant ID tenant_id_2: Second tenant ID """ print(f"\nTesting tenant isolation:") print(f" Tenant 1: {tenant_id_1}") print(f" Tenant 2: {tenant_id_2}") # Search for tenant 1 data1 = test_search_api(base_url, tenant_id_1, "商品") # Search for tenant 2 data2 = test_search_api(base_url, tenant_id_2, "商品") if data1 and data2: results1 = set(r.get('product_id') for r in data1.get('results', [])) results2 = set(r.get('product_id') for r in data2.get('results', [])) overlap = results1 & results2 if overlap: print(f" WARNING: Found {len(overlap)} overlapping results between tenants") else: print(f" OK: No overlapping results (tenant isolation working)") def main(): parser = argparse.ArgumentParser(description='Test base configuration') parser.add_argument('--api-url', default='http://localhost:8000', help='API base URL') parser.add_argument('--tenant-id', default='1', help='Tenant ID for testing') parser.add_argument('--test-tenant-2', help='Second tenant ID for isolation test') args = parser.parse_args() print("=" * 60) print("Base Configuration Test Suite") print("=" * 60) # Test 1: Basic search print("\n[Test 1] Basic Search") data = test_search_api(args.api_url, args.tenant_id) if not data: print("FAILED: Basic search test") return 1 # Test 2: Response format validation print("\n[Test 2] Response Format Validation") errors = validate_response_format(data) if errors: print("FAILED: Response format validation") for error in errors: print(f" - {error}") return 1 else: print("PASSED: Response format is correct") # Test 3: Facets print("\n[Test 3] Facets Aggregation") facet_data = test_facets(args.api_url, args.tenant_id) if not facet_data: print("WARNING: Facets test failed (may be expected if no data)") # Test 4: Tenant isolation (if second tenant provided) if args.test_tenant_2: print("\n[Test 4] Tenant Isolation") test_tenant_isolation(args.api_url, args.tenant_id, args.test_tenant_2) print("\n" + "=" * 60) print("All tests completed") print("=" * 60) return 0 if __name__ == '__main__': sys.exit(main())