零信任架构
概览
基于“永不信任,始终验证”的原则实施全面的零信任安全架构,以身份为中心的安全、微分割和持续验证为核心。
何时使用
- 云原生应用
- 微服务架构
- 远程劳动力安全
- API安全
- 多云部署
- 遗留现代化
- 合规要求
实施示例
1. 零信任网关
// zero-trust-gateway.js
const jwt = require('jsonwebtoken');
const axios = require('axios');
class ZeroTrustGateway {
constructor() {
this.identityProvider = process.env.IDENTITY_PROVIDER_URL;
this.deviceRegistry = new Map();
this.sessionContext = new Map();
}
/**
* 验证身份 - 你是谁?
*/
async verifyIdentity(token) {
try {
// 验证JWT令牌
const decoded = jwt.verify(token, process.env.JWT_PUBLIC_KEY, {
algorithms: ['RS256']
});
// 检查令牌是否已被吊销
const revoked = await this.checkTokenRevocation(decoded.jti);
if (revoked) {
throw new Error('令牌已被吊销');
}
return {
valid: true,
userId: decoded.sub,
roles: decoded.roles,
permissions: decoded.permissions
};
} catch (error) {
return { valid: false, error: error.message };
}
}
/**
* 验证设备 - 你使用的是什么设备?
*/
async verifyDevice(deviceId, deviceFingerprint) {
const registered = this.deviceRegistry.get(deviceId);
if (!registered) {
return {
trusted: false,
reason: '设备未注册'
};
}
// 检查设备指纹是否匹配
if (registered.fingerprint !== deviceFingerprint) {
return {
trusted: false,
reason: '设备指纹不匹配'
};
}
// 检查设备合规性
const compliance = await this.checkDeviceCompliance(deviceId);
return {
trusted: compliance.compliant,
reason: compliance.reason,
riskScore: compliance.riskScore
};
}
/**
* 验证位置 - 你在哪里?
*/
async verifyLocation(ip, expectedCountry) {
try {
// 获取地理位置数据
const geoData = await this.getGeoLocation(ip);
// 检查不可能的旅行
const lastLocation = this.getLastKnownLocation(ip);
if (lastLocation) {
const impossibleTravel = this.detectImpossibleTravel(
lastLocation,
geoData,
Date.now() - lastLocation.timestamp
);
if (impossibleTravel) {
return {
valid: false,
reason: '检测到不可能的旅行',
riskScore: 9
};
}
}
// 检查允许的位置
if (expectedCountry && geoData.country !== expectedCountry) {
return {
valid: false,
reason: '意外的位置',
riskScore: 7
};
}
return {
valid: true,
location: geoData,
riskScore: 1
};
} catch (error) {
return {
valid: false,
reason: '位置验证失败',
riskScore: 5
};
}
}
/**
* 验证授权 - 你能访问什么?
*/
async verifyAuthorization(userId, resource, action, context) {
// 获取用户权限
const user = await this.getUserPermissions(userId);
// 检查直接权限
if (this.hasPermission(user, resource, action)) {
return { authorized: true, reason: '直接权限' };
}
// 检查基于角色的权限
for (const role of user.roles) {
if (this.hasRolePermission(role, resource, action)) {
return { authorized: true, reason: `角色: ${role}` };
}
}
// 检查基于属性的策略
const abacResult = await this.evaluateABAC(user, resource, action, context);
if (abacResult.allowed) {
return { authorized: true, reason: 'ABAC策略' };
}
return {
authorized: false,
reason: '权限不足'
};
}
/**
* 计算风险评分
*/
calculateRiskScore(factors) {
let score = 0;
// 身份因素
if (!factors.mfaUsed) score += 3;
if (factors.newDevice) score += 2;
// 位置因素
if (factors.unusualLocation) score += 3;
if (factors.vpnDetected) score += 1;
// 行为因素
if (factors.unusualTime) score += 2;
if (factors.rapidRequests) score += 2;
// 设备因素
if (!factors.deviceCompliant) score += 4;
if (factors.jailbroken) score += 5;
return Math.min(score, 10);
}
/**
* 持续验证中间件
*/
middleware() {
return async (req, res, next) => {
const startTime = Date.now();
try {
// 提取认证令牌
const token = req.headers.authorization?.replace('Bearer ', '');
if (!token) {
return res.status(401).json({
error: 'unauthorized',
message: '未提供认证令牌'
});
}
// 第1步:验证身份
const identity = await this.verifyIdentity(token);
if (!identity.valid) {
return res.status(401).json({
error: 'unauthorized',
message: '无效的身份'
});
}
// 第2步:验证设备
const deviceId = req.headers['x-device-id'];
const deviceFingerprint = req.headers['x-device-fingerprint'];
if (deviceId && deviceFingerprint) {
const device = await this.verifyDevice(deviceId, deviceFingerprint);
if (!device.trusted) {
return res.status(403).json({
error: 'forbidden',
message: device.reason
});
}
}
// 第3步:验证位置
const location = await this.verifyLocation(req.ip);
if (!location.valid) {
// 需要加强认证
return res.status(403).json({
error: 'forbidden',
message: '需要额外认证',
requiresStepUp: true
});
}
// 第4步:计算风险评分
const riskScore = this.calculateRiskScore({
mfaUsed: identity.mfaUsed,
newDevice: !deviceId,
unusualLocation: location.riskScore > 5,
deviceCompliant: true
});
// 第5步:验证授权
const authorization = await this.verifyAuthorization(
identity.userId,
req.path,
req.method,
{
ip: req.ip,
riskScore,
time: new Date()
}
);
if (!authorization.authorized) {
return res.status(403).json({
error: 'forbidden',
message: authorization.reason
});
}
// 添加上下文到请求
req.zeroTrust = {
userId: identity.userId,
roles: identity.roles,
riskScore,
verificationTime: Date.now() - startTime
};
// 记录访问
this.logAccess(req, identity, riskScore);
next();
} catch (error) {
console.error('零信任验证失败:', error);
return res.status(500).json({
error: 'internal_error',
message: '安全验证失败'
});
}
};
}
async checkTokenRevocation(jti) {
// 检查吊销列表
return false;
}
async checkDeviceCompliance(deviceId) {
// 检查设备是否符合安全要求
return {
compliant: true,
reason: '设备符合要求',
riskScore: 1
};
}
async getGeoLocation(ip) {
// 从IP获取地理位置
return {
country: 'US',
city: 'San Francisco',
lat: 37.7749,
lon: -122.4194
};
}
getLastKnownLocation(ip) {
return null;
}
detectImpossibleTravel(lastLocation, currentLocation, timeDiff) {
// 计算旅行是否可能
return false;
}
async getUserPermissions(userId) {
// 获取用户权限
return {
roles: ['user'],
permissions: []
};
}
hasPermission(user, resource, action) {
return false;
}
hasRolePermission(role, resource, action) {
return false;
}
async evaluateABAC(user, resource, action, context) {
return { allowed: false };
}
logAccess(req, identity, riskScore) {
console.log({
timestamp: new Date().toISOString(),
userId: identity.userId,
resource: req.path,
method: req.method,
riskScore,
ip: req.ip
});
}
}
// Express设置
const express = require('express');
const app = express();
const ztGateway = new ZeroTrustGateway();
// 应用零信任中间件
app.use(ztGateway.middleware());
// 受保护的端点
app.get('/api/sensitive-data', (req, res) => {
res.json({
message: '访问授权',
riskScore: req.zeroTrust.riskScore
});
});
module.exports = ZeroTrustGateway;
2. 服务网格 - 微分割
# istio-zero-trust.yaml
# Istio配置零信任微分割
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: default
namespace: production
spec:
mtls:
mode: STRICT # 要求相互TLS
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: deny-all
namespace: production
spec:
{} # 默认拒绝所有
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: allow-frontend-to-backend
namespace: production
spec:
selector:
matchLabels:
app: backend
action: ALLOW
rules:
- from:
- source:
principals: ["cluster.local/ns/production/sa/frontend"]
to:
- operation:
methods: ["GET", "POST"]
paths: ["/api/*"]
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: allow-backend-to-database
namespace: production
spec:
selector:
matchLabels:
app: database
action: ALLOW
rules:
- from:
- source:
principals: ["cluster.local/ns/production/sa/backend"]
to:
- operation:
methods: ["*"]
---
# JWT认证
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
name: jwt-auth
namespace: production
spec:
selector:
matchLabels:
app: backend
jwtRules:
- issuer: "https://auth.example.com"
jwksUri: "https://auth.example.com/.well-known/jwks.json"
audiences:
- "api.example.com"
---
# 网络策略 - 额外的防御层
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: backend-network-policy
namespace: production
spec:
podSelector:
matchLabels:
app: backend
policyTypes:
- Ingress
- Egress
ingress:
- from:
- podSelector:
matchLabels:
app: frontend
ports:
- protocol: TCP
port: 8080
egress:
- to:
- podSelector:
matchLabels:
app: database
ports:
- protocol: TCP
port: 5432
3. Python零信任策略引擎
# zero_trust_policy.py
from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import datetime
import jwt
@dataclass
class ZeroTrustContext:
user_id: str
device_id: str
location: Dict[str, Any]
risk_score: int
timestamp: datetime
class ZeroTrustPolicy:
def __init__(self):
self.policies = self.load_policies()
def load_policies(self) -> List[Dict]:
"""加载零信任策略"""
return [
{
'name': 'high_risk_block',
'condition': lambda ctx: ctx.risk_score >= 8,
'action': 'deny',
'reason': '高风险评分'
},
{
'name': 'require_mfa',
'condition': lambda ctx: ctx.risk_score >= 5,
'action': 'step_up_auth',
'reason': '提高风险需要MFA'
},
{
'name': 'untrusted_device',
'condition': lambda ctx: not self.is_device_trusted(ctx.device_id),
'action': 'deny',
'reason': '不信任的设备'
},
{
'name': 'unusual_location',
'condition': lambda ctx: self.is_unusual_location(ctx),
'action': 'step_up_auth',
'reason': '检测到不寻常的位置'
}
]
def evaluate(self, context: ZeroTrustContext, resource: str, action: str) -> Dict:
"""评估零信任策略"""
for policy in self.policies:
if policy['condition'](context):
return {
'allowed': policy['action'] != 'deny',
'action': policy['action'],
'reason': policy['reason'],
'policy': policy['name']
}
# 检查资源特定权限
if self.has_permission(context.user_id, resource, action):
return {
'allowed': True,
'action': 'allow',
'reason': '用户具有所需权限'
}
return {
'allowed': False,
'action': 'deny',
'reason': '没有匹配的策略允许访问'
}
def is_device_trusted(self, device_id: str) -> bool:
# 检查设备信任状态
return True
def is_unusual_location(self, context: ZeroTrustContext) -> bool:
# 检查位置是否不寻常
return False
def has_permission(self, user_id: str, resource: str, action: str) -> bool:
# 检查用户权限
return False
# Flask集成
from flask import Flask, request, jsonify, g
from functools import wraps
app = Flask(__name__)
zt_policy = ZeroTrustPolicy()
def zero_trust_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 构建零信任上下文
context = ZeroTrustContext(
user_id=g.user_id,
device_id=request.headers.get('X-Device-ID', 'unknown'),
location={
'ip': request.remote_addr,
'country': 'US' # 来自GeoIP
},
risk_score=calculate_risk_score(request),
timestamp=datetime.utcnow()
)
# 评估策略
result = zt_policy.evaluate(
context,
request.path,
request.method
)
if not result['allowed']:
return jsonify({
'error': 'forbidden',
'reason': result['reason'],
'action_required': result['action']
}), 403
return f(*args, **kwargs)
return decorated_function
def calculate_risk_score(request) -> int:
"""基于请求上下文计算风险评分"""
score = 0
# 检查可疑模式
if not request.headers.get('X-Device-ID'):
score += 2
# 添加更多风险因素
return min(score, 10)
@app.route('/api/sensitive', methods=['GET'])
@zero_trust_required
def get_sensitive_data():
return jsonify({'data': '敏感信息'})
零信任原则
- 明确验证:始终认证和授权
- 最小权限:所需的最小访问权限
- 假设泄露:限制破坏范围
- 微分割:网络分割
- 持续监控:实时安全
- 设备信任:验证设备合规性
- 数据保护:加密一切
实施层
- 身份层:强认证(MFA,SSO)
- 设备层:设备合规性,证书
- 网络层:微分割,mTLS
- 应用层:API网关,授权
- 数据层:加密,DLP
零信任成熟度模型
- 传统:基于边界的安全
- 高级:一些分割
- 最优:完整的零信任
- 进步:持续改进
最佳实践
✅ DO
- 验证每个请求
- 到处实施MFA
- 使用微分割
- 持续监控
- 加密所有通信
- 实施最小权限
- 记录所有访问
- 定期审计
❌ DON’T
- 信任网络位置
- 使用隐式信任
- 跳过设备验证
- 允许侧向移动
- 使用静态凭据