name: oauth-flow-architect description: 实现OAuth 2.0和OpenID Connect身份认证流程,具有适当的安全性、令牌管理和常见提供商集成。 license: MIT
OAuth 流程架构师
此技能提供实现OAuth 2.0和OpenID Connect(OIDC)身份认证流程的安全和正确指导。
核心能力
- OAuth 2.0 流程:授权码、PKCE、客户端凭证
- OpenID Connect:ID令牌、UserInfo、发现
- 令牌管理:刷新、撤销、存储
- 安全:CSRF、令牌盗窃、重定向URI验证
OAuth 2.0 基础
OAuth 解决的问题
没有OAuth: 有OAuth:
┌──────┐ 凭证 ┌──────┐ ┌──────┐ ┌──────┐
│ 用户 │──────────────▶│ 应用 │ │ 用户 │ │ 应用 │
└──────┘ └──┬───┘ └──┬───┘ └──┬───┘
│ │ 在提供者处登录 │
│ │ │
▼ ▼ │
┌──────┐ ┌──────┐ 令牌 ┌──────┐
│谷歌 │ │谷歌 │───────────▶│谷歌 │
└──────┘ └──────┘ └──────┘
应用有您的密码 应用从未看到密码
OAuth 角色
| 角色 | 描述 | 示例 |
|---|---|---|
| 资源所有者 | 拥有数据的用户 | 终端用户 |
| 客户端 | 请求访问的应用 | 您的应用 |
| 授权服务器 | 颁发令牌 | 谷歌、Auth0 |
| 资源服务器 | 托管受保护资源 | 谷歌API |
授权类型概述
| 授权类型 | 使用场景 | 安全级别 |
|---|---|---|
| 授权码 + PKCE | 网络应用、移动应用、SPA | 最高 |
| 授权码 | 传统服务器应用 | 高 |
| 客户端凭证 | 机器到机器 | 高 |
| 刷新令牌 | 令牌续期 | 高 |
| 隐式(已弃用) | 传统SPA | 低 |
| 密码(已弃用) | 传统迁移 | 低 |
授权码流与PKCE
推荐用于所有面向用户的应用。
流程图
┌──────┐ ┌─────────────┐ ┌──────────┐
│ 用户 │ │ 客户端 │ │ 授权 │
│ │ │ (应用) │ │ 服务器 │
└──┬───┘ └──────┬──────┘ └────┬─────┘
│ 1. 点击“登录” │ │
│────────────────────────────────────────▶│ │
│ │ 2. 生成code_verifier │
│ │ code_challenge = SHA256() │
│ │ │
│ 3. 重定向到授权端点 │ │
│◀────────────────────────────────────────│ │
│ │ │
│ 4. 重定向(在授权服务器登录) │ │
│────────────────────────────────────────────────────────────────────────▶│
│ │ │
│ 5. 用户认证和同意 │ │
│◀────────────────────────────────────────────────────────────────────────│
│ │ │
│ 6. 重定向返回授权码 │ │
│────────────────────────────────────────▶│ │
│ │ │
│ │ 7. 交换代码 + verifier │
│ │ 获取令牌 │
│ │──────────────────────────────▶│
│ │ │
│ │ 8. 访问令牌 + ID令牌 │
│ │◀──────────────────────────────│
│ │ │
│ 9. 用户登录成功 │ │
│◀────────────────────────────────────────│ │
实现
import secrets
import hashlib
import base64
from urllib.parse import urlencode
class OAuthClient:
"""带PKCE的OAuth 2.0客户端"""
def __init__(self, config):
self.client_id = config['client_id']
self.client_secret = config.get('client_secret') # PKCE可选
self.redirect_uri = config['redirect_uri']
self.authorization_endpoint = config['authorization_endpoint']
self.token_endpoint = config['token_endpoint']
self.scopes = config.get('scopes', ['openid', 'profile', 'email'])
def generate_pkce(self):
"""生成PKCE代码验证器和挑战"""
# 代码验证器:43-128字符,URL安全
code_verifier = secrets.token_urlsafe(32)
# 代码挑战:验证器的SHA256哈希
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
return code_verifier, code_challenge
def get_authorization_url(self, state=None):
"""构建重定向的授权URL"""
code_verifier, code_challenge = self.generate_pkce()
# 状态参数用于CSRF保护
state = state or secrets.token_urlsafe(16)
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'scope': ' '.join(self.scopes),
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}
url = f"{self.authorization_endpoint}?{urlencode(params)}"
return {
'url': url,
'state': state,
'code_verifier': code_verifier # 服务器端存储
}
async def exchange_code(self, code, code_verifier):
"""交换授权码获取令牌"""
data = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'code': code,
'redirect_uri': self.redirect_uri,
'code_verifier': code_verifier
}
# 如果是机密客户端,包含client_secret
if self.client_secret:
data['client_secret'] = self.client_secret
response = await self.http.post(
self.token_endpoint,
data=data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
if response.status_code != 200:
raise OAuthError(response.json())
return response.json() # {access_token, refresh_token, id_token, ...}
回调处理
from flask import request, session, redirect
@app.route('/callback')
async def oauth_callback():
# 验证状态以防止CSRF
state = request.args.get('state')
stored_state = session.get('oauth_state')
if not state or state != stored_state:
return '无效的状态参数', 400
# 检查错误
error = request.args.get('error')
if error:
error_desc = request.args.get('error_description', '未知错误')
return f'OAuth错误: {error_desc}', 400
# 交换代码获取令牌
code = request.args.get('code')
code_verifier = session.get('oauth_code_verifier')
try:
tokens = await oauth_client.exchange_code(code, code_verifier)
except OAuthError as e:
return f'令牌交换失败: {e}', 400
# 如果使用OIDC,验证ID令牌
if 'id_token' in tokens:
user_info = validate_id_token(tokens['id_token'])
else:
user_info = await fetch_userinfo(tokens['access_token'])
# 创建会话
session['user'] = user_info
session['tokens'] = tokens
# 清理OAuth状态
session.pop('oauth_state', None)
session.pop('oauth_code_verifier', None)
return redirect('/dashboard')
OpenID Connect
OIDC在OAuth 2.0之上添加身份层。
ID令牌结构
# ID令牌是一个包含声明的JWT
{
# 标准声明
"iss": "https://accounts.google.com", # 发行者
"sub": "110169484474386276334", # 主体(用户ID)
"aud": "your-client-id", # 受众
"exp": 1706616000, # 过期时间
"iat": 1706612400, # 签发时间
"nonce": "abc123", # 重放保护
# 个人资料声明
"name": "Alice Smith",
"email": "alice@example.com",
"email_verified": true,
"picture": "https://..."
}
ID令牌验证
import jwt
from jwt import PyJWKClient
class IDTokenValidator:
"""验证OIDC ID令牌"""
def __init__(self, issuer, client_id, jwks_uri):
self.issuer = issuer
self.client_id = client_id
self.jwks_client = PyJWKClient(jwks_uri)
def validate(self, id_token, nonce=None):
"""验证并解码ID令牌"""
try:
# 获取签名密钥
signing_key = self.jwks_client.get_signing_key_from_jwt(id_token)
# 解码和验证
claims = jwt.decode(
id_token,
signing_key.key,
algorithms=['RS256'],
audience=self.client_id,
issuer=self.issuer
)
# 如果提供nonce,验证nonce
if nonce and claims.get('nonce') != nonce:
raise ValueError('无效的nonce')
return claims
except jwt.ExpiredSignatureError:
raise AuthenticationError('ID令牌已过期')
except jwt.InvalidAudienceError:
raise AuthenticationError('无效的受众')
except jwt.InvalidIssuerError:
raise AuthenticationError('无效的发行者')
except Exception as e:
raise AuthenticationError(f'令牌验证失败: {e}')
OIDC发现
async def discover_oidc_config(issuer):
"""获取OIDC提供者配置"""
discovery_url = f"{issuer.rstrip('/')}/.well-known/openid-configuration"
response = await http.get(discovery_url)
config = response.json()
return {
'authorization_endpoint': config['authorization_endpoint'],
'token_endpoint': config['token_endpoint'],
'userinfo_endpoint': config['userinfo_endpoint'],
'jwks_uri': config['jwks_uri'],
'scopes_supported': config['scopes_supported'],
'response_types_supported': config['response_types_supported']
}
# 示例:谷歌
# https://accounts.google.com/.well-known/openid-configuration
客户端凭证流
用于机器到机器认证(不涉及用户)。
class ClientCredentialsAuth:
"""OAuth客户端凭证流"""
def __init__(self, client_id, client_secret, token_endpoint):
self.client_id = client_id
self.client_secret = client_secret
self.token_endpoint = token_endpoint
self._token = None
self._token_expiry = None
async def get_token(self, scopes=None):
"""获取访问令牌,需要时刷新"""
if self._token and self._token_expiry > time.time():
return self._token
data = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret
}
if scopes:
data['scope'] = ' '.join(scopes)
response = await http.post(
self.token_endpoint,
data=data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
tokens = response.json()
self._token = tokens['access_token']
self._token_expiry = time.time() + tokens.get('expires_in', 3600) - 60
return self._token
令牌管理
刷新令牌流
class TokenManager:
"""管理访问和刷新令牌"""
def __init__(self, oauth_client, token_storage):
self.oauth = oauth_client
self.storage = token_storage
async def get_valid_access_token(self, user_id):
"""获取有效的访问令牌,需要时刷新"""
tokens = await self.storage.get_tokens(user_id)
if not tokens:
raise AuthenticationError('未找到令牌')
# 检查访问令牌是否仍有效(带缓冲)
if tokens.get('expires_at', 0) > time.time() + 60:
return tokens['access_token']
# 刷新令牌
if 'refresh_token' not in tokens:
raise AuthenticationError('无刷新令牌,需要重新认证')
new_tokens = await self._refresh(tokens['refresh_token'])
await self.storage.save_tokens(user_id, new_tokens)
return new_tokens['access_token']
async def _refresh(self, refresh_token):
"""交换刷新令牌获取新的访问令牌"""
data = {
'grant_type': 'refresh_token',
'client_id': self.oauth.client_id,
'refresh_token': refresh_token
}
if self.oauth.client_secret:
data['client_secret'] = self.oauth.client_secret
response = await http.post(
self.oauth.token_endpoint,
data=data
)
if response.status_code != 200:
raise TokenRefreshError(response.json())
tokens = response.json()
tokens['expires_at'] = time.time() + tokens.get('expires_in', 3600)
return tokens
令牌存储安全
class SecureTokenStorage:
"""安全存储令牌"""
def __init__(self, encryption_key, backend):
self.cipher = Fernet(encryption_key)
self.backend = backend # Redis、数据库等
async def save_tokens(self, user_id, tokens):
"""加密并存储令牌"""
# 加密敏感字段
encrypted = {
'access_token': self._encrypt(tokens['access_token']),
'expires_at': tokens['expires_at']
}
if 'refresh_token' in tokens:
encrypted['refresh_token'] = self._encrypt(tokens['refresh_token'])
if 'id_token' in tokens:
# ID令牌不需要加密(它是签名的,不是秘密)
encrypted['id_token'] = tokens['id_token']
await self.backend.set(f"tokens:{user_id}", json.dumps(encrypted))
async def get_tokens(self, user_id):
"""检索并解密令牌"""
data = await self.backend.get(f"tokens:{user_id}")
if not data:
return None
encrypted = json.loads(data)
return {
'access_token': self._decrypt(encrypted['access_token']),
'refresh_token': self._decrypt(encrypted.get('refresh_token', '')),
'expires_at': encrypted['expires_at'],
'id_token': encrypted.get('id_token')
}
def _encrypt(self, value):
if not value:
return ''
return self.cipher.encrypt(value.encode()).decode()
def _decrypt(self, value):
if not value:
return ''
return self.cipher.decrypt(value.encode()).decode()
安全考虑
重定向URI验证
def validate_redirect_uri(redirect_uri, registered_uris):
"""严格的重定向URI验证"""
# 要求完全匹配(生产环境无通配符)
if redirect_uri not in registered_uris:
raise SecurityError('无效的重定向URI')
# 额外检查
parsed = urlparse(redirect_uri)
# 必须使用HTTPS(开发环境localhost除外)
if parsed.scheme != 'https':
if parsed.hostname not in ('localhost', '127.0.0.1'):
raise SecurityError('重定向URI必须使用HTTPS')
# 无片段
if parsed.fragment:
raise SecurityError('重定向URI不能有片段')
return True
CSRF保护
# 状态参数防止CSRF攻击
# 1. 重定向前生成状态
state = secrets.token_urlsafe(32)
session['oauth_state'] = state
# 2. 包含在授权URL中
auth_url = f"{authorization_endpoint}?state={state}&..."
# 3. 在回调时验证
if request.args.get('state') != session.get('oauth_state'):
abort(400, '检测到CSRF')
常见漏洞
| 漏洞 | 预防措施 |
|---|---|
| CSRF | 状态参数、SameSite cookies |
| 令牌盗窃 | 仅HTTPS、安全存储 |
| 开放重定向 | 严格的重定向URI验证 |
| 代码注入 | PKCE、短寿命代码 |
| 重放 | ID令牌中的nonce |
提供者特定设置
谷歌
GOOGLE_CONFIG = {
'client_id': 'xxx.apps.googleusercontent.com',
'client_secret': 'xxx',
'authorization_endpoint': 'https://accounts.google.com/o/oauth2/v2/auth',
'token_endpoint': 'https://oauth2.googleapis.com/token',
'userinfo_endpoint': 'https://openidconnect.googleapis.com/v1/userinfo',
'scopes': ['openid', 'email', 'profile']
}
GitHub(OAuth 2.0,非OIDC)
GITHUB_CONFIG = {
'client_id': 'xxx',
'client_secret': 'xxx',
'authorization_endpoint': 'https://github.com/login/oauth/authorize',
'token_endpoint': 'https://github.com/login/oauth/access_token',
'userinfo_endpoint': 'https://api.github.com/user',
'scopes': ['read:user', 'user:email']
}
参考资料
references/oauth-security.md- 安全最佳实践和威胁references/provider-configs.md- 常见提供者配置references/token-patterns.md- 令牌存储和刷新模式