""" Main Searcher module - executes search queries against Elasticsearch. Handles query parsing, boolean expressions, ranking, and result formatting. """ from typing import Dict, Any, List, Optional, Union import os import time, json import logging from utils.es_client import ESClient from query import QueryParser, ParsedQuery from embeddings.image_encoder import CLIPImageEncoder from .boolean_parser import BooleanParser, QueryNode from .es_query_builder import ESQueryBuilder from config import SearchConfig from config.tenant_config_loader import get_tenant_config_loader from config.utils import get_match_fields_for_index from context.request_context import RequestContext, RequestContextStage from api.models import FacetResult, FacetValue, FacetConfig from api.result_formatter import ResultFormatter from indexer.mapping_generator import get_tenant_index_name logger = logging.getLogger(__name__) class SearchResult: """Container for search results (外部友好格式).""" def __init__( self, results: List[Any], # List[SpuResult] total: int, max_score: float, took_ms: int, facets: Optional[List[FacetResult]] = None, query_info: Optional[Dict[str, Any]] = None, suggestions: Optional[List[str]] = None, related_searches: Optional[List[str]] = None, debug_info: Optional[Dict[str, Any]] = None ): self.results = results self.total = total self.max_score = max_score self.took_ms = took_ms self.facets = facets self.query_info = query_info or {} self.suggestions = suggestions or [] self.related_searches = related_searches or [] self.debug_info = debug_info def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation.""" result = { "results": [r.model_dump() if hasattr(r, 'model_dump') else r for r in self.results], "total": self.total, "max_score": self.max_score, "took_ms": self.took_ms, "facets": [f.model_dump() for f in self.facets] if self.facets else None, "query_info": self.query_info, "suggestions": self.suggestions, "related_searches": self.related_searches } if self.debug_info is not None: result["debug_info"] = self.debug_info return result class Searcher: """ Main search engine class. Handles: - Query parsing and translation - Boolean expression parsing - ES query building - Result ranking and formatting """ def __init__( self, es_client: ESClient, config: SearchConfig, query_parser: Optional[QueryParser] = None, image_encoder: Optional[CLIPImageEncoder] = None, ): """ Initialize searcher. Args: es_client: Elasticsearch client config: SearchConfig instance query_parser: Query parser (created if not provided) image_encoder: Optional pre-initialized image encoder """ self.es_client = es_client self.config = config # Index name is now generated dynamically per tenant, no longer stored here self.query_parser = query_parser or QueryParser(config) # Initialize components self.boolean_parser = BooleanParser() # Get match fields from config self.match_fields = get_match_fields_for_index(config, "default") self.text_embedding_field = config.query_config.text_embedding_field or "title_embedding" self.image_embedding_field = config.query_config.image_embedding_field if self.image_embedding_field and image_encoder is None: self.image_encoder = CLIPImageEncoder() else: self.image_encoder = image_encoder self.source_fields = config.query_config.source_fields # Query builder - simplified single-layer architecture self.query_builder = ESQueryBuilder( match_fields=self.match_fields, text_embedding_field=self.text_embedding_field, image_embedding_field=self.image_embedding_field, source_fields=self.source_fields, function_score_config=self.config.function_score, default_language=self.config.query_config.default_language, knn_boost=self.config.query_config.knn_boost ) def _apply_source_filter(self, es_query: Dict[str, Any]) -> None: """ Apply tri-state _source semantics: - None: do not set _source (return full source) - []: _source=false (return no source fields) - [..]: _source.includes=[..] """ if self.source_fields is None: return if not isinstance(self.source_fields, list): raise ValueError("query_config.source_fields must be null or list[str]") if len(self.source_fields) == 0: es_query["_source"] = False return es_query["_source"] = {"includes": self.source_fields} def search( self, query: str, tenant_id: str, size: int = 10, from_: int = 0, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None, facets: Optional[List[FacetConfig]] = None, min_score: Optional[float] = None, context: Optional[RequestContext] = None, sort_by: Optional[str] = None, sort_order: Optional[str] = "desc", debug: bool = False, language: str = "en", sku_filter_dimension: Optional[List[str]] = None, enable_rerank: bool = False, rerank_query_template: Optional[str] = None, rerank_doc_template: Optional[str] = None, ) -> SearchResult: """ Execute search query (外部友好格式). Args: query: Search query string tenant_id: Tenant ID (required for filtering) size: Number of results to return from_: Offset for pagination filters: Exact match filters range_filters: Range filters for numeric fields facets: Facet configurations for faceted search min_score: Minimum score threshold context: Request context for tracking (created if not provided) sort_by: Field name for sorting sort_order: Sort order: 'asc' or 'desc' debug: Enable debug information output Returns: SearchResult object with formatted results """ if context is None: raise ValueError("context is required") # 根据租户配置决定翻译开关(离线/在线统一) tenant_loader = get_tenant_config_loader() tenant_cfg = tenant_loader.get_tenant_config(tenant_id) index_langs = tenant_cfg.get("index_languages") or [] enable_translation = len(index_langs) > 0 enable_embedding = self.config.query_config.enable_text_embedding # 重排仅由请求参数 enable_rerank 控制,唯一实现为调用外部 BGE 重排服务 do_rerank = bool(enable_rerank) rerank_window = self.config.rerank.rerank_window or 1000 # 若开启重排且请求范围在窗口内:从 ES 取前 rerank_window 条、重排后再按 from/size 分页;否则不重排,按原 from/size 查 ES in_rerank_window = do_rerank and (from_ + size) <= rerank_window es_fetch_from = 0 if in_rerank_window else from_ es_fetch_size = rerank_window if in_rerank_window else size # Start timing context.start_stage(RequestContextStage.TOTAL) context.logger.info( f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, " f"enable_rerank={do_rerank}, in_rerank_window={in_rerank_window}, es_fetch=({es_fetch_from},{es_fetch_size}) | " f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}", extra={'reqid': context.reqid, 'uid': context.uid} ) # Store search parameters in context context.metadata['search_params'] = { 'size': size, 'from_': from_, 'es_fetch_from': es_fetch_from, 'es_fetch_size': es_fetch_size, 'in_rerank_window': in_rerank_window, 'rerank_query_template': rerank_query_template, 'rerank_doc_template': rerank_doc_template, 'filters': filters, 'range_filters': range_filters, 'facets': facets, 'enable_translation': enable_translation, 'enable_embedding': enable_embedding, 'enable_rerank': do_rerank, 'min_score': min_score, 'sort_by': sort_by, 'sort_order': sort_order } context.metadata['feature_flags'] = { 'translation_enabled': enable_translation, 'embedding_enabled': enable_embedding, 'rerank_enabled': do_rerank } # Step 1: Parse query context.start_stage(RequestContextStage.QUERY_PARSING) try: parsed_query = self.query_parser.parse( query, tenant_id=tenant_id, generate_vector=enable_embedding, context=context ) # Store query analysis results in context context.store_query_analysis( original_query=parsed_query.original_query, query_normalized=parsed_query.query_normalized, rewritten_query=parsed_query.rewritten_query, detected_language=parsed_query.detected_language, translations=parsed_query.translations, query_vector=parsed_query.query_vector.tolist() if parsed_query.query_vector is not None else None, domain=parsed_query.domain, is_simple_query=self.boolean_parser.is_simple_query(parsed_query.rewritten_query) ) context.logger.info( f"查询解析完成 | 原查询: '{parsed_query.original_query}' | " f"重写后: '{parsed_query.rewritten_query}' | " f"语言: {parsed_query.detected_language} | " f"域: {parsed_query.domain} | " f"向量: {'是' if parsed_query.query_vector is not None else '否'}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"查询解析失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.QUERY_PARSING) # Step 2: Boolean parsing context.start_stage(RequestContextStage.BOOLEAN_PARSING) try: query_node = None if self.boolean_parser.is_simple_query(parsed_query.rewritten_query): # Simple query query_text = parsed_query.rewritten_query context.logger.debug( f"简单查询 | 无布尔表达式", extra={'reqid': context.reqid, 'uid': context.uid} ) else: # Complex boolean query query_node = self.boolean_parser.parse(parsed_query.rewritten_query) query_text = parsed_query.rewritten_query context.store_intermediate_result('query_node', query_node) context.store_intermediate_result('boolean_ast', str(query_node)) context.logger.info( f"布尔表达式解析 | AST: {query_node}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"布尔表达式解析失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.BOOLEAN_PARSING) # Step 3: Query building context.start_stage(RequestContextStage.QUERY_BUILDING) try: # Generate tenant-specific index name index_name = get_tenant_index_name(tenant_id) # index_name = "search_products" # No longer need to add tenant_id to filters since each tenant has its own index es_query = self.query_builder.build_query( query_text=parsed_query.rewritten_query or parsed_query.query_normalized, query_vector=parsed_query.query_vector if enable_embedding else None, query_node=query_node, filters=filters, range_filters=range_filters, facet_configs=facets, size=es_fetch_size, from_=es_fetch_from, enable_knn=enable_embedding and parsed_query.query_vector is not None, min_score=min_score, parsed_query=parsed_query ) # Add facets for faceted search if facets: facet_aggs = self.query_builder.build_facets(facets) if facet_aggs: if "aggs" not in es_query: es_query["aggs"] = {} es_query["aggs"].update(facet_aggs) # Add sorting if specified if sort_by: es_query = self.query_builder.add_sorting(es_query, sort_by, sort_order) # Extract size and from from body for ES client parameters body_for_es = {k: v for k, v in es_query.items() if k not in ['size', 'from']} # Store ES query in context context.store_intermediate_result('es_query', es_query) context.store_intermediate_result('es_body_for_search', body_for_es) # Serialize ES query as a compact JSON string (no spaces or newlines) es_query_compact = json.dumps(es_query, ensure_ascii=False, separators=(',', ':')) context.logger.info( f"ES query built | size: {len(es_query_compact)} chars | " f"KNN: {'yes' if enable_embedding and parsed_query.query_vector is not None else 'no'} | " f"facets: {'yes' if facets else 'no'} | " f"query: {es_query_compact}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"ES查询构建失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.QUERY_BUILDING) # Step 4: Elasticsearch search context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH) try: # Use tenant-specific index name(开启重排且在窗口内时已用 es_fetch_size/es_fetch_from) es_response = self.es_client.search( index_name=index_name, body=body_for_es, size=es_fetch_size, from_=es_fetch_from ) # Store ES response in context context.store_intermediate_result('es_response', es_response) # Extract timing from ES response es_took = es_response.get('took', 0) context.logger.info( f"ES搜索完成 | 耗时: {es_took}ms | " f"命中数: {es_response.get('hits', {}).get('total', {}).get('value', 0)} | " f"最高分: {(es_response.get('hits', {}).get('max_score') or 0):.3f}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"ES搜索执行失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH) # Optional Step 4.5: AI reranking(仅当请求范围在重排窗口内时执行) if do_rerank and in_rerank_window: context.start_stage(RequestContextStage.RERANKING) try: from .rerank_client import run_rerank rerank_query = parsed_query.original_query if parsed_query else query rc = self.config.rerank effective_query_template = rerank_query_template or rc.rerank_query_template effective_doc_template = rerank_doc_template or rc.rerank_doc_template es_response, rerank_meta, fused_debug = run_rerank( query=rerank_query, es_response=es_response, language=language, timeout_sec=rc.timeout_sec, weight_es=rc.weight_es, weight_ai=rc.weight_ai, rerank_query_template=effective_query_template, rerank_doc_template=effective_doc_template, ) if rerank_meta is not None: from config.services_config import get_rerank_service_url rerank_url = get_rerank_service_url() context.metadata.setdefault("rerank_info", {}) context.metadata["rerank_info"].update({ "service_url": rerank_url, "docs": len(es_response.get("hits", {}).get("hits") or []), "meta": rerank_meta, }) context.store_intermediate_result("rerank_scores", fused_debug) context.logger.info( f"重排完成 | docs={len(fused_debug)} | meta={rerank_meta}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.add_warning(f"Rerank failed: {e}") context.logger.warning( f"调用重排服务失败 | error: {e}", extra={'reqid': context.reqid, 'uid': context.uid}, exc_info=True, ) finally: context.end_stage(RequestContextStage.RERANKING) # 当本次请求在重排窗口内时:已从 ES 取了 rerank_window 条并可能已重排,需按请求的 from/size 做分页切片 if in_rerank_window: hits = es_response.get("hits", {}).get("hits") or [] sliced = hits[from_ : from_ + size] es_response.setdefault("hits", {})["hits"] = sliced if sliced: slice_max = max((h.get("_score") for h in sliced), default=0.0) try: es_response["hits"]["max_score"] = float(slice_max) except (TypeError, ValueError): es_response["hits"]["max_score"] = 0.0 else: es_response["hits"]["max_score"] = 0.0 context.logger.info( f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", extra={'reqid': context.reqid, 'uid': context.uid} ) # Step 5: Result processing context.start_stage(RequestContextStage.RESULT_PROCESSING) try: # Extract ES hits es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total # max_score 会在启用 AI 搜索时被更新为融合分数的最大值 max_score = es_response.get('hits', {}).get('max_score') or 0.0 # Format results using ResultFormatter formatted_results = ResultFormatter.format_search_results( es_hits, max_score, language=language, sku_filter_dimension=sku_filter_dimension ) # Format facets standardized_facets = None if facets: standardized_facets = ResultFormatter.format_facets( es_response.get('aggregations', {}), facets, filters ) # Generate suggestions and related searches query_text = parsed_query.original_query if parsed_query else query suggestions = ResultFormatter.generate_suggestions(query_text, formatted_results) related_searches = ResultFormatter.generate_related_searches(query_text, formatted_results) context.logger.info( f"结果处理完成 | 返回: {len(formatted_results)}条 | 总计: {total_value}条", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"结果处理失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.RESULT_PROCESSING) # End total timing and build result total_duration = context.end_stage(RequestContextStage.TOTAL) context.performance_metrics.total_duration = total_duration # Collect debug information if requested debug_info = None if debug: debug_info = { "query_analysis": { "original_query": context.query_analysis.original_query, "query_normalized": context.query_analysis.query_normalized, "rewritten_query": context.query_analysis.rewritten_query, "detected_language": context.query_analysis.detected_language, "translations": context.query_analysis.translations, "has_vector": context.query_analysis.query_vector is not None, "is_simple_query": context.query_analysis.is_simple_query, "boolean_ast": context.get_intermediate_result('boolean_ast'), "domain": context.query_analysis.domain }, "es_query": context.get_intermediate_result('es_query', {}), "es_response": { "took_ms": es_response.get('took', 0), "total_hits": total_value, "max_score": max_score, "shards": es_response.get('_shards', {}) }, "feature_flags": context.metadata.get('feature_flags', {}), "stage_timings": { k: round(v, 2) for k, v in context.performance_metrics.stage_timings.items() }, "search_params": context.metadata.get('search_params', {}) } # Build result result = SearchResult( results=formatted_results, total=total_value, max_score=max_score, took_ms=int(total_duration), facets=standardized_facets, query_info=parsed_query.to_dict(), suggestions=suggestions, related_searches=related_searches, debug_info=debug_info ) # Log complete performance summary context.log_performance_summary() return result def search_by_image( self, image_url: str, tenant_id: str, size: int = 10, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None ) -> SearchResult: """ Search by image similarity (外部友好格式). Args: image_url: URL of query image tenant_id: Tenant ID (required for filtering) size: Number of results filters: Exact match filters range_filters: Range filters for numeric fields Returns: SearchResult object with formatted results """ if not self.image_embedding_field: raise ValueError("Image embedding field not configured") # Generate image embedding if self.image_encoder is None: raise RuntimeError("Image encoder is not initialized at startup") image_vector = self.image_encoder.encode_image_from_url(image_url) if image_vector is None: raise ValueError(f"Failed to encode image: {image_url}") # Generate tenant-specific index name index_name = get_tenant_index_name(tenant_id) # No longer need to add tenant_id to filters since each tenant has its own index # Build KNN query es_query = { "size": size, "knn": { "field": self.image_embedding_field, "query_vector": image_vector.tolist(), "k": size, "num_candidates": size * 10 } } # Apply source filtering semantics (None / [] / list) self._apply_source_filter(es_query) if filters or range_filters: filter_clauses = self.query_builder._build_filters(filters, range_filters) if filter_clauses: es_query["query"] = { "bool": { "filter": filter_clauses } } # Execute search es_response = self.es_client.search( index_name=index_name, body=es_query, size=size ) # Extract ES hits es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total max_score = es_response.get('hits', {}).get('max_score') or 0.0 # Format results using ResultFormatter formatted_results = ResultFormatter.format_search_results( es_hits, max_score, language="en", # Default language for image search sku_filter_dimension=None # Image search doesn't support SKU filtering ) return SearchResult( results=formatted_results, total=total_value, max_score=max_score, took_ms=es_response.get('took', 0), facets=None, query_info={'image_url': image_url, 'search_type': 'image_similarity'}, suggestions=[], related_searches=[] ) def get_domain_summary(self) -> Dict[str, Any]: """ Get summary of all configured domains. Returns: Dictionary with domain information """ return self.query_builder.get_domain_summary() def get_document(self, tenant_id: str, doc_id: str) -> Optional[Dict[str, Any]]: """ Get single document by ID. Args: tenant_id: Tenant ID (required to determine which index to query) doc_id: Document ID Returns: Document or None if not found """ try: index_name = get_tenant_index_name(tenant_id) response = self.es_client.client.get( index=index_name, id=doc_id ) return response.get('_source') except Exception as e: logger.error(f"Failed to get document {doc_id} from tenant {tenant_id}: {e}", exc_info=True) return None def _standardize_facets( self, es_aggregations: Dict[str, Any], facet_configs: Optional[List[Union[str, Any]]], current_filters: Optional[Dict[str, Any]] ) -> Optional[List[FacetResult]]: """ 将 ES 聚合结果转换为标准化的分面格式(返回 Pydantic 模型)。 Args: es_aggregations: ES 原始聚合结果 facet_configs: 分面配置列表(str 或 FacetConfig) current_filters: 当前应用的过滤器 Returns: 标准化的分面结果列表(FacetResult 对象) """ if not es_aggregations or not facet_configs: return None standardized_facets: List[FacetResult] = [] for config in facet_configs: # 解析配置 if isinstance(config, str): field = config facet_type = "terms" else: # FacetConfig 对象 field = config.field facet_type = config.type agg_name = f"{field}_facet" if agg_name not in es_aggregations: continue agg_result = es_aggregations[agg_name] # 获取当前字段的选中值 selected_values = set() if current_filters and field in current_filters: filter_value = current_filters[field] if isinstance(filter_value, list): selected_values = set(filter_value) else: selected_values = {filter_value} # 转换 buckets 为 FacetValue 对象 facet_values: List[FacetValue] = [] if 'buckets' in agg_result: for bucket in agg_result['buckets']: value = bucket.get('key') count = bucket.get('doc_count', 0) facet_values.append(FacetValue( value=value, label=str(value), count=count, selected=value in selected_values )) # 构建 FacetResult 对象 facet_result = FacetResult( field=field, label=field, type=facet_type, values=facet_values ) standardized_facets.append(facet_result) return standardized_facets if standardized_facets else None