Commit db9c469ca8aa5b4741751cfb70bf891a61136cc5
1 parent
de98daa3
log optimize
Showing
1 changed file
with
66 additions
and
33 deletions
Show diff stats
query/query_parser.py
| @@ -12,6 +12,7 @@ from dataclasses import dataclass, field | @@ -12,6 +12,7 @@ from dataclasses import dataclass, field | ||
| 12 | from typing import Any, Callable, Dict, List, Optional, Tuple | 12 | from typing import Any, Callable, Dict, List, Optional, Tuple |
| 13 | import numpy as np | 13 | import numpy as np |
| 14 | import logging | 14 | import logging |
| 15 | +import time | ||
| 15 | from concurrent.futures import ThreadPoolExecutor, wait | 16 | from concurrent.futures import ThreadPoolExecutor, wait |
| 16 | 17 | ||
| 17 | from embeddings.image_encoder import CLIPImageEncoder | 18 | from embeddings.image_encoder import CLIPImageEncoder |
| @@ -34,6 +35,44 @@ logger = logging.getLogger(__name__) | @@ -34,6 +35,44 @@ logger = logging.getLogger(__name__) | ||
| 34 | import hanlp # type: ignore | 35 | import hanlp # type: ignore |
| 35 | 36 | ||
| 36 | 37 | ||
| 38 | +def _async_enrichment_result_summary( | ||
| 39 | + task_type: str, lang: Optional[str], result: Any | ||
| 40 | +) -> str: | ||
| 41 | + """One-line description of a completed translation/embedding task for logging.""" | ||
| 42 | + if task_type == "translation": | ||
| 43 | + if result: | ||
| 44 | + return f"lang={lang} translated={result!r}" | ||
| 45 | + return f"lang={lang} empty_translation" | ||
| 46 | + if task_type in ("embedding", "image_embedding"): | ||
| 47 | + if result is not None: | ||
| 48 | + return f"vector_shape={tuple(result.shape)}" | ||
| 49 | + return "no_vector" if task_type == "embedding" else "no_image_vector" | ||
| 50 | + return f"unexpected_task_type={task_type!r}" | ||
| 51 | + | ||
| 52 | + | ||
| 53 | +def _async_enrichment_failure_warning(task_type: str, lang: Optional[str], err: BaseException) -> str: | ||
| 54 | + """Warning text aligned with historical messages for context.add_warning.""" | ||
| 55 | + msg = str(err) | ||
| 56 | + if task_type == "translation": | ||
| 57 | + return f"Translation failed | Language: {lang} | Error: {msg}" | ||
| 58 | + if task_type == "image_embedding": | ||
| 59 | + return f"CLIP text query vector generation failed | Error: {msg}" | ||
| 60 | + return f"Query vector generation failed | Error: {msg}" | ||
| 61 | + | ||
| 62 | + | ||
| 63 | +def _log_async_enrichment_finished( | ||
| 64 | + log_info: Callable[[str], None], | ||
| 65 | + *, | ||
| 66 | + task_type: str, | ||
| 67 | + summary: str, | ||
| 68 | + elapsed_ms: float, | ||
| 69 | +) -> None: | ||
| 70 | + log_info( | ||
| 71 | + f"Async enrichment task finished | task_type={task_type} | " | ||
| 72 | + f"summary={summary} | elapsed_ms={elapsed_ms:.1f}" | ||
| 73 | + ) | ||
| 74 | + | ||
| 75 | + | ||
| 37 | def rerank_query_text( | 76 | def rerank_query_text( |
| 38 | original_query: str, | 77 | original_query: str, |
| 39 | *, | 78 | *, |
| @@ -328,6 +367,7 @@ class QueryParser: | @@ -328,6 +367,7 @@ class QueryParser: | ||
| 328 | # caller decides translation targets and later search-field planning. | 367 | # caller decides translation targets and later search-field planning. |
| 329 | translations: Dict[str, str] = {} | 368 | translations: Dict[str, str] = {} |
| 330 | future_to_task: Dict[Any, Tuple[str, Optional[str]]] = {} | 369 | future_to_task: Dict[Any, Tuple[str, Optional[str]]] = {} |
| 370 | + future_submit_at: Dict[Any, float] = {} | ||
| 331 | async_executor: Optional[ThreadPoolExecutor] = None | 371 | async_executor: Optional[ThreadPoolExecutor] = None |
| 332 | detected_norm = str(detected_lang or "").strip().lower() | 372 | detected_norm = str(detected_lang or "").strip().lower() |
| 333 | normalized_targets = self._normalize_language_codes(target_languages) | 373 | normalized_targets = self._normalize_language_codes(target_languages) |
| @@ -378,6 +418,7 @@ class QueryParser: | @@ -378,6 +418,7 @@ class QueryParser: | ||
| 378 | model_name, | 418 | model_name, |
| 379 | ) | 419 | ) |
| 380 | future_to_task[future] = ("translation", lang) | 420 | future_to_task[future] = ("translation", lang) |
| 421 | + future_submit_at[future] = time.perf_counter() | ||
| 381 | 422 | ||
| 382 | if should_generate_embedding: | 423 | if should_generate_embedding: |
| 383 | if self.text_encoder is None: | 424 | if self.text_encoder is None: |
| @@ -400,6 +441,7 @@ class QueryParser: | @@ -400,6 +441,7 @@ class QueryParser: | ||
| 400 | 441 | ||
| 401 | future = async_executor.submit(_encode_query_vector) | 442 | future = async_executor.submit(_encode_query_vector) |
| 402 | future_to_task[future] = ("embedding", None) | 443 | future_to_task[future] = ("embedding", None) |
| 444 | + future_submit_at[future] = time.perf_counter() | ||
| 403 | 445 | ||
| 404 | if should_generate_image_embedding: | 446 | if should_generate_image_embedding: |
| 405 | if self.image_encoder is None: | 447 | if self.image_encoder is None: |
| @@ -422,6 +464,7 @@ class QueryParser: | @@ -422,6 +464,7 @@ class QueryParser: | ||
| 422 | 464 | ||
| 423 | future = async_executor.submit(_encode_image_query_vector) | 465 | future = async_executor.submit(_encode_image_query_vector) |
| 424 | future_to_task[future] = ("image_embedding", None) | 466 | future_to_task[future] = ("image_embedding", None) |
| 467 | + future_submit_at[future] = time.perf_counter() | ||
| 425 | except Exception as e: | 468 | except Exception as e: |
| 426 | error_msg = f"Async query enrichment submission failed | Error: {str(e)}" | 469 | error_msg = f"Async query enrichment submission failed | Error: {str(e)}" |
| 427 | log_info(error_msg) | 470 | log_info(error_msg) |
| @@ -431,6 +474,7 @@ class QueryParser: | @@ -431,6 +474,7 @@ class QueryParser: | ||
| 431 | async_executor.shutdown(wait=False) | 474 | async_executor.shutdown(wait=False) |
| 432 | async_executor = None | 475 | async_executor = None |
| 433 | future_to_task.clear() | 476 | future_to_task.clear() |
| 477 | + future_submit_at.clear() | ||
| 434 | 478 | ||
| 435 | # Wait for translation + embedding concurrently; shared budget depends on whether | 479 | # Wait for translation + embedding concurrently; shared budget depends on whether |
| 436 | # the detected language belongs to caller-provided target_languages. | 480 | # the detected language belongs to caller-provided target_languages. |
| @@ -459,56 +503,45 @@ class QueryParser: | @@ -459,56 +503,45 @@ class QueryParser: | ||
| 459 | done, not_done = wait(list(future_to_task.keys()), timeout=budget_sec) | 503 | done, not_done = wait(list(future_to_task.keys()), timeout=budget_sec) |
| 460 | for future in done: | 504 | for future in done: |
| 461 | task_type, lang = future_to_task[future] | 505 | task_type, lang = future_to_task[future] |
| 506 | + t0 = future_submit_at.pop(future, None) | ||
| 507 | + elapsed_ms = (time.perf_counter() - t0) * 1000.0 if t0 is not None else 0.0 | ||
| 462 | try: | 508 | try: |
| 463 | result = future.result() | 509 | result = future.result() |
| 464 | if task_type == "translation": | 510 | if task_type == "translation": |
| 465 | if result: | 511 | if result: |
| 466 | translations[lang] = result | 512 | translations[lang] = result |
| 467 | - log_info( | ||
| 468 | - f"Translation completed | Query text: '{query_text}' | " | ||
| 469 | - f"Target language: {lang} | Translation result: '{result}'" | ||
| 470 | - ) | ||
| 471 | if context: | 513 | if context: |
| 472 | context.store_intermediate_result(f"translation_{lang}", result) | 514 | context.store_intermediate_result(f"translation_{lang}", result) |
| 473 | elif task_type == "embedding": | 515 | elif task_type == "embedding": |
| 474 | query_vector = result | 516 | query_vector = result |
| 475 | - if query_vector is not None: | ||
| 476 | - log_debug(f"Query vector generation completed | Shape: {query_vector.shape}") | ||
| 477 | - if context: | ||
| 478 | - context.store_intermediate_result("query_vector_shape", query_vector.shape) | ||
| 479 | - else: | ||
| 480 | - log_info( | ||
| 481 | - "Query vector generation completed but result is None, will process without vector" | ||
| 482 | - ) | 517 | + if query_vector is not None and context: |
| 518 | + context.store_intermediate_result("query_vector_shape", query_vector.shape) | ||
| 483 | elif task_type == "image_embedding": | 519 | elif task_type == "image_embedding": |
| 484 | image_query_vector = result | 520 | image_query_vector = result |
| 485 | - if image_query_vector is not None: | ||
| 486 | - log_debug( | ||
| 487 | - f"CLIP text query vector generation completed | Shape: {image_query_vector.shape}" | ||
| 488 | - ) | ||
| 489 | - if context: | ||
| 490 | - context.store_intermediate_result( | ||
| 491 | - "image_query_vector_shape", | ||
| 492 | - image_query_vector.shape, | ||
| 493 | - ) | ||
| 494 | - else: | ||
| 495 | - log_info( | ||
| 496 | - "CLIP text query vector generation completed but result is None, " | ||
| 497 | - "will process without image vector" | 521 | + if image_query_vector is not None and context: |
| 522 | + context.store_intermediate_result( | ||
| 523 | + "image_query_vector_shape", | ||
| 524 | + image_query_vector.shape, | ||
| 498 | ) | 525 | ) |
| 526 | + _log_async_enrichment_finished( | ||
| 527 | + log_info, | ||
| 528 | + task_type=task_type, | ||
| 529 | + summary=_async_enrichment_result_summary(task_type, lang, result), | ||
| 530 | + elapsed_ms=elapsed_ms, | ||
| 531 | + ) | ||
| 499 | except Exception as e: | 532 | except Exception as e: |
| 500 | - if task_type == "translation": | ||
| 501 | - error_msg = f"Translation failed | Language: {lang} | Error: {str(e)}" | ||
| 502 | - elif task_type == "image_embedding": | ||
| 503 | - error_msg = f"CLIP text query vector generation failed | Error: {str(e)}" | ||
| 504 | - else: | ||
| 505 | - error_msg = f"Query vector generation failed | Error: {str(e)}" | ||
| 506 | - log_info(error_msg) | 533 | + _log_async_enrichment_finished( |
| 534 | + log_info, | ||
| 535 | + task_type=task_type, | ||
| 536 | + summary=f"error={e!s}", | ||
| 537 | + elapsed_ms=elapsed_ms, | ||
| 538 | + ) | ||
| 507 | if context: | 539 | if context: |
| 508 | - context.add_warning(error_msg) | 540 | + context.add_warning(_async_enrichment_failure_warning(task_type, lang, e)) |
| 509 | 541 | ||
| 510 | if not_done: | 542 | if not_done: |
| 511 | for future in not_done: | 543 | for future in not_done: |
| 544 | + future_submit_at.pop(future, None) | ||
| 512 | task_type, lang = future_to_task[future] | 545 | task_type, lang = future_to_task[future] |
| 513 | if task_type == "translation": | 546 | if task_type == "translation": |
| 514 | timeout_msg = ( | 547 | timeout_msg = ( |