#!/usr/bin/env python3
"""
Report the entry count and memory usage of each kind of Redis cache.

Keys are grouped by prefix (the part before the first colon) so that the
footprint of the different caches can be compared.

Usage:

    Analyse every prefix:
        python scripts/check_cache_stats.py

    Analyse only the given prefixes:
        python scripts/check_cache_stats.py --prefix trans embedding product

Quick alternatives that use redis-cli directly instead of this script
(export REDIS_PASSWORD first; never paste the real password into docs):

    # Prefix distribution of all keys (fast but imprecise)
    redis-cli -h localhost -p 6479 -a "$REDIS_PASSWORD" --no-auth-warning --scan --pattern "*" | cut -d: -f1 | sort | uniq -c | sort -rn

    # Number of keys with a specific prefix
    redis-cli -h localhost -p 6479 -a "$REDIS_PASSWORD" --no-auth-warning --scan --pattern "trans:*" | wc -l
    redis-cli -h localhost -p 6479 -a "$REDIS_PASSWORD" --no-auth-warning --scan --pattern "embedding:*" | wc -l

    # Memory statistics (Redis MEMORY STATS)
    redis-cli -h localhost -p 6479 -a "$REDIS_PASSWORD" --no-auth-warning MEMORY STATS
"""

import argparse
import os
import random
import sys
import traceback
from collections import defaultdict
from datetime import datetime
from pathlib import Path

import redis

# Put the directory two levels above this file on sys.path so that the
# `config` package can be imported when the script is run directly.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from config.env_config import REDIS_CONFIG  # noqa: E402

# Maximum number of keys whose memory is measured per prefix; larger groups
# are extrapolated from a random sample of this size.
MEMORY_SAMPLE_SIZE = 100

# Rough per-key overhead used by the fallback estimate:
# key object (~48 bytes) + value object (~24 bytes) + other bookkeeping (~100).
_ESTIMATED_KEY_OVERHEAD = 48 + 24 + 100


def get_redis_client():
    """Build a Redis client from REDIS_CONFIG (string responses, 10 s timeouts)."""
    return redis.Redis(
        host=REDIS_CONFIG.get('host', 'localhost'),
        port=REDIS_CONFIG.get('port', 6479),
        password=REDIS_CONFIG.get('password'),
        decode_responses=True,
        socket_timeout=10,
        socket_connect_timeout=10,
    )


def get_key_prefix(key):
    """Return the part of *key* before the first colon, or *key* itself if it has none."""
    return key.partition(':')[0]


def format_bytes(bytes_size):
    """Format a byte count as a human-readable string with two decimals (B .. PB)."""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if bytes_size < 1024.0:
            return f"{bytes_size:.2f} {unit}"
        bytes_size /= 1024.0
    return f"{bytes_size:.2f} PB"


def _byte_length(data):
    """Return the encoded size of *data*, which may be ``str`` or ``bytes``."""
    if isinstance(data, (bytes, bytearray)):
        return len(data)
    return len(data.encode('utf-8'))


def _estimate_key_memory(client, key):
    """Roughly estimate a key's memory when MEMORY USAGE is unavailable.

    Returns 0 if the key no longer exists; otherwise key size + value size
    (string values only) + a fixed overhead.
    """
    try:
        value = client.get(key)
    except redis.exceptions.ResponseError:
        # WRONGTYPE: the key holds a non-string type whose size cannot be read
        # with GET, so only the key itself and the fixed overhead are counted.
        value = None

    if value:
        value_size = _byte_length(value)
    else:
        if client.ttl(key) == -2:  # TTL of -2 means the key does not exist
            return 0
        value_size = 0

    return _byte_length(key) + value_size + _ESTIMATED_KEY_OVERHEAD


def get_key_memory_usage(client, key):
    """Return the memory used by a single key, in bytes.

    Tries ``MEMORY USAGE`` (Redis 4.0+) first and falls back to a rough
    estimate if that command fails. This is deliberately best-effort: any
    error yields 0 rather than aborting the whole report. Only ``Exception``
    is caught, so Ctrl-C and ``SystemExit`` still propagate.
    """
    try:
        memory = client.execute_command('MEMORY', 'USAGE', key)
        return memory or 0
    except Exception:
        # MEMORY USAGE is not available (or failed); fall through to the estimate.
        pass

    try:
        return _estimate_key_memory(client, key)
    except Exception:
        return 0


