Blame view

offline_tasks/test_connection.py 3.28 KB
5ab1c29c   tangwang   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  """
  测试数据库和Redis连接
  用于验证配置是否正确
  """
  import sys
  import os
  sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  
  from db_service import create_db_connection
  from offline_tasks.config.offline_config import DB_CONFIG, REDIS_CONFIG
  import redis
  
  
  def test_database_connection():
      """测试数据库连接"""
      print("="*80)
      print("测试数据库连接...")
      print("="*80)
      
      try:
          engine = create_db_connection(
              DB_CONFIG['host'],
              DB_CONFIG['port'],
              DB_CONFIG['database'],
              DB_CONFIG['username'],
              DB_CONFIG['password']
          )
          
          # 执行简单查询
          import pandas as pd
          df = pd.read_sql("SELECT COUNT(*) as cnt FROM sensors_events LIMIT 1", engine)
          print(f"✓ 数据库连接成功!")
          print(f"  传感器事件表记录数: {df['cnt'].iloc[0]}")
          
          # 测试商品表
          df = pd.read_sql("SELECT COUNT(*) as cnt FROM prd_goods_sku LIMIT 1", engine)
          print(f"  商品SKU表记录数: {df['cnt'].iloc[0]}")
          
          return True
          
      except Exception as e:
          print(f"✗ 数据库连接失败: {e}")
          return False
  
  
  def test_redis_connection():
      """测试Redis连接"""
      print("\n" + "="*80)
      print("测试Redis连接...")
      print("="*80)
      
      try:
          redis_client = redis.Redis(
be9c0900   tangwang   之前多个任务,配置分散,进行集中管...
54
55
56
57
              host=REDIS_CONFIG['host'],
              port=REDIS_CONFIG['port'],
              db=REDIS_CONFIG['db'],
              password=REDIS_CONFIG['password'],
5ab1c29c   tangwang   first commit
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
              decode_responses=True
          )
          
          # 测试连接
          redis_client.ping()
          print(f"✓ Redis连接成功!")
          
          # 测试读写
          test_key = "test:connection"
          test_value = "success"
          redis_client.set(test_key, test_value, ex=10)
          result = redis_client.get(test_key)
          
          if result == test_value:
              print(f"  读写测试成功")
          
          # 删除测试键
          redis_client.delete(test_key)
          
          return True
          
      except Exception as e:
          print(f"✗ Redis连接失败: {e}")
          print(f"  提示:如果Redis未安装或未启动,可以跳过Redis相关功能")
          return False
  
  
  def main():
      """主函数"""
      print("\n" + "="*80)
      print("开始测试连接配置...")
      print("="*80 + "\n")
      
      db_ok = test_database_connection()
      redis_ok = test_redis_connection()
      
      print("\n" + "="*80)
      print("测试结果汇总")
      print("="*80)
      print(f"数据库连接: {'✓ 成功' if db_ok else '✗ 失败'}")
      print(f"Redis连接:  {'✓ 成功' if redis_ok else '✗ 失败 (可选)'}")
      print("="*80)
      
      if db_ok:
          print("\n✓ 数据库连接正常,可以开始运行离线任务!")
          print("\n运行命令:")
          print("  python run_all.py --lookback_days 730 --top_n 50")
      else:
          print("\n✗ 数据库连接失败,请检查配置文件:")
          print("  offline_tasks/config/offline_config.py")
      
      if not redis_ok:
          print("\n⚠ Redis连接失败(可选),索引加载功能将不可用")
          print("  如需使用,请安装并启动Redis,或修改配置")
  
  
  if __name__ == '__main__':
      main()