#!/usr/bin/env python3
"""
Check actual data in ES index to see if facet fields have values.

The script runs two read-only searches for a single tenant:

1. fetch a handful of documents and print their facet-related ``_source``
   fields;
2. run the facet aggregations (``category1_name`` plus the nested
   ``specifications`` facets) and print the resulting buckets.

The process exits with status 0 when both checks ran without error and 1
otherwise.
"""

import argparse
import os
import sys
import traceback
from pathlib import Path

# Put the directory two levels above this script's folder on sys.path so the
# ``utils`` import below resolves when the file is executed directly
# (presumably that directory is the project root - confirm against the repo
# layout).
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))

from utils.es_client import ESClient

# Index queried when the caller does not specify one.
DEFAULT_INDEX = "search_products"

# ``specifications.name`` values for which a nested facet is requested.
SPEC_FACET_NAMES = ("color", "size", "material")

# ``_source`` fields requested for the per-document check.
FACET_SOURCE_FIELDS = [
    "spu_id",
    "title",
    "category1_name",
    "category2_name",
    "category3_name",
    "category_name",
    "category_path",
    "specifications",
    "option1_name",
    "option2_name",
    "option3_name",
]

_BANNER = "=" * 60


def _print_banner(title):
    """Print *title* framed by two separator lines, preceded by a blank line."""
    print("\n" + _BANNER)
    print(title)
    print(_BANNER)


def _total_hits(response):
    """Return the total hit count of a search response.

    ``hits.total`` is an object (``{"value": ..., "relation": ...}``) in
    current Elasticsearch responses but a plain integer in older versions or
    when ``rest_total_hits_as_int`` is set, so both shapes are accepted.
    A missing total yields 0.
    """
    total = response.get('hits', {}).get('total', 0)
    if isinstance(total, dict):
        return total.get('value', 0)
    return total


def _normalize_specs(specs):
    """Return ``specifications`` as a list.

    A document indexed with a single specification may store it as one
    object instead of a one-element list; wrap it so callers can slice.
    ``None`` or any other falsy value becomes an empty list.
    """
    if isinstance(specs, dict):
        return [specs]
    return specs or []


def _print_document(position, source):
    """Print the facet-related fields of one document's ``_source``."""
    title_obj = source.get("title") or {}
    category_path_obj = source.get("category_path") or {}

    # ``title`` / ``category_path`` are read as language-keyed dicts; any
    # other shape is reported as empty / None rather than raising.
    title_zh = str(title_obj.get('zh', ''))[:50] if isinstance(title_obj, dict) else ''
    category_path_zh = category_path_obj.get('zh') if isinstance(category_path_obj, dict) else None

    print(f"Document {position}:")
    print(f" spu_id: {source.get('spu_id')}")
    print(f" title.zh: {title_zh}")
    for field in ("category1_name", "category2_name", "category3_name", "category_name"):
        print(f" {field}: {source.get(field)}")
    print(f" category_path.zh: {category_path_zh}")
    for field in ("option1_name", "option2_name", "option3_name"):
        print(f" {field}: {source.get(field)}")

    specs = _normalize_specs(source.get('specifications'))
    if specs:
        print(f" specifications count: {len(specs)}")
        # Show only the first 3 specifications.
        for spec in specs[:3]:
            print(f" - name: {spec.get('name')}, value: {spec.get('value')}")
    else:
        print(" specifications: empty")
    print()


def check_es_facet_fields(es_client, tenant_id: str, size: int = 5,
                          index: str = DEFAULT_INDEX) -> bool:
    """Check facet-related fields in ES.

    Searches *index* for documents whose ``tenant_id`` equals *tenant_id*
    and prints the facet-related ``_source`` fields of up to *size* hits.

    Args:
        es_client: Object exposing ``client.search(index=..., body=...)``.
        tenant_id: Value matched against the ``tenant_id`` field.
        size: Maximum number of documents to fetch and print.
        index: Name of the index to query.

    Returns:
        True if the search and printing completed, False if an exception
        was caught (the error and traceback are printed).
    """
    _print_banner("Checking facet field data in ES index")

    query = {
        "query": {
            "term": {
                "tenant_id": tenant_id
            }
        },
        "size": size,
        "_source": list(FACET_SOURCE_FIELDS),
    }

    # Diagnostic boundary: report any failure and carry on with the next
    # check instead of aborting the whole script.
    try:
        response = es_client.client.search(index=index, body=query)
        hits = response.get('hits', {}).get('hits', [])
        total = _total_hits(response)

        print(f"\nTotal documents: {total}")
        print(f"Checking first {len(hits)} documents:\n")

        for i, hit in enumerate(hits, 1):
            _print_document(i, hit.get('_source', {}))
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return False
    return True


