Blame view

api/app.py 9.49 KB
be52af70   tangwang   first commit
1
2
3
4
  """
  Main FastAPI application for the search service.
  
  Usage:
2a76641e   tangwang   config
5
      uvicorn api.app:app --host 0.0.0.0 --port 6002 --reload
be52af70   tangwang   first commit
6
7
8
9
  """
  
  import os
  import sys
bb3c5ef8   tangwang   灌入数据流程跑通
10
11
12
  import logging
  import time
  from collections import defaultdict, deque
be52af70   tangwang   first commit
13
  from typing import Optional
bb3c5ef8   tangwang   灌入数据流程跑通
14
  from fastapi import FastAPI, Request, HTTPException
be52af70   tangwang   first commit
15
16
  from fastapi.responses import JSONResponse
  from fastapi.middleware.cors import CORSMiddleware
bb3c5ef8   tangwang   灌入数据流程跑通
17
18
19
20
21
  from fastapi.middleware.trustedhost import TrustedHostMiddleware
  from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
  from slowapi import Limiter, _rate_limit_exceeded_handler
  from slowapi.util import get_remote_address
  from slowapi.errors import RateLimitExceeded
be52af70   tangwang   first commit
22
23
  import argparse
  
bb3c5ef8   tangwang   灌入数据流程跑通
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  # Configure logging at import time: INFO level, timestamped records that
  # include the logger name and level.
  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
      handlers=[
          # Console output
          logging.StreamHandler(),
          # File output; mode='a' appends, so records from earlier runs are kept
          logging.FileHandler('/tmp/search_engine_api.log', mode='a')
      ]
  )
  # Module-level logger used by the handlers and endpoints in this file
  logger = logging.getLogger(__name__)
  
  # Initialize rate limiter; requests are bucketed by slowapi's
  # get_remote_address key function
  limiter = Limiter(key_func=get_remote_address)
  
be52af70   tangwang   first commit
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
  # Add parent directory to path: the directory one level above this file's
  # own directory is put first on sys.path.
  # NOTE(review): presumably this is what lets the project-level imports
  # below (config, utils, search, query) resolve — confirm.
  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  
  from config import ConfigLoader, CustomerConfig
  from utils import ESClient
  from search import Searcher
  from query import QueryParser
  
  # Global instances: module-level singletons, None until init_service()
  # assigns them; read through the get_*() accessors below.
  _config: Optional[CustomerConfig] = None
  _es_client: Optional[ESClient] = None
  _searcher: Optional[Searcher] = None
  _query_parser: Optional[QueryParser] = None
  
  
  def init_service(customer_id: str = "customer1", es_host: str = "http://localhost:9200"):
      """
      Initialize search service with configuration.

      Loads and validates the customer configuration, connects to
      Elasticsearch (with authentication when both a username and a password
      are available), then builds the query parser and the searcher. The
      results are stored in the module-level singletons read by the get_*()
      accessors.

      Args:
          customer_id: Customer configuration ID
          es_host: Elasticsearch host URL

      Raises:
          ValueError: If the loaded configuration fails validation.
          ConnectionError: If Elasticsearch does not answer a ping.
      """
      global _config, _es_client, _searcher, _query_parser

      logger.info("Initializing search service for customer: %s", customer_id)

      # Load configuration
      config_loader = ConfigLoader("config/schema")
      _config = config_loader.load_customer_config(customer_id)

      # Validate configuration
      errors = config_loader.validate_config(_config)
      if errors:
          raise ValueError(f"Configuration validation failed: {errors}")

      logger.info("Configuration loaded: %s", _config.customer_name)

      # Get ES credentials from environment variables first
      es_username = os.getenv('ES_USERNAME')
      es_password = os.getenv('ES_PASSWORD')

      # Fall back to config.env_config for whichever credential is missing
      if not es_username or not es_password:
          try:
              from config.env_config import get_es_config
              es_config = get_es_config()
              es_username = es_username or es_config.get('username')
              es_password = es_password or es_config.get('password')
          except Exception as exc:
              # Still best-effort (an unauthenticated connection is attempted
              # below), but the reason is recorded instead of being swallowed.
              logger.warning(
                  "Could not load Elasticsearch credentials from config.env_config: %s",
                  exc,
              )

      # Initialize ES client with authentication if credentials are available
      if es_username and es_password:
          logger.info("Connecting to Elasticsearch with authentication: %s", es_username)
          _es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
      else:
          logger.info("Connecting to Elasticsearch without authentication")
          _es_client = ESClient(hosts=[es_host])

      if not _es_client.ping():
          raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")

      logger.info("Connected to Elasticsearch: %s", es_host)

      # Initialize query parser
      _query_parser = QueryParser(_config)
      logger.info("Query parser initialized")

      # Initialize searcher
      _searcher = Searcher(_config, _es_client, _query_parser)
      logger.info("Searcher initialized")

      logger.info("Search service ready!")
  
  
  def get_config() -> CustomerConfig:
      """Return the customer configuration singleton.

      Raises:
          RuntimeError: If the configuration has not been set yet.
      """
      if _config is not None:
          return _config
      raise RuntimeError("Service not initialized")
  
  
  def get_es_client() -> ESClient:
      """Return the Elasticsearch client singleton.

      Raises:
          RuntimeError: If the client has not been set yet.
      """
      if _es_client is not None:
          return _es_client
      raise RuntimeError("Service not initialized")
  
  
  def get_searcher() -> Searcher:
      """Return the searcher singleton.

      Raises:
          RuntimeError: If the searcher has not been set yet.
      """
      if _searcher is not None:
          return _searcher
      raise RuntimeError("Service not initialized")
  
  
  def get_query_parser() -> QueryParser:
      """Return the query parser singleton.

      Raises:
          RuntimeError: If the query parser has not been set yet.
      """
      if _query_parser is not None:
          return _query_parser
      raise RuntimeError("Service not initialized")
  
  
