# test_indexing.py (11.5 KB)
#!/usr/bin/env python3
"""
索引功能测试脚本。

测试内容:
1. 全量索引(SPUTransformer)
2. 增量索引(IncrementalIndexerService)
3. 租户配置加载
4. 翻译功能集成(根据租户配置)
5. 文档转换器功能
"""

import sys
import os
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from config import ConfigLoader
from config.tenant_config_loader import get_tenant_config_loader
from utils.db_connector import create_db_connection
from indexer.spu_transformer import SPUTransformer
from indexer.incremental_service import IncrementalIndexerService
import logging

# Configure the root logger: INFO and above, each record tagged with
# timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


def test_tenant_config():
    """Check that tenant configurations load with the expected translation flags.

    Looks up the configuration of tenants "999", "162" and "1" through the
    tenant config loader, prints each one, and asserts that tenant 162 has
    both translation flags off while tenant 1 has English translation on.

    Returns:
        True when every lookup and assertion succeeds; False when anything
        raises (the error and its traceback are printed).
    """
    separator = "=" * 60
    print("\n" + separator)
    print("测试1: 租户配置加载")
    print(separator)

    try:
        loader = get_tenant_config_loader()

        # Tenant "999" is printed as the "default configuration" sample
        # (presumably an id with no dedicated entry; confirm against the loader).
        fallback_cfg = loader.get_tenant_config("999")
        print(f"默认配置: {fallback_cfg}")

        # Tenant 162: both translation directions are expected to be disabled.
        cfg_162 = loader.get_tenant_config("162")
        print(f"租户162配置: {cfg_162}")
        for flag in ('translate_to_en', 'translate_to_zh'):
            assert cfg_162[flag] == False, "租户162翻译应该关闭"
        print("✓ 租户162配置正确(翻译关闭)")

        # Any other tenant (tenant 1 here): English translation expected on.
        cfg_1 = loader.get_tenant_config("1")
        print(f"租户1配置: {cfg_1}")
        assert cfg_1['translate_to_en'] == True, "租户1应该启用英文翻译"
        print("✓ 租户1配置正确(翻译开启)")
    except Exception as exc:
        print(f"✗ 租户配置测试失败: {exc}")
        import traceback
        traceback.print_exc()
        return False
    else:
        return True


