From bb3c5ef84b9d2252968a0e88de6a4fb3745666b3 Mon Sep 17 00:00:00 2001 From: tangwang Date: Mon, 10 Nov 2025 23:11:40 +0800 Subject: [PATCH] 灌入数据流程跑通 --- =0.1.9 | 14 ++++++++++++++ CLAUDE.md | 3 +++ SERVER_FIXES.md | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ api/app.py | 137 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------- api/routes/search.py | 6 +++--- environment.yml | 2 ++ frontend/index.html | 4 ++-- frontend/static/js/app.js | 10 +++++----- indexer/data_transformer.py | 23 ++++++++++++++++++++++- scripts/frontend_server.py | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------ scripts/install_server_deps.sh | 14 ++++++++++++++ scripts/start_servers.py | 247 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/stop.sh | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ search/boolean_parser.py | 2 +- start_all.sh | 21 +++++++++++++++++---- 15 files changed, 762 insertions(+), 35 deletions(-) create mode 100644 =0.1.9 create mode 100644 SERVER_FIXES.md create mode 100755 scripts/install_server_deps.sh create mode 100755 scripts/start_servers.py create mode 100755 scripts/stop.sh diff --git a/=0.1.9 b/=0.1.9 new file mode 100644 index 0000000..262ee14 --- /dev/null +++ b/=0.1.9 @@ -0,0 +1,14 @@ +Looking in indexes: https://mirrors.aliyun.com/pypi/simple +Collecting slowapi + Using cached https://mirrors.aliyun.com/pypi/packages/2b/bb/f71c4b7d7e7eb3fc1e8c0458a8979b912f40b58002b9fbf37729b8cb464b/slowapi-0.1.9-py3-none-any.whl (14 kB) +Collecting limits>=2.3 (from slowapi) + Using cached https://mirrors.aliyun.com/pypi/packages/40/96/4fcd44aed47b8fcc457653b12915fcad192cd646510ef3f29fd216f4b0ab/limits-5.6.0-py3-none-any.whl (60 kB) +Collecting deprecated>=1.2 (from limits>=2.3->slowapi) + Using cached https://mirrors.aliyun.com/pypi/packages/84/d0/205d54408c08b13550c733c4b85429e7ead111c7f0014309637425520a9a/deprecated-1.3.1-py2.py3-none-any.whl (11 kB) +Requirement already satisfied: packaging>=21 in /data/tw/miniconda3/envs/searchengine/lib/python3.10/site-packages (from limits>=2.3->slowapi) (25.0) +Requirement already satisfied: typing-extensions in /data/tw/miniconda3/envs/searchengine/lib/python3.10/site-packages (from limits>=2.3->slowapi) (4.15.0) +Collecting wrapt<3,>=1.10 (from deprecated>=1.2->limits>=2.3->slowapi) + Downloading https://mirrors.aliyun.com/pypi/packages/c6/93/5cf92edd99617095592af919cb81d4bff61c5dbbb70d3c92099425a8ec34/wrapt-2.0.1-cp310-cp310-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl (113 kB) +Installing collected packages: wrapt, deprecated, limits, slowapi + +Successfully installed deprecated-1.3.1 limits-5.6.0 slowapi-0.1.9 wrapt-2.0.1 diff --git a/CLAUDE.md b/CLAUDE.md index 49bd78e..a7e8cbe 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -109,3 +109,6 @@ The `searcher` supports: 4. **ES Similarity Configuration:** All text fields use modified BM25 with `b=0.0, k1=0.0` as the default similarity. 5. **Multi-Language Support:** The system is designed for cross-border e-commerce with at minimum Chinese and English support, with extensibility for other languages (Arabic, Spanish, Russian, Japanese). +- 记住这个项目的环境是 +- 记住这个项目的环境是source /home/tw/miniconda3/etc/profile.d/conda.sh +conda activate searchengine \ No newline at end of file diff --git a/SERVER_FIXES.md b/SERVER_FIXES.md new file mode 100644 index 0000000..8b22f6f --- /dev/null +++ b/SERVER_FIXES.md @@ -0,0 +1,142 @@ +# 服务器修复和优化文档 + +## 修复的问题 + +### 1. 前端服务器问题 (scripts/frontend_server.py) +- **问题**: 接收到大量扫描器流量导致的错误日志 +- **原因**: SSL/TLS握手尝试、RDP连接扫描、二进制数据攻击 +- **解决方案**: + - 添加错误处理机制,优雅处理连接断开 + - 实现速率限制 (100请求/分钟) + - 过滤扫描器噪音日志 + - 添加安全HTTP头 + - 使用线程服务器提高并发处理能力 + +### 2. API服务器问题 (api/app.py) +- **问题**: 缺乏安全性和错误处理机制 +- **解决方案**: + - 集成速率限制 (slowapi) + - 添加安全HTTP头 + - 实现更好的异常处理 + - 添加健康检查端点 + - 增强日志记录 + - 添加服务关闭处理 + +## 主要改进 + +### 安全性增强 +1. **速率限制**: 防止DDoS攻击和滥用 +2. **安全HTTP头**: 防止XSS、点击劫持等攻击 +3. **错误过滤**: 隐藏敏感错误信息 +4. **输入验证**: 更健壮的请求处理 + +### 稳定性提升 +1. **连接错误处理**: 优雅处理连接重置和断开 +2. **异常处理**: 全局异常捕获,防止服务器崩溃 +3. **日志管理**: 过滤噪音,记录重要事件 +4. **监控功能**: 健康检查和状态监控 + +### 性能优化 +1. **线程服务器**: 前端服务器支持并发请求 +2. **资源管理**: 更好的内存和连接管理 +3. **响应头优化**: 添加缓存和安全相关头 + +## 使用方法 + +### 安装依赖 +```bash +# 安装服务器安全依赖 +./scripts/install_server_deps.sh + +# 或者手动安装 +pip install slowapi>=0.1.9 anyio>=3.7.0 +``` + +### 启动服务器 + +#### 方法1: 使用管理脚本 (推荐) +```bash +# 启动所有服务器 +python scripts/start_servers.py --customer customer1 --es-host http://localhost:9200 + +# 启动前检查依赖 +python scripts/start_servers.py --check-dependencies +``` + +#### 方法2: 分别启动 +```bash +# 启动API服务器 +python main.py serve --customer customer1 --es-host http://localhost:9200 + +# 启动前端服务器 (在另一个终端) +python scripts/frontend_server.py +``` + +### 监控和日志 + +#### 日志位置 +- API服务器日志: `/tmp/search_engine_api.log` +- 启动日志: `/tmp/search_engine_startup.log` +- 控制台输出: 实时显示重要信息 + +#### 健康检查 +```bash +# 检查API服务器健康状态 +curl http://localhost:6002/health + +# 检查前端服务器 +curl http://localhost:6003 +``` + +## 配置选项 + +### 环境变量 +- `CUSTOMER_ID`: 客户ID (默认: customer1) +- `ES_HOST`: Elasticsearch主机 (默认: http://localhost:9200) + +### 速率限制配置 +- API服务器: 各端点不同限制 (60-120请求/分钟) +- 前端服务器: 100请求/分钟 + +## 故障排除 + +### 常见问题 + +1. **依赖缺失错误** + ```bash + pip install -r requirements_server.txt + ``` + +2. **端口被占用** + ```bash + # 查看端口占用 + lsof -i :6002 + lsof -i :6003 + ``` + +3. **权限问题** + ```bash + chmod +x scripts/*.py scripts/*.sh + ``` + +### 调试模式 +```bash +# 启用详细日志 +export PYTHONUNBUFFERED=1 +python scripts/start_servers.py +``` + +## 生产环境建议 + +1. **反向代理**: 使用nginx或Apache作为反向代理 +2. **SSL证书**: 配置HTTPS +3. **防火墙**: 限制访问源IP +4. **监控**: 集成监控和告警系统 +5. **日志轮转**: 配置日志轮转防止磁盘满 + +## 维护说明 + +- 定期检查日志文件大小 +- 监控服务器资源使用情况 +- 更新依赖包版本 +- 备份配置文件 \ No newline at end of file diff --git a/api/app.py b/api/app.py index 003e6b1..336351f 100644 --- a/api/app.py +++ b/api/app.py @@ -7,12 +7,34 @@ Usage: import os import sys +import logging +import time +from collections import defaultdict, deque from typing import Optional -from fastapi import FastAPI, Request +from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware +from fastapi.middleware.trustedhost import TrustedHostMiddleware +from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from slowapi.errors import RateLimitExceeded import argparse +# Configure logging with better formatting +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + logging.FileHandler('/tmp/search_engine_api.log', mode='a') + ] +) +logger = logging.getLogger(__name__) + +# Initialize rate limiter +limiter = Limiter(key_func=get_remote_address) + # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -117,20 +139,44 @@ def get_query_parser() -> QueryParser: return _query_parser -# Create FastAPI app +# Create FastAPI app with enhanced configuration app = FastAPI( title="E-Commerce Search API", description="Configurable search engine for cross-border e-commerce", - version="1.0.0" + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc", + openapi_url="/openapi.json" +) + +# Add rate limiting middleware +app.state.limiter = limiter +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) + +# Add trusted host middleware (restrict to localhost and trusted domains) +app.add_middleware( + TrustedHostMiddleware, + allowed_hosts=["*"] # Allow all hosts for development, restrict in production ) -# Add CORS middleware +# Add security headers middleware +@app.middleware("http") +async def add_security_headers(request: Request, call_next): + response = await call_next(request) + response.headers["X-Content-Type-Options"] = "nosniff" + response.headers["X-Frame-Options"] = "DENY" + response.headers["X-XSS-Protection"] = "1; mode=block" + response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" + return response + +# Add CORS middleware with more restrictive settings app.add_middleware( CORSMiddleware, - allow_origins=["*"], + allow_origins=["*"], # Restrict in production to specific domains allow_credentials=True, - allow_methods=["*"], + allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], + expose_headers=["X-Total-Count"] ) @@ -140,35 +186,100 @@ async def startup_event(): customer_id = os.getenv("CUSTOMER_ID", "customer1") es_host = os.getenv("ES_HOST", "http://localhost:9200") + logger.info(f"Starting E-Commerce Search API") + logger.info(f"Customer ID: {customer_id}") + logger.info(f"Elasticsearch Host: {es_host}") + try: init_service(customer_id=customer_id, es_host=es_host) + logger.info("Service initialized successfully") except Exception as e: - print(f"Failed to initialize service: {e}") - print("Service will start but may not function correctly") + logger.error(f"Failed to initialize service: {e}") + logger.warning("Service will start but may not function correctly") + + +@app.on_event("shutdown") +async def shutdown_event(): + """Cleanup on shutdown.""" + logger.info("Shutting down E-Commerce Search API") @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler.""" + """Global exception handler with detailed logging.""" + client_ip = request.client.host if request.client else "unknown" + logger.error(f"Unhandled exception from {client_ip}: {exc}", exc_info=True) + return JSONResponse( status_code=500, content={ "error": "Internal server error", - "detail": str(exc) + "detail": "An unexpected error occurred. Please try again later.", + "timestamp": int(time.time()) + } + ) + + +@app.exception_handler(HTTPException) +async def http_exception_handler(request: Request, exc: HTTPException): + """HTTP exception handler.""" + logger.warning(f"HTTP exception from {request.client.host if request.client else 'unknown'}: {exc.status_code} - {exc.detail}") + + return JSONResponse( + status_code=exc.status_code, + content={ + "error": exc.detail, + "status_code": exc.status_code, + "timestamp": int(time.time()) } ) @app.get("/") -async def root(): - """Root endpoint.""" +@limiter.limit("60/minute") +async def root(request: Request): + """Root endpoint with rate limiting.""" + client_ip = request.client.host if request.client else "unknown" + logger.info(f"Root endpoint accessed from {client_ip}") + return { "service": "E-Commerce Search API", "version": "1.0.0", - "status": "running" + "status": "running", + "timestamp": int(time.time()) } +@app.get("/health") +@limiter.limit("120/minute") +async def health_check(request: Request): + """Health check endpoint.""" + try: + # Check if services are initialized + get_config() + get_es_client() + + return { + "status": "healthy", + "services": { + "config": "initialized", + "elasticsearch": "connected", + "searcher": "initialized" + }, + "timestamp": int(time.time()) + } + except Exception as e: + logger.error(f"Health check failed: {e}") + return JSONResponse( + status_code=503, + content={ + "status": "unhealthy", + "error": str(e), + "timestamp": int(time.time()) + } + ) + + # Include routers from .routes import search, admin diff --git a/api/routes/search.py b/api/routes/search.py index af6528a..78735d7 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -33,7 +33,7 @@ async def search(request: SearchRequest): try: # Get searcher from app state - from main import get_searcher + from api.app import get_searcher searcher = get_searcher() # Execute search @@ -70,7 +70,7 @@ async def search_by_image(request: ImageSearchRequest): Uses image embeddings to find visually similar products. """ try: - from main import get_searcher + from api.app import get_searcher searcher = get_searcher() # Execute image search @@ -101,7 +101,7 @@ async def get_document(doc_id: str): Get a single document by ID. """ try: - from main import get_searcher + from api.app import get_searcher searcher = get_searcher() doc = searcher.get_document(doc_id) diff --git a/environment.yml b/environment.yml index 26a713d..0af287d 100644 --- a/environment.yml +++ b/environment.yml @@ -42,6 +42,8 @@ dependencies: - uvicorn[standard]>=0.23.0 - pydantic>=2.0.0 - python-multipart>=0.0.6 + - slowapi>=0.1.9 + - anyio>=3.7.0 # Translation - requests>=2.31.0 diff --git a/frontend/index.html b/frontend/index.html index f190956..6023808 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -51,9 +51,9 @@ - + diff --git a/frontend/static/js/app.js b/frontend/static/js/app.js index 4eedf9e..bec5553 100644 --- a/frontend/static/js/app.js +++ b/frontend/static/js/app.js @@ -1,7 +1,7 @@ // SearchEngine Frontend JavaScript // API endpoint -const API_BASE_URL = 'http://localhost:6002'; +const API_BASE_URL = 'http://120.76.41.98:6002'; // Update API URL display document.getElementById('apiUrl').textContent = API_BASE_URL; @@ -28,10 +28,10 @@ async function performSearch() { return; } - // Get options + // Get options (temporarily disable translation and embedding due to GPU issues) const size = parseInt(document.getElementById('resultSize').value); - const enableTranslation = document.getElementById('enableTranslation').checked; - const enableEmbedding = document.getElementById('enableEmbedding').checked; + const enableTranslation = false; // Disabled temporarily + const enableEmbedding = false; // Disabled temporarily const enableRerank = document.getElementById('enableRerank').checked; // Show loading @@ -68,7 +68,7 @@ async function performSearch() {
搜索出错: ${error.message}

