Blame view

offline_tasks/scripts/i2i_content_similar.py 11.3 KB
5ab1c29c   tangwang   first commit
1
  """
b57c6eb4   tangwang   offline tasks: fi...
2
3
4
5
  i2i - 基于ES向量的内容相似索引
  Elasticsearch获取商品向量,计算两种相似度:
  1. 基于名称文本向量的相似度
  2. 基于图片向量的相似度
5ab1c29c   tangwang   first commit
6
  """
b57c6eb4   tangwang   offline tasks: fi...
7
  import argparse
  import json
  import logging
  import os
  from datetime import datetime, timedelta

  import pandas as pd
  from elasticsearch import Elasticsearch

  from db_service import create_db_connection
  from config.offline_config import DB_CONFIG, OUTPUT_DIR
  from scripts.debug_utils import (
      setup_debug_logger, log_processing_step,
      save_readable_index, fetch_name_mappings
  )
5ab1c29c   tangwang   first commit
19
  
b57c6eb4   tangwang   offline tasks: fi...
20
21
22
23
24
25
26
  # Elasticsearch connection settings.
  # Every value can be overridden through an environment variable so that
  # credentials do not have to live in source control; the literals are kept
  # only as backward-compatible fallbacks.
  # NOTE(review): the fallback password is a secret committed to the repo -
  # rotate it and drop the default once ES_PASSWORD is set in deployment.
  ES_CONFIG = {
      'host': os.environ.get('ES_HOST', 'http://localhost:9200'),
      'index_name': os.environ.get('ES_INDEX_NAME', 'spu'),
      'username': os.environ.get('ES_USERNAME', 'essa'),
      'password': os.environ.get('ES_PASSWORD', '4hOaLaf41y2VuI8y'),
  }
5ab1c29c   tangwang   first commit
27
  
b57c6eb4   tangwang   offline tasks: fi...
28
29
30
31
32
33
34
  # Algorithm parameters
  TOP_N = 50          # number of similar items returned for each item
  KNN_K = 100         # number of candidates returned by a knn query
  KNN_CANDIDATES = 200  # size of the candidate pool for a knn query
  
  
  def get_active_items(engine, days=365):
      """
      Return the distinct item ids that had user behaviour recently.

      An item counts as active when ``sensors_events`` holds at least one
      click / contactFactory / addToPool / addToCart / purchase event for it
      whose ``create_time`` is on or after the cutoff date.

      Args:
          engine: Database connection/engine accepted by ``pandas.read_sql``.
          days: Size of the look-back window in days (default 365, i.e. the
              last year).

      Returns:
          list: The ``item_id`` values, in the order returned by the database.
      """
      # The cutoff is generated locally by strftime (never user input), so
      # interpolating it into the statement is safe.
      cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

      sql_query = f"""
      SELECT DISTINCT
          se.item_id
      FROM
          sensors_events se
      WHERE
          se.event IN ('click', 'contactFactory', 'addToPool', 'addToCart', 'purchase')
          AND se.create_time >= '{cutoff_date}'
          AND se.item_id IS NOT NULL
      """

      df = pd.read_sql(sql_query, engine)
      return df['item_id'].tolist()
