cccb7cfc
tangwang
init
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import functools
import hashlib
import hmac
import os
import re
import secrets
from urllib.parse import urlencode

import requests
from flask import Flask, request, redirect, jsonify
from flask_cors import CORS
app = Flask(__name__)
# NOTE(review): CORS(app) is called with no options, so the flask-cors
# defaults apply to every route — confirm that is intended for these
# OAuth endpoints.
CORS(app)

# Configuration. Each value can be supplied through an environment variable;
# the literals are the previously hard-coded values, kept as fallbacks so
# existing setups keep working unchanged.
# NOTE(review): a client secret committed to source control should be treated
# as compromised — rotate it and remove the fallback literal.
CLIENT_ID = os.environ.get(
    "OAUTH_CLIENT_ID",
    "J2ntcUvAKRq_xVXcmysAc6iUJ-MYJF4JtoMKJGi_I9A",
)
CLIENT_SECRET = os.environ.get(
    "OAUTH_CLIENT_SECRET",
    "WvmwpHVf1u1hfYEhsc7LPl0YxrbiFy3A2JVBhy8PWAU",
)
REDIRECT_URI = os.environ.get(
    "OAUTH_REDIRECT_URI",
    "https://rooster-wired-monster.ngrok-free.app/api/auth/callback",
)
# Utility: constant-time comparison of HMAC digests
def secure_compare(a, b):
    """
    Compare two digests in constant time to prevent timing attacks.

    Args:
        a: First value, as ``str`` or a bytes-like object.
        b: Second value, as ``str`` or a bytes-like object.

    Returns:
        True when both values are equal after ``str`` values are encoded as
        UTF-8, False otherwise (including when either value is neither a
        string nor bytes-like).
    """
    # hmac.compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters and for mixed str/bytes arguments. One side
    # usually comes from an untrusted request, so normalise both sides to
    # bytes and treat anything that still cannot be compared as "not equal"
    # instead of letting the exception escape.
    try:
        left = a.encode("utf-8") if isinstance(a, str) else a
        right = b.encode("utf-8") if isinstance(b, str) else b
        return hmac.compare_digest(left, right)
    except (TypeError, UnicodeError):
        return False
# Middleware: HMAC validation
def hmac_validator(func):
    """
    Decorator that verifies the HMAC signature of the current request.

    The signature is read from the ``hmac`` query parameter. All remaining
    query parameters are sorted by key, joined as ``key=value`` pairs with
    ``&``, and signed with HMAC-SHA256 using ``CLIENT_SECRET``; the resulting
    hex digest must equal the supplied value.

    Args:
        func: The Flask view function to protect.

    Returns:
        A wrapped view that responds with a 400 JSON error when the signature
        is missing or does not match, and otherwise calls ``func``.
    """
    # functools.wraps copies __name__ (which Flask uses as the endpoint name)
    # along with the docstring and other metadata.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # to_dict() yields one plain string per key, so the signed message
        # never depends on how the underlying MultiDict converts via dict().
        params = request.args.to_dict()
        hmac_value = params.pop('hmac', None)  # the signature is not signed
        if not hmac_value:
            return jsonify({"error": "HMAC validation failed"}), 400

        # Build the message from the parameters in lexicographic key order.
        message = "&".join(f"{key}={params[key]}" for key in sorted(params))

        generated_hash = hmac.new(
            CLIENT_SECRET.encode(),
            message.encode(),
            hashlib.sha256,
        ).hexdigest()

        # Compare as bytes: a str comparison would raise on a non-ASCII
        # ``hmac`` value supplied by the client instead of failing cleanly.
        if not secure_compare(generated_hash.encode(), hmac_value.encode()):
            return jsonify({"error": "HMAC validation failed"}), 400

        # NOTE(review): the ``state`` query parameter is covered by the
        # signature but is not compared against any stored value here.
        return func(*args, **kwargs)

    return wrapper
# OAuth authorization route
@app.route('/api/auth', methods=['GET'])
def auth():
    """
    OAuth entry point: redirect the browser to the shop's authorization page.

    Reads the ``shop`` query parameter and builds the
    ``/admin/oauth/authorize`` URL on that host, carrying the client id,
    requested scope, redirect URI and a freshly generated random ``state``.

    Returns:
        A redirect response to the authorization URL, or a 400 JSON error
        when ``shop`` is missing or is not a syntactically valid host name.
    """
    shop = request.args.get('shop')
    if not shop:
        return jsonify({"error": "shop parameter is required"}), 400

    # ``shop`` is untrusted and becomes the host of the redirect target, so
    # only accept a bare host name (optionally with a port). This rejects
    # values that smuggle in a path, query, fragment or userinfo part.
    # NOTE(review): consider additionally restricting this to the platform's
    # own shop domain suffix.
    host_pattern = r"[A-Za-z0-9](?:[A-Za-z0-9.-]*[A-Za-z0-9])?(?::[0-9]{1,5})?"
    if not re.fullmatch(host_pattern, shop):
        return jsonify({"error": "shop parameter is invalid"}), 400

    scopes = "read_customer"
    # NOTE(review): the state is generated here but not stored anywhere, so
    # the callback has nothing to compare it against. Persist it (e.g. in a
    # session or signed cookie) and verify it in the callback.
    state = secrets.token_hex(16)

    # urlencode percent-encodes every value; the redirect URI in particular
    # contains ``:`` and ``/`` and must not be interpolated raw.
    query = urlencode({
        "client_id": CLIENT_ID,
        "scope": scopes,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "state": state,
    })
    return redirect(f"https://{shop}/admin/oauth/authorize?{query}")
# OAuth callback route
@app.route('/api/auth/callback', methods=['GET'])
@hmac_validator
def auth_callback():
    """
    OAuth callback: exchange the authorization code for an access token.

    Reads ``code`` and ``shop`` from the query string and POSTs the client
    credentials together with the code to ``/admin/oauth/token`` on the shop
    host. The request's HMAC signature has already been checked by
    ``hmac_validator`` before this view runs.

    Returns:
        The token endpoint's JSON payload on success; a 400 JSON error when
        ``shop`` or ``code`` is missing; a 500 JSON error when the token
        request fails or its response is not valid JSON.
    """
    code = request.args.get('code')
    shop = request.args.get('shop')
    if not shop or not code:
        return jsonify({"error": "Required parameters missing"}), 400

    token_url = f"https://{shop}/admin/oauth/token"
    data = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": REDIRECT_URI,
    }

    try:
        # Without a timeout an unresponsive token endpoint would block this
        # worker indefinitely.
        response = requests.post(token_url, json=data, timeout=10)
        response.raise_for_status()
        # ValueError covers a non-JSON body on requests versions whose JSON
        # decode error is not a RequestException subclass.
        token_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error: {e}")
        return jsonify({"error": "Failed to get access token"}), 500

    # The token payload is intentionally not printed: it contains the access
    # token, which must not end up in process output or logs.
    # NOTE(review): this still returns the raw token payload to the browser.
    # Possible follow-up (not implemented): call the Shoplazza OpenAPI with
    # the token, e.g. GET https://{shop}/openapi/2022-01/customers with an
    # ``Access-Token`` header, then redirect instead of returning the token.
    return jsonify(token_data)
if __name__ == '__main__':
    # Debug mode turns on the Werkzeug interactive debugger, which lets
    # anyone who can reach the server execute code. Because the server
    # listens on all interfaces, debug is opt-in via FLASK_DEBUG rather than
    # always enabled.
    debug = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=4000, debug=debug)
|