Blame view

api/routes/indexer.py 7.96 KB
0064e946   tangwang   feat: 增量索引服务、租户配置...
1
  """
3c1f8031   tangwang   api/routes/indexe...
2
  索引API路由。
0064e946   tangwang   feat: 增量索引服务、租户配置...
3
  
3c1f8031   tangwang   api/routes/indexe...
4
  提供全量和增量索引接口,供外部Java程序调用。
0064e946   tangwang   feat: 增量索引服务、租户配置...
5
6
  """
  
791a7909   tangwang   支持并发的增量和全量请求:
7
  import asyncio
3c1f8031   tangwang   api/routes/indexe...
8
9
10
  from fastapi import APIRouter, HTTPException
  from typing import List
  from pydantic import BaseModel
0064e946   tangwang   feat: 增量索引服务、租户配置...
11
  import logging
bb9c626c   tangwang   搜索服务(6002)不再初始化/挂...
12
13
14
15
  from sqlalchemy import text
  
  # Indexer routes depend on services provided by api/indexer_app.py via this registry.
  from ..service_registry import get_incremental_service, get_bulk_indexing_service, get_es_client
0064e946   tangwang   feat: 增量索引服务、租户配置...
16
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
17
18
19
20
21
  logger = logging.getLogger(__name__)
  
  router = APIRouter(prefix="/indexer", tags=["indexer"])
  
  
c55c5e47   tangwang   feat: 新增增量索引接口并重构...
22
23
  class ReindexRequest(BaseModel):
      """Request body of the full re-index endpoint (POST /indexer/reindex)."""

      # Tenant whose SPUs are re-indexed.
      tenant_id: str
      # Passed through to the bulk indexing service as its batch size.
      batch_size: int = 500
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
27
  
c55c5e47   tangwang   feat: 新增增量索引接口并重构...
28
29
  class IndexSpusRequest(BaseModel):
      """Request body of the incremental index endpoint (index by SPU ID list)."""

      tenant_id: str
      # SPU IDs to index.
      spu_ids: List[str]
      # Optional: SPU IDs to delete explicitly.
      # The [] literal is safe here: pydantic copies field defaults per
      # instance, so requests never share one list object.
      delete_spu_ids: List[str] = []
0064e946   tangwang   feat: 增量索引服务、租户配置...
33
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
34
  
c55c5e47   tangwang   feat: 新增增量索引接口并重构...
35
36
37
38
39
40
41
42
43
44
45
  class GetDocumentsRequest(BaseModel):
      """Request body of the document lookup endpoint (nothing is written to ES)."""

      tenant_id: str
      # SPU IDs whose documents should be returned.
      spu_ids: List[str]
  
  
  @router.post("/reindex")
  async def reindex_all(request: ReindexRequest):
      """
      Full re-index endpoint.

      Re-indexes all SPU data of the given tenant into ES.

      Note: this endpoint does not delete the old index; it only updates or
      creates documents. To rebuild the index (delete, then recreate), run
      scripts/recreate_index.py on the server.

      Note: a full re-index is a long-running operation. It is executed in the
      default thread pool so it does not block other requests; full and
      incremental indexing can run in parallel.

      Args:
          request: Tenant ID and batch size for the bulk indexing run.

      Returns:
          Whatever ``service.bulk_index`` returns, passed through unchanged.

      Raises:
          HTTPException: 503 if the bulk indexing service is not initialized,
              500 if the indexing call raises.
      """
      try:
          service = get_bulk_indexing_service()
          if service is None:
              raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")

          # NOTE(review): batch_size is forwarded unvalidated; consider rejecting
          # values <= 0 once it is confirmed what bulk_index expects.

          # Run the synchronous, blocking bulk index in the default thread pool so
          # the event loop stays free. get_running_loop() is the preferred way to
          # obtain the loop from inside a coroutine.
          loop = asyncio.get_running_loop()
          result = await loop.run_in_executor(
              None,  # default thread pool
              lambda: service.bulk_index(
                  tenant_id=request.tenant_id,
                  recreate_index=False,
                  batch_size=request.batch_size,
              ),
          )
          return result
      except HTTPException:
          # Deliberate HTTP errors raised above must reach the client unchanged.
          raise
      except Exception as e:
          logger.error(f"Error in reindex for tenant_id={request.tenant_id}: {e}", exc_info=True)
          raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
  
  
  @router.post("/index")
  async def index_spus(request: IndexSpusRequest):
      """
      Incremental index endpoint.

      Indexes the data of the given SPU IDs into ES. Used to incrementally
      update specific products.

      Two ways of deleting are supported:
      1. **Automatic deletion**: if an SPU is marked deleted=1 in the database,
         its document is removed from ES automatically.
      2. **Explicit deletion**: SPUs listed in ``delete_spu_ids`` are deleted
         regardless of their database state.

      Deletion policy:
      - The database is the single source of truth.
      - Automatic: an SPU found with deleted=1 while querying the database is
        removed from ES.
      - Explicit: the caller already knows which SPUs must go and deletes them
        directly (suited to batch deletions).

      Response format:
      - spu_ids: one entry per requested spu_id, each with ``spu_id`` and
        ``status`` (indexed/deleted/failed).
      - delete_spu_ids: one entry per requested delete_spu_id, each with
        ``spu_id`` and ``status`` (deleted/not_found/failed).
      - Entries with status ``failed`` carry a ``msg`` field with the reason.
      - Overall statistics: total, success_count, failed_count.

      Note: incremental indexing runs in the thread pool and can run in parallel
      with a full re-index.

      Raises:
          HTTPException: 400 if both ID lists are empty or either has more than
              100 entries, 503 if the incremental service or the ES client is
              not initialized, 500 if the indexing call raises.
      """
      try:
          # Validate request parameters.
          if not request.spu_ids and not request.delete_spu_ids:
              raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")

          if request.spu_ids and len(request.spu_ids) > 100:
              raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing")

          if request.delete_spu_ids and len(request.delete_spu_ids) > 100:
              raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion")

          service = get_incremental_service()
          if service is None:
              raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

          es_client = get_es_client()
          if es_client is None:
              raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")

          # Run the synchronous, blocking indexing call in the default thread pool
          # so the event loop stays free. get_running_loop() is the preferred way
          # to obtain the loop from inside a coroutine.
          loop = asyncio.get_running_loop()
          result = await loop.run_in_executor(
              None,  # default thread pool
              lambda: service.index_spus_to_es(
                  es_client=es_client,
                  tenant_id=request.tenant_id,
                  # An empty index list is passed as [], an empty delete list as None.
                  spu_ids=request.spu_ids or [],
                  delete_spu_ids=request.delete_spu_ids or None,
              ),
          )
          return result
      except HTTPException:
          # Deliberate HTTP errors raised above must reach the client unchanged.
          raise
      except Exception as e:
          logger.error(f"Error indexing SPUs for tenant_id={request.tenant_id}: {e}", exc_info=True)
          raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
