# boost_strategy.py
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import json
from src.services.user_profile import UserProfile
from config.logging_config import get_app_logger
from google.protobuf.json_format import MessageToDict
import logging
from config.app_config import BOOST_CONFIGS, FRESH_BOOST_CONFIG, BOOST_WEIGHTS_CONFIG, FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY, FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY

logger = get_app_logger(__name__)

@dataclass
class BoostConfig:
    """One tag-level boost rule, built from a single entry of ``BOOST_CONFIGS``."""
    # Tag identifier; matched against the label field with an ES ``term`` filter.
    tag_id: int
    # Tag name; can be used as the lookup key in ``get_boost_value``.
    tag_name: str
    # Optional tag category; None when the tag has no type.
    tag_type: Optional[str]
    # Default function_score weight for documents carrying this tag.
    boost_value: float
    # Weight used instead of ``boost_value`` in the ES query when one of the
    # detected query intents is listed in ``intent_names``.
    es_intent_boost_value: float
    # Intent-specific weight for the reranker; not read anywhere in this file,
    # presumably consumed by the reranking stage — TODO confirm.
    reranker_intent_boost_value: float
    # Intent names that switch this tag to its intent-specific boost value.
    intent_names: List[str]
    # Business platforms this rule applies to (membership-tested against the
    # requested platform).
    platform: List[str]



# 标签ID	标签名称	标签类型	提权幅度
# 156	行业新品	销售属性	1.1
# 157	爆品/时货	销售属性	1.1
# 158	常年热销	销售属性	1.1
# 159	质量好	销售属性	1.1
# 162	小惠商品	null	1.05
# 163	优惠商品	null	1.1
# 164	特惠商品	null	1.3
# 165	超惠商品	null	1.15

# 3	一箱快出	null
# 5	推荐	null
# 10	人气热销	null
# 14	特色精选	null
# 17	赠品(新)(补柜专区)	null
# 20	新品首发	null
# 21	0316-首发新品【新品页面专用】	null
# 25	0316essa新品-【新品页面专用】	null
# 26	essaone新品	null
# 27	0316最近上架(专区)	null
# 40	一箱	null
# 41	快出	null
# 42	上市新品(报表)&(专区)	null
# 43	9.20内销(专区)	null
# 82	半箱拼团	null

# # 季节性,打入到 关键词字段 做匹配
# 149	年货	销售时节
# 150	万圣节	销售时节
# 151	圣诞节	销售时节
# 152	开学季	销售时节
# 153	复活节	销售时节
# 154	三八节	销售时节
# 155	情人节	销售时节


# TODO 根据 前端参数 客户类型 销售区域 做提权
# 标签ID	标签名称	标签类型
# 137	东欧市场	销售区域
# 138	欧美市场	销售区域
# 139	南美市场	销售区域
# 140	中东市场	销售区域
# 141	东南亚市场	销售区域
# 142	综合商超	客户类型
# 143	专业商超	客户类型
# 144	品牌商	客户类型
# 145	公司批发商	客户类型
# 146	市场批发商	客户类型
# 147	电商	客户类型
# 148	赠品商	客户类型

