example_query_redis.py 5.25 KB
"""
示例:从Redis查询推荐结果
演示如何使用生成的索引进行在线推荐
"""
import redis
import argparse
from offline_tasks.config.offline_config import REDIS_CONFIG


def query_i2i_similar_items(redis_client, item_id, algorithm='swing', top_n=10):
    """
    查询物品的相似物品
    
    Args:
        redis_client: Redis客户端
        item_id: 物品ID
        algorithm: 算法类型 (swing/session_w2v/deepwalk)
        top_n: 返回前N个结果
    
    Returns:
        List of (item_id, score)
    """
    key = f"i2i:{algorithm}:{item_id}"
    value = redis_client.get(key)
    
    if not value:
        return []
    
    # 解析结果
    results = []
    items = value.split(',')
    for item in items[:top_n]:
        parts = item.split(':')
        if len(parts) == 2:
            results.append((parts[0], float(parts[1])))
    
    return results


def query_interest_items(redis_client, dimension_key, list_type='hot', top_n=20):
    """
    查询兴趣点聚合的推荐物品
    
    Args:
        redis_client: Redis客户端
        dimension_key: 维度key (如 platform:PC, country:US)
        list_type: 列表类型 (hot/cart/new/global)
        top_n: 返回前N个结果
    
    Returns:
        List of (item_id, score)
    """
    key = f"interest:{list_type}:{dimension_key}"
    value = redis_client.get(key)
    
    if not value:
        return []
    
    # 解析结果
    results = []
    items = value.split(',')
    for item in items[:top_n]:
        parts = item.split(':')
        if len(parts) == 2:
            results.append((parts[0], float(parts[1])))
    
    return results


def main():
    parser = argparse.ArgumentParser(description='Query recommendation results from Redis')
    parser.add_argument('--redis-host', type=str, default=REDIS_CONFIG.get('host', 'localhost'),
                       help='Redis host')
    parser.add_argument('--redis-port', type=int, default=REDIS_CONFIG.get('port', 6379),
                       help='Redis port')
    parser.add_argument('--redis-db', type=int, default=REDIS_CONFIG.get('db', 0),
                       help='Redis database')
    
    args = parser.parse_args()
    
    # 创建Redis连接
    print("Connecting to Redis...")
    redis_client = redis.Redis(
        host=args.redis_host,
        port=args.redis_port,
        db=args.redis_db,
        decode_responses=True
    )
    
    try:
        redis_client.ping()
        print("✓ Redis connected\n")
    except Exception as e:
        print(f"✗ Failed to connect to Redis: {e}")
        return 1
    
    # 示例1: 查询i2i相似物品
    print("="*80)
    print("示例1: 查询物品的相似物品(i2i)")
    print("="*80)
    
    test_item_id = "123456"  # 替换为实际的物品ID
    
    for algorithm in ['swing', 'session_w2v', 'deepwalk']:
        print(f"\n算法: {algorithm}")
        results = query_i2i_similar_items(redis_client, test_item_id, algorithm, top_n=5)
        
        if results:
            print(f"物品 {test_item_id} 的相似物品:")
            for idx, (item_id, score) in enumerate(results, 1):
                print(f"  {idx}. 物品ID: {item_id}, 相似度: {score:.4f}")
        else:
            print(f"  未找到物品 {test_item_id} 的相似物品")
    
    # 示例2: 查询兴趣点推荐
    print("\n" + "="*80)
    print("示例2: 查询兴趣点聚合推荐")
    print("="*80)
    
    # 测试不同维度
    test_cases = [
        ('platform', 'PC', 'hot'),
        ('country', 'US', 'hot'),
        ('customer_type', 'retailer', 'cart'),
        ('category_level2', '100', 'new'),
    ]
    
    for dimension, value, list_type in test_cases:
        dimension_key = f"{dimension}:{value}"
        print(f"\n维度: {dimension_key}, 类型: {list_type}")
        results = query_interest_items(redis_client, dimension_key, list_type, top_n=5)
        
        if results:
            print(f"推荐物品:")
            for idx, (item_id, score) in enumerate(results, 1):
                print(f"  {idx}. 物品ID: {item_id}, 分数: {score:.4f}")
        else:
            print(f"  未找到推荐结果")
    
    # 示例3: 组合查询(实际推荐场景)
    print("\n" + "="*80)
    print("示例3: 组合推荐场景")
    print("="*80)
    
    print("\n场景: 用户在PC端,来自美国,是零售商,浏览了物品123456")
    print("\n1. 基于物品的相似推荐(i2i):")
    results = query_i2i_similar_items(redis_client, test_item_id, 'swing', top_n=3)
    for idx, (item_id, score) in enumerate(results, 1):
        print(f"  {idx}. 物品ID: {item_id}, 相似度: {score:.4f}")
    
    print("\n2. 基于平台+国家的热门推荐:")
    results = query_interest_items(redis_client, 'platform_country:PC_US', 'hot', top_n=3)
    for idx, (item_id, score) in enumerate(results, 1):
        print(f"  {idx}. 物品ID: {item_id}, 分数: {score:.4f}")
    
    print("\n3. 基于客户类型的加购推荐:")
    results = query_interest_items(redis_client, 'customer_type:retailer', 'cart', top_n=3)
    for idx, (item_id, score) in enumerate(results, 1):
        print(f"  {idx}. 物品ID: {item_id}, 分数: {score:.4f}")
    
    print("\n" + "="*80)
    print("✓ 查询示例完成")
    print("="*80)
    
    return 0


if __name__ == '__main__':
    main()