# performance_test_http.py
import requests
import time
import statistics
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Tuple
import json


class EmbeddingPerformanceTester:
    """Benchmark an OpenAI-compatible ``/v1/embeddings`` HTTP endpoint.

    Supports sequential and concurrent runs and reports latency statistics
    (mean, min/max, p50/p95/p99, QPS) per model.
    """

    def __init__(self, base_url: str = "http://127.0.0.1:9997/v1"):
        """Initialize the tester.

        Args:
            base_url: Base URL of the API (the ``/embeddings`` path is appended).
        """
        self.base_url = base_url
        self.embeddings_url = f"{base_url}/embeddings"

    def test_single_request(self, model: str, input_text: List[str]) -> Tuple[bool, float]:
        """Send one embeddings request.

        Args:
            model: Model name to embed with.
            input_text: List of input strings (only the first element is sent),
                or a plain string.

        Returns:
            ``(success, elapsed_seconds)``. On transport errors the elapsed
            time is reported as 0.0.
        """
        try:
            start_time = time.perf_counter()

            # If input_text is a non-empty list, send its first element;
            # otherwise send the value as-is (e.g. a plain string).
            input_value = input_text[0] if isinstance(input_text, list) and len(input_text) > 0 else input_text

            response = requests.post(
                self.embeddings_url,
                headers={
                    'accept': 'application/json',
                    'Content-Type': 'application/json'
                },
                json={
                    "model": model,
                    "input": input_value
                },
                timeout=60  # guard against a hung server
            )

            end_time = time.perf_counter()

            # Only count as success if the body has the expected OpenAI
            # embeddings shape: a non-empty "data" list.
            if response.status_code == 200:
                result = response.json()
                ok = bool(result) and 'data' in result and len(result['data']) > 0
                return ok, end_time - start_time
            return False, end_time - start_time

        except Exception as e:
            print(f"请求失败 - 模型 {model}: {str(e)}")
            return False, 0.0

    def test_model_sequential(self, model: str, input_text: List[str],
                             iterations: int = 1000) -> Dict:
        """Run `iterations` requests one after another and return stats."""
        print(f"\n开始顺序测试模型: {model}")
        print(f"测试次数: {iterations}")

        successes = 0
        failures = 0
        latencies = []

        for i in range(iterations):
            # Progress heartbeat every 100 requests.
            if i % 100 == 0 and i > 0:
                print(f"  已完成 {i}/{iterations} 次请求...")

            success, latency = self.test_single_request(model, input_text)

            if success:
                successes += 1
                latencies.append(latency)
            else:
                failures += 1

        return self._calculate_stats(model, successes, failures, latencies)

    def test_model_concurrent(self, model: str, input_text: List[str],
                             iterations: int = 1000, max_workers: int = 10) -> Dict:
        """Run `iterations` requests through a thread pool and return stats.

        Args:
            max_workers: Number of concurrent worker threads.
        """
        print(f"\n开始并发测试模型: {model}")
        print(f"测试次数: {iterations}, 并发数: {max_workers}")

        successes = 0
        failures = 0
        latencies = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all requests up front; workers drain the queue.
            future_to_request = {
                executor.submit(self.test_single_request, model, input_text): i
                for i in range(iterations)
            }

            # Collect results as they complete (not in submission order).
            completed = 0
            for future in as_completed(future_to_request):
                completed += 1
                if completed % 100 == 0:
                    print(f"  已完成 {completed}/{iterations} 次请求...")

                try:
                    success, latency = future.result()
                    if success:
                        successes += 1
                        latencies.append(latency)
                    else:
                        failures += 1
                except Exception as e:
                    print(f"请求异常: {str(e)}")
                    failures += 1

        return self._calculate_stats(model, successes, failures, latencies)

    def _calculate_stats(self, model: str, successes: int,
                        failures: int, latencies: List[float]) -> Dict:
        """Aggregate latency samples into a stats dict.

        Returns an error record when there are no successful samples.
        Note: `requests_per_second` is computed as samples / summed latency,
        which reflects single-stream throughput even for concurrent runs.
        """
        if not latencies:
            return {
                "model": model,
                "total_requests": successes + failures,
                "successful_requests": successes,
                "failed_requests": failures,
                "success_rate": 0.0,
                "error": "无成功请求"
            }

        # Sort once; reuse for min/max and percentile lookups
        # (the original sorted twice, once per percentile).
        sorted_latencies = sorted(latencies)
        n = len(sorted_latencies)
        total_time = sum(sorted_latencies)

        stats = {
            "model": model,
            "total_requests": successes + failures,
            "successful_requests": successes,
            "failed_requests": failures,
            "success_rate": successes / (successes + failures) * 100,
            "total_time": total_time,
            "avg_latency": statistics.mean(sorted_latencies),
            "min_latency": sorted_latencies[0],
            "max_latency": sorted_latencies[-1],
            "p50_latency": statistics.median(sorted_latencies),
            # int(n * q) < n for q < 1, so these indexes are always in range.
            "p95_latency": sorted_latencies[int(n * 0.95)],
            "p99_latency": sorted_latencies[int(n * 0.99)],
            "requests_per_second": n / total_time if total_time > 0 else 0
        }

        # Standard deviation requires at least two samples.
        if n > 1:
            stats["std_dev"] = statistics.stdev(sorted_latencies)

        return stats

    def print_results(self, results: Dict):
        """Pretty-print a stats dict produced by `_calculate_stats`."""
        print("\n" + "="*60)
        print(f"性能测试结果 - {results['model']}")
        print("="*60)

        if "error" in results:
            print(f"错误: {results['error']}")
            return

        print(f"总请求数: {results['total_requests']}")
        print(f"成功请求: {results['successful_requests']}")
        print(f"失败请求: {results['failed_requests']}")
        print(f"成功率: {results['success_rate']:.2f}%")
        print(f"总耗时: {results['total_time']:.4f}秒")
        print(f"平均延迟: {results['avg_latency']:.4f}秒")
        print(f"最小延迟: {results['min_latency']:.4f}秒")
        print(f"最大延迟: {results['max_latency']:.4f}秒")
        print(f"P50延迟: {results['p50_latency']:.4f}秒")
        print(f"P95延迟: {results['p95_latency']:.4f}秒")
        print(f"P99延迟: {results['p99_latency']:.4f}秒")

        if "std_dev" in results:
            print(f"标准差: {results['std_dev']:.4f}秒")

        print(f"QPS: {results['requests_per_second']:.2f} 请求/秒")
        print("="*60)

    def save_results(self, results_list: List[Dict], filename: str = "performance_results.json"):
        """Save all result dicts to a JSON file (UTF-8, human-readable)."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(results_list, f, indent=2, ensure_ascii=False)
        # Bug fix: the original printed a literal placeholder instead of
        # the actual destination filename.
        print(f"\n结果已保存到: {filename}")


def main():
    """Interactive driver.

    Prompts for a test mode, runs the chosen mode(s) for each configured
    model, prints per-run results, saves everything to JSON, and prints a
    summary comparison.
    """
    tester = EmbeddingPerformanceTester()

    # Test configuration.
    test_input = ["What is the capital of China?"]
    iterations = 1000
    test_models = ['bge-m3', 'Qwen3-Embedding-0.6B']

    print("="*60)
    print("Embedding API 性能测试 (HTTP)")
    print("="*60)

    all_results = []

    # Mode selection.
    print("\n选择测试模式:")
    print("1. 顺序测试 (Sequential)")
    print("2. 并发测试 (Concurrent)")
    print("3. 两种模式都测试")

    mode = input("请输入选择 (1/2/3, 默认1): ").strip()
    # Bug fix: previously an unrecognized entry (e.g. "4") matched neither
    # branch and silently ran nothing. Fall back to the advertised default
    # (sequential) for empty or invalid input.
    if mode not in ('1', '2', '3'):
        mode = '1'

    for model in test_models:
        print(f"\n{'='*60}")
        print(f"测试模型: {model}")
        print(f"{'='*60}")

        if mode in ('2', '3'):
            # Concurrent run.
            concurrent_results = tester.test_model_concurrent(
                model=model,
                input_text=test_input,
                iterations=iterations,
                max_workers=10  # tune as needed
            )
            tester.print_results(concurrent_results)
            concurrent_results["test_mode"] = "concurrent"
            all_results.append(concurrent_results)

        if mode in ('1', '3'):
            # Sequential run.
            sequential_results = tester.test_model_sequential(
                model=model,
                input_text=test_input,
                iterations=iterations
            )
            tester.print_results(sequential_results)
            sequential_results["test_mode"] = "sequential"
            all_results.append(sequential_results)

    # Persist all runs.
    tester.save_results(all_results)

    # Summary comparison across models/modes.
    print("\n" + "="*60)
    print("性能测试汇总对比")
    print("="*60)

    for result in all_results:
        if "error" not in result:
            print(f"\n模型: {result['model']} ({result['test_mode']})")
            print(f"  QPS: {result['requests_per_second']:.2f}")
            print(f"  平均延迟: {result['avg_latency']:.4f}秒")
            print(f"  成功率: {result['success_rate']:.2f}%")


if __name__ == "__main__":
    # Lightweight health check before launching the full benchmark:
    # a single probe request must succeed, otherwise bail out early.
    try:
        tester = EmbeddingPerformanceTester()
        ok, _latency = tester.test_single_request('bge-m3', ["test"])
        if not ok:
            print("API连接失败,请检查服务是否正常运行")
        else:
            print("API连接正常,开始性能测试...")
            main()
    except Exception as e:
        print(f"初始化失败: {str(e)}")
        print("请确保 requests 库已安装: pip install requests")