app.py
12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
"""
Main FastAPI application for the search service.
Usage:
uvicorn api.app:app --host 0.0.0.0 --port 6002 --reload
"""
import os
import sys
import logging
import time
import argparse
import uvicorn
from logging.handlers import TimedRotatingFileHandler
from collections import defaultdict, deque
from typing import Optional
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# Configure backend logging
import pathlib
from request_log_context import LOG_LINE_FORMAT, RequestLogContextFilter
def _build_daily_log_handler(
    path: pathlib.Path,
    level: int,
    formatter: logging.Formatter,
    log_filter,
) -> TimedRotatingFileHandler:
    """Create a UTF-8 file handler for *path* that rotates at midnight and keeps 30 backups."""
    handler = TimedRotatingFileHandler(
        filename=path,
        when="midnight",
        interval=1,
        backupCount=30,
        encoding="utf-8",
    )
    handler.setLevel(level)
    handler.setFormatter(formatter)
    handler.addFilter(log_filter)
    return handler


def _close_and_remove_handlers(target: logging.Logger) -> None:
    """Detach every handler from *target* and close it, releasing any open log files."""
    for handler in list(target.handlers):
        target.removeHandler(handler)
        handler.close()


def configure_backend_logging() -> None:
    """Configure process-wide logging for the search backend.

    The level comes from the ``LOG_LEVEL`` environment variable (default ``INFO``;
    unknown names fall back to ``INFO``). Every handler installed here uses
    ``LOG_LINE_FORMAT`` and a shared ``RequestLogContextFilter``:

    * the root logger gets a console handler plus ``logs/api.log``, rotated daily
      and keeping 30 backups;
    * the ``backend.verbose`` logger gets its own ``logs/backend_verbose.log``,
      also rotated daily, and does not propagate to the root logger;
    * the ``uvicorn``, ``uvicorn.error`` and ``uvicorn.access`` loggers lose their
      own handlers and propagate to the root logger instead.

    ``logs/`` is created relative to the current working directory. Existing
    handlers on the loggers above are removed *and closed*, so calling this more
    than once does not leak open file handles. This happens, for example, when the
    module is executed as ``__main__`` and then imported again as ``api.app``.
    """
    log_dir = pathlib.Path("logs")
    log_dir.mkdir(exist_ok=True)

    level_name = os.getenv("LOG_LEVEL", "INFO").strip().upper()
    numeric_level = getattr(logging, level_name, None)
    if not isinstance(numeric_level, int):
        # Unknown names, and logging attributes that are not levels (e.g. BASIC_FORMAT),
        # fall back to INFO instead of making setLevel() fail at import time.
        numeric_level = logging.INFO

    formatter = logging.Formatter(LOG_LINE_FORMAT)
    request_filter = RequestLogContextFilter()

    root_logger = logging.getLogger()
    root_logger.setLevel(numeric_level)
    _close_and_remove_handlers(root_logger)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(numeric_level)
    console_handler.setFormatter(formatter)
    console_handler.addFilter(request_filter)
    root_logger.addHandler(console_handler)
    root_logger.addHandler(
        _build_daily_log_handler(log_dir / "api.log", numeric_level, formatter, request_filter)
    )

    # Verbose records go only to their own file: propagation is off, so they never
    # reach the console or api.log.
    verbose_logger = logging.getLogger("backend.verbose")
    verbose_logger.setLevel(numeric_level)
    _close_and_remove_handlers(verbose_logger)
    verbose_logger.propagate = False
    verbose_logger.addHandler(
        _build_daily_log_handler(
            log_dir / "backend_verbose.log", numeric_level, formatter, request_filter
        )
    )

    # Strip uvicorn's own handlers and let its records propagate to the root logger,
    # so server logs share the format, filter and destinations of application logs.
    for logger_name in ("uvicorn", "uvicorn.error", "uvicorn.access"):
        uvicorn_logger = logging.getLogger(logger_name)
        _close_and_remove_handlers(uvicorn_logger)
        uvicorn_logger.setLevel(numeric_level)
        uvicorn_logger.propagate = True
