From cff5e86ff12b33b782021f98c5fc691a4286b137 Mon Sep 17 00:00:00 2001 From: tangwang Date: Thu, 18 Dec 2025 18:51:22 +0800 Subject: [PATCH] reindex --- a.sh | 1 - config/config.yaml | 4 ++++ docs/索引数据接口文档.md |docs/索引数据接口文档___old.md |full_bulk.sh | 2 ++ 5 files changed, 720 insertions(+), 715 deletions(-) delete mode 100644 a.sh delete mode 100644 docs/索引数据接口文档.md create mode 100644 docs/索引数据接口文档___old.md create mode 100644 full_bulk.sh diff --git a/a.sh b/a.sh deleted file mode 100644 index 88b0e2f..0000000 --- a/a.sh +++ /dev/null @@ -1 +0,0 @@ -curl -X POST "http://localhost:6004/indexer/reindex" -H "Content-Type: application/json" -d '{"tenant_id":"162","batch_size":500}' diff --git a/config/config.yaml b/config/config.yaml index c6c1c36..206ab3b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -172,3 +172,7 @@ tenant_config: primary_language: "zh" translate_to_en: false translate_to_zh: false + "170": + primary_language: "en" + translate_to_en: true + translate_to_zh: true diff --git a/docs/索引数据接口文档.md b/docs/索引数据接口文档.md deleted file mode 100644 index 8e968b6..0000000 --- a/docs/索引数据接口文档.md +++ /dev/null @@ -1,714 +0,0 @@ -# 索引数据接口文档 - -本文档说明如何获取需要灌入ES索引的数据,包括全量导入脚本和增量数据获取接口。 - -## 目录 - -1. [租户配置说明](#租户配置说明) -2. [全量数据导入脚本](#全量数据导入脚本) -3. [增量数据获取接口](#增量数据获取接口) -4. [数据格式说明](#数据格式说明) -5. [使用示例](#使用示例) - ---- - -## 租户配置说明 - -### 配置文件位置 - -租户配置存储在统一配置文件 `config/config.yaml` 中,与索引配置放在同一文件。 - -### 配置结构 - -在 `config/config.yaml` 中的 `tenant_config` 部分: - -```yaml -tenant_config: - # 默认配置(未配置的租户使用此配置) - default: - primary_language: "zh" - translate_to_en: true - translate_to_zh: false - # 租户特定配置 - tenants: - "1": - primary_language: "zh" - translate_to_en: true - translate_to_zh: false - "162": - primary_language: "zh" - translate_to_en: false - translate_to_zh: false -``` - -### 配置字段说明 - -| 字段 | 类型 | 说明 | 可选值 | -|------|------|------|--------| -| `primary_language` | string | 主语言(SKU表中title等文本字段的语言) | `"zh"`(中文)或 `"en"`(英文) | -| `translate_to_en` | boolean | 是否需要翻译英文 | `true` 或 `false` | -| `translate_to_zh` | boolean | 是否需要翻译中文 | `true` 或 `false` | - -### 配置规则 - -1. **主语言**:指定SKU表中 `title`、`brief`、`description`、`vendor` 等字段的语言。 - - 如果主语言是 `zh`,这些字段的值会填充到 `title_zh`、`brief_zh` 等字段 - - 如果主语言是 `en`,这些字段的值会填充到 `title_en`、`brief_en` 等字段 - -2. **翻译配置**: - - `translate_to_en: true`:如果主语言是中文,则会将中文内容翻译为英文,填充到 `title_en` 等字段 - - `translate_to_zh: true`:如果主语言是英文,则会将英文内容翻译为中文,填充到 `title_zh` 等字段 - - **注意**:如果主语言本身就是目标语言,则不会触发翻译(例如主语言是英文,`translate_to_en: true` 不会触发翻译) - -3. **默认配置**:如果租户ID不在 `tenants` 中,则使用 `default` 配置。 - -### 配置示例 - -**示例1:中文主语言,需要翻译英文** -```json -{ - "primary_language": "zh", - "translate_to_en": true, - "translate_to_zh": false -} -``` -- SKU表的 `title` 字段(中文)→ `title_zh` -- 翻译服务将中文翻译为英文 → `title_en` - -**示例2:英文主语言,需要翻译中文** -```json -{ - "primary_language": "en", - "translate_to_en": false, - "translate_to_zh": true -} -``` -- SKU表的 `title` 字段(英文)→ `title_en` -- 翻译服务将英文翻译为中文 → `title_zh` - -**示例3:仅使用主语言,不翻译** -```json -{ - "primary_language": "zh", - "translate_to_en": false, - "translate_to_zh": false -} -``` -- SKU表的 `title` 字段(中文)→ `title_zh` -- `title_en` 保持为 `null` - -### 配置更新 - -修改 `config/config.yaml` 中的 `tenant_config` 部分后,需要重启服务才能生效。增量服务会在每次请求时重新加载租户配置(支持热更新)。 - ---- - -## 全量数据导入脚本 - -### 功能说明 - -`scripts/recreate_and_import.py` 是一个全量数据导入脚本,用于: -- 重建ES索引(删除旧索引,使用新的mapping创建新索引) -- 从MySQL数据库批量读取指定租户的所有SPU数据 -- 将数据转换为ES文档格式 -- 批量导入到Elasticsearch - -### 使用方法 - -#### 基本用法 - -```bash -python scripts/recreate_and_import.py \ - --tenant-id 1 \ - --db-host 120.79.247.228 \ - --db-port 3306 \ - --db-database saas \ - --db-username saas \ - --db-password your_password \ - --es-host http://localhost:9200 \ - --batch-size 500 -``` - -#### 参数说明 - -| 参数 | 说明 | 是否必需 | 默认值 | -|------|------|----------|--------| -| `--tenant-id` | 租户ID | **是** | - | -| `--db-host` | MySQL主机地址 | 否(可用环境变量) | 环境变量 `DB_HOST` | -| `--db-port` | MySQL端口 | 否(可用环境变量) | 环境变量 `DB_PORT` 或 3306 | -| `--db-database` | MySQL数据库名 | 否(可用环境变量) | 环境变量 `DB_DATABASE` | -| `--db-username` | MySQL用户名 | 否(可用环境变量) | 环境变量 `DB_USERNAME` | -| `--db-password` | MySQL密码 | 否(可用环境变量) | 环境变量 `DB_PASSWORD` | -| `--es-host` | Elasticsearch地址 | 否(可用环境变量) | 环境变量 `ES_HOST` 或 `http://localhost:9200` | -| `--batch-size` | 批量导入大小 | 否 | 500 | -| `--skip-delete` | 跳过删除旧索引步骤 | 否 | False | - -#### 环境变量配置 - -可以通过环境变量设置数据库和ES连接信息,避免在命令行中暴露敏感信息: - -```bash -export DB_HOST=120.79.247.228 -export DB_PORT=3306 -export DB_DATABASE=saas -export DB_USERNAME=saas -export DB_PASSWORD=your_password -export ES_HOST=http://localhost:9200 - -python scripts/recreate_and_import.py --tenant-id 1 -``` - -#### 执行流程 - -脚本执行分为以下步骤: - -1. **加载mapping配置**:从 `mappings/search_products.json` 加载ES索引mapping -2. **连接Elasticsearch**:验证ES连接可用性 -3. **删除旧索引**(可选):如果索引已存在,删除旧索引(可通过 `--skip-delete` 跳过) -4. **创建新索引**:使用新的mapping创建索引 -5. **连接MySQL**:建立数据库连接 -6. **数据转换和导入**: - - 从MySQL读取SPU、SKU、Option数据 - - 转换为ES文档格式 - - 批量导入到ES - -#### 输出示例 - -``` -============================================================ -重建ES索引并导入数据 -============================================================ - -[1/4] 加载mapping配置... -✓ 成功加载mapping配置 -索引名称: search_products - -[2/4] 连接Elasticsearch... -ES地址: http://localhost:9200 -✓ Elasticsearch连接成功 - -[3/4] 删除旧索引... -发现已存在的索引: search_products -✓ 成功删除索引: search_products - -[4/4] 创建新索引... -创建索引: search_products -✓ 成功创建索引: search_products - -[5/5] 连接MySQL... -MySQL: 120.79.247.228:3306/saas -✓ MySQL连接成功 - -[6/6] 导入数据... -Tenant ID: 1 -批量大小: 500 -正在转换数据... -✓ 转换完成: 1000 个文档 -正在导入数据到ES (批量大小: 500)... -✓ 导入完成 - -============================================================ -导入完成! -============================================================ -成功: 1000 -失败: 0 -耗时: 12.34秒 -``` - -#### 注意事项 - -1. **数据量**:全量导入适合数据量较小或首次导入的场景。对于大数据量,建议使用增量接口。 -2. **索引重建**:默认会删除旧索引,请确保有数据备份。 -3. **性能**:批量大小(`--batch-size`)影响导入性能,建议根据ES集群性能调整(默认500)。 -4. **租户隔离**:每次只能导入一个租户的数据,需要为每个租户分别执行。 - ---- - -## 增量数据获取接口 - -### 功能说明 - -增量数据获取接口提供单个SPU的ES文档数据,用于增量更新ES索引。适用于: -- MySQL数据变更后,实时同步到ES -- 外部Java程序监听MySQL变更事件,调用接口获取数据后推送到ES -- 避免全量重建索引,提高更新效率 - -### 接口地址 - -``` -GET /indexer/spu/{spu_id}?tenant_id={tenant_id} -``` - -### 请求参数 - -| 参数 | 位置 | 类型 | 说明 | 是否必需 | -|------|------|------|------|----------| -| `spu_id` | 路径参数 | string | SPU ID | **是** | -| `tenant_id` | 查询参数 | string | 租户ID | **是** | - -### 请求示例 - -```bash -# cURL -curl -X GET "http://localhost:6004/indexer/spu/123?tenant_id=1" - -# Java (OkHttp) -OkHttpClient client = new OkHttpClient(); -Request request = new Request.Builder() - .url("http://localhost:6004/indexer/spu/123?tenant_id=1") - .get() - .build(); -Response response = client.newCall(request).execute(); -String json = response.body().string(); -``` - -### 响应格式 - -#### 成功响应(200 OK) - -返回完整的ES文档JSON对象,包含所有索引字段: - -```json -{ - "tenant_id": "1", - "spu_id": "123", - "title_zh": "商品标题", - "title_en": null, - "brief_zh": "商品简介", - "brief_en": null, - "description_zh": "商品详细描述", - "description_en": null, - "vendor_zh": "供应商名称", - "vendor_en": null, - "tags": ["标签1", "标签2"], - "category_path_zh": "类目1/类目2/类目3", - "category_path_en": null, - "category_name_zh": "类目名称", - "category_name_en": null, - "category_id": "100", - "category_name": "类目名称", - "category_level": 3, - "category1_name": "类目1", - "category2_name": "类目2", - "category3_name": "类目3", - "option1_name": "颜色", - "option2_name": "尺寸", - "option3_name": null, - "option1_values": ["红色", "蓝色", "绿色"], - "option2_values": ["S", "M", "L"], - "option3_values": [], - "min_price": 99.99, - "max_price": 199.99, - "compare_at_price": 299.99, - "sku_prices": [99.99, 149.99, 199.99], - "sku_weights": [100, 150, 200], - "sku_weight_units": ["g"], - "total_inventory": 500, - "sales": 1000, - "image_url": "https://example.com/image.jpg", - "create_time": "2024-01-01T00:00:00", - "update_time": "2024-01-02T00:00:00", - "skus": [ - { - "sku_id": "456", - "price": 99.99, - "compare_at_price": 149.99, - "sku_code": "SKU001", - "stock": 100, - "weight": 100.0, - "weight_unit": "g", - "option1_value": "红色", - "option2_value": "S", - "option3_value": null, - "image_src": "https://example.com/sku1.jpg" - } - ], - "specifications": [ - { - "sku_id": "456", - "name": "颜色", - "value": "红色" - }, - { - "sku_id": "456", - "name": "尺寸", - "value": "S" - } - ] -} -``` - -#### 错误响应 - -**404 Not Found** - SPU不存在或已删除: -```json -{ - "detail": "SPU 123 not found for tenant_id=1 or has been deleted" -} -``` - -**400 Bad Request** - 缺少必需参数: -```json -{ - "detail": "tenant_id is required" -} -``` - -**500 Internal Server Error** - 服务器内部错误: -```json -{ - "detail": "Internal server error: ..." -} -``` - -**503 Service Unavailable** - 服务未初始化: -```json -{ - "detail": "Incremental indexer service is not initialized. Please check database connection." -} -``` - -### 健康检查接口 - -检查增量索引服务的健康状态: - -``` -GET /indexer/health -``` - -#### 响应示例 - -```json -{ - "status": "available", - "database": "connected", - "preloaded_data": { - "category_mappings": 150, - "searchable_option_dimensions": ["option1", "option2", "option3"] - } -} -``` - -### 性能优化 - -服务在启动时预加载以下公共数据,以提高查询性能: - -1. **分类映射**:所有租户共享的分类ID到名称映射 -2. **配置信息**:搜索配置(如 `searchable_option_dimensions`) - -这些数据在服务启动时一次性加载,后续查询无需重复查询数据库,大幅提升响应速度。 - -### 使用场景 - -#### 场景1:MySQL变更监听 - -外部Java程序使用Canal或Debezium监听MySQL binlog,当检测到商品数据变更时: - -```java -// 伪代码示例 -@EventListener -public void onProductChange(ProductChangeEvent event) { - String tenantId = event.getTenantId(); - String spuId = event.getSpuId(); - - // 调用增量接口获取ES文档数据 - String url = String.format("http://localhost:6004/indexer/spu/%s?tenant_id=%s", spuId, tenantId); - Map esDoc = httpClient.get(url); - - // 推送到ES - elasticsearchClient.index("search_products", esDoc); -} -``` - -#### 场景2:定时同步 - -定时任务扫描变更的商品,批量更新: - -```java -// 伪代码示例 -List changedSpuIds = getChangedSpuIds(); -for (String spuId : changedSpuIds) { - String url = String.format("http://localhost:6004/indexer/spu/%s?tenant_id=%s", spuId, tenantId); - Map esDoc = httpClient.get(url); - elasticsearchClient.index("search_products", esDoc); -} -``` - -### 注意事项 - -1. **服务初始化**:确保API服务已启动,且数据库连接配置正确(`DB_HOST`, `DB_DATABASE`, `DB_USERNAME`, `DB_PASSWORD`)。 -2. **数据一致性**:接口返回的是调用时刻的数据快照,如果MySQL数据在调用后立即变更,可能需要重新调用。 -3. **错误处理**:建议实现重试机制,对于404错误(SPU已删除),应调用ES删除接口。 -4. **性能**:接口已优化,单次查询通常在100ms以内。如需批量获取,建议并发调用。 - ---- - -## 数据格式说明 - -### ES文档结构 - -返回的ES文档结构完全符合 `mappings/search_products.json` 定义的索引结构。主要字段说明: - -| 字段类别 | 字段名 | 类型 | 说明 | -|---------|--------|------|------| -| 基础标识 | `tenant_id` | keyword | 租户ID | -| 基础标识 | `spu_id` | keyword | SPU ID | -| 文本字段 | `title_zh`, `title_en` | text | 标题(中英文) | -| 文本字段 | `brief_zh`, `brief_en` | text | 简介(中英文) | -| 文本字段 | `description_zh`, `description_en` | text | 描述(中英文) | -| 文本字段 | `vendor_zh`, `vendor_en` | text | 供应商(中英文) | -| 类目字段 | `category_path_zh`, `category_path_en` | text | 类目路径(中英文) | -| 类目字段 | `category1_name`, `category2_name`, `category3_name` | keyword | 分层类目名称 | -| 价格字段 | `min_price`, `max_price` | float | 价格范围 | -| 库存字段 | `total_inventory` | long | 总库存 | -| 销量字段 | `sales` | long | 销量 | -| 嵌套字段 | `skus` | nested | SKU列表 | -| 嵌套字段 | `specifications` | nested | 规格列表 | - -详细字段说明请参考:[索引字段说明v2.md](./索引字段说明v2.md) - -### SKU嵌套结构 - -```json -{ - "skus": [ - { - "sku_id": "456", - "price": 99.99, - "compare_at_price": 149.99, - "sku_code": "SKU001", - "stock": 100, - "weight": 100.0, - "weight_unit": "g", - "option1_value": "红色", - "option2_value": "S", - "option3_value": null, - "image_src": "https://example.com/sku1.jpg" - } - ] -} -``` - -### Specifications嵌套结构 - -```json -{ - "specifications": [ - { - "sku_id": "456", - "name": "颜色", - "value": "红色" - }, - { - "sku_id": "456", - "name": "尺寸", - "value": "S" - } - ] -} -``` - ---- - -## 使用示例 - -### 示例1:全量导入 - -```bash -# 设置环境变量 -export DB_HOST=120.79.247.228 -export DB_PORT=3306 -export DB_DATABASE=saas -export DB_USERNAME=saas -export DB_PASSWORD=your_password -export ES_HOST=http://localhost:9200 - -# 执行全量导入 -python scripts/recreate_and_import.py --tenant-id 1 --batch-size 500 -``` - -### 示例2:增量更新(Java) - -```java -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.elasticsearch.client.RestHighLevelClient; - -public class IncrementalIndexer { - private static final String API_BASE_URL = "http://localhost:6002"; - private static final OkHttpClient httpClient = new OkHttpClient(); - private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final RestHighLevelClient esClient = createESClient(); - - /** - * 获取SPU的ES文档数据并推送到ES - */ - public void indexSpu(String tenantId, String spuId) throws Exception { - // 1. 调用增量接口获取数据 - String url = String.format("%s/indexer/spu/%s?tenant_id=%s", - API_BASE_URL, spuId, tenantId); - - Request request = new Request.Builder() - .url(url) - .get() - .build(); - - try (Response response = httpClient.newCall(request).execute()) { - if (response.code() == 404) { - // SPU已删除,从ES中删除 - deleteFromES(tenantId, spuId); - return; - } - - if (!response.isSuccessful()) { - throw new RuntimeException("Failed to get SPU data: " + response.code()); - } - - // 2. 解析JSON响应 - String json = response.body().string(); - Map esDoc = objectMapper.readValue(json, Map.class); - - // 3. 推送到ES - IndexRequest indexRequest = new IndexRequest("search_products") - .id(spuId) - .source(esDoc); - - esClient.index(indexRequest, RequestOptions.DEFAULT); - } - } - - /** - * 从ES中删除SPU - */ - private void deleteFromES(String tenantId, String spuId) throws Exception { - DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId); - esClient.delete(deleteRequest, RequestOptions.DEFAULT); - } -} -``` - -### 示例3:批量增量更新 - -```java -/** - * 批量更新多个SPU - */ -public void batchIndexSpus(String tenantId, List spuIds) { - ExecutorService executor = Executors.newFixedThreadPool(10); - List> futures = new ArrayList<>(); - - for (String spuId : spuIds) { - Future future = executor.submit(() -> { - try { - indexSpu(tenantId, spuId); - } catch (Exception e) { - log.error("Failed to index SPU: " + spuId, e); - } - }); - futures.add(future); - } - - // 等待所有任务完成 - for (Future future : futures) { - try { - future.get(); - } catch (Exception e) { - log.error("Task failed", e); - } - } - - executor.shutdown(); -} -``` - -### 示例4:监听MySQL变更(Canal) - -```java -@CanalEventListener -public class ProductChangeListener { - - @Autowired - private IncrementalIndexer indexer; - - @ListenPoint( - destination = "example", - schema = "saas", - table = {"shoplazza_product_spu", "shoplazza_product_sku"}, - eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE} - ) - public void onEvent(CanalEntry.Entry entry) { - String tableName = entry.getHeader().getTableName(); - String tenantId = extractTenantId(entry); - String spuId = extractSpuId(entry, tableName); - - if (tableName.equals("shoplazza_product_spu")) { - if (entry.getEntryType() == CanalEntry.EntryType.DELETE) { - // SPU删除,从ES删除 - indexer.deleteFromES(tenantId, spuId); - } else { - // SPU新增或更新,重新索引 - indexer.indexSpu(tenantId, spuId); - } - } else if (tableName.equals("shoplazza_product_sku")) { - // SKU变更,需要更新对应的SPU - indexer.indexSpu(tenantId, spuId); - } - } -} -``` - ---- - -## 常见问题 - -### Q1: 全量导入和增量接口的区别? - -- **全量导入**:适合首次导入或数据重建,一次性导入所有数据,但耗时较长。 -- **增量接口**:适合实时同步,按需获取单个SPU数据,响应快速。 - -### Q2: 增量接口返回的数据是否包含向量字段? - -不包含。向量字段(`title_embedding`, `image_embedding`)需要单独生成,不在本接口返回范围内。如需向量字段,需要: -1. 调用本接口获取基础数据 -2. 使用文本/图片编码服务生成向量 -3. 将向量字段添加到文档后推送到ES - -### Q3: 如何处理SPU删除? - -当接口返回404时,表示SPU不存在或已删除。此时应从ES中删除对应文档: - -```java -if (response.code() == 404) { - DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId); - esClient.delete(deleteRequest, RequestOptions.DEFAULT); -} -``` - -### Q4: 服务启动失败,提示数据库连接错误? - -检查环境变量或配置文件中的数据库连接信息: -- `DB_HOST` -- `DB_PORT` -- `DB_DATABASE` -- `DB_USERNAME` -- `DB_PASSWORD` - -确保这些变量已正确设置,且数据库可访问。 - -### Q5: 接口响应慢怎么办? - -1. 检查数据库连接池配置 -2. 确认预加载数据是否成功(调用 `/indexer/health` 检查) -3. 检查数据库查询性能(SPU、SKU、Option表是否有索引) -4. 考虑使用连接池和缓存优化 - ---- - -## 相关文档 - -- [索引字段说明v2.md](./索引字段说明v2.md) - ES索引字段详细说明 -- [索引字段说明v2-参考表结构.md](./索引字段说明v2-参考表结构.md) - MySQL表结构参考 -- [mappings/search_products.json](../mappings/search_products.json) - ES索引mapping定义 - diff --git a/docs/索引数据接口文档___old.md b/docs/索引数据接口文档___old.md new file mode 100644 index 0000000..8e968b6 --- /dev/null +++ b/docs/索引数据接口文档___old.md @@ -0,0 +1,714 @@ +# 索引数据接口文档 + +本文档说明如何获取需要灌入ES索引的数据,包括全量导入脚本和增量数据获取接口。 + +## 目录 + +1. [租户配置说明](#租户配置说明) +2. [全量数据导入脚本](#全量数据导入脚本) +3. [增量数据获取接口](#增量数据获取接口) +4. [数据格式说明](#数据格式说明) +5. [使用示例](#使用示例) + +--- + +## 租户配置说明 + +### 配置文件位置 + +租户配置存储在统一配置文件 `config/config.yaml` 中,与索引配置放在同一文件。 + +### 配置结构 + +在 `config/config.yaml` 中的 `tenant_config` 部分: + +```yaml +tenant_config: + # 默认配置(未配置的租户使用此配置) + default: + primary_language: "zh" + translate_to_en: true + translate_to_zh: false + # 租户特定配置 + tenants: + "1": + primary_language: "zh" + translate_to_en: true + translate_to_zh: false + "162": + primary_language: "zh" + translate_to_en: false + translate_to_zh: false +``` + +### 配置字段说明 + +| 字段 | 类型 | 说明 | 可选值 | +|------|------|------|--------| +| `primary_language` | string | 主语言(SKU表中title等文本字段的语言) | `"zh"`(中文)或 `"en"`(英文) | +| `translate_to_en` | boolean | 是否需要翻译英文 | `true` 或 `false` | +| `translate_to_zh` | boolean | 是否需要翻译中文 | `true` 或 `false` | + +### 配置规则 + +1. **主语言**:指定SKU表中 `title`、`brief`、`description`、`vendor` 等字段的语言。 + - 如果主语言是 `zh`,这些字段的值会填充到 `title_zh`、`brief_zh` 等字段 + - 如果主语言是 `en`,这些字段的值会填充到 `title_en`、`brief_en` 等字段 + +2. **翻译配置**: + - `translate_to_en: true`:如果主语言是中文,则会将中文内容翻译为英文,填充到 `title_en` 等字段 + - `translate_to_zh: true`:如果主语言是英文,则会将英文内容翻译为中文,填充到 `title_zh` 等字段 + - **注意**:如果主语言本身就是目标语言,则不会触发翻译(例如主语言是英文,`translate_to_en: true` 不会触发翻译) + +3. **默认配置**:如果租户ID不在 `tenants` 中,则使用 `default` 配置。 + +### 配置示例 + +**示例1:中文主语言,需要翻译英文** +```json +{ + "primary_language": "zh", + "translate_to_en": true, + "translate_to_zh": false +} +``` +- SKU表的 `title` 字段(中文)→ `title_zh` +- 翻译服务将中文翻译为英文 → `title_en` + +**示例2:英文主语言,需要翻译中文** +```json +{ + "primary_language": "en", + "translate_to_en": false, + "translate_to_zh": true +} +``` +- SKU表的 `title` 字段(英文)→ `title_en` +- 翻译服务将英文翻译为中文 → `title_zh` + +**示例3:仅使用主语言,不翻译** +```json +{ + "primary_language": "zh", + "translate_to_en": false, + "translate_to_zh": false +} +``` +- SKU表的 `title` 字段(中文)→ `title_zh` +- `title_en` 保持为 `null` + +### 配置更新 + +修改 `config/config.yaml` 中的 `tenant_config` 部分后,需要重启服务才能生效。增量服务会在每次请求时重新加载租户配置(支持热更新)。 + +--- + +## 全量数据导入脚本 + +### 功能说明 + +`scripts/recreate_and_import.py` 是一个全量数据导入脚本,用于: +- 重建ES索引(删除旧索引,使用新的mapping创建新索引) +- 从MySQL数据库批量读取指定租户的所有SPU数据 +- 将数据转换为ES文档格式 +- 批量导入到Elasticsearch + +### 使用方法 + +#### 基本用法 + +```bash +python scripts/recreate_and_import.py \ + --tenant-id 1 \ + --db-host 120.79.247.228 \ + --db-port 3306 \ + --db-database saas \ + --db-username saas \ + --db-password your_password \ + --es-host http://localhost:9200 \ + --batch-size 500 +``` + +#### 参数说明 + +| 参数 | 说明 | 是否必需 | 默认值 | +|------|------|----------|--------| +| `--tenant-id` | 租户ID | **是** | - | +| `--db-host` | MySQL主机地址 | 否(可用环境变量) | 环境变量 `DB_HOST` | +| `--db-port` | MySQL端口 | 否(可用环境变量) | 环境变量 `DB_PORT` 或 3306 | +| `--db-database` | MySQL数据库名 | 否(可用环境变量) | 环境变量 `DB_DATABASE` | +| `--db-username` | MySQL用户名 | 否(可用环境变量) | 环境变量 `DB_USERNAME` | +| `--db-password` | MySQL密码 | 否(可用环境变量) | 环境变量 `DB_PASSWORD` | +| `--es-host` | Elasticsearch地址 | 否(可用环境变量) | 环境变量 `ES_HOST` 或 `http://localhost:9200` | +| `--batch-size` | 批量导入大小 | 否 | 500 | +| `--skip-delete` | 跳过删除旧索引步骤 | 否 | False | + +#### 环境变量配置 + +可以通过环境变量设置数据库和ES连接信息,避免在命令行中暴露敏感信息: + +```bash +export DB_HOST=120.79.247.228 +export DB_PORT=3306 +export DB_DATABASE=saas +export DB_USERNAME=saas +export DB_PASSWORD=your_password +export ES_HOST=http://localhost:9200 + +python scripts/recreate_and_import.py --tenant-id 1 +``` + +#### 执行流程 + +脚本执行分为以下步骤: + +1. **加载mapping配置**:从 `mappings/search_products.json` 加载ES索引mapping +2. **连接Elasticsearch**:验证ES连接可用性 +3. **删除旧索引**(可选):如果索引已存在,删除旧索引(可通过 `--skip-delete` 跳过) +4. **创建新索引**:使用新的mapping创建索引 +5. **连接MySQL**:建立数据库连接 +6. **数据转换和导入**: + - 从MySQL读取SPU、SKU、Option数据 + - 转换为ES文档格式 + - 批量导入到ES + +#### 输出示例 + +``` +============================================================ +重建ES索引并导入数据 +============================================================ + +[1/4] 加载mapping配置... +✓ 成功加载mapping配置 +索引名称: search_products + +[2/4] 连接Elasticsearch... +ES地址: http://localhost:9200 +✓ Elasticsearch连接成功 + +[3/4] 删除旧索引... +发现已存在的索引: search_products +✓ 成功删除索引: search_products + +[4/4] 创建新索引... +创建索引: search_products +✓ 成功创建索引: search_products + +[5/5] 连接MySQL... +MySQL: 120.79.247.228:3306/saas +✓ MySQL连接成功 + +[6/6] 导入数据... +Tenant ID: 1 +批量大小: 500 +正在转换数据... +✓ 转换完成: 1000 个文档 +正在导入数据到ES (批量大小: 500)... +✓ 导入完成 + +============================================================ +导入完成! +============================================================ +成功: 1000 +失败: 0 +耗时: 12.34秒 +``` + +#### 注意事项 + +1. **数据量**:全量导入适合数据量较小或首次导入的场景。对于大数据量,建议使用增量接口。 +2. **索引重建**:默认会删除旧索引,请确保有数据备份。 +3. **性能**:批量大小(`--batch-size`)影响导入性能,建议根据ES集群性能调整(默认500)。 +4. **租户隔离**:每次只能导入一个租户的数据,需要为每个租户分别执行。 + +--- + +## 增量数据获取接口 + +### 功能说明 + +增量数据获取接口提供单个SPU的ES文档数据,用于增量更新ES索引。适用于: +- MySQL数据变更后,实时同步到ES +- 外部Java程序监听MySQL变更事件,调用接口获取数据后推送到ES +- 避免全量重建索引,提高更新效率 + +### 接口地址 + +``` +GET /indexer/spu/{spu_id}?tenant_id={tenant_id} +``` + +### 请求参数 + +| 参数 | 位置 | 类型 | 说明 | 是否必需 | +|------|------|------|------|----------| +| `spu_id` | 路径参数 | string | SPU ID | **是** | +| `tenant_id` | 查询参数 | string | 租户ID | **是** | + +### 请求示例 + +```bash +# cURL +curl -X GET "http://localhost:6004/indexer/spu/123?tenant_id=1" + +# Java (OkHttp) +OkHttpClient client = new OkHttpClient(); +Request request = new Request.Builder() + .url("http://localhost:6004/indexer/spu/123?tenant_id=1") + .get() + .build(); +Response response = client.newCall(request).execute(); +String json = response.body().string(); +``` + +### 响应格式 + +#### 成功响应(200 OK) + +返回完整的ES文档JSON对象,包含所有索引字段: + +```json +{ + "tenant_id": "1", + "spu_id": "123", + "title_zh": "商品标题", + "title_en": null, + "brief_zh": "商品简介", + "brief_en": null, + "description_zh": "商品详细描述", + "description_en": null, + "vendor_zh": "供应商名称", + "vendor_en": null, + "tags": ["标签1", "标签2"], + "category_path_zh": "类目1/类目2/类目3", + "category_path_en": null, + "category_name_zh": "类目名称", + "category_name_en": null, + "category_id": "100", + "category_name": "类目名称", + "category_level": 3, + "category1_name": "类目1", + "category2_name": "类目2", + "category3_name": "类目3", + "option1_name": "颜色", + "option2_name": "尺寸", + "option3_name": null, + "option1_values": ["红色", "蓝色", "绿色"], + "option2_values": ["S", "M", "L"], + "option3_values": [], + "min_price": 99.99, + "max_price": 199.99, + "compare_at_price": 299.99, + "sku_prices": [99.99, 149.99, 199.99], + "sku_weights": [100, 150, 200], + "sku_weight_units": ["g"], + "total_inventory": 500, + "sales": 1000, + "image_url": "https://example.com/image.jpg", + "create_time": "2024-01-01T00:00:00", + "update_time": "2024-01-02T00:00:00", + "skus": [ + { + "sku_id": "456", + "price": 99.99, + "compare_at_price": 149.99, + "sku_code": "SKU001", + "stock": 100, + "weight": 100.0, + "weight_unit": "g", + "option1_value": "红色", + "option2_value": "S", + "option3_value": null, + "image_src": "https://example.com/sku1.jpg" + } + ], + "specifications": [ + { + "sku_id": "456", + "name": "颜色", + "value": "红色" + }, + { + "sku_id": "456", + "name": "尺寸", + "value": "S" + } + ] +} +``` + +#### 错误响应 + +**404 Not Found** - SPU不存在或已删除: +```json +{ + "detail": "SPU 123 not found for tenant_id=1 or has been deleted" +} +``` + +**400 Bad Request** - 缺少必需参数: +```json +{ + "detail": "tenant_id is required" +} +``` + +**500 Internal Server Error** - 服务器内部错误: +```json +{ + "detail": "Internal server error: ..." +} +``` + +**503 Service Unavailable** - 服务未初始化: +```json +{ + "detail": "Incremental indexer service is not initialized. Please check database connection." +} +``` + +### 健康检查接口 + +检查增量索引服务的健康状态: + +``` +GET /indexer/health +``` + +#### 响应示例 + +```json +{ + "status": "available", + "database": "connected", + "preloaded_data": { + "category_mappings": 150, + "searchable_option_dimensions": ["option1", "option2", "option3"] + } +} +``` + +### 性能优化 + +服务在启动时预加载以下公共数据,以提高查询性能: + +1. **分类映射**:所有租户共享的分类ID到名称映射 +2. **配置信息**:搜索配置(如 `searchable_option_dimensions`) + +这些数据在服务启动时一次性加载,后续查询无需重复查询数据库,大幅提升响应速度。 + +### 使用场景 + +#### 场景1:MySQL变更监听 + +外部Java程序使用Canal或Debezium监听MySQL binlog,当检测到商品数据变更时: + +```java +// 伪代码示例 +@EventListener +public void onProductChange(ProductChangeEvent event) { + String tenantId = event.getTenantId(); + String spuId = event.getSpuId(); + + // 调用增量接口获取ES文档数据 + String url = String.format("http://localhost:6004/indexer/spu/%s?tenant_id=%s", spuId, tenantId); + Map esDoc = httpClient.get(url); + + // 推送到ES + elasticsearchClient.index("search_products", esDoc); +} +``` + +#### 场景2:定时同步 + +定时任务扫描变更的商品,批量更新: + +```java +// 伪代码示例 +List changedSpuIds = getChangedSpuIds(); +for (String spuId : changedSpuIds) { + String url = String.format("http://localhost:6004/indexer/spu/%s?tenant_id=%s", spuId, tenantId); + Map esDoc = httpClient.get(url); + elasticsearchClient.index("search_products", esDoc); +} +``` + +### 注意事项 + +1. **服务初始化**:确保API服务已启动,且数据库连接配置正确(`DB_HOST`, `DB_DATABASE`, `DB_USERNAME`, `DB_PASSWORD`)。 +2. **数据一致性**:接口返回的是调用时刻的数据快照,如果MySQL数据在调用后立即变更,可能需要重新调用。 +3. **错误处理**:建议实现重试机制,对于404错误(SPU已删除),应调用ES删除接口。 +4. **性能**:接口已优化,单次查询通常在100ms以内。如需批量获取,建议并发调用。 + +--- + +## 数据格式说明 + +### ES文档结构 + +返回的ES文档结构完全符合 `mappings/search_products.json` 定义的索引结构。主要字段说明: + +| 字段类别 | 字段名 | 类型 | 说明 | +|---------|--------|------|------| +| 基础标识 | `tenant_id` | keyword | 租户ID | +| 基础标识 | `spu_id` | keyword | SPU ID | +| 文本字段 | `title_zh`, `title_en` | text | 标题(中英文) | +| 文本字段 | `brief_zh`, `brief_en` | text | 简介(中英文) | +| 文本字段 | `description_zh`, `description_en` | text | 描述(中英文) | +| 文本字段 | `vendor_zh`, `vendor_en` | text | 供应商(中英文) | +| 类目字段 | `category_path_zh`, `category_path_en` | text | 类目路径(中英文) | +| 类目字段 | `category1_name`, `category2_name`, `category3_name` | keyword | 分层类目名称 | +| 价格字段 | `min_price`, `max_price` | float | 价格范围 | +| 库存字段 | `total_inventory` | long | 总库存 | +| 销量字段 | `sales` | long | 销量 | +| 嵌套字段 | `skus` | nested | SKU列表 | +| 嵌套字段 | `specifications` | nested | 规格列表 | + +详细字段说明请参考:[索引字段说明v2.md](./索引字段说明v2.md) + +### SKU嵌套结构 + +```json +{ + "skus": [ + { + "sku_id": "456", + "price": 99.99, + "compare_at_price": 149.99, + "sku_code": "SKU001", + "stock": 100, + "weight": 100.0, + "weight_unit": "g", + "option1_value": "红色", + "option2_value": "S", + "option3_value": null, + "image_src": "https://example.com/sku1.jpg" + } + ] +} +``` + +### Specifications嵌套结构 + +```json +{ + "specifications": [ + { + "sku_id": "456", + "name": "颜色", + "value": "红色" + }, + { + "sku_id": "456", + "name": "尺寸", + "value": "S" + } + ] +} +``` + +--- + +## 使用示例 + +### 示例1:全量导入 + +```bash +# 设置环境变量 +export DB_HOST=120.79.247.228 +export DB_PORT=3306 +export DB_DATABASE=saas +export DB_USERNAME=saas +export DB_PASSWORD=your_password +export ES_HOST=http://localhost:9200 + +# 执行全量导入 +python scripts/recreate_and_import.py --tenant-id 1 --batch-size 500 +``` + +### 示例2:增量更新(Java) + +```java +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.elasticsearch.client.RestHighLevelClient; + +public class IncrementalIndexer { + private static final String API_BASE_URL = "http://localhost:6002"; + private static final OkHttpClient httpClient = new OkHttpClient(); + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final RestHighLevelClient esClient = createESClient(); + + /** + * 获取SPU的ES文档数据并推送到ES + */ + public void indexSpu(String tenantId, String spuId) throws Exception { + // 1. 调用增量接口获取数据 + String url = String.format("%s/indexer/spu/%s?tenant_id=%s", + API_BASE_URL, spuId, tenantId); + + Request request = new Request.Builder() + .url(url) + .get() + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + if (response.code() == 404) { + // SPU已删除,从ES中删除 + deleteFromES(tenantId, spuId); + return; + } + + if (!response.isSuccessful()) { + throw new RuntimeException("Failed to get SPU data: " + response.code()); + } + + // 2. 解析JSON响应 + String json = response.body().string(); + Map esDoc = objectMapper.readValue(json, Map.class); + + // 3. 推送到ES + IndexRequest indexRequest = new IndexRequest("search_products") + .id(spuId) + .source(esDoc); + + esClient.index(indexRequest, RequestOptions.DEFAULT); + } + } + + /** + * 从ES中删除SPU + */ + private void deleteFromES(String tenantId, String spuId) throws Exception { + DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId); + esClient.delete(deleteRequest, RequestOptions.DEFAULT); + } +} +``` + +### 示例3:批量增量更新 + +```java +/** + * 批量更新多个SPU + */ +public void batchIndexSpus(String tenantId, List spuIds) { + ExecutorService executor = Executors.newFixedThreadPool(10); + List> futures = new ArrayList<>(); + + for (String spuId : spuIds) { + Future future = executor.submit(() -> { + try { + indexSpu(tenantId, spuId); + } catch (Exception e) { + log.error("Failed to index SPU: " + spuId, e); + } + }); + futures.add(future); + } + + // 等待所有任务完成 + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + log.error("Task failed", e); + } + } + + executor.shutdown(); +} +``` + +### 示例4:监听MySQL变更(Canal) + +```java +@CanalEventListener +public class ProductChangeListener { + + @Autowired + private IncrementalIndexer indexer; + + @ListenPoint( + destination = "example", + schema = "saas", + table = {"shoplazza_product_spu", "shoplazza_product_sku"}, + eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE} + ) + public void onEvent(CanalEntry.Entry entry) { + String tableName = entry.getHeader().getTableName(); + String tenantId = extractTenantId(entry); + String spuId = extractSpuId(entry, tableName); + + if (tableName.equals("shoplazza_product_spu")) { + if (entry.getEntryType() == CanalEntry.EntryType.DELETE) { + // SPU删除,从ES删除 + indexer.deleteFromES(tenantId, spuId); + } else { + // SPU新增或更新,重新索引 + indexer.indexSpu(tenantId, spuId); + } + } else if (tableName.equals("shoplazza_product_sku")) { + // SKU变更,需要更新对应的SPU + indexer.indexSpu(tenantId, spuId); + } + } +} +``` + +--- + +## 常见问题 + +### Q1: 全量导入和增量接口的区别? + +- **全量导入**:适合首次导入或数据重建,一次性导入所有数据,但耗时较长。 +- **增量接口**:适合实时同步,按需获取单个SPU数据,响应快速。 + +### Q2: 增量接口返回的数据是否包含向量字段? + +不包含。向量字段(`title_embedding`, `image_embedding`)需要单独生成,不在本接口返回范围内。如需向量字段,需要: +1. 调用本接口获取基础数据 +2. 使用文本/图片编码服务生成向量 +3. 将向量字段添加到文档后推送到ES + +### Q3: 如何处理SPU删除? + +当接口返回404时,表示SPU不存在或已删除。此时应从ES中删除对应文档: + +```java +if (response.code() == 404) { + DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId); + esClient.delete(deleteRequest, RequestOptions.DEFAULT); +} +``` + +### Q4: 服务启动失败,提示数据库连接错误? + +检查环境变量或配置文件中的数据库连接信息: +- `DB_HOST` +- `DB_PORT` +- `DB_DATABASE` +- `DB_USERNAME` +- `DB_PASSWORD` + +确保这些变量已正确设置,且数据库可访问。 + +### Q5: 接口响应慢怎么办? + +1. 检查数据库连接池配置 +2. 确认预加载数据是否成功(调用 `/indexer/health` 检查) +3. 检查数据库查询性能(SPU、SKU、Option表是否有索引) +4. 考虑使用连接池和缓存优化 + +--- + +## 相关文档 + +- [索引字段说明v2.md](./索引字段说明v2.md) - ES索引字段详细说明 +- [索引字段说明v2-参考表结构.md](./索引字段说明v2-参考表结构.md) - MySQL表结构参考 +- [mappings/search_products.json](../mappings/search_products.json) - ES索引mapping定义 + diff --git a/full_bulk.sh b/full_bulk.sh new file mode 100644 index 0000000..0127b11 --- /dev/null +++ b/full_bulk.sh @@ -0,0 +1,2 @@ +#curl -X POST "http://localhost:6004/indexer/reindex" -H "Content-Type: application/json" -d '{"tenant_id":"162","batch_size":500}' +curl -X POST "http://localhost:6004/indexer/reindex" -H "Content-Type: application/json" -d '{"tenant_id":"170","batch_size":500}' -- libgit2 0.21.2