"""
Main Searcher module - executes search queries against Elasticsearch.

Handles query parsing, boolean expressions, ranking, and result formatting.
"""

from typing import Dict, Any, List, Optional
import time

from config import CustomerConfig
from utils.es_client import ESClient
from query import QueryParser, ParsedQuery
from indexer import MappingGenerator
from .boolean_parser import BooleanParser, QueryNode
from .es_query_builder import ESQueryBuilder
from .multilang_query_builder import MultiLanguageQueryBuilder
from .ranking_engine import RankingEngine
from context.request_context import RequestContext, RequestContextStage, create_request_context


class SearchResult:
    """Container for search results.

    Attributes:
        hits: Result documents; each has at least ``_id``, ``_score`` and ``_source``.
        total: Total number of matching documents.
        max_score: Highest score reported for the result set.
        took_ms: Elapsed time in milliseconds (what is timed depends on the
            producing method: whole request for ``Searcher.search``,
            Elasticsearch ``took`` for ``Searcher.search_by_image``).
        aggregations: Aggregation results (empty dict when none were returned).
        query_info: Extra information describing the executed query.
    """

    def __init__(
        self,
        hits: List[Dict[str, Any]],
        total: int,
        max_score: float,
        took_ms: int,
        aggregations: Optional[Dict[str, Any]] = None,
        query_info: Optional[Dict[str, Any]] = None
    ):
        self.hits = hits
        self.total = total
        self.max_score = max_score
        self.took_ms = took_ms
        self.aggregations = aggregations or {}
        self.query_info = query_info or {}

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary representation."""
        return {
            "hits": self.hits,
            "total": self.total,
            "max_score": self.max_score,
            "took_ms": self.took_ms,
            "aggregations": self.aggregations,
            "query_info": self.query_info
        }


class Searcher:
    """
    Main search engine class.

    Handles:
    - Query parsing and translation
    - Boolean expression parsing
    - ES query building
    - Result ranking and formatting
    """

    def __init__(
        self,
        config: CustomerConfig,
        es_client: ESClient,
        query_parser: Optional[QueryParser] = None
    ):
        """
        Initialize searcher.

        Args:
            config: Customer configuration
            es_client: Elasticsearch client
            query_parser: Query parser (created from ``config`` if not provided)
        """
        self.config = config
        self.es_client = es_client
        self.query_parser = query_parser or QueryParser(config)

        # Initialize components
        self.boolean_parser = BooleanParser()
        self.ranking_engine = RankingEngine(config.ranking.expression)

        # Resolve field names from the index mapping configuration
        mapping_gen = MappingGenerator(config)
        self.match_fields = mapping_gen.get_match_fields_for_domain("default")
        self.text_embedding_field = mapping_gen.get_text_embedding_field()
        self.image_embedding_field = mapping_gen.get_image_embedding_field()

        # Query builder - use multi-language version
        self.query_builder = MultiLanguageQueryBuilder(
            config=config,
            index_name=config.es_index_name,
            text_embedding_field=self.text_embedding_field,
            image_embedding_field=self.image_embedding_field
        )

    @staticmethod
    def _extract_total(es_response: Dict[str, Any]) -> int:
        """Return ``hits.total`` as an int, accepting both the object form
        (``{"value": N, ...}``) and the plain-integer form."""
        total = es_response.get('hits', {}).get('total', {})
        if isinstance(total, dict):
            return total.get('value', 0)
        return total

    @staticmethod
    def _extract_max_score(es_response: Dict[str, Any]) -> float:
        """Return ``hits.max_score``, mapping a missing or null value to 0.0.

        Elasticsearch reports ``max_score: null`` (e.g. when nothing matched or
        when results are sorted by a field), so a ``dict.get`` default alone
        does not protect callers that format or compare the value.
        """
        max_score = es_response.get('hits', {}).get('max_score')
        return 0.0 if max_score is None else max_score

    def search(
        self,
        query: str,
        size: int = 10,
        from_: int = 0,
        filters: Optional[Dict[str, Any]] = None,
        min_score: Optional[float] = None,
        context: Optional[RequestContext] = None,
        aggregations: Optional[Dict[str, Any]] = None,
        sort_by: Optional[str] = None,
        sort_order: Optional[str] = "desc"
    ) -> SearchResult:
        """
        Execute search query.

        The request runs through five tracked stages (query parsing, boolean
        parsing, query building, Elasticsearch search, result processing).
        Each stage is timed on ``context``; a failing stage records its error
        on ``context``, logs it and re-raises the original exception.

        Args:
            query: Search query string
            size: Number of results to return
            from_: Offset for pagination
            filters: Additional filters (field: value pairs)
            min_score: Minimum score threshold
            context: Request context for tracking (created if not provided)
            aggregations: Aggregation specifications for faceted search
            sort_by: Field name for sorting. When set, the order returned by
                Elasticsearch is kept and custom-score re-sorting is skipped.
            sort_order: Sort order: 'asc' or 'desc'

        Returns:
            SearchResult object
        """
        # Create context if not provided (backward compatibility)
        if context is None:
            context = create_request_context()

        log_extra = {'reqid': context.reqid, 'uid': context.uid}

        # Always use config defaults (these are backend configuration, not user parameters)
        enable_translation = self.config.query_config.enable_translation
        enable_embedding = self.config.query_config.enable_text_embedding
        enable_rerank = True  # Always enable reranking as it's part of the search logic

        # Start timing
        context.start_stage(RequestContextStage.TOTAL)
        context.logger.info(
            f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, "
            f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, "
            f"enable_rerank={enable_rerank}, min_score={min_score}",
            extra=log_extra
        )

        # Store search parameters in context
        context.metadata['search_params'] = {
            'size': size,
            'from_': from_,
            'filters': filters,
            'enable_translation': enable_translation,
            'enable_embedding': enable_embedding,
            'enable_rerank': enable_rerank,
            'min_score': min_score,
            'aggregations': aggregations,
            'sort_by': sort_by,
            'sort_order': sort_order
        }
        context.metadata['feature_flags'] = {
            'translation_enabled': enable_translation,
            'embedding_enabled': enable_embedding,
            'rerank_enabled': enable_rerank
        }

        # Step 1: Parse query
        context.start_stage(RequestContextStage.QUERY_PARSING)
        try:
            parsed_query = self.query_parser.parse(
                query,
                generate_vector=enable_embedding,
                context=context
            )
            # Computed once and reused by the boolean parsing stage below.
            is_simple = self.boolean_parser.is_simple_query(parsed_query.rewritten_query)

            # Store query analysis results in context
            context.store_query_analysis(
                original_query=parsed_query.original_query,
                normalized_query=parsed_query.normalized_query,
                rewritten_query=parsed_query.rewritten_query,
                detected_language=parsed_query.detected_language,
                translations=parsed_query.translations,
                query_vector=parsed_query.query_vector.tolist() if parsed_query.query_vector is not None else None,
                domain=parsed_query.domain,
                is_simple_query=is_simple
            )

            context.logger.info(
                f"查询解析完成 | 原查询: '{parsed_query.original_query}' | "
                f"重写后: '{parsed_query.rewritten_query}' | "
                f"语言: {parsed_query.detected_language} | "
                f"域: {parsed_query.domain} | "
                f"向量: {'是' if parsed_query.query_vector is not None else '否'}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"查询解析失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.QUERY_PARSING)

        # Step 2: Boolean parsing (only non-simple queries produce an AST)
        context.start_stage(RequestContextStage.BOOLEAN_PARSING)
        try:
            query_node = None
            if is_simple:
                context.logger.debug(
                    "简单查询 | 无布尔表达式",
                    extra=log_extra
                )
            else:
                query_node = self.boolean_parser.parse(parsed_query.rewritten_query)
                context.store_intermediate_result('query_node', query_node)
                context.store_intermediate_result('boolean_ast', str(query_node))
                context.logger.info(
                    f"布尔表达式解析 | AST: {query_node}",
                    extra=log_extra
                )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"布尔表达式解析失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.BOOLEAN_PARSING)

        # Step 3: Query building
        context.start_stage(RequestContextStage.QUERY_BUILDING)
        try:
            use_knn = enable_embedding and parsed_query.query_vector is not None
            es_query = self.query_builder.build_multilang_query(
                parsed_query=parsed_query,
                query_vector=parsed_query.query_vector if enable_embedding else None,
                query_node=query_node,
                filters=filters,
                size=size,
                from_=from_,
                enable_knn=use_knn,
                min_score=min_score
            )

            # Add SPU collapse if configured
            if self.config.spu_config.enabled:
                es_query = self.query_builder.add_spu_collapse(
                    es_query,
                    self.config.spu_config.spu_field,
                    self.config.spu_config.inner_hits_size
                )

            # Add aggregations for faceted search; track whether any were added
            # so the log below reports what was actually sent.
            has_aggregations = False
            if aggregations:
                # Use dynamic aggregations from request
                es_query = self.query_builder.add_dynamic_aggregations(es_query, aggregations)
                has_aggregations = True
            elif filters:
                # Fallback: aggregate on the "<field>_keyword" variant of each
                # filtered field, but only if such a field is configured.
                known_fields = {field.name for field in self.config.fields}
                agg_fields = [
                    f"{key}_keyword" for key in filters
                    if f"{key}_keyword" in known_fields
                ]
                if agg_fields:
                    es_query = self.query_builder.add_aggregations(es_query, agg_fields)
                    has_aggregations = True

            # Add sorting if specified
            if sort_by:
                es_query = self.query_builder.add_sorting(es_query, sort_by, sort_order)

            # size/from are passed to the ES client as separate parameters
            body_for_es = {k: v for k, v in es_query.items() if k not in ['size', 'from']}

            # Store ES query in context
            context.store_intermediate_result('es_query', es_query)
            context.store_intermediate_result('es_body_for_search', body_for_es)

            context.logger.info(
                f"ES查询构建完成 | 大小: {len(str(es_query))}字符 | "
                f"KNN: {'是' if use_knn else '否'} | "
                f"聚合: {'是' if has_aggregations else '否'}",
                extra=log_extra
            )
            context.logger.debug(
                f"ES查询详情: {es_query}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"ES查询构建失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.QUERY_BUILDING)

        # Step 4: Elasticsearch search
        context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
        try:
            es_response = self.es_client.search(
                index_name=self.config.es_index_name,
                body=body_for_es,
                size=size,
                from_=from_
            )

            # Store ES response in context
            context.store_intermediate_result('es_response', es_response)

            # Extract timing from ES response
            es_took = es_response.get('took', 0)

            # max_score may be null (no hits / field sort); the helper maps it
            # to 0.0 so the ':.3f' format below cannot raise TypeError.
            context.logger.info(
                f"ES搜索完成 | 耗时: {es_took}ms | "
                f"命中数: {self._extract_total(es_response)} | "
                f"最高分: {self._extract_max_score(es_response):.3f}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"ES搜索执行失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH)

        # Step 5: Result processing
        context.start_stage(RequestContextStage.RESULT_PROCESSING)
        try:
            hits = []
            raw_hits = []
            knn_used = 'knn' in es_query
            # An explicit sort_by means ES already returned hits in the order
            # the caller asked for; re-sorting by custom score would discard it.
            resort_by_custom_score = enable_rerank and not sort_by

            for hit in es_response.get('hits', {}).get('hits', []):
                raw_hits.append(hit)

                result_doc = {
                    '_id': hit['_id'],
                    '_score': hit['_score'],
                    '_source': hit['_source']
                }

                # Apply custom ranking if enabled
                if enable_rerank:
                    base_score = hit['_score']
                    knn_score = None
                    # The KNN contribution is folded into the combined ES
                    # score; approximate it with a fixed fraction of that
                    # score. _score can be null for field-sorted hits, so
                    # only approximate when a score is present.
                    if knn_used and base_score is not None:
                        knn_score = base_score * 0.2

                    custom_score = self.ranking_engine.calculate_score(
                        hit,
                        base_score,
                        knn_score
                    )
                    result_doc['_custom_score'] = custom_score
                    result_doc['_original_score'] = base_score

                hits.append(result_doc)

            # Re-sort by custom score (page-local) unless an explicit sort was requested
            if resort_by_custom_score:
                hits.sort(key=lambda doc: doc.get('_custom_score', doc['_score']), reverse=True)
                context.logger.info(
                    "重排序完成 | 基于自定义评分表达式",
                    extra=log_extra
                )

            # Store intermediate results in context
            context.store_intermediate_result('raw_hits', raw_hits)
            context.store_intermediate_result('processed_hits', hits)

            total_value = self._extract_total(es_response)
            max_score = self._extract_max_score(es_response)
            es_aggregations = es_response.get('aggregations', {})

            context.logger.info(
                f"结果处理完成 | 返回: {len(hits)}条 | 总计: {total_value}条 | "
                f"重排序: {'是' if resort_by_custom_score else '否'}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"结果处理失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.RESULT_PROCESSING)

        # End total timing and build result
        total_duration = context.end_stage(RequestContextStage.TOTAL)
        context.performance_metrics.total_duration = total_duration

        result = SearchResult(
            hits=hits,
            total=total_value,
            max_score=max_score,
            took_ms=int(total_duration),
            aggregations=es_aggregations,
            query_info=parsed_query.to_dict()
        )

        # Log complete performance summary
        context.log_performance_summary()

        return result

    def search_by_image(
        self,
        image_url: str,
        size: int = 10,
        filters: Optional[Dict[str, Any]] = None
    ) -> SearchResult:
        """
        Search by image similarity.

        Args:
            image_url: URL of query image
            size: Number of results
            filters: Additional filters; applied as a kNN pre-filter so every
                returned hit satisfies them

        Returns:
            SearchResult object (``took_ms`` is Elasticsearch's ``took``)

        Raises:
            ValueError: If no image embedding field is configured or the image
                cannot be encoded.
        """
        if not self.image_embedding_field:
            raise ValueError("Image embedding field not configured")

        # Imported lazily (presumably so the encoder dependency is only loaded
        # when image search is actually used).
        from embeddings import CLIPImageEncoder
        image_encoder = CLIPImageEncoder()
        image_vector = image_encoder.encode_image_from_url(image_url)

        if image_vector is None:
            raise ValueError(f"Failed to encode image: {image_url}")

        knn_clause = {
            "field": self.image_embedding_field,
            "query_vector": image_vector.tolist(),
            "k": size,
            "num_candidates": size * 10
        }
        if filters:
            # Filters must live inside the kNN clause. A top-level filter-only
            # `query` is combined with kNN matches as a disjunction, so it
            # would never exclude kNN hits that violate the filters.
            knn_clause["filter"] = {
                "bool": {
                    "filter": self.query_builder._build_filters(filters)
                }
            }

        es_query = {
            "size": size,
            "knn": knn_clause
        }

        # Execute search
        es_response = self.es_client.search(
            index_name=self.config.es_index_name,
            body=es_query,
            size=size
        )

        hits = [
            {
                '_id': hit['_id'],
                '_score': hit['_score'],
                '_source': hit['_source']
            }
            for hit in es_response.get('hits', {}).get('hits', [])
        ]

        return SearchResult(
            hits=hits,
            total=self._extract_total(es_response),
            max_score=self._extract_max_score(es_response),
            took_ms=es_response.get('took', 0),
            query_info={'image_url': image_url, 'search_type': 'image_similarity'}
        )

    def get_domain_summary(self) -> Dict[str, Any]:
        """
        Get summary of all configured domains.

        Returns:
            Dictionary with domain information (delegated to the query builder)
        """
        return self.query_builder.get_domain_summary()

    def get_document(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """
        Get single document by ID.

        Best-effort lookup: any client error (including "not found") is
        printed and reported as ``None``.

        Args:
            doc_id: Document ID

        Returns:
            The document's ``_source``, or None if the lookup failed
        """
        try:
            response = self.es_client.client.get(
                index=self.config.es_index_name,
                id=doc_id
            )
            return response.get('_source')
        except Exception as e:
            print(f"[Searcher] Failed to get document {doc_id}: {e}")
            return None