#!/usr/bin/env python3
"""
Customer1 data ingestion script.

Loads data from CSV and indexes into Elasticsearch with embeddings.
"""

import argparse
import os
import sys
from typing import Optional

import pandas as pd

# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config import ConfigLoader
from utils import ESClient, get_connection_from_config
from indexer import DataTransformer, IndexingPipeline
from embeddings import BgeEncoder, CLIPImageEncoder


def load_csv_data(csv_path: str, limit: Optional[int] = None) -> pd.DataFrame:
    """
    Load data from CSV file.

    Args:
        csv_path: Path to CSV file
        limit: Maximum number of rows to load (None for all)

    Returns:
        DataFrame with data
    """
    print(f"[Ingestion] Loading data from: {csv_path}")

    df = pd.read_csv(csv_path)

    if limit is not None:
        df = df.head(limit)

    print(f"[Ingestion] Loaded {len(df)} rows")
    print(f"[Ingestion] Columns: {df.columns.tolist()}")

    return df


def main():
    """Main ingestion function."""
    parser = argparse.ArgumentParser(description='Ingest customer1 data into Elasticsearch')
    parser.add_argument('--config', default='customer1', help='Customer config name')
    parser.add_argument('--csv', default='data/customer1/goods_with_pic.5years_congku.csv.shuf.1w',
                       help='Path to CSV data file')
    parser.add_argument('--limit', type=int, help='Limit number of documents to index')
    parser.add_argument('--batch-size', type=int, default=100, help='Batch size for processing')
    parser.add_argument('--recreate-index', action='store_true', help='Recreate index if exists')
    parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
    parser.add_argument('--skip-embeddings', action='store_true', help='Skip embedding generation')
    args = parser.parse_args()

    print("="*60)
    print("Customer1 Data Ingestion")
    print("="*60)

    # Load configuration
    print(f"\n[1/6] Loading configuration: {args.config}")
    config_loader = ConfigLoader("config/schema")
    config = config_loader.load_customer_config(args.config)

    # Validate configuration
    errors = config_loader.validate_config(config)
    if errors:
        print(f"Configuration validation failed:")
        for error in errors:
            print(f"  - {error}")
        return 1

    print(f"Configuration loaded successfully")
    print(f"  - Index: {config.es_index_name}")
    print(f"  - Fields: {len(config.fields)}")
    print(f"  - Indexes: {len(config.indexes)}")

    # Initialize Elasticsearch client
    print(f"\n[2/6] Connecting to Elasticsearch: {args.es_host}")
    os.environ['ES_HOST'] = args.es_host
    es_client = ESClient(hosts=[args.es_host])

    if not es_client.ping():
        print("Failed to connect to Elasticsearch")
        return 1

    print("Connected to Elasticsearch successfully")

    # Load data
    print(f"\n[3/6] Loading data from CSV")
    df = load_csv_data(args.csv, limit=args.limit)

    # Initialize encoders (if not skipping embeddings)
    text_encoder = None
    image_encoder = None

    if not args.skip_embeddings:
        print(f"\n[4/6] Initializing embedding encoders")
        print("This may take a few minutes on first run (downloading models)...")

        try:
            text_encoder = BgeEncoder()
            print("Text encoder initialized")
        except Exception as e:
            print(f"Warning: Failed to initialize text encoder: {e}")
            print("Continuing without text embeddings...")

        try:
            image_encoder = CLIPImageEncoder()
            print("Image encoder initialized")
        except Exception as e:
            print(f"Warning: Failed to initialize image encoder: {e}")
            print("Continuing without image embeddings...")
    else:
        print(f"\n[4/6] Skipping embedding generation (--skip-embeddings)")

    # Initialize data transformer
    print(f"\n[5/6] Initializing data transformation pipeline")
    transformer = DataTransformer(
        config=config,
        text_encoder=text_encoder,
        image_encoder=image_encoder,
        use_cache=True
    )

    # Run indexing pipeline
    print(f"\n[6/6] Starting indexing pipeline")
    pipeline = IndexingPipeline(
        config=config,
        es_client=es_client,
        data_transformer=transformer,
        recreate_index=args.recreate_index
    )

    results = pipeline.run(df, batch_size=args.batch_size)

    # Print summary
    print("\n" + "="*60)
    print("Ingestion Complete!")
    print("="*60)
    print(f"Total documents: {results['total']}")
    print(f"Successfully indexed: {results['success']}")
    print(f"Failed: {results['failed']}")
    print(f"Time elapsed: {results['elapsed_time']:.2f}s")
    print(f"Throughput: {results['docs_per_second']:.2f} docs/s")

    if results['errors']:
        print(f"\nFirst few errors:")
        for error in results['errors'][:5]:
            print(f"  - {error}")

    # Verify index
    print(f"\nVerifying index...")
    doc_count = es_client.count(config.es_index_name)
    print(f"Documents in index: {doc_count}")

    print("\nIngestion completed successfully!")
    return 0


if __name__ == "__main__":
    sys.exit(main())