# hmac_validator.py (1.69 KB)
import hmac
import hashlib
from flask import request, jsonify
from functools import wraps
from config import Config

def hmac_validator_required(f):
    """Decorator that validates the HMAC signature carried in the query string.

    Used to check that a request coming from Shoplazza is genuine. The
    ``hmac`` query parameter is removed, the remaining parameters are sorted
    by key and joined as ``key=value`` pairs with ``&``, and an HMAC-SHA256
    hex digest of that message (keyed with ``Config.CLIENT_SECRET``) is
    compared against the supplied value.

    The wrapped view is only called when validation succeeds. Otherwise a
    ``(json_response, status)`` tuple is returned:

    * 400 - the ``hmac`` parameter is missing or empty.
    * 400 - the ``shop`` parameter is missing or does not end with
      ``.myshoplaza.com``.
    * 403 - the supplied HMAC does not match the calculated one.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Flatten the query string; the signature itself is not part of the
        # signed message, so take it out before building the message.
        query_params = request.args.to_dict()
        hmac_param = query_params.pop('hmac', None)

        if not hmac_param:
            return jsonify({'error': 'Missing HMAC parameter'}), 400

        # Validate the format of the shop parameter.
        shop = query_params.get('shop')
        if not shop or not shop.endswith('.myshoplaza.com'):
            return jsonify({'error': 'Invalid shop parameter'}), 400

        # Build the message to sign: key-sorted "key=value" pairs joined by "&".
        message = '&'.join(
            f"{key}={value}" for key, value in sorted(query_params.items())
        )

        # Compute the expected HMAC.
        calculated_hmac = hmac.new(
            Config.CLIENT_SECRET.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        # Constant-time comparison. compare_digest() only accepts ASCII-only
        # str arguments and raises TypeError otherwise; hmac_param is
        # attacker-controlled, so compare bytes to turn a non-ASCII value
        # into a clean 403 instead of an unhandled exception.
        if not hmac.compare_digest(
            calculated_hmac.encode('utf-8'),
            hmac_param.encode('utf-8'),
        ):
            return jsonify({'error': 'HMAC validation failed'}), 403

        return f(*args, **kwargs)

    return decorated_function

def verify_webhook_hmac(data, hmac_header):
    """Verify the HMAC signature of a webhook payload.

    The expected signature is the Base64-encoded HMAC-SHA256 digest of
    ``data`` keyed with ``Config.CLIENT_SECRET``.

    Args:
        data: Raw webhook body as ``bytes``. A ``str`` is also accepted and
            is encoded as UTF-8 before signing.
        hmac_header: Signature value supplied with the webhook (``str`` or
            ``bytes``). May be ``None`` or empty when the header is absent.

    Returns:
        ``True`` if the supplied signature matches the calculated one,
        ``False`` otherwise (including when no signature was supplied).
    """
    import base64

    # A missing/empty signature can never be valid; bail out before
    # compare_digest(), which raises TypeError on None.
    if not hmac_header:
        return False

    if isinstance(data, str):
        data = data.encode('utf-8')
    # Compare as bytes: compare_digest() rejects non-ASCII str arguments
    # with TypeError, and the header value is untrusted input.
    if isinstance(hmac_header, str):
        hmac_header = hmac_header.encode('utf-8')

    calculated_hmac = base64.b64encode(
        hmac.new(
            Config.CLIENT_SECRET.encode('utf-8'),
            data,
            hashlib.sha256
        ).digest()
    )

    return hmac.compare_digest(calculated_hmac, hmac_header)