"""Qwen-MT translation orchestrator with cache and async helpers.""" from __future__ import annotations import hashlib import logging import os import re import time from typing import Dict, List, Optional import redis from openai import OpenAI from config.env_config import DASHSCOPE_API_KEY, REDIS_CONFIG from config.services_config import get_translation_cache_config from config.translate_prompts import SOURCE_LANG_CODE_MAP logger = logging.getLogger(__name__) class Translator: QWEN_DEFAULT_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" QWEN_MODEL = "qwen-mt-flash" def __init__( self, model: str = "qwen", api_key: Optional[str] = None, use_cache: bool = True, timeout: int = 10, glossary_id: Optional[str] = None, translation_context: Optional[str] = None, ): self.model = self._normalize_model(model) self.timeout = int(timeout) self.use_cache = bool(use_cache) self.glossary_id = glossary_id self.translation_context = translation_context or "e-commerce product search" cache_cfg = get_translation_cache_config() self.cache_prefix = str(cache_cfg.get("key_prefix", "trans:v2")) self.expire_seconds = int(cache_cfg.get("ttl_seconds", 360 * 24 * 3600)) self.cache_sliding_expiration = bool(cache_cfg.get("sliding_expiration", True)) self.cache_include_context = bool(cache_cfg.get("key_include_context", True)) self.cache_include_prompt = bool(cache_cfg.get("key_include_prompt", True)) self.cache_include_source_lang = bool(cache_cfg.get("key_include_source_lang", True)) self.qwen_model_name = self._resolve_qwen_model_name(model) self._api_key = api_key or self._default_api_key(self.model) self._qwen_client: Optional[OpenAI] = None base_url = os.getenv("DASHSCOPE_BASE_URL") or self.QWEN_DEFAULT_BASE_URL if self._api_key: try: self._qwen_client = OpenAI(api_key=self._api_key, base_url=base_url) except Exception as exc: logger.warning("Failed to initialize qwen-mt client: %s", exc, exc_info=True) else: logger.warning("DASHSCOPE_API_KEY not set; qwen-mt translation unavailable") self.redis_client = None if self.use_cache and bool(cache_cfg.get("enabled", True)): self.redis_client = self._init_redis_client() @staticmethod def _normalize_model(model: str) -> str: m = (model or "qwen").strip().lower() if m.startswith("qwen"): return "qwen-mt" raise ValueError(f"Unsupported model: {model}. Supported models: 'qwen', 'qwen-mt', 'qwen-mt-flash'") @staticmethod def _resolve_qwen_model_name(model: str) -> str: m = (model or "qwen").strip().lower() if m in {"qwen", "qwen-mt"}: return "qwen-mt-flash" return m @staticmethod def _default_api_key(model: str) -> Optional[str]: del model return DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") def _init_redis_client(self): try: client = redis.Redis( host=REDIS_CONFIG.get("host", "localhost"), port=REDIS_CONFIG.get("port", 6479), password=REDIS_CONFIG.get("password"), decode_responses=True, socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), health_check_interval=10, ) client.ping() return client except Exception as exc: logger.warning("Failed to initialize translation redis cache: %s", exc) return None def _build_cache_key( self, text: str, target_lang: str, source_lang: Optional[str], context: Optional[str], prompt: Optional[str], ) -> str: src = (source_lang or "auto").strip().lower() if self.cache_include_source_lang else "-" tgt = (target_lang or "").strip().lower() ctx = (context or "").strip() if self.cache_include_context else "" prm = (prompt or "").strip() if self.cache_include_prompt else "" payload = f"model={self.model}\nsrc={src}\ntgt={tgt}\nctx={ctx}\nprm={prm}\ntext={text}" digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() return f"{self.cache_prefix}:{self.model}:{src}:{tgt}:{digest}" def translate( self, text: str, target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, ) -> Optional[str]: if not text or not text.strip(): return text tgt = (target_lang or "").strip().lower() src = (source_lang or "").strip().lower() or None if tgt == "en" and self._is_english_text(text): return text if tgt == "zh" and (self._contains_chinese(text) or self._is_pure_number(text)): return text translation_context = context or self.translation_context cached = self._get_cached_translation_redis(text, tgt, src, translation_context, prompt) if cached is not None: return cached result = self._translate_qwen(text, tgt, src) if result is not None: self._set_cached_translation_redis(text, tgt, result, src, translation_context, prompt) return result def _translate_qwen( self, text: str, target_lang: str, source_lang: Optional[str], ) -> Optional[str]: if not self._qwen_client: return None tgt_norm = (target_lang or "").strip().lower() src_norm = (source_lang or "").strip().lower() tgt_qwen = self.SOURCE_LANG_CODE_MAP.get(tgt_norm, tgt_norm.capitalize()) src_qwen = "auto" if not src_norm or src_norm == "auto" else self.SOURCE_LANG_CODE_MAP.get(src_norm, src_norm.capitalize()) start = time.time() try: completion = self._qwen_client.chat.completions.create( model=self.qwen_model_name, messages=[{"role": "user", "content": text}], extra_body={ "translation_options": { "source_lang": src_qwen, "target_lang": tgt_qwen, } }, timeout=self.timeout, ) content = (completion.choices[0].message.content or "").strip() if not content: return None logger.info("[qwen-mt] Success | src=%s tgt=%s latency=%.1fms", src_qwen, tgt_qwen, (time.time() - start) * 1000) return content except Exception as exc: logger.warning( "[qwen-mt] Failed | src=%s tgt=%s latency=%.1fms error=%s", src_qwen, tgt_qwen, (time.time() - start) * 1000, exc, exc_info=True, ) return None def _get_cached_translation_redis( self, text: str, target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, ) -> Optional[str]: if not self.redis_client: return None key = self._build_cache_key(text, target_lang, source_lang, context, prompt) try: value = self.redis_client.get(key) if value and self.cache_sliding_expiration: self.redis_client.expire(key, self.expire_seconds) return value except Exception as exc: logger.warning("Redis get translation cache failed: %s", exc) return None def _set_cached_translation_redis( self, text: str, target_lang: str, translation: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, ) -> None: if not self.redis_client: return key = self._build_cache_key(text, target_lang, source_lang, context, prompt) try: self.redis_client.setex(key, self.expire_seconds, translation) except Exception as exc: logger.warning("Redis set translation cache failed: %s", exc) def _shop_lang_matches(self, shop_lang_lower: str, lang_code: str) -> bool: if not shop_lang_lower or not lang_code: return False if shop_lang_lower == lang_code: return True if lang_code == "zh" and "zh" in shop_lang_lower: return True if lang_code == "en" and "en" in shop_lang_lower: return True return False def get_translation_needs(self, detected_lang: str, supported_langs: List[str]) -> List[str]: if detected_lang in supported_langs: return [lang for lang in supported_langs if lang != detected_lang] return supported_langs def _is_english_text(self, text: str) -> bool: if not text or not text.strip(): return True text_clean = re.sub(r"[\s\.,!?;:\-\'\"\(\)\[\]{}]", "", text) if not text_clean: return True ascii_count = sum(1 for c in text_clean if ord(c) < 128) return (ascii_count / len(text_clean)) > 0.8 def _contains_chinese(self, text: str) -> bool: if not text: return False return bool(re.search(r"[\u4e00-\u9fff]", text)) def _is_pure_number(self, text: str) -> bool: if not text or not text.strip(): return False text_clean = re.sub(r"[\s\.,]", "", text.strip()) return bool(text_clean) and text_clean.isdigit()