Blame view

python-app-demo/main.py 3.78 KB
cccb7cfc   tangwang   init
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
  import functools
  import hashlib
  import hmac
  import os
  import re
  import secrets
  from urllib.parse import urlencode

  import requests
  from flask import Flask, request, redirect, jsonify
  from flask_cors import CORS
  
  app = Flask(__name__)
  # NOTE(review): CORS is enabled with no options; confirm the resulting
  # cross-origin policy is as permissive as intended for this app.
  CORS(app)
  
  # Configuration. Each value can be overridden through the environment so
  # that real credentials do not have to live in source control. The literal
  # fallbacks are the values that were previously hard-coded here; treat them
  # as demo credentials and rotate/remove them before any real deployment.
  CLIENT_ID = os.environ.get(
      "OAUTH_CLIENT_ID", "J2ntcUvAKRq_xVXcmysAc6iUJ-MYJF4JtoMKJGi_I9A"
  )
  CLIENT_SECRET = os.environ.get(
      "OAUTH_CLIENT_SECRET", "WvmwpHVf1u1hfYEhsc7LPl0YxrbiFy3A2JVBhy8PWAU"
  )
  REDIRECT_URI = os.environ.get(
      "OAUTH_REDIRECT_URI",
      "https://rooster-wired-monster.ngrok-free.app/api/auth/callback",
  )
  
  # Utility: timing-safe HMAC comparison
  def secure_compare(a, b):
      """
      Compare two digests in constant time to defend against timing attacks.

      Args:
          a: First value, ``str`` or bytes-like.
          b: Second value, ``str`` or bytes-like.

      Returns:
          True when both values are equal, False otherwise.
      """
      # hmac.compare_digest raises TypeError for str arguments that contain
      # non-ASCII characters. Encoding to bytes first means attacker-supplied
      # text produces a plain mismatch instead of an unhandled exception.
      if isinstance(a, str):
          a = a.encode("utf-8", "surrogatepass")
      if isinstance(b, str):
          b = b.encode("utf-8", "surrogatepass")
      return hmac.compare_digest(a, b)
  
  # Middleware: HMAC verification
  def hmac_validator(func):
      """
      Decorator that rejects requests whose ``hmac`` query parameter is invalid.

      The expected signature is the hex HMAC-SHA256, keyed with CLIENT_SECRET,
      of every other query parameter rendered as ``key=value``, sorted by key
      and joined with ``&``.

      Args:
          func: The view function to protect.

      Returns:
          A wrapper that responds with a 400 JSON error when the signature is
          missing or does not match, and otherwise returns ``func``'s result.
      """
      @functools.wraps(func)
      def wrapper(*args, **kwargs):
          # to_dict(flat=True) keeps exactly one (the first) value per key, so
          # the signed message never contains list reprs regardless of how the
          # installed Werkzeug version converts a MultiDict to a plain dict.
          params = request.args.to_dict(flat=True)
          
          # The signature itself is not part of the signed message.
          hmac_value = params.pop('hmac', None)
          if not hmac_value:
              return jsonify({"error": "HMAC validation failed"}), 400
          
          # Sort parameters lexicographically and build the message string.
          message = "&".join(f"{key}={params[key]}" for key in sorted(params))
          
          # Compute the expected signature.
          generated_hash = hmac.new(
              CLIENT_SECRET.encode(),
              message.encode(),
              hashlib.sha256
          ).hexdigest()
          
          # Compare as bytes: a str comparison raises TypeError when the
          # caller-supplied value contains non-ASCII characters.
          if not secure_compare(generated_hash.encode(), hmac_value.encode()):
              return jsonify({"error": "HMAC validation failed"}), 400
          
          return func(*args, **kwargs)
      
      return wrapper
  
  # OAuth authorization route
  @app.route('/api/auth', methods=['GET'])
  def auth():
      """
      OAuth entry point: redirect the browser to the shop's authorization page.

      Reads the ``shop`` query parameter and uses it as the host of the
      authorization URL.

      Returns:
          A redirect to ``https://{shop}/admin/oauth/authorize`` carrying the
          client id, scope, redirect URI, response type and a random state, or
          a 400 JSON error when ``shop`` is missing or is not a bare hostname.
      """
      shop = request.args.get('shop')
      if not shop:
          return jsonify({"error": "shop parameter is required"}), 400
      
      # shop is untrusted and becomes the redirect host. Accept only hostname
      # characters so values such as "evil.example/x" or "user@evil.example"
      # cannot smuggle a path, query or userinfo section into the URL.
      if not re.fullmatch(r"[A-Za-z0-9](?:[A-Za-z0-9.-]*[A-Za-z0-9])?", shop):
          return jsonify({"error": "shop parameter is invalid"}), 400
      
      # NOTE(review): the state value is generated but never stored, so the
      # callback cannot check it. Persisting it (e.g. in a session or cookie)
      # and verifying it on return is needed for real CSRF protection.
      state = secrets.token_hex(16)
      
      # urlencode percent-escapes each value; redirect_uri in particular
      # contains "://" and "/" characters that must not appear raw in a query.
      query = urlencode({
          "client_id": CLIENT_ID,
          "scope": "read_customer",
          "redirect_uri": REDIRECT_URI,
          "response_type": "code",
          "state": state,
      })
      
      return redirect(f"https://{shop}/admin/oauth/authorize?{query}")
  
  # OAuth callback route
  @app.route('/api/auth/callback', methods=['GET'])
  @hmac_validator
  def auth_callback():
      """
      OAuth callback: exchange the authorization code for an access token.

      Reads ``code`` and ``shop`` from the query string and POSTs the client
      credentials and code to ``https://{shop}/admin/oauth/token``.

      Returns:
          The token endpoint's JSON payload on success, a 400 JSON error when
          ``shop`` or ``code`` is missing, or a 500 JSON error when the token
          request fails or its response is not valid JSON.
      """
      code = request.args.get('code')
      shop = request.args.get('shop')
      
      if not shop or not code:
          return jsonify({"error": "Required parameters missing"}), 400
      
      token_url = f"https://{shop}/admin/oauth/token"
      data = {
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "code": code,
          "grant_type": "authorization_code",
          "redirect_uri": REDIRECT_URI,
      }
      
      try:
          # Without a timeout, requests waits indefinitely on a stalled peer
          # and would tie up this worker.
          response = requests.post(token_url, json=data, timeout=10)
          response.raise_for_status()
          # ValueError covers a non-JSON body on requests versions whose JSON
          # decode error is not a RequestException subclass.
          token_data = response.json()
      except (requests.exceptions.RequestException, ValueError) as e:
          print(f"Error: {e}")
          return jsonify({"error": "Failed to get access token"}), 500
      
      # Do not print the payload itself: it contains the access token and
      # would end up in plain text in the process logs.
      print("token data received for shop:", shop)
      
      # Possible next step: call the Shoplazza OpenAPI with the token, e.g.
      #   headers = {"Access-Token": token_data.get("access_token")}
      #   customers_url = f"https://{shop}/openapi/2022-01/customers"
      #   customers_response = requests.get(customers_url, headers=headers)
      # NOTE(review): the token payload is returned to the browser as-is;
      # confirm that exposing it to the client is intended outside a demo.
      return jsonify(token_data)
  
  if __name__ == '__main__':
      # The server binds to every interface, and the Werkzeug debugger lets
      # anyone who can reach it execute arbitrary code. Keep debug mode off
      # unless it is explicitly requested with FLASK_DEBUG=1 for local work.
      debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in (
          "1", "true", "yes", "on",
      )
      app.run(host='0.0.0.0', port=4000, debug=debug_enabled)