#!/usr/bin/env python3
"""
Report the entry count and memory usage of the caches stored in Redis.

Keys are grouped by prefix (the part before the first ':') so that the
footprint of each kind of cache can be compared.

Usage:

    Default (database 0):
        python scripts/redis/check_cache_stats.py

    All databases:
        python scripts/redis/check_cache_stats.py --all-db

    A specific database:
        python scripts/redis/check_cache_stats.py --db 1

    Only the listed prefixes:
        python scripts/redis/check_cache_stats.py --prefix trans embedding product

    The listed prefixes in all databases:
        python scripts/redis/check_cache_stats.py --all-db --prefix trans embedding

Quick alternatives that do not need this script (plain redis-cli). Export the
password as REDIS_PASSWORD first; never paste a real password into a file.

    # Prefix distribution of all keys (fast, but only counts keys)
    redis-cli -h localhost -p 6479 -a "$REDIS_PASSWORD" --no-auth-warning --scan --pattern "*" | cut -d: -f1 | sort | uniq -c | sort -rn

    # Number of keys with a given prefix
    redis-cli -h localhost -p 6479 -a "$REDIS_PASSWORD" --no-auth-warning --scan --pattern "trans:*" | wc -l
    redis-cli -h localhost -p 6479 -a "$REDIS_PASSWORD" --no-auth-warning --scan --pattern "embedding:*" | wc -l

    # Server-side memory statistics (Redis MEMORY STATS)
    redis-cli -h localhost -p 6479 -a "$REDIS_PASSWORD" --no-auth-warning MEMORY STATS
"""

import argparse
import os
import random
import sys
import traceback
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from types import SimpleNamespace

import redis

# This file lives in scripts/redis/, so the project root is three levels up.
# resolve() first: with a relative __file__ the bare .parent chain would
# collapse to "." instead of climbing out of scripts/redis/.
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root))

from config.env_config import REDIS_CONFIG  # noqa: E402  (needs sys.path above)

# Rough per-key bookkeeping cost used by the fallback estimate:
# ~48 bytes key object + ~24 bytes value object + ~100 bytes of other overhead.
# It is not accurate, in particular for hash/set/zset and other aggregates.
_ESTIMATED_KEY_OVERHEAD = 48 + 24 + 100

# Default number of keys sampled per prefix when a prefix is too large.
_DEFAULT_SAMPLE_SIZE = 100

# With --real, prefixes holding at most this many keys are measured in full.
_FULL_MEASURE_LIMIT = 10000

# Number of example keys remembered per prefix for the detail section.
_SAMPLE_KEYS_KEPT = 5


def get_redis_client(db=0):
    """Build a Redis client for database ``db`` from ``REDIS_CONFIG``.

    Responses are decoded to ``str`` and both connect and read operations
    time out after 10 seconds.
    """
    return redis.Redis(
        host=REDIS_CONFIG.get('host', 'localhost'),
        port=REDIS_CONFIG.get('port', 6479),
        password=REDIS_CONFIG.get('password'),
        db=db,
        decode_responses=True,
        socket_timeout=10,
        socket_connect_timeout=10,
    )


def _release_client(client):
    """Close every pooled connection of ``client`` (no-op for ``None``)."""
    if client is not None:
        client.connection_pool.disconnect()


def get_key_prefix(key):
    """Return the part of ``key`` before the first ':' (the whole key if none)."""
    return key.partition(':')[0]


def format_bytes(bytes_size):
    """Format a byte count as a human-readable string, e.g. ``"1.50 KB"``."""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if bytes_size < 1024.0:
            return f"{bytes_size:.2f} {unit}"
        bytes_size /= 1024.0
    return f"{bytes_size:.2f} PB"


