i2i_deepwalk.py 10.3 KB
"""
i2i - DeepWalk算法实现
基于用户-物品图结构训练DeepWalk模型,获取物品向量相似度
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

import pandas as pd
import argparse
from datetime import datetime
from collections import defaultdict
from gensim.models import Word2Vec
import numpy as np
from db_service import create_db_connection
from offline_tasks.config.offline_config import (
    DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range,
    DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N
)


def build_item_graph(df, behavior_weights):
    """
    构建物品图(基于用户共同交互)
    
    Args:
        df: DataFrame with columns: user_id, item_id, event_type
        behavior_weights: 行为权重字典
    
    Returns:
        edge_dict: {item_id: {neighbor_id: weight}}
    """
    # 构建用户-物品列表
    user_items = defaultdict(list)
    
    for _, row in df.iterrows():
        user_id = row['user_id']
        item_id = str(row['item_id'])
        event_type = row['event_type']
        weight = behavior_weights.get(event_type, 1.0)
        
        user_items[user_id].append((item_id, weight))
    
    # 构建物品图边
    edge_dict = defaultdict(lambda: defaultdict(float))
    
    for user_id, items in user_items.items():
        # 物品两两组合,构建边
        for i in range(len(items)):
            item_i, weight_i = items[i]
            for j in range(i + 1, len(items)):
                item_j, weight_j = items[j]
                
                # 边的权重为两个物品权重的平均值
                edge_weight = (weight_i + weight_j) / 2.0
                edge_dict[item_i][item_j] += edge_weight
                edge_dict[item_j][item_i] += edge_weight
    
    return edge_dict


def save_edge_file(edge_dict, output_path):
    """
    保存边文件
    
    Args:
        edge_dict: 边字典
        output_path: 输出路径
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        for item_id, neighbors in edge_dict.items():
            # 格式: item_id \t neighbor1:weight1,neighbor2:weight2,...
            neighbor_str = ','.join([f'{nbr}:{weight:.4f}' for nbr, weight in neighbors.items()])
            f.write(f'{item_id}\t{neighbor_str}\n')
    
    print(f"Edge file saved to {output_path}")


def random_walk(graph, start_node, walk_length):
    """
    执行随机游走
    
    Args:
        graph: 图结构 {node: {neighbor: weight}}
        start_node: 起始节点
        walk_length: 游走长度
    
    Returns:
        游走序列
    """
    walk = [start_node]
    
    while len(walk) < walk_length:
        cur = walk[-1]
        
        if cur not in graph or not graph[cur]:
            break
        
        # 获取邻居和权重
        neighbors = list(graph[cur].keys())
        weights = list(graph[cur].values())
        
        # 归一化权重
        total_weight = sum(weights)
        if total_weight == 0:
            break
        
        probs = [w / total_weight for w in weights]
        
        # 按权重随机选择下一个节点
        next_node = np.random.choice(neighbors, p=probs)
        walk.append(next_node)
    
    return walk


def generate_walks(graph, num_walks, walk_length):
    """
    生成随机游走序列
    
    Args:
        graph: 图结构
        num_walks: 每个节点的游走次数
        walk_length: 游走长度
    
    Returns:
        List of walks
    """
    walks = []
    nodes = list(graph.keys())
    
    print(f"Generating {num_walks} walks per node, walk length {walk_length}...")
    
    for _ in range(num_walks):
        np.random.shuffle(nodes)
        for node in nodes:
            walk = random_walk(graph, node, walk_length)
            if len(walk) >= 2:
                walks.append(walk)
    
    return walks


def train_word2vec(walks, config):
    """
    训练Word2Vec模型
    
    Args:
        walks: 游走序列列表
        config: Word2Vec配置
    
    Returns:
        Word2Vec模型
    """
    print(f"Training Word2Vec with {len(walks)} walks...")
    
    model = Word2Vec(
        sentences=walks,
        vector_size=config['vector_size'],
        window=config['window_size'],
        min_count=config['min_count'],
        workers=config['workers'],
        sg=config['sg'],
        epochs=config['epochs'],
        seed=42
    )
    
    print(f"Training completed. Vocabulary size: {len(model.wv)}")
    return model


def generate_similarities(model, top_n=50):
    """
    生成物品相似度
    
    Args:
        model: Word2Vec模型
        top_n: Top N similar items
    
    Returns:
        Dict[item_id, List[Tuple(similar_item_id, score)]]
    """
    result = {}
    
    for item_id in model.wv.index_to_key:
        try:
            similar_items = model.wv.most_similar(item_id, topn=top_n)
            result[item_id] = [(sim_id, float(score)) for sim_id, score in similar_items]
        except KeyError:
            continue
    
    return result


