Blame view

old/middleware/hmac_validator.py 1.69 KB
cccb7cfc   tangwang   init
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
  import hmac
  import hashlib
  from flask import request, jsonify
  from functools import wraps
  from config import Config
  
  def hmac_validator_required(f):
      """
      HMAC验证装饰器
      验证来自Shoplazza的请求是否有效
      """
      @wraps(f)
      def decorated_function(*args, **kwargs):
          # 获取查询参数
          query_params = request.args.to_dict()
          hmac_param = query_params.pop('hmac', None)
          
          if not hmac_param:
              return jsonify({'error': 'Missing HMAC parameter'}), 400
          
          # 验证shop参数格式
          shop = query_params.get('shop')
          if not shop or not shop.endswith('.myshoplaza.com'):
              return jsonify({'error': 'Invalid shop parameter'}), 400
          
          # 构建验证消息
          sorted_keys = sorted(query_params.keys())
          message = '&'.join([f"{key}={query_params[key]}" for key in sorted_keys])
          
          # 计算HMAC
          calculated_hmac = hmac.new(
              Config.CLIENT_SECRET.encode('utf-8'),
              message.encode('utf-8'),
              hashlib.sha256
          ).hexdigest()
          
          # 安全比较HMAC
          if not hmac.compare_digest(calculated_hmac, hmac_param):
              return jsonify({'error': 'HMAC validation failed'}), 403
          
          return f(*args, **kwargs)
      
      return decorated_function
  
  def verify_webhook_hmac(data, hmac_header):
      """
      验证WebhookHMAC签名
      """
      import base64
      
      calculated_hmac = base64.b64encode(
          hmac.new(
              Config.CLIENT_SECRET.encode('utf-8'),
              data,
              hashlib.sha256
          ).digest()
      ).decode('utf-8')
      
      return hmac.compare_digest(calculated_hmac, hmac_header)