MySQL到ES文档映射说明.md 29 KB

MySQL 到 Elasticsearch 文档映射说明

1. 概述

本文档详细说明从 MySQL 数据库到 Elasticsearch 的完整数据转换流程,包括 SQL 查询逻辑、字段映射规则、以及复杂字段(类别、多款式)的处理方法。

1.1 数据流转架构

MySQL 数据库
    ↓
[SQL 查询] → SPU表、SKU表、Option表
    ↓
[数据加载] → Pandas DataFrame
    ↓
[文档转换] → ES 文档字典
    ↓
[批量写入] → Elasticsearch

1.2 核心组件

  • IncrementalIndexerService: 增量索引服务,负责数据获取和索引
  • SPUDocumentTransformer: 文档转换器,负责将数据库记录转换为 ES 文档
  • BulkIndexer: 批量写入器,负责将文档批量写入 ES

2. 数据源表结构

2.1 SPU 表(shoplazza_product_spu)

SPU(Standard Product Unit)表存储商品的基本信息。

主要字段:

字段名 类型 说明
id BIGINT SPU主键ID
tenant_id BIGINT 租户ID
title VARCHAR(500) 商品标题
brief VARCHAR(1000) 商品简介
description TEXT 商品详细描述
vendor VARCHAR(255) 供应商/品牌
category VARCHAR(255) 类目名称
category_id BIGINT 类目ID
category_path VARCHAR(500) 类目路径(逗号分隔的ID列表)
category_level INT 类目层级
tags TEXT 标签(逗号分隔)
image_src VARCHAR(500) 主图URL
fake_sales INT 假销量
create_time DATETIME 创建时间
update_time DATETIME 更新时间
deleted BIT(1) 删除标记

2.2 SKU 表(shoplazza_product_sku)

SKU(Stock Keeping Unit)表存储商品的变体信息,一个 SPU 可以有多个 SKU。

主要字段:

字段名 类型 说明
id BIGINT SKU主键ID
spu_id BIGINT 关联的SPU ID
tenant_id BIGINT 租户ID
sku VARCHAR(100) SKU编码
price DECIMAL(10,2) 价格
compare_at_price DECIMAL(10,2) 对比价格(原价)
inventory_quantity INT 库存数量
weight DECIMAL(10,2) 重量
weight_unit VARCHAR(10) 重量单位
option1 VARCHAR(255) 选项1的值(如:颜色-红色)
option2 VARCHAR(255) 选项2的值(如:尺寸-L)
option3 VARCHAR(255) 选项3的值(如:材质-棉)
image_src VARCHAR(500) SKU图片URL
deleted BIT(1) 删除标记

2.3 Option 表(shoplazza_product_option)

Option 表存储选项的名称定义,用于定义 SKU 的选项维度(如:颜色、尺寸、材质)。

主要字段:

字段名 类型 说明
id BIGINT Option主键ID
spu_id BIGINT 关联的SPU ID
tenant_id BIGINT 租户ID
position INT 位置序号(1、2、3对应option1、option2、option3)
name VARCHAR(255) 选项名称(如:"颜色"、"尺寸")
values TEXT 选项值列表(JSON格式)
deleted BIT(1) 删除标记

3. SQL 查询逻辑

3.1 批量查询 SPU 数据

方法: _load_spus_for_spu_ids(tenant_id, spu_ids, include_deleted=True)

SQL 查询:

SELECT 
    id, shop_id, shoplazza_id, title, brief, description,
    spu, vendor, vendor_url,
    image_src, image_width, image_height, image_path, image_alt,
    tags, note, category, category_id, category_google_id,
    category_level, category_path,
    fake_sales, display_fake_sales,
    tenant_id, creator, create_time, updater, update_time, deleted
FROM shoplazza_product_spu
WHERE tenant_id = :tenant_id 
  AND id IN :spu_ids
  [AND deleted = 0]  -- 如果 include_deleted=False

查询特点:

  • 使用 IN 子句批量查询多个 SPU
  • 支持查询已删除的记录(用于检测删除状态)
  • 使用 SQLAlchemy 的 bindparamexpanding=True 处理动态 IN 列表

代码位置: indexer/incremental_service.py:223-262

3.2 批量查询 SKU 数据

方法: _load_skus_for_spu_ids(tenant_id, spu_ids)

