"""
Unified scheduler script for offline tasks.

Runs every offline task in order to generate the various indexes required by
the recommendation system.
"""
import os
import sys
import subprocess
import argparse
import logging
from datetime import datetime

# Add the parent directory to sys.path so the config package can be imported
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parent_dir)

from offline_tasks.config.offline_config import (
    DEFAULT_LOOKBACK_DAYS,
    DEFAULT_I2I_TOP_N,
    DEFAULT_INTEREST_TOP_N
)

# Logging setup: one log file per calendar day plus the console.
LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
os.makedirs(LOG_DIR, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Messages below contain Chinese text and emoji; an explicit UTF-8
        # encoding keeps them from being dropped when the locale encoding
        # cannot represent them.
        logging.FileHandler(
            os.path.join(LOG_DIR, f'run_all_{datetime.now().strftime("%Y%m%d")}.log'),
            encoding='utf-8',
        ),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Directory that holds the individual task scripts
SCRIPTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'scripts')

# Horizontal rule used to frame section headers in the log
_SEPARATOR = "=" * 80


def run_script(script_name, args=None):
    """
    Run a Python script from SCRIPTS_DIR with the current interpreter.

    Args:
        script_name: File name of the script, relative to SCRIPTS_DIR.
        args: Optional list of command-line arguments passed to the script.

    Returns:
        bool: True if the script exited with status 0. False if the script
        file does not exist, it exited with a non-zero status, or any other
        exception was raised while running it (the error is logged, never
        propagated).
    """
    script_path = os.path.join(SCRIPTS_DIR, script_name)

    if not os.path.exists(script_path):
        logger.error(f"Script not found: {script_path}")
        return False

    cmd = [sys.executable, script_path]
    if args:
        cmd.extend(args)

    logger.info(f"Running: {' '.join(cmd)}")

    try:
        result = subprocess.run(
            cmd,
            check=True,
            capture_output=True,
            text=True,
            # Strict decoding would raise UnicodeDecodeError on a single
            # undecodable byte in the child's output, and a task that ran
            # fine could then be reported as failed.
            errors='replace',
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"Script {script_name} failed with return code {e.returncode}")
        logger.error(f"Error output: {e.stderr}")
        return False
    except Exception as e:
        # Boundary handler: one broken task must not abort the whole run.
        # logger.exception keeps the traceback for diagnosis.
        logger.exception(f"Unexpected error running {script_name}: {e}")
        return False

    logger.info(f"Script {script_name} completed successfully")
    if result.stdout:
        logger.debug(result.stdout)
    return True


def _build_tasks(debug):
    """Return the ordered list of (title, script_name, script_args) tasks."""
    lookback = ['--lookback_days', str(DEFAULT_LOOKBACK_DAYS)]
    i2i_top_n = ['--top_n', str(DEFAULT_I2I_TOP_N)]
    interest_top_n = ['--top_n', str(DEFAULT_INTEREST_TOP_N)]
    debug_flag = ['--debug'] if debug else []

    return [
        # 1. i2i behavioural similarity (Swing)
        (
            "Running Swing algorithm for i2i similarity",
            'i2i_swing.py',
            lookback + i2i_top_n + ['--time_decay'] + debug_flag,
        ),
        # 2. Session W2V
        (
            "Running Session Word2Vec for i2i similarity",
            'i2i_session_w2v.py',
            lookback + i2i_top_n + ['--save_model'] + debug_flag,
        ),
        # 3. DeepWalk
        (
            "Running DeepWalk for i2i similarity",
            'i2i_deepwalk.py',
            lookback + i2i_top_n + ['--save_model', '--save_graph'] + debug_flag,
        ),
        # 4. Content-based similarity (ES vectors).
        # NOTE(review): this task is run without any arguments, including
        # --debug; kept as-is — confirm whether the script accepts --debug.
        (
            "Running Content-based similarity (ES vectors)",
            'i2i_content_similar.py',
            [],
        ),
        # 5. Interest aggregation
        (
            "Running interest aggregation",
            'interest_aggregation.py',
            lookback + interest_top_n + debug_flag,
        ),
    ]


def main(argv=None):
    """
    Run all offline recommendation tasks in order and report the outcome.

    Every task is attempted even if an earlier one fails.

    Args:
        argv: Optional list of command-line arguments. When None (the
            default), argparse reads them from sys.argv.

    Returns:
        int: 0 if every task succeeded, 1 if at least one task failed.
    """
    parser = argparse.ArgumentParser(description='Run all offline recommendation tasks')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode for all tasks (detailed logs + readable output files)')

    args = parser.parse_args(argv)

    if args.debug:
        # The handlers accept every level, so lowering this logger's own
        # threshold is enough to surface the captured stdout that
        # run_script logs at DEBUG; other loggers stay at INFO.
        logger.setLevel(logging.DEBUG)

    logger.info(_SEPARATOR)
    logger.info("Starting offline recommendation tasks")
    if args.debug:
        logger.info("🐛 DEBUG MODE ENABLED - 详细日志 + 明文输出")
    logger.info(_SEPARATOR)

    tasks = _build_tasks(args.debug)
    total_count = len(tasks)
    success_count = 0

    for index, (title, script_name, script_args) in enumerate(tasks, start=1):
        logger.info("\n" + _SEPARATOR)
        logger.info(f"Task {index}: {title}")
        logger.info(_SEPARATOR)
        if run_script(script_name, script_args):
            success_count += 1

    # Summary
    logger.info("\n" + _SEPARATOR)
    logger.info(f"All tasks completed: {success_count}/{total_count} succeeded")
    logger.info(_SEPARATOR)

    if success_count == total_count:
        logger.info("✓ All tasks completed successfully!")
        return 0

    logger.warning(f"✗ {total_count - success_count} task(s) failed")
    return 1


if __name__ == '__main__':
    sys.exit(main())