向量化模块和API说明文档.md 35.7 KB

向量化模块和API说明文档

本文档详细说明saas-search项目中的向量化模块架构、API接口、配置方法和使用指南。

目录

  1. 概述

  2. 向量化服务架构

  3. 本地向量化服务

  4. 云端向量化服务

  5. Embedding API详细说明

  6. 配置说明

  7. 客户端集成示例

  8. 性能对比与优化

  9. 故障排查

  10. 附录


概述

1.1 向量化模块简介

saas-search项目实现了完整的文本和图片向量化能力,支持两种部署方式:

  1. 本地向量化服务:独立部署的微服务,基于本地GPU/CPU运行BGE-M3和CN-CLIP模型
  2. 云端向量化服务:集成阿里云DashScope API,按使用量付费

向量化模块是搜索引擎的核心组件,为语义搜索、图片搜索提供AI驱动的相似度计算能力。

1.2 技术选型

功能 本地服务 云端服务
文本模型 BGE-M3 (Xorbits/bge-m3) text-embedding-v4
图片模型 CN-CLIP (ViT-H-14) -
向量维度 1024 1024
服务框架 FastAPI 阿里云API
部署方式 Docker/本地 云端API

1.3 应用场景

  • 语义搜索:查询文本向量化,与商品向量计算相似度
  • 图片搜索:商品图片向量化,支持以图搜图
  • 混合检索:BM25 + 向量相似度组合排序
  • 多语言搜索:中英文跨语言语义理解

向量化服务架构

2.1 本地向量化服务

┌─────────────────────────────────────────┐
│  Embedding Microservice (FastAPI)       │
│  Port: 6005, Workers: 1                 │
└──────────────┬──────────────────────────┘
               │
       ┌───────┴───────┐
       │               │
┌──────▼──────┐  ┌────▼─────┐
│ BGE-M3      │  │ CN-CLIP  │
│ Text Model  │  │ Image    │
│ (CUDA/CPU)  │  │ Model    │
└─────────────┘  └──────────┘

核心特性

  • 独立部署,可横向扩展
  • GPU加速支持
  • 线程安全设计
  • 启动时预加载模型

2.2 云端向量化服务

┌─────────────────────────────────────┐
│  saas-search Main Service          │
│  (uses CloudTextEncoder)            │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│  Aliyun DashScope API               │
│  text-embedding-v4                  │
│  (HTTP/REST)                        │
└─────────────────────────────────────┘

核心特性

  • 无需GPU资源
  • 按使用量计费
  • 自动扩展
  • 低运维成本

2.3 架构对比

维度 本地服务 云端服务
初始成本 高(GPU服务器) 低(按需付费)
运行成本 固定 变动(按调用量)
延迟 300-400ms
吞吐量 高(~32 qps) 中(~2-3 qps)
离线支持
维护成本
扩展性 手动扩展 自动扩展
适用场景 大规模生产环境 初期开发/小规模应用

本地向量化服务

3.1 服务启动

方式1:使用脚本启动(推荐)

# 启动向量化服务
./scripts/start_embedding_service.sh

脚本特性:

  • 自动激活conda环境
  • 读取配置文件获取端口
  • 单worker模式启动服务

方式2:手动启动

# 激活环境(推荐使用项目根目录 activate.sh;新机器按需 export CONDA_ROOT)
# 例如你的 conda 是 ~/anaconda3/bin/conda,则 export CONDA_ROOT=$HOME/anaconda3
cd /data/saas-search
source activate.sh

# 启动服务
python -m uvicorn embeddings.server:app \
  --host 0.0.0.0 \
  --port 6005 \
  --workers 1

方式3:Docker部署(生产环境)

# 构建镜像
docker build -t searchengine-embedding:latest .

# 启动容器
docker run -d \
  --name embedding-service \
  --gpus all \
  -p 6005:6005 \
  searchengine-embedding:latest

3.2 服务配置

配置文件:embeddings/config.py