5ab1c29c   tangwang   first commit
53
54
  
  
b57c6eb4   tangwang   offline tasks: fi...
55
56
57
58
59
60
61
62
63
64
65
66
  def connect_es():
      """Build an Elasticsearch client from the settings in ES_CONFIG.

      Uses basic authentication, skips certificate verification and applies
      a 30 second request timeout.
      """
      hosts = [ES_CONFIG['host']]
      credentials = (ES_CONFIG['username'], ES_CONFIG['password'])
      return Elasticsearch(
          hosts,
          basic_auth=credentials,
          verify_certs=False,
          request_timeout=30
      )
  
  
  def get_item_vectors(es, item_id):
      """
      Fetch one item's name, embeddings and boost value from Elasticsearch.

      The lookup is best-effort: any failure is logged as a warning and
      reported to the caller as ``None`` so a batch run can skip the item
      and carry on.

      Args:
          es: Elasticsearch client.
          item_id: Item identifier; it is converted to ``str`` and matched
              against the document ``_id``.

      Returns:
          dict with keys: _id, name_zh, embedding_name_zh, embedding_pic_h14,
          on_sell_days_boost (defaults to 1.0 when the field is absent),
          or None if the item is not found or the query fails.
      """
      try:
          response = es.search(
              index=ES_CONFIG['index_name'],
              body={
                  "query": {
                      "term": {
                          "_id": str(item_id)
                      }
                  },
                  "_source": {
                      "includes": ["_id", "name_zh", "embedding_name_zh", "embedding_pic_h14", "on_sell_days_boost"]
                  }
              }
          )

          hits = response['hits']['hits']
          if not hits:
              return None

          hit = hits[0]
          source = hit['_source']
          return {
              '_id': hit['_id'],
              'name_zh': source.get('name_zh', ''),
              'embedding_name_zh': source.get('embedding_name_zh'),
              'embedding_pic_h14': source.get('embedding_pic_h14'),
              'on_sell_days_boost': source.get('on_sell_days_boost', 1.0)
          }
      except Exception as exc:
          # Keep the best-effort contract, but leave a trace instead of
          # silently hiding connection/auth/mapping problems.
          logging.getLogger(__name__).warning(
              "ES lookup failed for item %s: %s", item_id, exc
          )
          return None
5ab1c29c   tangwang   first commit
101
102
  
  
b57c6eb4   tangwang   offline tasks: fi...
103
  def find_similar_by_vector(es, vector, field_name, k=KNN_K, num_candidates=KNN_CANDIDATES):
      """
      Find the items closest to a query vector with an Elasticsearch knn search.

      Each hit's score is multiplied by the hit's ``on_sell_days_boost``
      (1.0 when the field is missing or falsy). Because the boost can change
      the relative order of hits, the results are re-sorted by the boosted
      score before being returned.

      Args:
          es: Elasticsearch client.
          vector: Query vector.
          field_name: Name of the vector field to search
              (embedding_name_zh or embedding_pic_h14.vector).
          k: Number of results to return.
          num_candidates: Size of the candidate pool.

      Returns:
          List of (item_id, boosted_score, name_zh) tuples ordered by
          boosted_score, highest first. An empty list is returned (and a
          warning logged) when the query or result parsing fails.
      """
      try:
          response = es.search(
              index=ES_CONFIG['index_name'],
              body={
                  "knn": {
                      "field": field_name,
                      "query_vector": vector,
                      "k": k,
                      "num_candidates": num_candidates
                  },
                  "_source": ["_id", "name_zh", "on_sell_days_boost"],
                  "size": k
              }
          )

          results = []
          for hit in response['hits']['hits']:
              source = hit['_source']
              # A missing, null or zero boost means "no boost".
              boost = source.get('on_sell_days_boost', 1.0) or 1.0
              results.append((
                  hit['_id'],
                  hit['_score'] * boost,
                  source.get('name_zh', '')
              ))

          # ES ranks by the raw score; restore descending order after boosting
          # so callers that truncate the list keep the best boosted hits.
          # The sort is stable, so ties keep the order ES returned.
          results.sort(key=lambda entry: entry[1], reverse=True)
          return results
      except Exception as exc:
          # Best-effort: report the failure instead of hiding it, then let the
          # caller treat the item as having no neighbours.
          logging.getLogger(__name__).warning(
              "ES knn search on field %s failed: %s", field_name, exc
          )
          return []
