import base64
import hashlib
import hmac
from functools import wraps

from flask import jsonify, request

from config import Config


def _digests_match(expected, provided):
    """Compare a locally computed digest with a caller-supplied one.

    ``hmac.compare_digest`` only accepts ``str`` arguments when they are pure
    ASCII and raises ``TypeError`` otherwise. Because ``provided`` comes from
    the request, it is converted to bytes first so that malformed input is
    reported as a mismatch instead of crashing the request.

    Args:
        expected: Digest computed locally, as an ASCII ``str``.
        provided: Digest supplied by the caller (``str``, bytes-like or
            ``None``).

    Returns:
        ``True`` when both digests are equal, ``False`` otherwise (including
        when ``provided`` is missing or of an unsupported type).
    """
    if isinstance(provided, str):
        # errors='replace' keeps un-encodable input from raising here.
        provided = provided.encode('utf-8', errors='replace')
    elif not isinstance(provided, (bytes, bytearray)):
        return False
    return hmac.compare_digest(expected.encode('utf-8'), provided)


def hmac_validator_required(f):
    """HMAC validation decorator.

    Checks that the request's query string carries a valid ``hmac`` signature
    (i.e. that the request really comes from Shoplazza) before calling the
    wrapped view.

    The wrapped view is short-circuited with a JSON error response when:
        * the ``hmac`` parameter is missing or empty -> 400
        * the ``shop`` parameter is missing or does not end with
          ``.myshoplaza.com`` -> 400
        * the signature does not match -> 403
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Collect the query parameters; the signature itself is removed
        # because it is not part of the signed message.
        # NOTE(review): to_dict() keeps a single value per key, so repeated
        # parameters are not all covered by the signature -- confirm that
        # this matches the signing side.
        query_params = request.args.to_dict()
        hmac_param = query_params.pop('hmac', None)

        if not hmac_param:
            return jsonify({'error': 'Missing HMAC parameter'}), 400

        # Validate the format of the shop parameter.
        shop = query_params.get('shop')
        if not shop or not shop.endswith('.myshoplaza.com'):
            return jsonify({'error': 'Invalid shop parameter'}), 400

        # Build the message to sign: "key=value" pairs joined with "&",
        # ordered by key.
        message = '&'.join(
            f"{key}={query_params[key]}" for key in sorted(query_params)
        )

        # Compute the expected HMAC-SHA256 as a hex string.
        calculated_hmac = hmac.new(
            Config.CLIENT_SECRET.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        # Constant-time comparison; done on bytes so a non-ASCII ``hmac``
        # value yields a 403 instead of an unhandled TypeError.
        if not _digests_match(calculated_hmac, hmac_param):
            return jsonify({'error': 'HMAC validation failed'}), 403

        return f(*args, **kwargs)

    return decorated_function


def verify_webhook_hmac(data, hmac_header):
    """Verify the HMAC signature of a webhook payload.

    The expected signature is the base64-encoded HMAC-SHA256 of ``data``
    keyed with ``Config.CLIENT_SECRET``.

    Args:
        data: Webhook payload. Bytes are hashed as-is; a ``str`` is hashed
            as its UTF-8 encoding (pass the raw request bytes when possible).
        hmac_header: Signature supplied with the webhook. May be ``None``
            when the caller found no signature.

    Returns:
        ``True`` if the signature matches, ``False`` otherwise (including
        when ``hmac_header`` is missing or empty).
    """
    if not hmac_header:
        # A missing signature can never be valid; avoid a TypeError below.
        return False

    if isinstance(data, str):
        data = data.encode('utf-8')

    calculated_hmac = base64.b64encode(
        hmac.new(
            Config.CLIENT_SECRET.encode('utf-8'),
            data,
            hashlib.sha256
        ).digest()
    ).decode('utf-8')

    return _digests_match(calculated_hmac, hmac_header)