class EmbeddingConfig:
    # 服务配置
    HOST = "0.0.0.0"      # 监听地址
    PORT = 6005           # 监听端口

    # 文本模型 (BGE-M3)
    TEXT_MODEL_DIR = "Xorbits/bge-m3"  # 模型路径/HuggingFace ID
    TEXT_DEVICE = "cuda"               # 设备: "cuda" 或 "cpu"
    TEXT_BATCH_SIZE = 32               # 批处理大小

    # 图片模型 (CN-CLIP)
    IMAGE_MODEL_NAME = "ViT-H-14"      # 模型名称
    IMAGE_DEVICE = None                # None=自动, "cuda", "cpu"
    IMAGE_BATCH_SIZE = 8               # 批处理大小

3.3 模型说明

BGE-M3 文本模型

  • 模型ID: Xorbits/bge-m3
  • 向量维度: 1024
  • 支持语言: 中文、英文、多语言(100+)
  • 特性: 强大的语义理解能力,支持长文本
  • 部署: 自动从HuggingFace下载

CN-CLIP 图片模型

  • 模型: ViT-H-14 (Chinese CLIP)
  • 向量维度: 1024
  • 输入: 图片URL或本地路径
  • 特性: 中文图文理解,适合电商场景
  • 预处理: 自动下载、缩放、归一化

云端向量化服务

4.1 阿里云DashScope

服务地址

  • 北京地域:https://dashscope.aliyuncs.com/compatible-mode/v1
  • 新加坡地域:https://dashscope-intl.aliyuncs.com/compatible-mode/v1

模型信息

  • 模型名: text-embedding-v4
  • 向量维度: 1024
  • 输入限制: 单次最多2048个文本,每个文本最大8192 token
  • 速率限制: 根据API套餐不同而不同

4.2 API Key配置

方式1:环境变量(推荐)

# 临时设置
export DASHSCOPE_API_KEY="sk-your-api-key-here"

# 永久设置(添加到 ~/.bashrc 或 ~/.zshrc)
echo 'export DASHSCOPE_API_KEY="sk-your-api-key-here"' >> ~/.bashrc
source ~/.bashrc

方式2:.env文件

在项目根目录创建.env文件:

DASHSCOPE_API_KEY=sk-your-api-key-here

获取API Keyhttps://help.aliyun.com/zh/model-studio/get-api-key

4.3 使用方式

from embeddings.cloud_text_encoder import CloudTextEncoder

# 初始化编码器(自动从环境变量读取API Key)
encoder = CloudTextEncoder()

# 单个文本向量化
text = "衣服的质量杠杠的"
embedding = encoder.encode(text)
print(embedding.shape)  # (1, 1024)

# 批量向量化
texts = ["文本1", "文本2", "文本3"]
embeddings = encoder.encode(texts)
print(embeddings.shape)  # (3, 1024)

# 大批量处理(自动分批)
large_texts = [f"商品 {i}" for i in range(1000)]
embeddings = encoder.encode_batch(large_texts, batch_size=32)

自定义配置

# 使用新加坡地域
encoder = CloudTextEncoder(
    api_key="sk-xxx",
    base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
)

Embedding API详细说明

5.1 API概览

本地向量化服务提供RESTful API接口:

端点 方法 功能
/health GET 健康检查
/embed/text POST 文本向量化
/embed/image POST 图片向量化

服务地址

  • 默认:http://localhost:6005
  • 生产:http://<your-server>:6005

5.2 健康检查接口

GET /health

响应示例

{
  "status": "ok",
  "text_model_loaded": true,
  "image_model_loaded": true
}

字段说明

  • status: 服务状态,"ok"表示正常
  • text_model_loaded: 文本模型是否加载成功
  • image_model_loaded: 图片模型是否加载成功

cURL示例

curl http://localhost:6005/health

5.3 文本向量化接口

POST /embed/text
Content-Type: application/json

请求格式

请求体(JSON数组):

[
  "衣服的质量杠杠的",
  "Bohemian Maxi Dress",
  "Vintage Denim Jacket"
]

参数说明

  • 类型:List[str]
  • 长度:建议≤100(避免超时)
  • 单个文本:建议≤512个字符

响应格式

成功响应(200 OK):

[
  [0.1234, -0.5678, 0.9012, ..., 0.3456],  // 1024维向量
  [0.2345, 0.6789, -0.1234, ..., 0.4567],  // 1024维向量
  [0.3456, -0.7890, 0.2345, ..., 0.5678]   // 1024维向量
]

字段说明

  • 类型:List[List[float]]
  • 每个向量:1024个浮点数
  • 对齐原则:输出数组与输入数组按索引一一对应
  • 失败项:返回null

错误示例

[
  [0.1234, -0.5678, ...],  // 成功
  null,                     // 失败(空文本或其他错误)
  [0.3456, 0.7890, ...]     // 成功
]

cURL示例

# 单个文本
curl -X POST http://localhost:6005/embed/text \
  -H "Content-Type: application/json" \
  -d '["测试查询文本"]'

# 批量文本
curl -X POST http://localhost:6005/embed/text \
  -H "Content-Type: application/json" \
  -d '["红色连衣裙", "blue jeans", "vintage dress"]'

Python示例

import requests
import numpy as np

def embed_texts(texts):
    """文本向量化"""
    response = requests.post(
        "http://localhost:6005/embed/text",
        json=texts,
        timeout=30
    )
    response.raise_for_status()
    embeddings = response.json()

    # 转换为numpy数组
    valid_embeddings = [e for e in embeddings if e is not None]
    return np.array(valid_embeddings)

# 使用
texts = ["红色连衣裙", "blue jeans"]
embeddings = embed_texts(texts)
print(f"Shape: {embeddings.shape}")  # (2, 1024)

# 计算相似度
similarity = np.dot(embeddings[0], embeddings[1])
print(f"Similarity: {similarity}")

5.4 图片向量化接口

POST /embed/image
Content-Type: application/json

请求格式

请求体(JSON数组):

[
  "https://example.com/product1.jpg",
  "https://example.com/product2.png",
  "/local/path/to/product3.jpg"
]

参数说明

  • 类型:List[str]
  • 支持:HTTP URL或本地文件路径
  • 格式:JPG、PNG等常见图片格式
  • 长度:建议≤10(图片处理较慢)

响应格式

成功响应(200 OK):

[
  [0.1234, 0.5678, 0.9012, ..., 0.3456],  // 1024维向量
  null,                                   // 失败(图片无效或下载失败)
  [0.3456, 0.7890, 0.2345, ..., 0.5678]   // 1024维向量
]

特性

  • 自动下载:HTTP URL自动下载图片
  • 逐个处理:串行处理(带锁保证线程安全)
  • 容错:单个失败不影响其他图片

cURL示例

# 单个图片(URL)
curl -X POST http://localhost:6005/embed/image \
  -H "Content-Type: application/json" \
  -d '["https://example.com/product.jpg"]'

# 多个图片(混合URL和本地路径)
curl -X POST http://localhost:6005/embed/image \
  -H "Content-Type: application/json" \
  -d '["https://example.com/img1.jpg", "/data/images/img2.png"]'

Python示例

import requests
import numpy as np

def embed_images(image_urls):
    """图片向量化"""
    response = requests.post(
        "http://localhost:6005/embed/image",
        json=image_urls,
        timeout=120  # 图片处理较慢,设置更长超时
    )
    response.raise_for_status()
    embeddings = response.json()

    # 过滤成功的向量化结果
    valid_embeddings = [(url, emb) for url, emb in zip(image_urls, embeddings) if emb is not None]
    return valid_embeddings

# 使用
image_urls = [
    "https://example.com/dress1.jpg",
    "https://example.com/dress2.jpg"
]

