#!/usr/bin/env python3
"""
Compare the mapping structure of Elasticsearch indices of different tenants.

Usage:
    script.py INDEX1 INDEX2     compare the mappings of two indices
    script.py PREFIX            list every index whose name starts with PREFIX
    script.py PREFIX --list     same as above, even if a second index is given
"""

import os
import sys
import json
from pathlib import Path
from typing import Dict, Any, Optional

# Put an ancestor directory (parents[2] of this file, presumably the project
# root -- confirm against the repository layout) on sys.path so that the
# `utils` package can be imported when this file is run as a script.
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))

from utils.es_client import get_es_client_from_env


# Keys under which Elasticsearch nests child definitions of a field:
# `properties` holds object/nested sub-fields, `fields` holds multi-fields.
_CHILD_CONTAINERS = ('properties', 'fields')


def get_field_type(mapping_dict: Dict, field_path: str) -> Optional[Dict[str, Any]]:
    """Look up the mapping definition of a (possibly dotted) field path.

    Each path segment is first looked up as a literal key of the current
    dict.  If it is not present, the segment is looked up inside the current
    dict's ``properties`` and then ``fields`` containers, which is where
    Elasticsearch stores sub-fields and multi-fields.

    Args:
        mapping_dict: A ``properties`` dict (field name -> field definition).
        field_path: Field name, with ``.`` separating nested segments.

    Returns:
        The value found at the end of the path, or ``None`` if any segment
        is missing, maps to ``None``, or is reached through a non-dict value.
    """
    current = mapping_dict
    for part in field_path.split('.'):
        if not isinstance(current, dict):
            return None
        if part in current:
            found = current[part]
        else:
            found = None
            for container in _CHILD_CONTAINERS:
                children = current.get(container)
                if isinstance(children, dict) and part in children:
                    found = children[part]
                    break
        if found is None:
            return None
        current = found
    return current


def _effective_type(field_def: Dict[str, Any]) -> Optional[str]:
    """Return the field's type, treating an untyped field with sub-properties as ``object``.

    Elasticsearch does not require (and usually does not echo back) an
    explicit ``"type": "object"``, so comparing the raw ``type`` key would
    show such fields as ``None`` and could report a spurious difference
    between an explicit and an implicit object field.
    """
    declared = field_def.get('type')
    if declared is None and 'properties' in field_def:
        return 'object'
    return declared


def _print_tags_field(index_name: str, tags_def: Optional[Dict[str, Any]]) -> None:
    """Print the type and full definition of one index's ``tags`` field."""
    # An empty definition is reported as missing, same as an absent one.
    if tags_def:
        print(f"\n{index_name}.tags:")
        print(f" 类型: {_effective_type(tags_def)}")
        print(f" 完整定义: {json.dumps(tags_def, indent=2, ensure_ascii=False)}")
    else:
        print(f"\n{index_name}.tags: 不存在")


