"""
i2i - content-similarity index.

Computes item-to-item similarity from product attributes (category,
supplier, packaging attributes, item name) and writes the result to a
tab-separated text file.
"""
import sys
import os
# Make the repository root importable so `db_service` / `offline_tasks` resolve
# when this file is run directly as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

import pandas as pd
import numpy as np
import argparse
from datetime import datetime
from collections import defaultdict
from itertools import islice
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from db_service import create_db_connection
from offline_tasks.config.offline_config import (
    DB_CONFIG,
    OUTPUT_DIR,
    DEFAULT_I2I_TOP_N
)


# (column, repetitions) pairs used by build_feature_text, in emission order.
# Repeating a value raises its term frequency and therefore its TF-IDF weight:
# categories weigh most (coarser levels more), then supplier, then packaging.
_WEIGHTED_FEATURE_COLUMNS = (
    ('category_level1', 5),
    ('category_level2', 4),
    ('category_level3', 3),
    ('category_level4', 2),
    ('supplier_name', 2),
    ('package_type', 1),
    ('package_mode', 1),
)


def fetch_product_features(engine):
    """
    Fetch product feature data.

    Args:
        engine: Database connection/engine accepted by ``pandas.read_sql``.

    Returns:
        DataFrame with one row per result row of the query below (SKU id and
        name, supplier, category levels 1-4, packaging attributes, status).
    """
    sql_query = """
    SELECT
        pgs.id as item_id,
        pgs.name as item_name,
        pg.supplier_id,
        ss.name as supplier_name,
        pg.category_id,
        pc_1.id as category_level1_id,
        pc_1.name as category_level1,
        pc_2.id as category_level2_id,
        pc_2.name as category_level2,
        pc_3.id as category_level3_id,
        pc_3.name as category_level3,
        pc_4.id as category_level4_id,
        pc_4.name as category_level4,
        pgs.capacity,
        pgs.factory_no,
        po.name as package_type,
        po2.name as package_mode,
        pgs.fir_on_sell_time,
        pgs.status
    FROM prd_goods_sku pgs
    INNER JOIN prd_goods pg ON pg.id = pgs.goods_id
    INNER JOIN sup_supplier ss ON ss.id = pg.supplier_id
    LEFT JOIN prd_category as pc ON pc.id = pg.category_id
    LEFT JOIN prd_category AS pc_1 ON pc_1.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 2), '.', -1)
    LEFT JOIN prd_category AS pc_2 ON pc_2.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 3), '.', -1)
    LEFT JOIN prd_category AS pc_3 ON pc_3.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 4), '.', -1)
    LEFT JOIN prd_category AS pc_4 ON pc_4.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pc.path, '.', 5), '.', -1)
    LEFT JOIN prd_goods_sku_attribute pgsa ON pgs.id = pgsa.goods_sku_id
        AND pgsa.attribute_id = (SELECT id FROM prd_attribute WHERE code = 'PKG' LIMIT 1)
    LEFT JOIN prd_option po ON po.id = pgsa.option_id
    LEFT JOIN prd_goods_sku_attribute pgsa2 ON pgs.id = pgsa2.goods_sku_id
        AND pgsa2.attribute_id = (SELECT id FROM prd_attribute WHERE code = 'pkg_mode' LIMIT 1)
    LEFT JOIN prd_option po2 ON po2.id = pgsa2.option_id
    WHERE pgs.status IN (2, 4, 5)
        AND pgs.is_delete = 0
    """

    print("Executing SQL query...")
    df = pd.read_sql(sql_query, engine)
    print(f"Fetched {len(df)} products")
    return df


def build_feature_text(row):
    """
    Build the feature text of one product.

    Args:
        row: Mapping/Series holding the category_level1..4, supplier_name,
            package_type, package_mode and item_name fields.

    Returns:
        A single space-joined string. Missing (NaN/None) fields are skipped;
        weighted fields are repeated according to _WEIGHTED_FEATURE_COLUMNS,
        followed by the whitespace-split words of the item name.
    """
    features = []

    # Category, supplier and packaging values, repeated by weight.
    for column, repeat in _WEIGHTED_FEATURE_COLUMNS:
        value = row[column]
        if pd.notna(value):
            features.extend([str(value)] * repeat)

    # Item-name keywords (naive whitespace tokenisation).
    if pd.notna(row['item_name']):
        features.extend(str(row['item_name']).split())

    return ' '.join(features)


