Blame view

api/app.py 12.7 KB
be52af70   tangwang   first commit
1
2
3
4
  """
  Main FastAPI application for the search service.
  
  Usage:
2a76641e   tangwang   config
5
      uvicorn api.app:app --host 0.0.0.0 --port 6002 --reload
be52af70   tangwang   first commit
6
7
8
9
  """
  
  import os
  import sys
bb3c5ef8   tangwang   灌入数据流程跑通
10
11
  import logging
  import time
325eec03   tangwang   1. 日志、配置基础设施,使用优化
12
13
  import argparse
  import uvicorn
28e57bb1   tangwang   日志体系优化
14
  from logging.handlers import TimedRotatingFileHandler
bb3c5ef8   tangwang   灌入数据流程跑通
15
  from collections import defaultdict, deque
be52af70   tangwang   first commit
16
  from typing import Optional
bb3c5ef8   tangwang   灌入数据流程跑通
17
  from fastapi import FastAPI, Request, HTTPException
a7a8c6cb   tangwang   测试过滤、聚合、排序
18
19
  from fastapi.responses import JSONResponse, FileResponse
  from fastapi.staticfiles import StaticFiles
be52af70   tangwang   first commit
20
  from fastapi.middleware.cors import CORSMiddleware
bb3c5ef8   tangwang   灌入数据流程跑通
21
22
23
24
25
  from fastapi.middleware.trustedhost import TrustedHostMiddleware
  from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
  from slowapi import Limiter, _rate_limit_exceeded_handler
  from slowapi.util import get_remote_address
  from slowapi.errors import RateLimitExceeded
be52af70   tangwang   first commit
26
  
28e57bb1   tangwang   日志体系优化
27
  # Configure backend logging
3c1f8031   tangwang   api/routes/indexe...
28
  import pathlib
28e57bb1   tangwang   日志体系优化
29
  
4650fcec   tangwang   日志优化、日志串联(uid rqid)
30
31
  from request_log_context import LOG_LINE_FORMAT, RequestLogContextFilter
  
28e57bb1   tangwang   日志体系优化
32
33
34
35
36
37
38
  
  def _resolve_log_level(name: str) -> int:
      """Map a level name such as "DEBUG" to its numeric value, falling back to INFO."""
      level = getattr(logging, name, None)
      # A bare getattr can hit non-level module attributes (logging.BASIC_FORMAT is a str),
      # which Logger.setLevel would reject; accept only integer levels.
      return level if isinstance(level, int) else logging.INFO


  def _reset_handlers(target: logging.Logger) -> None:
      """Detach and close every handler currently attached to *target*."""
      # Iterate over a copy because removeHandler mutates target.handlers.
      for handler in list(target.handlers):
          target.removeHandler(handler)
          handler.close()


  def _daily_rotating_handler(path: pathlib.Path, level: int, formatter: logging.Formatter, log_filter) -> TimedRotatingFileHandler:
      """Build a UTF-8 file handler that rotates at midnight and keeps 30 backups."""
      handler = TimedRotatingFileHandler(
          filename=path,
          when="midnight",
          interval=1,
          backupCount=30,
          encoding="utf-8",
      )
      handler.setLevel(level)
      handler.setFormatter(formatter)
      handler.addFilter(log_filter)
      return handler


  def configure_backend_logging() -> None:
      """Configure process-wide logging for the search API.

      Installs:
        * on the root logger: a console handler plus a midnight-rotating ``logs/api.log``;
        * on ``backend.verbose``: a midnight-rotating ``logs/backend_verbose.log`` only
          (``propagate = False`` keeps its records away from the root handlers);
        * on the uvicorn loggers: no own handlers, propagation to the root handlers.

      All handlers share LOG_LINE_FORMAT and a RequestLogContextFilter. The level is
      read from the ``LOG_LEVEL`` environment variable (default ``INFO``); names that
      are not logging levels fall back to ``INFO``. Handlers already attached to these
      loggers are closed before being replaced, so calling this again does not leave
      open file handles behind.
      """
      log_dir = pathlib.Path("logs")
      log_dir.mkdir(exist_ok=True)
      numeric_level = _resolve_log_level(os.getenv("LOG_LEVEL", "INFO").upper())

      formatter = logging.Formatter(LOG_LINE_FORMAT)
      request_filter = RequestLogContextFilter()

      # Root logger: console + logs/api.log.
      root_logger = logging.getLogger()
      root_logger.setLevel(numeric_level)
      _reset_handlers(root_logger)

      console_handler = logging.StreamHandler()
      console_handler.setLevel(numeric_level)
      console_handler.setFormatter(formatter)
      console_handler.addFilter(request_filter)
      root_logger.addHandler(console_handler)
      root_logger.addHandler(
          _daily_rotating_handler(log_dir / "api.log", numeric_level, formatter, request_filter)
      )

      # Verbose backend logger: its own file only.
      verbose_logger = logging.getLogger("backend.verbose")
      verbose_logger.setLevel(numeric_level)
      _reset_handlers(verbose_logger)
      verbose_logger.propagate = False
      verbose_logger.addHandler(
          _daily_rotating_handler(log_dir / "backend_verbose.log", numeric_level, formatter, request_filter)
      )

      # Route uvicorn's records through the root handlers so they share format and files.
      for logger_name in ("uvicorn", "uvicorn.error", "uvicorn.access"):
          uvicorn_logger = logging.getLogger(logger_name)
          _reset_handlers(uvicorn_logger)
          uvicorn_logger.setLevel(numeric_level)
          uvicorn_logger.propagate = True
  
28e57bb1   tangwang   日志体系优化
87
88
  
  # Install the process-wide log handlers at import time, before any module-level logging below.
  configure_backend_logging()
bb3c5ef8   tangwang   灌入数据流程跑通
89
90
91
92
93
  logger = logging.getLogger(__name__)
  
  # Initialize rate limiter (slowapi), keyed by the client's remote address.
  # Endpoints opt in with @limiter.limit(...); the app registers it on app.state.
  limiter = Limiter(key_func=get_remote_address)
  
be52af70   tangwang   first commit
94
95
96
  # Add parent directory to path: prepend the parent of this file's directory so the
  # top-level packages imported below (config, utils, search, query, suggestion) resolve
  # regardless of the current working directory.
  # NOTE(review): `request_log_context` is imported near the top of the file, before this
  # insertion, so that import still relies on cwd/PYTHONPATH — confirm this is intended.
  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  
86d8358b   tangwang   config optimize
97
  from config import get_app_config
be52af70   tangwang   first commit
98
  from utils import ESClient
be52af70   tangwang   first commit
99
  from search import Searcher
be52af70   tangwang   first commit
100
  from query import QueryParser
ded6f29e   tangwang   补充suggestion模块
101
  from suggestion import SuggestionService
bb9c626c   tangwang   搜索服务(6002)不再初始化/挂...
102
  from .service_registry import set_es_client
be52af70   tangwang   first commit
103
104
  
  # Global instances
  # Module-level singletons assigned by init_service(); each remains None until then.
  # Read them through the get_*() accessors, which raise RuntimeError while unset.
be52af70   tangwang   first commit
105
106
107
  _es_client: Optional[ESClient] = None
  _searcher: Optional[Searcher] = None
  _query_parser: Optional[QueryParser] = None
ded6f29e   tangwang   补充suggestion模块
108
  _suggestion_service: Optional[SuggestionService] = None
86d8358b   tangwang   config optimize
109
  # Application config returned by config.get_app_config(); its type is not annotated here.
  _app_config = None
be52af70   tangwang   first commit
110
111
  
  
4650fcec   tangwang   日志优化、日志串联(uid rqid)
112
113
114
115
116
117
118
119
120
121
  def _request_log_extra_from_http(request: Request) -> dict:
      """Build the ``extra`` mapping that tags log records with request context.

      ``reqid`` comes from ``request.state.reqid`` if set and truthy, else the
      ``X-Request-ID`` header. ``uid`` comes from ``request.state.uid``, else the
      ``X-User-ID`` header, else the ``User-ID`` header. Missing or empty values are
      reported as the string ``"-1"``.
      """
      state = getattr(request, "state", None)

      request_id = getattr(state, "reqid", None) or request.headers.get("X-Request-ID")
      user_id = getattr(state, "uid", None)
      if not user_id:
          user_id = request.headers.get("X-User-ID") or request.headers.get("User-ID")

      return {
          "reqid": request_id if request_id else "-1",
          "uid": user_id if user_id else "-1",
      }
  
  
4d824a77   tangwang   所有租户共用一套统一配置.tena...
122
  def init_service(es_host: str = "http://localhost:9200"):
      """
      Initialize search service with unified configuration.

      Steps, in order:
        1. load the application config (stored in ``_app_config``);
        2. create the Elasticsearch client, passing credentials only when both a
           username and a password are configured, and ping it;
        3. publish the client via ``set_es_client`` for shared components;
        4. build the query parser, searcher and suggestion service.

      Args:
          es_host: Elasticsearch host URL

      Raises:
          ConnectionError: if the Elasticsearch ping fails.
      """
      global _es_client, _searcher, _query_parser, _suggestion_service, _app_config

      began_at = time.time()
      logger.info("Initializing search service (multi-tenant)")

      # -- configuration -----------------------------------------------------
      logger.info("Loading configuration...")
      _app_config = get_app_config()
      search_config = _app_config.search
      logger.info("Configuration loaded")

      # -- Elasticsearch -----------------------------------------------------
      es_settings = _app_config.infrastructure.elasticsearch
      es_username = es_settings.username
      es_password = es_settings.password

      logger.info(f"Connecting to Elasticsearch at {es_host}...")
      client_kwargs = {"hosts": [es_host]}
      if es_username and es_password:
          client_kwargs["username"] = es_username
          client_kwargs["password"] = es_password
      _es_client = ESClient(**client_kwargs)

      if not _es_client.ping():
          raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
      logger.info("Elasticsearch connected")

      # Publish the client before constructing components that may look it up via the registry.
      set_es_client(_es_client)

      # -- components --------------------------------------------------------
      logger.info("Initializing query parser...")
      _query_parser = QueryParser(search_config)

      logger.info("Initializing searcher...")
      _searcher = Searcher(_es_client, search_config, _query_parser)

      logger.info("Initializing suggestion service...")
      _suggestion_service = SuggestionService(_es_client)

      took = time.time() - began_at
      logger.info(f"Search service ready! (took {took:.2f}s) | Index: {search_config.es_index_name}")
be52af70   tangwang   first commit
168
169
  
  
be52af70   tangwang   first commit
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
  
  
  def get_es_client() -> ESClient:
      """Return the shared Elasticsearch client created by init_service().

      Raises:
          RuntimeError: if init_service() has not assigned it yet.
      """
      if _es_client is not None:
          return _es_client
      raise RuntimeError("Service not initialized")
  
  
  def get_searcher() -> Searcher:
      """Return the shared Searcher built by init_service().

      Raises:
          RuntimeError: if init_service() has not assigned it yet.
      """
      if _searcher is not None:
          return _searcher
      raise RuntimeError("Service not initialized")
  
  
  def get_query_parser() -> QueryParser:
      """Return the shared QueryParser built by init_service().

      Raises:
          RuntimeError: if init_service() has not assigned it yet.
      """
      if _query_parser is not None:
          return _query_parser
      raise RuntimeError("Service not initialized")
  
  
ded6f29e   tangwang   补充suggestion模块
193
194
195
196
197
198
199
  def get_suggestion_service() -> SuggestionService:
      """Return the shared SuggestionService built by init_service().

      Raises:
          RuntimeError: if init_service() has not assigned it yet.
      """
      if _suggestion_service is not None:
          return _suggestion_service
      raise RuntimeError("Service not initialized")
  
  
9f96d6f3   tangwang   短query不用语义搜索
200
201
  def get_config():
      """Return the application config loaded by init_service().

      Raises:
          RuntimeError: if init_service() has not loaded it yet.
      """
      if _app_config is not None:
          return _app_config
      raise RuntimeError("Service not initialized")
9f96d6f3   tangwang   短query不用语义搜索
205
206
  
  
bb3c5ef8   tangwang   灌入数据流程跑通
207
  def _env_list(name: str, default: list) -> list:
      """Return the comma-separated items of environment variable *name*.

      Whitespace around items is stripped and empty items are dropped. When the
      variable is unset or yields no items, a copy of *default* is returned.
      """
      raw = os.getenv(name, "")
      items = [part.strip() for part in raw.split(",") if part.strip()]
      return items if items else list(default)


  # Create FastAPI app with enhanced configuration
  app = FastAPI(
      title="E-Commerce Search API",
      description="Configurable search engine for cross-border e-commerce",
      version="1.0.0",
      docs_url="/docs",
      redoc_url="/redoc",
      openapi_url="/openapi.json"
  )

  # Rate limiting: slowapi expects the limiter on app.state; the handler converts
  # RateLimitExceeded into an HTTP error response.
  app.state.limiter = limiter
  app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

  # Trusted-host check. The default "*" accepts any Host header (development);
  # set API_TRUSTED_HOSTS="api.example.com,localhost" to restrict it in production.
  app.add_middleware(
      TrustedHostMiddleware,
      allowed_hosts=_env_list("API_TRUSTED_HOSTS", ["*"]),
  )


  # Add security headers middleware
  @app.middleware("http")
  async def add_security_headers(request: Request, call_next):
      """Attach a fixed set of security-related headers to every downstream response."""
      response = await call_next(request)
      response.headers["X-Content-Type-Options"] = "nosniff"
      response.headers["X-Frame-Options"] = "DENY"
      response.headers["X-XSS-Protection"] = "1; mode=block"
      response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
      return response


  # CORS. The default "*" allows every origin (development). Because credentials are
  # allowed, Starlette then mirrors the caller's Origin, i.e. any site can make
  # credentialed requests; set API_CORS_ALLOW_ORIGINS to a comma-separated list of
  # trusted origins in production.
  app.add_middleware(
      CORSMiddleware,
      allow_origins=_env_list("API_CORS_ALLOW_ORIGINS", ["*"]),
      allow_credentials=True,
      allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
      allow_headers=["*"],
      expose_headers=["X-Total-Count"]
  )
  
  
  def _configure_thread_pool(total_tokens: int = 48) -> None:
      """Best-effort resize of anyio's default thread limiter (default is 40 tokens).

      Failures are logged as warnings and otherwise ignored, keeping the default size.
      """
      try:
          import anyio.to_thread

          # Named thread_limiter so it does not shadow the module-level rate `limiter`.
          thread_limiter = anyio.to_thread.current_default_thread_limiter()
          thread_limiter.total_tokens = total_tokens
          logger.info(f"Thread pool size set to {thread_limiter.total_tokens}")
      except Exception as e:
          logger.warning(f"Failed to set thread pool size: {e}, using default")


  @app.on_event("startup")
  async def startup_event():
      """Initialize service on startup.

      Raises the worker thread pool to 48 tokens (best effort). It then reads the
      Elasticsearch host from the app config and calls init_service(). Failures from
      the config lookup or from initialization are logged with a traceback and
      re-raised, which aborts startup.
      """
      _configure_thread_pool(48)

      try:
          es_host = get_app_config().infrastructure.elasticsearch.host
          logger.info("Starting E-Commerce Search API (Multi-Tenant)")
          logger.info(f"Elasticsearch Host: {es_host}")

          init_service(es_host=es_host)
          logger.info("Service initialized successfully")
      except Exception as e:
          logger.exception(f"Failed to initialize service: {e}")
          raise
bb3c5ef8   tangwang   灌入数据流程跑通
270
271
272
273
274
275
  
  
  @app.on_event("shutdown")
  async def shutdown_event():
      """Log that the API is stopping; this hook does not tear down any components."""
      farewell = "Shutting down E-Commerce Search API"
      logger.info(farewell)
be52af70   tangwang   first commit
276
277
278
279
  
  
  @app.exception_handler(Exception)
  async def global_exception_handler(request: Request, exc: Exception):
      """Global exception handler with detailed logging.

      Logs the unhandled exception with its traceback and the request-context fields
      (reqid/uid). It then returns a generic 500 JSON body, so the exception text is
      not sent to the client.
      """
      client_ip = request.client.host if request.client else "unknown"
      # Pass the exception object explicitly: exc_info=True only picks up an exception
      # that is currently being handled, which this coroutine does not itself guarantee.
      logger.error(
          f"Unhandled exception from {client_ip}: {exc}",
          exc_info=exc,
          extra=_request_log_extra_from_http(request),
      )

      return JSONResponse(
          status_code=500,
          content={
              "error": "Internal server error",
              "detail": "An unexpected error occurred. Please try again later.",
              "timestamp": int(time.time())
          }
      )
  
  
  @app.exception_handler(HTTPException)
  async def http_exception_handler(request: Request, exc: HTTPException):
      """Log an HTTPException as a warning and render it as a JSON error body.

      The response keeps the exception's status code. Its body carries ``error`` (the
      exception detail), ``status_code`` and a Unix ``timestamp``.

      NOTE(review): registered for FastAPI's HTTPException; errors raised as
      Starlette's base HTTPException (e.g. routing 404/405) may not reach this
      handler — confirm whether that is intended.
      """
      peer = request.client.host if request.client else 'unknown'
      logger.warning(
          f"HTTP exception from {peer}: {exc.status_code} - {exc.detail}",
          extra=_request_log_extra_from_http(request),
      )

      body = {
          "error": exc.detail,
          "status_code": exc.status_code,
          "timestamp": int(time.time()),
      }
      return JSONResponse(status_code=exc.status_code, content=body)
  
  
