Blame view

api/app.py 11.9 KB
be52af70   tangwang   first commit
1
2
3
4
  """
  Main FastAPI application for the search service.

  Usage:
      uvicorn api.app:app --host 0.0.0.0 --port 6002 --reload
  """
  
  import os
  import sys
bb3c5ef8   tangwang   灌入数据流程跑通
10
11
  import logging
  import time
325eec03   tangwang   1. 日志、配置基础设施,使用优化
12
13
  import argparse
  import uvicorn
bb3c5ef8   tangwang   灌入数据流程跑通
14
  from collections import defaultdict, deque
be52af70   tangwang   first commit
15
  from typing import Optional
bb3c5ef8   tangwang   灌入数据流程跑通
16
  from fastapi import FastAPI, Request, HTTPException
a7a8c6cb   tangwang   测试过滤、聚合、排序
17
18
  from fastapi.responses import JSONResponse, FileResponse
  from fastapi.staticfiles import StaticFiles
be52af70   tangwang   first commit
19
  from fastapi.middleware.cors import CORSMiddleware
bb3c5ef8   tangwang   灌入数据流程跑通
20
21
22
23
24
  from fastapi.middleware.trustedhost import TrustedHostMiddleware
  from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
  from slowapi import Limiter, _rate_limit_exceeded_handler
  from slowapi.util import get_remote_address
  from slowapi.errors import RateLimitExceeded
be52af70   tangwang   first commit
25
  
bb3c5ef8   tangwang   灌入数据流程跑通
26
  # Configure logging: every record goes to the console and to logs/api.log.
  import pathlib

  # Relative path, so the directory is created under the current working directory.
  log_dir = pathlib.Path('logs')
  log_dir.mkdir(exist_ok=True)
  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
      handlers=[
          logging.StreamHandler(),
          logging.FileHandler(log_dir / 'api.log', mode='a', encoding='utf-8')
      ]
  )
  logger = logging.getLogger(__name__)

  # Initialize rate limiter; limits are keyed by the value get_remote_address returns
  limiter = Limiter(key_func=get_remote_address)
  
be52af70   tangwang   first commit
43
44
45
  # Add parent directory to path
  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
46
  from config.env_config import ES_CONFIG, DB_CONFIG
9f96d6f3   tangwang   短query不用语义搜索
47
  from config import ConfigLoader
be52af70   tangwang   first commit
48
  from utils import ESClient
0064e946   tangwang   feat: 增量索引服务、租户配置...
49
  from utils.db_connector import create_db_connection
be52af70   tangwang   first commit
50
  from search import Searcher
be52af70   tangwang   first commit
51
  from query import QueryParser
0064e946   tangwang   feat: 增量索引服务、租户配置...
52
  from indexer.incremental_service import IncrementalIndexerService
be52af70   tangwang   first commit
53
54
  
  # Global instances. All start as None and are assigned by init_service().
  _es_client: Optional[ESClient] = None
  _searcher: Optional[Searcher] = None
  _query_parser: Optional[QueryParser] = None
  _config = None
  # The two indexing services stay None when the database settings are
  # incomplete or their setup fails inside init_service().
  _incremental_service: Optional[IncrementalIndexerService] = None
  _bulk_indexing_service = None
be52af70   tangwang   first commit
61
62
  
  
4d824a77   tangwang   所有租户共用一套统一配置.tena...
63
  def init_service(es_host: str = "http://localhost:9200"):
      """
      Initialize search service with unified configuration.

      Loads config/config.yaml, connects to Elasticsearch, builds the query
      parser and the searcher, and then tries to set up the indexing services.
      Everything is stored in the module-level globals read by the get_*()
      accessors.

      Indexing setup is best-effort: when the DB_* environment variables are
      incomplete or setup raises, a warning is logged and both indexing
      services are left as None instead of failing initialization.

      Args:
          es_host: Elasticsearch host URL

      Raises:
          ConnectionError: If the Elasticsearch ping fails.
      """
      global _es_client, _searcher, _query_parser, _config, _incremental_service, _bulk_indexing_service

      start_time = time.time()
      logger.info("Initializing search service (multi-tenant)")

      # Load configuration
      logger.info("Loading configuration...")
      config_loader = ConfigLoader("config/config.yaml")
      _config = config_loader.load_config()
      logger.info("Configuration loaded")

      # Get ES credentials (environment variables take precedence over ES_CONFIG)
      es_username = os.getenv('ES_USERNAME') or ES_CONFIG.get('username')
      es_password = os.getenv('ES_PASSWORD') or ES_CONFIG.get('password')

      # Connect to Elasticsearch
      logger.info(f"Connecting to Elasticsearch at {es_host}...")
      if es_username and es_password:
          _es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
      else:
          _es_client = ESClient(hosts=[es_host])

      if not _es_client.ping():
          raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
      logger.info("Elasticsearch connected")

      # Initialize components
      logger.info("Initializing query parser...")
      _query_parser = QueryParser(_config)

      logger.info("Initializing searcher...")
      _searcher = Searcher(_es_client, _config, _query_parser)

      # Initialize indexing services (if DB config is available)
      try:
          # create_db_connection and IncrementalIndexerService are already
          # imported at module level; only BulkIndexingService is imported
          # here, so a failure to import it is handled like any other
          # indexing setup failure.
          from indexer.bulk_indexing_service import BulkIndexingService

          db_host = os.getenv('DB_HOST')
          db_port = int(os.getenv('DB_PORT', 3306))
          db_database = os.getenv('DB_DATABASE')
          db_username = os.getenv('DB_USERNAME')
          db_password = os.getenv('DB_PASSWORD')

          if all([db_host, db_database, db_username, db_password]):
              logger.info("Initializing database connection for indexing services...")
              db_engine = create_db_connection(
                  host=db_host,
                  port=db_port,
                  database=db_database,
                  username=db_username,
                  password=db_password
              )

              # Initialize incremental service
              _incremental_service = IncrementalIndexerService(db_engine)
              logger.info("Incremental indexer service initialized")

              # Initialize bulk indexing service
              _bulk_indexing_service = BulkIndexingService(db_engine, _es_client)
              logger.info("Bulk indexing service initialized")
          else:
              logger.warning("Database config incomplete, indexing services will not be available")
              logger.warning("Required: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
      except Exception as e:
          # Best-effort: search keeps working without indexing. The traceback
          # is logged so the cause is not reduced to a one-line message.
          logger.warning(f"Failed to initialize indexing services: {e}", exc_info=True)
          logger.warning("Indexing endpoints will not be available")
          _incremental_service = None
          _bulk_indexing_service = None

      elapsed = time.time() - start_time
      logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}")
be52af70   tangwang   first commit
143
144
  
  
be52af70   tangwang   first commit
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
  
  
  def get_es_client() -> ESClient:
      """Return the shared Elasticsearch client.

      Raises:
          RuntimeError: If the client has not been assigned yet.
      """
      if _es_client is not None:
          return _es_client
      raise RuntimeError("Service not initialized")
  
  
  def get_searcher() -> Searcher:
      """Return the shared searcher.

      Raises:
          RuntimeError: If the searcher has not been assigned yet.
      """
      if _searcher is not None:
          return _searcher
      raise RuntimeError("Service not initialized")
  
  
  def get_query_parser() -> QueryParser:
      """Return the shared query parser.

      Raises:
          RuntimeError: If the query parser has not been assigned yet.
      """
      if _query_parser is not None:
          return _query_parser
      raise RuntimeError("Service not initialized")
  
  
9f96d6f3   tangwang   短query不用语义搜索
168
169
170
171
172
173
174
  def get_config():
      """Return the loaded global configuration.

      Raises:
          RuntimeError: If the configuration has not been assigned yet.
      """
      if _config is not None:
          return _config
      raise RuntimeError("Service not initialized")
  
  
0064e946   tangwang   feat: 增量索引服务、租户配置...
175
176
177
178
179
  def get_incremental_service() -> Optional[IncrementalIndexerService]:
      """Get incremental indexer service instance.

      Unlike the other accessors this one never raises.

      Returns:
          The service stored by init_service(), or None when it has not been
          created (init_service() has not run, the database settings were
          incomplete, or indexing setup failed).
      """
      return _incremental_service
  
  
3c1f8031   tangwang   api/routes/indexe...
180
181
182
183
184
  def get_bulk_indexing_service():
      """Get bulk indexing service instance.

      Unlike the other accessors this one never raises.

      Returns:
          The service stored by init_service(), or None when it has not been
          created (init_service() has not run, the database settings were
          incomplete, or indexing setup failed).
      """
      return _bulk_indexing_service
  
  
bb3c5ef8   tangwang   灌入数据流程跑通
185
  # Create FastAPI app with enhanced configuration
  app = FastAPI(
      title="E-Commerce Search API",
      description="Configurable search engine for cross-border e-commerce",
      version="1.0.0",
      docs_url="/docs",
      redoc_url="/redoc",
      openapi_url="/openapi.json"
  )

  # Wire up rate limiting: expose the limiter on app.state and register the
  # handler used when a RateLimitExceeded error is raised.
  app.state.limiter = limiter
  app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

  # Add trusted host middleware. The wildcard accepts every Host header, so
  # this currently restricts nothing.
  app.add_middleware(
      TrustedHostMiddleware,
      allowed_hosts=["*"]  # Allow all hosts for development, restrict in production
  )
  
bb3c5ef8   tangwang   灌入数据流程跑通
205
206
207
208
209
210
211
212
213
214
215
  # Add security headers middleware
  @app.middleware("http")
  async def add_security_headers(request: Request, call_next):
      """Set a fixed group of security headers on every response."""
      response = await call_next(request)
      hardening_headers = (
          ("X-Content-Type-Options", "nosniff"),
          ("X-Frame-Options", "DENY"),
          ("X-XSS-Protection", "1; mode=block"),
          ("Referrer-Policy", "strict-origin-when-cross-origin"),
      )
      for header_name, header_value in hardening_headers:
          response.headers[header_name] = header_value
      return response
  
  # Add CORS middleware.
  # Allowed origins are read from the comma-separated CORS_ALLOW_ORIGINS
  # environment variable and default to "*" (any origin), e.g.
  #   CORS_ALLOW_ORIGINS="https://shop.example.com,https://admin.example.com"
  _cors_origins = [
      origin.strip()
      for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
      if origin.strip()
  ] or ["*"]
  app.add_middleware(
      CORSMiddleware,
      allow_origins=_cors_origins,
      # Credentials are only enabled for an explicit origin list: combining
      # them with the "*" wildcard would let any site make credentialed
      # cross-origin requests.
      allow_credentials="*" not in _cors_origins,
      allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
      allow_headers=["*"],
      expose_headers=["X-Total-Count"]
  )
  
  
  @app.on_event("startup")
  async def startup_event():
      """Initialize service on startup.

      Reads the Elasticsearch URL from the ES_HOST environment variable
      (default http://localhost:9200) and calls init_service(). A failure is
      logged with its traceback but not re-raised, so the application still
      starts.
      """
      es_host = os.getenv("ES_HOST", "http://localhost:9200")
      logger.info("Starting E-Commerce Search API (Multi-Tenant)")
      logger.info(f"Elasticsearch Host: {es_host}")

      try:
          init_service(es_host=es_host)
          logger.info("Service initialized successfully")
      except Exception as e:
          # Deliberately swallowed: the process stays up and /health reports
          # the uninitialized state.
          logger.error(f"Failed to initialize service: {e}", exc_info=True)
          logger.warning("Service will start but may not function correctly")
  
  
  @app.on_event("shutdown")
  async def shutdown_event():
      """Log that the API is shutting down.

      No resources are released here; the handler only writes a log line.
      """
      logger.info("Shutting down E-Commerce Search API")
be52af70   tangwang   first commit
245
246
247
248
  
  
  @app.exception_handler(Exception)
  async def global_exception_handler(request: Request, exc: Exception):
      """Global exception handler with detailed logging.

      Logs the exception with its traceback and the client address, then
      returns a generic 500 JSON body so that internal details are not sent
      to the client.
      """
      client_ip = request.client.host if request.client else "unknown"
      logger.error(f"Unhandled exception from {client_ip}: {exc}", exc_info=True)

      return JSONResponse(
          status_code=500,
          content={
              "error": "Internal server error",
              "detail": "An unexpected error occurred. Please try again later.",
              "timestamp": int(time.time())
          }
      )
  
  
  @app.exception_handler(HTTPException)
  async def http_exception_handler(request: Request, exc: HTTPException):
      """HTTP exception handler.

      Logs the status code and detail at warning level and returns them in a
      JSON body with the exception's own status code.
      """
      client_ip = request.client.host if request.client else "unknown"
      logger.warning(f"HTTP exception from {client_ip}: {exc.status_code} - {exc.detail}")

      return JSONResponse(
          status_code=exc.status_code,
          content={
              "error": exc.detail,
              "status_code": exc.status_code,
              "timestamp": int(time.time())
          },
          # Forward headers attached to the exception (e.g. WWW-Authenticate)
          # instead of silently dropping them.
          headers=getattr(exc, "headers", None)
      )
  
  
a7a8c6cb   tangwang   测试过滤、聚合、排序
278
  @app.get("/api")
  @limiter.limit("60/minute")
  async def root(request: Request):
      """Root endpoint with rate limiting.

      Returns the service name, version, a fixed "running" status and the
      current Unix timestamp.
      """
      client_ip = request.client.host if request.client else "unknown"
      logger.info(f"Root endpoint accessed from {client_ip}")

      return {
          "service": "E-Commerce Search API",
          "version": "1.0.0",
          "status": "running",
          "timestamp": int(time.time())
      }
  
  
bb3c5ef8   tangwang   灌入数据流程跑通
293
294
295
296
297
298
  @app.get("/health")
  @limiter.limit("120/minute")
  async def health_check(request: Request):
      """Health check endpoint.

      Reports "healthy" when the config, the Elasticsearch client and the
      searcher have all been initialized; otherwise responds with HTTP 503
      and the error message. Only the presence of the objects is checked;
      Elasticsearch itself is not contacted.
      """
      try:
          # Check if services are initialized (each accessor raises if not)
          get_config()
          get_es_client()
          get_searcher()

          return {
              "status": "healthy",
              "services": {
                  "config": "initialized",
                  "elasticsearch": "connected",
                  "searcher": "initialized"
              },
              "timestamp": int(time.time())
          }
      except Exception as e:
          logger.error(f"Health check failed: {e}")
          return JSONResponse(
              status_code=503,
              content={
                  "status": "unhealthy",
                  "error": str(e),
                  "timestamp": int(time.time())
              }
          )
  
  
be52af70   tangwang   first commit
323
  # Include routers
  from .routes import search, admin, indexer

  app.include_router(search.router)
  app.include_router(admin.router)
  app.include_router(indexer.router)
be52af70   tangwang   first commit
329
  
a7a8c6cb   tangwang   测试过滤、聚合、排序
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
  # Mount static files and serve frontend
  frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend")
  if os.path.exists(frontend_path):
      # Serve frontend HTML at root
      @app.get("/")
      async def serve_frontend():
          """Serve the frontend HTML, or a JSON status body if index.html is missing."""
          index_path = os.path.join(frontend_path, "index.html")
          if os.path.exists(index_path):
              return FileResponse(index_path)
          return {"service": "E-Commerce Search API", "version": "1.0.0", "status": "running"}

      # Mount static files (CSS, JS, images). StaticFiles raises RuntimeError
      # for a missing directory, which would abort the import of this module,
      # so the mount is skipped when frontend/static does not exist.
      static_path = os.path.join(frontend_path, "static")
      if os.path.isdir(static_path):
          app.mount("/static", StaticFiles(directory=static_path), name="static")
          logger.info(f"Frontend static files mounted from: {frontend_path}")
      else:
          logger.warning(f"Frontend static directory not found: {static_path}")
  else:
      logger.warning(f"Frontend directory not found: {frontend_path}")
  
be52af70   tangwang   first commit
349
350
  
  if __name__ == "__main__":
      parser = argparse.ArgumentParser(description='Start search API service (multi-tenant)')
      parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
      parser.add_argument('--port', type=int, default=6002, help='Port to bind to')
      # Default to an ES_HOST already present in the environment, so running
      # without --es-host does not overwrite it with the localhost URL.
      parser.add_argument(
          '--es-host',
          default=os.getenv('ES_HOST', 'http://localhost:9200'),
          help='Elasticsearch host'
      )
      parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
      args = parser.parse_args()

      # Pass the Elasticsearch host to startup_event() through the environment
      os.environ['ES_HOST'] = args.es_host

      # Run server
      uvicorn.run(
          "api.app:app",
          host=args.host,
          port=args.port,
          reload=args.reload
      )