""" Query parser - main module for query processing. Responsibilities are intentionally narrow: - normalize and rewrite the incoming query - detect language and tokenize with HanLP - run translation and embedding requests concurrently - return parser facts, not Elasticsearch language-planning data """ from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional, Tuple import numpy as np import logging import time from concurrent.futures import ThreadPoolExecutor, wait from embeddings.image_encoder import CLIPImageEncoder from embeddings.text_encoder import TextEmbeddingEncoder from config import SearchConfig from translation import create_translation_client from .language_detector import LanguageDetector from .product_title_exclusion import ( ProductTitleExclusionDetector, ProductTitleExclusionProfile, ProductTitleExclusionRegistry, ) from .query_rewriter import QueryRewriter, QueryNormalizer from .style_intent import StyleIntentDetector, StyleIntentProfile, StyleIntentRegistry from .tokenization import QueryTextAnalysisCache, contains_han_text, extract_token_strings from .keyword_extractor import KeywordExtractor, collect_keywords_queries logger = logging.getLogger(__name__) import hanlp # type: ignore def _async_enrichment_result_summary( task_type: str, lang: Optional[str], result: Any ) -> str: """One-line description of a completed translation/embedding task for logging.""" if task_type == "translation": if result: return f"lang={lang} translated={result!r}" return f"lang={lang} empty_translation" if task_type in ("embedding", "image_embedding"): if result is not None: return f"vector_shape={tuple(result.shape)}" return "no_vector" if task_type == "embedding" else "no_image_vector" return f"unexpected_task_type={task_type!r}" def _async_enrichment_failure_warning(task_type: str, lang: Optional[str], err: BaseException) -> str: """Warning text aligned with historical messages for context.add_warning.""" msg = str(err) if task_type == "translation": return f"Translation failed | Language: {lang} | Error: {msg}" if task_type == "image_embedding": return f"CLIP text query vector generation failed | Error: {msg}" return f"Query vector generation failed | Error: {msg}" def _log_async_enrichment_finished( log_info: Callable[[str], None], *, task_type: str, summary: str, elapsed_ms: float, ) -> None: log_info( f"Async enrichment task finished | task_type={task_type} | " f"summary={summary} | elapsed_ms={elapsed_ms:.1f}" ) def rerank_query_text( original_query: str, *, detected_language: Optional[str] = None, translations: Optional[Dict[str, str]] = None, ) -> str: """ Text substituted for ``{query}`` when calling the reranker. Chinese and English queries use the original string. For any other detected language, prefer the English translation, then Chinese; if neither exists, fall back to the original query. """ lang = (detected_language or "").strip().lower() if lang in ("zh", "en"): return original_query trans = translations or {} for key in ("en", "zh"): t = (trans.get(key) or "").strip() if t: return t return original_query @dataclass(slots=True) class ParsedQuery: """ Container for query parser facts. ``keywords_queries`` parallels text variants: key ``base`` (see ``keyword_extractor.KEYWORDS_QUERY_BASE_KEY``) for ``rewritten_query``, and the same language codes as ``translations`` for each translated string. Entries with no extracted nouns are omitted. """ original_query: str query_normalized: str rewritten_query: str detected_language: Optional[str] = None translations: Dict[str, str] = field(default_factory=dict) query_vector: Optional[np.ndarray] = None image_query_vector: Optional[np.ndarray] = None query_tokens: List[str] = field(default_factory=list) keywords_queries: Dict[str, str] = field(default_factory=dict) style_intent_profile: Optional[StyleIntentProfile] = None product_title_exclusion_profile: Optional[ProductTitleExclusionProfile] = None _text_analysis_cache: Optional[QueryTextAnalysisCache] = field(default=None, repr=False) def text_for_rerank(self) -> str: """See :func:`rerank_query_text`.""" return rerank_query_text( self.original_query, detected_language=self.detected_language, translations=self.translations, ) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation.""" return { "original_query": self.original_query, "query_normalized": self.query_normalized, "rewritten_query": self.rewritten_query, "detected_language": self.detected_language, "translations": self.translations, "has_query_vector": self.query_vector is not None, "has_image_query_vector": self.image_query_vector is not None, "query_tokens": self.query_tokens, "keywords_queries": dict(self.keywords_queries), "style_intent_profile": ( self.style_intent_profile.to_dict() if self.style_intent_profile is not None else None ), "product_title_exclusion_profile": ( self.product_title_exclusion_profile.to_dict() if self.product_title_exclusion_profile is not None else None ), } class QueryParser: """ Main query parser that processes queries through multiple stages: 1. Normalization 2. Query rewriting (brand/category mappings, synonyms) 3. Language detection 4. Translation to caller-provided target languages 5. Text embedding generation (for semantic search) """ def __init__( self, config: SearchConfig, text_encoder: Optional[TextEmbeddingEncoder] = None, image_encoder: Optional[CLIPImageEncoder] = None, translator: Optional[Any] = None, tokenizer: Optional[Callable[[str], Any]] = None, ): """ Initialize query parser. Args: config: SearchConfig instance text_encoder: Text embedding encoder (initialized at startup if not provided) translator: Translator instance (initialized at startup if not provided) """ self.config = config self._text_encoder = text_encoder self._image_encoder = image_encoder self._translator = translator # Initialize components self.normalizer = QueryNormalizer() self.language_detector = LanguageDetector() self.rewriter = QueryRewriter(config.query_config.rewrite_dictionary) self._tokenizer = tokenizer or self._build_tokenizer() self._keyword_extractor = KeywordExtractor(tokenizer=self._tokenizer) self.style_intent_registry = StyleIntentRegistry.from_query_config(config.query_config) self.style_intent_detector = StyleIntentDetector( self.style_intent_registry, tokenizer=self._tokenizer, ) self.product_title_exclusion_registry = ProductTitleExclusionRegistry.from_query_config( config.query_config ) self.product_title_exclusion_detector = ProductTitleExclusionDetector( self.product_title_exclusion_registry, tokenizer=self._tokenizer, ) # Eager initialization (startup-time failure visibility, no lazy init in request path) if self.config.query_config.enable_text_embedding and self._text_encoder is None: logger.info("Initializing text encoder at QueryParser construction...") self._text_encoder = TextEmbeddingEncoder() if self.config.query_config.image_embedding_field and self._image_encoder is None: logger.info("Initializing image encoder at QueryParser construction...") self._image_encoder = CLIPImageEncoder() if self._translator is None: from config.services_config import get_translation_config cfg = get_translation_config() logger.info( "Initializing translator client at QueryParser construction (service_url=%s, default_model=%s)...", cfg.get("service_url"), cfg.get("default_model"), ) self._translator = create_translation_client() @property def text_encoder(self) -> TextEmbeddingEncoder: """Return pre-initialized text encoder.""" return self._text_encoder @property def translator(self) -> Any: """Return pre-initialized translator.""" return self._translator @property def image_encoder(self) -> Optional[CLIPImageEncoder]: """Return pre-initialized image encoder for CLIP text embeddings.""" return self._image_encoder def _build_tokenizer(self) -> Callable[[str], Any]: """Build the tokenizer used by query parsing. No fallback path by design.""" if hanlp is None: raise RuntimeError("HanLP is required for QueryParser tokenization") logger.info("Initializing HanLP tokenizer...") tokenizer = hanlp.load(hanlp.pretrained.tok.FINE_ELECTRA_SMALL_ZH) tokenizer.config.output_spans = True logger.info("HanLP tokenizer initialized") return tokenizer @staticmethod def _pick_query_translation_model( source_lang: str, target_lang: str, config: SearchConfig, source_language_in_index: bool, ) -> str: """Pick the translation capability for query-time translation (configurable).""" src = str(source_lang or "").strip().lower() tgt = str(target_lang or "").strip().lower() qc = config.query_config if source_language_in_index: if src == "zh" and tgt == "en": return qc.zh_to_en_model if src == "en" and tgt == "zh": return qc.en_to_zh_model return qc.default_translation_model if src == "zh" and tgt == "en": return qc.zh_to_en_model_source_not_in_index or qc.zh_to_en_model if src == "en" and tgt == "zh": return qc.en_to_zh_model_source_not_in_index or qc.en_to_zh_model return qc.default_translation_model_source_not_in_index or qc.default_translation_model @staticmethod def _normalize_language_codes(languages: Optional[List[str]]) -> List[str]: normalized: List[str] = [] seen = set() for language in languages or []: token = str(language or "").strip().lower() if not token or token in seen: continue seen.add(token) normalized.append(token) return normalized @staticmethod def _extract_tokens(tokenizer_result: Any) -> List[str]: """Normalize tokenizer output into a flat token string list.""" return extract_token_strings(tokenizer_result) def _get_query_tokens(self, query: str) -> List[str]: return self._extract_tokens(self._tokenizer(query)) @staticmethod def _is_ascii_latin_query(text: str) -> bool: candidate = str(text or "").strip() if not candidate or contains_han_text(candidate): return False try: candidate.encode("ascii") except UnicodeEncodeError: return False return any(ch.isalpha() for ch in candidate) def _detect_query_language( self, query_text: str, *, target_languages: Optional[List[str]] = None, ) -> str: normalized_targets = self._normalize_language_codes(target_languages) supported_languages = self._normalize_language_codes( getattr(self.config.query_config, "supported_languages", None) ) active_languages = normalized_targets or supported_languages if active_languages and set(active_languages).issubset({"en", "zh"}): if self._is_ascii_latin_query(query_text): return "en" return self.language_detector.detect(query_text) def parse( self, query: str, tenant_id: Optional[str] = None, generate_vector: bool = True, context: Optional[Any] = None, target_languages: Optional[List[str]] = None, ) -> ParsedQuery: """ Parse query through all processing stages. Args: query: Raw query string tenant_id: Deprecated and ignored by QueryParser. Kept temporarily to avoid a wider refactor in this first step. generate_vector: Whether to generate query embedding context: Optional request context for tracking and logging target_languages: Translation target languages decided by the caller Returns: ParsedQuery object with all processing results """ parse_t0 = time.perf_counter() # Initialize logger if context provided active_logger = context.logger if context else logger if context and hasattr(context, "logger"): context.logger.info( f"Starting query parsing | Original query: '{query}' | Generate vector: {generate_vector}", extra={'reqid': context.reqid, 'uid': context.uid} ) def log_info(msg): if context and hasattr(context, 'logger'): context.logger.info(msg, extra={'reqid': context.reqid, 'uid': context.uid}) else: active_logger.info(msg) def log_debug(msg): if context and hasattr(context, 'logger'): context.logger.debug(msg, extra={'reqid': context.reqid, 'uid': context.uid}) else: active_logger.debug(msg) before_wait_t0 = time.perf_counter() # Stage 1: Normalize normalized = self.normalizer.normalize(query) log_debug(f"Normalization completed | '{query}' -> '{normalized}'") if context: context.store_intermediate_result('query_normalized', normalized) # Stage 2: Query rewriting query_text = normalized rewritten = normalized if self.config.query_config.rewrite_dictionary: # Enable rewrite if dictionary exists rewritten = self.rewriter.rewrite(query_text) if rewritten != query_text: log_info(f"Query rewritten | '{query_text}' -> '{rewritten}'") query_text = rewritten if context: context.store_intermediate_result('rewritten_query', rewritten) context.add_warning(f"Query was rewritten: {query_text}") normalized_targets = self._normalize_language_codes(target_languages) # Stage 3: Language detection detected_lang = self._detect_query_language( query_text, target_languages=normalized_targets, ) # Use default language if detection failed (None or "unknown") if not detected_lang or detected_lang == "unknown": detected_lang = self.config.query_config.default_language log_info(f"Language detection | Detected language: {detected_lang}") if context: context.store_intermediate_result('detected_language', detected_lang) text_analysis_cache = QueryTextAnalysisCache(tokenizer=self._tokenizer) for text_variant in (query, normalized, query_text): text_analysis_cache.set_language_hint(text_variant, detected_lang) # Stage 5: Translation + embedding. Parser only coordinates async enrichment work; the # caller decides translation targets and later search-field planning. translations: Dict[str, str] = {} future_to_task: Dict[Any, Tuple[str, Optional[str]]] = {} future_submit_at: Dict[Any, float] = {} async_executor: Optional[ThreadPoolExecutor] = None detected_norm = str(detected_lang or "").strip().lower() translation_targets = [lang for lang in normalized_targets if lang != detected_norm] source_language_in_index = bool(normalized_targets) and detected_norm in normalized_targets # Stage 6: Text embedding - async execution query_vector = None image_query_vector = None should_generate_embedding = ( generate_vector and self.config.query_config.enable_text_embedding ) should_generate_image_embedding = ( generate_vector and bool(self.config.query_config.image_embedding_field) ) task_count = ( len(translation_targets) + (1 if should_generate_embedding else 0) + (1 if should_generate_image_embedding else 0) ) if task_count > 0: async_executor = ThreadPoolExecutor( max_workers=max(1, min(task_count, 4)), thread_name_prefix="query-enrichment", ) try: if async_executor is not None: for lang in translation_targets: model_name = self._pick_query_translation_model( detected_lang, lang, self.config, source_language_in_index, ) log_debug( f"Submitting query translation | source={detected_lang} target={lang} model={model_name}" ) future = async_executor.submit( self.translator.translate, query_text, lang, detected_lang, "ecommerce_search_query", model_name, ) future_to_task[future] = ("translation", lang) future_submit_at[future] = time.perf_counter() if should_generate_embedding: if self.text_encoder is None: raise RuntimeError("Text embedding is enabled but text encoder is not initialized") log_debug("Submitting query vector generation") def _encode_query_vector() -> Optional[np.ndarray]: arr = self.text_encoder.encode( [query_text], priority=1, request_id=(context.reqid if context else None), user_id=(context.uid if context else None), ) if arr is None or len(arr) == 0: return None vec = arr[0] if vec is None: return None return np.asarray(vec, dtype=np.float32) future = async_executor.submit(_encode_query_vector) future_to_task[future] = ("embedding", None) future_submit_at[future] = time.perf_counter() if should_generate_image_embedding: if self.image_encoder is None: raise RuntimeError( "Image embedding field is configured but image encoder is not initialized" ) log_debug("Submitting CLIP text query vector generation") def _encode_image_query_vector() -> Optional[np.ndarray]: vec = self.image_encoder.encode_clip_text( query_text, normalize_embeddings=True, priority=1, request_id=(context.reqid if context else None), user_id=(context.uid if context else None), ) if vec is None: return None return np.asarray(vec, dtype=np.float32) future = async_executor.submit(_encode_image_query_vector) future_to_task[future] = ("image_embedding", None) future_submit_at[future] = time.perf_counter() except Exception as e: error_msg = f"Async query enrichment submission failed | Error: {str(e)}" log_info(error_msg) if context: context.add_warning(error_msg) if async_executor is not None: async_executor.shutdown(wait=False) async_executor = None future_to_task.clear() future_submit_at.clear() # Stage 4: Query analysis (tokenization) now overlaps with async enrichment work. query_tokenizer_result = text_analysis_cache.get_tokenizer_result(query_text) query_tokens = self._extract_tokens(query_tokenizer_result) log_debug(f"Query analysis | Query tokens: {query_tokens}") if context: context.store_intermediate_result('query_tokens', query_tokens) keywords_base_query = "" keywords_base_ms = 0.0 try: keywords_base_t0 = time.perf_counter() keywords_base_query = self._keyword_extractor.extract_keywords( query_text, language_hint=detected_lang, tokenizer_result=text_analysis_cache.get_tokenizer_result(query_text), ) keywords_base_ms = (time.perf_counter() - keywords_base_t0) * 1000.0 except Exception as e: log_info(f"Base keyword extraction failed | Error: {e}") before_wait_ms = (time.perf_counter() - before_wait_t0) * 1000.0 # Wait for translation + embedding concurrently; shared budget depends on whether # the detected language belongs to caller-provided target_languages. qc = self.config.query_config source_in_target_languages = bool(normalized_targets) and detected_norm in normalized_targets budget_ms = ( qc.translation_embedding_wait_budget_ms_source_in_index if source_in_target_languages else qc.translation_embedding_wait_budget_ms_source_not_in_index ) budget_sec = max(0.0, float(budget_ms) / 1000.0) if translation_targets: log_info( f"Translation+embedding shared wait budget | budget_ms={budget_ms} | " f"source_in_target_languages={source_in_target_languages} | " f"translation_targets={translation_targets}" ) if future_to_task: log_debug( f"Waiting for async tasks (translation+embedding) | budget_ms={budget_ms} | " f"source_in_target_languages={source_in_target_languages}" ) async_wait_t0 = time.perf_counter() done, not_done = wait(list(future_to_task.keys()), timeout=budget_sec) async_wait_ms = (time.perf_counter() - async_wait_t0) * 1000.0 for future in done: task_type, lang = future_to_task[future] t0 = future_submit_at.pop(future, None) elapsed_ms = (time.perf_counter() - t0) * 1000.0 if t0 is not None else 0.0 try: result = future.result() if task_type == "translation": if result: translations[lang] = result text_analysis_cache.set_language_hint(result, lang) if context: context.store_intermediate_result(f"translation_{lang}", result) elif task_type == "embedding": query_vector = result if query_vector is not None and context: context.store_intermediate_result("query_vector_shape", query_vector.shape) elif task_type == "image_embedding": image_query_vector = result if image_query_vector is not None and context: context.store_intermediate_result( "image_query_vector_shape", image_query_vector.shape, ) _log_async_enrichment_finished( log_info, task_type=task_type, summary=_async_enrichment_result_summary(task_type, lang, result), elapsed_ms=elapsed_ms, ) except Exception as e: _log_async_enrichment_finished( log_info, task_type=task_type, summary=f"error={e!s}", elapsed_ms=elapsed_ms, ) if context: context.add_warning(_async_enrichment_failure_warning(task_type, lang, e)) if not_done: for future in not_done: future_submit_at.pop(future, None) task_type, lang = future_to_task[future] if task_type == "translation": timeout_msg = ( f"Translation timeout (>{budget_ms}ms) | Language: {lang} | " f"Query text: '{query_text}'" ) elif task_type == "image_embedding": timeout_msg = ( f"CLIP text query vector generation timeout (>{budget_ms}ms), " "proceeding without image embedding result" ) else: timeout_msg = ( f"Query vector generation timeout (>{budget_ms}ms), proceeding without embedding result" ) log_info(timeout_msg) if context: context.add_warning(timeout_msg) if async_executor: async_executor.shutdown(wait=False) if translations and context: context.store_intermediate_result("translations", translations) else: async_wait_ms = 0.0 tail_sync_t0 = time.perf_counter() keywords_queries: Dict[str, str] = {} keyword_tail_ms = 0.0 try: keywords_t0 = time.perf_counter() keywords_queries = collect_keywords_queries( self._keyword_extractor, query_text, translations, source_language=detected_lang, text_analysis_cache=text_analysis_cache, base_keywords_query=keywords_base_query, ) keyword_tail_ms = (time.perf_counter() - keywords_t0) * 1000.0 if context: context.store_intermediate_result("keywords_queries", keywords_queries) log_info(f"Keyword extraction completed | keywords_queries={keywords_queries}") except Exception as e: log_info(f"Keyword extraction failed | Error: {e}") # Build result base_result = ParsedQuery( original_query=query, query_normalized=normalized, rewritten_query=query_text, detected_language=detected_lang, translations=translations, query_vector=query_vector, image_query_vector=image_query_vector, query_tokens=query_tokens, keywords_queries=keywords_queries, _text_analysis_cache=text_analysis_cache, ) style_intent_profile = self.style_intent_detector.detect(base_result) product_title_exclusion_profile = self.product_title_exclusion_detector.detect(base_result) tail_sync_ms = (time.perf_counter() - tail_sync_t0) * 1000.0 log_info( "Query parse stage timings | " f"before_wait_ms={before_wait_ms:.1f} | " f"async_wait_ms={async_wait_ms:.1f} | " f"base_keywords_ms={keywords_base_ms:.1f} | " f"keyword_tail_ms={keyword_tail_ms:.1f} | " f"tail_sync_ms={tail_sync_ms:.1f}" ) if context: context.store_intermediate_result( "style_intent_profile", style_intent_profile.to_dict(), ) context.store_intermediate_result( "product_title_exclusion_profile", product_title_exclusion_profile.to_dict(), ) result = ParsedQuery( original_query=query, query_normalized=normalized, rewritten_query=query_text, detected_language=detected_lang, translations=translations, query_vector=query_vector, image_query_vector=image_query_vector, query_tokens=query_tokens, keywords_queries=keywords_queries, style_intent_profile=style_intent_profile, product_title_exclusion_profile=product_title_exclusion_profile, _text_analysis_cache=text_analysis_cache, ) parse_total_ms = (time.perf_counter() - parse_t0) * 1000.0 completion_tail = ( f"Translation count: {len(translations)} | " f"Vector: {'yes' if query_vector is not None else 'no'} | " f"Image vector: {'yes' if image_query_vector is not None else 'no'} | " f"parse_total_ms={parse_total_ms:.1f}" ) if context and hasattr(context, 'logger'): context.logger.info( f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | " f"Language: {detected_lang} | {completion_tail}", extra={'reqid': context.reqid, 'uid': context.uid} ) else: logger.info( f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | " f"Language: {detected_lang} | {completion_tail}" ) return result def get_search_queries(self, parsed_query: ParsedQuery) -> List[str]: """ Get list of queries to search (original + translations). Args: parsed_query: Parsed query object Returns: List of query strings to search """ queries = [parsed_query.rewritten_query] # Add translations for lang, translation in parsed_query.translations.items(): if translation and translation != parsed_query.rewritten_query: queries.append(translation) return queries def detect_text_language_for_suggestions( text: str, *, index_languages: Optional[List[str]] = None, primary_language: str = "en", ) -> Tuple[str, float, str]: """ Language detection for short strings (mixed-language tags, query-log fallback). Uses the same ``LanguageDetector`` as :class:`QueryParser`. Returns a language code present in ``index_languages`` when possible, otherwise the tenant primary. Returns: (lang, confidence, source) where source is ``detector``, ``fallback``, or ``default``. """ langs_list = [x for x in (index_languages or []) if x] langs_set = set(langs_list) def _norm_lang(raw: Optional[str]) -> Optional[str]: if not raw: return None token = str(raw).strip().lower().replace("-", "_") if not token: return None if token in {"zh_tw", "pt_br"}: return token return token.split("_")[0] primary = _norm_lang(primary_language) or "en" if primary not in langs_set and langs_list: primary = _norm_lang(langs_list[0]) or langs_list[0] if not text or not str(text).strip(): return primary, 0.0, "default" raw_code = LanguageDetector().detect(str(text).strip()) if not raw_code or raw_code == "unknown": return primary, 0.35, "default" def _index_lang_base(cand: str) -> str: t = str(cand).strip().lower().replace("-", "_") return t.split("_")[0] if t else "" def _resolve_index_lang(code: str) -> Optional[str]: if code in langs_set: return code for cand in langs_list: if _index_lang_base(cand) == code: return cand return None if langs_list: resolved = _resolve_index_lang(raw_code) if resolved is None: return primary, 0.5, "fallback" return resolved, 0.92, "detector" return raw_code, 0.92, "detector"