service_registry.py
1.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
"""
Minimal service registry shared by multiple FastAPI apps.
We keep only ONE copy of indexer routes (api/routes/indexer.py) and inject
services via this registry, so the same routes can run in:
- Search API (6002): does NOT register indexer services, and does NOT mount indexer routes.
- Indexer API (6004): registers indexer services and mounts indexer routes.
"""
from __future__ import annotations
from typing import Any, Optional
# Module-level registry slots. Each one starts out as None (nothing
# registered yet) and is overwritten by the matching set_* function in this
# module; the get_* functions read them back.
_es_client: Optional[Any] = None  # written by set_es_client()
_incremental_service: Optional[Any] = None  # written by set_indexer_services()
_bulk_indexing_service: Optional[Any] = None  # written by set_indexer_services()
def set_es_client(es_client: Any) -> None:
    """Store *es_client* in the module-level registry.

    Any previously stored client is replaced. No validation is performed on
    the value, so passing ``None`` resets the registry to its
    "not initialized" state.
    """
    global _es_client

    _es_client = es_client
def get_es_client() -> Any:
    """Return the ES client currently held by the registry.

    Raises:
        RuntimeError: If the stored client is ``None``, i.e. no client has
            been registered (or it was reset to ``None``).
    """
    client = _es_client
    # Only ``None`` means "missing"; other falsy values are returned as-is.
    if client is not None:
        return client
    raise RuntimeError("ES client is not initialized")
def set_indexer_services(*, incremental_service: Any, bulk_indexing_service: Any) -> None:
    """Store both indexer services in the module-level registry.

    Both arguments are keyword-only and required. Previously stored values
    are replaced; no validation is performed on either value.
    """
    global _incremental_service, _bulk_indexing_service

    _incremental_service, _bulk_indexing_service = (
        incremental_service,
        bulk_indexing_service,
    )
def get_incremental_service() -> Optional[Any]:
    """Return the stored incremental service, or ``None`` if none is set.

    Unlike ``get_es_client``, this accessor never raises; callers must
    handle the ``None`` case themselves.
    """
    registered = _incremental_service
    return registered
def get_bulk_indexing_service() -> Optional[Any]:
    """Return the stored bulk indexing service, or ``None`` if none is set.

    Unlike ``get_es_client``, this accessor never raises; callers must
    handle the ``None`` case themselves.
    """
    registered = _bulk_indexing_service
    return registered