Blame view

boost_strategy.py 21.8 KB
5ab1c29c   tangwang   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
  from typing import Dict, List, Optional, Any

  from dataclasses import dataclass

  import json

  from src.services.user_profile import UserProfile

  from config.logging_config import get_app_logger

  from google.protobuf.json_format import MessageToDict

  import logging

  from config.app_config import BOOST_CONFIGS, FRESH_BOOST_CONFIG, BOOST_WEIGHTS_CONFIG, FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY, FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY

  

  logger = get_app_logger(__name__)

  

  @dataclass
  class BoostConfig:
      """One tag-boost rule, built from a single entry of ``BOOST_CONFIGS``."""
      # Tag identifier; used as the term value matched against the label field.
      tag_id: int
      # Human-readable tag name; alternative lookup key in get_boost_value().
      tag_name: str
      # Optional tag category. Not read anywhere in this file.
      tag_type: Optional[str]
      # Default scoring weight applied to documents carrying this tag.
      boost_value: float
      # Weight used instead of boost_value when a detected query intent
      # is listed in intent_names.
      es_intent_boost_value: float
      # Not read in this file; presumably the reranker-side counterpart of
      # es_intent_boost_value -- TODO confirm against the reranker code.
      reranker_intent_boost_value: float
      # Query intents that switch this tag to es_intent_boost_value.
      intent_names: List[str]
      # Business platforms this rule applies to (checked by membership).
      platform: List[str]

  

  

  

  # 标签ID	标签名称	标签类型	提权幅度

  # 156	行业新品	销售属性	1.1

  # 157	爆品/时货	销售属性	1.1

  # 158	常年热销	销售属性	1.1

  # 159	质量好	销售属性	1.1

  # 162	小惠商品	null	1.05

  # 163	优惠商品	null	1.1

  # 164	特惠商品	null	1.3

  # 165	超惠商品	null	1.15

  

  # 3	一箱快出	null

  # 5	推荐	null

  # 10	人气热销	null

  # 14	特色精选	null

  # 17	赠品(新)(补柜专区)	null

  # 20	新品首发	null

  # 21	0316-首发新品【新品页面专用】	null

  # 25	0316essa新品-【新品页面专用】	null

  # 26	essaone新品	null

  # 27	0316最近上架(专区)	null

  # 40	一箱	null

  # 41	快出	null

  # 42	上市新品(报表)&(专区)	null

  # 43	9.20内销(专区)	null

  # 82	半箱拼团	null

  

  # Seasonal tags (季节性): written into the keyword field and matched there

  # 149	年货	销售时节

  # 150	万圣节	销售时节

  # 151	圣诞节	销售时节

  # 152	开学季	销售时节

  # 153	复活节	销售时节

  # 154	三八节	销售时节

  # 155	情人节	销售时节

  

  

  # TODO: boost by customer type (客户类型) and sales region (销售区域) taken from the front-end request parameters

  # 标签ID	标签名称	标签类型

  # 137	东欧市场	销售区域

  # 138	欧美市场	销售区域

  # 139	南美市场	销售区域

  # 140	中东市场	销售区域

  # 141	东南亚市场	销售区域

  # 142	综合商超	客户类型

  # 143	专业商超	客户类型

  # 144	品牌商	客户类型

  # 145	公司批发商	客户类型

  # 146	市场批发商	客户类型

  # 147	电商	客户类型

  # 148	赠品商	客户类型

  

  class SearchBoostStrategy:

      def __init__(self):
          """Load the tag-boost rules from ``BOOST_CONFIGS`` and index them.

          Builds three lookup tables next to the raw rule list: weight by
          tag id, weight by tag name, and ES intent weight by intent name.
          When several rules share a key, the rule appearing last in
          ``BOOST_CONFIGS`` wins.
          """
          # Turn every raw config mapping into a typed BoostConfig record.
          loaded: List[BoostConfig] = []
          for raw in BOOST_CONFIGS:
              loaded.append(
                  BoostConfig(
                      tag_id=raw["tag_id"],
                      tag_name=raw["tag_name"],
                      tag_type=raw["tag_type"],
                      boost_value=raw["boost_value"],
                      es_intent_boost_value=raw["es_intent_boost_value"],
                      reranker_intent_boost_value=raw["reranker_intent_boost_value"],
                      intent_names=raw["intent_names"],
                      platform=raw["platform"],
                  )
              )
          self.boost_configs: List[BoostConfig] = loaded

          # Default weight indexed by tag id and by tag name.
          by_id: Dict[int, float] = {}
          for rule in self.boost_configs:
              by_id[rule.tag_id] = rule.boost_value
          self.tag_id_to_boost: Dict[int, float] = by_id

          by_name: Dict[str, float] = {}
          for rule in self.boost_configs:
              by_name[rule.tag_name] = rule.boost_value
          self.tag_name_to_boost: Dict[str, float] = by_name

          # ES-side intent weight indexed by intent name.
          self.intent_to_boost: Dict[str, float] = {
              intent_name: rule.es_intent_boost_value
              for rule in self.boost_configs
              for intent_name in rule.intent_names
          }

          serialized = json.dumps([vars(rule) for rule in self.boost_configs], ensure_ascii=False)
          logger.debug(f"Initialized boost configs: {serialized}")

  

      def _get_platform_boost_configs(self, business_platform: Optional[str]) -> List[BoostConfig]:

          """

          Filters boost configurations based on the business platform.

          Returns a list of BoostConfig objects that match the platform.

          """

          if not business_platform:

              return self.boost_configs

          return [

              config for config in self.boost_configs

              if business_platform in config.platform

          ]

  

      def get_boost_query(self, user_profile: Optional[UserProfile] = None, label_field_name: Optional[str] = None, query_intents: Optional[List[str]] = None, business_platform: Optional[str] = None, search_context: Optional[Any] = None) -> dict:
          """
          Build the Elasticsearch ``function_score`` clause used for boosting.

          The returned clause contains only scoring functions (filter + weight
          pairs and one ``field_value_factor``), a ``score_mode`` and
          ``boost_mode: multiply``; it carries no ``query`` of its own, so
          combining it with the recall query is left to the caller.

          Functions are added in this order: tag boosts, fresh-product boost,
          video boost, platform-category ranking boost, then the user-profile
          based boosts (behaviors, brand preference, customer goods structure).

          Args:
              user_profile: User profile for behavior/preference based boosting.
                  When None, no personalized functions are added.
              label_field_name: Name of the ES field holding tag ids. When
                  empty, tag boosts are skipped and a warning is logged.
              query_intents: Detected query intents. A tag whose
                  ``intent_names`` contains one of them uses its
                  ``es_intent_boost_value``; the new-product intent switches
                  the fresh boost to ``es_intent_factor``.
              business_platform: Platform used to filter the tag boost rules.
              search_context: Optional context object. Read (when present)
                  for ``format_log_prefix()``, ``businessPlatform``,
                  ``sale_category_id`` and ``search_query`` / ``query``.
                  May be None; the no-query score mode is then used.

          Returns:
              ``{"function_score": {...}}``, or ``{}`` if no function was
              generated (a safety net: the fresh and video functions are
              currently always added).
          """
          log_prefix = search_context.format_log_prefix() if search_context else ""
          functions = []

          # Per-category counters, reported in the summary log below:
          # [0] tag, [1] fresh, [2] behavior, [3] brand, [4] category,
          # [5] price range, [6] video, [7] platform category ranking
          boost_cnt = [0] * 8

          # Tag boost rules applicable to the requested platform.
          platform_boost_configs = self._get_platform_boost_configs(business_platform)

          # ===== Tag boosts =====
          # One filter+weight function per rule, matched on the caller-supplied label field.
          if label_field_name:
              for config in platform_boost_configs:
                  tag_id = config.tag_id
                  boost_value = config.boost_value

                  # The first detected intent listed on the rule switches it to the intent weight.
                  final_boost_value = boost_value
                  if query_intents:
                      for intent in query_intents:
                          if intent in config.intent_names:
                              final_boost_value = config.es_intent_boost_value
                              logger.debug(f"{log_prefix} Intent-based boost for tag_id {tag_id}: {boost_value} -> {final_boost_value} (intent: {intent})")
                              break

                  functions.append({
                      "filter": {
                          "term": {
                              label_field_name: tag_id
                          }
                      },
                      "weight": final_boost_value
                  })
                  boost_cnt[0] += 1  # tag_functions
              logger.debug(f"{log_prefix} Added {boost_cnt[0]} tag-based boost functions using field: {label_field_name} for platform: {business_platform}")
              if query_intents:
                  logger.info(f"{log_prefix} Applied intent-based boost for intents: {query_intents}")
          else:
              logger.warning(f"{log_prefix} Label field name is empty, cannot apply tag boost")
              logger.warning(f"{log_prefix} Tag boost functions will be skipped - label_field_name is required for dynamic field name")

          # ===== Fresh-product boost =====
          # Multiplies by the indexed on_sell_days_boost value times a factor.
          # NOTE(review): the original comment mentions a sigmoid formula;
          # presumably it is applied when on_sell_days_boost is indexed -- confirm.
          fresh_factor = FRESH_BOOST_CONFIG["default_factor"]
          if query_intents:
              for intent in query_intents:
                  if intent == FRESH_BOOST_CONFIG["new_product_intent"]:
                      fresh_factor = FRESH_BOOST_CONFIG["es_intent_factor"]
                      logger.debug(f"{log_prefix} New product intent detected: {intent}, applying ES fresh boost factor: {fresh_factor}")
                      break

          functions.append({
              "field_value_factor": {
                  "field": "on_sell_days_boost",
                  "missing": 1.0,
                  "factor": fresh_factor
              }
          })
          boost_cnt[1] += 1  # fresh_functions
          logger.debug(f"{log_prefix} Added fresh boost function with factor: {fresh_factor}")

          # ===== Video boost =====
          functions.append({
              "filter": {
                  "term": {
                      "is_video": True
                  }
              },
              "weight": BOOST_WEIGHTS_CONFIG["video_boost_weight"]
          })
          boost_cnt[6] += 1  # video_functions
          logger.debug(f"{log_prefix} Added video boost function with weight: {BOOST_WEIGHTS_CONFIG['video_boost_weight']}")

          # ===== Platform category ranking boost =====
          # Keyed by "<businessPlatform>_<sale_category_id>" taken from the search context.
          if search_context and hasattr(search_context, 'businessPlatform') and hasattr(search_context, 'sale_category_id'):
              if search_context.businessPlatform and search_context.sale_category_id:
                  platform_cate_top_keyword = f"{search_context.businessPlatform}_{search_context.sale_category_id}"
                  logger.debug(f"{log_prefix} Adding platform category ranking boost for keyword: {platform_cate_top_keyword}")
                  functions.append({
                      "filter": {
                          "term": {
                              "op_ranking_platform_cate_list": platform_cate_top_keyword
                          }
                      },
                      "weight": BOOST_WEIGHTS_CONFIG["platform_category_ranking_weight"]
                  })
                  boost_cnt[7] += 1  # platform_category_functions
                  logger.debug(f"{log_prefix} Added platform category ranking boost function for: {platform_cate_top_keyword}")
              else:
                  logger.debug(f"{log_prefix} Skipping platform category boost - businessPlatform: {getattr(search_context, 'businessPlatform', 'None')}, sale_category_id: {getattr(search_context, 'sale_category_id', 'None')}")
          else:
              logger.debug(f"{log_prefix} Skipping platform category boost - search_context not provided or missing required fields")

          # ===== Personalized boosts from the user profile =====
          # Covers user behaviors, brand preference, category preference,
          # price preference and the customer goods structure.
          if user_profile:
              logger.debug(f"{log_prefix} Adding biz boosting based on user profile")

              # Converting protobuf messages is only worth it when DEBUG output is on.
              if logger.isEnabledFor(logging.DEBUG):
                  logger.debug(f"{log_prefix} User profile base info: {MessageToDict(user_profile.base_info)}")
                  logger.debug(f"{log_prefix} User profile statistics:")
                  stats_dict = MessageToDict(user_profile.statistics)
                  for key, value in stats_dict.items():
                      if isinstance(value, list):
                          logger.debug(f"{log_prefix} Statistics {key}: {len(value)} items, first item: {value[0] if value else 'None'}")
                      else:
                          logger.debug(f"{log_prefix} Statistics {key}: {value}")

              # ===== User behavior boost =====
              # Boost SKUs the user clicked, added to cart, collected or purchased.
              # Only the first N records per behavior type are used to bound query size.
              behavior_map = user_profile.behavior_map

              if logger.isEnabledFor(logging.DEBUG):
                  logger.debug(f"{log_prefix} User behavior map:")
                  behavior_dict = MessageToDict(behavior_map)
                  for behavior_type, behaviors in behavior_dict.items():
                      if isinstance(behaviors, list):
                          logger.debug(f"{log_prefix} Behavior {behavior_type}: {len(behaviors)} items, first item: {behaviors[0] if behaviors else 'None'}")
                      else:
                          logger.debug(f"{log_prefix} Behavior {behavior_type}: {behaviors}")

              max_behavior_count_for_boost = BOOST_WEIGHTS_CONFIG["max_behavior_count_for_boost"]

              for behavior_type in ['click', 'add_cart', 'collect', 'purchase']:
                  behaviors = getattr(behavior_map, behavior_type, [])
                  if behaviors:
                      sku_ids = [b.skuId for b in behaviors[:max_behavior_count_for_boost]]
                      logger.debug(f"{log_prefix} Adding boost for {behavior_type} behaviors with {len(sku_ids)} SKUs: {sku_ids[:10]}")
                      functions.append({
                          "filter": {
                              "terms": {
                                  "sku_id": sku_ids
                              }
                          },
                          "weight": BOOST_WEIGHTS_CONFIG["user_behavior_weight"]
                      })
                      boost_cnt[2] += 1  # behavior_functions

              # ===== Brand preference boost =====
              # Boost products whose brand_id is in base_info.brandCategoryIds;
              # the weight comes from BOOST_WEIGHTS_CONFIG.
              if user_profile.base_info.brandCategoryIds:
                  brand_ids = list(user_profile.base_info.brandCategoryIds)
                  logger.debug(f"{log_prefix} Adding boost for brand preferences with {len(brand_ids)} brand_ids {brand_ids[:10]}")
                  functions.append({
                      "filter": {
                          "terms": {
                              "brand_id": brand_ids
                          }
                      },
                      "weight": BOOST_WEIGHTS_CONFIG["brand_preference_weight"]
                  })
                  boost_cnt[3] += 1  # brand_functions

              # ===== Category preference boost (disabled) =====
              # Would boost category_id values from statistics.category_group.
              # Currently switched off; change `if False` to `if True` to enable.
              if False:
                  if user_profile.statistics.category_group:
                      category_ids = [stat.keyId for stat in user_profile.statistics.category_group]
                      logger.debug(f"{log_prefix} Category preferences stats with {len(category_ids)} category_ids {category_ids[:10]}")
                      logger.debug(f"{log_prefix} Adding boost for category preferences with {len(category_ids)} category_ids {category_ids[:10]}")
                      functions.append({
                          "filter": {
                              "terms": {
                                  "category_id": category_ids
                              }
                          },
                          "weight": BOOST_WEIGHTS_CONFIG["category_preference_weight"]
                      })
                      boost_cnt[4] += 1  # category_functions

              # ===== Price range preference boost (disabled) =====
              # Would boost price_range values from statistics.price_group.
              # Currently switched off; change `if False` to `if True` to enable.
              if False:
                  if user_profile.statistics.price_group:
                      price_ranges = [stat.keyId for stat in user_profile.statistics.price_group]
                      price_stats = [MessageToDict(stat) for stat in user_profile.statistics.price_group]
                      logger.debug(f"{log_prefix} Price range preferences stats: {price_stats}")
                      logger.debug(f"{log_prefix} Adding boost for price range preferences: {price_ranges}")
                      functions.append({
                          "filter": {
                              "terms": {
                                  "price_range": price_ranges
                              }
                          },
                          "weight": BOOST_WEIGHTS_CONFIG["price_range_preference_weight"]
                      })
                      boost_cnt[5] += 1  # price_range_functions

              # ===== Customer goods structure category boost =====
              # One function per structure entry that carries category ids.
              # Note: these categoryIds are front-end (sale) categories, so they
              # are matched on sale_category_all, not on the ES category_id field.
              if user_profile.base_info.customerGoodsStructure:
                  if logger.isEnabledFor(logging.DEBUG):
                      structure_list = [MessageToDict(s) for s in user_profile.base_info.customerGoodsStructure]
                      logger.debug(f"{log_prefix} Customer goods structure details: {structure_list}")
                  for structure in user_profile.base_info.customerGoodsStructure:
                      if structure.categoryIds:
                          logger.debug(f"{log_prefix} Adding boost for category IDs in structure length {len(structure.categoryIds)} category_ids {structure.categoryIds[:10]}")
                          functions.append({
                              "filter": {
                                  "terms": {
                                      "sale_category_all": list(structure.categoryIds)
                                  }
                              },
                              "weight": BOOST_WEIGHTS_CONFIG["customer_structure_category_weight"]
                          })
                          boost_cnt[4] += 1  # category_functions
                      if structure.priceBetween:
                          # Price-range boosting from the goods structure is not supported yet.
                          pass

          total_functions = len(functions)

          # Summary of how many functions of each kind were generated.
          logger.info(f"{log_prefix} ===== ES查询提权函数统计 =====")
          logger.info(f"{log_prefix} 总提权函数数量: {total_functions}")
          logger.info(f"{log_prefix} 标签提权函数: {boost_cnt[0]}")
          logger.info(f"{log_prefix} 新品提权函数: {boost_cnt[1]}")
          logger.info(f"{log_prefix} 行为提权函数: {boost_cnt[2]}")
          logger.info(f"{log_prefix} 品牌提权函数: {boost_cnt[3]}")
          logger.info(f"{log_prefix} 类目提权函数: {boost_cnt[4]}")
          logger.info(f"{log_prefix} 价格区间提权函数: {boost_cnt[5]}")
          logger.info(f"{log_prefix} 视频提权函数: {boost_cnt[6]}")
          logger.info(f"{log_prefix} 平台类目排名提权函数: {boost_cnt[7]}")
          logger.info(f"{log_prefix} ===== ES查询提权函数统计结束 =====")

          if not functions:
              logger.debug(f"{log_prefix} No boost functions generated")
              return {}

          # search_context is optional: without it (or without a query on it)
          # fall back to the no-query score mode instead of raising AttributeError.
          has_query = search_context is not None and bool(
              getattr(search_context, "search_query", None)
              or getattr(search_context, "query", None)
          )
          score_mode = FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY if has_query else FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY

          boost_query = {
              "function_score": {
                  "functions": functions,
                  "score_mode": score_mode,
                  "boost_mode": "multiply"
              }
          }

          return boost_query

  

      def get_boost_value(self, tag_id: Optional[int] = None, tag_name: Optional[str] = None, platform: Optional[str] = None) -> float:

          """

          Get the boost value for a given tag ID or name.

          Returns 1.0 if no boost is configured or if platform doesn't match.

          

          Args:

              tag_id: Tag ID to look up

              tag_name: Tag name to look up

              platform: Business platform for filtering

          """

          if tag_id is not None:

              for config in self.boost_configs:

                  if config.tag_id == tag_id:

                      # Check platform compatibility

                      if platform and config.platform != platform:

                          logger.debug(f"Platform mismatch for tag_id {tag_id}: requested platform {platform}, tag platform {config.platform}")

                          return 1.0

                      logger.debug(f"Found boost value {config.boost_value} for tag_id {tag_id}")

                      return config.boost_value

                      

          if tag_name is not None:

              for config in self.boost_configs:

                  if config.tag_name == tag_name:

                      # Check platform compatibility

                      if platform and config.platform != platform:

                          logger.debug(f"Platform mismatch for tag_name {tag_name}: requested platform {platform}, tag platform {config.platform}")

                          return 1.0

                      logger.debug(f"Found boost value {config.boost_value} for tag_name {tag_name}")

                      return config.boost_value

                      

          logger.debug(f"No boost value found for tag_id={tag_id}, tag_name={tag_name}, platform={platform}")

          return 1.0