mapping_generator.py
3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
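Elasticsearch mapping loader and index helpers.
Loads the Elasticsearch index mapping from a JSON file and provides helpers to
create an index, delete an index, and add new fields to an existing mapping.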
"""
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Index name used when the caller does not specify one.
DEFAULT_INDEX_NAME = "search_products"

# Mapping file used when the caller does not specify one: it lives in the
# "mappings" directory located two levels above this file.
DEFAULT_MAPPING_FILE = Path(__file__).parent.parent.joinpath(
    "mappings", "search_products.json"
)
def load_mapping(mapping_file: Optional[str] = None) -> Dict[str, Any]:
    """
    Load Elasticsearch mapping from JSON file.

    Args:
        mapping_file: Path to mapping JSON file. If None, uses
            DEFAULT_MAPPING_FILE.

    Returns:
        The parsed JSON content of the mapping file (expected to be the
        index configuration: settings + mappings).

    Raises:
        FileNotFoundError: If mapping file doesn't exist
        json.JSONDecodeError: If mapping file is invalid JSON
    """
    mapping_path = (
        DEFAULT_MAPPING_FILE if mapping_file is None else Path(mapping_file)
    )

    # Open the file directly rather than checking exists() first: a separate
    # existence check can race with the file being removed, and would then
    # surface a differently worded error. NotADirectoryError (a path component
    # is a regular file) is folded in because exists() reported such paths as
    # missing too.
    try:
        handle = open(mapping_path, "r", encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError) as err:
        raise FileNotFoundError(f"Mapping file not found: {mapping_path}") from err

    with handle:
        mapping = json.load(handle)

    logger.info(f"Loaded mapping from {mapping_path}")
    return mapping
def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, Any] = None) -> bool:
    """
    Ensure an Elasticsearch index exists, creating it when it is missing.

    Args:
        es_client: ESClient instance
        index_name: Name of the index to create
        mapping: Index configuration handed to the client. When None, the
            result of load_mapping() (the default mapping file) is used.

    Returns:
        True only if this call created the index. False if the index already
        existed, or if es_client.create_index reported failure (an error is
        logged in that case).
    """
    already_present = es_client.index_exists(index_name)
    if already_present:
        logger.info(f"Index '{index_name}' already exists")
        return False

    # The default mapping is only loaded once we know the index is missing.
    index_config = load_mapping() if mapping is None else mapping

    if not es_client.create_index(index_name, index_config):
        logger.error(f"Failed to create index '{index_name}'")
        return False

    logger.info(f"Index '{index_name}' created successfully")
    return True
def delete_index_if_exists(es_client, index_name: str) -> bool:
    """
    Remove an Elasticsearch index when it is present.

    Args:
        es_client: ESClient instance
        index_name: Name of the index to delete

    Returns:
        True only if this call deleted the index. False if the index did not
        exist (a warning is logged), or if es_client.delete_index reported
        failure (an error is logged).
    """
    index_present = es_client.index_exists(index_name)
    if not index_present:
        logger.warning(f"Index '{index_name}' does not exist")
        return False

    deleted = es_client.delete_index(index_name)
    if not deleted:
        logger.error(f"Failed to delete index '{index_name}'")
        return False

    logger.info(f"Index '{index_name}' deleted successfully")
    return True
def update_mapping(es_client, index_name: str, new_fields: Dict[str, Any]) -> bool:
    """
    Add new field mappings to an existing index.

    Args:
        es_client: ESClient instance
        index_name: Name of the index
        new_fields: New field mappings to add; sent to the client wrapped
            under a "properties" key.

    Returns:
        True if es_client.update_mapping reported success. False if the index
        does not exist or the update failed (an error is logged in both
        cases).
    """
    if not es_client.index_exists(index_name):
        logger.error(f"Index '{index_name}' does not exist")
        return False

    updated = es_client.update_mapping(index_name, {"properties": new_fields})
    if not updated:
        logger.error(f"Failed to update mapping for index '{index_name}'")
        return False

    logger.info(f"Mapping updated for index '{index_name}'")
    return True