indexer.py
7.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""
索引API路由。
提供全量和增量索引接口,供外部Java程序调用。
"""
import asyncio
from fastapi import APIRouter, HTTPException
from typing import List
from pydantic import BaseModel
import logging
from sqlalchemy import text
# Indexer routes depend on services provided by api/indexer_app.py via this registry.
from ..service_registry import get_incremental_service, get_bulk_indexing_service, get_es_client
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/indexer", tags=["indexer"])
class ReindexRequest(BaseModel):
"""全量重建索引请求"""
tenant_id: str
batch_size: int = 500
class IndexSpusRequest(BaseModel):
"""增量索引请求(按SPU列表索引)"""
tenant_id: str
spu_ids: List[str]
delete_spu_ids: List[str] = [] # 显式指定要删除的SPU ID列表(可选)
class GetDocumentsRequest(BaseModel):
"""查询文档请求(不写入ES)"""
tenant_id: str
spu_ids: List[str]
@router.post("/reindex")
async def reindex_all(request: ReindexRequest):
"""
全量重建索引接口
将指定租户的所有SPU数据重新索引到ES。
注意:此接口不会删除旧索引,只会更新或创建索引。如需重建索引(删除后重建),请在服务器上执行 scripts/recreate_index.py 脚本。
注意:全量索引是长时间运行的操作,会在线程池中执行,不会阻塞其他请求。
全量索引和增量索引可以并行执行。
"""
try:
service = get_bulk_indexing_service()
if service is None:
raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
# 显式将同步阻塞操作放到线程池执行,确保不阻塞事件循环
# 这样全量索引和增量索引可以并行执行
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, # 使用默认线程池
lambda: service.bulk_index(
tenant_id=request.tenant_id,
recreate_index=False,
batch_size=request.batch_size
)
)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in reindex for tenant_id={request.tenant_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/index")
async def index_spus(request: IndexSpusRequest):
"""
增量索引接口
根据指定的SPU ID列表,将数据索引到ES。用于增量更新指定商品。
支持两种删除方式:
1. **自动检测删除**:如果SPU在数据库中被标记为deleted=1,自动从ES中删除对应文档
2. **显式删除**:通过delete_spu_ids参数显式指定要删除的SPU(无论数据库状态如何)
删除策略说明:
- 数据库是唯一真实来源(Single Source of Truth)
- 自动检测:查询数据库时发现deleted=1,自动从ES删除
- 显式删除:调用方明确知道哪些SPU要删除,直接删除(适用于批量删除场景)
响应格式:
- spu_ids: spu_ids对应的响应列表,每个元素包含spu_id和status(indexed/deleted/failed)
- delete_spu_ids: delete_spu_ids对应的响应列表,每个元素包含spu_id和status(deleted/not_found/failed)
- failed状态的元素会包含msg字段说明失败原因
- 最后给出总体统计:total, success_count, failed_count等
注意:增量索引在线程池中执行,可以与全量索引并行执行。
"""
try:
# 验证请求参数
if not request.spu_ids and not request.delete_spu_ids:
raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")
if request.spu_ids and len(request.spu_ids) > 100:
raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing")
if request.delete_spu_ids and len(request.delete_spu_ids) > 100:
raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion")
service = get_incremental_service()
if service is None:
raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
es_client = get_es_client()
if es_client is None:
raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")
# 显式将同步阻塞操作放到线程池执行,确保不阻塞事件循环
# 这样全量索引和增量索引可以并行执行
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, # 使用默认线程池
lambda: service.index_spus_to_es(
es_client=es_client,
tenant_id=request.tenant_id,
spu_ids=request.spu_ids if request.spu_ids else [],
delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None
)
)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error indexing SPUs for tenant_id={request.tenant_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/documents")
async def get_documents(request: GetDocumentsRequest):
"""
查询文档接口
根据SPU ID列表获取ES文档数据(不写入ES)。用于查看、调试或验证SPU数据。
"""
try:
if not request.spu_ids:
raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
if len(request.spu_ids) > 100:
raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
service = get_incremental_service()
if service is None:
raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
success_list, failed_list = [], []
for spu_id in request.spu_ids:
try:
doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
(success_list if doc else failed_list).append({
"spu_id": spu_id,
"document": doc
} if doc else {
"spu_id": spu_id,
"error": "SPU not found or deleted"
})
except Exception as e:
failed_list.append({"spu_id": spu_id, "error": str(e)})
return {
"success": success_list,
"failed": failed_list,
"total": len(request.spu_ids),
"success_count": len(success_list),
"failed_count": len(failed_list)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.get("/health")
async def indexer_health_check():
"""检查索引服务健康状态"""
try:
service = get_incremental_service()
if service is None:
return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}
try:
with service.db_engine.connect() as conn:
conn.execute(text("SELECT 1"))
db_status = "connected"
except Exception as e:
db_status = f"disconnected: {str(e)}"
return {
"status": "available",
"database": db_status,
"preloaded_data": {"category_mappings": len(service.category_id_to_name)}
}
except Exception as e:
logger.error(f"Error checking indexer health: {e}", exc_info=True)
return {"status": "error", "message": str(e)}