From db9c469ca8aa5b4741751cfb70bf891a61136cc5 Mon Sep 17 00:00:00 2001
From: tangwang
Date: Mon, 30 Mar 2026 21:31:04 +0800
Subject: [PATCH] log optimize

---
Reviewer notes (free-form text; git am ignores everything between the
"---" line above and the first "diff --git" line):

* This copy of the patch had lost its line breaks and leading whitespace.
  Line boundaries below were restored and re-checked against the hunk
  headers and the diffstat (66 insertions, 33 deletions). Indentation was
  re-derived from Python syntax; the absolute nesting depth of the context
  lines inside class QueryParser is a best guess. Verify against the
  pre-image (blob 7024248) or regenerate from commit db9c469 before
  applying, and re-check the indentation of the added lines.

* Summary: adds "import time", three module-level helpers
  (_async_enrichment_result_summary, _async_enrichment_failure_warning,
  _log_async_enrichment_finished) and a future_submit_at map that stores
  time.perf_counter() when each future is submitted. The per-task-type
  completion/failure log lines are replaced by a single
  "Async enrichment task finished" line carrying task_type, summary and
  elapsed_ms. Warning strings passed to context.add_warning keep their
  previous wording.

* elapsed_ms is computed inside "for future in done", i.e. after wait()
  has returned, as now minus submit time. It therefore measures
  submit-to-processing latency rather than how long the task itself ran;
  a task that finished early is still stamped when the loop reaches it.
  Recording the completion time in Future.add_done_callback would give a
  per-task figure.

* _async_enrichment_result_summary() is evaluated inside the try block, so
  an exception raised while building the summary (for example a non-None
  result without a .shape attribute) is handled by "except Exception" and
  reported as a failure of the task.

* Log-level/content changes: vector-shape messages that used log_debug are
  now emitted through log_info; the translation message no longer includes
  the query text and now prints the translation with repr().

 query/query_parser.py | 99 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
 1 file changed, 66 insertions(+), 33 deletions(-)

diff --git a/query/query_parser.py b/query/query_parser.py
index 7024248..696495e 100644
--- a/query/query_parser.py
+++ b/query/query_parser.py
@@ -12,6 +12,7 @@ from dataclasses import dataclass, field
 from typing import Any, Callable, Dict, List, Optional, Tuple
 import numpy as np
 import logging
+import time
 from concurrent.futures import ThreadPoolExecutor, wait
 
 from embeddings.image_encoder import CLIPImageEncoder
@@ -34,6 +35,44 @@ logger = logging.getLogger(__name__)
 import hanlp  # type: ignore
 
 
+def _async_enrichment_result_summary(
+    task_type: str, lang: Optional[str], result: Any
+) -> str:
+    """One-line description of a completed translation/embedding task for logging."""
+    if task_type == "translation":
+        if result:
+            return f"lang={lang} translated={result!r}"
+        return f"lang={lang} empty_translation"
+    if task_type in ("embedding", "image_embedding"):
+        if result is not None:
+            return f"vector_shape={tuple(result.shape)}"
+        return "no_vector" if task_type == "embedding" else "no_image_vector"
+    return f"unexpected_task_type={task_type!r}"
+
+
+def _async_enrichment_failure_warning(task_type: str, lang: Optional[str], err: BaseException) -> str:
+    """Warning text aligned with historical messages for context.add_warning."""
+    msg = str(err)
+    if task_type == "translation":
+        return f"Translation failed | Language: {lang} | Error: {msg}"
+    if task_type == "image_embedding":
+        return f"CLIP text query vector generation failed | Error: {msg}"
+    return f"Query vector generation failed | Error: {msg}"
+
+
+def _log_async_enrichment_finished(
+    log_info: Callable[[str], None],
+    *,
+    task_type: str,
+    summary: str,
+    elapsed_ms: float,
+) -> None:
+    log_info(
+        f"Async enrichment task finished | task_type={task_type} | "
+        f"summary={summary} | elapsed_ms={elapsed_ms:.1f}"
+    )
+
+
 def rerank_query_text(
     original_query: str,
     *,
@@ -328,6 +367,7 @@ class QueryParser:
         # caller decides translation targets and later search-field planning.
         translations: Dict[str, str] = {}
         future_to_task: Dict[Any, Tuple[str, Optional[str]]] = {}
+        future_submit_at: Dict[Any, float] = {}
         async_executor: Optional[ThreadPoolExecutor] = None
         detected_norm = str(detected_lang or "").strip().lower()
         normalized_targets = self._normalize_language_codes(target_languages)
@@ -378,6 +418,7 @@ class QueryParser:
                     model_name,
                 )
                 future_to_task[future] = ("translation", lang)
