Blame view

offline_tasks/scripts/i2i_item_behavior.py 5.34 KB
5b954396   tangwang   add cos sim
1
2
  import pandas as pd

  import math

e89d7a84   tangwang   deepwalk refactor...
3
  import os

5b954396   tangwang   add cos sim
4
5
6
7
8
  from collections import defaultdict

  from sqlalchemy import create_engine

  from db_service import create_db_connection

  import argparse

  from datetime import datetime

c59dd0b0   tangwang   补充部分任务明文版本输出
9
  from scripts.debug_utils import save_readable_index

5b954396   tangwang   add cos sim
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
  

  def clean_text_field(text):
      """Normalize a free-text value so it fits in one field of a tab-separated line.

      Args:
          text: Any value. Missing values (anything ``pd.isna`` reports as NA,
              e.g. ``None`` or ``NaN``) are treated as empty.

      Returns:
          ``''`` for missing values. Otherwise ``str(text)`` with carriage
          returns, newlines and tabs replaced by spaces, double quotes doubled
          (CSV-style escaping), and surrounding whitespace stripped.
      """
      if pd.isna(text):
          return ''
      # Tabs must be neutralised as well: the output files use '\t' as the
      # column separator, so an embedded tab would shift every later column.
      # Quote doubling is kept for compatibility with existing consumers.
      return (
          str(text)
          .replace('\r', ' ')
          .replace('\n', ' ')
          .replace('\t', ' ')
          .replace('"', '""')
          .strip()
      )

  

  # Parse command-line arguments.
  parser = argparse.ArgumentParser(description='计算基于用户行为的商品相似度(Item Similarity)')
  parser.add_argument('--lookback_days', type=int, default=180, help='回溯天数,默认180天')
  parser.add_argument('--top_n', type=int, default=50, help='每个商品保留的相似商品数量,默认50')
  parser.add_argument('--debug', action='store_true', help='开启debug模式')
  args = parser.parse_args()

  # Reject non-positive values up front. A negative --top_n would make the
  # later `sims[:args.top_n]` slice drop items from the end instead of capping
  # the list. A non-positive --lookback_days makes the SQL time window empty
  # (or point into the future).
  if args.lookback_days < 1:
      parser.error('--lookback_days must be a positive integer')
  if args.top_n < 1:
      parser.error('--top_n must be a positive integer')

  # Database connection settings.
  # NOTE(review): credentials are hard-coded in source; consider loading them
  # from the environment or a secrets store instead.
  host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com'
  port = '9030'
  database = 'datacenter'
  username = 'readonly'
  password = 'essa1234'

  # Create the database connection.
  engine = create_db_connection(host, port, database, username, password)

  # SQL query: fetch user interaction events (contactFactory / addToPool /
  # addToCart) inside the lookback window, ordered by user and time.
  # Interpolating args.lookback_days into the f-string is safe only because
  # argparse enforces type=int.
  # NOTE(review): '%%' is presumably doubled so the DB driver's %-style
  # parameter substitution yields a literal '%' — confirm against the driver.
  sql_query = f"""
  SELECT 
      DATE_FORMAT(se.create_time, '%%Y-%%m-%%d') AS date,
      se.anonymous_id AS user_id,
      se.item_id,
      pgs.name AS item_name
  FROM 
      sensors_events se
  LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
  WHERE 
      se.event IN ('contactFactory', 'addToPool', 'addToCart')
      AND se.create_time >= DATE_SUB(NOW(), INTERVAL {args.lookback_days} DAY)
  ORDER BY 
      se.anonymous_id,
      se.create_time;
  """

  if args.debug:
      print(f"[DEBUG] 参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}")
      print("[DEBUG] 开始查询数据库...")

  # Run the query and load the result into a pandas DataFrame.
  df = pd.read_sql(sql_query, engine)

  

0b73c877   tangwang   fix
59
60
61
62
  # Drop rows that cannot be attributed to both a user and an item, then
  # normalise ID types (item_id -> int, user_id kept as a string).
  # Why drop first:
  #   - a NULL item_id makes astype(int) raise;
  #   - a NULL anonymous_id would be stringified to 'None'/'nan' by
  #     astype(str), lumping every anonymous event of a day into one fake
  #     "user" and fabricating co-occurrences between unrelated items.
  # A single astype({...}) also avoids column assignment on a filtered copy.
  df = (
      df.dropna(subset=['user_id', 'item_id'])
      .astype({'item_id': int, 'user_id': str})
  )

  

