be52af70
tangwang
first commit
|
1
2
3
4
|
"""
Main FastAPI application for the search service.

Usage:
    uvicorn api.app:app --host 0.0.0.0 --port 6002 --reload
"""
import os
import sys
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
10
11
12
|
import logging
import time
from collections import defaultdict, deque
|
be52af70
tangwang
first commit
|
13
|
from typing import Optional
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
14
|
from fastapi import FastAPI, Request, HTTPException
|
a7a8c6cb
tangwang
测试过滤、聚合、排序
|
15
16
|
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
|
be52af70
tangwang
first commit
|
17
|
from fastapi.middleware.cors import CORSMiddleware
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
18
19
20
21
22
|
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
|
be52af70
tangwang
first commit
|
23
24
|
import argparse
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# Configure logging once at import time: records go to stderr and are also
# appended to a log file. The file location can be overridden with the
# SEARCH_API_LOG_FILE environment variable; the default keeps the original path.
_log_file = os.getenv("SEARCH_API_LOG_FILE", "/tmp/search_engine_api.log")
_log_handlers = [logging.StreamHandler()]
_log_file_error = None
try:
    _log_handlers.append(logging.FileHandler(_log_file, mode='a'))
except OSError as err:
    # An unwritable or missing log location must not prevent the API module
    # from importing; fall back to stderr-only logging and report it below.
    _log_file_error = err
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)
if _log_file_error is not None:
    logger.warning(f"Could not open log file {_log_file} ({_log_file_error}); logging to stderr only")

# Initialize rate limiter, keyed on the client's remote address.
limiter = Limiter(key_func=get_remote_address)
|
be52af70
tangwang
first commit
|
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import ConfigLoader, CustomerConfig
from utils import ESClient
from search import Searcher
from query import QueryParser
# Global service singletons, populated by init_service() and read through the
# get_* accessors.
_config: Optional[CustomerConfig] = None
_es_client: Optional[ESClient] = None
_searcher: Optional[Searcher] = None
_query_parser: Optional[QueryParser] = None


def _resolve_es_credentials():
    """Return ``(username, password)`` for Elasticsearch.

    ``ES_USERNAME`` / ``ES_PASSWORD`` from the environment take precedence.
    Any value still missing is looked up via
    ``config.env_config.get_es_config()``. That lookup is best-effort: if it
    fails, the failure is logged and whatever was found so far is returned.
    Either element may be ``None`` when no credentials are configured.
    """
    username = os.getenv('ES_USERNAME')
    password = os.getenv('ES_PASSWORD')
    if username and password:
        return username, password
    try:
        from config.env_config import get_es_config
        es_config = get_es_config()
        username = username or es_config.get('username')
        password = password or es_config.get('password')
    except Exception as exc:
        # Deliberately non-fatal (the service can still run without auth),
        # but leave a trace instead of silently discarding the error.
        logger.warning(f"Could not load Elasticsearch credentials from config: {exc}")
    return username, password


def init_service(customer_id: str = "customer1", es_host: str = "http://localhost:9200"):
    """
    Initialize search service with configuration.

    Loads and validates the customer configuration, connects to
    Elasticsearch (authenticated when credentials are available), and builds
    the query parser and searcher, storing each in the module-level globals.

    Args:
        customer_id: Customer configuration ID
        es_host: Elasticsearch host URL

    Raises:
        ValueError: If the customer configuration fails validation.
        ConnectionError: If Elasticsearch does not answer a ping.
    """
    global _config, _es_client, _searcher, _query_parser

    logger.info(f"Initializing search service for customer: {customer_id}")

    # Load configuration
    config_loader = ConfigLoader("config/schema")
    _config = config_loader.load_customer_config(customer_id)

    # Validate configuration
    errors = config_loader.validate_config(_config)
    if errors:
        raise ValueError(f"Configuration validation failed: {errors}")
    logger.info(f"Configuration loaded: {_config.customer_name}")

    # Initialize ES client with authentication if credentials are available
    es_username, es_password = _resolve_es_credentials()
    if es_username and es_password:
        logger.info(f"Connecting to Elasticsearch with authentication: {es_username}")
        _es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
    else:
        logger.info("Connecting to Elasticsearch without authentication")
        _es_client = ESClient(hosts=[es_host])

    if not _es_client.ping():
        raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
    logger.info(f"Connected to Elasticsearch: {es_host}")

    # Initialize query parser
    _query_parser = QueryParser(_config)
    logger.info("Query parser initialized")

    # Initialize searcher (only reached after a successful ES ping)
    _searcher = Searcher(_config, _es_client, _query_parser)
    logger.info("Searcher initialized")
    logger.info("Search service ready!")
def _ensure_ready(instance):
    """Hand back *instance* unchanged, or fail if init_service() never populated it."""
    if instance is None:
        raise RuntimeError("Service not initialized")
    return instance


def get_config() -> CustomerConfig:
    """Return the customer configuration loaded by init_service()."""
    return _ensure_ready(_config)


def get_es_client() -> ESClient:
    """Return the Elasticsearch client created by init_service()."""
    return _ensure_ready(_es_client)


def get_searcher() -> Searcher:
    """Return the searcher built by init_service()."""
    return _ensure_ready(_searcher)


def get_query_parser() -> QueryParser:
    """Return the query parser built by init_service()."""
    return _ensure_ready(_query_parser)
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
143
|
# Create FastAPI app with enhanced configuration
app = FastAPI(
    title="E-Commerce Search API",
    description="Configurable search engine for cross-border e-commerce",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    openapi_url="/openapi.json",
)

# Add rate limiting: slowapi expects the limiter on app.state, and
# RateLimitExceeded errors are rendered by slowapi's stock handler.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Add trusted host middleware. ALLOWED_HOSTS is a comma-separated list of
# accepted Host header values (e.g. "api.example.com,localhost"). When it is
# unset or empty, every host is accepted, which is only suitable for development.
_allowed_hosts = [
    host.strip() for host in os.getenv("ALLOWED_HOSTS", "*").split(",") if host.strip()
] or ["*"]
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=_allowed_hosts,
)
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
163
164
165
166
167
168
169
170
171
172
173
|
# Add security headers middleware
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Let the request run, then stamp fixed hardening headers on its response."""
    result = await call_next(request)
    hardening = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("X-XSS-Protection", "1; mode=block"),
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
    )
    for header_name, header_value in hardening:
        result.headers[header_name] = header_value
    return result
# Add CORS middleware. CORS_ALLOW_ORIGINS is a comma-separated list of allowed
# origins (e.g. "https://shop.example.com,https://admin.example.com"). When it
# is unset or empty, all origins are allowed (the development default).
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; set CORS_ALLOW_ORIGINS explicitly in production.
_cors_origins = [
    origin.strip() for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",") if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    # Lets browser clients read the X-Total-Count response header.
    expose_headers=["X-Total-Count"],
)
@app.on_event("startup")
async def startup_event():
    """Initialize service on startup.

    Reads CUSTOMER_ID and ES_HOST from the environment (defaults "customer1"
    and "http://localhost:9200") and calls init_service(). A failure is
    logged with its full traceback but deliberately not re-raised, so the API
    still starts in a degraded state.
    """
    customer_id = os.getenv("CUSTOMER_ID", "customer1")
    es_host = os.getenv("ES_HOST", "http://localhost:9200")

    logger.info("Starting E-Commerce Search API")
    logger.info(f"Customer ID: {customer_id}")
    logger.info(f"Elasticsearch Host: {es_host}")

    try:
        init_service(customer_id=customer_id, es_host=es_host)
    except Exception as e:
        # Keep serving, but record the traceback so the root cause (bad config,
        # unreachable Elasticsearch, ...) can be diagnosed from the logs.
        logger.error(f"Failed to initialize service: {e}", exc_info=True)
        logger.warning("Service will start but may not function correctly")
    else:
        logger.info("Service initialized successfully")
@app.on_event("shutdown")
async def shutdown_event():
    """Record that the API is stopping; nothing is released or closed here."""
    # NOTE(review): the Elasticsearch client is not closed on shutdown —
    # consider doing so if ESClient holds open connections.
    farewell = "Shutting down E-Commerce Search API"
    logger.info(farewell)
|
be52af70
tangwang
first commit
|
206
207
208
209
|
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Convert any uncaught exception into a generic JSON 500 response.

    The traceback is logged server-side together with the caller's address.
    The client only sees a generic message plus a Unix timestamp.
    """
    origin = "unknown" if not request.client else request.client.host
    logger.error(f"Unhandled exception from {origin}: {exc}", exc_info=True)

    payload = {
        "error": "Internal server error",
        "detail": "An unexpected error occurred. Please try again later.",
        "timestamp": int(time.time()),
    }
    return JSONResponse(status_code=500, content=payload)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """HTTP exception handler.

    Logs the exception at WARNING level and returns a JSON body containing the
    detail, status code and a Unix timestamp. Headers attached to the
    exception (e.g. ``WWW-Authenticate`` or ``Retry-After``) are forwarded on
    the response instead of being dropped.
    """
    # NOTE(review): FastAPI's HTTPException subclasses Starlette's; errors that
    # Starlette raises itself (e.g. 404 for unknown routes) are not caught by
    # this registration — register starlette.exceptions.HTTPException if they
    # should use this format too.
    client_ip = request.client.host if request.client else "unknown"
    logger.warning(f"HTTP exception from {client_ip}: {exc.status_code} - {exc.detail}")
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "status_code": exc.status_code,
            "timestamp": int(time.time()),
        },
        headers=getattr(exc, "headers", None),
    )