5ab1c29c   tangwang   first commit
151
152
  
  
c59dd0b0   tangwang   补充部分任务明文版本输出
153
  def generate_similarity_index(es, active_items, vector_field, field_name, logger, top_n=50):
      """
      Build the similarity index for one kind of vector.

      For every active item its vector is fetched from ES, a knn search is
      run with it, the item itself is removed from the hits, and the
      ``top_n`` hits with the highest boosted score are kept. Items that are
      missing from ES, have no usable vector, or have no neighbours are left
      out of the result.

      Args:
          es: Elasticsearch client.
          active_items: List of active item ids.
          vector_field: Vector field name (embedding_name_zh or
              embedding_pic_h14).
          field_name: Short name of the field (name or pic); not used by the
              computation itself.
          logger: Logger used for progress reporting.
          top_n: Number of similar items to keep per item.

      Returns:
          dict: {item_id: [(similar_id, score, name), ...]} with each list
          ordered by score, highest first.
      """
      result = {}
      total = len(active_items)

      # The image embedding is stored as a list of objects, so the knn query
      # targets its inner "vector" field. Invariant across the loop.
      knn_field = f"{vector_field}.vector" if vector_field == 'embedding_pic_h14' else vector_field
      # Guard against a negative top_n turning the slice below into
      # "everything but the last few".
      keep = max(top_n, 0)

      for idx, item_id in enumerate(active_items):
          if (idx + 1) % 100 == 0:
              logger.info(f"处理进度: {idx + 1}/{total} ({(idx + 1) / total * 100:.1f}%)")

          # Fetch this item's vectors
          item_data = get_item_vectors(es, item_id)
          if not item_data:
              continue

          # Extract the query vector for the requested field
          query_vector = None
          if vector_field == 'embedding_name_zh':
              query_vector = item_data.get('embedding_name_zh')
          elif vector_field == 'embedding_pic_h14':
              pic_data = item_data.get('embedding_pic_h14')
              # Only the first image embedding is used as the query.
              if pic_data and isinstance(pic_data, list) and isinstance(pic_data[0], dict):
                  query_vector = pic_data[0].get('vector')

          if not query_vector:
              continue

          # NOTE: the search uses its default k, so at most that many
          # candidates (including the item itself) are available here.
          similar_items = find_similar_by_vector(es, query_vector, knn_field)

          # Drop the item itself, then rank by the boosted score before
          # truncating so the kept entries really are the top N.
          own_id = str(item_id)
          candidates = [entry for entry in similar_items if entry[0] != own_id]
          candidates.sort(key=lambda entry: entry[1], reverse=True)
          top_items = [(sim_id, score, name) for sim_id, score, name in candidates[:keep]]

          if top_items:
              result[item_id] = top_items

      return result
  
  
b57c6eb4   tangwang   offline tasks: fi...
214
  def save_index_file(result, es, output_file, logger):
      """
      Write a similarity index to a tab-separated text file.

      Line format: item_id \\t item_name \\t similar_id1:score1,similar_id2:score2,...

      Args:
          result: {item_id: [(similar_id, score, name), ...]} as produced by
              generate_similarity_index. Entries with an empty list are
              skipped.
          es: Elasticsearch client, used to look up each item's name.
          output_file: Path of the file to write (overwritten if present).
          logger: Logger used for progress messages.
      """
      logger.info(f"保存索引到: {output_file}")

      # Make sure the target directory exists so a long computation is not
      # lost to a missing output folder.
      output_dir = os.path.dirname(output_file)
      if output_dir:
          os.makedirs(output_dir, exist_ok=True)

      with open(output_file, 'w', encoding='utf-8') as f:
          for item_id, similar_items in result.items():
              if not similar_items:
                  continue

              # Look up the current item's name
              item_data = get_item_vectors(es, item_id)
              item_name = item_data.get('name_zh', 'Unknown') if item_data else 'Unknown'
              if item_name is None:
                  item_name = 'Unknown'
              # Tabs and line breaks inside a name would corrupt the
              # one-record-per-line, tab-separated layout.
              item_name = str(item_name).replace('\t', ' ').replace('\r', ' ').replace('\n', ' ')

              # Format the list of similar items
              sim_str = ','.join(f'{sim_id}:{score:.4f}' for sim_id, score, _ in similar_items)
              f.write(f'{item_id}\t{item_name}\t{sim_str}\n')

      logger.info(f"索引保存完成,共 {len(result)} 个商品")
