db_connector.py
2.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""
Database connector utility for MySQL connections.
"""
from typing import Dict, Any, Optional
from urllib.parse import quote, urlencode

import pymysql
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
def create_db_connection(
    host: str,
    port: int,
    database: str,
    username: str,
    password: str,
    **kwargs
) -> Any:
    """
    Create a SQLAlchemy database engine for MySQL.

    The engine uses the ``mysql+pymysql`` driver and a ``QueuePool`` with
    10 pooled connections, up to 20 overflow connections, connection
    recycling after 3600 seconds and pre-ping enabled.

    Args:
        host: Database host
        port: Database port
        database: Database name
        username: Username (percent-encoded before being placed in the URL)
        password: Password (percent-encoded before being placed in the URL)
        **kwargs: Additional connection parameters, appended to the URL
            query string. A ``charset`` entry overrides the default
            ``utf8mb4`` instead of being appended as a duplicate key.

    Returns:
        SQLAlchemy engine instance
    """
    # Credentials may contain URL delimiters such as "@", ":" or "/".
    # Percent-encode them (safe="" so even "/" is escaped) so the URL parser
    # cannot mistake part of a password for the host or database section.
    safe_username = quote(str(username), safe="")
    safe_password = quote(str(password), safe="")

    # Merge caller parameters over the default so each key appears only once,
    # and let urlencode escape keys and values for the query string.
    query_params = {"charset": "utf8mb4", **kwargs}

    connection_string = (
        f"mysql+pymysql://{safe_username}:{safe_password}"
        f"@{host}:{port}/{database}"
        f"?{urlencode(query_params)}"
    )

    engine = create_engine(
        connection_string,
        poolclass=QueuePool,
        pool_size=10,
        max_overflow=20,
        pool_recycle=3600,
        pool_pre_ping=True,
        echo=False,
    )
    return engine
def get_connection_from_config(mysql_config: Dict[str, Any]) -> Any:
    """
    Build a database engine from a configuration mapping.

    Args:
        mysql_config: Dictionary with keys: host, port, database, username,
            password. ``port`` is optional and falls back to 3306; the other
            keys are looked up directly, so a missing one raises ``KeyError``.

    Returns:
        SQLAlchemy engine instance
    """
    # Mandatory settings are read first, in a fixed order, so a missing key
    # surfaces as a KeyError before anything else happens.
    required_keys = ("host", "database", "username", "password")
    settings = {key: mysql_config[key] for key in required_keys}

    # The port is the only optional setting.
    settings["port"] = mysql_config.get("port", 3306)

    return create_db_connection(**settings)
def test_connection(engine) -> bool:
    """
    Test database connection.

    Opens a connection from the engine and runs ``SELECT 1`` as a probe.
    Any exception raised while connecting or executing is printed and
    reported as a failed test rather than propagated.

    Args:
        engine: SQLAlchemy engine instance

    Returns:
        True if connection successful, False otherwise
    """
    try:
        with engine.connect() as conn:
            # The probe is read-only, so nothing needs committing. Skipping
            # commit() also avoids an AttributeError on legacy-style
            # connections that lack the method, which the broad handler
            # below would otherwise misreport as a failed connection.
            conn.execute(text("SELECT 1"))
    except Exception as e:
        print(f"Database connection test failed: {e}")
        return False
    return True