# interest_aggregation.py
"""
兴趣点聚合索引生成
按照多个维度(平台、国家、客户类型、分类、列表类型)生成商品索引
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

import pandas as pd
import math
import argparse
import json
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from db_service import create_db_connection
from offline_tasks.config.offline_config import (
    DB_CONFIG, OUTPUT_DIR, INTEREST_AGGREGATION_CONFIG, get_time_range,
    DEFAULT_LOOKBACK_DAYS, DEFAULT_RECENT_DAYS, DEFAULT_INTEREST_TOP_N
)


def calculate_time_weight(event_time, reference_time, decay_factor=0.95, days_unit=30):
    """
    Compute an exponential time-decay weight for an event.

    Args:
        event_time: when the event happened (NaT/NaN yields full weight)
        reference_time: the "now" the decay is measured against
        decay_factor: multiplicative decay applied per elapsed `days_unit`
        days_unit: number of days making up one decay period

    Returns:
        float weight in (0, 1]; 1.0 for missing or future event times
    """
    # Missing timestamps get no penalty.
    if pd.isna(event_time):
        return 1.0

    elapsed_days = (reference_time - event_time).days
    # Future-dated events (clock skew, etc.) are treated as "just now".
    if elapsed_days < 0:
        return 1.0

    # decay_factor raised to the number of elapsed decay periods
    return decay_factor ** (elapsed_days / days_unit)


def aggregate_by_dimensions(df, behavior_weights, time_decay=True, decay_factor=0.95):
    """
    Aggregate item scores along multiple dimensions.

    Each interaction row contributes `behavior_weight * time_weight` to its
    item's score under every dimension key the row has values for.

    Args:
        df: DataFrame with at least item_id, event_type, create_time plus the
            optional dimension columns (platform, client_platform, supplier_id,
            category_level{1..4}_id). The input frame is NOT modified.
        behavior_weights: {event_type: weight}; unknown events default to 1.0
        time_decay: whether to down-weight older events
        decay_factor: decay multiplier passed to calculate_time_weight

    Returns:
        Dict: {dimension_key: {item_id: score}}
    """
    # (key prefix, source columns). A row contributes to a dimension only when
    # every listed column is non-null; multi-column values are joined with '_',
    # reproducing the original f-string key formats exactly.
    dimension_specs = [
        ('platform', ('platform',)),                               # business platform
        ('client_platform', ('client_platform',)),                 # client platform
        ('supplier', ('supplier_id',)),                            # supplier
        ('category_level1', ('category_level1_id',)),
        ('category_level2', ('category_level2_id',)),
        ('category_level3', ('category_level3_id',)),
        ('category_level4', ('category_level4_id',)),
        ('platform_client', ('platform', 'client_platform')),      # combined dims
        ('platform_category2', ('platform', 'category_level2_id')),
        ('platform_category3', ('platform', 'category_level3_id')),
        ('client_category2', ('client_platform', 'category_level2_id')),
    ]

    reference_time = datetime.now()

    # Work on a copy: previously the temporary weight columns below were added
    # to the caller's DataFrame as a side effect.
    df = df.copy()

    # Per-event behavior weight; unmapped event types count as 1.0.
    df['behavior_weight'] = df['event_type'].map(behavior_weights).fillna(1.0)

    # Optional time-decay weight per row.
    if time_decay:
        df['time_weight'] = df['create_time'].apply(
            lambda x: calculate_time_weight(x, reference_time, decay_factor)
        )
    else:
        df['time_weight'] = 1.0

    df['final_weight'] = df['behavior_weight'] * df['time_weight']

    aggregations = defaultdict(lambda: defaultdict(float))

    for _, row in df.iterrows():
        item_id = row['item_id']
        weight = row['final_weight']

        for prefix, columns in dimension_specs:
            values = [row.get(col) for col in columns]
            if all(pd.notna(v) for v in values):
                key = f"{prefix}:{'_'.join(str(v) for v in values)}"
                aggregations[key][item_id] += weight

    return aggregations


def generate_list_type_indices(df_hot, df_cart, df_new, behavior_weights):
    """
    Build the per-list-type indices (hot / cart / new items).

    Args:
        df_hot: interaction rows backing the "hot" list
        df_cart: interaction rows backing the "cart" list
        df_new: interaction rows backing the "new" list
        behavior_weights: {event_type: weight}, passed through to aggregation

    Returns:
        Dict: {list_type: aggregations}; empty inputs are skipped entirely
    """
    # (list type, source frame, apply time decay?). New items skip time decay
    # because recency is already what defines them.
    sources = [
        ('hot', df_hot, True),
        ('cart', df_cart, True),
        ('new', df_new, False),
    ]

    indices = {}
    for list_type, frame, use_decay in sources:
        if frame.empty:
            continue
        print(f"Generating {list_type} item indices...")
        indices[list_type] = aggregate_by_dimensions(
            frame, behavior_weights, time_decay=use_decay
        )
    return indices


def output_indices(aggregations, output_prefix, top_n=1000):
    """
    Write aggregated indices to a dated text file under OUTPUT_DIR.

    One line per dimension key, formatted as:
        dimension_key \\t item_id1:score1,item_id2:score2,...
    keeping only the top_n highest-scoring items per dimension; dimensions
    with no items are skipped.

    Args:
        aggregations: {dimension_key: {item_id: score}}
        output_prefix: prefix for the generated file name
        top_n: maximum number of items written per dimension key
    """
    date_tag = datetime.now().strftime("%Y%m%d")
    output_file = os.path.join(OUTPUT_DIR, f'{output_prefix}_{date_tag}.txt')

    print(f"Writing indices to {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        for dim_key, items in aggregations.items():
            # Highest score first, truncated to the top N entries.
            ranked = sorted(items.items(), key=lambda pair: pair[1], reverse=True)[:top_n]
            if not ranked:
                continue
            scores = ','.join(f'{item_id}:{score:.4f}' for item_id, score in ranked)
            f.write(f'{dim_key}\t{scores}\n')

    print(f"Output saved to: {output_file}")
    print(f"Generated indices for {len(aggregations)} dimension keys")

def main():
    """
    Entry point: pull user behavior events from the DB, build the
    hot/cart/new per-list-type indices plus a global index, and write each
    to a dated file via output_indices.
    """
    parser = argparse.ArgumentParser(description='Generate interest aggregation indices')
    parser.add_argument('--top_n', type=int, default=DEFAULT_INTEREST_TOP_N,
                       help=f'Top N items per dimension (default: {DEFAULT_INTEREST_TOP_N})')
    parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
                       help=f'Number of days to look back (default: {DEFAULT_LOOKBACK_DAYS})')
    parser.add_argument('--recent_days', type=int, default=DEFAULT_RECENT_DAYS,
                       help=f'Recent days for hot items (default: {DEFAULT_RECENT_DAYS})')
    parser.add_argument('--new_days', type=int, default=DEFAULT_RECENT_DAYS,
                       help=f'Days for new items (default: {DEFAULT_RECENT_DAYS})')
    parser.add_argument('--decay_factor', type=float, default=INTEREST_AGGREGATION_CONFIG['time_decay_factor'],
                       help='Time decay factor')
    parser.add_argument('--output_prefix', type=str, default='interest_aggregation',
                       help='Output file prefix')
    parser.add_argument('--debug', action='store_true',
                       help='Enable debug mode with detailed logging and readable output')
    
    args = parser.parse_args()
    # NOTE(review): --debug is parsed but never read below — confirm whether it
    # should gate the prints/output format or be removed.
    
    # Create the database connection
    print("Connecting to database...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )
    
    # Resolve the time windows (full lookback, recent/hot window, new-item window)
    start_date, end_date = get_time_range(args.lookback_days)
    recent_start_date, _ = get_time_range(args.recent_days)
    new_start_date, _ = get_time_range(args.new_days)
    
    print(f"Fetching data from {start_date} to {end_date}...")
    
    # SQL query - fetch user behavior events joined with item info and the
    # 4-level category hierarchy (levels extracted from prd_category.path).
    # NOTE(review): the date bounds are interpolated via f-string; they come
    # from get_time_range (internal config), not user input — keep it that way
    # or switch to bound parameters.
    sql_query = f"""
    SELECT 
        se.anonymous_id AS user_id,
        se.item_id,
        se.event AS event_type,
        se.create_time,
        pgs.name AS item_name,
        pgs.create_time AS item_create_time,
        se.business_platform AS platform,
        se.client_platform,
        pg.supplier_id,
        pg.category_id,
        pc_1.id as category_level1_id,
        pc_2.id as category_level2_id,
        pc_3.id as category_level3_id,
        pc_4.id as category_level4_id
    FROM 
        sensors_events se
    LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
    LEFT JOIN prd_goods pg ON pg.id = pgs.goods_id
    LEFT JOIN prd_category as pc ON pc.id = pg.category_id
    LEFT JOIN prd_category AS pc_1 ON pc_1.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 2), '.', -1)
    LEFT JOIN prd_category AS pc_2 ON pc_2.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 3), '.', -1)
    LEFT JOIN prd_category AS pc_3 ON pc_3.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 4), '.', -1)
    LEFT JOIN prd_category AS pc_4 ON pc_4.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 5), '.', -1)
    WHERE 
        se.event IN ('click', 'contactFactory', 'addToPool', 'addToCart', 'purchase')
        AND se.create_time >= '{start_date}'
        AND se.create_time <= '{end_date}'
        AND se.item_id IS NOT NULL
    ORDER BY 
        se.create_time
    """
    
    print("Executing SQL query...")
    df = pd.read_sql(sql_query, engine)
    print(f"Fetched {len(df)} records")
    
    # Convert time columns; invalid item creation timestamps become NaT
    df['create_time'] = pd.to_datetime(df['create_time'])
    df['item_create_time'] = pd.to_datetime(df['item_create_time'], errors='coerce')
    
    # Behavior weights from config
    behavior_weights = INTEREST_AGGREGATION_CONFIG['behavior_weights']
    
    # Prepare the per-list-type datasets
    # NOTE(review): the filters below compare datetime64 columns against
    # get_time_range() return values — assumes those are datetimes or
    # ISO-formatted strings comparable by pandas; confirm.
    
    # 1. Hot items: interactions within the recent window
    df_hot = df[df['create_time'] >= recent_start_date].copy()
    
    # 2. Cart items: add-to-cart / add-to-pool events
    df_cart = df[df['event_type'].isin(['addToCart', 'addToPool'])].copy()
    
    # 3. New items: item created within the last N days
    df_new = df[df['item_create_time'] >= new_start_date].copy()
    
    # Build the per-list-type indices (hot/cart/new)
    print("\n=== Generating indices ===")
    list_type_indices = generate_list_type_indices(
        df_hot, df_cart, df_new, behavior_weights
    )
    
    # Write each per-list-type index out
    for list_type, aggregations in list_type_indices.items():
        output_prefix = f'{args.output_prefix}_{list_type}'
        output_indices(aggregations, output_prefix, top_n=args.top_n)
    
    # Global index over the full lookback window.
    # NOTE(review): --decay_factor only reaches this global aggregation; the
    # per-list-type indices above use the function default — confirm intended.
    print("\nGenerating global indices...")
    global_aggregations = aggregate_by_dimensions(
        df, behavior_weights, time_decay=True, decay_factor=args.decay_factor
    )
    output_indices(global_aggregations, f'{args.output_prefix}_global', top_n=args.top_n)
    
    print("\n=== All indices generated successfully! ===")

if __name__ == '__main__':
    main()