check_cache_stats.py 12.3 KB
#!/usr/bin/env python3
"""
统计各种缓存的条目数和内存占用量

按 key 前缀分类统计,帮助了解不同缓存的使用情况

使用方法:

直接使用:
python scripts/check_cache_stats.py 

或者 只统计以下三种前缀:
python scripts/check_cache_stats.py --prefix trans embedding product




其他简单的统计方法(不依赖本脚本,直接使用redis-cli命令):

# 查看所有 key 的前缀分布(快速但不准确)
redis-cli -h localhost -p 6479 -a 'BMfv5aI31kgHWtlx' --no-auth-warning --scan --pattern "*" | cut -d: -f1 | sort | uniq -c | sort -rn

# 统计特定前缀的数量
redis-cli -h localhost -p 6479 -a 'BMfv5aI31kgHWtlx' --no-auth-warning --scan --pattern "trans:*" | wc -l
redis-cli -h localhost -p 6479 -a 'BMfv5aI31kgHWtlx' --no-auth-warning --scan --pattern "embedding:*" | wc -l

# 查看内存统计 ( Redis MEMORY STATS )
redis-cli -h localhost -p 6479 -a 'BMfv5aI31kgHWtlx' --no-auth-warning MEMORY STATS

"""

import redis
import os
import sys
from collections import defaultdict
from pathlib import Path
from datetime import datetime

# 添加项目路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from config.env_config import REDIS_CONFIG

def get_redis_client():
    """获取 Redis 客户端"""
    return redis.Redis(
        host=REDIS_CONFIG.get('host', 'localhost'),
        port=REDIS_CONFIG.get('port', 6479),
        password=REDIS_CONFIG.get('password'),
        decode_responses=True,
        socket_timeout=10,
        socket_connect_timeout=10,
    )

def get_key_prefix(key):
    """提取 key 的前缀(第一个冒号之前的部分)"""
    if ':' in key:
        return key.split(':', 1)[0]
    return key

def format_bytes(bytes_size):
    """格式化字节数为可读格式"""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if bytes_size < 1024.0:
            return f"{bytes_size:.2f} {unit}"
        bytes_size /= 1024.0
    return f"{bytes_size:.2f} PB"

def get_key_memory_usage(client, key):
    """获取单个 key 的内存占用量(字节)"""
    try:
        # 使用 MEMORY USAGE 命令(Redis 4.0+)
        try:
            memory = client.execute_command('MEMORY', 'USAGE', key)
            return memory if memory else 0
        except:
            # 如果 MEMORY USAGE 不可用,使用估算方法
            # 获取 key 和 value 的大小
            key_size = len(key.encode('utf-8'))
            
            # 获取 value
            value = client.get(key)
            if value:
                value_size = len(value.encode('utf-8'))
            else:
                # 尝试获取其他类型
                ttl = client.ttl(key)
                if ttl == -2:  # key 不存在
                    return 0
                # 估算:key + 基础开销
                value_size = 0
            
            # Redis 内存开销估算(粗略)
            # key 对象开销: ~48 bytes
            # value 对象开销: ~24 bytes
            # 其他开销: ~100 bytes
            overhead = 48 + 24 + 100
            return key_size + value_size + overhead
    except Exception as e:
        return 0

def scan_all_keys(client, pattern="*"):
    """扫描所有匹配的 key"""
    keys = []
    cursor = 0
    while True:
        cursor, batch = client.scan(cursor, match=pattern, count=1000)
        keys.extend(batch)
        if cursor == 0:
            break
    return keys

