Commit 791a790908dc1f1429a29b7ad27c0713cc95dff4

Authored by tangwang
1 parent d6606d7a

支持并发的增量和全量请求:

1. 添加 asyncio 导入
在文件顶部添加 import asyncio,用于在线程池中执行同步阻塞操作
2. 修改 /indexer/reindex 路由(全量索引)
使用 loop.run_in_executor() 将 service.bulk_index() 放到线程池执行
避免阻塞事件循环,允许其他请求并行处理
3. 修改 /indexer/index 路由(增量索引)
使用 loop.run_in_executor() 将 service.index_spus_to_es() 放到线程池执行
确保全量索引和增量索引可以并行执行
工作原理
线程池执行:同步阻塞操作(如数据库查询、ES 写入)在线程池中执行,不阻塞事件循环
并发支持:
全量索引占用一个线程
增量索引可同时使用其他线程
多个增量请求可并行处理
资源管理:
数据库连接池(pool_size=10, max_overflow=20)可支持并发请求
uvicorn 默认线程池(40 个线程)可处理多个并发请求
Showing 1 changed file with 30 additions and 10 deletions   Show diff stats
api/routes/indexer.py
@@ -4,6 +4,7 @@ @@ -4,6 +4,7 @@
4 提供全量和增量索引接口,供外部Java程序调用。 4 提供全量和增量索引接口,供外部Java程序调用。
5 """ 5 """
6 6
  7 +import asyncio
7 from fastapi import APIRouter, HTTPException 8 from fastapi import APIRouter, HTTPException
8 from typing import List 9 from typing import List
9 from pydantic import BaseModel 10 from pydantic import BaseModel
@@ -44,16 +45,28 @@ async def reindex_all(request: ReindexRequest): @@ -44,16 +45,28 @@ async def reindex_all(request: ReindexRequest):
44 45
45 将指定租户的所有SPU数据重新索引到ES。 46 将指定租户的所有SPU数据重新索引到ES。
46 注意:此接口不会删除旧索引,只会更新或创建索引。如需重建索引(删除后重建),请在服务器上执行 scripts/recreate_index.py 脚本。 47 注意:此接口不会删除旧索引,只会更新或创建索引。如需重建索引(删除后重建),请在服务器上执行 scripts/recreate_index.py 脚本。
  48 +
  49 + 注意:全量索引是长时间运行的操作,会在线程池中执行,不会阻塞其他请求。
  50 + 全量索引和增量索引可以并行执行。
47 """ 51 """
48 try: 52 try:
49 service = get_bulk_indexing_service() 53 service = get_bulk_indexing_service()
50 if service is None: 54 if service is None:
51 raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized") 55 raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
52 - return service.bulk_index(  
53 - tenant_id=request.tenant_id,  
54 - recreate_index=False,  
55 - batch_size=request.batch_size 56 +
  57 + # 显式将同步阻塞操作放到线程池执行,确保不阻塞事件循环
  58 + # 这样全量索引和增量索引可以并行执行
  59 + loop = asyncio.get_event_loop()
  60 + result = await loop.run_in_executor(
  61 + None, # 使用默认线程池
  62 + lambda: service.bulk_index(
  63 + tenant_id=request.tenant_id,
  64 + recreate_index=False,
  65 + batch_size=request.batch_size
  66 + )
56 ) 67 )
  68 +
  69 + return result
57 except HTTPException: 70 except HTTPException:
58 raise 71 raise
59 except Exception as e: 72 except Exception as e:
@@ -82,6 +95,8 @@ async def index_spus(request: IndexSpusRequest): @@ -82,6 +95,8 @@ async def index_spus(request: IndexSpusRequest):
82 - delete_spu_ids: delete_spu_ids对应的响应列表,每个元素包含spu_id和status(deleted/not_found/failed) 95 - delete_spu_ids: delete_spu_ids对应的响应列表,每个元素包含spu_id和status(deleted/not_found/failed)
83 - failed状态的元素会包含msg字段说明失败原因 96 - failed状态的元素会包含msg字段说明失败原因
84 - 最后给出总体统计:total, success_count, failed_count等 97 - 最后给出总体统计:total, success_count, failed_count等
  98 +
  99 + 注意:增量索引在线程池中执行,可以与全量索引并行执行。
85 """ 100 """
86 try: 101 try:
87 # 验证请求参数 102 # 验证请求参数
@@ -102,12 +117,17 @@ async def index_spus(request: IndexSpusRequest): @@ -102,12 +117,17 @@ async def index_spus(request: IndexSpusRequest):
102 if es_client is None: 117 if es_client is None:
103 raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized") 118 raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")
104 119
105 - # 调用批量索引方法(支持显式删除参数)  
106 - result = service.index_spus_to_es(  
107 - es_client=es_client,  
108 - tenant_id=request.tenant_id,  
109 - spu_ids=request.spu_ids if request.spu_ids else [],  
110 - delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None 120 + # 显式将同步阻塞操作放到线程池执行,确保不阻塞事件循环
  121 + # 这样全量索引和增量索引可以并行执行
  122 + loop = asyncio.get_event_loop()
  123 + result = await loop.run_in_executor(
  124 + None, # 使用默认线程池
  125 + lambda: service.index_spus_to_es(
  126 + es_client=es_client,
  127 + tenant_id=request.tenant_id,
  128 + spu_ids=request.spu_ids if request.spu_ids else [],
  129 + delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None
  130 + )
111 ) 131 )
112 132
113 return result 133 return result