"""
Elasticsearch query builder.

Converts parsed queries and search parameters into ES DSL queries.

Simplified architecture:
- filters and (text_recall or embedding_recall)
- function_score wrapper for boosting fields
"""
from typing import Dict, Any, List, Optional, Union, Tuple

import numpy as np

from config import FunctionScoreConfig

# (Elasticsearch field path, boost before formatting as "path^boost")
MatchFieldSpec = Tuple[str, float]


class ESQueryBuilder:
    """Builds Elasticsearch DSL queries."""

    def __init__(
        self,
        match_fields: List[str],
        field_boosts: Optional[Dict[str, float]] = None,
        multilingual_fields: Optional[List[str]] = None,
        shared_fields: Optional[List[str]] = None,
        core_multilingual_fields: Optional[List[str]] = None,
        text_embedding_field: Optional[str] = None,
        image_embedding_field: Optional[str] = None,
        source_fields: Optional[List[str]] = None,
        function_score_config: Optional[FunctionScoreConfig] = None,
        default_language: str = "en",
        knn_boost: float = 0.25,
        base_minimum_should_match: str = "70%",
        translation_minimum_should_match: str = "70%",
        translation_boost: float = 0.4,
        tie_breaker_base_query: float = 0.9,
        mixed_script_merged_field_boost_scale: float = 0.6,
        phrase_match_base_fields: Optional[Tuple[str, ...]] = None,
        phrase_match_slop: int = 2,
        phrase_match_tie_breaker: float = 0.4,
        phrase_match_boost: float = 3.0,
    ):
        """
        Initialize query builder.

        Multi-language search (translation-based cross-language recall) is
        always enabled: queries are matched against detected-language and
        translated target-language clauses.

        Args:
            match_fields: Fields to search for text matching (also used as the
                fallback field list when language-specific fields cannot be resolved)
            field_boosts: Per-field boost overrides; keys may be base names
                ("title") or language-specific ("title.de")
            multilingual_fields: Base field names indexed per language (searched
                as "{base}.{lang}")
            shared_fields: Language-independent fields searched as-is
            core_multilingual_fields: Subset of multilingual fields used for
                phrase/keyword strategies
            text_embedding_field: Field name for text embeddings
            image_embedding_field: Field name for image embeddings
            source_fields: Fields to return in search results (_source includes)
            function_score_config: Function score configuration
            default_language: Default language to use when detection fails or
                returns "unknown"
            knn_boost: Boost value for KNN (embedding recall)
            base_minimum_should_match: minimum_should_match for the source-language clause
            translation_minimum_should_match: minimum_should_match for translated clauses
            translation_boost: multi_match boost applied to translated clauses only
            tie_breaker_base_query: tie_breaker for the lexical multi_match clauses
            mixed_script_merged_field_boost_scale: Multiply per-field ^boost for
                cross-script merged fields
            phrase_match_base_fields: Base field names for the phrase sub-clause
                (defaults to ("title", "qanchors"))
            phrase_match_slop: slop for the phrase multi_match
            phrase_match_tie_breaker: tie_breaker for the phrase multi_match
            phrase_match_boost: boost for the phrase multi_match
        """
        self.match_fields = match_fields
        self.field_boosts = field_boosts or {}
        self.multilingual_fields = multilingual_fields or [
            "title", "brief", "description", "qanchors", "vendor", "category_path", "category_name_text"
        ]
        self.shared_fields = shared_fields or ["tags", "option1_values", "option2_values", "option3_values"]
        self.core_multilingual_fields = core_multilingual_fields or ["title", "brief", "vendor", "category_name_text"]
        self.text_embedding_field = text_embedding_field
        self.image_embedding_field = image_embedding_field
        self.source_fields = source_fields
        self.function_score_config = function_score_config
        self.default_language = default_language
        self.knn_boost = knn_boost
        self.base_minimum_should_match = base_minimum_should_match
        self.translation_minimum_should_match = translation_minimum_should_match
        self.translation_boost = float(translation_boost)
        self.tie_breaker_base_query = float(tie_breaker_base_query)
        self.mixed_script_merged_field_boost_scale = float(mixed_script_merged_field_boost_scale)
        self.phrase_match_base_fields = tuple(phrase_match_base_fields or ("title", "qanchors"))
        self.phrase_match_slop = int(phrase_match_slop)
        self.phrase_match_tie_breaker = float(phrase_match_tie_breaker)
        self.phrase_match_boost = float(phrase_match_boost)

    def _apply_source_filter(self, es_query: Dict[str, Any]) -> None:
        """
        Apply tri-state _source semantics:
        - None: do not set _source (return all source fields)
        - []: _source=false
        - [..]: _source.includes=[..]

        Mutates ``es_query`` in place.

        Raises:
            ValueError: if ``source_fields`` is neither None nor a list.
        """
        if self.source_fields is None:
            return
        if not isinstance(self.source_fields, list):
            raise ValueError("query_config.source_fields must be null or list[str]")
        if len(self.source_fields) == 0:
            es_query["_source"] = False
            return
        es_query["_source"] = {"includes": self.source_fields}

    def _split_filters_for_faceting(
        self,
        filters: Optional[Dict[str, Any]],
        facet_configs: Optional[List[Any]]
    ) -> tuple:
        """
        Split filters into conjunctive (query) and disjunctive (post_filter)
        based on facet configs.

        Disjunctive filters (multi-select facets):
        - Applied via post_filter (affects results but not aggregations)
        - Allows showing other options in the same facet even when filtered

        Conjunctive filters (standard facets):
        - Applied in query.bool.filter (affects both results and aggregations)
        - Standard drill-down behavior

        Args:
            filters: All filters from request
            facet_configs: Facet configurations with disjunctive flags

        Returns:
            (conjunctive_filters, disjunctive_filters)
        """
        if not filters or not facet_configs:
            return filters or {}, {}

        # Get fields that support multi-select
        multi_select_fields = set()
        for fc in facet_configs:
            if getattr(fc, 'disjunctive', False):
                # Handle specifications.xxx format
                # NOTE(review): any disjunctive "specifications.*" facet marks the
                # whole "specifications" filter as disjunctive.
                if fc.field.startswith('specifications.'):
                    multi_select_fields.add('specifications')
                else:
                    multi_select_fields.add(fc.field)

        # Split filters
        conjunctive = {}
        disjunctive = {}
        for field, value in filters.items():
            if field in multi_select_fields:
                disjunctive[field] = value
            else:
                conjunctive[field] = value

        return conjunctive, disjunctive

    def build_query(
        self,
        query_text: str,
        query_vector: Optional[np.ndarray] = None,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, Any]] = None,
        facet_configs: Optional[List[Any]] = None,
        size: int = 10,
        from_: int = 0,
        enable_knn: bool = True,
        knn_k: int = 50,
        knn_num_candidates: int = 200,
        min_score: Optional[float] = None,
        parsed_query: Optional[Any] = None,
        index_languages: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Build complete ES query with post_filter support for multi-select faceting.

        Structure: filters and (text_recall or embedding_recall) + post_filter
        - conjunctive_filters: applied in query.bool.filter (affects results and aggregations)
        - disjunctive_filters: applied as post_filter (affects results only, not aggregations)
        - text_recall: text relevance recall (language-specific fields per clause)
        - embedding_recall: vector recall (KNN)
        - function_score: wraps the recall part to support boosting fields

        Args:
            query_text: Query text for BM25 matching
            query_vector: Query embedding for KNN search
            filters: Exact match filters
            range_filters: Range filters for numeric fields (always applied in query)
            facet_configs: Facet configurations (used to identify multi-select facets)
            size: Number of results
            from_: Offset for pagination
            enable_knn: Whether to use KNN search
            knn_k: K value for KNN
            knn_num_candidates: Number of candidates for KNN
            min_score: Minimum score threshold
            parsed_query: ParsedQuery-like object (rewritten query, language,
                translations, token list)
            index_languages: Languages indexed for this index; restricts
                mixed-script field expansion

        Returns:
            ES query DSL dictionary

        Note:
            When embedding recall is active, ``knn_k`` / ``knn_num_candidates``
            are overridden by the token-count heuristic below — TODO confirm
            the passed-in values are intentionally ignored.
        """
        # Boolean AST path has been removed; keep a single text strategy.
        es_query = {
            "size": size,
            "from": from_
        }

        # Add _source filtering with explicit tri-state semantics.
        self._apply_source_filter(es_query)

        # 1. Build recall queries (text or embedding)
        recall_clauses = []

        # Text recall (always include if query_text exists)
        if query_text:
            # Unified text query strategy
            text_query = self._build_advanced_text_query(
                query_text,
                parsed_query,
                index_languages=index_languages,
            )
            recall_clauses.append(text_query)

        # Embedding recall (KNN - separate from query, handled below)
        has_embedding = enable_knn and query_vector is not None and self.text_embedding_field

        # 2. Split filters for multi-select faceting
        conjunctive_filters, disjunctive_filters = self._split_filters_for_faceting(
            filters, facet_configs
        )

        # Build filter clauses for query (conjunctive filters + range filters)
        filter_clauses = self._build_filters(conjunctive_filters, range_filters)

        # 3. Build main query structure: filters and recall
        if recall_clauses:
            # Combine text recalls with OR logic (if multiple)
            if len(recall_clauses) == 1:
                recall_query = recall_clauses[0]
            else:
                recall_query = {
                    "bool": {
                        "should": recall_clauses,
                        "minimum_should_match": 1
                    }
                }

            # Wrap recall with function_score for boosting
            recall_query = self._wrap_with_function_score(recall_query)

            # Combine filters and recall
            if filter_clauses:
                es_query["query"] = {
                    "bool": {
                        "must": [recall_query],
                        "filter": filter_clauses
                    }
                }
            else:
                es_query["query"] = recall_query
        else:
            # No recall queries, only filters (match_all filtered)
            if filter_clauses:
                es_query["query"] = {
                    "bool": {
                        "must": [{"match_all": {}}],
                        "filter": filter_clauses
                    }
                }
            else:
                es_query["query"] = {"match_all": {}}

        # 4. Add KNN search if enabled (separate from query, ES will combine)
        # Adjust KNN k, num_candidates, boost by query_tokens (short query: less KNN; long: more)
        if has_embedding:
            knn_boost = self.knn_boost
            if parsed_query:
                query_tokens = getattr(parsed_query, 'query_tokens', None) or []
                token_count = len(query_tokens)
                if token_count >= 5:
                    knn_k, knn_num_candidates = 160, 500
                    knn_boost = self.knn_boost * 1.4  # Higher weight for long queries
                else:
                    knn_k, knn_num_candidates = 120, 400
            else:
                knn_k, knn_num_candidates = 120, 400

            knn_clause = {
                "field": self.text_embedding_field,
                "query_vector": query_vector.tolist(),
                "k": knn_k,
                "num_candidates": knn_num_candidates,
                "boost": knn_boost,
                "_name": "knn_query",
            }
            # Top-level knn does not inherit query.bool.filter automatically.
            # Apply conjunctive + range filters here so vector recall respects hard filters.
            if filter_clauses:
                if len(filter_clauses) == 1:
                    knn_clause["filter"] = filter_clauses[0]
                else:
                    knn_clause["filter"] = {
                        "bool": {
                            "filter": filter_clauses
                        }
                    }
            es_query["knn"] = knn_clause

        # 5. Add post_filter for disjunctive (multi-select) filters
        if disjunctive_filters:
            post_filter_clauses = self._build_filters(disjunctive_filters, None)
            if post_filter_clauses:
                if len(post_filter_clauses) == 1:
                    es_query["post_filter"] = post_filter_clauses[0]
                else:
                    es_query["post_filter"] = {
                        "bool": {"filter": post_filter_clauses}
                    }

        # 6. Add minimum score filter
        if min_score is not None:
            es_query["min_score"] = min_score

        return es_query

    def _wrap_with_function_score(self, query: Dict[str, Any]) -> Dict[str, Any]:
        """
        Wrap query with function_score for boosting fields.

        Args:
            query: Base query to wrap

        Returns:
            Function score query or original query if no functions configured
        """
        functions = self._build_score_functions()

        # If no functions configured, return original query
        if not functions:
            return query

        # Build function_score query; modes fall back to ES-friendly defaults
        # when no config object is present.
        score_mode = self.function_score_config.score_mode if self.function_score_config else "sum"
        boost_mode = self.function_score_config.boost_mode if self.function_score_config else "multiply"

        function_score_query = {
            "function_score": {
                "query": query,
                "functions": functions,
                "score_mode": score_mode,
                "boost_mode": boost_mode
            }
        }

        return function_score_query

    def _build_score_functions(self) -> List[Dict[str, Any]]:
        """
        Build function_score functions from config.

        Supported config function types: "filter_weight",
        "field_value_factor", and "decay" (gauss/exp/linear).

        Returns:
            List of function score functions
        """
        functions = []

        if not self.function_score_config:
            return functions

        config_functions = self.function_score_config.functions or []

        for func_config in config_functions:
            func_type = func_config.get("type")

            if func_type == "filter_weight":
                # Filter + Weight
                functions.append({
                    "filter": func_config["filter"],
                    "weight": func_config.get("weight", 1.0)
                })

            elif func_type == "field_value_factor":
                # Field Value Factor
                functions.append({
                    "field_value_factor": {
                        "field": func_config["field"],
                        "factor": func_config.get("factor", 1.0),
                        "modifier": func_config.get("modifier", "none"),
                        "missing": func_config.get("missing", 1.0)
                    }
                })

            elif func_type == "decay":
                # Decay Function (gauss/exp/linear)
                decay_func = func_config.get("function", "gauss")
                field = func_config["field"]

                decay_params = {
                    "origin": func_config.get("origin", "now"),
                    "scale": func_config["scale"]
                }
                if "offset" in func_config:
                    decay_params["offset"] = func_config["offset"]
                if "decay" in func_config:
                    decay_params["decay"] = func_config["decay"]

                functions.append({
                    decay_func: {
                        field: decay_params
                    }
                })

        return functions

    def _build_text_query(self, query_text: str) -> Dict[str, Any]:
        """
        Build simple text matching query (BM25).

        NOTE(review): uses hard-coded "67%" / 0.9 rather than the configured
        base_minimum_should_match / tie_breaker_base_query — confirm whether
        this legacy path should pick up the config knobs.

        Args:
            query_text: Query text

        Returns:
            ES query clause
        """
        return {
            "multi_match": {
                "query": query_text,
                "fields": self.match_fields,
                "minimum_should_match": "67%",
                "tie_breaker": 0.9,
                "boost": 1.0,
                "_name": "base_query"
            }
        }

    def _format_field_with_boost(self, field_name: str, boost: float) -> str:
        # Omit the "^boost" suffix when the boost is effectively 1.0.
        if abs(float(boost) - 1.0) < 1e-9:
            return field_name
        return f"{field_name}^{round(boost, 2)}"

    def _get_field_boost(self, base_field: str, language: Optional[str] = None) -> float:
        # Language-specific override first (e.g. title.de), then base field (e.g. title)
        if language:
            lang_key = f"{base_field}.{language}"
            if lang_key in self.field_boosts:
                return float(self.field_boosts[lang_key])
        if base_field in self.field_boosts:
            return float(self.field_boosts[base_field])
        return 1.0

    def _build_match_field_specs(self, language: str) -> Tuple[List[MatchFieldSpec], List[MatchFieldSpec]]:
        """
        Per-language match targets as (field_path, boost).

        Single source of truth before string formatting.
        Returns (all_fields, core_fields); core_fields are for phrase/keyword
        strategies elsewhere.
        """
        lang = (language or "").strip().lower()
        all_specs: List[MatchFieldSpec] = []
        core_specs: List[MatchFieldSpec] = []
        # Language-specific columns: "{base}.{lang}".
        for base in self.multilingual_fields:
            field = f"{base}.{lang}"
            all_specs.append((field, self._get_field_boost(base, lang)))
        # Shared fields are language-independent and searched as-is.
        for shared in self.shared_fields:
            all_specs.append((shared, self._get_field_boost(shared, None)))
        for base in self.core_multilingual_fields:
            field = f"{base}.{lang}"
            core_specs.append((field, self._get_field_boost(base, lang)))
        return all_specs, core_specs

    def _format_match_field_specs(self, specs: List[MatchFieldSpec]) -> List[str]:
        """Format (field_path, boost) pairs for Elasticsearch multi_match ``fields``."""
        return [self._format_field_with_boost(path, boost) for path, boost in specs]

    def _build_phrase_match_fields(self, language: str) -> List[str]:
        """Fields for phrase multi_match: base names × ``.{lang}`` with ``field_boosts``."""
        lang = (language or "").strip().lower()
        if not lang:
            return []
        out: List[str] = []
        for base in self.phrase_match_base_fields:
            path = f"{base}.{lang}"
            boost = self._get_field_boost(base, lang)
            out.append(self._format_field_with_boost(path, boost))
        return out

    def _append_phrase_should_clause(
        self,
        should_clauses: List[Dict[str, Any]],
        lang: str,
        lang_query: str,
        clause_name: str
    ) -> None:
        """Append a phrase multi_match clause to ``should_clauses`` (no-op on blank query/fields)."""
        text = (lang_query or "").strip()
        if not text:
            return
        phrase_fields = self._build_phrase_match_fields(lang)
        if not phrase_fields:
            return
        boost = self.phrase_match_boost
        should_clauses.append({
            "multi_match": {
                "_name": f"{clause_name}_phrase",
                "query": lang_query,
                "type": "phrase",
                "fields": phrase_fields,
                "slop": self.phrase_match_slop,
                "tie_breaker": self.phrase_match_tie_breaker,
                "boost": boost,
            }
        })

    def _merge_supplemental_lang_field_specs(
        self,
        specs: List[MatchFieldSpec],
        supplemental_lang: str,
    ) -> List[MatchFieldSpec]:
        """Append supplemental-language columns; boosts multiplied by mixed_script scale."""
        scale = float(self.mixed_script_merged_field_boost_scale)
        extra_all, _ = self._build_match_field_specs(supplemental_lang)
        seen = {path for path, _ in specs}
        out = list(specs)
        for path, boost in extra_all:
            if path not in seen:
                out.append((path, boost * scale))
                seen.add(path)
        return out

    def _expand_match_field_specs_for_mixed_script(
        self,
        lang: str,
        specs: List[MatchFieldSpec],
        contains_chinese: bool,
        contains_english: bool,
        index_languages: List[str],
        is_source: bool = False
    ) -> List[MatchFieldSpec]:
        """
        When the query mixes scripts, widen each clause to indexed fields for
        the other script (e.g. zh clause also searches title.en when the query
        contains an English word token).

        Only the source-language clause is widened (``is_source=True``);
        translated clauses pass through unchanged.
        """
        norm = {str(x or "").strip().lower() for x in (index_languages or []) if str(x or "").strip()}
        # When no index languages are declared, allow anything (can_use is
        # unconditionally True below); the zh/en default is only the fallback set.
        allow = norm or {"zh", "en"}

        def can_use(lcode: str) -> bool:
            return lcode in allow if norm else True

        out = list(specs)
        lnorm = (lang or "").strip().lower()
        if is_source:
            if contains_english and lnorm != "en" and can_use("en"):
                out = self._merge_supplemental_lang_field_specs(out, "en")
            if contains_chinese and lnorm != "zh" and can_use("zh"):
                out = self._merge_supplemental_lang_field_specs(out, "zh")
        return out

    def _get_embedding_field(self, language: str) -> str:
        """Get embedding field name for a language."""
        # Currently using unified embedding field
        return self.text_embedding_field or "title_embedding"

    @staticmethod
    def _normalize_language_list(languages: Optional[List[str]]) -> List[str]:
        # Lowercase, strip, drop blanks, and de-duplicate preserving order.
        normalized: List[str] = []
        seen = set()
        for language in languages or []:
            token = str(language or "").strip().lower()
            if not token or token in seen:
                continue
            seen.add(token)
            normalized.append(token)
        return normalized

    def _build_advanced_text_query(
        self,
        query_text: str,
        parsed_query: Optional[Any] = None,
        *,
        index_languages: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Build advanced text query using base and translated lexical clauses.

        Unified implementation:
        - base_query: source-language clause
        - translation queries: target-language clauses from translations
        - KNN query: added separately in build_query

        Args:
            query_text: Query text
            parsed_query: ParsedQuery object with analysis results
            index_languages: Languages indexed for this index (limits
                mixed-script field expansion)

        Returns:
            ES bool query with should clauses
        """
        should_clauses = []
        source_lang = self.default_language
        translations: Dict[str, str] = {}
        contains_chinese = False
        contains_english = False
        normalized_index_languages = self._normalize_language_list(index_languages)

        if parsed_query:
            detected_lang = getattr(parsed_query, "detected_language", None)
            source_lang = detected_lang if detected_lang and detected_lang != "unknown" else self.default_language
            translations = getattr(parsed_query, "translations", None) or {}
            contains_chinese = bool(getattr(parsed_query, "contains_chinese", False))
            contains_english = bool(getattr(parsed_query, "contains_english", False))

        source_lang = str(source_lang or self.default_language).strip().lower() or self.default_language
        # Prefer the rewritten query when the parser provides one.
        base_query_text = (
            getattr(parsed_query, "rewritten_query", None) if parsed_query else None
        ) or query_text

        def append_clause(lang: str, lang_query: str, clause_name: str, is_source: bool) -> None:
            # Build one lexical multi_match (plus optional phrase clause) for a language.
            nonlocal should_clauses
            all_specs, _ = self._build_match_field_specs(lang)
            expanded_specs = self._expand_match_field_specs_for_mixed_script(
                lang,
                all_specs,
                contains_chinese,
                contains_english,
                normalized_index_languages,
                is_source,
            )
            match_fields = self._format_match_field_specs(expanded_specs)
            if not match_fields:
                return
            minimum_should_match = (
                self.base_minimum_should_match if is_source else self.translation_minimum_should_match
            )
            clause = {
                "multi_match": {
                    "_name": clause_name,
                    "fields": match_fields,
                    "minimum_should_match": minimum_should_match,
                    "query": lang_query,
                    "tie_breaker": self.tie_breaker_base_query,
                }
            }
            # base_query: never set multi_match.boost (ES default 1.0).
            # Translation clauses: single knob from config — translation_boost.
            if not is_source:
                tb = float(self.translation_boost)
                clause["multi_match"]["boost"] = tb
            should_clauses.append({
                "multi_match": clause["multi_match"]
            })
            self._append_phrase_should_clause(
                should_clauses, lang, lang_query, clause_name
            )

        if base_query_text:
            append_clause(source_lang, base_query_text, "base_query", True)

        for lang, translated_text in translations.items():
            normalized_lang = str(lang or "").strip().lower()
            normalized_text = str(translated_text or "").strip()
            if not normalized_lang or not normalized_text:
                continue
            # Skip translations that exactly duplicate the base clause.
            if normalized_lang == source_lang and normalized_text == base_query_text:
                continue
            append_clause(normalized_lang, normalized_text, f"base_query_trans_{normalized_lang}", False)

        # Fallback to a simple query when language fields cannot be resolved.
        if not should_clauses:
            fallback_fields = self.match_fields or ["title.en^1.0"]
            fallback_lexical = {
                "multi_match": {
                    "_name": "base_query_fallback",
                    "query": query_text,
                    "fields": fallback_fields,
                    "minimum_should_match": self.base_minimum_should_match,
                    "tie_breaker": self.tie_breaker_base_query,
                }
            }
            fb_should: List[Dict[str, Any]] = [fallback_lexical]
            self._append_phrase_should_clause(
                fb_should, self.default_language, query_text, "base_query_fallback"
            )
            if len(fb_should) == 1:
                return fallback_lexical
            return {
                "bool": {
                    "should": fb_should,
                    "minimum_should_match": 1,
                }
            }

        # Return bool query with should clauses
        if len(should_clauses) == 1:
            return should_clauses[0]

        return {
            "bool": {
                "should": should_clauses,
                "minimum_should_match": 1
            }
        }

    def _build_filters(
        self,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, 'RangeFilter']] = None
    ) -> List[Dict[str, Any]]:
        """
        Build filter clauses.

        Args:
            filters: Exact-match filter dict
            range_filters: Range filters (Dict[str, RangeFilter]; RangeFilter
                is a Pydantic model, plain dicts are also accepted)

        Returns:
            List of ES filter clauses
        """
        filter_clauses = []

        # 1. Exact-match filters
        if filters:
            for field, value in filters.items():
                # Special case: nested "specifications" filtering
                if field == "specifications":
                    if isinstance(value, dict):
                        # Single spec filter: {"name": "color", "value": "green"}
                        name = value.get("name")
                        spec_value = value.get("value")
                        if name and spec_value:
                            filter_clauses.append({
                                "nested": {
                                    "path": "specifications",
                                    "query": {
                                        "bool": {
                                            "must": [
                                                {"term": {"specifications.name": name}},
                                                {"term": {"specifications.value": spec_value}}
                                            ]
                                        }
                                    }
                                }
                            })
                    elif isinstance(value, list):
                        # Multiple spec filters: group by name — same dimension OR, different dimensions AND.
                        # E.g. [{"name": "size", "value": "3"}, {"name": "size", "value": "4"}, {"name": "color", "value": "green"}]
                        # produces: (size=3 OR size=4) AND color=green
                        from collections import defaultdict
                        specs_by_name = defaultdict(list)
                        for spec in value:
                            if isinstance(spec, dict):
                                name = spec.get("name")
                                spec_value = spec.get("value")
                                if name and spec_value:
                                    specs_by_name[name].append(spec_value)

                        # One filter clause per name dimension
                        for name, values in specs_by_name.items():
                            if len(values) == 1:
                                # Single value: plain term query
                                filter_clauses.append({
                                    "nested": {
                                        "path": "specifications",
                                        "query": {
                                            "bool": {
                                                "must": [
                                                    {"term": {"specifications.name": name}},
                                                    {"term": {"specifications.value": values[0]}}
                                                ]
                                            }
                                        }
                                    }
                                })
                            else:
                                # Multiple values: join with should (OR)
                                should_clauses = []
                                for spec_value in values:
                                    should_clauses.append({
                                        "bool": {
                                            "must": [
                                                {"term": {"specifications.name": name}},
                                                {"term": {"specifications.value": spec_value}}
                                            ]
                                        }
                                    })
                                filter_clauses.append({
                                    "nested": {
                                        "path": "specifications",
                                        "query": {
                                            "bool": {
                                                "should": should_clauses,
                                                "minimum_should_match": 1
                                            }
                                        }
                                    }
                                })
                    continue

                # "*_all" semantics: multiple values are AND (all must match)
                if field.endswith("_all"):
                    es_field = field[:-4]  # strip the "_all" suffix
                    if es_field == "specifications" and isinstance(value, list):
                        # specifications_all: every spec condition in the list must hold (AND)
                        must_nested = []
                        for spec in value:
                            if isinstance(spec, dict):
                                name = spec.get("name")
                                spec_value = spec.get("value")
                                if name and spec_value:
                                    must_nested.append({
                                        "nested": {
                                            "path": "specifications",
                                            "query": {
                                                "bool": {
                                                    "must": [
                                                        {"term": {"specifications.name": name}},
                                                        {"term": {"specifications.value": spec_value}}
                                                    ]
                                                }
                                            }
                                        }
                                    })
                        if must_nested:
                            filter_clauses.append({"bool": {"must": must_nested}})
                    else:
                        # Plain field _all: multiple values become must + several terms
                        if isinstance(value, list):
                            if value:
                                filter_clauses.append({
                                    "bool": {
                                        "must": [{"term": {es_field: v}} for v in value]
                                    }
                                })
                        else:
                            filter_clauses.append({"term": {es_field: value}})
                    continue

                # Plain field filter (multiple values default to OR)
                if isinstance(value, list):
                    # Multi-value match (OR)
                    filter_clauses.append({
                        "terms": {field: value}
                    })
                else:
                    # Single-value exact match
                    filter_clauses.append({
                        "term": {field: value}
                    })

        # 2. Range filters (supports RangeFilter Pydantic model or dict)
        if range_filters:
            for field, range_filter in range_filters.items():
                # Accept a Pydantic model or a plain dict
                if hasattr(range_filter, 'model_dump'):
                    # Pydantic model
                    range_dict = range_filter.model_dump(exclude_none=True)
                elif isinstance(range_filter, dict):
                    # Already a dict
                    range_dict = {k: v for k, v in range_filter.items() if v is not None}
                else:
                    # Unknown format: skip
                    continue

                if range_dict:
                    filter_clauses.append({
                        "range": {field: range_dict}
                    })

        return filter_clauses

    def add_sorting(
        self,
        es_query: Dict[str, Any],
        sort_by: str,
        sort_order: str = "desc"
    ) -> Dict[str, Any]:
        """
        Add sorting to ES query.

        Args:
            es_query: Existing ES query (mutated in place and returned)
            sort_by: Field name for sorting ('price' is auto-mapped, see below)
            sort_order: Sort order: 'asc' or 'desc'

        Returns:
            Modified ES query
        """
        if not sort_by:
            return es_query
        if not sort_order:
            sort_order = "desc"

        # Auto-map 'price' to 'min_price' or 'max_price' based on sort_order
        if sort_by == "price":
            if sort_order.lower() == "asc":
                sort_by = "min_price"  # price low to high
            else:
                sort_by = "max_price"  # price high to low

        if "sort" not in es_query:
            es_query["sort"] = []

        # Add the specified sort
        sort_field = {
            sort_by: {
                "order": sort_order.lower()
            }
        }
        es_query["sort"].append(sort_field)

        return es_query

    def build_facets(
        self,
        facet_configs: Optional[List['FacetConfig']] = None,
        use_reverse_nested: bool = True
    ) -> Dict[str, Any]:
        """
        Build facet aggregations.

        Args:
            facet_configs: List of facet configuration objects
            use_reverse_nested: Whether to count products via reverse_nested
                (default True). When False, nested documents are counted
                instead (faster, but counts may be inaccurate).

        Supported field types:
        - Plain fields: e.g. "category1_name" (type "terms" or "range")
        - specifications: "specifications" (returns all spec names and their values)
        - specifications.{name}: e.g. "specifications.color" (returns values
          for the given spec name)

        Returns:
            ES aggregations dict

        Performance notes:
        - use_reverse_nested=True: counts products — accurate but slightly
          slower (typically < 20% impact)
        - use_reverse_nested=False: counts nested documents — faster but
          counts may be inaccurate
        """
        if not facet_configs:
            return {}

        aggs = {}

        for config in facet_configs:
            field = config.field
            size = config.size
            facet_type = config.type

            # Handle "specifications" (all spec names)
            if field == "specifications":
                aggs["specifications_facet"] = {
                    "nested": {"path": "specifications"},
                    "aggs": {
                        "by_name": {
                            "terms": {
                                "field": "specifications.name",
                                "size": 20,
                                "order": {"_count": "desc"}
                            },
                            "aggs": {
                                "value_counts": {
                                    "terms": {
                                        "field": "specifications.value",
                                        "size": size,
                                        "order": {"_count": "desc"}
                                    }
                                }
                            }
                        }
                    }
                }
                continue

            # Handle "specifications.{name}" (a specific spec name)
            if field.startswith("specifications."):
                name = field[len("specifications."):]
                agg_name = f"specifications_{name}_facet"

                # Use reverse_nested to count products (parent documents) rather
                # than spec entries (nested documents), so facet counts reflect
                # the number of products and match the search result counts.
                base_value_counts = {
                    "terms": {
                        "field": "specifications.value",
                        "size": size,
                        "order": {"_count": "desc"}
                    }
                }

                # If reverse_nested is enabled, add a sub-aggregation counting products
                if use_reverse_nested:
                    base_value_counts["aggs"] = {
                        "product_count": {
                            "reverse_nested": {}
                        }
                    }

                aggs[agg_name] = {
                    "nested": {"path": "specifications"},
                    "aggs": {
                        "filter_by_name": {
                            "filter": {"term": {"specifications.name": name}},
                            "aggs": {
                                "value_counts": base_value_counts
                            }
                        }
                    }
                }
                continue

            # Handle plain fields
            agg_name = f"{field}_facet"

            if facet_type == 'terms':
                aggs[agg_name] = {
                    "terms": {
                        "field": field,
                        "size": size,
                        "order": {"_count": "desc"}
                    }
                }
            elif facet_type == 'range':
                if config.ranges:
                    aggs[agg_name] = {
                        "range": {
                            "field": field,
                            "ranges": config.ranges
                        }
                    }

        return aggs