Blame view

query/query_parser.py 26.9 KB
be52af70   tangwang   first commit
1
2
3
4
5
6
  """
  Query parser - main module for query processing.
  
  Handles query rewriting, translation, and embedding generation.
  """
  
3ec5bfe6   tangwang   1. get_translatio...
7
  from typing import Dict, List, Optional, Any, Union
be52af70   tangwang   first commit
8
  import numpy as np
325eec03   tangwang   1. 日志、配置基础设施,使用优化
9
  import logging
7bc756c5   tangwang   优化 ES 查询构建
10
  import re
d4cadc13   tangwang   翻译重构
11
  from concurrent.futures import ThreadPoolExecutor, as_completed, wait
be52af70   tangwang   first commit
12
  
07cf5a93   tangwang   START_EMBEDDING=...
13
  from embeddings.text_encoder import TextEmbeddingEncoder
9f96d6f3   tangwang   短query不用语义搜索
14
  from config import SearchConfig
0fd2f875   tangwang   translate
15
  from translation import create_translation_client
be52af70   tangwang   first commit
16
  from .language_detector import LanguageDetector
be52af70   tangwang   first commit
17
18
  from .query_rewriter import QueryRewriter, QueryNormalizer
  
325eec03   tangwang   1. 日志、配置基础设施,使用优化
19
20
  # Module-level logger; parse() falls back to it when no request context is supplied.
  logger = logging.getLogger(__name__)
  
484adbfe   tangwang   adapt ubuntu; con...
21
22
23
24
  # HanLP is an optional, heavy dependency. Any exception raised while importing it
  # (not only ImportError) leaves `hanlp` as None; QueryParser then skips loading
  # HanLP models and uses its regex-based _simple_tokenize fallback instead.
  try:
      import hanlp  # type: ignore
  except Exception:  # pragma: no cover
      hanlp = None