def main():
    parser = argparse.ArgumentParser(description='Run DeepWalk for i2i similarity')
    parser.add_argument('--num_walks', type=int, default=I2I_CONFIG['deepwalk']['num_walks'],
                       help='Number of walks per node')
    parser.add_argument('--walk_length', type=int, default=I2I_CONFIG['deepwalk']['walk_length'],
                       help='Walk length')
    parser.add_argument('--window_size', type=int, default=I2I_CONFIG['deepwalk']['window_size'],
                       help='Window size for Word2Vec')
    parser.add_argument('--vector_size', type=int, default=I2I_CONFIG['deepwalk']['vector_size'],
                       help='Vector size for Word2Vec')
    parser.add_argument('--min_count', type=int, default=I2I_CONFIG['deepwalk']['min_count'],
                       help='Minimum word count')
    parser.add_argument('--workers', type=int, default=I2I_CONFIG['deepwalk']['workers'],
                       help='Number of workers')
    parser.add_argument('--epochs', type=int, default=I2I_CONFIG['deepwalk']['epochs'],
                       help='Number of epochs')
    parser.add_argument('--top_n', type=int, default=DEFAULT_I2I_TOP_N,
                       help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})')
    parser.add_argument('--lookback_days', type=int, default=DEFAULT_LOOKBACK_DAYS,
                       help=f'Number of days to look back (default: {DEFAULT_LOOKBACK_DAYS})')
    parser.add_argument('--output', type=str, default=None,
                       help='Output file path')
    parser.add_argument('--save_model', action='store_true',
                       help='Save Word2Vec model')
    parser.add_argument('--save_graph', action='store_true',
                       help='Save graph edge file')
    parser.add_argument('--debug', action='store_true',
                       help='Enable debug mode with detailed logging and readable output')
    
    args = parser.parse_args()
    
    # 创建数据库连接
    print("Connecting to database...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )
    
    # 获取时间范围
    start_date, end_date = get_time_range(args.lookback_days)
    print(f"Fetching data from {start_date} to {end_date}...")
    
    # SQL查询 - 获取用户行为数据
    sql_query = f"""
    SELECT 
        se.anonymous_id AS user_id,
        se.item_id,
        se.event AS event_type,
        pgs.name AS item_name
    FROM 
        sensors_events se
    LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
    WHERE 
        se.event IN ('click', 'contactFactory', 'addToPool', 'addToCart', 'purchase')
        AND se.create_time >= '{start_date}'
        AND se.create_time <= '{end_date}'
        AND se.item_id IS NOT NULL
        AND se.anonymous_id IS NOT NULL
    """
    
    print("Executing SQL query...")
    df = pd.read_sql(sql_query, engine)
    print(f"Fetched {len(df)} records")
    
    # 定义行为权重
    behavior_weights = {
        'click': 1.0,
        'contactFactory': 5.0,
        'addToPool': 2.0,
        'addToCart': 3.0,
        'purchase': 10.0
    }
    
    # 构建物品图
    print("Building item graph...")
    graph = build_item_graph(df, behavior_weights)
    print(f"Graph built with {len(graph)} nodes")
    
    # 保存边文件(可选)
    if args.save_graph:
        edge_file = os.path.join(OUTPUT_DIR, f'item_graph_{datetime.now().strftime("%Y%m%d")}.txt')
        save_edge_file(graph, edge_file)
    
    # 生成随机游走
    print("Generating random walks...")
    walks = generate_walks(graph, args.num_walks, args.walk_length)
    print(f"Generated {len(walks)} walks")
    
    # 训练Word2Vec模型
    w2v_config = {
        'vector_size': args.vector_size,
        'window_size': args.window_size,
        'min_count': args.min_count,
        'workers': args.workers,
        'epochs': args.epochs,
        'sg': 1
    }
    
    model = train_word2vec(walks, w2v_config)
    
    # 保存模型(可选)
    if args.save_model:
        model_path = os.path.join(OUTPUT_DIR, f'deepwalk_model_{datetime.now().strftime("%Y%m%d")}.model')
        model.save(model_path)
        print(f"Model saved to {model_path}")
    
    # 生成相似度
    print("Generating similarities...")
    result = generate_similarities(model, top_n=args.top_n)
    
    # 创建item_id到name的映射
    item_name_map = dict(zip(df['item_id'].astype(str), df.groupby('item_id')['item_name'].first()))
    
    # 输出结果
    output_file = args.output or os.path.join(OUTPUT_DIR, f'i2i_deepwalk_{datetime.now().strftime("%Y%m%d")}.txt')
    
    print(f"Writing results to {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        for item_id, sims in result.items():
            item_name = item_name_map.get(item_id, 'Unknown')
            
            if not sims:
                continue
            
            # 格式:item_id \t item_name \t similar_item_id1:score1,similar_item_id2:score2,...
            sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])
            f.write(f'{item_id}\t{item_name}\t{sim_str}\n')
    
    print(f"Done! Generated i2i similarities for {len(result)} items")
    print(f"Output saved to: {output_file}")


if __name__ == '__main__':
    main()