""" Elasticsearch query builder. Converts parsed queries and search parameters into ES DSL queries. Simplified architecture: - filters and (text_recall or embedding_recall) - function_score wrapper for boosting fields """ from typing import Dict, Any, List, Optional, Union, Tuple import numpy as np from config import FunctionScoreConfig class ESQueryBuilder: """Builds Elasticsearch DSL queries.""" def __init__( self, match_fields: List[str], field_boosts: Optional[Dict[str, float]] = None, multilingual_fields: Optional[List[str]] = None, shared_fields: Optional[List[str]] = None, core_multilingual_fields: Optional[List[str]] = None, text_embedding_field: Optional[str] = None, image_embedding_field: Optional[str] = None, source_fields: Optional[List[str]] = None, function_score_config: Optional[FunctionScoreConfig] = None, default_language: str = "en", knn_boost: float = 0.25, base_minimum_should_match: str = "75%", translation_minimum_should_match: str = "75%", translation_boost: float = 0.4, translation_boost_when_source_missing: float = 1.0, source_boost_when_missing: float = 0.6, original_query_fallback_boost_when_translation_missing: float = 0.2, keywords_boost: float = 0.1, enable_phrase_query: bool = True, tie_breaker_base_query: float = 0.9, tie_breaker_keywords: float = 0.9, ): """ Initialize query builder. Multi-language search (translation-based cross-language recall) is always enabled: queries are matched against detected-language and translated target-language clauses. 
Args: match_fields: Fields to search for text matching text_embedding_field: Field name for text embeddings image_embedding_field: Field name for image embeddings source_fields: Fields to return in search results (_source includes) function_score_config: Function score configuration default_language: Default language to use when detection fails or returns "unknown" knn_boost: Boost value for KNN (embedding recall) """ self.match_fields = match_fields self.field_boosts = field_boosts or {} self.multilingual_fields = multilingual_fields or [ "title", "brief", "description", "vendor", "category_path", "category_name_text" ] self.shared_fields = shared_fields or ["tags", "option1_values", "option2_values", "option3_values"] self.core_multilingual_fields = core_multilingual_fields or ["title", "brief", "vendor", "category_name_text"] self.text_embedding_field = text_embedding_field self.image_embedding_field = image_embedding_field self.source_fields = source_fields self.function_score_config = function_score_config self.default_language = default_language self.knn_boost = knn_boost self.base_minimum_should_match = base_minimum_should_match self.translation_minimum_should_match = translation_minimum_should_match self.translation_boost = float(translation_boost) self.translation_boost_when_source_missing = float(translation_boost_when_source_missing) self.source_boost_when_missing = float(source_boost_when_missing) self.original_query_fallback_boost_when_translation_missing = float( original_query_fallback_boost_when_translation_missing ) self.keywords_boost = float(keywords_boost) self.enable_phrase_query = bool(enable_phrase_query) self.tie_breaker_base_query = float(tie_breaker_base_query) self.tie_breaker_keywords = float(tie_breaker_keywords) def _apply_source_filter(self, es_query: Dict[str, Any]) -> None: """ Apply tri-state _source semantics: - None: do not set _source (return all source fields) - []: _source=false - [..]: _source.includes=[..] 
""" if self.source_fields is None: return if not isinstance(self.source_fields, list): raise ValueError("query_config.source_fields must be null or list[str]") if len(self.source_fields) == 0: es_query["_source"] = False return es_query["_source"] = {"includes": self.source_fields} def _split_filters_for_faceting( self, filters: Optional[Dict[str, Any]], facet_configs: Optional[List[Any]] ) -> tuple: """ Split filters into conjunctive (query) and disjunctive (post_filter) based on facet configs. Disjunctive filters (multi-select facets): - Applied via post_filter (affects results but not aggregations) - Allows showing other options in the same facet even when filtered Conjunctive filters (standard facets): - Applied in query.bool.filter (affects both results and aggregations) - Standard drill-down behavior Args: filters: All filters from request facet_configs: Facet configurations with disjunctive flags Returns: (conjunctive_filters, disjunctive_filters) """ if not filters or not facet_configs: return filters or {}, {} # Get fields that support multi-select multi_select_fields = set() for fc in facet_configs: if getattr(fc, 'disjunctive', False): # Handle specifications.xxx format if fc.field.startswith('specifications.'): multi_select_fields.add('specifications') else: multi_select_fields.add(fc.field) # Split filters conjunctive = {} disjunctive = {} for field, value in filters.items(): if field in multi_select_fields: disjunctive[field] = value else: conjunctive[field] = value return conjunctive, disjunctive def build_query( self, query_text: str, query_vector: Optional[np.ndarray] = None, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None, facet_configs: Optional[List[Any]] = None, size: int = 10, from_: int = 0, enable_knn: bool = True, knn_k: int = 50, knn_num_candidates: int = 200, min_score: Optional[float] = None, parsed_query: Optional[Any] = None ) -> Dict[str, Any]: """ Build complete ES query with post_filter 
support for multi-select faceting. 结构:filters and (text_recall or embedding_recall) + post_filter - conjunctive_filters: 应用在 query.bool.filter(影响结果和聚合) - disjunctive_filters: 应用在 post_filter(只影响结果,不影响聚合) - text_recall: 文本相关性召回(按 search_langs 动态语言字段) - embedding_recall: 向量召回(KNN) - function_score: 包装召回部分,支持提权字段 Args: query_text: Query text for BM25 matching query_vector: Query embedding for KNN search filters: Exact match filters range_filters: Range filters for numeric fields (always applied in query) facet_configs: Facet configurations (used to identify multi-select facets) size: Number of results from_: Offset for pagination enable_knn: Whether to use KNN search knn_k: K value for KNN knn_num_candidates: Number of candidates for KNN min_score: Minimum score threshold Returns: ES query DSL dictionary """ # Boolean AST path has been removed; keep a single text strategy. es_query = { "size": size, "from": from_ } # Add _source filtering with explicit tri-state semantics. self._apply_source_filter(es_query) # 1. Build recall queries (text or embedding) recall_clauses = [] # Text recall (always include if query_text exists) if query_text: # Unified text query strategy text_query = self._build_advanced_text_query(query_text, parsed_query) recall_clauses.append(text_query) # Embedding recall (KNN - separate from query, handled below) has_embedding = enable_knn and query_vector is not None and self.text_embedding_field # 2. Split filters for multi-select faceting conjunctive_filters, disjunctive_filters = self._split_filters_for_faceting( filters, facet_configs ) # Build filter clauses for query (conjunctive filters + range filters) filter_clauses = self._build_filters(conjunctive_filters, range_filters) # 3. 
Build main query structure: filters and recall if recall_clauses: # Combine text recalls with OR logic (if multiple) if len(recall_clauses) == 1: recall_query = recall_clauses[0] else: recall_query = { "bool": { "should": recall_clauses, "minimum_should_match": 1 } } # Wrap recall with function_score for boosting recall_query = self._wrap_with_function_score(recall_query) # Combine filters and recall if filter_clauses: es_query["query"] = { "bool": { "must": [recall_query], "filter": filter_clauses } } else: es_query["query"] = recall_query else: # No recall queries, only filters (match_all filtered) if filter_clauses: es_query["query"] = { "bool": { "must": [{"match_all": {}}], "filter": filter_clauses } } else: es_query["query"] = {"match_all": {}} # 4. Add KNN search if enabled (separate from query, ES will combine) # Adjust KNN k, num_candidates, boost by query_tokens (short query: less KNN; long: more) if has_embedding: knn_boost = self.knn_boost if parsed_query: query_tokens = getattr(parsed_query, 'query_tokens', None) or [] token_count = len(query_tokens) if token_count <= 2: knn_k, knn_num_candidates = 30, 100 knn_boost = self.knn_boost * 0.6 # Lower weight for short queries elif token_count >= 5: knn_k, knn_num_candidates = 80, 300 knn_boost = self.knn_boost * 1.4 # Higher weight for long queries else: knn_k, knn_num_candidates = 50, 200 else: knn_k, knn_num_candidates = 50, 200 knn_clause = { "field": self.text_embedding_field, "query_vector": query_vector.tolist(), "k": knn_k, "num_candidates": knn_num_candidates, "boost": knn_boost } # Top-level knn does not inherit query.bool.filter automatically. # Apply conjunctive + range filters here so vector recall respects hard filters. if filter_clauses: if len(filter_clauses) == 1: knn_clause["filter"] = filter_clauses[0] else: knn_clause["filter"] = { "bool": { "filter": filter_clauses } } es_query["knn"] = knn_clause # 5. 
Add post_filter for disjunctive (multi-select) filters if disjunctive_filters: post_filter_clauses = self._build_filters(disjunctive_filters, None) if post_filter_clauses: if len(post_filter_clauses) == 1: es_query["post_filter"] = post_filter_clauses[0] else: es_query["post_filter"] = { "bool": {"filter": post_filter_clauses} } # 6. Add minimum score filter if min_score is not None: es_query["min_score"] = min_score return es_query def _wrap_with_function_score(self, query: Dict[str, Any]) -> Dict[str, Any]: """ Wrap query with function_score for boosting fields. Args: query: Base query to wrap Returns: Function score query or original query if no functions configured """ functions = self._build_score_functions() # If no functions configured, return original query if not functions: return query # Build function_score query score_mode = self.function_score_config.score_mode if self.function_score_config else "sum" boost_mode = self.function_score_config.boost_mode if self.function_score_config else "multiply" function_score_query = { "function_score": { "query": query, "functions": functions, "score_mode": score_mode, "boost_mode": boost_mode } } return function_score_query def _build_score_functions(self) -> List[Dict[str, Any]]: """ Build function_score functions from config. 
Returns:
            List of function score functions
        """
        functions = []

        if not self.function_score_config:
            return functions

        config_functions = self.function_score_config.functions or []

        for func_config in config_functions:
            func_type = func_config.get("type")

            if func_type == "filter_weight":
                # Filter + Weight: constant weight applied to matching docs
                functions.append({
                    "filter": func_config["filter"],
                    "weight": func_config.get("weight", 1.0)
                })

            elif func_type == "field_value_factor":
                # Field Value Factor
                functions.append({
                    "field_value_factor": {
                        "field": func_config["field"],
                        "factor": func_config.get("factor", 1.0),
                        "modifier": func_config.get("modifier", "none"),
                        "missing": func_config.get("missing", 1.0)
                    }
                })

            elif func_type == "decay":
                # Decay Function (gauss/exp/linear)
                decay_func = func_config.get("function", "gauss")
                field = func_config["field"]
                decay_params = {
                    "origin": func_config.get("origin", "now"),
                    "scale": func_config["scale"]
                }
                # "offset" and "decay" are optional in the ES decay spec.
                if "offset" in func_config:
                    decay_params["offset"] = func_config["offset"]
                if "decay" in func_config:
                    decay_params["decay"] = func_config["decay"]

                functions.append({
                    decay_func: {
                        field: decay_params
                    }
                })

        return functions

    def _build_text_query(self, query_text: str) -> Dict[str, Any]:
        """
        Build simple text matching query (BM25).

        NOTE(review): hard-codes minimum_should_match "67%" and tie_breaker 0.9
        instead of the configurable knobs, and is not referenced by build_query
        in this module (which uses _build_advanced_text_query) — confirm callers
        before changing or removing.

        Args:
            query_text: Query text

        Returns:
            ES query clause
        """
        return {
            "multi_match": {
                "query": query_text,
                "fields": self.match_fields,
                "minimum_should_match": "67%",
                "tie_breaker": 0.9,
                "boost": 1.0,
                "_name": "base_query"
            }
        }

    def _format_field_with_boost(self, field_name: str, boost: float) -> str:
        """Return "field^boost"; omit the suffix when boost is effectively 1.0."""
        if abs(float(boost) - 1.0) < 1e-9:
            return field_name
        return f"{field_name}^{boost}"

    def _get_field_boost(self, base_field: str, language: Optional[str] = None) -> float:
        # Language-specific override first (e.g. title.de), then base field (e.g. title)
        if language:
            lang_key = f"{base_field}.{language}"
            if lang_key in self.field_boosts:
                return float(self.field_boosts[lang_key])
        if base_field in self.field_boosts:
            return float(self.field_boosts[base_field])
        return 1.0

    def _get_match_fields(self, language: str) -> Tuple[List[str], List[str]]:
        """
        Build dynamic match fields for one language.

        Args:
            language: Language code (e.g. zh/en/de/fr/...)

        Returns:
            (all_fields, core_fields) - core_fields are for phrase/keyword queries
        """
        lang = (language or "").strip().lower()
        all_fields: List[str] = []
        core_fields: List[str] = []

        # Per-language subfields, e.g. "title.de^2.0".
        for base in self.multilingual_fields:
            field = f"{base}.{lang}"
            boost = self._get_field_boost(base, lang)
            all_fields.append(self._format_field_with_boost(field, boost))

        # Language-independent fields are included for every language.
        for shared in self.shared_fields:
            boost = self._get_field_boost(shared, None)
            all_fields.append(self._format_field_with_boost(shared, boost))

        for base in self.core_multilingual_fields:
            field = f"{base}.{lang}"
            boost = self._get_field_boost(base, lang)
            core_fields.append(self._format_field_with_boost(field, boost))

        return all_fields, core_fields

    def _get_embedding_field(self, language: str) -> str:
        """Get embedding field name for a language."""
        # Currently using unified embedding field
        return self.text_embedding_field or "title_embedding"

    def _build_advanced_text_query(self, query_text: str, parsed_query: Optional[Any] = None) -> Dict[str, Any]:
        """
        Build advanced text query using should clauses with multiple query strategies.
