""" Main Searcher module - executes search queries against Elasticsearch. Handles query parsing, ranking, and result formatting. """ from typing import Dict, Any, List, Optional import json import logging import hashlib from string import Formatter from utils.es_client import ESClient from query import QueryParser, ParsedQuery from query.style_intent import StyleIntentRegistry from embeddings.image_encoder import CLIPImageEncoder from .es_query_builder import ESQueryBuilder from .sku_intent_selector import SkuSelectionDecision, StyleSkuSelector from config import SearchConfig from config.tenant_config_loader import get_tenant_config_loader from context.request_context import RequestContext, RequestContextStage from api.models import FacetResult, FacetConfig from api.result_formatter import ResultFormatter from indexer.mapping_generator import get_tenant_index_name logger = logging.getLogger(__name__) backend_verbose_logger = logging.getLogger("backend.verbose") def _log_backend_verbose(payload: Dict[str, Any]) -> None: if not backend_verbose_logger.handlers: return backend_verbose_logger.info( json.dumps(payload, ensure_ascii=False, separators=(",", ":")) ) class SearchResult: """Container for search results (外部友好格式).""" def __init__( self, results: List[Any], # List[SpuResult] total: int, max_score: float, took_ms: int, facets: Optional[List[FacetResult]] = None, query_info: Optional[Dict[str, Any]] = None, suggestions: Optional[List[str]] = None, related_searches: Optional[List[str]] = None, debug_info: Optional[Dict[str, Any]] = None ): self.results = results self.total = total self.max_score = max_score self.took_ms = took_ms self.facets = facets self.query_info = query_info or {} self.suggestions = suggestions or [] self.related_searches = related_searches or [] self.debug_info = debug_info def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation.""" result = { "results": [r.model_dump() if hasattr(r, 'model_dump') else r for r in self.results], "total": self.total, "max_score": self.max_score, "took_ms": self.took_ms, "facets": [f.model_dump() for f in self.facets] if self.facets else None, "query_info": self.query_info, "suggestions": self.suggestions, "related_searches": self.related_searches } if self.debug_info is not None: result["debug_info"] = self.debug_info return result class Searcher: """ Main search engine class. Handles: - Query parsing and translation - Dynamic multi-language text recall planning - ES query building - Result ranking and formatting """ def __init__( self, es_client: ESClient, config: SearchConfig, query_parser: Optional[QueryParser] = None, image_encoder: Optional[CLIPImageEncoder] = None, ): """ Initialize searcher. Args: es_client: Elasticsearch client config: SearchConfig instance query_parser: Query parser (created if not provided) image_encoder: Optional pre-initialized image encoder """ self.es_client = es_client self.config = config # Index name is now generated dynamically per tenant, no longer stored here self.query_parser = query_parser or QueryParser(config) self.text_embedding_field = config.query_config.text_embedding_field or "title_embedding" self.image_embedding_field = config.query_config.image_embedding_field if self.image_embedding_field and image_encoder is None: self.image_encoder = CLIPImageEncoder() else: self.image_encoder = image_encoder self.source_fields = config.query_config.source_fields self.style_intent_registry = StyleIntentRegistry.from_query_config(self.config.query_config) self.style_sku_selector = StyleSkuSelector( self.style_intent_registry, text_encoder_getter=lambda: getattr(self.query_parser, "text_encoder", None), ) # Query builder - simplified single-layer architecture self.query_builder = ESQueryBuilder( match_fields=[], field_boosts=self.config.field_boosts, multilingual_fields=self.config.query_config.multilingual_fields, shared_fields=self.config.query_config.shared_fields, core_multilingual_fields=self.config.query_config.core_multilingual_fields, text_embedding_field=self.text_embedding_field, image_embedding_field=self.image_embedding_field, source_fields=self.source_fields, function_score_config=self.config.function_score, default_language=self.config.query_config.default_language, knn_boost=self.config.query_config.knn_boost, base_minimum_should_match=self.config.query_config.base_minimum_should_match, translation_minimum_should_match=self.config.query_config.translation_minimum_should_match, translation_boost=self.config.query_config.translation_boost, tie_breaker_base_query=self.config.query_config.tie_breaker_base_query, best_fields_boosts=self.config.query_config.best_fields, best_fields_clause_boost=self.config.query_config.best_fields_boost, phrase_field_boosts=self.config.query_config.phrase_fields, phrase_match_boost=self.config.query_config.phrase_match_boost, ) def _apply_source_filter(self, es_query: Dict[str, Any]) -> None: """ Apply tri-state _source semantics: - None: do not set _source (return full source) - []: _source=false (return no source fields) - [..]: _source.includes=[..] """ if self.source_fields is None: return if not isinstance(self.source_fields, list): raise ValueError("query_config.source_fields must be null or list[str]") if len(self.source_fields) == 0: es_query["_source"] = False return es_query["_source"] = {"includes": self.source_fields} def _resolve_rerank_source_filter( self, doc_template: str, parsed_query: Optional[ParsedQuery] = None, ) -> Dict[str, Any]: """ Build a lightweight _source filter for rerank prefetch. Only fetch fields required by rerank doc template to reduce ES payload. """ field_map = { "title": "title", "brief": "brief", "vendor": "vendor", "description": "description", "category_path": "category_path", } includes: set[str] = set() template = str(doc_template or "{title}") for _, field_name, _, _ in Formatter().parse(template): if not field_name: continue key = field_name.split(".", 1)[0].split("!", 1)[0].split(":", 1)[0] mapped = field_map.get(key) if mapped: includes.add(mapped) # Fallback to title-only to keep rerank docs usable. if not includes: includes.add("title") if self._has_style_intent(parsed_query): includes.update( { "skus", "option1_name", "option2_name", "option3_name", } ) return {"includes": sorted(includes)} def _fetch_hits_by_ids( self, index_name: str, doc_ids: List[str], source_spec: Optional[Any], ) -> tuple[Dict[str, Dict[str, Any]], int]: """ Fetch page documents by IDs for final response fill. Returns: (hits_by_id, es_took_ms) """ if not doc_ids: return {}, 0 body: Dict[str, Any] = { "query": { "ids": { "values": doc_ids, } } } if source_spec is not None: body["_source"] = source_spec resp = self.es_client.search( index_name=index_name, body=body, size=len(doc_ids), from_=0, ) hits = resp.get("hits", {}).get("hits") or [] hits_by_id: Dict[str, Dict[str, Any]] = {} for hit in hits: hid = hit.get("_id") if hid is None: continue hits_by_id[str(hid)] = hit return hits_by_id, int(resp.get("took", 0) or 0) @staticmethod def _has_style_intent(parsed_query: Optional[ParsedQuery]) -> bool: profile = getattr(parsed_query, "style_intent_profile", None) return bool(getattr(profile, "is_active", False)) def _apply_style_intent_to_hits( self, es_hits: List[Dict[str, Any]], parsed_query: ParsedQuery, context: Optional[RequestContext] = None, ) -> Dict[str, SkuSelectionDecision]: if context is not None: context.start_stage(RequestContextStage.STYLE_SKU_PREPARE_HITS) try: return self.style_sku_selector.prepare_hits(es_hits, parsed_query) finally: if context is not None: context.end_stage(RequestContextStage.STYLE_SKU_PREPARE_HITS) def search( self, query: str, tenant_id: str, size: int = 10, from_: int = 0, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None, facets: Optional[List[FacetConfig]] = None, min_score: Optional[float] = None, context: Optional[RequestContext] = None, sort_by: Optional[str] = None, sort_order: Optional[str] = "desc", debug: bool = False, language: str = "en", sku_filter_dimension: Optional[List[str]] = None, enable_rerank: Optional[bool] = None, rerank_query_template: Optional[str] = None, rerank_doc_template: Optional[str] = None, ) -> SearchResult: """ Execute search query (外部友好格式). Args: query: Search query string tenant_id: Tenant ID (required for filtering) size: Number of results to return from_: Offset for pagination filters: Exact match filters range_filters: Range filters for numeric fields facets: Facet configurations for faceted search min_score: Minimum score threshold context: Request context for tracking (required) sort_by: Field name for sorting sort_order: Sort order: 'asc' or 'desc' debug: Enable debug information output language: Response / field selection language hint (e.g. zh, en) sku_filter_dimension: SKU grouping dimensions for per-SPU variant pick enable_rerank: If None, use ``config.rerank.enabled``; if set, overrides whether the rerank provider is invoked (subject to rerank window). rerank_query_template: Override for rerank query text template; None uses ``config.rerank.rerank_query_template`` (e.g. ``"{query}"``). rerank_doc_template: Override for per-hit document text passed to rerank; None uses ``config.rerank.rerank_doc_template``. Placeholders are resolved in ``search/rerank_client.py``. Returns: SearchResult object with formatted results """ if context is None: raise ValueError("context is required") # 根据租户配置决定翻译开关(离线/在线统一) tenant_loader = get_tenant_config_loader() tenant_cfg = tenant_loader.get_tenant_config(tenant_id) index_langs = tenant_cfg.get("index_languages") or [] enable_translation = len(index_langs) > 0 enable_embedding = self.config.query_config.enable_text_embedding rc = self.config.rerank effective_query_template = rerank_query_template or rc.rerank_query_template effective_doc_template = rerank_doc_template or rc.rerank_doc_template # 重排开关优先级:请求参数显式传值 > 服务端配置(默认开启) rerank_enabled_by_config = bool(rc.enabled) do_rerank = rerank_enabled_by_config if enable_rerank is None else bool(enable_rerank) rerank_window = rc.rerank_window # 若开启重排且请求范围在窗口内:从 ES 取前 rerank_window 条、重排后再按 from/size 分页;否则不重排,按原 from/size 查 ES in_rerank_window = do_rerank and (from_ + size) <= rerank_window es_fetch_from = 0 if in_rerank_window else from_ es_fetch_size = rerank_window if in_rerank_window else size es_score_normalization_factor: Optional[float] = None initial_ranks_by_doc: Dict[str, int] = {} rerank_debug_info: Optional[Dict[str, Any]] = None # Start timing context.start_stage(RequestContextStage.TOTAL) context.logger.info( f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, " f"enable_rerank(request)={enable_rerank}, enable_rerank(config)={rerank_enabled_by_config}, " f"enable_rerank(effective)={do_rerank}, in_rerank_window={in_rerank_window}, " f"es_fetch=({es_fetch_from},{es_fetch_size}) | " f"index_languages={index_langs} | " f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}", extra={'reqid': context.reqid, 'uid': context.uid} ) # Store search parameters in context context.metadata['search_params'] = { 'size': size, 'from_': from_, 'es_fetch_from': es_fetch_from, 'es_fetch_size': es_fetch_size, 'in_rerank_window': in_rerank_window, 'rerank_enabled_by_config': rerank_enabled_by_config, 'enable_rerank_request': enable_rerank, 'rerank_query_template': effective_query_template, 'rerank_doc_template': effective_doc_template, 'filters': filters, 'range_filters': range_filters, 'facets': facets, 'enable_translation': enable_translation, 'enable_embedding': enable_embedding, 'enable_rerank': do_rerank, 'min_score': min_score, 'sort_by': sort_by, 'sort_order': sort_order } context.metadata['feature_flags'] = { 'translation_enabled': enable_translation, 'embedding_enabled': enable_embedding, 'rerank_enabled': do_rerank, 'style_intent_enabled': bool(self.style_intent_registry.enabled), } # Step 1: Parse query context.start_stage(RequestContextStage.QUERY_PARSING) try: parsed_query = self.query_parser.parse( query, generate_vector=enable_embedding, tenant_id=tenant_id, context=context, target_languages=index_langs if enable_translation else [], ) # Store query analysis results in context context.store_query_analysis( original_query=parsed_query.original_query, query_normalized=parsed_query.query_normalized, rewritten_query=parsed_query.rewritten_query, detected_language=parsed_query.detected_language, translations=parsed_query.translations, query_vector=parsed_query.query_vector.tolist() if parsed_query.query_vector is not None else None, ) context.metadata["feature_flags"]["style_intent_active"] = self._has_style_intent(parsed_query) context.logger.info( f"查询解析完成 | 原查询: '{parsed_query.original_query}' | " f"重写后: '{parsed_query.rewritten_query}' | " f"语言: {parsed_query.detected_language} | " f"向量: {'是' if parsed_query.query_vector is not None else '否'}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"查询解析失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.QUERY_PARSING) # Step 2: Query building context.start_stage(RequestContextStage.QUERY_BUILDING) try: # Generate tenant-specific index name index_name = get_tenant_index_name(tenant_id) # index_name = "search_products" # No longer need to add tenant_id to filters since each tenant has its own index es_query = self.query_builder.build_query( query_text=parsed_query.rewritten_query or parsed_query.query_normalized, query_vector=parsed_query.query_vector if enable_embedding else None, filters=filters, range_filters=range_filters, facet_configs=facets, size=es_fetch_size, from_=es_fetch_from, enable_knn=enable_embedding and parsed_query.query_vector is not None, min_score=min_score, parsed_query=parsed_query, ) # Add facets for faceted search if facets: facet_aggs = self.query_builder.build_facets(facets) if facet_aggs: if "aggs" not in es_query: es_query["aggs"] = {} es_query["aggs"].update(facet_aggs) # Add sorting if specified if sort_by: es_query = self.query_builder.add_sorting(es_query, sort_by, sort_order) es_query["track_scores"] = True # Keep requested response _source semantics for the final response fill. response_source_spec = es_query.get("_source") # In rerank window, first pass only fetches minimal fields required by rerank template. es_query_for_fetch = es_query rerank_prefetch_source = None if in_rerank_window: rerank_prefetch_source = self._resolve_rerank_source_filter( effective_doc_template, parsed_query=parsed_query, ) es_query_for_fetch = dict(es_query) es_query_for_fetch["_source"] = rerank_prefetch_source # Extract size and from from body for ES client parameters body_for_es = {k: v for k, v in es_query_for_fetch.items() if k not in ['size', 'from']} # Store ES query in context context.store_intermediate_result('es_query', es_query) if in_rerank_window and rerank_prefetch_source is not None: context.store_intermediate_result('es_query_rerank_prefetch_source', rerank_prefetch_source) # Serialize ES query to compute a compact size + stable digest for correlation es_query_compact = json.dumps(es_query_for_fetch, ensure_ascii=False, separators=(",", ":")) es_query_digest = hashlib.sha256(es_query_compact.encode("utf-8")).hexdigest()[:16] knn_enabled = bool(enable_embedding and parsed_query.query_vector is not None) vector_dims = int(len(parsed_query.query_vector)) if parsed_query.query_vector is not None else 0 context.logger.info( "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | facets: %s | rerank_prefetch_source: %s", len(es_query_compact), es_query_digest, "yes" if knn_enabled else "no", vector_dims, "yes" if facets else "no", rerank_prefetch_source, extra={'reqid': context.reqid, 'uid': context.uid} ) _log_backend_verbose({ "event": "es_query_built", "reqid": context.reqid, "uid": context.uid, "tenant_id": tenant_id, "size_chars": len(es_query_compact), "sha256_16": es_query_digest, "knn_enabled": knn_enabled, "vector_dims": vector_dims, "has_facets": bool(facets), "query": es_query_for_fetch, }) except Exception as e: context.set_error(e) context.logger.error( f"ES查询构建失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.QUERY_BUILDING) # Step 4: Elasticsearch search (primary recall) context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY) try: # Use tenant-specific index name(开启重排且在窗口内时已用 es_fetch_size/es_fetch_from) es_response = self.es_client.search( index_name=index_name, body=body_for_es, size=es_fetch_size, from_=es_fetch_from, include_named_queries_score=bool(do_rerank and in_rerank_window), ) # Store ES response in context context.store_intermediate_result('es_response', es_response) if debug: initial_hits = es_response.get("hits", {}).get("hits") or [] for rank, hit in enumerate(initial_hits, 1): doc_id = hit.get("_id") if doc_id is not None: initial_ranks_by_doc[str(doc_id)] = rank raw_initial_max_score = es_response.get("hits", {}).get("max_score") try: es_score_normalization_factor = float(raw_initial_max_score) if raw_initial_max_score is not None else None except (TypeError, ValueError): es_score_normalization_factor = None if es_score_normalization_factor is None and initial_hits: first_score = initial_hits[0].get("_score") try: es_score_normalization_factor = float(first_score) if first_score is not None else None except (TypeError, ValueError): es_score_normalization_factor = None # Extract timing from ES response es_took = es_response.get('took', 0) context.logger.info( f"ES搜索完成 | 耗时: {es_took}ms | " f"命中数: {es_response.get('hits', {}).get('total', {}).get('value', 0)} | " f"最高分: {(es_response.get('hits', {}).get('max_score') or 0):.3f}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"ES搜索执行失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY) style_intent_decisions: Dict[str, SkuSelectionDecision] = {} if self._has_style_intent(parsed_query) and in_rerank_window: style_intent_decisions = self._apply_style_intent_to_hits( es_response.get("hits", {}).get("hits") or [], parsed_query, context=context, ) if style_intent_decisions: context.logger.info( "款式意图 SKU 预筛选完成 | hits=%s", len(style_intent_decisions), extra={'reqid': context.reqid, 'uid': context.uid} ) # Optional Step 4.5: AI reranking(仅当请求范围在重排窗口内时执行) if do_rerank and in_rerank_window: context.start_stage(RequestContextStage.RERANKING) try: from .rerank_client import run_rerank rerank_query = parsed_query.text_for_rerank() if parsed_query else query es_response, rerank_meta, fused_debug = run_rerank( query=rerank_query, es_response=es_response, language=language, timeout_sec=rc.timeout_sec, weight_es=rc.weight_es, weight_ai=rc.weight_ai, rerank_query_template=effective_query_template, rerank_doc_template=effective_doc_template, top_n=(from_ + size), debug=debug, fusion=rc.fusion, style_intent_selected_sku_boost=self.config.query_config.style_intent_selected_sku_boost, ) if rerank_meta is not None: if debug: from dataclasses import asdict from config.services_config import get_rerank_service_url rerank_debug_info = { "service_url": get_rerank_service_url(), "query_template": effective_query_template, "doc_template": effective_doc_template, "query_text": str(effective_query_template).format_map({"query": rerank_query}), "docs": len(es_response.get("hits", {}).get("hits") or []), "top_n": from_ + size, "meta": rerank_meta, "fusion": asdict(rc.fusion), } context.store_intermediate_result("rerank_scores", fused_debug) context.logger.info( f"重排完成 | docs={len(es_response.get('hits', {}).get('hits') or [])} | " f"top_n={from_ + size} | meta={rerank_meta}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.add_warning(f"Rerank failed: {e}") context.logger.warning( f"调用重排服务失败 | error: {e}", extra={'reqid': context.reqid, 'uid': context.uid}, exc_info=True, ) finally: context.end_stage(RequestContextStage.RERANKING) # 当本次请求在重排窗口内时:已从 ES 取了 rerank_window 条并可能已重排,需按请求的 from/size 做分页切片 if in_rerank_window: hits = es_response.get("hits", {}).get("hits") or [] sliced = hits[from_ : from_ + size] es_response.setdefault("hits", {})["hits"] = sliced if sliced: # 对于启用重排的结果,优先使用 _fused_score 计算 max_score;否则退回原始 _score slice_max = max( (h.get("_fused_score", h.get("_score", 0.0)) for h in sliced), default=0.0, ) try: es_response["hits"]["max_score"] = float(slice_max) except (TypeError, ValueError): es_response["hits"]["max_score"] = 0.0 else: es_response["hits"]["max_score"] = 0.0 # Page fill: fetch detailed fields only for final page hits. if sliced: if response_source_spec is False: for hit in sliced: hit.pop("_source", None) context.logger.info( "分页详情回填跳过 | 原查询 _source=false", extra={'reqid': context.reqid, 'uid': context.uid} ) else: context.start_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) try: page_ids = [str(h.get("_id")) for h in sliced if h.get("_id") is not None] details_by_id, fill_took = self._fetch_hits_by_ids( index_name=index_name, doc_ids=page_ids, source_spec=response_source_spec, ) filled = 0 for hit in sliced: hid = hit.get("_id") if hid is None: continue detail_hit = details_by_id.get(str(hid)) if detail_hit is None: continue if "_source" in detail_hit: hit["_source"] = detail_hit.get("_source") or {} filled += 1 if style_intent_decisions: self.style_sku_selector.apply_precomputed_decisions( sliced, style_intent_decisions, ) if fill_took: es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took) context.logger.info( f"分页详情回填 | ids={len(page_ids)} | filled={filled} | took={fill_took}ms", extra={'reqid': context.reqid, 'uid': context.uid} ) finally: context.end_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) context.logger.info( f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", extra={'reqid': context.reqid, 'uid': context.uid} ) # 非重排窗口:款式意图在 result_processing 之前执行,便于单独计时且与 ES 召回阶段衔接 if self._has_style_intent(parsed_query) and not in_rerank_window: es_hits_pre = es_response.get("hits", {}).get("hits") or [] style_intent_decisions = self._apply_style_intent_to_hits( es_hits_pre, parsed_query, context=context, ) # Step 5: Result processing context.start_stage(RequestContextStage.RESULT_PROCESSING) try: # Extract ES hits es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total # max_score 会在启用 AI 搜索时被更新为融合分数的最大值 max_score = es_response.get('hits', {}).get('max_score') or 0.0 # 从上下文中取出重排调试信息(若有) rerank_debug_raw = context.get_intermediate_result('rerank_scores', None) rerank_debug_by_doc: Dict[str, Dict[str, Any]] = {} if isinstance(rerank_debug_raw, list): for item in rerank_debug_raw: if not isinstance(item, dict): continue doc_id = item.get("doc_id") if doc_id is None: continue rerank_debug_by_doc[str(doc_id)] = item if self._has_style_intent(parsed_query): if style_intent_decisions: self.style_sku_selector.apply_precomputed_decisions( es_hits, style_intent_decisions, ) # Format results using ResultFormatter formatted_results = ResultFormatter.format_search_results( es_hits, max_score, language=language, sku_filter_dimension=sku_filter_dimension ) # Build per-result debug info (per SPU) when debug mode is enabled per_result_debug = [] if debug and es_hits and formatted_results: final_ranks_by_doc = { str(hit.get("_id")): from_ + rank for rank, hit in enumerate(es_hits, 1) if hit.get("_id") is not None } for hit, spu in zip(es_hits, formatted_results): source = hit.get("_source", {}) or {} doc_id = hit.get("_id") rerank_debug = None if doc_id is not None: rerank_debug = rerank_debug_by_doc.get(str(doc_id)) style_intent_debug = None if doc_id is not None and style_intent_decisions: decision = style_intent_decisions.get(str(doc_id)) if decision is not None: style_intent_debug = decision.to_dict() raw_score = hit.get("_score") try: es_score = float(raw_score) if raw_score is not None else 0.0 except (TypeError, ValueError): es_score = 0.0 try: normalized = ( float(es_score) / float(es_score_normalization_factor) if es_score_normalization_factor else None ) except (TypeError, ValueError, ZeroDivisionError): normalized = None title_multilingual = source.get("title") if isinstance(source.get("title"), dict) else None brief_multilingual = source.get("brief") if isinstance(source.get("brief"), dict) else None vendor_multilingual = source.get("vendor") if isinstance(source.get("vendor"), dict) else None debug_entry: Dict[str, Any] = { "spu_id": spu.spu_id, "es_score": es_score, "es_score_normalized": normalized, "initial_rank": initial_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None, "final_rank": final_ranks_by_doc.get(str(doc_id)) if doc_id is not None else None, "title_multilingual": title_multilingual, "brief_multilingual": brief_multilingual, "vendor_multilingual": vendor_multilingual, } # 若存在重排调试信息,则补充 doc 级别的融合分数信息 if rerank_debug: debug_entry["doc_id"] = rerank_debug.get("doc_id") # 与 rerank_client 中字段保持一致,便于前端直接使用 debug_entry["rerank_score"] = rerank_debug.get("rerank_score") debug_entry["text_score"] = rerank_debug.get("text_score") debug_entry["text_source_score"] = rerank_debug.get("text_source_score") debug_entry["text_translation_score"] = rerank_debug.get("text_translation_score") debug_entry["text_weighted_source_score"] = rerank_debug.get("text_weighted_source_score") debug_entry["text_weighted_translation_score"] = rerank_debug.get("text_weighted_translation_score") debug_entry["text_primary_score"] = rerank_debug.get("text_primary_score") debug_entry["text_support_score"] = rerank_debug.get("text_support_score") debug_entry["text_score_fallback_to_es"] = rerank_debug.get("text_score_fallback_to_es") debug_entry["knn_score"] = rerank_debug.get("knn_score") debug_entry["rerank_factor"] = rerank_debug.get("rerank_factor") debug_entry["text_factor"] = rerank_debug.get("text_factor") debug_entry["knn_factor"] = rerank_debug.get("knn_factor") debug_entry["fused_score"] = rerank_debug.get("fused_score") debug_entry["rerank_input"] = rerank_debug.get("rerank_input") debug_entry["matched_queries"] = rerank_debug.get("matched_queries") if style_intent_debug: debug_entry["style_intent_sku"] = style_intent_debug per_result_debug.append(debug_entry) # Format facets standardized_facets = None if facets: standardized_facets = ResultFormatter.format_facets( es_response.get('aggregations', {}), facets, filters ) # Generate suggestions and related searches query_text = parsed_query.original_query if parsed_query else query suggestions = ResultFormatter.generate_suggestions(query_text, formatted_results) related_searches = ResultFormatter.generate_related_searches(query_text, formatted_results) context.logger.info( f"结果处理完成 | 返回: {len(formatted_results)}条 | 总计: {total_value}条", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"结果处理失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.RESULT_PROCESSING) # End total timing and build result total_duration = context.end_stage(RequestContextStage.TOTAL) context.performance_metrics.total_duration = total_duration # Collect debug information if requested debug_info = None if debug: debug_info = { "query_analysis": { "original_query": context.query_analysis.original_query, "query_normalized": context.query_analysis.query_normalized, "rewritten_query": context.query_analysis.rewritten_query, "detected_language": context.query_analysis.detected_language, "index_languages": index_langs, "translations": context.query_analysis.translations, "has_vector": context.query_analysis.query_vector is not None, "query_tokens": getattr(parsed_query, "query_tokens", []), "intent_detection": context.get_intermediate_result("style_intent_profile"), }, "es_query": context.get_intermediate_result('es_query', {}), "es_query_context": { "es_fetch_from": es_fetch_from, "es_fetch_size": es_fetch_size, "in_rerank_window": in_rerank_window, "rerank_prefetch_source": context.get_intermediate_result('es_query_rerank_prefetch_source'), "include_named_queries_score": bool(do_rerank and in_rerank_window), }, "es_response": { "took_ms": es_response.get('took', 0), "total_hits": total_value, "max_score": max_score, "shards": es_response.get('_shards', {}), "es_score_normalization_factor": es_score_normalization_factor, }, "rerank": rerank_debug_info, "feature_flags": context.metadata.get('feature_flags', {}), "stage_timings": { k: round(v, 2) for k, v in context.performance_metrics.stage_timings.items() }, "stage_time_bounds_ms": { stage: { kk: round(vv, 3) for kk, vv in bounds.items() } for stage, bounds in context.performance_metrics.stage_time_bounds_ms.items() }, "search_params": context.metadata.get('search_params', {}) } if per_result_debug: debug_info["per_result"] = per_result_debug # Build result result = SearchResult( results=formatted_results, total=total_value, max_score=max_score, took_ms=int(total_duration), facets=standardized_facets, query_info=parsed_query.to_dict(), suggestions=suggestions, related_searches=related_searches, debug_info=debug_info ) # Log complete performance summary context.log_performance_summary() return result def search_by_image( self, image_url: str, tenant_id: str, size: int = 10, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None ) -> SearchResult: """ Search by image similarity (外部友好格式). Args: image_url: URL of query image tenant_id: Tenant ID (required for filtering) size: Number of results filters: Exact match filters range_filters: Range filters for numeric fields Returns: SearchResult object with formatted results """ if not self.image_embedding_field: raise ValueError("Image embedding field not configured") # Generate image embedding if self.image_encoder is None: raise RuntimeError("Image encoder is not initialized at startup") image_vector = self.image_encoder.encode_image_from_url(image_url, priority=1) if image_vector is None: raise ValueError(f"Failed to encode image: {image_url}") # Generate tenant-specific index name index_name = get_tenant_index_name(tenant_id) # No longer need to add tenant_id to filters since each tenant has its own index # Build KNN query es_query = { "size": size, "knn": { "field": self.image_embedding_field, "query_vector": image_vector.tolist(), "k": size, "num_candidates": size * 10 } } # Apply source filtering semantics (None / [] / list) self._apply_source_filter(es_query) if filters or range_filters: filter_clauses = self.query_builder._build_filters(filters, range_filters) if filter_clauses: if len(filter_clauses) == 1: es_query["knn"]["filter"] = filter_clauses[0] else: es_query["knn"]["filter"] = { "bool": { "filter": filter_clauses } } # Execute search es_response = self.es_client.search( index_name=index_name, body=es_query, size=size ) # Extract ES hits es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total max_score = es_response.get('hits', {}).get('max_score') or 0.0 # Format results using ResultFormatter formatted_results = ResultFormatter.format_search_results( es_hits, max_score, language="en", # Default language for image search sku_filter_dimension=None # Image search doesn't support SKU filtering ) return SearchResult( results=formatted_results, total=total_value, max_score=max_score, took_ms=es_response.get('took', 0), facets=None, query_info={'image_url': image_url, 'search_type': 'image_similarity'}, suggestions=[], related_searches=[] ) def get_domain_summary(self) -> Dict[str, Any]: """ Get summary of dynamic text retrieval configuration. Returns: Dictionary with language-aware field information """ return { "mode": "dynamic_language_fields", "multilingual_fields": self.config.query_config.multilingual_fields, "shared_fields": self.config.query_config.shared_fields, "core_multilingual_fields": self.config.query_config.core_multilingual_fields, "field_boosts": self.config.field_boosts, } def get_document(self, tenant_id: str, doc_id: str) -> Optional[Dict[str, Any]]: """ Get single document by ID. Args: tenant_id: Tenant ID (required to determine which index to query) doc_id: Document ID Returns: Document or None if not found """ try: index_name = get_tenant_index_name(tenant_id) response = self.es_client.client.get( index=index_name, id=doc_id ) return response.get('_source') except Exception as e: logger.error(f"Failed to get document {doc_id} from tenant {tenant_id}: {e}", exc_info=True) return None