# indexer.py
"""
索引API路由。
提供全量和增量索引接口,供外部Java程序调用。
"""
from fastapi import APIRouter, HTTPException
from typing import List
from pydantic import BaseModel
import logging

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/indexer", tags=["indexer"])


class ReindexRequest(BaseModel):
    """Full index rebuild request."""
tenant_id: str
recreate_index: bool = False
batch_size: int = 500


class IndexSpusRequest(BaseModel):
    """Incremental indexing request (indexes by SPU ID list)."""
tenant_id: str
spu_ids: List[str]
    delete_spu_ids: List[str] = []  # explicitly specified list of SPU IDs to delete (optional)


class GetDocumentsRequest(BaseModel):
    """Document lookup request (does not write to ES)."""
tenant_id: str
spu_ids: List[str]


@router.post("/reindex")
async def reindex_all(request: ReindexRequest):
    """
    Full reindex endpoint.

    Re-indexes all SPU data for the given tenant into ES. Supports deleting
    the old index and rebuilding it.
    """
try:
from ..app import get_bulk_indexing_service
service = get_bulk_indexing_service()
if service is None:
raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
return service.bulk_index(
tenant_id=request.tenant_id,
recreate_index=request.recreate_index,
batch_size=request.batch_size
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in reindex for tenant_id={request.tenant_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/index")
async def index_spus(request: IndexSpusRequest):
    """
    Incremental indexing endpoint.

    Indexes the SPUs in the given ID list into ES; used to incrementally
    update specific products.

    Two deletion modes are supported:
    1. **Auto-detected deletion**: if an SPU is marked deleted=1 in the
       database, its document is automatically removed from ES.
    2. **Explicit deletion**: SPUs listed in delete_spu_ids are removed,
       regardless of their database state.

    Deletion policy:
    - The database is the single source of truth.
    - Auto-detect: a database lookup that finds deleted=1 triggers removal from ES.
    - Explicit: the caller knows exactly which SPUs to delete and removes them
      directly (suited to bulk-deletion scenarios).
    """
try:
from ..app import get_incremental_service, get_es_client
        # Validate request parameters
if not request.spu_ids and not request.delete_spu_ids:
raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")
if request.spu_ids and len(request.spu_ids) > 100:
raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing")
if request.delete_spu_ids and len(request.delete_spu_ids) > 100:
raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion")
service = get_incremental_service()
if service is None:
raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
es_client = get_es_client()
if es_client is None:
raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")
        # Call the batch indexing method (supports the explicit-delete parameter)
result = service.index_spus_to_es(
es_client=es_client,
tenant_id=request.tenant_id,
spu_ids=request.spu_ids if request.spu_ids else [],
delete_spu_ids=request.delete_spu_ids if request.delete_spu_ids else None
)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error indexing SPUs for tenant_id={request.tenant_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/documents")
async def get_documents(request: GetDocumentsRequest):
    """
    Document lookup endpoint.

    Builds the ES document for each SPU in the given ID list without writing
    to ES. Useful for inspecting, debugging, or validating SPU data.
    """
try:
from ..app import get_incremental_service
if not request.spu_ids:
raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
if len(request.spu_ids) > 100:
raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
service = get_incremental_service()
if service is None:
raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
        success_list, failed_list = [], []
        for spu_id in request.spu_ids:
            try:
                doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
                if doc:
                    success_list.append({"spu_id": spu_id, "document": doc})
                else:
                    failed_list.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
except Exception as e:
failed_list.append({"spu_id": spu_id, "error": str(e)})
return {
"success": success_list,
"failed": failed_list,
"total": len(request.spu_ids),
"success_count": len(success_list),
"failed_count": len(failed_list)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get("/health")
async def indexer_health_check():
    """Check the health status of the indexer service."""
try:
from ..app import get_incremental_service
from sqlalchemy import text
service = get_incremental_service()
if service is None:
return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}
try:
with service.db_engine.connect() as conn:
conn.execute(text("SELECT 1"))
db_status = "connected"
except Exception as e:
db_status = f"disconnected: {str(e)}"
return {
"status": "available",
"database": db_status,
"preloaded_data": {"category_mappings": len(service.category_id_to_name)}
}
except Exception as e:
logger.error(f"Error checking indexer health: {e}", exc_info=True)
return {"status": "error", "message": str(e)}
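

# Minimal client-side sketch of how an external caller might drive these
# endpoints over HTTP. The base URL, tenant ID, and SPU IDs below are
# assumptions for illustration, not part of this module's contract.
if __name__ == "__main__":  # pragma: no cover - illustrative usage only
    import requests

    base = "http://localhost:8000/indexer"  # assumed host/port of the running app

    # Check service health before indexing.
    print(requests.get(f"{base}/health", timeout=10).json())

    # Incrementally index one SPU (hypothetical tenant and SPU IDs).
    resp = requests.post(
        f"{base}/index",
        json={"tenant_id": "t-001", "spu_ids": ["spu-1"], "delete_spu_ids": []},
        timeout=60,
    )
    resp.raise_for_status()
    print(resp.json())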