#!/usr/bin/env python3
"""
Main entry point for saas-search operations.

Provides a unified CLI for common operations:
- serve: Start API service (search + admin routes)
- serve-indexer: Start dedicated Indexer API service
- search: Test search from command line
- build-suggestions: Build/update the suggestion index for a tenant
"""

import sys
import os
import argparse
import json
from typing import Optional, Sequence

# Put the directory containing this file at the front of sys.path before the
# project imports below are resolved.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import get_app_config
from utils import ESClient
from search import Searcher
from suggestion import SuggestionIndexBuilder
from utils.db_connector import create_db_connection
from context.request_context import create_request_context


def _create_es_client(es_host: str) -> ESClient:
    """Build an ESClient for *es_host*, authenticating when credentials exist.

    The username/password are read from the application config. They are only
    passed to ESClient when both are set; otherwise an unauthenticated client
    is created.
    """
    es_cfg = get_app_config().infrastructure.elasticsearch
    if es_cfg.username and es_cfg.password:
        return ESClient(
            hosts=[es_host],
            username=es_cfg.username,
            password=es_cfg.password,
        )
    return ESClient(hosts=[es_host])


def _run_uvicorn_app(args: argparse.Namespace, app_path: str, banner: str, role: str) -> None:
    """Print the startup banner and run the ASGI app at *app_path* with uvicorn.

    Args:
        args: Parsed CLI arguments providing host, port, es_host and reload.
        app_path: Import string of the ASGI application ("module:attribute").
        banner: First line printed before the server starts.
        role: Short description printed in parentheses after the bind address.
    """
    # Imported lazily so the non-serving commands do not need uvicorn.
    import uvicorn

    # The chosen host is exported through the ES_HOST environment variable;
    # presumably the app module reads it at import time -- confirm in api/.
    os.environ['ES_HOST'] = args.es_host

    print(banner)
    print(f" Host: {args.host}:{args.port} ({role})")
    print(f" Elasticsearch: {args.es_host}")

    uvicorn.run(
        app_path,
        host=args.host,
        port=args.port,
        reload=args.reload,
    )


def cmd_serve(args: argparse.Namespace) -> None:
    """Start API service."""
    _run_uvicorn_app(
        args,
        "api.app:app",
        "Starting API service (multi-tenant)...",
        "search + admin routes",
    )


def cmd_serve_indexer(args: argparse.Namespace) -> None:
    """Start dedicated Indexer API service (no search endpoints)."""
    _run_uvicorn_app(
        args,
        "api.indexer_app:app",
        "Starting Indexer API service...",
        "indexer only",
    )


def _format_score(score) -> str:
    """Render a hit score with four decimals, or 'N/A' when it is None.

    Elasticsearch reports a null ``_score`` for hits of queries that sort on a
    field without score tracking, and formatting None with ``:.4f`` raises
    TypeError.
    """
    return "N/A" if score is None else f"{score:.4f}"


def cmd_search(args: argparse.Namespace) -> int:
    """Test search from command line.

    Args:
        args: Parsed CLI arguments (query, tenant_id, es_host, size, json).

    Returns:
        0 on success, 1 when Elasticsearch cannot be reached.
    """
    # NOTE(review): --no-translation and --no-embedding are accepted by the
    # parser but are not read anywhere in this function.
    config = get_app_config().search

    # Use the same (optionally authenticated) client as build-suggestions so
    # both commands behave alike against a secured cluster.
    es_client = _create_es_client(args.es_host)
    if not es_client.ping():
        print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
        return 1

    from query import QueryParser
    query_parser = QueryParser(config)
    searcher = Searcher(es_client, config, query_parser)

    # Execute search
    print(f"Searching for: '{args.query}' (tenant: {args.tenant_id})")
    result = searcher.search(
        query=args.query,
        tenant_id=args.tenant_id,
        size=args.size,
        context=create_request_context(),
    )

    # Display results. The summary lines are printed in JSON mode as well.
    print(f"\nFound {result.total} results in {result.took_ms}ms")
    print(f"Max score: {result.max_score}")

    if args.json:
        print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False))
        return 0

    print(f"\nTop {len(result.hits)} results:")
    for rank, hit in enumerate(result.hits, 1):
        source = hit['_source']
        print(f"\n{rank}. Score: {_format_score(hit['_score'])}")
        print(f" ID: {source.get('skuId', 'N/A')}")
        print(f" Name: {source.get('name', 'N/A')}")
        print(f" Category: {source.get('categoryName', 'N/A')}")
        print(f" Brand: {source.get('brandName', 'N/A')}")

    return 0


def cmd_build_suggestions(args: argparse.Namespace) -> int:
    """Build/update suggestion index for a tenant.

    Runs a full rebuild when ``args.mode`` is "full", otherwise an incremental
    update, and prints the builder's result as JSON.

    Returns:
        0 on success, 1 when Elasticsearch is unreachable or the database
        settings are incomplete.
    """
    # Initialize ES client with optional authentication
    es_client = _create_es_client(args.es_host)
    if not es_client.ping():
        print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
        return 1

    # Database settings are taken from the application config.
    db_cfg = get_app_config().infrastructure.database
    db_host = db_cfg.host
    db_port = db_cfg.port
    db_name = db_cfg.database
    db_user = db_cfg.username
    db_pass = db_cfg.password

    # NOTE(review): the message names DB_PORT, but the port is not part of
    # this check -- confirm whether a missing port should also be rejected.
    if not all([db_host, db_name, db_user, db_pass]):
        print("ERROR: DB_HOST/DB_PORT/DB_DATABASE/DB_USERNAME/DB_PASSWORD must be set in environment")
        return 1

    db_engine = create_db_connection(
        host=db_host,
        port=db_port,
        database=db_name,
        username=db_user,
        password=db_pass,
    )

    builder = SuggestionIndexBuilder(es_client=es_client, db_engine=db_engine)
    if args.mode == "full":
        result = builder.rebuild_tenant_index(
            tenant_id=args.tenant_id,
            days=args.days,
            batch_size=args.batch_size,
            min_query_len=args.min_query_len,
            publish_alias=args.publish_alias,
            keep_versions=args.keep_versions,
        )
    else:
        result = builder.incremental_update_tenant_index(
            tenant_id=args.tenant_id,
            min_query_len=args.min_query_len,
            fallback_days=args.incremental_fallback_days,
            overlap_minutes=args.overlap_minutes,
            bootstrap_if_missing=args.bootstrap_if_missing,
            bootstrap_days=args.bootstrap_days,
            batch_size=args.batch_size,
        )

    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0


