Blame view

api/app.py 9.18 KB
be52af70   tangwang   first commit
1
2
3
4
  """
  Main FastAPI application for the search service.
  
  Usage:
2a76641e   tangwang   config
5
      uvicorn api.app:app --host 0.0.0.0 --port 6002 --reload
be52af70   tangwang   first commit
6
7
8
9
  """
  
  import os
  import sys
bb3c5ef8   tangwang   灌入数据流程跑通
10
11
  import logging
  import time
325eec03   tangwang   1. 日志、配置基础设施,使用优化
12
13
  import argparse
  import uvicorn
bb3c5ef8   tangwang   灌入数据流程跑通
14
  from collections import defaultdict, deque
be52af70   tangwang   first commit
15
  from typing import Optional
bb3c5ef8   tangwang   灌入数据流程跑通
16
  from fastapi import FastAPI, Request, HTTPException
a7a8c6cb   tangwang   测试过滤、聚合、排序
17
18
  from fastapi.responses import JSONResponse, FileResponse
  from fastapi.staticfiles import StaticFiles
be52af70   tangwang   first commit
19
  from fastapi.middleware.cors import CORSMiddleware
bb3c5ef8   tangwang   灌入数据流程跑通
20
21
22
23
24
  from fastapi.middleware.trustedhost import TrustedHostMiddleware
  from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
  from slowapi import Limiter, _rate_limit_exceeded_handler
  from slowapi.util import get_remote_address
  from slowapi.errors import RateLimitExceeded
be52af70   tangwang   first commit
25
  
bb3c5ef8   tangwang   灌入数据流程跑通
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  # Configure root logging at INFO level. Every record is written both to the
  # console (StreamHandler) and, in append mode, to a log file under /tmp.
  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
      handlers=[
          logging.StreamHandler(),
          logging.FileHandler('/tmp/search_engine_api.log', mode='a')
      ]
  )
  # Module-level logger named after this module
  logger = logging.getLogger(__name__)
  
  # Rate limiter referenced by the @limiter.limit(...) decorators in this file;
  # request keys are produced by slowapi's get_remote_address.
  limiter = Limiter(key_func=get_remote_address)
  
be52af70   tangwang   first commit
40
41
42
  # Put the parent of this file's directory first on sys.path — presumably so
  # the config, utils, search and query imports that follow resolve; confirm.
  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  
325eec03   tangwang   1. 日志、配置基础设施,使用优化
43
  from config.env_config import ES_CONFIG
be52af70   tangwang   first commit
44
45
  from utils import ESClient
  from search import Searcher
bf89b597   tangwang   feat(search): ada...
46
  from search.query_config import DEFAULT_INDEX_NAME
be52af70   tangwang   first commit
47
48
49
  from query import QueryParser
  
  # Global instances
be52af70   tangwang   first commit
50
51
52
53
54
  # Shared service objects. They start as None, are assigned by init_service(),
  # and are read through the get_*() accessors defined in this module.
  _es_client: Optional[ESClient] = None
  _searcher: Optional[Searcher] = None
  _query_parser: Optional[QueryParser] = None
  
  
4d824a77   tangwang   所有租户共用一套统一配置.tena...
55
  def init_service(es_host: str = "http://localhost:9200"):
      """
      Build the module-level Elasticsearch client, query parser and searcher.

      Credentials are read from the ES_USERNAME / ES_PASSWORD environment
      variables, falling back to the matching ES_CONFIG entries. They are
      passed to the client only when both a username and a password are set.

      Args:
          es_host: Elasticsearch host URL

      Raises:
          ConnectionError: If the Elasticsearch ping fails.
      """
      global _es_client, _searcher, _query_parser

      started_at = time.time()
      logger.info("Initializing search service (multi-tenant)")

      # Environment variables win over the static configuration
      username = os.getenv('ES_USERNAME') or ES_CONFIG.get('username')
      password = os.getenv('ES_PASSWORD') or ES_CONFIG.get('password')

      logger.info(f"Connecting to Elasticsearch at {es_host}...")
      client_kwargs = {"hosts": [es_host]}
      if username and password:
          client_kwargs.update(username=username, password=password)
      _es_client = ESClient(**client_kwargs)

      # Fail fast when the cluster does not answer
      if not _es_client.ping():
          raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
      logger.info("Elasticsearch connected")

      # The searcher is built on top of the client and the query parser
      logger.info("Initializing query parser...")
      _query_parser = QueryParser()

      logger.info("Initializing searcher...")
      _searcher = Searcher(_es_client, _query_parser, index_name=DEFAULT_INDEX_NAME)

      elapsed = time.time() - started_at
      logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {DEFAULT_INDEX_NAME}")
