16c42787
tangwang
feat: implement r...
|
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
|
python_version = sys.version
self.logger.info(f"Python版本: {python_version}")
checks.append(("Python", True, f"版本 {python_version}"))
except Exception as e:
checks.append(("Python", False, str(e)))
# 检查conda环境
try:
result = self._run_command(['conda', '--version'])
if result.returncode == 0:
conda_version = result.stdout.strip()
self.logger.info(f"Conda版本: {conda_version}")
checks.append(("Conda", True, conda_version))
else:
checks.append(("Conda", False, "未找到conda"))
except Exception as e:
checks.append(("Conda", False, str(e)))
# 检查依赖包
required_packages = [
'pytest', 'fastapi', 'elasticsearch', 'numpy',
'torch', 'transformers', 'pyyaml'
]
for package in required_packages:
try:
result = self._run_command(['python', '-c', f'import {package}'])
if result.returncode == 0:
checks.append((package, True, "已安装"))
else:
checks.append((package, False, "导入失败"))
except Exception as e:
checks.append((package, False, str(e)))
# 检查Elasticsearch
try:
es_host = os.getenv('ES_HOST', 'http://localhost:9200')
result = self._run_command(['curl', '-s', f'{es_host}/_cluster/health'])
if result.returncode == 0:
health_data = json.loads(result.stdout)
status = health_data.get('status', 'unknown')
self.logger.info(f"Elasticsearch状态: {status}")
checks.append(("Elasticsearch", True, f"状态: {status}"))
else:
checks.append(("Elasticsearch", False, "连接失败"))
except Exception as e:
checks.append(("Elasticsearch", False, str(e)))
# 检查API服务
try:
api_host = os.getenv('API_HOST', '127.0.0.1')
api_port = os.getenv('API_PORT', '6003')
result = self._run_command(['curl', '-s', f'http://{api_host}:{api_port}/health'])
if result.returncode == 0:
health_data = json.loads(result.stdout)
status = health_data.get('status', 'unknown')
self.logger.info(f"API服务状态: {status}")
checks.append(("API服务", True, f"状态: {status}"))
else:
checks.append(("API服务", False, "连接失败"))
except Exception as e:
checks.append(("API服务", False, str(e)))
# 输出检查结果
self.logger.info("环境检查结果:")
all_passed = True
for name, passed, details in checks:
status = "✓" if passed else "✗"
self.logger.info(f" {status} {name}: {details}")
if not passed:
all_passed = False
return all_passed
def run_unit_tests(self) -> TestSuiteResult:
    """Run the unit test suite via pytest and record the outcome.

    Invokes pytest on ``tests/unit/`` with the pytest-json-report plugin,
    parses the JSON report for per-test details, appends the aggregated
    :class:`TestSuiteResult` to ``self.results`` and returns it.

    Returns:
        TestSuiteResult: aggregated pass/fail/skip/error counts plus
        per-test results (empty counts if the JSON report is unreadable).

    Raises:
        Exception: propagated when the pytest subprocess cannot be run.
    """
    self.logger.info("运行单元测试...")
    start_time = time.time()
    cmd = [
        'python', '-m', 'pytest',
        'tests/unit/',
        '-v',
        '--tb=short',
        '--json-report',
        '--json-report-file=test_logs/unit_test_results.json'
    ]
    try:
        # Pass/fail is derived from the JSON report below, not the exit
        # code, so the subprocess result itself is not needed here
        # (the previously computed `status`/`total` locals were unused).
        self._run_command(cmd)
        duration = time.time() - start_time
        # Parse the JSON report; counts default to zero when the report
        # is missing or malformed so reporting still proceeds.
        test_results = []
        passed = failed = skipped = errors = 0
        try:
            with open(project_root / 'test_logs' / 'unit_test_results.json', 'r') as f:
                report_data = json.load(f)
            summary = report_data.get('summary', {})
            passed = summary.get('passed', 0)
            failed = summary.get('failed', 0)
            skipped = summary.get('skipped', 0)
            errors = summary.get('error', 0)
            # Collect per-test details from the report.
            for test in report_data.get('tests', []):
                test_results.append(TestResult(
                    name=test.get('nodeid', ''),
                    status=test.get('outcome', 'unknown'),
                    duration=test.get('duration', 0.0),
                    details=test
                ))
        except Exception as e:
            self.logger.warning(f"无法解析单元测试JSON报告: {e}")
        suite_result = TestSuiteResult(
            name="单元测试",
            total_tests=passed + failed + skipped + errors,
            passed=passed,
            failed=failed,
            skipped=skipped,
            errors=errors,
            duration=duration,
            results=test_results
        )
        self.results.append(suite_result)
        self.logger.info(f"单元测试完成: {suite_result.total_tests}个测试, "
                         f"{suite_result.passed}通过, {suite_result.failed}失败, "
                         f"{suite_result.skipped}跳过, {suite_result.errors}错误")
        return suite_result
    except Exception as e:
        self.logger.error(f"单元测试执行失败: {e}")
        raise
