From 791a790908dc1f1429a29b7ad27c0713cc95dff4 Mon Sep 17 00:00:00 2001 From: tangwang Date: Fri, 19 Dec 2025 09:04:19 +0800 Subject: [PATCH] 支持并发的增量和全量请求: --- api/routes/indexer.py | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/api/routes/indexer.py b/api/routes/indexer.py index a739a9a..d78a24a 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -4,6 +4,7 @@ 提供全量和增量索引接口,供外部Java程序调用。 """ +import asyncio from fastapi import APIRouter, HTTPException from typing import List from pydantic import BaseModel @@ -44,16 +45,28 @@ async def reindex_all(request: ReindexRequest): 将指定租户的所有SPU数据重新索引到ES。 注意:此接口不会删除旧索引,只会更新或创建索引。如需重建索引(删除后重建),请在服务器上执行 scripts/recreate_index.py 脚本。 + + 注意:全量索引是长时间运行的操作,会在线程池中执行,不会阻塞其他请求。 + 全量索引和增量索引可以并行执行。 """ try: service = get_bulk_indexing_service() if service is None: raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized") - return service.bulk_index( - tenant_id=request.tenant_id, - recreate_index=False, - batch_size=request.batch_size + + # 显式将同步阻塞操作放到线程池执行,确保不阻塞事件循环 + # 这样全量索引和增量索引可以并行执行 + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, # 使用默认线程池 + lambda: service.bulk_index( + tenant_id=request.tenant_id, + recreate_index=False, + batch_size=request.batch_size + ) ) + + return result except HTTPException: raise except Exception as e: @@ -82,6 +95,8 @@ async def index_spus(request: IndexSpusRequest): - delete_spu_ids: delete_spu_ids对应的响应列表,每个元素包含spu_id和status(deleted/not_found/failed) - failed状态的元素会包含msg字段说明失败原因 - 最后给出总体统计:total, success_count, failed_count等 + + 注意:增量索引在线程池中执行,可以与全量索引并行执行。 """ try: # 验证请求参数 @@ -102,12 +117,17 @@ async def index_spus(request: IndexSpusRequest): if es_client is None: raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized") - # 调用批量索引方法(支持显式删除参数) - result = service.index_spus_to_es( - es_client=es_client, - tenant_id=request.tenant_id, - spu_ids=request.spu_ids if request.spu_ids else [], - delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None + # 显式将同步阻塞操作放到线程池执行,确保不阻塞事件循环 + # 这样全量索引和增量索引可以并行执行 + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, # 使用默认线程池 + lambda: service.index_spus_to_es( + es_client=es_client, + tenant_id=request.tenant_id, + spu_ids=request.spu_ids if request.spu_ids else [], + delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None + ) ) return result -- libgit2 0.21.2