find_memory_usage.py 8.35 KB
#!/usr/bin/env python3
"""
查找 Redis 中占用内存的主要 key

分析为什么统计的缓存内存和总内存差异很大
"""

import redis
import os
import sys
from collections import defaultdict
from pathlib import Path
from datetime import datetime

# Put the project root on sys.path so project packages can be imported below.
# Per the original note, this file lives under scripts/redis/, so the root is
# reached by taking three `.parent` hops from this file's path.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from config.env_config import REDIS_CONFIG

def get_redis_client():
    """Build a ``redis.Redis`` client from the project's ``REDIS_CONFIG``.

    Host, port and password are read from ``REDIS_CONFIG`` with fallbacks for
    host and port; the password falls back to ``None``. Response decoding and
    both socket timeouts are fixed here rather than read from the config.

    Returns:
        A ``redis.Redis`` instance. Constructing it does not verify that the
        server is reachable; callers are expected to ping it themselves.
    """
    # NOTE(review): the fallback port is 6479, not Redis's conventional 6379.
    # Confirm this is the project's intended default and not a typo.
    connection_options = {
        'host': REDIS_CONFIG.get('host', 'localhost'),
        'port': REDIS_CONFIG.get('port', 6479),
        'password': REDIS_CONFIG.get('password'),
        'decode_responses': True,
        'socket_timeout': 10,
        'socket_connect_timeout': 10,
    }
    return redis.Redis(**connection_options)

def format_bytes(bytes_size):
    """Format a byte count as a human-readable string with two decimals.

    The value is divided by 1024 until it drops below 1024, walking through
    B, KB, MB, GB and TB; anything larger is reported in PB.

    Negative inputs (for example a "difference" where the estimate exceeds
    the actual usage) are scaled by their magnitude and keep their sign, so
    ``-2048`` becomes ``"-2.00 KB"`` instead of ``"-2048.00 B"``.

    Args:
        bytes_size: Number of bytes, as an int or float. May be negative.

    Returns:
        A string such as ``"1.50 KB"``.
    """
    sign = "-" if bytes_size < 0 else ""
    magnitude = abs(bytes_size)
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if magnitude < 1024.0:
            return f"{sign}{magnitude:.2f} {unit}"
        magnitude /= 1024.0
    return f"{sign}{magnitude:.2f} PB"

def _utf8_length(text):
    """Return the size in bytes of *text*, which may be ``str`` or bytes-like."""
    if isinstance(text, (bytes, bytearray)):
        return len(text)
    return len(text.encode('utf-8'))


def _estimate_key_memory(client, key):
    """Roughly estimate a key's footprint from its name and GET value.

    Best effort only: any failure (for example GET on a key that is not a
    plain string value) yields 0 rather than an exception.
    """
    try:
        value = client.get(key)
        value_size = _utf8_length(value) if value else 0
        # Fixed per-key overhead guess carried over unchanged; the breakdown
        # of 48 + 24 + 100 is not documented in this file.
        overhead = 48 + 24 + 100
        return _utf8_length(key) + value_size + overhead
    except Exception:
        return 0


def get_key_memory_usage(client, key):
    """Return the memory used by a single key, in bytes.

    The ``MEMORY USAGE`` command (Redis 4.0+) is tried first. If that command
    raises, the size is estimated from the key name and its GET value plus a
    fixed overhead. Only ``Exception`` subclasses trigger the fallback, so
    ``KeyboardInterrupt`` and ``SystemExit`` propagate to the caller instead
    of being swallowed.

    Args:
        client: Object exposing ``execute_command`` and ``get`` (a redis-py
            client in this script).
        key: Key name, as ``str`` or ``bytes``.

    Returns:
        The reported or estimated size in bytes, or 0 when the server reports
        nothing for the key or the estimate itself fails.
    """
    try:
        memory = client.execute_command('MEMORY', 'USAGE', key)
    except Exception:
        # MEMORY USAGE is unavailable or failed: fall back to the estimate.
        return _estimate_key_memory(client, key)
    return memory if memory else 0

