Blame view

api/routes/indexer.py 3.83 KB
0064e946   tangwang   feat: 增量索引服务、租户配置...
1
  """
3c1f8031   tangwang   api/routes/indexe...
2
  索引API路由。
0064e946   tangwang   feat: 增量索引服务、租户配置...
3
  
3c1f8031   tangwang   api/routes/indexe...
4
  提供全量和增量索引接口,供外部Java程序调用。
0064e946   tangwang   feat: 增量索引服务、租户配置...
5
6
  """
  
3c1f8031   tangwang   api/routes/indexe...
7
8
9
  from fastapi import APIRouter, HTTPException
  from typing import List
  from pydantic import BaseModel
0064e946   tangwang   feat: 增量索引服务、租户配置...
10
11
  import logging
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
12
13
14
15
16
  logger = logging.getLogger(__name__)
  
  router = APIRouter(prefix="/indexer", tags=["indexer"])
  
  
3c1f8031   tangwang   api/routes/indexe...
17
18
19
20
21
  class BulkIndexRequest(BaseModel):
      tenant_id: str
      recreate_index: bool = False
      batch_size: int = 500
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
22
  
3c1f8031   tangwang   api/routes/indexe...
23
24
25
  class BatchSpuRequest(BaseModel):
      tenant_id: str
      spu_ids: List[str]
0064e946   tangwang   feat: 增量索引服务、租户配置...
26
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
27
  
3c1f8031   tangwang   api/routes/indexe...
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
  @router.post("/bulk")
  async def bulk_index(request: BulkIndexRequest):
      """全量索引接口"""
      try:
          from ..app import get_bulk_indexing_service
          service = get_bulk_indexing_service()
          if service is None:
              raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
          return service.bulk_index(
              tenant_id=request.tenant_id,
              recreate_index=request.recreate_index,
              batch_size=request.batch_size
          )
      except HTTPException:
          raise
      except Exception as e:
          logger.error(f"Error in bulk indexing for tenant_id={request.tenant_id}: {e}", exc_info=True)
          raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
0064e946   tangwang   feat: 增量索引服务、租户配置...
46
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
47
  
3c1f8031   tangwang   api/routes/indexe...
48
49
50
  @router.post("/spus")
  async def get_spu_documents(request: BatchSpuRequest):
      """获取SPU文档接口(支持单个或批量)"""
0064e946   tangwang   feat: 增量索引服务、租户配置...
51
52
      try:
          from ..app import get_incremental_service
3c1f8031   tangwang   api/routes/indexe...
53
54
55
56
          if not request.spu_ids:
              raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
          if len(request.spu_ids) > 100:
              raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
0064e946   tangwang   feat: 增量索引服务、租户配置...
57
58
          service = get_incremental_service()
          if service is None:
3c1f8031   tangwang   api/routes/indexe...
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
              raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
          success_list, failed_list = [], []
          for spu_id in request.spu_ids:
              try:
                  doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
                  (success_list if doc else failed_list).append({
                      "spu_id": spu_id,
                      "document": doc
                  } if doc else {
                      "spu_id": spu_id,
                      "error": "SPU not found or deleted"
                  })
              except Exception as e:
                  failed_list.append({"spu_id": spu_id, "error": str(e)})
          return {
              "success": success_list,
              "failed": failed_list,
              "total": len(request.spu_ids),
              "success_count": len(success_list),
              "failed_count": len(failed_list)
          }
0064e946   tangwang   feat: 增量索引服务、租户配置...
80
81
82
      except HTTPException:
          raise
      except Exception as e:
3c1f8031   tangwang   api/routes/indexe...
83
          logger.error(f"Error getting SPU documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
0064e946   tangwang   feat: 增量索引服务、租户配置...
84
85
86
87
88
          raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
  
  
  @router.get("/health")
  async def indexer_health_check():
3c1f8031   tangwang   api/routes/indexe...
89
      """检查索引服务健康状态"""
0064e946   tangwang   feat: 增量索引服务、租户配置...
90
91
      try:
          from ..app import get_incremental_service
3c1f8031   tangwang   api/routes/indexe...
92
          from sqlalchemy import text
0064e946   tangwang   feat: 增量索引服务、租户配置...
93
94
          service = get_incremental_service()
          if service is None:
3c1f8031   tangwang   api/routes/indexe...
95
              return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}
0064e946   tangwang   feat: 增量索引服务、租户配置...
96
          try:
0064e946   tangwang   feat: 增量索引服务、租户配置...
97
98
99
100
101
              with service.db_engine.connect() as conn:
                  conn.execute(text("SELECT 1"))
              db_status = "connected"
          except Exception as e:
              db_status = f"disconnected: {str(e)}"
0064e946   tangwang   feat: 增量索引服务、租户配置...
102
103
104
          return {
              "status": "available",
              "database": db_status,
3c1f8031   tangwang   api/routes/indexe...
105
              "preloaded_data": {"category_mappings": len(service.category_id_to_name)}
0064e946   tangwang   feat: 增量索引服务、租户配置...
106
          }
0064e946   tangwang   feat: 增量索引服务、租户配置...
107
108
      except Exception as e:
          logger.error(f"Error checking indexer health: {e}", exc_info=True)
3c1f8031   tangwang   api/routes/indexe...
109
          return {"status": "error", "message": str(e)}
0064e946   tangwang   feat: 增量索引服务、租户配置...