import_test_data.py 6.95 KB
#!/usr/bin/env python3
"""
Import test data into MySQL Shoplazza tables.

Reads SQL file generated by generate_test_data.py and imports into MySQL.
"""

import sys
import os
import argparse
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.db_connector import create_db_connection, test_connection


def import_sql_file(db_engine, sql_file: str):
    """
    Import SQL file into database.

    Args:
        db_engine: SQLAlchemy database engine
        sql_file: Path to SQL file
    """
    from sqlalchemy import text
    
    with open(sql_file, 'r', encoding='utf-8') as f:
        sql_content = f.read()
    
    # More robust SQL statement parsing
    # Handle multi-line statements and quoted strings properly
    statements = []
    current_statement = []
    in_string = False
    string_char = None
    i = 0
    
    while i < len(sql_content):
        char = sql_content[i]
        
        # Track string boundaries (single quotes)
        if char in ("'", '"') and (i == 0 or sql_content[i-1] != '\\'):
            if not in_string:
                in_string = True
                string_char = char
            elif char == string_char:
                in_string = False
                string_char = None
        
        current_statement.append(char)
        
        # Only split on semicolon if not inside a string
        if char == ';' and not in_string:
            stmt = ''.join(current_statement).strip()
            # Remove comments
            if '--' in stmt:
                # Only remove comments that are not inside strings
                parts = []
                in_quote = False
                quote_char = None
                j = 0
                while j < len(stmt):
                    c = stmt[j]
                    if c in ("'", '"') and (j == 0 or stmt[j-1] != '\\'):
                        if not in_quote:
                            in_quote = True
                            quote_char = c
                        elif c == quote_char:
                            in_quote = False
                            quote_char = None
                    if c == '-' and j < len(stmt) - 1 and stmt[j+1] == '-' and not in_quote:
                        # Found comment marker outside string
                        break
                    parts.append(c)
                    j += 1
                stmt = ''.join(parts).strip()
            
            if stmt and not stmt.startswith('--'):
                statements.append(stmt)
            current_statement = []
        
        i += 1
    
    # Handle last statement if no semicolon at end
    if current_statement:
        stmt = ''.join(current_statement).strip()
        if stmt and not stmt.startswith('--'):
            statements.append(stmt)
    
    print(f"Executing {len(statements)} SQL statements...")
    
    with db_engine.connect() as conn:
        for i, statement in enumerate(statements, 1):
            if statement:
                try:
                    conn.execute(text(statement))
                    conn.commit()
                    print(f"  [{i}/{len(statements)}] Executed successfully")
                except Exception as e:
                    print(f"  [{i}/{len(statements)}] ERROR: {e}")
                    # Show more context for debugging
                    error_start = max(0, statement.find('VALUES') - 100)
                    error_end = min(len(statement), error_start + 500)
                    print(f"  Statement context: ...{statement[error_start:error_end]}...")
                    raise


def verify_import(db_engine, tenant_id: str):
    """
    Verify imported data.

    Args:
        db_engine: SQLAlchemy database engine
        tenant_id: Tenant ID to verify
    """
    from sqlalchemy import text
    
    with db_engine.connect() as conn:
        # Count SPUs
        result = conn.execute(text("SELECT COUNT(*) FROM shoplazza_product_spu WHERE tenant_id = :tenant_id"), {"tenant_id": tenant_id})
        spu_count = result.scalar()
        
        # Count SKUs
        result = conn.execute(text("SELECT COUNT(*) FROM shoplazza_product_sku WHERE tenant_id = :tenant_id"), {"tenant_id": tenant_id})
        sku_count = result.scalar()
        
        print(f"\nVerification:")
        print(f"  SPUs: {spu_count}")
        print(f"  SKUs: {sku_count}")
        
        return spu_count, sku_count


def main():
    parser = argparse.ArgumentParser(description='Import test data into MySQL')
    
    # Database connection
    parser.add_argument('--db-host', required=True, help='MySQL host')
    parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
    parser.add_argument('--db-database', required=True, help='MySQL database name')
    parser.add_argument('--db-username', required=True, help='MySQL username')
    parser.add_argument('--db-password', required=True, help='MySQL password')
    
    # Import options
    parser.add_argument('--sql-file', required=True, help='SQL file to import')
    parser.add_argument('--tenant-id', help='Tenant ID to verify (optional)')
    
    args = parser.parse_args()

    print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")
    
    # Connect to database
    try:
        db_engine = create_db_connection(
            host=args.db_host,
            port=args.db_port,
            database=args.db_database,
            username=args.db_username,
            password=args.db_password
        )
    except Exception as e:
        print(f"ERROR: Failed to connect to MySQL: {e}")
        return 1

    # Test connection
    if not test_connection(db_engine):
        print("ERROR: Database connection test failed")
        return 1

    print("Database connection successful")

    # Clean existing data if tenant_id provided
    if args.tenant_id:
        print(f"\nCleaning existing data for tenant_id: {args.tenant_id}")
        from sqlalchemy import text
        try:
            with db_engine.connect() as conn:
                # Delete SKUs first (foreign key constraint)
                conn.execute(text(f"DELETE FROM shoplazza_product_sku WHERE tenant_id = '{args.tenant_id}'"))
                # Delete SPUs
                conn.execute(text(f"DELETE FROM shoplazza_product_spu WHERE tenant_id = '{args.tenant_id}'"))
                conn.commit()
                print("✓ Existing data cleaned")
        except Exception as e:
            print(f"⚠ Warning: Failed to clean existing data: {e}")
            # Continue anyway

    # Import SQL file
    print(f"\nImporting SQL file: {args.sql_file}")
    try:
        import_sql_file(db_engine, args.sql_file)
        print("Import completed successfully")
    except Exception as e:
        print(f"ERROR: Failed to import SQL file: {e}")
        import traceback
        traceback.print_exc()
        return 1

    # Verify import if tenant_id provided
    if args.tenant_id:
        verify_import(db_engine, args.tenant_id)

    return 0


if __name__ == '__main__':
    sys.exit(main())