4650fcec
tangwang
日志优化、日志串联(uid rqid)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
"""
Request-scoped reqid/uid for logging and downstream HTTP headers.
Kept as a **top-level module** (not under ``utils/``) because ``utils/__init__.py``
pulls optional deps (e.g. sqlalchemy) that are not installed in ``.venv-embedding``.
Uvicorn ``--log-config`` and the embedding service must be able to import this module
without importing the full ``utils`` package.
"""
from __future__ import annotations
import logging
from contextvars import ContextVar, Token
from typing import Dict, Optional, Tuple
_DEFAULT_REQUEST_ID = "-1"
_DEFAULT_USER_ID = "-1"
_request_id_var: ContextVar[str] = ContextVar("request_log_reqid", default=_DEFAULT_REQUEST_ID)
_user_id_var: ContextVar[str] = ContextVar("request_log_uid", default=_DEFAULT_USER_ID)
LOG_LINE_FORMAT = (
"%(asctime)s | reqid:%(reqid)s | uid:%(uid)s | %(levelname)-8s | %(name)s | %(message)s"
)
def _normalize_value(value: Optional[str], *, fallback: str) -> str:
text = str(value or "").strip()
return text[:64] if text else fallback
def bind_request_log_context(
request_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> Tuple[str, str, Tuple[Token[str], Token[str]]]:
"""Bind reqid/uid to contextvars for the current execution context."""
normalized_reqid = _normalize_value(request_id, fallback=_DEFAULT_REQUEST_ID)
normalized_uid = _normalize_value(user_id, fallback=_DEFAULT_USER_ID)
req_token = _request_id_var.set(normalized_reqid)
uid_token = _user_id_var.set(normalized_uid)
return normalized_reqid, normalized_uid, (req_token, uid_token)
def reset_request_log_context(tokens: Tuple[Token[str], Token[str]]) -> None:
"""Reset reqid/uid contextvars back to their previous values."""
req_token, uid_token = tokens
_request_id_var.reset(req_token)
_user_id_var.reset(uid_token)
def current_request_log_context() -> Tuple[str, str]:
"""Return the currently bound reqid/uid pair."""
return _request_id_var.get(), _user_id_var.get()
def build_request_log_extra(
request_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> Dict[str, str]:
"""Build logging extras, defaulting to the current bound context."""
current_reqid, current_uid = current_request_log_context()
return {
"reqid": _normalize_value(request_id, fallback=current_reqid),
"uid": _normalize_value(user_id, fallback=current_uid),
}
def build_downstream_request_headers(
request_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> Dict[str, str]:
"""Build headers for downstream service calls when request context exists."""
extra = build_request_log_extra(request_id=request_id, user_id=user_id)
if extra["reqid"] == _DEFAULT_REQUEST_ID and extra["uid"] == _DEFAULT_USER_ID:
return {}
headers = {"X-Request-ID": extra["reqid"]}
if extra["uid"]:
headers["X-User-ID"] = extra["uid"]
return headers
class RequestLogContextFilter(logging.Filter):
"""Inject reqid/uid defaults into all log records."""
def filter(self, record: logging.LogRecord) -> bool:
reqid = getattr(record, "reqid", None)
uid = getattr(record, "uid", None)
if reqid is None or uid is None:
bound_reqid, bound_uid = current_request_log_context()
reqid = reqid if reqid is not None else bound_reqid
uid = uid if uid is not None else bound_uid
if reqid == _DEFAULT_REQUEST_ID and uid == _DEFAULT_USER_ID:
try:
from context.request_context import get_current_request_context
context = get_current_request_context()
except Exception:
context = None
if context is not None:
reqid = getattr(context, "reqid", None) or reqid
uid = getattr(context, "uid", None) or uid
record.reqid = _normalize_value(reqid, fallback=_DEFAULT_REQUEST_ID)
record.uid = _normalize_value(uid, fallback=_DEFAULT_USER_ID)
return True
|