ucf.py 6.24 KB
import sys
import json
import logging
from collections import defaultdict
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# 日志配置
logging.basicConfig(filename='logs/ucf.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 输入数据,用户对item的评分
# 暂定为0.0,也就是所有的行为都会进行考虑。如果要过滤掉只有一次点击的,可以设定为1.1,1分是一次点击,有点击阅读页或者多次点击就会达到2分以上
user_rating_threshold = 0.0
# 当某个用于基于最近邻推荐输出的item list低于多少时不输出
least_items_size_to_output = 5
# 每个用户输出的top_k
top_k = 50

# 本模块的主要特点:
# 读取数据并合并同一个用户的多行记录:同一个用户可能会出现在多行中,对同一个用户的多行记录进行了合并。
# 计算用户之间的相似性:用户协同过滤的关键是计算用户之间的相似度。为了加速计算,可以使用基于向量化的余弦相似度,而避免直接计算两两用户之间的相似度。
# 为每个用户推荐物品:根据相似用户的评分,为每个用户推荐新的物品,并计算推荐得分。


# 读取数据,并合并同一个用户的多行记录
def read_input(input_file):
    user_items = defaultdict(dict)
    
    with open(input_file, 'r') as f:
        for line_num, line in enumerate(f, 1):
            try:
                uid, items_str = line.strip().split('\t')
                items = json.loads(items_str)
                for item_id, score in items.items():
                    if score < user_rating_threshold:
                        continue
                    if item_id in user_items[uid]:
                        user_items[uid][item_id] += score  # 合并相同用户的评分
                    else:
                        user_items[uid][item_id] = score
            except ValueError as ve:
                logging.error(f"Data format error at line {line_num}: {line.strip()}. Error: {ve}")
            except json.JSONDecodeError as je:
                logging.error(f"JSON parse error at line {line_num}: {line.strip()}. Error: {je}")

    logging.info(f"Input data loaded from {input_file}. Total users: {len(user_items)}")
    return user_items

# 基于物品评分构建用户-物品矩阵
def build_user_item_matrix(user_items):
    all_items = set()
    for items in user_items.values():
        all_items.update(items.keys())
    
    item_list = list(all_items)
    item_index = {item_id: idx for idx, item_id in enumerate(item_list)}

    user_list = list(user_items.keys())
    user_index = {uid: idx for idx, uid in enumerate(user_list)}
    
    user_item_matrix = np.zeros((len(user_list), len(item_list)))
    
    for uid, items in user_items.items():
        for item_id, score in items.items():
            user_item_matrix[user_index[uid]][item_index[item_id]] = score

    logging.info(f"User-item matrix built with shape: {user_item_matrix.shape}")
    
    return user_item_matrix, user_list, item_list, user_index, item_index

# 基于余弦相似度计算用户相似性矩阵
def compute_user_similarity(user_item_matrix):
    similarity_matrix = cosine_similarity(user_item_matrix)
    logging.info("User similarity matrix computed.")
    return similarity_matrix

# 基于相似用户为每个用户推荐物品
def recommend_items(user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k=50):
    recommendations = defaultdict(dict)

    for uid in user_list:
        u_idx = user_index[uid]
        similar_users = np.argsort(-similarity_matrix[u_idx])[:top_k]  # 取前top_k个相似用户

        # 遍历这些相似用户的物品,累积推荐得分
        item_scores = defaultdict(float)
        for sim_uid_idx in similar_users:
            if sim_uid_idx == u_idx:  # 跳过自己
                continue
            sim_uid = user_list[sim_uid_idx]
            for item_id, score in user_items[sim_uid].items():
                if item_id not in user_items[uid]:  # 只推荐未交互过的物品
                    item_scores[item_id] += score * similarity_matrix[u_idx][sim_uid_idx]
        
        # 将得分最高的物品推荐给用户
        recom_list = {item_id: score for item_id, score in sorted(item_scores.items(), key=lambda x: -x[1])[:top_k]}
        if len(recom_list) > least_items_size_to_output:
            recommendations[uid] = recom_list
    
    logging.info("Recommendations computed for all users.")
    return recommendations

# 输出推荐结果
def write_output(recommendations, output_file):
    try:
        with open(output_file, 'w') as f:
            for uid, rec_items in recommendations.items():
                rec_str = ",".join([f"{item_id}:{score:.2f}" for item_id, score in rec_items.items()])
                f.write(f"{uid}\t{rec_str}\n")
        logging.info(f"Recommendations written to {output_file}.")
    except Exception as e:
        logging.error(f"Error writing recommendations to {output_file}: {e}")

def main():
    if len(sys.argv) != 3:
        print("Usage: python recommend.py <input_file> <output_file>")
        logging.error("Invalid number of arguments. Expected 2 arguments: input_file and output_file.")
        sys.exit(1)
    
    input_file = sys.argv[1]
    output_file = sys.argv[2]

    logging.info(f"Starting recommendation process. Input file: {input_file}, Output file: {output_file}")

    # Step 1: 读取并合并输入
    user_items = read_input(input_file)

    if not user_items:
        logging.error(f"No valid user-item data found in {input_file}. Exiting.")
        sys.exit(1)

    # Step 2: 构建用户-物品矩阵
    user_item_matrix, user_list, item_list, user_index, item_index = build_user_item_matrix(user_items)

    # Step 3: 计算用户相似性
    similarity_matrix = compute_user_similarity(user_item_matrix)

    # Step 4: 为用户推荐物品
    recommendations = recommend_items(user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k)

    # Step 5: 输出推荐结果
    write_output(recommendations, output_file)

if __name__ == '__main__':
    main()