deepl.py
5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""DeepL translation backend."""
from __future__ import annotations
import logging
import re
from typing import List, Optional, Sequence, Tuple, Union
import requests
from translation.languages import DEEPL_LANGUAGE_CODES
from translation.scenes import SCENE_DEEPL_CONTEXTS, normalize_scene_name
logger = logging.getLogger(__name__)
class DeepLTranslationBackend:
    """Translation backend that sends text to a DeepL-style HTTP endpoint.

    Requests are issued with ``requests.post`` against ``api_url`` using the
    ``DeepL-Auth-Key`` authorization header. Transport and API failures are
    logged and reported to the caller as ``None`` rather than raised.

    Attributes:
        api_key: Auth key; when falsy, ``translate`` returns ``None`` without
            making any request.
        api_url: Endpoint the JSON payload is POSTed to.
        timeout: Request timeout in seconds (coerced to ``float``).
        glossary_id: Optional glossary id forwarded in the payload.
        model: Fixed backend identifier, ``"deepl"``.
    """

    # Trailing punctuation removed from a token before it is compared/returned.
    _TRAILING_PUNCT_RE = re.compile(r"[.,!?;:]+$")

    # English filler words that may surround the real term after the
    # "购买 <term>" context injection done by ``_add_ecommerce_context``.
    _CONTEXT_WORDS = frozenset(
        {"buy", "purchase", "product", "item", "commodity", "goods"}
    )

    def __init__(
        self,
        api_key: Optional[str],
        *,
        api_url: str,
        timeout: float,
        glossary_id: Optional[str] = None,
    ) -> None:
        """Store connection settings and warn when no API key is configured."""
        self.api_key = api_key
        self.api_url = api_url
        self.timeout = float(timeout)
        self.glossary_id = glossary_id
        self.model = "deepl"
        if not self.api_key:
            logger.warning("DEEPL_AUTH_KEY not set; DeepL translation is unavailable")

    @property
    def supports_batch(self) -> bool:
        """Return ``False``: sequences are translated one item per request."""
        return False

    def _resolve_request_context(
        self,
        target_lang: str,
        scene: Optional[str],
    ) -> Optional[str]:
        """Look up the context string for ``scene`` and ``target_lang``.

        The scene name is normalized with ``normalize_scene_name`` and used to
        index ``SCENE_DEEPL_CONTEXTS``; the entry for the lower-cased target
        language is preferred, falling back to the ``"en"`` entry.

        Raises:
            ValueError: If ``scene`` is ``None``.

        An unknown scene propagates whatever the ``SCENE_DEEPL_CONTEXTS``
        lookup raises (``KeyError`` if it is a plain dict).
        """
        if scene is None:
            raise ValueError("deepl translation scene is required")
        normalized_scene = normalize_scene_name(scene)
        scene_map = SCENE_DEEPL_CONTEXTS[normalized_scene]
        tgt = str(target_lang or "").strip().lower()
        return scene_map.get(tgt) or scene_map.get("en")

    def translate(
        self,
        text: Union[str, Sequence[str]],
        target_lang: str,
        source_lang: Optional[str] = None,
        scene: Optional[str] = None,
    ) -> Union[Optional[str], List[Optional[str]]]:
        """Translate a string, or each element of a list/tuple of strings.

        Args:
            text: A single string, or a list/tuple of strings. In a sequence,
                ``None`` and blank items are passed through untouched and do
                not trigger a request.
            target_lang: Target language; mapped through
                ``DEEPL_LANGUAGE_CODES`` (upper-cased as-is when unmapped).
            source_lang: Optional source language, mapped the same way.
            scene: Scene name used to resolve the request context. Required
                whenever a request is actually going to be made.

        Returns:
            The translated string, or ``None`` when no API key is set, the
            response status is not 200, the response holds no translation, or
            the request raises. For a sequence input, a list with one such
            result per item.

        Raises:
            ValueError: If an API key is set and ``scene`` is ``None``.
        """
        if isinstance(text, (list, tuple)):
            results: List[Optional[str]] = []
            for item in text:
                # Nothing to translate: keep the original placeholder value.
                if item is None or not str(item).strip():
                    results.append(item)  # type: ignore[arg-type]
                    continue
                results.append(
                    self.translate(
                        text=str(item),
                        target_lang=target_lang,
                        source_lang=source_lang,
                        scene=scene,
                    )
                )
            return results

        if not self.api_key:
            return None

        target_code = DEEPL_LANGUAGE_CODES.get(
            (target_lang or "").lower(), (target_lang or "").upper()
        )
        headers = {
            "Authorization": f"DeepL-Auth-Key {self.api_key}",
            "Content-Type": "application/json",
        }
        # Scene validation happens outside the try block on purpose so that a
        # missing/unknown scene surfaces as an error instead of a silent None.
        api_context = self._resolve_request_context(target_lang, scene)
        # Note: the resolved context string (not the scene name) decides
        # whether the e-commerce prefix is applied.
        text_to_translate, needs_extraction = self._add_ecommerce_context(
            text, source_lang, api_context
        )

        payload = {
            "text": [text_to_translate],
            "target_lang": target_code,
        }
        if source_lang:
            payload["source_lang"] = DEEPL_LANGUAGE_CODES.get(
                source_lang.lower(), source_lang.upper()
            )
        if api_context:
            payload["context"] = api_context
        if self.glossary_id:
            payload["glossary_id"] = self.glossary_id

        try:
            response = requests.post(
                self.api_url, headers=headers, json=payload, timeout=self.timeout
            )
            if response.status_code != 200:
                logger.warning(
                    "[deepl] Failed | status=%s tgt=%s body=%s",
                    response.status_code,
                    target_code,
                    (response.text or "")[:200],
                )
                return None

            data = response.json()
            translations = data.get("translations") or []
            if not translations:
                return None
            translated = translations[0].get("text")
            if not translated:
                return None
            if needs_extraction:
                # Undo the artificial prefix added before the request.
                translated = self._extract_term_from_translation(
                    translated, text, target_code
                )
            return translated
        except requests.Timeout:
            logger.warning(
                "[deepl] Timeout | tgt=%s timeout=%.1fs", target_code, self.timeout
            )
            return None
        except Exception as exc:
            # Best-effort boundary: any other failure (network, malformed
            # JSON, unexpected payload shape) is logged and reported as None.
            logger.warning(
                "[deepl] Exception | tgt=%s error=%s", target_code, exc, exc_info=True
            )
            return None

    def _add_ecommerce_context(
        self,
        text: str,
        source_lang: Optional[str],
        scene: Optional[str],
    ) -> Tuple[str, bool]:
        """Optionally prefix a very short Chinese term with "购买 " ("buy").

        The prefix is applied only when ``scene`` contains ``"e-commerce"``
        (case-insensitive), ``source_lang`` is ``"zh"``, and the stripped text
        is a single whitespace-free token of at most two characters.
        Presumably this gives the engine enough context to disambiguate such
        short terms -- confirm with the scene definitions.

        Returns:
            ``(text_to_send, needs_extraction)``; the flag is ``True`` only
            when the prefix was added and must be removed from the result.
        """
        if not scene or "e-commerce" not in scene.lower():
            return text, False
        if (source_lang or "").lower() != "zh":
            return text, False
        term = (text or "").strip()
        if len(term.split()) == 1 and len(term) <= 2:
            return f"购买 {term}", True
        return text, False

    def _extract_term_from_translation(
        self,
        translated_text: str,
        original_text: str,
        target_lang_code: str,
    ) -> str:
        """Recover the bare term from a translation of a prefixed request.

        Only English targets are post-processed; any regional variant
        (``EN``, ``EN-US``, ``EN-GB``) counts as English. Scanning from the
        end, the first token that is not a known context word is returned,
        lower-cased and without trailing punctuation. Tokens consisting only
        of punctuation are ignored so they can never be returned as an empty
        result.

        Args:
            translated_text: Text returned by the API.
            original_text: Unused; kept for signature compatibility.
            target_lang_code: Target language code sent to the API.

        Returns:
            The extracted term, or ``translated_text`` unchanged when the
            target is not English, the text has at most one word, or no
            usable token remains.
        """
        del original_text
        # Compare on the base language so EN-US / EN-GB are handled like EN.
        base_lang = (target_lang_code or "").upper().split("-", 1)[0]
        if base_lang != "EN":
            return translated_text

        words = translated_text.strip().split()
        if len(words) <= 1:
            return translated_text

        stripped = (self._TRAILING_PUNCT_RE.sub("", word.lower()) for word in words)
        tokens = [token for token in stripped if token]
        if not tokens:
            return translated_text

        for token in reversed(tokens):
            if token not in self._CONTEXT_WORDS:
                return token
        # Every token is a context word: fall back to the last one.
        return tokens[-1]