#!/usr/bin/env python3
"""
Indexing feature test script.

Covers:
1. Full indexing (SPUTransformer)
2. Incremental indexing (IncrementalIndexerService)
3. Tenant configuration loading
4. Translation integration (driven by the tenant configuration)
5. Document transformer

Every ``test_*`` function prints its progress and returns ``True`` on
success and ``False`` on failure; ``main`` aggregates those results into
the process exit code.
"""

import logging
import os
import sys
import traceback
from pathlib import Path
from typing import Any, Dict, Optional

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from config import ConfigLoader
from config.tenant_config_loader import get_tenant_config_loader
from utils.db_connector import create_db_connection
from indexer.spu_transformer import SPUTransformer
from indexer.incremental_service import IncrementalIndexerService

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Port used when DB_PORT is unset or empty.
_DEFAULT_DB_PORT = 3306


def _print_banner(title: str, leading_blank: bool = True) -> None:
    """Print *title* framed by two 60-character ``=`` rules."""
    rule = "=" * 60
    print(("\n" if leading_blank else "") + rule)
    print(title)
    print(rule)


def _require(condition: Any, message: str) -> None:
    """Raise ``AssertionError(message)`` when *condition* is falsy.

    Used instead of the ``assert`` statement, which is removed when Python
    runs with ``-O`` and would make the checks silently pass.
    """
    if not condition:
        raise AssertionError(message)


