# test_aggregation_functionality.py (7.11 KB)
#!/usr/bin/env python3
"""
Simple test script to verify aggregation functionality without external dependencies.
"""

import sys
import os
import inspect

# Add the project root to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

def test_es_query_builder_aggregations():
    """Check that ESQueryBuilder emits the requested aggregations and sorts.

    Passes one ``terms`` and one ``range`` aggregation definition to
    ``add_dynamic_aggregations`` and verifies that each one appears under
    ``aggs`` in the returned query with the expected aggregation type and
    field.  Then calls ``add_sorting`` once per sort key and verifies that
    the returned query contains a ``sort`` entry.

    Progress is reported with ``print``; nothing is raised for a failed
    check.

    Returns:
        bool: True when every check passes, False at the first failed check
        or when ``ESQueryBuilder`` cannot be imported.
    """
    print("Testing ES Query Builder Aggregation Methods...")

    # Imported lazily so a missing module is reported as a failed test
    # instead of aborting the whole script at import time.
    try:
        from search.es_query_builder import ESQueryBuilder
    except ImportError as e:
        print(f"✗ Failed to import ESQueryBuilder: {e}")
        return False
    print("✓ ESQueryBuilder imported successfully")

    builder = ESQueryBuilder(
        index_name="test_index",
        match_fields=["name", "description"],
    )

    es_query = {"query": {"match_all": {}}}
    aggregations = {
        "category_name": {
            "type": "terms",
            "field": "categoryName_keyword",
            "size": 10,
        },
        "price_ranges": {
            "type": "range",
            "field": "price",
            "ranges": [
                {"key": "0-50", "to": 50},
                {"key": "50-100", "from": 50, "to": 100},
            ],
        },
    }

    result_query = builder.add_dynamic_aggregations(es_query, aggregations)

    if "aggs" not in result_query:
        print("✗ No aggregations added to query")
        return False
    print("✓ Aggregations added to query")

    # (aggregation name, expected aggregation type, expected field, label)
    expected_aggs = (
        ("category_name", "terms", "categoryName_keyword", "Category"),
        ("price_ranges", "range", "price", "Price range"),
    )
    built_aggs = result_query["aggs"]
    for agg_name, agg_type, field, label in expected_aggs:
        # Every aggregation that was requested must be present in the
        # output; a silently dropped aggregation is a failure, not a skip.
        if agg_name not in built_aggs:
            print(f"✗ {label} aggregation missing from query")
            return False

        agg_body = built_aggs[agg_name]
        # .get() so a body without "field" fails the check cleanly.
        if agg_type in agg_body and agg_body[agg_type].get("field") == field:
            print(f"✓ {label} aggregation correctly configured")
        else:
            print(f"✗ {label} aggregation incorrectly configured")
            return False

    # (sort key passed to add_sorting, label used in the report)
    sort_cases = (
        ("price_asc", "Price ascending"),
        ("price_desc", "Price descending"),
        ("time_desc", "Time descending"),
    )
    for sort_by, label in sort_cases:
        sorted_query = builder.add_sorting({}, sort_by)
        if "sort" not in sorted_query:
            print(f"✗ {label} sort not added")
            return False
        print(f"✓ {label} sort added")

    return True


def test_searcher_integration():
    """Verify that ``Searcher.search`` declares the expected parameters.

    Only the method signature is inspected; no search is executed.

    Returns:
        bool: True when every expected parameter name is present on
        ``Searcher.search``; False when the import fails, the method is
        absent, or a parameter is missing.
    """
    print("\nTesting Searcher Integration...")

    try:
        from search.searcher import Searcher
    except ImportError as e:
        print(f"✗ Failed to import Searcher: {e}")
        return False
    else:
        print("✓ Searcher imported successfully")

    # Running a real search needs a live backend, so only the signature
    # of the method is checked here.
    method = getattr(Searcher, 'search', None)
    if not method:
        print("✗ Search method not found in Searcher class")
        return False

    declared = inspect.signature(method).parameters
    required = (
        'query',
        'size',
        'from_',
        'filters',
        'min_score',
        'aggregations',
        'sort_by',
        'context',
    )
    for name in required:
        if name not in declared:
            print(f"✗ Parameter '{name}' missing from search method")
            return False
        print(f"✓ Parameter '{name}' found in search method")

    return True


