diff --git a/offline_tasks/B2B_LOW_FREQUENCY_OPTIMIZATION.md b/offline_tasks/B2B_LOW_FREQUENCY_OPTIMIZATION.md new file mode 100644 index 0000000..7df6a62 --- /dev/null +++ b/offline_tasks/B2B_LOW_FREQUENCY_OPTIMIZATION.md @@ -0,0 +1,223 @@ +# B2B低频场景优化总结 + +## 📋 优化背景 + +B2B场景的特点: +- ✅ 用户行为频次**非常低** +- ✅ 行为间隔时间**可能很长** +- ✅ 历史行为**依然重要**,不应过度衰减 + +## ✅ 已完成的优化 + +### 1. **i2i_session_w2v.py** - Session分割策略优化 + +#### 修改前(基于时间间隔) +```python +# 问题:B2B场景下,用户可能几个月才有一次行为 +# 基于30分钟间隔分割session不合适 +session_gap_minutes = 30 +if (current_time - last_time).total_seconds() / 60 > session_gap_minutes: + # 新session +``` + +#### 修改后(固定长度分块) +```python +# 参考: graphembedding/session_w2v/prepare_data.py +# 按固定长度分块,不考虑时间间隔 +max_session_length = 50 # 最大会话长度 +min_session_length = 2 # 最小会话长度 + +# 按用户行为序列分块 +user_sessions = [ + item_sequence[i:i + max_session_length] + for i in range(0, len(item_sequence), max_session_length) +] +``` + +**优势:** +- 不依赖时间间隔,适合低频场景 +- 逻辑简化,性能更好 +- 保留用户行为的顺序信息 + +**新增参数:** +```bash +--max_session_length 50 # 最大会话长度 +--min_session_length 2 # 最小会话长度(过滤太短的序列) +``` + +--- + +### 2. **i2i_swing.py** - 关闭时间衰减 + +#### 修改前 +```python +--time_decay # 默认True,开启时间衰减 +# 30天前权重: 0.95 +# 60天前权重: 0.90 +# 180天前权重: 0.74 (衰减过快) +``` + +#### 修改后 +```python +--time_decay # 默认False,关闭时间衰减 +# 所有历史行为权重相同,更适合低频场景 +``` + +**原因:** +在B2B低频场景下,6个月前的行为可能依然很有价值,不应该被大幅衰减。 + +--- + +### 3. **interest_aggregation.py** - 关闭时间衰减 + +#### 修改内容 +```python +# 热门商品索引 - 关闭时间衰减 +list_type_indices['hot'] = aggregate_by_dimensions( + df_hot, behavior_weights, time_decay=False # 改为False +) + +# 加购商品索引 - 关闭时间衰减 +list_type_indices['cart'] = aggregate_by_dimensions( + df_cart, behavior_weights, time_decay=False # 改为False +) + +# 全局索引 - 关闭时间衰减 +global_aggregations = aggregate_by_dimensions( + df, behavior_weights, time_decay=False # 改为False +) + +# 新品索引 - 本来就不用时间衰减(保持不变) +list_type_indices['new'] = aggregate_by_dimensions( + df_new, behavior_weights, time_decay=False +) +``` + +--- + +## 🚀 使用方法 + +### Session W2V(已自动优化) +```bash +# 使用新的固定长度分块策略 +python3 scripts/i2i_session_w2v.py \ + --lookback_days 365 \ + --max_session_length 50 \ + --min_session_length 2 \ + --top_n 50 \ + --debug +``` + +### Swing算法(默认已关闭时间衰减) +```bash +# 默认不使用时间衰减,适合B2B场景 +python3 scripts/i2i_swing.py \ + --lookback_days 365 \ + --top_n 50 \ + --debug + +# 如果需要开启时间衰减(不推荐) +python3 scripts/i2i_swing.py \ + --lookback_days 365 \ + --time_decay \ + --decay_factor 0.99 # 更缓慢的衰减 +``` + +### 兴趣聚合(已自动优化) +```bash +# 已默认关闭时间衰减 +python3 scripts/interest_aggregation.py \ + --lookback_days 365 \ + --top_n 100 \ + --debug +``` + +### 运行所有任务 +```bash +cd /home/tw/recommendation/offline_tasks + +# 使用更长的回溯天数,适合低频场景 +python3 run_all.py --lookback_days 365 --top_n 50 --debug +``` + +--- + +## 📊 其他索引(无需修改) + +### i2i_deepwalk.py +- ✅ 不涉及session分割 +- ✅ 不使用时间衰减 +- ✅ 无需修改 + +### i2i_content_similar.py +- ✅ 基于商品属性的相似度 +- ✅ 不涉及时间因素 +- ✅ 无需修改 + +--- + +## 💡 建议的配置调整 + +### 1. 增加回溯天数 +```python +# offline_tasks/config/offline_config.py +DEFAULT_LOOKBACK_DAYS = 365 # 从默认值改为365天 +``` + +### 2. 如果需要重新启用时间衰减(特殊场景) +```bash +# Swing算法 +python3 scripts/i2i_swing.py --time_decay --decay_factor 0.99 + +# 修改interest_aggregation.py中的hard-coded值 +# 将 time_decay=False 改回 time_decay=True +``` + +--- + +## ✨ 优化效果 + +### 预期改进 +1. **Session W2V**: + - 不再因为时间间隔过长而丢失用户行为序列 + - 能够更好地捕捉低频用户的行为模式 + +2. **Swing算法**: + - 历史行为不会因时间衰减而被低估 + - 在数据稀疏的情况下充分利用所有历史数据 + +3. **兴趣聚合**: + - 长期累积的用户偏好得到保留 + - 推荐结果更稳定 + +### 需要监控的指标 +- [ ] Session数量变化 +- [ ] 相似物品覆盖率 +- [ ] 推荐效果指标(CTR、转化率等) + +--- + +## 📝 回滚方法 + +如果需要恢复之前的逻辑: + +```bash +# 1. 恢复i2i_swing.py的时间衰减 +git diff offline_tasks/scripts/i2i_swing.py +# 将 default=False 改回 default=True + +# 2. 恢复interest_aggregation.py的时间衰减 +git diff offline_tasks/scripts/interest_aggregation.py +# 将所有 time_decay=False 改回 time_decay=True + +# 3. 恢复i2i_session_w2v.py的时间分割 +git diff offline_tasks/scripts/i2i_session_w2v.py +# 恢复基于时间间隔的session分割逻辑 +``` + +--- + +**修改时间**: 2025-10-16 +**适用场景**: B2B低频交易场景 +**核心原则**: 在低频场景下,所有历史数据都很宝贵,不要过度衰减 + diff --git a/offline_tasks/scripts/i2i_session_w2v.py b/offline_tasks/scripts/i2i_session_w2v.py index 3a71a4e..a14fcfd 100644 --- a/offline_tasks/scripts/i2i_session_w2v.py +++ b/offline_tasks/scripts/i2i_session_w2v.py @@ -25,9 +25,9 @@ from offline_tasks.scripts.debug_utils import ( ) -def prepare_session_data(df, session_gap_minutes=30, logger=None): +def prepare_session_data(df, max_session_length=50, min_session_length=2, logger=None): """ - 准备会话数据 + 准备会话数据 - 基于固定长度分块,适合B2B低频场景 Args: df: DataFrame with columns: user_id, item_id, create_time @@ -40,45 +40,39 @@ def prepare_session_data(df, session_gap_minutes=30, logger=None): sessions = [] if logger: - logger.debug(f"开始准备会话数据,会话间隔:{session_gap_minutes}分钟") + logger.debug(f"开始准备会话数据(固定长度分块):max_length={max_session_length}, min_length={min_session_length}") # 按用户和时间排序 df = df.sort_values(['user_id', 'create_time']) - # 按用户分组 + # 按用户分组,获取每个用户的行为序列 for user_id, user_df in df.groupby('user_id'): - user_sessions = [] - current_session = [] - last_time = None + # 获取用户的item序列 + item_sequence = user_df['item_id'].astype(str).tolist() - for _, row in user_df.iterrows(): - item_id = str(row['item_id']) - current_time = row['create_time'] - - # 判断是否需要开始新会话 - if last_time is None or (current_time - last_time).total_seconds() / 60 > session_gap_minutes: - if current_session: - user_sessions.append(current_session) - current_session = [item_id] - else: - current_session.append(item_id) - - last_time = current_time + # 如果序列太短,跳过 + if len(item_sequence) < min_session_length: + continue - # 添加最后一个会话 - if current_session: - user_sessions.append(current_session) + # 按最大长度分块(不重叠) + user_sessions = [ + item_sequence[i:i + max_session_length] + for i in range(0, len(item_sequence), max_session_length) + ] + + # 过滤掉长度不足的最后一块 + user_sessions = [s for s in user_sessions if len(s) >= min_session_length] sessions.extend(user_sessions) - # 过滤掉长度小于2的会话 - sessions = [s for s in sessions if len(s) >= 2] - if logger: - session_lengths = [len(s) for s in sessions] - logger.debug(f"生成 {len(sessions)} 个会话") - logger.debug(f"会话长度统计:最小={min(session_lengths)}, 最大={max(session_lengths)}, " - f"平均={sum(session_lengths)/len(session_lengths):.2f}") + if sessions: + session_lengths = [len(s) for s in sessions] + logger.debug(f"生成 {len(sessions)} 个会话") + logger.debug(f"会话长度统计:最小={min(session_lengths)}, 最大={max(session_lengths)}, " + f"平均={sum(session_lengths)/len(session_lengths):.2f}") + else: + logger.warning("未生成任何会话!") return sessions @@ -166,8 +160,10 @@ def main(): help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})') parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS, help=f'Number of days to look back (default: {DEFAULT_LOOKBACK_DAYS})') - parser.add_argument('--session_gap', type=int, default=30, - help='Session gap in minutes') + parser.add_argument('--max_session_length', type=int, default=50, + help='Maximum session length for chunking (default: 50)') + parser.add_argument('--min_session_length', type=int, default=2, + help='Minimum session length to keep (default: 2)') parser.add_argument('--output', type=str, default=None, help='Output file path') parser.add_argument('--save_model', action='store_true', @@ -241,7 +237,12 @@ def main(): # 准备会话数据 log_processing_step(logger, "准备会话数据") - sessions = prepare_session_data(df, session_gap_minutes=args.session_gap, logger=logger) + sessions = prepare_session_data( + df, + max_session_length=args.max_session_length, + min_session_length=args.min_session_length, + logger=logger + ) logger.info(f"生成 {len(sessions)} 个会话") # 训练Word2Vec模型 diff --git a/offline_tasks/scripts/i2i_swing.py b/offline_tasks/scripts/i2i_swing.py index 782214d..78ee956 100644 --- a/offline_tasks/scripts/i2i_swing.py +++ b/offline_tasks/scripts/i2i_swing.py @@ -197,8 +197,8 @@ def main(): help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})') parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS, help=f'Number of days to look back for user behavior (default: {DEFAULT_LOOKBACK_DAYS})') - parser.add_argument('--time_decay', action='store_true', default=True, - help='Use time decay for behavior weights') + parser.add_argument('--time_decay', action='store_true', default=False, + help='Use time decay for behavior weights (default: False for B2B low-frequency scenarios)') parser.add_argument('--decay_factor', type=float, default=0.95, help='Time decay factor') parser.add_argument('--output', type=str, default=None, diff --git a/offline_tasks/scripts/interest_aggregation.py b/offline_tasks/scripts/interest_aggregation.py index bfc3abf..0920e1a 100644 --- a/offline_tasks/scripts/interest_aggregation.py +++ b/offline_tasks/scripts/interest_aggregation.py @@ -163,15 +163,17 @@ def generate_list_type_indices(df_hot, df_cart, df_new, behavior_weights): # 热门商品索引 if not df_hot.empty: print("Generating hot item indices...") + # B2B低频场景,不使用时间衰减 list_type_indices['hot'] = aggregate_by_dimensions( - df_hot, behavior_weights, time_decay=True + df_hot, behavior_weights, time_decay=False ) # 加购商品索引 if not df_cart.empty: print("Generating cart item indices...") + # B2B低频场景,不使用时间衰减 list_type_indices['cart'] = aggregate_by_dimensions( - df_cart, behavior_weights, time_decay=True + df_cart, behavior_weights, time_decay=False ) # 新品索引 @@ -368,8 +370,9 @@ def main(): # 生成全局索引(所有数据) log_processing_step(logger, "生成全局索引") + # B2B低频场景,不使用时间衰减 global_aggregations = aggregate_by_dimensions( - df, behavior_weights, time_decay=True, decay_factor=args.decay_factor + df, behavior_weights, time_decay=False, decay_factor=args.decay_factor ) logger.info("保存全局索引...") output_indices(global_aggregations, f'{args.output_prefix}_global', top_n=args.top_n) -- libgit2 0.21.2