#!/usr/bin/env python3 """ Offline experiment: English query bucketing (intersection / boost / drop). Scheme A: spaCy noun_chunks + head + lemma + rule buckets Scheme B: spaCy NP candidates + KeyBERT rerank → intersection vs boost Scheme C: YAKE + spaCy noun/POS filter Run (after deps): python scripts/experiments/english_query_bucketing_demo.py Optional: pip install -r scripts/experiments/requirements_query_bucketing_experiments.txt """ from __future__ import annotations import argparse import json import re import sys from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Sequence, Set, Tuple # --- shared ----------------------------------------------------------------- _POSSESSIVE_RE = re.compile(r"(['’]s)\b", re.IGNORECASE) def normalize_query(s: str) -> str: s = (s or "").strip() s = _POSSESSIVE_RE.sub("", s) return s @dataclass class BucketResult: intersection_terms: List[str] = field(default_factory=list) boost_terms: List[str] = field(default_factory=list) drop_terms: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: return { "intersection_terms": self.intersection_terms, "boost_terms": self.boost_terms, "drop_terms": self.drop_terms, } def _dedupe_preserve(seq: Sequence[str]) -> List[str]: seen: Set[str] = set() out: List[str] = [] for x in seq: k = x.strip().lower() if not k or k in seen: continue seen.add(k) out.append(x.strip()) return out # --- Scheme A: spaCy + rules ------------------------------------------------- WEAK_BOOST_ADJS = frozenset( { "best", "good", "great", "new", "free", "cheap", "top", "fine", "real", } ) FUNCTIONAL_DEP = frozenset( { "det", "aux", "auxpass", "prep", "mark", "expl", "cc", "punct", "case", } ) # Second pobj under list-like INTJ roots often encodes audience/size (boost, not must-match). 
# Audience nouns ("women", "kids", …): demoted to boost when they appear as a
# trailing pobj under a list-like INTJ root (see bucket_scheme_a_spacy).
_DEMOGRAPHIC_NOUNS = frozenset(
    {
        "women", "woman", "men", "man", "kids", "kid", "boys", "boy",
        "girls", "girl", "baby", "babies", "toddler", "adult", "adults",
    }
)


def _lemma_lower(t) -> str:
    """Lowercased, stripped lemma of a spaCy token (falls back to surface text)."""
    return ((t.lemma_ or t.text) or "").lower().strip()


def _surface_lower(t) -> str:
    """Lowercased surface form (keeps plural 'headphones' vs lemma 'headphone')."""
    return (t.text or "").lower().strip()


# Prepositions whose pobj encodes a price/range constraint ("under 500 dollars").
_PRICE_PREP_LEMMAS = frozenset({"under", "over", "below", "above", "within", "between", "near"})