def _scan_all_keys(client, count=1000):
    """Collect every key name via SCAN, dropping duplicates but keeping order.

    SCAN is allowed to return the same key more than once during a full
    iteration, so names are gathered in a dict to keep each one only once.
    """
    unique_keys = {}
    cursor = 0
    while True:
        cursor, batch = client.scan(cursor, count=count)
        unique_keys.update(dict.fromkeys(batch))
        if cursor == 0:
            break
    return list(unique_keys)


def _count_key_prefixes(keys):
    """Return ``(prefix -> count, number of keys without ':')`` for *keys*."""
    prefix_counts = defaultdict(int)
    no_prefix_count = 0
    for key in keys:
        prefix, separator, _ = key.partition(':')
        if separator:
            prefix_counts[prefix] += 1
        else:
            no_prefix_count += 1
    return prefix_counts, no_prefix_count


def _shorten_key(key, limit=60):
    """Truncate *key* to *limit* characters plus '...' for table display."""
    return key[:limit] + "..." if len(key) > limit else key


def analyze_all_keys(client, top_n=50):
    """Print a memory report for the keys of a Redis instance.

    Steps, all reported on stdout:

    1. Read ``used_memory`` from ``INFO memory``.
    2. SCAN the whole keyspace (de-duplicated) and count keys per prefix,
       where the prefix is the text before the first ``:``.
    3. Measure up to 1000 randomly sampled keys with
       ``get_key_memory_usage`` and extrapolate the average to all keys.
    4. List the ``top_n`` largest sampled keys, flag sampled keys above 1 MB,
       and show the type distribution of the sample.

    Any exception raised during the analysis is printed together with its
    traceback instead of being propagated.

    Args:
        client: Redis client; ``info``, ``scan`` and ``type`` are called on
            it here, and it is passed on to ``get_key_memory_usage``.
        top_n: How many of the largest sampled keys to list.

    Returns:
        None.
    """
    import random
    import traceback

    max_sample = 1000
    large_value_threshold = 1024 * 1024  # 1 MB

    print("=" * 60)
    print("分析所有 key 的内存占用")
    print("=" * 60)

    try:
        # Overall memory figures as reported by the server.
        info = client.info('memory')
        used_memory = info.get('used_memory', 0)
        used_memory_human = info.get('used_memory_human', '0B')

        print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)\n")

        # Full keyspace scan.
        print("扫描所有 key...")
        all_keys = _scan_all_keys(client)
        total_keys = len(all_keys)
        print(f"总 key 数量: {total_keys:,}\n")

        # Naming patterns.
        print("分析 key 命名模式...")
        prefix_patterns, no_prefix_count = _count_key_prefixes(all_keys)

        print(f"  无前缀的 key: {no_prefix_count:,}")
        print(f"  有前缀的 key: {total_keys - no_prefix_count:,}")
        print(f"  不同前缀数量: {len(prefix_patterns):,}\n")

        print("所有前缀列表:")
        sorted_prefixes = sorted(prefix_patterns.items(), key=lambda x: x[1], reverse=True)
        for prefix, count in sorted_prefixes[:20]:
            print(f"  {prefix}:* - {count:,} 个 key")
        if len(sorted_prefixes) > 20:
            print(f"  ... 还有 {len(sorted_prefixes) - 20} 个前缀")

        # Sampled memory measurement.
        sample_size = min(max_sample, total_keys)
        print(f"\n采样分析内存占用(采样前 {sample_size} 个 key)...")

        if total_keys > sample_size:
            sample_keys = random.sample(all_keys, sample_size)
        else:
            sample_keys = all_keys

        key_memories = []
        for processed, key in enumerate(sample_keys, 1):
            memory = get_key_memory_usage(client, key)
            # Keys whose size could not be determined (0) are left out of the
            # statistics so they do not drag the average down.
            if memory > 0:
                key_memories.append((key, memory))
            if processed % 100 == 0:
                print(f"  已处理: {processed}/{sample_size}")

        key_memories.sort(key=lambda x: x[1], reverse=True)

        # Extrapolate the sampled average to the whole keyspace.
        total_sample_memory = sum(mem for _, mem in key_memories)
        avg_memory = total_sample_memory / len(key_memories) if key_memories else 0
        estimated_total_memory = avg_memory * total_keys
        difference = used_memory - estimated_total_memory

        print(f"\n采样统计:")
        print(f"  采样 key 数量: {len(key_memories):,}")
        print(f"  采样总内存: {format_bytes(total_sample_memory)}")
        print(f"  平均每个 key 内存: {format_bytes(avg_memory)}")
        print(f"  估算所有 key 总内存: {format_bytes(estimated_total_memory)}")
        print(f"  实际 Redis 使用内存: {format_bytes(used_memory)}")
        print(f"  差异: {format_bytes(difference)}")

        # Largest sampled keys.
        print(f"\n占用内存最多的 {top_n} 个 key:")
        print(f"{'排名':<6} {'内存':<15} {'Key'}")
        print("-" * 80)

        for i, (key, memory) in enumerate(key_memories[:top_n], 1):
            print(f"{i:<6} {format_bytes(memory):<15} {_shorten_key(key)}")

        # Difference between reported usage and the extrapolated estimate.
        print("\n" + "=" * 60)
        print("内存差异分析")
        print("=" * 60)

        difference_percent = (difference / used_memory * 100) if used_memory > 0 else 0

        print(f"实际内存: {format_bytes(used_memory)}")
        print(f"估算 key 内存: {format_bytes(estimated_total_memory)}")
        print(f"差异: {format_bytes(difference)} ({difference_percent:.1f}%)")

        print("\n可能的原因:")
        print("1. Redis 内部数据结构开销(hash table、skiplist 等)")
        print("2. 内存碎片")
        print("3. Redis 进程本身的内存占用")
        print("4. 其他数据结构(如 list、set、zset、hash)的内存开销更大")
        print("5. 采样估算的误差")

        # Oversized values: key_memories is sorted, so the first 100 entries
        # are the largest sampled keys.
        print(f"\n检查是否有超大 value(> 1MB)...")
        large_values = [
            (key, memory)
            for key, memory in key_memories[:100]
            if memory > large_value_threshold
        ]

        if large_values:
            print(f"发现 {len(large_values)} 个超大 value (> 1MB):")
            for key, memory in large_values[:10]:
                print(f"  {format_bytes(memory):<15} {_shorten_key(key)}")
        else:
            print("  未发现超大 value")

        # Type distribution of the sampled keys (the sample holds <= 1000).
        print(f"\n检查 key 类型分布(采样前 1000 个)...")
        type_distribution = defaultdict(int)
        for key in sample_keys:
            try:
                key_type = client.type(key)
            except Exception:
                # Best effort: one failing TYPE call must not abort the
                # report, but interrupts are no longer swallowed here.
                continue
            type_distribution[key_type] += 1

        print("Key 类型分布:")
        for key_type, count in sorted(type_distribution.items(), key=lambda x: x[1], reverse=True):
            print(f"  {key_type}: {count}")

    except Exception as e:
        print(f"❌ 分析失败: {e}")
        traceback.print_exc()

def main():
    """Command-line entry point: parse options, connect, run the analysis.

    Accepts a single ``--top`` integer option (default 50) that is forwarded
    to ``analyze_all_keys`` as ``top_n``. If the client cannot be created or
    the ping fails, the error is printed and the function returns without
    running the analysis.
    """
    import argparse

    separator = "=" * 60

    cli = argparse.ArgumentParser(description='查找 Redis 中占用内存的主要 key')
    cli.add_argument(
        '--top',
        type=int,
        default=50,
        help='显示占用内存最多的 N 个 key(默认 50)',
    )
    options = cli.parse_args()

    # Banner with the time of this run.
    print("Redis 内存占用分析工具")
    print(separator)
    print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    # Fail fast with a readable message when the server is unreachable.
    try:
        redis_client = get_redis_client()
        redis_client.ping()
        print("✅ Redis 连接成功\n")
    except Exception as e:
        print(f"❌ Redis 连接失败: {e}")
        return

    analyze_all_keys(redis_client, top_n=options.top)

    print("\n" + separator)
    print("分析完成")
    print(separator)

# Run the analysis only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()