"""
Database connector utility for MySQL connections.

Provides helpers to build a pooled SQLAlchemy engine for MySQL (via the
PyMySQL driver) and to check that the engine can reach the database.
"""

from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode

# NOTE(review): pymysql is not referenced directly in this module; presumably
# it is imported so that a missing driver fails at import time -- confirm.
import pymysql
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool


def _build_connection_url(
    host: str,
    port: int,
    database: str,
    username: str,
    password: str,
    options: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Build the ``mysql+pymysql://`` URL string passed to ``create_engine``.

    Args:
        host: Database host
        port: Database port
        database: Database name
        username: Username (percent-encoded before insertion)
        password: Password (percent-encoded before insertion)
        options: Extra query-string parameters appended after ``charset``

    Returns:
        The connection URL as a string
    """
    # Credentials may contain URL delimiters such as '@', ':' or '/'.
    # Percent-encode every reserved character (safe="") so the URL still
    # parses into the intended user/password. quote() is used rather than
    # quote_plus() so a space becomes "%20" and a literal '+' becomes "%2B",
    # which decode correctly under either unquote() or unquote_plus().
    encoded_user = quote(str(username), safe="")
    encoded_password = quote(str(password), safe="")

    url = (
        f"mysql+pymysql://{encoded_user}:{encoded_password}"
        f"@{host}:{port}/{database}"
        f"?charset=utf8mb4"
    )

    if options:
        # urlencode() stringifies non-string values and escapes '&', '=' and
        # spaces, so one parameter can never bleed into the next.
        url += "&" + urlencode(options, quote_via=quote)

    return url


def create_db_connection(
    host: str,
    port: int,
    database: str,
    username: str,
    password: str,
    **kwargs
) -> Any:
    """
    Create a SQLAlchemy database engine for MySQL.

    The engine uses a ``QueuePool`` with 10 pooled connections, up to 20
    overflow connections, recycling after 3600 seconds and a pre-ping
    before each checkout.

    Args:
        host: Database host
        port: Database port
        database: Database name
        username: Username; special characters are URL-escaped automatically
        password: Password; special characters are URL-escaped automatically
        **kwargs: Additional connection parameters, appended to the URL
            query string (URL-encoded)

    Returns:
        SQLAlchemy engine instance
    """
    connection_string = _build_connection_url(
        host, port, database, username, password, kwargs
    )

    return create_engine(
        connection_string,
        poolclass=QueuePool,
        pool_size=10,
        max_overflow=20,
        pool_recycle=3600,
        pool_pre_ping=True,
        echo=False,
    )


def get_connection_from_config(mysql_config: Dict[str, Any]) -> Any:
    """
    Create database connection from configuration dictionary.

    Args:
        mysql_config: Dictionary with keys: host, port, database, username,
            password. ``port`` is optional and defaults to 3306; the other
            keys are required.

    Returns:
        SQLAlchemy engine instance

    Raises:
        KeyError: If a required key is missing from ``mysql_config``
    """
    return create_db_connection(
        host=mysql_config["host"],
        port=mysql_config.get("port", 3306),
        database=mysql_config["database"],
        username=mysql_config["username"],
        password=mysql_config["password"],
    )


def test_connection(engine) -> bool:
    """
    Test database connection.

    Opens a connection from the engine and runs ``SELECT 1``.

    Args:
        engine: SQLAlchemy engine instance

    Returns:
        True if connection successful, False otherwise (the error is
        printed, not raised)
    """
    try:
        with engine.connect() as conn:
            # A read-only probe needs no commit. Calling conn.commit() here
            # raises AttributeError on legacy (non-"future") SQLAlchemy 1.x
            # connections, which made a healthy database look unreachable.
            conn.execute(text("SELECT 1"))
        return True
    except Exception as e:
        # Deliberately broad: any failure means "not connected".
        print(f"Database connection test failed: {e}")
        return False