def run_integration_tests(self) -> TestSuiteResult:
    """Run the integration test suite via pytest and record the outcome.

    Invokes pytest on ``tests/integration/`` (excluding tests marked
    ``slow``) with the pytest-json-report plugin, parses the JSON report
    for per-test details, appends the aggregated
    :class:`TestSuiteResult` to ``self.results`` and returns it.

    Returns:
        TestSuiteResult: aggregated pass/fail/skip/error counts plus
        per-test results (empty counts if the JSON report is unreadable).

    Raises:
        Exception: propagated when the pytest subprocess cannot be run.
    """
    self.logger.info("运行集成测试...")
    start_time = time.time()
    cmd = [
        'python', '-m', 'pytest',
        'tests/integration/',
        '-v',
        '--tb=short',
        '-m', 'not slow',  # exclude slow tests
        '--json-report',
        '--json-report-file=test_logs/integration_test_results.json'
    ]
    try:
        # Pass/fail is derived from the JSON report below, not the exit
        # code (the previously computed `status`/`total` locals were unused).
        self._run_command(cmd)
        duration = time.time() - start_time
        # Parse the JSON report; counts default to zero when the report
        # is missing or malformed so reporting still proceeds.
        test_results = []
        passed = failed = skipped = errors = 0
        try:
            with open(project_root / 'test_logs' / 'integration_test_results.json', 'r') as f:
                report_data = json.load(f)
            summary = report_data.get('summary', {})
            passed = summary.get('passed', 0)
            failed = summary.get('failed', 0)
            skipped = summary.get('skipped', 0)
            errors = summary.get('error', 0)
            for test in report_data.get('tests', []):
                test_results.append(TestResult(
                    name=test.get('nodeid', ''),
                    status=test.get('outcome', 'unknown'),
                    duration=test.get('duration', 0.0),
                    details=test
                ))
        except Exception as e:
            self.logger.warning(f"无法解析集成测试JSON报告: {e}")
        suite_result = TestSuiteResult(
            name="集成测试",
            total_tests=passed + failed + skipped + errors,
            passed=passed,
            failed=failed,
            skipped=skipped,
            errors=errors,
            duration=duration,
            results=test_results
        )
        self.results.append(suite_result)
        self.logger.info(f"集成测试完成: {suite_result.total_tests}个测试, "
                         f"{suite_result.passed}通过, {suite_result.failed}失败, "
                         f"{suite_result.skipped}跳过, {suite_result.errors}错误")
        return suite_result
    except Exception as e:
        self.logger.error(f"集成测试执行失败: {e}")
        raise
