""" Main Searcher module - executes search queries against Elasticsearch. Handles query parsing, ranking, and result formatting. """ from typing import Dict, Any, List, Optional, Union, Tuple import os import time, json import logging import hashlib from string import Formatter import numpy as np from utils.es_client import ESClient from query import QueryParser, ParsedQuery from embeddings.image_encoder import CLIPImageEncoder from .es_query_builder import ESQueryBuilder from config import SearchConfig from config.tenant_config_loader import get_tenant_config_loader from context.request_context import RequestContext, RequestContextStage from api.models import FacetResult, FacetValue, FacetConfig from api.result_formatter import ResultFormatter from indexer.mapping_generator import get_tenant_index_name logger = logging.getLogger(__name__) backend_verbose_logger = logging.getLogger("backend.verbose") def _log_backend_verbose(payload: Dict[str, Any]) -> None: if not backend_verbose_logger.handlers: return backend_verbose_logger.info( json.dumps(payload, ensure_ascii=False, separators=(",", ":")) ) class SearchResult: """Container for search results (外部友好格式).""" def __init__( self, results: List[Any], # List[SpuResult] total: int, max_score: float, took_ms: int, facets: Optional[List[FacetResult]] = None, query_info: Optional[Dict[str, Any]] = None, suggestions: Optional[List[str]] = None, related_searches: Optional[List[str]] = None, debug_info: Optional[Dict[str, Any]] = None ): self.results = results self.total = total self.max_score = max_score self.took_ms = took_ms self.facets = facets self.query_info = query_info or {} self.suggestions = suggestions or [] self.related_searches = related_searches or [] self.debug_info = debug_info def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation.""" result = { "results": [r.model_dump() if hasattr(r, 'model_dump') else r for r in self.results], "total": self.total, "max_score": self.max_score, "took_ms": self.took_ms, "facets": [f.model_dump() for f in self.facets] if self.facets else None, "query_info": self.query_info, "suggestions": self.suggestions, "related_searches": self.related_searches } if self.debug_info is not None: result["debug_info"] = self.debug_info return result class Searcher: """ Main search engine class. Handles: - Query parsing and translation - Dynamic multi-language text recall planning - ES query building - Result ranking and formatting """ def __init__( self, es_client: ESClient, config: SearchConfig, query_parser: Optional[QueryParser] = None, image_encoder: Optional[CLIPImageEncoder] = None, ): """ Initialize searcher. Args: es_client: Elasticsearch client config: SearchConfig instance query_parser: Query parser (created if not provided) image_encoder: Optional pre-initialized image encoder """ self.es_client = es_client self.config = config # Index name is now generated dynamically per tenant, no longer stored here self.query_parser = query_parser or QueryParser(config) self.text_embedding_field = config.query_config.text_embedding_field or "title_embedding" self.image_embedding_field = config.query_config.image_embedding_field if self.image_embedding_field and image_encoder is None: self.image_encoder = CLIPImageEncoder() else: self.image_encoder = image_encoder self.source_fields = config.query_config.source_fields # Query builder - simplified single-layer architecture self.query_builder = ESQueryBuilder( match_fields=[], field_boosts=self.config.field_boosts, multilingual_fields=self.config.query_config.multilingual_fields, shared_fields=self.config.query_config.shared_fields, core_multilingual_fields=self.config.query_config.core_multilingual_fields, text_embedding_field=self.text_embedding_field, image_embedding_field=self.image_embedding_field, source_fields=self.source_fields, function_score_config=self.config.function_score, default_language=self.config.query_config.default_language, knn_boost=self.config.query_config.knn_boost, base_minimum_should_match=self.config.query_config.base_minimum_should_match, translation_minimum_should_match=self.config.query_config.translation_minimum_should_match, translation_boost=self.config.query_config.translation_boost, translation_boost_when_source_missing=self.config.query_config.translation_boost_when_source_missing, source_boost_when_missing=self.config.query_config.source_boost_when_missing, original_query_fallback_boost_when_translation_missing=( self.config.query_config.original_query_fallback_boost_when_translation_missing ), tie_breaker_base_query=self.config.query_config.tie_breaker_base_query, ) def _apply_source_filter(self, es_query: Dict[str, Any]) -> None: """ Apply tri-state _source semantics: - None: do not set _source (return full source) - []: _source=false (return no source fields) - [..]: _source.includes=[..] """ if self.source_fields is None: return if not isinstance(self.source_fields, list): raise ValueError("query_config.source_fields must be null or list[str]") if len(self.source_fields) == 0: es_query["_source"] = False return es_query["_source"] = {"includes": self.source_fields} def _resolve_rerank_source_filter(self, doc_template: str) -> Dict[str, Any]: """ Build a lightweight _source filter for rerank prefetch. Only fetch fields required by rerank doc template to reduce ES payload. """ field_map = { "title": "title", "brief": "brief", "vendor": "vendor", "description": "description", "category_path": "category_path", } includes: set[str] = set() template = str(doc_template or "{title}") for _, field_name, _, _ in Formatter().parse(template): if not field_name: continue key = field_name.split(".", 1)[0].split("!", 1)[0].split(":", 1)[0] mapped = field_map.get(key) if mapped: includes.add(mapped) # Fallback to title-only to keep rerank docs usable. if not includes: includes.add("title") return {"includes": sorted(includes)} def _fetch_hits_by_ids( self, index_name: str, doc_ids: List[str], source_spec: Optional[Any], ) -> tuple[Dict[str, Dict[str, Any]], int]: """ Fetch page documents by IDs for final response fill. Returns: (hits_by_id, es_took_ms) """ if not doc_ids: return {}, 0 body: Dict[str, Any] = { "query": { "ids": { "values": doc_ids, } } } if source_spec is not None: body["_source"] = source_spec resp = self.es_client.search( index_name=index_name, body=body, size=len(doc_ids), from_=0, ) hits = resp.get("hits", {}).get("hits") or [] hits_by_id: Dict[str, Dict[str, Any]] = {} for hit in hits: hid = hit.get("_id") if hid is None: continue hits_by_id[str(hid)] = hit return hits_by_id, int(resp.get("took", 0) or 0) @staticmethod def _normalize_sku_match_text(value: Optional[str]) -> str: """Normalize free text for lightweight SKU option matching.""" if value is None: return "" return " ".join(str(value).strip().casefold().split()) @staticmethod def _sku_option1_embedding_key( sku: Dict[str, Any], spu_option1_name: Optional[Any] = None, ) -> Optional[str]: """ Text sent to the embedding service for option1 must be "name:value" (option name from SKU row or SPU-level option1_name). """ value_raw = sku.get("option1_value") if value_raw is None: return None value = str(value_raw).strip() if not value: return None name = sku.get("option1_name") if name is None or not str(name).strip(): name = spu_option1_name name_str = str(name).strip() if name is not None and str(name).strip() else "" if name_str: value = f"{name_str}:{value}" return value.casefold() def _build_sku_query_texts(self, parsed_query: ParsedQuery) -> List[str]: """Collect original and translated query texts for SKU option matching.""" candidates: List[str] = [] for text in ( getattr(parsed_query, "original_query", None), getattr(parsed_query, "query_normalized", None), getattr(parsed_query, "rewritten_query", None), ): normalized = self._normalize_sku_match_text(text) if normalized: candidates.append(normalized) query_text_by_lang = getattr(parsed_query, "query_text_by_lang", {}) or {} if isinstance(query_text_by_lang, dict): for text in query_text_by_lang.values(): normalized = self._normalize_sku_match_text(text) if normalized: candidates.append(normalized) translations = getattr(parsed_query, "translations", {}) or {} if isinstance(translations, dict): for text in translations.values(): normalized = self._normalize_sku_match_text(text) if normalized: candidates.append(normalized) deduped: List[str] = [] seen = set() for text in candidates: if text in seen: continue seen.add(text) deduped.append(text) return deduped def _find_query_matching_sku_index( self, skus: List[Dict[str, Any]], query_texts: List[str], spu_option1_name: Optional[Any] = None, ) -> Optional[int]: """Return the first SKU whose option1_value (or name:value) appears in query texts.""" if not skus or not query_texts: return None for index, sku in enumerate(skus): option1_value = self._normalize_sku_match_text(sku.get("option1_value")) if not option1_value: continue if any(option1_value in query_text for query_text in query_texts): return index embed_key = self._sku_option1_embedding_key(sku, spu_option1_name) if embed_key and embed_key != option1_value: composite_norm = self._normalize_sku_match_text(embed_key.replace(":", " ")) if any(composite_norm in query_text for query_text in query_texts): return index if any(embed_key.casefold() in query_text for query_text in query_texts): return index return None def _encode_query_vector_for_sku_matching( self, parsed_query: ParsedQuery, context: Optional[RequestContext] = None, ) -> Optional[np.ndarray]: """Best-effort fallback query embedding for final-page SKU matching.""" query_text = ( getattr(parsed_query, "rewritten_query", None) or getattr(parsed_query, "query_normalized", None) or getattr(parsed_query, "original_query", None) ) if not query_text: return None text_encoder = getattr(self.query_parser, "text_encoder", None) if text_encoder is None: return None try: vectors = text_encoder.encode([query_text], priority=1) except Exception as exc: logger.warning("Failed to encode query vector for SKU matching: %s", exc, exc_info=True) if context is not None: context.add_warning(f"SKU query embedding failed: {exc}") return None if vectors is None or len(vectors) == 0: return None vector = vectors[0] if vector is None: return None return np.asarray(vector, dtype=np.float32) def _select_sku_by_embedding( self, skus: List[Dict[str, Any]], option1_vectors: Dict[str, np.ndarray], query_vector: np.ndarray, spu_option1_name: Optional[Any] = None, ) -> Tuple[Optional[int], Optional[float]]: """Select the SKU whose option1 embedding key (name:value) is most similar to the query.""" best_index: Optional[int] = None best_score: Optional[float] = None for index, sku in enumerate(skus): embed_key = self._sku_option1_embedding_key(sku, spu_option1_name) if not embed_key: continue option_vector = option1_vectors.get(embed_key) if option_vector is None: continue score = float(np.inner(query_vector, option_vector)) if best_score is None or score > best_score: best_index = index best_score = score return best_index, best_score @staticmethod def _promote_matching_sku(source: Dict[str, Any], match_index: int) -> Optional[Dict[str, Any]]: """Move the matched SKU to the front and swap the SPU image.""" skus = source.get("skus") if not isinstance(skus, list) or match_index < 0 or match_index >= len(skus): return None matched_sku = skus.pop(match_index) skus.insert(0, matched_sku) image_src = matched_sku.get("image_src") or matched_sku.get("imageSrc") if image_src: source["image_url"] = image_src return matched_sku def _apply_sku_sorting_for_page_hits( self, es_hits: List[Dict[str, Any]], parsed_query: ParsedQuery, context: Optional[RequestContext] = None, ) -> None: """Sort each page hit's SKUs so the best-matching SKU is first.""" if not es_hits: return query_texts = self._build_sku_query_texts(parsed_query) unmatched_hits: List[Dict[str, Any]] = [] option1_values_to_encode: List[str] = [] seen_option1_values = set() text_matched = 0 embedding_matched = 0 for hit in es_hits: source = hit.get("_source") if not isinstance(source, dict): continue skus = source.get("skus") if not isinstance(skus, list) or not skus: continue spu_option1_name = source.get("option1_name") match_index = self._find_query_matching_sku_index( skus, query_texts, spu_option1_name=spu_option1_name ) if match_index is not None: self._promote_matching_sku(source, match_index) text_matched += 1 continue unmatched_hits.append(hit) for sku in skus: embed_key = self._sku_option1_embedding_key(sku, spu_option1_name) if not embed_key or embed_key in seen_option1_values: continue seen_option1_values.add(embed_key) option1_values_to_encode.append(embed_key) if not unmatched_hits or not option1_values_to_encode: return query_vector = getattr(parsed_query, "query_vector", None) if query_vector is None: query_vector = self._encode_query_vector_for_sku_matching(parsed_query, context=context) if query_vector is None: return text_encoder = getattr(self.query_parser, "text_encoder", None) if text_encoder is None: return try: encoded_option_vectors = text_encoder.encode(option1_values_to_encode, priority=1) except Exception as exc: logger.warning("Failed to encode SKU option1 values for final-page sorting: %s", exc, exc_info=True) if context is not None: context.add_warning(f"SKU option embedding failed: {exc}") return option1_vectors: Dict[str, np.ndarray] = {} for option1_value, vector in zip(option1_values_to_encode, encoded_option_vectors): if vector is None: continue option1_vectors[option1_value] = np.asarray(vector, dtype=np.float32) query_vector_array = np.asarray(query_vector, dtype=np.float32) for hit in unmatched_hits: source = hit.get("_source") if not isinstance(source, dict): continue skus = source.get("skus") if not isinstance(skus, list) or not skus: continue match_index, _ = self._select_sku_by_embedding( skus, option1_vectors, query_vector_array, spu_option1_name=source.get("option1_name"), ) if match_index is None: continue self._promote_matching_sku(source, match_index) embedding_matched += 1 if text_matched or embedding_matched: logger.info( "Final-page SKU sorting completed | text_matched=%s | embedding_matched=%s", text_matched, embedding_matched, ) def search( self, query: str, tenant_id: str, size: int = 10, from_: int = 0, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None, facets: Optional[List[FacetConfig]] = None, min_score: Optional[float] = None, context: Optional[RequestContext] = None, sort_by: Optional[str] = None, sort_order: Optional[str] = "desc", debug: bool = False, language: str = "en", sku_filter_dimension: Optional[List[str]] = None, enable_rerank: Optional[bool] = None, rerank_query_template: Optional[str] = None, rerank_doc_template: Optional[str] = None, ) -> SearchResult: """ Execute search query (外部友好格式). Args: query: Search query string tenant_id: Tenant ID (required for filtering) size: Number of results to return from_: Offset for pagination filters: Exact match filters range_filters: Range filters for numeric fields facets: Facet configurations for faceted search min_score: Minimum score threshold context: Request context for tracking (required) sort_by: Field name for sorting sort_order: Sort order: 'asc' or 'desc' debug: Enable debug information output language: Response / field selection language hint (e.g. zh, en) sku_filter_dimension: SKU grouping dimensions for per-SPU variant pick enable_rerank: If None, use ``config.rerank.enabled``; if set, overrides whether the rerank provider is invoked (subject to rerank window). rerank_query_template: Override for rerank query text template; None uses ``config.rerank.rerank_query_template`` (e.g. ``"{query}"``). rerank_doc_template: Override for per-hit document text passed to rerank; None uses ``config.rerank.rerank_doc_template``. Placeholders are resolved in ``search/rerank_client.py``. Returns: SearchResult object with formatted results """ if context is None: raise ValueError("context is required") # 根据租户配置决定翻译开关(离线/在线统一) tenant_loader = get_tenant_config_loader() tenant_cfg = tenant_loader.get_tenant_config(tenant_id) index_langs = tenant_cfg.get("index_languages") or [] enable_translation = len(index_langs) > 0 enable_embedding = self.config.query_config.enable_text_embedding rc = self.config.rerank effective_query_template = rerank_query_template or rc.rerank_query_template effective_doc_template = rerank_doc_template or rc.rerank_doc_template # 重排开关优先级:请求参数显式传值 > 服务端配置(默认开启) rerank_enabled_by_config = bool(rc.enabled) do_rerank = rerank_enabled_by_config if enable_rerank is None else bool(enable_rerank) rerank_window = rc.rerank_window # 若开启重排且请求范围在窗口内:从 ES 取前 rerank_window 条、重排后再按 from/size 分页;否则不重排,按原 from/size 查 ES in_rerank_window = do_rerank and (from_ + size) <= rerank_window es_fetch_from = 0 if in_rerank_window else from_ es_fetch_size = rerank_window if in_rerank_window else size # Start timing context.start_stage(RequestContextStage.TOTAL) context.logger.info( f"开始搜索请求 | 查询: '{query}' | 参数: size={size}, from_={from_}, " f"enable_rerank(request)={enable_rerank}, enable_rerank(config)={rerank_enabled_by_config}, " f"enable_rerank(effective)={do_rerank}, in_rerank_window={in_rerank_window}, " f"es_fetch=({es_fetch_from},{es_fetch_size}) | " f"enable_translation={enable_translation}, enable_embedding={enable_embedding}, min_score={min_score}", extra={'reqid': context.reqid, 'uid': context.uid} ) # Store search parameters in context context.metadata['search_params'] = { 'size': size, 'from_': from_, 'es_fetch_from': es_fetch_from, 'es_fetch_size': es_fetch_size, 'in_rerank_window': in_rerank_window, 'rerank_enabled_by_config': rerank_enabled_by_config, 'enable_rerank_request': enable_rerank, 'rerank_query_template': effective_query_template, 'rerank_doc_template': effective_doc_template, 'filters': filters, 'range_filters': range_filters, 'facets': facets, 'enable_translation': enable_translation, 'enable_embedding': enable_embedding, 'enable_rerank': do_rerank, 'min_score': min_score, 'sort_by': sort_by, 'sort_order': sort_order } context.metadata['feature_flags'] = { 'translation_enabled': enable_translation, 'embedding_enabled': enable_embedding, 'rerank_enabled': do_rerank } # Step 1: Parse query context.start_stage(RequestContextStage.QUERY_PARSING) try: parsed_query = self.query_parser.parse( query, tenant_id=tenant_id, generate_vector=enable_embedding, context=context, target_languages=index_langs if enable_translation else [], ) # Store query analysis results in context context.store_query_analysis( original_query=parsed_query.original_query, query_normalized=parsed_query.query_normalized, rewritten_query=parsed_query.rewritten_query, detected_language=parsed_query.detected_language, translations=parsed_query.translations, query_vector=parsed_query.query_vector.tolist() if parsed_query.query_vector is not None else None, domain="default", is_simple_query=True ) context.logger.info( f"查询解析完成 | 原查询: '{parsed_query.original_query}' | " f"重写后: '{parsed_query.rewritten_query}' | " f"语言: {parsed_query.detected_language} | " f"向量: {'是' if parsed_query.query_vector is not None else '否'}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"查询解析失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.QUERY_PARSING) # Step 2: Query building context.start_stage(RequestContextStage.QUERY_BUILDING) try: # Generate tenant-specific index name index_name = get_tenant_index_name(tenant_id) # index_name = "search_products" # No longer need to add tenant_id to filters since each tenant has its own index es_query = self.query_builder.build_query( query_text=parsed_query.rewritten_query or parsed_query.query_normalized, query_vector=parsed_query.query_vector if enable_embedding else None, filters=filters, range_filters=range_filters, facet_configs=facets, size=es_fetch_size, from_=es_fetch_from, enable_knn=enable_embedding and parsed_query.query_vector is not None, min_score=min_score, parsed_query=parsed_query, index_languages=index_langs, ) # Add facets for faceted search if facets: facet_aggs = self.query_builder.build_facets(facets) if facet_aggs: if "aggs" not in es_query: es_query["aggs"] = {} es_query["aggs"].update(facet_aggs) # Add sorting if specified if sort_by: es_query = self.query_builder.add_sorting(es_query, sort_by, sort_order) es_query["track_scores"] = True # Keep requested response _source semantics for the final response fill. response_source_spec = es_query.get("_source") # In rerank window, first pass only fetches minimal fields required by rerank template. es_query_for_fetch = es_query rerank_prefetch_source = None if in_rerank_window: rerank_prefetch_source = self._resolve_rerank_source_filter(effective_doc_template) es_query_for_fetch = dict(es_query) es_query_for_fetch["_source"] = rerank_prefetch_source # Extract size and from from body for ES client parameters body_for_es = {k: v for k, v in es_query_for_fetch.items() if k not in ['size', 'from']} # Store ES query in context context.store_intermediate_result('es_query', es_query) if in_rerank_window and rerank_prefetch_source is not None: context.store_intermediate_result('es_query_rerank_prefetch_source', rerank_prefetch_source) context.store_intermediate_result('es_body_for_search', body_for_es) # Serialize ES query to compute a compact size + stable digest for correlation es_query_compact = json.dumps(es_query_for_fetch, ensure_ascii=False, separators=(",", ":")) es_query_digest = hashlib.sha256(es_query_compact.encode("utf-8")).hexdigest()[:16] knn_enabled = bool(enable_embedding and parsed_query.query_vector is not None) vector_dims = int(len(parsed_query.query_vector)) if parsed_query.query_vector is not None else 0 context.logger.info( "ES query built | size: %s chars | digest: %s | KNN: %s | vector_dims: %s | facets: %s | rerank_prefetch_source: %s", len(es_query_compact), es_query_digest, "yes" if knn_enabled else "no", vector_dims, "yes" if facets else "no", rerank_prefetch_source, extra={'reqid': context.reqid, 'uid': context.uid} ) _log_backend_verbose({ "event": "es_query_built", "reqid": context.reqid, "uid": context.uid, "tenant_id": tenant_id, "size_chars": len(es_query_compact), "sha256_16": es_query_digest, "knn_enabled": knn_enabled, "vector_dims": vector_dims, "has_facets": bool(facets), "query": es_query_for_fetch, }) except Exception as e: context.set_error(e) context.logger.error( f"ES查询构建失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.QUERY_BUILDING) # Step 4: Elasticsearch search (primary recall) context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY) try: # Use tenant-specific index name(开启重排且在窗口内时已用 es_fetch_size/es_fetch_from) es_response = self.es_client.search( index_name=index_name, body=body_for_es, size=es_fetch_size, from_=es_fetch_from, include_named_queries_score=bool(do_rerank and in_rerank_window), ) # Store ES response in context context.store_intermediate_result('es_response', es_response) # Extract timing from ES response es_took = es_response.get('took', 0) context.logger.info( f"ES搜索完成 | 耗时: {es_took}ms | " f"命中数: {es_response.get('hits', {}).get('total', {}).get('value', 0)} | " f"最高分: {(es_response.get('hits', {}).get('max_score') or 0):.3f}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"ES搜索执行失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.ELASTICSEARCH_SEARCH_PRIMARY) # Optional Step 4.5: AI reranking(仅当请求范围在重排窗口内时执行) if do_rerank and in_rerank_window: context.start_stage(RequestContextStage.RERANKING) try: from .rerank_client import run_rerank rerank_query = parsed_query.original_query if parsed_query else query es_response, rerank_meta, fused_debug = run_rerank( query=rerank_query, es_response=es_response, language=language, timeout_sec=rc.timeout_sec, weight_es=rc.weight_es, weight_ai=rc.weight_ai, rerank_query_template=effective_query_template, rerank_doc_template=effective_doc_template, top_n=(from_ + size), ) if rerank_meta is not None: from config.services_config import get_rerank_service_url rerank_url = get_rerank_service_url() context.metadata.setdefault("rerank_info", {}) context.metadata["rerank_info"].update({ "service_url": rerank_url, "docs": len(es_response.get("hits", {}).get("hits") or []), "meta": rerank_meta, }) context.store_intermediate_result("rerank_scores", fused_debug) context.logger.info( f"重排完成 | docs={len(fused_debug)} | meta={rerank_meta}", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.add_warning(f"Rerank failed: {e}") context.logger.warning( f"调用重排服务失败 | error: {e}", extra={'reqid': context.reqid, 'uid': context.uid}, exc_info=True, ) finally: context.end_stage(RequestContextStage.RERANKING) # 当本次请求在重排窗口内时:已从 ES 取了 rerank_window 条并可能已重排,需按请求的 from/size 做分页切片 if in_rerank_window: hits = es_response.get("hits", {}).get("hits") or [] sliced = hits[from_ : from_ + size] es_response.setdefault("hits", {})["hits"] = sliced if sliced: # 对于启用重排的结果,优先使用 _fused_score 计算 max_score;否则退回原始 _score slice_max = max( (h.get("_fused_score", h.get("_score", 0.0)) for h in sliced), default=0.0, ) try: es_response["hits"]["max_score"] = float(slice_max) except (TypeError, ValueError): es_response["hits"]["max_score"] = 0.0 else: es_response["hits"]["max_score"] = 0.0 # Page fill: fetch detailed fields only for final page hits. if sliced: if response_source_spec is False: for hit in sliced: hit.pop("_source", None) context.logger.info( "分页详情回填跳过 | 原查询 _source=false", extra={'reqid': context.reqid, 'uid': context.uid} ) else: context.start_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) try: page_ids = [str(h.get("_id")) for h in sliced if h.get("_id") is not None] details_by_id, fill_took = self._fetch_hits_by_ids( index_name=index_name, doc_ids=page_ids, source_spec=response_source_spec, ) filled = 0 for hit in sliced: hid = hit.get("_id") if hid is None: continue detail_hit = details_by_id.get(str(hid)) if detail_hit is None: continue if "_source" in detail_hit: hit["_source"] = detail_hit.get("_source") or {} filled += 1 if fill_took: es_response["took"] = int((es_response.get("took", 0) or 0) + fill_took) context.logger.info( f"分页详情回填 | ids={len(page_ids)} | filled={filled} | took={fill_took}ms", extra={'reqid': context.reqid, 'uid': context.uid} ) finally: context.end_stage(RequestContextStage.ELASTICSEARCH_PAGE_FILL) context.logger.info( f"重排分页切片 | from={from_}, size={size}, 返回={len(sliced)}条", extra={'reqid': context.reqid, 'uid': context.uid} ) # Step 5: Result processing context.start_stage(RequestContextStage.RESULT_PROCESSING) try: # Extract ES hits es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total # max_score 会在启用 AI 搜索时被更新为融合分数的最大值 max_score = es_response.get('hits', {}).get('max_score') or 0.0 # 从上下文中取出重排调试信息(若有) rerank_debug_raw = context.get_intermediate_result('rerank_scores', None) rerank_debug_by_doc: Dict[str, Dict[str, Any]] = {} if isinstance(rerank_debug_raw, list): for item in rerank_debug_raw: if not isinstance(item, dict): continue doc_id = item.get("doc_id") if doc_id is None: continue rerank_debug_by_doc[str(doc_id)] = item self._apply_sku_sorting_for_page_hits(es_hits, parsed_query, context=context) # Format results using ResultFormatter formatted_results = ResultFormatter.format_search_results( es_hits, max_score, language=language, sku_filter_dimension=sku_filter_dimension ) # Build per-result debug info (per SPU) when debug mode is enabled per_result_debug = [] if debug and es_hits and formatted_results: for hit, spu in zip(es_hits, formatted_results): source = hit.get("_source", {}) or {} doc_id = hit.get("_id") rerank_debug = None if doc_id is not None: rerank_debug = rerank_debug_by_doc.get(str(doc_id)) raw_score = hit.get("_score") try: es_score = float(raw_score) if raw_score is not None else 0.0 except (TypeError, ValueError): es_score = 0.0 try: normalized = float(es_score) / float(max_score) if max_score else None except (TypeError, ValueError, ZeroDivisionError): normalized = None title_multilingual = source.get("title") if isinstance(source.get("title"), dict) else None brief_multilingual = source.get("brief") if isinstance(source.get("brief"), dict) else None vendor_multilingual = source.get("vendor") if isinstance(source.get("vendor"), dict) else None debug_entry: Dict[str, Any] = { "spu_id": spu.spu_id, "es_score": es_score, "es_score_normalized": normalized, "title_multilingual": title_multilingual, "brief_multilingual": brief_multilingual, "vendor_multilingual": vendor_multilingual, } # 若存在重排调试信息,则补充 doc 级别的融合分数信息 if rerank_debug: debug_entry["doc_id"] = rerank_debug.get("doc_id") # 与 rerank_client 中字段保持一致,便于前端直接使用 debug_entry["rerank_score"] = rerank_debug.get("rerank_score") debug_entry["text_score"] = rerank_debug.get("text_score") debug_entry["text_source_score"] = rerank_debug.get("text_source_score") debug_entry["text_translation_score"] = rerank_debug.get("text_translation_score") debug_entry["text_fallback_score"] = rerank_debug.get("text_fallback_score") debug_entry["text_primary_score"] = rerank_debug.get("text_primary_score") debug_entry["text_support_score"] = rerank_debug.get("text_support_score") debug_entry["knn_score"] = rerank_debug.get("knn_score") debug_entry["fused_score"] = rerank_debug.get("fused_score") debug_entry["matched_queries"] = rerank_debug.get("matched_queries") per_result_debug.append(debug_entry) # Format facets standardized_facets = None if facets: standardized_facets = ResultFormatter.format_facets( es_response.get('aggregations', {}), facets, filters ) # Generate suggestions and related searches query_text = parsed_query.original_query if parsed_query else query suggestions = ResultFormatter.generate_suggestions(query_text, formatted_results) related_searches = ResultFormatter.generate_related_searches(query_text, formatted_results) context.logger.info( f"结果处理完成 | 返回: {len(formatted_results)}条 | 总计: {total_value}条", extra={'reqid': context.reqid, 'uid': context.uid} ) except Exception as e: context.set_error(e) context.logger.error( f"结果处理失败 | 错误: {str(e)}", extra={'reqid': context.reqid, 'uid': context.uid} ) raise finally: context.end_stage(RequestContextStage.RESULT_PROCESSING) # End total timing and build result total_duration = context.end_stage(RequestContextStage.TOTAL) context.performance_metrics.total_duration = total_duration # Collect debug information if requested debug_info = None if debug: debug_info = { "query_analysis": { "original_query": context.query_analysis.original_query, "query_normalized": context.query_analysis.query_normalized, "rewritten_query": context.query_analysis.rewritten_query, "detected_language": context.query_analysis.detected_language, "translations": context.query_analysis.translations, "has_vector": context.query_analysis.query_vector is not None, "is_simple_query": context.query_analysis.is_simple_query, "domain": context.query_analysis.domain }, "es_query": context.get_intermediate_result('es_query', {}), "es_response": { "took_ms": es_response.get('took', 0), "total_hits": total_value, "max_score": max_score, "shards": es_response.get('_shards', {}) }, "feature_flags": context.metadata.get('feature_flags', {}), "stage_timings": { k: round(v, 2) for k, v in context.performance_metrics.stage_timings.items() }, "search_params": context.metadata.get('search_params', {}) } if per_result_debug: debug_info["per_result"] = per_result_debug # Build result result = SearchResult( results=formatted_results, total=total_value, max_score=max_score, took_ms=int(total_duration), facets=standardized_facets, query_info=parsed_query.to_dict(), suggestions=suggestions, related_searches=related_searches, debug_info=debug_info ) # Log complete performance summary context.log_performance_summary() return result def search_by_image( self, image_url: str, tenant_id: str, size: int = 10, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None ) -> SearchResult: """ Search by image similarity (外部友好格式). Args: image_url: URL of query image tenant_id: Tenant ID (required for filtering) size: Number of results filters: Exact match filters range_filters: Range filters for numeric fields Returns: SearchResult object with formatted results """ if not self.image_embedding_field: raise ValueError("Image embedding field not configured") # Generate image embedding if self.image_encoder is None: raise RuntimeError("Image encoder is not initialized at startup") image_vector = self.image_encoder.encode_image_from_url(image_url, priority=1) if image_vector is None: raise ValueError(f"Failed to encode image: {image_url}") # Generate tenant-specific index name index_name = get_tenant_index_name(tenant_id) # No longer need to add tenant_id to filters since each tenant has its own index # Build KNN query es_query = { "size": size, "knn": { "field": self.image_embedding_field, "query_vector": image_vector.tolist(), "k": size, "num_candidates": size * 10 } } # Apply source filtering semantics (None / [] / list) self._apply_source_filter(es_query) if filters or range_filters: filter_clauses = self.query_builder._build_filters(filters, range_filters) if filter_clauses: if len(filter_clauses) == 1: es_query["knn"]["filter"] = filter_clauses[0] else: es_query["knn"]["filter"] = { "bool": { "filter": filter_clauses } } # Execute search es_response = self.es_client.search( index_name=index_name, body=es_query, size=size ) # Extract ES hits es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total max_score = es_response.get('hits', {}).get('max_score') or 0.0 # Format results using ResultFormatter formatted_results = ResultFormatter.format_search_results( es_hits, max_score, language="en", # Default language for image search sku_filter_dimension=None # Image search doesn't support SKU filtering ) return SearchResult( results=formatted_results, total=total_value, max_score=max_score, took_ms=es_response.get('took', 0), facets=None, query_info={'image_url': image_url, 'search_type': 'image_similarity'}, suggestions=[], related_searches=[] ) def get_domain_summary(self) -> Dict[str, Any]: """ Get summary of dynamic text retrieval configuration. Returns: Dictionary with language-aware field information """ return { "mode": "dynamic_language_fields", "multilingual_fields": self.config.query_config.multilingual_fields, "shared_fields": self.config.query_config.shared_fields, "core_multilingual_fields": self.config.query_config.core_multilingual_fields, "field_boosts": self.config.field_boosts, } def get_document(self, tenant_id: str, doc_id: str) -> Optional[Dict[str, Any]]: """ Get single document by ID. Args: tenant_id: Tenant ID (required to determine which index to query) doc_id: Document ID Returns: Document or None if not found """ try: index_name = get_tenant_index_name(tenant_id) response = self.es_client.client.get( index=index_name, id=doc_id ) return response.get('_source') except Exception as e: logger.error(f"Failed to get document {doc_id} from tenant {tenant_id}: {e}", exc_info=True) return None def _standardize_facets( self, es_aggregations: Dict[str, Any], facet_configs: Optional[List[Union[str, Any]]], current_filters: Optional[Dict[str, Any]] ) -> Optional[List[FacetResult]]: """ 将 ES 聚合结果转换为标准化的分面格式(返回 Pydantic 模型)。 Args: es_aggregations: ES 原始聚合结果 facet_configs: 分面配置列表(str 或 FacetConfig) current_filters: 当前应用的过滤器 Returns: 标准化的分面结果列表(FacetResult 对象) """ if not es_aggregations or not facet_configs: return None standardized_facets: List[FacetResult] = [] for config in facet_configs: # 解析配置 if isinstance(config, str): field = config facet_type = "terms" else: # FacetConfig 对象 field = config.field facet_type = config.type agg_name = f"{field}_facet" if agg_name not in es_aggregations: continue agg_result = es_aggregations[agg_name] # 获取当前字段的选中值 selected_values = set() if current_filters and field in current_filters: filter_value = current_filters[field] if isinstance(filter_value, list): selected_values = set(filter_value) else: selected_values = {filter_value} # 转换 buckets 为 FacetValue 对象 facet_values: List[FacetValue] = [] if 'buckets' in agg_result: for bucket in agg_result['buckets']: value = bucket.get('key') count = bucket.get('doc_count', 0) facet_values.append(FacetValue( value=value, label=str(value), count=count, selected=value in selected_values )) # 构建 FacetResult 对象 facet_result = FacetResult( field=field, label=field, type=facet_type, values=facet_values ) standardized_facets.append(facet_result) return standardized_facets if standardized_facets else None