#!/usr/bin/env python3
"""
Test script for the indexing features.

Covered areas:
1. Full indexing (SPUTransformer)
2. Incremental indexing (IncrementalIndexerService)
3. Tenant configuration loading
4. Translation integration (driven by the tenant configuration)
5. Document transformer

The script prints a human-readable report and exits with status 0 when every
test passes, 1 otherwise.
"""

import logging
import os
import sys
import traceback
from pathlib import Path
from typing import Any, Dict, Optional

# Add the parent of this script's directory to the import path so the
# project packages below can be imported when the script is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from config import ConfigLoader
from config.tenant_config_loader import get_tenant_config_loader
from utils.db_connector import create_db_connection
from indexer.spu_transformer import SPUTransformer
from indexer.incremental_service import IncrementalIndexerService

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def _print_banner(title: str, leading_blank: bool = True) -> None:
    """Print *title* framed by two 60-character rules."""
    rule = "=" * 60
    print(("\n" + rule) if leading_blank else rule)
    print(title)
    print(rule)


def _report_failure(label: str, exc: BaseException) -> None:
    """Print a failure line for *exc* followed by the active traceback.

    Must be called from inside an ``except`` block so that
    ``traceback.print_exc()`` has an exception to print.
    """
    print(f"✗ {label}: {exc}")
    traceback.print_exc()


