#!/usr/bin/env python3
"""
Customer1 data ingestion script.

Loads data from CSV and indexes into Elasticsearch with embeddings.
"""

import sys
import os
import pandas as pd
import argparse
from typing import Optional, Tuple

# Make the project root importable: three dirname() calls on this file's
# absolute path (customer1 -> data -> SearchEngine -> root).
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, project_root)

from config import ConfigLoader
from utils import ESClient, get_connection_from_config
from indexer import DataTransformer, IndexingPipeline
from embeddings import BgeEncoder, CLIPImageEncoder


def load_csv_data(csv_path: str, limit: Optional[int] = None) -> pd.DataFrame:
    """
    Load data from CSV file.

    Args:
        csv_path: Path to CSV file
        limit: Maximum number of rows to keep (None for all). A limit of 0
            yields an empty DataFrame rather than being treated as "no limit".

    Returns:
        DataFrame with data
    """
    print(f"[Ingestion] Loading data from: {csv_path}")

    df = pd.read_csv(csv_path)

    # Compare against None explicitly so that limit=0 is honoured.
    if limit is not None:
        df = df.head(limit)

    print(f"[Ingestion] Loaded {len(df)} rows")
    print(f"[Ingestion] Columns: {df.columns.tolist()}")

    return df


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the ingestion script."""
    parser = argparse.ArgumentParser(description='Ingest customer1 data into Elasticsearch')
    parser.add_argument('--config', default='customer1', help='Customer config name')
    parser.add_argument('--csv', default='data/customer1/goods_with_pic.5years_congku.csv.shuf.1w',
                        help='Path to CSV data file')
    parser.add_argument('--limit', type=int, help='Limit number of documents to index')
    parser.add_argument('--batch-size', type=int, default=100, help='Batch size for processing')
    parser.add_argument('--recreate-index', action='store_true', help='Recreate index if exists')
    parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
    parser.add_argument('--es-username', default=None,
                        help='Elasticsearch username (or set ES_USERNAME env var)')
    parser.add_argument('--es-password', default=None,
                        help='Elasticsearch password (or set ES_PASSWORD env var)')
    parser.add_argument('--skip-embeddings', action='store_true', help='Skip embedding generation')
    return parser


def _resolve_es_credentials(username: Optional[str],
                            password: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """
    Fill in missing Elasticsearch credentials.

    Precedence: explicit values (command-line), then config.env_config,
    then the ES_USERNAME / ES_PASSWORD environment variables.

    Returns:
        (username, password); either may still be None if no source had it.
    """
    if not username or not password:
        try:
            from config.env_config import get_es_config
            es_config = get_es_config()
            username = username or es_config.get('username')
            password = password or es_config.get('password')
        except Exception:
            # Best-effort lookup: env_config may be missing or broken; the
            # environment-variable fallback below covers that case.
            pass

    # Always consult the environment last, so ES_USERNAME / ES_PASSWORD work
    # even when env_config loaded but did not supply a value.
    username = username or os.getenv('ES_USERNAME')
    password = password or os.getenv('ES_PASSWORD')
    return username, password


def _init_encoders():
    """
    Create the text and image encoders.

    Each encoder is initialised independently; a failure is reported and
    leaves that encoder as None so ingestion continues without it.

    Returns:
        (text_encoder, image_encoder)
    """
    text_encoder = None
    image_encoder = None

    try:
        text_encoder = BgeEncoder()
        print("Text encoder initialized")
    except Exception as e:
        print(f"Warning: Failed to initialize text encoder: {e}")
        print("Continuing without text embeddings...")

    try:
        image_encoder = CLIPImageEncoder()
        print("Image encoder initialized")
    except Exception as e:
        print(f"Warning: Failed to initialize image encoder: {e}")
        print("Continuing without image embeddings...")

    return text_encoder, image_encoder


def main() -> int:
    """
    Main ingestion function.

    Returns:
        Process exit code: 0 when the pipeline ran, 1 when configuration
        validation, the Elasticsearch connection, or CSV loading failed.
    """
    args = _build_arg_parser().parse_args()

    print("="*60)
    print("Customer1 Data Ingestion")
    print("="*60)

    # Load configuration
    print(f"\n[1/6] Loading configuration: {args.config}")
    config_loader = ConfigLoader("config/schema")
    config = config_loader.load_customer_config(args.config)

    # Validate configuration
    errors = config_loader.validate_config(config)
    if errors:
        print("Configuration validation failed:")
        for error in errors:
            print(f" - {error}")
        return 1

    print("Configuration loaded successfully")
    print(f" - Index: {config.es_index_name}")
    print(f" - Fields: {len(config.fields)}")
    print(f" - Indexes: {len(config.indexes)}")

    # Initialize Elasticsearch client
    print(f"\n[2/6] Connecting to Elasticsearch: {args.es_host}")
    es_username, es_password = _resolve_es_credentials(args.es_username, args.es_password)

    # Create ES client with credentials if available
    if es_username and es_password:
        print(f" Using authentication: {es_username}")
        es_client = ESClient(hosts=[args.es_host], username=es_username, password=es_password)
    else:
        print(" Warning: No authentication credentials found")
        print(" Attempting connection without authentication (will fail if ES requires auth)")
        es_client = ESClient(hosts=[args.es_host])

    if not es_client.ping():
        print("Failed to connect to Elasticsearch")
        print("\nTroubleshooting:")
        # Show the host that was actually tried, not a hard-coded localhost.
        print(f" 1. Check if Elasticsearch is running: curl {args.es_host}")
        print(" 2. If ES requires authentication, provide credentials:")
        print(" - Use --es-username and --es-password arguments, or")
        print(" - Set ES_USERNAME and ES_PASSWORD environment variables")
        print(" 3. Verify the host URL is correct: --es-host")
        return 1

    print("Connected to Elasticsearch successfully")

    # Load data
    print("\n[3/6] Loading data from CSV")
    try:
        df = load_csv_data(args.csv, limit=args.limit)
    except (OSError, pd.errors.EmptyDataError, pd.errors.ParserError) as e:
        # Report unreadable/malformed input like the other failure paths
        # instead of exiting with a traceback.
        print(f"Failed to load CSV data from {args.csv}: {e}")
        return 1

    # Initialize encoders (if not skipping embeddings)
    text_encoder = None
    image_encoder = None
    if not args.skip_embeddings:
        print("\n[4/6] Initializing embedding encoders")
        print("This may take a few minutes on first run (downloading models)...")
        text_encoder, image_encoder = _init_encoders()
    else:
        print("\n[4/6] Skipping embedding generation (--skip-embeddings)")

    # Initialize data transformer
    print("\n[5/6] Initializing data transformation pipeline")
    transformer = DataTransformer(
        config=config,
        text_encoder=text_encoder,
        image_encoder=image_encoder,
        use_cache=True
    )

    # Run indexing pipeline
    print("\n[6/6] Starting indexing pipeline")
    pipeline = IndexingPipeline(
        config=config,
        es_client=es_client,
        data_transformer=transformer,
        recreate_index=args.recreate_index
    )

    results = pipeline.run(df, batch_size=args.batch_size)

    # Print summary
    print("\n" + "="*60)
    print("Ingestion Complete!")
    print("="*60)
    print(f"Total documents: {results['total']}")
    print(f"Successfully indexed: {results['success']}")
    print(f"Failed: {results['failed']}")
    print(f"Time elapsed: {results['elapsed_time']:.2f}s")
    print(f"Throughput: {results['docs_per_second']:.2f} docs/s")

    if results['errors']:
        print("\nFirst few errors:")
        for error in results['errors'][:5]:
            print(f" - {error}")

    # Verify index
    print("\nVerifying index...")
    doc_count = es_client.count(config.es_index_name)
    print(f"Documents in index: {doc_count}")

    print("\nIngestion completed successfully!")
    return 0


if __name__ == "__main__":
    sys.exit(main())