example_usage.py 7.47 KB
"""
RequestContext使用示例

展示如何在搜索应用中使用RequestContext进行请求级别的上下文管理和性能监控。
"""

import sys
import os

# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from context import RequestContext, RequestContextStage, create_request_context


def example_basic_usage():
    """基本使用示例"""
    print("=== 基本使用示例 ===")

    # 创建context
    context = create_request_context("req-001", "user-123")

    # 模拟搜索流程
    with context:
        # 步骤1: 查询解析
        context.start_stage(RequestContextStage.QUERY_PARSING)
        # 这里调用 query_parser.parse(query, context=context)
        import time
        time.sleep(0.05)  # 模拟处理时间
        context.end_stage(RequestContextStage.QUERY_PARSING)

        # 存储查询分析结果
        context.store_query_analysis(
            original_query="红色连衣裙",
            normalized_query="红色 连衣裙",
            rewritten_query="红色 女 连衣裙",
            detected_language="zh",
            translations={"en": "red dress"}
        )

        # 步骤2: 布尔解析
        if not context.query_analysis.is_simple_query:
            context.start_stage(RequestContextStage.BOOLEAN_PARSING)
            time.sleep(0.02)
            context.end_stage(RequestContextStage.BOOLEAN_PARSING)

        # 步骤3: ES查询构建
        context.start_stage(RequestContextStage.QUERY_BUILDING)
        time.sleep(0.03)
        context.end_stage(RequestContextStage.QUERY_BUILDING)
        context.store_intermediate_result('es_query', {
            "query": {"match": {"title": "红色连衣裙"}},
            "size": 10
        })

        # 步骤4: ES搜索
        context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
        time.sleep(0.1)  # 模拟ES响应时间
        context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
        context.store_intermediate_result('es_response', {
            "hits": {"total": {"value": 156}, "hits": []},
            "took": 45
        })

        # 步骤5: 结果处理
        context.start_stage(RequestContextStage.RESULT_PROCESSING)
        time.sleep(0.02)
        context.end_stage(RequestContextStage.RESULT_PROCESSING)

    # 自动记录性能摘要日志
    print(f"搜索完成,请求ID: {context.reqid}")


def example_with_searcher():
    """在Searcher中使用RequestContext的示例"""
    print("\n=== Searcher集成使用示例 ===")

    # 模拟Searcher.search()调用
    def mock_search(query: str, context: RequestContext = None):
        """模拟Searcher.search()方法"""
        # 如果没有提供context,创建一个
        if context is None:
            context = create_request_context()

        # 存储搜索参数
        context.metadata['search_params'] = {
            'query': query,
            'size': 10,
            'from': 0
        }

        context.metadata['feature_flags'] = {
            'enable_translation': True,
            'enable_embedding': True,
            'enable_rerank': True
        }

        # 模拟搜索流程
        context.start_stage(RequestContextStage.QUERY_PARSING)
        import time
        time.sleep(0.04)
        context.end_stage(RequestContextStage.QUERY_PARSING)
        context.store_query_analysis(
            original_query=query,
            rewritten_query=query,
            detected_language="zh"
        )

        context.start_stage(RequestContextStage.QUERY_BUILDING)
        time.sleep(0.025)
        context.end_stage(RequestContextStage.QUERY_BUILDING)

        context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
        time.sleep(0.08)
        context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH)

        context.start_stage(RequestContextStage.RESULT_PROCESSING)
        time.sleep(0.015)
        context.end_stage(RequestContextStage.RESULT_PROCESSING)

        # 设置总耗时
        context.performance_metrics.total_duration = 160.0

        # 返回包含context的SearchResult(这里简化)
        return {
            'hits': [],
            'total': 0,
            'context': context
        }

    # 使用方式1: 让Searcher自动创建context
    result1 = mock_search("无线蓝牙耳机")
    print(f"自动创建context - 请求ID: {result1['context'].reqid}")

    # 使用方式2: 自己创建并传递context
    my_context = create_request_context("custom-001", "user-456")
    result2 = mock_search("运动鞋", context=my_context)
    print(f"手动创建context - 请求ID: {result2['context'].reqid}")

    # 获取详细的性能摘要
    summary = result2['context'].get_summary()
    print(f"性能摘要: {summary['performance']}")


def example_error_handling():
    """错误处理示例"""
    print("\n=== 错误处理示例 ===")

    context = create_request_context("error-001")

    try:
        context.start_stage(RequestContextStage.QUERY_PARSING)
        # 模拟错误
        raise ValueError("查询解析失败:包含非法字符")
    except Exception as e:
        context.set_error(e)
        context.end_stage(RequestContextStage.QUERY_PARSING)

    # 添加警告
    context.add_warning("查询结果较少,建议放宽搜索条件")

    # 记录错误摘要
    context.log_performance_summary()

    print(f"错误处理完成,请求ID: {context.reqid}")


def example_performance_analysis():
    """性能分析示例"""
    print("\n=== 性能分析示例 ===")

    context = create_request_context("perf-001", "user-789")

    # 模拟一个完整的搜索请求,记录各阶段耗时
    stages_with_durations = [
        (RequestContextStage.QUERY_PARSING, 35.2),
        (RequestContextStage.BOOLEAN_PARSING, 8.1),
        (RequestContextStage.QUERY_BUILDING, 22.5),
        (RequestContextStage.ELASTICSEARCH_SEARCH, 145.8),
        (RequestContextStage.RESULT_PROCESSING, 18.3),
        (RequestContextStage.RERANKING, 42.7)
    ]

    import time
    for stage, duration_ms in stages_with_durations:
        context.start_stage(stage)
        time.sleep(duration_ms / 1000.0)  # 转换为秒
        context.end_stage(stage)

    # 设置总耗时
    total_time = sum(duration_ms for _, duration_ms in stages_with_durations)
    context.performance_metrics.total_duration = total_time

    # 分析性能
    summary = context.get_summary()
    print(f"总耗时: {summary['performance']['total_duration_ms']:.1f}ms")
    print("各阶段耗时详情:")
    for stage, duration in summary['performance']['stage_timings_ms'].items():
        percentage = summary['performance']['stage_percentages'].get(stage, 0)
        print(f"  {stage}: {duration:.1f}ms ({percentage:.1f}%)")

    # 识别性能瓶颈(耗时超过20%的阶段)
    bottlenecks = [
        stage for stage, percentage in summary['performance']['stage_percentages'].items()
        if percentage > 20
    ]
    if bottlenecks:
        print(f"性能瓶颈: {', '.join(bottlenecks)}")
    else:
        print("无明显性能瓶颈")


if __name__ == "__main__":
    print("RequestContext使用示例\n")

    example_basic_usage()
    example_with_searcher()
    example_error_handling()
    example_performance_analysis()

    print("\n✅ 所有示例运行完成!")
    print("\n主要特性:")
    print("1. 自动阶段计时和性能监控")
    print("2. 统一的查询分析结果存储")
    print("3. 中间结果跟踪和调试支持")
    print("4. 错误处理和警告记录")
    print("5. 详细的性能摘要日志")
    print("6. 上下文管理器支持")