"""
Main Searcher module - executes search queries against Elasticsearch.

Handles query parsing, ranking, and result formatting.
"""
from typing import Dict, Any, List, Optional
import json
import logging
import hashlib
from string import Formatter
from utils.es_client import ESClient
from query import QueryParser, ParsedQuery
from query.style_intent import StyleIntentRegistry
from embeddings.image_encoder import CLIPImageEncoder
from .es_query_builder import ESQueryBuilder
from .sku_intent_selector import SkuSelectionDecision, StyleSkuSelector
from config import SearchConfig
from config.tenant_config_loader import get_tenant_config_loader
from context.request_context import RequestContext, RequestContextStage
from api.models import FacetResult, FacetConfig
from api.result_formatter import ResultFormatter
from indexer.mapping_generator import get_tenant_index_name

logger = logging.getLogger(__name__)
# Separate logger for verbose backend payloads; only used when a handler is attached.
backend_verbose_logger = logging.getLogger("backend.verbose")


def _log_backend_verbose(payload: Dict[str, Any]) -> None:
    """Emit *payload* as one compact JSON line on the verbose backend logger.

    No-op when the logger has no handlers, so the hot path pays nothing
    unless verbose backend logging was explicitly configured.
    """
    if not backend_verbose_logger.handlers:
        return
    backend_verbose_logger.info(
        json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
    )


def _index_debug_rows_by_doc(rows: Any) -> Dict[str, Dict[str, Any]]:
    """Index debug rows by stringified ``doc_id``.

    Defensive against malformed input: non-list input yields an empty map,
    non-dict items and items without a ``doc_id`` are skipped.
    """
    indexed: Dict[str, Dict[str, Any]] = {}
    if not isinstance(rows, list):
        return indexed
    for item in rows:
        if not isinstance(item, dict):
            continue
        doc_id = item.get("doc_id")
        if doc_id is None:
            continue
        indexed[str(doc_id)] = item
    return indexed


def _summarize_ltr_features(per_result_debug: List[Dict[str, Any]], top_n: int = 20) -> Dict[str, Any]:
    """Summarize LTR feature signals over the first *top_n* per-result debug rows.

    Returns a dict with:
    - ``top_n``: number of rows actually summarized,
    - ``counts``: how many of those rows carry each boolean feature flag,
    - ``averages``: mean of each numeric feature over rows where it is present,
    - ``top_docs``: a compact per-doc snapshot for the first 10 rows.
    """
    rows = list(per_result_debug[:top_n])
    if not rows:
        return {"top_n": 0, "counts": {}, "averages": {}, "top_docs": []}

    def _feature(row: Dict[str, Any], key: str) -> Any:
        # Look up a feature across ranking stages, preferring the latest stage
        # (rerank > fine_rank > coarse_rank); first stage that has the key wins.
        funnel = row.get("ranking_funnel", {})
        for stage_name in ("rerank", "fine_rank", "coarse_rank"):
            stage_features = funnel.get(stage_name, {}).get("ltr_features")
            if isinstance(stage_features, dict) and key in stage_features:
                return stage_features.get(key)
        return None

    def _count(flag: str) -> int:
        # Number of rows where the boolean feature is truthy.
        return sum(1 for row in rows if bool(_feature(row, flag)))

    def _avg(name: str) -> float | None:
        # Mean over rows where the feature is present; None when no row has it.
        values = [float(v) for row in rows if (v := _feature(row, name)) is not None]
        if not values:
            return None
        return round(sum(values) / len(values), 6)

    top_docs = []
    for row in rows[:10]:
        top_docs.append(
            {
                "spu_id": row.get("spu_id"),
                "final_rank": row.get("final_rank"),
                "title_zh": row.get("title_multilingual", {}).get("zh")
                if isinstance(row.get("title_multilingual"), dict)
                else None,
                "es_score": _feature(row, "es_score"),
                "text_score": _feature(row, "text_score"),
                "knn_score": _feature(row, "knn_score"),
                "rerank_score": _feature(row, "rerank_score"),
                "fine_score": _feature(row, "fine_score"),
                "has_translation_match": _feature(row, "has_translation_match"),
                "has_text_knn": _feature(row, "has_text_knn"),
                "has_image_knn": _feature(row, "has_image_knn"),
                "has_style_boost": _feature(row, "has_style_boost"),
            }
        )
    return {
        "top_n": len(rows),
        "counts": {
            "translation_match_docs": _count("has_translation_match"),
            "text_knn_docs": _count("has_text_knn"),
            "image_knn_docs": _count("has_image_knn"),
            "style_boost_docs": _count("has_style_boost"),
            "text_fallback_to_es_docs": _count("text_score_fallback_to_es"),
        },
        "averages": {
            "es_score": _avg("es_score"),
            "text_score": _avg("text_score"),
            "knn_score": _avg("knn_score"),
            "rerank_score": _avg("rerank_score"),
            "fine_score": _avg("fine_score"),
            "source_score": _avg("source_score"),
            "translation_score": _avg("translation_score"),
            "text_knn_score": _avg("text_knn_score"),
            "image_knn_score": _avg("image_knn_score"),
        },
        "top_docs": top_docs,
    }


