#!/usr/bin/env python3
"""
Main entry point for SearchEngine operations.

Provides a unified CLI for common operations:
- ingest: Ingest data into Elasticsearch
- serve: Start API service
- search: Test search from command line
"""

import sys
import os
import argparse
import json
import pandas as pd
import uvicorn

# Put the directory that contains this file at the front of sys.path so the
# project-local imports below are looked up there first.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import ConfigLoader
from utils import ESClient
from indexer import DataTransformer, IndexingPipeline
from embeddings import BgeEncoder, CLIPImageEncoder
from search import Searcher


def cmd_ingest(args):
    """Ingest a CSV file into Elasticsearch.

    Loads ``config/config.yaml``, verifies that the cluster at
    ``args.es_host`` answers a ping, reads ``args.csv_file`` with pandas and
    runs the resulting frame through an ``IndexingPipeline``.

    Args:
        args: Parsed CLI namespace. Reads ``es_host``, ``csv_file``,
            ``limit``, ``skip_embeddings``, ``recreate`` and ``batch_size``.

    Returns:
        1 if Elasticsearch does not answer the ping, otherwise 0 once the
        pipeline has run and its summary has been printed.
    """
    print("Starting data ingestion")

    settings = ConfigLoader("config/config.yaml").load_config()

    # Bail out early when the cluster is unreachable.
    client = ESClient(hosts=[args.es_host])
    if not client.ping():
        print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
        return 1

    frame = pd.read_csv(args.csv_file)
    # A falsy limit (None or 0) keeps the whole file.
    if args.limit:
        frame = frame.head(args.limit)
    print(f"Loaded {len(frame)} documents")

    # --skip-embeddings disables both the text and the image encoder.
    if args.skip_embeddings:
        text_encoder = None
        image_encoder = None
    else:
        text_encoder = BgeEncoder()
        image_encoder = CLIPImageEncoder()

    pipeline = IndexingPipeline(
        settings,
        client,
        DataTransformer(settings, text_encoder, image_encoder, use_cache=True),
        recreate_index=args.recreate,
    )
    stats = pipeline.run(frame, batch_size=args.batch_size)

    print("\nIngestion complete:")
    print(f"  Success: {stats['success']}")
    print(f"  Failed: {stats['failed']}")
    print(f"  Time: {stats['elapsed_time']:.2f}s")

    return 0


def cmd_serve(args):
    """Start the API service under uvicorn.

    Stores ``args.es_host`` in the ``ES_HOST`` environment variable, prints
    the bind address and Elasticsearch host, then runs ``api.app:app`` with
    uvicorn. The call blocks until ``uvicorn.run`` returns.

    Args:
        args: Parsed CLI namespace. Reads ``es_host``, ``host``, ``port``
            and ``reload``.

    Returns:
        0 after ``uvicorn.run`` returns, so this handler reports an integer
        status like ``cmd_ingest`` and ``cmd_search`` do.
    """
    # Presumably api.app picks the Elasticsearch host up from ES_HOST;
    # it must be set before uvicorn imports the app — TODO confirm in api.app.
    os.environ['ES_HOST'] = args.es_host

    print("Starting API service (multi-tenant)...")
    print(f"  Host: {args.host}:{args.port}")
    print(f"  Elasticsearch: {args.es_host}")

    uvicorn.run(
        "api.app:app",
        host=args.host,
        port=args.port,
        reload=args.reload
    )

    # Explicit success status; previously the function fell off the end and
    # returned None (which sys.exit also maps to exit code 0).
    return 0


