# auth.py - Shoplazza OAuth endpoints (install, callback, token refresh)
import re
import secrets
from urllib.parse import urlencode

import requests
from flask import Blueprint, request, redirect, jsonify, current_app

from config import Config
from middleware.hmac_validator import hmac_validator_required

# Authentication blueprint; every route registered on it lives under /auth.
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')

# Accepted shape for the ``shop`` parameter: one or more DNS labels followed by
# the Shoplazza store domain. Matching the whole string (instead of only the
# suffix) rejects values such as "evil.com/x.myshoplaza.com" that would
# otherwise turn the redirect below into an open redirect.
_SHOP_HOST_RE = re.compile(
    r"(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+myshoplaza\.com",
    re.IGNORECASE,
)


@auth_bp.route('/install')
def install():
    """Start the OAuth flow (step 1): handle an app-install request.

    Shoplazza sends the merchant to this endpoint when the app is installed
    from the app store. The ``shop`` query parameter names the store host
    the merchant is redirected to for authorization.

    Returns:
        A redirect to the store's ``/admin/oauth/authorize`` page, or a JSON
        error with status 400 when ``shop`` is missing or is not a
        ``*.myshoplaza.com`` hostname.
    """
    shop = request.args.get('shop')
    if not shop:
        return jsonify({'error': 'Missing shop parameter'}), 400

    # ``shop`` is untrusted and becomes the host of the redirect target, so it
    # must be a bare hostname -- no path, port, userinfo, query or fragment.
    if not _SHOP_HOST_RE.fullmatch(shop):
        return jsonify({'error': 'Invalid shop parameter'}), 400

    # Permission scopes requested from the merchant.
    scopes = "read_customer,read_product,write_product,read_order,read_shop"

    # Random ``state`` value intended as CSRF protection.
    # NOTE(review): the value is only sent to Shoplazza; it is not stored
    # anywhere, so the callback has nothing to compare it against. Persist it
    # (e.g. in a signed session) and verify it in the callback.
    state = secrets.token_hex(16)

    # Percent-encode every parameter (notably redirect_uri); commas are kept
    # literal so the scope list is sent exactly as before.
    query = urlencode(
        {
            'client_id': Config.CLIENT_ID,
            'scope': scopes,
            'redirect_uri': f"{Config.BASE_URL}{Config.REDIRECT_URI}",
            'response_type': 'code',
            'state': state,
        },
        safe=',',
    )

    return redirect(f"https://{shop}/admin/oauth/authorize?{query}")

@auth_bp.route('/shoplazza/callback')
@hmac_validator_required
def callback():
    """Handle the OAuth callback (step 2).

    Shoplazza redirects the merchant here after they authorize the app. The
    authorization ``code`` is exchanged for an access token, which is then
    stored in ``Config.ACCESS_TOKENS`` keyed by ``shop``.

    Returns:
        JSON with the shop, ``store_id`` and ``store_name`` on success;
        a JSON error with status 400 when ``code`` or ``shop`` is missing;
        a JSON error with status 500 when the token exchange fails.
    """
    code = request.args.get('code')
    shop = request.args.get('shop')
    # NOTE(review): the ``state`` query parameter is not verified here. The
    # install step does not persist the value it generates, so there is
    # nothing to compare against yet; the request is only guarded by the
    # ``hmac_validator_required`` decorator (presumably a signature check --
    # confirm in middleware/hmac_validator).

    if not shop or not code:
        return jsonify({'error': 'Missing required parameters'}), 400

    try:
        # Exchange the authorization code for an access token.
        token_data = exchange_code_for_token(code, shop)

        # Keep the token in memory (production should use a database).
        Config.ACCESS_TOKENS[shop] = token_data

        current_app.logger.info("获取access_token成功 for shop: %s", shop)

        return jsonify({
            'message': 'Authorization successful',
            'shop': shop,
            'store_id': token_data.get('store_id'),
            'store_name': token_data.get('store_name')
        })

    except Exception as e:
        # Request boundary: log with the traceback so the failure can be
        # diagnosed, and return a generic error to the caller.
        current_app.logger.exception("获取access_token失败: %s", e)
        return jsonify({'error': 'Failed to obtain access token'}), 500

