#!/usr/bin/env python3
"""Diagnose category and option data in the MySQL source tables.

Checks performed for a single tenant:

1. Whether the ``category_path`` column is populated.
2. Whether the category value has a format that can be parsed into
   ``category1_name``.
3. Whether ``shoplazza_product_option.name`` is populated (expected values
   are things like "color", "size", "material").
4. Whether ``shoplazza_product_sku.option1/2/3`` are populated.
"""

import argparse
import os
import sys
from pathlib import Path

from sqlalchemy import create_engine, text

# Add parent directory to path so that the project-local ``utils`` package
# can be imported when this file is run directly as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.db_connector import create_db_connection

# Horizontal rule printed above and below every section title.
_RULE = "=" * 60


def _print_section(title: str) -> None:
    """Print *title* framed by two rules, preceded by a blank line."""
    print("\n" + _RULE)
    print(title)
    print(_RULE)


def check_category_path(db_engine, tenant_id: str) -> None:
    """Report how many SPUs have ``category_path`` / ``category`` values.

    Prints the populated/empty counts for both columns, then up to five
    sample rows with a ``category`` (showing how it would be split into
    ``category1_name``) and up to three sample rows with a ``category_path``.

    Args:
        db_engine: Object exposing ``connect()`` as a context manager whose
            connection accepts SQLAlchemy ``text`` statements.
        tenant_id: Tenant whose non-deleted SPU rows are inspected.
    """
    _print_section("1. 检查 category_path 和 category 字段")

    params = {"tenant_id": tenant_id}
    count_query = text("""
        SELECT
            COUNT(*) as total,
            COUNT(category_path) as has_category_path,
            COUNT(*) - COUNT(category_path) as null_category_path,
            COUNT(category) as has_category,
            COUNT(*) - COUNT(category) as null_category
        FROM shoplazza_product_spu
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        (
            total,
            has_category_path,
            null_category_path,
            has_category,
            null_category,
        ) = conn.execute(count_query, params).fetchone()

        print(f"总SPU数: {total}")
        print(f"有 category_path 的SPU: {has_category_path}")
        print(f"category_path 为空的SPU: {null_category_path}")
        print(f"有 category 的SPU: {has_category}")
        print(f"category 为空的SPU: {null_category}")

        # Show a few rows that carry a category value.
        if has_category > 0:
            sample_query = text("""
                SELECT id, title, category_path, category, category_id, category_level
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id
                  AND deleted = 0
                  AND category IS NOT NULL
                LIMIT 5
            """)
            samples = conn.execute(sample_query, params).fetchall()
            print("\n示例数据(前5条有 category 的记录):")
            for spu_id, title, category_path, category, category_id, category_level in samples:
                print(f" SPU ID: {spu_id}, Title: {title[:50] if title else ''}")
                print(f" category_path: {category_path}")
                print(f" category: '{category}'")
                print(f" category_id: {category_id}, category_level: {category_level}")

                # Show how the category value maps to category1_name.
                if category:
                    category_text = str(category)
                    if '/' in category_text:
                        # str.split always returns at least one element, so
                        # the first segment can be read unconditionally.
                        path_parts = category_text.split('/')
                        print(f" 解析后(按'/'分割): {path_parts}")
                        print(f" → category1_name: '{path_parts[0].strip()}'")
                    else:
                        print(f" → category1_name: '{category_text.strip()}'(直接作为category1_name)")
        else:
            print("\n⚠️ 警告: 没有SPU有 category 值!")

        # Show a few rows that carry a category_path value, if any.
        if has_category_path > 0:
            sample_query = text("""
                SELECT id, title, category_path, category
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id
                  AND deleted = 0
                  AND category_path IS NOT NULL
                LIMIT 3
            """)
            samples = conn.execute(sample_query, params).fetchall()
            print("\n示例数据(有 category_path 的记录):")
            for spu_id, title, category_path, category in samples:
                print(f" SPU ID: {spu_id}, Title: {title[:50] if title else ''}")
                print(f" category_path: '{category_path}'")
                print(f" category: '{category}'")

                # Commas without any '/' indicate an ID list, not a path.
                if category_path:
                    path_text = str(category_path)
                    if ',' in path_text and '/' not in path_text:
                        print(" ⚠️ 注意: category_path是ID列表格式(逗号分隔),不是路径格式")


def check_options(db_engine, tenant_id: str) -> None:
    """Report the ``name`` values stored in ``shoplazza_product_option``.

    Prints overall counts, the distinct ``(position, name)`` combinations
    with their frequency, and the first ten option rows.

    Args:
        db_engine: Object exposing ``connect()`` as a context manager whose
            connection accepts SQLAlchemy ``text`` statements.
        tenant_id: Tenant whose non-deleted option rows are inspected.
    """
    _print_section("2. 检查 shoplazza_product_option 表的 name 字段")

    params = {"tenant_id": tenant_id}
    count_query = text("""
        SELECT
            COUNT(*) as total_options,
            COUNT(DISTINCT name) as distinct_names,
            COUNT(DISTINCT spu_id) as spus_with_options
        FROM shoplazza_product_option
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        total_options, distinct_names, spus_with_options = conn.execute(
            count_query, params
        ).fetchone()

        print(f"总 option 记录数: {total_options}")
        print(f"不同的 name 数量: {distinct_names}")
        print(f"有 option 定义的 SPU 数量: {spus_with_options}")

        if total_options > 0:
            # List each distinct name per position with its row count.
            name_query = text("""
                SELECT DISTINCT name, position, COUNT(*) as count
                FROM shoplazza_product_option
                WHERE tenant_id = :tenant_id AND deleted = 0
                GROUP BY name, position
                ORDER BY position, name
            """)
            names = conn.execute(name_query, params).fetchall()
            print("\n不同的 name 值:")
            for name, position, count in names:
                print(f" position={position}, name='{name}', count={count}")

            # Show a handful of raw option rows.
            sample_query = text("""
                SELECT spu_id, position, name, `values`
                FROM shoplazza_product_option
                WHERE tenant_id = :tenant_id AND deleted = 0
                ORDER BY spu_id, position
                LIMIT 10
            """)
            samples = conn.execute(sample_query, params).fetchall()
            print("\n示例数据(前10条 option 记录):")
            for spu_id, position, name, values in samples:
                print(f" SPU ID: {spu_id}, position: {position}, name: '{name}', values: {values}")
        else:
            print("\n⚠️ 警告: 没有 option 记录!")


def check_sku_options(db_engine, tenant_id: str) -> None:
    """Report how many SKUs have ``option1`` / ``option2`` / ``option3`` set.

    Counts are based on SQL ``COUNT(column)``, i.e. non-NULL values. Also
    prints the first ten SKU rows.

    Args:
        db_engine: Object exposing ``connect()`` as a context manager whose
            connection accepts SQLAlchemy ``text`` statements.
        tenant_id: Tenant whose non-deleted SKU rows are inspected.
    """
    _print_section("3. 检查 shoplazza_product_sku 表的 option1/2/3 字段")

    params = {"tenant_id": tenant_id}
    count_query = text("""
        SELECT
            COUNT(*) as total_skus,
            COUNT(option1) as has_option1,
            COUNT(option2) as has_option2,
            COUNT(option3) as has_option3,
            COUNT(DISTINCT spu_id) as distinct_spus
        FROM shoplazza_product_sku
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        total_skus, has_option1, has_option2, has_option3, distinct_spus = conn.execute(
            count_query, params
        ).fetchone()

        print(f"总 SKU 数: {total_skus}")
        print(f"有 option1 的 SKU: {has_option1}")
        print(f"有 option2 的 SKU: {has_option2}")
        print(f"有 option3 的 SKU: {has_option3}")
        print(f"不同的 SPU 数量: {distinct_spus}")

        if total_skus > 0:
            # Show a handful of raw SKU rows.
            sample_query = text("""
                SELECT spu_id, id, option1, option2, option3
                FROM shoplazza_product_sku
                WHERE tenant_id = :tenant_id AND deleted = 0
                ORDER BY spu_id, id
                LIMIT 10
            """)
            samples = conn.execute(sample_query, params).fetchall()
            print("\n示例数据(前10条 SKU 记录):")
            for spu_id, sku_id, option1, option2, option3 in samples:
                print(f" SPU ID: {spu_id}, SKU ID: {sku_id}")
                print(f" option1: '{option1}', option2: '{option2}', option3: '{option3}'")
        else:
            print("\n⚠️ 警告: 没有 SKU 记录!")


def check_spu_summary(db_engine, tenant_id: str) -> None:
    """Print SPU / SKU / option totals for the tenant from one joined query.

    Args:
        db_engine: Object exposing ``connect()`` as a context manager whose
            connection accepts SQLAlchemy ``text`` statements.
        tenant_id: Tenant whose non-deleted rows are summarised.
    """
    _print_section("4. SPU 汇总信息")

    # The two LEFT JOINs multiply rows per SPU (SKUs x options); every
    # aggregate therefore uses COUNT(DISTINCT ...) to stay correct.
    query = text("""
        SELECT
            COUNT(DISTINCT spu.id) as total_spus,
            COUNT(DISTINCT sku.id) as total_skus,
            COUNT(DISTINCT opt.id) as total_options,
            COUNT(DISTINCT CASE WHEN spu.category_path IS NOT NULL THEN spu.id END) as spus_with_category_path,
            COUNT(DISTINCT opt.spu_id) as spus_with_options
        FROM shoplazza_product_spu spu
        LEFT JOIN shoplazza_product_sku sku
            ON spu.id = sku.spu_id AND sku.tenant_id = :tenant_id AND sku.deleted = 0
        LEFT JOIN shoplazza_product_option opt
            ON spu.id = opt.spu_id AND opt.tenant_id = :tenant_id AND opt.deleted = 0
        WHERE spu.tenant_id = :tenant_id AND spu.deleted = 0
    """)

    with db_engine.connect() as conn:
        (
            total_spus,
            total_skus,
            total_options,
            spus_with_category_path,
            spus_with_options,
        ) = conn.execute(query, {"tenant_id": tenant_id}).fetchone()

        print(f"总 SPU 数: {total_spus}")
        print(f"总 SKU 数: {total_skus}")
        print(f"总 option 记录数: {total_options}")
        print(f"有 category_path 的 SPU: {spus_with_category_path}")
        print(f"有 option 定义的 SPU: {spus_with_options}")


def main() -> int:
    """Parse arguments, connect to MySQL and run every check.

    Connection settings come from the command line first and fall back to
    the ``DB_HOST``, ``DB_PORT``, ``DB_DATABASE``, ``DB_USERNAME`` and
    ``DB_PASSWORD`` environment variables.

    Returns:
        ``0`` when all checks ran, ``1`` when the connection settings are
        incomplete or invalid, or when connecting fails.
    """
    parser = argparse.ArgumentParser(description='检查MySQL数据源中的分类和规格信息')
    parser.add_argument('--tenant-id', required=True, help='Tenant ID')
    parser.add_argument('--db-host', help='MySQL host (或使用环境变量 DB_HOST)')
    parser.add_argument('--db-port', type=int, help='MySQL port (或使用环境变量 DB_PORT, 默认: 3306)')
    parser.add_argument('--db-database', help='MySQL database (或使用环境变量 DB_DATABASE)')
    parser.add_argument('--db-username', help='MySQL username (或使用环境变量 DB_USERNAME)')
    # NOTE(review): a password given on the command line can show up in
    # process listings and shell history; the DB_PASSWORD variable avoids that.
    parser.add_argument('--db-password', help='MySQL password (或使用环境变量 DB_PASSWORD)')
    args = parser.parse_args()

    # Resolve connection settings: CLI value first, then environment.
    db_host = args.db_host or os.environ.get('DB_HOST')
    try:
        # ``or 3306`` also covers DB_PORT being set to an empty string.
        db_port = args.db_port or int(os.environ.get('DB_PORT') or 3306)
    except ValueError:
        print("错误: 环境变量 DB_PORT 必须是整数")
        return 1
    db_database = args.db_database or os.environ.get('DB_DATABASE')
    db_username = args.db_username or os.environ.get('DB_USERNAME')
    db_password = args.db_password or os.environ.get('DB_PASSWORD')

    if not all([db_host, db_database, db_username, db_password]):
        print("错误: MySQL连接参数不完整")
        print("请提供 --db-host, --db-database, --db-username, --db-password")
        print("或设置环境变量: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
        return 1

    print(f"连接MySQL: {db_host}:{db_port}/{db_database}")
    print(f"Tenant ID: {args.tenant_id}")

    try:
        db_engine = create_db_connection(
            host=db_host,
            port=db_port,
            database=db_database,
            username=db_username,
            password=db_password,
        )
        print("✓ MySQL连接成功\n")
    except Exception as e:
        print(f"✗ 连接MySQL失败: {e}")
        return 1

    # Run the checks, releasing pooled connections even if one of them fails.
    try:
        check_spu_summary(db_engine, args.tenant_id)
        check_category_path(db_engine, args.tenant_id)
        check_options(db_engine, args.tenant_id)
        check_sku_options(db_engine, args.tenant_id)
    finally:
        # NOTE(review): create_db_connection is assumed to return a
        # SQLAlchemy Engine; the getattr guard keeps this harmless otherwise.
        dispose = getattr(db_engine, "dispose", None)
        if callable(dispose):
            dispose()

    _print_section("检查完成")
    return 0


if __name__ == '__main__':
    sys.exit(main())