Blame view

scripts/import_test_data.py 5.16 KB
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  #!/usr/bin/env python3
  """
  Import test data into MySQL Shoplazza tables.
  
  Reads SQL file generated by generate_test_data.py and imports into MySQL.
  """
  
  import sys
  import os
  import argparse
  from pathlib import Path
  
  # Add parent directory to path
  sys.path.insert(0, str(Path(__file__).parent.parent))
  
  from utils.db_connector import create_db_connection, test_connection
  
  
  def _split_sql_statements(sql_content: str) -> list:
      """
      Split the text of a SQL script into individual statements.

      The script is scanned character by character so that quoting is
      respected: a ';' or '--' that appears inside a single-quoted,
      double-quoted or backtick-quoted token is kept as data instead of
      being treated as a statement terminator or a comment marker.

      Args:
          sql_content: Full text of the SQL script

      Returns:
          List of non-empty statements, stripped of surrounding whitespace
          and without the terminating semicolon
      """
      statements = []
      current = []   # characters of the statement being assembled
      quote = None   # quote character of the literal we are inside, if any
      pos = 0
      length = len(sql_content)

      while pos < length:
          ch = sql_content[pos]

          if quote is not None:
              current.append(ch)
              # Inside '...' or "..." a backslash escapes the next character,
              # so an escaped quote must not close the literal.
              if ch == '\\' and quote != '`' and pos + 1 < length:
                  current.append(sql_content[pos + 1])
                  pos += 2
                  continue
              # A doubled quote ('') closes and immediately re-opens the
              # literal on the next character, which nets out correctly.
              if ch == quote:
                  quote = None
              pos += 1
              continue

          if ch in ("'", '"', '`'):
              quote = ch
              current.append(ch)
          elif sql_content.startswith('--', pos):
              # Comment: drop everything up to (not including) the newline,
              # which is kept so adjacent tokens stay separated.
              newline = sql_content.find('\n', pos)
              if newline == -1:
                  break
              pos = newline
              continue
          elif ch == ';':
              statement = ''.join(current).strip()
              if statement:
                  statements.append(statement)
              current = []
          else:
              current.append(ch)
          pos += 1

      # A final statement may lack a trailing semicolon.
      tail = ''.join(current).strip()
      if tail:
          statements.append(tail)
      return statements


  def import_sql_file(db_engine, sql_file: str):
      """
      Import SQL file into database.

      The file is read as UTF-8, split into statements, and each statement
      is executed and committed individually. The first failing statement
      is reported on stdout and its exception is re-raised, so statements
      committed before the failure remain applied.

      Args:
          db_engine: SQLAlchemy database engine
          sql_file: Path to SQL file

      Raises:
          Exception: Whatever the database raised for the failing statement
      """
      from sqlalchemy import text

      with open(sql_file, 'r', encoding='utf-8') as f:
          sql_content = f.read()

      statements = _split_sql_statements(sql_content)
      total = len(statements)

      print(f"Executing {total} SQL statements...")

      with db_engine.connect() as conn:
          for i, statement in enumerate(statements, 1):
              try:
                  conn.execute(text(statement))
                  conn.commit()
              except Exception as e:
                  print(f"  [{i}/{total}] ERROR: {e}")
                  print(f"  Statement: {statement[:200]}...")
                  raise
              else:
                  print(f"  [{i}/{total}] Executed successfully")
  
  
  def verify_import(db_engine, tenant_id: str):
      """
      Report how many SPU and SKU rows exist for a tenant.

      Runs one COUNT(*) query against shoplazza_product_spu and one against
      shoplazza_product_sku, both filtered by tenant_id, and prints the
      two totals.

      Args:
          db_engine: SQLAlchemy database engine
          tenant_id: Tenant ID to verify

      Returns:
          Tuple of (spu_count, sku_count)
      """
      from sqlalchemy import text

      with db_engine.connect() as connection:
          # SPU total for this tenant
          spu_rows = connection.execute(
              text("SELECT COUNT(*) FROM shoplazza_product_spu WHERE tenant_id = :tenant_id"),
              {"tenant_id": tenant_id},
          )
          spu_count = spu_rows.scalar()

          # SKU total for this tenant
          sku_rows = connection.execute(
              text("SELECT COUNT(*) FROM shoplazza_product_sku WHERE tenant_id = :tenant_id"),
              {"tenant_id": tenant_id},
          )
          sku_count = sku_rows.scalar()

          report = (
              "\nVerification:",
              f"  SPUs: {spu_count}",
              f"  SKUs: {sku_count}",
          )
          for report_line in report:
              print(report_line)

          return spu_count, sku_count
  
  
  def main():
      """
      Command-line entry point: connect to MySQL, import a SQL file and
      optionally verify the result.

      When --tenant-id is given, existing SKU and SPU rows for that tenant
      are deleted before the import (a failure here is only a warning) and
      row counts are printed after the import.

      Returns:
          0 on success; 1 if connecting, the connection test, or the
          import fails
      """
      parser = argparse.ArgumentParser(description='Import test data into MySQL')

      # Database connection
      parser.add_argument('--db-host', required=True, help='MySQL host')
      parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
      parser.add_argument('--db-database', required=True, help='MySQL database name')
      parser.add_argument('--db-username', required=True, help='MySQL username')
      parser.add_argument('--db-password', required=True, help='MySQL password')

      # Import options
      parser.add_argument('--sql-file', required=True, help='SQL file to import')
      parser.add_argument('--tenant-id', help='Tenant ID to verify (optional)')

      args = parser.parse_args()

      print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")

      # Connect to database
      try:
          db_engine = create_db_connection(
              host=args.db_host,
              port=args.db_port,
              database=args.db_database,
              username=args.db_username,
              password=args.db_password
          )
      except Exception as e:
          print(f"ERROR: Failed to connect to MySQL: {e}")
          return 1

      # Test connection
      if not test_connection(db_engine):
          print("ERROR: Database connection test failed")
          return 1

      print("Database connection successful")

      # Clean existing data if tenant_id provided
      if args.tenant_id:
          print(f"\nCleaning existing data for tenant_id: {args.tenant_id}")
          from sqlalchemy import text
          # Bind tenant_id as a parameter instead of interpolating it into
          # the SQL text, so quotes in the value cannot alter the statement.
          tenant_params = {"tenant_id": args.tenant_id}
          try:
              with db_engine.connect() as conn:
                  # Delete SKUs first (foreign key constraint)
                  conn.execute(
                      text("DELETE FROM shoplazza_product_sku WHERE tenant_id = :tenant_id"),
                      tenant_params,
                  )
                  # Delete SPUs
                  conn.execute(
                      text("DELETE FROM shoplazza_product_spu WHERE tenant_id = :tenant_id"),
                      tenant_params,
                  )
                  conn.commit()
                  print("✓ Existing data cleaned")
          except Exception as e:
              # Best effort: the import is still attempted if cleanup fails.
              print(f"⚠ Warning: Failed to clean existing data: {e}")

      # Import SQL file
      print(f"\nImporting SQL file: {args.sql_file}")
      try:
          import_sql_file(db_engine, args.sql_file)
          print("Import completed successfully")
      except Exception as e:
          print(f"ERROR: Failed to import SQL file: {e}")
          import traceback
          traceback.print_exc()
          return 1

      # Verify import if tenant_id provided
      if args.tenant_id:
          verify_import(db_engine, args.tenant_id)

      return 0
  
  
  if __name__ == '__main__':
      # Use main()'s return value as the process exit status.
      exit_status = main()
      sys.exit(exit_status)