5ab1c29c
tangwang
first commit
|
1
|
"""
|
b57c6eb4
tangwang
offline tasks: fi...
|
2
3
4
5
|
i2i - 基于ES向量的内容相似索引
从Elasticsearch获取商品向量,计算两种相似度:
1. 基于名称文本向量的相似度
2. 基于图片向量的相似度
|
5ab1c29c
tangwang
first commit
|
6
|
"""
|
b57c6eb4
tangwang
offline tasks: fi...
|
7
|
import json
|
e89d7a84
tangwang
deepwalk refactor...
|
8
|
import os
|
c59dd0b0
tangwang
补充部分任务明文版本输出
|
9
|
import argparse
|
5ab1c29c
tangwang
first commit
|
10
|
import pandas as pd
|
b57c6eb4
tangwang
offline tasks: fi...
|
11
12
|
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
|
5ab1c29c
tangwang
first commit
|
13
|
from db_service import create_db_connection
|
c9f77c8f
tangwang
deepwalk refactor...
|
14
|
from config.offline_config import DB_CONFIG, OUTPUT_DIR
|
c59dd0b0
tangwang
补充部分任务明文版本输出
|
15
16
17
18
|
from scripts.debug_utils import (
setup_debug_logger, log_processing_step,
save_readable_index, fetch_name_mappings
)
|
5ab1c29c
tangwang
first commit
|
19
|
|
b57c6eb4
tangwang
offline tasks: fi...
|
20
21
22
23
24
25
26
|
# Elasticsearch connection settings for the product index.
# Every value can be overridden through an environment variable; the literals are the
# historical defaults so existing deployments keep working unchanged.
# NOTE(review): a real password is committed to source control here. It should be
# rotated, and the default removed once every deployment sets ES_PASSWORD.
ES_CONFIG = {
    'host': os.environ.get('ES_HOST', 'http://localhost:9200'),
    'index_name': os.environ.get('ES_INDEX', 'spu'),
    'username': os.environ.get('ES_USERNAME', 'essa'),
    'password': os.environ.get('ES_PASSWORD', '4hOaLaf41y2VuI8y'),
}

# Algorithm parameters
TOP_N = 50  # default number of similar items kept per item
KNN_K = 100  # number of hits ("k") requested from each knn query
# Candidate pool size ("num_candidates") for each knn query.
# Elasticsearch rejects a knn query whose num_candidates is smaller than k.
KNN_CANDIDATES = 200
def get_active_items(engine, lookback_days=365):
    """
    Return the distinct item_ids that had a user interaction recently.

    An item counts as active when ``sensors_events`` holds at least one
    click / contactFactory / addToPool / addToCart / purchase event for it whose
    ``create_time`` is on or after the date ``lookback_days`` days before now.
    The default of 365 days is one year, which was the previously fixed window.

    Args:
        engine: database connection or engine accepted by ``pandas.read_sql``.
        lookback_days: length of the look-back window, in days.

    Returns:
        list of item_id values, in the order the database returns them.
    """
    # Only a formatted date (YYYY-MM-DD) is interpolated into the SQL text.
    cutoff_date = (datetime.now() - timedelta(days=lookback_days)).strftime('%Y-%m-%d')
    sql_query = f"""
    SELECT DISTINCT
        se.item_id
    FROM
        sensors_events se
    WHERE
        se.event IN ('click', 'contactFactory', 'addToPool', 'addToCart', 'purchase')
        AND se.create_time >= '{cutoff_date}'
        AND se.item_id IS NOT NULL
    """
    df = pd.read_sql(sql_query, engine)
    return df['item_id'].tolist()
|
5ab1c29c
tangwang
first commit
|
53
54
|
|
b57c6eb4
tangwang
offline tasks: fi...
|
55
56
57
58
59
60
61
62
63
64
65
66
|
def connect_es():
    """
    Build an Elasticsearch client for ``ES_CONFIG['host']``.

    The client authenticates with HTTP basic auth (username/password from
    ES_CONFIG), does not verify TLS certificates, and uses a 30-second
    request timeout.
    """
    credentials = (ES_CONFIG['username'], ES_CONFIG['password'])
    client_options = {
        'basic_auth': credentials,
        'verify_certs': False,
        'request_timeout': 30,
    }
    return Elasticsearch([ES_CONFIG['host']], **client_options)
