#!/usr/bin/env python3
"""
Customer1 data ingestion script.

Loads data from CSV and indexes into Elasticsearch with embeddings.
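
Example usage (run from the project root; flags per the argparse options in main()):

    python data/customer1/ingest_customer1.py --limit 1000 --recreate-index
    python data/customer1/ingest_customer1.py --skip-embeddings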
"""

import sys
import os
import pandas as pd
import argparse
from typing import Optional

# Add the project root to sys.path (three levels up from this file: customer1 -> data -> SearchEngine root)
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, project_root)

from config import ConfigLoader
from utils import ESClient, get_connection_from_config
from indexer import DataTransformer, IndexingPipeline
from embeddings import BgeEncoder, CLIPImageEncoder


def load_csv_data(csv_path: str, limit: Optional[int] = None) -> pd.DataFrame:
    """
    Load data from CSV file.

    Args:
        csv_path: Path to CSV file
        limit: Maximum number of rows to load (None for all)

    Returns:
        DataFrame with data
    """
    print(f"[Ingestion] Loading data from: {csv_path}")

    df = pd.read_csv(csv_path)

    if limit is not None:
        df = df.head(limit)

    print(f"[Ingestion] Loaded {len(df)} rows")
    print(f"[Ingestion] Columns: {df.columns.tolist()}")

    return df


def main():
    """Main ingestion function."""
    parser = argparse.ArgumentParser(description='Ingest customer1 data into Elasticsearch')
    parser.add_argument('--config', default='customer1', help='Customer config name')
    parser.add_argument('--csv', default='data/customer1/goods_with_pic.5years_congku.csv.shuf.1w',
                       help='Path to CSV data file')
    parser.add_argument('--limit', type=int, help='Limit number of documents to index')
    parser.add_argument('--batch-size', type=int, default=100, help='Batch size for processing')
    parser.add_argument('--recreate-index', action='store_true', help='Recreate index if exists')
    parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
    parser.add_argument('--es-username', default=None, help='Elasticsearch username (or set ES_USERNAME env var)')
    parser.add_argument('--es-password', default=None, help='Elasticsearch password (or set ES_PASSWORD env var)')
    parser.add_argument('--skip-embeddings', action='store_true', help='Skip embedding generation')
    args = parser.parse_args()

    print("="*60)
    print("Customer1 Data Ingestion")
    print("="*60)

    # Load configuration
    print(f"\n[1/6] Loading configuration: {args.config}")
    config_loader = ConfigLoader("config/schema")
    config = config_loader.load_customer_config(args.config)

    # Validate configuration
    errors = config_loader.validate_config(config)
    if errors:
        print(f"Configuration validation failed:")
        for error in errors:
            print(f"  - {error}")
        return 1

    print(f"Configuration loaded successfully")
    print(f"  - Index: {config.es_index_name}")
    print(f"  - Fields: {len(config.fields)}")
    print(f"  - Indexes: {len(config.indexes)}")

    # Initialize Elasticsearch client
    print(f"\n[2/6] Connecting to Elasticsearch: {args.es_host}")
    
    # Get credentials: prioritize command-line args, then the .env file (via
    # env_config), falling back to raw environment variables
    es_username = args.es_username
    es_password = args.es_password
    
    # If not provided via args, try to load from .env file via env_config
    if not es_username or not es_password:
        try:
            from config.env_config import get_es_config
            es_config = get_es_config()
            es_username = es_username or es_config.get('username')
            es_password = es_password or es_config.get('password')
        except Exception:
            # Fallback to environment variables
            es_username = es_username or os.getenv('ES_USERNAME')
            es_password = es_password or os.getenv('ES_PASSWORD')
    
    # Create ES client with credentials if available
    if es_username and es_password:
        print(f"  Using authentication: {es_username}")
        es_client = ESClient(hosts=[args.es_host], username=es_username, password=es_password)
    else:
        print(f"  Warning: No authentication credentials found")
        print(f"  Attempting connection without authentication (will fail if ES requires auth)")
        es_client = ESClient(hosts=[args.es_host])

    if not es_client.ping():
        print("Failed to connect to Elasticsearch")
        print("\nTroubleshooting:")
        print("  1. Check if Elasticsearch is running: curl http://localhost:9200")
        print("  2. If ES requires authentication, provide credentials:")
        print("     - Use --es-username and --es-password arguments, or")
        print("     - Set ES_USERNAME and ES_PASSWORD environment variables")
        print("  3. Verify the host URL is correct: --es-host")
        return 1

    print("Connected to Elasticsearch successfully")

    # Load data
    print(f"\n[3/6] Loading data from CSV")
    df = load_csv_data(args.csv, limit=args.limit)
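
    # Guard against an empty CSV (or --limit 0) before loading encoders.
    if df.empty:
        print("No rows to ingest; exiting")
        return 1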

    # Initialize encoders (if not skipping embeddings)
    text_encoder = None
    image_encoder = None

    if not args.skip_embeddings:
        print(f"\n[4/6] Initializing embedding encoders")
        print("This may take a few minutes on first run (downloading models)...")

        try:
            text_encoder = BgeEncoder()
            print("Text encoder initialized")
        except Exception as e:
            print(f"Warning: Failed to initialize text encoder: {e}")
            print("Continuing without text embeddings...")

        try:
            image_encoder = CLIPImageEncoder()
            print("Image encoder initialized")
        except Exception as e:
            print(f"Warning: Failed to initialize image encoder: {e}")
            print("Continuing without image embeddings...")
    else:
        print(f"\n[4/6] Skipping embedding generation (--skip-embeddings)")

    # Initialize data transformer
    print(f"\n[5/6] Initializing data transformation pipeline")
    transformer = DataTransformer(
        config=config,
        text_encoder=text_encoder,
        image_encoder=image_encoder,
        use_cache=True
    )

    # Run indexing pipeline
    print(f"\n[6/6] Starting indexing pipeline")
    pipeline = IndexingPipeline(
        config=config,
        es_client=es_client,
        data_transformer=transformer,
        recreate_index=args.recreate_index
    )

    results = pipeline.run(df, batch_size=args.batch_size)

    # Print summary
    print("\n" + "="*60)
    print("Ingestion Complete!")
    print("="*60)
    print(f"Total documents: {results['total']}")
    print(f"Successfully indexed: {results['success']}")
    print(f"Failed: {results['failed']}")
    print(f"Time elapsed: {results['elapsed_time']:.2f}s")
    print(f"Throughput: {results['docs_per_second']:.2f} docs/s")

    if results['errors']:
        print(f"\nFirst few errors:")
        for error in results['errors'][:5]:
            print(f"  - {error}")

    # Verify index
    print(f"\nVerifying index...")
    doc_count = es_client.count(config.es_index_name)
    print(f"Documents in index: {doc_count}")

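    # Surface partial failures through the exit code so callers can detect them.
    if results['failed'] > 0:
        print("\nIngestion finished with errors; see the summary above")
        return 1
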
    print("\nIngestion completed successfully!")
    return 0


if __name__ == "__main__":
    sys.exit(main())