Blame view

query/english_keyword_extractor.py 8.53 KB
45b39796   tangwang   qp性能优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
  """
  Lightweight English core-term extraction for lexical keyword constraints.
  """
  
  from __future__ import annotations
  
  import logging
  from typing import List, Optional, Sequence, Set
  
  from .tokenization import normalize_query_text, simple_tokenize_query
  
  logger = logging.getLogger(__name__)
  
  _WEAK_BOOST_ADJS = frozenset(
      {
          "best",
          "good",
          "great",
          "new",
          "free",
          "cheap",
          "top",
          "fine",
          "real",
      }
  )
  
  _FUNCTIONAL_DEP = frozenset(
      {
          "det",
          "aux",
          "auxpass",
          "prep",
          "mark",
          "expl",
          "cc",
          "punct",
          "case",
      }
  )
  
  _DEMOGRAPHIC_NOUNS = frozenset(
      {
          "women",
          "woman",
          "men",
          "man",
          "kids",
          "kid",
          "boys",
          "boy",
          "girls",
          "girl",
          "baby",
          "babies",
          "toddler",
          "adult",
          "adults",
      }
  )
  
  _PRICE_PREP_LEMMAS = frozenset({"under", "over", "below", "above", "within", "between", "near"})
  _DIMENSION_ROOTS = frozenset({"size", "width", "length", "height", "weight"})
  
  
  def _dedupe_preserve(seq: Sequence[str]) -> List[str]:
      seen: Set[str] = set()
      out: List[str] = []
      for item in seq:
          normalized = normalize_query_text(item)
          if not normalized or normalized in seen:
              continue
          seen.add(normalized)
          out.append(normalized)
      return out
  
  
  def _lemma_lower(token) -> str:
      return ((token.lemma_ or token.text) or "").lower().strip()
  
  
  def _surface_lower(token) -> str:
      return (token.text or "").lower().strip()
  
  
  def _project_terms_to_query_tokens(query: str, terms: Sequence[str]) -> List[str]:
      simple_tokens = _dedupe_preserve(simple_tokenize_query(query))
      projected: List[str] = []
      for term in terms:
          normalized = normalize_query_text(term)
          if len(normalized) < 2 or normalized in _DEMOGRAPHIC_NOUNS:
              continue
          exact = next((token for token in simple_tokens if token == normalized), None)
          if exact is not None:
              projected.append(exact)
              continue
          partial = next(
              (
                  token
                  for token in simple_tokens
                  if len(normalized) >= 3 and normalized in token and token not in _DEMOGRAPHIC_NOUNS
              ),
              None,
          )
          if partial is not None:
              projected.append(partial)
              continue
          projected.append(normalized)
      return _dedupe_preserve(projected)
  
  
  class EnglishKeywordExtractor:
      """Extracts a small set of English core product terms with spaCy."""
  
      def __init__(self, nlp: Optional[object] = None) -> None:
          self._nlp = nlp if nlp is not None else self._load_nlp()
  
      @staticmethod
      def _load_nlp() -> Optional[object]:
          try:
              import spacy
  
              return spacy.load("en_core_web_sm", disable=["ner", "textcat"])
          except Exception as exc:
              logger.warning("English keyword extractor disabled; failed to load spaCy model: %s", exc)
              return None
  
      def extract_keywords(self, query: str) -> str:
          text = str(query or "").strip()
          if not text:
              return ""
          if self._nlp is None:
              return self._fallback_keywords(text)
          try:
              return self._extract_keywords_with_spacy(text)
          except Exception as exc:
              logger.warning("spaCy English keyword extraction failed; using fallback: %s", exc)
              return self._fallback_keywords(text)
  
      def _extract_keywords_with_spacy(self, query: str) -> str:
          doc = self._nlp(query)
          intersection: Set[str] = set()
          stops = self._nlp.Defaults.stop_words | _WEAK_BOOST_ADJS
          pobj_heads_to_demote: Set[int] = set()
  
          for token in doc:
              if token.dep_ == "prep" and token.text.lower() == "for":
                  for child in token.children:
                      if child.dep_ == "pobj" and child.pos_ in ("NOUN", "PROPN"):
                          pobj_heads_to_demote.add(child.i)
  
          for token in doc:
              if token.dep_ != "prep" or _lemma_lower(token) not in _PRICE_PREP_LEMMAS:
                  continue
              for child in token.children:
                  if child.dep_ == "pobj" and child.pos_ in ("NOUN", "PROPN"):
                      pobj_heads_to_demote.add(child.i)
  
          for token in doc:
              if token.dep_ == "dobj" and token.pos_ in ("NOUN", "PROPN") and token.i not in pobj_heads_to_demote:
                  intersection.add(_surface_lower(token))
  
          for token in doc:
              if token.dep_ == "nsubj" and token.pos_ in ("NOUN", "PROPN"):
                  head = token.head
                  if head.pos_ == "AUX" and head.dep_ == "ROOT":
                      intersection.add(_surface_lower(token))
  
          for token in doc:
              if token.dep_ == "ROOT" and token.pos_ in ("INTJ", "PROPN"):
                  intersection.add(_surface_lower(token))
              if token.pos_ == "PROPN":
                  if token.dep_ == "compound" and _lemma_lower(token.head) in _DEMOGRAPHIC_NOUNS:
                      continue
                  intersection.add(_surface_lower(token))
  
          for token in doc:
              if token.dep_ == "ROOT" and token.pos_ in ("NOUN", "PROPN"):
                  if _lemma_lower(token) in _DIMENSION_ROOTS:
                      for child in token.children:
                          if child.dep_ == "nsubj" and child.pos_ in ("NOUN", "PROPN"):
                              intersection.add(_surface_lower(child))
                      continue
                  if _lemma_lower(token) in _DEMOGRAPHIC_NOUNS:
                      for child in token.children:
                          if child.dep_ == "compound" and child.pos_ == "NOUN":
                              intersection.add(_surface_lower(child))
                      continue
                  if token.i in pobj_heads_to_demote:
                      continue
                  intersection.add(_surface_lower(token))
  
          for token in doc:
              if token.dep_ != "ROOT" or token.pos_ not in ("INTJ", "VERB", "NOUN"):
                  continue
              pobjs = sorted(
                  [child for child in token.children if child.dep_ == "pobj" and child.pos_ in ("NOUN", "PROPN")],
                  key=lambda item: item.i,
              )
              if len(pobjs) >= 2 and token.pos_ == "INTJ":
                  intersection.add(_surface_lower(pobjs[0]))
                  for extra in pobjs[1:]:
                      if _lemma_lower(extra) not in _DEMOGRAPHIC_NOUNS:
                          intersection.add(_surface_lower(extra))
              elif len(pobjs) == 1 and token.pos_ == "INTJ":
                  intersection.add(_surface_lower(pobjs[0]))
  
          if not intersection:
              for chunk in doc.noun_chunks:
                  head = chunk.root
                  if head.pos_ not in ("NOUN", "PROPN"):
                      continue
                  if head.dep_ == "pobj" and head.head.dep_ == "prep":
                      prep = head.head
                      if _lemma_lower(prep) in _PRICE_PREP_LEMMAS or prep.text.lower() == "for":
                          continue
                  head_text = _surface_lower(head)
                  if head_text:
                      intersection.add(head_text)
                  for token in chunk:
                      if token == head or token.pos_ != "PROPN":
                          continue
                      intersection.add(_surface_lower(token))
  
          core_terms = _dedupe_preserve(
              token.text.lower()
              for token in doc
              if _surface_lower(token) in intersection
              and _surface_lower(token) not in stops
              and _surface_lower(token) not in _DEMOGRAPHIC_NOUNS
              and token.dep_ not in _FUNCTIONAL_DEP
              and len(_surface_lower(token)) >= 2
          )
          projected_terms = _project_terms_to_query_tokens(query, core_terms)
          if projected_terms:
              return " ".join(projected_terms[:3])
          return self._fallback_keywords(query)
  
      def _fallback_keywords(self, query: str) -> str:
          tokens = [
              normalize_query_text(token)
              for token in simple_tokenize_query(query)
              if normalize_query_text(token)
          ]
          if not tokens:
              return ""
  
          filtered = [token for token in tokens if token not in _DEMOGRAPHIC_NOUNS]
          if not filtered:
              filtered = tokens
  
          # Keep the right-most likely product head plus one close modifier.
          head = filtered[-1]
          if len(filtered) >= 2:
              return " ".join(filtered[-2:])
          return head