bb3c5ef8   tangwang   灌入数据流程跑通
142
  # Create the FastAPI application. title, description and version are the
  # metadata passed to FastAPI; the three URL arguments set where the
  # interactive docs, ReDoc page and OpenAPI schema are served.
  app = FastAPI(
      title="E-Commerce Search API",
      description="Configurable search engine for cross-border e-commerce",
      version="1.0.0",
      docs_url="/docs",
      redoc_url="/redoc",
      openapi_url="/openapi.json"
  )
  
  # Register the rate limiter on the app state and return slowapi's standard
  # response when a limit is exceeded.
  app.state.limiter = limiter
  app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

  # Add trusted host middleware.
  # Allowed hosts come from the ALLOWED_HOSTS environment variable
  # (comma-separated). When it is unset or empty every host is accepted,
  # which is the previous hard-coded behavior; set it in production.
  _allowed_hosts = [
      host.strip()
      for host in os.getenv("ALLOWED_HOSTS", "*").split(",")
      if host.strip()
  ] or ["*"]

  app.add_middleware(
      TrustedHostMiddleware,
      allowed_hosts=_allowed_hosts
  )
  
bb3c5ef8   tangwang   灌入数据流程跑通
162
163
164
165
166
167
168
169
170
171
172
  # Add security headers middleware
  @app.middleware("http")
  async def add_security_headers(request: Request, call_next):
      """Set a fixed group of security-related headers on every response."""
      security_headers = (
          ("X-Content-Type-Options", "nosniff"),
          ("X-Frame-Options", "DENY"),
          ("X-XSS-Protection", "1; mode=block"),
          ("Referrer-Policy", "strict-origin-when-cross-origin"),
      )
      response = await call_next(request)
      # Assigned one by one, in the same order, overwriting any existing value
      for header_name, header_value in security_headers:
          response.headers[header_name] = header_value
      return response
  
  # Add CORS middleware.
  # Allowed origins come from the CORS_ALLOW_ORIGINS environment variable
  # (comma-separated). When it is unset or empty every origin is allowed,
  # which is the previous hard-coded behavior.
  # NOTE(review): a wildcard origin together with allow_credentials=True is
  # very permissive — set CORS_ALLOW_ORIGINS to specific domains in production.
  _cors_allow_origins = [
      origin.strip()
      for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
      if origin.strip()
  ] or ["*"]

  app.add_middleware(
      CORSMiddleware,
      allow_origins=_cors_allow_origins,
      allow_credentials=True,
      allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
      allow_headers=["*"],
      expose_headers=["X-Total-Count"]
  )
  
  
  @app.on_event("startup")
  async def startup_event():
      """Initialize service on startup.

      Reads CUSTOMER_ID and ES_HOST from the environment (with defaults) and
      calls init_service(). A failure is logged with its traceback but not
      re-raised, so the application still starts.
      """
      customer_id = os.getenv("CUSTOMER_ID", "customer1")
      es_host = os.getenv("ES_HOST", "http://localhost:9200")

      logger.info("Starting E-Commerce Search API")
      logger.info("Customer ID: %s", customer_id)
      logger.info("Elasticsearch Host: %s", es_host)

      try:
          init_service(customer_id=customer_id, es_host=es_host)
      except Exception as e:
          # exc_info=True keeps the traceback; the message alone rarely says
          # which initialization step failed.
          logger.error(f"Failed to initialize service: {e}", exc_info=True)
          logger.warning("Service will start but may not function correctly")
      else:
          logger.info("Service initialized successfully")
  
  
  @app.on_event("shutdown")
  async def shutdown_event():
      """Log that the API is shutting down; no other cleanup is performed here."""
      logger.info("Shutting down E-Commerce Search API")
