# 索引数据接口文档 本文档说明如何获取需要灌入ES索引的数据,包括全量导入脚本和增量数据获取接口。 ## 目录 1. [租户配置说明](#租户配置说明) 2. [全量数据导入脚本](#全量数据导入脚本) 3. [增量数据获取接口](#增量数据获取接口) 4. [数据格式说明](#数据格式说明) 5. [使用示例](#使用示例) --- ## 租户配置说明 ### 配置文件位置 租户配置存储在统一配置文件 `config/config.yaml` 中,与索引配置放在同一文件。 ### 配置结构 在 `config/config.yaml` 中的 `tenant_config` 部分: ```yaml tenant_config: # 默认配置(未配置的租户使用此配置) default: primary_language: "zh" translate_to_en: true translate_to_zh: false # 租户特定配置 tenants: "1": primary_language: "zh" translate_to_en: true translate_to_zh: false "162": primary_language: "zh" translate_to_en: false translate_to_zh: false ``` ### 配置字段说明 | 字段 | 类型 | 说明 | 可选值 | |------|------|------|--------| | `primary_language` | string | 主语言(SKU表中title等文本字段的语言) | `"zh"`(中文)或 `"en"`(英文) | | `translate_to_en` | boolean | 是否需要翻译英文 | `true` 或 `false` | | `translate_to_zh` | boolean | 是否需要翻译中文 | `true` 或 `false` | ### 配置规则 1. **主语言**:指定SKU表中 `title`、`brief`、`description`、`vendor` 等字段的语言。 - 如果主语言是 `zh`,这些字段的值会填充到 `title_zh`、`brief_zh` 等字段 - 如果主语言是 `en`,这些字段的值会填充到 `title_en`、`brief_en` 等字段 2. **翻译配置**: - `translate_to_en: true`:如果主语言是中文,则会将中文内容翻译为英文,填充到 `title_en` 等字段 - `translate_to_zh: true`:如果主语言是英文,则会将英文内容翻译为中文,填充到 `title_zh` 等字段 - **注意**:如果主语言本身就是目标语言,则不会触发翻译(例如主语言是英文,`translate_to_en: true` 不会触发翻译) 3. **默认配置**:如果租户ID不在 `tenants` 中,则使用 `default` 配置。 ### 配置示例 **示例1:中文主语言,需要翻译英文** ```json { "primary_language": "zh", "translate_to_en": true, "translate_to_zh": false } ``` - SKU表的 `title` 字段(中文)→ `title_zh` - 翻译服务将中文翻译为英文 → `title_en` **示例2:英文主语言,需要翻译中文** ```json { "primary_language": "en", "translate_to_en": false, "translate_to_zh": true } ``` - SKU表的 `title` 字段(英文)→ `title_en` - 翻译服务将英文翻译为中文 → `title_zh` **示例3:仅使用主语言,不翻译** ```json { "primary_language": "zh", "translate_to_en": false, "translate_to_zh": false } ``` - SKU表的 `title` 字段(中文)→ `title_zh` - `title_en` 保持为 `null` ### 配置更新 修改 `config/config.yaml` 中的 `tenant_config` 部分后,需要重启服务才能生效。增量服务会在每次请求时重新加载租户配置(支持热更新)。 --- ## 全量数据导入脚本 ### 功能说明 `scripts/recreate_and_import.py` 是一个全量数据导入脚本,用于: - 重建ES索引(删除旧索引,使用新的mapping创建新索引) - 从MySQL数据库批量读取指定租户的所有SPU数据 - 将数据转换为ES文档格式 - 批量导入到Elasticsearch ### 使用方法 #### 基本用法 ```bash python scripts/recreate_and_import.py \ --tenant-id 1 \ --db-host 120.79.247.228 \ --db-port 3306 \ --db-database saas \ --db-username saas \ --db-password your_password \ --es-host http://localhost:9200 \ --batch-size 500 ``` #### 参数说明 | 参数 | 说明 | 是否必需 | 默认值 | |------|------|----------|--------| | `--tenant-id` | 租户ID | **是** | - | | `--db-host` | MySQL主机地址 | 否(可用环境变量) | 环境变量 `DB_HOST` | | `--db-port` | MySQL端口 | 否(可用环境变量) | 环境变量 `DB_PORT` 或 3306 | | `--db-database` | MySQL数据库名 | 否(可用环境变量) | 环境变量 `DB_DATABASE` | | `--db-username` | MySQL用户名 | 否(可用环境变量) | 环境变量 `DB_USERNAME` | | `--db-password` | MySQL密码 | 否(可用环境变量) | 环境变量 `DB_PASSWORD` | | `--es-host` | Elasticsearch地址 | 否(可用环境变量) | 环境变量 `ES_HOST` 或 `http://localhost:9200` | | `--batch-size` | 批量导入大小 | 否 | 500 | | `--skip-delete` | 跳过删除旧索引步骤 | 否 | False | #### 环境变量配置 可以通过环境变量设置数据库和ES连接信息,避免在命令行中暴露敏感信息: ```bash export DB_HOST=120.79.247.228 export DB_PORT=3306 export DB_DATABASE=saas export DB_USERNAME=saas export DB_PASSWORD=your_password export ES_HOST=http://localhost:9200 python scripts/recreate_and_import.py --tenant-id 1 ``` #### 执行流程 脚本执行分为以下步骤: 1. **加载mapping配置**:从 `mappings/search_products.json` 加载ES索引mapping 2. **连接Elasticsearch**:验证ES连接可用性 3. **删除旧索引**(可选):如果索引已存在,删除旧索引(可通过 `--skip-delete` 跳过) 4. **创建新索引**:使用新的mapping创建索引 5. **连接MySQL**:建立数据库连接 6. **数据转换和导入**: - 从MySQL读取SPU、SKU、Option数据 - 转换为ES文档格式 - 批量导入到ES #### 输出示例 ``` ============================================================ 重建ES索引并导入数据 ============================================================ [1/4] 加载mapping配置... ✓ 成功加载mapping配置 索引名称: search_products [2/4] 连接Elasticsearch... ES地址: http://localhost:9200 ✓ Elasticsearch连接成功 [3/4] 删除旧索引... 发现已存在的索引: search_products ✓ 成功删除索引: search_products [4/4] 创建新索引... 创建索引: search_products ✓ 成功创建索引: search_products [5/5] 连接MySQL... MySQL: 120.79.247.228:3306/saas ✓ MySQL连接成功 [6/6] 导入数据... Tenant ID: 1 批量大小: 500 正在转换数据... ✓ 转换完成: 1000 个文档 正在导入数据到ES (批量大小: 500)... ✓ 导入完成 ============================================================ 导入完成! ============================================================ 成功: 1000 失败: 0 耗时: 12.34秒 ``` #### 注意事项 1. **数据量**:全量导入适合数据量较小或首次导入的场景。对于大数据量,建议使用增量接口。 2. **索引重建**:默认会删除旧索引,请确保有数据备份。 3. **性能**:批量大小(`--batch-size`)影响导入性能,建议根据ES集群性能调整(默认500)。 4. **租户隔离**:每次只能导入一个租户的数据,需要为每个租户分别执行。 --- ## 增量数据获取接口 ### 功能说明 增量数据获取接口提供单个SPU的ES文档数据,用于增量更新ES索引。适用于: - MySQL数据变更后,实时同步到ES - 外部Java程序监听MySQL变更事件,调用接口获取数据后推送到ES - 避免全量重建索引,提高更新效率 ### 接口地址 ``` GET /indexer/spu/{spu_id}?tenant_id={tenant_id} ``` ### 请求参数 | 参数 | 位置 | 类型 | 说明 | 是否必需 | |------|------|------|------|----------| | `spu_id` | 路径参数 | string | SPU ID | **是** | | `tenant_id` | 查询参数 | string | 租户ID | **是** | ### 请求示例 ```bash # cURL curl -X GET "http://localhost:6002/indexer/spu/123?tenant_id=1" # Java (OkHttp) OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url("http://localhost:6002/indexer/spu/123?tenant_id=1") .get() .build(); Response response = client.newCall(request).execute(); String json = response.body().string(); ``` ### 响应格式 #### 成功响应(200 OK) 返回完整的ES文档JSON对象,包含所有索引字段: ```json { "tenant_id": "1", "spu_id": "123", "title_zh": "商品标题", "title_en": null, "brief_zh": "商品简介", "brief_en": null, "description_zh": "商品详细描述", "description_en": null, "vendor_zh": "供应商名称", "vendor_en": null, "tags": ["标签1", "标签2"], "category_path_zh": "类目1/类目2/类目3", "category_path_en": null, "category_name_zh": "类目名称", "category_name_en": null, "category_id": "100", "category_name": "类目名称", "category_level": 3, "category1_name": "类目1", "category2_name": "类目2", "category3_name": "类目3", "option1_name": "颜色", "option2_name": "尺寸", "option3_name": null, "option1_values": ["红色", "蓝色", "绿色"], "option2_values": ["S", "M", "L"], "option3_values": [], "min_price": 99.99, "max_price": 199.99, "compare_at_price": 299.99, "sku_prices": [99.99, 149.99, 199.99], "sku_weights": [100, 150, 200], "sku_weight_units": ["g"], "total_inventory": 500, "sales": 1000, "image_url": "https://example.com/image.jpg", "create_time": "2024-01-01T00:00:00", "update_time": "2024-01-02T00:00:00", "skus": [ { "sku_id": "456", "price": 99.99, "compare_at_price": 149.99, "sku_code": "SKU001", "stock": 100, "weight": 100.0, "weight_unit": "g", "option1_value": "红色", "option2_value": "S", "option3_value": null, "image_src": "https://example.com/sku1.jpg" } ], "specifications": [ { "sku_id": "456", "name": "颜色", "value": "红色" }, { "sku_id": "456", "name": "尺寸", "value": "S" } ] } ``` #### 错误响应 **404 Not Found** - SPU不存在或已删除: ```json { "detail": "SPU 123 not found for tenant_id=1 or has been deleted" } ``` **400 Bad Request** - 缺少必需参数: ```json { "detail": "tenant_id is required" } ``` **500 Internal Server Error** - 服务器内部错误: ```json { "detail": "Internal server error: ..." } ``` **503 Service Unavailable** - 服务未初始化: ```json { "detail": "Incremental indexer service is not initialized. Please check database connection." } ``` ### 健康检查接口 检查增量索引服务的健康状态: ``` GET /indexer/health ``` #### 响应示例 ```json { "status": "available", "database": "connected", "preloaded_data": { "category_mappings": 150, "searchable_option_dimensions": ["option1", "option2", "option3"] } } ``` ### 性能优化 服务在启动时预加载以下公共数据,以提高查询性能: 1. **分类映射**:所有租户共享的分类ID到名称映射 2. **配置信息**:搜索配置(如 `searchable_option_dimensions`) 这些数据在服务启动时一次性加载,后续查询无需重复查询数据库,大幅提升响应速度。 ### 使用场景 #### 场景1:MySQL变更监听 外部Java程序使用Canal或Debezium监听MySQL binlog,当检测到商品数据变更时: ```java // 伪代码示例 @EventListener public void onProductChange(ProductChangeEvent event) { String tenantId = event.getTenantId(); String spuId = event.getSpuId(); // 调用增量接口获取ES文档数据 String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId); Map esDoc = httpClient.get(url); // 推送到ES elasticsearchClient.index("search_products", esDoc); } ``` #### 场景2:定时同步 定时任务扫描变更的商品,批量更新: ```java // 伪代码示例 List changedSpuIds = getChangedSpuIds(); for (String spuId : changedSpuIds) { String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId); Map esDoc = httpClient.get(url); elasticsearchClient.index("search_products", esDoc); } ``` ### 注意事项 1. **服务初始化**:确保API服务已启动,且数据库连接配置正确(`DB_HOST`, `DB_DATABASE`, `DB_USERNAME`, `DB_PASSWORD`)。 2. **数据一致性**:接口返回的是调用时刻的数据快照,如果MySQL数据在调用后立即变更,可能需要重新调用。 3. **错误处理**:建议实现重试机制,对于404错误(SPU已删除),应调用ES删除接口。 4. **性能**:接口已优化,单次查询通常在100ms以内。如需批量获取,建议并发调用。 --- ## 数据格式说明 ### ES文档结构 返回的ES文档结构完全符合 `mappings/search_products.json` 定义的索引结构。主要字段说明: | 字段类别 | 字段名 | 类型 | 说明 | |---------|--------|------|------| | 基础标识 | `tenant_id` | keyword | 租户ID | | 基础标识 | `spu_id` | keyword | SPU ID | | 文本字段 | `title_zh`, `title_en` | text | 标题(中英文) | | 文本字段 | `brief_zh`, `brief_en` | text | 简介(中英文) | | 文本字段 | `description_zh`, `description_en` | text | 描述(中英文) | | 文本字段 | `vendor_zh`, `vendor_en` | text | 供应商(中英文) | | 类目字段 | `category_path_zh`, `category_path_en` | text | 类目路径(中英文) | | 类目字段 | `category1_name`, `category2_name`, `category3_name` | keyword | 分层类目名称 | | 价格字段 | `min_price`, `max_price` | float | 价格范围 | | 库存字段 | `total_inventory` | long | 总库存 | | 销量字段 | `sales` | long | 销量 | | 嵌套字段 | `skus` | nested | SKU列表 | | 嵌套字段 | `specifications` | nested | 规格列表 | 详细字段说明请参考:[索引字段说明v2.md](./索引字段说明v2.md) ### SKU嵌套结构 ```json { "skus": [ { "sku_id": "456", "price": 99.99, "compare_at_price": 149.99, "sku_code": "SKU001", "stock": 100, "weight": 100.0, "weight_unit": "g", "option1_value": "红色", "option2_value": "S", "option3_value": null, "image_src": "https://example.com/sku1.jpg" } ] } ``` ### Specifications嵌套结构 ```json { "specifications": [ { "sku_id": "456", "name": "颜色", "value": "红色" }, { "sku_id": "456", "name": "尺寸", "value": "S" } ] } ``` --- ## 使用示例 ### 示例1:全量导入 ```bash # 设置环境变量 export DB_HOST=120.79.247.228 export DB_PORT=3306 export DB_DATABASE=saas export DB_USERNAME=saas export DB_PASSWORD=your_password export ES_HOST=http://localhost:9200 # 执行全量导入 python scripts/recreate_and_import.py --tenant-id 1 --batch-size 500 ``` ### 示例2:增量更新(Java) ```java import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import com.fasterxml.jackson.databind.ObjectMapper; import org.elasticsearch.client.RestHighLevelClient; public class IncrementalIndexer { private static final String API_BASE_URL = "http://localhost:6002"; private static final OkHttpClient httpClient = new OkHttpClient(); private static final ObjectMapper objectMapper = new ObjectMapper(); private static final RestHighLevelClient esClient = createESClient(); /** * 获取SPU的ES文档数据并推送到ES */ public void indexSpu(String tenantId, String spuId) throws Exception { // 1. 调用增量接口获取数据 String url = String.format("%s/indexer/spu/%s?tenant_id=%s", API_BASE_URL, spuId, tenantId); Request request = new Request.Builder() .url(url) .get() .build(); try (Response response = httpClient.newCall(request).execute()) { if (response.code() == 404) { // SPU已删除,从ES中删除 deleteFromES(tenantId, spuId); return; } if (!response.isSuccessful()) { throw new RuntimeException("Failed to get SPU data: " + response.code()); } // 2. 解析JSON响应 String json = response.body().string(); Map esDoc = objectMapper.readValue(json, Map.class); // 3. 推送到ES IndexRequest indexRequest = new IndexRequest("search_products") .id(spuId) .source(esDoc); esClient.index(indexRequest, RequestOptions.DEFAULT); } } /** * 从ES中删除SPU */ private void deleteFromES(String tenantId, String spuId) throws Exception { DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId); esClient.delete(deleteRequest, RequestOptions.DEFAULT); } } ``` ### 示例3:批量增量更新 ```java /** * 批量更新多个SPU */ public void batchIndexSpus(String tenantId, List spuIds) { ExecutorService executor = Executors.newFixedThreadPool(10); List> futures = new ArrayList<>(); for (String spuId : spuIds) { Future future = executor.submit(() -> { try { indexSpu(tenantId, spuId); } catch (Exception e) { log.error("Failed to index SPU: " + spuId, e); } }); futures.add(future); } // 等待所有任务完成 for (Future future : futures) { try { future.get(); } catch (Exception e) { log.error("Task failed", e); } } executor.shutdown(); } ``` ### 示例4:监听MySQL变更(Canal) ```java @CanalEventListener public class ProductChangeListener { @Autowired private IncrementalIndexer indexer; @ListenPoint( destination = "example", schema = "saas", table = {"shoplazza_product_spu", "shoplazza_product_sku"}, eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE} ) public void onEvent(CanalEntry.Entry entry) { String tableName = entry.getHeader().getTableName(); String tenantId = extractTenantId(entry); String spuId = extractSpuId(entry, tableName); if (tableName.equals("shoplazza_product_spu")) { if (entry.getEntryType() == CanalEntry.EntryType.DELETE) { // SPU删除,从ES删除 indexer.deleteFromES(tenantId, spuId); } else { // SPU新增或更新,重新索引 indexer.indexSpu(tenantId, spuId); } } else if (tableName.equals("shoplazza_product_sku")) { // SKU变更,需要更新对应的SPU indexer.indexSpu(tenantId, spuId); } } } ``` --- ## 常见问题 ### Q1: 全量导入和增量接口的区别? - **全量导入**:适合首次导入或数据重建,一次性导入所有数据,但耗时较长。 - **增量接口**:适合实时同步,按需获取单个SPU数据,响应快速。 ### Q2: 增量接口返回的数据是否包含向量字段? 不包含。向量字段(`title_embedding`, `image_embedding`)需要单独生成,不在本接口返回范围内。如需向量字段,需要: 1. 调用本接口获取基础数据 2. 使用文本/图片编码服务生成向量 3. 将向量字段添加到文档后推送到ES ### Q3: 如何处理SPU删除? 当接口返回404时,表示SPU不存在或已删除。此时应从ES中删除对应文档: ```java if (response.code() == 404) { DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId); esClient.delete(deleteRequest, RequestOptions.DEFAULT); } ``` ### Q4: 服务启动失败,提示数据库连接错误? 检查环境变量或配置文件中的数据库连接信息: - `DB_HOST` - `DB_PORT` - `DB_DATABASE` - `DB_USERNAME` - `DB_PASSWORD` 确保这些变量已正确设置,且数据库可访问。 ### Q5: 接口响应慢怎么办? 1. 检查数据库连接池配置 2. 确认预加载数据是否成功(调用 `/indexer/health` 检查) 3. 检查数据库查询性能(SPU、SKU、Option表是否有索引) 4. 考虑使用连接池和缓存优化 --- ## 相关文档 - [索引字段说明v2.md](./索引字段说明v2.md) - ES索引字段详细说明 - [索引字段说明v2-参考表结构.md](./索引字段说明v2-参考表结构.md) - MySQL表结构参考 - [mappings/search_products.json](../mappings/search_products.json) - ES索引mapping定义