class SearchResult:
    """Container for search results (external-friendly format)."""

    def __init__(
        self,
        results: List[Any],  # List[SpuResult]
        total: int,
        max_score: float,
        took_ms: int,
        facets: Optional[List[FacetResult]] = None,
        query_info: Optional[Dict[str, Any]] = None,
        suggestions: Optional[List[str]] = None,
        related_searches: Optional[List[str]] =
            None,
        debug_info: Optional[Dict[str, Any]] = None
    ):
        self.results = results
        self.total = total
        self.max_score = max_score
        self.took_ms = took_ms
        self.facets = facets
        # Optional collections default to empty containers for safe iteration.
        self.query_info = query_info or {}
        self.suggestions = suggestions or []
        self.related_searches = related_searches or []
        self.debug_info = debug_info

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary representation."""
        result = {
            # Results may be pydantic models or plain dicts; dump models only.
            "results": [r.model_dump() if hasattr(r, 'model_dump') else r for r in self.results],
            "total": self.total,
            "max_score": self.max_score,
            "took_ms": self.took_ms,
            "facets": [f.model_dump() for f in self.facets] if self.facets else None,
            "query_info": self.query_info,
            "suggestions": self.suggestions,
            "related_searches": self.related_searches
        }
        # Only expose debug info when it was populated (debug requests).
        if self.debug_info is not None:
            result["debug_info"] = self.debug_info
        return result


class Searcher:
    """
    Main search engine class.

    Handles:
    - Query parsing and translation
    - Dynamic multi-language text recall planning
    - ES query building
    - Result ranking and formatting
    """

    def __init__(
        self,
        es_client: ESClient,
        config: SearchConfig,
        query_parser: Optional[QueryParser] = None,
        image_encoder: Optional[CLIPImageEncoder] = None,
    ):
        """
        Initialize searcher.

        Args:
            es_client: Elasticsearch client
            config: SearchConfig instance
            query_parser: Query parser (created if not provided)
            image_encoder: Optional pre-initialized image encoder
        """
        self.es_client = es_client
        self.config = config
        self.text_embedding_field = config.query_config.text_embedding_field or "title_embedding"
        self.image_embedding_field = config.query_config.image_embedding_field
        # Lazily construct an image encoder only when image KNN is configured
        # and the caller did not supply one.
        if self.image_embedding_field and image_encoder is None:
            self.image_encoder = CLIPImageEncoder()
        else:
            self.image_encoder = image_encoder
        # Index name is now generated dynamically per tenant, no longer stored here
        self.query_parser = query_parser or QueryParser(config, image_encoder=self.image_encoder)
        self.source_fields = config.query_config.source_fields
        self.style_intent_registry = StyleIntentRegistry.from_query_config(self.config.query_config)
        self.style_sku_selector = StyleSkuSelector(
            self.style_intent_registry,
            # Late-bound getter: the parser's text encoder may be created after init.
            text_encoder_getter=lambda: getattr(self.query_parser, "text_encoder", None),
        )
        # Query builder - simplified single-layer architecture
        self.query_builder = ESQueryBuilder(
            match_fields=[],
            field_boosts=self.config.field_boosts,
            multilingual_fields=self.config.query_config.multilingual_fields,
            shared_fields=self.config.query_config.shared_fields,
            core_multilingual_fields=self.config.query_config.core_multilingual_fields,
            text_embedding_field=self.text_embedding_field,
            image_embedding_field=self.image_embedding_field,
            source_fields=self.source_fields,
            function_score_config=self.config.function_score,
            default_language=self.config.query_config.default_language,
            knn_text_boost=self.config.query_config.knn_text_boost,
            knn_image_boost=self.config.query_config.knn_image_boost,
            knn_text_k=self.config.query_config.knn_text_k,
            knn_text_num_candidates=self.config.query_config.knn_text_num_candidates,
            knn_text_k_long=self.config.query_config.knn_text_k_long,
            knn_text_num_candidates_long=self.config.query_config.knn_text_num_candidates_long,
            knn_image_k=self.config.query_config.knn_image_k,
            knn_image_num_candidates=self.config.query_config.knn_image_num_candidates,
            base_minimum_should_match=self.config.query_config.base_minimum_should_match,
            translation_minimum_should_match=self.config.query_config.translation_minimum_should_match,
            translation_boost=self.config.query_config.translation_boost,
            tie_breaker_base_query=self.config.query_config.tie_breaker_base_query,
            best_fields_boosts=self.config.query_config.best_fields,
            best_fields_clause_boost=self.config.query_config.best_fields_boost,
            phrase_field_boosts=self.config.query_config.phrase_fields,
            phrase_match_boost=self.config.query_config.phrase_match_boost,
        )

    def _apply_source_filter(self, es_query: Dict[str, Any]) -> None:
        """
        Apply tri-state _source semantics:
        - None: do not set _source (return full source)
        - []: _source=false (return no source fields)
        - [..]: _source.includes=[..]
        """
        if self.source_fields is None:
            return
        if not isinstance(self.source_fields, list):
            raise ValueError("query_config.source_fields must be null or list[str]")
        if len(self.source_fields) == 0:
            es_query["_source"] = False
            return
        es_query["_source"] = {"includes": self.source_fields}

    def _resolve_exact_knn_rescore_window(self) -> int:
        """Return the exact-KNN rescore window; fall back to the rerank window
        when the dedicated setting is not positive."""
        configured = int(self.config.rerank.exact_knn_rescore_window)
        if configured > 0:
            return configured
        return int(self.config.rerank.rerank_window)

    def _build_exact_knn_rescore(
        self,
        *,
        query_vector: Any,
        image_query_vector: Any,
        parsed_query: Optional[ParsedQuery] = None,
    ) -> Optional[Dict[str, Any]]:
        """Build an ES ``rescore`` clause that scores exact text/image KNN.

        Returns None when neither a text nor an image clause could be built
        (e.g. no vectors available), so callers can skip attaching it.
        """
        clauses: List[Dict[str, Any]] = []
        text_clause = self.query_builder.build_exact_text_knn_rescore_clause(
            query_vector,
            parsed_query=parsed_query,
            query_name="exact_text_knn_query",
        )
        if text_clause:
            clauses.append(text_clause)
        image_clause = self.query_builder.build_exact_image_knn_rescore_clause(
            image_query_vector,
            query_name="exact_image_knn_query",
        )
        if image_clause:
            clauses.append(image_clause)
        if not clauses:
            return None
        return {
            "window_size": self._resolve_exact_knn_rescore_window(),
            "query": {
                # Phase 1: only compute exact vector scores and expose them in matched_queries.
                "score_mode": "total",
                "query_weight": 1.0,
                "rescore_query_weight": 0.0,
                "rescore_query": {
                    "bool": {
                        "should": clauses,
                        "minimum_should_match": 1,
                    }
                },
            },
        }

    def _attach_exact_knn_rescore(
        self,
        es_query: Dict[str, Any],
        *,
        in_rank_window: bool,
        query_vector: Any,
        image_query_vector: Any,
        parsed_query: Optional[ParsedQuery] = None,
    ) -> None:
        """Attach the exact-KNN rescore to *es_query* in place.

        Skipped when the request is outside the rank window or the feature is
        disabled. Preserves any pre-existing rescore(s) by promoting the
        ``rescore`` entry to a list.
        """
        if not in_rank_window or not self.config.rerank.exact_knn_rescore_enabled:
            return
        rescore = self._build_exact_knn_rescore(
            query_vector=query_vector,
            image_query_vector=image_query_vector,
            parsed_query=parsed_query,
        )
        if not rescore:
            return
        existing = es_query.get("rescore")
        if existing is None:
            es_query["rescore"] = rescore
        elif isinstance(existing, list):
            es_query["rescore"] = [*existing, rescore]
        else:
            es_query["rescore"] = [existing, rescore]

    def _resolve_rerank_source_filter(
        self,
        doc_template: str,
        parsed_query: Optional[ParsedQuery] = None,
    ) -> Dict[str, Any]:
        """
        Build a lightweight _source filter for rerank prefetch.

        Only fetch fields required by rerank doc template to reduce ES payload.
        """
        # Whitelist of template placeholders that map to fetchable fields.
        field_map = {
            "title": "title",
            "brief": "brief",
            "vendor": "vendor",
            "description": "description",
            "category_path": "category_path",
        }
        includes: set[str] = set()
        template = str(doc_template or "{title}")
        for _, field_name, _, _ in Formatter().parse(template):
            if not field_name:
                continue
            # Strip attribute access / conversion / format-spec suffixes,
            # e.g. "title.zh", "title!r", "title:>10" all resolve to "title".
            key = field_name.split(".", 1)[0].split("!", 1)[0].split(":", 1)[0]
            mapped = field_map.get(key)
            if mapped:
                includes.add(mapped)
        # Fallback to title-only to keep rerank docs usable.
        if not includes:
            includes.add("title")
        # Style-intent SKU selection additionally needs SKU/option fields.
        if self._has_style_intent(parsed_query):
            includes.update(
                {
                    "skus",
                    "option1_name",
                    "option2_name",
                    "option3_name",
                }
            )
        return {"includes": sorted(includes)}

    def _fetch_hits_by_ids(
        self,
        index_name: str,
        doc_ids: List[str],
        source_spec: Optional[Any],
    ) -> tuple[Dict[str, Dict[str, Any]], int]:
        """
        Fetch page documents by IDs for final response fill.

        Returns:
            (hits_by_id, es_took_ms)
        """
        if not doc_ids:
            return {}, 0
        body: Dict[str, Any] = {
            "query": {
                "ids": {
                    "values": doc_ids,
                }
            }
        }
        if source_spec is not None:
            body["_source"] = source_spec
        resp = self.es_client.search(
            index_name=index_name,
            body=body,
            size=len(doc_ids),
            from_=0,
        )
        hits = resp.get("hits", {}).get("hits") or []
        hits_by_id: Dict[str, Dict[str, Any]] = {}
        for hit in hits:
            hid = hit.get("_id")
            if hid is None:
                continue
            hits_by_id[str(hid)] = hit
        return hits_by_id, int(resp.get("took", 0) or 0)

    @staticmethod
    def _restore_hits_in_doc_order(
        doc_ids: List[str],
        hits_by_id: Dict[str, Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Re-order fetched hits to follow *doc_ids*; missing IDs are dropped."""
        ordered_hits: List[Dict[str, Any]] = []
        for doc_id in doc_ids:
            hit = hits_by_id.get(str(doc_id))
            if hit is not None:
                ordered_hits.append(hit)
        return ordered_hits

    @staticmethod
    def _merge_source_specs(*source_specs: Any) -> Optional[Dict[str, Any]]:
        """Union the ``includes`` lists of several _source specs.

        Non-dict specs are ignored; returns None when nothing was collected.
        """
        includes: set[str] = set()
        for source_spec in source_specs:
            if not isinstance(source_spec, dict):
                continue
            for field_name in source_spec.get("includes") or []:
                includes.add(str(field_name))
        if not includes:
            return None
        return {"includes": sorted(includes)}

    @staticmethod
    def _has_style_intent(parsed_query: Optional[ParsedQuery]) -> bool:
        """True when the parsed query carries an active style-intent profile."""
        profile = getattr(parsed_query, "style_intent_profile", None)
        return bool(getattr(profile, "is_active", False))

    def _apply_style_intent_to_hits(
        self,
        es_hits: List[Dict[str, Any]],
        parsed_query: ParsedQuery,
        context: Optional[RequestContext] = None,
    ) -> Dict[str, SkuSelectionDecision]:
        if context is not None:
            context.start_stage(RequestContextStage.STYLE_SKU_PREPARE_HITS)
        try:
            # Pre-select SKUs per hit according to the style intent profile.
            return self.style_sku_selector.prepare_hits(es_hits, parsed_query)
        finally:
            if context is not None:
                context.end_stage(RequestContextStage.STYLE_SKU_PREPARE_HITS)

    def search(
        self,
        query: str,
        tenant_id: str,
        size: int = 10,
        from_: int = 0,
        filters: Optional[Dict[str, Any]] = None,
        range_filters: Optional[Dict[str, Any]] = None,
        facets: Optional[List[FacetConfig]] = None,
        min_score: Optional[float] = None,
        context: Optional[RequestContext] = None,
        sort_by: Optional[str] = None,
        sort_order: Optional[str] = "desc",
        debug: bool = False,
        language: str = "en",
        sku_filter_dimension: Optional[List[str]] = None,
        enable_rerank: Optional[bool] = None,
        rerank_query_template: Optional[str] = None,
        rerank_doc_template: Optional[str] = None,
    ) -> SearchResult:
        """
        Execute search query (external-friendly format).

        Args:
            query: Search query string
            tenant_id: Tenant ID (required for filtering)
            size: Number of results to return
            from_: Offset for pagination
            filters: Exact match filters
            range_filters: Range filters for numeric fields
            facets: Facet configurations for faceted search
            min_score: Minimum score threshold
            context: Request context for tracking (required)
            sort_by: Field name for sorting
            sort_order: Sort order: 'asc' or 'desc'
            debug: Enable debug information output
            language: Response / field selection language hint (e.g. zh, en)
            sku_filter_dimension: SKU grouping dimensions for per-SPU variant pick
            enable_rerank: If None, use ``config.rerank.enabled``; if set,
                overrides whether the final rerank provider is invoked (subject
                to rank window). When false, the ranking pipeline still runs and
                rerank stage becomes pass-through.
            rerank_query_template: Override for rerank query text template; None
                uses ``config.rerank.rerank_query_template`` (e.g. ``"{query}"``).
            rerank_doc_template: Override for per-hit document text passed to
                rerank; None uses ``config.rerank.rerank_doc_template``.
                Placeholders are resolved in ``search/rerank_client.py``.

        Returns:
            SearchResult object with formatted results
        """
        if context is None:
            raise ValueError("context is required")
        # Tenant config decides the translation switch (unified for offline/online).
        tenant_loader = get_tenant_config_loader()
        tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
        index_langs = tenant_cfg.get("index_languages") or []
        enable_translation = len(index_langs) > 0
        enable_embedding = self.config.query_config.enable_text_embedding
        coarse_cfg = self.config.coarse_rank
        fine_cfg = self.config.fine_rank
        rc = self.config.rerank
        # Request-level template overrides fall back to server config.
        effective_query_template = rerank_query_template or rc.rerank_query_template
        effective_doc_template = rerank_doc_template or rc.rerank_doc_template
        fine_query_template = fine_cfg.rerank_query_template or effective_query_template
        fine_doc_template = fine_cfg.rerank_doc_template or effective_doc_template
        # Rerank switch precedence: explicit request parameter > server config (enabled by default).
        rerank_enabled_by_config = bool(rc.enabled)
        do_rerank = rerank_enabled_by_config if enable_rerank is None else bool(enable_rerank)
        fine_enabled = bool(fine_cfg.enabled)
        rerank_window = rc.rerank_window
        # Stage windows are never smaller than the rerank window.
        coarse_input_window = max(rerank_window, int(coarse_cfg.input_window))
        coarse_output_window = max(rerank_window, int(coarse_cfg.output_window))
        fine_input_window = max(rerank_window, int(fine_cfg.input_window))
        fine_output_window = max(rerank_window, int(fine_cfg.output_window))
        # Multi-stage ranking windows are independent of the final rerank switch:
        # coarse/fine stages still run even when final rerank is off.
        in_rank_window = (from_ + size) <= rerank_window
        # Inside the rank window we fetch the whole coarse input from offset 0
        # and paginate locally after ranking; outside it we page directly in ES.
        es_fetch_from = 0 if in_rank_window else from_
        es_fetch_size = coarse_input_window if in_rank_window else size
        es_score_normalization_factor: Optional[float] = None
        # Per-stage rank maps (doc_id -> rank), populated only in debug mode.
        initial_ranks_by_doc: Dict[str, int] = {}
        coarse_ranks_by_doc: Dict[str, int] = {}
        fine_ranks_by_doc: Dict[str, int] = {}
        rerank_ranks_by_doc: Dict[str, int] = {}
        coarse_debug_info: Optional[Dict[str, Any]] = None
        fine_debug_info: Optional[Dict[str, Any]] = None
        rerank_debug_info: Optional[Dict[str, Any]] = None
        # Start timing
        context.start_stage(RequestContextStage.TOTAL)
        context.logger.info(
            f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, "
            f"enable_rerank(request)={enable_rerank}, enable_rerank(config)={rerank_enabled_by_config}, "
            f"fine_enabled(config)={fine_enabled}, "
            f"enable_rerank(effective)={do_rerank}, in_rank_window={in_rank_window}, "
            f"es_fetch=({es_fetch_from},{es_fetch_size}) | "
            f"index_languages={index_langs} | "
            f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}",
            extra={'reqid': context.reqid, 'uid': context.uid}
        )
        # Store search parameters in context
        context.metadata['search_params'] = {
            'size': size,
            'from_': from_,
            'es_fetch_from': es_fetch_from,
            'es_fetch_size': es_fetch_size,
            'in_rank_window': in_rank_window,
            'rerank_enabled_by_config': rerank_enabled_by_config,
            'fine_enabled': fine_enabled,
            'enable_rerank_request': enable_rerank,
            'rerank_query_template': effective_query_template,
            'rerank_doc_template': effective_doc_template,
            'fine_query_template': fine_query_template,
            'fine_doc_template': fine_doc_template,
            'filters': filters,
            'range_filters': range_filters,
            'facets': facets,
            'enable_translation': enable_translation,
            'enable_embedding': enable_embedding,
            'enable_rerank': do_rerank,
            'coarse_input_window': coarse_input_window,
            'coarse_output_window': coarse_output_window,
            'fine_input_window': fine_input_window,
            'fine_output_window': fine_output_window,
            'rerank_window': rerank_window,
            'min_score': min_score,
            'sort_by': sort_by,
            'sort_order': sort_order
        }
        context.metadata['feature_flags'] = {
            'translation_enabled': enable_translation,
            'embedding_enabled': enable_embedding,
            'fine_enabled': fine_enabled,
            'rerank_enabled': do_rerank,
            'style_intent_enabled': bool(self.style_intent_registry.enabled),
        }
        # Step 1: Parse query
        context.start_stage(RequestContextStage.QUERY_PARSING)
        try:
            parsed_query = self.query_parser.parse(
                query,
                generate_vector=enable_embedding,
                tenant_id=tenant_id,
                context=context,
                target_languages=index_langs if enable_translation else [],
            )
            # Store query analysis results in context
            context.store_query_analysis(
                original_query=parsed_query.original_query,
                query_normalized=parsed_query.query_normalized,
                rewritten_query=parsed_query.rewritten_query,
                detected_language=parsed_query.detected_language,
                translations=parsed_query.translations,
                keywords_queries=parsed_query.keywords_queries,
                query_vector=parsed_query.query_vector.tolist() if parsed_query.query_vector is not None else None,
            )
            context.metadata["feature_flags"]["style_intent_active"] = self._has_style_intent(parsed_query)
            context.logger.info(
                f"查询解析完成 | 原查询: '{parsed_query.original_query}' | "
                f"重写后: '{parsed_query.rewritten_query}' | "
                f"语言: {parsed_query.detected_language} | "
                f"关键词: {parsed_query.keywords_queries} | "
                f"文本向量: {'是' if parsed_query.query_vector is not None else '否'} | "
                f"图片向量: {'是' if parsed_query.image_query_vector is not None else '否'}",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"查询解析失败 | 错误: {str(e)}",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
            raise
        finally:
            context.end_stage(RequestContextStage.QUERY_PARSING)
        # Step 2: Query building
        context.start_stage(RequestContextStage.QUERY_BUILDING)
        try:
            # Generate tenant-specific index name
            index_name = get_tenant_index_name(tenant_id)
            # index_name = "search_products"
            # No longer need to add tenant_id to filters since each tenant has its own index
            image_query_vector = None
            if enable_embedding:
                image_query_vector = parsed_query.image_query_vector
            es_query = self.query_builder.build_query(
                query_text=parsed_query.rewritten_query or parsed_query.query_normalized,
                query_vector=parsed_query.query_vector if enable_embedding else None,
                image_query_vector=image_query_vector,
                filters=filters,
                range_filters=range_filters,
                facet_configs=facets,
                size=es_fetch_size,
                from_=es_fetch_from,
                enable_knn=enable_embedding and (
                    parsed_query.query_vector is not None or image_query_vector is not None
                ),
                min_score=min_score,
                parsed_query=parsed_query,
            )
            self._attach_exact_knn_rescore(
                es_query,
                in_rank_window=in_rank_window,
                query_vector=parsed_query.query_vector if enable_embedding else None,
                image_query_vector=image_query_vector,
                parsed_query=parsed_query,
            )
            # Add facets for faceted search
            if facets:
                facet_aggs = self.query_builder.build_facets(facets)
                if facet_aggs:
                    if "aggs" not in es_query:
                        es_query["aggs"] = {}
                    es_query["aggs"].update(facet_aggs)
            # Add sorting if specified
            if sort_by:
                es_query = self.query_builder.add_sorting(es_query, sort_by, sort_order)
                # Keep scores available even when sorting by a field.
                es_query["track_scores"] = True
            # Keep requested response _source semantics for the final response fill.
            response_source_spec = es_query.get("_source")
            # In multi-stage rank window, first pass only needs score signals for coarse rank.
            es_query_for_fetch = es_query
            if in_rank_window:
                es_query_for_fetch = dict(es_query)
                es_query_for_fetch["_source"] = False
            # Extract size and from from body for ES client parameters
            body_for_es = {k: v for k, v in es_query_for_fetch.items() if k not in ['size', 'from']}
            # Store ES query in context
            context.store_intermediate_result('es_query', es_query)
            # Serialize ES query to compute a compact size + stable digest for correlation
            es_query_compact = json.dumps(es_query_for_fetch, ensure_ascii=False, separators=(",", ":"))
            es_query_digest = hashlib.sha256(es_query_compact.encode("utf-8")).hexdigest()[:16]
            knn_enabled = bool(enable_embedding and (
                parsed_query.query_vector is not None or image_query_vector is not None
            ))
            vector_dims = int(len(parsed_query.query_vector)) if parsed_query.query_vector is not None else 0
            image_vector_dims = (
                int(len(image_query_vector)) if image_query_vector is not None else 0
            )
            context.logger.info(
                "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | image_vector_dims: %s | facets: %s",
                len(es_query_compact),
                es_query_digest,
                "yes" if knn_enabled else "no",
                vector_dims,
                image_vector_dims,
                "yes" if facets else "no",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
            _log_backend_verbose({
                "event": "es_query_built",
                "reqid": context.reqid,
                "uid": context.uid,
                "tenant_id": tenant_id,
                "size_chars": len(es_query_compact),
                "sha256_16": es_query_digest,
                "knn_enabled": knn_enabled,
                "vector_dims": vector_dims,
                "image_vector_dims": image_vector_dims,
                "has_facets": bool(facets),
                "query": es_query_for_fetch,
            })
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"ES查询构建失败 | 错误: {str(e)}",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
            raise
        finally:
            context.end_stage(RequestContextStage.QUERY_BUILDING)
        # Step 4: Elasticsearch search (primary recall)
        context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY)
        try:
            # Use tenant-specific index name (es_fetch_size/es_fetch_from already
            # applied when reranking within the rank window).
            es_response = self.es_client.search(
                index_name=index_name,
                body=body_for_es,
                size=es_fetch_size,
                from_=es_fetch_from,
                include_named_queries_score=bool(in_rank_window),
            )
            # Store ES response in context
            context.store_intermediate_result('es_response', es_response)
            if debug:
                initial_hits = es_response.get("hits", {}).get("hits") or []
                for rank, hit in enumerate(initial_hits, 1):
                    doc_id = hit.get("_id")
                    if doc_id is not None:
                        initial_ranks_by_doc[str(doc_id)] = rank
                # Derive a normalization factor from max_score, falling back to
                # the first hit's score when max_score is unavailable.
                raw_initial_max_score = es_response.get("hits", {}).get("max_score")
                try:
                    es_score_normalization_factor = float(raw_initial_max_score) if raw_initial_max_score is not None else None
                except (TypeError, ValueError):
                    es_score_normalization_factor = None
                if es_score_normalization_factor is None and initial_hits:
                    first_score = initial_hits[0].get("_score")
                    try:
                        es_score_normalization_factor = float(first_score) if first_score is not None else None
                    except (TypeError, ValueError):
                        es_score_normalization_factor = None
            # Extract timing from ES response
            es_took = es_response.get('took', 0)
            context.logger.info(
                f"ES搜索完成 | 耗时: {es_took}ms | "
                f"命中数: {es_response.get('hits', {}).get('total', {}).get('value', 0)} | "
                f"最高分: {(es_response.get('hits', {}).get('max_score') or 0):.3f}",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
        except Exception as e:
            context.set_error(e)
            context.logger.error(
                f"ES搜索执行失败 | 错误: {str(e)}",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
            raise
        finally:
            context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY)
        style_intent_decisions: Dict[str, SkuSelectionDecision] = {}
        if in_rank_window:
            # Ranking-only dependencies are imported lazily to keep them off the
            # non-ranked (deep pagination) path.
            from dataclasses import asdict
            from config.services_config import get_rerank_backend_config, get_rerank_service_url
            from .rerank_client import coarse_resort_hits, run_lightweight_rerank, run_rerank
            coarse_fusion_debug = asdict(coarse_cfg.fusion)
            stage_fusion_debug = asdict(rc.fusion)

            def _rank_map(stage_hits: List[Dict[str, Any]]) -> Dict[str, int]:
                # doc_id -> 1-based rank for the given stage ordering.
                return {
                    str(hit.get("_id")): rank
                    for rank, hit in enumerate(stage_hits, 1)
                    if hit.get("_id") is not None
                }

            def _stage_debug_info(
                *,
                enabled: bool,
                applied: bool,
                skipped_reason: Optional[str],
                service_profile: Optional[str],
                query_template: Optional[str],
                doc_template: Optional[str],
                docs_in: int,
                docs_out: int,
                top_n: int,
                meta: Optional[Dict[str, Any]] = None,
                backend: Optional[str] = None,
                backend_model_name: Optional[str] = None,
                service_url: Optional[str] = None,
                model: Optional[str] = None,
                fusion: Optional[Dict[str, Any]] = None,
            ) -> Dict[str, Any]:
                # Uniform debug payload shape shared by coarse/fine/rerank stages.
                return {
                    "enabled": enabled,
                    "applied": applied,
                    "passthrough": not applied,
                    "skipped_reason": skipped_reason,
                    "service_profile": service_profile,
                    "service_url": service_url,
                    "backend": backend,
                    "model": model,
                    "backend_model_name": backend_model_name,
                    "query_template": query_template,
                    "doc_template": doc_template,
                    "query_text": (
                        str(query_template).format_map({"query": rerank_query})
                        if query_template is not None
                        else None
                    ),
                    "docs_in": docs_in,
                    "docs_out": docs_out,
                    "top_n": top_n,
                    "meta": meta,
                    "fusion": fusion,
                }

            def _rank_change(previous_rank: Optional[int], current_rank: Optional[int]) -> Optional[int]:
                # Positive = moved up, negative = moved down; None when unknown.
                if previous_rank is None or current_rank is None:
                    return None
                return previous_rank - current_rank

            def _build_result_stage(
                *,
                rank: Optional[int],
                previous_rank: Optional[int],
                values: Optional[Dict[str, Any]] = None,
                signals: Optional[Dict[str, Any]] = None,
                signal_fields: Optional[Dict[str, str]] = None,
            ) -> Dict[str, Any]:
                # Assemble one stage entry of the per-result ranking funnel.
                stage_payload: Dict[str, Any] = {
                    "rank": rank,
                    "rank_change": _rank_change(previous_rank, rank),
                }
                if values:
                    stage_payload.update(values)
                if signals:
                    stage_payload["signals"] = signals
                    stage_payload["ltr_features"] = signals.get("ltr_features")
                    for shared_key in ("fusion_summary", "fusion_inputs", "fusion_factors"):
                        if stage_payload.get(shared_key) is None:
                            stage_payload[shared_key] = signals.get(shared_key)
                    for payload_key, signal_key in (signal_fields or {}).items():
                        if stage_payload.get(payload_key) is None:
                            stage_payload[payload_key] = signals.get(signal_key)
                return stage_payload

            def _run_optional_stage(
                *,
                stage: RequestContextStage,
                stage_label: str,
                enabled: bool,
                stage_hits: List[Dict[str, Any]],
                input_limit: int,
                output_limit: int,
                service_profile: Optional[str],
                query_template: str,
                doc_template: str,
                top_n: int,
                debug_key: Optional[str],
                runner,
            ) -> tuple[List[Dict[str, Any]], Dict[str, int], Optional[Dict[str, Any]]]:
                # Generic wrapper for the fine/rerank stages: runs `runner` over the
                # capped input, falls back to pass-through on disable/empty/error,
                # and records per-stage debug info + timing.
                context.start_stage(stage)
                try:
                    input_hits = list(stage_hits[:input_limit])
                    output_hits = list(stage_hits[:output_limit])
                    applied = False
                    skip_reason: Optional[str] = None
                    meta: Optional[Dict[str, Any]] = None
                    debug_rows: Optional[List[Dict[str, Any]]] = None
                    if enabled and input_hits:
                        output_hits_candidate, applied, meta, debug_rows = runner(input_hits)
                        if applied:
                            output_hits = list((output_hits_candidate or input_hits)[:output_limit])
                        else:
                            skip_reason = "service_returned_none"
                    else:
                        skip_reason = "disabled" if not enabled else "no_hits"
                    ranks = _rank_map(output_hits) if debug else {}
                    stage_info = None
                    if debug:
                        if applied:
                            backend_name, backend_cfg = get_rerank_backend_config(service_profile)
                            stage_info = _stage_debug_info(
                                enabled=True,
                                applied=True,
                                skipped_reason=None,
                                service_profile=service_profile,
                                service_url=get_rerank_service_url(profile=service_profile),
                                backend=backend_name,
                                backend_model_name=backend_cfg.get("model_name"),
                                model=meta.get("model") if isinstance(meta, dict) else None,
                                query_template=query_template,
                                doc_template=doc_template,
                                docs_in=len(input_hits),
                                docs_out=len(output_hits),
                                top_n=top_n,
                                meta=meta,
                                fusion=stage_fusion_debug,
                            )
                            if debug_key is not None and debug_rows is not None:
                                context.store_intermediate_result(debug_key, debug_rows)
                        else:
                            stage_info = _stage_debug_info(
                                enabled=enabled,
                                applied=False,
                                skipped_reason=skip_reason,
                                service_profile=service_profile,
                                query_template=query_template,
                                doc_template=doc_template,
                                docs_in=len(input_hits),
                                docs_out=len(output_hits),
                                top_n=top_n,
                                fusion=stage_fusion_debug,
                            )
                    if applied:
                        context.logger.info(
                            "%s完成 | docs=%s | top_n=%s | meta=%s",
                            stage_label, len(output_hits), top_n, meta,
                            extra={'reqid': context.reqid, 'uid': context.uid}
                        )
                    else:
                        context.logger.info(
                            "%s透传 | reason=%s | docs=%s | top_n=%s",
                            stage_label, skip_reason, len(output_hits), top_n,
                            extra={'reqid': context.reqid, 'uid': context.uid}
                        )
                    return output_hits, ranks, stage_info
                except Exception as e:
                    # Stage failure degrades to pass-through rather than failing
                    # the whole request.
                    output_hits = list(stage_hits[:output_limit])
                    ranks = _rank_map(output_hits) if debug else {}
                    stage_info = None
                    if debug:
                        stage_info = _stage_debug_info(
                            enabled=enabled,
                            applied=False,
                            skipped_reason="error",
                            service_profile=service_profile,
                            query_template=query_template,
                            doc_template=doc_template,
                            docs_in=min(len(stage_hits), input_limit),
                            docs_out=len(output_hits),
                            top_n=top_n,
                            meta={"error": str(e)},
                            fusion=stage_fusion_debug,
                        )
                    context.add_warning(f"{stage_label} failed: {e}")
                    context.logger.warning(
                        "调用%s服务失败 | error: %s",
                        stage_label, e,
                        extra={'reqid': context.reqid, 'uid': context.uid},
                        exc_info=True,
                    )
                    return output_hits, ranks, stage_info
                finally:
                    context.end_stage(stage)

            rerank_query = parsed_query.text_for_rerank() if parsed_query else query
            hits = es_response.get("hits", {}).get("hits") or []
            context.start_stage(RequestContextStage.COARSE_RANKING)
            try:
                # Local score fusion over the raw ES hits (no remote service).
                coarse_debug = coarse_resort_hits(
                    hits,
                    fusion=coarse_cfg.fusion,
                    debug=debug,
                )
                hits = hits[:coarse_output_window]
                es_response.setdefault("hits", {})["hits"] = hits
                if debug:
                    coarse_ranks_by_doc = _rank_map(hits)
                    coarse_debug_info = _stage_debug_info(
                        enabled=True,
                        applied=True,
                        skipped_reason=None,
                        service_profile=None,
                        service_url=None,
                        backend="local_coarse_fusion",
                        backend_model_name=None,
                        model=None,
                        query_template=None,
                        doc_template=None,
                        docs_in=es_fetch_size,
                        docs_out=len(hits),
                        top_n=coarse_output_window,
                        meta=None,
                        fusion=coarse_fusion_debug,
                    )
                    context.store_intermediate_result("coarse_rank_scores", coarse_debug)
                context.logger.info(
                    "粗排完成 | docs_in=%s | docs_out=%s",
                    es_fetch_size, len(hits),
                    extra={'reqid': context.reqid, 'uid': context.uid}
                )
            finally:
                context.end_stage(RequestContextStage.COARSE_RANKING)
            # The first-pass search ran with _source=false; fetch only the fields
            # the fine/rerank doc templates need for the surviving candidates.
            ranking_source_spec = self._merge_source_specs(
                self._resolve_rerank_source_filter(
                    fine_doc_template,
                    parsed_query=parsed_query,
                ),
                self._resolve_rerank_source_filter(
                    effective_doc_template,
                    parsed_query=parsed_query,
                ),
            )
            candidate_ids = [str(h.get("_id")) for h in hits if h.get("_id") is not None]
            if candidate_ids:
                details_by_id, fill_took = self._fetch_hits_by_ids(
                    index_name=index_name,
                    doc_ids=candidate_ids,
                    source_spec=ranking_source_spec,
                )
                for hit in hits:
                    hid = hit.get("_id")
                    if hid is None:
                        continue
                    detail_hit = details_by_id.get(str(hid))
                    if detail_hit is not None and "_source" in detail_hit:
                        hit["_source"] = detail_hit.get("_source") or {}
                if fill_took:
                    es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took)
            if self._has_style_intent(parsed_query):
                style_intent_decisions = self._apply_style_intent_to_hits(
                    es_response.get("hits", {}).get("hits") or [],
                    parsed_query,
                    context=context,
                )
                if style_intent_decisions:
                    context.logger.info(
                        "款式意图 SKU 预筛选完成 | hits=%s",
                        len(style_intent_decisions),
                        extra={'reqid': context.reqid, 'uid': context.uid}
                    )

            def _run_fine_stage(stage_input: List[Dict[str, Any]]):
                # Fine ranking mutates scores in place; hit order is applied by
                # the caller, so the original stage_input is returned.
                fine_scores, fine_meta, fine_debug_rows = run_lightweight_rerank(
                    query=rerank_query,
                    es_hits=stage_input,
                    language=language,
                    timeout_sec=fine_cfg.timeout_sec,
                    rerank_query_template=fine_query_template,
                    rerank_doc_template=fine_doc_template,
                    top_n=fine_output_window,
                    debug=debug,
                    fusion=rc.fusion,
                    style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost,
                    service_profile=fine_cfg.service_profile,
                )
                return stage_input, fine_scores is not None, fine_meta, fine_debug_rows

            hits, fine_ranks_by_doc, fine_debug_info = _run_optional_stage(
                stage=RequestContextStage.FINE_RANKING,
                stage_label="精排",
                enabled=fine_enabled,
                stage_hits=es_response.get("hits", {}).get("hits") or [],
                input_limit=fine_input_window,
                output_limit=fine_output_window,
                service_profile=fine_cfg.service_profile,
                query_template=fine_query_template,
                doc_template=fine_doc_template,
                top_n=fine_output_window,
                debug_key="fine_rank_scores",
                runner=_run_fine_stage,
            )
            es_response["hits"]["hits"] = hits

            def _run_rerank_stage(stage_input: List[Dict[str, Any]]):
                # run_rerank consumes/returns the full ES response, so rebind it.
                nonlocal es_response
                es_response["hits"]["hits"] = stage_input
                es_response, rerank_meta, fused_debug = run_rerank(
                    query=rerank_query,
                    es_response=es_response,
                    language=language,
                    timeout_sec=rc.timeout_sec,
                    weight_es=rc.weight_es,
                    weight_ai=rc.weight_ai,
                    rerank_query_template=effective_query_template,
                    rerank_doc_template=effective_doc_template,
                    top_n=(from_ + size),
                    debug=debug,
                    fusion=rc.fusion,
                    service_profile=rc.service_profile,
                    style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost,
                )
                return (
                    es_response.get("hits", {}).get("hits") or [],
                    rerank_meta is not None,
                    rerank_meta,
                    fused_debug,
                )

            hits, rerank_ranks_by_doc, rerank_debug_info = _run_optional_stage(
                stage=RequestContextStage.RERANKING,
                stage_label="重排",
                enabled=do_rerank,
                stage_hits=es_response.get("hits", {}).get("hits") or [],
                input_limit=rerank_window,
                output_limit=rerank_window,
                service_profile=rc.service_profile,
                query_template=effective_query_template,
                doc_template=effective_doc_template,
                top_n=from_ + size,
                debug_key="rerank_scores",
                runner=_run_rerank_stage,
            )
            es_response["hits"]["hits"] = hits
        # Inside the rank window: multi-stage ranking produced the top
        # rerank_window hits; slice by the requested from/size for pagination.
        if in_rank_window:
            hits = es_response.get("hits", {}).get("hits") or []
            sliced = hits[from_ : from_ + size]
            es_response.setdefault("hits", {})["hits"] = sliced
            if sliced:
                # Recompute max_score from the best available fused/stage score.
                slice_max = max(
                    (
                        h.get("_fused_score", h.get("_fine_score", h.get("_coarse_score", h.get("_score", 0.0))))
                        for h in sliced
                    ),
                    default=0.0,
                )
                try:
                    es_response["hits"]["max_score"] = float(slice_max)
                except (TypeError, ValueError):
                    es_response["hits"]["max_score"] = 0.0
            else:
                es_response["hits"]["max_score"] = 0.0
            if sliced:
                if response_source_spec is False:
                    # Caller asked for no _source at all; strip the prefetched ones.
                    for hit in sliced:
                        hit.pop("_source", None)
                    context.logger.info(
                        "分页详情回填跳过 | 原查询 _source=false",
                        extra={'reqid': context.reqid, 'uid': context.uid}
                    )
                else:
                    # Fetch the full requested _source only for the final page.
                    context.start_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL)
                    try:
                        page_ids = [str(h.get("_id")) for h in sliced if h.get("_id") is not None]
                        details_by_id, fill_took = self._fetch_hits_by_ids(
                            index_name=index_name,
                            doc_ids=page_ids,
                            source_spec=response_source_spec,
                        )
                        filled = 0
                        for hit in sliced:
                            hid = hit.get("_id")
                            if hid is None:
                                continue
                            detail_hit = details_by_id.get(str(hid))
                            if detail_hit is None:
                                continue
                            if "_source" in detail_hit:
                                hit["_source"] = detail_hit.get("_source") or {}
                                filled += 1
                        if style_intent_decisions:
                            self.style_sku_selector.apply_precomputed_decisions(
                                sliced,
                                style_intent_decisions,
                            )
                        if fill_took:
                            es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took)
                        context.logger.info(
                            f"分页详情回填 | ids={len(page_ids)} | filled={filled} | took={fill_took}ms",
                            extra={'reqid': context.reqid, 'uid': context.uid}
                        )
                    finally:
