""" Elasticsearch query builder. Converts parsed queries and search parameters into ES DSL queries. Simplified architecture: - filters and (text_recall or embedding_recall) - function_score wrapper for boosting fields """ from typing import Dict, Any, List, Optional, Union import numpy as np from .boolean_parser import QueryNode from .query_config import FUNCTION_SCORE_CONFIG class ESQueryBuilder: """Builds Elasticsearch DSL queries.""" def __init__( self, index_name: str, match_fields: List[str], text_embedding_field: Optional[str] = None, image_embedding_field: Optional[str] = None, source_fields: Optional[List[str]] = None ): """ Initialize query builder. Args: index_name: ES index name match_fields: Fields to search for text matching text_embedding_field: Field name for text embeddings image_embedding_field: Field name for image embeddings source_fields: Fields to return in search results (_source includes) """ self.index_name = index_name self.match_fields = match_fields self.text_embedding_field = text_embedding_field self.image_embedding_field = image_embedding_field self.source_fields = source_fields def build_query( self, query_text: str, query_vector: Optional[np.ndarray] = None, query_node: Optional[QueryNode] = None, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None, size: int = 10, from_: int = 0, enable_knn: bool = True, knn_k: int = 50, knn_num_candidates: int = 200, min_score: Optional[float] = None ) -> Dict[str, Any]: """ Build complete ES query (简化版). 结构:filters and (text_recall or embedding_recall) - filters: 前端传递的过滤条件永远起作用 - text_recall: 文本相关性召回(中英文字段都用) - embedding_recall: 向量召回(KNN) - function_score: 包装召回部分,支持提权字段 Args: query_text: Query text for BM25 matching query_vector: Query embedding for KNN search query_node: Parsed boolean expression tree filters: Exact match filters (always applied) range_filters: Range filters for numeric fields (always applied) size: Number of results from_: Offset for pagination enable_knn: Whether to use KNN search knn_k: K value for KNN knn_num_candidates: Number of candidates for KNN min_score: Minimum score threshold Returns: ES query DSL dictionary """ es_query = { "size": size, "from": from_ } # Add _source filtering if source_fields are configured if self.source_fields: es_query["_source"] = { "includes": self.source_fields } # 1. Build recall queries (text or embedding) recall_clauses = [] # Text recall (always include if query_text exists) if query_text: if query_node and query_node.operator != 'TERM': # Complex boolean query text_query = self._build_boolean_query(query_node) else: # Simple text query text_query = self._build_text_query(query_text) recall_clauses.append(text_query) # Embedding recall (KNN - separate from query, handled below) has_embedding = enable_knn and query_vector is not None and self.text_embedding_field # 2. Build filter clauses (always applied) filter_clauses = self._build_filters(filters, range_filters) # 3. Build main query structure: filters and recall if recall_clauses: # Combine text recalls with OR logic (if multiple) if len(recall_clauses) == 1: recall_query = recall_clauses[0] else: recall_query = { "bool": { "should": recall_clauses, "minimum_should_match": 1 } } # Wrap recall with function_score for boosting recall_query = self._wrap_with_function_score(recall_query) # Combine filters and recall if filter_clauses: es_query["query"] = { "bool": { "must": [recall_query], "filter": filter_clauses } } else: es_query["query"] = recall_query else: # No recall queries, only filters (match_all filtered) if filter_clauses: es_query["query"] = { "bool": { "must": [{"match_all": {}}], "filter": filter_clauses } } else: es_query["query"] = {"match_all": {}} # 4. Add KNN search if enabled (separate from query, ES will combine) if has_embedding: knn_clause = { "field": self.text_embedding_field, "query_vector": query_vector.tolist(), "k": knn_k, "num_candidates": knn_num_candidates, "boost": 0.2 # Lower boost for embedding recall } es_query["knn"] = knn_clause # 5. Add minimum score filter if min_score is not None: es_query["min_score"] = min_score return es_query def _wrap_with_function_score(self, query: Dict[str, Any]) -> Dict[str, Any]: """ Wrap query with function_score for boosting fields. Args: query: Base query to wrap Returns: Function score query or original query if no functions configured """ functions = self._build_score_functions() # If no functions configured, return original query if not functions: return query # Build function_score query function_score_query = { "function_score": { "query": query, "functions": functions, "score_mode": FUNCTION_SCORE_CONFIG.get("score_mode", "sum"), "boost_mode": FUNCTION_SCORE_CONFIG.get("boost_mode", "multiply") } } return function_score_query def _build_score_functions(self) -> List[Dict[str, Any]]: """ Build function_score functions from config. Returns: List of function score functions """ functions = [] config_functions = FUNCTION_SCORE_CONFIG.get("functions", []) for func_config in config_functions: func_type = func_config.get("type") if func_type == "filter_weight": # Filter + Weight functions.append({ "filter": func_config["filter"], "weight": func_config.get("weight", 1.0) }) elif func_type == "field_value_factor": # Field Value Factor functions.append({ "field_value_factor": { "field": func_config["field"], "factor": func_config.get("factor", 1.0), "modifier": func_config.get("modifier", "none"), "missing": func_config.get("missing", 1.0) } }) elif func_type == "decay": # Decay Function (gauss/exp/linear) decay_func = func_config.get("function", "gauss") field = func_config["field"] decay_params = { "origin": func_config.get("origin", "now"), "scale": func_config["scale"] } if "offset" in func_config: decay_params["offset"] = func_config["offset"] if "decay" in func_config: decay_params["decay"] = func_config["decay"] functions.append({ decay_func: { field: decay_params } }) return functions def _build_text_query(self, query_text: str) -> Dict[str, Any]: """ Build simple text matching query (BM25). Args: query_text: Query text Returns: ES query clause """ return { "multi_match": { "query": query_text, "fields": self.match_fields, "minimum_should_match": "67%", "tie_breaker": 0.9, "boost": 1.0, "_name": "base_query" } } def _build_boolean_query(self, node: QueryNode) -> Dict[str, Any]: """ Build query from boolean expression tree. Args: node: Query tree node Returns: ES query clause """ if node.operator == 'TERM': # Leaf node - simple text query return self._build_text_query(node.value) elif node.operator == 'AND': # All terms must match return { "bool": { "must": [ self._build_boolean_query(term) for term in node.terms ] } } elif node.operator == 'OR': # Any term must match return { "bool": { "should": [ self._build_boolean_query(term) for term in node.terms ], "minimum_should_match": 1 } } elif node.operator == 'ANDNOT': # First term must match, second must not if len(node.terms) >= 2: return { "bool": { "must": [self._build_boolean_query(node.terms[0])], "must_not": [self._build_boolean_query(node.terms[1])] } } else: return self._build_boolean_query(node.terms[0]) elif node.operator == 'RANK': # Like OR but for ranking (all terms contribute to score) return { "bool": { "should": [ self._build_boolean_query(term) for term in node.terms ] } } else: # Unknown operator return {"match_all": {}} def _build_filters( self, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, 'RangeFilter']] = None ) -> List[Dict[str, Any]]: """ 构建过滤子句。 Args: filters: 精确匹配过滤器字典 range_filters: 范围过滤器(Dict[str, RangeFilter],RangeFilter 是 Pydantic 模型) Returns: ES filter 子句列表 """ filter_clauses = [] # 1. 处理精确匹配过滤 if filters: for field, value in filters.items(): if isinstance(value, list): # 多值匹配(OR) filter_clauses.append({ "terms": {field: value} }) else: # 单值精确匹配 filter_clauses.append({ "term": {field: value} }) # 2. 处理范围过滤(支持 RangeFilter Pydantic 模型或字典) if range_filters: for field, range_filter in range_filters.items(): # 支持 Pydantic 模型或字典格式 if hasattr(range_filter, 'model_dump'): # Pydantic 模型 range_dict = range_filter.model_dump(exclude_none=True) elif isinstance(range_filter, dict): # 已经是字典格式 range_dict = {k: v for k, v in range_filter.items() if v is not None} else: # 其他格式,跳过 continue if range_dict: filter_clauses.append({ "range": {field: range_dict} }) return filter_clauses def add_spu_collapse( self, es_query: Dict[str, Any], spu_field: str, inner_hits_size: int = 3 ) -> Dict[str, Any]: """ Add SPU aggregation/collapse to query. Args: es_query: Existing ES query spu_field: Field containing SPU ID inner_hits_size: Number of SKUs to return per SPU Returns: Modified ES query """ # Add collapse es_query["collapse"] = { "field": spu_field, "inner_hits": { "_source": False, "name": "top_docs", "size": inner_hits_size } } # Add cardinality aggregation to count unique SPUs if "aggs" not in es_query: es_query["aggs"] = {} es_query["aggs"]["unique_count"] = { "cardinality": { "field": spu_field } } return es_query def add_sorting( self, es_query: Dict[str, Any], sort_by: str, sort_order: str = "desc" ) -> Dict[str, Any]: """ Add sorting to ES query. Args: es_query: Existing ES query sort_by: Field name for sorting sort_order: Sort order: 'asc' or 'desc' Returns: Modified ES query """ if not sort_by: return es_query if not sort_order: sort_order = "desc" if "sort" not in es_query: es_query["sort"] = [] # Add the specified sort sort_field = { sort_by: { "order": sort_order.lower() } } es_query["sort"].append(sort_field) return es_query def build_facets( self, facet_configs: Optional[List[Union[str, 'FacetConfig']]] = None ) -> Dict[str, Any]: """ 构建分面聚合。 支持: 1. 分类分面:category1_name, category2_name, category3_name, category_name 2. specifications分面:嵌套聚合,按name聚合,然后按value聚合 Args: facet_configs: 分面配置列表(标准格式): - str: 字段名,使用默认 terms 配置 - FacetConfig: 详细的分面配置对象 - 特殊值 "specifications": 构建specifications嵌套分面 Returns: ES aggregations 字典 """ if not facet_configs: return {} aggs = {} for config in facet_configs: # 特殊处理:specifications嵌套分面 if isinstance(config, str) and config == "specifications": # 构建specifications嵌套分面(按name聚合,然后按value聚合) aggs["specifications_facet"] = { "nested": { "path": "specifications" }, "aggs": { "by_name": { "terms": { "field": "specifications.name", "size": 20, "order": {"_count": "desc"} }, "aggs": { "value_counts": { "terms": { "field": "specifications.value", "size": 10, "order": {"_count": "desc"} } } } } } } continue # 简单模式:只有字段名(字符串) if isinstance(config, str): field = config agg_name = f"{field}_facet" aggs[agg_name] = { "terms": { "field": field, "size": 10, "order": {"_count": "desc"} } } # 高级模式:FacetConfig 对象 else: # 此时 config 应该是 FacetConfig 对象 field = config.field facet_type = config.type size = config.size agg_name = f"{field}_facet" if facet_type == 'terms': aggs[agg_name] = { "terms": { "field": field, "size": size, "order": {"_count": "desc"} } } elif facet_type == 'range': if config.ranges: aggs[agg_name] = { "range": { "field": field, "ranges": config.ranges } } return aggs