import os
import json
import glob
import logging
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import shutil

# BUG FIX: logging.basicConfig raises FileNotFoundError when the target log
# directory does not exist — make sure it is there before configuring.
os.makedirs('logs', exist_ok=True)

# Logging configuration
logging.basicConfig(
    filename='logs/index_generation.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Hyper-parameter configuration
CONFIG = {
    'base_dir': '../fetch_data/data/',
    'books_path': '../fetch_data/meta_data/all_books.json',
    'tenants_path': '../fetch_data/meta_data/tenants.json',
    'output_dir': './output',
    'days': 30,        # number of most recent days of data files to read
    'top_n': 1000,     # size of each generated top-N booklist
    # Blend ratio between a tenant's own UV stats and its industry
    # (tenant_type) stats.  Mitigates the tenant cold-start problem: the
    # less behavior data a tenant has, the stronger the industry influence.
    'tenant_type_ratio': 0.01,
    # Whether to use the simple UV accumulation logic.
    #   True : a book's read UV is the day-by-day sum of UVs.
    #   False: a book's read UV is the distinct-user count across the whole
    #          window; this variant is more affected by operator-configured
    #          exposure.
    # Default is True.
    'use_simple_uv_processing': True
}


def load_json_files(path_pattern):
    """Load newline-delimited JSON records from every file matching *path_pattern*.

    Malformed lines are logged and skipped rather than aborting the load.
    """
    files = glob.glob(path_pattern)
    data = []
    for file in files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")
    return data


def load_books_data(books_path):
    """Load the book-attribute dictionary, coercing every book id to str.

    Returns a mapping of book id -> comma-joined string of merged tags plus
    both category levels (None/empty components are dropped).
    """
    books_data = {}
    with open(books_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            book = json.loads(line)
            tags = book.get('merged_tags', '')
            category1 = book.get('category1', '')
            category2 = book.get('category2', '')
            combined_tags = ','.join(
                filter(lambda x: x not in [None, ''], [tags, category1, category2]))
            # Coerce book['id'] to str so lookups match the reading records.
            books_data[str(book['id'])] = combined_tags
    logging.info(f"Loaded {len(books_data)} books from {books_path}")
    return books_data


def load_tenants_data(tenants_path):
    """Load the tenant -> industry (tenant_type) dictionary, ids coerced to str."""
    tenants_data = {}
    with open(tenants_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            tenant = json.loads(line)
            tenant_type = tenant.get('tenant_type', '')
            if not tenant_type:
                # Normalize None/other falsy values to the empty string.
                tenant_type = ''
            # Coerce tenant['id'] to str so lookups match the reading records.
            tenants_data[str(tenant['id'])] = tenant_type
    logging.info(f"Loaded {len(tenants_data)} tenants from {tenants_path}")
    return tenants_data


def get_recent_files(base_dir, days=30):
    """Return the reading_time data files for the most recent *days* days."""
    today = datetime.today()
    recent_files = []
    for i in range(days):
        date_str = (today - timedelta(days=i)).strftime('%Y%m%d')
        path_pattern = os.path.join(base_dir, f'reading_time.json.{date_str}')
        recent_files.extend(glob.glob(path_pattern))
    logging.info(f"Found {len(recent_files)} files for the last {days} days")
    return recent_files


def process_reading_data_by_uv(reading_files, books_data, tenants_data):
    """Aggregate reading data as distinct-user UV over the whole window.

    Returns (tenant_uv, tenant_type_uv, tag_uv), each a dict of
    key -> Counter(book_id -> distinct user count).
    """
    # Sets of user ids provide de-duplication across all days/files.
    tenant_uv = defaultdict(lambda: defaultdict(set))
    tenant_type_uv = defaultdict(lambda: defaultdict(set))
    tag_uv = defaultdict(lambda: defaultdict(set))

    for file in reading_files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    record = json.loads(line.strip())
                    # Coerce all ids to str so they match the metadata dicts.
                    user_id = str(record.get('user_id', ''))
                    book_id = str(record.get('book_id', ''))
                    tenant_id = str(record.get('tenant_id', ''))
                    if not book_id or not tenant_id or not user_id:
                        continue
                    tenant_uv[tenant_id][book_id].add(user_id)
                    tenant_type = tenants_data.get(tenant_id, '')
                    tenant_type_uv[tenant_type][book_id].add(user_id)
                    tags = books_data.get(book_id, '').split(',')
                    for tag in tags:
                        if tag:
                            tag_uv[tag][book_id].add(user_id)
                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")

    # Collapse the user-id sets into UV counts (set cardinality).
    tenant_uv_count = {tenant: Counter({book: len(users) for book, users in books.items()})
                       for tenant, books in tenant_uv.items()}
    tenant_type_uv_count = {tenant_type: Counter({book: len(users) for book, users in books.items()})
                            for tenant_type, books in tenant_type_uv.items()}
    tag_uv_count = {tag: Counter({book: len(users) for book, users in books.items()})
                    for tag, books in tag_uv.items()}

    logging.info(f"Processed reading data, total tenants: {len(tenant_uv_count)}, "
                 f"tenant types: {len(tenant_type_uv_count)}, tags: {len(tag_uv_count)}")
    return tenant_uv_count, tenant_type_uv_count, tag_uv_count


def process_reading_data(reading_files, books_data, tenants_data):
    """Aggregate reading data with the simple per-record UV accumulation.

    Every record increments the counters (no cross-day de-duplication).
    Returns (tenant_uv, tenant_type_uv, tag_uv), each a dict of
    key -> Counter(book_id -> accumulated count).
    """
    tenant_uv = defaultdict(Counter)
    tenant_type_uv = defaultdict(Counter)
    tag_uv = defaultdict(Counter)

    for file in reading_files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    record = json.loads(line.strip())
                    # Coerce all ids to str so they match the metadata dicts.
                    user_id = str(record.get('user_id', ''))
                    book_id = str(record.get('book_id', ''))
                    tenant_id = str(record.get('tenant_id', ''))
                    if not book_id or not tenant_id:
                        continue
                    tenant_uv[tenant_id][book_id] += 1
                    tenant_type = tenants_data.get(tenant_id, '')
                    tenant_type_uv[tenant_type][book_id] += 1
                    tags = books_data.get(book_id, '').split(',')
                    for tag in tags:
                        if tag:
                            tag_uv[tag][book_id] += 1
                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")

    logging.info(f"Processed reading data, total tenants: {len(tenant_uv)}, "
                 f"tenant types: {len(tenant_type_uv)}, tags: {len(tag_uv)}")
    return tenant_uv, tenant_type_uv, tag_uv


def generate_top_booklist(counter_dict, top_n=1000):
    """Build 'book_id:uv,...' strings of the top *top_n* books per key.

    Keys that are empty or have no books are omitted from the result.
    """
    result = {}
    for key, counter in counter_dict.items():
        top_books = counter.most_common(top_n)
        if not key or len(top_books) == 0:
            continue
        result[key] = ','.join([f'{bid}:{uv}' for bid, uv in top_books])
    return result


def write_output(data, output_dir, prefix, current_date):
    """Write one dated TSV output file and point a stable symlink at it.

    Each line is '<key>\\t<booklist>'.  Tabs inside keys are replaced with
    spaces so they cannot corrupt the tab-separated format.  Failures are
    logged (best effort) so one failed list does not abort the whole run.
    """
    try:
        output_file_path = os.path.join(output_dir, f'{prefix}_{current_date}.txt')
        output_file_link = os.path.join(output_dir, f'{prefix}.txt')
        os.makedirs(output_dir, exist_ok=True)

        with open(output_file_path, 'w', encoding='utf-8') as f:
            for key, booklist in data.items():
                # BUG FIX: str.replace returns a new string; the original code
                # discarded the result, so tabs in keys leaked into the TSV.
                key = key.replace('\t', ' ')
                if not key or not booklist:
                    continue
                f.write(f"{key}\t{booklist}\n")
        logging.info(f"Output written to {output_file_path}")

        # Replace any existing link/file, then create a *relative* symlink so
        # the link stays valid if the output directory is moved.
        if os.path.islink(output_file_link) or os.path.exists(output_file_link):
            os.remove(output_file_link)
        os.symlink(os.path.basename(output_file_path), output_file_link)
        logging.info(f"Symlink created at {output_file_link} pointing to {output_file_path}")
    except Exception as e:
        logging.error(f"Error writing output or creating symlink: {str(e)}")


def merge_tenant_uv_with_type_uv(tenant_uv, tenant_type_uv, tenants_data,
                                 ratio=CONFIG['tenant_type_ratio']):
    """Blend each tenant's UV counts with its industry's (tenant_type) counts.

    Purpose: smooth out small tenants with insufficient data by mixing in
    industry-level UV, giving them more industry weight and avoiding
    statistical bias from tiny samples.

    *ratio* controls the weight of the industry UV in the blend: a higher
    ratio means more industry influence, a lower ratio lets the tenant's own
    data dominate.  Only books already present in the tenant's own counter
    receive the industry boost.
    """
    merged_tenant_uv = defaultdict(Counter)
    for tenant_id, books_counter in tenant_uv.items():
        # Look up this tenant's industry and that industry's UV counter.
        tenant_type = tenants_data.get(tenant_id, '')
        tenant_type_counter = tenant_type_uv.get(tenant_type, Counter())
        # Own UV plus the scaled (and int-truncated) industry UV.
        for book_id, uv_count in books_counter.items():
            tenant_type_uv_adjusted = int(tenant_type_counter.get(book_id, 0) * ratio)
            merged_tenant_uv[tenant_id][book_id] = uv_count + tenant_type_uv_adjusted
    logging.info(f"Merged tenant UV with tenant type UV using ratio {ratio}")
    return merged_tenant_uv


def main():
    """Entry point: load metadata, aggregate UVs, and write the booklists."""
    current_date = datetime.today().strftime('%Y%m%d')

    # Load book and tenant metadata.
    books_data = load_books_data(CONFIG['books_path'])
    tenants_data = load_tenants_data(CONFIG['tenants_path'])

    # Collect reading-data files for the configured window.
    reading_files = get_recent_files(CONFIG['base_dir'], days=CONFIG['days'])

    # Pick the UV aggregation strategy according to the configuration.
    if CONFIG['use_simple_uv_processing']:
        tenant_uv, tenant_type_uv, tag_uv = process_reading_data(
            reading_files, books_data, tenants_data)
    else:
        tenant_uv, tenant_type_uv, tag_uv = process_reading_data_by_uv(
            reading_files, books_data, tenants_data)

    # Blend tenant UV with industry UV (using the configured ratio).
    merged_tenant_uv = merge_tenant_uv_with_type_uv(
        tenant_uv, tenant_type_uv, tenants_data, ratio=CONFIG['tenant_type_ratio'])

    # Build the top-N booklists.
    tenant_booklist = generate_top_booklist(merged_tenant_uv, top_n=CONFIG['top_n'])
    tenant_type_booklist = generate_top_booklist(tenant_type_uv, top_n=CONFIG['top_n'])
    tag_booklist = generate_top_booklist(tag_uv, top_n=CONFIG['top_n'])

    # Write output files and refresh the stable symlinks.
    write_output(tenant_booklist, CONFIG['output_dir'], 'tenant_booklist', current_date)
    write_output(tenant_type_booklist, CONFIG['output_dir'], 'tenant_type_booklist', current_date)
    write_output(tag_booklist, CONFIG['output_dir'], 'tag_booklist', current_date)


if __name__ == '__main__':
    main()