#!/usr/bin/env python3
"""
Import customer1 CSV data into MySQL Shoplazza tables.

Reads a CSV file and generates SQL INSERT statements for SPU and SKU tables.
Each CSV row corresponds to 1 SPU and 1 SKU.
"""

import sys
import os
import csv
import random
import argparse
import re
from pathlib import Path
from datetime import datetime, timedelta

# Add parent directory to path (kept for compatibility with sibling modules).
sys.path.insert(0, str(Path(__file__).parent.parent))

# Timestamp format shared by the CSV input and the generated SQL.
DATETIME_FMT = '%Y-%m-%d %H:%M:%S'


def escape_sql_string(value: str) -> str:
    """
    Escape a value for embedding inside a single-quoted SQL string literal.

    Doubles single quotes (SQL standard), doubles backslashes (MySQL treats
    backslash as an escape character), flattens newlines/carriage returns to
    spaces, and strips remaining control characters (tab is preserved).

    Args:
        value: Value to escape; converted with str(). None becomes ''.

    Returns:
        Escaped string, safe to place between single quotes.
    """
    if value is None:
        return ''
    s = str(value)
    # Quote doubling first: the resulting '' contains no backslashes, so the
    # backslash pass below cannot corrupt it.
    s = s.replace("'", "''")
    s = s.replace("\\", "\\\\")
    # Literal newlines would break the one-row-per-line SQL layout.
    s = s.replace("\n", " ").replace("\r", " ")
    # Remove remaining control characters, NUL included (tab \x09 is kept).
    s = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', s)
    return s


def generate_handle(title: str) -> str:
    """
    Generate a URL-friendly handle from a product title.

    Args:
        title: Product title.

    Returns:
        Lowercase, hyphen-separated handle (at most 255 chars); 'product'
        when the title yields nothing usable.
    """
    handle = re.sub(r'[^\w\s-]', '', title.lower())
    handle = re.sub(r'[-\s]+', '-', handle).strip('-')
    return handle[:255] or 'product'


def parse_csv_row(row: dict) -> dict:
    """
    Parse a CSV row and extract the fields this importer uses.

    Args:
        row: CSV row dictionary (as produced by csv.DictReader).

    Returns:
        Dict with one cleaned string value per known CSV column; missing
        columns map to ''.
    """
    def clean_value(value):
        # Normalize to a stripped string; tolerate values that still carry
        # surrounding double quotes (csv.DictReader usually removes them).
        if value is None:
            return ''
        value = str(value).strip()
        # The length guard prevents a lone '"' from collapsing to ''.
        if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
            value = value[1:-1]
        return value

    fields = (
        'skuId', 'name', 'name_pinyin', 'create_time', 'ruSkuName',
        'enSpuName', 'categoryName', 'supplierName', 'brandName',
        'file_id', 'days_since_last_update', 'id', 'imageUrl',
    )
    return {field: clean_value(row.get(field, '')) for field in fields}


def _timestamps(create_time: str):
    """
    Derive (created_at, updated_at) datetimes from a CSV create_time string.

    Falls back to a random date within the last year when the value does not
    match DATETIME_FMT; updated_at is created_at plus a random 0-30 days.
    """
    try:
        created_at = datetime.strptime(create_time, DATETIME_FMT)
    except (ValueError, TypeError):  # was a bare except; be explicit
        created_at = datetime.now() - timedelta(days=random.randint(1, 365))
    return created_at, created_at + timedelta(days=random.randint(0, 30))


