# compare_index_mappings.py 6.09 KB
#!/usr/bin/env python3
"""
对比不同租户索引的 mapping 结构
"""

import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, Optional

sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.es_client import get_es_client_from_env


def get_field_type(mapping_dict: Dict, field_path: str) -> Optional[Any]:
    """Look up the mapping entry addressed by a dotted field path.

    Each path segment is first looked up as a direct key of the current
    dict. When it is not a direct key, the segment is looked up inside the
    current dict's ``properties`` container (sub-fields of object/nested
    fields) and then its ``fields`` container (multi-fields), so that
    ``"specs.color"`` resolves against
    ``{"specs": {"properties": {"color": {...}}}}``.

    Args:
        mapping_dict: The dict to start from, e.g. the ``properties`` dict
            of an index mapping.
        field_path: Dot-separated path such as ``"tags"`` or
            ``"specs.color"``.

    Returns:
        The value found at the path (normally a field definition dict), or
        ``None`` when a segment is missing or a non-dict value is reached
        before the path is exhausted.
    """
    node = mapping_dict

    for part in field_path.split('.'):
        if not isinstance(node, dict):
            return None

        child = node.get(part)
        if child is None:
            # Direct key missing: fall back to the containers in which
            # Elasticsearch nests sub-field definitions.
            for container_key in ('properties', 'fields'):
                container = node.get(container_key)
                if isinstance(container, dict) and container.get(part) is not None:
                    child = container[part]
                    break

        if child is None:
            return None
        node = child

    return node


def _collect_field_diffs(
    props1: Dict[str, Any],
    props2: Dict[str, Any],
    index1_name: str,
    index2_name: str,
    prefix: str,
    differences: list,
    same_fields: list,
) -> None:
    """Compare two ``properties`` dicts and append results to the given lists.

    Fields are visited in sorted name order. When a field exists on both
    sides with the same ``type``, it is recorded as identical and, if either
    side has a ``properties`` dict, its sub-fields are compared recursively
    under the dotted path ``<prefix><name>.``.
    """
    for name in sorted(set(props1) | set(props2)):
        path = f"{prefix}{name}"
        field1 = props1.get(name)
        field2 = props2.get(name)

        if field1 is None:
            differences.append((path, f"只在 {index2_name} 中存在", field2))
            continue
        if field2 is None:
            differences.append((path, f"只在 {index1_name} 中存在", field1))
            continue

        type1 = field1.get('type')
        type2 = field2.get('type')
        if type1 != type2:
            differences.append(
                (path, f"类型不同: {index1_name}={type1}, {index2_name}={type2}", (field1, field2))
            )
            continue

        same_fields.append((path, type1))

        # Object fields usually carry no explicit "type", so an equal type
        # alone says nothing about their sub-fields: descend and compare.
        sub1 = field1.get('properties')
        sub2 = field2.get('properties')
        if isinstance(sub1, dict) or isinstance(sub2, dict):
            _collect_field_diffs(
                sub1 if isinstance(sub1, dict) else {},
                sub2 if isinstance(sub2, dict) else {},
                index1_name,
                index2_name,
                f"{path}.",
                differences,
                same_fields,
            )


