"""
Elasticsearch query builder.

Converts parsed queries and search parameters into ES DSL queries.

Simplified architecture:
- filters and (text_recall or embedding_recall)
- function_score wrapper for boosting fields
"""

from collections import defaultdict
from typing import Dict, Any, List, Optional, Union, Tuple

import numpy as np

from .boolean_parser import QueryNode
from config import FunctionScoreConfig


class ESQueryBuilder:
    """Builds Elasticsearch DSL queries."""

    # Boost applied to the top-level kNN clause (lower than text recall).
    KNN_BOOST = 0.2
    # Boost applied to queries built from translated query text.
    TRANSLATION_BOOST = 0.4
    # Strategy switches for the advanced text query.
    ENABLE_LONG_QUERY = False   # disabled in the reference implementation
    ENABLE_PHRASE_QUERY = True

    def __init__(
        self,
        index_name: str,
        match_fields: List[str],
        text_embedding_field: Optional[str] = None,
        image_embedding_field: Optional[str] = None,
        source_fields: Optional[List[str]] = None,
        function_score_config: Optional[FunctionScoreConfig] = None,
        enable_multilang_search: bool = True
    ):
        """
        Initialize query builder.

        Args:
            index_name: ES index name
            match_fields: Fields to search for text matching
            text_embedding_field: Field name for text embeddings
            image_embedding_field: Field name for image embeddings
            source_fields: Fields to return in search results (_source includes)
            function_score_config: Function score configuration
            enable_multilang_search: Enable multi-language search using translations
        """
        self.index_name = index_name
        self.match_fields = match_fields
        self.text_embedding_field = text_embedding_field
        self.image_embedding_field = image_embedding_field
        self.source_fields = source_fields
        self.function_score_config = function_score_config
        self.enable_multilang_search = enable_multilang_search

    def _split_filters_for_faceting(
        self,
        filters: Optional[Dict[str, Any]],
        facet_configs: Optional[List[Any]]
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Split filters into conjunctive (query) and disjunctive (post_filter)
        based on facet configs.

        Disjunctive filters (multi-select facets):
        - Applied via post_filter (affects results but not aggregations)
        - Allows showing other options in the same facet even when filtered

        Conjunctive filters (standard facets):
        - Applied in query.bool.filter (affects both results and aggregations)
        - Standard drill-down behavior

        Args:
            filters: All filters from request
            facet_configs: Facet configurations with disjunctive flags

        Returns:
            (conjunctive_filters, disjunctive_filters)
        """
        if not filters or not facet_configs:
            return filters or {}, {}

        # Collect the filter keys that belong to multi-select facets.
        multi_select_fields = set()
        for fc in facet_configs:
            if not getattr(fc, 'disjunctive', False):
                continue
            # A "specifications.xxx" facet is filtered through the single
            # "specifications" filter key.
            if fc.field.startswith('specifications.'):
                multi_select_fields.add('specifications')
            else:
                multi_select_fields.add(fc.field)

        conjunctive: Dict[str, Any] = {}
        disjunctive: Dict[str, Any] = {}
        for field, value in filters.items():
            target = disjunctive if field in multi_select_fields else conjunctive
            target[field] = value

        return conjunctive, disjunctive

    @staticmethod
    def _combine_filter_clauses(clauses: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Collapse a non-empty list of filter clauses into one query object."""
        if len(clauses) == 1:
            return clauses[0]
        return {"bool": {"filter": clauses}}

    def build_query(
        self,
        query_text: str,
        query_vector: Optional[np.ndarray] = None,
        query_node: Optional[QueryNode] = None,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, Any]] = None,
        facet_configs: Optional[List[Any]] = None,
        size: int = 10,
        from_: int = 0,
        enable_knn: bool = True,
        knn_k: int = 50,
        knn_num_candidates: int = 200,
        min_score: Optional[float] = None,
        parsed_query: Optional[Any] = None
    ) -> Dict[str, Any]:
        """
        Build complete ES query with post_filter support for multi-select faceting.

        Structure: filters and (text_recall or embedding_recall) + post_filter
        - conjunctive_filters: applied in query.bool.filter and in knn.filter
          (affect results and aggregations)
        - disjunctive_filters: applied in post_filter (affect results only,
          not aggregations)
        - text_recall: text relevance recall
        - embedding_recall: vector recall (KNN)
        - function_score: wraps the recall part to support boosting fields

        Args:
            query_text: Query text for BM25 matching
            query_vector: Query embedding for KNN search (ndarray or sequence)
            query_node: Parsed boolean expression tree
            filters: Exact match filters
            range_filters: Range filters for numeric fields (always applied in query)
            facet_configs: Facet configurations (used to identify multi-select facets)
            size: Number of results
            from_: Offset for pagination
            enable_knn: Whether to use KNN search
            knn_k: K value for KNN
            knn_num_candidates: Number of candidates for KNN
            min_score: Minimum score threshold
            parsed_query: Optional query-analysis object forwarded to the
                advanced text query builder

        Returns:
            ES query DSL dictionary
        """
        es_query: Dict[str, Any] = {
            "size": size,
            "from": from_
        }

        # Add _source filtering if source_fields are configured
        if self.source_fields:
            es_query["_source"] = {
                "includes": self.source_fields
            }

        # 1. Build recall queries (text or embedding)
        recall_clauses = []

        # Text recall (always include if query_text exists)
        if query_text:
            if query_node and query_node.operator != 'TERM':
                # Complex boolean query
                text_query = self._build_boolean_query(query_node)
            else:
                # Simple text query - should-based multi-query strategy
                text_query = self._build_advanced_text_query(query_text, parsed_query)
            recall_clauses.append(text_query)

        # Embedding recall (KNN - separate from query, handled below)
        has_embedding = enable_knn and query_vector is not None and self.text_embedding_field

        # 2. Split filters for multi-select faceting
        conjunctive_filters, disjunctive_filters = self._split_filters_for_faceting(
            filters, facet_configs
        )

        # Build filter clauses for query (conjunctive filters + range filters)
        filter_clauses = self._build_filters(conjunctive_filters, range_filters)

        # 3. Build main query structure: filters and recall
        if recall_clauses:
            # Combine text recalls with OR logic (if multiple)
            if len(recall_clauses) == 1:
                recall_query = recall_clauses[0]
            else:
                recall_query = {
                    "bool": {
                        "should": recall_clauses,
                        "minimum_should_match": 1
                    }
                }

            # Wrap recall with function_score for boosting
            recall_query = self._wrap_with_function_score(recall_query)

            # Combine filters and recall
            if filter_clauses:
                es_query["query"] = {
                    "bool": {
                        "must": [recall_query],
                        "filter": filter_clauses
                    }
                }
            else:
                es_query["query"] = recall_query
        else:
            # No recall queries, only filters (match_all filtered)
            if filter_clauses:
                es_query["query"] = {
                    "bool": {
                        "must": [{"match_all": {}}],
                        "filter": filter_clauses
                    }
                }
            else:
                es_query["query"] = {"match_all": {}}

        # 4. Add KNN search if enabled (separate from query, ES will combine)
        if has_embedding:
            knn_clause: Dict[str, Any] = {
                "field": self.text_embedding_field,
                # np.asarray lets callers pass either an ndarray or a plain sequence.
                "query_vector": np.asarray(query_vector).tolist(),
                "k": knn_k,
                "num_candidates": knn_num_candidates,
                "boost": self.KNN_BOOST  # Lower boost for embedding recall
            }
            # ES unions kNN hits with query hits, so filters placed only in
            # query.bool.filter do not restrict the kNN side. Repeat them here
            # so vector recall honours the same conjunctive/range filters.
            if filter_clauses:
                knn_clause["filter"] = self._combine_filter_clauses(filter_clauses)
            es_query["knn"] = knn_clause

        # 5. Add post_filter for disjunctive (multi-select) filters
        if disjunctive_filters:
            post_filter_clauses = self._build_filters(disjunctive_filters, None)
            if post_filter_clauses:
                es_query["post_filter"] = self._combine_filter_clauses(post_filter_clauses)

        # 6. Add minimum score filter
        if min_score is not None:
            es_query["min_score"] = min_score

        return es_query

    def _wrap_with_function_score(self, query: Dict[str, Any]) -> Dict[str, Any]:
        """
        Wrap query with function_score for boosting fields.

        Args:
            query: Base query to wrap

        Returns:
            Function score query or original query if no functions configured
        """
        functions = self._build_score_functions()

        # If no functions configured, return original query
        if not functions:
            return query

        config = self.function_score_config
        score_mode = config.score_mode if config else "sum"
        boost_mode = config.boost_mode if config else "multiply"

        return {
            "function_score": {
                "query": query,
                "functions": functions,
                "score_mode": score_mode,
                "boost_mode": boost_mode
            }
        }

    def _build_score_functions(self) -> List[Dict[str, Any]]:
        """
        Build function_score functions from config.

        Supported "type" values are "filter_weight", "field_value_factor" and
        "decay"; entries with any other type are ignored.

        Returns:
            List of function score functions
        """
        functions: List[Dict[str, Any]] = []

        if not self.function_score_config:
            return functions

        for func_config in self.function_score_config.functions or []:
            func_type = func_config.get("type")

            if func_type == "filter_weight":
                # Filter + Weight
                functions.append({
                    "filter": func_config["filter"],
                    "weight": func_config.get("weight", 1.0)
                })

            elif func_type == "field_value_factor":
                # Field Value Factor
                functions.append({
                    "field_value_factor": {
                        "field": func_config["field"],
                        "factor": func_config.get("factor", 1.0),
                        "modifier": func_config.get("modifier", "none"),
                        "missing": func_config.get("missing", 1.0)
                    }
                })

            elif func_type == "decay":
                # Decay Function (gauss/exp/linear)
                decay_func = func_config.get("function", "gauss")
                field = func_config["field"]

                decay_params = {
                    "origin": func_config.get("origin", "now"),
                    "scale": func_config["scale"]
                }
                # offset/decay are only emitted when explicitly configured.
                for optional_key in ("offset", "decay"):
                    if optional_key in func_config:
                        decay_params[optional_key] = func_config[optional_key]

                functions.append({
                    decay_func: {
                        field: decay_params
                    }
                })

        return functions

    def _build_text_query(self, query_text: str) -> Dict[str, Any]:
        """
        Build simple text matching query (BM25).

        Legacy method - kept for backward compatibility.

        Args:
            query_text: Query text

        Returns:
            ES query clause
        """
        return {
            "multi_match": {
                "query": query_text,
                "fields": self.match_fields,
                "minimum_should_match": "67%",
                "tie_breaker": 0.9,
                "boost": 1.0,
                "_name": "base_query"
            }
        }

    def _get_match_fields(self, language: str) -> Tuple[List[str], List[str]]:
        """
        Get match fields for a specific language.

        Args:
            language: Language code; 'zh' selects the Chinese fields, any
                other value selects the English fields

        Returns:
            (all_fields, core_fields) - core_fields are for phrase/keyword queries
        """
        lang = 'zh' if language == 'zh' else 'en'

        all_fields = [
            f"title_{lang}^3.0",
            f"brief_{lang}^1.5",
            f"description_{lang}",
            f"vendor_{lang}^1.5",
            "tags",
            f"category_path_{lang}^1.5",
            f"category_name_{lang}^1.5",
            "option1_values^0.5"
        ]
        core_fields = [
            f"title_{lang}^3.0",
            f"brief_{lang}^1.5",
            f"vendor_{lang}^1.5",
            f"category_name_{lang}^1.5"
        ]
        return all_fields, core_fields

    def _get_embedding_field(self, language: str) -> str:
        """Get embedding field name for a language."""
        # Currently using unified embedding field; `language` is not consulted.
        return self.text_embedding_field or "title_embedding"

    def _build_advanced_text_query(
        self,
        query_text: str,
        parsed_query: Optional[Any] = None
    ) -> Dict[str, Any]:
        """
        Build advanced text query using should clauses with multiple query strategies.

        Reference implementation:
        - base_query: main query with AND operator and 75% minimum_should_match
        - translation queries: lower boost for other languages
        - phrase query: for short queries (2+ tokens)
        - keywords query: extracted nouns from query
        - KNN query: added separately in build_query

        Args:
            query_text: Query text
            parsed_query: ParsedQuery object with analysis results

        Returns:
            The base multi_match clause when it is the only strategy,
            otherwise an ES bool query with should clauses
        """
        # Defaults used when no query analysis is available.
        translations: Dict[str, Any] = {}
        language = 'zh'
        keywords = ""
        token_count = 0
        is_short_query = False
        is_long_query = False

        if parsed_query:
            translations = parsed_query.translations or {}
            language = parsed_query.detected_language or 'zh'
            keywords = getattr(parsed_query, 'keywords', '') or ""
            token_count = getattr(parsed_query, 'token_count', 0) or 0
            is_short_query = getattr(parsed_query, 'is_short_query', False)
            is_long_query = getattr(parsed_query, 'is_long_query', False)

        # Get match fields for the detected language
        match_fields, core_fields = self._get_match_fields(language)

        # Tie breaker values
        tie_breaker_base_query = 0.9
        tie_breaker_long_query = 0.9
        tie_breaker_keywords = 0.9

        # 1. Base query - main query with AND operator
        should_clauses: List[Dict[str, Any]] = [{
            "multi_match": {
                "_name": "base_query",
                "fields": match_fields,
                "minimum_should_match": "75%",
                "operator": "AND",
                "query": query_text,
                "tie_breaker": tie_breaker_base_query
            }
        }]

        # 2. Translation queries - lower boost for other languages
        if self.enable_multilang_search:
            for target_lang in ('zh', 'en'):
                translated = translations.get(target_lang)
                # Skip the query's own language, missing translations, and
                # translations identical to the original text.
                if language == target_lang or not translated or translated == query_text:
                    continue
                target_fields, _ = self._get_match_fields(target_lang)
                should_clauses.append({
                    "multi_match": {
                        "query": translated,
                        "fields": target_fields,
                        "operator": "AND",
                        "minimum_should_match": "75%",
                        "tie_breaker": tie_breaker_base_query,
                        "boost": self.TRANSLATION_BOOST,
                        "_name": f"base_query_trans_{target_lang}"
                    }
                })

        # 3. Long query - add a query with lower minimum_should_match
        #    (switched off by default via ENABLE_LONG_QUERY)
        if self.ENABLE_LONG_QUERY and is_long_query:
            boost = 0.5 * pow(min(1.0, token_count / 10.0), 0.9)
            should_clauses.append({
                "multi_match": {
                    "query": query_text,
                    "fields": match_fields,
                    "minimum_should_match": "70%",
                    "boost": boost,
                    "tie_breaker": tie_breaker_long_query,
                    "_name": "long_query"
                }
            })

        # 4. Short query - add phrase query
        if self.ENABLE_PHRASE_QUERY and token_count >= 2 and is_short_query:
            query_length = len(query_text)
            # Allow more slop as the query gets longer.
            slop = 0 if query_length < 3 else 1 if query_length < 5 else 2
            should_clauses.append({
                "multi_match": {
                    "query": query_text,
                    "fields": core_fields,
                    "type": "phrase",
                    "slop": slop,
                    "boost": 1.0,
                    "_name": "phrase_query"
                }
            })
        # 5. Keywords query - extracted nouns from query. Only used when no
        #    phrase query was added, there are at most two keywords, and they
        #    make up no more than half of the query text.
        elif (
            keywords
            and len(keywords.split()) <= 2
            and 2 * len(keywords.replace(' ', '')) <= len(query_text)
        ):
            should_clauses.append({
                "multi_match": {
                    "query": keywords,
                    "fields": core_fields,
                    "operator": "AND",
                    "tie_breaker": tie_breaker_keywords,
                    "boost": 0.1,
                    "_name": "keywords_query"
                }
            })

        # A single strategy does not need a bool wrapper.
        if len(should_clauses) == 1:
            return should_clauses[0]

        return {
            "bool": {
                "should": should_clauses,
                "minimum_should_match": 1
            }
        }

    def _build_boolean_query(self, node: QueryNode) -> Dict[str, Any]:
        """
        Build query from boolean expression tree.

        Args:
            node: Query tree node

        Returns:
            ES query clause (match_all for an unknown operator)
        """
        operator = node.operator

        if operator == 'TERM':
            # Leaf node - simple text query
            return self._build_text_query(node.value)

        if operator == 'AND':
            # All terms must match
            return {
                "bool": {
                    "must": [self._build_boolean_query(term) for term in node.terms]
                }
            }

        if operator == 'OR':
            # Any term must match
            return {
                "bool": {
                    "should": [self._build_boolean_query(term) for term in node.terms],
                    "minimum_should_match": 1
                }
            }

        if operator == 'ANDNOT':
            # First term must match, second must not
            if len(node.terms) >= 2:
                return {
                    "bool": {
                        "must": [self._build_boolean_query(node.terms[0])],
                        "must_not": [self._build_boolean_query(node.terms[1])]
                    }
                }
            # Only one operand: treat it as a plain term.
            return self._build_boolean_query(node.terms[0])

        if operator == 'RANK':
            # Like OR but for ranking (all terms contribute to score)
            return {
                "bool": {
                    "should": [self._build_boolean_query(term) for term in node.terms]
                }
            }

        # Unknown operator
        return {"match_all": {}}

    @staticmethod
    def _nested_spec_filter(name: Any, values: List[Any]) -> Dict[str, Any]:
        """Build a nested `specifications` filter: name == `name` AND value in `values`."""
        matches = [
            {
                "bool": {
                    "must": [
                        {"term": {"specifications.name": name}},
                        {"term": {"specifications.value": spec_value}}
                    ]
                }
            }
            for spec_value in values
        ]
        if len(matches) == 1:
            # Single value: plain name/value term pair.
            spec_query = matches[0]
        else:
            # Several values of the same dimension are OR-ed together.
            spec_query = {
                "bool": {
                    "should": matches,
                    "minimum_should_match": 1
                }
            }
        return {
            "nested": {
                "path": "specifications",
                "query": spec_query
            }
        }

    def _build_filters(
        self,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, 'RangeFilter']] = None
    ) -> List[Dict[str, Any]]:
        """
        Build filter clauses.

        Args:
            filters: Exact-match filters keyed by field name. The special key
                "specifications" takes a {"name", "value"} dict or a list of
                such dicts and produces nested filters.
            range_filters: Range filters keyed by field name; each value is
                either an object exposing ``model_dump`` (e.g. a Pydantic
                RangeFilter) or a plain dict. Other value types are skipped.

        Returns:
            List of ES filter clauses
        """
        filter_clauses: List[Dict[str, Any]] = []

        # 1. Exact-match filters
        if filters:
            for field, value in filters.items():
                # Special case: nested filtering on specifications
                if field == "specifications":
                    if isinstance(value, dict):
                        # Single spec filter: {"name": "color", "value": "green"}
                        name = value.get("name")
                        spec_value = value.get("value")
                        if name and spec_value:
                            filter_clauses.append(
                                self._nested_spec_filter(name, [spec_value])
                            )
                    elif isinstance(value, list):
                        # Multiple spec filters: group by name; values of the
                        # same dimension are OR-ed, different dimensions AND-ed.
                        # e.g. [{"name": "size", "value": "3"},
                        #       {"name": "size", "value": "4"},
                        #       {"name": "color", "value": "green"}]
                        # yields (size=3 OR size=4) AND color=green
                        specs_by_name = defaultdict(list)
                        for spec in value:
                            if isinstance(spec, dict):
                                name = spec.get("name")
                                spec_value = spec.get("value")
                                if name and spec_value:
                                    specs_by_name[name].append(spec_value)

                        # One nested clause per spec dimension
                        for name, values in specs_by_name.items():
                            filter_clauses.append(
                                self._nested_spec_filter(name, values)
                            )
                    continue

                # Regular field filter
                if isinstance(value, list):
                    # Multi-value match (OR)
                    filter_clauses.append({
                        "terms": {field: value}
                    })
                else:
                    # Single-value exact match
                    filter_clauses.append({
                        "term": {field: value}
                    })

        # 2. Range filters (model objects or dicts)
        if range_filters:
            for field, range_filter in range_filters.items():
                if hasattr(range_filter, 'model_dump'):
                    # Pydantic-style model
                    range_dict = range_filter.model_dump(exclude_none=True)
                elif isinstance(range_filter, dict):
                    # Already a dict: drop unset bounds
                    range_dict = {k: v for k, v in range_filter.items() if v is not None}
                else:
                    # Unsupported format, skip
                    continue

                if range_dict:
                    filter_clauses.append({
                        "range": {field: range_dict}
                    })

        return filter_clauses

    def add_spu_collapse(
        self,
        es_query: Dict[str, Any],
        spu_field: str,
        inner_hits_size: int = 3
    ) -> Dict[str, Any]:
        """
        Add SPU aggregation/collapse to query.

        Mutates and returns ``es_query``.

        Args:
            es_query: Existing ES query
            spu_field: Field containing SPU ID
            inner_hits_size: Number of SKUs to return per SPU

        Returns:
            Modified ES query
        """
        # Add collapse
        es_query["collapse"] = {
            "field": spu_field,
            "inner_hits": {
                "_source": False,
                "name": "top_docs",
                "size": inner_hits_size
            }
        }

        # Add cardinality aggregation to count unique SPUs
        es_query.setdefault("aggs", {})["unique_count"] = {
            "cardinality": {
                "field": spu_field
            }
        }

        return es_query

    def add_sorting(
        self,
        es_query: Dict[str, Any],
        sort_by: str,
        sort_order: str = "desc"
    ) -> Dict[str, Any]:
        """
        Add sorting to ES query.

        Mutates and returns ``es_query``; the query is returned untouched when
        ``sort_by`` is empty.

        Args:
            es_query: Existing ES query
            sort_by: Field name for sorting ('price' is mapped automatically)
            sort_order: Sort order: 'asc' or 'desc'

        Returns:
            Modified ES query
        """
        if not sort_by:
            return es_query

        order = (sort_order or "desc").lower()

        # Auto-map 'price' to 'min_price' or 'max_price' based on sort order
        if sort_by == "price":
            # ascending -> cheapest first; otherwise -> most expensive first
            sort_by = "min_price" if order == "asc" else "max_price"

        # Append after any sort criteria that are already present
        es_query.setdefault("sort", []).append({
            sort_by: {
                "order": order
            }
        })

        return es_query

    @staticmethod
    def _spec_value_terms(size: Any) -> Dict[str, Any]:
        """Terms aggregation over specification values, most frequent first."""
        return {
            "terms": {
                "field": "specifications.value",
                "size": size,
                "order": {"_count": "desc"}
            }
        }

    def build_facets(
        self,
        facet_configs: Optional[List['FacetConfig']] = None
    ) -> Dict[str, Any]:
        """
        Build facet aggregations.

        Args:
            facet_configs: List of facet config objects (``field``, ``size``,
                ``type`` and, for range facets, ``ranges`` are read).

                Supported field forms:
                - regular field: e.g. "category1_name" (terms or range type)
                - specifications: "specifications" (all spec names with their values)
                - specifications.{name}: e.g. "specifications.color"
                  (values of the given spec name)

        Returns:
            ES aggregations dict
        """
        if not facet_configs:
            return {}

        aggs: Dict[str, Any] = {}
        spec_prefix = "specifications."

        for config in facet_configs:
            field = config.field
            size = config.size
            facet_type = config.type

            # specifications: every spec name with its value counts
            if field == "specifications":
                aggs["specifications_facet"] = {
                    "nested": {"path": "specifications"},
                    "aggs": {
                        "by_name": {
                            "terms": {
                                "field": "specifications.name",
                                "size": 20,
                                "order": {"_count": "desc"}
                            },
                            "aggs": {
                                "value_counts": self._spec_value_terms(size)
                            }
                        }
                    }
                }
                continue

            # specifications.{name}: value counts for one spec name
            if field.startswith(spec_prefix):
                name = field[len(spec_prefix):]
                aggs[f"specifications_{name}_facet"] = {
                    "nested": {"path": "specifications"},
                    "aggs": {
                        "filter_by_name": {
                            "filter": {"term": {"specifications.name": name}},
                            "aggs": {
                                "value_counts": self._spec_value_terms(size)
                            }
                        }
                    }
                }
                continue

            # Regular fields
            agg_name = f"{field}_facet"

            if facet_type == 'terms':
                aggs[agg_name] = {
                    "terms": {
                        "field": field,
                        "size": size,
                        "order": {"_count": "desc"}
                    }
                }
            elif facet_type == 'range':
                # Range facets without configured ranges are skipped.
                if config.ranges:
                    aggs[agg_name] = {
                        "range": {
                            "field": field,
                            "ranges": config.ranges
                        }
                    }

        return aggs