Blame view

utils/db_connector.py 2.11 KB
be52af70   tangwang   first commit
1
2
3
4
  """
  Database connector utility for MySQL connections.
  """
  
fb68a0ef   tangwang   配置优化
5
  from sqlalchemy import create_engine, text
be52af70   tangwang   first commit
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
  from sqlalchemy.pool import QueuePool
  from typing import Dict, Any, Optional
  import pymysql
  
  
  def create_db_connection(
      host: str,
      port: int,
      database: str,
      username: str,
      password: str,
      **kwargs
  ) -> Any:
      """
      Create a SQLAlchemy database engine for MySQL (PyMySQL driver).

      The username and password are percent-encoded before being embedded in
      the connection URL, so credentials containing URL-reserved characters
      (for example ``@``, ``:``, ``/``, ``?``, ``#`` or ``%``) no longer
      corrupt the URL. Callers must therefore pass the raw, un-encoded
      credentials.

      Args:
          host: Database host
          port: Database port
          database: Database name
          username: Username (raw, not URL-encoded)
          password: Password (raw, not URL-encoded)
          **kwargs: Additional connection parameters; each one is appended to
              the URL query string as a URL-encoded ``key=value`` pair

      Returns:
          SQLAlchemy engine instance
      """
      # Imported locally so the fix does not depend on the module's import block.
      from urllib.parse import quote, urlencode

      # safe="" also escapes "/"; %XX escapes (rather than quote_plus's "+"
      # for spaces) decode correctly whether the URL parser applies plain or
      # plus-aware unquoting. str() keeps non-string config values working,
      # as the original f-string interpolation did.
      encoded_username = quote(str(username), safe="")
      encoded_password = quote(str(password), safe="")

      connection_string = (
          f"mysql+pymysql://{encoded_username}:{encoded_password}"
          f"@{host}:{port}/{database}"
          f"?charset=utf8mb4"
      )

      # Append additional parameters, URL-encoding keys and values so that
      # characters such as "&", "=" or spaces cannot break the query string.
      if kwargs:
          connection_string += f"&{urlencode(kwargs)}"

      engine = create_engine(
          connection_string,
          poolclass=QueuePool,
          pool_size=10,
          max_overflow=20,
          pool_recycle=3600,
          pool_pre_ping=True,
          echo=False
      )

      return engine
  
  
  def get_connection_from_config(mysql_config: Dict[str, Any]) -> Any:
      """
      Build a database engine from a MySQL configuration mapping.

      Args:
          mysql_config: Mapping that must contain the keys ``host``,
              ``database``, ``username`` and ``password``; ``port`` is
              optional and falls back to 3306 when absent

      Returns:
          Whatever ``create_db_connection`` returns for these settings

      Raises:
          KeyError: If one of the required keys is missing
      """
      # Required keys are read with [] so a missing entry raises KeyError;
      # only the port has a fallback value.
      db_host = mysql_config["host"]
      db_port = mysql_config.get("port", 3306)
      db_name = mysql_config["database"]
      db_user = mysql_config["username"]
      db_password = mysql_config["password"]

      return create_db_connection(
          host=db_host,
          port=db_port,
          database=db_name,
          username=db_user,
          password=db_password,
      )
  
  
  def test_connection(engine) -> bool:
      """
      Check whether a connection can be opened on the given engine.

      Opens a connection, executes ``SELECT 1`` and commits. Any exception
      raised along the way is caught, reported on standard output, and
      turned into a ``False`` result.

      Args:
          engine: SQLAlchemy engine instance

      Returns:
          True if the probe query ran without raising, False otherwise
      """
      try:
          with engine.connect() as connection:
              connection.execute(text("SELECT 1"))
              connection.commit()
      except Exception as exc:
          # Deliberately broad: any failure means the probe did not succeed.
          print(f"Database connection test failed: {exc}")
          return False
      return True