# Maps each sub-command name to the function that implements it.
_COMMAND_HANDLERS = {
    'serve': cmd_serve,
    'serve-indexer': cmd_serve_indexer,
    'search': cmd_search,
    'build-suggestions': cmd_build_suggestions,
}


def _add_server_arguments(sub_parser: argparse.ArgumentParser, default_port: int, default_es_host) -> None:
    """Register the options shared by the two serve commands."""
    sub_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    sub_parser.add_argument('--port', type=int, default=default_port, help='Port to bind to')
    sub_parser.add_argument('--es-host', default=default_es_host, help='Elasticsearch host')
    sub_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')


def _build_parser() -> argparse.ArgumentParser:
    """Create the top-level argument parser with all sub-commands."""
    # Read the configured Elasticsearch host once; it is the default for the
    # --es-host option of every sub-command.
    default_es_host = get_app_config().infrastructure.elasticsearch.host

    parser = argparse.ArgumentParser(
        description='saas-search - E-Commerce Search SaaS',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    subparsers = parser.add_subparsers(dest='command', help='Command to execute')

    # Serve command
    serve_parser = subparsers.add_parser('serve', help='Start API service (multi-tenant)')
    _add_server_arguments(serve_parser, 6002, default_es_host)

    # Serve-indexer command
    serve_indexer_parser = subparsers.add_parser(
        'serve-indexer',
        help='Start dedicated Indexer API service (indexer routes only)'
    )
    _add_server_arguments(serve_indexer_parser, 6004, default_es_host)

    # Search command
    search_parser = subparsers.add_parser('search', help='Test search from command line')
    search_parser.add_argument('query', help='Search query')
    search_parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
    search_parser.add_argument('--es-host', default=default_es_host, help='Elasticsearch host')
    search_parser.add_argument('--size', type=int, default=10, help='Number of results')
    search_parser.add_argument('--no-translation', action='store_true', help='Disable translation')
    search_parser.add_argument('--no-embedding', action='store_true', help='Disable embeddings')
    search_parser.add_argument('--json', action='store_true', help='Output JSON')

    # Suggestion build command
    suggest_build_parser = subparsers.add_parser(
        'build-suggestions',
        help='Build tenant suggestion index (full/incremental)'
    )
    suggest_build_parser.add_argument('--tenant-id', required=True, help='Tenant ID')
    suggest_build_parser.add_argument('--es-host', default=default_es_host, help='Elasticsearch host')
    suggest_build_parser.add_argument(
        '--mode',
        choices=['full', 'incremental'],
        default='full',
        help='Build mode: full rebuild or incremental update',
    )
    suggest_build_parser.add_argument('--days', type=int, default=360, help='Query log lookback days')
    suggest_build_parser.add_argument('--batch-size', type=int, default=500, help='Product scan batch size')
    suggest_build_parser.add_argument('--min-query-len', type=int, default=1, help='Minimum query length')
    suggest_build_parser.add_argument(
        '--publish-alias',
        action=argparse.BooleanOptionalAction,
        default=True,
        help='For full mode: publish alias to new versioned index (default: true)',
    )
    suggest_build_parser.add_argument(
        '--keep-versions',
        type=int,
        default=2,
        help='For full mode: keep latest N versioned indices',
    )
    suggest_build_parser.add_argument(
        '--incremental-fallback-days',
        type=int,
        default=7,
        help='For incremental mode: default lookback days when no watermark',
    )
    suggest_build_parser.add_argument(
        '--overlap-minutes',
        type=int,
        default=30,
        help='For incremental mode: overlap window to avoid late-arrival misses',
    )
    suggest_build_parser.add_argument(
        '--bootstrap-if-missing',
        action=argparse.BooleanOptionalAction,
        default=True,
        help='For incremental mode: bootstrap with full build when active index is missing',
    )
    suggest_build_parser.add_argument(
        '--bootstrap-days',
        type=int,
        default=30,
        help='For incremental mode bootstrap full build: query log lookback days',
    )

    return parser


def main(argv: Optional[Sequence[str]] = None):
    """Main CLI entry point.

    Args:
        argv: Argument list to parse. Defaults to None, in which case argparse
            reads ``sys.argv[1:]``.

    Returns:
        The selected command's return value (used as the process exit status),
        or 1 when no command was given.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    if not args.command:
        parser.print_help()
        return 1

    # Execute command
    handler = _COMMAND_HANDLERS.get(args.command)
    if handler is None:
        # argparse already rejects unknown sub-commands; kept as a safeguard.
        print(f"Unknown command: {args.command}")
        return 1
    return handler(args)


if __name__ == "__main__":
    sys.exit(main())