Commit 1556989b02000fe49ebb73784be31ac1355eaaa3

Authored by tangwang
1 parent fe80e80e

query翻译等待超时逻辑

config/config.yaml
@@ -47,6 +47,11 @@ query_config: @@ -47,6 +47,11 @@ query_config:
47 enable_text_embedding: true 47 enable_text_embedding: true
48 enable_query_rewrite: true 48 enable_query_rewrite: true
49 49
  50 + # 查询解析阶段:翻译与 query 向量并发执行,共用同一等待预算(毫秒)。
  51 + # 检测语言已在租户 index_languages 内:较短;不在索引语言内:较长(翻译对召回更关键)。
  52 + translation_embedding_wait_budget_ms_source_in_index: 80
  53 + translation_embedding_wait_budget_ms_source_not_in_index: 200
  54 +
50 # 动态多语言检索字段配置 55 # 动态多语言检索字段配置
51 # multilingual_fields 会被拼成 title.{lang}/brief.{lang}/... 形式; 56 # multilingual_fields 会被拼成 title.{lang}/brief.{lang}/... 形式;
52 # shared_fields 为无语言后缀字段。 57 # shared_fields 为无语言后缀字段。
@@ -297,6 +297,12 @@ class AppConfigLoader: @@ -297,6 +297,12 @@ class AppConfigLoader:
297 default_translation_model=str( 297 default_translation_model=str(
298 query_cfg.get("default_translation_model") or "nllb-200-distilled-600m" 298 query_cfg.get("default_translation_model") or "nllb-200-distilled-600m"
299 ), 299 ),
  300 + translation_embedding_wait_budget_ms_source_in_index=int(
  301 + query_cfg.get("translation_embedding_wait_budget_ms_source_in_index", 80)
  302 + ),
  303 + translation_embedding_wait_budget_ms_source_not_in_index=int(
  304 + query_cfg.get("translation_embedding_wait_budget_ms_source_not_in_index", 200)
  305 + ),
300 ) 306 )
301 307
302 function_score_cfg = raw.get("function_score") if isinstance(raw.get("function_score"), dict) else {} 308 function_score_cfg = raw.get("function_score") if isinstance(raw.get("function_score"), dict) else {}
@@ -61,6 +61,11 @@ class QueryConfig: @@ -61,6 +61,11 @@ class QueryConfig:
61 zh_to_en_model: str = "opus-mt-zh-en" 61 zh_to_en_model: str = "opus-mt-zh-en"
62 en_to_zh_model: str = "opus-mt-en-zh" 62 en_to_zh_model: str = "opus-mt-en-zh"
63 default_translation_model: str = "nllb-200-distilled-600m" 63 default_translation_model: str = "nllb-200-distilled-600m"
  64 + # 查询阶段:翻译与向量生成并发提交后,共用同一等待预算(毫秒)。
  65 + # 检测语言已在租户 index_languages 内:偏快返回,预算较短。
  66 + # 检测语言不在 index_languages 内:翻译对召回更关键,预算较长。
  67 + translation_embedding_wait_budget_ms_source_in_index: int = 80
  68 + translation_embedding_wait_budget_ms_source_not_in_index: int = 200
