Blame view

query/qwen_mt_translate.py 10.8 KB
d4cadc13   tangwang   翻译重构
1
  """Qwen-MT translation orchestrator with cache and async helpers."""
a0a173ae   tangwang   last
2
  
d4cadc13   tangwang   翻译重构
3
  from __future__ import annotations
a0a173ae   tangwang   last
4
  
d4cadc13   tangwang   翻译重构
5
6
  import hashlib
  import logging
a0a173ae   tangwang   last
7
  import os
a0a173ae   tangwang   last
8
  import re
a0a173ae   tangwang   last
9
  import time
6f7840cf   tangwang   refactor: rename ...
10
  from typing import Dict, List, Optional, Sequence, Union
a0a173ae   tangwang   last
11
  
d4cadc13   tangwang   翻译重构
12
  import redis
a0a173ae   tangwang   last
13
14
  from openai import OpenAI
  
d4cadc13   tangwang   翻译重构
15
16
  from config.env_config import DASHSCOPE_API_KEY, REDIS_CONFIG
  from config.services_config import get_translation_cache_config
6f7840cf   tangwang   refactor: rename ...
17
  from config.tenant_config_loader import SOURCE_LANG_CODE_MAP, TARGET_LANG_CODE_MAP
a0a173ae   tangwang   last
18
  
d4cadc13   tangwang   翻译重构
19
  logger = logging.getLogger(__name__)
a0a173ae   tangwang   last
20
  
a0a173ae   tangwang   last
21
  
