From 3a950275b31399928b9d15724fc6b7d0fb14e2dd Mon Sep 17 00:00:00 2001 From: tangwang Date: Sat, 8 Nov 2025 15:45:35 +0800 Subject: [PATCH] 导入测试数据 --- data/customer1/ingest_customer1.py | 41 +++++++++++++++++++++++++++++++++++++---- start_all.sh | 1 - 当前开发进度.md | 545 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------- 3 files changed, 551 insertions(+), 36 deletions(-) diff --git a/data/customer1/ingest_customer1.py b/data/customer1/ingest_customer1.py index 4d3ca1d..27a48d3 100755 --- a/data/customer1/ingest_customer1.py +++ b/data/customer1/ingest_customer1.py @@ -9,9 +9,11 @@ import sys import os import pandas as pd import argparse +from typing import Optional -# Add parent directory to path -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +# Add parent directory to path (go up 3 levels: customer1 -> data -> SearchEngine -> root) +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.insert(0, project_root) from config import ConfigLoader from utils import ESClient, get_connection_from_config @@ -53,6 +55,8 @@ def main(): parser.add_argument('--batch-size', type=int, default=100, help='Batch size for processing') parser.add_argument('--recreate-index', action='store_true', help='Recreate index if exists') parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host') + parser.add_argument('--es-username', default=None, help='Elasticsearch username (or set ES_USERNAME env var)') + parser.add_argument('--es-password', default=None, help='Elasticsearch password (or set ES_PASSWORD env var)') parser.add_argument('--skip-embeddings', action='store_true', help='Skip embedding generation') args = parser.parse_args() @@ -80,11 +84,40 @@ def main(): # Initialize Elasticsearch client print(f"\n[2/6] Connecting to Elasticsearch: {args.es_host}") - os.environ['ES_HOST'] = args.es_host - es_client = ESClient(hosts=[args.es_host]) + + # Get credentials: prioritize command-line args, then environment variables, then .env file + es_username = args.es_username + es_password = args.es_password + + # If not provided via args, try to load from .env file via env_config + if not es_username or not es_password: + try: + from config.env_config import get_es_config + es_config = get_es_config() + es_username = es_username or es_config.get('username') + es_password = es_password or es_config.get('password') + except Exception: + # Fallback to environment variables + es_username = es_username or os.getenv('ES_USERNAME') + es_password = es_password or os.getenv('ES_PASSWORD') + + # Create ES client with credentials if available + if es_username and es_password: + print(f" Using authentication: {es_username}") + es_client = ESClient(hosts=[args.es_host], username=es_username, password=es_password) + else: + print(f" Warning: No authentication credentials found") + print(f" Attempting connection without authentication (will fail if ES requires auth)") + es_client = ESClient(hosts=[args.es_host]) if not es_client.ping(): print("Failed to connect to Elasticsearch") + print("\nTroubleshooting:") + print(" 1. Check if Elasticsearch is running: curl http://localhost:9200") + print(" 2. If ES requires authentication, provide credentials:") + print(" - Use --es-username and --es-password arguments, or") + print(" - Set ES_USERNAME and ES_PASSWORD environment variables") + print(" 3. Verify the host URL is correct: --es-host") return 1 print("Connected to Elasticsearch successfully") diff --git a/start_all.sh b/start_all.sh index 572469c..9f444ef 100755 --- a/start_all.sh +++ b/start_all.sh @@ -3,7 +3,6 @@ # One-click startup script for SearchEngine # This script starts everything you need -source /home/tw/miniconda3/etc/profile.d/conda.sh set -e diff --git a/当前开发进度.md b/当前开发进度.md index 0d3fcbd..7fb71f4 100644 --- a/当前开发进度.md +++ b/当前开发进度.md @@ -1,53 +1,536 @@ +# 搜索引擎通用化开发进度 +## 项目概述 对后端搜索技术 做通用化。 - 通用化的本质 是 对于各种业务数据、各种检索需求,都可以 用少量定制+配置化 来实现效果。 -## 1. 原始数据层的约定。 -### 店匠主表 -shoplazza_product_sku -shoplazza_product_spu -所有租户共用这个主表 -### 每个租户的辅表 -各个租户,有自己的扩展表。 入索引的时候,商品主表 shoplazza_product_sku 的 id + shopid,拼接租户自己单独的扩展表(比如可以放一些自己的属性体系、各种语言的商品名、品牌名、标签、分类等) +**通用化的本质**:对于各种业务数据、各种检索需求,都可以用少量定制+配置化来实现效果。 + +--- + +## 1. 原始数据层的约定 + +所有租户共用主表、独立配置和扩展表,有自己独立的ES索引。 + +### 1.1 店匠主表 + +所有租户共用以下主表: +- `shoplazza_product_sku` - SKU级别商品数据 +- `shoplazza_product_spu` - SPU级别商品数据 + +### 1.2 每个租户的扩展表 + +各个租户有自己的扩展表,不同的租户根据不同的业务需要、以及不同的数据源,来定制自己的扩展表: +- 自定义属性体系 +- 多语言商品标题(中文、英文、俄文等) +- 品牌名、不同的类目和标签体系 +- 业务过滤和聚合字段 +- 权重(提权)字段 + +**数据关联方式**: +- 入索引时,商品主表 `shoplazza_product_sku` 的 `id` + `shopid` 与租户扩展表关联 +- 例如:`customer1_extension` 表存储 customer1 的自定义字段 + +### 1.3 配置化方案 + +统一通过配置文件定义: +1. ES 字段定义(字段类型、分析器、来源表/列) +2. ES mapping 结构生成 +3. 数据入库映射关系 + +--- + +## 2. 配置系统实现 + +### 2.1 应用结构配置(字段定义) + +**配置文件位置**:`config/schema/{customer_id}_config.yaml` + +**配置内容**:定义了 ES 的输入数据有哪些字段、关联 MySQL 的哪些字段。 + +**实现情况**: + +#### 字段类型支持 +- **TEXT**:文本字段,支持多语言分析器 +- **KEYWORD**:关键词字段,用于精确匹配和聚合 +- **TEXT_EMBEDDING**:文本向量字段(1024维,dot_product相似度) +- **IMAGE_EMBEDDING**:图片向量字段(1024维,dot_product相似度) +- **INT/LONG**:整数类型 +- **FLOAT/DOUBLE**:浮点数类型 +- **DATE**:日期类型 +- **BOOLEAN**:布尔类型 + +#### 分析器支持 +- **chinese_ecommerce**:中文电商分词器(index_ansj/query_ansj) +- **english**:英文分析器 +- **russian**:俄文分析器 +- **arabic**:阿拉伯文分析器 +- **spanish**:西班牙文分析器 +- **japanese**:日文分析器 +- **standard**:标准分析器 +- **keyword**:关键词分析器 + +#### 字段配置示例 + +```yaml +fields: + # 主键字段 + - name: "skuId" + type: "LONG" + source_table: "main" # 主表 + source_column: "id" + required: true + index: true + store: true + + # 多语言文本字段 + - name: "name" + type: "TEXT" + source_table: "extension" # 扩展表 + source_column: "name" + analyzer: "chinese_ecommerce" + boost: 2.0 + index: true + store: true + + - name: "enSpuName" + type: "TEXT" + source_table: "extension" + source_column: "enSpuName" + analyzer: "english" + boost: 2.0 + + - name: "ruSkuName" + type: "TEXT" + source_table: "extension" + source_column: "ruSkuName" + analyzer: "russian" + boost: 2.0 + + # 文本向量字段 + - name: "name_embedding" + type: "TEXT_EMBEDDING" + source_table: "extension" + source_column: "name" + embedding_dims: 1024 + embedding_similarity: "dot_product" + index: true + + # 图片向量字段 + - name: "image_embedding" + type: "IMAGE_EMBEDDING" + source_table: "extension" + source_column: "imageUrl" + embedding_dims: 1024 + embedding_similarity: "dot_product" + nested: false +``` + +**实现模块**: +- `config/config_loader.py` - 配置加载器 +- `config/field_types.py` - 字段类型定义 +- `indexer/mapping_generator.py` - ES mapping 生成器 +- `indexer/data_transformer.py` - 数据转换器 + +### 2.2 索引结构配置(查询域配置) + +**配置内容**:定义了 ES 的字段索引 mapping 配置,支持各个域的查询,包括默认域的查询。 + +**实现情况**: + +#### 域(Domain)配置 +每个域定义了: +- 域名称(如 `default`, `title`, `category`, `brand`) +- 域标签(中文描述) +- 搜索字段列表 +- 默认分析器 +- 权重(boost) +- **多语言字段映射**(`language_field_mapping`) + +#### 多语言字段映射 + +支持将不同语言的查询路由到对应的字段: + +```yaml +indexes: + - name: "default" + label: "默认索引" + fields: + - "name" + - "enSpuName" + - "ruSkuName" + - "categoryName" + - "brandName" + analyzer: "chinese_ecommerce" + boost: 1.0 + language_field_mapping: + zh: + - "name" + - "categoryName" + - "brandName" + en: + - "enSpuName" + ru: + - "ruSkuName" + + - name: "title" + label: "标题索引" + fields: + - "name" + - "enSpuName" + - "ruSkuName" + analyzer: "chinese_ecommerce" + boost: 2.0 + language_field_mapping: + zh: + - "name" + en: + - "enSpuName" + ru: + - "ruSkuName" +``` + +**工作原理**: +1. 检测查询语言(中文、英文、俄文等) +2. 如果查询语言在 `language_field_mapping` 中,使用原始查询搜索对应语言的字段 +3. 将查询翻译到其他支持的语言,分别搜索对应语言的字段 +4. 组合多个语言查询的结果,提高召回率 + +**实现模块**: +- `search/multilang_query_builder.py` - 多语言查询构建器 +- `query/query_parser.py` - 查询解析器(支持语言检测和翻译) + +--- + +## 3. 测试数据灌入 + +### 3.1 数据源 + +**主表**:`shoplazza_product_sku` +- 所有租户共用 +- 包含基础商品信息(id, shopid 等) + +**扩展表**:`customer1_extension` +- 每个租户独立 +- 包含自定义字段和多语言字段 + +### 3.2 数据灌入方式 + +**实现情况**: + +#### 命令行工具 +```bash +python main.py ingest \ + --customer customer1 \ + --csv-file data/customer1_data.csv \ + --es-host http://localhost:9200 \ + --recreate \ + --batch-size 100 +``` + +#### 数据流程 +1. **数据加载**:从 CSV 文件或 MySQL 数据库加载数据 +2. **数据转换**: + - 字段映射(根据配置将源字段映射到 ES 字段) + - 类型转换(字符串、数字、日期等) + - 向量生成(文本向量、图片向量) + - 向量缓存(避免重复计算) +3. **索引创建**: + - 根据配置生成 ES mapping + - 创建或更新索引 +4. **批量入库**: + - 批量写入 ES(默认每批 500 条) + - 错误处理和重试机制 + +#### 配置映射示例 + +**customer1_config.yaml** 配置: +```yaml +main_table: "shoplazza_product_sku" +extension_table: "customer1_extension" +es_index_name: "search_customer1" + +fields: + - name: "skuId" + source_table: "main" + source_column: "id" + - name: "name" + source_table: "extension" + source_column: "name" + - name: "enSpuName" + source_table: "extension" + source_column: "enSpuName" +``` + +**数据转换**: +- 主表字段:直接从 `shoplazza_product_sku` 表的 `id` 字段读取 +- 扩展表字段:从 `customer1_extension` 表的对应列读取 +- 向量字段:对源文本/图片生成向量并缓存 + +**实现模块**: +- `indexer/data_transformer.py` - 数据转换器 +- `indexer/bulk_indexer.py` - 批量索引器 +- `indexer/indexing_pipeline.py` - 索引流水线 +- `embeddings/bge_encoder.py` - 文本向量编码器 +- `embeddings/clip_image_encoder.py` - 图片向量编码器 + +--- + +## 4. QueryParser 实现 + + +### 4.1 查询改写(Query Rewriting) + +配置词典的key是query,value是改写后的查询表达式,比如。比如品牌词 改写为在brand|query OR name|query,类别词、标签词等都可以放进去。纠错、规范化、查询改写等 都可以通过这个词典来配置。 +**实现情况**: + +#### 配置方式 +在 `query_config.rewrite_dictionary` 中配置查询改写规则: + +```yaml +query_config: + enable_query_rewrite: true + rewrite_dictionary: + "芭比": "brand:芭比 OR name:芭比娃娃" + "玩具": "category:玩具" + "消防": "category:消防 OR name:消防" +``` + +#### 功能特性 +- **精确匹配**:查询完全匹配词典 key 时,替换为 value +- **部分匹配**:查询包含词典 key 时,替换该部分 +- **支持布尔表达式**:value 可以是复杂的布尔表达式(AND, OR, 域查询等) + +#### 实现模块 +- `query/query_rewriter.py` - 查询改写器 +- `query/query_parser.py` - 查询解析器(集成改写功能) + +### 4.2 翻译(Translation) + +**实现情况**: + +#### 配置方式 +```yaml +query_config: + supported_languages: + - "zh" + - "en" + - "ru" + default_language: "zh" + enable_translation: true + translation_service: "deepl" + translation_api_key: null # 通过环境变量设置 +``` + +#### 功能特性 +1. **语言检测**:自动检测查询语言 +2. **智能翻译**: + - 如果查询是中文,翻译为英文、俄文 + - 如果查询是英文,翻译为中文、俄文 + - 如果查询是其他语言,翻译为所有支持的语言 +3. **域感知翻译**: + - 如果域有 `language_field_mapping`,只翻译到映射中存在的语言 + - 避免不必要的翻译,提高效率 +4. **翻译缓存**:缓存翻译结果,避免重复调用 API + +#### 工作流程 +``` +查询输入 → 语言检测 → 确定目标语言 → 翻译 → 多语言查询构建 +``` + +#### 实现模块 +- `query/language_detector.py` - 语言检测器 +- `query/translator.py` - 翻译器(DeepL API) +- `query/query_parser.py` - 查询解析器(集成翻译功能) + +### 4.3 文本向量化(Text Embedding) + +如果配置打开了text_embedding查询,并且query 包含了default域的查询,那么要把default域的查询词转向量,后面searcher会用这个向量参与查询。 + +**实现情况**: + +#### 配置方式 +```yaml +query_config: + enable_text_embedding: true +``` + +#### 功能特性 +1. **条件生成**: + - 仅当 `enable_text_embedding=true` 时生成向量 + - 仅对 `default` 域查询生成向量 +2. **向量模型**:BGE-M3 模型(1024维向量) +3. **用途**:用于语义搜索(KNN 检索) + +#### 实现模块 +- `embeddings/bge_encoder.py` - BGE 文本编码器 +- `query/query_parser.py` - 查询解析器(集成向量生成) + +--- + +## 5. Searcher 实现 + +参考opensearch,他们自己定义的一套索引结构配置、支持自定义的一套检索表达式、排序表达式,这是各个客户进行配置化的基础,包括索引结构配置、排序策略配置。 +比如各种业务过滤策略 可以简单的通过表达式满足,比如brand|耐克 AND cate2|xxx。指定字段排序可以通过排序的表达式实现。 + +查询默认在default域,相也会对这个域的查询做一些相关性的重点优化,包括融合语义相关性、多语言相关性(可以基于配置 将查询翻译到指定语言并在对应的语言的字段进行查询)来弥补传统查询分析手段(比如查询改写 纠错 词权重等)的不足,也支持通过配置一些词表转为泛查询模式来优化相关性。 + +### 5.1 布尔表达式解析 + +**实现情况**: + +#### 支持的运算符 +- **AND**:所有项必须匹配 +- **OR**:任意项匹配 +- **RANK**:排序增强(类似 OR 但影响排序) +- **ANDNOT**:排除(第一项匹配,第二项不匹配) +- **()**:括号分组 + +#### 优先级(从高到低) +1. `()` - 括号 +2. `ANDNOT` - 排除 +3. `AND` - 与 +4. `OR` - 或 +5. `RANK` - 排序 + +#### 示例 +``` +laptop AND (gaming OR professional) ANDNOT cheap +``` + +#### 实现模块 +- `search/boolean_parser.py` - 布尔表达式解析器 +- `search/searcher.py` - 搜索器(集成布尔解析) + +### 5.2 多语言搜索 + +**实现情况**: + +#### 工作原理 +1. **查询解析**: + - 提取域(如 `title:查询` → 域=`title`,查询=`查询`) + - 检测查询语言 + - 生成翻译 +2. **多语言查询构建**: + - 如果域有 `language_field_mapping`: + - 使用检测到的语言查询对应字段(boost * 1.5) + - 使用翻译后的查询搜索其他语言字段(boost * 1.0) + - 如果域没有 `language_field_mapping`: + - 使用所有字段进行搜索 +3. **查询组合**: + - 多个语言查询组合为 `should` 子句 + - 提高召回率 + +#### 示例 +``` +查询: "芭比娃娃" +域: default +检测语言: zh + +生成的查询: +- 中文查询 "芭比娃娃" → 搜索 name, categoryName, brandName (boost * 1.5) +- 英文翻译 "Barbie doll" → 搜索 enSpuName (boost * 1.0) +- 俄文翻译 "Кукла Барби" → 搜索 ruSkuName (boost * 1.0) +``` + +#### 实现模块 +- `search/multilang_query_builder.py` - 多语言查询构建器 +- `search/searcher.py` - 搜索器(使用多语言构建器) + +### 5.3 相关性计算(Ranking) + +**实现情况**: + +#### 当前实现 +**公式**:`bm25() + 0.2 * text_embedding_relevance()` + +- **bm25()**:BM25 文本相关性得分 + - 包括多语言打分 + - 内部通过配置翻译为多种语言 + - 分别到对应的字段搜索 + - 中文字段使用中文分词器,英文字段使用英文分词器 +- **text_embedding_relevance()**:文本向量相关性得分(KNN 检索的打分) + - 权重:0.2 + +#### 配置方式 +```yaml +ranking: + expression: "bm25() + 0.2*text_embedding_relevance()" + description: "BM25 text relevance combined with semantic embedding similarity" +``` + +#### 扩展性 +- 支持表达式配置(未来可扩展) +- 支持自定义函数(如 `timeliness()`, `field_value()`) + +#### 实现模块 +- `search/ranking_engine.py` - 排序引擎 +- `search/searcher.py` - 搜索器(集成排序功能) -但是,各个租户,可能有不一样的业务数据,比如不同租户有不同的属性的体系、不同语言的商品标题(一般至少有中英文两种满足跨境的搜索需求),有不同的权重(提权)字段、业务过滤和聚合字段。 -能够统一的 只能是 sku表 按照一套配置规范、做一个配置文件,按照配置文件建设ES mapping结构以及做数据的入库。 +--- -1. 应用结构配置 : 定义了ES的输入数据有哪些字段、关联mysql的哪些字段. - 请帮我补充具体实现的一些配置 +## 6. 已完成功能总结 +### 6.1 配置系统 +- ✅ 字段定义配置(类型、分析器、来源表/列) +- ✅ 索引域配置(多域查询、多语言映射) +- ✅ 查询配置(改写词典、翻译配置) +- ✅ 排序配置(表达式配置) +- ✅ 配置验证(字段存在性、类型检查、分析器匹配) -2。 索引结构配置 : 定义了ES的字段,每个字段的索引mapping配置,支持各个域的查询,包括默认的域的查询。索引配置预定一号了一堆分析方式 - 请帮我补充具体实现的一些配置 +### 6.2 数据索引 +- ✅ 数据转换(字段映射、类型转换) +- ✅ 向量生成(文本向量、图片向量) +- ✅ 向量缓存(避免重复计算) +- ✅ 批量索引(错误处理、重试机制) +- ✅ ES mapping 自动生成 -## 测试数据灌入 +### 6.3 查询处理 +- ✅ 查询改写(词典配置) +- ✅ 语言检测 +- ✅ 多语言翻译(DeepL API) +- ✅ 文本向量化(BGE-M3) +- ✅ 域提取(支持 `domain:query` 语法) -灌入数据、mysql到ES的自动同步,不在本项目的范围内,但是,该项目 为了提供测试数据,需要 构造一个实例 customer1. -我们为他构造一套应用配置和索引配置。 -暂时是随机抽了我们自己的1w数据,建设辅助表,然后写一个程序,将数据分别灌入主表和辅表。 +### 6.4 搜索功能 +- ✅ 布尔表达式解析(AND, OR, RANK, ANDNOT, 括号) +- ✅ 多语言查询构建(语言路由、字段映射) +- ✅ 语义搜索(KNN 检索) +- ✅ 相关性排序(BM25 + 向量相似度) +- ✅ 结果聚合(Faceted Search) -请帮我补充具体,当前测试数据灌入的具体的配置和方式,比如辅助表的内容 对应的应用结构配置 索引配置 等等。 +### 6.5 API 服务 +- ✅ RESTful API(FastAPI) +- ✅ 搜索接口(文本搜索、图片搜索) +- ✅ 文档查询接口 +- ✅ 前端界面(HTML + JavaScript) -## queryParser +--- -1. 查询改写。 配置词典的key是query,value是改写后的查询表达式,比如。比如品牌词 改写为在brand|query OR name|query,类别词、标签词等都可以放进去。纠错、规范化、查询改写等 都可以通过这个词典来配置。 -2. 翻译。配置需要得到的几种目标语言。 在customer1测试案例中,我们配置 zh en两种语言。先对query做语言检测,如果query是中文那么要翻译一下en,如果是en那么要翻译zh,如果两者都不是那么zh en都需要翻译。 -3. 如果配置打开了text_embedding查询,并且query 包含了default域的查询,那么要把default域的查询词转向量,后面searcher会用这个向量参与查询。 +## 7. 技术栈 -也帮我补充一些具体实现情况 +- **后端**:Python 3.6+ +- **搜索引擎**:Elasticsearch +- **数据库**:MySQL(Shoplazza) +- **向量模型**:BGE-M3(文本)、CN-CLIP(图片) +- **翻译服务**:DeepL API +- **API 框架**:FastAPI +- **前端**:HTML + JavaScript -## searcher +--- -支持多种检索表达式: -支持多种匹配方式,如AND、OR、RANK、NOTAND以及(),优先级从高到低为(),ANDNOT,AND,OR,RANK。 +## 8. 配置文件示例 -## default域的相关性,是代码里面单独计算,是特定的深度定制优化的,暂时不做配置化。 +完整配置示例请参考:`config/schema/customer1_config.yaml` -暂时具体实现为 bm25()+0.2*text_embedding_relevence(也就是knn检索表达式的打分) -bm25() 包括多语言的打分:内部需要通过配置翻译为多种语言(配置几种目标语言 默认中文、英文,并且设置对应的检索域),然后分别到对应的字段搜索,中文字段到配置的中文title搜索,英文到对应的英文title搜索。 +--- -也帮我补充一些具体实现情况 +## 9. 相关文档 +- `MULTILANG_FEATURE.md` - 多语言功能详细说明 +- `QUICKSTART.md` - 快速开始指南 +- `HighLevelDesign.md` - 高层设计文档 +- `IMPLEMENTATION_SUMMARY.md` - 实现总结 +- `商品数据源入ES配置规范.md` - 数据源配置规范 -- libgit2 0.21.2