def get_key_memory_usage(client, key, use_real_memory=True):
    """Return the memory footprint of a single key in bytes.

    Args:
        client: Redis client.
        key: Key name.
        use_real_memory: When True, ask the server via ``MEMORY USAGE``
            (Redis 4.0+) and only fall back to the estimate if that command
            fails. When False, go straight to the estimate.

    Returns:
        The footprint in bytes. 0 if the key does not exist or if the
        measurement failed; this function is best-effort and never raises.
    """
    try:
        if use_real_memory:
            try:
                memory = client.execute_command('MEMORY', 'USAGE', key)
                return memory if memory else 0
            except redis.RedisError:
                # MEMORY USAGE unavailable: degrade to the estimate below.
                pass

        # Estimate: key name + value length + fixed overhead.
        key_size = len(key) if isinstance(key, bytes) else len(key.encode('utf-8'))

        try:
            # STRLEN gives the byte length without transferring or decoding
            # the value, so binary payloads do not break the measurement.
            value_size = client.strlen(key) or 0
        except redis.ResponseError:
            # WRONGTYPE: the key exists but is not a string (hash, list, ...).
            # Its content cannot be sized cheaply; count name + overhead only.
            return key_size + _ESTIMATED_KEY_OVERHEAD

        # STRLEN reports 0 for a missing key as well as for an empty string;
        # TTL == -2 identifies the missing key.
        if not value_size and client.ttl(key) == -2:
            return 0

        return key_size + value_size + _ESTIMATED_KEY_OVERHEAD
    except Exception:
        # Deliberate best-effort: one unreadable key must not abort the report.
        return 0


def scan_all_keys(client, pattern="*"):
    """Return a list of every key matching ``pattern``, collected via SCAN."""
    return list(client.scan_iter(match=pattern, count=1000))


def _group_keys_by_prefix(all_keys):
    """Bucket ``all_keys`` by prefix, printing progress every 1000 keys."""
    total_keys = len(all_keys)
    prefix_stats = defaultdict(lambda: {
        'count': 0,
        'memory': 0,
        'keys': [],
        'sample_keys': [],  # a few example keys for the detail section
    })

    for processed, key in enumerate(all_keys, start=1):
        stats = prefix_stats[get_key_prefix(key)]
        stats['count'] += 1
        stats['keys'].append(key)
        if len(stats['sample_keys']) < _SAMPLE_KEYS_KEPT:
            stats['sample_keys'].append(key)

        if processed % 1000 == 0:
            print(f" 已处理: {processed:,} / {total_keys:,} ({processed*100//total_keys}%)")

    print(f" 完成: {total_keys:,} / {total_keys:,}\n")
    return prefix_stats


def _probe_memory_usage_support(client, all_keys):
    """Check on the first key whether the server accepts MEMORY USAGE."""
    supported = False
    if all_keys:
        try:
            client.execute_command('MEMORY', 'USAGE', all_keys[0])
            supported = True
            print("✅ Redis 支持 MEMORY USAGE 命令,将使用真实内存值")
        except redis.RedisError:
            print("⚠️ Redis 不支持 MEMORY USAGE 命令,将使用估算方法(可能不准确)")
    print()
    return supported


def _measure_prefix_memory(client, stats, real, sample_size, supports_memory_usage):
    """Measure one prefix, sampling and extrapolating when it is too large."""
    count = stats['count']
    if real and count <= _FULL_MEASURE_LIMIT:
        sample_count = count
    else:
        sample_count = min(sample_size, count)

    # Only call it "sampled" when fewer keys than exist are actually measured.
    is_sampled = sample_count < count
    keys_to_check = random.sample(stats['keys'], sample_count) if is_sampled else stats['keys']

    total_memory = 0
    for key in keys_to_check:
        total_memory += get_key_memory_usage(client, key, use_real_memory=supports_memory_usage)

    if is_sampled:
        # Extrapolate the sample average to the whole prefix.
        avg_memory = total_memory / sample_count
        total_memory = avg_memory * count

    return {
        'memory': total_memory,
        'is_estimated': is_sampled,
        'is_sampled': is_sampled,
        'sample_count': sample_count,
        'uses_real_memory': supports_memory_usage,
    }


def _describe_prefix(prefix):
    """Map a key prefix to the human-readable cache category shown in the table."""
    if prefix == 'trans':
        return "翻译缓存"
    if prefix.startswith(('embedding', 'emb')):
        return "向量化缓存"
    if prefix.startswith(('session', 'user')):
        return "会话/用户缓存"
    if prefix.startswith(('product', 'item')):
        return "商品缓存"
    return "其他"


