""" SKU selection for style-intent-aware and image-aware search results. Unified algorithm (one pass per hit, no cascading fallback stages): 1. Per active style intent, a SKU's attribute value for that dimension comes from ONE of two sources, in priority order: - ``option``: the SKU's own ``optionN_value`` on the slot resolved by the intent's dimension aliases — authoritative whenever non-empty. - ``taxonomy``: the SPU-level ``enriched_taxonomy_attributes`` value on the same dimension — used only when the SKU has no own value (slot unresolved or value empty). Never overrides a contradicting SKU-level value. 2. A SKU is "text-matched" iff every active intent finds a match on its selected value source (tokens of zh/en/attribute synonyms; values are first passed through ``_with_segment_boundaries_for_matching`` so brackets and common separators split segments; pure-CJK terms still use a substring fallback when the value is one undivided CJK run, e.g. ``卡其色棉``). We remember the matching source and the raw matched text per intent so the final decision can surface it. 3. The image-pick comes straight from the nested ``image_embedding`` inner_hits (``exact_image_knn_query_hits`` preferred, ``image_knn_query_hits`` otherwise): the SKU whose ``image_src`` equals the top-scoring url. 4. Unified selection: - if the text-matched set is non-empty → pick image_pick when it lies in that set (visual tie-break among text-matched), otherwise the first text-matched SKU; - else → pick image_pick if any; - else → no decision (``final_source == "none"``). ``final_source`` values (weakest → strongest text evidence, reversed): ``option`` > ``taxonomy`` > ``image`` > ``none``. If any intent was satisfied only via taxonomy the overall source degrades to ``taxonomy`` so downstream callers can decide whether to differentiate the SPU-level signal from a true SKU-level option match. No embedding fallback, no stage cascade, no score thresholds. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional, Tuple import posixpath from urllib.parse import unquote, urlsplit from query.style_intent import ( DetectedStyleIntent, StyleIntentProfile, StyleIntentRegistry, ) from query.tokenization import ( contains_han_text, normalize_query_text, simple_tokenize_query, ) import re _NON_HAN_RE = re.compile(r"[^\u4e00-\u9fff]") # Zero-width / BOM (often pasted from Excel or CMS). _ZW_AND_BOM_RE = re.compile(r"[\u200b-\u200d\ufeff\u2060]") # Brackets, slashes, and common commerce/list punctuation → segment boundaries so # tokenization can align intent terms (e.g. 卡其色) with the leading segment of # 卡其色(无内衬) / 卡其色/常规 / 卡其色·麻 等,without relying only on substring. _ATTRIBUTE_BOUNDARY_RE = re.compile( r"[\s\u3000]" # ASCII / ideographic space r"|[\(\)\[\]\{\}()【】{}〈〉《》「」『』[]「」]" r"|[/\\||/\︱丨]" r"|[,,、;;::.。]" r"|[·•・]" r"|[~~]" r"|[+\=#%&*×※]" r"|[\u2010-\u2015\u2212]" # hyphen, en dash, minus, etc. ) def _is_pure_han(value: str) -> bool: """True if the string is non-empty and contains only CJK Unified Ideographs.""" return bool(value) and not _NON_HAN_RE.search(value) def _with_segment_boundaries_for_matching(normalized_value: str) -> str: """Normalize commerce-style option/taxonomy strings for token matching. Inserts word boundaries at brackets and typical separators so ``simple_tokenize_query`` yields segments like ``['卡其色', '无内衬']`` instead of one undifferentiated CJK blob when unusual punctuation appears. """ if not normalized_value: return "" s = _ZW_AND_BOM_RE.sub("", normalized_value) s = _ATTRIBUTE_BOUNDARY_RE.sub(" ", s) return " ".join(s.split()) _IMAGE_INNER_HITS_KEYS: Tuple[str, ...] = ( "exact_image_knn_query_hits", "image_knn_query_hits", ) @dataclass(frozen=True) class ImagePick: sku_id: str url: str score: float @dataclass(frozen=True) class SkuSelectionDecision: selected_sku_id: Optional[str] rerank_suffix: str selected_text: str # "option" | "taxonomy" | "image" | "none" final_source: str resolved_dimensions: Dict[str, Optional[str]] = field(default_factory=dict) # Per-intent matching-source breakdown, e.g. {"color": "option", "size": "taxonomy"}. matched_sources: Dict[str, str] = field(default_factory=dict) image_pick_sku_id: Optional[str] = None image_pick_url: Optional[str] = None image_pick_score: Optional[float] = None # Backward-compat alias; some older callers/tests look at ``matched_stage``. @property def matched_stage(self) -> str: return self.final_source def to_dict(self) -> Dict[str, Any]: return { "selected_sku_id": self.selected_sku_id, "rerank_suffix": self.rerank_suffix, "selected_text": self.selected_text, "final_source": self.final_source, "matched_sources": dict(self.matched_sources), "resolved_dimensions": dict(self.resolved_dimensions), "image_pick": ( { "sku_id": self.image_pick_sku_id, "url": self.image_pick_url, "score": self.image_pick_score, } if self.image_pick_sku_id or self.image_pick_url else None ), } @dataclass class _SelectionContext: """Request-scoped memo for term tokenization and substring match probes.""" terms_by_intent: Dict[str, Tuple[str, ...]] normalized_text_cache: Dict[str, str] = field(default_factory=dict) tokenized_text_cache: Dict[str, Tuple[str, ...]] = field(default_factory=dict) text_match_cache: Dict[Tuple[str, str], bool] = field(default_factory=dict) class StyleSkuSelector: """Selects the best SKU per hit from style-intent text match + image KNN.""" def __init__( self, registry: StyleIntentRegistry, *, text_encoder_getter: Optional[Callable[[], Any]] = None, ) -> None: self.registry = registry # Retained for API back-compat; no longer used now that embedding fallback is gone. self._text_encoder_getter = text_encoder_getter # ------------------------------------------------------------------ # Public entry points # ------------------------------------------------------------------ def prepare_hits( self, es_hits: List[Dict[str, Any]], parsed_query: Any, ) -> Dict[str, SkuSelectionDecision]: """Compute selection decisions (without mutating ``_source``). Runs if either a style intent is active OR any hit carries image inner_hits. Decisions are keyed by ES ``_id`` and meant to be applied later via :meth:`apply_precomputed_decisions` (after page fill). """ decisions: Dict[str, SkuSelectionDecision] = {} style_profile = getattr(parsed_query, "style_intent_profile", None) style_active = ( isinstance(style_profile, StyleIntentProfile) and style_profile.is_active ) selection_context = ( self._build_selection_context(style_profile) if style_active else None ) for hit in es_hits: source = hit.get("_source") if not isinstance(source, dict): continue image_pick = self._pick_sku_by_image(hit, source) if not style_active and image_pick is None: # Nothing to do for this hit. continue decision = self._select( source=source, style_profile=style_profile if style_active else None, selection_context=selection_context, image_pick=image_pick, ) if decision is None: continue if decision.rerank_suffix: hit["_style_rerank_suffix"] = decision.rerank_suffix else: hit.pop("_style_rerank_suffix", None) doc_id = hit.get("_id") if doc_id is not None: decisions[str(doc_id)] = decision return decisions def apply_precomputed_decisions( self, es_hits: List[Dict[str, Any]], decisions: Dict[str, SkuSelectionDecision], ) -> None: if not es_hits or not decisions: return for hit in es_hits: doc_id = hit.get("_id") if doc_id is None: continue decision = decisions.get(str(doc_id)) if decision is None: continue source = hit.get("_source") if not isinstance(source, dict): continue self._apply_decision_to_source(source, decision) if decision.rerank_suffix: hit["_style_rerank_suffix"] = decision.rerank_suffix else: hit.pop("_style_rerank_suffix", None) # ------------------------------------------------------------------ # Selection context & text matching # ------------------------------------------------------------------ def _build_selection_context( self, style_profile: StyleIntentProfile, ) -> _SelectionContext: terms_by_intent: Dict[str, List[str]] = {} for intent in style_profile.intents: terms = terms_by_intent.setdefault(intent.intent_type, []) for raw_term in intent.matching_terms: normalized_term = normalize_query_text(raw_term) if normalized_term and normalized_term not in terms: terms.append(normalized_term) return _SelectionContext( terms_by_intent={ intent_type: tuple(terms) for intent_type, terms in terms_by_intent.items() }, ) def _normalize_cached(self, ctx: _SelectionContext, value: Any) -> str: raw = str(value or "").strip() if not raw: return "" cached = ctx.normalized_text_cache.get(raw) if cached is not None: return cached normalized = normalize_query_text(raw) ctx.normalized_text_cache[raw] = normalized return normalized def _tokenize_cached(self, ctx: _SelectionContext, value: str) -> Tuple[str, ...]: normalized_value = normalize_query_text(value) if not normalized_value: return () cached = ctx.tokenized_text_cache.get(normalized_value) if cached is not None: return cached tokens = tuple( normalize_query_text(token) for token in simple_tokenize_query(normalized_value) if token ) ctx.tokenized_text_cache[normalized_value] = tokens return tokens def _is_text_match( self, intent_type: str, ctx: _SelectionContext, *, normalized_value: str, ) -> bool: """True iff any intent term token-boundary matches the given value.""" if not normalized_value: return False cache_key = (intent_type, normalized_value) cached = ctx.text_match_cache.get(cache_key) if cached is not None: return cached terms = ctx.terms_by_intent.get(intent_type, ()) segmented = _with_segment_boundaries_for_matching(normalized_value) value_tokens = self._tokenize_cached(ctx, segmented) matched = any( self._matches_term_tokens( term=term, value_tokens=value_tokens, ctx=ctx, normalized_value=normalized_value, ) for term in terms if term ) ctx.text_match_cache[cache_key] = matched return matched def _matches_term_tokens( self, *, term: str, value_tokens: Tuple[str, ...], ctx: _SelectionContext, normalized_value: str, ) -> bool: normalized_term = normalize_query_text(term) if not normalized_term: return False if normalized_term == normalized_value: return True # Pure-CJK terms can't be split further by the whitespace/regex tokenizer # ("卡其色棉" is one token), so sliding-window token match would miss the prefix. # Fall back to normalized substring containment — safe because this branch # never triggers for Latin tokens where substring would cause "l" ⊂ "xl" issues. if _is_pure_han(normalized_term) and contains_han_text(normalized_value): return normalized_term in normalized_value term_tokens = self._tokenize_cached(ctx, normalized_term) if not term_tokens or not value_tokens: return normalized_term in normalized_value term_length = len(term_tokens) value_length = len(value_tokens) if term_length > value_length: return False for start in range(value_length - term_length + 1): if value_tokens[start : start + term_length] == term_tokens: return True return False # ------------------------------------------------------------------ # Dimension resolution (option slot + taxonomy values) # ------------------------------------------------------------------ def _resolve_dimensions( self, source: Dict[str, Any], style_profile: StyleIntentProfile, ) -> Dict[str, Optional[str]]: option_fields = ( ("option1_value", source.get("option1_name")), ("option2_value", source.get("option2_name")), ("option3_value", source.get("option3_name")), ) option_aliases = [ (field_name, normalize_query_text(name)) for field_name, name in option_fields ] resolved: Dict[str, Optional[str]] = {} for intent in style_profile.intents: if intent.intent_type in resolved: continue aliases = set( intent.dimension_aliases or self.registry.get_dimension_aliases(intent.intent_type) ) matched_field: Optional[str] = None for field_name, option_name in option_aliases: if option_name and option_name in aliases: matched_field = field_name break resolved[intent.intent_type] = matched_field return resolved def _collect_taxonomy_values( self, source: Dict[str, Any], style_profile: StyleIntentProfile, ) -> Dict[str, Tuple[Tuple[str, str], ...]]: """Extract SPU-level enriched_taxonomy_attributes values per intent dimension. Returns a mapping ``intent_type -> ((normalized, raw), ...)`` so the selection layer can (a) match against ``normalized`` and (b) surface the human-readable ``raw`` form in ``selected_text``. """ attrs = source.get("enriched_taxonomy_attributes") if not isinstance(attrs, list) or not attrs: return {} aliases_by_intent = { intent.intent_type: set( intent.dimension_aliases or self.registry.get_dimension_aliases(intent.intent_type) ) for intent in style_profile.intents } values_by_intent: Dict[str, List[Tuple[str, str]]] = { t: [] for t in aliases_by_intent } for attr in attrs: if not isinstance(attr, dict): continue attr_name = normalize_query_text(attr.get("name")) if not attr_name: continue matching_intents = [ t for t, aliases in aliases_by_intent.items() if attr_name in aliases ] if not matching_intents: continue for raw_text in _iter_multilingual_texts(attr.get("value")): raw = str(raw_text).strip() if not raw: continue normalized = normalize_query_text(raw) if not normalized: continue for intent_type in matching_intents: bucket = values_by_intent[intent_type] if not any(existing_norm == normalized for existing_norm, _ in bucket): bucket.append((normalized, raw)) return {t: tuple(v) for t, v in values_by_intent.items() if v} # ------------------------------------------------------------------ # Image pick # ------------------------------------------------------------------ @staticmethod def _normalize_url(url: Any) -> str: """host + path, no query/fragment; casefolded — primary equality key.""" raw = str(url or "").strip() if not raw: return "" # Accept protocol-relative URLs like "//cdn/..." or full URLs. if raw.startswith("//"): raw = "https:" + raw try: parts = urlsplit(raw) except ValueError: return str(url).strip().casefold() host = (parts.netloc or "").casefold() path = unquote(parts.path or "") return f"{host}{path}".casefold() @staticmethod def _normalize_path_only(url: Any) -> str: """Path-only key for cross-CDN / host-alias cases.""" raw = str(url or "").strip() if not raw: return "" if raw.startswith("//"): raw = "https:" + raw try: parts = urlsplit(raw) path = unquote(parts.path or "") except ValueError: return "" return path.casefold().rstrip("/") @classmethod def _url_filename(cls, url: Any) -> str: p = cls._normalize_path_only(url) if not p: return "" return posixpath.basename(p).casefold() @classmethod def _urls_equivalent(cls, a: Any, b: Any) -> bool: if not a or not b: return False na, nb = cls._normalize_url(a), cls._normalize_url(b) if na and nb and na == nb: return True pa, pb = cls._normalize_path_only(a), cls._normalize_path_only(b) if pa and pb and pa == pb: return True fa, fb = cls._url_filename(a), cls._url_filename(b) if fa and fb and fa == fb and len(fa) > 4: return True return False @staticmethod def _inner_hit_url_candidates(entry: Dict[str, Any], source: Dict[str, Any]) -> List[str]: """URLs to try for this inner_hit: _source.url plus image_embedding[offset].url.""" out: List[str] = [] src = entry.get("_source") or {} u = src.get("url") if u: out.append(str(u).strip()) nested = entry.get("_nested") if not isinstance(nested, dict): return out off = nested.get("offset") if not isinstance(off, int): return out embs = source.get("image_embedding") if not isinstance(embs, list) or not (0 <= off < len(embs)): return out emb = embs[off] if isinstance(emb, dict) and emb.get("url"): u2 = str(emb.get("url")).strip() if u2 and u2 not in out: out.append(u2) return out def _pick_sku_by_image( self, hit: Dict[str, Any], source: Dict[str, Any], ) -> Optional[ImagePick]: """Map ES nested image KNN inner_hits to a SKU via image URL alignment. ``image_pick`` is empty when: - ES did not return ``inner_hits`` for this hit (e.g. doc outside ``rescore.window_size`` so no exact-image rescore inner_hits; or the nested image clause did not match this document). - The winning nested ``url`` cannot be aligned to any ``skus[].image_src`` even after path/filename normalization (rare CDN / encoding edge cases). We try ``_source.url``, ``_nested.offset`` + ``image_embedding[offset].url``, and loose path/filename matching to reduce false negatives. """ inner_hits = hit.get("inner_hits") if not isinstance(inner_hits, dict): return None best_entry: Optional[Dict[str, Any]] = None top_score: Optional[float] = None for key in _IMAGE_INNER_HITS_KEYS: payload = inner_hits.get(key) if not isinstance(payload, dict): continue hits_block = payload.get("hits") inner_list = hits_block.get("hits") if isinstance(hits_block, dict) else None if not isinstance(inner_list, list) or not inner_list: continue for entry in inner_list: if not isinstance(entry, dict): continue if not self._inner_hit_url_candidates(entry, source): continue try: score = float(entry.get("_score") or 0.0) except (TypeError, ValueError): score = 0.0 if top_score is None or score > top_score: best_entry = entry top_score = score if best_entry is not None: break # Prefer exact_image_knn_query_hits over image_knn_query_hits. if best_entry is None: return None candidates = self._inner_hit_url_candidates(best_entry, source) if not candidates: return None skus = source.get("skus") if not isinstance(skus, list): return None for sku in skus: sku_raw = sku.get("image_src") or sku.get("imageSrc") for cand in candidates: if self._urls_equivalent(cand, sku_raw): return ImagePick( sku_id=str(sku.get("sku_id") or ""), url=cand, score=float(top_score or 0.0), ) return None # ------------------------------------------------------------------ # Unified per-hit selection # ------------------------------------------------------------------ def _select( self, *, source: Dict[str, Any], style_profile: Optional[StyleIntentProfile], selection_context: Optional[_SelectionContext], image_pick: Optional[ImagePick], ) -> Optional[SkuSelectionDecision]: skus = source.get("skus") if not isinstance(skus, list) or not skus: return None resolved_dimensions: Dict[str, Optional[str]] = {} text_matched: List[Tuple[Dict[str, Any], Dict[str, Tuple[str, str]]]] = [] if style_profile is not None and selection_context is not None: resolved_dimensions = self._resolve_dimensions(source, style_profile) taxonomy_values = self._collect_taxonomy_values(source, style_profile) # Only attempt text match when there is at least one value source # per intent (SKU option or SPU taxonomy). if all( resolved_dimensions.get(intent.intent_type) is not None or taxonomy_values.get(intent.intent_type) for intent in style_profile.intents ): text_matched = self._find_text_matched_skus( skus=skus, style_profile=style_profile, resolved_dimensions=resolved_dimensions, taxonomy_values=taxonomy_values, ctx=selection_context, ) selected_sku_id: Optional[str] = None selected_text = "" final_source = "none" matched_sources: Dict[str, str] = {} if text_matched: chosen_sku, per_intent = self._choose_among_text_matched( text_matched, image_pick ) selected_sku_id = str(chosen_sku.get("sku_id") or "") or None selected_text = self._text_from_matches(per_intent) matched_sources = { intent_type: src for intent_type, (src, _) in per_intent.items() } final_source = ( "taxonomy" if "taxonomy" in matched_sources.values() else "option" ) elif image_pick is not None: image_sku = self._find_sku_by_id(skus, image_pick.sku_id) if image_sku is not None: selected_sku_id = image_pick.sku_id or None selected_text = self._build_selected_text(image_sku, resolved_dimensions) final_source = "image" return SkuSelectionDecision( selected_sku_id=selected_sku_id, rerank_suffix=selected_text, selected_text=selected_text, final_source=final_source, resolved_dimensions=resolved_dimensions, matched_sources=matched_sources, image_pick_sku_id=(image_pick.sku_id or None) if image_pick else None, image_pick_url=image_pick.url if image_pick else None, image_pick_score=image_pick.score if image_pick else None, ) def _find_text_matched_skus( self, *, skus: List[Dict[str, Any]], style_profile: StyleIntentProfile, resolved_dimensions: Dict[str, Optional[str]], taxonomy_values: Dict[str, Tuple[Tuple[str, str], ...]], ctx: _SelectionContext, ) -> List[Tuple[Dict[str, Any], Dict[str, Tuple[str, str]]]]: """Return every SKU that satisfies every active intent, with match meta. Authority rule per intent: - If the SKU has a non-empty value on the resolved option slot, that value ALONE decides the match (source = ``option``). Taxonomy cannot override a contradicting SKU-level value. - Only when the SKU has no own value on the dimension (slot unresolved or value empty) does the SPU-level taxonomy serve as the fallback value source (source = ``taxonomy``). For each matched SKU we also return a per-intent dict mapping ``intent_type -> (source, raw_matched_text)`` so the final decision can surface the genuinely matched string in ``selected_text`` / ``rerank_suffix`` rather than, e.g., a SKU's unrelated option value. """ matched: List[Tuple[Dict[str, Any], Dict[str, Tuple[str, str]]]] = [] for sku in skus: per_intent: Dict[str, Tuple[str, str]] = {} all_ok = True for intent in style_profile.intents: slot = resolved_dimensions.get(intent.intent_type) sku_raw = str(sku.get(slot) or "").strip() if slot else "" sku_norm = normalize_query_text(sku_raw) if sku_raw else "" if sku_norm: if self._is_text_match( intent.intent_type, ctx, normalized_value=sku_norm ): per_intent[intent.intent_type] = ("option", sku_raw) else: all_ok = False break else: matched_raw: Optional[str] = None for tax_norm, tax_raw in taxonomy_values.get( intent.intent_type, () ): if self._is_text_match( intent.intent_type, ctx, normalized_value=tax_norm ): matched_raw = tax_raw break if matched_raw is None: all_ok = False break per_intent[intent.intent_type] = ("taxonomy", matched_raw) if all_ok: matched.append((sku, per_intent)) return matched @staticmethod def _choose_among_text_matched( text_matched: List[Tuple[Dict[str, Any], Dict[str, Tuple[str, str]]]], image_pick: Optional[ImagePick], ) -> Tuple[Dict[str, Any], Dict[str, Tuple[str, str]]]: """Image-visual tie-break inside the text-matched set; else first match.""" if image_pick and image_pick.sku_id: for sku, per_intent in text_matched: if str(sku.get("sku_id") or "") == image_pick.sku_id: return sku, per_intent return text_matched[0] @staticmethod def _text_from_matches(per_intent: Dict[str, Tuple[str, str]]) -> str: """Join the genuinely matched raw strings in intent declaration order.""" parts: List[str] = [] seen: set[str] = set() for _, raw in per_intent.values(): if raw and raw not in seen: seen.add(raw) parts.append(raw) return " ".join(parts).strip() @staticmethod def _find_sku_by_id( skus: List[Dict[str, Any]], sku_id: Optional[str] ) -> Optional[Dict[str, Any]]: if not sku_id: return None for sku in skus: if str(sku.get("sku_id") or "") == sku_id: return sku return None @staticmethod def _build_selected_text( sku: Dict[str, Any], resolved_dimensions: Dict[str, Optional[str]], ) -> str: """Text carried into rerank doc suffix: joined raw values on the resolved slots.""" parts: List[str] = [] seen: set[str] = set() for slot in resolved_dimensions.values(): if not slot: continue raw = str(sku.get(slot) or "").strip() if raw and raw not in seen: seen.add(raw) parts.append(raw) return " ".join(parts).strip() # ------------------------------------------------------------------ # Source mutation (applied after page fill) # ------------------------------------------------------------------ @staticmethod def _apply_decision_to_source( source: Dict[str, Any], decision: SkuSelectionDecision ) -> None: if not decision.selected_sku_id: return skus = source.get("skus") if not isinstance(skus, list) or not skus: return selected_index: Optional[int] = None for index, sku in enumerate(skus): if str(sku.get("sku_id") or "") == decision.selected_sku_id: selected_index = index break if selected_index is None: return selected_sku = skus.pop(selected_index) skus.insert(0, selected_sku) image_src = selected_sku.get("image_src") or selected_sku.get("imageSrc") if image_src: source["image_url"] = image_src def _iter_multilingual_texts(value: Any) -> List[str]: """Flatten a value that may be str, list, or multilingual dict {zh, en, ...}.""" if value is None: return [] if isinstance(value, str): return [value] if value.strip() else [] if isinstance(value, dict): out: List[str] = [] for v in value.values(): out.extend(_iter_multilingual_texts(v)) return out if isinstance(value, (list, tuple)): out = [] for v in value: out.extend(_iter_multilingual_texts(v)) return out return []