MySQL 到 Elasticsearch 文档映射说明
1. 概述
本文档详细说明从 MySQL 数据库到 Elasticsearch 的完整数据转换流程,包括 SQL 查询逻辑、字段映射规则、以及复杂字段(类别、多款式)的处理方法。
1.1 数据流转架构
MySQL 数据库
↓
[SQL 查询] → SPU表、SKU表、Option表
↓
[数据加载] → Pandas DataFrame
↓
[文档转换] → ES 文档字典
↓
[批量写入] → Elasticsearch
1.2 核心组件
- IncrementalIndexerService: 增量索引服务,负责数据获取和索引
- SPUDocumentTransformer: 文档转换器,负责将数据库记录转换为 ES 文档
- BulkIndexer: 批量写入器,负责将文档批量写入 ES
2. 数据源表结构
2.1 SPU 表(shoplazza_product_spu)
SPU(Standard Product Unit)表存储商品的基本信息。
主要字段:
| 字段名 | 类型 | 说明 |
|---|---|---|
id |
BIGINT | SPU主键ID |
tenant_id |
BIGINT | 租户ID |
title |
VARCHAR(500) | 商品标题 |
brief |
VARCHAR(1000) | 商品简介 |
description |
TEXT | 商品详细描述 |
vendor |
VARCHAR(255) | 供应商/品牌 |
category |
VARCHAR(255) | 类目名称 |
category_id |
BIGINT | 类目ID |
category_path |
VARCHAR(500) | 类目路径(逗号分隔的ID列表) |
category_level |
INT | 类目层级 |
tags |
TEXT | 标签(逗号分隔) |
image_src |
VARCHAR(500) | 主图URL |
fake_sales |
INT | 假销量 |
create_time |
DATETIME | 创建时间 |
update_time |
DATETIME | 更新时间 |
deleted |
BIT(1) | 删除标记 |
2.2 SKU 表(shoplazza_product_sku)
SKU(Stock Keeping Unit)表存储商品的变体信息,一个 SPU 可以有多个 SKU。
主要字段:
| 字段名 | 类型 | 说明 |
|---|---|---|
id |
BIGINT | SKU主键ID |
spu_id |
BIGINT | 关联的SPU ID |
tenant_id |
BIGINT | 租户ID |
sku |
VARCHAR(100) | SKU编码 |
price |
DECIMAL(10,2) | 价格 |
compare_at_price |
DECIMAL(10,2) | 对比价格(原价) |
inventory_quantity |
INT | 库存数量 |
weight |
DECIMAL(10,2) | 重量 |
weight_unit |
VARCHAR(10) | 重量单位 |
option1 |
VARCHAR(255) | 选项1的值(如:颜色-红色) |
option2 |
VARCHAR(255) | 选项2的值(如:尺寸-L) |
option3 |
VARCHAR(255) | 选项3的值(如:材质-棉) |
image_src |
VARCHAR(500) | SKU图片URL |
deleted |
BIT(1) | 删除标记 |
2.3 Option 表(shoplazza_product_option)
Option 表存储选项的名称定义,用于定义 SKU 的选项维度(如:颜色、尺寸、材质)。
主要字段:
| 字段名 | 类型 | 说明 |
|---|---|---|
id |
BIGINT | Option主键ID |
spu_id |
BIGINT | 关联的SPU ID |
tenant_id |
BIGINT | 租户ID |
position |
INT | 位置序号(1、2、3对应option1、option2、option3) |
name |
VARCHAR(255) | 选项名称(如:"颜色"、"尺寸") |
values |
TEXT | 选项值列表(JSON格式) |
deleted |
BIT(1) | 删除标记 |
3. SQL 查询逻辑
3.1 批量查询 SPU 数据
方法: _load_spus_for_spu_ids(tenant_id, spu_ids, include_deleted=True)
SQL 查询:
SELECT
id, shop_id, shoplazza_id, title, brief, description,
spu, vendor, vendor_url,
image_src, image_width, image_height, image_path, image_alt,
tags, note, category, category_id, category_google_id,
category_level, category_path,
fake_sales, display_fake_sales,
tenant_id, creator, create_time, updater, update_time, deleted
FROM shoplazza_product_spu
WHERE tenant_id = :tenant_id
AND id IN :spu_ids
[AND deleted = 0] -- 如果 include_deleted=False
查询特点:
- 使用
IN子句批量查询多个 SPU - 支持查询已删除的记录(用于检测删除状态)
- 使用 SQLAlchemy 的
bindparam和expanding=True处理动态 IN 列表
代码位置: indexer/incremental_service.py:223-262
3.2 批量查询 SKU 数据
方法: _load_skus_for_spu_ids(tenant_id, spu_ids)
SQL 查询:
SELECT
id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
shoplazza_image_id, title, sku, barcode, position,
price, compare_at_price, cost_price,
option1, option2, option3,
inventory_quantity, weight, weight_unit, image_src,
wholesale_price, note, extend,
shoplazza_created_at, shoplazza_updated_at, tenant_id,
creator, create_time, updater, update_time, deleted
FROM shoplazza_product_sku
WHERE tenant_id = :tenant_id
AND deleted = 0
AND spu_id IN :spu_ids
查询特点:
- 批量查询多个 SPU 的所有 SKU
- 只查询未删除的 SKU(
deleted = 0) - 结果按
spu_id分组处理
代码位置: indexer/incremental_service.py:264-288
3.3 批量查询 Option 数据
方法: _load_options_for_spu_ids(tenant_id, spu_ids)
SQL 查询:
SELECT
id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
position, name, `values`, tenant_id,
creator, create_time, updater, update_time, deleted
FROM shoplazza_product_option
WHERE tenant_id = :tenant_id
AND deleted = 0
AND spu_id IN :spu_ids
ORDER BY spu_id, position
查询特点:
- 批量查询多个 SPU 的所有 Option
- 按
spu_id和position排序,确保选项顺序一致 position字段对应 SKU 表中的option1、option2、option3
代码位置: indexer/incremental_service.py:290-310
3.4 分类映射加载
方法: load_category_mapping(db_engine)
SQL 查询:
SELECT DISTINCT
category_id,
category
FROM shoplazza_product_spu
WHERE deleted = 0 AND category_id IS NOT NULL
查询特点:
- 全局加载所有租户的分类映射(category_id → category_name)
- 在服务初始化时预加载,避免重复查询
- 用于将
category_path中的 ID 转换为名称
代码位置: indexer/indexing_utils.py:17-51
4. 字段映射详细说明
4.1 基础字段映射
4.1.1 标识字段
| ES 字段 | 数据来源 | 转换逻辑 |
|---|---|---|
tenant_id |
spu_row['tenant_id'] |
直接转换为字符串:str(tenant_id) |
spu_id |
spu_row['id'] |
直接转换为字符串:str(spu_id) |
4.1.2 文本字段(多语言处理)
文本字段支持中英文双语,根据租户配置进行自动翻译。
字段列表:
title.zh/title.enbrief.zh/brief.endescription.zh/description.envendor.zh/vendor.en
填充逻辑:
获取租户主语言:从
tenant_config读取primary_language(默认 'zh')翻译处理:
- 如果配置了
translate_to_en=true且主语言不是 'en',则翻译到英文 - 如果配置了
translate_to_zh=true且主语言不是 'zh',则翻译到中文 - 使用
Translator.translate_for_indexing()方法进行翻译
- 如果配置了
字段填充:
# 示例:title 字段 if pd.notna(spu_row.get('title')): title_text = str(spu_row['title']) if self.translator: translations = self.translator.translate_for_indexing( title_text, shop_language=primary_lang, source_lang=primary_lang, translate_to_en=translate_to_en, translate_to_zh=translate_to_zh, ) doc['title.zh'] = translations.get('zh') or (title_text if primary_lang == 'zh' else None) doc['title.en'] = translations.get('en') or (title_text if primary_lang == 'en' else None)
代码位置: indexer/document_transformer.py:168-298
4.1.3 标签字段
| ES 字段 | 数据来源 | 转换逻辑 |
|---|---|---|
tags |
spu_row['tags'] |
逗号分隔字符串 → 列表:[tag.strip() for tag in tags_str.split(',') if tag.strip()] |
示例:
- 数据库:
"标签1,标签2,标签3" - ES:
["标签1", "标签2", "标签3"]
4.1.4 图片字段
| ES 字段 | 数据来源 | 转换逻辑 |
|---|---|---|
image_url |
spu_row['image_src'] |
如果 URL 不以 http 开头,添加 // 前缀 |
代码位置: indexer/document_transformer.py:390-396
4.1.5 销量字段
| ES 字段 | 数据来源 | 转换逻辑 |
|---|---|---|
sales |
spu_row['fake_sales'] |
转换为整数,如果为空或转换失败则设为 0 |
4.1.6 时间字段
| ES 字段 | 数据来源 | 转换逻辑 |
|---|---|---|
create_time |
spu_row['create_time'] |
转换为 ISO 格式字符串:datetime.isoformat() |
update_time |
spu_row['update_time'] |
转换为 ISO 格式字符串:datetime.isoformat() |
4.2 类别(Category)字段映射
类别字段是复杂字段,涉及多级分类的解析和映射。
4.2.1 类别数据来源
类别信息主要来自 SPU 表的以下字段:
category_path: 类目路径,逗号分隔的类目ID列表(如:"1,2,3")category: 类目名称(备用字段)category_id: 当前类目IDcategory_level: 类目层级
4.2.2 类别映射流程
步骤 1:加载分类映射
在服务初始化时,从数据库加载全局分类映射:
category_id_to_name = load_category_mapping(db_engine)
# 结果:{"1": "电子产品", "2": "手机", "3": "iPhone"}
步骤 2:解析 category_path
if pd.notna(spu_row.get('category_path')):
category_path = str(spu_row['category_path']) # 例如:"1,2,3"
category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()]
# 结果:["1", "2", "3"]
步骤 3:ID 转名称
category_names = []
missing_ids = []
for cid in category_ids:
if cid in self.category_id_to_name:
category_names.append(self.category_id_to_name[cid])
else:
missing_ids.append(cid)
步骤 4:数据质量检查
如果存在缺失的类目ID(在映射中找不到),则:
- 记录错误日志
- 不写入任何类目字段(视为没有类目)
- 不阻塞索引流程
步骤 5:填充 ES 字段
if category_names:
# 构建类目路径字符串
category_path_str = '/'.join(category_names)
doc['category_path.zh'] = category_path_str # 例如:"电子产品/手机/iPhone"
# 填充分层类目名称
if len(category_names) > 0:
doc['category1_name'] = category_names[0] # "电子产品"
if len(category_names) > 1:
doc['category2_name'] = category_names[1] # "手机"
if len(category_names) > 2:
doc['category3_name'] = category_names[2] # "iPhone"
步骤 6:备用处理(category_path 为空时)
如果 category_path 为空,使用 category 字段作为备选:
elif pd.notna(spu_row.get('category')):
category = str(spu_row['category'])
doc['category_name_text.zh'] = category
# 尝试从 category 字段解析多级分类(如果包含 "/")
if '/' in category:
path_parts = category.split('/')
if len(path_parts) > 0:
doc['category1_name'] = path_parts[0].strip()
if len(path_parts) > 1:
doc['category2_name'] = path_parts[1].strip()
if len(path_parts) > 2:
doc['category3_name'] = path_parts[2].strip()
4.2.3 类别相关 ES 字段
| ES 字段 | 类型 | 说明 |
|---|---|---|
category_path.zh |
text | 类目路径字符串(如:"电子产品/手机/iPhone") |
category_path.en |
text | 类目路径英文(暂未实现) |
category1_name |
keyword | 一级类目名称 |
category2_name |
keyword | 二级类目名称 |
category3_name |
keyword | 三级类目名称 |
category_id |
keyword | 当前类目ID |
category_level |
integer | 类目层级 |
代码位置: indexer/document_transformer.py:300-376
4.3 多款式(SKU/Options)字段映射
多款式字段是最复杂的部分,涉及 SKU 嵌套结构、选项值提取、规格构建等。
4.3.1 SKU 数据结构
一个 SPU 可以有多个 SKU,每个 SKU 代表一个商品变体(如:红色-L码、蓝色-M码)。
ES 中的 SKU 嵌套结构:
{
"skus": [
{
"sku_id": "123",
"price": 99.99,
"compare_at_price": 129.99,
"sku_code": "SKU001",
"stock": 100,
"weight": 0.5,
"weight_unit": "kg",
"option1_value": "红色",
"option2_value": "L",
"option3_value": "棉",
"image_src": "https://..."
},
{
"sku_id": "124",
"price": 99.99,
"compare_at_price": 129.99,
"sku_code": "SKU002",
"stock": 50,
"weight": 0.5,
"weight_unit": "kg",
"option1_value": "蓝色",
"option2_value": "M",
"option3_value": "棉",
"image_src": "https://..."
}
]
}
4.3.2 SKU 处理流程
步骤 1:构建 Option 名称映射
从 Option 表获取选项名称:
option_name_map = {}
if not options.empty:
for _, opt_row in options.iterrows():
position = opt_row.get('position') # 1, 2, 3
name = opt_row.get('name') # "颜色", "尺寸", "材质"
if pd.notna(position) and pd.notna(name):
option_name_map[int(position)] = str(name)
# 结果:{1: "颜色", 2: "尺寸", 3: "材质"}
步骤 2:转换每个 SKU
for _, sku_row in skus.iterrows():
sku_data = self._transform_sku_row(sku_row, option_name_map)
if sku_data:
skus_list.append(sku_data)
SKU 转换逻辑:
def _transform_sku_row(self, sku_row, option_name_map):
sku_data = {
'sku_id': str(sku_row['id']),
'price': float(sku_row['price']) if pd.notna(sku_row.get('price')) else None,
'compare_at_price': float(sku_row['compare_at_price']) if pd.notna(sku_row.get('compare_at_price')) else None,
'sku_code': str(sku_row['sku']) if pd.notna(sku_row.get('sku')) else None,
'stock': int(sku_row['inventory_quantity']) if pd.notna(sku_row.get('inventory_quantity')) else 0,
'weight': float(sku_row['weight']) if pd.notna(sku_row.get('weight')) else None,
'weight_unit': str(sku_row['weight_unit']) if pd.notna(sku_row.get('weight_unit')) else None,
'image_src': str(sku_row['image_src']) if pd.notna(sku_row.get('image_src')) else None,
}
# 填充选项值
if pd.notna(sku_row.get('option1')):
sku_data['option1_value'] = str(sku_row['option1'])
if pd.notna(sku_row.get('option2')):
sku_data['option2_value'] = str(sku_row['option2'])
if pd.notna(sku_row.get('option3')):
sku_data['option3_value'] = str(sku_row['option3'])
return sku_data
步骤 3:收集聚合信息
在处理 SKU 的同时,收集价格、重量、库存等聚合信息:
prices = []
compare_prices = []
sku_prices = []
sku_weights = []
sku_weight_units = []
total_inventory = 0
for sku_data in skus_list:
if 'price' in sku_data and sku_data['price'] is not None:
prices.append(sku_data['price'])
sku_prices.append(sku_data['price'])
if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None:
compare_prices.append(sku_data['compare_at_price'])
if 'weight' in sku_data and sku_data['weight'] is not None:
sku_weights.append(int(float(sku_data['weight'])))
if 'weight_unit' in sku_data and sku_data['weight_unit']:
sku_weight_units.append(sku_data['weight_unit'])
if 'stock' in sku_data and sku_data['stock'] is not None:
total_inventory += sku_data['stock']
步骤 4:填充 ES 字段
doc['skus'] = skus_list # 嵌套 SKU 数组
# 价格范围
if prices:
doc['min_price'] = float(min(prices))
doc['max_price'] = float(max(prices))
else:
doc['min_price'] = 0.0
doc['max_price'] = 0.0
if compare_prices:
doc['compare_at_price'] = float(max(compare_prices))
# SKU 扁平化字段(用于过滤和聚合)
doc['sku_prices'] = sku_prices # [99.99, 99.99, 129.99]
doc['sku_weights'] = sku_weights # [500, 500, 600]
doc['sku_weight_units'] = list(set(sku_weight_units)) # ["kg"]
doc['total_inventory'] = total_inventory # 150
代码位置: indexer/document_transformer.py:398-480
4.3.3 选项名称字段
从 Option 表获取选项名称,填充到 ES 文档的顶层:
def _fill_option_names(self, doc, options):
if not options.empty:
sorted_options = options.sort_values('position')
if len(sorted_options) > 0 and pd.notna(sorted_options.iloc[0].get('name')):
doc['option1_name'] = str(sorted_options.iloc[0]['name']) # "颜色"
if len(sorted_options) > 1 and pd.notna(sorted_options.iloc[1].get('name')):
doc['option2_name'] = str(sorted_options.iloc[1]['name']) # "尺寸"
if len(sorted_options) > 2 and pd.notna(sorted_options.iloc[2].get('name')):
doc['option3_name'] = str(sorted_options.iloc[2]['name']) # "材质"
ES 字段:
option1_name: 选项1的名称(如:"颜色")option2_name: 选项2的名称(如:"尺寸")option3_name: 选项3的名称(如:"材质")
代码位置: indexer/document_transformer.py:378-388
4.3.4 选项值字段
从所有 SKU 中提取选项值,去重后填充到 ES 文档:
def _fill_option_values(self, doc, skus):
option1_values = []
option2_values = []
option3_values = []
for _, sku_row in skus.iterrows():
if pd.notna(sku_row.get('option1')):
option1_values.append(str(sku_row['option1']))
if pd.notna(sku_row.get('option2')):
option2_values.append(str(sku_row['option2']))
if pd.notna(sku_row.get('option3')):
option3_values.append(str(sku_row['option3']))
# 根据配置决定是否写入索引(searchable_option_dimensions)
if 'option1' in self.searchable_option_dimensions:
doc['option1_values'] = list(set(option1_values)) # ["红色", "蓝色", "绿色"]
else:
doc['option1_values'] = []
# 同样处理 option2_values 和 option3_values
ES 字段:
option1_values: 选项1的所有值列表(如:["红色", "蓝色", "绿色"])option2_values: 选项2的所有值列表(如:["S", "M", "L"])option3_values: 选项3的所有值列表(如:["棉", "涤纶"])
注意: 只有配置在 searchable_option_dimensions 中的选项才会被索引。
代码位置: indexer/document_transformer.py:482-510
4.3.5 规格(Specifications)字段
规格字段用于支持按规格过滤和分面搜索,将 SKU 的选项值结构化存储。
构建逻辑:
specifications = []
for _, sku_row in skus.iterrows():
sku_id = str(sku_row['id'])
# 如果 SKU 有 option1 且 Option 表中有对应的名称
if pd.notna(sku_row.get('option1')) and 1 in option_name_map:
specifications.append({
'sku_id': sku_id,
'name': option_name_map[1], # "颜色"
'value': str(sku_row['option1']) # "红色"
})
# 同样处理 option2 和 option3
if pd.notna(sku_row.get('option2')) and 2 in option_name_map:
specifications.append({
'sku_id': sku_id,
'name': option_name_map[2], # "尺寸"
'value': str(sku_row['option2']) # "L"
})
if pd.notna(sku_row.get('option3')) and 3 in option_name_map:
specifications.append({
'sku_id': sku_id,
'name': option_name_map[3], # "材质"
'value': str(sku_row['option3']) # "棉"
})
doc['specifications'] = specifications
ES 字段结构:
{
"specifications": [
{"sku_id": "123", "name": "颜色", "value": "红色"},
{"sku_id": "123", "name": "尺寸", "value": "L"},
{"sku_id": "123", "name": "材质", "value": "棉"},
{"sku_id": "124", "name": "颜色", "value": "蓝色"},
{"sku_id": "124", "name": "尺寸", "value": "M"},
{"sku_id": "124", "name": "材质", "value": "棉"}
]
}
用途:
- 支持按规格过滤:
{"specifications": {"name": "颜色", "value": "红色"}} - 支持规格分面:统计每个规格值的数量
代码位置: indexer/document_transformer.py:459-478
4.4 向量字段映射
4.4.1 标题向量(title_embedding)
生成时机:
- 在增量索引中,批量生成 embedding(性能优化)
- 在单条查询中,按需生成
生成逻辑:
# 批量生成(增量索引)
if enable_embedding and encoder and documents:
title_texts = []
title_doc_indices = []
for i, (_, doc) in enumerate(documents):
title_text = doc.get("title.en") or doc.get("title.zh")
if title_text and str(title_text).strip():
title_texts.append(str(title_text))
title_doc_indices.append(i)
if title_texts:
embeddings = encoder.encode(title_texts, batch_size=32)
for j, emb in enumerate(embeddings):
doc_idx = title_doc_indices[j]
if isinstance(emb, np.ndarray):
documents[doc_idx][1]["title_embedding"] = emb.tolist()
ES 字段:
title_embedding: 1024 维浮点数组,用于语义搜索
代码位置:
- 批量生成:
indexer/incremental_service.py:558-576 - 单条生成:
indexer/document_transformer.py:586-619
5. 数据转换完整流程
5.1 增量索引流程
入口方法: IncrementalIndexerService.index_spus_to_es()
流程步骤:
参数处理
- 去重 SPU ID 列表
- 生成索引名称(如果未提供)
显式删除处理(如果提供了
delete_spu_ids)- 遍历
delete_spu_ids,从 ES 中删除对应文档
- 遍历
批量加载数据
- 调用
_load_spus_for_spu_ids()批量加载 SPU 数据 - 调用
_load_skus_for_spu_ids()批量加载 SKU 数据 - 调用
_load_options_for_spu_ids()批量加载 Option 数据
- 调用
自动删除检测
- 检查 SPU 是否在数据库中存在
- 检查 SPU 的
deleted字段 - 如果不存在或已删除,从 ES 中删除对应文档
数据分组
- 将 SKU 按
spu_id分组 - 将 Option 按
spu_id分组
- 将 SKU 按
文档转换
- 遍历每个 SPU
- 调用
transformer.transform_spu_to_doc()转换为 ES 文档 - 收集所有文档
批量生成 Embedding(如果启用)
- 收集所有文档的标题文本
- 批量调用
encoder.encode()(传入 list[str])生成 embedding - 填充到对应文档
批量写入 ES
- 使用
BulkIndexer批量写入文档 - 使用
spu_id作为文档 ID
- 使用
返回结果
- 返回成功/失败统计
- 返回每个 SPU 的处理状态
代码位置: indexer/incremental_service.py:391-679
5.2 文档转换流程
入口方法: SPUDocumentTransformer.transform_spu_to_doc()
流程步骤:
初始化文档字典
doc = {} doc['tenant_id'] = str(tenant_id) doc['spu_id'] = str(spu_id)填充文本字段(多语言)
- 调用
_fill_text_fields()填充 title、brief、description、vendor
- 调用
填充标签
- 解析
tags字段,转换为列表
- 解析
填充类别字段
- 调用
_fill_category_fields()处理类别映射
- 调用
填充选项名称
- 调用
_fill_option_names()从 Option 表获取选项名称
- 调用
填充图片 URL
- 调用
_fill_image_url()处理图片 URL
- 调用
填充销量
- 转换
fake_sales为整数
- 转换
处理 SKU
- 调用
_process_skus()处理所有 SKU - 构建
skus嵌套数组 - 构建
specifications数组 - 计算价格范围、总库存等聚合字段
- 调用
填充选项值
- 调用
_fill_option_values()提取所有选项值
- 调用
填充时间字段
- 转换
create_time和update_time为 ISO 格式
- 转换
返回文档
return doc
代码位置: indexer/document_transformer.py:57-166
6. 特殊处理逻辑
6.1 删除检测
系统支持两种删除方式:
- 显式删除:通过
delete_spu_ids参数显式指定要删除的 SPU - 自动检测删除:
- SPU 在数据库中不存在
- SPU 的
deleted字段为1或b'\x01'
删除处理:
# 检查 deleted 字段(可能是 bit 类型)
def _is_deleted_value(v):
if isinstance(v, bytes):
return v == b"\x01" or v == 1
return bool(v)
spu_df["_is_deleted"] = spu_df["deleted"].apply(_is_deleted_value)
代码位置: indexer/incremental_service.py:481-517
6.2 数据质量检查
6.2.1 类别映射缺失
如果 category_path 中的类目ID在映射中不存在:
- 记录错误日志
- 不写入任何类目字段(视为没有类目)
- 不阻塞索引流程
6.2.2 标题缺失
如果 SPU 没有标题:
- 记录错误日志
- 继续处理(但可能影响搜索效果)
6.2.3 SKU 缺失
如果 SPU 没有 SKU:
- 记录警告日志
- 继续处理(但价格、库存等字段可能为空)
6.3 性能优化
- 批量查询:使用
IN子句批量查询,减少数据库往返 - 缓存分类映射:在服务初始化时预加载,避免重复查询
- 缓存 Transformer:按租户缓存 Transformer 实例,避免重复初始化
- 批量生成 Embedding:收集所有标题后批量生成,利用批处理性能
- 批量写入 ES:使用
BulkIndexer批量写入,提高写入效率
7. ES 文档完整示例
{
"tenant_id": "1",
"spu_id": "12345",
"title.zh": "iPhone 15 Pro Max",
"title.en": "iPhone 15 Pro Max",
"brief.zh": "最新款 iPhone",
"brief.en": "Latest iPhone",
"description.zh": "详细描述...",
"description.en": "Detailed description...",
"vendor.zh": "Apple",
"vendor.en": "Apple",
"tags": ["手机", "智能手机", "Apple"],
"category_path.zh": "电子产品/手机/iPhone",
"category1_name": "电子产品",
"category2_name": "手机",
"category3_name": "iPhone",
"category_id": "3",
"category_level": 3,
"option1_name": "颜色",
"option2_name": "存储容量",
"option3_name": null,
"option1_values": ["深空黑色", "原色钛金属", "白色钛金属"],
"option2_values": ["256GB", "512GB", "1TB"],
"option3_values": [],
"image_url": "https://example.com/image.jpg",
"sales": 1000,
"min_price": 8999.0,
"max_price": 12999.0,
"compare_at_price": 12999.0,
"sku_prices": [8999.0, 10999.0, 12999.0],
"sku_weights": [221, 221, 221],
"sku_weight_units": ["g"],
"total_inventory": 500,
"create_time": "2024-01-01T00:00:00",
"update_time": "2024-01-15T10:30:00",
"title_embedding": [0.123, 0.456, ...], // 1024维向量
"skus": [
{
"sku_id": "1001",
"price": 8999.0,
"compare_at_price": 9999.0,
"sku_code": "IP15PM-256-BLK",
"stock": 100,
"weight": 221.0,
"weight_unit": "g",
"option1_value": "深空黑色",
"option2_value": "256GB",
"option3_value": null,
"image_src": "https://example.com/sku1.jpg"
},
{
"sku_id": "1002",
"price": 10999.0,
"compare_at_price": 11999.0,
"sku_code": "IP15PM-512-BLK",
"stock": 200,
"weight": 221.0,
"weight_unit": "g",
"option1_value": "深空黑色",
"option2_value": "512GB",
"option3_value": null,
"image_src": "https://example.com/sku2.jpg"
}
],
"specifications": [
{"sku_id": "1001", "name": "颜色", "value": "深空黑色"},
{"sku_id": "1001", "name": "存储容量", "value": "256GB"},
{"sku_id": "1002", "name": "颜色", "value": "深空黑色"},
{"sku_id": "1002", "name": "存储容量", "value": "512GB"}
]
}
8. 总结
8.1 关键要点
- 数据源:三个主要表(SPU、SKU、Option)
- 批量查询:使用
IN子句提高查询效率 - 类别映射:通过预加载的映射表将 ID 转换为名称
- 多款式处理:通过嵌套结构和扁平化字段支持复杂的 SKU 查询
- 多语言支持:自动翻译文本字段,支持中英文双语
- 向量搜索:批量生成 embedding,支持语义搜索
8.2 复杂字段处理总结
| 字段类型 | 处理方式 | 关键逻辑 |
|---|---|---|
| 类别 | ID映射 + 路径解析 | 从 category_path 解析ID,通过映射表转换为名称,构建多级分类 |
| SKU | 嵌套数组 + 聚合计算 | 将每个 SKU 转换为嵌套对象,同时计算价格范围、总库存等聚合值 |
| 选项 | 名称 + 值分离 | 从 Option 表获取名称,从 SKU 表提取值,分别存储 |
| 规格 | 结构化数组 | 将 SKU 的选项值转换为 {name, value, sku_id} 结构,支持过滤和分面 |
附录:相关代码文件
indexer/incremental_service.py: 增量索引服务,数据获取逻辑indexer/document_transformer.py: 文档转换器,字段填充逻辑indexer/indexing_utils.py: 工具函数,分类映射加载和 Transformer 创建indexer/bulk_indexer.py: 批量写入器,ES 写入逻辑