# import_test_data.py (5.16 KB)
#!/usr/bin/env python3
"""
Import test data into MySQL Shoplazza tables.

Reads SQL file generated by generate_test_data.py and imports into MySQL.
"""

import sys
import os
import argparse
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.db_connector import create_db_connection, test_connection


def _split_sql_statements(sql_content: str) -> list:
    """
    Split raw SQL text into individual statements, honouring quoting.

    ``--`` comments are removed and ``;`` terminates a statement only when
    they appear outside of quoted text ('...', "..." or `...`). Text inside
    quotes is copied verbatim, so values containing ``--``, ``;`` or line
    breaks survive unchanged.

    Args:
        sql_content: Full contents of a SQL script

    Returns:
        List of non-empty statements, stripped, without trailing semicolons
    """
    statements = []
    current = []
    quote = None  # Quote character of the literal being scanned, if any
    length = len(sql_content)
    i = 0

    def flush():
        statement = ''.join(current).strip()
        if statement:
            statements.append(statement)
        current.clear()

    while i < length:
        ch = sql_content[i]

        if quote is not None:
            current.append(ch)
            # Backslash escapes the next character inside string literals
            # (backtick-quoted identifiers have no backslash escapes).
            if ch == '\\' and quote != '`' and i + 1 < length:
                current.append(sql_content[i + 1])
                i += 2
                continue
            if ch == quote:
                # A doubled quote character is an escaped quote, not the end
                if i + 1 < length and sql_content[i + 1] == quote:
                    current.append(quote)
                    i += 2
                    continue
                quote = None
            i += 1
            continue

        if ch in ("'", '"', '`'):
            quote = ch
            current.append(ch)
        elif ch == '-' and sql_content.startswith('--', i):
            # Comment: skip to end of line. Any "--" outside quotes counts,
            # with or without a following space, as in the previous
            # line-based implementation.
            newline = sql_content.find('\n', i)
            i = length if newline == -1 else newline
            continue
        elif ch == ';':
            flush()
        elif ch in '\r\n':
            # Outside literals a line break is plain whitespace; flatten it
            # so each statement (and its error preview) is a single line.
            current.append(' ')
        else:
            current.append(ch)
        i += 1

    # Keep a final statement that has no terminating semicolon
    flush()
    return statements


def import_sql_file(db_engine, sql_file: str):
    """
    Import SQL file into database.

    The file is read as UTF-8, split into statements (ignoring ``--``
    comments and semicolons that sit inside quoted text) and executed one
    statement at a time. Each statement is committed individually, so
    statements executed before a failure stay applied.

    Note: statements are wrapped in SQLAlchemy ``text()``, which treats
    ``:name`` sequences as bind parameters.

    Args:
        db_engine: SQLAlchemy database engine
        sql_file: Path to SQL file

    Raises:
        Exception: Re-raises whatever the failing statement raised, after
            printing the error and the first 200 characters of the statement
    """
    from sqlalchemy import text

    with open(sql_file, 'r', encoding='utf-8') as f:
        sql_content = f.read()

    statements = _split_sql_statements(sql_content)
    total = len(statements)

    print(f"Executing {total} SQL statements...")

    with db_engine.connect() as conn:
        for i, statement in enumerate(statements, 1):
            try:
                conn.execute(text(statement))
                conn.commit()
                print(f"  [{i}/{total}] Executed successfully")
            except Exception as e:
                print(f"  [{i}/{total}] ERROR: {e}")
                print(f"  Statement: {statement[:200]}...")
                raise


def verify_import(db_engine, tenant_id: str):
    """
    Report how many SPU and SKU rows exist for a tenant.

    Runs one COUNT(*) query per table, filtered by tenant_id, prints both
    counts and returns them.

    Args:
        db_engine: SQLAlchemy database engine
        tenant_id: Tenant ID to verify

    Returns:
        Tuple of (spu_count, sku_count)
    """
    from sqlalchemy import text

    spu_query = text("SELECT COUNT(*) FROM shoplazza_product_spu WHERE tenant_id = :tenant_id")
    sku_query = text("SELECT COUNT(*) FROM shoplazza_product_sku WHERE tenant_id = :tenant_id")
    # Both queries bind the same single parameter
    bind_values = {"tenant_id": tenant_id}

    with db_engine.connect() as connection:
        spu_count = connection.execute(spu_query, bind_values).scalar()
        sku_count = connection.execute(sku_query, bind_values).scalar()

        print("\nVerification:")
        print(f"  SPUs: {spu_count}")
        print(f"  SKUs: {sku_count}")

        return spu_count, sku_count


def main():
    """
    Command-line entry point for importing test data into MySQL.

    Parses the connection and import options, connects to the database,
    optionally deletes the existing SKU/SPU rows of the given tenant,
    imports the SQL file and, when a tenant ID was given, prints the
    resulting row counts.

    Returns:
        Exit code: 0 on success, 1 if connecting, the connection test or
        the import fails. A failed cleanup only prints a warning.
    """
    parser = argparse.ArgumentParser(description='Import test data into MySQL')

    # Database connection
    parser.add_argument('--db-host', required=True, help='MySQL host')
    parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
    parser.add_argument('--db-database', required=True, help='MySQL database name')
    parser.add_argument('--db-username', required=True, help='MySQL username')
    parser.add_argument('--db-password', required=True, help='MySQL password')

    # Import options
    parser.add_argument('--sql-file', required=True, help='SQL file to import')
    parser.add_argument('--tenant-id', help='Tenant ID to verify (optional)')

    args = parser.parse_args()

    print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")

    # Connect to database
    try:
        db_engine = create_db_connection(
            host=args.db_host,
            port=args.db_port,
            database=args.db_database,
            username=args.db_username,
            password=args.db_password
        )
    except Exception as e:
        print(f"ERROR: Failed to connect to MySQL: {e}")
        return 1

    # Test connection
    if not test_connection(db_engine):
        print("ERROR: Database connection test failed")
        return 1

    print("Database connection successful")

    # Clean existing data if tenant_id provided
    if args.tenant_id:
        print(f"\nCleaning existing data for tenant_id: {args.tenant_id}")
        from sqlalchemy import text
        # Bind the tenant ID instead of formatting it into the SQL text, so
        # quotes or other special characters in the value cannot alter the
        # DELETE statements.
        tenant_params = {"tenant_id": args.tenant_id}
        try:
            with db_engine.connect() as conn:
                # Delete SKUs first (foreign key constraint)
                conn.execute(
                    text("DELETE FROM shoplazza_product_sku WHERE tenant_id = :tenant_id"),
                    tenant_params,
                )
                # Delete SPUs
                conn.execute(
                    text("DELETE FROM shoplazza_product_spu WHERE tenant_id = :tenant_id"),
                    tenant_params,
                )
                conn.commit()
                print("✓ Existing data cleaned")
        except Exception as e:
            # Cleanup is best-effort: report the problem and import anyway
            print(f"⚠ Warning: Failed to clean existing data: {e}")

    # Import SQL file
    print(f"\nImporting SQL file: {args.sql_file}")
    try:
        import_sql_file(db_engine, args.sql_file)
        print("Import completed successfully")
    except Exception as e:
        print(f"ERROR: Failed to import SQL file: {e}")
        import traceback
        traceback.print_exc()
        return 1

    # Verify import if tenant_id provided
    if args.tenant_id:
        verify_import(db_engine, args.tenant_id)

    return 0


# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())