i2i_item_behavior.py
4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import pandas as pd
import math
import os
from collections import defaultdict
from sqlalchemy import create_engine
from db_service import create_db_connection
import argparse
from datetime import datetime
def clean_text_field(text):
    """Return *text* as a single-line string safe for the tab-separated output.

    - NaN/None (per ``pd.isna``) becomes the empty string.
    - Carriage returns, newlines and tabs are replaced by spaces; the output
      file uses ``\t`` as the column delimiter, so an embedded tab would
      silently shift columns (this was previously unhandled).
    - Double quotes are doubled, following the CSV escaping convention.
    - Leading/trailing whitespace is stripped.
    """
    if pd.isna(text):
        return ''
    # One C-level pass for all single-character substitutions.
    sanitized = str(text).translate(str.maketrans({'\r': ' ', '\n': ' ', '\t': ' '}))
    return sanitized.replace('"', '""').strip()
# Parse command-line arguments.
parser = argparse.ArgumentParser(description='计算基于用户行为的商品相似度(Item Similarity)')
parser.add_argument('--lookback_days', type=int, default=180, help='回溯天数,默认180天')
parser.add_argument('--top_n', type=int, default=50, help='每个商品保留的相似商品数量,默认50')
parser.add_argument('--debug', action='store_true', help='开启debug模式')
args = parser.parse_args()

# Database connection settings.
# SECURITY NOTE(review): credentials were hard-coded in source. They can now
# be overridden via environment variables; the originals remain as defaults
# for backward compatibility, but should be removed from source entirely.
host = os.environ.get('DB_HOST', 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com')
port = os.environ.get('DB_PORT', '9030')
database = os.environ.get('DB_NAME', 'datacenter')
username = os.environ.get('DB_USER', 'readonly')
password = os.environ.get('DB_PASSWORD', 'essa1234')

# Create the database connection (project helper around SQLAlchemy).
engine = create_db_connection(host, port, database, username, password)
# SQL query: fetch per-user interaction events (contactFactory / addToPool /
# addToCart) joined to the item's display name, limited to the lookback
# window, ordered so each user's events are contiguous and chronological.
# NOTE: '%%' escapes '%' so pandas/SQLAlchemy does not treat DATE_FORMAT's
# format codes as bind-parameter markers in the raw SQL string.
sql_query = f"""
SELECT
    DATE_FORMAT(se.create_time, '%%Y-%%m-%%d') AS date,
    se.anonymous_id AS user_id,
    se.item_id,
    pgs.name AS item_name
FROM
    sensors_events se
    LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
WHERE
    se.event IN ('contactFactory', 'addToPool', 'addToCart')
    AND se.create_time >= DATE_SUB(NOW(), INTERVAL {args.lookback_days} DAY)
ORDER BY
    se.anonymous_id,
    se.create_time;
"""
if args.debug:
    print(f"[DEBUG] 参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}")
    print(f"[DEBUG] 开始查询数据库...")
# Execute the query and load the result set into a pandas DataFrame.
df = pd.read_sql(sql_query, engine)
if args.debug:
    print(f"[DEBUG] 查询完成,共 {len(df)} 条记录")
    print(f"[DEBUG] 唯一用户数: {df['user_id'].nunique()}")
    print(f"[DEBUG] 唯一商品数: {df['item_id'].nunique()}")
# ---------------------------------------------------------------------------
# Build co-occurrence counts from interaction "sessions".
# A session is all events of one user on one day.
# ---------------------------------------------------------------------------
cooccur = defaultdict(lambda: defaultdict(int))
freq = defaultdict(int)

for (_user, _day), session in df.groupby(['user_id', 'date']):
    clicked = session['item_id'].tolist()

    # Session-level frequency: an item counts once per session regardless of
    # how many times it was clicked within that session.
    for item in set(clicked):
        freq[item] += 1

    # Symmetric co-occurrence over every ordered pair (i < j) of clicks;
    # repeated clicks of the same item contribute multiple pair counts.
    for pos, left in enumerate(clicked):
        for right in clicked[pos + 1:]:
            if left == right:
                continue
            cooccur[left][right] += 1
            cooccur[right][left] += 1
# ---------------------------------------------------------------------------
# Cosine-style similarity: cooccur(a, b) / sqrt(freq(a) * freq(b)).
# ---------------------------------------------------------------------------
if args.debug:
    print(f"[DEBUG] 开始计算相似度...")
result = {}
for anchor, neighbors in cooccur.items():
    scored = []
    for candidate, shared in neighbors.items():
        norm = math.sqrt(freq[anchor]) * math.sqrt(freq[candidate])
        if norm != 0:
            scored.append((candidate, shared / norm))
    # Highest score first (stable for ties), then keep the strongest top_n.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    result[anchor] = scored[:args.top_n]
if args.debug:
    print(f"[DEBUG] 相似度计算完成,共 {len(result)} 个商品有相似推荐")
# Look-up table from item_id to display name; when an id appears in several
# rows, the last row's name wins.
name_by_id = dict(zip(df['item_id'], df['item_name']))

# Destination: output/i2i_item_behavior_<YYYYMMDD>.txt
stamp = datetime.now().strftime('%Y%m%d')
out_dir = 'output'
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, f'i2i_item_behavior_{stamp}.txt')

if args.debug:
    print(f"[DEBUG] 开始写入文件: {out_path}")

# Line format: item_id \t item_name \t similar_id1:score1,similar_id2:score2,...
with open(out_path, 'w', encoding='utf-8') as f:
    for item_id, sims in sorted(result.items()):
        display_name = clean_text_field(name_by_id.get(item_id, 'Unknown'))
        neighbor_str = ','.join(f'{sim_id}:{score:.4f}' for sim_id, score in sims)
        f.write(f'{item_id}\t{display_name}\t{neighbor_str}\n')

print(f"✓ Item相似度计算完成")
print(f" - 输出文件: {out_path}")
print(f" - 商品数: {len(result)}")
if result:
    avg_sims = sum(len(s) for s in result.values()) / len(result)
    print(f" - 平均相似商品数: {avg_sims:.1f}")