"""
Query parser - main module for query processing.

Responsibilities are intentionally narrow:
- normalize and rewrite the incoming query
- detect language and tokenize with HanLP
- run translation and embedding requests concurrently
- return parser facts, not Elasticsearch language-planning data
"""

from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np
import logging
import re
from concurrent.futures import ThreadPoolExecutor, wait

from embeddings.text_encoder import TextEmbeddingEncoder
from config import SearchConfig
from translation import create_translation_client
from .language_detector import LanguageDetector
from .query_rewriter import QueryRewriter, QueryNormalizer

logger = logging.getLogger(__name__)

try:
    import hanlp  # type: ignore
except Exception:  # pragma: no cover
    hanlp = None

# Patterns are compiled once at import time instead of on every call.
# Runs of characters in U+4E00..U+9FFF, or Latin/digit/underscore runs joined by hyphens.
_SIMPLE_TOKEN_RE = re.compile(r"[\u4e00-\u9fff]+|[A-Za-z0-9_]+(?:-[A-Za-z0-9_]+)*")
# A single character in U+4E00..U+9FFF.
_CJK_RE = re.compile(r"[\u4e00-\u9fff]")
# Letters only, with optional internal hyphens.
_ENGLISH_WORD_RE = re.compile(r"[A-Za-z]+(?:-[A-Za-z]+)*")


def simple_tokenize_query(text: str) -> List[str]:
    """
    Lightweight tokenizer for suggestion-side heuristics only.

    - Consecutive CJK characters (U+4E00..U+9FFF) form one token
    - Latin / digit / underscore runs (with internal hyphens) form tokens

    Args:
        text: Raw text; ``None`` or empty input yields an empty list.

    Returns:
        Matched tokens in order of appearance. Characters matching neither
        rule (whitespace, punctuation, other scripts) are dropped.
    """
    if not text:
        return []
    return _SIMPLE_TOKEN_RE.findall(text)


