"""Elasticsearch boost (function_score) strategy for product search.

Builds the list of ``function_score`` functions used to re-weight search
results: tag boosts, freshness boost, video boost, platform/category ranking
boost and user-profile based personalization boosts.
"""

import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from google.protobuf.json_format import MessageToDict

from config.app_config import (
    BOOST_CONFIGS,
    BOOST_WEIGHTS_CONFIG,
    FRESH_BOOST_CONFIG,
    FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY,
    FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY,
)
from config.logging_config import get_app_logger
from src.services.user_profile import UserProfile

logger = get_app_logger(__name__)


@dataclass
class BoostConfig:
    """One tag boost rule, built positionally from an entry of BOOST_CONFIGS."""

    tag_id: int  # tag identifier matched against the label field in ES
    tag_name: str  # tag display name
    tag_type: Optional[str]  # tag category; may be None
    boost_value: float  # default weight applied when the tag matches
    es_intent_boost_value: float  # weight used instead when a query intent matches
    reranker_intent_boost_value: float  # not read in this module
    intent_names: List[str]  # intents that switch this tag to the intent weight
    platform: List[str]  # business platforms this rule applies to


# Tag reference (tag ID / tag name / tag type / boost factor)
# 156  industry new product          sales attribute  1.1
# 157  bestseller / seasonal goods   sales attribute  1.1
# 158  perennial bestseller          sales attribute  1.1
# 159  good quality                  sales attribute  1.1
# 162  small-discount product        null             1.05
# 163  discount product              null             1.1
# 164  special-discount product      null             1.3
# 165  super-discount product        null             1.15
# 3    one-carton quick ship         null
# 5    recommended                   null
# 10   popular hot seller            null
# 14   featured selection            null
# 17   gifts (new) (container top-up zone)  null
# 20   new product debut             null
# 21   0316 debut new products [new-product page only]  null
# 25   0316 essa new products [new-product page only]   null
# 26   essaone new products          null
# 27   0316 recently listed (zone)   null
# 40   one carton                    null
# 41   quick ship                    null
# 42   newly launched products (report) & (zone)  null
# 43   9.20 domestic sales (zone)    null
# 82   half-carton group buy         null
#
# Seasonal tags: written into the keyword field and matched there
# 149  Chinese New Year goods   sales season
# 150  Halloween                sales season
# 151  Christmas                sales season
# 152  back-to-school season    sales season
# 153  Easter                   sales season
# 154  Women's Day (March 8)    sales season
# 155  Valentine's Day          sales season

# TODO: boost according to the front-end parameters "customer type" and
# "sales region"
# Tag ID / tag name / tag type
# 137  Eastern Europe market      sales region
# 138  Europe & America market    sales region
# 139  South America market       sales region
# 140  Middle East market         sales region
# 141  Southeast Asia market      sales region
# 142  general supermarket        customer type
# 143  specialty supermarket      customer type
# 144  brand owner                customer type
# 145  corporate wholesaler       customer type
# 146  market wholesaler          customer type
# 147  e-commerce                 customer type
# 148  gift merchant              customer type


