""" Unified application configuration loader. This module is the single source of truth for loading, merging, normalizing, and validating application configuration. """ from __future__ import annotations import hashlib import json import os import csv from copy import deepcopy from dataclasses import asdict from functools import lru_cache from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple import yaml try: from dotenv import load_dotenv as _load_dotenv # type: ignore except Exception: # pragma: no cover _load_dotenv = None from config.schema import ( AppConfig, AssetsConfig, CoarseRankConfig, CoarseRankFusionConfig, ConfigMetadata, DatabaseSettings, ElasticsearchSettings, EmbeddingServiceConfig, FineRankConfig, FunctionScoreConfig, IndexConfig, InfrastructureConfig, QueryConfig, RedisSettings, RerankConfig, RerankFusionConfig, RerankServiceConfig, RerankServiceInstanceConfig, RuntimeConfig, SearchConfig, SearchEvaluationConfig, SecretsConfig, ServicesConfig, SPUConfig, TenantCatalogConfig, TranslationServiceConfig, ) from translation.settings import build_translation_config class ConfigurationError(Exception): """Raised when configuration validation fails.""" def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]: result = deepcopy(base) for key, value in (override or {}).items(): if ( key in result and isinstance(result[key], dict) and isinstance(value, dict) ): result[key] = _deep_merge(result[key], value) else: result[key] = deepcopy(value) return result def _load_yaml(path: Path) -> Dict[str, Any]: with open(path, "r", encoding="utf-8") as handle: data = yaml.safe_load(handle) or {} if not isinstance(data, dict): raise ConfigurationError(f"Configuration file root must be a mapping: {path}") return data def _read_rewrite_dictionary(path: Path) -> Dict[str, str]: rewrite_dict: Dict[str, str] = {} if not path.exists(): return rewrite_dict with open(path, "r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#"): continue parts = line.split("\t") if len(parts) < 2: continue original = parts[0].strip() replacement = parts[1].strip() if original and replacement: rewrite_dict[original] = replacement return rewrite_dict def _read_synonym_csv_dictionary(path: Path) -> List[Dict[str, List[str]]]: rows: List[Dict[str, List[str]]] = [] if not path.exists(): return rows def _split_terms(cell: str) -> List[str]: return [item.strip() for item in str(cell or "").split(",") if item.strip()] with open(path, "r", encoding="utf-8") as handle: reader = csv.reader(handle) for parts in reader: if not parts: continue if parts[0].strip().startswith("#"): continue normalized = [segment.strip() for segment in parts] if len(normalized) < 3: continue row = { "en_terms": _split_terms(normalized[0]), "zh_terms": _split_terms(normalized[1]), "attribute_terms": _split_terms(normalized[2]), } if any(row.values()): rows.append(row) return rows def _read_product_title_exclusion_dictionary(path: Path) -> List[Dict[str, List[str]]]: rules: List[Dict[str, List[str]]] = [] if not path.exists(): return rules with open(path, "r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#"): continue parts = [segment.strip() for segment in line.split("\t")] if len(parts) != 4: continue def _split_cell(cell: str) -> List[str]: return [item.strip() for item in cell.split(",") if item.strip()] rules.append( { "zh_trigger_terms": _split_cell(parts[0]), "en_trigger_terms": _split_cell(parts[1]), "zh_title_exclusions": _split_cell(parts[2]), "en_title_exclusions": _split_cell(parts[3]), } ) return rules _DEFAULT_STYLE_INTENT_DIMENSION_ALIASES: Dict[str, List[str]] = { "color": ["color", "colors", "colour", "colours", "颜色", "色", "色系"], "size": ["size", "sizes", "sizing", "尺码", "尺寸", "码数", "号码", "码"], } class AppConfigLoader: """Load the unified application configuration.""" def __init__( self, *, config_dir: Optional[Path] = None, config_file: Optional[Path] = None, env_file: Optional[Path] = None, ) -> None: self.config_dir = Path(config_dir or Path(__file__).parent) self.config_file = Path(config_file) if config_file is not None else None self.project_root = self.config_dir.parent self.env_file = Path(env_file) if env_file is not None else self.project_root / ".env" def load(self, validate: bool = True) -> AppConfig: self._load_env() raw_config, loaded_files = self._load_raw_config() app_config = self._build_app_config(raw_config, loaded_files) if validate: self._validate(app_config) return app_config def _load_env(self) -> None: if _load_dotenv is not None: _load_dotenv(self.env_file, override=False) return _load_env_file_fallback(self.env_file) def _load_raw_config(self) -> Tuple[Dict[str, Any], List[str]]: env_name = (os.getenv("APP_ENV") or os.getenv("RUNTIME_ENV") or "prod").strip().lower() or "prod" loaded_files: List[str] = [] raw: Dict[str, Any] = {} if self.config_file is not None: config_path = self.config_file if not config_path.exists(): raise ConfigurationError(f"Configuration file not found: {config_path}") raw = _deep_merge(raw, _load_yaml(config_path)) loaded_files.append(str(config_path)) else: base_path = self.config_dir / "base.yaml" legacy_path = self.config_dir / "config.yaml" primary_path = base_path if base_path.exists() else legacy_path if not primary_path.exists(): raise ConfigurationError(f"Configuration file not found: {primary_path}") raw = _deep_merge(raw, _load_yaml(primary_path)) loaded_files.append(str(primary_path)) env_path = self.config_dir / "environments" / f"{env_name}.yaml" if env_path.exists(): raw = _deep_merge(raw, _load_yaml(env_path)) loaded_files.append(str(env_path)) tenant_dir = self.config_dir / "tenants" if tenant_dir.is_dir(): tenant_files = sorted(tenant_dir.glob("*.yaml")) if tenant_files: tenant_config = {"default": {}, "tenants": {}} default_path = tenant_dir / "_default.yaml" if default_path.exists(): tenant_config["default"] = _load_yaml(default_path) loaded_files.append(str(default_path)) for tenant_path in tenant_files: if tenant_path.name == "_default.yaml": continue tenant_config["tenants"][tenant_path.stem] = _load_yaml(tenant_path) loaded_files.append(str(tenant_path)) raw["tenant_config"] = tenant_config return raw, loaded_files def _build_app_config(self, raw: Dict[str, Any], loaded_files: List[str]) -> AppConfig: assets_cfg = raw.get("assets") if isinstance(raw.get("assets"), dict) else {} rewrite_path = ( assets_cfg.get("query_rewrite_dictionary_path") or assets_cfg.get("rewrite_dictionary_path") or self.config_dir / "dictionaries" / "query_rewrite.dict" ) rewrite_path = Path(rewrite_path) if not rewrite_path.is_absolute(): rewrite_path = (self.project_root / rewrite_path).resolve() if not rewrite_path.exists(): legacy_rewrite_path = (self.config_dir / "query_rewrite.dict").resolve() if legacy_rewrite_path.exists(): rewrite_path = legacy_rewrite_path rewrite_dictionary = _read_rewrite_dictionary(rewrite_path) search_config = self._build_search_config(raw, rewrite_dictionary) services_config = self._build_services_config(raw.get("services") or {}) tenants_config = self._build_tenants_config(raw.get("tenant_config") or {}) runtime_config = self._build_runtime_config() infrastructure_config = self._build_infrastructure_config(runtime_config.environment) search_evaluation_config = self._build_search_evaluation_config(raw, runtime_config) metadata = ConfigMetadata( loaded_files=tuple(loaded_files), config_hash="", deprecated_keys=tuple(self._detect_deprecated_keys(raw)), ) app_config = AppConfig( runtime=runtime_config, infrastructure=infrastructure_config, search=search_config, services=services_config, tenants=tenants_config, assets=AssetsConfig(query_rewrite_dictionary_path=rewrite_path), search_evaluation=search_evaluation_config, metadata=metadata, ) config_hash = self._compute_hash(app_config) return AppConfig( runtime=app_config.runtime, infrastructure=app_config.infrastructure, search=app_config.search, services=app_config.services, tenants=app_config.tenants, assets=app_config.assets, search_evaluation=app_config.search_evaluation, metadata=ConfigMetadata( loaded_files=app_config.metadata.loaded_files, config_hash=config_hash, deprecated_keys=app_config.metadata.deprecated_keys, ), ) def _build_search_evaluation_config(self, raw: Dict[str, Any], runtime: RuntimeConfig) -> SearchEvaluationConfig: se = raw.get("search_evaluation") if isinstance(raw.get("search_evaluation"), dict) else {} default_artifact = (self.project_root / "artifacts" / "search_evaluation").resolve() default_queries = (self.project_root / "scripts" / "evaluation" / "queries" / "queries.txt").resolve() default_log_dir = (self.project_root / "logs").resolve() default_search_base = f"http://127.0.0.1:{int(runtime.api_port)}" def _project_path(value: Any, default: Path) -> Path: if value in (None, ""): return default candidate = Path(str(value)) if candidate.is_absolute(): return candidate.resolve() return (self.project_root / candidate).resolve() def _str(key: str, default: str) -> str: v = se.get(key) if v is None or (isinstance(v, str) and not v.strip()): return default return str(v).strip() def _int(key: str, default: int) -> int: v = se.get(key) if v is None: return default return int(v) def _float(key: str, default: float) -> float: v = se.get(key) if v is None: return default return float(v) def _bool(key: str, default: bool) -> bool: v = se.get(key) if v is None: return default if isinstance(v, bool): return v if isinstance(v, str): return v.strip().lower() in {"1", "true", "yes", "on"} return bool(v) raw_search_url = se.get("search_base_url") if raw_search_url is None or (isinstance(raw_search_url, str) and not str(raw_search_url).strip()): search_base_url = default_search_base else: search_base_url = str(raw_search_url).strip() return SearchEvaluationConfig( artifact_root=_project_path(se.get("artifact_root"), default_artifact), queries_file=_project_path(se.get("queries_file"), default_queries), eval_log_dir=_project_path(se.get("eval_log_dir"), default_log_dir), default_tenant_id=_str("default_tenant_id", "163"), search_base_url=search_base_url, web_host=_str("web_host", "0.0.0.0"), web_port=_int("web_port", 6010), judge_model=_str("judge_model", "qwen3.5-plus"), judge_enable_thinking=_bool("judge_enable_thinking", False), judge_dashscope_batch=_bool("judge_dashscope_batch", False), intent_model=_str("intent_model", "qwen3-max"), intent_enable_thinking=_bool("intent_enable_thinking", True), judge_batch_completion_window=_str("judge_batch_completion_window", "24h"), judge_batch_poll_interval_sec=_float("judge_batch_poll_interval_sec", 10.0), build_search_depth=_int("build_search_depth", 1000), build_rerank_depth=_int("build_rerank_depth", 10000), annotate_search_top_k=_int("annotate_search_top_k", 120), annotate_rerank_top_k=_int("annotate_rerank_top_k", 200), batch_top_k=_int("batch_top_k", 100), audit_top_k=_int("audit_top_k", 100), audit_limit_suspicious=_int("audit_limit_suspicious", 5), default_language=_str("default_language", "en"), search_recall_top_k=_int("search_recall_top_k", 200), rerank_high_threshold=_float("rerank_high_threshold", 0.5), rerank_high_skip_count=_int("rerank_high_skip_count", 1000), rebuild_llm_batch_size=_int("rebuild_llm_batch_size", 50), rebuild_min_llm_batches=_int("rebuild_min_llm_batches", 10), rebuild_max_llm_batches=_int("rebuild_max_llm_batches", 40), rebuild_irrelevant_stop_ratio=_float("rebuild_irrelevant_stop_ratio", 0.799), rebuild_irrel_low_combined_stop_ratio=_float("rebuild_irrel_low_combined_stop_ratio", 0.959), rebuild_irrelevant_stop_streak=_int("rebuild_irrelevant_stop_streak", 3), ) def _build_search_config(self, raw: Dict[str, Any], rewrite_dictionary: Dict[str, str]) -> SearchConfig: field_boosts = raw.get("field_boosts") or {} if not isinstance(field_boosts, dict): raise ConfigurationError("field_boosts must be a mapping") indexes: List[IndexConfig] = [] for item in raw.get("indexes") or []: if not isinstance(item, dict): raise ConfigurationError("indexes items must be mappings") indexes.append( IndexConfig( name=str(item["name"]), label=str(item.get("label") or item["name"]), fields=list(item.get("fields") or []), boost=float(item.get("boost", 1.0)), example=item.get("example"), ) ) query_cfg = raw.get("query_config") if isinstance(raw.get("query_config"), dict) else {} search_fields = query_cfg.get("search_fields") if isinstance(query_cfg.get("search_fields"), dict) else {} text_strategy = ( query_cfg.get("text_query_strategy") if isinstance(query_cfg.get("text_query_strategy"), dict) else {} ) style_intent_cfg = ( query_cfg.get("style_intent") if isinstance(query_cfg.get("style_intent"), dict) else {} ) product_title_exclusion_cfg = ( query_cfg.get("product_title_exclusion") if isinstance(query_cfg.get("product_title_exclusion"), dict) else {} ) def _resolve_project_path(value: Any, default_path: Path) -> Path: if value in (None, ""): return default_path candidate = Path(str(value)) if candidate.is_absolute(): return candidate return self.project_root / candidate style_color_path = _resolve_project_path( style_intent_cfg.get("color_dictionary_path"), self.config_dir / "dictionaries" / "style_intent_color.csv", ) style_size_path = _resolve_project_path( style_intent_cfg.get("size_dictionary_path"), self.config_dir / "dictionaries" / "style_intent_size.csv", ) configured_dimension_aliases = ( style_intent_cfg.get("dimension_aliases") if isinstance(style_intent_cfg.get("dimension_aliases"), dict) else {} ) style_dimension_aliases: Dict[str, List[str]] = {} for intent_type, default_aliases in _DEFAULT_STYLE_INTENT_DIMENSION_ALIASES.items(): aliases = configured_dimension_aliases.get(intent_type) if isinstance(aliases, list) and aliases: style_dimension_aliases[intent_type] = [str(alias) for alias in aliases if str(alias).strip()] else: style_dimension_aliases[intent_type] = list(default_aliases) style_intent_terms = { "color": _read_synonym_csv_dictionary(style_color_path), "size": _read_synonym_csv_dictionary(style_size_path), } product_title_exclusion_path = _resolve_project_path( product_title_exclusion_cfg.get("dictionary_path"), self.config_dir / "dictionaries" / "product_title_exclusion.tsv", ) query_config = QueryConfig( supported_languages=list(query_cfg.get("supported_languages") or ["zh", "en"]), default_language=str(query_cfg.get("default_language") or "en"), enable_text_embedding=bool(query_cfg.get("enable_text_embedding", True)), enable_query_rewrite=bool(query_cfg.get("enable_query_rewrite", True)), rewrite_dictionary=rewrite_dictionary, text_embedding_field=query_cfg.get("text_embedding_field"), image_embedding_field=query_cfg.get("image_embedding_field"), source_fields=query_cfg.get("source_fields"), knn_text_boost=float( query_cfg.get("knn_text_boost", query_cfg.get("knn_boost", 0.25)) ), knn_image_boost=float( query_cfg.get("knn_image_boost", query_cfg.get("knn_boost", 0.25)) ), knn_text_k=int(query_cfg.get("knn_text_k", 120)), knn_text_num_candidates=int(query_cfg.get("knn_text_num_candidates", 400)), knn_text_k_long=int(query_cfg.get("knn_text_k_long", 160)), knn_text_num_candidates_long=int( query_cfg.get("knn_text_num_candidates_long", 500) ), knn_image_k=int(query_cfg.get("knn_image_k", 120)), knn_image_num_candidates=int(query_cfg.get("knn_image_num_candidates", 400)), multilingual_fields=list( search_fields.get( "multilingual_fields", [], ) ), shared_fields=list( search_fields.get( "shared_fields", [], ) or [] ), core_multilingual_fields=list( search_fields.get( "core_multilingual_fields", [], ) ), base_minimum_should_match=str(text_strategy.get("base_minimum_should_match", "70%")), translation_minimum_should_match=str(text_strategy.get("translation_minimum_should_match", "70%")), translation_boost=float(text_strategy.get("translation_boost", 0.4)), tie_breaker_base_query=float(text_strategy.get("tie_breaker_base_query", 0.9)), best_fields={ str(field): float(boost) for field, boost in dict(text_strategy.get("best_fields") or {}).items() }, best_fields_boost=float(text_strategy.get("best_fields_boost", 2.0)), phrase_fields={ str(field): float(boost) for field, boost in dict(text_strategy.get("phrase_fields") or {}).items() }, phrase_match_boost=float(text_strategy.get("phrase_match_boost", 3.0)), zh_to_en_model=str(query_cfg.get("zh_to_en_model") or "opus-mt-zh-en"), en_to_zh_model=str(query_cfg.get("en_to_zh_model") or "opus-mt-en-zh"), default_translation_model=str( query_cfg.get("default_translation_model") or "nllb-200-distilled-600m" ), zh_to_en_model_source_not_in_index=( str(v) if (v := query_cfg.get("zh_to_en_model__source_not_in_index")) not in (None, "") else None ), en_to_zh_model_source_not_in_index=( str(v) if (v := query_cfg.get("en_to_zh_model__source_not_in_index")) not in (None, "") else None ), default_translation_model_source_not_in_index=( str(v) if (v := query_cfg.get("default_translation_model__source_not_in_index")) not in (None, "") else None ), translation_embedding_wait_budget_ms_source_in_index=int( query_cfg.get("translation_embedding_wait_budget_ms_source_in_index", 80) ), translation_embedding_wait_budget_ms_source_not_in_index=int( query_cfg.get("translation_embedding_wait_budget_ms_source_not_in_index", 200) ), style_intent_enabled=bool(style_intent_cfg.get("enabled", True)), style_intent_selected_sku_boost=float( style_intent_cfg.get("selected_sku_boost", 1.2) ), style_intent_terms=style_intent_terms, style_intent_dimension_aliases=style_dimension_aliases, product_title_exclusion_enabled=bool(product_title_exclusion_cfg.get("enabled", True)), product_title_exclusion_rules=_read_product_title_exclusion_dictionary( product_title_exclusion_path ), ) function_score_cfg = raw.get("function_score") if isinstance(raw.get("function_score"), dict) else {} coarse_rank_cfg = raw.get("coarse_rank") if isinstance(raw.get("coarse_rank"), dict) else {} coarse_fusion_raw = ( coarse_rank_cfg.get("fusion") if isinstance(coarse_rank_cfg.get("fusion"), dict) else {} ) fine_rank_cfg = raw.get("fine_rank") if isinstance(raw.get("fine_rank"), dict) else {} rerank_cfg = raw.get("rerank") if isinstance(raw.get("rerank"), dict) else {} fusion_raw = rerank_cfg.get("fusion") if isinstance(rerank_cfg.get("fusion"), dict) else {} spu_cfg = raw.get("spu_config") if isinstance(raw.get("spu_config"), dict) else {} return SearchConfig( field_boosts={str(key): float(value) for key, value in field_boosts.items()}, indexes=indexes, query_config=query_config, function_score=FunctionScoreConfig( score_mode=str(function_score_cfg.get("score_mode") or "sum"), boost_mode=str(function_score_cfg.get("boost_mode") or "multiply"), functions=list(function_score_cfg.get("functions") or []), ), coarse_rank=CoarseRankConfig( enabled=bool(coarse_rank_cfg.get("enabled", True)), input_window=int(coarse_rank_cfg.get("input_window", 700)), output_window=int(coarse_rank_cfg.get("output_window", 240)), fusion=CoarseRankFusionConfig( es_bias=float(coarse_fusion_raw.get("es_bias", 0.1)), es_exponent=float(coarse_fusion_raw.get("es_exponent", 0.0)), text_bias=float(coarse_fusion_raw.get("text_bias", 0.1)), text_exponent=float(coarse_fusion_raw.get("text_exponent", 0.35)), knn_text_weight=float(coarse_fusion_raw.get("knn_text_weight", 1.0)), knn_image_weight=float(coarse_fusion_raw.get("knn_image_weight", 1.0)), knn_tie_breaker=float(coarse_fusion_raw.get("knn_tie_breaker", 0.0)), knn_bias=float(coarse_fusion_raw.get("knn_bias", 0.6)), knn_exponent=float(coarse_fusion_raw.get("knn_exponent", 0.2)), text_translation_weight=float( coarse_fusion_raw.get("text_translation_weight", 0.8) ), ), ), fine_rank=FineRankConfig( enabled=bool(fine_rank_cfg.get("enabled", True)), input_window=int(fine_rank_cfg.get("input_window", 240)), output_window=int(fine_rank_cfg.get("output_window", 80)), timeout_sec=float(fine_rank_cfg.get("timeout_sec", 10.0)), rerank_query_template=str(fine_rank_cfg.get("rerank_query_template") or "{query}"), rerank_doc_template=str(fine_rank_cfg.get("rerank_doc_template") or "{title}"), service_profile=( str(v) if (v := fine_rank_cfg.get("service_profile")) not in (None, "") else "fine" ), ), rerank=RerankConfig( enabled=bool(rerank_cfg.get("enabled", True)), rerank_window=int(rerank_cfg.get("rerank_window", 384)), timeout_sec=float(rerank_cfg.get("timeout_sec", 15.0)), weight_es=float(rerank_cfg.get("weight_es", 0.4)), weight_ai=float(rerank_cfg.get("weight_ai", 0.6)), rerank_query_template=str(rerank_cfg.get("rerank_query_template") or "{query}"), rerank_doc_template=str(rerank_cfg.get("rerank_doc_template") or "{title}"), service_profile=( str(v) if (v := rerank_cfg.get("service_profile")) not in (None, "") else None ), fusion=RerankFusionConfig( es_bias=float(fusion_raw.get("es_bias", 0.1)), es_exponent=float(fusion_raw.get("es_exponent", 0.0)), rerank_bias=float(fusion_raw.get("rerank_bias", 0.00001)), rerank_exponent=float(fusion_raw.get("rerank_exponent", 1.0)), text_bias=float(fusion_raw.get("text_bias", 0.1)), text_exponent=float(fusion_raw.get("text_exponent", 0.35)), knn_text_weight=float(fusion_raw.get("knn_text_weight", 1.0)), knn_image_weight=float(fusion_raw.get("knn_image_weight", 1.0)), knn_tie_breaker=float(fusion_raw.get("knn_tie_breaker", 0.0)), knn_bias=float(fusion_raw.get("knn_bias", 0.6)), knn_exponent=float(fusion_raw.get("knn_exponent", 0.2)), fine_bias=float(fusion_raw.get("fine_bias", 0.00001)), fine_exponent=float(fusion_raw.get("fine_exponent", 1.0)), text_translation_weight=float( fusion_raw.get("text_translation_weight", 0.8) ), ), ), spu_config=SPUConfig( enabled=bool(spu_cfg.get("enabled", False)), spu_field=spu_cfg.get("spu_field"), inner_hits_size=int(spu_cfg.get("inner_hits_size", 3)), searchable_option_dimensions=list( spu_cfg.get("searchable_option_dimensions") or ["option1", "option2", "option3"] ), ), es_index_name=str(raw.get("es_index_name") or "search_products"), es_settings=dict(raw.get("es_settings") or {}), ) def _build_services_config(self, raw: Dict[str, Any]) -> ServicesConfig: if not isinstance(raw, dict): raise ConfigurationError("services must be a mapping") translation_raw = raw.get("translation") if isinstance(raw.get("translation"), dict) else {} normalized_translation = build_translation_config(translation_raw) translation_config = TranslationServiceConfig( endpoint=str(normalized_translation["service_url"]).rstrip("/"), timeout_sec=float(normalized_translation["timeout_sec"]), default_model=str(normalized_translation["default_model"]), default_scene=str(normalized_translation["default_scene"]), cache=dict(normalized_translation["cache"]), capabilities={str(key): dict(value) for key, value in normalized_translation["capabilities"].items()}, ) embedding_raw = raw.get("embedding") if isinstance(raw.get("embedding"), dict) else {} embedding_provider = str(embedding_raw.get("provider") or "http").strip().lower() embedding_providers = dict(embedding_raw.get("providers") or {}) if embedding_provider not in embedding_providers: raise ConfigurationError(f"services.embedding.providers.{embedding_provider} must be configured") embedding_backend = str(embedding_raw.get("backend") or "").strip().lower() embedding_backends = { str(key).strip().lower(): dict(value) for key, value in dict(embedding_raw.get("backends") or {}).items() } if embedding_backend not in embedding_backends: raise ConfigurationError(f"services.embedding.backends.{embedding_backend} must be configured") image_backend = str(embedding_raw.get("image_backend") or "clip_as_service").strip().lower() image_backends = { str(key).strip().lower(): dict(value) for key, value in dict(embedding_raw.get("image_backends") or {}).items() } if not image_backends: image_backends = { "clip_as_service": { "server": "grpc://127.0.0.1:51000", "model_name": "CN-CLIP/ViT-H-14", "batch_size": 8, "normalize_embeddings": True, }, "local_cnclip": { "model_name": "ViT-H-14", "device": None, "batch_size": 8, "normalize_embeddings": True, }, } if image_backend not in image_backends: raise ConfigurationError(f"services.embedding.image_backends.{image_backend} must be configured") embedding_config = EmbeddingServiceConfig( provider=embedding_provider, providers=embedding_providers, backend=embedding_backend, backends=embedding_backends, image_backend=image_backend, image_backends=image_backends, ) rerank_raw = raw.get("rerank") if isinstance(raw.get("rerank"), dict) else {} rerank_provider = str(rerank_raw.get("provider") or "http").strip().lower() rerank_providers = dict(rerank_raw.get("providers") or {}) if rerank_provider not in rerank_providers: raise ConfigurationError(f"services.rerank.providers.{rerank_provider} must be configured") rerank_backends = { str(key).strip().lower(): dict(value) for key, value in dict(rerank_raw.get("backends") or {}).items() } default_instance = str(rerank_raw.get("default_instance") or "default").strip() or "default" raw_instances = rerank_raw.get("instances") if isinstance(rerank_raw.get("instances"), dict) else {} if not raw_instances: legacy_backend = str(rerank_raw.get("backend") or "").strip().lower() if legacy_backend not in rerank_backends: raise ConfigurationError(f"services.rerank.backends.{legacy_backend} must be configured") provider_cfg = dict(rerank_providers.get(rerank_provider) or {}) raw_instances = { default_instance: { "host": "0.0.0.0", "port": 6007, "backend": legacy_backend, "base_url": provider_cfg.get("base_url"), "service_url": provider_cfg.get("service_url"), } } rerank_instances = {} for instance_name, instance_raw in raw_instances.items(): if not isinstance(instance_raw, dict): raise ConfigurationError(f"services.rerank.instances.{instance_name} must be a mapping") normalized_instance_name = str(instance_name).strip() backend_name = str(instance_raw.get("backend") or "").strip().lower() if backend_name not in rerank_backends: raise ConfigurationError( f"services.rerank.instances.{normalized_instance_name}.backend must reference configured services.rerank.backends" ) port = int(instance_raw.get("port", 6007)) rerank_instances[normalized_instance_name] = RerankServiceInstanceConfig( host=str(instance_raw.get("host") or "0.0.0.0"), port=port, backend=backend_name, runtime_dir=( str(v) if (v := instance_raw.get("runtime_dir")) not in (None, "") else None ), base_url=( str(v).rstrip("/") if (v := instance_raw.get("base_url")) not in (None, "") else None ), service_url=( str(v).rstrip("/") if (v := instance_raw.get("service_url")) not in (None, "") else None ), ) if default_instance not in rerank_instances: raise ConfigurationError( f"services.rerank.default_instance={default_instance!r} must exist in services.rerank.instances" ) rerank_request = dict(rerank_raw.get("request") or {}) rerank_request.setdefault("max_docs", 1000) rerank_request.setdefault("normalize", True) rerank_config = RerankServiceConfig( provider=rerank_provider, providers=rerank_providers, default_instance=default_instance, instances=rerank_instances, backends=rerank_backends, request=rerank_request, ) return ServicesConfig( translation=translation_config, embedding=embedding_config, rerank=rerank_config, ) def _build_tenants_config(self, raw: Dict[str, Any]) -> TenantCatalogConfig: if not isinstance(raw, dict): raise ConfigurationError("tenant_config must be a mapping") default_cfg = raw.get("default") if isinstance(raw.get("default"), dict) else {} tenants_cfg = raw.get("tenants") if isinstance(raw.get("tenants"), dict) else {} return TenantCatalogConfig( default=dict(default_cfg), tenants={str(key): dict(value) for key, value in tenants_cfg.items()}, ) def _build_runtime_config(self) -> RuntimeConfig: environment = (os.getenv("APP_ENV") or os.getenv("RUNTIME_ENV") or "prod").strip().lower() or "prod" namespace = os.getenv("ES_INDEX_NAMESPACE") if namespace is None: namespace = "" if environment == "prod" else f"{environment}_" return RuntimeConfig( environment=environment, index_namespace=namespace, api_host=os.getenv("API_HOST", "0.0.0.0"), api_port=int(os.getenv("API_PORT", 6002)), indexer_host=os.getenv("INDEXER_HOST", "0.0.0.0"), indexer_port=int(os.getenv("INDEXER_PORT", 6004)), embedding_host=os.getenv("EMBEDDING_HOST", "0.0.0.0"), embedding_port=int(os.getenv("EMBEDDING_PORT", 6005)), embedding_text_port=int(os.getenv("EMBEDDING_TEXT_PORT", 6005)), embedding_image_port=int(os.getenv("EMBEDDING_IMAGE_PORT", 6008)), translator_host=os.getenv("TRANSLATION_HOST", "127.0.0.1"), translator_port=int(os.getenv("TRANSLATION_PORT", 6006)), reranker_host=os.getenv("RERANKER_HOST", "127.0.0.1"), reranker_port=int(os.getenv("RERANKER_PORT", 6007)), ) def _build_infrastructure_config(self, environment: str) -> InfrastructureConfig: del environment return InfrastructureConfig( elasticsearch=ElasticsearchSettings( host=os.getenv("ES_HOST", "http://localhost:9200"), username=os.getenv("ES_USERNAME"), password=os.getenv("ES_PASSWORD"), ), redis=RedisSettings( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", 6479)), snapshot_db=int(os.getenv("REDIS_SNAPSHOT_DB", 0)), password=os.getenv("REDIS_PASSWORD"), socket_timeout=int(os.getenv("REDIS_SOCKET_TIMEOUT", 1)), socket_connect_timeout=int(os.getenv("REDIS_SOCKET_CONNECT_TIMEOUT", 1)), retry_on_timeout=os.getenv("REDIS_RETRY_ON_TIMEOUT", "false").strip().lower() == "true", cache_expire_days=int(os.getenv("REDIS_CACHE_EXPIRE_DAYS", 360 * 2)), embedding_cache_prefix=os.getenv("REDIS_EMBEDDING_CACHE_PREFIX", "embedding"), ), database=DatabaseSettings( host=os.getenv("DB_HOST"), port=int(os.getenv("DB_PORT", 3306)) if os.getenv("DB_PORT") else 3306, database=os.getenv("DB_DATABASE"), username=os.getenv("DB_USERNAME"), password=os.getenv("DB_PASSWORD"), ), secrets=SecretsConfig( dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"), deepl_auth_key=os.getenv("DEEPL_AUTH_KEY"), ), ) def _validate(self, app_config: AppConfig) -> None: errors: List[str] = [] if not app_config.search.es_index_name: errors.append("search.es_index_name is required") if not app_config.search.field_boosts: errors.append("search.field_boosts cannot be empty") else: for field_name, boost in app_config.search.field_boosts.items(): if boost < 0: errors.append(f"field_boosts.{field_name} must be non-negative") query_config = app_config.search.query_config if not query_config.supported_languages: errors.append("query_config.supported_languages must not be empty") if query_config.default_language not in query_config.supported_languages: errors.append("query_config.default_language must be included in supported_languages") for name, values in ( ("multilingual_fields", query_config.multilingual_fields), ("core_multilingual_fields", query_config.core_multilingual_fields), ): if not values: errors.append(f"query_config.{name} must not be empty") if not set(query_config.core_multilingual_fields).issubset(set(query_config.multilingual_fields)): errors.append("query_config.core_multilingual_fields must be a subset of multilingual_fields") if app_config.search.spu_config.enabled and not app_config.search.spu_config.spu_field: errors.append("spu_config.spu_field is required when spu_config.enabled is true") if not app_config.tenants.default or not app_config.tenants.default.get("index_languages"): errors.append("tenant_config.default.index_languages must be configured") if app_config.metadata.deprecated_keys: errors.append( "Deprecated tenant config keys are not supported: " + ", ".join(app_config.metadata.deprecated_keys) ) embedding_provider_cfg = app_config.services.embedding.get_provider_config() if not embedding_provider_cfg.get("text_base_url"): errors.append("services.embedding.providers..text_base_url is required") if not embedding_provider_cfg.get("image_base_url"): errors.append("services.embedding.providers..image_base_url is required") rerank_provider_cfg = app_config.services.rerank.get_provider_config() provider_instances = rerank_provider_cfg.get("instances") if not isinstance(provider_instances, dict): provider_instances = {} for instance_name in app_config.services.rerank.instances: instance_cfg = app_config.services.rerank.get_instance(instance_name) provider_instance_cfg = provider_instances.get(instance_name) if isinstance(provider_instances, dict) else None has_instance_url = False if isinstance(provider_instance_cfg, dict): has_instance_url = bool(provider_instance_cfg.get("service_url") or provider_instance_cfg.get("base_url")) if not has_instance_url and not instance_cfg.service_url and not instance_cfg.base_url: errors.append( f"services.rerank instance {instance_name!r} must define service_url/base_url either under providers..instances or services.rerank.instances" ) if errors: raise ConfigurationError("Configuration validation failed:\n" + "\n".join(f" - {err}" for err in errors)) def _compute_hash(self, app_config: AppConfig) -> str: payload = asdict(app_config) payload["metadata"]["config_hash"] = "" payload["infrastructure"]["elasticsearch"]["password"] = "***" if payload["infrastructure"]["elasticsearch"].get("password") else None payload["infrastructure"]["database"]["password"] = "***" if payload["infrastructure"]["database"].get("password") else None payload["infrastructure"]["redis"]["password"] = "***" if payload["infrastructure"]["redis"].get("password") else None payload["infrastructure"]["secrets"]["dashscope_api_key"] = "***" if payload["infrastructure"]["secrets"].get("dashscope_api_key") else None payload["infrastructure"]["secrets"]["deepl_auth_key"] = "***" if payload["infrastructure"]["secrets"].get("deepl_auth_key") else None blob = json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str) return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16] def _detect_deprecated_keys(self, raw: Dict[str, Any]) -> Iterable[str]: # Translation-era legacy flags have been removed; keep the hook for future # deprecations, but currently no deprecated keys are detected. return () @lru_cache(maxsize=1) def get_app_config() -> AppConfig: """Return the process-global application configuration.""" return AppConfigLoader().load() def reload_app_config() -> AppConfig: """Clear the cached configuration and reload it.""" get_app_config.cache_clear() return get_app_config() def _load_env_file_fallback(path: Path) -> None: if not path.exists(): return with open(path, "r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = value