16c42787
tangwang
feat: implement r...
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
"""
RequestContext使用示例
展示如何在搜索应用中使用RequestContext进行请求级别的上下文管理和性能监控。
"""
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from context import RequestContext, RequestContextStage, create_request_context
def example_basic_usage():
"""基本使用示例"""
print("=== 基本使用示例 ===")
# 创建context
context = create_request_context("req-001", "user-123")
# 模拟搜索流程
with context:
# 步骤1: 查询解析
context.start_stage(RequestContextStage.QUERY_PARSING)
# 这里调用 query_parser.parse(query, context=context)
import time
time.sleep(0.05) # 模拟处理时间
context.end_stage(RequestContextStage.QUERY_PARSING)
# 存储查询分析结果
context.store_query_analysis(
original_query="红色连衣裙",
normalized_query="红色 连衣裙",
rewritten_query="红色 女 连衣裙",
detected_language="zh",
translations={"en": "red dress"}
)
# 步骤2: 布尔解析
if not context.query_analysis.is_simple_query:
context.start_stage(RequestContextStage.BOOLEAN_PARSING)
time.sleep(0.02)
context.end_stage(RequestContextStage.BOOLEAN_PARSING)
# 步骤3: ES查询构建
context.start_stage(RequestContextStage.QUERY_BUILDING)
time.sleep(0.03)
context.end_stage(RequestContextStage.QUERY_BUILDING)
context.store_intermediate_result('es_query', {
"query": {"match": {"title": "红色连衣裙"}},
"size": 10
})
# 步骤4: ES搜索
context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
time.sleep(0.1) # 模拟ES响应时间
context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
context.store_intermediate_result('es_response', {
"hits": {"total": {"value": 156}, "hits": []},
"took": 45
})
# 步骤5: 结果处理
context.start_stage(RequestContextStage.RESULT_PROCESSING)
time.sleep(0.02)
context.end_stage(RequestContextStage.RESULT_PROCESSING)
# 自动记录性能摘要日志
print(f"搜索完成,请求ID: {context.reqid}")
def example_with_searcher():
"""在Searcher中使用RequestContext的示例"""
print("\n=== Searcher集成使用示例 ===")
# 模拟Searcher.search()调用
def mock_search(query: str, context: RequestContext = None):
"""模拟Searcher.search()方法"""
# 如果没有提供context,创建一个
if context is None:
context = create_request_context()
# 存储搜索参数
context.metadata['search_params'] = {
'query': query,
'size': 10,
'from': 0
}
context.metadata['feature_flags'] = {
'enable_translation': True,
'enable_embedding': True,
'enable_rerank': True
}
# 模拟搜索流程
context.start_stage(RequestContextStage.QUERY_PARSING)
import time
time.sleep(0.04)
context.end_stage(RequestContextStage.QUERY_PARSING)
context.store_query_analysis(
original_query=query,
rewritten_query=query,
detected_language="zh"
)
context.start_stage(RequestContextStage.QUERY_BUILDING)
time.sleep(0.025)
context.end_stage(RequestContextStage.QUERY_BUILDING)
context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
time.sleep(0.08)
context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
context.start_stage(RequestContextStage.RESULT_PROCESSING)
time.sleep(0.015)
context.end_stage(RequestContextStage.RESULT_PROCESSING)
# 设置总耗时
context.performance_metrics.total_duration = 160.0
# 返回包含context的SearchResult(这里简化)
return {
'hits': [],
'total': 0,
'context': context
}
# 使用方式1: 让Searcher自动创建context
result1 = mock_search("无线蓝牙耳机")
print(f"自动创建context - 请求ID: {result1['context'].reqid}")
# 使用方式2: 自己创建并传递context
my_context = create_request_context("custom-001", "user-456")
result2 = mock_search("运动鞋", context=my_context)
print(f"手动创建context - 请求ID: {result2['context'].reqid}")
# 获取详细的性能摘要
summary = result2['context'].get_summary()
print(f"性能摘要: {summary['performance']}")
def example_error_handling():
"""错误处理示例"""
print("\n=== 错误处理示例 ===")
context = create_request_context("error-001")
try:
context.start_stage(RequestContextStage.QUERY_PARSING)
# 模拟错误
raise ValueError("查询解析失败:包含非法字符")
except Exception as e:
context.set_error(e)
context.end_stage(RequestContextStage.QUERY_PARSING)
# 添加警告
context.add_warning("查询结果较少,建议放宽搜索条件")
# 记录错误摘要
context.log_performance_summary()
print(f"错误处理完成,请求ID: {context.reqid}")
def example_performance_analysis():
"""性能分析示例"""
print("\n=== 性能分析示例 ===")
context = create_request_context("perf-001", "user-789")
# 模拟一个完整的搜索请求,记录各阶段耗时
stages_with_durations = [
(RequestContextStage.QUERY_PARSING, 35.2),
(RequestContextStage.BOOLEAN_PARSING, 8.1),
(RequestContextStage.QUERY_BUILDING, 22.5),
(RequestContextStage.ELASTICSEARCH_SEARCH, 145.8),
(RequestContextStage.RESULT_PROCESSING, 18.3),
(RequestContextStage.RERANKING, 42.7)
]
import time
for stage, duration_ms in stages_with_durations:
context.start_stage(stage)
time.sleep(duration_ms / 1000.0) # 转换为秒
context.end_stage(stage)
# 设置总耗时
total_time = sum(duration_ms for _, duration_ms in stages_with_durations)
context.performance_metrics.total_duration = total_time
# 分析性能
summary = context.get_summary()
print(f"总耗时: {summary['performance']['total_duration_ms']:.1f}ms")
print("各阶段耗时详情:")
for stage, duration in summary['performance']['stage_timings_ms'].items():
percentage = summary['performance']['stage_percentages'].get(stage, 0)
print(f" {stage}: {duration:.1f}ms ({percentage:.1f}%)")
# 识别性能瓶颈(耗时超过20%的阶段)
bottlenecks = [
stage for stage, percentage in summary['performance']['stage_percentages'].items()
if percentage > 20
]
if bottlenecks:
print(f"性能瓶颈: {', '.join(bottlenecks)}")
else:
print("无明显性能瓶颈")
if __name__ == "__main__":
print("RequestContext使用示例\n")
example_basic_usage()
example_with_searcher()
example_error_handling()
example_performance_analysis()
print("\n✅ 所有示例运行完成!")
print("\n主要特性:")
print("1. 自动阶段计时和性能监控")
print("2. 统一的查询分析结果存储")
print("3. 中间结果跟踪和调试支持")
print("4. 错误处理和警告记录")
print("5. 详细的性能摘要日志")
print("6. 上下文管理器支持")
|