diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index 0ce4514..c02103d 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -423,6 +423,7 @@ services: | 向量模块与 clip-as-service | [embeddings/README.md](../embeddings/README.md) | | TEI 服务专项(安装/部署/GPU-CPU 模式) | [TEI_SERVICE说明文档.md](./TEI_SERVICE说明文档.md) | | CN-CLIP 服务专项(环境/运维/GPU) | [CNCLIP_SERVICE说明文档.md](./CNCLIP_SERVICE说明文档.md) | +| 缓存 / Redis 使用与 key 设计 | [缓存与Redis使用说明.md](./缓存与Redis使用说明.md) | ### 10.2 仓库内入口 diff --git a/docs/缓存与Redis使用说明.md b/docs/缓存与Redis使用说明.md new file mode 100644 index 0000000..b2c1e2e --- /dev/null +++ b/docs/缓存与Redis使用说明.md @@ -0,0 +1,347 @@ +## 缓存与 Redis 使用说明 + +本仓库中 Redis 主要用于**性能型缓存**,当前落地的业务缓存包括: + +- **文本向量缓存**(embedding 缓存) +- **翻译结果缓存**(Qwen-MT 等机器翻译) +- **商品内容理解缓存**(锚文本 / 语义属性 / 标签) + +底层连接配置统一来自 `config/env_config.py` 的 `REDIS_CONFIG`: + +- **Host/Port**:`REDIS_HOST` / `REDIS_PORT`(默认 `localhost:6479`) +- **Password**:`REDIS_PASSWORD` +- **Socket & 超时**:`REDIS_SOCKET_TIMEOUT` / `REDIS_SOCKET_CONNECT_TIMEOUT` / `REDIS_RETRY_ON_TIMEOUT` +- **通用缓存 TTL**:`REDIS_CACHE_EXPIRE_DAYS`(默认 `360*2` 天,代码注释为 “6 months”) +- **翻译缓存 TTL & 前缀**:`REDIS_TRANSLATION_CACHE_EXPIRE_DAYS`、`REDIS_TRANSLATION_CACHE_PREFIX` + +--- + +## 1. 缓存总览表 + +| 模块 / 场景 | Key 模板 | Value 内容示例 | 过期策略 | 备注 | +|------------|----------|----------------|----------|------| +| 文本向量缓存(embedding) | `embedding:{language}:{norm_flag}:{query}` | `pickle.dumps(np.ndarray)`,如 1024 维 BGE 向量 | TTL=`REDIS_CONFIG["cache_expire_days"]` 天;访问时滑动过期 | 见 `embeddings/text_encoder.py` | +| 翻译结果缓存(Qwen-MT 翻译) | `{cache_prefix}:{model}:{src}:{tgt}:{sha256(payload)}` | 机翻后的单条字符串 | TTL=`services.translation.cache.ttl_seconds` 秒;可配置滑动过期 | 见 `query/qwen_mt_translate.py` + `config/config.yaml` | +| 商品内容理解缓存(anchors / 语义属性 / tags) | `{ANCHOR_CACHE_PREFIX}:{tenant_or_global}:{target_lang}:{md5(title)}` | `json.dumps(dict)`,包含 id/title/category/tags/anchor_text 等 | TTL=`ANCHOR_CACHE_EXPIRE_DAYS` 天 | 见 `indexer/product_enrich.py` | + +下面按模块详细说明。 + +--- + +## 2. 文本向量缓存(embeddings/text_encoder.py) + +- **代码位置**:`embeddings/text_encoder.py` 中 `TextEmbeddingEncoder` +- **用途**:缓存调用向量服务(6005)的文本向量结果,避免重复计算。 + +### 2.1 Key 设计 + +- 函数:`_get_cache_key(query: str, language: str, normalize_embeddings: bool) -> str` +- 模板: + +```text +embedding:{language}:{norm_flag}:{query} +``` + +- 字段说明: + - `language`:当前实现中统一传入 `"generic"`; + - `norm_flag`:`"norm1"` 表示归一化向量,`"norm0"` 表示未归一化; + - `query`:原始文本(未做哈希),注意长度特别长的 query 会直接出现在 key 中。 + +### 2.2 Value 与类型 + +- 类型:`pickle.dumps(np.ndarray)`,在读取时通过 `pickle.loads` 还原为 `np.ndarray`。 +- 典型示例:BGE-M3 1024 维 `float32` 向量。 + +### 2.3 过期策略 + +- 初始化: + - `self.expire_time = timedelta(days=REDIS_CONFIG.get("cache_expire_days", 180))` + - `.env` 中可通过 `REDIS_CACHE_EXPIRE_DAYS` 配置,默认 `360*2`(代码注释标注为 6 个月)。 +- 写入: + - `redis.setex(cache_key, self.expire_time, serialized_data)` +- 访问(滑动过期): + - 命中缓存后,会调用 `redis.expire(cache_key, self.expire_time)` 延长 TTL。 + +### 2.4 特殊处理 + +- 若缓存中的向量 **为空 / shape 异常 / 含 NaN/Inf**,会: + - 直接丢弃该缓存(并尝试 `delete` key); + - 回退为重新调用向量服务。 + +--- + +## 3. 翻译结果缓存(query/qwen_mt_translate.py) + +- **代码位置**:`query/qwen_mt_translate.py` 中 `Translator` 类 +- **用途**:缓存 Qwen-MT 翻译(及 translator service 复用的翻译)结果,减少云端请求,遵守限速。 +- **配置入口**:`config/config.yaml -> services.translation.cache`,统一由 `config/services_config.get_translation_cache_config()` 解析。 + +### 3.1 Key 设计 + +- 内部构造函数:`_build_cache_key(...)` +- 模板: + +```text +{cache_prefix}:{model}:{src}:{tgt}:{sha256(payload)} +``` + +其中: + +- `cache_prefix`:来自 `services.translation.cache.key_prefix`,默认 `trans:v2`; +- `model`:如 `"qwen-mt"`; +- `src`:源语言(如 `zh` / `en` / `auto`),是否包含在 key 中由 `key_include_source_lang` 控制; +- `tgt`:目标语言,如 `en` / `zh`; +- `sha256(payload)`:对以下内容整体做 SHA-256: + - `model` + - `src` / `tgt` + - `context`(受 `key_include_context` 控制) + - `prompt`(受 `key_include_prompt` 控制) + - 原始 `text` + +> 注意:所有 key 设计集中在 `_build_cache_key`,**不要在其他位置手动拼翻译缓存 key**。 + +### 3.2 Value 与类型 + +- 类型:**UTF-8 字符串**,即翻译后的文本结果。 +- 存取逻辑: + - 读取:`redis.get(key)` 返回 `str` 或 `None`; + - 写入:`redis.setex(key, expire_seconds, translation)`。 + +### 3.3 过期策略 + +- 配置来源:`config/config.yaml -> services.translation.cache`,经 `get_translation_cache_config()` 解析: + +```yaml +services: + translation: + cache: + enabled: true + key_prefix: "trans:v2" + ttl_seconds: 62208000 # 默认约 720 天 + sliding_expiration: true + key_include_context: true + key_include_prompt: true + key_include_source_lang: true +``` + +- 运行时行为: + - 创建 `Translator` 时,从 `cache_cfg` 读取: + - `self.cache_prefix` + - `self.expire_seconds` + - `self.cache_sliding_expiration` + - `self.cache_include_*` 一系列布尔开关; + - **读缓存**: + - 命中后,若 `sliding_expiration=True`,会调用 `redis.expire(key, expire_seconds)`; + - **写缓存**: + - 使用 `redis.setex(key, expire_seconds, translation)`。 + +### 3.4 关联模块 + +- `api/translator_app.py` 会通过 `query.qwen_mt_translate.Translator` 复用同一套缓存逻辑; +- 文档说明:`docs/翻译模块说明.md` 中提到“推荐通过 Redis 翻译缓存复用结果”。 + +--- + +## 4. 商品内容理解缓存(indexer/product_enrich.py) + +- **代码位置**:`indexer/product_enrich.py` +- **用途**:在生成商品锚文本(qanchors)、语义属性、标签等内容理解结果时复用缓存,避免对同一标题重复调用大模型。 + +### 4.1 Key 设计 + +- 配置项: + - `ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors")` + - `ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30))` +- Key 构造函数:`_make_anchor_cache_key(title, target_lang, tenant_id)` +- 模板: + +```text +{ANCHOR_CACHE_PREFIX}:{tenant_or_global}:{target_lang}:{md5(title)} +``` + +- 字段说明: + - `ANCHOR_CACHE_PREFIX`:默认 `"product_anchors"`,可通过 `.env` 中的 `REDIS_ANCHOR_CACHE_PREFIX`(若存在)间接配置到 `REDIS_CONFIG`; + - `tenant_or_global`:`tenant_id` 去空白后的字符串,若为空则使用 `"global"`; + - `target_lang`:内容理解输出语言,例如 `zh`; + - `md5(title)`:对原始商品标题(UTF-8)做 MD5。 + +### 4.2 Value 与类型 + +- 类型:`json.dumps(dict, ensure_ascii=False)`。 +- 典型结构(简化): + +```json +{ + "id": "123", + "lang": "zh", + "title_input": "原始标题", + "title": "归一化后的商品标题", + "category_path": "...", + "tags": "...", + "target_audience": "...", + "usage_scene": "...", + "anchor_text": "..., ..." +} +``` + +- 读取时通过 `json.loads(raw)` 还原为 `Dict[str, Any]`。 + +### 4.3 过期策略 + +- TTL:`ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600` 秒(默认 30 天); +- 写入:`redis.setex(key, ttl, json.dumps(result, ensure_ascii=False))`; +- 读取:仅做 `redis.get(key)`,**不做滑动过期**。 + +### 4.4 调用流程中的位置 + +- 单条调用(索引阶段常见)时,`analyze_products()` 会先尝试命中缓存: + - 若命中,直接返回缓存结果; + - 若 miss,调用 LLM,解析结果后再写入缓存。 + +--- + +## 5. Redis 运维脚本工具 + +`scripts/redis/` 下提供三个脚本,用于查看缓存数量、内存占用与健康状态。连接配置均来自 `config/env_config.py` 的 `REDIS_CONFIG`,运行前需在项目根目录执行(或保证 `PYTHONPATH` 含项目根),以便加载配置。 + +### 5.1 redis_cache_health_check.py(缓存健康巡检) + +**功能**:按**业务缓存类型**(embedding / translation / anchors)做健康巡检,不扫全库。 + +- 对每类缓存:SCAN 匹配对应 key 前缀,统计**匹配 key 数量**(受 `--max-scan` 上限约束); +- **TTL 分布**:对采样 key 统计 `no-expire-or-expired` / `0-1h` / `1h-1d` / `1d-30d` / `>30d`; +- **近期活跃 key**:从采样中选出 `OBJECT IDLETIME <= 600s` 的 key,用于判断是否有新写入; +- **样本 key 与 value 预览**:对 embedding 显示 ndarray 信息,对 translation 显示译文片段,对 anchors 显示 JSON 摘要。 + +**适用场景**:日常查看三类缓存是否在增长、TTL 是否合理、是否有近期写入;与「缓存总览表」中的 key 设计一一对应。 + +**用法示例**: + +```bash +# 默认:检查 embedding / translation / anchors 三类 +python scripts/redis/redis_cache_health_check.py + +# 只检查某一类或两类 +python scripts/redis/redis_cache_health_check.py --type embedding +python scripts/redis/redis_cache_health_check.py --type translation anchors + +# 按自定义 pattern 检查(不按业务类型) +python scripts/redis/redis_cache_health_check.py --pattern "mycache:*" + +# 调整采样与扫描规模 +python scripts/redis/redis_cache_health_check.py --sample-size 100 --max-scan 50000 --db 0 +``` + +**常用参数**: + +| 参数 | 说明 | 默认 | +|------|------|------| +| `--type` | 缓存类型:`embedding` / `translation` / `anchors`,可多选 | 三类都检查 | +| `--pattern` | 自定义 key pattern(如 `mycache:*`),指定后忽略 `--type` | - | +| `--db` | Redis 数据库编号 | 0 | +| `--sample-size` | 每类采样的 key 数量 | 50 | +| `--max-scan` | 每类最多 SCAN 的 key 数量上限 | 20000 | + +--- + +### 5.2 redis_cache_prefix_stats.py(按前缀统计条数与内存) + +**功能**:**全局视角**,扫描当前 DB 下所有 key,按 key 的**前缀**(第一个冒号前)分类,统计每类 key 的**条数**与**内存占用量**(含占比),并输出每类示例 key 与 Redis 总内存信息。 + +- 使用 `SCAN` 扫全库,按前缀聚合; +- 内存优先用 Redis `MEMORY USAGE`,不可用时用 key+value 长度估算;key 过多时按 `--sample-size` 采样后按均值推算总内存; +- 输出:前缀、条数、内存及计算方式、占比、简要说明(如「翻译缓存」「向量化缓存」);末尾附 Redis 总内存与 maxmemory(若配置)。 + +**适用场景**:了解「哪一类前缀占了多少条、多少内存」,做容量规划或清理决策;支持多 DB(`--all-db`)或指定前缀(`--prefix`)缩小范围。 + +**用法示例**: + +```bash +# 默认 DB 0,全库按前缀统计 +python scripts/redis/redis_cache_prefix_stats.py + +# 统计所有有数据的 DB +python scripts/redis/redis_cache_prefix_stats.py --all-db + +# 指定 DB +python scripts/redis/redis_cache_prefix_stats.py --db 1 + +# 只统计指定前缀(可多个) +python scripts/redis/redis_cache_prefix_stats.py --prefix trans embedding product_anchors + +# 全 DB + 指定前缀 +python scripts/redis/redis_cache_prefix_stats.py --all-db --prefix trans embedding +``` + +**常用参数**: + +| 参数 | 说明 | 默认 | +|------|------|------| +| `--prefix` | 只统计这些前缀下的 key(如 `trans` `embedding`) | 全库按前缀统计 | +| `--db` | 数据库编号(0–15) | 0 | +| `--all-db` | 对所有有数据的 DB 分别执行 | 否 | +| `--sample-size` | 单前缀 key 过多时,用于内存采样的数量 | 100 | +| `--real` | 对单前缀 key 数 ≤10000 时计算全部 key 真实内存(较慢) | 否 | + +--- + +### 5.3 redis_memory_heavy_keys.py(大 key / 内存占用排查) + +**功能**:找出当前 DB 中**占用内存最多的 key**,并分析「按 key 估算的总内存」与「Redis 实际使用内存」的差异原因。 + +- 全库 `SCAN` 获取 key 列表,对 key 采样(默认最多 1000)调用 `MEMORY USAGE`(或估算)得到单 key 内存; +- 按内存排序,输出**占用最高的 N 个 key**(`--top`,默认 50); +- 输出:前缀分布、采样统计、估算总内存 vs 实际内存、差异说明(碎片、内部结构等);并检测超大 value(>1MB)、key 类型分布。 + +**适用场景**:内存异常升高时定位大 key;理解为何「按 key 加总」与 `used_memory` 不一致。 + +**用法示例**: + +```bash +# 显示占用内存最多的 50 个 key(默认) +python scripts/redis/redis_memory_heavy_keys.py + +# 显示前 100 个 +python scripts/redis/redis_memory_heavy_keys.py --top 100 +``` + +**常用参数**: + +| 参数 | 说明 | 默认 | +|------|------|------| +| `--top` | 显示内存占用最高的 N 个 key | 50 | + +--- + +### 5.4 三个脚本的选用建议 + +| 需求 | 推荐脚本 | +|------|----------| +| 看三类业务缓存(embedding/translation/anchors)的数量、TTL、近期写入、样本 value | `redis_cache_health_check.py` | +| 看全库或某前缀的 key 条数与内存占比 | `redis_cache_prefix_stats.py` | +| 找占用内存最多的大 key、分析内存差异 | `redis_memory_heavy_keys.py` | + +--- + +## 6. 其他 Redis 相关代码(测试与替身) + +除上述运维脚本外,以下代码使用 Redis 或其替身,但不定义新的业务缓存 key 协议: + +- **单元测试**:`tests/test_embedding_pipeline.py` 中的 `_FakeRedis` 用于 mock embedding 缓存;`tests/test_translator_failure_semantics.py` 中的 `_RecordingRedis` 用于验证翻译失败时不写缓存。 + +--- + +## 7. 添加新缓存时的建议 + +新增 Redis 缓存时,建议遵循以下约定: + +- **配置集中**: + - key 前缀、TTL 优先放在 `config/env_config.py`(通用)或 `config/config.yaml -> services..cache`; + - 避免在业务代码中硬编码 TTL。 +- **命名规范**: + - 统一使用 `prefix:维度1:维度2:...` 的扁平 key 结构; + - 对长文本/value 使用 `md5`/`sha256` 做哈希,避免过长 key。 +- **文档同步**: + - 新增缓存后,应在本文件中补充一行总览表 + 详细小节; + - 若缓存与外部系统/历史实现兼容(如 Java 侧翻译缓存),需在说明中显式标注。 + diff --git a/scripts/redis/check_cache_stats.py b/scripts/redis/check_cache_stats.py deleted file mode 100755 index 26175bc..0000000 --- a/scripts/redis/check_cache_stats.py +++ /dev/null @@ -1,549 +0,0 @@ -#!/usr/bin/env python3 -""" -统计各种缓存的条目数和内存占用量 - -按 key 前缀分类统计,帮助了解不同缓存的使用情况 - -使用方法: - -直接使用(默认数据库 0): -python scripts/redis/check_cache_stats.py - -统计所有数据库: -python scripts/redis/check_cache_stats.py --all-db - -统计指定数据库: -python scripts/redis/check_cache_stats.py --db 1 - -只统计以下三种前缀: -python scripts/redis/check_cache_stats.py --prefix trans embedding product - -统计所有数据库的指定前缀: -python scripts/redis/check_cache_stats.py --all-db --prefix trans embedding - - - - -其他简单的统计方法(不依赖本脚本,直接使用redis-cli命令,密码从 .env 的 REDIS_PASSWORD 读取): - -# 查看所有 key 的前缀分布(快速但不准确) -redis-cli -h $REDIS_HOST -p $REDIS_PORT -a '$REDIS_PASSWORD' --no-auth-warning --scan --pattern "*" | cut -d: -f1 | sort | uniq -c | sort -rn - -# 统计特定前缀的数量 -redis-cli -h $REDIS_HOST -p $REDIS_PORT -a '$REDIS_PASSWORD' --no-auth-warning --scan --pattern "trans:*" | wc -l -redis-cli -h $REDIS_HOST -p $REDIS_PORT -a '$REDIS_PASSWORD' --no-auth-warning --scan --pattern "embedding:*" | wc -l - -# 查看内存统计 ( Redis MEMORY STATS ) -redis-cli -h $REDIS_HOST -p $REDIS_PORT -a '$REDIS_PASSWORD' --no-auth-warning MEMORY STATS - -""" - -import redis -import os -import sys -from collections import defaultdict -from pathlib import Path -from datetime import datetime - -# 添加项目路径(文件在 scripts/redis/ 目录下,需要向上三级到项目根目录) -project_root = Path(__file__).parent.parent.parent -sys.path.insert(0, str(project_root)) - -from config.env_config import REDIS_CONFIG - -def get_redis_client(db=0): - """获取 Redis 客户端""" - return redis.Redis( - host=REDIS_CONFIG.get('host', 'localhost'), - port=REDIS_CONFIG.get('port', 6479), - password=REDIS_CONFIG.get('password'), - db=db, - decode_responses=True, - socket_timeout=10, - socket_connect_timeout=10, - ) - -def get_key_prefix(key): - """提取 key 的前缀(第一个冒号之前的部分)""" - if ':' in key: - return key.split(':', 1)[0] - return key - -def format_bytes(bytes_size): - """格式化字节数为可读格式""" - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: - if bytes_size < 1024.0: - return f"{bytes_size:.2f} {unit}" - bytes_size /= 1024.0 - return f"{bytes_size:.2f} PB" - -def get_key_memory_usage(client, key, use_real_memory=True): - """ - 获取单个 key 的内存占用量(字节) - - Args: - client: Redis 客户端 - key: key 名称 - use_real_memory: 是否使用真实的 MEMORY USAGE 命令(True=真实,False=估算) - - Returns: - 内存占用量(字节) - """ - try: - if use_real_memory: - # 使用 MEMORY USAGE 命令(Redis 4.0+)- 这是真实的内存占用 - try: - memory = client.execute_command('MEMORY', 'USAGE', key) - return memory if memory else 0 - except: - # 如果 MEMORY USAGE 不可用,降级到估算方法 - pass - - # 估算方法(不够准确,但速度快) - # 获取 key 和 value 的大小 - key_size = len(key.encode('utf-8')) - - # 获取 value - value = client.get(key) - if value: - value_size = len(value.encode('utf-8')) - else: - # 尝试获取其他类型 - ttl = client.ttl(key) - if ttl == -2: # key 不存在 - return 0 - # 估算:key + 基础开销 - value_size = 0 - - # Redis 内存开销估算(粗略) - # key 对象开销: ~48 bytes - # value 对象开销: ~24 bytes - # 其他开销: ~100 bytes - # 注意:这个估算不准确,特别是对于复杂数据结构(hash、set、zset等) - overhead = 48 + 24 + 100 - return key_size + value_size + overhead - except Exception as e: - return 0 - -def scan_all_keys(client, pattern="*"): - """扫描所有匹配的 key""" - keys = [] - cursor = 0 - while True: - cursor, batch = client.scan(cursor, match=pattern, count=1000) - keys.extend(batch) - if cursor == 0: - break - return keys - -def analyze_cache_by_prefix(client, args=None, db_num=0): - """按前缀分析缓存""" - if args is None: - class Args: - real = False - sample_size = 100 - args = Args() - - # 显示当前数据库 - if db_num > 0: - print(f"\n{'='*60}") - print(f"数据库 {db_num}") - print(f"{'='*60}\n") - print("=" * 60) - print("扫描 Redis 中的所有 key...") - print("=" * 60) - - try: - # 扫描所有 key - all_keys = scan_all_keys(client) - total_keys = len(all_keys) - - print(f"总 key 数量: {total_keys:,}") - print(f"开始分析...\n") - - # 按前缀分类 - prefix_stats = defaultdict(lambda: { - 'count': 0, - 'memory': 0, - 'keys': [], - 'sample_keys': [] # 采样一些 key 用于显示 - }) - - # 统计每个前缀 - processed = 0 - for key in all_keys: - prefix = get_key_prefix(key) - prefix_stats[prefix]['count'] += 1 - prefix_stats[prefix]['keys'].append(key) - - # 采样前 5 个 key - if len(prefix_stats[prefix]['sample_keys']) < 5: - prefix_stats[prefix]['sample_keys'].append(key) - - processed += 1 - if processed % 1000 == 0: - print(f" 已处理: {processed:,} / {total_keys:,} ({processed*100//total_keys}%)") - - print(f" 完成: {processed:,} / {total_keys:,}\n") - - # 计算每个前缀的内存占用量 - print("=" * 60) - print("计算内存占用量...") - print("=" * 60) - print("注意:") - print(" - 如果 key 数量 > 100,会采样前 100 个进行估算") - print(" - 优先使用 Redis MEMORY USAGE 命令(真实值)") - print(" - 如果 MEMORY USAGE 不可用,会使用估算方法(不准确)") - print(" - 估算方法只计算 key+value 大小,不包括 Redis 内部数据结构开销") - print() - - # 测试是否支持 MEMORY USAGE - test_key = all_keys[0] if all_keys else None - supports_memory_usage = False - if test_key: - try: - client.execute_command('MEMORY', 'USAGE', test_key) - supports_memory_usage = True - print("✅ Redis 支持 MEMORY USAGE 命令,将使用真实内存值") - except: - print("⚠️ Redis 不支持 MEMORY USAGE 命令,将使用估算方法(可能不准确)") - print() - - prefix_memory = {} - for prefix, stats in prefix_stats.items(): - print(f" 计算 {prefix}:* 的内存...") - total_memory = 0 - # 如果指定了 --real,且数量不太大,计算全部 - if args.real and stats['count'] <= 10000: - sample_count = stats['count'] - else: - sample_count = min(args.sample_size, stats['count']) # 采样 - - # 如果数量较少,全部计算;否则采样计算 - if stats['count'] <= 100: - keys_to_check = stats['keys'] - is_sampled = False - else: - # 采样计算 - import random - keys_to_check = random.sample(stats['keys'], sample_count) - is_sampled = True - - for key in keys_to_check: - memory = get_key_memory_usage(client, key, use_real_memory=supports_memory_usage) - total_memory += memory - - # 如果是采样,估算总内存 - if is_sampled: - avg_memory = total_memory / sample_count - estimated_total = avg_memory * stats['count'] - prefix_memory[prefix] = { - 'memory': estimated_total, - 'is_estimated': True, - 'is_sampled': True, - 'sample_count': sample_count, - 'uses_real_memory': supports_memory_usage - } - else: - prefix_memory[prefix] = { - 'memory': total_memory, - 'is_estimated': False, - 'is_sampled': False, - 'sample_count': stats['count'], - 'uses_real_memory': supports_memory_usage - } - - # 显示统计结果 - print("\n" + "=" * 60) - print("缓存统计结果(按前缀分类)") - print("=" * 60) - - # 按内存使用量排序 - sorted_prefixes = sorted( - prefix_stats.items(), - key=lambda x: prefix_memory[x[0]]['memory'], - reverse=True - ) - - total_memory_all = sum(pm['memory'] for pm in prefix_memory.values()) - - print(f"{'前缀':<20} {'条目数':>12} {'内存占用量和计算方式':>50} {'占比':>10} {'说明'}") - print("-" * 120) - - for prefix, stats in sorted_prefixes: - memory_info = prefix_memory[prefix] - memory = memory_info['memory'] - - # 计算平均每条 key 的大小 - avg_memory_per_key = memory / stats['count'] if stats['count'] > 0 else 0 - avg_memory_str = format_bytes(avg_memory_per_key) - - # 标注内存计算方式和结果 - if memory_info['is_sampled']: - if memory_info['uses_real_memory']: - calc_method = f"采样估算(采样{memory_info['sample_count']}个, 使用真实MEMORY USAGE)" - else: - calc_method = f"采样估算(采样{memory_info['sample_count']}个, 估算方法)" - else: - if memory_info['uses_real_memory']: - calc_method = "真实值(全部计算, 使用MEMORY USAGE)" - else: - calc_method = "估算值(全部计算, 估算方法)" - - memory_str = f"{format_bytes(memory)} | 每条: {avg_memory_str} | {calc_method}" - - percentage = (memory / total_memory_all * 100) if total_memory_all > 0 else 0 - - # 添加说明 - description = "" - if prefix == 'trans': - description = "翻译缓存" - elif prefix.startswith('embedding') or prefix.startswith('emb'): - description = "向量化缓存" - elif prefix.startswith('session') or prefix.startswith('user'): - description = "会话/用户缓存" - elif prefix.startswith('product') or prefix.startswith('item'): - description = "商品缓存" - else: - description = "其他" - - # 格式化输出,内存信息可能很长,需要适当处理 - memory_display = memory_str[:70] + "..." if len(memory_str) > 70 else memory_str - print(f"{prefix:<20} {stats['count']:>12,} {memory_display:<70} {percentage:>9.1f}% {description}") - - print("-" * 120) - avg_total = total_memory_all / total_keys if total_keys > 0 else 0 - total_display = f"{format_bytes(total_memory_all)} | 每条: {format_bytes(avg_total)}" - print(f"{'总计':<20} {total_keys:>12,} {total_display:<70} {'100.0':>9}%") - - # 显示详细信息 - print("\n" + "=" * 60) - print("详细信息(每个前缀的示例 key)") - print("=" * 60) - - for prefix, stats in sorted_prefixes[:10]: # 只显示前 10 个 - mem_info = prefix_memory[prefix] - avg_per_key = mem_info['memory'] / stats['count'] if stats['count'] > 0 else 0 - - print(f"\n{prefix}:* ({stats['count']:,} 个 key)") - print(f" 总内存: {format_bytes(mem_info['memory'])}") - print(f" 每条 key 平均: {format_bytes(avg_per_key)}") - - # 显示计算方式 - if mem_info['is_sampled']: - if mem_info['uses_real_memory']: - print(f" 计算方式: 采样估算(采样 {mem_info['sample_count']} 个,使用真实 MEMORY USAGE)") - else: - print(f" 计算方式: 采样估算(采样 {mem_info['sample_count']} 个,使用估算方法)") - else: - if mem_info['uses_real_memory']: - print(f" 计算方式: 真实值(全部计算,使用 MEMORY USAGE)") - else: - print(f" 计算方式: 估算值(全部计算,使用估算方法)") - - print(f" 示例 key:") - for sample_key in stats['sample_keys'][:3]: - ttl = client.ttl(sample_key) - if ttl == -1: - ttl_str = "无过期时间" - elif ttl == -2: - ttl_str = "已过期" - else: - ttl_str = f"{ttl/86400:.1f} 天" - key_display = sample_key[:60] + "..." if len(sample_key) > 60 else sample_key - print(f" - {key_display} (TTL: {ttl_str})") - - # 获取 Redis 总内存信息 - print("\n" + "=" * 60) - print("Redis 内存使用情况") - print("=" * 60) - - try: - info = client.info('memory') - used_memory = info.get('used_memory', 0) - used_memory_human = info.get('used_memory_human', '0B') - maxmemory = info.get('maxmemory', 0) - maxmemory_human = info.get('maxmemory_human', '0B') - - print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)") - print(f"统计的缓存内存: {format_bytes(total_memory_all)}") - print(f"内存占比: {(total_memory_all / used_memory * 100) if used_memory > 0 else 0:.1f}%") - - if maxmemory > 0: - print(f"最大内存限制: {maxmemory_human} ({maxmemory:,} bytes)") - usage_percent = (used_memory / maxmemory) * 100 - print(f"内存使用率: {usage_percent:.2f}%") - except Exception as e: - print(f"获取内存信息失败: {e}") - - except Exception as e: - print(f"❌ 分析失败: {e}") - import traceback - traceback.print_exc() - -def analyze_specific_prefixes(client, prefixes, db_num=0): - """分析指定的前缀""" - print("=" * 60) - if db_num > 0: - print(f"数据库 {db_num} - 分析指定前缀: {', '.join(prefixes)}") - else: - print(f"分析指定前缀: {', '.join(prefixes)}") - print("=" * 60) - - for prefix in prefixes: - pattern = f"{prefix}:*" - keys = scan_all_keys(client, pattern=pattern) - - if not keys: - print(f"\n{prefix}:* - 未找到 key") - continue - - print(f"\n{prefix}:*") - print(f" 条目数: {len(keys):,}") - - # 计算内存 - total_memory = 0 - sample_count = min(100, len(keys)) - import random - sample_keys = random.sample(keys, sample_count) if len(keys) > sample_count else keys - - for key in sample_keys: - memory = get_key_memory_usage(client, key) - total_memory += memory - - if len(keys) > sample_count: - avg_memory = total_memory / sample_count - estimated_total = avg_memory * len(keys) - print(f" 内存占用量: {format_bytes(estimated_total)} (估算, 采样 {sample_count})") - else: - print(f" 内存占用量: {format_bytes(total_memory)}") - -def get_all_databases(): - """获取所有有数据的数据库列表""" - databases = [] - # Redis 默认有 16 个数据库(0-15) - for db_num in range(16): - try: - client = get_redis_client(db=db_num) - client.ping() - # 检查是否有 key - key_count = client.dbsize() - if key_count > 0: - databases.append(db_num) - except: - pass - return databases - -def analyze_all_databases(args): - """分析所有数据库""" - print("=" * 60) - print("扫描所有数据库...") - print("=" * 60) - - databases = get_all_databases() - - if not databases: - print("未找到有数据的数据库") - return - - print(f"发现 {len(databases)} 个有数据的数据库: {databases}\n") - - # 汇总统计 - total_stats_by_prefix = defaultdict(lambda: {'count': 0, 'memory': 0, 'dbs': []}) - total_keys_all_db = 0 - total_memory_all_db = 0 - - for db_num in databases: - try: - client = get_redis_client(db=db_num) - client.ping() - db_size = client.dbsize() - - print(f"\n{'='*60}") - print(f"数据库 {db_num} (共 {db_size:,} 个 key)") - print(f"{'='*60}") - - if args.prefix: - analyze_specific_prefixes(client, args.prefix, db_num=db_num) - else: - # 分析当前数据库 - analyze_cache_by_prefix(client, args, db_num=db_num) - - # 收集统计信息(简化版,只统计 key 数量) - total_keys_all_db += db_size - - except Exception as e: - print(f"❌ 数据库 {db_num} 分析失败: {e}") - import traceback - traceback.print_exc() - continue - - # 显示汇总统计 - if not args.prefix: - print("\n" + "=" * 60) - print("所有数据库汇总") - print("=" * 60) - print(f"有数据的数据库: {len(databases)} 个 ({', '.join(map(str, databases))})") - print(f"总 key 数量: {total_keys_all_db:,}") - print(f"\n提示: 要查看详细的内存统计,请分别运行每个数据库:") - for db_num in databases: - print(f" python scripts/redis/check_cache_stats.py --db {db_num}") - -def main(): - """主函数""" - import argparse - - parser = argparse.ArgumentParser(description='统计 Redis 缓存的条目数和内存占用量') - parser.add_argument('--prefix', nargs='+', help='指定要分析的前缀(如: trans embedding)') - parser.add_argument('--all', action='store_true', help='分析所有前缀(默认)') - parser.add_argument('--real', action='store_true', help='计算所有 key 的真实内存(很慢,但准确)') - parser.add_argument('--sample-size', type=int, default=100, help='采样大小(默认 100,仅当 key 数量 > 采样大小时使用)') - parser.add_argument('--db', type=int, help='指定数据库编号(0-15),默认只统计 db 0') - parser.add_argument('--all-db', action='store_true', help='统计所有数据库(0-15)') - - args = parser.parse_args() - - print("Redis 缓存统计工具") - print("=" * 60) - print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print() - - # 如果指定了 --all-db,分析所有数据库 - if args.all_db: - analyze_all_databases(args) - print("\n" + "=" * 60) - print("统计完成") - print("=" * 60) - return - - # 否则分析指定或默认的数据库 - db_num = args.db if args.db is not None else 0 - - try: - client = get_redis_client(db=db_num) - client.ping() - if db_num > 0: - print(f"✅ Redis 连接成功(数据库 {db_num})\n") - else: - print("✅ Redis 连接成功(默认数据库 0)\n") - except Exception as e: - print(f"❌ Redis 连接失败: {e}") - print(f"\n请检查:") - print(f" - Host: {REDIS_CONFIG.get('host', 'localhost')}") - print(f" - Port: {REDIS_CONFIG.get('port', 6479)}") - print(f" - Password: {'已配置' if REDIS_CONFIG.get('password') else '未配置'}") - print(f" - Database: {db_num}") - return - - if args.prefix: - analyze_specific_prefixes(client, args.prefix, db_num=db_num) - else: - # 传递参数到分析函数 - analyze_cache_by_prefix(client, args, db_num=db_num) - - print("\n" + "=" * 60) - print("统计完成") - print("=" * 60) - -if __name__ == "__main__": - main() diff --git a/scripts/redis/find_memory_usage.py b/scripts/redis/find_memory_usage.py deleted file mode 100755 index c61a1e5..0000000 --- a/scripts/redis/find_memory_usage.py +++ /dev/null @@ -1,235 +0,0 @@ -#!/usr/bin/env python3 -""" -查找 Redis 中占用内存的主要 key - -分析为什么统计的缓存内存和总内存差异很大 -""" - -import redis -import os -import sys -from collections import defaultdict -from pathlib import Path -from datetime import datetime - -# 添加项目路径(文件在 scripts/redis/ 目录下,需要向上三级到项目根目录) -project_root = Path(__file__).parent.parent.parent -sys.path.insert(0, str(project_root)) - -from config.env_config import REDIS_CONFIG - -def get_redis_client(): - """获取 Redis 客户端""" - return redis.Redis( - host=REDIS_CONFIG.get('host', 'localhost'), - port=REDIS_CONFIG.get('port', 6479), - password=REDIS_CONFIG.get('password'), - decode_responses=True, - socket_timeout=10, - socket_connect_timeout=10, - ) - -def format_bytes(bytes_size): - """格式化字节数为可读格式""" - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: - if bytes_size < 1024.0: - return f"{bytes_size:.2f} {unit}" - bytes_size /= 1024.0 - return f"{bytes_size:.2f} PB" - -def get_key_memory_usage(client, key): - """获取单个 key 的内存占用量(字节)""" - try: - # 使用 MEMORY USAGE 命令(Redis 4.0+) - try: - memory = client.execute_command('MEMORY', 'USAGE', key) - return memory if memory else 0 - except: - # 如果 MEMORY USAGE 不可用,使用估算方法 - key_size = len(key.encode('utf-8')) - value = client.get(key) - if value: - value_size = len(value.encode('utf-8')) - else: - value_size = 0 - overhead = 48 + 24 + 100 - return key_size + value_size + overhead - except Exception as e: - return 0 - -def analyze_all_keys(client, top_n=50): - """分析所有 key 的内存占用,找出占用最多的""" - print("=" * 60) - print("分析所有 key 的内存占用") - print("=" * 60) - - try: - # 获取总内存信息 - info = client.info('memory') - used_memory = info.get('used_memory', 0) - used_memory_human = info.get('used_memory_human', '0B') - - print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)\n") - - # 扫描所有 key - print("扫描所有 key...") - all_keys = [] - cursor = 0 - while True: - cursor, batch = client.scan(cursor, count=1000) - all_keys.extend(batch) - if cursor == 0: - break - - total_keys = len(all_keys) - print(f"总 key 数量: {total_keys:,}\n") - - # 分析 key 的命名模式 - print("分析 key 命名模式...") - no_prefix_count = 0 - prefix_patterns = defaultdict(int) - - for key in all_keys: - if ':' in key: - prefix = key.split(':', 1)[0] - prefix_patterns[prefix] += 1 - else: - no_prefix_count += 1 - - print(f" 无前缀的 key: {no_prefix_count:,}") - print(f" 有前缀的 key: {total_keys - no_prefix_count:,}") - print(f" 不同前缀数量: {len(prefix_patterns):,}\n") - - # 显示所有前缀 - print("所有前缀列表:") - sorted_prefixes = sorted(prefix_patterns.items(), key=lambda x: x[1], reverse=True) - for prefix, count in sorted_prefixes[:20]: - print(f" {prefix}:* - {count:,} 个 key") - if len(sorted_prefixes) > 20: - print(f" ... 还有 {len(sorted_prefixes) - 20} 个前缀") - - # 采样分析内存占用 - print(f"\n采样分析内存占用(采样前 {min(1000, total_keys)} 个 key)...") - - key_memories = [] - sample_size = min(1000, total_keys) - import random - sample_keys = random.sample(all_keys, sample_size) if total_keys > sample_size else all_keys - - processed = 0 - for key in sample_keys: - memory = get_key_memory_usage(client, key) - if memory > 0: - key_memories.append((key, memory)) - processed += 1 - if processed % 100 == 0: - print(f" 已处理: {processed}/{sample_size}") - - # 按内存排序 - key_memories.sort(key=lambda x: x[1], reverse=True) - - # 计算采样统计 - total_sample_memory = sum(mem for _, mem in key_memories) - avg_memory = total_sample_memory / len(key_memories) if key_memories else 0 - estimated_total_memory = avg_memory * total_keys - - print(f"\n采样统计:") - print(f" 采样 key 数量: {len(key_memories):,}") - print(f" 采样总内存: {format_bytes(total_sample_memory)}") - print(f" 平均每个 key 内存: {format_bytes(avg_memory)}") - print(f" 估算所有 key 总内存: {format_bytes(estimated_total_memory)}") - print(f" 实际 Redis 使用内存: {format_bytes(used_memory)}") - print(f" 差异: {format_bytes(used_memory - estimated_total_memory)}") - - # 显示占用内存最多的 key - print(f"\n占用内存最多的 {top_n} 个 key:") - print(f"{'排名':<6} {'内存':<15} {'Key'}") - print("-" * 80) - - for i, (key, memory) in enumerate(key_memories[:top_n], 1): - key_display = key[:60] + "..." if len(key) > 60 else key - print(f"{i:<6} {format_bytes(memory):<15} {key_display}") - - # 分析内存差异的原因 - print("\n" + "=" * 60) - print("内存差异分析") - print("=" * 60) - - difference = used_memory - estimated_total_memory - difference_percent = (difference / used_memory * 100) if used_memory > 0 else 0 - - print(f"实际内存: {format_bytes(used_memory)}") - print(f"估算 key 内存: {format_bytes(estimated_total_memory)}") - print(f"差异: {format_bytes(difference)} ({difference_percent:.1f}%)") - - print("\n可能的原因:") - print("1. Redis 内部数据结构开销(hash table、skiplist 等)") - print("2. 内存碎片") - print("3. Redis 进程本身的内存占用") - print("4. 其他数据结构(如 list、set、zset、hash)的内存开销更大") - print("5. 采样估算的误差") - - # 检查是否有大 value - print(f"\n检查是否有超大 value(> 1MB)...") - large_values = [] - for key, memory in key_memories[:100]: # 检查前 100 个最大的 - if memory > 1024 * 1024: # > 1MB - large_values.append((key, memory)) - - if large_values: - print(f"发现 {len(large_values)} 个超大 value (> 1MB):") - for key, memory in large_values[:10]: - key_display = key[:60] + "..." if len(key) > 60 else key - print(f" {format_bytes(memory):<15} {key_display}") - else: - print(" 未发现超大 value") - - # 检查 key 类型分布 - print(f"\n检查 key 类型分布(采样前 1000 个)...") - type_distribution = defaultdict(int) - for key in sample_keys[:1000]: - try: - key_type = client.type(key) - type_distribution[key_type] += 1 - except: - pass - - print("Key 类型分布:") - for key_type, count in sorted(type_distribution.items(), key=lambda x: x[1], reverse=True): - print(f" {key_type}: {count}") - - except Exception as e: - print(f"❌ 分析失败: {e}") - import traceback - traceback.print_exc() - -def main(): - """主函数""" - import argparse - - parser = argparse.ArgumentParser(description='查找 Redis 中占用内存的主要 key') - parser.add_argument('--top', type=int, default=50, help='显示占用内存最多的 N 个 key(默认 50)') - - args = parser.parse_args() - - print("Redis 内存占用分析工具") - print("=" * 60) - print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print() - - try: - client = get_redis_client() - client.ping() - print("✅ Redis 连接成功\n") - except Exception as e: - print(f"❌ Redis 连接失败: {e}") - return - - analyze_all_keys(client, top_n=args.top) - - print("\n" + "=" * 60) - print("分析完成") - print("=" * 60) - -if __name__ == "__main__": - main() diff --git a/scripts/redis/redis_cache_health_check.py b/scripts/redis/redis_cache_health_check.py new file mode 100644 index 0000000..6c8e293 --- /dev/null +++ b/scripts/redis/redis_cache_health_check.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python3 +""" +缓存状态巡检脚本 + +按「缓存类型」维度(embedding / translation / anchors)查看: +- 估算 key 数量 +- TTL 分布(采样) +- 近期活跃 key(按 IDLETIME 近似) +- 若干样本 key 及 value 概览 + +使用示例: + + # 默认:检查已知三类缓存,使用 env_config 中的 Redis 配置 + python scripts/redis/redis_cache_health_check.py + + # 只看某一类缓存 + python scripts/redis/redis_cache_health_check.py --type embedding + python scripts/redis/redis_cache_health_check.py --type translation anchors + + # 自定义前缀(pattern),不限定缓存类型 + python scripts/redis/redis_cache_health_check.py --pattern "mycache:*" + + # 调整采样规模 + python scripts/redis/redis_cache_health_check.py --sample-size 100 --max-scan 50000 +""" + +from __future__ import annotations + +import argparse +import json +import pickle +import sys +from collections import defaultdict +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Dict, Iterable, List, Optional, Tuple + +import redis +import numpy as np + +# 让脚本可以直接使用 config/env_config 与 services_config +PROJECT_ROOT = Path(__file__).parent.parent.parent +sys.path.insert(0, str(PROJECT_ROOT)) + +from config.env_config import REDIS_CONFIG # type: ignore +from config.services_config import get_translation_cache_config # type: ignore + + +@dataclass +class CacheTypeConfig: + name: str + pattern: str + description: str + + +def _load_known_cache_types() -> Dict[str, CacheTypeConfig]: + """根据当前配置装配三种已知缓存类型及其前缀 pattern。""" + cache_types: Dict[str, CacheTypeConfig] = {} + + # embedding 缓存:固定 embedding:* 前缀 + cache_types["embedding"] = CacheTypeConfig( + name="embedding", + pattern="embedding:*", + description="文本向量缓存(embeddings/text_encoder.py)", + ) + + # translation 缓存:prefix 来自 services.translation.cache.key_prefix + cache_cfg = get_translation_cache_config() + trans_prefix = cache_cfg.get("key_prefix", "trans:v2") + cache_types["translation"] = CacheTypeConfig( + name="translation", + pattern=f"{trans_prefix}:*", + description="翻译结果缓存(query/qwen_mt_translate.Translator)", + ) + + # anchors 缓存:prefix 来自 REDIS_CONFIG['anchor_cache_prefix'](若存在),否则 product_anchors + anchor_prefix = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors") + cache_types["anchors"] = CacheTypeConfig( + name="anchors", + pattern=f"{anchor_prefix}:*", + description="商品内容理解缓存(indexer/product_enrich.py,anchors/语义属性/tags)", + ) + + return cache_types + + +def get_redis_client(db: int = 0) -> redis.Redis: + return redis.Redis( + host=REDIS_CONFIG.get("host", "localhost"), + port=REDIS_CONFIG.get("port", 6479), + password=REDIS_CONFIG.get("password"), + db=db, + decode_responses=False, # 原始 bytes,方便区分 pickle / str + socket_timeout=10, + socket_connect_timeout=10, + ) + + +def scan_keys( + client: redis.Redis, pattern: str, max_scan: int, scan_count: int = 1000 +) -> List[bytes]: + """使用 SCAN 扫描匹配 pattern 的 key,最多扫描 max_scan 个结果。""" + keys: List[bytes] = [] + cursor: int = 0 + scanned = 0 + while True: + cursor, batch = client.scan(cursor=cursor, match=pattern, count=scan_count) + for k in batch: + keys.append(k) + scanned += 1 + if scanned >= max_scan: + return keys + if cursor == 0: + break + return keys + + +def ttl_bucket(ttl: int) -> str: + """将 TTL(秒)归类到简短区间标签中。""" + if ttl < 0: + # -1: 永不过期;-2: 不存在 + return "no-expire-or-expired" + if ttl <= 3600: + return "0-1h" + if ttl <= 86400: + return "1h-1d" + if ttl <= 30 * 86400: + return "1d-30d" + return ">30d" + + +def format_seconds(sec: int) -> str: + if sec < 0: + return str(sec) + if sec < 60: + return f"{sec}s" + if sec < 3600: + return f"{sec // 60}m{sec % 60}s" + if sec < 86400: + h = sec // 3600 + m = (sec % 3600) // 60 + return f"{h}h{m}m" + d = sec // 86400 + h = (sec % 86400) // 3600 + return f"{d}d{h}h" + + +def decode_value_preview( + cache_type: str, key: bytes, raw_value: Optional[bytes] +) -> str: + """根据缓存类型生成简短的 value 概览字符串。""" + if raw_value is None: + return "" + + # embedding: pickle 序列化的 numpy.ndarray + if cache_type == "embedding": + try: + arr = pickle.loads(raw_value) + if isinstance(arr, np.ndarray): + return f"ndarray shape={arr.shape} dtype={arr.dtype}" + return f"pickle object type={type(arr).__name__}" + except Exception: + return f"" + + # anchors: JSON dict + if cache_type == "anchors": + try: + text = raw_value.decode("utf-8", errors="replace") + obj = json.loads(text) + if isinstance(obj, dict): + brief = { + k: obj.get(k) + for k in ["id", "lang", "title_input", "title", "category_path", "anchor_text"] + if k in obj + } + return "json " + json.dumps(brief, ensure_ascii=False)[:200] + # 其他情况简单截断 + return "json " + text[:200] + except Exception: + return raw_value.decode("utf-8", errors="replace")[:200] + + # translation: 纯字符串 + if cache_type == "translation": + try: + text = raw_value.decode("utf-8", errors="replace") + return text[:200] + except Exception: + return f"" + + # 兜底:尝试解码为 UTF-8 + try: + text = raw_value.decode("utf-8", errors="replace") + return text[:200] + except Exception: + return f"" + + +def analyze_cache_type( + client: redis.Redis, + cache_type: str, + cfg: CacheTypeConfig, + sample_size: int, + max_scan: int, +) -> None: + """对单个缓存类型做统计与样本展示。""" + print("=" * 80) + print(f"Cache type: {cache_type} pattern={cfg.pattern}") + print(f"Description: {cfg.description}") + print("=" * 80) + + keys = scan_keys(client, cfg.pattern, max_scan=max_scan) + total_scanned = len(keys) + print(f"Scanned up to {max_scan} keys, matched {total_scanned} keys.") + if total_scanned == 0: + print("No keys found for this cache type.\n") + return + + # 采样 + if total_scanned <= sample_size: + sampled_keys = keys + else: + # 简单取前 sample_size 个,scan 本身已是渐进遍历,足够近似随机 + sampled_keys = keys[:sample_size] + + ttl_hist: Dict[str, int] = defaultdict(int) + recent_keys: List[Tuple[bytes, int, int]] = [] # (key, ttl, idletime) + samples: List[Tuple[bytes, int, int, str]] = [] # (key, ttl, idletime, preview) + + for k in sampled_keys: + try: + ttl = client.ttl(k) + except Exception: + ttl = -3 # 表示 TTL 查询失败 + + # TTL 分布 + ttl_hist[ttl_bucket(ttl)] += 1 + + # 近期活跃判断(idletime 越小越“新”) + idletime = -1 + try: + # OBJECT IDLETIME 返回秒数(整数) + idletime = client.object("idletime", k) # type: ignore[arg-type] + except Exception: + pass + + # 记录近期活跃样本 + if idletime >= 0 and idletime <= 600: + recent_keys.append((k, ttl, idletime)) + + # 收集样本 value 预览(控制数量) + if len(samples) < 5: + raw_val = None + try: + raw_val = client.get(k) + except Exception: + pass + preview = decode_value_preview(cache_type, k, raw_val) + samples.append((k, ttl, idletime, preview)) + + # TTL 分布输出 + print("\nTTL distribution (sampled):") + total_sampled = len(sampled_keys) + for bucket in ["no-expire-or-expired", "0-1h", "1h-1d", "1d-30d", ">30d"]: + cnt = ttl_hist.get(bucket, 0) + pct = (cnt / total_sampled * 100.0) if total_sampled else 0.0 + print(f" {bucket:<18}: {cnt:>4} ({pct:>5.1f}%)") + + # 近期活跃 key + recent_keys = sorted(recent_keys, key=lambda x: x[2])[:5] + print("\nRecent active keys (idletime <= 600s, from sampled set):") + if not recent_keys: + print(" (none in sampled set)") + else: + for k, ttl, idle in recent_keys: + try: + k_str = k.decode("utf-8", errors="replace") + except Exception: + k_str = repr(k) + if len(k_str) > 80: + k_str = k_str[:77] + "..." + print( + f" key={k_str} ttl={ttl} ({format_seconds(ttl)}) " + f"idletime={idle} ({format_seconds(idle)})" + ) + + # 样本 value 概览 + print("\nSample keys & value preview:") + if not samples: + print(" (no samples)") + else: + for k, ttl, idle, preview in samples: + try: + k_str = k.decode("utf-8", errors="replace") + except Exception: + k_str = repr(k) + if len(k_str) > 80: + k_str = k_str[:77] + "..." + print(f" key={k_str}") + print(f" ttl={ttl} ({format_seconds(ttl)}) idletime={idle} ({format_seconds(idle)})") + print(f" value: {preview}") + + print() # 结尾空行 + + +def main() -> None: + parser = argparse.ArgumentParser(description="Redis 缓存状态巡检(按缓存类型)") + parser.add_argument( + "--type", + dest="types", + nargs="+", + choices=["embedding", "translation", "anchors"], + help="指定要检查的缓存类型(默认:三种全部)", + ) + parser.add_argument( + "--pattern", + type=str, + help="自定义 key pattern(如 'mycache:*')。设置后将忽略 --type,仅按 pattern 进行一次检查。", + ) + parser.add_argument( + "--db", + type=int, + default=0, + help="Redis 数据库编号(默认 0)", + ) + parser.add_argument( + "--sample-size", + type=int, + default=50, + help="每种缓存类型采样的 key 数量(默认 50)", + ) + parser.add_argument( + "--max-scan", + type=int, + default=20000, + help="每种缓存类型最多 SCAN 的 key 数量上限(默认 20000)", + ) + + args = parser.parse_args() + + print("Redis 缓存状态巡检") + print("=" * 80) + print(f"Checked at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Redis host={REDIS_CONFIG.get('host', 'localhost')} port={REDIS_CONFIG.get('port', 6479)} db={args.db}") + print() + + try: + client = get_redis_client(db=args.db) + client.ping() + print("✅ Redis 连接成功\n") + except Exception as exc: + print(f"❌ Redis 连接失败: {exc}") + print(f" Host: {REDIS_CONFIG.get('host', 'localhost')}") + print(f" Port: {REDIS_CONFIG.get('port', 6479)}") + print(f" Password: {'已配置' if REDIS_CONFIG.get('password') else '未配置'}") + return + + # 如果指定了自定义 pattern,则只做一次“匿名类型”的巡检 + if args.pattern: + anon_cfg = CacheTypeConfig( + name=f"pattern:{args.pattern}", + pattern=args.pattern, + description="自定义 pattern 检查", + ) + analyze_cache_type( + client=client, + cache_type="custom", + cfg=anon_cfg, + sample_size=args.sample_size, + max_scan=args.max_scan, + ) + print("巡检完成。") + return + + # 否则根据已知缓存类型巡检 + known_types = _load_known_cache_types() + types_to_check: Iterable[str] + if args.types: + types_to_check = args.types + else: + types_to_check = known_types.keys() + + for t in types_to_check: + cfg = known_types.get(t) + if not cfg: + print(f"⚠️ 未知缓存类型: {t},跳过") + continue + analyze_cache_type( + client=client, + cache_type=t, + cfg=cfg, + sample_size=args.sample_size, + max_scan=args.max_scan, + ) + + print("巡检完成。") + + +if __name__ == "__main__": + main() + diff --git a/scripts/redis/redis_cache_prefix_stats.py b/scripts/redis/redis_cache_prefix_stats.py new file mode 100755 index 0000000..a295dce --- /dev/null +++ b/scripts/redis/redis_cache_prefix_stats.py @@ -0,0 +1,549 @@ +#!/usr/bin/env python3 +""" +统计各种缓存的条目数和内存占用量 + +按 key 前缀分类统计,帮助了解不同缓存的使用情况 + +使用方法: + +直接使用(默认数据库 0): +python scripts/redis/redis_cache_prefix_stats.py + +统计所有数据库: +python scripts/redis/redis_cache_prefix_stats.py --all-db + +统计指定数据库: +python scripts/redis/redis_cache_prefix_stats.py --db 1 + +只统计以下三种前缀: +python scripts/redis/redis_cache_prefix_stats.py --prefix trans embedding product + +统计所有数据库的指定前缀: +python scripts/redis/redis_cache_prefix_stats.py --all-db --prefix trans embedding + + + + +其他简单的统计方法(不依赖本脚本,直接使用redis-cli命令,密码从 .env 的 REDIS_PASSWORD 读取): + +# 查看所有 key 的前缀分布(快速但不准确) +redis-cli -h $REDIS_HOST -p $REDIS_PORT -a '$REDIS_PASSWORD' --no-auth-warning --scan --pattern "*" | cut -d: -f1 | sort | uniq -c | sort -rn + +# 统计特定前缀的数量 +redis-cli -h $REDIS_HOST -p $REDIS_PORT -a '$REDIS_PASSWORD' --no-auth-warning --scan --pattern "trans:*" | wc -l +redis-cli -h $REDIS_HOST -p $REDIS_PORT -a '$REDIS_PASSWORD' --no-auth-warning --scan --pattern "embedding:*" | wc -l + +# 查看内存统计 ( Redis MEMORY STATS ) +redis-cli -h $REDIS_HOST -p $REDIS_PORT -a '$REDIS_PASSWORD' --no-auth-warning MEMORY STATS + +""" + +import redis +import os +import sys +from collections import defaultdict +from pathlib import Path +from datetime import datetime + +# 添加项目路径(文件在 scripts/redis/ 目录下,需要向上三级到项目根目录) +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from config.env_config import REDIS_CONFIG + +def get_redis_client(db=0): + """获取 Redis 客户端""" + return redis.Redis( + host=REDIS_CONFIG.get('host', 'localhost'), + port=REDIS_CONFIG.get('port', 6479), + password=REDIS_CONFIG.get('password'), + db=db, + decode_responses=True, + socket_timeout=10, + socket_connect_timeout=10, + ) + +def get_key_prefix(key): + """提取 key 的前缀(第一个冒号之前的部分)""" + if ':' in key: + return key.split(':', 1)[0] + return key + +def format_bytes(bytes_size): + """格式化字节数为可读格式""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes_size < 1024.0: + return f"{bytes_size:.2f} {unit}" + bytes_size /= 1024.0 + return f"{bytes_size:.2f} PB" + +def get_key_memory_usage(client, key, use_real_memory=True): + """ + 获取单个 key 的内存占用量(字节) + + Args: + client: Redis 客户端 + key: key 名称 + use_real_memory: 是否使用真实的 MEMORY USAGE 命令(True=真实,False=估算) + + Returns: + 内存占用量(字节) + """ + try: + if use_real_memory: + # 使用 MEMORY USAGE 命令(Redis 4.0+)- 这是真实的内存占用 + try: + memory = client.execute_command('MEMORY', 'USAGE', key) + return memory if memory else 0 + except: + # 如果 MEMORY USAGE 不可用,降级到估算方法 + pass + + # 估算方法(不够准确,但速度快) + # 获取 key 和 value 的大小 + key_size = len(key.encode('utf-8')) + + # 获取 value + value = client.get(key) + if value: + value_size = len(value.encode('utf-8')) + else: + # 尝试获取其他类型 + ttl = client.ttl(key) + if ttl == -2: # key 不存在 + return 0 + # 估算:key + 基础开销 + value_size = 0 + + # Redis 内存开销估算(粗略) + # key 对象开销: ~48 bytes + # value 对象开销: ~24 bytes + # 其他开销: ~100 bytes + # 注意:这个估算不准确,特别是对于复杂数据结构(hash、set、zset等) + overhead = 48 + 24 + 100 + return key_size + value_size + overhead + except Exception as e: + return 0 + +def scan_all_keys(client, pattern="*"): + """扫描所有匹配的 key""" + keys = [] + cursor = 0 + while True: + cursor, batch = client.scan(cursor, match=pattern, count=1000) + keys.extend(batch) + if cursor == 0: + break + return keys + +def analyze_cache_by_prefix(client, args=None, db_num=0): + """按前缀分析缓存""" + if args is None: + class Args: + real = False + sample_size = 100 + args = Args() + + # 显示当前数据库 + if db_num > 0: + print(f"\n{'='*60}") + print(f"数据库 {db_num}") + print(f"{'='*60}\n") + print("=" * 60) + print("扫描 Redis 中的所有 key...") + print("=" * 60) + + try: + # 扫描所有 key + all_keys = scan_all_keys(client) + total_keys = len(all_keys) + + print(f"总 key 数量: {total_keys:,}") + print(f"开始分析...\n") + + # 按前缀分类 + prefix_stats = defaultdict(lambda: { + 'count': 0, + 'memory': 0, + 'keys': [], + 'sample_keys': [] # 采样一些 key 用于显示 + }) + + # 统计每个前缀 + processed = 0 + for key in all_keys: + prefix = get_key_prefix(key) + prefix_stats[prefix]['count'] += 1 + prefix_stats[prefix]['keys'].append(key) + + # 采样前 5 个 key + if len(prefix_stats[prefix]['sample_keys']) < 5: + prefix_stats[prefix]['sample_keys'].append(key) + + processed += 1 + if processed % 1000 == 0: + print(f" 已处理: {processed:,} / {total_keys:,} ({processed*100//total_keys}%)") + + print(f" 完成: {processed:,} / {total_keys:,}\n") + + # 计算每个前缀的内存占用量 + print("=" * 60) + print("计算内存占用量...") + print("=" * 60) + print("注意:") + print(" - 如果 key 数量 > 100,会采样前 100 个进行估算") + print(" - 优先使用 Redis MEMORY USAGE 命令(真实值)") + print(" - 如果 MEMORY USAGE 不可用,会使用估算方法(不准确)") + print(" - 估算方法只计算 key+value 大小,不包括 Redis 内部数据结构开销") + print() + + # 测试是否支持 MEMORY USAGE + test_key = all_keys[0] if all_keys else None + supports_memory_usage = False + if test_key: + try: + client.execute_command('MEMORY', 'USAGE', test_key) + supports_memory_usage = True + print("✅ Redis 支持 MEMORY USAGE 命令,将使用真实内存值") + except: + print("⚠️ Redis 不支持 MEMORY USAGE 命令,将使用估算方法(可能不准确)") + print() + + prefix_memory = {} + for prefix, stats in prefix_stats.items(): + print(f" 计算 {prefix}:* 的内存...") + total_memory = 0 + # 如果指定了 --real,且数量不太大,计算全部 + if args.real and stats['count'] <= 10000: + sample_count = stats['count'] + else: + sample_count = min(args.sample_size, stats['count']) # 采样 + + # 如果数量较少,全部计算;否则采样计算 + if stats['count'] <= 100: + keys_to_check = stats['keys'] + is_sampled = False + else: + # 采样计算 + import random + keys_to_check = random.sample(stats['keys'], sample_count) + is_sampled = True + + for key in keys_to_check: + memory = get_key_memory_usage(client, key, use_real_memory=supports_memory_usage) + total_memory += memory + + # 如果是采样,估算总内存 + if is_sampled: + avg_memory = total_memory / sample_count + estimated_total = avg_memory * stats['count'] + prefix_memory[prefix] = { + 'memory': estimated_total, + 'is_estimated': True, + 'is_sampled': True, + 'sample_count': sample_count, + 'uses_real_memory': supports_memory_usage + } + else: + prefix_memory[prefix] = { + 'memory': total_memory, + 'is_estimated': False, + 'is_sampled': False, + 'sample_count': stats['count'], + 'uses_real_memory': supports_memory_usage + } + + # 显示统计结果 + print("\n" + "=" * 60) + print("缓存统计结果(按前缀分类)") + print("=" * 60) + + # 按内存使用量排序 + sorted_prefixes = sorted( + prefix_stats.items(), + key=lambda x: prefix_memory[x[0]]['memory'], + reverse=True + ) + + total_memory_all = sum(pm['memory'] for pm in prefix_memory.values()) + + print(f"{'前缀':<20} {'条目数':>12} {'内存占用量和计算方式':>50} {'占比':>10} {'说明'}") + print("-" * 120) + + for prefix, stats in sorted_prefixes: + memory_info = prefix_memory[prefix] + memory = memory_info['memory'] + + # 计算平均每条 key 的大小 + avg_memory_per_key = memory / stats['count'] if stats['count'] > 0 else 0 + avg_memory_str = format_bytes(avg_memory_per_key) + + # 标注内存计算方式和结果 + if memory_info['is_sampled']: + if memory_info['uses_real_memory']: + calc_method = f"采样估算(采样{memory_info['sample_count']}个, 使用真实MEMORY USAGE)" + else: + calc_method = f"采样估算(采样{memory_info['sample_count']}个, 估算方法)" + else: + if memory_info['uses_real_memory']: + calc_method = "真实值(全部计算, 使用MEMORY USAGE)" + else: + calc_method = "估算值(全部计算, 估算方法)" + + memory_str = f"{format_bytes(memory)} | 每条: {avg_memory_str} | {calc_method}" + + percentage = (memory / total_memory_all * 100) if total_memory_all > 0 else 0 + + # 添加说明 + description = "" + if prefix == 'trans': + description = "翻译缓存" + elif prefix.startswith('embedding') or prefix.startswith('emb'): + description = "向量化缓存" + elif prefix.startswith('session') or prefix.startswith('user'): + description = "会话/用户缓存" + elif prefix.startswith('product') or prefix.startswith('item'): + description = "商品缓存" + else: + description = "其他" + + # 格式化输出,内存信息可能很长,需要适当处理 + memory_display = memory_str[:70] + "..." if len(memory_str) > 70 else memory_str + print(f"{prefix:<20} {stats['count']:>12,} {memory_display:<70} {percentage:>9.1f}% {description}") + + print("-" * 120) + avg_total = total_memory_all / total_keys if total_keys > 0 else 0 + total_display = f"{format_bytes(total_memory_all)} | 每条: {format_bytes(avg_total)}" + print(f"{'总计':<20} {total_keys:>12,} {total_display:<70} {'100.0':>9}%") + + # 显示详细信息 + print("\n" + "=" * 60) + print("详细信息(每个前缀的示例 key)") + print("=" * 60) + + for prefix, stats in sorted_prefixes[:10]: # 只显示前 10 个 + mem_info = prefix_memory[prefix] + avg_per_key = mem_info['memory'] / stats['count'] if stats['count'] > 0 else 0 + + print(f"\n{prefix}:* ({stats['count']:,} 个 key)") + print(f" 总内存: {format_bytes(mem_info['memory'])}") + print(f" 每条 key 平均: {format_bytes(avg_per_key)}") + + # 显示计算方式 + if mem_info['is_sampled']: + if mem_info['uses_real_memory']: + print(f" 计算方式: 采样估算(采样 {mem_info['sample_count']} 个,使用真实 MEMORY USAGE)") + else: + print(f" 计算方式: 采样估算(采样 {mem_info['sample_count']} 个,使用估算方法)") + else: + if mem_info['uses_real_memory']: + print(f" 计算方式: 真实值(全部计算,使用 MEMORY USAGE)") + else: + print(f" 计算方式: 估算值(全部计算,使用估算方法)") + + print(f" 示例 key:") + for sample_key in stats['sample_keys'][:3]: + ttl = client.ttl(sample_key) + if ttl == -1: + ttl_str = "无过期时间" + elif ttl == -2: + ttl_str = "已过期" + else: + ttl_str = f"{ttl/86400:.1f} 天" + key_display = sample_key[:60] + "..." if len(sample_key) > 60 else sample_key + print(f" - {key_display} (TTL: {ttl_str})") + + # 获取 Redis 总内存信息 + print("\n" + "=" * 60) + print("Redis 内存使用情况") + print("=" * 60) + + try: + info = client.info('memory') + used_memory = info.get('used_memory', 0) + used_memory_human = info.get('used_memory_human', '0B') + maxmemory = info.get('maxmemory', 0) + maxmemory_human = info.get('maxmemory_human', '0B') + + print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)") + print(f"统计的缓存内存: {format_bytes(total_memory_all)}") + print(f"内存占比: {(total_memory_all / used_memory * 100) if used_memory > 0 else 0:.1f}%") + + if maxmemory > 0: + print(f"最大内存限制: {maxmemory_human} ({maxmemory:,} bytes)") + usage_percent = (used_memory / maxmemory) * 100 + print(f"内存使用率: {usage_percent:.2f}%") + except Exception as e: + print(f"获取内存信息失败: {e}") + + except Exception as e: + print(f"❌ 分析失败: {e}") + import traceback + traceback.print_exc() + +def analyze_specific_prefixes(client, prefixes, db_num=0): + """分析指定的前缀""" + print("=" * 60) + if db_num > 0: + print(f"数据库 {db_num} - 分析指定前缀: {', '.join(prefixes)}") + else: + print(f"分析指定前缀: {', '.join(prefixes)}") + print("=" * 60) + + for prefix in prefixes: + pattern = f"{prefix}:*" + keys = scan_all_keys(client, pattern=pattern) + + if not keys: + print(f"\n{prefix}:* - 未找到 key") + continue + + print(f"\n{prefix}:*") + print(f" 条目数: {len(keys):,}") + + # 计算内存 + total_memory = 0 + sample_count = min(100, len(keys)) + import random + sample_keys = random.sample(keys, sample_count) if len(keys) > sample_count else keys + + for key in sample_keys: + memory = get_key_memory_usage(client, key) + total_memory += memory + + if len(keys) > sample_count: + avg_memory = total_memory / sample_count + estimated_total = avg_memory * len(keys) + print(f" 内存占用量: {format_bytes(estimated_total)} (估算, 采样 {sample_count})") + else: + print(f" 内存占用量: {format_bytes(total_memory)}") + +def get_all_databases(): + """获取所有有数据的数据库列表""" + databases = [] + # Redis 默认有 16 个数据库(0-15) + for db_num in range(16): + try: + client = get_redis_client(db=db_num) + client.ping() + # 检查是否有 key + key_count = client.dbsize() + if key_count > 0: + databases.append(db_num) + except: + pass + return databases + +def analyze_all_databases(args): + """分析所有数据库""" + print("=" * 60) + print("扫描所有数据库...") + print("=" * 60) + + databases = get_all_databases() + + if not databases: + print("未找到有数据的数据库") + return + + print(f"发现 {len(databases)} 个有数据的数据库: {databases}\n") + + # 汇总统计 + total_stats_by_prefix = defaultdict(lambda: {'count': 0, 'memory': 0, 'dbs': []}) + total_keys_all_db = 0 + total_memory_all_db = 0 + + for db_num in databases: + try: + client = get_redis_client(db=db_num) + client.ping() + db_size = client.dbsize() + + print(f"\n{'='*60}") + print(f"数据库 {db_num} (共 {db_size:,} 个 key)") + print(f"{'='*60}") + + if args.prefix: + analyze_specific_prefixes(client, args.prefix, db_num=db_num) + else: + # 分析当前数据库 + analyze_cache_by_prefix(client, args, db_num=db_num) + + # 收集统计信息(简化版,只统计 key 数量) + total_keys_all_db += db_size + + except Exception as e: + print(f"❌ 数据库 {db_num} 分析失败: {e}") + import traceback + traceback.print_exc() + continue + + # 显示汇总统计 + if not args.prefix: + print("\n" + "=" * 60) + print("所有数据库汇总") + print("=" * 60) + print(f"有数据的数据库: {len(databases)} 个 ({', '.join(map(str, databases))})") + print(f"总 key 数量: {total_keys_all_db:,}") + print(f"\n提示: 要查看详细的内存统计,请分别运行每个数据库:") + for db_num in databases: + print(f" python scripts/redis/redis_cache_prefix_stats.py --db {db_num}") + +def main(): + """主函数""" + import argparse + + parser = argparse.ArgumentParser(description='统计 Redis 缓存的条目数和内存占用量') + parser.add_argument('--prefix', nargs='+', help='指定要分析的前缀(如: trans embedding)') + parser.add_argument('--all', action='store_true', help='分析所有前缀(默认)') + parser.add_argument('--real', action='store_true', help='计算所有 key 的真实内存(很慢,但准确)') + parser.add_argument('--sample-size', type=int, default=100, help='采样大小(默认 100,仅当 key 数量 > 采样大小时使用)') + parser.add_argument('--db', type=int, help='指定数据库编号(0-15),默认只统计 db 0') + parser.add_argument('--all-db', action='store_true', help='统计所有数据库(0-15)') + + args = parser.parse_args() + + print("Redis 缓存统计工具") + print("=" * 60) + print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print() + + # 如果指定了 --all-db,分析所有数据库 + if args.all_db: + analyze_all_databases(args) + print("\n" + "=" * 60) + print("统计完成") + print("=" * 60) + return + + # 否则分析指定或默认的数据库 + db_num = args.db if args.db is not None else 0 + + try: + client = get_redis_client(db=db_num) + client.ping() + if db_num > 0: + print(f"✅ Redis 连接成功(数据库 {db_num})\n") + else: + print("✅ Redis 连接成功(默认数据库 0)\n") + except Exception as e: + print(f"❌ Redis 连接失败: {e}") + print(f"\n请检查:") + print(f" - Host: {REDIS_CONFIG.get('host', 'localhost')}") + print(f" - Port: {REDIS_CONFIG.get('port', 6479)}") + print(f" - Password: {'已配置' if REDIS_CONFIG.get('password') else '未配置'}") + print(f" - Database: {db_num}") + return + + if args.prefix: + analyze_specific_prefixes(client, args.prefix, db_num=db_num) + else: + # 传递参数到分析函数 + analyze_cache_by_prefix(client, args, db_num=db_num) + + print("\n" + "=" * 60) + print("统计完成") + print("=" * 60) + +if __name__ == "__main__": + main() diff --git a/scripts/redis/redis_memory_heavy_keys.py b/scripts/redis/redis_memory_heavy_keys.py new file mode 100755 index 0000000..c61a1e5 --- /dev/null +++ b/scripts/redis/redis_memory_heavy_keys.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +""" +查找 Redis 中占用内存的主要 key + +分析为什么统计的缓存内存和总内存差异很大 +""" + +import redis +import os +import sys +from collections import defaultdict +from pathlib import Path +from datetime import datetime + +# 添加项目路径(文件在 scripts/redis/ 目录下,需要向上三级到项目根目录) +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from config.env_config import REDIS_CONFIG + +def get_redis_client(): + """获取 Redis 客户端""" + return redis.Redis( + host=REDIS_CONFIG.get('host', 'localhost'), + port=REDIS_CONFIG.get('port', 6479), + password=REDIS_CONFIG.get('password'), + decode_responses=True, + socket_timeout=10, + socket_connect_timeout=10, + ) + +def format_bytes(bytes_size): + """格式化字节数为可读格式""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes_size < 1024.0: + return f"{bytes_size:.2f} {unit}" + bytes_size /= 1024.0 + return f"{bytes_size:.2f} PB" + +def get_key_memory_usage(client, key): + """获取单个 key 的内存占用量(字节)""" + try: + # 使用 MEMORY USAGE 命令(Redis 4.0+) + try: + memory = client.execute_command('MEMORY', 'USAGE', key) + return memory if memory else 0 + except: + # 如果 MEMORY USAGE 不可用,使用估算方法 + key_size = len(key.encode('utf-8')) + value = client.get(key) + if value: + value_size = len(value.encode('utf-8')) + else: + value_size = 0 + overhead = 48 + 24 + 100 + return key_size + value_size + overhead + except Exception as e: + return 0 + +def analyze_all_keys(client, top_n=50): + """分析所有 key 的内存占用,找出占用最多的""" + print("=" * 60) + print("分析所有 key 的内存占用") + print("=" * 60) + + try: + # 获取总内存信息 + info = client.info('memory') + used_memory = info.get('used_memory', 0) + used_memory_human = info.get('used_memory_human', '0B') + + print(f"Redis 总使用内存: {used_memory_human} ({used_memory:,} bytes)\n") + + # 扫描所有 key + print("扫描所有 key...") + all_keys = [] + cursor = 0 + while True: + cursor, batch = client.scan(cursor, count=1000) + all_keys.extend(batch) + if cursor == 0: + break + + total_keys = len(all_keys) + print(f"总 key 数量: {total_keys:,}\n") + + # 分析 key 的命名模式 + print("分析 key 命名模式...") + no_prefix_count = 0 + prefix_patterns = defaultdict(int) + + for key in all_keys: + if ':' in key: + prefix = key.split(':', 1)[0] + prefix_patterns[prefix] += 1 + else: + no_prefix_count += 1 + + print(f" 无前缀的 key: {no_prefix_count:,}") + print(f" 有前缀的 key: {total_keys - no_prefix_count:,}") + print(f" 不同前缀数量: {len(prefix_patterns):,}\n") + + # 显示所有前缀 + print("所有前缀列表:") + sorted_prefixes = sorted(prefix_patterns.items(), key=lambda x: x[1], reverse=True) + for prefix, count in sorted_prefixes[:20]: + print(f" {prefix}:* - {count:,} 个 key") + if len(sorted_prefixes) > 20: + print(f" ... 还有 {len(sorted_prefixes) - 20} 个前缀") + + # 采样分析内存占用 + print(f"\n采样分析内存占用(采样前 {min(1000, total_keys)} 个 key)...") + + key_memories = [] + sample_size = min(1000, total_keys) + import random + sample_keys = random.sample(all_keys, sample_size) if total_keys > sample_size else all_keys + + processed = 0 + for key in sample_keys: + memory = get_key_memory_usage(client, key) + if memory > 0: + key_memories.append((key, memory)) + processed += 1 + if processed % 100 == 0: + print(f" 已处理: {processed}/{sample_size}") + + # 按内存排序 + key_memories.sort(key=lambda x: x[1], reverse=True) + + # 计算采样统计 + total_sample_memory = sum(mem for _, mem in key_memories) + avg_memory = total_sample_memory / len(key_memories) if key_memories else 0 + estimated_total_memory = avg_memory * total_keys + + print(f"\n采样统计:") + print(f" 采样 key 数量: {len(key_memories):,}") + print(f" 采样总内存: {format_bytes(total_sample_memory)}") + print(f" 平均每个 key 内存: {format_bytes(avg_memory)}") + print(f" 估算所有 key 总内存: {format_bytes(estimated_total_memory)}") + print(f" 实际 Redis 使用内存: {format_bytes(used_memory)}") + print(f" 差异: {format_bytes(used_memory - estimated_total_memory)}") + + # 显示占用内存最多的 key + print(f"\n占用内存最多的 {top_n} 个 key:") + print(f"{'排名':<6} {'内存':<15} {'Key'}") + print("-" * 80) + + for i, (key, memory) in enumerate(key_memories[:top_n], 1): + key_display = key[:60] + "..." if len(key) > 60 else key + print(f"{i:<6} {format_bytes(memory):<15} {key_display}") + + # 分析内存差异的原因 + print("\n" + "=" * 60) + print("内存差异分析") + print("=" * 60) + + difference = used_memory - estimated_total_memory + difference_percent = (difference / used_memory * 100) if used_memory > 0 else 0 + + print(f"实际内存: {format_bytes(used_memory)}") + print(f"估算 key 内存: {format_bytes(estimated_total_memory)}") + print(f"差异: {format_bytes(difference)} ({difference_percent:.1f}%)") + + print("\n可能的原因:") + print("1. Redis 内部数据结构开销(hash table、skiplist 等)") + print("2. 内存碎片") + print("3. Redis 进程本身的内存占用") + print("4. 其他数据结构(如 list、set、zset、hash)的内存开销更大") + print("5. 采样估算的误差") + + # 检查是否有大 value + print(f"\n检查是否有超大 value(> 1MB)...") + large_values = [] + for key, memory in key_memories[:100]: # 检查前 100 个最大的 + if memory > 1024 * 1024: # > 1MB + large_values.append((key, memory)) + + if large_values: + print(f"发现 {len(large_values)} 个超大 value (> 1MB):") + for key, memory in large_values[:10]: + key_display = key[:60] + "..." if len(key) > 60 else key + print(f" {format_bytes(memory):<15} {key_display}") + else: + print(" 未发现超大 value") + + # 检查 key 类型分布 + print(f"\n检查 key 类型分布(采样前 1000 个)...") + type_distribution = defaultdict(int) + for key in sample_keys[:1000]: + try: + key_type = client.type(key) + type_distribution[key_type] += 1 + except: + pass + + print("Key 类型分布:") + for key_type, count in sorted(type_distribution.items(), key=lambda x: x[1], reverse=True): + print(f" {key_type}: {count}") + + except Exception as e: + print(f"❌ 分析失败: {e}") + import traceback + traceback.print_exc() + +def main(): + """主函数""" + import argparse + + parser = argparse.ArgumentParser(description='查找 Redis 中占用内存的主要 key') + parser.add_argument('--top', type=int, default=50, help='显示占用内存最多的 N 个 key(默认 50)') + + args = parser.parse_args() + + print("Redis 内存占用分析工具") + print("=" * 60) + print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print() + + try: + client = get_redis_client() + client.ping() + print("✅ Redis 连接成功\n") + except Exception as e: + print(f"❌ Redis 连接失败: {e}") + return + + analyze_all_keys(client, top_n=args.top) + + print("\n" + "=" * 60) + print("分析完成") + print("=" * 60) + +if __name__ == "__main__": + main() -- libgit2 0.21.2