Blame view

offline_tasks/scripts/i2i_item_behavior.py 7.67 KB
5b954396   tangwang   add cos sim
1
2
  import pandas as pd

  import math

e89d7a84   tangwang   deepwalk refactor...
3
  import os

5b954396   tangwang   add cos sim
4
5
6
7
8
  from collections import defaultdict

  from sqlalchemy import create_engine

  from db_service import create_db_connection

  import argparse

  from datetime import datetime

c59dd0b0   tangwang   补充部分任务明文版本输出
9
  from scripts.debug_utils import save_readable_index

789edb14   tangwang   add logs
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  import logging

  

  def setup_logger():

      """设置logger配置"""

      # 创建logs目录

      logs_dir = 'logs'

      os.makedirs(logs_dir, exist_ok=True)

      

      # 创建logger

      logger = logging.getLogger('i2i_item_behavior')

      logger.setLevel(logging.INFO)

      

      # 避免重复添加handler

      if logger.handlers:

          return logger

      

      # 创建文件handler

      log_file = os.path.join(logs_dir, f'i2i_item_behavior_{datetime.now().strftime("%Y%m%d")}.log')

      file_handler = logging.FileHandler(log_file, encoding='utf-8')

      file_handler.setLevel(logging.INFO)

      

      # 创建控制台handler

      console_handler = logging.StreamHandler()

      console_handler.setLevel(logging.INFO)

      

      # 创建formatter

      formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

      file_handler.setFormatter(formatter)

      console_handler.setFormatter(formatter)

      

      # 添加handler到logger

      logger.addHandler(file_handler)

      logger.addHandler(console_handler)

      

      return logger

5b954396   tangwang   add cos sim
45
46
47
48
49
50
51
52
53
54
55
56
57
58
  

  def clean_text_field(text):

      if pd.isna(text):

          return ''

      # 移除换行符、回车符,并替换其他可能导致CSV格式问题的字符

      return str(text).replace('\r', ' ').replace('\n', ' ').replace('"', '""').strip()

  

  # 解析命令行参数

  parser = argparse.ArgumentParser(description='计算基于用户行为的商品相似度(Item Similarity)')

  parser.add_argument('--lookback_days', type=int, default=180, help='回溯天数,默认180天')

  parser.add_argument('--top_n', type=int, default=50, help='每个商品保留的相似商品数量,默认50')

  parser.add_argument('--debug', action='store_true', help='开启debug模式')

  args = parser.parse_args()

  

9eb36bd2   tangwang   add logs
59
60
61
  # 初始化logger

  logger = setup_logger()

  

5b954396   tangwang   add cos sim
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
  # 数据库连接配置

  host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com'

  port = '9030'

  database = 'datacenter'

  username = 'readonly'

  password = 'essa1234'

  

  # 创建数据库连接

  engine = create_db_connection(host, port, database, username, password)

  

  # SQL 查询 - 获取用户点击序列

  sql_query = f"""

  SELECT 

      DATE_FORMAT(se.create_time, '%%Y-%%m-%%d') AS date,

      se.anonymous_id AS user_id,

      se.item_id,

      pgs.name AS item_name

  FROM 

      sensors_events se

  LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id

  WHERE 

      se.event IN ('contactFactory', 'addToPool', 'addToCart')

      AND se.create_time >= DATE_SUB(NOW(), INTERVAL {args.lookback_days} DAY)

  ORDER BY 

      se.anonymous_id,

      se.create_time;

  """

  

9eb36bd2   tangwang   add logs
90
91
92
93
94
  logger.info("="*80)

  logger.info("I2I商品行为相似度计算开始")

  logger.info("="*80)

  logger.info(f"参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}")

  

5b954396   tangwang   add cos sim
95
  if args.debug:

9eb36bd2   tangwang   add logs
96
97
      logger.debug(f"[DEBUG] 参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}")

      logger.debug(f"[DEBUG] 开始查询数据库...")

5b954396   tangwang   add cos sim
98
99
100
101
  

  # 执行 SQL 查询并将结果加载到 pandas DataFrame

  df = pd.read_sql(sql_query, engine)

  

0b73c877   tangwang   fix
102
103
104
105
  # 确保ID为整数类型

  df['item_id'] = df['item_id'].astype(int)

  df['user_id'] = df['user_id'].astype(str)  # user_id保持为字符串

  

9eb36bd2   tangwang   add logs
106
107
108
109
  logger.info(f"数据库查询完成,共 {len(df)} 条记录")

  logger.info(f"唯一用户数: {df['user_id'].nunique()}")

  logger.info(f"唯一商品数: {df['item_id'].nunique()}")

  

5b954396   tangwang   add cos sim
110
  if args.debug:

9eb36bd2   tangwang   add logs
111
112
113
      logger.debug(f"[DEBUG] 查询完成,共 {len(df)} 条记录")

      logger.debug(f"[DEBUG] 唯一用户数: {df['user_id'].nunique()}")

      logger.debug(f"[DEBUG] 唯一商品数: {df['item_id'].nunique()}")

