Blame view

scripts/import_test_data.py 6.95 KB
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  #!/usr/bin/env python3
  """
  Import test data into MySQL Shoplazza tables.
  
  Reads SQL file generated by generate_test_data.py and imports into MySQL.
  """
  
  import sys
  import os
  import argparse
  from pathlib import Path
  
  # Add parent directory to path
  sys.path.insert(0, str(Path(__file__).parent.parent))
  
  from utils.db_connector import create_db_connection, test_connection
  
  
  def import_sql_file(db_engine, sql_file: str):
      """
      Import SQL file into database.
  
      Args:
          db_engine: SQLAlchemy database engine
          sql_file: Path to SQL file
      """
fb68a0ef   tangwang   配置优化
27
28
      from sqlalchemy import text
      
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
29
30
31
      with open(sql_file, 'r', encoding='utf-8') as f:
          sql_content = f.read()
      
a5a3856d   tangwang   店匠体系数据的搜索:mock da...
32
33
34
35
36
37
38
      # More robust SQL statement parsing
      # Handle multi-line statements and quoted strings properly
      statements = []
      current_statement = []
      in_string = False
      string_char = None
      i = 0
fb68a0ef   tangwang   配置优化
39
      
a5a3856d   tangwang   店匠体系数据的搜索:mock da...
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
      while i < len(sql_content):
          char = sql_content[i]
          
          # Track string boundaries (single quotes)
          if char in ("'", '"') and (i == 0 or sql_content[i-1] != '\\'):
              if not in_string:
                  in_string = True
                  string_char = char
              elif char == string_char:
                  in_string = False
                  string_char = None
          
          current_statement.append(char)
          
          # Only split on semicolon if not inside a string
          if char == ';' and not in_string:
              stmt = ''.join(current_statement).strip()
              # Remove comments
              if '--' in stmt:
                  # Only remove comments that are not inside strings
                  parts = []
                  in_quote = False
                  quote_char = None
                  j = 0
                  while j < len(stmt):
                      c = stmt[j]
                      if c in ("'", '"') and (j == 0 or stmt[j-1] != '\\'):
                          if not in_quote:
                              in_quote = True
                              quote_char = c
                          elif c == quote_char:
                              in_quote = False
                              quote_char = None
                      if c == '-' and j < len(stmt) - 1 and stmt[j+1] == '-' and not in_quote:
                          # Found comment marker outside string
                          break
                      parts.append(c)
                      j += 1
                  stmt = ''.join(parts).strip()
              
              if stmt and not stmt.startswith('--'):
                  statements.append(stmt)
              current_statement = []
          
          i += 1
      
      # Handle last statement if no semicolon at end
      if current_statement:
          stmt = ''.join(current_statement).strip()
          if stmt and not stmt.startswith('--'):
              statements.append(stmt)
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
91
92
93
94
95
96
97
      
      print(f"Executing {len(statements)} SQL statements...")
      
      with db_engine.connect() as conn:
          for i, statement in enumerate(statements, 1):
              if statement:
                  try:
fb68a0ef   tangwang   配置优化
98
                      conn.execute(text(statement))
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
99
100
101
102
                      conn.commit()
                      print(f"  [{i}/{len(statements)}] Executed successfully")
                  except Exception as e:
                      print(f"  [{i}/{len(statements)}] ERROR: {e}")
a5a3856d   tangwang   店匠体系数据的搜索:mock da...
103
104
105
106
                      # Show more context for debugging
                      error_start = max(0, statement.find('VALUES') - 100)
                      error_end = min(len(statement), error_start + 500)
                      print(f"  Statement context: ...{statement[error_start:error_end]}...")
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
                      raise
  
  
  def verify_import(db_engine, tenant_id: str):
      """
      Verify imported data.
  
      Args:
          db_engine: SQLAlchemy database engine
          tenant_id: Tenant ID to verify
      """
      from sqlalchemy import text
      
      with db_engine.connect() as conn:
          # Count SPUs
          result = conn.execute(text("SELECT COUNT(*) FROM shoplazza_product_spu WHERE tenant_id = :tenant_id"), {"tenant_id": tenant_id})
          spu_count = result.scalar()
          
          # Count SKUs
          result = conn.execute(text("SELECT COUNT(*) FROM shoplazza_product_sku WHERE tenant_id = :tenant_id"), {"tenant_id": tenant_id})
          sku_count = result.scalar()
          
          print(f"\nVerification:")
          print(f"  SPUs: {spu_count}")
          print(f"  SKUs: {sku_count}")
          
          return spu_count, sku_count
  
  
  def main():
      parser = argparse.ArgumentParser(description='Import test data into MySQL')
      
      # Database connection
      parser.add_argument('--db-host', required=True, help='MySQL host')
      parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)')
      parser.add_argument('--db-database', required=True, help='MySQL database name')
      parser.add_argument('--db-username', required=True, help='MySQL username')
      parser.add_argument('--db-password', required=True, help='MySQL password')
      
      # Import options
      parser.add_argument('--sql-file', required=True, help='SQL file to import')
      parser.add_argument('--tenant-id', help='Tenant ID to verify (optional)')
      
      args = parser.parse_args()
  
      print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")
      
      # Connect to database
      try:
          db_engine = create_db_connection(
              host=args.db_host,
              port=args.db_port,
              database=args.db_database,
              username=args.db_username,
              password=args.db_password
          )
      except Exception as e:
          print(f"ERROR: Failed to connect to MySQL: {e}")
          return 1
  
      # Test connection
      if not test_connection(db_engine):
          print("ERROR: Database connection test failed")
          return 1
  
      print("Database connection successful")
  
fb68a0ef   tangwang   配置优化
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
      # Clean existing data if tenant_id provided
      if args.tenant_id:
          print(f"\nCleaning existing data for tenant_id: {args.tenant_id}")
          from sqlalchemy import text
          try:
              with db_engine.connect() as conn:
                  # Delete SKUs first (foreign key constraint)
                  conn.execute(text(f"DELETE FROM shoplazza_product_sku WHERE tenant_id = '{args.tenant_id}'"))
                  # Delete SPUs
                  conn.execute(text(f"DELETE FROM shoplazza_product_spu WHERE tenant_id = '{args.tenant_id}'"))
                  conn.commit()
                  print("✓ Existing data cleaned")
          except Exception as e:
              print(f"⚠ Warning: Failed to clean existing data: {e}")
              # Continue anyway
  
1f6d15fa   tangwang   重构:SPU级别索引、统一索引架构...
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
      # Import SQL file
      print(f"\nImporting SQL file: {args.sql_file}")
      try:
          import_sql_file(db_engine, args.sql_file)
          print("Import completed successfully")
      except Exception as e:
          print(f"ERROR: Failed to import SQL file: {e}")
          import traceback
          traceback.print_exc()
          return 1
  
      # Verify import if tenant_id provided
      if args.tenant_id:
          verify_import(db_engine, args.tenant_id)
  
      return 0
  
  
  if __name__ == '__main__':
      sys.exit(main())