result_formatter.py 15.2 KB
"""
Result formatter for converting ES internal format to external-friendly format.
"""

from typing import List, Dict, Any, Optional
from .models import SpuResult, SkuResult, FacetResult, FacetValue


class ResultFormatter:
    """Formats ES search results to external-friendly format."""

    @staticmethod
    def format_search_results(
        es_hits: List[Dict[str, Any]],
        max_score: float = 1.0,
        language: str = "zh",
        sku_filter_dimension: Optional[List[str]] = None
    ) -> List[SpuResult]:
        """
        Convert ES hits to SpuResult list.

        Args:
            es_hits: List of ES hit dictionaries (with _id, _score, _source)
            max_score: Maximum score (unused, kept for compatibility)

        Returns:
            List of SpuResult objects
        """
        results = []
        lang = (language or "zh").lower()
        if lang not in ("zh", "en"):
            lang = "en"

        def pick_lang_field(src: Dict[str, Any], base: str) -> Optional[str]:
            """从 *_zh / *_en 字段中按语言选择一个值,若目标语言缺失则回退到另一种。"""
            zh_val = src.get(f"{base}_zh")
            en_val = src.get(f"{base}_en")
            if lang == "zh":
                return zh_val or en_val
            else:
                return en_val or zh_val

        for hit in es_hits:
            source = hit.get('_source', {})
            score = hit.get('_score')
            
            # Use original ES score directly (no normalization)
            # Handle None score (can happen with certain query types or when score is explicitly null)
            if score is None:
                relevance_score = 0.0
            else:
                try:
                    relevance_score = float(score)
                except (ValueError, TypeError):
                    relevance_score = 0.0

            # Multi-language fields
            title = pick_lang_field(source, "title")
            brief = pick_lang_field(source, "brief")
            description = pick_lang_field(source, "description")
            vendor = pick_lang_field(source, "vendor")
            category_path = pick_lang_field(source, "category_path")
            category_name = pick_lang_field(source, "category_name")

            # Extract SKUs
            skus = []
            skus_data = source.get('skus', [])
            if isinstance(skus_data, list):
                for sku_entry in skus_data:
                    sku = SkuResult(
                        sku_id=str(sku_entry.get('sku_id', '')),
                        title=sku_entry.get('title'),
                        price=sku_entry.get('price'),
                        compare_at_price=sku_entry.get('compare_at_price'),
                        sku=sku_entry.get('sku'),
                        sku_code=sku_entry.get('sku_code'),
                        stock=sku_entry.get('stock', 0),
                        weight=sku_entry.get('weight'),
                        weight_unit=sku_entry.get('weight_unit'),
                        option1_value=sku_entry.get('option1_value'),
                        option2_value=sku_entry.get('option2_value'),
                        option3_value=sku_entry.get('option3_value'),
                        image_src=sku_entry.get('image_src'),
                        options=sku_entry.get('options')
                    )
                    skus.append(sku)

            # Apply SKU filtering if dimension list is specified
            if sku_filter_dimension and skus:
                skus = ResultFormatter._filter_skus_by_dimensions(
                    skus,
                    sku_filter_dimension,
                    source.get('option1_name'),
                    source.get('option2_name'),
                    source.get('option3_name'),
                    source.get('specifications', [])
                )

            # Determine in_stock (any sku has stock > 0)
            in_stock = any(sku.stock > 0 for sku in skus) if skus else True

            # Build SpuResult
            spu = SpuResult(
                spu_id=str(source.get('spu_id', '')),
                title=title,
                brief=brief,
                handle=source.get('handle'),
                description=description,
                vendor=vendor,
                category=category_name,
                category_path=category_path,
                category_name=category_name,
                category_id=source.get('category_id'),
                category_level=source.get('category_level'),
                category1_name=source.get('category1_name'),
                category2_name=source.get('category2_name'),
                category3_name=source.get('category3_name'),
                tags=source.get('tags'),
                price=source.get('min_price'),
                compare_at_price=source.get('compare_at_price'),
                currency="USD",  # Default currency
                image_url=source.get('image_url'),
                in_stock=in_stock,
                sku_prices=source.get('sku_prices'),
                sku_weights=source.get('sku_weights'),
                sku_weight_units=source.get('sku_weight_units'),
                total_inventory=source.get('total_inventory'),
                option1_name=source.get('option1_name'),
                option2_name=source.get('option2_name'),
                option3_name=source.get('option3_name'),
                specifications=source.get('specifications'),
                skus=skus,
                relevance_score=relevance_score
            )

            results.append(spu)

        return results

    @staticmethod
    def _filter_skus_by_dimensions(
        skus: List[SkuResult],
        dimensions: List[str],
        option1_name: Optional[str] = None,
        option2_name: Optional[str] = None,
        option3_name: Optional[str] = None,
        specifications: Optional[List[Dict[str, Any]]] = None
    ) -> List[SkuResult]:
        """
        Filter SKUs by one or more dimensions, keeping only one SKU per dimension value combination.
        
        Args:
            skus: List of SKU results to filter
            dimensions: Filter dimensions, each dimension can be:
                - 'option1', 'option2', 'option3': Direct option field
                - A specification/option name (e.g., 'color', 'size'): Match by option name
            option1_name: Name of option1 (e.g., 'color')
            option2_name: Name of option2 (e.g., 'size')
            option3_name: Name of option3
            specifications: List of specifications (for reference)
            
        Returns:
            Filtered list of SKUs (one per dimension value)
        """
        if not skus or not dimensions:
            return skus

        # Resolve each dimension to an underlying SKU field (option1_value / option2_value / option3_value)
        filter_fields: List[str] = []

        for dim in dimensions:
            if not dim:
                continue
            dim_lower = dim.lower()

            field_name: Optional[str] = None
            # Direct option field (option1, option2, option3)
            if dim_lower == 'option1':
                field_name = 'option1_value'
            elif dim_lower == 'option2':
                field_name = 'option2_value'
            elif dim_lower == 'option3':
                field_name = 'option3_value'
            else:
                # Try to match by option name
                if option1_name and option1_name.lower() == dim_lower:
                    field_name = 'option1_value'
                elif option2_name and option2_name.lower() == dim_lower:
                    field_name = 'option2_value'
                elif option3_name and option3_name.lower() == dim_lower:
                    field_name = 'option3_value'

            if field_name and field_name not in filter_fields:
                filter_fields.append(field_name)

        # If no matching field found for all dimensions, do not return any child SKUs
        if not filter_fields:
            return []

        # Group SKUs by dimension value combination and select first one from each group
        dimension_groups: Dict[tuple, SkuResult] = {}

        for sku in skus:
            # Build key as combination of all dimension values
            key_values: List[str] = []
            for field in filter_fields:
                dimension_value = getattr(sku, field, None)
                # Use empty string as key part for None values
                key_values.append(str(dimension_value) if dimension_value is not None else '')

            key = tuple(key_values)

            # Keep first SKU for each dimension combination
            if key not in dimension_groups:
                dimension_groups[key] = sku

        # Return filtered SKUs (one per dimension combination)
        return list(dimension_groups.values())

    @staticmethod
    def format_facets(
        es_aggregations: Dict[str, Any],
        facet_configs: Optional[List[Any]] = None
    ) -> List[FacetResult]:
        """
        Format ES aggregations to FacetResult list.

        支持:
        1. 普通terms聚合
        2. range聚合
        3. specifications嵌套聚合(按name分组,然后按value聚合)

        Args:
            es_aggregations: ES aggregations response
            facet_configs: Facet configurations (optional)

        Returns:
            List of FacetResult objects
        """
        facets = []

        for field_name, agg_data in es_aggregations.items():
            display_field = field_name[:-6] if field_name.endswith("_facet") else field_name
            
            # 处理specifications嵌套分面(所有name)
            if field_name == "specifications_facet" and 'by_name' in agg_data:
                # specifications嵌套聚合:按name分组,每个name下有value_counts
                by_name_agg = agg_data['by_name']
                if 'buckets' in by_name_agg:
                    for name_bucket in by_name_agg['buckets']:
                        name = name_bucket['key']
                        value_counts = name_bucket.get('value_counts', {})
                        
                        values = []
                        if 'buckets' in value_counts:
                            for value_bucket in value_counts['buckets']:
                                value = FacetValue(
                                    value=value_bucket['key'],
                                    label=str(value_bucket['key']),
                                    count=value_bucket['doc_count'],
                                    selected=False
                                )
                                values.append(value)
                        
                        # 为每个name创建一个分面结果
                        facet = FacetResult(
                            field=f"specifications.{name}",
                            label=str(name),  # 使用name作为label,如"颜色"、"尺寸"
                            type="terms",
                            values=values,
                            total_count=name_bucket['doc_count']
                        )
                        facets.append(facet)
                continue
            
            # 处理specifications嵌套分面(指定name,如 specifications.color)
            if field_name.startswith("specifications_") and field_name.endswith("_facet"):
                # 提取name(从 "specifications_color_facet" 提取 "color")
                name = field_name[len("specifications_"):-len("_facet")]
                
                # ES nested聚合返回结构: { "doc_count": N, "filter_by_name": { ... } }
                # filter_by_name应该在agg_data的第一层
                filter_by_name_agg = agg_data.get('filter_by_name')
                
                if filter_by_name_agg:
                    value_counts = filter_by_name_agg.get('value_counts', {})
                    
                    values = []
                    if 'buckets' in value_counts and value_counts['buckets']:
                        for value_bucket in value_counts['buckets']:
                            value = FacetValue(
                                value=value_bucket['key'],
                                label=str(value_bucket['key']),
                                count=value_bucket['doc_count'],
                                selected=False
                            )
                            values.append(value)
                    
                    # 创建分面结果
                    facet = FacetResult(
                        field=f"specifications.{name}",
                        label=str(name),
                        type="terms",
                        values=values,
                        total_count=filter_by_name_agg.get('doc_count', 0)
                    )
                    facets.append(facet)
                continue
            
            # Handle terms aggregation
            if 'buckets' in agg_data:
                values = []
                for bucket in agg_data['buckets']:
                    value = FacetValue(
                        value=bucket['key'],
                        label=bucket.get('key_as_string', str(bucket['key'])),
                        count=bucket['doc_count'],
                        selected=False
                    )
                    values.append(value)

                facet = FacetResult(
                    field=display_field,
                    label=display_field,  # Can be enhanced with field labels
                    type="terms",
                    values=values,
                    total_count=agg_data.get('sum_other_doc_count', 0) + len(values)
                )
                facets.append(facet)

            # Handle range aggregation
            elif 'buckets' in agg_data and any('from' in b or 'to' in b for b in agg_data['buckets']):
                values = []
                for bucket in agg_data['buckets']:
                    range_key = bucket.get('key', '')
                    value = FacetValue(
                        value=range_key,
                        label=range_key,
                        count=bucket['doc_count'],
                        selected=False
                    )
                    values.append(value)

                facet = FacetResult(
                    field=display_field,
                    label=display_field,
                    type="range",
                    values=values
                )
                facets.append(facet)

        return facets

    @staticmethod
    def generate_suggestions(
        query: str,
        results: List[SpuResult]
    ) -> List[str]:
        """
        Generate search suggestions.

        Args:
            query: Original search query
            results: Search results

        Returns:
            List of suggestion strings (currently returns empty list)
        """
        # TODO: Implement suggestion generation logic
        return []

    @staticmethod
    def generate_related_searches(
        query: str,
        results: List[SpuResult]
    ) -> List[str]:
        """
        Generate related searches.

        Args:
            query: Original search query
            results: Search results

        Returns:
            List of related search strings (currently returns empty list)
        """
        # TODO: Implement related search generation logic
        return []