RequestContext_README.md 9.49 KB

RequestContext 使用指南

概述

RequestContext 是一个请求粒度的上下文管理器,用于跟踪和管理搜索请求的整个生命周期。它提供了统一的数据存储、性能监控和日志记录功能。

核心功能

1. 查询分析结果存储

  • 原始查询、规范化查询、重写查询
  • 检测语言和翻译结果
  • 查询向量(embedding)
  • 布尔查询AST

2. 各检索阶段中间结果

  • 解析后的查询对象
  • 布尔查询语法树
  • ES查询DSL
  • ES响应数据
  • 处理后的搜索结果

3. 性能监控

  • 自动计时各阶段耗时
  • 计算各阶段耗时占比
  • 识别性能瓶颈
  • 详细的性能摘要日志

4. 错误处理和警告

  • 统一的错误信息存储
  • 警告信息收集
  • 完整的上下文错误跟踪

支持的搜索阶段

class RequestContextStage(Enum):
    TOTAL = "total_search"                    # 总搜索时间
    QUERY_PARSING = "query_parsing"          # 查询解析
    BOOLEAN_PARSING = "boolean_parsing"      # 布尔查询解析
    QUERY_BUILDING = "query_building"        # ES查询构建
    ELASTICSEARCH_SEARCH = "elasticsearch_search"  # ES搜索
    RESULT_PROCESSING = "result_processing"  # 结果处理
    RERANKING = "reranking"                  # 重排序

基本使用方法

1. 创建RequestContext

from context import create_request_context, RequestContext

# 方式1: 使用工厂函数
context = create_request_context(reqid="req-001", uid="user-123")

# 方式2: 直接创建
context = RequestContext(reqid="req-001", uid="user-123")

# 方式3: 作为上下文管理器使用
with create_request_context("req-002", "user-456") as context:
    # 搜索逻辑
    pass  # 自动记录性能摘要

2. 阶段计时

from context import RequestContextStage

# 开始计时
context.start_stage(RequestContextStage.QUERY_PARSING)

# 执行查询解析逻辑
# parsed_query = query_parser.parse(query, context=context)

# 结束计时
duration = context.end_stage(RequestContextStage.QUERY_PARSING)
print(f"查询解析耗时: {duration:.2f}ms")

3. 存储查询分析结果

context.store_query_analysis(
    original_query="红色连衣裙",
    normalized_query="红色 连衣裙",
    rewritten_query="红色 女 连衣裙",
    detected_language="zh",
    translations={"en": "red dress"},
    query_vector=[0.1, 0.2, 0.3, ...],  # 如果有向量
    is_simple_query=True
)

4. 存储中间结果

# 存储解析后的查询对象
context.store_intermediate_result('parsed_query', parsed_query)

# 存储ES查询DSL
context.store_intermediate_result('es_query', es_query_dict)

# 存储ES响应
context.store_intermediate_result('es_response', es_response)

# 存储处理后的结果
context.store_intermediate_result('processed_hits', hits)

5. 错误处理和警告

try:
    # 可能出错的操作
    risky_operation()
except Exception as e:
    context.set_error(e)

# 添加警告信息
context.add_warning("查询结果较少,建议放宽搜索条件")

# 检查是否有错误
if context.has_error():
    print(f"搜索出错: {context.metadata['error_info']}")

在Searcher中使用

1. 自动创建Context(向后兼容)

searcher = Searcher(config, es_client)

# Searcher会自动创建RequestContext
result = searcher.search(
    query="无线蓝牙耳机",
    size=10,
    enable_embedding=True
)

# 结果中包含context信息
print(result.context.get_summary())

2. 手动创建和传递Context

# 创建自己的context
context = create_request_context("my-req-001", "user-789")

# 传递给searcher
result = searcher.search(
    query="运动鞋",
    context=context  # 传递自定义context
)

# 使用context进行详细分析
summary = context.get_summary()
print(f"总耗时: {summary['performance']['total_duration_ms']:.1f}ms")

性能分析

1. 获取性能摘要

summary = context.get_summary()

# 基本信息
print(f"请求ID: {summary['request_info']['reqid']}")
print(f"总耗时: {summary['performance']['total_duration_ms']:.1f}ms")

# 各阶段耗时
for stage, duration in summary['performance']['stage_timings_ms'].items():
    percentage = summary['performance']['stage_percentages'].get(stage, 0)
    print(f"{stage}: {duration:.1f}ms ({percentage:.1f}%)")

# 查询分析信息
query_info = summary['query_analysis']
print(f"原查询: {query_info['original_query']}")
print(f"重写查询: {query_info['rewritten_query']}")
print(f"检测语言: {query_info['detected_language']}")

