""" Translation provider - direct (in-process) or HTTP service. """ from __future__ import annotations import logging from typing import Any, Dict, List, Optional, Union from concurrent.futures import Future, ThreadPoolExecutor import requests from config.services_config import get_translation_config, get_translation_base_url logger = logging.getLogger(__name__) class HttpTranslationProvider: """Translation via HTTP service.""" def __init__( self, base_url: str, model: str = "qwen", timeout_sec: float = 10.0, translation_context: Optional[str] = None, ): self.base_url = (base_url or "").rstrip("/") self.model = model or "qwen" self.timeout_sec = float(timeout_sec or 10.0) self.translation_context = translation_context or "e-commerce product search" self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="http-translator") def _translate_once( self, text: str, target_lang: str, source_lang: Optional[str] = None, ) -> Optional[str]: if not text or not str(text).strip(): return text try: url = f"{self.base_url}/translate" payload = { "text": text, "target_lang": target_lang, "source_lang": source_lang or "auto", "model": self.model, } response = requests.post(url, json=payload, timeout=self.timeout_sec) if response.status_code != 200: logger.warning( "HTTP translator failed: status=%s body=%s", response.status_code, (response.text or "")[:200], ) return None data = response.json() translated = data.get("translated_text") return translated if translated is not None else None except Exception as exc: logger.warning("HTTP translator request failed: %s", exc, exc_info=True) return None def translate( self, text: str, target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, ) -> Optional[str]: del context, prompt result = self._translate_once(text=text, target_lang=target_lang, source_lang=source_lang) return result if result is not None else text def translate_multi( self, text: str, target_langs: List[str], source_lang: Optional[str] = None, context: Optional[str] = None, async_mode: bool = True, prompt: Optional[str] = None, ) -> Dict[str, Optional[str]]: del context, async_mode, prompt out: Dict[str, Optional[str]] = {} for lang in target_langs: out[lang] = self.translate(text, lang, source_lang=source_lang) return out def translate_multi_async( self, text: str, target_langs: List[str], source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, ) -> Dict[str, Union[str, Future]]: del context, prompt out: Dict[str, Union[str, Future]] = {} for lang in target_langs: out[lang] = self.executor.submit(self.translate, text, lang, source_lang) return out def translate_for_indexing( self, text: str, shop_language: str, source_lang: Optional[str] = None, context: Optional[str] = None, prompt: Optional[str] = None, index_languages: Optional[List[str]] = None, ) -> Dict[str, Optional[str]]: del context, prompt langs = index_languages if index_languages else ["en", "zh"] source = source_lang or shop_language or "auto" out: Dict[str, Optional[str]] = {} for lang in langs: if lang == shop_language: out[lang] = text else: out[lang] = self.translate(text, target_lang=lang, source_lang=source) return out def create_translation_provider(query_config: Any = None) -> Any: """ Create translation provider from services config. query_config: optional, for api_key/glossary_id/context (used by direct provider). """ cfg = get_translation_config() provider = cfg.provider pc = cfg.get_provider_cfg() if provider in ("direct", "local", "inprocess"): from query.translator import Translator model = pc.get("model") or "qwen" qc = query_config or _empty_query_config() return Translator( model=model, api_key=getattr(qc, "translation_api_key", None), use_cache=True, glossary_id=getattr(qc, "translation_glossary_id", None), translation_context=getattr(qc, "translation_context", "e-commerce product search"), ) if provider in ("http", "service"): base_url = get_translation_base_url() model = pc.get("model") or "qwen" timeout = pc.get("timeout_sec", 10.0) qc = query_config or _empty_query_config() return HttpTranslationProvider( base_url=base_url, model=model, timeout_sec=float(timeout), translation_context=getattr(qc, "translation_context", "e-commerce product search"), ) logger.warning( "Unsupported translation provider '%s', fallback to direct.", provider, ) from query.translator import Translator qc = query_config or _empty_query_config() return Translator( model=pc.get("model") or "qwen", api_key=getattr(qc, "translation_api_key", None), use_cache=True, glossary_id=getattr(qc, "translation_glossary_id", None), translation_context=getattr(qc, "translation_context", "e-commerce product search"), ) def _empty_query_config() -> Any: """Minimal object with default translation attrs.""" class _QC: translation_api_key = None translation_glossary_id = None translation_context = "e-commerce product search" return _QC()