diff --git a/.env.example b/.env.example index a7a15bd..f5db7c2 100644 --- a/.env.example +++ b/.env.example @@ -27,6 +27,9 @@ RERANKER_PORT=6007 EMBEDDING_SERVICE_URL=http://127.0.0.1:6005 TRANSLATION_SERVICE_URL=http://127.0.0.1:6006 RERANKER_SERVICE_URL=http://127.0.0.1:6007/rerank +TRANSLATION_PROVIDER=direct +TRANSLATION_MODEL=qwen +RERANK_PROVIDER=http # Optional startup switches (run.sh / scripts/service_ctl.sh) START_EMBEDDING=0 diff --git a/README.md b/README.md index ec760ba..00fa986 100644 --- a/README.md +++ b/README.md @@ -68,32 +68,6 @@ query anchor -对外: -embedding服务: - curl -X POST http://43.166.252.75:6005/embed/text \ - -H "Content-Type: application/json" \ - -d '["衣服", "Bohemian Maxi Dress"]' - - -翻译服务: -# 方式1:直接运行 -python api/translator_app.py -# 方式2:使用 uvicorn -uvicorn api.translator_app:app --host 0.0.0.0 --port 6006 --reload - -curl -X POST http://localhost:6006/translate -H "Content-Type: application/json" -d '{ - "text": "商品名称", - "target_lang": "en", - "source_lang": "zh" - }' - -localhost替换为 -服务器内网地址: -10.0.163.168 -公网地址: -43.166.252.75 - - # 电商搜索引擎 SaaS 一个针对跨境独立站(店匠 Shoplazza 等)的多租户可配置搜索平台。README 作为项目导航入口,帮助你在不同阶段定位到更详细的文档。 @@ -149,18 +123,16 @@ python scripts/recreate_and_import.py \ - **可配置化**:字段/索引域/排序表达式/查询改写全部配置驱动 - **脚本化流水线**:Mock/CSV 数据 → MySQL → Elasticsearch → API/前端 -## 快速上手(概览) +## 新人入口 -| 步骤 | 去哪里看 | 摘要 | -|------|---------|------| -| 1. 准备环境 | `docs/环境配置说明.md` / `Usage-Guide.md` | Conda、`activate.sh`、依赖、ES/MySQL、`.env` | -| 2. 构造测试数据 | `测试数据指南.md` | Tenant1 Mock、Tenant2 CSV、`mock_data.sh` | -| 3. 启动与验证 | `Usage-Guide.md` | `run.sh` 一键启动、分步脚本、日志与健康检查 | -| 4. 理解架构 | `系统设计文档.md` | 数据流、配置系统、查询/搜索/索引模块 | -| 5. 接入搜索 API | `搜索API对接指南.md` / `搜索API速查表.md` | REST 端点、参数、响应、最佳实践 | -| 6. 查字段定义 | `索引字段说明.md` | `search_products` 映射、字段来源、类型与用途 | +**→ `docs/QUICKSTART.md`**:环境、服务、模块、请求示例一页搞定。 -> README 仅保留最常用命令的“索引”。细节以主题文档为准。 +| 步骤 | 文档 | +|------|------| +| 1. 环境与启动 | `docs/QUICKSTART.md` | +| 2. 
搜索/索引 API | `docs/QUICKSTART.md` §3、`docs/搜索API速查表.md` | +| 3. 运维与故障 | `docs/Usage-Guide.md` | +| 4. 架构与扩展 | `docs/PROVIDER_ARCHITECTURE.md`、`docs/系统设计文档.md` | ### Runtimes & 命令示例 @@ -196,28 +168,17 @@ curl -X POST http://localhost:6002/search/ \ -d '{"query": "玩具", "size": 10}' ``` -## 文档地图 - -| 文档 | 内容提要 | 适用场景 | -|------|----------|----------| -| `docs/环境配置说明.md` | 系统要求、`activate.sh`、Conda/依赖、外部服务、CONDA_ROOT | 首次部署、新机器环境 | -| `docs/SERVICE_MATRIX.md` | 服务分层、端口、统一启动/停止入口 | 运维值守、联调启动 | -| `Usage-Guide.md` | 环境准备、服务启动、配置、日志、验证手册 | 日常运维、调试 | -| `基础配置指南.md` | 统一硬编码配置说明、索引结构、查询配置 | 了解系统配置、修改配置 | -| `测试数据指南.md` | 两个租户的模拟/CSV 数据构造 & MySQL→ES 流程 | 数据准备、联调 | -| `测试Pipeline说明.md` | 测试流水线、CI 脚本、上下文说明 | 自动化测试、追踪流水线 | -| `系统设计文档.md` | 架构、配置系统、索引/查询/排序模块细节 | 研发/扩展功能 | -| `索引字段说明v2.md` | `search_products` 字段、类型、来源、嵌套结构 | 新增字段、数据对齐 | -| `搜索API对接指南.md` | REST API(文本/图片/管理)详解、示例、响应格式 | API 使用、测试 | -| `搜索API速查表.md` | 常用请求体、过滤器、分面速查表 | 支持团队快速查阅 | -| `Search-API-Examples.md` | Python/JS/cURL 端到端示例 | 客户工程、SDK 参考 | -| `环境配置说明.md` + `.env` 模板 | 运行依赖账号、端口、密钥对照表 | 交付 & 运维 | - -更多补充材料: - -- `测试数据指南.md`:包含完整工作流脚本示例 -- `商品数据源入ES配置规范.md`:数据源映射约定 -- `MULTILANG_FEATURE.md`:多语言处理细节 +## 文档索引 + +| 文档 | 用途 | +|------|------| +| `docs/QUICKSTART.md` | **新人入口**:环境、服务、模块、请求 | +| `docs/Usage-Guide.md` | 运维:日志、多环境、故障排查 | +| `docs/搜索API速查表.md` | 搜索 API 参数速查 | +| `docs/搜索API对接指南.md` | 搜索 API 完整说明 | +| `docs/PROVIDER_ARCHITECTURE.md` | 翻译/向量/重排 provider 扩展 | +| `docs/环境配置说明.md` | 首次部署、新机器环境 | +| `docs/系统设计文档.md` | 架构与模块细节 | ## 关键工作流指引 diff --git a/config/__init__.py b/config/__init__.py index f162a82..3fcff16 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -21,6 +21,20 @@ from .utils import ( get_match_fields_for_index, get_domain_fields ) +from .service_endpoints import ( + resolve_translation_service_url, + resolve_embedding_service_url, + resolve_reranker_service_url, +) +from .services_config import ( + get_translation_config, + get_embedding_config, + get_rerank_config, + 
get_translation_base_url, + get_embedding_base_url, + get_rerank_service_url, + ServiceConfig, +) __all__ = [ # Main config classes @@ -38,4 +52,14 @@ __all__ = [ 'load_tenant_config', 'get_match_fields_for_index', 'get_domain_fields', + 'resolve_translation_service_url', + 'resolve_embedding_service_url', + 'resolve_reranker_service_url', + 'get_translation_config', + 'get_embedding_config', + 'get_rerank_config', + 'get_translation_base_url', + 'get_embedding_base_url', + 'get_rerank_service_url', + 'ServiceConfig', ] diff --git a/config/config.yaml b/config/config.yaml index b28b801..63857df 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -99,10 +99,10 @@ query_config: chinese_char_limit: 4 english_word_limit: 3 - # 翻译API配置 + # 翻译API配置(provider/URL 在 services.translation) translation_service: "deepl" translation_api_key: null # 通过环境变量设置 - + # 翻译提示词配置(用于提高翻译质量,作为DeepL API的context参数) translation_prompts: # 商品标题翻译提示词 @@ -133,20 +133,57 @@ function_score: boost_mode: "multiply" functions: [] -# 重排配置(唯一实现:外部 BGE 重排服务,由请求参数 enable_rerank 控制是否执行) -# enable_rerank 且 from+size<=rerank_window 时:从 ES 取前 rerank_window 条、重排后再按 from/size 分页 +# 重排配置(provider/URL 在 services.rerank) rerank: rerank_window: 1000 - # service_url: "http://127.0.0.1:6007/rerank" # 可选,不填则用默认端口 6007 - timeout_sec: 15.0 # 文档多时重排耗时长,可按需调大 + timeout_sec: 15.0 weight_es: 0.4 weight_ai: 0.6 - # 模板:用于将搜索请求/文档字段组装成重排服务输入 - # - rerank_query_template:支持 {query} - # - rerank_doc_template:支持 {title} {brief} {vendor} {description} {category_path} rerank_query_template: "{query}" rerank_doc_template: "{title}" +# 可扩展服务/provider 注册表(单一配置源) +services: + translation: + provider: "direct" # direct | http | google(reserved) + base_url: "http://127.0.0.1:6006" + model: "qwen" + timeout_sec: 10.0 + providers: + direct: + model: "qwen" + http: + base_url: "http://127.0.0.1:6006" + model: "qwen" + timeout_sec: 10.0 + google: + enabled: false + project_id: "" + location: "global" + model: "" + embedding: + 
provider: "http" # http | vllm(reserved) + base_url: "http://127.0.0.1:6005" + providers: + http: + base_url: "http://127.0.0.1:6005" + vllm: + enabled: false + base_url: "" + model: "" + note: "reserved for future vLLM embedding backend" + rerank: + provider: "http" # http | vllm(reserved) + base_url: "http://127.0.0.1:6007" + providers: + http: + base_url: "http://127.0.0.1:6007" + vllm: + enabled: false + base_url: "" + model: "" + note: "reserved for future vLLM reranker backend" + # SPU配置(已启用,使用嵌套skus) spu_config: enabled: true diff --git a/config/config_loader.py b/config/config_loader.py index 47fa38e..d60996a 100644 --- a/config/config_loader.py +++ b/config/config_loader.py @@ -9,7 +9,6 @@ that define how search should be executed (NOT how data should be indexed). """ import yaml -import os from typing import Dict, Any, List, Optional from dataclasses import dataclass, field from pathlib import Path @@ -39,12 +38,12 @@ class QueryConfig: # Query rewrite dictionary (loaded from external file) rewrite_dictionary: Dict[str, str] = field(default_factory=dict) - # Translation settings + # Translation settings (provider/URL in services.translation) translation_service: str = "deepl" translation_api_key: Optional[str] = None translation_glossary_id: Optional[str] = None translation_context: str = "e-commerce product search" - translation_prompts: Dict[str, str] = field(default_factory=dict) # Translation prompts for different use cases + translation_prompts: Dict[str, str] = field(default_factory=dict) # Embedding field names text_embedding_field: Optional[str] = "title_embedding" @@ -88,17 +87,11 @@ class RankingConfig: @dataclass class RerankConfig: - """重排配置(唯一实现:调用外部 BGE 重排服务,由请求参数 enable_rerank 控制是否执行)""" - # 重排窗口:enable_rerank 且 from+size<=rerank_window 时,从 ES 取前 rerank_window 条重排后再分页 + """重排配置(provider/URL 在 services.rerank)""" rerank_window: int = 1000 - # 可选:重排服务 URL,为空时使用 reranker 模块默认端口 6007 - service_url: Optional[str] = None timeout_sec: float = 15.0 
weight_es: float = 0.4 weight_ai: float = 0.6 - # 模板:用于将搜索请求/文档字段组装成重排服务输入 - # - rerank_query_template:支持 {query} - # - rerank_doc_template:支持 {title} {brief} {vendor} {description} {category_path} rerank_query_template: str = "{query}" rerank_doc_template: str = "{title}" @@ -136,6 +129,8 @@ class SearchConfig: # ES settings es_settings: Dict[str, Any] = field(default_factory=dict) + # Extensible service/provider registry (translation/embedding/rerank/...) + services: Dict[str, Any] = field(default_factory=dict) class ConfigurationError(Exception): @@ -231,13 +226,10 @@ class ConfigLoader: # Parse query config query_config_data = config_data.get("query_config", {}) - - # Load rewrite dictionary from external file + services_data = config_data.get("services", {}) if isinstance(config_data.get("services", {}), dict) else {} rewrite_dictionary = self._load_rewrite_dictionary() - - # Parse embedding disable thresholds embedding_thresholds = query_config_data.get("embedding_disable_thresholds", {}) - + query_config = QueryConfig( supported_languages=query_config_data.get("supported_languages") or ["zh", "en"], default_language=query_config_data.get("default_language") or "en", @@ -272,11 +264,10 @@ class ConfigLoader: functions=fs_data.get("functions") or [] ) - # Parse Rerank configuration(唯一实现:外部重排服务,由 enable_rerank 控制) + # Parse Rerank (provider/URL in services.rerank) rerank_data = config_data.get("rerank", {}) rerank = RerankConfig( rerank_window=int(rerank_data.get("rerank_window", 1000)), - service_url=rerank_data.get("service_url") or None, timeout_sec=float(rerank_data.get("timeout_sec", 15.0)), weight_es=float(rerank_data.get("weight_es", 0.4)), weight_ai=float(rerank_data.get("weight_ai", 0.6)), @@ -306,7 +297,8 @@ class ConfigLoader: spu_config=spu_config, tenant_config=tenant_config_data, es_index_name=config_data.get("es_index_name", "search_products"), - es_settings=config_data.get("es_settings", {}) + es_settings=config_data.get("es_settings", {}), + 
services=services_data ) def _parse_index_config(self, index_data: Dict[str, Any]) -> IndexConfig: @@ -374,7 +366,7 @@ class ConfigLoader: f"Default language '{config.query_config.default_language}' " f"not in supported languages: {config.query_config.supported_languages}" ) - + return errors def to_dict(self, config: SearchConfig) -> Dict[str, Any]: @@ -413,7 +405,6 @@ class ConfigLoader: }, "rerank": { "rerank_window": config.rerank.rerank_window, - "service_url": config.rerank.service_url, "timeout_sec": config.rerank.timeout_sec, "weight_es": config.rerank.weight_es, "weight_ai": config.rerank.weight_ai, @@ -425,7 +416,8 @@ class ConfigLoader: "spu_field": config.spu_config.spu_field, "inner_hits_size": config.spu_config.inner_hits_size, "searchable_option_dimensions": config.spu_config.searchable_option_dimensions - } + }, + "services": config.services, } def _index_to_dict(self, index: IndexConfig) -> Dict[str, Any]: diff --git a/config/env_config.py b/config/env_config.py index 3b5b722..2b12323 100644 --- a/config/env_config.py +++ b/config/env_config.py @@ -63,8 +63,11 @@ EMBEDDING_HOST = os.getenv('EMBEDDING_HOST', '127.0.0.1') EMBEDDING_PORT = int(os.getenv('EMBEDDING_PORT', 6005)) TRANSLATION_HOST = os.getenv('TRANSLATION_HOST', '127.0.0.1') TRANSLATION_PORT = int(os.getenv('TRANSLATION_PORT', os.getenv('TRANSLATOR_PORT', 6006))) +TRANSLATION_PROVIDER = os.getenv('TRANSLATION_PROVIDER', 'direct') +TRANSLATION_MODEL = os.getenv('TRANSLATION_MODEL', 'qwen') RERANKER_HOST = os.getenv('RERANKER_HOST', '127.0.0.1') RERANKER_PORT = int(os.getenv('RERANKER_PORT', 6007)) +RERANK_PROVIDER = os.getenv('RERANK_PROVIDER', 'http') # API_BASE_URL: 如果未设置,根据API_HOST构建(0.0.0.0使用localhost) API_BASE_URL = os.getenv('API_BASE_URL') if not API_BASE_URL: diff --git a/config/service_endpoints.py b/config/service_endpoints.py new file mode 100644 index 0000000..c1635d3 --- /dev/null +++ b/config/service_endpoints.py @@ -0,0 +1,12 @@ +""" +Endpoint resolvers - delegate to 
services_config. + +Deprecated: use config.services_config directly. +Kept for backward compatibility. +""" + +from .services_config import ( + get_translation_base_url as resolve_translation_service_url, + get_embedding_base_url as resolve_embedding_service_url, + get_rerank_service_url as resolve_reranker_service_url, +) diff --git a/config/services_config.py b/config/services_config.py new file mode 100644 index 0000000..e4b095f --- /dev/null +++ b/config/services_config.py @@ -0,0 +1,169 @@ +""" +Services configuration - single source for translation, embedding, rerank providers. + +All provider selection and endpoint resolution is centralized here. +Priority: env vars > config.yaml > defaults. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from functools import lru_cache +from pathlib import Path +from typing import Any, Dict, Optional + +import yaml + + +@dataclass +class ServiceConfig: + """Config for one capability (translation/embedding/rerank).""" + provider: str + providers: Dict[str, Any] = field(default_factory=dict) + + def get_provider_cfg(self) -> Dict[str, Any]: + """Get config for current provider.""" + p = (self.provider or "").strip().lower() + return self.providers.get(p, {}) if isinstance(self.providers, dict) else {} + + +def _load_services_raw(config_path: Optional[Path] = None) -> Dict[str, Any]: + """Load services block from config.yaml.""" + if config_path is None: + config_path = Path(__file__).parent / "config.yaml" + path = Path(config_path) + if not path.exists(): + return {} + try: + with open(path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + except Exception: + return {} + services = data.get("services") if isinstance(data, dict) else {} + return services if isinstance(services, dict) else {} + + +def _resolve_translation() -> ServiceConfig: + raw = _load_services_raw() + cfg = raw.get("translation", {}) if isinstance(raw.get("translation"), dict) else {} + providers 
= cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} + + provider = ( + os.getenv("TRANSLATION_PROVIDER") + or cfg.get("provider") + or "direct" + ) + provider = str(provider).strip().lower() + + # Env override for http base_url + env_url = os.getenv("TRANSLATION_SERVICE_URL") + if env_url and provider == "http": + providers = dict(providers) + providers["http"] = dict(providers.get("http", {})) + providers["http"]["base_url"] = env_url.rstrip("/") + + return ServiceConfig(provider=provider, providers=providers) + + +def _resolve_embedding() -> ServiceConfig: + raw = _load_services_raw() + cfg = raw.get("embedding", {}) if isinstance(raw.get("embedding"), dict) else {} + providers = cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} + + provider = ( + os.getenv("EMBEDDING_PROVIDER") + or cfg.get("provider") + or "http" + ) + provider = str(provider).strip().lower() + + env_url = os.getenv("EMBEDDING_SERVICE_URL") + if env_url and provider == "http": + providers = dict(providers) + providers["http"] = dict(providers.get("http", {})) + providers["http"]["base_url"] = env_url.rstrip("/") + + return ServiceConfig(provider=provider, providers=providers) + + +def _resolve_rerank() -> ServiceConfig: + raw = _load_services_raw() + cfg = raw.get("rerank", {}) if isinstance(raw.get("rerank"), dict) else {} + providers = cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} + + provider = ( + os.getenv("RERANK_PROVIDER") + or cfg.get("provider") + or "http" + ) + provider = str(provider).strip().lower() + + env_url = os.getenv("RERANKER_SERVICE_URL") + if env_url: + url = env_url.rstrip("/") + if not url.endswith("/rerank"): + url = f"{url}/rerank" if "/rerank" not in url else url + providers = dict(providers) + providers["http"] = dict(providers.get("http", {})) + providers["http"]["base_url"] = url.replace("/rerank", "") + providers["http"]["service_url"] = url + + return ServiceConfig(provider=provider, 
providers=providers) + + +@lru_cache(maxsize=1) +def get_translation_config() -> ServiceConfig: + """Get translation service config.""" + return _resolve_translation() + + +@lru_cache(maxsize=1) +def get_embedding_config() -> ServiceConfig: + """Get embedding service config.""" + return _resolve_embedding() + + +@lru_cache(maxsize=1) +def get_rerank_config() -> ServiceConfig: + """Get rerank service config.""" + return _resolve_rerank() + + +def get_translation_base_url() -> str: + """Resolve translation HTTP base URL (for http provider).""" + base = ( + os.getenv("TRANSLATION_SERVICE_URL") + or get_translation_config().providers.get("http", {}).get("base_url") + or "http://127.0.0.1:6006" + ) + return str(base).rstrip("/") + + +def get_embedding_base_url() -> str: + """Resolve embedding HTTP base URL.""" + base = ( + os.getenv("EMBEDDING_SERVICE_URL") + or get_embedding_config().providers.get("http", {}).get("base_url") + or "http://127.0.0.1:6005" + ) + return str(base).rstrip("/") + + +def get_rerank_service_url() -> str: + """Resolve rerank service URL (full path including /rerank).""" + base = ( + os.getenv("RERANKER_SERVICE_URL") + or get_rerank_config().providers.get("http", {}).get("service_url") + or get_rerank_config().providers.get("http", {}).get("base_url") + or "http://127.0.0.1:6007" + ) + base = str(base).rstrip("/") + return base if base.endswith("/rerank") else f"{base}/rerank" + + +def clear_services_cache() -> None: + """Clear cached config (for tests).""" + get_translation_config.cache_clear() + get_embedding_config.cache_clear() + get_rerank_config.cache_clear() diff --git a/docs/ARCHITECTURE_EVALUATION.md b/docs/ARCHITECTURE_EVALUATION.md new file mode 100644 index 0000000..977f0bb --- /dev/null +++ b/docs/ARCHITECTURE_EVALUATION.md @@ -0,0 +1,202 @@ +# 能力提供者架构评估与统一改造方案 + +> **已落地**。实现见 `providers/`、`config/services_config.py`。使用与扩展见 `docs/PROVIDER_ARCHITECTURE.md`。 + +--- + +## 一、当前状态梳理 + +### 1.1 两种“可插拔”的辨析 + +| 模式 | 含义 | 当前是否存在 | 
+|------|------|--------------| +| **提供者内部可选择** | 某个 provider(如翻译)内部封装多种实现(如 qwen/deepl),内部切换 | 部分存在:`direct` 的 Translator 内部可选 qwen/deepl | +| **平台级多 provider** | 平台定义能力抽象,多个独立 provider 注册,通过配置切换 | 存在:translation 的 direct/http,rerank 的 http/vllm | + +**结论**:当前是 **平台级可插拔** 为主,但实现不统一、配置分散,造成混乱。 + +### 1.2 三种能力的实现对比 + +| 能力 | 抽象层 | Provider 实现 | 配置来源 | 问题 | +|------|--------|---------------|----------|------| +| **翻译** | `create_translation_client()` | direct, http | `query_config` + `services.translation` | 双重配置源,优先级链复杂 | +| **重排** | `create_rerank_client()` | http, vllm(reserved) | `rerank` 块 + `services.rerank` | 同上 | +| **向量化** | 无 | 仅 HTTP 直连 | `service_endpoints` 读 `services.embedding` | 无 provider 抽象,只有 endpoint 解析 | + +### 1.3 配置分散问题 + +``` +config.yaml 中: +├── query_config.translation_provider +├── query_config.translation_providers +├── query_config.translation_service_url +├── rerank.rerank_provider +├── rerank.rerank_providers +├── rerank.service_url +└── services.{translation,embedding,rerank} # 又一整套 +``` + +config_loader 用冗长的优先级链合并(env > query_config > services > defaults),维护成本高。 + +--- + +## 二、统一架构原则 + +### 2.1 设计目标 + +1. **单一配置源**:每种能力只在一个地方配置 +2. **统一抽象模式**:translation / embedding / rerank 采用相同结构 +3. **平台级可插拔**:能力 = 接口 + 多 provider 实现,通过配置切换 +4. 
**丢弃历史包袱**:移除冗余配置、合并重复逻辑 + +### 2.2 推荐方案:平台级 Provider Registry + +**核心思想**:平台定义“能力”,每种能力有统一接口;多个 provider 实现该接口;配置只在一个地方。 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Platform (Search Engine) │ +├─────────────────────────────────────────────────────────────┤ +│ Capability: Translation Embedding Rerank │ +│ │ │ │ │ │ +│ ▼ ▼ ▼ ▼ │ +│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ +│ │ direct │ │ http │ │ http │ ← Provider 实现 │ +│ │ http │ │ vllm │ │ vllm │ (可扩展) │ +│ │ google │ │ │ │ │ │ +│ └─────────┘ └─────────┘ └─────────┘ │ +│ ▲ ▲ ▲ │ +│ └────────────┴────────────┴── 统一从 services.* 读取 │ +└─────────────────────────────────────────────────────────────┘ +``` + +--- + +## 三、统一改造方案 + +### 3.1 配置结构(单一源) + +**只保留 `services` 块**,移除 query_config / rerank 中的 provider 相关字段: + +```yaml +services: + translation: + provider: "direct" # 当前使用的 provider + providers: + direct: + model: "qwen" + http: + base_url: "http://127.0.0.1:6006" + model: "qwen" + timeout_sec: 10.0 + google: + enabled: false + # ... + + embedding: + provider: "http" + providers: + http: + base_url: "http://127.0.0.1:6005" + vllm: + enabled: false + # ... + + rerank: + provider: "http" + providers: + http: + base_url: "http://127.0.0.1:6007" + service_path: "/rerank" + vllm: + enabled: false + # ... 
+``` + +**环境变量**(部署态覆盖)保持简洁: +- `TRANSLATION_PROVIDER`, `TRANSLATION_SERVICE_URL` +- `EMBEDDING_PROVIDER`, `EMBEDDING_SERVICE_URL` +- `RERANK_PROVIDER`, `RERANKER_SERVICE_URL` + +### 3.2 统一 Provider 创建入口 + +新建 `providers/` 模块,统一工厂: + +```python +# providers/__init__.py +def create_translation_provider(config: ServicesConfig) -> TranslationProvider +def create_embedding_provider(config: ServicesConfig) -> EmbeddingProvider +def create_rerank_provider(config: ServicesConfig) -> RerankProvider +``` + +每个 factory 从 `config.services["translation"]` 等读取,不再从 query_config / rerank 块读取。 + +### 3.3 能力接口(Protocol) + +```python +# providers/base.py +class TranslationProvider(Protocol): + def translate(self, text, target_lang, ...) -> Optional[str]: ... + def translate_for_indexing(self, ...) -> Dict[str, Optional[str]]: ... + +class EmbeddingProvider(Protocol): + def encode_text(self, texts: List[str]) -> np.ndarray: ... + def encode_image(self, url: str) -> Optional[np.ndarray]: ... + +class RerankProvider(Protocol): + def rerank(self, query: str, docs: List[str], timeout: float) -> Tuple[Optional[List[float]], ...]: ... +``` + +### 3.4 迁移步骤 + +| 步骤 | 内容 | +|------|------| +| 1 | 新建 `config/services_config.py`,定义 `ServicesConfig`,只从 `services` 块加载 | +| 2 | 新建 `providers/` 目录,实现 `create_*_provider()`,迁移 translation/rerank 逻辑 | +| 3 | 为 embedding 增加 provider 抽象(HttpEmbeddingProvider),封装 BgeEncoder/CLIPImageEncoder 的 HTTP 调用 | +| 4 | 从 `query_config` 移除 translation_provider/providers/service_url 等 | +| 5 | 从 `rerank` 块移除 rerank_provider/providers/service_url 等 | +| 6 | 精简 `config_loader.py`,删除冗长的 provider 合并逻辑 | +| 7 | 更新 `config.yaml`,删除重复配置 | +| 8 | 调用方改为使用 `create_*_provider(services_config)` | + +--- + +## 四、回答核心问题 + +### Q1: 可插拔是提供者内部可选择,还是平台多 provider? + +**答**:采用 **平台级多 provider**。每种能力(translation/embedding/rerank)在平台层定义接口,多个独立 provider 实现该接口,通过配置切换。提供者内部(如 direct 的 qwen/deepl)可作为该 provider 的子选项,但不作为平台级扩展点。 + +### Q2: 现在是两者都有吗? 
+ +**答**:之前是混合状态——配置分散、三种能力实现不一致。改造后 **只保留平台级可插拔**,结构统一。 + +### Q3: 如何减少混乱、架构清晰? + +**答**: +1. **单一配置源**:`services.{translation,embedding,rerank}` +2. **统一模式**:每种能力 = Protocol + factory + 多 provider 实现 +3. **丢弃冗余**:删除 query_config/rerank 中的 provider 配置,删除 service_endpoints 中的重复解析逻辑 + +--- + +## 五、改造后的目录结构 + +``` +providers/ +├── __init__.py # create_*_provider 导出 +├── base.py # Protocol 定义 +├── translation/ +│ ├── direct.py # 进程内 Translator +│ ├── http.py # HttpTranslationClient +│ └── ... +├── embedding/ +│ ├── http.py # HttpEmbeddingProvider (封装 BgeEncoder/CLIP 的 HTTP) +│ └── vllm.py # reserved +└── rerank/ + ├── http.py # HttpRerankClient + └── vllm.py # reserved +``` + +调用方(query_parser, searcher, indexer)只依赖 `providers.create_*_provider(services_config)`,不关心具体实现。 diff --git a/docs/CNCLIP_SERVICE说明文档.md b/docs/CNCLIP_SERVICE说明文档.md index 208f4eb..6fa2d4e 100644 --- a/docs/CNCLIP_SERVICE说明文档.md +++ b/docs/CNCLIP_SERVICE说明文档.md @@ -1,5 +1,10 @@ +# CN-CLIP 服务(Legacy) -# TODO +> **注意**:当前主流程使用 embedding 服务(端口 6005),见 `docs/QUICKSTART.md` 3.3。本文档为 legacy gRPC 服务说明。 + +--- + +# TODO(历史) 现在,跟自己 cn_clip 预估的结果,有差别: 这个比较接近: 可能是预处理逻辑有些不一样。 diff --git a/docs/PROVIDER_ARCHITECTURE.md b/docs/PROVIDER_ARCHITECTURE.md new file mode 100644 index 0000000..95a1a2e --- /dev/null +++ b/docs/PROVIDER_ARCHITECTURE.md @@ -0,0 +1,61 @@ +# Provider 架构与扩展指南 + +本文档说明如何统一管理翻译、向量化、重排等“能力提供者(provider)”。 + +## 1. 设计目标 + +- **调用方稳定**:业务代码不关心具体供应商,只调用统一接口。 +- **配置可切换**:通过配置切换 provider,不改业务代码。 +- **单一配置源**:所有 provider 配置在 `config/config.yaml` 的 `services` 块。 + +## 2. 
当前落地状态 + +### 2.1 统一入口 + +- **模块**：`providers/` +- **工厂**：`create_translation_provider()`, `create_rerank_provider()`, `create_embedding_provider()` +- **配置**：`config/services_config.py` 从 `services` 块加载，env 可覆盖 + +### 2.2 翻译 + +- `providers/translation.py`：`direct`（进程内 Translator）、`http`（HTTP 服务） +- 调用方：`query/query_parser.py`, `indexer/indexing_utils.py` + +### 2.3 重排 + +- `providers/rerank.py`：`http`（vllm 预留） +- 调用方：`search/rerank_client.py` → `run_rerank()` + +### 2.4 向量化 + +- `providers/embedding.py`：`http`（vllm 预留） +- 封装 `BgeEncoder` / `CLIPImageEncoder`，URL 来自 `services_config` + +## 3. 配置 + +**单一配置源**：`config/config.yaml` 的 `services` 块。 + +```yaml +services: + translation: + provider: "direct" # direct | http + providers: + direct: { model: "qwen" } + http: { base_url: "http://127.0.0.1:6006", model: "qwen", timeout_sec: 10.0 } + embedding: + provider: "http" + providers: + http: { base_url: "http://127.0.0.1:6005" } + rerank: + provider: "http" + providers: + http: { base_url: "http://127.0.0.1:6007" } +``` + +**环境变量**（部署态覆盖）：`TRANSLATION_PROVIDER`, `TRANSLATION_SERVICE_URL`, `EMBEDDING_SERVICE_URL`, `RERANKER_SERVICE_URL` + +## 4. 新增 provider + +1. 在 `providers/<capability>.py` 中实现新 provider 类 +2. 在 `create_*_provider()` 中注册分支 +3. 在 `config/config.yaml` 的 `services.<capability>.providers` 中补充参数 diff --git a/docs/QUICKSTART.md b/docs/QUICKSTART.md new file mode 100644 index 0000000..ebe7030 --- /dev/null +++ b/docs/QUICKSTART.md @@ -0,0 +1,122 @@ +# 开发者快速上手 + +新人入口文档：环境、服务、模块、请求示例一页搞定。 + +## 1. 环境 + +```bash +source activate.sh +# 首次：./scripts/create_venv.sh 或 conda env create -f environment.yml +``` + +依赖：Python 3.8+、Elasticsearch 8.x、MySQL、Redis（可选）。详见 `docs/环境配置说明.md`。 + +## 2. 
服务与端口 + +| 服务 | 端口 | 默认启动 | 说明 | +|------|-----:|:--------:|------| +| backend | 6002 | ✓ | 搜索 API | +| indexer | 6004 | ✓ | 索引 API | +| frontend | 6003 | ✓ | 调试 UI | +| embedding | 6005 | - | 向量服务 | +| translator | 6006 | - | 翻译服务 | +| reranker | 6007 | - | 重排服务 | + +```bash +./run.sh +# 全功能:START_EMBEDDING=1 START_TRANSLATOR=1 START_RERANKER=1 ./run.sh +./scripts/service_ctl.sh status +./scripts/stop.sh +``` + +## 3. 模块与请求 + +### 3.1 搜索 API(backend 6002) + +```bash +# 文本搜索 +curl -X POST http://localhost:6002/search/ \ + -H "Content-Type: application/json" \ + -H "X-Tenant-ID: 162" \ + -d '{"query": "玩具", "size": 10}' + +# 图片搜索 +curl -X POST http://localhost:6002/search/image \ + -H "Content-Type: application/json" \ + -H "X-Tenant-ID: 162" \ + -d '{"image_url": "https://example.com/img.jpg", "size": 10}' + +# 建议 +curl "http://localhost:6002/search/suggestions?q=玩&size=5" -H "X-Tenant-ID: 162" +``` + +API 文档:http://localhost:6002/docs + +### 3.2 索引 API(indexer 6004) + +```bash +# 创建租户索引 +./scripts/create_tenant_index.sh 162 + +# 全量索引 +curl -X POST http://localhost:6004/indexer/reindex \ + -H "Content-Type: application/json" \ + -d '{"tenant_id": "162", "batch_size": 500}' + +# 构建文档(不写 ES,供上游调用) +curl -X POST http://localhost:6004/indexer/build-docs \ + -H "Content-Type: application/json" \ + -d '{"tenant_id": "162", "items": [{"spu": {...}, "skus": [...], "options": [...]}]}' +``` + +### 3.3 向量服务(embedding 6005) + +```bash +./scripts/start_embedding_service.sh + +# 文本向量 +curl -X POST http://localhost:6005/embed/text \ + -H "Content-Type: application/json" \ + -d '["衣服", "Bohemian Maxi Dress"]' + +# 图片向量(URL 列表) +curl -X POST http://localhost:6005/embed/image \ + -H "Content-Type: application/json" \ + -d '["https://example.com/img.jpg"]' +``` + +### 3.4 翻译服务(translator 6006) + +```bash +./scripts/start_translator.sh + +curl -X POST http://localhost:6006/translate \ + -H "Content-Type: application/json" \ + -d '{"text": "商品名称", "target_lang": "en", "source_lang": 
"zh"}' +``` + +### 3.5 重排服务(reranker 6007) + +```bash +./scripts/start_reranker.sh + +curl -X POST http://localhost:6007/rerank \ + -H "Content-Type: application/json" \ + -d '{"query": "wireless mouse", "docs": ["logitech mx master", "usb cable"]}' +``` + +## 4. 配置 + +- **主配置**:`config/config.yaml`(搜索行为、字段权重、分面等) +- **服务 provider**:`config/config.yaml` 的 `services` 块(翻译/向量/重排的 provider 与 URL) +- **环境变量**:`.env`(DB、ES、Redis、API Key 等) + +## 5. 延伸阅读 + +| 文档 | 用途 | +|------|------| +| `docs/Usage-Guide.md` | 运维:日志、多环境、故障排查 | +| `docs/搜索API速查表.md` | 搜索 API 参数速查 | +| `docs/搜索API对接指南.md` | 搜索 API 完整说明 | +| `docs/PROVIDER_ARCHITECTURE.md` | 翻译/向量/重排 provider 扩展 | +| `indexer/README.md` | 索引模块职责与接口 | diff --git a/docs/SERVICE_MATRIX.md b/docs/SERVICE_MATRIX.md deleted file mode 100644 index a634298..0000000 --- a/docs/SERVICE_MATRIX.md +++ /dev/null @@ -1,57 +0,0 @@ -# 服务矩阵(Service Matrix) - -本文档定义当前项目的服务分层、默认启动策略与脚本入口。 - -## 1. 服务分层 - -| 服务 | 角色 | 默认端口 | 是否默认启动 | 启动脚本 | 停止方式 | -|---|---|---:|---|---|---| -| backend | 核心搜索 API | 6002 | 是 | `scripts/start_backend.sh` | `scripts/service_ctl.sh stop backend` | -| indexer | 核心索引 API | 6004 | 是 | `scripts/start_indexer.sh` | `scripts/service_ctl.sh stop indexer` | -| frontend | 调试 UI | 6003 | 是 | `scripts/start_frontend.sh` | `scripts/service_ctl.sh stop frontend` | -| embedding | 向量服务(文本/图片) | 6005 | 否(按需) | `scripts/start_embedding_service.sh` | `scripts/service_ctl.sh stop embedding` | -| translator | 翻译服务(qwen/deepl) | 6006 | 否(按需) | `scripts/start_translator.sh` | `scripts/service_ctl.sh stop translator` | -| reranker | 重排服务(BGE) | 6007 | 否(按需) | `scripts/start_reranker.sh` | `scripts/service_ctl.sh stop reranker` | -| clip | CLIP 替代服务(legacy/可选) | 51000 | 否(按需) | `scripts/start_clip_service.sh` | `scripts/service_ctl.sh stop clip` | -| cnclip | CN-CLIP gRPC 服务(legacy/可选) | 51000 | 否(按需) | `scripts/start_cnclip_service.sh` | `scripts/service_ctl.sh stop cnclip` | - -> 说明:`clip` 与 `cnclip` 都是 legacy 服务,脚本内部自带后台化与 PID 
管理,`service_ctl.sh` 仅做编排与委托。 - -## 2. 统一控制入口 - -- 推荐统一入口:`scripts/service_ctl.sh` -- 支持命令:`start` / `stop` / `restart` / `status` - -示例: - -```bash -# 启动核心服务(backend/indexer/frontend) -./scripts/service_ctl.sh start - -# 启动指定服务 -./scripts/service_ctl.sh start backend indexer frontend translator reranker - -# 查看所有服务状态 -./scripts/service_ctl.sh status - -# 停止全部已知服务 -./scripts/service_ctl.sh stop -``` - -## 3. 默认与可选服务策略 - -- `./run.sh` 默认只启动核心服务:`backend/indexer/frontend` -- 如需启动可选能力,使用环境变量: - -```bash -START_EMBEDDING=1 START_TRANSLATOR=1 START_RERANKER=1 ./run.sh -``` - -## 4. 兼容入口 - -以下脚本仍保留,用于兼容旧习惯,但内部已委托到统一控制脚本: - -- `run.sh` -- `restart.sh` -- `scripts/start.sh` -- `scripts/stop.sh` diff --git a/docs/向量化模块和API说明文档.md b/docs/向量化模块和API说明文档.md index 4a3f803..9103e89 100644 --- a/docs/向量化模块和API说明文档.md +++ b/docs/向量化模块和API说明文档.md @@ -1,1425 +1,15 @@ -# 向量化模块和API说明文档 +# 向量化模块 -本文档详细说明saas-search项目中的向量化模块架构、API接口、配置方法和使用指南。 +**快速上手**:见 `docs/QUICKSTART.md` 第 3.3 节。 -## 目录 +## 服务接口 -1. [概述](#概述) - - 1.1 [向量化模块简介](#11-向量化模块简介) - - 1.2 [技术选型](#12-技术选型) - - 1.3 [应用场景](#13-应用场景) +- `POST /embed/text`:文本向量,入参 `["text1", "text2"]`,出参 `[[...], [...]]` +- `POST /embed/image`:图片向量,入参 `["url1", "url2"]`,出参 `[[...], [...]]` -2. [向量化服务架构](#向量化服务架构) - - 2.1 [本地向量化服务](#21-本地向量化服务) - - 2.2 [云端向量化服务](#22-云端向量化服务) - - 2.3 [架构对比](#23-架构对比) +## 配置 -3. [本地向量化服务](#本地向量化服务) - - 3.1 [服务启动](#31-服务启动) - - 3.2 [服务配置](#32-服务配置) - - 3.3 [模型说明](#33-模型说明) +- Provider/URL:`config/config.yaml` 的 `services.embedding` +- 模型路径:`embeddings/config.py` 或 env `TEXT_MODEL_DIR`、`IMAGE_MODEL_DIR` -4. [云端向量化服务](#云端向量化服务) - - 4.1 [阿里云DashScope](#41-阿里云dashscope) - - 4.2 [API Key配置](#42-api-key配置) - - 4.3 [使用方式](#43-使用方式) - -5. [Embedding API详细说明](#embedding-api详细说明) - - 5.1 [API概览](#51-api概览) - - 5.2 [健康检查接口](#52-健康检查接口) - - 5.3 [文本向量化接口](#53-文本向量化接口) - - 5.4 [图片向量化接口](#54-图片向量化接口) - - 5.5 [错误处理](#55-错误处理) - -6. [配置说明](#配置说明) - - 6.1 [服务配置](#61-服务配置) - - 6.2 [模型配置](#62-模型配置) - - 6.3 [批处理配置](#63-批处理配置) - -7. 
[客户端集成示例](#客户端集成示例) - - 7.1 [Python客户端](#71-python客户端) - - 7.2 [Java客户端](#72-java客户端) - - 7.3 [cURL示例](#73-curl示例) - -8. [性能对比与优化](#性能对比与优化) - - 8.1 [性能对比](#81-性能对比) - - 8.2 [成本对比](#82-成本对比) - - 8.3 [优化建议](#83-优化建议) - -9. [故障排查](#故障排查) - - 9.1 [常见问题](#91-常见问题) - - 9.2 [日志查看](#92-日志查看) - - 9.3 [性能调优](#93-性能调优) - -10. [附录](#附录) - - 10.1 [向量维度说明](#101-向量维度说明) - - 10.2 [模型版本信息](#102-模型版本信息) - - 10.3 [相关文档](#103-相关文档) - ---- - -## 概述 - -### 1.1 向量化模块简介 - -saas-search项目实现了完整的文本和图片向量化能力,支持两种部署方式: - -1. **本地向量化服务**:独立部署的微服务,基于本地GPU/CPU运行BGE-M3和CN-CLIP模型 -2. **云端向量化服务**:集成阿里云DashScope API,按使用量付费 - -向量化模块是搜索引擎的核心组件,为语义搜索、图片搜索提供AI驱动的相似度计算能力。 - -### 1.2 技术选型 - -| 功能 | 本地服务 | 云端服务 | -|------|---------|---------| -| **文本模型** | BGE-M3 (Xorbits/bge-m3) | text-embedding-v4 | -| **图片模型** | CN-CLIP (ViT-H-14) | - | -| **向量维度** | 1024 | 1024 | -| **服务框架** | FastAPI | 阿里云API | -| **部署方式** | Docker/本地 | 云端API | - -### 1.3 应用场景 - -- **语义搜索**:查询文本向量化,与商品向量计算相似度 -- **图片搜索**:商品图片向量化,支持以图搜图 -- **混合检索**:BM25 + 向量相似度组合排序 -- **多语言搜索**:中英文跨语言语义理解 - ---- - -## 向量化服务架构 - -### 2.1 本地向量化服务 - -``` -┌─────────────────────────────────────────┐ -│ Embedding Microservice (FastAPI) │ -│ Port: 6005, Workers: 1 │ -└──────────────┬──────────────────────────┘ - │ - ┌───────┴───────┐ - │ │ -┌──────▼──────┐ ┌────▼─────┐ -│ BGE-M3 │ │ CN-CLIP │ -│ Text Model │ │ Image │ -│ (CUDA/CPU) │ │ Model │ -└─────────────┘ └──────────┘ -``` - -**核心特性**: -- 独立部署,可横向扩展 -- GPU加速支持 -- 线程安全设计 -- 启动时预加载模型 - -### 2.2 云端向量化服务 - -``` -┌─────────────────────────────────────┐ -│ saas-search Main Service │ -│ (uses CloudTextEncoder) │ -└──────────────┬──────────────────────┘ - │ - ▼ -┌─────────────────────────────────────┐ -│ Aliyun DashScope API │ -│ text-embedding-v4 │ -│ (HTTP/REST) │ -└─────────────────────────────────────┘ -``` - -**核心特性**: -- 无需GPU资源 -- 按使用量计费 -- 自动扩展 -- 低运维成本 - -### 2.3 架构对比 - -| 维度 | 本地服务 | 云端服务 | -|------|---------|---------| -| **初始成本** | 高(GPU服务器) | 低(按需付费) | -| **运行成本** | 固定 | 变动(按调用量) | -| **延迟** | <100ms 
| 300-400ms | -| **吞吐量** | 高(~32 qps) | 中(~2-3 qps) | -| **离线支持** | ✅ | ❌ | -| **维护成本** | 高 | 低 | -| **扩展性** | 手动扩展 | 自动扩展 | -| **适用场景** | 大规模生产环境 | 初期开发/小规模应用 | - ---- - -## 本地向量化服务 - -### 3.1 服务启动 - -#### 方式1:使用脚本启动(推荐) - -```bash -# 启动向量化服务 -./scripts/start_embedding_service.sh -``` - -脚本特性: -- 自动激活conda环境 -- 读取配置文件获取端口 -- 单worker模式启动服务 - -#### 方式2:手动启动 - -```bash -# 激活环境(推荐使用项目根目录 activate.sh;新机器按需 export CONDA_ROOT) -# 例如你的 conda 是 ~/anaconda3/bin/conda,则 export CONDA_ROOT=$HOME/anaconda3 -cd /data/saas-search -source activate.sh - -# 启动服务 -python -m uvicorn embeddings.server:app \ - --host 0.0.0.0 \ - --port 6005 \ - --workers 1 -``` - -#### 方式3:Docker部署(生产环境) - -```bash -# 构建镜像 -docker build -t searchengine-embedding:latest . - -# 启动容器 -docker run -d \ - --name embedding-service \ - --gpus all \ - -p 6005:6005 \ - searchengine-embedding:latest -``` - -### 3.2 服务配置 - -配置文件:`embeddings/config.py` - -```python -class EmbeddingConfig: - # 服务配置 - HOST = "0.0.0.0" # 监听地址 - PORT = 6005 # 监听端口 - - # 文本模型 (BGE-M3) - TEXT_MODEL_DIR = "Xorbits/bge-m3" # 模型路径/HuggingFace ID - TEXT_DEVICE = "cuda" # 设备: "cuda" 或 "cpu" - TEXT_BATCH_SIZE = 32 # 批处理大小 - - # 图片模型 (CN-CLIP) - IMAGE_MODEL_NAME = "ViT-H-14" # 模型名称 - IMAGE_DEVICE = None # None=自动, "cuda", "cpu" - IMAGE_BATCH_SIZE = 8 # 批处理大小 -``` - -### 3.3 模型说明 - -#### BGE-M3 文本模型 - -- **模型ID**: `Xorbits/bge-m3` -- **向量维度**: 1024 -- **支持语言**: 中文、英文、多语言(100+) -- **特性**: 强大的语义理解能力,支持长文本 -- **部署**: 自动从HuggingFace下载 - -#### CN-CLIP 图片模型 - -- **模型**: ViT-H-14 (Chinese CLIP) -- **向量维度**: 1024 -- **输入**: 图片URL或本地路径 -- **特性**: 中文图文理解,适合电商场景 -- **预处理**: 自动下载、缩放、归一化 - ---- - -## 云端向量化服务 - -### 4.1 阿里云DashScope - -**服务地址**: -- 北京地域:`https://dashscope.aliyuncs.com/compatible-mode/v1` -- 新加坡地域:`https://dashscope-intl.aliyuncs.com/compatible-mode/v1` - -**模型信息**: -- **模型名**: `text-embedding-v4` -- **向量维度**: 1024 -- **输入限制**: 单次最多2048个文本,每个文本最大8192 token -- **速率限制**: 根据API套餐不同而不同 - -### 4.2 API Key配置 - -#### 方式1:环境变量(推荐) - -```bash -# 临时设置 
-export DASHSCOPE_API_KEY="sk-your-api-key-here" - -# 永久设置(添加到 ~/.bashrc 或 ~/.zshrc) -echo 'export DASHSCOPE_API_KEY="sk-your-api-key-here"' >> ~/.bashrc -source ~/.bashrc -``` - -#### 方式2:.env文件 - -在项目根目录创建`.env`文件: - -```bash -DASHSCOPE_API_KEY=sk-your-api-key-here -``` - -**获取API Key**:https://help.aliyun.com/zh/model-studio/get-api-key - -### 4.3 使用方式 - -```python -from embeddings.cloud_text_encoder import CloudTextEncoder - -# 初始化编码器(自动从环境变量读取API Key) -encoder = CloudTextEncoder() - -# 单个文本向量化 -text = "衣服的质量杠杠的" -embedding = encoder.encode(text) -print(embedding.shape) # (1, 1024) - -# 批量向量化 -texts = ["文本1", "文本2", "文本3"] -embeddings = encoder.encode(texts) -print(embeddings.shape) # (3, 1024) - -# 大批量处理(自动分批) -large_texts = [f"商品 {i}" for i in range(1000)] -embeddings = encoder.encode_batch(large_texts, batch_size=32) -``` - -**自定义配置**: - -```python -# 使用新加坡地域 -encoder = CloudTextEncoder( - api_key="sk-xxx", - base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1" -) -``` - ---- - -## Embedding API详细说明 - -### 5.1 API概览 - -本地向量化服务提供RESTful API接口: - -| 端点 | 方法 | 功能 | -|------|------|------| -| `/health` | GET | 健康检查 | -| `/embed/text` | POST | 文本向量化 | -| `/embed/image` | POST | 图片向量化 | - -**服务地址**: -- 默认:`http://localhost:6005` -- 生产:`http://:6005` - -### 5.2 健康检查接口 - -```http -GET /health -``` - -**响应示例**: -```json -{ - "status": "ok", - "text_model_loaded": true, - "image_model_loaded": true -} -``` - -**字段说明**: -- `status`: 服务状态,"ok"表示正常 -- `text_model_loaded`: 文本模型是否加载成功 -- `image_model_loaded`: 图片模型是否加载成功 - -**cURL示例**: -```bash -curl http://localhost:6005/health -``` - -### 5.3 文本向量化接口 - -```http -POST /embed/text -Content-Type: application/json -``` - -#### 请求格式 - -**请求体**(JSON数组): -```json -[ - "衣服的质量杠杠的", - "Bohemian Maxi Dress", - "Vintage Denim Jacket" -] -``` - -**参数说明**: -- 类型:`List[str]` -- 长度:建议≤100(避免超时) -- 单个文本:建议≤512个字符 - -#### 响应格式 - -**成功响应**(200 OK): -```json -[ - [0.1234, -0.5678, 0.9012, ..., 0.3456], // 1024维向量 - [0.2345, 
0.6789, -0.1234, ..., 0.4567], // 1024维向量 - [0.3456, -0.7890, 0.2345, ..., 0.5678] // 1024维向量 -] -``` - -**字段说明**: -- 类型:`List[List[float]]` -- 每个向量:1024个浮点数 -- 对齐原则:输出数组与输入数组按索引一一对应 -- 失败项:返回`null` - -**错误示例**: -```json -[ - [0.1234, -0.5678, ...], // 成功 - null, // 失败(空文本或其他错误) - [0.3456, 0.7890, ...] // 成功 -] -``` - -#### cURL示例 - -```bash -# 单个文本 -curl -X POST http://localhost:6005/embed/text \ - -H "Content-Type: application/json" \ - -d '["测试查询文本"]' - -# 批量文本 -curl -X POST http://localhost:6005/embed/text \ - -H "Content-Type: application/json" \ - -d '["红色连衣裙", "blue jeans", "vintage dress"]' -``` - -#### Python示例 - -```python -import requests -import numpy as np - -def embed_texts(texts): - """文本向量化""" - response = requests.post( - "http://localhost:6005/embed/text", - json=texts, - timeout=30 - ) - response.raise_for_status() - embeddings = response.json() - - # 转换为numpy数组 - valid_embeddings = [e for e in embeddings if e is not None] - return np.array(valid_embeddings) - -# 使用 -texts = ["红色连衣裙", "blue jeans"] -embeddings = embed_texts(texts) -print(f"Shape: {embeddings.shape}") # (2, 1024) - -# 计算相似度 -similarity = np.dot(embeddings[0], embeddings[1]) -print(f"Similarity: {similarity}") -``` - -### 5.4 图片向量化接口 - -```http -POST /embed/image -Content-Type: application/json -``` - -#### 请求格式 - -**请求体**(JSON数组): -```json -[ - "https://example.com/product1.jpg", - "https://example.com/product2.png", - "/local/path/to/product3.jpg" -] -``` - -**参数说明**: -- 类型:`List[str]` -- 支持:HTTP URL或本地文件路径 -- 格式:JPG、PNG等常见图片格式 -- 长度:建议≤10(图片处理较慢) - -#### 响应格式 - -**成功响应**(200 OK): -```json -[ - [0.1234, 0.5678, 0.9012, ..., 0.3456], // 1024维向量 - null, // 失败(图片无效或下载失败) - [0.3456, 0.7890, 0.2345, ..., 0.5678] // 1024维向量 -] -``` - -**特性**: -- 自动下载:HTTP URL自动下载图片 -- 逐个处理:串行处理(带锁保证线程安全) -- 容错:单个失败不影响其他图片 - -#### cURL示例 - -```bash -# 单个图片(URL) -curl -X POST http://localhost:6005/embed/image \ - -H "Content-Type: application/json" \ - -d '["https://example.com/product.jpg"]' - -# 
多个图片(混合URL和本地路径) -curl -X POST http://localhost:6005/embed/image \ - -H "Content-Type: application/json" \ - -d '["https://example.com/img1.jpg", "/data/images/img2.png"]' -``` - -#### Python示例 - -```python -import requests -import numpy as np - -def embed_images(image_urls): - """图片向量化""" - response = requests.post( - "http://localhost:6005/embed/image", - json=image_urls, - timeout=120 # 图片处理较慢,设置更长超时 - ) - response.raise_for_status() - embeddings = response.json() - - # 过滤成功的向量化结果 - valid_embeddings = [(url, emb) for url, emb in zip(image_urls, embeddings) if emb is not None] - return valid_embeddings - -# 使用 -image_urls = [ - "https://example.com/dress1.jpg", - "https://example.com/dress2.jpg" -] - -results = embed_images(image_urls) -for url, embedding in results: - print(f"{url}: {len(embedding)} dimensions") -``` - -### 5.5 错误处理 - -#### HTTP状态码 - -| 状态码 | 含义 | 处理方式 | -|--------|------|---------| -| 200 | 成功 | 正常处理响应 | -| 500 | 服务器错误 | 检查服务日志 | -| 503 | 服务不可用 | 模型未加载,检查启动日志 | - -#### 常见错误场景 - -1. **模型未加载** -```json -{ - "detail": "Runtime Error: Text model not loaded" -} -``` -**解决**:检查服务启动日志,确认模型加载成功 - -2. **无效输入** -```json -[null, null] -``` -**原因**:输入包含空字符串或None - -3. 
**图片下载失败** -```json -[ - [0.123, ...], - null // URL无效或网络问题 -] -``` -**解决**:检查URL是否可访问 - ---- - -## 配置说明 - -### 6.1 服务配置 - -编辑 `embeddings/config.py` 修改服务配置: - -```python -class EmbeddingConfig: - # ========== 服务配置 ========== - HOST = "0.0.0.0" # 监听所有网卡 - PORT = 6005 # 默认端口 -``` - -**生产环境建议**: -- 使用反向代理(Nginx)处理SSL -- 配置防火墙规则限制访问 -- 使用Docker容器隔离 - -### 6.2 模型配置 - -#### 文本模型配置 - -```python -# ========== BGE-M3 文本模型 ========== -TEXT_MODEL_DIR = "Xorbits/bge-m3" # HuggingFace模型ID -TEXT_DEVICE = "cuda" # 设备选择 -TEXT_BATCH_SIZE = 32 # 批处理大小 -``` - -**DEVICE选择**: -- `"cuda"`: GPU加速(推荐,需要CUDA) -- `"cpu"`: CPU模式(较慢,但兼容性好) - -**批处理大小建议**: -- GPU(16GB显存):32-64 -- GPU(8GB显存):16-32 -- CPU:8-16 - -#### 图片模型配置 - -```python -# ========== CN-CLIP 图片模型 ========== -IMAGE_MODEL_NAME = "ViT-H-14" # 模型名称 -IMAGE_DEVICE = None # None=自动检测 -IMAGE_BATCH_SIZE = 8 # 批处理大小 -``` - -**IMAGE_DEVICE选择**: -- `None`: 自动检测(推荐) -- `"cuda"`: 强制使用GPU -- `"cpu"`: 强制使用CPU - -### 6.3 批处理配置 - -**批处理大小调优**: - -| 场景 | 文本Batch Size | 图片Batch Size | 说明 | -|------|---------------|---------------|------| -| 开发测试 | 16 | 1 | 快速响应 | -| 生产环境(GPU) | 32-64 | 4-8 | 平衡性能 | -| 生产环境(CPU) | 8-16 | 1-2 | 避免内存溢出 | -| 离线批处理 | 128+ | 16+ | 最大化吞吐 | - -**批处理建议**: -1. 监控GPU内存使用:`nvidia-smi` -2. 逐步增加batch_size直到OOM -3. 
预留20%内存余量 - ---- - -## 客户端集成示例 - -### 7.1 Python客户端 - -#### 基础客户端类 - -```python -import requests -from typing import List, Optional -import numpy as np - -class EmbeddingServiceClient: - """向量化服务客户端""" - - def __init__(self, base_url: str = "http://localhost:6005"): - self.base_url = base_url.rstrip('/') - self.timeout = 30 - - def health_check(self) -> dict: - """健康检查""" - response = requests.get(f"{self.base_url}/health", timeout=5) - response.raise_for_status() - return response.json() - - def embed_text(self, text: str) -> Optional[List[float]]: - """单个文本向量化""" - result = self.embed_texts([text]) - return result[0] if result else None - - def embed_texts(self, texts: List[str]) -> List[Optional[List[float]]]: - """批量文本向量化""" - if not texts: - return [] - - response = requests.post( - f"{self.base_url}/embed/text", - json=texts, - timeout=self.timeout - ) - response.raise_for_status() - return response.json() - - def embed_image(self, image_url: str) -> Optional[List[float]]: - """单个图片向量化""" - result = self.embed_images([image_url]) - return result[0] if result else None - - def embed_images(self, image_urls: List[str]) -> List[Optional[List[float]]]: - """批量图片向量化""" - if not image_urls: - return [] - - response = requests.post( - f"{self.base_url}/embed/image", - json=image_urls, - timeout=120 # 图片处理需要更长时间 - ) - response.raise_for_status() - return response.json() - - def embed_texts_to_numpy(self, texts: List[str]) -> Optional[np.ndarray]: - """批量文本向量化,返回numpy数组""" - embeddings = self.embed_texts(texts) - valid_embeddings = [e for e in embeddings if e is not None] - if not valid_embeddings: - return None - return np.array(valid_embeddings, dtype=np.float32) - -# 使用示例 -if __name__ == "__main__": - client = EmbeddingServiceClient() - - # 健康检查 - health = client.health_check() - print(f"Service status: {health}") - - # 文本向量化 - texts = ["红色连衣裙", "blue jeans", "vintage dress"] - embeddings = client.embed_texts_to_numpy(texts) - print(f"Embeddings shape: 
{embeddings.shape}") - - # 计算相似度 - from sklearn.metrics.pairwise import cosine_similarity - similarities = cosine_similarity(embeddings) - print(f"Similarity matrix:\n{similarities}") -``` - -#### 高级用法:异步客户端 - -```python -import aiohttp -import asyncio -from typing import List, Optional - -class AsyncEmbeddingClient: - """异步向量化服务客户端""" - - def __init__(self, base_url: str = "http://localhost:6005"): - self.base_url = base_url.rstrip('/') - self.session: Optional[aiohttp.ClientSession] = None - - async def __aenter__(self): - self.session = aiohttp.ClientSession() - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - if self.session: - await self.session.close() - - async def embed_texts(self, texts: List[str]) -> List[Optional[List[float]]]: - """异步批量文本向量化""" - if not texts: - return [] - - if not self.session: - raise RuntimeError("Client not initialized. Use 'async with'.") - - async with self.session.post( - f"{self.base_url}/embed/text", - json=texts, - timeout=aiohttp.ClientTimeout(total=30) - ) as response: - response.raise_for_status() - return await response.json() - -# 使用示例 -async def main(): - async with AsyncEmbeddingClient() as client: - texts = ["text1", "text2", "text3"] - embeddings = await client.embed_texts(texts) - print(f"Got {len(embeddings)} embeddings") - -asyncio.run(main()) -``` - -### 7.2 Java客户端 - -#### 基础客户端类 - -```java -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.time.Duration; -import java.util.List; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; - -public class EmbeddingServiceClient { - private final HttpClient httpClient; - private final ObjectMapper objectMapper; - private final String baseUrl; - - public EmbeddingServiceClient(String baseUrl) { - this.baseUrl = baseUrl.replaceAll("/$", ""); - this.httpClient = 
HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(10)) - .build(); - this.objectMapper = new ObjectMapper(); - } - - /** - * 健康检查 - */ - public HealthStatus healthCheck() throws Exception { - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/health")) - .timeout(Duration.ofSeconds(5)) - .GET() - .build(); - - HttpResponse response = httpClient.send( - request, - HttpResponse.BodyHandlers.ofString() - ); - - JsonNode json = objectMapper.readTree(response.body()); - return new HealthStatus( - json.get("status").asText(), - json.get("text_model_loaded").asBoolean(), - json.get("image_model_loaded").asBoolean() - ); - } - - /** - * 批量文本向量化 - */ - public List embedTexts(List texts) throws Exception { - // 构建请求体 - ArrayNode requestBody = objectMapper.createArrayNode(); - for (String text : texts) { - requestBody.add(text); - } - - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/embed/text")) - .header("Content-Type", "application/json") - .timeout(Duration.ofSeconds(30)) - .POST(HttpRequest.BodyPublishers.ofString( - objectMapper.writeValueAsString(requestBody) - )) - .build(); - - HttpResponse response = httpClient.send( - request, - HttpResponse.BodyHandlers.ofString() - ); - - if (response.statusCode() != 200) { - throw new RuntimeException("API error: " + response.body()); - } - - // 解析响应 - JsonNode root = objectMapper.readTree(response.body()); - List embeddings = new java.util.ArrayList<>(); - - for (JsonNode item : root) { - if (item.isNull()) { - embeddings.add(null); - } else { - float[] vector = objectMapper.treeToValue(item, float[].class); - embeddings.add(vector); - } - } - - return embeddings; - } - - /** - * 计算余弦相似度 - */ - public static float cosineSimilarity(float[] v1, float[] v2) { - if (v1.length != v2.length) { - throw new IllegalArgumentException("Vectors must be same length"); - } - - float dotProduct = 0.0f; - float norm1 = 0.0f; - float norm2 = 0.0f; - - for (int i = 0; i < 
v1.length; i++) { - dotProduct += v1[i] * v2[i]; - norm1 += v1[i] * v1[i]; - norm2 += v2[i] * v2[i]; - } - - return (float) (dotProduct / (Math.sqrt(norm1) * Math.sqrt(norm2))); - } - - // 健康状态数据类 - public static class HealthStatus { - public final String status; - public final boolean textModelLoaded; - public final boolean imageModelLoaded; - - public HealthStatus(String status, boolean textModelLoaded, boolean imageModelLoaded) { - this.status = status; - this.textModelLoaded = textModelLoaded; - this.imageModelLoaded = imageModelLoaded; - } - - @Override - public String toString() { - return String.format("HealthStatus{status='%s', textModelLoaded=%b, imageModelLoaded=%b}", - status, textModelLoaded, imageModelLoaded); - } - } - - // 使用示例 - public static void main(String[] args) throws Exception { - EmbeddingServiceClient client = new EmbeddingServiceClient("http://localhost:6005"); - - // 健康检查 - HealthStatus health = client.healthCheck(); - System.out.println("Health: " + health); - - // 文本向量化 - List texts = List.of("红色连衣裙", "blue jeans", "vintage dress"); - List embeddings = client.embedTexts(texts); - - System.out.println("Got " + embeddings.size() + " embeddings"); - for (int i = 0; i < embeddings.size(); i++) { - System.out.println("Embedding " + i + " dimensions: " + - (embeddings.get(i) != null ? 
embeddings.get(i).length : "null")); - } - - // 计算相似度 - if (embeddings.get(0) != null && embeddings.get(1) != null) { - float similarity = cosineSimilarity(embeddings.get(0), embeddings.get(1)); - System.out.println("Similarity between text 0 and 1: " + similarity); - } - } -} -``` - -**Maven依赖**(`pom.xml`): - -```xml - - - com.fasterxml.jackson.core - jackson-databind - 2.15.2 - - -``` - -### 7.3 cURL示例 - -#### 健康检查 - -```bash -curl http://localhost:6005/health -``` - -#### 文本向量化 - -```bash -# 单个文本 -curl -X POST http://localhost:6005/embed/text \ - -H "Content-Type: application/json" \ - -d '["衣服的质量杠杠的"]' \ - | jq '.[0][0:10]' # 打印前10维 - -# 批量文本 -curl -X POST http://localhost:6005/embed/text \ - -H "Content-Type: application/json" \ - -d '["红色连衣裙", "blue jeans", "vintage dress"]' \ - | jq '. | length' # 检查返回数量 -``` - -#### 图片向量化 - -```bash -# URL图片 -curl -X POST http://localhost:6005/embed/image \ - -H "Content-Type: application/json" \ - -d '["https://example.com/product.jpg"]' \ - | jq '.[0][0:5]' - -# 本地图片 -curl -X POST http://localhost:6005/embed/image \ - -H "Content-Type: application/json" \ - -d '["/data/images/product.jpg"]' -``` - -#### 错误处理示例 - -```bash -# 检查服务状态 -if ! curl -f http://localhost:6005/health > /dev/null 2>&1; then - echo "Embedding service is not healthy!" - exit 1 -fi - -# 调用API并检查错误 -response=$(curl -s -X POST http://localhost:6005/embed/text \ - -H "Content-Type: application/json" \ - -d '["test query"]') - -if echo "$response" | jq -e '.[0] == null' > /dev/null; then - echo "Embedding failed!" - echo "$response" - exit 1 -fi - -echo "Embedding succeeded!" 
-``` - ---- - -## 性能对比与优化 - -### 8.1 性能对比 - -#### 本地服务性能 - -| 操作 | 硬件配置 | 延迟 | 吞吐量 | -|------|---------|------|--------| -| 文本向量化(单个) | GPU (RTX 3090) | ~80ms | ~12 qps | -| 文本向量化(批量32) | GPU (RTX 3090) | ~2.5s | ~256 qps | -| 文本向量化(单个) | CPU (16核) | ~500ms | ~2 qps | -| 图片向量化(单个) | GPU (RTX 3090) | ~150ms | ~6 qps | -| 图片向量化(批量4) | GPU (RTX 3090) | ~600ms | ~6 qps | - -#### 云端服务性能 - -| 操作 | 指标 | 值 | -|------|------|-----| -| 文本向量化(单个) | 延迟 | 300-400ms | -| 文本向量化(批量) | 吞吐量 | ~2-3 qps | -| API限制 | 速率限制 | 取决于套餐 | -| 可用性 | SLA | 99.9% | - -### 8.2 成本对比 - -#### 本地服务成本 - -| 配置 | 硬件成本(月) | 电费(月) | 总成本(月) | -|------|--------------|-----------|------------| -| GPU服务器 (RTX 3090) | ¥3000 | ¥500 | ¥3500 | -| GPU服务器 (A100) | ¥8000 | ¥800 | ¥8800 | -| CPU服务器(16核) | ¥800 | ¥200 | ¥1000 | - -#### 云端服务成本 - -阿里云DashScope定价(参考): - -| 套餐 | 价格 | 调用量 | 适用场景 | -|------|------|--------|---------| -| 按量付费 | ¥0.0007/1K tokens | 无限制 | 测试/小规模 | -| 基础版 | ¥100/月 | 1M tokens | 小规模应用 | -| 专业版 | ¥500/月 | 10M tokens | 中等规模 | -| 企业版 | 定制 | 无限制 | 大规模 | - -**成本计算示例**: - -假设每天10万次搜索,每次查询平均10个token: -- 日调用量:1M tokens -- 月调用量:30M tokens -- 月成本:30 × 0.7 = ¥21(按量付费) - -### 8.3 优化建议 - -#### 本地服务优化 - -1. **GPU利用率优化** -```python -# 增加批处理大小 -TEXT_BATCH_SIZE = 64 # 从32增加到64 -``` - -2. **模型量化** -```python -# 使用半精度浮点数(节省显存) -import torch -model = model.half() # FP16 -``` - -3. **预热模型** -```python -# 服务启动后预热 -@app.on_event("startup") -async def warmup(): - _text_model.encode(["warmup"], device="cuda") -``` - -4. **连接池优化** -```python -# uvicorn配置 ---workers 1 \ # 单worker(GPU模型限制) ---backlog 2048 \ # 增加连接队列 ---limit-concurrency 32 # 限制并发数 -``` - -#### 云端服务优化 - -1. 
**批量合并** -```python -# 累积多个请求后批量调用 -class BatchEncoder: - def __init__(self, batch_size=32, timeout=0.1): - self.batch_size = batch_size - self.timeout = timeout - self.queue = [] - - async def encode(self, text: str): - # 等待批量积累 - future = asyncio.Future() - self.queue.append((text, future)) - - if len(self.queue) >= self.batch_size: - self._flush() - - return await future -``` - -2. **本地缓存** -```python -import hashlib -import pickle - -class CachedEncoder: - def __init__(self, cache_file="embedding_cache.pkl"): - self.cache = self._load_cache(cache_file) - - def encode(self, text: str): - key = hashlib.md5(text.encode()).hexdigest() - if key in self.cache: - return self.cache[key] - - embedding = self._call_api(text) - self.cache[key] = embedding - return embedding -``` - -3. **降级策略** -```python -class HybridEncoder: - def __init__(self): - self.cloud_encoder = CloudTextEncoder() - self.local_encoder = None # 按需加载 - - def encode(self, text: str): - try: - return self.cloud_encoder.encode(text) - except Exception as e: - logger.warning(f"Cloud API failed: {e}, falling back to local") - if not self.local_encoder: - self.local_encoder = BgeEncoder() - return self.local_encoder.encode(text) -``` - ---- - -## 故障排查 - -### 9.1 常见问题 - -#### 问题1:服务无法启动 - -**症状**: -```bash -$ ./scripts/start_embedding_service.sh -Error: Port 6005 already in use -``` - -**解决**: -```bash -# 检查端口占用 -lsof -i :6005 - -# 杀死占用进程 -kill -9 - -# 或者修改配置文件中的端口 -# embeddings/config.py: PORT = 6006 -``` - -#### 问题2:CUDA Out of Memory - -**症状**: -``` -RuntimeError: CUDA out of memory. 
Tried to allocate 2.00 GiB -``` - -**解决**: -```python -# 减小批处理大小 -TEXT_BATCH_SIZE = 16 # 从32减少到16 - -# 或者使用CPU模式 -TEXT_DEVICE = "cpu" -``` - -#### 问题3:模型下载失败 - -**症状**: -``` -OSError: Can't load tokenizer for 'Xorbits/bge-m3' -``` - -**解决**: -```bash -# 手动下载模型 -huggingface-cli download Xorbits/bge-m3 - -# 或使用镜像 -export HF_ENDPOINT=https://hf-mirror.com -``` - -#### 问题4:云端API Key无效 - -**症状**: -``` -ERROR: DASHSCOPE_API_KEY environment variable is not set! -``` - -**解决**: -```bash -# 设置环境变量 -export DASHSCOPE_API_KEY="sk-your-key" - -# 验证 -echo $DASHSCOPE_API_KEY -``` - -#### 问题5:API速率限制 - -**症状**: -``` -Rate limit exceeded. Please try again later. -``` - -**解决**: -```python -# 添加延迟 -import time -for batch in batches: - embeddings = encoder.encode_batch(batch) - time.sleep(0.1) # 每批之间延迟100ms -``` - -### 9.2 日志查看 - -#### 服务日志 - -```bash -# 查看实时日志 -./scripts/start_embedding_service.sh 2>&1 | tee embedding.log - -# 或使用systemd(如果配置了服务) -journalctl -u embedding-service -f -``` - -#### Python应用日志 - -```python -import logging - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) - -logger = logging.getLogger(__name__) - -# 使用 -logger.info("Encoding texts...") -logger.error("Encoding failed: %s", str(e)) -``` - -#### GPU监控 - -```bash -# 实时监控GPU使用 -watch -n 1 nvidia-smi - -# 查看详细信息 -nvidia-smi --query-gpu=timestamp,name,temperature.gpu,utilization.gpu,utilization.memory,memory.total,memory.used,memory.free --format=csv -``` - -### 9.3 性能调优 - -#### 性能分析 - -```python -import time -import numpy as np - -def benchmark_encoder(encoder, texts, iterations=100): - """性能基准测试""" - times = [] - - for i in range(iterations): - start = time.time() - embeddings = encoder.encode(texts) - end = time.time() - times.append(end - start) - - times = np.array(times) - print(f"Mean: {times.mean():.3f}s") - print(f"Std: {times.std():.3f}s") - print(f"Min: {times.min():.3f}s") - print(f"Max: {times.max():.3f}s") - print(f"QPS: 
{len(texts) / times.mean():.2f}") - -# 使用 -benchmark_encoder(encoder, texts=["test"] * 32, iterations=100) -``` - -#### 内存分析 - -```bash -# Python内存分析 -pip install memory_profiler - -# 在代码中添加 -from memory_profiler import profile - -@profile -def encode_batch(texts): - return encoder.encode(texts) - -# 运行 -python -m memory_profiler script.py -``` - ---- - -## 附录 - -### 10.1 向量维度说明 - -#### 为什么是1024维? - -1. **表达能力**:1024维可以捕捉丰富的语义信息 -2. **计算效率**:维度适中,计算速度快 -3. **存储平衡**:向量大小合理(每个向量约4KB) -4. **模型选择**:BGE-M3和text-embedding-v4都使用1024维 - -#### 向量存储计算 - -``` -单个向量大小 = 1024 × 4字节(FP32) = 4KB -100万向量大小 = 4KB × 1,000,000 = 4GB -1000万向量大小 = 4KB × 10,000,000 = 40GB -``` - -### 10.2 模型版本信息 - -#### BGE-M3 - -- **HuggingFace ID**: `Xorbits/bge-m3` -- **论文**: [BGE-M3: Multi-Functionality, Multi-Linguality, Multi-Granularity Text Embeddings Through Self-Knowledge Distillation](https://arxiv.org/abs/2402.03616) -- **GitHub**: https://github.com/FlagOpen/FlagEmbedding -- **特性**: - - 支持100+种语言 - - 最大支持8192 token长度 - - 丰富的语义表达能力 - -#### CN-CLIP - -- **模型**: ViT-H-14 -- **论文**: [Chinese CLIP: Contrastive Language-Image Pretraining in Chinese](https://arxiv.org/abs/2211.01935) -- **GitHub**: https://github.com/OFA-Sys/Chinese-CLIP -- **特性**: - - 中文图文理解 - - 支持图片检索和文本检索 - - 适合电商场景 - -#### Aliyun text-embedding-v4 - -- **提供商**: 阿里云DashScope -- **文档**: https://help.aliyun.com/zh/model-studio/getting-started/models -- **特性**: - - 云端API,无需部署 - - 高可用性(99.9% SLA) - - 自动扩展 - -### 10.3 相关文档 - -#### 项目文档 - -- **搜索API对接指南**: `docs/搜索API对接指南.md` -- **索引字段说明**: `docs/索引字段说明v2.md` -- **系统设计文档**: `docs/系统设计文档.md` -- **CLAUDE项目指南**: `CLAUDE.md` - -#### 外部参考 - -- **BGE-M3官方文档**: https://github.com/FlagOpen/FlagEmbedding/tree/master/BGE_M3 -- **阿里云DashScope**: https://help.aliyun.com/zh/model-studio/ -- **Elasticsearch向量搜索**: https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html -- **FastAPI文档**: https://fastapi.tiangolo.com/ - -#### 测试脚本 - -```bash -# 本地向量化服务测试 
-./scripts/test_embedding_service.sh - -# 云端向量化服务测试 -python scripts/test_cloud_embedding.py - -# 性能基准测试 -python scripts/benchmark_embeddings.py -``` - ---- - -## 版本历史 - -| 版本 | 日期 | 变更说明 | -|------|------|---------| -| v1.0 | 2025-12-23 | 初始版本,完整的向量化模块文档 | - ---- - -## 联系方式 - -如有问题或建议,请联系项目维护者。 - -**项目仓库**: `/data/saas-search` - -**相关文档目录**: `docs/` +详见 `embeddings/README.md`。 diff --git a/docs/翻译模块说明.md b/docs/翻译模块说明.md index 3a3cebb..676aeba 100644 --- a/docs/翻译模块说明.md +++ b/docs/翻译模块说明.md @@ -1,256 +1,20 @@ -# 翻译模块说明(Qwen / DeepL) +# 翻译模块 -本文档汇总翻译模块的**接口使用说明**与**Python 模块用法**,对应代码: +**快速上手**:见 `docs/QUICKSTART.md` 第 3.4 节。 -- HTTP 服务:`api/translator_app.py` -- Python 模块:`query/translator.py` +## 环境变量 ---- - -## 1. 功能概述 - -当前翻译模块支持两种后端: - -- **Qwen(默认)**:通过阿里云百炼 DashScope 的 OpenAI 兼容接口调用 `qwen-mt-flash` -- **DeepL**:通过 DeepL API 调用翻译(保留原有能力) - -两种方式均支持: - -- **Redis 缓存**(如启用):同文案同目标语言命中缓存直接返回 -- **`source_lang` 自动检测**:当 `source_lang` 为空或 `"auto"` 时启用自动检测(Qwen 使用 `"auto"`) - ---- - -## 2. 环境变量与配置 - -项目会在 `config/env_config.py` 中加载项目根目录的 `.env`,常用变量如下: - -```env -# Qwen / DashScope +```bash +# Qwen(默认) DASHSCOPE_API_KEY=sk-xxx # DeepL DEEPL_AUTH_KEY=xxx -# 可选:翻译服务默认模型(HTTP 服务启动后若请求不传 model,则使用此默认值) +# 可选 TRANSLATION_MODEL=qwen # 或 deepl ``` -说明: - -- **Qwen** 使用 `DASHSCOPE_API_KEY` -- **DeepL** 使用 `DEEPL_AUTH_KEY` -- `.env` 中的 `OPENAI_API_KEY` 不是本翻译模块必须项(当前实现用的是 `DASHSCOPE_API_KEY`) - ---- - -## 3. 
HTTP 翻译服务(`api/translator_app.py`) - -### 3.1 启动命令 - -推荐(热更新): - -```bash -cd /data/saas-search -uvicorn api.translator_app:app --host 0.0.0.0 --port 6006 --reload -``` - -指定默认模型(不传请求 `model` 时生效): - -```bash -cd /data/saas-search -export TRANSLATION_MODEL=qwen # 或 deepl -uvicorn api.translator_app:app --host 0.0.0.0 --port 6006 --reload -``` - -### 3.2 接口列表 - -- **GET** `/health`:健康检查(返回默认模型、已初始化模型列表等) -- **POST** `/translate`:翻译文本 -- **GET** `/docs`:Swagger UI - -### 3.3 `/translate` 请求参数 - -请求体(JSON): - -```json -{ - "text": "要翻译的文本", - "target_lang": "en", - "source_lang": "auto", - "model": "qwen" -} -``` - -- **text**:必填,待翻译文本 -- **target_lang**:必填,目标语言代码(见“语言支持”) -- **source_lang**:可选,源语言代码;不传或传 `"auto"` 时自动检测 -- **model**:可选,`"qwen"` 或 `"deepl"`;默认 `"qwen"` - -### 3.4 `/translate` 返回参数 - -响应体(JSON,成功时): - -```json -{ - "text": "商品名称", - "target_lang": "en", - "source_lang": "zh", - "translated_text": "Product name", - "status": "success", - "model": "qwen" -} -``` - -### 3.5 请求示例(curl) - -健康检查: - -```bash -curl http://localhost:6006/health -``` - -默认(qwen)中文 → 英文: - -```bash -curl -X POST http://localhost:6006/translate \ - -H "Content-Type: application/json" \ - -d '{"text":"我看到这个视频后没有笑","target_lang":"en","source_lang":"auto"}' -``` - -显式指定 qwen,英文 → 简体中文: - -```bash -curl -X POST http://localhost:6006/translate \ - -H "Content-Type: application/json" \ - -d '{"text":"Product name","target_lang":"zh","source_lang":"en","model":"qwen"}' -``` - -繁体中文(`zh_tw`)测试: - -```bash -curl -X POST http://localhost:6006/translate \ - -H "Content-Type: application/json" \ - -d '{"text":"商品名稱","target_lang":"zh_tw","source_lang":"auto","model":"qwen"}' -``` - -切换 DeepL: - -```bash -curl -X POST http://localhost:6006/translate \ - -H "Content-Type: application/json" \ - -d '{"text":"商品名称","target_lang":"en","source_lang":"zh","model":"deepl"}' -``` - -### 3.6 关于提示词(Prompt) - -HTTP 服务内部使用了固定提示词 `TRANSLATION_PROMPT`(适用于“商品 SKU 英文名”场景),并通过 `prompt` 参数传入 
`Translator.translate()`。 - -- **DeepL**:`prompt` 会作为 DeepL 的 `context` 使用(影响翻译但不被翻译) -- **Qwen**:当前实现未将 `prompt/context` 传给 Qwen 的 `translation_options`(即对 Qwen 不生效) - ---- - -## 4. Python 翻译模块(`query/translator.py`) - -### 4.1 基本用法 - -```python -from query.translator import Translator - -# 默认使用 qwen -translator = Translator() - -result = translator.translate( - text="我看到这个视频后没有笑", - target_lang="en", - source_lang="auto", -) -print(result) -``` - -显式选择模型: - -```python -translator_qwen = Translator(model="qwen") -translator_deepl = Translator(model="deepl") -``` - -### 4.2 关键参数 - -- `Translator(model="qwen" | "deepl")`:选择翻译模型,默认 `"qwen"` -- `translate(text, target_lang, source_lang=None, context=None, prompt=None)`: - - `target_lang` / `source_lang`:语言代码(见“语言支持”) - - `source_lang` 为空或 `"auto"`:自动检测 - - `prompt`: - - DeepL:作为 `context` 使用 - - Qwen:当前未使用 - -### 4.3 缓存(Redis) - -`Translator(use_cache=True)` 时会连接 Redis 并缓存翻译结果。 - -- Redis 连接配置来自 `config/env_config.py` 的 `REDIS_CONFIG` -- 缓存 key 前缀默认 `trans`(可用 `REDIS_TRANSLATION_CACHE_PREFIX` 覆盖) - ---- - -## 5. Qwen 语言支持(按 qwen-mt-plus/flash/turbo 标准) - -> 以下为 Qwen 翻译模型支持的语言(**代码 → 英文名**),并已用于 `query/translator.py` 的映射。 - -| 代码 | 英文名 | -|------|--------| -| en | English | -| zh | Chinese | -| zh_tw | Traditional Chinese | -| ru | Russian | -| ja | Japanese | -| ko | Korean | -| es | Spanish | -| fr | French | -| pt | Portuguese | -| de | German | -| it | Italian | -| th | Thai | -| vi | Vietnamese | -| id | Indonesian | -| ms | Malay | -| ar | Arabic | -| hi | Hindi | -| he | Hebrew | -| my | Burmese | -| ta | Tamil | -| ur | Urdu | -| bn | Bengali | -| pl | Polish | -| nl | Dutch | -| ro | Romanian | -| tr | Turkish | -| km | Khmer | -| lo | Lao | -| yue | Cantonese | -| cs | Czech | -| el | Greek | -| sv | Swedish | -| hu | Hungarian | -| da | Danish | -| fi | Finnish | -| uk | Ukrainian | -| bg | Bulgarian | - ---- - -## 6. 
常见问题(FAQ) - -### 6.1 Qwen 调用报错 / 无法初始化 - -- 确认 `.env` 中已配置 `DASHSCOPE_API_KEY` -- 确认安装依赖:`openai`(Python 包) -- 如在海外地域使用模型,将 `base_url` 切换为 `https://dashscope-intl.aliyuncs.com/compatible-mode/v1` - -### 6.2 DeepL 返回 403 / 翻译失败 - -- 确认 `.env` 中已配置 `DEEPL_AUTH_KEY` -- 若使用的是 Pro key,请使用 `https://api.deepl.com/v2/translate`(当前代码即为该地址) +## Provider 配置 +Provider 与 URL 在 `config/config.yaml` 的 `services.translation`。详见 `docs/PROVIDER_ARCHITECTURE.md`。 diff --git a/embeddings/README.md b/embeddings/README.md index b6418f1..5b59f2a 100644 --- a/embeddings/README.md +++ b/embeddings/README.md @@ -1,4 +1,8 @@ -## 向量化模块(embeddings) +# Embeddings 模块 + +**请求示例**见 `docs/QUICKSTART.md` §3.3。 + +--- 这个目录是一个完整的“向量化模块”,包含: diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py index fec709e..d92c578 100644 --- a/embeddings/image_encoder.py +++ b/embeddings/image_encoder.py @@ -15,6 +15,8 @@ from typing import List, Optional, Union, Dict, Any logger = logging.getLogger(__name__) +from config.services_config import get_embedding_base_url + class CLIPImageEncoder: """ @@ -30,7 +32,7 @@ class CLIPImageEncoder: with cls._lock: if cls._instance is None: cls._instance = super(CLIPImageEncoder, cls).__new__(cls) - resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL", "http://localhost:6005") + resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL") or get_embedding_base_url() logger.info(f"Creating CLIPImageEncoder instance with service URL: {resolved_url}") cls._instance.service_url = resolved_url cls._instance.endpoint = f"{resolved_url}/embed/image" diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py index 3cc1158..4b60478 100644 --- a/embeddings/text_encoder.py +++ b/embeddings/text_encoder.py @@ -18,6 +18,8 @@ import logging logger = logging.getLogger(__name__) +from config.services_config import get_embedding_base_url + # Try to import REDIS_CONFIG, but allow import to fail try: from config.env_config import REDIS_CONFIG @@ -38,7 
+40,7 @@ class BgeEncoder: with cls._lock: if cls._instance is None: cls._instance = super(BgeEncoder, cls).__new__(cls) - resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL", "http://localhost:6005") + resolved_url = service_url or os.getenv("EMBEDDING_SERVICE_URL") or get_embedding_base_url() logger.info(f"Creating BgeEncoder instance with service URL: {resolved_url}") cls._instance.service_url = resolved_url cls._instance.endpoint = f"{resolved_url}/embed/text" diff --git a/indexer/README.md b/indexer/README.md index 03f3f60..8515ab7 100644 --- a/indexer/README.md +++ b/indexer/README.md @@ -1,3 +1,9 @@ +# Indexer 模块 + +负责 MySQL → ES 文档富化(多语言、翻译、向量、规格聚合)。**请求示例**见 `docs/QUICKSTART.md` §3.2。 + +--- + ## 一、整体架构说明 ### 1.1 系统角色划分 diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py index 857bd75..4693269 100644 --- a/indexer/indexing_utils.py +++ b/indexer/indexing_utils.py @@ -99,13 +99,8 @@ def create_document_transformer( index_langs = tenant_config.get("index_languages") or [] need_translator = len(index_langs) > 1 if translator is None and need_translator: - from query.translator import Translator - translator = Translator( - api_key=config.query_config.translation_api_key, - use_cache=True, - glossary_id=config.query_config.translation_glossary_id, - translation_context=config.query_config.translation_context - ) + from providers import create_translation_provider + translator = create_translation_provider(config.query_config) if translation_prompts is None: translation_prompts = config.query_config.translation_prompts diff --git a/providers/__init__.py b/providers/__init__.py new file mode 100644 index 0000000..0dbba48 --- /dev/null +++ b/providers/__init__.py @@ -0,0 +1,15 @@ +""" +Pluggable providers for translation, embedding, rerank. + +All provider selection is driven by config/services_config (services block). 
+""" + +from .translation import create_translation_provider +from .rerank import create_rerank_provider +from .embedding import create_embedding_provider + +__all__ = [ + "create_translation_provider", + "create_rerank_provider", + "create_embedding_provider", +] diff --git a/providers/embedding.py b/providers/embedding.py new file mode 100644 index 0000000..efc9ada --- /dev/null +++ b/providers/embedding.py @@ -0,0 +1,41 @@ +""" +Embedding provider - HTTP service (vllm reserved). + +Returns text/image encoders configured via services_config. +""" + +from __future__ import annotations + +from config.services_config import get_embedding_config, get_embedding_base_url + + +def create_embedding_provider() -> "EmbeddingProvider": + """Create embedding provider from services config.""" + cfg = get_embedding_config() + provider = (cfg.provider or "http").strip().lower() + if provider == "vllm": + import logging + logging.getLogger(__name__).warning("embedding provider 'vllm' is reserved, using HTTP.") + return EmbeddingProvider() + + +class EmbeddingProvider: + """ + Provides text and image encoders. Both use HTTP embedding service + configured via services_config. + """ + + def __init__(self) -> None: + self._base_url = get_embedding_base_url() + + @property + def text_encoder(self): + """Lazy-created text encoder (BgeEncoder).""" + from embeddings.text_encoder import BgeEncoder + return BgeEncoder(service_url=self._base_url) + + @property + def image_encoder(self): + """Lazy-created image encoder (CLIPImageEncoder).""" + from embeddings.image_encoder import CLIPImageEncoder + return CLIPImageEncoder(service_url=self._base_url) diff --git a/providers/rerank.py b/providers/rerank.py new file mode 100644 index 0000000..153a28e --- /dev/null +++ b/providers/rerank.py @@ -0,0 +1,68 @@ +""" +Rerank provider - HTTP service (vllm reserved). 
+""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional, Tuple + +import requests + +from config.services_config import get_rerank_config, get_rerank_service_url + +logger = logging.getLogger(__name__) + + +class HttpRerankProvider: + """Rerank via HTTP service.""" + + def __init__(self, service_url: str): + self.service_url = (service_url or "").rstrip("/") + + def rerank( + self, + query: str, + docs: List[str], + timeout_sec: float, + ) -> Tuple[Optional[List[float]], Optional[Dict[str, Any]]]: + if not docs: + return [], {} + try: + payload = {"query": (query or "").strip(), "docs": docs} + response = requests.post(self.service_url, json=payload, timeout=timeout_sec) + if response.status_code != 200: + logger.warning( + "Rerank service HTTP %s: %s", + response.status_code, + (response.text or "")[:200], + ) + return None, None + data = response.json() + scores = data.get("scores") + if not isinstance(scores, list): + return None, None + return scores, data.get("meta") or {} + except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectTimeout) as exc: + logger.warning( + "Rerank request timed out after %.1fs (docs=%d); returning ES order. 
%s", + timeout_sec, + len(docs), + exc, + ) + return None, None + except Exception as exc: + logger.warning("Rerank request failed: %s", exc, exc_info=True) + return None, None + + +def create_rerank_provider() -> HttpRerankProvider: + """Create rerank provider from services config.""" + cfg = get_rerank_config() + provider = (cfg.provider or "http").strip().lower() + + if provider == "vllm": + logger.warning("rerank provider 'vllm' is reserved, using HTTP.") + + url = get_rerank_service_url() + return HttpRerankProvider(service_url=url) diff --git a/providers/translation.py b/providers/translation.py new file mode 100644 index 0000000..277ea08 --- /dev/null +++ b/providers/translation.py @@ -0,0 +1,182 @@ +""" +Translation provider - direct (in-process) or HTTP service. +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional, Union + +from concurrent.futures import Future, ThreadPoolExecutor +import requests + +from config.services_config import get_translation_config, get_translation_base_url + +logger = logging.getLogger(__name__) + + +class HttpTranslationProvider: + """Translation via HTTP service.""" + + def __init__( + self, + base_url: str, + model: str = "qwen", + timeout_sec: float = 10.0, + translation_context: Optional[str] = None, + ): + self.base_url = (base_url or "").rstrip("/") + self.model = model or "qwen" + self.timeout_sec = float(timeout_sec or 10.0) + self.translation_context = translation_context or "e-commerce product search" + self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="http-translator") + + def _translate_once( + self, + text: str, + target_lang: str, + source_lang: Optional[str] = None, + ) -> Optional[str]: + if not text or not str(text).strip(): + return text + try: + url = f"{self.base_url}/translate" + payload = { + "text": text, + "target_lang": target_lang, + "source_lang": source_lang or "auto", + "model": self.model, + } + response = requests.post(url, 
json=payload, timeout=self.timeout_sec) + if response.status_code != 200: + logger.warning( + "HTTP translator failed: status=%s body=%s", + response.status_code, + (response.text or "")[:200], + ) + return None + data = response.json() + translated = data.get("translated_text") + return translated if translated is not None else None + except Exception as exc: + logger.warning("HTTP translator request failed: %s", exc, exc_info=True) + return None + + def translate( + self, + text: str, + target_lang: str, + source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None, + ) -> Optional[str]: + del context, prompt + result = self._translate_once(text=text, target_lang=target_lang, source_lang=source_lang) + return result if result is not None else text + + def translate_multi( + self, + text: str, + target_langs: List[str], + source_lang: Optional[str] = None, + context: Optional[str] = None, + async_mode: bool = True, + prompt: Optional[str] = None, + ) -> Dict[str, Optional[str]]: + del context, async_mode, prompt + out: Dict[str, Optional[str]] = {} + for lang in target_langs: + out[lang] = self.translate(text, lang, source_lang=source_lang) + return out + + def translate_multi_async( + self, + text: str, + target_langs: List[str], + source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None, + ) -> Dict[str, Union[str, Future]]: + del context, prompt + out: Dict[str, Union[str, Future]] = {} + for lang in target_langs: + out[lang] = self.executor.submit(self.translate, text, lang, source_lang) + return out + + def translate_for_indexing( + self, + text: str, + shop_language: str, + source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None, + index_languages: Optional[List[str]] = None, + ) -> Dict[str, Optional[str]]: + del context, prompt + langs = index_languages if index_languages else ["en", "zh"] + source = source_lang or shop_language or 
"auto" + out: Dict[str, Optional[str]] = {} + for lang in langs: + if lang == shop_language: + out[lang] = text + else: + out[lang] = self.translate(text, target_lang=lang, source_lang=source) + return out + + +def create_translation_provider(query_config: Any = None) -> Any: + """ + Create translation provider from services config. + + query_config: optional, for api_key/glossary_id/context (used by direct provider). + """ + cfg = get_translation_config() + provider = cfg.provider + pc = cfg.get_provider_cfg() + + if provider in ("direct", "local", "inprocess"): + from query.translator import Translator + model = pc.get("model") or "qwen" + qc = query_config or _empty_query_config() + return Translator( + model=model, + api_key=getattr(qc, "translation_api_key", None), + use_cache=True, + glossary_id=getattr(qc, "translation_glossary_id", None), + translation_context=getattr(qc, "translation_context", "e-commerce product search"), + ) + + if provider in ("http", "service"): + base_url = get_translation_base_url() + model = pc.get("model") or "qwen" + timeout = pc.get("timeout_sec", 10.0) + qc = query_config or _empty_query_config() + return HttpTranslationProvider( + base_url=base_url, + model=model, + timeout_sec=float(timeout), + translation_context=getattr(qc, "translation_context", "e-commerce product search"), + ) + + logger.warning( + "Unsupported translation provider '%s', fallback to direct.", + provider, + ) + from query.translator import Translator + qc = query_config or _empty_query_config() + return Translator( + model=pc.get("model") or "qwen", + api_key=getattr(qc, "translation_api_key", None), + use_cache=True, + glossary_id=getattr(qc, "translation_glossary_id", None), + translation_context=getattr(qc, "translation_context", "e-commerce product search"), + ) + + +def _empty_query_config() -> Any: + """Minimal object with default translation attrs.""" + class _QC: + translation_api_key = None + translation_glossary_id = None + translation_context = 
"e-commerce product search" + return _QC() diff --git a/query/__init__.py b/query/__init__.py index b732adf..bb8fa61 100644 --- a/query/__init__.py +++ b/query/__init__.py @@ -2,12 +2,15 @@ from .language_detector import LanguageDetector from .translator import Translator +from .translation_client import HttpTranslationClient, create_translation_client from .query_rewriter import QueryRewriter, QueryNormalizer from .query_parser import QueryParser, ParsedQuery __all__ = [ 'LanguageDetector', 'Translator', + 'HttpTranslationClient', + 'create_translation_client', 'QueryRewriter', 'QueryNormalizer', 'QueryParser', diff --git a/query/query_parser.py b/query/query_parser.py index e042532..d363045 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -13,7 +13,7 @@ from concurrent.futures import Future, ThreadPoolExecutor, as_completed from embeddings import BgeEncoder from config import SearchConfig from .language_detector import LanguageDetector -from .translator import Translator +from providers import create_translation_provider from .query_rewriter import QueryRewriter, QueryNormalizer logger = logging.getLogger(__name__) @@ -78,7 +78,7 @@ class QueryParser: self, config: SearchConfig, text_encoder: Optional[BgeEncoder] = None, - translator: Optional[Translator] = None + translator: Optional[Any] = None ): """ Initialize query parser. 
@@ -123,16 +123,13 @@ class QueryParser: return self._text_encoder @property - def translator(self) -> Translator: + def translator(self) -> Any: """Lazy load translator.""" if self._translator is None: - logger.info("Initializing translator (lazy load)...") - self._translator = Translator( - api_key=self.config.query_config.translation_api_key, - use_cache=True, - glossary_id=self.config.query_config.translation_glossary_id, - translation_context=self.config.query_config.translation_context - ) + from config.services_config import get_translation_config + cfg = get_translation_config() + logger.info("Initializing translator (provider=%s)...", cfg.provider) + self._translator = create_translation_provider(self.config.query_config) return self._translator def _simple_tokenize(self, text: str) -> List[str]: diff --git a/query/translation_client.py b/query/translation_client.py new file mode 100644 index 0000000..7b0caaa --- /dev/null +++ b/query/translation_client.py @@ -0,0 +1,20 @@ +""" +Translation client - delegates to providers. + +Deprecated: use providers.create_translation_provider() instead. +Kept for backward compatibility. +""" + +from __future__ import annotations + +from typing import Any + +from providers.translation import ( + HttpTranslationProvider as HttpTranslationClient, + create_translation_provider, +) + + +def create_translation_client(query_config: Any) -> Any: + """Backward compat: delegate to create_translation_provider.""" + return create_translation_provider(query_config) diff --git a/reranker/README.md b/reranker/README.md index 268534a..7de460b 100644 --- a/reranker/README.md +++ b/reranker/README.md @@ -1,4 +1,8 @@ -# Reranker Service (BGE v2 m3) +# Reranker 模块 + +**请求示例**见 `docs/QUICKSTART.md` §3.5。 + +--- A minimal, production-ready reranker service based on **BAAI/bge-reranker-v2-m3**. 
diff --git a/search/rerank_client.py b/search/rerank_client.py index aca671f..bdb414d 100644 --- a/search/rerank_client.py +++ b/search/rerank_client.py @@ -8,9 +8,10 @@ """ from typing import Dict, Any, List, Optional, Tuple -import os import logging +from providers import create_rerank_provider + logger = logging.getLogger(__name__) # 默认融合权重:ES 归一化分数权重、重排分数权重(相加为 1) @@ -78,46 +79,17 @@ def build_docs_from_hits( def call_rerank_service( query: str, docs: List[str], - service_url: str, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> Tuple[Optional[List[float]], Optional[Dict[str, Any]]]: """ 调用重排服务 POST /rerank,返回分数列表与 meta。 - - Args: - query: 搜索查询字符串 - docs: 文档文本列表(与 ES hits 顺序一致) - service_url: 完整 URL,如 http://127.0.0.1:6007/rerank - timeout_sec: 请求超时秒数 - - Returns: - (scores, meta):成功时 scores 与 docs 等长,meta 为服务返回的 meta; - 失败时返回 (None, None) + Provider 和 URL 从 services_config 读取。 """ if not docs: return [], {} try: - import requests - payload = {"query": (query or "").strip(), "docs": docs} - response = requests.post(service_url, json=payload, timeout=timeout_sec) - if response.status_code != 200: - logger.warning( - "Rerank service HTTP %s: %s", - response.status_code, - (response.text or "")[:200], - ) - return None, None - data = response.json() - scores = data.get("scores") - if not isinstance(scores, list): - return None, None - return scores, data.get("meta") or {} - except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectTimeout) as e: - logger.warning( - "Rerank request timed out after %.1fs (docs=%d); returning ES order. 
%s", - timeout_sec, len(docs), e, - ) - return None, None + client = create_rerank_provider() + return client.rerank(query=query, docs=docs, timeout_sec=timeout_sec) except Exception as e: logger.warning("Rerank request failed: %s", e, exc_info=True) return None, None @@ -199,7 +171,6 @@ def run_rerank( query: str, es_response: Dict[str, Any], language: str = "zh", - service_url: Optional[str] = None, timeout_sec: float = DEFAULT_TIMEOUT_SEC, weight_es: float = DEFAULT_WEIGHT_ES, weight_ai: float = DEFAULT_WEIGHT_AI, @@ -208,41 +179,19 @@ def run_rerank( ) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]], List[Dict[str, Any]]]: """ 完整重排流程:从 es_response 取 hits -> 构造 docs -> 调服务 -> 融合分数并重排 -> 更新 max_score。 - - Args: - query: 搜索查询 - es_response: ES 原始响应(其中的 hits["hits"] 会被原地修改) - language: 文档文本使用的语言 - service_url: 重排服务 URL,为 None 时使用默认 127.0.0.1:6007 - timeout_sec: 请求超时 - weight_es: ES 分数权重 - weight_ai: 重排分数权重 - - Returns: - (es_response, rerank_meta, fused_debug): - - es_response: 已更新 hits 与 max_score 的响应(同一引用) - - rerank_meta: 重排服务返回的 meta,失败时为 None - - fused_debug: 每条文档的融合信息,供 debug 使用 + Provider 和 URL 从 services_config 读取。 """ - try: - from reranker.config import CONFIG as RERANKER_CONFIG - except Exception: - RERANKER_CONFIG = None - - url = service_url or os.getenv("RERANKER_SERVICE_URL") - if not url and RERANKER_CONFIG is not None: - url = f"http://127.0.0.1:{RERANKER_CONFIG.PORT}/rerank" - if not url: - url = "http://127.0.0.1:6007/rerank" - hits = es_response.get("hits", {}).get("hits") or [] if not hits: return es_response, None, [] - # Apply query template (supports {query}) query_text = str(rerank_query_template).format_map({"query": query}) docs = build_docs_from_hits(hits, language=language, doc_template=rerank_doc_template) - scores, meta = call_rerank_service(query_text, docs, url, timeout_sec=timeout_sec) + scores, meta = call_rerank_service( + query_text, + docs, + timeout_sec=timeout_sec, + ) if scores is None or len(scores) != len(hits): return 
es_response, None, [] diff --git a/search/searcher.py b/search/searcher.py index 3fca1f9..93dbfc4 100644 --- a/search/searcher.py +++ b/search/searcher.py @@ -392,7 +392,6 @@ class Searcher: query=rerank_query, es_response=es_response, language=language, - service_url=rc.service_url, timeout_sec=rc.timeout_sec, weight_es=rc.weight_es, weight_ai=rc.weight_ai, @@ -401,11 +400,8 @@ class Searcher: ) if rerank_meta is not None: - rerank_url = ( - rc.service_url - or os.getenv("RERANKER_SERVICE_URL") - or "http://127.0.0.1:6007/rerank" - ) + from config.services_config import get_rerank_service_url + rerank_url = get_rerank_service_url() context.metadata.setdefault("rerank_info", {}) context.metadata["rerank_info"].update({ "service_url": rerank_url, diff --git a/start_reranker.sh b/start_reranker.sh deleted file mode 100644 index ae86205..0000000 --- a/start_reranker.sh +++ /dev/null @@ -1,3 +0,0 @@ - -uvicorn reranker.server:app --host 0.0.0.0 --port 6007 - diff --git a/tests/test_suggestions.py b/tests/test_suggestions.py index ece27a6..2e24e7a 100644 --- a/tests/test_suggestions.py +++ b/tests/test_suggestions.py @@ -122,11 +122,12 @@ def test_resolve_query_language_uses_request_params_when_log_missing(): @pytest.mark.unit def test_resolve_query_language_fallback_to_primary(): - """当无任何语言线索时,应回落到租户 primary_language。""" + """当无任何语言线索时(无 script 检测),应回落到租户 primary_language。""" fake_es = FakeESClient() builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) + # "123" 无 CJK/Latin 等 script,_detect_script_language 返回 None lang, conf, source, conflict = builder._resolve_query_language( - query="some text", + query="123", log_language=None, request_params=None, index_languages=["zh", "en"], -- libgit2 0.21.2