向量化模块和API说明文档
本文档详细说明SearchEngine项目中的向量化模块架构、API接口、配置方法和使用指南。
目录
-
- 4.1 阿里云DashScope
- 4.2 API Key配置
- 4.3 使用方式
概述
1.1 向量化模块简介
SearchEngine项目实现了完整的文本和图片向量化能力,支持两种部署方式:
- 本地向量化服务:独立部署的微服务,基于本地GPU/CPU运行BGE-M3和CN-CLIP模型
- 云端向量化服务:集成阿里云DashScope API,按使用量付费
向量化模块是搜索引擎的核心组件,为语义搜索、图片搜索提供AI驱动的相似度计算能力。
1.2 技术选型
| 功能 | 本地服务 | 云端服务 |
|---|---|---|
| 文本模型 | BGE-M3 (Xorbits/bge-m3) | text-embedding-v4 |
| 图片模型 | CN-CLIP (ViT-H-14) | - |
| 向量维度 | 1024 | 1024 |
| 服务框架 | FastAPI | 阿里云API |
| 部署方式 | Docker/本地 | 云端API |
1.3 应用场景
- 语义搜索:查询文本向量化,与商品向量计算相似度
- 图片搜索:商品图片向量化,支持以图搜图
- 混合检索:BM25 + 向量相似度组合排序
- 多语言搜索:中英文跨语言语义理解
向量化服务架构
2.1 本地向量化服务
┌─────────────────────────────────────────┐
│ Embedding Microservice (FastAPI) │
│ Port: 6005, Workers: 1 │
└──────────────┬──────────────────────────┘
│
┌───────┴───────┐
│ │
┌──────▼──────┐ ┌────▼─────┐
│ BGE-M3 │ │ CN-CLIP │
│ Text Model │ │ Image │
│ (CUDA/CPU) │ │ Model │
└─────────────┘ └──────────┘
核心特性:
- 独立部署,可横向扩展
- GPU加速支持
- 线程安全设计
- 启动时预加载模型
2.2 云端向量化服务
┌─────────────────────────────────────┐
│ SearchEngine Main Service │
│ (uses CloudTextEncoder) │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Aliyun DashScope API │
│ text-embedding-v4 │
│ (HTTP/REST) │
└─────────────────────────────────────┘
核心特性:
- 无需GPU资源
- 按使用量计费
- 自动扩展
- 低运维成本
2.3 架构对比
| 维度 | 本地服务 | 云端服务 |
|---|---|---|
| 初始成本 | 高(GPU服务器) | 低(按需付费) |
| 运行成本 | 固定 | 变动(按调用量) |
| 延迟 | 300-400ms | |
| 吞吐量 | 高(~32 qps) | 中(~2-3 qps) |
| 离线支持 | ✅ | ❌ |
| 维护成本 | 高 | 低 |
| 扩展性 | 手动扩展 | 自动扩展 |
| 适用场景 | 大规模生产环境 | 初期开发/小规模应用 |
本地向量化服务
3.1 服务启动
方式1:使用脚本启动(推荐)
# 启动向量化服务
./scripts/start_embedding_service.sh
脚本特性:
- 自动激活conda环境
- 读取配置文件获取端口
- 单worker模式启动服务
方式2:手动启动
# 激活环境
source /home/tw/miniconda3/etc/profile.d/conda.sh
conda activate searchengine
# 启动服务
python -m uvicorn embeddings.server:app \
--host 0.0.0.0 \
--port 6005 \
--workers 1
方式3:Docker部署(生产环境)
# 构建镜像
docker build -t searchengine-embedding:latest .
# 启动容器
docker run -d \
--name embedding-service \
--gpus all \
-p 6005:6005 \
searchengine-embedding:latest
3.2 服务配置
配置文件:embeddings/config.py
class EmbeddingConfig:
# 服务配置
HOST = "0.0.0.0" # 监听地址
PORT = 6005 # 监听端口
# 文本模型 (BGE-M3)
TEXT_MODEL_DIR = "Xorbits/bge-m3" # 模型路径/HuggingFace ID
TEXT_DEVICE = "cuda" # 设备: "cuda" 或 "cpu"
TEXT_BATCH_SIZE = 32 # 批处理大小
# 图片模型 (CN-CLIP)
IMAGE_MODEL_NAME = "ViT-H-14" # 模型名称
IMAGE_DEVICE = None # None=自动, "cuda", "cpu"
IMAGE_BATCH_SIZE = 8 # 批处理大小
3.3 模型说明
BGE-M3 文本模型
- 模型ID:
Xorbits/bge-m3 - 向量维度: 1024
- 支持语言: 中文、英文、多语言(100+)
- 特性: 强大的语义理解能力,支持长文本
- 部署: 自动从HuggingFace下载
CN-CLIP 图片模型
- 模型: ViT-H-14 (Chinese CLIP)
- 向量维度: 1024
- 输入: 图片URL或本地路径
- 特性: 中文图文理解,适合电商场景
- 预处理: 自动下载、缩放、归一化
云端向量化服务
4.1 阿里云DashScope
服务地址:
- 北京地域:
https://dashscope.aliyuncs.com/compatible-mode/v1 - 新加坡地域:
https://dashscope-intl.aliyuncs.com/compatible-mode/v1
模型信息:
- 模型名:
text-embedding-v4 - 向量维度: 1024
- 输入限制: 单次最多2048个文本,每个文本最大8192 token
- 速率限制: 根据API套餐不同而不同
4.2 API Key配置
方式1:环境变量(推荐)
# 临时设置
export DASHSCOPE_API_KEY="sk-your-api-key-here"
# 永久设置(添加到 ~/.bashrc 或 ~/.zshrc)
echo 'export DASHSCOPE_API_KEY="sk-your-api-key-here"' >> ~/.bashrc
source ~/.bashrc
方式2:.env文件
在项目根目录创建.env文件:
DASHSCOPE_API_KEY=sk-your-api-key-here
获取API Key:https://help.aliyun.com/zh/model-studio/get-api-key
4.3 使用方式
from embeddings.cloud_text_encoder import CloudTextEncoder
# 初始化编码器(自动从环境变量读取API Key)
encoder = CloudTextEncoder()
# 单个文本向量化
text = "衣服的质量杠杠的"
embedding = encoder.encode(text)
print(embedding.shape) # (1, 1024)
# 批量向量化
texts = ["文本1", "文本2", "文本3"]
embeddings = encoder.encode(texts)
print(embeddings.shape) # (3, 1024)
# 大批量处理(自动分批)
large_texts = [f"商品 {i}" for i in range(1000)]
embeddings = encoder.encode_batch(large_texts, batch_size=32)
自定义配置:
# 使用新加坡地域
encoder = CloudTextEncoder(
api_key="sk-xxx",
base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
)
Embedding API详细说明
5.1 API概览
本地向量化服务提供RESTful API接口:
| 端点 | 方法 | 功能 |
|---|---|---|
/health |
GET | 健康检查 |
/embed/text |
POST | 文本向量化 |
/embed/image |
POST | 图片向量化 |
服务地址:
- 默认:
http://localhost:6005 - 生产:
http://<your-server>:6005
5.2 健康检查接口
GET /health
响应示例:
{
"status": "ok",
"text_model_loaded": true,
"image_model_loaded": true
}
字段说明:
status: 服务状态,"ok"表示正常text_model_loaded: 文本模型是否加载成功image_model_loaded: 图片模型是否加载成功
cURL示例:
curl http://localhost:6005/health
5.3 文本向量化接口
POST /embed/text
Content-Type: application/json
请求格式
请求体(JSON数组):
[
"衣服的质量杠杠的",
"Bohemian Maxi Dress",
"Vintage Denim Jacket"
]
参数说明:
- 类型:
List[str] - 长度:建议≤100(避免超时)
- 单个文本:建议≤512个字符
响应格式
成功响应(200 OK):
[
[0.1234, -0.5678, 0.9012, ..., 0.3456], // 1024维向量
[0.2345, 0.6789, -0.1234, ..., 0.4567], // 1024维向量
[0.3456, -0.7890, 0.2345, ..., 0.5678] // 1024维向量
]
字段说明:
- 类型:
List[List[float]] - 每个向量:1024个浮点数
- 对齐原则:输出数组与输入数组按索引一一对应
- 失败项:返回
null
错误示例:
[
[0.1234, -0.5678, ...], // 成功
null, // 失败(空文本或其他错误)
[0.3456, 0.7890, ...] // 成功
]
cURL示例
# 单个文本
curl -X POST http://localhost:6005/embed/text \
-H "Content-Type: application/json" \
-d '["测试查询文本"]'
# 批量文本
curl -X POST http://localhost:6005/embed/text \
-H "Content-Type: application/json" \
-d '["红色连衣裙", "blue jeans", "vintage dress"]'
Python示例
import requests
import numpy as np
def embed_texts(texts):
"""文本向量化"""
response = requests.post(
"http://localhost:6005/embed/text",
json=texts,
timeout=30
)
response.raise_for_status()
embeddings = response.json()
# 转换为numpy数组
valid_embeddings = [e for e in embeddings if e is not None]
return np.array(valid_embeddings)
# 使用
texts = ["红色连衣裙", "blue jeans"]
embeddings = embed_texts(texts)
print(f"Shape: {embeddings.shape}") # (2, 1024)
# 计算相似度
similarity = np.dot(embeddings[0], embeddings[1])
print(f"Similarity: {similarity}")
5.4 图片向量化接口
POST /embed/image
Content-Type: application/json
请求格式
请求体(JSON数组):
[
"https://example.com/product1.jpg",
"https://example.com/product2.png",
"/local/path/to/product3.jpg"
]
参数说明:
- 类型:
List[str] - 支持:HTTP URL或本地文件路径
- 格式:JPG、PNG等常见图片格式
- 长度:建议≤10(图片处理较慢)
响应格式
成功响应(200 OK):
[
[0.1234, 0.5678, 0.9012, ..., 0.3456], // 1024维向量
null, // 失败(图片无效或下载失败)
[0.3456, 0.7890, 0.2345, ..., 0.5678] // 1024维向量
]
特性:
- 自动下载:HTTP URL自动下载图片
- 逐个处理:串行处理(带锁保证线程安全)
- 容错:单个失败不影响其他图片
cURL示例
# 单个图片(URL)
curl -X POST http://localhost:6005/embed/image \
-H "Content-Type: application/json" \
-d '["https://example.com/product.jpg"]'
# 多个图片(混合URL和本地路径)
curl -X POST http://localhost:6005/embed/image \
-H "Content-Type: application/json" \
-d '["https://example.com/img1.jpg", "/data/images/img2.png"]'
Python示例
import requests
import numpy as np
def embed_images(image_urls):
"""图片向量化"""
response = requests.post(
"http://localhost:6005/embed/image",
json=image_urls,
timeout=120 # 图片处理较慢,设置更长超时
)
response.raise_for_status()
embeddings = response.json()
# 过滤成功的向量化结果
valid_embeddings = [(url, emb) for url, emb in zip(image_urls, embeddings) if emb is not None]
return valid_embeddings
# 使用
image_urls = [
"https://example.com/dress1.jpg",
"https://example.com/dress2.jpg"
]
results = embed_images(image_urls)
for url, embedding in results:
print(f"{url}: {len(embedding)} dimensions")
5.5 错误处理
HTTP状态码
| 状态码 | 含义 | 处理方式 |
|---|---|---|
| 200 | 成功 | 正常处理响应 |
| 500 | 服务器错误 | 检查服务日志 |
| 503 | 服务不可用 | 模型未加载,检查启动日志 |
常见错误场景
模型未加载
{ "detail": "Runtime Error: Text model not loaded" }解决:检查服务启动日志,确认模型加载成功
无效输入
[null, null]原因:输入包含空字符串或None
图片下载失败
[ [0.123, ...], null // URL无效或网络问题 ]解决:检查URL是否可访问
配置说明
6.1 服务配置
编辑 embeddings/config.py 修改服务配置:
class EmbeddingConfig:
# ========== 服务配置 ==========
HOST = "0.0.0.0" # 监听所有网卡
PORT = 6005 # 默认端口
生产环境建议:
- 使用反向代理(Nginx)处理SSL
- 配置防火墙规则限制访问
- 使用Docker容器隔离
6.2 模型配置
文本模型配置
# ========== BGE-M3 文本模型 ==========
TEXT_MODEL_DIR = "Xorbits/bge-m3" # HuggingFace模型ID
TEXT_DEVICE = "cuda" # 设备选择
TEXT_BATCH_SIZE = 32 # 批处理大小
DEVICE选择:
"cuda": GPU加速(推荐,需要CUDA)"cpu": CPU模式(较慢,但兼容性好)
批处理大小建议:
- GPU(16GB显存):32-64
- GPU(8GB显存):16-32
- CPU:8-16
图片模型配置
# ========== CN-CLIP 图片模型 ==========
IMAGE_MODEL_NAME = "ViT-H-14" # 模型名称
IMAGE_DEVICE = None # None=自动检测
IMAGE_BATCH_SIZE = 8 # 批处理大小
IMAGE_DEVICE选择:
None: 自动检测(推荐)"cuda": 强制使用GPU"cpu": 强制使用CPU
6.3 批处理配置
批处理大小调优:
| 场景 | 文本Batch Size | 图片Batch Size | 说明 |
|---|---|---|---|
| 开发测试 | 16 | 1 | 快速响应 |
| 生产环境(GPU) | 32-64 | 4-8 | 平衡性能 |
| 生产环境(CPU) | 8-16 | 1-2 | 避免内存溢出 |
| 离线批处理 | 128+ | 16+ | 最大化吞吐 |
批处理建议:
- 监控GPU内存使用:
nvidia-smi - 逐步增加batch_size直到OOM
- 预留20%内存余量
客户端集成示例
7.1 Python客户端
基础客户端类
import requests
from typing import List, Optional
import numpy as np
class EmbeddingServiceClient:
"""向量化服务客户端"""
def __init__(self, base_url: str = "http://localhost:6005"):
self.base_url = base_url.rstrip('/')
self.timeout = 30
def health_check(self) -> dict:
"""健康检查"""
response = requests.get(f"{self.base_url}/health", timeout=5)
response.raise_for_status()
return response.json()
def embed_text(self, text: str) -> Optional[List[float]]:
"""单个文本向量化"""
result = self.embed_texts([text])
return result[0] if result else None
def embed_texts(self, texts: List[str]) -> List[Optional[List[float]]]:
"""批量文本向量化"""
if not texts:
return []
response = requests.post(
f"{self.base_url}/embed/text",
json=texts,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
def embed_image(self, image_url: str) -> Optional[List[float]]:
"""单个图片向量化"""
result = self.embed_images([image_url])
return result[0] if result else None
def embed_images(self, image_urls: List[str]) -> List[Optional[List[float]]]:
"""批量图片向量化"""
if not image_urls:
return []
response = requests.post(
f"{self.base_url}/embed/image",
json=image_urls,
timeout=120 # 图片处理需要更长时间
)
response.raise_for_status()
return response.json()
def embed_texts_to_numpy(self, texts: List[str]) -> Optional[np.ndarray]:
"""批量文本向量化,返回numpy数组"""
embeddings = self.embed_texts(texts)
valid_embeddings = [e for e in embeddings if e is not None]
if not valid_embeddings:
return None
return np.array(valid_embeddings, dtype=np.float32)
# 使用示例
if __name__ == "__main__":
client = EmbeddingServiceClient()
# 健康检查
health = client.health_check()
print(f"Service status: {health}")
# 文本向量化
texts = ["红色连衣裙", "blue jeans", "vintage dress"]
embeddings = client.embed_texts_to_numpy(texts)
print(f"Embeddings shape: {embeddings.shape}")
# 计算相似度
from sklearn.metrics.pairwise import cosine_similarity
similarities = cosine_similarity(embeddings)
print(f"Similarity matrix:\n{similarities}")
高级用法:异步客户端
import aiohttp
import asyncio
from typing import List, Optional
class AsyncEmbeddingClient:
"""异步向量化服务客户端"""
def __init__(self, base_url: str = "http://localhost:6005"):
self.base_url = base_url.rstrip('/')
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def embed_texts(self, texts: List[str]) -> List[Optional[List[float]]]:
"""异步批量文本向量化"""
if not texts:
return []
if not self.session:
raise RuntimeError("Client not initialized. Use 'async with'.")
async with self.session.post(
f"{self.base_url}/embed/text",
json=texts,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
response.raise_for_status()
return await response.json()
# 使用示例
async def main():
async with AsyncEmbeddingClient() as client:
texts = ["text1", "text2", "text3"]
embeddings = await client.embed_texts(texts)
print(f"Got {len(embeddings)} embeddings")
asyncio.run(main())
7.2 Java客户端
基础客户端类
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
public class EmbeddingServiceClient {
private final HttpClient httpClient;
private final ObjectMapper objectMapper;
private final String baseUrl;
public EmbeddingServiceClient(String baseUrl) {
this.baseUrl = baseUrl.replaceAll("/$", "");
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();
this.objectMapper = new ObjectMapper();
}
/**
* 健康检查
*/
public HealthStatus healthCheck() throws Exception {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/health"))
.timeout(Duration.ofSeconds(5))
.GET()
.build();
HttpResponse<String> response = httpClient.send(
request,
HttpResponse.BodyHandlers.ofString()
);
JsonNode json = objectMapper.readTree(response.body());
return new HealthStatus(
json.get("status").asText(),
json.get("text_model_loaded").asBoolean(),
json.get("image_model_loaded").asBoolean()
);
}
/**
* 批量文本向量化
*/
public List<float[]> embedTexts(List<String> texts) throws Exception {
// 构建请求体
ArrayNode requestBody = objectMapper.createArrayNode();
for (String text : texts) {
requestBody.add(text);
}
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/embed/text"))
.header("Content-Type", "application/json")
.timeout(Duration.ofSeconds(30))
.POST(HttpRequest.BodyPublishers.ofString(
objectMapper.writeValueAsString(requestBody)
))
.build();
HttpResponse<String> response = httpClient.send(
request,
HttpResponse.BodyHandlers.ofString()
);
if (response.statusCode() != 200) {
throw new RuntimeException("API error: " + response.body());
}
// 解析响应
JsonNode root = objectMapper.readTree(response.body());
List<float[]> embeddings = new java.util.ArrayList<>();
for (JsonNode item : root) {
if (item.isNull()) {
embeddings.add(null);
} else {
float[] vector = objectMapper.treeToValue(item, float[].class);
embeddings.add(vector);
}
}
return embeddings;
}
/**
* 计算余弦相似度
*/
public static float cosineSimilarity(float[] v1, float[] v2) {
if (v1.length != v2.length) {
throw new IllegalArgumentException("Vectors must be same length");
}
float dotProduct = 0.0f;
float norm1 = 0.0f;
float norm2 = 0.0f;
for (int i = 0; i < v1.length; i++) {
dotProduct += v1[i] * v2[i];
norm1 += v1[i] * v1[i];
norm2 += v2[i] * v2[i];
}
return (float) (dotProduct / (Math.sqrt(norm1) * Math.sqrt(norm2)));
}
// 健康状态数据类
public static class HealthStatus {
public final String status;
public final boolean textModelLoaded;
public final boolean imageModelLoaded;
public HealthStatus(String status, boolean textModelLoaded, boolean imageModelLoaded) {
this.status = status;
this.textModelLoaded = textModelLoaded;
this.imageModelLoaded = imageModelLoaded;
}
@Override
public String toString() {
return String.format("HealthStatus{status='%s', textModelLoaded=%b, imageModelLoaded=%b}",
status, textModelLoaded, imageModelLoaded);
}
}
// 使用示例
public static void main(String[] args) throws Exception {
EmbeddingServiceClient client = new EmbeddingServiceClient("http://localhost:6005");
// 健康检查
HealthStatus health = client.healthCheck();
System.out.println("Health: " + health);
// 文本向量化
List<String> texts = List.of("红色连衣裙", "blue jeans", "vintage dress");
List<float[]> embeddings = client.embedTexts(texts);
System.out.println("Got " + embeddings.size() + " embeddings");
for (int i = 0; i < embeddings.size(); i++) {
System.out.println("Embedding " + i + " dimensions: " +
(embeddings.get(i) != null ? embeddings.get(i).length : "null"));
}
// 计算相似度
if (embeddings.get(0) != null && embeddings.get(1) != null) {
float similarity = cosineSimilarity(embeddings.get(0), embeddings.get(1));
System.out.println("Similarity between text 0 and 1: " + similarity);
}
}
}
Maven依赖(pom.xml):
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
7.3 cURL示例
健康检查
curl http://localhost:6005/health
文本向量化
# 单个文本
curl -X POST http://localhost:6005/embed/text \
-H "Content-Type: application/json" \
-d '["衣服的质量杠杠的"]' \
| jq '.[0][0:10]' # 打印前10维
# 批量文本
curl -X POST http://localhost:6005/embed/text \
-H "Content-Type: application/json" \
-d '["红色连衣裙", "blue jeans", "vintage dress"]' \
| jq '. | length' # 检查返回数量
图片向量化
# URL图片
curl -X POST http://localhost:6005/embed/image \
-H "Content-Type: application/json" \
-d '["https://example.com/product.jpg"]' \
| jq '.[0][0:5]'
# 本地图片
curl -X POST http://localhost:6005/embed/image \
-H "Content-Type: application/json" \
-d '["/data/images/product.jpg"]'
错误处理示例
# 检查服务状态
if ! curl -f http://localhost:6005/health > /dev/null 2>&1; then
echo "Embedding service is not healthy!"
exit 1
fi
# 调用API并检查错误
response=$(curl -s -X POST http://localhost:6005/embed/text \
-H "Content-Type: application/json" \
-d '["test query"]')
if echo "$response" | jq -e '.[0] == null' > /dev/null; then
echo "Embedding failed!"
echo "$response"
exit 1
fi
echo "Embedding succeeded!"
性能对比与优化
8.1 性能对比
本地服务性能
| 操作 | 硬件配置 | 延迟 | 吞吐量 |
|---|---|---|---|
| 文本向量化(单个) | GPU (RTX 3090) | ~80ms | ~12 qps |
| 文本向量化(批量32) | GPU (RTX 3090) | ~2.5s | ~256 qps |
| 文本向量化(单个) | CPU (16核) | ~500ms | ~2 qps |
| 图片向量化(单个) | GPU (RTX 3090) | ~150ms | ~6 qps |
| 图片向量化(批量4) | GPU (RTX 3090) | ~600ms | ~6 qps |
云端服务性能
| 操作 | 指标 | 值 |
|---|---|---|
| 文本向量化(单个) | 延迟 | 300-400ms |
| 文本向量化(批量) | 吞吐量 | ~2-3 qps |
| API限制 | 速率限制 | 取决于套餐 |
| 可用性 | SLA | 99.9% |
8.2 成本对比
本地服务成本
| 配置 | 硬件成本(月) | 电费(月) | 总成本(月) |
|---|---|---|---|
| GPU服务器 (RTX 3090) | ¥3000 | ¥500 | ¥3500 |
| GPU服务器 (A100) | ¥8000 | ¥800 | ¥8800 |
| CPU服务器(16核) | ¥800 | ¥200 | ¥1000 |
云端服务成本
阿里云DashScope定价(参考):
| 套餐 | 价格 | 调用量 | 适用场景 |
|---|---|---|---|
| 按量付费 | ¥0.0007/1K tokens | 无限制 | 测试/小规模 |
| 基础版 | ¥100/月 | 1M tokens | 小规模应用 |
| 专业版 | ¥500/月 | 10M tokens | 中等规模 |
| 企业版 | 定制 | 无限制 | 大规模 |
成本计算示例:
假设每天10万次搜索,每次查询平均10个token:
- 日调用量:1M tokens
- 月调用量:30M tokens
- 月成本:30 × 0.7 = ¥21(按量付费)
8.3 优化建议
本地服务优化
GPU利用率优化
# 增加批处理大小 TEXT_BATCH_SIZE = 64 # 从32增加到64模型量化
# 使用半精度浮点数(节省显存) import torch model = model.half() # FP16预热模型
# 服务启动后预热 @app.on_event("startup") async def warmup(): _text_model.encode(["warmup"], device="cuda")连接池优化
# uvicorn配置 --workers 1 \ # 单worker(GPU模型限制) --backlog 2048 \ # 增加连接队列 --limit-concurrency 32 # 限制并发数
云端服务优化
批量合并
# 累积多个请求后批量调用 class BatchEncoder: def __init__(self, batch_size=32, timeout=0.1): self.batch_size = batch_size self.timeout = timeout self.queue = [] async def encode(self, text: str): # 等待批量积累 future = asyncio.Future() self.queue.append((text, future)) if len(self.queue) >= self.batch_size: self._flush() return await future本地缓存
import hashlib import pickle
class CachedEncoder: def init(self, cache_file="embedding_cache.pkl"): self.cache = self.load_cache(cachefile)
def encode(self, text: str):
key = hashlib.md5(text.encode()).hexdigest()
if key in self.cache:
return self.cache[key]
embedding = self._call_api(text)
self.cache[key] = embedding
return embedding
3. **降级策略**
```python
class HybridEncoder:
def __init__(self):
self.cloud_encoder = CloudTextEncoder()
self.local_encoder = None # 按需加载
def encode(self, text: str):
try:
return self.cloud_encoder.encode(text)
except Exception as e:
logger.warning(f"Cloud API failed: {e}, falling back to local")
if not self.local_encoder:
self.local_encoder = BgeEncoder()
return self.local_encoder.encode(text)
故障排查
9.1 常见问题
问题1:服务无法启动
症状:
$ ./scripts/start_embedding_service.sh
Error: Port 6005 already in use
解决:
# 检查端口占用
lsof -i :6005
# 杀死占用进程
kill -9 <PID>
# 或者修改配置文件中的端口
# embeddings/config.py: PORT = 6006
问题2:CUDA Out of Memory
症状:
RuntimeError: CUDA out of memory. Tried to allocate 2.00 GiB
解决:
# 减小批处理大小
TEXT_BATCH_SIZE = 16 # 从32减少到16
# 或者使用CPU模式
TEXT_DEVICE = "cpu"
问题3:模型下载失败
症状:
OSError: Can't load tokenizer for 'Xorbits/bge-m3'
解决:
# 手动下载模型
huggingface-cli download Xorbits/bge-m3
# 或使用镜像
export HF_ENDPOINT=https://hf-mirror.com
问题4:云端API Key无效
症状:
ERROR: DASHSCOPE_API_KEY environment variable is not set!
解决:
# 设置环境变量
export DASHSCOPE_API_KEY="sk-your-key"
# 验证
echo $DASHSCOPE_API_KEY
问题5:API速率限制
症状:
Rate limit exceeded. Please try again later.
解决:
# 添加延迟
import time
for batch in batches:
embeddings = encoder.encode_batch(batch)
time.sleep(0.1) # 每批之间延迟100ms
9.2 日志查看
服务日志
# 查看实时日志
./scripts/start_embedding_service.sh 2>&1 | tee embedding.log
# 或使用systemd(如果配置了服务)
journalctl -u embedding-service -f
Python应用日志
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 使用
logger.info("Encoding texts...")
logger.error("Encoding failed: %s", str(e))
GPU监控
# 实时监控GPU使用
watch -n 1 nvidia-smi
# 查看详细信息
nvidia-smi --query-gpu=timestamp,name,temperature.gpu,utilization.gpu,utilization.memory,memory.total,memory.used,memory.free --format=csv
9.3 性能调优
性能分析
import time
import numpy as np
def benchmark_encoder(encoder, texts, iterations=100):
"""性能基准测试"""
times = []
for i in range(iterations):
start = time.time()
embeddings = encoder.encode(texts)
end = time.time()
times.append(end - start)
times = np.array(times)
print(f"Mean: {times.mean():.3f}s")
print(f"Std: {times.std():.3f}s")
print(f"Min: {times.min():.3f}s")
print(f"Max: {times.max():.3f}s")
print(f"QPS: {len(texts) / times.mean():.2f}")
# 使用
benchmark_encoder(encoder, texts=["test"] * 32, iterations=100)
内存分析
# Python内存分析
pip install memory_profiler
# 在代码中添加
from memory_profiler import profile
@profile
def encode_batch(texts):
return encoder.encode(texts)
# 运行
python -m memory_profiler script.py
附录
10.1 向量维度说明
为什么是1024维?
- 表达能力:1024维可以捕捉丰富的语义信息
- 计算效率:维度适中,计算速度快
- 存储平衡:向量大小合理(每个向量约4KB)
- 模型选择:BGE-M3和text-embedding-v4都使用1024维
向量存储计算
单个向量大小 = 1024 × 4字节(FP32) = 4KB
100万向量大小 = 4KB × 1,000,000 = 4GB
1000万向量大小 = 4KB × 10,000,000 = 40GB
10.2 模型版本信息
BGE-M3
- HuggingFace ID:
Xorbits/bge-m3 - 论文: BGE-M3: Multi-Functionality, Multi-Linguality, Multi-Granularity Text Embeddings Through Self-Knowledge Distillation
- GitHub: https://github.com/FlagOpen/FlagEmbedding
- 特性:
- 支持100+种语言
- 最大支持8192 token长度
- 丰富的语义表达能力
CN-CLIP
- 模型: ViT-H-14
- 论文: Chinese CLIP: Contrastive Language-Image Pretraining in Chinese
- GitHub: https://github.com/OFA-Sys/Chinese-CLIP
- 特性:
- 中文图文理解
- 支持图片检索和文本检索
- 适合电商场景
Aliyun text-embedding-v4
- 提供商: 阿里云DashScope
- 文档: https://help.aliyun.com/zh/model-studio/getting-started/models
- 特性:
- 云端API,无需部署
- 高可用性(99.9% SLA)
- 自动扩展
10.3 相关文档
项目文档
- 搜索API对接指南:
docs/搜索API对接指南.md - 索引字段说明:
docs/索引字段说明v2.md - 系统设计文档:
docs/系统设计文档.md - CLAUDE项目指南:
CLAUDE.md
外部参考
- BGE-M3官方文档: https://github.com/FlagOpen/FlagEmbedding/tree/master/BGE_M3
- 阿里云DashScope: https://help.aliyun.com/zh/model-studio/
- Elasticsearch向量搜索: https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html
- FastAPI文档: https://fastapi.tiangolo.com/
测试脚本
# 本地向量化服务测试
./scripts/test_embedding_service.sh
# 云端向量化服务测试
python scripts/test_cloud_embedding.py
# 性能基准测试
python scripts/benchmark_embeddings.py
版本历史
| 版本 | 日期 | 变更说明 |
|---|---|---|
| v1.0 | 2025-12-23 | 初始版本,完整的向量化模块文档 |
联系方式
如有问题或建议,请联系项目维护者。
项目仓库: /data/tw/SearchEngine
相关文档目录: docs/