def generate_spu_data(csv_data: dict, spu_id: int, tenant_id: str = "2") -> dict:
    """
    Generate SPU data from a parsed CSV row.

    Args:
        csv_data: Parsed CSV row data (see parse_csv_row).
        spu_id: SPU ID to assign.
        tenant_id: Tenant ID (default: "2").

    Returns:
        SPU data dictionary keyed by shoplazza_product_spu column names.
    """
    created_at, updated_at = _timestamps(csv_data['create_time'])

    title = csv_data['name'] or csv_data['enSpuName'] or 'Product'
    handle = generate_handle(title)

    # Tags and SEO fields derived from category/brand when present.
    tags = ','.join(
        part for part in (csv_data['categoryName'], csv_data['brandName']) if part
    )
    seo_title = f"{title} - {csv_data['categoryName']}" if csv_data['categoryName'] else title
    seo_description = f"购买{csv_data['brandName']}{title}" if csv_data['brandName'] else title
    # Join only non-empty parts so a missing brand or category cannot leave
    # dangling commas (or drop the brand) in the keyword list.
    seo_keywords = ','.join(
        part for part in (title, csv_data['categoryName'], csv_data['brandName']) if part
    )

    # NOTE(review): the original description literal was garbled in transit;
    # it wrapped the name in blank lines (which escape_sql_string flattens to
    # spaces anyway). Confirm the intended markup against the source repo.
    description = f"\n\n{csv_data['name']}\n\n" if csv_data['name'] else ''

    return {
        'id': spu_id,
        'shop_id': 1,
        'shoplazza_id': csv_data['id'] or f"spu-{spu_id}",
        'handle': handle,
        'title': title,
        'brief': csv_data['name'] or '',
        'description': description,
        'spu': '',
        'vendor': csv_data['supplierName'] or '',
        'vendor_url': '',
        'seo_title': seo_title,
        'seo_description': seo_description,
        'seo_keywords': seo_keywords,
        'image_src': csv_data['imageUrl'] or '',
        'image_width': 800,
        'image_height': 600,
        'image_path': f"products/{spu_id}.jpg",
        'image_alt': title,
        'inventory_policy': '',
        'inventory_quantity': 0,
        'inventory_tracking': '0',
        'published': 1,
        'published_at': created_at.strftime(DATETIME_FMT),
        'requires_shipping': 1,
        'taxable': 0,
        'fake_sales': 0,
        'display_fake_sales': 0,
        'mixed_wholesale': 0,
        'need_variant_image': 0,
        'has_only_default_variant': 0,
        'tags': tags,
        'note': '',
        'category': csv_data['categoryName'] or '',
        'shoplazza_created_at': created_at.strftime(DATETIME_FMT),
        'shoplazza_updated_at': updated_at.strftime(DATETIME_FMT),
        'tenant_id': tenant_id,
        'creator': '1',
        'create_time': created_at.strftime(DATETIME_FMT),
        'updater': '1',
        'update_time': updated_at.strftime(DATETIME_FMT),
        'deleted': 0,
    }


def generate_sku_data(csv_data: dict, spu_id: int, sku_id: int, tenant_id: str = "2") -> dict:
    """
    Generate SKU data from a parsed CSV row.

    Args:
        csv_data: Parsed CSV row data (see parse_csv_row).
        spu_id: Associated SPU ID.
        sku_id: SKU ID (normally the CSV skuId).
        tenant_id: Tenant ID (default: "2").

    Returns:
        SKU data dictionary keyed by shoplazza_product_sku column names.
        Price, stock and weight are randomly generated demo values.
    """
    created_at, updated_at = _timestamps(csv_data['create_time'])

    # Demo pricing: compare-at is 20-50% above price; cost is 60% of price.
    price = round(random.uniform(50, 500), 2)
    compare_at_price = round(price * random.uniform(1.2, 1.5), 2)
    cost_price = round(price * 0.6, 2)
    inventory_quantity = random.randint(0, 100)
    weight = round(random.uniform(0.1, 5.0), 2)

    # Prefer the Russian SKU name for the title; fall back to the base name.
    title = csv_data['ruSkuName'] or csv_data['name'] or 'SKU'
    sku_code = csv_data['skuId'] or f"SKU-{sku_id}"

    return {
        'id': sku_id,
        'spu_id': spu_id,
        'shop_id': 1,
        'shoplazza_id': f"sku-{sku_id}",
        'shoplazza_product_id': csv_data['id'] or f"spu-{spu_id}",
        'shoplazza_image_id': '',
        'title': title,
        'sku': sku_code,
        'barcode': f"BAR{sku_id:08d}",
        'position': 1,
        'price': price,
        'compare_at_price': compare_at_price,
        'cost_price': cost_price,
        'option1': '',
        'option2': '',
        'option3': '',
        'inventory_quantity': inventory_quantity,
        'weight': weight,
        'weight_unit': 'kg',
        'image_src': csv_data['imageUrl'] or '',
        'wholesale_price': f'[{{"price": {round(price * 0.8, 2)}, "minQuantity": 10}}]',
        'note': '',
        'extend': None,  # JSON column; rendered as SQL NULL
        'shoplazza_created_at': created_at.strftime(DATETIME_FMT),
        'shoplazza_updated_at': updated_at.strftime(DATETIME_FMT),
        'tenant_id': tenant_id,
        'creator': '1',
        'create_time': created_at.strftime(DATETIME_FMT),
        'updater': '1',
        'update_time': updated_at.strftime(DATETIME_FMT),
        'deleted': 0,
    }


