5b954396
tangwang
add cos sim
|
1
2
|
import pandas as pd
import math
|
e89d7a84
tangwang
deepwalk refactor...
|
3
|
import os
|
5b954396
tangwang
add cos sim
|
4
5
6
7
8
|
from collections import defaultdict
from sqlalchemy import create_engine
from db_service import create_db_connection
import argparse
from datetime import datetime
|
c59dd0b0
tangwang
补充部分任务明文版本输出
|
9
|
from scripts.debug_utils import save_readable_index
|
789edb14
tangwang
add logs
|
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
import logging
def setup_logger():
"""设置logger配置"""
# 创建logs目录
logs_dir = 'logs'
os.makedirs(logs_dir, exist_ok=True)
# 创建logger
logger = logging.getLogger('i2i_item_behavior')
logger.setLevel(logging.INFO)
# 避免重复添加handler
if logger.handlers:
return logger
# 创建文件handler
log_file = os.path.join(logs_dir, f'i2i_item_behavior_{datetime.now().strftime("%Y%m%d")}.log')
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
# 创建控制台handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 创建formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 添加handler到logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
|
5b954396
tangwang
add cos sim
|
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
def clean_text_field(text):
if pd.isna(text):
return ''
# 移除换行符、回车符,并替换其他可能导致CSV格式问题的字符
return str(text).replace('\r', ' ').replace('\n', ' ').replace('"', '""').strip()
# 解析命令行参数
parser = argparse.ArgumentParser(description='计算基于用户行为的商品相似度(Item Similarity)')
parser.add_argument('--lookback_days', type=int, default=180, help='回溯天数,默认180天')
parser.add_argument('--top_n', type=int, default=50, help='每个商品保留的相似商品数量,默认50')
parser.add_argument('--debug', action='store_true', help='开启debug模式')
args = parser.parse_args()
|
9eb36bd2
tangwang
add logs
|
59
60
61
|
# 初始化logger
logger = setup_logger()
|
5b954396
tangwang
add cos sim
|
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# 数据库连接配置
host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com'
port = '9030'
database = 'datacenter'
username = 'readonly'
password = 'essa1234'
# 创建数据库连接
engine = create_db_connection(host, port, database, username, password)
# SQL 查询 - 获取用户点击序列
sql_query = f"""
SELECT
DATE_FORMAT(se.create_time, '%%Y-%%m-%%d') AS date,
se.anonymous_id AS user_id,
se.item_id,
pgs.name AS item_name
FROM
sensors_events se
LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
WHERE
se.event IN ('contactFactory', 'addToPool', 'addToCart')
AND se.create_time >= DATE_SUB(NOW(), INTERVAL {args.lookback_days} DAY)
ORDER BY
se.anonymous_id,
se.create_time;
"""
|
9eb36bd2
tangwang
add logs
|
90
91
92
93
94
|
logger.info("="*80)
logger.info("I2I商品行为相似度计算开始")
logger.info("="*80)
logger.info(f"参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}")
|
5b954396
tangwang
add cos sim
|
95
|
if args.debug:
|
9eb36bd2
tangwang
add logs
|
96
97
|
logger.debug(f"[DEBUG] 参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}")
logger.debug(f"[DEBUG] 开始查询数据库...")
|
5b954396
tangwang
add cos sim
|
98
99
100
101
|
# 执行 SQL 查询并将结果加载到 pandas DataFrame
df = pd.read_sql(sql_query, engine)
|
0b73c877
tangwang
fix
|
102
103
104
105
|
# 确保ID为整数类型
df['item_id'] = df['item_id'].astype(int)
df['user_id'] = df['user_id'].astype(str) # user_id保持为字符串
|
9eb36bd2
tangwang
add logs
|
106
107
108
109
|
logger.info(f"数据库查询完成,共 {len(df)} 条记录")
logger.info(f"唯一用户数: {df['user_id'].nunique()}")
logger.info(f"唯一商品数: {df['item_id'].nunique()}")
|
5b954396
tangwang
add cos sim
|
110
|
if args.debug:
|
9eb36bd2
tangwang
add logs
|
111
112
113
|
logger.debug(f"[DEBUG] 查询完成,共 {len(df)} 条记录")
logger.debug(f"[DEBUG] 唯一用户数: {df['user_id'].nunique()}")
logger.debug(f"[DEBUG] 唯一商品数: {df['item_id'].nunique()}")
|
5b954396
tangwang
add cos sim
|
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# 处理点击序列,计算共现关系
cooccur = defaultdict(lambda: defaultdict(int))
freq = defaultdict(int)
# 按用户和日期分组处理点击序列
for (user_id, date), group in df.groupby(['user_id', 'date']):
items = group['item_id'].tolist()
unique_items = set(items)
# 更新频率统计
for item in unique_items:
freq[item] += 1
# 更新共现关系
for i in range(len(items)):
for j in range(i + 1, len(items)):
item1, item2 = items[i], items[j]
if item1 != item2:
cooccur[item1][item2] += 1
cooccur[item2][item1] += 1
# 计算余弦相似度
|
9eb36bd2
tangwang
add logs
|
137
138
|
logger.info("开始计算商品相似度...")
|
5b954396
tangwang
add cos sim
|
139
|
if args.debug:
|
9eb36bd2
tangwang
add logs
|
140
|
logger.debug(f"[DEBUG] 开始计算相似度...")
|
5b954396
tangwang
add cos sim
|
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
result = {}
for item1 in cooccur:
sim_scores = []
for item2 in cooccur[item1]:
numerator = cooccur[item1][item2]
denominator = math.sqrt(freq[item1]) * math.sqrt(freq[item2])
if denominator != 0:
score = numerator / denominator
sim_scores.append((item2, score))
sim_scores.sort(key=lambda x: -x[1]) # 按分数排序
# 只保留top_n个相似商品
result[item1] = sim_scores[:args.top_n]
|
9eb36bd2
tangwang
add logs
|
155
156
|
logger.info(f"相似度计算完成,共 {len(result)} 个商品有相似推荐")
|
5b954396
tangwang
add cos sim
|
157
|
if args.debug:
|
9eb36bd2
tangwang
add logs
|
158
|
logger.debug(f"[DEBUG] 相似度计算完成,共 {len(result)} 个商品有相似推荐")
|
5b954396
tangwang
add cos sim
|
159
160
161
162
163
164
165
166
167
168
169
|
# 创建item_id到name的映射
item_name_map = dict(zip(df['item_id'], df['item_name']))
# 准备输出
date_str = datetime.now().strftime('%Y%m%d')
output_dir = 'output'
os.makedirs(output_dir, exist_ok=True)
output_file = os.path.join(output_dir, f'i2i_item_behavior_{date_str}.txt')
# 输出相似商品到文件
|
9eb36bd2
tangwang
add logs
|
170
171
|
logger.info(f"开始写入输出文件: {output_file}")
|
5b954396
tangwang
add cos sim
|
172
|
if args.debug:
|
9eb36bd2
tangwang
add logs
|
173
|
logger.debug(f"[DEBUG] 开始写入文件: {output_file}")
|
5b954396
tangwang
add cos sim
|
174
175
176
177
178
179
180
181
|
with open(output_file, 'w', encoding='utf-8') as f:
for item_id, sims in sorted(result.items()):
item_name = clean_text_field(item_name_map.get(item_id, 'Unknown'))
# 格式: item_id \t item_name \t similar_id1:score1,similar_id2:score2,...
sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])
f.write(f'{item_id}\t{item_name}\t{sim_str}\n')
|
9eb36bd2
tangwang
add logs
|
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
logger.info(f"输出文件写入完成: {output_file}")
# 统计信息
total_similarity_pairs = sum(len(sims) for sims in result.values())
avg_sims = total_similarity_pairs / len(result) if result else 0
logger.info("="*80)
logger.info("I2I商品行为相似度计算 - 关键统计信息")
logger.info("="*80)
logger.info(f"📊 数据概览:")
logger.info(f" - 总行为记录数: {len(df):,}")
logger.info(f" - 唯一用户数: {df['user_id'].nunique():,}")
logger.info(f" - 唯一商品数: {df['item_id'].nunique():,}")
logger.info(f" - 有相似度的商品数: {len(result):,}")
logger.info(f" - 总相似度对数量: {total_similarity_pairs:,}")
logger.info(f" - 平均每商品相似数: {avg_sims:.1f}")
logger.info(f"📁 输出文件:")
logger.info(f" - 输出文件: {output_file}")
logger.info(f"✅ I2I商品行为相似度计算完成")
logger.info("="*80)
|
c59dd0b0
tangwang
补充部分任务明文版本输出
|
204
205
206
|
# 如果启用debug模式,保存可读格式
if args.debug and result:
|
9eb36bd2
tangwang
add logs
|
207
|
logger.debug("[DEBUG] 保存可读格式文件...")
|
c59dd0b0
tangwang
补充部分任务明文版本输出
|
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
|
# 准备name_mappings
name_mappings = {
'item': {str(k): clean_text_field(v) for k, v in item_name_map.items()}
}
# 转换数据格式为 {key: [(sim_id, score), ...]}
readable_data = {}
for item_id, sims in result.items():
readable_data[f"i2i:item_behavior:{item_id}"] = sims
save_readable_index(
output_file,
readable_data,
name_mappings,
description='i2i:item_behavior'
)
print(f" - 可读文件: {output_file.replace('.txt', '_readable.txt')}")
|