"""
i2i - DeepWalk implementation.

Builds an item graph from user-item interactions, runs weighted random walks
over it (DeepWalk), trains a Word2Vec model on the walks and writes the
top-N most similar items for every item in the learned vocabulary.
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

import pandas as pd
import argparse
from datetime import datetime
from collections import defaultdict
from gensim.models import Word2Vec
import numpy as np
from db_service import create_db_connection
from offline_tasks.config.offline_config import (
    DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range,
    DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N
)


def build_item_graph(df, behavior_weights):
    """
    Build an undirected, weighted item graph from user co-interactions.

    Every pair of interactions made by the same user contributes an edge
    between the two items. The edge weight is the mean of the two behavior
    weights, accumulated over all pairs and users. Pairs of the same item
    (repeat interactions) are skipped so the graph has no self-loops.

    Args:
        df: DataFrame with columns: user_id, item_id, event_type
        behavior_weights: dict mapping event_type -> weight; unknown event
            types get weight 1.0

    Returns:
        edge_dict: {item_id: {neighbor_id: weight}} with item ids converted
            to str; symmetric, i.e. edge_dict[a][b] == edge_dict[b][a]
    """
    # Group (item_id, weight) per user. A column-wise zip yields the same
    # values as iterrows() without building a Series for every row.
    user_items = defaultdict(list)
    for user_id, item_id, event_type in zip(df['user_id'], df['item_id'], df['event_type']):
        weight = behavior_weights.get(event_type, 1.0)
        user_items[user_id].append((str(item_id), weight))

    edge_dict = defaultdict(lambda: defaultdict(float))
    for items in user_items.values():
        # Every unordered pair of this user's interactions becomes an edge.
        for i, (item_i, weight_i) in enumerate(items):
            for item_j, weight_j in items[i + 1:]:
                # Repeat interactions with one item would create a self-loop,
                # letting random walks stall on a single node.
                if item_i == item_j:
                    continue
                edge_weight = (weight_i + weight_j) / 2.0
                edge_dict[item_i][item_j] += edge_weight
                edge_dict[item_j][item_i] += edge_weight

    return edge_dict


def save_edge_file(edge_dict, output_path):
    """
    Write the item graph as a text edge file.

    Each line has the format:
        item_id \\t neighbor1:weight1,neighbor2:weight2,...
    with weights rounded to 4 decimal places.

    Args:
        edge_dict: {item_id: {neighbor_id: weight}}
        output_path: destination file path (overwritten)
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        for item_id, neighbors in edge_dict.items():
            neighbor_str = ','.join([f'{nbr}:{weight:.4f}' for nbr, weight in neighbors.items()])
            f.write(f'{item_id}\t{neighbor_str}\n')

    print(f"Edge file saved to {output_path}")


def random_walk(graph, start_node, walk_length):
    """
    Perform one weighted random walk.

    At each step the next node is sampled among the current node's neighbors
    with probability proportional to the edge weight. The walk stops early
    when the current node has no outgoing edges or a non-positive total weight.

    Args:
        graph: {node: {neighbor: weight}}
        start_node: node to start from
        walk_length: maximum number of nodes in the walk

    Returns:
        List of nodes, starting with start_node.
    """
    walk = [start_node]
    while len(walk) < walk_length:
        # .get() avoids inserting missing keys into a defaultdict graph.
        neighbor_weights = graph.get(walk[-1])
        if not neighbor_weights:
            break

        neighbors = list(neighbor_weights.keys())
        weights = np.asarray(list(neighbor_weights.values()), dtype=float)
        total_weight = weights.sum()
        if total_weight <= 0:
            break

        # Sample an index instead of the node itself: np.random.choice on a
        # list of strings returns numpy.str_, not the original str node id.
        next_idx = np.random.choice(len(neighbors), p=weights / total_weight)
        walk.append(neighbors[next_idx])

    return walk


