From bb9c626c6ff671be6d23a2b92e0f5eb8a43b135b Mon Sep 17 00:00:00 2001
From: tangwang
Date: Thu, 18 Dec 2025 17:40:44 +0800
Subject: [PATCH] Search service (6002) no longer initializes or mounts
 /indexer/* routes, so indexing cannot block online search

- Add api/indexer_app.py, which initializes ES + DB + indexing services in a
  separate process (default port 6004) and reuses the single set of routes in
  api/routes/indexer.py.
- Add api/service_registry.py, which injects the ES client and indexing
  services into the indexer routes through a registry, removing duplicated
  code and circular imports.
- Add a serve-indexer subcommand to main.py; scripts/start.sh / stop.sh /
  start_backend.sh / start_indexer.sh manage the indexer process
  independently.
- Update all indexing examples in the docs from 6002/indexer/* to
  6004/indexer/*.
---
 a.sh                                      |   1 +
 api/app.py                                |  62 ++++++--------------------------------------------------------
 api/indexer_app.py                        | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 api/routes/indexer.py                     |  10 ++++------
 api/service_registry.py                   |  43 +++++++++++++++++++++++++++++++++++++++++++
 docs/搜索API对接指南.md                   |  18 +++++++++---------
 docs/索引数据接口文档.md                  |   8 ++++----
 main.py                                   |  40 +++++++++++++++++++++++++++++++++++++---
 scripts/amazon_xlsx_to_shoplazza_xlsx.py  |   4 ++--
 scripts/start.sh                          |  50 +++++++++++++++++++++++++++++++++++++++++---------
 scripts/start_backend.sh                  |   5 +++--
 scripts/start_indexer.sh                  |  46 ++++++++++++++++++++++++++++++++++++++++++++++
 scripts/stop.sh                           |  33 ++++++++++++++++++++++++++++++++-
 scripts/tenant3__csv_to_shoplazza_xlsx.sh |   0
 14 files changed, 464 insertions(+), 92 deletions(-)
 create mode 100644 a.sh
 create mode 100644 api/indexer_app.py
 create mode 100644 api/service_registry.py
 create mode 100755 scripts/start_indexer.sh
 mode change 100644 => 100755 scripts/tenant3__csv_to_shoplazza_xlsx.sh

diff --git a/a.sh b/a.sh
new file mode 100644
index 0000000..88b0e2f
--- /dev/null
+++ b/a.sh
@@ -0,0 +1 @@
+curl -X POST "http://localhost:6004/indexer/reindex" -H "Content-Type: application/json" -d '{"tenant_id":"162","batch_size":500}'
diff --git a/api/app.py b/api/app.py
index 773bff5..3a5255e 100644
--- a/api/app.py
+++ b/api/app.py
@@ -46,18 +46,15 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 from config.env_config import ES_CONFIG, DB_CONFIG
 from config import ConfigLoader
 from utils import ESClient
-from utils.db_connector import create_db_connection
 from search import Searcher
 from query import QueryParser
-from indexer.incremental_service import IncrementalIndexerService
+from .service_registry import set_es_client

 # Global instances
 _es_client: Optional[ESClient] = None
 _searcher: Optional[Searcher] = None
 _query_parser: Optional[QueryParser] = None
 _config = None
-_incremental_service: Optional[IncrementalIndexerService] = None
-_bulk_indexing_service = None


 def init_service(es_host: str = "http://localhost:9200"):
@@ -67,7 +64,7 @@ def init_service(es_host: str = "http://localhost:9200"):
     Args:
         es_host: Elasticsearch host URL
     """
-    global _es_client, _searcher, _query_parser, _config, _incremental_service, _bulk_indexing_service
+    global _es_client, _searcher, _query_parser, _config

     start_time = time.time()
     logger.info("Initializing search service (multi-tenant)")
@@ -92,6 +89,8 @@ def init_service(es_host: str = "http://localhost:9200"):
     if not _es_client.ping():
         raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
     logger.info("Elasticsearch connected")
+    # expose ES client for any shared components (e.g. searcher)
+    set_es_client(_es_client)

     # Initialize components
     logger.info("Initializing query parser...")
@@ -100,44 +99,6 @@
     logger.info("Initializing searcher...")
     _searcher = Searcher(_es_client, _config, _query_parser)

-    # Initialize indexing services (if DB config is available)
-    try:
-        from utils.db_connector import create_db_connection
-        from indexer.incremental_service import IncrementalIndexerService
-        from indexer.bulk_indexing_service import BulkIndexingService
-
-        db_host = os.getenv('DB_HOST')
-        db_port = int(os.getenv('DB_PORT', 3306))
-        db_database = os.getenv('DB_DATABASE')
-        db_username = os.getenv('DB_USERNAME')
-        db_password = os.getenv('DB_PASSWORD')
-
-        if all([db_host, db_database, db_username, db_password]):
-            logger.info("Initializing database connection for indexing services...")
-            db_engine = create_db_connection(
-                host=db_host,
-                port=db_port,
-                database=db_database,
-                username=db_username,
-                password=db_password
-            )
-
-            # Initialize incremental service
-            _incremental_service = IncrementalIndexerService(db_engine)
-            logger.info("Incremental indexer service initialized")
-
-            # Initialize bulk indexing service
-            _bulk_indexing_service = BulkIndexingService(db_engine, _es_client)
-            logger.info("Bulk indexing service initialized")
-        else:
-            logger.warning("Database config incomplete, indexing services will not be available")
-            logger.warning("Required: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
-    except Exception as e:
-        logger.warning(f"Failed to initialize indexing services: {e}")
-        logger.warning("Indexing endpoints will not be available")
-        _incremental_service = None
-        _bulk_indexing_service = None
-
     elapsed = time.time() - start_time
     logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}")

@@ -172,16 +133,6 @@ def get_config():
     return _config


-def get_incremental_service() -> Optional[IncrementalIndexerService]:
-    """Get incremental indexer service instance."""
-    return _incremental_service
-
-
-def get_bulk_indexing_service():
-    """Get bulk indexing service instance."""
-    return _bulk_indexing_service
-
-
 # Create FastAPI app with enhanced configuration
 app = FastAPI(
     title="E-Commerce Search API",
@@ -320,12 +271,11 @@ async def health_check(request: Request):
     )


-# Include routers
-from .routes import search, admin, indexer
+# Include routers (search app should NOT mount indexer routes)
+from .routes import search, admin

 app.include_router(search.router)
 app.include_router(admin.router)
-app.include_router(indexer.router)

 # Mount static files and serve frontend
 frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend")
diff --git a/api/indexer_app.py b/api/indexer_app.py
new file mode 100644
index 0000000..2de6c51
--- /dev/null
+++ b/api/indexer_app.py
@@ -0,0 +1,236 @@
+"""
+FastAPI application dedicated to indexing (separate from search API).
+
+This service mounts ONLY ONE copy of indexer routes: api/routes/indexer.py
+and injects required services via api/service_registry.py.
+
+Usage:
+    uvicorn api.indexer_app:app --host 0.0.0.0 --port 6004 --reload
+
+This service only exposes /indexer/* routes and can be run in a separate
+process so that heavy indexing work does not block online search traffic.
+""" + +import os +import sys +import logging +import time +from typing import Optional + +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse + +# Configure logging +import pathlib + +log_dir = pathlib.Path("logs") +log_dir.mkdir(exist_ok=True) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler(log_dir / "indexer_api.log", mode="a", encoding="utf-8"), + ], +) +logger = logging.getLogger(__name__) + +# Add parent directory to path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from config.env_config import ES_CONFIG # noqa: E402 +from config import ConfigLoader # noqa: E402 +from utils import ESClient # noqa: E402 +from utils.db_connector import create_db_connection # noqa: E402 +from indexer.incremental_service import IncrementalIndexerService # noqa: E402 +from indexer.bulk_indexing_service import BulkIndexingService # noqa: E402 +from .routes import indexer as indexer_routes # noqa: E402 +from .service_registry import set_es_client, set_indexer_services # noqa: E402 + + +_es_client: Optional[ESClient] = None +_config = None +_incremental_service: Optional[IncrementalIndexerService] = None +_bulk_indexing_service: Optional[BulkIndexingService] = None + + +def init_indexer_service(es_host: str = "http://localhost:9200"): + """ + Initialize indexing services (ES client + DB + indexers). + + This mirrors the indexing-related initialization logic in api.app.init_service + but without search-related components. + """ + global _es_client, _config, _incremental_service, _bulk_indexing_service + + start_time = time.time() + logger.info("Initializing Indexer service") + + # Load configuration (kept for parity/logging; indexer routes don't depend on it) + logger.info("Loading configuration...") + config_loader = ConfigLoader("config/config.yaml") + _config = config_loader.load_config() + logger.info("Configuration loaded") + + # Get ES credentials + es_username = os.getenv("ES_USERNAME") or ES_CONFIG.get("username") + es_password = os.getenv("ES_PASSWORD") or ES_CONFIG.get("password") + + # Connect to Elasticsearch + logger.info(f"Connecting to Elasticsearch at {es_host} for indexer...") + if es_username and es_password: + _es_client = ESClient(hosts=[es_host], username=es_username, password=es_password) + else: + _es_client = ESClient(hosts=[es_host]) + + if not _es_client.ping(): + raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}") + logger.info("Elasticsearch connected for indexer") + # publish ES client for routes + set_es_client(_es_client) + + # Initialize indexing services (DB is required here) + db_host = os.getenv("DB_HOST") + db_port = int(os.getenv("DB_PORT", 3306)) + db_database = os.getenv("DB_DATABASE") + db_username = os.getenv("DB_USERNAME") + db_password = os.getenv("DB_PASSWORD") + + if all([db_host, db_database, db_username, db_password]): + logger.info("Initializing database connection for indexing services...") + db_engine = create_db_connection( + host=db_host, + port=db_port, + database=db_database, + username=db_username, + password=db_password, + ) + + _incremental_service = IncrementalIndexerService(db_engine) + _bulk_indexing_service = BulkIndexingService(db_engine, _es_client) + set_indexer_services( + incremental_service=_incremental_service, + bulk_indexing_service=_bulk_indexing_service, + ) + logger.info("Indexer services initialized (incremental + bulk)") + 
else: + missing = [ + name + for name, value in [ + ("DB_HOST", db_host), + ("DB_DATABASE", db_database), + ("DB_USERNAME", db_username), + ("DB_PASSWORD", db_password), + ] + if not value + ] + logger.warning( + "Database config incomplete for indexer, services will not be available. " + f"Missing: {', '.join(missing)}" + ) + _incremental_service = None + _bulk_indexing_service = None + + elapsed = time.time() - start_time + logger.info(f"Indexer service ready! (took {elapsed:.2f}s)") + + # NOTE: we intentionally do NOT synchronize anything into api.app + # to avoid code/route duplication and accidental availability on port 6002. + + +app = FastAPI( + title="E-Commerce Indexer API", + description="Dedicated indexing service for SearchEngine", + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc", + openapi_url="/openapi.json", +) + + +@app.on_event("startup") +async def startup_event(): + es_host = os.getenv("ES_HOST", "http://localhost:9200") + logger.info("Starting Indexer API service") + logger.info(f"Elasticsearch Host: {es_host}") + try: + init_indexer_service(es_host=es_host) + logger.info("Indexer service initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize indexer service: {e}", exc_info=True) + logger.warning("Indexer service will start but may not function correctly") + + +@app.on_event("shutdown") +async def shutdown_event(): + logger.info("Shutting down Indexer API service") + + +@app.exception_handler(Exception) +async def global_exception_handler(request: Request, exc: Exception): + """Global exception handler with basic logging.""" + client_ip = request.client.host if request.client else "unknown" + logger.error(f"Unhandled exception from {client_ip}: {exc}", exc_info=True) + + return JSONResponse( + status_code=500, + content={ + "error": "Internal server error", + "detail": "An unexpected error occurred in indexer service.", + "timestamp": int(time.time()), + }, + ) + + +@app.get("/health") +async def health_check(): + """Simple health check for indexer service.""" + try: + # ensure ES is reachable (best-effort) + if _es_client is None: + raise RuntimeError("ES client is not initialized") + return { + "status": "healthy", + "services": { + "elasticsearch": "connected", + "incremental_indexer": "initialized" if _incremental_service else "unavailable", + "bulk_indexer": "initialized" if _bulk_indexing_service else "unavailable", + }, + "timestamp": int(time.time()), + } + except Exception as e: + logger.error(f"Indexer health check failed: {e}") + return JSONResponse( + status_code=503, + content={ + "status": "unhealthy", + "error": str(e), + "timestamp": int(time.time()), + }, + ) + + +# Mount the single source of truth indexer routes +app.include_router(indexer_routes.router) + + +if __name__ == "__main__": + import argparse + import uvicorn + + parser = argparse.ArgumentParser(description="Start Indexer API service") + parser.add_argument("--host", default="0.0.0.0", help="Host to bind to") + parser.add_argument("--port", type=int, default=6004, help="Port to bind to") + parser.add_argument("--es-host", default="http://localhost:9200", help="Elasticsearch host") + parser.add_argument("--reload", action="store_true", help="Enable auto-reload") + args = parser.parse_args() + + os.environ["ES_HOST"] = args.es_host + + uvicorn.run( + "api.indexer_app:app", + host=args.host, + port=args.port, + reload=args.reload, + ) + diff --git a/api/routes/indexer.py b/api/routes/indexer.py index 937fbdf..1cd17a6 100644 --- 
a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -8,6 +8,10 @@ from fastapi import APIRouter, HTTPException from typing import List from pydantic import BaseModel import logging +from sqlalchemy import text + +# Indexer routes depend on services provided by api/indexer_app.py via this registry. +from ..service_registry import get_incremental_service, get_bulk_indexing_service, get_es_client logger = logging.getLogger(__name__) @@ -42,7 +46,6 @@ async def reindex_all(request: ReindexRequest): 将指定租户的所有SPU数据重新索引到ES。支持删除旧索引并重建。 """ try: - from ..app import get_bulk_indexing_service service = get_bulk_indexing_service() if service is None: raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized") @@ -81,8 +84,6 @@ async def index_spus(request: IndexSpusRequest): - 最后给出总体统计:total, success_count, failed_count等 """ try: - from ..app import get_incremental_service, get_es_client - # 验证请求参数 if not request.spu_ids and not request.delete_spu_ids: raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty") @@ -126,7 +127,6 @@ async def get_documents(request: GetDocumentsRequest): 根据SPU ID列表获取ES文档数据(不写入ES)。用于查看、调试或验证SPU数据。 """ try: - from ..app import get_incremental_service if not request.spu_ids: raise HTTPException(status_code=400, detail="spu_ids cannot be empty") if len(request.spu_ids) > 100: @@ -165,8 +165,6 @@ async def get_documents(request: GetDocumentsRequest): async def indexer_health_check(): """检查索引服务健康状态""" try: - from ..app import get_incremental_service - from sqlalchemy import text service = get_incremental_service() if service is None: return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}} diff --git a/api/service_registry.py b/api/service_registry.py new file mode 100644 index 0000000..4f79b1e --- /dev/null +++ b/api/service_registry.py @@ -0,0 +1,43 @@ +""" +Minimal service registry shared by multiple FastAPI apps. + +We keep only ONE copy of indexer routes (api/routes/indexer.py) and inject +services via this registry, so the same routes can run in: +- Search API (6002): does NOT register indexer services, and does NOT mount indexer routes. +- Indexer API (6004): registers indexer services and mounts indexer routes. 
+""" + +from __future__ import annotations + +from typing import Any, Optional + + +_es_client: Optional[Any] = None +_incremental_service: Optional[Any] = None +_bulk_indexing_service: Optional[Any] = None + + +def set_es_client(es_client: Any) -> None: + global _es_client + _es_client = es_client + + +def get_es_client() -> Any: + if _es_client is None: + raise RuntimeError("ES client is not initialized") + return _es_client + + +def set_indexer_services(*, incremental_service: Any, bulk_indexing_service: Any) -> None: + global _incremental_service, _bulk_indexing_service + _incremental_service = incremental_service + _bulk_indexing_service = bulk_indexing_service + + +def get_incremental_service() -> Optional[Any]: + return _incremental_service + + +def get_bulk_indexing_service() -> Optional[Any]: + return _bulk_indexing_service + diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 928e20d..e1c662b 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -839,7 +839,7 @@ curl "http://localhost:6002/search/12345" **首次索引(重建索引)**: ```bash -curl -X POST "http://localhost:6002/indexer/reindex" \ +curl -X POST "http://localhost:6004/indexer/reindex" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "162", @@ -859,7 +859,7 @@ tail -f logs/*.log **增量更新(不重建索引)**: ```bash -curl -X POST "http://localhost:6002/indexer/reindex" \ +curl -X POST "http://localhost:6004/indexer/reindex" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "162", @@ -1015,7 +1015,7 @@ cat logs/indexer.log | jq 'select(.operation == "request_complete") | {timestamp **示例1:普通增量索引(自动检测删除)**: ```bash -curl -X POST "http://localhost:6002/indexer/index" \ +curl -X POST "http://localhost:6004/indexer/index" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "162", @@ -1026,7 +1026,7 @@ curl -X POST "http://localhost:6002/indexer/index" \ **示例2:显式删除(批量删除)**: ```bash -curl -X POST "http://localhost:6002/indexer/index" \ +curl -X POST "http://localhost:6004/indexer/index" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "162", @@ -1038,7 +1038,7 @@ curl -X POST "http://localhost:6002/indexer/index" \ **示例3:仅删除(不索引)**: ```bash -curl -X POST "http://localhost:6002/indexer/index" \ +curl -X POST "http://localhost:6004/indexer/index" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "162", @@ -1050,7 +1050,7 @@ curl -X POST "http://localhost:6002/indexer/index" \ **示例4:混合操作(索引+删除)**: ```bash -curl -X POST "http://localhost:6002/indexer/index" \ +curl -X POST "http://localhost:6004/indexer/index" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "162", @@ -1133,7 +1133,7 @@ curl -X POST "http://localhost:6002/indexer/index" \ **单个SPU查询**: ```bash -curl -X POST "http://localhost:6002/indexer/documents" \ +curl -X POST "http://localhost:6004/indexer/documents" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "162", @@ -1143,7 +1143,7 @@ curl -X POST "http://localhost:6002/indexer/documents" \ **批量SPU查询**: ```bash -curl -X POST "http://localhost:6002/indexer/documents" \ +curl -X POST "http://localhost:6004/indexer/documents" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "162", @@ -1182,7 +1182,7 @@ curl -X POST "http://localhost:6002/indexer/documents" \ #### 请求示例 ```bash -curl -X GET "http://localhost:6002/indexer/health" +curl -X GET "http://localhost:6004/indexer/health" ``` --- diff --git a/docs/索引数据接口文档.md b/docs/索引数据接口文档.md index 8e53cbe..8e968b6 100644 --- a/docs/索引数据接口文档.md +++ b/docs/索引数据接口文档.md @@ -250,12 +250,12 @@ GET 
/indexer/spu/{spu_id}?tenant_id={tenant_id} ```bash # cURL -curl -X GET "http://localhost:6002/indexer/spu/123?tenant_id=1" +curl -X GET "http://localhost:6004/indexer/spu/123?tenant_id=1" # Java (OkHttp) OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() - .url("http://localhost:6002/indexer/spu/123?tenant_id=1") + .url("http://localhost:6004/indexer/spu/123?tenant_id=1") .get() .build(); Response response = client.newCall(request).execute(); @@ -412,7 +412,7 @@ public void onProductChange(ProductChangeEvent event) { String spuId = event.getSpuId(); // 调用增量接口获取ES文档数据 - String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId); + String url = String.format("http://localhost:6004/indexer/spu/%s?tenant_id=%s", spuId, tenantId); Map esDoc = httpClient.get(url); // 推送到ES @@ -428,7 +428,7 @@ public void onProductChange(ProductChangeEvent event) { // 伪代码示例 List changedSpuIds = getChangedSpuIds(); for (String spuId : changedSpuIds) { - String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId); + String url = String.format("http://localhost:6004/indexer/spu/%s?tenant_id=%s", spuId, tenantId); Map esDoc = httpClient.get(url); elasticsearchClient.index("search_products", esDoc); } diff --git a/main.py b/main.py index 2c6e428..90f36eb 100755 --- a/main.py +++ b/main.py @@ -20,13 +20,20 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config import ConfigLoader from utils import ESClient -from indexer import DataTransformer, IndexingPipeline -from embeddings import BgeEncoder, CLIPImageEncoder from search import Searcher def cmd_ingest(args): """Run data ingestion.""" + # Local imports to avoid hard dependency at module import time + import pandas as pd + from embeddings import BgeEncoder, CLIPImageEncoder + from indexer.bulk_indexer import IndexingPipeline + # NOTE: DataTransformer was referenced historically, but the concrete + # implementation is now provided via customer-specific scripts + # (e.g. data/customer1/ingest_customer1.py). If you still need a generic + # ingestion pipeline here, you can wire your own transformer. 
+    from indexer.spu_transformer import SPUTransformer as DataTransformer

     print("Starting data ingestion")

     # Load config
@@ -68,7 +75,7 @@ def cmd_serve(args):
     os.environ['ES_HOST'] = args.es_host

     print("Starting API service (multi-tenant)...")
-    print(f"  Host: {args.host}:{args.port}")
+    print(f"  Host: {args.host}:{args.port} (search + admin routes)")
     print(f"  Elasticsearch: {args.es_host}")

     uvicorn.run(
@@ -79,6 +86,21 @@ def cmd_serve(args):
     )


+def cmd_serve_indexer(args):
+    """Start dedicated Indexer API service (no search endpoints)."""
+    os.environ['ES_HOST'] = args.es_host
+
+    print("Starting Indexer API service...")
+    print(f"  Host: {args.host}:{args.port} (indexer only)")
+    print(f"  Elasticsearch: {args.es_host}")
+
+    uvicorn.run(
+        "api.indexer_app:app",
+        host=args.host,
+        port=args.port,
+        reload=args.reload
+    )
+
 def cmd_search(args):
     """Test search from command line."""
     # Load config
@@ -148,6 +170,16 @@ def main():
     serve_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
     serve_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')

+    # Serve-indexer command
+    serve_indexer_parser = subparsers.add_parser(
+        'serve-indexer',
+        help='Start dedicated Indexer API service (indexer routes only)'
+    )
+    serve_indexer_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
+    serve_indexer_parser.add_argument('--port', type=int, default=6004, help='Port to bind to')
+    serve_indexer_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
+    serve_indexer_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
+
     # Search command
     search_parser = subparsers.add_parser('search', help='Test search from command line')
     search_parser.add_argument('query', help='Search query')
@@ -169,6 +201,8 @@ def main():
         return cmd_ingest(args)
     elif args.command == 'serve':
         return cmd_serve(args)
+    elif args.command == 'serve-indexer':
+        return cmd_serve_indexer(args)
     elif args.command == 'search':
         return cmd_search(args)
     else:
diff --git a/scripts/amazon_xlsx_to_shoplazza_xlsx.py b/scripts/amazon_xlsx_to_shoplazza_xlsx.py
index fbb9122..5a5c70a 100644
--- a/scripts/amazon_xlsx_to_shoplazza_xlsx.py
+++ b/scripts/amazon_xlsx_to_shoplazza_xlsx.py
@@ -421,8 +421,8 @@ def main():
     parser.add_argument("--max-rows-per-output", type=int, default=40000, help="Max total Excel rows per output file (including模板头部行,默认40000)")
     parser.add_argument("--max-products", type=int, default=None, help="Limit number of SPU groups to output (for testing)")
     # 默认行为:丢弃不符合要求的数据
-    parser.add_argument("--keep-spu-if-parent-missing", action="store_false", dest="skip_spu_if_parent_missing", default=True, help="Keep SPU even if parent ASIN not found in variants (default: skip entire SPU)")
-    parser.add_argument("--fix-sku-if-title-mismatch", action="store_false", dest="skip_sku_if_title_mismatch", default=True, help="Fix SKU title to match parent instead of skipping (default: skip SKU with mismatched title)")
+    parser.add_argument("--keep-spu-if-parent-missing", action="store_false", dest="skip_spu_if_parent_missing", default=False, help="Keep SPU even if parent ASIN not found in variants (default: skip entire SPU)")
+    parser.add_argument("--fix-sku-if-title-mismatch", action="store_false", dest="skip_sku_if_title_mismatch", default=False, help="Fix SKU title to match parent instead of skipping (default: skip SKU with mismatched title)")
     args = parser.parse_args()

     if not os.path.isdir(args.input_dir):
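Quick smoke test for the split introduced above (a sketch, assuming the new default ports, a reachable ES/DB, and the tenant_id 162 used throughout the docs):

    # terminal 1: search API on 6002 (no /indexer/* routes after this patch)
    python main.py serve --host 0.0.0.0 --port 6002

    # terminal 2: dedicated indexer on 6004, in its own process
    python main.py serve-indexer --host 0.0.0.0 --port 6004

    # heavy reindex now hits 6004 and no longer competes with search on 6002
    curl -X POST "http://localhost:6004/indexer/reindex" \
         -H "Content-Type: application/json" \
         -d '{"tenant_id": "162", "batch_size": 500}'
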
diff --git a/scripts/start.sh b/scripts/start.sh
index 4a40d9e..594a6fb 100755
--- a/scripts/start.sh
+++ b/scripts/start.sh
@@ -17,18 +17,18 @@ echo -e "${GREEN}========================================${NC}"
 # Create logs directory if it doesn't exist
 mkdir -p logs

-# Step 1: Start backend in background
-echo -e "\n${YELLOW}Step 1/2: 启动后端服务${NC}"
-echo -e "${YELLOW}后端服务将在后台运行...${NC}"
+# Step 1: Start backend in background (search API)
+echo -e "\n${YELLOW}Step 1/3: 启动后端搜索服务${NC}"
+echo -e "${YELLOW}后端搜索服务将在后台运行...${NC}"

 nohup ./scripts/start_backend.sh > logs/backend.log 2>&1 &
 BACKEND_PID=$!
 echo $BACKEND_PID > logs/backend.pid
-echo -e "${GREEN}后端服务已启动 (PID: $BACKEND_PID)${NC}"
+echo -e "${GREEN}后端搜索服务已启动 (PID: $BACKEND_PID)${NC}"
 echo -e "${GREEN}日志文件: logs/backend.log${NC}"

 # Wait for backend to start
-echo -e "${YELLOW}等待后端服务启动...${NC}"
+echo -e "${YELLOW}等待后端搜索服务启动...${NC}"
 MAX_RETRIES=30
 RETRY_COUNT=0
 BACKEND_READY=false
@@ -45,15 +45,47 @@ done

 # Check if backend is running
 if [ "$BACKEND_READY" = true ]; then
-    echo -e "${GREEN}✓ 后端服务运行正常${NC}"
+    echo -e "${GREEN}✓ 后端搜索服务运行正常${NC}"
 else
-    echo -e "${RED}✗ 后端服务启动失败,请检查日志: logs/backend.log${NC}"
+    echo -e "${RED}✗ 后端搜索服务启动失败,请检查日志: logs/backend.log${NC}"
     echo -e "${YELLOW}提示: 后端服务可能需要更多时间启动,或者检查端口是否被占用${NC}"
     exit 1
 fi

-# Step 2: Start frontend in background
-echo -e "\n${YELLOW}Step 2/2: 启动前端服务${NC}"
+# Step 2: Start indexer in background
+echo -e "\n${YELLOW}Step 2/3: 启动索引服务${NC}"
+echo -e "${YELLOW}索引服务将在后台运行...${NC}"
+
+nohup ./scripts/start_indexer.sh > logs/indexer.log 2>&1 &
+INDEXER_PID=$!
+echo $INDEXER_PID > logs/indexer.pid
+echo -e "${GREEN}索引服务已启动 (PID: $INDEXER_PID)${NC}"
+echo -e "${GREEN}日志文件: logs/indexer.log${NC}"
+
+# Wait for indexer to start
+echo -e "${YELLOW}等待索引服务启动...${NC}"
+MAX_RETRIES=30
+RETRY_COUNT=0
+INDEXER_READY=false
+
+while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do
+    sleep 2
+    if curl -s http://localhost:6004/health > /dev/null 2>&1; then
+        INDEXER_READY=true
+        break
+    fi
+    RETRY_COUNT=$((RETRY_COUNT + 1))
+    echo -e "${YELLOW}  等待中... ($RETRY_COUNT/$MAX_RETRIES)${NC}"
+done
+
+if [ "$INDEXER_READY" = true ]; then
+    echo -e "${GREEN}✓ 索引服务运行正常${NC}"
+else
+    echo -e "${YELLOW}⚠ 索引服务可能还在启动中,请稍后访问 (日志: logs/indexer.log)${NC}"
+fi
+
+# Step 3: Start frontend in background
+echo -e "\n${YELLOW}Step 3/3: 启动前端服务${NC}"
 echo -e "${YELLOW}前端服务将在后台运行...${NC}"

 nohup ./scripts/start_frontend.sh > logs/frontend.log 2>&1 &
diff --git a/scripts/start_backend.sh b/scripts/start_backend.sh
index 4a36327..93e4738 100755
--- a/scripts/start_backend.sh
+++ b/scripts/start_backend.sh
@@ -29,7 +29,7 @@ echo "  API Port: ${API_PORT:-6002}"
 echo "  ES Host: ${ES_HOST:-http://localhost:9200}"
 echo "  ES Username: ${ES_USERNAME:-not set}"

-echo -e "\n${YELLOW}Starting service (multi-tenant)...${NC}"
+echo -e "\n${YELLOW}Starting backend API service (search + admin)...${NC}"

 # Export environment variables for the Python process
 export API_HOST=${API_HOST:-0.0.0.0}
@@ -38,8 +38,9 @@ export ES_HOST=${ES_HOST:-http://localhost:9200}
 export ES_USERNAME=${ES_USERNAME:-}
 export ES_PASSWORD=${ES_PASSWORD:-}

-python -m api.app \
+python main.py serve \
     --host $API_HOST \
     --port $API_PORT \
     --es-host $ES_HOST

+
diff --git a/scripts/start_indexer.sh b/scripts/start_indexer.sh
new file mode 100755
index 0000000..fa3ade2
--- /dev/null
+++ b/scripts/start_indexer.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+# Start dedicated Indexer API Service
+
+set -e
+
+cd "$(dirname "$0")/.."
+
+source /home/tw/miniconda3/etc/profile.d/conda.sh
+conda activate searchengine
+
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+NC='\033[0m'
+
+echo -e "${GREEN}========================================${NC}"
+echo -e "${GREEN}Starting Indexer API Service${NC}"
+echo -e "${GREEN}========================================${NC}"
+
+# Load config from .env file if it exists
+if [ -f .env ]; then
+    set -a
+    source .env
+    set +a
+fi
+
+echo -e "\n${YELLOW}Configuration:${NC}"
+echo "  INDEXER Host: ${INDEXER_HOST:-0.0.0.0}"
+echo "  INDEXER Port: ${INDEXER_PORT:-6004}"
+echo "  ES Host: ${ES_HOST:-http://localhost:9200}"
+echo "  ES Username: ${ES_USERNAME:-not set}"
+
+echo -e "\n${YELLOW}Starting indexer service...${NC}"
+
+# Export environment variables for the Python process
+export INDEXER_HOST=${INDEXER_HOST:-0.0.0.0}
+export INDEXER_PORT=${INDEXER_PORT:-6004}
+export ES_HOST=${ES_HOST:-http://localhost:9200}
+export ES_USERNAME=${ES_USERNAME:-}
+export ES_PASSWORD=${ES_PASSWORD:-}
+
+python main.py serve-indexer \
+    --host $INDEXER_HOST \
+    --port $INDEXER_PORT \
+    --es-host $ES_HOST
+
+
diff --git a/scripts/stop.sh b/scripts/stop.sh
index 4a82380..a85c1ee 100755
--- a/scripts/stop.sh
+++ b/scripts/stop.sh
@@ -7,7 +7,7 @@ echo "========================================"
 echo "Stopping Search Engine Services"
 echo "========================================"

-# Kill processes on port 6002 (backend)
+# Kill processes on port 6002 (backend - search API)
 BACKEND_PIDS=$(lsof -ti:6002 2>/dev/null)
 if [ ! -z "$BACKEND_PIDS" ]; then
     echo "Stopping backend server(s) on port 6002..."
@@ -29,6 +29,28 @@ else
     echo "No backend server found running on port 6002."
 fi

+# Kill processes on port 6004 (indexer API)
+INDEXER_PIDS=$(lsof -ti:6004 2>/dev/null)
+if [ ! -z "$INDEXER_PIDS" ]; then
+    echo "Stopping indexer server(s) on port 6004..."
+    for PID in $INDEXER_PIDS; do
+        echo "  Killing PID: $PID"
+        kill -TERM $PID 2>/dev/null || true
+    done
+    sleep 2
+    # Force kill if still running
+    REMAINING_PIDS=$(lsof -ti:6004 2>/dev/null)
+    if [ ! -z "$REMAINING_PIDS" ]; then
+        echo "  Force killing remaining processes..."
+        for PID in $REMAINING_PIDS; do
+            kill -KILL $PID 2>/dev/null || true
+        done
+    fi
+    echo "Indexer server stopped."
+else
+    echo "No indexer server found running on port 6004."
+fi
+
 # Kill processes on port 6003 (frontend)
 FRONTEND_PIDS=$(lsof -ti:6003 2>/dev/null)
 if [ ! -z "$FRONTEND_PIDS" ]; then
@@ -63,6 +85,15 @@ if [ -f "logs/backend.pid" ]; then
     rm -f logs/backend.pid
 fi

+[ -f "logs/indexer.pid" ] && INDEXER_PID=$(cat logs/indexer.pid 2>/dev/null)
+if [ ! -z "$INDEXER_PID" ] && kill -0 $INDEXER_PID 2>/dev/null; then
+    echo "Stopping indexer server via PID file (PID: $INDEXER_PID)..."
+    kill -TERM $INDEXER_PID 2>/dev/null || true
+    sleep 2
+    kill -KILL $INDEXER_PID 2>/dev/null || true
+fi
+rm -f logs/indexer.pid
+
 if [ -f "logs/frontend.pid" ]; then
     FRONTEND_PID=$(cat logs/frontend.pid 2>/dev/null)
     if [ ! -z "$FRONTEND_PID" ] && kill -0 $FRONTEND_PID 2>/dev/null; then
diff --git a/scripts/tenant3__csv_to_shoplazza_xlsx.sh b/scripts/tenant3__csv_to_shoplazza_xlsx.sh
old mode 100644
new mode 100755
index bf93793..bf93793
--- a/scripts/tenant3__csv_to_shoplazza_xlsx.sh
+++ b/scripts/tenant3__csv_to_shoplazza_xlsx.sh
--
libgit2 0.21.2
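
Route-split verification sketch (expected statuses assume both services are up; the 404 on 6002 follows from api/app.py no longer including indexer.router):

    curl -i "http://localhost:6002/indexer/health"   # expect 404: search app no longer mounts /indexer/*
    curl -i "http://localhost:6004/indexer/health"   # expect 200: indexer routes served by the 6004 process
    curl -i "http://localhost:6004/health"           # expect 200: dedicated indexer app health endpoint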