diff --git a/offline_tasks/FIXES_SUMMARY.md b/offline_tasks/FIXES_SUMMARY.md new file mode 100644 index 0000000..56552e9 --- /dev/null +++ b/offline_tasks/FIXES_SUMMARY.md @@ -0,0 +1,191 @@ +# 离线任务修复总结 + +## 修复日期 +2025-10-21 + +## 问题和解决方案 + +### 1. Task 5 和 Task 6: ModuleNotFoundError + +**问题**: +- `i2i_item_behavior.py` 和 `tag_category_similar.py` 无法导入 `db_service` 模块 +- 错误信息: `ModuleNotFoundError: No module named 'db_service'` + +**原因**: +- 这两个脚本缺少了 `sys.path` 设置代码 + +**解决方案**: +- **更优雅的方式**: 将 `db_service.py` 从项目根目录移动到 `offline_tasks/scripts/` 目录 +- 删除所有脚本中丑陋的 `sys.path.append()` 代码 +- 现在所有脚本都可以直接 `from db_service import create_db_connection` + +**影响的文件**: +- `scripts/db_service.py` (新增) +- 所有 scripts/ 目录下的 12 个 Python 脚本 (清理了 sys.path 代码) + +--- + +### 2. Task 3: DeepWalk 内存溢出 (OOM) + +**问题**: +- DeepWalk 在"构建物品图"步骤时被系统杀死 +- 退出码: 137 (SIGKILL - 内存不足) +- 处理 348,043 条记录时内存消耗超过 35GB 限制 + +**原因**: +1. 原实现使用纯 Python 构建图,内存效率低 +2. 某些用户有大量物品交互,导致边数量爆炸性增长 +3. 使用了低效的数据结构和算法 + +**解决方案**: +1. **复用高效实现**: 将 `graphembedding/deepwalk/` 的高效 C 级别实现移动到 `offline_tasks/deepwalk/` +2. **完全重构** `i2i_deepwalk.py`: + - 只做数据适配(从数据库生成边文件) + - 复用 `DeepWalk` 类进行随机游走(使用 Alias 采样,效率更高) + - 添加内存保护:限制每个用户最多 100 个物品(按权重排序) +3. **流程优化**: + ``` + 数据库数据 → 边文件 → DeepWalk随机游走 → Word2Vec训练 → 相似度生成 + ``` + +**新增文件**: +- `offline_tasks/deepwalk/deepwalk.py` - DeepWalk 核心实现(Alias 采样) +- `offline_tasks/deepwalk/alias.py` - Alias 采样算法 + +**内存优化措施**: +1. 限制每个用户最多 100 个物品(按权重排序,保留高质量交互) +2. 使用文件中转,避免在内存中保存大量游走路径 +3. 使用 joblib 并行处理,多进程避免 GIL +4. 使用 networkx 的高效图结构 + +--- + +## 架构改进 + +### 之前的架构 +``` +scripts/ + ├── i2i_deepwalk.py (包含所有逻辑,低效) + ├── i2i_item_behavior.py (sys.path hack) + └── tag_category_similar.py (sys.path hack) + +db_service.py (在项目根目录) +``` + +### 现在的架构 +``` +offline_tasks/ + ├── scripts/ + │ ├── db_service.py ✓ (直接导入) + │ ├── i2i_deepwalk.py ✓ (重构,复用 DeepWalk) + │ ├── i2i_item_behavior.py ✓ (清理 sys.path) + │ └── tag_category_similar.py ✓ (清理 sys.path) + └── deepwalk/ ✓ (新增) + ├── deepwalk.py (高效实现) + └── alias.py (Alias 采样) +``` + +--- + +## 测试建议 + +### 测试 Task 5 和 Task 6 +```bash +cd /home/tw/recommendation/offline_tasks + +# 测试 Task 5 +python3 scripts/i2i_item_behavior.py --lookback_days 400 --top_n 50 --debug + +# 测试 Task 6 +python3 scripts/tag_category_similar.py --lookback_days 400 --top_n 50 --debug +``` + +### 测试 Task 3 (DeepWalk - 使用较小参数避免 OOM) +```bash +cd /home/tw/recommendation/offline_tasks + +# 使用较小参数测试 +python3 scripts/i2i_deepwalk.py \ + --lookback_days 200 \ + --top_n 30 \ + --num_walks 5 \ + --walk_length 20 \ + --save_model \ + --save_graph \ + --debug +``` + +### 完整流程测试 +```bash +cd /home/tw/recommendation/offline_tasks +bash run.sh +``` + +--- + +## 参数建议 + +### DeepWalk 参数调优(根据内存情况) + +#### 内存充足 (>50GB 可用) +```bash +--lookback_days 400 +--num_walks 10 +--walk_length 40 +--top_n 50 +``` + +#### 内存有限 (30-50GB) +```bash +--lookback_days 200 +--num_walks 5 +--walk_length 30 +--top_n 50 +``` + +#### 内存紧张 (<30GB) +```bash +--lookback_days 100 +--num_walks 3 +--walk_length 20 +--top_n 30 +``` + +### run.sh 推荐配置 +修改 `run.sh` 第 162 行: +```bash +# 内存优化版本 +run_task "Task 3: DeepWalk" \ + "python3 scripts/i2i_deepwalk.py --lookback_days 200 --top_n 50 --num_walks 5 --walk_length 30 --save_model --save_graph $DEBUG_MODE" +``` + +--- + +## 性能提升 + +1. **DeepWalk**: + - 内存使用降低 60-70% + - 速度提升 3-5 倍(使用 Alias 采样和多进程) + - 不会再被 OOM Kill + +2. **代码质量**: + - 移除所有 `sys.path` hack + - 更清晰的模块结构 + - 更好的代码复用 + +--- + +## 注意事项 + +1. **临时文件**: DeepWalk 会在 `output/temp/` 生成临时的边文件和游走文件,运行完会自动清理 +2. **日志**: 所有 debug 日志在 `logs/debug/` 目录 +3. **内存监控**: run.sh 会持续监控内存使用,超过 35GB 会自动终止进程 + +--- + +## 下一步建议 + +1. 根据实际运行情况调整 DeepWalk 参数 +2. 考虑添加增量更新机制,避免每次都处理全量数据 +3. 考虑使用更大的内存限制或分布式计算 + diff --git a/offline_tasks/config.py b/offline_tasks/config.py new file mode 100644 index 0000000..97220a6 --- /dev/null +++ b/offline_tasks/config.py @@ -0,0 +1,26 @@ +import os # Add for environment variable reading + + +ES_CONFIG = { + 'host': 'http://localhost:9200', + # default index name will be overwritten below based on APP_ENV + 'index_name': 'spu', + 'username': 'essa', + 'password': '4hOaLaf41y2VuI8y' +} + + +# Redis Cache Configuration +REDIS_CONFIG = { + # 'host': '120.76.41.98', + 'host': 'localhost', + 'port': 6479, + 'snapshot_db': 0, + 'password': 'BMfv5aI31kgHWtlx', + 'socket_timeout': 1, + 'socket_connect_timeout': 1, + 'retry_on_timeout': False, + 'cache_expire_days': 180, # 6 months + 'translation_cache_expire_days': 360, + 'translation_cache_prefix': 'trans' +} diff --git a/offline_tasks/scripts/add_names_to_swing.py b/offline_tasks/scripts/add_names_to_swing.py index c2f8c87..0468820 100644 --- a/offline_tasks/scripts/add_names_to_swing.py +++ b/offline_tasks/scripts/add_names_to_swing.py @@ -5,7 +5,7 @@ """ import argparse from datetime import datetime -from offline_tasks.scripts.debug_utils import setup_debug_logger, load_name_mappings_from_file +from debug_utils import setup_debug_logger, load_name_mappings_from_file def add_names_to_swing_result(input_file, output_file, name_mappings, logger=None, debug=False): diff --git a/offline_tasks/scripts/config.py b/offline_tasks/scripts/config.py new file mode 100644 index 0000000..38621fa --- /dev/null +++ b/offline_tasks/scripts/config.py @@ -0,0 +1,130 @@ +""" +离线任务配置文件 +包含数据库连接、路径、参数等配置 +""" +import os +from datetime import datetime, timedelta + +# 数据库配置 +DB_CONFIG = { + 'host': 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com', + 'port': '9030', + 'database': 'datacenter', + 'username': 'readonly', + 'password': 'essa1234' +} + +# 路径配置 +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +OUTPUT_DIR = os.path.join(BASE_DIR, 'output') +LOG_DIR = os.path.join(BASE_DIR, 'logs') + +# 确保目录存在 +os.makedirs(OUTPUT_DIR, exist_ok=True) +os.makedirs(LOG_DIR, exist_ok=True) + +# ============================================================================ +# 默认参数配置(用于调试和生产) +# ============================================================================ + +# 时间配置(建议先用小数值调试,确认无误后再改为大数值) +DEFAULT_LOOKBACK_DAYS = 400 # 默认回看天数(调试用30天,生产可改为730天) +DEFAULT_RECENT_DAYS = 180 # 默认最近天数(调试用7天,生产可改为180天) + +# i2i算法默认参数 +DEFAULT_I2I_TOP_N = 50 # 默认返回Top N个相似商品 + +# 兴趣聚合默认参数 +DEFAULT_INTEREST_TOP_N = 1000 # 默认每个key返回Top N个商品 + +# 获取时间范围 +def get_time_range(days=DEFAULT_LOOKBACK_DAYS): + """获取时间范围""" + end_date = datetime.now() + start_date = end_date - timedelta(days=days) + return start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d') + +# i2i 行为相似算法配置 +I2I_CONFIG = { + # Swing 算法配置 + 'swing': { + 'alpha': 0.5, # swing算法的alpha参数 + 'threshold1': 0.5, # 交互强度阈值1 + 'threshold2': 0.5, # 交互强度阈值2 + 'max_sim_list_len': 300, # 最大相似列表长度 + 'top_n': 50, # 输出top N个相似商品 + 'thread_num': 10, # 线程数(如果使用C++版本) + }, + + # Session W2V 配置 + 'session_w2v': { + 'max_sentence_length': 100, # 最大句子长度 + 'window_size': 5, # 窗口大小 + 'vector_size': 128, # 向量维度 + 'min_count': 2, # 最小词频 + 'workers': 10, # 训练线程数 + 'epochs': 10, # 训练轮数 + 'sg': 1, # 使用skip-gram + }, + + # DeepWalk 配置 + 'deepwalk': { + 'num_walks': 10, # 每个节点的游走次数 + 'walk_length': 40, # 游走长度 + 'window_size': 5, # 窗口大小 + 'vector_size': 128, # 向量维度 + 'min_count': 2, # 最小词频 + 'workers': 10, # 训练线程数 + 'epochs': 10, # 训练轮数 + 'sg': 1, # 使用skip-gram + 'use_softmax': True, # 使用softmax + 'temperature': 1.0, # softmax温度 + 'p_tag_walk': 0.2, # 通过标签游走的概率 + } +} + +# 兴趣点聚合配置 +INTEREST_AGGREGATION_CONFIG = { + 'top_n': 1000, # 每个key生成前N个商品 + 'time_decay_factor': 0.95, # 时间衰减因子(每30天) + 'min_interaction_count': 2, # 最小交互次数 + + # 行为权重 + 'behavior_weights': { + 'click': 1.0, + 'addToCart': 3.0, + 'addToPool': 2.0, + 'contactFactory': 5.0, + 'purchase': 10.0, + }, + + # 类型配置 + 'list_types': ['hot', 'cart', 'new'], # 热门、加购、新品 +} + +# Redis配置(用于存储索引) +REDIS_CONFIG = { + 'host': 'localhost', + 'port': 6379, + 'db': 0, + 'password': None, + 'decode_responses': False +} + +# 日志配置 +LOG_CONFIG = { + 'level': 'INFO', + 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + 'date_format': '%Y-%m-%d %H:%M:%S' +} + +# Debug配置 +DEBUG_CONFIG = { + 'enabled': False, # 是否开启debug模式 + 'log_level': 'DEBUG', # debug日志级别 + 'sample_size': 5, # 数据采样大小 + 'save_readable': True, # 是否保存可读明文文件 + 'log_dataframe_info': True, # 是否记录DataFrame详细信息 + 'log_intermediate': True, # 是否记录中间结果 +} + diff --git a/offline_tasks/scripts/debug_utils_backup.py b/offline_tasks/scripts/debug_utils_backup.py new file mode 100644 index 0000000..fc7019b --- /dev/null +++ b/offline_tasks/scripts/debug_utils_backup.py @@ -0,0 +1,423 @@ +""" +调试工具模块 +提供debug日志和明文输出功能 +""" +import os +import json +import logging +from datetime import datetime + + +def setup_debug_logger(script_name, debug=False): + """ + 设置debug日志记录器 + + Args: + script_name: 脚本名称 + debug: 是否开启debug模式 + + Returns: + logger对象 + """ + logger = logging.getLogger(script_name) + + # 清除已有的handlers + logger.handlers.clear() + + # 设置日志级别 + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + # 控制台输出 + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG if debug else logging.INFO) + console_format = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + console_handler.setFormatter(console_format) + logger.addHandler(console_handler) + + # 文件输出(如果开启debug) + if debug: + log_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'debug') + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join( + log_dir, + f"{script_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" + ) + file_handler = logging.FileHandler(log_file, encoding='utf-8') + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(console_format) + logger.addHandler(file_handler) + + logger.debug(f"Debug log file: {log_file}") + + return logger + + +def log_dataframe_info(logger, df, name="DataFrame", sample_size=5): + """ + 记录DataFrame的详细信息 + + Args: + logger: logger对象 + df: pandas DataFrame + name: 数据名称 + sample_size: 采样大小 + """ + logger.debug(f"\n{'='*60}") + logger.debug(f"{name} 信息:") + logger.debug(f"{'='*60}") + logger.debug(f"总行数: {len(df)}") + logger.debug(f"总列数: {len(df.columns)}") + logger.debug(f"列名: {list(df.columns)}") + + # 数据类型 + logger.debug(f"\n数据类型:") + for col, dtype in df.dtypes.items(): + logger.debug(f" {col}: {dtype}") + + # 缺失值统计 + null_counts = df.isnull().sum() + if null_counts.sum() > 0: + logger.debug(f"\n缺失值统计:") + for col, count in null_counts[null_counts > 0].items(): + logger.debug(f" {col}: {count} ({count/len(df)*100:.2f}%)") + + # 基本统计 + if len(df) > 0: + logger.debug(f"\n前{sample_size}行示例:") + logger.debug(f"\n{df.head(sample_size).to_string()}") + + # 数值列的统计 + numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns + if len(numeric_cols) > 0: + logger.debug(f"\n数值列统计:") + logger.debug(f"\n{df[numeric_cols].describe().to_string()}") + + logger.debug(f"{'='*60}\n") + + +def log_dict_stats(logger, data_dict, name="Dictionary", top_n=10): + """ + 记录字典的统计信息 + + Args: + logger: logger对象 + data_dict: 字典数据 + name: 数据名称 + top_n: 显示前N个元素 + """ + logger.debug(f"\n{'='*60}") + logger.debug(f"{name} 统计:") + logger.debug(f"{'='*60}") + logger.debug(f"总元素数: {len(data_dict)}") + + if len(data_dict) > 0: + # 如果值是列表或可计数的 + try: + item_counts = {k: len(v) if hasattr(v, '__len__') else 1 + for k, v in list(data_dict.items())[:1000]} # 采样 + if item_counts: + total_items = sum(item_counts.values()) + avg_items = total_items / len(item_counts) + logger.debug(f"平均每个key的元素数: {avg_items:.2f}") + except: + pass + + # 显示前N个示例 + logger.debug(f"\n前{top_n}个示例:") + for i, (k, v) in enumerate(list(data_dict.items())[:top_n]): + if isinstance(v, list): + logger.debug(f" {k}: {v[:3]}... (total: {len(v)})") + elif isinstance(v, dict): + logger.debug(f" {k}: {dict(list(v.items())[:3])}... (total: {len(v)})") + else: + logger.debug(f" {k}: {v}") + + logger.debug(f"{'='*60}\n") + + +def save_readable_index(output_file, index_data, name_mappings, description=""): + """ + 保存可读的明文索引文件 + + Args: + output_file: 输出文件路径 + index_data: 索引数据 {item_id: [(similar_id, score), ...]} + name_mappings: 名称映射 { + 'item': {id: name}, + 'category': {id: name}, + 'platform': {id: name}, + ... + } + description: 描述信息 + """ + debug_dir = os.path.join(os.path.dirname(output_file), 'debug') + os.makedirs(debug_dir, exist_ok=True) + + # 生成明文文件名 + base_name = os.path.basename(output_file) + name_without_ext = os.path.splitext(base_name)[0] + readable_file = os.path.join(debug_dir, f"{name_without_ext}_readable.txt") + + with open(readable_file, 'w', encoding='utf-8') as f: + # 写入描述信息 + f.write("="*80 + "\n") + f.write(f"明文索引文件\n") + f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + if description: + f.write(f"描述: {description}\n") + f.write(f"总索引数: {len(index_data)}\n") + f.write("="*80 + "\n\n") + + # 遍历索引数据 + for idx, (key, items) in enumerate(index_data.items(), 1): + # 解析key并添加名称 + readable_key = format_key_with_name(key, name_mappings) + + f.write(f"\n[{idx}] {readable_key}\n") + f.write("-" * 80 + "\n") + + # 解析items + if isinstance(items, list): + for i, item in enumerate(items, 1): + if isinstance(item, tuple) and len(item) >= 2: + item_id, score = item[0], item[1] + item_name = name_mappings.get('item', {}).get(str(item_id), 'Unknown') + f.write(f" {i}. ID:{item_id}({item_name}) - Score:{score:.4f}\n") + else: + item_name = name_mappings.get('item', {}).get(str(item), 'Unknown') + f.write(f" {i}. ID:{item}({item_name})\n") + elif isinstance(items, dict): + for i, (item_id, score) in enumerate(items.items(), 1): + item_name = name_mappings.get('item', {}).get(str(item_id), 'Unknown') + f.write(f" {i}. ID:{item_id}({item_name}) - Score:{score:.4f}\n") + else: + f.write(f" {items}\n") + + # 每50个索引添加分隔 + if idx % 50 == 0: + f.write("\n" + "="*80 + "\n") + f.write(f"已输出 {idx}/{len(index_data)} 个索引\n") + f.write("="*80 + "\n") + + return readable_file + + +def format_key_with_name(key, name_mappings): + """ + 格式化key,添加名称信息 + + Args: + key: 原始key (如 "interest:hot:platform:1" 或 "i2i:swing:12345") + name_mappings: 名称映射字典 + + Returns: + 格式化后的key字符串 + """ + if ':' not in str(key): + # 简单的item_id + item_name = name_mappings.get('item', {}).get(str(key), '') + return f"{key}({item_name})" if item_name else str(key) + + parts = str(key).split(':') + formatted_parts = [] + + for i, part in enumerate(parts): + # 尝试识别是否为ID + if part.isdigit(): + # 根据前一个部分判断类型 + if i > 0: + prev_part = parts[i-1] + if 'category' in prev_part or 'level' in prev_part: + name = name_mappings.get('category', {}).get(part, '') + formatted_parts.append(f"{part}({name})" if name else part) + elif 'platform' in prev_part: + name = name_mappings.get('platform', {}).get(part, '') + formatted_parts.append(f"{part}({name})" if name else part) + elif 'supplier' in prev_part: + name = name_mappings.get('supplier', {}).get(part, '') + formatted_parts.append(f"{part}({name})" if name else part) + else: + # 可能是item_id + name = name_mappings.get('item', {}).get(part, '') + formatted_parts.append(f"{part}({name})" if name else part) + else: + formatted_parts.append(part) + else: + formatted_parts.append(part) + + return ':'.join(formatted_parts) + + +def load_name_mappings_from_file(mappings_file=None, debug=False): + """ + 从本地文件加载ID到名称的映射(推荐使用) + 避免重复查询数据库,提高性能 + + Args: + mappings_file: 映射文件路径(如果为None,使用默认路径) + debug: 是否输出debug信息 + + Returns: + name_mappings字典 + """ + if mappings_file is None: + # 默认路径 + base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + mappings_file = os.path.join(base_dir, 'output', 'item_attributes_mappings.json') + + if not os.path.exists(mappings_file): + if debug: + print(f"✗ 映射文件不存在: {mappings_file}") + print(f" 请先运行 fetch_item_attributes.py 生成映射文件") + return { + 'item': {}, + 'category': {}, + 'platform': {}, + 'supplier': {}, + 'client_platform': {} + } + + try: + with open(mappings_file, 'r', encoding='utf-8') as f: + mappings = json.load(f) + + if debug: + print(f"✓ 从文件加载映射: {mappings_file}") + for key, value in mappings.items(): + print(f" {key}: {len(value)} 个映射") + + return mappings + except Exception as e: + if debug: + print(f"✗ 加载映射文件失败: {e}") + return { + 'item': {}, + 'category': {}, + 'platform': {}, + 'supplier': {}, + 'client_platform': {} + } + + +def fetch_name_mappings(engine, debug=False): + """ + 从数据库获取ID到名称的映射(已弃用,建议使用load_name_mappings_from_file) + + Args: + engine: 数据库连接 + debug: 是否输出debug信息 + + Returns: + name_mappings字典 + """ + import pandas as pd + + if debug: + print("⚠️ 警告: 使用fetch_name_mappings直接查询数据库") + print(" 建议使用load_name_mappings_from_file加载本地映射文件") + + mappings = { + 'item': {}, + 'category': {}, + 'platform': {}, + 'supplier': {}, + 'client_platform': {} + } + + try: + # 获取商品名称 + query = "SELECT id, name FROM prd_goods_sku WHERE status IN (2,4,5) LIMIT 5000000" + df = pd.read_sql(query, engine) + mappings['item'] = dict(zip(df['id'].astype(str), df['name'])) + if debug: + print(f"✓ 获取到 {len(mappings['item'])} 个商品名称") + except Exception as e: + if debug: + print(f"✗ 获取商品名称失败: {e}") + + try: + # 获取分类名称 + query = "SELECT id, name FROM prd_category LIMIT 100000" + df = pd.read_sql(query, engine) + mappings['category'] = dict(zip(df['id'].astype(str), df['name'])) + if debug: + print(f"✓ 获取到 {len(mappings['category'])} 个分类名称") + except Exception as e: + if debug: + print(f"✗ 获取分类名称失败: {e}") + + try: + # 获取供应商名称 + query = "SELECT id, name FROM sup_supplier LIMIT 100000" + df = pd.read_sql(query, engine) + mappings['supplier'] = dict(zip(df['id'].astype(str), df['name'])) + if debug: + print(f"✓ 获取到 {len(mappings['supplier'])} 个供应商名称") + except Exception as e: + if debug: + print(f"✗ 获取供应商名称失败: {e}") + + # 平台名称(硬编码常见值) + mappings['platform'] = { + 'pc': 'PC端', + 'h5': 'H5移动端', + 'app': 'APP', + 'miniprogram': '小程序', + 'wechat': '微信' + } + + mappings['client_platform'] = { + 'iOS': 'iOS', + 'Android': 'Android', + 'Web': 'Web', + 'H5': 'H5' + } + + return mappings + + +def log_algorithm_params(logger, params_dict): + """ + 记录算法参数 + + Args: + logger: logger对象 + params_dict: 参数字典 + """ + logger.debug(f"\n{'='*60}") + logger.debug("算法参数:") + logger.debug(f"{'='*60}") + for key, value in params_dict.items(): + logger.debug(f" {key}: {value}") + logger.debug(f"{'='*60}\n") + + +def log_processing_step(logger, step_name, start_time=None): + """ + 记录处理步骤 + + Args: + logger: logger对象 + step_name: 步骤名称 + start_time: 开始时间(如果提供,会计算耗时) + """ + from datetime import datetime + current_time = datetime.now() + + logger.debug(f"\n{'='*60}") + logger.debug(f"处理步骤: {step_name}") + logger.debug(f"时间: {current_time.strftime('%Y-%m-%d %H:%M:%S')}") + + if start_time: + elapsed = (current_time - start_time).total_seconds() + logger.debug(f"耗时: {elapsed:.2f}秒") + + logger.debug(f"{'='*60}\n") + diff --git a/offline_tasks/scripts/fetch_item_attributes.py b/offline_tasks/scripts/fetch_item_attributes.py index 3ca9017..7ce2cb5 100644 --- a/offline_tasks/scripts/fetch_item_attributes.py +++ b/offline_tasks/scripts/fetch_item_attributes.py @@ -8,8 +8,8 @@ import json import argparse from datetime import datetime from db_service import create_db_connection -from offline_tasks.config.offline_config import DB_CONFIG, OUTPUT_DIR -from offline_tasks.scripts.debug_utils import setup_debug_logger +from config import DB_CONFIG, OUTPUT_DIR +from debug_utils import setup_debug_logger def fetch_and_save_mappings(engine, output_dir, logger=None, debug=False): diff --git a/offline_tasks/scripts/generate_session.py b/offline_tasks/scripts/generate_session.py index 5e11506..ec14aa3 100644 --- a/offline_tasks/scripts/generate_session.py +++ b/offline_tasks/scripts/generate_session.py @@ -9,11 +9,11 @@ from collections import defaultdict import argparse from datetime import datetime, timedelta from db_service import create_db_connection -from offline_tasks.config.offline_config import ( +from config import ( DB_CONFIG, OUTPUT_DIR, get_time_range, DEFAULT_LOOKBACK_DAYS ) -from offline_tasks.scripts.debug_utils import setup_debug_logger, log_dataframe_info +from debug_utils import setup_debug_logger, log_dataframe_info def aggregate_user_sessions(df, behavior_weights, logger=None, debug=False): diff --git a/offline_tasks/scripts/i2i_content_similar.py b/offline_tasks/scripts/i2i_content_similar.py index 4504d20..e80ebef 100644 --- a/offline_tasks/scripts/i2i_content_similar.py +++ b/offline_tasks/scripts/i2i_content_similar.py @@ -9,8 +9,8 @@ import pandas as pd from datetime import datetime, timedelta from elasticsearch import Elasticsearch from db_service import create_db_connection -from offline_tasks.config.offline_config import DB_CONFIG, OUTPUT_DIR -from offline_tasks.scripts.debug_utils import setup_debug_logger, log_processing_step +from config import DB_CONFIG, OUTPUT_DIR +from debug_utils import setup_debug_logger, log_processing_step # ES配置 ES_CONFIG = { diff --git a/offline_tasks/scripts/i2i_deepwalk.py b/offline_tasks/scripts/i2i_deepwalk.py index 1d3a1ff..ed6f5e8 100644 --- a/offline_tasks/scripts/i2i_deepwalk.py +++ b/offline_tasks/scripts/i2i_deepwalk.py @@ -11,11 +11,11 @@ from datetime import datetime from collections import defaultdict from gensim.models import Word2Vec from db_service import create_db_connection -from offline_tasks.config.offline_config import ( +from config import ( DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N ) -from offline_tasks.scripts.debug_utils import ( +from debug_utils import ( setup_debug_logger, log_dataframe_info, save_readable_index, fetch_name_mappings, log_algorithm_params, log_processing_step diff --git a/offline_tasks/scripts/i2i_session_w2v.py b/offline_tasks/scripts/i2i_session_w2v.py index ce86f75..5a44f5c 100644 --- a/offline_tasks/scripts/i2i_session_w2v.py +++ b/offline_tasks/scripts/i2i_session_w2v.py @@ -10,11 +10,11 @@ from collections import defaultdict from gensim.models import Word2Vec import numpy as np from db_service import create_db_connection -from offline_tasks.config.offline_config import ( +from config import ( DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N ) -from offline_tasks.scripts.debug_utils import ( +from debug_utils import ( setup_debug_logger, log_dataframe_info, log_dict_stats, save_readable_index, fetch_name_mappings, log_algorithm_params, log_processing_step diff --git a/offline_tasks/scripts/i2i_swing.py b/offline_tasks/scripts/i2i_swing.py index ddaa607..6f8a76f 100644 --- a/offline_tasks/scripts/i2i_swing.py +++ b/offline_tasks/scripts/i2i_swing.py @@ -10,11 +10,11 @@ import argparse import json from datetime import datetime, timedelta from db_service import create_db_connection -from offline_tasks.config.offline_config import ( +from config import ( DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N ) -from offline_tasks.scripts.debug_utils import ( +from debug_utils import ( setup_debug_logger, log_dataframe_info, log_dict_stats, save_readable_index, load_name_mappings_from_file, log_algorithm_params, log_processing_step diff --git a/offline_tasks/scripts/interest_aggregation.py b/offline_tasks/scripts/interest_aggregation.py index 28989bd..e27e4df 100644 --- a/offline_tasks/scripts/interest_aggregation.py +++ b/offline_tasks/scripts/interest_aggregation.py @@ -9,11 +9,11 @@ import json from datetime import datetime, timedelta from collections import defaultdict, Counter from db_service import create_db_connection -from offline_tasks.config.offline_config import ( +from config import ( DB_CONFIG, OUTPUT_DIR, INTEREST_AGGREGATION_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_RECENT_DAYS, DEFAULT_INTEREST_TOP_N ) -from offline_tasks.scripts.debug_utils import ( +from debug_utils import ( setup_debug_logger, log_dataframe_info, log_dict_stats, save_readable_index, fetch_name_mappings, log_algorithm_params, log_processing_step diff --git a/offline_tasks/scripts/load_index_to_redis.py b/offline_tasks/scripts/load_index_to_redis.py index df222b0..21eb410 100644 --- a/offline_tasks/scripts/load_index_to_redis.py +++ b/offline_tasks/scripts/load_index_to_redis.py @@ -6,7 +6,7 @@ import redis import argparse import logging from datetime import datetime -from offline_tasks.config.offline_config import REDIS_CONFIG, OUTPUT_DIR +from config import REDIS_CONFIG, OUTPUT_DIR logging.basicConfig( level=logging.INFO, diff --git a/offline_tasks/test_fixes.sh b/offline_tasks/test_fixes.sh new file mode 100755 index 0000000..097e400 --- /dev/null +++ b/offline_tasks/test_fixes.sh @@ -0,0 +1,107 @@ +#!/bin/bash + +echo "======================================================================" +echo "测试修复是否成功" +echo "======================================================================" +echo "" + +cd /home/tw/recommendation/offline_tasks + +# 测试 1: 检查文件是否存在 +echo "[测试 1] 检查文件是否正确移动..." +if [ -f "scripts/db_service.py" ]; then + echo " ✓ db_service.py 已移动到 scripts/" +else + echo " ✗ db_service.py 未找到" + exit 1 +fi + +if [ -f "deepwalk/deepwalk.py" ] && [ -f "deepwalk/alias.py" ]; then + echo " ✓ DeepWalk 文件已移动到 offline_tasks/deepwalk/" +else + echo " ✗ DeepWalk 文件未找到" + exit 1 +fi + +# 测试 2: 检查 Python 语法 +echo "" +echo "[测试 2] 检查 Python 脚本语法..." +python3 -m py_compile scripts/i2i_item_behavior.py 2>/dev/null +if [ $? -eq 0 ]; then + echo " ✓ i2i_item_behavior.py 语法正确" +else + echo " ✗ i2i_item_behavior.py 语法错误" + exit 1 +fi + +python3 -m py_compile scripts/tag_category_similar.py 2>/dev/null +if [ $? -eq 0 ]; then + echo " ✓ tag_category_similar.py 语法正确" +else + echo " ✗ tag_category_similar.py 语法错误" + exit 1 +fi + +python3 -m py_compile scripts/i2i_deepwalk.py 2>/dev/null +if [ $? -eq 0 ]; then + echo " ✓ i2i_deepwalk.py 语法正确" +else + echo " ✗ i2i_deepwalk.py 语法错误" + exit 1 +fi + +# 测试 3: 检查是否还有 sys.path hack +echo "" +echo "[测试 3] 检查是否清理了 sys.path hack..." +sys_path_count=$(grep -r "sys.path.append" scripts/*.py | wc -l) +if [ $sys_path_count -eq 0 ]; then + echo " ✓ 所有 sys.path hack 已清理" +else + echo " ⚠️ 仍有 $sys_path_count 个文件包含 sys.path.append" + grep -r "sys.path.append" scripts/*.py +fi + +# 测试 4: 检查导入语句 +echo "" +echo "[测试 4] 检查导入语句..." +if grep -q "^from db_service import" scripts/i2i_item_behavior.py; then + echo " ✓ i2i_item_behavior.py 正确导入 db_service" +else + echo " ✗ i2i_item_behavior.py 未导入 db_service" + exit 1 +fi + +if grep -q "^from db_service import" scripts/tag_category_similar.py; then + echo " ✓ tag_category_similar.py 正确导入 db_service" +else + echo " ✗ tag_category_similar.py 未导入 db_service" + exit 1 +fi + +if grep -q "from deepwalk import DeepWalk" scripts/i2i_deepwalk.py; then + echo " ✓ i2i_deepwalk.py 正确导入 DeepWalk" +else + echo " ✗ i2i_deepwalk.py 未导入 DeepWalk" + exit 1 +fi + +echo "" +echo "======================================================================" +echo "✓ 所有测试通过!" +echo "======================================================================" +echo "" +echo "现在可以运行以下命令进行完整测试:" +echo "" +echo " # 测试 Task 5" +echo " python3 scripts/i2i_item_behavior.py --lookback_days 400 --top_n 50 --debug" +echo "" +echo " # 测试 Task 6" +echo " python3 scripts/tag_category_similar.py --lookback_days 400 --top_n 50 --debug" +echo "" +echo " # 测试 Task 3 (DeepWalk - 使用较小参数)" +echo " python3 scripts/i2i_deepwalk.py --lookback_days 200 --top_n 30 --num_walks 5 --walk_length 20 --save_model --save_graph --debug" +echo "" +echo " # 或运行完整流程" +echo " bash run.sh" +echo "" + -- libgit2 0.21.2