Commit 9832fef6b395fdfb71045a634e87c4b270ece0b3

Authored by tangwang
1 parent 14f3dcbe

offline tasks

offline_tasks/B2B_LOW_FREQUENCY_OPTIMIZATION.md 0 → 100644
@@ -0,0 +1,223 @@ @@ -0,0 +1,223 @@
  1 +# B2B低频场景优化总结
  2 +
  3 +## 📋 优化背景
  4 +
  5 +B2B场景的特点:
  6 +- ✅ 用户行为频次**非常低**
  7 +- ✅ 行为间隔时间**可能很长**
  8 +- ✅ 历史行为**依然重要**,不应过度衰减
  9 +
  10 +## ✅ 已完成的优化
  11 +
  12 +### 1. **i2i_session_w2v.py** - Session分割策略优化
  13 +
  14 +#### 修改前(基于时间间隔)
  15 +```python
  16 +# 问题:B2B场景下,用户可能几个月才有一次行为
  17 +# 基于30分钟间隔分割session不合适
  18 +session_gap_minutes = 30
  19 +if (current_time - last_time).total_seconds() / 60 > session_gap_minutes:
  20 + # 新session
  21 +```
  22 +
  23 +#### 修改后(固定长度分块)
  24 +```python
  25 +# 参考: graphembedding/session_w2v/prepare_data.py
  26 +# 按固定长度分块,不考虑时间间隔
  27 +max_session_length = 50 # 最大会话长度
  28 +min_session_length = 2 # 最小会话长度
  29 +
  30 +# 按用户行为序列分块
  31 +user_sessions = [
  32 + item_sequence[i:i + max_session_length]
  33 + for i in range(0, len(item_sequence), max_session_length)
  34 +]
  35 +```
  36 +
  37 +**优势:**
  38 +- 不依赖时间间隔,适合低频场景
  39 +- 逻辑简化,性能更好
  40 +- 保留用户行为的顺序信息
  41 +
  42 +**新增参数:**
  43 +```bash
  44 +--max_session_length 50 # 最大会话长度
  45 +--min_session_length 2 # 最小会话长度(过滤太短的序列)
  46 +```
  47 +
  48 +---
  49 +
  50 +### 2. **i2i_swing.py** - 关闭时间衰减
  51 +
  52 +#### 修改前
  53 +```python
  54 +--time_decay # 默认True,开启时间衰减
  55 +# 30天前权重: 0.95
  56 +# 60天前权重: 0.90
  57 +# 180天前权重: 0.74 (衰减过快)
  58 +```
  59 +
  60 +#### 修改后
  61 +```python
  62 +--time_decay # 默认False,关闭时间衰减
  63 +# 所有历史行为权重相同,更适合低频场景
  64 +```
  65 +
  66 +**原因:**
  67 +在B2B低频场景下,6个月前的行为可能依然很有价值,不应该被大幅衰减。
  68 +
  69 +---
  70 +
  71 +### 3. **interest_aggregation.py** - 关闭时间衰减
  72 +
  73 +#### 修改内容
  74 +```python
  75 +# 热门商品索引 - 关闭时间衰减
  76 +list_type_indices['hot'] = aggregate_by_dimensions(
  77 + df_hot, behavior_weights, time_decay=False # 改为False
  78 +)
  79 +
  80 +# 加购商品索引 - 关闭时间衰减
  81 +list_type_indices['cart'] = aggregate_by_dimensions(
  82 + df_cart, behavior_weights, time_decay=False # 改为False
  83 +)
  84 +
  85 +# 全局索引 - 关闭时间衰减
  86 +global_aggregations = aggregate_by_dimensions(
  87 + df, behavior_weights, time_decay=False # 改为False
  88 +)
  89 +
  90 +# 新品索引 - 本来就不用时间衰减(保持不变)
  91 +list_type_indices['new'] = aggregate_by_dimensions(
  92 + df_new, behavior_weights, time_decay=False
  93 +)
  94 +```
  95 +
  96 +---
  97 +
  98 +## 🚀 使用方法
  99 +
  100 +### Session W2V(已自动优化)
  101 +```bash
  102 +# 使用新的固定长度分块策略
  103 +python3 scripts/i2i_session_w2v.py \
  104 + --lookback_days 365 \
  105 + --max_session_length 50 \
  106 + --min_session_length 2 \
  107 + --top_n 50 \
  108 + --debug
  109 +```
  110 +
  111 +### Swing算法(默认已关闭时间衰减)
  112 +```bash
  113 +# 默认不使用时间衰减,适合B2B场景
  114 +python3 scripts/i2i_swing.py \
  115 + --lookback_days 365 \
  116 + --top_n 50 \
  117 + --debug
  118 +
  119 +# 如果需要开启时间衰减(不推荐)
  120 +python3 scripts/i2i_swing.py \
  121 + --lookback_days 365 \
  122 + --time_decay \
  123 + --decay_factor 0.99 # 更缓慢的衰减
  124 +```
  125 +
  126 +### 兴趣聚合(已自动优化)
  127 +```bash
  128 +# 已默认关闭时间衰减
  129 +python3 scripts/interest_aggregation.py \
  130 + --lookback_days 365 \
  131 + --top_n 100 \
  132 + --debug
  133 +```
  134 +
  135 +### 运行所有任务
  136 +```bash
  137 +cd /home/tw/recommendation/offline_tasks
  138 +
  139 +# 使用更长的回溯天数,适合低频场景
  140 +python3 run_all.py --lookback_days 365 --top_n 50 --debug
  141 +```
  142 +
  143 +---
  144 +
  145 +## 📊 其他索引(无需修改)
  146 +
  147 +### i2i_deepwalk.py
  148 +- ✅ 不涉及session分割
  149 +- ✅ 不使用时间衰减
  150 +- ✅ 无需修改
  151 +
  152 +### i2i_content_similar.py
  153 +- ✅ 基于商品属性的相似度
  154 +- ✅ 不涉及时间因素
  155 +- ✅ 无需修改
  156 +
  157 +---
  158 +
  159 +## 💡 建议的配置调整
  160 +
  161 +### 1. 增加回溯天数
  162 +```python
  163 +# offline_tasks/config/offline_config.py
  164 +DEFAULT_LOOKBACK_DAYS = 365 # 从默认值改为365天
  165 +```
  166 +
  167 +### 2. 如果需要重新启用时间衰减(特殊场景)
  168 +```bash
  169 +# Swing算法
  170 +python3 scripts/i2i_swing.py --time_decay --decay_factor 0.99
  171 +
  172 +# 修改interest_aggregation.py中的hard-coded值
  173 +# 将 time_decay=False 改回 time_decay=True
  174 +```
  175 +
  176 +---
  177 +
  178 +## ✨ 优化效果
  179 +
  180 +### 预期改进
  181 +1. **Session W2V**:
  182 + - 不再因为时间间隔过长而丢失用户行为序列
  183 + - 能够更好地捕捉低频用户的行为模式
  184 +
  185 +2. **Swing算法**:
  186 + - 历史行为不会因时间衰减而被低估
  187 + - 在数据稀疏的情况下充分利用所有历史数据
  188 +
  189 +3. **兴趣聚合**:
  190 + - 长期累积的用户偏好得到保留
  191 + - 推荐结果更稳定
  192 +
  193 +### 需要监控的指标
  194 +- [ ] Session数量变化
  195 +- [ ] 相似物品覆盖率
  196 +- [ ] 推荐效果指标(CTR、转化率等)
  197 +
  198 +---
  199 +
  200 +## 📝 回滚方法
  201 +
  202 +如果需要恢复之前的逻辑:
  203 +
  204 +```bash
  205 +# 1. 恢复i2i_swing.py的时间衰减
  206 +git diff offline_tasks/scripts/i2i_swing.py
  207 +# 将 default=False 改回 default=True
  208 +
  209 +# 2. 恢复interest_aggregation.py的时间衰减
  210 +git diff offline_tasks/scripts/interest_aggregation.py
  211 +# 将所有 time_decay=False 改回 time_decay=True
  212 +
  213 +# 3. 恢复i2i_session_w2v.py的时间分割
  214 +git diff offline_tasks/scripts/i2i_session_w2v.py
  215 +# 恢复基于时间间隔的session分割逻辑
  216 +```
  217 +
  218 +---
  219 +
  220 +**修改时间**: 2025-10-16
  221 +**适用场景**: B2B低频交易场景
  222 +**核心原则**: 在低频场景下,所有历史数据都很宝贵,不要过度衰减
  223 +
