From 4650fceca00a9450a5f0de86221d105fa3d8c90e Mon Sep 17 00:00:00 2001 From: tangwang Date: Mon, 23 Mar 2026 23:45:04 +0800 Subject: [PATCH] 日志优化、日志串联(uid rqid) --- api/app.py | 41 +++++++++++++++++++++++++++++++++-------- api/routes/search.py | 4 ++++ context/request_context.py | 8 ++++++++ embeddings/README.md | 2 ++ embeddings/config.py | 6 ++++++ embeddings/image_encoder.py | 32 +++++++++++++++++++++++++++++++- embeddings/server.py | 384 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- embeddings/text_embedding_tei.py | 38 +++++++++++++++++++++++++++----------- embeddings/text_encoder.py | 26 ++++++++++++++++++++++++-- query/query_parser.py | 7 ++++++- request_log_context.py | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tests/test_embedding_pipeline.py | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ utils/logger.py | 25 +++---------------------- 13 files changed, 520 insertions(+), 220 deletions(-) create mode 100644 request_log_context.py diff --git a/api/app.py b/api/app.py index b95b053..3266f48 100644 --- a/api/app.py +++ b/api/app.py @@ -27,6 +27,8 @@ from slowapi.errors import RateLimitExceeded # Configure backend logging import pathlib +from request_log_context import LOG_LINE_FORMAT, RequestLogContextFilter + def configure_backend_logging() -> None: log_dir = pathlib.Path("logs") @@ -34,9 +36,8 @@ def configure_backend_logging() -> None: log_level = os.getenv("LOG_LEVEL", "INFO").upper() numeric_level = getattr(logging, log_level, logging.INFO) - default_formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - ) + default_formatter = logging.Formatter(LOG_LINE_FORMAT) + request_filter = RequestLogContextFilter() root_logger = logging.getLogger() root_logger.setLevel(numeric_level) @@ -45,6 +46,7 @@ def configure_backend_logging() -> None: console_handler = logging.StreamHandler() console_handler.setLevel(numeric_level) console_handler.setFormatter(default_formatter) + console_handler.addFilter(request_filter) root_logger.addHandler(console_handler) backend_handler = TimedRotatingFileHandler( @@ -56,6 +58,7 @@ def configure_backend_logging() -> None: ) backend_handler.setLevel(numeric_level) backend_handler.setFormatter(default_formatter) + backend_handler.addFilter(request_filter) root_logger.addHandler(backend_handler) verbose_logger = logging.getLogger("backend.verbose") @@ -71,11 +74,16 @@ def configure_backend_logging() -> None: encoding="utf-8", ) verbose_handler.setLevel(numeric_level) - verbose_handler.setFormatter( - logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - ) + verbose_handler.setFormatter(logging.Formatter(LOG_LINE_FORMAT)) + verbose_handler.addFilter(request_filter) verbose_logger.addHandler(verbose_handler) + for logger_name in ("uvicorn", "uvicorn.error", "uvicorn.access"): + uvicorn_logger = logging.getLogger(logger_name) + uvicorn_logger.handlers.clear() + uvicorn_logger.setLevel(numeric_level) + uvicorn_logger.propagate = True + configure_backend_logging() logger = logging.getLogger(__name__) @@ -101,6 +109,16 @@ _suggestion_service: Optional[SuggestionService] = None _app_config = None +def _request_log_extra_from_http(request: Request) -> dict: + reqid = getattr(getattr(request, "state", None), "reqid", None) or request.headers.get("X-Request-ID") + uid = ( + getattr(getattr(request, "state", None), "uid", None) + or request.headers.get("X-User-ID") + or request.headers.get("User-ID") + ) + return {"reqid": reqid or "-1", "uid": uid or "-1"} + + def init_service(es_host: str = "http://localhost:9200"): """ Initialize search service with unified configuration. @@ -261,7 +279,11 @@ async def shutdown_event(): async def global_exception_handler(request: Request, exc: Exception): """Global exception handler with detailed logging.""" client_ip = request.client.host if request.client else "unknown" - logger.error(f"Unhandled exception from {client_ip}: {exc}", exc_info=True) + logger.error( + f"Unhandled exception from {client_ip}: {exc}", + exc_info=True, + extra=_request_log_extra_from_http(request), + ) return JSONResponse( status_code=500, @@ -276,7 +298,10 @@ async def global_exception_handler(request: Request, exc: Exception): @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): """HTTP exception handler.""" - logger.warning(f"HTTP exception from {request.client.host if request.client else 'unknown'}: {exc.status_code} - {exc.detail}") + logger.warning( + f"HTTP exception from {request.client.host if request.client else 'unknown'}: {exc.status_code} - {exc.detail}", + extra=_request_log_extra_from_http(request), + ) return JSONResponse( status_code=exc.status_code, diff --git a/api/routes/search.py b/api/routes/search.py index e9bef63..4c2a1a4 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -59,6 +59,8 @@ async def search(request: SearchRequest, http_request: Request): Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id). """ reqid, uid = extract_request_info(http_request) + http_request.state.reqid = reqid + http_request.state.uid = uid # Extract tenant_id (required) tenant_id = http_request.headers.get('X-Tenant-ID') @@ -213,6 +215,8 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request): Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id). """ reqid, uid = extract_request_info(http_request) + http_request.state.reqid = reqid + http_request.state.uid = uid # Extract tenant_id (required) tenant_id = http_request.headers.get('X-Tenant-ID') diff --git a/context/request_context.py b/context/request_context.py index e3837a7..ead99e7 100644 --- a/context/request_context.py +++ b/context/request_context.py @@ -12,6 +12,8 @@ from typing import Dict, Any, Optional, List from dataclasses import dataclass, field import uuid +from request_log_context import bind_request_log_context, reset_request_log_context + class RequestContextStage(Enum): """搜索阶段枚举""" @@ -375,9 +377,15 @@ def get_current_request_context() -> Optional[RequestContext]: def set_current_request_context(context: RequestContext) -> None: """设置当前线程的请求上下文""" threading.current_thread().request_context = context + _, _, tokens = bind_request_log_context(context.reqid, context.uid) + threading.current_thread().request_log_tokens = tokens def clear_current_request_context() -> None: """清除当前线程的请求上下文""" + tokens = getattr(threading.current_thread(), 'request_log_tokens', None) + if tokens is not None: + reset_request_log_context(tokens) + delattr(threading.current_thread(), 'request_log_tokens') if hasattr(threading.current_thread(), 'request_context'): delattr(threading.current_thread(), 'request_context') \ No newline at end of file diff --git a/embeddings/README.md b/embeddings/README.md index 66e9185..c150750 100644 --- a/embeddings/README.md +++ b/embeddings/README.md @@ -5,6 +5,8 @@ - `../docs/TEI_SERVICE说明文档.md` - `../docs/CNCLIP_SERVICE说明文档.md` +**请求日志串联(reqid / uid)**:统一实现在仓库根目录的 `request_log_context.py`(勿放到 `utils/` 下,以免 `.venv-embedding` 因 `utils/__init__.py` 拉取数据库依赖)。Uvicorn 日志配置见 `config/uvicorn_embedding_logging.json`。 + --- 这个目录是一个完整的“向量化模块”,包含: diff --git a/embeddings/config.py b/embeddings/config.py index 2591b90..6fc6294 100644 --- a/embeddings/config.py +++ b/embeddings/config.py @@ -2,6 +2,7 @@ from __future__ import annotations +import os from typing import Optional from config.loader import get_app_config @@ -25,6 +26,11 @@ class EmbeddingConfig(object): self.TEXT_NORMALIZE_EMBEDDINGS = bool(text_backend.get("normalize_embeddings", True)) self.TEI_BASE_URL = str(text_backend.get("base_url") or "http://127.0.0.1:8080") self.TEI_TIMEOUT_SEC = int(text_backend.get("timeout_sec", 60)) + self.TEI_MAX_CLIENT_BATCH_SIZE = int( + os.getenv("TEI_MAX_CLIENT_BATCH_SIZE") + or text_backend.get("max_client_batch_size") + or 24 + ) self.USE_CLIP_AS_SERVICE = services.image_backend == "clip_as_service" self.CLIP_AS_SERVICE_SERVER = str(image_backend.get("server") or "grpc://127.0.0.1:51000") diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py index 861bfbc..bd754f2 100644 --- a/embeddings/image_encoder.py +++ b/embeddings/image_encoder.py @@ -13,6 +13,7 @@ from config.loader import get_app_config from config.services_config import get_embedding_image_base_url from embeddings.cache_keys import build_image_cache_key from embeddings.redis_embedding_cache import RedisEmbeddingCache +from request_log_context import build_downstream_request_headers, build_request_log_extra class CLIPImageEncoder: @@ -40,6 +41,8 @@ class CLIPImageEncoder: request_data: List[str], normalize_embeddings: bool = True, priority: int = 0, + request_id: Optional[str] = None, + user_id: Optional[str] = None, ) -> List[Any]: """ Call the embedding service API. @@ -50,6 +53,7 @@ class CLIPImageEncoder: Returns: List of embeddings (list[float]) or nulls (None), aligned to input order """ + response = None try: response = requests.post( self.endpoint, @@ -58,12 +62,26 @@ class CLIPImageEncoder: "priority": max(0, int(priority)), }, json=request_data, + headers=build_downstream_request_headers(request_id=request_id, user_id=user_id), timeout=60 ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: - logger.error(f"CLIPImageEncoder service request failed: {e}", exc_info=True) + body_preview = "" + if response is not None: + try: + body_preview = (response.text or "")[:300] + except Exception: + body_preview = "" + logger.error( + "CLIPImageEncoder service request failed | status=%s body=%s error=%s", + getattr(response, "status_code", "n/a"), + body_preview, + e, + exc_info=True, + extra=build_request_log_extra(request_id=request_id, user_id=user_id), + ) raise def encode_image(self, image: Image.Image) -> np.ndarray: @@ -79,6 +97,8 @@ class CLIPImageEncoder: url: str, normalize_embeddings: bool = True, priority: int = 0, + request_id: Optional[str] = None, + user_id: Optional[str] = None, ) -> np.ndarray: """ Generate image embedding via network service using URL. @@ -98,6 +118,8 @@ class CLIPImageEncoder: [url], normalize_embeddings=normalize_embeddings, priority=priority, + request_id=request_id, + user_id=user_id, ) if not response_data or len(response_data) != 1 or response_data[0] is None: raise RuntimeError(f"No image embedding returned for URL: {url}") @@ -113,6 +135,8 @@ class CLIPImageEncoder: batch_size: int = 8, normalize_embeddings: bool = True, priority: int = 0, + request_id: Optional[str] = None, + user_id: Optional[str] = None, ) -> List[np.ndarray]: """ Encode a batch of images efficiently via network service. @@ -151,6 +175,8 @@ class CLIPImageEncoder: batch_urls, normalize_embeddings=normalize_embeddings, priority=priority, + request_id=request_id, + user_id=user_id, ) if not response_data or len(response_data) != len(batch_urls): raise RuntimeError( @@ -176,6 +202,8 @@ class CLIPImageEncoder: batch_size: Optional[int] = None, normalize_embeddings: bool = True, priority: int = 0, + request_id: Optional[str] = None, + user_id: Optional[str] = None, ) -> List[np.ndarray]: """ 与 ClipImageModel / ClipAsServiceImageEncoder 一致的接口,供索引器 document_transformer 调用。 @@ -192,4 +220,6 @@ class CLIPImageEncoder: batch_size=batch_size or 8, normalize_embeddings=normalize_embeddings, priority=priority, + request_id=request_id, + user_id=user_id, ) diff --git a/embeddings/server.py b/embeddings/server.py index c03c939..9e4d294 100644 --- a/embeddings/server.py +++ b/embeddings/server.py @@ -26,17 +26,17 @@ from embeddings.cache_keys import build_image_cache_key, build_text_cache_key from embeddings.config import CONFIG from embeddings.protocols import ImageEncoderProtocol from embeddings.redis_embedding_cache import RedisEmbeddingCache +from request_log_context import ( + LOG_LINE_FORMAT, + RequestLogContextFilter, + bind_request_log_context, + build_request_log_extra, + reset_request_log_context, +) app = FastAPI(title="saas-search Embedding Service", version="1.0.0") -class _DefaultRequestIdFilter(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: - if not hasattr(record, "reqid"): - record.reqid = "-1" - return True - - def configure_embedding_logging() -> None: root_logger = logging.getLogger() if getattr(root_logger, "_embedding_logging_configured", False): @@ -47,17 +47,15 @@ def configure_embedding_logging() -> None: log_level = os.getenv("LOG_LEVEL", "INFO").upper() numeric_level = getattr(logging, log_level, logging.INFO) - formatter = logging.Formatter( - "%(asctime)s | reqid:%(reqid)s | %(name)s | %(levelname)s | %(message)s" - ) - request_filter = _DefaultRequestIdFilter() + formatter = logging.Formatter(LOG_LINE_FORMAT) + context_filter = RequestLogContextFilter() root_logger.setLevel(numeric_level) root_logger.handlers.clear() stream_handler = logging.StreamHandler() stream_handler.setLevel(numeric_level) stream_handler.setFormatter(formatter) - stream_handler.addFilter(request_filter) + stream_handler.addFilter(context_filter) root_logger.addHandler(stream_handler) verbose_logger = logging.getLogger("embedding.verbose") @@ -231,6 +229,7 @@ class _TextDispatchTask: normalized: List[str] effective_normalize: bool request_id: str + user_id: str priority: int created_at: float done: threading.Event @@ -321,12 +320,13 @@ def _text_dispatch_worker_loop(worker_idx: int) -> None: _priority_label(task.priority), len(task.normalized), queue_wait_ms, - extra=_request_log_extra(task.request_id), + extra=build_request_log_extra(task.request_id, task.user_id), ) task.result = _embed_text_impl( task.normalized, task.effective_normalize, task.request_id, + task.user_id, task.priority, ) except Exception as exc: @@ -339,6 +339,7 @@ def _submit_text_dispatch_and_wait( normalized: List[str], effective_normalize: bool, request_id: str, + user_id: str, priority: int, ) -> _EmbedResult: if not any(worker.is_alive() for worker in _text_dispatch_workers): @@ -347,6 +348,7 @@ def _submit_text_dispatch_and_wait( normalized=normalized, effective_normalize=effective_normalize, request_id=request_id, + user_id=user_id, priority=_effective_priority(priority), created_at=time.perf_counter(), done=threading.Event(), @@ -380,6 +382,7 @@ class _SingleTextTask: priority: int created_at: float request_id: str + user_id: str done: threading.Event result: Optional[List[float]] = None error: Optional[Exception] = None @@ -435,10 +438,6 @@ def _preview_vector(vec: Optional[List[float]], max_dims: int = _VECTOR_PREVIEW_ return [round(float(v), 6) for v in vec[:max_dims]] -def _request_log_extra(request_id: str) -> Dict[str, str]: - return {"reqid": request_id} - - def _resolve_request_id(http_request: Request) -> str: header_value = http_request.headers.get("X-Request-ID") if header_value and header_value.strip(): @@ -446,6 +445,13 @@ def _resolve_request_id(http_request: Request) -> str: return str(uuid.uuid4())[:8] +def _resolve_user_id(http_request: Request) -> str: + header_value = http_request.headers.get("X-User-ID") or http_request.headers.get("User-ID") + if header_value and header_value.strip(): + return header_value.strip()[:64] + return "-1" + + def _request_client(http_request: Request) -> str: client = getattr(http_request, "client", None) host = getattr(client, "host", None) @@ -522,18 +528,21 @@ def _text_batch_worker_loop() -> None: try: queue_wait_ms = [(time.perf_counter() - task.created_at) * 1000.0 for task in batch] reqids = [task.request_id for task in batch] + uids = [task.user_id for task in batch] logger.info( - "text microbatch dispatch | size=%d priority=%s queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s preview=%s", + "text microbatch dispatch | size=%d priority=%s queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s uids=%s preview=%s", len(batch), _priority_label(max(task.priority for task in batch)), min(queue_wait_ms) if queue_wait_ms else 0.0, max(queue_wait_ms) if queue_wait_ms else 0.0, reqids, + uids, _preview_inputs( [task.text for task in batch], _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS, ), + extra=build_request_log_extra(), ) batch_t0 = time.perf_counter() embs = _encode_local_st([task.text for task in batch], normalize_embeddings=False) @@ -548,19 +557,23 @@ def _text_batch_worker_loop() -> None: raise RuntimeError("Text model returned empty embedding in micro-batch") task.result = vec logger.info( - "text microbatch done | size=%d reqids=%s dim=%d backend_elapsed_ms=%.2f", + "text microbatch done | size=%d reqids=%s uids=%s dim=%d backend_elapsed_ms=%.2f", len(batch), reqids, + uids, len(batch[0].result) if batch and batch[0].result is not None else 0, (time.perf_counter() - batch_t0) * 1000.0, + extra=build_request_log_extra(), ) except Exception as exc: logger.error( - "text microbatch failed | size=%d reqids=%s error=%s", + "text microbatch failed | size=%d reqids=%s uids=%s error=%s", len(batch), [task.request_id for task in batch], + [task.user_id for task in batch], exc, exc_info=True, + extra=build_request_log_extra(), ) for task in batch: task.error = exc @@ -573,6 +586,7 @@ def _encode_single_text_with_microbatch( text: str, normalize: bool, request_id: str, + user_id: str, priority: int, ) -> List[float]: task = _SingleTextTask( @@ -581,6 +595,7 @@ def _encode_single_text_with_microbatch( priority=_effective_priority(priority), created_at=time.perf_counter(), request_id=request_id, + user_id=user_id, done=threading.Event(), ) with _text_single_queue_cv: @@ -632,6 +647,9 @@ def load_models(): _text_model = TEITextModel( base_url=str(base_url), timeout_sec=timeout_sec, + max_client_batch_size=int( + backend_cfg.get("max_client_batch_size") or CONFIG.TEI_MAX_CLIENT_BATCH_SIZE + ), ) elif backend_name == "local_st": from embeddings.text_embedding_sentence_transformers import Qwen3TextModel @@ -823,6 +841,7 @@ def _embed_text_impl( normalized: List[str], effective_normalize: bool, request_id: str, + user_id: str, priority: int = 0, ) -> _EmbedResult: if _text_model is None: @@ -854,7 +873,7 @@ def _embed_text_impl( effective_normalize, len(out[0]) if out and out[0] is not None else 0, cache_hits, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) return _EmbedResult( vectors=out, @@ -873,6 +892,7 @@ def _embed_text_impl( missing_texts[0], normalize=effective_normalize, request_id=request_id, + user_id=user_id, priority=priority, ) ] @@ -905,7 +925,7 @@ def _embed_text_impl( "Text embedding backend failure: %s", e, exc_info=True, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) raise RuntimeError(f"Text embedding backend failure: {e}") from e @@ -931,7 +951,7 @@ def _embed_text_impl( cache_hits, len(missing_texts), backend_elapsed_ms, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) return _EmbedResult( vectors=out, @@ -954,75 +974,79 @@ async def embed_text( raise HTTPException(status_code=503, detail="Text embedding model not loaded in this service") request_id = _resolve_request_id(http_request) + user_id = _resolve_user_id(http_request) + _, _, log_tokens = bind_request_log_context(request_id, user_id) response.headers["X-Request-ID"] = request_id - - if priority < 0: - raise HTTPException(status_code=400, detail="priority must be >= 0") - effective_priority = _effective_priority(priority) - effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) - normalized: List[str] = [] - for i, t in enumerate(texts): - if not isinstance(t, str): - raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: must be string") - s = t.strip() - if not s: - raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string") - normalized.append(s) - - cache_check_started = time.perf_counter() - cache_only = _try_full_text_cache_hit(normalized, effective_normalize) - if cache_only is not None: - latency_ms = (time.perf_counter() - cache_check_started) * 1000.0 - _text_stats.record_completed( - success=True, - latency_ms=latency_ms, - backend_latency_ms=0.0, - cache_hits=cache_only.cache_hits, - cache_misses=0, - ) - logger.info( - "embed_text response | backend=%s mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", - _text_backend_name, - _priority_label(effective_priority), - len(normalized), - effective_normalize, - len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0, - cache_only.cache_hits, - _preview_vector(cache_only.vectors[0] if cache_only.vectors else None), - latency_ms, - extra=_request_log_extra(request_id), - ) - return cache_only.vectors - - accepted, active = _text_request_limiter.try_acquire(bypass_limit=effective_priority > 0) - if not accepted: - _text_stats.record_rejected() - logger.warning( - "embed_text rejected | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", - _request_client(http_request), - _text_backend_name, - _priority_label(effective_priority), - len(normalized), - effective_normalize, - active, - _TEXT_MAX_INFLIGHT, - _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS), - extra=_request_log_extra(request_id), - ) - raise HTTPException( - status_code=_OVERLOAD_STATUS_CODE, - detail=( - "Text embedding service busy for priority=0 requests: " - f"active={active}, limit={_TEXT_MAX_INFLIGHT}" - ), - ) - + response.headers["X-User-ID"] = user_id request_started = time.perf_counter() success = False backend_elapsed_ms = 0.0 cache_hits = 0 cache_misses = 0 + limiter_acquired = False + try: + if priority < 0: + raise HTTPException(status_code=400, detail="priority must be >= 0") + effective_priority = _effective_priority(priority) + effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) + normalized: List[str] = [] + for i, t in enumerate(texts): + if not isinstance(t, str): + raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: must be string") + s = t.strip() + if not s: + raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string") + normalized.append(s) + + cache_check_started = time.perf_counter() + cache_only = _try_full_text_cache_hit(normalized, effective_normalize) + if cache_only is not None: + latency_ms = (time.perf_counter() - cache_check_started) * 1000.0 + _text_stats.record_completed( + success=True, + latency_ms=latency_ms, + backend_latency_ms=0.0, + cache_hits=cache_only.cache_hits, + cache_misses=0, + ) + logger.info( + "embed_text response | backend=%s mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", + _text_backend_name, + _priority_label(effective_priority), + len(normalized), + effective_normalize, + len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0, + cache_only.cache_hits, + _preview_vector(cache_only.vectors[0] if cache_only.vectors else None), + latency_ms, + extra=build_request_log_extra(request_id, user_id), + ) + return cache_only.vectors + + accepted, active = _text_request_limiter.try_acquire(bypass_limit=effective_priority > 0) + if not accepted: + _text_stats.record_rejected() + logger.warning( + "embed_text rejected | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + _request_client(http_request), + _text_backend_name, + _priority_label(effective_priority), + len(normalized), + effective_normalize, + active, + _TEXT_MAX_INFLIGHT, + _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS), + extra=build_request_log_extra(request_id, user_id), + ) + raise HTTPException( + status_code=_OVERLOAD_STATUS_CODE, + detail=( + "Text embedding service busy for priority=0 requests: " + f"active={active}, limit={_TEXT_MAX_INFLIGHT}" + ), + ) + limiter_acquired = True logger.info( "embed_text request | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", _request_client(http_request), @@ -1033,7 +1057,7 @@ async def embed_text( active, _TEXT_MAX_INFLIGHT, _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS), - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) verbose_logger.info( "embed_text detail | payload=%s normalize=%s backend=%s priority=%s", @@ -1041,13 +1065,14 @@ async def embed_text( effective_normalize, _text_backend_name, _priority_label(effective_priority), - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) result = await run_in_threadpool( _submit_text_dispatch_and_wait, normalized, effective_normalize, request_id, + user_id, effective_priority, ) success = True @@ -1074,7 +1099,7 @@ async def embed_text( cache_misses, _preview_vector(result.vectors[0] if result.vectors else None), latency_ms, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) verbose_logger.info( "embed_text result detail | count=%d priority=%s first_vector=%s latency_ms=%.2f", @@ -1084,7 +1109,7 @@ async def embed_text( if result.vectors and result.vectors[0] is not None else [], latency_ms, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) return result.vectors except HTTPException: @@ -1107,24 +1132,27 @@ async def embed_text( latency_ms, e, exc_info=True, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) raise HTTPException(status_code=502, detail=str(e)) from e finally: - remaining = _text_request_limiter.release(success=success) - logger.info( - "embed_text finalize | success=%s priority=%s active_after=%d", - success, - _priority_label(effective_priority), - remaining, - extra=_request_log_extra(request_id), - ) + if limiter_acquired: + remaining = _text_request_limiter.release(success=success) + logger.info( + "embed_text finalize | success=%s priority=%s active_after=%d", + success, + _priority_label(effective_priority), + remaining, + extra=build_request_log_extra(request_id, user_id), + ) + reset_request_log_context(log_tokens) def _embed_image_impl( urls: List[str], effective_normalize: bool, request_id: str, + user_id: str, ) -> _EmbedResult: if _image_model is None: raise RuntimeError("Image model not loaded") @@ -1154,7 +1182,7 @@ def _embed_image_impl( effective_normalize, len(out[0]) if out and out[0] is not None else 0, cache_hits, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) return _EmbedResult( vectors=out, @@ -1194,7 +1222,7 @@ def _embed_image_impl( cache_hits, len(missing_urls), backend_elapsed_ms, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) return _EmbedResult( vectors=out, @@ -1217,74 +1245,78 @@ async def embed_image( raise HTTPException(status_code=503, detail="Image embedding model not loaded in this service") request_id = _resolve_request_id(http_request) + user_id = _resolve_user_id(http_request) + _, _, log_tokens = bind_request_log_context(request_id, user_id) response.headers["X-Request-ID"] = request_id - - if priority < 0: - raise HTTPException(status_code=400, detail="priority must be >= 0") - effective_priority = _effective_priority(priority) - - effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) - urls: List[str] = [] - for i, url_or_path in enumerate(images): - if not isinstance(url_or_path, str): - raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path") - s = url_or_path.strip() - if not s: - raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path") - urls.append(s) - - cache_check_started = time.perf_counter() - cache_only = _try_full_image_cache_hit(urls, effective_normalize) - if cache_only is not None: - latency_ms = (time.perf_counter() - cache_check_started) * 1000.0 - _image_stats.record_completed( - success=True, - latency_ms=latency_ms, - backend_latency_ms=0.0, - cache_hits=cache_only.cache_hits, - cache_misses=0, - ) - logger.info( - "embed_image response | mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", - _priority_label(effective_priority), - len(urls), - effective_normalize, - len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0, - cache_only.cache_hits, - _preview_vector(cache_only.vectors[0] if cache_only.vectors else None), - latency_ms, - extra=_request_log_extra(request_id), - ) - return cache_only.vectors - - accepted, active = _image_request_limiter.try_acquire(bypass_limit=effective_priority > 0) - if not accepted: - _image_stats.record_rejected() - logger.warning( - "embed_image rejected | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", - _request_client(http_request), - _priority_label(effective_priority), - len(urls), - effective_normalize, - active, - _IMAGE_MAX_INFLIGHT, - _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS), - extra=_request_log_extra(request_id), - ) - raise HTTPException( - status_code=_OVERLOAD_STATUS_CODE, - detail=( - "Image embedding service busy for priority=0 requests: " - f"active={active}, limit={_IMAGE_MAX_INFLIGHT}" - ), - ) - + response.headers["X-User-ID"] = user_id request_started = time.perf_counter() success = False backend_elapsed_ms = 0.0 cache_hits = 0 cache_misses = 0 + limiter_acquired = False + try: + if priority < 0: + raise HTTPException(status_code=400, detail="priority must be >= 0") + effective_priority = _effective_priority(priority) + + effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize) + urls: List[str] = [] + for i, url_or_path in enumerate(images): + if not isinstance(url_or_path, str): + raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path") + s = url_or_path.strip() + if not s: + raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path") + urls.append(s) + + cache_check_started = time.perf_counter() + cache_only = _try_full_image_cache_hit(urls, effective_normalize) + if cache_only is not None: + latency_ms = (time.perf_counter() - cache_check_started) * 1000.0 + _image_stats.record_completed( + success=True, + latency_ms=latency_ms, + backend_latency_ms=0.0, + cache_hits=cache_only.cache_hits, + cache_misses=0, + ) + logger.info( + "embed_image response | mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f", + _priority_label(effective_priority), + len(urls), + effective_normalize, + len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0, + cache_only.cache_hits, + _preview_vector(cache_only.vectors[0] if cache_only.vectors else None), + latency_ms, + extra=build_request_log_extra(request_id, user_id), + ) + return cache_only.vectors + + accepted, active = _image_request_limiter.try_acquire(bypass_limit=effective_priority > 0) + if not accepted: + _image_stats.record_rejected() + logger.warning( + "embed_image rejected | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", + _request_client(http_request), + _priority_label(effective_priority), + len(urls), + effective_normalize, + active, + _IMAGE_MAX_INFLIGHT, + _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS), + extra=build_request_log_extra(request_id, user_id), + ) + raise HTTPException( + status_code=_OVERLOAD_STATUS_CODE, + detail=( + "Image embedding service busy for priority=0 requests: " + f"active={active}, limit={_IMAGE_MAX_INFLIGHT}" + ), + ) + limiter_acquired = True logger.info( "embed_image request | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s", _request_client(http_request), @@ -1294,16 +1326,16 @@ async def embed_image( active, _IMAGE_MAX_INFLIGHT, _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS), - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) verbose_logger.info( "embed_image detail | payload=%s normalize=%s priority=%s", urls, effective_normalize, _priority_label(effective_priority), - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) - result = await run_in_threadpool(_embed_image_impl, urls, effective_normalize, request_id) + result = await run_in_threadpool(_embed_image_impl, urls, effective_normalize, request_id, user_id) success = True backend_elapsed_ms = result.backend_elapsed_ms cache_hits = result.cache_hits @@ -1327,7 +1359,7 @@ async def embed_image( cache_misses, _preview_vector(result.vectors[0] if result.vectors else None), latency_ms, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) verbose_logger.info( "embed_image result detail | count=%d first_vector=%s latency_ms=%.2f", @@ -1336,7 +1368,7 @@ async def embed_image( if result.vectors and result.vectors[0] is not None else [], latency_ms, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) return result.vectors except HTTPException: @@ -1358,15 +1390,17 @@ async def embed_image( latency_ms, e, exc_info=True, - extra=_request_log_extra(request_id), + extra=build_request_log_extra(request_id, user_id), ) raise HTTPException(status_code=502, detail=f"Image embedding backend failure: {e}") from e finally: - remaining = _image_request_limiter.release(success=success) - logger.info( - "embed_image finalize | success=%s priority=%s active_after=%d", - success, - _priority_label(effective_priority), - remaining, - extra=_request_log_extra(request_id), - ) + if limiter_acquired: + remaining = _image_request_limiter.release(success=success) + logger.info( + "embed_image finalize | success=%s priority=%s active_after=%d", + success, + _priority_label(effective_priority), + remaining, + extra=build_request_log_extra(request_id, user_id), + ) + reset_request_log_context(log_tokens) diff --git a/embeddings/text_embedding_tei.py b/embeddings/text_embedding_tei.py index 908fac9..2d29502 100644 --- a/embeddings/text_embedding_tei.py +++ b/embeddings/text_embedding_tei.py @@ -2,11 +2,14 @@ from __future__ import annotations +import logging from typing import Any, List, Union import numpy as np import requests +logger = logging.getLogger(__name__) + class TEITextModel: """ @@ -18,12 +21,13 @@ class TEITextModel: response: [[...], [...], ...] """ - def __init__(self, base_url: str, timeout_sec: int = 60): + def __init__(self, base_url: str, timeout_sec: int = 60, max_client_batch_size: int = 24): if not base_url or not str(base_url).strip(): raise ValueError("TEI base_url must not be empty") self.base_url = str(base_url).rstrip("/") self.endpoint = f"{self.base_url}/embed" self.timeout_sec = int(timeout_sec) + self.max_client_batch_size = max(1, int(max_client_batch_size)) self._health_check() def _health_check(self) -> None: @@ -72,16 +76,28 @@ class TEITextModel: if not isinstance(t, str) or not t.strip(): raise ValueError(f"Invalid input text at index {i}: {t!r}") - response = requests.post( - self.endpoint, - json={"inputs": texts}, - timeout=self.timeout_sec, - ) - response.raise_for_status() - payload = response.json() - vectors = self._parse_payload(payload, expected_len=len(texts)) - if normalize_embeddings: - vectors = [self._normalize(vec) for vec in vectors] + if len(texts) > self.max_client_batch_size: + logger.info( + "TEI batch split | total_inputs=%d chunk_size=%d chunks=%d", + len(texts), + self.max_client_batch_size, + (len(texts) + self.max_client_batch_size - 1) // self.max_client_batch_size, + ) + + vectors: List[np.ndarray] = [] + for start in range(0, len(texts), self.max_client_batch_size): + batch = texts[start : start + self.max_client_batch_size] + response = requests.post( + self.endpoint, + json={"inputs": batch}, + timeout=self.timeout_sec, + ) + response.raise_for_status() + payload = response.json() + parsed = self._parse_payload(payload, expected_len=len(batch)) + if normalize_embeddings: + parsed = [self._normalize(vec) for vec in parsed] + vectors.extend(parsed) return np.array(vectors, dtype=object) def _parse_payload(self, payload: Any, expected_len: int) -> List[np.ndarray]: diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py index 6202931..570041c 100644 --- a/embeddings/text_encoder.py +++ b/embeddings/text_encoder.py @@ -13,6 +13,7 @@ from config.loader import get_app_config from config.services_config import get_embedding_text_base_url from embeddings.cache_keys import build_text_cache_key from embeddings.redis_embedding_cache import RedisEmbeddingCache +from request_log_context import build_downstream_request_headers, build_request_log_extra class TextEmbeddingEncoder: @@ -40,6 +41,8 @@ class TextEmbeddingEncoder: request_data: List[str], normalize_embeddings: bool = True, priority: int = 0, + request_id: Optional[str] = None, + user_id: Optional[str] = None, ) -> List[Any]: """ Call the embedding service API. @@ -50,6 +53,7 @@ class TextEmbeddingEncoder: Returns: List of embeddings (list[float]) or nulls (None), aligned to input order """ + response = None try: response = requests.post( self.endpoint, @@ -58,12 +62,26 @@ class TextEmbeddingEncoder: "priority": max(0, int(priority)), }, json=request_data, + headers=build_downstream_request_headers(request_id=request_id, user_id=user_id), timeout=60 ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: - logger.error(f"TextEmbeddingEncoder service request failed: {e}", exc_info=True) + body_preview = "" + if response is not None: + try: + body_preview = (response.text or "")[:300] + except Exception: + body_preview = "" + logger.error( + "TextEmbeddingEncoder service request failed | status=%s body=%s error=%s", + getattr(response, "status_code", "n/a"), + body_preview, + e, + exc_info=True, + extra=build_request_log_extra(request_id=request_id, user_id=user_id), + ) raise def encode( @@ -72,7 +90,9 @@ class TextEmbeddingEncoder: normalize_embeddings: bool = True, priority: int = 0, device: str = 'cpu', - batch_size: int = 32 + batch_size: int = 32, + request_id: Optional[str] = None, + user_id: Optional[str] = None, ) -> np.ndarray: """ Encode text into embeddings via network service with Redis caching. @@ -113,6 +133,8 @@ class TextEmbeddingEncoder: request_data, normalize_embeddings=normalize_embeddings, priority=priority, + request_id=request_id, + user_id=user_id, ) # Process response diff --git a/query/query_parser.py b/query/query_parser.py index 0063308..64edaad 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -301,7 +301,12 @@ class QueryParser: log_debug("Submitting query vector generation") def _encode_query_vector() -> Optional[np.ndarray]: - arr = self.text_encoder.encode([query_text], priority=1) + arr = self.text_encoder.encode( + [query_text], + priority=1, + request_id=(context.reqid if context else None), + user_id=(context.uid if context else None), + ) if arr is None or len(arr) == 0: return None vec = arr[0] diff --git a/request_log_context.py b/request_log_context.py new file mode 100644 index 0000000..5d29c86 --- /dev/null +++ b/request_log_context.py @@ -0,0 +1,107 @@ +""" +Request-scoped reqid/uid for logging and downstream HTTP headers. + +Kept as a **top-level module** (not under ``utils/``) because ``utils/__init__.py`` +pulls optional deps (e.g. sqlalchemy) that are not installed in ``.venv-embedding``. +Uvicorn ``--log-config`` and the embedding service must be able to import this module +without importing the full ``utils`` package. +""" + +from __future__ import annotations + +import logging +from contextvars import ContextVar, Token +from typing import Dict, Optional, Tuple + +_DEFAULT_REQUEST_ID = "-1" +_DEFAULT_USER_ID = "-1" + +_request_id_var: ContextVar[str] = ContextVar("request_log_reqid", default=_DEFAULT_REQUEST_ID) +_user_id_var: ContextVar[str] = ContextVar("request_log_uid", default=_DEFAULT_USER_ID) + +LOG_LINE_FORMAT = ( + "%(asctime)s | reqid:%(reqid)s | uid:%(uid)s | %(levelname)-8s | %(name)s | %(message)s" +) + + +def _normalize_value(value: Optional[str], *, fallback: str) -> str: + text = str(value or "").strip() + return text[:64] if text else fallback + + +def bind_request_log_context( + request_id: Optional[str] = None, + user_id: Optional[str] = None, +) -> Tuple[str, str, Tuple[Token[str], Token[str]]]: + """Bind reqid/uid to contextvars for the current execution context.""" + normalized_reqid = _normalize_value(request_id, fallback=_DEFAULT_REQUEST_ID) + normalized_uid = _normalize_value(user_id, fallback=_DEFAULT_USER_ID) + req_token = _request_id_var.set(normalized_reqid) + uid_token = _user_id_var.set(normalized_uid) + return normalized_reqid, normalized_uid, (req_token, uid_token) + + +def reset_request_log_context(tokens: Tuple[Token[str], Token[str]]) -> None: + """Reset reqid/uid contextvars back to their previous values.""" + req_token, uid_token = tokens + _request_id_var.reset(req_token) + _user_id_var.reset(uid_token) + + +def current_request_log_context() -> Tuple[str, str]: + """Return the currently bound reqid/uid pair.""" + return _request_id_var.get(), _user_id_var.get() + + +def build_request_log_extra( + request_id: Optional[str] = None, + user_id: Optional[str] = None, +) -> Dict[str, str]: + """Build logging extras, defaulting to the current bound context.""" + current_reqid, current_uid = current_request_log_context() + return { + "reqid": _normalize_value(request_id, fallback=current_reqid), + "uid": _normalize_value(user_id, fallback=current_uid), + } + + +def build_downstream_request_headers( + request_id: Optional[str] = None, + user_id: Optional[str] = None, +) -> Dict[str, str]: + """Build headers for downstream service calls when request context exists.""" + extra = build_request_log_extra(request_id=request_id, user_id=user_id) + if extra["reqid"] == _DEFAULT_REQUEST_ID and extra["uid"] == _DEFAULT_USER_ID: + return {} + headers = {"X-Request-ID": extra["reqid"]} + if extra["uid"]: + headers["X-User-ID"] = extra["uid"] + return headers + + +class RequestLogContextFilter(logging.Filter): + """Inject reqid/uid defaults into all log records.""" + + def filter(self, record: logging.LogRecord) -> bool: + reqid = getattr(record, "reqid", None) + uid = getattr(record, "uid", None) + + if reqid is None or uid is None: + bound_reqid, bound_uid = current_request_log_context() + reqid = reqid if reqid is not None else bound_reqid + uid = uid if uid is not None else bound_uid + + if reqid == _DEFAULT_REQUEST_ID and uid == _DEFAULT_USER_ID: + try: + from context.request_context import get_current_request_context + + context = get_current_request_context() + except Exception: + context = None + if context is not None: + reqid = getattr(context, "reqid", None) or reqid + uid = getattr(context, "uid", None) or uid + + record.reqid = _normalize_value(reqid, fallback=_DEFAULT_REQUEST_ID) + record.uid = _normalize_value(uid, fallback=_DEFAULT_USER_ID) + return True diff --git a/tests/test_embedding_pipeline.py b/tests/test_embedding_pipeline.py index 8670718..bd1734c 100644 --- a/tests/test_embedding_pipeline.py +++ b/tests/test_embedding_pipeline.py @@ -13,9 +13,11 @@ from config import ( ) from embeddings.text_encoder import TextEmbeddingEncoder from embeddings.image_encoder import CLIPImageEncoder +from embeddings.text_embedding_tei import TEITextModel from embeddings.bf16 import encode_embedding_for_redis from embeddings.cache_keys import build_image_cache_key, build_text_cache_key from query import QueryParser +from context.request_context import create_request_context, set_current_request_context, clear_current_request_context class _FakeRedis: @@ -168,6 +170,30 @@ def test_text_embedding_encoder_cache_hit(monkeypatch): assert np.allclose(out[1], np.array([0.3, 0.4], dtype=np.float32)) +def test_text_embedding_encoder_forwards_request_headers(monkeypatch): + fake_cache = _FakeEmbeddingCache() + monkeypatch.setattr("embeddings.text_encoder.RedisEmbeddingCache", lambda **kwargs: fake_cache) + + captured = {} + + def _fake_post(url, json, timeout, **kwargs): + captured["headers"] = dict(kwargs.get("headers") or {}) + return _FakeResponse([[0.1, 0.2]]) + + monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post) + + context = create_request_context(reqid="req-ctx-1", uid="user-ctx-1") + set_current_request_context(context) + try: + encoder = TextEmbeddingEncoder(service_url="http://127.0.0.1:6005") + encoder.encode(["hello"]) + finally: + clear_current_request_context() + + assert captured["headers"]["X-Request-ID"] == "req-ctx-1" + assert captured["headers"]["X-User-ID"] == "user-ctx-1" + + def test_image_embedding_encoder_cache_hit(monkeypatch): fake_cache = _FakeEmbeddingCache() cached = np.array([0.5, 0.6], dtype=np.float32) @@ -234,3 +260,37 @@ def test_query_parser_skips_query_vector_when_disabled(): parsed = parser.parse("red dress", tenant_id="162", generate_vector=False) assert parsed.query_vector is None + + +def test_tei_text_model_splits_batches_over_client_limit(monkeypatch): + monkeypatch.setattr(TEITextModel, "_health_check", lambda self: None) + calls = [] + + class _Response: + def __init__(self, payload): + self._payload = payload + + def raise_for_status(self): + return None + + def json(self): + return self._payload + + def _fake_post(url, json, timeout): + inputs = list(json["inputs"]) + calls.append(inputs) + return _Response([[float(idx)] for idx, _ in enumerate(inputs, start=1)]) + + monkeypatch.setattr("embeddings.text_embedding_tei.requests.post", _fake_post) + + model = TEITextModel( + base_url="http://127.0.0.1:8080", + timeout_sec=20, + max_client_batch_size=24, + ) + vectors = model.encode([f"text-{idx}" for idx in range(25)], normalize_embeddings=False) + + assert len(calls) == 2 + assert len(calls[0]) == 24 + assert len(calls[1]) == 1 + assert len(vectors) == 25 diff --git a/utils/logger.py b/utils/logger.py index 718947d..c6643f3 100644 --- a/utils/logger.py +++ b/utils/logger.py @@ -14,6 +14,8 @@ from datetime import datetime from typing import Any, Dict, Optional from pathlib import Path +from request_log_context import LOG_LINE_FORMAT, RequestLogContextFilter + class StructuredFormatter(logging.Formatter): """Structured JSON formatter with request context support""" @@ -89,25 +91,6 @@ def _log_with_context(logger: logging.Logger, level: int, msg: str, **kwargs): logging.setLogRecordFactory(old_factory) -class RequestContextFilter(logging.Filter): - """Filter that automatically injects request context from thread-local storage""" - - def filter(self, record: logging.LogRecord) -> bool: - """Inject request context from thread-local storage""" - try: - # Import here to avoid circular imports - from context.request_context import get_current_request_context - context = get_current_request_context() - if context: - # Ensure every request-scoped log record carries reqid/uid. - # If they are missing in the context, fall back to "-1". - record.reqid = getattr(context, "reqid", None) or "-1" - record.uid = getattr(context, "uid", None) or "-1" - except (ImportError, AttributeError): - pass - return True - - class ContextAwareConsoleFormatter(logging.Formatter): """ Console formatter that injects reqid/uid into the log line. @@ -156,9 +139,7 @@ def setup_logging( # Create formatters structured_formatter = StructuredFormatter() - console_formatter = ContextAwareConsoleFormatter( - '%(asctime)s | reqid:%(reqid)s | uid:%(uid)s | %(levelname)-8s | %(name)-15s | %(message)s' - ) + console_formatter = ContextAwareConsoleFormatter(LOG_LINE_FORMAT) # Add console handler if enable_console: -- libgit2 0.21.2