def _print_summary_table(sorted_prefixes, prefix_memory, total_keys, total_memory_all):
    """Print one table row per prefix plus the grand-total row."""
    print("\n" + "=" * 60)
    print("缓存统计结果(按前缀分类)")
    print("=" * 60)

    print(f"{'前缀':<20} {'条目数':>12} {'内存占用量和计算方式':>50} {'占比':>10} {'说明'}")
    print("-" * 120)

    for prefix, stats in sorted_prefixes:
        memory_info = prefix_memory[prefix]
        memory = memory_info['memory']

        avg_memory_per_key = memory / stats['count'] if stats['count'] > 0 else 0
        avg_memory_str = format_bytes(avg_memory_per_key)

        # Say how the number was obtained, so estimates are not read as facts.
        if memory_info['is_sampled']:
            if memory_info['uses_real_memory']:
                calc_method = f"采样估算(采样{memory_info['sample_count']}个, 使用真实MEMORY USAGE)"
            else:
                calc_method = f"采样估算(采样{memory_info['sample_count']}个, 估算方法)"
        elif memory_info['uses_real_memory']:
            calc_method = "真实值(全部计算, 使用MEMORY USAGE)"
        else:
            calc_method = "估算值(全部计算, 估算方法)"

        memory_str = f"{format_bytes(memory)} | 每条: {avg_memory_str} | {calc_method}"
        percentage = (memory / total_memory_all * 100) if total_memory_all > 0 else 0
        description = _describe_prefix(prefix)

        # The memory column can get long; truncate it to keep the row aligned.
        memory_display = memory_str[:70] + "..." if len(memory_str) > 70 else memory_str
        print(f"{prefix:<20} {stats['count']:>12,} {memory_display:<70} {percentage:>9.1f}% {description}")

    print("-" * 120)
    avg_total = total_memory_all / total_keys if total_keys > 0 else 0
    total_display = f"{format_bytes(total_memory_all)} | 每条: {format_bytes(avg_total)}"
    print(f"{'总计':<20} {total_keys:>12,} {total_display:<70} {'100.0':>9}%")


def _print_prefix_details(client, sorted_prefixes, prefix_memory):
    """Print totals and a few example keys (with TTL) for the 10 largest prefixes."""
    print("\n" + "=" * 60)
    print("详细信息(每个前缀的示例 key)")
    print("=" * 60)

    for prefix, stats in sorted_prefixes[:10]:
        mem_info = prefix_memory[prefix]
        avg_per_key = mem_info['memory'] / stats['count'] if stats['count'] > 0 else 0

        print(f"\n{prefix}:* ({stats['count']:,} 个 key)")
        print(f" 总内存: {format_bytes(mem_info['memory'])}")
        print(f" 每条 key 平均: {format_bytes(avg_per_key)}")

        if mem_info['is_sampled']:
            if mem_info['uses_real_memory']:
                print(f" 计算方式: 采样估算(采样 {mem_info['sample_count']} 个,使用真实 MEMORY USAGE)")
            else:
                print(f" 计算方式: 采样估算(采样 {mem_info['sample_count']} 个,使用估算方法)")
        elif mem_info['uses_real_memory']:
            print(f" 计算方式: 真实值(全部计算,使用 MEMORY USAGE)")
        else:
            print(f" 计算方式: 估算值(全部计算,使用估算方法)")

        print(f" 示例 key:")
        for sample_key in stats['sample_keys'][:3]:
            ttl = client.ttl(sample_key)
            if ttl == -1:
                ttl_str = "无过期时间"
            elif ttl == -2:
                # The key disappeared between the scan and this lookup.
                ttl_str = "已过期"
            else:
                ttl_str = f"{ttl/86400:.1f} 天"
            key_display = sample_key[:60] + "..." if len(sample_key) > 60 else sample_key
            print(f" - {key_display} (TTL: {ttl_str})")


def _print_redis_memory_info(client, total_memory_all):
    """Print the server's own memory figures next to the measured cache total."""
    print("\n" + "=" * 60)
    print("Redis 内存使用情况")
    print("=" * 60)

    try:
        info = client.info('memory')
        used_memory = info.get('used_memory', 0)
        used_memory_human = info.get('used_memory_human', '0B')
        maxmemory = info.get('maxmemory', 0)
        maxmemory_human = info.get('maxmemory_human', '0B')

        print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)")
        print(f"统计的缓存内存: {format_bytes(total_memory_all)}")
        print(f"内存占比: {(total_memory_all / used_memory * 100) if used_memory > 0 else 0:.1f}%")

        if maxmemory > 0:
            print(f"最大内存限制: {maxmemory_human} ({maxmemory:,} bytes)")
            usage_percent = (used_memory / maxmemory) * 100
            print(f"内存使用率: {usage_percent:.2f}%")
    except Exception as e:
        print(f"获取内存信息失败: {e}")