offline_tasks/scripts/i2i_session_w2v.py
@@ -25,9 +25,9 @@ from offline_tasks.scripts.debug_utils import ( @@ -25,9 +25,9 @@ from offline_tasks.scripts.debug_utils import (
25 ) 25 )
26 26
27 27
28 -def prepare_session_data(df, session_gap_minutes=30, logger=None): 28 +def prepare_session_data(df, max_session_length=50, min_session_length=2, logger=None):
29 """ 29 """
30 - 准备会话数据 30 + 准备会话数据 - 基于固定长度分块,适合B2B低频场景
31 31
32 Args: 32 Args:
33 df: DataFrame with columns: user_id, item_id, create_time 33 df: DataFrame with columns: user_id, item_id, create_time
@@ -40,45 +40,39 @@ def prepare_session_data(df, session_gap_minutes=30, logger=None): @@ -40,45 +40,39 @@ def prepare_session_data(df, session_gap_minutes=30, logger=None):
40 sessions = [] 40 sessions = []
41 41
42 if logger: 42 if logger:
43 - logger.debug(f"开始准备会话数据,会话间隔:{session_gap_minutes}分钟") 43 + logger.debug(f"开始准备会话数据(固定长度分块):max_length={max_session_length}, min_length={min_session_length}")
44 44
45 # 按用户和时间排序 45 # 按用户和时间排序
46 df = df.sort_values(['user_id', 'create_time']) 46 df = df.sort_values(['user_id', 'create_time'])
47 47
48 - # 按用户分组 48 + # 按用户分组,获取每个用户的行为序列
49 for user_id, user_df in df.groupby('user_id'): 49 for user_id, user_df in df.groupby('user_id'):
50 - user_sessions = []  
51 - current_session = []  
52 - last_time = None 50 + # 获取用户的item序列
  51 + item_sequence = user_df['item_id'].astype(str).tolist()