class SearchBoostStrategy:
    def __init__(self):
        """Load the tag boost rules from ``BOOST_CONFIGS`` and build the lookup tables."""
        # Field order mirrors the positional order of the BoostConfig dataclass.
        field_order = (
            "tag_id",
            "tag_name",
            "tag_type",
            "boost_value",
            "es_intent_boost_value",
            "reranker_intent_boost_value",
            "intent_names",
            "platform",
        )
        rules = [
            BoostConfig(*(entry[name] for name in field_order))
            for entry in BOOST_CONFIGS
        ]
        self.boost_configs: List[BoostConfig] = rules

        # Default boost per tag id / tag name, for quick lookups.
        boost_by_id = {}
        boost_by_name = {}
        for rule in rules:
            boost_by_id[rule.tag_id] = rule.boost_value
            boost_by_name[rule.tag_name] = rule.boost_value
        self.tag_id_to_boost: Dict[int, float] = boost_by_id
        self.tag_name_to_boost: Dict[str, float] = boost_by_name

        # ES intent boost per intent name; when several rules list the same
        # intent, the rule that comes last in the config wins.
        self.intent_to_boost: Dict[str, float] = {
            intent_name: rule.es_intent_boost_value
            for rule in rules
            for intent_name in rule.intent_names
        }

        serialized_rules = json.dumps([vars(rule) for rule in rules], ensure_ascii=False)
        logger.debug(f"Initialized boost configs: {serialized_rules}")

    def _get_platform_boost_configs(self, business_platform: Optional[str]) -> List[BoostConfig]:
        """
        Filters boost configurations based on the business platform.
        Returns a list of BoostConfig objects that match the platform.
        """
        if not business_platform:
            return self.boost_configs
        return [
            config for config in self.boost_configs
            if business_platform in config.platform
        ]

    def get_boost_query(self, user_profile: Optional[UserProfile] = None, label_field_name: Optional[str] = None, query_intents: Optional[List[str]] = None, business_platform: Optional[str] = None, search_context: Optional[Any] = None) -> dict:
        """
        Generate the Elasticsearch boost query from the configured boosts and the user profile.

        The result is a ``function_score`` clause whose functions are
        filter/weight pairs plus one ``field_value_factor``, combined with
        ``boost_mode: multiply``. It is meant to adjust scoring only; how it is
        combined with the recall query is up to the caller.

        Args:
            user_profile: Optional user profile. When given, behavior, brand and
                customer-goods-structure boosts are added.
            label_field_name: Name of the ES field holding tag ids. Tag boosts are
                skipped (with a warning) when it is empty.
            query_intents: Detected query intents. A matching intent switches a
                tag to its ``es_intent_boost_value`` and can switch the fresh
                boost to the intent-specific factor.
            business_platform: Platform used to filter the tag boost configs.
            search_context: Optional context object. It is read for
                ``format_log_prefix()``, ``businessPlatform``, ``sale_category_id``,
                ``search_query`` and ``query``. When it is None, the query is
                built with the "no query" score mode.

        Returns:
            ``{"function_score": {...}}``, or ``{}`` if no function was generated
            (a defensive case: the fresh and video functions are always added).
        """
        log_prefix = search_context.format_log_prefix() if search_context else ""
        functions = []

        # Per-category counters, used only by the summary log at the end:
        # [0] tag, [1] fresh, [2] behavior, [3] brand,
        # [4] category, [5] price range, [6] video, [7] platform-category ranking
        boost_cnt = [0] * 8

        # Get platform-filtered boost configs
        platform_boost_configs = self._get_platform_boost_configs(business_platform)

        # ===== Tag boosts: one filter/weight function per configured tag =====
        if label_field_name:
            for config in platform_boost_configs:
                tag_id = config.tag_id
                boost_value = config.boost_value

                # The first detected intent listed on the tag switches it to
                # the intent-specific ES boost value.
                final_boost_value = boost_value
                if query_intents:
                    for intent in query_intents:
                        if intent in config.intent_names:
                            final_boost_value = config.es_intent_boost_value
                            logger.debug(f"{log_prefix} Intent-based boost for tag_id {tag_id}: {boost_value} -> {final_boost_value} (intent: {intent})")
                            break

                functions.append({
                    "filter": {
                        "term": {
                            label_field_name: tag_id
                        }
                    },
                    "weight": final_boost_value
                })
                boost_cnt[0] += 1  # tag_functions
            logger.debug(f"{log_prefix} Added {boost_cnt[0]} tag-based boost functions using field: {label_field_name} for platform: {business_platform}")
            if query_intents:
                logger.info(f"{log_prefix} Applied intent-based boost for intents: {query_intents}")
        else:
            logger.warning(f"{log_prefix} Label field name is empty, cannot apply tag boost")
            logger.warning(f"{log_prefix} Tag boost functions will be skipped - label_field_name is required for dynamic field name")

        # ===== Fresh boost =====
        # Scales the indexed ``on_sell_days_boost`` field (1.0 when missing) by a
        # factor; the intent-specific factor is used when the new-product intent
        # is among the detected intents.
        fresh_factor = FRESH_BOOST_CONFIG["default_factor"]
        if query_intents:
            new_product_intent = FRESH_BOOST_CONFIG["new_product_intent"]
            if new_product_intent in query_intents:
                fresh_factor = FRESH_BOOST_CONFIG["es_intent_factor"]
                logger.debug(f"{log_prefix} New product intent detected: {new_product_intent}, applying ES fresh boost factor: {fresh_factor}")

        functions.append({
            "field_value_factor": {
                "field": "on_sell_days_boost",
                "missing": 1.0,
                "factor": fresh_factor
            }
        })
        boost_cnt[1] += 1  # fresh_functions
        logger.debug(f"{log_prefix} Added fresh boost function with factor: {fresh_factor}")

        # ===== Video boost =====
        functions.append({
            "filter": {
                "term": {
                    "is_video": True
                }
            },
            "weight": BOOST_WEIGHTS_CONFIG["video_boost_weight"]
        })
        boost_cnt[6] += 1  # video_functions
        logger.debug(f"{log_prefix} Added video boost function with weight: {BOOST_WEIGHTS_CONFIG['video_boost_weight']}")

        # ===== Platform / sale-category ranking boost =====
        if search_context and hasattr(search_context, 'businessPlatform') and hasattr(search_context, 'sale_category_id'):
            if search_context.businessPlatform and search_context.sale_category_id:
                platform_cate_top_keyword = f"{search_context.businessPlatform}_{search_context.sale_category_id}"
                logger.debug(f"{log_prefix} Adding platform category ranking boost for keyword: {platform_cate_top_keyword}")
                functions.append({
                    "filter": {
                        "term": {
                            "op_ranking_platform_cate_list": platform_cate_top_keyword
                        }
                    },
                    "weight": BOOST_WEIGHTS_CONFIG["platform_category_ranking_weight"]
                })
                boost_cnt[7] += 1  # platform_category_functions
                logger.debug(f"{log_prefix} Added platform category ranking boost function for: {platform_cate_top_keyword}")
            else:
                logger.debug(f"{log_prefix} Skipping platform category boost - businessPlatform: {getattr(search_context, 'businessPlatform', 'None')}, sale_category_id: {getattr(search_context, 'sale_category_id', 'None')}")
        else:
            logger.debug(f"{log_prefix} Skipping platform category boost - search_context not provided or missing required fields")

        # ===== Personalized boosts from the user profile =====
        # Covers user behavior, brand preference, category preference, price
        # preference and the customer goods structure.
        if user_profile:
            logger.debug(f"{log_prefix} Adding biz boosting based on user profile")

            # Converting protobuf messages to dicts is only worth the cost when
            # the DEBUG lines are actually emitted.
            debug_enabled = logger.isEnabledFor(logging.DEBUG)

            if debug_enabled:
                logger.debug(f"{log_prefix} User profile base info: {MessageToDict(user_profile.base_info)}")
                logger.debug(f"{log_prefix} User profile statistics:")
                stats_dict = MessageToDict(user_profile.statistics)
                for key, value in stats_dict.items():
                    if isinstance(value, list):
                        logger.debug(f"{log_prefix} Statistics {key}: {len(value)} items, first item: {value[0] if value else 'None'}")
                    else:
                        logger.debug(f"{log_prefix} Statistics {key}: {value}")

            # ----- User behavior boost -----
            # Uses the behavior records (click, add-to-cart, collect, purchase)
            # from the profile, capped at the first N records per behavior type
            # to keep the terms filters small.
            behavior_map = user_profile.behavior_map

            if debug_enabled:
                logger.debug(f"{log_prefix} User behavior map:")
                behavior_dict = MessageToDict(behavior_map)
                for behavior_type, behaviors in behavior_dict.items():
                    if isinstance(behaviors, list):
                        logger.debug(f"{log_prefix} Behavior {behavior_type}: {len(behaviors)} items, first item: {behaviors[0] if behaviors else 'None'}")
                    else:
                        logger.debug(f"{log_prefix} Behavior {behavior_type}: {behaviors}")

            max_behavior_count_for_boost = BOOST_WEIGHTS_CONFIG["max_behavior_count_for_boost"]

            for behavior_type in ['click', 'add_cart', 'collect', 'purchase']:
                behaviors = getattr(behavior_map, behavior_type, [])
                if behaviors:
                    sku_ids = [b.skuId for b in behaviors[:max_behavior_count_for_boost]]
                    logger.debug(f"{log_prefix} Adding boost for {behavior_type} behaviors with {len(sku_ids)} SKUs: {sku_ids[:10]}")
                    functions.append({
                        "filter": {
                            "terms": {
                                "sku_id": sku_ids
                            }
                        },
                        "weight": BOOST_WEIGHTS_CONFIG["user_behavior_weight"]
                    })
                    boost_cnt[2] += 1  # behavior_functions

            # ----- Brand preference boost -----
            # Boosts products whose brand is listed in base_info.brandCategoryIds;
            # the weight comes from the configuration.
            if user_profile.base_info.brandCategoryIds:
                brand_ids = list(user_profile.base_info.brandCategoryIds)
                logger.debug(f"{log_prefix} Adding boost for brand preferences with {len(brand_ids)} brand_ids {brand_ids[:10]}")
                functions.append({
                    "filter": {
                        "terms": {
                            "brand_id": brand_ids
                        }
                    },
                    "weight": BOOST_WEIGHTS_CONFIG["brand_preference_weight"]
                })
                boost_cnt[3] += 1  # brand_functions

            # ----- Category preference boost (currently disabled) -----
            # Would boost the categories listed in statistics.category_group.
            # Set the flag below to True to enable it.
            category_preference_boost_enabled = False
            if category_preference_boost_enabled:
                if user_profile.statistics.category_group:
                    category_ids = [stat.keyId for stat in user_profile.statistics.category_group]
                    logger.debug(f"{log_prefix} Category preferences stats with {len(category_ids)} category_ids {category_ids[:10]}")
                    logger.debug(f"{log_prefix} Adding boost for category preferences with {len(category_ids)} category_ids {category_ids[:10]}")
                    functions.append({
                        "filter": {
                            "terms": {
                                "category_id": category_ids
                            }
                        },
                        "weight": BOOST_WEIGHTS_CONFIG["category_preference_weight"]
                    })
                    boost_cnt[4] += 1  # category_functions

            # ----- Price range preference boost (currently disabled) -----
            # Would boost the price ranges listed in statistics.price_group.
            # Set the flag below to True to enable it.
            price_range_preference_boost_enabled = False
            if price_range_preference_boost_enabled:
                if user_profile.statistics.price_group:
                    price_ranges = [stat.keyId for stat in user_profile.statistics.price_group]
                    price_stats = [MessageToDict(stat) for stat in user_profile.statistics.price_group]
                    logger.debug(f"{log_prefix} Price range preferences stats: {price_stats}")
                    logger.debug(f"{log_prefix} Adding boost for price range preferences: {price_ranges}")
                    functions.append({
                        "filter": {
                            "terms": {
                                "price_range": price_ranges
                            }
                        },
                        "weight": BOOST_WEIGHTS_CONFIG["price_range_preference_weight"]
                    })
                    boost_cnt[5] += 1  # price_range_functions

            # ----- Customer goods structure category boost -----
            # One function per structure entry that carries category ids.
            # NOTE: these categoryIds are front-end (sale) categories, so they are
            # matched against ``sale_category_all`` rather than ES ``category_id``.
            goods_structures = user_profile.base_info.customerGoodsStructure
            if goods_structures:
                if debug_enabled:
                    structure_list = [MessageToDict(s) for s in goods_structures]
                    logger.debug(f"{log_prefix} Customer goods structure details: {structure_list}")
                for structure in goods_structures:
                    if structure.categoryIds:
                        logger.debug(f"{log_prefix} Adding boost for category IDs in structure length {len(structure.categoryIds)} category_ids {structure.categoryIds[:10]}")
                        functions.append({
                            "filter": {
                                "terms": {
                                    "sale_category_all": list(structure.categoryIds)
                                }
                            },
                            "weight": BOOST_WEIGHTS_CONFIG["customer_structure_category_weight"]
                        })
                        boost_cnt[4] += 1  # category_functions
                    # structure.priceBetween is not supported yet.

        # ===== Summary log of the generated functions =====
        total_functions = len(functions)
        counter_labels = (
            "标签提权函数",
            "新品提权函数",
            "行为提权函数",
            "品牌提权函数",
            "类目提权函数",
            "价格区间提权函数",
            "视频提权函数",
            "平台类目排名提权函数",
        )
        logger.info(f"{log_prefix} ===== ES查询提权函数统计 =====")
        logger.info(f"{log_prefix} 总提权函数数量: {total_functions}")
        for label, count in zip(counter_labels, boost_cnt):
            logger.info(f"{log_prefix} {label}: {count}")
        logger.info(f"{log_prefix} ===== ES查询提权函数统计结束 =====")

        if not functions:
            logger.debug(f"{log_prefix} No boost functions generated")
            return {}

        # search_context is optional (default None), so read the query fields
        # defensively; without a context the request is treated as having no query.
        has_query = getattr(search_context, "search_query", None) or getattr(search_context, "query", None)
        score_mode = FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY if has_query else FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY

        boost_query = {
            "function_score": {
                "functions": functions,
                "score_mode": score_mode,
                "boost_mode": "multiply"
            }
        }

        return boost_query

    def get_boost_value(self, tag_id: Optional[int] = None, tag_name: Optional[str] = None, platform: Optional[str] = None) -> float:
        """
        Get the boost value for a given tag ID or name.
        Returns 1.0 if no boost is configured or if platform doesn't match.
        
        Args:
            tag_id: Tag ID to look up
            tag_name: Tag name to look up
            platform: Business platform for filtering
        """
        if tag_id is not None:
            for config in self.boost_configs:
                if config.tag_id == tag_id:
                    # Check platform compatibility
                    if platform and config.platform != platform:
                        logger.debug(f"Platform mismatch for tag_id {tag_id}: requested platform {platform}, tag platform {config.platform}")
                        return 1.0
                    logger.debug(f"Found boost value {config.boost_value} for tag_id {tag_id}")
                    return config.boost_value
                    
        if tag_name is not None:
            for config in self.boost_configs:
                if config.tag_name == tag_name:
                    # Check platform compatibility
                    if platform and config.platform != platform:
                        logger.debug(f"Platform mismatch for tag_name {tag_name}: requested platform {platform}, tag platform {config.platform}")
                        return 1.0
                    logger.debug(f"Found boost value {config.boost_value} for tag_name {tag_name}")
                    return config.boost_value
                    
        logger.debug(f"No boost value found for tag_id={tag_id}, tag_name={tag_name}, platform={platform}")
        return 1.0