def bucket_scheme_a_spacy(query: str, nlp) -> BucketResult:
    """
    Dependency-first bucketing: noun_chunks alone mis-parse verbal queries like
    "noise cancelling headphones" (ROOT verb). Prefer dobj / ROOT product nouns,
    purpose PP (for …), and brand INTJ/PROPN.
    """
    # NOTE(review): import is unused at runtime (nlp is passed in); kept as an
    # explicit dependency marker, hence the noqa.
    import spacy  # noqa: F401

    # Do not strip possessives ('s) before spaCy: it changes the parse tree
    # (e.g. "women's running shoes size 8" vs "women running shoes size 8").
    text = (query or "").strip()
    doc = nlp(text)

    # Working sets; converted to sorted, deduped lists at the end.
    intersection: Set[str] = set()
    boost: Set[str] = set()
    drop: Set[str] = set()
    # Stop words plus generic adjectives — used both for drops and to filter boosts.
    stops = nlp.Defaults.stop_words | WEAK_BOOST_ADJS

    def mark_drop(t) -> None:
        # Record a droppable token (whitespace/punctuation is ignored entirely).
        if not t.is_space and not t.is_punct:
            drop.add(t.text.lower())

    # --- Drops: function words / question words ---
    for token in doc:
        if token.is_space or token.is_punct:
            continue
        lem = _lemma_lower(token)
        if token.pos_ in ("DET", "PRON", "AUX", "ADP", "PART", "SCONJ", "CCONJ"):
            mark_drop(token)
            continue
        if token.dep_ in FUNCTIONAL_DEP:
            mark_drop(token)
            continue
        if token.pos_ == "ADV" and lem in {"where", "how", "when", "why", "what", "which"}:
            mark_drop(token)
            continue
        if token.text.lower() in ("'s", "’s"):
            mark_drop(token)
            continue
        # PROPN exemption: keep brand-like stop words ("Apple") out of drops.
        if lem in stops and token.pos_ != "PROPN":
            mark_drop(token)

    # Token indices of pobj heads that belong to purpose/price clauses; they
    # must not surface in the intersection bucket.
    pobj_heads_to_demote: Set[int] = set()

    # Purpose / context: "for airplane travel" → boost phrase; demote bare head from intersection
    for token in doc:
        if token.dep_ == "prep" and token.text.lower() == "for":
            for c in token.children:
                if c.dep_ == "pobj" and c.pos_ in ("NOUN", "PROPN"):
                    # Whole pobj subtree as one phrase (left_edge..right_edge).
                    span = doc[c.left_edge.i : c.right_edge.i + 1]
                    phrase = span.text.strip().lower()
                    if phrase:
                        boost.add(phrase)
                        pobj_heads_to_demote.add(c.i)

    # Price / range: "under 500 dollars" → boost only
    for token in doc:
        if token.dep_ != "prep" or _lemma_lower(token) not in _PRICE_PREP_LEMMAS:
            continue
        for c in token.children:
            if c.dep_ == "pobj" and c.pos_ in ("NOUN", "PROPN"):
                span = doc[c.left_edge.i : c.right_edge.i + 1]
                phrase = span.text.strip().lower()
                if phrase:
                    boost.add(phrase)
                    pobj_heads_to_demote.add(c.i)

    # Direct object product nouns (handles "noise cancelling … headphones")
    for token in doc:
        if token.dep_ == "dobj" and token.pos_ in ("NOUN", "PROPN"):
            if token.i in pobj_heads_to_demote:
                continue
            intersection.add(_surface_lower(token))

    # Copular questions / definitions: "what is the best smartphone …"
    for token in doc:
        if token.dep_ != "nsubj" or token.pos_ not in ("NOUN", "PROPN"):
            continue
        h = token.head
        if h.pos_ == "AUX" and h.dep_ == "ROOT":
            intersection.add(_surface_lower(token))

    # Verbal ROOT: modifiers left of dobj → boost phrase (e.g. "noise cancelling")
    roots = [t for t in doc if t.dep_ == "ROOT"]
    if roots and roots[0].pos_ == "VERB":
        # NOTE(review): root_v is assigned but never referenced below.
        root_v = roots[0]
        for t in doc:
            if t.dep_ != "dobj" or t.pos_ not in ("NOUN", "PROPN"):
                continue
            if t.i in pobj_heads_to_demote:
                continue
            # Collect content words strictly left of the dobj head.
            parts: List[str] = []
            for x in doc[: t.i]:
                if x.is_punct or x.is_space:
                    continue
                if x.pos_ in ("DET", "ADP", "PRON"):
                    continue
                xl = _lemma_lower(x)
                if xl in stops:
                    continue
                parts.append(x.text.lower())
            if len(parts) >= 1:
                boost.add(" ".join(parts))

    # Brand / query lead: INTJ/PROPN ROOT (e.g. Nike …)
    # NOTE(review): indentation reconstructed — the second `if` is read as a
    # sibling of the first (every PROPN token, not only ROOT, enters the
    # intersection); a nested reading would be dead code. Confirm intent.
    for token in doc:
        if token.dep_ == "ROOT" and token.pos_ in ("INTJ", "PROPN"):
            intersection.add(_surface_lower(token))
        if token.pos_ == "PROPN":
            intersection.add(_surface_lower(token))

    # Nouns that denote a measurement head rather than a product.
    _DIMENSION_ROOTS = frozenset({"size", "width", "length", "height", "weight"})

    # "women's running shoes size 8" → shoes ∩, "size 8" boost (not size alone)
    for token in doc:
        if token.dep_ != "ROOT" or token.pos_ != "NOUN":
            continue
        if _lemma_lower(token) not in _DIMENSION_ROOTS:
            continue
        for c in token.children:
            if c.dep_ == "nsubj" and c.pos_ in ("NOUN", "PROPN"):
                # The nsubj of a dimension ROOT is the real product head.
                intersection.add(_surface_lower(c))
                for ch in c.children:
                    if ch.dep_ == "compound" and ch.pos_ in ("NOUN", "VERB", "ADJ"):
                        boost.add(_surface_lower(ch))
        # Only the dimension head + numbers (not full subtree: left_edge/right_edge is huge)
        dim_parts = [token.text.lower()]
        for ch in token.children:
            if ch.dep_ == "nummod":
                dim_parts.append(ch.text.lower())
        boost.add(" ".join(dim_parts))

    # ROOT noun product (e.g. "plastic toy car")
    for token in doc:
        if token.dep_ == "ROOT" and token.pos_ in ("NOUN", "PROPN"):
            # Skip dimension ROOTs already handled by the block above.
            if _lemma_lower(token) in _DIMENSION_ROOTS and any(
                c.dep_ == "nsubj" and c.pos_ in ("NOUN", "PROPN") for c in token.children
            ):
                continue
            intersection.add(_surface_lower(token))
            for c in token.children:
                if c.dep_ == "compound" and c.pos_ == "NOUN":
                    boost.add(c.text.lower())
            # Multi-word compound chain left of the head → joined boost phrase.
            if token.i - token.left_edge.i >= 1:
                comps = [x.text.lower() for x in doc[token.left_edge.i : token.i] if x.dep_ == "compound"]
                if len(comps) >= 2:
                    boost.add(" ".join(comps))

    # List-like INTJ head with multiple pobj: first pobj = product head, rest often demographic
    for token in doc:
        if token.dep_ != "ROOT" or token.pos_ not in ("INTJ", "VERB", "NOUN"):
            continue
        pobjs = sorted(
            [c for c in token.children if c.dep_ == "pobj" and c.pos_ in ("NOUN", "PROPN")],
            key=lambda x: x.i,
        )
        if len(pobjs) >= 2 and token.pos_ == "INTJ":
            intersection.add(_surface_lower(pobjs[0]))
            for extra in pobjs[1:]:
                if _lemma_lower(extra) in _DEMOGRAPHIC_NOUNS:
                    boost.add(_surface_lower(extra))
                else:
                    intersection.add(_surface_lower(extra))
        elif len(pobjs) == 1 and token.pos_ == "INTJ":
            intersection.add(_surface_lower(pobjs[0]))

    # amod under pobj (running → shoes)
    for token in doc:
        if token.dep_ == "amod" and token.head.pos_ in ("NOUN", "PROPN"):
            if token.pos_ == "VERB":
                # Verbal modifier: keep surface form ("running", not "run").
                boost.add(_surface_lower(token))
            elif token.pos_ == "ADJ":
                boost.add(_lemma_lower(token))

    # Genitive possessor (women's shoes → women boost)
    for token in doc:
        if token.dep_ == "poss" and token.head.pos_ in ("NOUN", "PROPN"):
            boost.add(_surface_lower(token))

    # noun_chunks fallback when no dobj/ROOT intersection yet
    if not intersection:
        for chunk in doc.noun_chunks:
            head = chunk.root
            if head.pos_ not in ("NOUN", "PROPN"):
                continue
            # Price / range: "under 500 dollars" → boost, not a product head
            if head.dep_ == "pobj" and head.head.dep_ == "prep":
                prep = head.head
                # NOTE(review): `continue` reconstructed as nested under the
                # price check (non-price pobj chunks still fall through) —
                # confirm against the original layout.
                if _lemma_lower(prep) in _PRICE_PREP_LEMMAS:
                    boost.add(chunk.text.strip().lower())
                    continue
            hl = _surface_lower(head)
            if hl:
                intersection.add(hl)
            # Proper nouns inside the chunk (brands) also gate matching.
            for t in chunk:
                if t == head or t.pos_ != "PROPN":
                    continue
                intersection.add(_surface_lower(t))
            # Remaining adjectives / compounds become boosts.
            for t in chunk:
                if t == head:
                    continue
                if t.pos_ == "ADJ" or (t.pos_ == "NOUN" and t.dep_ == "compound"):
                    boost.add(_lemma_lower(t))

    # Remove demoted pobj heads from intersection (purpose / price clause)
    for i in pobj_heads_to_demote:
        t = doc[i]
        # Discard both lemma and surface variants — either may have been added.
        intersection.discard(_lemma_lower(t))
        intersection.discard(_surface_lower(t))

    # Intersection wins over boost; then purge stop-ish / empty boosts.
    boost -= intersection
    boost = {b for b in boost if b.lower() not in stops and b.strip()}

    return BucketResult(
        intersection_terms=_dedupe_preserve(sorted(intersection)),
        boost_terms=_dedupe_preserve(sorted(boost)),
        drop_terms=_dedupe_preserve(sorted(drop)),
    )