53 52
54 - for _, row in user_df.iterrows():  
55 - item_id = str(row['item_id'])  
56 - current_time = row['create_time']  
57 -  
58 - # 判断是否需要开始新会话  
59 - if last_time is None or (current_time - last_time).total_seconds() / 60 > session_gap_minutes:  
60 - if current_session:  
61 - user_sessions.append(current_session)  
62 - current_session = [item_id]  
63 - else:  
64 - current_session.append(item_id)  
65 -  
66 - last_time = current_time 53 + # 如果序列太短,跳过
  54 + if len(item_sequence) < min_session_length:
  55 + continue
67 56
68 - # 添加最后一个会话  
69 - if current_session:  
70 - user_sessions.append(current_session) 57 + # 按最大长度分块(不重叠)
  58 + user_sessions = [
  59 + item_sequence[i:i + max_session_length]
  60 + for i in range(0, len(item_sequence), max_session_length)
  61 + ]
  62 +
  63 + # 过滤掉长度不足的最后一块
  64 + user_sessions = [s for s in user_sessions if len(s) >= min_session_length]
71 65
72 sessions.extend(user_sessions) 66 sessions.extend(user_sessions)
73 67
74 - # 过滤掉长度小于2的会话  
75 - sessions = [s for s in sessions if len(s) >= 2]  
76 -  
77 if logger: 68 if logger:
78 - session_lengths = [len(s) for s in sessions]  
79 - logger.debug(f"生成 {len(sessions)} 个会话")  
80 - logger.debug(f"会话长度统计:最小={min(session_lengths)}, 最大={max(session_lengths)}, "  
81 - f"平均={sum(session_lengths)/len(session_lengths):.2f}") 69 + if sessions:
  70 + session_lengths = [len(s) for s in sessions]
  71 + logger.debug(f"生成 {len(sessions)} 个会话")
  72 + logger.debug(f"会话长度统计:最小={min(session_lengths)}, 最大={max(session_lengths)}, "
  73 + f"平均={sum(session_lengths)/len(session_lengths):.2f}")
  74 + else:
  75 + logger.warning("未生成任何会话!")
82 76
83 return sessions 77 return sessions
84 78
@@ -166,8 +160,10 @@ def main(): @@ -166,8 +160,10 @@ def main():
166 help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})') 160 help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})')
167 parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS, 161 parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
168 help=f'Number of days to look back (default: {DEFAULT_LOOKBACK_DAYS})') 162 help=f'Number of days to look back (default: {DEFAULT_LOOKBACK_DAYS})')
169 - parser.add_argument('--session_gap', type=int, default=30,  
170 - help='Session gap in minutes') 163 + parser.add_argument('--max_session_length', type=int, default=50,
  164 + help='Maximum session length for chunking (default: 50)')
  165 + parser.add_argument('--min_session_length', type=int, default=2,
  166 + help='Minimum session length to keep (default: 2)')