def run_api_tests(self) -> TestSuiteResult:
    """Run the API integration tests via pytest and record the outcome.

    Invokes pytest on ``tests/integration/test_api_integration.py`` with
    the pytest-json-report plugin, parses the JSON report for per-test
    details, appends the aggregated :class:`TestSuiteResult` to
    ``self.results`` and returns it.

    Returns:
        TestSuiteResult: aggregated pass/fail/skip/error counts plus
        per-test results (empty counts if the JSON report is unreadable).

    Raises:
        Exception: propagated when the pytest subprocess cannot be run.
    """
    self.logger.info("运行API测试...")
    start_time = time.time()
    cmd = [
        'python', '-m', 'pytest',
        'tests/integration/test_api_integration.py',
        '-v',
        '--tb=short',
        '--json-report',
        '--json-report-file=test_logs/api_test_results.json'
    ]
    try:
        # Pass/fail is derived from the JSON report below, not the exit
        # code (the previously computed `status`/`total` locals were unused).
        self._run_command(cmd)
        duration = time.time() - start_time
        # Parse the JSON report; counts default to zero when the report
        # is missing or malformed so reporting still proceeds.
        test_results = []
        passed = failed = skipped = errors = 0
        try:
            with open(project_root / 'test_logs' / 'api_test_results.json', 'r') as f:
                report_data = json.load(f)
            summary = report_data.get('summary', {})
            passed = summary.get('passed', 0)
            failed = summary.get('failed', 0)
            skipped = summary.get('skipped', 0)
            errors = summary.get('error', 0)
            for test in report_data.get('tests', []):
                test_results.append(TestResult(
                    name=test.get('nodeid', ''),
                    status=test.get('outcome', 'unknown'),
                    duration=test.get('duration', 0.0),
                    details=test
                ))
        except Exception as e:
            self.logger.warning(f"无法解析API测试JSON报告: {e}")
        suite_result = TestSuiteResult(
            name="API测试",
            total_tests=passed + failed + skipped + errors,
            passed=passed,
            failed=failed,
            skipped=skipped,
            errors=errors,
            duration=duration,
            results=test_results
        )
        self.results.append(suite_result)
        self.logger.info(f"API测试完成: {suite_result.total_tests}个测试, "
                         f"{suite_result.passed}通过, {suite_result.failed}失败, "
                         f"{suite_result.skipped}跳过, {suite_result.errors}错误")
        return suite_result
    except Exception as e:
        self.logger.error(f"API测试执行失败: {e}")
        raise
def run_performance_tests(self) -> TestSuiteResult:
    """Smoke-test search latency against the running API service.

    Posts a handful of representative queries to the ``/search`` endpoint
    via curl and fails any query whose reported ``took_ms`` exceeds
    2000 ms. Appends the aggregated :class:`TestSuiteResult` to
    ``self.results`` and returns it.

    Returns:
        TestSuiteResult: per-query pass/fail/error counts.
    """
    # Local stdlib import keeps the module-level import block untouched.
    from urllib.parse import urlencode

    self.logger.info("运行性能测试...")
    start_time = time.time()
    # Simple performance test - measure search response time.
    test_queries = [
        "红色连衣裙",
        "智能手机",
        "笔记本电脑 AND (游戏 OR 办公)",
        "无线蓝牙耳机"
    ]
    # Hoist the loop-invariant endpoint URL out of the query loop.
    search_url = (f'http://{os.getenv("API_HOST", "127.0.0.1")}'
                  f':{os.getenv("API_PORT", "6003")}/search')
    test_results = []
    passed = failed = errors = 0
    for query in test_queries:
        try:
            query_start = time.time()
            # Bug fix: form-encode the payload. Previously the raw query
            # (containing spaces, parentheses and CJK text) was passed
            # unescaped to `curl -d`, which the server may misparse.
            result = self._run_command([
                'curl', '-s', search_url,
                '-d', urlencode({'q': query})
            ])
            query_duration = time.time() - query_start
            if result.returncode == 0:
                response_data = json.loads(result.stdout)
                took_ms = response_data.get('took_ms', 0)
                # Performance threshold: response time must not exceed 2s.
                if took_ms <= 2000:
                    test_results.append(TestResult(
                        name=f"搜索性能测试: {query}",
                        status="passed",
                        duration=query_duration,
                        details={"took_ms": took_ms, "response_size": len(result.stdout)}
                    ))
                    passed += 1
                else:
                    test_results.append(TestResult(
                        name=f"搜索性能测试: {query}",
                        status="failed",
                        duration=query_duration,
                        details={"took_ms": took_ms, "threshold": 2000}
                    ))
                    failed += 1
            else:
                test_results.append(TestResult(
                    name=f"搜索性能测试: {query}",
                    status="failed",
                    duration=query_duration,
                    error=result.stderr
                ))
                failed += 1
        except Exception as e:
            # Bug fix: exceptions were previously counted as failures while
            # the suite reported errors=0, making the suite totals disagree
            # with the per-test "error" statuses. Count them as errors.
            test_results.append(TestResult(
                name=f"搜索性能测试: {query}",
                status="error",
                duration=0.0,
                error=str(e)
            ))
            errors += 1
    duration = time.time() - start_time
    suite_result = TestSuiteResult(
        name="性能测试",
        total_tests=len(test_results),
        passed=passed,
        failed=failed,
        skipped=0,
        errors=errors,
        duration=duration,
        results=test_results
    )
    self.results.append(suite_result)
    self.logger.info(f"性能测试完成: {suite_result.total_tests}个测试, "
                     f"{suite_result.passed}通过, {suite_result.failed}失败")
    return suite_result