@dataclass(slots=True)
class ParsedQuery:
    """Container for query parser facts."""

    original_query: str
    query_normalized: str
    rewritten_query: str
    detected_language: Optional[str] = None
    translations: Dict[str, str] = field(default_factory=dict)
    query_vector: Optional[np.ndarray] = None
    query_tokens: List[str] = field(default_factory=list)
    contains_chinese: bool = False
    contains_english: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary representation.

        ``query_vector`` is not included in the returned dictionary.
        """
        return {
            "original_query": self.original_query,
            "query_normalized": self.query_normalized,
            "rewritten_query": self.rewritten_query,
            "detected_language": self.detected_language,
            "translations": self.translations,
            "query_tokens": self.query_tokens,
            "contains_chinese": self.contains_chinese,
            "contains_english": self.contains_english,
        }


class QueryParser:
    """
    Main query parser that processes queries through multiple stages:
    1. Normalization
    2. Query rewriting (brand/category mappings, synonyms)
    3. Language detection
    4. Translation to caller-provided target languages
    5. Text embedding generation (for semantic search)
    """

    def __init__(
        self,
        config: SearchConfig,
        text_encoder: Optional[TextEmbeddingEncoder] = None,
        translator: Optional[Any] = None,
        tokenizer: Optional[Callable[[str], Any]] = None,
    ):
        """
        Initialize query parser.

        Args:
            config: SearchConfig instance
            text_encoder: Text embedding encoder (initialized at startup if not
                provided and text embedding is enabled in the config)
            translator: Translator instance (initialized at startup if not provided)
            tokenizer: Callable mapping a query string to tokenizer output; when
                omitted, the HanLP tokenizer is built via ``_build_tokenizer``

        Raises:
            RuntimeError: If no tokenizer is supplied and HanLP is unavailable.
        """
        self.config = config
        self._text_encoder = text_encoder
        self._translator = translator

        # Initialize components
        self.normalizer = QueryNormalizer()
        self.language_detector = LanguageDetector()
        self.rewriter = QueryRewriter(config.query_config.rewrite_dictionary)
        self._tokenizer = tokenizer or self._build_tokenizer()

        # Eager initialization (startup-time failure visibility, no lazy init in request path)
        if self.config.query_config.enable_text_embedding and self._text_encoder is None:
            logger.info("Initializing text encoder at QueryParser construction...")
            self._text_encoder = TextEmbeddingEncoder()

        if self._translator is None:
            from config.services_config import get_translation_config

            cfg = get_translation_config()
            logger.info(
                "Initializing translator client at QueryParser construction (service_url=%s, default_model=%s)...",
                cfg.get("service_url"),
                cfg.get("default_model"),
            )
            self._translator = create_translation_client()

    @property
    def text_encoder(self) -> Optional[TextEmbeddingEncoder]:
        """Return pre-initialized text encoder (``None`` if none was provided or created)."""
        return self._text_encoder

    @property
    def translator(self) -> Any:
        """Return pre-initialized translator."""
        return self._translator

    def _build_tokenizer(self) -> Callable[[str], Any]:
        """
        Build the tokenizer used by query parsing. No fallback path by design.

        Raises:
            RuntimeError: If the ``hanlp`` package could not be imported.
        """
        if hanlp is None:
            raise RuntimeError("HanLP is required for QueryParser tokenization")
        logger.info("Initializing HanLP tokenizer...")
        tokenizer = hanlp.load(hanlp.pretrained.tok.CTB9_TOK_ELECTRA_BASE_CRF)
        # NOTE(review): with output_spans enabled the tokenizer presumably yields
        # (token, start, end) items; _extract_tokens keeps element 0 of each.
        tokenizer.config.output_spans = True
        logger.info("HanLP tokenizer initialized")
        return tokenizer

    @staticmethod
    def _pick_query_translation_model(source_lang: str, target_lang: str, config: SearchConfig) -> str:
        """
        Pick the translation capability for query-time translation (configurable).

        Args:
            source_lang: Source language code (compared case-insensitively).
            target_lang: Target language code (compared case-insensitively).
            config: SearchConfig whose ``query_config`` holds the model names.

        Returns:
            The configured model name for the language pair.
        """
        src = str(source_lang or "").strip().lower()
        tgt = str(target_lang or "").strip().lower()

        # Use dedicated models for zh<->en if configured
        if src == "zh" and tgt == "en":
            return config.query_config.zh_to_en_model
        if src == "en" and tgt == "zh":
            return config.query_config.en_to_zh_model

        # For any other language pairs, fall back to the configurable default model.
        # By default this is `nllb-200-distilled-600m` (multi-lingual local model).
        return config.query_config.default_translation_model

    @staticmethod
    def _normalize_language_codes(languages: Optional[List[str]]) -> List[str]:
        """
        Lower-case, strip and de-duplicate language codes, preserving first-seen order.

        Args:
            languages: Raw language codes; ``None`` is treated as empty.

        Returns:
            Normalized codes with blanks and repeats removed.
        """
        normalized: List[str] = []
        seen = set()
        for language in languages or []:
            token = str(language or "").strip().lower()
            if not token or token in seen:
                continue
            seen.add(token)
            normalized.append(token)
        return normalized

    @staticmethod
    def _extract_tokens(tokenizer_result: Any) -> List[str]:
        """
        Normalize tokenizer output into a flat token string list.

        Accepts a single string, or an iterable whose items are strings,
        non-empty lists/tuples (element 0 is taken as the token) or other
        non-None objects (converted with ``str``). Tokens are stripped and
        blank ones are dropped.
        """
        if not tokenizer_result:
            return []
        if isinstance(tokenizer_result, str):
            single = tokenizer_result.strip()
            return [single] if single else []

        tokens: List[str] = []
        for item in tokenizer_result:
            text: Optional[str] = None
            if isinstance(item, str):
                text = item
            elif isinstance(item, (list, tuple)) and item:
                text = str(item[0])
            elif item is not None:
                text = str(item)

            # Empty list/tuple items and None fall through with no token.
            if text is None:
                continue
            text = text.strip()
            if text:
                tokens.append(text)
        return tokens

    def _get_query_tokens(self, query: str) -> List[str]:
        """Tokenize ``query`` with the configured tokenizer and flatten the output."""
        return self._extract_tokens(self._tokenizer(query))

    @staticmethod
    def _contains_cjk(text: str) -> bool:
        """Whether query contains any CJK ideograph (range U+4E00..U+9FFF)."""
        return bool(_CJK_RE.search(text or ""))

    @staticmethod
    def _is_pure_english_word_token(token: str) -> bool:
        """
        A tokenizer token counts as English iff it is letters only (optional internal hyphens)
        and length >= 3.
        """
        if not token or len(token) < 3:
            return False
        return bool(_ENGLISH_WORD_RE.fullmatch(token))

    def parse(
        self,
        query: str,
        tenant_id: Optional[str] = None,
        generate_vector: bool = True,
        context: Optional[Any] = None,
        target_languages: Optional[List[str]] = None,
    ) -> ParsedQuery:
        """
        Parse query through all processing stages.

        Args:
            query: Raw query string
            tenant_id: Deprecated and ignored by QueryParser. Kept temporarily to avoid
                a wider refactor in this first step.
            generate_vector: Whether to generate query embedding
            context: Optional request context for tracking and logging. When it exposes
                a ``logger`` attribute, that logger is used together with
                ``context.reqid`` / ``context.uid``; otherwise the module logger is used.
            target_languages: Translation target languages decided by the caller

        Returns:
            ParsedQuery object with all processing results. Translation or embedding
            tasks that fail or exceed the shared wait budget are recorded as warnings
            and left out of the result rather than raised.
        """
        # Resolve the request-scoped logger once. A context without a `logger`
        # attribute falls back to the module logger instead of raising.
        ctx_logger = getattr(context, "logger", None) if context else None

        def log_info(msg):
            if ctx_logger is not None:
                ctx_logger.info(msg, extra={'reqid': context.reqid, 'uid': context.uid})
            else:
                logger.info(msg)

        def log_debug(msg):
            if ctx_logger is not None:
                ctx_logger.debug(msg, extra={'reqid': context.reqid, 'uid': context.uid})
            else:
                logger.debug(msg)

        # The start message is only emitted on the request-scoped logger.
        if ctx_logger is not None:
            log_info(
                f"Starting query parsing | Original query: '{query}' | Generate vector: {generate_vector}"
            )

        # Stage 1: Normalize
        normalized = self.normalizer.normalize(query)
        log_debug(f"Normalization completed | '{query}' -> '{normalized}'")
        if context:
            context.store_intermediate_result('query_normalized', normalized)

        # Stage 2: Query rewriting
        query_text = normalized
        rewritten = normalized
        if self.config.query_config.rewrite_dictionary:  # Enable rewrite if dictionary exists
            rewritten = self.rewriter.rewrite(query_text)
            if rewritten != query_text:
                log_info(f"Query rewritten | '{query_text}' -> '{rewritten}'")
                query_text = rewritten
                if context:
                    context.store_intermediate_result('rewritten_query', rewritten)
                    context.add_warning(f"Query was rewritten: {query_text}")

        # Stage 3: Language detection
        detected_lang = self.language_detector.detect(query_text)
        # Use default language if detection failed (None or "unknown")
        if not detected_lang or detected_lang == "unknown":
            detected_lang = self.config.query_config.default_language
        log_info(f"Language detection | Detected language: {detected_lang}")
        if context:
            context.store_intermediate_result('detected_language', detected_lang)

        # Stage 4: Query analysis (tokenization + script flags)
        query_tokens = self._get_query_tokens(query_text)
        contains_chinese = self._contains_cjk(query_text)
        contains_english = any(self._is_pure_english_word_token(t) for t in query_tokens)

        log_debug(
            f"Query analysis | Query tokens: {query_tokens} | "
            f"contains_chinese={contains_chinese} | contains_english={contains_english}"
        )
        if context:
            context.store_intermediate_result('query_tokens', query_tokens)
            context.store_intermediate_result('contains_chinese', contains_chinese)
            context.store_intermediate_result('contains_english', contains_english)

        # Stage 5: Translation + embedding. Parser only coordinates async enrichment work; the
        # caller decides translation targets and later search-field planning.
        translations: Dict[str, str] = {}
        # Insertion order of this dict is the submission order of the tasks.
        future_to_task: Dict[Any, Tuple[str, Optional[str]]] = {}
        async_executor: Optional[ThreadPoolExecutor] = None
        detected_norm = str(detected_lang or "").strip().lower()
        normalized_targets = self._normalize_language_codes(target_languages)
        # Translating into the detected source language would be a no-op.
        translation_targets = [lang for lang in normalized_targets if lang != detected_norm]

        # Stage 6: Text embedding - async execution
        query_vector = None
        should_generate_embedding = (
            generate_vector and self.config.query_config.enable_text_embedding
        )

        task_count = len(translation_targets) + (1 if should_generate_embedding else 0)
        if task_count > 0:
            async_executor = ThreadPoolExecutor(
                max_workers=max(1, min(task_count, 4)),
                thread_name_prefix="query-enrichment",
            )

        try:
            if async_executor is not None:
                for lang in translation_targets:
                    model_name = self._pick_query_translation_model(detected_lang, lang, self.config)
                    log_debug(
                        f"Submitting query translation | source={detected_lang} target={lang} model={model_name}"
                    )
                    future = async_executor.submit(
                        self.translator.translate,
                        query_text,
                        lang,
                        detected_lang,
                        "ecommerce_search_query",
                        model_name,
                    )
                    future_to_task[future] = ("translation", lang)

                if should_generate_embedding:
                    if self.text_encoder is None:
                        raise RuntimeError("Text embedding is enabled but text encoder is not initialized")
                    log_debug("Submitting query vector generation")

                    def _encode_query_vector() -> Optional[np.ndarray]:
                        arr = self.text_encoder.encode([query_text], priority=1)
                        if arr is None or len(arr) == 0:
                            return None
                        vec = arr[0]
                        if vec is None:
                            return None
                        return np.asarray(vec, dtype=np.float32)

                    future = async_executor.submit(_encode_query_vector)
                    future_to_task[future] = ("embedding", None)
        except Exception as e:
            # Best effort: a submission failure drops all enrichment for this
            # request, including tasks that were already submitted.
            error_msg = f"Async query enrichment submission failed | Error: {str(e)}"
            log_info(error_msg)
            if context:
                context.add_warning(error_msg)
            if async_executor is not None:
                async_executor.shutdown(wait=False)
                async_executor = None
            future_to_task.clear()

        # Wait for translation + embedding concurrently; shared budget depends on whether
        # the detected language belongs to caller-provided target_languages.
        qc = self.config.query_config
        source_in_target_languages = bool(normalized_targets) and detected_norm in normalized_targets
        budget_ms = (
            qc.translation_embedding_wait_budget_ms_source_in_index
            if source_in_target_languages
            else qc.translation_embedding_wait_budget_ms_source_not_in_index
        )
        budget_sec = max(0.0, float(budget_ms) / 1000.0)

        if translation_targets:
            log_info(
                f"Translation+embedding shared wait budget | budget_ms={budget_ms} | "
                f"source_in_target_languages={source_in_target_languages} | "
                f"translation_targets={translation_targets}"
            )

        if future_to_task:
            log_debug(
                f"Waiting for async tasks (translation+embedding) | budget_ms={budget_ms} | "
                f"source_in_target_languages={source_in_target_languages}"
            )
            done, not_done = wait(list(future_to_task.keys()), timeout=budget_sec)

            # `done` / `not_done` are sets with arbitrary order; walk the tasks in
            # submission order so `translations` and the emitted warnings come out
            # in a stable order.
            for future, (task_type, lang) in future_to_task.items():
                if future not in done:
                    continue
                try:
                    task_result = future.result()
                    if task_type == "translation":
                        # Empty/None translations are silently skipped.
                        if task_result:
                            translations[lang] = task_result
                            log_info(
                                f"Translation completed | Query text: '{query_text}' | "
                                f"Target language: {lang} | Translation result: '{task_result}'"
                            )
                            if context:
                                context.store_intermediate_result(f"translation_{lang}", task_result)
                    elif task_type == "embedding":
                        query_vector = task_result
                        if query_vector is not None:
                            log_debug(f"Query vector generation completed | Shape: {query_vector.shape}")
                            if context:
                                context.store_intermediate_result("query_vector_shape", query_vector.shape)
                        else:
                            log_info(
                                "Query vector generation completed but result is None, will process without vector"
                            )
                except Exception as e:
                    if task_type == "translation":
                        error_msg = f"Translation failed | Language: {lang} | Error: {str(e)}"
                    else:
                        error_msg = f"Query vector generation failed | Error: {str(e)}"
                    log_info(error_msg)
                    if context:
                        context.add_warning(error_msg)

            if not_done:
                for future, (task_type, lang) in future_to_task.items():
                    if future not in not_done:
                        continue
                    if task_type == "translation":
                        timeout_msg = (
                            f"Translation timeout (>{budget_ms}ms) | Language: {lang} | "
                            f"Query text: '{query_text}'"
                        )
                    else:
                        timeout_msg = (
                            f"Query vector generation timeout (>{budget_ms}ms), proceeding without embedding result"
                        )
                    log_info(timeout_msg)
                    if context:
                        context.add_warning(timeout_msg)

        # Do not block the request on unfinished tasks; the executor is released
        # without waiting for them.
        if async_executor:
            async_executor.shutdown(wait=False)

        if translations and context:
            context.store_intermediate_result("translations", translations)

        # Build result
        parsed = ParsedQuery(
            original_query=query,
            query_normalized=normalized,
            rewritten_query=query_text,
            detected_language=detected_lang,
            translations=translations,
            query_vector=query_vector,
            query_tokens=query_tokens,
            contains_chinese=contains_chinese,
            contains_english=contains_english,
        )

        if ctx_logger is not None:
            log_info(
                f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | "
                f"Translation count: {len(translations)} | Vector: {'yes' if query_vector is not None else 'no'}"
            )
        else:
            logger.info(
                f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | "
                f"Language: {detected_lang}"
            )

        return parsed

    def get_search_queries(self, parsed_query: ParsedQuery) -> List[str]:
        """
        Get list of queries to search (original + translations).

        Args:
            parsed_query: Parsed query object

        Returns:
            List of query strings to search: the rewritten query first, followed by
            each non-empty translation that differs from it.
        """
        queries = [parsed_query.rewritten_query]

        # Add translations
        for translation in parsed_query.translations.values():
            if translation and translation != parsed_query.rewritten_query:
                queries.append(translation)

        return queries


def detect_text_language_for_suggestions(
    text: str,
    *,
    index_languages: Optional[List[str]] = None,
    primary_language: str = "en",
) -> Tuple[str, float, str]:
    """
    Language detection for short strings (mixed-language tags, query-log fallback).

    Uses the same ``LanguageDetector`` class as :class:`QueryParser`. Returns a language
    code present in ``index_languages`` when possible, otherwise the tenant primary.

    Args:
        text: Text to classify; blank input short-circuits to the primary language.
        index_languages: Language codes of the index; falsy entries are ignored.
        primary_language: Preferred fallback language code.

    Returns:
        (lang, confidence, source) where source is ``detector``, ``fallback``, or ``default``.
        The confidence values are fixed constants chosen per outcome, not detector scores.
    """
    langs_list = [x for x in (index_languages or []) if x]
    langs_set = set(langs_list)

    def _norm_lang(raw: Optional[str]) -> Optional[str]:
        """Lower-case a code and reduce it to its base language, keeping zh_tw / pt_br whole."""
        if not raw:
            return None
        token = str(raw).strip().lower().replace("-", "_")
        if not token:
            return None
        if token in {"zh_tw", "pt_br"}:
            return token
        return token.split("_")[0]

    primary = _norm_lang(primary_language) or "en"
    # If the primary language is not one of the index languages, use the first index language.
    if primary not in langs_set and langs_list:
        primary = _norm_lang(langs_list[0]) or langs_list[0]

    if not text or not str(text).strip():
        return primary, 0.0, "default"

    raw_code = LanguageDetector().detect(str(text).strip())
    if not raw_code or raw_code == "unknown":
        return primary, 0.35, "default"

    def _index_lang_base(cand: str) -> str:
        """Base language of an index code (text before the first '-' or '_'), lower-cased."""
        t = str(cand).strip().lower().replace("-", "_")
        return t.split("_")[0] if t else ""

    def _resolve_index_lang(code: str) -> Optional[str]:
        """Map a detected code to an index language: exact match first, then base-language match."""
        if code in langs_set:
            return code
        for cand in langs_list:
            if _index_lang_base(cand) == code:
                return cand
        return None

    if langs_list:
        resolved = _resolve_index_lang(raw_code)
        if resolved is None:
            # Detected language is not indexed; fall back to the primary language.
            return primary, 0.5, "fallback"
        return resolved, 0.92, "detector"

    # No index languages supplied: return the detector output as-is.
    return raw_code, 0.92, "detector"