diff --git a/offline_tasks/config.py b/offline_tasks/config.py deleted file mode 100644 index 97220a6..0000000 --- a/offline_tasks/config.py +++ /dev/null @@ -1,26 +0,0 @@ -import os # Add for environment variable reading - - -ES_CONFIG = { - 'host': 'http://localhost:9200', - # default index name will be overwritten below based on APP_ENV - 'index_name': 'spu', - 'username': 'essa', - 'password': '4hOaLaf41y2VuI8y' -} - - -# Redis Cache Configuration -REDIS_CONFIG = { - # 'host': '120.76.41.98', - 'host': 'localhost', - 'port': 6479, - 'snapshot_db': 0, - 'password': 'BMfv5aI31kgHWtlx', - 'socket_timeout': 1, - 'socket_connect_timeout': 1, - 'retry_on_timeout': False, - 'cache_expire_days': 180, # 6 months - 'translation_cache_expire_days': 360, - 'translation_cache_prefix': 'trans' -} diff --git a/offline_tasks/db_service.py b/offline_tasks/db_service.py new file mode 100644 index 0000000..19b7619 --- /dev/null +++ b/offline_tasks/db_service.py @@ -0,0 +1,48 @@ +""" +数据库连接服务模块 +提供统一的数据库连接接口 +""" +from sqlalchemy import create_engine +from urllib.parse import quote_plus +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def create_db_connection(host, port, database, username, password): + """ + 创建数据库连接 + + Args: + host: 数据库主机地址 + port: 端口 + database: 数据库名 + username: 用户名 + password: 密码 + + Returns: + SQLAlchemy engine对象 + """ + try: + # 对密码进行URL编码,处理特殊字符 + encoded_password = quote_plus(password) + + # 构建连接字符串 + connection_string = f'mysql+pymysql://{username}:{encoded_password}@{host}:{port}/{database}' + + # 创建引擎 + engine = create_engine( + connection_string, + pool_pre_ping=True, # 连接池预检 + pool_recycle=3600, # 连接回收时间 + echo=False + ) + + logger.info(f"Database connection created successfully: {host}:{port}/{database}") + return engine + + except Exception as e: + logger.error(f"Failed to create database connection: {e}") + raise + diff --git a/offline_tasks/scripts/add_names_to_swing.py b/offline_tasks/scripts/add_names_to_swing.py index 0468820..1b103bb 100644 --- a/offline_tasks/scripts/add_names_to_swing.py +++ b/offline_tasks/scripts/add_names_to_swing.py @@ -5,7 +5,7 @@ """ import argparse from datetime import datetime -from debug_utils import setup_debug_logger, load_name_mappings_from_file +from scripts.debug_utils import setup_debug_logger, load_name_mappings_from_file def add_names_to_swing_result(input_file, output_file, name_mappings, logger=None, debug=False): diff --git a/offline_tasks/scripts/config.py b/offline_tasks/scripts/config.py deleted file mode 100644 index 38621fa..0000000 --- a/offline_tasks/scripts/config.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -离线任务配置文件 -包含数据库连接、路径、参数等配置 -""" -import os -from datetime import datetime, timedelta - -# 数据库配置 -DB_CONFIG = { - 'host': 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com', - 'port': '9030', - 'database': 'datacenter', - 'username': 'readonly', - 'password': 'essa1234' -} - -# 路径配置 -BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -OUTPUT_DIR = os.path.join(BASE_DIR, 'output') -LOG_DIR = os.path.join(BASE_DIR, 'logs') - -# 确保目录存在 -os.makedirs(OUTPUT_DIR, exist_ok=True) -os.makedirs(LOG_DIR, exist_ok=True) - -# ============================================================================ -# 默认参数配置(用于调试和生产) -# ============================================================================ - -# 时间配置(建议先用小数值调试,确认无误后再改为大数值) -DEFAULT_LOOKBACK_DAYS = 400 # 默认回看天数(调试用30天,生产可改为730天) -DEFAULT_RECENT_DAYS = 180 # 默认最近天数(调试用7天,生产可改为180天) - -# i2i算法默认参数 -DEFAULT_I2I_TOP_N = 50 # 默认返回Top N个相似商品 - -# 兴趣聚合默认参数 -DEFAULT_INTEREST_TOP_N = 1000 # 默认每个key返回Top N个商品 - -# 获取时间范围 -def get_time_range(days=DEFAULT_LOOKBACK_DAYS): - """获取时间范围""" - end_date = datetime.now() - start_date = end_date - timedelta(days=days) - return start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d') - -# i2i 行为相似算法配置 -I2I_CONFIG = { - # Swing 算法配置 - 'swing': { - 'alpha': 0.5, # swing算法的alpha参数 - 'threshold1': 0.5, # 交互强度阈值1 - 'threshold2': 0.5, # 交互强度阈值2 - 'max_sim_list_len': 300, # 最大相似列表长度 - 'top_n': 50, # 输出top N个相似商品 - 'thread_num': 10, # 线程数(如果使用C++版本) - }, - - # Session W2V 配置 - 'session_w2v': { - 'max_sentence_length': 100, # 最大句子长度 - 'window_size': 5, # 窗口大小 - 'vector_size': 128, # 向量维度 - 'min_count': 2, # 最小词频 - 'workers': 10, # 训练线程数 - 'epochs': 10, # 训练轮数 - 'sg': 1, # 使用skip-gram - }, - - # DeepWalk 配置 - 'deepwalk': { - 'num_walks': 10, # 每个节点的游走次数 - 'walk_length': 40, # 游走长度 - 'window_size': 5, # 窗口大小 - 'vector_size': 128, # 向量维度 - 'min_count': 2, # 最小词频 - 'workers': 10, # 训练线程数 - 'epochs': 10, # 训练轮数 - 'sg': 1, # 使用skip-gram - 'use_softmax': True, # 使用softmax - 'temperature': 1.0, # softmax温度 - 'p_tag_walk': 0.2, # 通过标签游走的概率 - } -} - -# 兴趣点聚合配置 -INTEREST_AGGREGATION_CONFIG = { - 'top_n': 1000, # 每个key生成前N个商品 - 'time_decay_factor': 0.95, # 时间衰减因子(每30天) - 'min_interaction_count': 2, # 最小交互次数 - - # 行为权重 - 'behavior_weights': { - 'click': 1.0, - 'addToCart': 3.0, - 'addToPool': 2.0, - 'contactFactory': 5.0, - 'purchase': 10.0, - }, - - # 类型配置 - 'list_types': ['hot', 'cart', 'new'], # 热门、加购、新品 -} - -# Redis配置(用于存储索引) -REDIS_CONFIG = { - 'host': 'localhost', - 'port': 6379, - 'db': 0, - 'password': None, - 'decode_responses': False -} - -# 日志配置 -LOG_CONFIG = { - 'level': 'INFO', - 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', - 'date_format': '%Y-%m-%d %H:%M:%S' -} - -# Debug配置 -DEBUG_CONFIG = { - 'enabled': False, # 是否开启debug模式 - 'log_level': 'DEBUG', # debug日志级别 - 'sample_size': 5, # 数据采样大小 - 'save_readable': True, # 是否保存可读明文文件 - 'log_dataframe_info': True, # 是否记录DataFrame详细信息 - 'log_intermediate': True, # 是否记录中间结果 -} - diff --git a/offline_tasks/scripts/db_service.py b/offline_tasks/scripts/db_service.py deleted file mode 100644 index 19b7619..0000000 --- a/offline_tasks/scripts/db_service.py +++ /dev/null @@ -1,48 +0,0 @@ -""" -数据库连接服务模块 -提供统一的数据库连接接口 -""" -from sqlalchemy import create_engine -from urllib.parse import quote_plus -import logging - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -def create_db_connection(host, port, database, username, password): - """ - 创建数据库连接 - - Args: - host: 数据库主机地址 - port: 端口 - database: 数据库名 - username: 用户名 - password: 密码 - - Returns: - SQLAlchemy engine对象 - """ - try: - # 对密码进行URL编码,处理特殊字符 - encoded_password = quote_plus(password) - - # 构建连接字符串 - connection_string = f'mysql+pymysql://{username}:{encoded_password}@{host}:{port}/{database}' - - # 创建引擎 - engine = create_engine( - connection_string, - pool_pre_ping=True, # 连接池预检 - pool_recycle=3600, # 连接回收时间 - echo=False - ) - - logger.info(f"Database connection created successfully: {host}:{port}/{database}") - return engine - - except Exception as e: - logger.error(f"Failed to create database connection: {e}") - raise - diff --git a/offline_tasks/scripts/fetch_item_attributes.py b/offline_tasks/scripts/fetch_item_attributes.py index 7ce2cb5..d6c7043 100644 --- a/offline_tasks/scripts/fetch_item_attributes.py +++ b/offline_tasks/scripts/fetch_item_attributes.py @@ -8,8 +8,8 @@ import json import argparse from datetime import datetime from db_service import create_db_connection -from config import DB_CONFIG, OUTPUT_DIR -from debug_utils import setup_debug_logger +from config.offline_config import DB_CONFIG, OUTPUT_DIR +from scripts.debug_utils import setup_debug_logger def fetch_and_save_mappings(engine, output_dir, logger=None, debug=False): diff --git a/offline_tasks/scripts/generate_session.py b/offline_tasks/scripts/generate_session.py index ec14aa3..8ff8319 100644 --- a/offline_tasks/scripts/generate_session.py +++ b/offline_tasks/scripts/generate_session.py @@ -9,11 +9,11 @@ from collections import defaultdict import argparse from datetime import datetime, timedelta from db_service import create_db_connection -from config import ( +from config.offline_config import ( DB_CONFIG, OUTPUT_DIR, get_time_range, DEFAULT_LOOKBACK_DAYS ) -from debug_utils import setup_debug_logger, log_dataframe_info +from scripts.debug_utils import setup_debug_logger, log_dataframe_info def aggregate_user_sessions(df, behavior_weights, logger=None, debug=False): diff --git a/offline_tasks/scripts/i2i_content_similar.py b/offline_tasks/scripts/i2i_content_similar.py index e80ebef..415da0c 100644 --- a/offline_tasks/scripts/i2i_content_similar.py +++ b/offline_tasks/scripts/i2i_content_similar.py @@ -9,8 +9,8 @@ import pandas as pd from datetime import datetime, timedelta from elasticsearch import Elasticsearch from db_service import create_db_connection -from config import DB_CONFIG, OUTPUT_DIR -from debug_utils import setup_debug_logger, log_processing_step +from config.offline_config import DB_CONFIG, OUTPUT_DIR +from scripts.debug_utils import setup_debug_logger, log_processing_step # ES配置 ES_CONFIG = { diff --git a/offline_tasks/scripts/i2i_deepwalk.py b/offline_tasks/scripts/i2i_deepwalk.py index ed6f5e8..035cab8 100644 --- a/offline_tasks/scripts/i2i_deepwalk.py +++ b/offline_tasks/scripts/i2i_deepwalk.py @@ -6,24 +6,22 @@ i2i - DeepWalk算法实现 import pandas as pd import argparse import os -import sys from datetime import datetime from collections import defaultdict from gensim.models import Word2Vec from db_service import create_db_connection -from config import ( +from config.offline_config import ( DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N ) -from debug_utils import ( +from scripts.debug_utils import ( setup_debug_logger, log_dataframe_info, save_readable_index, fetch_name_mappings, log_algorithm_params, log_processing_step ) # 导入 DeepWalk 实现 -sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)), 'deepwalk')) -from deepwalk import DeepWalk +from deepwalk.deepwalk import DeepWalk def build_edge_file_from_db(df, behavior_weights, output_path, logger): diff --git a/offline_tasks/scripts/i2i_session_w2v.py b/offline_tasks/scripts/i2i_session_w2v.py index 5a44f5c..92c13cf 100644 --- a/offline_tasks/scripts/i2i_session_w2v.py +++ b/offline_tasks/scripts/i2i_session_w2v.py @@ -10,11 +10,11 @@ from collections import defaultdict from gensim.models import Word2Vec import numpy as np from db_service import create_db_connection -from config import ( +from config.offline_config import ( DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N ) -from debug_utils import ( +from scripts.debug_utils import ( setup_debug_logger, log_dataframe_info, log_dict_stats, save_readable_index, fetch_name_mappings, log_algorithm_params, log_processing_step diff --git a/offline_tasks/scripts/i2i_swing.py b/offline_tasks/scripts/i2i_swing.py index 6f8a76f..212b955 100644 --- a/offline_tasks/scripts/i2i_swing.py +++ b/offline_tasks/scripts/i2i_swing.py @@ -10,11 +10,11 @@ import argparse import json from datetime import datetime, timedelta from db_service import create_db_connection -from config import ( +from config.offline_config import ( DB_CONFIG, OUTPUT_DIR, I2I_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_I2I_TOP_N ) -from debug_utils import ( +from scripts.debug_utils import ( setup_debug_logger, log_dataframe_info, log_dict_stats, save_readable_index, load_name_mappings_from_file, log_algorithm_params, log_processing_step diff --git a/offline_tasks/scripts/interest_aggregation.py b/offline_tasks/scripts/interest_aggregation.py index e27e4df..73a2e01 100644 --- a/offline_tasks/scripts/interest_aggregation.py +++ b/offline_tasks/scripts/interest_aggregation.py @@ -9,11 +9,11 @@ import json from datetime import datetime, timedelta from collections import defaultdict, Counter from db_service import create_db_connection -from config import ( +from config.offline_config import ( DB_CONFIG, OUTPUT_DIR, INTEREST_AGGREGATION_CONFIG, get_time_range, DEFAULT_LOOKBACK_DAYS, DEFAULT_RECENT_DAYS, DEFAULT_INTEREST_TOP_N ) -from debug_utils import ( +from scripts.debug_utils import ( setup_debug_logger, log_dataframe_info, log_dict_stats, save_readable_index, fetch_name_mappings, log_algorithm_params, log_processing_step diff --git a/offline_tasks/scripts/load_index_to_redis.py b/offline_tasks/scripts/load_index_to_redis.py index 21eb410..21d30e8 100644 --- a/offline_tasks/scripts/load_index_to_redis.py +++ b/offline_tasks/scripts/load_index_to_redis.py @@ -6,7 +6,7 @@ import redis import argparse import logging from datetime import datetime -from config import REDIS_CONFIG, OUTPUT_DIR +from config.offline_config import REDIS_CONFIG, OUTPUT_DIR logging.basicConfig( level=logging.INFO, -- libgit2 0.21.2