def generate_report(self) -> str:
    """Aggregate all suite results into JSON and text reports.

    Computes overall statistics across every suite in ``self.results``,
    writes a timestamped JSON report and a matching text report into
    ``test_logs/``, and returns the text report.

    Returns:
        str: the human-readable text report.
    """
    self.logger.info("生成测试报告...")
    # Overall statistics across every suite that has run.
    total_tests = sum(suite.total_tests for suite in self.results)
    total_passed = sum(suite.passed for suite in self.results)
    total_failed = sum(suite.failed for suite in self.results)
    total_skipped = sum(suite.skipped for suite in self.results)
    total_errors = sum(suite.errors for suite in self.results)
    total_duration = sum(suite.duration for suite in self.results)
    # Capture the time once so the report content and both filenames agree.
    now = datetime.now()
    report_data = {
        "timestamp": now.isoformat(),
        "summary": {
            "total_tests": total_tests,
            "passed": total_passed,
            "failed": total_failed,
            "skipped": total_skipped,
            "errors": total_errors,
            # Guard against division by zero when no tests ran.
            "success_rate": (total_passed / total_tests * 100) if total_tests > 0 else 0,
            "total_duration": total_duration
        },
        "suites": [asdict(suite) for suite in self.results]
    }
    # Bug fix: compute the filename timestamp once. Previously two separate
    # datetime.now().strftime(...) calls could straddle a second boundary,
    # producing a .json and a .txt report with different stamps.
    stamp = now.strftime("%Y%m%d_%H%M%S")
    # Save the JSON report.
    report_file = project_root / 'test_logs' / f'test_report_{stamp}.json'
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report_data, f, indent=2, ensure_ascii=False)
    # Generate and save the text report with the same stamp.
    text_report = self._generate_text_report(report_data)
    report_file_text = project_root / 'test_logs' / f'test_report_{stamp}.txt'
    with open(report_file_text, 'w', encoding='utf-8') as f:
        f.write(text_report)
    self.logger.info(f"测试报告已保存: {report_file}")
    self.logger.info(f"文本报告已保存: {report_file_text}")
    return text_report
