diff --git a/api/translator_app.py b/api/translator_app.py index 57a5f97..a93aeaf 100644 --- a/api/translator_app.py +++ b/api/translator_app.py @@ -83,24 +83,17 @@ Start the service: uvicorn api.translator_app:app --host 0.0.0.0 --port 6006 --reload """ -import os -import sys import logging import argparse import uvicorn -from typing import Dict, List, Optional, Sequence, Union +from typing import Dict, List, Optional, Union from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field -# Add parent directory to path -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from query.qwen_mt_translate import Translator -from query.llm_translate import LLMTranslatorProvider -from query.deepl_provider import DeepLProvider from config.services_config import get_translation_config +from translation.service import TranslationService # Configure logging logging.basicConfig( @@ -109,54 +102,14 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -# Global translator instances cache (keyed by model) -_translators: Dict[str, object] = {} - +_translation_service: Optional[TranslationService] = None -def _resolve_default_model() -> str: - """ - Resolve translator model from services.translation config first. 
- Priority: - 1) TRANSLATION_MODEL env (explicit runtime override) - 2) services.translation.provider + providers..model - 3) qwen-mt - """ - env_model = (os.getenv("TRANSLATION_MODEL") or "").strip() - if env_model: - return env_model - try: - cfg = get_translation_config() - provider = (cfg.provider or "").strip().lower() - provider_cfg = cfg.get_provider_cfg() if hasattr(cfg, "get_provider_cfg") else {} - model = (provider_cfg.get("model") or "").strip().lower() if isinstance(provider_cfg, dict) else "" - if provider == "llm": - return "llm" - if provider in {"qwen-mt", "direct", "http"}: - return model or "qwen-mt" - if provider == "deepl": - return "deepl" - except Exception: - pass - return "qwen-mt" - - -def get_translator(model: str = "qwen") -> object: - """Get or create translator instance for the specified model.""" - global _translators - if model not in _translators: - logger.info(f"Initializing translator with model: {model}...") - normalized = (model or "qwen").strip().lower() - if normalized in {"qwen", "qwen-mt", "qwen-mt-flash", "qwen-mt-flush"}: - _translators[model] = Translator(model=normalized, use_cache=True, timeout=10) - elif normalized == "deepl": - _translators[model] = DeepLProvider(api_key=None, timeout=10.0) - elif normalized == "llm": - _translators[model] = LLMTranslatorProvider() - else: - raise ValueError(f"Unsupported model: {model}") - logger.info(f"Translator initialized with model: {model}") - return _translators[model] +def get_translation_service() -> TranslationService: + global _translation_service + if _translation_service is None: + _translation_service = TranslationService(get_translation_config()) + return _translation_service # Request/Response models @@ -166,7 +119,8 @@ class TranslationRequest(BaseModel): target_lang: str = Field(..., description="Target language code (zh, en, ru, etc.)") source_lang: Optional[str] = Field(None, description="Source language code (optional, auto-detect if not provided)") model: 
Optional[str] = Field(None, description="Translation model: qwen-mt | deepl | llm") - context: Optional[str] = Field(None, description="Optional translation scene or context") + scene: Optional[str] = Field(None, description="Translation scene, paired with model routing") + context: Optional[str] = Field(None, description="Deprecated alias of scene") prompt: Optional[str] = Field(None, description="Optional prompt override") class Config: @@ -176,7 +130,7 @@ class TranslationRequest(BaseModel): "target_lang": "en", "source_lang": "zh", "model": "llm", - "context": "sku_name" + "scene": "sku_name" } } @@ -192,6 +146,7 @@ class TranslationResponse(BaseModel): ) status: str = Field(..., description="Translation status") model: str = Field(..., description="Translation model used") + scene: str = Field(..., description="Translation scene used") # Create FastAPI app @@ -217,10 +172,13 @@ app.add_middleware( async def startup_event(): """Initialize translator on startup.""" logger.info("Starting Translation Service API on port 6006") - default_model = _resolve_default_model() try: - get_translator(model=default_model) - logger.info(f"Translation service ready with default model: {default_model}") + service = get_translation_service() + logger.info( + "Translation service ready | default_model=%s available_models=%s", + service.config.default_model, + service.available_models, + ) except Exception as e: logger.error(f"Failed to initialize translator: {e}", exc_info=True) raise @@ -230,17 +188,14 @@ async def startup_event(): async def health_check(): """Health check endpoint.""" try: - # 仅做轻量级本地检查,避免在健康检查中触发潜在的阻塞初始化或外部依赖 - default_model = _resolve_default_model() - # 如果启动事件成功,默认模型通常会已经初始化到缓存中 - translator = _translators.get(default_model) or next(iter(_translators.values()), None) + service = get_translation_service() return { "status": "healthy", "service": "translation", - "default_model": default_model, - "available_models": list(_translators.keys()), - 
"translator_initialized": translator is not None, - "cache_enabled": bool(getattr(translator, "use_cache", False)) + "default_model": service.config.default_model, + "default_scene": service.config.default_scene, + "available_models": service.available_models, + "enabled_capabilities": service.config.enabled_models, } except Exception as e: logger.error(f"Health check failed: {e}") @@ -283,27 +238,22 @@ async def translate(request: TranslationRequest): detail="target_lang is required" ) - # Validate model parameter - model = request.model.lower() if request.model else _resolve_default_model().lower() - if model not in ["qwen", "qwen-mt", "deepl", "llm"]: - raise HTTPException( - status_code=400, - detail="Invalid model. Supported models: 'qwen-mt', 'deepl', 'llm'" - ) - try: - # Get translator instance for the specified model - translator = get_translator(model=model) + service = get_translation_service() + scene = (request.scene or request.context or service.config.default_scene).strip() or "general" + model = service.config.normalize_model_name(request.model or service.config.default_model) + translator = service.get_backend(model) raw_text = request.text # 如果是列表,并且底层 provider 声明支持 batch,则直接传 list if isinstance(raw_text, list) and getattr(translator, "supports_batch", False): try: - translated_list = translator.translate( + translated_list = service.translate( text=raw_text, target_lang=request.target_lang, source_lang=request.source_lang, - context=request.context, + model=model, + scene=scene, prompt=request.prompt, ) except Exception as exc: @@ -334,6 +284,7 @@ async def translate(request: TranslationRequest): translated_text=normalized, status="success", model=str(getattr(translator, "model", model)), + scene=scene, ) # 否则:统一走逐条拆分逻辑(包括不支持 batch 的 provider) @@ -345,11 +296,12 @@ async def translate(request: TranslationRequest): results.append(item) # type: ignore[arg-type] continue try: - out = translator.translate( + out = service.translate( text=str(item), 
target_lang=request.target_lang, source_lang=request.source_lang, - context=request.context, + model=model, + scene=scene, prompt=request.prompt, ) except Exception as exc: @@ -365,14 +317,16 @@ async def translate(request: TranslationRequest): translated_text=results, status="success", model=str(getattr(translator, "model", model)), + scene=scene, ) # 单文本模式:保持原有严格失败语义 - translated_text = translator.translate( + translated_text = service.translate( text=raw_text, target_lang=request.target_lang, source_lang=request.source_lang, - context=request.context, + model=model, + scene=scene, prompt=request.prompt, ) @@ -388,7 +342,8 @@ async def translate(request: TranslationRequest): source_lang=request.source_lang, translated_text=translated_text, status="success", - model=str(getattr(translator, "model", model)) + model=str(getattr(translator, "model", model)), + scene=scene, ) except HTTPException: diff --git a/config/config.yaml b/config/config.yaml index ffed5e8..a421abb 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -107,9 +107,9 @@ rerank: # 可扩展服务/provider 注册表(单一配置源) services: translation: - provider: "llm" # qwen-mt | deepl | http | llm - base_url: "http://127.0.0.1:6006" - model: "qwen-flash" + service_url: "http://127.0.0.1:6006" + default_model: "llm" + default_scene: "general" timeout_sec: 10.0 cache: enabled: true @@ -119,20 +119,21 @@ services: key_include_context: true key_include_prompt: true key_include_source_lang: true - providers: + capabilities: qwen-mt: - model: "qwen-mt-flush" - http: - base_url: "http://127.0.0.1:6006" - model: "qwen-mt-flush" + enabled: true + model: "qwen-mt-flash" timeout_sec: 10.0 + use_cache: true llm: + enabled: true model: "qwen-flash" # 可选:覆盖 DashScope 兼容模式的 Endpoint 与超时 # base_url 留空则使用 DASHSCOPE_BASE_URL 或默认地域 base_url: "" timeout_sec: 30.0 deepl: + enabled: false model: "deepl" timeout_sec: 10.0 # 可选:用于术语表翻译(由 query_config.translation_glossary_id 衔接) diff --git a/config/env_config.py b/config/env_config.py 
index 63774ff..50cad06 100644 --- a/config/env_config.py +++ b/config/env_config.py @@ -65,8 +65,9 @@ EMBEDDING_HOST = os.getenv('EMBEDDING_HOST', '127.0.0.1') EMBEDDING_PORT = int(os.getenv('EMBEDDING_PORT', 6005)) TRANSLATION_HOST = os.getenv('TRANSLATION_HOST', '127.0.0.1') TRANSLATION_PORT = int(os.getenv('TRANSLATION_PORT', 6006)) -TRANSLATION_PROVIDER = os.getenv('TRANSLATION_PROVIDER', 'direct') -TRANSLATION_MODEL = os.getenv('TRANSLATION_MODEL', 'qwen') +TRANSLATION_PROVIDER = os.getenv('TRANSLATION_PROVIDER', 'direct') # deprecated +TRANSLATION_MODEL = os.getenv('TRANSLATION_MODEL', 'llm') +TRANSLATION_SCENE = os.getenv('TRANSLATION_SCENE', 'general') RERANKER_HOST = os.getenv('RERANKER_HOST', '127.0.0.1') RERANKER_PORT = int(os.getenv('RERANKER_PORT', 6007)) RERANK_PROVIDER = os.getenv('RERANK_PROVIDER', 'http') diff --git a/config/services_config.py b/config/services_config.py index 659e17e..cce5e80 100644 --- a/config/services_config.py +++ b/config/services_config.py @@ -1,8 +1,9 @@ """ -Services configuration - single source for translation, embedding, rerank providers. +Services configuration - single source for translation, embedding, rerank. -All provider selection and endpoint resolution is centralized here. -Priority: env vars > config.yaml. 
+Translation is modeled as: +- one translator service endpoint used by business callers +- multiple translation capabilities loaded inside the translator service """ from __future__ import annotations @@ -11,25 +12,60 @@ import os from dataclasses import dataclass, field from functools import lru_cache from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional import yaml @dataclass class ServiceConfig: - """Config for one capability (translation/embedding/rerank).""" + """Config for one capability (embedding/rerank).""" + provider: str providers: Dict[str, Any] = field(default_factory=dict) def get_provider_cfg(self) -> Dict[str, Any]: - """Get config for current provider.""" p = (self.provider or "").strip().lower() return self.providers.get(p, {}) if isinstance(self.providers, dict) else {} +@dataclass +class TranslationServiceConfig: + """Dedicated config model for the translation service.""" + + service_url: str + timeout_sec: float + default_model: str + default_scene: str + capabilities: Dict[str, Dict[str, Any]] = field(default_factory=dict) + cache: Dict[str, Any] = field(default_factory=dict) + + def normalize_model_name(self, model: Optional[str]) -> str: + normalized = str(model or self.default_model).strip().lower() + aliases = { + "qwen": "qwen-mt", + "qwen-mt-flash": "qwen-mt", + "qwen-mt-flush": "qwen-mt", + "service": self.default_model, + "default": self.default_model, + } + return aliases.get(normalized, normalized) + + @property + def enabled_models(self) -> List[str]: + items: List[str] = [] + for name, cfg in self.capabilities.items(): + if isinstance(cfg, dict) and bool(cfg.get("enabled", False)): + items.append(str(name).strip().lower()) + return items + + def get_capability_cfg(self, model: Optional[str]) -> Dict[str, Any]: + normalized = self.normalize_model_name(model) + value = self.capabilities.get(normalized) + return dict(value) if isinstance(value, dict) else {} + + def 
_load_services_raw(config_path: Optional[Path] = None) -> Dict[str, Any]: - """Load services block from config.yaml.""" if config_path is None: config_path = Path(__file__).parent / "config.yaml" path = Path(config_path) @@ -48,11 +84,7 @@ def _load_services_raw(config_path: Optional[Path] = None) -> Dict[str, Any]: return services -def _resolve_provider_name( - env_name: str, - config_provider: Any, - capability: str, -) -> str: +def _resolve_provider_name(env_name: str, config_provider: Any, capability: str) -> str: provider = os.getenv(env_name) or config_provider if not provider: raise ValueError( @@ -62,27 +94,70 @@ def _resolve_provider_name( return str(provider).strip().lower() -def _resolve_translation() -> ServiceConfig: +def _resolve_translation() -> TranslationServiceConfig: raw = _load_services_raw() cfg = raw.get("translation", {}) if isinstance(raw.get("translation"), dict) else {} - providers = cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} - provider = _resolve_provider_name( - env_name="TRANSLATION_PROVIDER", - config_provider=cfg.get("provider"), - capability="translation", + service_url = ( + os.getenv("TRANSLATION_SERVICE_URL") + or cfg.get("service_url") + or cfg.get("base_url") + or "http://127.0.0.1:6006" ) - if provider not in ("qwen-mt", "deepl", "direct", "local", "inprocess", "http", "service", "llm"): - raise ValueError(f"Unsupported translation provider: {provider}") + timeout_sec = float(os.getenv("TRANSLATION_TIMEOUT_SEC") or cfg.get("timeout_sec") or 10.0) + + raw_capabilities = cfg.get("capabilities") + if not isinstance(raw_capabilities, dict): + raw_capabilities = cfg.get("providers") + capabilities = raw_capabilities if isinstance(raw_capabilities, dict) else {} + + default_model = str( + os.getenv("TRANSLATION_MODEL") + or cfg.get("default_model") + or cfg.get("provider") + or "qwen-mt" + ).strip().lower() + default_scene = str( + os.getenv("TRANSLATION_SCENE") + or cfg.get("default_scene") + or 
"general" + ).strip() or "general" + + resolved_capabilities: Dict[str, Dict[str, Any]] = {} + for name, value in capabilities.items(): + if not isinstance(value, dict): + continue + normalized = str(name or "").strip().lower() + if not normalized: + continue + copied = dict(value) + copied.setdefault("enabled", normalized == default_model) + resolved_capabilities[normalized] = copied + + aliases = { + "qwen": "qwen-mt", + "qwen-mt-flash": "qwen-mt", + "qwen-mt-flush": "qwen-mt", + } + default_model = aliases.get(default_model, default_model) - # Env override for http base_url - env_url = os.getenv("TRANSLATION_SERVICE_URL") - if env_url and provider in ("http", "service"): - providers = dict(providers) - providers["http"] = dict(providers.get("http", {})) - providers["http"]["base_url"] = env_url.rstrip("/") + if default_model not in resolved_capabilities: + raise ValueError( + f"services.translation.default_model '{default_model}' is not defined in capabilities" + ) + if not bool(resolved_capabilities[default_model].get("enabled", False)): + resolved_capabilities[default_model]["enabled"] = True - return ServiceConfig(provider=provider, providers=providers) + cache_cfg = cfg.get("cache", {}) if isinstance(cfg.get("cache"), dict) else {} + + return TranslationServiceConfig( + service_url=str(service_url).rstrip("/"), + timeout_sec=timeout_sec, + default_model=default_model, + default_scene=default_scene, + capabilities=resolved_capabilities, + cache=cache_cfg, + ) def _resolve_embedding() -> ServiceConfig: @@ -134,18 +209,10 @@ def _resolve_rerank() -> ServiceConfig: def get_rerank_backend_config() -> tuple[str, dict]: - """ - Resolve reranker backend name and config for the reranker service process. - Returns (backend_name, backend_cfg). - Env RERANK_BACKEND overrides config. 
- """ raw = _load_services_raw() cfg = raw.get("rerank", {}) if isinstance(raw.get("rerank"), dict) else {} backends = cfg.get("backends", {}) if isinstance(cfg.get("backends"), dict) else {} - name = ( - os.getenv("RERANK_BACKEND") - or cfg.get("backend") - ) + name = os.getenv("RERANK_BACKEND") or cfg.get("backend") if not name: raise ValueError("services.rerank.backend is required (or env RERANK_BACKEND)") name = str(name).strip().lower() @@ -156,18 +223,10 @@ def get_rerank_backend_config() -> tuple[str, dict]: def get_embedding_backend_config() -> tuple[str, dict]: - """ - Resolve embedding backend name and config for the embedding service process. - Returns (backend_name, backend_cfg). - Env EMBEDDING_BACKEND overrides config. - """ raw = _load_services_raw() cfg = raw.get("embedding", {}) if isinstance(raw.get("embedding"), dict) else {} backends = cfg.get("backends", {}) if isinstance(cfg.get("backends"), dict) else {} - name = ( - os.getenv("EMBEDDING_BACKEND") - or cfg.get("backend") - ) + name = os.getenv("EMBEDDING_BACKEND") or cfg.get("backend") if not name: raise ValueError("services.embedding.backend is required (or env EMBEDDING_BACKEND)") name = str(name).strip().lower() @@ -178,44 +237,26 @@ def get_embedding_backend_config() -> tuple[str, dict]: @lru_cache(maxsize=1) -def get_translation_config() -> ServiceConfig: - """Get translation service config.""" +def get_translation_config() -> TranslationServiceConfig: return _resolve_translation() @lru_cache(maxsize=1) def get_embedding_config() -> ServiceConfig: - """Get embedding service config.""" return _resolve_embedding() @lru_cache(maxsize=1) def get_rerank_config() -> ServiceConfig: - """Get rerank service config.""" return _resolve_rerank() def get_translation_base_url() -> str: - """Resolve translation HTTP base URL (for http provider).""" - base = ( - os.getenv("TRANSLATION_SERVICE_URL") - or get_translation_config().providers.get("http", {}).get("base_url") - ) - if not base: - raise 
ValueError("Translation HTTP base_url is not configured") - return str(base).rstrip("/") + return get_translation_config().service_url def get_translation_cache_config() -> Dict[str, Any]: - """ - Resolve translation cache policy from services.translation.cache. - - All translation cache key/TTL behavior should be configured in config.yaml, - not hardcoded in code. - """ - raw = _load_services_raw() - cfg = raw.get("translation", {}) if isinstance(raw.get("translation"), dict) else {} - cache_cfg = cfg.get("cache", {}) if isinstance(cfg.get("cache"), dict) else {} + cache_cfg = get_translation_config().cache return { "enabled": bool(cache_cfg.get("enabled", True)), "key_prefix": str(cache_cfg.get("key_prefix", "trans:v2")), @@ -228,31 +269,23 @@ def get_translation_cache_config() -> Dict[str, Any]: def get_embedding_base_url() -> str: - """Resolve embedding HTTP base URL.""" - base = ( - os.getenv("EMBEDDING_SERVICE_URL") - or get_embedding_config().providers.get("http", {}).get("base_url") - ) + base = os.getenv("EMBEDDING_SERVICE_URL") or get_embedding_config().providers.get("http", {}).get("base_url") if not base: raise ValueError("Embedding HTTP base_url is not configured") return str(base).rstrip("/") -def get_rerank_service_url() -> str: - """Resolve rerank service URL (full path including /rerank).""" +def get_rerank_base_url() -> str: base = ( os.getenv("RERANKER_SERVICE_URL") or get_rerank_config().providers.get("http", {}).get("service_url") or get_rerank_config().providers.get("http", {}).get("base_url") ) if not base: - raise ValueError("Rerank HTTP service_url/base_url is not configured") - base = str(base).rstrip("/") - return base if base.endswith("/rerank") else f"{base}/rerank" + raise ValueError("Rerank HTTP base_url is not configured") + return str(base).rstrip("/") -def clear_services_cache() -> None: - """Clear cached config (for tests).""" - get_translation_config.cache_clear() - get_embedding_config.cache_clear() - 
get_rerank_config.cache_clear() +def get_rerank_service_url() -> str: + """Backward-compatible alias.""" + return get_rerank_base_url() diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index 67166ee..736bc2c 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -166,14 +166,14 @@ docs/ # 文档(含本指南) ### 4.8 providers -- **职责**:统一“能力”的调用方式:翻译、向量、重排均通过工厂函数(如 `create_translation_provider()`、`create_rerank_provider()`、`create_embedding_provider()`)获取实现,配置来自 `config/services_config`(即 `config.yaml` 的 `services` + 环境变量)。 -- **原则**:业务代码只依赖 Provider 接口,不依赖具体 URL 或后端类型;新增调用方式(如新 Provider 类型)在对应 `providers/.py` 中实现并在工厂中注册。 +- **职责**:统一“能力”的调用方式。向量、重排仍是标准 provider 工厂;翻译侧的 `create_translation_provider()` 现在固定返回 translator service client,由 6006 服务统一承接后端选择与路由。 +- **原则**:业务代码只依赖调用接口,不依赖具体 URL 或服务内后端类型;翻译能力新增时优先扩展 `translation/backends/` 与 `services.translation.capabilities`,而不是在业务侧新增 provider 分支。 - **详见**:本指南 §7.2;[QUICKSTART.md](./QUICKSTART.md) §3。 -补充约定(翻译 provider): +补充约定(翻译 client): - `translate(text=...)` 支持 `str` 与 `List[str]` 两种输入;当输入为列表时,输出必须与输入 **等长且顺序对应**,失败位置为 `None`(HTTP JSON 表现为 `null`)。 -- provider 可暴露 `supports_batch: bool`(property)用于标识其是否支持直接批量调用;上层在处理 `text` 为列表时可优先走 batch,否则逐条拆分调用。 +- client / backend 可暴露 `supports_batch: bool`(property)用于标识其是否支持直接批量调用;上层在处理 `text` 为列表时可优先走 batch,否则逐条拆分调用。 ### 4.9 suggestion @@ -197,18 +197,19 @@ docs/ # 文档(含本指南) ### 5.2 配置驱动 - 搜索行为(字段权重、搜索域、排序、function_score、重排融合参数等)来自 `config/config.yaml`,由 `ConfigLoader` 加载。 -- 能力访问(翻译/向量/重排的 provider、URL、后端类型)来自 `config.yaml` 的 `services` 块及环境变量,由 `config/services_config` 解析。 +- 能力访问来自 `config.yaml` 的 `services` 块及环境变量,由 `config/services_config` 解析。 +- 其中翻译单独采用“service + capabilities”模型:调用方只配 `service_url` / `default_model` / `default_scene`,服务内通过 `capabilities` 控制启用哪些翻译能力。 - 新增开关或参数时,优先在现有 config 结构下扩展,避免新增散落配置文件。 ### 5.3 单一配置源与优先级 - 同一类配置只在一个地方定义默认值;覆盖顺序约定为:**环境变量 > config 文件**。 -- 服务 URL、后端类型等均在 `services.` 下配置;环境变量用于部署态覆盖(如 
`RERANKER_SERVICE_URL`、`RERANK_BACKEND`)。 +- 服务 URL、后端类型等均在 `services.` 下配置;环境变量用于部署态覆盖(如 `TRANSLATION_SERVICE_URL`、`TRANSLATION_MODEL`、`RERANKER_SERVICE_URL`、`RERANK_BACKEND`)。 -### 5.4 调用方与实现解耦(Provider + Backend) +### 5.4 调用方与实现解耦(Client + Backend) -- **调用方**:通过 Provider(如 `HttpRerankProvider`)访问能力,不依赖具体 URL 或服务内实现。 -- **服务内**:通过“后端”实现具体推理(如 BGE 与 Qwen3-vLLM);后端实现协议、在配置与工厂中注册即可插拔。 +- **调用方**:通过 client/provider 访问能力,不依赖具体 URL 或服务内实现;翻译调用方统一连 translator service。 +- **服务内**:通过“后端”实现具体推理(如 qwen-mt、DeepL、LLM、本地模型;或 BGE 与 Qwen3-vLLM);后端实现协议、在配置与工厂中注册即可插拔。 - 新增“一种调用方式”在 providers 中扩展;新增“一种推理实现”在对应服务的 backends 中扩展,并遵循本指南 §7。 ### 5.5 协议契约 @@ -246,7 +247,7 @@ docs/ # 文档(含本指南) ### 6.1 主配置文件 -- **config/config.yaml**:搜索行为(field_boosts、query_config.search_fields、query_config.text_query_strategy,含翻译失败时的原文兜底 boost、ranking、function_score、rerank 融合参数)、SPU 配置、**services**(翻译/向量/重排的 provider 与 backends)、tenant_config 等。 +- **config/config.yaml**:搜索行为(field_boosts、query_config.search_fields、query_config.text_query_strategy,含翻译失败时的原文兜底 boost、ranking、function_score、rerank 融合参数)、SPU 配置、**services**(翻译 service 与 capabilities、向量/重排的 provider 与 backends)、tenant_config 等。 - **.env**:敏感信息与部署态变量(DB、ES、Redis、API Key、端口等);不提交敏感值,可提供 `.env.example` 模板。 ### 6.2 services 块结构(能力统一约定) @@ -265,14 +266,31 @@ services: qwen3_vllm: { ... 
} ``` +翻译是特例,结构为: + +```yaml +services: + translation: + service_url: "http://127.0.0.1:6006" + default_model: "llm" + default_scene: "general" + timeout_sec: 10.0 + capabilities: + llm: { enabled: true, model: "qwen-flash" } + qwen-mt: { enabled: true, model: "qwen-mt-flash" } + deepl: { enabled: false, timeout_sec: 10.0 } +``` + - **provider**:调用方如何访问(如 HTTP)。 - **backend / backends**:当能力由本仓库内服务提供时,该服务加载哪个后端及参数。 +- **translation.service_url**:业务侧统一调用的翻译服务地址。 +- **translation.capabilities**:翻译服务内部可启用的能力注册表。 - 解析入口:`config/services_config.py` 的 `get_*_config()` 及 `get_*_base_url()` / `get_rerank_service_url()` 等。 ### 6.3 环境变量(常用) - 能力 URL:`TRANSLATION_SERVICE_URL`、`EMBEDDING_SERVICE_URL`、`RERANKER_SERVICE_URL` -- 能力选择:`TRANSLATION_PROVIDER`、`EMBEDDING_PROVIDER`、`EMBEDDING_BACKEND`、`RERANK_PROVIDER`、`RERANK_BACKEND` +- 能力选择:`TRANSLATION_MODEL`、`TRANSLATION_SCENE`、`EMBEDDING_PROVIDER`、`EMBEDDING_BACKEND`、`RERANK_PROVIDER`、`RERANK_BACKEND` - 环境与索引:`ES_HOST`、`ES_INDEX_NAMESPACE`、`RUNTIME_ENV`、DB 与 Redis 等 详见 [QUICKSTART.md](./QUICKSTART.md) §1.6(.env 与生产凭证)、[Usage-Guide.md](./Usage-Guide.md)。 @@ -293,6 +311,13 @@ services: 3. 在 `config/config.yaml` 的 `services..providers` 下补充参数。 4. 不修改业务调用方(search/query/indexer 仍通过工厂获取实例)。 +翻译不再按这套扩展。新增翻译能力时: + +1. 在 `translation/backends/` 中实现新 backend。 +2. 在 `translation/service.py` 中注册工厂。 +3. 在 `services.translation.capabilities.` 下增加配置,并用 `enabled` 控制是否启用。 +4. 业务调用方保持不变,仍只通过 `create_translation_provider()` 调 6006。 + ### 7.3 新增 Backend(推理实现) 1. **实现协议**:在对应目录(如 `reranker/backends/`、`embeddings/`)实现满足协议接口的类。 diff --git a/docs/TODO.txt b/docs/TODO.txt index 3af2428..1f04326 100644 --- a/docs/TODO.txt +++ b/docs/TODO.txt @@ -84,7 +84,7 @@ When sorting on a field, scores are not computed. By setting track_scores to tru provider backend 两者的关系,如何配合。 translator的设计 : -QueryParser 里面 并不是调用的6006,目前是把6006做了一个provider,然后translate的总体配置又有6006的baseurl,很混乱!!! 
+QueryParser 里面 并不是调用的6006,目前是把6006做了一个provider,然后translate的总体配置又有6006的baseurl,很混乱。 config.yaml 里面的 翻译的配置 不是“6006 专用配置”,而是搜索服务的 6006本来之前是做一个provider。 diff --git a/docs/翻译模块说明.md b/docs/翻译模块说明.md index 5be5c3e..8b3c1e6 100644 --- a/docs/翻译模块说明.md +++ b/docs/翻译模块说明.md @@ -12,7 +12,9 @@ DASHSCOPE_API_KEY=sk-xxx DEEPL_AUTH_KEY=xxx # 可选 -TRANSLATION_MODEL=qwen # 或 deepl +TRANSLATION_SERVICE_URL=http://127.0.0.1:6006 +TRANSLATION_MODEL=llm # 默认能力;也可传 qwen-mt / deepl +TRANSLATION_SCENE=general ``` > **重要限速说明(Qwen 机翻)** @@ -21,9 +23,34 @@ TRANSLATION_MODEL=qwen # 或 deepl > - 高并发场景需要在调用端做限流 / 去抖,或改为离线批量翻译 > - 如需更高吞吐,可考虑 DeepL 或自建翻译服务 -## Provider 配置 - -Provider 与 URL 在 `config/config.yaml` 的 `services.translation`。详见 [QUICKSTART.md](./QUICKSTART.md) §3 与 [DEVELOPER_GUIDE.md](./DEVELOPER_GUIDE.md) §7.2。 +## 配置模型 + +翻译已改为“一个翻译服务 + 多种翻译能力”的结构: + +- 业务侧(`QueryParser` / indexer)统一调用 `http://127.0.0.1:6006` +- 服务内按 `services.translation.capabilities` 加载并管理各翻译能力 +- 每种能力独立配置 `enabled`、`model`、`timeout` 等参数 +- 外部接口通过 `model + scene` 指定本次使用哪种能力、哪个场景 + +配置入口在 `config/config.yaml -> services.translation`,核心字段示例: + +```yaml +services: + translation: + service_url: "http://127.0.0.1:6006" + default_model: "llm" + default_scene: "general" + timeout_sec: 10.0 + capabilities: + qwen-mt: + enabled: true + model: "qwen-mt-flash" + llm: + enabled: true + model: "qwen-flash" + deepl: + enabled: false +``` ## HTTP 接口契约(translator service,端口 6006) @@ -41,8 +68,8 @@ Provider 与 URL 在 `config/config.yaml` 的 `services.translation`。详见 [Q "text": "商品名称", "target_lang": "en", "source_lang": "zh", - "model": "qwen", - "context": "sku_name", + "model": "qwen-mt", + "scene": "sku_name", "prompt": null } ``` @@ -60,7 +87,8 @@ Provider 与 URL 在 `config/config.yaml` 的 `services.translation`。详见 [Q "source_lang": "zh", "translated_text": "Product name", "status": "success", - "model": "qwen" + "model": "qwen-mt", + "scene": "sku_name" } ``` @@ -73,17 +101,24 @@ Provider 与 URL 在 `config/config.yaml` 的 
`services.translation`。详见 [Q "source_lang": "zh", "translated_text": ["Product name 1", null], "status": "success", - "model": "qwen" + "model": "qwen-mt", + "scene": "sku_name" } ``` 批量模式下,**单条失败用 `null` 占位**(即 `translated_text[i] = null`),保证长度与顺序一一对应,避免部分失败导致整批报错。 +说明: + +- `scene` 是标准字段,`context` 仅保留为兼容别名 +- `model` 只能选择已在 `services.translation.capabilities` 中启用的能力 +- `/health` 会返回 `default_model`、`default_scene` 与 `enabled_capabilities` + --- -## 开发者接口约定(Provider / 代码调用) +## 开发者接口约定(代码调用) -除 HTTP 微服务外,代码侧(如 query/indexer)通常通过 `providers.translation.create_translation_provider()` 获取翻译 provider 实例并调用 `translate()`。 +代码侧(如 query/indexer)仍通过 `providers.translation.create_translation_provider()` 获取实例并调用 `translate()`,但该实例现在固定是 **translator service client**,不再在业务侧做翻译 provider 选择。 ### 输入输出形状(Shape) @@ -94,13 +129,8 @@ Provider 与 URL 在 `config/config.yaml` 的 `services.translation`。详见 [Q ### 批量能力标识(supports_batch) -不同 provider 对批量的实现方式可能不同(例如:真正一次请求传多条,或内部循环逐条翻译并保持 shape)。 +服务客户端与服务内后端都可以暴露 `supports_batch`。若后端不支持批量,服务端会逐条拆分并保持 shape。 为便于上层(如 `api/translator_app.py`)做最优调用,provider 可暴露: - `supports_batch: bool`(property) - -上层在收到 `text` 为列表时: - -- **若 `supports_batch=True`**:可以直接将列表传给 `translate(text=[...])` -- **若 `supports_batch=False`**:上层会逐条拆分调用(仍保证输出列表一一对应、失败为 `null`) diff --git a/providers/translation.py b/providers/translation.py index c904931..11154a1 100644 --- a/providers/translation.py +++ b/providers/translation.py @@ -1,175 +1,28 @@ -"""Translation provider factory and HTTP provider implementation.""" -from __future__ import annotations - -import logging -from typing import Any, Dict, List, Optional, Sequence, Union -import requests - -from config.services_config import get_translation_config, get_translation_base_url - -logger = logging.getLogger(__name__) - - -class HttpTranslationProvider: - """Translation via HTTP service.""" - - def __init__( - self, - base_url: str, - model: str = "qwen", - timeout_sec: float = 10.0, - ): - self.base_url = (base_url or 
"").rstrip("/") - self.model = model or "qwen" - self.timeout_sec = float(timeout_sec or 10.0) - - @property - def supports_batch(self) -> bool: - """ - Whether this provider supports list input natively. - - 当前实现中,我们已经在 `_translate_once` 内处理了 list, - 所以可以直接视为支持 batch。 - """ - return True +"""Translation client factory for business callers.""" - def _translate_once( - self, - text: Union[str, Sequence[str]], - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> Union[Optional[str], List[Optional[str]]]: - # 允许 text 为单个字符串或字符串列表 - if isinstance(text, (list, tuple)): - # 上游约定:列表输入时,输出列表一一对应;失败位置为 None - results: List[Optional[str]] = [] - for item in text: - if item is None or not str(item).strip(): - # 空字符串/None 不视为失败,原样返回以保持语义 - results.append(item) # type: ignore[arg-type] - continue - try: - single = self._translate_once( - text=str(item), - target_lang=target_lang, - source_lang=source_lang, - context=context, - prompt=prompt, - ) - results.append(single) # type: ignore[arg-type] - except Exception: - # 理论上不会进入,因为内部已捕获;兜底保持长度一致 - results.append(None) - return results +from __future__ import annotations - if not text or not str(text).strip(): - return text # type: ignore[return-value] - try: - url = f"{self.base_url}/translate" - payload = { - "text": text, - "target_lang": target_lang, - "source_lang": source_lang or "auto", - "model": self.model, - } - if context: - payload["context"] = context - if prompt: - payload["prompt"] = prompt - response = requests.post(url, json=payload, timeout=self.timeout_sec) - if response.status_code != 200: - logger.warning( - "HTTP translator failed: status=%s body=%s", - response.status_code, - (response.text or "")[:200], - ) - return None - data = response.json() - translated = data.get("translated_text") - return translated if translated is not None else None - except Exception as exc: - logger.warning("HTTP translator request failed: %s", exc, 
exc_info=True) - return None +from typing import Any - def translate( - self, - text: Union[str, Sequence[str]], - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> Union[Optional[str], List[Optional[str]]]: - return self._translate_once( - text=text, - target_lang=target_lang, - source_lang=source_lang, - context=context, - prompt=prompt, - ) +from config.services_config import get_translation_config +from translation.client import TranslationServiceClient -def create_translation_provider(query_config: Any = None) -> Any: +def create_translation_provider(query_config: Any = None) -> TranslationServiceClient: """ - Create translation provider from services config. + Create a translation client. - query_config: optional, for api_key/glossary_id/context (used by direct provider). + Translation is no longer selected via provider mechanism on the caller side. + Search / indexer always talk to the translator service, while the service + itself decides which translation capabilities are enabled and how to route. 
""" - cfg = get_translation_config() - provider = cfg.provider - pc = cfg.get_provider_cfg() - - if provider in ("qwen-mt", "direct", "local", "inprocess"): - from query.qwen_mt_translate import Translator - model = pc.get("model") or "qwen-mt-flash" - qc = query_config or _empty_query_config() - return Translator( - model=model, - api_key=getattr(qc, "translation_api_key", None), - use_cache=True, - glossary_id=getattr(qc, "translation_glossary_id", None), - translation_context=getattr(qc, "translation_context", "e-commerce product search"), - ) - - elif provider in ("http", "service"): - base_url = get_translation_base_url() - model = pc.get("model") or "qwen" - timeout = pc.get("timeout_sec", 10.0) - qc = query_config or _empty_query_config() - return HttpTranslationProvider( - base_url=base_url, - model=model, - timeout_sec=float(timeout), - ) - - elif provider == "llm": - from query.llm_translate import LLMTranslatorProvider - model = pc.get("model") - timeout = float(pc.get("timeout_sec", 30.0)) - base_url = (pc.get("base_url") or "").strip() or None - return LLMTranslatorProvider( - model=model, - timeout_sec=timeout, - base_url=base_url, - ) - elif provider == "deepl": - from query.deepl_provider import DeepLProvider - qc = query_config or _empty_query_config() - return DeepLProvider( - api_key=getattr(qc, "translation_api_key", None), - timeout=float(pc.get("timeout_sec", 10.0)), - glossary_id=pc.get("glossary_id") or getattr(qc, "translation_glossary_id", None), - ) - - raise ValueError(f"Unsupported translation provider: {provider}") - - -def _empty_query_config() -> Any: - """Minimal object with default translation attrs.""" - class _QC: - translation_api_key = None - translation_glossary_id = None - translation_context = "e-commerce product search" - return _QC() + cfg = get_translation_config() + qc = query_config + default_scene = getattr(qc, "translation_context", None) if qc is not None else None + return TranslationServiceClient( + 
base_url=cfg.service_url, + default_model=cfg.default_model, + default_scene=default_scene or cfg.default_scene, + timeout_sec=cfg.timeout_sec, + ) diff --git a/query/deepl_provider.py b/query/deepl_provider.py index a134d68..6932288 100644 --- a/query/deepl_provider.py +++ b/query/deepl_provider.py @@ -1,227 +1,3 @@ -""" -DeepL backend provider. - -This module only handles network calls to DeepL. -It does not handle cache, async fanout, or fallback semantics. -""" - -from __future__ import annotations - -import logging -import os -import re -from typing import Dict, List, Optional, Sequence, Tuple, Union - -import requests -from config.services_config import get_translation_config - - -logger = logging.getLogger(__name__) - -DEFAULT_CONTEXTS: Dict[str, Dict[str, str]] = { - "sku_name": { - "zh": "商品SKU名称", - "en": "product SKU name", - }, - "ecommerce_search_query": { - "zh": "电商", - "en": "e-commerce", - }, - "general": { - "zh": "", - "en": "", - }, -} -SCENE_NAMES = frozenset(DEFAULT_CONTEXTS.keys()) - - -def _merge_contexts(raw: object) -> Dict[str, Dict[str, str]]: - merged: Dict[str, Dict[str, str]] = { - scene: dict(lang_map) for scene, lang_map in DEFAULT_CONTEXTS.items() - } - if not isinstance(raw, dict): - return merged - for scene, lang_map in raw.items(): - if not isinstance(lang_map, dict): - continue - scene_name = str(scene or "").strip() - if not scene_name: - continue - merged.setdefault(scene_name, {}) - for lang, value in lang_map.items(): - lang_key = str(lang or "").strip().lower() - context_value = str(value or "").strip() - if lang_key and context_value: - merged[scene_name][lang_key] = context_value - return merged - - -class DeepLProvider: - API_URL = "https://api.deepl.com/v2/translate" # Pro tier - LANG_CODE_MAP = { - "zh": "ZH", - "en": "EN", - "ru": "RU", - "ar": "AR", - "ja": "JA", - "es": "ES", - "de": "DE", - "fr": "FR", - "it": "IT", - "pt": "PT", - } - - def __init__( - self, - api_key: Optional[str], - *, - timeout: float = 
10.0, - glossary_id: Optional[str] = None, - ) -> None: - cfg = get_translation_config() - provider_cfg = cfg.providers.get("deepl", {}) if isinstance(cfg.providers, dict) else {} - self.api_key = api_key or os.getenv("DEEPL_AUTH_KEY") - self.timeout = float(provider_cfg.get("timeout_sec") or timeout or 10.0) - self.glossary_id = glossary_id or provider_cfg.get("glossary_id") - self.model = "deepl" - self.context_presets = _merge_contexts(provider_cfg.get("contexts")) - if not self.api_key: - logger.warning("DEEPL_AUTH_KEY not set; DeepL translation is unavailable") - - @property - def supports_batch(self) -> bool: - """ - DeepL HTTP API 本身支持一次传多条 text,这里先返回 False, - 由上层逐条拆分,后续如果要真正批量,可调整实现。 - """ - return False - - def _resolve_request_context( - self, - target_lang: str, - context: Optional[str], - prompt: Optional[str], - ) -> Optional[str]: - if prompt: - return prompt - if context in SCENE_NAMES: - scene_map = self.context_presets.get(context) or self.context_presets.get("default") or {} - tgt = (target_lang or "").strip().lower() - return scene_map.get(tgt) or scene_map.get("en") - if context: - return context - scene_map = self.context_presets.get("default") or {} - tgt = (target_lang or "").strip().lower() - return scene_map.get(tgt) or scene_map.get("en") - - def translate( - self, - text: Union[str, Sequence[str]], - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> Union[Optional[str], List[Optional[str]]]: - if isinstance(text, (list, tuple)): - results: List[Optional[str]] = [] - for item in text: - if item is None or not str(item).strip(): - results.append(item) # type: ignore[arg-type] - continue - out = self.translate( - text=str(item), - target_lang=target_lang, - source_lang=source_lang, - context=context, - prompt=prompt, - ) - results.append(out) - return results - - if not self.api_key: - return None - - target_code = self.LANG_CODE_MAP.get((target_lang or "").lower(), 
(target_lang or "").upper()) - headers = { - "Authorization": f"DeepL-Auth-Key {self.api_key}", - "Content-Type": "application/json", - } - - api_context = self._resolve_request_context(target_lang, context, prompt) - text_to_translate, needs_extraction = self._add_ecommerce_context(text, source_lang, api_context) - - payload = { - "text": [text_to_translate], - "target_lang": target_code, - } - if source_lang: - payload["source_lang"] = self.LANG_CODE_MAP.get(source_lang.lower(), source_lang.upper()) - if api_context: - payload["context"] = api_context - if self.glossary_id: - payload["glossary_id"] = self.glossary_id - - try: - response = requests.post(self.API_URL, headers=headers, json=payload, timeout=self.timeout) - if response.status_code != 200: - logger.warning( - "[deepl] Failed | status=%s tgt=%s body=%s", - response.status_code, - target_code, - (response.text or "")[:200], - ) - return None - - data = response.json() - translations = data.get("translations") or [] - if not translations: - return None - translated = translations[0].get("text") - if not translated: - return None - if needs_extraction: - translated = self._extract_term_from_translation(translated, text, target_code) - return translated - except requests.Timeout: - logger.warning("[deepl] Timeout | tgt=%s timeout=%.1fs", target_code, self.timeout) - return None - except Exception as exc: - logger.warning("[deepl] Exception | tgt=%s error=%s", target_code, exc, exc_info=True) - return None - - def _add_ecommerce_context( - self, - text: str, - source_lang: Optional[str], - context: Optional[str], - ) -> Tuple[str, bool]: - if not context or "e-commerce" not in context.lower(): - return text, False - if (source_lang or "").lower() != "zh": - return text, False - - term = (text or "").strip() - if len(term.split()) == 1 and len(term) <= 2: - return f"购买 {term}", True - return text, False - - def _extract_term_from_translation( - self, - translated_text: str, - original_text: str, - 
target_lang_code: str, - ) -> str: - del original_text - if target_lang_code != "EN": - return translated_text - - words = translated_text.strip().split() - if len(words) <= 1: - return translated_text - context_words = {"buy", "purchase", "product", "item", "commodity", "goods"} - for word in reversed(words): - normalized = re.sub(r"[.,!?;:]+$", "", word.lower()) - if normalized not in context_words: - return normalized - return re.sub(r"[.,!?;:]+$", "", words[-1].lower()) +"""Backward-compatible import for DeepL translation backend.""" +from translation.backends.deepl import DeepLProvider, DeepLTranslationBackend diff --git a/query/llm_translate.py b/query/llm_translate.py index f0af1e0..6723a86 100644 --- a/query/llm_translate.py +++ b/query/llm_translate.py @@ -1,238 +1,5 @@ -""" -LLM-based translation backend (DashScope-compatible OpenAI API). +"""Backward-compatible import for LLM translation backend.""" -Failure semantics are strict: -- success: translated string -- failure: None -""" +from translation.backends.llm import LLMTranslationBackend, LLMTranslatorProvider, llm_translate -from __future__ import annotations - -import logging -import os -import time -from typing import List, Optional, Sequence, Union - -from openai import OpenAI - -from config.env_config import DASHSCOPE_API_KEY -from config.services_config import get_translation_config -from config.translate_prompts import TRANSLATION_PROMPTS -from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP - - -logger = logging.getLogger(__name__) - - -DEFAULT_QWEN_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" -DEFAULT_LLM_MODEL = "qwen-flash" - - -def _build_prompt( - text: str, - *, - source_lang: Optional[str], - target_lang: str, - scene: Optional[str], -) -> str: - """ - 从 config.translate_prompts.TRANSLATION_PROMPTS 中构建提示词。 - - 要求:模板必须包含 {source_lang}({src_lang_code}){target_lang}({tgt_lang_code})。 - 这里统一使用 code 作为占位的 lang 与 label,外部接口仍然只传语言 code。 - """ - 
tgt = (target_lang or "").lower() or "en" - src = (source_lang or "auto").lower() - - # 将业务上下文 scene 映射为模板分组名 - normalized_scene = (scene or "").strip() or "general" - # 如果出现历史词,则报错,用于发现错误 - if normalized_scene in {"query", "ecommerce_search", "ecommerce_search_query"}: - group_key = "ecommerce_search_query" - elif normalized_scene in {"product_title", "sku_name"}: - group_key = "sku_name" - else: - group_key = normalized_scene - group = TRANSLATION_PROMPTS.get(group_key) or TRANSLATION_PROMPTS["general"] - - # 先按目标语言 code 取模板,取不到回退到英文 - template = group.get(tgt) or group.get("en") - if not template: - # 理论上不会发生,兜底一个简单模板 - template = ( - "You are a professional {source_lang} ({src_lang_code}) to " - "{target_lang} ({tgt_lang_code}) translator, output only the translation: {text}" - ) - - # 目前不额外维护语言名称映射,直接使用 code 作为 label - source_lang_label = SOURCE_LANG_CODE_MAP.get(src, src) - target_lang_label = SOURCE_LANG_CODE_MAP.get(tgt, tgt) - - return template.format( - source_lang=source_lang_label, - src_lang_code=src, - target_lang=target_lang_label, - tgt_lang_code=tgt, - text=text, - ) - - -class LLMTranslatorProvider: - def __init__( - self, - *, - model: Optional[str] = None, - timeout_sec: float = 30.0, - base_url: Optional[str] = None, - ) -> None: - cfg = get_translation_config() - llm_cfg = cfg.providers.get("llm", {}) if isinstance(cfg.providers, dict) else {} - self.model = model or llm_cfg.get("model") or DEFAULT_LLM_MODEL - self.timeout_sec = float(llm_cfg.get("timeout_sec") or timeout_sec or 30.0) - self.base_url = ( - (base_url or "").strip() - or (llm_cfg.get("base_url") or "").strip() - or os.getenv("DASHSCOPE_BASE_URL") - or DEFAULT_QWEN_BASE_URL - ) - self.client = self._create_client() - - @property - def supports_batch(self) -> bool: - """Whether this provider efficiently supports list input.""" - # 我们在 translate 中已经原生支持 list,所以这里返回 True - return True - - def _create_client(self) -> Optional[OpenAI]: - api_key = DASHSCOPE_API_KEY or 
os.getenv("DASHSCOPE_API_KEY") - if not api_key: - logger.warning("DASHSCOPE_API_KEY not set; llm translation unavailable") - return None - try: - return OpenAI(api_key=api_key, base_url=self.base_url) - except Exception as exc: - logger.error("Failed to initialize llm translation client: %s", exc, exc_info=True) - return None - - def _translate_single( - self, - text: str, - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> Optional[str]: - if not text or not str(text).strip(): - return text - if not self.client: - return None - - tgt = (target_lang or "").lower() or "en" - src = (source_lang or "auto").lower() - scene = context or "default" - user_prompt = prompt or _build_prompt( - text=text, - source_lang=src, - target_lang=tgt, - scene=scene, - ) - start = time.time() - try: - logger.info( - "[llm] Request | src=%s tgt=%s model=%s prompt=%s", - src, - tgt, - self.model, - user_prompt, - ) - completion = self.client.chat.completions.create( - model=self.model, - messages=[{"role": "user", "content": user_prompt}], - timeout=self.timeout_sec, - ) - content = (completion.choices[0].message.content or "").strip() - latency_ms = (time.time() - start) * 1000 - if not content: - logger.warning("[llm] Empty result | src=%s tgt=%s latency=%.1fms", src, tgt, latency_ms) - return None - logger.info( - "[llm] Success | src=%s tgt=%s src_text=%s response=%s latency=%.1fms", - src, - tgt, - text, - content, - latency_ms, - ) - return content - except Exception as exc: - latency_ms = (time.time() - start) * 1000 - logger.warning( - "[llm] Failed | src=%s tgt=%s latency=%.1fms error=%s", - src, - tgt, - latency_ms, - exc, - exc_info=True, - ) - return None - - def translate( - self, - text: Union[str, Sequence[str]], - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> Union[Optional[str], List[Optional[str]]]: - """ - 
Translate a single string or a list of strings. - - - If input is a list, returns a list of the same length. - - Per-item failures are returned as None. - """ - if isinstance(text, (list, tuple)): - results: List[Optional[str]] = [] - for item in text: - # 保证一一对应,即使某个元素为空也占位 - if item is None: - results.append(None) - continue - results.append( - self._translate_single( - text=str(item), - target_lang=target_lang, - source_lang=source_lang, - context=context, - prompt=prompt, - ) - ) - return results - - return self._translate_single( - text=str(text), - target_lang=target_lang, - source_lang=source_lang, - context=context, - prompt=prompt, - ) - - -def llm_translate( - text: Union[str, Sequence[str]], - target_lang: str, - *, - source_lang: Optional[str] = None, - source_lang_label: Optional[str] = None, - target_lang_label: Optional[str] = None, - timeout_sec: Optional[float] = None, -) -> Union[Optional[str], List[Optional[str]]]: - provider = LLMTranslatorProvider(timeout_sec=timeout_sec or 30.0) - return provider.translate( - text=text, - target_lang=target_lang, - source_lang=source_lang, - context=None, - ) - - -__all__ = ["LLMTranslatorProvider", "llm_translate"] +__all__ = ["LLMTranslationBackend", "LLMTranslatorProvider", "llm_translate"] diff --git a/query/query_parser.py b/query/query_parser.py index 1927421..1ff110e 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -133,7 +133,11 @@ class QueryParser: if self._translator is None: from config.services_config import get_translation_config cfg = get_translation_config() - logger.info("Initializing translator at QueryParser construction (provider=%s)...", cfg.provider) + logger.info( + "Initializing translator client at QueryParser construction (service_url=%s, default_model=%s)...", + cfg.service_url, + cfg.default_model, + ) self._translator = create_translation_provider(self.config.query_config) self._translation_executor = ThreadPoolExecutor(max_workers=4, 
thread_name_prefix="query-translation") diff --git a/query/qwen_mt_translate.py b/query/qwen_mt_translate.py index 48273c5..03e9552 100644 --- a/query/qwen_mt_translate.py +++ b/query/qwen_mt_translate.py @@ -1,287 +1,5 @@ -"""Qwen-MT translation orchestrator with cache and async helpers.""" +"""Backward-compatible import for Qwen-MT translation backend.""" -from __future__ import annotations +from translation.backends.qwen_mt import QwenMTTranslationBackend, Translator -import hashlib -import logging -import os -import re -import time -from typing import Dict, List, Optional, Sequence, Union - -import redis -from openai import OpenAI - -from config.env_config import DASHSCOPE_API_KEY, REDIS_CONFIG -from config.services_config import get_translation_cache_config -from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP - -logger = logging.getLogger(__name__) - - -class Translator: - QWEN_DEFAULT_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" - QWEN_MODEL = "qwen-mt-flash" - - def __init__( - self, - model: str = "qwen", - api_key: Optional[str] = None, - use_cache: bool = True, - timeout: int = 10, - glossary_id: Optional[str] = None, - translation_context: Optional[str] = None, - ): - self.model = self._normalize_model(model) - self.timeout = int(timeout) - self.use_cache = bool(use_cache) - self.glossary_id = glossary_id - self.translation_context = translation_context or "e-commerce product search" - - cache_cfg = get_translation_cache_config() - self.cache_prefix = str(cache_cfg.get("key_prefix", "trans:v2")) - self.expire_seconds = int(cache_cfg.get("ttl_seconds", 360 * 24 * 3600)) - self.cache_sliding_expiration = bool(cache_cfg.get("sliding_expiration", True)) - self.cache_include_context = bool(cache_cfg.get("key_include_context", True)) - self.cache_include_prompt = bool(cache_cfg.get("key_include_prompt", True)) - self.cache_include_source_lang = bool(cache_cfg.get("key_include_source_lang", True)) - - 
self.qwen_model_name = self._resolve_qwen_model_name(model) - self._api_key = api_key or self._default_api_key(self.model) - self._qwen_client: Optional[OpenAI] = None - base_url = os.getenv("DASHSCOPE_BASE_URL") or self.QWEN_DEFAULT_BASE_URL - if self._api_key: - try: - self._qwen_client = OpenAI(api_key=self._api_key, base_url=base_url) - except Exception as exc: - logger.warning("Failed to initialize qwen-mt client: %s", exc, exc_info=True) - else: - logger.warning("DASHSCOPE_API_KEY not set; qwen-mt translation unavailable") - - self.redis_client = None - if self.use_cache and bool(cache_cfg.get("enabled", True)): - self.redis_client = self._init_redis_client() - - @property - def supports_batch(self) -> bool: - """ - 标记该 provider 已支持列表输入。 - - 当前实现为循环单条调用(带缓存),不是真正的并行批量请求, - 但对上层来说可以直接传 list,返回 list。 - """ - return True - - @staticmethod - def _normalize_model(model: str) -> str: - m = (model or "qwen").strip().lower() - if m.startswith("qwen"): - return "qwen-mt" - raise ValueError(f"Unsupported model: {model}. 
Supported models: 'qwen', 'qwen-mt', 'qwen-mt-flash'") - - @staticmethod - def _resolve_qwen_model_name(model: str) -> str: - m = (model or "qwen").strip().lower() - if m in {"qwen", "qwen-mt"}: - return "qwen-mt-flash" - return m - - @staticmethod - def _default_api_key(model: str) -> Optional[str]: - del model - return DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") - - def _init_redis_client(self): - try: - client = redis.Redis( - host=REDIS_CONFIG.get("host", "localhost"), - port=REDIS_CONFIG.get("port", 6479), - password=REDIS_CONFIG.get("password"), - decode_responses=True, - socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), - socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), - retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), - health_check_interval=10, - ) - client.ping() - return client - except Exception as exc: - logger.warning("Failed to initialize translation redis cache: %s", exc) - return None - - def _build_cache_key( - self, - text: str, - target_lang: str, - source_lang: Optional[str], - context: Optional[str], - prompt: Optional[str], - ) -> str: - src = (source_lang or "auto").strip().lower() if self.cache_include_source_lang else "-" - tgt = (target_lang or "").strip().lower() - ctx = (context or "").strip() if self.cache_include_context else "" - prm = (prompt or "").strip() if self.cache_include_prompt else "" - payload = f"model={self.model}\nsrc={src}\ntgt={tgt}\nctx={ctx}\nprm={prm}\ntext={text}" - digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() - return f"{self.cache_prefix}:{self.model}:{src}:{tgt}:{digest}" - - def translate( - self, - text: Union[str, Sequence[str]], - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> Union[Optional[str], List[Optional[str]]]: - if isinstance(text, (list, tuple)): - results: List[Optional[str]] = [] - for item in text: - if item is None or not str(item).strip(): - 
results.append(item) # type: ignore[arg-type] - continue - # 对于 batch,这里沿用单条的缓存与规则,逐条调用 - out = self.translate( - text=str(item), - target_lang=target_lang, - source_lang=source_lang, - context=context, - prompt=prompt, - ) - results.append(out) - return results - - if not text or not str(text).strip(): - return text # type: ignore[return-value] - - tgt = (target_lang or "").strip().lower() - src = (source_lang or "").strip().lower() or None - if tgt == "en" and self._is_english_text(text): - return text - if tgt == "zh" and (self._contains_chinese(text) or self._is_pure_number(text)): - return text - - translation_context = context or self.translation_context - cached = self._get_cached_translation_redis(text, tgt, src, translation_context, prompt) - if cached is not None: - return cached - - result = self._translate_qwen(text, tgt, src) - - if result is not None: - self._set_cached_translation_redis(text, tgt, result, src, translation_context, prompt) - return result - - def _translate_qwen( - self, - text: str, - target_lang: str, - source_lang: Optional[str], - ) -> Optional[str]: - if not self._qwen_client: - return None - tgt_norm = (target_lang or "").strip().lower() - src_norm = (source_lang or "").strip().lower() - tgt_qwen = self.SOURCE_LANG_CODE_MAP.get(tgt_norm, tgt_norm.capitalize()) - src_qwen = "auto" if not src_norm or src_norm == "auto" else self.SOURCE_LANG_CODE_MAP.get(src_norm, src_norm.capitalize()) - start = time.time() - try: - completion = self._qwen_client.chat.completions.create( - model=self.qwen_model_name, - messages=[{"role": "user", "content": text}], - extra_body={ - "translation_options": { - "source_lang": src_qwen, - "target_lang": tgt_qwen, - } - }, - timeout=self.timeout, - ) - content = (completion.choices[0].message.content or "").strip() - if not content: - return None - logger.info("[qwen-mt] Success | src=%s tgt=%s latency=%.1fms", src_qwen, tgt_qwen, (time.time() - start) * 1000) - return content - except Exception as exc: 
- logger.warning( - "[qwen-mt] Failed | src=%s tgt=%s latency=%.1fms error=%s", - src_qwen, - tgt_qwen, - (time.time() - start) * 1000, - exc, - exc_info=True, - ) - return None - - - def _get_cached_translation_redis( - self, - text: str, - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> Optional[str]: - if not self.redis_client: - return None - key = self._build_cache_key(text, target_lang, source_lang, context, prompt) - try: - value = self.redis_client.get(key) - if value and self.cache_sliding_expiration: - self.redis_client.expire(key, self.expire_seconds) - return value - except Exception as exc: - logger.warning("Redis get translation cache failed: %s", exc) - return None - - def _set_cached_translation_redis( - self, - text: str, - target_lang: str, - translation: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> None: - if not self.redis_client: - return - key = self._build_cache_key(text, target_lang, source_lang, context, prompt) - try: - self.redis_client.setex(key, self.expire_seconds, translation) - except Exception as exc: - logger.warning("Redis set translation cache failed: %s", exc) - - def _shop_lang_matches(self, shop_lang_lower: str, lang_code: str) -> bool: - if not shop_lang_lower or not lang_code: - return False - if shop_lang_lower == lang_code: - return True - if lang_code == "zh" and "zh" in shop_lang_lower: - return True - if lang_code == "en" and "en" in shop_lang_lower: - return True - return False - - def get_translation_needs(self, detected_lang: str, supported_langs: List[str]) -> List[str]: - if detected_lang in supported_langs: - return [lang for lang in supported_langs if lang != detected_lang] - return supported_langs - - def _is_english_text(self, text: str) -> bool: - if not text or not text.strip(): - return True - text_clean = re.sub(r"[\s\.,!?;:\-\'\"\(\)\[\]{}]", "", text) - if not 
text_clean: - return True - ascii_count = sum(1 for c in text_clean if ord(c) < 128) - return (ascii_count / len(text_clean)) > 0.8 - - def _contains_chinese(self, text: str) -> bool: - if not text: - return False - return bool(re.search(r"[\u4e00-\u9fff]", text)) - - def _is_pure_number(self, text: str) -> bool: - if not text or not text.strip(): - return False - text_clean = re.sub(r"[\s\.,]", "", text.strip()) - return bool(text_clean) and text_clean.isdigit() +__all__ = ["QwenMTTranslationBackend", "Translator"] diff --git a/translation/__init__.py b/translation/__init__.py new file mode 100644 index 0000000..546253d --- /dev/null +++ b/translation/__init__.py @@ -0,0 +1,8 @@ +"""Translation package.""" + +__all__ = [ + "client", + "service", + "protocols", + "backends", +] diff --git a/translation/backends/__init__.py b/translation/backends/__init__.py new file mode 100644 index 0000000..2e0af75 --- /dev/null +++ b/translation/backends/__init__.py @@ -0,0 +1,11 @@ +"""Translation backend registry.""" + +from .deepl import DeepLTranslationBackend +from .llm import LLMTranslationBackend +from .qwen_mt import QwenMTTranslationBackend + +__all__ = [ + "DeepLTranslationBackend", + "LLMTranslationBackend", + "QwenMTTranslationBackend", +] diff --git a/translation/backends/deepl.py b/translation/backends/deepl.py new file mode 100644 index 0000000..92173ef --- /dev/null +++ b/translation/backends/deepl.py @@ -0,0 +1,220 @@ +"""DeepL translation backend.""" + +from __future__ import annotations + +import logging +import os +import re +from typing import Dict, List, Optional, Sequence, Tuple, Union + +import requests + +from config.services_config import get_translation_config + +logger = logging.getLogger(__name__) + +DEFAULT_CONTEXTS: Dict[str, Dict[str, str]] = { + "sku_name": { + "zh": "商品SKU名称", + "en": "product SKU name", + }, + "ecommerce_search_query": { + "zh": "电商", + "en": "e-commerce", + }, + "general": { + "zh": "", + "en": "", + }, +} +SCENE_NAMES = 
frozenset(DEFAULT_CONTEXTS.keys()) + + +def _merge_contexts(raw: object) -> Dict[str, Dict[str, str]]: + merged: Dict[str, Dict[str, str]] = { + scene: dict(lang_map) for scene, lang_map in DEFAULT_CONTEXTS.items() + } + if not isinstance(raw, dict): + return merged + for scene, lang_map in raw.items(): + if not isinstance(lang_map, dict): + continue + scene_name = str(scene or "").strip() + if not scene_name: + continue + merged.setdefault(scene_name, {}) + for lang, value in lang_map.items(): + lang_key = str(lang or "").strip().lower() + context_value = str(value or "").strip() + if lang_key and context_value: + merged[scene_name][lang_key] = context_value + return merged + + +class DeepLTranslationBackend: + API_URL = "https://api.deepl.com/v2/translate" + LANG_CODE_MAP = { + "zh": "ZH", + "en": "EN", + "ru": "RU", + "ar": "AR", + "ja": "JA", + "es": "ES", + "de": "DE", + "fr": "FR", + "it": "IT", + "pt": "PT", + } + + def __init__( + self, + api_key: Optional[str], + *, + timeout: float = 10.0, + glossary_id: Optional[str] = None, + ) -> None: + cfg = get_translation_config() + provider_cfg = cfg.get_capability_cfg("deepl") + self.api_key = api_key or os.getenv("DEEPL_AUTH_KEY") + self.timeout = float(provider_cfg.get("timeout_sec") or timeout or 10.0) + self.glossary_id = glossary_id or provider_cfg.get("glossary_id") + self.model = "deepl" + self.context_presets = _merge_contexts(provider_cfg.get("contexts")) + if not self.api_key: + logger.warning("DEEPL_AUTH_KEY not set; DeepL translation is unavailable") + + @property + def supports_batch(self) -> bool: + return False + + def _resolve_request_context( + self, + target_lang: str, + context: Optional[str], + prompt: Optional[str], + ) -> Optional[str]: + if prompt: + return prompt + if context in SCENE_NAMES: + scene_map = self.context_presets.get(context) or self.context_presets.get("default") or {} + tgt = (target_lang or "").strip().lower() + return scene_map.get(tgt) or scene_map.get("en") + if context: 
+ return context + scene_map = self.context_presets.get("default") or {} + tgt = (target_lang or "").strip().lower() + return scene_map.get(tgt) or scene_map.get("en") + + def translate( + self, + text: Union[str, Sequence[str]], + target_lang: str, + source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None, + ) -> Union[Optional[str], List[Optional[str]]]: + if isinstance(text, (list, tuple)): + results: List[Optional[str]] = [] + for item in text: + if item is None or not str(item).strip(): + results.append(item) # type: ignore[arg-type] + continue + out = self.translate( + text=str(item), + target_lang=target_lang, + source_lang=source_lang, + context=context, + prompt=prompt, + ) + results.append(out) + return results + + if not self.api_key: + return None + + target_code = self.LANG_CODE_MAP.get((target_lang or "").lower(), (target_lang or "").upper()) + headers = { + "Authorization": f"DeepL-Auth-Key {self.api_key}", + "Content-Type": "application/json", + } + + api_context = self._resolve_request_context(target_lang, context, prompt) + text_to_translate, needs_extraction = self._add_ecommerce_context(text, source_lang, api_context) + + payload = { + "text": [text_to_translate], + "target_lang": target_code, + } + if source_lang: + payload["source_lang"] = self.LANG_CODE_MAP.get(source_lang.lower(), source_lang.upper()) + if api_context: + payload["context"] = api_context + if self.glossary_id: + payload["glossary_id"] = self.glossary_id + + try: + response = requests.post(self.API_URL, headers=headers, json=payload, timeout=self.timeout) + if response.status_code != 200: + logger.warning( + "[deepl] Failed | status=%s tgt=%s body=%s", + response.status_code, + target_code, + (response.text or "")[:200], + ) + return None + + data = response.json() + translations = data.get("translations") or [] + if not translations: + return None + translated = translations[0].get("text") + if not translated: + return None + if 
needs_extraction: + translated = self._extract_term_from_translation(translated, text, target_code) + return translated + except requests.Timeout: + logger.warning("[deepl] Timeout | tgt=%s timeout=%.1fs", target_code, self.timeout) + return None + except Exception as exc: + logger.warning("[deepl] Exception | tgt=%s error=%s", target_code, exc, exc_info=True) + return None + + def _add_ecommerce_context( + self, + text: str, + source_lang: Optional[str], + context: Optional[str], + ) -> Tuple[str, bool]: + if not context or "e-commerce" not in context.lower(): + return text, False + if (source_lang or "").lower() != "zh": + return text, False + + term = (text or "").strip() + if len(term.split()) == 1 and len(term) <= 2: + return f"购买 {term}", True + return text, False + + def _extract_term_from_translation( + self, + translated_text: str, + original_text: str, + target_lang_code: str, + ) -> str: + del original_text + if target_lang_code != "EN": + return translated_text + + words = translated_text.strip().split() + if len(words) <= 1: + return translated_text + context_words = {"buy", "purchase", "product", "item", "commodity", "goods"} + for word in reversed(words): + normalized = re.sub(r"[.,!?;:]+$", "", word.lower()) + if normalized not in context_words: + return normalized + return re.sub(r"[.,!?;:]+$", "", words[-1].lower()) + + +DeepLProvider = DeepLTranslationBackend diff --git a/translation/backends/llm.py b/translation/backends/llm.py new file mode 100644 index 0000000..939b06d --- /dev/null +++ b/translation/backends/llm.py @@ -0,0 +1,209 @@ +"""LLM-based translation backend.""" + +from __future__ import annotations + +import logging +import os +import time +from typing import List, Optional, Sequence, Union + +from openai import OpenAI + +from config.env_config import DASHSCOPE_API_KEY +from config.services_config import get_translation_config +from config.translate_prompts import TRANSLATION_PROMPTS +from config.tenant_config_loader import 
logger = logging.getLogger(__name__)

# Fallbacks used only when neither the constructor nor the services config
# supply a value.
DEFAULT_QWEN_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1"
DEFAULT_LLM_MODEL = "qwen-flash"


def _build_prompt(
    text: str,
    *,
    source_lang: Optional[str],
    target_lang: str,
    scene: Optional[str],
) -> str:
    """Render a translation prompt for *text* from the scene's template group.

    Scene aliases fold onto canonical prompt-group keys; an unknown scene
    falls back to the "general" group, a missing target-language template
    falls back to English, and finally to a generic instruction template.
    """
    tgt = (target_lang or "").lower() or "en"
    src = (source_lang or "auto").lower()
    normalized_scene = (scene or "").strip() or "general"
    # Map scene aliases onto the canonical prompt-group keys.
    if normalized_scene in {"query", "ecommerce_search", "ecommerce_search_query"}:
        group_key = "ecommerce_search_query"
    elif normalized_scene in {"product_title", "sku_name"}:
        group_key = "sku_name"
    else:
        group_key = normalized_scene
    group = TRANSLATION_PROMPTS.get(group_key) or TRANSLATION_PROMPTS["general"]
    template = group.get(tgt) or group.get("en")
    if not template:
        template = (
            "You are a professional {source_lang} ({src_lang_code}) to "
            "{target_lang} ({tgt_lang_code}) translator, output only the translation: {text}"
        )

    # Human-readable labels; fall back to the raw code when unmapped.
    source_lang_label = SOURCE_LANG_CODE_MAP.get(src, src)
    target_lang_label = SOURCE_LANG_CODE_MAP.get(tgt, tgt)

    return template.format(
        source_lang=source_lang_label,
        src_lang_code=src,
        target_lang=target_lang_label,
        tgt_lang_code=tgt,
        text=text,
    )


class LLMTranslationBackend:
    """Translation backend that calls an OpenAI-compatible chat LLM."""

    def __init__(
        self,
        *,
        model: Optional[str] = None,
        timeout_sec: Optional[float] = None,
        base_url: Optional[str] = None,
    ) -> None:
        """Resolve settings with precedence: explicit argument > config > default.

        Fix: ``timeout_sec`` previously let the services config override an
        explicitly passed argument (``llm_cfg.get("timeout_sec") or
        timeout_sec``), inconsistent with how ``model`` and ``base_url`` are
        resolved. The default is now ``None``, which preserves the old
        effective behavior (config value, then 30s) for callers that did not
        pass a timeout.
        """
        cfg = get_translation_config()
        llm_cfg = cfg.get_capability_cfg("llm")
        self.model = model or llm_cfg.get("model") or DEFAULT_LLM_MODEL
        self.timeout_sec = float(timeout_sec or llm_cfg.get("timeout_sec") or 30.0)
        self.base_url = (
            (base_url or "").strip()
            or (llm_cfg.get("base_url") or "").strip()
            or os.getenv("DASHSCOPE_BASE_URL")
            or DEFAULT_QWEN_BASE_URL
        )
        self.client = self._create_client()

    @property
    def supports_batch(self) -> bool:
        # Batch input is supported by iterating single translations.
        return True

    def _create_client(self) -> Optional[OpenAI]:
        """Create the OpenAI-compatible client, or None when no API key is set."""
        api_key = DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY")
        if not api_key:
            logger.warning("DASHSCOPE_API_KEY not set; llm translation unavailable")
            return None
        try:
            return OpenAI(api_key=api_key, base_url=self.base_url)
        except Exception as exc:
            logger.error("Failed to initialize llm translation client: %s", exc, exc_info=True)
            return None

    def _translate_single(
        self,
        text: str,
        target_lang: str,
        source_lang: Optional[str] = None,
        context: Optional[str] = None,
        prompt: Optional[str] = None,
    ) -> Optional[str]:
        """Translate one string; returns None on failure or when no client exists.

        Blank input is returned unchanged without touching the API.
        """
        if not text or not str(text).strip():
            return text
        if not self.client:
            return None

        tgt = (target_lang or "").lower() or "en"
        src = (source_lang or "auto").lower()
        scene = context or "default"
        user_prompt = prompt or _build_prompt(
            text=text,
            source_lang=src,
            target_lang=tgt,
            scene=scene,
        )
        start = time.time()
        try:
            logger.info(
                "[llm] Request | src=%s tgt=%s model=%s prompt=%s",
                src,
                tgt,
                self.model,
                user_prompt,
            )
            completion = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": user_prompt}],
                timeout=self.timeout_sec,
            )
            content = (completion.choices[0].message.content or "").strip()
            latency_ms = (time.time() - start) * 1000
            if not content:
                logger.warning("[llm] Empty result | src=%s tgt=%s latency=%.1fms", src, tgt, latency_ms)
                return None
            logger.info(
                "[llm] Success | src=%s tgt=%s src_text=%s response=%s latency=%.1fms",
                src,
                tgt,
                text,
                content,
                latency_ms,
            )
            return content
        except Exception as exc:
            latency_ms = (time.time() - start) * 1000
            logger.warning(
                "[llm] Failed | src=%s tgt=%s latency=%.1fms error=%s",
                src,
                tgt,
                latency_ms,
                exc,
                exc_info=True,
            )
            return None

    def translate(
        self,
        text: Union[str, Sequence[str]],
        target_lang: str,
        source_lang: Optional[str] = None,
        context: Optional[str] = None,
        prompt: Optional[str] = None,
    ) -> Union[Optional[str], List[Optional[str]]]:
        """Translate a string or a sequence of strings.

        Sequences map None entries to None and translate the rest
        individually; a single string delegates to ``_translate_single``.
        """
        if isinstance(text, (list, tuple)):
            results: List[Optional[str]] = []
            for item in text:
                if item is None:
                    results.append(None)
                    continue
                results.append(
                    self._translate_single(
                        text=str(item),
                        target_lang=target_lang,
                        source_lang=source_lang,
                        context=context,
                        prompt=prompt,
                    )
                )
            return results

        return self._translate_single(
            text=str(text),
            target_lang=target_lang,
            source_lang=source_lang,
            context=context,
            prompt=prompt,
        )


# Backward-compatible alias for the old provider name.
LLMTranslatorProvider = LLMTranslationBackend


def llm_translate(
    text: Union[str, Sequence[str]],
    target_lang: str,
    *,
    source_lang: Optional[str] = None,
    source_lang_label: Optional[str] = None,
    target_lang_label: Optional[str] = None,
    timeout_sec: Optional[float] = None,
) -> Union[Optional[str], List[Optional[str]]]:
    """Convenience wrapper constructing a one-shot backend per call.

    An explicitly passed ``timeout_sec`` now takes precedence over the
    configured timeout (previously the config silently won); ``None`` keeps
    the config-then-30s fallback.
    """
    del source_lang_label, target_lang_label  # accepted for compatibility, unused
    provider = LLMTranslationBackend(timeout_sec=timeout_sec)
    return provider.translate(
        text=text,
        target_lang=target_lang,
        source_lang=source_lang,
        context=None,
    )
class QwenMTTranslationBackend:
    """Qwen-MT machine-translation backend with a Redis result cache."""

    QWEN_DEFAULT_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1"
    QWEN_MODEL = "qwen-mt-flash"
    # Re-export the shared language-code map on the class for callers.
    SOURCE_LANG_CODE_MAP = SOURCE_LANG_CODE_MAP

    def __init__(
        self,
        model: str = "qwen",
        api_key: Optional[str] = None,
        use_cache: bool = True,
        timeout: int = 10,
        glossary_id: Optional[str] = None,
        translation_context: Optional[str] = None,
    ):
        """Configure the backend, the OpenAI-compatible client, and the cache.

        model: any "qwen*" alias; normalized to "qwen-mt".
        api_key: explicit DashScope key; falls back to env/config.
        use_cache: enable the Redis translation cache (also gated by config).
        timeout: per-request timeout in seconds.
        """
        self.model = self._normalize_model(model)
        self.timeout = int(timeout)
        self.use_cache = bool(use_cache)
        self.glossary_id = glossary_id
        self.translation_context = translation_context or "e-commerce product search"

        # Cache-key/TTL behavior is driven entirely by services config.
        cache_cfg = get_translation_cache_config()
        self.cache_prefix = str(cache_cfg.get("key_prefix", "trans:v2"))
        self.expire_seconds = int(cache_cfg.get("ttl_seconds", 360 * 24 * 3600))
        self.cache_sliding_expiration = bool(cache_cfg.get("sliding_expiration", True))
        self.cache_include_context = bool(cache_cfg.get("key_include_context", True))
        self.cache_include_prompt = bool(cache_cfg.get("key_include_prompt", True))
        self.cache_include_source_lang = bool(cache_cfg.get("key_include_source_lang", True))

        self.qwen_model_name = self._resolve_qwen_model_name(model)
        self._api_key = api_key or self._default_api_key(self.model)
        self._qwen_client: Optional[OpenAI] = None
        endpoint = os.getenv("DASHSCOPE_BASE_URL") or self.QWEN_DEFAULT_BASE_URL
        if not self._api_key:
            logger.warning("DASHSCOPE_API_KEY not set; qwen-mt translation unavailable")
        else:
            try:
                self._qwen_client = OpenAI(api_key=self._api_key, base_url=endpoint)
            except Exception as exc:
                logger.warning("Failed to initialize qwen-mt client: %s", exc, exc_info=True)

        self.redis_client = None
        if self.use_cache and bool(cache_cfg.get("enabled", True)):
            self.redis_client = self._init_redis_client()

    @property
    def supports_batch(self) -> bool:
        # Batch input is supported by per-item recursion in translate().
        return True

    @staticmethod
    def _normalize_model(model: str) -> str:
        """Collapse any "qwen*" alias to the canonical "qwen-mt" name."""
        candidate = (model or "qwen").strip().lower()
        if candidate.startswith("qwen"):
            return "qwen-mt"
        raise ValueError(f"Unsupported model: {model}. Supported models: 'qwen', 'qwen-mt', 'qwen-mt-flash'")

    @staticmethod
    def _resolve_qwen_model_name(model: str) -> str:
        """Map the alias to the concrete DashScope model identifier."""
        candidate = (model or "qwen").strip().lower()
        return "qwen-mt-flash" if candidate in {"qwen", "qwen-mt"} else candidate

    @staticmethod
    def _default_api_key(model: str) -> Optional[str]:
        """Resolve the API key from config, then environment."""
        del model  # kept for interface compatibility
        return DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY")

    def _init_redis_client(self):
        """Open and ping a Redis connection; return None on any failure."""
        try:
            client = redis.Redis(
                host=REDIS_CONFIG.get("host", "localhost"),
                port=REDIS_CONFIG.get("port", 6479),
                password=REDIS_CONFIG.get("password"),
                decode_responses=True,
                socket_timeout=REDIS_CONFIG.get("socket_timeout", 1),
                socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1),
                retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False),
                health_check_interval=10,
            )
            client.ping()
        except Exception as exc:
            logger.warning("Failed to initialize translation redis cache: %s", exc)
            return None
        return client

    def _build_cache_key(
        self,
        text: str,
        target_lang: str,
        source_lang: Optional[str],
        context: Optional[str],
        prompt: Optional[str],
    ) -> str:
        """Derive a deterministic cache key; inclusion of each field is configurable."""
        src = (source_lang or "auto").strip().lower() if self.cache_include_source_lang else "-"
        tgt = (target_lang or "").strip().lower()
        ctx = (context or "").strip() if self.cache_include_context else ""
        prm = (prompt or "").strip() if self.cache_include_prompt else ""
        payload = f"model={self.model}\nsrc={src}\ntgt={tgt}\nctx={ctx}\nprm={prm}\ntext={text}"
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        return f"{self.cache_prefix}:{self.model}:{src}:{tgt}:{digest}"

    def translate(
        self,
        text: Union[str, Sequence[str]],
        target_lang: str,
        source_lang: Optional[str] = None,
        context: Optional[str] = None,
        prompt: Optional[str] = None,
    ) -> Union[Optional[str], List[Optional[str]]]:
        """Translate a string or sequence, with cache lookup and no-op shortcuts.

        Blank items pass through unchanged; text already in the target
        language (English/Chinese heuristics, pure numbers for zh) is
        returned as-is without calling the API.
        """
        if isinstance(text, (list, tuple)):
            batch_results: List[Optional[str]] = []
            for entry in text:
                if entry is None or not str(entry).strip():
                    batch_results.append(entry)  # type: ignore[arg-type]
                else:
                    batch_results.append(
                        self.translate(
                            text=str(entry),
                            target_lang=target_lang,
                            source_lang=source_lang,
                            context=context,
                            prompt=prompt,
                        )
                    )
            return batch_results

        if not text or not str(text).strip():
            return text  # type: ignore[return-value]

        tgt = (target_lang or "").strip().lower()
        src = (source_lang or "").strip().lower() or None
        # Skip the API when the text already matches the target language.
        if tgt == "en" and self._is_english_text(text):
            return text
        if tgt == "zh" and (self._contains_chinese(text) or self._is_pure_number(text)):
            return text

        translation_context = context or self.translation_context
        cached = self._get_cached_translation_redis(text, tgt, src, translation_context, prompt)
        if cached is not None:
            return cached

        result = self._translate_qwen(text, tgt, src)
        if result is not None:
            self._set_cached_translation_redis(text, tgt, result, src, translation_context, prompt)
        return result

    def _translate_qwen(
        self,
        text: str,
        target_lang: str,
        source_lang: Optional[str],
    ) -> Optional[str]:
        """Issue one Qwen-MT request; returns None when unavailable or failed."""
        if not self._qwen_client:
            return None
        tgt_norm = (target_lang or "").strip().lower()
        src_norm = (source_lang or "").strip().lower()
        tgt_qwen = self.SOURCE_LANG_CODE_MAP.get(tgt_norm, tgt_norm.capitalize())
        if not src_norm or src_norm == "auto":
            src_qwen = "auto"
        else:
            src_qwen = self.SOURCE_LANG_CODE_MAP.get(src_norm, src_norm.capitalize())
        start = time.time()
        try:
            completion = self._qwen_client.chat.completions.create(
                model=self.qwen_model_name,
                messages=[{"role": "user", "content": text}],
                extra_body={
                    "translation_options": {
                        "source_lang": src_qwen,
                        "target_lang": tgt_qwen,
                    }
                },
                timeout=self.timeout,
            )
            content = (completion.choices[0].message.content or "").strip()
            if not content:
                return None
            logger.info("[qwen-mt] Success | src=%s tgt=%s latency=%.1fms", src_qwen, tgt_qwen, (time.time() - start) * 1000)
            return content
        except Exception as exc:
            logger.warning(
                "[qwen-mt] Failed | src=%s tgt=%s latency=%.1fms error=%s",
                src_qwen,
                tgt_qwen,
                (time.time() - start) * 1000,
                exc,
                exc_info=True,
            )
            return None

    def _get_cached_translation_redis(
        self,
        text: str,
        target_lang: str,
        source_lang: Optional[str] = None,
        context: Optional[str] = None,
        prompt: Optional[str] = None,
    ) -> Optional[str]:
        """Look up a cached translation; refreshes TTL when sliding expiry is on."""
        if not self.redis_client:
            return None
        key = self._build_cache_key(text, target_lang, source_lang, context, prompt)
        try:
            value = self.redis_client.get(key)
            if value and self.cache_sliding_expiration:
                self.redis_client.expire(key, self.expire_seconds)
            return value
        except Exception as exc:
            logger.warning("Redis get translation cache failed: %s", exc)
            return None

    def _set_cached_translation_redis(
        self,
        text: str,
        target_lang: str,
        translation: str,
        source_lang: Optional[str] = None,
        context: Optional[str] = None,
        prompt: Optional[str] = None,
    ) -> None:
        """Store a translation in Redis with the configured TTL; best-effort."""
        if not self.redis_client:
            return
        key = self._build_cache_key(text, target_lang, source_lang, context, prompt)
        try:
            self.redis_client.setex(key, self.expire_seconds, translation)
        except Exception as exc:
            logger.warning("Redis set translation cache failed: %s", exc)

    @staticmethod
    def _contains_chinese(text: str) -> bool:
        # CJK unified ideographs block.
        return bool(re.search(r"[\u4e00-\u9fff]", text or ""))

    @staticmethod
    def _is_english_text(text: str) -> bool:
        # ASCII letters/digits/whitespace/punctuation only, and no CJK.
        stripped = (text or "").strip()
        if not stripped:
            return False
        if QwenMTTranslationBackend._contains_chinese(stripped):
            return False
        return bool(re.fullmatch(r"[A-Za-z0-9\s\W]+", stripped))

    @staticmethod
    def _is_pure_number(text: str) -> bool:
        # Digits plus common numeric punctuation (ranges, percents, ratios).
        return bool(re.fullmatch(r"[\d.\-+%/,: ]+", (text or "").strip()))


# Backward-compatible alias for the old class name.
Translator = QwenMTTranslationBackend
logger = logging.getLogger(__name__)


class TranslationServiceClient:
    """Business-side translation client that talks to the translator service."""

    def __init__(
        self,
        *,
        base_url: Optional[str] = None,
        default_model: Optional[str] = None,
        default_scene: Optional[str] = None,
        timeout_sec: Optional[float] = None,
    ) -> None:
        """Resolve connection defaults from arguments, then the services config."""
        cfg = get_translation_config()
        self.base_url = (base_url or cfg.service_url).rstrip("/")
        self.default_model = cfg.normalize_model_name(default_model or cfg.default_model)
        self.default_scene = (default_scene or cfg.default_scene or "general").strip() or "general"
        self.timeout_sec = float(timeout_sec or cfg.timeout_sec or 10.0)

    @property
    def model(self) -> str:
        """The model name used when a call does not specify one."""
        return self.default_model

    @property
    def supports_batch(self) -> bool:
        # The service endpoint accepts lists of texts directly.
        return True

    def translate(
        self,
        text: Union[str, Sequence[str]],
        target_lang: str,
        source_lang: Optional[str] = None,
        context: Optional[str] = None,
        prompt: Optional[str] = None,
        model: Optional[str] = None,
        scene: Optional[str] = None,
    ) -> Union[Optional[str], List[Optional[str]]]:
        """POST to the service's /translate endpoint.

        Returns the service's ``translated_text`` field (string or list,
        mirroring the input shape). Any HTTP failure or exception yields
        None (or a list of None for batch input) rather than raising.
        """
        if isinstance(text, tuple):
            text = list(text)
        request_body = {
            "text": text,
            "target_lang": target_lang,
            "source_lang": source_lang or "auto",
            "model": (model or self.default_model),
            "scene": (scene or context or self.default_scene),
        }
        if prompt:
            request_body["prompt"] = prompt

        try:
            response = requests.post(
                f"{self.base_url}/translate",
                json=request_body,
                timeout=self.timeout_sec,
            )
        except Exception as exc:
            logger.warning("Translation service request error: %s", exc, exc_info=True)
            return self._empty_result_for(text)

        if response.status_code != 200:
            logger.warning(
                "Translation service request failed: status=%s body=%s",
                response.status_code,
                (response.text or "")[:300],
            )
            return self._empty_result_for(text)
        try:
            return response.json().get("translated_text")
        except Exception as exc:
            logger.warning("Translation service request error: %s", exc, exc_info=True)
            return self._empty_result_for(text)

    @staticmethod
    def _empty_result_for(
        text: Union[str, Sequence[str]],
    ) -> Union[Optional[str], List[Optional[str]]]:
        """Failure placeholder matching the input shape (None, or list of None)."""
        if isinstance(text, (list, tuple)):
            return [None for _ in text]
        return None


TranslateInput = Union[str, Sequence[str]]
TranslateOutput = Union[Optional[str], List[Optional[str]]]


@runtime_checkable
class TranslationBackendProtocol(Protocol):
    """Structural interface implemented by all translation backends."""

    model: str

    @property
    def supports_batch(self) -> bool:
        ...

    def translate(
        self,
        text: TranslateInput,
        target_lang: str,
        source_lang: Optional[str] = None,
        context: Optional[str] = None,
        prompt: Optional[str] = None,
    ) -> TranslateOutput:
        ...
logger = logging.getLogger(__name__)


class TranslationService:
    """Owns translation backends and routes calls by model and scene."""

    def __init__(self, config: Optional[TranslationServiceConfig] = None) -> None:
        """Load config (falling back to the global one) and build enabled backends."""
        self.config = config or get_translation_config()
        self._backends: Dict[str, TranslationBackendProtocol] = {}
        self._init_enabled_backends()

    def _init_enabled_backends(self) -> None:
        """Instantiate every enabled backend; fail fast when none are usable."""
        factories = {
            "qwen-mt": self._create_qwen_mt_backend,
            "deepl": self._create_deepl_backend,
            "llm": self._create_llm_backend,
        }
        for name in self.config.enabled_models:
            factory = factories.get(name)
            if factory is not None:
                self._backends[name] = factory()
            else:
                logger.warning("Translation backend '%s' is enabled but not registered", name)

        if not self._backends:
            raise ValueError("No enabled translation backends found in services.translation.capabilities")

    def _create_qwen_mt_backend(self) -> TranslationBackendProtocol:
        """Build the Qwen-MT backend from its capability config."""
        from translation.backends.qwen_mt import QwenMTTranslationBackend

        cfg = self.config.get_capability_cfg("qwen-mt")
        return QwenMTTranslationBackend(
            model=cfg.get("model") or "qwen-mt-flash",
            api_key=cfg.get("api_key"),
            use_cache=bool(cfg.get("use_cache", True)),
            timeout=int(cfg.get("timeout_sec", 10)),
            glossary_id=cfg.get("glossary_id"),
            translation_context=cfg.get("translation_context"),
        )

    def _create_deepl_backend(self) -> TranslationBackendProtocol:
        """Build the DeepL backend from its capability config."""
        from translation.backends.deepl import DeepLTranslationBackend

        cfg = self.config.get_capability_cfg("deepl")
        return DeepLTranslationBackend(
            api_key=cfg.get("api_key"),
            timeout=float(cfg.get("timeout_sec", 10.0)),
            glossary_id=cfg.get("glossary_id"),
        )

    def _create_llm_backend(self) -> TranslationBackendProtocol:
        """Build the LLM backend from its capability config."""
        from translation.backends.llm import LLMTranslationBackend

        cfg = self.config.get_capability_cfg("llm")
        return LLMTranslationBackend(
            model=cfg.get("model"),
            timeout_sec=float(cfg.get("timeout_sec", 30.0)),
            base_url=cfg.get("base_url"),
        )

    @property
    def available_models(self) -> List[str]:
        """Names of backends that were successfully enabled, in config order."""
        return list(self._backends.keys())

    def get_backend(self, model: Optional[str] = None) -> TranslationBackendProtocol:
        """Return the backend for *model* (config-normalized); raise if not enabled."""
        normalized = self.config.normalize_model_name(model)
        backend = self._backends.get(normalized)
        if backend is None:
            raise ValueError(
                f"Translation model '{normalized}' is not enabled. "
                f"Available models: {', '.join(self.available_models) or 'none'}"
            )
        return backend

    def translate(
        self,
        text: TranslateInput,
        target_lang: str,
        source_lang: Optional[str] = None,
        *,
        model: Optional[str] = None,
        scene: Optional[str] = None,
        prompt: Optional[str] = None,
    ) -> TranslateOutput:
        """Route a translation request to the chosen backend.

        The scene (falling back to the configured default, then "general")
        is passed to the backend as its ``context`` argument.
        """
        backend = self.get_backend(model)
        active_scene = (scene or self.config.default_scene or "general").strip() or "general"
        return backend.translate(
            text=text,
            target_lang=target_lang,
            source_lang=source_lang,
            context=active_scene,
            prompt=prompt,
        )