From 8095cb00634aae042fa78f01cd2dda629e6f288c Mon Sep 17 00:00:00 2001 From: tangwang Date: Mon, 20 Oct 2025 22:40:16 +0800 Subject: [PATCH] add cos sim --- hot/README.md | 85 ------------------------------------------------------------------------------------- hot/main.py | 261 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- hot/run.sh | 7 ------- item_sim.py | 88 ---------------------------------------------------------------------------------------- tag_sim.py | 81 --------------------------------------------------------------------------------- 5 files changed, 0 insertions(+), 522 deletions(-) delete mode 100644 hot/README.md delete mode 100644 hot/main.py delete mode 100644 hot/run.sh delete mode 100644 item_sim.py delete mode 100644 tag_sim.py diff --git a/hot/README.md b/hot/README.md deleted file mode 100644 index 6323aa4..0000000 --- a/hot/README.md +++ /dev/null @@ -1,85 +0,0 @@ -# 热门书籍索引生成项目 - -## 项目简介 -本项目旨在根据机构的阅读行为数据(reading_time埋点数据)生成热门书籍索引,通过多种方法统计不同维度下的用户访问(UV)数据。项目支持基于机构(tenant)、机构所属行业(tenant_type)及书籍标签(tag)(包括category1和category2,当成tag同等处理)等不同维度进行统计和排名,从而生成热门书籍清单。并带有自动更新的软链接以方便外部访问。 - - -## 文件结构 -- `index_generation.py`:主程序代码,包含数据加载、UV处理、书单生成和输出等主要功能。 -- `logs/`:日志文件存放目录。 -- `output/`:程序生成的书单输出目录。 - -## 输入数据 -### 1. 书籍属性数据 (`all_books.json`) -- **路径**:`CONFIG['books_path']` -- **内容**:每行包含一个书籍的 JSON 数据,主要字段为: - - `id`:书籍ID。 - - `merged_tags`:书籍相关的标签列表,用逗号分隔。 - -### 2. 机构所属行业数据 (`tenants.json`) -- **路径**:`CONFIG['tenants_path']` -- **内容**:每行包含一个机构的 JSON 数据,主要字段为: - - `id`:机构ID。 - - `tenant_type`:机构所属行业类型。 - -### 3. 阅读行为数据 (`reading_time.json`) -- **路径**:`CONFIG['base_dir']` 下的文件夹,文件名格式为 `reading_time.json.YYYYMMDD`。 -- **内容**:每行包含一个阅读行为的 JSON 数据,主要字段为: - - `user_id`:用户ID。 - - `book_id`:书籍ID。 - - `tenant_id`:机构ID。 - -## 输出数据 -输出数据为生成的热门书籍列表,每个文件包含按指定维度统计的前 `N` 个书籍的排名结果: -- 文件输出路径:`CONFIG['output_dir']` -- 文件名格式:`_.txt`,并生成软链接至 `.txt`。 -- 输出内容示例:`tenant_id book_id1:uv_count1,book_id2:uv_count2,...` - -### 输出文件类型 -1. `tenant_booklist.txt`:按机构(tenant)统计的热门书籍列表。 -2. `tenant_type_booklist.txt`:按机构所属行业(tenant_type)统计的热门书籍列表。 -3. `tag_booklist.txt`:按标签(tag)统计的热门书籍列表。 - -## 配置参数 -### `CONFIG` 说明 -- `base_dir`:阅读数据文件的目录。 -- `books_path`:书籍属性数据文件路径。 -- `tenants_path`:机构所属行业数据文件路径。 -- `output_dir`:输出目录路径。 -- `days`:用于选择最近 `days` 天内的数据文件。 -- `top_n`:生成前 `N` 个热门书籍。 -- `tenant_type_ratio`:用于在机构数据不足时融合所属行业数据的权重比例。 -- `use_simple_uv_processing`: - - `True`:累加每天的 UV。 - - `False`:以数据周期内总 UV 统计为准。 - -## 计算逻辑 -1. **数据加载** - - 使用 `load_books_data()` 和 `load_tenants_data()` 分别加载书籍和机构的基本信息,确保各个 ID 均为字符串。 - - 使用 `get_recent_files()` 获取最近 `days` 天的阅读数据文件列表。 - -2. **UV 数据处理** - - `process_reading_data()`:简单 UV 统计,每条记录中的用户访问量直接累加。 - - `process_reading_data_by_uv()`:用户 UV 去重统计,计算某书籍在一天内的 UV 数量。 - - `CONFIG['use_simple_uv_processing']` 用于决定是否使用简单的累加逻辑。 - -3. **数据融合** - - 使用 `merge_tenant_uv_with_type_uv()` 将机构的 UV 数据与其所属行业的 UV 数据按比例进行融合,减小数据量较小的机构所带来的统计偏差。 - -4. **生成书单** - - `generate_top_booklist()` 根据 UV 统计数据生成指定维度的前 `N` 本热门书籍列表。 - - 生成的书单文件分别保存机构、机构所属行业、标签维度的热门书籍排名。 - -5. **输出与软链接** - - 使用 `write_output()` 将生成的书单写入指定文件,并更新软链接到最新文件。 - -## 日志 -程序的所有日志信息输出至 `logs/index_generation.log`,主要记录数据加载、文件处理、UV 统计、文件写入等步骤的成功与错误信息,以便跟踪和排查问题。 - -## 运行方法 -在终端中执行以下命令来运行主程序: -```bash -python main.py -# 或者 -sh run.sh -``` \ No newline at end of file diff --git a/hot/main.py b/hot/main.py deleted file mode 100644 index ae62864..0000000 --- a/hot/main.py +++ /dev/null @@ -1,261 +0,0 @@ -import os -import json -import glob -import logging -from collections import defaultdict, Counter -from datetime import datetime, timedelta -import shutil - -# 设置日志配置 -logging.basicConfig( - filename='logs/index_generation.log', - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s' -) - -# 配置超参 -CONFIG = { - 'base_dir': '../fetch_data/data/', - 'books_path': '../fetch_data/meta_data/all_books.json', - 'tenants_path': '../fetch_data/meta_data/tenants.json', - 'output_dir': './output', - 'days': 30, # 天数,用于获取最近的文件 - 'top_n': 1000, # 生成的前 N 个书单 - 'tenant_type_ratio': 0.01, # 机构和所属行业融合的比例。可以解决机构的冷启动问题。机构内的行为数据越少,受到行业的影响越大。 - 'use_simple_uv_processing': True # 是否使用简单UV处理逻辑 - # 配置为True:则book的read UV统计规则为 每一天的UV的累加, - # 配置为False:则book的read UV统计规则为统计范围内所有天的UV,该方法更多的收到运营配置的曝光的影响, - # 默认为True -} - -def load_json_files(path_pattern): - """根据通配符加载 JSON 文件""" - files = glob.glob(path_pattern) - data = [] - for file in files: - with open(file, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if not line: - continue - try: - data.append(json.loads(line)) - except json.JSONDecodeError: - logging.error(f"Failed to parse JSON line in {file}: {line}") - return data - -def load_books_data(books_path): - """加载书籍属性词典,并将所有ID转换为字符串""" - books_data = {} - with open(books_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if not line: - continue - book = json.loads(line) - - tags = book.get('merged_tags', '') - category1 = book.get('category1', '') - category2 = book.get('category2', '') - combined_tags = ','.join(filter(lambda x: x not in [None, ''], [tags, category1, category2])) - books_data[str(book['id'])] = combined_tags # 将book['id']转换为字符串 - - logging.info(f"Loaded {len(books_data)} books from {books_path}") - return books_data - -def load_tenants_data(tenants_path): - """加载机构所属行业词典,并将所有ID转换为字符串""" - tenants_data = {} - with open(tenants_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if not line: - continue - tenant = json.loads(line) - tenant_type = tenant.get('tenant_type', '') - if not tenant_type: - tenant_type = '' - tenants_data[str(tenant['id'])] = tenant_type # 将tenant['id']转换为字符串 - logging.info(f"Loaded {len(tenants_data)} tenants from {tenants_path}") - return tenants_data - -def get_recent_files(base_dir, days=30): - """获取最近 days 天的文件""" - today = datetime.today() - recent_files = [] - for i in range(days): - date_str = (today - timedelta(days=i)).strftime('%Y%m%d') - path_pattern = os.path.join(base_dir, f'reading_time.json.{date_str}') - recent_files.extend(glob.glob(path_pattern)) - logging.info(f"Found {len(recent_files)} files for the last {days} days") - return recent_files - -def process_reading_data_by_uv(reading_files, books_data, tenants_data): - """使用用户UV数据处理阅读数据""" - tenant_uv = defaultdict(lambda: defaultdict(set)) # 使用集合来去重 - tenant_type_uv = defaultdict(lambda: defaultdict(set)) # 使用集合来去重 - tag_uv = defaultdict(lambda: defaultdict(set)) # 使用集合来去重 - - for file in reading_files: - with open(file, 'r', encoding='utf-8') as f: - for line in f: - try: - record = json.loads(line.strip()) - user_id = str(record.get('user_id', '')) # 将user_id转换为字符串 - book_id = str(record.get('book_id', '')) # 将book_id转换为字符串 - tenant_id = str(record.get('tenant_id', '')) # 将tenant_id转换为字符串 - - if not book_id or not tenant_id or not user_id: - continue - - tenant_uv[tenant_id][book_id].add(user_id) - tenant_type = tenants_data.get(tenant_id, '') # tenant_id已经是字符串 - tenant_type_uv[tenant_type][book_id].add(user_id) - - tags = books_data.get(book_id, '').split(',') - for tag in tags: - if tag: - tag_uv[tag][book_id].add(user_id) - - except json.JSONDecodeError: - logging.error(f"Failed to parse JSON line in {file}: {line}") - - # 转换为UV数量,即集合中user_id的数量 - tenant_uv_count = {tenant: Counter({book: len(users) for book, users in books.items()}) - for tenant, books in tenant_uv.items()} - tenant_type_uv_count = {tenant_type: Counter({book: len(users) for book, users in books.items()}) - for tenant_type, books in tenant_type_uv.items()} - tag_uv_count = {tag: Counter({book: len(users) for book, users in books.items()}) - for tag, books in tag_uv.items()} - - logging.info(f"Processed reading data, total tenants: {len(tenant_uv_count)}, tenant types: {len(tenant_type_uv_count)}, tags: {len(tag_uv_count)}") - - return tenant_uv_count, tenant_type_uv_count, tag_uv_count - -def process_reading_data(reading_files, books_data, tenants_data): - """使用简单的UV累加逻辑处理阅读数据""" - tenant_uv = defaultdict(Counter) - tenant_type_uv = defaultdict(Counter) - tag_uv = defaultdict(Counter) - - for file in reading_files: - with open(file, 'r', encoding='utf-8') as f: - for line in f: - try: - record = json.loads(line.strip()) - user_id = str(record.get('user_id', '')) # 将user_id转换为字符串 - book_id = str(record.get('book_id', '')) # 将book_id转换为字符串 - tenant_id = str(record.get('tenant_id', '')) # 将tenant_id转换为字符串 - - if not book_id or not tenant_id: - continue - - tenant_uv[tenant_id][book_id] += 1 - tenant_type = tenants_data.get(tenant_id, '') # tenant_id已经是字符串 - tenant_type_uv[tenant_type][book_id] += 1 - - tags = books_data.get(book_id, '').split(',') - for tag in tags: - if tag: - tag_uv[tag][book_id] += 1 - - except json.JSONDecodeError: - logging.error(f"Failed to parse JSON line in {file}: {line}") - - logging.info(f"Processed reading data, total tenants: {len(tenant_uv)}, tenant types: {len(tenant_type_uv)}, tags: {len(tag_uv)}") - - return tenant_uv, tenant_type_uv, tag_uv - -def generate_top_booklist(counter_dict, top_n=1000): - """生成排序后的前 top_n booklist""" - result = {} - for key, counter in counter_dict.items(): - top_books = counter.most_common(top_n) - if not key or len(top_books) == 0: - continue - result[key] = ','.join([f'{bid}:{uv}' for bid, uv in top_books]) - return result - -def write_output(data, output_dir, prefix, current_date): - """写入输出文件,并生成软链接到 output 目录下""" - try: - output_file_path = os.path.join(output_dir, f'{prefix}_{current_date}.txt') - output_file_link = os.path.join(output_dir, f'{prefix}.txt') - - if not os.path.exists(output_dir): - os.makedirs(output_dir) - - with open(output_file_path, 'w', encoding='utf-8') as f: - for key, booklist in data.items(): - key.replace('\t', ' ') - if not key or not booklist: - continue - f.write(f"{key}\t{booklist}\n") - - logging.info(f"Output written to {output_file_path}") - - if os.path.islink(output_file_link) or os.path.exists(output_file_link): - os.remove(output_file_link) - - os.symlink(os.path.basename(output_file_path), output_file_link) - logging.info(f"Symlink created at {output_file_link} pointing to {output_file_path}") - - except Exception as e: - logging.error(f"Error writing output or creating symlink: {str(e)}") - -def merge_tenant_uv_with_type_uv(tenant_uv, tenant_type_uv, tenants_data, ratio=CONFIG['tenant_type_ratio']): - """合并 tenant 的 UV 统计和其所属 tenant_type 的 UV 统计结果 - - 融合的目的:通过融合机构所属行业的UV数据,平滑处理小机构数据不足的情况,给予它们更多的行业UV权重 ,避免因数据量小而导致的统计偏差。 - - ratio 参数控制行业 UV 统计数据在融合过程中所占的权重比例。较高的比例表示行业数据的影响较大,较低的比例则表示单个机构的数据占主导地位。 - """ - merged_tenant_uv = defaultdict(Counter) - - for tenant_id, books_counter in tenant_uv.items(): - # 获取该 tenant 的 tenant_type - tenant_type = tenants_data.get(tenant_id, '') - - # 获取该 tenant_type 下的 UV 统计 - tenant_type_counter = tenant_type_uv.get(tenant_type, Counter()) - - # 合并 tenant 自身的 UV 统计和 tenant_type 的 UV 统计结果(乘以比例系数) - for book_id, uv_count in books_counter.items(): - tenant_type_uv_adjusted = int(tenant_type_counter.get(book_id, 0) * ratio) - merged_tenant_uv[tenant_id][book_id] = uv_count + tenant_type_uv_adjusted - - logging.info(f"Merged tenant UV with tenant type UV using ratio {ratio}") - return merged_tenant_uv - -def main(): - # 获取当前日期 - current_date = datetime.today().strftime('%Y%m%d') - - # 加载书籍和机构数据 - books_data = load_books_data(CONFIG['books_path']) - tenants_data = load_tenants_data(CONFIG['tenants_path']) - - # 获取最近配置的天数的阅读数据文件 - reading_files = get_recent_files(CONFIG['base_dir'], days=CONFIG['days']) - - # 根据配置选择UV处理逻辑 - if CONFIG['use_simple_uv_processing']: - tenant_uv, tenant_type_uv, tag_uv = process_reading_data(reading_files, books_data, tenants_data) - else: - tenant_uv, tenant_type_uv, tag_uv = process_reading_data_by_uv(reading_files, books_data, tenants_data) - - # 合并 tenant UV 和 tenant_type UV(使用配置的比例) - merged_tenant_uv = merge_tenant_uv_with_type_uv(tenant_uv, tenant_type_uv, tenants_data, ratio=CONFIG['tenant_type_ratio']) - - # 生成前N本书的书单 - tenant_booklist = generate_top_booklist(merged_tenant_uv, top_n=CONFIG['top_n']) - tenant_type_booklist = generate_top_booklist(tenant_type_uv, top_n=CONFIG['top_n']) - tag_booklist = generate_top_booklist(tag_uv, top_n=CONFIG['top_n']) - - # 写入输出文件并生成软链接 - write_output(tenant_booklist, CONFIG['output_dir'], 'tenant_booklist', current_date) - write_output(tenant_type_booklist, CONFIG['output_dir'], 'tenant_type_booklist', current_date) - write_output(tag_booklist, CONFIG['output_dir'], 'tag_booklist', current_date) - -if __name__ == '__main__': - main() diff --git a/hot/run.sh b/hot/run.sh deleted file mode 100644 index df2aedc..0000000 --- a/hot/run.sh +++ /dev/null @@ -1,7 +0,0 @@ -mkdir -p output -mkdir -p logs -python3 main.py - -# 清理output目录下365天以前的文件 -find output/ -type f -mtime +365 -exec rm -f {} \; -find logs/ -type f -mtime +180 -exec rm -f {} \; diff --git a/item_sim.py b/item_sim.py deleted file mode 100644 index 27b35d3..0000000 --- a/item_sim.py +++ /dev/null @@ -1,88 +0,0 @@ -import pandas as pd -import math -from collections import defaultdict -from sqlalchemy import create_engine -from db_service import create_db_connection -import argparse - -def clean_text_field(text): - if pd.isna(text): - return '' - # 移除换行符、回车符,并替换其他可能导致CSV格式问题的字符 - return str(text).replace('\r', ' ').replace('\n', ' ').replace('"', '""').strip() - -# 数据库连接配置 -host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com' -port = '9030' -database = 'datacenter' -username = 'readonly' -password = 'essa1234' - -# 创建数据库连接 -engine = create_db_connection(host, port, database, username, password) - -# SQL 查询 - 获取用户点击序列 -sql_query = """ -SELECT - DATE_FORMAT(se.create_time, '%%Y-%%m-%%d') AS date, - se.anonymous_id AS user_id, - se.item_id, - pgs.name AS item_name -FROM - sensors_events se -LEFT JOIN prd_goods_sku pgs ON se.item_id = pgs.id -WHERE - se.event IN ('contactFactory', 'addToPool', 'addToCart') - AND se.create_time >= '2025-04-01' -ORDER BY - se.anonymous_id, - se.create_time; -""" - -# 执行 SQL 查询并将结果加载到 pandas DataFrame -df = pd.read_sql(sql_query, engine) - -# 处理点击序列,计算共现关系 -cooccur = defaultdict(lambda: defaultdict(int)) -freq = defaultdict(int) - -# 按用户和日期分组处理点击序列 -for (user_id, date), group in df.groupby(['user_id', 'date']): - items = group['item_id'].tolist() - unique_items = set(items) - - # 更新频率统计 - for item in unique_items: - freq[item] += 1 - - # 更新共现关系 - for i in range(len(items)): - for j in range(i + 1, len(items)): - item1, item2 = items[i], items[j] - if item1 != item2: - cooccur[item1][item2] += 1 - cooccur[item2][item1] += 1 - -# 计算余弦相似度 -result = {} -for item1 in cooccur: - sim_scores = [] - for item2 in cooccur[item1]: - numerator = cooccur[item1][item2] - denominator = math.sqrt(freq[item1]) * math.sqrt(freq[item2]) - if denominator != 0: - score = numerator / denominator - sim_scores.append((item2, score)) - sim_scores.sort(key=lambda x: -x[1]) # 按分数排序 - result[item1] = sim_scores - -# 创建item_id到name的映射 -item_name_map = dict(zip(df['item_id'], df['item_name'])) - -# 输出相似商品 -for item_id, sims in result.items(): - item_name = item_name_map.get(item_id, 'Unknown') - # 只取前8个最相似的商品 - top_sims = sims[:8] - sim_str = ','.join([f'{item_name_map.get(sim_id, "Unknown")}:{score:.4f}' for sim_id, score in top_sims]) - print(f'{item_name}\t{sim_str}') diff --git a/tag_sim.py b/tag_sim.py deleted file mode 100644 index 36a6396..0000000 --- a/tag_sim.py +++ /dev/null @@ -1,81 +0,0 @@ -import pandas as pd -import math -from collections import defaultdict -from sqlalchemy import create_engine -from db_service import create_db_connection -import argparse - -def clean_text_field(text): - if pd.isna(text): - return '' - # 移除换行符、回车符,并替换其他可能导致CSV格式问题的字符 - return str(text).replace('\r', ' ').replace('\n', ' ').replace('"', '""').strip() - -bpms_host = '120.76.244.158' -bpms_port = '3325' -bpms_database = 'bpms' -bpms_username = 'PRD_M1_190311' -bpms_password = 'WTF)xdbqtW!4gwA7' - -# 创建数据库连接 -engine = create_db_connection(bpms_host, bpms_port, bpms_database, bpms_username, bpms_password) - -# SQL 查询 -sql_query = """ -SELECT - sp.code AS `PO单号`, - psm.name AS `区域`, - bb.code AS `客户编码`, - GROUP_CONCAT(pc_1.name) AS `商品信息`, - MIN(spi.order_time) AS `下单货时间` -FROM sale_po sp -INNER JOIN sale_po_item spi ON sp.id = spi.po_id -LEFT JOIN buy_buyer bb ON bb.id = sp.buyer_id -LEFT JOIN prd_goods pg ON pg.id = spi.spu_id -LEFT JOIN prd_category AS pc_1 ON pc_1.id = SUBSTRING_INDEX(SUBSTRING_INDEX(pg.category_id, '.', 2), '.', -1) -LEFT JOIN pub_sale_market_setting psms ON psms.country_code = bb.countries -LEFT JOIN pub_sale_market psm ON psms.sale_market_id = psm.id -WHERE spi.quantity > 0 - AND spi.is_delete = 0 - AND bb.is_delete = 0 -GROUP BY sp.code, psm.name, bb.code; -""" - -# 执行 SQL 查询并将结果加载到 pandas DataFrame -df = pd.read_sql(sql_query, engine) - -# 处理商品信息,分割并去重 -cooccur = defaultdict(lambda: defaultdict(int)) -freq = defaultdict(int) - -for _, row in df.iterrows(): - # Handle None values in 商品信息 - if pd.isna(row['商品信息']): - continue - categories = [cat.strip() for cat in str(row['商品信息']).split(',') if cat.strip()] - unique_cats = set(categories) - for c1 in unique_cats: - freq[c1] += 1 - for c2 in unique_cats: - if c1 != c2: - cooccur[c1][c2] += 1 - -# 计算余弦相似度 -result = {} -for c1 in cooccur: - sim_scores = [] - for c2 in cooccur[c1]: - numerator = cooccur[c1][c2] - denominator = math.sqrt(freq[c1]) * math.sqrt(freq[c2]) - if denominator != 0: - score = numerator / denominator - sim_scores.append((c2, score)) - sim_scores.sort(key=lambda x: -x[1]) # 按分数排序 - result[c1] = sim_scores - -# 输出相似分类 -for cat, sims in result.items(): - # 只取前8个最相似的分类 - top_sims = sims[:8] - sim_str = ','.join([f'{sim_cat}:{score:.4f}' for sim_cat, score in top_sims]) - print(f'{cat}\t{sim_str}') -- libgit2 0.21.2