# schema.py
"""
Typed configuration schema for the unified application configuration.

This module defines the normalized in-memory structure used by all services.
"""

from __future__ import annotations

from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple


@dataclass(frozen=True)
class IndexConfig:
    """Deprecated compatibility shape for legacy diagnostics/tests.

    A frozen record with three required fields (``name``, ``label``,
    ``fields``) and two optional ones (``boost``, ``example``).
    """

    name: str
    label: str
    # frozen=True blocks attribute rebinding only; the list itself stays mutable.
    fields: List[str]
    boost: float = 1.0
    example: Optional[str] = None


@dataclass(frozen=True)
class QueryConfig:
    """Configuration for query processing.

    Every field has a default, so ``QueryConfig()`` is a valid instance.
    List and dict defaults are built through ``default_factory`` so that
    each instance receives its own container object.
    """

    supported_languages: List[str] = field(default_factory=lambda: ["zh", "en"])
    default_language: str = "en"
    enable_text_embedding: bool = True
    enable_query_rewrite: bool = True
    rewrite_dictionary: Dict[str, str] = field(default_factory=dict)
    text_embedding_field: Optional[str] = "title_embedding"
    image_embedding_field: Optional[str] = None
    source_fields: Optional[List[str]] = None
    knn_boost: float = 0.25
    multilingual_fields: List[str] = field(
        default_factory=lambda: [
            "title",
            "brief",
            "description",
            "vendor",
            "category_path",
            "category_name_text",
        ]
    )
    shared_fields: List[str] = field(
        default_factory=lambda: ["tags", "option1_values", "option2_values", "option3_values"]
    )
    # By default a subset of the default ``multilingual_fields`` list above.
    core_multilingual_fields: List[str] = field(
        default_factory=lambda: ["title", "brief", "vendor", "category_name_text"]
    )
    base_minimum_should_match: str = "70%"
    translation_minimum_should_match: str = "70%"
    translation_boost: float = 0.4
    tie_breaker_base_query: float = 0.9
    best_fields: Dict[str, float] = field(default_factory=dict)
    best_fields_boost: float = 2.0
    phrase_fields: Dict[str, float] = field(default_factory=dict)
    phrase_match_boost: float = 3.0
    # Translation model names: one per direction plus a general default.
    zh_to_en_model: str = "opus-mt-zh-en"
    en_to_zh_model: str = "opus-mt-en-zh"
    default_translation_model: str = "nllb-200-distilled-600m"
    # Used when the detected language is not in the tenant's index_languages
    # (i.e. no multilingual field can be matched directly); None means "use the
    # same model as the corresponding setting in the group above".
    zh_to_en_model_source_not_in_index: Optional[str] = None
    en_to_zh_model_source_not_in_index: Optional[str] = None
    default_translation_model_source_not_in_index: Optional[str] = None
    # Query stage: translation and embedding generation are submitted
    # concurrently and share a single wait budget (in milliseconds).
    # Detected language is in the tenant's index_languages: favor a fast
    # response, so the budget is shorter.
    # Detected language is not in index_languages: translation matters more for
    # recall, so the budget is longer.
    translation_embedding_wait_budget_ms_source_in_index: int = 80
    translation_embedding_wait_budget_ms_source_not_in_index: int = 200


@dataclass(frozen=True)
class SPUConfig:
    """SPU aggregation/search configuration.

    All fields are optional; the defaults leave the feature disabled
    (``enabled=False``) with no ``spu_field`` set.
    """

    enabled: bool = False
    spu_field: Optional[str] = None
    inner_hits_size: int = 3
    # A fresh ["option1", "option2", "option3"] list is created per instance.
    searchable_option_dimensions: List[str] = field(
        default_factory=lambda: ["option1", "option2", "option3"]
    )


@dataclass(frozen=True)
class FunctionScoreConfig:
    """Function score configuration.

    Holds a score mode, a boost mode, and a list of function definitions
    stored as free-form dicts (empty by default).
    """

    score_mode: str = "sum"
    boost_mode: str = "multiply"
    # NOTE(review): the schema of each function dict is not defined in this
    # module — confirm against the code that consumes it.
    functions: List[Dict[str, Any]] = field(default_factory=list)


@dataclass(frozen=True)
class RerankConfig:
    """Search-time rerank configuration.

    All fields have defaults; reranking is enabled by default.
    """

    enabled: bool = True
    rerank_window: int = 384
    timeout_sec: float = 15.0
    # The two default weights sum to 1.0.
    weight_es: float = 0.4
    weight_ai: float = 0.6
    # Templates containing ``{query}`` / ``{title}`` placeholders.
    # NOTE(review): the set of supported placeholders is defined by the
    # consumer, not here — confirm before adding new ones.
    rerank_query_template: str = "{query}"
    rerank_doc_template: str = "{title}"


