From 1556989b02000fe49ebb73784be31ac1355eaaa3 Mon Sep 17 00:00:00 2001 From: tangwang Date: Fri, 20 Mar 2026 14:29:57 +0800 Subject: [PATCH] query翻译等待超时逻辑 --- config/config.yaml | 5 +++++ config/loader.py | 6 ++++++ config/schema.py | 5 +++++ query/query_parser.py | 163 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------ tests/test_query_parser_mixed_language.py | 23 +++++++++++++++++++++-- 5 files changed, 116 insertions(+), 86 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 70ee0d6..abf5cb9 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -47,6 +47,11 @@ query_config: enable_text_embedding: true enable_query_rewrite: true + # 查询解析阶段:翻译与 query 向量并发执行,共用同一等待预算(毫秒)。 + # 检测语言已在租户 index_languages 内:较短;不在索引语言内:较长(翻译对召回更关键)。 + translation_embedding_wait_budget_ms_source_in_index: 80 + translation_embedding_wait_budget_ms_source_not_in_index: 200 + # 动态多语言检索字段配置 # multilingual_fields 会被拼成 title.{lang}/brief.{lang}/... 形式; # shared_fields 为无语言后缀字段。 diff --git a/config/loader.py b/config/loader.py index d9ef119..c158505 100644 --- a/config/loader.py +++ b/config/loader.py @@ -297,6 +297,12 @@ class AppConfigLoader: default_translation_model=str( query_cfg.get("default_translation_model") or "nllb-200-distilled-600m" ), + translation_embedding_wait_budget_ms_source_in_index=int( + query_cfg.get("translation_embedding_wait_budget_ms_source_in_index", 80) + ), + translation_embedding_wait_budget_ms_source_not_in_index=int( + query_cfg.get("translation_embedding_wait_budget_ms_source_not_in_index", 200) + ), ) function_score_cfg = raw.get("function_score") if isinstance(raw.get("function_score"), dict) else {} diff --git a/config/schema.py b/config/schema.py index 6f9776a..8a03472 100644 --- a/config/schema.py +++ b/config/schema.py @@ -61,6 +61,11 @@ class QueryConfig: zh_to_en_model: str = "opus-mt-zh-en" en_to_zh_model: str = "opus-mt-en-zh" default_translation_model: str = "nllb-200-distilled-600m" + # 查询阶段:翻译与向量生成并发提交后,共用同一等待预算(毫秒)。 + # 检测语言已在租户 index_languages 内:偏快返回,预算较短。 + # 检测语言不在 index_languages 内:翻译对召回更关键,预算较长。 + translation_embedding_wait_budget_ms_source_in_index: int = 80 + translation_embedding_wait_budget_ms_source_not_in_index: int = 200 @dataclass(frozen=True) diff --git a/query/query_parser.py b/query/query_parser.py index b86f60c..0dc829b 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -8,7 +8,7 @@ from typing import Dict, List, Optional, Any, Union import numpy as np import logging import re -from concurrent.futures import ThreadPoolExecutor, as_completed, wait +from concurrent.futures import ThreadPoolExecutor, wait from embeddings.text_encoder import TextEmbeddingEncoder from config import SearchConfig @@ -139,7 +139,6 @@ class QueryParser: cfg.get("default_model"), ) self._translator = create_translation_client() - self._translation_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="query-translation") @property def text_encoder(self) -> TextEmbeddingEncoder: @@ -332,11 +331,14 @@ class QueryParser: if context: context.store_intermediate_result('detected_language', detected_lang) - # Stage 4: Translation (with async support and conditional waiting) - translations = {} - translation_futures = {} - translation_executor = None + # Stage 4: Translation — always submit to thread pool; results are collected together with + # embedding in one wait() that uses a configurable budget (short vs long by source-in-index). + translations: Dict[str, str] = {} + translation_futures: Dict[str, Any] = {} + translation_executor: Optional[ThreadPoolExecutor] = None index_langs: List[str] = [] + detected_norm = str(detected_lang or "").strip().lower() + try: # 根据租户配置的 index_languages 决定翻译目标语言 from config.tenant_config_loader import get_tenant_config_loader @@ -352,59 +354,32 @@ class QueryParser: seen_langs.add(norm_lang) index_langs.append(norm_lang) - target_langs_for_translation = [lang for lang in index_langs if lang != detected_lang] + target_langs_for_translation = [lang for lang in index_langs if lang != detected_norm] if target_langs_for_translation: - target_langs = target_langs_for_translation - - if target_langs: - # Determine if we need to wait for translation results - # If detected_lang is not in index_languages, we must wait for translation - need_wait_translation = detected_lang not in index_langs - - if need_wait_translation: - translation_executor = ThreadPoolExecutor( - max_workers=max(1, min(len(target_langs), 4)), - thread_name_prefix="query-translation-wait", - ) - for lang in target_langs: - model_name = self._pick_query_translation_model(detected_lang, lang, self.config) - log_debug( - f"Submitting query translation | source={detected_lang} target={lang} model={model_name}" - ) - translation_futures[lang] = translation_executor.submit( - self.translator.translate, - query_text, - lang, - detected_lang, - "ecommerce_search_query", - model_name, - ) - else: - for lang in target_langs: - model_name = self._pick_query_translation_model(detected_lang, lang, self.config) - log_debug( - f"Submitting query translation | source={detected_lang} target={lang} model={model_name}" - ) - self._translation_executor.submit( - self.translator.translate, - query_text, - lang, - detected_lang, - "ecommerce_search_query", - model_name, - ) + translation_executor = ThreadPoolExecutor( + max_workers=max(1, min(len(target_langs_for_translation), 4)), + thread_name_prefix="query-translation", + ) + for lang in target_langs_for_translation: + model_name = self._pick_query_translation_model(detected_lang, lang, self.config) + log_debug( + f"Submitting query translation | source={detected_lang} target={lang} model={model_name}" + ) + translation_futures[lang] = translation_executor.submit( + self.translator.translate, + query_text, + lang, + detected_lang, + "ecommerce_search_query", + model_name, + ) - if translations: - log_info(f"Translation completed (cache hit) | Query text: '{query_text}' | Results: {translations}") - if translation_futures: - log_debug(f"Translation in progress, waiting for results... | Query text: '{query_text}' | Languages: {list(translation_futures.keys())}") - - if context: - context.store_intermediate_result('translations', translations) - for lang, translation in translations.items(): - if translation: - context.store_intermediate_result(f'translation_{lang}', translation) + if context: + context.store_intermediate_result('translations', translations) + for lang, translation in translations.items(): + if translation: + context.store_intermediate_result(f'translation_{lang}', translation) except Exception as e: error_msg = f"Translation failed | Error: {str(e)}" @@ -458,45 +433,66 @@ class QueryParser: encoding_executor = None embedding_future = None - # Wait for all async tasks to complete (translation and embedding) + # Wait for translation + embedding concurrently; shared budget (ms) depends on whether + # the detected language is in tenant index_languages. + qc = self.config.query_config + source_in_index_for_budget = detected_norm in index_langs + budget_ms = ( + qc.translation_embedding_wait_budget_ms_source_in_index + if source_in_index_for_budget + else qc.translation_embedding_wait_budget_ms_source_not_in_index + ) + budget_sec = max(0.0, float(budget_ms) / 1000.0) + + if translation_futures: + log_info( + f"Translation+embedding shared wait budget | budget_ms={budget_ms} | " + f"source_in_index_languages={source_in_index_for_budget} | " + f"translation_targets={list(translation_futures.keys())}" + ) + if translation_futures or embedding_future: - log_debug("Waiting for async tasks to complete...") - - # Collect all futures with their identifiers - all_futures = [] - future_to_lang = {} + log_debug( + f"Waiting for async tasks (translation+embedding) | budget_ms={budget_ms} | " + f"source_in_index_languages={source_in_index_for_budget}" + ) + + all_futures: List[Any] = [] + future_to_lang: Dict[Any, tuple] = {} for lang, future in translation_futures.items(): all_futures.append(future) - future_to_lang[future] = ('translation', lang) - + future_to_lang[future] = ("translation", lang) + if embedding_future: all_futures.append(embedding_future) - future_to_lang[embedding_future] = ('embedding', None) - - # Enforce a hard timeout for translation-related work (300ms budget) - done, not_done = wait(all_futures, timeout=0.3) + future_to_lang[embedding_future] = ("embedding", None) + + done, not_done = wait(all_futures, timeout=budget_sec) for future in done: task_type, lang = future_to_lang[future] try: result = future.result() - if task_type == 'translation': + if task_type == "translation": if result: translations[lang] = result log_info( - f"Translation completed | Query text: '{query_text}' | Target language: {lang} | Translation result: '{result}'" + f"Translation completed | Query text: '{query_text}' | " + f"Target language: {lang} | Translation result: '{result}'" ) if context: - context.store_intermediate_result(f'translation_{lang}', result) - elif task_type == 'embedding': + context.store_intermediate_result(f"translation_{lang}", result) + elif task_type == "embedding": query_vector = result if query_vector is not None: log_debug(f"Query vector generation completed | Shape: {query_vector.shape}") if context: - context.store_intermediate_result('query_vector_shape', query_vector.shape) + context.store_intermediate_result("query_vector_shape", query_vector.shape) else: - log_info("Query vector generation completed but result is None, will process without vector") + log_info( + "Query vector generation completed but result is None, will process without vector" + ) except Exception as e: - if task_type == 'translation': + if task_type == "translation": error_msg = f"Translation failed | Language: {lang} | Error: {str(e)}" else: error_msg = f"Query vector generation failed | Error: {str(e)}" @@ -504,30 +500,29 @@ class QueryParser: if context: context.add_warning(error_msg) - # Log timeouts for any futures that did not finish within 300ms if not_done: for future in not_done: task_type, lang = future_to_lang[future] - if task_type == 'translation': + if task_type == "translation": timeout_msg = ( - f"Translation timeout (>300ms) | Language: {lang} | " + f"Translation timeout (>{budget_ms}ms) | Language: {lang} | " f"Query text: '{query_text}'" ) else: - timeout_msg = "Query vector generation timeout (>300ms), proceeding without embedding result" + timeout_msg = ( + f"Query vector generation timeout (>{budget_ms}ms), proceeding without embedding result" + ) log_info(timeout_msg) if context: context.add_warning(timeout_msg) - # Clean up encoding executor if encoding_executor: encoding_executor.shutdown(wait=False) if translation_executor: translation_executor.shutdown(wait=False) - - # Update translations in context after all are complete + if translations and context: - context.store_intermediate_result('translations', translations) + context.store_intermediate_result("translations", translations) # Build language-scoped query plan: source language + available translations query_text_by_lang: Dict[str, str] = {} @@ -547,7 +542,7 @@ class QueryParser: # Use the original mixed-script query as a robust fallback probe for that language field set. query_text_by_lang[lang] = query_text - source_in_index_languages = detected_lang in index_langs + source_in_index_languages = detected_norm in index_langs ordered_search_langs: List[str] = [] seen_order = set() if detected_lang in query_text_by_lang: diff --git a/tests/test_query_parser_mixed_language.py b/tests/test_query_parser_mixed_language.py index a8b6ab0..5cf1fa5 100644 --- a/tests/test_query_parser_mixed_language.py +++ b/tests/test_query_parser_mixed_language.py @@ -39,7 +39,8 @@ def test_parse_adds_en_fields_for_mixed_chinese_query_with_meaningful_english(mo assert result.detected_language == "zh" assert "en" in result.search_langs - assert result.query_text_by_lang["en"] == "法式 dress 连衣裙" + # 翻译在预算内完成时会写入目标语言字段(优于仅用原文做 supplemental 探测) + assert result.query_text_by_lang["en"] == "法式 dress 连衣裙-en" assert result.query_text_by_lang["zh"] == "法式 dress 连衣裙" @@ -56,5 +57,23 @@ def test_parse_adds_zh_fields_for_english_query_when_cjk_present(monkeypatch): assert result.detected_language == "en" assert "zh" in result.search_langs - assert result.query_text_by_lang["zh"] == "red 连衣裙" + assert result.query_text_by_lang["zh"] == "red 连衣裙-zh" assert result.query_text_by_lang["en"] == "red 连衣裙" + + +def test_parse_waits_for_translation_when_source_in_index_languages(monkeypatch): + """en 在 index_languages 内时仍应等待并采纳 en->zh 翻译结果(与向量共用预算)。""" + parser = QueryParser(_build_config(), translator=_DummyTranslator()) + monkeypatch.setattr(parser.language_detector, "detect", lambda text: "en") + monkeypatch.setattr( + "query.query_parser.get_tenant_config_loader", + lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en", "zh"]}), + raising=False, + ) + + result = parser.parse("off shoulder top", tenant_id="0", generate_vector=False) + + assert result.detected_language == "en" + assert result.translations.get("zh") == "off shoulder top-zh" + assert result.query_text_by_lang.get("zh") == "off shoulder top-zh" + assert result.source_in_index_languages is True -- libgit2 0.21.2