indexer.py
4.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
增量索引API路由。
提供单个SPU数据获取接口,用于增量更新ES索引。
"""
from fastapi import APIRouter, HTTPException, Query, Request
from typing import Optional
import logging
from ..models import ErrorResponse
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/indexer", tags=["indexer"])
@router.get("/spu/{spu_id}")
async def get_spu_document(
spu_id: str,
tenant_id: str = Query(..., description="租户ID"),
request: Request = None
):
"""
获取单个SPU的ES文档数据(用于增量索引更新)。
功能说明:
- 根据 tenant_id 和 spu_id 查询MySQL数据库
- 返回该SPU的完整ES文档数据(JSON格式)
- 外部Java程序可以调用此接口获取数据后推送到ES
参数:
- spu_id: SPU ID(路径参数)
- tenant_id: 租户ID(查询参数,必需)
返回:
- 成功:返回ES文档JSON对象
- SPU不存在或已删除:返回404
- 其他错误:返回500
示例请求:
```
GET /indexer/spu/123?tenant_id=1
```
示例响应:
```json
{
"tenant_id": "1",
"spu_id": "123",
"title_zh": "商品标题",
"brief_zh": "商品简介",
"description_zh": "商品描述",
"vendor_zh": "供应商",
"tags": ["标签1", "标签2"],
"category_path_zh": "类目1/类目2/类目3",
"category1_name": "类目1",
"category2_name": "类目2",
"category3_name": "类目3",
"category_id": "100",
"category_level": 3,
"min_price": 99.99,
"max_price": 199.99,
"compare_at_price": 299.99,
"sales": 1000,
"total_inventory": 500,
"skus": [...],
"specifications": [...],
...
}
```
"""
try:
from ..app import get_incremental_service
# 获取增量服务实例
service = get_incremental_service()
if service is None:
raise HTTPException(
status_code=503,
detail="Incremental indexer service is not initialized. Please check database connection."
)
# 获取SPU文档
doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
if doc is None:
raise HTTPException(
status_code=404,
detail=f"SPU {spu_id} not found for tenant_id={tenant_id} or has been deleted"
)
return doc
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.get("/health")
async def indexer_health_check():
"""
检查增量索引服务健康状态。
返回:
- 服务是否可用
- 数据库连接状态
- 预加载数据状态
"""
try:
from ..app import get_incremental_service
service = get_incremental_service()
if service is None:
return {
"status": "unavailable",
"message": "Incremental indexer service is not initialized",
"database": "unknown",
"preloaded_data": {
"category_mappings": 0
}
}
# 检查数据库连接
try:
from sqlalchemy import text
with service.db_engine.connect() as conn:
conn.execute(text("SELECT 1"))
db_status = "connected"
except Exception as e:
db_status = f"disconnected: {str(e)}"
return {
"status": "available",
"database": db_status,
"preloaded_data": {
"category_mappings": len(service.category_id_to_name)
}
}
except Exception as e:
logger.error(f"Error checking indexer health: {e}", exc_info=True)
return {
"status": "error",
"message": str(e)
}