Blame view

offline_tasks/scripts/i2i_item_behavior.py 4.46 KB
5b954396   tangwang   add cos sim
1
2
3
4
5
6
7
  import argparse
  import math
  import os
  from collections import defaultdict
  from datetime import datetime
  from itertools import combinations

  import pandas as pd
  from sqlalchemy import create_engine

  from db_service import create_db_connection

5b954396   tangwang   add cos sim
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
  

  def clean_text_field(text):
      """Normalize a value so it fits in one field of a delimited text line.

      Missing values (``None``/``NaN`` as judged by ``pd.isna``) become ``''``.
      Anything else is converted with ``str`` and then:

      * carriage returns, newlines and tabs are each replaced by one space,
        so the value can neither break the line nor split a tab-separated
        record into extra columns;
      * double quotes are doubled (CSV-style escaping);
      * leading and trailing whitespace is stripped.

      Args:
          text: Scalar value to clean; may be ``None`` or ``NaN``.

      Returns:
          The cleaned string (``''`` for missing input).
      """
      if pd.isna(text):
          return ''
      cleaned = str(text)
      # Line breaks and the field delimiter all collapse to a plain space.
      for separator in ('\r', '\n', '\t'):
          cleaned = cleaned.replace(separator, ' ')
      return cleaned.replace('"', '""').strip()

  

  # Command-line interface: look-back window in days, number of neighbours
  # kept per item, and an optional debug switch.
  parser = argparse.ArgumentParser(
      description='计算基于用户行为的商品相似度(Item Similarity)',
  )
  parser.add_argument(
      '--lookback_days',
      type=int,
      default=180,
      help='回溯天数,默认180天',
  )
  parser.add_argument(
      '--top_n',
      type=int,
      default=50,
      help='每个商品保留的相似商品数量,默认50',
  )
  parser.add_argument(
      '--debug',
      action='store_true',
      help='开启debug模式',
  )
  args = parser.parse_args()

  

  # Database connection settings.
  #
  # Each value can be overridden through an environment variable so the
  # script can be pointed at another instance, or have its credentials
  # rotated, without editing the source. The literals remain the defaults,
  # so running with none of the variables set behaves exactly as before.
  # NOTE(review): the default password is still committed in source --
  # consider moving it to a secret store and dropping the fallback.
  host = os.environ.get(
      'SELECTDB_HOST',
      'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com',
  )
  port = os.environ.get('SELECTDB_PORT', '9030')
  database = os.environ.get('SELECTDB_DATABASE', 'datacenter')
  username = os.environ.get('SELECTDB_USERNAME', 'readonly')
  password = os.environ.get('SELECTDB_PASSWORD', 'essa1234')

  # Build the engine used by the query further down.
  engine = create_db_connection(host, port, database, username, password)

  

  # SQL query: fetch the per-user event sequence inside the look-back window.
  # Selects the event day, the anonymous user id, the item id and the SKU
  # name for 'contactFactory', 'addToPool' and 'addToCart' events, ordered
  # by user and then by event time.
  # lookback_days is interpolated into the text; argparse has already
  # converted it to an int.
  # NOTE(review): '%%' is presumably an escaped '%' for a driver that applies
  # %-style formatting to the statement -- confirm against the driver in use.

  sql_query = f"""

  SELECT 

      DATE_FORMAT(se.create_time, '%%Y-%%m-%%d') AS date,

      se.anonymous_id AS user_id,

      se.item_id,

      pgs.name AS item_name

  FROM 

      sensors_events se

  LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id

  WHERE 

      se.event IN ('contactFactory', 'addToPool', 'addToCart')

      AND se.create_time >= DATE_SUB(NOW(), INTERVAL {args.lookback_days} DAY)

  ORDER BY 

      se.anonymous_id,

      se.create_time;

  """

  

  # In debug mode, echo the run parameters before querying.
  if args.debug:
      print(
          f"[DEBUG] 参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}",
          f"[DEBUG] 开始查询数据库...",
          sep='\n',
      )

  # Run the query and load the whole result set into a pandas DataFrame.
  df = pd.read_sql(sql_query, engine)

  # In debug mode, report row count and distinct user / item counts.
  if args.debug:
      print(
          f"[DEBUG] 查询完成,共 {len(df)} 条记录",
          f"[DEBUG] 唯一用户数: {df['user_id'].nunique()}",
          f"[DEBUG] 唯一商品数: {df['item_id'].nunique()}",
          sep='\n',
      )

  

  # Build session-level co-occurrence statistics.
  #
  # A session is every selected event of one user on one date.
  #   freq[a]       -> number of sessions containing item a
  #   cooccur[a][b] -> number of sessions containing both a and b
  #
  # Pairs are counted once per session over the *distinct* items. Counting
  # over the raw event list would add one per pair of events, so an item
  # repeated k times in a session would inflate its pair counts k-fold while
  # freq still grew by one, letting the similarity score computed from these
  # tables exceed 1.
  cooccur = defaultdict(lambda: defaultdict(int))
  freq = defaultdict(int)

  for _session_key, group in df.groupby(['user_id', 'date']):
      # dict.fromkeys de-duplicates while keeping first-seen order, so the
      # insertion order of the tables (and the order of tied scores after a
      # stable sort) does not depend on set iteration order.
      session_items = list(dict.fromkeys(group['item_id'].tolist()))

      # One count per item per session.
      for item in session_items:
          freq[item] += 1

      # Symmetric update for every unordered pair of distinct items.
      for item1, item2 in combinations(session_items, 2):
          cooccur[item1][item2] += 1
          cooccur[item2][item1] += 1

  

  # Cosine-style similarity:
  #   score(a, b) = cooccur[a][b] / (sqrt(freq[a]) * sqrt(freq[b]))
  if args.debug:
      print(f"[DEBUG] 开始计算相似度...")

  result = {}
  for item1, neighbours in cooccur.items():
      sim_scores = []
      for item2, numerator in neighbours.items():
          denominator = math.sqrt(freq[item1]) * math.sqrt(freq[item2])
          if denominator == 0:
              # Skip pairs whose score would be undefined.
              continue
          sim_scores.append((item2, numerator / denominator))
      # Highest score first; the sort is stable, so ties keep their order.
      ranked = sorted(sim_scores, key=lambda pair: pair[1], reverse=True)
      # Keep only the top_n most similar items.
      result[item1] = ranked[:args.top_n]

  if args.debug:
      print(f"[DEBUG] 相似度计算完成,共 {len(result)} 个商品有相似推荐")

  

  # Map each item_id to its name; when an id appears on several rows the
  # name from the last such row is the one kept.
  item_name_map = {
      item_id: item_name
      for item_id, item_name in zip(df['item_id'], df['item_name'])
  }

  # Output goes to output/i2i_item_behavior_<YYYYMMDD>.txt; the directory is
  # created on demand.
  output_dir = 'output'
  os.makedirs(output_dir, exist_ok=True)
  date_str = datetime.now().strftime('%Y%m%d')
  output_file = os.path.join(output_dir, f'i2i_item_behavior_{date_str}.txt')

  

  # Write one line per item, in sorted item_id order.
  # Line format: item_id \t item_name \t similar_id1:score1,similar_id2:score2,...
  if args.debug:
      print(f"[DEBUG] 开始写入文件: {output_file}")

  with open(output_file, 'w', encoding='utf-8') as f:
      for item_id, sims in sorted(result.items()):
          # Items without a known name fall back to 'Unknown'.
          raw_name = item_name_map.get(item_id, 'Unknown')
          item_name = clean_text_field(raw_name)
          # Scores are written with four decimal places.
          sim_str = ','.join(f'{sim_id}:{score:.4f}' for sim_id, score in sims)
          f.write(f'{item_id}\t{item_name}\t{sim_str}\n')

  

  # Final summary: output path, number of items written and, when there is
  # at least one item, the mean number of neighbours per item.
  print(f"✓ Item相似度计算完成")
  print(f"  - 输出文件: {output_file}")
  print(f"  - 商品数: {len(result)}")
  if result:
      avg_sims = sum(map(len, result.values())) / len(result)
      print(f"  - 平均相似商品数: {avg_sims:.1f}")