#!/usr/bin/env python3
"""Inspect real documents in the ES index and check whether facet fields hold values.

The script runs two read-only searches against the ``search_products`` index
for a single tenant:

1. a small sample of documents, printing the category / option /
   ``specifications`` fields used for faceting;
2. a ``size: 0`` aggregation request, printing the bucket counts for the
   ``category1_name`` facet and the nested ``specifications`` facets.
"""

import argparse
import os
import sys
import traceback
from pathlib import Path

# Put the directory above this script's directory on sys.path so the
# ``utils`` package can be imported when the file is executed directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.es_client import ESClient  # noqa: E402  (must follow the sys.path tweak)

INDEX_NAME = "search_products"
FACET_BUCKET_SIZE = 10

# Specification names that get their own nested facet, in report order.
SPEC_FACET_NAMES = ("color", "size", "material")

# Scalar fields printed verbatim for each sampled document, in report order.
_PLAIN_FIELDS = (
    "category1_name",
    "category2_name",
    "category3_name",
    "category_name",
    "category_path_zh",
    "option1_name",
    "option2_name",
    "option3_name",
)

_BANNER = "=" * 60


def _print_banner(title: str) -> None:
    """Print *title* framed by two separator lines, preceded by a blank line."""
    print("\n" + _BANNER)
    print(title)
    print(_BANNER)


def _total_hits(response) -> int:
    """Return the total hit count from a search response.

    ``hits.total`` is an object (``{"value": N, "relation": ...}``) in current
    Elasticsearch responses but a plain integer in older ones (or when
    ``rest_total_hits_as_int`` is set), so both shapes are accepted.
    """
    total = response.get('hits', {}).get('total')
    if isinstance(total, dict):
        return total.get('value', 0)
    return total or 0


def _print_document(position: int, source: dict) -> None:
    """Print the facet-related fields of one document's ``_source``."""
    print(f"文档 {position}:")
    print(f" spu_id: {source.get('spu_id')}")

    # The key may be present with a null value, in which case the default of
    # dict.get() is not used; normalise to '' before slicing.
    title = source.get('title_zh') or ''
    print(f" title_zh: {str(title)[:50]}")

    for field in _PLAIN_FIELDS:
        print(f" {field}: {source.get(field)}")

    specs = source.get('specifications') or []
    if isinstance(specs, dict):
        # A field holding a single object is returned as that object rather
        # than as a one-element list.
        specs = [specs]

    if specs:
        print(f" specifications 数量: {len(specs)}")
        # Show only the first three specifications.
        for spec in specs[:3]:
            print(f" - name: {spec.get('name')}, value: {spec.get('value')}")
    else:
        print(" specifications: 空")
    print()


def _print_buckets(buckets) -> None:
    """Print ``key: doc_count`` for every bucket, or an "empty" marker."""
    if not buckets:
        print(" 空(没有数据)")
        return
    for bucket in buckets:
        print(f" {bucket['key']}: {bucket['doc_count']}")


def _spec_value_facet(spec_name: str, size: int = FACET_BUCKET_SIZE) -> dict:
    """Build the nested aggregation counting values of one specification name.

    The aggregation descends into the ``specifications`` nested path, keeps
    only entries whose ``specifications.name`` equals *spec_name*, and runs a
    terms aggregation on ``specifications.value``.
    """
    return {
        "nested": {"path": "specifications"},
        "aggs": {
            "filter_by_name": {
                "filter": {"term": {"specifications.name": spec_name}},
                "aggs": {
                    "value_counts": {
                        "terms": {
                            "field": "specifications.value",
                            "size": size,
                        }
                    }
                },
            }
        },
    }


def check_es_facet_fields(es_client, tenant_id: str, size: int = 5) -> bool:
    """Print the facet-related fields of a sample of one tenant's documents.

    Args:
        es_client: Client wrapper whose ``client.search`` method is called.
        tenant_id: Value matched against the ``tenant_id`` field.
        size: Number of documents to fetch and print.

    Returns:
        True when the query ran and the documents were printed; False when
        an exception occurred (the error and traceback are printed).
    """
    _print_banner("检查ES索引中的分面字段数据")

    query = {
        "query": {"term": {"tenant_id": tenant_id}},
        "size": size,
        "_source": [
            "spu_id",
            "title_zh",
            "category1_name",
            "category2_name",
            "category3_name",
            "category_name",
            "category_path_zh",
            "specifications",
            "option1_name",
            "option2_name",
            "option3_name",
        ],
    }

    try:
        response = es_client.client.search(index=INDEX_NAME, body=query)
        hits = response.get('hits', {}).get('hits', [])

        print(f"\n总文档数: {_total_hits(response)}")
        print(f"检查前 {len(hits)} 个文档:\n")

        for position, hit in enumerate(hits, 1):
            _print_document(position, hit.get('_source') or {})
    except Exception as e:
        # Diagnostic script: report the failure and let the caller continue
        # with the remaining checks.
        print(f"错误: {e}")
        traceback.print_exc()
        return False
    return True