be52af70   tangwang   first commit
25
26
27
28
29
30
31
  
  class ParsedQuery:
      """
      Container for the results produced by ``QueryParser.parse``.

      Values are stored as given, with these normalizations:

      - ``rewritten_query`` falls back to ``query_normalized`` when it is not
        supplied (or is empty).
      - ``None`` for any dict/list field becomes a fresh empty container, so
        instances never share mutable defaults.
      - ``source_in_index_languages`` is coerced to ``bool``.
      """

      def __init__(
          self,
          original_query: str,
          query_normalized: str,
          rewritten_query: Optional[str] = None,
          detected_language: Optional[str] = None,
          translations: Optional[Dict[str, str]] = None,
          query_vector: Optional[np.ndarray] = None,
          domain: str = "default",
          keywords: str = "",
          token_count: int = 0,
          query_tokens: Optional[List[str]] = None,
          query_text_by_lang: Optional[Dict[str, str]] = None,
          search_langs: Optional[List[str]] = None,
          index_languages: Optional[List[str]] = None,
          source_in_index_languages: bool = True,
      ):
          """
          Args:
              original_query: Raw query string as received.
              query_normalized: Query after normalization.
              rewritten_query: Query after dictionary rewriting; defaults to
                  ``query_normalized``.
              detected_language: Language code chosen for the query.
              translations: Mapping of target language code -> translated text.
              query_vector: Optional query embedding.
              domain: Domain prefix extracted from the query ("default" if none).
              keywords: Space-joined keywords extracted from the query.
              token_count: Number of tokens in the query.
              query_tokens: Token strings of the query.
              query_text_by_lang: Mapping of language code -> query text to search.
              search_langs: Languages to search in.
              index_languages: Languages configured for the index.
              source_in_index_languages: Whether the source language is indexed.
          """
          self.original_query = original_query
          self.query_normalized = query_normalized
          self.rewritten_query = rewritten_query or query_normalized
          self.detected_language = detected_language
          self.translations = translations or {}
          self.query_vector = query_vector
          self.domain = domain

          # Query analysis fields
          self.keywords = keywords
          self.token_count = token_count
          self.query_tokens = query_tokens or []

          # Language-scoped search plan
          self.query_text_by_lang = query_text_by_lang or {}
          self.search_langs = search_langs or []
          self.index_languages = index_languages or []
          self.source_in_index_languages = bool(source_in_index_languages)

      def to_dict(self) -> Dict[str, Any]:
          """
          Return a dictionary view of the parse result.

          Note: ``query_vector`` and the query-analysis fields (keywords,
          token_count, query_tokens) are intentionally not included. The dict
          values reference the instance's own containers (no copies).
          """
          return {
              "original_query": self.original_query,
              "query_normalized": self.query_normalized,
              "rewritten_query": self.rewritten_query,
              "detected_language": self.detected_language,
              "translations": self.translations,
              "domain": self.domain,
              "query_text_by_lang": self.query_text_by_lang,
              "search_langs": self.search_langs,
              "index_languages": self.index_languages,
              "source_in_index_languages": self.source_in_index_languages,
          }
  
  
  class QueryParser:
      """
      Main query parser that processes queries through multiple stages:
      1. Normalization
      2. Query rewriting (brand/category mappings, synonyms)
      3. Language detection
      4. Translation to target languages
      5. Text embedding generation (for semantic search)
      """
  
      def __init__(
          self,
          config: SearchConfig,
          text_encoder: Optional[TextEmbeddingEncoder] = None,
          translator: Optional[Any] = None
      ):
          """
          Build a query parser and eagerly create all of its dependencies.

          Args:
              config: SearchConfig instance. ``config.query_config`` supplies the
                  rewrite dictionary and the ``enable_text_embedding`` switch.
              text_encoder: Optional pre-built text encoder. When omitted and text
                  embedding is enabled, a ``TextEmbeddingEncoder`` is created here.
              translator: Optional pre-built translator. When omitted, a client is
                  created here via ``create_translation_client()``.
          """
          self.config = config
          query_cfg = config.query_config
          self._text_encoder = text_encoder
          self._translator = translator

          # Lightweight text-processing stages.
          self.normalizer = QueryNormalizer()
          self.language_detector = LanguageDetector()
          self.rewriter = QueryRewriter(query_cfg.rewrite_dictionary)

          # HanLP tokenizer / POS tagger are optional and expensive to load. If either
          # fails to load, both are left as None and the simple tokenizer is used.
          self._tok = None
          self._pos_tag = None
          if hanlp is None:
              logger.info("HanLP not installed; using simple tokenizer")
          else:
              try:
                  logger.info("Initializing HanLP components...")
                  tokenizer = hanlp.load(hanlp.pretrained.tok.CTB9_TOK_ELECTRA_BASE_CRF)
                  # With spans enabled, downstream code reads token[0] as the word text.
                  tokenizer.config.output_spans = True
                  pos_tagger = hanlp.load(hanlp.pretrained.pos.CTB9_POS_ELECTRA_SMALL)
                  self._tok, self._pos_tag = tokenizer, pos_tagger
                  logger.info("HanLP components initialized")
              except Exception as e:
                  logger.warning(f"HanLP init failed, falling back to simple tokenizer: {e}")
                  self._tok = None
                  self._pos_tag = None

          # Build heavy clients now so misconfiguration surfaces at startup,
          # not lazily on the request path.
          if query_cfg.enable_text_embedding and self._text_encoder is None:
              logger.info("Initializing text encoder at QueryParser construction...")
              self._text_encoder = TextEmbeddingEncoder()

          if self._translator is None:
              from config.services_config import get_translation_config
              translation_cfg = get_translation_config()
              logger.info(
                  "Initializing translator client at QueryParser construction (service_url=%s, default_model=%s)...",
                  translation_cfg.get("service_url"),
                  translation_cfg.get("default_model"),
              )
              self._translator = create_translation_client()

          # Shared pool used by parse() for fire-and-forget translation submissions.
          self._translation_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="query-translation")
26b910bd   tangwang   refactor service ...
143
  
be52af70   tangwang   first commit
144
      @property
      def text_encoder(self) -> TextEmbeddingEncoder:
          """
          Text embedding encoder passed to, or created by, the constructor.

          May be None when text embedding is disabled and no encoder was supplied.
          """
          encoder = self._text_encoder
          return encoder
  
      @property
      def translator(self) -> Any:
          """Translator client passed to, or created by, the constructor."""
          client = self._translator
          return client
484adbfe   tangwang   adapt ubuntu; con...
153
  
e56fbdc1   tangwang   query trans
154
155
156
157
158
159
160
161
162
163
164
      @staticmethod
      def _pick_query_translation_model(source_lang: str, target_lang: str) -> str:
          """Pick the translation capability for query-time translation."""
          src = str(source_lang or "").strip().lower()
          tgt = str(target_lang or "").strip().lower()
          if src == "zh" and tgt == "en":
              return "opus-mt-zh-en"
          if src == "en" and tgt == "zh":
              return "opus-mt-en-zh"
          return "deepl"
  