a7a8c6cb   tangwang   测试过滤、聚合、排序
316
  @app.get("/api")
  @limiter.limit("60/minute")
  async def root(request: Request):
      """Return basic service metadata; limited to 60 requests/minute per client address."""
      peer = request.client
      client_ip = peer.host if peer else "unknown"
      logger.info(f"Root endpoint accessed from {client_ip}")

      info = {
          "service": "E-Commerce Search API",
          "version": "1.0.0",
          "status": "running",
          "timestamp": int(time.time()),
      }
      return info
  
  
bb3c5ef8   tangwang   灌入数据流程跑通
331
332
333
334
335
336
  @app.get("/health")
  @limiter.limit("120/minute")
  async def health_check(request: Request):
      """Health check endpoint.

      Returns 200 with per-component status when the config, the Elasticsearch client
      and the searcher have all been initialized by init_service(). Otherwise it
      returns 503 with the accessor's error message.

      Elasticsearch reachability is not re-checked per request: "connected" only means
      the client object exists.
      """
      try:
          # Check every component the response reports on (config was previously
          # reported as initialized without being checked).
          get_config()
          get_es_client()
          get_searcher()
      except Exception as e:
          logger.error(f"Health check failed: {e}")
          return JSONResponse(
              status_code=503,
              content={
                  "status": "unhealthy",
                  "error": str(e),
                  "timestamp": int(time.time())
              }
          )

      return {
          "status": "healthy",
          "services": {
              "config": "initialized",
              "elasticsearch": "connected",
              "searcher": "initialized"
          },
          "timestamp": int(time.time())
      }
  
  
bb9c626c   tangwang   搜索服务(6002)不再初始化/挂...
361
362
  # Register the HTTP routers served by this app. Indexer routes are deliberately not
  # mounted here: the search app only exposes the search and admin endpoints.
  from .routes import search, admin

  for _routes_module in (search, admin):
      app.include_router(_routes_module.router)
  del _routes_module
be52af70   tangwang   first commit
366
  
a7a8c6cb   tangwang   测试过滤、聚合、排序
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
  # Mount static files and serve frontend from <project root>/frontend, if present.
  frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend")
  if os.path.isdir(frontend_path):
      # Serve frontend HTML at root
      @app.get("/")
      async def serve_frontend():
          """Serve frontend/index.html, or a small status payload when it is missing."""
          index_path = os.path.join(frontend_path, "index.html")
          if os.path.exists(index_path):
              return FileResponse(index_path)
          return {"service": "E-Commerce Search API", "version": "1.0.0", "status": "running"}

      # Mount static files (CSS, JS, images). StaticFiles raises at construction when its
      # directory is missing, so guard it to avoid failing the whole app import.
      frontend_static_path = os.path.join(frontend_path, "static")
      if os.path.isdir(frontend_static_path):
          app.mount("/static", StaticFiles(directory=frontend_static_path), name="static")
          logger.info(f"Frontend static files mounted from: {frontend_path}")
      else:
          logger.warning(f"Frontend static directory not found, /static not mounted: {frontend_static_path}")
  else:
      logger.warning(f"Frontend directory not found: {frontend_path}")
  
be52af70   tangwang   first commit
386
387
  
  if __name__ == "__main__":
      # NOTE: this module uses a relative import (.service_registry), so start it as a module
      # (e.g. `python -m api.app`) rather than as a plain script.
      parser = argparse.ArgumentParser(description='Start search API service (multi-tenant)')
      parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
      parser.add_argument('--port', type=int, default=6002, help='Port to bind to')
      # Default to any ES_HOST already in the environment so that omitting --es-host no
      # longer overwrites it with localhost.
      parser.add_argument(
          '--es-host',
          default=os.environ.get('ES_HOST', 'http://localhost:9200'),
          help='Elasticsearch host (default: $ES_HOST, else http://localhost:9200)',
      )
      parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
      args = parser.parse_args()

      # Export the chosen host to the server process environment.
      # NOTE(review): startup_event reads the host from get_app_config(); confirm the config
      # layer honours ES_HOST, otherwise --es-host has no effect.
      os.environ['ES_HOST'] = args.es_host

      # Run server
      uvicorn.run(
          "api.app:app",
          host=args.host,
          port=args.port,
          reload=args.reload
      )