#!/usr/bin/env python3
"""
Test execution script.

Runs the complete test pipeline, including:
- environment checks
- unit tests
- integration tests
- performance tests
- test report generation
"""

import os
import sys
import subprocess
import time
import json
import argparse
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime

# Make project modules importable when this script is run directly.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))


@dataclass
class TestResult:
    """Result of a single test case."""
    name: str
    status: str  # "passed", "failed", "skipped", "error"
    duration: float
    details: Optional[Dict[str, Any]] = None
    output: Optional[str] = None
    error: Optional[str] = None


@dataclass
class TestSuiteResult:
    """Aggregated result of one test suite."""
    name: str
    total_tests: int
    passed: int
    failed: int
    skipped: int
    errors: int
    duration: float
    results: List[TestResult]


class TestRunner:
    """Drives the test pipeline: environment checks, suites, and reporting."""

    def __init__(self, config: Dict[str, Any]):
        """Create a runner.

        config keys used: 'log_level' (str), 'test_timeout' (int, seconds),
        'skip_suites' (list of suite names to skip).
        """
        self.config = config
        self.logger = self._setup_logger()
        self.results: List[TestSuiteResult] = []
        self.start_time = time.time()

    def _setup_logger(self) -> logging.Logger:
        """Configure logging to both the console and a timestamped file."""
        log_level = getattr(logging, self.config.get('log_level', 'INFO').upper())
        # BUGFIX: the log directory must exist before FileHandler opens its
        # file; previously this raised FileNotFoundError on a fresh checkout
        # because the mkdir only happened later, in run_all_tests().
        log_dir = project_root / 'test_logs'
        log_dir.mkdir(exist_ok=True)
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(),
                logging.FileHandler(
                    log_dir / f'test_run_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
                )
            ]
        )
        return logging.getLogger(__name__)

    def _run_command(self, cmd: List[str], cwd: Optional[Path] = None,
                     env: Optional[Dict[str, str]] = None) -> subprocess.CompletedProcess:
        """Run an external command with captured output.

        Raises subprocess.TimeoutExpired when the configured 'test_timeout'
        is exceeded; re-raises any other execution failure after logging.
        """
        try:
            self.logger.info(f"执行命令: {' '.join(cmd)}")

            # Child inherits our environment, optionally overlaid with `env`.
            process_env = os.environ.copy()
            if env:
                process_env.update(env)

            result = subprocess.run(
                cmd,
                cwd=cwd or project_root,
                env=process_env,
                capture_output=True,
                text=True,
                timeout=self.config.get('test_timeout', 300)
            )

            self.logger.debug(f"命令返回码: {result.returncode}")
            if result.stdout:
                self.logger.debug(f"标准输出: {result.stdout[:500]}...")
            if result.stderr:
                self.logger.debug(f"标准错误: {result.stderr[:500]}...")

            return result
        except subprocess.TimeoutExpired:
            self.logger.error(f"命令执行超时: {' '.join(cmd)}")
            raise
        except Exception as e:
            self.logger.error(f"命令执行失败: {e}")
            raise

    def check_environment(self) -> bool:
        """Check test prerequisites; return True only if every check passes."""
        self.logger.info("检查测试环境...")

        checks: List[Tuple[str, bool, str]] = []

        # Python interpreter (always available here; kept for report symmetry).
        try:
            python_version = sys.version
            self.logger.info(f"Python版本: {python_version}")
            checks.append(("Python", True, f"版本 {python_version}"))
        except Exception as e:
            checks.append(("Python", False, str(e)))

        # conda availability.
        try:
            result = self._run_command(['conda', '--version'])
            if result.returncode == 0:
                conda_version = result.stdout.strip()
                self.logger.info(f"Conda版本: {conda_version}")
                checks.append(("Conda", True, conda_version))
            else:
                checks.append(("Conda", False, "未找到conda"))
        except Exception as e:
            checks.append(("Conda", False, str(e)))

        # Required packages. BUGFIX: PyYAML installs as package "pyyaml" but
        # imports as module "yaml" — the old `import pyyaml` check always
        # failed even when the package was installed. Map (label, module).
        required_packages = [
            ('pytest', 'pytest'),
            ('fastapi', 'fastapi'),
            ('elasticsearch', 'elasticsearch'),
            ('numpy', 'numpy'),
            ('torch', 'torch'),
            ('transformers', 'transformers'),
            ('pyyaml', 'yaml'),
        ]
        for package, module in required_packages:
            try:
                result = self._run_command(['python', '-c', f'import {module}'])
                if result.returncode == 0:
                    checks.append((package, True, "已安装"))
                else:
                    checks.append((package, False, "导入失败"))
            except Exception as e:
                checks.append((package, False, str(e)))

        # Elasticsearch cluster health via its REST endpoint.
        try:
            es_host = os.getenv('ES_HOST', 'http://localhost:9200')
            result = self._run_command(['curl', '-s', f'{es_host}/_cluster/health'])
            if result.returncode == 0:
                health_data = json.loads(result.stdout)
                status = health_data.get('status', 'unknown')
                self.logger.info(f"Elasticsearch状态: {status}")
                checks.append(("Elasticsearch", True, f"状态: {status}"))
            else:
                checks.append(("Elasticsearch", False, "连接失败"))
        except Exception as e:
            checks.append(("Elasticsearch", False, str(e)))

        # Search API health endpoint.
        try:
            api_host = os.getenv('API_HOST', '127.0.0.1')
            api_port = os.getenv('API_PORT', '6003')
            result = self._run_command(['curl', '-s', f'http://{api_host}:{api_port}/health'])
            if result.returncode == 0:
                health_data = json.loads(result.stdout)
                status = health_data.get('status', 'unknown')
                self.logger.info(f"API服务状态: {status}")
                checks.append(("API服务", True, f"状态: {status}"))
            else:
                checks.append(("API服务", False, "连接失败"))
        except Exception as e:
            checks.append(("API服务", False, str(e)))

        # Summarize results.
        self.logger.info("环境检查结果:")
        all_passed = True
        for name, passed, details in checks:
            status = "✓" if passed else "✗"
            self.logger.info(f"  {status} {name}: {details}")
            if not passed:
                all_passed = False

        return all_passed

    def _parse_pytest_report(self, report_path: Path,
                             suite_label: str) -> Tuple[int, int, int, int, List[TestResult]]:
        """Parse a pytest-json-report file.

        Returns (passed, failed, skipped, errors, per-test results). All
        counts are zero and the list empty if the report cannot be read —
        a missing/broken report is logged as a warning, not an error.
        """
        test_results: List[TestResult] = []
        passed = failed = skipped = errors = 0
        try:
            with open(report_path, 'r') as f:
                report_data = json.load(f)

            summary = report_data.get('summary', {})
            passed = summary.get('passed', 0)
            failed = summary.get('failed', 0)
            skipped = summary.get('skipped', 0)
            errors = summary.get('error', 0)

            for test in report_data.get('tests', []):
                test_results.append(TestResult(
                    name=test.get('nodeid', ''),
                    status=test.get('outcome', 'unknown'),
                    duration=test.get('duration', 0.0),
                    details=test
                ))
        except Exception as e:
            self.logger.warning(f"无法解析{suite_label}JSON报告: {e}")
        return passed, failed, skipped, errors, test_results

    def _run_pytest_suite(self, display_name: str, pytest_args: List[str],
                          report_filename: str) -> TestSuiteResult:
        """Shared driver for the pytest-based suites (unit/integration/API).

        Runs pytest with a JSON report (requires the pytest-json-report
        plugin), parses the report, records and returns a TestSuiteResult.
        """
        self.logger.info(f"运行{display_name}...")
        start_time = time.time()

        cmd = [
            'python', '-m', 'pytest',
            *pytest_args,
            '--json-report',
            f'--json-report-file=test_logs/{report_filename}'
        ]

        try:
            # pytest's exit code is intentionally ignored here; pass/fail
            # counts come from the JSON report instead.
            self._run_command(cmd)
            duration = time.time() - start_time

            passed, failed, skipped, errors, test_results = self._parse_pytest_report(
                project_root / 'test_logs' / report_filename, display_name)

            suite_result = TestSuiteResult(
                name=display_name,
                total_tests=passed + failed + skipped + errors,
                passed=passed,
                failed=failed,
                skipped=skipped,
                errors=errors,
                duration=duration,
                results=test_results
            )

            self.results.append(suite_result)
            self.logger.info(f"{display_name}完成: {suite_result.total_tests}个测试, "
                             f"{suite_result.passed}通过, {suite_result.failed}失败, "
                             f"{suite_result.skipped}跳过, {suite_result.errors}错误")
            return suite_result
        except Exception as e:
            self.logger.error(f"{display_name}执行失败: {e}")
            raise

    def run_unit_tests(self) -> TestSuiteResult:
        """Run the unit test suite under tests/unit/."""
        return self._run_pytest_suite(
            "单元测试",
            ['tests/unit/', '-v', '--tb=short'],
            'unit_test_results.json'
        )

    def run_integration_tests(self) -> TestSuiteResult:
        """Run the integration suite, excluding tests marked 'slow'."""
        return self._run_pytest_suite(
            "集成测试",
            ['tests/integration/', '-v', '--tb=short', '-m', 'not slow'],
            'integration_test_results.json'
        )

    def run_api_tests(self) -> TestSuiteResult:
        """Run the API integration test module."""
        return self._run_pytest_suite(
            "API测试",
            ['tests/integration/test_api_integration.py', '-v', '--tb=short'],
            'api_test_results.json'
        )

    def run_performance_tests(self) -> TestSuiteResult:
        """Smoke-level latency check: each search must answer within 2000 ms."""
        self.logger.info("运行性能测试...")
        start_time = time.time()

        # Representative search queries, including one boolean expression.
        test_queries = [
            "红色连衣裙",
            "智能手机",
            "笔记本电脑 AND (游戏 OR 办公)",
            "无线蓝牙耳机"
        ]

        test_results: List[TestResult] = []
        passed = failed = 0
        # Hoisted out of the loop: the endpoint does not change per query.
        api_host = os.getenv("API_HOST", "127.0.0.1")
        api_port = os.getenv("API_PORT", "6003")

        for query in test_queries:
            try:
                query_start = time.time()
                result = self._run_command([
                    'curl', '-s',
                    f'http://{api_host}:{api_port}/search',
                    '-d', f'q={query}'
                ])
                query_duration = time.time() - query_start

                if result.returncode == 0:
                    response_data = json.loads(result.stdout)
                    took_ms = response_data.get('took_ms', 0)

                    # Performance threshold: server-side time <= 2 seconds.
                    if took_ms <= 2000:
                        test_results.append(TestResult(
                            name=f"搜索性能测试: {query}",
                            status="passed",
                            duration=query_duration,
                            details={"took_ms": took_ms,
                                     "response_size": len(result.stdout)}
                        ))
                        passed += 1
                    else:
                        test_results.append(TestResult(
                            name=f"搜索性能测试: {query}",
                            status="failed",
                            duration=query_duration,
                            details={"took_ms": took_ms, "threshold": 2000}
                        ))
                        failed += 1
                else:
                    test_results.append(TestResult(
                        name=f"搜索性能测试: {query}",
                        status="failed",
                        duration=query_duration,
                        error=result.stderr
                    ))
                    failed += 1
            except Exception as e:
                test_results.append(TestResult(
                    name=f"搜索性能测试: {query}",
                    status="error",
                    duration=0.0,
                    error=str(e)
                ))
                failed += 1

        duration = time.time() - start_time

        suite_result = TestSuiteResult(
            name="性能测试",
            total_tests=len(test_results),
            passed=passed,
            failed=failed,
            skipped=0,
            errors=0,
            duration=duration,
            results=test_results
        )

        self.results.append(suite_result)
        self.logger.info(f"性能测试完成: {suite_result.total_tests}个测试, "
                         f"{suite_result.passed}通过, {suite_result.failed}失败")
        return suite_result

    def generate_report(self) -> str:
        """Write JSON + text reports to test_logs/ and return the text report."""
        self.logger.info("生成测试报告...")

        # Overall statistics across all recorded suites.
        total_tests = sum(suite.total_tests for suite in self.results)
        total_passed = sum(suite.passed for suite in self.results)
        total_failed = sum(suite.failed for suite in self.results)
        total_skipped = sum(suite.skipped for suite in self.results)
        total_errors = sum(suite.errors for suite in self.results)
        total_duration = sum(suite.duration for suite in self.results)

        report_data = {
            "timestamp": datetime.now().isoformat(),
            "summary": {
                "total_tests": total_tests,
                "passed": total_passed,
                "failed": total_failed,
                "skipped": total_skipped,
                "errors": total_errors,
                "success_rate": (total_passed / total_tests * 100) if total_tests > 0 else 0,
                "total_duration": total_duration
            },
            "suites": [asdict(suite) for suite in self.results]
        }

        # BUGFIX: compute the timestamp once so the .json and .txt reports
        # always share the same filename stamp (previously two now() calls
        # could straddle a second boundary and produce mismatched names).
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        report_file = project_root / 'test_logs' / f'test_report_{stamp}.json'
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)

        text_report = self._generate_text_report(report_data)
        report_file_text = project_root / 'test_logs' / f'test_report_{stamp}.txt'
        with open(report_file_text, 'w', encoding='utf-8') as f:
            f.write(text_report)

        self.logger.info(f"测试报告已保存: {report_file}")
        self.logger.info(f"文本报告已保存: {report_file_text}")

        return text_report

    def _generate_text_report(self, report_data: Dict[str, Any]) -> str:
        """Render the report dict (shape produced by generate_report) as text."""
        lines = []

        # Header.
        lines.append("=" * 60)
        lines.append("搜索引擎测试报告")
        lines.append("=" * 60)
        lines.append(f"时间: {report_data['timestamp']}")
        lines.append("")

        # Summary.
        summary = report_data['summary']
        lines.append("测试摘要")
        lines.append("-" * 30)
        lines.append(f"总测试数: {summary['total_tests']}")
        lines.append(f"通过: {summary['passed']}")
        lines.append(f"失败: {summary['failed']}")
        lines.append(f"跳过: {summary['skipped']}")
        lines.append(f"错误: {summary['errors']}")
        lines.append(f"成功率: {summary['success_rate']:.1f}%")
        lines.append(f"总耗时: {summary['total_duration']:.2f}秒")
        lines.append("")

        # Per-suite details.
        lines.append("测试套件详情")
        lines.append("-" * 30)
        for suite in report_data['suites']:
            lines.append(f"\n{suite['name']}:")
            lines.append(f"  总数: {suite['total_tests']}, 通过: {suite['passed']}, "
                         f"失败: {suite['failed']}, 跳过: {suite['skipped']}, 错误: {suite['errors']}")
            lines.append(f"  耗时: {suite['duration']:.2f}秒")

            # List failing tests, capped at five.
            failed_tests = [r for r in suite['results'] if r['status'] in ['failed', 'error']]
            if failed_tests:
                lines.append("  失败的测试:")
                for test in failed_tests[:5]:
                    lines.append(f"    - {test['name']}: {test['status']}")
                    if test.get('error'):
                        lines.append(f"      错误: {test['error'][:100]}...")
                if len(failed_tests) > 5:
                    lines.append(f"    ... 还有 {len(failed_tests) - 5} 个失败的测试")

        return "\n".join(lines)

    def run_all_tests(self) -> bool:
        """Run the whole pipeline; return True when no suite had failures."""
        try:
            # Ensure the log directory exists.
            (project_root / 'test_logs').mkdir(exist_ok=True)

            # NOTE(review): running the env script in a child bash cannot
            # export variables into this process; this is best-effort only.
            env_file = project_root / 'test_env.sh'
            if env_file.exists():
                self.logger.info("加载测试环境变量...")
                result = self._run_command(['bash', str(env_file)])
                if result.returncode != 0:
                    self.logger.warning("环境变量加载失败,继续使用默认配置")

            # Abort early if the environment is not ready.
            if not self.check_environment():
                self.logger.error("环境检查失败,请先启动测试环境")
                return False

            # Suites in execution order; each may be skipped via config.
            test_suites = [
                ("unit", self.run_unit_tests),
                ("integration", self.run_integration_tests),
                ("api", self.run_api_tests),
                ("performance", self.run_performance_tests)
            ]

            failed_suites = []
            for suite_name, suite_func in test_suites:
                if suite_name in self.config.get('skip_suites', []):
                    self.logger.info(f"跳过 {suite_name} 测试")
                    continue

                try:
                    suite_result = suite_func()
                    if suite_result.failed > 0 or suite_result.errors > 0:
                        failed_suites.append(suite_name)
                except Exception as e:
                    # A crashing suite counts as failed; keep running the rest.
                    self.logger.error(f"{suite_name} 测试执行失败: {e}")
                    failed_suites.append(suite_name)

            report = self.generate_report()
            print(report)

            return len(failed_suites) == 0
        except Exception as e:
            self.logger.error(f"测试执行失败: {e}")
            return False