# Install the log handlers at import time, before this module's logger emits anything.
configure_backend_logging()
logger = logging.getLogger(__name__)
# Rate limiter keyed by the client's remote address. Routes opt in with
# @limiter.limit(...); the limiter is also attached to app.state further down.
limiter = Limiter(key_func=get_remote_address)
# Put the parent of this file's directory (presumably the project root) on sys.path so
# the top-level packages imported next (config, utils, search, query, suggestion) resolve.
# NOTE(review): request_log_context is imported before this insert, so it must already be
# importable (e.g. from the working directory) — confirm this is intended.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import get_app_config
from utils import ESClient
from search import Searcher
from query import QueryParser
from suggestion import SuggestionService
from .service_registry import set_es_client
# Global instances, assigned by init_service() and read through the get_*() accessors,
# which raise RuntimeError while the corresponding value is still None.
_es_client: Optional[ESClient] = None
_searcher: Optional[Searcher] = None
_query_parser: Optional[QueryParser] = None
_suggestion_service: Optional[SuggestionService] = None
# Result of get_app_config(); also assigned by init_service().
_app_config = None
def _request_log_extra_from_http(request: Request) -> dict:
    """Build the logging ``extra`` mapping that tags records with request context.

    The request id is ``request.state.reqid`` when set and truthy, otherwise the
    ``X-Request-ID`` header. The user id is ``request.state.uid``, otherwise
    ``X-User-ID``, otherwise ``User-ID``. A missing or empty value is reported
    as the string ``"-1"``.
    """
    state = getattr(request, "state", None)

    request_id = getattr(state, "reqid", None) or request.headers.get("X-Request-ID")

    user_id = getattr(state, "uid", None)
    if not user_id:
        user_id = request.headers.get("X-User-ID") or request.headers.get("User-ID")

    return {
        "reqid": request_id if request_id else "-1",
        "uid": user_id if user_id else "-1",
    }
def init_service(es_host: str = "http://localhost:9200"):
    """
    Initialize the search service from the unified application configuration.

    Loads the app config, connects to Elasticsearch, registers the client with the
    shared service registry, and builds the query parser, searcher and suggestion
    service. Results are stored in the module-level globals read by the ``get_*``
    accessors.

    Basic-auth credentials are sent only when both a username and a password are
    configured.

    Args:
        es_host: Elasticsearch host URL

    Raises:
        ConnectionError: if Elasticsearch does not answer the initial ping. The
            module-level ES client is then left as it was, so ``get_es_client()``
            never returns a client that failed its connectivity check.
    """
    global _es_client, _searcher, _query_parser, _suggestion_service, _app_config
    # perf_counter is monotonic, so the reported duration is immune to clock adjustments.
    start_time = time.perf_counter()
    logger.info("Initializing search service (multi-tenant)")

    logger.info("Loading configuration...")
    _app_config = get_app_config()
    search_config = _app_config.search
    logger.info("Configuration loaded")

    es_settings = _app_config.infrastructure.elasticsearch
    es_username = es_settings.username
    es_password = es_settings.password
    client_kwargs = {"hosts": [es_host]}
    if es_username and es_password:
        client_kwargs["username"] = es_username
        client_kwargs["password"] = es_password

    logger.info("Connecting to Elasticsearch at %s...", es_host)
    es_client = ESClient(**client_kwargs)
    if not es_client.ping():
        raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
    # Publish the client only once it is known to be reachable.
    _es_client = es_client
    logger.info("Elasticsearch connected")

    # Expose the ES client to shared components (e.g. the searcher) before they are built.
    set_es_client(_es_client)

    logger.info("Initializing query parser...")
    _query_parser = QueryParser(search_config)
    logger.info("Initializing searcher...")
    _searcher = Searcher(_es_client, search_config, _query_parser)
    logger.info("Initializing suggestion service...")
    _suggestion_service = SuggestionService(_es_client)

    elapsed = time.perf_counter() - start_time
    logger.info(
        "Search service ready! (took %.2fs) | Index: %s",
        elapsed,
        search_config.es_index_name,
    )
def _require_initialized(instance):
    """Return *instance*, or raise RuntimeError if init_service() has not set it yet."""
    if instance is not None:
        return instance
    raise RuntimeError("Service not initialized")


def get_es_client() -> ESClient:
    """Return the shared Elasticsearch client (RuntimeError before initialization)."""
    return _require_initialized(_es_client)


def get_searcher() -> Searcher:
    """Return the shared searcher (RuntimeError before initialization)."""
    return _require_initialized(_searcher)


def get_query_parser() -> QueryParser:
    """Return the shared query parser (RuntimeError before initialization)."""
    return _require_initialized(_query_parser)


def get_suggestion_service() -> SuggestionService:
    """Return the shared suggestion service (RuntimeError before initialization)."""
    return _require_initialized(_suggestion_service)


def get_config():
    """Return the loaded application config (RuntimeError before initialization)."""
    return _require_initialized(_app_config)
