# i2i_content_similar.py 11.2 KB
"""
i2i - 内容相似索引
基于商品属性(分类、供应商、属性等)计算物品相似度
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

import pandas as pd
import numpy as np
import argparse
from datetime import datetime
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from db_service import create_db_connection
from offline_tasks.config.offline_config import (
    DB_CONFIG, OUTPUT_DIR, DEFAULT_I2I_TOP_N
)
from offline_tasks.scripts.debug_utils import (
    setup_debug_logger, log_dataframe_info, log_dict_stats,
    save_readable_index, fetch_name_mappings, log_algorithm_params,
    log_processing_step
)


def fetch_product_features(engine):
    """
    Load SKU-level product features used for content similarity.

    The query reads from ``prd_goods_sku`` joined with its goods record and
    supplier, resolves category levels 1-4 from the 2nd-5th dot-separated
    segments of the category ``path``, and attaches the option names of the
    ``PKG`` and ``pkg_mode`` attributes. Only SKUs with ``status`` in
    (2, 4, 5) and ``is_delete = 0`` are returned.

    Args:
        engine: Database connection handed unchanged to ``pandas.read_sql``
            (presumably a SQLAlchemy engine; confirm against
            ``create_db_connection``).

    Returns:
        pandas.DataFrame with one row per joined SKU record and the columns
        listed in the SELECT clause below.
    """
    # NOTE: SUBSTRING_INDEX is used to pick one segment out of the category path.
    sql_query = """
    SELECT 
        pgs.id as item_id,
        pgs.name as item_name,
        pg.supplier_id,
        ss.name as supplier_name,
        pg.category_id,
        pc_1.id as category_level1_id,
        pc_1.name as category_level1,
        pc_2.id as category_level2_id,
        pc_2.name as category_level2,
        pc_3.id as category_level3_id,
        pc_3.name as category_level3,
        pc_4.id as category_level4_id,
        pc_4.name as category_level4,
        pgs.capacity,
        pgs.factory_no,
        po.name as package_type,
        po2.name as package_mode,
        pgs.fir_on_sell_time,
        pgs.status
    FROM prd_goods_sku pgs
    INNER JOIN prd_goods pg ON pg.id = pgs.goods_id
    INNER JOIN sup_supplier ss ON ss.id = pg.supplier_id
    LEFT JOIN prd_category as pc ON pc.id = pg.category_id
    LEFT JOIN prd_category AS pc_1 ON pc_1.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 2), '.', -1)
    LEFT JOIN prd_category AS pc_2 ON pc_2.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 3), '.', -1)
    LEFT JOIN prd_category AS pc_3 ON pc_3.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 4), '.', -1)
    LEFT JOIN prd_category AS pc_4 ON pc_4.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 5), '.', -1)
    LEFT JOIN prd_goods_sku_attribute pgsa ON pgs.id = pgsa.goods_sku_id 
        AND pgsa.attribute_id = (SELECT id FROM prd_attribute WHERE code = 'PKG' LIMIT 1)
    LEFT JOIN prd_option po ON po.id = pgsa.option_id
    LEFT JOIN prd_goods_sku_attribute pgsa2 ON pgs.id = pgsa2.goods_sku_id 
        AND pgsa2.attribute_id = (SELECT id FROM prd_attribute WHERE code = 'pkg_mode' LIMIT 1)
    LEFT JOIN prd_option po2 ON po2.id = pgsa2.option_id
    WHERE pgs.status IN (2, 4, 5)
        AND pgs.is_delete = 0
    """

    print("Executing SQL query...")
    products = pd.read_sql(sql_query, engine)
    row_count = len(products)
    print(f"Fetched {row_count} products")
    return products


def build_feature_text(row):
    """
    Build the whitespace-joined feature text for one product row.

    Each non-null attribute is emitted as a token repeated according to its
    weight, so that heavier attributes contribute more term frequency:
    category levels 1-4 are repeated 5/4/3/2 times, the supplier name twice,
    and the two packaging attributes once each. The item name is split on
    whitespace and its words are appended once.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) indexable by the
            column names used below.

    Returns:
        str: All tokens joined by single spaces ('' when every field is null).
    """
    # (column, number of repetitions) in the order tokens are emitted.
    weighted_columns = (
        ('category_level1', 5),
        ('category_level2', 4),
        ('category_level3', 3),
        ('category_level4', 2),
        ('supplier_name', 2),
        ('package_type', 1),
        ('package_mode', 1),
    )

    tokens = []
    for column, repeats in weighted_columns:
        value = row[column]
        if pd.notna(value):
            tokens += [str(value)] * repeats

    # Item name: naive tokenisation on whitespace only.
    name = row['item_name']
    if pd.notna(name):
        tokens += str(name).split()

    return ' '.join(tokens)


def calculate_content_similarity(df, top_n=50):
    """
    Compute TF-IDF / cosine content similarity between products.

    A feature text is built for every row with ``build_feature_text``,
    vectorised with a TF-IDF model limited to 1000 features, and compared
    against all other rows in batches of 1000 rows to bound memory use.

    Args:
        df: Product DataFrame containing ``item_id`` plus the columns read by
            ``build_feature_text``. A ``feature_text`` column is added to it
            in place.
        top_n: Maximum number of neighbours kept per item.

    Returns:
        dict mapping item_id -> list of ``(similar_item_id, score)`` tuples,
        sorted by descending score. Only strictly positive scores are kept and
        items without any such neighbour are omitted. Returns ``{}`` for an
        empty frame or a non-positive ``top_n``.
    """
    # Nothing to compare; also avoids fitting the vectorizer on no documents.
    if df.empty or top_n <= 0:
        return {}

    print("Building feature texts...")
    df['feature_text'] = df.apply(build_feature_text, axis=1)

    print("Calculating TF-IDF...")
    vectorizer = TfidfVectorizer(max_features=1000)
    tfidf_matrix = vectorizer.fit_transform(df['feature_text'])

    print("Calculating cosine similarity...")
    # Pull the id column out once; per-row df.iloc lookups inside the loops
    # would build a full row Series for every single access.
    item_ids = df['item_id'].tolist()
    total = len(item_ids)
    # Compare one slice of rows against the full matrix at a time to save memory.
    batch_size = 1000
    result = {}

    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        batch_similarity = cosine_similarity(tfidf_matrix[start:end], tfidf_matrix)

        for offset, scores in enumerate(batch_similarity):
            row_pos = start + offset

            # Exclude the item itself by position. Relying on it sorting first
            # is wrong when other items tie with it (identical feature text
            # gives a score of 1.0 as well).
            scores[row_pos] = -np.inf

            # Stable sort on the negated scores: descending order with
            # deterministic tie-breaking by row position.
            top_indices = np.argsort(-scores, kind='stable')[:top_n]
            similar_items = [
                (item_ids[k], float(scores[k]))
                for k in top_indices
                if scores[k] > 0  # keep only items that share some feature
            ]

            if similar_items:
                result[item_ids[row_pos]] = similar_items

        print(f"Processed {end}/{total} products...")

    return result


def _append_group_neighbours(result, items, score, top_n):
    """Give every item in ``items`` the first unseen group members at ``score`` until it holds ``top_n``."""
    for item_id in items:
        neighbours = result[item_id]
        if len(neighbours) >= top_n:
            continue

        # Never add the item itself or a neighbour it already has.
        seen = {other for other, _ in neighbours}
        seen.add(item_id)

        # Stop as soon as the list is full instead of materialising the whole
        # group for every item (which is quadratic for large categories).
        for candidate in items:
            if candidate in seen:
                continue
            neighbours.append((candidate, score))
            seen.add(candidate)
            if len(neighbours) >= top_n:
                break


def calculate_category_based_similarity(df, top_n=50):
    """
    Derive item similarity from shared categories.

    Items sharing a level-4 category are linked with a fixed score of 0.9.
    Lists that are still shorter than ``top_n`` are then topped up with items
    sharing the level-3 category at a score of 0.7. Neighbours are taken in
    row order of ``df``.

    Args:
        df: Product DataFrame with ``item_id``, ``category_level3_id`` and
            ``category_level4_id`` columns.
        top_n: Maximum number of neighbours per item (default 50, the value
            that used to be hard-coded).

    Returns:
        defaultdict(list) mapping item_id -> list of
        ``(similar_item_id, score)`` tuples, 0.9 entries first.
    """
    result = defaultdict(list)
    if top_n <= 0:
        return result

    # (grouping column, score): finest category first, coarser one as top-up.
    levels = (
        ('category_level4_id', 0.9),
        ('category_level3_id', 0.7),
    )

    for column, score in levels:
        for category_id, group in df.groupby(column):
            if pd.isna(category_id):
                continue

            # De-duplicate while keeping row order: repeated item_id rows
            # would otherwise be processed twice and yield duplicated
            # neighbours and over-long lists.
            items = list(dict.fromkeys(group['item_id'].tolist()))
            if len(items) < 2:
                continue

            _append_group_neighbours(result, items, score, top_n)

    return result


def merge_similarities(sim1, sim2, weight1=0.7, weight2=0.3, top_n=50):
    """
    Blend two similarity indexes into one by weighted sum.

    For every item present in either index, each neighbour's score is
    ``weight1 * score_in_sim1 + weight2 * score_in_sim2`` (a missing score
    counts as 0). Neighbours are ranked by the blended score, highest first.

    Args:
        sim1: dict item_id -> list of ``(similar_item_id, score)``.
        sim2: dict with the same structure as ``sim1``.
        weight1: Multiplier applied to scores from ``sim1``.
        weight2: Multiplier applied to scores from ``sim2``.
        top_n: Maximum number of neighbours kept per item (default 50, the
            value that used to be hard-coded).

    Returns:
        dict item_id -> list of ``(similar_item_id, blended_score)``; items
        with no neighbour are omitted. ``{}`` when ``top_n`` is not positive.
    """
    if top_n <= 0:
        return {}

    result = {}
    weighted_sources = ((sim1, weight1), (sim2, weight2))

    for item_id in set(sim1) | set(sim2):
        combined = defaultdict(float)

        for source, weight in weighted_sources:
            # .get() avoids inserting keys when a source is a defaultdict.
            for similar_id, score in source.get(item_id, ()):
                # An item must never be listed as its own neighbour.
                if similar_id != item_id:
                    combined[similar_id] += score * weight

        # sorted() is stable, so ties keep their first-seen order.
        ranked = sorted(combined.items(), key=lambda pair: pair[1], reverse=True)[:top_n]
        if ranked:
            result[item_id] = ranked

    return result


def main():
    """
    Command-line entry point: build and save the content-based i2i index.

    Steps: parse arguments, connect to the database, load product features,
    compute similarities with the selected method (``tfidf``, ``category`` or
    ``hybrid``), and write one line per item in the form
    ``item_id<TAB>item_name<TAB>id1:score1,id2:score2,...``. With ``--debug``
    a readable copy of the index is saved as well.
    """
    parser = argparse.ArgumentParser(description='Calculate content-based item similarity')
    parser.add_argument('--top_n', type=int, default=DEFAULT_I2I_TOP_N,
                       help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})')
    parser.add_argument('--method', type=str, default='hybrid',
                       choices=['tfidf', 'category', 'hybrid'],
                       help='Similarity calculation method')
    parser.add_argument('--output', type=str, default=None,
                       help='Output file path')
    parser.add_argument('--debug', action='store_true',
                       help='Enable debug mode with detailed logging and readable output')

    args = parser.parse_args()
    if args.top_n <= 0:
        parser.error('--top_n must be a positive integer')

    # Set up the logger.
    logger = setup_debug_logger('i2i_content_similar', debug=args.debug)

    # Record the algorithm parameters.
    params = {
        'top_n': args.top_n,
        'method': args.method,
        'debug': args.debug
    }
    log_algorithm_params(logger, params)

    # Create the database connection.
    logger.info("连接数据库...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )

    # Load product features.
    log_processing_step(logger, "获取商品特征")
    df = fetch_product_features(engine)
    logger.info(f"获取到 {len(df)} 个商品的特征数据")
    log_dataframe_info(logger, df, "商品特征数据")

    # Compute similarities.
    log_processing_step(logger, f"计算相似度 (方法: {args.method})")
    if args.method == 'tfidf':
        logger.info("使用 TF-IDF 方法...")
        result = calculate_content_similarity(df, args.top_n)
    elif args.method == 'category':
        logger.info("使用基于分类的方法...")
        result = calculate_category_based_similarity(df)
    else:  # hybrid
        logger.info("使用混合方法 (TF-IDF 70% + 分类 30%)...")
        tfidf_sim = calculate_content_similarity(df, args.top_n)
        category_sim = calculate_category_based_similarity(df)
        result = merge_similarities(tfidf_sim, category_sim, weight1=0.7, weight2=0.3)

    # The category and hybrid paths apply their own fixed cap, so enforce
    # --top_n here as an upper bound for every method. Lists are already
    # ordered best-first, and empty lists are dropped so the counts logged
    # below match what is written.
    result = {
        item_id: sims[:args.top_n]
        for item_id, sims in result.items()
        if sims
    }

    logger.info(f"为 {len(result)} 个物品生成了相似度")

    # Write the results.
    log_processing_step(logger, "保存结果")
    output_file = args.output or os.path.join(
        OUTPUT_DIR,
        f'i2i_content_{args.method}_{datetime.now().strftime("%Y%m%d")}.txt'
    )
    # Make sure the target directory exists before opening the file.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Fetch name mappings (debug mode only).
    name_mappings = {}
    if args.debug:
        logger.info("获取物品名称映射...")
        name_mappings = fetch_name_mappings(engine, debug=True)

    # Build the id -> name fallback once instead of scanning the DataFrame
    # with a boolean mask for every output line. setdefault keeps the first
    # occurrence of each id.
    fallback_names = {}
    if 'item_name' in df.columns:
        for known_id, known_name in zip(df['item_id'].tolist(), df['item_name'].tolist()):
            fallback_names.setdefault(known_id, known_name)

    logger.info(f"写入结果到 {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        for item_id, sims in result.items():
            # Prefer the fetched name mapping, then the product frame.
            item_name = name_mappings.get(item_id, 'Unknown')
            if item_name == 'Unknown':
                item_name = fallback_names.get(item_id, 'Unknown')

            # Format: item_id \t item_name \t similar_item_id1:score1,similar_item_id2:score2,...
            sim_str = ','.join(f'{sim_id}:{score:.4f}' for sim_id, score in sims)
            f.write(f'{item_id}\t{item_name}\t{sim_str}\n')

    logger.info(f"完成!为 {len(result)} 个物品生成了基于内容的相似度")
    logger.info(f"输出保存到:{output_file}")

    # In debug mode, also save a human-readable version of the index.
    if args.debug:
        log_processing_step(logger, "保存Debug可读格式")
        save_readable_index(
            output_file,
            result,
            name_mappings,
            index_type=f'i2i:content:{args.method}',
            logger=logger
        )


if __name__ == '__main__':
    main()