"""
Main Searcher module - executes search queries against Elasticsearch.

Handles query parsing, boolean expressions, ranking, and result formatting.
"""

import logging
import time
from typing import Dict, Any, List, Optional, Union

from config import CustomerConfig
from utils.es_client import ESClient
from query import QueryParser, ParsedQuery
from indexer import MappingGenerator
from .boolean_parser import BooleanParser, QueryNode
from .es_query_builder import ESQueryBuilder
from .multilang_query_builder import MultiLanguageQueryBuilder
from .ranking_engine import RankingEngine
from context.request_context import RequestContext, RequestContextStage, create_request_context
from api.models import FacetResult, FacetValue

logger = logging.getLogger(__name__)


class SearchResult:
    """Container for search results (refactored version)."""

    def __init__(
        self,
        hits: List[Dict[str, Any]],
        total: int,
        max_score: float,
        took_ms: int,
        facets: Optional[List[FacetResult]] = None,
        query_info: Optional[Dict[str, Any]] = None,
        debug_info: Optional[Dict[str, Any]] = None
    ):
        """
        Store the pieces of a search response.

        Args:
            hits: Result documents, already in final order
            total: Total number of matching documents
            max_score: Highest score reported for the query
            took_ms: Elapsed time in milliseconds
            facets: Standardized facet results, if any were requested
            query_info: Information about the parsed query (defaults to {})
            debug_info: Optional debug payload; omitted from to_dict() when None
        """
        self.hits = hits
        self.total = total
        self.max_score = max_score
        self.took_ms = took_ms
        self.facets = facets
        self.query_info = query_info or {}
        self.debug_info = debug_info

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary representation."""
        result = {
            "hits": self.hits,
            "total": self.total,
            "max_score": self.max_score,
            "took_ms": self.took_ms,
            "facets": [f.model_dump() for f in self.facets] if self.facets else None,
            "query_info": self.query_info
        }
        # debug_info is only emitted when it was actually collected.
        if self.debug_info is not None:
            result["debug_info"] = self.debug_info
        return result


class Searcher:
    """
    Main search engine class.

    Handles:
    - Query parsing and translation
    - Boolean expression parsing
    - ES query building
    - Result ranking and formatting
    """

    # Elasticsearch rejects kNN requests whose num_candidates exceeds 10,000.
    _MAX_KNN_CANDIDATES = 10000

    def __init__(
        self,
        config: CustomerConfig,
        es_client: ESClient,
        query_parser: Optional[QueryParser] = None
    ):
        """
        Initialize searcher.

        Args:
            config: Customer configuration
            es_client: Elasticsearch client
            query_parser: Query parser (created if not provided)
        """
        self.config = config
        self.es_client = es_client
        self.query_parser = query_parser or QueryParser(config)

        # Initialize components
        self.boolean_parser = BooleanParser()
        self.ranking_engine = RankingEngine(config.ranking.expression)

        # Get mapping info
        mapping_gen = MappingGenerator(config)
        self.match_fields = mapping_gen.get_match_fields_for_domain("default")
        self.text_embedding_field = mapping_gen.get_text_embedding_field()
        self.image_embedding_field = mapping_gen.get_image_embedding_field()

        # Query builder - use multi-language version
        self.query_builder = MultiLanguageQueryBuilder(
            config=config,
            index_name=config.es_index_name,
            text_embedding_field=self.text_embedding_field,
            image_embedding_field=self.image_embedding_field
        )

    @staticmethod
    def _extract_total(es_response: Dict[str, Any]) -> int:
        """
        Return the total hit count from an ES response.

        `hits.total` is read as either a dict carrying a `value` key or a
        plain number; both shapes are accepted. Missing data yields 0.
        """
        total = es_response.get('hits', {}).get('total', {})
        if isinstance(total, dict):
            return total.get('value', 0)
        return total

    def search(
        self,
        query: str,
        size: int = 10,
        from_: int = 0,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, Any]] = None,
        facets: Optional[List[Any]] = None,
        min_score: Optional[float] = None,
        context: Optional[RequestContext] = None,
        sort_by: Optional[str] = None,
        sort_order: Optional[str] = "desc",
        debug: bool = False
    ) -> SearchResult:
        """
        Execute search query (refactored version).

        Runs five tracked stages: query parsing, boolean parsing, ES query
        building, ES execution and result processing. A failure in any stage
        is recorded on the context, logged, and re-raised.

        Args:
            query: Search query string
            size: Number of results to return
            from_: Offset for pagination
            filters: Exact match filters
            range_filters: Range filters for numeric fields
            facets: Facet configurations for faceted search
            min_score: Minimum score threshold
            context: Request context for tracking (created if not provided)
            sort_by: Field name for sorting. When given, the order returned
                by Elasticsearch is kept; custom scores are still attached
                to each hit but do not reorder the page.
            sort_order: Sort order: 'asc' or 'desc'
            debug: Enable debug information output

        Returns:
            SearchResult object
        """
        # Create context if not provided (backward compatibility)
        if context is None:
            context = create_request_context()

        # Always use config defaults (these are backend configuration, not user parameters)
        enable_translation = self.config.query_config.enable_translation
        enable_embedding = self.config.query_config.enable_text_embedding
        enable_rerank = True  # Always enable reranking as it's part of the search logic

        # Fields attached to every log record emitted for this request.
        log_extra = {'reqid': context.reqid, 'uid': context.uid}

        # Start timing
        context.start_stage(RequestContextStage.TOTAL)

        context.logger.info(
            f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, "
            f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, "
            f"enable_rerank={enable_rerank}, min_score={min_score}",
            extra=log_extra
        )

        # Store search parameters in context
        context.metadata['search_params'] = {
            'size': size,
            'from_': from_,
            'filters': filters,
            'range_filters': range_filters,
            'facets': facets,
            'enable_translation': enable_translation,
            'enable_embedding': enable_embedding,
            'enable_rerank': enable_rerank,
            'min_score': min_score,
            'sort_by': sort_by,
            'sort_order': sort_order
        }

        context.metadata['feature_flags'] = {
            'translation_enabled': enable_translation,
            'embedding_enabled': enable_embedding,
            'rerank_enabled': enable_rerank
        }

        # Step 1: Parse query
        context.start_stage(RequestContextStage.QUERY_PARSING)
        try:
            parsed_query = self.query_parser.parse(
                query,
                generate_vector=enable_embedding,
                context=context
            )
            # Store query analysis results in context
            context.store_query_analysis(
                original_query=parsed_query.original_query,
                normalized_query=parsed_query.normalized_query,
                rewritten_query=parsed_query.rewritten_query,
                detected_language=parsed_query.detected_language,
                translations=parsed_query.translations,
                query_vector=parsed_query.query_vector.tolist() if parsed_query.query_vector is not None else None,
                domain=parsed_query.domain,
                is_simple_query=self.boolean_parser.is_simple_query(parsed_query.rewritten_query)
            )

            context.logger.info(
                f"查询解析完成 | 原查询: '{parsed_query.original_query}' | "
                f"重写后: '{parsed_query.rewritten_query}' | "
                f"语言: {parsed_query.detected_language} | "
                f"域: {parsed_query.domain} | "
                f"向量: {'是' if parsed_query.query_vector is not None else '否'}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"查询解析失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.QUERY_PARSING)

        # Step 2: Boolean parsing
        context.start_stage(RequestContextStage.BOOLEAN_PARSING)
        try:
            query_node = None
            if self.boolean_parser.is_simple_query(parsed_query.rewritten_query):
                # Simple query: no AST is built, query_node stays None
                context.logger.debug(
                    "简单查询 | 无布尔表达式",
                    extra=log_extra
                )
            else:
                # Complex boolean query
                query_node = self.boolean_parser.parse(parsed_query.rewritten_query)
                context.store_intermediate_result('query_node', query_node)
                context.store_intermediate_result('boolean_ast', str(query_node))
                context.logger.info(
                    f"布尔表达式解析 | AST: {query_node}",
                    extra=log_extra
                )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"布尔表达式解析失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.BOOLEAN_PARSING)

        # Step 3: Query building
        context.start_stage(RequestContextStage.QUERY_BUILDING)
        try:
            use_knn = enable_embedding and parsed_query.query_vector is not None

            es_query = self.query_builder.build_multilang_query(
                parsed_query=parsed_query,
                query_vector=parsed_query.query_vector if enable_embedding else None,
                query_node=query_node,
                filters=filters,
                range_filters=range_filters,
                size=size,
                from_=from_,
                enable_knn=use_knn,
                min_score=min_score
            )

            # Add SPU collapse if configured
            if self.config.spu_config.enabled:
                es_query = self.query_builder.add_spu_collapse(
                    es_query,
                    self.config.spu_config.spu_field,
                    self.config.spu_config.inner_hits_size
                )

            # Add facets for faceted search
            if facets:
                facet_aggs = self.query_builder.build_facets(facets)
                if facet_aggs:
                    es_query.setdefault("aggs", {}).update(facet_aggs)

            # Add sorting if specified
            if sort_by:
                es_query = self.query_builder.add_sorting(es_query, sort_by, sort_order)

            # size and from are passed to the ES client as parameters, so
            # they are removed from the request body.
            body_for_es = {k: v for k, v in es_query.items() if k not in ['size', 'from']}

            # Store ES query in context
            context.store_intermediate_result('es_query', es_query)
            context.store_intermediate_result('es_body_for_search', body_for_es)

            context.logger.info(
                f"ES查询构建完成 | 大小: {len(str(es_query))}字符 | "
                f"KNN: {'是' if use_knn else '否'} | "
                f"分面: {'是' if facets else '否'}",
                extra=log_extra
            )
            context.logger.debug(
                f"ES查询详情: {es_query}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"ES查询构建失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.QUERY_BUILDING)

        # Step 4: Elasticsearch search
        context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
        try:
            es_response = self.es_client.search(
                index_name=self.config.es_index_name,
                body=body_for_es,
                size=size,
                from_=from_
            )

            # Store ES response in context
            context.store_intermediate_result('es_response', es_response)

            # Extract timing from ES response
            es_took = es_response.get('took', 0)
            # _extract_total tolerates an integer `hits.total`, so a log line
            # can no longer turn a successful search into a failure.
            context.logger.info(
                f"ES搜索完成 | 耗时: {es_took}ms | "
                f"命中数: {self._extract_total(es_response)} | "
                f"最高分: {(es_response.get('hits', {}).get('max_score') or 0):.3f}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"ES搜索执行失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH)

        # Step 5: Result processing
        context.start_stage(RequestContextStage.RESULT_PROCESSING)
        try:
            hits = []
            raw_hits = []

            # An explicit sort requested by the caller wins over the custom
            # score ordering; otherwise sort_by would be silently undone.
            reorder_by_custom_score = enable_rerank and not sort_by

            if 'hits' in es_response and 'hits' in es_response['hits']:
                for hit in es_response['hits']['hits']:
                    raw_hits.append(hit)
                    # _score may be absent or None; treat that as 0.0
                    base_score = hit.get('_score') or 0.0

                    result_doc = {
                        '_id': hit['_id'],
                        '_score': base_score,
                        '_source': hit['_source']
                    }

                    # Apply custom ranking if enabled
                    if enable_rerank:
                        knn_score = None
                        # Check if KNN was used
                        if 'knn' in es_query:
                            # KNN score would be in the combined score
                            # For simplicity, extract from score
                            knn_score = base_score * 0.2  # Approximate based on our formula

                        custom_score = self.ranking_engine.calculate_score(
                            hit,
                            base_score,
                            knn_score
                        )
                        result_doc['_custom_score'] = custom_score
                        result_doc['_original_score'] = base_score

                    hits.append(result_doc)

            # Re-sort by custom score unless the caller asked for a field sort
            if reorder_by_custom_score:
                hits.sort(key=lambda x: x.get('_custom_score', x['_score']), reverse=True)
                context.logger.info(
                    "重排序完成 | 基于自定义评分表达式",
                    extra=log_extra
                )

            # Store intermediate results in context
            context.store_intermediate_result('raw_hits', raw_hits)
            context.store_intermediate_result('processed_hits', hits)

            # Extract total and max_score
            total_value = self._extract_total(es_response)
            max_score = es_response.get('hits', {}).get('max_score') or 0.0

            # Standardize facets
            standardized_facets = self._standardize_facets(
                es_response.get('aggregations', {}),
                facets,
                filters
            )

            context.logger.info(
                f"结果处理完成 | 返回: {len(hits)}条 | 总计: {total_value}条 | "
                f"重排序: {'是' if reorder_by_custom_score else '否'}",
                extra=log_extra
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"结果处理失败 | 错误: {str(e)}",
                extra=log_extra
            )
            raise
        finally:
            context.end_stage(RequestContextStage.RESULT_PROCESSING)

        # End total timing and build result
        total_duration = context.end_stage(RequestContextStage.TOTAL)
        context.performance_metrics.total_duration = total_duration

        # Collect debug information if requested
        debug_info = None
        if debug:
            analysis = context.query_analysis
            debug_info = {
                "query_analysis": {
                    "original_query": analysis.original_query,
                    "normalized_query": analysis.normalized_query,
                    "rewritten_query": analysis.rewritten_query,
                    "detected_language": analysis.detected_language,
                    "translations": analysis.translations,
                    "has_vector": analysis.query_vector is not None,
                    "is_simple_query": analysis.is_simple_query,
                    "boolean_ast": context.get_intermediate_result('boolean_ast'),
                    "domain": analysis.domain
                },
                "es_query": context.get_intermediate_result('es_query', {}),
                "es_response": {
                    "took_ms": es_response.get('took', 0),
                    "total_hits": total_value,
                    "max_score": max_score,
                    "shards": es_response.get('_shards', {})
                },
                "feature_flags": context.metadata.get('feature_flags', {}),
                "stage_timings": {
                    k: round(v, 2) for k, v in context.performance_metrics.stage_timings.items()
                },
                "search_params": context.metadata.get('search_params', {})
            }

        # Build result
        result = SearchResult(
            hits=hits,
            total=total_value,
            max_score=max_score,
            took_ms=int(total_duration),
            facets=standardized_facets,
            query_info=parsed_query.to_dict(),
            debug_info=debug_info
        )

        # Log complete performance summary
        context.log_performance_summary()

        return result

    def search_by_image(
        self,
        image_url: str,
        size: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, Any]] = None
    ) -> SearchResult:
        """
        Search by image similarity (refactored version).

        Args:
            image_url: URL of query image
            size: Number of results
            filters: Exact match filters
            range_filters: Range filters for numeric fields

        Returns:
            SearchResult object

        Raises:
            ValueError: If no image embedding field is configured, or the
                image could not be encoded.
        """
        if not self.image_embedding_field:
            raise ValueError("Image embedding field not configured")

        # Generate image embedding (imported here, as in the original code,
        # so the encoder module is only loaded when image search is used)
        from embeddings import CLIPImageEncoder
        image_encoder = CLIPImageEncoder()
        image_vector = image_encoder.encode_image_from_url(image_url)

        if image_vector is None:
            raise ValueError(f"Failed to encode image: {image_url}")

        # Build KNN query
        knn_clause: Dict[str, Any] = {
            "field": self.image_embedding_field,
            "query_vector": image_vector.tolist(),
            "k": size,
            "num_candidates": min(size * 10, self._MAX_KNN_CANDIDATES)
        }

        if filters or range_filters:
            filter_clauses = self.query_builder._build_filters(filters, range_filters)
            if filter_clauses:
                # The restriction must live inside the knn clause. A
                # top-level `query` next to `knn` is OR-ed with the kNN
                # hits, so it would not filter them and would also pull in
                # documents that merely match the filter.
                knn_clause["filter"] = {
                    "bool": {
                        "filter": filter_clauses
                    }
                }

        # Like search(), keep `size` out of the body and pass it to the
        # client as a parameter so both paths call the client the same way.
        es_query = {"knn": knn_clause}

        # Execute search
        es_response = self.es_client.search(
            index_name=self.config.es_index_name,
            body=es_query,
            size=size
        )

        # Process results (similar to text search)
        hits = []
        if 'hits' in es_response and 'hits' in es_response['hits']:
            hits = [
                {
                    '_id': hit['_id'],
                    '_score': hit.get('_score') or 0.0,
                    '_source': hit['_source']
                }
                for hit in es_response['hits']['hits']
            ]

        return SearchResult(
            hits=hits,
            total=self._extract_total(es_response),
            max_score=es_response.get('hits', {}).get('max_score') or 0.0,
            took_ms=es_response.get('took', 0),
            query_info={'image_url': image_url, 'search_type': 'image_similarity'}
        )

    def get_domain_summary(self) -> Dict[str, Any]:
        """
        Get summary of all configured domains.

        Returns:
            Dictionary with domain information
        """
        return self.query_builder.get_domain_summary()

    def get_document(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """
        Get single document by ID.

        This is a best-effort lookup: any failure is logged and reported to
        the caller as None.

        Args:
            doc_id: Document ID

        Returns:
            Document or None if not found
        """
        try:
            response = self.es_client.client.get(
                index=self.config.es_index_name,
                id=doc_id
            )
            return response.get('_source')
        except Exception as e:
            logger.warning("[Searcher] Failed to get document %s: %s", doc_id, e)
            return None

    def _standardize_facets(
        self,
        es_aggregations: Dict[str, Any],
        facet_configs: Optional[List[Union[str, Any]]],
        current_filters: Optional[Dict[str, Any]]
    ) -> Optional[List[FacetResult]]:
        """
        Convert ES aggregation results into the standardized facet format
        (returns Pydantic models).

        Args:
            es_aggregations: Raw ES aggregation results
            facet_configs: Facet configuration list (str or FacetConfig)
            current_filters: Currently applied filters

        Returns:
            List of standardized facet results (FacetResult objects), or
            None when there is nothing to report
        """
        if not es_aggregations or not facet_configs:
            return None

        standardized_facets: List[FacetResult] = []

        for config in facet_configs:
            # Resolve the configuration entry
            if isinstance(config, str):
                field = config
                facet_type = "terms"
            else:
                # FacetConfig object
                field = config.field
                facet_type = config.type

            agg_name = f"{field}_facet"
            if agg_name not in es_aggregations:
                continue

            agg_result = es_aggregations[agg_name]

            # Collect the values currently selected for this field
            selected_values = set()
            if current_filters and field in current_filters:
                filter_value = current_filters[field]
                if isinstance(filter_value, list):
                    selected_values = set(filter_value)
                else:
                    selected_values = {filter_value}

            # Convert buckets into FacetValue objects
            facet_values: List[FacetValue] = []
            for bucket in agg_result.get('buckets', []) if 'buckets' in agg_result else []:
                value = bucket.get('key')
                facet_values.append(FacetValue(
                    value=value,
                    label=str(value),
                    count=bucket.get('doc_count', 0),
                    selected=value in selected_values
                ))

            # Build the FacetResult object
            standardized_facets.append(FacetResult(
                field=field,
                label=self._get_field_label(field),
                type=facet_type,
                values=facet_values
            ))

        return standardized_facets if standardized_facets else None

    def _get_field_label(self, field: str) -> str:
        """
        Return the display label for a field.

        Looks the field up in self.config.fields and uses its `label`
        attribute. Falls back to the field name when the field is not
        configured, has no `label` attribute, or the label is empty/None.
        """
        for field_config in self.config.fields:
            if field_config.name == field:
                return getattr(field_config, 'label', None) or field
        # Not configured: use the field name itself
        return field