"""
Generate the user-behavior session file.

Reads user behavior events from the database and writes a session file
for the C++ Swing algorithm.

Output formats:
    standard:  uid<TAB>{"item_id":score,"item_id":score,...}
    cpp:       {"item_id":score,"item_id":score,...}   (one user per line, no uid)
"""
import argparse
import json
import os
import sys
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd

# Make the project root importable so the project-local packages below resolve.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from db_service import create_db_connection  # noqa: E402
from offline_tasks.config.offline_config import (  # noqa: E402
    DB_CONFIG, OUTPUT_DIR, get_time_range, DEFAULT_LOOKBACK_DAYS
)
from offline_tasks.scripts.debug_utils import setup_debug_logger, log_dataframe_info  # noqa: E402


def aggregate_user_sessions(df, behavior_weights, logger=None, debug=False):
    """
    Aggregate behavior events into per-user item scores.

    Each event adds ``behavior_weights[event_type]`` to its (user_id, item_id)
    score; event types missing from ``behavior_weights`` add 1.0. Repeated
    events on the same item accumulate.

    Args:
        df: DataFrame with at least the columns user_id, item_id, event_type.
            It is not modified.
        behavior_weights: mapping event_type -> weight.
        logger: optional logger for progress messages.
        debug: when True (and a logger is given), log a few sample sessions.

    Returns:
        Dict[user_id, Dict[item_id, score]]; users and items keep the order
        in which they are first seen in ``df``.
    """
    if logger:
        logger.info("开始聚合用户行为session...")

    # Vectorised weight lookup kept in a local Series so the caller's
    # DataFrame is not mutated.
    weights = df['event_type'].map(behavior_weights).fillna(1.0)

    user_sessions = defaultdict(lambda: defaultdict(float))
    # Column-wise zip instead of DataFrame.iterrows(): iterrows builds a
    # Series per row (slow) and does not preserve column dtypes.
    for user_id, item_id, weight in zip(df['user_id'], df['item_id'], weights):
        user_sessions[user_id][item_id] += weight

    if logger:
        logger.info(f"聚合完成,共 {len(user_sessions)} 个用户")

        total_interactions = sum(len(items) for items in user_sessions.values())
        avg_interactions = total_interactions / len(user_sessions) if user_sessions else 0
        logger.info(f"平均每个用户交互 {avg_interactions:.2f} 个商品")

        if debug:
            # Show a few example sessions (first 5 items each).
            sample_users = list(user_sessions.items())[:3]
            for user_id, items in sample_users:
                logger.debug(f"用户 {user_id} 的session: {dict(list(items.items())[:5])}...")

    return user_sessions


def _format_items_json(items):
    """
    Serialize one user's {item_id: score} mapping as compact JSON.

    Items are ordered by score, highest first (ties keep insertion order).
    Keys are stringified item ids and scores are rounded to 4 decimals.
    json.dumps escapes quotes/backslashes/control characters in ids, so the
    output is always valid JSON.
    """
    ranked = sorted(items.items(), key=lambda kv: -kv[1])
    payload = {str(item_id): round(score, 4) for item_id, score in ranked}
    return json.dumps(payload, ensure_ascii=False, separators=(',', ':'))