def compare_mappings(mapping1: Dict[str, Any], mapping2: Dict[str, Any], index1_name: str, index2_name: str):
    """Print a field-by-field comparison of two index mappings.

    Fields are compared by their ``type`` value. Sub-fields declared under
    ``properties`` are compared too and reported with dotted paths such as
    ``specs.color``. The report lists up to 20 identical fields, every
    difference with its definition(s), and finally the ``tags`` field
    definition of each index.

    Args:
        mapping1: Mapping body of the first index; fields are read from
            ``mapping1['mappings']['properties']``.
        mapping2: Mapping body of the second index, same layout.
        index1_name: Label used for the first index in the report.
        index2_name: Label used for the second index in the report.

    Returns:
        None. The report is written to standard output.
    """
    # Treat a missing or null "mappings"/"properties" as "no fields".
    props1 = (mapping1.get('mappings') or {}).get('properties') or {}
    props2 = (mapping2.get('mappings') or {}).get('properties') or {}

    separator = '=' * 80

    print(f"\n{separator}")
    print("对比索引映射结构")
    print(separator)
    print(f"索引1: {index1_name}")
    print(f"索引2: {index2_name}")
    print(f"{separator}\n")

    differences = []
    same_fields = []
    _collect_field_diffs(props1, props2, index1_name, index2_name, "", differences, same_fields)

    # Identical fields: show only the first 20 to keep the report short.
    print(f"✓ 相同字段 ({len(same_fields)} 个):")
    for field, field_type in same_fields[:20]:
        print(f"  - {field}: {field_type}")
    if len(same_fields) > 20:
        print(f"  ... 还有 {len(same_fields) - 20} 个相同字段")

    # Differing fields, with the definitions involved.
    if differences:
        print(f"\n✗ 不同字段 ({len(differences)} 个):")
        for field, reason, details in differences:
            print(f"\n  {field}:")
            print(f"    {reason}")
            if isinstance(details, tuple):
                # Type mismatch: details holds both definitions.
                print(f"    {index1_name}: {json.dumps(details[0], indent=4, ensure_ascii=False)}")
                print(f"    {index2_name}: {json.dumps(details[1], indent=4, ensure_ascii=False)}")
            else:
                # Field present on one side only.
                print(f"    详情: {json.dumps(details, indent=4, ensure_ascii=False)}")
    else:
        print("\n✓ 所有字段类型一致!")

    # Dedicated section for the top-level "tags" field of each index.
    print(f"\n{separator}")
    print("特别检查: tags 字段")
    print(separator)

    for index_name, tags in ((index1_name, props1.get('tags')), (index2_name, props2.get('tags'))):
        if tags:
            print(f"\n{index_name}.tags:")
            print(f"  类型: {tags.get('type')}")
            print(f"  完整定义: {json.dumps(tags, indent=2, ensure_ascii=False)}")
        else:
            print(f"\n{index_name}.tags: 不存在")


def main():
    """Command-line entry point: list matching indices or compare two mappings.

    With ``--list``, or when the second index argument is omitted, prints
    every index whose name starts with ``index1``. Otherwise fetches the
    mappings of both indices and prints their comparison.

    Returns:
        0 on success; 1 when the connection fails, no index matches the
        prefix, an index does not exist, or a mapping cannot be fetched.
    """
    import argparse

    cli = argparse.ArgumentParser(description='对比 Elasticsearch 索引的 mapping 结构')
    cli.add_argument('index1', help='第一个索引名称 (例如: search_products_tenant_171)')
    cli.add_argument('index2', nargs='?', help='第二个索引名称 (例如: search_products_tenant_162)')
    cli.add_argument('--list', action='store_true', help='列出所有以 index1 为前缀的索引')
    options = cli.parse_args()

    # Connect to Elasticsearch and verify the cluster answers a ping.
    try:
        es_client = get_es_client_from_env()
        if es_client.ping():
            print("✓ Elasticsearch 连接成功\n")
        else:
            print("✗ 无法连接到 Elasticsearch")
            return 1
    except Exception as exc:
        print(f"✗ 连接 Elasticsearch 失败: {exc}")
        return 1

    # Listing mode: requested explicitly or implied by a missing second index.
    listing_requested = options.list or not options.index2
    if listing_requested:
        prefix = options.index1
        try:
            # The cat API returns one entry per index.
            catalog = es_client.client.cat.indices(format='json')
            hits = [entry['index'] for entry in catalog if entry['index'].startswith(prefix)]

            if not hits:
                print(f"未找到以 '{prefix}' 开头的索引")
                return 1

            print(f"找到 {len(hits)} 个匹配的索引:")
            for index_name in sorted(hits):
                print(f"  - {index_name}")
            return 0
        except Exception as exc:
            print(f"✗ 列出索引失败: {exc}")
            return 1

    # Comparison mode.
    first = options.index1
    second = options.index2
    targets = (first, second)

    print("正在获取索引映射...")
    print(f"  索引1: {first}")
    print(f"  索引2: {second}\n")

    # Both indices must exist before any mapping is requested.
    for index_name in targets:
        if not es_client.index_exists(index_name):
            print(f"✗ 索引 '{index_name}' 不存在")
            return 1

    # Fetch both mappings first, then validate them in the same order.
    fetched = [es_client.get_mapping(index_name) for index_name in targets]
    for index_name, mapping in zip(targets, fetched):
        if not mapping or index_name not in mapping:
            print(f"✗ 无法获取索引 '{index_name}' 的映射")
            return 1

    # Each response is keyed by index name; compare the per-index bodies.
    compare_mappings(fetched[0][first], fetched[1][second], first, second)

    return 0


# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)