From 1088c2610d5b675bb560fd4d3ada5fd5be4d0f9e Mon Sep 17 00:00:00 2001 From: tangwang Date: Tue, 21 Oct 2025 10:51:28 +0800 Subject: [PATCH] mv files --- offline_tasks/deepwalk/alias.py | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/deepwalk/deepwalk.py | 266 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/scripts/add_names_to_swing.py | 4 ---- offline_tasks/scripts/db_service.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/scripts/fetch_item_attributes.py | 4 ---- offline_tasks/scripts/generate_session.py | 4 ---- offline_tasks/scripts/i2i_content_similar.py | 4 ---- offline_tasks/scripts/i2i_item_behavior.py | 1 - offline_tasks/scripts/i2i_session_w2v.py | 4 ---- offline_tasks/scripts/i2i_swing.py | 4 ---- offline_tasks/scripts/interest_aggregation.py | 4 ---- offline_tasks/scripts/load_index_to_redis.py | 4 ---- offline_tasks/scripts/tag_category_similar.py | 1 - offline_tasks/scripts/test_es_connection.py | 4 ---- 14 files changed, 369 insertions(+), 38 deletions(-) create mode 100644 offline_tasks/deepwalk/alias.py create mode 100644 offline_tasks/deepwalk/deepwalk.py create mode 100644 offline_tasks/scripts/db_service.py diff --git a/offline_tasks/deepwalk/alias.py b/offline_tasks/deepwalk/alias.py new file mode 100644 index 0000000..26c489c --- /dev/null +++ b/offline_tasks/deepwalk/alias.py @@ -0,0 +1,55 @@ +import numpy as np + + +def create_alias_table(area_ratio): + """ + + :param area_ratio: sum(area_ratio)=1 + :return: accept,alias + """ + l = len(area_ratio) + area_ratio = [prop * l for prop in area_ratio] + accept, alias = [0] * l, [0] * l + small, large = [], [] + + for i, prob in enumerate(area_ratio): + if prob < 1.0: + small.append(i) + else: + large.append(i) + + while small and large: + small_idx, large_idx = small.pop(), large.pop() + accept[small_idx] = area_ratio[small_idx] + alias[small_idx] = large_idx + area_ratio[large_idx] = area_ratio[large_idx] - \ + (1 - area_ratio[small_idx]) + if area_ratio[large_idx] < 1.0: + small.append(large_idx) + else: + large.append(large_idx) + + while large: + large_idx = large.pop() + accept[large_idx] = 1 + while small: + small_idx = small.pop() + accept[small_idx] = 1 + + return accept, alias + + +def alias_sample(accept, alias): + """ + + :param accept: + :param alias: + :return: sample index + """ + N = len(accept) + i = int(np.random.random()*N) + r = np.random.random() + if r < accept[i]: + return i + else: + return alias[i] diff --git a/offline_tasks/deepwalk/deepwalk.py b/offline_tasks/deepwalk/deepwalk.py new file mode 100644 index 0000000..7908620 --- /dev/null +++ b/offline_tasks/deepwalk/deepwalk.py @@ -0,0 +1,266 @@ +import random +import numpy as np +import networkx as nx +from joblib import Parallel, delayed +import itertools +from alias import create_alias_table, alias_sample +from tqdm import tqdm +import argparse +import multiprocessing +import logging +import os + +def softmax(x, temperature=1.0): + """ + 计算带有温度参数的softmax,并加入防止溢出的技巧 + """ + x = np.array(x) + x_max = np.max(x) + exp_x = np.exp((x - x_max) / temperature) # 加入temperature参数 + return exp_x / np.sum(exp_x) + +class DeepWalk: + def __init__(self, edge_file, node_tag_file, use_softmax=True, temperature=1.0, p_tag_walk=0.5): + """ + 初始化DeepWalk实例,构建图和标签索引,预处理alias采样表 + """ + logging.info(f"Initializing DeepWalk with edge file: {edge_file} and node-tag file: {node_tag_file}") + self.graph = self.build_graph_from_edge_file(edge_file) + if node_tag_file: + self.node_to_tags, self.tag_to_nodes = self.build_tag_index(node_tag_file) + else: + self.node_to_tags = None + self.tag_to_nodes = None + + self.alias_nodes = {} + self.p_tag_walk = p_tag_walk + logging.info(f"Graph built with {self.graph.number_of_nodes()} nodes and {self.graph.number_of_edges()} edges.") + + if use_softmax: + logging.info(f"Using softmax with temperature: {temperature}") + self.preprocess_transition_probs__softmax(temperature) + else: + logging.info("Using standard alias sampling.") + self.preprocess_transition_probs() + + def build_graph_from_edge_file(self, edge_file): + """ + 从edge文件构建图 + edge文件格式: bid1 \t bid2:weight1,bid2:weight2,... + """ + G = nx.Graph() + + # 打开edge文件并读取内容 + with open(edge_file, 'r') as f: + for line in f: + parts = line.strip().split('\t') + if len(parts) != 2: + continue + node, edges_str = parts + edges = edges_str.split(',') + + for edge in edges: + nbr, weight = edge.split(':') + try: + node, nbr = int(node), int(nbr) + except ValueError: + continue + weight = float(weight) + + # 检查图中是否已存在这条边 + if G.has_edge(node, nbr): + # 如果已经有这条边,更新权重,累加新权重 + G[node][nbr]['weight'] += weight + else: + # 如果没有这条边,直接添加 + G.add_edge(node, nbr, weight=weight) + + return G + + def build_tag_index(self, node_tag_file): + """ + 构建节点-标签的正排和倒排索引 + node_tag_file格式: book_id \t tag1,tag2,tag3 + """ + node_to_tags = {} + tag_to_nodes = {} + + with open(node_tag_file, 'r') as f: + for line in f: + parts = line.strip().split('\t') + if len(parts) != 2: + continue + node, tags_str = parts + try: + node = int(node) + except ValueError: + continue + # 只保留有过用户行为的node + if not node in self.graph: + continue + tags = tags_str.split(',') + node_to_tags[node] = tags + for tag in tags: + tag_to_nodes.setdefault(tag, []).append(node) + + return node_to_tags, tag_to_nodes + + def preprocess_transition_probs(self): + """ + 预处理节点的alias采样表,用于快速加权随机游走 + """ + G = self.graph + + for node in G.nodes(): + unnormalized_probs = [G[node][nbr].get('weight', 1.0) for nbr in G.neighbors(node)] + norm_const = sum(unnormalized_probs) + normalized_probs = [float(u_prob) / norm_const for u_prob in unnormalized_probs] + self.alias_nodes[node] = create_alias_table(normalized_probs) + + def preprocess_transition_probs__softmax(self, temperature=1.0): + """ + 预处理节点的alias采样表,用于快速加权随机游走 + """ + G = self.graph + + for node in G.nodes(): + unnormalized_probs = [G[node][nbr].get('weight', 1.0) for nbr in G.neighbors(node)] + normalized_probs = softmax(unnormalized_probs, temperature) + self.alias_nodes[node] = create_alias_table(normalized_probs) + + def deepwalk_walk(self, walk_length, start_node): + """ + 执行一次DeepWalk随机游走,基于alias方法加速,支持通过标签游走 + """ + G = self.graph + alias_nodes = self.alias_nodes + walk = [start_node] + + while len(walk) < walk_length: + cur = walk[-1] + + # 根据p_tag_walk的概率决定是通过邻居游走还是通过tag游走 + if self.node_to_tags and random.random() < self.p_tag_walk and cur in self.node_to_tags: + walk = self.tag_based_walk(cur, walk) + else: + walk = self.neighbor_based_walk(cur, alias_nodes, walk) + + if not walk: + break + + return walk + + def neighbor_based_walk(self, cur, alias_nodes, walk): + """ + 基于邻居的随机游走 + """ + G = self.graph + cur_nbrs = list(G.neighbors(cur)) + if len(cur_nbrs) > 0: + idx = alias_sample(alias_nodes[cur][0], alias_nodes[cur][1]) + walk.append(cur_nbrs[idx]) + else: + return None + return walk + + def tag_based_walk(self, cur, walk): + """ + 基于标签的随机游走 + """ + tags = self.node_to_tags[cur] + if not tags: + return None + + # 随机选择一个tag + chosen_tag = random.choice(tags) + + # 获取该tag下的节点列表 + nodes_with_tag = self.tag_to_nodes.get(chosen_tag, []) + if not nodes_with_tag: + return None + + # 随机选择一个节点 + chosen_node = random.choice(nodes_with_tag) + walk.append(chosen_node) + return walk + + def simulate_walks(self, num_walks, walk_length, workers, output_file): + """ + 多进程模拟多次随机游走,并将游走结果保存到文件 + """ + G = self.graph + nodes = list(G.nodes()) + num_walks_per_worker = max(1, num_walks // workers) + logging.info(f"Starting simulation with {num_walks_per_worker} walks per node, walk length {walk_length}, using {workers} workers.") + + # + # results = Parallel(n_jobs=workers)( + # results = Parallel(n_jobs=workers, backend='multiprocessing')( + # results = Parallel(n_jobs=workers, backend='loky')( + results = Parallel(n_jobs=workers)( + delayed(self._simulate_walks)(nodes, num_walks_per_worker, walk_length) + for _ in range(workers) + ) + walks = list(itertools.chain(*results)) + + # 保存游走结果到文件 + self.save_walks_to_file(walks, output_file) + + def _simulate_walks(self, nodes, num_walks, walk_length): + + """ + 模拟多次随机游走 + """ + logging.info(f"_simulate_walks started, num_walks:{num_walks}, walk_length:{walk_length}") + walks = [] + for i in range(num_walks): + logging.info(f"_simulate_walks run num_walks of {i}.") + random.shuffle(nodes) + for node in nodes: + walks.append(self.deepwalk_walk(walk_length=walk_length, start_node=node)) + return walks + + def save_walks_to_file(self, walks, output_file): + """ + 将游走结果保存到文件,按Word2Vec的输入格式 + """ + logging.info(f"Saving walks to file: {output_file}") + with open(output_file, 'w') as f: + for walk in walks: + walk_str = ' '.join(map(str, walk)) + f.write(walk_str + '\n') + logging.info(f"Successfully saved {len(walks)} walks to {output_file}.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run DeepWalk with tag-based random walks") + parser.add_argument('--edge-file', type=str, required=True, help="Path to the edge file") # ../../fetch_data/data/edge.txt.20240923 + parser.add_argument('--node-tag-file', type=str, help="Path to the node-tag file") + parser.add_argument('--num-walks', type=int, default=100, help="Number of walks per node (default: 10)") + parser.add_argument('--walk-length', type=int, default=40, help="Length of each walk (default: 40)") + parser.add_argument('--workers', type=int, default=multiprocessing.cpu_count() - 1, help="Number of workers (default: CPU cores - 1)") + parser.add_argument('--use-softmax', action='store_true', help="Use softmax-based alias sampling (default: False)") + parser.add_argument('--temperature', type=float, default=1.0, help="Temperature for softmax (default: 1.0)") + parser.add_argument('--p-tag-walk', type=float, default=0.2, help="Probability to walk through tag-based neighbors (default: 0.5)") + parser.add_argument('--output-file', type=str, required=True, help="Path to save the walks file") + + args = parser.parse_args() + + # 初始化日志记录 + logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO) + + # 初始化DeepWalk实例,传入边文件和节点标签文件 + deepwalk = DeepWalk( + edge_file=args.edge_file, + node_tag_file=args.node_tag_file, + use_softmax=args.use_softmax, + temperature=args.temperature, + p_tag_walk=args.p_tag_walk + ) + + # 模拟随机游走并将结果保存到文件 + deepwalk.simulate_walks( + num_walks=args.num_walks, + walk_length=args.walk_length, + workers=args.workers, + output_file=args.output_file + ) diff --git a/offline_tasks/scripts/add_names_to_swing.py b/offline_tasks/scripts/add_names_to_swing.py index 1e703fc..c2f8c87 100644 --- a/offline_tasks/scripts/add_names_to_swing.py +++ b/offline_tasks/scripts/add_names_to_swing.py @@ -3,10 +3,6 @@ 输入格式: item_id \t similar_item_id1:score1,similar_item_id2:score2,... 输出格式: item_id:name \t similar_item_id1:name1:score1,similar_item_id2:name2:score2,... """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - import argparse from datetime import datetime from offline_tasks.scripts.debug_utils import setup_debug_logger, load_name_mappings_from_file diff --git a/offline_tasks/scripts/db_service.py b/offline_tasks/scripts/db_service.py new file mode 100644 index 0000000..19b7619 --- /dev/null +++ b/offline_tasks/scripts/db_service.py @@ -0,0 +1,48 @@ +""" +数据库连接服务模块 +提供统一的数据库连接接口 +""" +from sqlalchemy import create_engine +from urllib.parse import quote_plus +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def create_db_connection(host, port, database, username, password): + """ + 创建数据库连接 + + Args: + host: 数据库主机地址 + port: 端口 + database: 数据库名 + username: 用户名 + password: 密码 + + Returns: + SQLAlchemy engine对象 + """ + try: + # 对密码进行URL编码,处理特殊字符 + encoded_password = quote_plus(password) + + # 构建连接字符串 + connection_string = f'mysql+pymysql://{username}:{encoded_password}@{host}:{port}/{database}' + + # 创建引擎 + engine = create_engine( + connection_string, + pool_pre_ping=True, # 连接池预检 + pool_recycle=3600, # 连接回收时间 + echo=False + ) + + logger.info(f"Database connection created successfully: {host}:{port}/{database}") + return engine + + except Exception as e: + logger.error(f"Failed to create database connection: {e}") + raise + diff --git a/offline_tasks/scripts/fetch_item_attributes.py b/offline_tasks/scripts/fetch_item_attributes.py index bb21f4f..3ca9017 100644 --- a/offline_tasks/scripts/fetch_item_attributes.py +++ b/offline_tasks/scripts/fetch_item_attributes.py @@ -3,10 +3,6 @@ 从数据库获取ID->名称的映射,保存到本地文件供其他任务使用 避免每个任务重复查询数据库 """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - import pandas as pd import json import argparse diff --git a/offline_tasks/scripts/generate_session.py b/offline_tasks/scripts/generate_session.py index 8dc1eeb..5e11506 100644 --- a/offline_tasks/scripts/generate_session.py +++ b/offline_tasks/scripts/generate_session.py @@ -3,10 +3,6 @@ 从数据库读取用户行为,生成适用于C++ Swing算法的session文件 输出格式: uid \t {"item_id":score,"item_id":score,...} """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - import pandas as pd import json from collections import defaultdict diff --git a/offline_tasks/scripts/i2i_content_similar.py b/offline_tasks/scripts/i2i_content_similar.py index d7fed8f..4504d20 100644 --- a/offline_tasks/scripts/i2i_content_similar.py +++ b/offline_tasks/scripts/i2i_content_similar.py @@ -4,10 +4,6 @@ i2i - 基于ES向量的内容相似索引 1. 基于名称文本向量的相似度 2. 基于图片向量的相似度 """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - import json import pandas as pd from datetime import datetime, timedelta diff --git a/offline_tasks/scripts/i2i_item_behavior.py b/offline_tasks/scripts/i2i_item_behavior.py index 75476df..23edb76 100644 --- a/offline_tasks/scripts/i2i_item_behavior.py +++ b/offline_tasks/scripts/i2i_item_behavior.py @@ -5,7 +5,6 @@ from sqlalchemy import create_engine from db_service import create_db_connection import argparse from datetime import datetime -import os def clean_text_field(text): if pd.isna(text): diff --git a/offline_tasks/scripts/i2i_session_w2v.py b/offline_tasks/scripts/i2i_session_w2v.py index 6d5c459..ce86f75 100644 --- a/offline_tasks/scripts/i2i_session_w2v.py +++ b/offline_tasks/scripts/i2i_session_w2v.py @@ -2,10 +2,6 @@ i2i - Session Word2Vec算法实现 基于用户会话序列训练Word2Vec模型,获取物品向量相似度 """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - import pandas as pd import json import argparse diff --git a/offline_tasks/scripts/i2i_swing.py b/offline_tasks/scripts/i2i_swing.py index 4bb5eac..ddaa607 100644 --- a/offline_tasks/scripts/i2i_swing.py +++ b/offline_tasks/scripts/i2i_swing.py @@ -3,10 +3,6 @@ i2i - Swing算法实现 基于用户行为的物品相似度计算 参考item_sim.py的数据格式,适配真实数据 """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - import pandas as pd import math from collections import defaultdict diff --git a/offline_tasks/scripts/interest_aggregation.py b/offline_tasks/scripts/interest_aggregation.py index 6528153..28989bd 100644 --- a/offline_tasks/scripts/interest_aggregation.py +++ b/offline_tasks/scripts/interest_aggregation.py @@ -2,10 +2,6 @@ 兴趣点聚合索引生成 按照多个维度(平台、国家、客户类型、分类、列表类型)生成商品索引 """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - import pandas as pd import math import argparse diff --git a/offline_tasks/scripts/load_index_to_redis.py b/offline_tasks/scripts/load_index_to_redis.py index c456bf1..df222b0 100644 --- a/offline_tasks/scripts/load_index_to_redis.py +++ b/offline_tasks/scripts/load_index_to_redis.py @@ -2,10 +2,6 @@ 将生成的索引加载到Redis 用于在线推荐系统查询 """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - import redis import argparse import logging diff --git a/offline_tasks/scripts/tag_category_similar.py b/offline_tasks/scripts/tag_category_similar.py index d201114..3f7622c 100644 --- a/offline_tasks/scripts/tag_category_similar.py +++ b/offline_tasks/scripts/tag_category_similar.py @@ -5,7 +5,6 @@ from sqlalchemy import create_engine from db_service import create_db_connection import argparse from datetime import datetime -import os def clean_text_field(text): if pd.isna(text): diff --git a/offline_tasks/scripts/test_es_connection.py b/offline_tasks/scripts/test_es_connection.py index 99edf06..383b6e1 100644 --- a/offline_tasks/scripts/test_es_connection.py +++ b/offline_tasks/scripts/test_es_connection.py @@ -2,10 +2,6 @@ 测试Elasticsearch连接和向量查询 用于验证ES配置和向量字段是否正确 """ -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - from elasticsearch import Elasticsearch import json -- libgit2 0.21.2