Blame view

scripts/ingest_shoplazza.py 4.96 KB
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  #!/usr/bin/env python3
  """
  Shoplazza data ingestion script.
  
  Loads SPU and SKU data from MySQL and indexes into Elasticsearch using SPU transformer.
  """
  
  import sys
  import os
  import argparse
  from pathlib import Path
  
  # Add parent directory to path
  sys.path.insert(0, str(Path(__file__).parent.parent))
  
  from utils.db_connector import create_db_connection
  from utils.es_client import ESClient
  from indexer.spu_transformer import SPUTransformer
59b0a342   tangwang   创建手写 mapping JSON
19
  from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
20
  from indexer.bulk_indexer import BulkIndexer
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  
  
  def _build_parser():
      """Build the argument parser for the Shoplazza ingestion CLI.

      Returns:
          argparse.ArgumentParser with the MySQL connection, tenant/index and
          indexing options used by ``main``.
      """
      parser = argparse.ArgumentParser(description='Ingest Shoplazza SPU/SKU data into Elasticsearch')

      # Database connection
      parser.add_argument('--db-host', required=True, help='MySQL host')
      parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
      parser.add_argument('--db-database', required=True, help='MySQL database name')
      parser.add_argument('--db-username', required=True, help='MySQL username')
      parser.add_argument('--db-password', required=True, help='MySQL password')

      # Tenant and index
      parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
      parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')

      # Options
      parser.add_argument('--recreate', action='store_true', help='Recreate index if exists')
      parser.add_argument('--batch-size', type=int, default=500, help='Batch size for indexing (default: 500)')
      return parser


  def _connect_elasticsearch(es_host):
      """Create an ESClient for ``es_host`` and check that it answers ``ping()``.

      Credentials come from the ES_USERNAME / ES_PASSWORD environment
      variables. Authentication is used only when both are set.

      Args:
          es_host: Elasticsearch URL passed to ``ESClient(hosts=[...])``.

      Returns:
          The ESClient on success, or None if creating the client or pinging
          it failed. An error message has already been printed in that case.
      """
      es_username = os.environ.get('ES_USERNAME')
      es_password = os.environ.get('ES_PASSWORD')

      print(f"Connecting to Elasticsearch: {es_host}")
      try:
          if es_username and es_password:
              print(f"Using authentication: {es_username}")
              es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
          else:
              if es_username or es_password:
                  # Only one credential is present; say so instead of silently
                  # connecting anonymously.
                  print("WARNING: ES_USERNAME and ES_PASSWORD must both be set to use authentication; "
                        "connecting without credentials")
              es_client = ESClient(hosts=[es_host])
          reachable = es_client.ping()
      except Exception as e:
          # Same handling style as the MySQL connection step in main().
          print(f"ERROR: Cannot connect to Elasticsearch at {es_host}: {e}")
          return None

      if not reachable:
          print(f"ERROR: Cannot connect to Elasticsearch at {es_host}")
          return None
      return es_client


  def _prepare_index(es_client, index_name, mapping, recreate):
      """Make sure ``index_name`` exists, optionally deleting it first.

      Args:
          es_client: connected ESClient.
          index_name: name of the target index.
          mapping: body passed to ``es_client.create_index`` when the index
              has to be created.
          recreate: when True, delete an existing index before creating it.

      Returns:
          True on success. False if deleting or creating the index failed; an
          error message has already been printed in that case.
      """
      if recreate and es_client.index_exists(index_name):
          print(f"Deleting existing index: {index_name}")
          if not es_client.delete_index(index_name):
              print(f"ERROR: Failed to delete index '{index_name}'")
              return False

      if es_client.index_exists(index_name):
          print(f"Using existing index: {index_name}")
          return True

      print(f"Creating index: {index_name}")
      if not es_client.create_index(index_name, mapping):
          print(f"ERROR: Failed to create index '{index_name}'")
          print("Please check the mapping configuration and try again.")
          return False
      return True


  def main(argv=None):
      """Ingest one tenant's Shoplazza SPU/SKU data into Elasticsearch.

      Steps:
      1. Load the index mapping.
      2. Connect to MySQL and Elasticsearch.
      3. Transform the tenant's SPU/SKU rows into SPU documents.
      4. Make sure the target index exists, recreating it when ``--recreate``
         is given.
      5. Bulk-index the documents keyed by ``spu_id``.

      The transform runs *before* any index deletion. A transform failure
      therefore never leaves the existing index deleted or empty.

      Args:
          argv: optional argument list. None (the default) makes argparse read
              ``sys.argv[1:]``, as before.

      Returns:
          Process exit code. 0 on success or when there is nothing to index.
          1 on any failure, including documents that failed to index.
          Invalid arguments make argparse exit with status 2.
      """
      import traceback

      parser = _build_parser()
      args = parser.parse_args(argv)
      if args.batch_size < 1:
          # Reject non-positive batch sizes up front instead of handing them to
          # BulkIndexer, whose behaviour for such values is not defined here.
          parser.error('--batch-size must be a positive integer')

      print(f"Starting Shoplazza data ingestion for tenant: {args.tenant_id}")

      # Load mapping from JSON file
      try:
          mapping = load_mapping()
          print("Loaded mapping configuration")
      except Exception as e:
          print(f"ERROR: Failed to load mapping: {e}")
          return 1

      index_name = DEFAULT_INDEX_NAME

      # Connect to MySQL
      print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")
      try:
          db_engine = create_db_connection(
              host=args.db_host,
              port=args.db_port,
              database=args.db_database,
              username=args.db_username,
              password=args.db_password
          )
      except Exception as e:
          print(f"ERROR: Failed to connect to MySQL: {e}")
          return 1

      # Connect to Elasticsearch early so an unreachable cluster fails fast.
      es_client = _connect_elasticsearch(args.es_host)
      if es_client is None:
          return 1

      # Transform data before touching the index. With --recreate the old index
      # is deleted, so a transform failure must not happen after that point.
      print(f"Initializing SPU transformer for tenant: {args.tenant_id}")
      transformer = SPUTransformer(db_engine, args.tenant_id)

      print("Transforming SPU and SKU data...")
      try:
          documents = transformer.transform_batch()
          print(f"Transformed {len(documents)} SPU documents")
      except Exception as e:
          print(f"ERROR: Failed to transform data: {e}")
          traceback.print_exc()
          return 1

      # Create (or recreate) the index. As before, this also happens when no
      # documents were produced.
      if not _prepare_index(es_client, index_name, mapping, args.recreate):
          return 1

      if not documents:
          print("WARNING: No documents to index")
          return 0

      # Bulk index
      print(f"Indexing {len(documents)} documents (batch size: {args.batch_size})...")
      indexer = BulkIndexer(es_client, index_name, batch_size=args.batch_size)

      try:
          results = indexer.index_documents(documents, id_field="spu_id", show_progress=True)
      except Exception as e:
          print(f"ERROR: Failed to index documents: {e}")
          traceback.print_exc()
          return 1

      print("\nIngestion complete:")
      print(f"  Success: {results['success']}")
      print(f"  Failed: {results['failed']}")
      print(f"  Time: {results.get('elapsed_time', 0):.2f}s")

      if results['failed'] > 0:
          print(f"\nWARNING: {results['failed']} documents failed to index")
          return 1

      return 0
  
  
  if __name__ == '__main__':
      # Run the ingestion and use main()'s integer result as the process exit status.
      exit_status = main()
      sys.exit(exit_status)