"""
全量索引服务。
提供全量索引功能,将指定租户的所有SPU数据导入到ES。
"""
import logging
import time
from typing import Dict, Any

from sqlalchemy import Engine

from utils.es_client import ESClient
from indexer.spu_transformer import SPUTransformer
from indexer.bulk_indexer import BulkIndexer
from indexer.mapping_generator import load_mapping, delete_index_if_exists, DEFAULT_INDEX_NAME

logger = logging.getLogger(__name__)


class BulkIndexingService:
    """Full indexing service providing bulk import into Elasticsearch."""

    def __init__(self, db_engine: Engine, es_client: ESClient):
        """
        Initialize the full indexing service.

        Args:
            db_engine: SQLAlchemy database engine
            es_client: Elasticsearch client
        """
        self.db_engine = db_engine
        self.es_client = es_client
        self.index_name = DEFAULT_INDEX_NAME

    def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]:
        """
        Run a full index build for one tenant.

        Args:
            tenant_id: Tenant whose SPU data should be indexed.
            recreate_index: If True, delete and rebuild the index first.
            batch_size: Number of documents per bulk request.

        Returns:
            A summary dict with total/indexed/failed counts and elapsed time.
        """
        start_time = time.time()
        try:
            # 1. Load the index mapping
            logger.info(f"[BulkIndexing] Loading mapping for tenant_id={tenant_id}")
            mapping = load_mapping()

            # 2. Prepare the index (delete and recreate, or create if missing)
            if recreate_index:
                logger.info(f"[BulkIndexing] Recreating index: {self.index_name}")
                if self.es_client.index_exists(self.index_name):
                    if delete_index_if_exists(self.es_client, self.index_name):
                        logger.info(f"[BulkIndexing] Deleted existing index: {self.index_name}")
                    else:
                        raise Exception(f"Failed to delete index: {self.index_name}")
            if not self.es_client.index_exists(self.index_name):
                logger.info(f"[BulkIndexing] Creating index: {self.index_name}")
                if not self.es_client.create_index(self.index_name, mapping):
                    raise Exception(f"Failed to create index: {self.index_name}")
                logger.info(f"[BulkIndexing] Created index: {self.index_name}")
            else:
                logger.info(f"[BulkIndexing] Index already exists: {self.index_name}")

            # 3. Transform SPU rows into ES documents
            logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}")
            transformer = SPUTransformer(self.db_engine, tenant_id)
            documents = transformer.transform_batch()
            if not documents:
                logger.warning(f"[BulkIndexing] No documents to index for tenant_id={tenant_id}")
                return {
                    "success": True,
                    "total": 0,
                    "indexed": 0,
                    "failed": 0,
                    "elapsed_time": time.time() - start_time,
                    "message": "No documents to index"
                }
            logger.info(f"[BulkIndexing] Transformed {len(documents)} documents")

            # 4. Bulk-import the documents
            logger.info(f"[BulkIndexing] Indexing {len(documents)} documents (batch_size={batch_size})")
            indexer = BulkIndexer(self.es_client, self.index_name, batch_size=batch_size)
            results = indexer.index_documents(
                documents,
                id_field="spu_id",
                show_progress=False  # do not print progress when called from the API
            )

            elapsed_time = time.time() - start_time
            logger.info(
                f"[BulkIndexing] Completed for tenant_id={tenant_id}: "
                f"indexed={results['success']}, failed={results['failed']}, "
                f"elapsed={elapsed_time:.2f}s"
            )
            return {
                "success": results['failed'] == 0,
                "total": len(documents),
                "indexed": results['success'],
                "failed": results['failed'],
                "elapsed_time": elapsed_time,
                "index_name": self.index_name,
                "tenant_id": tenant_id
            }
        except Exception as e:
            logger.error(f"[BulkIndexing] Failed for tenant_id={tenant_id}: {e}", exc_info=True)
            raise
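

# ---------------------------------------------------------------------------
# Usage sketch (illustration only, not part of the service). It assumes that
# ESClient() can be constructed without arguments and that a SQLAlchemy DSN is
# available; the DSN and tenant id below are hypothetical placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from sqlalchemy import create_engine

    # Hypothetical connection string; replace with the real database DSN.
    engine = create_engine("mysql+pymysql://user:pass@localhost:3306/shop")
    es_client = ESClient()  # assumption: default constructor targets the local ES cluster
    service = BulkIndexingService(engine, es_client)

    # Rebuild the index from scratch for one tenant and report the outcome.
    result = service.bulk_index(tenant_id="tenant_demo", recreate_index=True, batch_size=500)
    print(f"indexed={result['indexed']} failed={result['failed']} "
          f"in {result['elapsed_time']:.2f}s")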