484adbfe   tangwang   adapt ubuntu; con...
165
166
167
168
169
170
171
172
173
174
175
      def _simple_tokenize(self, text: str) -> List[str]:
          """
          Lightweight tokenizer fallback.
  
          - Groups consecutive CJK chars as a token
          - Groups consecutive latin/digits/underscore/dash as a token
          """
          if not text:
              return []
          pattern = re.compile(r"[\u4e00-\u9fff]+|[A-Za-z0-9_]+(?:-[A-Za-z0-9_]+)*")
          return pattern.findall(text)
7bc756c5   tangwang   优化 ES 查询构建
176
177
178
      
      def _extract_keywords(self, query: str) -> str:
          """Extract keywords (nouns with length > 1) from query."""
484adbfe   tangwang   adapt ubuntu; con...
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
          if self._tok is not None and self._pos_tag is not None:
              tok_result = self._tok(query)
              if not tok_result:
                  return ""
              words = [x[0] for x in tok_result]
              pos_tags = self._pos_tag(words)
              keywords = []
              for word, pos in zip(words, pos_tags):
                  if len(word) > 1 and isinstance(pos, str) and pos.startswith("N"):
                      keywords.append(word)
              return " ".join(keywords)
  
          # Fallback: treat tokens with length > 1 as "keywords"
          tokens = self._simple_tokenize(query)
          keywords = [t for t in tokens if len(t) > 1]
7bc756c5   tangwang   优化 ES 查询构建
194
195
196
          return " ".join(keywords)
      
      def _get_token_count(self, query: str) -> int:
484adbfe   tangwang   adapt ubuntu; con...
197
198
199
200
201
          """Get token count (HanLP if available, otherwise simple)."""
          if self._tok is not None:
              tok_result = self._tok(query)
              return len(tok_result) if tok_result else 0
          return len(self._simple_tokenize(query))
ea118f2b   tangwang   build_query:根据 qu...
202
203
  
      def _get_query_tokens(self, query: str) -> List[str]:
484adbfe   tangwang   adapt ubuntu; con...
204
205
206
207
208
          """Get token list (HanLP if available, otherwise simple)."""
          if self._tok is not None:
              tok_result = self._tok(query)
              return [x[0] for x in tok_result] if tok_result else []
          return self._simple_tokenize(query)
be52af70   tangwang   first commit
209
  
a8261ece   tangwang   检索效果优化
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
      @staticmethod
      def _contains_cjk(text: str) -> bool:
          """Whether query contains any CJK ideograph."""
          return bool(re.search(r"[\u4e00-\u9fff]", text or ""))
  
      @staticmethod
      def _extract_latin_tokens(text: str) -> List[str]:
          """Extract latin word tokens from query text."""
          return re.findall(r"[A-Za-z]+(?:-[A-Za-z]+)*", text or "")
  
      def _infer_supplemental_search_langs(
          self,
          query_text: str,
          detected_lang: str,
          index_langs: List[str],
      ) -> List[str]:
          """
          Infer extra languages to search when the query mixes scripts.
  
          Rules:
          - If any Chinese characters appear, include `zh` when available.
          - If the query contains meaningful latin tokens, include `en` when available.
            "Meaningful" means either:
            1) at least 2 latin tokens with length >= 4, or
            2) at least 1 latin token with length >= 4 and latin chars occupy >= 20% of non-space chars.
          """
          supplemental: List[str] = []
          normalized_index_langs = {str(lang or "").strip().lower() for lang in index_langs}
          normalized_detected = str(detected_lang or "").strip().lower()
          query_text = str(query_text or "")
  
          if "zh" in normalized_index_langs and self._contains_cjk(query_text) and normalized_detected != "zh":
              supplemental.append("zh")
  
          latin_tokens = self._extract_latin_tokens(query_text)
          significant_latin_tokens = [tok for tok in latin_tokens if len(tok) >= 4]
          latin_chars = sum(len(tok) for tok in latin_tokens)
          non_space_chars = len(re.sub(r"\s+", "", query_text))
          latin_ratio = (latin_chars / non_space_chars) if non_space_chars > 0 else 0.0
          has_meaningful_english = (
              len(significant_latin_tokens) >= 2 or
              (len(significant_latin_tokens) >= 1 and latin_ratio >= 0.2)
          )
  
          if "en" in normalized_index_langs and has_meaningful_english and normalized_detected != "en":
              supplemental.append("en")
  
          return supplemental
  
345d960b   tangwang   1. 删除全局 enable_tr...
259
260
261
262
263
264
265
      def parse(
          self,
          query: str,
          tenant_id: Optional[str] = None,
          generate_vector: bool = True,
          context: Optional[Any] = None
      ) -> ParsedQuery:
be52af70   tangwang   first commit
266
267
268
269
270
271
          """
          Parse query through all processing stages.
  
          Args:
              query: Raw query string
              generate_vector: Whether to generate query embedding
16c42787   tangwang   feat: implement r...
272
              context: Optional request context for tracking and logging
be52af70   tangwang   first commit
273
274
275
276
  
          Returns:
              ParsedQuery object with all processing results
          """
16c42787   tangwang   feat: implement r...
277
          # Initialize logger if context provided
950a640e   tangwang   embeddings
278
279
280
          active_logger = context.logger if context else logger
          if context and hasattr(context, "logger"):
              context.logger.info(
70dab99f   tangwang   add logs
281
                  f"Starting query parsing | Original query: '{query}' | Generate vector: {generate_vector}",
16c42787   tangwang   feat: implement r...
282
283
284
                  extra={'reqid': context.reqid, 'uid': context.uid}
              )
  
16c42787   tangwang   feat: implement r...
285
          def log_info(msg):
325eec03   tangwang   1. 日志、配置基础设施,使用优化
286
287
              if context and hasattr(context, 'logger'):
                  context.logger.info(msg, extra={'reqid': context.reqid, 'uid': context.uid})
16c42787   tangwang   feat: implement r...
288
              else:
950a640e   tangwang   embeddings
289
                  active_logger.info(msg)
16c42787   tangwang   feat: implement r...
290
291
  
          def log_debug(msg):
325eec03   tangwang   1. 日志、配置基础设施,使用优化
292
293
              if context and hasattr(context, 'logger'):
                  context.logger.debug(msg, extra={'reqid': context.reqid, 'uid': context.uid})
16c42787   tangwang   feat: implement r...
294
              else:
950a640e   tangwang   embeddings
295
                  active_logger.debug(msg)
be52af70   tangwang   first commit
296
297
298
  
          # Stage 1: Normalize
          normalized = self.normalizer.normalize(query)
70dab99f   tangwang   add logs
299
          log_debug(f"Normalization completed | '{query}' -> '{normalized}'")
16c42787   tangwang   feat: implement r...
300
          if context:
3a5fda00   tangwang   1. ES字段 skus的 ima...
301
              context.store_intermediate_result('query_normalized', normalized)
be52af70   tangwang   first commit
302
303
304
  
          # Extract domain if present (e.g., "brand:Nike" -> domain="brand", query="Nike")
          domain, query_text = self.normalizer.extract_domain_query(normalized)
70dab99f   tangwang   add logs
305
          log_debug(f"Domain extraction | Domain: '{domain}', Query: '{query_text}'")
16c42787   tangwang   feat: implement r...
306
307
308
          if context:
              context.store_intermediate_result('extracted_domain', domain)
              context.store_intermediate_result('domain_query', query_text)
be52af70   tangwang   first commit
309
310
311
  
          # Stage 2: Query rewriting
          rewritten = None
9f96d6f3   tangwang   短query不用语义搜索
312
          if self.config.query_config.rewrite_dictionary:  # Enable rewrite if dictionary exists
be52af70   tangwang   first commit
313
314
              rewritten = self.rewriter.rewrite(query_text)
              if rewritten != query_text:
70dab99f   tangwang   add logs
315
                  log_info(f"Query rewritten | '{query_text}' -> '{rewritten}'")
be52af70   tangwang   first commit
316
                  query_text = rewritten
16c42787   tangwang   feat: implement r...
317
318
                  if context:
                      context.store_intermediate_result('rewritten_query', rewritten)
70dab99f   tangwang   add logs
319
                      context.add_warning(f"Query was rewritten: {query_text}")
be52af70   tangwang   first commit
320
321
322
  
          # Stage 3: Language detection
          detected_lang = self.language_detector.detect(query_text)
a5a6bab8   tangwang   多语言查询优化
323
324
325
          # Use default language if detection failed (None or "unknown")
          if not detected_lang or detected_lang == "unknown":
              detected_lang = self.config.query_config.default_language
70dab99f   tangwang   add logs
326
          log_info(f"Language detection | Detected language: {detected_lang}")
16c42787   tangwang   feat: implement r...
327
328
          if context:
              context.store_intermediate_result('detected_language', detected_lang)
be52af70   tangwang   first commit
329
  
3ec5bfe6   tangwang   1. get_translatio...
330
          # Stage 4: Translation (with async support and conditional waiting)
be52af70   tangwang   first commit
331
          translations = {}
3ec5bfe6   tangwang   1. get_translatio...
332
          translation_futures = {}
