Commit 3ec5bfe65ba260743d37ccff3d9e7057d4b5f0e9

Authored by tangwang
1 parent 453992a8

1. get_translation_needs这种函数可以去除。逻辑直接写在parse函数中。

2. translate_multi 还需要提供一种调用方法,异步的,但是 可以等待结果的。
3. 如果detected_lang不是en也不是zh,这时候, 我们调用translate_multi  是需要等待结果返回的(因为是zh 或者 en 都有一个索引字段可以查,因此 本次可以不用结果,直接去走搜索,但是如果两者都不是,只能等待翻译结果。)
4. parse函数 这里可能发起一个异步的调用,下面的encode也要做成异步的,这样 encode和翻译两个异步任务的时间可以重叠,需要等待所有结果都返回。

更改
1. 去除 get_translation_needs 函数,逻辑内联到 parse 函数
在 parse 函数中(第230-234行)直接实现了 get_translation_needs 的逻辑
2. 添加 translate_multi_async 方法,支持异步等待结果
在 translator.py 中添加了 translate_multi_async 方法(第412-459行)
该方法返回字典,值为翻译字符串(缓存命中)或 Future 对象(需要等待)
3. 根据 detected_lang 决定是否需要等待翻译结果
如果 detected_lang 不是 'en' 也不是 'zh',使用 translate_multi_async 并等待结果(第245-261行)
如果是 'en' 或 'zh',使用 translate_multi 的异步模式,不等待结果(第262-273行)
4. 将 encode 和翻译改为异步并行执行
encode 使用 ThreadPoolExecutor 异步执行(第315-330行)
翻译和编码任务并行执行,使用 as_completed 等待所有结果(第332-375行)
Showing 3 changed files with 159 additions and 30 deletions   Show diff stats
prompts.md 0 → 100644
... ... @@ -0,0 +1,7 @@
  1 +
  2 +@query_parser.py (154-332)
  3 +1. get_translation_needs这种函数可以去除。逻辑直接写在parse函数中。
  4 +2. translate_multi 还需要提供一种调用方法,异步的,但是 可以等待结果的。
  5 +3. 如果detected_lang不是en也不是zh,这时候, 我们调用translate_multi 是需要等待结果返回的(因为是zh 或者 en 都有一个索引字段可以查,因此 本次可以不用结果,直接去走搜索,但是如果两者都不是,只能等待翻译结果。)
  6 +4. parse函数 这里可能发起一个异步的调用,下面的encode也要做成异步的,这样 encode和翻译两个异步任务的时间可以重叠,需要等待所有结果都返回。
  7 +
... ...
query/query_parser.py
... ... @@ -4,11 +4,12 @@ Query parser - main module for query processing.
4 4 Handles query rewriting, translation, and embedding generation.
5 5 """
6 6  
7   -from typing import Dict, List, Optional, Any
  7 +from typing import Dict, List, Optional, Any, Union
8 8 import numpy as np
9 9 import logging
10 10 import re
11 11 import hanlp
  12 +from concurrent.futures import Future, ThreadPoolExecutor, as_completed
12 13  
13 14 from embeddings import BgeEncoder
14 15 from config import SearchConfig
... ... @@ -217,42 +218,60 @@ class QueryParser:
217 218 if context:
218 219 context.store_intermediate_result('detected_language', detected_lang)
219 220  
220   - # Stage 4: Translation (async mode - only returns cached results, missing ones translated in background)
  221 + # Stage 4: Translation (with async support and conditional waiting)
221 222 translations = {}
  223 + translation_futures = {}
222 224 if self.config.query_config.enable_translation:
223 225 try:
224 226 # Determine target languages for translation
225 227 # Simplified: always translate to Chinese and English
226 228 target_langs_for_translation = ['zh', 'en']
227   -
228   - target_langs = self.translator.get_translation_needs(
229   - detected_lang,
230   - target_langs_for_translation
231   - )
  229 +
  230 + target_langs = [lang for lang in target_langs_for_translation if detected_lang != lang]
232 231  
233 232 if target_langs:
234 233 # Use e-commerce context for better disambiguation
235 234 translation_context = self.config.query_config.translation_context
236 235 # For query translation, we use a general prompt (not language-specific)
237   - # Since translate_multi uses same prompt for all languages, we use default
238 236 query_prompt = self.config.query_config.translation_prompts.get('query_zh') or \
239 237 self.config.query_config.translation_prompts.get('default_zh')
240   - # Use async mode: returns cached translations immediately, missing ones translated in background
241   - translations = self.translator.translate_multi(
242   - query_text,
243   - target_langs,
244   - source_lang=detected_lang,
245   - context=translation_context,
246   - async_mode=True,
247   - prompt=query_prompt
248   - )
249   - # Filter out None values (missing translations that are being processed async)
250   - translations = {k: v for k, v in translations.items() if v is not None}
  238 +
  239 + # Determine if we need to wait for translation results
  240 + # If detected_lang is neither 'en' nor 'zh', we must wait for translation
  241 + need_wait_translation = detected_lang not in ['en', 'zh']
  242 +
  243 + if need_wait_translation:
  244 + # Use async method that returns Futures, so we can wait for results
  245 + translation_results = self.translator.translate_multi_async(
  246 + query_text,
  247 + target_langs,
  248 + source_lang=detected_lang,
  249 + context=translation_context,
  250 + prompt=query_prompt
  251 + )
  252 + # Separate cached results and futures
  253 + for lang, result in translation_results.items():
  254 + if isinstance(result, Future):
  255 + translation_futures[lang] = result
  256 + else:
  257 + translations[lang] = result
  258 + else:
  259 + # Use async mode: returns cached translations immediately, missing ones translated in background
  260 + translations = self.translator.translate_multi(
  261 + query_text,
  262 + target_langs,
  263 + source_lang=detected_lang,
  264 + context=translation_context,
  265 + async_mode=True,
  266 + prompt=query_prompt
  267 + )
  268 + # Filter out None values (missing translations that are being processed async)
  269 + translations = {k: v for k, v in translations.items() if v is not None}
251 270  
252 271 if translations:
253 272 log_info(f"翻译完成(缓存命中) | 结果: {translations}")
254   - else:
255   - log_debug(f"翻译未命中缓存,异步翻译中...")
  273 + if translation_futures:
  274 + log_debug(f"翻译进行中,等待结果... | 语言: {list(translation_futures.keys())}")
256 275  
257 276 if context:
258 277 context.store_intermediate_result('translations', translations)
... ... @@ -279,8 +298,9 @@ class QueryParser:
279 298 context.store_intermediate_result('is_short_query', is_short_query)
280 299 context.store_intermediate_result('is_long_query', is_long_query)
281 300  
282   - # Stage 6: Text embedding (only for non-short queries)
  301 + # Stage 6: Text embedding (only for non-short queries) - async execution
283 302 query_vector = None
  303 + embedding_future = None
284 304 should_generate_embedding = (
285 305 generate_vector and
286 306 self.config.query_config.enable_text_embedding and
... ... @@ -288,18 +308,70 @@ class QueryParser:
288 308 not is_short_query
289 309 )
290 310  
  311 + encoding_executor = None
291 312 if should_generate_embedding:
292 313 try:
293   - log_debug("开始生成查询向量")
294   - query_vector = self.text_encoder.encode([query_text])[0]
295   - log_debug(f"查询向量生成完成 | 形状: {query_vector.shape}")
296   - if context:
297   - context.store_intermediate_result('query_vector_shape', query_vector.shape)
  314 + log_debug("开始生成查询向量(异步)")
  315 + # Submit encoding task to thread pool for async execution
  316 + encoding_executor = ThreadPoolExecutor(max_workers=1)
  317 + embedding_future = encoding_executor.submit(
  318 + lambda: self.text_encoder.encode([query_text])[0]
  319 + )
298 320 except Exception as e:
299   - error_msg = f"查询向量生成失败 | 错误: {str(e)}"
  321 + error_msg = f"查询向量生成任务提交失败 | 错误: {str(e)}"
300 322 log_info(error_msg)
301 323 if context:
302 324 context.add_warning(error_msg)
  325 + encoding_executor = None
  326 + embedding_future = None
  327 +
  328 + # Wait for all async tasks to complete (translation and embedding)
  329 + if translation_futures or embedding_future:
  330 + log_debug("等待异步任务完成...")
  331 +
  332 + # Collect all futures with their identifiers
  333 + all_futures = []
  334 + future_to_lang = {}
  335 + for lang, future in translation_futures.items():
  336 + all_futures.append(future)
  337 + future_to_lang[future] = ('translation', lang)
  338 +
  339 + if embedding_future:
  340 + all_futures.append(embedding_future)
  341 + future_to_lang[embedding_future] = ('embedding', None)
  342 +
  343 + # Wait for all futures to complete
  344 + for future in as_completed(all_futures):
  345 + task_type, lang = future_to_lang[future]
  346 + try:
  347 + result = future.result()
  348 + if task_type == 'translation':
  349 + if result:
  350 + translations[lang] = result
  351 + log_info(f"翻译完成 | {lang}: {result}")
  352 + if context:
  353 + context.store_intermediate_result(f'translation_{lang}', result)
  354 + elif task_type == 'embedding':
  355 + query_vector = result
  356 + log_debug(f"查询向量生成完成 | 形状: {query_vector.shape}")
  357 + if context:
  358 + context.store_intermediate_result('query_vector_shape', query_vector.shape)
  359 + except Exception as e:
  360 + if task_type == 'translation':
  361 + error_msg = f"翻译失败 | 语言: {lang} | 错误: {str(e)}"
  362 + else:
  363 + error_msg = f"查询向量生成失败 | 错误: {str(e)}"
  364 + log_info(error_msg)
  365 + if context:
  366 + context.add_warning(error_msg)
  367 +
  368 + # Clean up encoding executor
  369 + if encoding_executor:
  370 + encoding_executor.shutdown(wait=False)
  371 +
  372 + # Update translations in context after all are complete
  373 + if translations and context:
  374 + context.store_intermediate_result('translations', translations)
303 375  
304 376 # Build result
305 377 result = ParsedQuery(
... ...
query/translator.py
... ... @@ -14,9 +14,9 @@ https://developers.deepl.com/api-reference/translate/request-translation
14 14 import requests
15 15 import re
16 16 import redis
17   -from concurrent.futures import ThreadPoolExecutor
  17 +from concurrent.futures import ThreadPoolExecutor, Future
18 18 from datetime import timedelta
19   -from typing import Dict, List, Optional
  19 +from typing import Dict, List, Optional, Union
20 20 import logging
21 21  
22 22 logger = logging.getLogger(__name__)
... ... @@ -409,6 +409,56 @@ class Translator:
409 409  
410 410 return results
411 411  
  412 + def translate_multi_async(
  413 + self,
  414 + text: str,
  415 + target_langs: List[str],
  416 + source_lang: Optional[str] = None,
  417 + context: Optional[str] = None,
  418 + prompt: Optional[str] = None
  419 + ) -> Dict[str, Union[str, Future]]:
  420 + """
  421 + Translate text to multiple target languages asynchronously, returning Futures that can be awaited.
  422 +
  423 + This method returns a dictionary where:
  424 + - If translation is cached, the value is the translation string (immediate)
  425 + - If translation needs to be done, the value is a Future object that can be awaited
  426 +
  427 + Args:
  428 + text: Text to translate
  429 + target_langs: List of target language codes
  430 + source_lang: Source language code (optional)
  431 + context: Context hint for translation (optional)
  432 + prompt: Translation prompt/instruction (optional)
  433 +
  434 + Returns:
  435 + Dictionary mapping language code to either translation string (cached) or Future object
  436 + """
  437 + results = {}
  438 + missing_langs = []
  439 +
  440 + # First, get cached translations
  441 + for lang in target_langs:
  442 + cached = self._get_cached_translation(text, lang, source_lang, context, prompt)
  443 + if cached is not None:
  444 + results[lang] = cached
  445 + else:
  446 + missing_langs.append(lang)
  447 +
  448 + # For missing translations, submit async tasks and return Futures
  449 + for lang in missing_langs:
  450 + future = self.executor.submit(
  451 + self.translate,
  452 + text,
  453 + lang,
  454 + source_lang,
  455 + context,
  456 + prompt
  457 + )
  458 + results[lang] = future
  459 +
  460 + return results
  461 +
412 462 def _get_cached_translation(
413 463 self,
414 464 text: str,
... ...