def generate_walks(graph, num_walks, walk_length):
    """
    Generate random-walk sequences over the graph.

    Each of the num_walks rounds visits every node once, in a freshly
    shuffled order, and starts one walk from it. Walks shorter than 2 nodes
    carry no co-occurrence information and are dropped.

    Args:
        graph: {node: {neighbor: weight}}
        num_walks: number of walks started from each node
        walk_length: maximum length of each walk

    Returns:
        List of walks (each a list of nodes).
    """
    walks = []
    nodes = list(graph.keys())

    print(f"Generating {num_walks} walks per node, walk length {walk_length}...")

    for _ in range(num_walks):
        np.random.shuffle(nodes)
        for node in nodes:
            walk = random_walk(graph, node, walk_length)
            if len(walk) >= 2:
                walks.append(walk)

    return walks


def train_word2vec(walks, config):
    """
    Train a Word2Vec model on the random walks.

    Args:
        walks: list of walks (each a list of node ids)
        config: dict with keys vector_size, window_size, min_count, workers,
            sg, epochs

    Returns:
        The trained gensim Word2Vec model (seed fixed at 42).
    """
    print(f"Training Word2Vec with {len(walks)} walks...")

    model = Word2Vec(
        sentences=walks,
        vector_size=config['vector_size'],
        window=config['window_size'],
        min_count=config['min_count'],
        workers=config['workers'],
        sg=config['sg'],
        epochs=config['epochs'],
        seed=42
    )

    print(f"Training completed. Vocabulary size: {len(model.wv)}")
    return model


def generate_similarities(model, top_n=50):
    """
    Compute the top-N most similar items for every item in the vocabulary.

    Args:
        model: trained Word2Vec model
        top_n: number of similar items to keep per item

    Returns:
        Dict[item_id, List[Tuple(similar_item_id, score)]]
    """
    result = {}

    for item_id in model.wv.index_to_key:
        try:
            similar_items = model.wv.most_similar(item_id, topn=top_n)
            result[item_id] = [(sim_id, float(score)) for sim_id, score in similar_items]
        except KeyError:
            continue

    return result


def _clean_item_name(item_name):
    """Return a TSV-safe display name: 'Unknown' for null, tabs/newlines -> spaces."""
    if item_name is None or pd.isna(item_name):
        return 'Unknown'
    return str(item_name).replace('\t', ' ').replace('\r', ' ').replace('\n', ' ')


