# english_keyword_extractor.py
"""
Lightweight English core-term extraction for lexical keyword constraints.
"""
from __future__ import annotations
import logging
from typing import List, Optional, Sequence, Set
from .tokenization import normalize_query_text, simple_tokenize_query
logger = logging.getLogger(__name__)
# Generic boosting adjectives ("best shoes", "cheap bag") that carry no product
# meaning on their own; merged into spaCy's stop-word list during extraction.
_WEAK_BOOST_ADJS = frozenset(
    {
        "best",
        "good",
        "great",
        "new",
        "free",
        "cheap",
        "top",
        "fine",
        "real",
    }
)
# spaCy dependency labels for purely functional tokens (determiners,
# auxiliaries, prepositions, punctuation, ...) that are excluded from the
# final core-term set.
_FUNCTIONAL_DEP = frozenset(
    {
        "det",
        "aux",
        "auxpass",
        "prep",
        "mark",
        "expl",
        "cc",
        "punct",
        "case",
    }
)
# Audience/demographic nouns ("shoes for women") — treated as qualifiers, not
# product terms, and filtered out of keyword output.
_DEMOGRAPHIC_NOUNS = frozenset(
    {
        "women",
        "woman",
        "men",
        "man",
        "kids",
        "kid",
        "boys",
        "boy",
        "girls",
        "girl",
        "baby",
        "babies",
        "toddler",
        "adult",
        "adults",
    }
)
# Preposition lemmas that typically introduce price/range constraints
# ("under $50"); their objects are demoted rather than kept as keywords.
_PRICE_PREP_LEMMAS = frozenset({"under", "over", "below", "above", "within", "between", "near"})
# Root lemmas of dimension questions ("what size is X") — the subject, not the
# dimension word itself, is the product term.
_DIMENSION_ROOTS = frozenset({"size", "width", "length", "height", "weight"})
def _dedupe_preserve(seq: Sequence[str]) -> List[str]:
    """Normalize each entry and keep only the first occurrence of every
    distinct normalized value, preserving input order. Empty results of
    normalization are dropped."""
    kept: List[str] = []
    observed: Set[str] = set()
    for raw in seq:
        cleaned = normalize_query_text(raw)
        if cleaned and cleaned not in observed:
            observed.add(cleaned)
            kept.append(cleaned)
    return kept
def _lemma_lower(token) -> str:
return ((token.lemma_ or token.text) or "").lower().strip()
def _surface_lower(token) -> str:
return (token.text or "").lower().strip()
def _project_terms_to_query_tokens(query: str, terms: Sequence[str]) -> List[str]:
    """Map extracted terms back onto the original query's tokens.

    Each normalized term is matched to a query token exactly first, then by
    substring containment (only for terms of length >= 3, skipping demographic
    tokens); unmatched terms pass through in normalized form. Terms shorter
    than two characters and demographic nouns are dropped. The result is
    deduplicated with order preserved.
    """
    query_tokens = _dedupe_preserve(simple_tokenize_query(query))
    result: List[str] = []
    for raw_term in terms:
        term = normalize_query_text(raw_term)
        if len(term) < 2 or term in _DEMOGRAPHIC_NOUNS:
            continue
        # Exact-match takes priority over substring containment.
        if term in query_tokens:
            result.append(term)
            continue
        substring_hit = None
        if len(term) >= 3:
            for candidate in query_tokens:
                if term in candidate and candidate not in _DEMOGRAPHIC_NOUNS:
                    substring_hit = candidate
                    break
        result.append(substring_hit if substring_hit is not None else term)
    return _dedupe_preserve(result)
