Commit bb9c626c6ff671be6d23a2b92e0f5eb8a43b135b

Authored by tangwang
1 parent 3ec5bfe6

The search service (6002) no longer initializes or mounts the /indexer/* routes, so indexing work can no longer block online search.

Added api/indexer_app.py, which initializes ES + DB + the indexing services in a separate process (default port 6004) and reuses the single set of routes in api/routes/indexer.py.
Added api/service_registry.py, which injects the ES client and indexing services into the indexer routes through a registry, eliminating duplicated code and circular imports.
main.py gains a serve-indexer subcommand; scripts/start.sh / stop.sh / start_backend.sh / start_indexer.sh can manage the indexer process independently.
All indexing examples in the docs now use 6004/indexer/* instead of 6002/indexer/*.
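
A quick way to verify the split end-to-end is a small health probe — a minimal sketch, assuming the default ports (6002/6004) from this commit and the `/health` endpoints both apps expose:

```python
# smoke_check.py -- hedged sketch: ports 6002/6004 and GET /health are the
# defaults introduced by this commit; adjust if you override them.
import json
import urllib.request

for name, url in [
    ("search", "http://localhost:6002/health"),
    ("indexer", "http://localhost:6004/health"),
]:
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            print(name, json.loads(resp.read()).get("status"))
    except OSError as exc:  # URLError/HTTPError are OSError subclasses
        print(name, f"unreachable: {exc}")
```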
a.sh 0 → 100644
... ... @@ -0,0 +1 @@
  1 +curl -X POST "http://localhost:6004/indexer/reindex" -H "Content-Type: application/json" -d '{"tenant_id":"162","batch_size":500}'
... ...
api/app.py
... ... @@ -46,18 +46,15 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
46 46 from config.env_config import ES_CONFIG, DB_CONFIG
47 47 from config import ConfigLoader
48 48 from utils import ESClient
49   -from utils.db_connector import create_db_connection
50 49 from search import Searcher
51 50 from query import QueryParser
52   -from indexer.incremental_service import IncrementalIndexerService
  51 +from .service_registry import set_es_client
53 52  
54 53 # Global instances
55 54 _es_client: Optional[ESClient] = None
56 55 _searcher: Optional[Searcher] = None
57 56 _query_parser: Optional[QueryParser] = None
58 57 _config = None
59   -_incremental_service: Optional[IncrementalIndexerService] = None
60   -_bulk_indexing_service = None
61 58  
62 59  
63 60 def init_service(es_host: str = "http://localhost:9200"):
... ... @@ -67,7 +64,7 @@ def init_service(es_host: str = "http://localhost:9200"):
67 64 Args:
68 65 es_host: Elasticsearch host URL
69 66 """
70   - global _es_client, _searcher, _query_parser, _config, _incremental_service, _bulk_indexing_service
  67 + global _es_client, _searcher, _query_parser, _config
71 68  
72 69 start_time = time.time()
73 70 logger.info("Initializing search service (multi-tenant)")
... ... @@ -92,6 +89,8 @@ def init_service(es_host: str = "http://localhost:9200"):
92 89 if not _es_client.ping():
93 90 raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
94 91 logger.info("Elasticsearch connected")
  92 + # expose ES client for any shared components (e.g. searcher)
  93 + set_es_client(_es_client)
95 94  
96 95 # Initialize components
97 96 logger.info("Initializing query parser...")
... ... @@ -100,44 +99,6 @@ def init_service(es_host: str = "http://localhost:9200"):
100 99 logger.info("Initializing searcher...")
101 100 _searcher = Searcher(_es_client, _config, _query_parser)
102 101  
103   - # Initialize indexing services (if DB config is available)
104   - try:
105   - from utils.db_connector import create_db_connection
106   - from indexer.incremental_service import IncrementalIndexerService
107   - from indexer.bulk_indexing_service import BulkIndexingService
108   -
109   - db_host = os.getenv('DB_HOST')
110   - db_port = int(os.getenv('DB_PORT', 3306))
111   - db_database = os.getenv('DB_DATABASE')
112   - db_username = os.getenv('DB_USERNAME')
113   - db_password = os.getenv('DB_PASSWORD')
114   -
115   - if all([db_host, db_database, db_username, db_password]):
116   - logger.info("Initializing database connection for indexing services...")
117   - db_engine = create_db_connection(
118   - host=db_host,
119   - port=db_port,
120   - database=db_database,
121   - username=db_username,
122   - password=db_password
123   - )
124   -
125   - # Initialize incremental service
126   - _incremental_service = IncrementalIndexerService(db_engine)
127   - logger.info("Incremental indexer service initialized")
128   -
129   - # Initialize bulk indexing service
130   - _bulk_indexing_service = BulkIndexingService(db_engine, _es_client)
131   - logger.info("Bulk indexing service initialized")
132   - else:
133   - logger.warning("Database config incomplete, indexing services will not be available")
134   - logger.warning("Required: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
135   - except Exception as e:
136   - logger.warning(f"Failed to initialize indexing services: {e}")
137   - logger.warning("Indexing endpoints will not be available")
138   - _incremental_service = None
139   - _bulk_indexing_service = None
140   -
141 102 elapsed = time.time() - start_time
142 103 logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}")
143 104  
... ... @@ -172,16 +133,6 @@ def get_config():
172 133 return _config
173 134  
174 135  
175   -def get_incremental_service() -> Optional[IncrementalIndexerService]:
176   - """Get incremental indexer service instance."""
177   - return _incremental_service
178   -
179   -
180   -def get_bulk_indexing_service():
181   - """Get bulk indexing service instance."""
182   - return _bulk_indexing_service
183   -
184   -
185 136 # Create FastAPI app with enhanced configuration
186 137 app = FastAPI(
187 138 title="E-Commerce Search API",
... ... @@ -320,12 +271,11 @@ async def health_check(request: Request):
320 271 )
321 272  
322 273  
323   -# Include routers
324   -from .routes import search, admin, indexer
  274 +# Include routers (search app should NOT mount indexer routes)
  275 +from .routes import search, admin
325 276  
326 277 app.include_router(search.router)
327 278 app.include_router(admin.router)
328   -app.include_router(indexer.router)
329 279  
330 280 # Mount static files and serve frontend
331 281 frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend")
... ...
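
With `set_es_client` published at startup, shared components can resolve the ES client from the registry instead of importing `api.app`, which was the old circular-import path. A minimal consumer sketch — only the registry functions and `ping()` appear in this commit; `es_is_up` is illustrative:

```python
from api.service_registry import get_es_client

def es_is_up() -> bool:
    """Best-effort liveness probe via the shared registry."""
    try:
        # get_es_client() raises RuntimeError until an app calls set_es_client().
        return get_es_client().ping()
    except RuntimeError:
        return False
```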
api/indexer_app.py 0 → 100644
... ... @@ -0,0 +1,236 @@
  1 +"""
  2 +FastAPI application dedicated to indexing (separate from search API).
  3 +
  4 +This service mounts ONLY ONE copy of indexer routes: api/routes/indexer.py
  5 +and injects required services via api/service_registry.py.
  6 +
  7 +Usage:
  8 + uvicorn api.indexer_app:app --host 0.0.0.0 --port 6004 --reload
  9 +
  10 +This service only exposes /indexer/* routes and can be run in a separate
  11 +process so that heavy indexing work does not block online search traffic.
  12 +"""
  13 +
  14 +import os
  15 +import sys
  16 +import logging
  17 +import time
  18 +from typing import Optional
  19 +
  20 +from fastapi import FastAPI, Request
  21 +from fastapi.responses import JSONResponse
  22 +
  23 +# Configure logging
  24 +import pathlib
  25 +
  26 +log_dir = pathlib.Path("logs")
  27 +log_dir.mkdir(exist_ok=True)
  28 +logging.basicConfig(
  29 + level=logging.INFO,
  30 + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
  31 + handlers=[
  32 + logging.StreamHandler(),
  33 + logging.FileHandler(log_dir / "indexer_api.log", mode="a", encoding="utf-8"),
  34 + ],
  35 +)
  36 +logger = logging.getLogger(__name__)
  37 +
  38 +# Add parent directory to path
  39 +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  40 +
  41 +from config.env_config import ES_CONFIG # noqa: E402
  42 +from config import ConfigLoader # noqa: E402
  43 +from utils import ESClient # noqa: E402
  44 +from utils.db_connector import create_db_connection # noqa: E402
  45 +from indexer.incremental_service import IncrementalIndexerService # noqa: E402
  46 +from indexer.bulk_indexing_service import BulkIndexingService # noqa: E402
  47 +from .routes import indexer as indexer_routes # noqa: E402
  48 +from .service_registry import set_es_client, set_indexer_services # noqa: E402
  49 +
  50 +
  51 +_es_client: Optional[ESClient] = None
  52 +_config = None
  53 +_incremental_service: Optional[IncrementalIndexerService] = None
  54 +_bulk_indexing_service: Optional[BulkIndexingService] = None
  55 +
  56 +
  57 +def init_indexer_service(es_host: str = "http://localhost:9200"):
  58 + """
  59 + Initialize indexing services (ES client + DB + indexers).
  60 +
  61 + This mirrors the indexing-related initialization logic in api.app.init_service
  62 + but without search-related components.
  63 + """
  64 + global _es_client, _config, _incremental_service, _bulk_indexing_service
  65 +
  66 + start_time = time.time()
  67 + logger.info("Initializing Indexer service")
  68 +
  69 + # Load configuration (kept for parity/logging; indexer routes don't depend on it)
  70 + logger.info("Loading configuration...")
  71 + config_loader = ConfigLoader("config/config.yaml")
  72 + _config = config_loader.load_config()
  73 + logger.info("Configuration loaded")
  74 +
  75 + # Get ES credentials
  76 + es_username = os.getenv("ES_USERNAME") or ES_CONFIG.get("username")
  77 + es_password = os.getenv("ES_PASSWORD") or ES_CONFIG.get("password")
  78 +
  79 + # Connect to Elasticsearch
  80 + logger.info(f"Connecting to Elasticsearch at {es_host} for indexer...")
  81 + if es_username and es_password:
  82 + _es_client = ESClient(hosts=[es_host], username=es_username, password=es_password)
  83 + else:
  84 + _es_client = ESClient(hosts=[es_host])
  85 +
  86 + if not _es_client.ping():
  87 + raise ConnectionError(f"Failed to connect to Elasticsearch at {es_host}")
  88 + logger.info("Elasticsearch connected for indexer")
  89 + # publish ES client for routes
  90 + set_es_client(_es_client)
  91 +
  92 + # Initialize indexing services (DB is required here)
  93 + db_host = os.getenv("DB_HOST")
  94 + db_port = int(os.getenv("DB_PORT", 3306))
  95 + db_database = os.getenv("DB_DATABASE")
  96 + db_username = os.getenv("DB_USERNAME")
  97 + db_password = os.getenv("DB_PASSWORD")
  98 +
  99 + if all([db_host, db_database, db_username, db_password]):
  100 + logger.info("Initializing database connection for indexing services...")
  101 + db_engine = create_db_connection(
  102 + host=db_host,
  103 + port=db_port,
  104 + database=db_database,
  105 + username=db_username,
  106 + password=db_password,
  107 + )
  108 +
  109 + _incremental_service = IncrementalIndexerService(db_engine)
  110 + _bulk_indexing_service = BulkIndexingService(db_engine, _es_client)
  111 + set_indexer_services(
  112 + incremental_service=_incremental_service,
  113 + bulk_indexing_service=_bulk_indexing_service,
  114 + )
  115 + logger.info("Indexer services initialized (incremental + bulk)")
  116 + else:
  117 + missing = [
  118 + name
  119 + for name, value in [
  120 + ("DB_HOST", db_host),
  121 + ("DB_DATABASE", db_database),
  122 + ("DB_USERNAME", db_username),
  123 + ("DB_PASSWORD", db_password),
  124 + ]
  125 + if not value
  126 + ]
  127 + logger.warning(
  128 + "Database config incomplete for indexer, services will not be available. "
  129 + f"Missing: {', '.join(missing)}"
  130 + )
  131 + _incremental_service = None
  132 + _bulk_indexing_service = None
  133 +
  134 + elapsed = time.time() - start_time
  135 + logger.info(f"Indexer service ready! (took {elapsed:.2f}s)")
  136 +
  137 + # NOTE: we intentionally do NOT synchronize anything into api.app
  138 + # to avoid code/route duplication and accidental availability on port 6002.
  139 +
  140 +
  141 +app = FastAPI(
  142 + title="E-Commerce Indexer API",
  143 + description="Dedicated indexing service for SearchEngine",
  144 + version="1.0.0",
  145 + docs_url="/docs",
  146 + redoc_url="/redoc",
  147 + openapi_url="/openapi.json",
  148 +)
  149 +
  150 +
  151 +@app.on_event("startup")
  152 +async def startup_event():
  153 + es_host = os.getenv("ES_HOST", "http://localhost:9200")
  154 + logger.info("Starting Indexer API service")
  155 + logger.info(f"Elasticsearch Host: {es_host}")
  156 + try:
  157 + init_indexer_service(es_host=es_host)
  158 + logger.info("Indexer service initialized successfully")
  159 + except Exception as e:
  160 + logger.error(f"Failed to initialize indexer service: {e}", exc_info=True)
  161 + logger.warning("Indexer service will start but may not function correctly")
  162 +
  163 +
  164 +@app.on_event("shutdown")
  165 +async def shutdown_event():
  166 + logger.info("Shutting down Indexer API service")
  167 +
  168 +
  169 +@app.exception_handler(Exception)
  170 +async def global_exception_handler(request: Request, exc: Exception):
  171 + """Global exception handler with basic logging."""
  172 + client_ip = request.client.host if request.client else "unknown"
  173 + logger.error(f"Unhandled exception from {client_ip}: {exc}", exc_info=True)
  174 +
  175 + return JSONResponse(
  176 + status_code=500,
  177 + content={
  178 + "error": "Internal server error",
  179 + "detail": "An unexpected error occurred in indexer service.",
  180 + "timestamp": int(time.time()),
  181 + },
  182 + )
  183 +
  184 +
  185 +@app.get("/health")
  186 +async def health_check():
  187 + """Simple health check for indexer service."""
  188 + try:
  189 + # ensure ES is reachable (best-effort)
  190 + if _es_client is None:
  191 + raise RuntimeError("ES client is not initialized")
  192 + return {
  193 + "status": "healthy",
  194 + "services": {
  195 + "elasticsearch": "connected",
  196 + "incremental_indexer": "initialized" if _incremental_service else "unavailable",
  197 + "bulk_indexer": "initialized" if _bulk_indexing_service else "unavailable",
  198 + },
  199 + "timestamp": int(time.time()),
  200 + }
  201 + except Exception as e:
  202 + logger.error(f"Indexer health check failed: {e}")
  203 + return JSONResponse(
  204 + status_code=503,
  205 + content={
  206 + "status": "unhealthy",
  207 + "error": str(e),
  208 + "timestamp": int(time.time()),
  209 + },
  210 + )
  211 +
  212 +
  213 +# Mount the single source of truth indexer routes
  214 +app.include_router(indexer_routes.router)
  215 +
  216 +
  217 +if __name__ == "__main__":
  218 + import argparse
  219 + import uvicorn
  220 +
  221 + parser = argparse.ArgumentParser(description="Start Indexer API service")
  222 + parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
  223 + parser.add_argument("--port", type=int, default=6004, help="Port to bind to")
  224 + parser.add_argument("--es-host", default="http://localhost:9200", help="Elasticsearch host")
  225 + parser.add_argument("--reload", action="store_true", help="Enable auto-reload")
  226 + args = parser.parse_args()
  227 +
  228 + os.environ["ES_HOST"] = args.es_host
  229 +
  230 + uvicorn.run(
  231 + "api.indexer_app:app",
  232 + host=args.host,
  233 + port=args.port,
  234 + reload=args.reload,
  235 + )
  236 +
... ...
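
The dedicated service keeps the same request contract as before, so callers only change the port. A Python equivalent of the curl call in a.sh (tenant_id and batch_size are example values):

```python
import json
import urllib.request

payload = json.dumps({"tenant_id": "162", "batch_size": 500}).encode()
req = urllib.request.Request(
    "http://localhost:6004/indexer/reindex",
    data=payload,
    headers={"Content-Type": "application/json"},
)  # supplying data= makes this a POST
# A full reindex can run for a while; use a generous timeout.
with urllib.request.urlopen(req, timeout=600) as resp:
    print(json.loads(resp.read()))
```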
api/routes/indexer.py
... ... @@ -8,6 +8,10 @@ from fastapi import APIRouter, HTTPException
8 8 from typing import List
9 9 from pydantic import BaseModel
10 10 import logging
  11 +from sqlalchemy import text
  12 +
  13 +# Indexer routes depend on services provided by api/indexer_app.py via this registry.
  14 +from ..service_registry import get_incremental_service, get_bulk_indexing_service, get_es_client
11 15  
12 16 logger = logging.getLogger(__name__)
13 17  
... ... @@ -42,7 +46,6 @@ async def reindex_all(request: ReindexRequest):
42 46 Reindex all SPU data for the given tenant into ES. Supports deleting the old index and rebuilding it.
43 47 """
44 48 try:
45   - from ..app import get_bulk_indexing_service
46 49 service = get_bulk_indexing_service()
47 50 if service is None:
48 51 raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
... ... @@ -81,8 +84,6 @@ async def index_spus(request: IndexSpusRequest):
81 84 - Finally, returns overall statistics: total, success_count, failed_count, etc.
82 85 """
83 86 try:
84   - from ..app import get_incremental_service, get_es_client
85   -
86 87 # Validate request parameters
87 88 if not request.spu_ids and not request.delete_spu_ids:
88 89 raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")
... ... @@ -126,7 +127,6 @@ async def get_documents(request: GetDocumentsRequest):
126 127 Fetch ES documents for a list of SPU IDs (without writing to ES). Used for inspecting, debugging, or validating SPU data.
127 128 """
128 129 try:
129   - from ..app import get_incremental_service
130 130 if not request.spu_ids:
131 131 raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
132 132 if len(request.spu_ids) > 100:
... ... @@ -165,8 +165,6 @@ async def get_documents(request: GetDocumentsRequest):
165 165 async def indexer_health_check():
166 166 """检查索引服务健康状态"""
167 167 try:
168   - from ..app import get_incremental_service
169   - from sqlalchemy import text
170 168 service = get_incremental_service()
171 169 if service is None:
172 170 return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}
... ...
api/service_registry.py 0 → 100644
... ... @@ -0,0 +1,43 @@
  1 +"""
  2 +Minimal service registry shared by multiple FastAPI apps.
  3 +
  4 +We keep only ONE copy of indexer routes (api/routes/indexer.py) and inject
  5 +services via this registry, so the same routes can run in:
  6 +- Search API (6002): does NOT register indexer services, and does NOT mount indexer routes.
  7 +- Indexer API (6004): registers indexer services and mounts indexer routes.
  8 +"""
  9 +
  10 +from __future__ import annotations
  11 +
  12 +from typing import Any, Optional
  13 +
  14 +
  15 +_es_client: Optional[Any] = None
  16 +_incremental_service: Optional[Any] = None
  17 +_bulk_indexing_service: Optional[Any] = None
  18 +
  19 +
  20 +def set_es_client(es_client: Any) -> None:
  21 + global _es_client
  22 + _es_client = es_client
  23 +
  24 +
  25 +def get_es_client() -> Any:
  26 + if _es_client is None:
  27 + raise RuntimeError("ES client is not initialized")
  28 + return _es_client
  29 +
  30 +
  31 +def set_indexer_services(*, incremental_service: Any, bulk_indexing_service: Any) -> None:
  32 + global _incremental_service, _bulk_indexing_service
  33 + _incremental_service = incremental_service
  34 + _bulk_indexing_service = bulk_indexing_service
  35 +
  36 +
  37 +def get_incremental_service() -> Optional[Any]:
  38 + return _incremental_service
  39 +
  40 +
  41 +def get_bulk_indexing_service() -> Optional[Any]:
  42 + return _bulk_indexing_service
  43 +
... ...
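
A side benefit of the registry: unit tests can inject fakes and exercise the indexer routes without booting either FastAPI app. A sketch — `FakeBulkService` and its method are hypothetical stand-ins for whatever interface the routes actually call:

```python
from api import service_registry

class FakeBulkService:
    # Hypothetical stub -- mirror the real BulkIndexingService interface here.
    def reindex(self, tenant_id: str, batch_size: int) -> dict:
        return {"tenant_id": tenant_id, "indexed": 0}

service_registry.set_indexer_services(
    incremental_service=None,  # not exercised in this test
    bulk_indexing_service=FakeBulkService(),
)
assert service_registry.get_bulk_indexing_service().reindex("162", 500)["indexed"] == 0
```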
docs/搜索API对接指南.md
... ... @@ -839,7 +839,7 @@ curl "http://localhost:6002/search/12345"
839 839  
840 840 **Initial indexing (rebuild the index)**:
841 841 ```bash
842   -curl -X POST "http://localhost:6002/indexer/reindex" \
  842 +curl -X POST "http://localhost:6004/indexer/reindex" \
843 843 -H "Content-Type: application/json" \
844 844 -d '{
845 845 "tenant_id": "162",
... ... @@ -859,7 +859,7 @@ tail -f logs/*.log
859 859  
860 860 **Incremental update (no index rebuild)**:
861 861 ```bash
862   -curl -X POST "http://localhost:6002/indexer/reindex" \
  862 +curl -X POST "http://localhost:6004/indexer/reindex" \
863 863 -H "Content-Type: application/json" \
864 864 -d '{
865 865 "tenant_id": "162",
... ... @@ -1015,7 +1015,7 @@ cat logs/indexer.log | jq 'select(.operation == "request_complete") | {timestamp
1015 1015  
1016 1016 **Example 1: plain incremental indexing (deletions auto-detected)**:
1017 1017 ```bash
1018   -curl -X POST "http://localhost:6002/indexer/index" \
  1018 +curl -X POST "http://localhost:6004/indexer/index" \
1019 1019 -H "Content-Type: application/json" \
1020 1020 -d '{
1021 1021 "tenant_id": "162",
... ... @@ -1026,7 +1026,7 @@ curl -X POST "http://localhost:6002/indexer/index" \
1026 1026  
1027 1027 **Example 2: explicit deletion (batch delete)**:
1028 1028 ```bash
1029   -curl -X POST "http://localhost:6002/indexer/index" \
  1029 +curl -X POST "http://localhost:6004/indexer/index" \
1030 1030 -H "Content-Type: application/json" \
1031 1031 -d '{
1032 1032 "tenant_id": "162",
... ... @@ -1038,7 +1038,7 @@ curl -X POST "http://localhost:6002/indexer/index" \
1038 1038  
1039 1039 **Example 3: delete only (no indexing)**:
1040 1040 ```bash
1041   -curl -X POST "http://localhost:6002/indexer/index" \
  1041 +curl -X POST "http://localhost:6004/indexer/index" \
1042 1042 -H "Content-Type: application/json" \
1043 1043 -d '{
1044 1044 "tenant_id": "162",
... ... @@ -1050,7 +1050,7 @@ curl -X POST "http://localhost:6002/indexer/index" \
1050 1050  
1051 1051 **Example 4: mixed operation (index + delete)**:
1052 1052 ```bash
1053   -curl -X POST "http://localhost:6002/indexer/index" \
  1053 +curl -X POST "http://localhost:6004/indexer/index" \
1054 1054 -H "Content-Type: application/json" \
1055 1055 -d '{
1056 1056 "tenant_id": "162",
... ... @@ -1133,7 +1133,7 @@ curl -X POST "http://localhost:6002/indexer/index" \
1133 1133  
1134 1134 **Single-SPU lookup**:
1135 1135 ```bash
1136   -curl -X POST "http://localhost:6002/indexer/documents" \
  1136 +curl -X POST "http://localhost:6004/indexer/documents" \
1137 1137 -H "Content-Type: application/json" \
1138 1138 -d '{
1139 1139 "tenant_id": "162",
... ... @@ -1143,7 +1143,7 @@ curl -X POST "http://localhost:6002/indexer/documents" \
1143 1143  
1144 1144 **Batch SPU lookup**:
1145 1145 ```bash
1146   -curl -X POST "http://localhost:6002/indexer/documents" \
  1146 +curl -X POST "http://localhost:6004/indexer/documents" \
1147 1147 -H "Content-Type: application/json" \
1148 1148 -d '{
1149 1149 "tenant_id": "162",
... ... @@ -1182,7 +1182,7 @@ curl -X POST "http://localhost:6002/indexer/documents" \
1182 1182 #### Request example
1183 1183  
1184 1184 ```bash
1185   -curl -X GET "http://localhost:6002/indexer/health"
  1185 +curl -X GET "http://localhost:6004/indexer/health"
1186 1186 ```
1187 1187  
1188 1188 ---
... ...
docs/索引数据接口文档.md
... ... @@ -250,12 +250,12 @@ GET /indexer/spu/{spu_id}?tenant_id={tenant_id}
250 250  
251 251 ```bash
252 252 # cURL
253   -curl -X GET "http://localhost:6002/indexer/spu/123?tenant_id=1"
  253 +curl -X GET "http://localhost:6004/indexer/spu/123?tenant_id=1"
254 254  
255 255 # Java (OkHttp)
256 256 OkHttpClient client = new OkHttpClient();
257 257 Request request = new Request.Builder()
258   - .url("http://localhost:6002/indexer/spu/123?tenant_id=1")
  258 + .url("http://localhost:6004/indexer/spu/123?tenant_id=1")
259 259 .get()
260 260 .build();
261 261 Response response = client.newCall(request).execute();
... ... @@ -412,7 +412,7 @@ public void onProductChange(ProductChangeEvent event) {
412 412 String spuId = event.getSpuId();
413 413  
414 414 // Call the incremental endpoint to fetch the ES document
415   - String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId);
  415 + String url = String.format("http://localhost:6004/indexer/spu/%s?tenant_id=%s", spuId, tenantId);
416 416 Map<String, Object> esDoc = httpClient.get(url);
417 417  
418 418 // Push to ES
... ... @@ -428,7 +428,7 @@ public void onProductChange(ProductChangeEvent event) {
428 428 // Pseudocode example
429 429 List<String> changedSpuIds = getChangedSpuIds();
430 430 for (String spuId : changedSpuIds) {
431   - String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId);
  431 + String url = String.format("http://localhost:6004/indexer/spu/%s?tenant_id=%s", spuId, tenantId);
432 432 Map<String, Object> esDoc = httpClient.get(url);
433 433 elasticsearchClient.index("search_products", esDoc);
434 434 }
... ...
main.py
... ... @@ -20,13 +20,20 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
20 20  
21 21 from config import ConfigLoader
22 22 from utils import ESClient
23   -from indexer import DataTransformer, IndexingPipeline
24   -from embeddings import BgeEncoder, CLIPImageEncoder
25 23 from search import Searcher
26 24  
27 25  
28 26 def cmd_ingest(args):
29 27 """Run data ingestion."""
  28 + # Local imports to avoid hard dependency at module import time
  29 + import pandas as pd
  30 + from embeddings import BgeEncoder, CLIPImageEncoder
  31 + from indexer.bulk_indexer import IndexingPipeline
  32 + # NOTE: DataTransformer was referenced historically, but the concrete
  33 + # implementation is now provided via customer-specific scripts
  34 + # (e.g. data/customer1/ingest_customer1.py). If you still need a generic
  35 + # ingestion pipeline here, you can wire your own transformer.
  36 + from indexer.spu_transformer import SPUTransformer as DataTransformer
30 37 print("Starting data ingestion")
31 38  
32 39 # Load config
... ... @@ -68,7 +75,7 @@ def cmd_serve(args):
68 75 os.environ['ES_HOST'] = args.es_host
69 76  
70 77 print("Starting API service (multi-tenant)...")
71   - print(f" Host: {args.host}:{args.port}")
  78 + print(f" Host: {args.host}:{args.port} (search + indexer routes)")
72 79 print(f" Elasticsearch: {args.es_host}")
73 80  
74 81 uvicorn.run(
... ... @@ -79,6 +86,21 @@ def cmd_serve(args):
79 86 )
80 87  
81 88  
  89 +def cmd_serve_indexer(args):
  90 + """Start dedicated Indexer API service (no search endpoints)."""
  91 + os.environ['ES_HOST'] = args.es_host
  92 +
  93 + print("Starting Indexer API service...")
  94 + print(f" Host: {args.host}:{args.port} (indexer only)")
  95 + print(f" Elasticsearch: {args.es_host}")
  96 +
  97 + uvicorn.run(
  98 + "api.indexer_app:app",
  99 + host=args.host,
  100 + port=args.port,
  101 + reload=args.reload
  102 + )
  103 +
82 104 def cmd_search(args):
83 105 """Test search from command line."""
84 106 # Load config
... ... @@ -148,6 +170,16 @@ def main():
148 170 serve_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
149 171 serve_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
150 172  
  173 + # Serve-indexer command
  174 + serve_indexer_parser = subparsers.add_parser(
  175 + 'serve-indexer',
  176 + help='Start dedicated Indexer API service (indexer routes only)'
  177 + )
  178 + serve_indexer_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
  179 + serve_indexer_parser.add_argument('--port', type=int, default=6004, help='Port to bind to')
  180 + serve_indexer_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
  181 + serve_indexer_parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
  182 +
151 183 # Search command
152 184 search_parser = subparsers.add_parser('search', help='Test search from command line')
153 185 search_parser.add_argument('query', help='Search query')
... ... @@ -169,6 +201,8 @@ def main():
169 201 return cmd_ingest(args)
170 202 elif args.command == 'serve':
171 203 return cmd_serve(args)
  204 + elif args.command == 'serve-indexer':
  205 + return cmd_serve_indexer(args)
172 206 elif args.command == 'search':
173 207 return cmd_search(args)
174 208 else:
... ...
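
For completeness, the programmatic equivalent of the new subcommand — the CLI above is the supported entry point; host/port shown are its defaults:

```python
import os
import uvicorn

os.environ["ES_HOST"] = "http://localhost:9200"  # read by indexer_app's startup hook
uvicorn.run("api.indexer_app:app", host="0.0.0.0", port=6004)
```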
scripts/amazon_xlsx_to_shoplazza_xlsx.py
... ... @@ -421,8 +421,8 @@ def main():
421 421 parser.add_argument("--max-rows-per-output", type=int, default=40000, help="Max total Excel rows per output file (including模板头部行,默认40000)")
422 422 parser.add_argument("--max-products", type=int, default=None, help="Limit number of SPU groups to output (for testing)")
423 423 # Default behavior: drop data that does not meet requirements
424   - parser.add_argument("--keep-spu-if-parent-missing", action="store_false", dest="skip_spu_if_parent_missing", default=True, help="Keep SPU even if parent ASIN not found in variants (default: skip entire SPU)")
425   - parser.add_argument("--fix-sku-if-title-mismatch", action="store_false", dest="skip_sku_if_title_mismatch", default=True, help="Fix SKU title to match parent instead of skipping (default: skip SKU with mismatched title)")
  424 + parser.add_argument("--keep-spu-if-parent-missing", action="store_false", dest="skip_spu_if_parent_missing", default=False, help="Keep SPU even if parent ASIN not found in variants (default: skip entire SPU)")
  425 + parser.add_argument("--fix-sku-if-title-mismatch", action="store_false", dest="skip_sku_if_title_mismatch", default=False, help="Fix SKU title to match parent instead of skipping (default: skip SKU with mismatched title)")
426 426 args = parser.parse_args()
427 427  
428 428 if not os.path.isdir(args.input_dir):
... ...
scripts/start.sh
... ... @@ -17,18 +17,18 @@ echo -e "${GREEN}========================================${NC}"
17 17 # Create logs directory if it doesn't exist
18 18 mkdir -p logs
19 19  
20   -# Step 1: Start backend in background
21   -echo -e "\n${YELLOW}Step 1/2: Starting backend service${NC}"
22   -echo -e "${YELLOW}Backend service will run in the background...${NC}"
  20 +# Step 1: Start backend in background (search API)
  21 +echo -e "\n${YELLOW}Step 1/3: Starting backend search service${NC}"
  22 +echo -e "${YELLOW}Backend search service will run in the background...${NC}"
23 23  
24 24 nohup ./scripts/start_backend.sh > logs/backend.log 2>&1 &
25 25 BACKEND_PID=$!
26 26 echo $BACKEND_PID > logs/backend.pid
27   -echo -e "${GREEN}Backend service started (PID: $BACKEND_PID)${NC}"
  27 +echo -e "${GREEN}Backend search service started (PID: $BACKEND_PID)${NC}"
28 28 echo -e "${GREEN}Log file: logs/backend.log${NC}"
29 29  
30 30 # Wait for backend to start
31   -echo -e "${YELLOW}Waiting for backend service to start...${NC}"
  31 +echo -e "${YELLOW}Waiting for backend search service to start...${NC}"
32 32 MAX_RETRIES=30
33 33 RETRY_COUNT=0
34 34 BACKEND_READY=false
... ... @@ -45,15 +45,47 @@ done
45 45  
46 46 # Check if backend is running
47 47 if [ "$BACKEND_READY" = true ]; then
48   - echo -e "${GREEN}✓ Backend service is up${NC}"
  48 + echo -e "${GREEN}✓ Backend search service is up${NC}"
49 49 else
50   - echo -e "${RED}✗ Backend service failed to start; check the log: logs/backend.log${NC}"
  50 + echo -e "${RED}✗ Backend search service failed to start; check the log: logs/backend.log${NC}"
51 51 echo -e "${YELLOW}Hint: the backend service may need more time to start, or the port may already be in use${NC}"
52 52 exit 1
53 53 fi
54 54  
55   -# Step 2: Start frontend in background
56   -echo -e "\n${YELLOW}Step 2/2: Starting frontend service${NC}"
  55 +# Step 2: Start indexer in background
  56 +echo -e "\n${YELLOW}Step 2/3: Starting indexer service${NC}"
  57 +echo -e "${YELLOW}Indexer service will run in the background...${NC}"
  58 +
  59 +nohup ./scripts/start_indexer.sh > logs/indexer.log 2>&1 &
  60 +INDEXER_PID=$!
  61 +echo $INDEXER_PID > logs/indexer.pid
  62 +echo -e "${GREEN}Indexer service started (PID: $INDEXER_PID)${NC}"
  63 +echo -e "${GREEN}Log file: logs/indexer.log${NC}"
  64 +
  65 +# Wait for indexer to start
  66 +echo -e "${YELLOW}Waiting for indexer service to start...${NC}"
  67 +MAX_RETRIES=30
  68 +RETRY_COUNT=0
  69 +INDEXER_READY=false
  70 +
  71 +while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do
  72 + sleep 2
  73 + if curl -s http://localhost:6004/health > /dev/null 2>&1; then
  74 + INDEXER_READY=true
  75 + break
  76 + fi
  77 + RETRY_COUNT=$((RETRY_COUNT + 1))
  78 + echo -e "${YELLOW} Waiting... ($RETRY_COUNT/$MAX_RETRIES)${NC}"
  79 +done
  80 +
  81 +if [ "$INDEXER_READY" = true ]; then
  82 + echo -e "${GREEN}✓ Indexer service is up${NC}"
  83 +else
  84 + echo -e "${YELLOW}⚠ Indexer service may still be starting; try again shortly (log: logs/indexer.log)${NC}"
  85 +fi
  86 +
  87 +# Step 3: Start frontend in background
  88 +echo -e "\n${YELLOW}Step 3/3: Starting frontend service${NC}"
57 89 echo -e "${YELLOW}Frontend service will run in the background...${NC}"
58 90  
59 91 nohup ./scripts/start_frontend.sh > logs/frontend.log 2>&1 &
... ...
scripts/start_backend.sh
... ... @@ -29,7 +29,7 @@ echo " API Port: ${API_PORT:-6002}"
29 29 echo " ES Host: ${ES_HOST:-http://localhost:9200}"
30 30 echo " ES Username: ${ES_USERNAME:-not set}"
31 31  
32   -echo -e "\n${YELLOW}Starting service (multi-tenant)...${NC}"
  32 +echo -e "\n${YELLOW}Starting backend API service (search + admin)...${NC}"
33 33  
34 34 # Export environment variables for the Python process
35 35 export API_HOST=${API_HOST:-0.0.0.0}
... ... @@ -38,8 +38,9 @@ export ES_HOST=${ES_HOST:-http://localhost:9200}
38 38 export ES_USERNAME=${ES_USERNAME:-}
39 39 export ES_PASSWORD=${ES_PASSWORD:-}
40 40  
41   -python -m api.app \
  41 +python main.py serve \
42 42 --host $API_HOST \
43 43 --port $API_PORT \
44 44 --es-host $ES_HOST
45 45  
  46 +
... ...
scripts/start_indexer.sh 0 → 100755
... ... @@ -0,0 +1,46 @@
  1 +#!/bin/bash
  2 +
  3 +# Start dedicated Indexer API Service
  4 +
  5 +set -e
  6 +
  7 +cd "$(dirname "$0")/.."
  8 +source /home/tw/miniconda3/etc/profile.d/conda.sh
  9 +conda activate searchengine
  10 +
  11 +GREEN='\033[0;32m'
  12 +YELLOW='\033[1;33m'
  13 +NC='\033[0m'
  14 +
  15 +echo -e "${GREEN}========================================${NC}"
  16 +echo -e "${GREEN}Starting Indexer API Service${NC}"
  17 +echo -e "${GREEN}========================================${NC}"
  18 +
  19 +# Load config from .env file if it exists
  20 +if [ -f .env ]; then
  21 + set -a
  22 + source .env
  23 + set +a
  24 +fi
  25 +
  26 +echo -e "\n${YELLOW}Configuration:${NC}"
  27 +echo " INDEXER Host: ${INDEXER_HOST:-0.0.0.0}"
  28 +echo " INDEXER Port: ${INDEXER_PORT:-6004}"
  29 +echo " ES Host: ${ES_HOST:-http://localhost:9200}"
  30 +echo " ES Username: ${ES_USERNAME:-not set}"
  31 +
  32 +echo -e "\n${YELLOW}Starting indexer service...${NC}"
  33 +
  34 +# Export environment variables for the Python process
  35 +export INDEXER_HOST=${INDEXER_HOST:-0.0.0.0}
  36 +export INDEXER_PORT=${INDEXER_PORT:-6004}
  37 +export ES_HOST=${ES_HOST:-http://localhost:9200}
  38 +export ES_USERNAME=${ES_USERNAME:-}
  39 +export ES_PASSWORD=${ES_PASSWORD:-}
  40 +
  41 +python main.py serve-indexer \
  42 + --host $INDEXER_HOST \
  43 + --port $INDEXER_PORT \
  44 + --es-host $ES_HOST
  45 +
  46 +
... ...
scripts/stop.sh
... ... @@ -7,7 +7,7 @@ echo "========================================"
7 7 echo "Stopping Search Engine Services"
8 8 echo "========================================"
9 9  
10   -# Kill processes on port 6002 (backend)
  10 +# Kill processes on port 6002 (backend - search API)
11 11 BACKEND_PIDS=$(lsof -ti:6002 2>/dev/null)
12 12 if [ ! -z "$BACKEND_PIDS" ]; then
13 13 echo "Stopping backend server(s) on port 6002..."
... ... @@ -29,6 +29,28 @@ else
29 29 echo "No backend server found running on port 6002."
30 30 fi
31 31  
  32 +# Kill processes on port 6004 (indexer API)
  33 +INDEXER_PIDS=$(lsof -ti:6004 2>/dev/null)
  34 +if [ ! -z "$INDEXER_PIDS" ]; then
  35 + echo "Stopping indexer server(s) on port 6004..."
  36 + for PID in $INDEXER_PIDS; do
  37 + echo " Killing PID: $PID"
  38 + kill -TERM $PID 2>/dev/null || true
  39 + done
  40 + sleep 2
  41 + # Force kill if still running
  42 + REMAINING_PIDS=$(lsof -ti:6004 2>/dev/null)
  43 + if [ ! -z "$REMAINING_PIDS" ]; then
  44 + echo " Force killing remaining processes..."
  45 + for PID in $REMAINING_PIDS; do
  46 + kill -KILL $PID 2>/dev/null || true
  47 + done
  48 + fi
  49 + echo "Indexer server stopped."
  50 +else
  51 + echo "No indexer server found running on port 6004."
  52 +fi
  53 +
32 54 # Kill processes on port 6003 (frontend)
33 55 FRONTEND_PIDS=$(lsof -ti:6003 2>/dev/null)
34 56 if [ ! -z "$FRONTEND_PIDS" ]; then
... ... @@ -63,6 +85,15 @@ if [ -f "logs/backend.pid" ]; then
63 85 rm -f logs/backend.pid
64 86 fi
65 87  
  88 +[ -f "logs/indexer.pid" ] && INDEXER_PID=$(cat logs/indexer.pid 2>/dev/null)
  89 +if [ ! -z "$INDEXER_PID" ] && kill -0 $INDEXER_PID 2>/dev/null; then
  90 + echo "Stopping indexer server via PID file (PID: $INDEXER_PID)..."
  91 + kill -TERM $INDEXER_PID 2>/dev/null || true
  92 + sleep 2
  93 + kill -KILL $INDEXER_PID 2>/dev/null || true
  94 +fi
  95 +rm -f logs/indexer.pid
  96 +
66 97 if [ -f "logs/frontend.pid" ]; then
67 98 FRONTEND_PID=$(cat logs/frontend.pid 2>/dev/null)
68 99 if [ ! -z "$FRONTEND_PID" ] && kill -0 $FRONTEND_PID 2>/dev/null; then
... ...
scripts/tenant3__csv_to_shoplazza_xlsx.sh 100644 → 100755