d4cadc13   tangwang   翻译重构
333
          translation_executor = None
bd96cead   tangwang   1. 动态多语言字段与统一策略配置
334
          index_langs = ["en", "zh"]
345d960b   tangwang   1. 删除全局 enable_tr...
335
          try:
038e4e2f   tangwang   refactor(i18n): t...
336
              # 根据租户配置的 index_languages 决定翻译目标语言
345d960b   tangwang   1. 删除全局 enable_tr...
337
338
339
              from config.tenant_config_loader import get_tenant_config_loader
              tenant_loader = get_tenant_config_loader()
              tenant_cfg = tenant_loader.get_tenant_config(tenant_id or "default")
bd96cead   tangwang   1. 动态多语言字段与统一策略配置
340
341
342
343
344
345
346
347
348
              raw_index_langs = tenant_cfg.get("index_languages") or ["en", "zh"]
              index_langs = []
              seen_langs = set()
              for lang in raw_index_langs:
                  norm_lang = str(lang or "").strip().lower()
                  if not norm_lang or norm_lang in seen_langs:
                      continue
                  seen_langs.add(norm_lang)
                  index_langs.append(norm_lang)
345d960b   tangwang   1. 删除全局 enable_tr...
349
  
038e4e2f   tangwang   refactor(i18n): t...
350
              target_langs_for_translation = [lang for lang in index_langs if lang != detected_lang]
345d960b   tangwang   1. 删除全局 enable_tr...
351
  
345d960b   tangwang   1. 删除全局 enable_tr...
352
              if target_langs_for_translation:
038e4e2f   tangwang   refactor(i18n): t...
353
                  target_langs = target_langs_for_translation
16c42787   tangwang   feat: implement r...
354
355
  
                  if target_langs:
3ec5bfe6   tangwang   1. get_translatio...
356
                      # Determine if we need to wait for translation results
038e4e2f   tangwang   refactor(i18n): t...
357
358
                      # If detected_lang is not in index_languages, we must wait for translation
                      need_wait_translation = detected_lang not in index_langs
d4cadc13   tangwang   翻译重构
359
  
3ec5bfe6   tangwang   1. get_translatio...
360
                      if need_wait_translation:
d4cadc13   tangwang   翻译重构
361
362
363
                          translation_executor = ThreadPoolExecutor(
                              max_workers=max(1, min(len(target_langs), 4)),
                              thread_name_prefix="query-translation-wait",
3ec5bfe6   tangwang   1. get_translatio...
364
                          )
d4cadc13   tangwang   翻译重构
365
                          for lang in target_langs:
e56fbdc1   tangwang   query trans
366
367
368
369
                              model_name = self._pick_query_translation_model(detected_lang, lang)
                              log_debug(
                                  f"Submitting query translation | source={detected_lang} target={lang} model={model_name}"
                              )
d4cadc13   tangwang   翻译重构
370
371
372
373
374
375
                              translation_futures[lang] = translation_executor.submit(
                                  self.translator.translate,
                                  query_text,
                                  lang,
                                  detected_lang,
                                  "ecommerce_search_query",
e56fbdc1   tangwang   query trans
376
                                  model_name,
d4cadc13   tangwang   翻译重构
377
                              )
3ec5bfe6   tangwang   1. get_translatio...
378
                      else:
d4cadc13   tangwang   翻译重构
379
                          for lang in target_langs:
e56fbdc1   tangwang   query trans
380
381
382
383
                              model_name = self._pick_query_translation_model(detected_lang, lang)
                              log_debug(
                                  f"Submitting query translation | source={detected_lang} target={lang} model={model_name}"
                              )
d4cadc13   tangwang   翻译重构
384
385
386
387
388
389
                              self._translation_executor.submit(
                                  self.translator.translate,
                                  query_text,
                                  lang,
                                  detected_lang,
                                  "ecommerce_search_query",
e56fbdc1   tangwang   query trans
390
                                  model_name,
d4cadc13   tangwang   翻译重构
391
392
                              )
  
6e0e310c   tangwang   1. Translator 类增强
393
                      if translations:
70dab99f   tangwang   add logs
394
                          log_info(f"Translation completed (cache hit) | Query text: '{query_text}' | Results: {translations}")
3ec5bfe6   tangwang   1. get_translatio...
395
                      if translation_futures:
70dab99f   tangwang   add logs
396
                          log_debug(f"Translation in progress, waiting for results... | Query text: '{query_text}' | Languages: {list(translation_futures.keys())}")
6e0e310c   tangwang   1. Translator 类增强
397
                      
