# Redis Data Loading Specification

## 📋 Data Loading Overview

Load the recommendation indexes generated offline into Redis so that the online system can query them in real time.

## 🔑 Redis Key Conventions

### General Rule

```
{namespace}:{function}:{algorithm}:{identifier}
```

- `namespace`: business namespace (`item`, `user`, `interest`, etc.)
- `function`: function type (`similar`, `feature`, `hot`, etc.)
- `algorithm`: algorithm name (`swing`, `w2v`, `deepwalk`, etc.)
- `identifier`: concrete identifier (`item_id`, `dimension_key`, etc.)

Interest aggregation keys have no algorithm segment; they take the form `interest:{list_type}:{dimension_key}`, as the table below shows.

## 📊 Data Loading Specification Table

| Module | Source data path | Source format | Redis key template | Redis value format | TTL |
|--------|------------------|---------------|--------------------|--------------------|-----|
| **i2i_swing_cpp** | `offline_tasks/collaboration/output/swing_similar.txt` | `item_id\tsimilar_id1:score1,...` | `item:similar:swing_cpp:{item_id}` | `[[similar_id1,score1],[similar_id2,score2],...]` | 7 days |
| **i2i_swing** | `output/i2i_swing_YYYYMMDD.txt` | `item_id\titem_name\tsimilar_id1:score1,...` | `item:similar:swing:{item_id}` | `[[similar_id1,score1],[similar_id2,score2],...]` | 7 days |
| **i2i_session_w2v** | `output/i2i_session_w2v_YYYYMMDD.txt` | `item_id\titem_name\tsimilar_id1:score1,...` | `item:similar:w2v:{item_id}` | `[[similar_id1,score1],[similar_id2,score2],...]` | 7 days |
| **i2i_deepwalk** | `output/i2i_deepwalk_YYYYMMDD.txt` | `item_id\titem_name\tsimilar_id1:score1,...` | `item:similar:deepwalk:{item_id}` | `[[similar_id1,score1],[similar_id2,score2],...]` | 7 days |
| **i2i_content_name** | `output/i2i_content_name_YYYYMMDD.txt` | `item_id\titem_name\tsimilar_id1:score1,...` | `item:similar:content_name:{item_id}` | `[[similar_id1,score1],[similar_id2,score2],...]` | 30 days |
| **i2i_content_pic** | `output/i2i_content_pic_YYYYMMDD.txt` | `item_id\titem_name\tsimilar_id1:score1,...` | `item:similar:content_pic:{item_id}` | `[[similar_id1,score1],[similar_id2,score2],...]` | 30 days |
| **interest_hot** | `output/interest_aggregation_hot_YYYYMMDD.txt` | `dimension_key\titem_id1,item_id2,...` | `interest:hot:{dimension_key}` | `[item_id1,item_id2,item_id3,...]` | 3 days |
| **interest_cart** | `output/interest_aggregation_cart_YYYYMMDD.txt` | `dimension_key\titem_id1,item_id2,...` | `interest:cart:{dimension_key}` | `[item_id1,item_id2,item_id3,...]` | 3 days |
| **interest_new** | `output/interest_aggregation_new_YYYYMMDD.txt` | `dimension_key\titem_id1,item_id2,...` | `interest:new:{dimension_key}` | `[item_id1,item_id2,item_id3,...]` | 3 days |
| **interest_global** | `output/interest_aggregation_global_YYYYMMDD.txt` | `dimension_key\titem_id1,item_id2,...` | `interest:global:{dimension_key}` | `[item_id1,item_id2,item_id3,...]` | 7 days |

## 📝 Details

### 1. i2i Similarity Indexes

#### 1.1 C++ Swing (high-performance version)

**Source data format** (fields are tab-separated)

```
3600052    2704531:0.00431593,2503886:0.00431593,3371410:0.00431593,3186572:0.00431593
```

**Redis storage**

**Key**: `item:similar:swing_cpp:3600052`

**Value** (JSON):

```json
[[2704531, 0.00431593], [2503886, 0.00431593], [3371410, 0.00431593], [3186572, 0.00431593]]
```

**Value** (serialized):

```python
import json

value = json.dumps([[2704531, 0.00431593], [2503886, 0.00431593], [3371410, 0.00431593], [3186572, 0.00431593]])
# Stored (json.dumps keeps a space after each comma by default):
# "[[2704531, 0.00431593], [2503886, 0.00431593], [3371410, 0.00431593], [3186572, 0.00431593]]"
```

**Characteristics**:
- Raw Swing scores (not normalized)
- High-performance C++ computation
- Suited to large-scale data

#### 1.2 Python Swing (standard version)

**Source data format** (fields are tab-separated)

```
12345    香蕉干    67890:0.8567,11223:0.7234,44556:0.6891
```

**Redis storage**

**Key**: `item:similar:swing:12345`

**Value** (JSON):

```json
[[67890, 0.8567], [11223, 0.7234], [44556, 0.6891]]
```

**Value** (serialized):

```python
import json

value = json.dumps([[67890, 0.8567], [11223, 0.7234], [44556, 0.6891]])
# Stored: "[[67890, 0.8567], [11223, 0.7234], [44556, 0.6891]]"
```

**Characteristics**:
- Normalized scores (in the 0-1 range)
- Supports time decay and a date dimension
- Easy to debug

#### 1.3 Query Example

```python
import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

# Option 1: C++ Swing results (recommended for production).
# r.get() returns None for a missing or expired key, so fall back to an empty list.
similar_items_cpp = json.loads(r.get('item:similar:swing_cpp:3600052') or '[]')
# Returns: [[2704531, 0.00431593], [2503886, 0.00431593], ...]

# Option 2: Python Swing results (development and testing)
similar_items = json.loads(r.get('item:similar:swing:12345') or '[]')
# Returns: [[67890, 0.8567], [11223, 0.7234], [44556, 0.6891]]

# Top 5 similar items
top_5 = similar_items[:5]

# Multi-algorithm fusion (optional)
swing_cpp = json.loads(r.get('item:similar:swing_cpp:3600052') or '[]')
swing_py = json.loads(r.get('item:similar:swing:3600052') or '[]')
w2v = json.loads(r.get('item:similar:w2v:3600052') or '[]')
# Merge the results of the individual algorithms...
```

### 2. Interest Aggregation Indexes

#### Source data format

Fields are tab-separated.

```
platform:pc    12345,67890,11223,44556,22334
category_level2:200    67890,12345,22334,55667,11223
```

#### Redis storage

**Key**: `interest:hot:platform:pc`

**Value** (JSON):

```json
[12345, 67890, 11223, 44556, 22334]
```

**Value** (serialized):

```python
import json

value = json.dumps([12345, 67890, 11223, 44556, 22334])
# Stored: "[12345, 67890, 11223, 44556, 22334]"
```

#### Query Example

```python
import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

# Hot items for the PC platform (empty list if the key is missing or expired)
hot_items = json.loads(r.get('interest:hot:platform:pc') or '[]')
# Returns: [12345, 67890, 11223, 44556, 22334]

# Top 10 hot items
top_10 = hot_items[:10]
```

## 🔄 Data Loading Flow

### 1. Loading i2i Indexes

#### 1.1 Loading the C++ Swing index (no item name)

```python
import json


def load_cpp_swing_index(file_path, redis_client, expire_seconds=604800):
    """
    Load the C++ Swing index into Redis.

    Args:
        file_path: index file path (collaboration/output/swing_similar.txt)
        redis_client: Redis client
        expire_seconds: expiry in seconds, 7 days by default

    Returns:
        Number of keys written.
    """
    count = 0
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) < 2:
                continue  # skip malformed lines

            item_id = parts[0]
            similar_str = parts[1]  # similar_id1:score1,similar_id2:score2,...

            # Parse the similar items
            similar_items = []
            for pair in similar_str.split(','):
                if ':' in pair:
                    sim_id, score = pair.split(':')
                    similar_items.append([int(sim_id), float(score)])

            # Store in Redis; ex= sets the value and its TTL in a single command
            redis_key = f"item:similar:swing_cpp:{item_id}"
            redis_value = json.dumps(similar_items)
            redis_client.set(redis_key, redis_value, ex=expire_seconds)
            count += 1

    return count
```

#### 1.2 Loading the Python i2i indexes (with item name)

```python
import json


def load_i2i_index(file_path, algorithm_name, redis_client, expire_seconds=604800):
    """
    Load a Python i2i similarity index into Redis.

    Args:
        file_path: index file path
        algorithm_name: algorithm segment of the key
            (swing, w2v, deepwalk, content_name, content_pic)
        redis_client: Redis client
        expire_seconds: expiry in seconds, 7 days by default
            (pass 2592000 for the content indexes, whose TTL is 30 days)

    Returns:
        Number of keys written.
    """
    count = 0
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) < 3:
                continue  # skip malformed lines

            item_id = parts[0]
            # item_name = parts[1]  # optional: use if the item name should be cached
            similar_str = parts[2]  # similar_id1:score1,similar_id2:score2,...

            # Parse the similar items
            similar_items = []
            for pair in similar_str.split(','):
                if ':' in pair:
                    sim_id, score = pair.split(':')
                    similar_items.append([int(sim_id), float(score)])

            # Store in Redis; ex= sets the value and its TTL in a single command
            redis_key = f"item:similar:{algorithm_name}:{item_id}"
            redis_value = json.dumps(similar_items)
            redis_client.set(redis_key, redis_value, ex=expire_seconds)
            count += 1

    return count
```

### 2. Loading Interest Aggregation Indexes

```python
import json


def load_interest_index(file_path, list_type, redis_client, expire_seconds=259200):
    """
    Load an interest aggregation index into Redis.

    Args:
        file_path: index file path
        list_type: list type (hot, cart, new, global)
        redis_client: Redis client
        expire_seconds: expiry in seconds, 3 days by default
            (pass 604800 for the global list, whose TTL is 7 days)

    Returns:
        Number of keys written.
    """
    count = 0
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) != 2:
                continue  # skip malformed lines

            dimension_key = parts[0]  # platform:pc
            item_ids_str = parts[1]   # 12345,67890,11223,...

            # Parse the item ID list, ignoring empty fragments such as a trailing comma
            item_ids = [int(item_id) for item_id in item_ids_str.split(',') if item_id]

            # Store in Redis; ex= sets the value and its TTL in a single command
            redis_key = f"interest:{list_type}:{dimension_key}"
            redis_value = json.dumps(item_ids)
            redis_client.set(redis_key, redis_value, ex=expire_seconds)
            count += 1

    return count
```

## 🚀 Quick Load Commands

### Load all indexes

```bash
cd /home/tw/recommendation/offline_tasks

# Load all indexes (uses today's data, including C++ Swing)
python3 scripts/load_index_to_redis.py --redis-host localhost --redis-port 6379

# Load the indexes for a specific date
python3 scripts/load_index_to_redis.py --date 20251016 --redis-host localhost

# Load only the i2i indexes (including C++ Swing)
python3 scripts/load_index_to_redis.py --load-i2i --redis-host localhost

# Load only the C++ Swing index
python3 scripts/load_index_to_redis.py \
    --file collaboration/output/swing_similar.txt \
    --algorithm swing_cpp \
    --redis-host localhost

# Load only the interest aggregation indexes
python3 scripts/load_index_to_redis.py --load-interest --redis-host localhost
```

### Verify the data

```bash
# Connect to Redis
redis-cli

# Check the number of keys
DBSIZE

# Similar items for one item (C++ Swing)
GET item:similar:swing_cpp:3600052

# Similar items for one item (Python Swing)
GET item:similar:swing:12345

# Hot items for a platform
GET interest:hot:platform:pc

# KEYS walks the whole keyspace and blocks the server while it runs;
# on a production instance prefer: redis-cli --scan --pattern 'item:similar:*'

# All i2i keys
KEYS item:similar:*

# C++ Swing keys
KEYS item:similar:swing_cpp:*

# Python Swing keys
KEYS item:similar:swing:*

# All interest keys
KEYS interest:*

# Check the remaining TTL of a key
TTL item:similar:swing_cpp:3600052
TTL item:similar:swing:12345
```

## 📊 Data Statistics

### Estimated Redis Memory Usage

| Index type | Key count | Value size per entry | Total memory |
|------------|-----------|----------------------|--------------|
| i2i_swing_cpp | 50,000 | ~400B | ~20MB |
| i2i_swing | 50,000 | ~500B | ~25MB |
| i2i_session_w2v | 50,000 | ~500B | ~25MB |
| i2i_deepwalk | 50,000 | ~500B | ~25MB |
| i2i_content_name | 50,000 | ~500B | ~25MB |
| i2i_content_pic | 50,000 | ~500B | ~25MB |
| interest_hot | 10,000 | ~1KB | ~10MB |
| interest_cart | 10,000 | ~1KB | ~10MB |
| interest_new | 5,000 | ~1KB | ~5MB |
| interest_global | 10,000 | ~1KB | ~10MB |
| **Total** | **335,000** | - | **~180MB** |

**Notes**:
- The totals are the sums of the rows above. The memory figures count value payload only (key count × value size); key names and Redis per-key overhead are not included.
- C++ Swing data is more compact (no item name), at roughly 400B per entry.
- C++ Swing (`swing_cpp`) is recommended for production because it performs better.
- Python Swing can serve as a control group or be used in special scenarios.

### Expiry Policy

| Index type | TTL | Reason |
|------------|-----|--------|
| i2i behavioral similarity | 7 days | User behavior changes quickly, so frequent updates are needed |
| i2i content similarity | 30 days | Item attributes change slowly, so entries can be kept longer |
| Hot / add-to-cart | 3 days | Popularity changes quickly and needs timely updates |
| New items | 3 days | "New" is a time-sensitive notion |
| Global hot | 7 days | Relatively stable, so entries can be kept longer |

## ⚠️ Notes

1. **Batching and atomicity**: Write in batches through a Pipeline to cut network round trips, and set each value together with its TTL in one command (`SET` with `EX`) so that no key is left without an expiry.
2. **Expiry**: Choose TTLs deliberately so that stale data is not served.
3. **Memory management**: Clean up obsolete keys regularly and monitor memory usage.
4. **Data versioning**: Tag data with its date so that it can be rolled back.
5. **Fault tolerance**: A failed load must not affect the online service.
6. **Monitoring and alerting**: Track the load success rate, Redis memory, and query latency.

## 🔍 Monitoring Metrics

### Data quality metrics

```python
import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Check the load success rate (DBSIZE counts every key in the current DB)
total_keys = redis_client.dbsize()
expected_keys = 335000  # sum of the key counts in the memory estimate table, including C++ Swing
success_rate = total_keys / expected_keys * 100
print(f"Load success rate: {success_rate:.1f}%")

# Check data integrity with a few sample keys
sample_keys = [
    'item:similar:swing_cpp:3600052',  # C++ Swing
    'item:similar:swing:12345',        # Python Swing
    'item:similar:w2v:12345',
    'interest:hot:platform:pc'
]

for key in sample_keys:
    if not redis_client.exists(key):
        print(f"Missing key: {key}")

# Compare C++ Swing and Python Swing coverage.
# scan_iter walks the keyspace incrementally, whereas keys() blocks the server.
cpp_swing_count = sum(1 for _ in redis_client.scan_iter(match='item:similar:swing_cpp:*', count=1000))
py_swing_count = sum(1 for _ in redis_client.scan_iter(match='item:similar:swing:*', count=1000))
print(f"C++ Swing keys: {cpp_swing_count}")
print(f"Python Swing keys: {py_swing_count}")
```

### Performance metrics

- Load time: < 5 minutes
- Memory usage: < 200MB
- Query latency: < 1ms
- Success rate: > 99%

## 🔗 Related Documents

- **Offline index specification**: `OFFLINE_INDEX_SPEC.md`
- **API documentation**: `RECOMMENDATION_API.md`
- **Operations manual**: `OPERATIONS.md`
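
## 🧩 Appendix: Pipelined Loading Sketch

The notes in this document recommend batching writes through a Pipeline. The following is a minimal sketch of how that could look for the i2i files, not the actual contents of `scripts/load_index_to_redis.py`. The helper names `iter_i2i_records` and `load_in_batches`, the `batch_size` default of 1000, and the choice of a non-transactional pipeline are all assumptions made for illustration. It relies only on redis-py's `pipeline()`, `set(..., ex=...)`, and `execute()`.

```python
import json
from itertools import islice


def iter_i2i_records(lines, key_prefix, has_name=False):
    """Yield (redis_key, json_value) pairs from tab-separated i2i lines.

    has_name=False matches the C++ Swing layout (item_id, similar list);
    has_name=True matches the Python layout (item_id, item_name, similar list).
    """
    sim_col = 2 if has_name else 1
    for line in lines:
        parts = line.rstrip("\n").split("\t")
        if len(parts) <= sim_col:
            continue  # skip malformed lines
        similar = []
        for pair in parts[sim_col].split(","):
            sim_id, sep, score = pair.partition(":")
            if sep:
                similar.append([int(sim_id), float(score)])
        yield f"{key_prefix}:{parts[0]}", json.dumps(similar)


def load_in_batches(records, redis_client, expire_seconds, batch_size=1000):
    """Write (key, value) pairs through a pipeline, one round trip per batch."""
    records = iter(records)
    written = 0
    while True:
        batch = list(islice(records, batch_size))
        if not batch:
            return written
        # transaction=False: plain batching without MULTI/EXEC (an assumption;
        # use the default if each batch should be applied atomically).
        pipe = redis_client.pipeline(transaction=False)
        for key, value in batch:
            pipe.set(key, value, ex=expire_seconds)  # value and TTL in one command
        pipe.execute()
        written += len(batch)


# Hypothetical usage against a local Redis instance:
# import redis
# r = redis.Redis(host="localhost", port=6379, db=0)
# with open("collaboration/output/swing_similar.txt", encoding="utf-8") as f:
#     load_in_batches(iter_i2i_records(f, "item:similar:swing_cpp"), r, 604800)
```

Separating parsing from writing keeps the batching logic identical for every index type: only the key prefix, the `has_name` flag, and the TTL change between the C++ Swing file and the Python i2i files.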