# --- Scheme B: spaCy candidates + KeyBERT -----------------------------------


def _spacy_np_candidates(doc) -> List[str]:
    """Noun-phrase candidate strings from doc.noun_chunks (noun/propn heads only)."""
    phrases: List[str] = []
    for chunk in doc.noun_chunks:
        t = chunk.text.strip()
        if len(t) < 2:
            continue
        root = chunk.root
        if root.pos_ not in ("NOUN", "PROPN"):
            continue
        phrases.append(t)
    return phrases


def bucket_scheme_b_keybert(query: str, nlp, kw_model) -> BucketResult:
    """Bucket *query* by reranking spaCy NP candidates with KeyBERT.

    Top-ranked phrase → intersection; remaining ranked phrases and leftover
    chunk heads → boost; function/stop tokens → drop.
    """
    text = (query or "").strip()
    doc = nlp(text)
    candidates = _spacy_np_candidates(doc)
    if not candidates:
        # No usable noun phrases: let KeyBERT score the raw query itself.
        candidates = [text]
    # KeyBERT API: candidate_keywords=... (sentence-transformers backend)
    try:
        keywords = kw_model.extract_keywords(
            text,
            candidates=candidates,
            top_n=min(8, max(4, len(candidates) + 2)),
        )
    except TypeError:
        # Older/newer KeyBERT versions name the parameter differently.
        keywords = kw_model.extract_keywords(
            text,
            candidate_keywords=candidates,
            top_n=min(8, max(4, len(candidates) + 2)),
        )
    # keywords: sequence of (phrase, score) pairs, best first.
    ranked = [k[0].lower().strip() for k in (keywords or []) if k and k[0].strip()]
    intersection: List[str] = []
    boost: List[str] = []
    if ranked:
        intersection.append(ranked[0])
        if len(ranked) > 1:
            boost.extend(ranked[1:])
    # Add remaining spaCy heads not in lists
    heads: List[str] = []
    for ch in doc.noun_chunks:
        h = ch.root
        if h.pos_ in ("NOUN", "PROPN"):
            heads.append(_surface_lower(h))
    for h in heads:
        if h and h not in intersection and h not in boost:
            boost.append(h)
    if not intersection and heads:
        # KeyBERT produced nothing: promote the first chunk head instead.
        intersection.append(heads[0])
        boost = [x for x in boost if x != heads[0]]
    drop_tokens: Set[str] = set()
    stops = nlp.Defaults.stop_words | WEAK_BOOST_ADJS
    for token in doc:
        if token.is_punct:
            continue
        lem = (token.lemma_ or token.text).lower()
        if token.pos_ in ("DET", "ADP", "PART", "PRON", "AUX") or lem in stops:
            drop_tokens.add(token.text.lower())
    return BucketResult(
        intersection_terms=_dedupe_preserve(intersection),
        boost_terms=_dedupe_preserve(boost),
        drop_terms=sorted(drop_tokens),
    )