16c42787   tangwang   feat: implement r...
398
399
400
401
402
403
                      if context:
                          context.store_intermediate_result('translations', translations)
                          for lang, translation in translations.items():
                              if translation:
                                  context.store_intermediate_result(f'translation_{lang}', translation)
  
345d960b   tangwang   1. 删除全局 enable_tr...
404
          except Exception as e:
70dab99f   tangwang   add logs
405
              error_msg = f"Translation failed | Error: {str(e)}"
345d960b   tangwang   1. 删除全局 enable_tr...
406
407
408
              log_info(error_msg)
              if context:
                  context.add_warning(error_msg)
be52af70   tangwang   first commit
409
  
ea118f2b   tangwang   build_query:根据 qu...
410
          # Stage 5: Query analysis (keywords, token count, query_tokens)
7bc756c5   tangwang   优化 ES 查询构建
411
          keywords = self._extract_keywords(query_text)
ea118f2b   tangwang   build_query:根据 qu...
412
413
          query_tokens = self._get_query_tokens(query_text)
          token_count = len(query_tokens)
7bc756c5   tangwang   优化 ES 查询构建
414
          
70dab99f   tangwang   add logs
415
          log_debug(f"Query analysis | Keywords: {keywords} | Token count: {token_count} | "
ea118f2b   tangwang   build_query:根据 qu...
416
                   f"Query tokens: {query_tokens}")
7bc756c5   tangwang   优化 ES 查询构建
417
418
419
          if context:
              context.store_intermediate_result('keywords', keywords)
              context.store_intermediate_result('token_count', token_count)
ea118f2b   tangwang   build_query:根据 qu...
420
              context.store_intermediate_result('query_tokens', query_tokens)
7bc756c5   tangwang   优化 ES 查询构建
421
          
3ec5bfe6   tangwang   1. get_translatio...
422
          # Stage 6: Text embedding (only for non-short queries) - async execution
be52af70   tangwang   first commit
423
          query_vector = None
3ec5bfe6   tangwang   1. get_translatio...
424
          embedding_future = None
7bc756c5   tangwang   优化 ES 查询构建
425
426
          should_generate_embedding = (
              generate_vector and
9f96d6f3   tangwang   短query不用语义搜索
427
              self.config.query_config.enable_text_embedding and
d90e7428   tangwang   补充重排
428
              domain == "default"
7bc756c5   tangwang   优化 ES 查询构建
429
430
          )
          
3ec5bfe6   tangwang   1. get_translatio...
431
          encoding_executor = None
7bc756c5   tangwang   优化 ES 查询构建
432
433
          if should_generate_embedding:
              try:
26b910bd   tangwang   refactor service ...
434
435
                  if self.text_encoder is None:
                      raise RuntimeError("Text embedding is enabled but text encoder is not initialized")
70dab99f   tangwang   add logs
436
                  log_debug("Starting query vector generation (async)")
3ec5bfe6   tangwang   1. get_translatio...
437
438
                  # Submit encoding task to thread pool for async execution
                  encoding_executor = ThreadPoolExecutor(max_workers=1)
b2e50710   tangwang   BgeEncoder.encode...
439
440
441
442
443
444
                  def _encode_query_vector() -> Optional[np.ndarray]:
                      arr = self.text_encoder.encode([query_text])
                      if arr is None or len(arr) == 0:
                          return None
                      vec = arr[0]
                      return vec if isinstance(vec, np.ndarray) else None
3ec5bfe6   tangwang   1. get_translatio...
445
                  embedding_future = encoding_executor.submit(
b2e50710   tangwang   BgeEncoder.encode...
446
                      _encode_query_vector
3ec5bfe6   tangwang   1. get_translatio...
447
                  )
7bc756c5   tangwang   优化 ES 查询构建
448
              except Exception as e:
70dab99f   tangwang   add logs
449
                  error_msg = f"Query vector generation task submission failed | Error: {str(e)}"
7bc756c5   tangwang   优化 ES 查询构建
450
451
452
                  log_info(error_msg)
                  if context:
                      context.add_warning(error_msg)
3ec5bfe6   tangwang   1. get_translatio...
453
454
455
456
457
                  encoding_executor = None
                  embedding_future = None
          
          # Wait for all async tasks to complete (translation and embedding)
          if translation_futures or embedding_future:
70dab99f   tangwang   add logs
458
              log_debug("Waiting for async tasks to complete...")
