be52af70
tangwang
first commit
|
1
2
3
4
|
"""
Main FastAPI application for the search service.

Usage:
    uvicorn api.app:app --host 0.0.0.0 --port 6002 --reload
|
be52af70
tangwang
first commit
|
6
7
8
9
|
"""
import os
import sys
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
10
11
|
import logging
import time
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
12
13
|
import argparse
import uvicorn
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
14
|
from collections import defaultdict, deque
|
be52af70
tangwang
first commit
|
15
|
from typing import Optional
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
16
|
from fastapi import FastAPI, Request, HTTPException
|
a7a8c6cb
tangwang
测试过滤、聚合、排序
|
17
18
|
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
|
be52af70
tangwang
first commit
|
19
|
from fastapi.middleware.cors import CORSMiddleware
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
20
21
22
23
24
|
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
|
be52af70
tangwang
first commit
|
25
|
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
26
|
# Configure logging with better formatting
|
3c1f8031
tangwang
api/routes/indexe...
|
27
28
29
|
import pathlib

# Log files go to a "logs" directory relative to the current working directory;
# it is created on import if it does not exist yet.
log_dir = pathlib.Path('logs')
log_dir.mkdir(exist_ok=True)

# Every record is written both to the console and to logs/api.log
# (append mode, UTF-8).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(log_dir / 'api.log', mode='a', encoding='utf-8'),
    ],
)
logger = logging.getLogger(__name__)

# Initialize rate limiter (requests are bucketed by get_remote_address)
limiter = Limiter(key_func=get_remote_address)
|
be52af70
tangwang
first commit
|
43
44
45
|
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
46
|
from config.env_config import ES_CONFIG, DB_CONFIG
|
9f96d6f3
tangwang
短query不用语义搜索
|
47
|
from config import ConfigLoader
|
be52af70
tangwang
first commit
|
48
|
from utils import ESClient
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
49
|
from utils.db_connector import create_db_connection
|
be52af70
tangwang
first commit
|
50
|
from search import Searcher
|
be52af70
tangwang
first commit
|
51
|
from query import QueryParser
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
52
|
from indexer.incremental_service import IncrementalIndexerService
|
be52af70
tangwang
first commit
|
53
54
|
# Global instances
# All of these start as None and are assigned by init_service().
_es_client: Optional[ESClient] = None
_searcher: Optional[Searcher] = None
_query_parser: Optional[QueryParser] = None
_config = None
# The two indexing services stay None when init_service() cannot set them up.
_incremental_service: Optional[IncrementalIndexerService] = None
_bulk_indexing_service = None
|
be52af70
tangwang
first commit
|
61
62
|
|
4d824a77
tangwang
所有租户共用一套统一配置.tena...
|
63
|
def init_service(es_host: str = "http://localhost:9200"):
    """
    Initialize search service with unified configuration.

    Populates the module-level singletons in this order: configuration,
    Elasticsearch client, query parser, searcher, and finally the optional
    DB-backed indexing services. A failure while setting up the indexing
    services is logged and leaves both of them as ``None``; it does not abort
    initialization.

    Args:
        es_host: Elasticsearch host URL

    Raises:
        ConnectionError: If the Elasticsearch client does not answer ``ping()``.
    """
    global _es_client, _searcher, _query_parser, _config, _incremental_service, _bulk_indexing_service

    start_time = time.time()
    logger.info("Initializing search service (multi-tenant)")

    # Load configuration
    logger.info("Loading configuration...")
    config_loader = ConfigLoader("config/config.yaml")
    _config = config_loader.load_config()
    logger.info("Configuration loaded")

    # Get ES credentials (environment variables win over ES_CONFIG)
    es_username = os.getenv('ES_USERNAME') or ES_CONFIG.get('username')
    es_password = os.getenv('ES_PASSWORD') or ES_CONFIG.get('password')

    # Connect to Elasticsearch
    logger.info(f"Connecting to Elasticsearch at {es_host}...")
    if es_username and es_password:
        _es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
    else:
        _es_client = ESClient(hosts=[es_host])

    if not _es_client.ping():
        raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
    logger.info("Elasticsearch connected")

    # Initialize components
    logger.info("Initializing query parser...")
    _query_parser = QueryParser(_config)
    logger.info("Initializing searcher...")
    _searcher = Searcher(_es_client, _config, _query_parser)

    # Initialize indexing services (if DB config is available).
    # Reset first so a repeated call never keeps services that were built
    # from an earlier, no longer valid, database configuration.
    _incremental_service = None
    _bulk_indexing_service = None
    try:
        from utils.db_connector import create_db_connection
        from indexer.incremental_service import IncrementalIndexerService
        from indexer.bulk_indexing_service import BulkIndexingService

        db_host = os.getenv('DB_HOST')
        # "or" (instead of a getenv default) also covers DB_PORT being set to
        # an empty string, which would otherwise make int() raise.
        db_port = int(os.getenv('DB_PORT') or 3306)
        db_database = os.getenv('DB_DATABASE')
        db_username = os.getenv('DB_USERNAME')
        db_password = os.getenv('DB_PASSWORD')

        if all([db_host, db_database, db_username, db_password]):
            logger.info("Initializing database connection for indexing services...")
            db_engine = create_db_connection(
                host=db_host,
                port=db_port,
                database=db_database,
                username=db_username,
                password=db_password
            )

            # Initialize incremental service
            _incremental_service = IncrementalIndexerService(db_engine)
            logger.info("Incremental indexer service initialized")

            # Initialize bulk indexing service
            _bulk_indexing_service = BulkIndexingService(db_engine, _es_client)
            logger.info("Bulk indexing service initialized")
        else:
            logger.warning("Database config incomplete, indexing services will not be available")
            logger.warning("Required: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
    except Exception as e:
        # Best effort: search must keep working without the indexing services.
        # exc_info keeps the traceback so the root cause is visible in the log.
        logger.warning(f"Failed to initialize indexing services: {e}", exc_info=True)
        logger.warning("Indexing endpoints will not be available")
        _incremental_service = None
        _bulk_indexing_service = None

    elapsed = time.time() - start_time
    logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}")
|
be52af70
tangwang
first commit
|
143
144
|
|
be52af70
tangwang
first commit
|
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
def get_es_client() -> ESClient:
    """Return the module-level Elasticsearch client.

    Raises:
        RuntimeError: If the client is still ``None`` (service not initialized).
    """
    client = _es_client
    if client is not None:
        return client
    raise RuntimeError("Service not initialized")
def get_searcher() -> Searcher:
    """Return the module-level searcher.

    Raises:
        RuntimeError: If the searcher is still ``None`` (service not initialized).
    """
    searcher = _searcher
    if searcher is not None:
        return searcher
    raise RuntimeError("Service not initialized")
def get_query_parser() -> QueryParser:
    """Return the module-level query parser.

    Raises:
        RuntimeError: If the parser is still ``None`` (service not initialized).
    """
    parser = _query_parser
    if parser is not None:
        return parser
    raise RuntimeError("Service not initialized")
|
9f96d6f3
tangwang
短query不用语义搜索
|
168
169
170
171
172
173
174
|
def get_config():
    """Return the module-level configuration object.

    Raises:
        RuntimeError: If the configuration is still ``None`` (service not
            initialized).
    """
    config = _config
    if config is not None:
        return config
    raise RuntimeError("Service not initialized")
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
175
176
177
178
179
|
def get_incremental_service() -> Optional[IncrementalIndexerService]:
    """Return the incremental indexer service, or ``None`` if it is not set.

    Unlike the other getters in this module, this one does not raise when the
    value is ``None``; callers have to handle that case themselves.
    """
    service = _incremental_service
    return service
|
3c1f8031
tangwang
api/routes/indexe...
|
180
181
182
183
184
|
def get_bulk_indexing_service():
    """Return the bulk indexing service, or ``None`` if it is not set.

    Does not raise when the value is ``None``; callers have to handle that
    case themselves.
    """
    service = _bulk_indexing_service
    return service
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
185
|
# Create FastAPI app with enhanced configuration
app = FastAPI(
    title="E-Commerce Search API",
    description="Configurable search engine for cross-border e-commerce",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    openapi_url="/openapi.json",
)

# Rate limiting: expose the limiter on app.state and register the handler
# used when a RateLimitExceeded error is raised.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Trusted host middleware. The wildcard currently accepts every Host header
# (development setting); restrict to known hosts in production.
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"])
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
205
206
207
208
209
210
211
212
213
214
215
|
# Add security headers middleware
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Set a fixed group of security-related headers on every response.

    The request is passed to ``call_next`` unchanged; only the response
    returned by it is modified.
    """
    security_headers = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("X-XSS-Protection", "1; mode=block"),
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
    )
    response = await call_next(request)
    for header_name, header_value in security_headers:
        response.headers[header_name] = header_value
    return response
# Add CORS middleware
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# FastAPI's CORS documentation asks for explicit origins when credentials are
# enabled -- confirm the intended policy before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict in production to specific domains
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["X-Total-Count"],
)
@app.on_event("startup")
async def startup_event():
    """Run init_service() when the application starts.

    The Elasticsearch host comes from the ES_HOST environment variable and
    falls back to http://localhost:9200. Initialization errors are logged and
    swallowed on purpose, so the server still comes up.
    """
    es_host = os.getenv("ES_HOST", "http://localhost:9200")
    logger.info("Starting E-Commerce Search API (Multi-Tenant)")
    logger.info(f"Elasticsearch Host: {es_host}")

    try:
        init_service(es_host=es_host)
    except Exception as e:
        logger.error(f"Failed to initialize service: {e}", exc_info=True)
        logger.warning("Service will start but may not function correctly")
    else:
        logger.info("Service initialized successfully")
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the application is shutting down; no other cleanup is done here."""
    logger.info("Shutting down E-Commerce Search API")
|
be52af70
tangwang
first commit
|
245
246
247
248
|
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Turn any unhandled exception into a generic JSON 500 response.

    The exception is logged together with the caller's address; the response
    body carries only a fixed message and a Unix timestamp, not the exception
    text.
    """
    if request.client:
        client_ip = request.client.host
    else:
        client_ip = "unknown"
    logger.error(f"Unhandled exception from {client_ip}: {exc}", exc_info=True)

    payload = {
        "error": "Internal server error",
        "detail": "An unexpected error occurred. Please try again later.",
        "timestamp": int(time.time()),
    }
    return JSONResponse(status_code=500, content=payload)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an HTTPException as a JSON body with the exception's status code.

    The response body contains the exception detail, the status code and a
    Unix timestamp. Headers attached to the exception are forwarded to the
    client.
    """
    client_ip = request.client.host if request.client else "unknown"
    logger.warning(f"HTTP exception from {client_ip}: {exc.status_code} - {exc.detail}")
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "status_code": exc.status_code,
            "timestamp": int(time.time())
        },
        # Forward headers carried by the exception (e.g. WWW-Authenticate,
        # Retry-After); rebuilding the response without them would drop them.
        headers=getattr(exc, "headers", None),
    )
|
a7a8c6cb
tangwang
测试过滤、聚合、排序
|
278
|
@app.get("/api")
@limiter.limit("60/minute")
async def root(request: Request):
    """Return basic service information (limited to 60 requests per minute).

    Each call is logged with the caller's address.
    """
    client_ip = "unknown" if not request.client else request.client.host
    logger.info(f"Root endpoint accessed from {client_ip}")

    service_info = {
        "service": "E-Commerce Search API",
        "version": "1.0.0",
        "status": "running",
        "timestamp": int(time.time()),
    }
    return service_info
|
bb3c5ef8
tangwang
灌入数据流程跑通
|
293
294
295
296
297
298
|
@app.get("/health")
@limiter.limit("120/minute")
async def health_check(request: Request):
    """Report whether the service singletons have been initialized.

    Returns a "healthy" payload when both the Elasticsearch client and the
    searcher are set, otherwise a 503 response carrying the error text.
    The check only verifies that the objects exist; it does not ping
    Elasticsearch.
    """
    try:
        # Both getters raise when the service has not been initialized.
        get_es_client()
        get_searcher()
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        failure = {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time()),
        }
        return JSONResponse(status_code=503, content=failure)

    return {
        "status": "healthy",
        "services": {
            "config": "initialized",
            "elasticsearch": "connected",
            "searcher": "initialized",
        },
        "timestamp": int(time.time()),
    }
|
be52af70
tangwang
first commit
|
323
|
# Include routers
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
324
|
from .routes import search, admin, indexer

# Register the routers in the same order as before: search, admin, indexer.
for _route_module in (search, admin, indexer):
    app.include_router(_route_module.router)
|
be52af70
tangwang
first commit
|
329
|
|
a7a8c6cb
tangwang
测试过滤、聚合、排序
|
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
|
# Mount static files and serve frontend
# The frontend is expected in a "frontend" directory next to this package.
frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend")
if os.path.exists(frontend_path):
    # Serve frontend HTML at root
    @app.get("/")
    async def serve_frontend():
        """Serve the frontend HTML, or a small JSON status if index.html is missing."""
        index_path = os.path.join(frontend_path, "index.html")
        if os.path.exists(index_path):
            return FileResponse(index_path)
        return {"service": "E-Commerce Search API", "version": "1.0.0", "status": "running"}

    # Mount static files (CSS, JS, images).
    # Guard the mount: building StaticFiles for a missing directory raises at
    # import time and would keep the whole API from starting.
    static_path = os.path.join(frontend_path, "static")
    if os.path.isdir(static_path):
        app.mount("/static", StaticFiles(directory=static_path), name="static")
        logger.info(f"Frontend static files mounted from: {frontend_path}")
    else:
        logger.warning(f"Frontend static directory not found: {static_path}")
else:
    logger.warning(f"Frontend directory not found: {frontend_path}")
|
be52af70
tangwang
first commit
|
349
350
|
if __name__ == "__main__":
    # Command-line entry point: parse options, then hand over to uvicorn.
    parser = argparse.ArgumentParser(description='Start search API service (multi-tenant)')
    parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    parser.add_argument('--port', type=int, default=6002, help='Port to bind to')
    # Default to an ES_HOST that is already set in the environment, so that
    # omitting the flag does not overwrite it with the localhost URL below.
    parser.add_argument(
        '--es-host',
        default=os.getenv('ES_HOST', 'http://localhost:9200'),
        help='Elasticsearch host',
    )
    parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
    args = parser.parse_args()

    # Set environment variable; startup_event() reads ES_HOST from it.
    os.environ['ES_HOST'] = args.es_host

    # Run server
    uvicorn.run(
        "api.app:app",
        host=args.host,
        port=args.port,
        reload=args.reload
    )
|