#!/usr/bin/env python3
"""
Recreate the Elasticsearch index (index structure only; no data is imported).

- Deletes and recreates the index (based on mappings/search_products.json)
- Reads the ES configuration from environment variables:
  - ES_HOST (default: http://localhost:9200)
  - ES_USERNAME (optional)
  - ES_PASSWORD (optional)

Usage:
    python scripts/recreate_index.py
"""

import os
import sys
from pathlib import Path

# Add the project root to sys.path so the project-local imports below resolve
# when this file is executed directly as a script.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from utils.es_client import ESClient  # type: ignore
from indexer.mapping_generator import (  # type: ignore
    load_mapping,
    delete_index_if_exists,
    DEFAULT_INDEX_NAME,
)


def main() -> int:
    """Drop and recreate the default index, without importing any data.

    Steps: connect to Elasticsearch, load the mapping, delete the existing
    index if present, then create the index with the loaded mapping. Progress
    and errors are reported with ``print``.

    Returns:
        0 when the index was recreated, 1 as soon as any step fails.
    """
    print("=" * 60)
    print("Recreate Elasticsearch index (structure only, no data import)")
    print("=" * 60)

    # 1. Connect to Elasticsearch
    es_host = os.environ.get("ES_HOST", "http://localhost:9200")
    es_username = os.environ.get("ES_USERNAME")
    es_password = os.environ.get("ES_PASSWORD")

    print(f"ES host: {es_host}")
    if es_username:
        print(f"ES username: {es_username}")

    # Credentials are only used when BOTH are provided. Say so explicitly,
    # otherwise a printed username suggests an authenticated connection that
    # is not actually being made.
    if bool(es_username) != bool(es_password):
        print(
            "[WARN] Only one of ES_USERNAME / ES_PASSWORD is set; "
            "connecting without authentication."
        )

    try:
        if es_username and es_password:
            es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
        else:
            es_client = ESClient(hosts=[es_host])

        if not es_client.ping():
            print(f"[ERROR] Cannot connect to Elasticsearch at {es_host}")
            return 1
    except Exception as e:
        print(f"[ERROR] Failed to connect to Elasticsearch: {e}")
        return 1

    index_name = DEFAULT_INDEX_NAME
    print(f"Index name: {index_name}")

    # 2. Load the mapping. This happens before the delete step, so a mapping
    #    that fails to load aborts the run while the old index is still intact.
    try:
        mapping = load_mapping()
        print("Loaded mapping configuration.")
    except Exception as e:
        print(f"[ERROR] Failed to load mapping: {e}")
        return 1

    # 3. Delete the old index (if it exists)
    print(f"Deleting existing index if exists: {index_name} ...")
    try:
        if es_client.index_exists(index_name):
            if delete_index_if_exists(es_client, index_name):
                print(f"✓ Deleted index: {index_name}")
            else:
                print(f"[ERROR] Failed to delete index: {index_name}")
                return 1
        else:
            print(f"Index does not exist, skip delete: {index_name}")
    except Exception as e:
        print(f"[ERROR] Error while deleting index: {e}")
        return 1

    # 4. Create the new index
    print(f"Creating index: {index_name} ...")
    try:
        if es_client.create_index(index_name, mapping):
            print(f"✓ Created index: {index_name}")
        else:
            print(f"[ERROR] Failed to create index: {index_name}")
            return 1
    except Exception as e:
        print(f"[ERROR] Error while creating index: {e}")
        return 1

    print("=" * 60)
    print("Index recreation completed. Please trigger /indexer/reindex per tenant to re-import data.")
    print("=" * 60)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())