OAuth流程架构师 oauth-flow-architect

这个技能专注于实现OAuth 2.0和OpenID Connect身份认证流程,确保安全性和正确性。它包括授权码流、PKCE、令牌管理、安全考虑和常见提供商集成。关键词:OAuth, OpenID Connect, 身份认证, 令牌管理, 安全, 后端开发, 架构设计

架构设计 0 次安装 0 次浏览 更新于 3/7/2026

name: oauth-flow-architect description: 实现OAuth 2.0和OpenID Connect身份认证流程,具有适当的安全性、令牌管理和常见提供商集成。 license: MIT

OAuth 流程架构师

此技能提供实现OAuth 2.0和OpenID Connect(OIDC)身份认证流程的安全和正确指导。

核心能力

  • OAuth 2.0 流程:授权码、PKCE、客户端凭证
  • OpenID Connect:ID令牌、UserInfo、发现
  • 令牌管理:刷新、撤销、存储
  • 安全:CSRF、令牌盗窃、重定向URI验证

OAuth 2.0 基础

OAuth 解决的问题

没有OAuth:                   有OAuth:
┌──────┐  凭证  ┌──────┐  ┌──────┐            ┌──────┐
│ 用户 │──────────────▶│ 应用 │  │ 用户 │            │ 应用 │
└──────┘               └──┬───┘  └──┬───┘            └──┬───┘
                          │         │ 在提供者处登录     │
                          │         │                   │
                          ▼         ▼                   │
                       ┌──────┐  ┌──────┐  令牌     ┌──────┐
                       │谷歌  │  │谷歌  │───────────▶│谷歌  │
                       └──────┘  └──────┘            └──────┘

应用有您的密码        应用从未看到密码

OAuth 角色

角色 描述 示例
资源所有者 拥有数据的用户 终端用户
客户端 请求访问的应用 您的应用
授权服务器 颁发令牌 谷歌、Auth0
资源服务器 托管受保护资源 谷歌API

授权类型概述

授权类型 使用场景 安全级别
授权码 + PKCE 网络应用、移动应用、SPA 最高
授权码 传统服务器应用
客户端凭证 机器到机器
刷新令牌 令牌续期
隐式(已弃用) 传统SPA
密码(已弃用) 传统迁移

授权码流与PKCE

推荐用于所有面向用户的应用。

流程图

┌──────┐                              ┌─────────────┐                    ┌──────────┐
│ 用户 │                              │   客户端    │                    │  授权    │
│      │                              │   (应用)  │                    │  服务器  │
└──┬───┘                              └──────┬──────┘                    └────┬─────┘
   │  1. 点击“登录”                       │                               │
   │────────────────────────────────────────▶│                               │
   │                                         │  2. 生成code_verifier        │
   │                                         │     code_challenge = SHA256() │
   │                                         │                               │
   │  3. 重定向到授权端点                 │                               │
   │◀────────────────────────────────────────│                               │
   │                                         │                               │
   │  4. 重定向(在授权服务器登录)       │                               │
   │────────────────────────────────────────────────────────────────────────▶│
   │                                         │                               │
   │  5. 用户认证和同意                   │                               │
   │◀────────────────────────────────────────────────────────────────────────│
   │                                         │                               │
   │  6. 重定向返回授权码                 │                               │
   │────────────────────────────────────────▶│                               │
   │                                         │                               │
   │                                         │  7. 交换代码 + verifier      │
   │                                         │     获取令牌                  │
   │                                         │──────────────────────────────▶│
   │                                         │                               │
   │                                         │  8. 访问令牌 + ID令牌        │
   │                                         │◀──────────────────────────────│
   │                                         │                               │
   │  9. 用户登录成功                       │                               │
   │◀────────────────────────────────────────│                               │

实现

import secrets
import hashlib
import base64
from urllib.parse import urlencode