def main():
    """Fetch behavior data, train DeepWalk and write the i2i similarity file."""
    parser = argparse.ArgumentParser(description='Run DeepWalk for i2i similarity')
    parser.add_argument('--num_walks', type=int, default=I2I_CONFIG['deepwalk']['num_walks'],
                        help='Number of walks per node')
    parser.add_argument('--walk_length', type=int, default=I2I_CONFIG['deepwalk']['walk_length'],
                        help='Walk length')
    parser.add_argument('--window_size', type=int, default=I2I_CONFIG['deepwalk']['window_size'],
                        help='Window size for Word2Vec')
    parser.add_argument('--vector_size', type=int, default=I2I_CONFIG['deepwalk']['vector_size'],
                        help='Vector size for Word2Vec')
    parser.add_argument('--min_count', type=int, default=I2I_CONFIG['deepwalk']['min_count'],
                        help='Minimum word count')
    parser.add_argument('--workers', type=int, default=I2I_CONFIG['deepwalk']['workers'],
                        help='Number of workers')
    parser.add_argument('--epochs', type=int, default=I2I_CONFIG['deepwalk']['epochs'],
                        help='Number of epochs')
    parser.add_argument('--top_n', type=int, default=DEFAULT_I2I_TOP_N,
                        help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})')
    parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
                        help=f'Number of days to look back (default: {DEFAULT_LOOKBACK_DAYS})')
    parser.add_argument('--output', type=str, default=None,
                        help='Output file path')
    parser.add_argument('--save_model', action='store_true',
                        help='Save Word2Vec model')
    parser.add_argument('--save_graph', action='store_true',
                        help='Save graph edge file')
    # NOTE(review): --debug is parsed but not read anywhere in this script.
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode with detailed logging and readable output')

    args = parser.parse_args()

    # Create the database connection
    print("Connecting to database...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )

    # Resolve the time window
    start_date, end_date = get_time_range(args.lookback_days)
    print(f"Fetching data from {start_date} to {end_date}...")

    # SQL query - user behavior data. start_date/end_date are produced by
    # get_time_range() from an int CLI argument (presumably date strings),
    # not taken from free-form external input.
    sql_query = f"""
    SELECT
        se.anonymous_id AS user_id,
        se.item_id,
        se.event AS event_type,
        pgs.name AS item_name
    FROM
        sensors_events se
    LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
    WHERE
        se.event IN ('click', 'contactFactory', 'addToPool', 'addToCart', 'purchase')
        AND se.create_time >= '{start_date}'
        AND se.create_time <= '{end_date}'
        AND se.item_id IS NOT NULL
        AND se.anonymous_id IS NOT NULL
    """

    print("Executing SQL query...")
    df = pd.read_sql(sql_query, engine)
    print(f"Fetched {len(df)} records")

    # Behavior weights: stronger intent signals weigh more
    behavior_weights = {
        'click': 1.0,
        'contactFactory': 5.0,
        'addToPool': 2.0,
        'addToCart': 3.0,
        'purchase': 10.0
    }

    # Build the item graph
    print("Building item graph...")
    graph = build_item_graph(df, behavior_weights)
    print(f"Graph built with {len(graph)} nodes")

    # One date tag for every artifact written by this run
    date_tag = datetime.now().strftime("%Y%m%d")

    # Optionally save the edge file
    if args.save_graph:
        edge_file = os.path.join(OUTPUT_DIR, f'item_graph_{date_tag}.txt')
        save_edge_file(graph, edge_file)

    # Generate random walks
    print("Generating random walks...")
    walks = generate_walks(graph, args.num_walks, args.walk_length)
    print(f"Generated {len(walks)} walks")

    # Word2Vec cannot train on an empty corpus; fail with a clear message
    # instead of gensim's opaque "build vocabulary" RuntimeError.
    if not walks:
        sys.exit("No random walks generated (empty interaction graph); aborting.")

    # Train the Word2Vec model (skip-gram)
    w2v_config = {
        'vector_size': args.vector_size,
        'window_size': args.window_size,
        'min_count': args.min_count,
        'workers': args.workers,
        'epochs': args.epochs,
        'sg': 1
    }

    model = train_word2vec(walks, w2v_config)

    # Optionally save the model
    if args.save_model:
        model_path = os.path.join(OUTPUT_DIR, f'deepwalk_model_{date_tag}.model')
        model.save(model_path)
        print(f"Model saved to {model_path}")

    # Compute similarities
    print("Generating similarities...")
    result = generate_similarities(model, top_n=args.top_n)

    # item_id (stringified exactly as the graph node ids) -> first non-null name.
    # Grouping and converting in one step keeps ids and names aligned.
    item_name_map = df.groupby(df['item_id'].astype(str))['item_name'].first().to_dict()

    # Write results
    output_file = args.output or os.path.join(OUTPUT_DIR, f'i2i_deepwalk_{date_tag}.txt')
    print(f"Writing results to {output_file}...")

    with open(output_file, 'w', encoding='utf-8') as f:
        for item_id, sims in result.items():
            if not sims:
                continue

            item_name = _clean_item_name(item_name_map.get(item_id))

            # Format: item_id \t item_name \t similar_item_id1:score1,similar_item_id2:score2,...
            sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])
            f.write(f'{item_id}\t{item_name}\t{sim_str}\n')

    print(f"Done! Generated i2i similarities for {len(result)} items")
    print(f"Output saved to: {output_file}")


if __name__ == '__main__':
    main()