# i2i_item_behavior.py
import argparse
import logging
import math
import os
from collections import defaultdict
from datetime import datetime
from itertools import combinations

import pandas as pd
from sqlalchemy import create_engine

from db_service import create_db_connection
from scripts.debug_utils import save_readable_index

def setup_logger():
    """Create (or return the already-configured) module logger.

    Logs at INFO level both to the console and to a date-stamped file
    under ./logs. Idempotent: if the logger already has handlers
    attached, it is returned as-is without adding duplicates.
    """
    os.makedirs('logs', exist_ok=True)

    logger = logging.getLogger('i2i_item_behavior')
    logger.setLevel(logging.INFO)

    # Guard against duplicate handlers when called more than once.
    if logger.handlers:
        return logger

    log_path = os.path.join(
        'logs', f'i2i_item_behavior_{datetime.now().strftime("%Y%m%d")}.log'
    )
    fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Same level/format for the file and console sinks.
    for handler in (logging.FileHandler(log_path, encoding='utf-8'),
                    logging.StreamHandler()):
        handler.setLevel(logging.INFO)
        handler.setFormatter(fmt)
        logger.addHandler(handler)

    return logger

def clean_text_field(text):
    """Normalize a free-text field for the index output file.

    Returns '' for missing values (None/NaN). Otherwise carriage returns
    and newlines become spaces, double quotes are doubled (CSV-style
    escaping), and surrounding whitespace is stripped.
    """
    if pd.isna(text):
        return ''
    # Single-pass character mapping; translate supports 1 -> many mappings.
    table = str.maketrans({'\r': ' ', '\n': ' ', '"': '""'})
    return str(text).translate(table).strip()

# --- CLI arguments ----------------------------------------------------------
parser = argparse.ArgumentParser(description='计算基于用户行为的商品相似度(Item Similarity)')
parser.add_argument('--lookback_days', type=int, default=180, help='回溯天数,默认180天')
parser.add_argument('--top_n', type=int, default=50, help='每个商品保留的相似商品数量,默认50')
parser.add_argument('--debug', action='store_true', help='开启debug模式')
args = parser.parse_args()

# --- Database connection ----------------------------------------------------
# NOTE(review): credentials are hard-coded in source control; move them to
# environment variables or a secrets manager.
host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com'
port = '9030'
database = 'datacenter'
username = 'readonly'
password = 'essa1234'

# create_db_connection comes from db_service; presumably it returns an
# SQLAlchemy-compatible engine usable by pd.read_sql — confirm its contract.
engine = create_db_connection(host, port, database, username, password)

# SQL 查询 - 获取用户点击序列
sql_query = f"""
SELECT 
    DATE_FORMAT(se.create_time, '%%Y-%%m-%%d') AS date,
    se.anonymous_id AS user_id,
    se.item_id,
    pgs.name AS item_name
FROM 
    sensors_events se
LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id
WHERE 
    se.event IN ('contactFactory', 'addToPool', 'addToCart')
    AND se.create_time >= DATE_SUB(NOW(), INTERVAL {args.lookback_days} DAY)
ORDER BY 
    se.anonymous_id,
    se.create_time;
"""

if args.debug:
    print(f"[DEBUG] 参数配置: lookback_days={args.lookback_days}, top_n={args.top_n}")
    print(f"[DEBUG] 开始查询数据库...")

# 执行 SQL 查询并将结果加载到 pandas DataFrame
df = pd.read_sql(sql_query, engine)

# 确保ID为整数类型
df['item_id'] = df['item_id'].astype(int)
df['user_id'] = df['user_id'].astype(str)  # user_id保持为字符串

if args.debug:
    print(f"[DEBUG] 查询完成,共 {len(df)} 条记录")
    print(f"[DEBUG] 唯一用户数: {df['user_id'].nunique()}")
    print(f"[DEBUG] 唯一商品数: {df['item_id'].nunique()}")

