mapping_generator.py
4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
Elasticsearch mapping loader.
Loads Elasticsearch index mapping from JSON file.
"""
from typing import Dict, Any
import json
import logging
from pathlib import Path
import os
from config.loader import get_app_config
logger = logging.getLogger(__name__)
# Default index name (deprecated, use get_tenant_index_name instead)
DEFAULT_INDEX_NAME = "search_products"
# Default mapping file path
DEFAULT_MAPPING_FILE = Path(__file__).parent.parent / "mappings" / "search_products.json"
def get_tenant_index_name(tenant_id: str) -> str:
"""
Generate index name for a tenant.
索引命名规则统一为:
{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}
其中 ES_INDEX_NAMESPACE 由 config.env_config.ES_INDEX_NAMESPACE 控制,
用于区分 prod/uat/test 等不同运行环境。
"""
# Temporary override hooks (non-official, for ops/debug):
# - ES_INDEX_OVERRIDE_TENANT_<tenant_id>: absolute index name (without namespace auto-prefix)
# - ES_INDEX_OVERRIDE: absolute index name OR format string supporting "{tenant_id}"
#
# Examples:
# export ES_INDEX_OVERRIDE_TENANT_163="search_products_tenant_163_backup_20260415_1438"
# export ES_INDEX_OVERRIDE="search_products_tenant_{tenant_id}_backup_20260415_1438"
per_tenant_key = f"ES_INDEX_OVERRIDE_TENANT_{tenant_id}"
if (v := os.environ.get(per_tenant_key)):
return str(v)
if (v := os.environ.get("ES_INDEX_OVERRIDE")):
try:
return str(v).format(tenant_id=tenant_id)
except Exception:
return str(v)
prefix = get_app_config().runtime.index_namespace or ""
return f"{prefix}search_products_tenant_{tenant_id}"
def load_mapping(mapping_file: str = None) -> Dict[str, Any]:
"""
Load Elasticsearch mapping from JSON file.
Args:
mapping_file: Path to mapping JSON file. If None, uses default.
Returns:
Dictionary containing index configuration (settings + mappings)
Raises:
FileNotFoundError: If mapping file doesn't exist
json.JSONDecodeError: If mapping file is invalid JSON
"""
if mapping_file is None:
mapping_file = str(DEFAULT_MAPPING_FILE)
mapping_path = Path(mapping_file)
if not mapping_path.exists():
raise FileNotFoundError(f"Mapping file not found: {mapping_path}")
with open(mapping_path, 'r', encoding='utf-8') as f:
mapping = json.load(f)
logger.info(f"Loaded mapping from {mapping_path}")
return mapping
def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, Any] = None) -> bool:
"""
Create Elasticsearch index if it doesn't exist.
Args:
es_client: ESClient instance
index_name: Name of the index to create
mapping: Index mapping configuration. If None, loads from default file.
Returns:
True if index was created, False if it already exists
"""
if es_client.index_exists(index_name):
logger.info(f"Index '{index_name}' already exists")
return False
if mapping is None:
mapping = load_mapping()
if es_client.create_index(index_name, mapping):
logger.info(f"Index '{index_name}' created successfully")
return True
else:
logger.error(f"Failed to create index '{index_name}'")
return False
def delete_index_if_exists(es_client, index_name: str) -> bool:
"""
Delete Elasticsearch index if it exists.
Args:
es_client: ESClient instance
index_name: Name of the index to delete
Returns:
True if index was deleted, False if it didn't exist
"""
if not es_client.index_exists(index_name):
logger.warning(f"Index '{index_name}' does not exist")
return False
if es_client.delete_index(index_name):
logger.info(f"Index '{index_name}' deleted successfully")
return True
else:
logger.error(f"Failed to delete index '{index_name}'")
return False
def update_mapping(es_client, index_name: str, new_fields: Dict[str, Any]) -> bool:
"""
Update mapping for existing index (only adding new fields).
Args:
es_client: ESClient instance
index_name: Name of the index
new_fields: New field mappings to add
Returns:
True if successful
"""
if not es_client.index_exists(index_name):
logger.error(f"Index '{index_name}' does not exist")
return False
mapping = {"properties": new_fields}
if es_client.update_mapping(index_name, mapping):
logger.info(f"Mapping updated for index '{index_name}'")
return True
else:
logger.error(f"Failed to update mapping for index '{index_name}'")
return False