"""
Indexer API routes.

Provides full and incremental indexing endpoints for external Java programs to call.
"""

import asyncio
import logging
from typing import List

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from sqlalchemy import text

# Indexer routes depend on services provided by api/indexer_app.py via this registry.
from ..service_registry import get_incremental_service, get_bulk_indexing_service, get_es_client

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/indexer", tags=["indexer"])


class ReindexRequest(BaseModel):
    """Request body for a full reindex."""
    tenant_id: str
    batch_size: int = 500


class IndexSpusRequest(BaseModel):
    """Request body for incremental indexing (index by SPU ID list)."""
    tenant_id: str
    spu_ids: List[str]
    delete_spu_ids: List[str] = []  # Optional: SPU IDs to delete explicitly


class GetDocumentsRequest(BaseModel):
    """Request body for fetching documents (nothing is written to ES)."""
    tenant_id: str
    spu_ids: List[str]


@router.post("/reindex")
async def reindex_all(request: ReindexRequest):
    """
    Full reindex endpoint.

    Reindexes all SPU data of the given tenant into ES.
    Note: this endpoint does not delete the old index; it only updates or creates the index. To rebuild the index (delete, then recreate), run scripts/recreate_index.py on the server.

    Note: a full reindex is a long-running operation. It runs in a thread pool and does not block other requests.
    Full and incremental indexing can run in parallel.
    """
    try:
        service = get_bulk_indexing_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")

        # Run the synchronous, blocking call in a thread pool so it does not block the event loop.
        # This lets full and incremental indexing run in parallel.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,  # use the default thread pool
            lambda: service.bulk_index(
                tenant_id=request.tenant_id,
                recreate_index=False,
                batch_size=request.batch_size
            )
        )
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in reindex for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
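
# A minimal, standard-library-only sketch of the thread-pool offloading that
# reindex_all relies on. The names blocking_bulk_index, heartbeat, and events
# are hypothetical stand-ins introduced for illustration; they are not part of
# this module or of the real bulk indexing service. The sketch assumes the
# blocking call takes noticeably longer than the heartbeat.

```python
import asyncio
import time

events = []  # records the order in which things finish


def blocking_bulk_index(tenant_id: str, batch_size: int) -> dict:
    """Hypothetical stand-in for service.bulk_index: blocks its thread."""
    time.sleep(0.5)
    events.append("bulk_index done")
    return {"tenant_id": tenant_id, "batch_size": batch_size}


async def heartbeat() -> None:
    """Ticks three times; it can only advance while the event loop is free."""
    for i in range(3):
        await asyncio.sleep(0.05)
        events.append(f"tick {i}")


async def demo() -> dict:
    loop = asyncio.get_running_loop()
    # Offload the blocking call to the default thread pool (executor=None).
    job = loop.run_in_executor(None, lambda: blocking_bulk_index("t1", 500))
    result, _ = await asyncio.gather(job, heartbeat())
    return result


result = asyncio.run(demo())
```

# If the blocking call ran directly inside the coroutine instead, the ticks
# could not be recorded until it returned; with run_in_executor the heartbeat
# is expected to finish while the worker thread is still sleeping.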


@router.post("/index")
async def index_spus(request: IndexSpusRequest):
    """
    Incremental indexing endpoint.

    Indexes the given SPU IDs into ES. Use it to incrementally update specific products.

    Two deletion modes are supported:
    1. **Auto-detected deletion**: if an SPU is marked deleted=1 in the database, its document is deleted from ES automatically.
    2. **Explicit deletion**: SPUs listed in delete_spu_ids are deleted regardless of their database state.

    Deletion strategy:
    - The database is the single source of truth.
    - Auto-detected: when the database query finds deleted=1, the document is deleted from ES automatically.
    - Explicit: the caller already knows which SPUs to delete and deletes them directly (suited to batch deletion).

    Response format:
    - spu_ids: one result per entry in spu_ids, each containing spu_id and status (indexed/deleted/failed)
    - delete_spu_ids: one result per entry in delete_spu_ids, each containing spu_id and status (deleted/not_found/failed)
    - Entries with status failed include a msg field describing the failure
    - Overall statistics come last: total, success_count, failed_count, etc.

    Note: incremental indexing runs in a thread pool and can run in parallel with a full reindex.
    """
    try:
        # Validate request parameters
        if not request.spu_ids and not request.delete_spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")
        if request.spu_ids and len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing")
        if request.delete_spu_ids and len(request.delete_spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion")

        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        es_client = get_es_client()
        if es_client is None:
            raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")

        # Run the synchronous, blocking call in a thread pool so it does not block the event loop.
        # This lets full and incremental indexing run in parallel.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,  # use the default thread pool
            lambda: service.index_spus_to_es(
                es_client=es_client,
                tenant_id=request.tenant_id,
                spu_ids=request.spu_ids if request.spu_ids else [],
                delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None
            )
        )
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error indexing SPUs for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
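
# Because index_spus rejects more than 100 IDs in either list, a caller with a
# larger batch has to split it across requests. The helper below is one
# possible client-side sketch; build_index_payloads and MAX_IDS_PER_REQUEST
# are hypothetical names, not part of this module. It only builds request
# bodies matching IndexSpusRequest and does not send anything.

```python
from typing import Dict, Iterator, List

MAX_IDS_PER_REQUEST = 100  # mirrors the limit checked by the /index route


def build_index_payloads(
    tenant_id: str, spu_ids: List[str], delete_spu_ids: List[str]
) -> Iterator[Dict]:
    """Yield request bodies in which neither ID list exceeds the limit."""
    longest = max(len(spu_ids), len(delete_spu_ids))
    for start in range(0, longest, MAX_IDS_PER_REQUEST):
        end = start + MAX_IDS_PER_REQUEST
        # At least one slice is non-empty because start < longest, so the
        # "both lists empty" check in the route is never triggered.
        yield {
            "tenant_id": tenant_id,
            "spu_ids": spu_ids[start:end],
            "delete_spu_ids": delete_spu_ids[start:end],
        }


# Example: 250 IDs to index and 2 to delete are split into three payloads.
payloads = list(
    build_index_payloads("t1", [str(i) for i in range(250)], ["x", "y"])
)
```

# Each payload could then be posted to /indexer/index in turn; how failures in
# one chunk should affect the remaining chunks is left to the caller.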


@router.post("/documents")
async def get_documents(request: GetDocumentsRequest):
    """
    Document lookup endpoint.

    Returns the ES document data for the given SPU IDs without writing to ES. Use it to inspect, debug, or verify SPU data.
    """
    try:
        if not request.spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
        if len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")

        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        # Collect per-SPU results so that one failing SPU does not fail the whole request.
        success_list, failed_list = [], []
        for spu_id in request.spu_ids:
            try:
                doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
                if doc:
                    success_list.append({"spu_id": spu_id, "document": doc})
                else:
                    failed_list.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
            except Exception as e:
                failed_list.append({"spu_id": spu_id, "error": str(e)})

        return {
            "success": success_list,
            "failed": failed_list,
            "total": len(request.spu_ids),
            "success_count": len(success_list),
            "failed_count": len(failed_list)
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
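
# The response shape of get_documents can be exercised without FastAPI or a
# database. The sketch below pulls the per-SPU aggregation into a plain
# function; collect_documents and fake_get_doc are hypothetical names used
# only for this illustration, and fake_get_doc stands in for
# service.get_spu_document under the assumption that it returns a falsy value
# for a missing or deleted SPU.

```python
from typing import Callable, Dict, List, Optional


def collect_documents(
    get_doc: Callable[[str, str], Optional[dict]],
    tenant_id: str,
    spu_ids: List[str],
) -> Dict:
    """Partition lookups into success and failed entries, one per SPU ID."""
    success, failed = [], []
    for spu_id in spu_ids:
        try:
            doc = get_doc(tenant_id, spu_id)
        except Exception as e:
            # A lookup error is reported for this SPU only.
            failed.append({"spu_id": spu_id, "error": str(e)})
            continue
        if doc:
            success.append({"spu_id": spu_id, "document": doc})
        else:
            failed.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
    return {
        "success": success,
        "failed": failed,
        "total": len(spu_ids),
        "success_count": len(success),
        "failed_count": len(failed),
    }


def fake_get_doc(tenant_id: str, spu_id: str) -> Optional[dict]:
    """Hypothetical lookup: one known SPU, one that raises, the rest missing."""
    if spu_id == "boom":
        raise RuntimeError("db unavailable")
    return {"1": {"title": "shirt"}}.get(spu_id)


response = collect_documents(fake_get_doc, "t1", ["1", "2", "boom"])
```

# Keeping the aggregation in a plain function like this is one way to unit
# test the success/failed bookkeeping separately from the route.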


@router.get("/health")
async def indexer_health_check():
    """Check the health of the indexer service."""
    try:
        service = get_incremental_service()
        if service is None:
            return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}

        try:
            with service.db_engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            db_status = "connected"
        except Exception as e:
            db_status = f"disconnected: {str(e)}"

        return {
            "status": "available",
            "database": db_status,
            "preloaded_data": {"category_mappings": len(service.category_id_to_name)}
        }
    except Exception as e:
        logger.error(f"Error checking indexer health: {e}", exc_info=True)
        return {"status": "error", "message": str(e)}