diff --git a/api/routes/search.py b/api/routes/search.py index 63aef84..e9bef63 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -472,7 +472,6 @@ async def get_es_raw_document(spu_id: str, http_request: Request): index_name = get_tenant_index_name(tenant_id) body = { - "size": 5, "query": { "bool": { "filter": [ diff --git a/api/translator_app.py b/api/translator_app.py index df6a0a9..096c3c2 100644 --- a/api/translator_app.py +++ b/api/translator_app.py @@ -98,7 +98,9 @@ from pydantic import BaseModel, Field sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from query.qwen_mt_translate import Translator -from config.env_config import DEEPL_AUTH_KEY, DASHSCOPE_API_KEY, REDIS_CONFIG +from query.llm_translate import LLMTranslatorProvider +from query.deepl_provider import DeepLProvider +from config.services_config import get_translation_config # Configure logging logging.basicConfig( @@ -107,23 +109,52 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -# Fixed translation prompt -TRANSLATION_PROMPT = "Translate the original text into an English product SKU name. Requirements: Ensure accurate and complete transmission of the original information, with concise, clear, authentic, and professional language." - # Global translator instances cache (keyed by model) -_translators: Dict[str, Translator] = {} +_translators: Dict[str, object] = {} + +def _resolve_default_model() -> str: + """ + Resolve translator model from services.translation config first. 
-def get_translator(model: str = "qwen") -> Translator: + Priority: + 1) TRANSLATION_MODEL env (explicit runtime override) + 2) services.translation.provider + providers..model + 3) qwen-mt + """ + env_model = (os.getenv("TRANSLATION_MODEL") or "").strip() + if env_model: + return env_model + try: + cfg = get_translation_config() + provider = (cfg.provider or "").strip().lower() + provider_cfg = cfg.get_provider_cfg() if hasattr(cfg, "get_provider_cfg") else {} + model = (provider_cfg.get("model") or "").strip().lower() if isinstance(provider_cfg, dict) else "" + if provider == "llm": + return "llm" + if provider in {"qwen-mt", "direct", "http"}: + return model or "qwen-mt" + if provider == "deepl": + return "deepl" + except Exception: + pass + return "qwen-mt" + + +def get_translator(model: str = "qwen") -> object: """Get or create translator instance for the specified model.""" global _translators if model not in _translators: logger.info(f"Initializing translator with model: {model}...") - _translators[model] = Translator( - model=model, - use_cache=True, - timeout=10 - ) + normalized = (model or "qwen").strip().lower() + if normalized in {"qwen", "qwen-mt", "qwen-mt-flash", "qwen-mt-flush"}: + _translators[model] = Translator(model=normalized, use_cache=True, timeout=10) + elif normalized == "deepl": + _translators[model] = DeepLProvider(api_key=None, timeout=10.0) + elif normalized == "llm": + _translators[model] = LLMTranslatorProvider() + else: + raise ValueError(f"Unsupported model: {model}") logger.info(f"Translator initialized with model: {model}") return _translators[model] @@ -134,7 +165,9 @@ class TranslationRequest(BaseModel): text: str = Field(..., description="Text to translate") target_lang: str = Field(..., description="Target language code (zh, en, ru, etc.)") source_lang: Optional[str] = Field(None, description="Source language code (optional, auto-detect if not provided)") - model: Optional[str] = Field("qwen", description="Translation model: 
'qwen' (default) or 'deepl'") + model: Optional[str] = Field(None, description="Translation model: qwen-mt | deepl | llm") + context: Optional[str] = Field(None, description="Optional translation scene or context") + prompt: Optional[str] = Field(None, description="Optional prompt override") class Config: json_schema_extra = { @@ -142,7 +175,8 @@ class TranslationRequest(BaseModel): "text": "商品名称", "target_lang": "en", "source_lang": "zh", - "model": "qwen" + "model": "llm", + "context": "sku_name" } } @@ -180,8 +214,7 @@ app.add_middleware( async def startup_event(): """Initialize translator on startup.""" logger.info("Starting Translation Service API on port 6006") - # Get default model from environment variable or use 'qwen' - default_model = os.getenv("TRANSLATION_MODEL", "qwen") + default_model = _resolve_default_model() try: get_translator(model=default_model) logger.info(f"Translation service ready with default model: {default_model}") @@ -194,15 +227,17 @@ async def startup_event(): async def health_check(): """Health check endpoint.""" try: - default_model = os.getenv("TRANSLATION_MODEL", "qwen") - translator = get_translator(model=default_model) + # 仅做轻量级本地检查,避免在健康检查中触发潜在的阻塞初始化或外部依赖 + default_model = _resolve_default_model() + # 如果启动事件成功,默认模型通常会已经初始化到缓存中 + translator = _translators.get(default_model) or next(iter(_translators.values()), None) return { "status": "healthy", "service": "translation", "default_model": default_model, "available_models": list(_translators.keys()), "translator_initialized": translator is not None, - "cache_enabled": translator.use_cache if translator else False + "cache_enabled": bool(getattr(translator, "use_cache", False)) } except Exception as e: logger.error(f"Health check failed: {e}") @@ -238,11 +273,11 @@ async def translate(request: TranslationRequest): ) # Validate model parameter - model = request.model.lower() if request.model else "qwen" - if model not in ['qwen', 'deepl']: + model = request.model.lower() if 
request.model else _resolve_default_model().lower() + if model not in ["qwen", "qwen-mt", "deepl", "llm"]: raise HTTPException( status_code=400, - detail=f"Invalid model: {model}. Supported models: 'qwen', 'deepl'" + detail="Invalid model. Supported models: 'qwen-mt', 'deepl', 'llm'" ) try: @@ -254,7 +289,8 @@ async def translate(request: TranslationRequest): text=request.text, target_lang=request.target_lang, source_lang=request.source_lang, - prompt=TRANSLATION_PROMPT + context=request.context, + prompt=request.prompt, ) if translated_text is None: @@ -269,7 +305,7 @@ async def translate(request: TranslationRequest): source_lang=request.source_lang, translated_text=translated_text, status="success", - model=translator.model + model=str(getattr(translator, "model", model)) ) except HTTPException: diff --git a/config/__init__.py b/config/__init__.py index ee693e9..32de35a 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -28,6 +28,7 @@ from .services_config import ( get_translation_base_url, get_embedding_base_url, get_rerank_service_url, + get_translation_cache_config, ServiceConfig, ) @@ -53,5 +54,6 @@ __all__ = [ 'get_translation_base_url', 'get_embedding_base_url', 'get_rerank_service_url', + 'get_translation_cache_config', 'ServiceConfig', ] diff --git a/config/config.yaml b/config/config.yaml index d824b3c..b18090e 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -81,18 +81,6 @@ query_config: translation_service: "deepl" translation_api_key: null # 通过环境变量设置 - # 翻译提示词配置(用于提高翻译质量,作为DeepL API的context参数) - translation_prompts: - # 商品标题翻译提示词 - product_title_zh: "请将原文翻译成中文商品SKU名称,要求:确保精确、完整地传达原文信息的基础上,语言简洁清晰、地道、专业。" - product_title_en: "Translate the original text into an English product SKU name. Requirements: Ensure accurate and complete transmission of the original information, with concise, clear, authentic, and professional language." 
- # query翻译提示词 - query_zh: "电商领域" - query_en: "e-commerce domain" - # 默认翻译用词 - default_zh: "电商领域" - default_en: "e-commerce domain" - # 返回字段配置(_source includes) # null表示返回所有字段,[]表示不返回任何字段,列表表示只返回指定字段 source_fields: null @@ -119,16 +107,24 @@ rerank: # 可扩展服务/provider 注册表(单一配置源) services: translation: - provider: "llm" # direct | http | google(reserved) + provider: "llm" # qwen-mt | deepl | http | llm base_url: "http://127.0.0.1:6006" - model: "qwen" + model: "qwen-flash" timeout_sec: 10.0 + cache: + enabled: true + key_prefix: "trans:v2" + ttl_seconds: 62208000 + sliding_expiration: true + key_include_context: true + key_include_prompt: true + key_include_source_lang: true providers: - direct: - model: "qwen" + qwen-mt: + model: "qwen-mt-flush" http: base_url: "http://127.0.0.1:6006" - model: "qwen" + model: "qwen-mt-flush" timeout_sec: 10.0 llm: model: "qwen-flash" @@ -136,6 +132,11 @@ services: # base_url 留空则使用 DASHSCOPE_BASE_URL 或默认地域 base_url: "" timeout_sec: 30.0 + deepl: + model: "deepl" + timeout_sec: 10.0 + # 可选:用于术语表翻译(由 query_config.translation_glossary_id 衔接) + glossary_id: "" google: enabled: false project_id: "" diff --git a/config/config_loader.py b/config/config_loader.py index 9de8578..10be1ae 100644 --- a/config/config_loader.py +++ b/config/config_loader.py @@ -42,7 +42,6 @@ class QueryConfig: translation_api_key: Optional[str] = None translation_glossary_id: Optional[str] = None translation_context: str = "e-commerce product search" - translation_prompts: Dict[str, str] = field(default_factory=dict) # Embedding field names text_embedding_field: Optional[str] = "title_embedding" @@ -250,7 +249,6 @@ class ConfigLoader: translation_service=query_config_data.get("translation_service") or "deepl", translation_glossary_id=query_config_data.get("translation_glossary_id"), translation_context=query_config_data.get("translation_context") or "e-commerce product search", - translation_prompts=query_config_data.get("translation_prompts", {}), 
text_embedding_field=query_config_data.get("text_embedding_field"), image_embedding_field=query_config_data.get("image_embedding_field"), source_fields=query_config_data.get("source_fields"), diff --git a/config/services_config.py b/config/services_config.py index 0f4ae3a..659e17e 100644 --- a/config/services_config.py +++ b/config/services_config.py @@ -72,12 +72,12 @@ def _resolve_translation() -> ServiceConfig: config_provider=cfg.get("provider"), capability="translation", ) - if provider not in ("direct", "local", "inprocess", "http", "service"): + if provider not in ("qwen-mt", "deepl", "direct", "local", "inprocess", "http", "service", "llm"): raise ValueError(f"Unsupported translation provider: {provider}") # Env override for http base_url env_url = os.getenv("TRANSLATION_SERVICE_URL") - if env_url and provider == "http": + if env_url and provider in ("http", "service"): providers = dict(providers) providers["http"] = dict(providers.get("http", {})) providers["http"]["base_url"] = env_url.rstrip("/") @@ -206,6 +206,27 @@ def get_translation_base_url() -> str: return str(base).rstrip("/") +def get_translation_cache_config() -> Dict[str, Any]: + """ + Resolve translation cache policy from services.translation.cache. + + All translation cache key/TTL behavior should be configured in config.yaml, + not hardcoded in code. 
+ """ + raw = _load_services_raw() + cfg = raw.get("translation", {}) if isinstance(raw.get("translation"), dict) else {} + cache_cfg = cfg.get("cache", {}) if isinstance(cfg.get("cache"), dict) else {} + return { + "enabled": bool(cache_cfg.get("enabled", True)), + "key_prefix": str(cache_cfg.get("key_prefix", "trans:v2")), + "ttl_seconds": int(cache_cfg.get("ttl_seconds", 360 * 24 * 3600)), + "sliding_expiration": bool(cache_cfg.get("sliding_expiration", True)), + "key_include_context": bool(cache_cfg.get("key_include_context", True)), + "key_include_prompt": bool(cache_cfg.get("key_include_prompt", True)), + "key_include_source_lang": bool(cache_cfg.get("key_include_source_lang", True)), + } + + def get_embedding_base_url() -> str: """Resolve embedding HTTP base URL.""" base = ( diff --git a/config/translate_prompts.py b/config/translate_prompts.py new file mode 100644 index 0000000..d1e8f92 --- /dev/null +++ b/config/translate_prompts.py @@ -0,0 +1,82 @@ +SOURCE_LANG_CODE_MAP = { + "en": "English", + "zh": "Chinese", + "zh_tw": "Traditional Chinese", + "ru": "Russian", + "ja": "Japanese", + "ko": "Korean", + "es": "Spanish", + "fr": "French", + "pt": "Portuguese", + "de": "German", + "it": "Italian", + "th": "Thai", + "vi": "Vietnamese", + "id": "Indonesian", + "ms": "Malay", + "ar": "Arabic", + "hi": "Hindi", + "he": "Hebrew", + "my": "Burmese", + "ta": "Tamil", + "ur": "Urdu", + "bn": "Bengali", + "pl": "Polish", + "nl": "Dutch", + "ro": "Romanian", + "tr": "Turkish", + "km": "Khmer", + "lo": "Lao", + "yue": "Cantonese", + "cs": "Czech", + "el": "Greek", + "sv": "Swedish", + "hu": "Hungarian", + "da": "Danish", + "fi": "Finnish", + "uk": "Ukrainian", + "bg": "Bulgarian", +} + +TARGET_LANG_CODE_MAP = {v: k for k, v in SOURCE_LANG_CODE_MAP.items()} + +TRANSLATION_PROMPTS = { + "general": { + "zh": "你是一名专业的 {source_lang}({src_lang_code})到 {target_lang}({tgt_lang_code})翻译专家,请准确传达原文含义并符合{target_lang}语言习惯,只输出翻译结果:{text}", + "en": "You are a professional 
{source_lang} ({src_lang_code}) to {target_lang} ({tgt_lang_code}) translator. Accurately convey the meaning following {target_lang} grammar and usage, output only the translation: {text}", + "ru": "Вы профессиональный переводчик с {source_lang} ({src_lang_code}) на {target_lang} ({tgt_lang_code}). Точно передайте смысл текста, соблюдая нормы {target_lang}, выводите только перевод: {text}", + "ar": "أنت مترجم محترف من {source_lang} ({src_lang_code}) إلى {target_lang} ({tgt_lang_code}). انقل المعنى بدقة وفق قواعد {target_lang} وأخرج الترجمة فقط: {text}", + "ja": "あなたは {source_lang}({src_lang_code})から {target_lang}({tgt_lang_code})へのプロ翻訳者です。意味を正確に伝え、{target_lang}の表現に従い、翻訳のみ出力してください:{text}", + "es": "Eres un traductor profesional de {source_lang} ({src_lang_code}) a {target_lang} ({tgt_lang_code}). Transmite con precisión el significado y devuelve solo la traducción: {text}", + "de": "Du bist ein professioneller Übersetzer von {source_lang} ({src_lang_code}) nach {target_lang} ({tgt_lang_code}). Gib die Bedeutung korrekt wieder und gib nur die Übersetzung aus: {text}", + "fr": "Vous êtes un traducteur professionnel de {source_lang} ({src_lang_code}) vers {target_lang} ({tgt_lang_code}). Transmettez fidèlement le sens et produisez uniquement la traduction : {text}", + "it": "Sei un traduttore professionista da {source_lang} ({src_lang_code}) a {target_lang} ({tgt_lang_code}). Trasmetti accuratamente il significato e restituisci solo la traduzione: {text}", + "pt": "Você é um tradutor profissional de {source_lang} ({src_lang_code}) para {target_lang} ({tgt_lang_code}). Transmita o significado com precisão e produza apenas a tradução: {text}" + }, + + "sku_name": { + "zh": "你是一名专业的 {source_lang}({src_lang_code})到 {target_lang}({tgt_lang_code})电商翻译专家,请将原文翻译为{target_lang}商品SKU名称,要求准确完整、简洁专业,只输出结果:{text}", + "en": "You are a professional {source_lang} ({src_lang_code}) to {target_lang} ({tgt_lang_code}) ecommerce translator. 
Translate into a concise and accurate {target_lang} product SKU name, output only the result: {text}", + "ru": "Вы переводчик e-commerce с {source_lang} ({src_lang_code}) на {target_lang} ({tgt_lang_code}). Переведите в краткое и точное название SKU товара на {target_lang}, выводите только результат: {text}", + "ar": "أنت مترجم تجارة إلكترونية من {source_lang} ({src_lang_code}) إلى {target_lang} ({tgt_lang_code}). ترجم إلى اسم SKU للمنتج بلغة {target_lang} بدقة واختصار، وأخرج النتيجة فقط: {text}", + "ja": "{source_lang}({src_lang_code})から {target_lang}({tgt_lang_code})へのEC翻訳者として、簡潔で正確な{target_lang}の商品SKU名に翻訳し、結果のみ出力してください:{text}", + "es": "Eres un traductor ecommerce de {source_lang} ({src_lang_code}) a {target_lang} ({tgt_lang_code}). Traduce a un nombre SKU de producto en {target_lang}, preciso y conciso, devuelve solo el resultado: {text}", + "de": "Du bist ein E-Commerce-Übersetzer von {source_lang} ({src_lang_code}) nach {target_lang} ({tgt_lang_code}). Übersetze in einen präzisen und kurzen {target_lang} Produkt-SKU-Namen, nur Ergebnis ausgeben: {text}", + "fr": "Vous êtes un traducteur e-commerce de {source_lang} ({src_lang_code}) vers {target_lang} ({tgt_lang_code}). Traduisez en un nom SKU produit {target_lang} précis et concis, sortie uniquement : {text}", + "it": "Sei un traduttore ecommerce da {source_lang} ({src_lang_code}) a {target_lang} ({tgt_lang_code}). Traduce in un nome SKU prodotto {target_lang} conciso e accurato, restituisci solo il risultato: {text}", + "pt": "Você é um tradutor de e-commerce de {source_lang} ({src_lang_code}) para {target_lang} ({tgt_lang_code}). Traduza para um nome SKU de produto {target_lang} conciso e preciso, produza apenas o resultado: {text}" + }, + + "ecommerce_search_query": { + "zh": "你是一名专业的 {source_lang}({src_lang_code})到 {target_lang}({tgt_lang_code})翻译助手,请将电商搜索词准确翻译为{target_lang}并符合搜索习惯,只输出结果:{text}", + "en": "You are a professional {source_lang} ({src_lang_code}) to {target_lang} ({tgt_lang_code}) translator. 
Translate the ecommerce search query accurately following {target_lang} search habits, output only the result: {text}", + "ru": "Вы переводчик с {source_lang} ({src_lang_code}) на {target_lang} ({tgt_lang_code}). Переведите поисковый запрос e-commerce с учётом привычек поиска, выводите только результат: {text}", + "ar": "أنت مترجم من {source_lang} ({src_lang_code}) إلى {target_lang} ({tgt_lang_code}). ترجم عبارة البحث للتجارة الإلكترونية بما يناسب عادات البحث وأخرج النتيجة فقط: {text}", + "ja": "{source_lang}({src_lang_code})から {target_lang}({tgt_lang_code})への翻訳者として、EC検索キーワードを{target_lang}の検索習慣に合わせて翻訳し、結果のみ出力してください:{text}", + "es": "Eres un traductor de {source_lang} ({src_lang_code}) a {target_lang} ({tgt_lang_code}). Traduce la consulta de búsqueda ecommerce según los hábitos de búsqueda y devuelve solo el resultado: {text}", + "de": "Du bist ein Übersetzer von {source_lang} ({src_lang_code}) nach {target_lang} ({tgt_lang_code}). Übersetze die E-Commerce-Suchanfrage entsprechend den Suchgewohnheiten, nur Ergebnis ausgeben: {text}", + "fr": "Vous êtes un traducteur de {source_lang} ({src_lang_code}) vers {target_lang} ({tgt_lang_code}). Traduisez la requête de recherche e-commerce selon les habitudes de recherche, sortie uniquement : {text}", + "it": "Sei un traduttore da {source_lang} ({src_lang_code}) a {target_lang} ({tgt_lang_code}). Traduce la query di ricerca ecommerce secondo le abitudini di ricerca e restituisci solo il risultato: {text}", + "pt": "Você é um tradutor de {source_lang} ({src_lang_code}) para {target_lang} ({tgt_lang_code}). 
Traduza a consulta de busca de ecommerce conforme os hábitos de busca e produza apenas o resultado: {text}" + } +} diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 47e0e07..d50469f 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -1814,7 +1814,8 @@ curl "http://localhost:6007/health" "text": "商品名称", "target_lang": "en", "source_lang": "zh", - "model": "qwen" + "model": "qwen", + "context": "sku_name" } ``` @@ -1823,7 +1824,8 @@ curl "http://localhost:6007/health" | `text` | string | Y | 待翻译文本 | | `target_lang` | string | Y | 目标语言:`zh`、`en`、`ru` 等 | | `source_lang` | string | N | 源语言,不传则自动检测 | -| `model` | string | N | `qwen`(默认)或 `deepl` | +| `model` | string | N | `qwen`、`qwen-mt`、`deepl` 或 `llm`;不传时默认模型由服务端配置(`services.translation`)决定 | +| `context` | string | N | 翻译场景参数:商品标题翻译使用 `sku_name`,搜索请求中的 query 翻译使用 `ecommerce_search_query`,其它通用场景可不传或使用 `general` | **响应**: ```json diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index d761346..7de8899 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -36,7 +36,6 @@ class SPUDocumentTransformer: searchable_option_dimensions: List[str], tenant_config: Optional[Dict[str, Any]] = None, translator: Optional[Any] = None, - translation_prompts: Optional[Dict[str, str]] = None, encoder: Optional[Any] = None, enable_title_embedding: bool = True, image_encoder: Optional[Any] = None, @@ -50,7 +49,6 @@ class SPUDocumentTransformer: searchable_option_dimensions: 可搜索的option维度列表 tenant_config: 租户配置(包含主语言和翻译配置) translator: 翻译器实例(可选,如果提供则启用翻译功能) - translation_prompts: 翻译提示词配置(可选) encoder: 文本编码器实例(可选,用于生成title_embedding) enable_title_embedding: 是否启用标题向量化(默认True) image_encoder: 图片编码器实例(可选,需实现 encode_image_urls(urls) -> List[Optional[np.ndarray]]) @@ -60,12 +58,33 @@ class SPUDocumentTransformer: self.searchable_option_dimensions = searchable_option_dimensions self.tenant_config = tenant_config or {} self.translator = translator - self.translation_prompts = translation_prompts or {} self.encoder = 
encoder self.enable_title_embedding = enable_title_embedding self.image_encoder = image_encoder self.enable_image_embedding = bool(enable_image_embedding and image_encoder is not None) + def _translate_index_languages( + self, + text: str, + source_lang: str, + index_languages: List[str], + scene: str, + ) -> Dict[str, Optional[str]]: + translations: Dict[str, Optional[str]] = {} + if not self.translator or not text or not str(text).strip(): + return translations + for lang in index_languages: + if lang == source_lang: + translations[lang] = text + continue + translations[lang] = self.translator.translate( + text=text, + target_lang=lang, + source_lang=source_lang, + context=scene, + ) + return translations + def transform_spu_to_doc( self, tenant_id: str, @@ -322,15 +341,12 @@ class SPUDocumentTransformer: title_text = str(spu_row['title']) translations: Dict[str, Optional[str]] = {} if self.translator: - prompt_zh = self.translation_prompts.get('product_title_zh') or self.translation_prompts.get('default_zh') - prompt_en = self.translation_prompts.get('product_title_en') or self.translation_prompts.get('default_en') - translations = self.translator.translate_for_indexing( - title_text, - shop_language=primary_lang, + translations = self._translate_index_languages( + text=title_text, source_lang=primary_lang, - prompt=prompt_zh if primary_lang == 'zh' else prompt_en, index_languages=index_langs, - ) or {} + scene="product_title", + ) _set_lang_obj("title", title_text, translations) # Brief @@ -338,14 +354,12 @@ class SPUDocumentTransformer: brief_text = str(spu_row['brief']) translations = {} if self.translator: - prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') - translations = self.translator.translate_for_indexing( - brief_text, - shop_language=primary_lang, + translations = self._translate_index_languages( + text=brief_text, source_lang=primary_lang, - prompt=prompt, index_languages=index_langs, - ) or {} + 
scene="default", + ) _set_lang_obj("brief", brief_text, translations) # Description @@ -353,14 +367,12 @@ class SPUDocumentTransformer: desc_text = str(spu_row['description']) translations = {} if self.translator: - prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') - translations = self.translator.translate_for_indexing( - desc_text, - shop_language=primary_lang, + translations = self._translate_index_languages( + text=desc_text, source_lang=primary_lang, - prompt=prompt, index_languages=index_langs, - ) or {} + scene="default", + ) _set_lang_obj("description", desc_text, translations) # Vendor @@ -368,14 +380,12 @@ class SPUDocumentTransformer: vendor_text = str(spu_row['vendor']) translations = {} if self.translator: - prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') - translations = self.translator.translate_for_indexing( - vendor_text, - shop_language=primary_lang, + translations = self._translate_index_languages( + text=vendor_text, source_lang=primary_lang, - prompt=prompt, index_languages=index_langs, - ) or {} + scene="default", + ) _set_lang_obj("vendor", vendor_text, translations) def _fill_category_fields(self, doc: Dict[str, Any], spu_row: pd.Series): diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py index aa1e5b0..257403e 100644 --- a/indexer/incremental_service.py +++ b/indexer/incremental_service.py @@ -39,7 +39,6 @@ class IncrementalIndexerService: self._config: Optional[Any] = None self._config_lock = threading.Lock() self._translator: Optional[Any] = None - self._translation_prompts: Optional[Dict[str, Any]] = None self._searchable_option_dimensions: Optional[List[str]] = None self._shared_text_encoder: Optional[Any] = None self._shared_image_encoder: Optional[Any] = None @@ -52,7 +51,6 @@ class IncrementalIndexerService: def _eager_init(self) -> None: """Strict eager initialization. 
Any dependency failure should fail fast.""" self._config = ConfigLoader("config/config.yaml").load_config() - self._translation_prompts = getattr(self._config.query_config, "translation_prompts", {}) or {} self._searchable_option_dimensions = ( getattr(self._config.spu_config, "searchable_option_dimensions", None) or ["option1", "option2", "option3"] @@ -110,7 +108,6 @@ class IncrementalIndexerService: tenant_id=tenant_id, searchable_option_dimensions=self._searchable_option_dimensions, translator=self._translator, - translation_prompts=self._translation_prompts, encoder=encoder, enable_title_embedding=False, # batch fill later image_encoder=image_encoder, diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py index d3887dc..89cb5a7 100644 --- a/indexer/indexing_utils.py +++ b/indexer/indexing_utils.py @@ -57,7 +57,6 @@ def create_document_transformer( tenant_id: str, searchable_option_dimensions: Optional[list] = None, translator: Optional[Any] = None, - translation_prompts: Optional[Dict[str, str]] = None, encoder: Optional[Any] = None, enable_title_embedding: bool = True, image_encoder: Optional[Any] = None, @@ -72,7 +71,6 @@ def create_document_transformer( tenant_id: 租户ID searchable_option_dimensions: 可搜索的option维度列表(如果为None则从配置加载) translator: 翻译器实例(如果为None则根据配置初始化) - translation_prompts: 翻译提示词配置(如果为None则从配置加载) encoder: 文本编码器实例(如果为None且enable_title_embedding为True则根据配置初始化) enable_title_embedding: 是否启用标题向量化(默认True) image_encoder: 图片编码器(可选,需实现 encode_image_urls(urls)) @@ -89,7 +87,6 @@ def create_document_transformer( if ( searchable_option_dimensions is None or translator is None - or translation_prompts is None or (encoder is None and enable_title_embedding) or config is None ): @@ -107,9 +104,6 @@ def create_document_transformer( translator = create_translation_provider(config.query_config) - if translation_prompts is None: - translation_prompts = config.query_config.translation_prompts - # 初始化encoder(如果启用标题向量化且未提供encoder) if encoder is None and 
enable_title_embedding and config.query_config.enable_text_embedding: from embeddings.text_encoder import TextEmbeddingEncoder @@ -122,7 +116,6 @@ def create_document_transformer( searchable_option_dimensions=searchable_option_dimensions, tenant_config=tenant_config, translator=translator, - translation_prompts=translation_prompts, encoder=encoder, enable_title_embedding=enable_title_embedding, image_encoder=image_encoder, diff --git a/indexer/test_indexing.py b/indexer/test_indexing.py index a1f0093..1d2aef2 100755 --- a/indexer/test_indexing.py +++ b/indexer/test_indexing.py @@ -285,7 +285,6 @@ def test_document_transformer(): searchable_option_dimensions=['option1', 'option2', 'option3'], tenant_config=tenant_config, translator=translator, - translation_prompts=config.query_config.translation_prompts ) # 转换文档 diff --git a/providers/translation.py b/providers/translation.py index 0b8e522..e2b1db9 100644 --- a/providers/translation.py +++ b/providers/translation.py @@ -1,12 +1,8 @@ -""" -Translation provider - direct (in-process) or HTTP service. 
-""" +"""Translation provider factory and HTTP provider implementation.""" from __future__ import annotations import logging -from typing import Any, Dict, List, Optional, Union - -from concurrent.futures import Future, ThreadPoolExecutor +from typing import Any, Dict, Optional import requests from config.services_config import get_translation_config, get_translation_base_url @@ -22,19 +18,18 @@ class HttpTranslationProvider: base_url: str, model: str = "qwen", timeout_sec: float = 10.0, - translation_context: Optional[str] = None, ): self.base_url = (base_url or "").rstrip("/") self.model = model or "qwen" self.timeout_sec = float(timeout_sec or 10.0) - self.translation_context = translation_context or "e-commerce product search" - self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="http-translator") def _translate_once( self, text: str, target_lang: str, source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None, ) -> Optional[str]: if not text or not str(text).strip(): return text @@ -46,6 +41,10 @@ class HttpTranslationProvider: "source_lang": source_lang or "auto", "model": self.model, } + if context: + payload["context"] = context + if prompt: + payload["prompt"] = prompt response = requests.post(url, json=payload, timeout=self.timeout_sec) if response.status_code != 200: logger.warning( @@ -69,58 +68,13 @@ class HttpTranslationProvider: context: Optional[str] = None, prompt: Optional[str] = None, ) -> Optional[str]: - del context, prompt - result = self._translate_once(text=text, target_lang=target_lang, source_lang=source_lang) - return result if result is not None else text - - def translate_multi( - self, - text: str, - target_langs: List[str], - source_lang: Optional[str] = None, - context: Optional[str] = None, - async_mode: bool = True, - prompt: Optional[str] = None, - ) -> Dict[str, Optional[str]]: - del context, async_mode, prompt - out: Dict[str, Optional[str]] = {} - for lang in 
target_langs: - out[lang] = self.translate(text, lang, source_lang=source_lang) - return out - - def translate_multi_async( - self, - text: str, - target_langs: List[str], - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - ) -> Dict[str, Union[str, Future]]: - del context, prompt - out: Dict[str, Union[str, Future]] = {} - for lang in target_langs: - out[lang] = self.executor.submit(self.translate, text, lang, source_lang) - return out - - def translate_for_indexing( - self, - text: str, - shop_language: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - index_languages: Optional[List[str]] = None, - ) -> Dict[str, Optional[str]]: - del context, prompt - langs = index_languages if index_languages else ["en", "zh"] - source = source_lang or shop_language or "auto" - out: Dict[str, Optional[str]] = {} - for lang in langs: - if lang == shop_language: - out[lang] = text - else: - out[lang] = self.translate(text, target_lang=lang, source_lang=source) - return out + return self._translate_once( + text=text, + target_lang=target_lang, + source_lang=source_lang, + context=context, + prompt=prompt, + ) def create_translation_provider(query_config: Any = None) -> Any: @@ -133,9 +87,9 @@ def create_translation_provider(query_config: Any = None) -> Any: provider = cfg.provider pc = cfg.get_provider_cfg() - if provider in ("direct", "local", "inprocess"): + if provider in ("qwen-mt", "direct", "local", "inprocess"): from query.qwen_mt_translate import Translator - model = pc.get("model") or "qwen" + model = pc.get("model") or "qwen-mt-flash" qc = query_config or _empty_query_config() return Translator( model=model, @@ -145,7 +99,7 @@ def create_translation_provider(query_config: Any = None) -> Any: translation_context=getattr(qc, "translation_context", "e-commerce product search"), ) - if provider in ("http", "service"): + elif provider in ("http", "service"): base_url 
= get_translation_base_url() model = pc.get("model") or "qwen" timeout = pc.get("timeout_sec", 10.0) @@ -154,7 +108,26 @@ def create_translation_provider(query_config: Any = None) -> Any: base_url=base_url, model=model, timeout_sec=float(timeout), - translation_context=getattr(qc, "translation_context", "e-commerce product search"), + ) + + elif provider == "llm": + from query.llm_translate import LLMTranslatorProvider + model = pc.get("model") + timeout = float(pc.get("timeout_sec", 30.0)) + base_url = (pc.get("base_url") or "").strip() or None + return LLMTranslatorProvider( + model=model, + timeout_sec=timeout, + base_url=base_url, + ) + + elif provider == "deepl": + from query.deepl_provider import DeepLProvider + qc = query_config or _empty_query_config() + return DeepLProvider( + api_key=getattr(qc, "translation_api_key", None), + timeout=float(pc.get("timeout_sec", 10.0)), + glossary_id=pc.get("glossary_id") or getattr(qc, "translation_glossary_id", None), ) raise ValueError(f"Unsupported translation provider: {provider}") diff --git a/query/deepl_provider.py b/query/deepl_provider.py new file mode 100644 index 0000000..916778b --- /dev/null +++ b/query/deepl_provider.py @@ -0,0 +1,203 @@ +""" +DeepL backend provider. + +This module only handles network calls to DeepL. +It does not handle cache, async fanout, or fallback semantics. 
+""" + +from __future__ import annotations + +import logging +import os +import re +from typing import Dict, Optional, Tuple + +import requests +from config.services_config import get_translation_config + + +logger = logging.getLogger(__name__) + +DEFAULT_CONTEXTS: Dict[str, Dict[str, str]] = { + "sku_name": { + "zh": "商品SKU名称", + "en": "product SKU name", + }, + "ecommerce_search_query": { + "zh": "电商", + "en": "e-commerce", + }, + "general": { + "zh": "", + "en": "", + }, +} +SCENE_NAMES = frozenset(DEFAULT_CONTEXTS.keys()) + + +def _merge_contexts(raw: object) -> Dict[str, Dict[str, str]]: + merged: Dict[str, Dict[str, str]] = { + scene: dict(lang_map) for scene, lang_map in DEFAULT_CONTEXTS.items() + } + if not isinstance(raw, dict): + return merged + for scene, lang_map in raw.items(): + if not isinstance(lang_map, dict): + continue + scene_name = str(scene or "").strip() + if not scene_name: + continue + merged.setdefault(scene_name, {}) + for lang, value in lang_map.items(): + lang_key = str(lang or "").strip().lower() + context_value = str(value or "").strip() + if lang_key and context_value: + merged[scene_name][lang_key] = context_value + return merged + + +class DeepLProvider: + API_URL = "https://api.deepl.com/v2/translate" # Pro tier + LANG_CODE_MAP = { + "zh": "ZH", + "en": "EN", + "ru": "RU", + "ar": "AR", + "ja": "JA", + "es": "ES", + "de": "DE", + "fr": "FR", + "it": "IT", + "pt": "PT", + } + + def __init__( + self, + api_key: Optional[str], + *, + timeout: float = 10.0, + glossary_id: Optional[str] = None, + ) -> None: + cfg = get_translation_config() + provider_cfg = cfg.providers.get("deepl", {}) if isinstance(cfg.providers, dict) else {} + self.api_key = api_key or os.getenv("DEEPL_AUTH_KEY") + self.timeout = float(provider_cfg.get("timeout_sec") or timeout or 10.0) + self.glossary_id = glossary_id or provider_cfg.get("glossary_id") + self.model = "deepl" + self.context_presets = _merge_contexts(provider_cfg.get("contexts")) + if not 
self.api_key: + logger.warning("DEEPL_AUTH_KEY not set; DeepL translation is unavailable") + + def _resolve_request_context( + self, + target_lang: str, + context: Optional[str], + prompt: Optional[str], + ) -> Optional[str]: + if prompt: + return prompt + if context in SCENE_NAMES: + scene_map = self.context_presets.get(context) or self.context_presets.get("default") or {} + tgt = (target_lang or "").strip().lower() + return scene_map.get(tgt) or scene_map.get("en") + if context: + return context + scene_map = self.context_presets.get("default") or {} + tgt = (target_lang or "").strip().lower() + return scene_map.get(tgt) or scene_map.get("en") + + def translate( + self, + text: str, + target_lang: str, + source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None, + ) -> Optional[str]: + if not self.api_key: + return None + + target_code = self.LANG_CODE_MAP.get((target_lang or "").lower(), (target_lang or "").upper()) + headers = { + "Authorization": f"DeepL-Auth-Key {self.api_key}", + "Content-Type": "application/json", + } + + api_context = self._resolve_request_context(target_lang, context, prompt) + text_to_translate, needs_extraction = self._add_ecommerce_context(text, source_lang, api_context) + + payload = { + "text": [text_to_translate], + "target_lang": target_code, + } + if source_lang: + payload["source_lang"] = self.LANG_CODE_MAP.get(source_lang.lower(), source_lang.upper()) + if api_context: + payload["context"] = api_context + if self.glossary_id: + payload["glossary_id"] = self.glossary_id + + try: + response = requests.post(self.API_URL, headers=headers, json=payload, timeout=self.timeout) + if response.status_code != 200: + logger.warning( + "[deepl] Failed | status=%s tgt=%s body=%s", + response.status_code, + target_code, + (response.text or "")[:200], + ) + return None + + data = response.json() + translations = data.get("translations") or [] + if not translations: + return None + translated = 
translations[0].get("text") + if not translated: + return None + if needs_extraction: + translated = self._extract_term_from_translation(translated, text, target_code) + return translated + except requests.Timeout: + logger.warning("[deepl] Timeout | tgt=%s timeout=%.1fs", target_code, self.timeout) + return None + except Exception as exc: + logger.warning("[deepl] Exception | tgt=%s error=%s", target_code, exc, exc_info=True) + return None + + def _add_ecommerce_context( + self, + text: str, + source_lang: Optional[str], + context: Optional[str], + ) -> Tuple[str, bool]: + if not context or "e-commerce" not in context.lower(): + return text, False + if (source_lang or "").lower() != "zh": + return text, False + + term = (text or "").strip() + if len(term.split()) == 1 and len(term) <= 2: + return f"购买 {term}", True + return text, False + + def _extract_term_from_translation( + self, + translated_text: str, + original_text: str, + target_lang_code: str, + ) -> str: + del original_text + if target_lang_code != "EN": + return translated_text + + words = translated_text.strip().split() + if len(words) <= 1: + return translated_text + context_words = {"buy", "purchase", "product", "item", "commodity", "goods"} + for word in reversed(words): + normalized = re.sub(r"[.,!?;:]+$", "", word.lower()) + if normalized not in context_words: + return normalized + return re.sub(r"[.,!?;:]+$", "", words[-1].lower()) + diff --git a/query/llm_translate.py b/query/llm_translate.py index 24e22c4..2b06510 100644 --- a/query/llm_translate.py +++ b/query/llm_translate.py @@ -1,21 +1,9 @@ """ -LLM-based translation helper using Qwen chat model. +LLM-based translation backend (DashScope-compatible OpenAI API). -This module provides a thin wrapper around DashScope's `qwen-flash` model -for high-quality, prompt-controlled translation, independent of the main -`Translator` (machine translation) pipeline. 
- -Usage example: - - from query.llm_translate import llm_translate - - result = llm_translate( - text="我看到这个视频后没有笑", - target_lang="en", - source_lang="zh", - source_lang_label="中文", - target_lang_label="英文", - ) +Failure semantics are strict: +- success: translated string +- failure: None """ from __future__ import annotations @@ -23,113 +11,159 @@ from __future__ import annotations import logging import os import time -from typing import Dict, Optional +from typing import Optional from openai import OpenAI from config.env_config import DASHSCOPE_API_KEY from config.services_config import get_translation_config +from config.translate_prompts import TRANSLATION_PROMPTS, SOURCE_LANG_CODE_MAP + logger = logging.getLogger(__name__) -# 华北2(北京):https://dashscope.aliyuncs.com/compatible-mode/v1 -# 新加坡:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 -# 美国(弗吉尼亚):https://dashscope-us.aliyuncs.com/compatible-mode/v1 -# -# 默认保持与现有翻译/索引脚本相同的美国地域,可通过环境变量覆盖: -# DASHSCOPE_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1 DEFAULT_QWEN_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" -QWEN_MODEL_NAME = "qwen-flash" - - -# 由调用方提供的语言标签/代码填充,占位符说明: -# - source_lang: 源语言的人类可读名称(按目标语言本地化,例如 "中文", "English") -# - target_lang: 目标语言的人类可读名称 -# - src_lang_code: 源语言代码,例如 "zh" -# - tgt_lang_code: 目标语言代码,例如 "en" -TRANSLATION_PROMPTS: Dict[str, str] = { - "zh": """你是一名专业的 {source_lang}({src_lang_code})到 {target_lang}({tgt_lang_code})翻译员。你的目标是在遵循 {target_lang} 的语法、词汇和文化习惯的前提下,准确传达原始 {source_lang} 文本的含义和细微差别。请只输出 {target_lang} 的翻译内容,不要包含任何额外的解释或评论。请将以下 {source_lang} 文本翻译成 {target_lang}: - -{text}""", - "en": """You are a professional {source_lang} ({src_lang_code}) to {target_lang} ({tgt_lang_code}) translator. Your goal is to accurately convey the meaning and nuances of the original {source_lang} text while adhering to {target_lang} grammar, vocabulary, and cultural sensitivities. 
Produce only the {target_lang} translation, without any additional explanations or commentary. Please translate the following {source_lang} text into {target_lang}: - -{text}""", - "ru": """Вы профессиональный переводчик с {source_lang} ({src_lang_code}) на {target_lang} ({tgt_lang_code}). Ваша задача — точно передать смысл и нюансы исходного текста на {source_lang}, соблюдая грамматику, лексику и культурные особенности {target_lang}. Выводите только перевод на {target_lang}, без каких-либо дополнительных объяснений или комментариев. Пожалуйста, переведите следующий текст с {source_lang} на {target_lang}: - -{text}""", - "ar": """أنت مترجم محترف من {source_lang} ({src_lang_code}) إلى {target_lang} ({tgt_lang_code}). هدفك هو نقل المعنى والدلالات الدقيقة للنص الأصلي بلغة {source_lang} بدقة، مع الالتزام بقواعد اللغة والمفردات والحساسيات الثقافية الخاصة بلغة {target_lang}. قم بإنتاج الترجمة إلى {target_lang} فقط دون أي شروحات أو تعليقات إضافية. يرجى ترجمة النص التالي من {source_lang} إلى {target_lang}: - -{text}""", - "ja": """あなたは {source_lang}({src_lang_code})から {target_lang}({tgt_lang_code})へのプロの翻訳者です。{target_lang} の文法、語彙、文化的配慮に従いながら、元の {source_lang} テキストの意味やニュアンスを正確に伝えることが目的です。追加の説明やコメントは一切含めず、{target_lang} の翻訳のみを出力してください。次の {source_lang} テキストを {target_lang} に翻訳してください: - -{text}""", - "es": """Eres un traductor profesional de {source_lang} ({src_lang_code}) a {target_lang} ({tgt_lang_code}). Tu objetivo es transmitir con precisión el significado y los matices del texto original en {source_lang}, respetando la gramática, el vocabulario y las sensibilidades culturales de {target_lang}. Produce únicamente la traducción en {target_lang}, sin explicaciones ni comentarios adicionales. Por favor, traduce el siguiente texto de {source_lang} a {target_lang}: - -{text}""", - "de": """Du bist ein professioneller Übersetzer von {source_lang} ({src_lang_code}) nach {target_lang} ({tgt_lang_code}). 
Dein Ziel ist es, die Bedeutung und Nuancen des ursprünglichen {source_lang}-Textes genau zu vermitteln und dabei die Grammatik, den Wortschatz und die kulturellen Besonderheiten von {target_lang} zu berücksichtigen. Gib ausschließlich die Übersetzung in {target_lang} aus, ohne zusätzliche Erklärungen oder Kommentare. Bitte übersetze den folgenden {source_lang}-Text in {target_lang}: - -{text}""", - "fr": """Vous êtes un traducteur professionnel de {source_lang} ({src_lang_code}) vers {target_lang} ({tgt_lang_code}). Votre objectif est de transmettre fidèlement le sens et les nuances du texte original en {source_lang}, tout en respectant la grammaire, le vocabulaire et les sensibilités culturelles de {target_lang}. Produisez uniquement la traduction en {target_lang}, sans explications ni commentaires supplémentaires. Veuillez traduire le texte suivant de {source_lang} vers {target_lang} : - -{text}""", - "it": """Sei un traduttore professionista da {source_lang} ({src_lang_code}) a {target_lang} ({tgt_lang_code}). Il tuo obiettivo è trasmettere con precisione il significato e le sfumature del testo originale in {source_lang}, rispettando la grammatica, il vocabolario e le sensibilità culturali di {target_lang}. Produci solo la traduzione in {target_lang}, senza spiegazioni o commenti aggiuntivi. Per favore traduci il seguente testo da {source_lang} a {target_lang}: - -{text}""", - "pt": """Você é um tradutor profissional de {source_lang} ({src_lang_code}) para {target_lang} ({tgt_lang_code}). Seu objetivo é transmitir com precisão o significado e as nuances do texto original em {source_lang}, respeitando a gramática, o vocabulário e as sensibilidades culturais de {target_lang}. Produza apenas a tradução em {target_lang}, sem quaisquer explicações ou comentários adicionais. 
Por favor, traduza o seguinte texto de {source_lang} para {target_lang}: - -{text}""", -} - - -def _get_qwen_client(base_url: Optional[str] = None) -> Optional[OpenAI]: - """ - Lazily construct an OpenAI-compatible client for DashScope. - - Uses DASHSCOPE_API_KEY and base_url (provider config / env) to configure endpoint. - """ - api_key = DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") - if not api_key: - logger.warning("DASHSCOPE_API_KEY not set; llm-based translation will be disabled") - return None - - # 优先使用显式传入的 base_url,其次环境变量,最后默认地域。 - base_url = ( - (base_url or "").strip() - or os.getenv("DASHSCOPE_BASE_URL") - or DEFAULT_QWEN_BASE_URL - ) - - try: - client = OpenAI(api_key=api_key, base_url=base_url) - return client - except Exception as exc: - logger.error("Failed to initialize DashScope OpenAI client: %s", exc, exc_info=True) - return None +DEFAULT_LLM_MODEL = "qwen-flash" def _build_prompt( text: str, + *, + source_lang: Optional[str], target_lang: str, - source_lang_label: str, - target_lang_label: str, - src_lang_code: str, - tgt_lang_code: str, + scene: Optional[str], ) -> str: """ - Build translation prompt for given target language, defaulting to English template. 
+ 从 config.translate_prompts.TRANSLATION_PROMPTS 中构建提示词。 + + 要求:模板必须包含 {source_lang}({src_lang_code}){target_lang}({tgt_lang_code})。 + 这里统一使用 code 作为占位的 lang 与 label,外部接口仍然只传语言 code。 """ - key = (target_lang or "").lower() - template = TRANSLATION_PROMPTS.get(key) or TRANSLATION_PROMPTS["en"] + tgt = (target_lang or "").lower() or "en" + src = (source_lang or "auto").lower() + + # 将业务上下文 scene 映射为模板分组名 + normalized_scene = (scene or "").strip() or "general" + # 如果出现历史词,则报错,用于发现错误 + if normalized_scene in {"query", "ecommerce_search", "ecommerce_search_query"}: + group_key = "ecommerce_search_query" + elif normalized_scene in {"product_title", "sku_name"}: + group_key = "sku_name" + else: + group_key = normalized_scene + group = TRANSLATION_PROMPTS.get(group_key) or TRANSLATION_PROMPTS["general"] + + # 先按目标语言 code 取模板,取不到回退到英文 + template = group.get(tgt) or group.get("en") + if not template: + # 理论上不会发生,兜底一个简单模板 + template = ( + "You are a professional {source_lang} ({src_lang_code}) to " + "{target_lang} ({tgt_lang_code}) translator, output only the translation: {text}" + ) + + # 目前不额外维护语言名称映射,直接使用 code 作为 label + source_lang_label = SOURCE_LANG_CODE_MAP.get(src, src) + target_lang_label = SOURCE_LANG_CODE_MAP.get(tgt, tgt) + return template.format( source_lang=source_lang_label, + src_lang_code=src, target_lang=target_lang_label, - src_lang_code=src_lang_code, - tgt_lang_code=tgt_lang_code, + tgt_lang_code=tgt, text=text, ) +class LLMTranslatorProvider: + def __init__( + self, + *, + model: Optional[str] = None, + timeout_sec: float = 30.0, + base_url: Optional[str] = None, + ) -> None: + cfg = get_translation_config() + llm_cfg = cfg.providers.get("llm", {}) if isinstance(cfg.providers, dict) else {} + self.model = model or llm_cfg.get("model") or DEFAULT_LLM_MODEL + self.timeout_sec = float(llm_cfg.get("timeout_sec") or timeout_sec or 30.0) + self.base_url = ( + (base_url or "").strip() + or (llm_cfg.get("base_url") or "").strip() + or 
os.getenv("DASHSCOPE_BASE_URL") + or DEFAULT_QWEN_BASE_URL + ) + self.client = self._create_client() + + def _create_client(self) -> Optional[OpenAI]: + api_key = DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") + if not api_key: + logger.warning("DASHSCOPE_API_KEY not set; llm translation unavailable") + return None + try: + return OpenAI(api_key=api_key, base_url=self.base_url) + except Exception as exc: + logger.error("Failed to initialize llm translation client: %s", exc, exc_info=True) + return None + + def translate( + self, + text: str, + target_lang: str, + source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None, + ) -> Optional[str]: + if not text or not str(text).strip(): + return text + if not self.client: + return None + + tgt = (target_lang or "").lower() or "en" + src = (source_lang or "auto").lower() + scene = context or "default" + user_prompt = prompt or _build_prompt( + text=text, + source_lang=src, + target_lang=tgt, + scene=scene, + ) + start = time.time() + try: + logger.info( + "[llm] Request | src=%s tgt=%s model=%s prompt=%s", + src, + tgt, + self.model, + user_prompt, + ) + completion = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": user_prompt}], + timeout=self.timeout_sec, + ) + content = (completion.choices[0].message.content or "").strip() + latency_ms = (time.time() - start) * 1000 + if not content: + logger.warning("[llm] Empty result | src=%s tgt=%s latency=%.1fms", src, tgt, latency_ms) + return None + logger.info("[llm] Response | src=%s tgt=%s response=%s", src, tgt, content) + logger.info("[llm] Success | src=%s tgt=%s latency=%.1fms", src, tgt, latency_ms) + return content + except Exception as exc: + latency_ms = (time.time() - start) * 1000 + logger.warning( + "[llm] Failed | src=%s tgt=%s latency=%.1fms error=%s", + src, + tgt, + latency_ms, + exc, + exc_info=True, + ) + return None + + def llm_translate( text: str, target_lang: 
str, @@ -139,100 +173,13 @@ def llm_translate( target_lang_label: Optional[str] = None, timeout_sec: Optional[float] = None, ) -> Optional[str]: - """ - Translate text with Qwen chat model using rich prompts. - - - 根据目标语言选择提示词,如果没匹配到则退回英文模板。 - - 不对 text 做语言检测或缓存,调用方自行控制。 - - Args: - text: 原始文本 - target_lang: 目标语言代码(如 "zh", "en") - source_lang: 源语言代码(可选,不影响提示词选择,仅用于日志) - source_lang_label: 源语言展示名称,用于 prompt(默认使用 source_lang) - target_lang_label: 目标语言展示名称,用于 prompt(默认使用 target_lang) - timeout_sec: 请求超时时间(秒,可选;若未配置则从 config 读取或采用默认) - - Returns: - 翻译后的文本;如失败则返回 None。 - """ - if not text or not str(text).strip(): - return text - - cfg = get_translation_config() - provider_cfg = cfg.providers.get("llm", {}) if isinstance(cfg.providers, dict) else {} - - model_name = provider_cfg.get("model") or QWEN_MODEL_NAME - req_timeout = float(provider_cfg.get("timeout_sec") or timeout_sec or 30.0) - base_url = (provider_cfg.get("base_url") or "").strip() or None - - client = _get_qwen_client(base_url=base_url) - if not client: - # 无法调用云端,直接回退 - logger.warning( - "[llm_translate] Client init failed; returning original text. 
" - "text=%r target_lang=%s source_lang=%s", - text[:80], - target_lang, - source_lang or "auto", - ) - return text - - tgt = (target_lang or "").lower() or "en" - src = (source_lang or "auto").lower() - src_label = source_lang_label or src - tgt_label = target_lang_label or tgt - - prompt = _build_prompt( + provider = LLMTranslatorProvider(timeout_sec=timeout_sec or 30.0) + return provider.translate( text=text, - target_lang=tgt, - source_lang_label=src_label, - target_lang_label=tgt_label, - src_lang_code=src, - tgt_lang_code=tgt, + target_lang=target_lang, + source_lang=source_lang, + context=None, ) - start = time.time() - try: - completion = client.chat.completions.create( - model=model_name, - messages=[ - { - "role": "user", - "content": prompt, - } - ], - timeout=req_timeout, - ) - content = (completion.choices[0].message.content or "").strip() - duration_ms = (time.time() - start) * 1000 - logger.info( - "[llm_translate] Success | model=%s src=%s tgt=%s latency=%.1fms text=%r -> %r", - model_name, - src, - tgt, - duration_ms, - text[:80], - content[:80], - ) - return content or text - except Exception as exc: - duration_ms = (time.time() - start) * 1000 - logger.warning( - "[llm_translate] Failed | model=%s src=%s tgt=%s latency=%.1fms error=%s", - model_name, - src, - tgt, - duration_ms, - exc, - exc_info=True, - ) - # 安全回退:出错时返回原文,避免中断上游流程 - return text - - -__all__ = [ - "TRANSLATION_PROMPTS", - "llm_translate", -] +__all__ = ["LLMTranslatorProvider", "llm_translate"] diff --git a/query/query_parser.py b/query/query_parser.py index 2c81891..1927421 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -8,7 +8,7 @@ from typing import Dict, List, Optional, Any, Union import numpy as np import logging import re -from concurrent.futures import Future, ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, as_completed, wait from embeddings.text_encoder import TextEmbeddingEncoder from config import SearchConfig @@ 
-135,6 +135,7 @@ class QueryParser: cfg = get_translation_config() logger.info("Initializing translator at QueryParser construction (provider=%s)...", cfg.provider) self._translator = create_translation_provider(self.config.query_config) + self._translation_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="query-translation") @property def text_encoder(self) -> TextEmbeddingEncoder: @@ -265,6 +266,7 @@ class QueryParser: # Stage 4: Translation (with async support and conditional waiting) translations = {} translation_futures = {} + translation_executor = None index_langs = ["en", "zh"] try: # 根据租户配置的 index_languages 决定翻译目标语言 @@ -287,48 +289,33 @@ class QueryParser: target_langs = target_langs_for_translation if target_langs: - # Use e-commerce context for better disambiguation - translation_context = self.config.query_config.translation_context - # For query translation, we use a general prompt (not language-specific) - query_prompt = ( - self.config.query_config.translation_prompts.get(f"query_{detected_lang}") - or self.config.query_config.translation_prompts.get("query_en") - or self.config.query_config.translation_prompts.get("default_en") - or self.config.query_config.translation_prompts.get("default_zh") - ) - # Determine if we need to wait for translation results # If detected_lang is not in index_languages, we must wait for translation need_wait_translation = detected_lang not in index_langs - + if need_wait_translation: - # Use async method that returns Futures, so we can wait for results - translation_results = self.translator.translate_multi_async( - query_text, - target_langs, - source_lang=detected_lang, - context=translation_context, - prompt=query_prompt + translation_executor = ThreadPoolExecutor( + max_workers=max(1, min(len(target_langs), 4)), + thread_name_prefix="query-translation-wait", ) - # Separate cached results and futures - for lang, result in translation_results.items(): - if isinstance(result, Future): - 
translation_futures[lang] = result - else: - translations[lang] = result + for lang in target_langs: + translation_futures[lang] = translation_executor.submit( + self.translator.translate, + query_text, + lang, + detected_lang, + "ecommerce_search_query", + ) else: - # Use async mode: returns cached translations immediately, missing ones translated in background - translations = self.translator.translate_multi( - query_text, - target_langs, - source_lang=detected_lang, - context=translation_context, - async_mode=True, - prompt=query_prompt - ) - # Filter out None values (missing translations that are being processed async) - translations = {k: v for k, v in translations.items() if v is not None} - + for lang in target_langs: + self._translation_executor.submit( + self.translator.translate, + query_text, + lang, + detected_lang, + "ecommerce_search_query", + ) + if translations: log_info(f"Translation completed (cache hit) | Query text: '{query_text}' | Results: {translations}") if translation_futures: @@ -407,15 +394,18 @@ class QueryParser: all_futures.append(embedding_future) future_to_lang[embedding_future] = ('embedding', None) - # Wait for all futures to complete - for future in as_completed(all_futures): + # Enforce a hard timeout for translation-related work (300ms budget) + done, not_done = wait(all_futures, timeout=0.3) + for future in done: task_type, lang = future_to_lang[future] try: result = future.result() if task_type == 'translation': if result: translations[lang] = result - log_info(f"Translation completed | Query text: '{query_text}' | Target language: {lang} | Translation result: '{result}'") + log_info( + f"Translation completed | Query text: '{query_text}' | Target language: {lang} | Translation result: '{result}'" + ) if context: context.store_intermediate_result(f'translation_{lang}', result) elif task_type == 'embedding': @@ -434,10 +424,27 @@ class QueryParser: log_info(error_msg) if context: context.add_warning(error_msg) - + + # Log 
timeouts for any futures that did not finish within 300ms + if not_done: + for future in not_done: + task_type, lang = future_to_lang[future] + if task_type == 'translation': + timeout_msg = ( + f"Translation timeout (>300ms) | Language: {lang} | " + f"Query text: '{query_text}'" + ) + else: + timeout_msg = "Query vector generation timeout (>300ms), proceeding without embedding result" + log_info(timeout_msg) + if context: + context.add_warning(timeout_msg) + # Clean up encoding executor if encoding_executor: encoding_executor.shutdown(wait=False) + if translation_executor: + translation_executor.shutdown(wait=False) # Update translations in context after all are complete if translations and context: diff --git a/query/qwen_mt_translate.py b/query/qwen_mt_translate.py index ee39071..aec4e85 100644 --- a/query/qwen_mt_translate.py +++ b/query/qwen_mt_translate.py @@ -1,92 +1,27 @@ -""" -Translation service for multi-language query support. +"""Qwen-MT translation orchestrator with cache and async helpers.""" -Supports multiple translation models: -- Qwen (default): Alibaba Cloud DashScope API using qwen-mt-flash model -- DeepL: DeepL API for high-quality translations - -重要说明(Qwen 机翻限速): -- 当前默认使用的 `qwen-mt-flash` 为云端机翻模型,**官方限速较低,约 RPM=60(每分钟约 60 请求)** -- 在高并发场景必须依赖 Redis 翻译缓存与批量预热,避免在用户实时请求路径上直接打满 DashScope 限流 -- 若业务侧存在大规模离线翻译或更高吞吐需求,建议评估 DeepL 或自建翻译后端 - -使用方法 (Usage): - -```python -from query.translator import Translator - -# 使用默认的 qwen 模型(推荐) -translator = Translator() # 默认使用 qwen 模型 - -# 或显式指定模型 -translator = Translator(model='qwen') # 使用 qwen 模型 -translator = Translator(model='deepl') # 使用 DeepL 模型 - -# 翻译文本 -result = translator.translate( - text="我看到这个视频后没有笑", - target_lang="en", - source_lang="auto" # 自动检测源语言 -) -``` - -配置说明 (Configuration): -- Qwen 模型需要设置 DASHSCOPE_API_KEY 环境变量(在 .env 文件中) -- DeepL 模型需要设置 DEEPL_AUTH_KEY 环境变量(在 .env 文件中) - -Qwen 模型参考文档: -- 官方文档:https://help.aliyun.com/zh/model-studio/get-api-key -- 模型:qwen-mt-flash(快速翻译模型) - -DeepL 官方文档: 
-https://developers.deepl.com/api-reference/translate/request-translation -""" +from __future__ import annotations +import hashlib +import logging import os -import requests import re -import redis -from concurrent.futures import ThreadPoolExecutor, Future -from datetime import timedelta -from typing import Dict, List, Optional, Union -import logging import time +from typing import Dict, List, Optional -logger = logging.getLogger(__name__) - -from config.env_config import DEEPL_AUTH_KEY, DASHSCOPE_API_KEY, REDIS_CONFIG +import redis from openai import OpenAI +from config.env_config import DASHSCOPE_API_KEY, REDIS_CONFIG +from config.services_config import get_translation_cache_config +from config.translate_prompts import SOURCE_LANG_CODE_MAP -class Translator: - """ - Multi-language translator supporting Qwen and DeepL APIs. - - Default model is 'qwen' which uses Alibaba Cloud DashScope API. - """ -# 华北2(北京):https://dashscope.aliyuncs.com/compatible-mode/v1 -# 新加坡:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 -# 美国(弗吉尼亚):https://dashscope-us.aliyuncs.com/compatible-mode/v1 +logger = logging.getLogger(__name__) - DEEPL_API_URL = "https://api.deepl.com/v2/translate" # Pro tier - QWEN_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" # 北京地域 - # QWEN_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1" # 新加坡 - # 如果使用新加坡地域的模型,需要将base_url替换为:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 - QWEN_MODEL = "qwen-mt-flash" # 快速翻译模型 - # Language code mapping - LANG_CODE_MAP = { - 'zh': 'ZH', - 'en': 'EN', - 'ru': 'RU', - 'ar': 'AR', - 'ja': 'JA', - 'es': 'ES', - 'de': 'DE', - 'fr': 'FR', - 'it': 'IT', - 'pt': 'PT', - } +class Translator: + QWEN_DEFAULT_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" + QWEN_MODEL = "qwen-mt-flash" def __init__( self, @@ -95,77 +30,90 @@ class Translator: use_cache: bool = True, timeout: int = 10, glossary_id: Optional[str] = None, - translation_context: Optional[str] = None + 
translation_context: Optional[str] = None, ): - """ - Initialize translator. - - Args: - model: Translation model to use. Options: 'qwen' (default) or 'deepl' - api_key: API key for the selected model (or None to use from config/env) - use_cache: Whether to cache translations - timeout: Request timeout in seconds - glossary_id: DeepL glossary ID for custom terminology (optional, only for DeepL) - translation_context: Context hint for translation (e.g., "e-commerce", "product search") - """ - self.model = model.lower() - if self.model not in ['qwen', 'deepl']: - raise ValueError(f"Unsupported model: {model}. Supported models: 'qwen', 'deepl'") - - # Get API key from config if not provided - if api_key is None: - if self.model == 'qwen': - api_key = DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") - else: # deepl - api_key = DEEPL_AUTH_KEY or os.getenv("DEEPL_AUTH_KEY") - - self.api_key = api_key - self.timeout = timeout - self.use_cache = use_cache + self.model = self._normalize_model(model) + self.timeout = int(timeout) + self.use_cache = bool(use_cache) self.glossary_id = glossary_id self.translation_context = translation_context or "e-commerce product search" - - # Initialize OpenAI client for Qwen if needed - self.qwen_client = None - if self.model == 'qwen': - if not self.api_key: - logger.warning("DASHSCOPE_API_KEY not set. 
Qwen translation will not work.") - else: - self.qwen_client = OpenAI( - api_key=self.api_key, - base_url=self.QWEN_BASE_URL, - ) - # Initialize Redis cache if enabled - if use_cache: + cache_cfg = get_translation_cache_config() + self.cache_prefix = str(cache_cfg.get("key_prefix", "trans:v2")) + self.expire_seconds = int(cache_cfg.get("ttl_seconds", 360 * 24 * 3600)) + self.cache_sliding_expiration = bool(cache_cfg.get("sliding_expiration", True)) + self.cache_include_context = bool(cache_cfg.get("key_include_context", True)) + self.cache_include_prompt = bool(cache_cfg.get("key_include_prompt", True)) + self.cache_include_source_lang = bool(cache_cfg.get("key_include_source_lang", True)) + + self.qwen_model_name = self._resolve_qwen_model_name(model) + self._api_key = api_key or self._default_api_key(self.model) + self._qwen_client: Optional[OpenAI] = None + base_url = os.getenv("DASHSCOPE_BASE_URL") or self.QWEN_DEFAULT_BASE_URL + if self._api_key: try: - self.redis_client = redis.Redis( - host=REDIS_CONFIG.get('host', 'localhost'), - port=REDIS_CONFIG.get('port', 6479), - password=REDIS_CONFIG.get('password'), - decode_responses=True, # Return str instead of bytes - socket_timeout=REDIS_CONFIG.get('socket_timeout', 1), - socket_connect_timeout=REDIS_CONFIG.get('socket_connect_timeout', 1), - retry_on_timeout=REDIS_CONFIG.get('retry_on_timeout', False), - health_check_interval=10, # 避免复用坏连接 - ) - # Test connection - self.redis_client.ping() - expire_days = REDIS_CONFIG.get('translation_cache_expire_days', 360) - self.expire_time = timedelta(days=expire_days) - self.expire_seconds = int(self.expire_time.total_seconds()) # Redis 需要秒数 - self.cache_prefix = REDIS_CONFIG.get('translation_cache_prefix', 'trans') - logger.info("Redis cache initialized for translations") - except Exception as e: - logger.warning(f"Failed to initialize Redis cache: {e}, falling back to no cache") - self.redis_client = None - self.cache = None + self._qwen_client = 
OpenAI(api_key=self._api_key, base_url=base_url) + except Exception as exc: + logger.warning("Failed to initialize qwen-mt client: %s", exc, exc_info=True) else: - self.redis_client = None - self.cache = None - - # Thread pool for async translation - self.executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="translator") + logger.warning("DASHSCOPE_API_KEY not set; qwen-mt translation unavailable") + + self.redis_client = None + if self.use_cache and bool(cache_cfg.get("enabled", True)): + self.redis_client = self._init_redis_client() + + @staticmethod + def _normalize_model(model: str) -> str: + m = (model or "qwen").strip().lower() + if m.startswith("qwen"): + return "qwen-mt" + raise ValueError(f"Unsupported model: {model}. Supported models: 'qwen', 'qwen-mt', 'qwen-mt-flash'") + + @staticmethod + def _resolve_qwen_model_name(model: str) -> str: + m = (model or "qwen").strip().lower() + if m in {"qwen", "qwen-mt"}: + return "qwen-mt-flash" + return m + + @staticmethod + def _default_api_key(model: str) -> Optional[str]: + del model + return DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") + + def _init_redis_client(self): + try: + client = redis.Redis( + host=REDIS_CONFIG.get("host", "localhost"), + port=REDIS_CONFIG.get("port", 6479), + password=REDIS_CONFIG.get("password"), + decode_responses=True, + socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), + socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), + retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), + health_check_interval=10, + ) + client.ping() + return client + except Exception as exc: + logger.warning("Failed to initialize translation redis cache: %s", exc) + return None + + def _build_cache_key( + self, + text: str, + target_lang: str, + source_lang: Optional[str], + context: Optional[str], + prompt: Optional[str], + ) -> str: + src = (source_lang or "auto").strip().lower() if self.cache_include_source_lang else "-" + tgt = (target_lang or 
"").strip().lower() + ctx = (context or "").strip() if self.cache_include_context else "" + prm = (prompt or "").strip() if self.cache_include_prompt else "" + payload = f"model={self.model}\nsrc={src}\ntgt={tgt}\nctx={ctx}\nprm={prm}\ntext={text}" + digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() + return f"{self.cache_prefix}:{self.model}:{src}:{tgt}:{digest}" def translate( self, @@ -173,99 +121,27 @@ class Translator: target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, - prompt: Optional[str] = None + prompt: Optional[str] = None, ) -> Optional[str]: - """ - Translate text to target language (synchronous mode). - - Args: - text: Text to translate - target_lang: Target language code ('zh', 'en', 'ru', etc.) - source_lang: Source language code (option al, auto-detect if None) - context: Additional context for translation (overrides default context) - prompt: Translation prompt/instruction (optional, for better translation quality) - - Returns: - Translated text or None if translation fails - """ if not text or not text.strip(): return text - # Normalize language codes - target_lang = target_lang.lower() - if source_lang: - source_lang = source_lang.lower() - - # Optimization: Skip translation if not needed - if target_lang == 'en' and self._is_english_text(text): - logger.info(f"[Translator] Text is already English, skipping translation: '{text[:50]}...'") + tgt = (target_lang or "").strip().lower() + src = (source_lang or "").strip().lower() or None + if tgt == "en" and self._is_english_text(text): return text - - if target_lang == 'zh' and (self._contains_chinese(text) or self._is_pure_number(text)): - logger.info( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Result: Skip translation (contains Chinese or pure number)" - ) + if tgt == "zh" and (self._contains_chinese(text) or self._is_pure_number(text)): return text - 
# Use provided context or default context translation_context = context or self.translation_context - - # Build cache key (include prompt in cache key if provided) - cache_key_parts = [source_lang or 'auto', target_lang, translation_context] - if prompt: - cache_key_parts.append(prompt) - cache_key_parts.append(text) - cache_key = ':'.join(cache_key_parts) + cached = self._get_cached_translation_redis(text, tgt, src, translation_context, prompt) + if cached is not None: + return cached - # Check cache (include context and prompt in cache key for accuracy) - if self.use_cache and self.redis_client: - cached = self._get_cached_translation_redis(text, target_lang, source_lang, translation_context, prompt) - if cached: - logger.info( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Result: '{cached}' | Source: Cache hit" - ) - return cached - - # If no API key, return mock translation (for testing) - if not self.api_key: - logger.info( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Result: '{text}' | Source: Mock mode (no API key)" - ) - return text - - # Translate using selected model - logger.info( - f"[Translator] Translation request | Model: {self.model} | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Context: {translation_context} | " - f"Prompt: {'yes' if prompt else 'no'} | Status: Starting translation" - ) - - if self.model == 'qwen': - result = self._translate_qwen(text, target_lang, source_lang, translation_context, prompt) - else: # deepl - result = self._translate_deepl(text, target_lang, source_lang, translation_context, prompt) - - # Surface translation failure to the caller instead of silently - # masquerading the source text as a successful translation. 
- if result is None: - logger.warning( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Status: Translation failed" - ) - else: - logger.info( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Result: '{result}' | Status: Translation successful" - ) - - # Cache only successful translations. Failed attempts must not poison - # Redis with the original text. - if result is not None and self.use_cache and self.redis_client: - self._set_cached_translation_redis(text, target_lang, result, source_lang, translation_context, prompt) + result = self._translate_qwen(text, tgt, src) + if result is not None: + self._set_cached_translation_redis(text, tgt, result, src, translation_context, prompt) return result def _translate_qwen( @@ -273,412 +149,63 @@ class Translator: text: str, target_lang: str, source_lang: Optional[str], - context: Optional[str] = None, - prompt: Optional[str] = None ) -> Optional[str]: - """ - Translate using Qwen MT Flash model via Alibaba Cloud DashScope API. - - Args: - text: Text to translate - target_lang: Target language code ('zh', 'en', 'ru', etc.) - source_lang: Source language code (optional, 'auto' if None) - context: Context hint for translation (optional) - prompt: Translation prompt/instruction (optional) - - Returns: - Translated text or None if translation fails - """ - if not self.qwen_client: - logger.error("[Translator] Qwen client not initialized. 
Check DASHSCOPE_API_KEY.") + if not self._qwen_client: return None - - # Qwen (qwen-mt-plus/flash/turbo) supported languages mapping - # 标准来自:你提供的“语言 / 英文名 / 代码”表 - qwen_lang_map = { - "en": "English", - "zh": "Chinese", - "zh_tw": "Traditional Chinese", - "ru": "Russian", - "ja": "Japanese", - "ko": "Korean", - "es": "Spanish", - "fr": "French", - "pt": "Portuguese", - "de": "German", - "it": "Italian", - "th": "Thai", - "vi": "Vietnamese", - "id": "Indonesian", - "ms": "Malay", - "ar": "Arabic", - "hi": "Hindi", - "he": "Hebrew", - "my": "Burmese", - "ta": "Tamil", - "ur": "Urdu", - "bn": "Bengali", - "pl": "Polish", - "nl": "Dutch", - "ro": "Romanian", - "tr": "Turkish", - "km": "Khmer", - "lo": "Lao", - "yue": "Cantonese", - "cs": "Czech", - "el": "Greek", - "sv": "Swedish", - "hu": "Hungarian", - "da": "Danish", - "fi": "Finnish", - "uk": "Ukrainian", - "bg": "Bulgarian", - } - - # Convert target language - target_lang_normalized = target_lang.lower() - target_lang_qwen = qwen_lang_map.get(target_lang_normalized, target_lang.capitalize()) - - # Convert source language - source_lang_normalized = (source_lang or "").strip().lower() - if not source_lang_normalized or source_lang_normalized == "auto": - source_lang_qwen = "auto" - else: - source_lang_qwen = qwen_lang_map.get(source_lang_normalized, source_lang.capitalize()) - - # Prepare translation options - translation_options = { - "source_lang": source_lang_qwen, - "target_lang": target_lang_qwen, - } - - # Prepare messages - messages = [ - { - "role": "user", - "content": text - } - ] - - start_time = time.time() + tgt_norm = (target_lang or "").strip().lower() + src_norm = (source_lang or "").strip().lower() + tgt_qwen = self.SOURCE_LANG_CODE_MAP.get(tgt_norm, tgt_norm.capitalize()) + src_qwen = "auto" if not src_norm or src_norm == "auto" else self.SOURCE_LANG_CODE_MAP.get(src_norm, src_norm.capitalize()) + start = time.time() try: - completion = self.qwen_client.chat.completions.create( - 
model=self.QWEN_MODEL, - messages=messages, + completion = self._qwen_client.chat.completions.create( + model=self.qwen_model_name, + messages=[{"role": "user", "content": text}], extra_body={ - "translation_options": translation_options - } - ) - - translated_text = completion.choices[0].message.content.strip() - duration_ms = (time.time() - start_time) * 1000 - - logger.info( - f"[Translator] Qwen API response success | Original text: '{text}' | Target language: {target_lang_qwen} | " - f"Translation result: '{translated_text}' | Duration: {duration_ms:.2f} ms" - ) - return translated_text - - except Exception as e: - duration_ms = (time.time() - start_time) * 1000 - logger.error( - f"[Translator] Qwen API request exception | Original text: '{text}' | Target language: {target_lang_qwen} | " - f"Duration: {duration_ms:.2f} ms | Error: {e}", exc_info=True - ) - return None - - def _translate_deepl( - self, - text: str, - target_lang: str, - source_lang: Optional[str], - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Optional[str]: - """ - Translate using DeepL API with context and glossary support. 
- - Args: - text: Text to translate - target_lang: Target language code - source_lang: Source language code (optional) - context: Context hint for translation (e.g., "e-commerce product search") - """ - # Map to DeepL language codes - target_code = self.LANG_CODE_MAP.get(target_lang, target_lang.upper()) - - headers = { - "Authorization": f"DeepL-Auth-Key {self.api_key}", - "Content-Type": "application/json", - } - - # Use prompt as context parameter for DeepL API (not as text prefix) - # According to DeepL API: context is "Additional context that can influence a translation but is not translated itself" - # If prompt is provided, use it as context; otherwise use the default context - api_context = prompt if prompt else context - - # For e-commerce, add context words to help DeepL understand the domain - # This is especially important for single-word ambiguous terms like "车" (car vs rook) - text_to_translate, needs_extraction = self._add_ecommerce_context(text, source_lang, api_context) - - payload = { - "text": [text_to_translate], - "target_lang": target_code, - } - - if source_lang: - source_code = self.LANG_CODE_MAP.get(source_lang, source_lang.upper()) - payload["source_lang"] = source_code - - # Add context parameter (prompt or default context) - # Context influences translation but is not translated itself - if api_context: - payload["context"] = api_context - - # Add glossary if configured - if self.glossary_id: - payload["glossary_id"] = self.glossary_id - - # Note: DeepL API v2 supports "context" parameter for additional context - # that influences translation but is not translated itself. - # We use prompt as context parameter when provided. 
- - try: - response = requests.post( - self.DEEPL_API_URL, - headers=headers, - json=payload, - timeout=self.timeout + "translation_options": { + "source_lang": src_qwen, + "target_lang": tgt_qwen, + } + }, + timeout=self.timeout, ) - - if response.status_code == 200: - data = response.json() - if "translations" in data and len(data["translations"]) > 0: - translated_text = data["translations"][0]["text"] - # If we added context, extract just the term from the result - if needs_extraction: - translated_text = self._extract_term_from_translation( - translated_text, text, target_code - ) - logger.debug( - f"[Translator] DeepL API response success | Original text: '{text}' | Target language: {target_code} | " - f"Translation result: '{translated_text}'" - ) - return translated_text - else: - logger.error( - f"[Translator] DeepL API error | Original text: '{text}' | Target language: {target_code} | " - f"Status code: {response.status_code} | Error message: {response.text}" - ) + content = (completion.choices[0].message.content or "").strip() + if not content: return None - - except requests.Timeout: + logger.info("[qwen-mt] Success | src=%s tgt=%s latency=%.1fms", src_qwen, tgt_qwen, (time.time() - start) * 1000) + return content + except Exception as exc: logger.warning( - f"[Translator] DeepL API request timeout | Original text: '{text}' | Target language: {target_code} | " - f"Timeout: {self.timeout}s" - ) - return None - except Exception as e: - logger.error( - f"[Translator] DeepL API request exception | Original text: '{text}' | Target language: {target_code} | " - f"Error: {e}", exc_info=True + "[qwen-mt] Failed | src=%s tgt=%s latency=%.1fms error=%s", + src_qwen, + tgt_qwen, + (time.time() - start) * 1000, + exc, + exc_info=True, ) return None - # NOTE: _translate_deepl_free is intentionally not implemented. - # We do not support automatic fallback to the free endpoint, to avoid - # mixing Pro keys with https://api-free.deepl.com and related 403 errors. 
- - def translate_multi( - self, - text: str, - target_langs: List[str], - source_lang: Optional[str] = None, - context: Optional[str] = None, - async_mode: bool = True, - prompt: Optional[str] = None - ) -> Dict[str, Optional[str]]: - """ - Translate text to multiple target languages. - - In async_mode=True (default): - - Returns cached translations immediately if available - - For translations that can be optimized (e.g., pure numbers, already in target language), - returns result immediately via synchronous call - - Launches async tasks for other missing translations (non-blocking) - - Returns None for missing translations that require async processing - - In async_mode=False: - - Waits for all translations to complete (blocking) - - Args: - text: Text to translate - target_langs: List of target language codes - source_lang: Source language code (optional) - context: Context hint for translation (optional) - async_mode: If True, return cached results immediately and translate missing ones async - prompt: Translation prompt/instruction (optional) - Returns: - Dictionary mapping language code to translated text (only cached results in async mode) - """ - results = {} - missing_langs = [] - async_langs = [] - - # First, get cached translations - for lang in target_langs: - cached = self._get_cached_translation(text, lang, source_lang, context, prompt) - if cached is not None: - results[lang] = cached - else: - missing_langs.append(lang) - - # If async mode and there are missing translations - if async_mode and missing_langs: - # Check if translation can be optimized (immediate return) - for lang in missing_langs: - target_lang = lang.lower() - # Check optimization conditions (same as in translate method) - can_optimize = False - if target_lang == 'en' and self._is_english_text(text): - can_optimize = True - elif target_lang == 'zh' and (self._contains_chinese(text) or self._is_pure_number(text)): - can_optimize = True - - if can_optimize: - # Can be optimized, call 
translate synchronously for immediate result - results[lang] = self.translate(text, lang, source_lang, context, prompt) - else: - # Requires actual translation, add to async list - async_langs.append(lang) - - # Launch async tasks for translations that require actual API calls - if async_langs: - for lang in async_langs: - self._translate_async(text, lang, source_lang, context, prompt) - # Return None for async translations - for lang in async_langs: - results[lang] = None - else: - # Synchronous mode: wait for all translations - for lang in missing_langs: - results[lang] = self.translate(text, lang, source_lang, context, prompt) - - return results - - def translate_multi_async( - self, - text: str, - target_langs: List[str], - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Dict[str, Union[str, Future]]: - """ - Translate text to multiple target languages asynchronously, returning Futures that can be awaited. - - This method returns a dictionary where: - - If translation is cached, the value is the translation string (immediate) - - If translation needs to be done, the value is a Future object that can be awaited - - Args: - text: Text to translate - target_langs: List of target language codes - source_lang: Source language code (optional) - context: Context hint for translation (optional) - prompt: Translation prompt/instruction (optional) - - Returns: - Dictionary mapping language code to either translation string (cached) or Future object - """ - results = {} - missing_langs = [] - - # First, get cached translations - for lang in target_langs: - cached = self._get_cached_translation(text, lang, source_lang, context, prompt) - if cached is not None: - results[lang] = cached - else: - missing_langs.append(lang) - - # For missing translations, submit async tasks and return Futures - for lang in missing_langs: - future = self.executor.submit( - self.translate, - text, - lang, - source_lang, - context, - 
prompt - ) - results[lang] = future - - return results - - def _get_cached_translation( - self, - text: str, - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Optional[str]: - """Get translation from cache if available.""" - if not self.redis_client: - return None - return self._get_cached_translation_redis(text, target_lang, source_lang, context, prompt) - def _get_cached_translation_redis( self, text: str, target_lang: str, source_lang: Optional[str] = None, context: Optional[str] = None, - prompt: Optional[str] = None + prompt: Optional[str] = None, ) -> Optional[str]: - """ - Get translation from Redis cache with sliding expiration. - - 滑动过期机制:每次访问缓存时,重置过期时间为配置的过期时间(默认720天)。 - 这样缓存会在最后一次访问后的720天才过期,而不是写入后的720天。 - 这确保了常用的翻译缓存不会被过早删除。 - """ if not self.redis_client: return None - + key = self._build_cache_key(text, target_lang, source_lang, context, prompt) try: - # Build cache key: prefix:target_lang:text - # For simplicity, we use target_lang and text as key - # Context and prompt are not included in key to maximize cache hits - cache_key = f"{self.cache_prefix}:{target_lang.upper()}:{text}" - value = self.redis_client.get(cache_key) - if value: - # Sliding expiration: reset expiration time on access - # 每次读取缓存时,重置过期时间为配置的过期时间(最后一次访问后的N天才过期) - try: - self.redis_client.expire(cache_key, self.expire_seconds) - except Exception as expire_error: - # 即使 expire 失败,也返回缓存值(不影响功能) - logger.warning( - f"[Translator] Failed to update cache expiration for key {cache_key}: {expire_error}" - ) - - logger.debug( - f"[Translator] Redis cache hit | Original text: '{text}' | Target language: {target_lang} | " - f"Cache key: {cache_key} | Translation result: '{value}' | TTL reset to {self.expire_seconds}s" - ) - return value - logger.debug( - f"[Translator] Redis cache miss | Original text: '{text}' | Target language: {target_lang} | " - f"Cache key: {cache_key}" - ) + value = self.redis_client.get(key) 
+ if value and self.cache_sliding_expiration: + self.redis_client.expire(key, self.expire_seconds) + return value + except Exception as exc: + logger.warning("Redis get translation cache failed: %s", exc) return None - except Exception as e: - logger.error(f"[Translator] Redis error during get translation cache | Original text: '{text}' | Target language: {target_lang} | Error: {e}") - return None - + def _set_cached_translation_redis( self, text: str, @@ -686,128 +213,17 @@ class Translator: translation: str, source_lang: Optional[str] = None, context: Optional[str] = None, - prompt: Optional[str] = None + prompt: Optional[str] = None, ) -> None: - """Store translation in Redis cache.""" if not self.redis_client: return - + key = self._build_cache_key(text, target_lang, source_lang, context, prompt) try: - cache_key = f"{self.cache_prefix}:{target_lang.upper()}:{text}" - self.redis_client.setex(cache_key, self.expire_seconds, translation) - logger.info( - f"[Translator] Redis cache write | Original text: '{text}' | Target language: {target_lang} | " - f"Cache key: {cache_key} | Translation result: '{translation}'" - ) - except Exception as e: - logger.error( - f"[Translator] Redis cache write failed | Original text: '{text}' | Target language: {target_lang} | " - f"Error: {e}" - ) - - def _translate_async( - self, - text: str, - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ): - """Launch async translation task.""" - def _do_translate(): - try: - result = self.translate(text, target_lang, source_lang, context, prompt) - if result: - logger.debug(f"Async translation completed: {text} -> {target_lang}: {result}") - except Exception as e: - logger.warning(f"Async translation failed: {text} -> {target_lang}: {e}") - - self.executor.submit(_do_translate) - - def _add_ecommerce_context( - self, - text: str, - source_lang: Optional[str], - context: Optional[str] - ) -> tuple: - """ - Add 
e-commerce context to text for better disambiguation. - - For single-word ambiguous Chinese terms, we add context words that help - DeepL understand this is an e-commerce/product search context. - - Args: - text: Original text to translate - source_lang: Source language code - context: Context hint - - Returns: - Tuple of (text_with_context, needs_extraction) - - text_with_context: Text to send to DeepL - - needs_extraction: Whether we need to extract the term from the result - """ - # Only apply for e-commerce context and Chinese source - if not context or "e-commerce" not in context.lower(): - return text, False - - if not source_lang or source_lang.lower() != 'zh': - return text, False - - # For single-word queries, add context to help disambiguation - text_stripped = text.strip() - if len(text_stripped.split()) == 1 and len(text_stripped) <= 2: - # Common ambiguous Chinese e-commerce terms like "车" (car vs rook) - # We add a context phrase: "购买 [term]" (buy [term]) or "商品 [term]" (product [term]) - # This helps DeepL understand the e-commerce context - # We'll need to extract just the term from the translation result - context_phrase = f"购买 {text_stripped}" - return context_phrase, True - - # For multi-word queries, DeepL usually has enough context - return text, False - - def _extract_term_from_translation( - self, - translated_text: str, - original_text: str, - target_lang_code: str - ) -> str: - """ - Extract the actual term from a translation that included context. - - For example, if we translated "购买 车" (buy car) and got "buy car", - we want to extract just "car". - - Args: - translated_text: Full translation result - original_text: Original single-word query - target_lang_code: Target language code (EN, ZH, etc.) 
- - Returns: - Extracted term or original translation if extraction fails - """ - # For English target, try to extract the last word (the actual term) - if target_lang_code == "EN": - words = translated_text.strip().split() - if len(words) > 1: - # Usually the last word is the term we want - # But we need to be smart - if it's "buy car", we want "car" - # Common context words to skip: buy, purchase, product, item, etc. - context_words = {"buy", "purchase", "product", "item", "commodity", "goods"} - # Try to find the term (not a context word) - for word in reversed(words): - word_lower = word.lower().rstrip('.,!?;:') - if word_lower not in context_words: - return word_lower - # If all words are context words, return the last one - return words[-1].lower().rstrip('.,!?;:') - - # For other languages or if extraction fails, return as-is - # The user can configure a glossary for better results - return translated_text + self.redis_client.setex(key, self.expire_seconds, translation) + except Exception as exc: + logger.warning("Redis set translation cache failed: %s", exc) def _shop_lang_matches(self, shop_lang_lower: str, lang_code: str) -> bool: - """True if shop language matches index language (use source, no translate).""" if not shop_lang_lower or not lang_code: return False if shop_lang_lower == lang_code: @@ -818,146 +234,27 @@ class Translator: return True return False - def translate_for_indexing( - self, - text: str, - shop_language: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - index_languages: Optional[List[str]] = None, - ) -> Dict[str, Optional[str]]: - """ - Translate text for indexing based on shop language and tenant index_languages. - - For each language in index_languages: use source text if shop language matches, - otherwise translate to that language. - - Args: - text: Text to translate - shop_language: Shop primary language (e.g. 
'zh', 'en', 'ru') - source_lang: Source language code (optional) - context: Additional context for translation (optional) - prompt: Translation prompt (optional) - index_languages: Languages to index (from tenant_config). Default ["en", "zh"]. - - Returns: - Dict keyed by each index_language with translated or source text (or None). - """ - langs = index_languages if index_languages else ["en", "zh"] - results = {lang: None for lang in langs} - if not text or not text.strip(): - return results - if re.match(r'^[\d\s_-]+$', text): - logger.info(f"[Translator] Skip translation for symbol-only query: '{text}'") - return results - - shop_lang_lower = (shop_language or "").strip().lower() - targets = [] - for lang in langs: - if self._shop_lang_matches(shop_lang_lower, lang): - results[lang] = text - else: - targets.append(lang) - - for target_lang in targets: - cached = self._get_cached_translation_redis(text, target_lang, source_lang, context, prompt) - if cached: - results[target_lang] = cached - logger.debug(f"[Translator] Cache hit for indexing: '{text}' -> {target_lang}: {cached}") - continue - translated = self.translate( - text, - target_lang=target_lang, - source_lang=source_lang or shop_language, - context=context, - prompt=prompt, - ) - results[target_lang] = translated - return results - - def get_translation_needs( - self, - detected_lang: str, - supported_langs: List[str] - ) -> List[str]: - """ - Determine which languages need translation. 
- - Args: - detected_lang: Detected query language - supported_langs: List of supported languages - - Returns: - List of language codes to translate to - """ - # If detected language is in supported list, translate to others + def get_translation_needs(self, detected_lang: str, supported_langs: List[str]) -> List[str]: if detected_lang in supported_langs: - return [lang for lang in supported_langs if detected_lang != lang] - - # Otherwise, translate to all supported languages + return [lang for lang in supported_langs if lang != detected_lang] return supported_langs - + def _is_english_text(self, text: str) -> bool: - """ - Check if text is primarily English (ASCII letters, numbers, common punctuation). - - Args: - text: Text to check - - Returns: - True if text appears to be English - """ if not text or not text.strip(): return True - - # Remove whitespace and common punctuation - text_clean = re.sub(r'[\s\.,!?;:\-\'\"\(\)\[\]{}]', '', text) + text_clean = re.sub(r"[\s\.,!?;:\-\'\"\(\)\[\]{}]", "", text) if not text_clean: return True - - # Check if all remaining characters are ASCII (letters, numbers) - # This is a simple heuristic: if most characters are ASCII, it's likely English ascii_count = sum(1 for c in text_clean if ord(c) < 128) - ratio = ascii_count / len(text_clean) if text_clean else 0 - - # If more than 80% are ASCII characters, consider it English - return ratio > 0.8 - + return (ascii_count / len(text_clean)) > 0.8 + def _contains_chinese(self, text: str) -> bool: - """ - Check if text contains Chinese characters (Han characters). 
- - Args: - text: Text to check - - Returns: - True if text contains Chinese characters - """ if not text: return False - - # Check for Chinese characters (Unicode range: \u4e00-\u9fff) - chinese_pattern = re.compile(r'[\u4e00-\u9fff]') - return bool(chinese_pattern.search(text)) - + return bool(re.search(r"[\u4e00-\u9fff]", text)) + def _is_pure_number(self, text: str) -> bool: - """ - Check if text is purely numeric (digits, possibly with spaces, dots, commas). - - Args: - text: Text to check - - Returns: - True if text is purely numeric - """ if not text or not text.strip(): return False - - # Remove whitespace, dots, commas (common number separators) - text_clean = re.sub(r'[\s\.,]', '', text.strip()) - if not text_clean: - return False - - # Check if all remaining characters are digits - return text_clean.isdigit() + text_clean = re.sub(r"[\s\.,]", "", text.strip()) + return bool(text_clean) and text_clean.isdigit() diff --git a/query/test_translation.py b/query/test_translation.py index 1ce00f5..3e69676 100755 --- a/query/test_translation.py +++ b/query/test_translation.py @@ -14,6 +14,7 @@ Test content: import sys import os from pathlib import Path +from concurrent.futures import ThreadPoolExecutor # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) @@ -42,9 +43,6 @@ def test_config_loading(): print(f"✓ Configuration loaded successfully") print(f" Translation service: {config.query_config.translation_service}") - print(f" Translation prompt configuration:") - for key, value in config.query_config.translation_prompts.items(): - print(f" {key}: {value[:60]}..." 
if len(value) > 60 else f" {key}: {value}") return config except Exception as e: @@ -72,34 +70,23 @@ def test_translator_sync(config): translation_context=config.query_config.translation_context ) - # 测试商品标题翻译(使用product_title提示词) + # 测试商品标题翻译(使用sku_name提示词) test_texts = [ - ("蓝牙耳机", "zh", "en", "product_title"), - ("Wireless Headphones", "en", "zh", "product_title"), + ("蓝牙耳机", "zh", "en", "sku_name"), + ("Wireless Headphones", "en", "zh", "sku_name"), ] - for text, source_lang, target_lang, prompt_type in test_texts: - if prompt_type == "product_title": - if target_lang == "zh": - prompt = config.query_config.translation_prompts.get('product_title_zh') - else: - prompt = config.query_config.translation_prompts.get('product_title_en') - else: - if target_lang == "zh": - prompt = config.query_config.translation_prompts.get('default_zh') - else: - prompt = config.query_config.translation_prompts.get('default_en') - + for text, source_lang, target_lang, scene in test_texts: print(f"\nTranslation test:") print(f" Original text ({source_lang}): {text}") print(f" Target language: {target_lang}") - print(f" Prompt: {prompt[:50] if prompt else 'None'}...") + print(f" Scene: {scene}") result = translator.translate( text, target_lang=target_lang, source_lang=source_lang, - prompt=prompt + context=scene, ) if result: @@ -131,43 +118,25 @@ def test_translator_async(config, translator): query_text = "手机" target_langs = ['en'] source_lang = 'zh' - - query_prompt = config.query_config.translation_prompts.get('query_zh') - + print(f"Query text: {query_text}") print(f"Target languages: {target_langs}") - print(f"Prompt: {query_prompt}") - - # 异步模式(立即返回,后台翻译) - results = translator.translate_multi( - query_text, - target_langs, - source_lang=source_lang, - context=config.query_config.translation_context, - async_mode=True, - prompt=query_prompt - ) - - print(f"\nAsynchronous translation results:") - for lang, translation in results.items(): - if translation: - print(f" {lang}: 
{translation} (cache hit)") - else: - print(f" {lang}: None (translating in background...)") - - # 同步模式(等待完成) - print(f"\nSynchronous translation (waiting for completion):") - results_sync = translator.translate_multi( - query_text, - target_langs, - source_lang=source_lang, - context=config.query_config.translation_context, - async_mode=False, - prompt=query_prompt - ) + print("Scene: ecommerce_search_query") - for lang, translation in results_sync.items(): - print(f" {lang}: {translation}") + print(f"\nConcurrent translation via generic translate():") + with ThreadPoolExecutor(max_workers=len(target_langs)) as executor: + futures = { + lang: executor.submit( + translator.translate, + query_text, + lang, + source_lang, + "ecommerce_search_query", + ) + for lang in target_langs + } + for lang, future in futures.items(): + print(f" {lang}: {future.result()}") except Exception as e: print(f"✗ Asynchronous translation test failed: {e}") @@ -193,14 +162,13 @@ def test_cache(): test_text = "测试文本" target_lang = "en" source_lang = "zh" - prompt = config.query_config.translation_prompts.get('default_zh') print(f"First translation (should call API or return mock):") - result1 = translator.translate(test_text, target_lang, source_lang, prompt=prompt) + result1 = translator.translate(test_text, target_lang, source_lang, context="default") print(f" Result: {result1}") print(f"\nSecond translation (should use cache):") - result2 = translator.translate(test_text, target_lang, source_lang, prompt=prompt) + result2 = translator.translate(test_text, target_lang, source_lang, context="default") print(f" Result: {result2}") if result1 == result2: @@ -231,17 +199,16 @@ def test_context_parameter(): # 测试带context和不带context的翻译 text = "手机" - prompt = config.query_config.translation_prompts.get('query_zh') print(f"Test text: {text}") - print(f"Prompt (as context): {prompt}") + print("Scene: ecommerce_search_query") # 带context的翻译 result_with_context = translator.translate( text, 
target_lang='en', source_lang='zh', - prompt=prompt + context="ecommerce_search_query", ) print(f"\nTranslation result with context: {result_with_context}") diff --git a/query/translator.py b/query/translator.py deleted file mode 100644 index 77b829f..0000000 --- a/query/translator.py +++ /dev/null @@ -1,963 +0,0 @@ -""" -Translation service for multi-language query support. - -Supports multiple translation models: -- Qwen (default): Alibaba Cloud DashScope API using qwen-mt-flash model -- DeepL: DeepL API for high-quality translations - -重要说明(Qwen 机翻限速): -- 当前默认使用的 `qwen-mt-flash` 为云端机翻模型,**官方限速较低,约 RPM=60(每分钟约 60 请求)** -- 在高并发场景必须依赖 Redis 翻译缓存与批量预热,避免在用户实时请求路径上直接打满 DashScope 限流 -- 若业务侧存在大规模离线翻译或更高吞吐需求,建议评估 DeepL 或自建翻译后端 - -使用方法 (Usage): - -```python -from query.qwen_mt_translate import Translator - -# 使用默认的 qwen 模型(推荐) -translator = Translator() # 默认使用 qwen 模型 - -# 或显式指定模型 -translator = Translator(model='qwen') # 使用 qwen 模型 -translator = Translator(model='deepl') # 使用 DeepL 模型 - -# 翻译文本 -result = translator.translate( - text="我看到这个视频后没有笑", - target_lang="en", - source_lang="auto" # 自动检测源语言 -) -``` - -配置说明 (Configuration): -- Qwen 模型需要设置 DASHSCOPE_API_KEY 环境变量(在 .env 文件中) -- DeepL 模型需要设置 DEEPL_AUTH_KEY 环境变量(在 .env 文件中) - -Qwen 模型参考文档: -- 官方文档:https://help.aliyun.com/zh/model-studio/get-api-key -- 模型:qwen-mt-flash(快速翻译模型) - -DeepL 官方文档: -https://developers.deepl.com/api-reference/translate/request-translation -""" - -import os -import requests -import re -import redis -from concurrent.futures import ThreadPoolExecutor, Future -from datetime import timedelta -from typing import Dict, List, Optional, Union -import logging -import time - -logger = logging.getLogger(__name__) - -from config.env_config import DEEPL_AUTH_KEY, DASHSCOPE_API_KEY, REDIS_CONFIG -from openai import OpenAI - - -class Translator: - """ - Multi-language translator supporting Qwen and DeepL APIs. - - Default model is 'qwen' which uses Alibaba Cloud DashScope API. 
- """ -# 华北2(北京):https://dashscope.aliyuncs.com/compatible-mode/v1 -# 新加坡:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 -# 美国(弗吉尼亚):https://dashscope-us.aliyuncs.com/compatible-mode/v1 - - DEEPL_API_URL = "https://api.deepl.com/v2/translate" # Pro tier - QWEN_BASE_URL = "https://dashscope-us.aliyuncs.com/compatible-mode/v1" # 北京地域 - # QWEN_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1" # 新加坡 - # 如果使用新加坡地域的模型,需要将base_url替换为:https://dashscope-intl.aliyuncs.com/compatible-mode/v1 - QWEN_MODEL = "qwen-mt-flash" # 快速翻译模型 - - # Language code mapping - LANG_CODE_MAP = { - 'zh': 'ZH', - 'en': 'EN', - 'ru': 'RU', - 'ar': 'AR', - 'ja': 'JA', - 'es': 'ES', - 'de': 'DE', - 'fr': 'FR', - 'it': 'IT', - 'pt': 'PT', - } - - def __init__( - self, - model: str = "qwen", - api_key: Optional[str] = None, - use_cache: bool = True, - timeout: int = 10, - glossary_id: Optional[str] = None, - translation_context: Optional[str] = None - ): - """ - Initialize translator. - - Args: - model: Translation model to use. Options: 'qwen' (default) or 'deepl' - api_key: API key for the selected model (or None to use from config/env) - use_cache: Whether to cache translations - timeout: Request timeout in seconds - glossary_id: DeepL glossary ID for custom terminology (optional, only for DeepL) - translation_context: Context hint for translation (e.g., "e-commerce", "product search") - """ - self.model = model.lower() - if self.model not in ['qwen', 'deepl']: - raise ValueError(f"Unsupported model: {model}. 
Supported models: 'qwen', 'deepl'") - - # Get API key from config if not provided - if api_key is None: - if self.model == 'qwen': - api_key = DASHSCOPE_API_KEY or os.getenv("DASHSCOPE_API_KEY") - else: # deepl - api_key = DEEPL_AUTH_KEY or os.getenv("DEEPL_AUTH_KEY") - - self.api_key = api_key - self.timeout = timeout - self.use_cache = use_cache - self.glossary_id = glossary_id - self.translation_context = translation_context or "e-commerce product search" - - # Initialize OpenAI client for Qwen if needed - self.qwen_client = None - if self.model == 'qwen': - if not self.api_key: - logger.warning("DASHSCOPE_API_KEY not set. Qwen translation will not work.") - else: - self.qwen_client = OpenAI( - api_key=self.api_key, - base_url=self.QWEN_BASE_URL, - ) - - # Initialize Redis cache if enabled - if use_cache: - try: - self.redis_client = redis.Redis( - host=REDIS_CONFIG.get('host', 'localhost'), - port=REDIS_CONFIG.get('port', 6479), - password=REDIS_CONFIG.get('password'), - decode_responses=True, # Return str instead of bytes - socket_timeout=REDIS_CONFIG.get('socket_timeout', 1), - socket_connect_timeout=REDIS_CONFIG.get('socket_connect_timeout', 1), - retry_on_timeout=REDIS_CONFIG.get('retry_on_timeout', False), - health_check_interval=10, # 避免复用坏连接 - ) - # Test connection - self.redis_client.ping() - expire_days = REDIS_CONFIG.get('translation_cache_expire_days', 360) - self.expire_time = timedelta(days=expire_days) - self.expire_seconds = int(self.expire_time.total_seconds()) # Redis 需要秒数 - self.cache_prefix = REDIS_CONFIG.get('translation_cache_prefix', 'trans') - logger.info("Redis cache initialized for translations") - except Exception as e: - logger.warning(f"Failed to initialize Redis cache: {e}, falling back to no cache") - self.redis_client = None - self.cache = None - else: - self.redis_client = None - self.cache = None - - # Thread pool for async translation - self.executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="translator") - - def 
translate( - self, - text: str, - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Optional[str]: - """ - Translate text to target language (synchronous mode). - - Args: - text: Text to translate - target_lang: Target language code ('zh', 'en', 'ru', etc.) - source_lang: Source language code (option al, auto-detect if None) - context: Additional context for translation (overrides default context) - prompt: Translation prompt/instruction (optional, for better translation quality) - - Returns: - Translated text or None if translation fails - """ - if not text or not text.strip(): - return text - - # Normalize language codes - target_lang = target_lang.lower() - if source_lang: - source_lang = source_lang.lower() - - # Optimization: Skip translation if not needed - if target_lang == 'en' and self._is_english_text(text): - logger.info(f"[Translator] Text is already English, skipping translation: '{text[:50]}...'") - return text - - if target_lang == 'zh' and (self._contains_chinese(text) or self._is_pure_number(text)): - logger.info( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Result: Skip translation (contains Chinese or pure number)" - ) - return text - - # Use provided context or default context - translation_context = context or self.translation_context - - # Build cache key (include prompt in cache key if provided) - cache_key_parts = [source_lang or 'auto', target_lang, translation_context] - if prompt: - cache_key_parts.append(prompt) - cache_key_parts.append(text) - cache_key = ':'.join(cache_key_parts) - - # Check cache (include context and prompt in cache key for accuracy) - if self.use_cache and self.redis_client: - cached = self._get_cached_translation_redis(text, target_lang, source_lang, translation_context, prompt) - if cached: - logger.info( - f"[Translator] Translation 
request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Result: '{cached}' | Source: Cache hit" - ) - return cached - - # If no API key, return mock translation (for testing) - if not self.api_key: - logger.info( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Result: '{text}' | Source: Mock mode (no API key)" - ) - return text - - # Translate using selected model - logger.info( - f"[Translator] Translation request | Model: {self.model} | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Context: {translation_context} | " - f"Prompt: {'yes' if prompt else 'no'} | Status: Starting translation" - ) - - if self.model == 'qwen': - result = self._translate_qwen(text, target_lang, source_lang, translation_context, prompt) - else: # deepl - result = self._translate_deepl(text, target_lang, source_lang, translation_context, prompt) - - # Surface translation failure to the caller instead of silently - # masquerading the source text as a successful translation. - if result is None: - logger.warning( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Status: Translation failed" - ) - else: - logger.info( - f"[Translator] Translation request | Original text: '{text}' | Target language: {target_lang} | " - f"Source language: {source_lang or 'auto'} | Result: '{result}' | Status: Translation successful" - ) - - # Cache only successful translations. Failed attempts must not poison - # Redis with the original text. 
- if result is not None and self.use_cache and self.redis_client: - self._set_cached_translation_redis(text, target_lang, result, source_lang, translation_context, prompt) - - return result - - def _translate_qwen( - self, - text: str, - target_lang: str, - source_lang: Optional[str], - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Optional[str]: - """ - Translate using Qwen MT Flash model via Alibaba Cloud DashScope API. - - Args: - text: Text to translate - target_lang: Target language code ('zh', 'en', 'ru', etc.) - source_lang: Source language code (optional, 'auto' if None) - context: Context hint for translation (optional) - prompt: Translation prompt/instruction (optional) - - Returns: - Translated text or None if translation fails - """ - if not self.qwen_client: - logger.error("[Translator] Qwen client not initialized. Check DASHSCOPE_API_KEY.") - return None - - # Qwen (qwen-mt-plus/flash/turbo) supported languages mapping - # 标准来自:你提供的“语言 / 英文名 / 代码”表 - qwen_lang_map = { - "en": "English", - "zh": "Chinese", - "zh_tw": "Traditional Chinese", - "ru": "Russian", - "ja": "Japanese", - "ko": "Korean", - "es": "Spanish", - "fr": "French", - "pt": "Portuguese", - "de": "German", - "it": "Italian", - "th": "Thai", - "vi": "Vietnamese", - "id": "Indonesian", - "ms": "Malay", - "ar": "Arabic", - "hi": "Hindi", - "he": "Hebrew", - "my": "Burmese", - "ta": "Tamil", - "ur": "Urdu", - "bn": "Bengali", - "pl": "Polish", - "nl": "Dutch", - "ro": "Romanian", - "tr": "Turkish", - "km": "Khmer", - "lo": "Lao", - "yue": "Cantonese", - "cs": "Czech", - "el": "Greek", - "sv": "Swedish", - "hu": "Hungarian", - "da": "Danish", - "fi": "Finnish", - "uk": "Ukrainian", - "bg": "Bulgarian", - } - - # Convert target language - target_lang_normalized = target_lang.lower() - target_lang_qwen = qwen_lang_map.get(target_lang_normalized, target_lang.capitalize()) - - # Convert source language - source_lang_normalized = (source_lang or "").strip().lower() - if not 
source_lang_normalized or source_lang_normalized == "auto": - source_lang_qwen = "auto" - else: - source_lang_qwen = qwen_lang_map.get(source_lang_normalized, source_lang.capitalize()) - - # Prepare translation options - translation_options = { - "source_lang": source_lang_qwen, - "target_lang": target_lang_qwen, - } - - # Prepare messages - messages = [ - { - "role": "user", - "content": text - } - ] - - start_time = time.time() - try: - completion = self.qwen_client.chat.completions.create( - model=self.QWEN_MODEL, - messages=messages, - extra_body={ - "translation_options": translation_options - } - ) - - translated_text = completion.choices[0].message.content.strip() - duration_ms = (time.time() - start_time) * 1000 - - logger.info( - f"[Translator] Qwen API response success | Original text: '{text}' | Target language: {target_lang_qwen} | " - f"Translation result: '{translated_text}' | Duration: {duration_ms:.2f} ms" - ) - return translated_text - - except Exception as e: - duration_ms = (time.time() - start_time) * 1000 - logger.error( - f"[Translator] Qwen API request exception | Original text: '{text}' | Target language: {target_lang_qwen} | " - f"Duration: {duration_ms:.2f} ms | Error: {e}", exc_info=True - ) - return None - - def _translate_deepl( - self, - text: str, - target_lang: str, - source_lang: Optional[str], - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Optional[str]: - """ - Translate using DeepL API with context and glossary support. 
- - Args: - text: Text to translate - target_lang: Target language code - source_lang: Source language code (optional) - context: Context hint for translation (e.g., "e-commerce product search") - """ - # Map to DeepL language codes - target_code = self.LANG_CODE_MAP.get(target_lang, target_lang.upper()) - - headers = { - "Authorization": f"DeepL-Auth-Key {self.api_key}", - "Content-Type": "application/json", - } - - # Use prompt as context parameter for DeepL API (not as text prefix) - # According to DeepL API: context is "Additional context that can influence a translation but is not translated itself" - # If prompt is provided, use it as context; otherwise use the default context - api_context = prompt if prompt else context - - # For e-commerce, add context words to help DeepL understand the domain - # This is especially important for single-word ambiguous terms like "车" (car vs rook) - text_to_translate, needs_extraction = self._add_ecommerce_context(text, source_lang, api_context) - - payload = { - "text": [text_to_translate], - "target_lang": target_code, - } - - if source_lang: - source_code = self.LANG_CODE_MAP.get(source_lang, source_lang.upper()) - payload["source_lang"] = source_code - - # Add context parameter (prompt or default context) - # Context influences translation but is not translated itself - if api_context: - payload["context"] = api_context - - # Add glossary if configured - if self.glossary_id: - payload["glossary_id"] = self.glossary_id - - # Note: DeepL API v2 supports "context" parameter for additional context - # that influences translation but is not translated itself. - # We use prompt as context parameter when provided. 
- - try: - response = requests.post( - self.DEEPL_API_URL, - headers=headers, - json=payload, - timeout=self.timeout - ) - - if response.status_code == 200: - data = response.json() - if "translations" in data and len(data["translations"]) > 0: - translated_text = data["translations"][0]["text"] - # If we added context, extract just the term from the result - if needs_extraction: - translated_text = self._extract_term_from_translation( - translated_text, text, target_code - ) - logger.debug( - f"[Translator] DeepL API response success | Original text: '{text}' | Target language: {target_code} | " - f"Translation result: '{translated_text}'" - ) - return translated_text - else: - logger.error( - f"[Translator] DeepL API error | Original text: '{text}' | Target language: {target_code} | " - f"Status code: {response.status_code} | Error message: {response.text}" - ) - return None - - except requests.Timeout: - logger.warning( - f"[Translator] DeepL API request timeout | Original text: '{text}' | Target language: {target_code} | " - f"Timeout: {self.timeout}s" - ) - return None - except Exception as e: - logger.error( - f"[Translator] DeepL API request exception | Original text: '{text}' | Target language: {target_code} | " - f"Error: {e}", exc_info=True - ) - return None - - # NOTE: _translate_deepl_free is intentionally not implemented. - # We do not support automatic fallback to the free endpoint, to avoid - # mixing Pro keys with https://api-free.deepl.com and related 403 errors. - - def translate_multi( - self, - text: str, - target_langs: List[str], - source_lang: Optional[str] = None, - context: Optional[str] = None, - async_mode: bool = True, - prompt: Optional[str] = None - ) -> Dict[str, Optional[str]]: - """ - Translate text to multiple target languages. 
- - In async_mode=True (default): - - Returns cached translations immediately if available - - For translations that can be optimized (e.g., pure numbers, already in target language), - returns result immediately via synchronous call - - Launches async tasks for other missing translations (non-blocking) - - Returns None for missing translations that require async processing - - In async_mode=False: - - Waits for all translations to complete (blocking) - - Args: - text: Text to translate - target_langs: List of target language codes - source_lang: Source language code (optional) - context: Context hint for translation (optional) - async_mode: If True, return cached results immediately and translate missing ones async - prompt: Translation prompt/instruction (optional) - - Returns: - Dictionary mapping language code to translated text (only cached results in async mode) - """ - results = {} - missing_langs = [] - async_langs = [] - - # First, get cached translations - for lang in target_langs: - cached = self._get_cached_translation(text, lang, source_lang, context, prompt) - if cached is not None: - results[lang] = cached - else: - missing_langs.append(lang) - - # If async mode and there are missing translations - if async_mode and missing_langs: - # Check if translation can be optimized (immediate return) - for lang in missing_langs: - target_lang = lang.lower() - # Check optimization conditions (same as in translate method) - can_optimize = False - if target_lang == 'en' and self._is_english_text(text): - can_optimize = True - elif target_lang == 'zh' and (self._contains_chinese(text) or self._is_pure_number(text)): - can_optimize = True - - if can_optimize: - # Can be optimized, call translate synchronously for immediate result - results[lang] = self.translate(text, lang, source_lang, context, prompt) - else: - # Requires actual translation, add to async list - async_langs.append(lang) - - # Launch async tasks for translations that require actual API calls - if 
async_langs: - for lang in async_langs: - self._translate_async(text, lang, source_lang, context, prompt) - # Return None for async translations - for lang in async_langs: - results[lang] = None - else: - # Synchronous mode: wait for all translations - for lang in missing_langs: - results[lang] = self.translate(text, lang, source_lang, context, prompt) - - return results - - def translate_multi_async( - self, - text: str, - target_langs: List[str], - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Dict[str, Union[str, Future]]: - """ - Translate text to multiple target languages asynchronously, returning Futures that can be awaited. - - This method returns a dictionary where: - - If translation is cached, the value is the translation string (immediate) - - If translation needs to be done, the value is a Future object that can be awaited - - Args: - text: Text to translate - target_langs: List of target language codes - source_lang: Source language code (optional) - context: Context hint for translation (optional) - prompt: Translation prompt/instruction (optional) - - Returns: - Dictionary mapping language code to either translation string (cached) or Future object - """ - results = {} - missing_langs = [] - - # First, get cached translations - for lang in target_langs: - cached = self._get_cached_translation(text, lang, source_lang, context, prompt) - if cached is not None: - results[lang] = cached - else: - missing_langs.append(lang) - - # For missing translations, submit async tasks and return Futures - for lang in missing_langs: - future = self.executor.submit( - self.translate, - text, - lang, - source_lang, - context, - prompt - ) - results[lang] = future - - return results - - def _get_cached_translation( - self, - text: str, - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Optional[str]: - """Get translation from cache if 
available.""" - if not self.redis_client: - return None - return self._get_cached_translation_redis(text, target_lang, source_lang, context, prompt) - - def _get_cached_translation_redis( - self, - text: str, - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> Optional[str]: - """ - Get translation from Redis cache with sliding expiration. - - 滑动过期机制:每次访问缓存时,重置过期时间为配置的过期时间(默认720天)。 - 这样缓存会在最后一次访问后的720天才过期,而不是写入后的720天。 - 这确保了常用的翻译缓存不会被过早删除。 - """ - if not self.redis_client: - return None - - try: - # Build cache key: prefix:target_lang:text - # For simplicity, we use target_lang and text as key - # Context and prompt are not included in key to maximize cache hits - cache_key = f"{self.cache_prefix}:{target_lang.upper()}:{text}" - value = self.redis_client.get(cache_key) - if value: - # Sliding expiration: reset expiration time on access - # 每次读取缓存时,重置过期时间为配置的过期时间(最后一次访问后的N天才过期) - try: - self.redis_client.expire(cache_key, self.expire_seconds) - except Exception as expire_error: - # 即使 expire 失败,也返回缓存值(不影响功能) - logger.warning( - f"[Translator] Failed to update cache expiration for key {cache_key}: {expire_error}" - ) - - logger.debug( - f"[Translator] Redis cache hit | Original text: '{text}' | Target language: {target_lang} | " - f"Cache key: {cache_key} | Translation result: '{value}' | TTL reset to {self.expire_seconds}s" - ) - return value - logger.debug( - f"[Translator] Redis cache miss | Original text: '{text}' | Target language: {target_lang} | " - f"Cache key: {cache_key}" - ) - return None - except Exception as e: - logger.error(f"[Translator] Redis error during get translation cache | Original text: '{text}' | Target language: {target_lang} | Error: {e}") - return None - - def _set_cached_translation_redis( - self, - text: str, - target_lang: str, - translation: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ) -> None: - 
"""Store translation in Redis cache.""" - if not self.redis_client: - return - - try: - cache_key = f"{self.cache_prefix}:{target_lang.upper()}:{text}" - self.redis_client.setex(cache_key, self.expire_seconds, translation) - logger.info( - f"[Translator] Redis cache write | Original text: '{text}' | Target language: {target_lang} | " - f"Cache key: {cache_key} | Translation result: '{translation}'" - ) - except Exception as e: - logger.error( - f"[Translator] Redis cache write failed | Original text: '{text}' | Target language: {target_lang} | " - f"Error: {e}" - ) - - def _translate_async( - self, - text: str, - target_lang: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None - ): - """Launch async translation task.""" - def _do_translate(): - try: - result = self.translate(text, target_lang, source_lang, context, prompt) - if result: - logger.debug(f"Async translation completed: {text} -> {target_lang}: {result}") - except Exception as e: - logger.warning(f"Async translation failed: {text} -> {target_lang}: {e}") - - self.executor.submit(_do_translate) - - def _add_ecommerce_context( - self, - text: str, - source_lang: Optional[str], - context: Optional[str] - ) -> tuple: - """ - Add e-commerce context to text for better disambiguation. - - For single-word ambiguous Chinese terms, we add context words that help - DeepL understand this is an e-commerce/product search context. 
- - Args: - text: Original text to translate - source_lang: Source language code - context: Context hint - - Returns: - Tuple of (text_with_context, needs_extraction) - - text_with_context: Text to send to DeepL - - needs_extraction: Whether we need to extract the term from the result - """ - # Only apply for e-commerce context and Chinese source - if not context or "e-commerce" not in context.lower(): - return text, False - - if not source_lang or source_lang.lower() != 'zh': - return text, False - - # For single-word queries, add context to help disambiguation - text_stripped = text.strip() - if len(text_stripped.split()) == 1 and len(text_stripped) <= 2: - # Common ambiguous Chinese e-commerce terms like "车" (car vs rook) - # We add a context phrase: "购买 [term]" (buy [term]) or "商品 [term]" (product [term]) - # This helps DeepL understand the e-commerce context - # We'll need to extract just the term from the translation result - context_phrase = f"购买 {text_stripped}" - return context_phrase, True - - # For multi-word queries, DeepL usually has enough context - return text, False - - def _extract_term_from_translation( - self, - translated_text: str, - original_text: str, - target_lang_code: str - ) -> str: - """ - Extract the actual term from a translation that included context. - - For example, if we translated "购买 车" (buy car) and got "buy car", - we want to extract just "car". - - Args: - translated_text: Full translation result - original_text: Original single-word query - target_lang_code: Target language code (EN, ZH, etc.) - - Returns: - Extracted term or original translation if extraction fails - """ - # For English target, try to extract the last word (the actual term) - if target_lang_code == "EN": - words = translated_text.strip().split() - if len(words) > 1: - # Usually the last word is the term we want - # But we need to be smart - if it's "buy car", we want "car" - # Common context words to skip: buy, purchase, product, item, etc. 
- context_words = {"buy", "purchase", "product", "item", "commodity", "goods"} - # Try to find the term (not a context word) - for word in reversed(words): - word_lower = word.lower().rstrip('.,!?;:') - if word_lower not in context_words: - return word_lower - # If all words are context words, return the last one - return words[-1].lower().rstrip('.,!?;:') - - # For other languages or if extraction fails, return as-is - # The user can configure a glossary for better results - return translated_text - - def _shop_lang_matches(self, shop_lang_lower: str, lang_code: str) -> bool: - """True if shop language matches index language (use source, no translate).""" - if not shop_lang_lower or not lang_code: - return False - if shop_lang_lower == lang_code: - return True - if lang_code == "zh" and "zh" in shop_lang_lower: - return True - if lang_code == "en" and "en" in shop_lang_lower: - return True - return False - - def translate_for_indexing( - self, - text: str, - shop_language: str, - source_lang: Optional[str] = None, - context: Optional[str] = None, - prompt: Optional[str] = None, - index_languages: Optional[List[str]] = None, - ) -> Dict[str, Optional[str]]: - """ - Translate text for indexing based on shop language and tenant index_languages. - - For each language in index_languages: use source text if shop language matches, - otherwise translate to that language. - - Args: - text: Text to translate - shop_language: Shop primary language (e.g. 'zh', 'en', 'ru') - source_lang: Source language code (optional) - context: Additional context for translation (optional) - prompt: Translation prompt (optional) - index_languages: Languages to index (from tenant_config). Default ["en", "zh"]. - - Returns: - Dict keyed by each index_language with translated or source text (or None). 
- """ - langs = index_languages if index_languages else ["en", "zh"] - results = {lang: None for lang in langs} - if not text or not text.strip(): - return results - if re.match(r'^[\d\s_-]+$', text): - logger.info(f"[Translator] Skip translation for symbol-only query: '{text}'") - return results - - shop_lang_lower = (shop_language or "").strip().lower() - targets = [] - for lang in langs: - if self._shop_lang_matches(shop_lang_lower, lang): - results[lang] = text - else: - targets.append(lang) - - for target_lang in targets: - cached = self._get_cached_translation_redis(text, target_lang, source_lang, context, prompt) - if cached: - results[target_lang] = cached - logger.debug(f"[Translator] Cache hit for indexing: '{text}' -> {target_lang}: {cached}") - continue - translated = self.translate( - text, - target_lang=target_lang, - source_lang=source_lang or shop_language, - context=context, - prompt=prompt, - ) - results[target_lang] = translated - return results - - def get_translation_needs( - self, - detected_lang: str, - supported_langs: List[str] - ) -> List[str]: - """ - Determine which languages need translation. - - Args: - detected_lang: Detected query language - supported_langs: List of supported languages - - Returns: - List of language codes to translate to - """ - # If detected language is in supported list, translate to others - if detected_lang in supported_langs: - return [lang for lang in supported_langs if detected_lang != lang] - - # Otherwise, translate to all supported languages - return supported_langs - - def _is_english_text(self, text: str) -> bool: - """ - Check if text is primarily English (ASCII letters, numbers, common punctuation). 
- - Args: - text: Text to check - - Returns: - True if text appears to be English - """ - if not text or not text.strip(): - return True - - # Remove whitespace and common punctuation - text_clean = re.sub(r'[\s\.,!?;:\-\'\"\(\)\[\]{}]', '', text) - if not text_clean: - return True - - # Check if all remaining characters are ASCII (letters, numbers) - # This is a simple heuristic: if most characters are ASCII, it's likely English - ascii_count = sum(1 for c in text_clean if ord(c) < 128) - ratio = ascii_count / len(text_clean) if text_clean else 0 - - # If more than 80% are ASCII characters, consider it English - return ratio > 0.8 - - def _contains_chinese(self, text: str) -> bool: - """ - Check if text contains Chinese characters (Han characters). - - Args: - text: Text to check - - Returns: - True if text contains Chinese characters - """ - if not text: - return False - - # Check for Chinese characters (Unicode range: \u4e00-\u9fff) - chinese_pattern = re.compile(r'[\u4e00-\u9fff]') - return bool(chinese_pattern.search(text)) - - def _is_pure_number(self, text: str) -> bool: - """ - Check if text is purely numeric (digits, possibly with spaces, dots, commas). 
- - Args: - text: Text to check - - Returns: - True if text is purely numeric - """ - if not text or not text.strip(): - return False - - # Remove whitespace, dots, commas (common number separators) - text_clean = re.sub(r'[\s\.,]', '', text.strip()) - if not text_clean: - return False - - # Check if all remaining characters are digits - return text_clean.isdigit() diff --git a/services.translation.providers.llm b/services.translation.providers.llm new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/services.translation.providers.llm diff --git a/tests/test_embedding_pipeline.py b/tests/test_embedding_pipeline.py index 9fb135f..482a525 100644 --- a/tests/test_embedding_pipeline.py +++ b/tests/test_embedding_pipeline.py @@ -77,12 +77,10 @@ def _build_test_config() -> SearchConfig: enable_text_embedding=True, enable_query_rewrite=False, rewrite_dictionary={}, - translation_prompts={"query_zh": "e-commerce domain", "query_en": "e-commerce domain"}, text_embedding_field="title_embedding", image_embedding_field=None, ), function_score=FunctionScoreConfig(), - function_score=FunctionScoreConfig(), rerank=RerankConfig(), spu_config=SPUConfig(enabled=True, spu_field="spu_id", inner_hits_size=3), es_index_name="test_products", -- libgit2 0.21.2