Blame view

indexer/bulk_indexing_service.py 7.85 KB
3c1f8031   tangwang   api/routes/indexe...
1
2
3
4
5
6
7
8
9
10
11
12
13
  """
  全量索引服务。
  
  提供全量索引功能,将指定租户的所有SPU数据导入到ES
  """
  
  import logging
  from typing import Dict, Any
  from sqlalchemy import Engine
  from utils.es_client import ESClient
  from indexer.spu_transformer import SPUTransformer
  from indexer.bulk_indexer import BulkIndexer
  from indexer.mapping_generator import load_mapping, delete_index_if_exists, DEFAULT_INDEX_NAME
c55c5e47   tangwang   feat: 新增增量索引接口并重构...
14
15
16
  from indexer.indexer_logger import (
      get_indexer_logger, log_index_request, log_index_result, log_bulk_index_batch
  )
3c1f8031   tangwang   api/routes/indexe...
17
18
  
  logger = logging.getLogger(__name__)

  # Indexer-specific logger; passed to log_index_request / log_index_result and
  # used directly for per-step records in BulkIndexingService.
  indexer_logger = get_indexer_logger()
3c1f8031   tangwang   api/routes/indexe...
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  
  
  class BulkIndexingService:
      """Bulk (full) indexing service: imports all SPU data of one tenant into Elasticsearch."""

      def __init__(self, db_engine: Engine, es_client: ESClient):
          """
          Initialize the bulk indexing service.

          Args:
              db_engine: SQLAlchemy database engine used to read SPU data.
              es_client: Elasticsearch client used for index management and bulk writes.
          """
          self.db_engine = db_engine
          self.es_client = es_client
          # Target index name; it comes from a constant and does not depend on tenant_id.
          self.index_name = DEFAULT_INDEX_NAME

      def _log_extra(self, tenant_id: str, operation: str, **fields: Any) -> Dict[str, Any]:
          """
          Build the ``extra`` context attached to indexer_logger records for one bulk step.

          Step-specific ``fields`` are inserted between ``operation`` and ``index_name``.
          """
          return {
              'index_type': 'bulk',
              'tenant_id': tenant_id,
              'operation': operation,
              **fields,
              'index_name': self.index_name,
          }

      def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]:
          """
          Run a full index of all SPU documents for one tenant.

          Steps: load the mapping, prepare the index (optionally deleting it first,
          creating it when missing), transform the tenant's SPU data into documents,
          and bulk write them keyed by ``spu_id``. The request and its outcome are
          recorded on the indexer logger.

          Args:
              tenant_id: Tenant whose SPU data is indexed.
              recreate_index: Delete the existing index before (re)creating it.
                  NOTE(review): ``self.index_name`` does not depend on tenant_id, so
                  this presumably drops other tenants' documents too — confirm intended.
              batch_size: Documents per bulk batch, passed to BulkIndexer.

          Returns:
              Summary dict with ``success`` (True only if no document failed),
              ``total``, ``indexed``, ``failed``, ``elapsed_time`` (seconds),
              ``index_name`` and ``tenant_id``. When there is nothing to index,
              ``success`` is True, all counts are 0 and ``message`` is included.

          Raises:
              RuntimeError: The existing index could not be deleted, or the index
                  could not be created.
              Exception: Any error from mapping loading, transformation or bulk
                  indexing is logged and re-raised unchanged.
          """
          import time

          # perf_counter is monotonic, so elapsed times are immune to wall-clock jumps.
          start_time = time.perf_counter()

          # Record the start of the request.
          log_index_request(
              indexer_logger,
              index_type='bulk',
              tenant_id=tenant_id,
              request_params={
                  'recreate_index': recreate_index,
                  'batch_size': batch_size,
                  'index_name': self.index_name
              }
          )

          try:
              # 1. Load the index mapping.
              logger.info(f"[BulkIndexing] Loading mapping for tenant_id={tenant_id}")
              indexer_logger.info(
                  "Loading mapping for bulk index",
                  extra=self._log_extra(tenant_id, 'load_mapping')
              )
              mapping = load_mapping()

              # 2. Prepare the index: optionally delete it, then create it if missing.
              if recreate_index:
                  logger.info(f"[BulkIndexing] Recreating index: {self.index_name}")
                  indexer_logger.info(
                      f"Recreating index: {self.index_name}",
                      extra=self._log_extra(tenant_id, 'recreate_index')
                  )
                  if self.es_client.index_exists(self.index_name):
                      if delete_index_if_exists(self.es_client, self.index_name):
                          logger.info(f"[BulkIndexing] Deleted existing index: {self.index_name}")
                      else:
                          raise RuntimeError(f"Failed to delete index: {self.index_name}")

              if not self.es_client.index_exists(self.index_name):
                  logger.info(f"[BulkIndexing] Creating index: {self.index_name}")
                  indexer_logger.info(
                      f"Creating index: {self.index_name}",
                      extra=self._log_extra(tenant_id, 'create_index')
                  )
                  if not self.es_client.create_index(self.index_name, mapping):
                      raise RuntimeError(f"Failed to create index: {self.index_name}")
                  logger.info(f"[BulkIndexing] Created index: {self.index_name}")
              else:
                  logger.info(f"[BulkIndexing] Index already exists: {self.index_name}")

              # 3. Transform the tenant's SPU data into ES documents.
              logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}")
              indexer_logger.info(
                  "Transforming SPU data",
                  extra=self._log_extra(tenant_id, 'transform_data')
              )
              transformer = SPUTransformer(self.db_engine, tenant_id)
              documents = transformer.transform_batch()

              if not documents:
                  # Nothing to index is not an error: report an empty, successful run.
                  logger.warning(f"[BulkIndexing] No documents to index for tenant_id={tenant_id}")
                  elapsed_time = time.perf_counter() - start_time
                  log_index_result(
                      indexer_logger,
                      index_type='bulk',
                      tenant_id=tenant_id,
                      total_count=0,
                      success_count=0,
                      failed_count=0,
                      elapsed_time=elapsed_time,
                      index_name=self.index_name
                  )
                  return {
                      "success": True,
                      "total": 0,
                      "indexed": 0,
                      "failed": 0,
                      "elapsed_time": elapsed_time,
                      "message": "No documents to index",
                      "index_name": self.index_name,
                      "tenant_id": tenant_id
                  }

              total = len(documents)
              logger.info(f"[BulkIndexing] Transformed {total} documents")
              indexer_logger.info(
                  f"Transformed {total} documents",
                  extra=self._log_extra(tenant_id, 'transform_complete', total_count=total)
              )

              # 4. Bulk import.
              logger.info(f"[BulkIndexing] Indexing {total} documents (batch_size={batch_size})")
              indexer = BulkIndexer(self.es_client, self.index_name, batch_size=batch_size, max_retries=3)
              results = indexer.index_documents(
                  documents,
                  # NOTE(review): the index is shared across tenants; confirm spu_id
                  # is unique across tenants, otherwise documents may overwrite each other.
                  id_field="spu_id",
                  show_progress=False  # no progress output when called from the API
              )

              elapsed_time = time.perf_counter() - start_time

              # Record the final result.
              log_index_result(
                  indexer_logger,
                  index_type='bulk',
                  tenant_id=tenant_id,
                  total_count=total,
                  success_count=results['success'],
                  failed_count=results['failed'],
                  elapsed_time=elapsed_time,
                  index_name=self.index_name,
                  errors=results.get('errors', [])
              )

              logger.info(
                  f"[BulkIndexing] Completed for tenant_id={tenant_id}: "
                  f"indexed={results['success']}, failed={results['failed']}, "
                  f"elapsed={elapsed_time:.2f}s"
              )

              return {
                  "success": results['failed'] == 0,
                  "total": total,
                  "indexed": results['success'],
                  "failed": results['failed'],
                  "elapsed_time": elapsed_time,
                  "index_name": self.index_name,
                  "tenant_id": tenant_id
              }

          except Exception as e:
              # Log on both loggers with the traceback, then propagate to the caller.
              elapsed_time = time.perf_counter() - start_time
              error_msg = str(e)
              logger.error(f"[BulkIndexing] Failed for tenant_id={tenant_id}: {e}", exc_info=True)
              indexer_logger.error(
                  f"Bulk index failed: {error_msg}",
                  extra=self._log_extra(
                      tenant_id,
                      'request_failed',
                      error=error_msg,
                      elapsed_time=elapsed_time
                  ),
                  exc_info=True
              )
              raise