results = embed_images(image_urls)
for url, embedding in results:
    print(f"{url}: {len(embedding)} dimensions")

5.5 错误处理

HTTP状态码

状态码 含义 处理方式
200 成功 正常处理响应
500 服务器错误 检查服务日志
503 服务不可用 模型未加载,检查启动日志

常见错误场景

  1. 模型未加载

    {
    "detail": "Runtime Error: Text model not loaded"
    }
    

    解决:检查服务启动日志,确认模型加载成功

  2. 无效输入

    [null, null]
    

    原因:输入包含空字符串或None

  3. 图片下载失败

    [
    [0.123, ...],
    null  // URL无效或网络问题
    ]
    

    解决:检查URL是否可访问


配置说明

6.1 服务配置

编辑 embeddings/config.py 修改服务配置:

class EmbeddingConfig:
    # ========== 服务配置 ==========
    HOST = "0.0.0.0"    # 监听所有网卡
    PORT = 6005         # 默认端口

生产环境建议

  • 使用反向代理(Nginx)处理SSL
  • 配置防火墙规则限制访问
  • 使用Docker容器隔离

6.2 模型配置

文本模型配置

# ========== BGE-M3 文本模型 ==========
TEXT_MODEL_DIR = "Xorbits/bge-m3"  # HuggingFace模型ID
TEXT_DEVICE = "cuda"               # 设备选择
TEXT_BATCH_SIZE = 32               # 批处理大小

DEVICE选择

  • "cuda": GPU加速(推荐,需要CUDA)
  • "cpu": CPU模式(较慢,但兼容性好)

批处理大小建议

  • GPU(16GB显存):32-64
  • GPU(8GB显存):16-32
  • CPU:8-16

图片模型配置

# ========== CN-CLIP 图片模型 ==========
IMAGE_MODEL_NAME = "ViT-H-14"      # 模型名称
IMAGE_DEVICE = None                # None=自动检测
IMAGE_BATCH_SIZE = 8               # 批处理大小

IMAGE_DEVICE选择

  • None: 自动检测(推荐)
  • "cuda": 强制使用GPU
  • "cpu": 强制使用CPU

6.3 批处理配置

批处理大小调优

场景 文本Batch Size 图片Batch Size 说明
开发测试 16 1 快速响应
生产环境(GPU) 32-64 4-8 平衡性能
生产环境(CPU) 8-16 1-2 避免内存溢出
离线批处理 128+ 16+ 最大化吞吐

批处理建议

  1. 监控GPU内存使用:nvidia-smi
  2. 逐步增加batch_size直到OOM
  3. 预留20%内存余量

客户端集成示例

7.1 Python客户端

基础客户端类

import requests
from typing import List, Optional
import numpy as np

