索引数据接口文档.md 20.2 KB

索引数据接口文档

本文档说明如何获取需要灌入ES索引的数据,包括全量导入脚本和增量数据获取接口。

目录

  1. 租户配置说明
  2. 全量数据导入脚本
  3. 增量数据获取接口
  4. 数据格式说明
  5. 使用示例

租户配置说明

配置文件位置

租户配置存储在统一配置文件 config/config.yaml 中,与索引配置放在同一文件。

配置结构

config/config.yaml 中的 tenant_config 部分:

tenant_config:
  # 默认配置(未配置的租户使用此配置)
  default:
    primary_language: "zh"
    translate_to_en: true
    translate_to_zh: false
  # 租户特定配置
  tenants:
    "1":
      primary_language: "zh"
      translate_to_en: true
      translate_to_zh: false
    "162":
      primary_language: "zh"
      translate_to_en: false
      translate_to_zh: false

配置字段说明

字段 类型 说明 可选值
primary_language string 主语言(SKU表中title等文本字段的语言) "zh"(中文)或 "en"(英文)
translate_to_en boolean 是否需要翻译英文 truefalse
translate_to_zh boolean 是否需要翻译中文 truefalse

配置规则

  1. 主语言:指定SKU表中 titlebriefdescriptionvendor 等字段的语言。

    • 如果主语言是 zh,这些字段的值会填充到 title_zhbrief_zh 等字段
    • 如果主语言是 en,这些字段的值会填充到 title_enbrief_en 等字段
  2. 翻译配置

    • translate_to_en: true:如果主语言是中文,则会将中文内容翻译为英文,填充到 title_en 等字段
    • translate_to_zh: true:如果主语言是英文,则会将英文内容翻译为中文,填充到 title_zh 等字段
    • 注意:如果主语言本身就是目标语言,则不会触发翻译(例如主语言是英文,translate_to_en: true 不会触发翻译)
  3. 默认配置:如果租户ID不在 tenants 中,则使用 default 配置。

配置示例

示例1:中文主语言,需要翻译英文

{
  "primary_language": "zh",
  "translate_to_en": true,
  "translate_to_zh": false
}
  • SKU表的 title 字段(中文)→ title_zh
  • 翻译服务将中文翻译为英文 → title_en

示例2:英文主语言,需要翻译中文

{
  "primary_language": "en",
  "translate_to_en": false,
  "translate_to_zh": true
}
  • SKU表的 title 字段(英文)→ title_en
  • 翻译服务将英文翻译为中文 → title_zh

示例3:仅使用主语言,不翻译

{
  "primary_language": "zh",
  "translate_to_en": false,
  "translate_to_zh": false
}
  • SKU表的 title 字段(中文)→ title_zh
  • title_en 保持为 null

配置更新

修改 config/config.yaml 中的 tenant_config 部分后,需要重启服务才能生效。增量服务会在每次请求时重新加载租户配置(支持热更新)。


全量数据导入脚本

功能说明

scripts/recreate_and_import.py 是一个全量数据导入脚本,用于:

  • 重建ES索引(删除旧索引,使用新的mapping创建新索引)
  • 从MySQL数据库批量读取指定租户的所有SPU数据
  • 将数据转换为ES文档格式
  • 批量导入到Elasticsearch

使用方法

基本用法

python scripts/recreate_and_import.py \
    --tenant-id 1 \
    --db-host 120.79.247.228 \
    --db-port 3306 \
    --db-database saas \
    --db-username saas \
    --db-password your_password \
    --es-host http://localhost:9200 \
    --batch-size 500

参数说明

参数 说明 是否必需 默认值
--tenant-id 租户ID -
--db-host MySQL主机地址 否(可用环境变量) 环境变量 DB_HOST
--db-port MySQL端口 否(可用环境变量) 环境变量 DB_PORT 或 3306
--db-database MySQL数据库名 否(可用环境变量) 环境变量 DB_DATABASE
--db-username MySQL用户名 否(可用环境变量) 环境变量 DB_USERNAME
--db-password MySQL密码 否(可用环境变量) 环境变量 DB_PASSWORD
--es-host Elasticsearch地址 否(可用环境变量) 环境变量 ES_HOSThttp://localhost:9200
--batch-size 批量导入大小 500
--skip-delete 跳过删除旧索引步骤 False

环境变量配置

可以通过环境变量设置数据库和ES连接信息,避免在命令行中暴露敏感信息:

export DB_HOST=120.79.247.228
export DB_PORT=3306
export DB_DATABASE=saas
export DB_USERNAME=saas
export DB_PASSWORD=your_password
export ES_HOST=http://localhost:9200

