"""
Query parser - main module for query processing.

Responsibilities are intentionally narrow:
- normalize and rewrite the incoming query
- detect language and tokenize with HanLP
- run translation and embedding requests concurrently
- return parser facts, not Elasticsearch language-planning data
"""

from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np
import logging
from concurrent.futures import ThreadPoolExecutor, wait

from embeddings.text_encoder import TextEmbeddingEncoder
from config import SearchConfig
from translation import create_translation_client
from .language_detector import LanguageDetector
from .query_rewriter import QueryRewriter, QueryNormalizer
from .style_intent import StyleIntentDetector, StyleIntentProfile, StyleIntentRegistry
from .tokenization import extract_token_strings, simple_tokenize_query

logger = logging.getLogger(__name__)

import hanlp  # type: ignore


@dataclass(slots=True)
class ParsedQuery:
    """Container for query parser facts."""

    original_query: str
    query_normalized: str
    rewritten_query: str
    detected_language: Optional[str] = None
    translations: Dict[str, str] = field(default_factory=dict)
    query_vector: Optional[np.ndarray] = None
    query_tokens: List[str] = field(default_factory=list)
    style_intent_profile: Optional[StyleIntentProfile] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary representation.

        Note that ``query_vector`` is not part of the returned dictionary.

        Returns:
            Dictionary with the textual parser facts and, when present, the
            ``to_dict()`` form of the style intent profile (otherwise ``None``).
        """
        profile = self.style_intent_profile
        return {
            "original_query": self.original_query,
            "query_normalized": self.query_normalized,
            "rewritten_query": self.rewritten_query,
            "detected_language": self.detected_language,
            "translations": self.translations,
            "query_tokens": self.query_tokens,
            "style_intent_profile": profile.to_dict() if profile is not None else None,
        }


class QueryParser:
    """
    Main query parser that processes queries through multiple stages:
    1. Normalization
    2. Query rewriting (brand/category mappings, synonyms)
    3. Language detection
    4. Translation to caller-provided target languages
    5. Text embedding generation (for semantic search)
    """

    def __init__(
        self,
        config: SearchConfig,
        text_encoder: Optional[TextEmbeddingEncoder] = None,
        translator: Optional[Any] = None,
        tokenizer: Optional[Callable[[str], Any]] = None,
    ):
        """
        Initialize query parser.

        Args:
            config: SearchConfig instance
            text_encoder: Text embedding encoder (initialized at startup if not provided
                and text embedding is enabled in the query config)
            translator: Translator instance (initialized at startup if not provided)
            tokenizer: Callable turning a query string into tokenizer output
                (the HanLP tokenizer is loaded at startup if not provided)
        """
        self.config = config
        self._text_encoder = text_encoder
        self._translator = translator

        # Initialize components
        self.normalizer = QueryNormalizer()
        self.language_detector = LanguageDetector()
        self.rewriter = QueryRewriter(config.query_config.rewrite_dictionary)
        self._tokenizer = tokenizer or self._build_tokenizer()
        self.style_intent_registry = StyleIntentRegistry.from_query_config(config.query_config)
        self.style_intent_detector = StyleIntentDetector(
            self.style_intent_registry,
            tokenizer=self._tokenizer,
        )

        # Eager initialization (startup-time failure visibility, no lazy init in request path)
        if self.config.query_config.enable_text_embedding and self._text_encoder is None:
            logger.info("Initializing text encoder at QueryParser construction...")
            self._text_encoder = TextEmbeddingEncoder()

        if self._translator is None:
            from config.services_config import get_translation_config

            cfg = get_translation_config()
            logger.info(
                "Initializing translator client at QueryParser construction (service_url=%s, default_model=%s)...",
                cfg.get("service_url"),
                cfg.get("default_model"),
            )
            self._translator = create_translation_client()

    @property
    def text_encoder(self) -> Optional[TextEmbeddingEncoder]:
        """Return pre-initialized text encoder (``None`` when none was provided or created)."""
        return self._text_encoder

    @property
    def translator(self) -> Any:
        """Return pre-initialized translator."""
        return self._translator

    def _build_tokenizer(self) -> Callable[[str], Any]:
        """
        Build the tokenizer used by query parsing. No fallback path by design.

        Returns:
            The loaded HanLP tokenizer with ``output_spans`` switched on.

        Raises:
            RuntimeError: If the ``hanlp`` module object is ``None``.
        """
        if hanlp is None:
            raise RuntimeError("HanLP is required for QueryParser tokenization")
        logger.info("Initializing HanLP tokenizer...")
        tokenizer = hanlp.load(hanlp.pretrained.tok.CTB9_TOK_ELECTRA_BASE_CRF)
        tokenizer.config.output_spans = True
        logger.info("HanLP tokenizer initialized")
        return tokenizer

    @staticmethod
    def _pick_query_translation_model(
        source_lang: str,
        target_lang: str,
        config: SearchConfig,
        source_language_in_index: bool,
    ) -> str:
        """
        Pick the translation capability for query-time translation (configurable).

        Args:
            source_lang: Language of the query text (compared case-insensitively)
            target_lang: Language to translate into (compared case-insensitively)
            config: SearchConfig whose ``query_config`` holds the model settings
            source_language_in_index: Whether the source language is one of the
                caller-provided target languages

        Returns:
            The configured model for the zh->en / en->zh pair, or the default model.
            When the source language is not in the index, the dedicated
            ``*_source_not_in_index`` setting wins if it is set (truthy).
        """
        src = str(source_lang or "").strip().lower()
        tgt = str(target_lang or "").strip().lower()
        qc = config.query_config
        zh_to_en = src == "zh" and tgt == "en"
        en_to_zh = src == "en" and tgt == "zh"

        if source_language_in_index:
            if zh_to_en:
                return qc.zh_to_en_model
            if en_to_zh:
                return qc.en_to_zh_model
            return qc.default_translation_model

        if zh_to_en:
            return qc.zh_to_en_model_source_not_in_index or qc.zh_to_en_model
        if en_to_zh:
            return qc.en_to_zh_model_source_not_in_index or qc.en_to_zh_model
        return qc.default_translation_model_source_not_in_index or qc.default_translation_model

    @staticmethod
    def _normalize_language_codes(languages: Optional[List[str]]) -> List[str]:
        """
        Strip and lower-case language codes, dropping blanks and duplicates.

        Args:
            languages: Raw language codes; ``None`` is treated as an empty list

        Returns:
            Normalized codes in first-seen order.
        """
        normalized: List[str] = []
        seen = set()
        for language in languages or []:
            token = str(language or "").strip().lower()
            if not token or token in seen:
                continue
            seen.add(token)
            normalized.append(token)
        return normalized

    @staticmethod
    def _extract_tokens(tokenizer_result: Any) -> List[str]:
        """Normalize tokenizer output into a flat token string list."""
        return extract_token_strings(tokenizer_result)

    def _get_query_tokens(self, query: str) -> List[str]:
        """Tokenize ``query`` with the configured tokenizer and return the token strings."""
        return self._extract_tokens(self._tokenizer(query))

    def parse(
        self,
        query: str,
        tenant_id: Optional[str] = None,
        generate_vector: bool = True,
        context: Optional[Any] = None,
        target_languages: Optional[List[str]] = None,
    ) -> ParsedQuery:
        """
        Parse query through all processing stages.

        Args:
            query: Raw query string
            tenant_id: Deprecated and ignored by QueryParser. Kept temporarily to avoid
                a wider refactor in this first step.
            generate_vector: Whether to generate query embedding
            context: Optional request context for tracking and logging
            target_languages: Translation target languages decided by the caller

        Returns:
            ParsedQuery object with all processing results. Translation or embedding
            tasks that fail or exceed the shared wait budget are reported as
            log lines / context warnings and simply left out of the result.
        """
        # Resolve the request logger once. A context without a usable ``logger``
        # attribute falls back to the module logger instead of raising AttributeError.
        ctx_logger = getattr(context, "logger", None) if context else None

        def log_extra() -> Dict[str, Any]:
            return {'reqid': context.reqid, 'uid': context.uid}

        def log_info(msg: str) -> None:
            if ctx_logger is not None:
                ctx_logger.info(msg, extra=log_extra())
            else:
                logger.info(msg)

        def log_debug(msg: str) -> None:
            if ctx_logger is not None:
                ctx_logger.debug(msg, extra=log_extra())
            else:
                logger.debug(msg)

        # The start line is only emitted for requests that carry their own logger.
        if ctx_logger is not None:
            ctx_logger.info(
                f"Starting query parsing | Original query: '{query}' | Generate vector: {generate_vector}",
                extra=log_extra(),
            )

        # Stage 1: Normalize
        normalized = self.normalizer.normalize(query)
        log_debug(f"Normalization completed | '{query}' -> '{normalized}'")
        if context:
            context.store_intermediate_result('query_normalized', normalized)

        # Stage 2: Query rewriting
        query_text = normalized
        rewritten = normalized
        if self.config.query_config.rewrite_dictionary:  # Enable rewrite if dictionary exists
            rewritten = self.rewriter.rewrite(query_text)
            if rewritten != query_text:
                log_info(f"Query rewritten | '{query_text}' -> '{rewritten}'")
                query_text = rewritten
                if context:
                    context.store_intermediate_result('rewritten_query', rewritten)
                    context.add_warning(f"Query was rewritten: {query_text}")

        # Stage 3: Language detection
        detected_lang = self.language_detector.detect(query_text)
        # Use default language if detection failed (None or "unknown")
        if not detected_lang or detected_lang == "unknown":
            detected_lang = self.config.query_config.default_language
        log_info(f"Language detection | Detected language: {detected_lang}")
        if context:
            context.store_intermediate_result('detected_language', detected_lang)

        # Stage 4: Query analysis (tokenization)
        query_tokens = self._get_query_tokens(query_text)
        log_debug(f"Query analysis | Query tokens: {query_tokens}")
        if context:
            context.store_intermediate_result('query_tokens', query_tokens)

        # Stage 5: Translation + embedding. Parser only coordinates async enrichment work; the
        # caller decides translation targets and later search-field planning.
        translations: Dict[str, str] = {}
        future_to_task: Dict[Any, Tuple[str, Optional[str]]] = {}
        async_executor: Optional[ThreadPoolExecutor] = None

        detected_norm = str(detected_lang or "").strip().lower()
        normalized_targets = self._normalize_language_codes(target_languages)
        translation_targets = [lang for lang in normalized_targets if lang != detected_norm]
        # Drives both the translation model choice and the wait budget below.
        source_language_in_index = detected_norm in normalized_targets

        # Stage 6: Text embedding - async execution
        query_vector = None
        should_generate_embedding = (
            generate_vector and self.config.query_config.enable_text_embedding
        )

        task_count = len(translation_targets) + (1 if should_generate_embedding else 0)
        if task_count > 0:
            async_executor = ThreadPoolExecutor(
                max_workers=max(1, min(task_count, 4)),
                thread_name_prefix="query-enrichment",
            )

        try:
            if async_executor is not None:
                for lang in translation_targets:
                    model_name = self._pick_query_translation_model(
                        detected_lang,
                        lang,
                        self.config,
                        source_language_in_index,
                    )
                    log_debug(
                        f"Submitting query translation | source={detected_lang} target={lang} model={model_name}"
                    )
                    future = async_executor.submit(
                        self.translator.translate,
                        query_text,
                        lang,
                        detected_lang,
                        "ecommerce_search_query",
                        model_name,
                    )
                    future_to_task[future] = ("translation", lang)

                if should_generate_embedding:
                    if self.text_encoder is None:
                        raise RuntimeError("Text embedding is enabled but text encoder is not initialized")
                    log_debug("Submitting query vector generation")

                    def _encode_query_vector() -> Optional[np.ndarray]:
                        arr = self.text_encoder.encode(
                            [query_text],
                            priority=1,
                            request_id=(context.reqid if context else None),
                            user_id=(context.uid if context else None),
                        )
                        if arr is None or len(arr) == 0:
                            return None
                        vec = arr[0]
                        if vec is None:
                            return None
                        return np.asarray(vec, dtype=np.float32)

                    future = async_executor.submit(_encode_query_vector)
                    future_to_task[future] = ("embedding", None)
        except Exception as e:
            # Best effort: a submission failure degrades to "no enrichment" for this request.
            error_msg = f"Async query enrichment submission failed | Error: {e}"
            log_info(error_msg)
            if context:
                context.add_warning(error_msg)
            if async_executor is not None:
                async_executor.shutdown(wait=False)
                async_executor = None
            future_to_task.clear()

        # Wait for translation + embedding concurrently; shared budget depends on whether
        # the detected language belongs to caller-provided target_languages.
        qc = self.config.query_config
        budget_ms = (
            qc.translation_embedding_wait_budget_ms_source_in_index
            if source_language_in_index
            else qc.translation_embedding_wait_budget_ms_source_not_in_index
        )
        budget_sec = max(0.0, float(budget_ms) / 1000.0)

        if translation_targets:
            log_info(
                f"Translation+embedding shared wait budget | budget_ms={budget_ms} | "
                f"source_in_target_languages={source_language_in_index} | "
                f"translation_targets={translation_targets}"
            )

        if future_to_task:
            log_debug(
                f"Waiting for async tasks (translation+embedding) | budget_ms={budget_ms} | "
                f"source_in_target_languages={source_language_in_index}"
            )
            done, not_done = wait(list(future_to_task.keys()), timeout=budget_sec)

            for future in done:
                task_type, lang = future_to_task[future]
                try:
                    task_result = future.result()
                    if task_type == "translation":
                        # Empty / None translations are dropped silently.
                        if task_result:
                            translations[lang] = task_result
                            log_info(
                                f"Translation completed | Query text: '{query_text}' | "
                                f"Target language: {lang} | Translation result: '{task_result}'"
                            )
                            if context:
                                context.store_intermediate_result(f"translation_{lang}", task_result)
                    elif task_type == "embedding":
                        query_vector = task_result
                        if query_vector is not None:
                            log_debug(f"Query vector generation completed | Shape: {query_vector.shape}")
                            if context:
                                context.store_intermediate_result("query_vector_shape", query_vector.shape)
                        else:
                            log_info(
                                "Query vector generation completed but result is None, will process without vector"
                            )
                except Exception as e:
                    if task_type == "translation":
                        error_msg = f"Translation failed | Language: {lang} | Error: {e}"
                    else:
                        error_msg = f"Query vector generation failed | Error: {e}"
                    log_info(error_msg)
                    if context:
                        context.add_warning(error_msg)

            # Tasks still running when the budget expired are reported and then ignored.
            for future in not_done:
                task_type, lang = future_to_task[future]
                if task_type == "translation":
                    timeout_msg = (
                        f"Translation timeout (>{budget_ms}ms) | Language: {lang} | "
                        f"Query text: '{query_text}'"
                    )
                else:
                    timeout_msg = (
                        f"Query vector generation timeout (>{budget_ms}ms), proceeding without embedding result"
                    )
                log_info(timeout_msg)
                if context:
                    context.add_warning(timeout_msg)

        # Do not block the request on unfinished work.
        if async_executor:
            async_executor.shutdown(wait=False)

        if translations and context:
            context.store_intermediate_result("translations", translations)

        # Build result. The style intent detector receives a ParsedQuery without a
        # profile; the returned object carries the same facts plus the detected profile.
        base_result = ParsedQuery(
            original_query=query,
            query_normalized=normalized,
            rewritten_query=query_text,
            detected_language=detected_lang,
            translations=translations,
            query_vector=query_vector,
            query_tokens=query_tokens,
        )
        style_intent_profile = self.style_intent_detector.detect(base_result)
        if context:
            context.store_intermediate_result(
                "style_intent_profile",
                style_intent_profile.to_dict(),
            )

        parsed = ParsedQuery(
            original_query=query,
            query_normalized=normalized,
            rewritten_query=query_text,
            detected_language=detected_lang,
            translations=translations,
            query_vector=query_vector,
            query_tokens=query_tokens,
            style_intent_profile=style_intent_profile,
        )

        if ctx_logger is not None:
            ctx_logger.info(
                f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | "
                f"Translation count: {len(translations)} | Vector: {'yes' if query_vector is not None else 'no'}",
                extra=log_extra(),
            )
        else:
            logger.info(
                f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | "
                f"Language: {detected_lang}"
            )

        return parsed

    def get_search_queries(self, parsed_query: ParsedQuery) -> List[str]:
        """
        Get list of queries to search (original + translations).

        Args:
            parsed_query: Parsed query object

        Returns:
            List of query strings to search: the rewritten query first, followed by
            every non-empty translation that differs from it.
        """
        base_query = parsed_query.rewritten_query
        queries = [base_query]
        # Add translations
        queries.extend(
            translation
            for translation in parsed_query.translations.values()
            if translation and translation != base_query
        )
        return queries


def detect_text_language_for_suggestions(
    text: str,
    *,
    index_languages: Optional[List[str]] = None,
    primary_language: str = "en",
) -> Tuple[str, float, str]:
    """
    Language detection for short strings (mixed-language tags, query-log fallback).

    Uses the same ``LanguageDetector`` as :class:`QueryParser`. Returns a language
    code present in ``index_languages`` when possible, otherwise the tenant primary.

    Args:
        text: Text to classify; empty or blank text yields the primary language
        index_languages: Language codes of the index; falsy entries are ignored
        primary_language: Preferred fallback language code

    Returns:
        (lang, confidence, source) where source is ``detector``, ``fallback``, or ``default``.
    """
    langs_list = [x for x in (index_languages or []) if x]
    langs_set = set(langs_list)

    def _norm_lang(raw: Optional[str]) -> Optional[str]:
        # Lower-case, use "_" as separator, and keep only the base language
        # except for the regional variants listed explicitly below.
        if not raw:
            return None
        token = str(raw).strip().lower().replace("-", "_")
        if not token:
            return None
        if token in {"zh_tw", "pt_br"}:
            return token
        return token.split("_")[0]

    primary = _norm_lang(primary_language) or "en"
    # When the primary is not an index language, fall back to the first index language.
    if primary not in langs_set and langs_list:
        primary = _norm_lang(langs_list[0]) or langs_list[0]

    if not text or not str(text).strip():
        return primary, 0.0, "default"

    raw_code = LanguageDetector().detect(str(text).strip())
    if not raw_code or raw_code == "unknown":
        return primary, 0.35, "default"

    def _index_lang_base(cand: str) -> str:
        # Base language of an index entry, e.g. the part before "_" / "-".
        t = str(cand).strip().lower().replace("-", "_")
        return t.split("_")[0] if t else ""

    def _resolve_index_lang(code: str) -> Optional[str]:
        # Exact match first, then the first index entry sharing the base language.
        if code in langs_set:
            return code
        for cand in langs_list:
            if _index_lang_base(cand) == code:
                return cand
        return None

    if langs_list:
        resolved = _resolve_index_lang(raw_code)
        if resolved is None:
            return primary, 0.5, "fallback"
        return resolved, 0.92, "detector"

    return raw_code, 0.92, "detector"