def calculate_content_similarity(df, top_n=50):
    """
    Compute content-based similarity with TF-IDF + cosine similarity.

    Side effect: adds/overwrites a ``feature_text`` column on ``df``.

    Args:
        df: Product DataFrame as returned by fetch_product_features.
        top_n: Maximum number of neighbours kept per item.

    Returns:
        dict mapping item_id -> list of (similar_item_id, score) sorted by
        descending score. Only scores > 0 are kept; items with no neighbour
        are omitted. An empty ``df`` yields an empty dict.
    """
    if len(df) == 0:
        # TfidfVectorizer cannot be fitted on an empty corpus.
        return {}

    print("Building feature texts...")
    df['feature_text'] = df.apply(build_feature_text, axis=1)

    print("Calculating TF-IDF...")
    vectorizer = TfidfVectorizer(max_features=1000)
    tfidf_matrix = vectorizer.fit_transform(df['feature_text'])

    print("Calculating cosine similarity...")
    # Plain Python list: O(1) positional lookup instead of df.iloc per access.
    item_ids = df['item_id'].tolist()
    total = len(item_ids)
    limit = max(top_n, 0)

    # Compute similarities batch by batch to bound memory usage.
    batch_size = 1000
    result = {}

    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        batch_similarity = cosine_similarity(tfidf_matrix[start:end], tfidf_matrix)

        for offset, scores in enumerate(batch_similarity):
            row_idx = start + offset
            item_id = item_ids[row_idx]

            # Exclude the item itself by position. Dropping "rank 0" is wrong
            # when other items tie with it (identical feature text) or when
            # its own vector is all zeros.
            scores[row_idx] = -1.0

            # Stable sort on the negated scores: descending score, ties
            # resolved by row order, so the output is deterministic.
            top_indices = np.argsort(-scores, kind='stable')[:limit]

            similar_items = [
                (item_ids[k], float(scores[k]))
                for k in top_indices
                # Keep only real similarity; also skip rows that repeat the
                # same item_id (possible if the query returns duplicate rows).
                if scores[k] > 0 and item_ids[k] != item_id
            ]

            if similar_items:
                result[item_id] = similar_items

        print(f"Processed {end}/{total} products...")

    return result


def calculate_category_based_similarity(df, top_n=50):
    """
    Compute category-based similarity (items under the same category).

    Items sharing a level-4 category get score 0.9; lists shorter than
    ``top_n`` are then topped up with level-3 category siblings at 0.7.

    Args:
        df: Product DataFrame with item_id, category_level4_id and
            category_level3_id columns.
        top_n: Maximum number of neighbours kept per item (default 50).

    Returns:
        defaultdict(list) mapping item_id -> list of (similar_item_id, score).
    """
    result = defaultdict(list)
    limit = max(top_n, 0)

    # Group by level-4 category.
    for cat4_id, group in df.groupby('category_level4_id'):
        if pd.isna(cat4_id) or len(group) < 2:
            continue

        items = group['item_id'].tolist()
        for item_id in items:
            # islice stops after `limit` siblings instead of materialising
            # every other item of the category for every item.
            siblings = islice((x for x in items if x != item_id), limit)
            # Items in the same level-4 category get similarity 0.9.
            result[item_id].extend([(x, 0.9) for x in siblings])

    # Group by level-3 category (top-up).
    for cat3_id, group in df.groupby('category_level3_id'):
        if pd.isna(cat3_id) or len(group) < 2:
            continue

        items = group['item_id'].tolist()
        for item_id in items:
            # .get() avoids creating an empty defaultdict entry on lookup.
            current = result.get(item_id, ())
            remaining = limit - len(current)
            if remaining <= 0:
                continue

            existing = {similar_id for similar_id, _ in current}
            candidates = (x for x in items if x != item_id and x not in existing)
            # Items in the same level-3 category get similarity 0.7.
            additions = [(x, 0.7) for x in islice(candidates, remaining)]
            if additions:
                result[item_id].extend(additions)

    return result