def exchange_code_for_token(code, shop, timeout=10):
    """Exchange an OAuth authorization code for an access token.

    Args:
        code: Authorization code received on the OAuth callback.
        shop: Store hostname; used as the host of the token endpoint URL.
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout ``requests`` can block indefinitely on a
            stalled connection.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status (via ``raise_for_status``).
        ValueError: If the response body is not valid JSON.
    """
    token_url = f"https://{shop}/admin/oauth/token"

    data = {
        'client_id': Config.CLIENT_ID,
        'client_secret': Config.CLIENT_SECRET,
        'code': code,
        'grant_type': 'authorization_code',
        'redirect_uri': f"{Config.BASE_URL}{Config.REDIRECT_URI}"
    }

    response = requests.post(token_url, data=data, timeout=timeout)
    response.raise_for_status()

    return response.json()

@auth_bp.route('/refresh_token/<shop>')
def refresh_token(shop):
    """Refresh the stored access token for ``shop``.

    Looks up the shop's entry in ``Config.ACCESS_TOKENS``, posts its refresh
    token to the store's token endpoint, and replaces the stored entry with
    the response.

    Args:
        shop: Store hostname taken from the URL path.

    Returns:
        JSON containing the refreshed token on success;
        a JSON error with status 404 when the shop has no stored token,
        400 when the stored entry has no refresh token, or 500 when the
        refresh request fails.
    """
    # NOTE(review): this route has no authentication and returns the new
    # token in the response body -- confirm it is not publicly reachable.
    if shop not in Config.ACCESS_TOKENS:
        return jsonify({'error': 'Shop not found'}), 404

    token_data = Config.ACCESS_TOKENS[shop]
    refresh_token_value = token_data.get('refresh_token')

    if not refresh_token_value:
        return jsonify({'error': 'No refresh token available'}), 400

    try:
        refresh_url = f"https://{shop}/admin/oauth/token"

        data = {
            'client_id': Config.CLIENT_ID,
            'client_secret': Config.CLIENT_SECRET,
            'refresh_token': refresh_token_value,
            'grant_type': 'refresh_token',
            'redirect_uri': f"{Config.BASE_URL}{Config.REDIRECT_URI}"
        }

        # Bound the wait so a stalled token endpoint cannot hang the worker.
        response = requests.post(refresh_url, data=data, timeout=10)
        response.raise_for_status()

        # Parse the body once and reuse it for both storage and the reply.
        new_token = response.json()

        # Replace the stored token.
        Config.ACCESS_TOKENS[shop] = new_token

        return jsonify({
            'message': 'Token refreshed successfully',
            'new_token': new_token
        })

    except Exception as e:
        # Request boundary: keep the traceback in the log, return a generic
        # error to the caller.
        current_app.logger.exception("刷新token失败: %s", e)
        return jsonify({'error': 'Failed to refresh token'}), 500

@auth_bp.route('/tokens')
def list_tokens():
    """List every authorized shop and its stored token (debugging only).

    The response contains the raw contents of ``Config.ACCESS_TOKENS``, so
    the route is only served while the Flask app runs in debug mode.

    Returns:
        JSON with ``authorized_shops`` and ``tokens`` in debug mode;
        a JSON error with status 404 otherwise.
    """
    # This route has no authentication; outside debug mode it must not hand
    # out stored credentials, so behave as if it does not exist.
    if not current_app.debug:
        return jsonify({'error': 'Not found'}), 404

    return jsonify({
        'authorized_shops': list(Config.ACCESS_TOKENS),
        'tokens': Config.ACCESS_TOKENS
    })