# Create the FastAPI application. Interactive docs are served at /docs and /redoc,
# and the OpenAPI schema at /openapi.json.
app = FastAPI(
    title="E-Commerce Search API",
    description="Configurable search engine for cross-border e-commerce",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    openapi_url="/openapi.json"
)
# Rate limiting. This is not middleware: it exposes the module-level slowapi limiter on
# app.state and maps RateLimitExceeded errors to slowapi's stock handler. No
# SlowAPIMiddleware or default limits are configured here, so presumably only routes
# decorated with @limiter.limit(...) are throttled — confirm.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Reject requests whose Host header is not in the allow-list.
# The list comes from the comma-separated TRUSTED_HOSTS environment variable
# (e.g. "api.example.com,*.example.com,localhost"). When it is unset or blank, every
# host is accepted, which is the previous hard-coded development default; set it in
# production.
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=[
        host.strip()
        for host in os.getenv("TRUSTED_HOSTS", "*").split(",")
        if host.strip()
    ] or ["*"],
)
# Add security headers middleware.
# X-XSS-Protection is explicitly disabled ("0"). The legacy browser XSS filter that
# "1; mode=block" turned on has been removed from modern browsers, and where it still
# exists it can create XSS issues in otherwise safe pages. Current OWASP/MDN guidance is
# to switch it off and rely on Content-Security-Policy instead.
_SECURITY_HEADERS = (
    ("X-Content-Type-Options", "nosniff"),
    ("X-Frame-Options", "DENY"),
    ("X-XSS-Protection", "0"),
    ("Referrer-Policy", "strict-origin-when-cross-origin"),
)


@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Run the request, then set the baseline security headers on its response.

    Any existing header with one of these names is overwritten.
    """
    response = await call_next(request)
    for header_name, header_value in _SECURITY_HEADERS:
        response.headers[header_name] = header_value
    return response
# Add CORS middleware.
# Allowed origins come from the comma-separated CORS_ALLOW_ORIGINS environment variable
# (e.g. "https://shop.example.com,https://admin.example.com"). When it is unset or
# blank, every origin is allowed, which is the previous hard-coded default.
# NOTE: with a wildcard origin and allow_credentials=True, Starlette reflects the
# request's Origin back, so any site can make credentialed cross-origin requests.
# Set CORS_ALLOW_ORIGINS to the real frontend origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        origin.strip()
        for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
        if origin.strip()
    ] or ["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["X-Total-Count"],
)
@app.on_event("startup")
async def startup_event():
    """Initialize the search service when the application starts.

    First raises anyio's default worker-thread limiter to 48 tokens. This is best
    effort: a failure is logged as a warning and the default is kept. Then reads the
    Elasticsearch host from the app config and calls ``init_service``. Any failure
    in that phase, including loading the config, is logged with a traceback and
    re-raised, so startup aborts.
    """
    # Enlarge the default thread limiter (anyio's default is 40). Named thread_limiter so
    # it does not shadow the module-level slowapi `limiter`.
    try:
        import anyio.to_thread

        thread_limiter = anyio.to_thread.current_default_thread_limiter()
        thread_limiter.total_tokens = 48
        logger.info("Thread pool size set to %s", thread_limiter.total_tokens)
    except Exception as e:
        logger.warning("Failed to set thread pool size: %s, using default", e)

    try:
        # Config loading sits inside the try so that a broken config is logged as well.
        es_host = get_app_config().infrastructure.elasticsearch.host
        logger.info("Starting E-Commerce Search API (Multi-Tenant)")
        logger.info("Elasticsearch Host: %s", es_host)
        init_service(es_host=es_host)
        logger.info("Service initialized successfully")
    except Exception as e:
        logger.error("Failed to initialize service: %s", e, exc_info=True)
        raise
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the application is shutting down.

    NOTE(review): this only logs. The Elasticsearch client created by init_service is
    not released here; consider closing it if ESClient exposes a close method.
    """
    logger.info("Shutting down E-Commerce Search API")
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Convert any unhandled exception into a generic JSON 500 response.

    The traceback is logged with the client address and the request/user id context.
    The response body deliberately omits the exception text.
    """
    peer = request.client
    client_ip = peer.host if peer else "unknown"
    logger.error(
        f"Unhandled exception from {client_ip}: {exc}",
        exc_info=True,
        extra=_request_log_extra_from_http(request),
    )
    body = {
        "error": "Internal server error",
        "detail": "An unexpected error occurred. Please try again later.",
        "timestamp": int(time.time()),
    }
    return JSONResponse(status_code=500, content=body)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an HTTPException as a JSON error body.

    Logs a warning with the client address, status code and detail. Returns
    ``{"error": detail, "status_code": code, "timestamp": epoch_seconds}`` with the
    exception's status code. Headers attached to the exception (e.g.
    ``WWW-Authenticate`` on a 401) are forwarded on the response rather than dropped.

    NOTE(review): this is registered for FastAPI's HTTPException. Per the FastAPI docs,
    errors raised by Starlette itself (such as the router's 404/405) are Starlette
    HTTPExceptions and are presumably not routed here. Registering the handler for
    ``starlette.exceptions.HTTPException`` would cover both.
    """
    client_ip = request.client.host if request.client else "unknown"
    logger.warning(
        f"HTTP exception from {client_ip}: {exc.status_code} - {exc.detail}",
        extra=_request_log_extra_from_http(request),
    )
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "status_code": exc.status_code,
            "timestamp": int(time.time()),
        },
        headers=getattr(exc, "headers", None),
    )