3ec5bfe6   tangwang   1. get_translatio...
459
460
461
462
463
464
465
466
467
468
469
470
              
              # Collect all futures with their identifiers
              all_futures = []
              future_to_lang = {}
              for lang, future in translation_futures.items():
                  all_futures.append(future)
                  future_to_lang[future] = ('translation', lang)
              
              if embedding_future:
                  all_futures.append(embedding_future)
                  future_to_lang[embedding_future] = ('embedding', None)
              
d4cadc13   tangwang   翻译重构
471
472
473
              # Enforce a hard timeout for translation-related work (300ms budget)
              done, not_done = wait(all_futures, timeout=0.3)
              for future in done:
3ec5bfe6   tangwang   1. get_translatio...
474
475
476
477
478
479
                  task_type, lang = future_to_lang[future]
                  try:
                      result = future.result()
                      if task_type == 'translation':
                          if result:
                              translations[lang] = result
d4cadc13   tangwang   翻译重构
480
481
482
                              log_info(
                                  f"Translation completed | Query text: '{query_text}' | Target language: {lang} | Translation result: '{result}'"
                              )
3ec5bfe6   tangwang   1. get_translatio...
483
484
485
486
                              if context:
                                  context.store_intermediate_result(f'translation_{lang}', result)
                      elif task_type == 'embedding':
                          query_vector = result
b2e50710   tangwang   BgeEncoder.encode...
487
                          if query_vector is not None:
70dab99f   tangwang   add logs
488
                              log_debug(f"Query vector generation completed | Shape: {query_vector.shape}")
b2e50710   tangwang   BgeEncoder.encode...
489
490
491
                              if context:
                                  context.store_intermediate_result('query_vector_shape', query_vector.shape)
                          else:
70dab99f   tangwang   add logs
492
                              log_info("Query vector generation completed but result is None, will process without vector")
3ec5bfe6   tangwang   1. get_translatio...
493
494
                  except Exception as e:
                      if task_type == 'translation':
70dab99f   tangwang   add logs
495
                          error_msg = f"Translation failed | Language: {lang} | Error: {str(e)}"
3ec5bfe6   tangwang   1. get_translatio...
496
                      else:
70dab99f   tangwang   add logs
497
                          error_msg = f"Query vector generation failed | Error: {str(e)}"
3ec5bfe6   tangwang   1. get_translatio...
498
499
500
                      log_info(error_msg)
                      if context:
                          context.add_warning(error_msg)
d4cadc13   tangwang   翻译重构
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
  
              # Log timeouts for any futures that did not finish within 300ms
              if not_done:
                  for future in not_done:
                      task_type, lang = future_to_lang[future]
                      if task_type == 'translation':
                          timeout_msg = (
                              f"Translation timeout (>300ms) | Language: {lang} | "
                              f"Query text: '{query_text}'"
                          )
                      else:
                          timeout_msg = "Query vector generation timeout (>300ms), proceeding without embedding result"
                      log_info(timeout_msg)
                      if context:
                          context.add_warning(timeout_msg)
  
3ec5bfe6   tangwang   1. get_translatio...
517
518
519
              # Clean up encoding executor
              if encoding_executor:
                  encoding_executor.shutdown(wait=False)
d4cadc13   tangwang   翻译重构
520
521
              if translation_executor:
                  translation_executor.shutdown(wait=False)
3ec5bfe6   tangwang   1. get_translatio...
522
523
524
525
              
              # Update translations in context after all are complete
              if translations and context:
                  context.store_intermediate_result('translations', translations)
bd96cead   tangwang   1. 动态多语言字段与统一策略配置
526
527
528
529
530
531
532
533
          
          # Build language-scoped query plan: source language + available translations
          query_text_by_lang: Dict[str, str] = {}
          if query_text:
              query_text_by_lang[detected_lang] = query_text
          for lang, translated_text in (translations or {}).items():
              if translated_text and str(translated_text).strip():
                  query_text_by_lang[str(lang).strip().lower()] = str(translated_text)
a8261ece   tangwang   检索效果优化
534
535
536
537
538
539
540
541
542
543
544
  
          supplemental_search_langs = self._infer_supplemental_search_langs(
              query_text=query_text,
              detected_lang=detected_lang,
              index_langs=index_langs,
          )
          for lang in supplemental_search_langs:
              if lang not in query_text_by_lang and query_text:
                  # Use the original mixed-script query as a robust fallback probe for that language field set.
                  query_text_by_lang[lang] = query_text
  