Unified implementation:
        - base_query: source-language clause
        - translation queries: target-language clauses from search_langs/query_text_by_lang
        - phrase query: for short queries (2+ tokens)
        - keywords query: extracted nouns from query
        - KNN query: added separately in build_query

        Args:
            query_text: Query text
            parsed_query: ParsedQuery object with analysis results

        Returns:
            ES bool query with should clauses
        """
        should_clauses = []

        # Get query analysis from parsed_query (all attributes read defensively
        # via getattr so a partially-populated object still works).
        query_text_by_lang: Dict[str, str] = {}
        search_langs: List[str] = []
        source_lang = self.default_language
        source_in_index_languages = True
        index_languages: List[str] = []
        keywords = ""
        query_tokens = []
        token_count = 0
        if parsed_query:
            query_text_by_lang = getattr(parsed_query, "query_text_by_lang", None) or {}
            search_langs = getattr(parsed_query, "search_langs", None) or []
            detected_lang = getattr(parsed_query, "detected_language", None)
            source_lang = detected_lang if detected_lang and detected_lang != "unknown" else self.default_language
            source_in_index_languages = bool(
                getattr(parsed_query, "source_in_index_languages", True)
            )
            index_languages = getattr(parsed_query, "index_languages", None) or []
            keywords = getattr(parsed_query, 'keywords', '') or ""
            query_tokens = getattr(parsed_query, 'query_tokens', None) or []
            token_count = len(query_tokens) or getattr(parsed_query, 'token_count', 0) or 0

        # Ensure the source language always has query text to match against.
        if not query_text_by_lang:
            query_text_by_lang = {source_lang: query_text}
        if source_lang not in query_text_by_lang and query_text:
            query_text_by_lang[source_lang] = query_text
        if not search_langs:
            search_langs = list(query_text_by_lang.keys())

        # Core fields for phrase/keyword based on source language.
        _, core_fields = self._get_match_fields(source_lang)
        if not core_fields and search_langs:
            _, core_fields = self._get_match_fields(search_langs[0])

        # Base + translated clauses based on language plan.
        for lang in search_langs:
            lang_query = query_text_by_lang.get(lang)
            if not lang_query:
                continue
            match_fields, _ = self._get_match_fields(lang)
            if not match_fields:
                continue

            is_source = (lang == source_lang)
            clause_boost = 1.0
            clause_name = "base_query" if is_source else f"base_query_trans_{lang}"
            minimum_should_match = (
                self.base_minimum_should_match if is_source else self.translation_minimum_should_match
            )

            # Boost selection: source clause is demoted when its language is not
            # indexed; translated clauses get a higher boost in that same case.
            if is_source and not source_in_index_languages:
                clause_boost = self.source_boost_when_missing
            elif not is_source:
                clause_boost = (
                    self.translation_boost if source_in_index_languages else self.translation_boost_when_source_missing
                )

            clause = {
                "multi_match": {
                    "_name": clause_name,
                    "fields": match_fields,
                    "minimum_should_match": minimum_should_match,
                    "query": lang_query,
                    "tie_breaker": self.tie_breaker_base_query,
                }
            }
            # Omit "boost" entirely when it is effectively 1.0.
            if abs(clause_boost - 1.0) > 1e-9:
                clause["multi_match"]["boost"] = clause_boost
            should_clauses.append({
                "multi_match": clause["multi_match"]
            })

        # Fallback: source language is not indexed and translation for some index languages is missing.
        # Use original query text on missing index-language fields with a low boost.
        if not source_in_index_languages and query_text and index_languages:
            # Normalize and de-duplicate index language codes, preserving order.
            normalized_index_langs: List[str] = []
            seen_langs = set()
            for lang in index_languages:
                norm_lang = str(lang or "").strip().lower()
                if not norm_lang or norm_lang in seen_langs:
                    continue
                seen_langs.add(norm_lang)
                normalized_index_langs.append(norm_lang)
            for lang in normalized_index_langs:
                if lang == source_lang:
                    continue
                if lang in query_text_by_lang:
                    continue
                match_fields, _ = self._get_match_fields(lang)
                if not match_fields:
                    continue
                should_clauses.append({
                    "multi_match": {
                        "_name": f"fallback_original_query_{lang}",
                        "query": query_text,
                        "fields": match_fields,
                        "minimum_should_match": self.translation_minimum_should_match,
                        "tie_breaker": self.tie_breaker_base_query,
                        "boost": self.original_query_fallback_boost_when_translation_missing,
                    }
                })

        # 3. Short query - add phrase query (derived from query_tokens)
        # is_short: quoted or ((token_count <= 2 or len <= 4) and no space)
        source_query_text = query_text_by_lang.get(source_lang) or query_text
        ENABLE_PHRASE_QUERY = self.enable_phrase_query
        is_quoted = query_text.startswith('"') and query_text.endswith('"')
        is_short = is_quoted or ((token_count <= 2 or len(query_text) <= 4) and ' ' not in query_text)
        if ENABLE_PHRASE_QUERY and core_fields and token_count >= 2 and is_short:
            # Slop grows with query length: <3 chars -> 0, <5 -> 1, else 2.
            query_length = len(query_text)
            slop = 0 if query_length < 3 else 1 if query_length < 5 else 2
            should_clauses.append({
                "multi_match": {
                    "query": source_query_text,
                    "fields": core_fields,
                    "type": "phrase",
                    "slop": slop,
                    "boost": 1.0,
                    "_name": "phrase_query"
                }
            })
        # 4. Keywords query - extracted nouns from query; only added when keywords
        # are short (<=2 words) and at most half the query's non-space length.
        elif core_fields and keywords and len(keywords.split()) <= 2 and 2 * len(keywords.replace(' ', '')) <= len(query_text):
            should_clauses.append({
                "multi_match": {
                    "query": keywords,
                    "fields": core_fields,
                    # "operator": "AND",
                    "tie_breaker": self.tie_breaker_keywords,
                    "boost": self.keywords_boost,
                    "_name": "keywords_query"
                }
            })

        # Fallback to a simple query when language fields cannot be resolved.
        if not should_clauses:
            fallback_fields = self.match_fields or ["title.en^1.0"]
            return {
                "multi_match": {
                    "_name": "base_query_fallback",
                    "query": query_text,
                    "fields": fallback_fields,
                    "minimum_should_match": self.base_minimum_should_match,
                    "tie_breaker": self.tie_breaker_base_query,
                }
            }

        # Return bool query with should clauses
        if len(should_clauses) == 1:
            return should_clauses[0]

        return {
            "bool": {
                "should": should_clauses,
                "minimum_should_match": 1
            }
        }

    def _build_filters(
        self,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, 'RangeFilter']] = None
    ) -> List[Dict[str, Any]]:
        """
        Build filter clauses.

        Args:
            filters: Exact-match filter dict
            range_filters: Range filters (Dict[str, RangeFilter]; RangeFilter
                is a Pydantic model)

        Returns:
            List of ES filter clauses
        """
        filter_clauses = []

        # 1. Handle exact-match filters
        if filters:
            for field, value in filters.items():
                # Special case: nested filtering for specifications
                if field == "specifications":
                    if isinstance(value, dict):
                        # Single spec filter: {"name": "color", "value": "green"}
                        name = value.get("name")
                        spec_value = value.get("value")
                        if name and spec_value:
                            filter_clauses.append({
                                "nested": {
                                    "path": "specifications",
                                    "query": {
                                        "bool": {
                                            "must": [
                                                {"term": {"specifications.name": name}},
                                                {"term": {"specifications.value": spec_value}}
                                            ]
                                        }
                                    }
                                }
                            })
                    elif isinstance(value, list):
                        # Multiple spec filters: group by name; same dimension OR, different dimensions AND.
                        # e.g. [{"name": "size", "value": "3"}, {"name": "size", "value": "4"}, {"name": "color", "value": "green"}]
                        # should produce (size=3 OR size=4) AND color=green
                        from collections import defaultdict
                        specs_by_name = defaultdict(list)
                        for spec in value:
                            if isinstance(spec, dict):
                                name = spec.get("name")
                                spec_value = spec.get("value")
                                if name and spec_value:
                                    specs_by_name[name].append(spec_value)

                        # Generate one filter clause per name dimension
                        for name, values in specs_by_name.items():
                            if len(values) == 1:
                                # Single value: plain term query
                                filter_clauses.append({
                                    "nested": {
                                        "path": "specifications",
                                        "query": {
                                            "bool": {
                                                "must": [
                                                    {"term": {"specifications.name": name}},
                                                    {"term": {"specifications.value": values[0]}}
                                                ]
                                            }
                                        }
                                    }
                                })
                            else:
                                # Multiple values: join with should (OR)
                                should_clauses = []
                                for spec_value in values:
                                    should_clauses.append({
                                        "bool": {
                                            "must": [
                                                {"term": {"specifications.name": name}},
                                                {"term": {"specifications.value": spec_value}}
                                            ]
                                        }
                                    })
                                filter_clauses.append({
                                    "nested": {
                                        "path": "specifications",
                                        "query": {
                                            "bool": {
                                                "should": should_clauses,
                                                "minimum_should_match": 1
                                            }
                                        }
                                    }
                                })
                    continue

                # *_all semantics: multiple values are ANDed (must match all values)
                if field.endswith("_all"):
                    es_field = field[:-4]  # strip the "_all" suffix
                    if es_field == "specifications" and isinstance(value, list):
                        # specifications_all: every spec condition in the list must hold (AND)
                        must_nested = []
                        for spec in value:
                            if isinstance(spec, dict):
                                name = spec.get("name")
                                spec_value = spec.get("value")
                                if name and spec_value:
                                    must_nested.append({
                                        "nested": {
                                            "path": "specifications",
                                            "query": {
                                                "bool": {
                                                    "must": [
                                                        {"term": {"specifications.name": name}},
                                                        {"term": {"specifications.value": spec_value}}
                                                    ]
                                                }
                                            }
                                        }
                                    })
                        if must_nested:
                            filter_clauses.append({"bool": {"must": must_nested}})
                    else:
                        # Plain field _all: multiple values via must + several term clauses
                        if isinstance(value, list):
                            if value:
                                filter_clauses.append({
                                    "bool": {
                                        "must": [{"term": {es_field: v}} for v in value]
                                    }
                                })
                        else:
                            filter_clauses.append({"term": {es_field: value}})
                    continue

                # Plain field filter (multi-value defaults to OR)
                if isinstance(value, list):
                    # Multi-value match (OR)
                    filter_clauses.append({
                        "terms": {field: value}
                    })
                else:
                    # Single-value exact match
                    filter_clauses.append({
                        "term": {field: value}
                    })

        # 2. Handle range filters (supports RangeFilter Pydantic model or dict)
        if range_filters:
            for field, range_filter in range_filters.items():
                # Accept either a Pydantic model or a plain dict
                if hasattr(range_filter, 'model_dump'):
                    # Pydantic model
                    range_dict = range_filter.model_dump(exclude_none=True)
                elif isinstance(range_filter, dict):
                    # Already a dict
                    range_dict = {k: v for k, v in range_filter.items() if v is not None}
                else:
                    # Unknown format: skip
                    continue

                if range_dict:
                    filter_clauses.append({
                        "range": {field: range_dict}
                    })

        return filter_clauses

    def add_sorting(
        self,
        es_query: Dict[str, Any],
        sort_by: str,
        sort_order: str = "desc"
    ) -> Dict[str, Any]:
        """
        Add sorting to ES query.
