# result_formatter.py
"""
Result formatter for converting ES internal format to external-friendly format.
"""

from typing import List, Dict, Any, Optional
from .models import SpuResult, SkuResult, FacetResult, FacetValue


class ResultFormatter:
    """Formats ES search results to external-friendly format."""

    @staticmethod
    def format_search_results(
        es_hits: List[Dict[str, Any]],
        max_score: float = 1.0,
        language: str = "zh",
        sku_filter_dimension: Optional[List[str]] = None
    ) -> List[SpuResult]:
        """
        Convert ES hits to SpuResult list.

        Args:
            es_hits: List of ES hit dictionaries (with _id, _score, _source)
            max_score: Maximum score (unused, kept for compatibility)

        Returns:
            List of SpuResult objects
        """
        results = []
        lang = (language or "zh").lower()
        if lang not in ("zh", "en"):
            lang = "en"

        def pick_lang_field(src: Dict[str, Any], base: str) -> Optional[str]:
            """从 *_zh / *_en 字段中按语言选择一个值,若目标语言缺失则回退到另一种。"""
            zh_val = src.get(f"{base}_zh")
            en_val = src.get(f"{base}_en")
            if lang == "zh":
                return zh_val or en_val
            else:
                return en_val or zh_val

        for hit in es_hits:
            source = hit.get('_source', {})
            score = hit.get('_score')
            
            # Use original ES score directly (no normalization)
            # Handle None score (can happen with certain query types or when score is explicitly null)
            if score is None:
                relevance_score = 0.0
            else:
                try:
                    relevance_score = float(score)
                except (ValueError, TypeError):
                    relevance_score = 0.0

            # Multi-language fields
            title = pick_lang_field(source, "title")
            brief = pick_lang_field(source, "brief")
            description = pick_lang_field(source, "description")
            vendor = pick_lang_field(source, "vendor")
            category_path = pick_lang_field(source, "category_path")
            category_name = pick_lang_field(source, "category_name")

            # Extract SKUs
            skus = []
            skus_data = source.get('skus', [])
            if isinstance(skus_data, list):
                for sku_entry in skus_data:
                    sku = SkuResult(
                        sku_id=str(sku_entry.get('sku_id', '')),
                        title=sku_entry.get('title'),
                        price=sku_entry.get('price'),
                        compare_at_price=sku_entry.get('compare_at_price'),
                        sku=sku_entry.get('sku'),
                        sku_code=sku_entry.get('sku_code'),
                        stock=sku_entry.get('stock', 0),
                        weight=sku_entry.get('weight'),
                        weight_unit=sku_entry.get('weight_unit'),
                        option1_value=sku_entry.get('option1_value'),
                        option2_value=sku_entry.get('option2_value'),
                        option3_value=sku_entry.get('option3_value'),
                        image_src=sku_entry.get('image_src'),
                        options=sku_entry.get('options')
                    )
                    skus.append(sku)

            # Apply SKU filtering if dimension list is specified
            if sku_filter_dimension and skus:
                skus = ResultFormatter._filter_skus_by_dimensions(
                    skus,
                    sku_filter_dimension,
                    source.get('option1_name'),
                    source.get('option2_name'),
                    source.get('option3_name'),
                    source.get('specifications', [])
                )

            # Determine in_stock (any sku has stock > 0)
            in_stock = any(sku.stock > 0 for sku in skus) if skus else True

            # Build SpuResult
            spu = SpuResult(
                spu_id=str(source.get('spu_id', '')),
                title=title,
                brief=brief,
                handle=source.get('handle'),
                description=description,
                vendor=vendor,
                category=category_name,
                category_path=category_path,
                category_name=category_name,
                category_id=source.get('category_id'),
                category_level=source.get('category_level'),
                category1_name=source.get('category1_name'),
                category2_name=source.get('category2_name'),
                category3_name=source.get('category3_name'),
                tags=source.get('tags'),
                price=source.get('min_price'),
                compare_at_price=source.get('compare_at_price'),
                currency="USD",  # Default currency
                image_url=source.get('image_url'),
                in_stock=in_stock,
                sku_prices=source.get('sku_prices'),
                sku_weights=source.get('sku_weights'),
                sku_weight_units=source.get('sku_weight_units'),
                total_inventory=source.get('total_inventory'),
                option1_name=source.get('option1_name'),
                option2_name=source.get('option2_name'),
                option3_name=source.get('option3_name'),
                specifications=source.get('specifications'),
                skus=skus,
                relevance_score=relevance_score
            )

            results.append(spu)

        return results

    @staticmethod
    def _filter_skus_by_dimensions(
        skus: List[SkuResult],
        dimensions: List[str],
        option1_name: Optional[str] = None,
        option2_name: Optional[str] = None,
        option3_name: Optional[str] = None,
        specifications: Optional[List[Dict[str, Any]]] = None
    ) -> List[SkuResult]:
        """
        Filter SKUs by one or more dimensions, keeping only one SKU per dimension value combination.
        
        Args:
            skus: List of SKU results to filter
            dimensions: Filter dimensions, each dimension can be:
                - 'option1', 'option2', 'option3': Direct option field
                - A specification/option name (e.g., 'color', 'size'): Match by option name
            option1_name: Name of option1 (e.g., 'color')
            option2_name: Name of option2 (e.g., 'size')
            option3_name: Name of option3
            specifications: List of specifications (for reference)
            
        Returns:
            Filtered list of SKUs (one per dimension value)
        """
        if not skus or not dimensions:
            return skus

        # Resolve each dimension to an underlying SKU field (option1_value / option2_value / option3_value)
        filter_fields: List[str] = []

        for dim in dimensions:
            if not dim:
                continue
            dim_lower = dim.lower()

            field_name: Optional[str] = None
            # Direct option field (option1, option2, option3)
            if dim_lower == 'option1':
                field_name = 'option1_value'
            elif dim_lower == 'option2':
                field_name = 'option2_value'
            elif dim_lower == 'option3':
                field_name = 'option3_value'
            else:
                # Try to match by option name
                if option1_name and option1_name.lower() == dim_lower:
                    field_name = 'option1_value'
                elif option2_name and option2_name.lower() == dim_lower:
                    field_name = 'option2_value'
                elif option3_name and option3_name.lower() == dim_lower:
                    field_name = 'option3_value'

            if field_name and field_name not in filter_fields:
                filter_fields.append(field_name)

        # If no matching field found for all dimensions, do not return any child SKUs
        if not filter_fields:
            return []

        # Group SKUs by dimension value combination and select first one from each group
        dimension_groups: Dict[tuple, SkuResult] = {}

        for sku in skus:
            # Build key as combination of all dimension values
            key_values: List[str] = []
            for field in filter_fields:
                dimension_value = getattr(sku, field, None)
                # Use empty string as key part for None values
                key_values.append(str(dimension_value) if dimension_value is not None else '')

            key = tuple(key_values)

            # Keep first SKU for each dimension combination
            if key not in dimension_groups:
                dimension_groups[key] = sku

        # Return filtered SKUs (one per dimension combination)
        return list(dimension_groups.values())

    @staticmethod
    def format_facets(
        es_aggregations: Dict[str, Any],
        facet_configs: Optional[List[Any]] = None,
        current_filters: Optional[Dict[str, Any]] = None
    ) -> List[FacetResult]:
        """
        Format ES aggregations to FacetResult list with selected state.

        支持:
        1. 普通terms聚合
        2. range聚合
        3. specifications嵌套聚合(按name分组,然后按value聚合)
        4. 标记selected状态(基于current_filters)

        Args:
            es_aggregations: ES aggregations response
            facet_configs: Facet configurations (optional)
            current_filters: Current applied filters (used to mark selected values)

        Returns:
            List of FacetResult objects with selected states
        """
        facets = []
        
        # Build a set of selected values for specifications
        selected_specs = set()
        if current_filters and 'specifications' in current_filters:
            specs = current_filters['specifications']
            if isinstance(specs, list):
                # [{"name": "颜色", "value": "白色"}, ...]
                for spec in specs:
                    if isinstance(spec, dict):
                        selected_specs.add((spec.get('name'), spec.get('value')))
            elif isinstance(specs, dict):
                # {"name": "颜色", "value": "白色"}
                selected_specs.add((specs.get('name'), specs.get('value')))

        for field_name, agg_data in es_aggregations.items():
            display_field = field_name[:-6] if field_name.endswith("_facet") else field_name
            
            # 处理specifications嵌套分面(所有name)
            if field_name == "specifications_facet" and 'by_name' in agg_data:
                # specifications嵌套聚合:按name分组,每个name下有value_counts
                by_name_agg = agg_data['by_name']
                if 'buckets' in by_name_agg:
                    for name_bucket in by_name_agg['buckets']:
                        name = name_bucket['key']
                        value_counts = name_bucket.get('value_counts', {})
                        
                        values = []
                        if 'buckets' in value_counts:
                            for value_bucket in value_counts['buckets']:
                                # Check if this spec value is selected
                                is_selected = (name, value_bucket['key']) in selected_specs
                                value = FacetValue(
                                    value=value_bucket['key'],
                                    label=str(value_bucket['key']),
                                    count=value_bucket['doc_count'],
                                    selected=is_selected
                                )
                                values.append(value)
                        
                        # 为每个name创建一个分面结果
                        facet = FacetResult(
                            field=f"specifications.{name}",
                            label=str(name),  # 使用name作为label,如"颜色"、"尺寸"
                            type="terms",
                            values=values,
                            total_count=name_bucket['doc_count']
                        )
                        facets.append(facet)
                continue
            
            # 处理specifications嵌套分面(指定name,如 specifications.color)
            if field_name.startswith("specifications_") and field_name.endswith("_facet"):
                # 提取name(从 "specifications_color_facet" 提取 "color")
                name = field_name[len("specifications_"):-len("_facet")]
                
                # ES nested聚合返回结构: { "doc_count": N, "filter_by_name": { ... } }
                # filter_by_name应该在agg_data的第一层
                filter_by_name_agg = agg_data.get('filter_by_name')
                
                if filter_by_name_agg:
                    value_counts = filter_by_name_agg.get('value_counts', {})
                    
                    values = []
                    if 'buckets' in value_counts and value_counts['buckets']:
                        for value_bucket in value_counts['buckets']:
                            # Check if this spec value is selected
                            is_selected = (name, value_bucket['key']) in selected_specs
                            value = FacetValue(
                                value=value_bucket['key'],
                                label=str(value_bucket['key']),
                                count=value_bucket['doc_count'],
                                selected=is_selected
                            )
                            values.append(value)
                    
                    # 创建分面结果
                    facet = FacetResult(
                        field=f"specifications.{name}",
                        label=str(name),
                        type="terms",
                        values=values,
                        total_count=filter_by_name_agg.get('doc_count', 0)
                    )
                    facets.append(facet)
                continue
            
            # Handle terms aggregation
            if 'buckets' in agg_data:
                values = []
                for bucket in agg_data['buckets']:
                    # Check if this value is selected in current filters
                    is_selected = False
                    if current_filters and display_field in current_filters:
                        filter_value = current_filters[display_field]
                        if isinstance(filter_value, list):
                            is_selected = bucket['key'] in filter_value
                        else:
                            is_selected = bucket['key'] == filter_value
                    
                    value = FacetValue(
                        value=bucket['key'],
                        label=bucket.get('key_as_string', str(bucket['key'])),
                        count=bucket['doc_count'],
                        selected=is_selected
                    )
                    values.append(value)

                facet = FacetResult(
                    field=display_field,
                    label=display_field,  # Can be enhanced with field labels
                    type="terms",
                    values=values,
                    total_count=agg_data.get('sum_other_doc_count', 0) + len(values)
                )
                facets.append(facet)

            # Handle range aggregation
            elif 'buckets' in agg_data and any('from' in b or 'to' in b for b in agg_data['buckets']):
                values = []
                for bucket in agg_data['buckets']:
                    range_key = bucket.get('key', '')
                    # Check if this range is selected
                    is_selected = False
                    if current_filters and display_field in current_filters:
                        filter_value = current_filters[display_field]
                        if isinstance(filter_value, list):
                            is_selected = range_key in filter_value
                        else:
                            is_selected = range_key == filter_value
                    
                    value = FacetValue(
                        value=range_key,
                        label=range_key,
                        count=bucket['doc_count'],
                        selected=is_selected
                    )
                    values.append(value)

                facet = FacetResult(
                    field=display_field,
                    label=display_field,
                    type="range",
                    values=values
                )
                facets.append(facet)

        return facets

    @staticmethod
    def generate_suggestions(
        query: str,
        results: List[SpuResult]
    ) -> List[str]:
        """
        Generate search suggestions.

        Args:
            query: Original search query
            results: Search results

        Returns:
            List of suggestion strings (currently returns empty list)
        """
        # TODO: Implement suggestion generation logic
        return []

    @staticmethod
    def generate_related_searches(
        query: str,
        results: List[SpuResult]
    ) -> List[str]:
        """
        Generate related searches.

        Args:
            query: Original search query
            results: Search results

        Returns:
            List of related search strings (currently returns empty list)
        """
        # TODO: Implement related search generation logic
        return []