""" RequestContext使用示例 展示如何在搜索应用中使用RequestContext进行请求级别的上下文管理和性能监控。 """ import sys import os # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from context import RequestContext, RequestContextStage, create_request_context def example_basic_usage(): """基本使用示例""" print("=== 基本使用示例 ===") # 创建context context = create_request_context("req-001", "user-123") # 模拟搜索流程 with context: # 步骤1: 查询解析 context.start_stage(RequestContextStage.QUERY_PARSING) # 这里调用 query_parser.parse(query, context=context) import time time.sleep(0.05) # 模拟处理时间 context.end_stage(RequestContextStage.QUERY_PARSING) # 存储查询分析结果 context.store_query_analysis( original_query="红色连衣裙", normalized_query="红色 连衣裙", rewritten_query="红色 女 连衣裙", detected_language="zh", translations={"en": "red dress"} ) # 步骤2: 布尔解析 if not context.query_analysis.is_simple_query: context.start_stage(RequestContextStage.BOOLEAN_PARSING) time.sleep(0.02) context.end_stage(RequestContextStage.BOOLEAN_PARSING) # 步骤3: ES查询构建 context.start_stage(RequestContextStage.QUERY_BUILDING) time.sleep(0.03) context.end_stage(RequestContextStage.QUERY_BUILDING) context.store_intermediate_result('es_query', { "query": {"match": {"title": "红色连衣裙"}}, "size": 10 }) # 步骤4: ES搜索 context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH) time.sleep(0.1) # 模拟ES响应时间 context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH) context.store_intermediate_result('es_response', { "hits": {"total": {"value": 156}, "hits": []}, "took": 45 }) # 步骤5: 结果处理 context.start_stage(RequestContextStage.RESULT_PROCESSING) time.sleep(0.02) context.end_stage(RequestContextStage.RESULT_PROCESSING) # 自动记录性能摘要日志 print(f"搜索完成,请求ID: {context.reqid}") def example_with_searcher(): """在Searcher中使用RequestContext的示例""" print("\n=== Searcher集成使用示例 ===") # 模拟Searcher.search()调用 def mock_search(query: str, context: RequestContext = None): """模拟Searcher.search()方法""" # 如果没有提供context,创建一个 if context is None: context = create_request_context() # 存储搜索参数 context.metadata['search_params'] = { 'query': query, 'size': 10, 'from': 0 } context.metadata['feature_flags'] = { 'enable_translation': True, 'enable_embedding': True, 'enable_rerank': True } # 模拟搜索流程 context.start_stage(RequestContextStage.QUERY_PARSING) import time time.sleep(0.04) context.end_stage(RequestContextStage.QUERY_PARSING) context.store_query_analysis( original_query=query, rewritten_query=query, detected_language="zh" ) context.start_stage(RequestContextStage.QUERY_BUILDING) time.sleep(0.025) context.end_stage(RequestContextStage.QUERY_BUILDING) context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH) time.sleep(0.08) context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH) context.start_stage(RequestContextStage.RESULT_PROCESSING) time.sleep(0.015) context.end_stage(RequestContextStage.RESULT_PROCESSING) # 设置总耗时 context.performance_metrics.total_duration = 160.0 # 返回包含context的SearchResult(这里简化) return { 'hits': [], 'total': 0, 'context': context } # 使用方式1: 让Searcher自动创建context result1 = mock_search("无线蓝牙耳机") print(f"自动创建context - 请求ID: {result1['context'].reqid}") # 使用方式2: 自己创建并传递context my_context = create_request_context("custom-001", "user-456") result2 = mock_search("运动鞋", context=my_context) print(f"手动创建context - 请求ID: {result2['context'].reqid}") # 获取详细的性能摘要 summary = result2['context'].get_summary() print(f"性能摘要: {summary['performance']}") def example_error_handling(): """错误处理示例""" print("\n=== 错误处理示例 ===") context = create_request_context("error-001") try: context.start_stage(RequestContextStage.QUERY_PARSING) # 模拟错误 raise ValueError("查询解析失败:包含非法字符") except Exception as e: context.set_error(e) context.end_stage(RequestContextStage.QUERY_PARSING) # 添加警告 context.add_warning("查询结果较少,建议放宽搜索条件") # 记录错误摘要 context.log_performance_summary() print(f"错误处理完成,请求ID: {context.reqid}") def example_performance_analysis(): """性能分析示例""" print("\n=== 性能分析示例 ===") context = create_request_context("perf-001", "user-789") # 模拟一个完整的搜索请求,记录各阶段耗时 stages_with_durations = [ (RequestContextStage.QUERY_PARSING, 35.2), (RequestContextStage.BOOLEAN_PARSING, 8.1), (RequestContextStage.QUERY_BUILDING, 22.5), (RequestContextStage.ELASTICSEARCH_SEARCH, 145.8), (RequestContextStage.RESULT_PROCESSING, 18.3), (RequestContextStage.RERANKING, 42.7) ] import time for stage, duration_ms in stages_with_durations: context.start_stage(stage) time.sleep(duration_ms / 1000.0) # 转换为秒 context.end_stage(stage) # 设置总耗时 total_time = sum(duration_ms for _, duration_ms in stages_with_durations) context.performance_metrics.total_duration = total_time # 分析性能 summary = context.get_summary() print(f"总耗时: {summary['performance']['total_duration_ms']:.1f}ms") print("各阶段耗时详情:") for stage, duration in summary['performance']['stage_timings_ms'].items(): percentage = summary['performance']['stage_percentages'].get(stage, 0) print(f" {stage}: {duration:.1f}ms ({percentage:.1f}%)") # 识别性能瓶颈(耗时超过20%的阶段) bottlenecks = [ stage for stage, percentage in summary['performance']['stage_percentages'].items() if percentage > 20 ] if bottlenecks: print(f"性能瓶颈: {', '.join(bottlenecks)}") else: print("无明显性能瓶颈") if __name__ == "__main__": print("RequestContext使用示例\n") example_basic_usage() example_with_searcher() example_error_handling() example_performance_analysis() print("\n✅ 所有示例运行完成!") print("\n主要特性:") print("1. 自动阶段计时和性能监控") print("2. 统一的查询分析结果存储") print("3. 中间结果跟踪和调试支持") print("4. 错误处理和警告记录") print("5. 详细的性能摘要日志") print("6. 上下文管理器支持")