5ab1c29c
tangwang
first commit
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
import sys
import json
import logging
from collections import defaultdict
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
# Logging configuration: INFO and above are written to logs/ucf.log
# (path is relative to the current working directory).
# NOTE(review): the file handler does not create missing directories, so
# `logs/` presumably has to exist before this module is imported — confirm.
logging.basicConfig(filename='logs/ucf.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Input data: each user's ratings of items.
# The threshold is provisionally 0.0, i.e. every behaviour is taken into
# account. To drop items that only received a single click, set it to 1.1:
# one click is worth 1 point, while a click through to the reading page or
# several clicks reach 2 points or more.
# read_input skips any score that is below this value.
user_rating_threshold = 0.0
# Minimum size of a user's nearest-neighbour recommendation list for it to be
# output. recommend_items keeps a user only when the list holds strictly more
# than this many items.
least_items_size_to_output = 5
# top_k emitted per user. main passes it to recommend_items, where it caps both
# the number of similar users considered and the number of items kept.
top_k = 50
# Main characteristics of this module:
# Read the data and merge multiple rows of the same user: one user may appear on several lines, and those lines are merged into a single record.
# Compute the similarity between users: the key step of user-based collaborative filtering. To speed this up, a vectorised cosine similarity is used instead of computing the similarity of each user pair one by one.
# Recommend items to every user: based on the ratings of similar users, recommend new items to each user and compute a recommendation score.
# Read the input data and merge multiple rows that belong to the same user.
def read_input(input_file, rating_threshold=None):
    """Load per-user item scores from a tab-separated file.

    Each non-blank line must look like ``<uid>\\t<json object>`` where the JSON
    object maps item ids to numeric scores. A user may appear on several
    lines; scores for the same item are summed across those lines.

    Malformed lines are logged and skipped as a whole, so a bad row never
    contributes a partial set of scores.

    Args:
        input_file: Path of the file to read (decoded as UTF-8).
        rating_threshold: Scores strictly below this value are ignored.
            Defaults to the module-level ``user_rating_threshold``.

    Returns:
        A ``defaultdict(dict)`` mapping ``uid -> {item_id: summed_score}``.
        Users whose scores were all filtered out do not appear in it.
    """
    if rating_threshold is None:
        rating_threshold = user_rating_threshold

    user_items = defaultdict(dict)
    with open(input_file, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            record = line.strip()
            if not record:
                # Blank lines (e.g. a trailing newline) carry no data.
                continue

            try:
                uid, items_str = record.split('\t')
                items = json.loads(items_str)
            except json.JSONDecodeError as je:
                # Must come before ValueError: JSONDecodeError is a subclass
                # of ValueError and would otherwise never be reached.
                logging.error(f"JSON parse error at line {line_num}: {record}. Error: {je}")
                continue
            except ValueError as ve:
                # Wrong number of tab-separated fields.
                logging.error(f"Data format error at line {line_num}: {record}. Error: {ve}")
                continue

            # Valid JSON is not necessarily the expected shape: reject
            # anything that is not an object of numeric scores.
            scores_are_numeric = isinstance(items, dict) and all(
                isinstance(score, (int, float)) and not isinstance(score, bool)
                for score in items.values()
            )
            if not scores_are_numeric:
                logging.error(
                    f"Data format error at line {line_num}: {record}. "
                    f"Error: expected a JSON object mapping item ids to numeric scores"
                )
                continue

            for item_id, score in items.items():
                if score < rating_threshold:
                    continue
                merged = user_items[uid]
                if item_id in merged:
                    merged[item_id] += score  # merge scores of the same user
                else:
                    merged[item_id] = score

    logging.info(f"Input data loaded from {input_file}. Total users: {len(user_items)}")
    return user_items
# Build the user-item matrix from the item scores.
def build_user_item_matrix(user_items):
    """Turn the ``uid -> {item_id: score}`` mapping into a dense matrix.

    Rows follow the iteration order of ``user_items``; columns follow the
    order in which item ids are first encountered. Both orders are therefore
    reproducible between runs (iterating a ``set`` of strings, as a plain
    set-based collection would, depends on per-process hash randomization).

    Args:
        user_items: Mapping ``uid -> {item_id: score}``.

    Returns:
        A tuple ``(user_item_matrix, user_list, item_list, user_index,
        item_index)`` where ``user_item_matrix`` is a float ``numpy`` array of
        shape ``(len(user_list), len(item_list))`` holding the scores (0 where
        a user has no score for an item), the two lists give the row/column
        labels, and the two dicts map a label back to its row/column number.
    """
    user_list = list(user_items)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    item_list = list(dict.fromkeys(
        item_id for items in user_items.values() for item_id in items
    ))

    user_index = {uid: row for row, uid in enumerate(user_list)}
    item_index = {item_id: col for col, item_id in enumerate(item_list)}

    user_item_matrix = np.zeros((len(user_list), len(item_list)))
    for uid, items in user_items.items():
        row = user_index[uid]
        for item_id, score in items.items():
            user_item_matrix[row, item_index[item_id]] = score

    logging.info(f"User-item matrix built with shape: {user_item_matrix.shape}")
    return user_item_matrix, user_list, item_list, user_index, item_index
# Compute the user-user similarity matrix with cosine similarity.
def compute_user_similarity(user_item_matrix):
    """Return the pairwise cosine similarity of the rows of the given matrix.

    Args:
        user_item_matrix: Matrix whose rows are compared with each other; it
            is handed unchanged to ``cosine_similarity``.

    Returns:
        The result of ``cosine_similarity(user_item_matrix)``.
    """
    user_similarities = cosine_similarity(user_item_matrix)
    logging.info("User similarity matrix computed.")
    return user_similarities
# Recommend items to every user based on that user's most similar users.
def recommend_items(user_items, user_list, item_list, user_index, item_index,
                    similarity_matrix, top_k=50, min_items=None):
    """Score unseen items for each user from the ratings of similar users.

    For every user the ``top_k`` most similar *other* users with a strictly
    positive similarity are selected. Each item those neighbours rated and
    the user has not interacted with accumulates
    ``neighbour_score * similarity``. The ``top_k`` best-scoring items are
    kept.

    Args:
        user_items: Mapping ``uid -> {item_id: score}``.
        user_list: User ids in the row order of ``similarity_matrix``.
        item_list: Unused; kept for interface compatibility.
        user_index: Mapping ``uid -> row number`` in ``similarity_matrix``.
        item_index: Unused; kept for interface compatibility.
        similarity_matrix: Square user-user similarity matrix.
        top_k: Cap on both the number of neighbours and the number of
            recommended items per user.
        min_items: A user is only emitted when the recommendation list holds
            strictly more than this many items. Defaults to the module-level
            ``least_items_size_to_output``.

    Returns:
        A ``defaultdict(dict)`` mapping ``uid -> {item_id: score}`` with items
        ordered from highest to lowest score.
    """
    if min_items is None:
        min_items = least_items_size_to_output

    recommendations = defaultdict(dict)
    for uid in user_list:
        u_idx = user_index[uid]
        sims = np.asarray(similarity_matrix[u_idx])

        # Drop the user's own row *before* truncating, so that exactly top_k
        # neighbours are considered (slicing first silently loses one slot to
        # the user itself). A stable sort makes tie order reproducible.
        ranked = np.argsort(-sims, kind='stable')
        neighbours = ranked[ranked != u_idx][:top_k]

        already_seen = user_items[uid]
        item_scores = defaultdict(float)
        for n_idx in neighbours:
            weight = sims[n_idx]
            if not weight > 0:
                # Neighbours are sorted by descending similarity: everything
                # from here on is unrelated and would only add 0-score items.
                break
            for item_id, score in user_items[user_list[n_idx]].items():
                if item_id not in already_seen:  # only recommend unseen items
                    item_scores[item_id] += score * weight

        # Keep the highest-scoring items for this user.
        best = sorted(item_scores.items(), key=lambda pair: -pair[1])[:top_k]
        # NOTE(review): the comparison is strict, so a list of exactly
        # `min_items` entries is dropped as well — confirm this is intended.
        if len(best) > min_items:
            recommendations[uid] = dict(best)

    logging.info("Recommendations computed for all users.")
    return recommendations
# Write the recommendation results to disk.
def write_output(recommendations, output_file):
    """Dump the recommendations as one tab-separated line per user.

    Each line has the form ``<uid>\\t<item>:<score>,<item>:<score>,...`` with
    scores rendered to two decimals. Any exception raised while writing is
    logged and swallowed, so the function always returns ``None``.

    Args:
        recommendations: Mapping ``uid -> {item_id: score}``.
        output_file: Path of the file to (over)write.
    """
    try:
        with open(output_file, 'w') as out:
            for uid, rec_items in recommendations.items():
                formatted_pairs = []
                for item_id, score in rec_items.items():
                    formatted_pairs.append(f"{item_id}:{score:.2f}")
                rec_str = ",".join(formatted_pairs)
                out.write("{}\t{}\n".format(uid, rec_str))
        logging.info(f"Recommendations written to {output_file}.")
    except Exception as e:
        logging.error(f"Error writing recommendations to {output_file}: {e}")
def main():
    """Command-line entry point: read ratings, build recommendations, write them.

    Expects exactly two command-line arguments, the input file and the output
    file. Exits with status 1 when the argument count is wrong or when the
    input yields no user data.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print("Usage: python recommend.py <input_file> <output_file>")
        logging.error("Invalid number of arguments. Expected 2 arguments: input_file and output_file.")
        sys.exit(1)

    input_file, output_file = cli_args
    logging.info(f"Starting recommendation process. Input file: {input_file}, Output file: {output_file}")

    # Step 1: read the input and merge rows of the same user.
    user_items = read_input(input_file)
    if not user_items:
        logging.error(f"No valid user-item data found in {input_file}. Exiting.")
        sys.exit(1)

    # Step 2: build the user-item matrix.
    matrix_bundle = build_user_item_matrix(user_items)
    user_item_matrix, user_list, item_list, user_index, item_index = matrix_bundle

    # Step 3: compute the similarity between users.
    similarity_matrix = compute_user_similarity(user_item_matrix)

    # Step 4: recommend items to the users.
    recommendations = recommend_items(
        user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k
    )

    # Step 5: write the recommendation results.
    write_output(recommendations, output_file)


if __name__ == '__main__':
    main()
|