Blame view

verify_refactoring.py 9.24 KB
6aa246be   tangwang   问题:Pydantic 应该能自动...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
  #!/usr/bin/env python3
  """
  验证 API v3.0 重构是否完整
  检查代码中是否还有旧的逻辑残留
  """
  
  import os
  import re
  from pathlib import Path
  
  def print_header(title):
      print(f"\n{'='*60}")
      print(f"  {title}")
      print('='*60)
  
  def search_in_file(filepath, pattern, description):
      """在文件中搜索模式"""
      try:
          with open(filepath, 'r', encoding='utf-8') as f:
              content = f.read()
              matches = re.findall(pattern, content, re.MULTILINE)
              return matches
      except Exception as e:
          return None
  
  def check_removed_code():
      """检查已移除的代码"""
      print_header("检查已移除的代码")
      
      checks = [
          {
              "file": "search/es_query_builder.py",
              "pattern": r"if field == ['\"]price_ranges['\"]",
              "description": "硬编码的 price_ranges 逻辑",
              "should_exist": False
          },
          {
              "file": "search/es_query_builder.py",
              "pattern": r"def add_dynamic_aggregations",
              "description": "add_dynamic_aggregations 方法",
              "should_exist": False
          },
          {
              "file": "api/models.py",
              "pattern": r"aggregations.*Optional\[Dict",
              "description": "aggregations 参数(在 SearchRequest 中)",
              "should_exist": False
          },
          {
              "file": "frontend/static/js/app.js",
              "pattern": r"price_ranges",
              "description": "前端硬编码的 price_ranges",
              "should_exist": False
          },
          {
              "file": "frontend/static/js/app.js",
              "pattern": r"displayAggregations",
              "description": "旧的 displayAggregations 函数",
              "should_exist": False
          }
      ]
      
      all_passed = True
      for check in checks:
          filepath = os.path.join("/home/tw/SearchEngine", check["file"])
          matches = search_in_file(filepath, check["pattern"], check["description"])
          
          if matches is None:
              print(f"  ⚠️ 无法读取:{check['file']}")
              continue
          
          if check["should_exist"]:
              if matches:
                  print(f"  ✓ 存在:{check['description']}")
              else:
                  print(f"  ✗ 缺失:{check['description']}")
                  all_passed = False
          else:
              if matches:
                  print(f"  ✗ 仍存在:{check['description']}")
                  print(f"     匹配:{matches[:2]}")
                  all_passed = False
              else:
                  print(f"  ✓ 已移除:{check['description']}")
      
      return all_passed
  
  def check_new_code():
      """检查新增的代码"""
      print_header("检查新增的代码")
      
      checks = [
          {
              "file": "api/models.py",
              "pattern": r"class RangeFilter",
              "description": "RangeFilter 模型",
              "should_exist": True
          },
          {
              "file": "api/models.py",
              "pattern": r"class FacetConfig",
              "description": "FacetConfig 模型",
              "should_exist": True
          },
          {
              "file": "api/models.py",
              "pattern": r"class FacetValue",
              "description": "FacetValue 模型",
              "should_exist": True
          },
          {
              "file": "api/models.py",
              "pattern": r"class FacetResult",
              "description": "FacetResult 模型",
              "should_exist": True
          },
          {
              "file": "api/models.py",
              "pattern": r"range_filters.*RangeFilter",
              "description": "range_filters 参数",
              "should_exist": True
          },
          {
              "file": "api/models.py",
              "pattern": r"facets.*FacetConfig",
              "description": "facets 参数",
              "should_exist": True
          },
          {
              "file": "search/es_query_builder.py",
              "pattern": r"def build_facets",
              "description": "build_facets 方法",
              "should_exist": True
          },
          {
              "file": "search/searcher.py",
              "pattern": r"def _standardize_facets",
              "description": "_standardize_facets 方法",
              "should_exist": True
          },
          {
              "file": "api/routes/search.py",
              "pattern": r"@router.get\(['\"]\/suggestions",
              "description": "/search/suggestions 端点",
              "should_exist": True
          },
          {
              "file": "api/routes/search.py",
              "pattern": r"@router.get\(['\"]\/instant",
              "description": "/search/instant 端点",
              "should_exist": True
          },
          {
              "file": "frontend/static/js/app.js",
              "pattern": r"function displayFacets",
              "description": "displayFacets 函数",
              "should_exist": True
          },
          {
              "file": "frontend/static/js/app.js",
              "pattern": r"rangeFilters",
              "description": "rangeFilters 状态",
              "should_exist": True
          }
      ]
      
      all_passed = True
      for check in checks:
          filepath = os.path.join("/home/tw/SearchEngine", check["file"])
          matches = search_in_file(filepath, check["pattern"], check["description"])
          
          if matches is None:
              print(f"  ⚠️ 无法读取:{check['file']}")
              continue
          
          if check["should_exist"]:
              if matches:
                  print(f"  ✓ 存在:{check['description']}")
              else:
                  print(f"  ✗ 缺失:{check['description']}")
                  all_passed = False
          else:
              if matches:
                  print(f"  ✗ 仍存在:{check['description']}")
                  all_passed = False
              else:
                  print(f"  ✓ 已移除:{check['description']}")
      
      return all_passed
  
  def check_documentation():
      """检查文档"""
      print_header("检查文档")
      
      docs = [
          "API_DOCUMENTATION.md",
          "API_EXAMPLES.md",
          "MIGRATION_GUIDE_V3.md",
          "CHANGES.md"
      ]
      
      all_exist = True
      for doc in docs:
          filepath = os.path.join("/home/tw/SearchEngine", doc)
          if os.path.exists(filepath):
              size_kb = os.path.getsize(filepath) / 1024
              print(f"  ✓ 存在:{doc} ({size_kb:.1f} KB)")
          else:
              print(f"  ✗ 缺失:{doc}")
              all_exist = False
      
      return all_exist
  
  def check_imports():
      """检查模块导入"""
      print_header("检查模块导入")
      
      import sys
      sys.path.insert(0, '/home/tw/SearchEngine')
      
      try:
          from api.models import (
              RangeFilter, FacetConfig, FacetValue, FacetResult,
              SearchRequest, SearchResponse, ImageSearchRequest,
              SearchSuggestRequest, SearchSuggestResponse
          )
          print("  ✓ API 模型导入成功")
          
          from search.es_query_builder import ESQueryBuilder
          print("  ✓ ESQueryBuilder 导入成功")
          
          from search.searcher import Searcher, SearchResult
          print("  ✓ Searcher 导入成功")
          
          # 检查方法
          qb = ESQueryBuilder('test', ['field1'])
          if hasattr(qb, 'build_facets'):
              print("  ✓ build_facets 方法存在")
          else:
              print("  ✗ build_facets 方法不存在")
              return False
          
          if hasattr(qb, 'add_dynamic_aggregations'):
              print("  ✗ add_dynamic_aggregations 方法仍存在(应该已删除)")
              return False
          else:
              print("  ✓ add_dynamic_aggregations 方法已删除")
          
          # 检查 SearchResult
          sr = SearchResult(hits=[], total=0, max_score=0, took_ms=10, facets=[])
          if hasattr(sr, 'facets'):
              print("  ✓ SearchResult.facets 属性存在")
          else:
              print("  ✗ SearchResult.facets 属性不存在")
              return False
          
          if hasattr(sr, 'aggregations'):
              print("  ✗ SearchResult.aggregations 属性仍存在(应该已删除)")
              return False
          else:
              print("  ✓ SearchResult.aggregations 属性已删除")
          
          return True
          
      except Exception as e:
          print(f"  ✗ 导入失败:{e}")
          return False
  
  def main():
      """主函数"""
      print("\n" + "🔍 开始验证 API v3.0 重构")
      print(f"项目路径:/home/tw/SearchEngine\n")
      
      # 运行检查
      check1 = check_removed_code()
      check2 = check_new_code()
      check3 = check_documentation()
      check4 = check_imports()
      
      # 总结
      print_header("验证总结")
      
      results = {
          "已移除的代码": check1,
          "新增的代码": check2,
          "文档完整性": check3,
          "模块导入": check4
      }
      
      all_passed = all(results.values())
      
      for name, passed in results.items():
          status = "✓ 通过" if passed else "✗ 失败"
          print(f"  {status}: {name}")
      
      if all_passed:
          print(f"\n  🎉 所有检查通过!API v3.0 重构完成。")
      else:
          print(f"\n  ⚠️ 部分检查失败,请检查上述详情。")
      
      print("\n" + "="*60 + "\n")
      
      return 0 if all_passed else 1
  
  if __name__ == "__main__":
      exit(main())