be52af70   tangwang   first commit
91
92
  
  
be52af70   tangwang   first commit
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
  
  
  def get_es_client() -> ESClient:
      """Return the shared Elasticsearch client; raise RuntimeError while it is unset."""
      client = _es_client
      if client is not None:
          return client
      raise RuntimeError("Service not initialized")
  
  
  def get_searcher() -> Searcher:
      """Return the shared searcher; raise RuntimeError while it is unset."""
      searcher = _searcher
      if searcher is not None:
          return searcher
      raise RuntimeError("Service not initialized")
  
  
  def get_query_parser() -> QueryParser:
      """Return the shared query parser; raise RuntimeError while it is unset."""
      parser = _query_parser
      if parser is not None:
          return parser
      raise RuntimeError("Service not initialized")
  
  
bb3c5ef8   tangwang   灌入数据流程跑通
116
  # Create FastAPI app with enhanced configuration
be52af70   tangwang   first commit
117
118
119
  app = FastAPI(
      # Metadata shown in the generated API documentation
      title="E-Commerce Search API",
      version="1.0.0",
      description="Configurable search engine for cross-border e-commerce",
      # Paths of the OpenAPI schema and the two documentation UIs
      openapi_url="/openapi.json",
      docs_url="/docs",
      redoc_url="/redoc",
  )
  
  # Expose the limiter on the application state and let slowapi's handler
  # build the response whenever RateLimitExceeded is raised
  app.state.limiter = limiter
  app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

  # Host header validation. The "*" wildcard accepts every host, so nothing is
  # restricted yet; replace it with concrete host names for production.
  app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"])
  
bb3c5ef8   tangwang   灌入数据流程跑通
136
137
138
139
140
141
142
143
144
145
146
  # Add security headers middleware
  @app.middleware("http")
  async def add_security_headers(request: Request, call_next):
      """Set a fixed group of security-related headers on every response."""
      security_headers = (
          ("X-Content-Type-Options", "nosniff"),
          ("X-Frame-Options", "DENY"),
          ("X-XSS-Protection", "1; mode=block"),
          ("Referrer-Policy", "strict-origin-when-cross-origin"),
      )

      response = await call_next(request)
      for header_name, header_value in security_headers:
          response.headers[header_name] = header_value
      return response
  
  # Add CORS middleware.
  # Allowed origins come from the CORS_ALLOW_ORIGINS environment variable
  # (comma-separated list). When it is unset or empty, every origin is allowed,
  # which suits development only.
  _cors_origins = [
      origin.strip()
      for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
      if origin.strip()
  ] or ["*"]

  app.add_middleware(
      CORSMiddleware,
      allow_origins=_cors_origins,
      # Credentialed requests must not be combined with a wildcard origin, so
      # cookies/authorization are only allowed for an explicit origin list.
      allow_credentials="*" not in _cors_origins,
      allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
      allow_headers=["*"],
      expose_headers=["X-Total-Count"],
  )
  
  
  @app.on_event("startup")
  async def startup_event():
      """
      Run init_service() when the application starts.

      The Elasticsearch host is taken from the ES_HOST environment variable
      (default http://localhost:9200). An initialization failure is logged
      and not re-raised, so the server still starts.
      """
      default_host = "http://localhost:9200"
      es_host = os.getenv("ES_HOST", default_host)

      logger.info("Starting E-Commerce Search API (Multi-Tenant)")
      logger.info(f"Elasticsearch Host: {es_host}")

      try:
          init_service(es_host=es_host)
      except Exception as init_error:
          # Deliberate best effort: keep the process alive without a backend
          logger.error(f"Failed to initialize service: {init_error}")
          logger.warning("Service will start but may not function correctly")
      else:
          logger.info("Service initialized successfully")
  
  
  @app.on_event("shutdown")
  async def shutdown_event():
      """Log that the API is shutting down; nothing else is done here."""
      farewell = "Shutting down E-Commerce Search API"
      logger.info(farewell)
be52af70   tangwang   first commit
177
178
179
180
  
  
  @app.exception_handler(Exception)
  async def global_exception_handler(request: Request, exc: Exception):
      """Log an unhandled exception with its traceback and answer with a generic 500 body."""
      caller = "unknown"
      if request.client:
          caller = request.client.host
      logger.error(f"Unhandled exception from {caller}: {exc}", exc_info=True)

      # The body is generic on purpose: no exception text is sent to the client
      payload = {
          "error": "Internal server error",
          "detail": "An unexpected error occurred. Please try again later.",
          "timestamp": int(time.time()),
      }
      return JSONResponse(status_code=500, content=payload)
  
  
  @app.exception_handler(HTTPException)
  async def http_exception_handler(request: Request, exc: HTTPException):
      """
      Convert an HTTPException into the service's JSON error format.

      The response keeps the exception's status code and carries the detail
      under "error", plus "status_code" and a Unix "timestamp". Headers set on
      the exception are forwarded to the client.
      """
      client_ip = request.client.host if request.client else "unknown"
      logger.warning(f"HTTP exception from {client_ip}: {exc.status_code} - {exc.detail}")

      return JSONResponse(
          status_code=exc.status_code,
          content={
              "error": exc.detail,
              "status_code": exc.status_code,
              "timestamp": int(time.time())
          },
          # Without this, headers such as WWW-Authenticate or Retry-After that
          # were attached to the exception would be lost.
          headers=getattr(exc, "headers", None),
      )
  
  
