compare_index_mappings.py
6.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python3
"""
对比不同租户索引的 mapping 结构
"""
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, Optional
sys.path.insert(0, str(Path(__file__).parent.parent))
from utils.es_client import get_es_client_from_env
def get_field_type(mapping_dict: Dict, field_path: str) -> Optional[Dict[str, Any]]:
    """Look up the mapping definition of a field by its dotted path.

    Each path segment is first looked up directly in the current dict. When it
    is not found there, the ``properties`` (object / nested sub-fields) and
    ``fields`` (multi-fields) containers of the current dict are tried, so a
    path such as ``category.name`` or ``title.keyword`` resolves against an
    Elasticsearch ``properties`` dict without spelling out those intermediate
    keys. Paths that already spell them out keep working unchanged.

    Args:
        mapping_dict: Dict to search, e.g. the ``properties`` dict of a mapping.
        field_path: Dot-separated field path, e.g. ``"tags"``.

    Returns:
        The value found at the path, or ``None`` when a segment is missing or
        an intermediate value is not a dict.
    """
    current = mapping_dict
    for part in field_path.split('.'):
        if not isinstance(current, dict):
            return None
        found = current.get(part)
        if found is None:
            # Sub-fields are not direct children of a field definition; they
            # sit one level down under "properties" or "fields".
            for container_key in ('properties', 'fields'):
                container = current.get(container_key)
                if isinstance(container, dict) and container.get(part) is not None:
                    found = container[part]
                    break
        if found is None:
            return None
        current = found
    return current
def compare_mappings(mapping1: Dict[str, Any], mapping2: Dict[str, Any], index1_name: str, index2_name: str):
    """Print a field-by-field comparison of two index mappings.

    Only the entries directly under ``mappings.properties`` are compared, and
    two fields count as identical when their ``type`` values are equal. After
    the diff, the ``tags`` field of each index is reported separately.

    Args:
        mapping1: Mapping body of the first index (the dict holding ``mappings``).
        mapping2: Mapping body of the second index.
        index1_name: Label used for the first index in the report.
        index2_name: Label used for the second index in the report.
    """
    banner = '=' * 80
    preview_limit = 20  # at most this many identical fields are listed

    left_props = mapping1.get('mappings', {}).get('properties', {})
    right_props = mapping2.get('mappings', {}).get('properties', {})
    field_names = set(left_props.keys()).union(right_props.keys())

    print(f"\n{banner}")
    print("对比索引映射结构")
    print(banner)
    print(f"索引1: {index1_name}")
    print(f"索引2: {index2_name}")
    print(f"{banner}\n")

    # Classify every field as matching or mismatched.
    matching = []
    mismatched = []
    for name in sorted(field_names):
        left = left_props.get(name)
        right = right_props.get(name)
        if left is None:
            mismatched.append((name, f"只在 {index2_name} 中存在", right))
        elif right is None:
            mismatched.append((name, f"只在 {index1_name} 中存在", left))
        else:
            left_type = left.get('type')
            right_type = right.get('type')
            if left_type != right_type:
                reason = f"类型不同: {index1_name}={left_type}, {index2_name}={right_type}"
                mismatched.append((name, reason, (left, right)))
            else:
                matching.append((name, left_type))

    # Report identical fields, truncated to the preview limit.
    print(f"✓ 相同字段 ({len(matching)} 个):")
    for name, field_type in matching[:preview_limit]:
        print(f" - {name}: {field_type}")
    hidden = len(matching) - preview_limit
    if hidden > 0:
        print(f" ... 还有 {hidden} 个相同字段")

    # Report differing fields.
    if not mismatched:
        print("\n✓ 所有字段类型一致!")
    else:
        print(f"\n✗ 不同字段 ({len(mismatched)} 个):")
        for name, reason, details in mismatched:
            print(f"\n {name}:")
            print(f" {reason}")
            if isinstance(details, tuple):
                # A tuple carries both definitions of a type mismatch.
                left_json = json.dumps(details[0], indent=4, ensure_ascii=False)
                right_json = json.dumps(details[1], indent=4, ensure_ascii=False)
                print(f" {index1_name}: {left_json}")
                print(f" {index2_name}: {right_json}")
            else:
                only_json = json.dumps(details, indent=4, ensure_ascii=False)
                print(f" 详情: {only_json}")

    # Dedicated report for the tags field.
    print(f"\n{banner}")
    print("特别检查: tags 字段")
    print(banner)

    tag_reports = [
        (index1_name, get_field_type(left_props, 'tags')),
        (index2_name, get_field_type(right_props, 'tags')),
    ]
    for index_name, tags in tag_reports:
        if not tags:
            print(f"\n{index_name}.tags: 不存在")
            continue
        print(f"\n{index_name}.tags:")
        print(f" 类型: {tags.get('type')}")
        print(f" 完整定义: {json.dumps(tags, indent=2, ensure_ascii=False)}")
def main():
    """Run the command-line tool and return a process exit code.

    With ``--list``, or when only one index name is given, the first
    positional argument is used as a prefix and the matching index names are
    listed. Otherwise the mappings of the two named indices are fetched and
    compared.

    Returns:
        0 on success; 1 when the connection fails, no index matches the
        prefix, an index does not exist, or a mapping cannot be fetched.
    """
    import argparse

    parser = argparse.ArgumentParser(description='对比 Elasticsearch 索引的 mapping 结构')
    parser.add_argument('index1', help='第一个索引名称 (例如: search_products_tenant_171)')
    parser.add_argument('index2', nargs='?', help='第二个索引名称 (例如: search_products_tenant_162)')
    parser.add_argument('--list', action='store_true', help='列出所有以 index1 为前缀的索引')
    args = parser.parse_args()

    # Connect to Elasticsearch.
    try:
        es_client = get_es_client_from_env()
        if not es_client.ping():
            print("✗ 无法连接到 Elasticsearch")
            return 1
        print("✓ Elasticsearch 连接成功\n")
    except Exception as e:
        print(f"✗ 连接 Elasticsearch 失败: {e}")
        return 1

    # Listing mode: --list was given, or there is no second index to compare.
    if args.list or not args.index2:
        try:
            # The cat API returns one entry per index.
            catalog = es_client.client.cat.indices(format='json')
            prefix_matches = [
                entry['index'] for entry in catalog
                if entry['index'].startswith(args.index1)
            ]
            if not prefix_matches:
                print(f"未找到以 '{args.index1}' 开头的索引")
                return 1
            print(f"找到 {len(prefix_matches)} 个匹配的索引:")
            for index_name in sorted(prefix_matches):
                print(f" - {index_name}")
            return 0
        except Exception as e:
            print(f"✗ 列出索引失败: {e}")
            return 1

    # Comparison mode.
    index1, index2 = args.index1, args.index2
    targets = (index1, index2)

    print("正在获取索引映射...")
    print(f" 索引1: {index1}")
    print(f" 索引2: {index2}\n")

    # Both indices must exist before any mapping is requested.
    for index_name in targets:
        if not es_client.index_exists(index_name):
            print(f"✗ 索引 '{index_name}' 不存在")
            return 1

    # Fetch both mappings first, then validate them in order.
    fetched = [es_client.get_mapping(index_name) for index_name in targets]
    for index_name, mapping in zip(targets, fetched):
        if not mapping or index_name not in mapping:
            print(f"✗ 无法获取索引 '{index_name}' 的映射")
            return 1

    compare_mappings(fetched[0][index1], fetched[1][index2], index1, index2)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())