5b954396   tangwang   add cos sim
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
  if args.debug:
      print(f"[DEBUG] 查询完成,共 {len(df)} 条记录")
      print(f"[DEBUG] 唯一用户数: {df['user_id'].nunique()}")
      print(f"[DEBUG] 唯一商品数: {df['item_id'].nunique()}")

  # Build co-occurrence statistics. A "session" is all events of one user on
  # one calendar day, and each session is treated as a *set* of items:
  #   freq[i]       -> number of sessions containing item i
  #   cooccur[i][j] -> number of sessions containing both i and j (symmetric)
  # Both counts use the de-duplicated item set. Counting every pair of raw
  # events (as opposed to distinct items) would let repeated clicks inflate
  # the numerator past the denominator and push scores above 1.
  cooccur = defaultdict(lambda: defaultdict(int))
  freq = defaultdict(int)

  for _, session in df.groupby(['user_id', 'date'])['item_id']:
      session_items = sorted(set(session.tolist()))
      for item in session_items:
          freq[item] += 1
      # Each unordered pair of distinct items is visited exactly once.
      for pos, item1 in enumerate(session_items):
          for item2 in session_items[pos + 1:]:
              cooccur[item1][item2] += 1
              cooccur[item2][item1] += 1

  # Cosine similarity on binary session vectors:
  #   sim(i, j) = |S_i ∩ S_j| / (sqrt(|S_i|) * sqrt(|S_j|))
  if args.debug:
      print("[DEBUG] 开始计算相似度...")

  result = {}
  for item1, neighbours in cooccur.items():
      # Every item present in cooccur was counted in freq (freq >= 1), so the
      # denominator is never zero.
      norm1 = math.sqrt(freq[item1])
      sim_scores = [
          (item2, count / (norm1 * math.sqrt(freq[item2])))
          for item2, count in neighbours.items()
      ]
      # Highest score first; ties broken by item id for deterministic output.
      sim_scores.sort(key=lambda x: (-x[1], x[0]))
      # Keep only the top_n most similar items.
      result[item1] = sim_scores[:args.top_n]

  if args.debug:
      print(f"[DEBUG] 相似度计算完成,共 {len(result)} 个商品有相似推荐")

  # item_id -> item name (if an id appears with several names, the last row wins).
  item_name_map = dict(zip(df['item_id'], df['item_name']))

  # Prepare the output location.
  date_str = datetime.now().strftime('%Y%m%d')
  output_dir = 'output'
  os.makedirs(output_dir, exist_ok=True)
  output_file = os.path.join(output_dir, f'i2i_item_behavior_{date_str}.txt')

  # Write the similar items to the output file.
  if args.debug:
      print(f"[DEBUG] 开始写入文件: {output_file}")

  with open(output_file, 'w', encoding='utf-8') as f:
      for item_id, sims in sorted(result.items()):
          item_name = clean_text_field(item_name_map.get(item_id, 'Unknown'))
          # Line format: item_id \t item_name \t similar_id1:score1,similar_id2:score2,...
          sim_str = ','.join(f'{sim_id}:{score:.4f}' for sim_id, score in sims)
          f.write(f'{item_id}\t{item_name}\t{sim_str}\n')

  print("✓ Item相似度计算完成")
  print(f"  - 输出文件: {output_file}")
  print(f"  - 商品数: {len(result)}")
  if result:
      avg_sims = sum(len(sims) for sims in result.values()) / len(result)
      print(f"  - 平均相似商品数: {avg_sims:.1f}")

c59dd0b0   tangwang   补充部分任务明文版本输出
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
  

  # Debug mode only (and only when there is something to show): additionally
  # write a human-readable companion of the index file via save_readable_index.
  if args.debug and result:
      print("[DEBUG] 保存可读格式文件...")

      # Name lookup table passed to save_readable_index, keyed by stringified item id.
      item_names = {str(item_id): clean_text_field(name) for item_id, name in item_name_map.items()}
      name_mappings = {'item': item_names}

      # Re-key the results as {"i2i:item_behavior:<item_id>": [(sim_id, score), ...]}.
      readable_data = {f"i2i:item_behavior:{item_id}": sims for item_id, sims in result.items()}

      save_readable_index(output_file, readable_data, name_mappings, description='i2i:item_behavior')
      readable_path = output_file.replace('.txt', '_readable.txt')
      print(f"  - 可读文件: {readable_path}")