@app.get("/api")
@limiter.limit("60/minute")
async def root(request: Request):
    """Root endpoint with rate limiting."""
    # Returns static service metadata plus the current epoch second; the docstring
    # above is left unchanged because FastAPI publishes it in the OpenAPI schema.
    caller = request.client
    client_ip = "unknown" if not caller else caller.host
    logger.info(f"Root endpoint accessed from {client_ip}")
    return dict(
        service="E-Commerce Search API",
        version="1.0.0",
        status="running",
        timestamp=int(time.time()),
    )
@app.get("/health")
@limiter.limit("120/minute")
async def health_check(request: Request):
    """Health check endpoint."""
    # "Healthy" means init_service() has populated the ES client and the searcher.
    # This handler does not contact Elasticsearch: "connected" is not a live check.
    try:
        get_es_client()
        get_searcher()
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        unhealthy_body = {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time()),
        }
        return JSONResponse(status_code=503, content=unhealthy_body)
    else:
        service_states = {
            "config": "initialized",
            "elasticsearch": "connected",
            "searcher": "initialized",
        }
        return {
            "status": "healthy",
            "services": service_states,
            "timestamp": int(time.time()),
        }
# Include routers (search app should NOT mount indexer routes).
# The import sits here rather than at the top of the file, presumably so the route
# modules can import from this module (e.g. the get_* accessors) without a circular
# import — TODO confirm.
from .routes import search, admin
app.include_router(search.router)
app.include_router(admin.router)
# Serve the bundled frontend (if present) from <project>/frontend.
frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend")
if os.path.exists(frontend_path):
    # Serve frontend HTML at root
    @app.get("/")
    async def serve_frontend():
        """Serve the frontend HTML."""
        # Falls back to a small JSON status payload when index.html is missing.
        index_path = os.path.join(frontend_path, "index.html")
        if os.path.exists(index_path):
            return FileResponse(index_path)
        return {"service": "E-Commerce Search API", "version": "1.0.0", "status": "running"}

    # Mount static files (CSS, JS, images) only when the directory exists. StaticFiles
    # raises at construction time for a missing directory, which would otherwise make
    # the whole app fail to import.
    static_path = os.path.join(frontend_path, "static")
    if os.path.isdir(static_path):
        app.mount("/static", StaticFiles(directory=static_path), name="static")
        logger.info(f"Frontend static files mounted from: {frontend_path}")
    else:
        logger.warning(f"Frontend static directory not found: {static_path}")
else:
    logger.warning(f"Frontend directory not found: {frontend_path}")
if __name__ == "__main__":
    # Command-line entry point. Because this module uses relative imports
    # (e.g. `from .service_registry import ...`), it has to be started as
    # `python -m api.app`, not as `python api/app.py`.
    # NOTE(review): --es-host is only exported as the ES_HOST environment variable. The
    # startup hook reads the host from get_app_config(), so the flag takes effect only
    # if that config honours ES_HOST — verify.
    cli = argparse.ArgumentParser(description='Start search API service (multi-tenant)')
    option_specs = (
        ('--host', dict(default='0.0.0.0', help='Host to bind to')),
        ('--port', dict(type=int, default=6002, help='Port to bind to')),
        ('--es-host', dict(default='http://localhost:9200', help='Elasticsearch host')),
        ('--reload', dict(action='store_true', help='Enable auto-reload')),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    os.environ['ES_HOST'] = opts.es_host

    uvicorn.run("api.app:app", host=opts.host, port=opts.port, reload=opts.reload)