def read_csv_file(csv_file: str) -> list:
    """
    Read a CSV file and return a list of parsed rows.

    Args:
        csv_file: Path to CSV file.

    Returns:
        List of parsed CSV data dictionaries (see parse_csv_row).
    """
    # newline='' is required by the csv module for correct handling of
    # quoted fields that contain embedded newlines.
    with open(csv_file, 'r', encoding='utf-8', newline='') as f:
        return [parse_csv_row(row) for row in csv.DictReader(f)]


def _quoted(value) -> str:
    """Render a value as a single-quoted, escaped SQL string literal."""
    return f"'{escape_sql_string(value)}'"


def _spu_values(spu: dict) -> str:
    """Render one SPU row as a SQL VALUES tuple."""
    q = _quoted
    return (
        f"({spu['id']}, {spu['shop_id']}, {q(spu['shoplazza_id'])}, "
        f"{q(spu['handle'])}, {q(spu['title'])}, {q(spu['brief'])}, "
        f"{q(spu['description'])}, {q(spu['spu'])}, {q(spu['vendor'])}, "
        f"{q(spu['vendor_url'])}, {q(spu['seo_title'])}, "
        f"{q(spu['seo_description'])}, {q(spu['seo_keywords'])}, "
        f"{q(spu['image_src'])}, {spu['image_width']}, {spu['image_height']}, "
        f"{q(spu['image_path'])}, {q(spu['image_alt'])}, "
        f"{q(spu['inventory_policy'])}, {spu['inventory_quantity']}, "
        f"{q(spu['inventory_tracking'])}, {spu['published']}, "
        f"{q(spu['published_at'])}, {spu['requires_shipping']}, {spu['taxable']}, "
        f"{spu['fake_sales']}, {spu['display_fake_sales']}, {spu['mixed_wholesale']}, "
        f"{spu['need_variant_image']}, {spu['has_only_default_variant']}, "
        f"{q(spu['tags'])}, {q(spu['note'])}, {q(spu['category'])}, "
        f"{q(spu['shoplazza_created_at'])}, {q(spu['shoplazza_updated_at'])}, "
        f"{q(spu['tenant_id'])}, {q(spu['creator'])}, {q(spu['create_time'])}, "
        f"{q(spu['updater'])}, {q(spu['update_time'])}, {spu['deleted']})"
    )


def _sku_values(sku: dict) -> str:
    """Render one SKU row as a SQL VALUES tuple."""
    q = _quoted
    # extend is a JSON column and may legitimately be NULL.
    extend_value = 'NULL' if sku['extend'] is None else q(sku['extend'])
    return (
        f"({sku['id']}, {sku['spu_id']}, {sku['shop_id']}, {q(sku['shoplazza_id'])}, "
        f"{q(sku['shoplazza_product_id'])}, {q(sku['shoplazza_image_id'])}, "
        f"{q(sku['title'])}, {q(sku['sku'])}, {q(sku['barcode'])}, {sku['position']}, "
        f"{sku['price']}, {sku['compare_at_price']}, {sku['cost_price']}, "
        f"{q(sku['option1'])}, {q(sku['option2'])}, {q(sku['option3'])}, "
        f"{sku['inventory_quantity']}, {sku['weight']}, {q(sku['weight_unit'])}, "
        f"{q(sku['image_src'])}, {q(sku['wholesale_price'])}, {q(sku['note'])}, "
        f"{extend_value}, {q(sku['shoplazza_created_at'])}, "
        f"{q(sku['shoplazza_updated_at'])}, {q(sku['tenant_id'])}, "
        f"{q(sku['creator'])}, {q(sku['create_time'])}, "
        f"{q(sku['updater'])}, {q(sku['update_time'])}, {sku['deleted']})"
    )