64 69
65 70
66 @dataclass(frozen=True) 71 @dataclass(frozen=True)
query/query_parser.py
@@ -8,7 +8,7 @@ from typing import Dict, List, Optional, Any, Union @@ -8,7 +8,7 @@ from typing import Dict, List, Optional, Any, Union
8 import numpy as np 8 import numpy as np
9 import logging 9 import logging
10 import re 10 import re
11 -from concurrent.futures import ThreadPoolExecutor, as_completed, wait 11 +from concurrent.futures import ThreadPoolExecutor, wait
12 12
13 from embeddings.text_encoder import TextEmbeddingEncoder 13 from embeddings.text_encoder import TextEmbeddingEncoder
14 from config import SearchConfig 14 from config import SearchConfig
@@ -139,7 +139,6 @@ class QueryParser: @@ -139,7 +139,6 @@ class QueryParser:
139 cfg.get("default_model"), 139 cfg.get("default_model"),
140 ) 140 )
141 self._translator = create_translation_client() 141 self._translator = create_translation_client()
142 - self._translation_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="query-translation")  
143 142
144 @property 143 @property
145 def text_encoder(self) -> TextEmbeddingEncoder: 144 def text_encoder(self) -> TextEmbeddingEncoder:
@@ -332,11 +331,14 @@ class QueryParser: @@ -332,11 +331,14 @@ class QueryParser:
332 if context: 331 if context:
333 context.store_intermediate_result('detected_language', detected_lang) 332 context.store_intermediate_result('detected_language', detected_lang)
334 333
335 - # Stage 4: Translation (with async support and conditional waiting)  
336 - translations = {}  
337 - translation_futures = {}  
338 - translation_executor = None 334 + # Stage 4: Translation — always submit to thread pool; results are collected together with
  335 + # embedding in one wait() that uses a configurable budget (short vs long by source-in-index).
  336 + translations: Dict[str, str] = {}
  337 + translation_futures: Dict[str, Any] = {}
  338 + translation_executor: Optional[ThreadPoolExecutor] = None
339 index_langs: List[str] = [] 339 index_langs: List[str] = []
  340 + detected_norm = str(detected_lang or "").strip().lower()
  341 +
340 try: 342 try:
341 # 根据租户配置的 index_languages 决定翻译目标语言 343 # 根据租户配置的 index_languages 决定翻译目标语言
342 from config.tenant_config_loader import get_tenant_config_loader 344 from config.tenant_config_loader import get_tenant_config_loader
@@ -352,59 +354,32 @@ class QueryParser: @@ -352,59 +354,32 @@ class QueryParser:
352 seen_langs.add(norm_lang) 354 seen_langs.add(norm_lang)
353 index_langs.append(norm_lang) 355 index_langs.append(norm_lang)
354 356
355 - target_langs_for_translation = [lang for lang in index_langs if lang != detected_lang] 357 + target_langs_for_translation = [lang for lang in index_langs if lang != detected_norm]
356 358
357 if target_langs_for_translation: 359 if target_langs_for_translation:
358 - target_langs = target_langs_for_translation  
359 -  
360 - if target_langs:  
361 - # Determine if we need to wait for translation results  
362 - # If detected_lang is not in index_languages, we must wait for translation  
363 - need_wait_translation = detected_lang not in index_langs  
364 -  
365 - if need_wait_translation:  
366 - translation_executor = ThreadPoolExecutor(  
367 - max_workers=max(1, min(len(target_langs), 4)),  
368 - thread_name_prefix="query-translation-wait",  
369 - )  
370 - for lang in target_langs:  
371 - model_name = self._pick_query_translation_model(detected_lang, lang, self.config)  
372 - log_debug(  
373 - f"Submitting query translation | source={detected_lang} target={lang} model={model_name}"  
374 - )  
375 - translation_futures[lang] = translation_executor.submit(  
376 - self.translator.translate,  
377 - query_text,  
378 - lang,  
379 - detected_lang,  
380 - "ecommerce_search_query",  
381 - model_name,  
382 - )  
383 - else:  
384 - for lang in target_langs:  
385 - model_name = self._pick_query_translation_model(detected_lang, lang, self.config)  
386 - log_debug(  
387 - f"Submitting query translation | source={detected_lang} target={lang} model={model_name}"  
388 - )  
389 - self._translation_executor.submit(  
390 - self.translator.translate,  
391 - query_text,  
392 - lang,  
393 - detected_lang,  
394 - "ecommerce_search_query",  
395 - model_name,  
396 - ) 360 + translation_executor = ThreadPoolExecutor(
  361 + max_workers=max(1, min(len(target_langs_for_translation), 4)),
  362 + thread_name_prefix="query-translation",
  363 + )
  364 + for lang in target_langs_for_translation:
  365 + model_name = self._pick_query_translation_model(detected_lang, lang, self.config)
  366 + log_debug(
  367 + f"Submitting query translation | source={detected_lang} target={lang} model={model_name}"
  368 + )
  369 + translation_futures[lang] = translation_executor.submit(
  370 + self.translator.translate,
  371 + query_text,
  372 + lang,
  373 + detected_lang,
  374 + "ecommerce_search_query",
  375 + model_name,
  376 + )