class EmbeddingServiceClient:
    """向量化服务客户端"""

    def __init__(self, base_url: str = "http://localhost:6005"):
        self.base_url = base_url.rstrip('/')
        self.timeout = 30

    def health_check(self) -> dict:
        """健康检查"""
        response = requests.get(f"{self.base_url}/health", timeout=5)
        response.raise_for_status()
        return response.json()

    def embed_text(self, text: str) -> Optional[List[float]]:
        """单个文本向量化"""
        result = self.embed_texts([text])
        return result[0] if result else None

    def embed_texts(self, texts: List[str]) -> List[Optional[List[float]]]:
        """批量文本向量化"""
        if not texts:
            return []

        response = requests.post(
            f"{self.base_url}/embed/text",
            json=texts,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def embed_image(self, image_url: str) -> Optional[List[float]]:
        """单个图片向量化"""
        result = self.embed_images([image_url])
        return result[0] if result else None

    def embed_images(self, image_urls: List[str]) -> List[Optional[List[float]]]:
        """批量图片向量化"""
        if not image_urls:
            return []

        response = requests.post(
            f"{self.base_url}/embed/image",
            json=image_urls,
            timeout=120  # 图片处理需要更长时间
        )
        response.raise_for_status()
        return response.json()

    def embed_texts_to_numpy(self, texts: List[str]) -> Optional[np.ndarray]:
        """批量文本向量化,返回numpy数组"""
        embeddings = self.embed_texts(texts)
        valid_embeddings = [e for e in embeddings if e is not None]
        if not valid_embeddings:
            return None
        return np.array(valid_embeddings, dtype=np.float32)

# 使用示例
if __name__ == "__main__":
    client = EmbeddingServiceClient()

    # 健康检查
    health = client.health_check()
    print(f"Service status: {health}")

    # 文本向量化
    texts = ["红色连衣裙", "blue jeans", "vintage dress"]
    embeddings = client.embed_texts_to_numpy(texts)
    print(f"Embeddings shape: {embeddings.shape}")

    # 计算相似度
    from sklearn.metrics.pairwise import cosine_similarity
    similarities = cosine_similarity(embeddings)
    print(f"Similarity matrix:\n{similarities}")

高级用法:异步客户端

import aiohttp
import asyncio
from typing import List, Optional

class AsyncEmbeddingClient:
    """异步向量化服务客户端"""

    def __init__(self, base_url: str = "http://localhost:6005"):
        self.base_url = base_url.rstrip('/')
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def embed_texts(self, texts: List[str]) -> List[Optional[List[float]]]:
        """异步批量文本向量化"""
        if not texts:
            return []

        if not self.session:
            raise RuntimeError("Client not initialized. Use 'async with'.")

        async with self.session.post(
            f"{self.base_url}/embed/text",
            json=texts,
            timeout=aiohttp.ClientTimeout(total=30)
        ) as response:
            response.raise_for_status()
            return await response.json()

# 使用示例
async def main():
    async with AsyncEmbeddingClient() as client:
        texts = ["text1", "text2", "text3"]
        embeddings = await client.embed_texts(texts)
        print(f"Got {len(embeddings)} embeddings")

asyncio.run(main())

7.2 Java客户端

基础客户端类

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;

public class EmbeddingServiceClient {
    private final HttpClient httpClient;
    private final ObjectMapper objectMapper;
    private final String baseUrl;

    public EmbeddingServiceClient(String baseUrl) {
        this.baseUrl = baseUrl.replaceAll("/$", "");
        this.httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();
        this.objectMapper = new ObjectMapper();
    }

    /**
     * 健康检查
     */
    public HealthStatus healthCheck() throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/health"))
            .timeout(Duration.ofSeconds(5))
            .GET()
            .build();

        HttpResponse<String> response = httpClient.send(
            request,
            HttpResponse.BodyHandlers.ofString()
        );

        JsonNode json = objectMapper.readTree(response.body());
        return new HealthStatus(
            json.get("status").asText(),
            json.get("text_model_loaded").asBoolean(),
            json.get("image_model_loaded").asBoolean()
        );
    }

    /**
     * 批量文本向量化
     */
    public List<float[]> embedTexts(List<String> texts) throws Exception {
        // 构建请求体
        ArrayNode requestBody = objectMapper.createArrayNode();
        for (String text : texts) {
            requestBody.add(text);
        }

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/embed/text"))
            .header("Content-Type", "application/json")
            .timeout(Duration.ofSeconds(30))
            .POST(HttpRequest.BodyPublishers.ofString(
                objectMapper.writeValueAsString(requestBody)
            ))
            .build();

        HttpResponse<String> response = httpClient.send(
            request,
            HttpResponse.BodyHandlers.ofString()
        );

        if (response.statusCode() != 200) {
            throw new RuntimeException("API error: " + response.body());
        }

        // 解析响应
        JsonNode root = objectMapper.readTree(response.body());
        List<float[]> embeddings = new java.util.ArrayList<>();

        for (JsonNode item : root) {
            if (item.isNull()) {
                embeddings.add(null);
            } else {
                float[] vector = objectMapper.treeToValue(item, float[].class);
                embeddings.add(vector);
            }
        }

        return embeddings;
    }

    /**
     * 计算余弦相似度
     */
    public static float cosineSimilarity(float[] v1, float[] v2) {
        if (v1.length != v2.length) {
            throw new IllegalArgumentException("Vectors must be same length");
        }

        float dotProduct = 0.0f;
        float norm1 = 0.0f;
        float norm2 = 0.0f;

        for (int i = 0; i < v1.length; i++) {
            dotProduct += v1[i] * v2[i];
            norm1 += v1[i] * v1[i];
            norm2 += v2[i] * v2[i];
        }

        return (float) (dotProduct / (Math.sqrt(norm1) * Math.sqrt(norm2)));
    }

    // 健康状态数据类
    public static class HealthStatus {
        public final String status;
        public final boolean textModelLoaded;
        public final boolean imageModelLoaded;

        public HealthStatus(String status, boolean textModelLoaded, boolean imageModelLoaded) {
            this.status = status;
            this.textModelLoaded = textModelLoaded;
            this.imageModelLoaded = imageModelLoaded;
        }

        @Override
        public String toString() {
            return String.format("HealthStatus{status='%s', textModelLoaded=%b, imageModelLoaded=%b}",
                status, textModelLoaded, imageModelLoaded);
        }
    }

    // 使用示例
    public static void main(String[] args) throws Exception {
        EmbeddingServiceClient client = new EmbeddingServiceClient("http://localhost:6005");

        // 健康检查
        HealthStatus health = client.healthCheck();
        System.out.println("Health: " + health);

        // 文本向量化
        List<String> texts = List.of("红色连衣裙", "blue jeans", "vintage dress");
        List<float[]> embeddings = client.embedTexts(texts);

        System.out.println("Got " + embeddings.size() + " embeddings");
        for (int i = 0; i < embeddings.size(); i++) {
            System.out.println("Embedding " + i + " dimensions: " +
                (embeddings.get(i) != null ? embeddings.get(i).length : "null"));
        }

        // 计算相似度
        if (embeddings.get(0) != null && embeddings.get(1) != null) {
            float similarity = cosineSimilarity(embeddings.get(0), embeddings.get(1));
            System.out.println("Similarity between text 0 and 1: " + similarity);
        }
    }
}