class EnglishKeywordExtractor:
    """Extracts a small set of English core product terms with spaCy.

    Uses dependency-parse heuristics to pick out product-head nouns while
    demoting demographic qualifiers ("for women"), price-range objects
    ("under 50"), and generic boosting adjectives. Falls back to a simple
    token-based heuristic when the spaCy model is unavailable or fails.
    """

    def __init__(self, nlp: Optional[object] = None) -> None:
        # Accept an injected pipeline (useful for testing / sharing a model);
        # otherwise lazily load the small English model.
        self._nlp = nlp if nlp is not None else self._load_nlp()

    @staticmethod
    def _load_nlp() -> Optional[object]:
        """Load the small English spaCy pipeline, or return None on any failure.

        NER and textcat are disabled since only tagging/parsing is needed.
        A None return downgrades the extractor to the token-based fallback.
        """
        try:
            import spacy
            return spacy.load("en_core_web_sm", disable=["ner", "textcat"])
        except Exception as exc:
            logger.warning("English keyword extractor disabled; failed to load spaCy model: %s", exc)
            return None

    def extract_keywords(self, query: str) -> str:
        """Return up to a few core keywords for *query* as a space-joined string.

        Empty/whitespace-only input yields "". Any error in the spaCy path is
        logged and handled by falling back to the simple heuristic.
        """
        text = str(query or "").strip()
        if not text:
            return ""
        if self._nlp is None:
            return self._fallback_keywords(text)
        try:
            return self._extract_keywords_with_spacy(text)
        except Exception as exc:
            logger.warning("spaCy English keyword extraction failed; using fallback: %s", exc)
            return self._fallback_keywords(text)

    def _extract_keywords_with_spacy(self, query: str) -> str:
        """Dependency-parse heuristics: collect candidate product terms into
        ``intersection``, then filter and project them back onto query tokens.

        Returns at most three space-joined terms; falls back to
        ``_fallback_keywords`` when no candidate survives filtering.
        """
        doc = self._nlp(query)
        intersection: Set[str] = set()
        # Model stop words plus weak boosting adjectives are filtered at the end.
        stops = self._nlp.Defaults.stop_words | _WEAK_BOOST_ADJS
        # Token indices whose nouns should NOT be promoted to keywords
        # (objects of "for ..." and of price/range prepositions).
        pobj_heads_to_demote: Set[int] = set()
        # Pass 1a: demote objects of "for" ("shoes for women" -> demote "women").
        for token in doc:
            if token.dep_ == "prep" and token.text.lower() == "for":
                for child in token.children:
                    if child.dep_ == "pobj" and child.pos_ in ("NOUN", "PROPN"):
                        pobj_heads_to_demote.add(child.i)
        # Pass 1b: demote objects of price/range prepositions ("under 50 dollars").
        for token in doc:
            if token.dep_ != "prep" or _lemma_lower(token) not in _PRICE_PREP_LEMMAS:
                continue
            for child in token.children:
                if child.dep_ == "pobj" and child.pos_ in ("NOUN", "PROPN"):
                    pobj_heads_to_demote.add(child.i)
        # Pass 2: direct objects are strong product candidates unless demoted.
        for token in doc:
            if token.dep_ == "dobj" and token.pos_ in ("NOUN", "PROPN") and token.i not in pobj_heads_to_demote:
                intersection.add(_surface_lower(token))
        # Pass 3: nominal subjects of an AUX root (e.g. copular questions).
        for token in doc:
            if token.dep_ == "nsubj" and token.pos_ in ("NOUN", "PROPN"):
                head = token.head
                if head.pos_ == "AUX" and head.dep_ == "ROOT":
                    intersection.add(_surface_lower(token))
        # Pass 4: interjection/proper-noun roots, and all proper nouns except
        # those compounding a demographic head ("women" in a compound).
        for token in doc:
            if token.dep_ == "ROOT" and token.pos_ in ("INTJ", "PROPN"):
                intersection.add(_surface_lower(token))
            if token.pos_ == "PROPN":
                if token.dep_ == "compound" and _lemma_lower(token.head) in _DEMOGRAPHIC_NOUNS:
                    continue
                intersection.add(_surface_lower(token))
        # Pass 5: noun roots. Dimension roots ("size") yield their subject
        # instead; demographic roots yield their compound modifier instead;
        # demoted roots are skipped; other noun roots are kept directly.
        for token in doc:
            if token.dep_ == "ROOT" and token.pos_ in ("NOUN", "PROPN"):
                if _lemma_lower(token) in _DIMENSION_ROOTS:
                    for child in token.children:
                        if child.dep_ == "nsubj" and child.pos_ in ("NOUN", "PROPN"):
                            intersection.add(_surface_lower(child))
                    continue
                if _lemma_lower(token) in _DEMOGRAPHIC_NOUNS:
                    for child in token.children:
                        if child.dep_ == "compound" and child.pos_ == "NOUN":
                            intersection.add(_surface_lower(child))
                    continue
                if token.i in pobj_heads_to_demote:
                    continue
                intersection.add(_surface_lower(token))
        # Pass 6: for INTJ roots, promote their prepositional objects directly
        # (sorted by position); extra objects skip demographic nouns.
        # NOTE(review): the pobj children of a prep usually attach to the prep,
        # not the root — this pass presumably targets parses where they attach
        # directly; confirm against the model's output.
        for token in doc:
            if token.dep_ != "ROOT" or token.pos_ not in ("INTJ", "VERB", "NOUN"):
                continue
            pobjs = sorted(
                [child for child in token.children if child.dep_ == "pobj" and child.pos_ in ("NOUN", "PROPN")],
                key=lambda item: item.i,
            )
            if len(pobjs) >= 2 and token.pos_ == "INTJ":
                intersection.add(_surface_lower(pobjs[0]))
                for extra in pobjs[1:]:
                    if _lemma_lower(extra) not in _DEMOGRAPHIC_NOUNS:
                        intersection.add(_surface_lower(extra))
            elif len(pobjs) == 1 and token.pos_ == "INTJ":
                intersection.add(_surface_lower(pobjs[0]))
        # Last resort before fallback: harvest noun-chunk heads (and any proper
        # nouns inside the chunk), skipping objects of "for"/price prepositions.
        if not intersection:
            for chunk in doc.noun_chunks:
                head = chunk.root
                if head.pos_ not in ("NOUN", "PROPN"):
                    continue
                if head.dep_ == "pobj" and head.head.dep_ == "prep":
                    prep = head.head
                    if _lemma_lower(prep) in _PRICE_PREP_LEMMAS or prep.text.lower() == "for":
                        continue
                head_text = _surface_lower(head)
                if head_text:
                    intersection.add(head_text)
                for token in chunk:
                    if token == head or token.pos_ != "PROPN":
                        continue
                    intersection.add(_surface_lower(token))
        # Re-walk the doc so the final terms come out in document order, then
        # drop stop words, demographic nouns, functional tokens, and 1-char terms.
        core_terms = _dedupe_preserve(
            token.text.lower()
            for token in doc
            if _surface_lower(token) in intersection
            and _surface_lower(token) not in stops
            and _surface_lower(token) not in _DEMOGRAPHIC_NOUNS
            and token.dep_ not in _FUNCTIONAL_DEP
            and len(_surface_lower(token)) >= 2
        )
        projected_terms = _project_terms_to_query_tokens(query, core_terms)
        if projected_terms:
            # Cap output at three terms to keep the lexical constraint small.
            return " ".join(projected_terms[:3])
        return self._fallback_keywords(query)

    def _fallback_keywords(self, query: str) -> str:
        """Token-based fallback: return the last one or two non-demographic
        tokens of the query, or "" when there are no usable tokens."""
        tokens = [
            normalize_query_text(token)
            for token in simple_tokenize_query(query)
            if normalize_query_text(token)
        ]
        if not tokens:
            return ""
        filtered = [token for token in tokens if token not in _DEMOGRAPHIC_NOUNS]
        if not filtered:
            # All tokens were demographic; better to keep them than return nothing.
            filtered = tokens
        # Keep the right-most likely product head plus one close modifier.
        head = filtered[-1]
        if len(filtered) >= 2:
            return " ".join(filtered[-2:])
        return head