# indexer.py
"""
索引API路由。
提供全量和增量索引接口,供外部Java程序调用。
"""
from fastapi import APIRouter, HTTPException
from typing import List
from pydantic import BaseModel
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/indexer", tags=["indexer"])
class BulkIndexRequest(BaseModel):
tenant_id: str
recreate_index: bool = False
batch_size: int = 500
class BatchSpuRequest(BaseModel):
tenant_id: str
spu_ids: List[str]
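
# Illustrative request payloads for the two POST endpoints below. The field
# names and defaults come from the models above; the tenant and SPU values are
# made up purely for documentation:
#
#   POST /indexer/bulk
#   {"tenant_id": "t-001", "recreate_index": false, "batch_size": 500}
#
#   POST /indexer/spus
#   {"tenant_id": "t-001", "spu_ids": ["spu-123", "spu-456"]}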
@router.post("/bulk")
async def bulk_index(request: BulkIndexRequest):
"""全量索引接口"""
try:
from ..app import get_bulk_indexing_service
service = get_bulk_indexing_service()
if service is None:
raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
return service.bulk_index(
tenant_id=request.tenant_id,
recreate_index=request.recreate_index,
batch_size=request.batch_size
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in bulk indexing for tenant_id={request.tenant_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/spus")
async def get_spu_documents(request: BatchSpuRequest):
"""获取SPU文档接口(支持单个或批量)"""
try:
from ..app import get_incremental_service
if not request.spu_ids:
raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
if len(request.spu_ids) > 100:
raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
service = get_incremental_service()
if service is None:
raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
success_list, failed_list = [], []
for spu_id in request.spu_ids:
try:
doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
(success_list if doc else failed_list).append({
"spu_id": spu_id,
"document": doc
} if doc else {
"spu_id": spu_id,
"error": "SPU not found or deleted"
})
except Exception as e:
failed_list.append({"spu_id": spu_id, "error": str(e)})
return {
"success": success_list,
"failed": failed_list,
"total": len(request.spu_ids),
"success_count": len(success_list),
"failed_count": len(failed_list)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting SPU documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
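
# The /spus handler above always returns the same envelope. With one resolvable
# and one missing SPU ID, the response would look roughly like this (IDs and the
# document body are illustrative):
#
#   {
#     "success": [{"spu_id": "spu-123", "document": {...}}],
#     "failed": [{"spu_id": "spu-456", "error": "SPU not found or deleted"}],
#     "total": 2,
#     "success_count": 1,
#     "failed_count": 1
#   }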
@router.get("/health")
async def indexer_health_check():
"""检查索引服务健康状态"""
try:
from ..app import get_incremental_service
from sqlalchemy import text
service = get_incremental_service()
if service is None:
return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}
try:
with service.db_engine.connect() as conn:
conn.execute(text("SELECT 1"))
db_status = "connected"
except Exception as e:
db_status = f"disconnected: {str(e)}"
return {
"status": "available",
"database": db_status,
"preloaded_data": {"category_mappings": len(service.category_id_to_name)}
}
except Exception as e:
logger.error(f"Error checking indexer health: {e}", exc_info=True)
return {"status": "error", "message": str(e)}