diff --git a/.env.example b/.env.example index a2d6ea2..8ede56e 100644 --- a/.env.example +++ b/.env.example @@ -38,7 +38,7 @@ START_TRANSLATOR=0 START_RERANKER=0 # Embedding Models -TEXT_MODEL_DIR=/data/tw/models/bge-m3 # 已经改为web请求了,不使用本地模型 +TEXT_MODEL_ID=Qwen/Qwen3-Embedding-0.6B IMAGE_MODEL_DIR=/data/tw/models/cn-clip # 已经改为web请求了,不使用本地模型 # Cache Directory diff --git a/CHANGES.md b/CHANGES.md deleted file mode 100644 index e0e0dde..0000000 --- a/CHANGES.md +++ /dev/null @@ -1,218 +0,0 @@ -# 云端向量化模块更新 - 变更摘要 - -## 📅 更新日期 -2025-12-05 - -## 🎯 更新目标 -为 saas-search 项目添加基于阿里云 DashScope API 的云端文本向量化功能。 - -## 📝 新增文件 - -### 1. 核心模块 -- ✅ `embeddings/cloud_text_encoder.py` - - 云端文本向量化编码器 - - 使用 text-embedding-v4 模型 - - 单例模式,线程安全 - - 支持批量处理 - -### 2. 测试脚本 -- ✅ `scripts/test_cloud_embedding.py` - - 读取 queries.txt 前 100 条 - - 记录发送/接收时间和耗时 - - 输出详细测试报告 - -### 3. 示例代码 -- ✅ `examples/cloud_embedding_example.py` - - 单个文本向量化示例 - - 批量处理示例 - - 相似度计算示例 - - 完整的错误处理 - -### 4. 文档 -- ✅ `docs/cloud_embedding_usage.md` - 详细使用文档 -- ✅ `docs/cloud_embedding_quickstart.md` - 快速入门指南 -- ✅ `CLOUD_EMBEDDING_README.md` - 功能说明 -- ✅ `CHANGES.md` - 本文档 - -## 🔧 修改文件 - -### 1. 
requirements.txt -- ✅ 添加依赖:`openai>=1.0.0` - -## 📊 功能特性 - -### CloudTextEncoder 类 - -**方法:** -- `__init__(api_key=None, base_url=None)` - 初始化(单例) -- `encode(sentences, ...)` - 向量化文本 -- `encode_batch(texts, batch_size=32, ...)` - 批量处理 -- `get_embedding_dimension()` - 获取向量维度 - -**特性:** -- 单例模式,确保只有一个实例 -- 线程安全 -- 自动错误处理 -- 支持批量处理避免 API 限制 -- 返回 numpy 数组(兼容现有代码) - -### 测试脚本功能 - -**test_cloud_embedding.py:** -- 读取 queries.txt(可配置数量) -- 逐条发送向量化请求 -- 记录每次请求的: - - 发送时间(精确到毫秒) - - 接收时间(精确到毫秒) - - 耗时(秒) - - 向量维度 - - 成功/失败状态 -- 输出汇总统计: - - 总查询数 - - 成功/失败数量 - - 成功率 - - 总耗时 - - 平均耗时 - - 吞吐量 - -## 📖 使用示例 - -### 基本使用 -```python -from embeddings.cloud_text_encoder import CloudTextEncoder - -encoder = CloudTextEncoder() -embedding = encoder.encode("衣服的质量杠杠的") -``` - -### 批量处理 -```python -texts = ["文本1", "文本2", "文本3"] -embeddings = encoder.encode(texts) -``` - -### 运行测试 -```bash -export DASHSCOPE_API_KEY="sk-xxx" -python scripts/test_cloud_embedding.py -``` - -## 🔍 测试结果示例 - -``` -================================================================================ -Test Summary -================================================================================ -Total Queries: 100 -Successful: 100 -Failed: 0 -Success Rate: 100.0% -Total Time: 35.123s -Total API Time: 32.456s -Average Duration: 0.325s per query -Throughput: 2.85 queries/second -================================================================================ -``` - -## 📚 文档结构 - -``` -docs/ -├── cloud_embedding_usage.md # 详细使用说明 -│ ├── 模块说明 -│ ├── 使用步骤 -│ ├── 注意事项 -│ ├── 地域选择 -│ ├── 故障排除 -│ └── 更多信息 -│ -└── cloud_embedding_quickstart.md # 快速入门 - ├── 新增文件列表 - ├── 快速开始步骤 - ├── 代码示例 - ├── 性能指标 - ├── 配置选项 - ├── 常见问题 - └── 最佳实践 -``` - -## ✅ 验证步骤 - -1. **环境准备** - - [x] 安装 openai 包 - - [x] 设置 DASHSCOPE_API_KEY - -2. **功能测试** - - [ ] 运行 `python scripts/test_cloud_embedding.py` - - [ ] 运行 `python examples/cloud_embedding_example.py` - - [ ] 检查输出是否正常 - -3. 
**集成测试** - - [ ] 在项目中导入模块 - - [ ] 测试与现有代码的兼容性 - -## 🎯 预期性能 - -基于 text-embedding-v4 模型: -- **向量维度**:1024 -- **平均延迟**:300-400ms -- **吞吐量**:~2-3 queries/秒(单线程) -- **成功率**:>99% - -## ⚠️ 注意事项 - -1. **API Key 安全** - - 不要将 API Key 提交到代码仓库 - - 使用环境变量或配置文件 - -2. **成本控制** - - 云端 API 按使用量计费 - - 建议缓存常用查询的向量 - - 监控使用量 - -3. **速率限制** - - 注意 API 速率限制 - - 测试脚本已添加适当延迟 - - 可根据需要调整 batch_size - -4. **网络依赖** - - 需要稳定的网络连接 - - 考虑添加重试机制 - -## 🔄 后续计划 - -建议的改进方向: -- [ ] 添加向量缓存机制 -- [ ] 实现自动重试逻辑 -- [ ] 添加多线程/异步支持 -- [ ] 集成到搜索模块 -- [ ] 添加性能监控 -- [ ] 实现成本追踪 - -## 🆚 与本地编码器对比 - -| 特性 | CloudTextEncoder | BgeEncoder | -|------|------------------|------------| -| 部署 | 无需部署 | 需要本地服务 | -| 延迟 | ~350ms | <100ms | -| 成本 | 按使用付费 | 固定成本 | -| 维护 | 无需维护 | 需要维护 | -| 离线 | 不支持 | 支持 | -| 扩展 | 自动扩展 | 手动扩展 | - -## 📞 获取帮助 - -- 查看文档:`docs/cloud_embedding_*.md` -- 运行示例:`python examples/cloud_embedding_example.py` -- 阿里云文档:https://help.aliyun.com/zh/model-studio/ - -## ✨ 总结 - -本次更新成功添加了云端向量化功能,包括: -- ✅ 完整的编码器实现 -- ✅ 详细的测试脚本 -- ✅ 丰富的示例代码 -- ✅ 完善的文档 - -所有功能已测试通过,可以直接使用。 - diff --git a/CLIP_SERVICE_README.md b/CLIP_SERVICE_README.md index 5c682a9..0a1f1fa 100644 --- a/CLIP_SERVICE_README.md +++ b/CLIP_SERVICE_README.md @@ -82,7 +82,7 @@ cd /data/saas-search - 现有本地向量服务: - 启动脚本:`./scripts/start_embedding_service.sh` - - 实现:`embeddings/server.py`(FastAPI + 本地模型 `bge_model.py` / `clip_model.py`) + - 实现:`embeddings/server.py`(FastAPI + 本地模型 `qwen3_model.py` / `clip_model.py`) - 新增的 `clip-server`: - 使用官方实现,单独进程、单独环境 - 面向图像 / 文本的 CLIP 向量化服务 @@ -189,8 +189,6 @@ INFO gateway/rep-0@XXXXX start server bound to 0.0.0.0:51000 ## 5. 
参考 - 项目地址:`https://github.com/jina-ai/clip-as-service` -- 本项目向量模块文档:`embeddings/README.md`、`CLOUD_EMBEDDING_README.md` - - +- 本项目向量模块文档:`embeddings/README.md` diff --git a/config/config.yaml b/config/config.yaml index 3182c84..37eea8c 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -162,16 +162,11 @@ services: location: "global" model: "" embedding: - provider: "http" # http | vllm(reserved) + provider: "http" # http base_url: "http://127.0.0.1:6005" providers: http: base_url: "http://127.0.0.1:6005" - vllm: - enabled: false - base_url: "" - model: "" - note: "reserved for future vLLM embedding backend" rerank: provider: "http" base_url: "http://127.0.0.1:6007" diff --git a/config/env_config.py b/config/env_config.py index 2b12323..833ccd0 100644 --- a/config/env_config.py +++ b/config/env_config.py @@ -79,8 +79,8 @@ EMBEDDING_SERVICE_URL = os.getenv('EMBEDDING_SERVICE_URL') or f'http://{EMBEDDIN TRANSLATION_SERVICE_URL = os.getenv('TRANSLATION_SERVICE_URL') or f'http://{TRANSLATION_HOST}:{TRANSLATION_PORT}' RERANKER_SERVICE_URL = os.getenv('RERANKER_SERVICE_URL') or f'http://{RERANKER_HOST}:{RERANKER_PORT}/rerank' -# Model Directories -TEXT_MODEL_DIR = os.getenv('TEXT_MODEL_DIR', '/data/tw/models/bge-m3') +# Model IDs / paths +TEXT_MODEL_DIR = os.getenv('TEXT_MODEL_DIR', os.getenv('TEXT_MODEL_ID', 'Qwen/Qwen3-Embedding-0.6B')) IMAGE_MODEL_DIR = os.getenv('IMAGE_MODEL_DIR', '/data/tw/models/cn-clip') # Cache Directory diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index 90003a2..95023d7 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -154,7 +154,7 @@ docs/ # 文档(含本指南) ### 4.6 embeddings -- **职责**:提供向量服务(FastAPI):`POST /embed/text`、`POST /embed/image`;服务内按配置加载文本后端(如 BGE)与图像后端(如 clip-as-service 或本地 CN-CLIP),实现协议即可插拔。 +- **职责**:提供向量服务(FastAPI):`POST /embed/text`、`POST /embed/image`;服务内按配置加载文本后端(如 Qwen3-Embedding-0.6B)与图像后端(如 clip-as-service 或本地 CN-CLIP),实现协议即可插拔。 - **原则**:图片后端实现 
`embeddings/protocols.ImageEncoderProtocol`;配置优先从 `config` 或 `embeddings/config.py` 读取,与 `services.embedding` 的 URL 分离。 - **详见**:`embeddings/README.md`、本指南 §7.5–§7.6。 @@ -303,7 +303,7 @@ services: |------|--------|------|--------| | 调用方 | `services..provider` | http | http | | 调用方 | `services..providers.http.base_url` | 6007 | 6005 | -| 服务内 | `services..backend` | bge / qwen3_vllm 等 | 见 embeddings/config | +| 服务内 | `services..backend` | qwen3_embedding_st / qwen3_vllm 等 | 见 embeddings/config | | 服务内 | `services..backends.` | 模型名、batch、vLLM 参数 | 模型名、device 等 | ### 7.6 新增后端清单(以 Qwen3-Reranker 为例) diff --git a/docs/QUICKSTART.md b/docs/QUICKSTART.md index 47a011c..24683c5 100644 --- a/docs/QUICKSTART.md +++ b/docs/QUICKSTART.md @@ -396,7 +396,7 @@ services: 这里重点区分两层: - **Provider 层(调用方式)**:调用方如何访问能力(http/direct) -- **Backend 层(推理实现)**:服务进程内部具体模型实现(bge / qwen3_vllm / ...) +- **Backend 层(推理实现)**:服务进程内部具体模型实现(qwen3_embedding_st / qwen3_vllm / ...) ### 4.1 Rerank(重排)扩展 @@ -429,7 +429,7 @@ services: 服务实现: - `embeddings/server.py`(文本和图像可独立加载) -- 文本后端现用 BGE +- 文本后端现用 Qwen3-Embedding-0.6B(Sentence Transformers) - 图像后端支持本地 CLIP 或 clip-as-service 扩展建议: @@ -438,6 +438,12 @@ services: - 图像后端实现 `ImageEncoderProtocol`(`encode_image_urls`) - 如后续后端增多,建议与 rerank 一样在 `services.embedding.backend(s)` 统一配置 +选型建议(Qwen3-Embedding-0.6B): + +- 当前仓库默认:`Sentence Transformers` 本地推理(改造成本最低,便于直接替换 6005 现有服务) +- 生产高并发优先:`TEI`(Text Embeddings Inference)通常比 vLLM 更贴合 embedding-only 场景 +- `vLLM` 更适合生成式/重排混合场景;若仅做文本 embedding,通常不作为首选 + ### 4.3 新增后端检查清单(以 `qwen3_vllm` 为例) 1. 
实现后端协议(服务内) diff --git a/docs/TODO.txt b/docs/TODO.txt index 9e10dc0..6edc6c3 100644 --- a/docs/TODO.txt +++ b/docs/TODO.txt @@ -32,28 +32,18 @@ HOST:10.200.16.14 / localhost 你安装过nvidia-container-toolkit吗 现在有一些开源的推理引擎对向量化模型和重排模型支持的比较好,我们这块也正好要单独拎出来,因此想改造下。 -调研了TEI, vLLM, xinference,目前觉得最合适的是xinference+vLLM后端, -最好以docker方式部署,让gpu对docker可见需要nvidia-container-toolkit, -我试了多种方法安装赖nvidia-container-toolkit都失败了 +已决策:embedding 先统一为 Qwen3-Embedding-0.6B(Sentence Transformers,本地 6005),后续若要独立高并发服务优先评估 TEI。 +最好以 docker 方式部署,让 gpu 对 docker 可见需要 nvidia-container-toolkit, +我试了多种方法安装 nvidia-container-toolkit 都失败了 https://mirrors.aliyun.com/github/releases/NVIDIA/nvidia-container-toolkit/ https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/index.html -bge-m3 qwen3-embedding qwen3-reranker -大概耗时是0.026S,现在用这个xinference都得0.5S,看这个xinference的安装和embedding模型的部署是不是有问题 - - - -xinference是直接支持了embedding和reranker模型这些模型类型,相当于vllm的上层的封装,因此调用接口很简单,也支持bge和qwen3系列。 但是性能这么差 估计是有啥问题。 -不好查的话,用vllm或者其他的推理引擎也行, - -选一个推理引擎,相比于我自己直接调modelscope/sentence-transformers,主要是多进程和负载均衡、连续批处理,比较有用 -不知道我理解的有没有问题 -调研了TEI, vLLM, xinference,+vLLM后端 -这个推理引擎怎么选合适,是选VLLM还是xinference +选一个推理引擎,相比于我自己直接调 sentence-transformers,主要是多进程和负载均衡、连续批处理,比较有用 +当前结论:embedding 场景优先 TEI;vLLM 更偏向生成式与 rerank 场景。 @@ -83,4 +73,3 @@ https://cloud.tencent.com/document/product/1729/113395#4.-.E7.A4.BA.E4.BE.8B 登录 百炼美国地域控制台:https://modelstudio.console.aliyun.com/us-east-1?spm=5176.2020520104.0.0.6b383a98WjpXff 在 API Key 管理 中创建或复制一个适用于美国地域的 Key - diff --git a/docs/向量化模块和API说明文档.md b/docs/向量化模块和API说明文档.md index 9103e89..d73751f 100644 --- a/docs/向量化模块和API说明文档.md +++ b/docs/向量化模块和API说明文档.md @@ -10,6 +10,7 @@ ## 配置 - Provider/URL:`config/config.yaml` 的 `services.embedding` -- 模型路径:`embeddings/config.py` 或 env `TEXT_MODEL_DIR`、`IMAGE_MODEL_DIR` +- 文本模型:`embeddings/config.py` 的 `TEXT_MODEL_ID`(默认 `Qwen/Qwen3-Embedding-0.6B`) +- 运行参数:`TEXT_DEVICE`、`TEXT_BATCH_SIZE`、`TEXT_NORMALIZE_EMBEDDINGS` 详见 `embeddings/README.md`。 diff --git 
a/docs/系统设计文档.md b/docs/系统设计文档.md index 2be372f..7169eff 100644 --- a/docs/系统设计文档.md +++ b/docs/系统设计文档.md @@ -420,11 +420,11 @@ query_config: 1. **条件生成**: - 仅当 `enable_text_embedding=true` 时生成向量 - 仅对 `default` 域查询生成向量 -2. **向量模型**:BGE-M3 模型(1024维向量) +2. **向量模型**:Qwen3-Embedding-0.6B(1024维向量) 3. **用途**:用于语义搜索(KNN 检索) #### 实现模块 -- `embeddings/bge_encoder.py` - BGE 文本编码器 +- `embeddings/text_encoder.py` + `embeddings/server.py` - 文本向量服务客户端与服务端实现 - `query/query_parser.py` - 查询解析器(集成向量生成) --- @@ -585,7 +585,7 @@ ranking: - ✅ 查询改写(词典配置) - ✅ 语言检测 - ✅ 多语言翻译(DeepL API) -- ✅ 文本向量化(BGE-M3) +- ✅ 文本向量化(Qwen3-Embedding-0.6B) - ✅ 域提取(支持 `domain:query` 语法) ### 6.4 搜索功能 @@ -623,7 +623,7 @@ ranking: - **后端**:Python 3.6+ - **搜索引擎**:Elasticsearch - **数据库**:MySQL(Shoplazza) -- **向量模型**:BGE-M3(文本)、CN-CLIP(图片) +- **向量模型**:Qwen3-Embedding-0.6B(文本)、CN-CLIP(图片) - **翻译服务**:DeepL API - **API 框架**:FastAPI - **前端**:HTML + JavaScript diff --git a/docs/系统设计文档v1.md b/docs/系统设计文档v1.md index 8b32944..24d3182 100644 --- a/docs/系统设计文档v1.md +++ b/docs/系统设计文档v1.md @@ -1,5 +1,7 @@ # 搜索引擎通用化开发进度 +> 历史版本说明:本文件为 v1 归档文档,部分模型与实现细节(如 BGE-M3)已过时。当前以 `docs/系统设计文档.md` 与 `docs/QUICKSTART.md` 为准。 + ## 项目概述 对后端搜索技术 做通用化。 diff --git a/docs/索引字段说明v1.md b/docs/索引字段说明v1.md index c24cc61..3753e0c 100644 --- a/docs/索引字段说明v1.md +++ b/docs/索引字段说明v1.md @@ -1,5 +1,7 @@ # 索引字段说明文档 +> 历史版本说明:本文件为 v1 归档文档,部分模型与实现细节(如 BGE-M3)已过时。当前以 `docs/索引字段说明v2.md` 与 `docs/向量化模块和API说明文档.md` 为准。 + 本文档详细说明了 Elasticsearch 索引中所有字段的类型、索引方式、数据来源等信息。 ## 设计思路 diff --git a/docs/索引字段说明v2.md b/docs/索引字段说明v2.md index bcfe6f2..3dc2cd4 100644 --- a/docs/索引字段说明v2.md +++ b/docs/索引字段说明v2.md @@ -352,7 +352,7 @@ | 字段名 | ES类型 | 维度 | 说明 | 数据来源 | |--------|--------|------|------|----------| -| `title_embedding` | dense_vector | 1024 | 标题向量(用于语义搜索) | 由 BGE-M3 模型生成 | +| `title_embedding` | dense_vector | 1024 | 标题向量(用于语义搜索) | 由 Qwen3-Embedding-0.6B 模型生成 | | `image_embedding` | nested | - | 图片向量(用于图片搜索) | 由 CN-CLIP 模型生成 | **注意**: 这些字段仅用于搜索,不会返回给前端。 @@ -462,4 +462,4 @@ 
filters AND (text_recall OR embedding_recall) 3. **多语言支持**: 文本字段支持中英文,后端根据 `language` 参数自动选择 4. **规格分面**: `specifications` 使用嵌套聚合,按 `name` 分组,然后按 `value` 聚合 5. **向量字段**: `title_embedding` 和 `image_embedding` 仅用于搜索,不返回给前端 - \ No newline at end of file + diff --git a/embeddings/CLOUD_EMBEDDING_README.md b/embeddings/CLOUD_EMBEDDING_README.md deleted file mode 100644 index 96d5204..0000000 --- a/embeddings/CLOUD_EMBEDDING_README.md +++ /dev/null @@ -1,307 +0,0 @@ -# 云端向量化模块 - 更新说明 - -## 📝 概述 - -本次更新为 saas-search 项目添加了基于阿里云 DashScope API 的云端文本向量化功能,使用 `text-embedding-v4` 模型。 - -## 🎯 主要功能 - -1. **CloudTextEncoder** - 云端文本向量化编码器 - - 单例模式,线程安全 - - 支持单个/批量文本向量化 - - 自动错误处理和降级 - - 生成 1024 维向量 - -2. **测试脚本** - 使用 queries.txt 测试向量化 - - 读取前 100 条查询 - - 记录每次请求的时间和耗时 - - 统计成功率和性能指标 - -3. **示例代码** - 展示如何使用模块 - - 单个文本向量化 - - 批量处理 - - 相似度计算 - -## 📁 文件结构 - -``` -saas-search/ -├── embeddings/ -│ ├── cloud_text_encoder.py # 云端向量化编码器(新增) -│ ├── text_encoder.py # 本地编码器(现有) -│ └── ... -├── scripts/ -│ ├── test_cloud_embedding.py # 测试脚本(新增) -│ └── ... -├── examples/ -│ └── cloud_embedding_example.py # 示例代码(新增) -├── docs/ -│ ├── cloud_embedding_usage.md # 详细文档(新增) -│ └── cloud_embedding_quickstart.md # 快速入门(新增) -├── data_crawling/ -│ └── queries.txt # 测试数据 -├── requirements.txt # 已添加 openai>=1.0.0 -└── CLOUD_EMBEDDING_README.md # 本文档(新增) -``` - -## 🚀 快速开始 - -### 1. 安装依赖 - -```bash -pip install openai -``` - -或使用项目 requirements: -```bash -pip install -r requirements.txt -``` - -### 2. 设置 API Key - -```bash -export DASHSCOPE_API_KEY="sk-your-api-key-here" -``` - -获取 API Key:https://help.aliyun.com/zh/model-studio/get-api-key - -### 3. 
运行测试 - -```bash -# 测试向量化(使用 queries.txt 前 100 条) -python scripts/test_cloud_embedding.py - -# 运行示例代码 -python examples/cloud_embedding_example.py -``` - -## 📖 使用方法 - -### 基础使用 - -```python -from embeddings.cloud_text_encoder import CloudTextEncoder - -# 初始化编码器 -encoder = CloudTextEncoder() - -# 单个文本向量化 -embedding = encoder.encode("衣服的质量杠杠的") -print(embedding.shape) # (1, 1024) - -# 批量向量化 -embeddings = encoder.encode(["文本1", "文本2", "文本3"]) -print(embeddings.shape) # (3, 1024) -``` - -### 批量处理 - -```python -# 大批量自动分批处理 -texts = [f"商品 {i}" for i in range(1000)] -embeddings = encoder.encode_batch(texts, batch_size=32) -``` - -## 🧪 测试脚本 - -测试脚本 `scripts/test_cloud_embedding.py` 功能: - -✅ 读取 `data_crawling/queries.txt` 前 100 条查询 -✅ 逐条发送向量化请求 -✅ 记录每次请求的发送时间、接收时间、耗时 -✅ 输出向量维度和内容 -✅ 统计成功率、平均耗时、吞吐量 - -### 测试输出示例 - -``` -================================================================================ -Cloud Text Embedding Test - Aliyun DashScope API -================================================================================ - -[ 1/100] ✓ SUCCESS - Query: Bohemian Maxi Dress - Send Time: 2025-12-05 10:30:45.123 - Receive Time: 2025-12-05 10:30:45.456 - Duration: 0.333s - Embedding Shape: (1, 1024) - -... 
- -================================================================================ -Test Summary -================================================================================ -Total Queries: 100 -Successful: 100 -Failed: 0 -Success Rate: 100.0% -Total Time: 35.123s -Total API Time: 32.456s -Average Duration: 0.325s per query -Throughput: 2.85 queries/second -================================================================================ -``` - -## 📊 性能特点 - -- **向量维度**:1024 -- **平均延迟**:300-400ms/请求 -- **吞吐量**:~2-3 queries/秒(单线程) -- **错误处理**:自动降级到零向量 -- **批处理**:支持自动分批和速率控制 - -## 🔧 接口说明 - -### CloudTextEncoder API - -#### 初始化 - -```python -CloudTextEncoder(api_key=None, base_url=None) -``` - -参数: -- `api_key` (str, optional): API Key,默认从环境变量读取 -- `base_url` (str, optional): API 端点,默认北京地域 - -#### encode() - -```python -encode(sentences, normalize_embeddings=True, device='cpu', batch_size=32) -``` - -参数: -- `sentences` (str or List[str]): 单个文本或文本列表 -- `normalize_embeddings` (bool): 是否归一化(API 自动处理) -- `device` (str): 设备参数(兼容性参数,云端 API 忽略) -- `batch_size` (int): 批处理大小 - -返回: -- `np.ndarray`: 形状为 (n, 1024) 的 numpy 数组 - -#### encode_batch() - -```python -encode_batch(texts, batch_size=32, device='cpu') -``` - -参数: -- `texts` (List[str]): 文本列表 -- `batch_size` (int): 批处理大小 -- `device` (str): 设备参数(兼容性参数) - -返回: -- `np.ndarray`: 向量矩阵 - -## 📚 文档 - -- **快速入门**:`docs/cloud_embedding_quickstart.md` -- **详细文档**:`docs/cloud_embedding_usage.md` -- **示例代码**:`examples/cloud_embedding_example.py` - -## ⚠️ 注意事项 - -1. **API Key 管理**:妥善保管 API Key,不要提交到代码仓库 -2. **成本控制**:云端 API 按使用量计费,注意控制调用次数 -3. **速率限制**:注意 API 速率限制,测试脚本已添加延迟 -4. **网络依赖**:需要稳定的网络连接 -5. 
**错误处理**:API 失败时会返回零向量,请检查日志 - -## 🆚 对比本地编码器 - -| 特性 | CloudTextEncoder | BgeEncoder (本地) | -|------|------------------|-------------------| -| 部署方式 | 云端 API | 本地服务 | -| 初始成本 | 低 | 高(GPU/CPU) | -| 运行成本 | 按使用付费 | 固定 | -| 延迟 | ~300-400ms | <100ms | -| 离线使用 | ❌ | ✅ | -| 维护成本 | 低 | 需要维护 | -| 扩展性 | 自动扩展 | 手动扩展 | - -## 🔄 集成建议 - -### 选择使用场景 - -**使用 CloudTextEncoder(云端):** -- 初期开发和测试 -- 查询量不大的应用 -- 不需要离线支持 -- 希望降低运维成本 - -**使用 BgeEncoder(本地):** -- 大规模生产环境 -- 需要低延迟 -- 离线使用场景 -- 查询量非常大 - -### 混合使用 - -```python -# 配置文件中选择编码器类型 -ENCODER_TYPE = os.getenv("ENCODER_TYPE", "local") # local or cloud - -if ENCODER_TYPE == "cloud": - from embeddings.cloud_text_encoder import CloudTextEncoder - encoder = CloudTextEncoder() -else: - from embeddings.text_encoder import BgeEncoder - encoder = BgeEncoder() - -# 使用统一接口 -embeddings = encoder.encode(texts) -``` - -## 🐛 故障排查 - -### 问题 1:API Key 未设置 -```bash -export DASHSCOPE_API_KEY="sk-your-key" -``` - -### 问题 2:网络连接失败 -- 检查网络连接 -- 验证 base_url 是否正确 -- 确认防火墙设置 - -### 问题 3:速率限制 -- 减小 batch_size -- 增加请求间隔 -- 升级 API 套餐 - -### 问题 4:返回零向量 -- 检查日志中的错误信息 -- 验证 API Key 是否有效 -- 确认账户余额 - -## 🎓 示例代码 - -查看 `examples/cloud_embedding_example.py` 了解完整示例: -- 单个/批量文本向量化 -- 相似度计算 -- 错误处理 - -## 📞 支持 - -- 项目文档:`docs/` 目录 -- 阿里云文档:https://help.aliyun.com/zh/model-studio/ -- API 文档:https://help.aliyun.com/zh/model-studio/getting-started/models - -## ✅ 验证清单 - -完成以下步骤确认模块正常工作: - -- [ ] 安装了 openai 包 -- [ ] 设置了 DASHSCOPE_API_KEY 环境变量 -- [ ] 运行测试脚本成功 -- [ ] 查看了示例代码 -- [ ] 阅读了文档 - -## 📅 更新日期 - -2025-12-05 - -## 👨‍💻 维护 - -如有问题或建议,请联系项目维护者。 - diff --git a/embeddings/README.md b/embeddings/README.md index e39b8ac..b25d2f6 100644 --- a/embeddings/README.md +++ b/embeddings/README.md @@ -7,12 +7,20 @@ 这个目录是一个完整的“向量化模块”,包含: - **HTTP 客户端**:`text_encoder.py` / `image_encoder.py`(供搜索/索引模块调用) -- **本地模型实现**:`bge_model.py` / `clip_model.py` +- **本地模型实现**:`qwen3_model.py` / `clip_model.py` - **clip-as-service 客户端**:`clip_as_service_encoder.py`(图片向量,推荐) - 
**向量化服务(FastAPI)**:`server.py` - **统一配置**:`config.py` - **接口契约**:`protocols.ImageEncoderProtocol`(图片编码统一为 `encode_image_urls(urls, batch_size)`,本地 CN-CLIP 与 clip-as-service 均实现该接口) +说明:历史上的云端 embedding 试验实现(DashScope)已从主仓库移除,当前仅维护 6005 这条统一向量服务链路。 + +### 文本向量后端(默认) + +- 6005 文本向量服务默认模型:`Qwen/Qwen3-Embedding-0.6B` +- 实现方式:`SentenceTransformer`(`trust_remote_code=True`) +- 向量归一化:默认开启(可通过环境变量关闭) + ### 服务接口 - `POST /embed/text` @@ -54,7 +62,6 @@ 编辑 `embeddings/config.py`: - `PORT`: 服务端口(默认 6005) -- `TEXT_MODEL_DIR`, `TEXT_DEVICE`, `TEXT_BATCH_SIZE` +- `TEXT_MODEL_ID`, `TEXT_DEVICE`, `TEXT_BATCH_SIZE`, `TEXT_NORMALIZE_EMBEDDINGS` - `USE_CLIP_AS_SERVICE`, `CLIP_AS_SERVICE_SERVER`:图片向量(clip-as-service) - `IMAGE_MODEL_NAME`, `IMAGE_DEVICE`:本地 CN-CLIP(当 `USE_CLIP_AS_SERVICE=false` 时) - diff --git a/embeddings/__init__.py b/embeddings/__init__.py index 8cd0723..7f1e37c 100644 --- a/embeddings/__init__.py +++ b/embeddings/__init__.py @@ -4,7 +4,8 @@ Embeddings module. Important: keep package import lightweight. Some callers do: - - `from embeddings import BgeEncoder` + - `from embeddings import TextEmbeddingEncoder` + - `from embeddings import BgeEncoder` (deprecated alias) - `from embeddings import CLIPImageEncoder` But the underlying implementations may import heavy optional deps (Pillow, torch, etc). @@ -13,15 +14,19 @@ without importing client code), we provide small lazy factories here. 
""" -class BgeEncoder(object): - """Lazy factory for `embeddings.text_encoder.BgeEncoder`.""" +class TextEmbeddingEncoder(object): + """Lazy factory for `embeddings.text_encoder.TextEmbeddingEncoder`.""" def __new__(cls, *args, **kwargs): - from .text_encoder import BgeEncoder as _Real + from .text_encoder import TextEmbeddingEncoder as _Real return _Real(*args, **kwargs) +class BgeEncoder(TextEmbeddingEncoder): + """Deprecated backward-compatible alias for old class name.""" + + class CLIPImageEncoder(object): """Lazy factory for `embeddings.image_encoder.CLIPImageEncoder`.""" @@ -31,4 +36,4 @@ class CLIPImageEncoder(object): return _Real(*args, **kwargs) -__all__ = ["BgeEncoder", "CLIPImageEncoder"] +__all__ = ["TextEmbeddingEncoder", "BgeEncoder", "CLIPImageEncoder"] diff --git a/embeddings/bge_model.py b/embeddings/bge_model.py deleted file mode 100644 index 2eec3bd..0000000 --- a/embeddings/bge_model.py +++ /dev/null @@ -1,81 +0,0 @@ -""" -BGE-M3 local text embedding implementation. - -Internal model implementation used by the embedding service. -""" - -import threading -from typing import List, Union - -import numpy as np -from sentence_transformers import SentenceTransformer -from modelscope import snapshot_download - - -class BgeTextModel(object): - """ - Thread-safe singleton text encoder using BGE-M3 model (local inference). 
- """ - - _instance = None - _lock = threading.Lock() - - def __new__(cls, model_dir: str = "Xorbits/bge-m3"): - with cls._lock: - if cls._instance is None: - cls._instance = super(BgeTextModel, cls).__new__(cls) - cls._instance.model = SentenceTransformer(snapshot_download(model_dir)) - return cls._instance - - def encode( - self, - sentences: Union[str, List[str]], - normalize_embeddings: bool = True, - device: str = "cuda", - batch_size: int = 32, - ) -> np.ndarray: - if device == "gpu": - device = "cuda" - - # Try requested device, fallback to CPU if CUDA fails - try: - if device == "cuda": - import torch - - if torch.cuda.is_available(): - free_memory = ( - torch.cuda.get_device_properties(0).total_memory - - torch.cuda.memory_allocated() - ) - if free_memory < 1024 * 1024 * 1024: # 1GB - device = "cpu" - else: - device = "cpu" - - self.model = self.model.to(device) - embeddings = self.model.encode( - sentences, - normalize_embeddings=normalize_embeddings, - device=device, - show_progress_bar=False, - batch_size=batch_size, - ) - return embeddings - - except Exception: - if device != "cpu": - self.model = self.model.to("cpu") - embeddings = self.model.encode( - sentences, - normalize_embeddings=normalize_embeddings, - device="cpu", - show_progress_bar=False, - batch_size=batch_size, - ) - return embeddings - raise - - def encode_batch(self, texts: List[str], batch_size: int = 32, device: str = "cuda") -> np.ndarray: - return self.encode(texts, batch_size=batch_size, device=device) - - diff --git a/embeddings/cloud_embedding_usage.md b/embeddings/cloud_embedding_usage.md deleted file mode 100644 index edb80bb..0000000 --- a/embeddings/cloud_embedding_usage.md +++ /dev/null @@ -1,231 +0,0 @@ -# 云端向量化模块使用说明 - -## 概述 - -本项目新增了基于阿里云 DashScope API 的文本向量化模块,使用 `text-embedding-v4` 模型生成文本向量。 - -## 模块说明 - -### 1. 
CloudTextEncoder (`embeddings/cloud_text_encoder.py`) - -云端文本向量化编码器,使用阿里云 DashScope API。 - -**特性:** -- 单例模式,线程安全 -- 支持单个或批量文本向量化 -- 自动处理 API 调用和错误处理 -- 生成 1024 维向量 -- 支持批处理以避免 API 速率限制 - -**初始化:** -```python -from embeddings.cloud_text_encoder import CloudTextEncoder - -# 方式1:从环境变量读取 API Key -encoder = CloudTextEncoder() - -# 方式2:显式传入 API Key -encoder = CloudTextEncoder(api_key="sk-xxx") - -# 方式3:使用新加坡地域 -encoder = CloudTextEncoder( - api_key="sk-xxx", - base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1" -) -``` - -**使用示例:** -```python -# 单个文本向量化 -text = "衣服的质量杠杠的" -embedding = encoder.encode(text) -print(embedding.shape) # (1, 1024) - -# 批量文本向量化 -texts = ["文本1", "文本2", "文本3"] -embeddings = encoder.encode(texts) -print(embeddings.shape) # (3, 1024) - -# 大批量处理(自动分批) -large_texts = ["文本" + str(i) for i in range(1000)] -embeddings = encoder.encode_batch(large_texts, batch_size=32) -print(embeddings.shape) # (1000, 1024) -``` - -### 2. 测试脚本 (`scripts/test_cloud_embedding.py`) - -测试云端向量化功能,读取 `queries.txt` 前100条数据进行测试。 - -**功能:** -- 读取查询文件 -- 逐条发送向量化请求 -- 记录每次请求的发送时间、接收时间和耗时 -- 统计成功率和平均耗时 - -## 使用步骤 - -### 1. 设置 API Key - -首先需要获取阿里云 DashScope API Key: -- 北京地域:https://help.aliyun.com/zh/model-studio/get-api-key -- 新加坡地域:使用不同的 API Key - -设置环境变量: -```bash -export DASHSCOPE_API_KEY="your-api-key-here" -``` - -或者在项目根目录的 `.env` 文件中添加: -``` -DASHSCOPE_API_KEY=your-api-key-here -``` - -### 2. 安装依赖 - -确保已安装 OpenAI Python SDK: -```bash -pip install openai -``` - -或者使用项目的 requirements.txt: -```bash -pip install -r requirements.txt -``` - -### 3. 运行测试脚本 - -```bash -# 在项目根目录下运行 -python scripts/test_cloud_embedding.py -``` - -### 4. 
查看测试结果 - -测试脚本会输出以下信息: -``` -================================================================================ -Cloud Text Embedding Test - Aliyun DashScope API -================================================================================ - -API Key: sk-xxxxxx...xxxx - -Reading queries from: /path/to/queries.txt -Successfully read 100 queries - -Initializing CloudTextEncoder... -CloudTextEncoder initialized successfully - -================================================================================ -Testing 100 queries (one by one) -================================================================================ - -[ 1/100] ✓ SUCCESS - Query: Bohemian Maxi Dress - Send Time: 2025-12-05 10:30:45.123 - Receive Time: 2025-12-05 10:30:45.456 - Duration: 0.333s - Embedding Shape: (1, 1024) - -[ 2/100] ✓ SUCCESS - Query: Vintage Denim Jacket - Send Time: 2025-12-05 10:30:45.789 - Receive Time: 2025-12-05 10:30:46.012 - Duration: 0.223s - Embedding Shape: (1, 1024) - -... - -================================================================================ -Test Summary -================================================================================ -Total Queries: 100 -Successful: 100 -Failed: 0 -Success Rate: 100.0% -Total Time: 35.123s -Total API Time: 32.456s -Average Duration: 0.325s per query -Throughput: 2.85 queries/second -================================================================================ -``` - -## 注意事项 - -1. **API 限制**:阿里云 DashScope API 有速率限制,测试脚本在批处理时会添加小延迟以避免触发限制。 - -2. **成本**:使用云端 API 会产生费用,请注意控制调用量。 - -3. **网络要求**:需要稳定的网络连接访问阿里云服务。 - -4. **错误处理**:如果 API 调用失败,编码器会返回零向量作为降级处理,并记录错误日志。 - -5. 
**向量维度**:`text-embedding-v4` 模型生成 1024 维向量,如果需要不同维度,请考虑使用其他模型。 - -## 地域选择 - -- **北京地域**(默认):`https://dashscope.aliyuncs.com/compatible-mode/v1` -- **新加坡地域**:`https://dashscope-intl.aliyuncs.com/compatible-mode/v1` - -不同地域使用不同的 API Key,请确保匹配。 - -## 集成到项目 - -在项目中使用云端向量化: - -```python -from embeddings.cloud_text_encoder import CloudTextEncoder - -# 初始化编码器(全局单例) -encoder = CloudTextEncoder() - -# 在搜索、索引等模块中使用 -def process_text(text: str): - embedding = encoder.encode(text) - # 使用 embedding 进行后续处理 - return embedding -``` - -## 对比本地编码器 - -| 特性 | CloudTextEncoder | BgeEncoder (本地) | -|------|------------------|-------------------| -| 部署方式 | 云端 API | 本地服务 | -| 初始成本 | 低(按使用付费) | 高(GPU/CPU 资源) | -| 运行成本 | 按调用量计费 | 固定资源成本 | -| 延迟 | 较高(网络往返) | 低(本地处理) | -| 吞吐量 | 受 API 限制 | 受硬件限制 | -| 离线使用 | 不支持 | 支持 | -| 维护成本 | 低 | 需要维护服务 | - -## 故障排除 - -### 问题:API Key 未设置 -``` -ERROR: DASHSCOPE_API_KEY environment variable is not set! -``` -**解决**:设置 `DASHSCOPE_API_KEY` 环境变量。 - -### 问题:API 调用失败 -``` -Failed to encode texts via DashScope API: ... -``` -**解决**: -1. 检查网络连接 -2. 验证 API Key 是否正确 -3. 确认 base_url 与 API Key 地域匹配 -4. 检查账户余额 - -### 问题:速率限制 -``` -Rate limit exceeded -``` -**解决**: -1. 增加批处理之间的延迟 -2. 减小 batch_size -3. 升级 API 套餐 - -## 更多信息 - -- [阿里云模型服务灵积文档](https://help.aliyun.com/zh/model-studio/) -- [text-embedding-v4 模型说明](https://help.aliyun.com/zh/model-studio/getting-started/models) - diff --git a/embeddings/cloud_text_encoder.py b/embeddings/cloud_text_encoder.py deleted file mode 100644 index 9c8360d..0000000 --- a/embeddings/cloud_text_encoder.py +++ /dev/null @@ -1,142 +0,0 @@ -""" -Text embedding encoder using Aliyun DashScope API. - -Generates embeddings via Aliyun's text-embedding-v4 model. -""" - -import os -import logging -import threading -import time -import numpy as np -from typing import List, Union -from openai import OpenAI - -logger = logging.getLogger(__name__) - - -class CloudTextEncoder: - """ - Singleton text encoder using Aliyun DashScope API. 
- - Thread-safe singleton pattern ensures only one instance exists. - Uses text-embedding-v4 model for generating embeddings. - """ - _instance = None - _lock = threading.Lock() - - def __new__(cls, api_key: str = None, base_url: str = None): - with cls._lock: - if cls._instance is None: - cls._instance = super(CloudTextEncoder, cls).__new__(cls) - - # Get API key from parameter or environment variable - api_key = api_key or os.getenv("DASHSCOPE_API_KEY") - if not api_key: - raise ValueError("DASHSCOPE_API_KEY must be set in environment or passed as parameter") - - # 以下是北京地域base-url,如果使用新加坡地域的模型,需要将base_url替换为:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 - base_url = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1" - - cls._instance.client = OpenAI( - api_key=api_key, - base_url=base_url - ) - cls._instance.model = "text-embedding-v4" - logger.info(f"Created CloudTextEncoder instance with base_url: {base_url}") - - return cls._instance - - def encode( - self, - sentences: Union[str, List[str]], - normalize_embeddings: bool = True, - device: str = 'cpu', - batch_size: int = 32 - ) -> np.ndarray: - """ - Encode text into embeddings via Aliyun DashScope API. 
-
-        Args:
-            sentences: Single string or list of strings to encode
-            normalize_embeddings: Whether to normalize embeddings (handled by API)
-            device: Device parameter (ignored, for compatibility)
-            batch_size: Batch size for processing (currently processes all at once)
-
-        Returns:
-            numpy array of shape (n, dimension) containing embeddings
-        """
-        # Convert single string to list
-        if isinstance(sentences, str):
-            sentences = [sentences]
-
-        if not sentences:
-            return np.array([])
-
-        try:
-            # Call DashScope API
-            start_time = time.time()
-            completion = self.client.embeddings.create(
-                model=self.model,
-                input=sentences
-            )
-            elapsed_time = time.time() - start_time
-
-            logger.info(f"Generated embeddings for {len(sentences)} texts in {elapsed_time:.3f}s")
-
-            # Extract embeddings from response
-            embeddings = []
-            for item in completion.data:
-                embeddings.append(item.embedding)
-
-            return np.array(embeddings, dtype=np.float32)
-
-        except Exception as e:
-            logger.error(f"Failed to encode texts via DashScope API: {e}", exc_info=True)
-            # Return zero embeddings as fallback (dimension based on text-embedding-v4)
-            # text-embedding-v4 typically returns 1024-dimensional vectors
-            return np.zeros((len(sentences), 1024), dtype=np.float32)
-
-    def encode_batch(
-        self,
-        texts: List[str],
-        batch_size: int = 32,
-        device: str = 'cpu'
-    ) -> np.ndarray:
-        """
-        Encode a batch of texts via Aliyun DashScope API.
-
-        Args:
-            texts: List of texts to encode
-            batch_size: Batch size for processing
-            device: Device parameter (ignored, for compatibility)
-
-        Returns:
-            numpy array of embeddings
-        """
-        if not texts:
-            return np.array([])
-
-        # Process in batches to avoid API limits
-        all_embeddings = []
-
-        for i in range(0, len(texts), batch_size):
-            batch = texts[i:i + batch_size]
-            embeddings = self.encode(batch, device=device)
-            all_embeddings.append(embeddings)
-
-            # Small delay to avoid rate limiting
-            if i + batch_size < len(texts):
-                time.sleep(0.1)
-
-        return np.vstack(all_embeddings) if all_embeddings else np.array([])
-
-    def get_embedding_dimension(self) -> int:
-        """
-        Get the dimension of embeddings produced by this encoder.
-
-        Returns:
-            Embedding dimension (1024 for text-embedding-v4)
-        """
-        return 1024
-
diff --git a/embeddings/config.py b/embeddings/config.py
index 767a15a..0f869c2 100644
--- a/embeddings/config.py
+++ b/embeddings/config.py
@@ -16,10 +16,13 @@ class EmbeddingConfig(object):
     HOST = os.getenv("EMBEDDING_HOST", "0.0.0.0")
     PORT = int(os.getenv("EMBEDDING_PORT", 6005))
 
-    # Text embeddings (BGE-M3)
-    TEXT_MODEL_DIR = "Xorbits/bge-m3"
-    TEXT_DEVICE = "cuda"  # "cuda" or "cpu" (model may fall back to CPU if needed)
-    TEXT_BATCH_SIZE = 32
+    # Text embeddings (Qwen3-Embedding-0.6B, Sentence Transformers)
+    TEXT_MODEL_ID = os.getenv("TEXT_MODEL_ID", "Qwen/Qwen3-Embedding-0.6B")
+    # Backward-compatible alias for old naming in docs/scripts.
+    TEXT_MODEL_DIR = TEXT_MODEL_ID
+    TEXT_DEVICE = os.getenv("TEXT_DEVICE", "cuda")  # "cuda" or "cpu"
+    TEXT_BATCH_SIZE = int(os.getenv("TEXT_BATCH_SIZE", "32"))
+    TEXT_NORMALIZE_EMBEDDINGS = os.getenv("TEXT_NORMALIZE_EMBEDDINGS", "true").lower() in ("1", "true", "yes")
 
     # Image embeddings
     # Option A: clip-as-service (Jina CLIP server, recommended)
@@ -36,4 +39,3 @@ class EmbeddingConfig(object):
 
 
 CONFIG = EmbeddingConfig()
-
diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py
index 93cf0e6..956123d 100644
--- a/embeddings/image_encoder.py
+++ b/embeddings/image_encoder.py
@@ -1,17 +1,12 @@
-"""
-Image embedding encoder using network service.
+"""Image embedding client for the local embedding HTTP service."""
-
-Generates embeddings via HTTP API service (default localhost:6005).
-"""
-
-import sys
 import os
-import requests
+import logging
+from typing import Any, List, Optional, Union
+
 import numpy as np
+import requests
 from PIL import Image
-import logging
-import threading
-from typing import List, Optional, Union, Dict, Any
 
 logger = logging.getLogger(__name__)
 
@@ -22,21 +17,14 @@ class CLIPImageEncoder:
     """
     Image Encoder for generating image embeddings using network service.
 
-    Thread-safe singleton pattern.
+    This client is stateless and safe to instantiate per caller.
     """
 
-    _instance = None
-    _lock = threading.Lock()
-
-    def __new__(cls, service_url: Optional[str] = None):
-        with cls._lock:
-            if cls._instance is None:
-                cls._instance = super(CLIPImageEncoder, cls).__new__(cls)
-                resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL") or get_embedding_base_url()
-                logger.info(f"Creating CLIPImageEncoder instance with service URL: {resolved_url}")
-                cls._instance.service_url = resolved_url
-                cls._instance.endpoint = f"{resolved_url}/embed/image"
-        return cls._instance
+    def __init__(self, service_url: Optional[str] = None):
+        resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL") or get_embedding_base_url()
+        self.service_url = str(resolved_url).rstrip("/")
+        self.endpoint = f"{self.service_url}/embed/image"
+        logger.info("Creating CLIPImageEncoder instance with service URL: %s", self.service_url)
 
     def _call_service(self, request_data: List[str]) -> List[Any]:
         """
diff --git a/embeddings/qwen3_model.py b/embeddings/qwen3_model.py
new file mode 100644
index 0000000..ea46f24
--- /dev/null
+++ b/embeddings/qwen3_model.py
@@ -0,0 +1,89 @@
+"""
+Qwen3-Embedding-0.6B local text embedding implementation.
+
+Internal model implementation used by the embedding service.
+"""
+
+import threading
+from typing import List, Union
+
+import numpy as np
+from sentence_transformers import SentenceTransformer
+
+
+class Qwen3TextModel(object):
+    """
+    Thread-safe singleton text encoder using Qwen3-Embedding-0.6B (local inference).
+    """
+
+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls, model_id: str = "Qwen/Qwen3-Embedding-0.6B"):
+        with cls._lock:
+            if cls._instance is None:
+                cls._instance = super(Qwen3TextModel, cls).__new__(cls)
+                cls._instance.model = SentenceTransformer(model_id, trust_remote_code=True)
+        return cls._instance
+
+    def encode(
+        self,
+        sentences: Union[str, List[str]],
+        normalize_embeddings: bool = True,
+        device: str = "cuda",
+        batch_size: int = 32,
+    ) -> np.ndarray:
+        if device == "gpu":
+            device = "cuda"
+
+        # Try requested device, fallback to CPU if CUDA is unavailable/insufficient.
+        try:
+            if device == "cuda":
+                import torch
+
+                if torch.cuda.is_available():
+                    free_memory = (
+                        torch.cuda.get_device_properties(0).total_memory
+                        - torch.cuda.memory_allocated()
+                    )
+                    if free_memory < 1024 * 1024 * 1024:  # 1GB
+                        device = "cpu"
+                else:
+                    device = "cpu"
+
+            self.model = self.model.to(device)
+            embeddings = self.model.encode(
+                sentences,
+                normalize_embeddings=normalize_embeddings,
+                device=device,
+                show_progress_bar=False,
+                batch_size=batch_size,
+            )
+            return embeddings
+        except Exception:
+            if device != "cpu":
+                self.model = self.model.to("cpu")
+                embeddings = self.model.encode(
+                    sentences,
+                    normalize_embeddings=normalize_embeddings,
+                    device="cpu",
+                    show_progress_bar=False,
+                    batch_size=batch_size,
+                )
+                return embeddings
+            raise
+
+    def encode_batch(
+        self,
+        texts: List[str],
+        batch_size: int = 32,
+        device: str = "cuda",
+        normalize_embeddings: bool = True,
+    ) -> np.ndarray:
+        return self.encode(
+            texts,
+            batch_size=batch_size,
+            device=device,
+            normalize_embeddings=normalize_embeddings,
+        )
+
diff --git a/embeddings/server.py b/embeddings/server.py
index 5f46c7a..7966b40 100644
--- a/embeddings/server.py
+++ b/embeddings/server.py
@@ -14,9 +14,6 @@ import numpy as np
 from fastapi import FastAPI
 
 from embeddings.config import CONFIG
-from embeddings.bge_model import BgeTextModel
-from embeddings.clip_model import ClipImageModel
-from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder
 from embeddings.protocols import ImageEncoderProtocol
 
 logger = logging.getLogger(__name__)
@@ -24,7 +21,7 @@ logger = logging.getLogger(__name__)
 app = FastAPI(title="saas-search Embedding Service", version="1.0.0")
 
 # Models are loaded at startup, not lazily
-_text_model: Optional[BgeTextModel] = None
+_text_model: Optional[Any] = None
 _image_model: Optional[ImageEncoderProtocol] = None
 open_text_model = True
 open_image_model = True  # Enable image embedding when using clip-as-service
@@ -43,8 +40,10 @@ def load_models():
     # Load text model
     if open_text_model:
         try:
-            logger.info(f"Loading text model: {CONFIG.TEXT_MODEL_DIR}")
-            _text_model = BgeTextModel(model_dir=CONFIG.TEXT_MODEL_DIR)
+            from embeddings.qwen3_model import Qwen3TextModel
+
+            logger.info(f"Loading text model: {CONFIG.TEXT_MODEL_ID}")
+            _text_model = Qwen3TextModel(model_id=CONFIG.TEXT_MODEL_ID)
             logger.info("Text model loaded successfully")
         except Exception as e:
             logger.error(f"Failed to load text model: {e}", exc_info=True)
@@ -58,6 +57,8 @@ def load_models():
     if open_image_model:
         try:
             if CONFIG.USE_CLIP_AS_SERVICE:
+                from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder
+
                 logger.info(f"Loading image encoder via clip-as-service: {CONFIG.CLIP_AS_SERVICE_SERVER}")
                 _image_model = ClipAsServiceImageEncoder(
                     server=CONFIG.CLIP_AS_SERVICE_SERVER,
@@ -65,6 +66,8 @@ def load_models():
                 )
                 logger.info("Image model (clip-as-service) loaded successfully")
             else:
+                from embeddings.clip_model import ClipImageModel
+
                 logger.info(f"Loading local image model: {CONFIG.IMAGE_MODEL_NAME} (device: {CONFIG.IMAGE_DEVICE})")
                 _image_model = ClipImageModel(
                     model_name=CONFIG.IMAGE_MODEL_NAME,
@@ -126,7 +129,10 @@ def embed_text(texts: List[str]) -> List[Optional[List[float]]]:
         try:
             with _text_encode_lock:
                 embs = _text_model.encode_batch(
-                    batch_texts, batch_size=int(CONFIG.TEXT_BATCH_SIZE), device=CONFIG.TEXT_DEVICE
+                    batch_texts,
+                    batch_size=int(CONFIG.TEXT_BATCH_SIZE),
+                    device=CONFIG.TEXT_DEVICE,
+                    normalize_embeddings=bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS),
                 )
             for j, (idx, _t) in enumerate(indexed_texts):
                 out[idx] = _as_list(embs[j])
@@ -170,5 +176,3 @@ def embed_image(images: List[str]) -> List[Optional[List[float]]]:
         for idx in indices:
             out[idx] = None
     return out
-
-
diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py
index 4b60478..1221c28 100644
--- a/embeddings/text_encoder.py
+++ b/embeddings/text_encoder.py
@@ -1,20 +1,14 @@
-"""
-Text embedding encoder using network service.
+"""Text embedding client for the local embedding HTTP service."""
-
-Generates embeddings via HTTP API service (default localhost:6005).
-"""
+
+import logging
+import os
+import pickle
+from datetime import timedelta
+from typing import Any, List, Optional, Union
-
-import sys
-import requests
-import time
-import threading
+
 import numpy as np
-import pickle
 import redis
-import os
-from datetime import timedelta
-from typing import List, Union, Dict, Any, Optional
-import logging
+import requests
 
 logger = logging.getLogger(__name__)
 
@@ -27,44 +21,34 @@ except ImportError:
     REDIS_CONFIG = {}
 
 
-class BgeEncoder:
+class TextEmbeddingEncoder:
     """
-    Singleton text encoder using network service.
-
-    Thread-safe singleton pattern ensures only one instance exists.
+    Text embedding encoder using network service.
     """
-    _instance = None
-    _lock = threading.Lock()
-
-    def __new__(cls, service_url: Optional[str] = None):
-        with cls._lock:
-            if cls._instance is None:
-                cls._instance = super(BgeEncoder, cls).__new__(cls)
-                resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL") or get_embedding_base_url()
-                logger.info(f"Creating BgeEncoder instance with service URL: {resolved_url}")
-                cls._instance.service_url = resolved_url
-                cls._instance.endpoint = f"{resolved_url}/embed/text"
-
-                # Initialize Redis cache
-                try:
-                    cls._instance.redis_client = redis.Redis(
-                        host=REDIS_CONFIG.get('host', 'localhost'),
-                        port=REDIS_CONFIG.get('port', 6479),
-                        password=REDIS_CONFIG.get('password'),
-                        decode_responses=False,  # Keep binary data as is
-                        socket_timeout=REDIS_CONFIG.get('socket_timeout', 1),
-                        socket_connect_timeout=REDIS_CONFIG.get('socket_connect_timeout', 1),
-                        retry_on_timeout=REDIS_CONFIG.get('retry_on_timeout', False),
-                        health_check_interval=10  # 避免复用坏连接
-                    )
-                    # Test connection
-                    cls._instance.redis_client.ping()
-                    cls._instance.expire_time = timedelta(days=REDIS_CONFIG.get('cache_expire_days', 180))
-                    logger.info("Redis cache initialized for embeddings")
-                except Exception as e:
-                    logger.warning(f"Failed to initialize Redis cache for embeddings: {e}, continuing without cache")
-                    cls._instance.redis_client = None
-        return cls._instance
+
+    def __init__(self, service_url: Optional[str] = None):
+        resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL") or get_embedding_base_url()
+        self.service_url = str(resolved_url).rstrip("/")
+        self.endpoint = f"{self.service_url}/embed/text"
+        self.expire_time = timedelta(days=REDIS_CONFIG.get("cache_expire_days", 180))
+        logger.info("Creating TextEmbeddingEncoder instance with service URL: %s", self.service_url)
+
+        try:
+            self.redis_client = redis.Redis(
+                host=REDIS_CONFIG.get("host", "localhost"),
+                port=REDIS_CONFIG.get("port", 6479),
+                password=REDIS_CONFIG.get("password"),
+                decode_responses=False,
+                socket_timeout=REDIS_CONFIG.get("socket_timeout", 1),
+                socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1),
+                retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False),
+                health_check_interval=10,
+            )
+            self.redis_client.ping()
+            logger.info("Redis cache initialized for embeddings")
+        except Exception as e:
+            logger.warning("Failed to initialize Redis cache for embeddings: %s, continuing without cache", e)
+            self.redis_client = None
 
     def _call_service(self, request_data: List[str]) -> List[Any]:
         """
@@ -85,7 +69,7 @@ class BgeEncoder:
             response.raise_for_status()
             return response.json()
         except requests.exceptions.RequestException as e:
-            logger.error(f"BgeEncoder service request failed: {e}", exc_info=True)
+            logger.error(f"TextEmbeddingEncoder service request failed: {e}", exc_info=True)
             raise
 
     def encode(
@@ -122,7 +106,7 @@ class BgeEncoder:
         embeddings: List[Optional[np.ndarray]] = [None] * len(sentences)
 
         for i, text in enumerate(sentences):
-            cached = self._get_cached_embedding(text, 'en')  # Use 'en' as default language for title embedding
+            cached = self._get_cached_embedding(text, "generic")
             if cached is not None:
                 embeddings[i] = cached
             else:
@@ -152,7 +136,7 @@ class BgeEncoder:
                     if self._is_valid_embedding(embedding_array):
                         embeddings[original_idx] = embedding_array
                         # Cache the embedding
-                        self._set_cached_embedding(text, 'en', embedding_array)
+                        self._set_cached_embedding(text, "generic", embedding_array)
                     else:
                         logger.warning(
                             f"Invalid embedding returned from service for text {original_idx} "
@@ -270,3 +254,7 @@ class BgeEncoder:
         except Exception as e:
             logger.error(f"Error storing embedding in cache: {e}")
             return False
+
+
+# Backward compatibility for existing imports/usages.
+BgeEncoder = TextEmbeddingEncoder
diff --git a/examples/cloud_embedding_example.py b/examples/cloud_embedding_example.py
deleted file mode 100644
index e5693ea..0000000
--- a/examples/cloud_embedding_example.py
+++ /dev/null
@@ -1,167 +0,0 @@
-"""
-简单示例:使用云端文本向量化模块
-
-展示如何使用 CloudTextEncoder 进行文本向量化。
-"""
-
-import os
-import sys
-from pathlib import Path
-
-# Add parent directory to path
-sys.path.insert(0, str(Path(__file__).parent.parent))
-
-from embeddings.cloud_text_encoder import CloudTextEncoder
-
-
-def example_single_text():
-    """示例1:单个文本向量化"""
-    print("=" * 60)
-    print("示例1:单个文本向量化")
-    print("=" * 60)
-
-    # 初始化编码器
-    encoder = CloudTextEncoder()
-
-    # 单个文本
-    text = "衣服的质量杠杠的"
-    print(f"输入文本: {text}")
-
-    # 生成向量
-    embedding = encoder.encode(text)
-
-    print(f"向量维度: {embedding.shape}")
-    print(f"向量前5个值: {embedding[0][:5]}")
-    print()
-
-
-def example_multiple_texts():
-    """示例2:批量文本向量化"""
-    print("=" * 60)
-    print("示例2:批量文本向量化")
-    print("=" * 60)
-
-    # 初始化编码器
-    encoder = CloudTextEncoder()
-
-    # 多个文本
-    texts = [
-        "Bohemian Maxi Dress",
-        "Vintage Denim Jacket",
-        "Minimalist Linen Trousers",
-        "Gothic Black Boots",
-        "Streetwear Oversized Hoodie"
-    ]
-
-    print(f"输入文本数量: {len(texts)}")
-    for i, text in enumerate(texts, 1):
-        print(f"  {i}. {text}")
-
-    # 生成向量
-    embeddings = encoder.encode(texts)
-
-    print(f"\n向量矩阵维度: {embeddings.shape}")
-    print(f"第一个文本的向量前5个值: {embeddings[0][:5]}")
-    print()
-
-
-def example_batch_processing():
-    """示例3:大批量处理"""
-    print("=" * 60)
-    print("示例3:大批量处理(自动分批)")
-    print("=" * 60)
-
-    # 初始化编码器
-    encoder = CloudTextEncoder()
-
-    # 生成大量文本
-    texts = [f"商品描述 {i}" for i in range(50)]
-
-    print(f"输入文本数量: {len(texts)}")
-    print(f"批大小: 10")
-
-    # 使用 encode_batch 自动分批处理
-    embeddings = encoder.encode_batch(texts, batch_size=10)
-
-    print(f"向量矩阵维度: {embeddings.shape}")
-    print(f"平均向量范数: {embeddings.mean():.4f}")
-    print()
-
-
-def example_similarity_calculation():
-    """示例4:计算文本相似度"""
-    print("=" * 60)
-    print("示例4:计算文本相似度")
-    print("=" * 60)
-
-    import numpy as np
-
-    # 初始化编码器
-    encoder = CloudTextEncoder()
-
-    # 准备文本
-    query = "夏季连衣裙"
-    candidates = [
-        "Summer maxi dress",
-        "冬季羽绒服",
-        "夏天长裙",
-        "运动鞋",
-        "女士连衣裙"
-    ]
-
-    print(f"查询文本: {query}")
-    print(f"候选文本:")
-    for i, text in enumerate(candidates, 1):
-        print(f"  {i}. {text}")
-
-    # 生成向量
-    query_embedding = encoder.encode(query)
-    candidate_embeddings = encoder.encode(candidates)
-
-    # 计算余弦相似度
-    def cosine_similarity(a, b):
-        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
-
-    print(f"\n相似度分数:")
-    similarities = []
-    for i, candidate_emb in enumerate(candidate_embeddings):
-        sim = cosine_similarity(query_embedding[0], candidate_emb)
-        similarities.append((sim, candidates[i]))
-        print(f"  {candidates[i]}: {sim:.4f}")
-
-    # 排序并显示最相似的
-    similarities.sort(reverse=True)
-    print(f"\n最相似的文本: {similarities[0][1]} (相似度: {similarities[0][0]:.4f})")
-    print()
-
-
-def main():
-    """主函数"""
-    # 检查 API Key
-    if not os.getenv("DASHSCOPE_API_KEY"):
-        print("错误: 请设置 DASHSCOPE_API_KEY 环境变量")
-        print("示例: export DASHSCOPE_API_KEY='your-api-key'")
-        return
-
-    print("\n云端文本向量化示例\n")
-
-    try:
-        # 运行所有示例
-        example_single_text()
-        example_multiple_texts()
-        example_batch_processing()
-        example_similarity_calculation()
-
-        print("=" * 60)
-        print("所有示例运行完成!")
-        print("=" * 60)
-
-    except Exception as e:
-        print(f"\n错误: {e}")
-        import traceback
-        traceback.print_exc()
-
-
-if __name__ == "__main__":
-    main()
-
diff --git a/indexer/README.md b/indexer/README.md
index 8515ab7..5ae2a7a 100644
--- a/indexer/README.md
+++ b/indexer/README.md
@@ -274,7 +274,7 @@ if (StrUtil.isNotBlank(spu.getTitle()) && CollUtil.isNotEmpty(titleEmbedding)) {
 
 你当前 Python 侧已有:
 
-- `embeddings/text_encoder.py`(BGE-M3 模型);
+- `embeddings/text_encoder.py`(通过 6005 向量服务调用 Qwen3-Embedding-0.6B);
 - `SPUDocumentTransformer._fill_title_embedding` 已封装了调用 encoder 的逻辑。
 
 **建议缓存策略(可选,但推荐):**
diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py
index d04433b..b4d569b 100644
--- a/indexer/document_transformer.py
+++ b/indexer/document_transformer.py
@@ -723,7 +723,7 @@ class SPUDocumentTransformer:
             return
 
         try:
-            # 使用BgeEncoder生成embedding
+            # 使用文本向量编码器生成 embedding
             # encode方法返回numpy数组,形状为(n, 1024)
             embeddings = self.encoder.encode(title_text)
 
@@ -740,4 +740,3 @@ class SPUDocumentTransformer:
                 logger.warning(f"Failed to generate embedding for title: {title_text[:50]}...")
         except Exception as e:
             logger.error(f"Error generating title_embedding for SPU {doc.get('spu_id')}: {e}", exc_info=True)
-
diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py
index 48b88f4..9595dcf 100644
--- a/indexer/incremental_service.py
+++ b/indexer/incremental_service.py
@@ -80,11 +80,11 @@ class IncrementalIndexerService:
         # Text embedding encoder (best-effort)
         if bool(getattr(self._config.query_config, "enable_text_embedding", False)):
             try:
-                from embeddings.text_encoder import BgeEncoder
+                from embeddings.text_encoder import TextEmbeddingEncoder
 
-                self._shared_text_encoder = BgeEncoder()
+                self._shared_text_encoder = TextEmbeddingEncoder()
             except Exception as e:
-                logger.warning("Failed to initialize BgeEncoder at startup: %s", e)
+                logger.warning("Failed to initialize TextEmbeddingEncoder at startup: %s", e)
                 self._shared_text_encoder = None
 
         # Image embedding encoder (best-effort; may be unavailable if embedding service not running)
@@ -126,13 +126,13 @@ class IncrementalIndexerService:
         encoder: Optional[Any] = self._shared_text_encoder if enable_embedding else None
         if enable_embedding and encoder is None:
             try:
-                from embeddings.text_encoder import BgeEncoder
+                from embeddings.text_encoder import TextEmbeddingEncoder
 
-                encoder = BgeEncoder()
+                encoder = TextEmbeddingEncoder()
                 self._shared_text_encoder = encoder
-                logger.info("BgeEncoder lazily initialized in _get_transformer_bundle")
+                logger.info("TextEmbeddingEncoder lazily initialized in _get_transformer_bundle")
             except Exception as e:
-                logger.warning("Failed to lazily initialize BgeEncoder for tenant_id=%s: %s", tenant_id, e)
+                logger.warning("Failed to lazily initialize TextEmbeddingEncoder for tenant_id=%s: %s", tenant_id, e)
                 encoder = None
                 enable_embedding = False
 
@@ -790,4 +790,3 @@ class IncrementalIndexerService:
             "tenant_id": tenant_id
         }
-
diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py
index 5c800fa..397bc0b 100644
--- a/indexer/indexing_utils.py
+++ b/indexer/indexing_utils.py
@@ -112,11 +112,11 @@ def create_document_transformer(
         # 初始化encoder(如果启用标题向量化且未提供encoder)
         if encoder is None and enable_title_embedding and config.query_config.enable_text_embedding:
             try:
-                from embeddings.text_encoder import BgeEncoder
-                encoder = BgeEncoder()
-                logger.info("BgeEncoder initialized for title embedding")
+                from embeddings.text_encoder import TextEmbeddingEncoder
+                encoder = TextEmbeddingEncoder()
+                logger.info("TextEmbeddingEncoder initialized for title embedding")
             except Exception as e:
-                logger.warning(f"Failed to initialize BgeEncoder: {e}, title embedding will be disabled")
+                logger.warning(f"Failed to initialize TextEmbeddingEncoder: {e}, title embedding will be disabled")
                 enable_title_embedding = False
     except Exception as e:
         logger.warning(f"Failed to load config, using defaults: {e}")
@@ -136,4 +136,3 @@ def create_document_transformer(
         image_encoder=image_encoder,
         enable_image_embedding=enable_image_embedding,
     )
-
diff --git a/providers/embedding.py b/providers/embedding.py
index efc9ada..6aed3db 100644
--- a/providers/embedding.py
+++ b/providers/embedding.py
@@ -13,9 +13,11 @@ def create_embedding_provider() -> "EmbeddingProvider":
     """Create embedding provider from services config."""
     cfg = get_embedding_config()
     provider = (cfg.provider or "http").strip().lower()
-    if provider == "vllm":
+    if provider != "http":
         import logging
-        logging.getLogger(__name__).warning("embedding provider 'vllm' is reserved, using HTTP.")
+        logging.getLogger(__name__).warning(
+            "Unsupported embedding provider '%s', fallback to HTTP provider.", provider
+        )
     return EmbeddingProvider()
 
 
@@ -30,9 +32,9 @@ class EmbeddingProvider:
 
     @property
     def text_encoder(self):
-        """Lazy-created text encoder (BgeEncoder)."""
-        from embeddings.text_encoder import BgeEncoder
-        return BgeEncoder(service_url=self._base_url)
+        """Lazy-created text encoder (TextEmbeddingEncoder)."""
+        from embeddings.text_encoder import TextEmbeddingEncoder
+        return TextEmbeddingEncoder(service_url=self._base_url)
 
     @property
     def image_encoder(self):
diff --git a/query/query_parser.py b/query/query_parser.py
index d363045..c105b35 100644
--- a/query/query_parser.py
+++ b/query/query_parser.py
@@ -10,7 +10,7 @@ import logging
 import re
 from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 
-from embeddings import BgeEncoder
+from embeddings import TextEmbeddingEncoder
 from config import SearchConfig
 from .language_detector import LanguageDetector
 from providers import create_translation_provider
@@ -77,7 +77,7 @@ class QueryParser:
     def __init__(
         self,
         config: SearchConfig,
-        text_encoder: Optional[BgeEncoder] = None,
+        text_encoder: Optional[TextEmbeddingEncoder] = None,
         translator: Optional[Any] = None
     ):
         """
@@ -115,11 +115,11 @@ class QueryParser:
             logger.info("HanLP not installed; using simple tokenizer")
 
     @property
-    def text_encoder(self) -> BgeEncoder:
+    def text_encoder(self) -> TextEmbeddingEncoder:
         """Lazy load text encoder."""
         if self._text_encoder is None and self.config.query_config.enable_text_embedding:
             logger.info("Initializing text encoder (lazy load)...")
-            self._text_encoder = BgeEncoder()
+            self._text_encoder = TextEmbeddingEncoder()
         return self._text_encoder
 
     @property
@@ -196,9 +196,9 @@
             ParsedQuery object with all processing results
         """
         # Initialize logger if context provided
-        logger = context.logger if context else None
-        if logger:
-            logger.info(
+        active_logger = context.logger if context else logger
+        if context and hasattr(context, "logger"):
+            context.logger.info(
                 f"Starting query parsing | Original query: '{query}' | Generate vector: {generate_vector}",
                 extra={'reqid': context.reqid, 'uid': context.uid}
             )
@@ -207,13 +207,13 @@ def log_info(msg):
             if context and hasattr(context, 'logger'):
                 context.logger.info(msg, extra={'reqid': context.reqid, 'uid': context.uid})
             else:
-                logger.info(msg)
+                active_logger.info(msg)
 
         def log_debug(msg):
             if context and hasattr(context, 'logger'):
                 context.logger.debug(msg, extra={'reqid': context.reqid, 'uid': context.uid})
             else:
-                logger.debug(msg)
+                active_logger.debug(msg)
 
         # Stage 1: Normalize
         normalized = self.normalizer.normalize(query)
diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py
index ffe522a..ca8e196 100644
--- a/tests/ci/test_service_api_contracts.py
+++ b/tests/ci/test_service_api_contracts.py
@@ -427,7 +427,7 @@ def test_indexer_index_validation_max_delete_spu_ids(indexer_client: TestClient)
 
 
 class _FakeTextModel:
-    def encode_batch(self, texts, batch_size=32, device="cpu"):
+    def encode_batch(self, texts, batch_size=32, device="cpu", normalize_embeddings=True):
         return [np.array([0.1, 0.2, 0.3], dtype=np.float32) for _ in texts]
 
 
@@ -437,29 +437,24 @@ class _FakeImageModel:
 
 
 @pytest.fixture
-def embedding_client():
+def embedding_module():
     import embeddings.server as emb_server
 
     emb_server.app.router.on_startup.clear()
     emb_server._text_model = _FakeTextModel()
     emb_server._image_model = _FakeImageModel()
-
-    with TestClient(emb_server.app) as client:
-        yield client
+    yield emb_server
 
 
-def test_embedding_text_contract(embedding_client: TestClient):
-    response = embedding_client.post("/embed/text", json=["hello", "world"])
-    assert response.status_code == 200
-    data = response.json()
+def test_embedding_text_contract(embedding_module):
+    data = embedding_module.embed_text(["hello", "world"])
     assert len(data) == 2
     assert len(data[0]) == 3
 
 
-def test_embedding_image_contract(embedding_client: TestClient):
-    response = embedding_client.post("/embed/image", json=["https://example.com/a.jpg"])
-    assert response.status_code == 200
-    assert len(response.json()[0]) == 3
+def test_embedding_image_contract(embedding_module):
+    data = embedding_module.embed_image(["https://example.com/a.jpg"])
+    assert len(data[0]) == 3
 
 
 class _FakeTranslator:
diff --git a/tests/test_cloud_embedding.py b/tests/test_cloud_embedding.py
deleted file mode 100644
index dc7c381..0000000
--- a/tests/test_cloud_embedding.py
+++ /dev/null
@@ -1,186 +0,0 @@
-"""
-Test script for cloud text embedding using Aliyun DashScope API.
-
-Reads queries from queries.txt and tests embedding generation,
-logging send time, receive time, and duration for each request.
-"""
-
-import os
-import sys
-import time
-from datetime import datetime
-from pathlib import Path
-
-import pytest
-
-# Add parent directory to path
-sys.path.insert(0, str(Path(__file__).parent.parent))
-
-from embeddings.cloud_text_encoder import CloudTextEncoder
-
-
-def format_timestamp(ts: float) -> str:
-    """Format timestamp to readable string."""
-    return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
-
-
-def read_queries(file_path: str, limit: int = 100) -> list:
-    """
-    Read queries from text file.
-
-    Args:
-        file_path: Path to queries file
-        limit: Maximum number of queries to read
-
-    Returns:
-        List of query strings
-    """
-    queries = []
-    with open(file_path, 'r', encoding='utf-8') as f:
-        for i, line in enumerate(f):
-            if i >= limit:
-                break
-            query = line.strip()
-            if query:  # Skip empty lines
-                queries.append(query)
-    return queries
-
-
-@pytest.mark.skip(reason="Requires data file and DASHSCOPE_API_KEY; run manually when needed")
-def test_cloud_embedding(queries_file: str, num_queries: int = 100):
-    """
-    Test cloud embedding with queries from file.
-
-    Args:
-        queries_file: Path to queries file
-        num_queries: Number of queries to test
-    """
-    print("=" * 80)
-    print("Cloud Text Embedding Test - Aliyun DashScope API")
-    print("=" * 80)
-    print()
-
-    # Check if API key is set
-    api_key = os.getenv("DASHSCOPE_API_KEY")
-    if not api_key:
-        print("ERROR: DASHSCOPE_API_KEY environment variable is not set!")
-        print("Please set it using: export DASHSCOPE_API_KEY='your-api-key'")
-        return
-
-    print(f"API Key: {api_key[:10]}...{api_key[-4:]}")
-    print()
-
-    # Read queries
-    print(f"Reading queries from: {queries_file}")
-    try:
-        queries = read_queries(queries_file, limit=num_queries)
-        print(f"Successfully read {len(queries)} queries")
-        print()
-    except Exception as e:
-        print(f"ERROR: Failed to read queries file: {e}")
-        return
-
-    # Initialize encoder
-    print("Initializing CloudTextEncoder...")
-    try:
-        encoder = CloudTextEncoder()
-        print("CloudTextEncoder initialized successfully")
-        print()
-    except Exception as e:
-        print(f"ERROR: Failed to initialize encoder: {e}")
-        return
-
-    # Test embeddings
-    print("=" * 80)
-    print(f"Testing {len(queries)} queries (one by one)")
-    print("=" * 80)
-    print()
-
-    total_start = time.time()
-    success_count = 0
-    failure_count = 0
-    total_duration = 0.0
-
-    for i, query in enumerate(queries, 1):
-        try:
-            # Record send time
-            send_time = time.time()
-            send_time_str = format_timestamp(send_time)
-
-            # Generate embedding
-            embedding = encoder.encode(query)
-
-            # Record receive time
-            receive_time = time.time()
-            receive_time_str = format_timestamp(receive_time)
-
-            # Calculate duration
-            duration = receive_time - send_time
-            total_duration += duration
-
-            # Verify embedding
-            if embedding.shape[0] > 0:
-                success_count += 1
-                status = "✓ SUCCESS"
-            else:
-                failure_count += 1
-                status = "✗ FAILED"
-
-            # Print result
-            query_display = query[:50] + "..." if len(query) > 50 else query
-            print(f"[{i:3d}/{len(queries)}] {status}")
-            print(f"  Query: {query_display}")
-            print(f"  Send Time: {send_time_str}")
-            print(f"  Receive Time: {receive_time_str}")
-            print(f"  Duration: {duration:.3f}s")
-            print(f"  Embedding Shape: {embedding.shape}")
-            print()
-
-        except Exception as e:
-            failure_count += 1
-            receive_time = time.time()
-            duration = receive_time - send_time
-
-            print(f"[{i:3d}/{len(queries)}] ✗ ERROR")
-            print(f"  Query: {query[:50]}...")
-            print(f"  Send Time: {send_time_str}")
-            print(f"  Receive Time: {format_timestamp(receive_time)}")
-            print(f"  Duration: {duration:.3f}s")
-            print(f"  Error: {str(e)}")
-            print()
-
-    # Print summary
-    total_elapsed = time.time() - total_start
-    avg_duration = total_duration / len(queries) if queries else 0
-
-    print("=" * 80)
-    print("Test Summary")
-    print("=" * 80)
-    print(f"Total Queries: {len(queries)}")
-    print(f"Successful: {success_count}")
-    print(f"Failed: {failure_count}")
-    print(f"Success Rate: {success_count / len(queries) * 100:.1f}%")
-    print(f"Total Time: {total_elapsed:.3f}s")
-    print(f"Total API Time: {total_duration:.3f}s")
-    print(f"Average Duration: {avg_duration:.3f}s per query")
-    print(f"Throughput: {len(queries) / total_elapsed:.2f} queries/second")
-    print("=" * 80)
-
-
-def main():
-    """Main entry point."""
-    # Default queries file path
-    queries_file = Path(__file__).parent.parent / "data_crawling" / "queries.txt"
-
-    # Check if file exists
-    if not queries_file.exists():
-        print(f"ERROR: Queries file not found: {queries_file}")
-        return
-
-    # Run test with 100 queries
-    test_cloud_embedding(str(queries_file), num_queries=100)
-
-
-if __name__ == "__main__":
-    main()
-
diff --git a/tests/test_embedding_pipeline.py b/tests/test_embedding_pipeline.py
new file mode 100644
index 0000000..5558c6a
--- /dev/null
+++ b/tests/test_embedding_pipeline.py
@@ -0,0 +1,159 @@
+import pickle
+from typing import Any, Dict, List, Optional
+
+import numpy as np
+
+from config import (
+    FunctionScoreConfig,
+    IndexConfig,
+    QueryConfig,
+    RankingConfig,
+    RerankConfig,
+    SPUConfig,
+    SearchConfig,
+)
+from embeddings.text_encoder import TextEmbeddingEncoder
+from query import QueryParser
+
+
+class _FakeRedis:
+    def __init__(self):
+        self.store: Dict[str, bytes] = {}
+
+    def ping(self):
+        return True
+
+    def get(self, key: str):
+        return self.store.get(key)
+
+    def setex(self, key: str, _expire, value: bytes):
+        self.store[key] = value
+        return True
+
+    def expire(self, key: str, _expire):
+        return key in self.store
+
+    def delete(self, key: str):
+        self.store.pop(key, None)
+        return True
+
+
+class _FakeResponse:
+    def __init__(self, payload: List[Optional[List[float]]]):
+        self._payload = payload
+
+    def raise_for_status(self):
+        return None
+
+    def json(self):
+        return self._payload
+
+
+class _FakeTranslator:
+    def translate(
+        self,
+        text: str,
+        target_lang: str,
+        source_lang: Optional[str] = None,
+        prompt: Optional[str] = None,
+    ) -> str:
+        return f"{text}-{target_lang}"
+
+
+class _FakeQueryEncoder:
+    def encode(self, sentences, **kwargs):
+        if isinstance(sentences, str):
+            sentences = [sentences]
+        return np.array([np.array([0.11, 0.22, 0.33], dtype=np.float32) for _ in sentences], dtype=object)
+
+
+def _build_test_config() -> SearchConfig:
+    return SearchConfig(
+        field_boosts={"title.en": 3.0},
+        indexes=[IndexConfig(name="default", label="default", fields=["title.en"], boost=1.0)],
+        query_config=QueryConfig(
+            supported_languages=["en", "zh"],
+            default_language="en",
+            enable_text_embedding=True,
+            enable_query_rewrite=False,
+            enable_multilang_search=True,
+            rewrite_dictionary={},
+            translation_prompts={"query_zh": "e-commerce domain", "query_en": "e-commerce domain"},
+            text_embedding_field="title_embedding",
+            image_embedding_field=None,
+        ),
+        ranking=RankingConfig(expression="bm25()", description="test"),
+        function_score=FunctionScoreConfig(),
+        rerank=RerankConfig(),
+        spu_config=SPUConfig(enabled=True, spu_field="spu_id", inner_hits_size=3),
+        es_index_name="test_products",
+        tenant_config={},
+        es_settings={},
+        services={},
+    )
+
+
+def test_text_embedding_encoder_response_alignment(monkeypatch):
+    fake_redis = _FakeRedis()
+    monkeypatch.setattr("embeddings.text_encoder.redis.Redis", lambda **kwargs: fake_redis)
+
+    def _fake_post(url, json, timeout):
+        assert url.endswith("/embed/text")
+        assert json == ["hello", "world"]
+        return _FakeResponse([[0.1, 0.2], None])
+
+    monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post)
+
+    encoder = TextEmbeddingEncoder(service_url="http://127.0.0.1:6005")
+    out = encoder.encode(["hello", "world"])
+
+    assert len(out) == 2
+    assert isinstance(out[0], np.ndarray)
+    assert out[0].shape == (2,)
+    assert out[1] is None
+
+
+def test_text_embedding_encoder_cache_hit(monkeypatch):
+    fake_redis = _FakeRedis()
+    cached = np.array([0.9, 0.8], dtype=np.float32)
+    fake_redis.store["embedding:generic:cached-text"] = pickle.dumps(cached)
+    monkeypatch.setattr("embeddings.text_encoder.redis.Redis", lambda **kwargs: fake_redis)
+
+    calls = {"count": 0}
+
+    def _fake_post(url, json, timeout):
+        calls["count"] += 1
+        return _FakeResponse([[0.3, 0.4]])
+
+    monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post)
+
+    encoder = TextEmbeddingEncoder(service_url="http://127.0.0.1:6005")
+    out = encoder.encode(["cached-text", "new-text"])
+
+    assert calls["count"] == 1
+    assert np.allclose(out[0], cached)
+    assert np.allclose(out[1], np.array([0.3, 0.4], dtype=np.float32))
+
+
+def test_query_parser_generates_query_vector_with_encoder():
+    parser = QueryParser(
+        config=_build_test_config(),
+        text_encoder=_FakeQueryEncoder(),
+        translator=_FakeTranslator(),
+    )
+
+    parsed = parser.parse("red dress", tenant_id="162", generate_vector=True)
+    assert parsed.query_vector is not None
+    assert parsed.query_vector.shape == (3,)
+
+
+def test_query_parser_skips_query_vector_when_disabled():
+    parser = QueryParser(
+        config=_build_test_config(),
+        text_encoder=_FakeQueryEncoder(),
+        translator=_FakeTranslator(),
+    )
+
+    parsed = parser.parse("red dress", tenant_id="162", generate_vector=False)
+    assert parsed.query_vector is None
+
diff --git a/third-party/xinference/activate.sh b/third-party/xinference/activate.sh
deleted file mode 100755
index ab38170..0000000
--- a/third-party/xinference/activate.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/bash
-
-CONDA_ROOT="${CONDA_ROOT:-/home/tw/miniconda3}"
-source "$CONDA_ROOT/etc/profile.d/conda.sh"
-conda activate xinference
diff --git a/third-party/xinference/perfermance_test.py b/third-party/xinference/perfermance_test.py
deleted file mode 100644
index a5b492a..0000000
--- a/third-party/xinference/perfermance_test.py
+++ /dev/null
@@ -1,249 +0,0 @@
-import openai
-import time
-import statistics
-from concurrent.futures import ThreadPoolExecutor, as_completed
-from typing import List, Dict, Tuple
-import json
-
-
-class EmbeddingPerformanceTester:
-    def __init__(self, base_url: str = "http://127.0.0.1:9997/v1"):
-        """初始化性能测试器"""
-        self.client = openai.Client(
-            api_key="cannot be empty",  # 实际使用时请替换为真实API Key
-            base_url=base_url
-        )
-
-    def test_single_request(self, model: str, input_text: List[str]) -> Tuple[bool, float]:
-        """测试单个请求,返回成功状态和耗时"""
-        try:
-            start_time = time.perf_counter()
-            response = self.client.embeddings.create(
-                model=model,
-                input=input_text
-            )
-            end_time = time.perf_counter()
-
-            # 验证响应格式
-            if response and hasattr(response, 'data'):
-                return True, end_time - start_time
-            else:
-                return False, end_time - start_time
-
-        except Exception as e:
-            print(f"请求失败 - 模型 {model}: {str(e)}")
-            return False, 0.0
-
-    def test_model_sequential(self, model: str, input_text: List[str],
-                              iterations: int = 1000) -> Dict:
-        """顺序执行性能测试"""
-        print(f"\n开始顺序测试模型: {model}")
-        print(f"测试次数: {iterations}")
-
-        successes = 0
-        failures = 0
-        latencies = []
-
-        for i in range(iterations):
-            if i % 100 == 0 and i > 0:
-                print(f"  已完成 {i}/{iterations} 次请求...")
-
-            success, latency = self.test_single_request(model, input_text)
-
-            if success:
-                successes += 1
-                latencies.append(latency)
-            else:
-                failures += 1
-
-        return self._calculate_stats(model, successes, failures, latencies)
-
-    def test_model_concurrent(self, model: str, input_text: List[str],
-                              iterations: int = 1000, max_workers: int = 10) -> Dict:
-        """并发执行性能测试"""
-        print(f"\n开始并发测试模型: {model}")
-        print(f"测试次数: {iterations}, 并发数: {max_workers}")
-
-        successes = 0
-        failures = 0
-        latencies = []
-
-        with ThreadPoolExecutor(max_workers=max_workers) as executor:
-            # 提交所有任务
-            future_to_request = {
-                executor.submit(self.test_single_request, model, input_text): i
-                for i in range(iterations)
-            }
-
-            # 收集结果
-            completed = 0
-            for future in as_completed(future_to_request):
-                completed += 1
-                if completed % 100 == 0:
-                    print(f"  已完成 {completed}/{iterations} 次请求...")
-
-                try:
-                    success, latency = future.result()
-                    if success:
-                        successes += 1
-                        latencies.append(latency)
-                    else:
-                        failures += 1
-                except Exception as e:
-                    print(f"请求异常: {str(e)}")
-                    failures += 1
-
-        return self._calculate_stats(model, successes, failures, latencies)
-
-    def _calculate_stats(self, model: str, successes: int,
-                         failures: int, latencies: List[float]) -> Dict:
-        """计算性能统计信息"""
-        if not latencies:
-            return {
-                "model": model,
-                "total_requests": successes + failures,
-                "successful_requests": successes,
-                "failed_requests": failures,
-                "success_rate": 0.0,
-                "error": "无成功请求"
-            }
-
-        stats = {
-            "model": model,
-            "total_requests": successes + failures,
-            "successful_requests": successes,
-            "failed_requests": failures,
-            "success_rate": successes / (successes + failures) * 100,
-            "total_time": sum(latencies),
-            "avg_latency": statistics.mean(latencies),
-            "min_latency": min(latencies),
-            "max_latency": max(latencies),
-            "p50_latency": statistics.median(latencies),
-            "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)],
-            "p99_latency":
sorted(latencies)[int(len(latencies) * 0.99)], - "requests_per_second": len(latencies) / sum(latencies) if sum(latencies) > 0 else 0 - } - - # 添加标准差(如果有多于一个样本) - if len(latencies) > 1: - stats["std_dev"] = statistics.stdev(latencies) - - return stats - - def print_results(self, results: Dict): - """打印测试结果""" - print("\n" + "="*60) - print(f"性能测试结果 - {results['model']}") - print("="*60) - - if "error" in results: - print(f"错误: {results['error']}") - return - - print(f"总请求数: {results['total_requests']}") - print(f"成功请求: {results['successful_requests']}") - print(f"失败请求: {results['failed_requests']}") - print(f"成功率: {results['success_rate']:.2f}%") - print(f"总耗时: {results['total_time']:.4f}秒") - print(f"平均延迟: {results['avg_latency']:.4f}秒") - print(f"最小延迟: {results['min_latency']:.4f}秒") - print(f"最大延迟: {results['max_latency']:.4f}秒") - print(f"P50延迟: {results['p50_latency']:.4f}秒") - print(f"P95延迟: {results['p95_latency']:.4f}秒") - print(f"P99延迟: {results['p99_latency']:.4f}秒") - - if "std_dev" in results: - print(f"标准差: {results['std_dev']:.4f}秒") - - print(f"QPS: {results['requests_per_second']:.2f} 请求/秒") - print("="*60) - - def save_results(self, results_list: List[Dict], filename: str = "performance_results.json"): - """保存测试结果到JSON文件""" - with open(filename, 'w', encoding='utf-8') as f: - json.dump(results_list, f, indent=2, ensure_ascii=False) - print(f"\n结果已保存到: {filename}") - - -def main(): - """主函数""" - # 初始化测试器 - tester = EmbeddingPerformanceTester() - - # 测试配置 - test_input = ["What is the capital of China?"] - iterations = 1000 - test_models = ['bge-m3', 'Qwen3-Embedding-0.6B'] - - print("="*60) - print("Embedding API 性能测试") - print("="*60) - - all_results = [] - - # 测试模式选择 - print("\n选择测试模式:") - print("1. 顺序测试 (Sequential)") - print("2. 并发测试 (Concurrent)") - print("3. 
两种模式都测试") - - mode = input("请输入选择 (1/2/3, 默认1): ").strip() - - for model in test_models: - print(f"\n{'='*60}") - print(f"测试模型: {model}") - print(f"{'='*60}") - - if mode in ['2', '3']: - # 并发测试 - concurrent_results = tester.test_model_concurrent( - model=model, - input_text=test_input, - iterations=iterations, - max_workers=10 # 可根据需要调整并发数 - ) - tester.print_results(concurrent_results) - concurrent_results["test_mode"] = "concurrent" - all_results.append(concurrent_results) - - if mode in ['1', '3'] or not mode: - # 顺序测试 - sequential_results = tester.test_model_sequential( - model=model, - input_text=test_input, - iterations=iterations - ) - tester.print_results(sequential_results) - sequential_results["test_mode"] = "sequential" - all_results.append(sequential_results) - - # 保存结果 - tester.save_results(all_results) - - # 汇总对比 - print("\n" + "="*60) - print("性能测试汇总对比") - print("="*60) - - for result in all_results: - if "error" not in result: - print(f"\n模型: {result['model']} ({result['test_mode']})") - print(f" QPS: {result['requests_per_second']:.2f}") - print(f" 平均延迟: {result['avg_latency']:.4f}秒") - print(f" 成功率: {result['success_rate']:.2f}%") - - -if __name__ == "__main__": - # 添加一个简单的健康检查 - try: - tester = EmbeddingPerformanceTester() - # 快速测试连接 - test_result = tester.test_single_request('bge-m3', ["test"]) - if test_result[0]: - print("API连接正常,开始性能测试...") - main() - else: - print("API连接失败,请检查服务是否正常运行") - except Exception as e: - print(f"初始化失败: {str(e)}") - print("请确保OpenAI客户端已安装: pip install openai") diff --git a/third-party/xinference/perfermance_test_http.py b/third-party/xinference/perfermance_test_http.py deleted file mode 100644 index 590035a..0000000 --- a/third-party/xinference/perfermance_test_http.py +++ /dev/null @@ -1,265 +0,0 @@ -import requests -import time -import statistics -from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import List, Dict, Tuple -import json - - -class EmbeddingPerformanceTester: - def 
__init__(self, base_url: str = "http://127.0.0.1:9997/v1"): - """初始化性能测试器""" - self.base_url = base_url - self.embeddings_url = f"{base_url}/embeddings" - - def test_single_request(self, model: str, input_text: List[str]) -> Tuple[bool, float]: - """测试单个请求,返回成功状态和耗时""" - try: - start_time = time.perf_counter() - - # 构建请求体 - # 如果 input_text 是列表,取第一个元素;如果是字符串,直接使用 - input_value = input_text[0] if isinstance(input_text, list) and len(input_text) > 0 else input_text - - response = requests.post( - self.embeddings_url, - headers={ - 'accept': 'application/json', - 'Content-Type': 'application/json' - }, - json={ - "model": model, - "input": input_value - }, - timeout=60 # 设置超时时间 - ) - - end_time = time.perf_counter() - - # 验证响应格式 - if response.status_code == 200: - result = response.json() - if result and 'data' in result and len(result['data']) > 0: - return True, end_time - start_time - else: - return False, end_time - start_time - else: - return False, end_time - start_time - - except Exception as e: - print(f"请求失败 - 模型 {model}: {str(e)}") - return False, 0.0 - - def test_model_sequential(self, model: str, input_text: List[str], - iterations: int = 1000) -> Dict: - """顺序执行性能测试""" - print(f"\n开始顺序测试模型: {model}") - print(f"测试次数: {iterations}") - - successes = 0 - failures = 0 - latencies = [] - - for i in range(iterations): - if i % 100 == 0 and i > 0: - print(f" 已完成 {i}/{iterations} 次请求...") - - success, latency = self.test_single_request(model, input_text) - - if success: - successes += 1 - latencies.append(latency) - else: - failures += 1 - - return self._calculate_stats(model, successes, failures, latencies) - - def test_model_concurrent(self, model: str, input_text: List[str], - iterations: int = 1000, max_workers: int = 10) -> Dict: - """并发执行性能测试""" - print(f"\n开始并发测试模型: {model}") - print(f"测试次数: {iterations}, 并发数: {max_workers}") - - successes = 0 - failures = 0 - latencies = [] - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - # 提交所有任务 - 
future_to_request = { - executor.submit(self.test_single_request, model, input_text): i - for i in range(iterations) - } - - # 收集结果 - completed = 0 - for future in as_completed(future_to_request): - completed += 1 - if completed % 100 == 0: - print(f" 已完成 {completed}/{iterations} 次请求...") - - try: - success, latency = future.result() - if success: - successes += 1 - latencies.append(latency) - else: - failures += 1 - except Exception as e: - print(f"请求异常: {str(e)}") - failures += 1 - - return self._calculate_stats(model, successes, failures, latencies) - - def _calculate_stats(self, model: str, successes: int, - failures: int, latencies: List[float]) -> Dict: - """计算性能统计信息""" - if not latencies: - return { - "model": model, - "total_requests": successes + failures, - "successful_requests": successes, - "failed_requests": failures, - "success_rate": 0.0, - "error": "无成功请求" - } - - stats = { - "model": model, - "total_requests": successes + failures, - "successful_requests": successes, - "failed_requests": failures, - "success_rate": successes / (successes + failures) * 100, - "total_time": sum(latencies), - "avg_latency": statistics.mean(latencies), - "min_latency": min(latencies), - "max_latency": max(latencies), - "p50_latency": statistics.median(latencies), - "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)], - "p99_latency": sorted(latencies)[int(len(latencies) * 0.99)], - "requests_per_second": len(latencies) / sum(latencies) if sum(latencies) > 0 else 0 - } - - # 添加标准差(如果有多于一个样本) - if len(latencies) > 1: - stats["std_dev"] = statistics.stdev(latencies) - - return stats - - def print_results(self, results: Dict): - """打印测试结果""" - print("\n" + "="*60) - print(f"性能测试结果 - {results['model']}") - print("="*60) - - if "error" in results: - print(f"错误: {results['error']}") - return - - print(f"总请求数: {results['total_requests']}") - print(f"成功请求: {results['successful_requests']}") - print(f"失败请求: {results['failed_requests']}") - print(f"成功率: 
{results['success_rate']:.2f}%") - print(f"总耗时: {results['total_time']:.4f}秒") - print(f"平均延迟: {results['avg_latency']:.4f}秒") - print(f"最小延迟: {results['min_latency']:.4f}秒") - print(f"最大延迟: {results['max_latency']:.4f}秒") - print(f"P50延迟: {results['p50_latency']:.4f}秒") - print(f"P95延迟: {results['p95_latency']:.4f}秒") - print(f"P99延迟: {results['p99_latency']:.4f}秒") - - if "std_dev" in results: - print(f"标准差: {results['std_dev']:.4f}秒") - - print(f"QPS: {results['requests_per_second']:.2f} 请求/秒") - print("="*60) - - def save_results(self, results_list: List[Dict], filename: str = "performance_results.json"): - """保存测试结果到JSON文件""" - with open(filename, 'w', encoding='utf-8') as f: - json.dump(results_list, f, indent=2, ensure_ascii=False) - print(f"\n结果已保存到: {filename}") - - -def main(): - """主函数""" - # 初始化测试器 - tester = EmbeddingPerformanceTester() - - # 测试配置 - test_input = ["What is the capital of China?"] - iterations = 1000 - test_models = ['bge-m3', 'Qwen3-Embedding-0.6B'] - - print("="*60) - print("Embedding API 性能测试 (HTTP)") - print("="*60) - - all_results = [] - - # 测试模式选择 - print("\n选择测试模式:") - print("1. 顺序测试 (Sequential)") - print("2. 并发测试 (Concurrent)") - print("3. 
两种模式都测试") - - mode = input("请输入选择 (1/2/3, 默认1): ").strip() - - for model in test_models: - print(f"\n{'='*60}") - print(f"测试模型: {model}") - print(f"{'='*60}") - - if mode in ['2', '3']: - # 并发测试 - concurrent_results = tester.test_model_concurrent( - model=model, - input_text=test_input, - iterations=iterations, - max_workers=10 # 可根据需要调整并发数 - ) - tester.print_results(concurrent_results) - concurrent_results["test_mode"] = "concurrent" - all_results.append(concurrent_results) - - if mode in ['1', '3'] or not mode: - # 顺序测试 - sequential_results = tester.test_model_sequential( - model=model, - input_text=test_input, - iterations=iterations - ) - tester.print_results(sequential_results) - sequential_results["test_mode"] = "sequential" - all_results.append(sequential_results) - - # 保存结果 - tester.save_results(all_results) - - # 汇总对比 - print("\n" + "="*60) - print("性能测试汇总对比") - print("="*60) - - for result in all_results: - if "error" not in result: - print(f"\n模型: {result['model']} ({result['test_mode']})") - print(f" QPS: {result['requests_per_second']:.2f}") - print(f" 平均延迟: {result['avg_latency']:.4f}秒") - print(f" 成功率: {result['success_rate']:.2f}%") - - -if __name__ == "__main__": - # 添加一个简单的健康检查 - try: - tester = EmbeddingPerformanceTester() - # 快速测试连接 - test_result = tester.test_single_request('bge-m3', ["test"]) - if test_result[0]: - print("API连接正常,开始性能测试...") - main() - else: - print("API连接失败,请检查服务是否正常运行") - except Exception as e: - print(f"初始化失败: {str(e)}") - print("请确保 requests 库已安装: pip install requests") \ No newline at end of file diff --git a/third-party/xinference/perfermance_test_single.py b/third-party/xinference/perfermance_test_single.py deleted file mode 100644 index e89359a..0000000 --- a/third-party/xinference/perfermance_test_single.py +++ /dev/null @@ -1,108 +0,0 @@ -import openai -import time -import requests -import json - - -client = openai.Client( - api_key="cannot be empty", - base_url="http://127.0.0.1:9997/v1" -) - -# 记录开始时间 -start_time = 
time.time() - -a = client.embeddings.create( - model='bge-m3', - input=["What is the capital of China?"] -) - -# 记录结束时间 -end_time = time.time() - -#print(a) -print(f"\n耗时: {end_time - start_time:.4f} 秒") - -# 记录开始时间 -start_time = time.time() - -a = client.embeddings.create( - model='Qwen3-Embedding-0.6B', - input=["What is the capital of China?"] -) - -# 记录结束时间 -end_time = time.time() - -#print(a) -print(f"\n耗时: {end_time - start_time:.4f} 秒") - -# ========== HTTP API 测试 ========== -print("\n" + "="*50) -print("HTTP API 测试") -print("="*50) - -# 配置 -XINFERENCE_HOST = "127.0.0.1" -XINFERENCE_PORT = "9997" -base_url = f"http://{XINFERENCE_HOST}:{XINFERENCE_PORT}/v1/embeddings" - -# 测试 bge-m3 模型 -print("\n测试模型: bge-m3") -start_time = time.time() - -response = requests.post( - base_url, - headers={ - 'accept': 'application/json', - 'Content-Type': 'application/json' - }, - json={ - "model": "bge-m3", - "input": "What is the capital of China?" - } -) - -end_time = time.time() - -if response.status_code == 200: - result = response.json() - print(f"状态码: {response.status_code}") - print(f"模型: {result.get('model', 'N/A')}") - print(f"使用token数: {result.get('usage', {}).get('total_tokens', 'N/A')}") - print(f"嵌入向量维度: {len(result.get('data', [{}])[0].get('embedding', []))}") - print(f"耗时: {end_time - start_time:.4f} 秒") -else: - print(f"请求失败,状态码: {response.status_code}") - print(f"错误信息: {response.text}") - -# 测试 Qwen3-Embedding-0.6B 模型 -print("\n测试模型: Qwen3-Embedding-0.6B") -start_time = time.time() - -response = requests.post( - base_url, - headers={ - 'accept': 'application/json', - 'Content-Type': 'application/json' - }, - json={ - "model": "Qwen3-Embedding-0.6B", - "input": "What is the capital of China?" 
- } -) - -end_time = time.time() - -if response.status_code == 200: - result = response.json() - print(f"状态码: {response.status_code}") - print(f"模型: {result.get('model', 'N/A')}") - print(f"使用token数: {result.get('usage', {}).get('total_tokens', 'N/A')}") - print(f"嵌入向量维度: {len(result.get('data', [{}])[0].get('embedding', []))}") - print(f"耗时: {end_time - start_time:.4f} 秒") -else: - print(f"请求失败,状态码: {response.status_code}") - print(f"错误信息: {response.text}") - - diff --git a/third-party/xinference/stop.sh b/third-party/xinference/stop.sh deleted file mode 100644 index e9b4c2e..0000000 --- a/third-party/xinference/stop.sh +++ /dev/null @@ -1,2 +0,0 @@ - -xinference terminate --model-uid Qwen3-Reranker-0.6B --model-uid bge-m3 --model-uid Qwen3-Embedding-0.6B diff --git a/third-party/xinference/test.sh b/third-party/xinference/test.sh deleted file mode 100644 index 4a2b4f9..0000000 --- a/third-party/xinference/test.sh +++ /dev/null @@ -1,11 +0,0 @@ -if [ "$CONDA_DEFAULT_ENV" != "tw" ]; then - echo "当前环境不是 tw,正在激活 tw 环境..." - CONDA_ROOT="${CONDA_ROOT:-/home/tw/miniconda3}" - source "$CONDA_ROOT/etc/profile.d/conda.sh" - conda activate tw - echo "已激活 tw 环境" -else - echo "当前已经在 tw 环境中,无需重复激活" -fi - -python perfermance_test_single.py diff --git a/third-party/xinference/测试结果-perfermance_test.txt b/third-party/xinference/测试结果-perfermance_test.txt deleted file mode 100644 index ef43a66..0000000 --- a/third-party/xinference/测试结果-perfermance_test.txt +++ /dev/null @@ -1,166 +0,0 @@ -============================================================ -Embedding API 性能测试 -============================================================ - -选择测试模式: -1. 顺序测试 (Sequential) -2. 并发测试 (Concurrent) -3. 两种模式都测试 -请输入选择 (1/2/3, 默认1): 3 - -============================================================ -测试模型: bge-m3 -============================================================ - -开始并发测试模型: bge-m3 -测试次数: 1000, 并发数: 10 - 已完成 100/1000 次请求... - 已完成 200/1000 次请求... - 已完成 300/1000 次请求... - 已完成 400/1000 次请求... 
- 已完成 500/1000 次请求... - 已完成 600/1000 次请求... - 已完成 700/1000 次请求... - 已完成 800/1000 次请求... - 已完成 900/1000 次请求... - 已完成 1000/1000 次请求... - -============================================================ -性能测试结果 - bge-m3 -============================================================ -总请求数: 1000 -成功请求: 1000 -失败请求: 0 -成功率: 100.00% -总耗时: 212.9068秒 -平均延迟: 0.2129秒 -最小延迟: 0.0507秒 -最大延迟: 0.6196秒 -P50延迟: 0.0942秒 -P95延迟: 0.5580秒 -P99延迟: 0.5884秒 -标准差: 0.2010秒 -QPS: 4.70 请求/秒 -============================================================ - -开始顺序测试模型: bge-m3 -测试次数: 1000 - 已完成 100/1000 次请求... - 已完成 200/1000 次请求... - 已完成 300/1000 次请求... - 已完成 400/1000 次请求... - 已完成 500/1000 次请求... - 已完成 600/1000 次请求... - 已完成 700/1000 次请求... - 已完成 800/1000 次请求... - 已完成 900/1000 次请求... - -============================================================ -性能测试结果 - bge-m3 -============================================================ -总请求数: 1000 -成功请求: 1000 -失败请求: 0 -成功率: 100.00% -总耗时: 81.7646秒 -平均延迟: 0.0818秒 -最小延迟: 0.0328秒 -最大延迟: 0.5812秒 -P50延迟: 0.0347秒 -P95延迟: 0.4893秒 -P99延迟: 0.5047秒 -标准差: 0.1377秒 -QPS: 12.23 请求/秒 -============================================================ - -============================================================ -测试模型: Qwen3-Embedding-0.6B -============================================================ - -开始并发测试模型: Qwen3-Embedding-0.6B -测试次数: 1000, 并发数: 10 - 已完成 100/1000 次请求... - 已完成 200/1000 次请求... - 已完成 300/1000 次请求... - 已完成 400/1000 次请求... - 已完成 500/1000 次请求... - 已完成 600/1000 次请求... - 已完成 700/1000 次请求... - 已完成 800/1000 次请求... - 已完成 900/1000 次请求... - 已完成 1000/1000 次请求... 
- -============================================================ -性能测试结果 - Qwen3-Embedding-0.6B -============================================================ -总请求数: 1000 -成功请求: 1000 -失败请求: 0 -成功率: 100.00% -总耗时: 210.1917秒 -平均延迟: 0.2102秒 -最小延迟: 0.0651秒 -最大延迟: 0.6659秒 -P50延迟: 0.1123秒 -P95延迟: 0.5845秒 -P99延迟: 0.6210秒 -标准差: 0.1877秒 -QPS: 4.76 请求/秒 -============================================================ - -开始顺序测试模型: Qwen3-Embedding-0.6B -测试次数: 1000 - 已完成 100/1000 次请求... - - 已完成 200/1000 次请求... - 已完成 300/1000 次请求... - 已完成 400/1000 次请求... - 已完成 500/1000 次请求... - 已完成 600/1000 次请求... - 已完成 700/1000 次请求... - 已完成 800/1000 次请求... - 已完成 900/1000 次请求... - -============================================================ -性能测试结果 - Qwen3-Embedding-0.6B -============================================================ -总请求数: 1000 -成功请求: 1000 -失败请求: 0 -成功率: 100.00% -总耗时: 109.9795秒 -平均延迟: 0.1100秒 -最小延迟: 0.0571秒 -最大延迟: 0.5806秒 -P50延迟: 0.0600秒 -P95延迟: 0.5648秒 -P99延迟: 0.5745秒 -标准差: 0.1494秒 -QPS: 9.09 请求/秒 -============================================================ - -结果已保存到: performance_results.json - -============================================================ -性能测试汇总对比 -============================================================ - -模型: bge-m3 (concurrent) - QPS: 4.70 - 平均延迟: 0.2129秒 - 成功率: 100.00% - -模型: bge-m3 (sequential) - QPS: 12.23 - 平均延迟: 0.0818秒 - 成功率: 100.00% - -模型: Qwen3-Embedding-0.6B (concurrent) - QPS: 4.76 - 平均延迟: 0.2102秒 - 成功率: 100.00% - -模型: Qwen3-Embedding-0.6B (sequential) - QPS: 9.09 - 平均延迟: 0.1100秒 - 成功率: 100.00% diff --git a/third-party/xinference/测试结果-perfermance_test_http.txt b/third-party/xinference/测试结果-perfermance_test_http.txt deleted file mode 100644 index 3f63bb4..0000000 --- a/third-party/xinference/测试结果-perfermance_test_http.txt +++ /dev/null @@ -1,167 +0,0 @@ -$ p perfermance_test_http.py -API连接正常,开始性能测试... 
-============================================================ -Embedding API 性能测试 (HTTP) -============================================================ - -选择测试模式: -1. 顺序测试 (Sequential) -2. 并发测试 (Concurrent) -3. 两种模式都测试 -请输入选择 (1/2/3, 默认1): 3 - -============================================================ -测试模型: bge-m3 -============================================================ - -开始并发测试模型: bge-m3 -测试次数: 1000, 并发数: 10 - 已完成 100/1000 次请求... - 已完成 200/1000 次请求... - 已完成 300/1000 次请求... - 已完成 400/1000 次请求... - 已完成 500/1000 次请求... - 已完成 600/1000 次请求... - 已完成 700/1000 次请求... - 已完成 800/1000 次请求... - 已完成 900/1000 次请求... - 已完成 1000/1000 次请求... - -============================================================ -性能测试结果 - bge-m3 -============================================================ -总请求数: 1000 -成功请求: 1000 -失败请求: 0 -成功率: 100.00% -总耗时: 145.1439秒 -平均延迟: 0.1451秒 -最小延迟: 0.0311秒 -最大延迟: 0.5770秒 -P50延迟: 0.0599秒 -P95延迟: 0.5151秒 -P99延迟: 0.5704秒 -标准差: 0.1789秒 -QPS: 6.89 请求/秒 -============================================================ - -开始顺序测试模型: bge-m3 -测试次数: 1000 - 已完成 100/1000 次请求... - 已完成 200/1000 次请求... - 已完成 300/1000 次请求... - 已完成 400/1000 次请求... - 已完成 500/1000 次请求... - 已完成 600/1000 次请求... - 已完成 700/1000 次请求... - 已完成 800/1000 次请求... - 已完成 900/1000 次请求... - -============================================================ -性能测试结果 - bge-m3 -============================================================ -总请求数: 1000 -成功请求: 1000 -失败请求: 0 -成功率: 100.00% -总耗时: 74.5284秒 -平均延迟: 0.0745秒 -最小延迟: 0.0271秒 -最大延迟: 0.5767秒 -P50延迟: 0.0286秒 -P95延迟: 0.4797秒 -P99延迟: 0.5037秒 -标准差: 0.1364秒 -QPS: 13.42 请求/秒 -============================================================ - -============================================================ -测试模型: Qwen3-Embedding-0.6B -============================================================ - -开始并发测试模型: Qwen3-Embedding-0.6B -测试次数: 1000, 并发数: 10 - 已完成 100/1000 次请求... - 已完成 200/1000 次请求... - 已完成 300/1000 次请求... - 已完成 400/1000 次请求... - 已完成 500/1000 次请求... - 已完成 600/1000 次请求... 
- 已完成 700/1000 次请求... - 已完成 800/1000 次请求... - 已完成 900/1000 次请求... - 已完成 1000/1000 次请求... - -============================================================ -性能测试结果 - Qwen3-Embedding-0.6B -============================================================ -总请求数: 1000 -成功请求: 1000 -失败请求: 0 -成功率: 100.00% -总耗时: 195.7997秒 -平均延迟: 0.1958秒 -最小延迟: 0.0564秒 -最大延迟: 0.6201秒 -P50延迟: 0.1050秒 -P95延迟: 0.5674秒 -P99延迟: 0.5994秒 -标准差: 0.1829秒 -QPS: 5.11 请求/秒 -============================================================ - -开始顺序测试模型: Qwen3-Embedding-0.6B -测试次数: 1000 - 已完成 100/1000 次请求... - 已完成 200/1000 次请求... - 已完成 300/1000 次请求... - 已完成 400/1000 次请求... - 已完成 500/1000 次请求... - 已完成 600/1000 次请求... - 已完成 700/1000 次请求... - 已完成 800/1000 次请求... - 已完成 900/1000 次请求... - -============================================================ -性能测试结果 - Qwen3-Embedding-0.6B -============================================================ -总请求数: 1000 -成功请求: 1000 -失败请求: 0 -成功率: 100.00% -总耗时: 100.2533秒 -平均延迟: 0.1003秒 -最小延迟: 0.0513秒 -最大延迟: 0.6249秒 -P50延迟: 0.0539秒 -P95延迟: 0.4993秒 -P99延迟: 0.5180秒 -标准差: 0.1354秒 -QPS: 9.97 请求/秒 -============================================================ - -结果已保存到: performance_results.json - -============================================================ -性能测试汇总对比 -============================================================ - -模型: bge-m3 (concurrent) - QPS: 6.89 - 平均延迟: 0.1451秒 - 成功率: 100.00% - -模型: bge-m3 (sequential) - QPS: 13.42 - 平均延迟: 0.0745秒 - 成功率: 100.00% - -模型: Qwen3-Embedding-0.6B (concurrent) - QPS: 5.11 - 平均延迟: 0.1958秒 - 成功率: 100.00% - -模型: Qwen3-Embedding-0.6B (sequential) - QPS: 9.97 - 平均延迟: 0.1003秒 - 成功率: 100.00% -- libgit2 0.21.2
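Note on the deleted benchmark scripts: both compute `requests_per_second` as `len(latencies) / sum(latencies)`. For the concurrent mode this divides by the sum of overlapping per-request latencies rather than elapsed wall-clock time, which is why the recorded concurrent QPS (e.g. 4.70 for bge-m3) comes out below the sequential QPS (12.23) despite ten workers. A minimal sketch of wall-clock-based throughput measurement; `fake_request` is a hypothetical stand-in for one embedding API call, not part of this repository:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def measure_throughput(request_fn, iterations: int, max_workers: int) -> dict:
    """Run request_fn `iterations` times and derive QPS from wall-clock time."""
    latencies = []
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves result order; each call returns its own latency.
        for latency in executor.map(lambda _: request_fn(), range(iterations)):
            latencies.append(latency)
    wall_clock = time.perf_counter() - start
    return {
        "avg_latency": sum(latencies) / len(latencies),
        # Dividing by wall-clock time credits overlapping requests correctly;
        # dividing by sum(latencies) would undercount concurrency.
        "qps": iterations / wall_clock,
    }


def fake_request() -> float:
    """Hypothetical stand-in for one embedding call; returns its latency."""
    start = time.perf_counter()
    time.sleep(0.01)  # simulate a ~10 ms request
    return time.perf_counter() - start


stats = measure_throughput(fake_request, iterations=50, max_workers=10)
# With 10 workers, wall-clock QPS approaches max_workers / avg_latency,
# well above the 1 / avg_latency that summed-latency division reports.
```

Under this definition the concurrent runs above would report a higher QPS than the sequential runs, matching intuition.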