Args:
            es_query: Existing ES query (mutated in place)
            sort_by: Field name for sorting ('price' is auto-mapped, see below)
            sort_order: Sort order: 'asc' or 'desc'

        Returns:
            Modified ES query
        """
        if not sort_by:
            return es_query
        if not sort_order:
            sort_order = "desc"

        # Auto-map 'price' to 'min_price' or 'max_price' based on sort_order
        if sort_by == "price":
            if sort_order.lower() == "asc":
                sort_by = "min_price"  # price low to high
            else:
                sort_by = "max_price"  # price high to low

        if "sort" not in es_query:
            es_query["sort"] = []

        # Add the specified sort
        sort_field = {
            sort_by: {
                "order": sort_order.lower()
            }
        }
        es_query["sort"].append(sort_field)

        return es_query

    def build_facets(
        self,
        facet_configs: Optional[List['FacetConfig']] = None,
        use_reverse_nested: bool = True
    ) -> Dict[str, Any]:
        """
        Build facet aggregations.

        Args:
            facet_configs: List of facet configuration objects
            use_reverse_nested: Whether to use reverse_nested to count products
                (default True). When False, nested documents are counted instead
                (better performance, but counts may be inaccurate).

        Supported field types:
        - Plain fields: e.g. "category1_name" (terms or range type)
        - specifications: "specifications" (returns all spec names and their values)
        - specifications.{name}: e.g. "specifications.color" (returns values for
          the given spec name)

        Returns:
            ES aggregations dict

        Performance notes:
        - use_reverse_nested=True: counts products; accurate but slightly slower
          (typically < 20% impact)
        - use_reverse_nested=False: counts nested documents; faster but counts
          may be inaccurate
        """
        if not facet_configs:
            return {}

        aggs = {}

        for config in facet_configs:
            field = config.field
            size = config.size
            facet_type = config.type

            # Handle "specifications" (all spec names)
            if field == "specifications":
                aggs["specifications_facet"] = {
                    "nested": {"path": "specifications"},
                    "aggs": {
                        "by_name": {
                            "terms": {
                                "field": "specifications.name",
                                "size": 20,
                                "order": {"_count": "desc"}
                            },
                            "aggs": {
                                "value_counts": {
                                    "terms": {
                                        "field": "specifications.value",
                                        "size": size,
                                        "order": {"_count": "desc"}
                                    }
                                }
                            }
                        }
                    }
                }
                continue

            # Handle "specifications.{name}" (a specific spec name)
            if field.startswith("specifications."):
                name = field[len("specifications."):]
                agg_name = f"specifications_{name}_facet"
                # reverse_nested counts products (parent docs) rather than spec
                # entries (nested docs), so facet counts match the number of
                # products actually returned by the search.
                base_value_counts = {
                    "terms": {
                        "field": "specifications.value",
                        "size": size,
                        "order": {"_count": "desc"}
                    }
                }
                # When reverse_nested is enabled, add a sub-aggregation that
                # counts products.
                if use_reverse_nested:
                    base_value_counts["aggs"] = {
                        "product_count": {
                            "reverse_nested": {}
                        }
                    }

                aggs[agg_name] = {
                    "nested": {"path": "specifications"},
                    "aggs": {
                        "filter_by_name": {
                            "filter": {"term": {"specifications.name": name}},
                            "aggs": {
                                "value_counts": base_value_counts
                            }
                        }
                    }
                }
                continue

            # Handle plain fields
            agg_name = f"{field}_facet"
            if facet_type == 'terms':
                aggs[agg_name] = {
                    "terms": {
                        "field": field,
                        "size": size,
                        "order": {"_count": "desc"}
                    }
                }
            elif facet_type == 'range':
                if config.ranges:
                    aggs[agg_name] = {
                        "range": {
                            "field": field,
                            "ranges": config.ranges
                        }
                    }

        return aggs