def _spec_facet_agg(spec_name, size=10):
    """Build the nested terms aggregation for one specification name.

    The aggregation descends into the nested ``specifications`` path, keeps
    only entries whose ``specifications.name`` equals *spec_name*, and counts
    their ``specifications.value`` terms (top *size* buckets).
    """
    return {
        "nested": {
            "path": "specifications"
        },
        "aggs": {
            "filter_by_name": {
                "filter": {
                    "term": {
                        "specifications.name": spec_name
                    }
                },
                "aggs": {
                    "value_counts": {
                        "terms": {
                            "field": "specifications.value",
                            "size": size
                        }
                    }
                }
            }
        }
    }


def _print_buckets(buckets):
    """Print ``key: doc_count`` for each bucket, or a marker when empty."""
    if buckets:
        for bucket in buckets:
            print(f" {bucket['key']}: {bucket['doc_count']}")
    else:
        print(" empty (no data)")


def check_facet_aggregations(es_client, tenant_id: str,
                             index: str = DEFAULT_INDEX) -> bool:
    """Check facet aggregation queries.

    Runs a ``size: 0`` search restricted to *tenant_id* with a terms
    aggregation on ``category1_name`` and one nested aggregation per entry
    of ``SPEC_FACET_NAMES``, then prints the buckets of each facet.

    Args:
        es_client: Object exposing ``client.search(index=..., body=...)``.
        tenant_id: Value matched against the ``tenant_id`` field.
        index: Name of the index to query.

    Returns:
        True if the search and printing completed, False if an exception
        was caught (the error and traceback are printed).
    """
    _print_banner("Checking facet aggregation query results")

    aggregations = {
        "category1_facet": {
            "terms": {
                "field": "category1_name",
                "size": 10
            }
        }
    }
    for spec_name in SPEC_FACET_NAMES:
        aggregations[f"{spec_name}_facet"] = _spec_facet_agg(spec_name)

    query = {
        "query": {
            "term": {
                "tenant_id": tenant_id
            }
        },
        "size": 0,
        "aggs": aggregations,
    }

    # Diagnostic boundary: report any failure instead of propagating it.
    try:
        response = es_client.client.search(index=index, body=query)
        aggs = response.get('aggregations', {})

        print("\n1. category1_name facet:")
        _print_buckets(aggs.get('category1_facet', {}).get('buckets', []))

        # Spec facets are numbered after the category facet, hence start=2.
        for number, spec_name in enumerate(SPEC_FACET_NAMES, 2):
            print(f"\n{number}. specifications.{spec_name} facet:")
            facet = aggs.get(f"{spec_name}_facet", {})
            value_counts = facet.get('filter_by_name', {}).get('value_counts', {})
            _print_buckets(value_counts.get('buckets', []))
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return False
    return True


def main() -> int:
    """Parse CLI arguments, connect to Elasticsearch and run both checks.

    The host comes from ``--es-host``, else the ``ES_HOST`` environment
    variable, else ``http://localhost:9200``. Credentials are taken from
    ``ES_USERNAME`` / ``ES_PASSWORD`` and used only when both are set.

    Returns:
        0 when the connection succeeded and both checks completed without
        error; 1 when the connection failed or either check reported an
        error.
    """
    parser = argparse.ArgumentParser(description='Check facet field data in ES index')
    parser.add_argument('--tenant-id', required=True, help='Tenant ID')
    parser.add_argument('--es-host', help='Elasticsearch host (or use env var ES_HOST, default: http://localhost:9200)')
    parser.add_argument('--size', type=int, default=5, help='Number of documents to check (default: 5)')
    parser.add_argument('--index', default=DEFAULT_INDEX, help=f'Index to query (default: {DEFAULT_INDEX})')

    args = parser.parse_args()

    # Connect to ES.
    es_host = args.es_host or os.environ.get('ES_HOST', 'http://localhost:9200')
    es_username = os.environ.get('ES_USERNAME')
    es_password = os.environ.get('ES_PASSWORD')

    print(f"Connecting to Elasticsearch: {es_host}")
    print(f"Tenant ID: {args.tenant_id}\n")

    try:
        if es_username and es_password:
            es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
        else:
            es_client = ESClient(hosts=[es_host])

        if not es_client.ping():
            print(f"✗ Cannot connect to Elasticsearch: {es_host}")
            return 1

        print("✓ Elasticsearch connected successfully\n")
    except Exception as e:
        print(f"✗ Failed to connect to Elasticsearch: {e}")
        return 1

    # Check the ES data. Both checks always run so that a failure in the
    # first one does not hide the result of the second.
    fields_ok = check_es_facet_fields(es_client, args.tenant_id, args.size, index=args.index)
    aggs_ok = check_facet_aggregations(es_client, args.tenant_id, index=args.index)

    _print_banner("Check completed")

    # Surface check failures to the shell instead of always reporting success.
    return 0 if fields_ok and aggs_ok else 1


if __name__ == '__main__':
    sys.exit(main())