import requests import time import statistics from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Tuple import json class EmbeddingPerformanceTester: def __init__(self, base_url: str = "http://127.0.0.1:9997/v1"): """初始化性能测试器""" self.base_url = base_url self.embeddings_url = f"{base_url}/embeddings" def test_single_request(self, model: str, input_text: List[str]) -> Tuple[bool, float]: """测试单个请求,返回成功状态和耗时""" try: start_time = time.perf_counter() # 构建请求体 # 如果 input_text 是列表,取第一个元素;如果是字符串,直接使用 input_value = input_text[0] if isinstance(input_text, list) and len(input_text) > 0 else input_text response = requests.post( self.embeddings_url, headers={ 'accept': 'application/json', 'Content-Type': 'application/json' }, json={ "model": model, "input": input_value }, timeout=60 # 设置超时时间 ) end_time = time.perf_counter() # 验证响应格式 if response.status_code == 200: result = response.json() if result and 'data' in result and len(result['data']) > 0: return True, end_time - start_time else: return False, end_time - start_time else: return False, end_time - start_time except Exception as e: print(f"请求失败 - 模型 {model}: {str(e)}") return False, 0.0 def test_model_sequential(self, model: str, input_text: List[str], iterations: int = 1000) -> Dict: """顺序执行性能测试""" print(f"\n开始顺序测试模型: {model}") print(f"测试次数: {iterations}") successes = 0 failures = 0 latencies = [] for i in range(iterations): if i % 100 == 0 and i > 0: print(f" 已完成 {i}/{iterations} 次请求...") success, latency = self.test_single_request(model, input_text) if success: successes += 1 latencies.append(latency) else: failures += 1 return self._calculate_stats(model, successes, failures, latencies) def test_model_concurrent(self, model: str, input_text: List[str], iterations: int = 1000, max_workers: int = 10) -> Dict: """并发执行性能测试""" print(f"\n开始并发测试模型: {model}") print(f"测试次数: {iterations}, 并发数: {max_workers}") successes = 0 failures = 0 latencies = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有任务 future_to_request = { executor.submit(self.test_single_request, model, input_text): i for i in range(iterations) } # 收集结果 completed = 0 for future in as_completed(future_to_request): completed += 1 if completed % 100 == 0: print(f" 已完成 {completed}/{iterations} 次请求...") try: success, latency = future.result() if success: successes += 1 latencies.append(latency) else: failures += 1 except Exception as e: print(f"请求异常: {str(e)}") failures += 1 return self._calculate_stats(model, successes, failures, latencies) def _calculate_stats(self, model: str, successes: int, failures: int, latencies: List[float]) -> Dict: """计算性能统计信息""" if not latencies: return { "model": model, "total_requests": successes + failures, "successful_requests": successes, "failed_requests": failures, "success_rate": 0.0, "error": "无成功请求" } stats = { "model": model, "total_requests": successes + failures, "successful_requests": successes, "failed_requests": failures, "success_rate": successes / (successes + failures) * 100, "total_time": sum(latencies), "avg_latency": statistics.mean(latencies), "min_latency": min(latencies), "max_latency": max(latencies), "p50_latency": statistics.median(latencies), "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)], "p99_latency": sorted(latencies)[int(len(latencies) * 0.99)], "requests_per_second": len(latencies) / sum(latencies) if sum(latencies) > 0 else 0 } # 添加标准差(如果有多于一个样本) if len(latencies) > 1: stats["std_dev"] = statistics.stdev(latencies) return stats def print_results(self, results: Dict): """打印测试结果""" print("\n" + "="*60) print(f"性能测试结果 - {results['model']}") print("="*60) if "error" in results: print(f"错误: {results['error']}") return print(f"总请求数: {results['total_requests']}") print(f"成功请求: {results['successful_requests']}") print(f"失败请求: {results['failed_requests']}") print(f"成功率: {results['success_rate']:.2f}%") print(f"总耗时: {results['total_time']:.4f}秒") print(f"平均延迟: {results['avg_latency']:.4f}秒") print(f"最小延迟: {results['min_latency']:.4f}秒") print(f"最大延迟: {results['max_latency']:.4f}秒") print(f"P50延迟: {results['p50_latency']:.4f}秒") print(f"P95延迟: {results['p95_latency']:.4f}秒") print(f"P99延迟: {results['p99_latency']:.4f}秒") if "std_dev" in results: print(f"标准差: {results['std_dev']:.4f}秒") print(f"QPS: {results['requests_per_second']:.2f} 请求/秒") print("="*60) def save_results(self, results_list: List[Dict], filename: str = "performance_results.json"): """保存测试结果到JSON文件""" with open(filename, 'w', encoding='utf-8') as f: json.dump(results_list, f, indent=2, ensure_ascii=False) print(f"\n结果已保存到: {filename}") def main(): """主函数""" # 初始化测试器 tester = EmbeddingPerformanceTester() # 测试配置 test_input = ["What is the capital of China?"] iterations = 1000 test_models = ['bge-m3', 'Qwen3-Embedding-0.6B'] print("="*60) print("Embedding API 性能测试 (HTTP)") print("="*60) all_results = [] # 测试模式选择 print("\n选择测试模式:") print("1. 顺序测试 (Sequential)") print("2. 并发测试 (Concurrent)") print("3. 两种模式都测试") mode = input("请输入选择 (1/2/3, 默认1): ").strip() for model in test_models: print(f"\n{'='*60}") print(f"测试模型: {model}") print(f"{'='*60}") if mode in ['2', '3']: # 并发测试 concurrent_results = tester.test_model_concurrent( model=model, input_text=test_input, iterations=iterations, max_workers=10 # 可根据需要调整并发数 ) tester.print_results(concurrent_results) concurrent_results["test_mode"] = "concurrent" all_results.append(concurrent_results) if mode in ['1', '3'] or not mode: # 顺序测试 sequential_results = tester.test_model_sequential( model=model, input_text=test_input, iterations=iterations ) tester.print_results(sequential_results) sequential_results["test_mode"] = "sequential" all_results.append(sequential_results) # 保存结果 tester.save_results(all_results) # 汇总对比 print("\n" + "="*60) print("性能测试汇总对比") print("="*60) for result in all_results: if "error" not in result: print(f"\n模型: {result['model']} ({result['test_mode']})") print(f" QPS: {result['requests_per_second']:.2f}") print(f" 平均延迟: {result['avg_latency']:.4f}秒") print(f" 成功率: {result['success_rate']:.2f}%") if __name__ == "__main__": # 添加一个简单的健康检查 try: tester = EmbeddingPerformanceTester() # 快速测试连接 test_result = tester.test_single_request('bge-m3', ["test"]) if test_result[0]: print("API连接正常,开始性能测试...") main() else: print("API连接失败,请检查服务是否正常运行") except Exception as e: print(f"初始化失败: {str(e)}") print("请确保 requests 库已安装: pip install requests")