# --- Scheme C: YAKE + noun filter --------------------------------------------


def bucket_scheme_c_yake(query: str, nlp, yake_extractor) -> BucketResult:
    """Bucket *query* from YAKE keyphrases filtered by a spaCy noun check.

    Up to two short, noun-headed top phrases → intersection; other noun-bearing
    phrases → boost; function/stop tokens → drop.
    """
    text = (query or "").strip()
    doc = nlp(text)
    kws = yake_extractor.extract_keywords(text)
    # List[Tuple[str, float]] newest yake API may differ
    scored: List[Tuple[str, float]] = []
    if kws and isinstance(kws[0], (list, tuple)) and len(kws[0]) >= 2:
        scored = [(str(a).strip(), float(b)) for a, b in kws]
    else:
        # older yake returns list of tuples (kw, score)
        scored = [(str(x[0]).strip(), float(x[1])) for x in kws]
    boost: List[str] = []
    intersection: List[str] = []
    for phrase, _score in sorted(scored, key=lambda x: x[1]):
        # lower score = more important in YAKE
        phrase = phrase.lower().strip()
        if not phrase or len(phrase) < 2:
            continue
        # Re-parse the phrase alone to POS-check it.
        sub = nlp(phrase)
        keep = False
        head_noun = False
        for t in sub:
            if t.is_punct or t.is_space:
                continue
            if t.pos_ in ("NOUN", "PROPN"):
                keep = True
                # NOTE(review): nesting reconstructed — head_noun is only set
                # when the noun itself is ROOT or phrase-final; confirm layout.
                if t.dep_ == "ROOT" or t == sub[-1]:
                    head_noun = True
        if not keep:
            continue
        # top 1–2 important → intersection (very small)
        if len(intersection) < 2 and head_noun and len(phrase.split()) <= 2:
            intersection.append(phrase)
        else:
            boost.append(phrase)
    drop: Set[str] = set()
    stops = nlp.Defaults.stop_words | WEAK_BOOST_ADJS
    for token in doc:
        if token.is_punct:
            continue
        lem = (token.lemma_ or token.text).lower()
        if token.pos_ in ("DET", "ADP", "PART", "PRON", "AUX") or lem in stops:
            drop.add(token.text.lower())
    return BucketResult(
        intersection_terms=_dedupe_preserve(intersection),
        boost_terms=_dedupe_preserve(boost),
        drop_terms=sorted(drop),
    )


