From 40442baf1b796509110529e7ce705b24450e5da6 Mon Sep 17 00:00:00 2001
From: tangwang
Date: Fri, 17 Oct 2025 08:07:22 +0800
Subject: [PATCH] offline tasks: fix bugs of i2i swing / hot / sessionw2v

---
 offline_tasks/run.sh                          |  6 ++++++
 offline_tasks/scripts/i2i_content_similar.py  | 41 +++++++++++++++++++++++++++++------------
 offline_tasks/scripts/i2i_deepwalk.py         |  3 +--
 offline_tasks/scripts/i2i_session_w2v.py      |  6 +++---
 offline_tasks/scripts/interest_aggregation.py |  8 ++++----
 5 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/offline_tasks/run.sh b/offline_tasks/run.sh
index b794ce2..969a33f 100755
--- a/offline_tasks/run.sh
+++ b/offline_tasks/run.sh
@@ -6,6 +6,12 @@ cd /home/tw/recommendation/offline_tasks
 # 2. 测试连接
 # python3 test_connection.py
 
+ps -ef|grep run_all.py | awk '{print $2}' | xargs kill -9
+ps -ef|grep recommendation | awk '{print $2}' | xargs kill -9
+rm output/* -rf
+rm logs/* -rf
+
+
 # 3. 调试模式运行(小数据量)
 python3 run_all.py --lookback_days 7 --top_n 10 --debug
 
diff --git a/offline_tasks/scripts/i2i_content_similar.py b/offline_tasks/scripts/i2i_content_similar.py
index b58861d..3db8fac 100644
--- a/offline_tasks/scripts/i2i_content_similar.py
+++ b/offline_tasks/scripts/i2i_content_similar.py
@@ -107,24 +107,37 @@ def build_feature_text(row):
     return ' '.join(features)
 
 
-def calculate_content_similarity(df, top_n=50):
+def calculate_content_similarity(df, top_n=50, logger=None):
     """
-    基于内容计算相似度
+    基于内容计算相似度(内存优化版)
     """
-    print("Building feature texts...")
+
+    if logger:
+        logger.info("构建特征文本...")
+    else:
+        print("Building feature texts...")
     df['feature_text'] = df.apply(build_feature_text, axis=1)
 
-    print("Calculating TF-IDF...")
+    if logger:
+        logger.info("计算 TF-IDF...")
+    else:
+        print("Calculating TF-IDF...")
     vectorizer = TfidfVectorizer(max_features=1000)
     tfidf_matrix = vectorizer.fit_transform(df['feature_text'])
 
-    print("Calculating cosine similarity...")
-    # 分批计算相似度以节省内存
+    if logger:
+        logger.info(f"TF-IDF 矩阵形状: {tfidf_matrix.shape}")
+        logger.info("开始计算余弦相似度(内存优化模式)...")
+    else:
+        print("Calculating cosine similarity...")
+
     batch_size = 1000
     result = {}
 
     for i in range(0, len(df), batch_size):
         end_i = min(i + batch_size, len(df))
+
+        # 分批计算相似度
         batch_similarity = cosine_similarity(tfidf_matrix[i:end_i], tfidf_matrix)
 
         for j, idx in enumerate(range(i, end_i)):
@@ -144,8 +157,11 @@ def calculate_content_similarity(df, top_n=50):
 
             if similar_items:
                 result[item_id] = similar_items
-
-        print(f"Processed {end_i}/{len(df)} products...")
+
+        if logger:
+            logger.info(f"已处理 {end_i}/{len(df)} 个商品...")
+        else:
+            print(f"Processed {end_i}/{len(df)} products...")
 
     return result
 
@@ -257,14 +273,16 @@ def main():
     log_processing_step(logger, f"计算相似度 (方法: {args.method})")
     if args.method == 'tfidf':
         logger.info("使用 TF-IDF 方法...")
-        result = calculate_content_similarity(df, args.top_n)
+        result = calculate_content_similarity(df, args.top_n, logger=logger)
     elif args.method == 'category':
         logger.info("使用基于分类的方法...")
         result = calculate_category_based_similarity(df)
     else:  # hybrid
         logger.info("使用混合方法 (TF-IDF 70% + 分类 30%)...")
-        tfidf_sim = calculate_content_similarity(df, args.top_n)
+        tfidf_sim = calculate_content_similarity(df, args.top_n, logger=logger)
+        logger.info("计算基于分类的相似度...")
         category_sim = calculate_category_based_similarity(df)
+        logger.info("合并相似度...")
         result = merge_similarities(tfidf_sim, category_sim, weight1=0.7, weight2=0.3)
 
     logger.info(f"为 {len(result)} 个物品生成了相似度")
@@ -307,8 +325,7 @@ def main():
         output_file,
         result,
         name_mappings,
-        index_type=f'i2i:content:{args.method}',
-        logger=logger
+        description=f'i2i:content:{args.method}'
     )
 
 
diff --git a/offline_tasks/scripts/i2i_deepwalk.py b/offline_tasks/scripts/i2i_deepwalk.py
index 955b9d6..4bdd2a6 100644
--- a/offline_tasks/scripts/i2i_deepwalk.py
+++ b/offline_tasks/scripts/i2i_deepwalk.py
@@ -372,8 +372,7 @@ def main():
         output_file,
         result,
         name_mappings,
-        index_type='i2i:deepwalk',
-        logger=logger
+        description='i2i:deepwalk'
     )
 
 
diff --git a/offline_tasks/scripts/i2i_session_w2v.py b/offline_tasks/scripts/i2i_session_w2v.py
index a14fcfd..6d5c459 100644
--- a/offline_tasks/scripts/i2i_session_w2v.py
+++ b/offline_tasks/scripts/i2i_session_w2v.py
@@ -185,7 +185,8 @@ def main():
         'epochs': args.epochs,
         'top_n': args.top_n,
         'lookback_days': args.lookback_days,
-        'session_gap_minutes': args.session_gap,
+        'max_session_length': args.max_session_length,
+        'min_session_length': args.min_session_length,
         'debug': args.debug
     }
     log_algorithm_params(logger, params)
@@ -303,8 +304,7 @@ def main():
         output_file,
         result,
         name_mappings,
-        index_type='i2i:session_w2v',
-        logger=logger
+        description='i2i:session_w2v'
     )
 
 
diff --git a/offline_tasks/scripts/interest_aggregation.py b/offline_tasks/scripts/interest_aggregation.py
index 0920e1a..6528153 100644
--- a/offline_tasks/scripts/interest_aggregation.py
+++ b/offline_tasks/scripts/interest_aggregation.py
@@ -356,16 +356,16 @@ def main():
     if args.debug and aggregations:
         for dim_key, items in aggregations.items():
             if items:
-                # 为每个维度生成可读索引
-                result_dict = {dim_key: items[:args.top_n]}
+                # 为每个维度生成可读索引 - 先排序再取前N个
+                sorted_items = sorted(items.items(), key=lambda x: -x[1])[:args.top_n]
+                result_dict = {dim_key: sorted_items}
                 output_file = os.path.join(OUTPUT_DIR, f'{output_prefix}_{dim_key}_{datetime.now().strftime("%Y%m%d")}.txt')
                 if os.path.exists(output_file):
                     save_readable_index(
                         output_file,
                         result_dict,
                         name_mappings,
-                        index_type=f'interest:{list_type}:{dim_key}',
-                        logger=logger
+                        description=f'interest:{list_type}:{dim_key}'
                     )
 
     # 生成全局索引(所有数据)
-- 
libgit2 0.21.2