def _read_db_settings() -> Optional[Dict[str, Any]]:
    """Collect the database connection settings from the environment.

    Returns:
        Keyword arguments for ``create_db_connection`` (``host``, ``port``,
        ``database``, ``username``, ``password``), or ``None`` when a
        required variable is missing or ``DB_PORT`` is not an integer. The
        reason is printed before ``None`` is returned.
    """
    settings: Dict[str, Any] = {
        'host': os.environ.get('DB_HOST'),
        'database': os.environ.get('DB_DATABASE'),
        'username': os.environ.get('DB_USERNAME'),
        'password': os.environ.get('DB_PASSWORD'),
    }
    if not all(settings.values()):
        print("✗ 跳过:数据库配置不完整")
        print(" 需要环境变量: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
        return None

    # An unset or empty DB_PORT falls back to the default; a malformed one
    # is reported as a configuration problem instead of crashing the run.
    raw_port = os.environ.get('DB_PORT') or _DEFAULT_DB_PORT
    try:
        settings['port'] = int(raw_port)
    except ValueError:
        print(f"✗ 跳过:DB_PORT 不是有效的端口号: {raw_port!r}")
        return None
    return settings


def _dispose_engine(db_engine: Any) -> None:
    """Release the engine's pooled connections, if an engine was created."""
    if db_engine is not None:
        db_engine.dispose()


def _print_doc_summary(doc: Dict[str, Any]) -> None:
    """Print the identifying fields and both titles of an index document."""
    print(f" SPU ID: {doc.get('spu_id')}")
    print(f" Tenant ID: {doc.get('tenant_id')}")
    print(f" 标题 (中文): {doc.get('title_zh', 'N/A')}")
    print(f" 标题 (英文): {doc.get('title_en', 'N/A')}")


def _report_translation_disabled(doc: Dict[str, Any]) -> None:
    """Print whether *doc* is untranslated, i.e. its ``title_en`` is None.

    A translated document only produces a warning; it does not fail the test.
    """
    title_en = doc.get('title_en')
    if title_en is None:
        print(" ✓ 翻译已关闭(title_en为None)")
    else:
        print(f" ⚠ 警告:翻译应该关闭,但title_en有值: {title_en}")


def test_tenant_config() -> bool:
    """Test tenant configuration loading.

    Loads the configuration for tenants "999", "162" and "1" and checks that
    tenant 162 has both translation flags off and tenant 1 has English
    translation on.

    Returns:
        True when all checks pass, False when any check or call fails.
    """
    _print_banner("测试1: 租户配置加载")

    try:
        tenant_config_loader = get_tenant_config_loader()

        # Tenant "999" is looked up to show the default configuration.
        default_config = tenant_config_loader.get_tenant_config("999")
        print(f"默认配置: {default_config}")

        # Tenant 162: translation is expected to be switched off.
        tenant_162_config = tenant_config_loader.get_tenant_config("162")
        print(f"租户162配置: {tenant_162_config}")
        # Equality rather than identity, so 0/1 flags compare like booleans.
        _require(
            tenant_162_config['translate_to_en'] == False,  # noqa: E712
            "租户162翻译应该关闭",
        )
        _require(
            tenant_162_config['translate_to_zh'] == False,  # noqa: E712
            "租户162翻译应该关闭",
        )
        print("✓ 租户162配置正确(翻译关闭)")

        # Another tenant: English translation is expected to be on.
        tenant_1_config = tenant_config_loader.get_tenant_config("1")
        print(f"租户1配置: {tenant_1_config}")
        _require(
            tenant_1_config['translate_to_en'] == True,  # noqa: E712
            "租户1应该启用英文翻译",
        )
        print("✓ 租户1配置正确(翻译开启)")

        return True

    except Exception as e:
        print(f"✗ 租户配置测试失败: {e}")
        traceback.print_exc()
        return False


def test_full_indexing(tenant_id: str = "162") -> bool:
    """Test full indexing for one tenant.

    Connects to the database described by the ``DB_*`` environment
    variables, runs ``SPUTransformer.transform_batch`` and prints the first
    three resulting documents.

    Args:
        tenant_id: Tenant whose data is transformed.

    Returns:
        True when the transformation ran (including when it produced no
        documents); False when the database configuration is unusable or
        an exception occurred.
    """
    _print_banner(f"测试2: 全量索引(租户{tenant_id})")

    db_settings = _read_db_settings()
    if db_settings is None:
        return False

    db_engine = None
    try:
        # Connect to the database.
        db_engine = create_db_connection(**db_settings)
        host = db_settings['host']
        port = db_settings['port']
        database = db_settings['database']
        print(f"✓ 数据库连接成功: {host}:{port}/{database}")

        # Create the transformer.
        transformer = SPUTransformer(db_engine, tenant_id)
        print("✓ SPUTransformer初始化成功")

        # Transform everything transform_batch() returns; only the first
        # three documents are printed below.
        print(f"\n开始转换数据(租户{tenant_id})...")
        documents = transformer.transform_batch()

        if not documents:
            print("⚠ 没有数据需要转换")
            return True

        print(f"✓ 转换完成: {len(documents)} 个文档")

        # Inspect the first three documents.
        for index, doc in enumerate(documents[:3], start=1):
            print(f"\n文档 {index}:")
            _print_doc_summary(doc)

            # Tenant 162 has translation off, so title_en should be None.
            if tenant_id == "162":
                _report_translation_disabled(doc)

        return True

    except Exception as e:
        print(f"✗ 全量索引测试失败: {e}")
        traceback.print_exc()
        return False

    finally:
        _dispose_engine(db_engine)


def test_incremental_indexing(tenant_id: str = "162") -> bool:
    """Test incremental indexing for one tenant.

    Picks one non-deleted SPU id of the tenant from
    ``shoplazza_product_spu`` and fetches its document through
    ``IncrementalIndexerService.get_spu_document``.

    Args:
        tenant_id: Tenant whose SPU is fetched.

    Returns:
        True when the document was fetched, or when the tenant has no rows;
        False when the database configuration is unusable, the document is
        None, or an exception occurred.
    """
    _print_banner(f"测试3: 增量索引(租户{tenant_id})")

    db_settings = _read_db_settings()
    if db_settings is None:
        return False

    db_engine = None
    try:
        # Connect to the database.
        db_engine = create_db_connection(**db_settings)

        # Create the incremental service.
        service = IncrementalIndexerService(db_engine)
        print("✓ IncrementalIndexerService初始化成功")

        # Look up one SPU id to test with.
        from sqlalchemy import text
        query = text("""
            SELECT id
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id
              AND deleted = 0
            LIMIT 1
        """)
        with db_engine.connect() as conn:
            row = conn.execute(query, {"tenant_id": tenant_id}).fetchone()

        if not row:
            print(f"⚠ 租户{tenant_id}没有数据,跳过增量测试")
            return True

        spu_id = str(row[0])
        print(f"\n测试SPU ID: {spu_id}")

        # Fetch the SPU document.
        doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)

        if doc is None:
            print(f"✗ SPU {spu_id} 文档获取失败")
            return False

        print("✓ SPU文档获取成功")
        _print_doc_summary(doc)
        print(f" SKU数量: {len(doc.get('skus', []))}")
        print(f" 规格数量: {len(doc.get('specifications', []))}")

        # Tenant 162 has translation off, so title_en should be None.
        if tenant_id == "162":
            _report_translation_disabled(doc)

        return True

    except Exception as e:
        print(f"✗ 增量索引测试失败: {e}")
        traceback.print_exc()
        return False

    finally:
        _dispose_engine(db_engine)


