Blame view

scripts/ingest_shoplazza.py 4.99 KB
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
  #!/usr/bin/env python3
  """
  Shoplazza data ingestion script.
  
  Loads SPU and SKU data from MySQL and indexes into Elasticsearch using SPU transformer.
  """
  
  import sys
  import os
  import argparse
  from pathlib import Path
  
  # Add parent directory to path
  sys.path.insert(0, str(Path(__file__).parent.parent))
  
  from utils.db_connector import create_db_connection
  from utils.es_client import ESClient
  from indexer.spu_transformer import SPUTransformer
  from indexer.mapping_generator import MappingGenerator
  from indexer.bulk_indexer import BulkIndexer
  from config import ConfigLoader
  
  
  def _build_arg_parser():
      """Build the command-line parser for the Shoplazza ingestion script."""
      parser = argparse.ArgumentParser(description='Ingest Shoplazza SPU/SKU data into Elasticsearch')
  
      # Database connection
      parser.add_argument('--db-host', required=True, help='MySQL host')
      parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
      parser.add_argument('--db-database', required=True, help='MySQL database name')
      parser.add_argument('--db-username', required=True, help='MySQL username')
      # NOTE(review): a password given on the command line is visible to other
      # local users via the process list; consider an env-var/prompt fallback.
      parser.add_argument('--db-password', required=True, help='MySQL password')
  
      # Tenant and index
      parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)')
      parser.add_argument('--config', default='base', help='Configuration ID (default: base)')
      parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
  
      # Options
      parser.add_argument('--recreate', action='store_true', help='Recreate index if exists')
      parser.add_argument('--batch-size', type=int, default=500, help='Batch size for indexing (default: 500)')
      return parser
  
  
  def _eprint(message):
      """Write a diagnostic (error/warning) line to stderr, next to any traceback."""
      print(message, file=sys.stderr)
  
  
  def main():
      """Ingest a tenant's Shoplazza SPU/SKU data from MySQL into Elasticsearch.
  
      Steps: parse CLI arguments, load the schema configuration, connect to
      MySQL and Elasticsearch, ensure the target index exists (optionally
      recreating it), transform the tenant's rows into SPU documents with
      ``SPUTransformer`` and bulk-index them with ``BulkIndexer``.
  
      Progress goes to stdout; ERROR/WARNING lines and tracebacks go to stderr.
  
      Returns:
          Process exit code: 0 on success (including when there is nothing to
          index), 1 on any failure or if any document failed to index.
          Invalid command-line arguments make argparse exit with status 2.
      """
      import traceback
  
      parser = _build_arg_parser()
      args = parser.parse_args()
      if args.batch_size <= 0:
          parser.error('--batch-size must be a positive integer')
  
      print(f"Starting Shoplazza data ingestion for tenant: {args.tenant_id}")
  
      # Load configuration. The schema directory is resolved relative to the
      # repository root (the parent of this scripts/ directory) rather than the
      # current working directory, so the script works from any CWD.
      config_dir = Path(__file__).resolve().parent.parent / "config" / "schema"
      try:
          config_loader = ConfigLoader(str(config_dir))
          config = config_loader.load_customer_config(args.config)
          print(f"Loaded configuration: {config.customer_name}")
      except Exception as e:
          _eprint(f"ERROR: Failed to load configuration: {e}")
          return 1
  
      # The schema must declare a 'tenant_id' field (presumably used to scope
      # indexed documents per tenant).
      if not any(field.name == "tenant_id" for field in config.fields):
          _eprint("ERROR: Configuration must include 'tenant_id' field")
          return 1
  
      # Connect to MySQL (the password is deliberately not echoed).
      print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")
      try:
          db_engine = create_db_connection(
              host=args.db_host,
              port=args.db_port,
              database=args.db_database,
              username=args.db_username,
              password=args.db_password
          )
      except Exception as e:
          _eprint(f"ERROR: Failed to connect to MySQL: {e}")
          return 1
  
      # Connect to Elasticsearch. Client construction is guarded too, so a bad
      # host string is reported cleanly instead of as a raw traceback.
      print(f"Connecting to Elasticsearch: {args.es_host}")
      try:
          es_client = ESClient(hosts=[args.es_host])
          reachable = es_client.ping()
      except Exception as e:
          _eprint(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}: {e}")
          return 1
      if not reachable:
          _eprint(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
          return 1
  
      # Generate the mapping and make sure the index exists.
      index_name = config.es_index_name
      try:
          mapping = MappingGenerator(config).generate_mapping()
  
          if args.recreate and es_client.index_exists(index_name):
              print(f"Deleting existing index: {index_name}")
              es_client.delete_index(index_name)
  
          if es_client.index_exists(index_name):
              print(f"Using existing index: {index_name}")
          else:
              print(f"Creating index: {index_name}")
              es_client.create_index(index_name, mapping)
      except Exception as e:
          _eprint(f"ERROR: Failed to prepare index {index_name}: {e}")
          traceback.print_exc()
          return 1
  
      # Transform MySQL rows into SPU documents.
      print(f"Initializing SPU transformer for tenant: {args.tenant_id}")
      try:
          transformer = SPUTransformer(db_engine, args.tenant_id)
          print("Transforming SPU and SKU data...")
          documents = transformer.transform_batch()
          print(f"Transformed {len(documents)} SPU documents")
      except Exception as e:
          _eprint(f"ERROR: Failed to transform data: {e}")
          traceback.print_exc()
          return 1
  
      if not documents:
          _eprint("WARNING: No documents to index")
          return 0
  
      # Bulk index
      print(f"Indexing {len(documents)} documents (batch size: {args.batch_size})...")
      try:
          indexer = BulkIndexer(es_client, index_name, batch_size=args.batch_size)
          results = indexer.index_documents(documents, id_field="product_id", show_progress=True)
      except Exception as e:
          _eprint(f"ERROR: Failed to index documents: {e}")
          traceback.print_exc()
          return 1
  
      print("\nIngestion complete:")
      print(f"  Success: {results['success']}")
      print(f"  Failed: {results['failed']}")
      print(f"  Time: {results.get('elapsed_time', 0):.2f}s")
  
      if results['failed'] > 0:
          _eprint(f"\nWARNING: {results['failed']} documents failed to index")
          return 1
  
      return 0
  
  
  if __name__ == '__main__':
      # Propagate main()'s return value as the process exit status.
      raise SystemExit(main())