def _load_db_settings() -> Optional[Dict[str, Any]]:
    """Read the database connection settings from the environment.

    Returns:
        Keyword arguments for ``create_db_connection`` (host, port, database,
        username, password), or ``None`` after printing the reason when a
        required variable is missing/empty or ``DB_PORT`` is not an integer.
    """
    settings: Dict[str, Any] = {
        'host': os.environ.get('DB_HOST'),
        'database': os.environ.get('DB_DATABASE'),
        'username': os.environ.get('DB_USERNAME'),
        'password': os.environ.get('DB_PASSWORD'),
    }
    if not all(settings.values()):
        print("✗ 跳过:数据库配置不完整")
        print(" 需要环境变量: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
        return None

    raw_port = os.environ.get('DB_PORT', 3306)
    try:
        settings['port'] = int(raw_port)
    except ValueError:
        # A malformed port is reported as a failed test instead of aborting
        # the whole run with an uncaught exception.
        print(f"✗ 跳过:DB_PORT 不是有效的端口号: {raw_port!r}")
        return None
    return settings


def _dispose_engine(db_engine: Any) -> None:
    """Release the connections held by *db_engine*, if it supports that.

    NOTE(review): ``create_db_connection`` appears to return a SQLAlchemy
    Engine (it is used through ``.connect()`` in this file); ``dispose`` is
    looked up defensively in case it returns something else.
    """
    dispose = getattr(db_engine, "dispose", None)
    if not callable(dispose):
        return
    try:
        dispose()
    except Exception:
        # Cleanup problems must not change the outcome of the test itself.
        logger.warning("Failed to dispose database engine", exc_info=True)


def _title_translations(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``doc["title"]`` when it is a dict, otherwise an empty dict."""
    title = doc.get("title") or {}
    return title if isinstance(title, dict) else {}


def test_tenant_config():
    """Check tenant configuration loading for tenants "999", "162" and "1".

    Returns:
        True when every check passes; False when a check fails or the
        configuration loader raises.
    """
    _print_banner("测试1: 租户配置加载")

    try:
        tenant_config_loader = get_tenant_config_loader()

        # Default configuration, requested through tenant id "999"
        # (presumably an id without a tenant-specific entry; verify).
        default_config = tenant_config_loader.get_tenant_config("999")
        print(f"默认配置: {default_config}")
        assert "index_languages" in default_config, "默认配置应包含 index_languages"
        assert "en" in default_config["index_languages"] and "zh" in default_config["index_languages"], \
            "默认 index_languages 应包含 en, zh"
        print("✓ 默认配置正确(index_languages 含 en, zh)")

        # Tenant 162 (index_languages: zh, en)
        tenant_162_config = tenant_config_loader.get_tenant_config("162")
        print(f"租户162配置: {tenant_162_config}")
        idx = tenant_162_config.get("index_languages") or []
        assert "zh" in idx and "en" in idx, "租户162 index_languages 应包含 zh, en"
        print("✓ 租户162配置正确(index_languages 含 zh, en)")

        # Tenant 1
        tenant_1_config = tenant_config_loader.get_tenant_config("1")
        print(f"租户1配置: {tenant_1_config}")
        idx1 = tenant_1_config.get("index_languages") or []
        assert "zh" in idx1 and "en" in idx1, "租户1 index_languages 应包含 zh, en"
        assert tenant_1_config.get("primary_language") == "zh", "租户1 主语言为 zh"
        print("✓ 租户1配置正确(index_languages 含 zh, en,主语言 zh)")

        return True

    except Exception as e:
        _report_failure("租户配置测试失败", e)
        return False


def test_full_indexing(tenant_id: str = "162"):
    """Run a full-indexing transformation for *tenant_id* and print a sample.

    Args:
        tenant_id: Tenant whose SPUs are transformed.

    Returns:
        True when the transformation ran (including the "no data" case);
        False when the database settings are unusable or an error occurred.
    """
    _print_banner(f"测试2: 全量索引(租户{tenant_id})")

    # Database settings come from the environment.
    db_settings = _load_db_settings()
    if db_settings is None:
        return False

    db_engine = None
    try:
        # Connect to the database
        db_engine = create_db_connection(**db_settings)
        print(
            f"✓ 数据库连接成功: "
            f"{db_settings['host']}:{db_settings['port']}/{db_settings['database']}"
        )

        # Create the transformer
        transformer = SPUTransformer(db_engine, tenant_id)
        print("✓ SPUTransformer初始化成功")

        # transform_batch() is called without a limit; only the first three
        # resulting documents are printed below.
        print(f"\n开始转换数据(租户{tenant_id})...")
        documents = transformer.transform_batch()

        if not documents:
            print("⚠ 没有数据需要转换")
            return True

        print(f"✓ 转换完成: {len(documents)} 个文档")

        # Inspect the first three documents
        for i, doc in enumerate(documents[:3]):
            title_obj = _title_translations(doc)
            print(f"\n文档 {i+1}:")
            print(f" SPU ID: {doc.get('spu_id')}")
            print(f" Tenant ID: {doc.get('tenant_id')}")
            print(f" 标题 (中文): {title_obj.get('zh', 'N/A')}")
            print(f" 标题 (英文): {title_obj.get('en', 'N/A')}")

            # Tenant 162 has index_languages [zh, en], so title.en is expected.
            if tenant_id == "162":
                if title_obj.get("en"):
                    print(" ✓ 多语言索引正常(title.en 已填充)")
                else:
                    print(" ⚠ 警告:租户162 配置 [zh,en],但 title.en 为空")

        return True

    except Exception as e:
        _report_failure("全量索引测试失败", e)
        return False

    finally:
        # The engine was previously never released.
        _dispose_engine(db_engine)


def test_incremental_indexing(tenant_id: str = "162"):
    """Fetch one SPU document through the incremental indexer and print it.

    Args:
        tenant_id: Tenant from which one non-deleted SPU is picked.

    Returns:
        True when a document was fetched (or the tenant has no SPU rows);
        False when the database settings are unusable, the document could not
        be fetched, or an error occurred.
    """
    _print_banner(f"测试3: 增量索引(租户{tenant_id})")

    # Database settings come from the environment.
    db_settings = _load_db_settings()
    if db_settings is None:
        return False

    db_engine = None
    try:
        # Connect to the database
        db_engine = create_db_connection(**db_settings)

        # Create the incremental service
        service = IncrementalIndexerService(db_engine)
        print("✓ IncrementalIndexerService初始化成功")

        # Look up one SPU id to test with
        from sqlalchemy import text
        with db_engine.connect() as conn:
            query = text(
                " SELECT id FROM shoplazza_product_spu"
                " WHERE tenant_id = :tenant_id AND deleted = 0"
                " LIMIT 1 "
            )
            result = conn.execute(query, {"tenant_id": tenant_id})
            row = result.fetchone()

        if not row:
            print(f"⚠ 租户{tenant_id}没有数据,跳过增量测试")
            return True

        spu_id = str(row[0])
        print(f"\n测试SPU ID: {spu_id}")

        # Fetch the SPU document
        doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)

        if doc is None:
            print(f"✗ SPU {spu_id} 文档获取失败")
            return False

        title_obj = _title_translations(doc)
        print("✓ SPU文档获取成功")
        print(f" SPU ID: {doc.get('spu_id')}")
        print(f" Tenant ID: {doc.get('tenant_id')}")
        print(f" 标题 (中文): {title_obj.get('zh', 'N/A')}")
        print(f" 标题 (英文): {title_obj.get('en', 'N/A')}")
        print(f" SKU数量: {len(doc.get('skus', []))}")
        print(f" 规格数量: {len(doc.get('specifications', []))}")

        # Tenant 162 has index_languages [zh, en], so title.en is expected.
        if tenant_id == "162":
            if title_obj.get("en"):
                print(" ✓ 多语言索引正常(title.en 已填充)")
            else:
                print(" ⚠ 警告:租户162 配置 [zh,en],但 title.en 为空")

        return True

    except Exception as e:
        _report_failure("增量索引测试失败", e)
        return False

    finally:
        # The engine was previously never released.
        _dispose_engine(db_engine)


def test_document_transformer():
    """Transform one in-memory SPU with SPUDocumentTransformer and print it.

    No database is used: the SPU row, SKU table and option table are built
    from literal mock data, and tenant "162" supplies the configuration.

    Returns:
        True when the transformer returned a document; False when it returned
        a falsy value or an error occurred (including import failures of the
        lazily imported dependencies).
    """
    _print_banner("测试4: 文档转换器")

    try:
        # Imported lazily so a missing dependency is reported as a failed
        # test rather than preventing the other tests from running.
        import pandas as pd
        from indexer.document_transformer import SPUDocumentTransformer

        config = ConfigLoader().load_config()

        # Build the mock data
        spu_row = pd.Series({
            'id': 123,
            'tenant_id': '162',
            'title': '测试商品',
            'brief': '测试简介',
            'description': '测试描述',
            'vendor': '测试品牌',
            'category': '测试类目',
            'category_id': 100,
            'category_level': 1,
            'fake_sales': 1000,
            'image_src': 'https://example.com/image.jpg',
            'tags': '测试,标签',
            'create_time': pd.Timestamp.now(),
            'update_time': pd.Timestamp.now()
        })

        skus_df = pd.DataFrame([{
            'id': 456,
            'price': 99.99,
            'compare_at_price': 149.99,
            'sku': 'SKU001',
            'inventory_quantity': 100,
            'option1': '黑色',
            'option2': None,
            'option3': None
        }])

        options_df = pd.DataFrame([{
            'id': 1,
            'position': 1,
            'name': '颜色'
        }])

        # Load the tenant configuration
        tenant_config_loader = get_tenant_config_loader()
        tenant_config = tenant_config_loader.get_tenant_config('162')

        # The translator is always created in this test; which translations
        # actually happen is controlled by tenant_config.
        from query.translator import Translator
        translator = Translator(
            api_key=config.query_config.translation_api_key,
            use_cache=True
        )

        # Create the transformer
        transformer = SPUDocumentTransformer(
            category_id_to_name={},
            searchable_option_dimensions=['option1', 'option2', 'option3'],
            tenant_config=tenant_config,
            translator=translator,
            translation_prompts=config.query_config.translation_prompts
        )

        # Transform the document
        doc = transformer.transform_spu_to_doc(
            tenant_id='162',
            spu_row=spu_row,
            skus=skus_df,
            options=options_df
        )

        if not doc:
            print("✗ 文档转换失败")
            return False

        title_obj = _title_translations(doc)
        print("✓ 文档转换成功")
        print(f" title.zh: {title_obj.get('zh')}")
        print(f" title.en: {title_obj.get('en')}")
        print(f" SKU数量: {len(doc.get('skus', []))}")

        # Tenant 162: index_languages [zh, en] with zh as the primary
        # language, so zh (original) and en (translation) are expected.
        if title_obj.get("zh") and title_obj.get("en"):
            print(" ✓ 多语言字段正确(zh + en)")
        elif title_obj.get("zh"):
            print(" ⚠ 仅有 zh(若未配置翻译或翻译未调用可接受)")
        else:
            print(" ⚠ 未发现预期多语言字段")

        return True

    except Exception as e:
        _report_failure("文档转换器测试失败", e)
        return False


def main():
    """Run every test in order and print a pass/fail summary.

    Returns:
        0 when all tests passed, 1 otherwise (used as the process exit code).
    """
    _print_banner("索引功能完整测试", leading_blank=False)

    # List items are evaluated left to right, so the tests run in this order.
    results = [
        # Test 1: tenant configuration
        ("租户配置加载", test_tenant_config()),
        # Test 2: full indexing (tenant 162)
        ("全量索引(租户162)", test_full_indexing("162")),
        # Test 3: incremental indexing (tenant 162)
        ("增量索引(租户162)", test_incremental_indexing("162")),
        # Test 4: document transformer
        ("文档转换器", test_document_transformer()),
    ]

    # Summary
    _print_banner("测试总结")

    passed = sum(1 for _, result in results if result)
    total = len(results)

    for name, result in results:
        status = "✓ 通过" if result else "✗ 失败"
        print(f"{status}: {name}")

    print(f"\n总计: {passed}/{total} 通过")

    if passed == total:
        print("✓ 所有测试通过")
        return 0

    print("✗ 部分测试失败")
    return 1


if __name__ == '__main__':
    sys.exit(main())