""" 用户画像信息提取器 - 从UserProfile中提取相关信息并生成描述 """ from typing import Dict, Any, Optional, List, NamedTuple from dataclasses import dataclass from datetime import datetime, timedelta from collections import Counter import re,math from src.services.user_profile import UserProfile from config.logging_config import get_app_logger from src.chat_search.dict_loader import DictLoader from config.chat_search_config import USER_PROFILE_BEHAVIOR_CONFIG, SESSION_CONFIG, ATTR_STATIS_DISPLAY_MIN_OPTION_COUNT, ATTR_STATIS_DISPLAY_MIN_PRODUCT_COUNT, get_display_text, USER_BEHAVIOR_STAT_IN_PROMPT, USER_SEARCH_HISTORY_IN_PROMPT logger = get_app_logger(__name__) @dataclass class BehaviorStatFieldConfig: """行为统计字段配置""" field_name: str # 原始字段名 feature_prefix: str # 特征前缀 display_name: str # 显示名称 description_template: str # 描述模板 max_items: int = 10 # 最大显示项目数 is_repeated: bool = False # 是否为重复字段 is_numeric: bool = False # 是否为数值字段 is_time: bool = False # 是否为时间字段 bucket_size: int = 10 # 分桶大小(仅用于数值字段) enable: bool = True # 是否启用该字段,默认启用 dict_name: str = None # 词典名称(可选) @dataclass class BehaviorStatsConfig: """行为统计配置""" # 行为权重定义 behavior_weights: Dict[str, float] = None # 直接取值字段配置 direct_fields: List[BehaviorStatFieldConfig] = None # 重复字段配置 repeated_fields: List[BehaviorStatFieldConfig] = None # 数值字段配置 numeric_fields: List[BehaviorStatFieldConfig] = None # 时间字段配置 time_fields: List[BehaviorStatFieldConfig] = None # 行为统计配置 behavior_summary_truncate_limit: int = 1000 # 行为统计截断限制 def __post_init__(self): """初始化默认配置""" # 从集中配置加载 config = USER_PROFILE_BEHAVIOR_CONFIG if self.behavior_weights is None: self.behavior_weights = config['behavior_weights'] if self.direct_fields is None: self.direct_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['direct_fields']] if self.repeated_fields is None: self.repeated_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['repeated_fields']] if self.numeric_fields is None: self.numeric_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['numeric_fields']] if self.time_fields is None: self.time_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['time_fields']] if self.behavior_summary_truncate_limit is None: self.behavior_summary_truncate_limit = config['behavior_summary_truncate_limit'] @dataclass class UserProfileInfo: """用户画像信息结构""" # 基础信息 sale_market_value: str = "" # 主要销售地区名 nature_of_company_value: str = "" # 公司性质名 customer_type: str = "" # 公司类型编码 customer_type_value: str = "" # 公司类型名 sell_channel_value: str = "" # 销售渠道名 stores_number: int = 0 # 门店数量 register_category_values: List[str] = None # 注册主要采购品类名 auth_category_values: List[str] = None # 认证主要采购品类名 purchase_quantity_by_year_value: str = "" # 采购规模名 customer_goods_structures: List[Dict[str, str]] = None # 客户商品结构 brand_category_values: List[str] = None # 客户品牌品类名 delivery_type_value: str = "" # 主要出货方式名 customs_import_scale: str = "" # 海关进口规模 purchase_quantity: int = 0 # 单款采购箱数 tax_clearance_type: str = "" # 清关方式编码 tax_clearance_type_value: str = "" # 清关方式名 category_values: List[str] = None # 经营类目名 stores_number_offline: int = 0 # 线下门店数量 year_sales_amount: str = "" # 年销售额 main_market_values: List[str] = None # 主攻市场名 main_area_values: List[str] = None # 外贸主攻区域名 secondary_area_values: List[str] = None # 外贸次要区域名 country_value: str = "" # 国家名 # 最近搜索词 recent_search_keywords: List[str] = None # 最近10个搜索词(过滤掉isSearchFactory=true的) def __post_init__(self): """初始化默认值""" if self.register_category_values is None: self.register_category_values = [] if self.auth_category_values is None: self.auth_category_values = [] if self.customer_goods_structures is None: self.customer_goods_structures = [] if self.brand_category_values is None: self.brand_category_values = [] if self.category_values is None: self.category_values = [] if self.main_market_values is None: self.main_market_values = [] if self.main_area_values is None: self.main_area_values = [] if self.secondary_area_values is None: self.secondary_area_values = [] if self.recent_search_keywords is None: self.recent_search_keywords = [] class UserProfileExtractor: """用户画像信息提取器""" def __init__(self): """初始化提取器""" self.behavior_stats_config = BehaviorStatsConfig() self.dict_loader = DictLoader() def extract_user_profile_info(self, user_profile: UserProfile) -> UserProfileInfo: """ 从UserProfile中提取相关信息 Args: user_profile: UserProfile对象 Returns: UserProfileInfo: 提取的用户画像信息 """ if not user_profile or not user_profile.base_info: logger.warning("[extract_user_profile_info] UserProfile or base_info is None") return UserProfileInfo() base_info = user_profile.base_info # 提取基础信息 profile_info = UserProfileInfo( sale_market_value=base_info.saleMarketValue or "", nature_of_company_value=base_info.natureOfCompanyValue or "", customer_type=base_info.customerType or "", customer_type_value=base_info.customerTypeValue or "", sell_channel_value=base_info.sellChannelValue or "", stores_number=base_info.storesNumber or 0, register_category_values=[str(item) for item in base_info.registerCategoryValues] if base_info.registerCategoryValues else [], auth_category_values=[str(item) for item in base_info.authCategoryValues] if base_info.authCategoryValues else [], purchase_quantity_by_year_value=base_info.purchaseQuantityByYearValue or "", customer_goods_structures=self._extract_customer_goods_structures(base_info.customerGoodsStructure), brand_category_values=[str(item) for item in base_info.brandCategoryValues] if base_info.brandCategoryValues else [], delivery_type_value=base_info.deliveryTypeValue or "", customs_import_scale=base_info.customsImportScale or "", purchase_quantity=base_info.purchaseQuantity or 0, tax_clearance_type=base_info.taxClearanceType or "", tax_clearance_type_value=base_info.taxClearanceTypeValue or "", category_values=[str(item) for item in base_info.categoryValues] if base_info.categoryValues else [], stores_number_offline=base_info.storesNumberOffline or 0, year_sales_amount=base_info.yearSalesAmount or "", main_market_values=[str(item) for item in base_info.mainMarketValues] if base_info.mainMarketValues else [], main_area_values=[str(item) for item in base_info.mainAreaValues] if base_info.mainAreaValues else [], secondary_area_values=[str(item) for item in base_info.secondaryAreaValues] if base_info.secondaryAreaValues else [], country_value=base_info.countryValue or "", recent_search_keywords=self._extract_recent_search_keywords(user_profile) ) logger.info(f"[UserProfileExtractor.extract_user_profile_info] Extracted user profile info: {profile_info}") return profile_info def _extract_customer_goods_structures(self, customer_goods_structures) -> List[Dict[str, str]]: """ 提取客户商品结构信息 Args: customer_goods_structures: 客户商品结构列表 Returns: List[Dict[str, str]]: 客户商品结构信息列表 """ if not customer_goods_structures: return [] structures = [] for structure in customer_goods_structures: structure_info = { 'price_between': structure.priceBetween or "", 'goods_grade': structure.goodsGrade or "", 'package_type': structure.packageType or "" } structures.append(structure_info) return structures def generate_chat_search_intro(self, profile_info: UserProfileInfo) -> str: """ 生成导购语介绍 Args: profile_info: UserProfileInfo对象 Returns: str: 导购语介绍 """ if profile_info: customer_type_value = profile_info.customer_type_value # 地理位置信息 location = profile_info.sale_market_value if profile_info.sale_market_value else profile_info.country_value else: customer_type_value = None location = None # 生成导购语 if not location and not customer_type_value: return "你是一个跨境B2B选品顾问,请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" elif not location: return f"你是一个跨境B2B选品顾问,了解“{customer_type_value}”类型客户的采购决策逻辑。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" elif not customer_type_value: return f"你是一个跨境B2B选品顾问,熟悉{location}市场。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" else: return f"你是一个跨境B2B选品顾问,熟悉{location}市场,了解“{customer_type_value}”类型客户的采购决策逻辑。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" def generate_natural_language_description(self, profile_info: UserProfileInfo) -> str: """ 生成用户基础信息的自然语言描述 Args: profile_info: UserProfileInfo对象 Returns: str: 自然语言描述 """ if not profile_info: return "暂无用户画像信息" description_parts = [] # 基础公司信息 if profile_info.customer_type_value: description_parts.append(f"公司类型:{profile_info.customer_type_value}") if profile_info.nature_of_company_value: description_parts.append(f"公司性质:{profile_info.nature_of_company_value}") if profile_info.sell_channel_value: description_parts.append(f"销售渠道:{profile_info.sell_channel_value}") # 地理位置信息 location_parts = [] if profile_info.country_value: location_parts.append(profile_info.country_value) if profile_info.sale_market_value: location_parts.append(profile_info.sale_market_value) if location_parts: description_parts.append(f"主要销售地区:{', '.join(location_parts)}") # 门店信息 if profile_info.stores_number > 0: description_parts.append(f"门店数量:{profile_info.stores_number}家") if profile_info.stores_number_offline > 0: description_parts.append(f"线下门店:{profile_info.stores_number_offline}家") # 采购信息 if profile_info.purchase_quantity_by_year_value: description_parts.append(f"采购规模:{profile_info.purchase_quantity_by_year_value}") if profile_info.purchase_quantity > 0: description_parts.append(f"单款采购箱数:{profile_info.purchase_quantity}箱") # 年销售额 if profile_info.year_sales_amount: description_parts.append(f"年销售额:{profile_info.year_sales_amount}") # 类目信息 if profile_info.register_category_values: description_parts.append(f"注册采购品类:{', '.join(str(item) for item in profile_info.register_category_values)}") if profile_info.auth_category_values: description_parts.append(f"认证采购品类:{', '.join(str(item) for item in profile_info.auth_category_values)}") if profile_info.category_values: description_parts.append(f"经营类目:{', '.join(str(item) for item in profile_info.category_values)}") # 品牌信息 if profile_info.brand_category_values: description_parts.append(f"品牌品类:{', '.join(str(item) for item in profile_info.brand_category_values)}") # 市场信息 if profile_info.main_market_values: description_parts.append(f"主攻市场:{', '.join(str(item) for item in profile_info.main_market_values)}") if profile_info.main_area_values: description_parts.append(f"外贸主攻区域:{', '.join(str(item) for item in profile_info.main_area_values)}") # 商品结构统计 if profile_info.customer_goods_structures: structure_descriptions = [] for structure in profile_info.customer_goods_structures[:USER_PROFILE_BEHAVIOR_CONFIG['max_customer_goods_structures']]: # 只取前N个 parts = [] if structure['price_between']: parts.append(f"价格区间{structure['price_between']}") if structure['goods_grade']: parts.append(f"产品档次{structure['goods_grade']}") if structure['package_type']: parts.append(f"包装类型{structure['package_type']}") if parts: structure_descriptions.append('、'.join(parts)) if structure_descriptions: description_parts.append(f"商品结构统计:{'; '.join(structure_descriptions)}") # 物流信息 if profile_info.delivery_type_value: description_parts.append(f"主要出货方式:{profile_info.delivery_type_value}") if profile_info.tax_clearance_type_value: description_parts.append(f"清关方式:{profile_info.tax_clearance_type_value}") if profile_info.customs_import_scale: description_parts.append(f"海关进口规模:{profile_info.customs_import_scale}") # 组合成完整描述 if description_parts: return "\n".join(description_parts) else: return "暂无用户画像信息(信息为空)" def extract_and_describe(self, user_profile: UserProfile) -> str: """ 提取用户画像信息并生成完整的自然语言描述 Args: user_profile: UserProfile对象 Returns: 导购语, 完整的用户画像自然语言描述 """ # 提取基础信息 profile_info = self.extract_user_profile_info(user_profile) # 生成导购语 guide_intro = self.generate_chat_search_intro(profile_info) if not user_profile: return guide_intro, "暂无用户画像信息" natural_description = self.generate_natural_language_description(profile_info) # 提取历史行为中的通用属性分布统计 common_attribute_distribution = self.extract_common_attribute_distribution(user_profile) # 提取历史行为中每个商品的具体属性统计 item_specific_attributes = self.extract_item_specific_attributes(user_profile) # 生成自然语言描述 common_attribute_description = self.generate_common_attribute_distribution_description(common_attribute_distribution) item_specific_attribute_description = self.generate_item_specific_attribute_description(item_specific_attributes) # 组织完整的描述 language = getattr(self, 'language', 'zh') complete_description = f"{get_display_text('customer_background', language)}:\n{natural_description}" # 添加通用属性分布描述 if USER_BEHAVIOR_STAT_IN_PROMPT: if common_attribute_description: complete_description += f"\n\n{get_display_text('historical_purchase_general_attributes', language)}:\n{common_attribute_description}" # 添加具体属性偏好描述 if item_specific_attribute_description: complete_description += f"\n\n{get_display_text('historical_purchase_category_specific_attributes', language)}:\n{item_specific_attribute_description}" # 添加最近搜索词信息 # 提取最近搜索词 if USER_SEARCH_HISTORY_IN_PROMPT: recent_search_keywords = self._extract_recent_search_keywords(user_profile) if recent_search_keywords: complete_description += f"\n\n{get_display_text('recent_search_keywords', language)}:{', '.join(recent_search_keywords)}" return guide_intro, complete_description def extract_common_attribute_distribution(self, user_profile: UserProfile) -> Dict[str, Any]: """ 提取历史行为中的通用属性分布统计 Args: user_profile: UserProfile对象 Returns: Dict[str, Any]: 通用属性分布统计信息 """ if not user_profile or not user_profile.behavior_map: logger.warning("[extract_common_attribute_distribution] UserProfile or behavior_map is None") return {} behavior_map = user_profile.behavior_map common_features = {} # 获取所有行为数据 all_behaviors = [] for behavior_type, behaviors in [ ('click', behavior_map.click), ('add_cart', behavior_map.add_cart), ('collect', behavior_map.collect), ('purchase', behavior_map.purchase) ]: logger.info(f"[UserProfileExtractor.extract_common_attribute_distribution] Extracted behavior_type {behavior_type} with {len(behaviors)} behaviors") for behavior in behaviors: all_behaviors.append((behavior, self.behavior_stats_config.behavior_weights[behavior_type])) # 1. 处理直接取值字段 for field_config in self.behavior_stats_config.direct_fields: if not field_config.enable: continue counter = Counter() total_weight_for_field = 0 # 该字段的总权重(包括空值) for behavior, weight in all_behaviors: total_weight_for_field += weight # 所有行为都计入总数 if hasattr(behavior, field_config.field_name): value = getattr(behavior, field_config.field_name) if value: # 确保值不为空 counter[str(value)] += weight # 转换为字符串 # 如果值为空,不加入counter,但已计入total_weight_for_field # 计算空值权重 empty_weight = total_weight_for_field - sum(counter.values()) if empty_weight > 0: counter['__empty__'] = empty_weight # 保存统计结果 common_features[f'{field_config.feature_prefix}_weighted_counts'] = dict(counter) common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field common_features[f'{field_config.feature_prefix}_top_items'] = [item for item, count in counter.most_common(10)] # 2. 处理重复字段 for field_config in self.behavior_stats_config.repeated_fields: if not field_config.enable: continue counter = Counter() total_weight_for_field = 0 # 该字段的总权重(包括空值) for behavior, weight in all_behaviors: total_weight_for_field += weight # 所有行为都计入总数 if hasattr(behavior, field_config.field_name) and getattr(behavior, field_config.field_name): values = getattr(behavior, field_config.field_name) has_valid_value = False for value in values: if value: counter[str(value)] += weight has_valid_value = True # 如果没有有效值,不加入counter,但已计入total_weight_for_field # 如果字段不存在或为空,不加入counter,但已计入total_weight_for_field # 计算空值权重 empty_weight = total_weight_for_field - sum(counter.values()) if empty_weight > 0: counter['__empty__'] = empty_weight common_features[f'{field_config.feature_prefix}_weighted_counts'] = dict(counter) common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field common_features[f'{field_config.feature_prefix}_top_items'] = [item for item, count in counter.most_common(10)] # 3. 处理数值字段分桶统计 for field_config in self.behavior_stats_config.numeric_fields: if not field_config.enable: continue bucket_counter = Counter() total_weight_for_field = 0 # 该字段的总权重(包括空值) for behavior, weight in all_behaviors: total_weight_for_field += weight # 所有行为都计入总数 if hasattr(behavior, field_config.field_name): value = getattr(behavior, field_config.field_name) if value and value > 0: bucket = int(value / field_config.bucket_size) bucket_counter[str(bucket)] += weight # 转换为字符串 # 如果值为空或<=0,不加入counter,但已计入total_weight_for_field # 计算空值权重 empty_weight = total_weight_for_field - sum(bucket_counter.values()) if empty_weight > 0: bucket_counter['__empty__'] = empty_weight common_features[f'{field_config.feature_prefix}_bucket_weighted_counts'] = dict(bucket_counter) common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field common_features[f'{field_config.feature_prefix}_top_buckets'] = [bucket for bucket, count in bucket_counter.most_common(10)] # 4. 处理时间差统计 for field_config in self.behavior_stats_config.time_fields: if not field_config.enable: continue time_bucket_counter = Counter() total_weight_for_field = 0 # 该字段的总权重(包括空值) for behavior, weight in all_behaviors: total_weight_for_field += weight # 所有行为都计入总数 if hasattr(behavior, field_config.field_name) and hasattr(behavior, 'behaviorTime'): time_value = getattr(behavior, field_config.field_name) behavior_time = behavior.behaviorTime if time_value and behavior_time: try: # 解析时间字符串 if isinstance(time_value, str): time_obj = datetime.strptime(time_value, '%Y-%m-%d %H:%M:%S') else: time_obj = time_value if isinstance(behavior_time, str): behavior_time_obj = datetime.strptime(behavior_time, '%Y-%m-%d %H:%M:%S') else: behavior_time_obj = behavior_time # 计算时间差(月数) time_diff = behavior_time_obj - time_obj months_diff = int(time_diff.days / 30) # 分桶:0-6个月,6-12个月,12-24个月,24个月以上 if months_diff < 0: bucket = 'future' elif months_diff <= 6: bucket = '0-6m' elif months_diff <= 12: bucket = '6-12m' elif months_diff <= 24: bucket = '12-24m' else: bucket = '24m+' time_bucket_counter[bucket] += weight except (ValueError, TypeError) as e: logger.debug(f"Error parsing time for {field_config.field_name}: {e}") continue # 如果时间值为空或解析失败,不加入counter,但已计入total_weight_for_field # 计算空值权重 empty_weight = total_weight_for_field - sum(time_bucket_counter.values()) if empty_weight > 0: time_bucket_counter['__empty__'] = empty_weight common_features[f'{field_config.feature_prefix}_time_bucket_weighted_counts'] = dict(time_bucket_counter) common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field common_features[f'{field_config.feature_prefix}_top_time_buckets'] = [bucket for bucket, count in time_bucket_counter.most_common(5)] # 5. 综合统计信息 total_weighted_behaviors = sum(weight for _, weight in all_behaviors) common_features['total_weighted_behaviors'] = total_weighted_behaviors # 各行为类型的统计 behavior_type_counts = Counter() for behavior_type, behaviors in [ ('click', behavior_map.click), ('add_cart', behavior_map.add_cart), ('collect', behavior_map.collect), ('purchase', behavior_map.purchase) ]: behavior_type_counts[behavior_type] = len(behaviors) common_features['behavior_type_counts'] = dict(behavior_type_counts) logger.info(f"Extracted behavior stats with {len(common_features)} feature groups") return common_features def extract_item_specific_attributes(self, user_profile: UserProfile) -> Dict[str, Any]: """ 从历史行为中提取每个商品的具体属性统计 Args: user_profile: UserProfile对象 Returns: Dict[str, Any]: 商品具体属性统计信息 """ if not user_profile or not user_profile.behavior_map: logger.warning("[extract_item_specific_attributes] UserProfile or behavior_map is None") return {} behavior_map = user_profile.behavior_map # 获取所有行为数据 all_behaviors = [] for behavior_type, behaviors in [ ('click', behavior_map.click), ('add_cart', behavior_map.add_cart), ('collect', behavior_map.collect), ('purchase', behavior_map.purchase) ]: for behavior in behaviors: all_behaviors.append((behavior, self.behavior_stats_config.behavior_weights[behavior_type])) # 统计每个属性名称和属性值对应的权重 attr_statistics = {} # {attr_name: {option_name: weight}} for behavior, weight in all_behaviors: # 合并 spuAttributeList 和 skuAttributeList merged_attributes = [] # 以 skuAttributeList 为基础 if hasattr(behavior, 'skuAttributeList') and behavior.skuAttributeList: merged_attributes.extend(behavior.skuAttributeList) # 加入 spuAttributeList,如果 attributeId 已存在则跳过 existing_attr_ids = set() if hasattr(behavior, 'skuAttributeList') and behavior.skuAttributeList: existing_attr_ids = {attr.attributeId for attr in behavior.skuAttributeList} if hasattr(behavior, 'spuAttributeList') and behavior.spuAttributeList: for attr in behavior.spuAttributeList: if attr.attributeId not in existing_attr_ids: merged_attributes.append(attr) existing_attr_ids.add(attr.attributeId) # 统计合并后的属性 for attr in merged_attributes: attr_id = attr.attributeId option_id = attr.optionId # 获取属性名称 attr_name = self.dict_loader.get_name('spu_attribute', str(attr_id)) if not attr_name: attr_name = self.dict_loader.get_name('sku_attribute', str(attr_id)) if not attr_name: attr_name = f"属性{attr_id}" # 获取属性值名称 option_name = self.dict_loader.get_name('spu_attribute_option', str(option_id)) if not option_name: option_name = self.dict_loader.get_name('sku_attribute_option', str(option_id)) if not option_name: option_name = f"选项{option_id}" # 跳过无效的属性值 if option_name == '无' or not option_name: continue # 统计 if attr_name not in attr_statistics: attr_statistics[attr_name] = {} if option_name not in attr_statistics[attr_name]: attr_statistics[attr_name][option_name] = 0 attr_statistics[attr_name][option_name] += weight if not attr_statistics: return {} # 生成属性统计特征 attribute_features = {} # 计算每个属性的总权重并排序 attr_with_total = [ (attr_name, options_dict, sum(options_dict.values())) for attr_name, options_dict in attr_statistics.items() ] # 按总权重排序,取前10个属性 sorted_attrs = sorted(attr_with_total, key=lambda x: x[2], reverse=True) for attr_name, options_dict, total_weight in sorted_attrs: # 按权重排序选项,取前5个 sorted_options = sorted(options_dict.items(), key=lambda x: x[1], reverse=True) # 生成特征名称(使用属性名称的拼音或ID作为前缀) attr_feature_prefix = f"attr_{attr_name.replace(' ', '_').replace(':', '_')}" attribute_features[f'{attr_feature_prefix}_weighted_counts'] = dict(options_dict) attribute_features[f'{attr_feature_prefix}_total_weight'] = total_weight attribute_features[f'{attr_feature_prefix}_top_items'] = [item for item, count in sorted_options] # 添加总体属性统计 total_attribute_weight = sum(attr[2] for attr in sorted_attrs) attribute_features['attribute_total_weight'] = total_attribute_weight attribute_features['attribute_attr_count'] = len(sorted_attrs) logger.info(f"Extracted attribute statistics with {len(attribute_features)} attribute feature groups") return attribute_features def generate_common_attribute_distribution_description(self, common_attribute_distribution: Dict[str, Any]) -> str: """ 生成通用属性分布统计的自然语言描述 Args: common_attribute_distribution: 通用属性分布统计信息 Returns: str: 自然语言描述 """ if not common_attribute_distribution: return "暂无通用属性分布统计信息" description_parts = [] # 0. 行为总述(放在最前面) if 'behavior_type_counts' in common_attribute_distribution: behavior_counts = common_attribute_distribution['behavior_type_counts'] total_behaviors = sum(behavior_counts.values()) if total_behaviors > 0: behavior_summary_parts = [] # 检查是否达到截断限制 if total_behaviors >= self.behavior_stats_config.behavior_summary_truncate_limit: behavior_summary_parts.append(f"该用户有超过{self.behavior_stats_config.behavior_summary_truncate_limit}次行为") else: behavior_summary_parts.append(f"该用户有{total_behaviors}次行为") # 添加具体行为类型统计 behavior_details = [] if behavior_counts.get('click', 0) > 0: behavior_details.append(f"{behavior_counts['click']}次点击") if behavior_counts.get('add_cart', 0) > 0: behavior_details.append(f"{behavior_counts['add_cart']}次加购") if behavior_counts.get('collect', 0) > 0: behavior_details.append(f"{behavior_counts['collect']}次收藏") if behavior_counts.get('purchase', 0) > 0: behavior_details.append(f"{behavior_counts['purchase']}次购买") if behavior_details: behavior_summary_parts.append(f"包括{', '.join(behavior_details)}") description_parts.append(''.join(behavior_summary_parts)) # 1. 处理直接取值字段描述 for field_config in self.behavior_stats_config.direct_fields: if not field_config.enable: continue weighted_counts_key = f'{field_config.feature_prefix}_weighted_counts' total_weight_key = f'{field_config.feature_prefix}_total_weight' if weighted_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: weighted_counts = common_attribute_distribution[weighted_counts_key] total_weight = common_attribute_distribution[total_weight_key] if total_weight > 0: # 生成带占比的描述 items_with_percentage = [] for item, count in sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: percentage = (count / total_weight) * 100 # 词典映射 if item == '__empty__': display_name = '空值' elif field_config.dict_name: display_name = self.dict_loader.get_name(field_config.dict_name, str(item)) or str(item) else: display_name = str(item) items_with_percentage.append(f"{display_name}({percentage:.1f}%)") if items_with_percentage: description = field_config.description_template.format( display_name=field_config.display_name, values=', '.join(items_with_percentage) ) description_parts.append(description) # 2. 处理重复字段描述 for field_config in self.behavior_stats_config.repeated_fields: if not field_config.enable: continue weighted_counts_key = f'{field_config.feature_prefix}_weighted_counts' total_weight_key = f'{field_config.feature_prefix}_total_weight' if weighted_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: weighted_counts = common_attribute_distribution[weighted_counts_key] total_weight = common_attribute_distribution[total_weight_key] if total_weight > 0: # 生成带占比的描述 items_with_percentage = [] for item, count in sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: percentage = (count / total_weight) * 100 # 词典映射 if item == '__empty__': display_name = '空值' elif field_config.dict_name: display_name = self.dict_loader.get_name(field_config.dict_name, str(item)) or str(item) else: display_name = str(item) items_with_percentage.append(f"{display_name}({percentage:.1f}%)") if items_with_percentage: description = field_config.description_template.format( display_name=field_config.display_name, values=', '.join(items_with_percentage) ) description_parts.append(description) # 3. 处理数值字段描述 for field_config in self.behavior_stats_config.numeric_fields: if not field_config.enable: continue bucket_counts_key = f'{field_config.feature_prefix}_bucket_weighted_counts' total_weight_key = f'{field_config.feature_prefix}_total_weight' if bucket_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: bucket_counts = common_attribute_distribution[bucket_counts_key] total_weight = common_attribute_distribution[total_weight_key] if total_weight > 0: # 生成带占比的描述 ranges_with_percentage = [] for bucket, count in sorted(bucket_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: percentage = (count / total_weight) * 100 if bucket == '__empty__': range_desc = '空值' else: range_desc = f"{int(bucket)*field_config.bucket_size}-{(int(bucket)+1)*field_config.bucket_size}" ranges_with_percentage.append(f"{range_desc}({percentage:.1f}%)") if ranges_with_percentage: description = field_config.description_template.format( display_name=field_config.display_name, values=', '.join(ranges_with_percentage) ) description_parts.append(description) # 4. 处理时间字段描述 for field_config in self.behavior_stats_config.time_fields: if not field_config.enable: continue time_bucket_counts_key = f'{field_config.feature_prefix}_time_bucket_weighted_counts' total_weight_key = f'{field_config.feature_prefix}_total_weight' if time_bucket_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: time_bucket_counts = common_attribute_distribution[time_bucket_counts_key] total_weight = common_attribute_distribution[total_weight_key] if total_weight > 0: # 生成带占比的描述 time_descriptions_with_percentage = [] for bucket, count in sorted(time_bucket_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: percentage = (count / total_weight) * 100 bucket_str = str(bucket) if bucket_str == '__empty__': time_desc = '空值' elif bucket_str == '0-6m': time_desc = '半年内' elif bucket_str == '6-12m': time_desc = '半年到一年' elif bucket_str == '12-24m': time_desc = '1-2年' elif bucket_str == '24m+': time_desc = '2年+' elif bucket_str == 'future': time_desc = '错误时间' else: time_desc = bucket_str time_descriptions_with_percentage.append(f"{time_desc}({percentage:.1f}%)") if time_descriptions_with_percentage: description = field_config.description_template.format( display_name=field_config.display_name, values=', '.join(time_descriptions_with_percentage) ) description_parts.append(description) # 组合成完整描述 if description_parts: return "\n".join(description_parts) else: return "" def generate_item_specific_attribute_description(self, item_specific_attributes: Dict[str, Any]) -> str: """ 生成商品具体属性统计的自然语言描述 Args: item_specific_attributes: 商品具体属性统计信息 Returns: str: 商品具体属性统计的自然语言描述 """ if not item_specific_attributes: return "暂无商品具体属性统计信息。" descriptions = [] # 获取所有属性相关的特征 attr_features = {} for key, value in item_specific_attributes.items(): if key.startswith('attr_') and key.endswith('_weighted_counts'): attr_name = key.replace('_weighted_counts', '').replace('attr_', '') attr_features[attr_name] = value if not attr_features: return "暂无有效属性统计信息。" # 按总权重排序属性 sorted_attrs = [] for attr_name, weighted_counts in attr_features.items(): total_weight = sum(weighted_counts.values()) sorted_attrs.append((attr_name, weighted_counts, total_weight)) sorted_attrs.sort(key=lambda x: x[2], reverse=True) # 生成描述 max_attrs = USER_PROFILE_BEHAVIOR_CONFIG['max_attributes_display'] max_options = USER_PROFILE_BEHAVIOR_CONFIG['max_options_per_attribute'] for attr_name, weighted_counts, total_weight in sorted_attrs[:max_attrs]: # 取前N个属性 # 按权重排序选项,取前N个 sorted_options = sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:max_options] option_texts = [] for option_name, weight in sorted_options: if option_name != '__empty__': # 计算百分比 percentage = (weight / total_weight) * 100 option_texts.append(f"{option_name}({percentage:.1f}%)") if option_texts: desc = f"• {attr_name}: {', '.join(option_texts)}" descriptions.append(desc) if descriptions: return "\n".join(descriptions) return "暂无有效属性统计信息。" def _extract_recent_search_keywords(self, user_profile: UserProfile) -> List[str]: """ 提取最近10个搜索词(过滤掉isSearchFactory=true的) Args: user_profile: UserProfile对象 Returns: List[str]: 最近10个搜索词列表 """ if not user_profile or not user_profile.behavior_map: return [] search_keywords = user_profile.behavior_map.search_keyword if not search_keywords: return [] # 过滤、去重并收集最近10个搜索词 seen_keywords = set() recent_keywords = [] for search_behavior in search_keywords: if not search_behavior.isSearchFactory and search_behavior.keyword: keyword = search_behavior.keyword.strip() # 过滤掉纯数字、下划线、减号、空白字符构成的关键词 if self._is_valid_search_keyword(keyword): if keyword not in seen_keywords: seen_keywords.add(keyword) recent_keywords.append(keyword) if len(recent_keywords) >= SESSION_CONFIG['max_recent_search_keywords']: # 达到最大数量就停止 break logger.info(f"[UserProfileExtractor._extract_recent_search_keywords] Extracted {len(recent_keywords)} recent search keywords") return recent_keywords def _is_valid_search_keyword(self, keyword: str) -> bool: """ 判断搜索关键词是否有效 Args: keyword: 搜索关键词 Returns: bool: 是否有效 """ if not keyword or keyword.strip() == '': return False # 过滤掉纯数字、下划线、减号、空白字符构成的关键词 # 使用正则表达式匹配:只包含数字、下划线、减号、空白字符的字符串 if re.match(r'^[\d\s_-]+$', keyword): return False # 只有一个单词(split后只有一个)、并且这个单词里面既包含数字又包含字母 (转小写后 既有小写字母、又有数字) if len(keyword.split()) == 1: if re.match(r'^[a-z0-9]+$', keyword.lower()): return False # 包含数字和- if re.match(r'^[0-9-]+$', keyword): return False return True