+                future_submit_at[future] = time.perf_counter()
 
             if should_generate_embedding:
                 if self.text_encoder is None:
@@ -400,6 +441,7 @@ class QueryParser:
 
                 future = async_executor.submit(_encode_query_vector)
                 future_to_task[future] = ("embedding", None)
+                future_submit_at[future] = time.perf_counter()
 
             if should_generate_image_embedding:
                 if self.image_encoder is None:
@@ -422,6 +464,7 @@ class QueryParser:
 
                 future = async_executor.submit(_encode_image_query_vector)
                 future_to_task[future] = ("image_embedding", None)
+                future_submit_at[future] = time.perf_counter()
         except Exception as e:
             error_msg = f"Async query enrichment submission failed | Error: {str(e)}"
             log_info(error_msg)
@@ -431,6 +474,7 @@ class QueryParser:
                 async_executor.shutdown(wait=False)
                 async_executor = None
             future_to_task.clear()
+            future_submit_at.clear()
 
         # Wait for translation + embedding concurrently; shared budget depends on whether
         # the detected language belongs to caller-provided target_languages.
@@ -459,56 +503,45 @@ class QueryParser:
             done, not_done = wait(list(future_to_task.keys()), timeout=budget_sec)
             for future in done:
                 task_type, lang = future_to_task[future]
+                t0 = future_submit_at.pop(future, None)
+                elapsed_ms = (time.perf_counter() - t0) * 1000.0 if t0 is not None else 0.0
                 try:
                     result = future.result()
                     if task_type == "translation":
                         if result:
                             translations[lang] = result
-                            log_info(
-                                f"Translation completed | Query text: '{query_text}' | "
-                                f"Target language: {lang} | Translation result: '{result}'"
-                            )
                             if context:
                                 context.store_intermediate_result(f"translation_{lang}", result)
                     elif task_type == "embedding":
                         query_vector = result
-                        if query_vector is not None:
-                            log_debug(f"Query vector generation completed | Shape: {query_vector.shape}")
-                            if context:
-                                context.store_intermediate_result("query_vector_shape", query_vector.shape)
-                        else:
-                            log_info(
-                                "Query vector generation completed but result is None, will process without vector"
-                            )
+                        if query_vector is not None and context:
+                            context.store_intermediate_result("query_vector_shape", query_vector.shape)
                     elif task_type == "image_embedding":
                         image_query_vector = result
-                        if image_query_vector is not None:
-                            log_debug(
-                                f"CLIP text query vector generation completed | Shape: {image_query_vector.shape}"
-                            )
-                            if context:
-                                context.store_intermediate_result(
-                                    "image_query_vector_shape",
-                                    image_query_vector.shape,
-                                )
-                        else:
-                            log_info(
-                                "CLIP text query vector generation completed but result is None, "
-                                "will process without image vector"
+                        if image_query_vector is not None and context:
+                            context.store_intermediate_result(
+                                "image_query_vector_shape",
+                                image_query_vector.shape,
                             )
+                    _log_async_enrichment_finished(
+                        log_info,
+                        task_type=task_type,
+                        summary=_async_enrichment_result_summary(task_type, lang, result),
+                        elapsed_ms=elapsed_ms,
+                    )
                 except Exception as e:
-                    if task_type == "translation":
-                        error_msg = f"Translation failed | Language: {lang} | Error: {str(e)}"
-                    elif task_type == "image_embedding":
-                        error_msg = f"CLIP text query vector generation failed | Error: {str(e)}"
-                    else:
-                        error_msg = f"Query vector generation failed | Error: {str(e)}"
-                    log_info(error_msg)
+                    _log_async_enrichment_finished(
+                        log_info,
+                        task_type=task_type,
+                        summary=f"error={e!s}",
+                        elapsed_ms=elapsed_ms,
+                    )
                     if context:
-                        context.add_warning(error_msg)
+                        context.add_warning(_async_enrichment_failure_warning(task_type, lang, e))
 
             if not_done:
                 for future in not_done:
+                    future_submit_at.pop(future, None)
                     task_type, lang = future_to_task[future]
                     if task_type == "translation":
                         timeout_msg = (
-- 
libgit2 0.21.2