#!/usr/bin/env python3
"""
Find the keys that account for most of the memory used by a Redis instance.

The report helps explain why the memory attributed to cached keys can differ
widely from the total memory reported by Redis.
"""

import argparse
import os
import random
import sys
import traceback
from collections import Counter
from collections import defaultdict
from datetime import datetime
from pathlib import Path

import redis

# Add the project root to sys.path (this file lives under scripts/redis/, so
# the project root is three directory levels up).
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from config.env_config import REDIS_CONFIG  # noqa: E402  (needs sys.path tweak above)

# Upper bound on how many keys are sampled for per-key memory measurement.
_SAMPLE_LIMIT = 1000
# A value is reported as "oversized" above this many bytes (1 MB).
_LARGE_VALUE_BYTES = 1024 * 1024
# Keys longer than this are truncated in the printed tables.
_KEY_DISPLAY_WIDTH = 60


def get_redis_client():
    """Build a Redis client from REDIS_CONFIG.

    Responses are decoded to ``str``; both the connect and the socket timeout
    are 10 seconds.
    """
    return redis.Redis(
        host=REDIS_CONFIG.get('host', 'localhost'),
        # NOTE(review): Redis's conventional port is 6379; confirm that the
        # 6479 fallback is intentional for this project.
        port=REDIS_CONFIG.get('port', 6479),
        password=REDIS_CONFIG.get('password'),
        decode_responses=True,
        socket_timeout=10,
        socket_connect_timeout=10,
    )


def format_bytes(bytes_size):
    """Format a byte count as a human-readable string (B, KB, ... PB).

    Negative inputs are scaled by magnitude as well, because the script
    formats ``used_memory - estimated_total_memory``, which is negative
    whenever the estimate overshoots.
    """
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if abs(bytes_size) < 1024.0:
            return f"{bytes_size:.2f} {unit}"
        bytes_size /= 1024.0
    return f"{bytes_size:.2f} PB"


def get_key_memory_usage(client, key):
    """Return the memory footprint of a single key, in bytes.

    Uses ``MEMORY USAGE`` (Redis 4.0+). If the server rejects that command,
    falls back to a rough estimate: key length + string value length + a
    fixed overhead. Returns 0 when the size cannot be determined (for
    example the key vanished, or the fallback ``GET`` hit a non-string key).
    """
    try:
        try:
            memory = client.execute_command('MEMORY', 'USAGE', key)
            return memory if memory else 0
        except redis.RedisError:
            # MEMORY USAGE unavailable or failed: estimate instead.  Only
            # Redis errors trigger the fallback so that Ctrl-C still works.
            key_size = len(key.encode('utf-8'))
            value = client.get(key)
            value_size = len(value.encode('utf-8')) if value else 0
            overhead = 48 + 24 + 100
            return key_size + value_size + overhead
    except Exception:
        # Best effort: a key that cannot be measured contributes nothing.
        return 0


def _scan_all_keys(client):
    """Return every key in the current database, without duplicates.

    SCAN may report the same key more than once during a full iteration, so
    the result is de-duplicated while preserving first-seen order.
    """
    return list(dict.fromkeys(client.scan_iter(count=1000)))


def _shorten_key(key):
    """Truncate a key for display, appending '...' when it is too long."""
    if len(key) > _KEY_DISPLAY_WIDTH:
        return key[:_KEY_DISPLAY_WIDTH] + "..."
    return key


