""" Configuration loader and validator for search engine configurations. This module handles loading, parsing, and validating YAML configuration files that define how search should be executed (NOT how data should be indexed). 索引结构由 mappings/search_products.json 定义。 此配置只定义搜索行为:字段权重、搜索域、查询策略等。 """ import yaml from typing import Dict, Any, List, Optional from dataclasses import dataclass, field from pathlib import Path @dataclass class IndexConfig: """Configuration for an index domain (e.g., default, title, brand).""" name: str label: str fields: List[str] # List of field names to include in this search domain boost: float = 1.0 example: Optional[str] = None @dataclass class QueryConfig: """Configuration for query processing.""" supported_languages: List[str] = field(default_factory=lambda: ["zh", "en"]) default_language: str = "en" # Feature flags enable_text_embedding: bool = True enable_query_rewrite: bool = True # Query rewrite dictionary (loaded from external file) rewrite_dictionary: Dict[str, str] = field(default_factory=dict) # Embedding field names text_embedding_field: Optional[str] = "title_embedding" image_embedding_field: Optional[str] = None # Source fields configuration source_fields: Optional[List[str]] = None # KNN boost configuration knn_boost: float = 0.25 # Boost value for KNN (embedding recall) # Dynamic text fields for multi-language retrieval multilingual_fields: List[str] = field( default_factory=lambda: ["title", "brief", "description", "vendor", "category_path", "category_name_text"] ) shared_fields: List[str] = field( default_factory=lambda: ["tags", "option1_values", "option2_values", "option3_values"] ) core_multilingual_fields: List[str] = field( default_factory=lambda: ["title", "brief", "vendor", "category_name_text"] ) # Unified text strategy tuning base_minimum_should_match: str = "75%" translation_minimum_should_match: str = "75%" translation_boost: float = 0.4 translation_boost_when_source_missing: float = 1.0 source_boost_when_missing: float = 0.6 original_query_fallback_boost_when_translation_missing: float = 0.2 tie_breaker_base_query: float = 0.9 @dataclass class SPUConfig: """Configuration for SPU aggregation.""" enabled: bool = False spu_field: Optional[str] = None inner_hits_size: int = 3 # 配置哪些option维度参与检索(进索引、以及在线搜索) searchable_option_dimensions: List[str] = field(default_factory=lambda: ['option1', 'option2', 'option3']) @dataclass class FunctionScoreConfig: """Function Score配置(ES层打分规则)""" score_mode: str = "sum" boost_mode: str = "multiply" functions: List[Dict[str, Any]] = field(default_factory=list) @dataclass class RerankConfig: """重排配置(provider/URL 在 services.rerank)""" enabled: bool = True rerank_window: int = 384 timeout_sec: float = 15.0 weight_es: float = 0.4 weight_ai: float = 0.6 rerank_query_template: str = "{query}" rerank_doc_template: str = "{title}" @dataclass class SearchConfig: """Complete configuration for search engine (multi-tenant).""" # 字段权重配置(用于搜索) field_boosts: Dict[str, float] # Legacy index domains (deprecated; kept for compatibility) indexes: List[IndexConfig] # Query processing query_config: QueryConfig # Function Score configuration (ES层打分) function_score: FunctionScoreConfig # Rerank configuration (本地重排) rerank: RerankConfig # SPU configuration spu_config: SPUConfig # ES index settings es_index_name: str # Tenant configuration tenant_config: Dict[str, Any] = field(default_factory=dict) # ES settings es_settings: Dict[str, Any] = field(default_factory=dict) # Extensible service/provider registry (translation/embedding/rerank/...) services: Dict[str, Any] = field(default_factory=dict) class ConfigurationError(Exception): """Raised when configuration validation fails.""" pass class ConfigLoader: """Loads and validates unified search engine configuration from YAML file.""" def __init__(self, config_file: Optional[Path] = None): """ Initialize config loader. Args: config_file: Path to config YAML file (defaults to config/config.yaml) """ if config_file is None: config_file = Path(__file__).parent / "config.yaml" self.config_file = Path(config_file) def _load_rewrite_dictionary(self) -> Dict[str, str]: """Load query rewrite dictionary from external file.""" rewrite_file = Path(__file__).parent / "rewrite_dictionary.txt" rewrite_dict = {} if not rewrite_file.exists(): return rewrite_dict try: with open(rewrite_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') if len(parts) >= 2: original = parts[0].strip() replacement = parts[1].strip() if original and replacement: rewrite_dict[original] = replacement except Exception as e: print(f"Warning: Failed to load rewrite dictionary: {e}") return rewrite_dict def load_config(self, validate: bool = True) -> SearchConfig: """ Load unified configuration from YAML file. Args: validate: Whether to validate configuration after loading Returns: SearchConfig object Raises: ConfigurationError: If config file not found, invalid, or validation fails """ if not self.config_file.exists(): raise ConfigurationError(f"Configuration file not found: {self.config_file}") try: with open(self.config_file, 'r', encoding='utf-8') as f: config_data = yaml.safe_load(f) except yaml.YAMLError as e: raise ConfigurationError(f"Invalid YAML in {self.config_file}: {e}") config = self._parse_config(config_data) # Auto-validate configuration if validate: errors = self.validate_config(config) if errors: error_msg = "Configuration validation failed:\n" + "\n".join(f" - {err}" for err in errors) raise ConfigurationError(error_msg) return config def _parse_config(self, config_data: Dict[str, Any]) -> SearchConfig: """Parse configuration dictionary into SearchConfig object.""" # Parse field_boosts field_boosts = config_data.get("field_boosts", {}) if not isinstance(field_boosts, dict): raise ConfigurationError("field_boosts must be a dictionary") # Parse indexes (deprecated; compatibility only) indexes = [] for index_data in config_data.get("indexes", []): indexes.append(self._parse_index_config(index_data)) # Parse query config query_config_data = config_data.get("query_config", {}) rewrite_dictionary = self._load_rewrite_dictionary() search_fields_cfg = query_config_data.get("search_fields", {}) text_strategy_cfg = query_config_data.get("text_query_strategy", {}) query_config = QueryConfig( supported_languages=query_config_data.get("supported_languages") or ["zh", "en"], default_language=query_config_data.get("default_language") or "en", enable_text_embedding=query_config_data.get("enable_text_embedding", True), enable_query_rewrite=query_config_data.get("enable_query_rewrite", True), rewrite_dictionary=rewrite_dictionary, text_embedding_field=query_config_data.get("text_embedding_field"), image_embedding_field=query_config_data.get("image_embedding_field"), source_fields=query_config_data.get("source_fields"), knn_boost=query_config_data.get("knn_boost", 0.25), multilingual_fields=search_fields_cfg.get( "multilingual_fields", ["title", "brief", "description", "vendor", "category_path", "category_name_text"], ), shared_fields=search_fields_cfg.get( "shared_fields", ["tags", "option1_values", "option2_values", "option3_values"], ), core_multilingual_fields=search_fields_cfg.get( "core_multilingual_fields", ["title", "brief", "vendor", "category_name_text"], ), base_minimum_should_match=str(text_strategy_cfg.get("base_minimum_should_match", "75%")), translation_minimum_should_match=str(text_strategy_cfg.get("translation_minimum_should_match", "75%")), translation_boost=float(text_strategy_cfg.get("translation_boost", 0.4)), translation_boost_when_source_missing=float( text_strategy_cfg.get("translation_boost_when_source_missing", 1.0) ), source_boost_when_missing=float(text_strategy_cfg.get("source_boost_when_missing", 0.6)), original_query_fallback_boost_when_translation_missing=float( text_strategy_cfg.get("original_query_fallback_boost_when_translation_missing", 0.2) ), tie_breaker_base_query=float(text_strategy_cfg.get("tie_breaker_base_query", 0.9)), ) # Parse Function Score configuration fs_data = config_data.get("function_score", {}) function_score = FunctionScoreConfig( score_mode=fs_data.get("score_mode") or "sum", boost_mode=fs_data.get("boost_mode") or "multiply", functions=fs_data.get("functions") or [] ) # Parse Rerank (provider/URL in services.rerank) rerank_data = config_data.get("rerank", {}) rerank = RerankConfig( enabled=bool(rerank_data.get("enabled", True)), rerank_window=int(rerank_data.get("rerank_window", 384)), timeout_sec=float(rerank_data.get("timeout_sec", 15.0)), weight_es=float(rerank_data.get("weight_es", 0.4)), weight_ai=float(rerank_data.get("weight_ai", 0.6)), rerank_query_template=str(rerank_data.get("rerank_query_template") or "{query}"), rerank_doc_template=str(rerank_data.get("rerank_doc_template") or "{title}"), ) # Parse SPU config spu_data = config_data.get("spu_config", {}) spu_config = SPUConfig( enabled=spu_data.get("enabled", False), spu_field=spu_data.get("spu_field"), inner_hits_size=spu_data.get("inner_hits_size", 3), searchable_option_dimensions=spu_data.get("searchable_option_dimensions", ['option1', 'option2', 'option3']) ) # Parse tenant config tenant_config_data = config_data.get("tenant_config", {}) # Parse extensible services/provider registry services_data = config_data.get("services", {}) or {} if not isinstance(services_data, dict): raise ConfigurationError("services must be a dictionary if provided") return SearchConfig( field_boosts=field_boosts, indexes=indexes, query_config=query_config, function_score=function_score, rerank=rerank, spu_config=spu_config, tenant_config=tenant_config_data, es_index_name=config_data.get("es_index_name", "search_products"), es_settings=config_data.get("es_settings", {}), services=services_data ) def _parse_index_config(self, index_data: Dict[str, Any]) -> IndexConfig: """Parse index configuration from dictionary.""" return IndexConfig( name=index_data["name"], label=index_data.get("label", index_data["name"]), fields=index_data.get("fields", []), boost=index_data.get("boost", 1.0), example=index_data.get("example") ) def validate_config(self, config: SearchConfig) -> List[str]: """ Validate configuration for common errors. Args: config: SearchConfig to validate Returns: List of error messages (empty if valid) """ errors = [] # Validate es_index_name if not config.es_index_name: errors.append("es_index_name is required") # Validate field_boosts if not config.field_boosts: errors.append("field_boosts is empty") for field_name, boost in config.field_boosts.items(): if not isinstance(boost, (int, float)): errors.append(f"field_boosts['{field_name}']: boost must be a number, got {type(boost).__name__}") elif boost < 0: errors.append(f"field_boosts['{field_name}']: boost must be non-negative") # Validate indexes (deprecated, optional) index_names = set() for index in config.indexes: # Check for duplicate index names if index.name in index_names: errors.append(f"Duplicate index name: {index.name}") index_names.add(index.name) # Validate fields in index if not index.fields: errors.append(f"Index '{index.name}': fields list is empty") # Validate SPU config if config.spu_config.enabled: if not config.spu_config.spu_field: errors.append("SPU aggregation enabled but no spu_field specified") # Validate query config if not config.query_config.supported_languages: errors.append("At least one supported language must be specified") if config.query_config.default_language not in config.query_config.supported_languages: errors.append( f"Default language '{config.query_config.default_language}' " f"not in supported languages: {config.query_config.supported_languages}" ) # Validate dynamic search fields def _validate_str_list(name: str, values: List[str]) -> None: if not isinstance(values, list) or not values: errors.append(f"query_config.{name} must be a non-empty list[str]") return for i, val in enumerate(values): if not isinstance(val, str) or not val.strip(): errors.append(f"query_config.{name}[{i}] must be a non-empty string") _validate_str_list("multilingual_fields", config.query_config.multilingual_fields) _validate_str_list("shared_fields", config.query_config.shared_fields) _validate_str_list("core_multilingual_fields", config.query_config.core_multilingual_fields) core_set = set(config.query_config.core_multilingual_fields) multi_set = set(config.query_config.multilingual_fields) if not core_set.issubset(multi_set): errors.append("query_config.core_multilingual_fields must be subset of multilingual_fields") # Validate text query strategy numbers for name in ( "translation_boost", "translation_boost_when_source_missing", "source_boost_when_missing", "original_query_fallback_boost_when_translation_missing", "tie_breaker_base_query", ): value = getattr(config.query_config, name, None) if not isinstance(value, (int, float)): errors.append(f"query_config.{name} must be a number") elif value < 0: errors.append(f"query_config.{name} must be non-negative") # Validate source_fields tri-state semantics source_fields = config.query_config.source_fields if source_fields is not None: if not isinstance(source_fields, list): errors.append("query_config.source_fields must be null or list[str]") else: for idx, field_name in enumerate(source_fields): if not isinstance(field_name, str) or not field_name.strip(): errors.append( f"query_config.source_fields[{idx}] must be a non-empty string" ) # Validate tenant config shape (default must exist in config) tenant_cfg = config.tenant_config if not isinstance(tenant_cfg, dict): errors.append("tenant_config must be an object") else: default_cfg = tenant_cfg.get("default") if not isinstance(default_cfg, dict): errors.append("tenant_config.default must be configured") else: index_languages = default_cfg.get("index_languages") if not isinstance(index_languages, list) or len(index_languages) == 0: errors.append("tenant_config.default.index_languages must be a non-empty list") return errors def to_dict(self, config: SearchConfig) -> Dict[str, Any]: """Convert SearchConfig to dictionary representation.""" # Build query_config dict query_config_dict = { "supported_languages": config.query_config.supported_languages, "default_language": config.query_config.default_language, "enable_text_embedding": config.query_config.enable_text_embedding, "enable_query_rewrite": config.query_config.enable_query_rewrite, "text_embedding_field": config.query_config.text_embedding_field, "image_embedding_field": config.query_config.image_embedding_field, "source_fields": config.query_config.source_fields, "search_fields": { "multilingual_fields": config.query_config.multilingual_fields, "shared_fields": config.query_config.shared_fields, "core_multilingual_fields": config.query_config.core_multilingual_fields, }, "text_query_strategy": { "base_minimum_should_match": config.query_config.base_minimum_should_match, "translation_minimum_should_match": config.query_config.translation_minimum_should_match, "translation_boost": config.query_config.translation_boost, "translation_boost_when_source_missing": config.query_config.translation_boost_when_source_missing, "source_boost_when_missing": config.query_config.source_boost_when_missing, "original_query_fallback_boost_when_translation_missing": ( config.query_config.original_query_fallback_boost_when_translation_missing ), "tie_breaker_base_query": config.query_config.tie_breaker_base_query, } } return { "es_index_name": config.es_index_name, "es_settings": config.es_settings, "field_boosts": config.field_boosts, "indexes": [self._index_to_dict(index) for index in config.indexes], "query_config": query_config_dict, "function_score": { "score_mode": config.function_score.score_mode, "boost_mode": config.function_score.boost_mode, "functions": config.function_score.functions }, "rerank": { "enabled": config.rerank.enabled, "rerank_window": config.rerank.rerank_window, "timeout_sec": config.rerank.timeout_sec, "weight_es": config.rerank.weight_es, "weight_ai": config.rerank.weight_ai, "rerank_query_template": config.rerank.rerank_query_template, "rerank_doc_template": config.rerank.rerank_doc_template, }, "spu_config": { "enabled": config.spu_config.enabled, "spu_field": config.spu_config.spu_field, "inner_hits_size": config.spu_config.inner_hits_size, "searchable_option_dimensions": config.spu_config.searchable_option_dimensions }, "services": config.services, } def _index_to_dict(self, index: IndexConfig) -> Dict[str, Any]: """Convert IndexConfig to dictionary.""" result = { "name": index.name, "label": index.label, "fields": index.fields, "boost": index.boost } if index.example: result["example"] = index.example return result