import argparse
import math
import os
from collections import defaultdict
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine

from db_service import create_db_connection
from scripts.debug_utils import save_readable_index


def clean_text_field(text):
    """Return *text* as a single-line string safe for the delimited output file.

    Missing values (``None``/``NaN``) become an empty string. Carriage
    returns, newlines and tabs are each replaced by a space (the output file
    is tab-separated, so a raw tab would split the field), double quotes are
    doubled, and surrounding whitespace is stripped.
    """
    if pd.isna(text):
        return ''
    cleaned = str(text)
    for separator in ('\r', '\n', '\t'):
        cleaned = cleaned.replace(separator, ' ')
    return cleaned.replace('"', '""').strip()


# Parse command-line arguments.
parser = argparse.ArgumentParser(description='计算基于用户行为的商品相似度(Item Similarity)')
parser.add_argument('--lookback_days', type=int, default=180,
                    help='回溯天数,默认180天')
parser.add_argument('--top_n', type=int, default=50,
                    help='每个商品保留的相似商品数量,默认50')
parser.add_argument('--debug', action='store_true',
                    help='开启debug模式')
args = parser.parse_args()

# Database connection settings.
# NOTE(review): the connection settings, including the password, are
# hard-coded in source; consider loading them from the environment or a
# secrets store instead.
host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com'
port = '9030'
database = 'datacenter'
username = 'readonly'
password = 'essa1234'

# Create the database connection.
engine = create_db_connection(host, port, database, username, password)

# SQL query: fetch each user's event sequence within the lookback window.
# ``lookback_days`` is parsed as an int by argparse, so interpolating it here
# cannot inject SQL text.
# NOTE(review): the doubled ``%%`` in DATE_FORMAT is presumably an escape for
# the driver's %-style parameter formatting - confirm before changing it.
sql_query = f"""
SELECT
    DATE_FORMAT(se.create_time, '%%Y-%%m-%%d') AS date,
    se.anonymous_id AS user_id,
    se.item_id,
    pgs.name AS item_name
FROM sensors_events se
LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
WHERE se.event IN ('contactFactory', 'addToPool', 'addToCart')
  AND se.create_time >= DATE_SUB(NOW(), INTERVAL {args.lookback_days} DAY)
ORDER BY se.anonymous_id, se.create_time;
"""

if args.debug:
    print(f"[DEBUG] 参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}")
    print("[DEBUG] 开始查询数据库...")

# Run the query and load the result into a pandas DataFrame.
df = pd.read_sql(sql_query, engine)

# Drop events without a user or item id before casting: a null item_id makes
# ``astype(int)`` raise, and a null user_id would be stringified and then
# grouped as if it were one real user.
df = df.dropna(subset=['user_id', 'item_id'])

# Make sure item ids are integers; user ids stay strings.
df['item_id'] = df['item_id'].astype(int)
df['user_id'] = df['user_id'].astype(str)

if args.debug:
    print(f"[DEBUG] 查询完成,共 {len(df)} 条记录")
    print(f"[DEBUG] 唯一用户数: {df['user_id'].nunique()}")
    print(f"[DEBUG] 唯一商品数: {df['item_id'].nunique()}")

# Counters filled while processing the click sequences:
#   cooccur[a][b] - co-occurrence count of items a and b
#   freq[a]       - occurrence count of item a
cooccur = defaultdict(lambda: defaultdict(int))
freq = defaultdict(int)

# Click sequences are processed per (user, date) group below.
# Local import: only this section needs ``combinations`` to enumerate pairs.
from itertools import combinations

# Fold every (user, date) session into the counters.
# Each distinct item is counted once per session in BOTH ``freq`` and
# ``cooccur``. Counting pairs over the raw event list (with repeated clicks)
# would inflate the numerator relative to ``freq`` and let the "cosine"
# score exceed 1.
for _, session in df.groupby(['user_id', 'date']):
    # Sorted so that dict insertion order, and therefore output, is stable.
    session_items = sorted(set(session['item_id'].tolist()))

    # Session frequency of each item.
    for item in session_items:
        freq[item] += 1

    # Symmetric co-occurrence for every unordered pair of distinct items.
    for item1, item2 in combinations(session_items, 2):
        cooccur[item1][item2] += 1
        cooccur[item2][item1] += 1

# Compute cosine similarity: cooccur(a, b) / (sqrt(freq(a)) * sqrt(freq(b))).
if args.debug:
    print("[DEBUG] 开始计算相似度...")

result = {}
for item1, neighbours in cooccur.items():
    sim_scores = []
    for item2, pair_count in neighbours.items():
        denominator = math.sqrt(freq[item1]) * math.sqrt(freq[item2])
        # Guard against a zero frequency (``freq`` is a defaultdict).
        if denominator != 0:
            sim_scores.append((item2, pair_count / denominator))
    # Highest score first; ties are broken by item id for reproducible output.
    sim_scores.sort(key=lambda pair: (-pair[1], pair[0]))
    # Keep only the top_n most similar items.
    result[item1] = sim_scores[:args.top_n]

if args.debug:
    print(f"[DEBUG] 相似度计算完成,共 {len(result)} 个商品有相似推荐")

# Map item_id -> item name (for a repeated id the last row wins).
item_name_map = dict(zip(df['item_id'], df['item_name']))

# Prepare the output location: output/i2i_item_behavior_<YYYYMMDD>.txt
date_str = datetime.now().strftime('%Y%m%d')
output_dir = 'output'
os.makedirs(output_dir, exist_ok=True)
output_file = os.path.join(output_dir, f'i2i_item_behavior_{date_str}.txt')

# Write the similar items to the file.
if args.debug:
    print(f"[DEBUG] 开始写入文件: {output_file}")

with open(output_file, 'w', encoding='utf-8') as f:
    for item_id, sims in sorted(result.items()):
        item_name = clean_text_field(item_name_map.get(item_id, 'Unknown'))
        # Line format:
        #   item_id \t item_name \t similar_id1:score1,similar_id2:score2,...
        sim_str = ','.join(f'{sim_id}:{score:.4f}' for sim_id, score in sims)
        f.write(f'{item_id}\t{item_name}\t{sim_str}\n')

print("✓ Item相似度计算完成")
print(f" - 输出文件: {output_file}")
print(f" - 商品数: {len(result)}")
if result:
    avg_sims = sum(len(sims) for sims in result.values()) / len(result)
    print(f" - 平均相似商品数: {avg_sims:.1f}")

# In debug mode, additionally save a human-readable version of the index.
if args.debug and result:
    print("[DEBUG] 保存可读格式文件...")

    # Name lookup table handed to the readable-index writer.
    name_mappings = {
        'item': {str(k): clean_text_field(v) for k, v in item_name_map.items()}
    }

    # Re-key the data as {index_key: [(sim_id, score), ...]}.
    readable_data = {
        f"i2i:item_behavior:{item_id}": sims
        for item_id, sims in result.items()
    }

    save_readable_index(
        output_file,
        readable_data,
        name_mappings,
        description='i2i:item_behavior'
    )
    print(f" - 可读文件: {output_file.replace('.txt', '_readable.txt')}")