From 80f87e57cd0557d592946757d2618800f3ffeaee Mon Sep 17 00:00:00 2001 From: tangwang Date: Tue, 6 Jan 2026 22:37:07 +0800 Subject: [PATCH] 多语言索引修改 对应的 索引创建、数据灌入脚本、文档 同步修改 --- api/routes/indexer.py | 2 +- docs/常用查询 - ES.md | 3 +-- docs/搜索API对接指南.md | 33 ++++++++++++++++++++++++++++++--- indexer/__init__.py | 4 ++-- indexer/bulk_indexer.py | 1 - scripts/check_index_mapping.py | 10 ++++++++-- scripts/create_tenant_index.sh | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/recreate_index.py | 102 ------------------------------------------------------------------------------------------------------ 8 files changed, 118 insertions(+), 113 deletions(-) create mode 100755 scripts/create_tenant_index.sh delete mode 100644 scripts/recreate_index.py diff --git a/api/routes/indexer.py b/api/routes/indexer.py index d78a24a..732cbec 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -44,7 +44,7 @@ async def reindex_all(request: ReindexRequest): 全量重建索引接口 将指定租户的所有SPU数据重新索引到ES。 - 注意:此接口不会删除旧索引,只会更新或创建索引。如需重建索引(删除后重建),请在服务器上执行 scripts/recreate_index.py 脚本。 + 注意:此接口不会删除旧索引,只会更新或创建索引。如需重建索引结构(删除后重建),请使用 `scripts/create_tenant_index.sh` 脚本。 注意:全量索引是长时间运行的操作,会在线程池中执行,不会阻塞其他请求。 全量索引和增量索引可以并行执行。 diff --git a/docs/常用查询 - ES.md b/docs/常用查询 - ES.md index db51ad1..cd5a7a3 100644 --- a/docs/常用查询 - ES.md +++ b/docs/常用查询 - ES.md @@ -6,9 +6,8 @@ ### 1. 根据 tenant_id / spu_id 查询 curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' -H 'Content-Type: application/json' -d '{ -{ "size": 11, - "_source": "*", + "_source": ["title"], "query": { "bool": { "filter": [ diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 09f72d7..98dd3eb 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -31,7 +31,8 @@ - 4.5 [多语言字段说明](#45-多语言字段说明) 5. 
[索引接口](#索引接口) - - 5.1 [全量重建索引接口](#51-全量重建索引接口) + - 5.0 [为租户创建索引](#50-为租户创建索引) + - 5.1 [全量索引接口](#51-全量索引接口) - 5.2 [增量索引接口](#52-增量索引接口) - 5.3 [查询文档接口](#53-查询文档接口) - 5.4 [索引健康检查接口](#54-索引健康检查接口) @@ -130,7 +131,7 @@ curl -X POST "http://120.76.41.98:6002/search/" \ | 搜索建议 | GET | `/search/suggestions` | 搜索建议(框架,暂未实现) ⚠️ TODO | | 即时搜索 | GET | `/search/instant` | 边输入边搜索(框架) ⚠️ TODO | | 获取文档 | GET | `/search/{doc_id}` | 获取单个文档 | -| 全量重建索引 | POST | `/indexer/reindex` | 全量重建索引接口 | +| 全量索引 | POST | `/indexer/reindex` | 全量索引接口(导入数据,不删除索引) | | 增量索引 | POST | `/indexer/index` | 增量索引接口(指定SPU ID列表进行索引,支持自动检测删除和显式删除) | | 查询文档 | POST | `/indexer/documents` | 查询SPU文档数据(不写入ES) | | 索引健康检查 | GET | `/indexer/health` | 检查索引服务状态 | @@ -797,6 +798,32 @@ curl "http://localhost:6002/search/12345" ## 索引接口 +### 5.0 为租户创建索引 + +为租户创建索引需要两个步骤: + +1. **创建索引结构**(可选,仅在需要更新 mapping 时执行) + - 使用脚本创建 ES 索引结构(基于 `mappings/search_products.json`) + - 如果索引已存在,会提示用户确认(会删除现有数据) + +2. **导入数据**(必需) + - 使用全量索引接口 `/indexer/reindex` 导入数据 + +**创建索引结构**: + +```bash +./scripts/create_tenant_index.sh 170 +``` + +脚本会自动从项目根目录的 `.env` 文件加载 ES 配置。 + +**注意事项**: +- ⚠️ 如果索引已存在,脚本会提示确认,确认后会删除现有数据 +- 创建索引后,**必须**调用 `/indexer/reindex` 导入数据 +- 如果只是更新数据而不需要修改索引结构,直接使用 `/indexer/reindex` 即可 + +--- + ### 5.1 全量索引接口 - **端点**: `POST /indexer/reindex` @@ -856,7 +883,7 @@ tail -f logs/api.log tail -f logs/*.log ``` -> 如需 **重建索引(会删除并重建整份 ES 索引结构)**,在服务器上执行内部脚本:`python scripts/recreate_index.py`。重建后需要按租户调用 `/indexer/reindex` 重新导入各租户数据。 +> ⚠️ **重要提示**:如需 **创建索引结构**,请参考 [5.0 为租户创建索引](#50-为租户创建索引) 章节,使用 `scripts/create_tenant_index.sh` 脚本。创建后需要调用 `/indexer/reindex` 导入数据。 **查看索引日志**: diff --git a/indexer/__init__.py b/indexer/__init__.py index d472a02..bf11885 100644 --- a/indexer/__init__.py +++ b/indexer/__init__.py @@ -1,6 +1,6 @@ """Indexer package initialization.""" -from .mapping_generator import load_mapping, create_index_if_not_exists, delete_index_if_exists, DEFAULT_INDEX_NAME +from .mapping_generator import load_mapping, 
create_index_if_not_exists, delete_index_if_exists, get_tenant_index_name from .spu_transformer import SPUTransformer from .bulk_indexer import BulkIndexer @@ -8,7 +8,7 @@ __all__ = [ 'load_mapping', 'create_index_if_not_exists', 'delete_index_if_exists', - 'DEFAULT_INDEX_NAME', + 'get_tenant_index_name', 'SPUTransformer', 'BulkIndexer', ] diff --git a/indexer/bulk_indexer.py b/indexer/bulk_indexer.py index 754162a..a4ae44e 100644 --- a/indexer/bulk_indexer.py +++ b/indexer/bulk_indexer.py @@ -7,7 +7,6 @@ Handles batch indexing of documents with progress tracking and error handling. from typing import List, Dict, Any, Optional from elasticsearch.helpers import bulk, BulkIndexError from utils.es_client import ESClient -from indexer.mapping_generator import DEFAULT_INDEX_NAME import time diff --git a/scripts/check_index_mapping.py b/scripts/check_index_mapping.py index c7b06bd..e7569fd 100644 --- a/scripts/check_index_mapping.py +++ b/scripts/check_index_mapping.py @@ -11,7 +11,7 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) from utils.es_client import get_es_client_from_env -from indexer.mapping_generator import DEFAULT_INDEX_NAME +from indexer.mapping_generator import get_tenant_index_name def check_field_mapping(mapping_dict, field_path): @@ -38,6 +38,12 @@ def check_field_mapping(mapping_dict, field_path): def main(): + import argparse + + parser = argparse.ArgumentParser(description="检查 Elasticsearch 索引实际映射配置") + parser.add_argument("--tenant-id", type=str, required=True, help="租户ID") + args = parser.parse_args() + print("=" * 80) print("检查 Elasticsearch 索引实际映射配置") print("=" * 80) @@ -53,7 +59,7 @@ def main(): print(f"✗ 连接 Elasticsearch 失败: {e}") return 1 - index_name = DEFAULT_INDEX_NAME + index_name = get_tenant_index_name(args.tenant_id) # 检查索引是否存在 if not es_client.index_exists(index_name): diff --git a/scripts/create_tenant_index.sh b/scripts/create_tenant_index.sh new file mode 100755 index 0000000..172f62c --- /dev/null 
+++ b/scripts/create_tenant_index.sh
@@ -0,0 +1,108 @@
+#!/bin/bash
+
+# Create (or recreate) the Elasticsearch index for a single tenant.
+# Usage: ./scripts/create_tenant_index.sh <tenant_id>
+#
+# WARNING: an existing index with the same name is deleted first, so its
+# documents are lost. Re-import data through /indexer/reindex afterwards.
+
+# Run from the project root so .env and the mapping file resolve; abort if
+# the directory change fails instead of operating on the wrong tree.
+cd "$(dirname "$0")/.." || exit 1
+
+# Load settings from .env if present; `set -a` exports every variable
+# assigned while the file is sourced.
+if [ -f .env ]; then
+    set -a
+    source .env
+    set +a
+fi
+
+ES_HOST="${ES_HOST:-http://localhost:9200}"
+ES_USERNAME="${ES_USERNAME:-}"
+ES_PASSWORD="${ES_PASSWORD:-}"
+
+# The tenant ID is the only (required) command-line argument.
+if [ $# -eq 0 ]; then
+    echo "用法: $0 <tenant_id>"
+    echo "示例: $0 170"
+    exit 1
+fi
+
+TENANT_ID="$1"
+
+# The tenant ID is spliced into the request URL. Reject empty values and
+# characters that are invalid in index names or would change the URL's
+# meaning (wildcards, path/query separators), so the DELETE below can only
+# target the one intended index.
+INVALID_ID_CHARS='[[:space:]/\*?"<>|:,#]'
+if [ -z "$TENANT_ID" ] || [[ "$TENANT_ID" =~ $INVALID_ID_CHARS ]]; then
+    echo "错误: 无效的租户ID: '$TENANT_ID'"
+    exit 1
+fi
+
+ES_INDEX="search_products_tenant_${TENANT_ID}"
+MAPPING_FILE="mappings/search_products.json"
+
+# The mapping file is sent as the body of the create-index request.
+if [ ! -f "$MAPPING_FILE" ]; then
+    echo "错误: mapping 文件不存在: $MAPPING_FILE"
+    exit 1
+fi
+
+# Destructive step ahead: require the operator to type the index name.
+echo "创建索引前,将删除已有的同名索引。"
+echo "索引名称: $ES_INDEX"
+echo "请输入索引名称 '$ES_INDEX' 来确认:"
+read -r user_input
+
+if [ "$user_input" != "$ES_INDEX" ]; then
+    echo "确认失败,索引名称不匹配。退出操作。"
+    exit 1
+fi
+
+echo "确认成功,继续创建索引..."
+
+# Keep the curl auth arguments in an array so credentials containing spaces
+# or glob characters reach curl as one unmodified argument.
+AUTH_ARGS=()
+if [ -n "$ES_USERNAME" ] && [ -n "$ES_PASSWORD" ]; then
+    AUTH_ARGS=(-u "${ES_USERNAME}:${ES_PASSWORD}")
+fi
+
+# Delete the existing index. 404 means there was nothing to delete; any other
+# non-2xx status (000 when curl could not connect) aborts before the create.
+echo
+echo "删除索引: $ES_INDEX"
+echo
+delete_status=$(curl -X DELETE "${ES_HOST}/${ES_INDEX}" "${AUTH_ARGS[@]}" -s -o /dev/null -w "%{http_code}")
+echo "HTTP状态码: $delete_status"
+if [[ "$delete_status" != 2?? && "$delete_status" != "404" ]]; then
+    echo "错误: 删除索引失败: $ES_INDEX"
+    exit 1
+fi
+
+echo
+echo "创建索引: $ES_INDEX"
+echo
+
+# Create the index from the mapping file. The status code is appended on its
+# own line after the response body so the body can be shown and the status
+# checked.
+create_output=$(curl -s -X PUT "${ES_HOST}/${ES_INDEX}" \
+    -H "Content-Type: application/json" \
+    "${AUTH_ARGS[@]}" \
+    -d @"${MAPPING_FILE}" \
+    -w "\n%{http_code}")
+create_status="${create_output##*$'\n'}"
+echo "${create_output%$'\n'*}"
+echo "HTTP状态码: $create_status"
+if [[ "$create_status" != 2?? ]]; then
+    echo "错误: 创建索引失败: $ES_INDEX"
+    exit 1
+fi
+
+echo
+echo "完成!"
+echo "提示: 请调用 /indexer/reindex 接口导入数据" + diff --git a/scripts/recreate_index.py b/scripts/recreate_index.py deleted file mode 100644 index eae4bf2..0000000 --- a/scripts/recreate_index.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python3 -"""重建 ES 索引(仅索引结构,不导入数据)。 - -- 删除并重建索引(基于 mappings/search_products.json) -- 依赖环境变量中的 ES 配置: - - ES_HOST(默认: http://localhost:9200) - - ES_USERNAME(可选) - - ES_PASSWORD(可选) - -用法: - python scripts/recreate_index.py -""" - -import os -import sys -from pathlib import Path - -# 将项目根目录加入 sys.path -PROJECT_ROOT = Path(__file__).resolve().parent.parent -sys.path.insert(0, str(PROJECT_ROOT)) - -from utils.es_client import ESClient # type: ignore -from indexer.mapping_generator import ( # type: ignore - load_mapping, - delete_index_if_exists, - DEFAULT_INDEX_NAME, -) - - -def main() -> int: - print("=" * 60) - print("Recreate Elasticsearch index (structure only, no data import)") - print("=" * 60) - - # 1. 连接 Elasticsearch - es_host = os.environ.get("ES_HOST", "http://localhost:9200") - es_username = os.environ.get("ES_USERNAME") - es_password = os.environ.get("ES_PASSWORD") - - print(f"ES host: {es_host}") - if es_username: - print(f"ES username: {es_username}") - - try: - if es_username and es_password: - es_client = ESClient(hosts=[es_host], username=es_username, password=es_password) - else: - es_client = ESClient(hosts=[es_host]) - - if not es_client.ping(): - print(f"[ERROR] Cannot connect to Elasticsearch at {es_host}") - return 1 - except Exception as e: - print(f"[ERROR] Failed to connect to Elasticsearch: {e}") - return 1 - - index_name = DEFAULT_INDEX_NAME - print(f"Index name: {index_name}") - - # 2. 加载 mapping - try: - mapping = load_mapping() - print("Loaded mapping configuration.") - except Exception as e: - print(f"[ERROR] Failed to load mapping: {e}") - return 1 - - # 3. 
删除旧索引(如果存在) - print(f"Deleting existing index if exists: {index_name} ...") - try: - if es_client.index_exists(index_name): - if delete_index_if_exists(es_client, index_name): - print(f"✓ Deleted index: {index_name}") - else: - print(f"[ERROR] Failed to delete index: {index_name}") - return 1 - else: - print(f"Index does not exist, skip delete: {index_name}") - except Exception as e: - print(f"[ERROR] Error while deleting index: {e}") - return 1 - - # 4. 创建新索引 - print(f"Creating index: {index_name} ...") - try: - if es_client.create_index(index_name, mapping): - print(f"✓ Created index: {index_name}") - else: - print(f"[ERROR] Failed to create index: {index_name}") - return 1 - except Exception as e: - print(f"[ERROR] Error while creating index: {e}") - return 1 - - print("=" * 60) - print("Index recreation completed. Please trigger /indexer/reindex per tenant to re-import data.") - print("=" * 60) - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) -- libgit2 0.21.2