Commit 14f3dcbe9706dff7b8ae96eb45daf3c8f0845a21

Authored by tangwang
1 parent 8cc6477b

offline tasks

offline_tasks/run.sh
@@ -4,7 +4,7 @@ cd /home/tw/recommendation/offline_tasks
 # cat UPDATE_CONFIG_GUIDE.md
 
 # 2. 测试连接
-python3 test_connection.py
+# python3 test_connection.py
 
 # 3. 调试模式运行(小数据量)
 python3 run_all.py --lookback_days 7 --top_n 10 --debug
@@ -13,7 +13,9 @@ mv output output_debug
 mkdir output
 
 # # 4. 生产模式运行(大数据量)
-python3 run_all.py --lookback_days 730 --top_n 50
+python3 run_all.py --lookback_days 730 --top_n 50 --debug
 
 # 5. 加载到Redis
 python3 scripts/load_index_to_redis.py --redis-host localhost
+
+
offline_tasks/scripts/i2i_content_similar.py
@@ -17,6 +17,11 @@ from db_service import create_db_connection
 from offline_tasks.config.offline_config import (
     DB_CONFIG, OUTPUT_DIR, DEFAULT_I2I_TOP_N
 )
+from offline_tasks.scripts.debug_utils import (
+    setup_debug_logger, log_dataframe_info, log_dict_stats,
+    save_readable_index, fetch_name_mappings, log_algorithm_params,
+    log_processing_step
+)
 
 
 def fetch_product_features(engine):
@@ -221,8 +226,19 @@ def main():
 
     args = parser.parse_args()
 
+    # 设置logger
+    logger = setup_debug_logger('i2i_content_similar', debug=args.debug)
+
+    # 记录算法参数
+    params = {
+        'top_n': args.top_n,
+        'method': args.method,
+        'debug': args.debug
+    }
+    log_algorithm_params(logger, params)
+
     # 创建数据库连接
-    print("Connecting to database...")
+    logger.info("连接数据库...")
     engine = create_db_connection(
         DB_CONFIG['host'],
         DB_CONFIG['port'],
@@ -232,34 +248,47 @@ def main():
     )
 
     # 获取商品特征
+    log_processing_step(logger, "获取商品特征")
     df = fetch_product_features(engine)
+    logger.info(f"获取到 {len(df)} 个商品的特征数据")
+    log_dataframe_info(logger, df, "商品特征数据")
 
     # 计算相似度
+    log_processing_step(logger, f"计算相似度 (方法: {args.method})")
     if args.method == 'tfidf':
-        print("\nUsing TF-IDF method...")
+        logger.info("使用 TF-IDF 方法...")
         result = calculate_content_similarity(df, args.top_n)
     elif args.method == 'category':
-        print("\nUsing category-based method...")
+        logger.info("使用基于分类的方法...")
         result = calculate_category_based_similarity(df)
     else: # hybrid
-        print("\nUsing hybrid method...")
+        logger.info("使用混合方法 (TF-IDF 70% + 分类 30%)...")
         tfidf_sim = calculate_content_similarity(df, args.top_n)
         category_sim = calculate_category_based_similarity(df)
         result = merge_similarities(tfidf_sim, category_sim, weight1=0.7, weight2=0.3)
 
-    # 创建item_id到name的映射
-    item_name_map = dict(zip(df['item_id'], df['item_name']))
+    logger.info(f"为 {len(result)} 个物品生成了相似度")
 
     # 输出结果
+    log_processing_step(logger, "保存结果")
     output_file = args.output or os.path.join(
         OUTPUT_DIR,
         f'i2i_content_{args.method}_{datetime.now().strftime("%Y%m%d")}.txt'
     )
 
-    print(f"\nWriting results to {output_file}...")
+    # 获取name mappings
+    name_mappings = {}
+    if args.debug:
+        logger.info("获取物品名称映射...")
+        name_mappings = fetch_name_mappings(engine, debug=True)
+
+    logger.info(f"写入结果到 {output_file}...")
     with open(output_file, 'w', encoding='utf-8') as f:
         for item_id, sims in result.items():
-            item_name = item_name_map.get(item_id, 'Unknown')
+            # 使用name_mappings获取名称
+            item_name = name_mappings.get(item_id, 'Unknown')
+            if item_name == 'Unknown' and 'item_name' in df.columns:
+                item_name = df[df['item_id'] == item_id]['item_name'].iloc[0] if len(df[df['item_id'] == item_id]) > 0 else 'Unknown'
 
             if not sims:
                 continue
@@ -268,8 +297,19 @@ def main():
             sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])
             f.write(f'{item_id}\t{item_name}\t{sim_str}\n')
 
-    print(f"Done! Generated content-based similarities for {len(result)} items")
-    print(f"Output saved to: {output_file}")
+    logger.info(f"完成!为 {len(result)} 个物品生成了基于内容的相似度")
+    logger.info(f"输出保存到:{output_file}")
+
+    # 如果启用debug模式,保存可读格式
+    if args.debug:
+        log_processing_step(logger, "保存Debug可读格式")
+        save_readable_index(
+            output_file,
+            result,
+            name_mappings,
+            index_type=f'i2i:content:{args.method}',
+            logger=logger
+        )
 
 
 if __name__ == '__main__':
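
Note: every script in this commit imports the same helpers from offline_tasks/scripts/debug_utils, a module that is not part of this diff. Purely as orientation, the sketch below reconstructs what that interface might look like from the call sites in these hunks; the function names and argument order come from the imports, but the bodies (and the query inside fetch_name_mappings) are assumptions, not the project's actual code.

# Hypothetical sketch of the debug_utils interface assumed by this commit,
# reconstructed from call sites only; the real module may differ.
import logging
import pandas as pd


def setup_debug_logger(name, debug=False):
    """Return a logger; DEBUG level when --debug is passed, INFO otherwise."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
    return logger


def log_algorithm_params(logger, params):
    """Log the parameter dict each script builds in main()."""
    for key, value in params.items():
        logger.info(f"param {key} = {value}")


def log_processing_step(logger, step_name):
    """Mark the start of a pipeline step."""
    logger.info(f"===== {step_name} =====")


def log_dataframe_info(logger, df, label):
    """Log shape/columns of a fetched DataFrame at DEBUG level."""
    logger.debug(f"{label}: shape={df.shape}, columns={list(df.columns)}")


def log_dict_stats(logger, d, label):
    """Log the size of an index dict (imported but not used in the hunks shown)."""
    logger.debug(f"{label}: {len(d)} keys")


def fetch_name_mappings(engine, debug=False):
    """Return {item_id: item_name}; table and column names here are placeholders."""
    df = pd.read_sql("SELECT item_id, item_name FROM items", engine)  # hypothetical query
    return dict(zip(df['item_id'], df['item_name']))


def save_readable_index(output_file, result, name_mappings, index_type='', logger=None):
    """Write a human-readable sidecar file next to the raw index (sketch)."""
    readable_file = output_file.replace('.txt', '_readable.txt')
    with open(readable_file, 'w', encoding='utf-8') as f:
        f.write(f'# index_type: {index_type}\n')
        for key, pairs in result.items():
            # pairs is assumed to be a list of (item_id, score) tuples here
            line = ', '.join(f'{name_mappings.get(i, i)}({s:.4f})' for i, s in pairs)
            f.write(f'{key}\t{line}\n')
    if logger:
        logger.info(f'Readable index written to {readable_file}')
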
offline_tasks/scripts/i2i_deepwalk.py
@@ -17,6 +17,11 @@ from offline_tasks.config.offline_config import (
     DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range,
     DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N
 )
+from offline_tasks.scripts.debug_utils import (
+    setup_debug_logger, log_dataframe_info, log_dict_stats,
+    save_readable_index, fetch_name_mappings, log_algorithm_params,
+    log_processing_step
+)
 
 
 def build_item_graph(df, behavior_weights):
@@ -223,8 +228,26 @@ def main():
 
     args = parser.parse_args()
 
+    # 设置logger
+    logger = setup_debug_logger('i2i_deepwalk', debug=args.debug)
+
+    # 记录算法参数
+    params = {
+        'num_walks': args.num_walks,
+        'walk_length': args.walk_length,
+        'window_size': args.window_size,
+        'vector_size': args.vector_size,
+        'min_count': args.min_count,
+        'workers': args.workers,
+        'epochs': args.epochs,
+        'top_n': args.top_n,
+        'lookback_days': args.lookback_days,
+        'debug': args.debug
+    }
+    log_algorithm_params(logger, params)
+
     # 创建数据库连接
-    print("Connecting to database...")
+    logger.info("连接数据库...")
     engine = create_db_connection(
         DB_CONFIG['host'],
         DB_CONFIG['port'],
@@ -235,7 +258,7 @@ def main():
 
     # 获取时间范围
     start_date, end_date = get_time_range(args.lookback_days)
-    print(f"Fetching data from {start_date} to {end_date}...")
+    logger.info(f"获取数据范围:{start_date} 到 {end_date}")
 
     # SQL查询 - 获取用户行为数据
     sql_query = f"""
@@ -255,9 +278,12 @@ def main():
         AND se.anonymous_id IS NOT NULL
     """
 
-    print("Executing SQL query...")
+    logger.info("执行SQL查询...")
     df = pd.read_sql(sql_query, engine)
-    print(f"Fetched {len(df)} records")
+    logger.info(f"获取到 {len(df)} 条记录")
+
+    # 记录数据信息
+    log_dataframe_info(logger, df, "用户行为数据")
 
     # 定义行为权重
     behavior_weights = {
@@ -267,23 +293,26 @@ def main():
         'addToCart': 3.0,
         'purchase': 10.0
     }
+    logger.debug(f"行为权重: {behavior_weights}")
 
     # 构建物品图
-    print("Building item graph...")
+    log_processing_step(logger, "构建物品图")
     graph = build_item_graph(df, behavior_weights)
-    print(f"Graph built with {len(graph)} nodes")
+    logger.info(f"构建物品图完成,共 {len(graph)} 个节点")
 
     # 保存边文件(可选)
     if args.save_graph:
         edge_file = os.path.join(OUTPUT_DIR, f'item_graph_{datetime.now().strftime("%Y%m%d")}.txt')
         save_edge_file(graph, edge_file)
+        logger.info(f"图边文件已保存到 {edge_file}")
 
     # 生成随机游走
-    print("Generating random walks...")
+    log_processing_step(logger, "生成随机游走")
     walks = generate_walks(graph, args.num_walks, args.walk_length)
-    print(f"Generated {len(walks)} walks")
+    logger.info(f"生成 {len(walks)} 条游走路径")
 
     # 训练Word2Vec模型
+    log_processing_step(logger, "训练Word2Vec模型")
     w2v_config = {
         'vector_size': args.vector_size,
         'window_size': args.window_size,
@@ -292,29 +321,39 @@ def main():
         'epochs': args.epochs,
         'sg': 1
     }
+    logger.debug(f"Word2Vec配置: {w2v_config}")
 
     model = train_word2vec(walks, w2v_config)
+    logger.info(f"训练完成。词汇表大小:{len(model.wv)}")
 
     # 保存模型(可选)
     if args.save_model:
         model_path = os.path.join(OUTPUT_DIR, f'deepwalk_model_{datetime.now().strftime("%Y%m%d")}.model')
         model.save(model_path)
-        print(f"Model saved to {model_path}")
+        logger.info(f"模型已保存到 {model_path}")
 
     # 生成相似度
-    print("Generating similarities...")
+    log_processing_step(logger, "生成相似度")
     result = generate_similarities(model, top_n=args.top_n)
-
-    # 创建item_id到name的映射
-    item_name_map = dict(zip(df['item_id'].astype(str), df.groupby('item_id')['item_name'].first()))
+    logger.info(f"生成了 {len(result)} 个物品的相似度")
 
     # 输出结果
+    log_processing_step(logger, "保存结果")
     output_file = args.output or os.path.join(OUTPUT_DIR, f'i2i_deepwalk_{datetime.now().strftime("%Y%m%d")}.txt')
 
-    print(f"Writing results to {output_file}...")
+    # 获取name mappings
+    name_mappings = {}
+    if args.debug:
+        logger.info("获取物品名称映射...")
+        name_mappings = fetch_name_mappings(engine, debug=True)
+
+    logger.info(f"写入结果到 {output_file}...")
     with open(output_file, 'w', encoding='utf-8') as f:
         for item_id, sims in result.items():
-            item_name = item_name_map.get(item_id, 'Unknown')
+            # 使用name_mappings获取名称
+            item_name = name_mappings.get(int(item_id), 'Unknown') if item_id.isdigit() else 'Unknown'
+            if item_name == 'Unknown' and 'item_name' in df.columns:
+                item_name = df[df['item_id'].astype(str) == item_id]['item_name'].iloc[0] if len(df[df['item_id'].astype(str) == item_id]) > 0 else 'Unknown'
 
             if not sims:
                 continue
@@ -323,8 +362,19 @@ def main():
             sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])
             f.write(f'{item_id}\t{item_name}\t{sim_str}\n')
 
-    print(f"Done! Generated i2i similarities for {len(result)} items")
-    print(f"Output saved to: {output_file}")
+    logger.info(f"完成!为 {len(result)} 个物品生成了相似度")
+    logger.info(f"输出保存到:{output_file}")
+
+    # 如果启用debug模式,保存可读格式
+    if args.debug:
+        log_processing_step(logger, "保存Debug可读格式")
+        save_readable_index(
+            output_file,
+            result,
+            name_mappings,
+            index_type='i2i:deepwalk',
+            logger=logger
+        )
 
 
 if __name__ == '__main__':
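
Note: all three i2i scripts write the same tab-separated index format: item_id, item_name, then the similar items encoded as comma-separated id:score pairs with four-decimal scores. A minimal parser for one such line is sketched below; the ids and names in the example are made up for illustration.

# Parse one line of the i2i index format written by these scripts:
# item_id \t item_name \t sim_id:score,sim_id:score,...
def parse_index_line(line):
    item_id, item_name, sim_str = line.rstrip('\n').split('\t')
    sims = []
    for pair in sim_str.split(','):
        sim_id, score = pair.rsplit(':', 1)
        sims.append((sim_id, float(score)))
    return item_id, item_name, sims

# Illustrative example (fabricated ids/names):
example = "1001\t样例商品A\t1002:0.9321,1005:0.8710"
print(parse_index_line(example))
# ('1001', '样例商品A', [('1002', 0.9321), ('1005', 0.871)])
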
offline_tasks/scripts/i2i_session_w2v.py
@@ -18,21 +18,30 @@ from offline_tasks.config.offline_config import (
     DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range,
     DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N
 )
+from offline_tasks.scripts.debug_utils import (
+    setup_debug_logger, log_dataframe_info, log_dict_stats,
+    save_readable_index, fetch_name_mappings, log_algorithm_params,
+    log_processing_step
+)
 
 
-def prepare_session_data(df, session_gap_minutes=30):
+def prepare_session_data(df, session_gap_minutes=30, logger=None):
     """
     准备会话数据
 
     Args:
         df: DataFrame with columns: user_id, item_id, create_time
         session_gap_minutes: 会话间隔时间(分钟)
+        logger: Logger instance for debugging
 
     Returns:
         List of sessions, each session is a list of item_ids
     """
     sessions = []
 
+    if logger:
+        logger.debug(f"开始准备会话数据,会话间隔:{session_gap_minutes}分钟")
+
     # 按用户和时间排序
     df = df.sort_values(['user_id', 'create_time'])
 
@@ -65,21 +74,33 @@ def prepare_session_data(df, session_gap_minutes=30):
     # 过滤掉长度小于2的会话
     sessions = [s for s in sessions if len(s) >= 2]
 
+    if logger:
+        session_lengths = [len(s) for s in sessions]
+        logger.debug(f"生成 {len(sessions)} 个会话")
+        logger.debug(f"会话长度统计:最小={min(session_lengths)}, 最大={max(session_lengths)}, "
+                     f"平均={sum(session_lengths)/len(session_lengths):.2f}")
+
     return sessions
 
 
-def train_word2vec(sessions, config):
+def train_word2vec(sessions, config, logger=None):
     """
     训练Word2Vec模型
 
     Args:
         sessions: List of sessions
         config: Word2Vec配置
+        logger: Logger instance for debugging
 
     Returns:
         Word2Vec模型
     """
-    print(f"Training Word2Vec with {len(sessions)} sessions...")
+    if logger:
+        logger.info(f"训练Word2Vec模型,共 {len(sessions)} 个会话")
+        logger.debug(f"模型参数:vector_size={config['vector_size']}, window={config['window_size']}, "
+                     f"min_count={config['min_count']}, epochs={config['epochs']}")
+    else:
+        print(f"Training Word2Vec with {len(sessions)} sessions...")
 
     model = Word2Vec(
         sentences=sessions,
@@ -92,23 +113,30 @@ def train_word2vec(sessions, config):
         seed=42
     )
 
-    print(f"Training completed. Vocabulary size: {len(model.wv)}")
+    if logger:
+        logger.info(f"训练完成。词汇表大小:{len(model.wv)}")
+    else:
+        print(f"Training completed. Vocabulary size: {len(model.wv)}")
     return model
 
 
-def generate_similarities(model, top_n=50):
+def generate_similarities(model, top_n=50, logger=None):
     """
     生成物品相似度
 
     Args:
         model: Word2Vec模型
        top_n: Top N similar items
+        logger: Logger instance for debugging
 
     Returns:
         Dict[item_id, List[Tuple(similar_item_id, score)]]
     """
     result = {}
 
+    if logger:
+        logger.info(f"生成Top {top_n} 相似物品")
+
     for item_id in model.wv.index_to_key:
         try:
             similar_items = model.wv.most_similar(item_id, topn=top_n)
@@ -116,6 +144,9 @@ def generate_similarities(model, top_n=50):
         except KeyError:
             continue
 
+    if logger:
+        logger.info(f"生成了 {len(result)} 个物品的相似度")
+
     return result
 
 
@@ -146,8 +177,25 @@ def main():
 
     args = parser.parse_args()
 
+    # 设置logger
+    logger = setup_debug_logger('i2i_session_w2v', debug=args.debug)
+
+    # 记录算法参数
+    params = {
+        'window_size': args.window_size,
+        'vector_size': args.vector_size,
+        'min_count': args.min_count,
+        'workers': args.workers,
+        'epochs': args.epochs,
+        'top_n': args.top_n,
+        'lookback_days': args.lookback_days,
+        'session_gap_minutes': args.session_gap,
+        'debug': args.debug
+    }
+    log_algorithm_params(logger, params)
+
     # 创建数据库连接
-    print("Connecting to database...")
+    logger.info("连接数据库...")
     engine = create_db_connection(
         DB_CONFIG['host'],
         DB_CONFIG['port'],
@@ -158,7 +206,7 @@ def main():
 
     # 获取时间范围
     start_date, end_date = get_time_range(args.lookback_days)
-    print(f"Fetching data from {start_date} to {end_date}...")
+    logger.info(f"获取数据范围:{start_date} 到 {end_date}")
 
     # SQL查询 - 获取用户行为序列
     sql_query = f"""
@@ -181,19 +229,23 @@ def main():
         se.create_time
     """
 
-    print("Executing SQL query...")
+    logger.info("执行SQL查询...")
     df = pd.read_sql(sql_query, engine)
-    print(f"Fetched {len(df)} records")
+    logger.info(f"获取到 {len(df)} 条记录")
+
+    # 记录数据信息
+    log_dataframe_info(logger, df, "用户行为数据")
 
     # 转换create_time为datetime
     df['create_time'] = pd.to_datetime(df['create_time'])
 
     # 准备会话数据
-    print("Preparing session data...")
-    sessions = prepare_session_data(df, session_gap_minutes=args.session_gap)
-    print(f"Generated {len(sessions)} sessions")
+    log_processing_step(logger, "准备会话数据")
+    sessions = prepare_session_data(df, session_gap_minutes=args.session_gap, logger=logger)
+    logger.info(f"生成 {len(sessions)} 个会话")
 
     # 训练Word2Vec模型
+    log_processing_step(logger, "训练Word2Vec模型")
     w2v_config = {
         'vector_size': args.vector_size,
         'window_size': args.window_size,
@@ -203,28 +255,35 @@ def main():
         'sg': 1
     }
 
-    model = train_word2vec(sessions, w2v_config)
+    model = train_word2vec(sessions, w2v_config, logger=logger)
 
     # 保存模型(可选)
     if args.save_model:
         model_path = os.path.join(OUTPUT_DIR, f'session_w2v_model_{datetime.now().strftime("%Y%m%d")}.model')
         model.save(model_path)
-        print(f"Model saved to {model_path}")
+        logger.info(f"模型已保存到 {model_path}")
 
     # 生成相似度
-    print("Generating similarities...")
-    result = generate_similarities(model, top_n=args.top_n)
-
-    # 创建item_id到name的映射
-    item_name_map = dict(zip(df['item_id'].astype(str), df.groupby('item_id')['item_name'].first()))
+    log_processing_step(logger, "生成相似度")
+    result = generate_similarities(model, top_n=args.top_n, logger=logger)
 
     # 输出结果
+    log_processing_step(logger, "保存结果")
     output_file = args.output or os.path.join(OUTPUT_DIR, f'i2i_session_w2v_{datetime.now().strftime("%Y%m%d")}.txt')
 
-    print(f"Writing results to {output_file}...")
+    # 获取name mappings用于标准输出格式
+    name_mappings = {}
+    if args.debug:
+        logger.info("获取物品名称映射...")
+        name_mappings = fetch_name_mappings(engine, debug=True)
+
+    logger.info(f"写入结果到 {output_file}...")
     with open(output_file, 'w', encoding='utf-8') as f:
         for item_id, sims in result.items():
-            item_name = item_name_map.get(item_id, 'Unknown')
+            # 使用name_mappings获取名称,如果没有则从df中获取
+            item_name = name_mappings.get(int(item_id), 'Unknown') if item_id.isdigit() else 'Unknown'
+            if item_name == 'Unknown' and 'item_name' in df.columns:
+                item_name = df[df['item_id'].astype(str) == item_id]['item_name'].iloc[0] if len(df[df['item_id'].astype(str) == item_id]) > 0 else 'Unknown'
 
             if not sims:
                 continue
@@ -233,8 +292,19 @@ def main():
             sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])
             f.write(f'{item_id}\t{item_name}\t{sim_str}\n')
 
-    print(f"Done! Generated i2i similarities for {len(result)} items")
-    print(f"Output saved to: {output_file}")
+    logger.info(f"完成!为 {len(result)} 个物品生成了相似度")
+    logger.info(f"输出保存到:{output_file}")
+
+    # 如果启用debug模式,保存可读格式
+    if args.debug:
+        log_processing_step(logger, "保存Debug可读格式")
+        save_readable_index(
+            output_file,
+            result,
+            name_mappings,
+            index_type='i2i:session_w2v',
+            logger=logger
+        )
 
 
 if __name__ == '__main__':
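
Note: only the head and tail of prepare_session_data are visible in these hunks. Going by its docstring and the filter shown above, it sorts each user's events by time, cuts a new session whenever the gap to the previous event exceeds session_gap_minutes, and keeps only sessions with at least two items. A rough sketch of that splitting logic, assuming the docstring describes it accurately (the real function in the repo may differ):

import pandas as pd

def split_sessions(df, session_gap_minutes=30):
    # Sketch of the session splitting described in prepare_session_data's docstring.
    sessions = []
    df = df.sort_values(['user_id', 'create_time'])
    gap = pd.Timedelta(minutes=session_gap_minutes)
    for _, user_df in df.groupby('user_id'):
        current = []
        last_time = None
        for row in user_df.itertuples():
            # start a new session after a long idle gap
            if last_time is not None and row.create_time - last_time > gap:
                sessions.append(current)
                current = []
            current.append(str(row.item_id))
            last_time = row.create_time
        sessions.append(current)
    # drop sessions that are too short to provide context for Word2Vec
    return [s for s in sessions if len(s) >= 2]
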
offline_tasks/scripts/interest_aggregation.py
@@ -17,6 +17,11 @@ from offline_tasks.config.offline_config import (
     DB_CONFIG, OUTPUT_DIR, INTEREST_AGGREGATION_CONFIG, get_time_range,
     DEFAULT_LOOKBACK_DAYS, DEFAULT_RECENT_DAYS, DEFAULT_INTEREST_TOP_N
 )
+from offline_tasks.scripts.debug_utils import (
+    setup_debug_logger, log_dataframe_info, log_dict_stats,
+    save_readable_index, fetch_name_mappings, log_algorithm_params,
+    log_processing_step
+)
 
 
 def calculate_time_weight(event_time, reference_time, decay_factor=0.95, days_unit=30):
@@ -227,8 +232,22 @@ def main():
 
     args = parser.parse_args()
 
+    # 设置logger
+    logger = setup_debug_logger('interest_aggregation', debug=args.debug)
+
+    # 记录算法参数
+    params = {
+        'top_n': args.top_n,
+        'lookback_days': args.lookback_days,
+        'recent_days': args.recent_days,
+        'new_days': args.new_days,
+        'decay_factor': args.decay_factor,
+        'debug': args.debug
+    }
+    log_algorithm_params(logger, params)
+
     # 创建数据库连接
-    print("Connecting to database...")
+    logger.info("连接数据库...")
     engine = create_db_connection(
         DB_CONFIG['host'],
         DB_CONFIG['port'],
@@ -242,7 +261,9 @@ def main():
     recent_start_date, _ = get_time_range(args.recent_days)
     new_start_date, _ = get_time_range(args.new_days)
 
-    print(f"Fetching data from {start_date} to {end_date}...")
+    logger.info(f"获取数据范围:{start_date} 到 {end_date}")
+    logger.debug(f"热门商品起始日期:{recent_start_date}")
+    logger.debug(f"新品起始日期:{new_start_date}")
 
     # SQL查询 - 获取用户行为数据(包含用户特征和商品分类)
     sql_query = f"""
@@ -279,9 +300,12 @@ def main():
         se.create_time
     """
 
-    print("Executing SQL query...")
+    logger.info("执行SQL查询...")
     df = pd.read_sql(sql_query, engine)
-    print(f"Fetched {len(df)} records")
+    logger.info(f"获取到 {len(df)} 条记录")
+
+    # 记录数据信息
+    log_dataframe_info(logger, df, "用户行为数据")
 
     # 转换时间列
     df['create_time'] = pd.to_datetime(df['create_time'])
@@ -289,37 +313,70 @@ def main():
 
     # 定义行为权重
     behavior_weights = INTEREST_AGGREGATION_CONFIG['behavior_weights']
+    logger.debug(f"行为权重: {behavior_weights}")
 
     # 准备不同类型的数据集
+    log_processing_step(logger, "准备不同类型的数据集")
 
     # 1. 热门商品:最近N天的高交互商品
     df_hot = df[df['create_time'] >= recent_start_date].copy()
+    logger.info(f"热门商品数据集:{len(df_hot)} 条记录")
 
     # 2. 加购商品:加购行为
     df_cart = df[df['event_type'].isin(['addToCart', 'addToPool'])].copy()
+    logger.info(f"加购商品数据集:{len(df_cart)} 条记录")
 
     # 3. 新品:商品创建时间在最近N天内
     df_new = df[df['item_create_time'] >= new_start_date].copy()
+    logger.info(f"新品数据集:{len(df_new)} 条记录")
 
     # 生成不同列表类型的索引
-    print("\n=== Generating indices ===")
+    log_processing_step(logger, "生成不同列表类型的索引")
     list_type_indices = generate_list_type_indices(
         df_hot, df_cart, df_new, behavior_weights
     )
+    logger.info(f"生成了 {len(list_type_indices)} 种列表类型的索引")
+
+    # 获取name mappings用于debug输出
+    name_mappings = {}
+    if args.debug:
+        logger.info("获取物品名称映射...")
+        name_mappings = fetch_name_mappings(engine, debug=True)
 
     # 输出索引
+    log_processing_step(logger, "保存索引文件")
     for list_type, aggregations in list_type_indices.items():
         output_prefix = f'{args.output_prefix}_{list_type}'
+        logger.info(f"保存 {list_type} 类型的索引...")
         output_indices(aggregations, output_prefix, top_n=args.top_n)
+
+        # 如果启用debug模式,保存可读格式
+        if args.debug and aggregations:
+            for dim_key, items in aggregations.items():
+                if items:
+                    # 为每个维度生成可读索引
+                    result_dict = {dim_key: items[:args.top_n]}
+                    output_file = os.path.join(OUTPUT_DIR, f'{output_prefix}_{dim_key}_{datetime.now().strftime("%Y%m%d")}.txt')
+                    if os.path.exists(output_file):
+                        save_readable_index(
+                            output_file,
+                            result_dict,
+                            name_mappings,
+                            index_type=f'interest:{list_type}:{dim_key}',
+                            logger=logger
+                        )
 
     # 生成全局索引(所有数据)
-    print("\nGenerating global indices...")
+    log_processing_step(logger, "生成全局索引")
     global_aggregations = aggregate_by_dimensions(
         df, behavior_weights, time_decay=True, decay_factor=args.decay_factor
     )
+    logger.info("保存全局索引...")
     output_indices(global_aggregations, f'{args.output_prefix}_global', top_n=args.top_n)
 
-    print("\n=== All indices generated successfully! ===")
+    logger.info("="*80)
+    logger.info("所有索引生成完成!")
+    logger.info("="*80)
 
 
 if __name__ == '__main__':
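
Note: interest_aggregation.py calls calculate_time_weight(event_time, reference_time, decay_factor=0.95, days_unit=30), which is defined outside the hunks shown here. The signature suggests exponential decay per days_unit-day block of age; the sketch below is only a guess at that formula for orientation, not the repository's implementation.

def time_weight(event_time, reference_time, decay_factor=0.95, days_unit=30):
    # Guessed reading of calculate_time_weight's signature: the weight shrinks by
    # decay_factor for every days_unit days between the event and the reference time.
    days = max((reference_time - event_time).days, 0)
    return decay_factor ** (days / days_unit)
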