def get_item_vectors(es, item_id):
    """
    Fetch one product document's name and embedding vectors from Elasticsearch.

    Args:
        es: Elasticsearch client.
        item_id: product id; converted to ``str`` and matched against the document ``_id``.

    Returns:
        dict with keys ``_id``, ``name_zh``, ``embedding_name_zh``,
        ``embedding_pic_h14`` and ``on_sell_days_boost``, or None when no document
        matches or the request fails. Failures are logged as warnings instead of
        being silently dropped. ``name_zh`` defaults to ``''``. ``on_sell_days_boost``
        falls back to 1.0 (no boost) when it is missing or null, which matches
        find_similar_by_vector.
    """
    try:
        response = es.search(
            index=ES_CONFIG['index_name'],
            body={
                "query": {
                    "term": {
                        "_id": str(item_id)
                    }
                },
                "_source": {
                    "includes": ["_id", "name_zh", "embedding_name_zh", "embedding_pic_h14", "on_sell_days_boost"]
                }
            }
        )
        hits = response['hits']['hits']
        if not hits:
            return None
        source = hits[0]['_source']
        return {
            '_id': hits[0]['_id'],
            'name_zh': source.get('name_zh', ''),
            'embedding_name_zh': source.get('embedding_name_zh'),
            'embedding_pic_h14': source.get('embedding_pic_h14'),
            'on_sell_days_boost': source.get('on_sell_days_boost', 1.0) or 1.0,
        }
    except Exception as exc:
        # Best-effort per item: one failed lookup must not abort the whole batch,
        # but it should be visible. The logger name matches the one main() passes to
        # setup_debug_logger. If that helper does not configure a stdlib logger of this
        # name, the warning still reaches stderr through logging's last-resort handler.
        import logging
        logging.getLogger('i2i_content_similar').warning(
            "ES lookup failed for item %s: %s", item_id, exc
        )
        return None
|
5ab1c29c
tangwang
first commit
|
101
102
|
|
b57c6eb4
tangwang
offline tasks: fi...
|
103
|
def find_similar_by_vector(es, vector, field_name, k=KNN_K, num_candidates=KNN_CANDIDATES):
    """
    Run a knn query and return the nearest products, boosted and ranked.

    Each hit's ES ``_score`` is multiplied by the document's ``on_sell_days_boost``
    (1.0 when missing or null). The list is then sorted by this boosted score,
    highest first, so the boost actually affects ranking rather than only the
    reported number.

    Args:
        es: Elasticsearch client.
        vector: query vector.
        field_name: dense-vector field to search (``embedding_name_zh`` or
            ``embedding_pic_h14.vector``).
        k: number of hits to return.
        num_candidates: knn candidate pool size. It is raised to ``k`` if smaller,
            because Elasticsearch rejects ``num_candidates < k``.

    Returns:
        List of ``(item_id, boosted_score, name_zh)`` tuples, sorted by
        ``boosted_score`` descending, or ``[]`` if the query fails. Failures are
        logged as warnings.
    """
    num_candidates = max(num_candidates, k)
    try:
        response = es.search(
            index=ES_CONFIG['index_name'],
            body={
                "knn": {
                    "field": field_name,
                    "query_vector": vector,
                    "k": k,
                    "num_candidates": num_candidates
                },
                "_source": ["_id", "name_zh", "on_sell_days_boost"],
                "size": k
            }
        )

        results = []
        for hit in response['hits']['hits']:
            source = hit['_source']
            # A missing, null or 0 boost means "no boost".
            boost = source.get('on_sell_days_boost', 1.0) or 1.0
            results.append((hit['_id'], hit['_score'] * boost, source.get('name_zh', '')))
        # Stable sort, so ties keep Elasticsearch's original order.
        results.sort(key=lambda entry: entry[1], reverse=True)
        return results
    except Exception as exc:
        # Best-effort: a failed query yields no neighbours for this item, but is reported.
        import logging
        logging.getLogger('i2i_content_similar').warning(
            "knn query on %s failed: %s", field_name, exc
        )
        return []
|
5ab1c29c
tangwang
first commit
|
151
152
|
|
c59dd0b0
tangwang
补充部分任务明文版本输出
|
153
|
def _extract_query_vector(item_data, vector_field):
    """
    Return the query vector for ``vector_field`` from a get_item_vectors() record.

    ``embedding_name_zh`` is returned as stored. For ``embedding_pic_h14`` the
    field is read as a list of dicts, and the ``vector`` of its first element is
    used. Returns None when no usable vector is present or the field is unknown.
    """
    if vector_field == 'embedding_name_zh':
        return item_data.get('embedding_name_zh')
    if vector_field == 'embedding_pic_h14':
        pic_data = item_data.get('embedding_pic_h14')
        if pic_data and isinstance(pic_data, list) and isinstance(pic_data[0], dict):
            return pic_data[0].get('vector')
        return None
    return None


def generate_similarity_index(es, active_items, vector_field, field_name, logger, top_n=50):
    """
    Build a similarity index over one vector field.

    For every item, its vector is fetched from ES and a knn query is run on the
    same field. The item itself is removed from the hits, and the remaining hits
    are ranked by boosted score and truncated to ``top_n``.

    Args:
        es: Elasticsearch client.
        active_items: ids of the items to index.
        vector_field: vector field name (``embedding_name_zh`` or ``embedding_pic_h14``).
        field_name: short label (``name`` or ``pic``). It is not used by the
            current implementation.
        logger: logger used for progress messages.
        top_n: maximum number of similar items kept per item.

    Returns:
        dict: ``{item_id: [(similar_id, boosted_score, name), ...]}``. Only items
        with at least one neighbour are included.
    """
    result = {}
    total = len(active_items)

    # Request at least top_n + 1 hits, because the item itself is normally among
    # them. This way a --top_n larger than KNN_K is not silently truncated.
    # The candidate pool must not be smaller than k.
    knn_k = max(KNN_K, top_n + 1)
    knn_candidates = max(KNN_CANDIDATES, knn_k)
    # Picture embeddings are stored as objects; the knn target is their .vector sub-field.
    knn_field = f"{vector_field}.vector" if vector_field == 'embedding_pic_h14' else vector_field

    for position, item_id in enumerate(active_items, start=1):
        if position % 100 == 0:
            logger.info(f"处理进度: {position}/{total} ({position / total * 100:.1f}%)")

        item_data = get_item_vectors(es, item_id)
        if not item_data:
            continue
        query_vector = _extract_query_vector(item_data, vector_field)
        if not query_vector:
            continue

        similar_items = find_similar_by_vector(
            es, query_vector, knn_field, k=knn_k, num_candidates=knn_candidates
        )
        # Drop the item itself. ES ids are strings, so compare against str(item_id).
        own_id = str(item_id)
        candidates = [entry for entry in similar_items if entry[0] != own_id]
        # Rank by the boosted score (entry[1]) before truncating, so
        # on_sell_days_boost influences which neighbours are kept.
        candidates.sort(key=lambda entry: entry[1], reverse=True)
        if candidates[:top_n]:
            result[item_id] = candidates[:top_n]
    return result
|
b57c6eb4
tangwang
offline tasks: fi...
|
214
|
def save_index_file(result, es, output_file, logger):
|
5ab1c29c
tangwang
first commit
|
215
|
"""
|
b57c6eb4
tangwang
offline tasks: fi...
|
216
217
218
|
保存索引文件
格式: item_id \t item_name \t similar_id1:score1,similar_id2:score2,...
|
5ab1c29c
tangwang
first commit
|
219
|
"""
|
b57c6eb4
tangwang
offline tasks: fi...
|
220
|
logger.info(f"保存索引到: {output_file}")
|
5ab1c29c
tangwang
first commit
|
221
|
|
b57c6eb4
tangwang
offline tasks: fi...
|
222
223
224
225
226
227
228
229
230
231
232
233
|
with open(output_file, 'w', encoding='utf-8') as f:
for item_id, similar_items in result.items():
if not similar_items:
continue
# 获取当前商品的名称
item_data = get_item_vectors(es, item_id)
item_name = item_data.get('name_zh', 'Unknown') if item_data else 'Unknown'
# 格式化相似商品列表
sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score, _ in similar_items])
f.write(f'{item_id}\t{item_name}\t{sim_str}\n')
|
5ab1c29c
tangwang
first commit
|
234
|
|
b57c6eb4
tangwang
offline tasks: fi...
|
235
|
logger.info(f"索引保存完成,共 {len(result)} 个商品")
|
5ab1c29c
tangwang
first commit
|
236
237
238
|
def _save_readable_copy(logger, result, output_file, name_mappings, label, tag):
    """Write the debug-mode, human-readable companion file of one similarity index."""
    log_processing_step(logger, f"保存{label}可读格式")
    # save_readable_index takes {key: [(sim_id, score), ...]}; the per-hit names are dropped here.
    readable_data = {
        f"{tag}:{item_id}": [(sim_id, score) for sim_id, score, _ in similar_items]
        for item_id, similar_items in result.items()
    }
    save_readable_index(
        output_file,
        readable_data,
        name_mappings,
        description=tag
    )