|
a7a8c6cb
tangwang
测试过滤、聚合、排序
|
239
|
@app.get("/api")
@limiter.limit("60/minute")
async def root(request: Request):
    """Report basic service metadata; limited to 60 requests/minute per remote address."""
    caller = "unknown" if not request.client else request.client.host
    logger.info(f"Root endpoint accessed from {caller}")

    info = {
        "service": "E-Commerce Search API",
        "version": "1.0.0",
        "status": "running",
    }
    info["timestamp"] = int(time.time())
    return info
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
@app.get("/health")
@limiter.limit("120/minute")
async def health_check(request: Request):
    """Health check endpoint.

    Returns "healthy" only when every singleton created by init_service()
    (config, ES client, query parser and searcher) is present. Otherwise it
    returns a 503 JSON response carrying the error message.
    """
    try:
        # init_service() assigns the config and ES client *before* pinging
        # Elasticsearch, so those two alone can be set after a failed init.
        # The searcher is only created after a successful ping, so require it too.
        get_config()
        get_es_client()
        get_query_parser()
        get_searcher()
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "error": str(e),
                "timestamp": int(time.time())
            }
        )
    # NOTE: "connected" reflects the successful ping during initialization;
    # connectivity is not re-checked on each request.
    return {
        "status": "healthy",
        "services": {
            "config": "initialized",
            "elasticsearch": "connected",
            "searcher": "initialized"
        },
        "timestamp": int(time.time())
    }
|
be52af70
tangwang
first commit
|
284
285
286
287
288
289
|
# Register the API routers from the sibling `routes` package (search first, then admin).
from .routes import search, admin
for _router in (search.router, admin.router):
    app.include_router(_router)
|
a7a8c6cb
tangwang
测试过滤、聚合、排序
|
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
# Mount static files and serve frontend from the project's "frontend" directory
frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend")
if os.path.exists(frontend_path):
    # Serve frontend HTML at root
    @app.get("/")
    async def serve_frontend():
        """Serve frontend/index.html, or a small JSON status payload if it is missing."""
        index_path = os.path.join(frontend_path, "index.html")
        if os.path.exists(index_path):
            return FileResponse(index_path)
        return {"service": "E-Commerce Search API", "version": "1.0.0", "status": "running"}

    # Mount static files (CSS, JS, images). StaticFiles rejects a missing
    # directory when it is constructed, so only mount when frontend/static
    # exists. Otherwise a frontend folder without static assets would make the
    # whole module fail to import.
    static_path = os.path.join(frontend_path, "static")
    if os.path.isdir(static_path):
        app.mount("/static", StaticFiles(directory=static_path), name="static")
        logger.info(f"Frontend static files mounted from: {frontend_path}")
    else:
        logger.warning(f"Frontend static directory not found: {static_path}")
else:
    logger.warning(f"Frontend directory not found: {frontend_path}")
|
be52af70
tangwang
first commit
|
309
310
311
312
313
314
|
if __name__ == "__main__":
    import uvicorn

    # Command-line interface for launching the API directly.
    cli = argparse.ArgumentParser(description='Start search API service')
    cli.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    cli.add_argument('--port', type=int, default=6002, help='Port to bind to')
    cli.add_argument('--customer', default='customer1', help='Customer ID')
    cli.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
    cli.add_argument('--reload', action='store_true', help='Enable auto-reload')
    opts = cli.parse_args()

    # startup_event() reads CUSTOMER_ID / ES_HOST, so pass the CLI choices
    # through the environment.
    os.environ.update(CUSTOMER_ID=opts.customer, ES_HOST=opts.es_host)

    uvicorn.run("api.app:app", host=opts.host, port=opts.port, reload=opts.reload)
|