#!/usr/bin/env python3
"""
Main entry point for SearchEngine operations.

Provides a unified CLI for common operations:
- ingest: Ingest data into Elasticsearch
- serve: Start API service
- search: Test search from command line
"""

import sys
import os
import argparse
import json

import pandas as pd
import uvicorn

# Put this file's own directory first on the import path; the project imports
# below are expected to resolve from there.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import ConfigLoader
from utils import ESClient
from indexer import DataTransformer, IndexingPipeline
from embeddings import BgeEncoder, CLIPImageEncoder
from search import Searcher

# Path handed to ConfigLoader by the ingest and search commands. It is a
# relative path, so it is resolved against the current working directory.
CONFIG_PATH = "config/config.yaml"

# Default for the --es-host option shared by every sub-command.
DEFAULT_ES_HOST = 'http://localhost:9200'


def _load_config():
    """Load and return the configuration found at CONFIG_PATH."""
    return ConfigLoader(CONFIG_PATH).load_config()


def _connect_es(es_host):
    """Create an ESClient for ``es_host`` and check that it answers a ping.

    Returns:
        The client when the ping succeeds; otherwise ``None``, after an
        error message has been printed.
    """
    es_client = ESClient(hosts=[es_host])
    if not es_client.ping():
        print(f"ERROR: Cannot connect to Elasticsearch at {es_host}")
        return None
    return es_client


def cmd_ingest(args):
    """Run data ingestion.

    Reads ``args.csv_file`` into a DataFrame, optionally truncates it to
    ``args.limit`` rows, and runs it through the indexing pipeline.

    Args:
        args: Parsed namespace of the ``ingest`` sub-command (csv_file,
            es_host, limit, batch_size, recreate, skip_embeddings).

    Returns:
        0 on success, 1 if Elasticsearch cannot be reached.
    """
    print("Starting data ingestion")

    config = _load_config()

    es_client = _connect_es(args.es_host)
    if es_client is None:
        return 1

    # Load data. A limit of 0 (or no --limit at all) means "no limit".
    df = pd.read_csv(args.csv_file)
    if args.limit:
        df = df.head(args.limit)
    print(f"Loaded {len(df)} documents")

    # With --skip-embeddings neither encoder is constructed; the transformer
    # receives None for both.
    if args.skip_embeddings:
        text_encoder = None
        image_encoder = None
    else:
        text_encoder = BgeEncoder()
        image_encoder = CLIPImageEncoder()

    # Transform and index
    transformer = DataTransformer(config, text_encoder, image_encoder, use_cache=True)
    pipeline = IndexingPipeline(
        config, es_client, transformer, recreate_index=args.recreate
    )
    results = pipeline.run(df, batch_size=args.batch_size)

    print("\nIngestion complete:")
    print(f" Success: {results['success']}")
    print(f" Failed: {results['failed']}")
    print(f" Time: {results['elapsed_time']:.2f}s")
    return 0


def cmd_serve(args):
    """Start API service.

    Exports ``args.es_host`` as the ``ES_HOST`` environment variable and then
    runs the ``api.app:app`` application under uvicorn.

    Args:
        args: Parsed namespace of the ``serve`` sub-command (host, port,
            es_host, reload).

    Returns:
        0 once ``uvicorn.run`` returns.
    """
    # Set before uvicorn starts so it is already in the environment when
    # "api.app:app" is loaded.
    os.environ['ES_HOST'] = args.es_host

    print("Starting API service (multi-tenant)...")
    print(f" Host: {args.host}:{args.port}")
    print(f" Elasticsearch: {args.es_host}")

    uvicorn.run(
        "api.app:app",
        host=args.host,
        port=args.port,
        reload=args.reload
    )
    # Explicit status, consistent with the other commands (previously an
    # implicit None, which sys.exit() also treats as success).
    return 0


