# generate_session.py
"""
Generate user-behavior session files.
Read user behavior events from the database and write session files for the C++ Swing algorithm.
Output format: uid \t {"item_id":score,"item_id":score,...}
"""
import pandas as pd
import json
import os
from collections import defaultdict
import argparse
from datetime import datetime
from db_service import create_db_connection
from config.offline_config import (
    DB_CONFIG, OUTPUT_DIR, get_time_range,
    DEFAULT_LOOKBACK_DAYS
)
from scripts.debug_utils import setup_debug_logger, log_dataframe_info
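
# Example invocation and one line of the resulting standard-format output
# (the user/item IDs and scores below are made up for illustration):
#   python generate_session.py --lookback_days 30 --format both
#   u_1001 \t {"8891":10.0,"7742":5.0,"3310":3.0}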


def aggregate_user_sessions(df, behavior_weights, logger=None, debug=False):
    """
    Aggregate user behaviors into per-user sessions.

    Args:
        df: DataFrame with columns: user_id, item_id, event_type, create_time
        behavior_weights: mapping of event_type -> weight
        logger: optional logger
        debug: enable debug-level logging

    Returns:
        Dict[user_id, Dict[item_id, score]]
    """
    if logger:
        logger.info("Aggregating user behavior sessions...")
    
    # Map each event type to its weight (unknown events default to 1.0)
    df['weight'] = df['event_type'].map(behavior_weights).fillna(1.0)
    
    # Aggregate per user
    user_sessions = defaultdict(lambda: defaultdict(float))
    
    for _, row in df.iterrows():
        user_id = row['user_id']
        item_id = row['item_id']
        weight = row['weight']
        
        # Sum weights across repeated interactions of the same user with the same item
        user_sessions[user_id][item_id] += weight
    
    if logger:
        logger.info(f"Aggregation finished: {len(user_sessions)} users")
        
        # Summary statistics
        total_interactions = sum(len(items) for items in user_sessions.values())
        avg_interactions = total_interactions / len(user_sessions) if user_sessions else 0
        logger.info(f"Average of {avg_interactions:.2f} items per user")
        
        if debug:
            # Log a few sample sessions
            sample_users = list(user_sessions.items())[:3]
            for user_id, items in sample_users:
                logger.debug(f"Session for user {user_id}: {dict(list(items.items())[:5])}...")
    
    return user_sessions
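

# A vectorized alternative to the iterrows() loop in aggregate_user_sessions,
# kept as a reference sketch (not called by this script). It assumes the same
# input columns and returns the same Dict[user_id, Dict[item_id, score]] shape.
def aggregate_user_sessions_vectorized(df, behavior_weights):
    """Aggregate sessions with a pandas groupby instead of a per-row loop."""
    weighted = df.assign(weight=df['event_type'].map(behavior_weights).fillna(1.0))
    scores = weighted.groupby(['user_id', 'item_id'])['weight'].sum()
    user_sessions = defaultdict(dict)
    for (user_id, item_id), score in scores.items():
        user_sessions[user_id][item_id] = float(score)
    return user_sessions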


def save_session_file(user_sessions, output_file, logger=None, debug=False):
    """
    Save the session file.

    Format: uid \t {"item_id":score,"item_id":score,...}
    Items are sorted by score in descending order.

    Args:
        user_sessions: Dict[user_id, Dict[item_id, score]]
        output_file: output file path
        logger: optional logger
        debug: enable debug-level logging
    """
    if logger:
        logger.info(f"Saving session file to: {output_file}")
    
    with open(output_file, 'w', encoding='utf-8') as f:
        for user_id, items in user_sessions.items():
            # Sort items by score, descending
            sorted_items = sorted(items.items(), key=lambda x: -x[1])
            
            # Build the JSON string (item_id keys must be quoted strings)
            items_dict = {str(item_id): round(score, 4) for item_id, score in sorted_items}
            items_json = json.dumps(items_dict, ensure_ascii=False, separators=(',', ':'))
            
            # Write one user per line
            f.write(f"{user_id}\t{items_json}\n")
    
    if logger:
        logger.info(f"Saved {len(user_sessions)} user sessions")
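

# Round-trip helper (a sketch, not called by this script): parse one line of the
# standard format back into (uid, {item_id: score}) for quick spot checks.
def parse_session_line(line):
    uid, items_json = line.rstrip('\n').split('\t', 1)
    return uid, json.loads(items_json)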


def save_session_file_for_cpp(user_sessions, output_file, logger=None, debug=False):
    """
    Save the session file in the C++ input format (no uid).

    Format: {"item_id":score,"item_id":score,...}
    One user session per line, items sorted by score in descending order.

    Args:
        user_sessions: Dict[user_id, Dict[item_id, score]]
        output_file: output file path
        logger: optional logger
        debug: enable debug-level logging
    """
    if logger:
        logger.info(f"Saving session file (C++ format) to: {output_file}")
    
    with open(output_file, 'w', encoding='utf-8') as f:
        for user_id, items in user_sessions.items():
            # Sort items by score, descending
            sorted_items = sorted(items.items(), key=lambda x: -x[1])
            
            # Build the JSON string; json.dumps already quotes the string keys,
            # so no manual quote handling is needed
            items_dict = {str(item_id): round(score, 4) for item_id, score in sorted_items}
            items_json = json.dumps(items_dict, ensure_ascii=False, separators=(',', ':'))
            
            # Write one session per line (no uid)
            f.write(f"{items_json}\n")
    
    if logger:
        logger.info(f"Saved {len(user_sessions)} user sessions (C++ format)")
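
# Example line in the C++ format (item IDs and scores are made up):
#   {"8891":10.0,"7742":5.0,"3310":3.0}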


def main():
    parser = argparse.ArgumentParser(description='Generate user behavior session file')
    parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
                       help=f'Number of days to look back for user behavior (default: {DEFAULT_LOOKBACK_DAYS})')
    parser.add_argument('--output', type=str, default=None,
                       help='Output file path')
    parser.add_argument('--debug', action='store_true',
                       help='Enable debug mode with detailed logging')
    parser.add_argument('--format', type=str, default='both', choices=['standard', 'cpp', 'both'],
                       help='Output format: standard (uid+json), cpp (json only), both (default: both)')
    
    args = parser.parse_args()
    
    # Set up logging
    logger = setup_debug_logger('generate_session', debug=args.debug)
    
    # Log the run parameters
    logger.info("Run parameters:")
    logger.info(f"  lookback_days: {args.lookback_days}")
    logger.info(f"  debug: {args.debug}")
    logger.info(f"  format: {args.format}")
    
    # Create the database connection
    logger.info("Connecting to database...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )
    
    # Get the time range
    start_date, end_date = get_time_range(args.lookback_days)
    logger.info(f"Fetching data from {start_date} to {end_date}")
    
    # SQL query: fetch user behavior events
    sql_query = f"""
    SELECT 
        se.anonymous_id AS user_id,
        se.item_id,
        se.event AS event_type,
        se.create_time
    FROM 
        sensors_events se
    WHERE 
        se.event IN ('contactFactory', 'addToPool', 'addToCart', 'purchase')
        AND se.create_time >= '{start_date}'
        AND se.create_time <= '{end_date}'
        AND se.item_id IS NOT NULL
        AND se.anonymous_id IS NOT NULL
    ORDER BY 
        se.create_time
    """
    
    try:
        logger.info("Executing SQL query...")
        df = pd.read_sql(sql_query, engine)
        logger.info(f"Fetched {len(df)} records")
        
        # Debug: log details of the fetched data
        if args.debug:
            log_dataframe_info(logger, df, "user behavior data", sample_size=10)
    except Exception as e:
        logger.error(f"Failed to fetch data: {e}")
        return
    
    if len(df) == 0:
        logger.warning("No data found")
        return
    
    # Convert create_time to datetime
    df['create_time'] = pd.to_datetime(df['create_time'])
    
    # Behavior weights by event type
    behavior_weights = {
        'contactFactory': 5.0,
        'addToPool': 2.0,
        'addToCart': 3.0,
        'purchase': 10.0
    }
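    # Worked example (made-up counts): a user who triggers addToCart twice and
    # purchase once on the same item accumulates 2 * 3.0 + 10.0 = 16.0 for that item.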
    
    if logger and args.debug:
        logger.debug("Event type distribution:")
        event_counts = df['event_type'].value_counts()
        for event, count in event_counts.items():
            logger.debug(f"  {event}: {count} ({count/len(df)*100:.2f}%)")
    
    # Aggregate user sessions
    user_sessions = aggregate_user_sessions(
        df,
        behavior_weights,
        logger=logger,
        debug=args.debug
    )
    
    # Build the output file name
    date_str = datetime.now().strftime("%Y%m%d")
    
    if args.output:
        output_base = args.output
    else:
        output_base = os.path.join(OUTPUT_DIR, f'session.txt.{date_str}')
    
    # Save the output file(s)
    if args.format in ['standard', 'both']:
        output_file = output_base
        save_session_file(user_sessions, output_file, logger=logger, debug=args.debug)
    
    if args.format in ['cpp', 'both']:
        output_file_cpp = output_base + '.cpp'
        save_session_file_for_cpp(user_sessions, output_file_cpp, logger=logger, debug=args.debug)
    
    logger.info("Done!")


if __name__ == '__main__':
    main()