# i2i_content_similar.py
"""
i2i - 内容相似索引
基于商品属性(分类、供应商、属性等)计算物品相似度
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

import pandas as pd
import numpy as np
import argparse
from datetime import datetime
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from db_service import create_db_connection
from offline_tasks.config.offline_config import (
    DB_CONFIG, OUTPUT_DIR, DEFAULT_I2I_TOP_N
)


def fetch_product_features(engine):
    """
    Fetch product feature rows for similarity computation.

    Joins each non-deleted SKU with status in (2, 4, 5) to:
      * its parent goods row and supplier (INNER joins, so SKUs with a
        missing supplier are dropped),
      * a four-level category hierarchy derived by splitting the
        dot-separated ``prd_category.path`` with ``SUBSTRING_INDEX``
        (LEFT joins; levels may come back NULL),
      * two optional single-valued attributes: package type (attribute
        code 'PKG') and package mode (code 'pkg_mode').

    Args:
        engine: SQLAlchemy engine/connection accepted by ``pd.read_sql``.

    Returns:
        pandas.DataFrame with columns item_id, item_name, supplier_id,
        supplier_name, category_level{1..4}_id, category_level{1..4},
        capacity, factory_no, package_type, package_mode,
        fir_on_sell_time, status.
    """
    sql_query = """
    SELECT 
        pgs.id as item_id,
        pgs.name as item_name,
        pg.supplier_id,
        ss.name as supplier_name,
        pg.category_id,
        pc_1.id as category_level1_id,
        pc_1.name as category_level1,
        pc_2.id as category_level2_id,
        pc_2.name as category_level2,
        pc_3.id as category_level3_id,
        pc_3.name as category_level3,
        pc_4.id as category_level4_id,
        pc_4.name as category_level4,
        pgs.capacity,
        pgs.factory_no,
        po.name as package_type,
        po2.name as package_mode,
        pgs.fir_on_sell_time,
        pgs.status
    FROM prd_goods_sku pgs
    INNER JOIN prd_goods pg ON pg.id = pgs.goods_id
    INNER JOIN sup_supplier ss ON ss.id = pg.supplier_id
    LEFT JOIN prd_category as pc ON pc.id = pg.category_id
    LEFT JOIN prd_category AS pc_1 ON pc_1.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 2), '.', -1)
    LEFT JOIN prd_category AS pc_2 ON pc_2.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 3), '.', -1)
    LEFT JOIN prd_category AS pc_3 ON pc_3.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 4), '.', -1)
    LEFT JOIN prd_category AS pc_4 ON pc_4.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 5), '.', -1)
    LEFT JOIN prd_goods_sku_attribute pgsa ON pgs.id = pgsa.goods_sku_id 
        AND pgsa.attribute_id = (SELECT id FROM prd_attribute WHERE code = 'PKG' LIMIT 1)
    LEFT JOIN prd_option po ON po.id = pgsa.option_id
    LEFT JOIN prd_goods_sku_attribute pgsa2 ON pgs.id = pgsa2.goods_sku_id 
        AND pgsa2.attribute_id = (SELECT id FROM prd_attribute WHERE code = 'pkg_mode' LIMIT 1)
    LEFT JOIN prd_option po2 ON po2.id = pgsa2.option_id
    WHERE pgs.status IN (2, 4, 5)
        AND pgs.is_delete = 0
    """
    
    print("Executing SQL query...")
    df = pd.read_sql(sql_query, engine)
    print(f"Fetched {len(df)} products")
    return df


def build_feature_text(row):
    """
    Assemble a weighted bag-of-words feature string for one product row.

    Higher-weight fields are repeated more often so they dominate the
    downstream TF-IDF representation: category levels 1-4 get weights
    5/4/3/2, the supplier name weight 2, package type and mode weight 1,
    and the item name contributes its whitespace-separated tokens once.
    Missing (NaN/None) fields are skipped.

    Args:
        row: mapping (e.g. a pandas Series) with the feature columns.

    Returns:
        Space-joined token string; empty if every field is missing.
    """
    weighted_fields = (
        ('category_level1', 5),
        ('category_level2', 4),
        ('category_level3', 3),
        ('category_level4', 2),
        ('supplier_name', 2),
        ('package_type', 1),
        ('package_mode', 1),
    )

    tokens = []
    for column, weight in weighted_fields:
        value = row[column]
        if pd.notna(value):
            tokens.extend([str(value)] * weight)

    # Crude tokenization of the product name: split on whitespace only.
    name = row['item_name']
    if pd.notna(name):
        tokens.extend(str(name).split())

    return ' '.join(tokens)


def calculate_content_similarity(df, top_n=50):
    """
    Compute item-to-item similarity from TF-IDF vectors of feature text.

    Builds a feature string per product (see ``build_feature_text``),
    vectorizes it with TF-IDF (vocabulary capped at 1000 terms), and
    computes cosine similarity in row batches to bound memory usage.

    Args:
        df: DataFrame from ``fetch_product_features``; a ``feature_text``
            column is added in place as a side effect.
        top_n: maximum number of neighbors kept per item.

    Returns:
        dict mapping item_id -> list of (similar_item_id, score) with
        score > 0, sorted by descending similarity; items with no
        positive-similarity neighbor are omitted.
    """
    print("Building feature texts...")
    df['feature_text'] = df.apply(build_feature_text, axis=1)

    print("Calculating TF-IDF...")
    vectorizer = TfidfVectorizer(max_features=1000)
    tfidf_matrix = vectorizer.fit_transform(df['feature_text'])

    print("Calculating cosine similarity...")
    # Compute similarity batch-by-batch against the full matrix so peak
    # memory is O(batch_size * n) instead of O(n^2).
    batch_size = 1000
    result = {}

    for i in range(0, len(df), batch_size):
        end_i = min(i + batch_size, len(df))
        batch_similarity = cosine_similarity(tfidf_matrix[i:end_i], tfidf_matrix)

        for j, idx in enumerate(range(i, end_i)):
            item_id = df.iloc[idx]['item_id']
            similarities = batch_similarity[j]

            # Exclude the item itself explicitly by masking its own column.
            # The previous code dropped the top-ranked entry and assumed it
            # was always "self", but with ties (identical feature texts all
            # score 1.0) argsort order is arbitrary: self could leak into
            # the results while a genuine neighbor was dropped.
            similarities[idx] = -1.0

            similar_indices = np.argsort(similarities)[::-1][:top_n]
            similar_items = [
                (df.iloc[sim_idx]['item_id'], float(similarities[sim_idx]))
                for sim_idx in similar_indices
                if similarities[sim_idx] > 0  # keep only positive similarity
            ]

            if similar_items:
                result[item_id] = similar_items

        print(f"Processed {end_i}/{len(df)} products...")

    return result


def calculate_category_based_similarity(df):
    """
    Build item similarity lists from shared category membership.

    Two passes: products sharing a level-4 category are mutual neighbors
    with score 0.9; products sharing a level-3 category then top up each
    item's list with score 0.7, never exceeding 50 entries per item and
    never duplicating an already-present neighbor.

    Args:
        df: DataFrame with item_id, category_level3_id, category_level4_id.

    Returns:
        defaultdict(list) mapping item_id -> [(similar_item_id, score), ...].
    """
    result = defaultdict(list)

    # Pass 1: level-4 category peers at fixed score 0.9.
    for cat_id, group in df.groupby('category_level4_id'):
        members = group['item_id'].tolist()
        if pd.isna(cat_id) or len(members) < 2:
            continue
        for anchor in members:
            peers = [m for m in members if m != anchor]
            result[anchor].extend((p, 0.9) for p in peers[:50])

    # Pass 2: fill remaining slots with level-3 peers at 0.7.
    for cat_id, group in df.groupby('category_level3_id'):
        members = group['item_id'].tolist()
        if pd.isna(cat_id) or len(members) < 2:
            continue
        for anchor in members:
            if anchor in result and len(result[anchor]) >= 50:
                continue  # already full from pass 1
            seen = {pair[0] for pair in result[anchor]}
            candidates = [(m, 0.7) for m in members if m != anchor and m not in seen]
            result[anchor].extend(candidates[:50 - len(result[anchor])])

    return result


def merge_similarities(sim1, sim2, weight1=0.7, weight2=0.3):
    """
    Blend two similarity dicts into one weighted ranking.

    For each item present in either input, scores for the same neighbor
    are combined as ``weight1 * score1 + weight2 * score2`` (a neighbor
    missing from one source simply contributes nothing from it). Each
    item keeps its 50 highest-scoring neighbors, sorted descending.

    Args:
        sim1: dict item_id -> [(neighbor_id, score), ...].
        sim2: same shape as sim1.
        weight1: weight applied to sim1 scores.
        weight2: weight applied to sim2 scores.

    Returns:
        dict item_id -> [(neighbor_id, combined_score), ...]; items whose
        merged list is empty are omitted.
    """
    merged = {}

    for item_id in set(sim1) | set(sim2):
        combined = defaultdict(float)
        # Accumulate weighted contributions from both sources.
        for source, weight in ((sim1, weight1), (sim2, weight2)):
            for neighbor, score in source.get(item_id, ()):
                combined[neighbor] += score * weight

        top = sorted(combined.items(), key=lambda kv: -kv[1])[:50]
        if top:
            merged[item_id] = top

    return merged


def main():
    """
    CLI entry point: compute content-based i2i similarities and write
    them to a tab-separated text file.

    Output line format:
        item_id \\t item_name \\t sim_id1:score1,sim_id2:score2,...
    """
    parser = argparse.ArgumentParser(description='Calculate content-based item similarity')
    parser.add_argument('--top_n', type=int, default=DEFAULT_I2I_TOP_N,
                       help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})')
    parser.add_argument('--method', type=str, default='hybrid',
                       choices=['tfidf', 'category', 'hybrid'],
                       help='Similarity calculation method')
    parser.add_argument('--output', type=str, default=None,
                       help='Output file path')
    # NOTE(review): --debug is parsed but never read below — confirm
    # whether it should gate extra logging or be removed.
    parser.add_argument('--debug', action='store_true',
                       help='Enable debug mode with detailed logging and readable output')

    args = parser.parse_args()
    
    # Create the database connection.
    print("Connecting to database...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )
    
    # Fetch product feature rows.
    df = fetch_product_features(engine)
    
    # Compute similarities with the selected method.
    if args.method == 'tfidf':
        print("\nUsing TF-IDF method...")
        result = calculate_content_similarity(df, args.top_n)
    elif args.method == 'category':
        print("\nUsing category-based method...")
        result = calculate_category_based_similarity(df)
    else:  # hybrid
        print("\nUsing hybrid method...")
        tfidf_sim = calculate_content_similarity(df, args.top_n)
        category_sim = calculate_category_based_similarity(df)
        result = merge_similarities(tfidf_sim, category_sim, weight1=0.7, weight2=0.3)
    
    # Build an item_id -> name map for readable output.
    item_name_map = dict(zip(df['item_id'], df['item_name']))
    
    # Resolve the output path (default: OUTPUT_DIR, dated by method).
    output_file = args.output or os.path.join(
        OUTPUT_DIR, 
        f'i2i_content_{args.method}_{datetime.now().strftime("%Y%m%d")}.txt'
    )
    
    print(f"\nWriting results to {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        for item_id, sims in result.items():
            item_name = item_name_map.get(item_id, 'Unknown')
            
            if not sims:
                continue
            
            # Format: item_id \t item_name \t similar_item_id1:score1,similar_item_id2:score2,...
            sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])
            f.write(f'{item_id}\t{item_name}\t{sim_str}\n')
    
    print(f"Done! Generated content-based similarities for {len(result)} items")
    print(f"Output saved to: {output_file}")


if __name__ == '__main__':
    main()