""" Result formatter for converting ES internal format to external-friendly format. """ from typing import List, Dict, Any, Optional from .models import SpuResult, SkuResult, FacetResult, FacetValue class ResultFormatter: """Formats ES search results to external-friendly format.""" @staticmethod def format_search_results( es_hits: List[Dict[str, Any]], max_score: float = 1.0, language: str = "zh" ) -> List[SpuResult]: """ Convert ES hits to SpuResult list. Args: es_hits: List of ES hit dictionaries (with _id, _score, _source) max_score: Maximum score (unused, kept for compatibility) Returns: List of SpuResult objects """ results = [] lang = (language or "zh").lower() if lang not in ("zh", "en"): lang = "en" def pick_lang_field(src: Dict[str, Any], base: str) -> Optional[str]: """从 *_zh / *_en 字段中按语言选择一个值,若目标语言缺失则回退到另一种。""" zh_val = src.get(f"{base}_zh") en_val = src.get(f"{base}_en") if lang == "zh": return zh_val or en_val else: return en_val or zh_val for hit in es_hits: source = hit.get('_source', {}) score = hit.get('_score') # Use original ES score directly (no normalization) # Handle None score (can happen with certain query types or when score is explicitly null) if score is None: relevance_score = 0.0 else: try: relevance_score = float(score) except (ValueError, TypeError): relevance_score = 0.0 # Multi-language fields title = pick_lang_field(source, "title") brief = pick_lang_field(source, "brief") description = pick_lang_field(source, "description") vendor = pick_lang_field(source, "vendor") category_path = pick_lang_field(source, "category_path") category_name = pick_lang_field(source, "category_name") # Extract SKUs skus = [] skus_data = source.get('skus', []) if isinstance(skus_data, list): for sku_entry in skus_data: sku = SkuResult( sku_id=str(sku_entry.get('sku_id', '')), title=sku_entry.get('title'), price=sku_entry.get('price'), compare_at_price=sku_entry.get('compare_at_price'), sku=sku_entry.get('sku'), stock=sku_entry.get('stock', 0), options=sku_entry.get('options') ) skus.append(sku) # Determine in_stock (any sku has stock > 0) in_stock = any(sku.stock > 0 for sku in skus) if skus else True # Build SpuResult spu = SpuResult( spu_id=str(source.get('spu_id', '')), title=title, brief=brief, handle=source.get('handle'), description=description, vendor=vendor, category=category_name, category_path=category_path, category_name=category_name, category_id=source.get('category_id'), category_level=source.get('category_level'), category1_name=source.get('category1_name'), category2_name=source.get('category2_name'), category3_name=source.get('category3_name'), tags=source.get('tags'), price=source.get('min_price'), compare_at_price=source.get('compare_at_price'), currency="USD", # Default currency image_url=source.get('image_url'), in_stock=in_stock, sku_prices=source.get('sku_prices'), sku_weights=source.get('sku_weights'), sku_weight_units=source.get('sku_weight_units'), total_inventory=source.get('total_inventory'), option1_name=source.get('option1_name'), option2_name=source.get('option2_name'), option3_name=source.get('option3_name'), specifications=source.get('specifications'), skus=skus, relevance_score=relevance_score ) results.append(spu) return results @staticmethod def format_facets( es_aggregations: Dict[str, Any], facet_configs: Optional[List[Any]] = None ) -> List[FacetResult]: """ Format ES aggregations to FacetResult list. 支持: 1. 普通terms聚合 2. range聚合 3. specifications嵌套聚合(按name分组,然后按value聚合) Args: es_aggregations: ES aggregations response facet_configs: Facet configurations (optional) Returns: List of FacetResult objects """ facets = [] for field_name, agg_data in es_aggregations.items(): display_field = field_name[:-6] if field_name.endswith("_facet") else field_name # 处理specifications嵌套分面 if field_name == "specifications_facet" and 'by_name' in agg_data: # specifications嵌套聚合:按name分组,每个name下有value_counts by_name_agg = agg_data['by_name'] if 'buckets' in by_name_agg: for name_bucket in by_name_agg['buckets']: name = name_bucket['key'] value_counts = name_bucket.get('value_counts', {}) values = [] if 'buckets' in value_counts: for value_bucket in value_counts['buckets']: value = FacetValue( value=value_bucket['key'], label=str(value_bucket['key']), count=value_bucket['doc_count'], selected=False ) values.append(value) # 为每个name创建一个分面结果 facet = FacetResult( field=f"specifications.{name}", label=str(name), # 使用name作为label,如"颜色"、"尺寸" type="terms", values=values, total_count=name_bucket['doc_count'] ) facets.append(facet) continue # Handle terms aggregation if 'buckets' in agg_data: values = [] for bucket in agg_data['buckets']: value = FacetValue( value=bucket['key'], label=bucket.get('key_as_string', str(bucket['key'])), count=bucket['doc_count'], selected=False ) values.append(value) facet = FacetResult( field=display_field, label=display_field, # Can be enhanced with field labels type="terms", values=values, total_count=agg_data.get('sum_other_doc_count', 0) + len(values) ) facets.append(facet) # Handle range aggregation elif 'buckets' in agg_data and any('from' in b or 'to' in b for b in agg_data['buckets']): values = [] for bucket in agg_data['buckets']: range_key = bucket.get('key', '') value = FacetValue( value=range_key, label=range_key, count=bucket['doc_count'], selected=False ) values.append(value) facet = FacetResult( field=display_field, label=display_field, type="range", values=values ) facets.append(facet) return facets @staticmethod def generate_suggestions( query: str, results: List[SpuResult] ) -> List[str]: """ Generate search suggestions. Args: query: Original search query results: Search results Returns: List of suggestion strings (currently returns empty list) """ # TODO: Implement suggestion generation logic return [] @staticmethod def generate_related_searches( query: str, results: List[SpuResult] ) -> List[str]: """ Generate related searches. Args: query: Original search query results: Search results Returns: List of related search strings (currently returns empty list) """ # TODO: Implement related search generation logic return []