def check_facet_aggregations(es_client, tenant_id: str) -> bool:
    """Run the facet aggregations for one tenant and print the bucket counts.

    Aggregates ``category1_name`` directly and one nested facet for each name
    in ``SPEC_FACET_NAMES``.

    Args:
        es_client: Client wrapper whose ``client.search`` method is called.
        tenant_id: Value matched against the ``tenant_id`` field.

    Returns:
        True when the query ran and the results were printed; False when an
        exception occurred (the error and traceback are printed).
    """
    _print_banner("检查分面聚合查询结果")

    aggregations = {
        "category1_facet": {
            "terms": {"field": "category1_name", "size": FACET_BUCKET_SIZE}
        }
    }
    for spec_name in SPEC_FACET_NAMES:
        aggregations[f"{spec_name}_facet"] = _spec_value_facet(spec_name)

    query = {
        "query": {"term": {"tenant_id": tenant_id}},
        "size": 0,  # only the aggregations are needed, not the documents
        "aggs": aggregations,
    }

    try:
        response = es_client.client.search(index=INDEX_NAME, body=query)
        aggs = response.get('aggregations', {})

        print("\n1. category1_name 分面:")
        _print_buckets(aggs.get('category1_facet', {}).get('buckets', []))

        # Sections 2..N: one per nested specification facet.
        for number, spec_name in enumerate(SPEC_FACET_NAMES, 2):
            print(f"\n{number}. specifications.{spec_name} 分面:")
            buckets = (
                aggs.get(f"{spec_name}_facet", {})
                .get('filter_by_name', {})
                .get('value_counts', {})
                .get('buckets', [])
            )
            _print_buckets(buckets)
    except Exception as e:
        # Diagnostic script: report the failure and let the caller continue.
        print(f"错误: {e}")
        traceback.print_exc()
        return False
    return True


def main():
    """Parse arguments, connect to Elasticsearch and run both checks.

    The host comes from ``--es-host``, then the ``ES_HOST`` environment
    variable, then ``http://localhost:9200``. Credentials are read from
    ``ES_USERNAME`` / ``ES_PASSWORD`` and used only when both are set.

    Returns:
        Process exit code: 0 when the connection and both checks succeeded,
        1 when the connection failed or either check raised an error.
    """
    parser = argparse.ArgumentParser(description='检查ES索引中的分面字段数据')
    parser.add_argument('--tenant-id', required=True, help='Tenant ID')
    parser.add_argument('--es-host', help='Elasticsearch host (或使用环境变量 ES_HOST, 默认: http://localhost:9200)')
    parser.add_argument('--size', type=int, default=5, help='检查的文档数量 (默认: 5)')

    args = parser.parse_args()

    # Connect to ES.
    es_host = args.es_host or os.environ.get('ES_HOST', 'http://localhost:9200')
    es_username = os.environ.get('ES_USERNAME')
    es_password = os.environ.get('ES_PASSWORD')

    print(f"连接Elasticsearch: {es_host}")
    print(f"Tenant ID: {args.tenant_id}\n")

    try:
        if es_username and es_password:
            es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
        else:
            es_client = ESClient(hosts=[es_host])

        if not es_client.ping():
            print(f"✗ 无法连接到Elasticsearch: {es_host}")
            return 1

        print("✓ Elasticsearch连接成功\n")
    except Exception as e:
        print(f"✗ 连接Elasticsearch失败: {e}")
        return 1

    # Run both checks even if the first one fails, so the report is as
    # complete as possible; the exit code reflects any failure.
    fields_ok = check_es_facet_fields(es_client, args.tenant_id, args.size)
    aggregations_ok = check_facet_aggregations(es_client, args.tenant_id)

    _print_banner("检查完成")

    return 0 if fields_ok and aggregations_ok else 1


if __name__ == '__main__':
    sys.exit(main())