From 3c1f80319aeaae8a0192a3d51f90575556cb718f Mon Sep 17 00:00:00 2001
From: tangwang
Date: Mon, 8 Dec 2025 09:41:34 +0800
Subject: [PATCH] api/routes/indexer.py
 - New bulk indexing endpoint: POST /indexer/bulk (full indexing)
 - SPU endpoint improvement: POST /indexer/spus supports batch retrieval of SPU documents (up to 100)
---
 COMMIT_MESSAGE.md                | 154 ----------------------------
 api/app.py                       |  49 ++++++---
 api/routes/indexer.py            | 169 +++++++++++-------------------
 docs/搜索API对接指南.md          | 198 ++++++++++++++++++++++++++++++---
 indexer/bulk_indexing_service.py | 108 +++++++++++++++++++
 indexer/incremental_service.py   | 135 ++++-------------------
 indexer/indexing_utils.py        | 112 ++++++++++++++++++++
 indexer/spu_transformer.py       |  95 ++---------------
 8 files changed, 526 insertions(+), 494 deletions(-)
 delete mode 100644 COMMIT_MESSAGE.md
 create mode 100644 indexer/bulk_indexing_service.py
 create mode 100644 indexer/indexing_utils.py

diff --git a/COMMIT_MESSAGE.md b/COMMIT_MESSAGE.md
deleted file mode 100644
index 5157e15..0000000
--- a/COMMIT_MESSAGE.md
+++ /dev/null
@@ -1,154 +0,0 @@
-# Summary of Changes
-
-## Final State
-
-### 1. Incremental Data Retrieval Service
-
-**New files**:
-- `indexer/incremental_service.py`: incremental indexing service providing single-SPU data retrieval
-- `api/routes/indexer.py`: incremental indexing API routes
-- `indexer/test_indexing.py`: indexing test script
-
-**Features**:
-- Provides `GET /indexer/spu/{spu_id}?tenant_id={tenant_id}`, returning the ES document for a single SPU
-- Category mappings are preloaded at service startup (shared globally) to improve performance
-- Tenant configuration and search configuration are loaded on demand
-
-### 2. Shared Document Transformer
-
-**New files**:
-- `indexer/document_transformer.py`: SPU document transformer extracting the transformation logic shared by full and incremental indexing
-
-**Features**:
-- Unifies the document transformation logic of full indexing (SPUTransformer) and incremental indexing (IncrementalIndexerService)
-- Eliminates roughly 300 lines of duplicated code
-- Supports language handling and translation driven by tenant configuration
-
-### 3. Tenant Configuration System
-
-**Location**:
-- Tenant configuration is merged into the `tenant_config` section of the unified config file `config/config.yaml`
-- The standalone `config/tenant_config.json` file was removed
-
-**Structure**:
-```yaml
-tenant_config:
-  default:
-    primary_language: "zh"
-    translate_to_en: true
-    translate_to_zh: false
-  tenants:
-    "162":
-      primary_language: "zh"
-      translate_to_en: false  # translation disabled
-      translate_to_zh: false
-```
-
-**Features**:
-- Each tenant can configure its primary language and translation options
-- Tenant 162 has translation disabled (used for testing)
-- Tenants without an entry fall back to the default configuration
-
-### 4. Translation Integration
-
-**Translation module enhancements**:
-- `query/translator.py`: supports a prompt argument, passed through as the DeepL API `context` parameter
-- Fixed duplicated executor initialization code
-- Replaced print statements with logger throughout
-
-**Translation prompt configuration**:
-- Configured in the `translation_prompts` section of `config/config.yaml`
-- Supports Chinese and English prompts:
-  - `product_title_zh/en`: prompts for product title translation
-  - `query_zh/en`: prompts for query translation
-  - `default_zh/en`: default translation prompts
-
-**Translation modes**:
-- **Indexing**: synchronous translation; waits for the result and uses a cache to avoid re-translating
-- **Query**: asynchronous translation; returns cached results immediately and translates missing entries in the background
-
-**DeepL context parameter**:
-- Prompts are passed as the DeepL API `context` parameter (they are not translated themselves; they only provide context)
-- Characters in the context do not count toward DeepL billing
-
-### 5. Code Refactoring
-
-**Redundancy removed**:
-- Extracted the shared transformation logic into `SPUDocumentTransformer`
-- Both `SPUTransformer` and `IncrementalIndexerService` use the shared transformer
-- Removed the duplicated `_transform_spu_to_doc` and `_transform_sku_row` methods
-
-**Architecture improvements**:
-- Full and incremental indexing share the same transformation logic
-- Category mappings are preloaded at service startup (shared globally)
-- Tenant configuration is loaded on demand (supports hot reloading)
-
-### 6. Tests
-
-**Test file locations** (following the one-module-per-test principle):
-- `indexer/test_indexing.py`: indexing tests (full, incremental, tenant config, document transformer)
-- `query/test_translation.py`: translation tests (sync, async, cache, context parameter)
-
-### 7. Documentation Updates
-
-- `docs/索引数据接口文档.md`: updated the tenant configuration notes; moved from a standalone JSON file to the unified config file
-- `docs/翻译功能测试说明.md`: new document describing the translation tests
-
-## Changed Files
-
-### New files
-- `indexer/incremental_service.py`
-- `indexer/document_transformer.py`
-- `indexer/test_indexing.py`
-- `api/routes/indexer.py`
-- `query/test_translation.py`
-- `config/tenant_config_loader.py` (refactored from JSON to YAML)
-- `docs/翻译功能测试说明.md`
-
-### Modified files
-- `config/config.yaml`: added tenant configuration and translation prompt configuration
-- `config/config_loader.py`: supports loading tenant configuration
-- `config/tenant_config_loader.py`: loads tenant configuration from the unified config file
-- `indexer/spu_transformer.py`: uses the shared transformer; integrates the translation service
-- `indexer/incremental_service.py`: uses the shared transformer; integrates the translation service
-- `query/translator.py`: supports prompts as the context parameter; removed redundant code
-- `query/query_parser.py`: uses translation prompts
-- `api/app.py`: registers the incremental indexing routes; initializes the incremental service
-- `docs/索引数据接口文档.md`: updated tenant configuration notes
-
-### Deleted files
-- `config/tenant_config.json`: merged into the unified config file
-
-## Test Validation
-
-### Tenant 162 (translation disabled)
-- Full indexing: verified translation is off and title_en is None
-- Incremental indexing: verified translation is off and title_en is None
-- Document transformer: verified translation is applied according to tenant configuration
-
-### Other tenants (translation enabled)
-- Verified translation works as expected
-- Verified prompts are applied correctly
-
-## Architecture
-
-### Data flow
-```
-MySQL data
-    ↓
-SPUTransformer / IncrementalIndexerService (data loading layer)
-    ↓
-SPUDocumentTransformer (shared transformation layer)
-    ↓
-ES documents (output)
-```
-
-### Configuration layers
-1. **Index configuration** (`config/config.yaml`): search behavior configuration
-2. **Tenant configuration** (`tenant_config` section of `config/config.yaml`): data transformation configuration
-
-### Performance optimizations
-1. Shared data preloading: category mappings are loaded once at service startup
-2. On-demand configuration: tenant and search configuration are loaded as needed, supporting hot reloads
-3. Translation cache: indexing uses a cache to avoid re-translating
-
diff --git a/api/app.py b/api/app.py
index a3fb577..773bff5 100644
--- a/api/app.py
+++ b/api/app.py
@@ -24,12 +24,15 @@ from slowapi.util import get_remote_address
 from slowapi.errors import RateLimitExceeded
 
 # Configure logging with better formatting
+import pathlib
+log_dir = pathlib.Path('logs')
+log_dir.mkdir(exist_ok=True)
 logging.basicConfig(
     level=logging.INFO,
     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
     handlers=[
         logging.StreamHandler(),
-        logging.FileHandler('/tmp/search_engine_api.log', mode='a')
+        logging.FileHandler(log_dir / 'api.log', mode='a', encoding='utf-8')
     ]
 )
 logger = logging.getLogger(__name__)
@@ -54,6 +57,7 @@ _searcher: Optional[Searcher] = None
 _query_parser: Optional[QueryParser] = None
 _config = None
 _incremental_service: Optional[IncrementalIndexerService] = None
+_bulk_indexing_service = None
 
 
 def init_service(es_host: str = "http://localhost:9200"):
@@ -63,7 +67,7 @@ def init_service(es_host: str = "http://localhost:9200"):
     Args:
         es_host: Elasticsearch host URL
     """
-    global _es_client, _searcher, _query_parser, _config, _incremental_service
+    global _es_client, _searcher, _query_parser, _config, _incremental_service, _bulk_indexing_service
 
     start_time = time.time()
     logger.info("Initializing search service (multi-tenant)")
@@ -96,16 +100,20 @@ def init_service(es_host: str = "http://localhost:9200"):
     logger.info("Initializing searcher...")
     _searcher = Searcher(_es_client, _config, _query_parser)
 
-    # Initialize incremental indexer service (if DB config is available)
+    # Initialize indexing services (if DB config is available)
    try:
-        db_host = DB_CONFIG.get('host')
-        db_port = DB_CONFIG.get('port', 3306)
-        db_database = DB_CONFIG.get('database')
-        db_username = DB_CONFIG.get('username')
-        db_password = DB_CONFIG.get('password')
+        from utils.db_connector import create_db_connection
+        from indexer.incremental_service import IncrementalIndexerService
+        from indexer.bulk_indexing_service import BulkIndexingService
+
+        db_host = os.getenv('DB_HOST')
+        db_port = int(os.getenv('DB_PORT', '3306'))
+        db_database = os.getenv('DB_DATABASE')
+        db_username = os.getenv('DB_USERNAME')
+        db_password = os.getenv('DB_PASSWORD')
 
         if all([db_host, db_database, db_username, db_password]):
-            logger.info("Initializing incremental indexer service...")
+            logger.info("Initializing database connection for indexing services...")
             db_engine = create_db_connection(
                 host=db_host,
                 port=db_port,
@@ -113,15 +121,22 @@ def init_service(es_host: str = "http://localhost:9200"):
                 username=db_username,
                 password=db_password
             )
+
+            # Initialize incremental service
             _incremental_service = IncrementalIndexerService(db_engine)
             logger.info("Incremental indexer service initialized")
+
+            # Initialize bulk indexing service
+            _bulk_indexing_service = BulkIndexingService(db_engine, _es_client)
+            logger.info("Bulk indexing service initialized")
         else:
-            logger.warning("Database configuration incomplete, incremental indexer service will not be available")
+            logger.warning("Database config incomplete, indexing services will not be available")
             logger.warning("Required: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
     except Exception as e:
-        logger.warning(f"Failed to initialize incremental indexer service: {e}")
-        logger.warning("Incremental indexer endpoints will not be available")
+        logger.warning(f"Failed to initialize indexing services: {e}")
+        logger.warning("Indexing endpoints will not be available")
         _incremental_service = None
+        _bulk_indexing_service = None
 
     elapsed = time.time() - start_time
     logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}")
@@ -162,6 +177,11 @@ def get_incremental_service() -> Optional[IncrementalIndexerService]:
     return _incremental_service
 
 
+def get_bulk_indexing_service():
+    """Get bulk indexing service instance."""
+    return _bulk_indexing_service
+
+
 # Create FastAPI app with enhanced configuration
 app = FastAPI(
     title="E-Commerce Search API",
@@ -207,15 +227,14 @@ app.add_middleware(
 async def startup_event():
     """Initialize service on startup."""
     es_host = os.getenv("ES_HOST", "http://localhost:9200")
-
     logger.info("Starting E-Commerce Search API (Multi-Tenant)")
     logger.info(f"Elasticsearch Host: {es_host}")
-    
+
     try:
         init_service(es_host=es_host)
         logger.info("Service initialized successfully")
     except Exception as e:
-        logger.error(f"Failed to initialize service: {e}")
+        logger.error(f"Failed to initialize service: {e}", exc_info=True)
         logger.warning("Service will start but may not function correctly")
 
 
diff --git a/api/routes/indexer.py b/api/routes/indexer.py
index 3705500..71999e7 100644
--- a/api/routes/indexer.py
+++ b/api/routes/indexer.py
@@ -1,149 +1,110 @@
 """
-Incremental indexing API routes.
+Indexing API routes.
 
-Provides a single-SPU retrieval endpoint for incremental ES index updates.
+Provides full and incremental indexing endpoints for external Java programs to call.
 """
 
-from fastapi import APIRouter, HTTPException, Query, Request
-from typing import Optional
+from fastapi import APIRouter, HTTPException
+from typing import List
+from pydantic import BaseModel
 import logging
 
-from ..models import ErrorResponse
-
 logger = logging.getLogger(__name__)
 
 router = APIRouter(prefix="/indexer", tags=["indexer"])
 
 
-@router.get("/spu/{spu_id}")
-async def get_spu_document(
-    spu_id: str,
-    tenant_id: str = Query(..., description="Tenant ID"),
-    request: Request = None
-):
-    """
-    Get the ES document for a single SPU (for incremental index updates).
+class BulkIndexRequest(BaseModel):
+    tenant_id: str
+    recreate_index: bool = False
+    batch_size: int = 500
+
 
-    How it works:
-    - Queries the MySQL database by tenant_id and spu_id
-    - Returns the SPU's complete ES document (JSON)
-    - An external Java program can call this endpoint to fetch the data and push it to ES
+class BatchSpuRequest(BaseModel):
+    tenant_id: str
+    spu_ids: List[str]
 
-    Args:
-    - spu_id: SPU ID (path parameter)
-    - tenant_id: tenant ID (query parameter, required)
 
-    Returns:
-    - Success: the ES document as a JSON object
-    - SPU missing or deleted: 404
-    - Other errors: 500
+@router.post("/bulk")
+async def bulk_index(request: BulkIndexRequest):
+    """Full (bulk) indexing endpoint."""
+    try:
+        from ..app import get_bulk_indexing_service
+        service = get_bulk_indexing_service()
+        if service is None:
+            raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")
+        return service.bulk_index(
+            tenant_id=request.tenant_id,
+            recreate_index=request.recreate_index,
+            batch_size=request.batch_size
+        )
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error in bulk indexing for tenant_id={request.tenant_id}: {e}", exc_info=True)
+        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
 
-    Example request:
-    ```
-    GET /indexer/spu/123?tenant_id=1
-    ```
 
-    Example response:
-    ```json
-    {
-        "tenant_id": "1",
-        "spu_id": "123",
-        "title_zh": "Product title",
-        "brief_zh": "Product brief",
-        "description_zh": "Product description",
-        "vendor_zh": "Vendor name",
-        "tags": ["tag1", "tag2"],
-        "category_path_zh": "Category1/Category2/Category3",
-        "category1_name": "Category1",
-        "category2_name": "Category2",
-        "category3_name": "Category3",
-        "category_id": "100",
-        "category_level": 3,
-        "min_price": 99.99,
-        "max_price": 199.99,
-        "compare_at_price": 299.99,
-        "sales": 1000,
-        "total_inventory": 500,
-        "skus": [...],
-        "specifications": [...],
-        ...
-    }
-    ```
-    """
+@router.post("/spus")
+async def get_spu_documents(request: BatchSpuRequest):
+    """Get SPU documents (single or batch)."""
     try:
         from ..app import get_incremental_service
-
-        # Get the incremental service instance
+        if not request.spu_ids:
+            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
+        if len(request.spu_ids) > 100:
+            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")
         service = get_incremental_service()
         if service is None:
-            raise HTTPException(
-                status_code=503,
-                detail="Incremental indexer service is not initialized. Please check database connection."
-            )
-
-        # Fetch the SPU document
-        doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
-
-        if doc is None:
-            raise HTTPException(
-                status_code=404,
-                detail=f"SPU {spu_id} not found for tenant_id={tenant_id} or has been deleted"
-            )
-
-        return doc
-
+            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")
+        success_list, failed_list = [], []
+        for spu_id in request.spu_ids:
+            try:
+                doc = service.get_spu_document(tenant_id=request.tenant_id, spu_id=spu_id)
+                if doc:
+                    success_list.append({"spu_id": spu_id, "document": doc})
+                else:
+                    failed_list.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
+            except Exception as e:
+                failed_list.append({"spu_id": spu_id, "error": str(e)})
+        return {
+            "success": success_list,
+            "failed": failed_list,
+            "total": len(request.spu_ids),
+            "success_count": len(success_list),
+            "failed_count": len(failed_list)
+        }
     except HTTPException:
         raise
     except Exception as e:
-        logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True)
+        logger.error(f"Error getting SPU documents for tenant_id={request.tenant_id}: {e}", exc_info=True)
         raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
 
 
 @router.get("/health")
 async def indexer_health_check():
-    """
-    Check the health of the incremental indexing service.
-
-    Returns:
-    - whether the service is available
-    - database connection status
-    - preloaded data status
-    """
+    """Check indexing service health."""
     try:
         from ..app import get_incremental_service
-
+        from sqlalchemy import text
         service = get_incremental_service()
         if service is None:
-            return {
-                "status": "unavailable",
-                "message": "Incremental indexer service is not initialized",
-                "database": "unknown",
-                "preloaded_data": {
-                    "category_mappings": 0
-                }
-            }
-
-        # Check the database connection
+            return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}
         try:
-            from sqlalchemy import text
             with service.db_engine.connect() as conn:
                 conn.execute(text("SELECT 1"))
             db_status = "connected"
         except Exception as e:
             db_status = f"disconnected: {str(e)}"
-
         return {
             "status": "available",
             "database": db_status,
-            "preloaded_data": {
-                "category_mappings": len(service.category_id_to_name)
-            }
+            "preloaded_data": {"category_mappings": len(service.category_id_to_name)}
         }
-
     except Exception as e:
         logger.error(f"Error checking indexer health: {e}", exc_info=True)
-        return {
-            "status": "error",
-            "message": str(e)
-        }
+        return {"status": "error", "message": str(e)}
 
diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md
index da5d872..0a99a34 100644
--- a/docs/搜索API对接指南.md
+++ b/docs/搜索API对接指南.md
@@ -76,12 +76,191 @@ curl -X POST "http://120.76.41.98:6002/search/" \
 
 ## Endpoint Overview
 
-| Endpoint | HTTP Method | Path |
-|------|------|------|
-| Search | POST | `/search/` |
-| Search suggestions (scaffold, not yet implemented) | GET | `/search/suggestions` |
-| Get document | GET | `/search/{doc_id}` |
-| Health check | GET | `/admin/health` |
+| Endpoint | HTTP Method | Path | Description |
+|------|------|------|------|
+| Search | POST | `/search/` | Execute a search query |
+| Bulk indexing | POST | `/indexer/bulk` | Full indexing |
+| SPU documents | POST | `/indexer/spus` | Get SPU documents (single or batch) |
+| Indexer health check | GET | `/indexer/health` | Check indexing service status |
+| Search suggestions (scaffold, not yet implemented) | GET | `/search/suggestions` | Search suggestions |
+| Get document | GET | `/search/{doc_id}` | Get a single document |
+| Health check | GET | `/admin/health` | Service health check |
+
+---
+
+## Indexing Endpoints
+
+### Bulk Indexing
+
+- **Endpoint**: `POST /indexer/bulk`
+- **Description**: Imports all SPU data for the given tenant into the ES index
+
+#### Request Parameters
+
+```json
+{
+  "tenant_id": "162",
+  "recreate_index": false,
+  "batch_size": 500
+}
+```
+
+| Parameter | Type | Required | Default | Description |
+|------|------|------|--------|------|
+| `tenant_id` | string | Y | - | Tenant ID |
+| `recreate_index` | boolean | N | false | Whether to recreate the index (delete the old index, then create a new one) |
+| `batch_size` | integer | N | 500 | Bulk import batch size |
+
+#### Response Format
+
+**Success (200 OK)**:
+```json
+{
+  "success": true,
+  "total": 1000,
+  "indexed": 1000,
+  "failed": 0,
+  "elapsed_time": 12.34,
+  "index_name": "search_products",
+  "tenant_id": "162"
+}
+```
+
+**Errors**:
+- `400 Bad Request`: invalid parameters
+- `503 Service Unavailable`: service not initialized
+
+#### Request Examples
+
+**First-time indexing (recreate the index)**:
+```bash
+curl -X POST "http://localhost:6002/indexer/bulk" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "tenant_id": "162",
+    "recreate_index": true,
+    "batch_size": 500
+  }'
+```
+
+**Viewing logs**:
+```bash
+# Follow the API log (includes indexing logs)
+tail -f logs/api.log
+
+# Or follow all log files
+tail -f logs/*.log
+```
+
+**Incremental update (without recreating the index)**:
+```bash
+curl -X POST "http://localhost:6002/indexer/bulk" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "tenant_id": "162",
+    "recreate_index": false,
+    "batch_size": 500
+  }'
+```
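+
+**Python client (illustrative)**: a minimal sketch for triggering a full index and reading the result. It assumes the third-party `requests` package and the local host/port used in the examples above:
+```python
+import requests
+
+# Trigger a full index for tenant 162 (assumed host/port, as in the curl examples)
+resp = requests.post(
+    "http://localhost:6002/indexer/bulk",
+    json={"tenant_id": "162", "recreate_index": False, "batch_size": 500},
+    timeout=600,  # a full pass can take a while
+)
+resp.raise_for_status()
+result = resp.json()
+print(f"indexed={result['indexed']} failed={result['failed']} in {result['elapsed_time']:.2f}s")
+```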
+
+---
+
+### SPU Documents
+
+- **Endpoint**: `POST /indexer/spus`
+- **Description**: Returns the ES document data for SPUs (single or batch)
+
+#### Request Parameters
+
+```json
+{
+  "tenant_id": "162",
+  "spu_ids": ["123", "456", "789"]
+}
+```
+
+| Parameter | Type | Required | Description |
+|------|------|------|------|
+| `tenant_id` | string | Y | Tenant ID |
+| `spu_ids` | array[string] | Y | List of SPU IDs (1-100 items) |
+
+#### Response Format
+
+```json
+{
+  "success": [
+    {
+      "spu_id": "123",
+      "document": {
+        "tenant_id": "162",
+        "spu_id": "123",
+        "title_zh": "Product title",
+        ...
+      }
+    },
+    {
+      "spu_id": "456",
+      "document": {...}
+    }
+  ],
+  "failed": [
+    {
+      "spu_id": "789",
+      "error": "SPU not found or deleted"
+    }
+  ],
+  "total": 3,
+  "success_count": 2,
+  "failed_count": 1
+}
+```
+
+#### Request Examples
+
+**Single SPU**:
+```bash
+curl -X POST "http://localhost:6002/indexer/spus" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "tenant_id": "162",
+    "spu_ids": ["123"]
+  }'
+```
+
+**Multiple SPUs**:
+```bash
+curl -X POST "http://localhost:6002/indexer/spus" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "tenant_id": "162",
+    "spu_ids": ["123", "456", "789"]
+  }'
+```
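+
+**Fetch-and-push (illustrative)**: the intended consumer is an external program (the Java side) that fetches these documents and pushes them to ES itself. Below is a minimal Python sketch of that flow; it assumes the `requests` and `elasticsearch` packages, a local ES at `http://localhost:9200`, and the `search_products` index name from the bulk response example above:
+```python
+import requests
+from elasticsearch import Elasticsearch, helpers
+
+es = Elasticsearch("http://localhost:9200")  # assumed ES host
+
+resp = requests.post(
+    "http://localhost:6002/indexer/spus",
+    json={"tenant_id": "162", "spu_ids": ["123", "456", "789"]},
+    timeout=60,
+)
+resp.raise_for_status()
+body = resp.json()
+
+# Index the successfully built documents, keyed by spu_id
+actions = [
+    {"_index": "search_products", "_id": item["spu_id"], "_source": item["document"]}
+    for item in body["success"]
+]
+if actions:
+    helpers.bulk(es, actions)
+for item in body["failed"]:
+    print(f"skipped {item['spu_id']}: {item['error']}")
+```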
+
+---
+
+### Indexer Health Check
+
+- **Endpoint**: `GET /indexer/health`
+- **Description**: Checks the health of the indexing service
+
+#### Response Format
+
+```json
+{
+  "status": "available",
+  "database": "connected",
+  "preloaded_data": {
+    "category_mappings": 150
+  }
+}
+```
+
+#### Request Example
+
+```bash
+curl -X GET "http://localhost:6002/indexer/health"
+```
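+
+**Readiness gating (illustrative)**: a caller can poll this endpoint before triggering indexing jobs. A small sketch, again assuming `requests` and the local host/port:
+```python
+import time
+import requests
+
+def wait_for_indexer(base_url="http://localhost:6002", attempts=10, delay=3.0):
+    """Poll /indexer/health until the service and its database are up."""
+    for _ in range(attempts):
+        health = requests.get(f"{base_url}/indexer/health", timeout=5).json()
+        if health.get("status") == "available" and health.get("database") == "connected":
+            return True
+        time.sleep(delay)
+    return False
+
+if wait_for_indexer():
+    print("indexer ready")
+```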
 
 ---
 
@@ -1163,8 +1342,10 @@ curl "http://localhost:6002/search/12345"
 
 | Analyzer | Language | Description |
 |--------|------|------|
-| `hanlp_index` | Chinese | Chinese indexing analyzer (for Chinese fields) |
-| `hanlp_standard` | Chinese | Chinese query analyzer (for Chinese fields) |
+| `index_ansj` | Chinese | Chinese indexing analyzer (for Chinese fields) |
+| `query_ansj` | Chinese | Chinese query analyzer (for Chinese fields) |
+| `hanlp_index` (not yet supported) | Chinese | Chinese indexing analyzer (for Chinese fields) |
+| `hanlp_standard` (not yet supported) | Chinese | Chinese query analyzer (for Chinese fields) |
 | `english` | English | Standard English analyzer (for English fields) |
 | `lowercase` | - | Lowercase normalizer (for keyword subfields) |
 
@@ -1180,4 +1361,3 @@
 | `date` | `date` | Date/time |
 | `nested` | `nested` | Nested objects (specifications, skus, image_embedding) |
 | `dense_vector` | `dense_vector` | Vector field (title_embedding, used only for search) |
-
diff --git a/indexer/bulk_indexing_service.py b/indexer/bulk_indexing_service.py
new file mode 100644
index 0000000..b50699b
--- /dev/null
+++ b/indexer/bulk_indexing_service.py
@@ -0,0 +1,108 @@
+"""
+Bulk indexing service.
+
+Imports all SPU data for a given tenant into ES.
+"""
+
+import logging
+import time
+from typing import Dict, Any
+from sqlalchemy import Engine
+from utils.es_client import ESClient
+from indexer.spu_transformer import SPUTransformer
+from indexer.bulk_indexer import BulkIndexer
+from indexer.mapping_generator import load_mapping, delete_index_if_exists, DEFAULT_INDEX_NAME
+
+logger = logging.getLogger(__name__)
+
+
+class BulkIndexingService:
+    """Bulk indexing service providing batch imports."""
+
+    def __init__(self, db_engine: Engine, es_client: ESClient):
+        """
+        Initialize the bulk indexing service.
+
+        Args:
+            db_engine: SQLAlchemy database engine
+            es_client: Elasticsearch client
+        """
+        self.db_engine = db_engine
+        self.es_client = es_client
+        self.index_name = DEFAULT_INDEX_NAME
+
+    def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]:
+        """Run a full indexing pass for the given tenant."""
+        start_time = time.time()
+
+        try:
+            # 1. Load the mapping
+            logger.info(f"[BulkIndexing] Loading mapping for tenant_id={tenant_id}")
+            mapping = load_mapping()
+
+            # 2. Prepare the index (recreate, or create if missing)
+            if recreate_index:
+                logger.info(f"[BulkIndexing] Recreating index: {self.index_name}")
+                if self.es_client.index_exists(self.index_name):
+                    if delete_index_if_exists(self.es_client, self.index_name):
+                        logger.info(f"[BulkIndexing] Deleted existing index: {self.index_name}")
+                    else:
+                        raise Exception(f"Failed to delete index: {self.index_name}")
+
+            if not self.es_client.index_exists(self.index_name):
+                logger.info(f"[BulkIndexing] Creating index: {self.index_name}")
+                if not self.es_client.create_index(self.index_name, mapping):
+                    raise Exception(f"Failed to create index: {self.index_name}")
+                logger.info(f"[BulkIndexing] Created index: {self.index_name}")
+            else:
+                logger.info(f"[BulkIndexing] Index already exists: {self.index_name}")
+
+            # 3. Transform the data
+            logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}")
+            transformer = SPUTransformer(self.db_engine, tenant_id)
+            documents = transformer.transform_batch()
+
+            if not documents:
+                logger.warning(f"[BulkIndexing] No documents to index for tenant_id={tenant_id}")
+                return {
+                    "success": True,
+                    "total": 0,
+                    "indexed": 0,
+                    "failed": 0,
+                    "elapsed_time": time.time() - start_time,
+                    "message": "No documents to index"
+                }
+
+            logger.info(f"[BulkIndexing] Transformed {len(documents)} documents")
+
+            # 4. Bulk import
+            logger.info(f"[BulkIndexing] Indexing {len(documents)} documents (batch_size={batch_size})")
+            indexer = BulkIndexer(self.es_client, self.index_name, batch_size=batch_size)
+            results = indexer.index_documents(
+                documents,
+                id_field="spu_id",
+                show_progress=False  # don't print progress when called via the API
+            )
+
+            elapsed_time = time.time() - start_time
+
+            logger.info(
+                f"[BulkIndexing] Completed for tenant_id={tenant_id}: "
+                f"indexed={results['success']}, failed={results['failed']}, "
+                f"elapsed={elapsed_time:.2f}s"
+            )
+
+            return {
+                "success": results['failed'] == 0,
+                "total": len(documents),
+                "indexed": results['success'],
+                "failed": results['failed'],
+                "elapsed_time": elapsed_time,
+                "index_name": self.index_name,
+                "tenant_id": tenant_id
+            }
+
+        except Exception as e:
+            logger.error(f"[BulkIndexing] Failed for tenant_id={tenant_id}: {e}", exc_info=True)
+            raise
+
diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py
index 34fcd58..0d737de 100644
--- a/indexer/incremental_service.py
+++ b/indexer/incremental_service.py
@@ -1,86 +1,28 @@
-"""
-Incremental data retrieval service.
-
-Provides a single-SPU retrieval interface for incremental ES index updates.
-Shared data (category mappings, configuration, etc.) is preloaded at service
-startup to improve performance.
-"""
+"""Incremental data retrieval service."""
 
 import pandas as pd
-import numpy as np
 import logging
 from typing import Dict, Any, Optional
 from sqlalchemy import text
-from config import ConfigLoader
-from config.tenant_config_loader import get_tenant_config_loader
-from indexer.document_transformer import SPUDocumentTransformer
+from indexer.indexing_utils import load_category_mapping, create_document_transformer
 
 # Configure logger
 logger = logging.getLogger(__name__)
 
 
 class IncrementalIndexerService:
-    """Incremental indexing service providing single-SPU data retrieval."""
+    """Incremental indexing service providing SPU data retrieval."""
 
     def __init__(self, db_engine: Any):
-        """
-        Initialize the incremental indexing service.
-
-        Args:
-            db_engine: SQLAlchemy database engine
-        """
+        """Initialize the incremental indexing service."""
         self.db_engine = db_engine
 
         # Preload category mappings (global, shared by all tenants)
-        self.category_id_to_name = self._load_category_mapping()
+        self.category_id_to_name = load_category_mapping(db_engine)
         logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings")
-
-        # Tenant config loader (lazy; tenant config is fetched on demand)
-        self.tenant_config_loader = get_tenant_config_loader()
-
-    def _load_category_mapping(self) -> Dict[str, str]:
-        """
-        Load the category-ID-to-name mapping (global, shared by all tenants).
-
-        Returns:
-            Dictionary mapping category_id to category_name
-        """
-        query = text("""
-            SELECT DISTINCT
-                category_id,
-                category
-            FROM shoplazza_product_spu
-            WHERE deleted = 0 AND category_id IS NOT NULL
-        """)
-
-        mapping = {}
-        try:
-            with self.db_engine.connect() as conn:
-                result = conn.execute(query)
-                for row in result:
-                    category_id = str(int(row.category_id))
-                    category_name = row.category
-
-                    if not category_name or not category_name.strip():
-                        logger.warning(f"Category ID {category_id} has empty name, skipping")
-                        continue
-
-                    mapping[category_id] = category_name
-        except Exception as e:
-            logger.error(f"Failed to load category mapping: {e}", exc_info=True)
-
-        return mapping
 
     def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
-        """
-        Get the ES document for a single SPU.
-
-        Args:
-            tenant_id: tenant ID
-            spu_id: SPU ID
-
-        Returns:
-            The ES document dict, or None if the SPU does not exist or was deleted
-        """
+        """Get the ES document for an SPU."""
         try:
             # Load SPU data
             spu_row = self._load_single_spu(tenant_id, spu_id)
@@ -94,38 +36,10 @@ class IncrementalIndexerService:
             # Load option data
             options_df = self._load_options_for_spu(tenant_id, spu_id)
 
-            # Get tenant config
-            tenant_config = self.tenant_config_loader.get_tenant_config(tenant_id)
-
-            # Load search config
-            translator = None
-            translation_prompts = {}
-            searchable_option_dimensions = ['option1', 'option2', 'option3']
-            try:
-                config_loader = ConfigLoader()
-                config = config_loader.load_config()
-                searchable_option_dimensions = config.spu_config.searchable_option_dimensions
-
-                # Initialize translator if translation is enabled
-                if config.query_config.enable_translation:
-                    from query.translator import Translator
-                    translator = Translator(
-                        api_key=config.query_config.translation_api_key,
-                        use_cache=True,  # use a cache during indexing to avoid re-translating
-                        glossary_id=config.query_config.translation_glossary_id,
-                        translation_context=config.query_config.translation_context
-                    )
-                    translation_prompts = config.query_config.translation_prompts
-            except Exception as e:
-                logger.warning(f"Failed to load config, using default: {e}")
-
             # Create the document transformer
-            transformer = SPUDocumentTransformer(
+            transformer = create_document_transformer(
                 category_id_to_name=self.category_id_to_name,
-                searchable_option_dimensions=searchable_option_dimensions,
-                tenant_config=tenant_config,
-                translator=translator,
-                translation_prompts=translation_prompts
+                tenant_id=tenant_id
             )
 
             # Transform into an ES document
@@ -147,16 +61,7 @@ class IncrementalIndexerService:
             raise
 
     def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
-        """
-        Load a single SPU row.
-
-        Args:
-            tenant_id: tenant ID
-            spu_id: SPU ID
-
-        Returns:
-            The SPU row, or None if it does not exist
-        """
+        """Load a single SPU row."""
         query = text("""
             SELECT
                 id, shop_id, shoplazza_id, title, brief, description,
@@ -180,16 +85,7 @@ class IncrementalIndexerService:
         return df.iloc[0]
 
     def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
-        """
-        Load all SKU rows for the given SPU.
-
-        Args:
-            tenant_id: tenant ID
-            spu_id: SPU ID
-
-        Returns:
-            SKU data as a DataFrame
-        """
+        """Load all SKU rows for the given SPU."""
         query = text("""
             SELECT
                 id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
@@ -210,16 +106,7 @@ class IncrementalIndexerService:
         return df
 
     def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
-        """
-        Load all option rows for the given SPU.
-
-        Args:
-            tenant_id: tenant ID
-            spu_id: SPU ID
-
-        Returns:
-            Option data as a DataFrame
-        """
+        """Load all option rows for the given SPU."""
         query = text("""
             SELECT
                 id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py
new file mode 100644
index 0000000..b0e0356
--- /dev/null
+++ b/indexer/indexing_utils.py
@@ -0,0 +1,112 @@
+"""
+Indexing utility functions.
+
+Shared logic extracted here to avoid code duplication.
+"""
+
+import logging
+from typing import Dict, Any, Optional
+from sqlalchemy import Engine, text
+from config import ConfigLoader
+from config.tenant_config_loader import get_tenant_config_loader
+from indexer.document_transformer import SPUDocumentTransformer
+
+logger = logging.getLogger(__name__)
+
+
+def load_category_mapping(db_engine: Engine) -> Dict[str, str]:
+    """
+    Load the category-ID-to-name mapping (global, shared by all tenants).
+
+    Args:
+        db_engine: SQLAlchemy database engine
+
+    Returns:
+        Dictionary mapping category_id to category_name
+    """
+    query = text("""
+        SELECT DISTINCT
+            category_id,
+            category
+        FROM shoplazza_product_spu
+        WHERE deleted = 0 AND category_id IS NOT NULL
+    """)
+
+    mapping = {}
+    try:
+        with db_engine.connect() as conn:
+            result = conn.execute(query)
+            for row in result:
+                category_id = str(int(row.category_id))
+                category_name = row.category
+
+                if not category_name or not category_name.strip():
+                    logger.warning(f"Category ID {category_id} has empty name, skipping")
+                    continue
+
+                mapping[category_id] = category_name
+    except Exception as e:
+        logger.error(f"Failed to load category mapping: {e}", exc_info=True)
+
+    return mapping
+
+
+def create_document_transformer(
+    category_id_to_name: Dict[str, str],
+    tenant_id: str,
+    searchable_option_dimensions: Optional[list] = None,
+    translator: Optional[Any] = None,
+    translation_prompts: Optional[Dict[str, str]] = None
+) -> SPUDocumentTransformer:
+    """
+    Create a document transformer (unified initialization logic).
+
+    Args:
+        category_id_to_name: category-ID-to-name mapping
+        tenant_id: tenant ID
+        searchable_option_dimensions: searchable option dimensions (loaded from config if None)
+        translator: translator instance (initialized from config if None)
+        translation_prompts: translation prompt configuration (loaded from config if None)
+
+    Returns:
+        An SPUDocumentTransformer instance
+    """
+    # Load tenant config
+    tenant_config_loader = get_tenant_config_loader()
+    tenant_config = tenant_config_loader.get_tenant_config(tenant_id)
+
+    # Load search config (only if needed)
+    if searchable_option_dimensions is None or translator is None or translation_prompts is None:
+        try:
+            config_loader = ConfigLoader()
+            config = config_loader.load_config()
+
+            if searchable_option_dimensions is None:
+                searchable_option_dimensions = config.spu_config.searchable_option_dimensions
+
+            if translator is None and config.query_config.enable_translation:
+                from query.translator import Translator
+                translator = Translator(
+                    api_key=config.query_config.translation_api_key,
+                    use_cache=True,
+                    glossary_id=config.query_config.translation_glossary_id,
+                    translation_context=config.query_config.translation_context
+                )
+
+            if translation_prompts is None:
+                translation_prompts = config.query_config.translation_prompts
+        except Exception as e:
+            logger.warning(f"Failed to load config, using defaults: {e}")
+            if searchable_option_dimensions is None:
+                searchable_option_dimensions = ['option1', 'option2', 'option3']
+            if translation_prompts is None:
+                translation_prompts = {}
+
+    return SPUDocumentTransformer(
+        category_id_to_name=category_id_to_name,
+        searchable_option_dimensions=searchable_option_dimensions,
+        tenant_config=tenant_config,
+        translator=translator,
+        translation_prompts=translation_prompts
+    )
+
diff --git a/indexer/spu_transformer.py b/indexer/spu_transformer.py
index eb03ebb..5c78b34 100644
--- a/indexer/spu_transformer.py
+++ b/indexer/spu_transformer.py
@@ -5,14 +5,10 @@ Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested s
 """
 
 import pandas as pd
-import numpy as np
 import logging
 from typing import Dict, Any, List, Optional
-from sqlalchemy import create_engine, text
-from utils.db_connector import create_db_connection
-from config import ConfigLoader
-from config.tenant_config_loader import get_tenant_config_loader
-from indexer.document_transformer import SPUDocumentTransformer
+from sqlalchemy import text
+from indexer.indexing_utils import load_category_mapping, create_document_transformer
 
 # Configure logger
 logger = logging.getLogger(__name__)
@@ -21,96 +17,19 @@ logger = logging.getLogger(__name__)
 class SPUTransformer:
     """Transform SPU and SKU data into SPU-level ES documents."""
 
-    def __init__(
-        self,
-        db_engine: Any,
-        tenant_id: str
-    ):
-        """
-        Initialize SPU transformer.
-
-        Args:
-            db_engine: SQLAlchemy database engine
-            tenant_id: Tenant ID for filtering data
-        """
+    def __init__(self, db_engine: Any, tenant_id: str):
         self.db_engine = db_engine
         self.tenant_id = tenant_id
 
-        # Load configuration to get searchable_option_dimensions
-        translator = None
-        translation_prompts = {}
-        try:
-            config_loader = ConfigLoader()
-            config = config_loader.load_config()
-            self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions
-
-            # Initialize translator if translation is enabled
-            if config.query_config.enable_translation:
-                from query.translator import Translator
-                translator = Translator(
-                    api_key=config.query_config.translation_api_key,
-                    use_cache=True,  # use a cache during indexing to avoid re-translating
-                    glossary_id=config.query_config.translation_glossary_id,
-                    translation_context=config.query_config.translation_context
-                )
-                translation_prompts = config.query_config.translation_prompts
-        except Exception as e:
-            logger.warning(f"Failed to load config, using default: {e}")
-            self.searchable_option_dimensions = ['option1', 'option2', 'option3']
-
         # Load category ID to name mapping
-        self.category_id_to_name = self._load_category_mapping()
-
-        # Load tenant config
-        tenant_config_loader = get_tenant_config_loader()
-        tenant_config = tenant_config_loader.get_tenant_config(tenant_id)
+        self.category_id_to_name = load_category_mapping(db_engine)
+        logger.info(f"Loaded {len(self.category_id_to_name)} category ID to name mappings")
 
         # Initialize document transformer
-        self.document_transformer = SPUDocumentTransformer(
+        self.document_transformer = create_document_transformer(
             category_id_to_name=self.category_id_to_name,
-            searchable_option_dimensions=self.searchable_option_dimensions,
-            tenant_config=tenant_config,
-            translator=translator,
-            translation_prompts=translation_prompts
+            tenant_id=tenant_id
         )
-
-    def _load_category_mapping(self) -> Dict[str, str]:
-        """
-        Load category ID to name mapping from database.
-
-        Returns:
-            Dictionary mapping category_id to category_name
-        """
-        query = text("""
-            SELECT DISTINCT
-                category_id,
-                category
-            FROM shoplazza_product_spu
-            WHERE deleted = 0 AND category_id IS NOT NULL
-        """)
-
-        mapping = {}
-        with self.db_engine.connect() as conn:
-            result = conn.execute(query)
-            for row in result:
-                category_id = str(int(row.category_id))
-                category_name = row.category
-
-                if not category_name or not category_name.strip():
-                    logger.warning(f"Category ID {category_id} has empty name, skipping")
-                    continue
-
-                mapping[category_id] = category_name
-
-        logger.info(f"Loaded {len(mapping)} category ID to name mappings")
-
-        # Log all category mappings for debugging
-        if mapping:
-            logger.debug("Category ID mappings:")
-            for cid, name in sorted(mapping.items()):
-                logger.debug(f"  {cid} -> {name}")
-
-        return mapping
 
     def load_spu_data(self) -> pd.DataFrame:
         """
-- 
libgit2 0.21.2