"""
索引服务专用日志配置。

提供独立的索引日志文件(indexer.log),用于记录全量和增量索引操作的关键信息。
参考电商搜索引擎最佳实践,记录请求、处理过程、ES写入结果等关键信息。
"""

import logging
import logging.handlers
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, Optional


class IndexerFormatter(logging.Formatter):
    """Formatter for the indexer log; emits one structured JSON object per record."""

    # Optional attributes attached via the `extra` kwarg of the logging
    # helpers below. `index_type` is 'bulk' or 'incremental'.
    EXTRA_FIELDS = (
        'tenant_id', 'spu_id', 'operation', 'index_type', 'status',
        'total_count', 'success_count', 'failed_count', 'elapsed_time',
        'error', 'index_name', 'batch_num', 'total_batches', 'batch_size',
        'deleted_count', 'explicit_deleted_count', 'auto_deleted_count',
    )

    def format(self, record: logging.LogRecord) -> str:
        """Serialize the log record as a JSON string."""
        log_data = {
            "timestamp": datetime.fromtimestamp(record.created).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        # Copy over any of the known extra fields present on the record
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                log_data[field] = getattr(record, field)

        # Append traceback details when an exception is attached
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_data, ensure_ascii=False)
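
# For illustration, a record logged as
#     logger.info("Index request started",
#                 extra={'tenant_id': 't1', 'operation': 'request_start'})
# would serialize to a single line roughly like (timestamp illustrative):
#     {"timestamp": "2024-01-01T00:00:00", "level": "INFO",
#      "logger": "indexer", "message": "Index request started",
#      "tenant_id": "t1", "operation": "request_start"}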


def setup_indexer_logger(log_dir: str = "logs") -> logging.Logger:
    """
    Configure the dedicated logger for the indexing service.

    Args:
        log_dir: Directory in which to write the log file.

    Returns:
        The configured logger instance.
    """
    # Create the log directory if it does not exist
    log_path = Path(log_dir)
    log_path.mkdir(parents=True, exist_ok=True)

    # Create the indexer-specific logger
    indexer_logger = logging.getLogger('indexer')
    indexer_logger.setLevel(logging.INFO)

    # Avoid attaching handlers twice
    if indexer_logger.handlers:
        return indexer_logger

    # JSON formatter for the file handler
    formatter = IndexerFormatter()

    # File handler: rotate at midnight, keep 30 days of backups
    file_handler = logging.handlers.TimedRotatingFileHandler(
        filename=log_path / "indexer.log",
        when='midnight',
        interval=1,
        backupCount=30,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    indexer_logger.addHandler(file_handler)

    # Also log to the console, using a simple human-readable format
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | [%(name)s] | %(message)s'
    )
    console_handler.setFormatter(console_formatter)
    indexer_logger.addHandler(console_handler)

    # Do not propagate to the root logger (prevents duplicate output)
    indexer_logger.propagate = False

    return indexer_logger
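
# Typical startup usage (a minimal sketch; "logs" is the default directory):
#
#     logger = setup_indexer_logger(log_dir="logs")
#     logger.info("Indexer service started", extra={'operation': 'startup'})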


def get_indexer_logger() -> logging.Logger:
    """Return the indexer logger, configuring it with defaults if needed."""
    logger = logging.getLogger('indexer')
    if not logger.handlers:
        # Not configured yet: fall back to the default configuration
        setup_indexer_logger()
    return logger


def log_index_request(
    logger: logging.Logger,
    index_type: str,
    tenant_id: str,
    request_params: Dict[str, Any]
):
    """
    Log the start of an indexing request.

    Args:
        logger: Logger to write to.
        index_type: Index type ('bulk' or 'incremental').
        tenant_id: Tenant ID.
        request_params: Request parameters, merged into the record's extra
            fields. Keys must not collide with reserved LogRecord attributes
            (e.g. 'message', 'args'), or logging will raise a KeyError.
    """
    logger.info(
        f"Index request started: type={index_type}, tenant_id={tenant_id}",
        extra={
            'index_type': index_type,
            'tenant_id': tenant_id,
            'operation': 'request_start',
            **request_params
        }
    )
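
# Example (illustrative values; 'spu_count' and 'source' are hypothetical
# request parameters, not fields required by this module):
#
#     log_index_request(logger, 'incremental', 'tenant-001',
#                       {'spu_count': 25, 'source': 'mq'})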


def log_index_result(
    logger: logging.Logger,
    index_type: str,
    tenant_id: str,
    total_count: int,
    success_count: int,
    failed_count: int,
    elapsed_time: float,
    index_name: Optional[str] = None,
    errors: Optional[list] = None,
    deleted_count: Optional[int] = None,
    explicit_deleted_count: Optional[int] = None,
    auto_deleted_count: Optional[int] = None
):
    """
    Log the result of an indexing request.

    Args:
        logger: Logger to write to.
        index_type: Index type ('bulk' or 'incremental').
        tenant_id: Tenant ID.
        total_count: Total number of documents.
        success_count: Number of successful documents.
        failed_count: Number of failed documents.
        elapsed_time: Elapsed time in seconds.
        index_name: Index name (optional).
        errors: List of errors; only the first 10 are logged (optional).
        deleted_count: Total number of deletions (optional).
        explicit_deleted_count: Number of explicit deletions (optional).
        auto_deleted_count: Number of automatic deletions (optional).
    """
    message = (
        f"Index request completed: type={index_type}, tenant_id={tenant_id}, "
        f"total={total_count}, success={success_count}, failed={failed_count}"
    )
    
    if deleted_count is not None:
        message += f", deleted={deleted_count}"
        if explicit_deleted_count is not None:
            message += f" (explicit={explicit_deleted_count}, auto={auto_deleted_count or 0})"
    
    message += f", elapsed={elapsed_time:.2f}s"
    
    extra_data = {
        'index_type': index_type,
        'tenant_id': tenant_id,
        'operation': 'request_complete',
        'total_count': total_count,
        'success_count': success_count,
        'failed_count': failed_count,
        'elapsed_time': elapsed_time,
        'index_name': index_name,
        # Cap the error payload at the first 10 entries to keep log lines bounded
        'error': json.dumps(errors[:10], ensure_ascii=False) if errors else None
    }
    
    # Include deletion statistics when provided
    if deleted_count is not None:
        extra_data['deleted_count'] = deleted_count
    if explicit_deleted_count is not None:
        extra_data['explicit_deleted_count'] = explicit_deleted_count
    if auto_deleted_count is not None:
        extra_data['auto_deleted_count'] = auto_deleted_count
    
    logger.info(message, extra=extra_data)
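
# Example: an incremental run that also removed stale documents (illustrative
# values; deletion counters appear in the JSON line only when provided):
#
#     log_index_result(logger, 'incremental', 'tenant-001',
#                      total_count=20, success_count=20, failed_count=0,
#                      elapsed_time=0.42, index_name='products_tenant-001',
#                      deleted_count=3, explicit_deleted_count=2,
#                      auto_deleted_count=1)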


def log_spu_processing(
    logger: logging.Logger,
    tenant_id: str,
    spu_id: str,
    status: str,
    error: Optional[str] = None
):
    """
    Log the processing status of a single SPU.

    Args:
        logger: Logger to write to.
        tenant_id: Tenant ID.
        spu_id: SPU ID.
        status: Status ('fetching', 'transforming', 'indexing', 'success', 'failed').
        error: Error message, if any.
    """
    # Failed SPUs are logged at ERROR level, everything else at INFO
    level = logging.ERROR if status == 'failed' else logging.INFO
    logger.log(
        level,
        f"SPU processing: tenant_id={tenant_id}, spu_id={spu_id}, status={status}",
        extra={
            'tenant_id': tenant_id,
            'spu_id': spu_id,
            'operation': 'spu_processing',
            'status': status,
            'error': error
        }
    )


def log_bulk_index_batch(
    logger: logging.Logger,
    tenant_id: str,
    batch_num: int,
    total_batches: int,
    batch_size: int,
    success: int,
    failed: int
):
    """
    Log progress for one batch of a bulk indexing run.

    Args:
        logger: Logger to write to.
        tenant_id: Tenant ID.
        batch_num: Current batch number.
        total_batches: Total number of batches.
        batch_size: Batch size.
        success: Number of successful documents.
        failed: Number of failed documents.
    """
    logger.info(
        f"Bulk index batch: tenant_id={tenant_id}, batch={batch_num}/{total_batches}, "
        f"size={batch_size}, success={success}, failed={failed}",
        extra={
            'tenant_id': tenant_id,
            'operation': 'bulk_batch',
            'batch_num': batch_num,
            'total_batches': total_batches,
            'batch_size': batch_size,
            'success_count': success,
            'failed_count': failed
        }
    )
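

if __name__ == "__main__":
    # Minimal smoke test (a sketch: the tenant/SPU identifiers, counts, and
    # index name below are made-up sample values, not real data).
    demo_logger = setup_indexer_logger()
    log_index_request(demo_logger, 'bulk', 'tenant-demo', {'batch_size': 500})
    log_spu_processing(demo_logger, 'tenant-demo', 'spu-001', 'success')
    log_bulk_index_batch(demo_logger, 'tenant-demo', 1, 1, 500, 499, 1)
    log_index_result(
        demo_logger, 'bulk', 'tenant-demo',
        total_count=500, success_count=499, failed_count=1,
        elapsed_time=1.23, index_name='products_demo',
        errors=[{'spu_id': 'spu-042', 'reason': 'mapping error'}],
    )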