def _generate_text_report(self, report_data: Dict[str, Any]) -> str:
"""生成文本格式的测试报告"""
lines = []
# 标题
lines.append("=" * 60)
lines.append("搜索引擎测试报告")
lines.append("=" * 60)
lines.append(f"时间: {report_data['timestamp']}")
lines.append("")
# 摘要
summary = report_data['summary']
lines.append("测试摘要")
lines.append("-" * 30)
lines.append(f"总测试数: {summary['total_tests']}")
lines.append(f"通过: {summary['passed']}")
lines.append(f"失败: {summary['failed']}")
lines.append(f"跳过: {summary['skipped']}")
lines.append(f"错误: {summary['errors']}")
lines.append(f"成功率: {summary['success_rate']:.1f}%")
lines.append(f"总耗时: {summary['total_duration']:.2f}秒")
lines.append("")
# 各测试套件详情
lines.append("测试套件详情")
lines.append("-" * 30)
for suite in report_data['suites']:
lines.append(f"\n{suite['name']}:")
lines.append(f" 总数: {suite['total_tests']}, 通过: {suite['passed']}, "
f"失败: {suite['failed']}, 跳过: {suite['skipped']}, 错误: {suite['errors']}")
lines.append(f" 耗时: {suite['duration']:.2f}秒")
# 显示失败的测试
failed_tests = [r for r in suite['results'] if r['status'] in ['failed', 'error']]
if failed_tests:
lines.append(" 失败的测试:")
for test in failed_tests[:5]: # 只显示前5个
lines.append(f" - {test['name']}: {test['status']}")
if test.get('error'):
lines.append(f" 错误: {test['error'][:100]}...")
if len(failed_tests) > 5:
lines.append(f" ... 还有 {len(failed_tests) - 5} 个失败的测试")
return "\n".join(lines)
def run_all_tests(self) -> bool:
    """Run the full pipeline: env check, all suites, then the report.

    Returns:
        bool: True when every executed suite had zero failures/errors,
        False on any failure or when the environment check fails.
    """
    try:
        # Make sure the log directory exists before any suite writes to it.
        (project_root / 'test_logs').mkdir(exist_ok=True)
        # Bug fix: previously test_env.sh was executed in a child bash
        # process, whose exported variables die with that shell and never
        # reach this Python process. Source the file and import the
        # resulting environment explicitly instead.
        env_file = project_root / 'test_env.sh'
        if env_file.exists():
            self.logger.info("加载测试环境变量...")
            result = self._run_command(['bash', '-c', f'source "{env_file}" && env'])
            if result.returncode == 0:
                # NOTE(review): simple line-based parse; assumes no
                # multi-line variable values in test_env.sh — confirm.
                for line in result.stdout.splitlines():
                    key, sep, value = line.partition('=')
                    if sep:
                        os.environ[key] = value
            else:
                self.logger.warning("环境变量加载失败,继续使用默认配置")
        # Environment sanity check must pass before any suite runs.
        if not self.check_environment():
            self.logger.error("环境检查失败,请先启动测试环境")
            return False
        # Run every suite not listed in config['skip_suites'].
        test_suites = [
            ("unit", self.run_unit_tests),
            ("integration", self.run_integration_tests),
            ("api", self.run_api_tests),
            ("performance", self.run_performance_tests)
        ]
        failed_suites = []
        for suite_name, suite_func in test_suites:
            if suite_name in self.config.get('skip_suites', []):
                self.logger.info(f"跳过 {suite_name} 测试")
                continue
            try:
                suite_result = suite_func()
                if suite_result.failed > 0 or suite_result.errors > 0:
                    failed_suites.append(suite_name)
            except Exception as e:
                # A crashing suite counts as failed but does not abort the rest.
                self.logger.error(f"{suite_name} 测试执行失败: {e}")
                failed_suites.append(suite_name)
        # Generate and print the consolidated report.
        report = self.generate_report()
        print(report)
        return len(failed_suites) == 0
    except Exception as e:
        self.logger.error(f"测试执行失败: {e}")
        return False
def main():
    """CLI entry point.

    Parses command-line options, optionally starts the test environment,
    runs the pipeline via TestRunner, optionally stops the environment,
    and returns the process exit code (0 on success, 1 on failure).
    """
    parser = argparse.ArgumentParser(description="运行搜索引擎测试流水线")
    parser.add_argument('--skip-suites', nargs='+',
                        choices=['unit', 'integration', 'api', 'performance'],
                        help='跳过指定的测试套件')
    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        default='INFO', help='日志级别')
    parser.add_argument('--test-timeout', type=int, default=300,
                        help='单个测试超时时间(秒)')
    parser.add_argument('--start-env', action='store_true',
                        help='启动测试环境后运行测试')
    parser.add_argument('--stop-env', action='store_true',
                        help='测试完成后停止测试环境')
    cli_args = parser.parse_args()

    # Runner configuration assembled from the CLI options.
    runner_config = {
        'skip_suites': cli_args.skip_suites or [],
        'log_level': cli_args.log_level,
        'test_timeout': cli_args.test_timeout,
    }

    # Optionally bring up the test environment before running anything.
    if cli_args.start_env:
        print("启动测试环境...")
        start_result = subprocess.run(
            ['bash', str(project_root / 'scripts' / 'start_test_environment.sh')],
            capture_output=True, text=True,
        )
        if start_result.returncode != 0:
            print(f"测试环境启动失败: {start_result.stderr}")
            return 1
        print("测试环境启动成功")
        time.sleep(5)  # give the services a moment to come up fully

    try:
        pipeline = TestRunner(runner_config)
        exit_code = 0 if pipeline.run_all_tests() else 1
        if exit_code == 0:
            print("\n🎉 所有测试通过!")
        else:
            print("\n❌ 部分测试失败,请查看日志")
    finally:
        # Tear the environment down even if the run raised.
        if cli_args.stop_env:
            print("\n停止测试环境...")
            subprocess.run(
                ['bash', str(project_root / 'scripts' / 'stop_test_environment.sh')]
            )
    return exit_code
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|