Commit 3c1f80319aeaae8a0192a3d51f90575556cb718f

Authored by tangwang
1 parent 0064e946

api/routes/indexer.py

- New bulk indexing endpoint: POST /indexer/bulk for full reindexing of a tenant
- Improved SPU endpoint: POST /indexer/spus supports batch retrieval of SPU documents (up to 100 per request)

New bulk indexing service:
indexer/bulk_indexing_service.py

docs/搜索API对接指南.md
  - New indexer endpoint documentation: detailed descriptions of the bulk indexing and SPU document endpoints
  - Request examples: complete curl command examples
COMMIT_MESSAGE.md deleted
... ... @@ -1,154 +0,0 @@
1   -# Summary of This Change
2   -
3   -## Final State
4   -
5   -### 1. Incremental Data Retrieval Service
6   -
7   -**New files**:
8   -- `indexer/incremental_service.py`: incremental indexing service; retrieves data for a single SPU
9   -- `api/routes/indexer.py`: incremental indexing API routes
10   -- `indexer/test_indexing.py`: indexing test script
11   -
12   -**Features**:
13   -- `GET /indexer/spu/{spu_id}?tenant_id={tenant_id}` returns the ES document data for a single SPU
14   -- Category mappings are preloaded at service startup (globally shared) to improve performance
15   -- Tenant configuration and search configuration are loaded on demand
16   -
17   -### 2. Shared Document Transformer
18   -
19   -**New file**:
20   -- `indexer/document_transformer.py`: SPU document transformer; extracts the transformation logic shared by full and incremental indexing
21   -
22   -**Features**:
23   -- Unified the document transformation logic of full indexing (SPUTransformer) and incremental indexing (IncrementalIndexerService)
24   -- Eliminated roughly 300 lines of duplicated code
25   -- Supports language handling and translation based on tenant configuration
26   -
27   -### 3. Tenant Configuration System
28   -
29   -**Configuration location**:
30   -- Tenant configuration merged into the `tenant_config` section of the unified config file `config/config.yaml`
31   -- Removed the standalone `config/tenant_config.json` file
32   -
33   -**Configuration structure**:
34   -```yaml
35   -tenant_config:
36   - default:
37   - primary_language: "zh"
38   - translate_to_en: true
39   - translate_to_zh: false
40   - tenants:
41   - "162":
42   - primary_language: "zh"
43   - translate_to_en: false # translation disabled
44   - translate_to_zh: false
45   -```
46   -
47   -**Features**:
48   -- Primary language and translation options are configurable per tenant
49   -- Tenant 162 is configured with translation disabled (for testing)
50   -- Tenants without explicit configuration fall back to the defaults
51   -
52   -### 4. Translation Integration
53   -
54   -**Translator module enhancements**:
55   -- `query/translator.py`: supports a prompt parameter, passed as the `context` parameter of the DeepL API
56   -- Fixed duplicated executor initialization code
57   -- Replaced print statements with logger throughout
58   -
59   -**Translation prompt configuration**:
60   -- Configured in the `translation_prompts` section of `config/config.yaml`
61   -- Supports Chinese and English prompts:
62   - - `product_title_zh/en`: prompts for product title translation
63   - - `query_zh/en`: prompts for query translation
64   - - `default_zh/en`: default translation prompts
65   -
66   -**Translation modes**:
67   -- **Indexing**: synchronous translation; waits for results and uses a cache to avoid re-translating
68   -- **Querying**: asynchronous translation; returns cached results immediately and translates missing entries in the background
69   -
70   -**DeepL context parameter**:
71   -- Prompts are passed as the DeepL API's `context` parameter (not translated themselves; they only provide context)
72   -- Characters in the context do not count toward DeepL billing
73   -
74   -### 5. Code Refactoring
75   -
76   -**Redundancy removed**:
77   -- Extracted shared transformation logic into `SPUDocumentTransformer`
78   -- Both `SPUTransformer` and `IncrementalIndexerService` now use the shared transformer
79   -- Removed the duplicated `_transform_spu_to_doc` and `_transform_sku_row` methods
80   -
81   -**Architecture improvements**:
82   -- Full and incremental indexing share the same transformation logic
83   -- Category mappings are preloaded at service startup (globally shared)
84   -- Tenant configuration is loaded on demand (supports hot reload)
85   -
86   -### 6. Tests
87   -
88   -**Test file locations** (following the modular layout):
89   -- `indexer/test_indexing.py`: indexing tests (full, incremental, tenant config, document transformer)
90   -- `query/test_translation.py`: translation tests (sync, async, cache, context parameter)
91   -
92   -### 7. Documentation Updates
93   -
94   -- `docs/索引数据接口文档.md`: updated tenant configuration notes; moved from a standalone JSON file to the unified config file
95   -- `docs/翻译功能测试说明.md`: new document describing the translation tests
96   -
97   -## Modified Files
98   -
99   -### New files
100   -- `indexer/incremental_service.py`
101   -- `indexer/document_transformer.py`
102   -- `indexer/test_indexing.py`
103   -- `api/routes/indexer.py`
104   -- `query/test_translation.py`
105   -- `config/tenant_config_loader.py` (refactored from JSON to YAML)
106   -- `docs/翻译功能测试说明.md`
107   -
108   -### Changed files
109   -- `config/config.yaml`: added tenant configuration and translation prompt configuration
110   -- `config/config_loader.py`: supports loading tenant configuration
111   -- `config/tenant_config_loader.py`: loads tenant configuration from the unified config file
112   -- `indexer/spu_transformer.py`: uses the shared transformer; integrates the translation service
113   -- `indexer/incremental_service.py`: uses the shared transformer; integrates the translation service
114   -- `query/translator.py`: supports prompts as the context parameter; removed redundant code
115   -- `query/query_parser.py`: uses translation prompts
116   -- `api/app.py`: registers the incremental indexing routes and initializes the incremental service
117   -- `docs/索引数据接口文档.md`: updated tenant configuration notes
118   -
119   -### Deleted files
120   -- `config/tenant_config.json`: merged into the unified config file
121   -
122   -## Test Verification
123   -
124   -### Tenant 162 (translation disabled)
125   -- Full indexing: verified translation is off and title_en is None
126   -- Incremental indexing: verified translation is off and title_en is None
127   -- Document transformer: verified translation is handled according to tenant configuration
128   -
129   -### Other tenants (translation enabled)
130   -- Verified translation works correctly
131   -- Verified prompts are used correctly
132   -
133   -## Architecture
134   -
135   -### Data Flow
136   -```
137   -MySQL data
138   - ↓
139   -SPUTransformer / IncrementalIndexerService (data loading layer)
140   - ↓
141   -SPUDocumentTransformer (shared transformation layer)
142   - ↓
143   -ES documents (output)
144   -```
145   -
146   -### Configuration Layers
147   -1. **Index configuration** (`config/config.yaml`): search behavior configuration
148   -2. **Tenant configuration** (`tenant_config` section of `config/config.yaml`): data transformation configuration
149   -
150   -### Performance Optimizations
151   -1. Shared data preloading: category mappings are loaded once at service startup
152   -2. On-demand configuration: tenant and search configuration are loaded on demand and support hot reload
153   -3. Translation cache: indexing uses a cache to avoid re-translating
154   -
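The tenant fallback described above (unconfigured tenants use the defaults) amounts to a dict lookup with a default overlay. A minimal sketch of that resolution logic, with the values taken from the YAML above; this is an illustration, not the actual `tenant_config_loader` implementation:

```python
# Mirror of the tenant_config section from config/config.yaml (illustrative).
TENANT_CONFIG = {
    "default": {"primary_language": "zh", "translate_to_en": True, "translate_to_zh": False},
    "tenants": {
        "162": {"primary_language": "zh", "translate_to_en": False, "translate_to_zh": False},
    },
}


def get_tenant_config(tenant_id: str) -> dict:
    """Resolve a tenant's config, falling back to the defaults when unconfigured."""
    resolved = dict(TENANT_CONFIG["default"])  # start from the defaults
    resolved.update(TENANT_CONFIG["tenants"].get(tenant_id, {}))  # per-tenant overrides
    return resolved


cfg_162 = get_tenant_config("162")    # translation disabled for this tenant
cfg_other = get_tenant_config("999")  # unconfigured: inherits the defaults
```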
api/app.py
... ... @@ -24,12 +24,15 @@ from slowapi.util import get_remote_address
24 24 from slowapi.errors import RateLimitExceeded
25 25  
26 26 # Configure logging with better formatting
  27 +import pathlib
  28 +log_dir = pathlib.Path('logs')
  29 +log_dir.mkdir(exist_ok=True)
27 30 logging.basicConfig(
28 31 level=logging.INFO,
29 32 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
30 33 handlers=[
31 34 logging.StreamHandler(),
32   - logging.FileHandler('/tmp/search_engine_api.log', mode='a')
  35 + logging.FileHandler(log_dir / 'api.log', mode='a', encoding='utf-8')
33 36 ]
34 37 )
35 38 logger = logging.getLogger(__name__)
... ... @@ -54,6 +57,7 @@ _searcher: Optional[Searcher] = None
54 57 _query_parser: Optional[QueryParser] = None
55 58 _config = None
56 59 _incremental_service: Optional[IncrementalIndexerService] = None
  60 +_bulk_indexing_service = None
57 61  
58 62  
59 63 def init_service(es_host: str = "http://localhost:9200"):
... ... @@ -63,7 +67,7 @@ def init_service(es_host: str = "http://localhost:9200"):
63 67 Args:
64 68 es_host: Elasticsearch host URL
65 69 """
66   - global _es_client, _searcher, _query_parser, _config, _incremental_service
  70 + global _es_client, _searcher, _query_parser, _config, _incremental_service, _bulk_indexing_service
67 71  
68 72 start_time = time.time()
69 73 logger.info("Initializing search service (multi-tenant)")
... ... @@ -96,16 +100,20 @@ def init_service(es_host: str = "http://localhost:9200"):
96 100 logger.info("Initializing searcher...")
97 101 _searcher = Searcher(_es_client, _config, _query_parser)
98 102  
99   - # Initialize incremental indexer service (if DB config is available)
  103 + # Initialize indexing services (if DB config is available)
100 104 try:
101   - db_host = DB_CONFIG.get('host')
102   - db_port = DB_CONFIG.get('port', 3306)
103   - db_database = DB_CONFIG.get('database')
104   - db_username = DB_CONFIG.get('username')
105   - db_password = DB_CONFIG.get('password')
  105 + from utils.db_connector import create_db_connection
  106 + from indexer.incremental_service import IncrementalIndexerService
  107 + from indexer.bulk_indexing_service import BulkIndexingService
  108 +
  109 + db_host = os.getenv('DB_HOST')
  110 + db_port = int(os.getenv('DB_PORT', 3306))
  111 + db_database = os.getenv('DB_DATABASE')
  112 + db_username = os.getenv('DB_USERNAME')
  113 + db_password = os.getenv('DB_PASSWORD')
106 114  
107 115 if all([db_host, db_database, db_username, db_password]):
108   - logger.info("Initializing incremental indexer service...")
  116 + logger.info("Initializing database connection for indexing services...")
109 117 db_engine = create_db_connection(
110 118 host=db_host,
111 119 port=db_port,
... ... @@ -113,15 +121,22 @@ def init_service(es_host: str = "http://localhost:9200"):
113 121 username=db_username,
114 122 password=db_password
115 123 )
  124 +
  125 + # Initialize incremental service
116 126 _incremental_service = IncrementalIndexerService(db_engine)
117 127 logger.info("Incremental indexer service initialized")
  128 +
  129 + # Initialize bulk indexing service
  130 + _bulk_indexing_service = BulkIndexingService(db_engine, _es_client)
  131 + logger.info("Bulk indexing service initialized")
118 132 else:
119   - logger.warning("Database configuration incomplete, incremental indexer service will not be available")
  133 + logger.warning("Database config incomplete, indexing services will not be available")
120 134 logger.warning("Required: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
121 135 except Exception as e:
122   - logger.warning(f"Failed to initialize incremental indexer service: {e}")
123   - logger.warning("Incremental indexer endpoints will not be available")
  136 + logger.warning(f"Failed to initialize indexing services: {e}")
  137 + logger.warning("Indexing endpoints will not be available")
124 138 _incremental_service = None
  139 + _bulk_indexing_service = None
125 140  
126 141 elapsed = time.time() - start_time
127 142 logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}")
... ... @@ -162,6 +177,11 @@ def get_incremental_service() -> Optional[IncrementalIndexerService]:
162 177 return _incremental_service
163 178  
164 179  
  180 +def get_bulk_indexing_service():
  181 + """Get bulk indexing service instance."""
  182 + return _bulk_indexing_service
  183 +
  184 +
165 185 # Create FastAPI app with enhanced configuration
166 186 app = FastAPI(
167 187 title="E-Commerce Search API",
... ... @@ -207,15 +227,14 @@ app.add_middleware(
207 227 async def startup_event():
208 228 """Initialize service on startup."""
209 229 es_host = os.getenv("ES_HOST", "http://localhost:9200")
210   -
211 230 logger.info("Starting E-Commerce Search API (Multi-Tenant)")
212 231 logger.info(f"Elasticsearch Host: {es_host}")
213   -
  232 +
214 233 try:
215 234 init_service(es_host=es_host)
216 235 logger.info("Service initialized successfully")
217 236 except Exception as e:
218   - logger.error(f"Failed to initialize service: {e}")
  237 + logger.error(f"Failed to initialize service: {e}", exc_info=True)
219 238 logger.warning("Service will start but may not function correctly")
220 239  
221 240  
... ...
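The logging change above (creating `logs/` before attaching a `FileHandler`, instead of writing to `/tmp`) can be exercised in isolation. A minimal sketch, using a temporary base directory so it leaves no files behind; the `logs/api.log` path is taken from the diff:

```python
import logging
import pathlib
import tempfile

# Throwaway base directory so this sketch does not touch a real logs/ folder.
base = pathlib.Path(tempfile.mkdtemp())
log_dir = base / "logs"
log_dir.mkdir(exist_ok=True)  # idempotent: safe to run on every startup

# Attach the handler explicitly (basicConfig is skipped so the sketch composes
# with whatever logging setup the host process already has).
logger = logging.getLogger("sketch")
handler = logging.FileHandler(log_dir / "api.log", mode="a", encoding="utf-8")
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("service starting")
handler.flush()  # ensure the record is on disk before anything reads the file
```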
api/routes/indexer.py
1 1 """
2   -增量索引API路由。
  2 +索引API路由。
3 3  
4   -提供单个SPU数据获取接口,用于增量更新ES索引
  4 +提供全量和增量索引接口,供外部Java程序调用
5 5 """
6 6  
7   -from fastapi import APIRouter, HTTPException, Query, Request
8   -from typing import Optional
  7 +from fastapi import APIRouter, HTTPException
  8 +from typing import List
  9 +from pydantic import BaseModel
9 10 import logging
10 11  
11   -from ..models import ErrorResponse
12   -
13 12 logger = logging.getLogger(__name__)
14 13  
15 14 router = APIRouter(prefix="/indexer", tags=["indexer"])
16 15  
17 16  
18   -@router.get("/spu/{spu_id}")
19   -async def get_spu_document(
20   - spu_id: str,
21   - tenant_id: str = Query(..., description="租户ID"),
22   - request: Request = None
23   -):
24   - """
25   - 获取单个SPU的ES文档数据(用于增量索引更新)。
  17 +class BulkIndexRequest(BaseModel):
  18 + tenant_id: str
  19 + recreate_index: bool = False
  20 + batch_size: int = 500
  21 +
26 22  
27   - 功能说明:
28   - - 根据 tenant_id 和 spu_id 查询MySQL数据库
29   - - 返回该SPU的完整ES文档数据(JSON格式)
30   - - 外部Java程序可以调用此接口获取数据后推送到ES
  23 +class BatchSpuRequest(BaseModel):
  24 + tenant_id: str
  25 + spu_ids: List[str]
31 26  
32   - 参数:
33   - - spu_id: SPU ID(路径参数)
34   - - tenant_id: 租户ID(查询参数,必需)
35 27  
36   - 返回:
37   - - 成功:返回ES文档JSON对象
38   - - SPU不存在或已删除:返回404
39   - - 其他错误:返回500
  28 +@router.post("/bulk")
  29 +async def bulk_index(request: BulkIndexRequest):
  30 + """全量索引接口"""
  31 + try:
  32 + from ..app import get_bulk_indexing_service
  33 + service = get_bulk_indexing_service()
  34 + if service is None:
  35 + raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
  36 + return service.bulk_index(
  37 + tenant_id=request.tenant_id,
  38 + recreate_index=request.recreate_index,
  39 + batch_size=request.batch_size
  40 + )
  41 + except HTTPException:
  42 + raise
  43 + except Exception as e:
  44 + logger.error(f"Error in bulk indexing for tenant_id={request.tenant_id}: {e}", exc_info=True)
  45 + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
40 46  
41   - 示例请求:
42   - ```
43   - GET /indexer/spu/123?tenant_id=1
44   - ```
45 47  
46   - 示例响应:
47   - ```json
48   - {
49   - "tenant_id": "1",
50   - "spu_id": "123",
51   - "title_zh": "商品标题",
52   - "brief_zh": "商品简介",
53   - "description_zh": "商品描述",
54   - "vendor_zh": "供应商",
55   - "tags": ["标签1", "标签2"],
56   - "category_path_zh": "类目1/类目2/类目3",
57   - "category1_name": "类目1",
58   - "category2_name": "类目2",
59   - "category3_name": "类目3",
60   - "category_id": "100",
61   - "category_level": 3,
62   - "min_price": 99.99,
63   - "max_price": 199.99,
64   - "compare_at_price": 299.99,
65   - "sales": 1000,
66   - "total_inventory": 500,
67   - "skus": [...],
68   - "specifications": [...],
69   - ...
70   - }
71   - ```
72   - """
  48 +@router.post("/spus")
  49 +async def get_spu_documents(request: BatchSpuRequest):
  50 + """获取SPU文档接口(支持单个或批量)"""
73 51 try:
74 52 from ..app import get_incremental_service
75   -
76   - # 获取增量服务实例
  53 + if not request.spu_ids:
  54 + raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
  55 + if len(request.spu_ids) > 100:
  56 + raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
77 57 service = get_incremental_service()
78 58 if service is None:
79   - raise HTTPException(
80   - status_code=503,
81   - detail="Incremental indexer service is not initialized. Please check database connection."
82   - )
83   -
84   - # 获取SPU文档
85   - doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
86   -
87   - if doc is None:
88   - raise HTTPException(
89   - status_code=404,
90   - detail=f"SPU {spu_id} not found for tenant_id={tenant_id} or has been deleted"
91   - )
92   -
93   - return doc
94   -
  59 + raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
  60 + success_list, failed_list = [], []
  61 +        for spu_id in request.spu_ids:
  62 +            try:
  63 +                doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
  64 +                if doc:
  65 +                    success_list.append(
  66 +                        {"spu_id": spu_id, "document": doc})
  67 +                else:
  68 +                    failed_list.append(
  69 +                        {"spu_id": spu_id, "error": "SPU not found or deleted"})
  70 +            except Exception as e:
  71 +                failed_list.append(
  72 +                    {"spu_id": spu_id, "error": str(e)})
  73 + return {
  74 + "success": success_list,
  75 + "failed": failed_list,
  76 + "total": len(request.spu_ids),
  77 + "success_count": len(success_list),
  78 + "failed_count": len(failed_list)
  79 + }
95 80 except HTTPException:
96 81 raise
97 82 except Exception as e:
98   - logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
  83 + logger.error(f"Error getting SPU documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
99 84 raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
100 85  
101 86  
102 87 @router.get("/health")
103 88 async def indexer_health_check():
104   - """
105   - 检查增量索引服务健康状态。
106   -
107   - 返回:
108   - - 服务是否可用
109   - - 数据库连接状态
110   - - 预加载数据状态
111   - """
  89 + """检查索引服务健康状态"""
112 90 try:
113 91 from ..app import get_incremental_service
114   -
  92 + from sqlalchemy import text
115 93 service = get_incremental_service()
116 94 if service is None:
117   - return {
118   - "status": "unavailable",
119   - "message": "Incremental indexer service is not initialized",
120   - "database": "unknown",
121   - "preloaded_data": {
122   - "category_mappings": 0
123   - }
124   - }
125   -
126   - # 检查数据库连接
  95 + return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}
127 96 try:
128   - from sqlalchemy import text
129 97 with service.db_engine.connect() as conn:
130 98 conn.execute(text("SELECT 1"))
131 99 db_status = "connected"
132 100 except Exception as e:
133 101 db_status = f"disconnected: {str(e)}"
134   -
135 102 return {
136 103 "status": "available",
137 104 "database": db_status,
138   - "preloaded_data": {
139   - "category_mappings": len(service.category_id_to_name)
140   - }
  105 + "preloaded_data": {"category_mappings": len(service.category_id_to_name)}
141 106 }
142   -
143 107 except Exception as e:
144 108 logger.error(f"Error checking indexer health: {e}", exc_info=True)
145   - return {
146   - "status": "error",
147   - "message": str(e)
148   - }
  109 + return {"status": "error", "message": str(e)}
149 110  
... ...
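The success/failed partitioning that the new `POST /indexer/spus` handler performs per SPU can be factored into a small pure function, which also makes it testable without FastAPI. In this sketch, `fetch_doc` is a hypothetical stand-in for `service.get_spu_document`; the response keys mirror those in the diff:

```python
from typing import Any, Callable, Dict, List, Optional


def partition_spu_results(
    spu_ids: List[str],
    fetch_doc: Callable[[str], Optional[Dict[str, Any]]],
) -> Dict[str, Any]:
    """Fetch each SPU document, splitting results into success and failed lists."""
    success_list: List[Dict[str, Any]] = []
    failed_list: List[Dict[str, Any]] = []
    for spu_id in spu_ids:
        try:
            doc = fetch_doc(spu_id)
            if doc:
                success_list.append({"spu_id": spu_id, "document": doc})
            else:
                failed_list.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
        except Exception as e:
            # Per-SPU failures are recorded instead of aborting the whole batch.
            failed_list.append({"spu_id": spu_id, "error": str(e)})
    return {
        "success": success_list,
        "failed": failed_list,
        "total": len(spu_ids),
        "success_count": len(success_list),
        "failed_count": len(failed_list),
    }


# A fake fetcher: "123" exists, "789" does not, "666" raises.
docs = {"123": {"spu_id": "123", "title_zh": "商品标题"}}


def fake_fetch(spu_id: str):
    if spu_id == "666":
        raise RuntimeError("db timeout")
    return docs.get(spu_id)


result = partition_spu_results(["123", "789", "666"], fake_fetch)
```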
docs/搜索API对接指南.md
... ... @@ -76,12 +76,191 @@ curl -X POST "http://120.76.41.98:6002/search/" \
76 76  
77 77 ## 接口概览
78 78  
79   -| 接口 | HTTP Method | Endpoint |
80   -|------|------|------|
81   -| 搜索 | POST | `/search/` |
82   -| 搜索建议(框架,暂未实现) | GET | `/search/suggestions` |
83   -| 获取文档 | GET | `/search/{doc_id}` |
84   -| 健康检查 | GET | `/admin/health` |
  79 +| 接口 | HTTP Method | Endpoint | 说明 |
  80 +|------|------|------|------|
  81 +| 搜索 | POST | `/search/` | 执行搜索查询 |
  82 +| 全量索引 | POST | `/indexer/bulk` | 全量索引接口 |
  83 +| SPU索引 | POST | `/indexer/spus` | 获取SPU文档(支持单个或批量) |
  84 +| 索引健康检查 | GET | `/indexer/health` | 检查索引服务状态 |
  85 +| 搜索建议(框架,暂未实现) | GET | `/search/suggestions` | 搜索建议 |
  86 +| 获取文档 | GET | `/search/{doc_id}` | 获取单个文档 |
  87 +| 健康检查 | GET | `/admin/health` | 服务健康检查 |
  88 +
  89 +---
  90 +
  91 +## 索引接口
  92 +
  93 +### 全量索引接口
  94 +
  95 +- **端点**: `POST /indexer/bulk`
  96 +- **描述**: 将指定租户的所有SPU数据导入到ES索引
  97 +
  98 +#### 请求参数
  99 +
  100 +```json
  101 +{
  102 + "tenant_id": "162",
  103 + "recreate_index": false,
  104 + "batch_size": 500
  105 +}
  106 +```
  107 +
  108 +| 参数 | 类型 | 必填 | 默认值 | 说明 |
  109 +|------|------|------|--------|------|
  110 +| `tenant_id` | string | Y | - | 租户ID |
  111 +| `recreate_index` | boolean | N | false | 是否重建索引(删除旧索引后创建新索引) |
  112 +| `batch_size` | integer | N | 500 | 批量导入大小 |
  113 +
  114 +#### 响应格式
  115 +
  116 +**成功响应(200 OK)**:
  117 +```json
  118 +{
  119 + "success": true,
  120 + "total": 1000,
  121 + "indexed": 1000,
  122 + "failed": 0,
  123 + "elapsed_time": 12.34,
  124 + "index_name": "search_products",
  125 + "tenant_id": "162"
  126 +}
  127 +```
  128 +
  129 +**错误响应**:
  130 +- `400 Bad Request`: 参数错误
  131 +- `503 Service Unavailable`: 服务未初始化
  132 +
  133 +#### 请求示例
  134 +
  135 +**首次索引(重建索引)**:
  136 +```bash
  137 +curl -X POST "http://localhost:6002/indexer/bulk" \
  138 + -H "Content-Type: application/json" \
  139 + -d '{
  140 + "tenant_id": "162",
  141 + "recreate_index": true,
  142 + "batch_size": 500
  143 + }'
  144 +```
  145 +
  146 +**查看日志**:
  147 +```bash
  148 +# 查看API日志(包含索引操作日志)
  149 +tail -f logs/api.log
  150 +
  151 +# 或者查看所有日志文件
  152 +tail -f logs/*.log
  153 +```
  154 +
  155 +**增量更新(不重建索引)**:
  156 +```bash
  157 +curl -X POST "http://localhost:6002/indexer/bulk" \
  158 + -H "Content-Type: application/json" \
  159 + -d '{
  160 + "tenant_id": "162",
  161 + "recreate_index": false,
  162 + "batch_size": 500
  163 + }'
  164 +```
  165 +
  166 +---
  167 +
  168 +### SPU索引接口
  169 +
  170 +- **端点**: `POST /indexer/spus`
  171 +- **描述**: 获取SPU的ES文档数据(支持单个或批量)
  172 +
  173 +#### 请求参数
  174 +
  175 +```json
  176 +{
  177 + "tenant_id": "162",
  178 + "spu_ids": ["123", "456", "789"]
  179 +}
  180 +```
  181 +
  182 +| 参数 | 类型 | 必填 | 说明 |
  183 +|------|------|------|------|
  184 +| `tenant_id` | string | Y | 租户ID |
  185 +| `spu_ids` | array[string] | Y | SPU ID列表(1-100个) |
  186 +
  187 +#### 响应格式
  188 +
  189 +```json
  190 +{
  191 + "success": [
  192 + {
  193 + "spu_id": "123",
  194 + "document": {
  195 + "tenant_id": "162",
  196 + "spu_id": "123",
  197 + "title_zh": "商品标题",
  198 + ...
  199 + }
  200 + },
  201 + {
  202 + "spu_id": "456",
  203 + "document": {...}
  204 + }
  205 + ],
  206 + "failed": [
  207 + {
  208 + "spu_id": "789",
  209 + "error": "SPU not found or deleted"
  210 + }
  211 + ],
  212 + "total": 3,
  213 + "success_count": 2,
  214 + "failed_count": 1
  215 +}
  216 +```
  217 +
  218 +#### 请求示例
  219 +
  220 +**单个SPU**:
  221 +```bash
  222 +curl -X POST "http://localhost:6002/indexer/spus" \
  223 + -H "Content-Type: application/json" \
  224 + -d '{
  225 + "tenant_id": "162",
  226 + "spu_ids": ["123"]
  227 + }'
  228 +```
  229 +
  230 +**批量SPU**:
  231 +```bash
  232 +curl -X POST "http://localhost:6002/indexer/spus" \
  233 + -H "Content-Type: application/json" \
  234 + -d '{
  235 + "tenant_id": "162",
  236 + "spu_ids": ["123", "456", "789"]
  237 + }'
  238 +```
  239 +
  240 +---
  241 +
  242 +### 索引健康检查接口
  243 +
  244 +- **端点**: `GET /indexer/health`
  245 +- **描述**: 检查索引服务的健康状态
  246 +
  247 +#### 响应格式
  248 +
  249 +```json
  250 +{
  251 + "status": "available",
  252 + "database": "connected",
  253 + "preloaded_data": {
  254 + "category_mappings": 150
  255 + }
  256 +}
  257 +```
  258 +
  259 +#### 请求示例
  260 +
  261 +```bash
  262 +curl -X GET "http://localhost:6002/indexer/health"
  263 +```
85 264  
86 265 ---
87 266  
... ... @@ -1163,8 +1342,10 @@ curl "http://localhost:6002/search/12345"
1163 1342  
1164 1343 | 分析器 | 语言 | 描述 |
1165 1344 |--------|------|------|
1166   -| `hanlp_index` | 中文 | 中文索引分析器(用于中文字段) |
1167   -| `hanlp_standard` | 中文 | 中文查询分析器(用于中文字段) |
  1345 +| `index_ansj` | 中文 | 中文索引分析器(用于中文字段) |
  1346 +| `query_ansj` | 中文 | 中文查询分析器(用于中文字段) |
  1347 +| `hanlp_index`(暂不支持) | 中文 | 中文索引分析器(用于中文字段) |
  1348 +| `hanlp_standard`(暂不支持) | 中文 | 中文查询分析器(用于中文字段) |
1168 1349 | `english` | 英文 | 标准英文分析器(用于英文字段) |
1169 1350 | `lowercase` | - | 小写标准化器(用于keyword子字段) |
1170 1351  
... ... @@ -1180,4 +1361,3 @@ curl "http://localhost:6002/search/12345"
1180 1361 | `date` | `date` | 日期时间 |
1181 1362 | `nested` | `nested` | 嵌套对象(specifications, skus, image_embedding) |
1182 1363 | `dense_vector` | `dense_vector` | 向量字段(title_embedding,仅用于搜索) |
1183   -
... ...
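The documented constraints for `POST /indexer/spus` (non-empty `spu_ids`, at most 100 per request) can also be enforced caller-side before issuing the HTTP call. A minimal sketch, with the limit taken from the docs above; the helper name is an illustration, not part of the API:

```python
from typing import List

MAX_SPU_IDS = 100  # documented per-request limit for POST /indexer/spus


def build_spus_payload(tenant_id: str, spu_ids: List[str]) -> dict:
    """Validate inputs and build the request body for POST /indexer/spus."""
    if not tenant_id:
        raise ValueError("tenant_id is required")
    if not spu_ids:
        raise ValueError("spu_ids cannot be empty")
    if len(spu_ids) > MAX_SPU_IDS:
        raise ValueError(f"Maximum {MAX_SPU_IDS} SPU IDs allowed per request")
    return {"tenant_id": tenant_id, "spu_ids": list(spu_ids)}


payload = build_spus_payload("162", ["123", "456", "789"])
```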
indexer/bulk_indexing_service.py 0 → 100644
... ... @@ -0,0 +1,108 @@
  1 +"""
  2 +全量索引服务。
  3 +
  4 +提供全量索引功能,将指定租户的所有SPU数据导入到ES。
  5 +"""
  6 +
  7 +import logging
  8 +from typing import Dict, Any
  9 +from sqlalchemy import Engine
  10 +from utils.es_client import ESClient
  11 +from indexer.spu_transformer import SPUTransformer
  12 +from indexer.bulk_indexer import BulkIndexer
  13 +from indexer.mapping_generator import load_mapping, delete_index_if_exists, DEFAULT_INDEX_NAME
  14 +
  15 +logger = logging.getLogger(__name__)
  16 +
  17 +
  18 +class BulkIndexingService:
  19 + """全量索引服务,提供批量导入功能。"""
  20 +
  21 + def __init__(self, db_engine: Engine, es_client: ESClient):
  22 + """
  23 + 初始化全量索引服务。
  24 +
  25 + Args:
  26 + db_engine: SQLAlchemy database engine
  27 + es_client: Elasticsearch client
  28 + """
  29 + self.db_engine = db_engine
  30 + self.es_client = es_client
  31 + self.index_name = DEFAULT_INDEX_NAME
  32 +
  33 + def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]:
  34 + """执行全量索引"""
  35 + import time
  36 + start_time = time.time()
  37 +
  38 + try:
  39 + # 1. 加载mapping
  40 + logger.info(f"[BulkIndexing] Loading mapping for tenant_id={tenant_id}")
  41 + mapping = load_mapping()
  42 +
  43 + # 2. 处理索引(删除并重建或创建)
  44 + if recreate_index:
  45 + logger.info(f"[BulkIndexing] Recreating index: {self.index_name}")
  46 + if self.es_client.index_exists(self.index_name):
  47 + if delete_index_if_exists(self.es_client, self.index_name):
  48 + logger.info(f"[BulkIndexing] Deleted existing index: {self.index_name}")
  49 + else:
  50 + raise Exception(f"Failed to delete index: {self.index_name}")
  51 +
  52 + if not self.es_client.index_exists(self.index_name):
  53 + logger.info(f"[BulkIndexing] Creating index: {self.index_name}")
  54 + if not self.es_client.create_index(self.index_name, mapping):
  55 + raise Exception(f"Failed to create index: {self.index_name}")
  56 + logger.info(f"[BulkIndexing] Created index: {self.index_name}")
  57 + else:
  58 + logger.info(f"[BulkIndexing] Index already exists: {self.index_name}")
  59 +
  60 + # 3. 转换数据
  61 + logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}")
  62 + transformer = SPUTransformer(self.db_engine, tenant_id)
  63 + documents = transformer.transform_batch()
  64 +
  65 + if not documents:
  66 + logger.warning(f"[BulkIndexing] No documents to index for tenant_id={tenant_id}")
  67 + return {
  68 + "success": True,
  69 + "total": 0,
  70 + "indexed": 0,
  71 + "failed": 0,
  72 + "elapsed_time": time.time() - start_time,
  73 + "message": "No documents to index"
  74 + }
  75 +
  76 + logger.info(f"[BulkIndexing] Transformed {len(documents)} documents")
  77 +
  78 + # 4. 批量导入
  79 + logger.info(f"[BulkIndexing] Indexing {len(documents)} documents (batch_size={batch_size})")
  80 + indexer = BulkIndexer(self.es_client, self.index_name, batch_size=batch_size)
  81 + results = indexer.index_documents(
  82 + documents,
  83 + id_field="spu_id",
  84 + show_progress=False # API调用时不打印进度
  85 + )
  86 +
  87 + elapsed_time = time.time() - start_time
  88 +
  89 + logger.info(
  90 + f"[BulkIndexing] Completed for tenant_id={tenant_id}: "
  91 + f"indexed={results['success']}, failed={results['failed']}, "
  92 + f"elapsed={elapsed_time:.2f}s"
  93 + )
  94 +
  95 + return {
  96 + "success": results['failed'] == 0,
  97 + "total": len(documents),
  98 + "indexed": results['success'],
  99 + "failed": results['failed'],
  100 + "elapsed_time": elapsed_time,
  101 + "index_name": self.index_name,
  102 + "tenant_id": tenant_id
  103 + }
  104 +
  105 + except Exception as e:
  106 + logger.error(f"[BulkIndexing] Failed for tenant_id={tenant_id}: {e}", exc_info=True)
  107 + raise
  108 +
... ...
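`batch_size` controls how many documents are sent to ES per bulk request. The chunking this implies can be sketched independently of Elasticsearch; the helper below is an illustration of that behavior, not the actual `BulkIndexer` implementation:

```python
from typing import Any, Dict, Iterator, List


def iter_batches(
    documents: List[Dict[str, Any]], batch_size: int = 500
) -> Iterator[List[Dict[str, Any]]]:
    """Yield successive fixed-size chunks of the document list."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(documents), batch_size):
        yield documents[start:start + batch_size]


# 1203 documents with batch_size=500 -> three bulk requests: 500, 500, 203.
docs = [{"spu_id": str(i)} for i in range(1203)]
batches = list(iter_batches(docs, batch_size=500))
sizes = [len(b) for b in batches]
```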
indexer/incremental_service.py
1   -"""
2   -增量数据获取服务。
3   -
4   -提供单个SPU的数据获取接口,用于增量更新ES索引。
5   -公共数据(分类映射、配置等)在服务启动时预加载,以提高性能。
6   -"""
  1 +"""增量数据获取服务"""
7 2  
8 3 import pandas as pd
9   -import numpy as np
10 4 import logging
11 5 from typing import Dict, Any, Optional
12 6 from sqlalchemy import text
13   -from config import ConfigLoader
14   -from config.tenant_config_loader import get_tenant_config_loader
15   -from indexer.document_transformer import SPUDocumentTransformer
  7 +from indexer.indexing_utils import load_category_mapping, create_document_transformer
16 8  
17 9 # Configure logger
18 10 logger = logging.getLogger(__name__)
19 11  
20 12  
21 13 class IncrementalIndexerService:
22   - """增量索引服务,提供单个SPU数据获取功能。"""
  14 + """增量索引服务,提供SPU数据获取功能。"""
23 15  
24 16 def __init__(self, db_engine: Any):
25   - """
26   - 初始化增量索引服务。
27   -
28   - Args:
29   - db_engine: SQLAlchemy database engine
30   - """
  17 + """初始化增量索引服务"""
31 18 self.db_engine = db_engine
32 19  
33 20 # 预加载分类映射(全局,所有租户共享)
34   - self.category_id_to_name = self._load_category_mapping()
  21 + self.category_id_to_name = load_category_mapping(db_engine)
35 22 logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings")
36   -
37   - # 租户配置加载器(延迟加载,按需获取租户配置)
38   - self.tenant_config_loader = get_tenant_config_loader()
39   -
40   - def _load_category_mapping(self) -> Dict[str, str]:
41   - """
42   - 加载分类ID到名称的映射(全局,所有租户共享)。
43   -
44   - Returns:
45   - Dictionary mapping category_id to category_name
46   - """
47   - query = text("""
48   - SELECT DISTINCT
49   - category_id,
50   - category
51   - FROM shoplazza_product_spu
52   - WHERE deleted = 0 AND category_id IS NOT NULL
53   - """)
54   -
55   - mapping = {}
56   - try:
57   - with self.db_engine.connect() as conn:
58   - result = conn.execute(query)
59   - for row in result:
60   - category_id = str(int(row.category_id))
61   - category_name = row.category
62   -
63   - if not category_name or not category_name.strip():
64   - logger.warning(f"Category ID {category_id} has empty name, skipping")
65   - continue
66   -
67   - mapping[category_id] = category_name
68   - except Exception as e:
69   - logger.error(f"Failed to load category mapping: {e}", exc_info=True)
70   -
71   - return mapping
72 23  
73 24 def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
74   - """
75   - 获取单个SPU的ES文档数据。
76   -
77   - Args:
78   - tenant_id: 租户ID
79   - spu_id: SPU ID
80   -
81   - Returns:
82   - ES文档字典,如果SPU不存在或已删除则返回None
83   - """
  25 + """获取SPU的ES文档数据"""
84 26 try:
85 27 # 加载SPU数据
86 28 spu_row = self._load_single_spu(tenant_id, spu_id)
... ... @@ -94,38 +36,10 @@ class IncrementalIndexerService:
94 36 # 加载Option数据
95 37 options_df = self._load_options_for_spu(tenant_id, spu_id)
96 38  
97   - # 获取租户配置
98   - tenant_config = self.tenant_config_loader.get_tenant_config(tenant_id)
99   -
100   - # 加载搜索配置
101   - translator = None
102   - translation_prompts = {}
103   - searchable_option_dimensions = ['option1', 'option2', 'option3']
104   - try:
105   - config_loader = ConfigLoader()
106   - config = config_loader.load_config()
107   - searchable_option_dimensions = config.spu_config.searchable_option_dimensions
108   -
109   - # Initialize translator if translation is enabled
110   - if config.query_config.enable_translation:
111   - from query.translator import Translator
112   - translator = Translator(
113   - api_key=config.query_config.translation_api_key,
114   - use_cache=True, # 索引时使用缓存避免重复翻译
115   - glossary_id=config.query_config.translation_glossary_id,
116   - translation_context=config.query_config.translation_context
117   - )
118   - translation_prompts = config.query_config.translation_prompts
119   - except Exception as e:
120   - logger.warning(f"Failed to load config, using default: {e}")
121   -
122 39 # 创建文档转换器
123   - transformer = SPUDocumentTransformer(
  40 + transformer = create_document_transformer(
124 41 category_id_to_name=self.category_id_to_name,
125   - searchable_option_dimensions=searchable_option_dimensions,
126   - tenant_config=tenant_config,
127   - translator=translator,
128   - translation_prompts=translation_prompts
  42 + tenant_id=tenant_id
129 43 )
130 44  
131 45 # 转换为ES文档
... ... @@ -147,16 +61,7 @@ class IncrementalIndexerService:
147 61 raise
148 62  
149 63 def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
150   - """
151   - 加载单个SPU数据。
152   -
153   - Args:
154   - tenant_id: 租户ID
155   - spu_id: SPU ID
156   -
157   - Returns:
158   - SPU行数据,如果不存在则返回None
159   - """
  64 + """加载单个SPU数据"""
160 65 query = text("""
161 66 SELECT
162 67 id, shop_id, shoplazza_id, title, brief, description,
... ... @@ -180,16 +85,7 @@ class IncrementalIndexerService:
180 85 return df.iloc[0]
181 86  
182 87 def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
183   - """
184   - 加载指定SPU的所有SKU数据。
185   -
186   - Args:
187   - tenant_id: 租户ID
188   - spu_id: SPU ID
189   -
190   - Returns:
191   - SKU数据DataFrame
192   - """
  88 + """加载指定SPU的所有SKU数据"""
193 89 query = text("""
194 90 SELECT
195 91 id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
... ... @@ -210,16 +106,7 @@ class IncrementalIndexerService:
210 106 return df
211 107  
212 108 def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
213   - """
214   - 加载指定SPU的所有Option数据。
215   -
216   - Args:
217   - tenant_id: 租户ID
218   - spu_id: SPU ID
219   -
220   - Returns:
221   - Option数据DataFrame
222   - """
  109 + """加载指定SPU的所有Option数据"""
223 110 query = text("""
224 111 SELECT
225 112 id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
... ...
indexer/indexing_utils.py 0 → 100644
... ... @@ -0,0 +1,112 @@
  1 +"""
  2 +索引工具函数。
  3 +
  4 +提取公共逻辑,避免代码重复。
  5 +"""
  6 +
  7 +import logging
  8 +from typing import Dict, Any, Optional
  9 +from sqlalchemy import Engine, text
  10 +from config import ConfigLoader
  11 +from config.tenant_config_loader import get_tenant_config_loader
  12 +from indexer.document_transformer import SPUDocumentTransformer
  13 +
  14 +logger = logging.getLogger(__name__)
  15 +
  16 +
  17 +def load_category_mapping(db_engine: Engine) -> Dict[str, str]:
  18 + """
  19 + 加载分类ID到名称的映射(全局,所有租户共享)。
  20 +
  21 + Args:
  22 + db_engine: SQLAlchemy database engine
  23 +
  24 + Returns:
  25 + Dictionary mapping category_id to category_name
  26 + """
  27 + query = text("""
  28 + SELECT DISTINCT
  29 + category_id,
  30 + category
  31 + FROM shoplazza_product_spu
  32 + WHERE deleted = 0 AND category_id IS NOT NULL
  33 + """)
  34 +
  35 + mapping = {}
  36 + try:
  37 + with db_engine.connect() as conn:
  38 + result = conn.execute(query)
  39 + for row in result:
  40 + category_id = str(int(row.category_id))
  41 + category_name = row.category
  42 +
  43 + if not category_name or not category_name.strip():
  44 + logger.warning(f"Category ID {category_id} has empty name, skipping")
  45 + continue
  46 +
  47 + mapping[category_id] = category_name
  48 + except Exception as e:
  49 + logger.error(f"Failed to load category mapping: {e}", exc_info=True)
  50 +
  51 + return mapping
  52 +
  53 +
  54 +def create_document_transformer(
  55 + category_id_to_name: Dict[str, str],
  56 + tenant_id: str,
  57 + searchable_option_dimensions: Optional[list] = None,
  58 + translator: Optional[Any] = None,
  59 + translation_prompts: Optional[Dict[str, str]] = None
  60 +) -> SPUDocumentTransformer:
  61 + """
  62 + Create a document transformer (unified initialization logic).
  63 +
  64 + Args:
  65 + category_id_to_name: Mapping from category ID to name
  66 + tenant_id: Tenant ID
  67 + searchable_option_dimensions: List of searchable option dimensions (loaded from config if None)
  68 + translator: Translator instance (initialized from config if None)
  69 + translation_prompts: Translation prompt configuration (loaded from config if None)
  70 +
  71 + Returns:
  72 + An SPUDocumentTransformer instance
  73 + """
  74 + # Load tenant config
  75 + tenant_config_loader = get_tenant_config_loader()
  76 + tenant_config = tenant_config_loader.get_tenant_config(tenant_id)
  77 +
  78 + # Load search config (if needed)
  79 + if searchable_option_dimensions is None or translator is None or translation_prompts is None:
  80 + try:
  81 + config_loader = ConfigLoader()
  82 + config = config_loader.load_config()
  83 +
  84 + if searchable_option_dimensions is None:
  85 + searchable_option_dimensions = config.spu_config.searchable_option_dimensions
  86 +
  87 + if translator is None and config.query_config.enable_translation:
  88 + from query.translator import Translator
  89 + translator = Translator(
  90 + api_key=config.query_config.translation_api_key,
  91 + use_cache=True,
  92 + glossary_id=config.query_config.translation_glossary_id,
  93 + translation_context=config.query_config.translation_context
  94 + )
  95 +
  96 + if translation_prompts is None:
  97 + translation_prompts = config.query_config.translation_prompts
  98 + except Exception as e:
  99 + logger.warning(f"Failed to load config, using defaults: {e}")
  100 + if searchable_option_dimensions is None:
  101 + searchable_option_dimensions = ['option1', 'option2', 'option3']
  102 + if translation_prompts is None:
  103 + translation_prompts = {}
  104 +
  105 + return SPUDocumentTransformer(
  106 + category_id_to_name=category_id_to_name,
  107 + searchable_option_dimensions=searchable_option_dimensions,
  108 + tenant_config=tenant_config,
  109 + translator=translator,
  110 + translation_prompts=translation_prompts
  111 + )
  112 +
... ...
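
`create_document_transformer` resolves each optional parameter the same way: an explicitly passed value wins, otherwise the value is read from config, and if config loading fails a hard default is used. A minimal sketch of that resolution order (names here are illustrative, not the actual config API):

```python
from typing import Optional

# Hard defaults mirroring the except-branch in create_document_transformer.
DEFAULT_DIMENSIONS = ["option1", "option2", "option3"]

def resolve_dimensions(explicit: Optional[list], config: Optional[dict]) -> list:
    """Explicit value wins; otherwise read config; otherwise hard default."""
    if explicit is not None:
        return explicit
    try:
        return config["spu_config"]["searchable_option_dimensions"]
    except (TypeError, KeyError):  # config missing or malformed
        return DEFAULT_DIMENSIONS

print(resolve_dimensions(["color", "size"], None))  # ['color', 'size']
print(resolve_dimensions(None, None))               # ['option1', 'option2', 'option3']
```

The same fallback applies to `translator` and `translation_prompts`, which is why the factory only touches `ConfigLoader` when at least one of the three is `None`.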
indexer/spu_transformer.py
... ... @@ -5,14 +5,10 @@ Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested s
5 5 """
6 6  
7 7 import pandas as pd
8   -import numpy as np
9 8 import logging
10 9 from typing import Dict, Any, List, Optional
11   -from sqlalchemy import create_engine, text
12   -from utils.db_connector import create_db_connection
13   -from config import ConfigLoader
14   -from config.tenant_config_loader import get_tenant_config_loader
15   -from indexer.document_transformer import SPUDocumentTransformer
  10 +from sqlalchemy import text
  11 +from indexer.indexing_utils import load_category_mapping, create_document_transformer
16 12  
17 13 # Configure logger
18 14 logger = logging.getLogger(__name__)
... ... @@ -21,96 +17,19 @@ logger = logging.getLogger(__name__)
21 17 class SPUTransformer:
22 18 """Transform SPU and SKU data into SPU-level ES documents."""
23 19  
24   - def __init__(
25   - self,
26   - db_engine: Any,
27   - tenant_id: str
28   - ):
29   - """
30   - Initialize SPU transformer.
31   -
32   - Args:
33   - db_engine: SQLAlchemy database engine
34   - tenant_id: Tenant ID for filtering data
35   - """
  20 + def __init__(self, db_engine: Any, tenant_id: str):
36 21 self.db_engine = db_engine
37 22 self.tenant_id = tenant_id
38 23  
39   - # Load configuration to get searchable_option_dimensions
40   - translator = None
41   - translation_prompts = {}
42   - try:
43   - config_loader = ConfigLoader()
44   - config = config_loader.load_config()
45   - self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions
46   -
47   - # Initialize translator if translation is enabled
48   - if config.query_config.enable_translation:
49   - from query.translator import Translator
50   - translator = Translator(
51   - api_key=config.query_config.translation_api_key,
52   - use_cache=True, # use cache during indexing to avoid re-translating
53   - glossary_id=config.query_config.translation_glossary_id,
54   - translation_context=config.query_config.translation_context
55   - )
56   - translation_prompts = config.query_config.translation_prompts
57   - except Exception as e:
58   - logger.warning(f"Failed to load config, using default: {e}")
59   - self.searchable_option_dimensions = ['option1', 'option2', 'option3']
60   -
61 24 # Load category ID to name mapping
62   - self.category_id_to_name = self._load_category_mapping()
63   -
64   - # Load tenant config
65   - tenant_config_loader = get_tenant_config_loader()
66   - tenant_config = tenant_config_loader.get_tenant_config(tenant_id)
  25 + self.category_id_to_name = load_category_mapping(db_engine)
  26 + logger.info(f"Loaded {len(self.category_id_to_name)} category ID to name mappings")
67 27  
68 28 # Initialize document transformer
69   - self.document_transformer = SPUDocumentTransformer(
  29 + self.document_transformer = create_document_transformer(
70 30 category_id_to_name=self.category_id_to_name,
71   - searchable_option_dimensions=self.searchable_option_dimensions,
72   - tenant_config=tenant_config,
73   - translator=translator,
74   - translation_prompts=translation_prompts
  31 + tenant_id=tenant_id
75 32 )
76   -
77   - def _load_category_mapping(self) -> Dict[str, str]:
78   - """
79   - Load category ID to name mapping from database.
80   -
81   - Returns:
82   - Dictionary mapping category_id to category_name
83   - """
84   - query = text("""
85   - SELECT DISTINCT
86   - category_id,
87   - category
88   - FROM shoplazza_product_spu
89   - WHERE deleted = 0 AND category_id IS NOT NULL
90   - """)
91   -
92   - mapping = {}
93   - with self.db_engine.connect() as conn:
94   - result = conn.execute(query)
95   - for row in result:
96   - category_id = str(int(row.category_id))
97   - category_name = row.category
98   -
99   - if not category_name or not category_name.strip():
100   - logger.warning(f"Category ID {category_id} has empty name, skipping")
101   - continue
102   -
103   - mapping[category_id] = category_name
104   -
105   - logger.info(f"Loaded {len(mapping)} category ID to name mappings")
106   -
107   - # Log all category mappings for debugging
108   - if mapping:
109   - logger.debug("Category ID mappings:")
110   - for cid, name in sorted(mapping.items()):
111   - logger.debug(f" {cid} -> {name}")
112   -
113   - return mapping
114 33  
115 34 def load_spu_data(self) -> pd.DataFrame:
116 35 """
... ...
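
After this refactor, both the full indexer (`SPUTransformer`) and the incremental service build their category mapping through the shared `load_category_mapping` helper, which silently drops categories with empty names. A small self-contained sketch of that filtering rule (pure-Python stand-in; the real helper reads rows from MySQL and logs a warning per skipped row):

```python
def build_category_mapping(rows):
    """Same filtering rule as load_category_mapping: drop empty names."""
    mapping = {}
    for category_id, name in rows:
        if not name or not name.strip():
            continue  # the real helper logs a warning here
        mapping[str(int(category_id))] = name
    return mapping

rows = [(101, "Shoes"), (102, "   "), (103, "Hats"), (104, None)]
print(build_category_mapping(rows))  # {'101': 'Shoes', '103': 'Hats'}
```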