def test_full_indexing(tenant_id: str = "162"):
    """Run a full-index transformation for one tenant and print a sample.

    Database settings are read from the DB_HOST, DB_PORT (default 3306),
    DB_DATABASE, DB_USERNAME and DB_PASSWORD environment variables. A
    SPUTransformer is built for ``tenant_id`` and the first three documents
    returned by ``transform_batch()`` are printed. For tenant "162" a warning
    is printed (the check still passes) when a document has an English title.

    Args:
        tenant_id: Tenant whose SPUs are transformed.

    Returns:
        True when the transformation ran, including the case where it
        produced no documents. False when the database settings are
        incomplete or any step raised, including an unparsable DB_PORT.
    """
    print("\n" + "="*60)
    print(f"测试2: 全量索引(租户{tenant_id})")
    print("="*60)

    # Database settings come from the environment.
    db_host = os.environ.get('DB_HOST')
    db_database = os.environ.get('DB_DATABASE')
    db_username = os.environ.get('DB_USERNAME')
    db_password = os.environ.get('DB_PASSWORD')

    if not all([db_host, db_database, db_username, db_password]):
        print("✗ 跳过:数据库配置不完整")
        print("  需要环境变量: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
        return False

    try:
        # Parse the port inside the try block so that a malformed DB_PORT is
        # reported as a failed check instead of aborting the whole script
        # with an uncaught ValueError.
        db_port = int(os.environ.get('DB_PORT', 3306))

        # Connect to the database.
        db_engine = create_db_connection(
            host=db_host,
            port=db_port,
            database=db_database,
            username=db_username,
            password=db_password
        )
        print(f"✓ 数据库连接成功: {db_host}:{db_port}/{db_database}")

        # Build the transformer.
        transformer = SPUTransformer(db_engine, tenant_id)
        print("✓ SPUTransformer初始化成功")

        # transform_batch() is called without arguments; only the first three
        # resulting documents are inspected below.
        print(f"\n开始转换数据(租户{tenant_id})...")
        documents = transformer.transform_batch()

        if not documents:
            print("⚠ 没有数据需要转换")
            return True

        print(f"✓ 转换完成: {len(documents)} 个文档")

        # Print a sample of at most three documents.
        for index, doc in enumerate(documents[:3], start=1):
            print(f"\n文档 {index}:")
            print(f"  SPU ID: {doc.get('spu_id')}")
            print(f"  Tenant ID: {doc.get('tenant_id')}")
            print(f"  标题 (中文): {doc.get('title_zh', 'N/A')}")
            print(f"  标题 (英文): {doc.get('title_en', 'N/A')}")

            # Tenant 162 is expected to have translation disabled, so
            # title_en should be None; a mismatch only produces a warning.
            if tenant_id == "162":
                if doc.get('title_en') is None:
                    print("  ✓ 翻译已关闭(title_en为None)")
                else:
                    print(f"  ⚠ 警告:翻译应该关闭,但title_en有值: {doc.get('title_en')}")

        return True

    except Exception as e:
        print(f"✗ 全量索引测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False


def test_incremental_indexing(tenant_id: str = "162"):
    """Fetch one SPU document through the incremental indexer and print it.

    Database settings are read from the DB_HOST, DB_PORT (default 3306),
    DB_DATABASE, DB_USERNAME and DB_PASSWORD environment variables. One
    non-deleted SPU id of ``tenant_id`` is selected from
    ``shoplazza_product_spu`` and passed to
    ``IncrementalIndexerService.get_spu_document``. For tenant "162" a warning
    is printed (the check still passes) when the document has an English title.

    Args:
        tenant_id: Tenant whose SPU is looked up.

    Returns:
        True when a document was fetched, or when the tenant has no SPU rows.
        False when the database settings are incomplete, the document could
        not be fetched, or any step raised, including an unparsable DB_PORT.
    """
    print("\n" + "="*60)
    print(f"测试3: 增量索引(租户{tenant_id})")
    print("="*60)

    # Database settings come from the environment.
    db_host = os.environ.get('DB_HOST')
    db_database = os.environ.get('DB_DATABASE')
    db_username = os.environ.get('DB_USERNAME')
    db_password = os.environ.get('DB_PASSWORD')

    if not all([db_host, db_database, db_username, db_password]):
        print("✗ 跳过:数据库配置不完整")
        return False

    try:
        # Parse the port inside the try block so that a malformed DB_PORT is
        # reported as a failed check instead of aborting the whole script
        # with an uncaught ValueError.
        db_port = int(os.environ.get('DB_PORT', 3306))

        # Connect to the database.
        db_engine = create_db_connection(
            host=db_host,
            port=db_port,
            database=db_database,
            username=db_username,
            password=db_password
        )

        # Build the incremental indexing service.
        service = IncrementalIndexerService(db_engine)
        print("✓ IncrementalIndexerService初始化成功")

        # Pick any one non-deleted SPU id of this tenant to test with.
        from sqlalchemy import text
        with db_engine.connect() as conn:
            query = text("""
                SELECT id FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id AND deleted = 0
                LIMIT 1
            """)
            result = conn.execute(query, {"tenant_id": tenant_id})
            row = result.fetchone()
            if not row:
                print(f"⚠ 租户{tenant_id}没有数据,跳过增量测试")
                return True
            spu_id = str(row[0])

        print(f"\n测试SPU ID: {spu_id}")

        # Fetch the document for that SPU.
        doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)

        if doc is None:
            print(f"✗ SPU {spu_id} 文档获取失败")
            return False

        print("✓ SPU文档获取成功")
        print(f"  SPU ID: {doc.get('spu_id')}")
        print(f"  Tenant ID: {doc.get('tenant_id')}")
        print(f"  标题 (中文): {doc.get('title_zh', 'N/A')}")
        print(f"  标题 (英文): {doc.get('title_en', 'N/A')}")
        print(f"  SKU数量: {len(doc.get('skus', []))}")
        print(f"  规格数量: {len(doc.get('specifications', []))}")

        # Tenant 162 is expected to have translation disabled, so title_en
        # should be None; a mismatch only produces a warning.
        if tenant_id == "162":
            if doc.get('title_en') is None:
                print("  ✓ 翻译已关闭(title_en为None)")
            else:
                print(f"  ⚠ 警告:翻译应该关闭,但title_en有值: {doc.get('title_en')}")

        return True

    except Exception as e:
        print(f"✗ 增量索引测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False


def test_document_transformer():
    """Transform one mock SPU into a document with SPUDocumentTransformer.

    Builds an in-memory SPU row, one SKU and one option with pandas, loads the
    configuration of tenant '162', optionally creates a Translator (when the
    loaded config enables translation) and converts the mock SPU.

    Returns:
        True when a truthy document is produced; False when the conversion
        returns a falsy value or any step raises (the traceback is printed).
    """
    rule = "=" * 60
    print("\n" + rule)
    print("测试4: 文档转换器")
    print(rule)

    try:
        import pandas as pd
        from indexer.document_transformer import SPUDocumentTransformer
        from config import ConfigLoader

        app_config = ConfigLoader().load_config()

        # Mock SPU row.
        mock_spu = pd.Series({
            'id': 123,
            'tenant_id': '162',
            'title': '测试商品',
            'brief': '测试简介',
            'description': '测试描述',
            'vendor': '测试品牌',
            'category': '测试类目',
            'category_id': 100,
            'category_level': 1,
            'fake_sales': 1000,
            'image_src': 'https://example.com/image.jpg',
            'tags': '测试,标签',
            'create_time': pd.Timestamp.now(),
            'update_time': pd.Timestamp.now()
        })

        # Mock SKU table with a single row.
        sku_record = {
            'id': 456,
            'price': 99.99,
            'compare_at_price': 149.99,
            'sku': 'SKU001',
            'inventory_quantity': 100,
            'option1': '黑色',
            'option2': None,
            'option3': None
        }
        mock_skus = pd.DataFrame([sku_record])

        # Mock option table with a single row.
        option_record = {
            'id': 1,
            'position': 1,
            'name': '颜色'
        }
        mock_options = pd.DataFrame([option_record])

        # Tenant-level settings for tenant 162.
        tenant_settings = get_tenant_config_loader().get_tenant_config('162')

        # A translator is only created when the loaded config enables it.
        query_cfg = app_config.query_config
        if query_cfg.enable_translation:
            from query.translator import Translator
            translator = Translator(
                api_key=app_config.query_config.translation_api_key,
                use_cache=True
            )
        else:
            translator = None

        doc_transformer = SPUDocumentTransformer(
            category_id_to_name={},
            searchable_option_dimensions=['option1', 'option2', 'option3'],
            tenant_config=tenant_settings,
            translator=translator,
            translation_prompts=app_config.query_config.translation_prompts
        )

        doc = doc_transformer.transform_spu_to_doc(
            tenant_id='162',
            spu_row=mock_spu,
            skus=mock_skus,
            options=mock_options
        )

        # Guard clause: a falsy result counts as a failed conversion.
        if not doc:
            print("✗ 文档转换失败")
            return False

        print("✓ 文档转换成功")
        print(f"  title_zh: {doc.get('title_zh')}")
        print(f"  title_en: {doc.get('title_en')}")
        print(f"  SKU数量: {len(doc.get('skus', []))}")

        # With translation off for tenant 162, title_en is expected to be
        # None; a mismatch only produces a warning.
        if doc.get('title_en') is not None:
            print("  ⚠ 警告:翻译应该关闭")
        else:
            print("  ✓ 翻译已关闭(符合租户162配置)")

        return True

    except Exception as exc:
        print(f"✗ 文档转换器测试失败: {exc}")
        import traceback
        traceback.print_exc()
        return False


def main():
    """Run every indexing check in order and print a pass/fail summary.

    Returns:
        0 when every check returned a truthy result, otherwise 1.
    """
    rule = "=" * 60
    print(rule)
    print("索引功能完整测试")
    print(rule)

    # (label, check function, positional arguments), executed top to bottom:
    # tenant config, full indexing (tenant 162), incremental indexing
    # (tenant 162), document transformer.
    plan = (
        ("租户配置加载", test_tenant_config, ()),
        ("全量索引(租户162)", test_full_indexing, ("162",)),
        ("增量索引(租户162)", test_incremental_indexing, ("162",)),
        ("文档转换器", test_document_transformer, ()),
    )
    results = []
    for label, check, args in plan:
        results.append((label, check(*args)))

    # Summary.
    print("\n" + rule)
    print("测试总结")
    print(rule)

    total = len(results)
    passed = len([outcome for _, outcome in results if outcome])

    for label, outcome in results:
        if outcome:
            print(f"✓ 通过: {label}")
        else:
            print(f"✗ 失败: {label}")

    print(f"\n总计: {passed}/{total} 通过")

    if passed != total:
        print("✗ 部分测试失败")
        return 1

    print("✓ 所有测试通过")
    return 0


if __name__ == '__main__':
    # Use main()'s return value (0 or 1) as the process exit status.
    raise SystemExit(main())