5b954396   tangwang   add cos sim
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
  

  # 处理点击序列,计算共现关系

  cooccur = defaultdict(lambda: defaultdict(int))

  freq = defaultdict(int)

  

  # 按用户和日期分组处理点击序列

  for (user_id, date), group in df.groupby(['user_id', 'date']):

      items = group['item_id'].tolist()

      unique_items = set(items)

      

      # 更新频率统计

      for item in unique_items:

          freq[item] += 1

      

      # 更新共现关系

      for i in range(len(items)):

          for j in range(i + 1, len(items)):

              item1, item2 = items[i], items[j]

              if item1 != item2:

                  cooccur[item1][item2] += 1

                  cooccur[item2][item1] += 1

  

  # 计算余弦相似度

9eb36bd2   tangwang   add logs
137
138
  logger.info("开始计算商品相似度...")

  

5b954396   tangwang   add cos sim
139
  if args.debug:

9eb36bd2   tangwang   add logs
140
      logger.debug(f"[DEBUG] 开始计算相似度...")

5b954396   tangwang   add cos sim
141
142
143
144
145
146
147
148
149
150
151
152
153
154
  

  result = {}

  for item1 in cooccur:

      sim_scores = []

      for item2 in cooccur[item1]:

          numerator = cooccur[item1][item2]

          denominator = math.sqrt(freq[item1]) * math.sqrt(freq[item2])

          if denominator != 0:

              score = numerator / denominator

              sim_scores.append((item2, score))

      sim_scores.sort(key=lambda x: -x[1])  # 按分数排序

      # 只保留top_n个相似商品

      result[item1] = sim_scores[:args.top_n]

  

9eb36bd2   tangwang   add logs
155
156
  logger.info(f"相似度计算完成,共 {len(result)} 个商品有相似推荐")

  

5b954396   tangwang   add cos sim
157
  if args.debug:

9eb36bd2   tangwang   add logs
158
      logger.debug(f"[DEBUG] 相似度计算完成,共 {len(result)} 个商品有相似推荐")

5b954396   tangwang   add cos sim
159
160
161
162
163
164
165
166
167
168
169
  

  # 创建item_id到name的映射

  item_name_map = dict(zip(df['item_id'], df['item_name']))

  

  # 准备输出

  date_str = datetime.now().strftime('%Y%m%d')

  output_dir = 'output'

  os.makedirs(output_dir, exist_ok=True)

  output_file = os.path.join(output_dir, f'i2i_item_behavior_{date_str}.txt')

  

  # 输出相似商品到文件

9eb36bd2   tangwang   add logs
170
171
  logger.info(f"开始写入输出文件: {output_file}")

  

5b954396   tangwang   add cos sim
172
  if args.debug:

9eb36bd2   tangwang   add logs
173
      logger.debug(f"[DEBUG] 开始写入文件: {output_file}")

5b954396   tangwang   add cos sim
174
175
176
177
178
179
180
181
  

  with open(output_file, 'w', encoding='utf-8') as f:

      for item_id, sims in sorted(result.items()):

          item_name = clean_text_field(item_name_map.get(item_id, 'Unknown'))

          # 格式: item_id \t item_name \t similar_id1:score1,similar_id2:score2,...

          sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])

          f.write(f'{item_id}\t{item_name}\t{sim_str}\n')

  

9eb36bd2   tangwang   add logs
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
  logger.info(f"输出文件写入完成: {output_file}")

  

  # 统计信息

  total_similarity_pairs = sum(len(sims) for sims in result.values())

  avg_sims = total_similarity_pairs / len(result) if result else 0

  

  logger.info("="*80)

  logger.info("I2I商品行为相似度计算 - 关键统计信息")

  logger.info("="*80)

  logger.info(f"📊 数据概览:")

  logger.info(f"  - 总行为记录数: {len(df):,}")

  logger.info(f"  - 唯一用户数: {df['user_id'].nunique():,}")

  logger.info(f"  - 唯一商品数: {df['item_id'].nunique():,}")

  logger.info(f"  - 有相似度的商品数: {len(result):,}")

  logger.info(f"  - 总相似度对数量: {total_similarity_pairs:,}")

  logger.info(f"  - 平均每商品相似数: {avg_sims:.1f}")

  

  logger.info(f"📁 输出文件:")

  logger.info(f"  - 输出文件: {output_file}")

  

  logger.info(f"✅ I2I商品行为相似度计算完成")

  logger.info("="*80)

c59dd0b0   tangwang   补充部分任务明文版本输出
204
205
206
  

  # 如果启用debug模式,保存可读格式

  if args.debug and result:

9eb36bd2   tangwang   add logs
207
      logger.debug("[DEBUG] 保存可读格式文件...")

c59dd0b0   tangwang   补充部分任务明文版本输出
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
      

      # 准备name_mappings

      name_mappings = {

          'item': {str(k): clean_text_field(v) for k, v in item_name_map.items()}

      }

      

      # 转换数据格式为 {key: [(sim_id, score), ...]}

      readable_data = {}

      for item_id, sims in result.items():

          readable_data[f"i2i:item_behavior:{item_id}"] = sims

      

      save_readable_index(

          output_file,

          readable_data,

          name_mappings,

          description='i2i:item_behavior'

      )

      print(f"  - 可读文件: {output_file.replace('.txt', '_readable.txt')}")