#!/usr/bin/env python3
"""
Monitor Redis cache eviction events in real time.

Polls the ``evicted_keys`` statistic at a fixed interval and prints a warning
whenever the counter has grown since the previous poll.
"""

import argparse
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path

import redis

# Add the project root to the import path so that ``config`` can be imported
# when this script is executed directly.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from config.env_config import REDIS_CONFIG  # noqa: E402


def get_redis_client():
    """Build a Redis client from ``REDIS_CONFIG``.

    Returns:
        A ``redis.Redis`` instance with decoded (``str``) responses and
        5-second socket and connect timeouts. No connection is attempted
        here by this function itself.
    """
    return redis.Redis(
        host=REDIS_CONFIG.get('host', 'localhost'),
        # NOTE(review): the fallback port is 6479, not the usual Redis
        # default 6379 -- confirm this is intentional for this deployment.
        port=REDIS_CONFIG.get('port', 6479),
        password=REDIS_CONFIG.get('password'),
        decode_responses=True,
        socket_timeout=5,
        socket_connect_timeout=5,
    )


def monitor_eviction(interval=5):
    """Poll Redis forever and report newly evicted keys.

    Prints a banner, connects to Redis, then loops: each iteration reads
    ``evicted_keys`` from ``INFO stats`` and prints either a warning (with
    the current memory usage ratio when ``maxmemory`` is set) or a
    "no new evictions" line, then sleeps for ``interval`` seconds.

    The loop ends on Ctrl+C or on any error while polling; a connection
    failure at startup is reported and the function returns immediately.

    Args:
        interval: Seconds to sleep between polls (passed to ``time.sleep``).
    """
    print("=" * 60)
    print("Redis 缓存淘汰实时监控")
    print("=" * 60)
    print(f"监控间隔: {interval} 秒")
    print("按 Ctrl+C 停止监控")
    print("=" * 60)
    print()

    try:
        client = get_redis_client()
        client.ping()
    except Exception as e:
        print(f"❌ Redis 连接失败: {e}")
        return

    try:
        # Take the baseline from the server instead of assuming 0, so that
        # evictions that happened before this monitor started are not
        # reported as new on the first poll.
        last_evicted = client.info('stats').get('evicted_keys', 0)

        while True:
            info = client.info('stats')
            current_evicted = info.get('evicted_keys', 0)
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            if current_evicted > last_evicted:
                new_evictions = current_evicted - last_evicted
                print(f"[{timestamp}] ⚠️ 检测到 {new_evictions} 个新的淘汰事件!")
                print(f" 累计淘汰总数: {current_evicted:,}")

                # Report memory pressure alongside the eviction warning.
                mem_info = client.info('memory')
                maxmemory = mem_info.get('maxmemory', 0)
                used_memory = mem_info.get('used_memory', 0)

                # maxmemory == 0 means no limit is configured, so a usage
                # percentage would be meaningless (and divide by zero).
                if maxmemory > 0:
                    usage_percent = (used_memory / maxmemory) * 100
                    print(f" 当前内存使用率: {usage_percent:.2f}%")
            else:
                print(f"[{timestamp}] ✅ 无新淘汰事件 (累计: {current_evicted:,})")

            # Always follow the server's counter. If it went backwards
            # (server restart or stats reset), keeping the old, larger
            # baseline would hide new evictions until the counter caught up.
            last_evicted = current_evicted

            time.sleep(interval)

    except KeyboardInterrupt:
        print("\n\n监控已停止")
    except Exception as e:
        print(f"\n❌ 监控出错: {e}")
        traceback.print_exc()
    finally:
        # Release pooled sockets so callers that import and invoke this
        # function do not leak connections.
        client.connection_pool.disconnect()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='实时监控 Redis 缓存淘汰事件')
    parser.add_argument('--interval', type=int, default=5,
                        help='监控间隔(秒),默认 5 秒')

    args = parser.parse_args()

    # A zero interval would busy-loop against Redis and a negative one makes
    # time.sleep() raise, so reject both up front with a usage error.
    if args.interval <= 0:
        parser.error('--interval 必须为正整数')

    monitor_eviction(interval=args.interval)