context.end_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) context.logger.info( f"排序窗口分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", extra={'reqid': context.reqid, 'uid': context.uid} ) # 非重排窗口:款式意图在 result_processing 之前执行,便于单独计时且与 ES 召回阶段衔接 if self._has_style_intent(parsed_query) and not in_rank_window: es_hits_pre = es_response.get("hits", {}).get("hits") or [] style_intent_decisions = self._apply_style_intent_to_hits( es_hits_pre, parsed_query, context=context, ) # Step 5: Result processing context.start_stage(RequestContextStage.RESULT_PROCESSING) try: # Extract ES hits es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total # max_score 会在启用 AI 搜索时被更新为融合分数的最大值 max_score = es_response.get('hits', {}).get('max_score') or 0.0 # 从上下文中取出重排调试信息(若有) rerank_debug_by_doc = _index_debug_rows_by_doc(context.get_intermediate_result('rerank_scores', None)) coarse_debug_by_doc = _index_debug_rows_by_doc(context.get_intermediate_result('coarse_rank_scores', None)) fine_debug_by_doc = _index_debug_rows_by_doc(context.get_intermediate_result('fine_rank_scores', None)) if self._has_style_intent(parsed_query): if style_intent_decisions: self.style_sku_selector.apply_precomputed_decisions( es_hits, style_intent_decisions, ) # Format results using ResultFormatter formatted_results = ResultFormatter.format_search_results( es_hits, max_score, language=language, sku_filter_dimension=sku_filter_dimension ) # Build per-result debug info (per SPU) when debug mode is enabled per_result_debug = [] if debug and es_hits and formatted_results: final_ranks_by_doc = { str(hit.get("_id")): from_ + rank for rank, hit in enumerate(es_hits, 1) if hit.get("_id") is not None } for hit, spu in zip(es_hits, formatted_results): source = hit.get("_source", {}) or {} 
doc_id = hit.get("_id") rerank_debug = None if doc_id is not None: rerank_debug = rerank_debug_by_doc.get(str(doc_id)) coarse_debug = None if doc_id is not None: coarse_debug = coarse_debug_by_doc.get(str(doc_id)) fine_debug = None if doc_id is not None: fine_debug = fine_debug_by_doc.get(str(doc_id)) style_intent_debug = None if doc_id is not None and style_intent_decisions: decision = style_intent_decisions.get(str(doc_id)) if decision is not None: style_intent_debug = decision.to_dict() raw_score = hit.get("_raw_es_score", hit.get("_original_score", hit.get("_score"))) try: es_score = float(raw_score) if raw_score is not None else 0.0 except (TypeError, ValueError): es_score = 0.0 try: normalized = ( float(es_score) / float(es_score_normalization_factor) if es_score_normalization_factor else None ) except (TypeError, ValueError, ZeroDivisionError): normalized = None title_multilingual = source.get("title") if isinstance(source.get("title"), dict) else None brief_multilingual = source.get("brief") if isinstance(source.get("brief"), dict) else None vendor_multilingual = source.get("vendor") if isinstance(source.get("vendor"), dict) else None debug_entry: Dict[str, Any] = { "spu_id": spu.spu_id, "es_score": es_score, "es_score_normalized": normalized, "initial_rank": initial_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None, "final_rank": final_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None, "title_multilingual": title_multilingual, "brief_multilingual": brief_multilingual, "vendor_multilingual": vendor_multilingual, } initial_rank = initial_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None coarse_rank = coarse_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None fine_rank = fine_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None rerank_rank = rerank_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None final_rank = final_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None 
rerank_previous_rank = fine_rank if fine_rank is not None else coarse_rank final_previous_rank = rerank_rank if final_previous_rank is None: final_previous_rank = fine_rank if final_previous_rank is None: final_previous_rank = coarse_rank if final_previous_rank is None: final_previous_rank = initial_rank debug_entry["ranking_funnel"] = { "es_recall": _build_result_stage( rank=initial_rank, previous_rank=None, values={ "score": es_score, "normalized_score": normalized, "matched_queries": hit.get("matched_queries"), }, ), "coarse_rank": _build_result_stage( rank=coarse_rank, previous_rank=initial_rank, values={ "score": coarse_debug.get("coarse_score") if coarse_debug else None, "es_score": coarse_debug.get("es_score") if coarse_debug else es_score, "text_score": coarse_debug.get("text_score") if coarse_debug else None, "knn_score": coarse_debug.get("knn_score") if coarse_debug else None, }, signals=coarse_debug, signal_fields={ "es_factor": "coarse_es_factor", "text_factor": "coarse_text_factor", "knn_factor": "coarse_knn_factor", "text_knn_factor": "coarse_text_knn_factor", "image_knn_factor": "coarse_image_knn_factor", }, ), "fine_rank": _build_result_stage( rank=fine_rank, previous_rank=coarse_rank, values={ "score": ( fine_debug.get("score") if fine_debug and fine_debug.get("score") is not None else hit.get("_fine_fused_score", hit.get("_fine_score")) ), "fine_score": fine_debug.get("fine_score") if fine_debug else hit.get("_fine_score"), "es_score": fine_debug.get("es_score") if fine_debug else es_score, "text_score": fine_debug.get("text_score") if fine_debug else hit.get("_text_score"), "knn_score": fine_debug.get("knn_score") if fine_debug else hit.get("_knn_score"), "rerank_input": fine_debug.get("rerank_input") if fine_debug else None, }, signals=fine_debug, signal_fields={ "es_factor": "es_factor", }, ), "rerank": _build_result_stage( rank=rerank_rank, previous_rank=rerank_previous_rank, values={ "score": rerank_debug.get("score") if rerank_debug else 
hit.get("_fused_score"), "es_score": rerank_debug.get("es_score") if rerank_debug else es_score, "rerank_score": rerank_debug.get("rerank_score") if rerank_debug else hit.get("_rerank_score"), "fine_score": rerank_debug.get("fine_score") if rerank_debug else hit.get("_fine_score"), "fused_score": rerank_debug.get("fused_score") if rerank_debug else hit.get("_fused_score"), "text_score": rerank_debug.get("text_score") if rerank_debug else hit.get("_text_score"), "knn_score": rerank_debug.get("knn_score") if rerank_debug else hit.get("_knn_score"), }, signals=rerank_debug, signal_fields={ "rerank_factor": "rerank_factor", "fine_factor": "fine_factor", "es_factor": "es_factor", "text_factor": "text_factor", "knn_factor": "knn_factor", }, ), "final_page": _build_result_stage( rank=final_rank, previous_rank=final_previous_rank, ), } if style_intent_debug: debug_entry["style_intent_sku"] = style_intent_debug per_result_debug.append(debug_entry) # Format facets standardized_facets = None if facets: standardized_facets = ResultFormatter.format_facets( es_response.get('aggregations', {}), facets, filters ) # Generate suggestions and related searches query_text = parsed_query.original_query if parsed_query else query suggestions = ResultFormatter.generate_suggestions(query_text, formatted_results) related_searches = ResultFormatter.generate_related_searches(query_text, formatted_results) context.logger.info( f"结果处理完成 | 返回: {len(formatted_results)}条 | 总计: {total_value}条", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"结果处理失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.RESULT_PROCESSING) # End total timing and build result total_duration = context.end_stage(RequestContextStage.TOTAL) context.performance_metrics.total_duration = total_duration # Collect debug information if requested debug_info = None if debug: query_tokens = 
parsed_query.query_tokens if parsed_query else [] token_count = len(query_tokens) text_knn_is_long = token_count >= 5 text_knn_k = self.query_builder.knn_text_k_long if text_knn_is_long else self.query_builder.knn_text_k text_knn_num_candidates = ( self.query_builder.knn_text_num_candidates_long if text_knn_is_long else self.query_builder.knn_text_num_candidates ) ltr_summary = _summarize_ltr_features(per_result_debug) debug_info = { "query_analysis": { "original_query": context.query_analysis.original_query, "query_normalized": context.query_analysis.query_normalized, "rewritten_query": context.query_analysis.rewritten_query, "detected_language": context.query_analysis.detected_language, "index_languages": index_langs, "translations": context.query_analysis.translations, "keywords_queries": context.query_analysis.keywords_queries, "has_vector": context.query_analysis.query_vector is not None, "has_image_vector": parsed_query.image_query_vector is not None, "query_tokens": query_tokens, "intent_detection": context.get_intermediate_result("style_intent_profile"), }, "retrieval_plan": { "text_knn": { "enabled": bool(enable_embedding and parsed_query and parsed_query.query_vector is not None), "is_long_query_plan": text_knn_is_long, "token_count": token_count, "k": text_knn_k, "num_candidates": text_knn_num_candidates, "boost": ( self.query_builder.knn_text_boost * 1.4 if text_knn_is_long else self.query_builder.knn_text_boost ), }, "image_knn": { "enabled": bool( self.image_embedding_field and enable_embedding and parsed_query and image_query_vector is not None ), "k": self.query_builder.knn_image_k, "num_candidates": self.query_builder.knn_image_num_candidates, "boost": self.query_builder.knn_image_boost, }, }, "es_query": context.get_intermediate_result('es_query', {}), "es_query_context": { "es_fetch_from": es_fetch_from, "es_fetch_size": es_fetch_size, "in_rank_window": in_rank_window, "include_named_queries_score": bool(in_rank_window), 
"exact_knn_rescore_enabled": bool(rc.exact_knn_rescore_enabled and in_rank_window), "exact_knn_rescore_window": ( self._resolve_exact_knn_rescore_window() if rc.exact_knn_rescore_enabled and in_rank_window else None ), }, "es_response": { "took_ms": es_response.get('took', 0), "total_hits": total_value, "max_score": max_score, "shards": es_response.get('_shards', {}), "es_score_normalization_factor": es_score_normalization_factor, }, "coarse_rank": coarse_debug_info, "fine_rank": fine_debug_info, "rerank": rerank_debug_info, "ranking_funnel": { "es_recall": { "docs_out": es_fetch_size, "score_normalization_factor": es_score_normalization_factor, }, "coarse_rank": coarse_debug_info, "fine_rank": fine_debug_info, "rerank": rerank_debug_info, }, "feature_flags": context.metadata.get('feature_flags', {}), "stage_timings": { k: round(v, 2) for k, v in context.performance_metrics.stage_timings.items() }, "stage_time_bounds_ms": { stage: { kk: round(vv, 3) for kk, vv in bounds.items() } for stage, bounds in context.performance_metrics.stage_time_bounds_ms.items() }, "search_params": context.metadata.get('search_params', {}) } if per_result_debug: debug_info["per_result"] = per_result_debug debug_info["ltr_summary"] = ltr_summary _log_backend_verbose({ "event": "search_debug_ltr_summary", "reqid": context.reqid, "uid": context.uid, "tenant_id": tenant_id, "query": query, "language": language, "top_n": ltr_summary.get("top_n"), "counts": ltr_summary.get("counts"), "averages": ltr_summary.get("averages"), "top_docs": ltr_summary.get("top_docs"), "query_analysis": { "rewritten_query": context.query_analysis.rewritten_query, "detected_language": context.query_analysis.detected_language, "translations": context.query_analysis.translations, "query_tokens": query_tokens, }, "retrieval_plan": debug_info["retrieval_plan"], "ranking_windows": { "es_fetch_size": es_fetch_size, "coarse_output_window": coarse_output_window if in_rank_window else None, "fine_input_window": 
fine_input_window if in_rank_window else None, "fine_output_window": fine_output_window if in_rank_window else None, "rerank_window": rerank_window if in_rank_window else None, "page_from": from_, "page_size": size, }, }) # Build result result = SearchResult( results=formatted_results, total=total_value, max_score=max_score, took_ms=int(total_duration), facets=standardized_facets, query_info=parsed_query.to_dict(), suggestions=suggestions, related_searches=related_searches, debug_info=debug_info ) # Log complete performance summary context.log_performance_summary() return result def search_by_image( self, image_url: str, tenant_id: str, size: int = 10, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None ) -> SearchResult: """ Search by image similarity (外部友好格式). Args: image_url: URL of query image tenant_id: Tenant ID (required for filtering) size: Number of results filters: Exact match filters range_filters: Range filters for numeric fields Returns: SearchResult object with formatted results """ if not self.image_embedding_field: raise ValueError("Image embedding field not configured") # Generate image embedding if self.image_encoder is None: raise RuntimeError("Image encoder is not initialized at startup") image_vector = self.image_encoder.encode_image_from_url(image_url, priority=1) if image_vector is None: raise ValueError(f"Failed to encode image: {image_url}") # Generate tenant-specific index name index_name = get_tenant_index_name(tenant_id) # No longer need to add tenant_id to filters since each tenant has its own index # Build KNN query es_query = { "size": size, "knn": { "field": self.image_embedding_field, "query_vector": image_vector.tolist(), "k": size, "num_candidates": size * 10 } } # Apply source filtering semantics (None / [] / list) self._apply_source_filter(es_query) if filters or range_filters: filter_clauses = self.query_builder._build_filters(filters, range_filters) if filter_clauses: if len(filter_clauses) 
== 1: es_query["knn"]["filter"] = filter_clauses[0] else: es_query["knn"]["filter"] = { "bool": { "filter": filter_clauses } } # Execute search es_response = self.es_client.search( index_name=index_name, body=es_query, size=size ) # Extract ES hits es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total max_score = es_response.get('hits', {}).get('max_score') or 0.0 # Format results using ResultFormatter formatted_results = ResultFormatter.format_search_results( es_hits, max_score, language="en", # Default language for image search sku_filter_dimension=None # Image search doesn't support SKU filtering ) return SearchResult( results=formatted_results, total=total_value, max_score=max_score, took_ms=es_response.get('took', 0), facets=None, query_info={'image_url': image_url, 'search_type': 'image_similarity'}, suggestions=[], related_searches=[] ) def get_domain_summary(self) -> Dict[str, Any]: """ Get summary of dynamic text retrieval configuration. Returns: Dictionary with language-aware field information """ return { "mode": "dynamic_language_fields", "multilingual_fields": self.config.query_config.multilingual_fields, "shared_fields": self.config.query_config.shared_fields, "core_multilingual_fields": self.config.query_config.core_multilingual_fields, "field_boosts": self.config.field_boosts, } def get_document(self, tenant_id: str, doc_id: str) -> Optional[Dict[str, Any]]: """ Get single document by ID. 
Args: tenant_id: Tenant ID (required to determine which index to query) doc_id: Document ID Returns: Document or None if not found """ try: index_name = get_tenant_index_name(tenant_id) response = self.es_client.client.get( index=index_name, id=doc_id ) return response.get('_source') except Exception as e: logger.error(f"Failed to get document {doc_id} from tenant {tenant_id}: {e}", exc_info=True) return None