Blame view

third-party/xinference/perfermance_test.py 8.97 KB
775db2b0   tangwang   xinfer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
  import openai
  import time
  import statistics
  from concurrent.futures import ThreadPoolExecutor, as_completed
  from typing import List, Dict, Tuple
  import json
  
  
  class EmbeddingPerformanceTester:
      def __init__(self, base_url: str = "http://127.0.0.1:9997/v1"):
          """初始化性能测试器"""
          self.client = openai.Client(
              api_key="cannot be empty",  # 实际使用时请替换为真实API Key
              base_url=base_url
          )
          
      def test_single_request(self, model: str, input_text: List[str]) -> Tuple[bool, float]:
          """测试单个请求,返回成功状态和耗时"""
          try:
              start_time = time.perf_counter()
              response = self.client.embeddings.create(
                  model=model,
                  input=input_text
              )
              end_time = time.perf_counter()
              
              # 验证响应格式
              if response and hasattr(response, 'data'):
                  return True, end_time - start_time
              else:
                  return False, end_time - start_time
                  
          except Exception as e:
              print(f"请求失败 - 模型 {model}: {str(e)}")
              return False, 0.0
      
      def test_model_sequential(self, model: str, input_text: List[str], 
                               iterations: int = 1000) -> Dict:
          """顺序执行性能测试"""
          print(f"\n开始顺序测试模型: {model}")
          print(f"测试次数: {iterations}")
          
          successes = 0
          failures = 0
          latencies = []
          
          for i in range(iterations):
              if i % 100 == 0 and i > 0:
                  print(f"  已完成 {i}/{iterations} 次请求...")
                  
              success, latency = self.test_single_request(model, input_text)
              
              if success:
                  successes += 1
                  latencies.append(latency)
              else:
                  failures += 1
          
          return self._calculate_stats(model, successes, failures, latencies)
      
      def test_model_concurrent(self, model: str, input_text: List[str],
                               iterations: int = 1000, max_workers: int = 10) -> Dict:
          """并发执行性能测试"""
          print(f"\n开始并发测试模型: {model}")
          print(f"测试次数: {iterations}, 并发数: {max_workers}")
          
          successes = 0
          failures = 0
          latencies = []
          
          with ThreadPoolExecutor(max_workers=max_workers) as executor:
              # 提交所有任务
              future_to_request = {
                  executor.submit(self.test_single_request, model, input_text): i
                  for i in range(iterations)
              }
              
              # 收集结果
              completed = 0
              for future in as_completed(future_to_request):
                  completed += 1
                  if completed % 100 == 0:
                      print(f"  已完成 {completed}/{iterations} 次请求...")
                  
                  try:
                      success, latency = future.result()
                      if success:
                          successes += 1
                          latencies.append(latency)
                      else:
                          failures += 1
                  except Exception as e:
                      print(f"请求异常: {str(e)}")
                      failures += 1
          
          return self._calculate_stats(model, successes, failures, latencies)
      
      def _calculate_stats(self, model: str, successes: int, 
                          failures: int, latencies: List[float]) -> Dict:
          """计算性能统计信息"""
          if not latencies:
              return {
                  "model": model,
                  "total_requests": successes + failures,
                  "successful_requests": successes,
                  "failed_requests": failures,
                  "success_rate": 0.0,
                  "error": "无成功请求"
              }
          
          stats = {
              "model": model,
              "total_requests": successes + failures,
              "successful_requests": successes,
              "failed_requests": failures,
              "success_rate": successes / (successes + failures) * 100,
              "total_time": sum(latencies),
              "avg_latency": statistics.mean(latencies),
              "min_latency": min(latencies),
              "max_latency": max(latencies),
              "p50_latency": statistics.median(latencies),
              "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)],
              "p99_latency": sorted(latencies)[int(len(latencies) * 0.99)],
              "requests_per_second": len(latencies) / sum(latencies) if sum(latencies) > 0 else 0
          }
          
          # 添加标准差(如果有多于一个样本)
          if len(latencies) > 1:
              stats["std_dev"] = statistics.stdev(latencies)
          
          return stats
      
      def print_results(self, results: Dict):
          """打印测试结果"""
          print("\n" + "="*60)
          print(f"性能测试结果 - {results['model']}")
          print("="*60)
          
          if "error" in results:
              print(f"错误: {results['error']}")
              return
              
          print(f"总请求数: {results['total_requests']}")
          print(f"成功请求: {results['successful_requests']}")
          print(f"失败请求: {results['failed_requests']}")
          print(f"成功率: {results['success_rate']:.2f}%")
          print(f"总耗时: {results['total_time']:.4f}秒")
          print(f"平均延迟: {results['avg_latency']:.4f}秒")
          print(f"最小延迟: {results['min_latency']:.4f}秒")
          print(f"最大延迟: {results['max_latency']:.4f}秒")
          print(f"P50延迟: {results['p50_latency']:.4f}秒")
          print(f"P95延迟: {results['p95_latency']:.4f}秒")
          print(f"P99延迟: {results['p99_latency']:.4f}秒")
          
          if "std_dev" in results:
              print(f"标准差: {results['std_dev']:.4f}秒")
          
          print(f"QPS: {results['requests_per_second']:.2f} 请求/秒")
          print("="*60)
      
      def save_results(self, results_list: List[Dict], filename: str = "performance_results.json"):
          """保存测试结果到JSON文件"""
          with open(filename, 'w', encoding='utf-8') as f:
              json.dump(results_list, f, indent=2, ensure_ascii=False)
          print(f"\n结果已保存到: {filename}")
  
  
  def main():
      """主函数"""
      # 初始化测试器
      tester = EmbeddingPerformanceTester()
      
      # 测试配置
      test_input = ["What is the capital of China?"]
      iterations = 1000
      test_models = ['bge-m3', 'Qwen3-Embedding-0.6B']
      
      print("="*60)
      print("Embedding API 性能测试")
      print("="*60)
      
      all_results = []
      
      # 测试模式选择
      print("\n选择测试模式:")
      print("1. 顺序测试 (Sequential)")
      print("2. 并发测试 (Concurrent)")
      print("3. 两种模式都测试")
      
      mode = input("请输入选择 (1/2/3, 默认1): ").strip()
      
      for model in test_models:
          print(f"\n{'='*60}")
          print(f"测试模型: {model}")
          print(f"{'='*60}")
          
          if mode in ['2', '3']:
              # 并发测试
              concurrent_results = tester.test_model_concurrent(
                  model=model,
                  input_text=test_input,
                  iterations=iterations,
                  max_workers=10  # 可根据需要调整并发数
              )
              tester.print_results(concurrent_results)
              concurrent_results["test_mode"] = "concurrent"
              all_results.append(concurrent_results)
          
          if mode in ['1', '3'] or not mode:
              # 顺序测试
              sequential_results = tester.test_model_sequential(
                  model=model,
                  input_text=test_input,
                  iterations=iterations
              )
              tester.print_results(sequential_results)
              sequential_results["test_mode"] = "sequential"
              all_results.append(sequential_results)
      
      # 保存结果
      tester.save_results(all_results)
      
      # 汇总对比
      print("\n" + "="*60)
      print("性能测试汇总对比")
      print("="*60)
      
      for result in all_results:
          if "error" not in result:
              print(f"\n模型: {result['model']} ({result['test_mode']})")
              print(f"  QPS: {result['requests_per_second']:.2f}")
              print(f"  平均延迟: {result['avg_latency']:.4f}秒")
              print(f"  成功率: {result['success_rate']:.2f}%")
  
  
  if __name__ == "__main__":
      # 添加一个简单的健康检查
      try:
          tester = EmbeddingPerformanceTester()
          # 快速测试连接
          test_result = tester.test_single_request('bge-m3', ["test"])
          if test_result[0]:
              print("API连接正常,开始性能测试...")
              main()
          else:
              print("API连接失败,请检查服务是否正常运行")
      except Exception as e:
          print(f"初始化失败: {str(e)}")
          print("请确保OpenAI客户端已安装: pip install openai")