# main.py (3.77 KB)
import functools
import hashlib
import hmac
import os
import re
import secrets
from urllib.parse import urlencode

import requests
from flask import Flask, request, redirect, jsonify
from flask_cors import CORS

# Create the Flask application object and enable CORS handling on it.
# NOTE(review): CORS(app) is called with no options, so the flask-cors
# defaults apply to every route — confirm that is intended for the OAuth
# endpoints defined in this file.
app = Flask(__name__)
CORS(app)

# Configuration.
# Each value can be overridden through an environment variable; the literals
# are kept only as fallbacks so existing deployments keep working unchanged.
# NOTE(security): a client secret committed to source control should be
# treated as leaked — rotate it and supply the new value via
# OAUTH_CLIENT_SECRET, then remove the fallback literal.
CLIENT_ID = os.environ.get(
    "OAUTH_CLIENT_ID", "J2ntcUvAKRq_xVXcmysAc6iUJ-MYJF4JtoMKJGi_I9A"
)
CLIENT_SECRET = os.environ.get(
    "OAUTH_CLIENT_SECRET", "WvmwpHVf1u1hfYEhsc7LPl0YxrbiFy3A2JVBhy8PWAU"
)
REDIRECT_URI = os.environ.get(
    "OAUTH_REDIRECT_URI",
    "https://rooster-wired-monster.ngrok-free.app/api/auth/callback",
)

# Helper: constant-time comparison of HMAC digests
def secure_compare(a, b):
    """Compare two digests in constant time to resist timing attacks.

    ``hmac.compare_digest`` raises ``TypeError`` when given a ``str`` that
    contains non-ASCII characters. Because one side of the comparison
    usually comes from the request, text arguments are encoded to UTF-8
    bytes first so malformed input yields ``False`` instead of an exception.

    Args:
        a: First digest, as ``str`` or a bytes-like object.
        b: Second digest, as ``str`` or a bytes-like object.

    Returns:
        ``True`` when both values are equal, ``False`` otherwise.
    """
    if isinstance(a, str):
        a = a.encode("utf-8")
    if isinstance(b, str):
        b = b.encode("utf-8")
    return hmac.compare_digest(a, b)

# Middleware: HMAC validation
def hmac_validator(func):
    """Decorator that rejects requests whose ``hmac`` query parameter is invalid.

    The expected signature is the hex HMAC-SHA256, keyed with
    ``CLIENT_SECRET``, of every other query parameter rendered as
    ``key=value``, sorted by key and joined with ``&``.

    Args:
        func: The view function to protect.

    Returns:
        A wrapped view that responds with a 400 JSON error when the ``hmac``
        parameter is missing or does not match, and otherwise calls ``func``
        with the original arguments.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # to_dict() flattens the MultiDict to one string per key; a plain
        # dict(request.args) can produce list values on some Werkzeug
        # versions, which would corrupt the signed message.
        params = request.args.to_dict()
        hmac_value = params.pop('hmac', None)  # the signature is not signed

        if not hmac_value:
            return jsonify({"error": "HMAC validation failed"}), 400

        # Build the message from the remaining parameters in key order.
        message = "&".join(
            f"{key}={value}" for key, value in sorted(params.items())
        )

        generated_hash = hmac.new(
            CLIENT_SECRET.encode(),
            message.encode(),
            hashlib.sha256,
        ).hexdigest()

        # Compare as bytes so a non-ASCII hmac value is simply rejected
        # rather than making the comparison raise TypeError.
        if not secure_compare(generated_hash.encode(), hmac_value.encode()):
            return jsonify({"error": "HMAC validation failed"}), 400

        return func(*args, **kwargs)

    return wrapper

# OAuth authorization route
@app.route('/api/auth', methods=['GET'])
def auth():
    """Start the OAuth flow by redirecting to the shop's authorization page.

    Reads the ``shop`` query parameter and uses it as the host of the
    authorization URL.

    Returns:
        A redirect to ``https://{shop}/admin/oauth/authorize`` carrying the
        client id, scope, redirect URI, response type and a random state,
        or a 400 JSON error when ``shop`` is missing or is not a plain
        host name (optionally followed by a port).
    """
    shop = request.args.get('shop')
    if not shop:
        return jsonify({"error": "shop parameter is required"}), 400

    # shop is untrusted and ends up in the URL authority: allow only host
    # name characters (plus an optional port) so a caller cannot smuggle in
    # a path, userinfo, query string or fragment.
    if not re.fullmatch(
        r"[A-Za-z0-9](?:[A-Za-z0-9.-]*[A-Za-z0-9])?(?::\d{1,5})?", shop
    ):
        return jsonify({"error": "shop parameter is invalid"}), 400

    # NOTE(review): the state value is not stored anywhere by this function,
    # so nothing visible here lets the callback verify it. Persist it (for
    # example in a signed session) and check it in the callback to get CSRF
    # protection from it.
    state = secrets.token_hex(16)

    # urlencode percent-encodes each value; redirect_uri in particular
    # contains characters that must not appear raw inside a query string.
    query = urlencode({
        "client_id": CLIENT_ID,
        "scope": "read_tenant",
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "state": state,
    })

    return redirect(f"https://{shop}/admin/oauth/authorize?{query}")

# OAuth callback route
@app.route('/api/auth/callback', methods=['GET'])
@hmac_validator
def auth_callback():
    """Exchange the authorization code from the callback for an access token.

    Reads ``code`` and ``shop`` from the query string and POSTs them, along
    with the client credentials, to ``https://{shop}/admin/oauth/token``.

    Returns:
        The token endpoint's JSON response on success, a 400 JSON error when
        ``shop`` or ``code`` is missing, or a 500 JSON error when the token
        request fails or its response is not valid JSON.
    """
    code = request.args.get('code')
    shop = request.args.get('shop')

    if not shop or not code:
        return jsonify({"error": "Required parameters missing"}), 400

    token_url = f"https://{shop}/admin/oauth/token"
    payload = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": REDIRECT_URI,
    }

    try:
        # Without a timeout an unresponsive token endpoint would block this
        # worker indefinitely.
        response = requests.post(token_url, json=payload, timeout=10)
        response.raise_for_status()
        # A non-JSON body raises a ValueError subclass; depending on the
        # requests version it may not derive from RequestException.
        token_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        app.logger.error("Token exchange failed: %s", exc)
        return jsonify({"error": "Failed to get access token"}), 500

    # Log the event only — the token payload itself is a credential and is
    # deliberately kept out of the logs.
    app.logger.info("Access token obtained for shop %s", shop)

    # Follow-up calls to the shop's OpenAPI could be made here, e.g.:
    # headers = {"Access-Token": token_data.get("access_token")}
    # tenants_url = f"https://{shop}/openapi/2022-01/tenants"
    # tenants_response = requests.get(tenants_url, headers=headers)

    # NOTE(review): the full token response is returned to the caller as-is;
    # confirm that exposing it to the browser is intended.
    return jsonify(token_data)

if __name__ == '__main__':
    # The server listens on all interfaces, and the Werkzeug debugger allows
    # code execution from the browser, so debug mode is opt-in through the
    # FLASK_DEBUG environment variable instead of being always enabled.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in {
        "1", "true", "yes", "on",
    }
    app.run(host='0.0.0.0', port=4000, debug=debug_enabled)