def test_document_transformer() -> bool:
    """Test the document transformer with in-memory mock data.

    Builds one mock SPU row, one SKU and one option, then runs
    ``SPUDocumentTransformer.transform_spu_to_doc`` with tenant 162's
    configuration. No database is used.

    Returns:
        True when a non-empty document is produced; False when the result
        is empty or an exception occurred.
    """
    _print_banner("测试4: 文档转换器")

    try:
        # Imported here so a missing dependency is reported as a failure of
        # this test only.
        import pandas as pd
        from indexer.document_transformer import SPUDocumentTransformer

        config = ConfigLoader().load_config()

        # Build the mock data.
        spu_row = pd.Series({
            'id': 123,
            'tenant_id': '162',
            'title': '测试商品',
            'brief': '测试简介',
            'description': '测试描述',
            'vendor': '测试品牌',
            'category': '测试类目',
            'category_id': 100,
            'category_level': 1,
            'fake_sales': 1000,
            'image_src': 'https://example.com/image.jpg',
            'tags': '测试,标签',
            'create_time': pd.Timestamp.now(),
            'update_time': pd.Timestamp.now()
        })

        skus_df = pd.DataFrame([{
            'id': 456,
            'price': 99.99,
            'compare_at_price': 149.99,
            'sku': 'SKU001',
            'inventory_quantity': 100,
            'option1': '黑色',
            'option2': None,
            'option3': None
        }])

        options_df = pd.DataFrame([{
            'id': 1,
            'position': 1,
            'name': '颜色'
        }])

        # Load the tenant configuration.
        tenant_config_loader = get_tenant_config_loader()
        tenant_config = tenant_config_loader.get_tenant_config('162')

        # Create the translator only when translation is enabled globally.
        translator = None
        if config.query_config.enable_translation:
            from query.translator import Translator
            translator = Translator(
                api_key=config.query_config.translation_api_key,
                use_cache=True
            )

        # Create the transformer.
        transformer = SPUDocumentTransformer(
            category_id_to_name={},
            searchable_option_dimensions=['option1', 'option2', 'option3'],
            tenant_config=tenant_config,
            translator=translator,
            translation_prompts=config.query_config.translation_prompts
        )

        # Transform the document.
        doc = transformer.transform_spu_to_doc(
            tenant_id='162',
            spu_row=spu_row,
            skus=skus_df,
            options=options_df
        )

        if not doc:
            print("✗ 文档转换失败")
            return False

        print("✓ 文档转换成功")
        print(f" title_zh: {doc.get('title_zh')}")
        print(f" title_en: {doc.get('title_en')}")
        print(f" SKU数量: {len(doc.get('skus', []))}")

        # Tenant 162 has translation off, so title_en should be None.
        if doc.get('title_en') is None:
            print(" ✓ 翻译已关闭(符合租户162配置)")
        else:
            print(" ⚠ 警告:翻译应该关闭")

        return True

    except Exception as e:
        print(f"✗ 文档转换器测试失败: {e}")
        traceback.print_exc()
        return False


def main() -> int:
    """Run all tests and print a summary.

    Returns:
        0 when every test returned True, otherwise 1.
    """
    _print_banner("索引功能完整测试", leading_blank=False)

    results = [
        # Test 1: tenant configuration
        ("租户配置加载", test_tenant_config()),
        # Test 2: full indexing (tenant 162)
        ("全量索引(租户162)", test_full_indexing("162")),
        # Test 3: incremental indexing (tenant 162)
        ("增量索引(租户162)", test_incremental_indexing("162")),
        # Test 4: document transformer
        ("文档转换器", test_document_transformer()),
    ]

    # Summary
    _print_banner("测试总结")

    passed = sum(1 for _, result in results if result)
    total = len(results)

    for name, result in results:
        status = "✓ 通过" if result else "✗ 失败"
        print(f"{status}: {name}")

    print(f"\n总计: {passed}/{total} 通过")

    if passed == total:
        print("✓ 所有测试通过")
        return 0

    print("✗ 部分测试失败")
    return 1


if __name__ == '__main__':
    sys.exit(main())