""" Unified application configuration loader. This module is the single source of truth for loading, merging, normalizing, and validating application configuration. """ from __future__ import annotations import hashlib import json import os from copy import deepcopy from dataclasses import asdict from functools import lru_cache from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple import yaml try: from dotenv import load_dotenv as _load_dotenv # type: ignore except Exception: # pragma: no cover _load_dotenv = None from config.schema import ( AppConfig, AssetsConfig, ConfigMetadata, DatabaseSettings, ElasticsearchSettings, EmbeddingServiceConfig, FunctionScoreConfig, IndexConfig, InfrastructureConfig, QueryConfig, ProductEnrichConfig, RedisSettings, RerankConfig, RerankFusionConfig, RerankServiceConfig, RuntimeConfig, SearchConfig, SecretsConfig, ServicesConfig, SPUConfig, TenantCatalogConfig, TranslationServiceConfig, ) from translation.settings import build_translation_config class ConfigurationError(Exception): """Raised when configuration validation fails.""" def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]: result = deepcopy(base) for key, value in (override or {}).items(): if ( key in result and isinstance(result[key], dict) and isinstance(value, dict) ): result[key] = _deep_merge(result[key], value) else: result[key] = deepcopy(value) return result def _load_yaml(path: Path) -> Dict[str, Any]: with open(path, "r", encoding="utf-8") as handle: data = yaml.safe_load(handle) or {} if not isinstance(data, dict): raise ConfigurationError(f"Configuration file root must be a mapping: {path}") return data def _read_rewrite_dictionary(path: Path) -> Dict[str, str]: rewrite_dict: Dict[str, str] = {} if not path.exists(): return rewrite_dict with open(path, "r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#"): continue parts = line.split("\t") if len(parts) < 2: continue original = parts[0].strip() replacement = parts[1].strip() if original and replacement: rewrite_dict[original] = replacement return rewrite_dict def _read_synonym_csv_dictionary(path: Path) -> List[List[str]]: rows: List[List[str]] = [] if not path.exists(): return rows with open(path, "r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#"): continue parts = [segment.strip() for segment in line.split(",")] normalized = [segment for segment in parts if segment] if normalized: rows.append(normalized) return rows def _read_product_title_exclusion_dictionary(path: Path) -> List[Dict[str, List[str]]]: rules: List[Dict[str, List[str]]] = [] if not path.exists(): return rules with open(path, "r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#"): continue parts = [segment.strip() for segment in line.split("\t")] if len(parts) != 4: continue def _split_cell(cell: str) -> List[str]: return [item.strip() for item in cell.split(",") if item.strip()] rules.append( { "zh_trigger_terms": _split_cell(parts[0]), "en_trigger_terms": _split_cell(parts[1]), "zh_title_exclusions": _split_cell(parts[2]), "en_title_exclusions": _split_cell(parts[3]), } ) return rules _DEFAULT_STYLE_INTENT_DIMENSION_ALIASES: Dict[str, List[str]] = { "color": ["color", "colors", "colour", "colours", "颜色", "色", "色系"], "size": ["size", "sizes", "sizing", "尺码", "尺寸", "码数", "号码", "码"], } class AppConfigLoader: """Load the unified application configuration.""" def __init__( self, *, config_dir: Optional[Path] = None, config_file: Optional[Path] = None, env_file: Optional[Path] = None, ) -> None: self.config_dir = Path(config_dir or Path(__file__).parent) self.config_file = Path(config_file) if config_file is not None else None self.project_root = self.config_dir.parent self.env_file = Path(env_file) if env_file is not None else self.project_root / ".env" def load(self, validate: bool = True) -> AppConfig: self._load_env() raw_config, loaded_files = self._load_raw_config() app_config = self._build_app_config(raw_config, loaded_files) if validate: self._validate(app_config) return app_config def _load_env(self) -> None: if _load_dotenv is not None: _load_dotenv(self.env_file, override=False) return _load_env_file_fallback(self.env_file) def _load_raw_config(self) -> Tuple[Dict[str, Any], List[str]]: env_name = (os.getenv("APP_ENV") or os.getenv("RUNTIME_ENV") or "prod").strip().lower() or "prod" loaded_files: List[str] = [] raw: Dict[str, Any] = {} if self.config_file is not None: config_path = self.config_file if not config_path.exists(): raise ConfigurationError(f"Configuration file not found: {config_path}") raw = _deep_merge(raw, _load_yaml(config_path)) loaded_files.append(str(config_path)) else: base_path = self.config_dir / "base.yaml" legacy_path = self.config_dir / "config.yaml" primary_path = base_path if base_path.exists() else legacy_path if not primary_path.exists(): raise ConfigurationError(f"Configuration file not found: {primary_path}") raw = _deep_merge(raw, _load_yaml(primary_path)) loaded_files.append(str(primary_path)) env_path = self.config_dir / "environments" / f"{env_name}.yaml" if env_path.exists(): raw = _deep_merge(raw, _load_yaml(env_path)) loaded_files.append(str(env_path)) tenant_dir = self.config_dir / "tenants" if tenant_dir.is_dir(): tenant_files = sorted(tenant_dir.glob("*.yaml")) if tenant_files: tenant_config = {"default": {}, "tenants": {}} default_path = tenant_dir / "_default.yaml" if default_path.exists(): tenant_config["default"] = _load_yaml(default_path) loaded_files.append(str(default_path)) for tenant_path in tenant_files: if tenant_path.name == "_default.yaml": continue tenant_config["tenants"][tenant_path.stem] = _load_yaml(tenant_path) loaded_files.append(str(tenant_path)) raw["tenant_config"] = tenant_config return raw, loaded_files def _build_app_config(self, raw: Dict[str, Any], loaded_files: List[str]) -> AppConfig: assets_cfg = raw.get("assets") if isinstance(raw.get("assets"), dict) else {} rewrite_path = ( assets_cfg.get("query_rewrite_dictionary_path") or assets_cfg.get("rewrite_dictionary_path") or self.config_dir / "dictionaries" / "query_rewrite.dict" ) rewrite_path = Path(rewrite_path) if not rewrite_path.is_absolute(): rewrite_path = (self.project_root / rewrite_path).resolve() if not rewrite_path.exists(): legacy_rewrite_path = (self.config_dir / "query_rewrite.dict").resolve() if legacy_rewrite_path.exists(): rewrite_path = legacy_rewrite_path rewrite_dictionary = _read_rewrite_dictionary(rewrite_path) search_config = self._build_search_config(raw, rewrite_dictionary) services_config = self._build_services_config(raw.get("services") or {}) tenants_config = self._build_tenants_config(raw.get("tenant_config") or {}) runtime_config = self._build_runtime_config() infrastructure_config = self._build_infrastructure_config(runtime_config.environment) product_enrich_raw = raw.get("product_enrich") if isinstance(raw.get("product_enrich"), dict) else {} product_enrich_config = ProductEnrichConfig( max_workers=int(product_enrich_raw.get("max_workers", 40)), ) metadata = ConfigMetadata( loaded_files=tuple(loaded_files), config_hash="", deprecated_keys=tuple(self._detect_deprecated_keys(raw)), ) app_config = AppConfig( runtime=runtime_config, infrastructure=infrastructure_config, product_enrich=product_enrich_config, search=search_config, services=services_config, tenants=tenants_config, assets=AssetsConfig(query_rewrite_dictionary_path=rewrite_path), metadata=metadata, ) config_hash = self._compute_hash(app_config) return AppConfig( runtime=app_config.runtime, infrastructure=app_config.infrastructure, product_enrich=app_config.product_enrich, search=app_config.search, services=app_config.services, tenants=app_config.tenants, assets=app_config.assets, metadata=ConfigMetadata( loaded_files=app_config.metadata.loaded_files, config_hash=config_hash, deprecated_keys=app_config.metadata.deprecated_keys, ), ) def _build_search_config(self, raw: Dict[str, Any], rewrite_dictionary: Dict[str, str]) -> SearchConfig: field_boosts = raw.get("field_boosts") or {} if not isinstance(field_boosts, dict): raise ConfigurationError("field_boosts must be a mapping") indexes: List[IndexConfig] = [] for item in raw.get("indexes") or []: if not isinstance(item, dict): raise ConfigurationError("indexes items must be mappings") indexes.append( IndexConfig( name=str(item["name"]), label=str(item.get("label") or item["name"]), fields=list(item.get("fields") or []), boost=float(item.get("boost", 1.0)), example=item.get("example"), ) ) query_cfg = raw.get("query_config") if isinstance(raw.get("query_config"), dict) else {} search_fields = query_cfg.get("search_fields") if isinstance(query_cfg.get("search_fields"), dict) else {} text_strategy = ( query_cfg.get("text_query_strategy") if isinstance(query_cfg.get("text_query_strategy"), dict) else {} ) style_intent_cfg = ( query_cfg.get("style_intent") if isinstance(query_cfg.get("style_intent"), dict) else {} ) product_title_exclusion_cfg = ( query_cfg.get("product_title_exclusion") if isinstance(query_cfg.get("product_title_exclusion"), dict) else {} ) def _resolve_project_path(value: Any, default_path: Path) -> Path: if value in (None, ""): return default_path candidate = Path(str(value)) if candidate.is_absolute(): return candidate return self.project_root / candidate style_color_path = _resolve_project_path( style_intent_cfg.get("color_dictionary_path"), self.config_dir / "dictionaries" / "style_intent_color.csv", ) style_size_path = _resolve_project_path( style_intent_cfg.get("size_dictionary_path"), self.config_dir / "dictionaries" / "style_intent_size.csv", ) configured_dimension_aliases = ( style_intent_cfg.get("dimension_aliases") if isinstance(style_intent_cfg.get("dimension_aliases"), dict) else {} ) style_dimension_aliases: Dict[str, List[str]] = {} for intent_type, default_aliases in _DEFAULT_STYLE_INTENT_DIMENSION_ALIASES.items(): aliases = configured_dimension_aliases.get(intent_type) if isinstance(aliases, list) and aliases: style_dimension_aliases[intent_type] = [str(alias) for alias in aliases if str(alias).strip()] else: style_dimension_aliases[intent_type] = list(default_aliases) style_intent_terms = { "color": _read_synonym_csv_dictionary(style_color_path), "size": _read_synonym_csv_dictionary(style_size_path), } product_title_exclusion_path = _resolve_project_path( product_title_exclusion_cfg.get("dictionary_path"), self.config_dir / "dictionaries" / "product_title_exclusion.tsv", ) query_config = QueryConfig( supported_languages=list(query_cfg.get("supported_languages") or ["zh", "en"]), default_language=str(query_cfg.get("default_language") or "en"), enable_text_embedding=bool(query_cfg.get("enable_text_embedding", True)), enable_query_rewrite=bool(query_cfg.get("enable_query_rewrite", True)), rewrite_dictionary=rewrite_dictionary, text_embedding_field=query_cfg.get("text_embedding_field"), image_embedding_field=query_cfg.get("image_embedding_field"), source_fields=query_cfg.get("source_fields"), knn_boost=float(query_cfg.get("knn_boost", 0.25)), multilingual_fields=list( search_fields.get( "multilingual_fields", [], ) ), shared_fields=list( search_fields.get( "shared_fields", [], ) or [] ), core_multilingual_fields=list( search_fields.get( "core_multilingual_fields", [], ) ), base_minimum_should_match=str(text_strategy.get("base_minimum_should_match", "70%")), translation_minimum_should_match=str(text_strategy.get("translation_minimum_should_match", "70%")), translation_boost=float(text_strategy.get("translation_boost", 0.4)), tie_breaker_base_query=float(text_strategy.get("tie_breaker_base_query", 0.9)), best_fields={ str(field): float(boost) for field, boost in dict(text_strategy.get("best_fields") or {}).items() }, best_fields_boost=float(text_strategy.get("best_fields_boost", 2.0)), phrase_fields={ str(field): float(boost) for field, boost in dict(text_strategy.get("phrase_fields") or {}).items() }, phrase_match_boost=float(text_strategy.get("phrase_match_boost", 3.0)), zh_to_en_model=str(query_cfg.get("zh_to_en_model") or "opus-mt-zh-en"), en_to_zh_model=str(query_cfg.get("en_to_zh_model") or "opus-mt-en-zh"), default_translation_model=str( query_cfg.get("default_translation_model") or "nllb-200-distilled-600m" ), zh_to_en_model_source_not_in_index=( str(v) if (v := query_cfg.get("zh_to_en_model__source_not_in_index")) not in (None, "") else None ), en_to_zh_model_source_not_in_index=( str(v) if (v := query_cfg.get("en_to_zh_model__source_not_in_index")) not in (None, "") else None ), default_translation_model_source_not_in_index=( str(v) if (v := query_cfg.get("default_translation_model__source_not_in_index")) not in (None, "") else None ), translation_embedding_wait_budget_ms_source_in_index=int( query_cfg.get("translation_embedding_wait_budget_ms_source_in_index", 80) ), translation_embedding_wait_budget_ms_source_not_in_index=int( query_cfg.get("translation_embedding_wait_budget_ms_source_not_in_index", 200) ), style_intent_enabled=bool(style_intent_cfg.get("enabled", True)), style_intent_terms=style_intent_terms, style_intent_dimension_aliases=style_dimension_aliases, product_title_exclusion_enabled=bool(product_title_exclusion_cfg.get("enabled", True)), product_title_exclusion_rules=_read_product_title_exclusion_dictionary( product_title_exclusion_path ), ) function_score_cfg = raw.get("function_score") if isinstance(raw.get("function_score"), dict) else {} rerank_cfg = raw.get("rerank") if isinstance(raw.get("rerank"), dict) else {} fusion_raw = rerank_cfg.get("fusion") if isinstance(rerank_cfg.get("fusion"), dict) else {} spu_cfg = raw.get("spu_config") if isinstance(raw.get("spu_config"), dict) else {} return SearchConfig( field_boosts={str(key): float(value) for key, value in field_boosts.items()}, indexes=indexes, query_config=query_config, function_score=FunctionScoreConfig( score_mode=str(function_score_cfg.get("score_mode") or "sum"), boost_mode=str(function_score_cfg.get("boost_mode") or "multiply"), functions=list(function_score_cfg.get("functions") or []), ), rerank=RerankConfig( enabled=bool(rerank_cfg.get("enabled", True)), rerank_window=int(rerank_cfg.get("rerank_window", 384)), timeout_sec=float(rerank_cfg.get("timeout_sec", 15.0)), weight_es=float(rerank_cfg.get("weight_es", 0.4)), weight_ai=float(rerank_cfg.get("weight_ai", 0.6)), rerank_query_template=str(rerank_cfg.get("rerank_query_template") or "{query}"), rerank_doc_template=str(rerank_cfg.get("rerank_doc_template") or "{title}"), fusion=RerankFusionConfig( rerank_bias=float(fusion_raw.get("rerank_bias", 0.00001)), rerank_exponent=float(fusion_raw.get("rerank_exponent", 1.0)), text_bias=float(fusion_raw.get("text_bias", 0.1)), text_exponent=float(fusion_raw.get("text_exponent", 0.35)), knn_bias=float(fusion_raw.get("knn_bias", 0.6)), knn_exponent=float(fusion_raw.get("knn_exponent", 0.2)), ), ), spu_config=SPUConfig( enabled=bool(spu_cfg.get("enabled", False)), spu_field=spu_cfg.get("spu_field"), inner_hits_size=int(spu_cfg.get("inner_hits_size", 3)), searchable_option_dimensions=list( spu_cfg.get("searchable_option_dimensions") or ["option1", "option2", "option3"] ), ), es_index_name=str(raw.get("es_index_name") or "search_products"), es_settings=dict(raw.get("es_settings") or {}), ) def _build_services_config(self, raw: Dict[str, Any]) -> ServicesConfig: if not isinstance(raw, dict): raise ConfigurationError("services must be a mapping") translation_raw = raw.get("translation") if isinstance(raw.get("translation"), dict) else {} normalized_translation = build_translation_config(translation_raw) translation_config = TranslationServiceConfig( endpoint=str(normalized_translation["service_url"]).rstrip("/"), timeout_sec=float(normalized_translation["timeout_sec"]), default_model=str(normalized_translation["default_model"]), default_scene=str(normalized_translation["default_scene"]), cache=dict(normalized_translation["cache"]), capabilities={str(key): dict(value) for key, value in normalized_translation["capabilities"].items()}, ) embedding_raw = raw.get("embedding") if isinstance(raw.get("embedding"), dict) else {} embedding_provider = str(embedding_raw.get("provider") or "http").strip().lower() embedding_providers = dict(embedding_raw.get("providers") or {}) if embedding_provider not in embedding_providers: raise ConfigurationError(f"services.embedding.providers.{embedding_provider} must be configured") embedding_backend = str(embedding_raw.get("backend") or "").strip().lower() embedding_backends = { str(key).strip().lower(): dict(value) for key, value in dict(embedding_raw.get("backends") or {}).items() } if embedding_backend not in embedding_backends: raise ConfigurationError(f"services.embedding.backends.{embedding_backend} must be configured") image_backend = str(embedding_raw.get("image_backend") or "clip_as_service").strip().lower() image_backends = { str(key).strip().lower(): dict(value) for key, value in dict(embedding_raw.get("image_backends") or {}).items() } if not image_backends: image_backends = { "clip_as_service": { "server": "grpc://127.0.0.1:51000", "model_name": "CN-CLIP/ViT-L-14", "batch_size": 8, "normalize_embeddings": True, }, "local_cnclip": { "model_name": "ViT-L-14", "device": None, "batch_size": 8, "normalize_embeddings": True, }, } if image_backend not in image_backends: raise ConfigurationError(f"services.embedding.image_backends.{image_backend} must be configured") embedding_config = EmbeddingServiceConfig( provider=embedding_provider, providers=embedding_providers, backend=embedding_backend, backends=embedding_backends, image_backend=image_backend, image_backends=image_backends, ) rerank_raw = raw.get("rerank") if isinstance(raw.get("rerank"), dict) else {} rerank_provider = str(rerank_raw.get("provider") or "http").strip().lower() rerank_providers = dict(rerank_raw.get("providers") or {}) if rerank_provider not in rerank_providers: raise ConfigurationError(f"services.rerank.providers.{rerank_provider} must be configured") rerank_backend = str(rerank_raw.get("backend") or "").strip().lower() rerank_backends = { str(key).strip().lower(): dict(value) for key, value in dict(rerank_raw.get("backends") or {}).items() } if rerank_backend not in rerank_backends: raise ConfigurationError(f"services.rerank.backends.{rerank_backend} must be configured") rerank_request = dict(rerank_raw.get("request") or {}) rerank_request.setdefault("max_docs", 1000) rerank_request.setdefault("normalize", True) rerank_config = RerankServiceConfig( provider=rerank_provider, providers=rerank_providers, backend=rerank_backend, backends=rerank_backends, request=rerank_request, ) return ServicesConfig( translation=translation_config, embedding=embedding_config, rerank=rerank_config, ) def _build_tenants_config(self, raw: Dict[str, Any]) -> TenantCatalogConfig: if not isinstance(raw, dict): raise ConfigurationError("tenant_config must be a mapping") default_cfg = raw.get("default") if isinstance(raw.get("default"), dict) else {} tenants_cfg = raw.get("tenants") if isinstance(raw.get("tenants"), dict) else {} return TenantCatalogConfig( default=dict(default_cfg), tenants={str(key): dict(value) for key, value in tenants_cfg.items()}, ) def _build_runtime_config(self) -> RuntimeConfig: environment = (os.getenv("APP_ENV") or os.getenv("RUNTIME_ENV") or "prod").strip().lower() or "prod" namespace = os.getenv("ES_INDEX_NAMESPACE") if namespace is None: namespace = "" if environment == "prod" else f"{environment}_" return RuntimeConfig( environment=environment, index_namespace=namespace, api_host=os.getenv("API_HOST", "0.0.0.0"), api_port=int(os.getenv("API_PORT", 6002)), indexer_host=os.getenv("INDEXER_HOST", "0.0.0.0"), indexer_port=int(os.getenv("INDEXER_PORT", 6004)), embedding_host=os.getenv("EMBEDDING_HOST", "0.0.0.0"), embedding_port=int(os.getenv("EMBEDDING_PORT", 6005)), embedding_text_port=int(os.getenv("EMBEDDING_TEXT_PORT", 6005)), embedding_image_port=int(os.getenv("EMBEDDING_IMAGE_PORT", 6008)), translator_host=os.getenv("TRANSLATION_HOST", "127.0.0.1"), translator_port=int(os.getenv("TRANSLATION_PORT", 6006)), reranker_host=os.getenv("RERANKER_HOST", "127.0.0.1"), reranker_port=int(os.getenv("RERANKER_PORT", 6007)), ) def _build_infrastructure_config(self, environment: str) -> InfrastructureConfig: del environment return InfrastructureConfig( elasticsearch=ElasticsearchSettings( host=os.getenv("ES_HOST", "http://localhost:9200"), username=os.getenv("ES_USERNAME"), password=os.getenv("ES_PASSWORD"), ), redis=RedisSettings( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", 6479)), snapshot_db=int(os.getenv("REDIS_SNAPSHOT_DB", 0)), password=os.getenv("REDIS_PASSWORD"), socket_timeout=int(os.getenv("REDIS_SOCKET_TIMEOUT", 1)), socket_connect_timeout=int(os.getenv("REDIS_SOCKET_CONNECT_TIMEOUT", 1)), retry_on_timeout=os.getenv("REDIS_RETRY_ON_TIMEOUT", "false").strip().lower() == "true", cache_expire_days=int(os.getenv("REDIS_CACHE_EXPIRE_DAYS", 360 * 2)), embedding_cache_prefix=os.getenv("REDIS_EMBEDDING_CACHE_PREFIX", "embedding"), anchor_cache_prefix=os.getenv("REDIS_ANCHOR_CACHE_PREFIX", "product_anchors"), anchor_cache_expire_days=int(os.getenv("REDIS_ANCHOR_CACHE_EXPIRE_DAYS", 30)), ), database=DatabaseSettings( host=os.getenv("DB_HOST"), port=int(os.getenv("DB_PORT", 3306)) if os.getenv("DB_PORT") else 3306, database=os.getenv("DB_DATABASE"), username=os.getenv("DB_USERNAME"), password=os.getenv("DB_PASSWORD"), ), secrets=SecretsConfig( dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"), deepl_auth_key=os.getenv("DEEPL_AUTH_KEY"), ), ) def _validate(self, app_config: AppConfig) -> None: errors: List[str] = [] if not app_config.search.es_index_name: errors.append("search.es_index_name is required") if not app_config.search.field_boosts: errors.append("search.field_boosts cannot be empty") else: for field_name, boost in app_config.search.field_boosts.items(): if boost < 0: errors.append(f"field_boosts.{field_name} must be non-negative") query_config = app_config.search.query_config if not query_config.supported_languages: errors.append("query_config.supported_languages must not be empty") if query_config.default_language not in query_config.supported_languages: errors.append("query_config.default_language must be included in supported_languages") for name, values in ( ("multilingual_fields", query_config.multilingual_fields), ("core_multilingual_fields", query_config.core_multilingual_fields), ): if not values: errors.append(f"query_config.{name} must not be empty") if not set(query_config.core_multilingual_fields).issubset(set(query_config.multilingual_fields)): errors.append("query_config.core_multilingual_fields must be a subset of multilingual_fields") if app_config.search.spu_config.enabled and not app_config.search.spu_config.spu_field: errors.append("spu_config.spu_field is required when spu_config.enabled is true") if not app_config.tenants.default or not app_config.tenants.default.get("index_languages"): errors.append("tenant_config.default.index_languages must be configured") if app_config.metadata.deprecated_keys: errors.append( "Deprecated tenant config keys are not supported: " + ", ".join(app_config.metadata.deprecated_keys) ) embedding_provider_cfg = app_config.services.embedding.get_provider_config() if not embedding_provider_cfg.get("text_base_url"): errors.append("services.embedding.providers..text_base_url is required") if not embedding_provider_cfg.get("image_base_url"): errors.append("services.embedding.providers..image_base_url is required") rerank_provider_cfg = app_config.services.rerank.get_provider_config() if not rerank_provider_cfg.get("service_url") and not rerank_provider_cfg.get("base_url"): errors.append("services.rerank.providers..service_url or base_url is required") if errors: raise ConfigurationError("Configuration validation failed:\n" + "\n".join(f" - {err}" for err in errors)) def _compute_hash(self, app_config: AppConfig) -> str: payload = asdict(app_config) payload["metadata"]["config_hash"] = "" payload["infrastructure"]["elasticsearch"]["password"] = "***" if payload["infrastructure"]["elasticsearch"].get("password") else None payload["infrastructure"]["database"]["password"] = "***" if payload["infrastructure"]["database"].get("password") else None payload["infrastructure"]["redis"]["password"] = "***" if payload["infrastructure"]["redis"].get("password") else None payload["infrastructure"]["secrets"]["dashscope_api_key"] = "***" if payload["infrastructure"]["secrets"].get("dashscope_api_key") else None payload["infrastructure"]["secrets"]["deepl_auth_key"] = "***" if payload["infrastructure"]["secrets"].get("deepl_auth_key") else None blob = json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str) return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16] def _detect_deprecated_keys(self, raw: Dict[str, Any]) -> Iterable[str]: # Translation-era legacy flags have been removed; keep the hook for future # deprecations, but currently no deprecated keys are detected. return () @lru_cache(maxsize=1) def get_app_config() -> AppConfig: """Return the process-global application configuration.""" return AppConfigLoader().load() def reload_app_config() -> AppConfig: """Clear the cached configuration and reload it.""" get_app_config.cache_clear() return get_app_config() def _load_env_file_fallback(path: Path) -> None: if not path.exists(): return with open(path, "r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = value