main.py 11.8 KB
import os
import json
import glob
import logging
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import shutil

# 设置日志配置
logging.basicConfig(
    filename='logs/index_generation.log', 
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 配置超参
CONFIG = {
    'base_dir': '../fetch_data/data/',
    'books_path': '../fetch_data/meta_data/all_books.json',
    'tenants_path': '../fetch_data/meta_data/tenants.json',
    'output_dir': './output',
    'days': 30,  # 天数,用于获取最近的文件
    'top_n': 1000,  # 生成的前 N 个书单
    'tenant_type_ratio': 0.01,  # 机构和所属行业融合的比例。可以解决机构的冷启动问题。机构内的行为数据越少,受到行业的影响越大。
    'use_simple_uv_processing': True  # 是否使用简单UV处理逻辑
                                    # 配置为True:则book的read UV统计规则为 每一天的UV的累加,
                                    # 配置为False:则book的read UV统计规则为统计范围内所有天的UV,该方法更多的收到运营配置的曝光的影响,
                                    # 默认为True
}

def load_json_files(path_pattern):
    """根据通配符加载 JSON 文件"""
    files = glob.glob(path_pattern)
    data = []
    for file in files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")
    return data

def load_books_data(books_path):
    """加载书籍属性词典,并将所有ID转换为字符串"""
    books_data = {}
    with open(books_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            book = json.loads(line)

            tags = book.get('merged_tags', '')
            category1 = book.get('category1', '')
            category2 = book.get('category2', '')
            combined_tags = ','.join(filter(lambda x: x not in [None, ''], [tags, category1, category2]))
            books_data[str(book['id'])] = combined_tags  # 将book['id']转换为字符串

    logging.info(f"Loaded {len(books_data)} books from {books_path}")
    return books_data

def load_tenants_data(tenants_path):
    """加载机构所属行业词典,并将所有ID转换为字符串"""
    tenants_data = {}
    with open(tenants_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            tenant = json.loads(line)
            tenant_type = tenant.get('tenant_type', '')
            if not tenant_type:
                tenant_type = ''
            tenants_data[str(tenant['id'])] = tenant_type  # 将tenant['id']转换为字符串
    logging.info(f"Loaded {len(tenants_data)} tenants from {tenants_path}")
    return tenants_data

def get_recent_files(base_dir, days=30):
    """获取最近 days 天的文件"""
    today = datetime.today()
    recent_files = []
    for i in range(days):
        date_str = (today - timedelta(days=i)).strftime('%Y%m%d')
        path_pattern = os.path.join(base_dir, f'reading_time.json.{date_str}')
        recent_files.extend(glob.glob(path_pattern))
    logging.info(f"Found {len(recent_files)} files for the last {days} days")
    return recent_files

def process_reading_data_by_uv(reading_files, books_data, tenants_data):
    """使用用户UV数据处理阅读数据"""
    tenant_uv = defaultdict(lambda: defaultdict(set))  # 使用集合来去重
    tenant_type_uv = defaultdict(lambda: defaultdict(set))  # 使用集合来去重
    tag_uv = defaultdict(lambda: defaultdict(set))  # 使用集合来去重

    for file in reading_files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    record = json.loads(line.strip())
                    user_id = str(record.get('user_id', ''))  # 将user_id转换为字符串
                    book_id = str(record.get('book_id', ''))  # 将book_id转换为字符串
                    tenant_id = str(record.get('tenant_id', ''))  # 将tenant_id转换为字符串

                    if not book_id or not tenant_id or not user_id:
                        continue

                    tenant_uv[tenant_id][book_id].add(user_id)
                    tenant_type = tenants_data.get(tenant_id, '')  # tenant_id已经是字符串
                    tenant_type_uv[tenant_type][book_id].add(user_id)

                    tags = books_data.get(book_id, '').split(',')
                    for tag in tags:
                        if tag:
                            tag_uv[tag][book_id].add(user_id)

                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")

    # 转换为UV数量,即集合中user_id的数量
    tenant_uv_count = {tenant: Counter({book: len(users) for book, users in books.items()})
                       for tenant, books in tenant_uv.items()}
    tenant_type_uv_count = {tenant_type: Counter({book: len(users) for book, users in books.items()})
                            for tenant_type, books in tenant_type_uv.items()}
    tag_uv_count = {tag: Counter({book: len(users) for book, users in books.items()})
                    for tag, books in tag_uv.items()}

    logging.info(f"Processed reading data, total tenants: {len(tenant_uv_count)}, tenant types: {len(tenant_type_uv_count)}, tags: {len(tag_uv_count)}")

    return tenant_uv_count, tenant_type_uv_count, tag_uv_count

def process_reading_data(reading_files, books_data, tenants_data):
    """使用简单的UV累加逻辑处理阅读数据"""
    tenant_uv = defaultdict(Counter)
    tenant_type_uv = defaultdict(Counter)
    tag_uv = defaultdict(Counter)
    
    for file in reading_files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    record = json.loads(line.strip())
                    user_id = str(record.get('user_id', ''))  # 将user_id转换为字符串
                    book_id = str(record.get('book_id', ''))  # 将book_id转换为字符串
                    tenant_id = str(record.get('tenant_id', ''))  # 将tenant_id转换为字符串
                    
                    if not book_id or not tenant_id:
                        continue

                    tenant_uv[tenant_id][book_id] += 1
                    tenant_type = tenants_data.get(tenant_id, '')  # tenant_id已经是字符串
                    tenant_type_uv[tenant_type][book_id] += 1

                    tags = books_data.get(book_id, '').split(',')
                    for tag in tags:
                        if tag:
                            tag_uv[tag][book_id] += 1

                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")
    
    logging.info(f"Processed reading data, total tenants: {len(tenant_uv)}, tenant types: {len(tenant_type_uv)}, tags: {len(tag_uv)}")
    
    return tenant_uv, tenant_type_uv, tag_uv

def generate_top_booklist(counter_dict, top_n=1000):
    """生成排序后的前 top_n booklist"""
    result = {}
    for key, counter in counter_dict.items():
        top_books = counter.most_common(top_n)
        if not key or len(top_books) == 0:
            continue
        result[key] = ','.join([f'{bid}:{uv}' for bid, uv in top_books])
    return result

def write_output(data, output_dir, prefix, current_date):
    """写入输出文件,并生成软链接到 output 目录下"""
    try:
        output_file_path = os.path.join(output_dir, f'{prefix}_{current_date}.txt')
        output_file_link = os.path.join(output_dir, f'{prefix}.txt')
        
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        with open(output_file_path, 'w', encoding='utf-8') as f:
            for key, booklist in data.items():
                key.replace('\t', ' ')
                if not key or not booklist:
                    continue
                f.write(f"{key}\t{booklist}\n")
        
        logging.info(f"Output written to {output_file_path}")

        if os.path.islink(output_file_link) or os.path.exists(output_file_link):
            os.remove(output_file_link)
        
        os.symlink(os.path.basename(output_file_path), output_file_link)
        logging.info(f"Symlink created at {output_file_link} pointing to {output_file_path}")

    except Exception as e:
        logging.error(f"Error writing output or creating symlink: {str(e)}")

def merge_tenant_uv_with_type_uv(tenant_uv, tenant_type_uv, tenants_data, ratio=CONFIG['tenant_type_ratio']):
    """合并 tenant 的 UV 统计和其所属 tenant_type 的 UV 统计结果
    
    融合的目的:通过融合机构所属行业的UV数据,平滑处理小机构数据不足的情况,给予它们更多的行业UV权重    ,避免因数据量小而导致的统计偏差。
    
    ratio 参数控制行业 UV 统计数据在融合过程中所占的权重比例。较高的比例表示行业数据的影响较大,较低的比例则表示单个机构的数据占主导地位。
    """
    merged_tenant_uv = defaultdict(Counter)

    for tenant_id, books_counter in tenant_uv.items():
        # 获取该 tenant 的 tenant_type
        tenant_type = tenants_data.get(tenant_id, '')
        
        # 获取该 tenant_type 下的 UV 统计
        tenant_type_counter = tenant_type_uv.get(tenant_type, Counter())

        # 合并 tenant 自身的 UV 统计和 tenant_type 的 UV 统计结果(乘以比例系数)
        for book_id, uv_count in books_counter.items():
            tenant_type_uv_adjusted = int(tenant_type_counter.get(book_id, 0) * ratio)
            merged_tenant_uv[tenant_id][book_id] = uv_count + tenant_type_uv_adjusted

    logging.info(f"Merged tenant UV with tenant type UV using ratio {ratio}")
    return merged_tenant_uv

def main():
    # 获取当前日期
    current_date = datetime.today().strftime('%Y%m%d')

    # 加载书籍和机构数据
    books_data = load_books_data(CONFIG['books_path'])
    tenants_data = load_tenants_data(CONFIG['tenants_path'])
    
    # 获取最近配置的天数的阅读数据文件
    reading_files = get_recent_files(CONFIG['base_dir'], days=CONFIG['days'])

    # 根据配置选择UV处理逻辑
    if CONFIG['use_simple_uv_processing']:
        tenant_uv, tenant_type_uv, tag_uv = process_reading_data(reading_files, books_data, tenants_data)
    else:
        tenant_uv, tenant_type_uv, tag_uv = process_reading_data_by_uv(reading_files, books_data, tenants_data)

    # 合并 tenant UV 和 tenant_type UV(使用配置的比例)
    merged_tenant_uv = merge_tenant_uv_with_type_uv(tenant_uv, tenant_type_uv, tenants_data, ratio=CONFIG['tenant_type_ratio'])

    # 生成前N本书的书单
    tenant_booklist = generate_top_booklist(merged_tenant_uv, top_n=CONFIG['top_n'])
    tenant_type_booklist = generate_top_booklist(tenant_type_uv, top_n=CONFIG['top_n'])
    tag_booklist = generate_top_booklist(tag_uv, top_n=CONFIG['top_n'])

    # 写入输出文件并生成软链接
    write_output(tenant_booklist, CONFIG['output_dir'], 'tenant_booklist', current_date)
    write_output(tenant_type_booklist, CONFIG['output_dir'], 'tenant_type_booklist', current_date)
    write_output(tag_booklist, CONFIG['output_dir'], 'tag_booklist', current_date)

if __name__ == '__main__':
    main()