mapping_generator.py 3.36 KB
"""
Elasticsearch mapping loader.

Loads Elasticsearch index mapping from JSON file.
"""

from typing import Dict, Any
import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

# Default index name
DEFAULT_INDEX_NAME = "search_products"

# Default mapping file path
DEFAULT_MAPPING_FILE = Path(__file__).parent.parent / "mappings" / "search_products.json"


def load_mapping(mapping_file: str = None) -> Dict[str, Any]:
        """
    Load Elasticsearch mapping from JSON file.

        Args:
        mapping_file: Path to mapping JSON file. If None, uses default.

        Returns:
        Dictionary containing index configuration (settings + mappings)

    Raises:
        FileNotFoundError: If mapping file doesn't exist
        json.JSONDecodeError: If mapping file is invalid JSON
    """
    if mapping_file is None:
        mapping_file = str(DEFAULT_MAPPING_FILE)

    mapping_path = Path(mapping_file)
    if not mapping_path.exists():
        raise FileNotFoundError(f"Mapping file not found: {mapping_path}")

    with open(mapping_path, 'r', encoding='utf-8') as f:
        mapping = json.load(f)

    logger.info(f"Loaded mapping from {mapping_path}")
    return mapping


def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, Any] = None) -> bool:
    """
    Create Elasticsearch index if it doesn't exist.

    Args:
        es_client: ESClient instance
        index_name: Name of the index to create
        mapping: Index mapping configuration. If None, loads from default file.

    Returns:
        True if index was created, False if it already exists
    """
    if es_client.index_exists(index_name):
        logger.info(f"Index '{index_name}' already exists")
        return False

    if mapping is None:
        mapping = load_mapping()

    if es_client.create_index(index_name, mapping):
    logger.info(f"Index '{index_name}' created successfully")
    return True
    else:
        logger.error(f"Failed to create index '{index_name}'")
        return False


def delete_index_if_exists(es_client, index_name: str) -> bool:
    """
    Delete Elasticsearch index if it exists.

    Args:
        es_client: ESClient instance
        index_name: Name of the index to delete

    Returns:
        True if index was deleted, False if it didn't exist
    """
    if not es_client.index_exists(index_name):
        logger.warning(f"Index '{index_name}' does not exist")
        return False

    if es_client.delete_index(index_name):
    logger.info(f"Index '{index_name}' deleted successfully")
    return True
    else:
        logger.error(f"Failed to delete index '{index_name}'")
        return False


def update_mapping(es_client, index_name: str, new_fields: Dict[str, Any]) -> bool:
    """
    Update mapping for existing index (only adding new fields).

    Args:
        es_client: ESClient instance
        index_name: Name of the index
        new_fields: New field mappings to add

    Returns:
        True if successful
    """
    if not es_client.index_exists(index_name):
        logger.error(f"Index '{index_name}' does not exist")
        return False

    mapping = {"properties": new_fields}
    if es_client.update_mapping(index_name, mapping):
    logger.info(f"Mapping updated for index '{index_name}'")
    return True
    else:
        logger.error(f"Failed to update mapping for index '{index_name}'")
        return False