索引数据接口文档
本文档说明如何获取需要灌入ES索引的数据,包括全量导入脚本和增量数据获取接口。
目录
租户配置说明
配置文件位置
租户配置存储在统一配置文件 config/config.yaml 中,与索引配置放在同一文件。
配置结构
在 config/config.yaml 中的 tenant_config 部分:
tenant_config:
# 默认配置(未配置的租户使用此配置)
default:
primary_language: "zh"
translate_to_en: true
translate_to_zh: false
# 租户特定配置
tenants:
"1":
primary_language: "zh"
translate_to_en: true
translate_to_zh: false
"162":
primary_language: "zh"
translate_to_en: false
translate_to_zh: false
配置字段说明
| 字段 | 类型 | 说明 | 可选值 |
|---|---|---|---|
primary_language |
string | 主语言(SKU表中title等文本字段的语言) | "zh"(中文)或 "en"(英文) |
translate_to_en |
boolean | 是否需要翻译英文 | true 或 false |
translate_to_zh |
boolean | 是否需要翻译中文 | true 或 false |
配置规则
主语言:指定SKU表中
title、brief、description、vendor等字段的语言。- 如果主语言是
zh,这些字段的值会填充到title_zh、brief_zh等字段 - 如果主语言是
en,这些字段的值会填充到title_en、brief_en等字段
- 如果主语言是
翻译配置:
translate_to_en: true:如果主语言是中文,则会将中文内容翻译为英文,填充到title_en等字段translate_to_zh: true:如果主语言是英文,则会将英文内容翻译为中文,填充到title_zh等字段- 注意:如果主语言本身就是目标语言,则不会触发翻译(例如主语言是英文,
translate_to_en: true不会触发翻译)
默认配置:如果租户ID不在
tenants中,则使用default配置。
配置示例
示例1:中文主语言,需要翻译英文
{
"primary_language": "zh",
"translate_to_en": true,
"translate_to_zh": false
}
- SKU表的
title字段(中文)→title_zh - 翻译服务将中文翻译为英文 →
title_en
示例2:英文主语言,需要翻译中文
{
"primary_language": "en",
"translate_to_en": false,
"translate_to_zh": true
}
- SKU表的
title字段(英文)→title_en - 翻译服务将英文翻译为中文 →
title_zh
示例3:仅使用主语言,不翻译
{
"primary_language": "zh",
"translate_to_en": false,
"translate_to_zh": false
}
- SKU表的
title字段(中文)→title_zh title_en保持为null
配置更新
修改 config/config.yaml 中的 tenant_config 部分后,需要重启服务才能生效。增量服务会在每次请求时重新加载租户配置(支持热更新)。
全量数据导入脚本
功能说明
scripts/recreate_and_import.py 是一个全量数据导入脚本,用于:
- 重建ES索引(删除旧索引,使用新的mapping创建新索引)
- 从MySQL数据库批量读取指定租户的所有SPU数据
- 将数据转换为ES文档格式
- 批量导入到Elasticsearch
使用方法
基本用法
python scripts/recreate_and_import.py \
--tenant-id 1 \
--db-host 120.79.247.228 \
--db-port 3306 \
--db-database saas \
--db-username saas \
--db-password your_password \
--es-host http://localhost:9200 \
--batch-size 500
参数说明
| 参数 | 说明 | 是否必需 | 默认值 |
|---|---|---|---|
--tenant-id |
租户ID | 是 | - |
--db-host |
MySQL主机地址 | 否(可用环境变量) | 环境变量 DB_HOST |
--db-port |
MySQL端口 | 否(可用环境变量) | 环境变量 DB_PORT 或 3306 |
--db-database |
MySQL数据库名 | 否(可用环境变量) | 环境变量 DB_DATABASE |
--db-username |
MySQL用户名 | 否(可用环境变量) | 环境变量 DB_USERNAME |
--db-password |
MySQL密码 | 否(可用环境变量) | 环境变量 DB_PASSWORD |
--es-host |
Elasticsearch地址 | 否(可用环境变量) | 环境变量 ES_HOST 或 http://localhost:9200 |
--batch-size |
批量导入大小 | 否 | 500 |
--skip-delete |
跳过删除旧索引步骤 | 否 | False |
环境变量配置
可以通过环境变量设置数据库和ES连接信息,避免在命令行中暴露敏感信息:
export DB_HOST=120.79.247.228
export DB_PORT=3306
export DB_DATABASE=saas
export DB_USERNAME=saas
export DB_PASSWORD=your_password
export ES_HOST=http://localhost:9200
python scripts/recreate_and_import.py --tenant-id 1
执行流程
脚本执行分为以下步骤:
- 加载mapping配置:从
mappings/search_products.json加载ES索引mapping - 连接Elasticsearch:验证ES连接可用性
- 删除旧索引(可选):如果索引已存在,删除旧索引(可通过
--skip-delete跳过) - 创建新索引:使用新的mapping创建索引
- 连接MySQL:建立数据库连接
- 数据转换和导入:
- 从MySQL读取SPU、SKU、Option数据
- 转换为ES文档格式
- 批量导入到ES
输出示例
============================================================
重建ES索引并导入数据
============================================================
[1/4] 加载mapping配置...
✓ 成功加载mapping配置
索引名称: search_products
[2/4] 连接Elasticsearch...
ES地址: http://localhost:9200
✓ Elasticsearch连接成功
[3/4] 删除旧索引...
发现已存在的索引: search_products
✓ 成功删除索引: search_products
[4/4] 创建新索引...
创建索引: search_products
✓ 成功创建索引: search_products
[5/5] 连接MySQL...
MySQL: 120.79.247.228:3306/saas
✓ MySQL连接成功
[6/6] 导入数据...
Tenant ID: 1
批量大小: 500
正在转换数据...
✓ 转换完成: 1000 个文档
正在导入数据到ES (批量大小: 500)...
✓ 导入完成
============================================================
导入完成!
============================================================
成功: 1000
失败: 0
耗时: 12.34秒
注意事项
- 数据量:全量导入适合数据量较小或首次导入的场景。对于大数据量,建议使用增量接口。
- 索引重建:默认会删除旧索引,请确保有数据备份。
- 性能:批量大小(
--batch-size)影响导入性能,建议根据ES集群性能调整(默认500)。 - 租户隔离:每次只能导入一个租户的数据,需要为每个租户分别执行。
增量数据获取接口
功能说明
增量数据获取接口提供单个SPU的ES文档数据,用于增量更新ES索引。适用于:
- MySQL数据变更后,实时同步到ES
- 外部Java程序监听MySQL变更事件,调用接口获取数据后推送到ES
- 避免全量重建索引,提高更新效率
接口地址
GET /indexer/spu/{spu_id}?tenant_id={tenant_id}
请求参数
| 参数 | 位置 | 类型 | 说明 | 是否必需 |
|---|---|---|---|---|
spu_id |
路径参数 | string | SPU ID | 是 |
tenant_id |
查询参数 | string | 租户ID | 是 |
请求示例
# cURL
curl -X GET "http://localhost:6002/indexer/spu/123?tenant_id=1"
# Java (OkHttp)
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://localhost:6002/indexer/spu/123?tenant_id=1")
.get()
.build();
Response response = client.newCall(request).execute();
String json = response.body().string();
响应格式
成功响应(200 OK)
返回完整的ES文档JSON对象,包含所有索引字段:
{
"tenant_id": "1",
"spu_id": "123",
"title_zh": "商品标题",
"title_en": null,
"brief_zh": "商品简介",
"brief_en": null,
"description_zh": "商品详细描述",
"description_en": null,
"vendor_zh": "供应商名称",
"vendor_en": null,
"tags": ["标签1", "标签2"],
"category_path_zh": "类目1/类目2/类目3",
"category_path_en": null,
"category_name_zh": "类目名称",
"category_name_en": null,
"category_id": "100",
"category_name": "类目名称",
"category_level": 3,
"category1_name": "类目1",
"category2_name": "类目2",
"category3_name": "类目3",
"option1_name": "颜色",
"option2_name": "尺寸",
"option3_name": null,
"option1_values": ["红色", "蓝色", "绿色"],
"option2_values": ["S", "M", "L"],
"option3_values": [],
"min_price": 99.99,
"max_price": 199.99,
"compare_at_price": 299.99,
"sku_prices": [99.99, 149.99, 199.99],
"sku_weights": [100, 150, 200],
"sku_weight_units": ["g"],
"total_inventory": 500,
"sales": 1000,
"image_url": "https://example.com/image.jpg",
"create_time": "2024-01-01T00:00:00",
"update_time": "2024-01-02T00:00:00",
"skus": [
{
"sku_id": "456",
"price": 99.99,
"compare_at_price": 149.99,
"sku_code": "SKU001",
"stock": 100,
"weight": 100.0,
"weight_unit": "g",
"option1_value": "红色",
"option2_value": "S",
"option3_value": null,
"image_src": "https://example.com/sku1.jpg"
}
],
"specifications": [
{
"sku_id": "456",
"name": "颜色",
"value": "红色"
},
{
"sku_id": "456",
"name": "尺寸",
"value": "S"
}
]
}
错误响应
404 Not Found - SPU不存在或已删除:
{
"detail": "SPU 123 not found for tenant_id=1 or has been deleted"
}
400 Bad Request - 缺少必需参数:
{
"detail": "tenant_id is required"
}
500 Internal Server Error - 服务器内部错误:
{
"detail": "Internal server error: ..."
}
503 Service Unavailable - 服务未初始化:
{
"detail": "Incremental indexer service is not initialized. Please check database connection."
}
健康检查接口
检查增量索引服务的健康状态:
GET /indexer/health
响应示例
{
"status": "available",
"database": "connected",
"preloaded_data": {
"category_mappings": 150,
"searchable_option_dimensions": ["option1", "option2", "option3"]
}
}
性能优化
服务在启动时预加载以下公共数据,以提高查询性能:
- 分类映射:所有租户共享的分类ID到名称映射
- 配置信息:搜索配置(如
searchable_option_dimensions)
这些数据在服务启动时一次性加载,后续查询无需重复查询数据库,大幅提升响应速度。
使用场景
场景1:MySQL变更监听
外部Java程序使用Canal或Debezium监听MySQL binlog,当检测到商品数据变更时:
// 伪代码示例
@EventListener
public void onProductChange(ProductChangeEvent event) {
String tenantId = event.getTenantId();
String spuId = event.getSpuId();
// 调用增量接口获取ES文档数据
String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId);
Map<String, Object> esDoc = httpClient.get(url);
// 推送到ES
elasticsearchClient.index("search_products", esDoc);
}
场景2:定时同步
定时任务扫描变更的商品,批量更新:
// 伪代码示例
List<String> changedSpuIds = getChangedSpuIds();
for (String spuId : changedSpuIds) {
String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId);
Map<String, Object> esDoc = httpClient.get(url);
elasticsearchClient.index("search_products", esDoc);
}
注意事项
- 服务初始化:确保API服务已启动,且数据库连接配置正确(
DB_HOST,DB_DATABASE,DB_USERNAME,DB_PASSWORD)。 - 数据一致性:接口返回的是调用时刻的数据快照,如果MySQL数据在调用后立即变更,可能需要重新调用。
- 错误处理:建议实现重试机制,对于404错误(SPU已删除),应调用ES删除接口。
- 性能:接口已优化,单次查询通常在100ms以内。如需批量获取,建议并发调用。
数据格式说明
ES文档结构
返回的ES文档结构完全符合 mappings/search_products.json 定义的索引结构。主要字段说明:
| 字段类别 | 字段名 | 类型 | 说明 |
|---|---|---|---|
| 基础标识 | tenant_id |
keyword | 租户ID |
| 基础标识 | spu_id |
keyword | SPU ID |
| 文本字段 | title_zh, title_en |
text | 标题(中英文) |
| 文本字段 | brief_zh, brief_en |
text | 简介(中英文) |
| 文本字段 | description_zh, description_en |
text | 描述(中英文) |
| 文本字段 | vendor_zh, vendor_en |
text | 供应商(中英文) |
| 类目字段 | category_path_zh, category_path_en |
text | 类目路径(中英文) |
| 类目字段 | category1_name, category2_name, category3_name |
keyword | 分层类目名称 |
| 价格字段 | min_price, max_price |
float | 价格范围 |
| 库存字段 | total_inventory |
long | 总库存 |
| 销量字段 | sales |
long | 销量 |
| 嵌套字段 | skus |
nested | SKU列表 |
| 嵌套字段 | specifications |
nested | 规格列表 |
详细字段说明请参考:索引字段说明v2.md
SKU嵌套结构
{
"skus": [
{
"sku_id": "456",
"price": 99.99,
"compare_at_price": 149.99,
"sku_code": "SKU001",
"stock": 100,
"weight": 100.0,
"weight_unit": "g",
"option1_value": "红色",
"option2_value": "S",
"option3_value": null,
"image_src": "https://example.com/sku1.jpg"
}
]
}
Specifications嵌套结构
{
"specifications": [
{
"sku_id": "456",
"name": "颜色",
"value": "红色"
},
{
"sku_id": "456",
"name": "尺寸",
"value": "S"
}
]
}
使用示例
示例1:全量导入
# 设置环境变量
export DB_HOST=120.79.247.228
export DB_PORT=3306
export DB_DATABASE=saas
export DB_USERNAME=saas
export DB_PASSWORD=your_password
export ES_HOST=http://localhost:9200
# 执行全量导入
python scripts/recreate_and_import.py --tenant-id 1 --batch-size 500
示例2:增量更新(Java)
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.client.RestHighLevelClient;
public class IncrementalIndexer {
private static final String API_BASE_URL = "http://localhost:6002";
private static final OkHttpClient httpClient = new OkHttpClient();
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final RestHighLevelClient esClient = createESClient();
/**
* 获取SPU的ES文档数据并推送到ES
*/
public void indexSpu(String tenantId, String spuId) throws Exception {
// 1. 调用增量接口获取数据
String url = String.format("%s/indexer/spu/%s?tenant_id=%s",
API_BASE_URL, spuId, tenantId);
Request request = new Request.Builder()
.url(url)
.get()
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (response.code() == 404) {
// SPU已删除,从ES中删除
deleteFromES(tenantId, spuId);
return;
}
if (!response.isSuccessful()) {
throw new RuntimeException("Failed to get SPU data: " + response.code());
}
// 2. 解析JSON响应
String json = response.body().string();
Map<String, Object> esDoc = objectMapper.readValue(json, Map.class);
// 3. 推送到ES
IndexRequest indexRequest = new IndexRequest("search_products")
.id(spuId)
.source(esDoc);
esClient.index(indexRequest, RequestOptions.DEFAULT);
}
}
/**
* 从ES中删除SPU
*/
private void deleteFromES(String tenantId, String spuId) throws Exception {
DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId);
esClient.delete(deleteRequest, RequestOptions.DEFAULT);
}
}
示例3:批量增量更新
/**
* 批量更新多个SPU
*/
public void batchIndexSpus(String tenantId, List<String> spuIds) {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<?>> futures = new ArrayList<>();
for (String spuId : spuIds) {
Future<?> future = executor.submit(() -> {
try {
indexSpu(tenantId, spuId);
} catch (Exception e) {
log.error("Failed to index SPU: " + spuId, e);
}
});
futures.add(future);
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
log.error("Task failed", e);
}
}
executor.shutdown();
}
示例4:监听MySQL变更(Canal)
@CanalEventListener
public class ProductChangeListener {
@Autowired
private IncrementalIndexer indexer;
@ListenPoint(
destination = "example",
schema = "saas",
table = {"shoplazza_product_spu", "shoplazza_product_sku"},
eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE}
)
public void onEvent(CanalEntry.Entry entry) {
String tableName = entry.getHeader().getTableName();
String tenantId = extractTenantId(entry);
String spuId = extractSpuId(entry, tableName);
if (tableName.equals("shoplazza_product_spu")) {
if (entry.getEntryType() == CanalEntry.EntryType.DELETE) {
// SPU删除,从ES删除
indexer.deleteFromES(tenantId, spuId);
} else {
// SPU新增或更新,重新索引
indexer.indexSpu(tenantId, spuId);
}
} else if (tableName.equals("shoplazza_product_sku")) {
// SKU变更,需要更新对应的SPU
indexer.indexSpu(tenantId, spuId);
}
}
}
常见问题
Q1: 全量导入和增量接口的区别?
- 全量导入:适合首次导入或数据重建,一次性导入所有数据,但耗时较长。
- 增量接口:适合实时同步,按需获取单个SPU数据,响应快速。
Q2: 增量接口返回的数据是否包含向量字段?
不包含。向量字段(title_embedding, image_embedding)需要单独生成,不在本接口返回范围内。如需向量字段,需要:
- 调用本接口获取基础数据
- 使用文本/图片编码服务生成向量
- 将向量字段添加到文档后推送到ES
Q3: 如何处理SPU删除?
当接口返回404时,表示SPU不存在或已删除。此时应从ES中删除对应文档:
if (response.code() == 404) {
DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId);
esClient.delete(deleteRequest, RequestOptions.DEFAULT);
}
Q4: 服务启动失败,提示数据库连接错误?
检查环境变量或配置文件中的数据库连接信息:
DB_HOSTDB_PORTDB_DATABASEDB_USERNAMEDB_PASSWORD
确保这些变量已正确设置,且数据库可访问。
Q5: 接口响应慢怎么办?
- 检查数据库连接池配置
- 确认预加载数据是否成功(调用
/indexer/health检查) - 检查数据库查询性能(SPU、SKU、Option表是否有索引)
- 考虑使用连接池和缓存优化
相关文档
- 索引字段说明v2.md - ES索引字段详细说明
- 索引字段说明v2-参考表结构.md - MySQL表结构参考
- mappings/search_products.json - ES索引mapping定义