""" 兴趣点聚合索引生成 按照多个维度(平台、国家、客户类型、分类、列表类型)生成商品索引 """ import pandas as pd import math import os import argparse import json from datetime import datetime, timedelta from collections import defaultdict, Counter from db_service import create_db_connection from config.offline_config import ( DB_CONFIG, OUTPUT_DIR, INTEREST_AGGREGATION_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_RECENT_DAYS, DEFAULT_INTEREST_TOP_N ) from scripts.debug_utils import ( setup_debug_logger, log_dataframe_info, log_dict_stats, save_readable_index, fetch_name_mappings, log_algorithm_params, log_processing_step ) def calculate_time_weight(event_time, reference_time, decay_factor=0.95, days_unit=30): """ 计算时间衰减权重 Args: event_time: 事件发生时间 reference_time: 参考时间(当前时间) decay_factor: 衰减因子 days_unit: 衰减单位(天) Returns: 时间权重 """ if pd.isna(event_time): return 1.0 time_diff = (reference_time - event_time).days if time_diff < 0: return 1.0 # 计算衰减权重 periods = time_diff / days_unit weight = math.pow(decay_factor, periods) return weight def aggregate_by_dimensions(df, behavior_weights, time_decay=True, decay_factor=0.95): """ 按多维度聚合商品 Args: df: DataFrame with necessary columns behavior_weights: 行为权重字典 time_decay: 是否使用时间衰减 decay_factor: 时间衰减因子 Returns: Dict: {dimension_key: {item_id: score}} """ reference_time = datetime.now() # 添加行为权重 df['behavior_weight'] = df['event_type'].map(behavior_weights).fillna(1.0) # 添加时间权重 if time_decay: df['time_weight'] = df['create_time'].apply( lambda x: calculate_time_weight(x, reference_time, decay_factor) ) else: df['time_weight'] = 1.0 # 计算最终权重 df['final_weight'] = df['behavior_weight'] * df['time_weight'] # 初始化聚合结果 aggregations = defaultdict(lambda: defaultdict(float)) # 遍历数据,按不同维度聚合 for _, row in df.iterrows(): item_id = row['item_id'] weight = row['final_weight'] # 维度1: 业务平台 (business_platform) if pd.notna(row.get('platform')): key = f"platform:{row['platform']}" aggregations[key][item_id] += weight # 维度2: 客户端平台 (client_platform) if pd.notna(row.get('client_platform')): key = f"client_platform:{row['client_platform']}" aggregations[key][item_id] += weight # 维度3: 供应商 (supplier_id) if pd.notna(row.get('supplier_id')): key = f"supplier:{row['supplier_id']}" aggregations[key][item_id] += weight # 维度4: 一级分类 (category_level1) if pd.notna(row.get('category_level1_id')): key = f"category_level1:{row['category_level1_id']}" aggregations[key][item_id] += weight # 维度5: 二级分类 (category_level2) if pd.notna(row.get('category_level2_id')): key = f"category_level2:{row['category_level2_id']}" aggregations[key][item_id] += weight # 维度6: 三级分类 (category_level3) if pd.notna(row.get('category_level3_id')): key = f"category_level3:{row['category_level3_id']}" aggregations[key][item_id] += weight # 维度7: 四级分类 (category_level4) if pd.notna(row.get('category_level4_id')): key = f"category_level4:{row['category_level4_id']}" aggregations[key][item_id] += weight # 组合维度: 业务平台 + 客户端平台 if pd.notna(row.get('platform')) and pd.notna(row.get('client_platform')): key = f"platform_client:{row['platform']}_{row['client_platform']}" aggregations[key][item_id] += weight # 组合维度: 平台 + 二级分类 if pd.notna(row.get('platform')) and pd.notna(row.get('category_level2_id')): key = f"platform_category2:{row['platform']}_{row['category_level2_id']}" aggregations[key][item_id] += weight # 组合维度: 平台 + 三级分类 if pd.notna(row.get('platform')) and pd.notna(row.get('category_level3_id')): key = f"platform_category3:{row['platform']}_{row['category_level3_id']}" aggregations[key][item_id] += weight # 组合维度: 客户端平台 + 二级分类 if 

def aggregate_by_dimensions(df, behavior_weights, time_decay=True, decay_factor=0.95):
    """
    Aggregate item scores along multiple dimensions.

    Args:
        df: DataFrame with item_id, event_type, create_time, platform,
            client_platform, supplier_id and category_level{1..4}_id columns.
        behavior_weights: Mapping of event_type -> weight.
        time_decay: Whether to apply time decay.
        decay_factor: Time-decay factor.

    Returns:
        Dict: {dimension_key: {item_id: score}}
    """
    reference_time = datetime.now()

    # Behavior weight: unknown event types fall back to 1.0.
    df['behavior_weight'] = df['event_type'].map(behavior_weights).fillna(1.0)

    # Time weight.
    if time_decay:
        df['time_weight'] = df['create_time'].apply(
            lambda x: calculate_time_weight(x, reference_time, decay_factor)
        )
    else:
        df['time_weight'] = 1.0

    # Final per-event weight.
    df['final_weight'] = df['behavior_weight'] * df['time_weight']

    aggregations = defaultdict(lambda: defaultdict(float))

    # Walk the events and accumulate weights under each dimension key.
    for _, row in df.iterrows():
        item_id = row['item_id']
        weight = row['final_weight']

        # Dimension 1: business platform.
        if pd.notna(row.get('platform')):
            aggregations[f"platform:{row['platform']}"][item_id] += weight

        # Dimension 2: client platform.
        if pd.notna(row.get('client_platform')):
            aggregations[f"client_platform:{row['client_platform']}"][item_id] += weight

        # Dimension 3: supplier.
        if pd.notna(row.get('supplier_id')):
            aggregations[f"supplier:{row['supplier_id']}"][item_id] += weight

        # Dimensions 4-7: category levels 1 through 4.
        for level in (1, 2, 3, 4):
            cat_id = row.get(f'category_level{level}_id')
            if pd.notna(cat_id):
                aggregations[f"category_level{level}:{cat_id}"][item_id] += weight

        # Combined dimension: business platform + client platform.
        if pd.notna(row.get('platform')) and pd.notna(row.get('client_platform')):
            key = f"platform_client:{row['platform']}_{row['client_platform']}"
            aggregations[key][item_id] += weight

        # Combined dimension: platform + level-2 category.
        if pd.notna(row.get('platform')) and pd.notna(row.get('category_level2_id')):
            key = f"platform_category2:{row['platform']}_{row['category_level2_id']}"
            aggregations[key][item_id] += weight

        # Combined dimension: platform + level-3 category.
        if pd.notna(row.get('platform')) and pd.notna(row.get('category_level3_id')):
            key = f"platform_category3:{row['platform']}_{row['category_level3_id']}"
            aggregations[key][item_id] += weight

        # Combined dimension: client platform + level-2 category.
        if pd.notna(row.get('client_platform')) and pd.notna(row.get('category_level2_id')):
            key = f"client_category2:{row['client_platform']}_{row['category_level2_id']}"
            aggregations[key][item_id] += weight

    return aggregations


def generate_list_type_indices(df_hot, df_cart, df_new, behavior_weights):
    """
    Generate indices for the different list types (hot, cart, new).

    Args:
        df_hot: Hot-item events.
        df_cart: Add-to-cart events.
        df_new: Events on newly created items.
        behavior_weights: Mapping of event_type -> weight.

    Returns:
        Dict: {list_type: aggregations}
    """
    list_type_indices = {}

    # Hot-item index. Low-frequency B2B traffic, so no time decay.
    if not df_hot.empty:
        print("Generating hot item indices...")
        list_type_indices['hot'] = aggregate_by_dimensions(
            df_hot, behavior_weights, time_decay=False
        )

    # Cart-item index. Same low-frequency reasoning: no time decay.
    if not df_cart.empty:
        print("Generating cart item indices...")
        list_type_indices['cart'] = aggregate_by_dimensions(
            df_cart, behavior_weights, time_decay=False
        )

    # New-item index. No time decay either: new items are already
    # time-sensitive by construction.
    if not df_new.empty:
        print("Generating new item indices...")
        list_type_indices['new'] = aggregate_by_dimensions(
            df_new, behavior_weights, time_decay=False
        )

    return list_type_indices


def output_indices(aggregations, output_prefix, top_n=1000):
    """
    Write indices to a file.

    Args:
        aggregations: Aggregation result {dimension_key: {item_id: score}}.
        output_prefix: Output file prefix.
        top_n: Number of top items to keep per dimension.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    output_file = os.path.join(
        OUTPUT_DIR, f'{output_prefix}_{datetime.now().strftime("%Y%m%d")}.txt'
    )

    print(f"Writing indices to {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        for dim_key, items in aggregations.items():
            # Sort by score, keep the top N.
            sorted_items = sorted(items.items(), key=lambda x: -x[1])[:top_n]
            if not sorted_items:
                continue

            # Format: dimension_key \t item_id1:score1,item_id2:score2,...
            items_str = ','.join(f'{item_id}:{score:.4f}' for item_id, score in sorted_items)
            f.write(f'{dim_key}\t{items_str}\n')

    print(f"Output saved to: {output_file}")
    print(f"Generated indices for {len(aggregations)} dimension keys")

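
# Illustrative sketch (not called by the pipeline): a toy DataFrame pushed
# through aggregate_by_dimensions. The weights dict here is hypothetical;
# production runs use INTEREST_AGGREGATION_CONFIG['behavior_weights'].
def _demo_aggregation():
    toy = pd.DataFrame({
        'item_id': [101, 101, 202],
        'event_type': ['click', 'purchase', 'click'],
        'create_time': [datetime.now()] * 3,
        'platform': ['b2b', 'b2b', 'b2b'],
        'client_platform': ['web', 'web', None],
        'supplier_id': [7, 7, 8],
        'category_level1_id': [1, 1, 1],
        'category_level2_id': [12, 12, 13],
        'category_level3_id': [None] * 3,
        'category_level4_id': [None] * 3,
    })
    aggs = aggregate_by_dimensions(
        toy, {'click': 1.0, 'purchase': 5.0}, time_decay=False
    )
    # Item 101 accumulates 1.0 (click) + 5.0 (purchase) under 'platform:b2b'.
    assert abs(aggs['platform:b2b'][101] - 6.0) < 1e-9
    # output_indices would then emit one tab-separated line per dimension key:
    #   platform:b2b<TAB>101:6.0000,202:1.0000
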

def main():
    parser = argparse.ArgumentParser(description='Generate interest aggregation indices')
    parser.add_argument('--top_n', type=int, default=DEFAULT_INTEREST_TOP_N,
                        help=f'Top N items per dimension (default: {DEFAULT_INTEREST_TOP_N})')
    parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
                        help=f'Number of days to look back (default: {DEFAULT_LOOKBACK_DAYS})')
    parser.add_argument('--recent_days', type=int, default=DEFAULT_RECENT_DAYS,
                        help=f'Recent days for hot items (default: {DEFAULT_RECENT_DAYS})')
    parser.add_argument('--new_days', type=int, default=DEFAULT_RECENT_DAYS,
                        help=f'Days for new items (default: {DEFAULT_RECENT_DAYS})')
    parser.add_argument('--decay_factor', type=float,
                        default=INTEREST_AGGREGATION_CONFIG['time_decay_factor'],
                        help='Time decay factor')
    parser.add_argument('--output_prefix', type=str, default='interest_aggregation',
                        help='Output file prefix')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode with detailed logging and readable output')
    args = parser.parse_args()

    # Set up the logger.
    logger = setup_debug_logger('interest_aggregation', debug=args.debug)

    # Record algorithm parameters.
    params = {
        'top_n': args.top_n,
        'lookback_days': args.lookback_days,
        'recent_days': args.recent_days,
        'new_days': args.new_days,
        'decay_factor': args.decay_factor,
        'debug': args.debug
    }
    log_algorithm_params(logger, params)

    # Create the database connection.
    logger.info("Connecting to database...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )

    # Resolve time ranges.
    start_date, end_date = get_time_range(args.lookback_days)
    recent_start_date, _ = get_time_range(args.recent_days)
    new_start_date, _ = get_time_range(args.new_days)
    logger.info(f"Fetching data from {start_date} to {end_date}")
    logger.debug(f"Hot-item start date: {recent_start_date}")
    logger.debug(f"New-item start date: {new_start_date}")

    # SQL query: user behavior events joined with item and category data.
    # The dates come from get_time_range, not user input, so inlining them
    # into the query string is acceptable here.
    sql_query = f"""
    SELECT
        se.anonymous_id AS user_id,
        se.item_id,
        se.event AS event_type,
        se.create_time,
        pgs.name AS item_name,
        pgs.create_time AS item_create_time,
        se.business_platform AS platform,
        se.client_platform,
        pg.supplier_id,
        pg.category_id,
        pc_1.id AS category_level1_id,
        pc_2.id AS category_level2_id,
        pc_3.id AS category_level3_id,
        pc_4.id AS category_level4_id
    FROM sensors_events se
    LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
    LEFT JOIN prd_goods pg ON pg.id = pgs.goods_id
    LEFT JOIN prd_category AS pc ON pc.id = pg.category_id
    LEFT JOIN prd_category AS pc_1 ON pc_1.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 2), '.', -1)
    LEFT JOIN prd_category AS pc_2 ON pc_2.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 3), '.', -1)
    LEFT JOIN prd_category AS pc_3 ON pc_3.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 4), '.', -1)
    LEFT JOIN prd_category AS pc_4 ON pc_4.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 5), '.', -1)
    WHERE se.event IN ('click', 'contactFactory', 'addToPool', 'addToCart', 'purchase')
        AND se.create_time >= '{start_date}'
        AND se.create_time <= '{end_date}'
        AND se.item_id IS NOT NULL
    ORDER BY se.create_time
    """
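
    # Note on the category joins above (an assumption about the schema, not
    # confirmed by this script): prd_category.path is treated as a
    # dot-separated id chain that starts with a root segment, e.g.
    # '0.15.152.1523' for a level-3 category. The nested
    # SUBSTRING_INDEX(SUBSTRING_INDEX(path, '.', n), '.', -1) picks element
    # n-1 of that chain, i.e. the level-(n-1) ancestor id:
    #
    #     '0.15.152.1523'.split('.')[1]  # -> '15'   (level 1)
    #     '0.15.152.1523'.split('.')[2]  # -> '152'  (level 2)
    #
    # Paths shallower than a given level simply join to NULL, which later
    # surfaces as NaN in the category_level*_id columns.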

    logger.info("Executing SQL query...")
    df = pd.read_sql(sql_query, engine)
    logger.info(f"Fetched {len(df)} records")

    # Normalize id dtypes. The LEFT JOINs can leave NULLs in supplier_id and
    # category_id, so use pandas' nullable Int64 there; a plain int cast
    # would raise on NaN.
    df['item_id'] = df['item_id'].astype(int)
    df['user_id'] = df['user_id'].astype(str)
    if 'category_id' in df.columns:
        df['category_id'] = df['category_id'].astype('Int64')
    if 'supplier_id' in df.columns:
        df['supplier_id'] = df['supplier_id'].astype('Int64')

    # Record data info.
    log_dataframe_info(logger, df, "user behavior data")

    # Parse time columns.
    df['create_time'] = pd.to_datetime(df['create_time'])
    df['item_create_time'] = pd.to_datetime(df['item_create_time'], errors='coerce')

    # Behavior weights.
    behavior_weights = INTEREST_AGGREGATION_CONFIG['behavior_weights']
    logger.debug(f"Behavior weights: {behavior_weights}")

    # Prepare the per-list-type datasets.
    log_processing_step(logger, "Preparing per-list-type datasets")

    # 1. Hot items: high-interaction items from the last N days.
    df_hot = df[df['create_time'] >= recent_start_date].copy()
    logger.info(f"Hot-item dataset: {len(df_hot)} records")

    # 2. Cart items: add-to-cart / add-to-pool events.
    df_cart = df[df['event_type'].isin(['addToCart', 'addToPool'])].copy()
    logger.info(f"Cart-item dataset: {len(df_cart)} records")

    # 3. New items: items created within the last N days.
    df_new = df[df['item_create_time'] >= new_start_date].copy()
    logger.info(f"New-item dataset: {len(df_new)} records")

    # Generate indices per list type.
    log_processing_step(logger, "Generating per-list-type indices")
    list_type_indices = generate_list_type_indices(
        df_hot, df_cart, df_new, behavior_weights
    )
    logger.info(f"Generated indices for {len(list_type_indices)} list types")

    # Fetch item-name mappings for debug output.
    name_mappings = {}
    if args.debug:
        logger.info("Fetching item name mappings...")
        name_mappings = fetch_name_mappings(engine, debug=True)

    # Write the indices.
    log_processing_step(logger, "Saving index files")
    for list_type, aggregations in list_type_indices.items():
        output_prefix = f'{args.output_prefix}_{list_type}'
        logger.info(f"Saving indices for list type '{list_type}'...")
        output_indices(aggregations, output_prefix, top_n=args.top_n)

        # In debug mode, also save a human-readable index per dimension.
        if args.debug and aggregations:
            for dim_key, items in aggregations.items():
                if not items:
                    continue
                # Sort first, then keep the top N.
                sorted_items = sorted(items.items(), key=lambda x: -x[1])[:args.top_n]
                result_dict = {dim_key: sorted_items}
                output_file = os.path.join(
                    OUTPUT_DIR,
                    f'{output_prefix}_{dim_key}_{datetime.now().strftime("%Y%m%d")}.txt'
                )
                save_readable_index(
                    output_file,
                    result_dict,
                    name_mappings,
                    description=f'interest:{list_type}:{dim_key}'
                )

    # Generate the global index over all data.
    log_processing_step(logger, "Generating global index")
    # Low-frequency B2B traffic, so no time decay (decay_factor is passed
    # through but ignored while time_decay is False).
    global_aggregations = aggregate_by_dimensions(
        df, behavior_weights, time_decay=False, decay_factor=args.decay_factor
    )
    logger.info("Saving global index...")
    output_indices(global_aggregations, f'{args.output_prefix}_global', top_n=args.top_n)

    logger.info("=" * 80)
    logger.info("All indices generated!")
    logger.info("=" * 80)


if __name__ == '__main__':
    main()
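
# Usage sketch. The script filename below is hypothetical; the flags match
# the argparse definitions in main():
#
#     python interest_aggregation.py --lookback_days 90 --top_n 500 --debug
#
# Each run writes one index file per list type plus a global file under
# OUTPUT_DIR, e.g. interest_aggregation_hot_<YYYYMMDD>.txt and
# interest_aggregation_global_<YYYYMMDD>.txt.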