Maven依赖pom.xml):

<dependencies>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
</dependencies>

7.3 cURL示例

健康检查

curl http://localhost:6005/health

文本向量化

# 单个文本
curl -X POST http://localhost:6005/embed/text \
  -H "Content-Type: application/json" \
  -d '["衣服的质量杠杠的"]' \
  | jq '.[0][0:10]'  # 打印前10维

# 批量文本
curl -X POST http://localhost:6005/embed/text \
  -H "Content-Type: application/json" \
  -d '["红色连衣裙", "blue jeans", "vintage dress"]' \
  | jq '. | length'  # 检查返回数量

图片向量化

# URL图片
curl -X POST http://localhost:6005/embed/image \
  -H "Content-Type: application/json" \
  -d '["https://example.com/product.jpg"]' \
  | jq '.[0][0:5]'

# 本地图片
curl -X POST http://localhost:6005/embed/image \
  -H "Content-Type: application/json" \
  -d '["/data/images/product.jpg"]'

错误处理示例

# 检查服务状态
if ! curl -f http://localhost:6005/health > /dev/null 2>&1; then
    echo "Embedding service is not healthy!"
    exit 1
fi

# 调用API并检查错误
response=$(curl -s -X POST http://localhost:6005/embed/text \
  -H "Content-Type: application/json" \
  -d '["test query"]')

if echo "$response" | jq -e '.[0] == null' > /dev/null; then
    echo "Embedding failed!"
    echo "$response"
    exit 1
fi

echo "Embedding succeeded!"

性能对比与优化

8.1 性能对比

本地服务性能