0064e946   tangwang   feat: 增量索引服务、租户配置...
140
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
141
  
c55c5e47   tangwang   feat: 新增增量索引接口并重构...
142
143
144
145
146
147
148
  @router.post("/documents")
  async def get_documents(request: GetDocumentsRequest):
      """
      Document lookup endpoint.

      Builds the ES document for each requested SPU ID without writing anything
      to ES. Intended for inspecting, debugging or verifying SPU data.

      Returns:
          A dict with ``success`` (entries holding ``spu_id`` and ``document``),
          ``failed`` (entries holding ``spu_id`` and ``error``), ``total``,
          ``success_count`` and ``failed_count``.

      Raises:
          HTTPException: 400 if ``spu_ids`` is empty or has more than 100
              entries, 503 if the incremental service is not initialized,
              500 on any other unexpected error.
      """
      try:
          if not request.spu_ids:
              raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
          if len(request.spu_ids) > 100:
              raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")

          service = get_incremental_service()
          if service is None:
              raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

          def _collect_documents():
              """Fetch every requested document; one failure never aborts the batch."""
              found, missing = [], []
              for spu_id in request.spu_ids:
                  try:
                      doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
                  except Exception as e:
                      missing.append({"spu_id": spu_id, "error": str(e)})
                      continue
                  if doc:
                      found.append({"spu_id": spu_id, "document": doc})
                  else:
                      missing.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
              return found, missing

          # get_spu_document is a synchronous call (presumably hitting the
          # database - confirm). Run the whole loop in the default thread pool,
          # like the other indexer endpoints, so up to 100 sequential lookups do
          # not stall the event loop.
          loop = asyncio.get_running_loop()
          success_list, failed_list = await loop.run_in_executor(None, _collect_documents)

          return {
              "success": success_list,
              "failed": failed_list,
              "total": len(request.spu_ids),
              "success_count": len(success_list),
              "failed_count": len(failed_list),
          }
      except HTTPException:
          # Deliberate HTTP errors raised above must reach the client unchanged.
          raise
      except Exception as e:
          logger.error(f"Error getting documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
          raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
  
  
  @router.get("/health")
  async def indexer_health_check():
      """
      Report the health of the indexer service.

      Never raises: every failure is folded into the returned dict.

      Returns:
          ``status`` is "unavailable" when the incremental service is not
          registered, "available" otherwise (with the result of a ``SELECT 1``
          probe under ``database`` and the number of preloaded category
          mappings), or "error" with a ``message`` if the check itself fails.
      """
      try:
          indexer = get_incremental_service()
          if indexer is None:
              return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}

          # Probe the database with a trivial query; a failure is reported in the
          # payload instead of failing the whole health check.
          try:
              with indexer.db_engine.connect() as connection:
                  connection.execute(text("SELECT 1"))
          except Exception as db_error:
              database_state = f"disconnected: {str(db_error)}"
          else:
              database_state = "connected"

          mapping_count = len(indexer.category_id_to_name)
          return {
              "status": "available",
              "database": database_state,
              "preloaded_data": {"category_mappings": mapping_count},
          }
      except Exception as health_error:
          logger.error(f"Error checking indexer health: {health_error}", exc_info=True)
          return {"status": "error", "message": str(health_error)}
0064e946   tangwang   feat: 增量索引服务、租户配置...