def _ensure_parent_dir(path):
    """Create the directory that will contain ``path`` if it does not exist."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def save_session_file(user_sessions, output_file, logger=None, debug=False):
    """
    Write sessions in the standard format, one user per line:

        uid<TAB>{"item_id":score,"item_id":score,...}

    Items are sorted by score, descending.

    Args:
        user_sessions: Dict[user_id, Dict[item_id, score]].
        output_file: output file path (parent directory is created if needed).
        logger: optional logger.
        debug: unused; kept for interface compatibility.
    """
    if logger:
        logger.info(f"保存session文件到: {output_file}")

    _ensure_parent_dir(output_file)
    with open(output_file, 'w', encoding='utf-8') as f:
        for user_id, items in user_sessions.items():
            f.write(f"{user_id}\t{_format_items_json(items)}\n")

    if logger:
        logger.info(f"保存完成,共 {len(user_sessions)} 个用户session")


def save_session_file_for_cpp(user_sessions, output_file, logger=None, debug=False):
    """
    Write sessions in the C++ format (no uid), one user per line:

        {"item_id":score,"item_id":score,...}

    Items are sorted by score, descending.

    Args:
        user_sessions: Dict[user_id, Dict[item_id, score]].
        output_file: output file path (parent directory is created if needed).
        logger: optional logger.
        debug: unused; kept for interface compatibility.
    """
    if logger:
        logger.info(f"保存session文件(C++格式)到: {output_file}")

    _ensure_parent_dir(output_file)
    with open(output_file, 'w', encoding='utf-8') as f:
        for items in user_sessions.values():
            f.write(f"{_format_items_json(items)}\n")

    if logger:
        logger.info(f"保存完成(C++格式),共 {len(user_sessions)} 个用户session")


def main():
    """
    CLI entry point: query behavior events, aggregate sessions, write files.

    Returns:
        Process exit code: 0 on success (including when no data is found),
        1 when the database query fails.
    """
    parser = argparse.ArgumentParser(description='Generate user behavior session file')
    parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
                        help=f'Number of days to look back for user behavior (default: {DEFAULT_LOOKBACK_DAYS})')
    parser.add_argument('--output', type=str, default=None,
                        help='Output file path')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode with detailed logging')
    parser.add_argument('--format', type=str, default='both',
                        choices=['standard', 'cpp', 'both'],
                        help='Output format: standard (uid+json), cpp (json only), both (default: both)')
    args = parser.parse_args()

    logger = setup_debug_logger('generate_session', debug=args.debug)

    logger.info("参数配置:")
    logger.info(f"  lookback_days: {args.lookback_days}")
    logger.info(f"  debug: {args.debug}")
    logger.info(f"  format: {args.format}")

    logger.info("连接数据库...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )

    start_date, end_date = get_time_range(args.lookback_days)
    logger.info(f"获取数据: {start_date} 到 {end_date}")

    # NOTE: the dates are interpolated into the SQL text; they come from
    # get_time_range (an int lookback), not from free-form user input.
    sql_query = f"""
    SELECT
        se.anonymous_id AS user_id,
        se.item_id,
        se.event AS event_type,
        se.create_time
    FROM
        sensors_events se
    WHERE
        se.event IN ('contactFactory', 'addToPool', 'addToCart', 'purchase')
        AND se.create_time >= '{start_date}'
        AND se.create_time <= '{end_date}'
        AND se.item_id IS NOT NULL
        AND se.anonymous_id IS NOT NULL
    ORDER BY
        se.create_time
    """

    try:
        logger.info("执行SQL查询...")
        df = pd.read_sql(sql_query, engine)
        logger.info(f"获取到 {len(df)} 条记录")

        if args.debug:
            log_dataframe_info(logger, df, "用户行为数据", sample_size=10)
    except Exception as e:
        logger.error(f"获取数据失败: {e}")
        # Report failure to the caller/scheduler instead of exiting with 0.
        return 1

    if df.empty:
        logger.warning("没有找到数据")
        return 0

    df['create_time'] = pd.to_datetime(df['create_time'])

    # Per-event weights; unknown event types default to 1.0 during aggregation.
    behavior_weights = {
        'contactFactory': 5.0,
        'addToPool': 2.0,
        'addToCart': 3.0,
        'purchase': 10.0
    }

    if args.debug:
        logger.debug("行为类型分布:")
        event_counts = df['event_type'].value_counts()
        for event, count in event_counts.items():
            logger.debug(f"  {event}: {count} ({count/len(df)*100:.2f}%)")

    user_sessions = aggregate_user_sessions(
        df,
        behavior_weights,
        logger=logger,
        debug=args.debug
    )

    date_str = datetime.now().strftime("%Y%m%d")
    if args.output:
        output_base = args.output
    else:
        output_base = os.path.join(OUTPUT_DIR, f'session.txt.{date_str}')

    if args.format in ['standard', 'both']:
        save_session_file(user_sessions, output_base, logger=logger, debug=args.debug)

    if args.format in ['cpp', 'both']:
        output_file_cpp = output_base + '.cpp'
        save_session_file_for_cpp(user_sessions, output_file_cpp, logger=logger, debug=args.debug)

    logger.info("完成!")
    return 0


if __name__ == '__main__':
    sys.exit(main())