diff --git a/prompts.md b/prompts.md new file mode 100644 index 0000000..774c7bb --- /dev/null +++ b/prompts.md @@ -0,0 +1,7 @@ + +@query_parser.py (154-332) +1. get_translation_needs这种函数可以去除。逻辑直接写在parse函数中。 +2. translate_multi 还需要提供一种调用方法,异步的,但是 可以等待结果的。 +3. 如果detected_lang不是en也不是zh,这时候, 我们调用translate_multi 是需要等待结果返回的(因为是zh 或者 en 都有一个索引字段可以查,因此 本次可以不用结果,直接去走搜索,但是如果两者都不是,只能等待翻译结果。) +4. parse函数 这里可能发起一个异步的调用,下面的encode也要做成异步的,这样 encode和翻译两个异步任务的时间可以重叠,需要等待所有结果都返回。 + diff --git a/query/query_parser.py b/query/query_parser.py index 898a6fe..010b458 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -4,11 +4,12 @@ Query parser - main module for query processing. Handles query rewriting, translation, and embedding generation. """ -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Union import numpy as np import logging import re import hanlp +from concurrent.futures import Future, ThreadPoolExecutor, as_completed from embeddings import BgeEncoder from config import SearchConfig @@ -217,42 +218,60 @@ class QueryParser: if context: context.store_intermediate_result('detected_language', detected_lang) - # Stage 4: Translation (async mode - only returns cached results, missing ones translated in background) + # Stage 4: Translation (with async support and conditional waiting) translations = {} + translation_futures = {} if self.config.query_config.enable_translation: try: # Determine target languages for translation # Simplified: always translate to Chinese and English target_langs_for_translation = ['zh', 'en'] - - target_langs = self.translator.get_translation_needs( - detected_lang, - target_langs_for_translation - ) + + target_langs = [lang for lang in target_langs_for_translation if detected_lang != lang] if target_langs: # Use e-commerce context for better disambiguation translation_context = self.config.query_config.translation_context # For query translation, we use a general prompt (not language-specific) - # Since translate_multi uses same prompt for all languages, we use default query_prompt = self.config.query_config.translation_prompts.get('query_zh') or \ self.config.query_config.translation_prompts.get('default_zh') - # Use async mode: returns cached translations immediately, missing ones translated in background - translations = self.translator.translate_multi( - query_text, - target_langs, - source_lang=detected_lang, - context=translation_context, - async_mode=True, - prompt=query_prompt - ) - # Filter out None values (missing translations that are being processed async) - translations = {k: v for k, v in translations.items() if v is not None} + + # Determine if we need to wait for translation results + # If detected_lang is neither 'en' nor 'zh', we must wait for translation + need_wait_translation = detected_lang not in ['en', 'zh'] + + if need_wait_translation: + # Use async method that returns Futures, so we can wait for results + translation_results = self.translator.translate_multi_async( + query_text, + target_langs, + source_lang=detected_lang, + context=translation_context, + prompt=query_prompt + ) + # Separate cached results and futures + for lang, result in translation_results.items(): + if isinstance(result, Future): + translation_futures[lang] = result + else: + translations[lang] = result + else: + # Use async mode: returns cached translations immediately, missing ones translated in background + translations = self.translator.translate_multi( + query_text, + target_langs, + source_lang=detected_lang, + context=translation_context, + async_mode=True, + prompt=query_prompt + ) + # Filter out None values (missing translations that are being processed async) + translations = {k: v for k, v in translations.items() if v is not None} if translations: log_info(f"翻译完成(缓存命中) | 结果: {translations}") - else: - log_debug(f"翻译未命中缓存,异步翻译中...") + if translation_futures: + log_debug(f"翻译进行中,等待结果... | 语言: {list(translation_futures.keys())}") if context: context.store_intermediate_result('translations', translations) @@ -279,8 +298,9 @@ class QueryParser: context.store_intermediate_result('is_short_query', is_short_query) context.store_intermediate_result('is_long_query', is_long_query) - # Stage 6: Text embedding (only for non-short queries) + # Stage 6: Text embedding (only for non-short queries) - async execution query_vector = None + embedding_future = None should_generate_embedding = ( generate_vector and self.config.query_config.enable_text_embedding and @@ -288,18 +308,70 @@ class QueryParser: not is_short_query ) + encoding_executor = None if should_generate_embedding: try: - log_debug("开始生成查询向量") - query_vector = self.text_encoder.encode([query_text])[0] - log_debug(f"查询向量生成完成 | 形状: {query_vector.shape}") - if context: - context.store_intermediate_result('query_vector_shape', query_vector.shape) + log_debug("开始生成查询向量(异步)") + # Submit encoding task to thread pool for async execution + encoding_executor = ThreadPoolExecutor(max_workers=1) + embedding_future = encoding_executor.submit( + lambda: self.text_encoder.encode([query_text])[0] + ) except Exception as e: - error_msg = f"查询向量生成失败 | 错误: {str(e)}" + error_msg = f"查询向量生成任务提交失败 | 错误: {str(e)}" log_info(error_msg) if context: context.add_warning(error_msg) + encoding_executor = None + embedding_future = None + + # Wait for all async tasks to complete (translation and embedding) + if translation_futures or embedding_future: + log_debug("等待异步任务完成...") + + # Collect all futures with their identifiers + all_futures = [] + future_to_lang = {} + for lang, future in translation_futures.items(): + all_futures.append(future) + future_to_lang[future] = ('translation', lang) + + if embedding_future: + all_futures.append(embedding_future) + future_to_lang[embedding_future] = ('embedding', None) + + # Wait for all futures to complete + for future in as_completed(all_futures): + task_type, lang = future_to_lang[future] + try: + result = future.result() + if task_type == 'translation': + if result: + translations[lang] = result + log_info(f"翻译完成 | {lang}: {result}") + if context: + context.store_intermediate_result(f'translation_{lang}', result) + elif task_type == 'embedding': + query_vector = result + log_debug(f"查询向量生成完成 | 形状: {query_vector.shape}") + if context: + context.store_intermediate_result('query_vector_shape', query_vector.shape) + except Exception as e: + if task_type == 'translation': + error_msg = f"翻译失败 | 语言: {lang} | 错误: {str(e)}" + else: + error_msg = f"查询向量生成失败 | 错误: {str(e)}" + log_info(error_msg) + if context: + context.add_warning(error_msg) + + # Clean up encoding executor + if encoding_executor: + encoding_executor.shutdown(wait=False) + + # Update translations in context after all are complete + if translations and context: + context.store_intermediate_result('translations', translations) # Build result result = ParsedQuery( diff --git a/query/translator.py b/query/translator.py index 059f72e..e193791 100644 --- a/query/translator.py +++ b/query/translator.py @@ -14,9 +14,9 @@ https://developers.deepl.com/api-reference/translate/request-translation import requests import re import redis -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, Future from datetime import timedelta -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union import logging logger = logging.getLogger(__name__) @@ -409,6 +409,56 @@ class Translator: return results + def translate_multi_async( + self, + text: str, + target_langs: List[str], + source_lang: Optional[str] = None, + context: Optional[str] = None, + prompt: Optional[str] = None + ) -> Dict[str, Union[str, Future]]: + """ + Translate text to multiple target languages asynchronously, returning Futures that can be awaited. + + This method returns a dictionary where: + - If translation is cached, the value is the translation string (immediate) + - If translation needs to be done, the value is a Future object that can be awaited + + Args: + text: Text to translate + target_langs: List of target language codes + source_lang: Source language code (optional) + context: Context hint for translation (optional) + prompt: Translation prompt/instruction (optional) + + Returns: + Dictionary mapping language code to either translation string (cached) or Future object + """ + results = {} + missing_langs = [] + + # First, get cached translations + for lang in target_langs: + cached = self._get_cached_translation(text, lang, source_lang, context, prompt) + if cached is not None: + results[lang] = cached + else: + missing_langs.append(lang) + + # For missing translations, submit async tasks and return Futures + for lang in missing_langs: + future = self.executor.submit( + self.translate, + text, + lang, + source_lang, + context, + prompt + ) + results[lang] = future + + return results + def _get_cached_translation( self, text: str, -- libgit2 0.21.2