a7a8c6cb   tangwang   测试过滤、聚合、排序
210
  @app.get("/api")
  @limiter.limit("60/minute")
  async def root(request: Request):
      """Return basic service information; limited to 60 requests per minute."""
      if request.client:
          client_ip = request.client.host
      else:
          client_ip = "unknown"
      logger.info(f"Root endpoint accessed from {client_ip}")

      service_info = {
          "service": "E-Commerce Search API",
          "version": "1.0.0",
          "status": "running",
      }
      service_info["timestamp"] = int(time.time())
      return service_info
  
  
bb3c5ef8   tangwang   灌入数据流程跑通
225
226
227
228
229
230
  @app.get("/health")
  @limiter.limit("120/minute")
  async def health_check(request: Request):
      """
      Health check endpoint.

      Reports healthy only when the client and searcher have been initialized
      and Elasticsearch answers a ping. Any failure yields a 503 response
      with "status": "unhealthy" and the error text.
      """
      try:
          # Both accessors raise RuntimeError until init_service() has run
          es_client = get_es_client()
          get_searcher()

          # An initialized client does not prove the cluster is reachable right
          # now, so ping it before reporting "connected".
          if not es_client.ping():
              raise ConnectionError("Elasticsearch is not reachable")

          return {
              "status": "healthy",
              "services": {
                  "config": "initialized",
                  "elasticsearch": "connected",
                  "searcher": "initialized"
              },
              "timestamp": int(time.time())
          }
      except Exception as e:
          logger.error(f"Health check failed: {e}")
          return JSONResponse(
              status_code=503,
              content={
                  "status": "unhealthy",
                  "error": str(e),
                  "timestamp": int(time.time())
              }
          )
  
  
be52af70   tangwang   first commit
255
256
257
258
259
260
  # Include routers
  # NOTE(review): this import sits below the app and accessor definitions
  # instead of at the top of the file — presumably because the route modules
  # import from this module; confirm before moving it.
  from .routes import search, admin
  
  # Register the routes from both modules on the application
  app.include_router(search.router)
  app.include_router(admin.router)
  
a7a8c6cb   tangwang   测试过滤、聚合、排序
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
  # Mount static files and serve frontend.
  # The frontend directory is looked up next to the package directory, i.e. in
  # the parent of the directory that contains this file.
  frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend")
  if os.path.exists(frontend_path):
      # Serve frontend HTML at root
      @app.get("/")
      async def serve_frontend():
          """Serve frontend/index.html, or basic service info when the file is missing."""
          index_path = os.path.join(frontend_path, "index.html")
          if os.path.exists(index_path):
              return FileResponse(index_path)
          return {"service": "E-Commerce Search API", "version": "1.0.0", "status": "running"}

      # Mount static files (CSS, JS, images).
      # StaticFiles raises at construction time when its directory does not
      # exist, which would abort the import of this module, so mount only
      # when frontend/static is really there.
      static_path = os.path.join(frontend_path, "static")
      if os.path.isdir(static_path):
          app.mount("/static", StaticFiles(directory=static_path), name="static")
          logger.info(f"Frontend static files mounted from: {frontend_path}")
      else:
          logger.warning(f"Frontend static directory not found: {static_path}")
  else:
      logger.warning(f"Frontend directory not found: {frontend_path}")
  
be52af70   tangwang   first commit
280
281
  
  if __name__ == "__main__":
      # Command-line entry point: parse the options and start uvicorn
      parser = argparse.ArgumentParser(description='Start search API service (multi-tenant)')
      parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
      parser.add_argument('--port', type=int, default=6002, help='Port to bind to')
      # Default to an ES_HOST that is already set in the environment, so that
      # omitting --es-host does not silently replace it with localhost.
      parser.add_argument(
          '--es-host',
          default=os.getenv('ES_HOST', 'http://localhost:9200'),
          help='Elasticsearch host',
      )
      parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
      args = parser.parse_args()

      # startup_event() reads ES_HOST, so hand the chosen host over through
      # the environment.
      os.environ['ES_HOST'] = args.es_host

      # Run server
      uvicorn.run(
          "api.app:app",
          host=args.host,
          port=args.port,
          reload=args.reload
      )