# deepl.py
"""DeepL translation backend."""

from __future__ import annotations

import logging
import re
import time
from typing import List, Optional, Sequence, Tuple, Union

import requests
from requests.adapters import HTTPAdapter

from translation.languages import DEEPL_LANGUAGE_CODES
from translation.scenes import SCENE_DEEPL_CONTEXTS, normalize_scene_name

logger = logging.getLogger(__name__)


class DeepLTranslationBackend:
    _CONNECT_TIMEOUT_SEC = 2.0
    _MAX_RETRIES = 2
    _RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

    def __init__(
        self,
        api_key: Optional[str],
        *,
        api_url: str,
        timeout: float,
        glossary_id: Optional[str] = None,
    ) -> None:
        """Store the connection settings and prepare a pooled HTTP session.

        Args:
            api_key: DeepL auth key. When falsy a warning is logged here.
            api_url: Endpoint that translation requests are POSTed to.
            timeout: Coerced to ``float`` and stored as ``self.timeout``.
            glossary_id: Optional glossary identifier stored for later requests.
        """
        self.api_key = api_key
        self.api_url = api_url
        self.timeout = float(timeout)
        self.glossary_id = glossary_id
        self.model = "deepl"

        # One dedicated adapter per scheme; adapter-level retries are disabled
        # (max_retries=0).
        session = requests.Session()
        for scheme in ("http://", "https://"):
            session.mount(
                scheme,
                HTTPAdapter(pool_connections=32, pool_maxsize=32, max_retries=0),
            )
        self._session = session

        if not self.api_key:
            logger.warning("DEEPL_AUTH_KEY not set; DeepL translation is unavailable")

    @property
    def supports_batch(self) -> bool:
        return False

    def _resolve_request_context(
        self,
        target_lang: str,
        scene: Optional[str],
    ) -> Optional[str]:
        if scene is None:
            raise ValueError("deepl translation scene is required")
        normalized_scene = normalize_scene_name(scene)
        scene_map = SCENE_DEEPL_CONTEXTS[normalized_scene]
        tgt = str(target_lang or "").strip().lower()
        return scene_map.get(tgt) or scene_map.get("en")

    def _build_headers(self) -> dict:
        return {
            "Authorization": f"DeepL-Auth-Key {self.api_key}",
            "Content-Type": "application/json",
        }

    def _build_payload(
        self,
        *,
        texts: List[str],
        target_code: str,
        source_lang: Optional[str],
        api_context: Optional[str],
    ) -> dict:
        payload: dict = {"text": texts, "target_lang": target_code}
        if source_lang:
            payload["source_lang"] = DEEPL_LANGUAGE_CODES.get(source_lang.lower(), source_lang.upper())
        if api_context:
            payload["context"] = api_context
        if self.glossary_id:
            payload["glossary_id"] = self.glossary_id
        return payload

    def _post_with_retries(self, *, headers: dict, payload: dict, target_code: str) -> Optional[dict]:
        for attempt in range(self._MAX_RETRIES + 1):
            try:
                response = self._session.post(
                    self.api_url,
                    headers=headers,
                    json=payload,
                    timeout=(self._CONNECT_TIMEOUT_SEC, self.timeout),
                )
            except requests.Timeout:
                logger.warning("[deepl] Timeout | tgt=%s timeout=%.1fs", target_code, self.timeout)
                return None
            except Exception as exc:
                logger.warning("[deepl] Exception | tgt=%s error=%s", target_code, exc, exc_info=True)
                return None

            if response.status_code == 200:
                try:
                    return response.json()
                except Exception as exc:
                    logger.warning("[deepl] Bad JSON | tgt=%s error=%s body=%s", target_code, exc, (response.text or "")[:200])
                    return None

            retryable = response.status_code in self._RETRYABLE_STATUS_CODES
            logger.warning(
                "[deepl] Failed | status=%s tgt=%s attempt=%s/%s body=%s",
                response.status_code,
                target_code,
                attempt + 1,
                self._MAX_RETRIES + 1,
                (response.text or "")[:200],
            )
            if (not retryable) or attempt >= self._MAX_RETRIES:
                return None
            sleep_s = min(1.0, 0.1 * (2 ** attempt))
            time.sleep(sleep_s)

        return None

    def translate(
        self,
        text: Union[str, Sequence[str]],
        target_lang: str,
        source_lang: Optional[str] = None,
        scene: Optional[str] = None,
    ) -> Union[Optional[str], List[Optional[str]]]:
        if isinstance(text, (list, tuple)):
            results: List[Optional[str]] = []
            for item in text:
                if item is None or not str(item).strip():
                    results.append(item)  # type: ignore[arg-type]
                    continue
                out = self.translate(
                    text=str(item),
                    target_lang=target_lang,
                    source_lang=source_lang,
                    scene=scene,
                )
                results.append(out)
            return results

        if not self.api_key:
            return None

        target_code = DEEPL_LANGUAGE_CODES.get((target_lang or "").lower(), (target_lang or "").upper())
        api_context = self._resolve_request_context(target_lang, scene)
        headers = self._build_headers()

        text_to_translate, needs_extraction = self._add_ecommerce_context(str(text), source_lang, api_context)

        payload = self._build_payload(
            texts=[text_to_translate],
            target_code=target_code,
            source_lang=source_lang,
            api_context=api_context,
        )
        data = self._post_with_retries(headers=headers, payload=payload, target_code=target_code)
        translations = (data or {}).get("translations") or []
        if not translations:
            return None
        translated = translations[0].get("text") if isinstance(translations[0], dict) else None
        if not translated:
            return None
        if needs_extraction:
            translated = self._extract_term_from_translation(translated, text, target_code)
        return translated

    def _add_ecommerce_context(
        self,
        text: str,
        source_lang: Optional[str],
        scene: Optional[str],
    ) -> Tuple[str, bool]:
        if not scene or "e-commerce" not in scene.lower():
            return text, False
        if (source_lang or "").lower() != "zh":
            return text, False

        term = (text or "").strip()
        if len(term.split()) == 1 and len(term) <= 2:
            return f"购买 {term}", True
        return text, False

    def _extract_term_from_translation(
        self,
        translated_text: str,
        original_text: str,
        target_lang_code: str,
    ) -> str:
        del original_text
        if target_lang_code != "EN":
            return translated_text

        words = translated_text.strip().split()
        if len(words) <= 1:
            return translated_text
        context_words = {"buy", "purchase", "product", "item", "commodity", "goods"}
        for word in reversed(words):
            normalized = re.sub(r"[.,!?;:]+$", "", word.lower())
            if normalized not in context_words:
                return normalized
        return re.sub(r"[.,!?;:]+$", "", words[-1].lower())