bd96cead   tangwang   1. 动态多语言字段与统一策略配置
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
          source_in_index_languages = detected_lang in index_langs
          ordered_search_langs: List[str] = []
          seen_order = set()
          if detected_lang in query_text_by_lang:
              ordered_search_langs.append(detected_lang)
              seen_order.add(detected_lang)
          for lang in index_langs:
              if lang in query_text_by_lang and lang not in seen_order:
                  ordered_search_langs.append(lang)
                  seen_order.add(lang)
          for lang in query_text_by_lang.keys():
              if lang not in seen_order:
                  ordered_search_langs.append(lang)
                  seen_order.add(lang)
          
          if context:
              context.store_intermediate_result("search_langs", ordered_search_langs)
              context.store_intermediate_result("query_text_by_lang", query_text_by_lang)
a8261ece   tangwang   检索效果优化
563
              context.store_intermediate_result("supplemental_search_langs", supplemental_search_langs)
be52af70   tangwang   first commit
564
565
566
567
  
          # Build result
          result = ParsedQuery(
              original_query=query,
3a5fda00   tangwang   1. ES字段 skus的 ima...
568
              query_normalized=normalized,
be52af70   tangwang   first commit
569
570
571
572
              rewritten_query=rewritten,
              detected_language=detected_lang,
              translations=translations,
              query_vector=query_vector,
7bc756c5   tangwang   优化 ES 查询构建
573
574
575
              domain=domain,
              keywords=keywords,
              token_count=token_count,
bd96cead   tangwang   1. 动态多语言字段与统一策略配置
576
577
578
579
580
              query_tokens=query_tokens,
              query_text_by_lang=query_text_by_lang,
              search_langs=ordered_search_langs,
              index_languages=index_langs,
              source_in_index_languages=source_in_index_languages,
be52af70   tangwang   first commit
581
582
          )
  
325eec03   tangwang   1. 日志、配置基础设施,使用优化
583
584
          if context and hasattr(context, 'logger'):
              context.logger.info(
70dab99f   tangwang   add logs
585
586
587
                  f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | "
                  f"Language: {detected_lang} | Domain: {domain} | "
                  f"Translation count: {len(translations)} | Vector: {'yes' if query_vector is not None else 'no'}",
16c42787   tangwang   feat: implement r...
588
589
590
                  extra={'reqid': context.reqid, 'uid': context.uid}
              )
          else:
325eec03   tangwang   1. 日志、配置基础设施,使用优化
591
              logger.info(
70dab99f   tangwang   add logs
592
593
                  f"Query parsing completed | Original query: '{query}' | Final query: '{rewritten or query_text}' | "
                  f"Language: {detected_lang} | Domain: {domain}"
325eec03   tangwang   1. 日志、配置基础设施,使用优化
594
              )
16c42787   tangwang   feat: implement r...
595
  
be52af70   tangwang   first commit
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
          return result
  
      def get_search_queries(self, parsed_query: "ParsedQuery") -> List[str]:
          """
          Get list of queries to search (primary query + distinct translations).

          The primary query is ``rewritten_query``. When that is empty or None,
          the normalized query is used, then the original query, so the
          returned list never starts with ``None``. Translations are appended
          in the mapping's iteration order. Empty values are skipped, and so is
          any text already in the list.

          Args:
              parsed_query: Parsed query object

          Returns:
              List of non-empty, de-duplicated query strings to search
          """
          primary = parsed_query.rewritten_query
          if not primary:
              # rewritten_query is Optional on ParsedQuery (defaults to None);
              # fall back to the un-rewritten text rather than emitting None.
              primary = (
                  getattr(parsed_query, "query_normalized", None)
                  or getattr(parsed_query, "original_query", None)
              )

          queries: List[str] = []
          seen = set()
          if primary:
              queries.append(primary)
              seen.add(primary)

          # translations may be None (the ParsedQuery parameter defaults to None).
          for translation in (parsed_query.translations or {}).values():
              # Skip empty results and duplicates, e.g. two target languages
              # that produced the same text, or a translation equal to the
              # primary query. This avoids searching the same string twice.
              if translation and translation not in seen:
                  queries.append(translation)
                  seen.add(translation)

          return queries
  
      def update_rewrite_rules(self, rules: Dict[str, str]) -> None:
          """
          Register every pattern/replacement pair with the query rewriter.

          Pairs are forwarded one at a time, in the mapping's iteration order,
          to ``self.rewriter.add_rule``.

          Args:
              rules: Mapping of pattern -> replacement to add
          """
          for src_pattern in rules:
              self.rewriter.add_rule(src_pattern, rules[src_pattern])
  
      def get_rewrite_rules(self) -> Dict[str, str]:
          """Return the rewrite rules reported by ``self.rewriter.get_rules()``."""
          active_rewriter = self.rewriter
          return active_rewriter.get_rules()