171 parser.add_argument('--output', type=str, default=None, 167 parser.add_argument('--output', type=str, default=None,
172 help='Output file path') 168 help='Output file path')
173 parser.add_argument('--save_model', action='store_true', 169 parser.add_argument('--save_model', action='store_true',
@@ -241,7 +237,12 @@ def main(): @@ -241,7 +237,12 @@ def main():
241 237
242 # 准备会话数据 238 # 准备会话数据
243 log_processing_step(logger, "准备会话数据") 239 log_processing_step(logger, "准备会话数据")
244 - sessions = prepare_session_data(df, session_gap_minutes=args.session_gap, logger=logger) 240 + sessions = prepare_session_data(
  241 + df,
  242 + max_session_length=args.max_session_length,
  243 + min_session_length=args.min_session_length,
  244 + logger=logger
  245 + )
245 logger.info(f"生成 {len(sessions)} 个会话") 246 logger.info(f"生成 {len(sessions)} 个会话")
246 247
247 # 训练Word2Vec模型 248 # 训练Word2Vec模型
offline_tasks/scripts/i2i_swing.py
@@ -197,8 +197,8 @@ def main(): @@ -197,8 +197,8 @@ def main():
197 help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})') 197 help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})')
198 parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS, 198 parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
199 help=f'Number of days to look back for user behavior (default: {DEFAULT_LOOKBACK_DAYS})') 199 help=f'Number of days to look back for user behavior (default: {DEFAULT_LOOKBACK_DAYS})')
200 - parser.add_argument('--time_decay', action='store_true', default=True,  
201 - help='Use time decay for behavior weights') 200 + parser.add_argument('--time_decay', action='store_true', default=False,
  201 + help='Use time decay for behavior weights (default: False for B2B low-frequency scenarios)')
202 parser.add_argument('--decay_factor', type=float, default=0.95, 202 parser.add_argument('--decay_factor', type=float, default=0.95,
203 help='Time decay factor') 203 help='Time decay factor')
204 parser.add_argument('--output', type=str, default=None, 204 parser.add_argument('--output', type=str, default=None,
offline_tasks/scripts/interest_aggregation.py
@@ -163,15 +163,17 @@ def generate_list_type_indices(df_hot, df_cart, df_new, behavior_weights): @@ -163,15 +163,17 @@ def generate_list_type_indices(df_hot, df_cart, df_new, behavior_weights):
163 # 热门商品索引 163 # 热门商品索引
164 if not df_hot.empty: 164 if not df_hot.empty:
165 print("Generating hot item indices...") 165 print("Generating hot item indices...")
  166 + # B2B低频场景,不使用时间衰减
166 list_type_indices['hot'] = aggregate_by_dimensions( 167 list_type_indices['hot'] = aggregate_by_dimensions(
167 - df_hot, behavior_weights, time_decay=True 168 + df_hot, behavior_weights, time_decay=False
168 ) 169 )
169 170
170 # 加购商品索引 171 # 加购商品索引
171 if not df_cart.empty: 172 if not df_cart.empty:
172 print("Generating cart item indices...") 173 print("Generating cart item indices...")
  174 + # B2B低频场景,不使用时间衰减
173 list_type_indices['cart'] = aggregate_by_dimensions( 175 list_type_indices['cart'] = aggregate_by_dimensions(
174 - df_cart, behavior_weights, time_decay=True 176 + df_cart, behavior_weights, time_decay=False
175 ) 177 )
176 178
177 # 新品索引 179 # 新品索引
@@ -368,8 +370,9 @@ def main(): @@ -368,8 +370,9 @@ def main():
368 370
369 # 生成全局索引(所有数据) 371 # 生成全局索引(所有数据)
370 log_processing_step(logger, "生成全局索引") 372 log_processing_step(logger, "生成全局索引")
  373 + # B2B低频场景,不使用时间衰减
371 global_aggregations = aggregate_by_dimensions( 374 global_aggregations = aggregate_by_dimensions(
372 - df, behavior_weights, time_decay=True, decay_factor=args.decay_factor 375 + df, behavior_weights, time_decay=False, decay_factor=args.decay_factor
373 ) 376 )
374 logger.info("保存全局索引...") 377 logger.info("保存全局索引...")
375 output_indices(global_aggregations, f'{args.output_prefix}_global', top_n=args.top_n) 378 output_indices(global_aggregations, f'{args.output_prefix}_global', top_n=args.top_n)