python scripts/recreate_and_import.py --tenant-id 1

执行流程

脚本执行分为以下步骤:

  1. 加载mapping配置:从 mappings/search_products.json 加载ES索引mapping
  2. 连接Elasticsearch:验证ES连接可用性
  3. 删除旧索引(可选):如果索引已存在,删除旧索引(可通过 --skip-delete 跳过)
  4. 创建新索引:使用新的mapping创建索引
  5. 连接MySQL:建立数据库连接
  6. 数据转换和导入
    • 从MySQL读取SPU、SKU、Option数据
    • 转换为ES文档格式
    • 批量导入到ES

输出示例

============================================================
重建ES索引并导入数据
============================================================

[1/4] 加载mapping配置...
✓ 成功加载mapping配置
索引名称: search_products

[2/4] 连接Elasticsearch...
ES地址: http://localhost:9200
✓ Elasticsearch连接成功

[3/4] 删除旧索引...
发现已存在的索引: search_products
✓ 成功删除索引: search_products

[4/4] 创建新索引...
创建索引: search_products
✓ 成功创建索引: search_products

[5/5] 连接MySQL...
MySQL: 120.79.247.228:3306/saas
✓ MySQL连接成功

[6/6] 导入数据...
Tenant ID: 1
批量大小: 500
正在转换数据...
✓ 转换完成: 1000 个文档
正在导入数据到ES (批量大小: 500)...
✓ 导入完成

============================================================
导入完成!
============================================================
成功: 1000
失败: 0
耗时: 12.34秒

注意事项

  1. 数据量:全量导入适合数据量较小或首次导入的场景。对于大数据量,建议使用增量接口。
  2. 索引重建:默认会删除旧索引,请确保有数据备份。
  3. 性能:批量大小(--batch-size)影响导入性能,建议根据ES集群性能调整(默认500)。
  4. 租户隔离:每次只能导入一个租户的数据,需要为每个租户分别执行。

增量数据获取接口

功能说明

增量数据获取接口提供单个SPU的ES文档数据,用于增量更新ES索引。适用于:

  • MySQL数据变更后,实时同步到ES
  • 外部Java程序监听MySQL变更事件,调用接口获取数据后推送到ES
  • 避免全量重建索引,提高更新效率

接口地址

GET /indexer/spu/{spu_id}?tenant_id={tenant_id}

请求参数

参数 位置 类型 说明 是否必需
spu_id 路径参数 string SPU ID
tenant_id 查询参数 string 租户ID

请求示例

# cURL
curl -X GET "http://localhost:6002/indexer/spu/123?tenant_id=1"

# Java (OkHttp)
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
    .url("http://localhost:6002/indexer/spu/123?tenant_id=1")
    .get()
    .build();
Response response = client.newCall(request).execute();
String json = response.body().string();

响应格式

成功响应(200 OK)

返回完整的ES文档JSON对象,包含所有索引字段:

{
  "tenant_id": "1",
  "spu_id": "123",
  "title_zh": "商品标题",
  "title_en": null,
  "brief_zh": "商品简介",
  "brief_en": null,
  "description_zh": "商品详细描述",
  "description_en": null,
  "vendor_zh": "供应商名称",
  "vendor_en": null,
  "tags": ["标签1", "标签2"],
  "category_path_zh": "类目1/类目2/类目3",
  "category_path_en": null,
  "category_name_zh": "类目名称",
  "category_name_en": null,
  "category_id": "100",
  "category_name": "类目名称",
  "category_level": 3,
  "category1_name": "类目1",
  "category2_name": "类目2",
  "category3_name": "类目3",
  "option1_name": "颜色",
  "option2_name": "尺寸",
  "option3_name": null,
  "option1_values": ["红色", "蓝色", "绿色"],
  "option2_values": ["S", "M", "L"],
  "option3_values": [],
  "min_price": 99.99,
  "max_price": 199.99,
  "compare_at_price": 299.99,
  "sku_prices": [99.99, 149.99, 199.99],
  "sku_weights": [100, 150, 200],
  "sku_weight_units": ["g"],
  "total_inventory": 500,
  "sales": 1000,
  "image_url": "https://example.com/image.jpg",
  "create_time": "2024-01-01T00:00:00",
  "update_time": "2024-01-02T00:00:00",
  "skus": [
    {
      "sku_id": "456",
      "price": 99.99,
      "compare_at_price": 149.99,
      "sku_code": "SKU001",
      "stock": 100,
      "weight": 100.0,
      "weight_unit": "g",
      "option1_value": "红色",
      "option2_value": "S",
      "option3_value": null,
      "image_src": "https://example.com/sku1.jpg"
    }
  ],
  "specifications": [
    {
      "sku_id": "456",
      "name": "颜色",
      "value": "红色"
    },
    {
      "sku_id": "456",
      "name": "尺寸",
      "value": "S"
    }
  ]
}