def _write_rows(f, rows: list, render, trailer: str) -> None:
    """Write rendered VALUES tuples, comma-separated, ending with trailer."""
    for i, row in enumerate(rows):
        f.write(render(row))
        f.write(",\n" if i < len(rows) - 1 else trailer)


def generate_sql_inserts(spus: list, skus: list, output_file: str):
    """
    Generate SQL INSERT statements for SPU and SKU rows.

    Writes one multi-row INSERT per table. Empty lists are skipped entirely
    so the output never contains a VALUES clause with no row tuples (which
    would be invalid SQL).

    Args:
        spus: List of SPU data dicts (see generate_spu_data).
        skus: List of SKU data dicts (see generate_sku_data).
        output_file: Output file path.
    """
    with open(output_file, 'w', encoding='utf-8') as f:
        if spus:
            f.write("-- SPU Data from customer1 CSV\n")
            f.write("INSERT INTO shoplazza_product_spu (\n")
            f.write("    id, shop_id, shoplazza_id, handle, title, brief, description, spu,\n")
            f.write("    vendor, vendor_url, seo_title, seo_description, seo_keywords,\n")
            f.write("    image_src, image_width, image_height, image_path, image_alt,\n")
            f.write("    inventory_policy, inventory_quantity, inventory_tracking,\n")
            f.write("    published, published_at, requires_shipping, taxable,\n")
            f.write("    fake_sales, display_fake_sales, mixed_wholesale, need_variant_image,\n")
            f.write("    has_only_default_variant, tags, note, category,\n")
            f.write("    shoplazza_created_at, shoplazza_updated_at, tenant_id,\n")
            f.write("    creator, create_time, updater, update_time, deleted\n")
            f.write(") VALUES\n")
            _write_rows(f, spus, _spu_values, ";\n\n")
        if skus:
            f.write("-- SKU Data from customer1 CSV\n")
            f.write("INSERT INTO shoplazza_product_sku (\n")
            f.write("    id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, shoplazza_image_id,\n")
            f.write("    title, sku, barcode, position, price, compare_at_price, cost_price,\n")
            f.write("    option1, option2, option3, inventory_quantity, weight, weight_unit,\n")
            f.write("    image_src, wholesale_price, note, extend,\n")
            f.write("    shoplazza_created_at, shoplazza_updated_at, tenant_id,\n")
            f.write("    creator, create_time, updater, update_time, deleted\n")
            f.write(") VALUES\n")
            _write_rows(f, skus, _sku_values, ";\n")


def main():
    """CLI entry point: read the CSV, build SPU/SKU data, write the SQL file."""
    parser = argparse.ArgumentParser(
        description='Import customer1 CSV data into MySQL Shoplazza tables')
    parser.add_argument('--csv-file', required=True, help='CSV file path')
    parser.add_argument('--tenant-id', default='2', help='Tenant ID (default: 2)')
    parser.add_argument('--start-spu-id', type=int, default=1,
                        help='Starting SPU ID (default: 1)')
    parser.add_argument('--output', default='customer1_data.sql',
                        help='Output SQL file (default: customer1_data.sql)')
    args = parser.parse_args()

    print(f"Reading CSV file: {args.csv_file}")
    csv_data_list = read_csv_file(args.csv_file)
    print(f"Read {len(csv_data_list)} rows from CSV")

    print(f"Generating SPU and SKU data (tenant_id={args.tenant_id})...")
    spus = []
    skus = []
    spu_id = args.start_spu_id
    for csv_data in csv_data_list:
        spus.append(generate_spu_data(csv_data, spu_id, args.tenant_id))
        # Prefer the CSV skuId as the SKU primary key; fall back to a
        # generated ID when it is missing or not an integer.
        try:
            sku_id = int(csv_data['skuId'])
        except (ValueError, TypeError):
            sku_id = 1000000 + spu_id
        skus.append(generate_sku_data(csv_data, spu_id, sku_id, args.tenant_id))
        spu_id += 1

    print(f"Generated {len(spus)} SPUs and {len(skus)} SKUs")
    print(f"Generating SQL file: {args.output}")
    generate_sql_inserts(spus, skus, args.output)
    print(f"SQL file generated: {args.output}")
    print(f" - SPUs: {len(spus)}")
    print(f" - SKUs: {len(skus)}")


if __name__ == '__main__':
    main()