check_data_source.py 11.9 KB
#!/usr/bin/env python3
"""
诊断脚本:检查MySQL数据源中分类和规格信息是否正确

检查:
1. category_path 字段是否有值
2. category_path 格式是否正确(应该能被解析为 category1_name)
3. shoplazza_product_option 表的 name 字段是否有值(应该是 "color", "size", "material")
4. shoplazza_product_sku 表的 option1/2/3 字段是否有值
"""

import sys
import argparse
from pathlib import Path
from sqlalchemy import create_engine, text

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.db_connector import create_db_connection


def _print_spu_heading(spu_id, title) -> None:
    """Print one sample row's SPU id and its title, truncated to 50 characters."""
    print(f"  SPU ID: {spu_id}, Title: {title[:50] if title else ''}")


def _print_category1_name(category) -> None:
    """Print the category1_name that a raw ``category`` value yields.

    A value containing '/' is split on '/' and its first segment (stripped)
    is reported; any other value is reported as a whole (stripped).
    """
    category = str(category)
    if '/' not in category:
        print(f"    → category1_name: '{category.strip()}'(直接作为category1_name)")
        return
    path_parts = category.split('/')
    print(f"    解析后(按'/'分割): {path_parts}")
    # str.split always returns at least one element, so index 0 is safe.
    print(f"    → category1_name: '{path_parts[0].strip()}'")