def test_api_route_integration():
    """Verify that the search router exposes a route at path ``"/"``.

    Returns:
        bool: True when the router imports and one of its routes has the
        path ``"/"``; False otherwise.
    """
    print("\nTesting API Route Integration...")

    try:
        from api.routes.search import router
    except ImportError as e:
        print(f"✗ Failed to import search router: {e}")
        return False
    else:
        print("✓ Search router imported successfully")

    # Collect every registered path up front, then look for the root one.
    registered_paths = tuple(entry.path for entry in router.routes)
    if "/" not in registered_paths:
        print("✗ Main search route not found")
        return False

    print("✓ Main search route found")
    return True


def test_configuration():
    """Verify that the customer1 configuration loads and declares key fields.

    Loads ``config/schema/customer1_config.yaml`` through
    ``CustomerConfig.load_from_file`` and checks that the field names
    ``price``, ``categoryName_keyword`` and ``brandName_keyword`` are all
    present in ``config.fields``.

    Returns:
        bool: True when the file loads and all three fields are present;
        False when the import fails, loading raises, or a field is missing.
    """
    print("\nTesting Configuration...")

    try:
        from config import CustomerConfig
    except ImportError as e:
        print(f"✗ Failed to import CustomerConfig: {e}")
        return False
    else:
        print("✓ CustomerConfig imported successfully")

    # (field name, message when present, message when absent), checked in
    # this order; the two keyword fields are the ones used for aggregations.
    field_checks = (
        (
            "price",
            "✓ Price field found in configuration",
            "✗ Price field not found in configuration",
        ),
        (
            "categoryName_keyword",
            "✓ Category keyword field found",
            "✗ Category keyword field not found",
        ),
        (
            "brandName_keyword",
            "✓ Brand keyword field found",
            "✗ Brand keyword field not found",
        ),
    )

    try:
        config = CustomerConfig.load_from_file("config/schema/customer1_config.yaml")
        print("✓ Customer1 configuration loaded successfully")

        declared_names = [entry.name for entry in config.fields]
        for wanted, found_msg, missing_msg in field_checks:
            if wanted not in declared_names:
                print(missing_msg)
                return False
            print(found_msg)
    except Exception as e:
        # Any error while loading or reading the config counts as a failure.
        print(f"✗ Failed to load configuration: {e}")
        return False

    return True


def main():
    """Run every test function in order and print a summary.

    A test counts as passed only when it returns a truthy value; a falsy
    return is reported as FAILED and a raised exception as ERROR.

    Returns:
        bool: True when all tests passed, False otherwise.
    """
    print("=== Search Engine Aggregation Functionality Tests ===\n")

    suite = (
        test_es_query_builder_aggregations,
        test_searcher_integration,
        test_api_route_integration,
        test_configuration,
    )
    total = len(suite)
    passed = 0

    for case in suite:
        label = case.__name__
        try:
            outcome = case()
        except Exception as e:
            # One broken test must not stop the remaining ones from running.
            print(f"✗ {label} ERROR: {e}")
            continue

        if outcome:
            passed += 1
            print(f"✓ {label} PASSED")
        else:
            print(f"✗ {label} FAILED")

    print(f"\n=== Test Results: {passed}/{total} tests passed ===")

    all_passed = passed == total
    if all_passed:
        print("🎉 All tests passed! Aggregation functionality is ready.")
    else:
        print("❌ Some tests failed. Please check the implementation.")
    return all_passed


if __name__ == "__main__":
    # Exit status 0 when every test passed, 1 otherwise.
    exit_code = 0 if main() else 1
    sys.exit(exit_code)