#!/usr/bin/env python3
"""
Shoplazza data ingestion script.

Loads SPU and SKU data from MySQL and indexes it into Elasticsearch using the
SPU transformer.

Environment variables:
    ES_USERNAME / ES_PASSWORD  Optional Elasticsearch credentials; used only
                               when both are set.
    DB_PASSWORD                MySQL password, used when --db-password is not
                               given (keeps the secret out of the process list).
"""

import argparse
import os
import sys
import traceback
from pathlib import Path

# Add parent directory to path so the project packages below are importable
# when this script is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.db_connector import create_db_connection
from utils.es_client import ESClient
from indexer.spu_transformer import SPUTransformer
from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME
from indexer.bulk_indexer import BulkIndexer


def _error(message):
    """Print a diagnostic (ERROR/WARNING) message to stderr."""
    print(message, file=sys.stderr)


def _positive_int(value):
    """argparse ``type``: parse *value* as an integer that is at least 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {number}")
    return number


def _parse_args(argv=None):
    """Parse command-line arguments (``argv=None`` means ``sys.argv[1:]``)."""
    parser = argparse.ArgumentParser(description='Ingest Shoplazza SPU/SKU data into Elasticsearch')

    # Database connection
    parser.add_argument('--db-host', required=True, help='MySQL host')
    parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
    parser.add_argument('--db-database', required=True, help='MySQL database name')
    parser.add_argument('--db-username', required=True, help='MySQL username')
    # Passwords on the command line are visible to other local users (ps,
    # shell history), so allow the DB_PASSWORD environment variable instead,
    # mirroring how ES credentials are read below.
    parser.add_argument('--db-password', default=os.environ.get('DB_PASSWORD'),
                        help='MySQL password (default: $DB_PASSWORD environment variable)')

    # Tenant and index
    parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
    parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
    parser.add_argument('--index-name', default=DEFAULT_INDEX_NAME,
                        help=f'Target index name (default: {DEFAULT_INDEX_NAME})')

    # Options
    parser.add_argument('--recreate', action='store_true', help='Recreate index if exists')
    parser.add_argument('--batch-size', type=_positive_int, default=500,
                        help='Batch size for indexing (default: 500)')

    args = parser.parse_args(argv)
    if args.db_password is None:
        # Same exit status (2) argparse uses for a missing required option.
        parser.error('--db-password is required (or set the DB_PASSWORD environment variable)')
    return args


def _connect_es(es_host):
    """Build an ESClient for *es_host*, authenticating only when both ES env credentials are set."""
    es_username = os.environ.get('ES_USERNAME')
    es_password = os.environ.get('ES_PASSWORD')

    print(f"Connecting to Elasticsearch: {es_host}")
    if es_username and es_password:
        print(f"Using authentication: {es_username}")
        return ESClient(hosts=[es_host], username=es_username, password=es_password)
    if es_username or es_password:
        # A half-configured credential pair is almost certainly a mistake;
        # say so instead of silently connecting anonymously.
        _error("WARNING: Only one of ES_USERNAME/ES_PASSWORD is set; connecting without authentication")
    return ESClient(hosts=[es_host])


def _prepare_index(es_client, index_name, mapping, recreate):
    """Ensure *index_name* exists (deleting it first if *recreate*); return False on failure."""
    if recreate and es_client.index_exists(index_name):
        print(f"Deleting existing index: {index_name}")
        if not es_client.delete_index(index_name):
            _error(f"ERROR: Failed to delete index '{index_name}'")
            return False

    if es_client.index_exists(index_name):
        print(f"Using existing index: {index_name}")
        return True

    print(f"Creating index: {index_name}")
    if not es_client.create_index(index_name, mapping):
        _error(f"ERROR: Failed to create index '{index_name}'")
        _error("Please check the mapping configuration and try again.")
        return False
    return True


def _index_documents(es_client, index_name, documents, batch_size):
    """Bulk-index *documents* keyed by ``spu_id``; return 0 if all succeeded, else 1."""
    print(f"Indexing {len(documents)} documents (batch size: {batch_size})...")
    indexer = BulkIndexer(es_client, index_name, batch_size=batch_size)
    try:
        results = indexer.index_documents(documents, id_field="spu_id", show_progress=True)
    except Exception as e:
        _error(f"ERROR: Failed to index documents: {e}")
        traceback.print_exc()
        return 1

    print("\nIngestion complete:")
    print(f" Success: {results['success']}")
    print(f" Failed: {results['failed']}")
    print(f" Time: {results.get('elapsed_time', 0):.2f}s")

    if results['failed'] > 0:
        _error(f"\nWARNING: {results['failed']} documents failed to index")
        return 1
    return 0


def main(argv=None):
    """Run the ingestion pipeline and return a process exit code.

    Args:
        argv: Optional argument list; ``None`` parses ``sys.argv[1:]``.

    Returns:
        0 on success (including "nothing to index"), 1 on any failure.
    """
    args = _parse_args(argv)

    print(f"Starting Shoplazza data ingestion for tenant: {args.tenant_id}")

    # Load mapping from JSON file
    try:
        mapping = load_mapping()
    except Exception as e:
        _error(f"ERROR: Failed to load mapping: {e}")
        return 1
    print("Loaded mapping configuration")

    # Connect to MySQL
    print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")
    try:
        db_engine = create_db_connection(
            host=args.db_host,
            port=args.db_port,
            database=args.db_database,
            username=args.db_username,
            password=args.db_password,
        )
    except Exception as e:
        _error(f"ERROR: Failed to connect to MySQL: {e}")
        return 1

    # Connect to Elasticsearch
    es_client = _connect_es(args.es_host)
    if not es_client.ping():
        _error(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
        return 1

    if not _prepare_index(es_client, args.index_name, mapping, args.recreate):
        return 1

    # Build SPU documents; transformer construction is guarded too, since it
    # can fail before any transformation runs.
    print(f"Initializing SPU transformer for tenant: {args.tenant_id}")
    try:
        transformer = SPUTransformer(db_engine, args.tenant_id)
        print("Transforming SPU and SKU data...")
        documents = transformer.transform_batch()
    except Exception as e:
        _error(f"ERROR: Failed to transform data: {e}")
        traceback.print_exc()
        return 1
    print(f"Transformed {len(documents)} SPU documents")

    if not documents:
        _error("WARNING: No documents to index")
        return 0

    return _index_documents(es_client, args.index_name, documents, args.batch_size)


if __name__ == '__main__':
    sys.exit(main())