def main():
    """CLI entry point; returns a process exit code (0 = all tests passed)."""
    parser = argparse.ArgumentParser(description="运行搜索引擎测试流水线")
    parser.add_argument('--skip-suites', nargs='+',
                        choices=['unit', 'integration', 'api', 'performance'],
                        help='跳过指定的测试套件')
    parser.add_argument('--log-level',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        default='INFO', help='日志级别')
    parser.add_argument('--test-timeout', type=int, default=300,
                        help='单个测试超时时间(秒)')
    parser.add_argument('--start-env', action='store_true',
                        help='启动测试环境后运行测试')
    parser.add_argument('--stop-env', action='store_true',
                        help='测试完成后停止测试环境')

    args = parser.parse_args()

    config = {
        'skip_suites': args.skip_suites or [],
        'log_level': args.log_level,
        'test_timeout': args.test_timeout
    }

    # Optionally bring up the test environment first.
    if args.start_env:
        print("启动测试环境...")
        result = subprocess.run([
            'bash',
            str(project_root / 'scripts' / 'start_test_environment.sh')
        ], capture_output=True, text=True)

        if result.returncode != 0:
            print(f"测试环境启动失败: {result.stderr}")
            return 1

        print("测试环境启动成功")
        time.sleep(5)  # give services a moment to finish starting

    # BUGFIX: defined before the try block so the return after the finally
    # can never reference an unbound local, whatever happens inside try.
    return_code = 1
    try:
        runner = TestRunner(config)
        success = runner.run_all_tests()

        if success:
            print("\n🎉 所有测试通过!")
            return_code = 0
        else:
            print("\n❌ 部分测试失败,请查看日志")
            return_code = 1
    finally:
        # Optionally tear the environment down even when tests crashed.
        if args.stop_env:
            print("\n停止测试环境...")
            subprocess.run([
                'bash',
                str(project_root / 'scripts' / 'stop_test_environment.sh')
            ])

    return return_code


if __name__ == "__main__":
    sys.exit(main())