""" Elasticsearch mapping loader. Loads Elasticsearch index mapping from JSON file. """ from typing import Dict, Any import json import logging from pathlib import Path logger = logging.getLogger(__name__) # Default index name DEFAULT_INDEX_NAME = "search_products" # Default mapping file path DEFAULT_MAPPING_FILE = Path(__file__).parent.parent / "mappings" / "search_products.json" def load_mapping(mapping_file: str = None) -> Dict[str, Any]: """ Load Elasticsearch mapping from JSON file. Args: mapping_file: Path to mapping JSON file. If None, uses default. Returns: Dictionary containing index configuration (settings + mappings) Raises: FileNotFoundError: If mapping file doesn't exist json.JSONDecodeError: If mapping file is invalid JSON """ if mapping_file is None: mapping_file = str(DEFAULT_MAPPING_FILE) mapping_path = Path(mapping_file) if not mapping_path.exists(): raise FileNotFoundError(f"Mapping file not found: {mapping_path}") with open(mapping_path, 'r', encoding='utf-8') as f: mapping = json.load(f) logger.info(f"Loaded mapping from {mapping_path}") return mapping def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, Any] = None) -> bool: """ Create Elasticsearch index if it doesn't exist. Args: es_client: ESClient instance index_name: Name of the index to create mapping: Index mapping configuration. If None, loads from default file. Returns: True if index was created, False if it already exists """ if es_client.index_exists(index_name): logger.info(f"Index '{index_name}' already exists") return False if mapping is None: mapping = load_mapping() if es_client.create_index(index_name, mapping): logger.info(f"Index '{index_name}' created successfully") return True else: logger.error(f"Failed to create index '{index_name}'") return False def delete_index_if_exists(es_client, index_name: str) -> bool: """ Delete Elasticsearch index if it exists. Args: es_client: ESClient instance index_name: Name of the index to delete Returns: True if index was deleted, False if it didn't exist """ if not es_client.index_exists(index_name): logger.warning(f"Index '{index_name}' does not exist") return False if es_client.delete_index(index_name): logger.info(f"Index '{index_name}' deleted successfully") return True else: logger.error(f"Failed to delete index '{index_name}'") return False def update_mapping(es_client, index_name: str, new_fields: Dict[str, Any]) -> bool: """ Update mapping for existing index (only adding new fields). Args: es_client: ESClient instance index_name: Name of the index new_fields: New field mappings to add Returns: True if successful """ if not es_client.index_exists(index_name): logger.error(f"Index '{index_name}' does not exist") return False mapping = {"properties": new_fields} if es_client.update_mapping(index_name, mapping): logger.info(f"Mapping updated for index '{index_name}'") return True else: logger.error(f"Failed to update mapping for index '{index_name}'") return False