- 请确保后端服务正在运行 (http://localhost:6002) + 请确保后端服务正在运行 (${API_BASE_URL})
`; } finally { diff --git a/indexer/data_transformer.py b/indexer/data_transformer.py index 9826646..9ac6f16 100644 --- a/indexer/data_transformer.py +++ b/indexer/data_transformer.py @@ -301,7 +301,28 @@ class DataTransformer: # Pandas datetime handling if isinstance(value, pd.Timestamp): return value.isoformat() - return str(value) + elif isinstance(value, str): + # Try to parse string datetime and convert to ISO format + try: + import datetime + # Handle common datetime formats + formats = [ + '%Y-%m-%d %H:%M:%S', # 2020-07-07 16:44:09 + '%Y-%m-%d %H:%M:%S.%f', # 2020-07-07 16:44:09.123 + '%Y-%m-%dT%H:%M:%S', # 2020-07-07T16:44:09 + '%Y-%m-%d', # 2020-07-07 + ] + for fmt in formats: + try: + dt = datetime.datetime.strptime(value.strip(), fmt) + return dt.isoformat() + except ValueError: + continue + # If no format matches, return original string + return value + except Exception: + return value + return value else: return value diff --git a/scripts/frontend_server.py b/scripts/frontend_server.py index 8d74481..1ef4cc4 100755 --- a/scripts/frontend_server.py +++ b/scripts/frontend_server.py @@ -7,6 +7,9 @@ import http.server import socketserver import os import sys +import logging +import time +from collections import defaultdict, deque # Change to frontend directory frontend_dir = os.path.join(os.path.dirname(__file__), '../frontend') @@ -14,27 +17,116 @@ os.chdir(frontend_dir) PORT = 6003 -class MyHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): - """Custom request handler with CORS support.""" +# Configure logging to suppress scanner noise +logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s') + +class RateLimitingMixin: + """Mixin for rate limiting requests by IP address.""" + request_counts = defaultdict(deque) + rate_limit = 100 # requests per minute + window = 60 # seconds + + @classmethod + def is_rate_limited(cls, ip): + now = time.time() + + # Clean old requests + while cls.request_counts[ip] and cls.request_counts[ip][0] < now - cls.window: + cls.request_counts[ip].popleft() + + # Check rate limit + if len(cls.request_counts[ip]) > cls.rate_limit: + return True + + cls.request_counts[ip].append(now) + return False + +class MyHTTPRequestHandler(http.server.SimpleHTTPRequestHandler, RateLimitingMixin): + """Custom request handler with CORS support and robust error handling.""" + + def setup(self): + """Setup with error handling.""" + try: + super().setup() + except Exception: + pass # Silently handle setup errors from scanners + + def handle_one_request(self): + """Handle single request with error catching.""" + try: + # Check rate limiting + client_ip = self.client_address[0] + if self.is_rate_limited(client_ip): + logging.warning(f"Rate limiting IP: {client_ip}") + self.send_error(429, "Too Many Requests") + return + + super().handle_one_request() + except (ConnectionResetError, BrokenPipeError): + # Client disconnected prematurely - common with scanners + pass + except UnicodeDecodeError: + # Binary data received - not HTTP + pass + except Exception as e: + # Log unexpected errors but don't crash + logging.debug(f"Request handling error: {e}") + + def log_message(self, format, *args): + """Suppress logging for malformed requests from scanners.""" + message = format % args + # Filter out scanner noise + noise_patterns = [ + "code 400", + "Bad request", + "Bad request version", + "Bad HTTP/0.9 request type", + "Bad request syntax" + ] + if any(pattern in message for pattern in noise_patterns): + return + # Only log legitimate requests + if message and not message.startswith(" ") and len(message) > 10: + super().log_message(format, *args) def end_headers(self): # Add CORS headers self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') + # Add security headers + self.send_header('X-Content-Type-Options', 'nosniff') + self.send_header('X-Frame-Options', 'DENY') + self.send_header('X-XSS-Protection', '1; mode=block') super().end_headers() def do_OPTIONS(self): - self.send_response(200) - self.end_headers() + """Handle OPTIONS requests.""" + try: + self.send_response(200) + self.end_headers() + except Exception: + pass + +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + """Threaded TCP server with better error handling.""" + allow_reuse_address = True + daemon_threads = True if __name__ == '__main__': - with socketserver.TCPServer(("", PORT), MyHTTPRequestHandler) as httpd: + # Create threaded server for better concurrency + with ThreadedTCPServer(("", PORT), MyHTTPRequestHandler) as httpd: print(f"Frontend server started at http://localhost:{PORT}") print(f"Serving files from: {os.getcwd()}") print("\nPress Ctrl+C to stop the server") + try: httpd.serve_forever() except KeyboardInterrupt: - print("\nServer stopped") + print("\nShutting down server...") + httpd.shutdown() + print("Server stopped") sys.exit(0) + except Exception as e: + print(f"Server error: {e}") + sys.exit(1) diff --git a/scripts/install_server_deps.sh b/scripts/install_server_deps.sh new file mode 100755 index 0000000..b144d65 --- /dev/null +++ b/scripts/install_server_deps.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +echo "Installing server security dependencies..." + +# Check if we're in a conda environment +if [ -z "$CONDA_DEFAULT_ENV" ]; then + echo "Warning: No conda environment detected. Installing with pip..." + pip install slowapi>=0.1.9 anyio>=3.7.0 +else + echo "Installing in conda environment: $CONDA_DEFAULT_ENV" + pip install slowapi>=0.1.9 anyio>=3.7.0 +fi + +echo "Dependencies installed successfully!" \ No newline at end of file diff --git a/scripts/start_servers.py b/scripts/start_servers.py new file mode 100755 index 0000000..140d5d6 --- /dev/null +++ b/scripts/start_servers.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +""" +Production-ready server startup script with proper error handling and monitoring. +""" + +import os +import sys +import signal +import time +import subprocess +import logging +from typing import Dict, List, Optional +import multiprocessing +import threading + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + logging.FileHandler('/tmp/search_engine_startup.log', mode='a') + ] +) +logger = logging.getLogger(__name__) + +class ServerManager: + """Manages frontend and API server processes.""" + + def __init__(self): + self.processes: Dict[str, subprocess.Popen] = {} + self.running = True + + def start_frontend_server(self) -> bool: + """Start the frontend server.""" + try: + frontend_script = os.path.join(os.path.dirname(__file__), 'frontend_server.py') + + cmd = [sys.executable, frontend_script] + env = os.environ.copy() + env['PYTHONUNBUFFERED'] = '1' + + process = subprocess.Popen( + cmd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1 + ) + + self.processes['frontend'] = process + logger.info(f"Frontend server started with PID: {process.pid}") + + # Start monitoring thread + threading.Thread( + target=self._monitor_output, + args=('frontend', process), + daemon=True + ).start() + + return True + + except Exception as e: + logger.error(f"Failed to start frontend server: {e}") + return False + + def start_api_server(self, customer: str = "customer1", es_host: str = "http://localhost:9200") -> bool: + """Start the API server.""" + try: + cmd = [ + sys.executable, 'main.py', 'serve', + '--customer', customer, + '--es-host', es_host, + '--host', '0.0.0.0', + '--port', '6002' + ] + + env = os.environ.copy() + env['PYTHONUNBUFFERED'] = '1' + env['CUSTOMER_ID'] = customer + env['ES_HOST'] = es_host + + process = subprocess.Popen( + cmd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1 + ) + + self.processes['api'] = process + logger.info(f"API server started with PID: {process.pid}") + + # Start monitoring thread + threading.Thread( + target=self._monitor_output, + args=('api', process), + daemon=True + ).start() + + return True + + except Exception as e: + logger.error(f"Failed to start API server: {e}") + return False + + def _monitor_output(self, name: str, process: subprocess.Popen): + """Monitor process output and log appropriately.""" + try: + for line in iter(process.stdout.readline, ''): + if line.strip() and self.running: + # Filter out scanner noise for frontend server + if name == 'frontend': + noise_patterns = [ + 'code 400', + 'Bad request version', + 'Bad request syntax', + 'Bad HTTP/0.9 request type' + ] + if any(pattern in line for pattern in noise_patterns): + continue + + logger.info(f"[{name}] {line.strip()}") + + except Exception as e: + if self.running: + logger.error(f"Error monitoring {name} output: {e}") + + def check_servers(self) -> bool: + """Check if all servers are still running.""" + all_running = True + + for name, process in self.processes.items(): + if process.poll() is not None: + logger.error(f"{name} server has stopped with exit code: {process.returncode}") + all_running = False + + return all_running + + def stop_all(self): + """Stop all servers gracefully.""" + logger.info("Stopping all servers...") + self.running = False + + for name, process in self.processes.items(): + try: + logger.info(f"Stopping {name} server (PID: {process.pid})...") + + # Try graceful shutdown first + process.terminate() + + # Wait up to 10 seconds for graceful shutdown + try: + process.wait(timeout=10) + logger.info(f"{name} server stopped gracefully") + except subprocess.TimeoutExpired: + # Force kill if graceful shutdown fails + logger.warning(f"{name} server didn't stop gracefully, forcing...") + process.kill() + process.wait() + logger.info(f"{name} server stopped forcefully") + + except Exception as e: + logger.error(f"Error stopping {name} server: {e}") + + self.processes.clear() + logger.info("All servers stopped") + +def signal_handler(signum, frame): + """Handle shutdown signals.""" + logger.info(f"Received signal {signum}, shutting down...") + if 'manager' in globals(): + manager.stop_all() + sys.exit(0) + +def main(): + """Main function to start all servers.""" + global manager + + parser = argparse.ArgumentParser(description='Start SearchEngine servers') + parser.add_argument('--customer', default='customer1', help='Customer ID') + parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host') + parser.add_argument('--check-dependencies', action='store_true', help='Check dependencies before starting') + args = parser.parse_args() + + logger.info("Starting SearchEngine servers...") + logger.info(f"Customer: {args.customer}") + logger.info(f"Elasticsearch: {args.es_host}") + + # Check dependencies if requested + if args.check_dependencies: + logger.info("Checking dependencies...") + try: + import slowapi + import anyio + logger.info("✓ All dependencies available") + except ImportError as e: + logger.error(f"✗ Missing dependency: {e}") + logger.info("Please run: pip install -r requirements_server.txt") + sys.exit(1) + + manager = ServerManager() + + # Set up signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + # Start servers + if not manager.start_api_server(args.customer, args.es_host): + logger.error("Failed to start API server") + sys.exit(1) + + # Wait a moment before starting frontend server + time.sleep(2) + + if not manager.start_frontend_server(): + logger.error("Failed to start frontend server") + manager.stop_all() + sys.exit(1) + + logger.info("All servers started successfully!") + logger.info("Frontend: http://localhost:6003") + logger.info("API: http://localhost:6002") + logger.info("API Docs: http://localhost:6002/docs") + logger.info("Press Ctrl+C to stop all servers") + + # Monitor servers + while manager.running: + if not manager.check_servers(): + logger.error("One or more servers have stopped unexpectedly") + manager.stop_all() + sys.exit(1) + + time.sleep(5) # Check every 5 seconds + + except KeyboardInterrupt: + logger.info("Received interrupt signal") + except Exception as e: + logger.error(f"Unexpected error: {e}") + finally: + manager.stop_all() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/scripts/stop.sh b/scripts/stop.sh new file mode 100755 index 0000000..336ced8 --- /dev/null +++ b/scripts/stop.sh @@ -0,0 +1,68 @@ +#!/bin/bash + +# Stop script for Search Engine services +# This script stops both backend and frontend servers + +echo "========================================" +echo "Stopping Search Engine Services" +echo "========================================" + +# Kill processes on port 6002 (backend) +BACKEND_PIDS=$(lsof -ti:6002 2>/dev/null) +if [ ! -z "$BACKEND_PIDS" ]; then + echo "Stopping backend server(s) on port 6002..." + for PID in $BACKEND_PIDS; do + echo " Killing PID: $PID" + kill -TERM $PID 2>/dev/null || true + done + sleep 2 + # Force kill if still running + REMAINING_PIDS=$(lsof -ti:6002 2>/dev/null) + if [ ! -z "$REMAINING_PIDS" ]; then + echo " Force killing remaining processes..." + for PID in $REMAINING_PIDS; do + kill -KILL $PID 2>/dev/null || true + done + fi + echo "Backend server stopped." +else + echo "No backend server found running on port 6002." +fi + +# Kill processes on port 6003 (frontend) +FRONTEND_PIDS=$(lsof -ti:6003 2>/dev/null) +if [ ! -z "$FRONTEND_PIDS" ]; then + echo "Stopping frontend server(s) on port 6003..." + for PID in $FRONTEND_PIDS; do + echo " Killing PID: $PID" + kill -TERM $PID 2>/dev/null || true + done + sleep 2 + # Force kill if still running + REMAINING_PIDS=$(lsof -ti:6003 2>/dev/null) + if [ ! -z "$REMAINING_PIDS" ]; then + echo " Force killing remaining processes..." + for PID in $REMAINING_PIDS; do + kill -KILL $PID 2>/dev/null || true + done + fi + echo "Frontend server stopped." +else + echo "No frontend server found running on port 6003." +fi + +# Also stop any processes using PID files +if [ -f "logs/backend.pid" ]; then + BACKEND_PID=$(cat logs/backend.pid 2>/dev/null) + if [ ! -z "$BACKEND_PID" ] && kill -0 $BACKEND_PID 2>/dev/null; then + echo "Stopping backend server via PID file (PID: $BACKEND_PID)..." + kill -TERM $BACKEND_PID 2>/dev/null || true + sleep 2 + kill -KILL $BACKEND_PID 2>/dev/null || true + fi + rm -f logs/backend.pid +fi + +echo "========================================" +echo "All services stopped successfully!" +echo "========================================" \ No newline at end of file diff --git a/search/boolean_parser.py b/search/boolean_parser.py index 539e817..41f4007 100644 --- a/search/boolean_parser.py +++ b/search/boolean_parser.py @@ -82,7 +82,7 @@ class BooleanParser: List of tokens """ # Pattern to match: operators, parentheses, or terms (with domain prefix support) - pattern = r'\b(AND|OR|RANK|ANDNOT)\b|[()]|(?:\w+:)?[^\s()]++' + pattern = r'\b(AND|OR|RANK|ANDNOT)\b|[()]|(?:\w+:)?[^\s()]+' tokens = [] for match in re.finditer(pattern, expression): diff --git a/start_all.sh b/start_all.sh index 913d349..7ca6ca1 100755 --- a/start_all.sh +++ b/start_all.sh @@ -17,12 +17,25 @@ echo -e "${GREEN}========================================${NC}" echo -e "${GREEN}SearchEngine一键启动脚本${NC}" echo -e "${GREEN}========================================${NC}" +# Step 0: Stop existing services first +echo -e "\n${YELLOW}Step 0/5: 停止现有服务${NC}" +if [ -f "./scripts/stop.sh" ]; then + ./scripts/stop.sh + sleep 2 # Wait for services to fully stop +else + echo -e "${YELLOW}停止脚本不存在,手动检查端口...${NC}" + # Kill any existing processes on our ports + fuser -k 6002/tcp 2>/dev/null || true + fuser -k 6003/tcp 2>/dev/null || true + sleep 2 +fi + # Step 1: Setup environment -echo -e "\n${YELLOW}Step 1/4: 设置环境${NC}" +echo -e "\n${YELLOW}Step 1/5: 设置环境${NC}" ./setup.sh # Step 2: Check if data is already ingested -echo -e "\n${YELLOW}Step 2/4: 检查数据${NC}" +echo -e "\n${YELLOW}Step 2/5: 检查数据${NC}" source /home/tw/miniconda3/etc/profile.d/conda.sh conda activate searchengine @@ -55,7 +68,7 @@ else fi # Step 3: Start backend in background -echo -e "\n${YELLOW}Step 3/4: 启动后端服务${NC}" +echo -e "\n${YELLOW}Step 3/5: 启动后端服务${NC}" echo -e "${YELLOW}后端服务将在后台运行...${NC}" nohup ./scripts/start_backend.sh > logs/backend.log 2>&1 & BACKEND_PID=$! @@ -95,7 +108,7 @@ else fi # Step 4: Start frontend -echo -e "\n${YELLOW}Step 4/4: 启动前端服务${NC}" +echo -e "\n${YELLOW}Step 4/5: 启动前端服务${NC}" echo -e "${GREEN}========================================${NC}" echo -e "${GREEN}所有服务启动完成!${NC}" echo -e "${GREEN}========================================${NC}" -- libgit2 0.21.2