5ab1c29c   tangwang   first commit
236
237
238
  
  
  def main():
      """
      Script entry point.

      Parses the command line, loads the active items from the database,
      then builds and saves two content-similarity indexes from ES vectors:
      one from the name text embedding and one from the image embedding.
      With --debug a human-readable copy of each non-empty index is saved
      as well.
      """
      # Parse command-line arguments
      parser = argparse.ArgumentParser(description='Generate content-based similarity using ES vectors')
      parser.add_argument('--debug', action='store_true', help='Enable debug mode with readable output')
      parser.add_argument(
          '--top_n', type=int, default=TOP_N,
          help=f'Number of similar items per item (default: {TOP_N})'
      )
      args = parser.parse_args()
      top_n = args.top_n

      # Set up the logger
      logger = setup_debug_logger('i2i_content_similar', debug=args.debug)

      logger.info("="*80)
      logger.info("开始生成基于ES向量的内容相似索引")
      logger.info(f"ES地址: {ES_CONFIG['host']}")
      logger.info(f"索引名: {ES_CONFIG['index_name']}")
      logger.info(f"Top N: {top_n}")
      logger.info("="*80)

      # Create the database connection
      log_processing_step(logger, "连接数据库")
      engine = create_db_connection(
          DB_CONFIG['host'],
          DB_CONFIG['port'],
          DB_CONFIG['database'],
          DB_CONFIG['username'],
          DB_CONFIG['password']
      )

      # Load the active items
      log_processing_step(logger, "获取最近1年有过行为的商品")
      active_items = get_active_items(engine)
      logger.info(f"找到 {len(active_items)} 个活跃商品")

      # Connect to ES
      log_processing_step(logger, "连接Elasticsearch")
      es = connect_es()
      logger.info("ES连接成功")

      date_str = datetime.now().strftime("%Y%m%d")

      # Name mappings are only needed for the readable debug output
      name_mappings = {}
      if args.debug:
          log_processing_step(logger, "获取物品名称映射")
          name_mappings = fetch_name_mappings(engine, debug=True)

      # (vector field, short name, step title) for each index to build:
      # 1. name text embedding, 2. image embedding.
      index_specs = [
          ('embedding_name_zh', 'name', "生成基于名称文本向量的相似索引"),
          ('embedding_pic_h14', 'pic', "生成基于图片向量的相似索引"),
      ]
      summary = {}

      for vector_field, short_name, step_title in index_specs:
          log_processing_step(logger, step_title)
          index = generate_similarity_index(
              es, active_items, vector_field, short_name, logger, top_n=top_n
          )
          output_path = os.path.join(OUTPUT_DIR, f'i2i_content_{short_name}_{date_str}.txt')
          save_index_file(index, es, output_path, logger)

          # In debug mode also save a human-readable version
          if args.debug and index:
              log_processing_step(logger, f"保存i2i_content_{short_name}可读格式")
              # Convert to {key: [(sim_id, score), ...]}
              readable_data = {
                  f"i2i:content_{short_name}:{item_id}": [
                      (sim_id, score) for sim_id, score, _ in similar_items
                  ]
                  for item_id, similar_items in index.items()
              }
              save_readable_index(
                  output_path,
                  readable_data,
                  name_mappings,
                  description=f'i2i:content_{short_name}'
              )

          summary[short_name] = (output_path, len(index))

      name_output, name_count = summary['name']
      pic_output, pic_count = summary['pic']

      logger.info("="*80)
      logger.info("完成!生成了两份内容相似索引:")
      logger.info(f"  1. 名称向量索引: {name_output} ({name_count} 个商品)")
      logger.info(f"  2. 图片向量索引: {pic_output} ({pic_count} 个商品)")
      logger.info("="*80)
5ab1c29c   tangwang   first commit
341
342
343
344
  
  
  # Run the job only when the file is executed as a script, not on import.
  if __name__ == '__main__':
      main()