test_connection.py 3.28 KB
"""
测试数据库和Redis连接
用于验证配置是否正确
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from db_service import create_db_connection
from offline_tasks.config.offline_config import DB_CONFIG, REDIS_CONFIG
import redis


def test_database_connection():
    """测试数据库连接"""
    print("="*80)
    print("测试数据库连接...")
    print("="*80)
    
    try:
        engine = create_db_connection(
            DB_CONFIG['host'],
            DB_CONFIG['port'],
            DB_CONFIG['database'],
            DB_CONFIG['username'],
            DB_CONFIG['password']
        )
        
        # 执行简单查询
        import pandas as pd
        df = pd.read_sql("SELECT COUNT(*) as cnt FROM sensors_events LIMIT 1", engine)
        print(f"✓ 数据库连接成功!")
        print(f"  传感器事件表记录数: {df['cnt'].iloc[0]}")
        
        # 测试商品表
        df = pd.read_sql("SELECT COUNT(*) as cnt FROM prd_goods_sku LIMIT 1", engine)
        print(f"  商品SKU表记录数: {df['cnt'].iloc[0]}")
        
        return True
        
    except Exception as e:
        print(f"✗ 数据库连接失败: {e}")
        return False


def test_redis_connection():
    """测试Redis连接"""
    print("\n" + "="*80)
    print("测试Redis连接...")
    print("="*80)
    
    try:
        redis_client = redis.Redis(
            host=REDIS_CONFIG['host'],
            port=REDIS_CONFIG['port'],
            db=REDIS_CONFIG['db'],
            password=REDIS_CONFIG['password'],
            decode_responses=True
        )
        
        # 测试连接
        redis_client.ping()
        print(f"✓ Redis连接成功!")
        
        # 测试读写
        test_key = "test:connection"
        test_value = "success"
        redis_client.set(test_key, test_value, ex=10)
        result = redis_client.get(test_key)
        
        if result == test_value:
            print(f"  读写测试成功")
        
        # 删除测试键
        redis_client.delete(test_key)
        
        return True
        
    except Exception as e:
        print(f"✗ Redis连接失败: {e}")
        print(f"  提示:如果Redis未安装或未启动,可以跳过Redis相关功能")
        return False


def main():
    """主函数"""
    print("\n" + "="*80)
    print("开始测试连接配置...")
    print("="*80 + "\n")
    
    db_ok = test_database_connection()
    redis_ok = test_redis_connection()
    
    print("\n" + "="*80)
    print("测试结果汇总")
    print("="*80)
    print(f"数据库连接: {'✓ 成功' if db_ok else '✗ 失败'}")
    print(f"Redis连接:  {'✓ 成功' if redis_ok else '✗ 失败 (可选)'}")
    print("="*80)
    
    if db_ok:
        print("\n✓ 数据库连接正常,可以开始运行离线任务!")
        print("\n运行命令:")
        print("  python run_all.py --lookback_days 730 --top_n 50")
    else:
        print("\n✗ 数据库连接失败,请检查配置文件:")
        print("  offline_tasks/config/offline_config.py")
    
    if not redis_ok:
        print("\n⚠ Redis连接失败(可选),索引加载功能将不可用")
        print("  如需使用,请安装并启动Redis,或修改配置")


if __name__ == '__main__':
    main()