hmac_validator.py
1.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import hmac
import hashlib
from flask import request, jsonify
from functools import wraps
from config import Config
def hmac_validator_required(f):
"""
HMAC验证装饰器
验证来自Shoplazza的请求是否有效
"""
@wraps(f)
def decorated_function(*args, **kwargs):
# 获取查询参数
query_params = request.args.to_dict()
hmac_param = query_params.pop('hmac', None)
if not hmac_param:
return jsonify({'error': 'Missing HMAC parameter'}), 400
# 验证shop参数格式
shop = query_params.get('shop')
if not shop or not shop.endswith('.myshoplaza.com'):
return jsonify({'error': 'Invalid shop parameter'}), 400
# 构建验证消息
sorted_keys = sorted(query_params.keys())
message = '&'.join([f"{key}={query_params[key]}" for key in sorted_keys])
# 计算HMAC
calculated_hmac = hmac.new(
Config.CLIENT_SECRET.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
# 安全比较HMAC
if not hmac.compare_digest(calculated_hmac, hmac_param):
return jsonify({'error': 'HMAC validation failed'}), 403
return f(*args, **kwargs)
return decorated_function
def verify_webhook_hmac(data, hmac_header):
"""
验证Webhook的HMAC签名
"""
import base64
calculated_hmac = base64.b64encode(
hmac.new(
Config.CLIENT_SECRET.encode('utf-8'),
data,
hashlib.sha256
).digest()
).decode('utf-8')
return hmac.compare_digest(calculated_hmac, hmac_header)