"""
兴趣点聚合索引生成
按照多个维度(平台、国家、客户类型、分类、列表类型)生成商品索引
"""
import sys
import os
# Make the project root importable when this script is run directly.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

import pandas as pd
import math
import argparse
from datetime import datetime
from collections import defaultdict
from db_service import create_db_connection
from offline_tasks.config.offline_config import (
    DB_CONFIG, OUTPUT_DIR, INTEREST_AGGREGATION_CONFIG, get_time_range,
    DEFAULT_LOOKBACK_DAYS, DEFAULT_RECENT_DAYS, DEFAULT_INTEREST_TOP_N
)
from offline_tasks.scripts.debug_utils import (
    setup_debug_logger, log_dataframe_info, log_dict_stats,
    save_readable_index, fetch_name_mappings, log_algorithm_params,
    log_processing_step
)


def calculate_time_weight(event_time, reference_time, decay_factor=0.95, days_unit=30):
    """
    计算时间衰减权重
    
    Args:
        event_time: 事件发生时间
        reference_time: 参考时间(当前时间)
        decay_factor: 衰减因子
        days_unit: 衰减单位(天)
    
    Returns:
        时间权重
    """
    if pd.isna(event_time):
        return 1.0
    
    time_diff = (reference_time - event_time).days
    if time_diff < 0:
        return 1.0
    
    # Exponential decay: decay_factor raised to the number of elapsed units
    periods = time_diff / days_unit
    weight = math.pow(decay_factor, periods)
    return weight
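
# Illustrative sanity check of the decay curve (example values, not taken from
# the project config): with decay_factor=0.95 and days_unit=30, an event
# 30 days old keeps weight 0.95, 90 days old keeps 0.95**3 ≈ 0.857, and a
# year-old event keeps 0.95**12 ≈ 0.540, e.g.:
#
#     now = datetime(2024, 1, 1)
#     calculate_time_weight(now - timedelta(days=90), now)  # ≈ 0.857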


def aggregate_by_dimensions(df, behavior_weights, time_decay=True, decay_factor=0.95):
    """
    按多维度聚合商品
    
    Args:
        df: DataFrame with necessary columns
        behavior_weights: 行为权重字典
        time_decay: 是否使用时间衰减
        decay_factor: 时间衰减因子
    
    Returns:
        Dict: {dimension_key: {item_id: score}}
    """
    reference_time = datetime.now()
    
    # Attach behavior weights (unknown event types default to 1.0)
    df['behavior_weight'] = df['event_type'].map(behavior_weights).fillna(1.0)
    
    # Attach time-decay weights
    if time_decay:
        df['time_weight'] = df['create_time'].apply(
            lambda x: calculate_time_weight(x, reference_time, decay_factor)
        )
    else:
        df['time_weight'] = 1.0
    
    # Combine into the final per-event weight
    df['final_weight'] = df['behavior_weight'] * df['time_weight']
    
    # {dimension_key: {item_id: accumulated score}}
    aggregations = defaultdict(lambda: defaultdict(float))
    
    # Dimension specs: (key prefix, columns whose values form the key).
    # Single dimensions first, then combined dimensions.
    dimension_specs = [
        ('platform', ['platform']),                    # business platform
        ('client_platform', ['client_platform']),      # client platform
        ('supplier', ['supplier_id']),                 # supplier
        ('category_level1', ['category_level1_id']),   # level-1 category
        ('category_level2', ['category_level2_id']),   # level-2 category
        ('category_level3', ['category_level3_id']),   # level-3 category
        ('category_level4', ['category_level4_id']),   # level-4 category
        ('platform_client', ['platform', 'client_platform']),
        ('platform_category2', ['platform', 'category_level2_id']),
        ('platform_category3', ['platform', 'category_level3_id']),
        ('client_category2', ['client_platform', 'category_level2_id']),
    ]
    
    # Walk the events and accumulate each item's weight under every dimension
    # key whose columns are all present for that row.
    for _, row in df.iterrows():
        item_id = row['item_id']
        weight = row['final_weight']
        
        for prefix, columns in dimension_specs:
            values = [row.get(col) for col in columns]
            if all(pd.notna(v) for v in values):
                key = f"{prefix}:" + '_'.join(str(v) for v in values)
                aggregations[key][item_id] += weight
    
    return aggregations
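
# The aggregation result maps dimension keys to per-item scores, e.g.
# (hypothetical IDs and scores):
#
#     {'platform:web': {1001: 3.42, 1002: 1.90},
#      'platform_category2:web_15': {1001: 2.10}}
#
# Each dimension key later becomes one line of the index file.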


def generate_list_type_indices(df_hot, df_cart, df_new, behavior_weights):
    """
    生成不同列表类型的索引(热门、加购、新品)
    
    Args:
        df_hot: 热门商品数据
        df_cart: 加购商品数据
        df_new: 新品数据
        behavior_weights: 行为权重
    
    Returns:
        Dict: {list_type: aggregations}
    """
    list_type_indices = {}
    
    # Hot item index
    if not df_hot.empty:
        print("Generating hot item indices...")
        list_type_indices['hot'] = aggregate_by_dimensions(
            df_hot, behavior_weights, time_decay=True
        )
    
    # Cart item index
    if not df_cart.empty:
        print("Generating cart item indices...")
        list_type_indices['cart'] = aggregate_by_dimensions(
            df_cart, behavior_weights, time_decay=True
        )
    
    # New item index
    if not df_new.empty:
        print("Generating new item indices...")
        # Skip time decay for new items: they are inherently time-sensitive
        list_type_indices['new'] = aggregate_by_dimensions(
            df_new, behavior_weights, time_decay=False
        )
    
    return list_type_indices


def output_indices(aggregations, output_prefix, top_n=1000):
    """
    输出索引到文件
    
    Args:
        aggregations: 聚合结果 {dimension_key: {item_id: score}}
        output_prefix: 输出文件前缀
        top_n: 每个维度输出前N个商品
    """
    output_file = os.path.join(OUTPUT_DIR, f'{output_prefix}_{datetime.now().strftime("%Y%m%d")}.txt')
    
    print(f"Writing indices to {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        for dim_key, items in aggregations.items():
            # Sort by score descending and keep the top N
            sorted_items = sorted(items.items(), key=lambda x: -x[1])[:top_n]
            
            if not sorted_items:
                continue
            
            # Format: dimension_key \t item_id1:score1,item_id2:score2,...
            items_str = ','.join([f'{item_id}:{score:.4f}' for item_id, score in sorted_items])
            f.write(f'{dim_key}\t{items_str}\n')
    
    print(f"Output saved to: {output_file}")
    print(f"Generated indices for {len(aggregations)} dimension keys")


def main():
    parser = argparse.ArgumentParser(description='Generate interest aggregation indices')
    parser.add_argument('--top_n', type=int, default=DEFAULT_INTEREST_TOP_N,
                       help=f'Top N items per dimension (default: {DEFAULT_INTEREST_TOP_N})')
    parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
                       help=f'Number of days to look back (default: {DEFAULT_LOOKBACK_DAYS})')
    parser.add_argument('--recent_days', type=int, default=DEFAULT_RECENT_DAYS,
                       help=f'Recent days for hot items (default: {DEFAULT_RECENT_DAYS})')
    parser.add_argument('--new_days', type=int, default=DEFAULT_RECENT_DAYS,
                       help=f'Days for new items (default: {DEFAULT_RECENT_DAYS})')
    parser.add_argument('--decay_factor', type=float, default=INTEREST_AGGREGATION_CONFIG['time_decay_factor'],
                       help='Time decay factor')
    parser.add_argument('--output_prefix', type=str, default='interest_aggregation',
                       help='Output file prefix')
    parser.add_argument('--debug', action='store_true',
                       help='Enable debug mode with detailed logging and readable output')
    
    args = parser.parse_args()
    
    # Set up the logger
    logger = setup_debug_logger('interest_aggregation', debug=args.debug)
    
    # Log the algorithm parameters
    params = {
        'top_n': args.top_n,
        'lookback_days': args.lookback_days,
        'recent_days': args.recent_days,
        'new_days': args.new_days,
        'decay_factor': args.decay_factor,
        'debug': args.debug
    }
    log_algorithm_params(logger, params)
    
    # Create the database connection
    logger.info("Connecting to database...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )
    
    # Resolve the time ranges
    start_date, end_date = get_time_range(args.lookback_days)
    recent_start_date, _ = get_time_range(args.recent_days)
    new_start_date, _ = get_time_range(args.new_days)
    
    logger.info(f"获取数据范围:{start_date} 到 {end_date}")
    logger.debug(f"热门商品起始日期:{recent_start_date}")
    logger.debug(f"新品起始日期:{new_start_date}")
    
    # SQL query: user behavior events joined with item and category data
    sql_query = f"""
    SELECT 
        se.anonymous_id AS user_id,
        se.item_id,
        se.event AS event_type,
        se.create_time,
        pgs.name AS item_name,
        pgs.create_time AS item_create_time,
        se.business_platform AS platform,
        se.client_platform,
        pg.supplier_id,
        pg.category_id,
        pc_1.id as category_level1_id,
        pc_2.id as category_level2_id,
        pc_3.id as category_level3_id,
        pc_4.id as category_level4_id
    FROM 
        sensors_events se
    LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
    LEFT JOIN prd_goods pg ON pg.id = pgs.goods_id
    LEFT JOIN prd_category as pc ON pc.id = pg.category_id
    LEFT JOIN prd_category AS pc_1 ON pc_1.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 2), '.', -1)
    LEFT JOIN prd_category AS pc_2 ON pc_2.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 3), '.', -1)
    LEFT JOIN prd_category AS pc_3 ON pc_3.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 4), '.', -1)
    LEFT JOIN prd_category AS pc_4 ON pc_4.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 5), '.', -1)
    WHERE 
        se.event IN ('click', 'contactFactory', 'addToPool', 'addToCart', 'purchase')
        AND se.create_time >= '{start_date}'
        AND se.create_time <= '{end_date}'
        AND se.item_id IS NOT NULL
    ORDER BY 
        se.create_time
    """
    
    logger.info("执行SQL查询...")
    df = pd.read_sql(sql_query, engine)
    logger.info(f"获取到 {len(df)} 条记录")
    
    # 记录数据信息
    log_dataframe_info(logger, df, "用户行为数据")
    
    # Convert time columns
    df['create_time'] = pd.to_datetime(df['create_time'])
    df['item_create_time'] = pd.to_datetime(df['item_create_time'], errors='coerce')
    
    # Behavior weights per event type
    behavior_weights = INTEREST_AGGREGATION_CONFIG['behavior_weights']
    logger.debug(f"Behavior weights: {behavior_weights}")
    
    # Prepare the dataset variants
    log_processing_step(logger, "Preparing dataset variants")
    
    # 1. Hot items: heavily-interacted items from the most recent N days
    df_hot = df[df['create_time'] >= recent_start_date].copy()
    logger.info(f"Hot item dataset: {len(df_hot)} records")
    
    # 2. Cart items: add-to-cart / add-to-pool events
    df_cart = df[df['event_type'].isin(['addToCart', 'addToPool'])].copy()
    logger.info(f"Cart item dataset: {len(df_cart)} records")
    
    # 3. New items: item created within the last N days
    df_new = df[df['item_create_time'] >= new_start_date].copy()
    logger.info(f"New item dataset: {len(df_new)} records")
    
    # Generate the per-list-type indices
    log_processing_step(logger, "Generating list type indices")
    list_type_indices = generate_list_type_indices(
        df_hot, df_cart, df_new, behavior_weights
    )
    logger.info(f"生成了 {len(list_type_indices)} 种列表类型的索引")
    
    # Fetch name mappings for debug output
    name_mappings = {}
    if args.debug:
        logger.info("Fetching item name mappings...")
        name_mappings = fetch_name_mappings(engine, debug=True)
    
    # Write the index files
    log_processing_step(logger, "Saving index files")
    for list_type, aggregations in list_type_indices.items():
        output_prefix = f'{args.output_prefix}_{list_type}'
        logger.info(f"保存 {list_type} 类型的索引...")
        output_indices(aggregations, output_prefix, top_n=args.top_n)
        
        # In debug mode, also save a human-readable index per dimension
        if args.debug and aggregations:
            for dim_key, items in aggregations.items():
                if items:
                    # items is a dict, so sort by score before truncating to
                    # the top N entries
                    top_items = dict(
                        sorted(items.items(), key=lambda x: -x[1])[:args.top_n]
                    )
                    result_dict = {dim_key: top_items}
                    # Sanitize the dimension key for use in a file name
                    safe_key = dim_key.replace(':', '_')
                    output_file = os.path.join(
                        OUTPUT_DIR,
                        f'{output_prefix}_{safe_key}_{datetime.now().strftime("%Y%m%d")}.txt'
                    )
                    save_readable_index(
                        output_file,
                        result_dict,
                        name_mappings,
                        index_type=f'interest:{list_type}:{dim_key}',
                        logger=logger
                    )
    
    # Generate the global index over all data
    log_processing_step(logger, "Generating global index")
    global_aggregations = aggregate_by_dimensions(
        df, behavior_weights, time_decay=True, decay_factor=args.decay_factor
    )
    logger.info("保存全局索引...")
    output_indices(global_aggregations, f'{args.output_prefix}_global', top_n=args.top_n)
    
    logger.info("="*80)
    logger.info("所有索引生成完成!")
    logger.info("="*80)


if __name__ == '__main__':
    main()