Commit 4650fceca00a9450a5f0de86221d105fa3d8c90e

Authored by tangwang
1 parent cda1cd62

Log optimization and cross-service log correlation (uid, reqid)

api/app.py
... ... @@ -27,6 +27,8 @@ from slowapi.errors import RateLimitExceeded
27 27 # Configure backend logging
28 28 import pathlib
29 29  
  30 +from request_log_context import LOG_LINE_FORMAT, RequestLogContextFilter
  31 +
30 32  
31 33 def configure_backend_logging() -> None:
32 34 log_dir = pathlib.Path("logs")
... ... @@ -34,9 +36,8 @@ def configure_backend_logging() -> None:
34 36 log_level = os.getenv("LOG_LEVEL", "INFO").upper()
35 37 numeric_level = getattr(logging, log_level, logging.INFO)
36 38  
37   - default_formatter = logging.Formatter(
38   - "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
39   - )
  39 + default_formatter = logging.Formatter(LOG_LINE_FORMAT)
  40 + request_filter = RequestLogContextFilter()
40 41  
41 42 root_logger = logging.getLogger()
42 43 root_logger.setLevel(numeric_level)
... ... @@ -45,6 +46,7 @@ def configure_backend_logging() -> None:
45 46 console_handler = logging.StreamHandler()
46 47 console_handler.setLevel(numeric_level)
47 48 console_handler.setFormatter(default_formatter)
  49 + console_handler.addFilter(request_filter)
48 50 root_logger.addHandler(console_handler)
49 51  
50 52 backend_handler = TimedRotatingFileHandler(
... ... @@ -56,6 +58,7 @@ def configure_backend_logging() -> None:
56 58 )
57 59 backend_handler.setLevel(numeric_level)
58 60 backend_handler.setFormatter(default_formatter)
  61 + backend_handler.addFilter(request_filter)
59 62 root_logger.addHandler(backend_handler)
60 63  
61 64 verbose_logger = logging.getLogger("backend.verbose")
... ... @@ -71,11 +74,16 @@ def configure_backend_logging() -> None:
71 74 encoding="utf-8",
72 75 )
73 76 verbose_handler.setLevel(numeric_level)
74   - verbose_handler.setFormatter(
75   - logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
76   - )
  77 + verbose_handler.setFormatter(logging.Formatter(LOG_LINE_FORMAT))
  78 + verbose_handler.addFilter(request_filter)
77 79 verbose_logger.addHandler(verbose_handler)
78 80  
  81 + for logger_name in ("uvicorn", "uvicorn.error", "uvicorn.access"):
  82 + uvicorn_logger = logging.getLogger(logger_name)
  83 + uvicorn_logger.handlers.clear()
  84 + uvicorn_logger.setLevel(numeric_level)
  85 + uvicorn_logger.propagate = True
  86 +
79 87  
80 88 configure_backend_logging()
81 89 logger = logging.getLogger(__name__)
... ... @@ -101,6 +109,16 @@ _suggestion_service: Optional[SuggestionService] = None
101 109 _app_config = None
102 110  
103 111  
  112 +def _request_log_extra_from_http(request: Request) -> dict:
  113 + reqid = getattr(getattr(request, "state", None), "reqid", None) or request.headers.get("X-Request-ID")
  114 + uid = (
  115 + getattr(getattr(request, "state", None), "uid", None)
  116 + or request.headers.get("X-User-ID")
  117 + or request.headers.get("User-ID")
  118 + )
  119 + return {"reqid": reqid or "-1", "uid": uid or "-1"}
  120 +
  121 +
104 122 def init_service(es_host: str = "http://localhost:9200"):
105 123 """
106 124 Initialize search service with unified configuration.
... ... @@ -261,7 +279,11 @@ async def shutdown_event():
261 279 async def global_exception_handler(request: Request, exc: Exception):
262 280 """Global exception handler with detailed logging."""
263 281 client_ip = request.client.host if request.client else "unknown"
264   - logger.error(f"Unhandled exception from {client_ip}: {exc}", exc_info=True)
  282 + logger.error(
  283 + f"Unhandled exception from {client_ip}: {exc}",
  284 + exc_info=True,
  285 + extra=_request_log_extra_from_http(request),
  286 + )
265 287  
266 288 return JSONResponse(
267 289 status_code=500,
... ... @@ -276,7 +298,10 @@ async def global_exception_handler(request: Request, exc: Exception):
276 298 @app.exception_handler(HTTPException)
277 299 async def http_exception_handler(request: Request, exc: HTTPException):
278 300 """HTTP exception handler."""
279   - logger.warning(f"HTTP exception from {request.client.host if request.client else 'unknown'}: {exc.status_code} - {exc.detail}")
  301 + logger.warning(
  302 + f"HTTP exception from {request.client.host if request.client else 'unknown'}: {exc.status_code} - {exc.detail}",
  303 + extra=_request_log_extra_from_http(request),
  304 + )
280 305  
281 306 return JSONResponse(
282 307 status_code=exc.status_code,
... ...
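
For reference, a minimal sketch (assuming `request_log_context.py` from this commit is importable from the repo root) of how the shared LOG_LINE_FORMAT and RequestLogContextFilter configured above cooperate: the filter fills `reqid`/`uid` on every record, so the formatter never fails on missing keys.

    import logging
    from request_log_context import (
        LOG_LINE_FORMAT,
        RequestLogContextFilter,
        bind_request_log_context,
        reset_request_log_context,
    )

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(LOG_LINE_FORMAT))
    handler.addFilter(RequestLogContextFilter())  # injects reqid/uid defaults ("-1")

    demo = logging.getLogger("demo")
    demo.addHandler(handler)
    demo.setLevel(logging.INFO)

    demo.info("before binding")                            # ... | reqid:-1 | uid:-1 | ...
    _, _, tokens = bind_request_log_context("req-123", "user-7")
    demo.info("inside a request")                          # ... | reqid:req-123 | uid:user-7 | ...
    reset_request_log_context(tokens)
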
api/routes/search.py
... ... @@ -59,6 +59,8 @@ async def search(request: SearchRequest, http_request: Request):
59 59 Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).
60 60 """
61 61 reqid, uid = extract_request_info(http_request)
  62 + http_request.state.reqid = reqid
  63 + http_request.state.uid = uid
62 64  
63 65 # Extract tenant_id (required)
64 66 tenant_id = http_request.headers.get('X-Tenant-ID')
... ... @@ -213,6 +215,8 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request):
213 215 Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).
214 216 """
215 217 reqid, uid = extract_request_info(http_request)
  218 + http_request.state.reqid = reqid
  219 + http_request.state.uid = uid
216 220  
217 221 # Extract tenant_id (required)
218 222 tenant_id = http_request.headers.get('X-Tenant-ID')
... ...
context/request_context.py
... ... @@ -12,6 +12,8 @@ from typing import Dict, Any, Optional, List
12 12 from dataclasses import dataclass, field
13 13 import uuid
14 14  
  15 +from request_log_context import bind_request_log_context, reset_request_log_context
  16 +
15 17  
16 18 class RequestContextStage(Enum):
17 19 """搜索阶段枚举"""
... ... @@ -375,9 +377,15 @@ def get_current_request_context() -> Optional[RequestContext]:
375 377 def set_current_request_context(context: RequestContext) -> None:
376 378 """设置当前线程的请求上下文"""
377 379 threading.current_thread().request_context = context
  380 + _, _, tokens = bind_request_log_context(context.reqid, context.uid)
  381 + threading.current_thread().request_log_tokens = tokens
378 382  
379 383  
380 384 def clear_current_request_context() -> None:
381 385 """清除当前线程的请求上下文"""
  386 + tokens = getattr(threading.current_thread(), 'request_log_tokens', None)
  387 + if tokens is not None:
  388 + reset_request_log_context(tokens)
  389 + delattr(threading.current_thread(), 'request_log_tokens')
382 390 if hasattr(threading.current_thread(), 'request_context'):
383 391 delattr(threading.current_thread(), 'request_context')
384 392 \ No newline at end of file
... ...
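
A usage sketch of the thread-scoped binding above, using the same helpers the new test in tests/test_embedding_pipeline.py exercises; log records emitted between set and clear carry the bound reqid/uid once the handlers from api/app.py are installed.

    import logging
    from context.request_context import (
        clear_current_request_context,
        create_request_context,
        set_current_request_context,
    )

    logger = logging.getLogger(__name__)

    context = create_request_context(reqid="req-42", uid="user-9")
    set_current_request_context(context)        # also binds the reqid/uid contextvars
    try:
        logger.info("processing inside the request")  # formatted with reqid:req-42 / uid:user-9
    finally:
        clear_current_request_context()          # resets the contextvars via the saved tokens
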
embeddings/README.md
... ... @@ -5,6 +5,8 @@
5 5 - `../docs/TEI_SERVICE说明文档.md`
6 6 - `../docs/CNCLIP_SERVICE说明文档.md`
7 7  
  8 +**Request log correlation (reqid / uid)**: the shared implementation lives in `request_log_context.py` at the repository root (do not move it under `utils/`, otherwise `.venv-embedding` would pull in database dependencies via `utils/__init__.py`). See `config/uvicorn_embedding_logging.json` for the Uvicorn logging configuration.
  9 +
8 10 ---
9 11  
10 12 This directory is a self-contained "embedding module", containing:
... ...
embeddings/config.py
... ... @@ -2,6 +2,7 @@
2 2  
3 3 from __future__ import annotations
4 4  
  5 +import os
5 6 from typing import Optional
6 7  
7 8 from config.loader import get_app_config
... ... @@ -25,6 +26,11 @@ class EmbeddingConfig(object):
25 26 self.TEXT_NORMALIZE_EMBEDDINGS = bool(text_backend.get("normalize_embeddings", True))
26 27 self.TEI_BASE_URL = str(text_backend.get("base_url") or "http://127.0.0.1:8080")
27 28 self.TEI_TIMEOUT_SEC = int(text_backend.get("timeout_sec", 60))
  29 + self.TEI_MAX_CLIENT_BATCH_SIZE = int(
  30 + os.getenv("TEI_MAX_CLIENT_BATCH_SIZE")
  31 + or text_backend.get("max_client_batch_size")
  32 + or 24
  33 + )
28 34  
29 35 self.USE_CLIP_AS_SERVICE = services.image_backend == "clip_as_service"
30 36 self.CLIP_AS_SERVICE_SERVER = str(image_backend.get("server") or "grpc://127.0.0.1:51000")
... ...
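
The new TEI_MAX_CLIENT_BATCH_SIZE resolves in the order environment variable, then `text_backend` config, then the default of 24. A small illustration of that precedence (the helper name here is hypothetical, only the resolution order comes from the change above):

    import os

    def resolve_max_client_batch_size(text_backend: dict) -> int:
        # mirrors the precedence above: env var > services config > default 24
        return int(
            os.getenv("TEI_MAX_CLIENT_BATCH_SIZE")
            or text_backend.get("max_client_batch_size")
            or 24
        )

    assert resolve_max_client_batch_size({}) == 24
    assert resolve_max_client_batch_size({"max_client_batch_size": 32}) == 32
    os.environ["TEI_MAX_CLIENT_BATCH_SIZE"] = "16"
    assert resolve_max_client_batch_size({"max_client_batch_size": 32}) == 16
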
embeddings/image_encoder.py
... ... @@ -13,6 +13,7 @@ from config.loader import get_app_config
13 13 from config.services_config import get_embedding_image_base_url
14 14 from embeddings.cache_keys import build_image_cache_key
15 15 from embeddings.redis_embedding_cache import RedisEmbeddingCache
  16 +from request_log_context import build_downstream_request_headers, build_request_log_extra
16 17  
17 18  
18 19 class CLIPImageEncoder:
... ... @@ -40,6 +41,8 @@ class CLIPImageEncoder:
40 41 request_data: List[str],
41 42 normalize_embeddings: bool = True,
42 43 priority: int = 0,
  44 + request_id: Optional[str] = None,
  45 + user_id: Optional[str] = None,
43 46 ) -> List[Any]:
44 47 """
45 48 Call the embedding service API.
... ... @@ -50,6 +53,7 @@ class CLIPImageEncoder:
50 53 Returns:
51 54 List of embeddings (list[float]) or nulls (None), aligned to input order
52 55 """
  56 + response = None
53 57 try:
54 58 response = requests.post(
55 59 self.endpoint,
... ... @@ -58,12 +62,26 @@ class CLIPImageEncoder:
58 62 "priority": max(0, int(priority)),
59 63 },
60 64 json=request_data,
  65 + headers=build_downstream_request_headers(request_id=request_id, user_id=user_id),
61 66 timeout=60
62 67 )
63 68 response.raise_for_status()
64 69 return response.json()
65 70 except requests.exceptions.RequestException as e:
66   - logger.error(f"CLIPImageEncoder service request failed: {e}", exc_info=True)
  71 + body_preview = ""
  72 + if response is not None:
  73 + try:
  74 + body_preview = (response.text or "")[:300]
  75 + except Exception:
  76 + body_preview = ""
  77 + logger.error(
  78 + "CLIPImageEncoder service request failed | status=%s body=%s error=%s",
  79 + getattr(response, "status_code", "n/a"),
  80 + body_preview,
  81 + e,
  82 + exc_info=True,
  83 + extra=build_request_log_extra(request_id=request_id, user_id=user_id),
  84 + )
67 85 raise
68 86  
69 87 def encode_image(self, image: Image.Image) -> np.ndarray:
... ... @@ -79,6 +97,8 @@ class CLIPImageEncoder:
79 97 url: str,
80 98 normalize_embeddings: bool = True,
81 99 priority: int = 0,
  100 + request_id: Optional[str] = None,
  101 + user_id: Optional[str] = None,
82 102 ) -> np.ndarray:
83 103 """
84 104 Generate image embedding via network service using URL.
... ... @@ -98,6 +118,8 @@ class CLIPImageEncoder:
98 118 [url],
99 119 normalize_embeddings=normalize_embeddings,
100 120 priority=priority,
  121 + request_id=request_id,
  122 + user_id=user_id,
101 123 )
102 124 if not response_data or len(response_data) != 1 or response_data[0] is None:
103 125 raise RuntimeError(f"No image embedding returned for URL: {url}")
... ... @@ -113,6 +135,8 @@ class CLIPImageEncoder:
113 135 batch_size: int = 8,
114 136 normalize_embeddings: bool = True,
115 137 priority: int = 0,
  138 + request_id: Optional[str] = None,
  139 + user_id: Optional[str] = None,
116 140 ) -> List[np.ndarray]:
117 141 """
118 142 Encode a batch of images efficiently via network service.
... ... @@ -151,6 +175,8 @@ class CLIPImageEncoder:
151 175 batch_urls,
152 176 normalize_embeddings=normalize_embeddings,
153 177 priority=priority,
  178 + request_id=request_id,
  179 + user_id=user_id,
154 180 )
155 181 if not response_data or len(response_data) != len(batch_urls):
156 182 raise RuntimeError(
... ... @@ -176,6 +202,8 @@ class CLIPImageEncoder:
176 202 batch_size: Optional[int] = None,
177 203 normalize_embeddings: bool = True,
178 204 priority: int = 0,
  205 + request_id: Optional[str] = None,
  206 + user_id: Optional[str] = None,
179 207 ) -> List[np.ndarray]:
180 208 """
181 209 Interface consistent with ClipImageModel / ClipAsServiceImageEncoder, called by the indexer's document_transformer.
... ... @@ -192,4 +220,6 @@ class CLIPImageEncoder:
192 220 batch_size=batch_size or 8,
193 221 normalize_embeddings=normalize_embeddings,
194 222 priority=priority,
  223 + request_id=request_id,
  224 + user_id=user_id,
195 225 )
... ...
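
How the headers attached in `_call_embedding_service` behave: `build_downstream_request_headers` returns an empty dict when neither an explicit id nor a bound context exists, so calls made outside a request are unchanged. A quick sketch against the helper defined in `request_log_context.py` (shown later in this diff):

    from request_log_context import (
        bind_request_log_context,
        build_downstream_request_headers,
        reset_request_log_context,
    )

    # No context bound and no explicit ids: no headers are attached.
    assert build_downstream_request_headers() == {}

    _, _, tokens = bind_request_log_context("req-1", "user-1")
    assert build_downstream_request_headers() == {
        "X-Request-ID": "req-1",
        "X-User-ID": "user-1",
    }
    # Explicit arguments take precedence over the bound context.
    assert build_downstream_request_headers(request_id="req-2")["X-Request-ID"] == "req-2"
    reset_request_log_context(tokens)
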
embeddings/server.py
... ... @@ -26,17 +26,17 @@ from embeddings.cache_keys import build_image_cache_key, build_text_cache_key
26 26 from embeddings.config import CONFIG
27 27 from embeddings.protocols import ImageEncoderProtocol
28 28 from embeddings.redis_embedding_cache import RedisEmbeddingCache
  29 +from request_log_context import (
  30 + LOG_LINE_FORMAT,
  31 + RequestLogContextFilter,
  32 + bind_request_log_context,
  33 + build_request_log_extra,
  34 + reset_request_log_context,
  35 +)
29 36  
30 37 app = FastAPI(title="saas-search Embedding Service", version="1.0.0")
31 38  
32 39  
33   -class _DefaultRequestIdFilter(logging.Filter):
34   - def filter(self, record: logging.LogRecord) -> bool:
35   - if not hasattr(record, "reqid"):
36   - record.reqid = "-1"
37   - return True
38   -
39   -
40 40 def configure_embedding_logging() -> None:
41 41 root_logger = logging.getLogger()
42 42 if getattr(root_logger, "_embedding_logging_configured", False):
... ... @@ -47,17 +47,15 @@ def configure_embedding_logging() -> None:
47 47  
48 48 log_level = os.getenv("LOG_LEVEL", "INFO").upper()
49 49 numeric_level = getattr(logging, log_level, logging.INFO)
50   - formatter = logging.Formatter(
51   - "%(asctime)s | reqid:%(reqid)s | %(name)s | %(levelname)s | %(message)s"
52   - )
53   - request_filter = _DefaultRequestIdFilter()
  50 + formatter = logging.Formatter(LOG_LINE_FORMAT)
  51 + context_filter = RequestLogContextFilter()
54 52  
55 53 root_logger.setLevel(numeric_level)
56 54 root_logger.handlers.clear()
57 55 stream_handler = logging.StreamHandler()
58 56 stream_handler.setLevel(numeric_level)
59 57 stream_handler.setFormatter(formatter)
60   - stream_handler.addFilter(request_filter)
  58 + stream_handler.addFilter(context_filter)
61 59 root_logger.addHandler(stream_handler)
62 60  
63 61 verbose_logger = logging.getLogger("embedding.verbose")
... ... @@ -231,6 +229,7 @@ class _TextDispatchTask:
231 229 normalized: List[str]
232 230 effective_normalize: bool
233 231 request_id: str
  232 + user_id: str
234 233 priority: int
235 234 created_at: float
236 235 done: threading.Event
... ... @@ -321,12 +320,13 @@ def _text_dispatch_worker_loop(worker_idx: int) -> None:
321 320 _priority_label(task.priority),
322 321 len(task.normalized),
323 322 queue_wait_ms,
324   - extra=_request_log_extra(task.request_id),
  323 + extra=build_request_log_extra(task.request_id, task.user_id),
325 324 )
326 325 task.result = _embed_text_impl(
327 326 task.normalized,
328 327 task.effective_normalize,
329 328 task.request_id,
  329 + task.user_id,
330 330 task.priority,
331 331 )
332 332 except Exception as exc:
... ... @@ -339,6 +339,7 @@ def _submit_text_dispatch_and_wait(
339 339 normalized: List[str],
340 340 effective_normalize: bool,
341 341 request_id: str,
  342 + user_id: str,
342 343 priority: int,
343 344 ) -> _EmbedResult:
344 345 if not any(worker.is_alive() for worker in _text_dispatch_workers):
... ... @@ -347,6 +348,7 @@ def _submit_text_dispatch_and_wait(
347 348 normalized=normalized,
348 349 effective_normalize=effective_normalize,
349 350 request_id=request_id,
  351 + user_id=user_id,
350 352 priority=_effective_priority(priority),
351 353 created_at=time.perf_counter(),
352 354 done=threading.Event(),
... ... @@ -380,6 +382,7 @@ class _SingleTextTask:
380 382 priority: int
381 383 created_at: float
382 384 request_id: str
  385 + user_id: str
383 386 done: threading.Event
384 387 result: Optional[List[float]] = None
385 388 error: Optional[Exception] = None
... ... @@ -435,10 +438,6 @@ def _preview_vector(vec: Optional[List[float]], max_dims: int = _VECTOR_PREVIEW_
435 438 return [round(float(v), 6) for v in vec[:max_dims]]
436 439  
437 440  
438   -def _request_log_extra(request_id: str) -> Dict[str, str]:
439   - return {"reqid": request_id}
440   -
441   -
442 441 def _resolve_request_id(http_request: Request) -> str:
443 442 header_value = http_request.headers.get("X-Request-ID")
444 443 if header_value and header_value.strip():
... ... @@ -446,6 +445,13 @@ def _resolve_request_id(http_request: Request) -> str:
446 445 return str(uuid.uuid4())[:8]
447 446  
448 447  
  448 +def _resolve_user_id(http_request: Request) -> str:
  449 + header_value = http_request.headers.get("X-User-ID") or http_request.headers.get("User-ID")
  450 + if header_value and header_value.strip():
  451 + return header_value.strip()[:64]
  452 + return "-1"
  453 +
  454 +
449 455 def _request_client(http_request: Request) -> str:
450 456 client = getattr(http_request, "client", None)
451 457 host = getattr(client, "host", None)
... ... @@ -522,18 +528,21 @@ def _text_batch_worker_loop() -> None:
522 528 try:
523 529 queue_wait_ms = [(time.perf_counter() - task.created_at) * 1000.0 for task in batch]
524 530 reqids = [task.request_id for task in batch]
  531 + uids = [task.user_id for task in batch]
525 532 logger.info(
526   - "text microbatch dispatch | size=%d priority=%s queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s preview=%s",
  533 + "text microbatch dispatch | size=%d priority=%s queue_wait_ms_min=%.2f queue_wait_ms_max=%.2f reqids=%s uids=%s preview=%s",
527 534 len(batch),
528 535 _priority_label(max(task.priority for task in batch)),
529 536 min(queue_wait_ms) if queue_wait_ms else 0.0,
530 537 max(queue_wait_ms) if queue_wait_ms else 0.0,
531 538 reqids,
  539 + uids,
532 540 _preview_inputs(
533 541 [task.text for task in batch],
534 542 _LOG_PREVIEW_COUNT,
535 543 _LOG_TEXT_PREVIEW_CHARS,
536 544 ),
  545 + extra=build_request_log_extra(),
537 546 )
538 547 batch_t0 = time.perf_counter()
539 548 embs = _encode_local_st([task.text for task in batch], normalize_embeddings=False)
... ... @@ -548,19 +557,23 @@ def _text_batch_worker_loop() -> None:
548 557 raise RuntimeError("Text model returned empty embedding in micro-batch")
549 558 task.result = vec
550 559 logger.info(
551   - "text microbatch done | size=%d reqids=%s dim=%d backend_elapsed_ms=%.2f",
  560 + "text microbatch done | size=%d reqids=%s uids=%s dim=%d backend_elapsed_ms=%.2f",
552 561 len(batch),
553 562 reqids,
  563 + uids,
554 564 len(batch[0].result) if batch and batch[0].result is not None else 0,
555 565 (time.perf_counter() - batch_t0) * 1000.0,
  566 + extra=build_request_log_extra(),
556 567 )
557 568 except Exception as exc:
558 569 logger.error(
559   - "text microbatch failed | size=%d reqids=%s error=%s",
  570 + "text microbatch failed | size=%d reqids=%s uids=%s error=%s",
560 571 len(batch),
561 572 [task.request_id for task in batch],
  573 + [task.user_id for task in batch],
562 574 exc,
563 575 exc_info=True,
  576 + extra=build_request_log_extra(),
564 577 )
565 578 for task in batch:
566 579 task.error = exc
... ... @@ -573,6 +586,7 @@ def _encode_single_text_with_microbatch(
573 586 text: str,
574 587 normalize: bool,
575 588 request_id: str,
  589 + user_id: str,
576 590 priority: int,
577 591 ) -> List[float]:
578 592 task = _SingleTextTask(
... ... @@ -581,6 +595,7 @@ def _encode_single_text_with_microbatch(
581 595 priority=_effective_priority(priority),
582 596 created_at=time.perf_counter(),
583 597 request_id=request_id,
  598 + user_id=user_id,
584 599 done=threading.Event(),
585 600 )
586 601 with _text_single_queue_cv:
... ... @@ -632,6 +647,9 @@ def load_models():
632 647 _text_model = TEITextModel(
633 648 base_url=str(base_url),
634 649 timeout_sec=timeout_sec,
  650 + max_client_batch_size=int(
  651 + backend_cfg.get("max_client_batch_size") or CONFIG.TEI_MAX_CLIENT_BATCH_SIZE
  652 + ),
635 653 )
636 654 elif backend_name == "local_st":
637 655 from embeddings.text_embedding_sentence_transformers import Qwen3TextModel
... ... @@ -823,6 +841,7 @@ def _embed_text_impl(
823 841 normalized: List[str],
824 842 effective_normalize: bool,
825 843 request_id: str,
  844 + user_id: str,
826 845 priority: int = 0,
827 846 ) -> _EmbedResult:
828 847 if _text_model is None:
... ... @@ -854,7 +873,7 @@ def _embed_text_impl(
854 873 effective_normalize,
855 874 len(out[0]) if out and out[0] is not None else 0,
856 875 cache_hits,
857   - extra=_request_log_extra(request_id),
  876 + extra=build_request_log_extra(request_id, user_id),
858 877 )
859 878 return _EmbedResult(
860 879 vectors=out,
... ... @@ -873,6 +892,7 @@ def _embed_text_impl(
873 892 missing_texts[0],
874 893 normalize=effective_normalize,
875 894 request_id=request_id,
  895 + user_id=user_id,
876 896 priority=priority,
877 897 )
878 898 ]
... ... @@ -905,7 +925,7 @@ def _embed_text_impl(
905 925 "Text embedding backend failure: %s",
906 926 e,
907 927 exc_info=True,
908   - extra=_request_log_extra(request_id),
  928 + extra=build_request_log_extra(request_id, user_id),
909 929 )
910 930 raise RuntimeError(f"Text embedding backend failure: {e}") from e
911 931  
... ... @@ -931,7 +951,7 @@ def _embed_text_impl(
931 951 cache_hits,
932 952 len(missing_texts),
933 953 backend_elapsed_ms,
934   - extra=_request_log_extra(request_id),
  954 + extra=build_request_log_extra(request_id, user_id),
935 955 )
936 956 return _EmbedResult(
937 957 vectors=out,
... ... @@ -954,75 +974,79 @@ async def embed_text(
954 974 raise HTTPException(status_code=503, detail="Text embedding model not loaded in this service")
955 975  
956 976 request_id = _resolve_request_id(http_request)
  977 + user_id = _resolve_user_id(http_request)
  978 + _, _, log_tokens = bind_request_log_context(request_id, user_id)
957 979 response.headers["X-Request-ID"] = request_id
958   -
959   - if priority < 0:
960   - raise HTTPException(status_code=400, detail="priority must be >= 0")
961   - effective_priority = _effective_priority(priority)
962   - effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize)
963   - normalized: List[str] = []
964   - for i, t in enumerate(texts):
965   - if not isinstance(t, str):
966   - raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: must be string")
967   - s = t.strip()
968   - if not s:
969   - raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string")
970   - normalized.append(s)
971   -
972   - cache_check_started = time.perf_counter()
973   - cache_only = _try_full_text_cache_hit(normalized, effective_normalize)
974   - if cache_only is not None:
975   - latency_ms = (time.perf_counter() - cache_check_started) * 1000.0
976   - _text_stats.record_completed(
977   - success=True,
978   - latency_ms=latency_ms,
979   - backend_latency_ms=0.0,
980   - cache_hits=cache_only.cache_hits,
981   - cache_misses=0,
982   - )
983   - logger.info(
984   - "embed_text response | backend=%s mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f",
985   - _text_backend_name,
986   - _priority_label(effective_priority),
987   - len(normalized),
988   - effective_normalize,
989   - len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0,
990   - cache_only.cache_hits,
991   - _preview_vector(cache_only.vectors[0] if cache_only.vectors else None),
992   - latency_ms,
993   - extra=_request_log_extra(request_id),
994   - )
995   - return cache_only.vectors
996   -
997   - accepted, active = _text_request_limiter.try_acquire(bypass_limit=effective_priority > 0)
998   - if not accepted:
999   - _text_stats.record_rejected()
1000   - logger.warning(
1001   - "embed_text rejected | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
1002   - _request_client(http_request),
1003   - _text_backend_name,
1004   - _priority_label(effective_priority),
1005   - len(normalized),
1006   - effective_normalize,
1007   - active,
1008   - _TEXT_MAX_INFLIGHT,
1009   - _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS),
1010   - extra=_request_log_extra(request_id),
1011   - )
1012   - raise HTTPException(
1013   - status_code=_OVERLOAD_STATUS_CODE,
1014   - detail=(
1015   - "Text embedding service busy for priority=0 requests: "
1016   - f"active={active}, limit={_TEXT_MAX_INFLIGHT}"
1017   - ),
1018   - )
1019   -
  980 + response.headers["X-User-ID"] = user_id
1020 981 request_started = time.perf_counter()
1021 982 success = False
1022 983 backend_elapsed_ms = 0.0
1023 984 cache_hits = 0
1024 985 cache_misses = 0
  986 + limiter_acquired = False
  987 +
1025 988 try:
  989 + if priority < 0:
  990 + raise HTTPException(status_code=400, detail="priority must be >= 0")
  991 + effective_priority = _effective_priority(priority)
  992 + effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize)
  993 + normalized: List[str] = []
  994 + for i, t in enumerate(texts):
  995 + if not isinstance(t, str):
  996 + raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: must be string")
  997 + s = t.strip()
  998 + if not s:
  999 + raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string")
  1000 + normalized.append(s)
  1001 +
  1002 + cache_check_started = time.perf_counter()
  1003 + cache_only = _try_full_text_cache_hit(normalized, effective_normalize)
  1004 + if cache_only is not None:
  1005 + latency_ms = (time.perf_counter() - cache_check_started) * 1000.0
  1006 + _text_stats.record_completed(
  1007 + success=True,
  1008 + latency_ms=latency_ms,
  1009 + backend_latency_ms=0.0,
  1010 + cache_hits=cache_only.cache_hits,
  1011 + cache_misses=0,
  1012 + )
  1013 + logger.info(
  1014 + "embed_text response | backend=%s mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f",
  1015 + _text_backend_name,
  1016 + _priority_label(effective_priority),
  1017 + len(normalized),
  1018 + effective_normalize,
  1019 + len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0,
  1020 + cache_only.cache_hits,
  1021 + _preview_vector(cache_only.vectors[0] if cache_only.vectors else None),
  1022 + latency_ms,
  1023 + extra=build_request_log_extra(request_id, user_id),
  1024 + )
  1025 + return cache_only.vectors
  1026 +
  1027 + accepted, active = _text_request_limiter.try_acquire(bypass_limit=effective_priority > 0)
  1028 + if not accepted:
  1029 + _text_stats.record_rejected()
  1030 + logger.warning(
  1031 + "embed_text rejected | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
  1032 + _request_client(http_request),
  1033 + _text_backend_name,
  1034 + _priority_label(effective_priority),
  1035 + len(normalized),
  1036 + effective_normalize,
  1037 + active,
  1038 + _TEXT_MAX_INFLIGHT,
  1039 + _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS),
  1040 + extra=build_request_log_extra(request_id, user_id),
  1041 + )
  1042 + raise HTTPException(
  1043 + status_code=_OVERLOAD_STATUS_CODE,
  1044 + detail=(
  1045 + "Text embedding service busy for priority=0 requests: "
  1046 + f"active={active}, limit={_TEXT_MAX_INFLIGHT}"
  1047 + ),
  1048 + )
  1049 + limiter_acquired = True
1026 1050 logger.info(
1027 1051 "embed_text request | client=%s backend=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
1028 1052 _request_client(http_request),
... ... @@ -1033,7 +1057,7 @@ async def embed_text(
1033 1057 active,
1034 1058 _TEXT_MAX_INFLIGHT,
1035 1059 _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS),
1036   - extra=_request_log_extra(request_id),
  1060 + extra=build_request_log_extra(request_id, user_id),
1037 1061 )
1038 1062 verbose_logger.info(
1039 1063 "embed_text detail | payload=%s normalize=%s backend=%s priority=%s",
... ... @@ -1041,13 +1065,14 @@ async def embed_text(
1041 1065 effective_normalize,
1042 1066 _text_backend_name,
1043 1067 _priority_label(effective_priority),
1044   - extra=_request_log_extra(request_id),
  1068 + extra=build_request_log_extra(request_id, user_id),
1045 1069 )
1046 1070 result = await run_in_threadpool(
1047 1071 _submit_text_dispatch_and_wait,
1048 1072 normalized,
1049 1073 effective_normalize,
1050 1074 request_id,
  1075 + user_id,
1051 1076 effective_priority,
1052 1077 )
1053 1078 success = True
... ... @@ -1074,7 +1099,7 @@ async def embed_text(
1074 1099 cache_misses,
1075 1100 _preview_vector(result.vectors[0] if result.vectors else None),
1076 1101 latency_ms,
1077   - extra=_request_log_extra(request_id),
  1102 + extra=build_request_log_extra(request_id, user_id),
1078 1103 )
1079 1104 verbose_logger.info(
1080 1105 "embed_text result detail | count=%d priority=%s first_vector=%s latency_ms=%.2f",
... ... @@ -1084,7 +1109,7 @@ async def embed_text(
1084 1109 if result.vectors and result.vectors[0] is not None
1085 1110 else [],
1086 1111 latency_ms,
1087   - extra=_request_log_extra(request_id),
  1112 + extra=build_request_log_extra(request_id, user_id),
1088 1113 )
1089 1114 return result.vectors
1090 1115 except HTTPException:
... ... @@ -1107,24 +1132,27 @@ async def embed_text(
1107 1132 latency_ms,
1108 1133 e,
1109 1134 exc_info=True,
1110   - extra=_request_log_extra(request_id),
  1135 + extra=build_request_log_extra(request_id, user_id),
1111 1136 )
1112 1137 raise HTTPException(status_code=502, detail=str(e)) from e
1113 1138 finally:
1114   - remaining = _text_request_limiter.release(success=success)
1115   - logger.info(
1116   - "embed_text finalize | success=%s priority=%s active_after=%d",
1117   - success,
1118   - _priority_label(effective_priority),
1119   - remaining,
1120   - extra=_request_log_extra(request_id),
1121   - )
  1139 + if limiter_acquired:
  1140 + remaining = _text_request_limiter.release(success=success)
  1141 + logger.info(
  1142 + "embed_text finalize | success=%s priority=%s active_after=%d",
  1143 + success,
  1144 + _priority_label(effective_priority),
  1145 + remaining,
  1146 + extra=build_request_log_extra(request_id, user_id),
  1147 + )
  1148 + reset_request_log_context(log_tokens)
1122 1149  
1123 1150  
1124 1151 def _embed_image_impl(
1125 1152 urls: List[str],
1126 1153 effective_normalize: bool,
1127 1154 request_id: str,
  1155 + user_id: str,
1128 1156 ) -> _EmbedResult:
1129 1157 if _image_model is None:
1130 1158 raise RuntimeError("Image model not loaded")
... ... @@ -1154,7 +1182,7 @@ def _embed_image_impl(
1154 1182 effective_normalize,
1155 1183 len(out[0]) if out and out[0] is not None else 0,
1156 1184 cache_hits,
1157   - extra=_request_log_extra(request_id),
  1185 + extra=build_request_log_extra(request_id, user_id),
1158 1186 )
1159 1187 return _EmbedResult(
1160 1188 vectors=out,
... ... @@ -1194,7 +1222,7 @@ def _embed_image_impl(
1194 1222 cache_hits,
1195 1223 len(missing_urls),
1196 1224 backend_elapsed_ms,
1197   - extra=_request_log_extra(request_id),
  1225 + extra=build_request_log_extra(request_id, user_id),
1198 1226 )
1199 1227 return _EmbedResult(
1200 1228 vectors=out,
... ... @@ -1217,74 +1245,78 @@ async def embed_image(
1217 1245 raise HTTPException(status_code=503, detail="Image embedding model not loaded in this service")
1218 1246  
1219 1247 request_id = _resolve_request_id(http_request)
  1248 + user_id = _resolve_user_id(http_request)
  1249 + _, _, log_tokens = bind_request_log_context(request_id, user_id)
1220 1250 response.headers["X-Request-ID"] = request_id
1221   -
1222   - if priority < 0:
1223   - raise HTTPException(status_code=400, detail="priority must be >= 0")
1224   - effective_priority = _effective_priority(priority)
1225   -
1226   - effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize)
1227   - urls: List[str] = []
1228   - for i, url_or_path in enumerate(images):
1229   - if not isinstance(url_or_path, str):
1230   - raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path")
1231   - s = url_or_path.strip()
1232   - if not s:
1233   - raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path")
1234   - urls.append(s)
1235   -
1236   - cache_check_started = time.perf_counter()
1237   - cache_only = _try_full_image_cache_hit(urls, effective_normalize)
1238   - if cache_only is not None:
1239   - latency_ms = (time.perf_counter() - cache_check_started) * 1000.0
1240   - _image_stats.record_completed(
1241   - success=True,
1242   - latency_ms=latency_ms,
1243   - backend_latency_ms=0.0,
1244   - cache_hits=cache_only.cache_hits,
1245   - cache_misses=0,
1246   - )
1247   - logger.info(
1248   - "embed_image response | mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f",
1249   - _priority_label(effective_priority),
1250   - len(urls),
1251   - effective_normalize,
1252   - len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0,
1253   - cache_only.cache_hits,
1254   - _preview_vector(cache_only.vectors[0] if cache_only.vectors else None),
1255   - latency_ms,
1256   - extra=_request_log_extra(request_id),
1257   - )
1258   - return cache_only.vectors
1259   -
1260   - accepted, active = _image_request_limiter.try_acquire(bypass_limit=effective_priority > 0)
1261   - if not accepted:
1262   - _image_stats.record_rejected()
1263   - logger.warning(
1264   - "embed_image rejected | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
1265   - _request_client(http_request),
1266   - _priority_label(effective_priority),
1267   - len(urls),
1268   - effective_normalize,
1269   - active,
1270   - _IMAGE_MAX_INFLIGHT,
1271   - _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS),
1272   - extra=_request_log_extra(request_id),
1273   - )
1274   - raise HTTPException(
1275   - status_code=_OVERLOAD_STATUS_CODE,
1276   - detail=(
1277   - "Image embedding service busy for priority=0 requests: "
1278   - f"active={active}, limit={_IMAGE_MAX_INFLIGHT}"
1279   - ),
1280   - )
1281   -
  1251 + response.headers["X-User-ID"] = user_id
1282 1252 request_started = time.perf_counter()
1283 1253 success = False
1284 1254 backend_elapsed_ms = 0.0
1285 1255 cache_hits = 0
1286 1256 cache_misses = 0
  1257 + limiter_acquired = False
  1258 +
1287 1259 try:
  1260 + if priority < 0:
  1261 + raise HTTPException(status_code=400, detail="priority must be >= 0")
  1262 + effective_priority = _effective_priority(priority)
  1263 +
  1264 + effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize)
  1265 + urls: List[str] = []
  1266 + for i, url_or_path in enumerate(images):
  1267 + if not isinstance(url_or_path, str):
  1268 + raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path")
  1269 + s = url_or_path.strip()
  1270 + if not s:
  1271 + raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path")
  1272 + urls.append(s)
  1273 +
  1274 + cache_check_started = time.perf_counter()
  1275 + cache_only = _try_full_image_cache_hit(urls, effective_normalize)
  1276 + if cache_only is not None:
  1277 + latency_ms = (time.perf_counter() - cache_check_started) * 1000.0
  1278 + _image_stats.record_completed(
  1279 + success=True,
  1280 + latency_ms=latency_ms,
  1281 + backend_latency_ms=0.0,
  1282 + cache_hits=cache_only.cache_hits,
  1283 + cache_misses=0,
  1284 + )
  1285 + logger.info(
  1286 + "embed_image response | mode=cache-only priority=%s inputs=%d normalize=%s dim=%d cache_hits=%d cache_misses=0 first_vector=%s latency_ms=%.2f",
  1287 + _priority_label(effective_priority),
  1288 + len(urls),
  1289 + effective_normalize,
  1290 + len(cache_only.vectors[0]) if cache_only.vectors and cache_only.vectors[0] is not None else 0,
  1291 + cache_only.cache_hits,
  1292 + _preview_vector(cache_only.vectors[0] if cache_only.vectors else None),
  1293 + latency_ms,
  1294 + extra=build_request_log_extra(request_id, user_id),
  1295 + )
  1296 + return cache_only.vectors
  1297 +
  1298 + accepted, active = _image_request_limiter.try_acquire(bypass_limit=effective_priority > 0)
  1299 + if not accepted:
  1300 + _image_stats.record_rejected()
  1301 + logger.warning(
  1302 + "embed_image rejected | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
  1303 + _request_client(http_request),
  1304 + _priority_label(effective_priority),
  1305 + len(urls),
  1306 + effective_normalize,
  1307 + active,
  1308 + _IMAGE_MAX_INFLIGHT,
  1309 + _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS),
  1310 + extra=build_request_log_extra(request_id, user_id),
  1311 + )
  1312 + raise HTTPException(
  1313 + status_code=_OVERLOAD_STATUS_CODE,
  1314 + detail=(
  1315 + "Image embedding service busy for priority=0 requests: "
  1316 + f"active={active}, limit={_IMAGE_MAX_INFLIGHT}"
  1317 + ),
  1318 + )
  1319 + limiter_acquired = True
1288 1320 logger.info(
1289 1321 "embed_image request | client=%s priority=%s inputs=%d normalize=%s active=%d limit=%d preview=%s",
1290 1322 _request_client(http_request),
... ... @@ -1294,16 +1326,16 @@ async def embed_image(
1294 1326 active,
1295 1327 _IMAGE_MAX_INFLIGHT,
1296 1328 _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS),
1297   - extra=_request_log_extra(request_id),
  1329 + extra=build_request_log_extra(request_id, user_id),
1298 1330 )
1299 1331 verbose_logger.info(
1300 1332 "embed_image detail | payload=%s normalize=%s priority=%s",
1301 1333 urls,
1302 1334 effective_normalize,
1303 1335 _priority_label(effective_priority),
1304   - extra=_request_log_extra(request_id),
  1336 + extra=build_request_log_extra(request_id, user_id),
1305 1337 )
1306   - result = await run_in_threadpool(_embed_image_impl, urls, effective_normalize, request_id)
  1338 + result = await run_in_threadpool(_embed_image_impl, urls, effective_normalize, request_id, user_id)
1307 1339 success = True
1308 1340 backend_elapsed_ms = result.backend_elapsed_ms
1309 1341 cache_hits = result.cache_hits
... ... @@ -1327,7 +1359,7 @@ async def embed_image(
1327 1359 cache_misses,
1328 1360 _preview_vector(result.vectors[0] if result.vectors else None),
1329 1361 latency_ms,
1330   - extra=_request_log_extra(request_id),
  1362 + extra=build_request_log_extra(request_id, user_id),
1331 1363 )
1332 1364 verbose_logger.info(
1333 1365 "embed_image result detail | count=%d first_vector=%s latency_ms=%.2f",
... ... @@ -1336,7 +1368,7 @@ async def embed_image(
1336 1368 if result.vectors and result.vectors[0] is not None
1337 1369 else [],
1338 1370 latency_ms,
1339   - extra=_request_log_extra(request_id),
  1371 + extra=build_request_log_extra(request_id, user_id),
1340 1372 )
1341 1373 return result.vectors
1342 1374 except HTTPException:
... ... @@ -1358,15 +1390,17 @@ async def embed_image(
1358 1390 latency_ms,
1359 1391 e,
1360 1392 exc_info=True,
1361   - extra=_request_log_extra(request_id),
  1393 + extra=build_request_log_extra(request_id, user_id),
1362 1394 )
1363 1395 raise HTTPException(status_code=502, detail=f"Image embedding backend failure: {e}") from e
1364 1396 finally:
1365   - remaining = _image_request_limiter.release(success=success)
1366   - logger.info(
1367   - "embed_image finalize | success=%s priority=%s active_after=%d",
1368   - success,
1369   - _priority_label(effective_priority),
1370   - remaining,
1371   - extra=_request_log_extra(request_id),
1372   - )
  1397 + if limiter_acquired:
  1398 + remaining = _image_request_limiter.release(success=success)
  1399 + logger.info(
  1400 + "embed_image finalize | success=%s priority=%s active_after=%d",
  1401 + success,
  1402 + _priority_label(effective_priority),
  1403 + remaining,
  1404 + extra=build_request_log_extra(request_id, user_id),
  1405 + )
  1406 + reset_request_log_context(log_tokens)
... ...
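
The embed_text / embed_image bodies above are reorganized so that validation, the cache-only fast path, and limiter acquisition all happen inside `try`, the `finally` releases the limiter only when it was actually acquired (`limiter_acquired`), and the log-context tokens are always reset. A condensed, runnable sketch of that pattern; the limiter here is an illustrative stand-in, not the real _text_request_limiter:

    from request_log_context import bind_request_log_context, reset_request_log_context

    class _StubLimiter:
        """Illustrative stand-in for _text_request_limiter / _image_request_limiter."""
        def try_acquire(self) -> bool:
            return True
        def release(self, success: bool) -> None:
            pass

    _limiter = _StubLimiter()

    def handle(request_id: str, user_id: str, texts: list) -> list:
        _, _, log_tokens = bind_request_log_context(request_id, user_id)
        success = False
        limiter_acquired = False
        try:
            if not texts:
                raise ValueError("texts must not be empty")   # maps to HTTPException(400)
            if not _limiter.try_acquire():
                raise RuntimeError("service busy")            # maps to the overload response
            limiter_acquired = True
            result = [[0.0] for _ in texts]                   # stand-in for the backend call
            success = True
            return result
        finally:
            if limiter_acquired:                              # never release what was not acquired
                _limiter.release(success=success)
            reset_request_log_context(log_tokens)             # always unbind reqid/uid
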
embeddings/text_embedding_tei.py
... ... @@ -2,11 +2,14 @@
2 2  
3 3 from __future__ import annotations
4 4  
  5 +import logging
5 6 from typing import Any, List, Union
6 7  
7 8 import numpy as np
8 9 import requests
9 10  
  11 +logger = logging.getLogger(__name__)
  12 +
10 13  
11 14 class TEITextModel:
12 15 """
... ... @@ -18,12 +21,13 @@ class TEITextModel:
18 21 response: [[...], [...], ...]
19 22 """
20 23  
21   - def __init__(self, base_url: str, timeout_sec: int = 60):
  24 + def __init__(self, base_url: str, timeout_sec: int = 60, max_client_batch_size: int = 24):
22 25 if not base_url or not str(base_url).strip():
23 26 raise ValueError("TEI base_url must not be empty")
24 27 self.base_url = str(base_url).rstrip("/")
25 28 self.endpoint = f"{self.base_url}/embed"
26 29 self.timeout_sec = int(timeout_sec)
  30 + self.max_client_batch_size = max(1, int(max_client_batch_size))
27 31 self._health_check()
28 32  
29 33 def _health_check(self) -> None:
... ... @@ -72,16 +76,28 @@ class TEITextModel:
72 76 if not isinstance(t, str) or not t.strip():
73 77 raise ValueError(f"Invalid input text at index {i}: {t!r}")
74 78  
75   - response = requests.post(
76   - self.endpoint,
77   - json={"inputs": texts},
78   - timeout=self.timeout_sec,
79   - )
80   - response.raise_for_status()
81   - payload = response.json()
82   - vectors = self._parse_payload(payload, expected_len=len(texts))
83   - if normalize_embeddings:
84   - vectors = [self._normalize(vec) for vec in vectors]
  79 + if len(texts) > self.max_client_batch_size:
  80 + logger.info(
  81 + "TEI batch split | total_inputs=%d chunk_size=%d chunks=%d",
  82 + len(texts),
  83 + self.max_client_batch_size,
  84 + (len(texts) + self.max_client_batch_size - 1) // self.max_client_batch_size,
  85 + )
  86 +
  87 + vectors: List[np.ndarray] = []
  88 + for start in range(0, len(texts), self.max_client_batch_size):
  89 + batch = texts[start : start + self.max_client_batch_size]
  90 + response = requests.post(
  91 + self.endpoint,
  92 + json={"inputs": batch},
  93 + timeout=self.timeout_sec,
  94 + )
  95 + response.raise_for_status()
  96 + payload = response.json()
  97 + parsed = self._parse_payload(payload, expected_len=len(batch))
  98 + if normalize_embeddings:
  99 + parsed = [self._normalize(vec) for vec in parsed]
  100 + vectors.extend(parsed)
85 101 return np.array(vectors, dtype=object)
86 102  
87 103 def _parse_payload(self, payload: Any, expected_len: int) -> List[np.ndarray]:
... ...
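
The chunk count logged above is plain ceiling division: with 25 inputs and `max_client_batch_size=24`, encode issues two TEI requests (24 + 1), which is exactly what the new test asserts. A worked example of the split:

    max_client_batch_size = 24
    texts = [f"text-{i}" for i in range(25)]

    chunks = [
        texts[start : start + max_client_batch_size]
        for start in range(0, len(texts), max_client_batch_size)
    ]
    assert len(chunks) == (len(texts) + max_client_batch_size - 1) // max_client_batch_size == 2
    assert [len(chunk) for chunk in chunks] == [24, 1]
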
embeddings/text_encoder.py
... ... @@ -13,6 +13,7 @@ from config.loader import get_app_config
13 13 from config.services_config import get_embedding_text_base_url
14 14 from embeddings.cache_keys import build_text_cache_key
15 15 from embeddings.redis_embedding_cache import RedisEmbeddingCache
  16 +from request_log_context import build_downstream_request_headers, build_request_log_extra
16 17  
17 18  
18 19 class TextEmbeddingEncoder:
... ... @@ -40,6 +41,8 @@ class TextEmbeddingEncoder:
40 41 request_data: List[str],
41 42 normalize_embeddings: bool = True,
42 43 priority: int = 0,
  44 + request_id: Optional[str] = None,
  45 + user_id: Optional[str] = None,
43 46 ) -> List[Any]:
44 47 """
45 48 Call the embedding service API.
... ... @@ -50,6 +53,7 @@ class TextEmbeddingEncoder:
50 53 Returns:
51 54 List of embeddings (list[float]) or nulls (None), aligned to input order
52 55 """
  56 + response = None
53 57 try:
54 58 response = requests.post(
55 59 self.endpoint,
... ... @@ -58,12 +62,26 @@ class TextEmbeddingEncoder:
58 62 "priority": max(0, int(priority)),
59 63 },
60 64 json=request_data,
  65 + headers=build_downstream_request_headers(request_id=request_id, user_id=user_id),
61 66 timeout=60
62 67 )
63 68 response.raise_for_status()
64 69 return response.json()
65 70 except requests.exceptions.RequestException as e:
66   - logger.error(f"TextEmbeddingEncoder service request failed: {e}", exc_info=True)
  71 + body_preview = ""
  72 + if response is not None:
  73 + try:
  74 + body_preview = (response.text or "")[:300]
  75 + except Exception:
  76 + body_preview = ""
  77 + logger.error(
  78 + "TextEmbeddingEncoder service request failed | status=%s body=%s error=%s",
  79 + getattr(response, "status_code", "n/a"),
  80 + body_preview,
  81 + e,
  82 + exc_info=True,
  83 + extra=build_request_log_extra(request_id=request_id, user_id=user_id),
  84 + )
67 85 raise
68 86  
69 87 def encode(
... ... @@ -72,7 +90,9 @@ class TextEmbeddingEncoder:
72 90 normalize_embeddings: bool = True,
73 91 priority: int = 0,
74 92 device: str = 'cpu',
75   - batch_size: int = 32
  93 + batch_size: int = 32,
  94 + request_id: Optional[str] = None,
  95 + user_id: Optional[str] = None,
76 96 ) -> np.ndarray:
77 97 """
78 98 Encode text into embeddings via network service with Redis caching.
... ... @@ -113,6 +133,8 @@ class TextEmbeddingEncoder:
113 133 request_data,
114 134 normalize_embeddings=normalize_embeddings,
115 135 priority=priority,
  136 + request_id=request_id,
  137 + user_id=user_id,
116 138 )
117 139  
118 140 # Process response
... ...
query/query_parser.py
... ... @@ -301,7 +301,12 @@ class QueryParser:
301 301 log_debug("Submitting query vector generation")
302 302  
303 303 def _encode_query_vector() -> Optional[np.ndarray]:
304   - arr = self.text_encoder.encode([query_text], priority=1)
  304 + arr = self.text_encoder.encode(
  305 + [query_text],
  306 + priority=1,
  307 + request_id=(context.reqid if context else None),
  308 + user_id=(context.uid if context else None),
  309 + )
305 310 if arr is None or len(arr) == 0:
306 311 return None
307 312 vec = arr[0]
... ...
request_log_context.py 0 → 100644
... ... @@ -0,0 +1,107 @@
  1 +"""
  2 +Request-scoped reqid/uid for logging and downstream HTTP headers.
  3 +
  4 +Kept as a **top-level module** (not under ``utils/``) because ``utils/__init__.py``
  5 +pulls optional deps (e.g. sqlalchemy) that are not installed in ``.venv-embedding``.
  6 +Uvicorn ``--log-config`` and the embedding service must be able to import this module
  7 +without importing the full ``utils`` package.
  8 +"""
  9 +
  10 +from __future__ import annotations
  11 +
  12 +import logging
  13 +from contextvars import ContextVar, Token
  14 +from typing import Dict, Optional, Tuple
  15 +
  16 +_DEFAULT_REQUEST_ID = "-1"
  17 +_DEFAULT_USER_ID = "-1"
  18 +
  19 +_request_id_var: ContextVar[str] = ContextVar("request_log_reqid", default=_DEFAULT_REQUEST_ID)
  20 +_user_id_var: ContextVar[str] = ContextVar("request_log_uid", default=_DEFAULT_USER_ID)
  21 +
  22 +LOG_LINE_FORMAT = (
  23 + "%(asctime)s | reqid:%(reqid)s | uid:%(uid)s | %(levelname)-8s | %(name)s | %(message)s"
  24 +)
  25 +
  26 +
  27 +def _normalize_value(value: Optional[str], *, fallback: str) -> str:
  28 + text = str(value or "").strip()
  29 + return text[:64] if text else fallback
  30 +
  31 +
  32 +def bind_request_log_context(
  33 + request_id: Optional[str] = None,
  34 + user_id: Optional[str] = None,
  35 +) -> Tuple[str, str, Tuple[Token[str], Token[str]]]:
  36 + """Bind reqid/uid to contextvars for the current execution context."""
  37 + normalized_reqid = _normalize_value(request_id, fallback=_DEFAULT_REQUEST_ID)
  38 + normalized_uid = _normalize_value(user_id, fallback=_DEFAULT_USER_ID)
  39 + req_token = _request_id_var.set(normalized_reqid)
  40 + uid_token = _user_id_var.set(normalized_uid)
  41 + return normalized_reqid, normalized_uid, (req_token, uid_token)
  42 +
  43 +
  44 +def reset_request_log_context(tokens: Tuple[Token[str], Token[str]]) -> None:
  45 + """Reset reqid/uid contextvars back to their previous values."""
  46 + req_token, uid_token = tokens
  47 + _request_id_var.reset(req_token)
  48 + _user_id_var.reset(uid_token)
  49 +
  50 +
  51 +def current_request_log_context() -> Tuple[str, str]:
  52 + """Return the currently bound reqid/uid pair."""
  53 + return _request_id_var.get(), _user_id_var.get()
  54 +
  55 +
  56 +def build_request_log_extra(
  57 + request_id: Optional[str] = None,
  58 + user_id: Optional[str] = None,
  59 +) -> Dict[str, str]:
  60 + """Build logging extras, defaulting to the current bound context."""
  61 + current_reqid, current_uid = current_request_log_context()
  62 + return {
  63 + "reqid": _normalize_value(request_id, fallback=current_reqid),
  64 + "uid": _normalize_value(user_id, fallback=current_uid),
  65 + }
  66 +
  67 +
  68 +def build_downstream_request_headers(
  69 + request_id: Optional[str] = None,
  70 + user_id: Optional[str] = None,
  71 +) -> Dict[str, str]:
  72 + """Build headers for downstream service calls when request context exists."""
  73 + extra = build_request_log_extra(request_id=request_id, user_id=user_id)
  74 + if extra["reqid"] == _DEFAULT_REQUEST_ID and extra["uid"] == _DEFAULT_USER_ID:
  75 + return {}
  76 + headers = {"X-Request-ID": extra["reqid"]}
  77 + if extra["uid"]:
  78 + headers["X-User-ID"] = extra["uid"]
  79 + return headers
  80 +
  81 +
  82 +class RequestLogContextFilter(logging.Filter):
  83 + """Inject reqid/uid defaults into all log records."""
  84 +
  85 + def filter(self, record: logging.LogRecord) -> bool:
  86 + reqid = getattr(record, "reqid", None)
  87 + uid = getattr(record, "uid", None)
  88 +
  89 + if reqid is None or uid is None:
  90 + bound_reqid, bound_uid = current_request_log_context()
  91 + reqid = reqid if reqid is not None else bound_reqid
  92 + uid = uid if uid is not None else bound_uid
  93 +
  94 + if reqid == _DEFAULT_REQUEST_ID and uid == _DEFAULT_USER_ID:
  95 + try:
  96 + from context.request_context import get_current_request_context
  97 +
  98 + context = get_current_request_context()
  99 + except Exception:
  100 + context = None
  101 + if context is not None:
  102 + reqid = getattr(context, "reqid", None) or reqid
  103 + uid = getattr(context, "uid", None) or uid
  104 +
  105 + record.reqid = _normalize_value(reqid, fallback=_DEFAULT_REQUEST_ID)
  106 + record.uid = _normalize_value(uid, fallback=_DEFAULT_USER_ID)
  107 + return True
... ...
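
A usage sketch for the module as a whole: explicit arguments win, otherwise the values bound via bind_request_log_context are used, otherwise the "-1" defaults.

    from request_log_context import (
        bind_request_log_context,
        build_request_log_extra,
        current_request_log_context,
        reset_request_log_context,
    )

    assert build_request_log_extra() == {"reqid": "-1", "uid": "-1"}

    _, _, tokens = bind_request_log_context("req-abc", "user-1")
    assert current_request_log_context() == ("req-abc", "user-1")
    assert build_request_log_extra() == {"reqid": "req-abc", "uid": "user-1"}
    assert build_request_log_extra(request_id="req-override")["reqid"] == "req-override"

    reset_request_log_context(tokens)
    assert current_request_log_context() == ("-1", "-1")
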
tests/test_embedding_pipeline.py
... ... @@ -13,9 +13,11 @@ from config import (
13 13 )
14 14 from embeddings.text_encoder import TextEmbeddingEncoder
15 15 from embeddings.image_encoder import CLIPImageEncoder
  16 +from embeddings.text_embedding_tei import TEITextModel
16 17 from embeddings.bf16 import encode_embedding_for_redis
17 18 from embeddings.cache_keys import build_image_cache_key, build_text_cache_key
18 19 from query import QueryParser
  20 +from context.request_context import create_request_context, set_current_request_context, clear_current_request_context
19 21  
20 22  
21 23 class _FakeRedis:
... ... @@ -168,6 +170,30 @@ def test_text_embedding_encoder_cache_hit(monkeypatch):
168 170 assert np.allclose(out[1], np.array([0.3, 0.4], dtype=np.float32))
169 171  
170 172  
  173 +def test_text_embedding_encoder_forwards_request_headers(monkeypatch):
  174 + fake_cache = _FakeEmbeddingCache()
  175 + monkeypatch.setattr("embeddings.text_encoder.RedisEmbeddingCache", lambda **kwargs: fake_cache)
  176 +
  177 + captured = {}
  178 +
  179 + def _fake_post(url, json, timeout, **kwargs):
  180 + captured["headers"] = dict(kwargs.get("headers") or {})
  181 + return _FakeResponse([[0.1, 0.2]])
  182 +
  183 + monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post)
  184 +
  185 + context = create_request_context(reqid="req-ctx-1", uid="user-ctx-1")
  186 + set_current_request_context(context)
  187 + try:
  188 + encoder = TextEmbeddingEncoder(service_url="http://127.0.0.1:6005")
  189 + encoder.encode(["hello"])
  190 + finally:
  191 + clear_current_request_context()
  192 +
  193 + assert captured["headers"]["X-Request-ID"] == "req-ctx-1"
  194 + assert captured["headers"]["X-User-ID"] == "user-ctx-1"
  195 +
  196 +
171 197 def test_image_embedding_encoder_cache_hit(monkeypatch):
172 198 fake_cache = _FakeEmbeddingCache()
173 199 cached = np.array([0.5, 0.6], dtype=np.float32)
... ... @@ -234,3 +260,37 @@ def test_query_parser_skips_query_vector_when_disabled():
234 260  
235 261 parsed = parser.parse("red dress", tenant_id="162", generate_vector=False)
236 262 assert parsed.query_vector is None
  263 +
  264 +
  265 +def test_tei_text_model_splits_batches_over_client_limit(monkeypatch):
  266 + monkeypatch.setattr(TEITextModel, "_health_check", lambda self: None)
  267 + calls = []
  268 +
  269 + class _Response:
  270 + def __init__(self, payload):
  271 + self._payload = payload
  272 +
  273 + def raise_for_status(self):
  274 + return None
  275 +
  276 + def json(self):
  277 + return self._payload
  278 +
  279 + def _fake_post(url, json, timeout):
  280 + inputs = list(json["inputs"])
  281 + calls.append(inputs)
  282 + return _Response([[float(idx)] for idx, _ in enumerate(inputs, start=1)])
  283 +
  284 + monkeypatch.setattr("embeddings.text_embedding_tei.requests.post", _fake_post)
  285 +
  286 + model = TEITextModel(
  287 + base_url="http://127.0.0.1:8080",
  288 + timeout_sec=20,
  289 + max_client_batch_size=24,
  290 + )
  291 + vectors = model.encode([f"text-{idx}" for idx in range(25)], normalize_embeddings=False)
  292 +
  293 + assert len(calls) == 2
  294 + assert len(calls[0]) == 24
  295 + assert len(calls[1]) == 1
  296 + assert len(vectors) == 25
... ...
utils/logger.py
... ... @@ -14,6 +14,8 @@ from datetime import datetime
14 14 from typing import Any, Dict, Optional
15 15 from pathlib import Path
16 16  
  17 +from request_log_context import LOG_LINE_FORMAT, RequestLogContextFilter
  18 +
17 19  
18 20 class StructuredFormatter(logging.Formatter):
19 21 """Structured JSON formatter with request context support"""
... ... @@ -89,25 +91,6 @@ def _log_with_context(logger: logging.Logger, level: int, msg: str, **kwargs):
89 91 logging.setLogRecordFactory(old_factory)
90 92  
91 93  
92   -class RequestContextFilter(logging.Filter):
93   - """Filter that automatically injects request context from thread-local storage"""
94   -
95   - def filter(self, record: logging.LogRecord) -> bool:
96   - """Inject request context from thread-local storage"""
97   - try:
98   - # Import here to avoid circular imports
99   - from context.request_context import get_current_request_context
100   - context = get_current_request_context()
101   - if context:
102   - # Ensure every request-scoped log record carries reqid/uid.
103   - # If they are missing in the context, fall back to "-1".
104   - record.reqid = getattr(context, "reqid", None) or "-1"
105   - record.uid = getattr(context, "uid", None) or "-1"
106   - except (ImportError, AttributeError):
107   - pass
108   - return True
109   -
110   -
111 94 class ContextAwareConsoleFormatter(logging.Formatter):
112 95 """
113 96 Console formatter that injects reqid/uid into the log line.
... ... @@ -156,9 +139,7 @@ def setup_logging(
156 139  
157 140 # Create formatters
158 141 structured_formatter = StructuredFormatter()
159   - console_formatter = ContextAwareConsoleFormatter(
160   - '%(asctime)s | reqid:%(reqid)s | uid:%(uid)s | %(levelname)-8s | %(name)-15s | %(message)s'
161   - )
  142 + console_formatter = ContextAwareConsoleFormatter(LOG_LINE_FORMAT)
162 143  
163 144 # Add console handler
164 145 if enable_console:
... ...