From 86d8358b25faf30015b8035300bad7fadb4d308f Mon Sep 17 00:00:00 2001 From: tangwang Date: Thu, 19 Mar 2026 23:04:11 +0800 Subject: [PATCH] config optimize --- api/app.py | 27 +++++++++++++-------------- api/indexer_app.py | 38 +++++++++++++++++--------------------- api/routes/admin.py | 29 +++++++++++++++++++---------- config/__init__.py | 94 +++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------- config/config.yaml | 20 ++++++++++++++++++++ config/config_loader.py | 531 ++++++++++++++++++++++++++++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- config/dictionaries/query_rewrite.dict | 2 ++ config/env_config.py | 224 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------------------------------------------------------- config/loader.py | 592 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ config/query_rewrite.dict | 3 --- 
config/schema.py | 307 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ config/services_config.py | 219 +++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ config/tenant_config_loader.py | 40 +++++++++++----------------------------- config/utils.py | 3 ++- docs/config-system-review-and-redesign.md | 738 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ embeddings/config.py | 65 ++++++++++++++++++++++++++++++----------------------------------- embeddings/image_encoder.py | 5 +++-- embeddings/redis_embedding_cache.py | 17 +++++++++-------- embeddings/server.py | 18 +++--------------- embeddings/text_encoder.py | 9 ++++----- indexer/document_transformer.py | 1 - indexer/incremental_service.py | 4 ++-- indexer/indexing_utils.py | 5 ++--- indexer/mapping_generator.py | 4 ++-- indexer/product_enrich.py | 19 ++++++++++--------- main.py | 30 +++++++++++++++--------------- 
query/query_parser.py | 4 ++-- reranker/backends/dashscope_rerank.py | 34 +++++----------------------------- reranker/config.py | 44 ++++++++++++++++++++++++-------------------- suggestion/builder.py | 4 ++-- translation/backends/deepl.py | 3 +-- translation/backends/llm.py | 9 ++++----- translation/backends/qwen_mt.py | 4 +--- translation/cache.py | 23 +++++++++++++++-------- translation/client.py | 4 ++-- translation/service.py | 13 ++++++++----- utils/es_client.py | 24 ++++++++---------------- 37 files changed, 2087 insertions(+), 1123 deletions(-) create mode 100644 config/dictionaries/query_rewrite.dict create mode 100644 config/loader.py delete mode 100644 config/query_rewrite.dict create mode 100644 config/schema.py create mode 100644 docs/config-system-review-and-redesign.md diff --git a/api/app.py b/api/app.py index da821a9..b95b053 100644 --- a/api/app.py +++ b/api/app.py @@ -86,8 +86,7 @@ limiter = Limiter(key_func=get_remote_address) # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from config.env_config import ES_CONFIG, DB_CONFIG -from config import ConfigLoader +from config import get_app_config from utils import ESClient from search import Searcher from query import QueryParser @@ -99,7 +98,7 @@ _es_client: Optional[ESClient] = None _searcher: Optional[Searcher] = None _query_parser: Optional[QueryParser] = None _suggestion_service: Optional[SuggestionService] = None -_config = None +_app_config = None def init_service(es_host: str = "http://localhost:9200"): @@ -109,20 +108,20 @@ def init_service(es_host: str = "http://localhost:9200"): Args: es_host: Elasticsearch host URL """ - global _es_client, _searcher, _query_parser, _suggestion_service, _config + global _es_client, _searcher, _query_parser, _suggestion_service, _app_config start_time = time.time() logger.info("Initializing search service (multi-tenant)") # Load configuration logger.info("Loading configuration...") - 
config_loader = ConfigLoader("config/config.yaml") - _config = config_loader.load_config() + _app_config = get_app_config() + search_config = _app_config.search logger.info("Configuration loaded") # Get ES credentials - es_username = os.getenv('ES_USERNAME') or ES_CONFIG.get('username') - es_password = os.getenv('ES_PASSWORD') or ES_CONFIG.get('password') + es_username = _app_config.infrastructure.elasticsearch.username + es_password = _app_config.infrastructure.elasticsearch.password # Connect to Elasticsearch logger.info(f"Connecting to Elasticsearch at {es_host}...") @@ -139,15 +138,15 @@ def init_service(es_host: str = "http://localhost:9200"): # Initialize components logger.info("Initializing query parser...") - _query_parser = QueryParser(_config) + _query_parser = QueryParser(search_config) logger.info("Initializing searcher...") - _searcher = Searcher(_es_client, _config, _query_parser) + _searcher = Searcher(_es_client, search_config, _query_parser) logger.info("Initializing suggestion service...") _suggestion_service = SuggestionService(_es_client) elapsed = time.time() - start_time - logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}") + logger.info(f"Search service ready! 
(took {elapsed:.2f}s) | Index: {search_config.es_index_name}") @@ -182,9 +181,9 @@ def get_suggestion_service() -> SuggestionService: def get_config(): """Get global config instance.""" - if _config is None: + if _app_config is None: raise RuntimeError("Service not initialized") - return _config + return _app_config # Create FastAPI app with enhanced configuration @@ -240,7 +239,7 @@ async def startup_event(): except Exception as e: logger.warning(f"Failed to set thread pool size: {e}, using default") - es_host = os.getenv("ES_HOST", "http://localhost:9200") + es_host = get_app_config().infrastructure.elasticsearch.host logger.info("Starting E-Commerce Search API (Multi-Tenant)") logger.info(f"Elasticsearch Host: {es_host}") diff --git a/api/indexer_app.py b/api/indexer_app.py index 41ad323..eb407bf 100644 --- a/api/indexer_app.py +++ b/api/indexer_app.py @@ -38,8 +38,7 @@ logger = logging.getLogger(__name__) # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from config.env_config import ES_CONFIG # noqa: E402 -from config import ConfigLoader # noqa: E402 +from config import get_app_config # noqa: E402 from utils import ESClient # noqa: E402 from utils.db_connector import create_db_connection # noqa: E402 from indexer.incremental_service import IncrementalIndexerService # noqa: E402 @@ -55,7 +54,7 @@ from .service_registry import ( _es_client: Optional[ESClient] = None -_config = None +_app_config = None _incremental_service: Optional[IncrementalIndexerService] = None _bulk_indexing_service: Optional[BulkIndexingService] = None _suggestion_builder: Optional[SuggestionIndexBuilder] = None @@ -68,20 +67,19 @@ def init_indexer_service(es_host: str = "http://localhost:9200"): This mirrors the indexing-related initialization logic in api.app.init_service but without search-related components. 
""" - global _es_client, _config, _incremental_service, _bulk_indexing_service, _suggestion_builder + global _es_client, _app_config, _incremental_service, _bulk_indexing_service, _suggestion_builder start_time = time.time() logger.info("Initializing Indexer service") # Load configuration (kept for parity/logging; indexer routes don't depend on it) logger.info("Loading configuration...") - config_loader = ConfigLoader("config/config.yaml") - _config = config_loader.load_config() + _app_config = get_app_config() logger.info("Configuration loaded") # Get ES credentials - es_username = os.getenv("ES_USERNAME") or ES_CONFIG.get("username") - es_password = os.getenv("ES_PASSWORD") or ES_CONFIG.get("password") + es_username = _app_config.infrastructure.elasticsearch.username + es_password = _app_config.infrastructure.elasticsearch.password # Connect to Elasticsearch logger.info(f"Connecting to Elasticsearch at {es_host} for indexer...") @@ -97,11 +95,12 @@ def init_indexer_service(es_host: str = "http://localhost:9200"): set_es_client(_es_client) # Initialize indexing services (DB is required here) - db_host = os.getenv("DB_HOST") - db_port = int(os.getenv("DB_PORT", 3306)) - db_database = os.getenv("DB_DATABASE") - db_username = os.getenv("DB_USERNAME") - db_password = os.getenv("DB_PASSWORD") + db_config = _app_config.infrastructure.database + db_host = db_config.host + db_port = db_config.port + db_database = db_config.database + db_username = db_config.username + db_password = db_config.password if all([db_host, db_database, db_username, db_password]): logger.info("Initializing database connection for indexing services...") @@ -166,7 +165,7 @@ async def startup_event(): except Exception as e: logger.warning(f"Failed to set thread pool size: {e}, using default") - es_host = os.getenv("ES_HOST", "http://localhost:9200") + es_host = get_app_config().infrastructure.elasticsearch.host logger.info("Starting Indexer API service") logger.info(f"Elasticsearch Host: 
{es_host}") try: @@ -176,14 +175,11 @@ async def startup_event(): # Eager warmup: build per-tenant transformer bundles at startup to avoid # first-request latency (config/provider/encoder + transformer wiring). try: - if _incremental_service is not None and _config is not None: + if _incremental_service is not None and _app_config is not None: tenants = [] - # config.tenant_config shape: {"default": {...}, "tenants": {"1": {...}, ...}} - tc = getattr(_config, "tenant_config", None) or {} - if isinstance(tc, dict): - tmap = tc.get("tenants") - if isinstance(tmap, dict): - tenants = [str(k) for k in tmap.keys()] + tmap = _app_config.tenants.tenants + if isinstance(tmap, dict): + tenants = [str(k) for k in tmap.keys()] # If no explicit tenants configured, skip warmup. if tenants: warm = _incremental_service.warmup_transformers(tenants) diff --git a/api/routes/admin.py b/api/routes/admin.py index a1fe373..8a84989 100644 --- a/api/routes/admin.py +++ b/api/routes/admin.py @@ -42,23 +42,32 @@ async def health_check(): @router.get("/config") async def get_configuration(): """ - Get current search configuration (sanitized). + Get the effective application configuration (sanitized). 
""" try: from ..app import get_config - config = get_config() + return get_config().sanitized_dict() + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/config/meta") +async def get_configuration_meta(): + """Get configuration metadata for observability.""" + try: + from ..app import get_config + + config = get_config() return { - "es_index_name": config.es_index_name, - "num_field_boosts": len(config.field_boosts), - "multilingual_fields": config.query_config.multilingual_fields, - "shared_fields": config.query_config.shared_fields, - "core_multilingual_fields": config.query_config.core_multilingual_fields, - "supported_languages": config.query_config.supported_languages, - "spu_enabled": config.spu_config.enabled + "environment": config.runtime.environment, + "config_hash": config.metadata.config_hash, + "loaded_files": list(config.metadata.loaded_files), + "deprecated_keys": list(config.metadata.deprecated_keys), } - except HTTPException: raise except Exception as e: diff --git a/config/__init__.py b/config/__init__.py index 1bef6ad..650f116 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,61 +1,57 @@ -""" -Configuration package for search engine. +"""Unified configuration package exports.""" -Provides configuration loading, validation, and utility functions. 
-""" - -from .config_loader import ( - SearchConfig, - QueryConfig, - IndexConfig, - SPUConfig, +from config.config_loader import ConfigLoader, ConfigurationError +from config.loader import AppConfigLoader, get_app_config, reload_app_config +from config.schema import ( + AppConfig, FunctionScoreConfig, + IndexConfig, + QueryConfig, RerankConfig, - ConfigLoader, - ConfigurationError -) - -from .utils import ( - get_match_fields_for_index, - get_domain_fields + SPUConfig, + SearchConfig, + ServicesConfig, ) -from .services_config import ( - get_translation_config, - get_embedding_config, - get_rerank_config, +from config.services_config import ( get_embedding_backend_config, - get_rerank_backend_config, - get_translation_base_url, - get_embedding_text_base_url, + get_embedding_config, + get_embedding_image_backend_config, get_embedding_image_base_url, + get_embedding_text_base_url, + get_rerank_backend_config, + get_rerank_config, get_rerank_service_url, + get_translation_base_url, get_translation_cache_config, - ServiceConfig, + get_translation_config, ) +from config.utils import get_domain_fields, get_match_fields_for_index __all__ = [ - # Main config classes - 'SearchConfig', - 'QueryConfig', - 'IndexConfig', - 'SPUConfig', - 'FunctionScoreConfig', - 'RerankConfig', - - # Loader and utilities - 'ConfigLoader', - 'ConfigurationError', - 'get_match_fields_for_index', - 'get_domain_fields', - 'get_translation_config', - 'get_embedding_config', - 'get_rerank_config', - 'get_embedding_backend_config', - 'get_rerank_backend_config', - 'get_translation_base_url', - 'get_embedding_text_base_url', - 'get_embedding_image_base_url', - 'get_rerank_service_url', - 'get_translation_cache_config', - 'ServiceConfig', + "AppConfig", + "AppConfigLoader", + "ConfigLoader", + "ConfigurationError", + "FunctionScoreConfig", + "IndexConfig", + "QueryConfig", + "RerankConfig", + "SPUConfig", + "SearchConfig", + "ServicesConfig", + "get_app_config", + "reload_app_config", + 
"get_domain_fields", + "get_match_fields_for_index", + "get_translation_config", + "get_embedding_config", + "get_rerank_config", + "get_embedding_backend_config", + "get_embedding_image_backend_config", + "get_rerank_backend_config", + "get_translation_base_url", + "get_embedding_text_base_url", + "get_embedding_image_base_url", + "get_rerank_service_url", + "get_translation_cache_config", ] diff --git a/config/config.yaml b/config/config.yaml index 0078b30..0934739 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -5,6 +5,10 @@ # Elasticsearch Index es_index_name: "search_products" +# Config assets +assets: + query_rewrite_dictionary_path: "config/dictionaries/query_rewrite.dict" + # ES Index Settings (基础设置) es_settings: number_of_shards: 1 @@ -211,6 +215,19 @@ services: device: "cuda" batch_size: 32 normalize_embeddings: true + # 服务内图片后端(embedding 进程启动时读取) + image_backend: "clip_as_service" # clip_as_service | local_cnclip + image_backends: + clip_as_service: + server: "grpc://127.0.0.1:51000" + model_name: "CN-CLIP/ViT-L-14" + batch_size: 8 + normalize_embeddings: true + local_cnclip: + model_name: "ViT-L-14" + device: null + batch_size: 8 + normalize_embeddings: true rerank: provider: "http" base_url: "http://127.0.0.1:6007" @@ -218,6 +235,9 @@ services: http: base_url: "http://127.0.0.1:6007" service_url: "http://127.0.0.1:6007/rerank" + request: + max_docs: 1000 + normalize: true # 服务内后端(reranker 进程启动时读取) backend: "qwen3_vllm" # bge | qwen3_vllm | qwen3_transformers | dashscope_rerank backends: diff --git a/config/config_loader.py b/config/config_loader.py index e99fbf7..ec45701 100644 --- a/config/config_loader.py +++ b/config/config_loader.py @@ -1,523 +1,48 @@ """ -Configuration loader and validator for search engine configurations. +Compatibility wrapper for search-behavior config access. -This module handles loading, parsing, and validating YAML configuration files -that define how search should be executed (NOT how data should be indexed). 
- -索引结构由 mappings/search_products.json 定义。 -此配置只定义搜索行为:字段权重、搜索域、查询策略等。 +The unified loader lives in :mod:`config.loader`. This module now exposes the +search subtree only, so existing search/indexer code can consume a single +source-of-truth search config without reparsing YAML separately. """ -import yaml -from typing import Dict, Any, List, Optional -from dataclasses import dataclass, field -from pathlib import Path - - -@dataclass -class IndexConfig: - """Configuration for an index domain (e.g., default, title, brand).""" - name: str - label: str - fields: List[str] # List of field names to include in this search domain - boost: float = 1.0 - example: Optional[str] = None - - -@dataclass -class QueryConfig: - """Configuration for query processing.""" - supported_languages: List[str] = field(default_factory=lambda: ["zh", "en"]) - default_language: str = "en" - - # Feature flags - enable_text_embedding: bool = True - enable_query_rewrite: bool = True - - # Query rewrite dictionary (loaded from external file) - rewrite_dictionary: Dict[str, str] = field(default_factory=dict) - - # Embedding field names - text_embedding_field: Optional[str] = "title_embedding" - image_embedding_field: Optional[str] = None - - # Source fields configuration - source_fields: Optional[List[str]] = None - - # KNN boost configuration - knn_boost: float = 0.25 # Boost value for KNN (embedding recall) - - # Dynamic text fields for multi-language retrieval - multilingual_fields: List[str] = field( - default_factory=lambda: ["title", "brief", "description", "vendor", "category_path", "category_name_text"] - ) - shared_fields: List[str] = field( - default_factory=lambda: ["tags", "option1_values", "option2_values", "option3_values"] - ) - core_multilingual_fields: List[str] = field( - default_factory=lambda: ["title", "brief", "vendor", "category_name_text"] - ) - - # Unified text strategy tuning - base_minimum_should_match: str = "75%" - translation_minimum_should_match: str = "75%" - 
translation_boost: float = 0.4 - translation_boost_when_source_missing: float = 1.0 - source_boost_when_missing: float = 0.6 - original_query_fallback_boost_when_translation_missing: float = 0.2 - tie_breaker_base_query: float = 0.9 - - # Query-time translation model selection (configurable) - # - zh_to_en_model: model for zh -> en - # - en_to_zh_model: model for en -> zh - # - default_translation_model: fallback model for all other language pairs - zh_to_en_model: str = "opus-mt-zh-en" - en_to_zh_model: str = "opus-mt-en-zh" - default_translation_model: str = "nllb-200-distilled-600m" - - -@dataclass -class SPUConfig: - """Configuration for SPU aggregation.""" - enabled: bool = False - spu_field: Optional[str] = None - inner_hits_size: int = 3 - # 配置哪些option维度参与检索(进索引、以及在线搜索) - searchable_option_dimensions: List[str] = field(default_factory=lambda: ['option1', 'option2', 'option3']) - - -@dataclass -class FunctionScoreConfig: - """Function Score配置(ES层打分规则)""" - score_mode: str = "sum" - boost_mode: str = "multiply" - functions: List[Dict[str, Any]] = field(default_factory=list) - - -@dataclass -class RerankConfig: - """重排配置(provider/URL 在 services.rerank)""" - enabled: bool = True - rerank_window: int = 384 - timeout_sec: float = 15.0 - weight_es: float = 0.4 - weight_ai: float = 0.6 - rerank_query_template: str = "{query}" - rerank_doc_template: str = "{title}" - - -@dataclass -class SearchConfig: - """Complete configuration for search engine (multi-tenant).""" - - # 字段权重配置(用于搜索) - field_boosts: Dict[str, float] - - # Legacy index domains (deprecated; kept for compatibility) - indexes: List[IndexConfig] - - # Query processing - query_config: QueryConfig - - # Function Score configuration (ES层打分) - function_score: FunctionScoreConfig - - # Rerank configuration (本地重排) - rerank: RerankConfig - - # SPU configuration - spu_config: SPUConfig - - # ES index settings - es_index_name: str - - # Tenant configuration - tenant_config: Dict[str, Any] = 
field(default_factory=dict) - - # ES settings - es_settings: Dict[str, Any] = field(default_factory=dict) - # Extensible service/provider registry (translation/embedding/rerank/...) - services: Dict[str, Any] = field(default_factory=dict) +from __future__ import annotations +from dataclasses import asdict +from pathlib import Path +from typing import Any, Dict, List, Optional -class ConfigurationError(Exception): - """Raised when configuration validation fails.""" - pass +from config.loader import AppConfigLoader, ConfigurationError +from config.schema import ( + FunctionScoreConfig, + IndexConfig, + QueryConfig, + RerankConfig, + SPUConfig, + SearchConfig, +) class ConfigLoader: - """Loads and validates unified search engine configuration from YAML file.""" - - def __init__(self, config_file: Optional[Path] = None): - """ - Initialize config loader. - - Args: - config_file: Path to config YAML file (defaults to config/config.yaml) - """ - if config_file is None: - config_file = Path(__file__).parent / "config.yaml" - self.config_file = Path(config_file) - - def _load_rewrite_dictionary(self) -> Dict[str, str]: - """Load query rewrite dictionary from external file.""" - rewrite_file = Path(__file__).parent / "rewrite_dictionary.txt" - rewrite_dict = {} - - if not rewrite_file.exists(): - return rewrite_dict - - try: - with open(rewrite_file, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if not line or line.startswith('#'): - continue - - parts = line.split('\t') - if len(parts) >= 2: - original = parts[0].strip() - replacement = parts[1].strip() - if original and replacement: - rewrite_dict[original] = replacement - except Exception as e: - print(f"Warning: Failed to load rewrite dictionary: {e}") - - return rewrite_dict - - def load_config(self, validate: bool = True) -> SearchConfig: - """ - Load unified configuration from YAML file. 
- - Args: - validate: Whether to validate configuration after loading - - Returns: - SearchConfig object - - Raises: - ConfigurationError: If config file not found, invalid, or validation fails - """ - if not self.config_file.exists(): - raise ConfigurationError(f"Configuration file not found: {self.config_file}") - - try: - with open(self.config_file, 'r', encoding='utf-8') as f: - config_data = yaml.safe_load(f) - except yaml.YAMLError as e: - raise ConfigurationError(f"Invalid YAML in {self.config_file}: {e}") - - config = self._parse_config(config_data) - - # Auto-validate configuration - if validate: - errors = self.validate_config(config) - if errors: - error_msg = "Configuration validation failed:\n" + "\n".join(f" - {err}" for err in errors) - raise ConfigurationError(error_msg) - - return config - - def _parse_config(self, config_data: Dict[str, Any]) -> SearchConfig: - """Parse configuration dictionary into SearchConfig object.""" - - # Parse field_boosts - field_boosts = config_data.get("field_boosts", {}) - if not isinstance(field_boosts, dict): - raise ConfigurationError("field_boosts must be a dictionary") - - # Parse indexes (deprecated; compatibility only) - indexes = [] - for index_data in config_data.get("indexes", []): - indexes.append(self._parse_index_config(index_data)) - - # Parse query config - query_config_data = config_data.get("query_config", {}) - rewrite_dictionary = self._load_rewrite_dictionary() - search_fields_cfg = query_config_data.get("search_fields", {}) - text_strategy_cfg = query_config_data.get("text_query_strategy", {}) + """Load the unified app config and return the search subtree.""" - query_config = QueryConfig( - supported_languages=query_config_data.get("supported_languages") or ["zh", "en"], - default_language=query_config_data.get("default_language") or "en", - enable_text_embedding=query_config_data.get("enable_text_embedding", True), - enable_query_rewrite=query_config_data.get("enable_query_rewrite", True), - 
rewrite_dictionary=rewrite_dictionary, - text_embedding_field=query_config_data.get("text_embedding_field"), - image_embedding_field=query_config_data.get("image_embedding_field"), - source_fields=query_config_data.get("source_fields"), - knn_boost=query_config_data.get("knn_boost", 0.25), - multilingual_fields=search_fields_cfg.get( - "multilingual_fields", - ["title", "brief", "description", "vendor", "category_path", "category_name_text"], - ), - shared_fields=search_fields_cfg.get( - "shared_fields", - ["tags", "option1_values", "option2_values", "option3_values"], - ), - core_multilingual_fields=search_fields_cfg.get( - "core_multilingual_fields", - ["title", "brief", "vendor", "category_name_text"], - ), - base_minimum_should_match=str(text_strategy_cfg.get("base_minimum_should_match", "75%")), - translation_minimum_should_match=str(text_strategy_cfg.get("translation_minimum_should_match", "75%")), - translation_boost=float(text_strategy_cfg.get("translation_boost", 0.4)), - translation_boost_when_source_missing=float( - text_strategy_cfg.get("translation_boost_when_source_missing", 1.0) - ), - source_boost_when_missing=float(text_strategy_cfg.get("source_boost_when_missing", 0.6)), - original_query_fallback_boost_when_translation_missing=float( - text_strategy_cfg.get("original_query_fallback_boost_when_translation_missing", 0.2) - ), - tie_breaker_base_query=float(text_strategy_cfg.get("tie_breaker_base_query", 0.9)), - zh_to_en_model=str(query_config_data.get("zh_to_en_model") or "opus-mt-zh-en"), - en_to_zh_model=str(query_config_data.get("en_to_zh_model") or "opus-mt-en-zh"), - default_translation_model=str( - query_config_data.get("default_translation_model") or "nllb-200-distilled-600m" - ), - ) - - # Parse Function Score configuration - fs_data = config_data.get("function_score", {}) - function_score = FunctionScoreConfig( - score_mode=fs_data.get("score_mode") or "sum", - boost_mode=fs_data.get("boost_mode") or "multiply", - 
functions=fs_data.get("functions") or [] - ) - - # Parse Rerank (provider/URL in services.rerank) - rerank_data = config_data.get("rerank", {}) - rerank = RerankConfig( - enabled=bool(rerank_data.get("enabled", True)), - rerank_window=int(rerank_data.get("rerank_window", 384)), - timeout_sec=float(rerank_data.get("timeout_sec", 15.0)), - weight_es=float(rerank_data.get("weight_es", 0.4)), - weight_ai=float(rerank_data.get("weight_ai", 0.6)), - rerank_query_template=str(rerank_data.get("rerank_query_template") or "{query}"), - rerank_doc_template=str(rerank_data.get("rerank_doc_template") or "{title}"), - ) - - # Parse SPU config - spu_data = config_data.get("spu_config", {}) - spu_config = SPUConfig( - enabled=spu_data.get("enabled", False), - spu_field=spu_data.get("spu_field"), - inner_hits_size=spu_data.get("inner_hits_size", 3), - searchable_option_dimensions=spu_data.get("searchable_option_dimensions", ['option1', 'option2', 'option3']) - ) - - # Parse tenant config - tenant_config_data = config_data.get("tenant_config", {}) + def __init__(self, config_file: Optional[Path] = None) -> None: + self._loader = AppConfigLoader(config_file=Path(config_file) if config_file is not None else None) - # Parse extensible services/provider registry - services_data = config_data.get("services", {}) or {} - if not isinstance(services_data, dict): - raise ConfigurationError("services must be a dictionary if provided") + def load_config(self, validate: bool = True) -> SearchConfig: + return self._loader.load(validate=validate).search - return SearchConfig( - field_boosts=field_boosts, - indexes=indexes, - query_config=query_config, - function_score=function_score, - rerank=rerank, - spu_config=spu_config, - tenant_config=tenant_config_data, - es_index_name=config_data.get("es_index_name", "search_products"), - es_settings=config_data.get("es_settings", {}), - services=services_data - ) - - def _parse_index_config(self, index_data: Dict[str, Any]) -> IndexConfig: - """Parse 
index configuration from dictionary.""" - return IndexConfig( - name=index_data["name"], - label=index_data.get("label", index_data["name"]), - fields=index_data.get("fields", []), - boost=index_data.get("boost", 1.0), - example=index_data.get("example") - ) - def validate_config(self, config: SearchConfig) -> List[str]: - """ - Validate configuration for common errors. - - Args: - config: SearchConfig to validate - - Returns: - List of error messages (empty if valid) - """ - errors = [] - - # Validate es_index_name + errors: List[str] = [] if not config.es_index_name: errors.append("es_index_name is required") - - # Validate field_boosts if not config.field_boosts: errors.append("field_boosts is empty") - - for field_name, boost in config.field_boosts.items(): - if not isinstance(boost, (int, float)): - errors.append(f"field_boosts['{field_name}']: boost must be a number, got {type(boost).__name__}") - elif boost < 0: - errors.append(f"field_boosts['{field_name}']: boost must be non-negative") - - # Validate indexes (deprecated, optional) - index_names = set() - for index in config.indexes: - # Check for duplicate index names - if index.name in index_names: - errors.append(f"Duplicate index name: {index.name}") - index_names.add(index.name) - - # Validate fields in index - if not index.fields: - errors.append(f"Index '{index.name}': fields list is empty") - - # Validate SPU config - if config.spu_config.enabled: - if not config.spu_config.spu_field: - errors.append("SPU aggregation enabled but no spu_field specified") - - # Validate query config - if not config.query_config.supported_languages: - errors.append("At least one supported language must be specified") - if config.query_config.default_language not in config.query_config.supported_languages: - errors.append( - f"Default language '{config.query_config.default_language}' " - f"not in supported languages: {config.query_config.supported_languages}" - ) - - # Validate dynamic search fields - def 
_validate_str_list(name: str, values: List[str]) -> None: - if not isinstance(values, list) or not values: - errors.append(f"query_config.{name} must be a non-empty list[str]") - return - for i, val in enumerate(values): - if not isinstance(val, str) or not val.strip(): - errors.append(f"query_config.{name}[{i}] must be a non-empty string") - - _validate_str_list("multilingual_fields", config.query_config.multilingual_fields) - _validate_str_list("shared_fields", config.query_config.shared_fields) - _validate_str_list("core_multilingual_fields", config.query_config.core_multilingual_fields) - - core_set = set(config.query_config.core_multilingual_fields) - multi_set = set(config.query_config.multilingual_fields) - if not core_set.issubset(multi_set): - errors.append("query_config.core_multilingual_fields must be subset of multilingual_fields") - - # Validate text query strategy numbers - for name in ( - "translation_boost", - "translation_boost_when_source_missing", - "source_boost_when_missing", - "original_query_fallback_boost_when_translation_missing", - "tie_breaker_base_query", - ): - value = getattr(config.query_config, name, None) - if not isinstance(value, (int, float)): - errors.append(f"query_config.{name} must be a number") - elif value < 0: - errors.append(f"query_config.{name} must be non-negative") - - # Validate source_fields tri-state semantics - source_fields = config.query_config.source_fields - if source_fields is not None: - if not isinstance(source_fields, list): - errors.append("query_config.source_fields must be null or list[str]") - else: - for idx, field_name in enumerate(source_fields): - if not isinstance(field_name, str) or not field_name.strip(): - errors.append( - f"query_config.source_fields[{idx}] must be a non-empty string" - ) - - # Validate tenant config shape (default must exist in config) - tenant_cfg = config.tenant_config - if not isinstance(tenant_cfg, dict): - errors.append("tenant_config must be an object") - else: - 
default_cfg = tenant_cfg.get("default") - if not isinstance(default_cfg, dict): - errors.append("tenant_config.default must be configured") - else: - index_languages = default_cfg.get("index_languages") - if not isinstance(index_languages, list) or len(index_languages) == 0: - errors.append("tenant_config.default.index_languages must be a non-empty list") - + errors.append("default_language must be included in supported_languages") + if config.spu_config.enabled and not config.spu_config.spu_field: + errors.append("spu_field is required when SPU is enabled") return errors - + def to_dict(self, config: SearchConfig) -> Dict[str, Any]: - """Convert SearchConfig to dictionary representation.""" - - # Build query_config dict - query_config_dict = { - "supported_languages": config.query_config.supported_languages, - "default_language": config.query_config.default_language, - "enable_text_embedding": config.query_config.enable_text_embedding, - "enable_query_rewrite": config.query_config.enable_query_rewrite, - "text_embedding_field": config.query_config.text_embedding_field, - "image_embedding_field": config.query_config.image_embedding_field, - "source_fields": config.query_config.source_fields, - "search_fields": { - "multilingual_fields": config.query_config.multilingual_fields, - "shared_fields": config.query_config.shared_fields, - "core_multilingual_fields": config.query_config.core_multilingual_fields, - }, - "text_query_strategy": { - "base_minimum_should_match": config.query_config.base_minimum_should_match, - "translation_minimum_should_match": config.query_config.translation_minimum_should_match, - "translation_boost": config.query_config.translation_boost, - "translation_boost_when_source_missing": config.query_config.translation_boost_when_source_missing, - "source_boost_when_missing": config.query_config.source_boost_when_missing, - "original_query_fallback_boost_when_translation_missing": ( - 
config.query_config.original_query_fallback_boost_when_translation_missing - ), - "tie_breaker_base_query": config.query_config.tie_breaker_base_query, - } - } - - return { - "es_index_name": config.es_index_name, - "es_settings": config.es_settings, - "field_boosts": config.field_boosts, - "indexes": [self._index_to_dict(index) for index in config.indexes], - "query_config": query_config_dict, - "function_score": { - "score_mode": config.function_score.score_mode, - "boost_mode": config.function_score.boost_mode, - "functions": config.function_score.functions - }, - "rerank": { - "enabled": config.rerank.enabled, - "rerank_window": config.rerank.rerank_window, - "timeout_sec": config.rerank.timeout_sec, - "weight_es": config.rerank.weight_es, - "weight_ai": config.rerank.weight_ai, - "rerank_query_template": config.rerank.rerank_query_template, - "rerank_doc_template": config.rerank.rerank_doc_template, - }, - "spu_config": { - "enabled": config.spu_config.enabled, - "spu_field": config.spu_config.spu_field, - "inner_hits_size": config.spu_config.inner_hits_size, - "searchable_option_dimensions": config.spu_config.searchable_option_dimensions - }, - "services": config.services, - } - - def _index_to_dict(self, index: IndexConfig) -> Dict[str, Any]: - """Convert IndexConfig to dictionary.""" - result = { - "name": index.name, - "label": index.label, - "fields": index.fields, - "boost": index.boost - } - - if index.example: - result["example"] = index.example - - return result + return asdict(config) diff --git a/config/dictionaries/query_rewrite.dict b/config/dictionaries/query_rewrite.dict new file mode 100644 index 0000000..bab6ac8 --- /dev/null +++ b/config/dictionaries/query_rewrite.dict @@ -0,0 +1,2 @@ +玩具 category.keyword:玩具 OR default:玩具 +消防 category.keyword:消防 OR default:消防 diff --git a/config/env_config.py b/config/env_config.py index 1c8075b..028c702 100644 --- a/config/env_config.py +++ b/config/env_config.py @@ -1,147 +1,115 @@ """ -Centralized 
configuration management for saas-search. +Compatibility accessors for infrastructure/runtime environment settings. -Loads configuration from environment variables and .env file. -This module provides a single point for loading .env and setting defaults. -All configuration variables are exported directly - no need for getter functions. +All values are derived from the unified application config. This module no +longer owns any independent loading or precedence rules. """ -import os -from pathlib import Path -from dotenv import load_dotenv - -# Load .env file from project root -PROJECT_ROOT = Path(__file__).parent.parent -load_dotenv(PROJECT_ROOT / '.env') - - -# Elasticsearch Configuration -ES_CONFIG = { - 'host': os.getenv('ES_HOST', 'http://localhost:9200'), - 'username': os.getenv('ES_USERNAME'), - 'password': os.getenv('ES_PASSWORD'), -} - -# Runtime environment & index namespace -# RUNTIME_ENV: 当前运行环境,建议使用 prod / uat / test / dev 等枚举值 -RUNTIME_ENV = os.getenv('RUNTIME_ENV', 'prod') -# ES_INDEX_NAMESPACE: 用于按环境隔离索引的命名空间前缀,例如 "uat_" / "test_" -# 为空字符串时表示不加前缀(通常是 prod 环境) -ES_INDEX_NAMESPACE = os.getenv('ES_INDEX_NAMESPACE') -if ES_INDEX_NAMESPACE is None: - # 未显式配置时,非 prod 环境默认加 "_" 前缀,prod 环境默认不加前缀 - ES_INDEX_NAMESPACE = '' if RUNTIME_ENV == 'prod' else f'{RUNTIME_ENV}_' - -# Redis Configuration -REDIS_CONFIG = { - 'host': os.getenv('REDIS_HOST', 'localhost'), - 'port': int(os.getenv('REDIS_PORT', 6479)), - 'snapshot_db': int(os.getenv('REDIS_SNAPSHOT_DB', 0)), - 'password': os.getenv('REDIS_PASSWORD'), - 'socket_timeout': int(os.getenv('REDIS_SOCKET_TIMEOUT', 1)), - 'socket_connect_timeout': int(os.getenv('REDIS_SOCKET_CONNECT_TIMEOUT', 1)), - 'retry_on_timeout': os.getenv('REDIS_RETRY_ON_TIMEOUT', 'False').lower() == 'true', - 'cache_expire_days': int(os.getenv('REDIS_CACHE_EXPIRE_DAYS', 360*2)), # 6 months - # Embedding 缓存 key 前缀,例如 "embedding" - 'embedding_cache_prefix': os.getenv('REDIS_EMBEDDING_CACHE_PREFIX', 'embedding'), -} - -# DeepL API Key 
-DEEPL_AUTH_KEY = os.getenv('DEEPL_AUTH_KEY') - -# DashScope API Key (for Qwen models) -DASHSCOPE_API_KEY = os.getenv('DASHSCOPE_API_KEY') - -# API Service Configuration -API_HOST = os.getenv('API_HOST', '0.0.0.0') -API_PORT = int(os.getenv('API_PORT', 6002)) -# Indexer service -INDEXER_HOST = os.getenv('INDEXER_HOST', '0.0.0.0') -INDEXER_PORT = int(os.getenv('INDEXER_PORT', 6004)) -# Optional dependent services -# EMBEDDING_HOST / EMBEDDING_PORT are only used by the optional combined embedding mode. -EMBEDDING_HOST = os.getenv('EMBEDDING_HOST', '127.0.0.1') -EMBEDDING_PORT = int(os.getenv('EMBEDDING_PORT', 6005)) -EMBEDDING_TEXT_HOST = os.getenv('EMBEDDING_TEXT_HOST', '127.0.0.1') -EMBEDDING_TEXT_PORT = int(os.getenv('EMBEDDING_TEXT_PORT', 6005)) -EMBEDDING_IMAGE_HOST = os.getenv('EMBEDDING_IMAGE_HOST', '127.0.0.1') -EMBEDDING_IMAGE_PORT = int(os.getenv('EMBEDDING_IMAGE_PORT', 6008)) -TRANSLATION_HOST = os.getenv('TRANSLATION_HOST', '127.0.0.1') -TRANSLATION_PORT = int(os.getenv('TRANSLATION_PORT', 6006)) -RERANKER_HOST = os.getenv('RERANKER_HOST', '127.0.0.1') -RERANKER_PORT = int(os.getenv('RERANKER_PORT', 6007)) -RERANK_PROVIDER = os.getenv('RERANK_PROVIDER', 'http') -# API_BASE_URL: 如果未设置,根据API_HOST构建(0.0.0.0使用localhost) -API_BASE_URL = os.getenv('API_BASE_URL') -if not API_BASE_URL: - API_BASE_URL = f'http://localhost:{API_PORT}' if API_HOST == '0.0.0.0' else f'http://{API_HOST}:{API_PORT}' -INDEXER_BASE_URL = os.getenv('INDEXER_BASE_URL') or ( - f'http://localhost:{INDEXER_PORT}' if INDEXER_HOST == '0.0.0.0' else f'http://{INDEXER_HOST}:{INDEXER_PORT}' -) -EMBEDDING_TEXT_SERVICE_URL = os.getenv('EMBEDDING_TEXT_SERVICE_URL') or ( - f'http://{EMBEDDING_TEXT_HOST}:{EMBEDDING_TEXT_PORT}' -) -EMBEDDING_IMAGE_SERVICE_URL = os.getenv('EMBEDDING_IMAGE_SERVICE_URL') or ( - f'http://{EMBEDDING_IMAGE_HOST}:{EMBEDDING_IMAGE_PORT}' -) -RERANKER_SERVICE_URL = os.getenv('RERANKER_SERVICE_URL') or f'http://{RERANKER_HOST}:{RERANKER_PORT}/rerank' +from __future__ import 
annotations + +from typing import Any, Dict + +from config.loader import get_app_config + + +def _app(): + return get_app_config() + + +def _runtime(): + return _app().runtime -# Model IDs / paths -TEXT_MODEL_DIR = os.getenv('TEXT_MODEL_DIR', os.getenv('TEXT_MODEL_ID', 'Qwen/Qwen3-Embedding-0.6B')) -IMAGE_MODEL_DIR = os.getenv('IMAGE_MODEL_DIR', '/data/tw/models/cn-clip') -# Cache Directory -CACHE_DIR = os.getenv('CACHE_DIR', '.cache') +def _infra(): + return _app().infrastructure -# MySQL Database Configuration (Shoplazza) -DB_CONFIG = { - 'host': os.getenv('DB_HOST'), - 'port': int(os.getenv('DB_PORT', 3306)) if os.getenv('DB_PORT') else 3306, - 'database': os.getenv('DB_DATABASE'), - 'username': os.getenv('DB_USERNAME'), - 'password': os.getenv('DB_PASSWORD'), -} +def _elasticsearch_dict() -> Dict[str, Any]: + cfg = _infra().elasticsearch + return { + "host": cfg.host, + "username": cfg.username, + "password": cfg.password, + } -def print_config(): - """Print current configuration (with sensitive data masked).""" - print("=" * 60) - print("saas-search Configuration") - print("=" * 60) - print("\nElasticsearch:") - print(f" Host: {ES_CONFIG['host']}") - print(f" Username: {ES_CONFIG['username']}") - print(f" Password: {'*' * 10 if ES_CONFIG['password'] else 'None'}") +def _redis_dict() -> Dict[str, Any]: + cfg = _infra().redis + return { + "host": cfg.host, + "port": cfg.port, + "snapshot_db": cfg.snapshot_db, + "password": cfg.password, + "socket_timeout": cfg.socket_timeout, + "socket_connect_timeout": cfg.socket_connect_timeout, + "retry_on_timeout": cfg.retry_on_timeout, + "cache_expire_days": cfg.cache_expire_days, + "embedding_cache_prefix": cfg.embedding_cache_prefix, + "anchor_cache_prefix": cfg.anchor_cache_prefix, + "anchor_cache_expire_days": cfg.anchor_cache_expire_days, + } - print("\nRedis:") - print(f" Host: {REDIS_CONFIG['host']}") - print(f" Port: {REDIS_CONFIG['port']}") - print(f" Password: {'*' * 10 if REDIS_CONFIG['password'] else 'None'}") - 
print("\nDeepL:") - print(f" API Key: {'*' * 10 if DEEPL_AUTH_KEY else 'None (translation disabled)'}") +def _db_dict() -> Dict[str, Any]: + cfg = _infra().database + return { + "host": cfg.host, + "port": cfg.port, + "database": cfg.database, + "username": cfg.username, + "password": cfg.password, + } + + +ES_CONFIG = _elasticsearch_dict() +REDIS_CONFIG = _redis_dict() +DB_CONFIG = _db_dict() + +RUNTIME_ENV = _runtime().environment +ES_INDEX_NAMESPACE = _runtime().index_namespace + +DEEPL_AUTH_KEY = _infra().secrets.deepl_auth_key +DASHSCOPE_API_KEY = _infra().secrets.dashscope_api_key + +API_HOST = _runtime().api_host +API_PORT = _runtime().api_port +INDEXER_HOST = _runtime().indexer_host +INDEXER_PORT = _runtime().indexer_port +EMBEDDING_HOST = _runtime().embedding_host +EMBEDDING_PORT = _runtime().embedding_port +EMBEDDING_TEXT_HOST = _runtime().embedding_host +EMBEDDING_TEXT_PORT = _runtime().embedding_text_port +EMBEDDING_IMAGE_HOST = _runtime().embedding_host +EMBEDDING_IMAGE_PORT = _runtime().embedding_image_port +TRANSLATION_HOST = _runtime().translator_host +TRANSLATION_PORT = _runtime().translator_port +RERANKER_HOST = _runtime().reranker_host +RERANKER_PORT = _runtime().reranker_port + +API_BASE_URL = f"http://localhost:{API_PORT}" if API_HOST == "0.0.0.0" else f"http://{API_HOST}:{API_PORT}" +INDEXER_BASE_URL = ( + f"http://localhost:{INDEXER_PORT}" if INDEXER_HOST == "0.0.0.0" else f"http://{INDEXER_HOST}:{INDEXER_PORT}" +) +EMBEDDING_TEXT_SERVICE_URL = _app().services.embedding.get_provider_config().get("text_base_url") +EMBEDDING_IMAGE_SERVICE_URL = _app().services.embedding.get_provider_config().get("image_base_url") +RERANKER_SERVICE_URL = ( + _app().services.rerank.get_provider_config().get("service_url") + or _app().services.rerank.get_provider_config().get("base_url") +) + - print("\nAPI Service:") - print(f" Host: {API_HOST}") - print(f" Port: {API_PORT}") +def get_es_config() -> Dict[str, Any]: + return dict(ES_CONFIG) - print("\nModels:") - 
print(f" Text Model: {TEXT_MODEL_DIR}") - print(f" Image Model: {IMAGE_MODEL_DIR}") - print("\nCache:") - print(f" Cache Directory: {CACHE_DIR}") +def get_db_config() -> Dict[str, Any]: + return dict(DB_CONFIG) - print("\nMySQL Database:") - print(f" Host: {DB_CONFIG['host']}") - print(f" Port: {DB_CONFIG['port']}") - print(f" Database: {DB_CONFIG['database']}") - print(f" Username: {DB_CONFIG['username']}") - print(f" Password: {'*' * 10 if DB_CONFIG['password'] else 'None'}") - print("=" * 60) +def get_redis_config() -> Dict[str, Any]: + return dict(REDIS_CONFIG) -if __name__ == "__main__": - print_config() +def print_config() -> None: + config = _app().sanitized_dict() + print(config) diff --git a/config/loader.py b/config/loader.py new file mode 100644 index 0000000..07f5c17 --- /dev/null +++ b/config/loader.py @@ -0,0 +1,592 @@ +""" +Unified application configuration loader. + +This module is the single source of truth for loading, merging, normalizing, +and validating application configuration. 
+""" + +from __future__ import annotations + +import hashlib +import json +import os +from copy import deepcopy +from dataclasses import asdict +from functools import lru_cache +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Tuple + +import yaml + +try: + from dotenv import load_dotenv as _load_dotenv # type: ignore +except Exception: # pragma: no cover + _load_dotenv = None + +from config.schema import ( + AppConfig, + AssetsConfig, + ConfigMetadata, + DatabaseSettings, + ElasticsearchSettings, + EmbeddingServiceConfig, + FunctionScoreConfig, + IndexConfig, + InfrastructureConfig, + QueryConfig, + RedisSettings, + RerankConfig, + RerankServiceConfig, + RuntimeConfig, + SearchConfig, + SecretsConfig, + ServicesConfig, + SPUConfig, + TenantCatalogConfig, + TranslationServiceConfig, +) +from translation.settings import build_translation_config + + +class ConfigurationError(Exception): + """Raised when configuration validation fails.""" + + +def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]: + result = deepcopy(base) + for key, value in (override or {}).items(): + if ( + key in result + and isinstance(result[key], dict) + and isinstance(value, dict) + ): + result[key] = _deep_merge(result[key], value) + else: + result[key] = deepcopy(value) + return result + + +def _load_yaml(path: Path) -> Dict[str, Any]: + with open(path, "r", encoding="utf-8") as handle: + data = yaml.safe_load(handle) or {} + if not isinstance(data, dict): + raise ConfigurationError(f"Configuration file root must be a mapping: {path}") + return data + + +def _read_rewrite_dictionary(path: Path) -> Dict[str, str]: + rewrite_dict: Dict[str, str] = {} + if not path.exists(): + return rewrite_dict + + with open(path, "r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line or line.startswith("#"): + continue + parts = line.split("\t") + if len(parts) < 2: + continue + original = 
parts[0].strip() + replacement = parts[1].strip() + if original and replacement: + rewrite_dict[original] = replacement + return rewrite_dict + + +class AppConfigLoader: + """Load the unified application configuration.""" + + def __init__( + self, + *, + config_dir: Optional[Path] = None, + config_file: Optional[Path] = None, + env_file: Optional[Path] = None, + ) -> None: + self.config_dir = Path(config_dir or Path(__file__).parent) + self.config_file = Path(config_file) if config_file is not None else None + self.project_root = self.config_dir.parent + self.env_file = Path(env_file) if env_file is not None else self.project_root / ".env" + + def load(self, validate: bool = True) -> AppConfig: + self._load_env() + raw_config, loaded_files = self._load_raw_config() + app_config = self._build_app_config(raw_config, loaded_files) + if validate: + self._validate(app_config) + return app_config + + def _load_env(self) -> None: + if _load_dotenv is not None: + _load_dotenv(self.env_file, override=False) + return + _load_env_file_fallback(self.env_file) + + def _load_raw_config(self) -> Tuple[Dict[str, Any], List[str]]: + env_name = (os.getenv("APP_ENV") or os.getenv("RUNTIME_ENV") or "prod").strip().lower() or "prod" + loaded_files: List[str] = [] + raw: Dict[str, Any] = {} + + if self.config_file is not None: + config_path = self.config_file + if not config_path.exists(): + raise ConfigurationError(f"Configuration file not found: {config_path}") + raw = _deep_merge(raw, _load_yaml(config_path)) + loaded_files.append(str(config_path)) + else: + base_path = self.config_dir / "base.yaml" + legacy_path = self.config_dir / "config.yaml" + primary_path = base_path if base_path.exists() else legacy_path + if not primary_path.exists(): + raise ConfigurationError(f"Configuration file not found: {primary_path}") + raw = _deep_merge(raw, _load_yaml(primary_path)) + loaded_files.append(str(primary_path)) + + env_path = self.config_dir / "environments" / f"{env_name}.yaml" + if 
env_path.exists(): + raw = _deep_merge(raw, _load_yaml(env_path)) + loaded_files.append(str(env_path)) + + tenant_dir = self.config_dir / "tenants" + if tenant_dir.is_dir(): + tenant_files = sorted(tenant_dir.glob("*.yaml")) + if tenant_files: + tenant_config = {"default": {}, "tenants": {}} + default_path = tenant_dir / "_default.yaml" + if default_path.exists(): + tenant_config["default"] = _load_yaml(default_path) + loaded_files.append(str(default_path)) + for tenant_path in tenant_files: + if tenant_path.name == "_default.yaml": + continue + tenant_config["tenants"][tenant_path.stem] = _load_yaml(tenant_path) + loaded_files.append(str(tenant_path)) + raw["tenant_config"] = tenant_config + + return raw, loaded_files + + def _build_app_config(self, raw: Dict[str, Any], loaded_files: List[str]) -> AppConfig: + assets_cfg = raw.get("assets") if isinstance(raw.get("assets"), dict) else {} + rewrite_path = ( + assets_cfg.get("query_rewrite_dictionary_path") + or assets_cfg.get("rewrite_dictionary_path") + or self.config_dir / "dictionaries" / "query_rewrite.dict" + ) + rewrite_path = Path(rewrite_path) + if not rewrite_path.is_absolute(): + rewrite_path = (self.project_root / rewrite_path).resolve() + if not rewrite_path.exists(): + legacy_rewrite_path = (self.config_dir / "query_rewrite.dict").resolve() + if legacy_rewrite_path.exists(): + rewrite_path = legacy_rewrite_path + + rewrite_dictionary = _read_rewrite_dictionary(rewrite_path) + search_config = self._build_search_config(raw, rewrite_dictionary) + services_config = self._build_services_config(raw.get("services") or {}) + tenants_config = self._build_tenants_config(raw.get("tenant_config") or {}) + runtime_config = self._build_runtime_config() + infrastructure_config = self._build_infrastructure_config(runtime_config.environment) + + metadata = ConfigMetadata( + loaded_files=tuple(loaded_files), + config_hash="", + deprecated_keys=tuple(self._detect_deprecated_keys(raw)), + ) + + app_config = AppConfig( + 
runtime=runtime_config, + infrastructure=infrastructure_config, + search=search_config, + services=services_config, + tenants=tenants_config, + assets=AssetsConfig(query_rewrite_dictionary_path=rewrite_path), + metadata=metadata, + ) + + config_hash = self._compute_hash(app_config) + return AppConfig( + runtime=app_config.runtime, + infrastructure=app_config.infrastructure, + search=app_config.search, + services=app_config.services, + tenants=app_config.tenants, + assets=app_config.assets, + metadata=ConfigMetadata( + loaded_files=app_config.metadata.loaded_files, + config_hash=config_hash, + deprecated_keys=app_config.metadata.deprecated_keys, + ), + ) + + def _build_search_config(self, raw: Dict[str, Any], rewrite_dictionary: Dict[str, str]) -> SearchConfig: + field_boosts = raw.get("field_boosts") or {} + if not isinstance(field_boosts, dict): + raise ConfigurationError("field_boosts must be a mapping") + + indexes: List[IndexConfig] = [] + for item in raw.get("indexes") or []: + if not isinstance(item, dict): + raise ConfigurationError("indexes items must be mappings") + indexes.append( + IndexConfig( + name=str(item["name"]), + label=str(item.get("label") or item["name"]), + fields=list(item.get("fields") or []), + boost=float(item.get("boost", 1.0)), + example=item.get("example"), + ) + ) + + query_cfg = raw.get("query_config") if isinstance(raw.get("query_config"), dict) else {} + search_fields = query_cfg.get("search_fields") if isinstance(query_cfg.get("search_fields"), dict) else {} + text_strategy = ( + query_cfg.get("text_query_strategy") + if isinstance(query_cfg.get("text_query_strategy"), dict) + else {} + ) + query_config = QueryConfig( + supported_languages=list(query_cfg.get("supported_languages") or ["zh", "en"]), + default_language=str(query_cfg.get("default_language") or "en"), + enable_text_embedding=bool(query_cfg.get("enable_text_embedding", True)), + enable_query_rewrite=bool(query_cfg.get("enable_query_rewrite", True)), + 
rewrite_dictionary=rewrite_dictionary, + text_embedding_field=query_cfg.get("text_embedding_field"), + image_embedding_field=query_cfg.get("image_embedding_field"), + source_fields=query_cfg.get("source_fields"), + knn_boost=float(query_cfg.get("knn_boost", 0.25)), + multilingual_fields=list( + search_fields.get( + "multilingual_fields", + ["title", "brief", "description", "vendor", "category_path", "category_name_text"], + ) + ), + shared_fields=list( + search_fields.get( + "shared_fields", + ["tags", "option1_values", "option2_values", "option3_values"], + ) + ), + core_multilingual_fields=list( + search_fields.get( + "core_multilingual_fields", + ["title", "brief", "vendor", "category_name_text"], + ) + ), + base_minimum_should_match=str(text_strategy.get("base_minimum_should_match", "75%")), + translation_minimum_should_match=str(text_strategy.get("translation_minimum_should_match", "75%")), + translation_boost=float(text_strategy.get("translation_boost", 0.4)), + translation_boost_when_source_missing=float( + text_strategy.get("translation_boost_when_source_missing", 1.0) + ), + source_boost_when_missing=float(text_strategy.get("source_boost_when_missing", 0.6)), + original_query_fallback_boost_when_translation_missing=float( + text_strategy.get("original_query_fallback_boost_when_translation_missing", 0.2) + ), + tie_breaker_base_query=float(text_strategy.get("tie_breaker_base_query", 0.9)), + zh_to_en_model=str(query_cfg.get("zh_to_en_model") or "opus-mt-zh-en"), + en_to_zh_model=str(query_cfg.get("en_to_zh_model") or "opus-mt-en-zh"), + default_translation_model=str( + query_cfg.get("default_translation_model") or "nllb-200-distilled-600m" + ), + ) + + function_score_cfg = raw.get("function_score") if isinstance(raw.get("function_score"), dict) else {} + rerank_cfg = raw.get("rerank") if isinstance(raw.get("rerank"), dict) else {} + spu_cfg = raw.get("spu_config") if isinstance(raw.get("spu_config"), dict) else {} + + return SearchConfig( + 
field_boosts={str(key): float(value) for key, value in field_boosts.items()}, + indexes=indexes, + query_config=query_config, + function_score=FunctionScoreConfig( + score_mode=str(function_score_cfg.get("score_mode") or "sum"), + boost_mode=str(function_score_cfg.get("boost_mode") or "multiply"), + functions=list(function_score_cfg.get("functions") or []), + ), + rerank=RerankConfig( + enabled=bool(rerank_cfg.get("enabled", True)), + rerank_window=int(rerank_cfg.get("rerank_window", 384)), + timeout_sec=float(rerank_cfg.get("timeout_sec", 15.0)), + weight_es=float(rerank_cfg.get("weight_es", 0.4)), + weight_ai=float(rerank_cfg.get("weight_ai", 0.6)), + rerank_query_template=str(rerank_cfg.get("rerank_query_template") or "{query}"), + rerank_doc_template=str(rerank_cfg.get("rerank_doc_template") or "{title}"), + ), + spu_config=SPUConfig( + enabled=bool(spu_cfg.get("enabled", False)), + spu_field=spu_cfg.get("spu_field"), + inner_hits_size=int(spu_cfg.get("inner_hits_size", 3)), + searchable_option_dimensions=list( + spu_cfg.get("searchable_option_dimensions") or ["option1", "option2", "option3"] + ), + ), + es_index_name=str(raw.get("es_index_name") or "search_products"), + es_settings=dict(raw.get("es_settings") or {}), + ) + + def _build_services_config(self, raw: Dict[str, Any]) -> ServicesConfig: + if not isinstance(raw, dict): + raise ConfigurationError("services must be a mapping") + + translation_raw = raw.get("translation") if isinstance(raw.get("translation"), dict) else {} + normalized_translation = build_translation_config(translation_raw) + translation_config = TranslationServiceConfig( + endpoint=str(normalized_translation["service_url"]).rstrip("/"), + timeout_sec=float(normalized_translation["timeout_sec"]), + default_model=str(normalized_translation["default_model"]), + default_scene=str(normalized_translation["default_scene"]), + cache=dict(normalized_translation["cache"]), + capabilities={str(key): dict(value) for key, value in 
normalized_translation["capabilities"].items()}, + ) + + embedding_raw = raw.get("embedding") if isinstance(raw.get("embedding"), dict) else {} + embedding_provider = str(embedding_raw.get("provider") or "http").strip().lower() + embedding_providers = dict(embedding_raw.get("providers") or {}) + if embedding_provider not in embedding_providers: + raise ConfigurationError(f"services.embedding.providers.{embedding_provider} must be configured") + embedding_backend = str(embedding_raw.get("backend") or "").strip().lower() + embedding_backends = { + str(key).strip().lower(): dict(value) + for key, value in dict(embedding_raw.get("backends") or {}).items() + } + if embedding_backend not in embedding_backends: + raise ConfigurationError(f"services.embedding.backends.{embedding_backend} must be configured") + image_backend = str(embedding_raw.get("image_backend") or "clip_as_service").strip().lower() + image_backends = { + str(key).strip().lower(): dict(value) + for key, value in dict(embedding_raw.get("image_backends") or {}).items() + } + if not image_backends: + image_backends = { + "clip_as_service": { + "server": "grpc://127.0.0.1:51000", + "model_name": "CN-CLIP/ViT-L-14", + "batch_size": 8, + "normalize_embeddings": True, + }, + "local_cnclip": { + "model_name": "ViT-L-14", + "device": None, + "batch_size": 8, + "normalize_embeddings": True, + }, + } + if image_backend not in image_backends: + raise ConfigurationError(f"services.embedding.image_backends.{image_backend} must be configured") + + embedding_config = EmbeddingServiceConfig( + provider=embedding_provider, + providers=embedding_providers, + backend=embedding_backend, + backends=embedding_backends, + image_backend=image_backend, + image_backends=image_backends, + ) + + rerank_raw = raw.get("rerank") if isinstance(raw.get("rerank"), dict) else {} + rerank_provider = str(rerank_raw.get("provider") or "http").strip().lower() + rerank_providers = dict(rerank_raw.get("providers") or {}) + if rerank_provider not 
in rerank_providers: + raise ConfigurationError(f"services.rerank.providers.{rerank_provider} must be configured") + rerank_backend = str(rerank_raw.get("backend") or "").strip().lower() + rerank_backends = { + str(key).strip().lower(): dict(value) + for key, value in dict(rerank_raw.get("backends") or {}).items() + } + if rerank_backend not in rerank_backends: + raise ConfigurationError(f"services.rerank.backends.{rerank_backend} must be configured") + rerank_request = dict(rerank_raw.get("request") or {}) + rerank_request.setdefault("max_docs", 1000) + rerank_request.setdefault("normalize", True) + + rerank_config = RerankServiceConfig( + provider=rerank_provider, + providers=rerank_providers, + backend=rerank_backend, + backends=rerank_backends, + request=rerank_request, + ) + + return ServicesConfig( + translation=translation_config, + embedding=embedding_config, + rerank=rerank_config, + ) + + def _build_tenants_config(self, raw: Dict[str, Any]) -> TenantCatalogConfig: + if not isinstance(raw, dict): + raise ConfigurationError("tenant_config must be a mapping") + default_cfg = raw.get("default") if isinstance(raw.get("default"), dict) else {} + tenants_cfg = raw.get("tenants") if isinstance(raw.get("tenants"), dict) else {} + return TenantCatalogConfig( + default=dict(default_cfg), + tenants={str(key): dict(value) for key, value in tenants_cfg.items()}, + ) + + def _build_runtime_config(self) -> RuntimeConfig: + environment = (os.getenv("APP_ENV") or os.getenv("RUNTIME_ENV") or "prod").strip().lower() or "prod" + namespace = os.getenv("ES_INDEX_NAMESPACE") + if namespace is None: + namespace = "" if environment == "prod" else f"{environment}_" + + return RuntimeConfig( + environment=environment, + index_namespace=namespace, + api_host=os.getenv("API_HOST", "0.0.0.0"), + api_port=int(os.getenv("API_PORT", 6002)), + indexer_host=os.getenv("INDEXER_HOST", "0.0.0.0"), + indexer_port=int(os.getenv("INDEXER_PORT", 6004)), + embedding_host=os.getenv("EMBEDDING_HOST", 
"127.0.0.1"), + embedding_port=int(os.getenv("EMBEDDING_PORT", 6005)), + embedding_text_port=int(os.getenv("EMBEDDING_TEXT_PORT", 6005)), + embedding_image_port=int(os.getenv("EMBEDDING_IMAGE_PORT", 6008)), + translator_host=os.getenv("TRANSLATION_HOST", "127.0.0.1"), + translator_port=int(os.getenv("TRANSLATION_PORT", 6006)), + reranker_host=os.getenv("RERANKER_HOST", "127.0.0.1"), + reranker_port=int(os.getenv("RERANKER_PORT", 6007)), + ) + + def _build_infrastructure_config(self, environment: str) -> InfrastructureConfig: + del environment + return InfrastructureConfig( + elasticsearch=ElasticsearchSettings( + host=os.getenv("ES_HOST", "http://localhost:9200"), + username=os.getenv("ES_USERNAME"), + password=os.getenv("ES_PASSWORD"), + ), + redis=RedisSettings( + host=os.getenv("REDIS_HOST", "localhost"), + port=int(os.getenv("REDIS_PORT", 6479)), + snapshot_db=int(os.getenv("REDIS_SNAPSHOT_DB", 0)), + password=os.getenv("REDIS_PASSWORD"), + socket_timeout=int(os.getenv("REDIS_SOCKET_TIMEOUT", 1)), + socket_connect_timeout=int(os.getenv("REDIS_SOCKET_CONNECT_TIMEOUT", 1)), + retry_on_timeout=os.getenv("REDIS_RETRY_ON_TIMEOUT", "false").strip().lower() == "true", + cache_expire_days=int(os.getenv("REDIS_CACHE_EXPIRE_DAYS", 360 * 2)), + embedding_cache_prefix=os.getenv("REDIS_EMBEDDING_CACHE_PREFIX", "embedding"), + anchor_cache_prefix=os.getenv("REDIS_ANCHOR_CACHE_PREFIX", "product_anchors"), + anchor_cache_expire_days=int(os.getenv("REDIS_ANCHOR_CACHE_EXPIRE_DAYS", 30)), + ), + database=DatabaseSettings( + host=os.getenv("DB_HOST"), + port=int(os.getenv("DB_PORT", 3306)) if os.getenv("DB_PORT") else 3306, + database=os.getenv("DB_DATABASE"), + username=os.getenv("DB_USERNAME"), + password=os.getenv("DB_PASSWORD"), + ), + secrets=SecretsConfig( + dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"), + deepl_auth_key=os.getenv("DEEPL_AUTH_KEY"), + ), + ) + + def _validate(self, app_config: AppConfig) -> None: + errors: List[str] = [] + + if not 
app_config.search.es_index_name: + errors.append("search.es_index_name is required") + + if not app_config.search.field_boosts: + errors.append("search.field_boosts cannot be empty") + else: + for field_name, boost in app_config.search.field_boosts.items(): + if boost < 0: + errors.append(f"field_boosts.{field_name} must be non-negative") + + query_config = app_config.search.query_config + if not query_config.supported_languages: + errors.append("query_config.supported_languages must not be empty") + if query_config.default_language not in query_config.supported_languages: + errors.append("query_config.default_language must be included in supported_languages") + for name, values in ( + ("multilingual_fields", query_config.multilingual_fields), + ("shared_fields", query_config.shared_fields), + ("core_multilingual_fields", query_config.core_multilingual_fields), + ): + if not values: + errors.append(f"query_config.{name} must not be empty") + + if not set(query_config.core_multilingual_fields).issubset(set(query_config.multilingual_fields)): + errors.append("query_config.core_multilingual_fields must be a subset of multilingual_fields") + + if app_config.search.spu_config.enabled and not app_config.search.spu_config.spu_field: + errors.append("spu_config.spu_field is required when spu_config.enabled is true") + + if not app_config.tenants.default or not app_config.tenants.default.get("index_languages"): + errors.append("tenant_config.default.index_languages must be configured") + + if app_config.metadata.deprecated_keys: + errors.append( + "Deprecated tenant config keys are not supported: " + + ", ".join(app_config.metadata.deprecated_keys) + ) + + embedding_provider_cfg = app_config.services.embedding.get_provider_config() + if not embedding_provider_cfg.get("text_base_url"): + errors.append("services.embedding.providers..text_base_url is required") + if not embedding_provider_cfg.get("image_base_url"): + errors.append("services.embedding.providers..image_base_url 
is required") + + rerank_provider_cfg = app_config.services.rerank.get_provider_config() + if not rerank_provider_cfg.get("service_url") and not rerank_provider_cfg.get("base_url"): + errors.append("services.rerank.providers..service_url or base_url is required") + + if errors: + raise ConfigurationError("Configuration validation failed:\n" + "\n".join(f" - {err}" for err in errors)) + + def _compute_hash(self, app_config: AppConfig) -> str: + payload = asdict(app_config) + payload["metadata"]["config_hash"] = "" + payload["infrastructure"]["elasticsearch"]["password"] = "***" if payload["infrastructure"]["elasticsearch"].get("password") else None + payload["infrastructure"]["database"]["password"] = "***" if payload["infrastructure"]["database"].get("password") else None + payload["infrastructure"]["redis"]["password"] = "***" if payload["infrastructure"]["redis"].get("password") else None + payload["infrastructure"]["secrets"]["dashscope_api_key"] = "***" if payload["infrastructure"]["secrets"].get("dashscope_api_key") else None + payload["infrastructure"]["secrets"]["deepl_auth_key"] = "***" if payload["infrastructure"]["secrets"].get("deepl_auth_key") else None + blob = json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str) + return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16] + + def _detect_deprecated_keys(self, raw: Dict[str, Any]) -> Iterable[str]: + tenant_raw = raw.get("tenant_config") if isinstance(raw.get("tenant_config"), dict) else {} + for key in ("default",): + item = tenant_raw.get(key) + if isinstance(item, dict): + for deprecated in ("translate_to_en", "translate_to_zh"): + if deprecated in item: + yield f"tenant_config.{key}.{deprecated}" + tenants = tenant_raw.get("tenants") if isinstance(tenant_raw.get("tenants"), dict) else {} + for tenant_id, cfg in tenants.items(): + if not isinstance(cfg, dict): + continue + for deprecated in ("translate_to_en", "translate_to_zh"): + if deprecated in cfg: + yield 
f"tenant_config.tenants.{tenant_id}.{deprecated}" + + +@lru_cache(maxsize=1) +def get_app_config() -> AppConfig: + """Return the process-global application configuration.""" + + return AppConfigLoader().load() + + +def reload_app_config() -> AppConfig: + """Clear the cached configuration and reload it.""" + + get_app_config.cache_clear() + return get_app_config() + + +def _load_env_file_fallback(path: Path) -> None: + if not path.exists(): + return + with open(path, "r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip().strip('"').strip("'") + if key and key not in os.environ: + os.environ[key] = value diff --git a/config/query_rewrite.dict b/config/query_rewrite.dict deleted file mode 100644 index 321979f..0000000 --- a/config/query_rewrite.dict +++ /dev/null @@ -1,3 +0,0 @@ -玩具 category.keyword:玩具 OR default:玩具 -消防 category.keyword:消防 OR default:消防 - diff --git a/config/schema.py b/config/schema.py new file mode 100644 index 0000000..99fa38b --- /dev/null +++ b/config/schema.py @@ -0,0 +1,307 @@ +""" +Typed configuration schema for the unified application configuration. + +This module defines the normalized in-memory structure used by all services. 
+""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + + +@dataclass(frozen=True) +class IndexConfig: + """Deprecated compatibility shape for legacy diagnostics/tests.""" + + name: str + label: str + fields: List[str] + boost: float = 1.0 + example: Optional[str] = None + + +@dataclass(frozen=True) +class QueryConfig: + """Configuration for query processing.""" + + supported_languages: List[str] = field(default_factory=lambda: ["zh", "en"]) + default_language: str = "en" + enable_text_embedding: bool = True + enable_query_rewrite: bool = True + rewrite_dictionary: Dict[str, str] = field(default_factory=dict) + text_embedding_field: Optional[str] = "title_embedding" + image_embedding_field: Optional[str] = None + source_fields: Optional[List[str]] = None + knn_boost: float = 0.25 + multilingual_fields: List[str] = field( + default_factory=lambda: [ + "title", + "brief", + "description", + "vendor", + "category_path", + "category_name_text", + ] + ) + shared_fields: List[str] = field( + default_factory=lambda: ["tags", "option1_values", "option2_values", "option3_values"] + ) + core_multilingual_fields: List[str] = field( + default_factory=lambda: ["title", "brief", "vendor", "category_name_text"] + ) + base_minimum_should_match: str = "75%" + translation_minimum_should_match: str = "75%" + translation_boost: float = 0.4 + translation_boost_when_source_missing: float = 1.0 + source_boost_when_missing: float = 0.6 + original_query_fallback_boost_when_translation_missing: float = 0.2 + tie_breaker_base_query: float = 0.9 + zh_to_en_model: str = "opus-mt-zh-en" + en_to_zh_model: str = "opus-mt-en-zh" + default_translation_model: str = "nllb-200-distilled-600m" + + +@dataclass(frozen=True) +class SPUConfig: + """SPU aggregation/search configuration.""" + + enabled: bool = False + spu_field: Optional[str] = None + inner_hits_size: int = 3 + 
searchable_option_dimensions: List[str] = field( + default_factory=lambda: ["option1", "option2", "option3"] + ) + + +@dataclass(frozen=True) +class FunctionScoreConfig: + """Function score configuration.""" + + score_mode: str = "sum" + boost_mode: str = "multiply" + functions: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass(frozen=True) +class RerankConfig: + """Search-time rerank configuration.""" + + enabled: bool = True + rerank_window: int = 384 + timeout_sec: float = 15.0 + weight_es: float = 0.4 + weight_ai: float = 0.6 + rerank_query_template: str = "{query}" + rerank_doc_template: str = "{title}" + + +@dataclass(frozen=True) +class SearchConfig: + """Search behavior configuration shared by backend and indexer.""" + + field_boosts: Dict[str, float] + indexes: List[IndexConfig] = field(default_factory=list) + query_config: QueryConfig = field(default_factory=QueryConfig) + function_score: FunctionScoreConfig = field(default_factory=FunctionScoreConfig) + rerank: RerankConfig = field(default_factory=RerankConfig) + spu_config: SPUConfig = field(default_factory=SPUConfig) + es_index_name: str = "search_products" + es_settings: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class TranslationServiceConfig: + """Translator service configuration.""" + + endpoint: str + timeout_sec: float + default_model: str + default_scene: str + cache: Dict[str, Any] + capabilities: Dict[str, Dict[str, Any]] + + def as_dict(self) -> Dict[str, Any]: + return { + "service_url": self.endpoint, + "timeout_sec": self.timeout_sec, + "default_model": self.default_model, + "default_scene": self.default_scene, + "cache": self.cache, + "capabilities": self.capabilities, + } + + +@dataclass(frozen=True) +class EmbeddingServiceConfig: + """Embedding service configuration.""" + + provider: str + providers: Dict[str, Any] + backend: str + backends: Dict[str, Dict[str, Any]] + image_backend: str + image_backends: Dict[str, Dict[str, Any]] + + def 
get_provider_config(self) -> Dict[str, Any]: + return dict(self.providers.get(self.provider, {}) or {}) + + def get_backend_config(self) -> Dict[str, Any]: + return dict(self.backends.get(self.backend, {}) or {}) + + def get_image_backend_config(self) -> Dict[str, Any]: + return dict(self.image_backends.get(self.image_backend, {}) or {}) + + +@dataclass(frozen=True) +class RerankServiceConfig: + """Reranker service configuration.""" + + provider: str + providers: Dict[str, Any] + backend: str + backends: Dict[str, Dict[str, Any]] + request: Dict[str, Any] + + def get_provider_config(self) -> Dict[str, Any]: + return dict(self.providers.get(self.provider, {}) or {}) + + def get_backend_config(self) -> Dict[str, Any]: + return dict(self.backends.get(self.backend, {}) or {}) + + +@dataclass(frozen=True) +class ServicesConfig: + """All service-level configuration.""" + + translation: TranslationServiceConfig + embedding: EmbeddingServiceConfig + rerank: RerankServiceConfig + + +@dataclass(frozen=True) +class TenantCatalogConfig: + """Tenant catalog configuration.""" + + default: Dict[str, Any] + tenants: Dict[str, Dict[str, Any]] + + def get_raw(self) -> Dict[str, Any]: + return { + "default": dict(self.default), + "tenants": {str(key): dict(value) for key, value in self.tenants.items()}, + } + + +@dataclass(frozen=True) +class ElasticsearchSettings: + host: str = "http://localhost:9200" + username: Optional[str] = None + password: Optional[str] = None + + +@dataclass(frozen=True) +class RedisSettings: + host: str = "localhost" + port: int = 6479 + snapshot_db: int = 0 + password: Optional[str] = None + socket_timeout: int = 1 + socket_connect_timeout: int = 1 + retry_on_timeout: bool = False + cache_expire_days: int = 720 + embedding_cache_prefix: str = "embedding" + anchor_cache_prefix: str = "product_anchors" + anchor_cache_expire_days: int = 30 + + +@dataclass(frozen=True) +class DatabaseSettings: + host: Optional[str] = None + port: int = 3306 + database: 
Optional[str] = None + username: Optional[str] = None + password: Optional[str] = None + + +@dataclass(frozen=True) +class SecretsConfig: + dashscope_api_key: Optional[str] = None + deepl_auth_key: Optional[str] = None + + +@dataclass(frozen=True) +class InfrastructureConfig: + elasticsearch: ElasticsearchSettings + redis: RedisSettings + database: DatabaseSettings + secrets: SecretsConfig + + +@dataclass(frozen=True) +class RuntimeConfig: + environment: str = "prod" + index_namespace: str = "" + api_host: str = "0.0.0.0" + api_port: int = 6002 + indexer_host: str = "0.0.0.0" + indexer_port: int = 6004 + embedding_host: str = "127.0.0.1" + embedding_port: int = 6005 + embedding_text_port: int = 6005 + embedding_image_port: int = 6008 + translator_host: str = "127.0.0.1" + translator_port: int = 6006 + reranker_host: str = "127.0.0.1" + reranker_port: int = 6007 + + +@dataclass(frozen=True) +class AssetsConfig: + query_rewrite_dictionary_path: Path + + +@dataclass(frozen=True) +class ConfigMetadata: + loaded_files: Tuple[str, ...] + config_hash: str + deprecated_keys: Tuple[str, ...] 
= field(default_factory=tuple) + + +@dataclass(frozen=True) +class AppConfig: + """Root application configuration.""" + + runtime: RuntimeConfig + infrastructure: InfrastructureConfig + search: SearchConfig + services: ServicesConfig + tenants: TenantCatalogConfig + assets: AssetsConfig + metadata: ConfigMetadata + + def sanitized_dict(self) -> Dict[str, Any]: + data = asdict(self) + data["infrastructure"]["elasticsearch"]["password"] = _mask_secret( + data["infrastructure"]["elasticsearch"].get("password") + ) + data["infrastructure"]["database"]["password"] = _mask_secret( + data["infrastructure"]["database"].get("password") + ) + data["infrastructure"]["redis"]["password"] = _mask_secret( + data["infrastructure"]["redis"].get("password") + ) + data["infrastructure"]["secrets"]["dashscope_api_key"] = _mask_secret( + data["infrastructure"]["secrets"].get("dashscope_api_key") + ) + data["infrastructure"]["secrets"]["deepl_auth_key"] = _mask_secret( + data["infrastructure"]["secrets"].get("deepl_auth_key") + ) + return data + + +def _mask_secret(value: Optional[str]) -> Optional[str]: + if not value: + return value + return "***" diff --git a/config/services_config.py b/config/services_config.py index 5eec562..0fa30a5 100644 --- a/config/services_config.py +++ b/config/services_config.py @@ -1,205 +1,76 @@ """ -Services configuration - single source for translation, embedding, rerank. +Unified service configuration accessors. -Translation is modeled as: -- one translator service endpoint used by business callers -- multiple translation capabilities loaded inside the translator service +This module is now a thin adapter over ``config.loader.get_app_config`` and +contains no independent parsing or precedence logic. 
""" from __future__ import annotations -import os -from dataclasses import dataclass, field -from functools import lru_cache -from pathlib import Path -from typing import Any, Dict, List, Optional - -import yaml -from translation.settings import TranslationConfig, build_translation_config, get_translation_cache - - -@dataclass -class ServiceConfig: - """Config for one capability (embedding/rerank).""" - - provider: str - providers: Dict[str, Any] = field(default_factory=dict) - - def get_provider_cfg(self) -> Dict[str, Any]: - p = (self.provider or "").strip().lower() - return self.providers.get(p, {}) if isinstance(self.providers, dict) else {} - - -def _load_services_raw(config_path: Optional[Path] = None) -> Dict[str, Any]: - if config_path is None: - config_path = Path(__file__).parent / "config.yaml" - path = Path(config_path) - if not path.exists(): - raise FileNotFoundError(f"services config file not found: {path}") - try: - with open(path, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) - except Exception as exc: - raise RuntimeError(f"failed to parse services config from {path}: {exc}") from exc - if not isinstance(data, dict): - raise RuntimeError(f"invalid config format in {path}: expected mapping root") - services = data.get("services") - if not isinstance(services, dict): - raise RuntimeError("config.yaml must contain a valid 'services' mapping") - return services - - -def _resolve_provider_name(env_name: str, config_provider: Any, capability: str) -> str: - provider = os.getenv(env_name) or config_provider - if not provider: - raise ValueError( - f"services.{capability}.provider is required " - f"(or set env override {env_name})" - ) - return str(provider).strip().lower() - - -def _resolve_translation() -> TranslationConfig: - raw = _load_services_raw() - cfg = raw.get("translation", {}) if isinstance(raw.get("translation"), dict) else {} - return build_translation_config(cfg) - - -def _resolve_embedding() -> ServiceConfig: - raw = 
_load_services_raw() - cfg = raw.get("embedding", {}) if isinstance(raw.get("embedding"), dict) else {} - providers = cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} - - provider = _resolve_provider_name( - env_name="EMBEDDING_PROVIDER", - config_provider=cfg.get("provider"), - capability="embedding", - ) - if provider != "http": - raise ValueError(f"Unsupported embedding provider: {provider}") - - env_text_url = os.getenv("EMBEDDING_TEXT_SERVICE_URL") - env_image_url = os.getenv("EMBEDDING_IMAGE_SERVICE_URL") - if provider == "http": - providers = dict(providers) - http_cfg = dict(providers.get("http", {})) - if env_text_url: - http_cfg["text_base_url"] = env_text_url.rstrip("/") - if env_image_url: - http_cfg["image_base_url"] = env_image_url.rstrip("/") - if not http_cfg.get("text_base_url"): - raise ValueError("services.embedding.providers.http.text_base_url is required") - if not http_cfg.get("image_base_url"): - raise ValueError("services.embedding.providers.http.image_base_url is required") - providers["http"] = http_cfg - - return ServiceConfig(provider=provider, providers=providers) - - -def _resolve_rerank() -> ServiceConfig: - raw = _load_services_raw() - cfg = raw.get("rerank", {}) if isinstance(raw.get("rerank"), dict) else {} - providers = cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} - - provider = _resolve_provider_name( - env_name="RERANK_PROVIDER", - config_provider=cfg.get("provider"), - capability="rerank", - ) - if provider != "http": - raise ValueError(f"Unsupported rerank provider: {provider}") - - env_url = os.getenv("RERANKER_SERVICE_URL") - if env_url: - url = env_url.rstrip("/") - if not url.endswith("/rerank"): - url = f"{url}/rerank" if "/rerank" not in url else url - providers = dict(providers) - providers["http"] = dict(providers.get("http", {})) - providers["http"]["base_url"] = url.replace("/rerank", "") - providers["http"]["service_url"] = url - - return 
ServiceConfig(provider=provider, providers=providers) - - -def get_rerank_backend_config() -> tuple[str, dict]: - raw = _load_services_raw() - cfg = raw.get("rerank", {}) if isinstance(raw.get("rerank"), dict) else {} - backends = cfg.get("backends", {}) if isinstance(cfg.get("backends"), dict) else {} - name = os.getenv("RERANK_BACKEND") or cfg.get("backend") - if not name: - raise ValueError("services.rerank.backend is required (or env RERANK_BACKEND)") - name = str(name).strip().lower() - backend_cfg = backends.get(name, {}) if isinstance(backends.get(name), dict) else {} - if not backend_cfg: - raise ValueError(f"services.rerank.backends.{name} is required") - return name, backend_cfg - - -def get_embedding_backend_config() -> tuple[str, dict]: - raw = _load_services_raw() - cfg = raw.get("embedding", {}) if isinstance(raw.get("embedding"), dict) else {} - backends = cfg.get("backends", {}) if isinstance(cfg.get("backends"), dict) else {} - name = os.getenv("EMBEDDING_BACKEND") or cfg.get("backend") - if not name: - raise ValueError("services.embedding.backend is required (or env EMBEDDING_BACKEND)") - name = str(name).strip().lower() - backend_cfg = backends.get(name, {}) if isinstance(backends.get(name), dict) else {} - if not backend_cfg: - raise ValueError(f"services.embedding.backends.{name} is required") - return name, backend_cfg - - -@lru_cache(maxsize=1) -def get_translation_config() -> TranslationConfig: - return _resolve_translation() - - -@lru_cache(maxsize=1) -def get_embedding_config() -> ServiceConfig: - return _resolve_embedding() - - -@lru_cache(maxsize=1) -def get_rerank_config() -> ServiceConfig: - return _resolve_rerank() +from typing import Any, Dict, Tuple + +from config.loader import get_app_config +from config.schema import EmbeddingServiceConfig, RerankServiceConfig, TranslationServiceConfig + + +def get_translation_config() -> Dict[str, Any]: + return get_app_config().services.translation.as_dict() + + +def get_embedding_config() -> 
EmbeddingServiceConfig: + return get_app_config().services.embedding + + +def get_rerank_config() -> RerankServiceConfig: + return get_app_config().services.rerank def get_translation_base_url() -> str: - return str(get_translation_config()["service_url"]) + return get_app_config().services.translation.endpoint def get_translation_cache_config() -> Dict[str, Any]: - return get_translation_cache(get_translation_config()) + return dict(get_app_config().services.translation.cache) def get_embedding_text_base_url() -> str: - provider_cfg = get_embedding_config().providers.get("http", {}) - base = os.getenv("EMBEDDING_TEXT_SERVICE_URL") or provider_cfg.get("text_base_url") + provider_cfg = get_app_config().services.embedding.get_provider_config() + base = provider_cfg.get("text_base_url") if not base: - raise ValueError("Embedding text HTTP base_url is not configured") + raise ValueError("Embedding text base_url is not configured") return str(base).rstrip("/") def get_embedding_image_base_url() -> str: - provider_cfg = get_embedding_config().providers.get("http", {}) - base = os.getenv("EMBEDDING_IMAGE_SERVICE_URL") or provider_cfg.get("image_base_url") + provider_cfg = get_app_config().services.embedding.get_provider_config() + base = provider_cfg.get("image_base_url") if not base: - raise ValueError("Embedding image HTTP base_url is not configured") + raise ValueError("Embedding image base_url is not configured") return str(base).rstrip("/") +def get_embedding_backend_config() -> Tuple[str, Dict[str, Any]]: + cfg = get_app_config().services.embedding + return cfg.backend, cfg.get_backend_config() + + +def get_embedding_image_backend_config() -> Tuple[str, Dict[str, Any]]: + cfg = get_app_config().services.embedding + return cfg.image_backend, cfg.get_image_backend_config() + + +def get_rerank_backend_config() -> Tuple[str, Dict[str, Any]]: + cfg = get_app_config().services.rerank + return cfg.backend, cfg.get_backend_config() + + def get_rerank_base_url() -> str: - 
base = ( - os.getenv("RERANKER_SERVICE_URL") - or get_rerank_config().providers.get("http", {}).get("service_url") - or get_rerank_config().providers.get("http", {}).get("base_url") - ) + provider_cfg = get_app_config().services.rerank.get_provider_config() + base = provider_cfg.get("service_url") or provider_cfg.get("base_url") if not base: - raise ValueError("Rerank HTTP base_url is not configured") + raise ValueError("Rerank service URL is not configured") return str(base).rstrip("/") def get_rerank_service_url() -> str: - """Backward-compatible alias.""" return get_rerank_base_url() diff --git a/config/tenant_config_loader.py b/config/tenant_config_loader.py index 4450963..d007236 100644 --- a/config/tenant_config_loader.py +++ b/config/tenant_config_loader.py @@ -2,12 +2,13 @@ 租户配置加载器。 从统一配置文件(config.yaml)加载租户配置,包括主语言和索引语言(index_languages)。 -支持旧配置 translate_to_en / translate_to_zh 的兼容解析。 """ import logging from typing import Dict, Any, Optional, List +from config.loader import get_app_config + logger = logging.getLogger(__name__) # 支持的索引语言:code -> display name(供商家勾选主市场语言等场景使用) @@ -83,25 +84,13 @@ def resolve_index_languages( ) -> List[str]: """ 从租户配置解析 index_languages。 - 若存在 index_languages 则用之;否则按旧配置 translate_to_en / translate_to_zh 推导。 + 若配置缺失或非法,则回退到默认配置。 """ - if "index_languages" in tenant_config: - normalized = normalize_index_languages( - tenant_config["index_languages"], - tenant_config.get("primary_language") or "en", - ) - return normalized if normalized else list(default_index_languages) - primary = (tenant_config.get("primary_language") or "en").strip().lower() - to_en = bool(tenant_config.get("translate_to_en")) - to_zh = bool(tenant_config.get("translate_to_zh")) - langs: List[str] = [] - if primary and primary in SOURCE_LANG_CODE_MAP: - langs.append(primary) - for code in ("en", "zh"): - if code not in langs and ((code == "en" and to_en) or (code == "zh" and to_zh)): - if code in SOURCE_LANG_CODE_MAP: - langs.append(code) - return langs if 
langs else list(default_index_languages) + normalized = normalize_index_languages( + tenant_config.get("index_languages"), + tenant_config.get("primary_language") or "en", + ) + return normalized if normalized else list(default_index_languages) class TenantConfigLoader: @@ -122,15 +111,8 @@ class TenantConfigLoader: return self._config try: - from config import ConfigLoader - - config_loader = ConfigLoader() - search_config = config_loader.load_config() - tenant_cfg = search_config.tenant_config - if not isinstance(tenant_cfg, dict): - raise RuntimeError("tenant_config must be an object") - - default_cfg = tenant_cfg.get("default") + tenant_cfg = get_app_config().tenants + default_cfg = tenant_cfg.default if not isinstance(default_cfg, dict): raise RuntimeError("tenant_config.default must be configured in config.yaml") default_primary = (default_cfg.get("primary_language") or "en").strip().lower() @@ -143,7 +125,7 @@ class TenantConfigLoader: "tenant_config.default.index_languages must include at least one supported language" ) - tenants_cfg = tenant_cfg.get("tenants", {}) + tenants_cfg = tenant_cfg.tenants if not isinstance(tenants_cfg, dict): raise RuntimeError("tenant_config.tenants must be an object") diff --git a/config/utils.py b/config/utils.py index baa878d..6e44b68 100644 --- a/config/utils.py +++ b/config/utils.py @@ -1,7 +1,8 @@ """Configuration helper functions for dynamic multi-language search fields.""" from typing import Dict, List -from .config_loader import SearchConfig + +from config.schema import SearchConfig def _format_field_with_boost(field_name: str, boost: float) -> str: diff --git a/docs/config-system-review-and-redesign.md b/docs/config-system-review-and-redesign.md new file mode 100644 index 0000000..d06a3b6 --- /dev/null +++ b/docs/config-system-review-and-redesign.md @@ -0,0 +1,738 @@ +# Configuration System Review And Redesign + +## 1. 
Goal + +This document reviews the current configuration system and proposes a practical redesign for long-term maintainability. + +The target is a configuration system that is: + +- unified in loading and ownership +- clear in boundaries and precedence +- visible in effective behavior +- easy to evolve across development, deployment, and operations + +This review is based on the current implementation, not only on the intended architecture in docs. + +## 2. Project Context + +The repo already defines the right architectural direction: + +- `config/config.yaml` should be the main configuration source for search behavior and service wiring +- `.env` should mainly carry deployment-specific values and secrets +- provider/backend expansion should stay centralized instead of spreading through business code + +That direction is described in: + +- [`README.md`](/data/saas-search/README.md) +- [`docs/DEVELOPER_GUIDE.md`](/data/saas-search/docs/DEVELOPER_GUIDE.md) +- [`docs/QUICKSTART.md`](/data/saas-search/docs/QUICKSTART.md) +- [`translation/README.md`](/data/saas-search/translation/README.md) + +The problem is not the architectural intent. The problem is that the current implementation only partially follows it. + +## 3. 
Current-State Review + +### 3.1 What exists today + +The current system effectively has several configuration channels: + +- `config/config.yaml` + - search behavior + - rerank behavior + - services registry + - tenant config +- `config/config_loader.py` + - parses search behavior and tenant config into `SearchConfig` + - also injects some defaults from code +- `config/services_config.py` + - reparses `config/config.yaml` again, independently + - resolves translation, embedding, rerank service config + - also applies env overrides +- `config/env_config.py` + - loads `.env` + - defines ES, Redis, DB, host/port, service URLs, namespace, model path defaults +- service-local config modules + - [`embeddings/config.py`](/data/saas-search/embeddings/config.py) + - [`reranker/config.py`](/data/saas-search/reranker/config.py) +- startup scripts + - derive defaults from shell env, Python config, and YAML in different combinations +- inline fallbacks in business logic + - query parsing + - indexing + - service startup + +### 3.2 Main findings + +#### Finding A: there is no single loader for the full effective configuration + +`ConfigLoader` and `services_config` both parse `config/config.yaml`, but they do so separately and with different responsibilities. 
+ +- [`config/config_loader.py`](/data/saas-search/config/config_loader.py#L148) +- [`config/services_config.py`](/data/saas-search/config/services_config.py#L33) + +Impact: + +- the same file is loaded twice through different code paths +- search config and services config can drift in interpretation +- alternative config paths are hard to support cleanly +- tests and tools cannot ask one place for the full effective config tree + +#### Finding B: precedence is not explicit, stable, or globally enforced + +Current precedence differs by subsystem: + +- search behavior mostly comes from YAML plus code defaults +- embedding and rerank allow env overrides for provider/backend/url +- translation intentionally blocks some env overrides +- startup scripts still choose host/port and mode via env +- some values are reconstructed from other env vars + +Examples: + +- env override for embedding provider/url/backend: + - [`config/services_config.py`](/data/saas-search/config/services_config.py#L52) + - [`config/services_config.py`](/data/saas-search/config/services_config.py#L68) + - [`config/services_config.py`](/data/saas-search/config/services_config.py#L139) +- host/port and service URL reconstruction: + - [`config/env_config.py`](/data/saas-search/config/env_config.py#L55) + - [`config/env_config.py`](/data/saas-search/config/env_config.py#L75) +- translator host/port still driven by startup env: + - [`scripts/start_translator.sh`](/data/saas-search/scripts/start_translator.sh#L28) + +Impact: + +- operators cannot reliably predict the effective configuration by reading one file +- the same setting category behaves differently across services +- incidents become harder to debug because source-of-truth depends on the code path + +#### Finding C: defaults are duplicated across YAML and code + +There are several layers of default values: + +- dataclass defaults in `QueryConfig` +- fallback defaults in `ConfigLoader._parse_config` +- defaults in `config.yaml` +- defaults in 
`env_config.py` +- defaults in `embeddings/config.py` +- defaults in `reranker/config.py` +- defaults in startup scripts + +Examples: + +- query defaults duplicated in dataclass and parser: + - [`config/config_loader.py`](/data/saas-search/config/config_loader.py#L24) + - [`config/config_loader.py`](/data/saas-search/config/config_loader.py#L240) +- embedding defaults duplicated in YAML, `services_config`, `embeddings/config.py`, and startup script: + - [`config/config.yaml`](/data/saas-search/config/config.yaml#L196) + - [`embeddings/config.py`](/data/saas-search/embeddings/config.py#L14) + - [`scripts/start_embedding_service.sh`](/data/saas-search/scripts/start_embedding_service.sh#L29) +- reranker defaults duplicated in YAML and `reranker/config.py`: + - [`config/config.yaml`](/data/saas-search/config/config.yaml#L214) + - [`reranker/config.py`](/data/saas-search/reranker/config.py#L6) + +Impact: + +- changing a default is risky because there may be multiple hidden copies +- code review cannot easily tell whether a value is authoritative or dead legacy +- “same config” may behave differently across processes + +#### Finding D: config is still embedded in runtime logic + +Some important behavior remains encoded as inline fallback logic rather than declared config. 
+ +Examples: + +- query-time translation target languages fall back to `["en", "zh"]`: + - [`query/query_parser.py`](/data/saas-search/query/query_parser.py#L339) +- indexer text handling and LLM enrichment also fall back to `["en", "zh"]`: + - [`indexer/document_transformer.py`](/data/saas-search/indexer/document_transformer.py#L216) + - [`indexer/document_transformer.py`](/data/saas-search/indexer/document_transformer.py#L310) + - [`indexer/document_transformer.py`](/data/saas-search/indexer/document_transformer.py#L649) + +Impact: + +- configuration is not fully visible in config files +- behavior can silently change when tenant config is missing or malformed +- “default behavior” is spread across business modules + +#### Finding E: some configuration assets are not managed as first-class config + +Query rewrite is configured through an external file, but the file path is hardcoded and currently inconsistent with the repository content. + +- loader expects: + - [`config/config_loader.py`](/data/saas-search/config/config_loader.py#L162) +- repo currently contains: + - [`config/query_rewrite.dict`](/data/saas-search/config/query_rewrite.dict) + +There is also an admin API that mutates rewrite rules in memory only: + +- [`api/routes/admin.py`](/data/saas-search/api/routes/admin.py#L68) +- [`query/query_parser.py`](/data/saas-search/query/query_parser.py#L622) + +Impact: + +- rewrite rules are neither cleanly file-backed nor fully runtime-managed +- restart behavior is unclear +- configuration visibility and persistence are weak + +#### Finding F: visibility is limited + +The system exposes only a small sanitized subset at `/admin/config`. 
+ +- [`api/routes/admin.py`](/data/saas-search/api/routes/admin.py#L42) + +At the same time, the true effective config includes: + +- tenant overlays +- env overrides +- service backend selections +- script-selected modes +- hidden defaults in code + +Impact: + +- there is no authoritative “effective config” view +- debugging configuration mismatches requires source reading +- operators cannot easily verify what each process actually started with + +#### Finding G: the indexer does not really consume the unified config as a first-class dependency + +Indexer startup explicitly says config is loaded only for parity/logging and routes do not depend on it. + +- [`api/indexer_app.py`](/data/saas-search/api/indexer_app.py#L76) + +Impact: + +- configuration is not truly system-wide +- search-side and indexer-side behavior can drift +- the current “unified config” is only partially unified + +#### Finding H: docs still carry legacy and mixed mental models + +Most high-level docs describe the desired centralized model, but some implementation/docs still expose legacy concepts such as `translate_to_en` and `translate_to_zh`. + +- desired model: + - [`README.md`](/data/saas-search/README.md#L78) + - [`docs/DEVELOPER_GUIDE.md`](/data/saas-search/docs/DEVELOPER_GUIDE.md#L207) + - [`translation/README.md`](/data/saas-search/translation/README.md#L161) +- legacy tenant translation flags still documented: + - [`indexer/README.md`](/data/saas-search/indexer/README.md#L39) + +Impact: + +- new developers may follow old mental models +- cleanup work keeps getting deferred because old and new systems appear both “supported” + +## 4. Design Principles For The Redesign + +The redesign should follow these rules. + +### 4.1 One logical configuration system + +It is acceptable to have multiple files, but not multiple loaders with overlapping ownership. + +There must be one loader pipeline that produces one typed `AppConfig`. 
+ +### 4.2 Configuration files declare, parser code interprets, env provides runtime injection + +Responsibilities should be: + +- configuration files + - declare non-secret desired behavior and non-secret deployable settings +- parsing logic + - load, merge, validate, normalize, and expose typed config + - never invent hidden business behavior +- environment variables + - carry secrets and a small set of runtime/process values + - do not redefine business behavior casually + +### 4.3 One precedence rule for the whole system + +Every config category should follow the same merge model unless explicitly exempted. + +### 4.4 No silent implicit fallback for business behavior + +Fail fast at startup when required config is missing or invalid. + +Do not silently fall back to legacy behavior such as hardcoded language lists. + +### 4.5 Effective configuration must be observable + +Every service should be able to show: + +- config version or hash +- source files loaded +- environment name +- sanitized effective configuration + +## 5. Recommended Target Design + +## 5.1 Boundary model + +Use three clear layers. 
+
+### Layer 1: repository-managed static config
+
+Purpose:
+
+- search behavior
+- tenant behavior
+- provider/backend registry
+- non-secret service topology defaults
+- feature switches
+
+Examples:
+
+- field boosts
+- query strategy
+- rerank fusion parameters
+- tenant language plans
+- translation capability registry
+- embedding backend selection default
+
+### Layer 2: environment-specific overlays
+
+Purpose:
+
+- per-environment non-secret differences
+- service endpoints by environment
+- resource sizing defaults by environment
+- dev/test/prod operational differences
+
+Examples:
+
+- local embedding URL vs production URL
+- dev rerank backend vs prod rerank backend
+- lower concurrency in local development
+
+### Layer 3: environment variables
+
+Purpose:
+
+- secrets
+- bind host/port
+- external infrastructure credentials
+- container-orchestrator last-mile injection
+
+Examples:
+
+- `ES_HOST`, `ES_USERNAME`, `ES_PASSWORD`
+- `DB_HOST`, `DB_USERNAME`, `DB_PASSWORD`
+- `REDIS_HOST`, `REDIS_PASSWORD`
+- `DASHSCOPE_API_KEY`, `DEEPL_AUTH_KEY`
+- `API_HOST`, `API_PORT`, `INDEXER_PORT`, `TRANSLATION_PORT`
+
+Rule:
+
+- environment variables should not be the normal path for choosing business behavior such as translation model, embedding backend, or tenant language policy
+- if an env override is allowed for a non-secret field, it must be explicitly listed and documented as an operational override, not a hidden convention
+
+## 5.2 Unified precedence
+
+Recommended precedence:
+
+1. schema defaults in code
+2. `config/base.yaml`
+3. `config/environments/{env}.yaml`
+4. tenant overlay from `config/tenants/`
+5. environment variables for the explicitly allowed runtime keys
+6. 
CLI flags for the current process only + +Important rule: + +- only one module may implement this merge logic +- no business module may call `os.getenv()` directly for configuration + +## 5.3 Recommended directory structure + +```text +config/ + schema.py + loader.py + sources.py + base.yaml + environments/ + dev.yaml + test.yaml + prod.yaml + tenants/ + _default.yaml + 1.yaml + 162.yaml + 170.yaml + dictionaries/ + query_rewrite.dict + README.md +.env.example +``` + +Notes: + +- `base.yaml` contains shared defaults and feature behavior +- `environments/*.yaml` contains environment-specific non-secret overrides +- `tenants/*.yaml` contains tenant-specific overrides only +- `dictionaries/` stores first-class config assets such as rewrite dictionaries +- `schema.py` defines the typed config model +- `loader.py` is the only entry point that loads and merges config + +If the team prefers fewer files, `tenants.yaml` is also acceptable. The key requirement is not “one file”, but “one loading model with clear ownership”. + +## 5.4 Typed configuration model + +Introduce one root object, for example: + +```python +class AppConfig(BaseModel): + runtime: RuntimeConfig + infrastructure: InfrastructureConfig + search: SearchConfig + services: ServicesConfig + tenants: TenantCatalogConfig + assets: ConfigAssets +``` + +Suggested subtrees: + +- `runtime` + - environment name + - config revision/hash + - bind addresses/ports +- `infrastructure` + - ES + - DB + - Redis + - index namespace +- `search` + - field boosts + - query config + - function score + - rerank behavior + - spu config +- `services` + - translation + - embedding + - rerank +- `tenants` + - default tenant config + - tenant overrides +- `assets` + - rewrite dictionary path + +Benefits: + +- one validated object shared by backend, indexer, translator, embedding, reranker +- one place for defaults +- one place for schema evolution + +## 5.5 Loading flow + +Recommended loading flow: + +1. 
determine `APP_ENV` or `RUNTIME_ENV`
+2. load schema defaults
+3. load `config/base.yaml`
+4. load `config/environments/{env}.yaml` if present
+5. load tenant files
+6. inject first-class assets such as rewrite dictionary
+7. apply allowed env overrides
+8. validate the final `AppConfig`
+9. freeze and cache the config object
+10. expose a sanitized effective-config view
+
+Important:
+
+- every process should call the same loader
+- services should receive a resolved `AppConfig`, not re-open YAML independently
+
+## 5.6 Clear responsibility split
+
+### Configuration files are responsible for
+
+- what the system should do
+- what providers/backends are available
+- which features are enabled
+- tenant language/index policies
+- non-secret service topology
+
+### Parser/loader code is responsible for
+
+- locating sources
+- merge precedence
+- type validation
+- normalization
+- deprecation warnings
+- producing the final immutable config object
+
+### Environment variables are responsible for
+
+- secrets
+- bind addresses/ports
+- infrastructure endpoints when the deployment platform injects them
+- a very small set of documented operational overrides
+
+### Business code is not responsible for
+
+- inventing defaults for missing config
+- loading YAML directly
+- calling `os.getenv()` for normal application behavior
+
+## 5.7 How to handle service config
+
+Unify all service-facing config under one structure:
+
+```yaml
+services:
+  translation:
+    endpoint: "http://translator:6006"
+    timeout_sec: 10
+    default_model: "llm"
+    default_scene: "general"
+    capabilities: ...
+  embedding:
+    endpoint:
+      text: "http://embedding:6005"
+      image: "http://embedding-image:6008"
+    backend: "tei"
+    backends: ...
+  rerank:
+    endpoint: "http://reranker:6007/rerank"
+    backend: "qwen3_vllm"
+    backends: ... 
+``` + +Rules: + +- `endpoint` is how callers reach the service +- `backend` is how the service itself is implemented +- only the service process cares about `backend` +- only callers care about `endpoint` +- both still belong to the same config tree, because they are part of one system + +## 5.8 How to handle tenant config + +Tenant config should become explicit policy, not translation-era leftovers. + +Recommended tenant fields: + +- `primary_language` +- `index_languages` +- `search_languages` +- `translation_policy` +- `facet_policy` +- optional tenant-specific ranking overrides + +Avoid keeping `translate_to_en` and `translate_to_zh` as active concepts in the long-term model. + +If compatibility is needed, support them only in the loader as deprecated aliases and emit warnings. + +## 5.9 How to handle rewrite rules and similar assets + +Treat them as declared config assets. + +Recommended rules: + +- file path declared in config +- one canonical location under `config/dictionaries/` +- loader validates presence and format +- admin runtime updates either: + - are removed, or + - write back through a controlled persistence path + +Do not keep a hybrid model where startup loads one file and admin mutates only in memory. + +## 5.10 Observability improvements + +Add the following: + +- `config dump` CLI that prints sanitized effective config +- startup log with config hash, environment, and config file list +- `/admin/config/effective` endpoint returning sanitized effective config +- `/admin/config/meta` endpoint returning: + - environment + - config hash + - loaded source files + - deprecated keys in use + +This is important for operations and for multi-service debugging. + +## 6. Practical Refactor Plan + +The refactor should be incremental. 
+
+### Phase 1: establish the new config core without changing behavior
+
+- create `config/schema.py`
+- create `config/loader.py`
+- move all current defaults into schema models
+- make loader read current `config/config.yaml`
+- make loader read `.env` only for approved keys
+- expose one `get_app_config()`
+
+Result:
+
+- same behavior, but one typed root config becomes available
+
+### Phase 2: remove duplicate readers
+
+- make `services_config.py` a thin adapter over `get_app_config()`
+- make `tenant_config_loader.py` read from `get_app_config()`
+- stop reparsing YAML in `services_config.py`
+- stop service modules from depending on legacy local config modules for behavior
+
+Result:
+
+- one parsing path
+- fewer divergence risks
+
+### Phase 3: move hidden defaults out of business logic
+
+- remove hardcoded fallback language lists from query/indexer modules
+- require tenant defaults to come from config schema only
+- remove duplicate behavior defaults from service code
+
+Result:
+
+- behavior becomes visible and reviewable
+
+### Phase 4: clean service startup configuration
+
+- make startup scripts ask the unified loader for resolved values
+- keep only bind host/port and secret injection in shell env
+- retire or reduce `embeddings/config.py` and `reranker/config.py`
+
+Result:
+
+- startup behavior matches runtime config model
+
+### Phase 5: split config files by responsibility
+
+- keep a single root loader
+- split current giant `config.yaml` into:
+  - `base.yaml`
+  - `environments/{env}.yaml`
+  - `tenants/*.yaml`
+  - `dictionaries/query_rewrite.dict`
+
+Result:
+
+- config remains unified logically, but is easier to read and maintain physically
+
+### Phase 6: deprecate legacy compatibility
+
+- deprecate `translate_to_en` and `translate_to_zh`
+- deprecate env-based backend/provider selection except for explicitly approved keys
+- remove old code paths after one or two release cycles
+
+Result:
+
+- the system becomes simpler instead of carrying 
two generations forever + +## 7. Concrete Rules To Adopt + +These rules should be documented and enforced in code review. + +### Rule 1 + +Only `config/loader.py` may load config files or `.env`. + +### Rule 2 + +Only `config/loader.py` may read `os.getenv()` for application config. + +### Rule 3 + +Business modules receive typed config objects and do not read files or env directly. + +### Rule 4 + +Each config key has one owner. + +Examples: + +- `search.query.knn_boost` belongs to search behavior config +- `services.embedding.backend` belongs to service implementation config +- `infrastructure.redis.password` belongs to env/secrets + +### Rule 5 + +Every fallback must be either: + +- declared in schema defaults, or +- rejected at startup + +No hidden fallback in runtime logic. + +### Rule 6 + +Every configuration asset must be visible in one of these places only: + +- config file +- env var +- generated runtime metadata + +Not inside parser code as an implicit constant. + +## 8. Recommended Naming Conventions + +Suggested conventions: + +- config keys use noun-based hierarchical names +- avoid mixing transport and implementation concepts in one field +- use `endpoint` for caller-facing addresses +- use `backend` for service-internal implementation choice +- use `enabled` only for true feature toggles +- use `default_*` only when a real selection happens at runtime + +Examples: + +- good: `services.rerank.endpoint` +- good: `services.rerank.backend` +- good: `tenants.default.index_languages` +- avoid: `service_url`, `base_url`, `provider`, `backend`, and script env all meaning slightly different things without a common model + +## 9. Highest-Priority Cleanup Items + +If the team wants the shortest path to improvement, start here: + +1. build one root `AppConfig` +2. make `services_config.py` stop reparsing YAML +3. declare rewrite dictionary path explicitly and fix the current mismatch +4. remove hardcoded `["en", "zh"]` fallbacks from query/indexer logic +5. 
replace `/admin/config` with an effective-config endpoint +6. retire `embeddings/config.py` and `reranker/config.py` as behavior sources +7. deprecate legacy tenant translation flags + +## 10. Expected Outcome + +After the redesign: + +- developers can answer “where does this setting come from?” in one step +- operators can see effective config without reading source code +- backend, indexer, translator, embedding, and reranker all share one model +- tenant behavior is explicit instead of partially implicit +- migration becomes safer because defaults and precedence are centralized +- adding a new provider/backend becomes configuration extension, not configuration archaeology + +## 11. Summary + +The current system has the right intent but not yet the right implementation shape. + +Today the main problems are: + +- duplicate config loaders +- inconsistent precedence +- duplicated defaults +- config hidden in runtime logic +- weak effective-config visibility +- leftover legacy concepts + +The recommended direction is: + +- one root typed config +- one loader pipeline +- explicit layered sources +- narrow env responsibility +- no hidden business fallbacks +- observable effective config + +That design is practical to implement incrementally in this repository and aligns well with the project's multi-tenant, multi-service, provider/backend-based architecture. diff --git a/embeddings/config.py b/embeddings/config.py index 61c18f2..2591b90 100644 --- a/embeddings/config.py +++ b/embeddings/config.py @@ -1,44 +1,39 @@ -""" -Embedding module configuration. +"""Embedding service compatibility config derived from unified app config.""" -This module is intentionally a plain Python file (no env var parsing, no extra deps). 
-Edit values here to configure: -- server host/port -- local model settings (paths/devices/batch sizes) -""" +from __future__ import annotations from typing import Optional -import os + +from config.loader import get_app_config class EmbeddingConfig(object): - # Server - HOST = os.getenv("EMBEDDING_HOST", "0.0.0.0") - PORT = int(os.getenv("EMBEDDING_PORT", 6005)) - - # Text backend defaults - TEXT_MODEL_ID = os.getenv("TEXT_MODEL_ID", "Qwen/Qwen3-Embedding-0.6B") - # Keep TEXT_MODEL_DIR as an alias so code can refer to one canonical text model value. - TEXT_MODEL_DIR = TEXT_MODEL_ID - TEXT_DEVICE = os.getenv("TEXT_DEVICE", "cuda") # "cuda" or "cpu" - TEXT_BATCH_SIZE = int(os.getenv("TEXT_BATCH_SIZE", "32")) - TEXT_NORMALIZE_EMBEDDINGS = os.getenv("TEXT_NORMALIZE_EMBEDDINGS", "true").lower() in ("1", "true", "yes") - TEI_BASE_URL = os.getenv("TEI_BASE_URL", "http://127.0.0.1:8080") - TEI_TIMEOUT_SEC = int(os.getenv("TEI_TIMEOUT_SEC", "60")) - - # Image embeddings - # Option A: clip-as-service (Jina CLIP server, recommended) - USE_CLIP_AS_SERVICE = os.getenv("USE_CLIP_AS_SERVICE", "true").lower() in ("1", "true", "yes") - CLIP_AS_SERVICE_SERVER = os.getenv("CLIP_AS_SERVICE_SERVER", "grpc://127.0.0.1:51000") - CLIP_AS_SERVICE_MODEL_NAME = os.getenv("CLIP_AS_SERVICE_MODEL_NAME", "CN-CLIP/ViT-L-14") - - # Option B: local CN-CLIP (when USE_CLIP_AS_SERVICE=false) - IMAGE_MODEL_NAME = os.getenv("IMAGE_MODEL_NAME", "ViT-L-14") - IMAGE_DEVICE = None # type: Optional[str] # "cuda" / "cpu" / None(auto) - - # Service behavior - IMAGE_BATCH_SIZE = 8 - IMAGE_NORMALIZE_EMBEDDINGS = os.getenv("IMAGE_NORMALIZE_EMBEDDINGS", "true").lower() in ("1", "true", "yes") + def __init__(self) -> None: + app_config = get_app_config() + runtime = app_config.runtime + services = app_config.services.embedding + text_backend = services.get_backend_config() + image_backend = services.get_image_backend_config() + + self.HOST = runtime.embedding_host + self.PORT = runtime.embedding_port + + 
self.TEXT_MODEL_ID = str(text_backend.get("model_id") or "Qwen/Qwen3-Embedding-0.6B") + self.TEXT_MODEL_DIR = self.TEXT_MODEL_ID + self.TEXT_DEVICE = str(text_backend.get("device") or "cuda") + self.TEXT_BATCH_SIZE = int(text_backend.get("batch_size", 32)) + self.TEXT_NORMALIZE_EMBEDDINGS = bool(text_backend.get("normalize_embeddings", True)) + self.TEI_BASE_URL = str(text_backend.get("base_url") or "http://127.0.0.1:8080") + self.TEI_TIMEOUT_SEC = int(text_backend.get("timeout_sec", 60)) + + self.USE_CLIP_AS_SERVICE = services.image_backend == "clip_as_service" + self.CLIP_AS_SERVICE_SERVER = str(image_backend.get("server") or "grpc://127.0.0.1:51000") + self.CLIP_AS_SERVICE_MODEL_NAME = str(image_backend.get("model_name") or "CN-CLIP/ViT-L-14") + + self.IMAGE_MODEL_NAME = str(image_backend.get("model_name") or "ViT-L-14") + self.IMAGE_DEVICE = image_backend.get("device") # type: Optional[str] + self.IMAGE_BATCH_SIZE = int(image_backend.get("batch_size", 8)) + self.IMAGE_NORMALIZE_EMBEDDINGS = bool(image_backend.get("normalize_embeddings", True)) CONFIG = EmbeddingConfig() diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py index acd1502..acb313a 100644 --- a/embeddings/image_encoder.py +++ b/embeddings/image_encoder.py @@ -9,8 +9,8 @@ from PIL import Image logger = logging.getLogger(__name__) +from config.loader import get_app_config from config.services_config import get_embedding_image_base_url -from config.env_config import REDIS_CONFIG from embeddings.cache_keys import build_image_cache_key from embeddings.redis_embedding_cache import RedisEmbeddingCache @@ -24,10 +24,11 @@ class CLIPImageEncoder: def __init__(self, service_url: Optional[str] = None): resolved_url = service_url or get_embedding_image_base_url() + redis_config = get_app_config().infrastructure.redis self.service_url = str(resolved_url).rstrip("/") self.endpoint = f"{self.service_url}/embed/image" # Reuse embedding cache prefix, but separate namespace for images to avoid 
collisions. - self.cache_prefix = str(REDIS_CONFIG.get("embedding_cache_prefix", "embedding")).strip() or "embedding" + self.cache_prefix = str(redis_config.embedding_cache_prefix).strip() or "embedding" logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url) self.cache = RedisEmbeddingCache( key_prefix=self.cache_prefix, diff --git a/embeddings/redis_embedding_cache.py b/embeddings/redis_embedding_cache.py index 50298e6..ed1b2c3 100644 --- a/embeddings/redis_embedding_cache.py +++ b/embeddings/redis_embedding_cache.py @@ -20,7 +20,7 @@ try: except ImportError: # pragma: no cover - runtime fallback for minimal envs redis = None # type: ignore[assignment] -from config.env_config import REDIS_CONFIG +from config.loader import get_app_config from embeddings.bf16 import decode_embedding_from_redis, encode_embedding_for_redis logger = logging.getLogger(__name__) @@ -37,7 +37,8 @@ class RedisEmbeddingCache: ): self.key_prefix = (key_prefix or "").strip() or "embedding" self.namespace = (namespace or "").strip() - self.expire_time = expire_time or timedelta(days=REDIS_CONFIG.get("cache_expire_days", 180)) + redis_config = get_app_config().infrastructure.redis + self.expire_time = expire_time or timedelta(days=redis_config.cache_expire_days) if redis_client is not None: self.redis_client = redis_client @@ -50,13 +51,13 @@ class RedisEmbeddingCache: try: client = redis.Redis( - host=REDIS_CONFIG.get("host", "localhost"), - port=REDIS_CONFIG.get("port", 6479), - password=REDIS_CONFIG.get("password"), + host=redis_config.host, + port=redis_config.port, + password=redis_config.password, decode_responses=False, - socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), - socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), - retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), + socket_timeout=redis_config.socket_timeout, + socket_connect_timeout=redis_config.socket_connect_timeout, + 
retry_on_timeout=redis_config.retry_on_timeout, health_check_interval=10, ) client.ping() diff --git a/embeddings/server.py b/embeddings/server.py index 85e0c28..0f864b9 100644 --- a/embeddings/server.py +++ b/embeddings/server.py @@ -470,16 +470,8 @@ def load_models(): if backend_name == "tei": from embeddings.text_embedding_tei import TEITextModel - base_url = ( - os.getenv("TEI_BASE_URL") - or backend_cfg.get("base_url") - or CONFIG.TEI_BASE_URL - ) - timeout_sec = int( - os.getenv("TEI_TIMEOUT_SEC") - or backend_cfg.get("timeout_sec") - or CONFIG.TEI_TIMEOUT_SEC - ) + base_url = backend_cfg.get("base_url") or CONFIG.TEI_BASE_URL + timeout_sec = int(backend_cfg.get("timeout_sec") or CONFIG.TEI_TIMEOUT_SEC) logger.info("Loading text backend: tei (base_url=%s)", base_url) _text_model = TEITextModel( base_url=str(base_url), @@ -488,11 +480,7 @@ def load_models(): elif backend_name == "local_st": from embeddings.text_embedding_sentence_transformers import Qwen3TextModel - model_id = ( - os.getenv("TEXT_MODEL_ID") - or backend_cfg.get("model_id") - or CONFIG.TEXT_MODEL_ID - ) + model_id = backend_cfg.get("model_id") or CONFIG.TEXT_MODEL_ID logger.info("Loading text backend: local_st (model=%s)", model_id) _text_model = Qwen3TextModel(model_id=str(model_id)) _start_text_batch_worker() diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py index e1067c3..048f7fa 100644 --- a/embeddings/text_encoder.py +++ b/embeddings/text_encoder.py @@ -9,13 +9,11 @@ import requests logger = logging.getLogger(__name__) +from config.loader import get_app_config from config.services_config import get_embedding_text_base_url from embeddings.cache_keys import build_text_cache_key from embeddings.redis_embedding_cache import RedisEmbeddingCache -# Try to import REDIS_CONFIG, but allow import to fail -from config.env_config import REDIS_CONFIG - class TextEmbeddingEncoder: """ @@ -24,10 +22,11 @@ class TextEmbeddingEncoder: def __init__(self, service_url: Optional[str] = 
None): resolved_url = service_url or get_embedding_text_base_url() + redis_config = get_app_config().infrastructure.redis self.service_url = str(resolved_url).rstrip("/") self.endpoint = f"{self.service_url}/embed/text" - self.expire_time = timedelta(days=REDIS_CONFIG.get("cache_expire_days", 180)) - self.cache_prefix = str(REDIS_CONFIG.get("embedding_cache_prefix", "embedding")).strip() or "embedding" + self.expire_time = timedelta(days=redis_config.cache_expire_days) + self.cache_prefix = str(redis_config.embedding_cache_prefix).strip() or "embedding" logger.info("Creating TextEmbeddingEncoder instance with service URL: %s", self.service_url) self.cache = RedisEmbeddingCache( diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index b4885bd..0fb256e 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -13,7 +13,6 @@ import numpy as np import logging import re from typing import Dict, Any, Optional, List -from config import ConfigLoader from indexer.product_enrich import analyze_products logger = logging.getLogger(__name__) diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py index 5da37b3..cfa38ef 100644 --- a/indexer/incremental_service.py +++ b/indexer/incremental_service.py @@ -13,7 +13,7 @@ from indexer.mapping_generator import get_tenant_index_name from indexer.indexer_logger import ( get_indexer_logger, log_index_request, log_index_result, log_spu_processing ) -from config import ConfigLoader +from config import get_app_config from translation import create_translation_client # Configure logger @@ -51,7 +51,7 @@ class IncrementalIndexerService: def _eager_init(self) -> None: """Strict eager initialization. 
Any dependency failure should fail fast.""" - self._config = ConfigLoader("config/config.yaml").load_config() + self._config = get_app_config().search self._searchable_option_dimensions = ( getattr(self._config.spu_config, "searchable_option_dimensions", None) or ["option1", "option2", "option3"] diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py index c63ccac..f8216f0 100644 --- a/indexer/indexing_utils.py +++ b/indexer/indexing_utils.py @@ -7,7 +7,7 @@ import logging from typing import Dict, Any, Optional from sqlalchemy import Engine, text -from config import ConfigLoader +from config import get_app_config from config.tenant_config_loader import get_tenant_config_loader from indexer.document_transformer import SPUDocumentTransformer from translation import create_translation_client @@ -92,8 +92,7 @@ def create_document_transformer( or config is None ): if config is None: - config_loader = ConfigLoader() - config = config_loader.load_config() + config = get_app_config().search if searchable_option_dimensions is None: searchable_option_dimensions = config.spu_config.searchable_option_dimensions diff --git a/indexer/mapping_generator.py b/indexer/mapping_generator.py index 17318fd..2502630 100644 --- a/indexer/mapping_generator.py +++ b/indexer/mapping_generator.py @@ -9,7 +9,7 @@ import json import logging from pathlib import Path -from config.env_config import ES_INDEX_NAMESPACE +from config.loader import get_app_config logger = logging.getLogger(__name__) @@ -30,7 +30,7 @@ def get_tenant_index_name(tenant_id: str) -> str: 其中 ES_INDEX_NAMESPACE 由 config.env_config.ES_INDEX_NAMESPACE 控制, 用于区分 prod/uat/test 等不同运行环境。 """ - prefix = ES_INDEX_NAMESPACE or "" + prefix = get_app_config().runtime.index_namespace or "" return f"{prefix}search_products_tenant_{tenant_id}" diff --git a/indexer/product_enrich.py b/indexer/product_enrich.py index 35e3567..9f54849 100644 --- a/indexer/product_enrich.py +++ b/indexer/product_enrich.py @@ -20,7 +20,7 @@ import 
redis import requests from pathlib import Path -from config.env_config import REDIS_CONFIG +from config.loader import get_app_config from config.tenant_config_loader import SOURCE_LANG_CODE_MAP from indexer.product_enrich_prompts import ( SYSTEM_MESSAGE, @@ -91,19 +91,20 @@ logger.info("Verbose LLM logs are written to: %s", verbose_log_file) # Redis 缓存(用于 anchors / 语义属性) -ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors") -ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30)) +_REDIS_CONFIG = get_app_config().infrastructure.redis +ANCHOR_CACHE_PREFIX = _REDIS_CONFIG.anchor_cache_prefix +ANCHOR_CACHE_EXPIRE_DAYS = int(_REDIS_CONFIG.anchor_cache_expire_days) _anchor_redis: Optional[redis.Redis] = None try: _anchor_redis = redis.Redis( - host=REDIS_CONFIG.get("host", "localhost"), - port=REDIS_CONFIG.get("port", 6479), - password=REDIS_CONFIG.get("password"), + host=_REDIS_CONFIG.host, + port=_REDIS_CONFIG.port, + password=_REDIS_CONFIG.password, decode_responses=True, - socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), - socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), - retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), + socket_timeout=_REDIS_CONFIG.socket_timeout, + socket_connect_timeout=_REDIS_CONFIG.socket_connect_timeout, + retry_on_timeout=_REDIS_CONFIG.retry_on_timeout, health_check_interval=10, ) _anchor_redis.ping() diff --git a/main.py b/main.py index 36aabe0..ed77382 100755 --- a/main.py +++ b/main.py @@ -16,8 +16,7 @@ import json # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) -from config import ConfigLoader -from config.env_config import ES_CONFIG +from config import get_app_config from utils import ESClient from search import Searcher from suggestion import SuggestionIndexBuilder @@ -61,8 +60,7 @@ def cmd_serve_indexer(args): def cmd_search(args): """Test search from command line.""" # Load config - config_loader 
= ConfigLoader("config/config.yaml") - config = config_loader.load_config() + config = get_app_config().search # Initialize ES and searcher es_client = ESClient(hosts=[args.es_host]) @@ -106,8 +104,9 @@ def cmd_search(args): def cmd_build_suggestions(args): """Build/update suggestion index for a tenant.""" # Initialize ES client with optional authentication - es_username = os.getenv("ES_USERNAME") or ES_CONFIG.get("username") - es_password = os.getenv("ES_PASSWORD") or ES_CONFIG.get("password") + es_cfg = get_app_config().infrastructure.elasticsearch + es_username = es_cfg.username + es_password = es_cfg.password if es_username and es_password: es_client = ESClient(hosts=[args.es_host], username=es_username, password=es_password) else: @@ -117,11 +116,12 @@ def cmd_build_suggestions(args): return 1 # Build DB config directly from environment to avoid dotenv dependency - db_host = os.getenv("DB_HOST") - db_port = int(os.getenv("DB_PORT", "3306")) - db_name = os.getenv("DB_DATABASE") - db_user = os.getenv("DB_USERNAME") - db_pass = os.getenv("DB_PASSWORD") + db_cfg = get_app_config().infrastructure.database + db_host = db_cfg.host + db_port = db_cfg.port + db_name = db_cfg.database + db_user = db_cfg.username + db_pass = db_cfg.password if not all([db_host, db_name, db_user, db_pass]): print("ERROR: DB_HOST/DB_PORT/DB_DATABASE/DB_USERNAME/DB_PASSWORD must be set in environment") return 1 @@ -170,7 +170,7 @@ def main(): serve_parser = subparsers.add_parser('serve', help='Start API service (multi-tenant)') serve_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to') serve_parser.add_argument('--port', type=int, default=6002, help='Port to bind to') - serve_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host') + serve_parser.add_argument('--es-host', default=get_app_config().infrastructure.elasticsearch.host, help='Elasticsearch host') serve_parser.add_argument('--reload', 
action='store_true', help='Enable auto-reload') # Serve-indexer command @@ -180,14 +180,14 @@ def main(): ) serve_indexer_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to') serve_indexer_parser.add_argument('--port', type=int, default=6004, help='Port to bind to') - serve_indexer_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host') + serve_indexer_parser.add_argument('--es-host', default=get_app_config().infrastructure.elasticsearch.host, help='Elasticsearch host') serve_indexer_parser.add_argument('--reload', action='store_true', help='Enable auto-reload') # Search command search_parser = subparsers.add_parser('search', help='Test search from command line') search_parser.add_argument('query', help='Search query') search_parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)') - search_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host') + search_parser.add_argument('--es-host', default=get_app_config().infrastructure.elasticsearch.host, help='Elasticsearch host') search_parser.add_argument('--size', type=int, default=10, help='Number of results') search_parser.add_argument('--no-translation', action='store_true', help='Disable translation') search_parser.add_argument('--no-embedding', action='store_true', help='Disable embeddings') @@ -199,7 +199,7 @@ def main(): help='Build tenant suggestion index (full/incremental)' ) suggest_build_parser.add_argument('--tenant-id', required=True, help='Tenant ID') - suggest_build_parser.add_argument('--es-host', default=ES_CONFIG.get('host', 'http://localhost:9200'), help='Elasticsearch host') + suggest_build_parser.add_argument('--es-host', default=get_app_config().infrastructure.elasticsearch.host, help='Elasticsearch host') suggest_build_parser.add_argument( '--mode', choices=['full', 'incremental'], diff --git a/query/query_parser.py b/query/query_parser.py 
index fef9d75..741d93c 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -336,13 +336,13 @@ class QueryParser: translations = {} translation_futures = {} translation_executor = None - index_langs = ["en", "zh"] + index_langs: List[str] = [] try: # 根据租户配置的 index_languages 决定翻译目标语言 from config.tenant_config_loader import get_tenant_config_loader tenant_loader = get_tenant_config_loader() tenant_cfg = tenant_loader.get_tenant_config(tenant_id or "default") - raw_index_langs = tenant_cfg.get("index_languages") or ["en", "zh"] + raw_index_langs = tenant_cfg.get("index_languages") or [] index_langs = [] seen_langs = set() for lang in raw_index_langs: diff --git a/reranker/backends/dashscope_rerank.py b/reranker/backends/dashscope_rerank.py index fefa67c..cf23442 100644 --- a/reranker/backends/dashscope_rerank.py +++ b/reranker/backends/dashscope_rerank.py @@ -63,43 +63,19 @@ class DashScopeRerankBackend: - max_retries: int, default 1 - retry_backoff_sec: float, default 0.2 - Env overrides: - - RERANK_DASHSCOPE_ENDPOINT - - RERANK_DASHSCOPE_MODEL - - RERANK_DASHSCOPE_TIMEOUT_SEC - - RERANK_DASHSCOPE_TOP_N_CAP - - RERANK_DASHSCOPE_BATCHSIZE """ def __init__(self, config: Dict[str, Any]) -> None: self._config = config or {} - self._model_name = str( - os.getenv("RERANK_DASHSCOPE_MODEL") - or self._config.get("model_name") - or "qwen3-rerank" - ) + self._model_name = str(self._config.get("model_name") or "qwen3-rerank") self._endpoint = str( - os.getenv("RERANK_DASHSCOPE_ENDPOINT") - or self._config.get("endpoint") - or "https://dashscope.aliyuncs.com/compatible-api/v1/reranks" + self._config.get("endpoint") or "https://dashscope.aliyuncs.com/compatible-api/v1/reranks" ).strip() self._api_key_env = str(self._config.get("api_key_env") or "").strip() self._api_key = str(os.getenv(self._api_key_env) or "").strip().strip('"').strip("'") - self._timeout_sec = float( - os.getenv("RERANK_DASHSCOPE_TIMEOUT_SEC") - or self._config.get("timeout_sec") - or 15.0 - ) - 
self._top_n_cap = int( - os.getenv("RERANK_DASHSCOPE_TOP_N_CAP") - or self._config.get("top_n_cap") - or 0 - ) - self._batchsize = int( - os.getenv("RERANK_DASHSCOPE_BATCHSIZE") - or self._config.get("batchsize") - or 0 - ) + self._timeout_sec = float(self._config.get("timeout_sec") or 15.0) + self._top_n_cap = int(self._config.get("top_n_cap") or 0) + self._batchsize = int(self._config.get("batchsize") or 0) self._instruct = str(self._config.get("instruct") or "").strip() self._max_retries = int(self._config.get("max_retries", 1)) self._retry_backoff_sec = float(self._config.get("retry_backoff_sec", 0.2)) diff --git a/reranker/config.py b/reranker/config.py index 2f19b1f..fc6379a 100644 --- a/reranker/config.py +++ b/reranker/config.py @@ -1,27 +1,31 @@ -"""Reranker service configuration (simple Python config).""" +"""Reranker service compatibility config derived from unified app config.""" -import os +from __future__ import annotations + +from config.loader import get_app_config class RerankerConfig(object): - # Server - HOST = os.getenv("RERANKER_HOST", "0.0.0.0") - PORT = int(os.getenv("RERANKER_PORT", 6007)) - - # Model - MODEL_NAME = "Qwen/Qwen3-Reranker-0.6B" - DEVICE = None # None -> auto (cuda if available) - USE_FP16 = True - BATCH_SIZE = 64 - MAX_LENGTH = 512 - CACHE_DIR = "./model_cache" - ENABLE_WARMUP = True - - # Request limits - MAX_DOCS = 1000 - - # Output - NORMALIZE = True + def __init__(self) -> None: + app_config = get_app_config() + runtime = app_config.runtime + service = app_config.services.rerank + backend = service.get_backend_config() + request = service.request + + self.HOST = runtime.reranker_host + self.PORT = runtime.reranker_port + + self.MODEL_NAME = str(backend.get("model_name") or "Qwen/Qwen3-Reranker-0.6B") + self.DEVICE = backend.get("device") + self.USE_FP16 = bool(backend.get("use_fp16", True)) + self.BATCH_SIZE = int(backend.get("batch_size", backend.get("infer_batch_size", 64))) + self.MAX_LENGTH = 
int(backend.get("max_length", 512)) + self.CACHE_DIR = str(backend.get("cache_dir") or "./model_cache") + self.ENABLE_WARMUP = bool(backend.get("enable_warmup", True)) + + self.MAX_DOCS = int(request.get("max_docs", 1000)) + self.NORMALIZE = bool(request.get("normalize", True)) CONFIG = RerankerConfig() diff --git a/suggestion/builder.py b/suggestion/builder.py index 2914c2d..c8e2c47 100644 --- a/suggestion/builder.py +++ b/suggestion/builder.py @@ -18,7 +18,7 @@ from typing import Any, Dict, Iterator, List, Optional, Tuple from sqlalchemy import text -from config.env_config import ES_INDEX_NAMESPACE +from config.loader import get_app_config from config.tenant_config_loader import get_tenant_config_loader from suggestion.mapping import build_suggestion_mapping from utils.es_client import ESClient @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) def _index_prefix() -> str: - return ES_INDEX_NAMESPACE or "" + return get_app_config().runtime.index_namespace or "" def get_suggestion_alias_name(tenant_id: str) -> str: diff --git a/translation/backends/deepl.py b/translation/backends/deepl.py index 85dfbeb..6921712 100644 --- a/translation/backends/deepl.py +++ b/translation/backends/deepl.py @@ -3,7 +3,6 @@ from __future__ import annotations import logging -import os import re from typing import List, Optional, Sequence, Tuple, Union @@ -24,7 +23,7 @@ class DeepLTranslationBackend: timeout: float, glossary_id: Optional[str] = None, ) -> None: - self.api_key = api_key or os.getenv("DEEPL_AUTH_KEY") + self.api_key = api_key self.api_url = api_url self.timeout = float(timeout) self.glossary_id = glossary_id diff --git a/translation/backends/llm.py b/translation/backends/llm.py index 2cdaa63..507f892 100644 --- a/translation/backends/llm.py +++ b/translation/backends/llm.py @@ -3,13 +3,11 @@ from __future__ import annotations import logging -import os import time from typing import List, Optional, Sequence, Union from openai import OpenAI -from config.env_config 
import DASHSCOPE_API_KEY from translation.languages import LANGUAGE_LABELS from translation.prompts import TRANSLATION_PROMPTS from translation.scenes import normalize_scene_name @@ -52,11 +50,13 @@ class LLMTranslationBackend: model: str, timeout_sec: float, base_url: str, + api_key: Optional[str], ) -> None: self.capability_name = capability_name self.model = model self.timeout_sec = float(timeout_sec) self.base_url = base_url + self.api_key = api_key self.client = self._create_client() @property @@ -64,12 +64,11 @@ class LLMTranslationBackend: return True def _create_client(self) -> Optional[OpenAI]: - api_key = DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") - if not api_key: + if not self.api_key: logger.warning("DASHSCOPE_API_KEY not set; llm translation unavailable") return None try: - return OpenAI(api_key=api_key, base_url=self.base_url) + return OpenAI(api_key=self.api_key, base_url=self.base_url) except Exception as exc: logger.error("Failed to initialize llm translation client: %s", exc, exc_info=True) return None diff --git a/translation/backends/qwen_mt.py b/translation/backends/qwen_mt.py index 297b409..5f78dec 100644 --- a/translation/backends/qwen_mt.py +++ b/translation/backends/qwen_mt.py @@ -3,14 +3,12 @@ from __future__ import annotations import logging -import os import re import time from typing import List, Optional, Sequence, Union from openai import OpenAI -from config.env_config import DASHSCOPE_API_KEY from translation.languages import QWEN_LANGUAGE_CODES logger = logging.getLogger(__name__) @@ -64,7 +62,7 @@ class QwenMTTranslationBackend: @staticmethod def _default_api_key(model: str) -> Optional[str]: del model - return DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") + return None def translate( self, diff --git a/translation/cache.py b/translation/cache.py index 72aec6e..da292e2 100644 --- a/translation/cache.py +++ b/translation/cache.py @@ -6,9 +6,12 @@ import hashlib import logging from typing import Mapping, Optional 
-import redis +try: + import redis +except ImportError: # pragma: no cover - runtime fallback for minimal envs + redis = None # type: ignore[assignment] -from config.env_config import REDIS_CONFIG +from config.loader import get_app_config logger = logging.getLogger(__name__) @@ -70,15 +73,19 @@ class TranslationCache: @staticmethod def _init_redis_client() -> Optional[redis.Redis]: + if redis is None: + logger.warning("redis package is not installed; translation cache disabled") + return None + redis_config = get_app_config().infrastructure.redis try: client = redis.Redis( - host=REDIS_CONFIG.get("host", "localhost"), - port=REDIS_CONFIG.get("port", 6479), - password=REDIS_CONFIG.get("password"), + host=redis_config.host, + port=redis_config.port, + password=redis_config.password, decode_responses=True, - socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), - socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), - retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), + socket_timeout=redis_config.socket_timeout, + socket_connect_timeout=redis_config.socket_connect_timeout, + retry_on_timeout=redis_config.retry_on_timeout, health_check_interval=10, ) client.ping() diff --git a/translation/client.py b/translation/client.py index 6896f5c..103a4bd 100644 --- a/translation/client.py +++ b/translation/client.py @@ -7,7 +7,7 @@ from typing import List, Optional, Sequence, Union import requests -from config.services_config import get_translation_config +from config.loader import get_app_config from translation.settings import normalize_translation_model, normalize_translation_scene logger = logging.getLogger(__name__) @@ -24,7 +24,7 @@ class TranslationServiceClient: default_scene: Optional[str] = None, timeout_sec: Optional[float] = None, ) -> None: - cfg = get_translation_config() + cfg = get_app_config().services.translation.as_dict() self.base_url = str(base_url or cfg["service_url"]).rstrip("/") self.default_model = 
normalize_translation_model(cfg, default_model or cfg["default_model"]) self.default_scene = normalize_translation_scene(cfg, default_scene or cfg["default_scene"]) diff --git a/translation/service.py b/translation/service.py index c0011bf..456898b 100644 --- a/translation/service.py +++ b/translation/service.py @@ -5,7 +5,8 @@ from __future__ import annotations import logging from typing import Dict, List, Optional -from config.services_config import get_translation_config +from config.loader import get_app_config +from config.schema import AppConfig from translation.cache import TranslationCache from translation.protocols import TranslateInput, TranslateOutput, TranslationBackendProtocol from translation.settings import ( @@ -22,8 +23,9 @@ logger = logging.getLogger(__name__) class TranslationService: """Owns translation backends and routes calls by model and scene.""" - def __init__(self, config: Optional[TranslationConfig] = None) -> None: - self.config = config or get_translation_config() + def __init__(self, config: Optional[TranslationConfig] = None, app_config: Optional[AppConfig] = None) -> None: + self._app_config = app_config or get_app_config() + self.config = config or self._app_config.services.translation.as_dict() self._enabled_capabilities = self._collect_enabled_capabilities() if not self._enabled_capabilities: raise ValueError("No enabled translation backends found in services.translation.capabilities") @@ -85,7 +87,7 @@ class TranslationService: capability_name=name, model=str(cfg["model"]).strip(), base_url=str(cfg["base_url"]).strip(), - api_key=cfg.get("api_key"), + api_key=self._app_config.infrastructure.secrets.dashscope_api_key, timeout=int(cfg["timeout_sec"]), glossary_id=cfg.get("glossary_id"), ) @@ -94,7 +96,7 @@ class TranslationService: from translation.backends.deepl import DeepLTranslationBackend return DeepLTranslationBackend( - api_key=cfg.get("api_key"), + api_key=self._app_config.infrastructure.secrets.deepl_auth_key, 
api_url=str(cfg["api_url"]).strip(), timeout=float(cfg["timeout_sec"]), glossary_id=cfg.get("glossary_id"), @@ -108,6 +110,7 @@ class TranslationService: model=str(cfg["model"]).strip(), timeout_sec=float(cfg["timeout_sec"]), base_url=str(cfg["base_url"]).strip(), + api_key=self._app_config.infrastructure.secrets.dashscope_api_key, ) def _create_local_nllb_backend(self, *, name: str, cfg: Dict[str, object]) -> TranslationBackendProtocol: diff --git a/utils/es_client.py b/utils/es_client.py index 81c4564..8759589 100644 --- a/utils/es_client.py +++ b/utils/es_client.py @@ -5,10 +5,9 @@ Elasticsearch client wrapper. from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk from typing import Dict, Any, List, Optional -import os import logging -from config.env_config import ES_CONFIG +from config.loader import get_app_config logger = logging.getLogger(__name__) @@ -33,7 +32,7 @@ class ESClient: **kwargs: Additional ES client parameters """ if hosts is None: - hosts = [os.getenv('ES_HOST', 'http://localhost:9200')] + hosts = [get_app_config().infrastructure.elasticsearch.host] # Build client config client_config = { @@ -325,16 +324,9 @@ def get_es_client_from_env() -> ESClient: Returns: ESClient instance """ - if ES_CONFIG: - return ESClient( - hosts=[ES_CONFIG['host']], - username=ES_CONFIG.get('username'), - password=ES_CONFIG.get('password') - ) - else: - # Fallback to env variables - return ESClient( - hosts=[os.getenv('ES_HOST', 'http://localhost:9200')], - username=os.getenv('ES_USERNAME'), - password=os.getenv('ES_PASSWORD') - ) + cfg = get_app_config().infrastructure.elasticsearch + return ESClient( + hosts=[cfg.host], + username=cfg.username, + password=cfg.password, + ) -- libgit2 0.21.2