# result_formatter.py (7.64 KB)
"""
Result formatter for converting ES internal format to external-friendly format.
"""

from typing import List, Dict, Any, Optional
from .models import SpuResult, SkuResult, FacetResult, FacetValue


class ResultFormatter:
    """Formats ES search results to external-friendly format.

    Every method is a ``@staticmethod``; the class only serves as a
    namespace and holds no state.
    """

    # Aggregation names ending in this suffix are exposed without it.
    _FACET_SUFFIX = "_facet"

    @staticmethod
    def _to_relevance_score(score: Any) -> float:
        """Return ``score`` as a float, or 0.0 when it is null or not numeric."""
        if score is None:
            return 0.0
        try:
            return float(score)
        except (ValueError, TypeError):
            return 0.0

    @staticmethod
    def _build_skus(skus_data: Any) -> List[SkuResult]:
        """Build SkuResult objects from the ``skus`` value of a hit's source.

        Anything that is not a list yields an empty list; entries that are
        not dictionaries are skipped instead of raising ``AttributeError``.
        """
        if not isinstance(skus_data, list):
            return []

        return [
            SkuResult(
                sku_id=str(entry.get('sku_id', '')),
                title=entry.get('title'),
                price=entry.get('price'),
                compare_at_price=entry.get('compare_at_price'),
                sku=entry.get('sku'),
                stock=entry.get('stock', 0),
                options=entry.get('options')
            )
            for entry in skus_data
            if isinstance(entry, dict)
        ]

    @staticmethod
    def format_search_results(
        es_hits: List[Dict[str, Any]],
        max_score: float = 1.0
    ) -> List[SpuResult]:
        """
        Convert ES hits to SpuResult list.

        Args:
            es_hits: List of ES hit dictionaries (with _id, _score, _source).
                ``None`` or an empty list yields an empty result.
            max_score: Maximum score (unused, kept for compatibility)

        Returns:
            List of SpuResult objects, one per hit, in the input order
        """
        results = []

        for hit in es_hits or []:
            # `_source` may be present but null; treat that like a missing one.
            source = hit.get('_source') or {}

            # Use original ES score directly (no normalization); a null or
            # non-numeric score becomes 0.0.
            relevance_score = ResultFormatter._to_relevance_score(hit.get('_score'))

            skus = ResultFormatter._build_skus(source.get('skus', []))

            # In stock when any SKU has stock > 0. A hit without SKUs is
            # reported as in stock. `stock` can be None when the document
            # stores an explicit null, so it is coalesced to 0 before comparing.
            if skus:
                in_stock = any((sku.stock or 0) > 0 for sku in skus)
            else:
                in_stock = True

            results.append(
                SpuResult(
                    spu_id=str(source.get('spu_id', '')),
                    title=source.get('title'),
                    handle=source.get('handle'),
                    description=source.get('description'),
                    vendor=source.get('vendor'),
                    category=source.get('category'),
                    tags=source.get('tags'),
                    price=source.get('min_price'),
                    compare_at_price=source.get('compare_at_price'),
                    currency="USD",  # Default currency
                    image_url=source.get('image_url'),
                    in_stock=in_stock,
                    skus=skus,
                    relevance_score=relevance_score
                )
            )

        return results

    @staticmethod
    def format_facets(
        es_aggregations: Dict[str, Any],
        facet_configs: Optional[List[Any]] = None
    ) -> List[FacetResult]:
        """
        Format ES aggregations to FacetResult list.

        Supported aggregation shapes:
        1. Plain terms aggregations
        2. Range aggregations (buckets carrying ``from`` and/or ``to``)
        3. The nested ``specifications_facet`` aggregation (grouped by name,
           then aggregated by value), which yields one facet per name

        Aggregations whose payload is not a dictionary, or that have no
        bucket list, are skipped.

        Args:
            es_aggregations: ES aggregations response. ``None`` or an empty
                dict yields an empty result.
            facet_configs: Facet configurations (optional, currently unused)

        Returns:
            List of FacetResult objects
        """
        facets = []
        if not es_aggregations:
            return facets

        suffix = ResultFormatter._FACET_SUFFIX

        for field_name, agg_data in es_aggregations.items():
            if not isinstance(agg_data, dict):
                continue

            if field_name.endswith(suffix):
                display_field = field_name[:-len(suffix)]
            else:
                display_field = field_name

            # Nested specifications facet: grouped by name, each name bucket
            # carries its own `value_counts` sub-aggregation.
            if field_name == "specifications_facet" and 'by_name' in agg_data:
                by_name_agg = agg_data['by_name']
                if 'buckets' in by_name_agg:
                    for name_bucket in by_name_agg['buckets']:
                        name = name_bucket['key']
                        value_counts = name_bucket.get('value_counts', {})

                        values = []
                        if 'buckets' in value_counts:
                            values = [
                                FacetValue(
                                    value=value_bucket['key'],
                                    label=str(value_bucket['key']),
                                    count=value_bucket['doc_count'],
                                    selected=False
                                )
                                for value_bucket in value_counts['buckets']
                            ]

                        # One facet result per specification name.
                        facets.append(
                            FacetResult(
                                field=f"specifications.{name}",
                                label=str(name),  # the name is the label, e.g. "color", "size"
                                type="terms",
                                values=values,
                                total_count=name_bucket['doc_count']
                            )
                        )
                continue

            buckets = agg_data.get('buckets')
            if not isinstance(buckets, list):
                # No buckets (e.g. a metric aggregation) or an unsupported
                # bucket layout: nothing to format.
                continue

            # Range detection must come BEFORE the terms fallback: both
            # shapes have `buckets`, so testing for terms first made the
            # range branch unreachable.
            is_range = any('from' in b or 'to' in b for b in buckets)

            if is_range:
                values = []
                for bucket in buckets:
                    range_key = bucket.get('key', '')
                    values.append(
                        FacetValue(
                            value=range_key,
                            label=range_key,
                            count=bucket['doc_count'],
                            selected=False
                        )
                    )

                # NOTE(review): total_count is not passed here, so this
                # relies on FacetResult providing a default - confirm.
                facets.append(
                    FacetResult(
                        field=display_field,
                        label=display_field,
                        type="range",
                        values=values
                    )
                )
                continue

            # Terms aggregation
            values = [
                FacetValue(
                    value=bucket['key'],
                    label=bucket.get('key_as_string', str(bucket['key'])),
                    count=bucket['doc_count'],
                    selected=False
                )
                for bucket in buckets
            ]

            # NOTE(review): this adds a document count (sum_other_doc_count)
            # to a bucket count (len(values)); kept as-is for compatibility.
            facets.append(
                FacetResult(
                    field=display_field,
                    label=display_field,  # Can be enhanced with field labels
                    type="terms",
                    values=values,
                    total_count=agg_data.get('sum_other_doc_count', 0) + len(values)
                )
            )

        return facets

    @staticmethod
    def generate_suggestions(
        query: str,
        results: List[SpuResult]
    ) -> List[str]:
        """
        Generate search suggestions.

        Args:
            query: Original search query
            results: Search results

        Returns:
            List of suggestion strings (currently returns empty list)
        """
        # TODO: Implement suggestion generation logic
        return []

    @staticmethod
    def generate_related_searches(
        query: str,
        results: List[SpuResult]
    ) -> List[str]:
        """
        Generate related searches.

        Args:
            query: Original search query
            results: Search results

        Returns:
            List of related search strings (currently returns empty list)
        """
        # TODO: Implement related search generation logic
        return []