@dataclass(frozen=True)
class SearchConfig:
    """Search behavior configuration shared by backend and indexer.

    ``field_boosts`` is the only required field. Every nested configuration
    defaults to a freshly constructed instance of its class.
    """

    field_boosts: Dict[str, float]
    # Uses the deprecated IndexConfig shape; empty unless explicitly provided.
    indexes: List[IndexConfig] = field(default_factory=list)
    query_config: QueryConfig = field(default_factory=QueryConfig)
    function_score: FunctionScoreConfig = field(default_factory=FunctionScoreConfig)
    rerank: RerankConfig = field(default_factory=RerankConfig)
    spu_config: SPUConfig = field(default_factory=SPUConfig)
    es_index_name: str = "search_products"
    es_settings: Dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class TranslationServiceConfig:
    """Translator service configuration.

    All six fields are required. ``cache`` and ``capabilities`` are free-form
    dicts whose schema is not defined in this module.
    """

    endpoint: str
    timeout_sec: float
    default_model: str
    default_scene: str
    cache: Dict[str, Any]
    capabilities: Dict[str, Dict[str, Any]]

    def as_dict(self) -> Dict[str, Any]:
        """Return the configuration as a plain dict.

        ``endpoint`` is exported under the key ``"service_url"``; every other
        field keeps its attribute name.

        ``cache`` and each ``capabilities`` entry are shallow-copied, so
        adding, removing, or replacing keys in the returned dict does not
        alter this frozen config. Values nested deeper than that are still
        shared with the config. Non-dict values are passed through unchanged.

        Returns:
            A new dict with the keys ``service_url``, ``timeout_sec``,
            ``default_model``, ``default_scene``, ``cache`` and
            ``capabilities``.
        """
        cache = self.cache
        if isinstance(cache, dict):
            cache = dict(cache)

        capabilities = self.capabilities
        if isinstance(capabilities, dict):
            capabilities = {
                name: dict(spec) if isinstance(spec, dict) else spec
                for name, spec in capabilities.items()
            }

        return {
            "service_url": self.endpoint,
            "timeout_sec": self.timeout_sec,
            "default_model": self.default_model,
            "default_scene": self.default_scene,
            "cache": cache,
            "capabilities": capabilities,
        }


@dataclass(frozen=True)
class EmbeddingServiceConfig:
    """Embedding service configuration.

    Stores the selected provider, backend and image backend names together
    with the mappings they are looked up in.
    """

    provider: str
    providers: Dict[str, Any]
    backend: str
    backends: Dict[str, Dict[str, Any]]
    image_backend: str
    image_backends: Dict[str, Dict[str, Any]]

    def get_provider_config(self) -> Dict[str, Any]:
        """Return a copy of ``providers[provider]``, or ``{}`` if missing/falsy."""
        selected = self.providers.get(self.provider)
        return dict(selected) if selected else {}

    def get_backend_config(self) -> Dict[str, Any]:
        """Return a copy of ``backends[backend]``, or ``{}`` if missing/falsy."""
        selected = self.backends.get(self.backend)
        return dict(selected) if selected else {}

    def get_image_backend_config(self) -> Dict[str, Any]:
        """Return a copy of ``image_backends[image_backend]``, or ``{}`` if missing/falsy."""
        selected = self.image_backends.get(self.image_backend)
        return dict(selected) if selected else {}


@dataclass(frozen=True)
class RerankServiceConfig:
    """Reranker service configuration.

    Stores the selected provider and backend names, the mappings they are
    looked up in, and a free-form ``request`` dict.
    """

    provider: str
    providers: Dict[str, Any]
    backend: str
    backends: Dict[str, Dict[str, Any]]
    request: Dict[str, Any]

    def get_provider_config(self) -> Dict[str, Any]:
        """Return a copy of ``providers[provider]``, or ``{}`` if missing/falsy."""
        selected = self.providers.get(self.provider)
        return dict(selected) if selected else {}

    def get_backend_config(self) -> Dict[str, Any]:
        """Return a copy of ``backends[backend]``, or ``{}`` if missing/falsy."""
        selected = self.backends.get(self.backend)
        return dict(selected) if selected else {}


@dataclass(frozen=True)
class ServicesConfig:
    """All service-level configuration.

    Groups the translation, embedding and rerank service configurations;
    all three are required.
    """

    translation: TranslationServiceConfig
    embedding: EmbeddingServiceConfig
    rerank: RerankServiceConfig


@dataclass(frozen=True)
class TenantCatalogConfig:
    """Tenant catalog configuration.

    Holds a ``default`` mapping and a per-tenant mapping of mappings.
    """

    default: Dict[str, Any]
    tenants: Dict[str, Dict[str, Any]]

    def get_raw(self) -> Dict[str, Any]:
        """Return the catalog as a plain dict with shallow-copied contents.

        The result has the keys ``"default"`` and ``"tenants"``. Tenant keys
        are converted with ``str()`` and each tenant mapping is copied one
        level deep.
        """
        default_copy = dict(self.default)
        tenants_copy = {}
        for tenant_key, tenant_cfg in self.tenants.items():
            tenants_copy[str(tenant_key)] = dict(tenant_cfg)
        return {"default": default_copy, "tenants": tenants_copy}