class SearchBoostStrategy:
    """Builds Elasticsearch ``function_score`` boosts from BOOST_CONFIGS and user profiles."""

    def __init__(self):
        """Load the boost rules from BOOST_CONFIGS and build the lookup tables."""
        # Initialize boost configurations from config file
        self.boost_configs: List[BoostConfig] = [
            BoostConfig(
                config["tag_id"],
                config["tag_name"],
                config["tag_type"],
                config["boost_value"],
                config["es_intent_boost_value"],
                config["reranker_intent_boost_value"],
                config["intent_names"],
                config["platform"]
            )
            for config in BOOST_CONFIGS
        ]

        # Create lookup dictionaries for faster access
        self.tag_id_to_boost: Dict[int, float] = {
            config.tag_id: config.boost_value for config in self.boost_configs
        }
        self.tag_name_to_boost: Dict[str, float] = {
            config.tag_name: config.boost_value for config in self.boost_configs
        }

        # Create intent-based boost lookup for ES search.
        # If several rules share an intent name, the last rule wins.
        self.intent_to_boost: Dict[str, float] = {}
        for config in self.boost_configs:
            for intent_name in config.intent_names:
                self.intent_to_boost[intent_name] = config.es_intent_boost_value

        logger.debug(f"Initialized boost configs: {json.dumps([vars(c) for c in self.boost_configs], ensure_ascii=False)}")

    def _get_platform_boost_configs(self, business_platform: Optional[str]) -> List[BoostConfig]:
        """
        Filters boost configurations based on the business platform.

        Returns every configuration when ``business_platform`` is empty,
        otherwise only those whose ``platform`` list contains it.
        """
        if not business_platform:
            return self.boost_configs

        return [
            config for config in self.boost_configs
            if business_platform in config.platform
        ]

    def get_boost_query(self, user_profile: Optional[UserProfile] = None, label_field_name: Optional[str] = None, query_intents: Optional[List[str]] = None, business_platform: Optional[str] = None, search_context: Optional[Any] = None) -> dict:
        """
        Generate the Elasticsearch boost query based on configured boost values and user profiles.
        Returns a function_score query that only affects scoring without impacting recall.

        Args:
            user_profile: User profile for behavior-based boosting
            label_field_name: Field name for label-based boosting
            query_intents: Detected query intents for intent-based boosting
            business_platform: Business platform for platform-based filtering
            search_context: Search context containing business platform and sale category information

        Returns:
            A dict of the form ``{"function_score": {"functions": [...],
            "score_mode": ..., "boost_mode": "multiply"}}``, or ``{}`` if no
            function was generated.
        """
        log_prefix = search_context.format_log_prefix() if search_context else ""

        functions = []

        # Per-category counters, used only for the statistics log below.
        # boost_cnt[0]: tag_functions, boost_cnt[1]: fresh_functions, boost_cnt[2]: behavior_functions
        # boost_cnt[3]: brand_functions, boost_cnt[4]: category_functions, boost_cnt[5]: price_range_functions
        # boost_cnt[6]: video_functions, boost_cnt[7]: platform_category_functions
        boost_cnt = [0] * 8

        # Get platform-filtered boost configs
        platform_boost_configs = self._get_platform_boost_configs(business_platform)

        # Add boost for tag IDs - use dynamic field name and platform filtering
        if label_field_name:
            for config in platform_boost_configs:
                tag_id = config.tag_id
                boost_value = config.boost_value

                # Check if this tag should get intent-based boost
                final_boost_value = boost_value
                if query_intents:
                    # Check if any detected intent matches this tag's intent_names
                    for intent in query_intents:
                        if intent in config.intent_names:
                            final_boost_value = config.es_intent_boost_value
                            logger.debug(f"{log_prefix} Intent-based boost for tag_id {tag_id}: {boost_value} -> {final_boost_value} (intent: {intent})")
                            break

                functions.append({
                    "filter": {
                        "term": {
                            label_field_name: tag_id
                        }
                    },
                    "weight": final_boost_value
                })
                boost_cnt[0] += 1  # tag_functions

            logger.debug(f"{log_prefix} Added {boost_cnt[0]} tag-based boost functions using field: {label_field_name} for platform: {business_platform}")
            if query_intents:
                logger.info(f"{log_prefix} Applied intent-based boost for intents: {query_intents}")
        else:
            logger.warning(f"{log_prefix} Label field name is empty, cannot apply tag boost")
            logger.warning(f"{log_prefix} Tag boost functions will be skipped - label_field_name is required for dynamic field name")

        # Add fresh boost: the indexed "on_sell_days_boost" value is scaled by
        # a factor; a different factor is used when the new-product intent is
        # detected in the query.
        fresh_factor = FRESH_BOOST_CONFIG["default_factor"]
        if query_intents:
            for intent in query_intents:
                if intent == FRESH_BOOST_CONFIG["new_product_intent"]:
                    fresh_factor = FRESH_BOOST_CONFIG["es_intent_factor"]
                    logger.debug(f"{log_prefix} New product intent detected: {intent}, applying ES fresh boost factor: {fresh_factor}")
                    break

        functions.append({
            "field_value_factor": {
                "field": "on_sell_days_boost",
                "missing": 1.0,
                "factor": fresh_factor
            }
        })
        boost_cnt[1] += 1  # fresh_functions
        logger.debug(f"{log_prefix} Added fresh boost function with factor: {fresh_factor}")

        # Add video boost
        functions.append({
            "filter": {
                "term": {
                    "is_video": True
                }
            },
            "weight": BOOST_WEIGHTS_CONFIG["video_boost_weight"]
        })
        boost_cnt[6] += 1  # video_functions
        logger.debug(f"{log_prefix} Added video boost function with weight: {BOOST_WEIGHTS_CONFIG['video_boost_weight']}")

        # ===== Platform / category ranking boost =====
        if search_context and hasattr(search_context, 'businessPlatform') and hasattr(search_context, 'sale_category_id'):
            if search_context.businessPlatform and search_context.sale_category_id:
                platform_cate_top_keyword = f"{search_context.businessPlatform}_{search_context.sale_category_id}"
                logger.debug(f"{log_prefix} Adding platform category ranking boost for keyword: {platform_cate_top_keyword}")
                functions.append({
                    "filter": {
                        "term": {
                            "op_ranking_platform_cate_list": platform_cate_top_keyword
                        }
                    },
                    "weight": BOOST_WEIGHTS_CONFIG["platform_category_ranking_weight"]
                })
                boost_cnt[7] += 1  # platform_category_functions
                logger.debug(f"{log_prefix} Added platform category ranking boost function for: {platform_cate_top_keyword}")
            else:
                logger.debug(f"{log_prefix} Skipping platform category boost - businessPlatform: {getattr(search_context, 'businessPlatform', 'None')}, sale_category_id: {getattr(search_context, 'sale_category_id', 'None')}")
        else:
            logger.debug(f"{log_prefix} Skipping platform category boost - search_context not provided or missing required fields")

        # ===== User-profile personalization boosts =====
        # Personalize results using the user profile: user behavior, brand
        # preference, category preference, price preference and the customer's
        # goods structure.
        if user_profile:
            logger.debug(f"{log_prefix} Adding biz boosting based on user profile")
            logger.debug(f"{log_prefix} User profile base info: {MessageToDict(user_profile.base_info)}")

            # Detailed statistics logging; guarded so the MessageToDict
            # conversion only runs when DEBUG is enabled.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"{log_prefix} User profile statistics:")
                stats_dict = MessageToDict(user_profile.statistics)
                for key, value in stats_dict.items():
                    if isinstance(value, list):
                        logger.debug(f"{log_prefix} Statistics {key}: {len(value)} items, first item: {value[0] if value else 'None'}")
                    else:
                        logger.debug(f"{log_prefix} Statistics {key}: {value}")

            # ===== User behavior boost =====
            # Logic: take the behavior records (click, add to cart, collect,
            # purchase) from the user profile.
            # Limit: only the first N records of each type are used, to keep
            # the terms filters small.
            behavior_map = user_profile.behavior_map

            # Detailed behavior-map logging (DEBUG only, same reason as above)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"{log_prefix} User behavior map:")
                behavior_dict = MessageToDict(behavior_map)
                for behavior_type, behaviors in behavior_dict.items():
                    if isinstance(behaviors, list):
                        logger.debug(f"{log_prefix} Behavior {behavior_type}: {len(behaviors)} items, first item: {behaviors[0] if behaviors else 'None'}")
                    else:
                        logger.debug(f"{log_prefix} Behavior {behavior_type}: {behaviors}")

            max_behavior_count_for_boost = BOOST_WEIGHTS_CONFIG["max_behavior_count_for_boost"]

            for behavior_type in ['click', 'add_cart', 'collect', 'purchase']:
                behaviors = getattr(behavior_map, behavior_type, [])
                if behaviors:
                    sku_ids = [b.skuId for b in behaviors[:max_behavior_count_for_boost]]
                    logger.debug(f"{log_prefix} Adding boost for {behavior_type} behaviors with {len(sku_ids)} SKUs: {sku_ids[:10]}")
                    functions.append({
                        "filter": {
                            "terms": {
                                "sku_id": sku_ids
                            }
                        },
                        "weight": BOOST_WEIGHTS_CONFIG["user_behavior_weight"]
                    })
                    boost_cnt[2] += 1  # behavior_functions

            # ===== Brand preference boost =====
            # Purpose: favor products of the brands the user prefers.
            # Logic: read brandCategoryIds from the profile's base_info and
            # boost products whose brand_id is in that list.
            # Weight: read from configuration.
            if user_profile.base_info.brandCategoryIds:
                brand_ids = [x for x in user_profile.base_info.brandCategoryIds]
                logger.debug(f"{log_prefix} Adding boost for brand preferences with {len(brand_ids)} brand_ids {brand_ids[:10]}")
                functions.append({
                    "filter": {
                        "terms": {
                            "brand_id": brand_ids
                        }
                    },
                    "weight": BOOST_WEIGHTS_CONFIG["brand_preference_weight"]
                })
                boost_cnt[3] += 1  # brand_functions

            # ===== Category preference boost =====
            # Purpose: favor products in the categories the user prefers.
            # Logic: read category_group from the profile's statistics and
            # boost products whose category_id is in that list.
            # Weight: read from configuration.
            # NOTE: this feature is currently disabled; change `if False` to
            # `if True` to enable it.
            if False:
                if user_profile.statistics.category_group:
                    category_ids = [stat.keyId for stat in user_profile.statistics.category_group]
                    logger.debug(f"{log_prefix} Category preferences stats with {len(category_ids)} category_ids {category_ids[:10]}")
                    logger.debug(f"{log_prefix} Adding boost for category preferences with {len(category_ids)} category_ids {category_ids[:10]}")
                    functions.append({
                        "filter": {
                            "terms": {
                                "category_id": category_ids
                            }
                        },
                        "weight": BOOST_WEIGHTS_CONFIG["category_preference_weight"]
                    })
                    boost_cnt[4] += 1  # category_functions

            # ===== Price range preference boost =====
            # Purpose: favor products in the price ranges the user prefers.
            # Logic: read price_group from the profile's statistics and boost
            # products whose price_range is in that list.
            # Weight: read from configuration.
            # NOTE: this feature is currently disabled; change `if False` to
            # `if True` to enable it.
            if False:
                if user_profile.statistics.price_group:
                    price_ranges = [stat.keyId for stat in user_profile.statistics.price_group]
                    price_stats = [MessageToDict(stat) for stat in user_profile.statistics.price_group]
                    logger.debug(f"{log_prefix} Price range preferences stats: {price_stats}")
                    logger.debug(f"{log_prefix} Adding boost for price range preferences: {price_ranges}")
                    functions.append({
                        "filter": {
                            "terms": {
                                "price_range": price_ranges
                            }
                        },
                        "weight": BOOST_WEIGHTS_CONFIG["price_range_preference_weight"]
                    })
                    boost_cnt[5] += 1  # price_range_functions

            # ===== Customer goods-structure category boost =====
            # Purpose: favor categories that fit the customer's goods structure.
            # Logic: read customerGoodsStructure from the profile's base_info
            # and add one boost per structure entry that has categoryIds.
            # Weight: read from configuration.
            # NOTE: categoryIds are front-end (sale) categories, not the ES
            # category_id field.
            if user_profile.base_info.customerGoodsStructure:
                structure_list = [MessageToDict(s) for s in user_profile.base_info.customerGoodsStructure]
                logger.debug(f"{log_prefix} Customer goods structure details: {structure_list}")
                for structure in user_profile.base_info.customerGoodsStructure:
                    if structure.categoryIds:
                        logger.debug(f"{log_prefix} Adding boost for category IDs in structure length {len(structure.categoryIds)} category_ids {structure.categoryIds[:10]}")
                        functions.append({
                            "filter": {
                                "terms": {
                                    # NOTE: customerGoodsStructure.categoryIds are front-end
                                    # categories, hence sale_category_all rather than category_id.
                                    "sale_category_all": [x for x in structure.categoryIds]
                                }
                            },
                            "weight": BOOST_WEIGHTS_CONFIG["customer_structure_category_weight"]
                        })
                        boost_cnt[4] += 1  # category_functions
                    if structure.priceBetween:
                        # Price-range boosting from the goods structure is not supported yet.
                        pass

        # Calculate total functions count
        total_functions = len(functions)

        # Log boost query statistics
        logger.info(f"{log_prefix} ===== ES查询提权函数统计 =====")
        logger.info(f"{log_prefix} 总提权函数数量: {total_functions}")
        logger.info(f"{log_prefix} 标签提权函数: {boost_cnt[0]}")
        logger.info(f"{log_prefix} 新品提权函数: {boost_cnt[1]}")
        logger.info(f"{log_prefix} 行为提权函数: {boost_cnt[2]}")
        logger.info(f"{log_prefix} 品牌提权函数: {boost_cnt[3]}")
        logger.info(f"{log_prefix} 类目提权函数: {boost_cnt[4]}")
        logger.info(f"{log_prefix} 价格区间提权函数: {boost_cnt[5]}")
        logger.info(f"{log_prefix} 视频提权函数: {boost_cnt[6]}")
        logger.info(f"{log_prefix} 平台类目排名提权函数: {boost_cnt[7]}")
        logger.info(f"{log_prefix} ===== ES查询提权函数统计结束 =====")

        # Defensive guard: the fresh and video boosts above are appended
        # unconditionally, so this branch is not reached at present.
        if not functions:
            logger.debug(f"{log_prefix} No boost functions generated")
            return {}

        # search_context is optional, so it must not be dereferenced blindly.
        # Without a context (or without either query attribute) the request is
        # treated as having no query text.
        has_query = bool(
            search_context is not None
            and (
                getattr(search_context, "search_query", None)
                or getattr(search_context, "query", None)
            )
        )
        score_mode = (
            FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY
            if has_query
            else FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY
        )

        boost_query = {
            "function_score": {
                "functions": functions,
                "score_mode": score_mode,
                "boost_mode": "multiply"
            }
        }

        return boost_query

    def get_boost_value(self, tag_id: Optional[int] = None, tag_name: Optional[str] = None, platform: Optional[str] = None) -> float:
        """
        Get the boost value for a given tag ID or name.
        Returns 1.0 if no boost is configured or if platform doesn't match.

        ``tag_id`` is looked up first; ``tag_name`` is only consulted when no
        rule matched by ID. The first matching rule decides the result.

        Args:
            tag_id: Tag ID to look up
            tag_name: Tag name to look up
            platform: Business platform for filtering; a rule matches when
                this value is contained in the rule's ``platform`` list
        """
        if tag_id is not None:
            for config in self.boost_configs:
                if config.tag_id == tag_id:
                    # config.platform is a list of platforms, so compatibility
                    # is a membership test (same rule as
                    # _get_platform_boost_configs), not an equality test.
                    if platform and platform not in config.platform:
                        logger.debug(f"Platform mismatch for tag_id {tag_id}: requested platform {platform}, tag platform {config.platform}")
                        return 1.0
                    logger.debug(f"Found boost value {config.boost_value} for tag_id {tag_id}")
                    return config.boost_value

        if tag_name is not None:
            for config in self.boost_configs:
                if config.tag_name == tag_name:
                    # Membership test against the rule's platform list (see above).
                    if platform and platform not in config.platform:
                        logger.debug(f"Platform mismatch for tag_name {tag_name}: requested platform {platform}, tag platform {config.platform}")
                        return 1.0
                    logger.debug(f"Found boost value {config.boost_value} for tag_name {tag_name}")
                    return config.boost_value

        logger.debug(f"No boost value found for tag_id={tag_id}, tag_name={tag_name}, platform={platform}")
        return 1.0