def analyze_cache_by_prefix(client):
    """按前缀分析缓存"""
    print("=" * 60)
    print("扫描 Redis 中的所有 key...")
    print("=" * 60)
    
    try:
        # 扫描所有 key
        all_keys = scan_all_keys(client)
        total_keys = len(all_keys)
        
        print(f"总 key 数量: {total_keys:,}")
        print(f"开始分析...\n")
        
        # 按前缀分类
        prefix_stats = defaultdict(lambda: {
            'count': 0,
            'memory': 0,
            'keys': [],
            'sample_keys': []  # 采样一些 key 用于显示
        })
        
        # 统计每个前缀
        processed = 0
        for key in all_keys:
            prefix = get_key_prefix(key)
            prefix_stats[prefix]['count'] += 1
            prefix_stats[prefix]['keys'].append(key)
            
            # 采样前 5 个 key
            if len(prefix_stats[prefix]['sample_keys']) < 5:
                prefix_stats[prefix]['sample_keys'].append(key)
            
            processed += 1
            if processed % 1000 == 0:
                print(f"  已处理: {processed:,} / {total_keys:,} ({processed*100//total_keys}%)")
        
        print(f"  完成: {processed:,} / {total_keys:,}\n")
        
        # 计算每个前缀的内存占用量
        print("=" * 60)
        print("计算内存占用量...")
        print("=" * 60)
        
        prefix_memory = {}
        for prefix, stats in prefix_stats.items():
            print(f"  计算 {prefix}:* 的内存...")
            total_memory = 0
            sample_count = min(100, stats['count'])  # 采样前 100 个
            
            # 如果数量较少,全部计算;否则采样计算
            if stats['count'] <= 100:
                keys_to_check = stats['keys']
            else:
                # 采样计算
                import random
                keys_to_check = random.sample(stats['keys'], sample_count)
            
            for key in keys_to_check:
                memory = get_key_memory_usage(client, key)
                total_memory += memory
            
            # 如果是采样,估算总内存
            if stats['count'] > sample_count:
                avg_memory = total_memory / sample_count
                estimated_total = avg_memory * stats['count']
                prefix_memory[prefix] = {
                    'memory': estimated_total,
                    'is_estimated': True,
                    'sample_count': sample_count
                }
            else:
                prefix_memory[prefix] = {
                    'memory': total_memory,
                    'is_estimated': False,
                    'sample_count': stats['count']
                }
        
        # 显示统计结果
        print("\n" + "=" * 60)
        print("缓存统计结果(按前缀分类)")
        print("=" * 60)
        
        # 按内存使用量排序
        sorted_prefixes = sorted(
            prefix_stats.items(),
            key=lambda x: prefix_memory[x[0]]['memory'],
            reverse=True
        )
        
        total_memory_all = sum(pm['memory'] for pm in prefix_memory.values())
        
        print(f"{'前缀':<20} {'条目数':>12} {'内存占用量':>20} {'占比':>10} {'说明'}")
        print("-" * 80)
        
        for prefix, stats in sorted_prefixes:
            memory_info = prefix_memory[prefix]
            memory = memory_info['memory']
            memory_str = format_bytes(memory)
            if memory_info['is_estimated']:
                memory_str += f" (估算, 采样 {memory_info['sample_count']})"
            
            percentage = (memory / total_memory_all * 100) if total_memory_all > 0 else 0
            
            # 添加说明
            description = ""
            if prefix == 'trans':
                description = "翻译缓存"
            elif prefix.startswith('embedding') or prefix.startswith('emb'):
                description = "向量化缓存"
            elif prefix.startswith('session') or prefix.startswith('user'):
                description = "会话/用户缓存"
            elif prefix.startswith('product') or prefix.startswith('item'):
                description = "商品缓存"
            else:
                description = "其他"
            
            print(f"{prefix:<20} {stats['count']:>12,} {memory_str:>30} {percentage:>9.1f}%  {description}")
        
        print("-" * 80)
        print(f"{'总计':<20} {total_keys:>12,} {format_bytes(total_memory_all):>30} {'100.0':>9}%")
        
        # 显示详细信息
        print("\n" + "=" * 60)
        print("详细信息(每个前缀的示例 key)")
        print("=" * 60)
        
        for prefix, stats in sorted_prefixes[:10]:  # 只显示前 10 个
            print(f"\n{prefix}:* ({stats['count']:,} 个 key)")
            print(f"  内存: {format_bytes(prefix_memory[prefix]['memory'])}")
            print(f"  示例 key:")
            for sample_key in stats['sample_keys'][:3]:
                ttl = client.ttl(sample_key)
                if ttl == -1:
                    ttl_str = "无过期时间"
                elif ttl == -2:
                    ttl_str = "已过期"
                else:
                    ttl_str = f"{ttl/86400:.1f} 天"
                key_display = sample_key[:60] + "..." if len(sample_key) > 60 else sample_key
                print(f"    - {key_display} (TTL: {ttl_str})")
        
        # 获取 Redis 总内存信息
        print("\n" + "=" * 60)
        print("Redis 内存使用情况")
        print("=" * 60)
        
        try:
            info = client.info('memory')
            used_memory = info.get('used_memory', 0)
            used_memory_human = info.get('used_memory_human', '0B')
            maxmemory = info.get('maxmemory', 0)
            maxmemory_human = info.get('maxmemory_human', '0B')
            
            print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)")
            print(f"统计的缓存内存: {format_bytes(total_memory_all)}")
            print(f"内存占比: {(total_memory_all / used_memory * 100) if used_memory > 0 else 0:.1f}%")
            
            if maxmemory > 0:
                print(f"最大内存限制: {maxmemory_human} ({maxmemory:,} bytes)")
                usage_percent = (used_memory / maxmemory) * 100
                print(f"内存使用率: {usage_percent:.2f}%")
        except Exception as e:
            print(f"获取内存信息失败: {e}")
        
    except Exception as e:
        print(f"❌ 分析失败: {e}")
        import traceback
        traceback.print_exc()

def analyze_specific_prefixes(client, prefixes):
    """分析指定的前缀"""
    print("=" * 60)
    print(f"分析指定前缀: {', '.join(prefixes)}")
    print("=" * 60)
    
    for prefix in prefixes:
        pattern = f"{prefix}:*"
        keys = scan_all_keys(client, pattern=pattern)
        
        if not keys:
            print(f"\n{prefix}:* - 未找到 key")
            continue
        
        print(f"\n{prefix}:*")
        print(f"  条目数: {len(keys):,}")
        
        # 计算内存
        total_memory = 0
        sample_count = min(100, len(keys))
        import random
        sample_keys = random.sample(keys, sample_count) if len(keys) > sample_count else keys
        
        for key in sample_keys:
            memory = get_key_memory_usage(client, key)
            total_memory += memory
        
        if len(keys) > sample_count:
            avg_memory = total_memory / sample_count
            estimated_total = avg_memory * len(keys)
            print(f"  内存占用量: {format_bytes(estimated_total)} (估算, 采样 {sample_count})")
        else:
            print(f"  内存占用量: {format_bytes(total_memory)}")

def main():
    """主函数"""
    import argparse
    
    parser = argparse.ArgumentParser(description='统计 Redis 缓存的条目数和内存占用量')
    parser.add_argument('--prefix', nargs='+', help='指定要分析的前缀(如: trans embedding)')
    parser.add_argument('--all', action='store_true', help='分析所有前缀(默认)')
    
    args = parser.parse_args()
    
    print("Redis 缓存统计工具")
    print("=" * 60)
    print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()
    
    try:
        client = get_redis_client()
        client.ping()
        print("✅ Redis 连接成功\n")
    except Exception as e:
        print(f"❌ Redis 连接失败: {e}")
        print(f"\n请检查:")
        print(f"  - Host: {REDIS_CONFIG.get('host', 'localhost')}")
        print(f"  - Port: {REDIS_CONFIG.get('port', 6479)}")
        print(f"  - Password: {'已配置' if REDIS_CONFIG.get('password') else '未配置'}")
        return
    
    if args.prefix:
        analyze_specific_prefixes(client, args.prefix)
    else:
        analyze_cache_by_prefix(client)
    
    print("\n" + "=" * 60)
    print("统计完成")
    print("=" * 60)

if __name__ == "__main__":
    main()