""" 索引服务专用日志配置。 提供独立的索引日志文件(indexer.log),用于记录全量和增量索引操作的关键信息。 参考电商搜索引擎最佳实践,记录请求、处理过程、ES写入结果等关键信息。 """ import logging import logging.handlers import json from pathlib import Path from datetime import datetime from typing import Dict, Any, Optional class IndexerFormatter(logging.Formatter): """索引服务专用日志格式化器,输出结构化JSON格式""" def format(self, record: logging.LogRecord) -> str: """格式化日志记录为JSON格式""" log_data = { "timestamp": datetime.fromtimestamp(record.created).isoformat(), "level": record.levelname, "logger": record.name, "message": record.getMessage(), } # 添加额外字段 if hasattr(record, 'tenant_id'): log_data['tenant_id'] = record.tenant_id if hasattr(record, 'spu_id'): log_data['spu_id'] = record.spu_id if hasattr(record, 'operation'): log_data['operation'] = record.operation if hasattr(record, 'index_type'): log_data['index_type'] = record.index_type # 'bulk' or 'incremental' if hasattr(record, 'total_count'): log_data['total_count'] = record.total_count if hasattr(record, 'success_count'): log_data['success_count'] = record.success_count if hasattr(record, 'failed_count'): log_data['failed_count'] = record.failed_count if hasattr(record, 'elapsed_time'): log_data['elapsed_time'] = record.elapsed_time if hasattr(record, 'error'): log_data['error'] = record.error if hasattr(record, 'index_name'): log_data['index_name'] = record.index_name # 添加异常信息 if record.exc_info: log_data['exception'] = self.formatException(record.exc_info) return json.dumps(log_data, ensure_ascii=False) def setup_indexer_logger(log_dir: str = "logs") -> logging.Logger: """ 设置索引服务专用日志器 Args: log_dir: 日志目录 Returns: 配置好的日志器实例 """ # 创建日志目录 log_path = Path(log_dir) log_path.mkdir(parents=True, exist_ok=True) # 创建索引服务专用日志器 indexer_logger = logging.getLogger('indexer') indexer_logger.setLevel(logging.INFO) # 避免重复添加handler if indexer_logger.handlers: return indexer_logger # 创建格式化器 formatter = IndexerFormatter() # 文件handler - 每天轮转,保留30天 file_handler = logging.handlers.TimedRotatingFileHandler( filename=log_path / "indexer.log", when='midnight', interval=1, backupCount=30, encoding='utf-8' ) file_handler.setLevel(logging.INFO) file_handler.setFormatter(formatter) indexer_logger.addHandler(file_handler) # 也输出到控制台(使用简单格式) console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_formatter = logging.Formatter( '%(asctime)s | %(levelname)-8s | [%(name)s] | %(message)s' ) console_handler.setFormatter(console_formatter) indexer_logger.addHandler(console_handler) # 防止传播到根日志器(避免重复) indexer_logger.propagate = False return indexer_logger def get_indexer_logger() -> logging.Logger: """获取索引服务日志器""" logger = logging.getLogger('indexer') if not logger.handlers: # 如果还没有配置,使用默认配置 setup_indexer_logger() return logger def log_index_request( logger: logging.Logger, index_type: str, tenant_id: str, request_params: Dict[str, Any] ): """ 记录索引请求开始 Args: logger: 日志器 index_type: 索引类型 ('bulk' 或 'incremental') tenant_id: 租户ID request_params: 请求参数 """ logger.info( f"Index request started: type={index_type}, tenant_id={tenant_id}", extra={ 'index_type': index_type, 'tenant_id': tenant_id, 'operation': 'request_start', **request_params } ) def log_index_result( logger: logging.Logger, index_type: str, tenant_id: str, total_count: int, success_count: int, failed_count: int, elapsed_time: float, index_name: Optional[str] = None, errors: Optional[list] = None, deleted_count: Optional[int] = None, explicit_deleted_count: Optional[int] = None, auto_deleted_count: Optional[int] = None ): """ 记录索引结果 Args: logger: 日志器 index_type: 索引类型 
tenant_id: 租户ID total_count: 总数 success_count: 成功数 failed_count: 失败数 elapsed_time: 耗时(秒) index_name: 索引名称 errors: 错误列表 deleted_count: 删除总数(可选) explicit_deleted_count: 显式删除数(可选) auto_deleted_count: 自动删除数(可选) """ message = ( f"Index request completed: type={index_type}, tenant_id={tenant_id}, " f"total={total_count}, success={success_count}, failed={failed_count}" ) if deleted_count is not None: message += f", deleted={deleted_count}" if explicit_deleted_count is not None: message += f" (explicit={explicit_deleted_count}, auto={auto_deleted_count or 0})" message += f", elapsed={elapsed_time:.2f}s" extra_data = { 'index_type': index_type, 'tenant_id': tenant_id, 'operation': 'request_complete', 'total_count': total_count, 'success_count': success_count, 'failed_count': failed_count, 'elapsed_time': elapsed_time, 'index_name': index_name, 'error': json.dumps(errors[:10], ensure_ascii=False) if errors else None } # 添加删除统计信息(如果提供) if deleted_count is not None: extra_data['deleted_count'] = deleted_count if explicit_deleted_count is not None: extra_data['explicit_deleted_count'] = explicit_deleted_count if auto_deleted_count is not None: extra_data['auto_deleted_count'] = auto_deleted_count logger.info(message, extra=extra_data) def log_spu_processing( logger: logging.Logger, tenant_id: str, spu_id: str, status: str, error: Optional[str] = None ): """ 记录单个SPU的处理状态 Args: logger: 日志器 tenant_id: 租户ID spu_id: SPU ID status: 状态 ('fetching', 'transforming', 'indexing', 'success', 'failed') error: 错误信息(如果有) """ level = logging.ERROR if status == 'failed' else logging.INFO logger.log( level, f"SPU processing: tenant_id={tenant_id}, spu_id={spu_id}, status={status}", extra={ 'tenant_id': tenant_id, 'spu_id': spu_id, 'operation': 'spu_processing', 'status': status, 'error': error } ) def log_bulk_index_batch( logger: logging.Logger, tenant_id: str, batch_num: int, total_batches: int, batch_size: int, success: int, failed: int ): """ 记录批量索引批次信息 Args: logger: 日志器 tenant_id: 租户ID batch_num: 批次号 total_batches: 总批次数 batch_size: 批次大小 success: 成功数 failed: 失败数 """ logger.info( f"Bulk index batch: tenant_id={tenant_id}, batch={batch_num}/{total_batches}, " f"size={batch_size}, success={success}, failed={failed}", extra={ 'tenant_id': tenant_id, 'operation': 'bulk_batch', 'batch_num': batch_num, 'total_batches': total_batches, 'batch_size': batch_size, 'success_count': success, 'failed_count': failed } )
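

# -----------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal end-to-end flow showing how the
# helpers above are wired together. The tenant ID, SPU ID, and index name below
# are hypothetical example values, not part of the indexer service itself.
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    import time

    # Configure the logger once at startup (writes to logs/indexer.log + console).
    demo_logger = setup_indexer_logger(log_dir="logs")

    # Record the start of a (hypothetical) bulk index request.
    log_index_request(
        demo_logger,
        index_type="bulk",
        tenant_id="tenant-demo",
        request_params={"index_name": "spu_index_demo"},
    )

    start = time.monotonic()
    # ... fetch, transform, and write documents to ES here ...
    log_spu_processing(demo_logger, tenant_id="tenant-demo", spu_id="SPU-001", status="success")

    # Record the final result; the optional deletion counters are omitted here.
    log_index_result(
        demo_logger,
        index_type="bulk",
        tenant_id="tenant-demo",
        total_count=1,
        success_count=1,
        failed_count=0,
        elapsed_time=time.monotonic() - start,
        index_name="spu_index_demo",
    )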