错误响应

404 Not Found - SPU不存在或已删除:

{
  "detail": "SPU 123 not found for tenant_id=1 or has been deleted"
}

400 Bad Request - 缺少必需参数:

{
  "detail": "tenant_id is required"
}

500 Internal Server Error - 服务器内部错误:

{
  "detail": "Internal server error: ..."
}

503 Service Unavailable - 服务未初始化:

{
  "detail": "Incremental indexer service is not initialized. Please check database connection."
}

健康检查接口

检查增量索引服务的健康状态:

GET /indexer/health

响应示例

{
  "status": "available",
  "database": "connected",
  "preloaded_data": {
    "category_mappings": 150,
    "searchable_option_dimensions": ["option1", "option2", "option3"]
  }
}

性能优化

服务在启动时预加载以下公共数据,以提高查询性能:

  1. 分类映射:所有租户共享的分类ID到名称映射
  2. 配置信息:搜索配置(如 searchable_option_dimensions

这些数据在服务启动时一次性加载,后续查询无需重复查询数据库,大幅提升响应速度。

使用场景

场景1:MySQL变更监听

外部Java程序使用Canal或Debezium监听MySQL binlog,当检测到商品数据变更时:

// 伪代码示例
@EventListener
public void onProductChange(ProductChangeEvent event) {
    String tenantId = event.getTenantId();
    String spuId = event.getSpuId();

    // 调用增量接口获取ES文档数据
    String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId);
    Map<String, Object> esDoc = httpClient.get(url);

    // 推送到ES
    elasticsearchClient.index("search_products", esDoc);
}

场景2:定时同步

定时任务扫描变更的商品,批量更新:

// 伪代码示例
List<String> changedSpuIds = getChangedSpuIds();
for (String spuId : changedSpuIds) {
    String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId);
    Map<String, Object> esDoc = httpClient.get(url);
    elasticsearchClient.index("search_products", esDoc);
}

注意事项

  1. 服务初始化:确保API服务已启动,且数据库连接配置正确(DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD)。
  2. 数据一致性:接口返回的是调用时刻的数据快照,如果MySQL数据在调用后立即变更,可能需要重新调用。
  3. 错误处理:建议实现重试机制,对于404错误(SPU已删除),应调用ES删除接口。
  4. 性能:接口已优化,单次查询通常在100ms以内。如需批量获取,建议并发调用。

数据格式说明

ES文档结构

返回的ES文档结构完全符合 mappings/search_products.json 定义的索引结构。主要字段说明:

字段类别 字段名 类型 说明
基础标识 tenant_id keyword 租户ID
基础标识 spu_id keyword SPU ID
文本字段 title_zh, title_en text 标题(中英文)
文本字段 brief_zh, brief_en text 简介(中英文)
文本字段 description_zh, description_en text 描述(中英文)
文本字段 vendor_zh, vendor_en text 供应商(中英文)
类目字段 category_path_zh, category_path_en text 类目路径(中英文)
类目字段 category1_name, category2_name, category3_name keyword 分层类目名称
价格字段 min_price, max_price float 价格范围
库存字段 total_inventory long 总库存
销量字段 sales long 销量
嵌套字段 skus nested SKU列表
嵌套字段 specifications nested 规格列表

详细字段说明请参考:索引字段说明v2.md

SKU嵌套结构

{
  "skus": [
    {
      "sku_id": "456",
      "price": 99.99,
      "compare_at_price": 149.99,
      "sku_code": "SKU001",
      "stock": 100,
      "weight": 100.0,
      "weight_unit": "g",
      "option1_value": "红色",
      "option2_value": "S",
      "option3_value": null,
      "image_src": "https://example.com/sku1.jpg"
    }
  ]
}

Specifications嵌套结构

{
  "specifications": [
    {
      "sku_id": "456",
      "name": "颜色",
      "value": "红色"
    },
    {
      "sku_id": "456",
      "name": "尺寸",
      "value": "S"
    }
  ]
}

使用示例

示例1:全量导入

# 设置环境变量
export DB_HOST=120.79.247.228
export DB_PORT=3306
export DB_DATABASE=saas
export DB_USERNAME=saas
export DB_PASSWORD=your_password
export ES_HOST=http://localhost:9200

# 执行全量导入
python scripts/recreate_and_import.py --tenant-id 1 --batch-size 500

示例2:增量更新(Java)

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.client.RestHighLevelClient;

