Blame view

offline_tasks/scripts/i2i_swing.py 14.2 KB
5ab1c29c   tangwang   first commit
1
2
3
4
5
  """
  i2i - Swing算法实现
  基于用户行为的物品相似度计算
  参考item_sim.py的数据格式,适配真实数据
  """
5ab1c29c   tangwang   first commit
6
7
8
9
10
11
12
  import pandas as pd
  import math
  from collections import defaultdict
  import argparse
  import json
  from datetime import datetime, timedelta
  from db_service import create_db_connection
06cb25fa   tangwang   deepwalk refactor...
13
  from config import (
5ab1c29c   tangwang   first commit
14
15
16
      DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range,
      DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N
  )
06cb25fa   tangwang   deepwalk refactor...
17
  from debug_utils import (
1721766b   tangwang   offline tasks
18
      setup_debug_logger, log_dataframe_info, log_dict_stats,
12118125   tangwang   offline tasks: me...
19
      save_readable_index, load_name_mappings_from_file, log_algorithm_params,
1721766b   tangwang   offline tasks
20
21
      log_processing_step
  )
5ab1c29c   tangwang   first commit
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
  
  
  def calculate_time_weight(event_time, reference_time, decay_factor=0.95, days_unit=30):
      """Return an exponential time-decay weight for a single event.

      The weight is ``decay_factor ** (elapsed_days / days_unit)``, where
      ``elapsed_days`` is the whole-day part of ``reference_time - event_time``.

      Args:
          event_time: When the event happened; a null value (per ``pd.isna``)
              is allowed.
          reference_time: The time the decay is measured against (callers in
              this file pass ``datetime.now()``).
          decay_factor: Multiplier applied once per ``days_unit`` days elapsed.
          days_unit: Length, in days, of one decay period.

      Returns:
          float: 1.0 for missing timestamps or events dated after
          ``reference_time``; otherwise the decayed weight.
      """
      # A missing timestamp gets no penalty.
      if pd.isna(event_time):
          return 1.0

      elapsed_days = (reference_time - event_time).days
      # Events "from the future" relative to the reference are not decayed.
      return 1.0 if elapsed_days < 0 else math.pow(decay_factor, elapsed_days / days_unit)
  
  
12118125   tangwang   offline tasks: me...
50
  def _apply_time_decay(df, decay_factor, logger=None, debug=False):
      """Return a copy of ``df`` whose ``weight`` is scaled by a time-decay factor.

      Adds a ``time_weight`` column computed with ``calculate_time_weight``
      against the current time. The caller's DataFrame is left unmodified.
      """
      reference_time = datetime.now()
      if logger:
          logger.debug("应用时间衰减...")
      time_weight = df['create_time'].apply(
          lambda x: calculate_time_weight(x, reference_time, decay_factor)
      )
      # assign() returns a new frame, so the caller's df is not mutated.
      df = df.assign(time_weight=time_weight, weight=df['weight'] * time_weight)
      if logger and debug:
          logger.debug(f"时间权重统计: min={df['time_weight'].min():.4f}, max={df['time_weight'].max():.4f}, avg={df['time_weight'].mean():.4f}")
      return df


  def _add_daily_sessions(df, logger=None):
      """Return ``df`` plus a duplicate whose user_id is ``<user_id>_<YYYYMMDD>``.

      The duplicated rows act as per-day pseudo-users. Items a user touched on
      the same day therefore also co-occur inside an extra daily session.
      """
      if logger:
          logger.info("启用日期维度:duplicate数据,添加uid+日期作为新的session")

      df_daily = df.copy()
      df_daily['date'] = pd.to_datetime(df_daily['create_time']).dt.strftime('%Y%m%d')
      df_daily['user_id'] = df_daily['user_id'].astype(str) + '_' + df_daily['date']

      # concat builds a new frame; no separate copy of the original rows is needed.
      combined = pd.concat([df, df_daily], ignore_index=True)

      if logger:
          logger.info(f"原始数据: {len(df)} 条")
          logger.info(f"日期维度数据: {len(df_daily)} 条")
          logger.info(f"合并后总数据: {len(combined)} 条")
      return combined


  def _build_inverted_index(df):
      """Return ``(user_items, item_users)``: user -> set of items and item -> set of users."""
      user_items = defaultdict(set)
      item_users = defaultdict(set)
      # Iterate the columns directly. iterrows() would box each row into a Series
      # and can upcast integer ids to float when every column is numeric.
      for user_id, item_id in zip(df['user_id'], df['item_id']):
          user_items[user_id].add(item_id)
          item_users[item_id].add(user_id)
      return user_items, item_users


  def _swing_scores(user_items, item_users, alpha, logger=None, debug=False):
      """Compute symmetric Swing scores for every item pair with >= 2 common users.

      score(i, j) = sum over unordered pairs (u, v) of users who interacted with
      both i and j of 1 / (alpha + |items(u) & items(v)|).

      Returns:
          (item_sim_dict, processed_pairs): a nested dict
          ``item_sim_dict[i][j] == item_sim_dict[j][i] == score``, and the
          number of pairs scored.
      """
      item_sim_dict = defaultdict(lambda: defaultdict(float))
      # Candidates are visited in the same order as a full scan over item_users.
      # This keeps the dict insertion order (and hence tie order after the
      # stable sort) identical to an all-pairs implementation.
      position = {item: idx for idx, item in enumerate(item_users)}
      processed_pairs = 0
      total_items = len(item_users)

      for idx_i, (item_i, users_i) in enumerate(item_users.items()):
          # Only items sharing at least one user with item_i can reach two
          # common users, so scan those instead of every item in the catalogue.
          candidates = {
              item_j
              for user in users_i
              for item_j in user_items[user]
              if item_i < item_j  # score each unordered pair once
          }

          for item_j in sorted(candidates, key=position.__getitem__):
              common_users = list(users_i & item_users[item_j])
              if len(common_users) < 2:
                  continue

              sim_score = 0.0
              for idx_u, user_u in enumerate(common_users):
                  items_u = user_items[user_u]
                  for user_v in common_users[idx_u + 1:]:
                      # |items_u & items_v| >= 2 here (both i and j), so no division by zero.
                      sim_score += 1.0 / (alpha + len(items_u & user_items[user_v]))

              item_sim_dict[item_i][item_j] = sim_score
              item_sim_dict[item_j][item_i] = sim_score
              processed_pairs += 1

          if logger and debug and (idx_i + 1) % 50 == 0:
              logger.debug(f"已处理 {idx_i + 1}/{total_items} 个商品 ({(idx_i+1)/total_items*100:.1f}%)")

      return item_sim_dict, processed_pairs


  def swing_algorithm(df, alpha=0.5, time_decay=True, decay_factor=0.95, use_daily_session=True, logger=None, debug=False):
      """Compute item-to-item similarities with a (simplified) Swing algorithm.

      Args:
          df: DataFrame with columns ``user_id`` and ``item_id``. ``create_time``
              is needed for time decay and daily sessions, and ``weight`` is
              needed when time decay is applied. The caller's DataFrame is not
              modified.
          alpha: Smoothing term in ``1 / (alpha + |I_u & I_v|)``.
          time_decay: Whether to scale ``weight`` by ``calculate_time_weight``.
          decay_factor: Decay factor forwarded to ``calculate_time_weight``.
          use_daily_session: If True (and ``create_time`` exists), also treat
              each ``user_id + date`` as an additional session/user.
          logger: Optional logger for progress and statistics.
          debug: Emit extra debug statistics and samples.

      Returns:
          Dict[item_id, List[Tuple[similar_item_id, score]]], with each list
          sorted by descending score. Only items that have at least one
          neighbour sharing >= 2 users appear.

      NOTE(review): the similarity score uses only co-occurrence sets. The
      ``weight`` column (behaviour weights and time decay) does not influence
      the score; it is only reported in debug statistics.
      """
      start_time = datetime.now()
      if logger:
          logger.debug(f"开始Swing算法计算,参数: alpha={alpha}, time_decay={time_decay}, use_daily_session={use_daily_session}")

      if time_decay and 'create_time' in df.columns:
          df = _apply_time_decay(df, decay_factor, logger=logger, debug=debug)

      if use_daily_session and 'create_time' in df.columns:
          df = _add_daily_sessions(df, logger=logger)

      # Step 1: user <-> item inverted indexes
      if logger:
          log_processing_step(logger, "步骤1: 构建用户-物品倒排索引")

      user_items, item_users = _build_inverted_index(df)

      if logger:
          logger.info(f"总用户数: {len(user_items)}, 总商品数: {len(item_users)}")
          if debug:
              log_dict_stats(logger, dict(list(user_items.items())[:1000]), "用户-商品倒排索引(采样)", top_n=5)
              log_dict_stats(logger, dict(list(item_users.items())[:1000]), "商品-用户倒排索引(采样)", top_n=5)

      # Step 2: pairwise Swing scores
      if logger:
          log_processing_step(logger, "步骤2: 计算Swing物品相似度")

      item_sim_dict, processed_pairs = _swing_scores(
          user_items, item_users, alpha, logger=logger, debug=debug
      )

      if logger:
          logger.info(f"计算了 {processed_pairs} 对商品相似度")

      # Step 3: sort each item's neighbours by descending score (stable for ties)
      if logger:
          log_processing_step(logger, "步骤3: 整理和排序结果")

      result = {
          item_i: sorted(sims.items(), key=lambda x: -x[1])
          for item_i, sims in item_sim_dict.items()
      }

      if logger:
          total_time = (datetime.now() - start_time).total_seconds()
          logger.info(f"Swing算法完成: {len(result)} 个商品有相似推荐")
          logger.info(f"总耗时: {total_time:.2f}秒")

          sim_counts = [len(sims) for sims in result.values()]
          if sim_counts:
              logger.info(f"相似商品数统计: min={min(sim_counts)}, max={max(sim_counts)}, avg={sum(sim_counts)/len(sim_counts):.2f}")

          if debug:
              for item_i, sims in list(result.items())[:3]:
                  logger.debug(f"  商品 {item_i} 的Top5相似商品: {sims[:5]}")

      return result
  
  
  def main():
      """Command-line entry point for the Swing i2i job.

      Steps:
        1. Parse arguments.
        2. Load behaviour events from the database for the look-back window.
        3. Weight the events by type.
        4. Run ``swing_algorithm``.
        5. Write one line per item:
           ``item_id<TAB>item_name<TAB>sim_id1:score1,sim_id2:score2,...``.
      With ``--debug`` a human-readable index file is also produced.

      Returns early (without raising) if the query fails or yields no rows.
      """
      # os is needed for the default output path and for creating its directory.
      # It is not among this module's top-level imports.
      import os

      parser = argparse.ArgumentParser(description='Run Swing algorithm for i2i similarity')
      parser.add_argument('--alpha', type=float, default=I2I_CONFIG['swing']['alpha'],
                         help='Alpha parameter for Swing algorithm')
      parser.add_argument('--top_n', type=int, default=DEFAULT_I2I_TOP_N,
                         help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})')
      parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
                         help=f'Number of days to look back for user behavior (default: {DEFAULT_LOOKBACK_DAYS})')
      parser.add_argument('--time_decay', action='store_true', default=False,
                         help='Use time decay for behavior weights (default: False for B2B low-frequency scenarios)')
      parser.add_argument('--decay_factor', type=float, default=0.95,
                         help='Time decay factor')
      # --use_daily_session is effectively a no-op (default is already True);
      # --no_daily_session is the switch that turns the feature off.
      parser.add_argument('--use_daily_session', action='store_true', default=True,
                         help='Use uid+date as additional session dimension (default: True)')
      parser.add_argument('--no_daily_session', action='store_false', dest='use_daily_session',
                         help='Disable uid+date session dimension')
      parser.add_argument('--output', type=str, default=None,
                         help='Output file path')
      parser.add_argument('--debug', action='store_true',
                         help='Enable debug mode with detailed logging and readable output')

      args = parser.parse_args()

      # Logging setup and parameter record
      logger = setup_debug_logger('i2i_swing', debug=args.debug)
      log_algorithm_params(logger, {
          'alpha': args.alpha,
          'top_n': args.top_n,
          'lookback_days': args.lookback_days,
          'time_decay': args.time_decay,
          'decay_factor': args.decay_factor,
          'use_daily_session': args.use_daily_session,
          'debug': args.debug
      })

      # Database connection
      logger.info("连接数据库...")
      engine = create_db_connection(
          DB_CONFIG['host'],
          DB_CONFIG['port'],
          DB_CONFIG['database'],
          DB_CONFIG['username'],
          DB_CONFIG['password']
      )

      # Time window
      start_date, end_date = get_time_range(args.lookback_days)
      logger.info(f"获取数据: {start_date} 到 {end_date}")

      # User-behaviour query. start_date / end_date come from get_time_range
      # (driven by the integer --lookback_days), not free-form user text, and are
      # interpolated directly into the SQL.
      sql_query = f"""
      SELECT 
          se.anonymous_id AS user_id,
          se.item_id,
          se.event AS event_type,
          se.create_time,
          pgs.name AS item_name
      FROM 
          sensors_events se
      LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
      WHERE 
          se.event IN ('contactFactory', 'addToPool', 'addToCart', 'purchase')
          AND se.create_time >= '{start_date}'
          AND se.create_time <= '{end_date}'
          AND se.item_id IS NOT NULL
          AND se.anonymous_id IS NOT NULL
      ORDER BY 
          se.create_time
      """

      try:
          logger.info("执行SQL查询...")
          df = pd.read_sql(sql_query, engine)
          logger.info(f"获取到 {len(df)} 条记录")

          if args.debug:
              log_dataframe_info(logger, df, "用户行为数据", sample_size=10)
      except Exception as e:
          logger.error(f"获取数据失败: {e}")
          return

      if len(df) == 0:
          logger.warning("没有找到数据")
          return

      df['create_time'] = pd.to_datetime(df['create_time'])

      # Per-event-type behaviour weights; unknown types fall back to 1.0.
      behavior_weights = {
          'contactFactory': 5.0,
          'addToPool': 2.0,
          'addToCart': 3.0,
          'purchase': 10.0
      }
      df['weight'] = df['event_type'].map(behavior_weights).fillna(1.0)

      if args.debug:
          logger.debug("行为类型分布:")
          event_counts = df['event_type'].value_counts()
          for event, count in event_counts.items():
              logger.debug(f"  {event}: {count} ({count/len(df)*100:.2f}%)")

      logger.info("运行Swing算法...")
      result = swing_algorithm(
          df,
          alpha=args.alpha,
          time_decay=args.time_decay,
          decay_factor=args.decay_factor,
          use_daily_session=args.use_daily_session,
          logger=logger,
          debug=args.debug
      )

      # item_id (as str, matching name_mappings keys) -> first non-null item name.
      # Keys and names both come from the same groupby result, so they stay aligned.
      # Zipping df['item_id'].unique() (appearance order) with a groupby result
      # (sorted keys) would attach names to the wrong ids.
      item_names = df.dropna(subset=['item_name']).groupby('item_id')['item_name'].first()
      item_name_map = {str(item_id): name for item_id, name in item_names.items()}

      output_file = args.output or os.path.join(OUTPUT_DIR, f'i2i_swing_{datetime.now().strftime("%Y%m%d")}.txt')
      output_dir = os.path.dirname(output_file)
      if output_dir:
          os.makedirs(output_dir, exist_ok=True)

      logger.info(f"保存结果到: {output_file}")
      output_count = 0
      with open(output_file, 'w', encoding='utf-8') as f:
          for item_id, sims in result.items():
              item_name = item_name_map.get(str(item_id), 'Unknown')

              # Keep only the top-N most similar items
              top_sims = sims[:args.top_n]
              if not top_sims:
                  continue

              # Format: item_id \t item_name \t similar_item_id1:score1,similar_item_id2:score2,...
              sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in top_sims])
              f.write(f'{item_id}\t{item_name}\t{sim_str}\n')
              output_count += 1

      logger.info(f"输出了 {output_count} 个商品的推荐")

      # Debug mode: also write a human-readable index file
      if args.debug:
          logger.info("Debug模式:生成明文索引文件...")
          try:
              logger.debug("加载ID到名称的映射...")
              name_mappings = load_name_mappings_from_file(debug=True)

              # Merge names seen in this run (keys are already str)
              name_mappings['item'].update(item_name_map)
              logger.debug(f"name_mappings['item']共有 {len(name_mappings['item'])} 个商品名称")

              index_data = {}
              for item_id, sims in result.items():
                  top_sims = sims[:args.top_n]
                  if top_sims:
                      index_data[f"i2i:swing:{item_id}"] = top_sims

              readable_file = save_readable_index(
                  output_file,
                  index_data,
                  name_mappings,
                  description=f"Swing算法 i2i相似度推荐 (alpha={args.alpha}, lookback_days={args.lookback_days})"
              )
              logger.info(f"明文索引文件: {readable_file}")
          except Exception as e:
              logger.error(f"生成明文文件失败: {e}", exc_info=True)

      logger.info("完成!")


  if __name__ == '__main__':
      main()