操作 硬件配置 延迟 吞吐量
文本向量化(单个) GPU (RTX 3090) ~80ms ~12 qps
文本向量化(批量32) GPU (RTX 3090) ~2.5s ~256 qps
文本向量化(单个) CPU (16核) ~500ms ~2 qps
图片向量化(单个) GPU (RTX 3090) ~150ms ~6 qps
图片向量化(批量4) GPU (RTX 3090) ~600ms ~6 qps

云端服务性能

操作 指标
文本向量化(单个) 延迟 300-400ms
文本向量化(批量) 吞吐量 ~2-3 qps
API限制 速率限制 取决于套餐
可用性 SLA 99.9%

8.2 成本对比

本地服务成本

配置 硬件成本(月) 电费(月) 总成本(月)
GPU服务器 (RTX 3090) ¥3000 ¥500 ¥3500
GPU服务器 (A100) ¥8000 ¥800 ¥8800
CPU服务器(16核) ¥800 ¥200 ¥1000

云端服务成本

阿里云DashScope定价(参考):

套餐 价格 调用量 适用场景
按量付费 ¥0.0007/1K tokens 无限制 测试/小规模
基础版 ¥100/月 1M tokens 小规模应用
专业版 ¥500/月 10M tokens 中等规模
企业版 定制 无限制 大规模

成本计算示例

假设每天10万次搜索,每次查询平均10个token:

  • 日调用量:1M tokens
  • 月调用量:30M tokens
  • 月成本:30 × 0.7 = ¥21(按量付费)

8.3 优化建议

本地服务优化

  1. GPU利用率优化

    # 增加批处理大小
    TEXT_BATCH_SIZE = 64  # 从32增加到64
    
  2. 模型量化

    # 使用半精度浮点数(节省显存)
    import torch
    model = model.half()  # FP16
    
  3. 预热模型

    # 服务启动后预热
    @app.on_event("startup")
    async def warmup():
    _text_model.encode(["warmup"], device="cuda")
    
  4. 连接池优化

    # uvicorn配置
    --workers 1 \           # 单worker(GPU模型限制)
    --backlog 2048 \        # 增加连接队列
    --limit-concurrency 32  # 限制并发数
    

云端服务优化

  1. 批量合并

    # 累积多个请求后批量调用
    class BatchEncoder:
    def __init__(self, batch_size=32, timeout=0.1):
        self.batch_size = batch_size
        self.timeout = timeout
        self.queue = []
    
    async def encode(self, text: str):
        # 等待批量积累
        future = asyncio.Future()
        self.queue.append((text, future))
    
        if len(self.queue) >= self.batch_size:
            self._flush()
    
        return await future
    
  2. 本地缓存

    import hashlib
    import pickle
    

class CachedEncoder: def init(self, cache_file="embedding_cache.pkl"): self.cache = self.load_cache(cachefile)

def encode(self, text: str):
    key = hashlib.md5(text.encode()).hexdigest()
    if key in self.cache:
        return self.cache[key]

    embedding = self._call_api(text)
    self.cache[key] = embedding
    return embedding