def analyze_cache_by_prefix(client, args=None, db_num=0):
    """Scan one database and print per-prefix entry counts and memory usage.

    Args:
        client: Redis client already bound to the database to analyse.
        args: Object with ``real`` (bool) and ``sample_size`` (int)
            attributes, normally the parsed CLI arguments. Defaults to
            ``real=False`` and ``sample_size=100``.
        db_num: Database number; only used for the printed header.

    Any failure is reported on stdout together with a traceback; the
    function itself does not raise.
    """
    if args is None:
        args = SimpleNamespace(real=False, sample_size=_DEFAULT_SAMPLE_SIZE)

    real = bool(getattr(args, 'real', False))
    # Clamp to at least 1: a zero or negative sample size would otherwise
    # divide by zero (or make random.sample raise) when extrapolating.
    sample_size = max(1, getattr(args, 'sample_size', None) or _DEFAULT_SAMPLE_SIZE)

    if db_num > 0:
        print(f"\n{'='*60}")
        print(f"数据库 {db_num}")
        print(f"{'='*60}\n")

    print("=" * 60)
    print("扫描 Redis 中的所有 key...")
    print("=" * 60)

    try:
        all_keys = scan_all_keys(client)
        total_keys = len(all_keys)
        print(f"总 key 数量: {total_keys:,}")
        print(f"开始分析...\n")

        prefix_stats = _group_keys_by_prefix(all_keys)

        print("=" * 60)
        print("计算内存占用量...")
        print("=" * 60)
        print("注意:")
        print(f" - 如果某个前缀的 key 数量 > {sample_size},会随机采样 {sample_size} 个进行估算")
        print(" - 优先使用 Redis MEMORY USAGE 命令(真实值)")
        print(" - 如果 MEMORY USAGE 不可用,会使用估算方法(不准确)")
        print(" - 估算方法只计算 key+value 大小,不包括 Redis 内部数据结构开销")
        print()

        supports_memory_usage = _probe_memory_usage_support(client, all_keys)

        prefix_memory = {}
        for prefix, stats in prefix_stats.items():
            print(f" 计算 {prefix}:* 的内存...")
            prefix_memory[prefix] = _measure_prefix_memory(
                client, stats, real, sample_size, supports_memory_usage
            )

        # Largest consumers first.
        sorted_prefixes = sorted(
            prefix_stats.items(),
            key=lambda item: prefix_memory[item[0]]['memory'],
            reverse=True,
        )
        total_memory_all = sum(pm['memory'] for pm in prefix_memory.values())

        _print_summary_table(sorted_prefixes, prefix_memory, total_keys, total_memory_all)
        _print_prefix_details(client, sorted_prefixes, prefix_memory)
        _print_redis_memory_info(client, total_memory_all)

    except Exception as e:
        print(f"❌ 分析失败: {e}")
        traceback.print_exc()


def analyze_specific_prefixes(client, prefixes, db_num=0):
    """Print the entry count and memory usage of each ``<prefix>:*`` pattern.

    Prefixes with more than 100 keys are measured on a random sample of 100
    keys and the result is extrapolated.

    Args:
        client: Redis client already bound to the database to analyse.
        prefixes: Iterable of prefix strings (without the trailing ':').
        db_num: Database number; only used for the printed header.
    """
    print("=" * 60)
    if db_num > 0:
        print(f"数据库 {db_num} - 分析指定前缀: {', '.join(prefixes)}")
    else:
        print(f"分析指定前缀: {', '.join(prefixes)}")
    print("=" * 60)

    for prefix in prefixes:
        keys = scan_all_keys(client, pattern=f"{prefix}:*")

        if not keys:
            print(f"\n{prefix}:* - 未找到 key")
            continue

        print(f"\n{prefix}:*")
        print(f" 条目数: {len(keys):,}")

        sample_count = min(_DEFAULT_SAMPLE_SIZE, len(keys))
        is_sampled = len(keys) > sample_count
        sample_keys = random.sample(keys, sample_count) if is_sampled else keys

        total_memory = sum(get_key_memory_usage(client, key) for key in sample_keys)

        if is_sampled:
            avg_memory = total_memory / sample_count
            estimated_total = avg_memory * len(keys)
            print(f" 内存占用量: {format_bytes(estimated_total)} (估算, 采样 {sample_count})")
        else:
            print(f" 内存占用量: {format_bytes(total_memory)}")