def compare_mappings(mapping1: Dict[str, Any], mapping2: Dict[str, Any],
                     index1_name: str, index2_name: str) -> None:
    """Print a comparison of the top-level field mappings of two indices.

    Only the top-level fields under ``mappings.properties`` are compared, and
    only by their type; other mapping parameters and nested sub-fields are
    not diffed.  The ``tags`` field is additionally printed in full for both
    indices.

    Args:
        mapping1: Mapping body of the first index (the dict that contains
            the ``mappings`` key).
        mapping2: Mapping body of the second index.
        index1_name: Display name of the first index.
        index2_name: Display name of the second index.
    """
    props1 = mapping1.get('mappings', {}).get('properties', {})
    props2 = mapping2.get('mappings', {}).get('properties', {})

    all_fields = set(props1) | set(props2)
    separator = '=' * 80

    print(f"\n{separator}")
    print("对比索引映射结构")
    print(separator)
    print(f"索引1: {index1_name}")
    print(f"索引2: {index2_name}")
    print(f"{separator}\n")

    # Each difference is (field, reason, details); details is a tuple of both
    # definitions for a type mismatch, or the single definition when the
    # field exists in only one index.
    differences = []
    same_fields = []

    for field in sorted(all_fields):
        field1 = props1.get(field)
        field2 = props2.get(field)

        if field1 is None:
            differences.append((field, f"只在 {index2_name} 中存在", field2))
            continue
        if field2 is None:
            differences.append((field, f"只在 {index1_name} 中存在", field1))
            continue

        type1 = _effective_type(field1)
        type2 = _effective_type(field2)

        if type1 != type2:
            reason = f"类型不同: {index1_name}={type1}, {index2_name}={type2}"
            differences.append((field, reason, (field1, field2)))
        else:
            same_fields.append((field, type1))

    # Fields with the same type; only the first 20 are listed.
    print(f"✓ 相同字段 ({len(same_fields)} 个):")
    for field, field_type in same_fields[:20]:
        print(f" - {field}: {field_type}")
    if len(same_fields) > 20:
        print(f" ... 还有 {len(same_fields) - 20} 个相同字段")

    # Fields that differ.
    if differences:
        print(f"\n✗ 不同字段 ({len(differences)} 个):")
        for field, reason, details in differences:
            print(f"\n {field}:")
            print(f" {reason}")
            if isinstance(details, tuple):
                print(f" {index1_name}: {json.dumps(details[0], indent=4, ensure_ascii=False)}")
                print(f" {index2_name}: {json.dumps(details[1], indent=4, ensure_ascii=False)}")
            else:
                print(f" 详情: {json.dumps(details, indent=4, ensure_ascii=False)}")
    else:
        print("\n✓ 所有字段类型一致!")

    # Dedicated check of the `tags` field.
    print(f"\n{separator}")
    print("特别检查: tags 字段")
    print(separator)

    _print_tags_field(index1_name, get_field_type(props1, 'tags'))
    _print_tags_field(index2_name, get_field_type(props2, 'tags'))


def main() -> int:
    """Parse the command line, then list matching indices or compare two mappings.

    Returns:
        Process exit code: 0 on success, 1 on any failure (connection error,
        no matching index, missing index, or unavailable mapping).
    """
    import argparse

    parser = argparse.ArgumentParser(description='对比 Elasticsearch 索引的 mapping 结构')
    parser.add_argument('index1', help='第一个索引名称 (例如: search_products_tenant_171)')
    parser.add_argument('index2', nargs='?', help='第二个索引名称 (例如: search_products_tenant_162)')
    parser.add_argument('--list', action='store_true', help='列出所有以 index1 为前缀的索引')

    args = parser.parse_args()

    # Connect to Elasticsearch.
    try:
        es_client = get_es_client_from_env()
        if not es_client.ping():
            print("✗ 无法连接到 Elasticsearch")
            return 1
        print("✓ Elasticsearch 连接成功\n")
    except Exception as e:
        print(f"✗ 连接 Elasticsearch 失败: {e}")
        return 1

    # With --list, or when no second index is given, index1 is treated as a
    # name prefix and the matching indices are listed instead of compared.
    if args.list or not args.index2:
        try:
            # Use the cat API to enumerate all indices.
            indices = es_client.client.cat.indices(format='json')
            matching_indices = [
                idx['index'] for idx in indices
                if idx['index'].startswith(args.index1)
            ]

            if matching_indices:
                print(f"找到 {len(matching_indices)} 个匹配的索引:")
                for idx in sorted(matching_indices):
                    print(f" - {idx}")
                return 0
            else:
                print(f"未找到以 '{args.index1}' 开头的索引")
                return 1
        except Exception as e:
            print(f"✗ 列出索引失败: {e}")
            return 1

    # Fetch the mappings of both indices.
    index1 = args.index1
    index2 = args.index2

    print("正在获取索引映射...")
    print(f" 索引1: {index1}")
    print(f" 索引2: {index2}\n")

    # Make sure both indices exist.
    if not es_client.index_exists(index1):
        print(f"✗ 索引 '{index1}' 不存在")
        return 1
    if not es_client.index_exists(index2):
        print(f"✗ 索引 '{index2}' 不存在")
        return 1

    # The mapping response is expected to be keyed by index name.
    mapping1 = es_client.get_mapping(index1)
    mapping2 = es_client.get_mapping(index2)

    if not mapping1 or index1 not in mapping1:
        print(f"✗ 无法获取索引 '{index1}' 的映射")
        return 1
    if not mapping2 or index2 not in mapping2:
        print(f"✗ 无法获取索引 '{index2}' 的映射")
        return 1

    # Compare the two mappings.
    compare_mappings(mapping1[index1], mapping2[index2], index1, index2)

    return 0


if __name__ == '__main__':
    sys.exit(main())