d4cadc13   tangwang   翻译重构
22
23
24
  class Translator:
      """Qwen-MT backed translator with an optional Redis result cache.

      Text is translated through DashScope's OpenAI-compatible endpoint
      (``chat.completions.create`` with ``translation_options``).  Successful
      translations are stored in Redis under a key derived from the model,
      language pair, context, prompt and text.  Cheap heuristics skip the
      remote call entirely: English text requested in English, and Chinese
      text or pure numbers requested in Chinese, are returned unchanged.
      """

      QWEN_DEFAULT_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1"
      QWEN_MODEL = "qwen-mt-flash"

      def __init__(
          self,
          model: str = "qwen",
          api_key: Optional[str] = None,
          use_cache: bool = True,
          timeout: int = 10,
          glossary_id: Optional[str] = None,
          translation_context: Optional[str] = None,
      ):
          """Configure the translator, the qwen-mt client and the Redis cache.

          Args:
              model: Model alias.  Anything starting with "qwen" is accepted;
                  "qwen"/"qwen-mt" resolve to "qwen-mt-flash", other qwen names
                  are sent to the API as given (lower-cased).
              api_key: DashScope API key.  Falls back to ``DASHSCOPE_API_KEY``
                  from config, then from the environment.
              use_cache: Enable the Redis cache (the cache config's ``enabled``
                  flag must also be true).
              timeout: Per-request timeout passed to the API call.
              glossary_id: Stored on the instance.
                  NOTE(review): not used by any code in this class.
              translation_context: Default context (used in cache keys) when a
                  call does not pass one; defaults to
                  "e-commerce product search".

          Raises:
              ValueError: If ``model`` is not a qwen model.

          Client or Redis initialization failures are logged, not raised; the
          corresponding feature is then simply unavailable (client/redis is None).
          """
          self.model = self._normalize_model(model)
          self.timeout = int(timeout)
          self.use_cache = bool(use_cache)
          self.glossary_id = glossary_id
          self.translation_context = translation_context or "e-commerce product search"

          cache_cfg = get_translation_cache_config()
          self.cache_prefix = str(cache_cfg.get("key_prefix", "trans:v2"))
          self.expire_seconds = int(cache_cfg.get("ttl_seconds", 360 * 24 * 3600))
          self.cache_sliding_expiration = bool(cache_cfg.get("sliding_expiration", True))
          self.cache_include_context = bool(cache_cfg.get("key_include_context", True))
          self.cache_include_prompt = bool(cache_cfg.get("key_include_prompt", True))
          self.cache_include_source_lang = bool(cache_cfg.get("key_include_source_lang", True))

          self.qwen_model_name = self._resolve_qwen_model_name(model)
          self._api_key = api_key or self._default_api_key(self.model)
          self._qwen_client: Optional[OpenAI] = None
          base_url = os.getenv("DASHSCOPE_BASE_URL") or self.QWEN_DEFAULT_BASE_URL
          if self._api_key:
              try:
                  self._qwen_client = OpenAI(api_key=self._api_key, base_url=base_url)
              except Exception as exc:
                  logger.warning("Failed to initialize qwen-mt client: %s", exc, exc_info=True)
          else:
              logger.warning("DASHSCOPE_API_KEY not set; qwen-mt translation unavailable")

          self.redis_client = None
          if self.use_cache and bool(cache_cfg.get("enabled", True)):
              self.redis_client = self._init_redis_client()

      @property
      def supports_batch(self) -> bool:
          """Report that this provider accepts list input.

          ``translate`` loops over the items one at a time (each going through
          the cache); it is not a real parallel batch request, but callers may
          pass a list and receive a list back.
          """
          return True

      @staticmethod
      def _normalize_model(model: str) -> str:
          """Map any qwen model alias to the canonical "qwen-mt" family name.

          Raises:
              ValueError: If the (stripped, lower-cased) name does not start
                  with "qwen".
          """
          m = (model or "qwen").strip().lower()
          if m.startswith("qwen"):
              return "qwen-mt"
          raise ValueError(f"Unsupported model: {model}. Supported models: 'qwen', 'qwen-mt', 'qwen-mt-flash'")

      @staticmethod
      def _resolve_qwen_model_name(model: str) -> str:
          """Return the concrete model name sent to the API.

          "qwen"/"qwen-mt" (and empty input) map to "qwen-mt-flash"; any other
          value is returned stripped and lower-cased.
          """
          m = (model or "qwen").strip().lower()
          if m in {"qwen", "qwen-mt"}:
              return "qwen-mt-flash"
          return m

      @staticmethod
      def _default_api_key(model: str) -> Optional[str]:
          """Return the DashScope key from config, else from the environment."""
          del model  # Accepted for signature symmetry; the key is model-independent.
          return DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY")

      def _init_redis_client(self):
          """Create and ping a Redis client from ``REDIS_CONFIG``.

          Returns:
              A connected ``redis.Redis`` (string-decoding) or None when the
              connection or ping fails (the failure is logged).
          """
          try:
              client = redis.Redis(
                  host=REDIS_CONFIG.get("host", "localhost"),
                  # NOTE(review): fallback port is 6479, not Redis' usual 6379;
                  # only applies when REDIS_CONFIG has no "port" — confirm intent.
                  port=REDIS_CONFIG.get("port", 6479),
                  password=REDIS_CONFIG.get("password"),
                  decode_responses=True,
                  socket_timeout=REDIS_CONFIG.get("socket_timeout", 1),
                  socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1),
                  retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False),
                  health_check_interval=10,
              )
              client.ping()
              return client
          except Exception as exc:
              logger.warning("Failed to initialize translation redis cache: %s", exc)
              return None

      def _build_cache_key(
          self,
          text: str,
          target_lang: str,
          source_lang: Optional[str],
          context: Optional[str],
          prompt: Optional[str],
      ) -> str:
          """Build the Redis key for a translation request.

          The key is ``{prefix}:{model}:{src}:{tgt}:{sha256}`` where the digest
          covers model, languages, context, prompt and the exact text.  Source
          language, context and prompt are included only when enabled in the
          cache config (a missing source language is recorded as "auto", an
          excluded one as "-").

          NOTE: ``self.model`` is always the normalized "qwen-mt", so different
          qwen-mt variants share cache entries.
          """
          src = (source_lang or "auto").strip().lower() if self.cache_include_source_lang else "-"
          tgt = (target_lang or "").strip().lower()
          ctx = (context or "").strip() if self.cache_include_context else ""
          prm = (prompt or "").strip() if self.cache_include_prompt else ""
          payload = f"model={self.model}\nsrc={src}\ntgt={tgt}\nctx={ctx}\nprm={prm}\ntext={text}"
          digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
          return f"{self.cache_prefix}:{self.model}:{src}:{tgt}:{digest}"

      def translate(
          self,
          text: Union[str, Sequence[str]],
          target_lang: str,
          source_lang: Optional[str] = None,
          context: Optional[str] = None,
          prompt: Optional[str] = None,
      ) -> Union[Optional[str], List[Optional[str]]]:
          """Translate one text, or a list/tuple of texts, into ``target_lang``.

          Args:
              text: A string, or a list/tuple of strings (translated one by one).
              target_lang: Target language code, e.g. "en" or "zh".
              source_lang: Source language code; None/empty means auto-detect.
              context: Overrides ``self.translation_context`` in the cache key.
              prompt: Extra prompt; only part of the cache key.

          Returns:
              For a single text: the translation, the input unchanged when it is
              empty/whitespace or already in the target language (English for
              "en"; Chinese or a pure number for "zh"), or None when the
              translation failed.  For list/tuple input: a list with one such
              result per item (None/blank items are passed through).

          NOTE(review): ``context`` and ``prompt`` only affect cache keys; they
          are not sent to the model.
          """
          if isinstance(text, (list, tuple)):
              results: List[Optional[str]] = []
              for item in text:
                  if item is None or not str(item).strip():
                      results.append(item)  # type: ignore[arg-type]
                      continue
                  # Batch input reuses the single-item cache and rules, one call per item.
                  out = self.translate(
                      text=str(item),
                      target_lang=target_lang,
                      source_lang=source_lang,
                      context=context,
                      prompt=prompt,
                  )
                  results.append(out)
              return results

          if not text or not str(text).strip():
              return text  # type: ignore[return-value]

          tgt = (target_lang or "").strip().lower()
          src = (source_lang or "").strip().lower() or None
          # Skip the remote call when the text is already in the target language.
          if tgt == "en" and self._is_english_text(text):
              return text
          if tgt == "zh" and (self._contains_chinese(text) or self._is_pure_number(text)):
              return text

          translation_context = context or self.translation_context
          cached = self._get_cached_translation_redis(text, tgt, src, translation_context, prompt)
          if cached is not None:
              return cached

          result = self._translate_qwen(text, tgt, src)

          if result is not None:
              self._set_cached_translation_redis(text, tgt, result, src, translation_context, prompt)
          return result

      def _translate_qwen(
          self,
          text: str,
          target_lang: str,
          source_lang: Optional[str],
      ) -> Optional[str]:
          """Translate a single text with qwen-mt.

          Language codes are mapped to qwen-mt language names through the
          module-level ``SOURCE_LANG_CODE_MAP`` (source) and
          ``TARGET_LANG_CODE_MAP`` (target).  Unmapped codes fall back to
          ``code.capitalize()``.  A missing or "auto" source is sent as "auto".

          Returns:
              The stripped translation, or None when no client is configured,
              the response content is empty, or the request raises (the error
              is logged with traceback).
          """
          if not self._qwen_client:
              return None

          tgt_norm = (target_lang or "").strip().lower()
          src_norm = (source_lang or "").strip().lower()
          # The code maps are module-level imports, not class attributes.
          tgt_qwen = TARGET_LANG_CODE_MAP.get(tgt_norm, tgt_norm.capitalize())
          if not src_norm or src_norm == "auto":
              src_qwen = "auto"
          else:
              src_qwen = SOURCE_LANG_CODE_MAP.get(src_norm, src_norm.capitalize())

          # perf_counter is monotonic, so latency is immune to wall-clock jumps.
          start = time.perf_counter()
          try:
              completion = self._qwen_client.chat.completions.create(
                  model=self.qwen_model_name,
                  messages=[{"role": "user", "content": text}],
                  extra_body={
                      "translation_options": {
                          "source_lang": src_qwen,
                          "target_lang": tgt_qwen,
                      }
                  },
                  timeout=self.timeout,
              )
              content = (completion.choices[0].message.content or "").strip()
              if not content:
                  return None
              logger.info(
                  "[qwen-mt] Success | src=%s tgt=%s latency=%.1fms",
                  src_qwen,
                  tgt_qwen,
                  (time.perf_counter() - start) * 1000,
              )
              return content
          except Exception as exc:
              logger.warning(
                  "[qwen-mt] Failed | src=%s tgt=%s latency=%.1fms error=%s",
                  src_qwen,
                  tgt_qwen,
                  (time.perf_counter() - start) * 1000,
                  exc,
                  exc_info=True,
              )
              return None

      def _get_cached_translation_redis(
          self,
          text: str,
          target_lang: str,
          source_lang: Optional[str] = None,
          context: Optional[str] = None,
          prompt: Optional[str] = None,
      ) -> Optional[str]:
          """Look up a cached translation.

          With sliding expiration enabled, a hit resets the key's TTL to
          ``self.expire_seconds``.

          Returns:
              The cached string, or None on a miss, when caching is disabled,
              or when Redis raises (logged).
          """
          if not self.redis_client:
              return None
          key = self._build_cache_key(text, target_lang, source_lang, context, prompt)
          try:
              value = self.redis_client.get(key)
              if value and self.cache_sliding_expiration:
                  self.redis_client.expire(key, self.expire_seconds)
              return value
          except Exception as exc:
              logger.warning("Redis get translation cache failed: %s", exc)
              return None

      def _set_cached_translation_redis(
          self,
          text: str,
          target_lang: str,
          translation: str,
          source_lang: Optional[str] = None,
          context: Optional[str] = None,
          prompt: Optional[str] = None,
      ) -> None:
          """Store a translation with TTL ``self.expire_seconds``.

          No-op when caching is disabled; Redis errors are logged, not raised.
          """
          if not self.redis_client:
              return
          key = self._build_cache_key(text, target_lang, source_lang, context, prompt)
          try:
              self.redis_client.setex(key, self.expire_seconds, translation)
          except Exception as exc:
              logger.warning("Redis set translation cache failed: %s", exc)

      def _shop_lang_matches(self, shop_lang_lower: str, lang_code: str) -> bool:
          """Return True if a lower-cased shop language matches ``lang_code``.

          Exact matches count.  For "zh" and "en", a substring match also counts
          (e.g. "zh-cn" matches "zh").  Empty inputs never match.
          """
          if not shop_lang_lower or not lang_code:
              return False
          if shop_lang_lower == lang_code:
              return True
          if lang_code == "zh" and "zh" in shop_lang_lower:
              return True
          if lang_code == "en" and "en" in shop_lang_lower:
              return True
          return False

      def get_translation_needs(self, detected_lang: str, supported_langs: List[str]) -> List[str]:
          """Return the supported languages the query still needs translating into.

          If ``detected_lang`` is supported, it is excluded from a new list.
          Otherwise ``supported_langs`` itself is returned (same object).
          """
          if detected_lang in supported_langs:
              return [lang for lang in supported_langs if lang != detected_lang]
          return supported_langs

      def _is_english_text(self, text: str) -> bool:
          """Heuristically decide whether ``text`` is English.

          Whitespace and common ASCII punctuation are removed.  The text counts
          as English when more than 80% of the remaining characters are ASCII.
          Empty or punctuation-only text counts as English.
          """
          if not text or not text.strip():
              return True
          text_clean = re.sub(r"[\s\.,!?;:\-\'\"\(\)\[\]{}]", "", text)
          if not text_clean:
              return True
          ascii_count = sum(1 for c in text_clean if ord(c) < 128)
          return (ascii_count / len(text_clean)) > 0.8

      def _contains_chinese(self, text: str) -> bool:
          """Return True if ``text`` has a CJK Unified Ideograph (U+4E00-U+9FFF)."""
          if not text:
              return False
          return bool(re.search(r"[\u4e00-\u9fff]", text))

      def _is_pure_number(self, text: str) -> bool:
          """Return True if ``text`` is only digits once whitespace, '.' and ',' are removed.

          Uses ``str.isdigit``, which also accepts non-ASCII Unicode digits.
          """
          if not text or not text.strip():
              return False
          text_clean = re.sub(r"[\s\.,]", "", text.strip())
          return bool(text_clean) and text_clean.isdigit()