"""Qwen-MT translation backend.""" from __future__ import annotations import logging import re import time from typing import List, Optional, Sequence, Union from openai import OpenAI from translation.languages import QWEN_LANGUAGE_CODES logger = logging.getLogger(__name__) class QwenMTTranslationBackend: def __init__( self, capability_name: str, model: str, base_url: str, api_key: Optional[str] = None, timeout: int = 10, glossary_id: Optional[str] = None, ): self.capability_name = capability_name self.model = self._normalize_capability_name(capability_name) self.qwen_model_name = self._normalize_model_name(model) self.base_url = base_url self.timeout = int(timeout) self.glossary_id = glossary_id self._api_key = api_key or self._default_api_key(self.model) self._qwen_client: Optional[OpenAI] = None if self._api_key: try: self._qwen_client = OpenAI(api_key=self._api_key, base_url=self.base_url) except Exception as exc: logger.warning("Failed to initialize qwen-mt client: %s", exc, exc_info=True) else: logger.warning("DASHSCOPE_API_KEY not set; qwen-mt translation unavailable") @property def supports_batch(self) -> bool: return True @staticmethod def _normalize_capability_name(name: str) -> str: normalized = str(name or "").strip().lower() if normalized != "qwen-mt": raise ValueError(f"Qwen-MT backend capability must be 'qwen-mt', got '{name}'") return normalized @staticmethod def _normalize_model_name(model: str) -> str: normalized = str(model or "").strip() if not normalized: raise ValueError("qwen-mt backend model is required") return normalized @staticmethod def _default_api_key(model: str) -> Optional[str]: del model return None def translate( self, text: Union[str, Sequence[str]], target_lang: str, source_lang: Optional[str] = None, scene: Optional[str] = None, ) -> Union[Optional[str], List[Optional[str]]]: if isinstance(text, (list, tuple)): results: List[Optional[str]] = [] for item in text: if item is None or not str(item).strip(): results.append(item) # type: ignore[arg-type] continue out = self.translate( text=str(item), target_lang=target_lang, source_lang=source_lang, scene=scene, ) results.append(out) return results if not text or not str(text).strip(): return text # type: ignore[return-value] tgt = (target_lang or "").strip().lower() src = (source_lang or "").strip().lower() or None if tgt == "en" and self._is_english_text(text): return text if tgt == "zh" and (self._contains_chinese(text) or self._is_pure_number(text)): return text result = self._translate_qwen(text, tgt, src) return result def _translate_qwen( self, text: str, target_lang: str, source_lang: Optional[str], ) -> Optional[str]: if not self._qwen_client: return None tgt_norm = (target_lang or "").strip().lower() src_norm = (source_lang or "").strip().lower() tgt_qwen = QWEN_LANGUAGE_CODES.get(tgt_norm, tgt_norm.capitalize()) src_qwen = "auto" if not src_norm or src_norm == "auto" else QWEN_LANGUAGE_CODES.get(src_norm, src_norm.capitalize()) start = time.time() try: completion = self._qwen_client.chat.completions.create( model=self.qwen_model_name, messages=[{"role": "user", "content": text}], extra_body={ "translation_options": { "source_lang": src_qwen, "target_lang": tgt_qwen, } }, timeout=self.timeout, ) content = (completion.choices[0].message.content or "").strip() if not content: return None logger.info("[qwen-mt] Success | src=%s tgt=%s latency=%.1fms", src_qwen, tgt_qwen, (time.time() - start) * 1000) return content except Exception as exc: logger.warning( "[qwen-mt] Failed | src=%s tgt=%s latency=%.1fms error=%s", src_qwen, tgt_qwen, (time.time() - start) * 1000, exc, exc_info=True, ) return None @staticmethod def _contains_chinese(text: str) -> bool: return bool(re.search(r"[\u4e00-\u9fff]", text or "")) @staticmethod def _is_english_text(text: str) -> bool: stripped = (text or "").strip() return bool(stripped) and bool(re.fullmatch(r"[A-Za-z0-9\s\W]+", stripped)) and not QwenMTTranslationBackend._contains_chinese(stripped) @staticmethod def _is_pure_number(text: str) -> bool: return bool(re.fullmatch(r"[\d.\-+%/,: ]+", (text or "").strip()))