# MySQL 到 Elasticsearch 文档映射说明 ## 1. 概述 本文档详细说明从 MySQL 数据库到 Elasticsearch 的完整数据转换流程,包括 SQL 查询逻辑、字段映射规则、以及复杂字段(类别、多款式)的处理方法。 ### 1.1 数据流转架构 ``` MySQL 数据库 ↓ [SQL 查询] → SPU表、SKU表、Option表 ↓ [数据加载] → Pandas DataFrame ↓ [文档转换] → ES 文档字典 ↓ [批量写入] → Elasticsearch ``` ### 1.2 核心组件 - **IncrementalIndexerService**: 增量索引服务,负责数据获取和索引 - **SPUDocumentTransformer**: 文档转换器,负责将数据库记录转换为 ES 文档 - **BulkIndexer**: 批量写入器,负责将文档批量写入 ES --- ## 2. 数据源表结构 ### 2.1 SPU 表(shoplazza_product_spu) SPU(Standard Product Unit)表存储商品的基本信息。 **主要字段:** | 字段名 | 类型 | 说明 | |--------|------|------| | `id` | BIGINT | SPU主键ID | | `tenant_id` | BIGINT | 租户ID | | `title` | VARCHAR(500) | 商品标题 | | `brief` | VARCHAR(1000) | 商品简介 | | `description` | TEXT | 商品详细描述 | | `vendor` | VARCHAR(255) | 供应商/品牌 | | `category` | VARCHAR(255) | 类目名称 | | `category_id` | BIGINT | 类目ID | | `category_path` | VARCHAR(500) | 类目路径(逗号分隔的ID列表) | | `category_level` | INT | 类目层级 | | `tags` | TEXT | 标签(逗号分隔) | | `image_src` | VARCHAR(500) | 主图URL | | `fake_sales` | INT | 假销量 | | `create_time` | DATETIME | 创建时间 | | `update_time` | DATETIME | 更新时间 | | `deleted` | BIT(1) | 删除标记 | ### 2.2 SKU 表(shoplazza_product_sku) SKU(Stock Keeping Unit)表存储商品的变体信息,一个 SPU 可以有多个 SKU。 **主要字段:** | 字段名 | 类型 | 说明 | |--------|------|------| | `id` | BIGINT | SKU主键ID | | `spu_id` | BIGINT | 关联的SPU ID | | `tenant_id` | BIGINT | 租户ID | | `sku` | VARCHAR(100) | SKU编码 | | `price` | DECIMAL(10,2) | 价格 | | `compare_at_price` | DECIMAL(10,2) | 对比价格(原价) | | `inventory_quantity` | INT | 库存数量 | | `weight` | DECIMAL(10,2) | 重量 | | `weight_unit` | VARCHAR(10) | 重量单位 | | `option1` | VARCHAR(255) | 选项1的值(如:颜色-红色) | | `option2` | VARCHAR(255) | 选项2的值(如:尺寸-L) | | `option3` | VARCHAR(255) | 选项3的值(如:材质-棉) | | `image_src` | VARCHAR(500) | SKU图片URL | | `deleted` | BIT(1) | 删除标记 | ### 2.3 Option 表(shoplazza_product_option) Option 表存储选项的名称定义,用于定义 SKU 的选项维度(如:颜色、尺寸、材质)。 **主要字段:** | 字段名 | 类型 | 说明 | |--------|------|------| | `id` | BIGINT | Option主键ID | | `spu_id` | BIGINT | 关联的SPU ID | | `tenant_id` | BIGINT | 租户ID | | `position` | INT | 位置序号(1、2、3对应option1、option2、option3) | | `name` | VARCHAR(255) | 选项名称(如:"颜色"、"尺寸") | | `values` | TEXT | 选项值列表(JSON格式) | | `deleted` | BIT(1) | 删除标记 | --- ## 3. SQL 查询逻辑 ### 3.1 批量查询 SPU 数据 **方法:** `_load_spus_for_spu_ids(tenant_id, spu_ids, include_deleted=True)` **SQL 查询:** ```sql SELECT id, shop_id, shoplazza_id, title, brief, description, spu, vendor, vendor_url, image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND id IN :spu_ids [AND deleted = 0] -- 如果 include_deleted=False ``` **查询特点:** - 使用 `IN` 子句批量查询多个 SPU - 支持查询已删除的记录(用于检测删除状态) - 使用 SQLAlchemy 的 `bindparam` 和 `expanding=True` 处理动态 IN 列表 **代码位置:** `indexer/incremental_service.py:223-262` ### 3.2 批量查询 SKU 数据 **方法:** `_load_skus_for_spu_ids(tenant_id, spu_ids)` **SQL 查询:** ```sql SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, shoplazza_image_id, title, sku, barcode, position, price, compare_at_price, cost_price, option1, option2, option3, inventory_quantity, weight, weight_unit, image_src, wholesale_price, note, extend, shoplazza_created_at, shoplazza_updated_at, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_sku WHERE tenant_id = :tenant_id AND deleted = 0 AND spu_id IN :spu_ids ``` **查询特点:** - 批量查询多个 SPU 的所有 SKU - 只查询未删除的 SKU(`deleted = 0`) - 结果按 `spu_id` 分组处理 **代码位置:** `indexer/incremental_service.py:264-288` ### 3.3 批量查询 Option 数据 **方法:** `_load_options_for_spu_ids(tenant_id, spu_ids)` **SQL 查询:** ```sql SELECT id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, position, name, `values`, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_option WHERE tenant_id = :tenant_id AND deleted = 0 AND spu_id IN :spu_ids ORDER BY spu_id, position ``` **查询特点:** - 批量查询多个 SPU 的所有 Option - 按 `spu_id` 和 `position` 排序,确保选项顺序一致 - `position` 字段对应 SKU 表中的 `option1`、`option2`、`option3` **代码位置:** `indexer/incremental_service.py:290-310` ### 3.4 分类映射加载 **方法:** `load_category_mapping(db_engine)` **SQL 查询:** ```sql SELECT DISTINCT category_id, category FROM shoplazza_product_spu WHERE deleted = 0 AND category_id IS NOT NULL ``` **查询特点:** - 全局加载所有租户的分类映射(category_id → category_name) - 在服务初始化时预加载,避免重复查询 - 用于将 `category_path` 中的 ID 转换为名称 **代码位置:** `indexer/indexing_utils.py:17-51` --- ## 4. 字段映射详细说明 ### 4.1 基础字段映射 #### 4.1.1 标识字段 | ES 字段 | 数据来源 | 转换逻辑 | |---------|----------|----------| | `tenant_id` | `spu_row['tenant_id']` | 直接转换为字符串:`str(tenant_id)` | | `spu_id` | `spu_row['id']` | 直接转换为字符串:`str(spu_id)` | #### 4.1.2 文本字段(多语言处理) 文本字段支持中英文双语,根据租户配置进行自动翻译。 **字段列表:** - `title_zh` / `title_en` - `brief_zh` / `brief_en` - `description_zh` / `description_en` - `vendor_zh` / `vendor_en` **填充逻辑:** 1. **获取租户主语言**:从 `tenant_config` 读取 `primary_language`(默认 'zh') 2. **翻译处理**: - 如果配置了 `translate_to_en=true` 且主语言不是 'en',则翻译到英文 - 如果配置了 `translate_to_zh=true` 且主语言不是 'zh',则翻译到中文 - 使用 `Translator.translate_for_indexing()` 方法进行翻译 3. **字段填充**: ```python # 示例:title 字段 if pd.notna(spu_row.get('title')): title_text = str(spu_row['title']) if self.translator: translations = self.translator.translate_for_indexing( title_text, shop_language=primary_lang, source_lang=primary_lang, translate_to_en=translate_to_en, translate_to_zh=translate_to_zh, ) doc['title_zh'] = translations.get('zh') or (title_text if primary_lang == 'zh' else None) doc['title_en'] = translations.get('en') or (title_text if primary_lang == 'en' else None) ``` **代码位置:** `indexer/document_transformer.py:168-298` #### 4.1.3 标签字段 | ES 字段 | 数据来源 | 转换逻辑 | |---------|----------|----------| | `tags` | `spu_row['tags']` | 逗号分隔字符串 → 列表:`[tag.strip() for tag in tags_str.split(',') if tag.strip()]` | **示例:** - 数据库:`"标签1,标签2,标签3"` - ES:`["标签1", "标签2", "标签3"]` #### 4.1.4 图片字段 | ES 字段 | 数据来源 | 转换逻辑 | |---------|----------|----------| | `image_url` | `spu_row['image_src']` | 如果 URL 不以 `http` 开头,添加 `//` 前缀 | **代码位置:** `indexer/document_transformer.py:390-396` #### 4.1.5 销量字段 | ES 字段 | 数据来源 | 转换逻辑 | |---------|----------|----------| | `sales` | `spu_row['fake_sales']` | 转换为整数,如果为空或转换失败则设为 0 | #### 4.1.6 时间字段 | ES 字段 | 数据来源 | 转换逻辑 | |---------|----------|----------| | `create_time` | `spu_row['create_time']` | 转换为 ISO 格式字符串:`datetime.isoformat()` | | `update_time` | `spu_row['update_time']` | 转换为 ISO 格式字符串:`datetime.isoformat()` | --- ### 4.2 类别(Category)字段映射 类别字段是复杂字段,涉及多级分类的解析和映射。 #### 4.2.1 类别数据来源 类别信息主要来自 SPU 表的以下字段: - `category_path`: 类目路径,逗号分隔的类目ID列表(如:`"1,2,3"`) - `category`: 类目名称(备用字段) - `category_id`: 当前类目ID - `category_level`: 类目层级 #### 4.2.2 类别映射流程 **步骤 1:加载分类映射** 在服务初始化时,从数据库加载全局分类映射: ```python category_id_to_name = load_category_mapping(db_engine) # 结果:{"1": "电子产品", "2": "手机", "3": "iPhone"} ``` **步骤 2:解析 category_path** ```python if pd.notna(spu_row.get('category_path')): category_path = str(spu_row['category_path']) # 例如:"1,2,3" category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()] # 结果:["1", "2", "3"] ``` **步骤 3:ID 转名称** ```python category_names = [] missing_ids = [] for cid in category_ids: if cid in self.category_id_to_name: category_names.append(self.category_id_to_name[cid]) else: missing_ids.append(cid) ``` **步骤 4:数据质量检查** 如果存在缺失的类目ID(在映射中找不到),则: - 记录错误日志 - **不写入任何类目字段**(视为没有类目) - 不阻塞索引流程 **步骤 5:填充 ES 字段** ```python if category_names: # 构建类目路径字符串 category_path_str = '/'.join(category_names) doc['category_path_zh'] = category_path_str # 例如:"电子产品/手机/iPhone" # 填充分层类目名称 if len(category_names) > 0: doc['category1_name'] = category_names[0] # "电子产品" if len(category_names) > 1: doc['category2_name'] = category_names[1] # "手机" if len(category_names) > 2: doc['category3_name'] = category_names[2] # "iPhone" ``` **步骤 6:备用处理(category_path 为空时)** 如果 `category_path` 为空,使用 `category` 字段作为备选: ```python elif pd.notna(spu_row.get('category')): category = str(spu_row['category']) doc['category_name_zh'] = category # 尝试从 category 字段解析多级分类(如果包含 "/") if '/' in category: path_parts = category.split('/') if len(path_parts) > 0: doc['category1_name'] = path_parts[0].strip() if len(path_parts) > 1: doc['category2_name'] = path_parts[1].strip() if len(path_parts) > 2: doc['category3_name'] = path_parts[2].strip() ``` #### 4.2.3 类别相关 ES 字段 | ES 字段 | 类型 | 说明 | |---------|------|------| | `category_path_zh` | text | 类目路径字符串(如:"电子产品/手机/iPhone") | | `category_path_en` | text | 类目路径英文(暂未实现) | | `category1_name` | keyword | 一级类目名称 | | `category2_name` | keyword | 二级类目名称 | | `category3_name` | keyword | 三级类目名称 | | `category_id` | keyword | 当前类目ID | | `category_level` | integer | 类目层级 | **代码位置:** `indexer/document_transformer.py:300-376` --- ### 4.3 多款式(SKU/Options)字段映射 多款式字段是最复杂的部分,涉及 SKU 嵌套结构、选项值提取、规格构建等。 #### 4.3.1 SKU 数据结构 一个 SPU 可以有多个 SKU,每个 SKU 代表一个商品变体(如:红色-L码、蓝色-M码)。 **ES 中的 SKU 嵌套结构:** ```json { "skus": [ { "sku_id": "123", "price": 99.99, "compare_at_price": 129.99, "sku_code": "SKU001", "stock": 100, "weight": 0.5, "weight_unit": "kg", "option1_value": "红色", "option2_value": "L", "option3_value": "棉", "image_src": "https://..." }, { "sku_id": "124", "price": 99.99, "compare_at_price": 129.99, "sku_code": "SKU002", "stock": 50, "weight": 0.5, "weight_unit": "kg", "option1_value": "蓝色", "option2_value": "M", "option3_value": "棉", "image_src": "https://..." } ] } ``` #### 4.3.2 SKU 处理流程 **步骤 1:构建 Option 名称映射** 从 Option 表获取选项名称: ```python option_name_map = {} if not options.empty: for _, opt_row in options.iterrows(): position = opt_row.get('position') # 1, 2, 3 name = opt_row.get('name') # "颜色", "尺寸", "材质" if pd.notna(position) and pd.notna(name): option_name_map[int(position)] = str(name) # 结果:{1: "颜色", 2: "尺寸", 3: "材质"} ``` **步骤 2:转换每个 SKU** ```python for _, sku_row in skus.iterrows(): sku_data = self._transform_sku_row(sku_row, option_name_map) if sku_data: skus_list.append(sku_data) ``` **SKU 转换逻辑:** ```python def _transform_sku_row(self, sku_row, option_name_map): sku_data = { 'sku_id': str(sku_row['id']), 'price': float(sku_row['price']) if pd.notna(sku_row.get('price')) else None, 'compare_at_price': float(sku_row['compare_at_price']) if pd.notna(sku_row.get('compare_at_price')) else None, 'sku_code': str(sku_row['sku']) if pd.notna(sku_row.get('sku')) else None, 'stock': int(sku_row['inventory_quantity']) if pd.notna(sku_row.get('inventory_quantity')) else 0, 'weight': float(sku_row['weight']) if pd.notna(sku_row.get('weight')) else None, 'weight_unit': str(sku_row['weight_unit']) if pd.notna(sku_row.get('weight_unit')) else None, 'image_src': str(sku_row['image_src']) if pd.notna(sku_row.get('image_src')) else None, } # 填充选项值 if pd.notna(sku_row.get('option1')): sku_data['option1_value'] = str(sku_row['option1']) if pd.notna(sku_row.get('option2')): sku_data['option2_value'] = str(sku_row['option2']) if pd.notna(sku_row.get('option3')): sku_data['option3_value'] = str(sku_row['option3']) return sku_data ``` **步骤 3:收集聚合信息** 在处理 SKU 的同时,收集价格、重量、库存等聚合信息: ```python prices = [] compare_prices = [] sku_prices = [] sku_weights = [] sku_weight_units = [] total_inventory = 0 for sku_data in skus_list: if 'price' in sku_data and sku_data['price'] is not None: prices.append(sku_data['price']) sku_prices.append(sku_data['price']) if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None: compare_prices.append(sku_data['compare_at_price']) if 'weight' in sku_data and sku_data['weight'] is not None: sku_weights.append(int(float(sku_data['weight']))) if 'weight_unit' in sku_data and sku_data['weight_unit']: sku_weight_units.append(sku_data['weight_unit']) if 'stock' in sku_data and sku_data['stock'] is not None: total_inventory += sku_data['stock'] ``` **步骤 4:填充 ES 字段** ```python doc['skus'] = skus_list # 嵌套 SKU 数组 # 价格范围 if prices: doc['min_price'] = float(min(prices)) doc['max_price'] = float(max(prices)) else: doc['min_price'] = 0.0 doc['max_price'] = 0.0 if compare_prices: doc['compare_at_price'] = float(max(compare_prices)) # SKU 扁平化字段(用于过滤和聚合) doc['sku_prices'] = sku_prices # [99.99, 99.99, 129.99] doc['sku_weights'] = sku_weights # [500, 500, 600] doc['sku_weight_units'] = list(set(sku_weight_units)) # ["kg"] doc['total_inventory'] = total_inventory # 150 ``` **代码位置:** `indexer/document_transformer.py:398-480` #### 4.3.3 选项名称字段 从 Option 表获取选项名称,填充到 ES 文档的顶层: ```python def _fill_option_names(self, doc, options): if not options.empty: sorted_options = options.sort_values('position') if len(sorted_options) > 0 and pd.notna(sorted_options.iloc[0].get('name')): doc['option1_name'] = str(sorted_options.iloc[0]['name']) # "颜色" if len(sorted_options) > 1 and pd.notna(sorted_options.iloc[1].get('name')): doc['option2_name'] = str(sorted_options.iloc[1]['name']) # "尺寸" if len(sorted_options) > 2 and pd.notna(sorted_options.iloc[2].get('name')): doc['option3_name'] = str(sorted_options.iloc[2]['name']) # "材质" ``` **ES 字段:** - `option1_name`: 选项1的名称(如:"颜色") - `option2_name`: 选项2的名称(如:"尺寸") - `option3_name`: 选项3的名称(如:"材质") **代码位置:** `indexer/document_transformer.py:378-388` #### 4.3.4 选项值字段 从所有 SKU 中提取选项值,去重后填充到 ES 文档: ```python def _fill_option_values(self, doc, skus): option1_values = [] option2_values = [] option3_values = [] for _, sku_row in skus.iterrows(): if pd.notna(sku_row.get('option1')): option1_values.append(str(sku_row['option1'])) if pd.notna(sku_row.get('option2')): option2_values.append(str(sku_row['option2'])) if pd.notna(sku_row.get('option3')): option3_values.append(str(sku_row['option3'])) # 根据配置决定是否写入索引(searchable_option_dimensions) if 'option1' in self.searchable_option_dimensions: doc['option1_values'] = list(set(option1_values)) # ["红色", "蓝色", "绿色"] else: doc['option1_values'] = [] # 同样处理 option2_values 和 option3_values ``` **ES 字段:** - `option1_values`: 选项1的所有值列表(如:`["红色", "蓝色", "绿色"]`) - `option2_values`: 选项2的所有值列表(如:`["S", "M", "L"]`) - `option3_values`: 选项3的所有值列表(如:`["棉", "涤纶"]`) **注意:** 只有配置在 `searchable_option_dimensions` 中的选项才会被索引。 **代码位置:** `indexer/document_transformer.py:482-510` #### 4.3.5 规格(Specifications)字段 规格字段用于支持按规格过滤和分面搜索,将 SKU 的选项值结构化存储。 **构建逻辑:** ```python specifications = [] for _, sku_row in skus.iterrows(): sku_id = str(sku_row['id']) # 如果 SKU 有 option1 且 Option 表中有对应的名称 if pd.notna(sku_row.get('option1')) and 1 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[1], # "颜色" 'value': str(sku_row['option1']) # "红色" }) # 同样处理 option2 和 option3 if pd.notna(sku_row.get('option2')) and 2 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[2], # "尺寸" 'value': str(sku_row['option2']) # "L" }) if pd.notna(sku_row.get('option3')) and 3 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[3], # "材质" 'value': str(sku_row['option3']) # "棉" }) doc['specifications'] = specifications ``` **ES 字段结构:** ```json { "specifications": [ {"sku_id": "123", "name": "颜色", "value": "红色"}, {"sku_id": "123", "name": "尺寸", "value": "L"}, {"sku_id": "123", "name": "材质", "value": "棉"}, {"sku_id": "124", "name": "颜色", "value": "蓝色"}, {"sku_id": "124", "name": "尺寸", "value": "M"}, {"sku_id": "124", "name": "材质", "value": "棉"} ] } ``` **用途:** - 支持按规格过滤:`{"specifications": {"name": "颜色", "value": "红色"}}` - 支持规格分面:统计每个规格值的数量 **代码位置:** `indexer/document_transformer.py:459-478` --- ### 4.4 向量字段映射 #### 4.4.1 标题向量(title_embedding) **生成时机:** - 在增量索引中,批量生成 embedding(性能优化) - 在单条查询中,按需生成 **生成逻辑:** ```python # 批量生成(增量索引) if enable_embedding and encoder and documents: title_texts = [] title_doc_indices = [] for i, (_, doc) in enumerate(documents): title_text = doc.get("title_en") or doc.get("title_zh") if title_text and str(title_text).strip(): title_texts.append(str(title_text)) title_doc_indices.append(i) if title_texts: embeddings = encoder.encode_batch(title_texts, batch_size=32) for j, emb in enumerate(embeddings): doc_idx = title_doc_indices[j] if isinstance(emb, np.ndarray): documents[doc_idx][1]["title_embedding"] = emb.tolist() ``` **ES 字段:** - `title_embedding`: 1024 维浮点数组,用于语义搜索 **代码位置:** - 批量生成:`indexer/incremental_service.py:558-576` - 单条生成:`indexer/document_transformer.py:586-619` --- ## 5. 数据转换完整流程 ### 5.1 增量索引流程 **入口方法:** `IncrementalIndexerService.index_spus_to_es()` **流程步骤:** 1. **参数处理** - 去重 SPU ID 列表 - 生成索引名称(如果未提供) 2. **显式删除处理**(如果提供了 `delete_spu_ids`) - 遍历 `delete_spu_ids`,从 ES 中删除对应文档 3. **批量加载数据** - 调用 `_load_spus_for_spu_ids()` 批量加载 SPU 数据 - 调用 `_load_skus_for_spu_ids()` 批量加载 SKU 数据 - 调用 `_load_options_for_spu_ids()` 批量加载 Option 数据 4. **自动删除检测** - 检查 SPU 是否在数据库中存在 - 检查 SPU 的 `deleted` 字段 - 如果不存在或已删除,从 ES 中删除对应文档 5. **数据分组** - 将 SKU 按 `spu_id` 分组 - 将 Option 按 `spu_id` 分组 6. **文档转换** - 遍历每个 SPU - 调用 `transformer.transform_spu_to_doc()` 转换为 ES 文档 - 收集所有文档 7. **批量生成 Embedding**(如果启用) - 收集所有文档的标题文本 - 批量调用 `encoder.encode_batch()` 生成 embedding - 填充到对应文档 8. **批量写入 ES** - 使用 `BulkIndexer` 批量写入文档 - 使用 `spu_id` 作为文档 ID 9. **返回结果** - 返回成功/失败统计 - 返回每个 SPU 的处理状态 **代码位置:** `indexer/incremental_service.py:391-679` ### 5.2 文档转换流程 **入口方法:** `SPUDocumentTransformer.transform_spu_to_doc()` **流程步骤:** 1. **初始化文档字典** ```python doc = {} doc['tenant_id'] = str(tenant_id) doc['spu_id'] = str(spu_id) ``` 2. **填充文本字段**(多语言) - 调用 `_fill_text_fields()` 填充 title、brief、description、vendor 3. **填充标签** - 解析 `tags` 字段,转换为列表 4. **填充类别字段** - 调用 `_fill_category_fields()` 处理类别映射 5. **填充选项名称** - 调用 `_fill_option_names()` 从 Option 表获取选项名称 6. **填充图片 URL** - 调用 `_fill_image_url()` 处理图片 URL 7. **填充销量** - 转换 `fake_sales` 为整数 8. **处理 SKU** - 调用 `_process_skus()` 处理所有 SKU - 构建 `skus` 嵌套数组 - 构建 `specifications` 数组 - 计算价格范围、总库存等聚合字段 9. **填充选项值** - 调用 `_fill_option_values()` 提取所有选项值 10. **填充时间字段** - 转换 `create_time` 和 `update_time` 为 ISO 格式 11. **返回文档** ```python return doc ``` **代码位置:** `indexer/document_transformer.py:57-166` --- ## 6. 特殊处理逻辑 ### 6.1 删除检测 系统支持两种删除方式: 1. **显式删除**:通过 `delete_spu_ids` 参数显式指定要删除的 SPU 2. **自动检测删除**: - SPU 在数据库中不存在 - SPU 的 `deleted` 字段为 `1` 或 `b'\x01'` **删除处理:** ```python # 检查 deleted 字段(可能是 bit 类型) def _is_deleted_value(v): if isinstance(v, bytes): return v == b"\x01" or v == 1 return bool(v) spu_df["_is_deleted"] = spu_df["deleted"].apply(_is_deleted_value) ``` **代码位置:** `indexer/incremental_service.py:481-517` ### 6.2 数据质量检查 #### 6.2.1 类别映射缺失 如果 `category_path` 中的类目ID在映射中不存在: - 记录错误日志 - 不写入任何类目字段(视为没有类目) - 不阻塞索引流程 #### 6.2.2 标题缺失 如果 SPU 没有标题: - 记录错误日志 - 继续处理(但可能影响搜索效果) #### 6.2.3 SKU 缺失 如果 SPU 没有 SKU: - 记录警告日志 - 继续处理(但价格、库存等字段可能为空) ### 6.3 性能优化 1. **批量查询**:使用 `IN` 子句批量查询,减少数据库往返 2. **缓存分类映射**:在服务初始化时预加载,避免重复查询 3. **缓存 Transformer**:按租户缓存 Transformer 实例,避免重复初始化 4. **批量生成 Embedding**:收集所有标题后批量生成,利用批处理性能 5. **批量写入 ES**:使用 `BulkIndexer` 批量写入,提高写入效率 --- ## 7. ES 文档完整示例 ```json { "tenant_id": "1", "spu_id": "12345", "title_zh": "iPhone 15 Pro Max", "title_en": "iPhone 15 Pro Max", "brief_zh": "最新款 iPhone", "brief_en": "Latest iPhone", "description_zh": "详细描述...", "description_en": "Detailed description...", "vendor_zh": "Apple", "vendor_en": "Apple", "tags": ["手机", "智能手机", "Apple"], "category_path_zh": "电子产品/手机/iPhone", "category1_name": "电子产品", "category2_name": "手机", "category3_name": "iPhone", "category_id": "3", "category_level": 3, "option1_name": "颜色", "option2_name": "存储容量", "option3_name": null, "option1_values": ["深空黑色", "原色钛金属", "白色钛金属"], "option2_values": ["256GB", "512GB", "1TB"], "option3_values": [], "image_url": "https://example.com/image.jpg", "sales": 1000, "min_price": 8999.0, "max_price": 12999.0, "compare_at_price": 12999.0, "sku_prices": [8999.0, 10999.0, 12999.0], "sku_weights": [221, 221, 221], "sku_weight_units": ["g"], "total_inventory": 500, "create_time": "2024-01-01T00:00:00", "update_time": "2024-01-15T10:30:00", "title_embedding": [0.123, 0.456, ...], // 1024维向量 "skus": [ { "sku_id": "1001", "price": 8999.0, "compare_at_price": 9999.0, "sku_code": "IP15PM-256-BLK", "stock": 100, "weight": 221.0, "weight_unit": "g", "option1_value": "深空黑色", "option2_value": "256GB", "option3_value": null, "image_src": "https://example.com/sku1.jpg" }, { "sku_id": "1002", "price": 10999.0, "compare_at_price": 11999.0, "sku_code": "IP15PM-512-BLK", "stock": 200, "weight": 221.0, "weight_unit": "g", "option1_value": "深空黑色", "option2_value": "512GB", "option3_value": null, "image_src": "https://example.com/sku2.jpg" } ], "specifications": [ {"sku_id": "1001", "name": "颜色", "value": "深空黑色"}, {"sku_id": "1001", "name": "存储容量", "value": "256GB"}, {"sku_id": "1002", "name": "颜色", "value": "深空黑色"}, {"sku_id": "1002", "name": "存储容量", "value": "512GB"} ] } ``` --- ## 8. 总结 ### 8.1 关键要点 1. **数据源**:三个主要表(SPU、SKU、Option) 2. **批量查询**:使用 `IN` 子句提高查询效率 3. **类别映射**:通过预加载的映射表将 ID 转换为名称 4. **多款式处理**:通过嵌套结构和扁平化字段支持复杂的 SKU 查询 5. **多语言支持**:自动翻译文本字段,支持中英文双语 6. **向量搜索**:批量生成 embedding,支持语义搜索 ### 8.2 复杂字段处理总结 | 字段类型 | 处理方式 | 关键逻辑 | |---------|---------|---------| | **类别** | ID映射 + 路径解析 | 从 `category_path` 解析ID,通过映射表转换为名称,构建多级分类 | | **SKU** | 嵌套数组 + 聚合计算 | 将每个 SKU 转换为嵌套对象,同时计算价格范围、总库存等聚合值 | | **选项** | 名称 + 值分离 | 从 Option 表获取名称,从 SKU 表提取值,分别存储 | | **规格** | 结构化数组 | 将 SKU 的选项值转换为 `{name, value, sku_id}` 结构,支持过滤和分面 | --- ## 附录:相关代码文件 - `indexer/incremental_service.py`: 增量索引服务,数据获取逻辑 - `indexer/document_transformer.py`: 文档转换器,字段填充逻辑 - `indexer/indexing_utils.py`: 工具函数,分类映射加载和 Transformer 创建 - `indexer/bulk_indexer.py`: 批量写入器,ES 写入逻辑