@dataclass(frozen=True)
class ElasticsearchSettings:
    """Elasticsearch host and optional credentials."""

    # Default includes the scheme and port.
    host: str = "http://localhost:9200"
    username: Optional[str] = None
    password: Optional[str] = None


@dataclass(frozen=True)
class RedisSettings:
    """Redis connection and cache-related settings; every field has a default."""

    host: str = "localhost"
    # NOTE(review): Redis's conventional default port is 6379; confirm that
    # 6479 is intentional for this deployment and not a typo.
    port: int = 6479
    snapshot_db: int = 0
    password: Optional[str] = None
    # NOTE(review): timeout units are not stated here (presumably seconds) — confirm.
    socket_timeout: int = 1
    socket_connect_timeout: int = 1
    retry_on_timeout: bool = False
    cache_expire_days: int = 720
    embedding_cache_prefix: str = "embedding"
    anchor_cache_prefix: str = "product_anchors"
    anchor_cache_expire_days: int = 30


@dataclass(frozen=True)
class DatabaseSettings:
    """Database connection settings.

    Only ``port`` has a concrete default (3306); every other field defaults
    to ``None``.
    """

    host: Optional[str] = None
    port: int = 3306
    database: Optional[str] = None
    username: Optional[str] = None
    password: Optional[str] = None


@dataclass(frozen=True)
class SecretsConfig:
    """Optional API credentials; both default to ``None``."""

    dashscope_api_key: Optional[str] = None
    deepl_auth_key: Optional[str] = None


@dataclass(frozen=True)
class InfrastructureConfig:
    """Groups the Elasticsearch, Redis, database and secrets settings.

    All four members are required; no defaults are provided at this level.
    """

    elasticsearch: ElasticsearchSettings
    redis: RedisSettings
    database: DatabaseSettings
    secrets: SecretsConfig


@dataclass(frozen=True)
class ProductEnrichConfig:
    """Configuration for LLM-based product content understanding (enrich-content)."""

    # NOTE(review): presumably the size of a worker pool — confirm against the
    # code that consumes it.
    max_workers: int = 40


@dataclass(frozen=True)
class RuntimeConfig:
    """Runtime environment plus host/port settings for each service.

    Every field has a default; all host defaults are ``"0.0.0.0"``.
    """

    environment: str = "prod"
    index_namespace: str = ""
    api_host: str = "0.0.0.0"
    api_port: int = 6002
    indexer_host: str = "0.0.0.0"
    indexer_port: int = 6004
    embedding_host: str = "0.0.0.0"
    # embedding_port and embedding_text_port share the same default (6005);
    # the image port has its own default.
    embedding_port: int = 6005
    embedding_text_port: int = 6005
    embedding_image_port: int = 6008
    translator_host: str = "0.0.0.0"
    translator_port: int = 6006
    reranker_host: str = "0.0.0.0"
    reranker_port: int = 6007


@dataclass(frozen=True)
class AssetsConfig:
    """Filesystem assets referenced by the configuration."""

    # Required; stored as a pathlib.Path.
    query_rewrite_dictionary_path: Path


@dataclass(frozen=True)
class ConfigMetadata:
    """Metadata recorded alongside a loaded configuration.

    ``loaded_files`` and ``config_hash`` are required; ``deprecated_keys``
    defaults to an empty tuple.
    """

    loaded_files: Tuple[str, ...]
    # NOTE(review): the hash algorithm and its input are not defined in this module.
    config_hash: str
    deprecated_keys: Tuple[str, ...] = field(default_factory=tuple)


@dataclass(frozen=True)
class AppConfig:
    """Root application configuration.

    Aggregates every configuration section; all members are required.
    """

    runtime: RuntimeConfig
    infrastructure: InfrastructureConfig
    product_enrich: ProductEnrichConfig
    search: SearchConfig
    services: ServicesConfig
    tenants: TenantCatalogConfig
    assets: AssetsConfig
    metadata: ConfigMetadata

    def sanitized_dict(self) -> Dict[str, Any]:
        """Return the config as a nested dict with known secrets masked.

        The dict is produced by ``dataclasses.asdict``. The Elasticsearch,
        database and Redis passwords and the two API keys under
        ``infrastructure.secrets`` are passed through ``_mask_secret``; no
        other value is altered.
        """
        snapshot = asdict(self)
        infra = snapshot["infrastructure"]
        # (section, key) pairs under "infrastructure" that hold secrets.
        masked_paths = (
            ("elasticsearch", "password"),
            ("database", "password"),
            ("redis", "password"),
            ("secrets", "dashscope_api_key"),
            ("secrets", "deepl_auth_key"),
        )
        for section, key in masked_paths:
            entry = infra[section]
            entry[key] = _mask_secret(entry.get(key))
        return snapshot


def _mask_secret(value: Optional[str]) -> Optional[str]:
    if not value:
        return value
    return "***"