def get_all_databases(max_databases=16):
    """Return the numbers of the databases that contain at least one key.

    Args:
        max_databases: How many database indexes to probe, starting at 0.
            16 is the Redis default.

    Databases that cannot be reached or selected are silently skipped.
    """
    databases = []
    for db_num in range(max_databases):
        client = None
        try:
            client = get_redis_client(db=db_num)
            client.ping()
            if client.dbsize() > 0:
                databases.append(db_num)
        except Exception:
            # Best-effort probe: an unreachable/unselectable database is
            # simply treated as "no data".
            pass
        finally:
            # One client per probed database; release it so the probe does
            # not leave up to `max_databases` connections open.
            _release_client(client)

    return databases


def analyze_all_databases(args):
    """Run the analysis on every database that holds data, then summarise.

    Args:
        args: Parsed CLI arguments. ``args.prefix`` selects the
            prefix-specific analysis; otherwise the full per-prefix analysis
            runs and a cross-database summary is printed at the end.
    """
    print("=" * 60)
    print("扫描所有数据库...")
    print("=" * 60)

    databases = get_all_databases()

    if not databases:
        print("未找到有数据的数据库")
        return

    print(f"发现 {len(databases)} 个有数据的数据库: {databases}\n")

    total_keys_all_db = 0

    for db_num in databases:
        client = None
        try:
            client = get_redis_client(db=db_num)
            client.ping()
            db_size = client.dbsize()

            print(f"\n{'='*60}")
            print(f"数据库 {db_num} (共 {db_size:,} 个 key)")
            print(f"{'='*60}")

            if args.prefix:
                analyze_specific_prefixes(client, args.prefix, db_num=db_num)
            else:
                analyze_cache_by_prefix(client, args, db_num=db_num)
                # Simplified roll-up: only key counts are aggregated.
                total_keys_all_db += db_size

        except Exception as e:
            print(f"❌ 数据库 {db_num} 分析失败: {e}")
            traceback.print_exc()
            continue
        finally:
            _release_client(client)

    if not args.prefix:
        print("\n" + "=" * 60)
        print("所有数据库汇总")
        print("=" * 60)
        print(f"有数据的数据库: {len(databases)} 个 ({', '.join(map(str, databases))})")
        print(f"总 key 数量: {total_keys_all_db:,}")
        print(f"\n提示: 要查看详细的内存统计,请分别运行每个数据库:")
        for db_num in databases:
            print(f" python scripts/redis/check_cache_stats.py --db {db_num}")


def main():
    """Parse the command line and run the requested analysis."""
    parser = argparse.ArgumentParser(description='统计 Redis 缓存的条目数和内存占用量')
    parser.add_argument('--prefix', nargs='+', help='指定要分析的前缀(如: trans embedding)')
    parser.add_argument('--all', action='store_true', help='分析所有前缀(默认)')
    parser.add_argument('--real', action='store_true', help='计算所有 key 的真实内存(很慢,但准确)')
    parser.add_argument('--sample-size', type=int, default=100, help='采样大小(默认 100,仅当 key 数量 > 采样大小时使用)')
    parser.add_argument('--db', type=int, help='指定数据库编号(0-15),默认只统计 db 0')
    parser.add_argument('--all-db', action='store_true', help='统计所有数据库(0-15)')

    args = parser.parse_args()

    print("Redis 缓存统计工具")
    print("=" * 60)
    print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    # --all-db takes precedence over --db.
    if args.all_db:
        analyze_all_databases(args)
        print("\n" + "=" * 60)
        print("统计完成")
        print("=" * 60)
        return

    db_num = args.db if args.db is not None else 0

    client = None
    try:
        try:
            client = get_redis_client(db=db_num)
            client.ping()
            if db_num > 0:
                print(f"✅ Redis 连接成功(数据库 {db_num})\n")
            else:
                print("✅ Redis 连接成功(默认数据库 0)\n")
        except Exception as e:
            print(f"❌ Redis 连接失败: {e}")
            print(f"\n请检查:")
            print(f" - Host: {REDIS_CONFIG.get('host', 'localhost')}")
            print(f" - Port: {REDIS_CONFIG.get('port', 6479)}")
            # Only say whether a password is configured; never print it.
            print(f" - Password: {'已配置' if REDIS_CONFIG.get('password') else '未配置'}")
            print(f" - Database: {db_num}")
            return

        if args.prefix:
            analyze_specific_prefixes(client, args.prefix, db_num=db_num)
        else:
            analyze_cache_by_prefix(client, args, db_num=db_num)

        print("\n" + "=" * 60)
        print("统计完成")
        print("=" * 60)
    finally:
        _release_client(client)


if __name__ == "__main__":
    main()