"""Qwen-MT translation backend with cache support.""" from __future__ import annotations import hashlib import logging import os import re import time from typing import List, Optional, Sequence, Union import redis from openai import OpenAI from config.env_config import DASHSCOPE_API_KEY, REDIS_CONFIG from config.services_config import get_translation_cache_config from translation.languages import QWEN_LANGUAGE_CODES logger = logging.getLogger(__name__) class QwenMTTranslationBackend: def __init__( self, capability_name: str, model: str, base_url: str, api_key: Optional[str] = None, use_cache: bool = True, timeout: int = 10, glossary_id: Optional[str] = None, ): self.capability_name = capability_name self.model = self._normalize_capability_name(capability_name) self.qwen_model_name = self._normalize_model_name(model) self.base_url = base_url self.timeout = int(timeout) self.use_cache = bool(use_cache) self.glossary_id = glossary_id cache_cfg = get_translation_cache_config() self.cache_prefix = str(cache_cfg["key_prefix"]) self.expire_seconds = int(cache_cfg["ttl_seconds"]) self.cache_sliding_expiration = bool(cache_cfg["sliding_expiration"]) self.cache_include_scene = bool(cache_cfg["key_include_scene"]) self.cache_include_source_lang = bool(cache_cfg["key_include_source_lang"]) self._api_key = api_key or self._default_api_key(self.model) self._qwen_client: Optional[OpenAI] = None if self._api_key: try: self._qwen_client = OpenAI(api_key=self._api_key, base_url=self.base_url) except Exception as exc: logger.warning("Failed to initialize qwen-mt client: %s", exc, exc_info=True) else: logger.warning("DASHSCOPE_API_KEY not set; qwen-mt translation unavailable") self.redis_client = None if self.use_cache and bool(cache_cfg["enabled"]): self.redis_client = self._init_redis_client() @property def supports_batch(self) -> bool: return True @staticmethod def _normalize_capability_name(name: str) -> str: normalized = str(name or "").strip().lower() if normalized != "qwen-mt": raise ValueError(f"Qwen-MT backend capability must be 'qwen-mt', got '{name}'") return normalized @staticmethod def _normalize_model_name(model: str) -> str: normalized = str(model or "").strip() if not normalized: raise ValueError("qwen-mt backend model is required") return normalized @staticmethod def _default_api_key(model: str) -> Optional[str]: del model return DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") def _init_redis_client(self): try: client = redis.Redis( host=REDIS_CONFIG.get("host", "localhost"), port=REDIS_CONFIG.get("port", 6479), password=REDIS_CONFIG.get("password"), decode_responses=True, socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), health_check_interval=10, ) client.ping() return client except Exception as exc: logger.warning("Failed to initialize translation redis cache: %s", exc) return None def _build_cache_key( self, text: str, target_lang: str, source_lang: Optional[str], scene: Optional[str], ) -> str: src = (source_lang or "auto").strip().lower() if self.cache_include_source_lang else "-" tgt = (target_lang or "").strip().lower() scn = (scene or "").strip() if self.cache_include_scene else "" payload = f"model={self.model}\nsrc={src}\ntgt={tgt}\nscene={scn}\ntext={text}" digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() return f"{self.cache_prefix}:{self.model}:{src}:{tgt}:{digest}" def translate( self, text: Union[str, Sequence[str]], target_lang: str, source_lang: Optional[str] = None, scene: Optional[str] = None, ) -> Union[Optional[str], List[Optional[str]]]: if isinstance(text, (list, tuple)): results: List[Optional[str]] = [] for item in text: if item is None or not str(item).strip(): results.append(item) # type: ignore[arg-type] continue out = self.translate( text=str(item), target_lang=target_lang, source_lang=source_lang, scene=scene, ) results.append(out) return results if not text or not str(text).strip(): return text # type: ignore[return-value] tgt = (target_lang or "").strip().lower() src = (source_lang or "").strip().lower() or None if tgt == "en" and self._is_english_text(text): return text if tgt == "zh" and (self._contains_chinese(text) or self._is_pure_number(text)): return text cached = self._get_cached_translation_redis(text, tgt, src, scene) if cached is not None: return cached result = self._translate_qwen(text, tgt, src) if result is not None: self._set_cached_translation_redis(text, tgt, result, src, scene) return result def _translate_qwen( self, text: str, target_lang: str, source_lang: Optional[str], ) -> Optional[str]: if not self._qwen_client: return None tgt_norm = (target_lang or "").strip().lower() src_norm = (source_lang or "").strip().lower() tgt_qwen = QWEN_LANGUAGE_CODES.get(tgt_norm, tgt_norm.capitalize()) src_qwen = "auto" if not src_norm or src_norm == "auto" else QWEN_LANGUAGE_CODES.get(src_norm, src_norm.capitalize()) start = time.time() try: completion = self._qwen_client.chat.completions.create( model=self.qwen_model_name, messages=[{"role": "user", "content": text}], extra_body={ "translation_options": { "source_lang": src_qwen, "target_lang": tgt_qwen, } }, timeout=self.timeout, ) content = (completion.choices[0].message.content or "").strip() if not content: return None logger.info("[qwen-mt] Success | src=%s tgt=%s latency=%.1fms", src_qwen, tgt_qwen, (time.time() - start) * 1000) return content except Exception as exc: logger.warning( "[qwen-mt] Failed | src=%s tgt=%s latency=%.1fms error=%s", src_qwen, tgt_qwen, (time.time() - start) * 1000, exc, exc_info=True, ) return None def _get_cached_translation_redis( self, text: str, target_lang: str, source_lang: Optional[str] = None, scene: Optional[str] = None, ) -> Optional[str]: if not self.redis_client: return None key = self._build_cache_key(text, target_lang, source_lang, scene) try: value = self.redis_client.get(key) if value and self.cache_sliding_expiration: self.redis_client.expire(key, self.expire_seconds) return value except Exception as exc: logger.warning("Redis get translation cache failed: %s", exc) return None def _set_cached_translation_redis( self, text: str, target_lang: str, translation: str, source_lang: Optional[str] = None, scene: Optional[str] = None, ) -> None: if not self.redis_client: return key = self._build_cache_key(text, target_lang, source_lang, scene) try: self.redis_client.setex(key, self.expire_seconds, translation) except Exception as exc: logger.warning("Redis set translation cache failed: %s", exc) @staticmethod def _contains_chinese(text: str) -> bool: return bool(re.search(r"[\u4e00-\u9fff]", text or "")) @staticmethod def _is_english_text(text: str) -> bool: stripped = (text or "").strip() return bool(stripped) and bool(re.fullmatch(r"[A-Za-z0-9\s\W]+", stripped)) and not QwenMTTranslationBackend._contains_chinese(stripped) @staticmethod def _is_pure_number(text: str) -> bool: return bool(re.fullmatch(r"[\d.\-+%/,: ]+", (text or "").strip()))