def cmd_search(args):
    """Test search from command line.

    Runs ``args.query`` for ``args.tenant_id`` through Searcher and prints
    either the full result as JSON or a short per-hit summary.

    Args:
        args: Parsed namespace of the ``search`` sub-command (query,
            tenant_id, es_host, size, json, no_translation, no_embedding).

    Returns:
        0 on success, 1 if Elasticsearch cannot be reached.
    """
    config = _load_config()

    es_client = _connect_es(args.es_host)
    if es_client is None:
        return 1

    from query import QueryParser
    query_parser = QueryParser(config)
    searcher = Searcher(config, es_client, query_parser)

    # NOTE(review): --no-translation and --no-embedding are accepted by the
    # parser but are not forwarded to the searcher here -- confirm whether
    # Searcher.search() has matching options and wire them through.
    print(f"Searching for: '{args.query}' (tenant: {args.tenant_id})")
    result = searcher.search(
        query=args.query,
        tenant_id=args.tenant_id,
        size=args.size
    )

    # Display results
    print(f"\nFound {result.total} results in {result.took_ms}ms")
    print(f"Max score: {result.max_score}")

    if args.json:
        print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False))
        return 0

    print(f"\nTop {len(result.hits)} results:")
    for i, hit in enumerate(result.hits, 1):
        source = hit['_source']
        score = hit['_score']
        # A hit may carry a null score (Elasticsearch does this for sorted
        # queries); formatting None with ':.4f' would raise TypeError.
        score_text = "N/A" if score is None else f"{score:.4f}"
        print(f"\n{i}. Score: {score_text}")
        print(f" ID: {source.get('skuId', 'N/A')}")
        print(f" Name: {source.get('name', 'N/A')}")
        print(f" Category: {source.get('categoryName', 'N/A')}")
        print(f" Brand: {source.get('brandName', 'N/A')}")

    return 0


# Sub-command name -> handler; each handler takes the parsed namespace and
# returns a process exit status.
_COMMANDS = {
    'ingest': cmd_ingest,
    'serve': cmd_serve,
    'search': cmd_search,
}


def build_parser():
    """Build the argument parser with the ingest, serve and search sub-commands."""
    parser = argparse.ArgumentParser(
        description='SearchEngine - E-Commerce Search SaaS',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    subparsers = parser.add_subparsers(dest='command', help='Command to execute')

    # Ingest command
    ingest_parser = subparsers.add_parser('ingest', help='Ingest data into Elasticsearch')
    ingest_parser.add_argument('csv_file', help='Path to CSV data file')
    ingest_parser.add_argument('--es-host', default=DEFAULT_ES_HOST, help='Elasticsearch host')
    ingest_parser.add_argument('--limit', type=int, help='Limit number of documents')
    ingest_parser.add_argument('--batch-size', type=int, default=100, help='Batch size')
    ingest_parser.add_argument('--recreate', action='store_true', help='Recreate index')
    ingest_parser.add_argument('--skip-embeddings', action='store_true', help='Skip embeddings')

    # Serve command
    serve_parser = subparsers.add_parser('serve', help='Start API service (multi-tenant)')
    serve_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    serve_parser.add_argument('--port', type=int, default=6002, help='Port to bind to')
    serve_parser.add_argument('--es-host', default=DEFAULT_ES_HOST, help='Elasticsearch host')
    serve_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')

    # Search command
    search_parser = subparsers.add_parser('search', help='Test search from command line')
    search_parser.add_argument('query', help='Search query')
    search_parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
    search_parser.add_argument('--es-host', default=DEFAULT_ES_HOST, help='Elasticsearch host')
    search_parser.add_argument('--size', type=int, default=10, help='Number of results')
    search_parser.add_argument('--no-translation', action='store_true', help='Disable translation')
    search_parser.add_argument('--no-embedding', action='store_true', help='Disable embeddings')
    search_parser.add_argument('--json', action='store_true', help='Output JSON')

    return parser


def main(argv=None):
    """Main CLI entry point.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, which is what the script entry
            point relies on; passing a list allows programmatic invocation.

    Returns:
        The exit status of the selected command, or 1 when no command was
        given (the help text is printed in that case).
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    if not args.command:
        parser.print_help()
        return 1

    handler = _COMMANDS.get(args.command)
    if handler is None:
        # Defensive only: argparse already rejects sub-commands it does not
        # know, so this is reached only if a parser is added without a handler.
        print(f"Unknown command: {args.command}")
        return 1
    return handler(args)


if __name__ == "__main__":
    sys.exit(main())