""" Elasticsearch query builder. Converts parsed queries and search parameters into ES DSL queries. Simplified architecture: - filters and (text_recall or embedding_recall) - function_score wrapper for boosting fields """ from dataclasses import dataclass from typing import Dict, Any, List, Optional, Tuple import numpy as np from config import FunctionScoreConfig from query.keyword_extractor import KEYWORDS_QUERY_BASE_KEY class ESQueryBuilder: """Builds Elasticsearch DSL queries.""" def __init__( self, match_fields: List[str], field_boosts: Optional[Dict[str, float]] = None, multilingual_fields: Optional[List[str]] = None, shared_fields: Optional[List[str]] = None, core_multilingual_fields: Optional[List[str]] = None, text_embedding_field: Optional[str] = None, image_embedding_field: Optional[str] = None, source_fields: Optional[List[str]] = None, function_score_config: Optional[FunctionScoreConfig] = None, default_language: str = "en", knn_text_boost: float = 20.0, knn_image_boost: float = 20.0, knn_text_k: int = 120, knn_text_num_candidates: int = 400, knn_text_k_long: int = 160, knn_text_num_candidates_long: int = 500, knn_image_k: int = 120, knn_image_num_candidates: int = 400, base_minimum_should_match: str = "66%", translation_minimum_should_match: str = "66%", keywords_minimum_should_match: str = "60%", translation_boost: float = 0.4, tie_breaker_base_query: float = 0.9, best_fields_boosts: Optional[Dict[str, float]] = None, best_fields_clause_boost: float = 2.0, phrase_field_boosts: Optional[Dict[str, float]] = None, phrase_match_base_fields: Optional[Tuple[str, ...]] = None, phrase_match_slop: int = 0, phrase_match_tie_breaker: float = 0.0, phrase_match_boost: float = 3.0, ): """ Initialize query builder. Multi-language search (translation-based cross-language recall) is always enabled: queries are matched against detected-language and translated target-language clauses. Args: match_fields: Fields to search for text matching text_embedding_field: Field name for text embeddings image_embedding_field: Field name for image embeddings source_fields: Fields to return in search results (_source includes) function_score_config: Function score configuration default_language: Default language to use when detection fails or returns "unknown" knn_text_boost: Boost for text-embedding KNN clause knn_image_boost: Boost for image-embedding KNN clause """ self.match_fields = match_fields self.field_boosts = field_boosts or {} self.multilingual_fields = multilingual_fields or [] self.shared_fields = shared_fields or [] self.core_multilingual_fields = core_multilingual_fields or [] self.text_embedding_field = text_embedding_field self.image_embedding_field = image_embedding_field self.source_fields = source_fields self.function_score_config = function_score_config self.default_language = default_language self.knn_text_boost = float(knn_text_boost) self.knn_image_boost = float(knn_image_boost) self.knn_text_k = int(knn_text_k) self.knn_text_num_candidates = int(knn_text_num_candidates) self.knn_text_k_long = int(knn_text_k_long) self.knn_text_num_candidates_long = int(knn_text_num_candidates_long) self.knn_image_k = int(knn_image_k) self.knn_image_num_candidates = int(knn_image_num_candidates) self.base_minimum_should_match = base_minimum_should_match self.translation_minimum_should_match = translation_minimum_should_match self.keywords_minimum_should_match = str(keywords_minimum_should_match) self.translation_boost = float(translation_boost) self.tie_breaker_base_query = float(tie_breaker_base_query) default_best_fields = { base: self._get_field_boost(base) for base in self.core_multilingual_fields if base in self.multilingual_fields } self.best_fields_boosts = { str(base): float(boost) for base, boost in (best_fields_boosts or default_best_fields).items() } self.best_fields_clause_boost = float(best_fields_clause_boost) default_phrase_base_fields = tuple(phrase_match_base_fields or ("title", "qanchors")) default_phrase_fields = { base: self._get_field_boost(base) for base in default_phrase_base_fields if base in self.multilingual_fields } self.phrase_field_boosts = { str(base): float(boost) for base, boost in (phrase_field_boosts or default_phrase_fields).items() } self.phrase_match_slop = int(phrase_match_slop) self.phrase_match_tie_breaker = float(phrase_match_tie_breaker) self.phrase_match_boost = float(phrase_match_boost) @dataclass(frozen=True) class KNNClausePlan: field: str boost: float k: Optional[int] = None num_candidates: Optional[int] = None nested_path: Optional[str] = None @staticmethod def _vector_to_list(vector: Any) -> List[float]: if vector is None: return [] if hasattr(vector, "tolist"): values = vector.tolist() else: values = list(vector) return [float(v) for v in values] @staticmethod def _query_token_count(parsed_query: Optional[Any]) -> int: if parsed_query is None: return 0 query_tokens = getattr(parsed_query, "query_tokens", None) or [] return len(query_tokens) def get_text_knn_plan(self, parsed_query: Optional[Any] = None) -> Optional[KNNClausePlan]: if not self.text_embedding_field: return None boost = self.knn_text_boost final_knn_k = self.knn_text_k final_knn_num_candidates = self.knn_text_num_candidates if self._query_token_count(parsed_query) >= 5: final_knn_k = self.knn_text_k_long final_knn_num_candidates = self.knn_text_num_candidates_long boost = self.knn_text_boost * 1.4 return self.KNNClausePlan( field=str(self.text_embedding_field), boost=float(boost), k=int(final_knn_k), num_candidates=int(final_knn_num_candidates), ) def get_image_knn_plan(self) -> Optional[KNNClausePlan]: if not self.image_embedding_field: return None nested_path, _, _ = str(self.image_embedding_field).rpartition(".") return self.KNNClausePlan( field=str(self.image_embedding_field), boost=float(self.knn_image_boost), k=int(self.knn_image_k), num_candidates=int(self.knn_image_num_candidates), nested_path=nested_path or None, ) def build_text_knn_clause( self, query_vector: Any, *, parsed_query: Optional[Any] = None, query_name: str = "knn_query", ) -> Optional[Dict[str, Any]]: plan = self.get_text_knn_plan(parsed_query) if plan is None or query_vector is None: return None return { "knn": { "field": plan.field, "query_vector": self._vector_to_list(query_vector), "k": plan.k, "num_candidates": plan.num_candidates, "boost": plan.boost, "_name": query_name, } } def build_image_knn_clause( self, image_query_vector: Any, *, query_name: str = "image_knn_query", ) -> Optional[Dict[str, Any]]: plan = self.get_image_knn_plan() if plan is None or image_query_vector is None: return None image_knn_query = { "field": plan.field, "query_vector": self._vector_to_list(image_query_vector), "k": plan.k, "num_candidates": plan.num_candidates, "boost": plan.boost, } if plan.nested_path: return { "nested": { "path": plan.nested_path, "_name": query_name, "query": {"knn": image_knn_query}, "score_mode": "max", } } return { "knn": { **image_knn_query, "_name": query_name, } } def build_exact_text_knn_rescore_clause( self, query_vector: Any, *, parsed_query: Optional[Any] = None, query_name: str = "exact_text_knn_query", ) -> Optional[Dict[str, Any]]: plan = self.get_text_knn_plan(parsed_query) if plan is None or query_vector is None: return None return { "script_score": { "_name": query_name, "query": {"exists": {"field": plan.field}}, "script": { "source": ( f"((dotProduct(params.query_vector, '{plan.field}') + 1.0) / 2.0) * params.boost" ), "params": { "query_vector": self._vector_to_list(query_vector), "boost": float(plan.boost), }, }, } } def build_exact_image_knn_rescore_clause( self, image_query_vector: Any, *, query_name: str = "exact_image_knn_query", ) -> Optional[Dict[str, Any]]: plan = self.get_image_knn_plan() if plan is None or image_query_vector is None: return None script_score_query = { "query": {"exists": {"field": plan.field}}, "script": { "source": ( f"((dotProduct(params.query_vector, '{plan.field}') + 1.0) / 2.0) * params.boost" ), "params": { "query_vector": self._vector_to_list(image_query_vector), "boost": float(plan.boost), }, }, } if plan.nested_path: return { "nested": { "path": plan.nested_path, "_name": query_name, "score_mode": "max", "query": {"script_score": script_score_query}, } } return {"script_score": {"_name": query_name, **script_score_query}} def _apply_source_filter(self, es_query: Dict[str, Any]) -> None: """ Apply tri-state _source semantics: - None: do not set _source (return all source fields) - []: _source=false - [..]: _source.includes=[..] """ if self.source_fields is None: return if not isinstance(self.source_fields, list): raise ValueError("query_config.source_fields must be null or list[str]") if len(self.source_fields) == 0: es_query["_source"] = False return es_query["_source"] = {"includes": self.source_fields} def _split_filters_for_faceting( self, filters: Optional[Dict[str, Any]], facet_configs: Optional[List[Any]] ) -> tuple: """ Split filters into conjunctive (query) and disjunctive (post_filter) based on facet configs. Disjunctive filters (multi-select facets): - Applied via post_filter (affects results but not aggregations) - Allows showing other options in the same facet even when filtered Conjunctive filters (standard facets): - Applied in query.bool.filter (affects both results and aggregations) - Standard drill-down behavior Args: filters: All filters from request facet_configs: Facet configurations with disjunctive flags Returns: (conjunctive_filters, disjunctive_filters) """ if not filters or not facet_configs: return filters or {}, {} # Get fields that support multi-select multi_select_fields = set() for fc in facet_configs: if getattr(fc, 'disjunctive', False): # Handle specifications.xxx format if fc.field.startswith('specifications.'): multi_select_fields.add('specifications') else: multi_select_fields.add(fc.field) # Split filters conjunctive = {} disjunctive = {} for field, value in filters.items(): if field in multi_select_fields: disjunctive[field] = value else: conjunctive[field] = value return conjunctive, disjunctive def build_query( self, query_text: str, query_vector: Optional[np.ndarray] = None, image_query_vector: Optional[np.ndarray] = None, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None, facet_configs: Optional[List[Any]] = None, size: int = 10, from_: int = 0, enable_knn: bool = True, min_score: Optional[float] = None, parsed_query: Optional[Any] = None, ) -> Dict[str, Any]: """ Build complete ES query with post_filter support for multi-select faceting. 结构:filters and (text_recall or embedding_recall) + post_filter - conjunctive_filters: 应用在 query.bool.filter(影响结果和聚合) - disjunctive_filters: 应用在 post_filter(只影响结果,不影响聚合) - text_recall: 文本相关性召回(按实际 clause 语言动态字段) - embedding_recall: 向量召回(KNN) - function_score: 包装召回部分,支持提权字段 Args: query_text: Query text for BM25 matching query_vector: Query embedding for KNN search filters: Exact match filters range_filters: Range filters for numeric fields (always applied in query) facet_configs: Facet configurations (used to identify multi-select facets) size: Number of results from_: Offset for pagination enable_knn: Whether to use KNN search min_score: Minimum score threshold Returns: ES query DSL dictionary """ # Boolean AST path has been removed; keep a single text strategy. es_query = { "size": size, "from": from_ } # Add _source filtering with explicit tri-state semantics. self._apply_source_filter(es_query) # 1. Build recall queries (text or embedding) recall_clauses = [] # Text recall (always include if query_text exists) if query_text: recall_clauses.extend(self._build_advanced_text_query(query_text, parsed_query)) # Embedding recall has_embedding = enable_knn and query_vector is not None and self.text_embedding_field has_image_embedding = enable_knn and image_query_vector is not None and self.image_embedding_field # 2. Split filters for multi-select faceting conjunctive_filters, disjunctive_filters = self._split_filters_for_faceting( filters, facet_configs ) # Build filter clauses for query (conjunctive filters + range filters) filter_clauses = self._build_filters(conjunctive_filters, range_filters) product_title_exclusion_filter = self._build_product_title_exclusion_filter(parsed_query) if product_title_exclusion_filter: filter_clauses.append(product_title_exclusion_filter) # 3. Add KNN search clauses alongside lexical clauses under the same bool.should # Text KNN: k / num_candidates from config; long queries use *_long and higher boost if has_embedding: text_knn_clause = self.build_text_knn_clause( query_vector, parsed_query=parsed_query, query_name="knn_query", ) if text_knn_clause: recall_clauses.append(text_knn_clause) if has_image_embedding: image_knn_clause = self.build_image_knn_clause( image_query_vector, query_name="image_knn_query", ) if image_knn_clause: recall_clauses.append(image_knn_clause) # 4. Build main query structure: filters and recall if recall_clauses: if len(recall_clauses) == 1: recall_query = recall_clauses[0] else: recall_query = { "bool": { "should": recall_clauses, "minimum_should_match": 1 } } recall_query = self._wrap_with_function_score(recall_query) if filter_clauses: es_query["query"] = { "bool": { "must": [recall_query], "filter": filter_clauses } } else: es_query["query"] = recall_query else: if filter_clauses: es_query["query"] = { "bool": { "must": [{"match_all": {}}], "filter": filter_clauses } } else: es_query["query"] = {"match_all": {}} # 5. Add post_filter for disjunctive (multi-select) filters if disjunctive_filters: post_filter_clauses = self._build_filters(disjunctive_filters, None) if post_filter_clauses: if len(post_filter_clauses) == 1: es_query["post_filter"] = post_filter_clauses[0] else: es_query["post_filter"] = { "bool": {"filter": post_filter_clauses} } # 6. Add minimum score filter if min_score is not None: es_query["min_score"] = min_score return es_query def _wrap_with_function_score(self, query: Dict[str, Any]) -> Dict[str, Any]: """ Wrap query with function_score for boosting fields. Args: query: Base query to wrap Returns: Function score query or original query if no functions configured """ functions = self._build_score_functions() # If no functions configured, return original query if not functions: return query # Build function_score query score_mode = self.function_score_config.score_mode if self.function_score_config else "sum" boost_mode = self.function_score_config.boost_mode if self.function_score_config else "multiply" function_score_query = { "function_score": { "query": query, "functions": functions, "score_mode": score_mode, "boost_mode": boost_mode } } return function_score_query def _build_score_functions(self) -> List[Dict[str, Any]]: """ Build function_score functions from config. Returns: List of function score functions """ functions = [] if not self.function_score_config: return functions config_functions = self.function_score_config.functions or [] for func_config in config_functions: func_type = func_config.get("type") if func_type == "filter_weight": # Filter + Weight functions.append({ "filter": func_config["filter"], "weight": func_config.get("weight", 1.0) }) elif func_type == "field_value_factor": # Field Value Factor functions.append({ "field_value_factor": { "field": func_config["field"], "factor": func_config.get("factor", 1.0), "modifier": func_config.get("modifier", "none"), "missing": func_config.get("missing", 1.0) } }) elif func_type == "decay": # Decay Function (gauss/exp/linear) decay_func = func_config.get("function", "gauss") field = func_config["field"] decay_params = { "origin": func_config.get("origin", "now"), "scale": func_config["scale"] } if "offset" in func_config: decay_params["offset"] = func_config["offset"] if "decay" in func_config: decay_params["decay"] = func_config["decay"] functions.append({ decay_func: { field: decay_params } }) return functions def _format_field_with_boost(self, field_name: str, boost: float) -> str: if abs(float(boost) - 1.0) < 1e-9: return field_name return f"{field_name}^{round(boost, 2)}" def _get_field_boost(self, base_field: str, language: Optional[str] = None) -> float: # Language-specific override first (e.g. title.de), then base field (e.g. title) if language: lang_key = f"{base_field}.{language}" if lang_key in self.field_boosts: return float(self.field_boosts[lang_key]) if base_field in self.field_boosts: return float(self.field_boosts[base_field]) return 1.0 def _match_field_strings( self, language: str, *, multilingual_fields: Optional[List[str]] = None, shared_fields: Optional[List[str]] = None, boost_overrides: Optional[Dict[str, float]] = None, ) -> List[str]: """Build ``multi_match`` / ``combined_fields`` field entries for one language code.""" lang = (language or "").strip().lower() text_bases = multilingual_fields if multilingual_fields is not None else self.multilingual_fields term_fields = shared_fields if shared_fields is not None else self.shared_fields overrides = boost_overrides or {} out: List[str] = [] for base in text_bases: path = f"{base}.{lang}" boost = float(overrides.get(base, self._get_field_boost(base, lang))) out.append(self._format_field_with_boost(path, boost)) for shared in term_fields: boost = float(overrides.get(shared, self._get_field_boost(shared, None))) out.append(self._format_field_with_boost(shared, boost)) return out def _build_best_fields_clause(self, language: str, query_text: str) -> Optional[Dict[str, Any]]: fields = self._match_field_strings( language, multilingual_fields=list(self.best_fields_boosts), shared_fields=[], boost_overrides=self.best_fields_boosts, ) if not fields: return None return { "multi_match": { "query": query_text, "type": "best_fields", "fields": fields, "boost": self.best_fields_clause_boost, } } def _build_phrase_clause(self, language: str, query_text: str) -> Optional[Dict[str, Any]]: fields = self._match_field_strings( language, multilingual_fields=list(self.phrase_field_boosts), shared_fields=[], boost_overrides=self.phrase_field_boosts, ) if not fields: return None clause: Dict[str, Any] = { "multi_match": { "query": query_text, "type": "phrase", "fields": fields, "boost": self.phrase_match_boost, } } if self.phrase_match_slop > 0: clause["multi_match"]["slop"] = self.phrase_match_slop if self.phrase_match_tie_breaker > 0: clause["multi_match"]["tie_breaker"] = self.phrase_match_tie_breaker return clause def _build_lexical_language_clause( self, lang: str, lang_query: str, clause_name: str, *, is_source: bool, keywords_query: Optional[str] = None, ) -> Optional[Dict[str, Any]]: combined_fields = self._match_field_strings(lang) if not combined_fields: return None minimum_should_match = ( self.base_minimum_should_match if is_source else self.translation_minimum_should_match ) kw = (keywords_query or "").strip() main_query = (lang_query or "").strip() combined_must: List[Dict[str, Any]] = [ { "combined_fields": { "query": main_query, "fields": combined_fields, "minimum_should_match": minimum_should_match, "boost": 2.0, } } ] if kw and kw != main_query: combined_must.append( { "combined_fields": { "query": kw, "fields": combined_fields, "minimum_should_match": self.keywords_minimum_should_match, "boost": 0.8, } } ) optional_mm = [ clause for clause in ( self._build_best_fields_clause(lang, main_query), self._build_phrase_clause(lang, main_query), ) if clause ] should_clauses: List[Dict[str, Any]] = [{"bool": {"must": combined_must}}] should_clauses.extend(optional_mm) clause: Dict[str, Any] = { "bool": { "_name": clause_name, "should": should_clauses, "minimum_should_match": 1, } } if not is_source: clause["bool"]["boost"] = float(self.translation_boost) return clause def _build_advanced_text_query( self, query_text: str, parsed_query: Optional[Any] = None, ) -> List[Dict[str, Any]]: """ Build advanced text query using base and translated lexical clauses. Unified implementation: - base_query: source-language clause - translation queries: target-language clauses from translations Args: query_text: Query text parsed_query: ParsedQuery object with analysis results Returns: Flat recall clauses to be merged with KNN clauses under query.bool.should """ should_clauses = [] source_lang = self.default_language translations: Dict[str, str] = {} if parsed_query: detected_lang = getattr(parsed_query, "detected_language", None) source_lang = detected_lang if detected_lang and detected_lang != "unknown" else self.default_language translations = getattr(parsed_query, "translations", None) or {} source_lang = str(source_lang or self.default_language).strip().lower() or self.default_language base_query_text = ( getattr(parsed_query, "rewritten_query", None) if parsed_query else None ) or query_text kw_by_variant: Dict[str, str] = ( getattr(parsed_query, "keywords_queries", None) or {} if parsed_query else {} ) if base_query_text: base_clause = self._build_lexical_language_clause( source_lang, base_query_text, "base_query", is_source=True, keywords_query=(kw_by_variant.get(KEYWORDS_QUERY_BASE_KEY) or "").strip(), ) if base_clause: should_clauses.append(base_clause) for lang, translated_text in translations.items(): normalized_lang = str(lang or "").strip().lower() normalized_text = str(translated_text or "").strip() if not normalized_lang or not normalized_text: continue if normalized_lang == source_lang and normalized_text == base_query_text: continue trans_kw = (kw_by_variant.get(normalized_lang) or "").strip() trans_clause = self._build_lexical_language_clause( normalized_lang, normalized_text, f"base_query_trans_{normalized_lang}", is_source=False, keywords_query=trans_kw, ) if trans_clause: should_clauses.append(trans_clause) # Fallback to a simple query when language fields cannot be resolved. if not should_clauses: fallback_fields = self.match_fields or ["title.en^1.0"] fallback_lexical = { "multi_match": { "_name": "base_query_fallback", "query": query_text, "fields": fallback_fields, "minimum_should_match": self.base_minimum_should_match, } } return [fallback_lexical] return should_clauses def _build_filters( self, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, 'RangeFilter']] = None ) -> List[Dict[str, Any]]: """ 构建过滤子句。 Args: filters: 精确匹配过滤器字典 range_filters: 范围过滤器(Dict[str, RangeFilter],RangeFilter 是 Pydantic 模型) Returns: ES filter 子句列表 """ filter_clauses = [] # 1. 处理精确匹配过滤 if filters: for field, value in filters.items(): # 特殊处理:specifications 嵌套过滤 if field == "specifications": if isinstance(value, dict): # 单个规格过滤:{"name": "color", "value": "green"} name = value.get("name") spec_value = value.get("value") if name and spec_value: filter_clauses.append({ "nested": { "path": "specifications", "query": { "bool": { "must": [ {"term": {"specifications.name": name}}, {"term": {"specifications.value": spec_value}} ] } } } }) elif isinstance(value, list): # 多个规格过滤:按 name 分组,相同维度 OR,不同维度 AND # 例如:[{"name": "size", "value": "3"}, {"name": "size", "value": "4"}, {"name": "color", "value": "green"}] # 应该生成:(size=3 OR size=4) AND color=green from collections import defaultdict specs_by_name = defaultdict(list) for spec in value: if isinstance(spec, dict): name = spec.get("name") spec_value = spec.get("value") if name and spec_value: specs_by_name[name].append(spec_value) # 为每个 name 维度生成一个过滤子句 for name, values in specs_by_name.items(): if len(values) == 1: # 单个值,直接生成 term 查询 filter_clauses.append({ "nested": { "path": "specifications", "query": { "bool": { "must": [ {"term": {"specifications.name": name}}, {"term": {"specifications.value": values[0]}} ] } } } }) else: # 多个值,使用 should (OR) 连接 should_clauses = [] for spec_value in values: should_clauses.append({ "bool": { "must": [ {"term": {"specifications.name": name}}, {"term": {"specifications.value": spec_value}} ] } }) filter_clauses.append({ "nested": { "path": "specifications", "query": { "bool": { "should": should_clauses, "minimum_should_match": 1 } } } }) continue # *_all 语义:多值时为 AND(必须同时匹配所有值) if field.endswith("_all"): es_field = field[:-4] # 去掉 _all 后缀 if es_field == "specifications" and isinstance(value, list): # specifications_all: 列表内每个规格条件都要满足(AND) must_nested = [] for spec in value: if isinstance(spec, dict): name = spec.get("name") spec_value = spec.get("value") if name and spec_value: must_nested.append({ "nested": { "path": "specifications", "query": { "bool": { "must": [ {"term": {"specifications.name": name}}, {"term": {"specifications.value": spec_value}} ] } } } }) if must_nested: filter_clauses.append({"bool": {"must": must_nested}}) else: # 普通字段 _all:多值用 must + 多个 term if isinstance(value, list): if value: filter_clauses.append({ "bool": { "must": [{"term": {es_field: v}} for v in value] } }) else: filter_clauses.append({"term": {es_field: value}}) continue # 普通字段过滤(默认多值为 OR) if isinstance(value, list): # 多值匹配(OR) filter_clauses.append({ "terms": {field: value} }) else: # 单值精确匹配 filter_clauses.append({ "term": {field: value} }) # 2. 处理范围过滤(支持 RangeFilter Pydantic 模型或字典) if range_filters: for field, range_filter in range_filters.items(): # 支持 Pydantic 模型或字典格式 if hasattr(range_filter, 'model_dump'): # Pydantic 模型 range_dict = range_filter.model_dump(exclude_none=True) elif isinstance(range_filter, dict): # 已经是字典格式 range_dict = {k: v for k, v in range_filter.items() if v is not None} else: # 其他格式,跳过 continue if range_dict: filter_clauses.append({ "range": {field: range_dict} }) return filter_clauses @staticmethod def _build_product_title_exclusion_filter(parsed_query: Optional[Any]) -> Optional[Dict[str, Any]]: if parsed_query is None: return None profile = getattr(parsed_query, "product_title_exclusion_profile", None) if not profile or not getattr(profile, "is_active", False): return None should_clauses: List[Dict[str, Any]] = [] for term in profile.all_zh_title_exclusions(): should_clauses.append({"match_phrase": {"title.zh": {"query": term}}}) for term in profile.all_en_title_exclusions(): should_clauses.append({"match_phrase": {"title.en": {"query": term}}}) if not should_clauses: return None return { "bool": { "must_not": [ { "bool": { "should": should_clauses, "minimum_should_match": 1, } } ] } } def add_sorting( self, es_query: Dict[str, Any], sort_by: str, sort_order: str = "desc" ) -> Dict[str, Any]: """ Add sorting to ES query. Args: es_query: Existing ES query sort_by: Field name for sorting (支持 'price' 自动映射) sort_order: Sort order: 'asc' or 'desc' Returns: Modified ES query """ if not sort_by: return es_query if not sort_order: sort_order = "desc" # Auto-map 'price' to 'min_price' or 'max_price' based on sort_order if sort_by == "price": if sort_order.lower() == "asc": sort_by = "min_price" # 价格从低到高 else: sort_by = "max_price" # 价格从高到低 if "sort" not in es_query: es_query["sort"] = [] # Add the specified sort sort_field = { sort_by: { "order": sort_order.lower() } } es_query["sort"].append(sort_field) return es_query def build_facets( self, facet_configs: Optional[List['FacetConfig']] = None, use_reverse_nested: bool = True ) -> Dict[str, Any]: """ 构建分面聚合。 Args: facet_configs: 分面配置对象列表 use_reverse_nested: 是否使用 reverse_nested 统计产品数量(默认 True) 如果为 False,将统计嵌套文档数量(性能更好但计数可能不准确) 支持的字段类型: - 普通字段: 如 "category1_name"(terms 或 range 类型) - specifications: "specifications"(返回所有规格名称及其值) - specifications.{name}: 如 "specifications.color"(返回指定规格名称的值) Returns: ES aggregations 字典 性能说明: - use_reverse_nested=True: 统计产品数量,准确性高但性能略差(通常影响 < 20%) - use_reverse_nested=False: 统计嵌套文档数量,性能更好但计数可能不准确 """ if not facet_configs: return {} aggs = {} for config in facet_configs: field = config.field size = config.size facet_type = config.type # 处理 specifications(所有规格名称) if field == "specifications": aggs["specifications_facet"] = { "nested": {"path": "specifications"}, "aggs": { "by_name": { "terms": { "field": "specifications.name", "size": 20, "order": {"_count": "desc"} }, "aggs": { "value_counts": { "terms": { "field": "specifications.value", "size": size, "order": {"_count": "desc"} } } } } } } continue # 处理 specifications.{name}(指定规格名称) if field.startswith("specifications."): name = field[len("specifications."):] agg_name = f"specifications_{name}_facet" # 使用 reverse_nested 统计产品(父文档)数量,而不是规格条目(嵌套文档)数量 # 这样可以确保分面计数反映实际的产品数量,与搜索结果数量一致 base_value_counts = { "terms": { "field": "specifications.value", "size": size, "order": {"_count": "desc"} } } # 如果启用 reverse_nested,添加子聚合统计产品数量 if use_reverse_nested: base_value_counts["aggs"] = { "product_count": { "reverse_nested": {} } } aggs[agg_name] = { "nested": {"path": "specifications"}, "aggs": { "filter_by_name": { "filter": {"term": {"specifications.name": name}}, "aggs": { "value_counts": base_value_counts } } } } continue # 处理普通字段 agg_name = f"{field}_facet" if facet_type == 'terms': aggs[agg_name] = { "terms": { "field": field, "size": size, "order": {"_count": "desc"} } } elif facet_type == 'range': if config.ranges: aggs[agg_name] = { "range": { "field": field, "ranges": config.ranges } } return aggs