#!/usr/bin/env python3
"""
Customer1 data ingestion script.
Loads data from CSV and indexes into Elasticsearch with embeddings.
"""
import sys
import os
import argparse
from typing import Optional

import pandas as pd

# Add parent directory to path so sibling packages resolve when run as a script
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config import ConfigLoader
from utils import ESClient, get_connection_from_config
from indexer import DataTransformer, IndexingPipeline
from embeddings import BgeEncoder, CLIPImageEncoder


def load_csv_data(csv_path: str, limit: Optional[int] = None) -> pd.DataFrame:
    """
    Load data from a CSV file.

    Args:
        csv_path: Path to the CSV file
        limit: Maximum number of rows to load (None for all)

    Returns:
        DataFrame with the loaded data
    """
    print(f"[Ingestion] Loading data from: {csv_path}")
    df = pd.read_csv(csv_path)
    if limit is not None:
        df = df.head(limit)
    print(f"[Ingestion] Loaded {len(df)} rows")
    print(f"[Ingestion] Columns: {df.columns.tolist()}")
    return df


def main():
    """Main ingestion function."""
    parser = argparse.ArgumentParser(description='Ingest customer1 data into Elasticsearch')
    parser.add_argument('--config', default='customer1',
                        help='Customer config name')
    parser.add_argument('--csv', default='data/customer1/goods_with_pic.5years_congku.csv.shuf.1w',
                        help='Path to CSV data file')
    parser.add_argument('--limit', type=int,
                        help='Limit number of documents to index')
    parser.add_argument('--batch-size', type=int, default=100,
                        help='Batch size for processing')
    parser.add_argument('--recreate-index', action='store_true',
                        help='Recreate index if it exists')
    parser.add_argument('--es-host', default='http://localhost:9200',
                        help='Elasticsearch host')
    parser.add_argument('--skip-embeddings', action='store_true',
                        help='Skip embedding generation')
    args = parser.parse_args()

    print("=" * 60)
    print("Customer1 Data Ingestion")
    print("=" * 60)

    # Load and validate configuration
    print(f"\n[1/6] Loading configuration: {args.config}")
    config_loader = ConfigLoader("config/schema")
    config = config_loader.load_customer_config(args.config)

    errors = config_loader.validate_config(config)
    if errors:
        print("Configuration validation failed:")
        for error in errors:
            print(f"  - {error}")
        return 1
    print("Configuration loaded successfully")
    print(f"  - Index: {config.es_index_name}")
    print(f"  - Fields: {len(config.fields)}")
    print(f"  - Indexes: {len(config.indexes)}")

    # Initialize Elasticsearch client and verify connectivity
    print(f"\n[2/6] Connecting to Elasticsearch: {args.es_host}")
    os.environ['ES_HOST'] = args.es_host
    es_client = ESClient(hosts=[args.es_host])
    if not es_client.ping():
        print("Failed to connect to Elasticsearch")
        return 1
    print("Connected to Elasticsearch successfully")

    # Load data
    print("\n[3/6] Loading data from CSV")
    df = load_csv_data(args.csv, limit=args.limit)

    # Initialize encoders (unless embeddings are skipped); each encoder is
    # optional, so a failure degrades to indexing without that embedding type
    text_encoder = None
    image_encoder = None
    if not args.skip_embeddings:
        print("\n[4/6] Initializing embedding encoders")
        print("This may take a few minutes on first run (downloading models)...")
        try:
            text_encoder = BgeEncoder()
            print("Text encoder initialized")
        except Exception as e:
            print(f"Warning: Failed to initialize text encoder: {e}")
            print("Continuing without text embeddings...")
        try:
            image_encoder = CLIPImageEncoder()
            print("Image encoder initialized")
        except Exception as e:
            print(f"Warning: Failed to initialize image encoder: {e}")
            print("Continuing without image embeddings...")
    else:
        print("\n[4/6] Skipping embedding generation (--skip-embeddings)")

    # Initialize data transformer
    print("\n[5/6] Initializing data transformation pipeline")
    transformer = DataTransformer(
        config=config,
        text_encoder=text_encoder,
        image_encoder=image_encoder,
        use_cache=True
    )

    # Run indexing pipeline
    print("\n[6/6] Starting indexing pipeline")
    pipeline = IndexingPipeline(
        config=config,
        es_client=es_client,
        data_transformer=transformer,
        recreate_index=args.recreate_index
    )
    results = pipeline.run(df, batch_size=args.batch_size)

    # Print summary
    print("\n" + "=" * 60)
    print("Ingestion Complete!")
    print("=" * 60)
    print(f"Total documents: {results['total']}")
    print(f"Successfully indexed: {results['success']}")
    print(f"Failed: {results['failed']}")
    print(f"Time elapsed: {results['elapsed_time']:.2f}s")
    print(f"Throughput: {results['docs_per_second']:.2f} docs/s")

    if results['errors']:
        print("\nFirst few errors:")
        for error in results['errors'][:5]:
            print(f"  - {error}")

    # Verify index document count against Elasticsearch
    print("\nVerifying index...")
    doc_count = es_client.count(config.es_index_name)
    print(f"Documents in index: {doc_count}")

    print("\nIngestion completed successfully!")
    return 0


if __name__ == "__main__":
    sys.exit(main())