indexer.py
5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
索引API路由。
提供全量和增量索引接口,供外部Java程序调用。
"""
from fastapi import APIRouter, HTTPException
from typing import List
from pydantic import BaseModel
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the endpoints defined in this module; all paths are served under
# the "/indexer" prefix and tagged "indexer".
router = APIRouter(prefix="/indexer", tags=["indexer"])
class ReindexRequest(BaseModel):
    """Request body for a full index rebuild.

    All three fields are forwarded unchanged to the bulk indexing service by
    the ``/reindex`` endpoint.
    """

    # Tenant whose data is rebuilt (required).
    tenant_id: str
    # Forwarded as ``recreate_index``; off unless the caller asks for it.
    recreate_index: bool = False
    # Forwarded as ``batch_size``; defaults to 500.
    batch_size: int = 500
class IndexSpusRequest(BaseModel):
    """Request body for an incremental index run over an explicit SPU list."""

    # Tenant that owns the SPUs (required).
    tenant_id: str
    # IDs of the SPUs to index; the ``/index`` endpoint rejects an empty list
    # and lists longer than 100 entries.
    spu_ids: List[str]
class GetDocumentsRequest(BaseModel):
    """Request body for fetching SPU documents without writing them to ES."""

    # Tenant that owns the SPUs (required).
    tenant_id: str
    # IDs of the SPUs to look up; the ``/documents`` endpoint rejects an empty
    # list and lists longer than 100 entries.
    spu_ids: List[str]
@router.post("/reindex")
async def reindex_all(request: ReindexRequest):
    """
    Full reindex endpoint.

    Re-indexes all SPU data of the given tenant into ES by delegating to the
    bulk indexing service. Supports dropping the old index and rebuilding it.

    Args:
        request: Tenant id plus the ``recreate_index`` and ``batch_size``
            options, forwarded unchanged to ``service.bulk_index``.

    Returns:
        Whatever ``service.bulk_index`` returns.

    Raises:
        HTTPException: 503 when the bulk indexing service is not initialized,
            500 for any other failure.
    """
    try:
        # Imported at call time, like the other handlers in this module
        # (presumably to avoid a circular import with the app module).
        from fastapi.concurrency import run_in_threadpool
        from ..app import get_bulk_indexing_service

        service = get_bulk_indexing_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")

        # bulk_index is a plain synchronous call; running it directly inside
        # this coroutine would block the event loop (and every other request)
        # for the whole reindex, so hand it to the worker thread pool.
        # NOTE(review): assumes bulk_index is safe to run from a worker thread
        # -- confirm against the service implementation.
        return await run_in_threadpool(
            service.bulk_index,
            tenant_id=request.tenant_id,
            recreate_index=request.recreate_index,
            batch_size=request.batch_size,
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 503 above) pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error in reindex for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
@router.post("/index")
async def index_spus(request: IndexSpusRequest):
    """
    Incremental index endpoint.

    Indexes the data of the given SPU IDs into ES. Intended for incremental
    updates of specific products.

    Args:
        request: Tenant id and the list of SPU IDs to index (1 to 100 entries).

    Returns:
        Whatever ``service.index_spus_to_es`` returns.

    Raises:
        HTTPException: 400 when ``spu_ids`` is empty or has more than 100
            entries, 503 when the incremental service or the Elasticsearch
            client is not initialized, 500 for any other failure.
    """
    try:
        # Imported at call time, like the other handlers in this module
        # (presumably to avoid a circular import with the app module).
        from fastapi.concurrency import run_in_threadpool
        from ..app import get_incremental_service, get_es_client

        if not request.spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
        if len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")

        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        es_client = get_es_client()
        if es_client is None:
            raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")

        # index_spus_to_es is a plain synchronous call; run it in the worker
        # thread pool so it does not block the event loop while it works.
        # NOTE(review): assumes the service and ES client are safe to use from
        # a worker thread -- confirm against their implementations.
        return await run_in_threadpool(
            service.index_spus_to_es,
            es_client=es_client,
            tenant_id=request.tenant_id,
            spu_ids=request.spu_ids,
        )
    except HTTPException:
        # Validation / availability errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error indexing SPUs for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
def _collect_spu_documents(service, tenant_id, spu_ids):
    """Fetch one document per SPU id and split the results into successes and failures.

    Synchronous on purpose so the caller can run it in a worker thread.

    Returns:
        A ``(success_list, failed_list)`` tuple. Success entries carry
        ``spu_id`` and ``document``; failure entries carry ``spu_id`` and
        ``error``.
    """
    success_list, failed_list = [], []
    for spu_id in spu_ids:
        try:
            doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
        except Exception as e:
            # A failure on one SPU must not abort the rest of the batch, but it
            # is logged so it does not live only in the HTTP response.
            logger.warning(
                f"Error getting document for tenant_id={tenant_id}, spu_id={spu_id}: {e}",
                exc_info=True,
            )
            failed_list.append({"spu_id": spu_id, "error": str(e)})
            continue

        if doc:
            success_list.append({"spu_id": spu_id, "document": doc})
        else:
            # A falsy result is reported as "not found or deleted".
            failed_list.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
    return success_list, failed_list


@router.post("/documents")
async def get_documents(request: GetDocumentsRequest):
    """
    Document query endpoint.

    Fetches the ES document data for the given SPU IDs without writing to ES.
    Intended for inspecting, debugging or verifying SPU data.

    Args:
        request: Tenant id and the list of SPU IDs to look up (1 to 100 entries).

    Returns:
        A dict with ``success`` and ``failed`` entry lists, ``total`` (number
        of requested IDs), ``success_count`` and ``failed_count``.

    Raises:
        HTTPException: 400 when ``spu_ids`` is empty or has more than 100
            entries, 503 when the incremental service is not initialized,
            500 for any other failure.
    """
    try:
        # Imported at call time, like the other handlers in this module
        # (presumably to avoid a circular import with the app module).
        from fastapi.concurrency import run_in_threadpool
        from ..app import get_incremental_service

        if not request.spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
        if len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")

        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        # The per-SPU lookups are synchronous calls; run the whole batch in the
        # worker thread pool so the event loop stays responsive.
        # NOTE(review): assumes get_spu_document is safe to call from a worker
        # thread -- confirm against the service implementation.
        success_list, failed_list = await run_in_threadpool(
            _collect_spu_documents, service, request.tenant_id, request.spu_ids
        )

        return {
            "success": success_list,
            "failed": failed_list,
            "total": len(request.spu_ids),
            "success_count": len(success_list),
            "failed_count": len(failed_list),
        }
    except HTTPException:
        # Validation / availability errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error getting documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
@router.get("/health")
async def indexer_health_check():
    """Report the health of the indexing service.

    Returns:
        A dict with ``status`` ("available", "unavailable" or "error").
        When the incremental service exists, the dict also carries the
        ``database`` probe result and the number of preloaded category
        mappings. On an unexpected failure it carries ``message`` instead.
        This handler never raises.
    """
    try:
        from ..app import get_incremental_service
        from sqlalchemy import text

        indexer = get_incremental_service()
        if indexer is None:
            return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}

        # Probe the database with a trivial query; a failure here is reported
        # in the payload rather than treated as an endpoint error.
        try:
            with indexer.db_engine.connect() as connection:
                connection.execute(text("SELECT 1"))
        except Exception as db_error:
            database_state = f"disconnected: {str(db_error)}"
        else:
            database_state = "connected"

        mapping_count = len(indexer.category_id_to_name)
        return {
            "status": "available",
            "database": database_state,
            "preloaded_data": {"category_mappings": mapping_count},
        }
    except Exception as exc:
        # Logs at ERROR level with the traceback attached.
        logger.exception(f"Error checking indexer health: {exc}")
        return {"status": "error", "message": str(exc)}