397 377
398 - if translations:  
399 - log_info(f"Translation completed (cache hit) | Query text: '{query_text}' | Results: {translations}")  
400 - if translation_futures:  
401 - log_debug(f"Translation in progress, waiting for results... | Query text: '{query_text}' | Languages: {list(translation_futures.keys())}")  
402 -  
403 - if context:  
404 - context.store_intermediate_result('translations', translations)  
405 - for lang, translation in translations.items():  
406 - if translation:  
407 - context.store_intermediate_result(f'translation_{lang}', translation) 378 + if context:
  379 + context.store_intermediate_result('translations', translations)
  380 + for lang, translation in translations.items():
  381 + if translation:
  382 + context.store_intermediate_result(f'translation_{lang}', translation)
408 383
409 except Exception as e: 384 except Exception as e:
410 error_msg = f"Translation failed | Error: {str(e)}" 385 error_msg = f"Translation failed | Error: {str(e)}"
@@ -458,45 +433,66 @@ class QueryParser: @@ -458,45 +433,66 @@ class QueryParser:
458 encoding_executor = None 433 encoding_executor = None
459 embedding_future = None 434 embedding_future = None
460 435
461 - # Wait for all async tasks to complete (translation and embedding) 436 + # Wait for translation + embedding concurrently; shared budget (ms) depends on whether
  437 + # the detected language is in tenant index_languages.
  438 + qc = self.config.query_config
  439 + source_in_index_for_budget = detected_norm in index_langs
  440 + budget_ms = (
  441 + qc.translation_embedding_wait_budget_ms_source_in_index
  442 + if source_in_index_for_budget
  443 + else qc.translation_embedding_wait_budget_ms_source_not_in_index
  444 + )
  445 + budget_sec = max(0.0, float(budget_ms) / 1000.0)
  446 +
  447 + if translation_futures:
  448 + log_info(
  449 + f"Translation+embedding shared wait budget | budget_ms={budget_ms} | "
  450 + f"source_in_index_languages={source_in_index_for_budget} | "
  451 + f"translation_targets={list(translation_futures.keys())}"
  452 + )
  453 +
462 if translation_futures or embedding_future: 454 if translation_futures or embedding_future:
463 - log_debug("Waiting for async tasks to complete...")  
464 -  
465 - # Collect all futures with their identifiers  
466 - all_futures = []  
467 - future_to_lang = {} 455 + log_debug(
  456 + f"Waiting for async tasks (translation+embedding) | budget_ms={budget_ms} | "
  457 + f"source_in_index_languages={source_in_index_for_budget}"
  458 + )
  459 +
  460 + all_futures: List[Any] = []
  461 + future_to_lang: Dict[Any, tuple] = {}
468 for lang, future in translation_futures.items(): 462 for lang, future in translation_futures.items():
469 all_futures.append(future) 463 all_futures.append(future)
470 - future_to_lang[future] = ('translation', lang)  
471 - 464 + future_to_lang[future] = ("translation", lang)
  465 +
472 if embedding_future: 466 if embedding_future:
473 all_futures.append(embedding_future) 467 all_futures.append(embedding_future)
474 - future_to_lang[embedding_future] = ('embedding', None)  
475 -  
476 - # Enforce a hard timeout for translation-related work (300ms budget)  
477 - done, not_done = wait(all_futures, timeout=0.3) 468 + future_to_lang[embedding_future] = ("embedding", None)
  469 +
  470 + done, not_done = wait(all_futures, timeout=budget_sec)
