Blame view

offline_tasks/test_connection.py 3.32 KB
5ab1c29c   tangwang   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
  """
  测试数据库和Redis连接
  用于验证配置是否正确
  """
  import sys
  import os
  sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  
  from db_service import create_db_connection
  from offline_tasks.config.offline_config import DB_CONFIG, REDIS_CONFIG
  import redis
  
  
  def test_database_connection():
      """测试数据库连接"""
      print("="*80)
      print("测试数据库连接...")
      print("="*80)
      
      try:
          engine = create_db_connection(
              DB_CONFIG['host'],
              DB_CONFIG['port'],
              DB_CONFIG['database'],
              DB_CONFIG['username'],
              DB_CONFIG['password']
          )
          
          # 执行简单查询
          import pandas as pd
          df = pd.read_sql("SELECT COUNT(*) as cnt FROM sensors_events LIMIT 1", engine)
          print(f"✓ 数据库连接成功!")
          print(f"  传感器事件表记录数: {df['cnt'].iloc[0]}")
          
          # 测试商品表
          df = pd.read_sql("SELECT COUNT(*) as cnt FROM prd_goods_sku LIMIT 1", engine)
          print(f"  商品SKU表记录数: {df['cnt'].iloc[0]}")
          
          return True
          
      except Exception as e:
          print(f"✗ 数据库连接失败: {e}")
          return False
  
  
  def test_redis_connection():
      """测试Redis连接"""
      print("\n" + "="*80)
      print("测试Redis连接...")
      print("="*80)
      
      try:
          redis_client = redis.Redis(
              host=REDIS_CONFIG.get('host', 'localhost'),
              port=REDIS_CONFIG.get('port', 6379),
              db=REDIS_CONFIG.get('db', 0),
              password=REDIS_CONFIG.get('password'),
              decode_responses=True
          )
          
          # 测试连接
          redis_client.ping()
          print(f"✓ Redis连接成功!")
          
          # 测试读写
          test_key = "test:connection"
          test_value = "success"
          redis_client.set(test_key, test_value, ex=10)
          result = redis_client.get(test_key)
          
          if result == test_value:
              print(f"  读写测试成功")
          
          # 删除测试键
          redis_client.delete(test_key)
          
          return True
          
      except Exception as e:
          print(f"✗ Redis连接失败: {e}")
          print(f"  提示:如果Redis未安装或未启动,可以跳过Redis相关功能")
          return False
  
  
  def main():
      """主函数"""
      print("\n" + "="*80)
      print("开始测试连接配置...")
      print("="*80 + "\n")
      
      db_ok = test_database_connection()
      redis_ok = test_redis_connection()
      
      print("\n" + "="*80)
      print("测试结果汇总")
      print("="*80)
      print(f"数据库连接: {'✓ 成功' if db_ok else '✗ 失败'}")
      print(f"Redis连接:  {'✓ 成功' if redis_ok else '✗ 失败 (可选)'}")
      print("="*80)
      
      if db_ok:
          print("\n✓ 数据库连接正常,可以开始运行离线任务!")
          print("\n运行命令:")
          print("  python run_all.py --lookback_days 730 --top_n 50")
      else:
          print("\n✗ 数据库连接失败,请检查配置文件:")
          print("  offline_tasks/config/offline_config.py")
      
      if not redis_ok:
          print("\n⚠ Redis连接失败(可选),索引加载功能将不可用")
          print("  如需使用,请安装并启动Redis,或修改配置")
  
  
  if __name__ == '__main__':
      main()