def analyze_all_keys(client, top_n=50):
    """Print a memory report for the keys of the connected Redis database.

    The report covers: total memory used, key count, key prefix statistics,
    per-key memory for a random sample of up to 1000 keys (with an
    extrapolated total), the ``top_n`` largest sampled keys, values larger
    than 1 MB, and the type distribution of the sampled keys.

    Args:
        client: Redis client; keys are expected as ``str`` (the client built
            by ``get_redis_client`` decodes responses).
        top_n: Number of largest sampled keys to list.

    Any exception raised during the analysis is reported on stdout together
    with a traceback; it is not propagated to the caller.
    """
    print("=" * 60)
    print("分析所有 key 的内存占用")
    print("=" * 60)

    try:
        # Overall memory figures reported by the server.
        info = client.info('memory')
        used_memory = info.get('used_memory', 0)
        used_memory_human = info.get('used_memory_human', '0B')
        print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)\n")

        # Enumerate every key.
        print("扫描所有 key...")
        all_keys = _scan_all_keys(client)
        total_keys = len(all_keys)
        print(f"总 key 数量: {total_keys:,}\n")

        # Key naming patterns: the prefix is the text before the first ':'.
        print("分析 key 命名模式...")
        prefix_patterns = Counter(
            key.split(':', 1)[0] for key in all_keys if ':' in key
        )
        no_prefix_count = total_keys - sum(prefix_patterns.values())

        print(f" 无前缀的 key: {no_prefix_count:,}")
        print(f" 有前缀的 key: {total_keys - no_prefix_count:,}")
        print(f" 不同前缀数量: {len(prefix_patterns):,}\n")

        # List the 20 most common prefixes.
        print("所有前缀列表:")
        sorted_prefixes = prefix_patterns.most_common()
        for prefix, count in sorted_prefixes[:20]:
            print(f" {prefix}:* - {count:,} 个 key")
        if len(sorted_prefixes) > 20:
            print(f" ... 还有 {len(sorted_prefixes) - 20} 个前缀")

        # Measure memory on a random sample to bound the number of round trips.
        print(f"\n采样分析内存占用(采样前 {min(1000, total_keys)} 个 key)...")
        sample_size = min(_SAMPLE_LIMIT, total_keys)
        if total_keys > sample_size:
            sample_keys = random.sample(all_keys, sample_size)
        else:
            sample_keys = all_keys

        key_memories = []
        for processed, key in enumerate(sample_keys, 1):
            memory = get_key_memory_usage(client, key)
            if memory > 0:
                key_memories.append((key, memory))
            if processed % 100 == 0:
                print(f" 已处理: {processed}/{sample_size}")

        # Largest first.
        key_memories.sort(key=lambda item: item[1], reverse=True)

        # Extrapolate from the sample average to the whole keyspace.
        total_sample_memory = sum(mem for _, mem in key_memories)
        avg_memory = total_sample_memory / len(key_memories) if key_memories else 0
        estimated_total_memory = avg_memory * total_keys

        print("\n采样统计:")
        print(f" 采样 key 数量: {len(key_memories):,}")
        print(f" 采样总内存: {format_bytes(total_sample_memory)}")
        print(f" 平均每个 key 内存: {format_bytes(avg_memory)}")
        print(f" 估算所有 key 总内存: {format_bytes(estimated_total_memory)}")
        print(f" 实际 Redis 使用内存: {format_bytes(used_memory)}")
        print(f" 差异: {format_bytes(used_memory - estimated_total_memory)}")

        # Top consumers among the sampled keys.
        print(f"\n占用内存最多的 {top_n} 个 key:")
        print(f"{'排名':<6} {'内存':<15} {'Key'}")
        print("-" * 80)
        for rank, (key, memory) in enumerate(key_memories[:top_n], 1):
            print(f"{rank:<6} {format_bytes(memory):<15} {_shorten_key(key)}")

        # Explain the gap between measured keys and total memory.
        print("\n" + "=" * 60)
        print("内存差异分析")
        print("=" * 60)

        difference = used_memory - estimated_total_memory
        difference_percent = (difference / used_memory * 100) if used_memory > 0 else 0

        print(f"实际内存: {format_bytes(used_memory)}")
        print(f"估算 key 内存: {format_bytes(estimated_total_memory)}")
        print(f"差异: {format_bytes(difference)} ({difference_percent:.1f}%)")
        print("\n可能的原因:")
        print("1. Redis 内部数据结构开销(hash table、skiplist 等)")
        print("2. 内存碎片")
        print("3. Redis 进程本身的内存占用")
        print("4. 其他数据结构(如 list、set、zset、hash)的内存开销更大")
        print("5. 采样估算的误差")

        # Oversized values: only the 100 largest sampled keys are inspected.
        print("\n检查是否有超大 value(> 1MB)...")
        large_values = [
            (key, memory)
            for key, memory in key_memories[:100]
            if memory > _LARGE_VALUE_BYTES
        ]

        if large_values:
            print(f"发现 {len(large_values)} 个超大 value (> 1MB):")
            for key, memory in large_values[:10]:
                print(f" {format_bytes(memory):<15} {_shorten_key(key)}")
        else:
            print(" 未发现超大 value")

        # Type distribution over the sampled keys (at most 1000 of them).
        print("\n检查 key 类型分布(采样前 1000 个)...")
        type_distribution = defaultdict(int)
        for key in sample_keys[:_SAMPLE_LIMIT]:
            try:
                key_type = client.type(key)
            except redis.RedisError:
                # Best effort: skip keys whose type cannot be fetched.
                continue
            type_distribution[key_type] += 1

        print("Key 类型分布:")
        for key_type, count in sorted(
            type_distribution.items(), key=lambda item: item[1], reverse=True
        ):
            print(f" {key_type}: {count}")

    except Exception as e:
        print(f"❌ 分析失败: {e}")
        traceback.print_exc()


def main():
    """Parse command-line arguments, connect to Redis and run the analysis."""
    parser = argparse.ArgumentParser(description='查找 Redis 中占用内存的主要 key')
    parser.add_argument('--top', type=int, default=50,
                        help='显示占用内存最多的 N 个 key(默认 50)')

    args = parser.parse_args()

    print("Redis 内存占用分析工具")
    print("=" * 60)
    print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    try:
        client = get_redis_client()
        # Fail fast if the server is unreachable or authentication fails.
        client.ping()
        print("✅ Redis 连接成功\n")
    except Exception as e:
        print(f"❌ Redis 连接失败: {e}")
        return

    analyze_all_keys(client, top_n=args.top)

    print("\n" + "=" * 60)
    print("分析完成")
    print("=" * 60)


if __name__ == "__main__":
    main()