# --- CLI ---------------------------------------------------------------------

# Built-in demo queries covering the parse shapes each scheme targets.
DEFAULT_QUERIES = [
    "best noise cancelling headphones for airplane travel",
    "nike running shoes women",
    "plastic toy car",
    "what is the best smartphone under 500 dollars",
    "women's running shoes size 8",
]


def _load_spacy():
    """Load the small English spaCy pipeline, with a helpful hint if missing."""
    import spacy

    try:
        return spacy.load("en_core_web_sm")
    except OSError:
        print(
            "Missing model: run: python -m spacy download en_core_web_sm",
            file=sys.stderr,
        )
        raise


def _load_keybert():
    """Instantiate KeyBERT with a small sentence-transformers backend."""
    from keybert import KeyBERT

    # small & fast for demo; swap for larger if needed
    return KeyBERT(model="paraphrase-MiniLM-L6-v2")


def _load_yake():
    """Instantiate a YAKE extractor (English, up to trigrams, top 20)."""
    import yake

    return yake.KeywordExtractor(
        lan="en",
        n=3,
        dedupLim=0.9,
        top=20,
        features=None,
    )


def main() -> None:
    """CLI entry point: run the selected scheme(s) over each query and print JSON."""
    parser = argparse.ArgumentParser(description="English query bucketing experiments")
    parser.add_argument(
        "--queries",
        nargs="*",
        default=DEFAULT_QUERIES,
        help="Queries to run (default: built-in examples)",
    )
    parser.add_argument(
        "--scheme",
        choices=("a", "b", "c", "all"),
        default="all",
    )
    args = parser.parse_args()
    nlp = _load_spacy()
    # Heavy models are loaded lazily, only for the schemes actually requested.
    kb = None
    yk = None
    if args.scheme in ("b", "all"):
        kb = _load_keybert()
    if args.scheme in ("c", "all"):
        yk = _load_yake()
    for q in args.queries:
        print("=" * 72)
        print("QUERY:", q)
        print("-" * 72)
        if args.scheme in ("a", "all"):
            ra = bucket_scheme_a_spacy(q, nlp)
            print("A spaCy+rules:", json.dumps(ra.to_dict(), ensure_ascii=False))
        if args.scheme in ("b", "all") and kb is not None:
            rb = bucket_scheme_b_keybert(q, nlp, kb)
            print("B spaCy+KeyBERT:", json.dumps(rb.to_dict(), ensure_ascii=False))
        if args.scheme in ("c", "all") and yk is not None:
            rc = bucket_scheme_c_yake(q, nlp, yk)
            print("C YAKE+noun filter:", json.dumps(rc.to_dict(), ensure_ascii=False))
        print()


if __name__ == "__main__":
    main()