# service_registry.py 1.18 KB
"""
Minimal service registry shared by multiple FastAPI apps.

We keep only ONE copy of indexer routes (api/routes/indexer.py) and inject
services via this registry, so the same routes can run in:
- Search API (6002): does NOT register indexer services, and does NOT mount indexer routes.
- Indexer API (6004): registers indexer services and mounts indexer routes.
"""

from __future__ import annotations

from typing import Any, Optional


# Module-level registry slots. Each one starts out as None and is rebound by
# set_es_client / set_indexer_services in this module.
_es_client: Optional[Any] = None  # read through get_es_client(), which raises while this is None
_incremental_service: Optional[Any] = None  # read through get_incremental_service()
_bulk_indexing_service: Optional[Any] = None  # read through get_bulk_indexing_service()


def set_es_client(es_client: Any) -> None:
    """Store ``es_client`` in the module-level registry.

    The value is stored as given (no validation) and replaces whatever was
    stored before. Passing ``None`` puts the slot back into the state in
    which ``get_es_client`` raises.
    """
    global _es_client
    _es_client = es_client


def get_es_client() -> Any:
    """Return the ES client held in the module-level registry.

    Returns:
        The object most recently stored with ``set_es_client``.

    Raises:
        RuntimeError: If the registry slot is still ``None``.
    """
    client = _es_client
    if client is not None:
        return client
    # Fail loudly instead of handing ``None`` back to the caller.
    raise RuntimeError("ES client is not initialized")


def set_indexer_services(*, incremental_service: Any, bulk_indexing_service: Any) -> None:
    """Store both indexer services in the module-level registry.

    Both arguments are keyword-only. Each value is stored as given (no
    validation) and replaces whatever was stored in its slot before.

    Args:
        incremental_service: Object later returned by ``get_incremental_service``.
        bulk_indexing_service: Object later returned by ``get_bulk_indexing_service``.
    """
    global _incremental_service
    global _bulk_indexing_service
    _incremental_service, _bulk_indexing_service = (
        incremental_service,
        bulk_indexing_service,
    )


def get_incremental_service() -> Optional[Any]:
    """Return the incremental service held in the registry, or ``None``.

    Unlike ``get_es_client``, this does not raise when the slot is empty:
    it returns ``None`` until ``set_indexer_services`` has stored a value,
    so callers have to handle the ``None`` case themselves.
    """
    return _incremental_service


def get_bulk_indexing_service() -> Optional[Any]:
    """Return the bulk indexing service held in the registry, or ``None``.

    Unlike ``get_es_client``, this does not raise when the slot is empty:
    it returns ``None`` until ``set_indexer_services`` has stored a value,
    so callers have to handle the ``None`` case themselves.
    """
    return _bulk_indexing_service