def scan_all_keys(client, pattern="*"):
    """Return every key matching *pattern*, collected with incremental SCAN.

    SCAN may report the same key more than once, so the result is
    de-duplicated while keeping first-seen order.
    """
    return list(dict.fromkeys(client.scan_iter(match=pattern, count=1000)))


def _measure_keys_memory(client, keys, sample_size=MEMORY_SAMPLE_SIZE):
    """Measure (or extrapolate) the total memory of *keys*.

    Groups of at most *sample_size* keys are measured exactly; larger groups
    are extrapolated from a random sample of *sample_size* keys.

    Returns:
        A ``(memory_bytes, is_estimated, sample_count)`` tuple.
    """
    key_count = len(keys)
    if key_count <= sample_size:
        measured = sum(get_key_memory_usage(client, key) for key in keys)
        return measured, False, key_count

    sampled = random.sample(keys, sample_size)
    measured = sum(get_key_memory_usage(client, key) for key in sampled)
    return (measured / sample_size) * key_count, True, sample_size


def _group_keys_by_prefix(all_keys):
    """Group keys by prefix, printing progress every 1000 keys."""
    total_keys = len(all_keys)
    keys_by_prefix = defaultdict(list)
    processed = 0
    for processed, key in enumerate(all_keys, start=1):
        keys_by_prefix[get_key_prefix(key)].append(key)
        if processed % 1000 == 0:
            print(f" 已处理: {processed:,} / {total_keys:,} ({processed*100//total_keys}%)")
    print(f" 完成: {processed:,} / {total_keys:,}\n")
    return keys_by_prefix


def _describe_prefix(prefix):
    """Return the human-readable cache category shown for *prefix*."""
    if prefix == 'trans':
        return "翻译缓存"
    if prefix.startswith(('embedding', 'emb')):
        return "向量化缓存"
    if prefix.startswith(('session', 'user')):
        return "会话/用户缓存"
    if prefix.startswith(('product', 'item')):
        return "商品缓存"
    return "其他"


def _format_ttl(ttl):
    """Render a TTL in seconds (-1 = no expiry, -2 = key gone) for display."""
    if ttl == -1:
        return "无过期时间"
    if ttl == -2:
        return "已过期"
    return f"{ttl/86400:.1f} 天"


def _print_prefix_table(ranked, prefix_memory, total_keys, total_memory_all):
    """Print the per-prefix summary table, largest memory consumer first."""
    print("\n" + "=" * 60)
    print("缓存统计结果(按前缀分类)")
    print("=" * 60)

    # The memory column is 30 wide to match the row format below.
    print(f"{'前缀':<20} {'条目数':>12} {'内存占用量':>30} {'占比':>10} {'说明'}")
    print("-" * 80)

    for prefix, keys in ranked:
        memory, is_estimated, sample_count = prefix_memory[prefix]
        memory_str = format_bytes(memory)
        if is_estimated:
            memory_str += f" (估算, 采样 {sample_count})"

        percentage = (memory / total_memory_all * 100) if total_memory_all > 0 else 0
        description = _describe_prefix(prefix)
        print(f"{prefix:<20} {len(keys):>12,} {memory_str:>30} {percentage:>9.1f}% {description}")

    print("-" * 80)
    print(f"{'总计':<20} {total_keys:>12,} {format_bytes(total_memory_all):>30} {'100.0':>9}%")


def _print_prefix_samples(client, ranked, prefix_memory):
    """Print up to three example keys (with TTL) for the ten largest prefixes."""
    print("\n" + "=" * 60)
    print("详细信息(每个前缀的示例 key)")
    print("=" * 60)

    for prefix, keys in ranked[:10]:
        print(f"\n{prefix}:* ({len(keys):,} 个 key)")
        print(f" 内存: {format_bytes(prefix_memory[prefix][0])}")
        print(" 示例 key:")
        for sample_key in keys[:3]:
            ttl_str = _format_ttl(client.ttl(sample_key))
            # Truncate very long keys so each sample stays on one line.
            key_display = sample_key[:60] + "..." if len(sample_key) > 60 else sample_key
            print(f" - {key_display} (TTL: {ttl_str})")


