3c1f8031
tangwang
api/routes/indexe...
|
1
2
3
4
5
6
7
8
9
10
11
12
|
"""
全量索引服务。
提供全量索引功能,将指定租户的所有SPU数据导入到ES。
"""
import logging
from typing import Dict, Any
from sqlalchemy import Engine
from utils.es_client import ESClient
from indexer.spu_transformer import SPUTransformer
from indexer.bulk_indexer import BulkIndexer
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
13
|
from indexer.mapping_generator import load_mapping, delete_index_if_exists, get_tenant_index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
14
15
16
|
from indexer.indexer_logger import (
get_indexer_logger, log_index_request, log_index_result, log_bulk_index_batch
)
|
3c1f8031
tangwang
api/routes/indexe...
|
17
18
|
logger = logging.getLogger(__name__)
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
19
20
|
# Indexer专用日志器
indexer_logger = get_indexer_logger()
|
3c1f8031
tangwang
api/routes/indexe...
|
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
class BulkIndexingService:
"""全量索引服务,提供批量导入功能。"""
def __init__(self, db_engine: Engine, es_client: ESClient):
"""
初始化全量索引服务。
Args:
db_engine: SQLAlchemy database engine
es_client: Elasticsearch client
"""
self.db_engine = db_engine
self.es_client = es_client
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
36
|
# Index name is now generated dynamically per tenant, no longer stored here
|
3c1f8031
tangwang
api/routes/indexe...
|
37
38
39
40
41
42
|
def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]:
"""执行全量索引"""
import time
start_time = time.time()
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
43
44
45
|
# Generate tenant-specific index name
index_name = get_tenant_index_name(tenant_id)
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
46
47
48
49
50
51
52
53
|
# 记录请求开始
log_index_request(
indexer_logger,
index_type='bulk',
tenant_id=tenant_id,
request_params={
'recreate_index': recreate_index,
'batch_size': batch_size,
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
54
|
'index_name': index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
55
56
57
|
}
)
|
3c1f8031
tangwang
api/routes/indexe...
|
58
59
60
|
try:
# 1. 加载mapping
logger.info(f"[BulkIndexing] Loading mapping for tenant_id={tenant_id}")
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
61
62
63
64
65
66
|
indexer_logger.info(
f"Loading mapping for bulk index",
extra={
'index_type': 'bulk',
'tenant_id': tenant_id,
'operation': 'load_mapping',
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
67
|
'index_name': index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
68
69
|
}
)
|
3c1f8031
tangwang
api/routes/indexe...
|
70
71
72
73
|
mapping = load_mapping()
# 2. 处理索引(删除并重建或创建)
if recreate_index:
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
74
|
logger.info(f"[BulkIndexing] Recreating index: {index_name}")
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
75
|
indexer_logger.info(
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
76
|
f"Recreating index: {index_name}",
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
77
78
79
80
|
extra={
'index_type': 'bulk',
'tenant_id': tenant_id,
'operation': 'recreate_index',
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
81
|
'index_name': index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
82
83
|
}
)
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
84
85
86
|
if self.es_client.index_exists(index_name):
if delete_index_if_exists(self.es_client, index_name):
logger.info(f"[BulkIndexing] Deleted existing index: {index_name}")
|
3c1f8031
tangwang
api/routes/indexe...
|
87
|
else:
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
88
|
raise Exception(f"Failed to delete index: {index_name}")
|
3c1f8031
tangwang
api/routes/indexe...
|
89
|
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
90
91
|
if not self.es_client.index_exists(index_name):
logger.info(f"[BulkIndexing] Creating index: {index_name}")
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
92
|
indexer_logger.info(
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
93
|
f"Creating index: {index_name}",
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
94
95
96
97
|
extra={
'index_type': 'bulk',
'tenant_id': tenant_id,
'operation': 'create_index',
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
98
|
'index_name': index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
99
100
|
}
)
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
101
102
103
|
if not self.es_client.create_index(index_name, mapping):
raise Exception(f"Failed to create index: {index_name}")
logger.info(f"[BulkIndexing] Created index: {index_name}")
|
3c1f8031
tangwang
api/routes/indexe...
|
104
|
else:
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
105
|
logger.info(f"[BulkIndexing] Index already exists: {index_name}")
|
3c1f8031
tangwang
api/routes/indexe...
|
106
107
108
|
# 3. 转换数据
logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}")
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
109
110
111
112
113
114
|
indexer_logger.info(
f"Transforming SPU data",
extra={
'index_type': 'bulk',
'tenant_id': tenant_id,
'operation': 'transform_data',
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
115
|
'index_name': index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
116
117
|
}
)
|
3c1f8031
tangwang
api/routes/indexe...
|
118
119
120
121
122
|
transformer = SPUTransformer(self.db_engine, tenant_id)
documents = transformer.transform_batch()
if not documents:
logger.warning(f"[BulkIndexing] No documents to index for tenant_id={tenant_id}")
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
123
124
125
126
127
128
129
130
131
|
elapsed_time = time.time() - start_time
log_index_result(
indexer_logger,
index_type='bulk',
tenant_id=tenant_id,
total_count=0,
success_count=0,
failed_count=0,
elapsed_time=elapsed_time,
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
132
|
index_name=index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
133
|
)
|
3c1f8031
tangwang
api/routes/indexe...
|
134
135
136
137
138
|
return {
"success": True,
"total": 0,
"indexed": 0,
"failed": 0,
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
139
140
|
"elapsed_time": elapsed_time,
"message": "No documents to index",
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
141
|
"index_name": index_name,
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
142
|
"tenant_id": tenant_id
|
3c1f8031
tangwang
api/routes/indexe...
|
143
144
145
|
}
logger.info(f"[BulkIndexing] Transformed {len(documents)} documents")
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
146
147
148
149
150
151
152
|
indexer_logger.info(
f"Transformed {len(documents)} documents",
extra={
'index_type': 'bulk',
'tenant_id': tenant_id,
'operation': 'transform_complete',
'total_count': len(documents),
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
153
|
'index_name': index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
154
155
|
}
)
|
3c1f8031
tangwang
api/routes/indexe...
|
156
157
158
|
# 4. 批量导入
logger.info(f"[BulkIndexing] Indexing {len(documents)} documents (batch_size={batch_size})")
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
159
|
indexer = BulkIndexer(self.es_client, index_name, batch_size=batch_size, max_retries=3)
|
3c1f8031
tangwang
api/routes/indexe...
|
160
161
162
163
164
165
166
167
|
results = indexer.index_documents(
documents,
id_field="spu_id",
show_progress=False # API调用时不打印进度
)
elapsed_time = time.time() - start_time
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
168
169
170
171
172
173
174
175
176
|
# 记录最终结果
log_index_result(
indexer_logger,
index_type='bulk',
tenant_id=tenant_id,
total_count=len(documents),
success_count=results['success'],
failed_count=results['failed'],
elapsed_time=elapsed_time,
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
177
|
index_name=index_name,
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
178
179
180
|
errors=results.get('errors', [])
)
|
3c1f8031
tangwang
api/routes/indexe...
|
181
182
183
184
185
186
187
188
189
190
191
192
|
logger.info(
f"[BulkIndexing] Completed for tenant_id={tenant_id}: "
f"indexed={results['success']}, failed={results['failed']}, "
f"elapsed={elapsed_time:.2f}s"
)
return {
"success": results['failed'] == 0,
"total": len(documents),
"indexed": results['success'],
"failed": results['failed'],
"elapsed_time": elapsed_time,
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
193
|
"index_name": index_name,
|
3c1f8031
tangwang
api/routes/indexe...
|
194
195
196
197
|
"tenant_id": tenant_id
}
except Exception as e:
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
198
199
|
elapsed_time = time.time() - start_time
error_msg = str(e)
|
3c1f8031
tangwang
api/routes/indexe...
|
200
|
logger.error(f"[BulkIndexing] Failed for tenant_id={tenant_id}: {e}", exc_info=True)
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
201
202
203
204
205
206
207
208
|
indexer_logger.error(
f"Bulk index failed: {error_msg}",
extra={
'index_type': 'bulk',
'tenant_id': tenant_id,
'operation': 'request_failed',
'error': error_msg,
'elapsed_time': elapsed_time,
|
e4a39cc8
tangwang
索引隔离。 不同的tenant_i...
|
209
|
'index_name': index_name
|
c55c5e47
tangwang
feat: 新增增量索引接口并重构...
|
210
211
212
|
},
exc_info=True
)
|
3c1f8031
tangwang
api/routes/indexe...
|
213
|
raise
|