class OAuthClient:
    """带PKCE的OAuth 2.0客户端"""

    def __init__(self, config):
        self.client_id = config['client_id']
        self.client_secret = config.get('client_secret')  # PKCE可选
        self.redirect_uri = config['redirect_uri']
        self.authorization_endpoint = config['authorization_endpoint']
        self.token_endpoint = config['token_endpoint']
        self.scopes = config.get('scopes', ['openid', 'profile', 'email'])

    def generate_pkce(self):
        """生成PKCE代码验证器和挑战"""
        # 代码验证器:43-128字符,URL安全
        code_verifier = secrets.token_urlsafe(32)

        # 代码挑战:验证器的SHA256哈希
        digest = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()

        return code_verifier, code_challenge

    def get_authorization_url(self, state=None):
        """构建重定向的授权URL"""
        code_verifier, code_challenge = self.generate_pkce()

        # 状态参数用于CSRF保护
        state = state or secrets.token_urlsafe(16)

        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': ' '.join(self.scopes),
            'state': state,
            'code_challenge': code_challenge,
            'code_challenge_method': 'S256'
        }

        url = f"{self.authorization_endpoint}?{urlencode(params)}"

        return {
            'url': url,
            'state': state,
            'code_verifier': code_verifier  # 服务器端存储
        }

    async def exchange_code(self, code, code_verifier):
        """交换授权码获取令牌"""
        data = {
            'grant_type': 'authorization_code',
            'client_id': self.client_id,
            'code': code,
            'redirect_uri': self.redirect_uri,
            'code_verifier': code_verifier
        }

        # 如果是机密客户端,包含client_secret
        if self.client_secret:
            data['client_secret'] = self.client_secret

        response = await self.http.post(
            self.token_endpoint,
            data=data,
            headers={'Content-Type': 'application/x-www-form-urlencoded'}
        )

        if response.status_code != 200:
            raise OAuthError(response.json())

        return response.json()  # {access_token, refresh_token, id_token, ...}

回调处理

from flask import request, session, redirect

@app.route('/callback')
async def oauth_callback():
    # 验证状态以防止CSRF
    state = request.args.get('state')
    stored_state = session.get('oauth_state')

    if not state or state != stored_state:
        return '无效的状态参数', 400

    # 检查错误
    error = request.args.get('error')
    if error:
        error_desc = request.args.get('error_description', '未知错误')
        return f'OAuth错误: {error_desc}', 400

    # 交换代码获取令牌
    code = request.args.get('code')
    code_verifier = session.get('oauth_code_verifier')

    try:
        tokens = await oauth_client.exchange_code(code, code_verifier)
    except OAuthError as e:
        return f'令牌交换失败: {e}', 400

    # 如果使用OIDC,验证ID令牌
    if 'id_token' in tokens:
        user_info = validate_id_token(tokens['id_token'])
    else:
        user_info = await fetch_userinfo(tokens['access_token'])

    # 创建会话
    session['user'] = user_info
    session['tokens'] = tokens

    # 清理OAuth状态
    session.pop('oauth_state', None)
    session.pop('oauth_code_verifier', None)

    return redirect('/dashboard')

OpenID Connect

OIDC在OAuth 2.0之上添加身份层。

ID令牌结构

# ID令牌是一个包含声明的JWT
{
    # 标准声明
    "iss": "https://accounts.google.com",  # 发行者
    "sub": "110169484474386276334",         # 主体(用户ID)
    "aud": "your-client-id",                # 受众
    "exp": 1706616000,                      # 过期时间
    "iat": 1706612400,                      # 签发时间
    "nonce": "abc123",                      # 重放保护

    # 个人资料声明
    "name": "Alice Smith",
    "email": "alice@example.com",
    "email_verified": true,
    "picture": "https://..."
}

ID令牌验证

import jwt
from jwt import PyJWKClient