478 for future in done: 471 for future in done:
479 task_type, lang = future_to_lang[future] 472 task_type, lang = future_to_lang[future]
480 try: 473 try:
481 result = future.result() 474 result = future.result()
482 - if task_type == 'translation': 475 + if task_type == "translation":
483 if result: 476 if result:
484 translations[lang] = result 477 translations[lang] = result
485 log_info( 478 log_info(
486 - f"Translation completed | Query text: '{query_text}' | Target language: {lang} | Translation result: '{result}'" 479 + f"Translation completed | Query text: '{query_text}' | "
  480 + f"Target language: {lang} | Translation result: '{result}'"
487 ) 481 )
488 if context: 482 if context:
489 - context.store_intermediate_result(f'translation_{lang}', result)  
490 - elif task_type == 'embedding': 483 + context.store_intermediate_result(f"translation_{lang}", result)
  484 + elif task_type == "embedding":
491 query_vector = result 485 query_vector = result
492 if query_vector is not None: 486 if query_vector is not None:
493 log_debug(f"Query vector generation completed | Shape: {query_vector.shape}") 487 log_debug(f"Query vector generation completed | Shape: {query_vector.shape}")
494 if context: 488 if context:
495 - context.store_intermediate_result('query_vector_shape', query_vector.shape) 489 + context.store_intermediate_result("query_vector_shape", query_vector.shape)
496 else: 490 else:
497 - log_info("Query vector generation completed but result is None, will process without vector") 491 + log_info(
  492 + "Query vector generation completed but result is None, will process without vector"
  493 + )
498 except Exception as e: 494 except Exception as e:
499 - if task_type == 'translation': 495 + if task_type == "translation":
500 error_msg = f"Translation failed | Language: {lang} | Error: {str(e)}" 496 error_msg = f"Translation failed | Language: {lang} | Error: {str(e)}"
501 else: 497 else:
502 error_msg = f"Query vector generation failed | Error: {str(e)}" 498 error_msg = f"Query vector generation failed | Error: {str(e)}"
@@ -504,30 +500,29 @@ class QueryParser: @@ -504,30 +500,29 @@ class QueryParser:
504 if context: 500 if context:
505 context.add_warning(error_msg) 501 context.add_warning(error_msg)
506 502
507 - # Log timeouts for any futures that did not finish within 300ms  
508 if not_done: 503 if not_done:
509 for future in not_done: 504 for future in not_done:
510 task_type, lang = future_to_lang[future] 505 task_type, lang = future_to_lang[future]
511 - if task_type == 'translation': 506 + if task_type == "translation":
512 timeout_msg = ( 507 timeout_msg = (
513 - f"Translation timeout (>300ms) | Language: {lang} | " 508 + f"Translation timeout (>{budget_ms}ms) | Language: {lang} | "
514 f"Query text: '{query_text}'" 509 f"Query text: '{query_text}'"
515 ) 510 )
516 else: 511 else:
517 - timeout_msg = "Query vector generation timeout (>300ms), proceeding without embedding result" 512 + timeout_msg = (
  513 + f"Query vector generation timeout (>{budget_ms}ms), proceeding without embedding result"
  514 + )