def check_category_path(db_engine, tenant_id: str):
    """Print a report on the category_path and category columns of the SPU table.

    Counts, for the tenant's rows with ``deleted = 0``, how many SPUs have a
    non-NULL ``category_path`` / ``category``. When some SPU has a category,
    up to 5 sample rows are printed together with the category1_name derived
    from each; otherwise a warning is printed. When some SPU has a
    category_path, up to 3 sample rows are printed and values that contain
    ',' but no '/' are flagged as ID lists.

    Args:
        db_engine: Engine-like object; ``connect()`` must return a context
            manager yielding a connection that provides ``execute()``.
        tenant_id: Tenant whose rows are inspected (bound as ``:tenant_id``).

    Returns:
        None. All results are written to stdout.
    """
    print("\n" + "="*60)
    print("1. 检查 category_path 和 category 字段")
    print("="*60)

    params = {"tenant_id": tenant_id}
    query = text("""
        SELECT 
            COUNT(*) as total,
            COUNT(category_path) as has_category_path,
            COUNT(*) - COUNT(category_path) as null_category_path,
            COUNT(category) as has_category,
            COUNT(*) - COUNT(category) as null_category
        FROM shoplazza_product_spu
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        (
            total,
            has_category_path,
            null_category_path,
            has_category,
            null_category,
        ) = conn.execute(query, params).fetchone()

        print(f"总SPU数: {total}")
        print(f"有 category_path 的SPU: {has_category_path}")
        print(f"category_path 为空的SPU: {null_category_path}")
        print(f"有 category 的SPU: {has_category}")
        print(f"category 为空的SPU: {null_category}")

        # Sample rows that carry a category value.
        if has_category > 0:
            sample_query = text("""
                SELECT id, title, category_path, category, category_id, category_level
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id 
                  AND deleted = 0 
                  AND category IS NOT NULL
                LIMIT 5
            """)
            samples = conn.execute(sample_query, params).fetchall()
            print("\n示例数据(前5条有 category 的记录):")
            for spu_id, title, category_path, category, category_id, category_level in samples:
                _print_spu_heading(spu_id, title)
                print(f"    category_path: {category_path}")
                print(f"    category: '{category}'")
                print(f"    category_id: {category_id}, category_level: {category_level}")

                # The category column is what category1_name is derived from;
                # empty values are skipped.
                if category:
                    _print_category1_name(category)
        else:
            print("\n⚠️ 警告: 没有SPU有 category 值!")

        # Sample rows that carry a category_path value, if any.
        if has_category_path > 0:
            sample_query = text("""
                SELECT id, title, category_path, category
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id 
                  AND deleted = 0 
                  AND category_path IS NOT NULL
                LIMIT 3
            """)
            samples = conn.execute(sample_query, params).fetchall()
            print("\n示例数据(有 category_path 的记录):")
            for spu_id, title, category_path, category in samples:
                _print_spu_heading(spu_id, title)
                print(f"    category_path: '{category_path}'")
                print(f"    category: '{category}'")

                # A value with commas but no slashes looks like an ID list.
                path_text = str(category_path)
                if category_path and ',' in path_text and '/' not in path_text:
                    print("    ⚠️  注意: category_path是ID列表格式(逗号分隔),不是路径格式")


def check_options(db_engine, tenant_id: str):
    """Print a report on the ``name`` column of shoplazza_product_option.

    Shows, for the tenant's rows with ``deleted = 0``, the total number of
    option rows, the number of distinct names and the number of SPUs that
    have options. When at least one row exists, every (name, position) pair
    is listed with its row count, followed by up to 10 sample rows;
    otherwise a warning is printed.

    Args:
        db_engine: Engine-like object; ``connect()`` must return a context
            manager yielding a connection that provides ``execute()``.
        tenant_id: Tenant whose rows are inspected (bound as ``:tenant_id``).
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("2. 检查 shoplazza_product_option 表的 name 字段")
    print(rule)

    bind = {"tenant_id": tenant_id}
    summary_sql = text("""
        SELECT 
            COUNT(*) as total_options,
            COUNT(DISTINCT name) as distinct_names,
            COUNT(DISTINCT spu_id) as spus_with_options
        FROM shoplazza_product_option
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        summary = conn.execute(summary_sql, bind).fetchone()
        total_options, distinct_names, spus_with_options = summary

        print(f"总 option 记录数: {total_options}")
        print(f"不同的 name 数量: {distinct_names}")
        print(f"有 option 定义的 SPU 数量: {spus_with_options}")

        if total_options > 0:
            # Breakdown of option names per position.
            names_sql = text("""
                SELECT DISTINCT name, position, COUNT(*) as count
                FROM shoplazza_product_option
                WHERE tenant_id = :tenant_id AND deleted = 0
                GROUP BY name, position
                ORDER BY position, name
            """)
            name_rows = conn.execute(names_sql, bind).fetchall()
            print("\n不同的 name 值:")
            for name, position, count in name_rows:
                print(f"  position={position}, name='{name}', count={count}")

            # A handful of raw rows for eyeballing.
            samples_sql = text("""
                SELECT spu_id, position, name, `values`
                FROM shoplazza_product_option
                WHERE tenant_id = :tenant_id AND deleted = 0
                ORDER BY spu_id, position
                LIMIT 10
            """)
            sample_rows = conn.execute(samples_sql, bind).fetchall()
            print("\n示例数据(前10条 option 记录):")
            for spu_id, position, name, option_values in sample_rows:
                print(f"  SPU ID: {spu_id}, position: {position}, name: '{name}', values: {option_values}")
        else:
            print("\n⚠️ 警告: 没有 option 记录!")


def check_sku_options(db_engine, tenant_id: str):
    """Print a report on the option1/2/3 columns of shoplazza_product_sku.

    Shows, for the tenant's rows with ``deleted = 0``, the total SKU count,
    how many SKUs have a non-NULL option1, option2 and option3, and the
    number of distinct SPUs they belong to. When at least one SKU exists,
    up to 10 sample rows are printed; otherwise a warning is printed.

    Args:
        db_engine: Engine-like object; ``connect()`` must return a context
            manager yielding a connection that provides ``execute()``.
        tenant_id: Tenant whose rows are inspected (bound as ``:tenant_id``).
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("3. 检查 shoplazza_product_sku 表的 option1/2/3 字段")
    print(rule)

    bind = {"tenant_id": tenant_id}
    summary_sql = text("""
        SELECT 
            COUNT(*) as total_skus,
            COUNT(option1) as has_option1,
            COUNT(option2) as has_option2,
            COUNT(option3) as has_option3,
            COUNT(DISTINCT spu_id) as distinct_spus
        FROM shoplazza_product_sku
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        summary = conn.execute(summary_sql, bind).fetchone()
        total_skus, has_option1, has_option2, has_option3, distinct_spus = summary

        print(f"总 SKU 数: {total_skus}")
        print(f"有 option1 的 SKU: {has_option1}")
        print(f"有 option2 的 SKU: {has_option2}")
        print(f"有 option3 的 SKU: {has_option3}")
        print(f"不同的 SPU 数量: {distinct_spus}")

        if total_skus > 0:
            # A handful of raw rows for eyeballing.
            samples_sql = text("""
                SELECT spu_id, id, option1, option2, option3
                FROM shoplazza_product_sku
                WHERE tenant_id = :tenant_id AND deleted = 0
                ORDER BY spu_id, id
                LIMIT 10
            """)
            sample_rows = conn.execute(samples_sql, bind).fetchall()
            print("\n示例数据(前10条 SKU 记录):")
            for spu_id, sku_id, option1, option2, option3 in sample_rows:
                print(f"  SPU ID: {spu_id}, SKU ID: {sku_id}")
                print(f"    option1: '{option1}', option2: '{option2}', option3: '{option3}'")
        else:
            print("\n⚠️ 警告: 没有 SKU 记录!")


def check_spu_summary(db_engine, tenant_id: str):
    """Print overall SPU / SKU / option counts for one tenant.

    A single query left-joins the SPU table to the SKU and option tables
    (rows with ``deleted = 0`` only) and reports distinct counts of SPUs,
    SKUs and option rows, plus how many SPUs have a non-NULL category_path
    and how many have at least one option row.

    Args:
        db_engine: Engine-like object; ``connect()`` must return a context
            manager yielding a connection that provides ``execute()``.
        tenant_id: Tenant whose rows are inspected (bound as ``:tenant_id``).
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("4. SPU 汇总信息")
    print(rule)

    summary_sql = text("""
        SELECT 
            COUNT(DISTINCT spu.id) as total_spus,
            COUNT(DISTINCT sku.id) as total_skus,
            COUNT(DISTINCT opt.id) as total_options,
            COUNT(DISTINCT CASE WHEN spu.category_path IS NOT NULL THEN spu.id END) as spus_with_category_path,
            COUNT(DISTINCT opt.spu_id) as spus_with_options
        FROM shoplazza_product_spu spu
        LEFT JOIN shoplazza_product_sku sku ON spu.id = sku.spu_id AND sku.tenant_id = :tenant_id AND sku.deleted = 0
        LEFT JOIN shoplazza_product_option opt ON spu.id = opt.spu_id AND opt.tenant_id = :tenant_id AND opt.deleted = 0
        WHERE spu.tenant_id = :tenant_id AND spu.deleted = 0
    """)

    with db_engine.connect() as conn:
        summary = conn.execute(summary_sql, {"tenant_id": tenant_id}).fetchone()
        (
            total_spus,
            total_skus,
            total_options,
            spus_with_category_path,
            spus_with_options,
        ) = summary

        print(f"总 SPU 数: {total_spus}")
        print(f"总 SKU 数: {total_skus}")
        print(f"总 option 记录数: {total_options}")
        print(f"有 category_path 的 SPU: {spus_with_category_path}")
        print(f"有 option 定义的 SPU: {spus_with_options}")


def main(argv=None):
    """Run all data-source checks for one tenant and return an exit code.

    Connection settings are taken from the command-line options, falling
    back to the DB_HOST / DB_PORT / DB_DATABASE / DB_USERNAME / DB_PASSWORD
    environment variables; the port defaults to 3306.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Returns:
        0 after all checks have run; 1 if connection parameters are missing,
        DB_PORT is not an integer, or creating the connection fails.
        Exceptions raised by the individual checks are not caught here.
    """
    import os  # local import: the module does not import os at file level

    parser = argparse.ArgumentParser(description='检查MySQL数据源中的分类和规格信息')
    parser.add_argument('--tenant-id', required=True, help='Tenant ID')
    parser.add_argument('--db-host', help='MySQL host (或使用环境变量 DB_HOST)')
    parser.add_argument('--db-port', type=int, help='MySQL port (或使用环境变量 DB_PORT, 默认: 3306)')
    parser.add_argument('--db-database', help='MySQL database (或使用环境变量 DB_DATABASE)')
    parser.add_argument('--db-username', help='MySQL username (或使用环境变量 DB_USERNAME)')
    parser.add_argument('--db-password', help='MySQL password (或使用环境变量 DB_PASSWORD)')

    args = parser.parse_args(argv)

    # Resolve connection settings: CLI option first, then environment.
    db_host = args.db_host or os.environ.get('DB_HOST')
    db_database = args.db_database or os.environ.get('DB_DATABASE')
    db_username = args.db_username or os.environ.get('DB_USERNAME')
    db_password = args.db_password or os.environ.get('DB_PASSWORD')

    db_port = args.db_port
    if not db_port:
        # An empty DB_PORT is treated like an unset one; a malformed value is
        # reported instead of surfacing as an uncaught ValueError.
        raw_port = os.environ.get('DB_PORT') or 3306
        try:
            db_port = int(raw_port)
        except ValueError:
            print(f"错误: 环境变量 DB_PORT 不是有效的端口号: {raw_port!r}")
            return 1

    if not all([db_host, db_database, db_username, db_password]):
        print("错误: MySQL连接参数不完整")
        print("请提供 --db-host, --db-database, --db-username, --db-password")
        print("或设置环境变量: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
        return 1

    print(f"连接MySQL: {db_host}:{db_port}/{db_database}")
    print(f"Tenant ID: {args.tenant_id}")

    try:
        db_engine = create_db_connection(
            host=db_host,
            port=db_port,
            database=db_database,
            username=db_username,
            password=db_password
        )
        print("✓ MySQL连接成功\n")
    except Exception as e:
        # Top-level boundary: report any connection failure and exit non-zero.
        print(f"✗ 连接MySQL失败: {e}")
        return 1

    # Run the checks.
    check_spu_summary(db_engine, args.tenant_id)
    check_category_path(db_engine, args.tenant_id)
    check_options(db_engine, args.tenant_id)
    check_sku_options(db_engine, args.tenant_id)

    print("\n" + "="*60)
    print("检查完成")
    print("="*60)

    return 0


if __name__ == '__main__':
    # Script entry point: main()'s return value becomes the process exit status.
    raise SystemExit(main())