def cmd_search(args):
    """Run a single search from the command line and print the results.

    Loads ``config/config.yaml``, verifies that the cluster at
    ``args.es_host`` answers a ping, executes the query through ``Searcher``
    and prints either the full result as JSON (``args.json``) or a short
    per-hit summary.

    Args:
        args: Parsed CLI namespace. Reads ``es_host``, ``query``,
            ``tenant_id``, ``size`` and ``json``.

    Returns:
        1 if Elasticsearch does not answer the ping, otherwise 0.
    """
    # Load config
    config_loader = ConfigLoader("config/config.yaml")
    config = config_loader.load_config()

    # Initialize ES and searcher
    es_client = ESClient(hosts=[args.es_host])
    if not es_client.ping():
        print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
        return 1

    # Imported here rather than at module level, so the query package is only
    # loaded when the search command actually runs.
    from query import QueryParser
    query_parser = QueryParser(config)
    searcher = Searcher(config, es_client, query_parser)

    # NOTE(review): args.no_translation and args.no_embedding are accepted by
    # the CLI but are not forwarded to searcher.search() here — confirm
    # whether Searcher.search takes matching options and wire them through.
    print(f"Searching for: '{args.query}' (tenant: {args.tenant_id})")
    result = searcher.search(
        query=args.query,
        tenant_id=args.tenant_id,
        size=args.size
    )

    # Display results
    print(f"\nFound {result.total} results in {result.took_ms}ms")
    print(f"Max score: {result.max_score}")

    if args.json:
        print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False))
    else:
        print(f"\nTop {len(result.hits)} results:")
        for rank, hit in enumerate(result.hits, 1):
            source = hit['_source']
            score = hit['_score']
            # Elasticsearch reports a null _score for hits that were sorted
            # by a field instead of relevance; formatting None with ".4f"
            # would raise TypeError, so show a placeholder instead.
            score_text = "N/A" if score is None else f"{score:.4f}"
            print(f"\n{rank}. Score: {score_text}")
            print(f"   ID: {source.get('skuId', 'N/A')}")
            print(f"   Name: {source.get('name', 'N/A')}")
            print(f"   Category: {source.get('categoryName', 'N/A')}")
            print(f"   Brand: {source.get('brandName', 'N/A')}")

    return 0


def main(argv=None):
    """Main CLI entry point.

    Builds the ``ingest`` / ``serve`` / ``search`` sub-command parser, parses
    the arguments and dispatches to the matching ``cmd_*`` handler.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``, which is the
            behaviour when the script is run from the command line.

    Returns:
        Whatever the selected handler returns, or 1 when no sub-command was
        given (help is printed) or the command is not recognised.
    """
    default_es_host = 'http://localhost:9200'

    parser = argparse.ArgumentParser(
        description='SearchEngine - E-Commerce Search SaaS',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    subparsers = parser.add_subparsers(dest='command', help='Command to execute')

    # Ingest command
    ingest_parser = subparsers.add_parser('ingest', help='Ingest data into Elasticsearch')
    ingest_parser.add_argument('csv_file', help='Path to CSV data file')
    ingest_parser.add_argument('--es-host', default=default_es_host, help='Elasticsearch host')
    ingest_parser.add_argument('--limit', type=int, help='Limit number of documents')
    ingest_parser.add_argument('--batch-size', type=int, default=100, help='Batch size')
    ingest_parser.add_argument('--recreate', action='store_true', help='Recreate index')
    ingest_parser.add_argument('--skip-embeddings', action='store_true', help='Skip embeddings')

    # Serve command
    serve_parser = subparsers.add_parser('serve', help='Start API service (multi-tenant)')
    serve_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    serve_parser.add_argument('--port', type=int, default=6002, help='Port to bind to')
    serve_parser.add_argument('--es-host', default=default_es_host, help='Elasticsearch host')
    serve_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')

    # Search command
    search_parser = subparsers.add_parser('search', help='Test search from command line')
    search_parser.add_argument('query', help='Search query')
    search_parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
    search_parser.add_argument('--es-host', default=default_es_host, help='Elasticsearch host')
    search_parser.add_argument('--size', type=int, default=10, help='Number of results')
    search_parser.add_argument('--no-translation', action='store_true', help='Disable translation')
    search_parser.add_argument('--no-embedding', action='store_true', help='Disable embeddings')
    search_parser.add_argument('--json', action='store_true', help='Output JSON')

    args = parser.parse_args(argv)

    # The sub-command is optional at the argparse level, so a bare invocation
    # lands here with command=None: show help and report failure.
    if not args.command:
        parser.print_help()
        return 1

    # Built after the early return so the handlers are only looked up when a
    # command is actually going to be dispatched.
    handlers = {
        'ingest': cmd_ingest,
        'serve': cmd_serve,
        'search': cmd_search,
    }
    handler = handlers.get(args.command)
    if handler is None:
        # Defensive: argparse already rejects names that are not registered.
        print(f"Unknown command: {args.command}")
        return 1
    return handler(args)


# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())