# db_connector.py 2.11 KB
"""
Database connector utility for MySQL connections.
"""

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
from typing import Dict, Any, Optional
import pymysql


def create_db_connection(
    host: str,
    port: int,
    database: str,
    username: str,
    password: str,
    **kwargs
) -> Any:
    """
    Create a SQLAlchemy database engine for MySQL.

    The connection URL is assembled with ``URL.create`` instead of string
    interpolation, so credentials and parameter values containing characters
    such as ``@``, ``:``, ``/``, ``&`` or ``%`` are passed through intact
    rather than corrupting the URL.

    Args:
        host: Database host
        port: Database port
        database: Database name
        username: Username
        password: Password
        **kwargs: Additional connection parameters, added to the URL query
            string. Values are converted with ``str()``. A ``charset`` entry
            replaces the default ``utf8mb4``.

    Returns:
        SQLAlchemy engine instance
    """
    # Function-scope import: keeps this block self-contained without touching
    # the module's import section.
    from sqlalchemy.engine import URL

    # URL query values must be strings, hence the explicit str() conversion.
    # The default charset comes first so a caller-supplied one overrides it
    # instead of producing a duplicate key.
    query = {"charset": "utf8mb4"}
    query.update({key: str(value) for key, value in kwargs.items()})

    connection_url = URL.create(
        drivername="mysql+pymysql",
        username=username,
        password=password,
        host=host,
        port=port,
        database=database,
        query=query,
    )

    engine = create_engine(
        connection_url,
        poolclass=QueuePool,
        pool_size=10,
        max_overflow=20,
        pool_recycle=3600,
        pool_pre_ping=True,
        echo=False
    )

    return engine


def get_connection_from_config(mysql_config: Dict[str, Any]) -> Any:
    """
    Build a database engine from a configuration mapping.

    Args:
        mysql_config: Dictionary providing ``host``, ``database``,
            ``username`` and ``password``; ``port`` is optional and
            defaults to 3306.

    Returns:
        Whatever ``create_db_connection`` returns for those settings

    Raises:
        KeyError: If any of the required keys is missing.
    """
    # Required keys are indexed directly (missing -> KeyError); only the
    # port has a fallback value.
    settings = {
        "host": mysql_config["host"],
        "port": mysql_config.get("port", 3306),
        "database": mysql_config["database"],
        "username": mysql_config["username"],
        "password": mysql_config["password"],
    }
    return create_db_connection(**settings)


def test_connection(engine) -> bool:
    """
    Check whether a connection can be opened and used.

    Opens a connection from the engine, executes ``SELECT 1`` and commits.
    Any exception raised along the way is printed and reported as a failure
    instead of being propagated.

    Args:
        engine: SQLAlchemy engine instance

    Returns:
        True if the probe ran without raising, False otherwise
    """
    try:
        with engine.connect() as connection:
            connection.execute(text("SELECT 1"))
            connection.commit()
    except Exception as exc:
        # Best-effort check: report the failure and signal it via the
        # return value rather than raising.
        print(f"Database connection test failed: {exc}")
        return False
    return True