def main():
    """
    Build the name-vector and picture-vector content-similarity indexes.

    Steps:
    1. Parse the CLI options ``--debug`` and ``--top_n``.
    2. Load the active item ids from the database (see get_active_items).
    3. For every item, query Elasticsearch for nearest neighbours, once by name
       embedding and once by picture embedding.
    4. Write one dated TSV per index into OUTPUT_DIR. With ``--debug``, a
       human-readable copy of each index is written as well.
    """
    # Command-line arguments
    parser = argparse.ArgumentParser(description='Generate content-based similarity using ES vectors')
    parser.add_argument('--debug', action='store_true', help='Enable debug mode with readable output')
    parser.add_argument('--top_n', type=int, default=50, help='Number of similar items per item (default: 50)')
    args = parser.parse_args()
    # A non-positive top_n has no meaningful result; reject it up front.
    if args.top_n < 1:
        parser.error('--top_n must be a positive integer')
    top_n = args.top_n

    logger = setup_debug_logger('i2i_content_similar', debug=args.debug)

    logger.info("="*80)
    logger.info("开始生成基于ES向量的内容相似索引")
    logger.info(f"ES地址: {ES_CONFIG['host']}")
    logger.info(f"索引名: {ES_CONFIG['index_name']}")
    logger.info(f"Top N: {top_n}")
    logger.info("="*80)

    # Database connection
    log_processing_step(logger, "连接数据库")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )

    # Active items
    log_processing_step(logger, "获取最近1年有过行为的商品")
    active_items = get_active_items(engine)
    logger.info(f"找到 {len(active_items)} 个活跃商品")

    # Elasticsearch client
    log_processing_step(logger, "连接Elasticsearch")
    es = connect_es()
    logger.info("ES连接成功")

    date_str = datetime.now().strftime("%Y%m%d")
    # The index files are opened for writing below; make sure their directory exists.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Item-name mappings are needed only for the readable debug output.
    name_mappings = {}
    if args.debug:
        log_processing_step(logger, "获取物品名称映射")
        name_mappings = fetch_name_mappings(engine, debug=True)

    # 1. Name text vectors
    log_processing_step(logger, "生成基于名称文本向量的相似索引")
    name_result = generate_similarity_index(
        es, active_items, 'embedding_name_zh', 'name', logger, top_n=top_n
    )
    name_output = os.path.join(OUTPUT_DIR, f'i2i_content_name_{date_str}.txt')
    save_index_file(name_result, es, name_output, logger)
    if args.debug and name_result:
        _save_readable_copy(
            logger, name_result, name_output, name_mappings,
            'i2i_content_name', 'i2i:content_name'
        )

    # 2. Picture vectors
    log_processing_step(logger, "生成基于图片向量的相似索引")
    pic_result = generate_similarity_index(
        es, active_items, 'embedding_pic_h14', 'pic', logger, top_n=top_n
    )
    pic_output = os.path.join(OUTPUT_DIR, f'i2i_content_pic_{date_str}.txt')
    save_index_file(pic_result, es, pic_output, logger)
    if args.debug and pic_result:
        _save_readable_copy(
            logger, pic_result, pic_output, name_mappings,
            'i2i_content_pic', 'i2i:content_pic'
        )

    logger.info("="*80)
    logger.info("完成!生成了两份内容相似索引:")
    logger.info(f"  1. 名称向量索引: {name_output} ({len(name_result)} 个商品)")
    logger.info(f"  2. 图片向量索引: {pic_output} ({len(pic_result)} 个商品)")
    logger.info("="*80)
|
5ab1c29c
tangwang
first commit
|
341
342
343
344
|
if __name__ == "__main__":
    # Run the job only when executed as a script, not when imported.
    main()
|