#!/usr/bin/env python3
"""
检查ES索引中的实际数据,看分面字段是否有值
"""

import sys
import os
import argparse
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.es_client import ESClient


def check_es_facet_fields(es_client, tenant_id: str, size: int = 5):
    """检查ES中的分面相关字段"""
    print("\n" + "="*60)
    print("检查ES索引中的分面字段数据")
    print("="*60)
    
    query = {
        "query": {
            "term": {
                "tenant_id": tenant_id
            }
        },
        "size": size,
        "_source": [
            "spu_id",
            "title_zh",
            "category1_name",
            "category2_name",
            "category3_name",
            "category_name",
            "category_path_zh",
            "specifications",
            "option1_name",
            "option2_name",
            "option3_name"
        ]
    }
    
    try:
        response = es_client.client.search(index="search_products", body=query)
        hits = response.get('hits', {}).get('hits', [])
        total = response.get('hits', {}).get('total', {}).get('value', 0)
        
        print(f"\n总文档数: {total}")
        print(f"检查前 {len(hits)} 个文档:\n")
        
        for i, hit in enumerate(hits, 1):
            source = hit.get('_source', {})
            print(f"文档 {i}:")
            print(f"  spu_id: {source.get('spu_id')}")
            print(f"  title_zh: {source.get('title_zh', '')[:50]}")
            print(f"  category1_name: {source.get('category1_name')}")
            print(f"  category2_name: {source.get('category2_name')}")
            print(f"  category3_name: {source.get('category3_name')}")
            print(f"  category_name: {source.get('category_name')}")
            print(f"  category_path_zh: {source.get('category_path_zh')}")
            print(f"  option1_name: {source.get('option1_name')}")
            print(f"  option2_name: {source.get('option2_name')}")
            print(f"  option3_name: {source.get('option3_name')}")
            
            specs = source.get('specifications', [])
            if specs:
                print(f"  specifications 数量: {len(specs)}")
                # 显示前3个specifications
                for spec in specs[:3]:
                    print(f"    - name: {spec.get('name')}, value: {spec.get('value')}")
            else:
                print(f"  specifications: 空")
            print()
            
    except Exception as e:
        print(f"错误: {e}")
        import traceback
        traceback.print_exc()


def _nested_spec_agg(spec_name: str, bucket_size: int = 10) -> dict:
    """Build a nested aggregation counting ``specifications.value`` for one spec name."""
    return {
        "nested": {
            "path": "specifications"
        },
        "aggs": {
            "filter_by_name": {
                "filter": {
                    "term": {
                        "specifications.name": spec_name
                    }
                },
                "aggs": {
                    "value_counts": {
                        "terms": {
                            "field": "specifications.value",
                            "size": bucket_size
                        }
                    }
                }
            }
        }
    }


def _print_buckets(buckets) -> None:
    """Print ``key: doc_count`` for each terms bucket, or an empty marker if none."""
    if not buckets:
        print("  空(没有数据)")
        return
    for bucket in buckets:
        print(f"  {bucket['key']}: {bucket['doc_count']}")


def check_facet_aggregations(es_client, tenant_id: str,
                             index: str = "search_products",
                             spec_names=("color", "size", "material")):
    """Run facet aggregations for a tenant and print the resulting buckets.

    Aggregates ``category1_name`` with a terms aggregation. For each name in
    ``spec_names`` it aggregates ``specifications.value``, restricted to nested
    entries whose ``specifications.name`` equals that name. Each facet shows
    up to 10 buckets.

    Args:
        es_client: Wrapper whose ``.client`` attribute is the raw Elasticsearch
            client used for ``search``.
        tenant_id: Matched exactly with a ``term`` query on ``tenant_id``.
        index: Index to search; defaults to ``search_products``.
        spec_names: Specification names to facet on. The aggregation for each
            one is named ``<name>_facet``.

    Note: inside a ``nested`` aggregation, bucket ``doc_count`` counts matching
    nested specification entries, not parent products. A ``reverse_nested``
    sub-aggregation would be needed for per-product counts.

    Any exception is reported with a traceback instead of propagating.
    """
    print("\n" + "="*60)
    print("检查分面聚合查询结果")
    print("="*60)

    aggregations = {
        "category1_facet": {
            "terms": {
                "field": "category1_name",
                "size": 10
            }
        }
    }
    for name in spec_names:
        aggregations[f"{name}_facet"] = _nested_spec_agg(name)

    query = {
        "query": {
            "term": {
                "tenant_id": tenant_id
            }
        },
        # Only aggregations are needed; skip returning documents.
        "size": 0,
        "aggs": aggregations
    }

    try:
        response = es_client.client.search(index=index, body=query)
        aggs = response.get('aggregations', {})

        print("\n1. category1_name 分面:")
        _print_buckets(aggs.get('category1_facet', {}).get('buckets', []))

        for number, name in enumerate(spec_names, start=2):
            print(f"\n{number}. specifications.{name} 分面:")
            value_counts = (
                aggs.get(f'{name}_facet', {})
                .get('filter_by_name', {})
                .get('value_counts', {})
            )
            _print_buckets(value_counts.get('buckets', []))

    except Exception as e:
        # Diagnostic script: report and continue rather than abort the run.
        print(f"错误: {e}")
        import traceback
        traceback.print_exc()


def main():
    """Command-line entry point: connect to Elasticsearch, then run both facet checks.

    The host is taken from ``--es-host``, falling back to the ``ES_HOST``
    environment variable and finally ``http://localhost:9200``. Credentials
    come from ``ES_USERNAME`` / ``ES_PASSWORD`` and are passed to the client
    only when both are set.

    Returns:
        1 if creating the client or pinging the cluster fails, otherwise 0
        after both checks have run.
    """
    parser = argparse.ArgumentParser(description='检查ES索引中的分面字段数据')
    parser.add_argument('--tenant-id', required=True, help='Tenant ID')
    parser.add_argument('--es-host', help='Elasticsearch host (或使用环境变量 ES_HOST, 默认: http://localhost:9200)')
    parser.add_argument('--size', type=int, default=5, help='检查的文档数量 (默认: 5)')
    opts = parser.parse_args()

    host = opts.es_host or os.environ.get('ES_HOST', 'http://localhost:9200')
    user = os.environ.get('ES_USERNAME')
    secret = os.environ.get('ES_PASSWORD')

    print(f"连接Elasticsearch: {host}")
    print(f"Tenant ID: {opts.tenant_id}\n")

    # Authenticate only when both halves of the credential pair are present.
    client_kwargs = {'hosts': [host]}
    if user and secret:
        client_kwargs['username'] = user
        client_kwargs['password'] = secret

    try:
        es_client = ESClient(**client_kwargs)
        if not es_client.ping():
            print(f"✗ 无法连接到Elasticsearch: {host}")
            return 1
        print("✓ Elasticsearch连接成功\n")
    except Exception as e:
        print(f"✗ 连接Elasticsearch失败: {e}")
        return 1

    check_es_facet_fields(es_client, opts.tenant_id, opts.size)
    check_facet_aggregations(es_client, opts.tenant_id)

    rule = "=" * 60
    print("\n" + rule)
    print("检查完成")
    print(rule)

    return 0


if __name__ == "__main__":
    # Use main()'s return value (0 or 1) as the process exit status.
    raise SystemExit(main())