"""
Main Searcher module - executes search queries against Elasticsearch.

Handles query parsing, boolean expressions, ranking, and result formatting.
"""

from typing import Dict, Any, List, Optional, Tuple, Union
import json
import logging
import time

from utils.es_client import ESClient
from query import QueryParser, ParsedQuery
from embeddings import CLIPImageEncoder
from .boolean_parser import BooleanParser, QueryNode
from .es_query_builder import ESQueryBuilder
from .rerank_engine import RerankEngine
from config import SearchConfig
from config.utils import get_match_fields_for_index
from context.request_context import RequestContext, RequestContextStage, create_request_context
from api.models import FacetResult, FacetValue
from api.result_formatter import ResultFormatter

logger = logging.getLogger(__name__)


class SearchResult:
    """Container for search results (external-facing format).

    Values are stored as given, except that ``query_info``, ``suggestions`` and
    ``related_searches`` fall back to empty containers when passed ``None``.
    """

    def __init__(
        self,
        results: List[Any],  # List[SpuResult]
        total: int,
        max_score: float,
        took_ms: int,
        facets: Optional[List[FacetResult]] = None,
        query_info: Optional[Dict[str, Any]] = None,
        suggestions: Optional[List[str]] = None,
        related_searches: Optional[List[str]] = None,
        debug_info: Optional[Dict[str, Any]] = None
    ):
        self.results = results
        self.total = total
        self.max_score = max_score
        self.took_ms = took_ms
        self.facets = facets
        self.query_info = query_info or {}
        self.suggestions = suggestions or []
        self.related_searches = related_searches or []
        self.debug_info = debug_info

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary representation.

        Result items that expose ``model_dump()`` (e.g. Pydantic models) are
        dumped; any other item is passed through unchanged. ``facets`` is
        ``None`` when there are no facets, and ``debug_info`` is only included
        when it was set.
        """
        result = {
            "results": [r.model_dump() if hasattr(r, 'model_dump') else r for r in self.results],
            "total": self.total,
            "max_score": self.max_score,
            "took_ms": self.took_ms,
            "facets": [f.model_dump() for f in self.facets] if self.facets else None,
            "query_info": self.query_info,
            "suggestions": self.suggestions,
            "related_searches": self.related_searches
        }
        if self.debug_info is not None:
            result["debug_info"] = self.debug_info
        return result


class Searcher:
    """
    Main search engine class.

    Handles:
    - Query parsing and translation
    - Boolean expression parsing
    - ES query building
    - Result ranking and formatting
    """

    def __init__(
        self,
        es_client: ESClient,
        config: SearchConfig,
        query_parser: Optional[QueryParser] = None
    ):
        """
        Initialize searcher.

        Args:
            es_client: Elasticsearch client
            config: SearchConfig instance
            query_parser: Query parser (a QueryParser built from ``config`` is
                created if not provided)
        """
        self.es_client = es_client
        self.config = config
        self.index_name = config.es_index_name
        self.query_parser = query_parser or QueryParser(config)

        # Initialize components
        self.boolean_parser = BooleanParser()
        self.rerank_engine = RerankEngine(config.ranking.expression, enabled=False)

        # Get match fields from config
        self.match_fields = get_match_fields_for_index(config, "default")
        # Fall back to conventional field names when the config leaves them empty.
        self.text_embedding_field = config.query_config.text_embedding_field or "title_embedding"
        self.image_embedding_field = config.query_config.image_embedding_field or "image_embedding"
        self.source_fields = config.query_config.source_fields or []

        # Query builder - simplified single-layer architecture
        self.query_builder = ESQueryBuilder(
            index_name=self.index_name,
            match_fields=self.match_fields,
            text_embedding_field=self.text_embedding_field,
            image_embedding_field=self.image_embedding_field,
            source_fields=self.source_fields,
            function_score_config=self.config.function_score
        )

    @staticmethod
    def _extract_total_hits(es_response: Dict[str, Any]) -> int:
        """Return ``hits.total`` from an ES response.

        Accepts both the object form (``{"value": N, "relation": ...}``) and a
        plain integer. A missing total is reported as 0.
        """
        total = es_response.get('hits', {}).get('total', {})
        if isinstance(total, dict):
            return total.get('value', 0)
        return total

    @classmethod
    def _parse_hits(cls, es_response: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], int, float]:
        """Split an ES response into ``(hits, total, max_score)``.

        Missing sections yield an empty hit list, a total of 0 and a
        ``max_score`` of 0.0 (ES reports ``null`` when there are no hits).
        """
        hits_section = es_response.get('hits', {})
        es_hits = hits_section.get('hits', [])
        max_score = hits_section.get('max_score') or 0.0
        return es_hits, cls._extract_total_hits(es_response), max_score

    def search(
        self,
        query: str,
        tenant_id: str,
        size: int = 10,
        from_: int = 0,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, Any]] = None,
        facets: Optional[List[Any]] = None,
        min_score: Optional[float] = None,
        context: Optional[RequestContext] = None,
        sort_by: Optional[str] = None,
        sort_order: Optional[str] = "desc",
        debug: bool = False,
        language: str = "zh",
        sku_filter_dimension: Optional[List[str]] = None,
    ) -> SearchResult:
        """
        Execute a search query and return results in the external-facing format.

        Args:
            query: Search query string
            tenant_id: Tenant ID (required; always added as an exact-match filter)
            size: Number of results to return
            from_: Offset for pagination
            filters: Exact match filters. The caller's dict is not modified.
            range_filters: Range filters for numeric fields
            facets: Facet configurations for faceted search
            min_score: Minimum score threshold
            context: Request context for tracking (created if not provided)
            sort_by: Field name for sorting
            sort_order: Sort order: 'asc' or 'desc'
            debug: Include debug information in the result
            language: Language passed to the result formatter (default "zh")
            sku_filter_dimension: Optional SKU filter dimensions passed to the
                result formatter

        Returns:
            SearchResult object with formatted results

        Raises:
            Any exception from query parsing, boolean parsing, query building,
            the Elasticsearch call or result processing is recorded on the
            context (``set_error``), logged, and re-raised.
        """
        # Create context if not provided (backward compatibility)
        if context is None:
            context = create_request_context()

        log_extra = {'reqid': context.reqid, 'uid': context.uid}

        # Feature flags always come from backend configuration, never from the caller.
        enable_translation = self.config.query_config.enable_translation
        enable_embedding = self.config.query_config.enable_text_embedding
        enable_rerank = False  # Temporarily disabled

        # Start timing
        context.start_stage(RequestContextStage.TOTAL)
        context.logger.info(
            f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, "
            f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, "
            f"enable_rerank={enable_rerank}, min_score={min_score}",
            extra=log_extra
        )

        # Store search parameters in context (filters as supplied by the caller)
        context.metadata['search_params'] = {
            'size': size,
            'from_': from_,
            'filters': filters,
            'range_filters': range_filters,
            'facets': facets,
            'enable_translation': enable_translation,
            'enable_embedding': enable_embedding,
            'enable_rerank': enable_rerank,
            'min_score': min_score,
            'sort_by': sort_by,
            'sort_order': sort_order
        }

        context.metadata['feature_flags'] = {
            'translation_enabled': enable_translation,
            'embedding_enabled': enable_embedding,
            'rerank_enabled': enable_rerank
        }

        # Step 1: Parse query
        context.start_stage(RequestContextStage.QUERY_PARSING)
        try:
            parsed_query = self.query_parser.parse(
                query,
                generate_vector=enable_embedding,
                context=context
            )
            # Classify the rewritten query once; the same answer feeds the stored
            # analysis and the boolean-parsing step below.
            is_simple_query = self.boolean_parser.is_simple_query(parsed_query.rewritten_query)

            # Store query analysis results in context
            context.store_query_analysis(
                original_query=parsed_query.original_query,
                normalized_query=parsed_query.normalized_query,
                rewritten_query=parsed_query.rewritten_query,
                detected_language=parsed_query.detected_language,
                translations=parsed_query.translations,
                query_vector=parsed_query.query_vector.tolist() if parsed_query.query_vector is not None else None,
                domain=parsed_query.domain,
                is_simple_query=is_simple_query
            )

            context.logger.info(
                f"查询解析完成 | 原查询: '{parsed_query.original_query}' | "
                f"重写后: '{parsed_query.rewritten_query}' | "
                f"语言: {parsed_query.detected_language} | "
                f"域: {parsed_query.domain} | "
                f"向量: {'是' if parsed_query.query_vector is not None else '否'}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"查询解析失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.QUERY_PARSING)

        # Step 2: Boolean parsing (only for queries containing boolean syntax)
        context.start_stage(RequestContextStage.BOOLEAN_PARSING)
        try:
            query_node = None
            if is_simple_query:
                context.logger.debug(
                    "简单查询 | 无布尔表达式",
                    extra=log_extra
                )
            else:
                query_node = self.boolean_parser.parse(parsed_query.rewritten_query)
                context.store_intermediate_result('query_node', query_node)
                context.store_intermediate_result('boolean_ast', str(query_node))
                context.logger.info(
                    f"布尔表达式解析 | AST: {query_node}",
                    extra=log_extra
                )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"布尔表达式解析失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.BOOLEAN_PARSING)

        # Step 3: Query building
        context.start_stage(RequestContextStage.QUERY_BUILDING)
        try:
            # Work on a copy so the tenant constraint never leaks into the
            # caller's dict (or into the search_params stored above).
            es_filters = dict(filters) if filters else {}
            es_filters['tenant_id'] = tenant_id

            enable_knn = enable_embedding and parsed_query.query_vector is not None
            es_query = self.query_builder.build_query(
                query_text=parsed_query.rewritten_query or parsed_query.normalized_query,
                query_vector=parsed_query.query_vector if enable_embedding else None,
                query_node=query_node,
                filters=es_filters,
                range_filters=range_filters,
                size=size,
                from_=from_,
                enable_knn=enable_knn,
                min_score=min_score
            )

            # Add facets for faceted search
            if facets:
                facet_aggs = self.query_builder.build_facets(facets)
                if facet_aggs:
                    es_query.setdefault("aggs", {}).update(facet_aggs)

            # Add sorting if specified
            if sort_by:
                es_query = self.query_builder.add_sorting(es_query, sort_by, sort_order)

            # size/from are passed to the ES client as parameters, not in the body.
            body_for_es = {k: v for k, v in es_query.items() if k not in ['size', 'from']}

            # Store ES query in context
            context.store_intermediate_result('es_query', es_query)
            context.store_intermediate_result('es_body_for_search', body_for_es)

            # Serialize ES query as a compact JSON string (no spaces or newlines)
            es_query_compact = json.dumps(es_query, ensure_ascii=False, separators=(',', ':'))
            context.logger.info(
                f"ES query built | size: {len(es_query_compact)} chars | "
                f"KNN: {'yes' if enable_knn else 'no'} | "
                f"facets: {'yes' if facets else 'no'} | "
                f"query: {es_query_compact}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"ES查询构建失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.QUERY_BUILDING)

        # Step 4: Elasticsearch search
        context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
        try:
            es_response = self.es_client.search(
                index_name=self.index_name,
                body=body_for_es,
                size=size,
                from_=from_
            )

            # Store ES response in context
            context.store_intermediate_result('es_response', es_response)

            # Extract timing from ES response
            es_took = es_response.get('took', 0)
            context.logger.info(
                f"ES搜索完成 | 耗时: {es_took}ms | "
                f"命中数: {self._extract_total_hits(es_response)} | "
                f"最高分: {(es_response.get('hits', {}).get('max_score') or 0):.3f}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"ES搜索执行失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH)

        # Step 5: Result processing
        context.start_stage(RequestContextStage.RESULT_PROCESSING)
        try:
            es_hits, total_value, max_score = self._parse_hits(es_response)

            # Format results using ResultFormatter
            formatted_results = ResultFormatter.format_search_results(
                es_hits,
                max_score,
                language=language,
                sku_filter_dimension=sku_filter_dimension
            )

            # Format facets
            standardized_facets = None
            if facets:
                standardized_facets = ResultFormatter.format_facets(
                    es_response.get('aggregations', {}),
                    facets
                )

            # Generate suggestions and related searches
            query_text = parsed_query.original_query if parsed_query else query
            suggestions = ResultFormatter.generate_suggestions(query_text, formatted_results)
            related_searches = ResultFormatter.generate_related_searches(query_text, formatted_results)

            context.logger.info(
                f"结果处理完成 | 返回: {len(formatted_results)}条 | 总计: {total_value}条",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"结果处理失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.RESULT_PROCESSING)

        # End total timing and build result
        total_duration = context.end_stage(RequestContextStage.TOTAL)
        context.performance_metrics.total_duration = total_duration

        # Collect debug information if requested
        debug_info = None
        if debug:
            debug_info = {
                "query_analysis": {
                    "original_query": context.query_analysis.original_query,
                    "normalized_query": context.query_analysis.normalized_query,
                    "rewritten_query": context.query_analysis.rewritten_query,
                    "detected_language": context.query_analysis.detected_language,
                    "translations": context.query_analysis.translations,
                    "has_vector": context.query_analysis.query_vector is not None,
                    "is_simple_query": context.query_analysis.is_simple_query,
                    "boolean_ast": context.get_intermediate_result('boolean_ast'),
                    "domain": context.query_analysis.domain
                },
                "es_query": context.get_intermediate_result('es_query', {}),
                "es_response": {
                    "took_ms": es_response.get('took', 0),
                    "total_hits": total_value,
                    "max_score": max_score,
                    "shards": es_response.get('_shards', {})
                },
                "feature_flags": context.metadata.get('feature_flags', {}),
                "stage_timings": {
                    k: round(v, 2) for k, v in context.performance_metrics.stage_timings.items()
                },
                "search_params": context.metadata.get('search_params', {})
            }

        # Build result
        result = SearchResult(
            results=formatted_results,
            total=total_value,
            max_score=max_score,
            took_ms=int(total_duration),
            facets=standardized_facets,
            query_info=parsed_query.to_dict(),
            suggestions=suggestions,
            related_searches=related_searches,
            debug_info=debug_info
        )

        # Log complete performance summary
        context.log_performance_summary()

        return result

    def search_by_image(
        self,
        image_url: str,
        tenant_id: str,
        size: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, Any]] = None
    ) -> SearchResult:
        """
        Search by image similarity (external-facing format).

        Args:
            image_url: URL of query image
            tenant_id: Tenant ID (required; always added as an exact-match filter)
            size: Number of results (also used as the kNN ``k``; ``num_candidates``
                is ``size * 10``)
            filters: Exact match filters. The caller's dict is not modified.
            range_filters: Range filters for numeric fields

        Returns:
            SearchResult object with formatted results

        Raises:
            ValueError: If no image embedding field is configured or the image
                could not be encoded.
        """
        if not self.image_embedding_field:
            raise ValueError("Image embedding field not configured")

        # Generate image embedding
        image_encoder = CLIPImageEncoder()
        image_vector = image_encoder.encode_image_from_url(image_url)

        if image_vector is None:
            raise ValueError(f"Failed to encode image: {image_url}")

        # Copy so the tenant constraint is not written into the caller's dict.
        es_filters = dict(filters) if filters else {}
        es_filters['tenant_id'] = tenant_id

        knn_clause = {
            "field": self.image_embedding_field,
            "query_vector": image_vector.tolist(),
            "k": size,
            "num_candidates": size * 10
        }

        filter_clauses = self.query_builder._build_filters(es_filters, range_filters)
        if filter_clauses:
            # The filters must live inside the kNN clause. A top-level "query"
            # next to "knn" is OR-ed with the kNN hits, so it would neither keep
            # other tenants' nearest neighbours out nor limit the hits to kNN
            # matches.
            knn_clause["filter"] = {
                "bool": {
                    "filter": filter_clauses
                }
            }

        es_query = {
            "size": size,
            "knn": knn_clause
        }

        # Add _source filtering if source_fields are configured
        if self.source_fields:
            es_query["_source"] = {
                "includes": self.source_fields
            }

        # Execute search
        es_response = self.es_client.search(
            index_name=self.index_name,
            body=es_query,
            size=size
        )

        es_hits, total_value, max_score = self._parse_hits(es_response)

        # Format results using ResultFormatter
        formatted_results = ResultFormatter.format_search_results(
            es_hits,
            max_score,
            language="zh",  # Default language for image search
            sku_filter_dimension=None  # Image search doesn't support SKU filtering
        )

        return SearchResult(
            results=formatted_results,
            total=total_value,
            max_score=max_score,
            took_ms=es_response.get('took', 0),
            facets=None,
            query_info={'image_url': image_url, 'search_type': 'image_similarity'},
            suggestions=[],
            related_searches=[]
        )

    def get_domain_summary(self) -> Dict[str, Any]:
        """
        Get summary of all configured domains.

        Delegates to ``self.query_builder.get_domain_summary()``.

        Returns:
            Dictionary with domain information
        """
        return self.query_builder.get_domain_summary()

    def get_document(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """
        Get single document by ID.

        Args:
            doc_id: Document ID

        Returns:
            The document's ``_source``, or None if the lookup failed for any
            reason (not found or another error). Failures are logged with a
            traceback.
        """
        try:
            response = self.es_client.client.get(
                index=self.index_name,
                id=doc_id
            )
            return response.get('_source')
        except Exception as e:
            logger.error(f"Failed to get document {doc_id}: {e}", exc_info=True)
            return None

    def _standardize_facets(
        self,
        es_aggregations: Dict[str, Any],
        facet_configs: Optional[List[Union[str, Any]]],
        current_filters: Optional[Dict[str, Any]]
    ) -> Optional[List[FacetResult]]:
        """
        Convert ES aggregation results into standardized facets (Pydantic models).

        Args:
            es_aggregations: Raw ES aggregation results; each facet is looked up
                under the key ``f"{field}_facet"``
            facet_configs: Facet configuration list (a field-name str, treated as
                a "terms" facet, or a FacetConfig-like object with ``field`` and
                ``type`` attributes)
            current_filters: Currently applied filters, used to mark selected values

        Returns:
            List of FacetResult objects, or None if there is nothing to report
        """
        if not es_aggregations or not facet_configs:
            return None

        standardized_facets: List[FacetResult] = []

        for facet_config in facet_configs:
            # Resolve the configuration
            if isinstance(facet_config, str):
                field = facet_config
                facet_type = "terms"
            else:
                # FacetConfig object
                field = facet_config.field
                facet_type = facet_config.type

            agg_name = f"{field}_facet"

            if agg_name not in es_aggregations:
                continue

            agg_result = es_aggregations[agg_name]

            # Values of this field selected by the current filters
            selected_values = set()
            if current_filters and field in current_filters:
                filter_value = current_filters[field]
                if isinstance(filter_value, list):
                    selected_values = set(filter_value)
                else:
                    selected_values = {filter_value}

            # Convert buckets into FacetValue objects
            facet_values: List[FacetValue] = []
            if 'buckets' in agg_result:
                for bucket in agg_result['buckets']:
                    value = bucket.get('key')
                    count = bucket.get('doc_count', 0)

                    facet_values.append(FacetValue(
                        value=value,
                        label=str(value),
                        count=count,
                        selected=value in selected_values
                    ))

            # Build the FacetResult object
            facet_result = FacetResult(
                field=field,
                label=field,
                type=facet_type,
                values=facet_values
            )

            standardized_facets.append(facet_result)

        return standardized_facets if standardized_facets else None