3. **降级策略**
```python
class HybridEncoder:
    def __init__(self):
        self.cloud_encoder = CloudTextEncoder()
        self.local_encoder = None  # 按需加载

    def encode(self, text: str):
        try:
            return self.cloud_encoder.encode(text)
        except Exception as e:
            logger.warning(f"Cloud API failed: {e}, falling back to local")
            if not self.local_encoder:
                self.local_encoder = BgeEncoder()
            return self.local_encoder.encode(text)

故障排查

9.1 常见问题

问题1:服务无法启动

症状

$ ./scripts/start_embedding_service.sh
Error: Port 6005 already in use

解决

# 检查端口占用
lsof -i :6005

# 杀死占用进程
kill -9 <PID>

# 或者修改配置文件中的端口
# embeddings/config.py: PORT = 6006

问题2:CUDA Out of Memory

症状

RuntimeError: CUDA out of memory. Tried to allocate 2.00 GiB

解决

# 减小批处理大小
TEXT_BATCH_SIZE = 16  # 从32减少到16

# 或者使用CPU模式
TEXT_DEVICE = "cpu"

问题3:模型下载失败

症状

OSError: Can't load tokenizer for 'Xorbits/bge-m3'

解决

# 手动下载模型
huggingface-cli download Xorbits/bge-m3

# 或使用镜像
export HF_ENDPOINT=https://hf-mirror.com

问题4:云端API Key无效

症状

ERROR: DASHSCOPE_API_KEY environment variable is not set!

解决

# 设置环境变量
export DASHSCOPE_API_KEY="sk-your-key"

# 验证
echo $DASHSCOPE_API_KEY

问题5:API速率限制

症状

Rate limit exceeded. Please try again later.

解决

# 添加延迟
import time
for batch in batches:
    embeddings = encoder.encode_batch(batch)
    time.sleep(0.1)  # 每批之间延迟100ms

9.2 日志查看

服务日志

# 查看实时日志
./scripts/start_embedding_service.sh 2>&1 | tee embedding.log

# 或使用systemd(如果配置了服务)
journalctl -u embedding-service -f

Python应用日志

import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

# 使用
logger.info("Encoding texts...")
logger.error("Encoding failed: %s", str(e))

GPU监控

# 实时监控GPU使用
watch -n 1 nvidia-smi

# 查看详细信息
nvidia-smi --query-gpu=timestamp,name,temperature.gpu,utilization.gpu,utilization.memory,memory.total,memory.used,memory.free --format=csv

9.3 性能调优

性能分析

import time
import numpy as np

def benchmark_encoder(encoder, texts, iterations=100):
    """性能基准测试"""
    times = []

    for i in range(iterations):
        start = time.time()
        embeddings = encoder.encode(texts)
        end = time.time()
        times.append(end - start)

    times = np.array(times)
    print(f"Mean: {times.mean():.3f}s")
    print(f"Std:  {times.std():.3f}s")
    print(f"Min:  {times.min():.3f}s")
    print(f"Max:  {times.max():.3f}s")
    print(f"QPS:  {len(texts) / times.mean():.2f}")

# 使用
benchmark_encoder(encoder, texts=["test"] * 32, iterations=100)

内存分析

# Python内存分析
pip install memory_profiler

# 在代码中添加
from memory_profiler import profile

@profile
def encode_batch(texts):
    return encoder.encode(texts)

# 运行
python -m memory_profiler script.py

附录

10.1 向量维度说明

为什么是1024维?

  1. 表达能力:1024维可以捕捉丰富的语义信息
  2. 计算效率:维度适中,计算速度快
  3. 存储平衡:向量大小合理(每个向量约4KB)
  4. 模型选择:BGE-M3和text-embedding-v4都使用1024维

向量存储计算

单个向量大小 = 1024 × 4字节(FP32) = 4KB
100万向量大小 = 4KB × 1,000,000 = 4GB
1000万向量大小 = 4KB × 10,000,000 = 40GB

10.2 模型版本信息

BGE-M3

CN-CLIP

Aliyun text-embedding-v4

10.3 相关文档

项目文档

  • 搜索API对接指南: docs/搜索API对接指南.md
  • 索引字段说明: docs/索引字段说明v2.md
  • 系统设计文档: docs/系统设计文档.md
  • CLAUDE项目指南: CLAUDE.md

外部参考

测试脚本

# 本地向量化服务测试
./scripts/test_embedding_service.sh

# 云端向量化服务测试
python scripts/test_cloud_embedding.py

# 性能基准测试
python scripts/benchmark_embeddings.py

版本历史

版本 日期 变更说明
v1.0 2025-12-23 初始版本,完整的向量化模块文档

联系方式

如有问题或建议,请联系项目维护者。

项目仓库: /data/saas-search

相关文档目录: docs/