import sys import json import logging from collections import defaultdict from sklearn.metrics.pairwise import cosine_similarity import numpy as np # 日志配置 logging.basicConfig(filename='logs/ucf.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 输入数据,用户对item的评分 # 暂定为0.0,也就是所有的行为都会进行考虑。如果要过滤掉只有一次点击的,可以设定为1.1,1分是一次点击,有点击阅读页或者多次点击就会达到2分以上 user_rating_threshold = 0.0 # 当某个用于基于最近邻推荐输出的item list低于多少时不输出 least_items_size_to_output = 5 # 每个用户输出的top_k top_k = 50 # 本模块的主要特点: # 读取数据并合并同一个用户的多行记录:同一个用户可能会出现在多行中,对同一个用户的多行记录进行了合并。 # 计算用户之间的相似性:用户协同过滤的关键是计算用户之间的相似度。为了加速计算,可以使用基于向量化的余弦相似度,而避免直接计算两两用户之间的相似度。 # 为每个用户推荐物品:根据相似用户的评分,为每个用户推荐新的物品,并计算推荐得分。 # 读取数据,并合并同一个用户的多行记录 def read_input(input_file): user_items = defaultdict(dict) with open(input_file, 'r') as f: for line_num, line in enumerate(f, 1): try: uid, items_str = line.strip().split('\t') items = json.loads(items_str) for item_id, score in items.items(): if score < user_rating_threshold: continue if item_id in user_items[uid]: user_items[uid][item_id] += score # 合并相同用户的评分 else: user_items[uid][item_id] = score except ValueError as ve: logging.error(f"Data format error at line {line_num}: {line.strip()}. Error: {ve}") except json.JSONDecodeError as je: logging.error(f"JSON parse error at line {line_num}: {line.strip()}. Error: {je}") logging.info(f"Input data loaded from {input_file}. Total users: {len(user_items)}") return user_items # 基于物品评分构建用户-物品矩阵 def build_user_item_matrix(user_items): all_items = set() for items in user_items.values(): all_items.update(items.keys()) item_list = list(all_items) item_index = {item_id: idx for idx, item_id in enumerate(item_list)} user_list = list(user_items.keys()) user_index = {uid: idx for idx, uid in enumerate(user_list)} user_item_matrix = np.zeros((len(user_list), len(item_list))) for uid, items in user_items.items(): for item_id, score in items.items(): user_item_matrix[user_index[uid]][item_index[item_id]] = score logging.info(f"User-item matrix built with shape: {user_item_matrix.shape}") return user_item_matrix, user_list, item_list, user_index, item_index # 基于余弦相似度计算用户相似性矩阵 def compute_user_similarity(user_item_matrix): similarity_matrix = cosine_similarity(user_item_matrix) logging.info("User similarity matrix computed.") return similarity_matrix # 基于相似用户为每个用户推荐物品 def recommend_items(user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k=50): recommendations = defaultdict(dict) for uid in user_list: u_idx = user_index[uid] similar_users = np.argsort(-similarity_matrix[u_idx])[:top_k] # 取前top_k个相似用户 # 遍历这些相似用户的物品,累积推荐得分 item_scores = defaultdict(float) for sim_uid_idx in similar_users: if sim_uid_idx == u_idx: # 跳过自己 continue sim_uid = user_list[sim_uid_idx] for item_id, score in user_items[sim_uid].items(): if item_id not in user_items[uid]: # 只推荐未交互过的物品 item_scores[item_id] += score * similarity_matrix[u_idx][sim_uid_idx] # 将得分最高的物品推荐给用户 recom_list = {item_id: score for item_id, score in sorted(item_scores.items(), key=lambda x: -x[1])[:top_k]} if len(recom_list) > least_items_size_to_output: recommendations[uid] = recom_list logging.info("Recommendations computed for all users.") return recommendations # 输出推荐结果 def write_output(recommendations, output_file): try: with open(output_file, 'w') as f: for uid, rec_items in recommendations.items(): rec_str = ",".join([f"{item_id}:{score:.2f}" for item_id, score in rec_items.items()]) f.write(f"{uid}\t{rec_str}\n") logging.info(f"Recommendations written to {output_file}.") except Exception as e: logging.error(f"Error writing recommendations to {output_file}: {e}") def main(): if len(sys.argv) != 3: print("Usage: python recommend.py ") logging.error("Invalid number of arguments. Expected 2 arguments: input_file and output_file.") sys.exit(1) input_file = sys.argv[1] output_file = sys.argv[2] logging.info(f"Starting recommendation process. Input file: {input_file}, Output file: {output_file}") # Step 1: 读取并合并输入 user_items = read_input(input_file) if not user_items: logging.error(f"No valid user-item data found in {input_file}. Exiting.") sys.exit(1) # Step 2: 构建用户-物品矩阵 user_item_matrix, user_list, item_list, user_index, item_index = build_user_item_matrix(user_items) # Step 3: 计算用户相似性 similarity_matrix = compute_user_similarity(user_item_matrix) # Step 4: 为用户推荐物品 recommendations = recommend_items(user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k) # Step 5: 输出推荐结果 write_output(recommendations, output_file) if __name__ == '__main__': main()