def _print_redis_memory_info(client, total_memory_all):
    """Print server-wide memory figures from INFO memory next to our total."""
    print("\n" + "=" * 60)
    print("Redis 内存使用情况")
    print("=" * 60)

    try:
        info = client.info('memory')
        used_memory = info.get('used_memory', 0)
        used_memory_human = info.get('used_memory_human', '0B')
        maxmemory = info.get('maxmemory', 0)
        maxmemory_human = info.get('maxmemory_human', '0B')

        print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)")
        print(f"统计的缓存内存: {format_bytes(total_memory_all)}")
        print(f"内存占比: {(total_memory_all / used_memory * 100) if used_memory > 0 else 0:.1f}%")

        # maxmemory of 0 means no limit is configured, so there is no ratio to show.
        if maxmemory > 0:
            print(f"最大内存限制: {maxmemory_human} ({maxmemory:,} bytes)")
            usage_percent = (used_memory / maxmemory) * 100
            print(f"内存使用率: {usage_percent:.2f}%")
    except Exception as e:
        # The report above is still useful without the server-wide figures.
        print(f"获取内存信息失败: {e}")


def analyze_cache_by_prefix(client):
    """Scan every key, group by prefix, and print counts and memory per prefix.

    Any failure is reported (with traceback) instead of being raised, so the
    caller can still print its closing banner.
    """
    print("=" * 60)
    print("扫描 Redis 中的所有 key...")
    print("=" * 60)

    try:
        all_keys = scan_all_keys(client)
        total_keys = len(all_keys)
        print(f"总 key 数量: {total_keys:,}")
        print("开始分析...\n")

        keys_by_prefix = _group_keys_by_prefix(all_keys)

        print("=" * 60)
        print("计算内存占用量...")
        print("=" * 60)

        # prefix -> (memory_bytes, is_estimated, sample_count)
        prefix_memory = {}
        for prefix, keys in keys_by_prefix.items():
            print(f" 计算 {prefix}:* 的内存...")
            prefix_memory[prefix] = _measure_keys_memory(client, keys)

        # Largest memory consumer first.
        ranked = sorted(
            keys_by_prefix.items(),
            key=lambda item: prefix_memory[item[0]][0],
            reverse=True,
        )
        total_memory_all = sum(memory for memory, _, _ in prefix_memory.values())

        _print_prefix_table(ranked, prefix_memory, total_keys, total_memory_all)
        _print_prefix_samples(client, ranked, prefix_memory)
        _print_redis_memory_info(client, total_memory_all)

    except Exception as e:
        print(f"❌ 分析失败: {e}")
        traceback.print_exc()


def analyze_specific_prefixes(client, prefixes):
    """Print the key count and (possibly estimated) memory for each given prefix."""
    print("=" * 60)
    print(f"分析指定前缀: {', '.join(prefixes)}")
    print("=" * 60)

    for prefix in prefixes:
        keys = scan_all_keys(client, pattern=f"{prefix}:*")

        if not keys:
            print(f"\n{prefix}:* - 未找到 key")
            continue

        print(f"\n{prefix}:*")
        print(f" 条目数: {len(keys):,}")

        memory, is_estimated, sample_count = _measure_keys_memory(client, keys)
        if is_estimated:
            print(f" 内存占用量: {format_bytes(memory)} (估算, 采样 {sample_count})")
        else:
            print(f" 内存占用量: {format_bytes(memory)}")


def main():
    """Parse arguments, connect to Redis, and run the requested analysis.

    Returns:
        Process exit status: 0 on success, 1 if Redis could not be reached.
    """
    parser = argparse.ArgumentParser(description='统计 Redis 缓存的条目数和内存占用量')
    parser.add_argument('--prefix', nargs='+', help='指定要分析的前缀(如: trans embedding)')
    # --all is accepted for explicitness; analysing everything is the default.
    parser.add_argument('--all', action='store_true', help='分析所有前缀(默认)')
    args = parser.parse_args()

    print("Redis 缓存统计工具")
    print("=" * 60)
    print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    try:
        client = get_redis_client()
        client.ping()
        print("✅ Redis 连接成功\n")
    except Exception as e:
        print(f"❌ Redis 连接失败: {e}")
        print("\n请检查:")
        print(f" - Host: {REDIS_CONFIG.get('host', 'localhost')}")
        print(f" - Port: {REDIS_CONFIG.get('port', 6479)}")
        # Only report whether a password is configured; never echo it.
        print(f" - Password: {'已配置' if REDIS_CONFIG.get('password') else '未配置'}")
        return 1

    if args.prefix:
        analyze_specific_prefixes(client, args.prefix)
    else:
        analyze_cache_by_prefix(client)

    print("\n" + "=" * 60)
    print("统计完成")
    print("=" * 60)
    return 0


if __name__ == "__main__":
    sys.exit(main())