1f6d15fa
tangwang
重构:SPU级别索引、统一索引架构...
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
#!/usr/bin/env python3
"""
Shoplazza data ingestion script.
Loads SPU and SKU data from MySQL and indexes into Elasticsearch using SPU transformer.
"""
import sys
import os
import argparse
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from utils.db_connector import create_db_connection
from utils.es_client import ESClient
from indexer.spu_transformer import SPUTransformer
from indexer.mapping_generator import MappingGenerator
from indexer.bulk_indexer import BulkIndexer
from config import ConfigLoader
def main():
    """Run the Shoplazza ingestion: MySQL SPU/SKU rows -> Elasticsearch documents.

    Steps, in order: parse CLI arguments, load the unified configuration,
    check that it declares a ``tenant_id`` field, connect to MySQL and
    Elasticsearch, create (or optionally recreate) the index from the
    generated mapping, transform the tenant's data with ``SPUTransformer``
    and bulk-index the resulting documents.

    Returns:
        int: process exit code. ``0`` when every document was indexed or
        when there was nothing to index; ``1`` on any configuration,
        connection, index-management, transform or indexing failure
        (including partially failed bulk indexing).
    """
    import traceback

    parser = argparse.ArgumentParser(description='Ingest Shoplazza SPU/SKU data into Elasticsearch')

    # Database connection
    # NOTE(review): a password passed on the command line is visible in process
    # listings and shell history; consider also accepting it from the environment.
    parser.add_argument('--db-host', required=True, help='MySQL host')
    parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
    parser.add_argument('--db-database', required=True, help='MySQL database name')
    parser.add_argument('--db-username', required=True, help='MySQL username')
    parser.add_argument('--db-password', required=True, help='MySQL password')

    # Tenant and index
    parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
    # The default is None (not a URL) so that an omitted flag can be told apart
    # from an explicit one; a truthy default would make the config fallback
    # below unreachable.
    parser.add_argument(
        '--es-host',
        default=None,
        help='Elasticsearch host (default: host from ES_CONFIG, else http://localhost:9200)',
    )

    # Options
    parser.add_argument('--recreate', action='store_true', help='Recreate index if exists')
    parser.add_argument('--batch-size', type=int, default=500, help='Batch size for indexing (default: 500)')

    args = parser.parse_args()

    print(f"Starting Shoplazza data ingestion for tenant: {args.tenant_id}")

    # Load unified configuration. The loader is constructed inside the try so
    # that a failure while creating it is reported the same way as a failure
    # while loading.
    try:
        config_loader = ConfigLoader("config/config.yaml")
        config = config_loader.load_config()
        print(f"Loaded configuration: {config.es_index_name}")
    except Exception as e:
        print(f"ERROR: Failed to load configuration: {e}")
        return 1

    # Validate tenant_id field exists
    if not any(field.name == "tenant_id" for field in config.fields):
        print("ERROR: Configuration must include 'tenant_id' field")
        return 1

    # Connect to MySQL
    print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")
    try:
        db_engine = create_db_connection(
            host=args.db_host,
            port=args.db_port,
            database=args.db_database,
            username=args.db_username,
            password=args.db_password
        )
    except Exception as e:
        print(f"ERROR: Failed to connect to MySQL: {e}")
        return 1

    # Connect to Elasticsearch (use unified config loading)
    from config.env_config import ES_CONFIG

    # Precedence: explicit --es-host, then the configured host, then localhost.
    es_host = args.es_host or ES_CONFIG.get('host') or 'http://localhost:9200'
    # Credentials are only read from ES_CONFIG; there are no CLI flags for them.
    es_username = ES_CONFIG.get('username')
    es_password = ES_CONFIG.get('password')

    print(f"Connecting to Elasticsearch: {es_host}")
    if es_username and es_password:
        print(f"Using authentication: {es_username}")
        es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
    else:
        es_client = ESClient(hosts=[es_host])

    if not es_client.ping():
        print(f"ERROR: Cannot connect to Elasticsearch at {es_host}")
        return 1

    # Generate and create index
    mapping_gen = MappingGenerator(config)
    mapping = mapping_gen.generate_mapping()
    index_name = config.es_index_name

    if args.recreate and es_client.index_exists(index_name):
        print(f"Deleting existing index: {index_name}")
        if not es_client.delete_index(index_name):
            print(f"ERROR: Failed to delete index '{index_name}'")
            return 1

    if not es_client.index_exists(index_name):
        print(f"Creating index: {index_name}")
        if not es_client.create_index(index_name, mapping):
            print(f"ERROR: Failed to create index '{index_name}'")
            print("Please check the mapping configuration and try again.")
            return 1
    else:
        print(f"Using existing index: {index_name}")

    # Initialize SPU transformer
    print(f"Initializing SPU transformer for tenant: {args.tenant_id}")
    transformer = SPUTransformer(db_engine, args.tenant_id)

    # Transform data
    print("Transforming SPU and SKU data...")
    try:
        documents = transformer.transform_batch()
        print(f"Transformed {len(documents)} SPU documents")
    except Exception as e:
        print(f"ERROR: Failed to transform data: {e}")
        traceback.print_exc()
        return 1

    if not documents:
        # An empty result is treated as a successful no-op, not an error.
        print("WARNING: No documents to index")
        return 0

    # Bulk index
    print(f"Indexing {len(documents)} documents (batch size: {args.batch_size})...")
    indexer = BulkIndexer(es_client, index_name, batch_size=args.batch_size)

    try:
        results = indexer.index_documents(documents, id_field="product_id", show_progress=True)
        print("\nIngestion complete:")
        print(f"  Success: {results['success']}")
        print(f"  Failed: {results['failed']}")
        print(f"  Time: {results.get('elapsed_time', 0):.2f}s")

        if results['failed'] > 0:
            # Partial failure still exits non-zero so callers can detect it.
            print(f"\nWARNING: {results['failed']} documents failed to index")
            return 1

        return 0
    except Exception as e:
        print(f"ERROR: Failed to index documents: {e}")
        traceback.print_exc()
        return 1
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
|