public class IncrementalIndexer {
    private static final String API_BASE_URL = "http://localhost:6002";
    private static final OkHttpClient httpClient = new OkHttpClient();
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final RestHighLevelClient esClient = createESClient();

    /**
     * 获取SPU的ES文档数据并推送到ES
     */
    public void indexSpu(String tenantId, String spuId) throws Exception {
        // 1. 调用增量接口获取数据
        String url = String.format("%s/indexer/spu/%s?tenant_id=%s", 
            API_BASE_URL, spuId, tenantId);

        Request request = new Request.Builder()
            .url(url)
            .get()
            .build();

        try (Response response = httpClient.newCall(request).execute()) {
            if (response.code() == 404) {
                // SPU已删除,从ES中删除
                deleteFromES(tenantId, spuId);
                return;
            }

            if (!response.isSuccessful()) {
                throw new RuntimeException("Failed to get SPU data: " + response.code());
            }

            // 2. 解析JSON响应
            String json = response.body().string();
            Map<String, Object> esDoc = objectMapper.readValue(json, Map.class);

            // 3. 推送到ES
            IndexRequest indexRequest = new IndexRequest("search_products")
                .id(spuId)
                .source(esDoc);

            esClient.index(indexRequest, RequestOptions.DEFAULT);
        }
    }

    /**
     * 从ES中删除SPU
     */
    private void deleteFromES(String tenantId, String spuId) throws Exception {
        DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId);
        esClient.delete(deleteRequest, RequestOptions.DEFAULT);
    }
}

示例3:批量增量更新

/**
 * 批量更新多个SPU
 */
public void batchIndexSpus(String tenantId, List<String> spuIds) {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    List<Future<?>> futures = new ArrayList<>();

    for (String spuId : spuIds) {
        Future<?> future = executor.submit(() -> {
            try {
                indexSpu(tenantId, spuId);
            } catch (Exception e) {
                log.error("Failed to index SPU: " + spuId, e);
            }
        });
        futures.add(future);
    }

    // 等待所有任务完成
    for (Future<?> future : futures) {
        try {
            future.get();
        } catch (Exception e) {
            log.error("Task failed", e);
        }
    }

    executor.shutdown();
}

示例4:监听MySQL变更(Canal)

@CanalEventListener
public class ProductChangeListener {

    @Autowired
    private IncrementalIndexer indexer;

    @ListenPoint(
        destination = "example",
        schema = "saas",
        table = {"shoplazza_product_spu", "shoplazza_product_sku"},
        eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE}
    )
    public void onEvent(CanalEntry.Entry entry) {
        String tableName = entry.getHeader().getTableName();
        String tenantId = extractTenantId(entry);
        String spuId = extractSpuId(entry, tableName);

        if (tableName.equals("shoplazza_product_spu")) {
            if (entry.getEntryType() == CanalEntry.EntryType.DELETE) {
                // SPU删除,从ES删除
                indexer.deleteFromES(tenantId, spuId);
            } else {
                // SPU新增或更新,重新索引
                indexer.indexSpu(tenantId, spuId);
            }
        } else if (tableName.equals("shoplazza_product_sku")) {
            // SKU变更,需要更新对应的SPU
            indexer.indexSpu(tenantId, spuId);
        }
    }
}

常见问题

Q1: 全量导入和增量接口的区别?

  • 全量导入:适合首次导入或数据重建,一次性导入所有数据,但耗时较长。
  • 增量接口:适合实时同步,按需获取单个SPU数据,响应快速。

Q2: 增量接口返回的数据是否包含向量字段?

不包含。向量字段(title_embedding, image_embedding)需要单独生成,不在本接口返回范围内。如需向量字段,需要:

  1. 调用本接口获取基础数据
  2. 使用文本/图片编码服务生成向量
  3. 将向量字段添加到文档后推送到ES

Q3: 如何处理SPU删除?

当接口返回404时,表示SPU不存在或已删除。此时应从ES中删除对应文档:

if (response.code() == 404) {
    DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId);
    esClient.delete(deleteRequest, RequestOptions.DEFAULT);
}

Q4: 服务启动失败,提示数据库连接错误?

检查环境变量或配置文件中的数据库连接信息:

  • DB_HOST
  • DB_PORT
  • DB_DATABASE
  • DB_USERNAME
  • DB_PASSWORD

确保这些变量已正确设置,且数据库可访问。

Q5: 接口响应慢怎么办?

  1. 检查数据库连接池配置
  2. 确认预加载数据是否成功(调用 /indexer/health 检查)
  3. 检查数据库查询性能(SPU、SKU、Option表是否有索引)
  4. 考虑使用连接池和缓存优化

相关文档