SQL 查询:

SELECT 
    id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
    shoplazza_image_id, title, sku, barcode, position,
    price, compare_at_price, cost_price,
    option1, option2, option3,
    inventory_quantity, weight, weight_unit, image_src,
    wholesale_price, note, extend,
    shoplazza_created_at, shoplazza_updated_at, tenant_id,
    creator, create_time, updater, update_time, deleted
FROM shoplazza_product_sku
WHERE tenant_id = :tenant_id 
  AND deleted = 0 
  AND spu_id IN :spu_ids

查询特点:

  • 批量查询多个 SPU 的所有 SKU
  • 只查询未删除的 SKU(deleted = 0
  • 结果按 spu_id 分组处理

代码位置: indexer/incremental_service.py:264-288

3.3 批量查询 Option 数据

方法: _load_options_for_spu_ids(tenant_id, spu_ids)

SQL 查询:

SELECT 
    id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
    position, name, `values`, tenant_id,
    creator, create_time, updater, update_time, deleted
FROM shoplazza_product_option
WHERE tenant_id = :tenant_id 
  AND deleted = 0 
  AND spu_id IN :spu_ids
ORDER BY spu_id, position

查询特点:

  • 批量查询多个 SPU 的所有 Option
  • spu_idposition 排序,确保选项顺序一致
  • position 字段对应 SKU 表中的 option1option2option3

代码位置: indexer/incremental_service.py:290-310

3.4 分类映射加载

方法: load_category_mapping(db_engine)

SQL 查询:

SELECT DISTINCT
    category_id,
    category
FROM shoplazza_product_spu
WHERE deleted = 0 AND category_id IS NOT NULL

查询特点:

  • 全局加载所有租户的分类映射(category_id → category_name)
  • 在服务初始化时预加载,避免重复查询
  • 用于将 category_path 中的 ID 转换为名称

代码位置: indexer/indexing_utils.py:17-51


4. 字段映射详细说明

4.1 基础字段映射

4.1.1 标识字段

ES 字段 数据来源 转换逻辑
tenant_id spu_row['tenant_id'] 直接转换为字符串:str(tenant_id)
spu_id spu_row['id'] 直接转换为字符串:str(spu_id)

4.1.2 文本字段(多语言处理)

文本字段支持中英文双语,根据租户配置进行自动翻译。

字段列表:

  • title.zh / title.en
  • brief.zh / brief.en
  • description.zh / description.en
  • vendor.zh / vendor.en

填充逻辑:

  1. 获取租户主语言:从 tenant_config 读取 primary_language(默认 'zh')

  2. 翻译处理

    • 如果配置了 translate_to_en=true 且主语言不是 'en',则翻译到英文
    • 如果配置了 translate_to_zh=true 且主语言不是 'zh',则翻译到中文
    • 使用 Translator.translate_for_indexing() 方法进行翻译
  3. 字段填充

    # 示例:title 字段
    if pd.notna(spu_row.get('title')):
       title_text = str(spu_row['title'])
       if self.translator:
           translations = self.translator.translate_for_indexing(
               title_text,
               shop_language=primary_lang,
               source_lang=primary_lang,
               translate_to_en=translate_to_en,
               translate_to_zh=translate_to_zh,
           )
           doc['title.zh'] = translations.get('zh') or (title_text if primary_lang == 'zh' else None)
           doc['title.en'] = translations.get('en') or (title_text if primary_lang == 'en' else None)
    

代码位置: indexer/document_transformer.py:168-298

4.1.3 标签字段

ES 字段 数据来源 转换逻辑
tags spu_row['tags'] 逗号分隔字符串 → 列表:[tag.strip() for tag in tags_str.split(',') if tag.strip()]

示例:

  • 数据库:"标签1,标签2,标签3"
  • ES:["标签1", "标签2", "标签3"]

4.1.4 图片字段

ES 字段 数据来源 转换逻辑
image_url spu_row['image_src'] 如果 URL 不以 http 开头,添加 // 前缀

代码位置: indexer/document_transformer.py:390-396

4.1.5 销量字段

ES 字段 数据来源 转换逻辑
sales spu_row['fake_sales'] 转换为整数,如果为空或转换失败则设为 0

4.1.6 时间字段

ES 字段 数据来源 转换逻辑
create_time spu_row['create_time'] 转换为 ISO 格式字符串:datetime.isoformat()
update_time spu_row['update_time'] 转换为 ISO 格式字符串:datetime.isoformat()

4.2 类别(Category)字段映射

类别字段是复杂字段,涉及多级分类的解析和映射。

4.2.1 类别数据来源

类别信息主要来自 SPU 表的以下字段:

  • category_path: 类目路径,逗号分隔的类目ID列表(如:"1,2,3"
  • category: 类目名称(备用字段)
  • category_id: 当前类目ID
  • category_level: 类目层级

4.2.2 类别映射流程

步骤 1:加载分类映射

在服务初始化时,从数据库加载全局分类映射:

category_id_to_name = load_category_mapping(db_engine)
# 结果:{"1": "电子产品", "2": "手机", "3": "iPhone"}

步骤 2:解析 category_path

if pd.notna(spu_row.get('category_path')):
    category_path = str(spu_row['category_path'])  # 例如:"1,2,3"
    category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()]
    # 结果:["1", "2", "3"]

步骤 3:ID 转名称

category_names = []
missing_ids = []
for cid in category_ids:
    if cid in self.category_id_to_name:
        category_names.append(self.category_id_to_name[cid])
    else:
        missing_ids.append(cid)

步骤 4:数据质量检查

如果存在缺失的类目ID(在映射中找不到),则:

  • 记录错误日志
  • 不写入任何类目字段(视为没有类目)
  • 不阻塞索引流程

步骤 5:填充 ES 字段

if category_names:
    # 构建类目路径字符串
    category_path_str = '/'.join(category_names)
    doc['category_path.zh'] = category_path_str  # 例如:"电子产品/手机/iPhone"

    # 填充分层类目名称
    if len(category_names) > 0:
        doc['category1_name'] = category_names[0]  # "电子产品"
    if len(category_names) > 1:
        doc['category2_name'] = category_names[1]  # "手机"
    if len(category_names) > 2:
        doc['category3_name'] = category_names[2]  # "iPhone"

步骤 6:备用处理(category_path 为空时)

如果 category_path 为空,使用 category 字段作为备选:

elif pd.notna(spu_row.get('category')):
    category = str(spu_row['category'])
    doc['category_name_text.zh'] = category

    # 尝试从 category 字段解析多级分类(如果包含 "/")
    if '/' in category:
        path_parts = category.split('/')
        if len(path_parts) > 0:
            doc['category1_name'] = path_parts[0].strip()
        if len(path_parts) > 1:
            doc['category2_name'] = path_parts[1].strip()
        if len(path_parts) > 2:
            doc['category3_name'] = path_parts[2].strip()

4.2.3 类别相关 ES 字段

ES 字段 类型 说明
category_path.zh text 类目路径字符串(如:"电子产品/手机/iPhone")
category_path.en text 类目路径英文(暂未实现)
category1_name keyword 一级类目名称
category2_name keyword 二级类目名称
category3_name keyword 三级类目名称
category_id keyword 当前类目ID
category_level integer 类目层级

代码位置: indexer/document_transformer.py:300-376


4.3 多款式(SKU/Options)字段映射

多款式字段是最复杂的部分,涉及 SKU 嵌套结构、选项值提取、规格构建等。

4.3.1 SKU 数据结构

一个 SPU 可以有多个 SKU,每个 SKU 代表一个商品变体(如:红色-L码、蓝色-M码)。

ES 中的 SKU 嵌套结构:

{
  "skus": [
    {
      "sku_id": "123",
      "price": 99.99,
      "compare_at_price": 129.99,
      "sku_code": "SKU001",
      "stock": 100,
      "weight": 0.5,
      "weight_unit": "kg",
      "option1_value": "红色",
      "option2_value": "L",
      "option3_value": "棉",
      "image_src": "https://..."
    },
    {
      "sku_id": "124",
      "price": 99.99,
      "compare_at_price": 129.99,
      "sku_code": "SKU002",
      "stock": 50,
      "weight": 0.5,
      "weight_unit": "kg",
      "option1_value": "蓝色",
      "option2_value": "M",
      "option3_value": "棉",
      "image_src": "https://..."
    }
  ]
}

4.3.2 SKU 处理流程

步骤 1:构建 Option 名称映射

从 Option 表获取选项名称:

option_name_map = {}
if not options.empty:
    for _, opt_row in options.iterrows():
        position = opt_row.get('position')  # 1, 2, 3
        name = opt_row.get('name')          # "颜色", "尺寸", "材质"
        if pd.notna(position) and pd.notna(name):
            option_name_map[int(position)] = str(name)
# 结果:{1: "颜色", 2: "尺寸", 3: "材质"}

步骤 2:转换每个 SKU

for _, sku_row in skus.iterrows():
    sku_data = self._transform_sku_row(sku_row, option_name_map)
    if sku_data:
        skus_list.append(sku_data)

SKU 转换逻辑:

def _transform_sku_row(self, sku_row, option_name_map):
    sku_data = {
        'sku_id': str(sku_row['id']),
        'price': float(sku_row['price']) if pd.notna(sku_row.get('price')) else None,
        'compare_at_price': float(sku_row['compare_at_price']) if pd.notna(sku_row.get('compare_at_price')) else None,
        'sku_code': str(sku_row['sku']) if pd.notna(sku_row.get('sku')) else None,
        'stock': int(sku_row['inventory_quantity']) if pd.notna(sku_row.get('inventory_quantity')) else 0,
        'weight': float(sku_row['weight']) if pd.notna(sku_row.get('weight')) else None,
        'weight_unit': str(sku_row['weight_unit']) if pd.notna(sku_row.get('weight_unit')) else None,
        'image_src': str(sku_row['image_src']) if pd.notna(sku_row.get('image_src')) else None,
    }

    # 填充选项值
    if pd.notna(sku_row.get('option1')):
        sku_data['option1_value'] = str(sku_row['option1'])
    if pd.notna(sku_row.get('option2')):
        sku_data['option2_value'] = str(sku_row['option2'])
    if pd.notna(sku_row.get('option3')):
        sku_data['option3_value'] = str(sku_row['option3'])

    return sku_data

步骤 3:收集聚合信息

在处理 SKU 的同时,收集价格、重量、库存等聚合信息:

prices = []
compare_prices = []
sku_prices = []
sku_weights = []
sku_weight_units = []
total_inventory = 0

for sku_data in skus_list:
    if 'price' in sku_data and sku_data['price'] is not None:
        prices.append(sku_data['price'])
        sku_prices.append(sku_data['price'])

    if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None:
        compare_prices.append(sku_data['compare_at_price'])

    if 'weight' in sku_data and sku_data['weight'] is not None:
        sku_weights.append(int(float(sku_data['weight'])))

    if 'weight_unit' in sku_data and sku_data['weight_unit']:
        sku_weight_units.append(sku_data['weight_unit'])

    if 'stock' in sku_data and sku_data['stock'] is not None:
        total_inventory += sku_data['stock']

步骤 4:填充 ES 字段

doc['skus'] = skus_list  # 嵌套 SKU 数组

# 价格范围
if prices:
    doc['min_price'] = float(min(prices))
    doc['max_price'] = float(max(prices))
else:
    doc['min_price'] = 0.0
    doc['max_price'] = 0.0

if compare_prices:
    doc['compare_at_price'] = float(max(compare_prices))

# SKU 扁平化字段(用于过滤和聚合)
doc['sku_prices'] = sku_prices           # [99.99, 99.99, 129.99]
doc['sku_weights'] = sku_weights          # [500, 500, 600]
doc['sku_weight_units'] = list(set(sku_weight_units))  # ["kg"]
doc['total_inventory'] = total_inventory  # 150

代码位置: indexer/document_transformer.py:398-480

4.3.3 选项名称字段

从 Option 表获取选项名称,填充到 ES 文档的顶层:

def _fill_option_names(self, doc, options):
    if not options.empty:
        sorted_options = options.sort_values('position')
        if len(sorted_options) > 0 and pd.notna(sorted_options.iloc[0].get('name')):
            doc['option1_name'] = str(sorted_options.iloc[0]['name'])  # "颜色"
        if len(sorted_options) > 1 and pd.notna(sorted_options.iloc[1].get('name')):
            doc['option2_name'] = str(sorted_options.iloc[1]['name'])  # "尺寸"
        if len(sorted_options) > 2 and pd.notna(sorted_options.iloc[2].get('name')):
            doc['option3_name'] = str(sorted_options.iloc[2]['name'])  # "材质"

ES 字段:

  • option1_name: 选项1的名称(如:"颜色")
  • option2_name: 选项2的名称(如:"尺寸")
  • option3_name: 选项3的名称(如:"材质")

代码位置: indexer/document_transformer.py:378-388

4.3.4 选项值字段

从所有 SKU 中提取选项值,去重后填充到 ES 文档:

def _fill_option_values(self, doc, skus):
    option1_values = []
    option2_values = []
    option3_values = []

    for _, sku_row in skus.iterrows():
        if pd.notna(sku_row.get('option1')):
            option1_values.append(str(sku_row['option1']))
        if pd.notna(sku_row.get('option2')):
            option2_values.append(str(sku_row['option2']))
        if pd.notna(sku_row.get('option3')):
            option3_values.append(str(sku_row['option3']))

    # 根据配置决定是否写入索引(searchable_option_dimensions)
    if 'option1' in self.searchable_option_dimensions:
        doc['option1_values'] = list(set(option1_values))  # ["红色", "蓝色", "绿色"]
    else:
        doc['option1_values'] = []

    # 同样处理 option2_values 和 option3_values

ES 字段:

  • option1_values: 选项1的所有值列表(如:["红色", "蓝色", "绿色"]
  • option2_values: 选项2的所有值列表(如:["S", "M", "L"]
  • option3_values: 选项3的所有值列表(如:["棉", "涤纶"]

注意: 只有配置在 searchable_option_dimensions 中的选项才会被索引。

代码位置: indexer/document_transformer.py:482-510

4.3.5 规格(Specifications)字段

规格字段用于支持按规格过滤和分面搜索,将 SKU 的选项值结构化存储。

构建逻辑:

specifications = []

for _, sku_row in skus.iterrows():
    sku_id = str(sku_row['id'])

    # 如果 SKU 有 option1 且 Option 表中有对应的名称
    if pd.notna(sku_row.get('option1')) and 1 in option_name_map:
        specifications.append({
            'sku_id': sku_id,
            'name': option_name_map[1],      # "颜色"
            'value': str(sku_row['option1']) # "红色"
        })

    # 同样处理 option2 和 option3
    if pd.notna(sku_row.get('option2')) and 2 in option_name_map:
        specifications.append({
            'sku_id': sku_id,
            'name': option_name_map[2],      # "尺寸"
            'value': str(sku_row['option2']) # "L"
        })

    if pd.notna(sku_row.get('option3')) and 3 in option_name_map:
        specifications.append({
            'sku_id': sku_id,
            'name': option_name_map[3],      # "材质"
            'value': str(sku_row['option3']) # "棉"
        })

doc['specifications'] = specifications

ES 字段结构:

{
  "specifications": [
    {"sku_id": "123", "name": "颜色", "value": "红色"},
    {"sku_id": "123", "name": "尺寸", "value": "L"},
    {"sku_id": "123", "name": "材质", "value": "棉"},
    {"sku_id": "124", "name": "颜色", "value": "蓝色"},
    {"sku_id": "124", "name": "尺寸", "value": "M"},
    {"sku_id": "124", "name": "材质", "value": "棉"}
  ]
}

用途:

  • 支持按规格过滤:{"specifications": {"name": "颜色", "value": "红色"}}
  • 支持规格分面:统计每个规格值的数量

代码位置: indexer/document_transformer.py:459-478


4.4 向量字段映射

4.4.1 标题向量(title_embedding)

生成时机:

  • 在增量索引中,批量生成 embedding(性能优化)
  • 在单条查询中,按需生成

生成逻辑:

# 批量生成(增量索引)
if enable_embedding and encoder and documents:
    title_texts = []
    title_doc_indices = []
    for i, (_, doc) in enumerate(documents):
        title_text = doc.get("title.en") or doc.get("title.zh")
        if title_text and str(title_text).strip():
            title_texts.append(str(title_text))
            title_doc_indices.append(i)

    if title_texts:
        embeddings = encoder.encode(title_texts, batch_size=32)
        for j, emb in enumerate(embeddings):
            doc_idx = title_doc_indices[j]
            if isinstance(emb, np.ndarray):
                documents[doc_idx][1]["title_embedding"] = emb.tolist()

ES 字段:

  • title_embedding: 1024 维浮点数组,用于语义搜索

代码位置:

  • 批量生成:indexer/incremental_service.py:558-576
  • 单条生成:indexer/document_transformer.py:586-619

5. 数据转换完整流程

5.1 增量索引流程

入口方法: IncrementalIndexerService.index_spus_to_es()

流程步骤:

  1. 参数处理

    • 去重 SPU ID 列表
    • 生成索引名称(如果未提供)
  2. 显式删除处理(如果提供了 delete_spu_ids

    • 遍历 delete_spu_ids,从 ES 中删除对应文档
  3. 批量加载数据

    • 调用 _load_spus_for_spu_ids() 批量加载 SPU 数据
    • 调用 _load_skus_for_spu_ids() 批量加载 SKU 数据
    • 调用 _load_options_for_spu_ids() 批量加载 Option 数据
  4. 自动删除检测

    • 检查 SPU 是否在数据库中存在
    • 检查 SPU 的 deleted 字段
    • 如果不存在或已删除,从 ES 中删除对应文档
  5. 数据分组

    • 将 SKU 按 spu_id 分组
    • 将 Option 按 spu_id 分组
  6. 文档转换

    • 遍历每个 SPU
    • 调用 transformer.transform_spu_to_doc() 转换为 ES 文档
    • 收集所有文档
  7. 批量生成 Embedding(如果启用)

    • 收集所有文档的标题文本
    • 批量调用 encoder.encode()(传入 list[str])生成 embedding
    • 填充到对应文档
  8. 批量写入 ES

    • 使用 BulkIndexer 批量写入文档
    • 使用 spu_id 作为文档 ID
  9. 返回结果

    • 返回成功/失败统计
    • 返回每个 SPU 的处理状态

代码位置: indexer/incremental_service.py:391-679

5.2 文档转换流程

入口方法: SPUDocumentTransformer.transform_spu_to_doc()

流程步骤:

  1. 初始化文档字典

    doc = {}
    doc['tenant_id'] = str(tenant_id)
    doc['spu_id'] = str(spu_id)
    
  2. 填充文本字段(多语言)

    • 调用 _fill_text_fields() 填充 title、brief、description、vendor
  3. 填充标签

    • 解析 tags 字段,转换为列表
  4. 填充类别字段

    • 调用 _fill_category_fields() 处理类别映射
  5. 填充选项名称

    • 调用 _fill_option_names() 从 Option 表获取选项名称
  6. 填充图片 URL

    • 调用 _fill_image_url() 处理图片 URL
  7. 填充销量

    • 转换 fake_sales 为整数
  8. 处理 SKU

    • 调用 _process_skus() 处理所有 SKU
    • 构建 skus 嵌套数组
    • 构建 specifications 数组
    • 计算价格范围、总库存等聚合字段
  9. 填充选项值

    • 调用 _fill_option_values() 提取所有选项值
  10. 填充时间字段

    • 转换 create_timeupdate_time 为 ISO 格式
  11. 返回文档

    return doc
    

代码位置: indexer/document_transformer.py:57-166


6. 特殊处理逻辑

6.1 删除检测

系统支持两种删除方式:

  1. 显式删除:通过 delete_spu_ids 参数显式指定要删除的 SPU
  2. 自动检测删除
    • SPU 在数据库中不存在
    • SPU 的 deleted 字段为 1b'\x01'

删除处理:

# 检查 deleted 字段(可能是 bit 类型)
def _is_deleted_value(v):
    if isinstance(v, bytes):
        return v == b"\x01" or v == 1
    return bool(v)

spu_df["_is_deleted"] = spu_df["deleted"].apply(_is_deleted_value)

代码位置: indexer/incremental_service.py:481-517

6.2 数据质量检查

6.2.1 类别映射缺失

如果 category_path 中的类目ID在映射中不存在:

  • 记录错误日志
  • 不写入任何类目字段(视为没有类目)
  • 不阻塞索引流程

6.2.2 标题缺失

如果 SPU 没有标题:

  • 记录错误日志
  • 继续处理(但可能影响搜索效果)

6.2.3 SKU 缺失

如果 SPU 没有 SKU:

  • 记录警告日志
  • 继续处理(但价格、库存等字段可能为空)

6.3 性能优化

  1. 批量查询:使用 IN 子句批量查询,减少数据库往返
  2. 缓存分类映射:在服务初始化时预加载,避免重复查询
  3. 缓存 Transformer:按租户缓存 Transformer 实例,避免重复初始化
  4. 批量生成 Embedding:收集所有标题后批量生成,利用批处理性能
  5. 批量写入 ES:使用 BulkIndexer 批量写入,提高写入效率

7. ES 文档完整示例

{
  "tenant_id": "1",
  "spu_id": "12345",
  "title.zh": "iPhone 15 Pro Max",
  "title.en": "iPhone 15 Pro Max",
  "brief.zh": "最新款 iPhone",
  "brief.en": "Latest iPhone",
  "description.zh": "详细描述...",
  "description.en": "Detailed description...",
  "vendor.zh": "Apple",
  "vendor.en": "Apple",
  "tags": ["手机", "智能手机", "Apple"],
  "category_path.zh": "电子产品/手机/iPhone",
  "category1_name": "电子产品",
  "category2_name": "手机",
  "category3_name": "iPhone",
  "category_id": "3",
  "category_level": 3,
  "option1_name": "颜色",
  "option2_name": "存储容量",
  "option3_name": null,
  "option1_values": ["深空黑色", "原色钛金属", "白色钛金属"],
  "option2_values": ["256GB", "512GB", "1TB"],
  "option3_values": [],
  "image_url": "https://example.com/image.jpg",
  "sales": 1000,
  "min_price": 8999.0,
  "max_price": 12999.0,
  "compare_at_price": 12999.0,
  "sku_prices": [8999.0, 10999.0, 12999.0],
  "sku_weights": [221, 221, 221],
  "sku_weight_units": ["g"],
  "total_inventory": 500,
  "create_time": "2024-01-01T00:00:00",
  "update_time": "2024-01-15T10:30:00",
  "title_embedding": [0.123, 0.456, ...],  // 1024维向量
  "skus": [
    {
      "sku_id": "1001",
      "price": 8999.0,
      "compare_at_price": 9999.0,
      "sku_code": "IP15PM-256-BLK",
      "stock": 100,
      "weight": 221.0,
      "weight_unit": "g",
      "option1_value": "深空黑色",
      "option2_value": "256GB",
      "option3_value": null,
      "image_src": "https://example.com/sku1.jpg"
    },
    {
      "sku_id": "1002",
      "price": 10999.0,
      "compare_at_price": 11999.0,
      "sku_code": "IP15PM-512-BLK",
      "stock": 200,
      "weight": 221.0,
      "weight_unit": "g",
      "option1_value": "深空黑色",
      "option2_value": "512GB",
      "option3_value": null,
      "image_src": "https://example.com/sku2.jpg"
    }
  ],
  "specifications": [
    {"sku_id": "1001", "name": "颜色", "value": "深空黑色"},
    {"sku_id": "1001", "name": "存储容量", "value": "256GB"},
    {"sku_id": "1002", "name": "颜色", "value": "深空黑色"},
    {"sku_id": "1002", "name": "存储容量", "value": "512GB"}
  ]
}

8. 总结

8.1 关键要点

  1. 数据源:三个主要表(SPU、SKU、Option)
  2. 批量查询:使用 IN 子句提高查询效率
  3. 类别映射:通过预加载的映射表将 ID 转换为名称
  4. 多款式处理:通过嵌套结构和扁平化字段支持复杂的 SKU 查询
  5. 多语言支持:自动翻译文本字段,支持中英文双语
  6. 向量搜索:批量生成 embedding,支持语义搜索

8.2 复杂字段处理总结

字段类型 处理方式 关键逻辑
类别 ID映射 + 路径解析 category_path 解析ID,通过映射表转换为名称,构建多级分类
SKU 嵌套数组 + 聚合计算 将每个 SKU 转换为嵌套对象,同时计算价格范围、总库存等聚合值
选项 名称 + 值分离 从 Option 表获取名称,从 SKU 表提取值,分别存储
规格 结构化数组 将 SKU 的选项值转换为 {name, value, sku_id} 结构,支持过滤和分面

附录:相关代码文件

  • indexer/incremental_service.py: 增量索引服务,数据获取逻辑
  • indexer/document_transformer.py: 文档转换器,字段填充逻辑
  • indexer/indexing_utils.py: 工具函数,分类映射加载和 Transformer 创建
  • indexer/bulk_indexer.py: 批量写入器,ES 写入逻辑