518 log_info(timeout_msg) 515 log_info(timeout_msg)
519 if context: 516 if context:
520 context.add_warning(timeout_msg) 517 context.add_warning(timeout_msg)
521 518
522 - # Clean up encoding executor  
523 if encoding_executor: 519 if encoding_executor:
524 encoding_executor.shutdown(wait=False) 520 encoding_executor.shutdown(wait=False)
525 if translation_executor: 521 if translation_executor:
526 translation_executor.shutdown(wait=False) 522 translation_executor.shutdown(wait=False)
527 -  
528 - # Update translations in context after all are complete 523 +
529 if translations and context: 524 if translations and context:
530 - context.store_intermediate_result('translations', translations) 525 + context.store_intermediate_result("translations", translations)
531 526
532 # Build language-scoped query plan: source language + available translations 527 # Build language-scoped query plan: source language + available translations
533 query_text_by_lang: Dict[str, str] = {} 528 query_text_by_lang: Dict[str, str] = {}
@@ -547,7 +542,7 @@ class QueryParser: @@ -547,7 +542,7 @@ class QueryParser:
547 # Use the original mixed-script query as a robust fallback probe for that language field set. 542 # Use the original mixed-script query as a robust fallback probe for that language field set.
548 query_text_by_lang[lang] = query_text 543 query_text_by_lang[lang] = query_text
549 544
550 - source_in_index_languages = detected_lang in index_langs 545 + source_in_index_languages = detected_norm in index_langs
551 ordered_search_langs: List[str] = [] 546 ordered_search_langs: List[str] = []
552 seen_order = set() 547 seen_order = set()
553 if detected_lang in query_text_by_lang: 548 if detected_lang in query_text_by_lang:
tests/test_query_parser_mixed_language.py
@@ -39,7 +39,8 @@ def test_parse_adds_en_fields_for_mixed_chinese_query_with_meaningful_english(mo @@ -39,7 +39,8 @@ def test_parse_adds_en_fields_for_mixed_chinese_query_with_meaningful_english(mo
39 39
40 assert result.detected_language == "zh" 40 assert result.detected_language == "zh"
41 assert "en" in result.search_langs 41 assert "en" in result.search_langs
42 - assert result.query_text_by_lang["en"] == "法式 dress 连衣裙" 42 + # 翻译在预算内完成时会写入目标语言字段(优于仅用原文做 supplemental 探测)
  43 + assert result.query_text_by_lang["en"] == "法式 dress 连衣裙-en"
43 assert result.query_text_by_lang["zh"] == "法式 dress 连衣裙" 44 assert result.query_text_by_lang["zh"] == "法式 dress 连衣裙"
44 45
45 46
@@ -56,5 +57,23 @@ def test_parse_adds_zh_fields_for_english_query_when_cjk_present(monkeypatch): @@ -56,5 +57,23 @@ def test_parse_adds_zh_fields_for_english_query_when_cjk_present(monkeypatch):
56 57
57 assert result.detected_language == "en" 58 assert result.detected_language == "en"
58 assert "zh" in result.search_langs 59 assert "zh" in result.search_langs
59 - assert result.query_text_by_lang["zh"] == "red 连衣裙" 60 + assert result.query_text_by_lang["zh"] == "red 连衣裙-zh"
60 assert result.query_text_by_lang["en"] == "red 连衣裙" 61 assert result.query_text_by_lang["en"] == "red 连衣裙"
  62 +
  63 +
  64 +def test_parse_waits_for_translation_when_source_in_index_languages(monkeypatch):
  65 + """en 在 index_languages 内时仍应等待并采纳 en->zh 翻译结果(与向量共用预算)。"""
  66 + parser = QueryParser(_build_config(), translator=_DummyTranslator())
  67 + monkeypatch.setattr(parser.language_detector, "detect", lambda text: "en")
  68 + monkeypatch.setattr(
  69 + "query.query_parser.get_tenant_config_loader",
  70 + lambda: SimpleNamespace(get_tenant_config=lambda tenant_id: {"index_languages": ["en", "zh"]}),
  71 + raising=False,
  72 + )
  73 +
  74 + result = parser.parse("off shoulder top", tenant_id="0", generate_vector=False)
  75 +
  76 + assert result.detected_language == "en"
  77 + assert result.translations.get("zh") == "off shoulder top-zh"
  78 + assert result.query_text_by_lang.get("zh") == "off shoulder top-zh"
  79 + assert result.source_in_index_languages is True