""" Query parser - main module for query processing. Handles query rewriting, translation, and embedding generation. """ from typing import Dict, List, Optional, Any, Union, Tuple import numpy as np import logging import re from concurrent.futures import ThreadPoolExecutor, wait from embeddings.text_encoder import TextEmbeddingEncoder from config import SearchConfig from translation import create_translation_client from .language_detector import LanguageDetector from .query_rewriter import QueryRewriter, QueryNormalizer logger = logging.getLogger(__name__) try: import hanlp # type: ignore except Exception: # pragma: no cover hanlp = None def simple_tokenize_query(text: str) -> List[str]: """ Lightweight tokenizer for suggestion length / analysis (aligned with QueryParser fallback). - Consecutive CJK characters form one token - Latin / digit runs (with internal hyphens) form tokens """ if not text: return [] pattern = re.compile(r"[\u4e00-\u9fff]+|[A-Za-z0-9_]+(?:-[A-Za-z0-9_]+)*") return pattern.findall(text) class ParsedQuery: """Container for parsed query results.""" def __init__( self, original_query: str, query_normalized: str, rewritten_query: Optional[str] = None, detected_language: Optional[str] = None, translations: Dict[str, str] = None, query_vector: Optional[np.ndarray] = None, domain: str = "default", keywords: str = "", token_count: int = 0, query_tokens: Optional[List[str]] = None, query_text_by_lang: Optional[Dict[str, str]] = None, search_langs: Optional[List[str]] = None, index_languages: Optional[List[str]] = None, source_in_index_languages: bool = True, contains_chinese: bool = False, contains_english: bool = False, ): self.original_query = original_query self.query_normalized = query_normalized self.rewritten_query = rewritten_query or query_normalized self.detected_language = detected_language self.translations = translations or {} self.query_vector = query_vector self.domain = domain # Query analysis fields self.keywords = keywords self.token_count = token_count self.query_tokens = query_tokens or [] self.query_text_by_lang = query_text_by_lang or {} self.search_langs = search_langs or [] self.index_languages = index_languages or [] self.source_in_index_languages = bool(source_in_index_languages) self.contains_chinese = bool(contains_chinese) self.contains_english = bool(contains_english) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation.""" result = { "original_query": self.original_query, "query_normalized": self.query_normalized, "rewritten_query": self.rewritten_query, "detected_language": self.detected_language, "translations": self.translations, "domain": self.domain } result["query_text_by_lang"] = self.query_text_by_lang result["search_langs"] = self.search_langs result["index_languages"] = self.index_languages result["source_in_index_languages"] = self.source_in_index_languages result["contains_chinese"] = self.contains_chinese result["contains_english"] = self.contains_english return result class QueryParser: """ Main query parser that processes queries through multiple stages: 1. Normalization 2. Query rewriting (brand/category mappings, synonyms) 3. Language detection 4. Translation to target languages 5. Text embedding generation (for semantic search) """ def __init__( self, config: SearchConfig, text_encoder: Optional[TextEmbeddingEncoder] = None, translator: Optional[Any] = None ): """ Initialize query parser. Args: config: SearchConfig instance text_encoder: Text embedding encoder (initialized at startup if not provided) translator: Translator instance (initialized at startup if not provided) """ self.config = config self._text_encoder = text_encoder self._translator = translator # Initialize components self.normalizer = QueryNormalizer() self.language_detector = LanguageDetector() self.rewriter = QueryRewriter(config.query_config.rewrite_dictionary) # Optional HanLP components (heavy). If unavailable, fall back to a lightweight tokenizer. self._tok = None self._pos_tag = None if hanlp is not None: try: logger.info("Initializing HanLP components...") self._tok = hanlp.load(hanlp.pretrained.tok.CTB9_TOK_ELECTRA_BASE_CRF) self._tok.config.output_spans = True self._pos_tag = hanlp.load(hanlp.pretrained.pos.CTB9_POS_ELECTRA_SMALL) logger.info("HanLP components initialized") except Exception as e: logger.warning(f"HanLP init failed, falling back to simple tokenizer: {e}") self._tok = None self._pos_tag = None else: logger.info("HanLP not installed; using simple tokenizer") # Eager initialization (startup-time failure visibility, no lazy init in request path) if self.config.query_config.enable_text_embedding and self._text_encoder is None: logger.info("Initializing text encoder at QueryParser construction...") self._text_encoder = TextEmbeddingEncoder() if self._translator is None: from config.services_config import get_translation_config cfg = get_translation_config() logger.info( "Initializing translator client at QueryParser construction (service_url=%s, default_model=%s)...", cfg.get("service_url"), cfg.get("default_model"), ) self._translator = create_translation_client() @property def text_encoder(self) -> TextEmbeddingEncoder: """Return pre-initialized text encoder.""" return self._text_encoder @property def translator(self) -> Any: """Return pre-initialized translator.""" return self._translator @staticmethod def _pick_query_translation_model(source_lang: str, target_lang: str, config: SearchConfig) -> str: """Pick the translation capability for query-time translation (configurable).""" src = str(source_lang or "").strip().lower() tgt = str(target_lang or "").strip().lower() # Use dedicated models for zh<->en if configured if src == "zh" and tgt == "en": return config.query_config.zh_to_en_model if src == "en" and tgt == "zh": return config.query_config.en_to_zh_model # For any other language pairs, fall back to the configurable default model. # By default this is `nllb-200-distilled-600m` (multi-lingual local model). return config.query_config.default_translation_model def _simple_tokenize(self, text: str) -> List[str]: return simple_tokenize_query(text) def _extract_keywords(self, query: str) -> str: """Extract keywords (nouns with length > 1) from query.""" if self._tok is not None and self._pos_tag is not None: tok_result = self._tok(query) if not tok_result: return "" words = [x[0] for x in tok_result] pos_tags = self._pos_tag(words) keywords = [] for word, pos in zip(words, pos_tags): if len(word) > 1 and isinstance(pos, str) and pos.startswith("N"): keywords.append(word) return " ".join(keywords) # Fallback: treat tokens with length > 1 as "keywords" tokens = self._simple_tokenize(query) keywords = [t for t in tokens if len(t) > 1] return " ".join(keywords) def _get_token_count(self, query: str) -> int: """Get token count (HanLP if available, otherwise simple).""" if self._tok is not None: tok_result = self._tok(query) return len(tok_result) if tok_result else 0 return len(self._simple_tokenize(query)) def _get_query_tokens(self, query: str) -> List[str]: """Get token list (HanLP if available, otherwise simple).""" if self._tok is not None: tok_result = self._tok(query) return [x[0] for x in tok_result] if tok_result else [] return self._simple_tokenize(query) @staticmethod def _contains_cjk(text: str) -> bool: """Whether query contains any CJK ideograph.""" return bool(re.search(r"[\u4e00-\u9fff]", text or "")) @staticmethod def _is_pure_english_word_token(token: str) -> bool: """ A tokenizer token counts as English iff it is letters only (optional internal hyphens) and length >= 3. """ if not token or len(token) < 3: return False return bool(re.fullmatch(r"[A-Za-z]+(?:-[A-Za-z]+)*", token)) @staticmethod def _extract_latin_tokens(text: str) -> List[str]: """Extract latin word tokens from query text.""" return re.findall(r"[A-Za-z]+(?:-[A-Za-z]+)*", text or "") def _infer_supplemental_search_langs( self, query_text: str, detected_lang: str, index_langs: List[str], ) -> List[str]: """ Infer extra languages to search when the query mixes scripts. Rules: - If any Chinese characters appear, include `zh` when available. - If the query contains meaningful latin tokens, include `en` when available. "Meaningful" means either: 1) at least 2 latin tokens with length >= 4, or 2) at least 1 latin token with length >= 4 and latin chars occupy >= 20% of non-space chars. """ supplemental: List[str] = [] normalized_index_langs = {str(lang or "").strip().lower() for lang in index_langs} normalized_detected = str(detected_lang or "").strip().lower() query_text = str(query_text or "") if "zh" in normalized_index_langs and self._contains_cjk(query_text) and normalized_detected != "zh": supplemental.append("zh") latin_tokens = self._extract_latin_tokens(query_text) significant_latin_tokens = [tok for tok in latin_tokens if len(tok) >= 4] latin_chars = sum(len(tok) for tok in latin_tokens) non_space_chars = len(re.sub(r"\s+", "", query_text)) latin_ratio = (latin_chars / non_space_chars) if non_space_chars > 0 else 0.0 has_meaningful_english = ( len(significant_latin_tokens) >= 2 or (len(significant_latin_tokens) >= 1 and latin_ratio >= 0.2) ) if "en" in normalized_index_langs and has_meaningful_english and normalized_detected != "en": supplemental.append("en") return supplemental def parse( self, query: str, tenant_id: Optional[str] = None, generate_vector: bool = True, context: Optional[Any] = None ) -> ParsedQuery: """ Parse query through all processing stages. Args: query: Raw query string generate_vector: Whether to generate query embedding context: Optional request context for tracking and logging Returns: ParsedQuery object with all processing results """ # Initialize logger if context provided active_logger = context.logger if context else logger if context and hasattr(context, "logger"): context.logger.info( f"Starting query parsing | Original query: '{query}' | Generate vector: {generate_vector}", extra={'reqid': context.reqid, 'uid': context.uid} ) def log_info(msg): if context and hasattr(context, 'logger'): context.logger.info(msg, extra={'reqid': context.reqid, 'uid': context.uid}) else: active_logger.info(msg) def log_debug(msg): if context and hasattr(context, 'logger'): context.logger.debug(msg, extra={'reqid': context.reqid, 'uid': context.uid}) else: active_logger.debug(msg) # Stage 1: Normalize normalized = self.normalizer.normalize(query) log_debug(f"Normalization completed | '{query}' -> '{normalized}'") if context: context.store_intermediate_result('query_normalized', normalized) # Extract domain if present (e.g., "brand:Nike" -> domain="brand", query="Nike") domain, query_text = self.normalizer.extract_domain_query(normalized) log_debug(f"Domain extraction | Domain: '{domain}', Query: '{query_text}'") if context: context.store_intermediate_result('extracted_domain', domain) context.store_intermediate_result('domain_query', query_text) # Stage 2: Query rewriting rewritten = None if self.config.query_config.rewrite_dictionary: # Enable rewrite if dictionary exists rewritten = self.rewriter.rewrite(query_text) if rewritten != query_text: log_info(f"Query rewritten | '{query_text}' -> '{rewritten}'") query_text = rewritten if context: context.store_intermediate_result('rewritten_query', rewritten) context.add_warning(f"Query was rewritten: {query_text}") # Stage 3: Language detection detected_lang = self.language_detector.detect(query_text) # Use default language if detection failed (None or "unknown") if not detected_lang or detected_lang == "unknown": detected_lang = self.config.query_config.default_language log_info(f"Language detection | Detected language: {detected_lang}") if context: context.store_intermediate_result('detected_language', detected_lang) # Stage 4: Translation — always submit to thread pool; results are collected together with # embedding in one wait() that uses a configurable budget (short vs long by source-in-index). translations: Dict[str, str] = {} translation_futures: Dict[str, Any] = {} translation_executor: Optional[ThreadPoolExecutor] = None index_langs: List[str] = [] detected_norm = str(detected_lang or "").strip().lower() try: # 根据租户配置的 index_languages 决定翻译目标语言 from config.tenant_config_loader import get_tenant_config_loader tenant_loader = get_tenant_config_loader() tenant_cfg = tenant_loader.get_tenant_config(tenant_id or "default") raw_index_langs = tenant_cfg.get("index_languages") or [] index_langs = [] seen_langs = set() for lang in raw_index_langs: norm_lang = str(lang or "").strip().lower() if not norm_lang or norm_lang in seen_langs: continue seen_langs.add(norm_lang) index_langs.append(norm_lang) target_langs_for_translation = [lang for lang in index_langs if lang != detected_norm] if target_langs_for_translation: translation_executor = ThreadPoolExecutor( max_workers=max(1, min(len(target_langs_for_translation), 4)), thread_name_prefix="query-translation", ) for lang in target_langs_for_translation: model_name = self._pick_query_translation_model(detected_lang, lang, self.config) log_debug( f"Submitting query translation | source={detected_lang} target={lang} model={model_name}" ) translation_futures[lang] = translation_executor.submit( self.translator.translate, query_text, lang, detected_lang, "ecommerce_search_query", model_name, ) if context: context.store_intermediate_result('translations', translations) for lang, translation in translations.items(): if translation: context.store_intermediate_result(f'translation_{lang}', translation) except Exception as e: error_msg = f"Translation failed | Error: {str(e)}" log_info(error_msg) if context: context.add_warning(error_msg) # Stage 5: Query analysis (keywords, token count, query_tokens) keywords = self._extract_keywords(query_text) query_tokens = self._get_query_tokens(query_text) token_count = len(query_tokens) contains_chinese = self._contains_cjk(query_text) contains_english = any(self._is_pure_english_word_token(t) for t in query_tokens) log_debug(f"Query analysis | Keywords: {keywords} | Token count: {token_count} | " f"Query tokens: {query_tokens} | contains_chinese={contains_chinese} | " f"contains_english={contains_english}") if context: context.store_intermediate_result('keywords', keywords) context.store_intermediate_result('token_count', token_count) context.store_intermediate_result('query_tokens', query_tokens) context.store_intermediate_result('contains_chinese', contains_chinese) context.store_intermediate_result('contains_english', contains_english) # Stage 6: Text embedding (only for non-short queries) - async execution query_vector = None embedding_future = None should_generate_embedding = ( generate_vector and self.config.query_config.enable_text_embedding and domain == "default" ) encoding_executor = None if should_generate_embedding: try: if self.text_encoder is None: raise RuntimeError("Text embedding is enabled but text encoder is not initialized") log_debug("Starting query vector generation (async)") # Submit encoding task to thread pool for async execution encoding_executor = ThreadPoolExecutor(max_workers=1) def _encode_query_vector() -> Optional[np.ndarray]: arr = self.text_encoder.encode([query_text], priority=1) if arr is None or len(arr) == 0: return None vec = arr[0] return vec if isinstance(vec, np.ndarray) else None embedding_future = encoding_executor.submit( _encode_query_vector ) except Exception as e: error_msg = f"Query vector generation task submission failed | Error: {str(e)}" log_info(error_msg) if context: context.add_warning(error_msg) encoding_executor = None embedding_future = None # Wait for translation + embedding concurrently; shared budget (ms) depends on whether # the detected language is in tenant index_languages. qc = self.config.query_config source_in_index_for_budget = detected_norm in index_langs budget_ms = ( qc.translation_embedding_wait_budget_ms_source_in_index if source_in_index_for_budget else qc.translation_embedding_wait_budget_ms_source_not_in_index ) budget_sec = max(0.0, float(budget_ms) / 1000.0) if translation_futures: log_info( f"Translation+embedding shared wait budget | budget_ms={budget_ms} | " f"source_in_index_languages={source_in_index_for_budget} | " f"translation_targets={list(translation_futures.keys())}" ) if translation_futures or embedding_future: log_debug( f"Waiting for async tasks (translation+embedding) | budget_ms={budget_ms} | " f"source_in_index_languages={source_in_index_for_budget}" ) all_futures: List[Any] = [] future_to_lang: Dict[Any, tuple] = {} for lang, future in translation_futures.items(): all_futures.append(future) future_to_lang[future] = ("translation", lang) if embedding_future: all_futures.append(embedding_future) future_to_lang[embedding_future] = ("embedding", None) done, not_done = wait(all_futures, timeout=budget_sec) for future in done: task_type, lang = future_to_lang[future] try: result = future.result() if task_type == "translation": if result: translations[lang] = result log_info( f"Translation completed | Query text: '{query_text}' | " f"Target language: {lang} | Translation result: '{result}'" ) if context: context.store_intermediate_result(f"translation_{lang}", result) elif task_type == "embedding": query_vector = result if query_vector is not None: log_debug(f"Query vector generation completed | Shape: {query_vector.shape}") if context: context.store_intermediate_result("query_vector_shape", query_vector.shape) else: log_info( "Query vector generation completed but result is None, will process without vector" ) except Exception as e: if task_type == "translation": error_msg = f"Translation failed | Language: {lang} | Error: {str(e)}" else: error_msg = f"Query vector generation failed | Error: {str(e)}" log_info(error_msg) if context: context.add_warning(error_msg) if not_done: for future in not_done: task_type, lang = future_to_lang[future] if task_type == "translation": timeout_msg = ( f"Translation timeout (>{budget_ms}ms) | Language: {lang} | " f"Query text: '{query_text}'" ) else: timeout_msg = ( f"Query vector generation timeout (>{budget_ms}ms), proceeding without embedding result" ) log_info(timeout_msg) if context: context.add_warning(timeout_msg) if encoding_executor: encoding_executor.shutdown(wait=False) if translation_executor: translation_executor.shutdown(wait=False) if translations and context: context.store_intermediate_result("translations", translations) # Build language-scoped query plan: source language + available translations query_text_by_lang: Dict[str, str] = {} if query_text: query_text_by_lang[detected_lang] = query_text for lang, translated_text in (translations or {}).items(): if translated_text and str(translated_text).strip(): query_text_by_lang[str(lang).strip().lower()] = str(translated_text) supplemental_search_langs = self._infer_supplemental_search_langs( query_text=query_text, detected_lang=detected_lang, index_langs=index_langs, ) for lang in supplemental_search_langs: if lang not in query_text_by_lang and query_text: # Use the original mixed-script query as a robust fallback probe for that language field set. query_text_by_lang[lang] = query_text source_in_index_languages = detected_norm in index_langs ordered_search_langs: List[str] = [] seen_order = set() if detected_lang in query_text_by_lang: ordered_search_langs.append(detected_lang) seen_order.add(detected_lang) for lang in index_langs: if lang in query_text_by_lang and lang not in seen_order: ordered_search_langs.append(lang) seen_order.add(lang) for lang in query_text_by_lang.keys(): if lang not in seen_order: ordered_search_langs.append(lang) seen_order.add(lang) if context: context.store_intermediate_result("search_langs", ordered_search_langs) context.store_intermediate_result("query_text_by_lang", query_text_by_lang) context.store_intermediate_result("supplemental_search_langs", supplemental_search_langs) # Build result result = ParsedQuery( original_query=query, query_normalized=normalized, rewritten_query=rewritten, detected_language=detected_lang, translations=translations, query_vector=query_vector, domain=domain, keywords=keywords, token_count=token_count, query_tokens=query_tokens, query_text_by_lang=query_text_by_lang, search_langs=ordered_search_langs, index_languages=index_langs, source_in_index_languages=source_in_index_languages, contains_chinese=contains_chinese, contains_english=contains_english, ) if context and hasattr(context, 'logger'): context.logger.info( f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | " f"Language: {detected_lang} | Domain: {domain} | " f"Translation count: {len(translations)} | Vector: {'yes' if query_vector is not None else 'no'}", extra={'reqid': context.reqid, 'uid': context.uid} ) else: logger.info( f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | " f"Language: {detected_lang} | Domain: {domain}" ) return result def get_search_queries(self, parsed_query: ParsedQuery) -> List[str]: """ Get list of queries to search (original + translations). Args: parsed_query: Parsed query object Returns: List of query strings to search """ queries = [parsed_query.rewritten_query] # Add translations for lang, translation in parsed_query.translations.items(): if translation and translation != parsed_query.rewritten_query: queries.append(translation) return queries def detect_text_language_for_suggestions( text: str, *, index_languages: Optional[List[str]] = None, primary_language: str = "en", ) -> Tuple[str, float, str]: """ Language detection for short strings (mixed-language tags, query-log fallback). Uses the same ``LanguageDetector`` as :class:`QueryParser`. Returns a language code present in ``index_languages`` when possible, otherwise the tenant primary. Returns: (lang, confidence, source) where source is ``detector``, ``fallback``, or ``default``. """ langs_list = [x for x in (index_languages or []) if x] langs_set = set(langs_list) def _norm_lang(raw: Optional[str]) -> Optional[str]: if not raw: return None token = str(raw).strip().lower().replace("-", "_") if not token: return None if token in {"zh_tw", "pt_br"}: return token return token.split("_")[0] primary = _norm_lang(primary_language) or "en" if primary not in langs_set and langs_list: primary = _norm_lang(langs_list[0]) or langs_list[0] if not text or not str(text).strip(): return primary, 0.0, "default" raw_code = LanguageDetector().detect(str(text).strip()) if not raw_code or raw_code == "unknown": return primary, 0.35, "default" def _index_lang_base(cand: str) -> str: t = str(cand).strip().lower().replace("-", "_") return t.split("_")[0] if t else "" def _resolve_index_lang(code: str) -> Optional[str]: if code in langs_set: return code for cand in langs_list: if _index_lang_base(cand) == code: return cand return None if langs_list: resolved = _resolve_index_lang(raw_code) if resolved is None: return primary, 0.5, "fallback" return resolved, 0.92, "detector" return raw_code, 0.92, "detector"