class IDTokenValidator:
    """验证OIDC ID令牌"""

    def __init__(self, issuer, client_id, jwks_uri):
        self.issuer = issuer
        self.client_id = client_id
        self.jwks_client = PyJWKClient(jwks_uri)

    def validate(self, id_token, nonce=None):
        """验证并解码ID令牌"""
        try:
            # 获取签名密钥
            signing_key = self.jwks_client.get_signing_key_from_jwt(id_token)

            # 解码和验证
            claims = jwt.decode(
                id_token,
                signing_key.key,
                algorithms=['RS256'],
                audience=self.client_id,
                issuer=self.issuer
            )

            # 如果提供nonce,验证nonce
            if nonce and claims.get('nonce') != nonce:
                raise ValueError('无效的nonce')

            return claims

        except jwt.ExpiredSignatureError:
            raise AuthenticationError('ID令牌已过期')
        except jwt.InvalidAudienceError:
            raise AuthenticationError('无效的受众')
        except jwt.InvalidIssuerError:
            raise AuthenticationError('无效的发行者')
        except Exception as e:
            raise AuthenticationError(f'令牌验证失败: {e}')

OIDC发现

async def discover_oidc_config(issuer):
    """获取OIDC提供者配置"""
    discovery_url = f"{issuer.rstrip('/')}/.well-known/openid-configuration"

    response = await http.get(discovery_url)
    config = response.json()

    return {
        'authorization_endpoint': config['authorization_endpoint'],
        'token_endpoint': config['token_endpoint'],
        'userinfo_endpoint': config['userinfo_endpoint'],
        'jwks_uri': config['jwks_uri'],
        'scopes_supported': config['scopes_supported'],
        'response_types_supported': config['response_types_supported']
    }

# 示例:谷歌
# https://accounts.google.com/.well-known/openid-configuration

客户端凭证流

用于机器到机器认证(不涉及用户)。

class ClientCredentialsAuth:
    """OAuth客户端凭证流"""

    def __init__(self, client_id, client_secret, token_endpoint):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = token_endpoint
        self._token = None
        self._token_expiry = None

    async def get_token(self, scopes=None):
        """获取访问令牌,需要时刷新"""
        if self._token and self._token_expiry > time.time():
            return self._token

        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        if scopes:
            data['scope'] = ' '.join(scopes)

        response = await http.post(
            self.token_endpoint,
            data=data,
            headers={'Content-Type': 'application/x-www-form-urlencoded'}
        )

        tokens = response.json()
        self._token = tokens['access_token']
        self._token_expiry = time.time() + tokens.get('expires_in', 3600) - 60

        return self._token

令牌管理

刷新令牌流

class TokenManager:
    """管理访问和刷新令牌"""

    def __init__(self, oauth_client, token_storage):
        self.oauth = oauth_client
        self.storage = token_storage

    async def get_valid_access_token(self, user_id):
        """获取有效的访问令牌,需要时刷新"""
        tokens = await self.storage.get_tokens(user_id)

        if not tokens:
            raise AuthenticationError('未找到令牌')

        # 检查访问令牌是否仍有效(带缓冲)
        if tokens.get('expires_at', 0) > time.time() + 60:
            return tokens['access_token']

        # 刷新令牌
        if 'refresh_token' not in tokens:
            raise AuthenticationError('无刷新令牌,需要重新认证')

        new_tokens = await self._refresh(tokens['refresh_token'])
        await self.storage.save_tokens(user_id, new_tokens)

        return new_tokens['access_token']

    async def _refresh(self, refresh_token):
        """交换刷新令牌获取新的访问令牌"""
        data = {
            'grant_type': 'refresh_token',
            'client_id': self.oauth.client_id,
            'refresh_token': refresh_token
        }

        if self.oauth.client_secret:
            data['client_secret'] = self.oauth.client_secret

        response = await http.post(
            self.oauth.token_endpoint,
            data=data
        )

        if response.status_code != 200:
            raise TokenRefreshError(response.json())

        tokens = response.json()
        tokens['expires_at'] = time.time() + tokens.get('expires_in', 3600)

        return tokens

令牌存储安全