be52af70   tangwang   first commit
205
206
207
208
  
  
  @app.exception_handler(Exception)
  async def global_exception_handler(request: Request, exc: Exception):
      """Log an unhandled exception with its traceback and answer with a generic 500.

      The response body never includes the exception text; it carries only a
      fixed message and the current Unix timestamp.
      """
      if request.client:
          origin = request.client.host
      else:
          origin = "unknown"
      logger.error(f"Unhandled exception from {origin}: {exc}", exc_info=True)

      body = {
          "error": "Internal server error",
          "detail": "An unexpected error occurred. Please try again later.",
          "timestamp": int(time.time())
      }
      return JSONResponse(status_code=500, content=body)
  
  
  @app.exception_handler(HTTPException)
  async def http_exception_handler(request: Request, exc: HTTPException):
      """HTTP exception handler.

      Logs the exception at warning level and returns a JSON body with the
      error detail, the status code and the current Unix timestamp. Any
      headers attached to the exception are forwarded to the response.
      """
      client_ip = request.client.host if request.client else "unknown"
      logger.warning(
          "HTTP exception from %s: %s - %s", client_ip, exc.status_code, exc.detail
      )

      return JSONResponse(
          status_code=exc.status_code,
          content={
              "error": exc.detail,
              "status_code": exc.status_code,
              "timestamp": int(time.time())
          },
          # Forward exception headers such as WWW-Authenticate or Retry-After;
          # None when the exception carries none.
          headers=getattr(exc, "headers", None)
      )
  
  
  @app.get("/")
  @limiter.limit("60/minute")
  async def root(request: Request):
      """Return the service name, version, status and current Unix timestamp.

      Limited to 60 requests per minute; each access is logged with the
      caller's address.
      """
      caller = "unknown"
      if request.client:
          caller = request.client.host
      logger.info(f"Root endpoint accessed from {caller}")

      service_info = {
          "service": "E-Commerce Search API",
          "version": "1.0.0",
          "status": "running",
          "timestamp": int(time.time())
      }
      return service_info
  
  
bb3c5ef8   tangwang   灌入数据流程跑通
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
  @app.get("/health")
  @limiter.limit("120/minute")
  async def health_check(request: Request):
      """Health check endpoint.

      Reports healthy only when the configuration and searcher singletons are
      set and Elasticsearch answers a ping. Otherwise returns a 503 whose
      body carries the error text. Limited to 120 requests per minute.
      """
      # Local import keeps this block self-contained; fastapi is already a
      # dependency of this module.
      from fastapi.concurrency import run_in_threadpool

      try:
          # Check every service the response claims is available
          get_config()
          get_searcher()
          es_client = get_es_client()

          # The client object stays set even when the startup ping failed, so
          # its presence alone does not prove connectivity. ping() is a
          # blocking call, so it runs in the thread pool.
          if not await run_in_threadpool(es_client.ping):
              raise ConnectionError("Elasticsearch ping failed")

          return {
              "status": "healthy",
              "services": {
                  "config": "initialized",
                  "elasticsearch": "connected",
                  "searcher": "initialized"
              },
              "timestamp": int(time.time())
          }
      except Exception as e:
          logger.error(f"Health check failed: {e}")
          return JSONResponse(
              status_code=503,
              content={
                  "status": "unhealthy",
                  "error": str(e),
                  "timestamp": int(time.time())
              }
          )
  
  
be52af70   tangwang   first commit
283
284
285
286
287
288
289
290
291
292
293
294
  # Include routers: attach the search and admin route modules to the app.
  # NOTE(review): this import sits after the app and the get_*() accessors are
  # defined — presumably because the route modules import from this module;
  # confirm before moving it to the top of the file.
  # NOTE(review): the relative import needs this file to be loaded as part of
  # the api package (e.g. via "uvicorn api.app:app").
  from .routes import search, admin
  
  app.include_router(search.router)
  app.include_router(admin.router)
  
  
  if __name__ == "__main__":
      import uvicorn

      # Command-line entry point: parse options, export them through the
      # environment for startup_event(), then start uvicorn.
      parser = argparse.ArgumentParser(description='Start search API service')
      parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
      parser.add_argument('--port', type=int, default=6002, help='Port to bind to')
      # Defaults fall back to the existing environment so that a CUSTOMER_ID or
      # ES_HOST already exported is not overwritten when the flag is omitted.
      parser.add_argument(
          '--customer',
          default=os.getenv('CUSTOMER_ID', 'customer1'),
          help='Customer ID'
      )
      parser.add_argument(
          '--es-host',
          default=os.getenv('ES_HOST', 'http://localhost:9200'),
          help='Elasticsearch host'
      )
      parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
      args = parser.parse_args()

      # Set environment variables (read back by startup_event)
      os.environ['CUSTOMER_ID'] = args.customer
      os.environ['ES_HOST'] = args.es_host

      # Run server; the app is passed as an import string
      uvicorn.run(
          "api.app:app",
          host=args.host,
          port=args.port,
          reload=args.reload
      )