qwen_mt.py 8.89 KB
"""Qwen-MT translation backend with cache support."""

from __future__ import annotations

import hashlib
import logging
import os
import re
import time
from typing import List, Optional, Sequence, Union

import redis
from openai import OpenAI

from config.env_config import DASHSCOPE_API_KEY, REDIS_CONFIG
from config.services_config import get_translation_cache_config
from translation.languages import QWEN_LANGUAGE_CODES

logger = logging.getLogger(__name__)


class QwenMTTranslationBackend:
    def __init__(
        self,
        capability_name: str,
        model: str,
        base_url: str,
        api_key: Optional[str] = None,
        use_cache: bool = True,
        timeout: int = 10,
        glossary_id: Optional[str] = None,
    ):
        self.capability_name = capability_name
        self.model = self._normalize_capability_name(capability_name)
        self.qwen_model_name = self._normalize_model_name(model)
        self.base_url = base_url
        self.timeout = int(timeout)
        self.use_cache = bool(use_cache)
        self.glossary_id = glossary_id

        cache_cfg = get_translation_cache_config()
        self.cache_prefix = str(cache_cfg["key_prefix"])
        self.expire_seconds = int(cache_cfg["ttl_seconds"])
        self.cache_sliding_expiration = bool(cache_cfg["sliding_expiration"])
        self.cache_include_scene = bool(cache_cfg["key_include_scene"])
        self.cache_include_source_lang = bool(cache_cfg["key_include_source_lang"])

        self._api_key = api_key or self._default_api_key(self.model)
        self._qwen_client: Optional[OpenAI] = None
        if self._api_key:
            try:
                self._qwen_client = OpenAI(api_key=self._api_key, base_url=self.base_url)
            except Exception as exc:
                logger.warning("Failed to initialize qwen-mt client: %s", exc, exc_info=True)
        else:
            logger.warning("DASHSCOPE_API_KEY not set; qwen-mt translation unavailable")

        self.redis_client = None
        if self.use_cache and bool(cache_cfg["enabled"]):
            self.redis_client = self._init_redis_client()

    @property
    def supports_batch(self) -> bool:
        return True

    @staticmethod
    def _normalize_capability_name(name: str) -> str:
        normalized = str(name or "").strip().lower()
        if normalized != "qwen-mt":
            raise ValueError(f"Qwen-MT backend capability must be 'qwen-mt', got '{name}'")
        return normalized

    @staticmethod
    def _normalize_model_name(model: str) -> str:
        normalized = str(model or "").strip()
        if not normalized:
            raise ValueError("qwen-mt backend model is required")
        return normalized

    @staticmethod
    def _default_api_key(model: str) -> Optional[str]:
        del model
        return DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY")

    def _init_redis_client(self):
        try:
            client = redis.Redis(
                host=REDIS_CONFIG.get("host", "localhost"),
                port=REDIS_CONFIG.get("port", 6479),
                password=REDIS_CONFIG.get("password"),
                decode_responses=True,
                socket_timeout=REDIS_CONFIG.get("socket_timeout", 1),
                socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1),
                retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False),
                health_check_interval=10,
            )
            client.ping()
            return client
        except Exception as exc:
            logger.warning("Failed to initialize translation redis cache: %s", exc)
            return None

    def _build_cache_key(
        self,
        text: str,
        target_lang: str,
        source_lang: Optional[str],
        scene: Optional[str],
    ) -> str:
        src = (source_lang or "auto").strip().lower() if self.cache_include_source_lang else "-"
        tgt = (target_lang or "").strip().lower()
        scn = (scene or "").strip() if self.cache_include_scene else ""
        payload = f"model={self.model}\nsrc={src}\ntgt={tgt}\nscene={scn}\ntext={text}"
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        return f"{self.cache_prefix}:{self.model}:{src}:{tgt}:{digest}"

    def translate(
        self,
        text: Union[str, Sequence[str]],
        target_lang: str,
        source_lang: Optional[str] = None,
        scene: Optional[str] = None,
    ) -> Union[Optional[str], List[Optional[str]]]:
        if isinstance(text, (list, tuple)):
            results: List[Optional[str]] = []
            for item in text:
                if item is None or not str(item).strip():
                    results.append(item)  # type: ignore[arg-type]
                    continue
                out = self.translate(
                    text=str(item),
                    target_lang=target_lang,
                    source_lang=source_lang,
                    scene=scene,
                )
                results.append(out)
            return results

        if not text or not str(text).strip():
            return text  # type: ignore[return-value]

        tgt = (target_lang or "").strip().lower()
        src = (source_lang or "").strip().lower() or None
        if tgt == "en" and self._is_english_text(text):
            return text
        if tgt == "zh" and (self._contains_chinese(text) or self._is_pure_number(text)):
            return text

        cached = self._get_cached_translation_redis(text, tgt, src, scene)
        if cached is not None:
            return cached

        result = self._translate_qwen(text, tgt, src)

        if result is not None:
            self._set_cached_translation_redis(text, tgt, result, src, scene)
        return result

    def _translate_qwen(
        self,
        text: str,
        target_lang: str,
        source_lang: Optional[str],
    ) -> Optional[str]:
        if not self._qwen_client:
            return None
        tgt_norm = (target_lang or "").strip().lower()
        src_norm = (source_lang or "").strip().lower()
        tgt_qwen = QWEN_LANGUAGE_CODES.get(tgt_norm, tgt_norm.capitalize())
        src_qwen = "auto" if not src_norm or src_norm == "auto" else QWEN_LANGUAGE_CODES.get(src_norm, src_norm.capitalize())
        start = time.time()
        try:
            completion = self._qwen_client.chat.completions.create(
                model=self.qwen_model_name,
                messages=[{"role": "user", "content": text}],
                extra_body={
                    "translation_options": {
                        "source_lang": src_qwen,
                        "target_lang": tgt_qwen,
                    }
                },
                timeout=self.timeout,
            )
            content = (completion.choices[0].message.content or "").strip()
            if not content:
                return None
            logger.info("[qwen-mt] Success | src=%s tgt=%s latency=%.1fms", src_qwen, tgt_qwen, (time.time() - start) * 1000)
            return content
        except Exception as exc:
            logger.warning(
                "[qwen-mt] Failed | src=%s tgt=%s latency=%.1fms error=%s",
                src_qwen,
                tgt_qwen,
                (time.time() - start) * 1000,
                exc,
                exc_info=True,
            )
            return None

    def _get_cached_translation_redis(
        self,
        text: str,
        target_lang: str,
        source_lang: Optional[str] = None,
        scene: Optional[str] = None,
    ) -> Optional[str]:
        if not self.redis_client:
            return None
        key = self._build_cache_key(text, target_lang, source_lang, scene)
        try:
            value = self.redis_client.get(key)
            if value and self.cache_sliding_expiration:
                self.redis_client.expire(key, self.expire_seconds)
            return value
        except Exception as exc:
            logger.warning("Redis get translation cache failed: %s", exc)
            return None

    def _set_cached_translation_redis(
        self,
        text: str,
        target_lang: str,
        translation: str,
        source_lang: Optional[str] = None,
        scene: Optional[str] = None,
    ) -> None:
        if not self.redis_client:
            return
        key = self._build_cache_key(text, target_lang, source_lang, scene)
        try:
            self.redis_client.setex(key, self.expire_seconds, translation)
        except Exception as exc:
            logger.warning("Redis set translation cache failed: %s", exc)

    @staticmethod
    def _contains_chinese(text: str) -> bool:
        return bool(re.search(r"[\u4e00-\u9fff]", text or ""))

    @staticmethod
    def _is_english_text(text: str) -> bool:
        stripped = (text or "").strip()
        return bool(stripped) and bool(re.fullmatch(r"[A-Za-z0-9\s\W]+", stripped)) and not QwenMTTranslationBackend._contains_chinese(stripped)

    @staticmethod
    def _is_pure_number(text: str) -> bool:
        return bool(re.fullmatch(r"[\d.\-+%/,: ]+", (text or "").strip()))