# ingest_shoplazza.py
#!/usr/bin/env python3
"""
Shoplazza data ingestion script.

Loads SPU and SKU data from MySQL and indexes into Elasticsearch using SPU transformer.
"""

import sys
import os
import argparse
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.db_connector import create_db_connection
from utils.es_client import ESClient
from indexer.spu_transformer import SPUTransformer
from indexer.mapping_generator import MappingGenerator
from indexer.bulk_indexer import BulkIndexer
from config import ConfigLoader


def main():
    """Ingest Shoplazza SPU/SKU data from MySQL into Elasticsearch.

    Flow: parse CLI args -> load YAML config -> connect to MySQL and ES ->
    ensure the target index exists (optionally recreating it) -> transform
    SPU/SKU rows -> bulk index.

    Returns:
        int: process exit code — 0 on success (including "nothing to
        index"), 1 on any failure.
    """
    parser = argparse.ArgumentParser(description='Ingest Shoplazza SPU/SKU data into Elasticsearch')

    # Database connection
    parser.add_argument('--db-host', required=True, help='MySQL host')
    parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
    parser.add_argument('--db-database', required=True, help='MySQL database name')
    parser.add_argument('--db-username', required=True, help='MySQL username')
    parser.add_argument('--db-password', required=True, help='MySQL password')

    # Tenant and index
    parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
    # Default is None (not a URL) so the ES_CONFIG fallback below can take
    # effect; a hard-coded default here would always win over the config.
    parser.add_argument('--es-host', default=None,
                        help='Elasticsearch host (default: from config, else http://localhost:9200)')

    # Options
    parser.add_argument('--recreate', action='store_true', help='Recreate index if exists')
    parser.add_argument('--batch-size', type=int, default=500, help='Batch size for indexing (default: 500)')

    args = parser.parse_args()

    print(f"Starting Shoplazza data ingestion for tenant: {args.tenant_id}")

    # Load unified configuration
    config_loader = ConfigLoader("config/config.yaml")
    try:
        config = config_loader.load_config()
        print(f"Loaded configuration: {config.es_index_name}")
    except Exception as e:
        print(f"ERROR: Failed to load configuration: {e}")
        return 1

    # Validate that the schema declares a tenant_id field — documents are
    # partitioned per tenant, so indexing without it would mix tenants.
    tenant_id_field = next(
        (field for field in config.fields if field.name == "tenant_id"), None
    )
    if tenant_id_field is None:
        print("ERROR: Configuration must include 'tenant_id' field")
        return 1

    # Connect to MySQL
    print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")
    try:
        db_engine = create_db_connection(
            host=args.db_host,
            port=args.db_port,
            database=args.db_database,
            username=args.db_username,
            password=args.db_password
        )
    except Exception as e:
        print(f"ERROR: Failed to connect to MySQL: {e}")
        return 1

    # Connect to Elasticsearch (use unified config loading)
    from config.env_config import ES_CONFIG

    # Precedence: explicit CLI flag > config value > localhost default.
    es_host = args.es_host or ES_CONFIG.get('host', 'http://localhost:9200')
    es_username = ES_CONFIG.get('username')
    es_password = ES_CONFIG.get('password')

    print(f"Connecting to Elasticsearch: {es_host}")
    if es_username and es_password:
        print(f"Using authentication: {es_username}")
        es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
    else:
        es_client = ESClient(hosts=[es_host])

    if not es_client.ping():
        print(f"ERROR: Cannot connect to Elasticsearch at {es_host}")
        return 1

    # Generate the mapping from config and ensure the index exists
    mapping_gen = MappingGenerator(config)
    mapping = mapping_gen.generate_mapping()
    index_name = config.es_index_name

    # --recreate drops any existing index so stale documents don't survive
    if args.recreate and es_client.index_exists(index_name):
        print(f"Deleting existing index: {index_name}")
        if not es_client.delete_index(index_name):
            print(f"ERROR: Failed to delete index '{index_name}'")
            return 1

    if not es_client.index_exists(index_name):
        print(f"Creating index: {index_name}")
        if not es_client.create_index(index_name, mapping):
            print(f"ERROR: Failed to create index '{index_name}'")
            print("Please check the mapping configuration and try again.")
            return 1
    else:
        print(f"Using existing index: {index_name}")

    # Initialize SPU transformer (scoped to the requested tenant)
    print(f"Initializing SPU transformer for tenant: {args.tenant_id}")
    transformer = SPUTransformer(db_engine, args.tenant_id)

    # Transform data
    print("Transforming SPU and SKU data...")
    try:
        documents = transformer.transform_batch()
        print(f"Transformed {len(documents)} SPU documents")
    except Exception as e:
        print(f"ERROR: Failed to transform data: {e}")
        import traceback
        traceback.print_exc()
        return 1

    if not documents:
        print("WARNING: No documents to index")
        return 0

    # Bulk index, using spu_id as the ES document id so re-runs overwrite
    # rather than duplicate documents.
    print(f"Indexing {len(documents)} documents (batch size: {args.batch_size})...")
    indexer = BulkIndexer(es_client, index_name, batch_size=args.batch_size)

    try:
        results = indexer.index_documents(documents, id_field="spu_id", show_progress=True)
        print(f"\nIngestion complete:")
        print(f"  Success: {results['success']}")
        print(f"  Failed: {results['failed']}")
        print(f"  Time: {results.get('elapsed_time', 0):.2f}s")

        # Partial failure still yields a non-zero exit code for callers/CI
        if results['failed'] > 0:
            print(f"\nWARNING: {results['failed']} documents failed to index")
            return 1

        return 0
    except Exception as e:
        print(f"ERROR: Failed to index documents: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == '__main__':
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())