def merge_similarities(sim1, sim2, weight1=0.7, weight2=0.3, top_n=50):
    """
    Merge two similarity mappings by weighted sum.

    Args:
        sim1: dict item_id -> list of (similar_item_id, score).
        sim2: dict item_id -> list of (similar_item_id, score).
        weight1: Multiplier applied to scores from ``sim1``.
        weight2: Multiplier applied to scores from ``sim2``.
        top_n: Maximum number of neighbours kept per item (default 50).

    Returns:
        dict item_id -> list of (similar_item_id, merged_score) sorted by
        descending merged score; items with no neighbour are omitted.
    """
    result = {}
    weighted_sources = ((sim1, weight1), (sim2, weight2))

    # sim1 keys first, then keys only in sim2: unlike a set union this gives
    # a reproducible iteration (and therefore output) order.
    all_items = list(dict.fromkeys([*sim1, *sim2]))

    for item_id in all_items:
        combined = defaultdict(float)

        for source, weight in weighted_sources:
            for similar_id, score in source.get(item_id, ()):
                combined[similar_id] += score * weight

        # Sort and keep the top N.
        ranked = sorted(combined.items(), key=lambda x: -x[1])[:max(top_n, 0)]
        if ranked:
            result[item_id] = ranked

    return result


def main():
    """
    CLI entry point: load products, compute similarities, write the result.

    Output format, one line per item:
    item_id \\t item_name \\t similar_id1:score1,similar_id2:score2,...
    """
    parser = argparse.ArgumentParser(description='Calculate content-based item similarity')
    parser.add_argument('--top_n', type=int, default=DEFAULT_I2I_TOP_N,
                        help=f'Top N similar items to output (default: {DEFAULT_I2I_TOP_N})')
    parser.add_argument('--method', type=str, default='hybrid',
                        choices=['tfidf', 'category', 'hybrid'],
                        help='Similarity calculation method')
    parser.add_argument('--output', type=str, default=None,
                        help='Output file path')
    # NOTE(review): --debug is accepted but not read anywhere in this file.
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode with detailed logging and readable output')

    args = parser.parse_args()

    # Create the database connection.
    print("Connecting to database...")
    engine = create_db_connection(
        DB_CONFIG['host'],
        DB_CONFIG['port'],
        DB_CONFIG['database'],
        DB_CONFIG['username'],
        DB_CONFIG['password']
    )

    # Fetch product features.
    df = fetch_product_features(engine)

    # Compute similarities; --top_n is honoured by every method.
    if args.method == 'tfidf':
        print("\nUsing TF-IDF method...")
        result = calculate_content_similarity(df, args.top_n)
    elif args.method == 'category':
        print("\nUsing category-based method...")
        result = calculate_category_based_similarity(df, top_n=args.top_n)
    else:  # hybrid
        print("\nUsing hybrid method...")
        tfidf_sim = calculate_content_similarity(df, args.top_n)
        category_sim = calculate_category_based_similarity(df, top_n=args.top_n)
        result = merge_similarities(tfidf_sim, category_sim,
                                    weight1=0.7, weight2=0.3, top_n=args.top_n)

    # Map item_id -> item_name for the output.
    item_name_map = dict(zip(df['item_id'], df['item_name']))

    # Write the result.
    output_file = args.output or os.path.join(
        OUTPUT_DIR,
        f'i2i_content_{args.method}_{datetime.now().strftime("%Y%m%d")}.txt'
    )

    # Make sure the target directory exists before opening the file.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    print(f"\nWriting results to {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        for item_id, sims in result.items():
            if not sims:
                continue

            # Tabs/newlines inside a name would corrupt the TSV line.
            item_name = str(item_name_map.get(item_id, 'Unknown'))
            item_name = item_name.replace('\t', ' ').replace('\r', ' ').replace('\n', ' ')

            # Format: item_id \t item_name \t similar_item_id1:score1,similar_item_id2:score2,...
            sim_str = ','.join([f'{sim_id}:{score:.4f}' for sim_id, score in sims])
            f.write(f'{item_id}\t{item_name}\t{sim_str}\n')

    print(f"Done! Generated content-based similarities for {len(result)} items")
    print(f"Output saved to: {output_file}")


if __name__ == '__main__':
    main()