class SecureTokenStorage:
    """安全存储令牌"""

    def __init__(self, encryption_key, backend):
        self.cipher = Fernet(encryption_key)
        self.backend = backend  # Redis、数据库等

    async def save_tokens(self, user_id, tokens):
        """加密并存储令牌"""
        # 加密敏感字段
        encrypted = {
            'access_token': self._encrypt(tokens['access_token']),
            'expires_at': tokens['expires_at']
        }

        if 'refresh_token' in tokens:
            encrypted['refresh_token'] = self._encrypt(tokens['refresh_token'])

        if 'id_token' in tokens:
            # ID令牌不需要加密(它是签名的,不是秘密)
            encrypted['id_token'] = tokens['id_token']

        await self.backend.set(f"tokens:{user_id}", json.dumps(encrypted))

    async def get_tokens(self, user_id):
        """检索并解密令牌"""
        data = await self.backend.get(f"tokens:{user_id}")
        if not data:
            return None

        encrypted = json.loads(data)

        return {
            'access_token': self._decrypt(encrypted['access_token']),
            'refresh_token': self._decrypt(encrypted.get('refresh_token', '')),
            'expires_at': encrypted['expires_at'],
            'id_token': encrypted.get('id_token')
        }

    def _encrypt(self, value):
        if not value:
            return ''
        return self.cipher.encrypt(value.encode()).decode()

    def _decrypt(self, value):
        if not value:
            return ''
        return self.cipher.decrypt(value.encode()).decode()

安全考虑

重定向URI验证

def validate_redirect_uri(redirect_uri, registered_uris):
    """严格的重定向URI验证"""
    # 要求完全匹配(生产环境无通配符)
    if redirect_uri not in registered_uris:
        raise SecurityError('无效的重定向URI')

    # 额外检查
    parsed = urlparse(redirect_uri)

    # 必须使用HTTPS(开发环境localhost除外)
    if parsed.scheme != 'https':
        if parsed.hostname not in ('localhost', '127.0.0.1'):
            raise SecurityError('重定向URI必须使用HTTPS')

    # 无片段
    if parsed.fragment:
        raise SecurityError('重定向URI不能有片段')

    return True

CSRF保护

# 状态参数防止CSRF攻击

# 1. 重定向前生成状态
state = secrets.token_urlsafe(32)
session['oauth_state'] = state

# 2. 包含在授权URL中
auth_url = f"{authorization_endpoint}?state={state}&..."

# 3. 在回调时验证
if request.args.get('state') != session.get('oauth_state'):
    abort(400, '检测到CSRF')

常见漏洞

漏洞 预防措施
CSRF 状态参数、SameSite cookies
令牌盗窃 仅HTTPS、安全存储
开放重定向 严格的重定向URI验证
代码注入 PKCE、短寿命代码
重放 ID令牌中的nonce

提供者特定设置

谷歌

GOOGLE_CONFIG = {
    'client_id': 'xxx.apps.googleusercontent.com',
    'client_secret': 'xxx',
    'authorization_endpoint': 'https://accounts.google.com/o/oauth2/v2/auth',
    'token_endpoint': 'https://oauth2.googleapis.com/token',
    'userinfo_endpoint': 'https://openidconnect.googleapis.com/v1/userinfo',
    'scopes': ['openid', 'email', 'profile']
}

GitHub(OAuth 2.0,非OIDC)

GITHUB_CONFIG = {
    'client_id': 'xxx',
    'client_secret': 'xxx',
    'authorization_endpoint': 'https://github.com/login/oauth/authorize',
    'token_endpoint': 'https://github.com/login/oauth/access_token',
    'userinfo_endpoint': 'https://api.github.com/user',
    'scopes': ['read:user', 'user:email']
}

参考资料

  • references/oauth-security.md - 安全最佳实践和威胁
  • references/provider-configs.md - 常见提供者配置
  • references/token-patterns.md - 令牌存储和刷新模式