# Session-level co-occurrence statistics for item-based CF.
# A "session" here is one (user_id, date) group of events.
cooccur = defaultdict(lambda: defaultdict(int))  # cooccur[i][j] = #sessions containing both i and j
freq = defaultdict(int)                          # freq[i] = #sessions containing i

for (user_id, date), group in df.groupby(['user_id', 'date']):
    unique_items = set(group['item_id'])

    # Each session contributes at most once to an item's frequency.
    for item in unique_items:
        freq[item] += 1

    # FIX: count each unordered item pair once per session. The original code
    # iterated over all raw occurrence pairs, so repeated clicks on the same
    # item inflated the cosine numerator while freq stayed session-based —
    # similarity scores could exceed 1.0 and were biased toward items clicked
    # many times within one session. Counting unique pairs makes numerator
    # and denominator consistent (classic session-based itemCF), and is also
    # O(u²) over unique items instead of O(k²) over raw clicks.
    for item1, item2 in combinations(unique_items, 2):
        cooccur[item1][item2] += 1
        cooccur[item2][item1] += 1

# Cosine-style similarity: co-occurrence count normalized by the square
# roots of each item's session frequency.
if args.debug:
    print(f"[DEBUG] 开始计算相似度...")

result = {}
for item1, neighbors in cooccur.items():
    scored = []
    for item2, joint_count in neighbors.items():
        norm = math.sqrt(freq[item1]) * math.sqrt(freq[item2])
        if norm != 0:
            scored.append((item2, joint_count / norm))
    # Highest score first; keep only the top_n neighbors per item.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    result[item1] = scored[:args.top_n]

if args.debug:
    print(f"[DEBUG] 相似度计算完成,共 {len(result)} 个商品有相似推荐")

# Item id -> display name lookup (on duplicate ids, later rows overwrite
# earlier ones).
item_name_map = dict(zip(df['item_id'], df['item_name']))

# Build the dated output path under ./output.
output_dir = 'output'
os.makedirs(output_dir, exist_ok=True)
date_str = datetime.now().strftime('%Y%m%d')
output_file = os.path.join(output_dir, f'i2i_item_behavior_{date_str}.txt')

if args.debug:
    print(f"[DEBUG] 开始写入文件: {output_file}")

# One line per item: item_id \t item_name \t sim_id1:score1,sim_id2:score2,...
with open(output_file, 'w', encoding='utf-8') as f:
    for item_id in sorted(result):
        display_name = clean_text_field(item_name_map.get(item_id, 'Unknown'))
        neighbor_str = ','.join(f'{sim_id}:{score:.4f}' for sim_id, score in result[item_id])
        f.write(f'{item_id}\t{display_name}\t{neighbor_str}\n')

# Run summary to stdout.
print(f"✓ Item相似度计算完成")
print(f"  - 输出文件: {output_file}")
print(f"  - 商品数: {len(result)}")
if result:
    avg_sims = sum(len(sims) for sims in result.values()) / len(result)
    print(f"  - 平均相似商品数: {avg_sims:.1f}")

# Debug mode: also dump a human-readable version of the index.
if args.debug and result:
    print("[DEBUG] 保存可读格式文件...")
    
    # Map item ids (stringified) to cleaned display names for the dump.
    name_mappings = {
        'item': {str(k): clean_text_field(v) for k, v in item_name_map.items()}
    }
    
    # Re-key results as "i2i:item_behavior:<item_id>" -> [(sim_id, score), ...].
    readable_data = {}
    for item_id, sims in result.items():
        readable_data[f"i2i:item_behavior:{item_id}"] = sims
    
    # save_readable_index presumably derives its output path from output_file
    # as '<output>_readable.txt' — see scripts.debug_utils to confirm.
    save_readable_index(
        output_file,
        readable_data,
        name_mappings,
        description='i2i:item_behavior'
    )
    print(f"  - 可读文件: {output_file.replace('.txt', '_readable.txt')}")