2. 识别性能瓶颈

summary = context.get_summary()

# 找出耗时超过20%的阶段
bottlenecks = []
for stage, percentage in summary['performance']['stage_percentages'].items():
    if percentage > 20:
        bottlenecks.append((stage, percentage))

if bottlenecks:
    print("性能瓶颈:")
    for stage, percentage in bottlenecks:
        print(f"  - {stage}: {percentage:.1f}%")

3. 自动性能日志

RequestContext会在以下时机自动记录详细的性能摘要日志:

  • 上下文管理器退出时 (with context:)
  • 手动调用 context.log_performance_summary()
  • Searcher.search() 完成时

日志格式示例:

[2024-01-01 10:30:45] [INFO] [request_context] 搜索请求性能摘要 | reqid: req-001 | 总耗时: 272.6ms | 阶段耗时: |   - query_parsing: 35.3ms (13.0%) |   - elasticsearch_search: 146.0ms (53.6%) |   - result_processing: 18.6ms (6.8%) | 查询: '红色连衣裙' -> '红色 女 连衣裙' (zh) | 结果: 156 hits ES查询: 2456 chars

线程安全

RequestContext是线程安全的,支持并发请求处理。每个请求使用独立的context实例,互不干扰。

import threading
from context import create_request_context

def worker(request_id, query):
    context = create_request_context(request_id)
    # 搜索逻辑
    # context自动跟踪此线程的请求
    pass

# 多线程并发处理
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(f"req-{i}", f"query-{i}"))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

调试支持

1. 检查中间结果

# 获取查询解析结果
parsed_query = context.get_intermediate_result('parsed_query')

# 获取ES查询DSL
es_query = context.get_intermediate_result('es_query')

# 获取ES响应
es_response = context.get_intermediate_result('es_response')

# 获取原始搜索结果
raw_hits = context.get_intermediate_result('raw_hits')

# 获取最终处理后的结果
processed_hits = context.get_intermediate_result('processed_hits')

2. 错误诊断

if context.has_error():
    error_info = context.metadata['error_info']
    print(f"错误类型: {error_info['type']}")
    print(f"错误消息: {error_info['message']}")

    # 检查是否有警告
    if context.metadata['warnings']:
        print("警告信息:")
        for warning in context.metadata['warnings']:
            print(f"  - {warning}")

最佳实践

1. 统一使用Context

# 推荐:在整个搜索流程中传递同一个context
result = searcher.search(query, context=context)

# 不推荐:在各个环节创建不同的context

2. 合理设置阶段边界

# 只在有意义的大阶段之间计时
context.start_stage(RequestContextStage.QUERY_PARSING)
# 整个查询解析逻辑
context.end_stage(RequestContextStage.QUERY_PARSING)

# 避免在细粒度操作间频繁计时

3. 及时存储关键数据

# 在每个阶段完成后及时存储结果
context.store_intermediate_result('parsed_query', parsed_query)
context.store_intermediate_result('es_query', es_query)

# 便于后续调试和分析

4. 适当使用警告

# 使用警告记录非致命问题
if total_hits < 10:
    context.add_warning("搜索结果较少,建议放宽搜索条件")

if query_time > 5.0:
    context.add_warning(f"查询耗时较长: {query_time:.1f}秒")

集成示例

API接口集成

from flask import Flask, request, jsonify
from context import create_request_context

app = Flask(__name__)

@app.route('/search')
def api_search():
    # 从请求中获取参数
    query = request.args.get('q', '')
    uid = request.args.get('uid', 'anonymous')

    # 创建context
    context = create_request_context(uid=uid)

    try:
        # 执行搜索
        result = searcher.search(query, context=context)

        # 返回结果(包含性能信息)
        response = {
            'results': result.to_dict(),
            'performance': context.get_summary()['performance']
        }

        return jsonify(response)

    except Exception as e:
        context.set_error(e)
        context.log_performance_summary()

        return jsonify({
            'error': str(e),
            'request_id': context.reqid
        }), 500

总结

RequestContext提供了一个强大而灵活的框架,用于管理搜索请求的整个生命周期。通过统一的上下文管理、自动性能监控和详细的日志记录,它显著提升了搜索系统的可观测性和调试能力。

主要优势:

  1. 统一管理: 所有请求相关数据集中存储
  2. 自动监控: 无需手动计时,自动跟踪性能
  3. 详细日志: 完整的请求生命周期记录
  4. 向后兼容: 现有代码无需修改即可受益
  5. 线程安全: 支持高并发场景
  6. 易于调试: 丰富的中间结果和错误信息

通过合理使用RequestContext,可以构建更加可靠、高性能和易维护的搜索系统。