From 1557b026b6f4948725f99f0bdd66c78174b0e290 Mon Sep 17 00:00:00 2001
From: tangwang
Date: Thu, 23 Oct 2025 23:23:54 +0800
Subject: [PATCH] Adjust directory layout (调整目录)

---
 boost_strategy.py                |  429 ------------------
 db_service.py                    |   48 --
 refers/boost_strategy.py         |  429 ++++++++++++++++++
 refers/db_service.py             |   48 ++
 refers/user_profile.py           |   58 +++
 refers/user_profile_extractor.py | 1006 +++++++++++++++++++++++++++++
 user_profile.py                  |   58 ---
 user_profile_extractor.py        | 1006 -----------------------------
 8 files changed, 1541 insertions(+), 1541 deletions(-)
 delete mode 100644 boost_strategy.py
delete mode 100644 db_service.py create mode 100644 refers/boost_strategy.py create mode 100644 refers/db_service.py create mode 100644 refers/user_profile.py create mode 100644 refers/user_profile_extractor.py delete mode 100644 user_profile.py delete mode 100644 user_profile_extractor.py diff --git a/boost_strategy.py b/boost_strategy.py deleted file mode 100644 index efd6416..0000000 --- a/boost_strategy.py +++ /dev/null @@ -1,429 +0,0 @@ -from typing import Dict, List, Optional, Any -from dataclasses import dataclass -import json -from src.services.user_profile import UserProfile -from config.logging_config import get_app_logger -from google.protobuf.json_format import MessageToDict -import logging -from config.app_config import BOOST_CONFIGS, FRESH_BOOST_CONFIG, BOOST_WEIGHTS_CONFIG, FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY, FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY - -logger = get_app_logger(__name__) - -@dataclass -class BoostConfig: - tag_id: int - tag_name: str - tag_type: Optional[str] - boost_value: float - es_intent_boost_value: float - reranker_intent_boost_value: float - intent_names: List[str] - platform: List[str] - - - -# 标签ID 标签名称 标签类型 提权幅度 -# 156 行业新品 销售属性 1.1 -# 157 爆品/时货 销售属性 1.1 -# 158 常年热销 销售属性 1.1 -# 159 质量好 销售属性 1.1 -# 162 小惠商品 null 1.05 -# 163 优惠商品 null 1.1 -# 164 特惠商品 null 1.3 -# 165 超惠商品 null 1.15 - -# 3 一箱快出 null -# 5 推荐 null -# 10 人气热销 null -# 14 特色精选 null -# 17 赠品(新)(补柜专区) null -# 20 新品首发 null -# 21 0316-首发新品【新品页面专用】 null -# 25 0316essa新品-【新品页面专用】 null -# 26 essaone新品 null -# 27 0316最近上架(专区) null -# 40 一箱 null -# 41 快出 null -# 42 上市新品(报表)&(专区) null -# 43 9.20内销(专区) null -# 82 半箱拼团 null - -# # 季节性,打入到 关键词字段 做匹配 -# 149 年货 销售时节 -# 150 万圣节 销售时节 -# 151 圣诞节 销售时节 -# 152 开学季 销售时节 -# 153 复活节 销售时节 -# 154 三八节 销售时节 -# 155 情人节 销售时节 - - -# TODO 根据 前端参数 客户类型 销售区域 做提权 -# 标签ID 标签名称 标签类型 -# 137 东欧市场 销售区域 -# 138 欧美市场 销售区域 -# 139 南美市场 销售区域 -# 140 中东市场 销售区域 -# 141 东南亚市场 销售区域 -# 142 综合商超 客户类型 -# 143 专业商超 客户类型 -# 144 品牌商 客户类型 -# 145 公司批发商 客户类型 -# 146 市场批发商 客户类型 -# 147 电商 客户类型 -# 148 赠品商 客户类型 - -class SearchBoostStrategy: - def __init__(self): - # Initialize boost configurations from config file - self.boost_configs: List[BoostConfig] = [ - BoostConfig( - config["tag_id"], - config["tag_name"], - config["tag_type"], - config["boost_value"], - config["es_intent_boost_value"], - config["reranker_intent_boost_value"], - config["intent_names"], - config["platform"] - ) for config in BOOST_CONFIGS - ] - - # Create lookup dictionaries for faster access - self.tag_id_to_boost: Dict[int, float] = { - config.tag_id: config.boost_value for config in self.boost_configs - } - - self.tag_name_to_boost: Dict[str, float] = { - config.tag_name: config.boost_value for config in self.boost_configs - } - - # Create intent-based boost lookup for ES search - self.intent_to_boost: Dict[str, float] = {} - for config in self.boost_configs: - for intent_name in config.intent_names: - self.intent_to_boost[intent_name] = config.es_intent_boost_value - - logger.debug(f"Initialized boost configs: {json.dumps([vars(c) for c in self.boost_configs], ensure_ascii=False)}") - - def _get_platform_boost_configs(self, business_platform: Optional[str]) -> List[BoostConfig]: - """ - Filters boost configurations based on the business platform. - Returns a list of BoostConfig objects that match the platform. 
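# Illustrative sketch only: a plausible shape for one BOOST_CONFIGS entry in
# config.app_config, inferred from the keys read in SearchBoostStrategy.__init__
# and from the tag table in the comments above (tag 156 "行业新品", boost 1.1).
# The intent boost values, intent_names and platform entries below are assumptions
# for illustration, not values taken from the real config file.
EXAMPLE_BOOST_CONFIG_ENTRY = {
    "tag_id": 156,
    "tag_name": "行业新品",               # "industry new product"
    "tag_type": "销售属性",               # "sales attribute"
    "boost_value": 1.1,
    "es_intent_boost_value": 1.3,          # assumed: stronger ES-side boost when intent matches
    "reranker_intent_boost_value": 1.2,    # assumed: reranker-side counterpart
    "intent_names": ["new_product"],       # assumed intent label
    "platform": ["essa"],                  # assumed business platform code
}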
- """ - if not business_platform: - return self.boost_configs - return [ - config for config in self.boost_configs - if business_platform in config.platform - ] - - def get_boost_query(self, user_profile: Optional[UserProfile] = None, label_field_name: Optional[str] = None, query_intents: Optional[List[str]] = None, business_platform: Optional[str] = None, search_context: Optional[Any] = None) -> dict: - """ - Generate the Elasticsearch boost query based on configured boost values and user profiles. - Returns a function_score query that only affects scoring without impacting recall. - - Args: - user_profile: User profile for behavior-based boosting - label_field_name: Field name for label-based boosting - query_intents: Detected query intents for intent-based boosting - business_platform: Business platform for platform-based filtering - search_context: Search context containing business platform and sale category information - """ - log_prefix = search_context.format_log_prefix() if search_context else "" - functions = [] - - # Initialize boost query counters using int array for better performance - # boost_cnt[0]: tag_functions, boost_cnt[1]: fresh_functions, boost_cnt[2]: behavior_functions - # boost_cnt[3]: brand_functions, boost_cnt[4]: category_functions, boost_cnt[5]: price_range_functions - # boost_cnt[6]: video_functions, boost_cnt[7]: platform_category_functions - boost_cnt = [0] * 8 - - # Get platform-filtered boost configs - platform_boost_configs = self._get_platform_boost_configs(business_platform) - - # Add boost for tag IDs - use dynamic field name and platform filtering - if label_field_name: - for config in platform_boost_configs: - tag_id = config.tag_id - boost_value = config.boost_value - - # Check if this tag should get intent-based boost - final_boost_value = boost_value - if query_intents: - # Check if any detected intent matches this tag's intent_names - for intent in query_intents: - if intent in config.intent_names: - final_boost_value = config.es_intent_boost_value - logger.debug(f"{log_prefix} Intent-based boost for tag_id {tag_id}: {boost_value} -> {final_boost_value} (intent: {intent})") - break - - functions.append({ - "filter": { - "term": { - label_field_name: tag_id - } - }, - "weight": final_boost_value - }) - boost_cnt[0] += 1 # tag_functions - logger.debug(f"{log_prefix} Added {boost_cnt[0]} tag-based boost functions using field: {label_field_name} for platform: {business_platform}") - if query_intents: - logger.info(f"{log_prefix} Applied intent-based boost for intents: {query_intents}") - else: - logger.warning(f"{log_prefix} Label field name is empty, cannot apply tag boost") - logger.warning(f"{log_prefix} Tag boost functions will be skipped - label_field_name is required for dynamic field name") - - # Add fresh boost using exact sigmoid formula - # Check if new product intent is detected and apply power factor - fresh_factor = FRESH_BOOST_CONFIG["default_factor"] - if query_intents: - for intent in query_intents: - if intent == FRESH_BOOST_CONFIG["new_product_intent"]: - fresh_factor = FRESH_BOOST_CONFIG["es_intent_factor"] - logger.debug(f"{log_prefix} New product intent detected: {intent}, applying ES fresh boost factor: {fresh_factor}") - break - - functions.append({ - "field_value_factor": { - "field": "on_sell_days_boost", - "missing": 1.0, - "factor": fresh_factor - } - }) - boost_cnt[1] += 1 # fresh_functions - logger.debug(f"{log_prefix} Added fresh boost function with factor: {fresh_factor}") - - # Add video boost - functions.append({ - 
"filter": { - "term": { - "is_video": True - } - }, - "weight": BOOST_WEIGHTS_CONFIG["video_boost_weight"] - }) - boost_cnt[6] += 1 # video_functions - logger.debug(f"{log_prefix} Added video boost function with weight: {BOOST_WEIGHTS_CONFIG['video_boost_weight']}") - - # ===== 平台类目排名提权 ===== - if search_context and hasattr(search_context, 'businessPlatform') and hasattr(search_context, 'sale_category_id'): - if search_context.businessPlatform and search_context.sale_category_id: - platform_cate_top_keyword = f"{search_context.businessPlatform}_{search_context.sale_category_id}" - logger.debug(f"{log_prefix} Adding platform category ranking boost for keyword: {platform_cate_top_keyword}") - functions.append({ - "filter": { - "term": { - "op_ranking_platform_cate_list": platform_cate_top_keyword - } - }, - "weight": BOOST_WEIGHTS_CONFIG["platform_category_ranking_weight"] - }) - boost_cnt[7] += 1 # platform_category_functions - logger.debug(f"{log_prefix} Added platform category ranking boost function for: {platform_cate_top_keyword}") - else: - logger.debug(f"{log_prefix} Skipping platform category boost - businessPlatform: {getattr(search_context, 'businessPlatform', 'None')}, sale_category_id: {getattr(search_context, 'sale_category_id', 'None')}") - else: - logger.debug(f"{log_prefix} Skipping platform category boost - search_context not provided or missing required fields") - - # ===== 用户画像个性化提权 ===== - # 基于用户画像信息进行个性化商品推荐,提高搜索结果的个性化匹配度 - # 包括:用户行为、品牌偏好、类目偏好、价格偏好、客户商品结构等维度 - if user_profile: - logger.debug(f"{log_prefix} Adding biz boosting based on user profile") - logger.debug(f"{log_prefix} User profile base info: {MessageToDict(user_profile.base_info)}") - # logger.debug(f"User profile statistics: {MessageToDict(user_profile.statistics)}") - - # Add detailed debug logging for statistics - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"{log_prefix} User profile statistics:") - stats_dict = MessageToDict(user_profile.statistics) - for key, value in stats_dict.items(): - if isinstance(value, list): - logger.debug(f"{log_prefix} Statistics {key}: {len(value)} items, first item: {value[0] if value else 'None'}") - else: - logger.debug(f"{log_prefix} Statistics {key}: {value}") - - # ===== 用户行为提权 ===== - # 逻辑:从用户画像中提取行为记录(点击、加购、收藏、购买) - # 限制:最多使用前N个行为记录,避免过多记录影响性能 - behavior_map = user_profile.behavior_map - # logger.debug(f"User behavior map: {MessageToDict(behavior_map)}") - - # Add detailed debug logging for behavior map - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"{log_prefix} User behavior map:") - behavior_dict = MessageToDict(behavior_map) - for behavior_type, behaviors in behavior_dict.items(): - if isinstance(behaviors, list): - logger.debug(f"{log_prefix} Behavior {behavior_type}: {len(behaviors)} items, first item: {behaviors[0] if behaviors else 'None'}") - else: - logger.debug(f"{log_prefix} Behavior {behavior_type}: {behaviors}") - - max_behavior_count_for_boost = BOOST_WEIGHTS_CONFIG["max_behavior_count_for_boost"] - - for behavior_type in ['click', 'add_cart', 'collect', 'purchase']: - behaviors = getattr(behavior_map, behavior_type, []) - if behaviors: - sku_ids = [b.skuId for b in behaviors[:max_behavior_count_for_boost]] - logger.debug(f"{log_prefix} Adding boost for {behavior_type} behaviors with {len(sku_ids)} SKUs: {sku_ids[:10]}") - functions.append({ - "filter": { - "terms": { - "sku_id": sku_ids - } - }, - "weight": BOOST_WEIGHTS_CONFIG["user_behavior_weight"] - }) - boost_cnt[2] += 1 # behavior_functions - - # ===== 品牌偏好提权 ===== - # 
目的:基于用户偏好的品牌推荐商品,提高个性化匹配度 - # 逻辑:从用户画像base_info中提取brandCategoryIds,对相关品牌商品进行提权 - # 权重:从配置文件读取,默认1.1倍 - if user_profile.base_info.brandCategoryIds: - brand_ids = [x for x in user_profile.base_info.brandCategoryIds] - logger.debug(f"{log_prefix} Adding boost for brand preferences with {len(brand_ids)} brand_ids {brand_ids[:10]}") - functions.append({ - "filter": { - "terms": { - "brand_id": brand_ids - } - }, - "weight": BOOST_WEIGHTS_CONFIG["brand_preference_weight"] - }) - boost_cnt[3] += 1 # brand_functions - - # ===== 类目偏好提权 ===== - # 目的:基于用户偏好的商品类目推荐相关商品,提高个性化匹配度 - # 逻辑:从用户画像statistics中提取category_group,对相关类目商品进行提权 - # 权重:从配置文件读取,默认1.08倍 - # 注意:当前功能已禁用,如需启用请将if False改为if True - if False: - if user_profile.statistics.category_group: - category_ids = [stat.keyId for stat in user_profile.statistics.category_group] - category_stats = [MessageToDict(stat) for stat in user_profile.statistics.category_group] - logger.debug(f"{log_prefix} Category preferences stats with {len(category_ids)} category_ids {category_ids[:10]}") - logger.debug(f"{log_prefix} Adding boost for category preferences with {len(category_ids)} category_ids {category_ids[:10]}") - functions.append({ - "filter": { - "terms": { - "category_id": category_ids - } - }, - "weight": BOOST_WEIGHTS_CONFIG["category_preference_weight"] - }) - boost_cnt[4] += 1 # category_functions - - # ===== 价格区间偏好提权 ===== - # 目的:基于用户偏好的价格区间推荐相关商品,提高个性化匹配度 - # 逻辑:从用户画像statistics中提取price_group,对相关价格区间商品进行提权 - # 权重:从配置文件读取,默认1.1倍 - # 注意:当前功能已禁用,如需启用请将if False改为if True - if False: - if user_profile.statistics.price_group: - price_ranges = [stat.keyId for stat in user_profile.statistics.price_group] - price_stats = [MessageToDict(stat) for stat in user_profile.statistics.price_group] - logger.debug(f"{log_prefix} Price range preferences stats: {price_stats}") - logger.debug(f"{log_prefix} Adding boost for price range preferences: {price_ranges}") - functions.append({ - "filter": { - "terms": { - "price_range": price_ranges - } - }, - "weight": BOOST_WEIGHTS_CONFIG["price_range_preference_weight"] - }) - boost_cnt[5] += 1 # price_range_functions - - # ===== 客户商品结构类目提权 ===== - # 目的:基于客户商品结构分析,推荐符合客户业务模式的类目商品 - # 逻辑:从用户画像base_info中提取customerGoodsStructure,分析客户的类目偏好 - # 权重:从配置文件读取,默认1.08倍 - # 注意:categoryIds对应前端类目,不是ES的category_id字段 - if user_profile.base_info.customerGoodsStructure: - structure_list = [MessageToDict(s) for s in user_profile.base_info.customerGoodsStructure] - logger.debug(f"{log_prefix} Customer goods structure details: {structure_list}") - for structure in user_profile.base_info.customerGoodsStructure: - if structure.categoryIds: - logger.debug(f"{log_prefix} Adding boost for category IDs in structure length {len(structure.categoryIds)} category_ids {structure.categoryIds[:10]}") - functions.append({ - "filter": { - "terms": { - # 注意: user_profile.base_info.customerGoodsStructure.categoryIds 对应的是前端类目 而不是 ES 的 category_id - "sale_category_all": [x for x in structure.categoryIds] - } - }, - "weight": BOOST_WEIGHTS_CONFIG["customer_structure_category_weight"] - }) - boost_cnt[4] += 1 # category_functions - if structure.priceBetween: - # logger.debug(f"Adding boost for price range in structure: {structure.priceBetween}") - # not support yet - pass - - # Calculate total functions count - total_functions = len(functions) - - # Log boost query statistics - logger.info(f"{log_prefix} ===== ES查询提权函数统计 =====") - logger.info(f"{log_prefix} 总提权函数数量: {total_functions}") - logger.info(f"{log_prefix} 标签提权函数: {boost_cnt[0]}") - logger.info(f"{log_prefix} 
新品提权函数: {boost_cnt[1]}") - logger.info(f"{log_prefix} 行为提权函数: {boost_cnt[2]}") - logger.info(f"{log_prefix} 品牌提权函数: {boost_cnt[3]}") - logger.info(f"{log_prefix} 类目提权函数: {boost_cnt[4]}") - logger.info(f"{log_prefix} 价格区间提权函数: {boost_cnt[5]}") - logger.info(f"{log_prefix} 视频提权函数: {boost_cnt[6]}") - logger.info(f"{log_prefix} 平台类目排名提权函数: {boost_cnt[7]}") - logger.info(f"{log_prefix} ===== ES查询提权函数统计结束 =====") - - if not functions: - logger.debug(f"{log_prefix} No boost functions generated") - return {} - - score_mode = FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY if search_context.search_query or search_context.query else FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY - - boost_query = { - "function_score": { - "functions": functions, - "score_mode": score_mode, - "boost_mode": "multiply" - } - } - - # logger.debug(f"Generated boost query: {json.dumps(boost_query, ensure_ascii=False)}") - return boost_query - - def get_boost_value(self, tag_id: Optional[int] = None, tag_name: Optional[str] = None, platform: Optional[str] = None) -> float: - """ - Get the boost value for a given tag ID or name. - Returns 1.0 if no boost is configured or if platform doesn't match. - - Args: - tag_id: Tag ID to look up - tag_name: Tag name to look up - platform: Business platform for filtering - """ - if tag_id is not None: - for config in self.boost_configs: - if config.tag_id == tag_id: - # Check platform compatibility - if platform and config.platform != platform: - logger.debug(f"Platform mismatch for tag_id {tag_id}: requested platform {platform}, tag platform {config.platform}") - return 1.0 - logger.debug(f"Found boost value {config.boost_value} for tag_id {tag_id}") - return config.boost_value - - if tag_name is not None: - for config in self.boost_configs: - if config.tag_name == tag_name: - # Check platform compatibility - if platform and config.platform != platform: - logger.debug(f"Platform mismatch for tag_name {tag_name}: requested platform {platform}, tag platform {config.platform}") - return 1.0 - logger.debug(f"Found boost value {config.boost_value} for tag_name {tag_name}") - return config.boost_value - - logger.debug(f"No boost value found for tag_id={tag_id}, tag_name={tag_name}, platform={platform}") - return 1.0 \ No newline at end of file diff --git a/db_service.py b/db_service.py deleted file mode 100644 index 19b7619..0000000 --- a/db_service.py +++ /dev/null @@ -1,48 +0,0 @@ -""" -数据库连接服务模块 -提供统一的数据库连接接口 -""" -from sqlalchemy import create_engine -from urllib.parse import quote_plus -import logging - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -def create_db_connection(host, port, database, username, password): - """ - 创建数据库连接 - - Args: - host: 数据库主机地址 - port: 端口 - database: 数据库名 - username: 用户名 - password: 密码 - - Returns: - SQLAlchemy engine对象 - """ - try: - # 对密码进行URL编码,处理特殊字符 - encoded_password = quote_plus(password) - - # 构建连接字符串 - connection_string = f'mysql+pymysql://{username}:{encoded_password}@{host}:{port}/{database}' - - # 创建引擎 - engine = create_engine( - connection_string, - pool_pre_ping=True, # 连接池预检 - pool_recycle=3600, # 连接回收时间 - echo=False - ) - - logger.info(f"Database connection created successfully: {host}:{port}/{database}") - return engine - - except Exception as e: - logger.error(f"Failed to create database connection: {e}") - raise - diff --git a/refers/boost_strategy.py b/refers/boost_strategy.py new file mode 100644 index 0000000..efd6416 --- /dev/null +++ b/refers/boost_strategy.py @@ -0,0 +1,429 @@ +from typing import Dict, 
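# Illustrative sketch only: minimal usage of create_db_connection() from db_service.py.
# The import path depends on where the module ends up after this move (db_service vs
# refers.db_service); host and credentials are placeholders, and the pymysql driver
# must be installed for the mysql+pymysql connection string to work.
from sqlalchemy import text

from db_service import create_db_connection

engine = create_db_connection(
    host="127.0.0.1",
    port=3306,
    database="search",       # placeholder database name
    username="reader",       # placeholder account
    password="p@ss:word!",   # special characters are URL-encoded by quote_plus() inside
)
with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())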
List, Optional, Any +from dataclasses import dataclass +import json +from src.services.user_profile import UserProfile +from config.logging_config import get_app_logger +from google.protobuf.json_format import MessageToDict +import logging +from config.app_config import BOOST_CONFIGS, FRESH_BOOST_CONFIG, BOOST_WEIGHTS_CONFIG, FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY, FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY + +logger = get_app_logger(__name__) + +@dataclass +class BoostConfig: + tag_id: int + tag_name: str + tag_type: Optional[str] + boost_value: float + es_intent_boost_value: float + reranker_intent_boost_value: float + intent_names: List[str] + platform: List[str] + + + +# 标签ID 标签名称 标签类型 提权幅度 +# 156 行业新品 销售属性 1.1 +# 157 爆品/时货 销售属性 1.1 +# 158 常年热销 销售属性 1.1 +# 159 质量好 销售属性 1.1 +# 162 小惠商品 null 1.05 +# 163 优惠商品 null 1.1 +# 164 特惠商品 null 1.3 +# 165 超惠商品 null 1.15 + +# 3 一箱快出 null +# 5 推荐 null +# 10 人气热销 null +# 14 特色精选 null +# 17 赠品(新)(补柜专区) null +# 20 新品首发 null +# 21 0316-首发新品【新品页面专用】 null +# 25 0316essa新品-【新品页面专用】 null +# 26 essaone新品 null +# 27 0316最近上架(专区) null +# 40 一箱 null +# 41 快出 null +# 42 上市新品(报表)&(专区) null +# 43 9.20内销(专区) null +# 82 半箱拼团 null + +# # 季节性,打入到 关键词字段 做匹配 +# 149 年货 销售时节 +# 150 万圣节 销售时节 +# 151 圣诞节 销售时节 +# 152 开学季 销售时节 +# 153 复活节 销售时节 +# 154 三八节 销售时节 +# 155 情人节 销售时节 + + +# TODO 根据 前端参数 客户类型 销售区域 做提权 +# 标签ID 标签名称 标签类型 +# 137 东欧市场 销售区域 +# 138 欧美市场 销售区域 +# 139 南美市场 销售区域 +# 140 中东市场 销售区域 +# 141 东南亚市场 销售区域 +# 142 综合商超 客户类型 +# 143 专业商超 客户类型 +# 144 品牌商 客户类型 +# 145 公司批发商 客户类型 +# 146 市场批发商 客户类型 +# 147 电商 客户类型 +# 148 赠品商 客户类型 + +class SearchBoostStrategy: + def __init__(self): + # Initialize boost configurations from config file + self.boost_configs: List[BoostConfig] = [ + BoostConfig( + config["tag_id"], + config["tag_name"], + config["tag_type"], + config["boost_value"], + config["es_intent_boost_value"], + config["reranker_intent_boost_value"], + config["intent_names"], + config["platform"] + ) for config in BOOST_CONFIGS + ] + + # Create lookup dictionaries for faster access + self.tag_id_to_boost: Dict[int, float] = { + config.tag_id: config.boost_value for config in self.boost_configs + } + + self.tag_name_to_boost: Dict[str, float] = { + config.tag_name: config.boost_value for config in self.boost_configs + } + + # Create intent-based boost lookup for ES search + self.intent_to_boost: Dict[str, float] = {} + for config in self.boost_configs: + for intent_name in config.intent_names: + self.intent_to_boost[intent_name] = config.es_intent_boost_value + + logger.debug(f"Initialized boost configs: {json.dumps([vars(c) for c in self.boost_configs], ensure_ascii=False)}") + + def _get_platform_boost_configs(self, business_platform: Optional[str]) -> List[BoostConfig]: + """ + Filters boost configurations based on the business platform. + Returns a list of BoostConfig objects that match the platform. + """ + if not business_platform: + return self.boost_configs + return [ + config for config in self.boost_configs + if business_platform in config.platform + ] + + def get_boost_query(self, user_profile: Optional[UserProfile] = None, label_field_name: Optional[str] = None, query_intents: Optional[List[str]] = None, business_platform: Optional[str] = None, search_context: Optional[Any] = None) -> dict: + """ + Generate the Elasticsearch boost query based on configured boost values and user profiles. + Returns a function_score query that only affects scoring without impacting recall. 
+ + Args: + user_profile: User profile for behavior-based boosting + label_field_name: Field name for label-based boosting + query_intents: Detected query intents for intent-based boosting + business_platform: Business platform for platform-based filtering + search_context: Search context containing business platform and sale category information + """ + log_prefix = search_context.format_log_prefix() if search_context else "" + functions = [] + + # Initialize boost query counters using int array for better performance + # boost_cnt[0]: tag_functions, boost_cnt[1]: fresh_functions, boost_cnt[2]: behavior_functions + # boost_cnt[3]: brand_functions, boost_cnt[4]: category_functions, boost_cnt[5]: price_range_functions + # boost_cnt[6]: video_functions, boost_cnt[7]: platform_category_functions + boost_cnt = [0] * 8 + + # Get platform-filtered boost configs + platform_boost_configs = self._get_platform_boost_configs(business_platform) + + # Add boost for tag IDs - use dynamic field name and platform filtering + if label_field_name: + for config in platform_boost_configs: + tag_id = config.tag_id + boost_value = config.boost_value + + # Check if this tag should get intent-based boost + final_boost_value = boost_value + if query_intents: + # Check if any detected intent matches this tag's intent_names + for intent in query_intents: + if intent in config.intent_names: + final_boost_value = config.es_intent_boost_value + logger.debug(f"{log_prefix} Intent-based boost for tag_id {tag_id}: {boost_value} -> {final_boost_value} (intent: {intent})") + break + + functions.append({ + "filter": { + "term": { + label_field_name: tag_id + } + }, + "weight": final_boost_value + }) + boost_cnt[0] += 1 # tag_functions + logger.debug(f"{log_prefix} Added {boost_cnt[0]} tag-based boost functions using field: {label_field_name} for platform: {business_platform}") + if query_intents: + logger.info(f"{log_prefix} Applied intent-based boost for intents: {query_intents}") + else: + logger.warning(f"{log_prefix} Label field name is empty, cannot apply tag boost") + logger.warning(f"{log_prefix} Tag boost functions will be skipped - label_field_name is required for dynamic field name") + + # Add fresh boost using exact sigmoid formula + # Check if new product intent is detected and apply power factor + fresh_factor = FRESH_BOOST_CONFIG["default_factor"] + if query_intents: + for intent in query_intents: + if intent == FRESH_BOOST_CONFIG["new_product_intent"]: + fresh_factor = FRESH_BOOST_CONFIG["es_intent_factor"] + logger.debug(f"{log_prefix} New product intent detected: {intent}, applying ES fresh boost factor: {fresh_factor}") + break + + functions.append({ + "field_value_factor": { + "field": "on_sell_days_boost", + "missing": 1.0, + "factor": fresh_factor + } + }) + boost_cnt[1] += 1 # fresh_functions + logger.debug(f"{log_prefix} Added fresh boost function with factor: {fresh_factor}") + + # Add video boost + functions.append({ + "filter": { + "term": { + "is_video": True + } + }, + "weight": BOOST_WEIGHTS_CONFIG["video_boost_weight"] + }) + boost_cnt[6] += 1 # video_functions + logger.debug(f"{log_prefix} Added video boost function with weight: {BOOST_WEIGHTS_CONFIG['video_boost_weight']}") + + # ===== 平台类目排名提权 ===== + if search_context and hasattr(search_context, 'businessPlatform') and hasattr(search_context, 'sale_category_id'): + if search_context.businessPlatform and search_context.sale_category_id: + platform_cate_top_keyword = f"{search_context.businessPlatform}_{search_context.sale_category_id}" + 
logger.debug(f"{log_prefix} Adding platform category ranking boost for keyword: {platform_cate_top_keyword}") + functions.append({ + "filter": { + "term": { + "op_ranking_platform_cate_list": platform_cate_top_keyword + } + }, + "weight": BOOST_WEIGHTS_CONFIG["platform_category_ranking_weight"] + }) + boost_cnt[7] += 1 # platform_category_functions + logger.debug(f"{log_prefix} Added platform category ranking boost function for: {platform_cate_top_keyword}") + else: + logger.debug(f"{log_prefix} Skipping platform category boost - businessPlatform: {getattr(search_context, 'businessPlatform', 'None')}, sale_category_id: {getattr(search_context, 'sale_category_id', 'None')}") + else: + logger.debug(f"{log_prefix} Skipping platform category boost - search_context not provided or missing required fields") + + # ===== 用户画像个性化提权 ===== + # 基于用户画像信息进行个性化商品推荐,提高搜索结果的个性化匹配度 + # 包括:用户行为、品牌偏好、类目偏好、价格偏好、客户商品结构等维度 + if user_profile: + logger.debug(f"{log_prefix} Adding biz boosting based on user profile") + logger.debug(f"{log_prefix} User profile base info: {MessageToDict(user_profile.base_info)}") + # logger.debug(f"User profile statistics: {MessageToDict(user_profile.statistics)}") + + # Add detailed debug logging for statistics + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"{log_prefix} User profile statistics:") + stats_dict = MessageToDict(user_profile.statistics) + for key, value in stats_dict.items(): + if isinstance(value, list): + logger.debug(f"{log_prefix} Statistics {key}: {len(value)} items, first item: {value[0] if value else 'None'}") + else: + logger.debug(f"{log_prefix} Statistics {key}: {value}") + + # ===== 用户行为提权 ===== + # 逻辑:从用户画像中提取行为记录(点击、加购、收藏、购买) + # 限制:最多使用前N个行为记录,避免过多记录影响性能 + behavior_map = user_profile.behavior_map + # logger.debug(f"User behavior map: {MessageToDict(behavior_map)}") + + # Add detailed debug logging for behavior map + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"{log_prefix} User behavior map:") + behavior_dict = MessageToDict(behavior_map) + for behavior_type, behaviors in behavior_dict.items(): + if isinstance(behaviors, list): + logger.debug(f"{log_prefix} Behavior {behavior_type}: {len(behaviors)} items, first item: {behaviors[0] if behaviors else 'None'}") + else: + logger.debug(f"{log_prefix} Behavior {behavior_type}: {behaviors}") + + max_behavior_count_for_boost = BOOST_WEIGHTS_CONFIG["max_behavior_count_for_boost"] + + for behavior_type in ['click', 'add_cart', 'collect', 'purchase']: + behaviors = getattr(behavior_map, behavior_type, []) + if behaviors: + sku_ids = [b.skuId for b in behaviors[:max_behavior_count_for_boost]] + logger.debug(f"{log_prefix} Adding boost for {behavior_type} behaviors with {len(sku_ids)} SKUs: {sku_ids[:10]}") + functions.append({ + "filter": { + "terms": { + "sku_id": sku_ids + } + }, + "weight": BOOST_WEIGHTS_CONFIG["user_behavior_weight"] + }) + boost_cnt[2] += 1 # behavior_functions + + # ===== 品牌偏好提权 ===== + # 目的:基于用户偏好的品牌推荐商品,提高个性化匹配度 + # 逻辑:从用户画像base_info中提取brandCategoryIds,对相关品牌商品进行提权 + # 权重:从配置文件读取,默认1.1倍 + if user_profile.base_info.brandCategoryIds: + brand_ids = [x for x in user_profile.base_info.brandCategoryIds] + logger.debug(f"{log_prefix} Adding boost for brand preferences with {len(brand_ids)} brand_ids {brand_ids[:10]}") + functions.append({ + "filter": { + "terms": { + "brand_id": brand_ids + } + }, + "weight": BOOST_WEIGHTS_CONFIG["brand_preference_weight"] + }) + boost_cnt[3] += 1 # brand_functions + + # ===== 类目偏好提权 ===== + # 目的:基于用户偏好的商品类目推荐相关商品,提高个性化匹配度 + # 
逻辑:从用户画像statistics中提取category_group,对相关类目商品进行提权 + # 权重:从配置文件读取,默认1.08倍 + # 注意:当前功能已禁用,如需启用请将if False改为if True + if False: + if user_profile.statistics.category_group: + category_ids = [stat.keyId for stat in user_profile.statistics.category_group] + category_stats = [MessageToDict(stat) for stat in user_profile.statistics.category_group] + logger.debug(f"{log_prefix} Category preferences stats with {len(category_ids)} category_ids {category_ids[:10]}") + logger.debug(f"{log_prefix} Adding boost for category preferences with {len(category_ids)} category_ids {category_ids[:10]}") + functions.append({ + "filter": { + "terms": { + "category_id": category_ids + } + }, + "weight": BOOST_WEIGHTS_CONFIG["category_preference_weight"] + }) + boost_cnt[4] += 1 # category_functions + + # ===== 价格区间偏好提权 ===== + # 目的:基于用户偏好的价格区间推荐相关商品,提高个性化匹配度 + # 逻辑:从用户画像statistics中提取price_group,对相关价格区间商品进行提权 + # 权重:从配置文件读取,默认1.1倍 + # 注意:当前功能已禁用,如需启用请将if False改为if True + if False: + if user_profile.statistics.price_group: + price_ranges = [stat.keyId for stat in user_profile.statistics.price_group] + price_stats = [MessageToDict(stat) for stat in user_profile.statistics.price_group] + logger.debug(f"{log_prefix} Price range preferences stats: {price_stats}") + logger.debug(f"{log_prefix} Adding boost for price range preferences: {price_ranges}") + functions.append({ + "filter": { + "terms": { + "price_range": price_ranges + } + }, + "weight": BOOST_WEIGHTS_CONFIG["price_range_preference_weight"] + }) + boost_cnt[5] += 1 # price_range_functions + + # ===== 客户商品结构类目提权 ===== + # 目的:基于客户商品结构分析,推荐符合客户业务模式的类目商品 + # 逻辑:从用户画像base_info中提取customerGoodsStructure,分析客户的类目偏好 + # 权重:从配置文件读取,默认1.08倍 + # 注意:categoryIds对应前端类目,不是ES的category_id字段 + if user_profile.base_info.customerGoodsStructure: + structure_list = [MessageToDict(s) for s in user_profile.base_info.customerGoodsStructure] + logger.debug(f"{log_prefix} Customer goods structure details: {structure_list}") + for structure in user_profile.base_info.customerGoodsStructure: + if structure.categoryIds: + logger.debug(f"{log_prefix} Adding boost for category IDs in structure length {len(structure.categoryIds)} category_ids {structure.categoryIds[:10]}") + functions.append({ + "filter": { + "terms": { + # 注意: user_profile.base_info.customerGoodsStructure.categoryIds 对应的是前端类目 而不是 ES 的 category_id + "sale_category_all": [x for x in structure.categoryIds] + } + }, + "weight": BOOST_WEIGHTS_CONFIG["customer_structure_category_weight"] + }) + boost_cnt[4] += 1 # category_functions + if structure.priceBetween: + # logger.debug(f"Adding boost for price range in structure: {structure.priceBetween}") + # not support yet + pass + + # Calculate total functions count + total_functions = len(functions) + + # Log boost query statistics + logger.info(f"{log_prefix} ===== ES查询提权函数统计 =====") + logger.info(f"{log_prefix} 总提权函数数量: {total_functions}") + logger.info(f"{log_prefix} 标签提权函数: {boost_cnt[0]}") + logger.info(f"{log_prefix} 新品提权函数: {boost_cnt[1]}") + logger.info(f"{log_prefix} 行为提权函数: {boost_cnt[2]}") + logger.info(f"{log_prefix} 品牌提权函数: {boost_cnt[3]}") + logger.info(f"{log_prefix} 类目提权函数: {boost_cnt[4]}") + logger.info(f"{log_prefix} 价格区间提权函数: {boost_cnt[5]}") + logger.info(f"{log_prefix} 视频提权函数: {boost_cnt[6]}") + logger.info(f"{log_prefix} 平台类目排名提权函数: {boost_cnt[7]}") + logger.info(f"{log_prefix} ===== ES查询提权函数统计结束 =====") + + if not functions: + logger.debug(f"{log_prefix} No boost functions generated") + return {} + + score_mode = FUNCTIONS_SCORE__SCORE_MODE__WHEN_HAS_QUERY if 
search_context.search_query or search_context.query else FUNCTIONS_SCORE__SCORE_MODE__WHEN_NO_QUERY + + boost_query = { + "function_score": { + "functions": functions, + "score_mode": score_mode, + "boost_mode": "multiply" + } + } + + # logger.debug(f"Generated boost query: {json.dumps(boost_query, ensure_ascii=False)}") + return boost_query + + def get_boost_value(self, tag_id: Optional[int] = None, tag_name: Optional[str] = None, platform: Optional[str] = None) -> float: + """ + Get the boost value for a given tag ID or name. + Returns 1.0 if no boost is configured or if platform doesn't match. + + Args: + tag_id: Tag ID to look up + tag_name: Tag name to look up + platform: Business platform for filtering + """ + if tag_id is not None: + for config in self.boost_configs: + if config.tag_id == tag_id: + # Check platform compatibility + if platform and config.platform != platform: + logger.debug(f"Platform mismatch for tag_id {tag_id}: requested platform {platform}, tag platform {config.platform}") + return 1.0 + logger.debug(f"Found boost value {config.boost_value} for tag_id {tag_id}") + return config.boost_value + + if tag_name is not None: + for config in self.boost_configs: + if config.tag_name == tag_name: + # Check platform compatibility + if platform and config.platform != platform: + logger.debug(f"Platform mismatch for tag_name {tag_name}: requested platform {platform}, tag platform {config.platform}") + return 1.0 + logger.debug(f"Found boost value {config.boost_value} for tag_name {tag_name}") + return config.boost_value + + logger.debug(f"No boost value found for tag_id={tag_id}, tag_name={tag_name}, platform={platform}") + return 1.0 \ No newline at end of file diff --git a/refers/db_service.py b/refers/db_service.py new file mode 100644 index 0000000..19b7619 --- /dev/null +++ b/refers/db_service.py @@ -0,0 +1,48 @@ +""" +数据库连接服务模块 +提供统一的数据库连接接口 +""" +from sqlalchemy import create_engine +from urllib.parse import quote_plus +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def create_db_connection(host, port, database, username, password): + """ + 创建数据库连接 + + Args: + host: 数据库主机地址 + port: 端口 + database: 数据库名 + username: 用户名 + password: 密码 + + Returns: + SQLAlchemy engine对象 + """ + try: + # 对密码进行URL编码,处理特殊字符 + encoded_password = quote_plus(password) + + # 构建连接字符串 + connection_string = f'mysql+pymysql://{username}:{encoded_password}@{host}:{port}/{database}' + + # 创建引擎 + engine = create_engine( + connection_string, + pool_pre_ping=True, # 连接池预检 + pool_recycle=3600, # 连接回收时间 + echo=False + ) + + logger.info(f"Database connection created successfully: {host}:{port}/{database}") + return engine + + except Exception as e: + logger.error(f"Failed to create database connection: {e}") + raise + diff --git a/refers/user_profile.py b/refers/user_profile.py new file mode 100644 index 0000000..bf73d7e --- /dev/null +++ b/refers/user_profile.py @@ -0,0 +1,58 @@ +import sys +from pathlib import Path +import time + +# Add the project root to Python path +current_dir = Path(__file__).parent +project_root = current_dir.parent.parent +sys.path.append(str(project_root)) +sys.path.append(str(project_root / 'snapshot_pb/generated')) + +from typing import Optional +import redis +from config.app_config import REDIS_CONFIG +from config.logging_config import get_app_logger +from user_profile_pb2 import UserProfile + +logger = get_app_logger(__name__) + +class UserProfileManager: + def __init__(self): + self.redis_client = redis.Redis( + 
host=REDIS_CONFIG['host'], + port=REDIS_CONFIG['port'], + db=REDIS_CONFIG['snapshot_db'], + password=REDIS_CONFIG['password'], + decode_responses=False + ) + + def get_user_profile(self, uid: str) -> Optional[UserProfile]: + """Get user profile from Redis""" + logger.debug(f"Fetching user profile for uid: {uid}") + + profile_key = f"user_profile:{uid}" + if not self.redis_client.exists(profile_key): + logger.debug(f"No profile data found for uid: {uid}") + return None + + # Measure Redis fetch time + fetch_start = time.time() + profile_data = self.redis_client.get(profile_key) + fetch_time = time.time() - fetch_start + + if not profile_data: + logger.debug(f"No profile data found for uid: {uid}") + return None + + try: + # Measure deserialization time + deserialize_start = time.time() + profile = UserProfile() + profile.ParseFromString(profile_data) + deserialize_time = time.time() - deserialize_start + logger.info(f"REDIS_COST_TIME: key: {profile_key}, Response size: {len(profile_data)//1024}KB, Redis fetch: {fetch_time*1000:.2f}ms, Deserialization: {deserialize_time*1000:.2f}ms for uid: {uid}") + return profile + + except Exception as e: + logger.error(f"Error deserializing profile data for uid {uid}: {str(e)}") + return None \ No newline at end of file diff --git a/refers/user_profile_extractor.py b/refers/user_profile_extractor.py new file mode 100644 index 0000000..f9777b2 --- /dev/null +++ b/refers/user_profile_extractor.py @@ -0,0 +1,1006 @@ +""" +用户画像信息提取器 - 从UserProfile中提取相关信息并生成描述 +""" + +from typing import Dict, Any, Optional, List, NamedTuple +from dataclasses import dataclass +from datetime import datetime, timedelta +from collections import Counter +import re,math +from src.services.user_profile import UserProfile +from config.logging_config import get_app_logger +from src.chat_search.dict_loader import DictLoader +from config.chat_search_config import USER_PROFILE_BEHAVIOR_CONFIG, SESSION_CONFIG, ATTR_STATIS_DISPLAY_MIN_OPTION_COUNT, ATTR_STATIS_DISPLAY_MIN_PRODUCT_COUNT, get_display_text, USER_BEHAVIOR_STAT_IN_PROMPT, USER_SEARCH_HISTORY_IN_PROMPT + +logger = get_app_logger(__name__) + + +@dataclass +class BehaviorStatFieldConfig: + """行为统计字段配置""" + field_name: str # 原始字段名 + feature_prefix: str # 特征前缀 + display_name: str # 显示名称 + description_template: str # 描述模板 + max_items: int = 10 # 最大显示项目数 + is_repeated: bool = False # 是否为重复字段 + is_numeric: bool = False # 是否为数值字段 + is_time: bool = False # 是否为时间字段 + bucket_size: int = 10 # 分桶大小(仅用于数值字段) + enable: bool = True # 是否启用该字段,默认启用 + dict_name: str = None # 词典名称(可选) + + +@dataclass +class BehaviorStatsConfig: + """行为统计配置""" + # 行为权重定义 + behavior_weights: Dict[str, float] = None + + # 直接取值字段配置 + direct_fields: List[BehaviorStatFieldConfig] = None + + # 重复字段配置 + repeated_fields: List[BehaviorStatFieldConfig] = None + + # 数值字段配置 + numeric_fields: List[BehaviorStatFieldConfig] = None + + # 时间字段配置 + time_fields: List[BehaviorStatFieldConfig] = None + + # 行为统计配置 + behavior_summary_truncate_limit: int = 1000 # 行为统计截断限制 + + def __post_init__(self): + """初始化默认配置""" + # 从集中配置加载 + config = USER_PROFILE_BEHAVIOR_CONFIG + + if self.behavior_weights is None: + self.behavior_weights = config['behavior_weights'] + + if self.direct_fields is None: + self.direct_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['direct_fields']] + + if self.repeated_fields is None: + self.repeated_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['repeated_fields']] + + if self.numeric_fields is None: + 
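# Illustrative sketch only: fetching a cached profile through the UserProfileManager
# defined above and dumping its base_info for inspection (the same MessageToDict
# helper is used in boost_strategy.py). The uid is a placeholder and Redis must be
# reachable with the configured REDIS_CONFIG.
from google.protobuf.json_format import MessageToDict

manager = UserProfileManager()
profile = manager.get_user_profile("10001")    # placeholder uid
if profile is None:
    print("no profile cached for this uid")
else:
    print(MessageToDict(profile.base_info))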
self.numeric_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['numeric_fields']] + + if self.time_fields is None: + self.time_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['time_fields']] + + if self.behavior_summary_truncate_limit is None: + self.behavior_summary_truncate_limit = config['behavior_summary_truncate_limit'] + + +@dataclass +class UserProfileInfo: + """用户画像信息结构""" + # 基础信息 + sale_market_value: str = "" # 主要销售地区名 + nature_of_company_value: str = "" # 公司性质名 + customer_type: str = "" # 公司类型编码 + customer_type_value: str = "" # 公司类型名 + sell_channel_value: str = "" # 销售渠道名 + stores_number: int = 0 # 门店数量 + register_category_values: List[str] = None # 注册主要采购品类名 + auth_category_values: List[str] = None # 认证主要采购品类名 + purchase_quantity_by_year_value: str = "" # 采购规模名 + customer_goods_structures: List[Dict[str, str]] = None # 客户商品结构 + brand_category_values: List[str] = None # 客户品牌品类名 + delivery_type_value: str = "" # 主要出货方式名 + customs_import_scale: str = "" # 海关进口规模 + purchase_quantity: int = 0 # 单款采购箱数 + tax_clearance_type: str = "" # 清关方式编码 + tax_clearance_type_value: str = "" # 清关方式名 + category_values: List[str] = None # 经营类目名 + stores_number_offline: int = 0 # 线下门店数量 + year_sales_amount: str = "" # 年销售额 + main_market_values: List[str] = None # 主攻市场名 + main_area_values: List[str] = None # 外贸主攻区域名 + secondary_area_values: List[str] = None # 外贸次要区域名 + country_value: str = "" # 国家名 + + # 最近搜索词 + recent_search_keywords: List[str] = None # 最近10个搜索词(过滤掉isSearchFactory=true的) + + def __post_init__(self): + """初始化默认值""" + if self.register_category_values is None: + self.register_category_values = [] + if self.auth_category_values is None: + self.auth_category_values = [] + if self.customer_goods_structures is None: + self.customer_goods_structures = [] + if self.brand_category_values is None: + self.brand_category_values = [] + if self.category_values is None: + self.category_values = [] + if self.main_market_values is None: + self.main_market_values = [] + if self.main_area_values is None: + self.main_area_values = [] + if self.secondary_area_values is None: + self.secondary_area_values = [] + if self.recent_search_keywords is None: + self.recent_search_keywords = [] + + +class UserProfileExtractor: + """用户画像信息提取器""" + + def __init__(self): + """初始化提取器""" + self.behavior_stats_config = BehaviorStatsConfig() + self.dict_loader = DictLoader() + + def extract_user_profile_info(self, user_profile: UserProfile) -> UserProfileInfo: + """ + 从UserProfile中提取相关信息 + + Args: + user_profile: UserProfile对象 + + Returns: + UserProfileInfo: 提取的用户画像信息 + """ + if not user_profile or not user_profile.base_info: + logger.warning("[extract_user_profile_info] UserProfile or base_info is None") + return UserProfileInfo() + + base_info = user_profile.base_info + + # 提取基础信息 + profile_info = UserProfileInfo( + sale_market_value=base_info.saleMarketValue or "", + nature_of_company_value=base_info.natureOfCompanyValue or "", + customer_type=base_info.customerType or "", + customer_type_value=base_info.customerTypeValue or "", + sell_channel_value=base_info.sellChannelValue or "", + stores_number=base_info.storesNumber or 0, + register_category_values=[str(item) for item in base_info.registerCategoryValues] if base_info.registerCategoryValues else [], + auth_category_values=[str(item) for item in base_info.authCategoryValues] if base_info.authCategoryValues else [], + purchase_quantity_by_year_value=base_info.purchaseQuantityByYearValue or "", + 
customer_goods_structures=self._extract_customer_goods_structures(base_info.customerGoodsStructure), + brand_category_values=[str(item) for item in base_info.brandCategoryValues] if base_info.brandCategoryValues else [], + delivery_type_value=base_info.deliveryTypeValue or "", + customs_import_scale=base_info.customsImportScale or "", + purchase_quantity=base_info.purchaseQuantity or 0, + tax_clearance_type=base_info.taxClearanceType or "", + tax_clearance_type_value=base_info.taxClearanceTypeValue or "", + category_values=[str(item) for item in base_info.categoryValues] if base_info.categoryValues else [], + stores_number_offline=base_info.storesNumberOffline or 0, + year_sales_amount=base_info.yearSalesAmount or "", + main_market_values=[str(item) for item in base_info.mainMarketValues] if base_info.mainMarketValues else [], + main_area_values=[str(item) for item in base_info.mainAreaValues] if base_info.mainAreaValues else [], + secondary_area_values=[str(item) for item in base_info.secondaryAreaValues] if base_info.secondaryAreaValues else [], + country_value=base_info.countryValue or "", + recent_search_keywords=self._extract_recent_search_keywords(user_profile) + ) + + logger.info(f"[UserProfileExtractor.extract_user_profile_info] Extracted user profile info: {profile_info}") + return profile_info + + def _extract_customer_goods_structures(self, customer_goods_structures) -> List[Dict[str, str]]: + """ + 提取客户商品结构信息 + + Args: + customer_goods_structures: 客户商品结构列表 + + Returns: + List[Dict[str, str]]: 客户商品结构信息列表 + """ + if not customer_goods_structures: + return [] + + structures = [] + for structure in customer_goods_structures: + structure_info = { + 'price_between': structure.priceBetween or "", + 'goods_grade': structure.goodsGrade or "", + 'package_type': structure.packageType or "" + } + structures.append(structure_info) + + return structures + + + def generate_chat_search_intro(self, profile_info: UserProfileInfo) -> str: + """ + 生成导购语介绍 + + Args: + profile_info: UserProfileInfo对象 + + Returns: + str: 导购语介绍 + """ + if profile_info: + customer_type_value = profile_info.customer_type_value + # 地理位置信息 + location = profile_info.sale_market_value if profile_info.sale_market_value else profile_info.country_value + else: + customer_type_value = None + location = None + + # 生成导购语 + if not location and not customer_type_value: + return "你是一个跨境B2B选品顾问,请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" + elif not location: + return f"你是一个跨境B2B选品顾问,了解“{customer_type_value}”类型客户的采购决策逻辑。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" + elif not customer_type_value: + return f"你是一个跨境B2B选品顾问,熟悉{location}市场。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" + else: + return f"你是一个跨境B2B选品顾问,熟悉{location}市场,了解“{customer_type_value}”类型客户的采购决策逻辑。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" + + + def generate_natural_language_description(self, profile_info: UserProfileInfo) -> str: + """ + 生成用户基础信息的自然语言描述 + + Args: + profile_info: UserProfileInfo对象 + + Returns: + str: 自然语言描述 + """ + if not profile_info: + return "暂无用户画像信息" + + description_parts = [] + + # 基础公司信息 + if profile_info.customer_type_value: + description_parts.append(f"公司类型:{profile_info.customer_type_value}") + + if profile_info.nature_of_company_value: + description_parts.append(f"公司性质:{profile_info.nature_of_company_value}") + + if profile_info.sell_channel_value: + description_parts.append(f"销售渠道:{profile_info.sell_channel_value}") + + # 地理位置信息 + location_parts = [] + if profile_info.country_value: + location_parts.append(profile_info.country_value) + if 
profile_info.sale_market_value: + location_parts.append(profile_info.sale_market_value) + if location_parts: + description_parts.append(f"主要销售地区:{', '.join(location_parts)}") + + # 门店信息 + if profile_info.stores_number > 0: + description_parts.append(f"门店数量:{profile_info.stores_number}家") + if profile_info.stores_number_offline > 0: + description_parts.append(f"线下门店:{profile_info.stores_number_offline}家") + + # 采购信息 + if profile_info.purchase_quantity_by_year_value: + description_parts.append(f"采购规模:{profile_info.purchase_quantity_by_year_value}") + + if profile_info.purchase_quantity > 0: + description_parts.append(f"单款采购箱数:{profile_info.purchase_quantity}箱") + + # 年销售额 + if profile_info.year_sales_amount: + description_parts.append(f"年销售额:{profile_info.year_sales_amount}") + + # 类目信息 + if profile_info.register_category_values: + description_parts.append(f"注册采购品类:{', '.join(str(item) for item in profile_info.register_category_values)}") + + if profile_info.auth_category_values: + description_parts.append(f"认证采购品类:{', '.join(str(item) for item in profile_info.auth_category_values)}") + + if profile_info.category_values: + description_parts.append(f"经营类目:{', '.join(str(item) for item in profile_info.category_values)}") + + # 品牌信息 + if profile_info.brand_category_values: + description_parts.append(f"品牌品类:{', '.join(str(item) for item in profile_info.brand_category_values)}") + + # 市场信息 + if profile_info.main_market_values: + description_parts.append(f"主攻市场:{', '.join(str(item) for item in profile_info.main_market_values)}") + + if profile_info.main_area_values: + description_parts.append(f"外贸主攻区域:{', '.join(str(item) for item in profile_info.main_area_values)}") + + # 商品结构统计 + if profile_info.customer_goods_structures: + structure_descriptions = [] + for structure in profile_info.customer_goods_structures[:USER_PROFILE_BEHAVIOR_CONFIG['max_customer_goods_structures']]: # 只取前N个 + parts = [] + if structure['price_between']: + parts.append(f"价格区间{structure['price_between']}") + if structure['goods_grade']: + parts.append(f"产品档次{structure['goods_grade']}") + if structure['package_type']: + parts.append(f"包装类型{structure['package_type']}") + if parts: + structure_descriptions.append('、'.join(parts)) + + if structure_descriptions: + description_parts.append(f"商品结构统计:{'; '.join(structure_descriptions)}") + + # 物流信息 + if profile_info.delivery_type_value: + description_parts.append(f"主要出货方式:{profile_info.delivery_type_value}") + + if profile_info.tax_clearance_type_value: + description_parts.append(f"清关方式:{profile_info.tax_clearance_type_value}") + + if profile_info.customs_import_scale: + description_parts.append(f"海关进口规模:{profile_info.customs_import_scale}") + + # 组合成完整描述 + if description_parts: + return "\n".join(description_parts) + else: + return "暂无用户画像信息(信息为空)" + + def extract_and_describe(self, user_profile: UserProfile) -> str: + """ + 提取用户画像信息并生成完整的自然语言描述 + + Args: + user_profile: UserProfile对象 + + Returns: + 导购语, 完整的用户画像自然语言描述 + """ + # 提取基础信息 + profile_info = self.extract_user_profile_info(user_profile) + + # 生成导购语 + guide_intro = self.generate_chat_search_intro(profile_info) + + if not user_profile: + return guide_intro, "暂无用户画像信息" + + natural_description = self.generate_natural_language_description(profile_info) + + # 提取历史行为中的通用属性分布统计 + common_attribute_distribution = self.extract_common_attribute_distribution(user_profile) + + # 提取历史行为中每个商品的具体属性统计 + item_specific_attributes = self.extract_item_specific_attributes(user_profile) + + # 生成自然语言描述 + common_attribute_description = 
self.generate_common_attribute_distribution_description(common_attribute_distribution) + item_specific_attribute_description = self.generate_item_specific_attribute_description(item_specific_attributes) + + # 组织完整的描述 + language = getattr(self, 'language', 'zh') + + complete_description = f"{get_display_text('customer_background', language)}:\n{natural_description}" + + # 添加通用属性分布描述 + if USER_BEHAVIOR_STAT_IN_PROMPT: + if common_attribute_description: + complete_description += f"\n\n{get_display_text('historical_purchase_general_attributes', language)}:\n{common_attribute_description}" + + # 添加具体属性偏好描述 + if item_specific_attribute_description: + complete_description += f"\n\n{get_display_text('historical_purchase_category_specific_attributes', language)}:\n{item_specific_attribute_description}" + + # 添加最近搜索词信息 + # 提取最近搜索词 + if USER_SEARCH_HISTORY_IN_PROMPT: + recent_search_keywords = self._extract_recent_search_keywords(user_profile) + if recent_search_keywords: + complete_description += f"\n\n{get_display_text('recent_search_keywords', language)}:{', '.join(recent_search_keywords)}" + + return guide_intro, complete_description + + def extract_common_attribute_distribution(self, user_profile: UserProfile) -> Dict[str, Any]: + """ + 提取历史行为中的通用属性分布统计 + + Args: + user_profile: UserProfile对象 + + Returns: + Dict[str, Any]: 通用属性分布统计信息 + """ + if not user_profile or not user_profile.behavior_map: + logger.warning("[extract_common_attribute_distribution] UserProfile or behavior_map is None") + return {} + + behavior_map = user_profile.behavior_map + common_features = {} + + # 获取所有行为数据 + all_behaviors = [] + for behavior_type, behaviors in [ + ('click', behavior_map.click), + ('add_cart', behavior_map.add_cart), + ('collect', behavior_map.collect), + ('purchase', behavior_map.purchase) + ]: + logger.info(f"[UserProfileExtractor.extract_common_attribute_distribution] Extracted behavior_type {behavior_type} with {len(behaviors)} behaviors") + for behavior in behaviors: + all_behaviors.append((behavior, self.behavior_stats_config.behavior_weights[behavior_type])) + + + # 1. 处理直接取值字段 + for field_config in self.behavior_stats_config.direct_fields: + if not field_config.enable: + continue + counter = Counter() + total_weight_for_field = 0 # 该字段的总权重(包括空值) + + for behavior, weight in all_behaviors: + total_weight_for_field += weight # 所有行为都计入总数 + if hasattr(behavior, field_config.field_name): + value = getattr(behavior, field_config.field_name) + if value: # 确保值不为空 + counter[str(value)] += weight # 转换为字符串 + # 如果值为空,不加入counter,但已计入total_weight_for_field + + # 计算空值权重 + empty_weight = total_weight_for_field - sum(counter.values()) + if empty_weight > 0: + counter['__empty__'] = empty_weight + + # 保存统计结果 + common_features[f'{field_config.feature_prefix}_weighted_counts'] = dict(counter) + common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field + common_features[f'{field_config.feature_prefix}_top_items'] = [item for item, count in counter.most_common(10)] + + # 2. 
处理重复字段 + for field_config in self.behavior_stats_config.repeated_fields: + if not field_config.enable: + continue + counter = Counter() + total_weight_for_field = 0 # 该字段的总权重(包括空值) + + for behavior, weight in all_behaviors: + total_weight_for_field += weight # 所有行为都计入总数 + if hasattr(behavior, field_config.field_name) and getattr(behavior, field_config.field_name): + values = getattr(behavior, field_config.field_name) + has_valid_value = False + for value in values: + if value: + counter[str(value)] += weight + has_valid_value = True + # 如果没有有效值,不加入counter,但已计入total_weight_for_field + # 如果字段不存在或为空,不加入counter,但已计入total_weight_for_field + + # 计算空值权重 + empty_weight = total_weight_for_field - sum(counter.values()) + if empty_weight > 0: + counter['__empty__'] = empty_weight + + common_features[f'{field_config.feature_prefix}_weighted_counts'] = dict(counter) + common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field + common_features[f'{field_config.feature_prefix}_top_items'] = [item for item, count in counter.most_common(10)] + + # 3. 处理数值字段分桶统计 + for field_config in self.behavior_stats_config.numeric_fields: + if not field_config.enable: + continue + bucket_counter = Counter() + total_weight_for_field = 0 # 该字段的总权重(包括空值) + + for behavior, weight in all_behaviors: + total_weight_for_field += weight # 所有行为都计入总数 + if hasattr(behavior, field_config.field_name): + value = getattr(behavior, field_config.field_name) + if value and value > 0: + bucket = int(value / field_config.bucket_size) + bucket_counter[str(bucket)] += weight # 转换为字符串 + # 如果值为空或<=0,不加入counter,但已计入total_weight_for_field + + # 计算空值权重 + empty_weight = total_weight_for_field - sum(bucket_counter.values()) + if empty_weight > 0: + bucket_counter['__empty__'] = empty_weight + + common_features[f'{field_config.feature_prefix}_bucket_weighted_counts'] = dict(bucket_counter) + common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field + common_features[f'{field_config.feature_prefix}_top_buckets'] = [bucket for bucket, count in bucket_counter.most_common(10)] + + # 4. 
处理时间差统计 + for field_config in self.behavior_stats_config.time_fields: + if not field_config.enable: + continue + time_bucket_counter = Counter() + total_weight_for_field = 0 # 该字段的总权重(包括空值) + + for behavior, weight in all_behaviors: + total_weight_for_field += weight # 所有行为都计入总数 + if hasattr(behavior, field_config.field_name) and hasattr(behavior, 'behaviorTime'): + time_value = getattr(behavior, field_config.field_name) + behavior_time = behavior.behaviorTime + + if time_value and behavior_time: + try: + # 解析时间字符串 + if isinstance(time_value, str): + time_obj = datetime.strptime(time_value, '%Y-%m-%d %H:%M:%S') + else: + time_obj = time_value + + if isinstance(behavior_time, str): + behavior_time_obj = datetime.strptime(behavior_time, '%Y-%m-%d %H:%M:%S') + else: + behavior_time_obj = behavior_time + + # 计算时间差(月数) + time_diff = behavior_time_obj - time_obj + months_diff = int(time_diff.days / 30) + + # 分桶:0-6个月,6-12个月,12-24个月,24个月以上 + if months_diff < 0: + bucket = 'future' + elif months_diff <= 6: + bucket = '0-6m' + elif months_diff <= 12: + bucket = '6-12m' + elif months_diff <= 24: + bucket = '12-24m' + else: + bucket = '24m+' + + time_bucket_counter[bucket] += weight + + except (ValueError, TypeError) as e: + logger.debug(f"Error parsing time for {field_config.field_name}: {e}") + continue + # 如果时间值为空或解析失败,不加入counter,但已计入total_weight_for_field + + # 计算空值权重 + empty_weight = total_weight_for_field - sum(time_bucket_counter.values()) + if empty_weight > 0: + time_bucket_counter['__empty__'] = empty_weight + + common_features[f'{field_config.feature_prefix}_time_bucket_weighted_counts'] = dict(time_bucket_counter) + common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field + common_features[f'{field_config.feature_prefix}_top_time_buckets'] = [bucket for bucket, count in time_bucket_counter.most_common(5)] + + # 5. 
综合统计信息 + total_weighted_behaviors = sum(weight for _, weight in all_behaviors) + common_features['total_weighted_behaviors'] = total_weighted_behaviors + + # 各行为类型的统计 + behavior_type_counts = Counter() + for behavior_type, behaviors in [ + ('click', behavior_map.click), + ('add_cart', behavior_map.add_cart), + ('collect', behavior_map.collect), + ('purchase', behavior_map.purchase) + ]: + behavior_type_counts[behavior_type] = len(behaviors) + + common_features['behavior_type_counts'] = dict(behavior_type_counts) + + logger.info(f"Extracted behavior stats with {len(common_features)} feature groups") + return common_features + + def extract_item_specific_attributes(self, user_profile: UserProfile) -> Dict[str, Any]: + """ + 从历史行为中提取每个商品的具体属性统计 + + Args: + user_profile: UserProfile对象 + + Returns: + Dict[str, Any]: 商品具体属性统计信息 + """ + if not user_profile or not user_profile.behavior_map: + logger.warning("[extract_item_specific_attributes] UserProfile or behavior_map is None") + return {} + + behavior_map = user_profile.behavior_map + + # 获取所有行为数据 + all_behaviors = [] + for behavior_type, behaviors in [ + ('click', behavior_map.click), + ('add_cart', behavior_map.add_cart), + ('collect', behavior_map.collect), + ('purchase', behavior_map.purchase) + ]: + for behavior in behaviors: + all_behaviors.append((behavior, self.behavior_stats_config.behavior_weights[behavior_type])) + + # 统计每个属性名称和属性值对应的权重 + attr_statistics = {} # {attr_name: {option_name: weight}} + + for behavior, weight in all_behaviors: + # 合并 spuAttributeList 和 skuAttributeList + merged_attributes = [] + + # 以 skuAttributeList 为基础 + if hasattr(behavior, 'skuAttributeList') and behavior.skuAttributeList: + merged_attributes.extend(behavior.skuAttributeList) + + # 加入 spuAttributeList,如果 attributeId 已存在则跳过 + existing_attr_ids = set() + if hasattr(behavior, 'skuAttributeList') and behavior.skuAttributeList: + existing_attr_ids = {attr.attributeId for attr in behavior.skuAttributeList} + + if hasattr(behavior, 'spuAttributeList') and behavior.spuAttributeList: + for attr in behavior.spuAttributeList: + if attr.attributeId not in existing_attr_ids: + merged_attributes.append(attr) + existing_attr_ids.add(attr.attributeId) + + # 统计合并后的属性 + for attr in merged_attributes: + attr_id = attr.attributeId + option_id = attr.optionId + + # 获取属性名称 + attr_name = self.dict_loader.get_name('spu_attribute', str(attr_id)) + if not attr_name: + attr_name = self.dict_loader.get_name('sku_attribute', str(attr_id)) + if not attr_name: + attr_name = f"属性{attr_id}" + + # 获取属性值名称 + option_name = self.dict_loader.get_name('spu_attribute_option', str(option_id)) + if not option_name: + option_name = self.dict_loader.get_name('sku_attribute_option', str(option_id)) + if not option_name: + option_name = f"选项{option_id}" + + # 跳过无效的属性值 + if option_name == '无' or not option_name: + continue + + # 统计 + if attr_name not in attr_statistics: + attr_statistics[attr_name] = {} + + if option_name not in attr_statistics[attr_name]: + attr_statistics[attr_name][option_name] = 0 + + attr_statistics[attr_name][option_name] += weight + + if not attr_statistics: + return {} + + # 生成属性统计特征 + attribute_features = {} + + # 计算每个属性的总权重并排序 + attr_with_total = [ + (attr_name, options_dict, sum(options_dict.values())) + for attr_name, options_dict in attr_statistics.items() + ] + + # 按总权重排序,取前10个属性 + sorted_attrs = sorted(attr_with_total, key=lambda x: x[2], reverse=True) + + for attr_name, options_dict, total_weight in sorted_attrs: + # 按权重排序选项,取前5个 + sorted_options = 
sorted(options_dict.items(), key=lambda x: x[1], reverse=True) + + # 生成特征名称(使用属性名称的拼音或ID作为前缀) + attr_feature_prefix = f"attr_{attr_name.replace(' ', '_').replace(':', '_')}" + + attribute_features[f'{attr_feature_prefix}_weighted_counts'] = dict(options_dict) + attribute_features[f'{attr_feature_prefix}_total_weight'] = total_weight + attribute_features[f'{attr_feature_prefix}_top_items'] = [item for item, count in sorted_options] + + # 添加总体属性统计 + total_attribute_weight = sum(attr[2] for attr in sorted_attrs) + attribute_features['attribute_total_weight'] = total_attribute_weight + attribute_features['attribute_attr_count'] = len(sorted_attrs) + + logger.info(f"Extracted attribute statistics with {len(attribute_features)} attribute feature groups") + return attribute_features + + def generate_common_attribute_distribution_description(self, common_attribute_distribution: Dict[str, Any]) -> str: + """ + 生成通用属性分布统计的自然语言描述 + + Args: + common_attribute_distribution: 通用属性分布统计信息 + + Returns: + str: 自然语言描述 + """ + if not common_attribute_distribution: + return "暂无通用属性分布统计信息" + + description_parts = [] + + # 0. 行为总述(放在最前面) + if 'behavior_type_counts' in common_attribute_distribution: + behavior_counts = common_attribute_distribution['behavior_type_counts'] + total_behaviors = sum(behavior_counts.values()) + + if total_behaviors > 0: + behavior_summary_parts = [] + + # 检查是否达到截断限制 + if total_behaviors >= self.behavior_stats_config.behavior_summary_truncate_limit: + behavior_summary_parts.append(f"该用户有超过{self.behavior_stats_config.behavior_summary_truncate_limit}次行为") + else: + behavior_summary_parts.append(f"该用户有{total_behaviors}次行为") + + # 添加具体行为类型统计 + behavior_details = [] + if behavior_counts.get('click', 0) > 0: + behavior_details.append(f"{behavior_counts['click']}次点击") + if behavior_counts.get('add_cart', 0) > 0: + behavior_details.append(f"{behavior_counts['add_cart']}次加购") + if behavior_counts.get('collect', 0) > 0: + behavior_details.append(f"{behavior_counts['collect']}次收藏") + if behavior_counts.get('purchase', 0) > 0: + behavior_details.append(f"{behavior_counts['purchase']}次购买") + + if behavior_details: + behavior_summary_parts.append(f"包括{', '.join(behavior_details)}") + + description_parts.append(''.join(behavior_summary_parts)) + + # 1. 处理直接取值字段描述 + for field_config in self.behavior_stats_config.direct_fields: + if not field_config.enable: + continue + weighted_counts_key = f'{field_config.feature_prefix}_weighted_counts' + total_weight_key = f'{field_config.feature_prefix}_total_weight' + + if weighted_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: + weighted_counts = common_attribute_distribution[weighted_counts_key] + total_weight = common_attribute_distribution[total_weight_key] + + if total_weight > 0: + # 生成带占比的描述 + items_with_percentage = [] + for item, count in sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: + percentage = (count / total_weight) * 100 + # 词典映射 + if item == '__empty__': + display_name = '空值' + elif field_config.dict_name: + display_name = self.dict_loader.get_name(field_config.dict_name, str(item)) or str(item) + else: + display_name = str(item) + + items_with_percentage.append(f"{display_name}({percentage:.1f}%)") + + if items_with_percentage: + description = field_config.description_template.format( + display_name=field_config.display_name, + values=', '.join(items_with_percentage) + ) + description_parts.append(description) + + # 2. 
处理重复字段描述 + for field_config in self.behavior_stats_config.repeated_fields: + if not field_config.enable: + continue + weighted_counts_key = f'{field_config.feature_prefix}_weighted_counts' + total_weight_key = f'{field_config.feature_prefix}_total_weight' + + if weighted_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: + weighted_counts = common_attribute_distribution[weighted_counts_key] + total_weight = common_attribute_distribution[total_weight_key] + + if total_weight > 0: + # 生成带占比的描述 + items_with_percentage = [] + for item, count in sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: + percentage = (count / total_weight) * 100 + # 词典映射 + if item == '__empty__': + display_name = '空值' + elif field_config.dict_name: + display_name = self.dict_loader.get_name(field_config.dict_name, str(item)) or str(item) + else: + display_name = str(item) + + items_with_percentage.append(f"{display_name}({percentage:.1f}%)") + + if items_with_percentage: + description = field_config.description_template.format( + display_name=field_config.display_name, + values=', '.join(items_with_percentage) + ) + description_parts.append(description) + + # 3. 处理数值字段描述 + for field_config in self.behavior_stats_config.numeric_fields: + if not field_config.enable: + continue + bucket_counts_key = f'{field_config.feature_prefix}_bucket_weighted_counts' + total_weight_key = f'{field_config.feature_prefix}_total_weight' + + if bucket_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: + bucket_counts = common_attribute_distribution[bucket_counts_key] + total_weight = common_attribute_distribution[total_weight_key] + + if total_weight > 0: + # 生成带占比的描述 + ranges_with_percentage = [] + for bucket, count in sorted(bucket_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: + percentage = (count / total_weight) * 100 + + if bucket == '__empty__': + range_desc = '空值' + else: + range_desc = f"{int(bucket)*field_config.bucket_size}-{(int(bucket)+1)*field_config.bucket_size}" + + ranges_with_percentage.append(f"{range_desc}({percentage:.1f}%)") + + if ranges_with_percentage: + description = field_config.description_template.format( + display_name=field_config.display_name, + values=', '.join(ranges_with_percentage) + ) + description_parts.append(description) + + # 4. 
处理时间字段描述 + for field_config in self.behavior_stats_config.time_fields: + if not field_config.enable: + continue + time_bucket_counts_key = f'{field_config.feature_prefix}_time_bucket_weighted_counts' + total_weight_key = f'{field_config.feature_prefix}_total_weight' + + if time_bucket_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: + time_bucket_counts = common_attribute_distribution[time_bucket_counts_key] + total_weight = common_attribute_distribution[total_weight_key] + + if total_weight > 0: + # 生成带占比的描述 + time_descriptions_with_percentage = [] + for bucket, count in sorted(time_bucket_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: + percentage = (count / total_weight) * 100 + bucket_str = str(bucket) + + if bucket_str == '__empty__': + time_desc = '空值' + elif bucket_str == '0-6m': + time_desc = '半年内' + elif bucket_str == '6-12m': + time_desc = '半年到一年' + elif bucket_str == '12-24m': + time_desc = '1-2年' + elif bucket_str == '24m+': + time_desc = '2年+' + elif bucket_str == 'future': + time_desc = '错误时间' + else: + time_desc = bucket_str + + time_descriptions_with_percentage.append(f"{time_desc}({percentage:.1f}%)") + + if time_descriptions_with_percentage: + description = field_config.description_template.format( + display_name=field_config.display_name, + values=', '.join(time_descriptions_with_percentage) + ) + description_parts.append(description) + + # 组合成完整描述 + if description_parts: + return "\n".join(description_parts) + else: + return "" + + def generate_item_specific_attribute_description(self, item_specific_attributes: Dict[str, Any]) -> str: + """ + 生成商品具体属性统计的自然语言描述 + + Args: + item_specific_attributes: 商品具体属性统计信息 + + Returns: + str: 商品具体属性统计的自然语言描述 + """ + if not item_specific_attributes: + return "暂无商品具体属性统计信息。" + + descriptions = [] + + # 获取所有属性相关的特征 + attr_features = {} + for key, value in item_specific_attributes.items(): + if key.startswith('attr_') and key.endswith('_weighted_counts'): + attr_name = key.replace('_weighted_counts', '').replace('attr_', '') + attr_features[attr_name] = value + + if not attr_features: + return "暂无有效属性统计信息。" + + # 按总权重排序属性 + sorted_attrs = [] + for attr_name, weighted_counts in attr_features.items(): + total_weight = sum(weighted_counts.values()) + sorted_attrs.append((attr_name, weighted_counts, total_weight)) + + sorted_attrs.sort(key=lambda x: x[2], reverse=True) + + # 生成描述 + max_attrs = USER_PROFILE_BEHAVIOR_CONFIG['max_attributes_display'] + max_options = USER_PROFILE_BEHAVIOR_CONFIG['max_options_per_attribute'] + for attr_name, weighted_counts, total_weight in sorted_attrs[:max_attrs]: # 取前N个属性 + # 按权重排序选项,取前N个 + sorted_options = sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:max_options] + + option_texts = [] + for option_name, weight in sorted_options: + if option_name != '__empty__': + # 计算百分比 + percentage = (weight / total_weight) * 100 + option_texts.append(f"{option_name}({percentage:.1f}%)") + + if option_texts: + desc = f"• {attr_name}: {', '.join(option_texts)}" + descriptions.append(desc) + + if descriptions: + return "\n".join(descriptions) + return "暂无有效属性统计信息。" + + def _extract_recent_search_keywords(self, user_profile: UserProfile) -> List[str]: + """ + 提取最近10个搜索词(过滤掉isSearchFactory=true的) + + Args: + user_profile: UserProfile对象 + + Returns: + List[str]: 最近10个搜索词列表 + """ + if not user_profile or not user_profile.behavior_map: + return [] + + search_keywords = user_profile.behavior_map.search_keyword + if not 
search_keywords: + return [] + + # 过滤、去重并收集最近10个搜索词 + seen_keywords = set() + recent_keywords = [] + for search_behavior in search_keywords: + if not search_behavior.isSearchFactory and search_behavior.keyword: + keyword = search_behavior.keyword.strip() + + # 过滤掉纯数字、下划线、减号、空白字符构成的关键词 + if self._is_valid_search_keyword(keyword): + if keyword not in seen_keywords: + seen_keywords.add(keyword) + recent_keywords.append(keyword) + if len(recent_keywords) >= SESSION_CONFIG['max_recent_search_keywords']: # 达到最大数量就停止 + break + + logger.info(f"[UserProfileExtractor._extract_recent_search_keywords] Extracted {len(recent_keywords)} recent search keywords") + return recent_keywords + + def _is_valid_search_keyword(self, keyword: str) -> bool: + """ + 判断搜索关键词是否有效 + + Args: + keyword: 搜索关键词 + + Returns: + bool: 是否有效 + """ + if not keyword or keyword.strip() == '': + return False + + # 过滤掉纯数字、下划线、减号、空白字符构成的关键词 + # 使用正则表达式匹配:只包含数字、下划线、减号、空白字符的字符串 + if re.match(r'^[\d\s_-]+$', keyword): + return False + + # 只有一个单词(split后只有一个)、并且这个单词里面既包含数字又包含字母 (转小写后 既有小写字母、又有数字) + if len(keyword.split()) == 1: + if re.match(r'^[a-z0-9]+$', keyword.lower()): + return False + # 包含数字和- + if re.match(r'^[0-9-]+$', keyword): + return False + + return True \ No newline at end of file diff --git a/user_profile.py b/user_profile.py deleted file mode 100644 index bf73d7e..0000000 --- a/user_profile.py +++ /dev/null @@ -1,58 +0,0 @@ -import sys -from pathlib import Path -import time - -# Add the project root to Python path -current_dir = Path(__file__).parent -project_root = current_dir.parent.parent -sys.path.append(str(project_root)) -sys.path.append(str(project_root / 'snapshot_pb/generated')) - -from typing import Optional -import redis -from config.app_config import REDIS_CONFIG -from config.logging_config import get_app_logger -from user_profile_pb2 import UserProfile - -logger = get_app_logger(__name__) - -class UserProfileManager: - def __init__(self): - self.redis_client = redis.Redis( - host=REDIS_CONFIG['host'], - port=REDIS_CONFIG['port'], - db=REDIS_CONFIG['snapshot_db'], - password=REDIS_CONFIG['password'], - decode_responses=False - ) - - def get_user_profile(self, uid: str) -> Optional[UserProfile]: - """Get user profile from Redis""" - logger.debug(f"Fetching user profile for uid: {uid}") - - profile_key = f"user_profile:{uid}" - if not self.redis_client.exists(profile_key): - logger.debug(f"No profile data found for uid: {uid}") - return None - - # Measure Redis fetch time - fetch_start = time.time() - profile_data = self.redis_client.get(profile_key) - fetch_time = time.time() - fetch_start - - if not profile_data: - logger.debug(f"No profile data found for uid: {uid}") - return None - - try: - # Measure deserialization time - deserialize_start = time.time() - profile = UserProfile() - profile.ParseFromString(profile_data) - deserialize_time = time.time() - deserialize_start - logger.info(f"REDIS_COST_TIME: key: {profile_key}, Response size: {len(profile_data)//1024}KB, Redis fetch: {fetch_time*1000:.2f}ms, Deserialization: {deserialize_time*1000:.2f}ms for uid: {uid}") - return profile - - except Exception as e: - logger.error(f"Error deserializing profile data for uid {uid}: {str(e)}") - return None \ No newline at end of file diff --git a/user_profile_extractor.py b/user_profile_extractor.py deleted file mode 100644 index f9777b2..0000000 --- a/user_profile_extractor.py +++ /dev/null @@ -1,1006 +0,0 @@ -""" -用户画像信息提取器 - 从UserProfile中提取相关信息并生成描述 -""" - -from typing import Dict, Any, Optional, List, NamedTuple 
-from dataclasses import dataclass -from datetime import datetime, timedelta -from collections import Counter -import re,math -from src.services.user_profile import UserProfile -from config.logging_config import get_app_logger -from src.chat_search.dict_loader import DictLoader -from config.chat_search_config import USER_PROFILE_BEHAVIOR_CONFIG, SESSION_CONFIG, ATTR_STATIS_DISPLAY_MIN_OPTION_COUNT, ATTR_STATIS_DISPLAY_MIN_PRODUCT_COUNT, get_display_text, USER_BEHAVIOR_STAT_IN_PROMPT, USER_SEARCH_HISTORY_IN_PROMPT - -logger = get_app_logger(__name__) - - -@dataclass -class BehaviorStatFieldConfig: - """行为统计字段配置""" - field_name: str # 原始字段名 - feature_prefix: str # 特征前缀 - display_name: str # 显示名称 - description_template: str # 描述模板 - max_items: int = 10 # 最大显示项目数 - is_repeated: bool = False # 是否为重复字段 - is_numeric: bool = False # 是否为数值字段 - is_time: bool = False # 是否为时间字段 - bucket_size: int = 10 # 分桶大小(仅用于数值字段) - enable: bool = True # 是否启用该字段,默认启用 - dict_name: str = None # 词典名称(可选) - - -@dataclass -class BehaviorStatsConfig: - """行为统计配置""" - # 行为权重定义 - behavior_weights: Dict[str, float] = None - - # 直接取值字段配置 - direct_fields: List[BehaviorStatFieldConfig] = None - - # 重复字段配置 - repeated_fields: List[BehaviorStatFieldConfig] = None - - # 数值字段配置 - numeric_fields: List[BehaviorStatFieldConfig] = None - - # 时间字段配置 - time_fields: List[BehaviorStatFieldConfig] = None - - # 行为统计配置 - behavior_summary_truncate_limit: int = 1000 # 行为统计截断限制 - - def __post_init__(self): - """初始化默认配置""" - # 从集中配置加载 - config = USER_PROFILE_BEHAVIOR_CONFIG - - if self.behavior_weights is None: - self.behavior_weights = config['behavior_weights'] - - if self.direct_fields is None: - self.direct_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['direct_fields']] - - if self.repeated_fields is None: - self.repeated_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['repeated_fields']] - - if self.numeric_fields is None: - self.numeric_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['numeric_fields']] - - if self.time_fields is None: - self.time_fields = [BehaviorStatFieldConfig(**field_config) for field_config in config['time_fields']] - - if self.behavior_summary_truncate_limit is None: - self.behavior_summary_truncate_limit = config['behavior_summary_truncate_limit'] - - -@dataclass -class UserProfileInfo: - """用户画像信息结构""" - # 基础信息 - sale_market_value: str = "" # 主要销售地区名 - nature_of_company_value: str = "" # 公司性质名 - customer_type: str = "" # 公司类型编码 - customer_type_value: str = "" # 公司类型名 - sell_channel_value: str = "" # 销售渠道名 - stores_number: int = 0 # 门店数量 - register_category_values: List[str] = None # 注册主要采购品类名 - auth_category_values: List[str] = None # 认证主要采购品类名 - purchase_quantity_by_year_value: str = "" # 采购规模名 - customer_goods_structures: List[Dict[str, str]] = None # 客户商品结构 - brand_category_values: List[str] = None # 客户品牌品类名 - delivery_type_value: str = "" # 主要出货方式名 - customs_import_scale: str = "" # 海关进口规模 - purchase_quantity: int = 0 # 单款采购箱数 - tax_clearance_type: str = "" # 清关方式编码 - tax_clearance_type_value: str = "" # 清关方式名 - category_values: List[str] = None # 经营类目名 - stores_number_offline: int = 0 # 线下门店数量 - year_sales_amount: str = "" # 年销售额 - main_market_values: List[str] = None # 主攻市场名 - main_area_values: List[str] = None # 外贸主攻区域名 - secondary_area_values: List[str] = None # 外贸次要区域名 - country_value: str = "" # 国家名 - - # 最近搜索词 - recent_search_keywords: List[str] = None # 最近10个搜索词(过滤掉isSearchFactory=true的) - - def __post_init__(self): 
- """初始化默认值""" - if self.register_category_values is None: - self.register_category_values = [] - if self.auth_category_values is None: - self.auth_category_values = [] - if self.customer_goods_structures is None: - self.customer_goods_structures = [] - if self.brand_category_values is None: - self.brand_category_values = [] - if self.category_values is None: - self.category_values = [] - if self.main_market_values is None: - self.main_market_values = [] - if self.main_area_values is None: - self.main_area_values = [] - if self.secondary_area_values is None: - self.secondary_area_values = [] - if self.recent_search_keywords is None: - self.recent_search_keywords = [] - - -class UserProfileExtractor: - """用户画像信息提取器""" - - def __init__(self): - """初始化提取器""" - self.behavior_stats_config = BehaviorStatsConfig() - self.dict_loader = DictLoader() - - def extract_user_profile_info(self, user_profile: UserProfile) -> UserProfileInfo: - """ - 从UserProfile中提取相关信息 - - Args: - user_profile: UserProfile对象 - - Returns: - UserProfileInfo: 提取的用户画像信息 - """ - if not user_profile or not user_profile.base_info: - logger.warning("[extract_user_profile_info] UserProfile or base_info is None") - return UserProfileInfo() - - base_info = user_profile.base_info - - # 提取基础信息 - profile_info = UserProfileInfo( - sale_market_value=base_info.saleMarketValue or "", - nature_of_company_value=base_info.natureOfCompanyValue or "", - customer_type=base_info.customerType or "", - customer_type_value=base_info.customerTypeValue or "", - sell_channel_value=base_info.sellChannelValue or "", - stores_number=base_info.storesNumber or 0, - register_category_values=[str(item) for item in base_info.registerCategoryValues] if base_info.registerCategoryValues else [], - auth_category_values=[str(item) for item in base_info.authCategoryValues] if base_info.authCategoryValues else [], - purchase_quantity_by_year_value=base_info.purchaseQuantityByYearValue or "", - customer_goods_structures=self._extract_customer_goods_structures(base_info.customerGoodsStructure), - brand_category_values=[str(item) for item in base_info.brandCategoryValues] if base_info.brandCategoryValues else [], - delivery_type_value=base_info.deliveryTypeValue or "", - customs_import_scale=base_info.customsImportScale or "", - purchase_quantity=base_info.purchaseQuantity or 0, - tax_clearance_type=base_info.taxClearanceType or "", - tax_clearance_type_value=base_info.taxClearanceTypeValue or "", - category_values=[str(item) for item in base_info.categoryValues] if base_info.categoryValues else [], - stores_number_offline=base_info.storesNumberOffline or 0, - year_sales_amount=base_info.yearSalesAmount or "", - main_market_values=[str(item) for item in base_info.mainMarketValues] if base_info.mainMarketValues else [], - main_area_values=[str(item) for item in base_info.mainAreaValues] if base_info.mainAreaValues else [], - secondary_area_values=[str(item) for item in base_info.secondaryAreaValues] if base_info.secondaryAreaValues else [], - country_value=base_info.countryValue or "", - recent_search_keywords=self._extract_recent_search_keywords(user_profile) - ) - - logger.info(f"[UserProfileExtractor.extract_user_profile_info] Extracted user profile info: {profile_info}") - return profile_info - - def _extract_customer_goods_structures(self, customer_goods_structures) -> List[Dict[str, str]]: - """ - 提取客户商品结构信息 - - Args: - customer_goods_structures: 客户商品结构列表 - - Returns: - List[Dict[str, str]]: 客户商品结构信息列表 - """ - if not customer_goods_structures: - return [] - - 
structures = [] - for structure in customer_goods_structures: - structure_info = { - 'price_between': structure.priceBetween or "", - 'goods_grade': structure.goodsGrade or "", - 'package_type': structure.packageType or "" - } - structures.append(structure_info) - - return structures - - - def generate_chat_search_intro(self, profile_info: UserProfileInfo) -> str: - """ - 生成导购语介绍 - - Args: - profile_info: UserProfileInfo对象 - - Returns: - str: 导购语介绍 - """ - if profile_info: - customer_type_value = profile_info.customer_type_value - # 地理位置信息 - location = profile_info.sale_market_value if profile_info.sale_market_value else profile_info.country_value - else: - customer_type_value = None - location = None - - # 生成导购语 - if not location and not customer_type_value: - return "你是一个跨境B2B选品顾问,请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" - elif not location: - return f"你是一个跨境B2B选品顾问,了解“{customer_type_value}”类型客户的采购决策逻辑。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" - elif not customer_type_value: - return f"你是一个跨境B2B选品顾问,熟悉{location}市场。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" - else: - return f"你是一个跨境B2B选品顾问,熟悉{location}市场,了解“{customer_type_value}”类型客户的采购决策逻辑。请基于客户背景信息、本次搜索query及其相关的搜索结果,按要求完成选品的思考和建议。" - - - def generate_natural_language_description(self, profile_info: UserProfileInfo) -> str: - """ - 生成用户基础信息的自然语言描述 - - Args: - profile_info: UserProfileInfo对象 - - Returns: - str: 自然语言描述 - """ - if not profile_info: - return "暂无用户画像信息" - - description_parts = [] - - # 基础公司信息 - if profile_info.customer_type_value: - description_parts.append(f"公司类型:{profile_info.customer_type_value}") - - if profile_info.nature_of_company_value: - description_parts.append(f"公司性质:{profile_info.nature_of_company_value}") - - if profile_info.sell_channel_value: - description_parts.append(f"销售渠道:{profile_info.sell_channel_value}") - - # 地理位置信息 - location_parts = [] - if profile_info.country_value: - location_parts.append(profile_info.country_value) - if profile_info.sale_market_value: - location_parts.append(profile_info.sale_market_value) - if location_parts: - description_parts.append(f"主要销售地区:{', '.join(location_parts)}") - - # 门店信息 - if profile_info.stores_number > 0: - description_parts.append(f"门店数量:{profile_info.stores_number}家") - if profile_info.stores_number_offline > 0: - description_parts.append(f"线下门店:{profile_info.stores_number_offline}家") - - # 采购信息 - if profile_info.purchase_quantity_by_year_value: - description_parts.append(f"采购规模:{profile_info.purchase_quantity_by_year_value}") - - if profile_info.purchase_quantity > 0: - description_parts.append(f"单款采购箱数:{profile_info.purchase_quantity}箱") - - # 年销售额 - if profile_info.year_sales_amount: - description_parts.append(f"年销售额:{profile_info.year_sales_amount}") - - # 类目信息 - if profile_info.register_category_values: - description_parts.append(f"注册采购品类:{', '.join(str(item) for item in profile_info.register_category_values)}") - - if profile_info.auth_category_values: - description_parts.append(f"认证采购品类:{', '.join(str(item) for item in profile_info.auth_category_values)}") - - if profile_info.category_values: - description_parts.append(f"经营类目:{', '.join(str(item) for item in profile_info.category_values)}") - - # 品牌信息 - if profile_info.brand_category_values: - description_parts.append(f"品牌品类:{', '.join(str(item) for item in profile_info.brand_category_values)}") - - # 市场信息 - if profile_info.main_market_values: - description_parts.append(f"主攻市场:{', '.join(str(item) for item in profile_info.main_market_values)}") - - if profile_info.main_area_values: - 
description_parts.append(f"外贸主攻区域:{', '.join(str(item) for item in profile_info.main_area_values)}") - - # 商品结构统计 - if profile_info.customer_goods_structures: - structure_descriptions = [] - for structure in profile_info.customer_goods_structures[:USER_PROFILE_BEHAVIOR_CONFIG['max_customer_goods_structures']]: # 只取前N个 - parts = [] - if structure['price_between']: - parts.append(f"价格区间{structure['price_between']}") - if structure['goods_grade']: - parts.append(f"产品档次{structure['goods_grade']}") - if structure['package_type']: - parts.append(f"包装类型{structure['package_type']}") - if parts: - structure_descriptions.append('、'.join(parts)) - - if structure_descriptions: - description_parts.append(f"商品结构统计:{'; '.join(structure_descriptions)}") - - # 物流信息 - if profile_info.delivery_type_value: - description_parts.append(f"主要出货方式:{profile_info.delivery_type_value}") - - if profile_info.tax_clearance_type_value: - description_parts.append(f"清关方式:{profile_info.tax_clearance_type_value}") - - if profile_info.customs_import_scale: - description_parts.append(f"海关进口规模:{profile_info.customs_import_scale}") - - # 组合成完整描述 - if description_parts: - return "\n".join(description_parts) - else: - return "暂无用户画像信息(信息为空)" - - def extract_and_describe(self, user_profile: UserProfile) -> str: - """ - 提取用户画像信息并生成完整的自然语言描述 - - Args: - user_profile: UserProfile对象 - - Returns: - 导购语, 完整的用户画像自然语言描述 - """ - # 提取基础信息 - profile_info = self.extract_user_profile_info(user_profile) - - # 生成导购语 - guide_intro = self.generate_chat_search_intro(profile_info) - - if not user_profile: - return guide_intro, "暂无用户画像信息" - - natural_description = self.generate_natural_language_description(profile_info) - - # 提取历史行为中的通用属性分布统计 - common_attribute_distribution = self.extract_common_attribute_distribution(user_profile) - - # 提取历史行为中每个商品的具体属性统计 - item_specific_attributes = self.extract_item_specific_attributes(user_profile) - - # 生成自然语言描述 - common_attribute_description = self.generate_common_attribute_distribution_description(common_attribute_distribution) - item_specific_attribute_description = self.generate_item_specific_attribute_description(item_specific_attributes) - - # 组织完整的描述 - language = getattr(self, 'language', 'zh') - - complete_description = f"{get_display_text('customer_background', language)}:\n{natural_description}" - - # 添加通用属性分布描述 - if USER_BEHAVIOR_STAT_IN_PROMPT: - if common_attribute_description: - complete_description += f"\n\n{get_display_text('historical_purchase_general_attributes', language)}:\n{common_attribute_description}" - - # 添加具体属性偏好描述 - if item_specific_attribute_description: - complete_description += f"\n\n{get_display_text('historical_purchase_category_specific_attributes', language)}:\n{item_specific_attribute_description}" - - # 添加最近搜索词信息 - # 提取最近搜索词 - if USER_SEARCH_HISTORY_IN_PROMPT: - recent_search_keywords = self._extract_recent_search_keywords(user_profile) - if recent_search_keywords: - complete_description += f"\n\n{get_display_text('recent_search_keywords', language)}:{', '.join(recent_search_keywords)}" - - return guide_intro, complete_description - - def extract_common_attribute_distribution(self, user_profile: UserProfile) -> Dict[str, Any]: - """ - 提取历史行为中的通用属性分布统计 - - Args: - user_profile: UserProfile对象 - - Returns: - Dict[str, Any]: 通用属性分布统计信息 - """ - if not user_profile or not user_profile.behavior_map: - logger.warning("[extract_common_attribute_distribution] UserProfile or behavior_map is None") - return {} - - behavior_map = user_profile.behavior_map - common_features = {} - - # 
获取所有行为数据 - all_behaviors = [] - for behavior_type, behaviors in [ - ('click', behavior_map.click), - ('add_cart', behavior_map.add_cart), - ('collect', behavior_map.collect), - ('purchase', behavior_map.purchase) - ]: - logger.info(f"[UserProfileExtractor.extract_common_attribute_distribution] Extracted behavior_type {behavior_type} with {len(behaviors)} behaviors") - for behavior in behaviors: - all_behaviors.append((behavior, self.behavior_stats_config.behavior_weights[behavior_type])) - - - # 1. 处理直接取值字段 - for field_config in self.behavior_stats_config.direct_fields: - if not field_config.enable: - continue - counter = Counter() - total_weight_for_field = 0 # 该字段的总权重(包括空值) - - for behavior, weight in all_behaviors: - total_weight_for_field += weight # 所有行为都计入总数 - if hasattr(behavior, field_config.field_name): - value = getattr(behavior, field_config.field_name) - if value: # 确保值不为空 - counter[str(value)] += weight # 转换为字符串 - # 如果值为空,不加入counter,但已计入total_weight_for_field - - # 计算空值权重 - empty_weight = total_weight_for_field - sum(counter.values()) - if empty_weight > 0: - counter['__empty__'] = empty_weight - - # 保存统计结果 - common_features[f'{field_config.feature_prefix}_weighted_counts'] = dict(counter) - common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field - common_features[f'{field_config.feature_prefix}_top_items'] = [item for item, count in counter.most_common(10)] - - # 2. 处理重复字段 - for field_config in self.behavior_stats_config.repeated_fields: - if not field_config.enable: - continue - counter = Counter() - total_weight_for_field = 0 # 该字段的总权重(包括空值) - - for behavior, weight in all_behaviors: - total_weight_for_field += weight # 所有行为都计入总数 - if hasattr(behavior, field_config.field_name) and getattr(behavior, field_config.field_name): - values = getattr(behavior, field_config.field_name) - has_valid_value = False - for value in values: - if value: - counter[str(value)] += weight - has_valid_value = True - # 如果没有有效值,不加入counter,但已计入total_weight_for_field - # 如果字段不存在或为空,不加入counter,但已计入total_weight_for_field - - # 计算空值权重 - empty_weight = total_weight_for_field - sum(counter.values()) - if empty_weight > 0: - counter['__empty__'] = empty_weight - - common_features[f'{field_config.feature_prefix}_weighted_counts'] = dict(counter) - common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field - common_features[f'{field_config.feature_prefix}_top_items'] = [item for item, count in counter.most_common(10)] - - # 3. 处理数值字段分桶统计 - for field_config in self.behavior_stats_config.numeric_fields: - if not field_config.enable: - continue - bucket_counter = Counter() - total_weight_for_field = 0 # 该字段的总权重(包括空值) - - for behavior, weight in all_behaviors: - total_weight_for_field += weight # 所有行为都计入总数 - if hasattr(behavior, field_config.field_name): - value = getattr(behavior, field_config.field_name) - if value and value > 0: - bucket = int(value / field_config.bucket_size) - bucket_counter[str(bucket)] += weight # 转换为字符串 - # 如果值为空或<=0,不加入counter,但已计入total_weight_for_field - - # 计算空值权重 - empty_weight = total_weight_for_field - sum(bucket_counter.values()) - if empty_weight > 0: - bucket_counter['__empty__'] = empty_weight - - common_features[f'{field_config.feature_prefix}_bucket_weighted_counts'] = dict(bucket_counter) - common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field - common_features[f'{field_config.feature_prefix}_top_buckets'] = [bucket for bucket, count in bucket_counter.most_common(10)] - - # 4. 
处理时间差统计 - for field_config in self.behavior_stats_config.time_fields: - if not field_config.enable: - continue - time_bucket_counter = Counter() - total_weight_for_field = 0 # 该字段的总权重(包括空值) - - for behavior, weight in all_behaviors: - total_weight_for_field += weight # 所有行为都计入总数 - if hasattr(behavior, field_config.field_name) and hasattr(behavior, 'behaviorTime'): - time_value = getattr(behavior, field_config.field_name) - behavior_time = behavior.behaviorTime - - if time_value and behavior_time: - try: - # 解析时间字符串 - if isinstance(time_value, str): - time_obj = datetime.strptime(time_value, '%Y-%m-%d %H:%M:%S') - else: - time_obj = time_value - - if isinstance(behavior_time, str): - behavior_time_obj = datetime.strptime(behavior_time, '%Y-%m-%d %H:%M:%S') - else: - behavior_time_obj = behavior_time - - # 计算时间差(月数) - time_diff = behavior_time_obj - time_obj - months_diff = int(time_diff.days / 30) - - # 分桶:0-6个月,6-12个月,12-24个月,24个月以上 - if months_diff < 0: - bucket = 'future' - elif months_diff <= 6: - bucket = '0-6m' - elif months_diff <= 12: - bucket = '6-12m' - elif months_diff <= 24: - bucket = '12-24m' - else: - bucket = '24m+' - - time_bucket_counter[bucket] += weight - - except (ValueError, TypeError) as e: - logger.debug(f"Error parsing time for {field_config.field_name}: {e}") - continue - # 如果时间值为空或解析失败,不加入counter,但已计入total_weight_for_field - - # 计算空值权重 - empty_weight = total_weight_for_field - sum(time_bucket_counter.values()) - if empty_weight > 0: - time_bucket_counter['__empty__'] = empty_weight - - common_features[f'{field_config.feature_prefix}_time_bucket_weighted_counts'] = dict(time_bucket_counter) - common_features[f'{field_config.feature_prefix}_total_weight'] = total_weight_for_field - common_features[f'{field_config.feature_prefix}_top_time_buckets'] = [bucket for bucket, count in time_bucket_counter.most_common(5)] - - # 5. 
综合统计信息 - total_weighted_behaviors = sum(weight for _, weight in all_behaviors) - common_features['total_weighted_behaviors'] = total_weighted_behaviors - - # 各行为类型的统计 - behavior_type_counts = Counter() - for behavior_type, behaviors in [ - ('click', behavior_map.click), - ('add_cart', behavior_map.add_cart), - ('collect', behavior_map.collect), - ('purchase', behavior_map.purchase) - ]: - behavior_type_counts[behavior_type] = len(behaviors) - - common_features['behavior_type_counts'] = dict(behavior_type_counts) - - logger.info(f"Extracted behavior stats with {len(common_features)} feature groups") - return common_features - - def extract_item_specific_attributes(self, user_profile: UserProfile) -> Dict[str, Any]: - """ - 从历史行为中提取每个商品的具体属性统计 - - Args: - user_profile: UserProfile对象 - - Returns: - Dict[str, Any]: 商品具体属性统计信息 - """ - if not user_profile or not user_profile.behavior_map: - logger.warning("[extract_item_specific_attributes] UserProfile or behavior_map is None") - return {} - - behavior_map = user_profile.behavior_map - - # 获取所有行为数据 - all_behaviors = [] - for behavior_type, behaviors in [ - ('click', behavior_map.click), - ('add_cart', behavior_map.add_cart), - ('collect', behavior_map.collect), - ('purchase', behavior_map.purchase) - ]: - for behavior in behaviors: - all_behaviors.append((behavior, self.behavior_stats_config.behavior_weights[behavior_type])) - - # 统计每个属性名称和属性值对应的权重 - attr_statistics = {} # {attr_name: {option_name: weight}} - - for behavior, weight in all_behaviors: - # 合并 spuAttributeList 和 skuAttributeList - merged_attributes = [] - - # 以 skuAttributeList 为基础 - if hasattr(behavior, 'skuAttributeList') and behavior.skuAttributeList: - merged_attributes.extend(behavior.skuAttributeList) - - # 加入 spuAttributeList,如果 attributeId 已存在则跳过 - existing_attr_ids = set() - if hasattr(behavior, 'skuAttributeList') and behavior.skuAttributeList: - existing_attr_ids = {attr.attributeId for attr in behavior.skuAttributeList} - - if hasattr(behavior, 'spuAttributeList') and behavior.spuAttributeList: - for attr in behavior.spuAttributeList: - if attr.attributeId not in existing_attr_ids: - merged_attributes.append(attr) - existing_attr_ids.add(attr.attributeId) - - # 统计合并后的属性 - for attr in merged_attributes: - attr_id = attr.attributeId - option_id = attr.optionId - - # 获取属性名称 - attr_name = self.dict_loader.get_name('spu_attribute', str(attr_id)) - if not attr_name: - attr_name = self.dict_loader.get_name('sku_attribute', str(attr_id)) - if not attr_name: - attr_name = f"属性{attr_id}" - - # 获取属性值名称 - option_name = self.dict_loader.get_name('spu_attribute_option', str(option_id)) - if not option_name: - option_name = self.dict_loader.get_name('sku_attribute_option', str(option_id)) - if not option_name: - option_name = f"选项{option_id}" - - # 跳过无效的属性值 - if option_name == '无' or not option_name: - continue - - # 统计 - if attr_name not in attr_statistics: - attr_statistics[attr_name] = {} - - if option_name not in attr_statistics[attr_name]: - attr_statistics[attr_name][option_name] = 0 - - attr_statistics[attr_name][option_name] += weight - - if not attr_statistics: - return {} - - # 生成属性统计特征 - attribute_features = {} - - # 计算每个属性的总权重并排序 - attr_with_total = [ - (attr_name, options_dict, sum(options_dict.values())) - for attr_name, options_dict in attr_statistics.items() - ] - - # 按总权重排序,取前10个属性 - sorted_attrs = sorted(attr_with_total, key=lambda x: x[2], reverse=True) - - for attr_name, options_dict, total_weight in sorted_attrs: - # 按权重排序选项,取前5个 - sorted_options = 
sorted(options_dict.items(), key=lambda x: x[1], reverse=True) - - # 生成特征名称(使用属性名称的拼音或ID作为前缀) - attr_feature_prefix = f"attr_{attr_name.replace(' ', '_').replace(':', '_')}" - - attribute_features[f'{attr_feature_prefix}_weighted_counts'] = dict(options_dict) - attribute_features[f'{attr_feature_prefix}_total_weight'] = total_weight - attribute_features[f'{attr_feature_prefix}_top_items'] = [item for item, count in sorted_options] - - # 添加总体属性统计 - total_attribute_weight = sum(attr[2] for attr in sorted_attrs) - attribute_features['attribute_total_weight'] = total_attribute_weight - attribute_features['attribute_attr_count'] = len(sorted_attrs) - - logger.info(f"Extracted attribute statistics with {len(attribute_features)} attribute feature groups") - return attribute_features - - def generate_common_attribute_distribution_description(self, common_attribute_distribution: Dict[str, Any]) -> str: - """ - 生成通用属性分布统计的自然语言描述 - - Args: - common_attribute_distribution: 通用属性分布统计信息 - - Returns: - str: 自然语言描述 - """ - if not common_attribute_distribution: - return "暂无通用属性分布统计信息" - - description_parts = [] - - # 0. 行为总述(放在最前面) - if 'behavior_type_counts' in common_attribute_distribution: - behavior_counts = common_attribute_distribution['behavior_type_counts'] - total_behaviors = sum(behavior_counts.values()) - - if total_behaviors > 0: - behavior_summary_parts = [] - - # 检查是否达到截断限制 - if total_behaviors >= self.behavior_stats_config.behavior_summary_truncate_limit: - behavior_summary_parts.append(f"该用户有超过{self.behavior_stats_config.behavior_summary_truncate_limit}次行为") - else: - behavior_summary_parts.append(f"该用户有{total_behaviors}次行为") - - # 添加具体行为类型统计 - behavior_details = [] - if behavior_counts.get('click', 0) > 0: - behavior_details.append(f"{behavior_counts['click']}次点击") - if behavior_counts.get('add_cart', 0) > 0: - behavior_details.append(f"{behavior_counts['add_cart']}次加购") - if behavior_counts.get('collect', 0) > 0: - behavior_details.append(f"{behavior_counts['collect']}次收藏") - if behavior_counts.get('purchase', 0) > 0: - behavior_details.append(f"{behavior_counts['purchase']}次购买") - - if behavior_details: - behavior_summary_parts.append(f"包括{', '.join(behavior_details)}") - - description_parts.append(''.join(behavior_summary_parts)) - - # 1. 处理直接取值字段描述 - for field_config in self.behavior_stats_config.direct_fields: - if not field_config.enable: - continue - weighted_counts_key = f'{field_config.feature_prefix}_weighted_counts' - total_weight_key = f'{field_config.feature_prefix}_total_weight' - - if weighted_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: - weighted_counts = common_attribute_distribution[weighted_counts_key] - total_weight = common_attribute_distribution[total_weight_key] - - if total_weight > 0: - # 生成带占比的描述 - items_with_percentage = [] - for item, count in sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: - percentage = (count / total_weight) * 100 - # 词典映射 - if item == '__empty__': - display_name = '空值' - elif field_config.dict_name: - display_name = self.dict_loader.get_name(field_config.dict_name, str(item)) or str(item) - else: - display_name = str(item) - - items_with_percentage.append(f"{display_name}({percentage:.1f}%)") - - if items_with_percentage: - description = field_config.description_template.format( - display_name=field_config.display_name, - values=', '.join(items_with_percentage) - ) - description_parts.append(description) - - # 2. 
处理重复字段描述 - for field_config in self.behavior_stats_config.repeated_fields: - if not field_config.enable: - continue - weighted_counts_key = f'{field_config.feature_prefix}_weighted_counts' - total_weight_key = f'{field_config.feature_prefix}_total_weight' - - if weighted_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: - weighted_counts = common_attribute_distribution[weighted_counts_key] - total_weight = common_attribute_distribution[total_weight_key] - - if total_weight > 0: - # 生成带占比的描述 - items_with_percentage = [] - for item, count in sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: - percentage = (count / total_weight) * 100 - # 词典映射 - if item == '__empty__': - display_name = '空值' - elif field_config.dict_name: - display_name = self.dict_loader.get_name(field_config.dict_name, str(item)) or str(item) - else: - display_name = str(item) - - items_with_percentage.append(f"{display_name}({percentage:.1f}%)") - - if items_with_percentage: - description = field_config.description_template.format( - display_name=field_config.display_name, - values=', '.join(items_with_percentage) - ) - description_parts.append(description) - - # 3. 处理数值字段描述 - for field_config in self.behavior_stats_config.numeric_fields: - if not field_config.enable: - continue - bucket_counts_key = f'{field_config.feature_prefix}_bucket_weighted_counts' - total_weight_key = f'{field_config.feature_prefix}_total_weight' - - if bucket_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: - bucket_counts = common_attribute_distribution[bucket_counts_key] - total_weight = common_attribute_distribution[total_weight_key] - - if total_weight > 0: - # 生成带占比的描述 - ranges_with_percentage = [] - for bucket, count in sorted(bucket_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: - percentage = (count / total_weight) * 100 - - if bucket == '__empty__': - range_desc = '空值' - else: - range_desc = f"{int(bucket)*field_config.bucket_size}-{(int(bucket)+1)*field_config.bucket_size}" - - ranges_with_percentage.append(f"{range_desc}({percentage:.1f}%)") - - if ranges_with_percentage: - description = field_config.description_template.format( - display_name=field_config.display_name, - values=', '.join(ranges_with_percentage) - ) - description_parts.append(description) - - # 4. 
处理时间字段描述 - for field_config in self.behavior_stats_config.time_fields: - if not field_config.enable: - continue - time_bucket_counts_key = f'{field_config.feature_prefix}_time_bucket_weighted_counts' - total_weight_key = f'{field_config.feature_prefix}_total_weight' - - if time_bucket_counts_key in common_attribute_distribution and total_weight_key in common_attribute_distribution: - time_bucket_counts = common_attribute_distribution[time_bucket_counts_key] - total_weight = common_attribute_distribution[total_weight_key] - - if total_weight > 0: - # 生成带占比的描述 - time_descriptions_with_percentage = [] - for bucket, count in sorted(time_bucket_counts.items(), key=lambda x: x[1], reverse=True)[:field_config.max_items]: - percentage = (count / total_weight) * 100 - bucket_str = str(bucket) - - if bucket_str == '__empty__': - time_desc = '空值' - elif bucket_str == '0-6m': - time_desc = '半年内' - elif bucket_str == '6-12m': - time_desc = '半年到一年' - elif bucket_str == '12-24m': - time_desc = '1-2年' - elif bucket_str == '24m+': - time_desc = '2年+' - elif bucket_str == 'future': - time_desc = '错误时间' - else: - time_desc = bucket_str - - time_descriptions_with_percentage.append(f"{time_desc}({percentage:.1f}%)") - - if time_descriptions_with_percentage: - description = field_config.description_template.format( - display_name=field_config.display_name, - values=', '.join(time_descriptions_with_percentage) - ) - description_parts.append(description) - - # 组合成完整描述 - if description_parts: - return "\n".join(description_parts) - else: - return "" - - def generate_item_specific_attribute_description(self, item_specific_attributes: Dict[str, Any]) -> str: - """ - 生成商品具体属性统计的自然语言描述 - - Args: - item_specific_attributes: 商品具体属性统计信息 - - Returns: - str: 商品具体属性统计的自然语言描述 - """ - if not item_specific_attributes: - return "暂无商品具体属性统计信息。" - - descriptions = [] - - # 获取所有属性相关的特征 - attr_features = {} - for key, value in item_specific_attributes.items(): - if key.startswith('attr_') and key.endswith('_weighted_counts'): - attr_name = key.replace('_weighted_counts', '').replace('attr_', '') - attr_features[attr_name] = value - - if not attr_features: - return "暂无有效属性统计信息。" - - # 按总权重排序属性 - sorted_attrs = [] - for attr_name, weighted_counts in attr_features.items(): - total_weight = sum(weighted_counts.values()) - sorted_attrs.append((attr_name, weighted_counts, total_weight)) - - sorted_attrs.sort(key=lambda x: x[2], reverse=True) - - # 生成描述 - max_attrs = USER_PROFILE_BEHAVIOR_CONFIG['max_attributes_display'] - max_options = USER_PROFILE_BEHAVIOR_CONFIG['max_options_per_attribute'] - for attr_name, weighted_counts, total_weight in sorted_attrs[:max_attrs]: # 取前N个属性 - # 按权重排序选项,取前N个 - sorted_options = sorted(weighted_counts.items(), key=lambda x: x[1], reverse=True)[:max_options] - - option_texts = [] - for option_name, weight in sorted_options: - if option_name != '__empty__': - # 计算百分比 - percentage = (weight / total_weight) * 100 - option_texts.append(f"{option_name}({percentage:.1f}%)") - - if option_texts: - desc = f"• {attr_name}: {', '.join(option_texts)}" - descriptions.append(desc) - - if descriptions: - return "\n".join(descriptions) - return "暂无有效属性统计信息。" - - def _extract_recent_search_keywords(self, user_profile: UserProfile) -> List[str]: - """ - 提取最近10个搜索词(过滤掉isSearchFactory=true的) - - Args: - user_profile: UserProfile对象 - - Returns: - List[str]: 最近10个搜索词列表 - """ - if not user_profile or not user_profile.behavior_map: - return [] - - search_keywords = user_profile.behavior_map.search_keyword - if not 
search_keywords: - return [] - - # 过滤、去重并收集最近10个搜索词 - seen_keywords = set() - recent_keywords = [] - for search_behavior in search_keywords: - if not search_behavior.isSearchFactory and search_behavior.keyword: - keyword = search_behavior.keyword.strip() - - # 过滤掉纯数字、下划线、减号、空白字符构成的关键词 - if self._is_valid_search_keyword(keyword): - if keyword not in seen_keywords: - seen_keywords.add(keyword) - recent_keywords.append(keyword) - if len(recent_keywords) >= SESSION_CONFIG['max_recent_search_keywords']: # 达到最大数量就停止 - break - - logger.info(f"[UserProfileExtractor._extract_recent_search_keywords] Extracted {len(recent_keywords)} recent search keywords") - return recent_keywords - - def _is_valid_search_keyword(self, keyword: str) -> bool: - """ - 判断搜索关键词是否有效 - - Args: - keyword: 搜索关键词 - - Returns: - bool: 是否有效 - """ - if not keyword or keyword.strip() == '': - return False - - # 过滤掉纯数字、下划线、减号、空白字符构成的关键词 - # 使用正则表达式匹配:只包含数字、下划线、减号、空白字符的字符串 - if re.match(r'^[\d\s_-]+$', keyword): - return False - - # 只有一个单词(split后只有一个)、并且这个单词里面既包含数字又包含字母 (转小写后 既有小写字母、又有数字) - if len(keyword.split()) == 1: - if re.match(r'^[a-z0-9]+$', keyword.lower()): - return False - # 包含数字和- - if re.match(r'^[0-9-]+$', keyword): - return False - - return True \ No newline at end of file -- libgit2 0.21.2
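
The weighted distribution statistics in extract_common_attribute_distribution can be illustrated with a minimal, self-contained sketch. The Behavior dataclass and the weight values below are simplified stand-ins for the protobuf behavior objects and for USER_PROFILE_BEHAVIOR_CONFIG['behavior_weights'], not the real definitions; the point is the bookkeeping: every behavior contributes its weight to the field's total, only non-empty values are tallied, and the remainder is reported under an explicit '__empty__' bucket so the percentages in the generated description sum to 100%.

from collections import Counter
from dataclasses import dataclass

@dataclass
class Behavior:                      # hypothetical stand-in for one click/add_cart/collect/purchase record
    categoryId: str = ""

behavior_weights = {'click': 1.0, 'add_cart': 2.0, 'collect': 2.0, 'purchase': 4.0}   # assumed values

behaviors = [
    ('click', Behavior(categoryId='toys')),
    ('click', Behavior(categoryId='')),          # empty value: counted in the total, not in the tally
    ('add_cart', Behavior(categoryId='home')),
    ('purchase', Behavior(categoryId='toys')),
]

counter = Counter()
total_weight = 0.0
for behavior_type, b in behaviors:
    w = behavior_weights[behavior_type]
    total_weight += w                            # denominator includes every behavior
    if b.categoryId:                             # only non-empty values are tallied
        counter[str(b.categoryId)] += w

empty_weight = total_weight - sum(counter.values())
if empty_weight > 0:
    counter['__empty__'] = empty_weight          # explicit bucket for missing values

for item, w in counter.most_common():
    print(f"{item}: {w / total_weight * 100:.1f}%")   # toys: 62.5%, home: 25.0%, __empty__: 12.5%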
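
The time_fields branch buckets the gap between a per-item timestamp and behaviorTime into coarse ranges (0-6m, 6-12m, 12-24m, 24m+, plus a 'future' bucket for inverted timestamps). A standalone version of that bucketing, assuming the same 'YYYY-MM-DD HH:MM:SS' string format used in the patch:

from datetime import datetime

def month_bucket(time_value: str, behavior_time: str) -> str:
    # Rough month difference, mirroring int(time_diff.days / 30) in the patch.
    t0 = datetime.strptime(time_value, '%Y-%m-%d %H:%M:%S')
    t1 = datetime.strptime(behavior_time, '%Y-%m-%d %H:%M:%S')
    months = int((t1 - t0).days / 30)
    if months < 0:
        return 'future'      # item timestamp later than the behavior: reported as an error bucket
    if months <= 6:
        return '0-6m'
    if months <= 12:
        return '6-12m'
    if months <= 24:
        return '12-24m'
    return '24m+'

print(month_bucket('2023-01-01 00:00:00', '2024-03-01 12:00:00'))   # -> 12-24m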
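
extract_item_specific_attributes merges skuAttributeList and spuAttributeList per behavior, with SKU entries taking priority when both lists carry the same attributeId. A reduced sketch of that merge, with Attr as a hypothetical stand-in for the protobuf attribute entries:

from dataclasses import dataclass
from typing import List

@dataclass
class Attr:
    attributeId: int
    optionId: int

def merge_attributes(sku_attrs: List[Attr], spu_attrs: List[Attr]) -> List[Attr]:
    merged = list(sku_attrs)                     # SKU attributes are taken as-is
    seen = {a.attributeId for a in sku_attrs}
    for a in spu_attrs:
        if a.attributeId not in seen:            # an SPU attribute only fills gaps
            merged.append(a)
            seen.add(a.attributeId)
    return merged

sku = [Attr(101, 1), Attr(102, 5)]
spu = [Attr(102, 9), Attr(103, 7)]
print(merge_attributes(sku, spu))                # keeps 101 and 102 from SKU, adds 103 from SPU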
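
_is_valid_search_keyword drops search terms that look like item codes rather than product words. Restated as a standalone function with the same regular expressions; note that, as written, the single-token check '^[a-z0-9]+$' also rejects single all-letter words, not only letter-digit mixes:

import re

def is_valid_search_keyword(keyword: str) -> bool:
    if not keyword or not keyword.strip():
        return False
    if re.match(r'^[\d\s_-]+$', keyword):               # only digits / underscores / dashes / whitespace
        return False
    if len(keyword.split()) == 1:                       # single-token keywords
        if re.match(r'^[a-z0-9]+$', keyword.lower()):   # also matches pure-letter words like 'mug'
            return False
        if re.match(r'^[0-9-]+$', keyword):
            return False
    return True

candidates = ['A12345', '2023-10', 'led strip light', 'mug']
print([k for k in candidates if is_valid_search_keyword(k)])   # ['led strip light']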