ZeroTrustArchitecture zero-trust-architecture

零信任架构是一种基于“永不信任,始终验证”原则的安全模型,用于构建安全的云原生应用。

零信任架构 0 次安装 0 次浏览 更新于 3/4/2026

零信任架构

概览

基于“永不信任,始终验证”的原则实施全面的零信任安全架构,以身份为中心的安全、微分割和持续验证为核心。

何时使用

  • 云原生应用
  • 微服务架构
  • 远程劳动力安全
  • API安全
  • 多云部署
  • 遗留现代化
  • 合规要求

实施示例

1. 零信任网关

// zero-trust-gateway.js
const jwt = require('jsonwebtoken');
const axios = require('axios');

class ZeroTrustGateway {
  constructor() {
    this.identityProvider = process.env.IDENTITY_PROVIDER_URL;
    this.deviceRegistry = new Map();
    this.sessionContext = new Map();
  }

  /**
   * 验证身份 - 你是谁?
   */
  async verifyIdentity(token) {
    try {
      // 验证JWT令牌
      const decoded = jwt.verify(token, process.env.JWT_PUBLIC_KEY, {
        algorithms: ['RS256']
      });

      // 检查令牌是否已被吊销
      const revoked = await this.checkTokenRevocation(decoded.jti);
      if (revoked) {
        throw new Error('令牌已被吊销');
      }

      return {
        valid: true,
        userId: decoded.sub,
        roles: decoded.roles,
        permissions: decoded.permissions
      };
    } catch (error) {
      return { valid: false, error: error.message };
    }
  }

  /**
   * 验证设备 - 你使用的是什么设备?
   */
  async verifyDevice(deviceId, deviceFingerprint) {
    const registered = this.deviceRegistry.get(deviceId);

    if (!registered) {
      return {
        trusted: false,
        reason: '设备未注册'
      };
    }

    // 检查设备指纹是否匹配
    if (registered.fingerprint !== deviceFingerprint) {
      return {
        trusted: false,
        reason: '设备指纹不匹配'
      };
    }

    // 检查设备合规性
    const compliance = await this.checkDeviceCompliance(deviceId);

    return {
      trusted: compliance.compliant,
      reason: compliance.reason,
      riskScore: compliance.riskScore
    };
  }

  /**
   * 验证位置 - 你在哪里?
   */
  async verifyLocation(ip, expectedCountry) {
    try {
      // 获取地理位置数据
      const geoData = await this.getGeoLocation(ip);

      // 检查不可能的旅行
      const lastLocation = this.getLastKnownLocation(ip);
      if (lastLocation) {
        const impossibleTravel = this.detectImpossibleTravel(
          lastLocation,
          geoData,
          Date.now() - lastLocation.timestamp
        );

        if (impossibleTravel) {
          return {
            valid: false,
            reason: '检测到不可能的旅行',
            riskScore: 9
          };
        }
      }

      // 检查允许的位置
      if (expectedCountry && geoData.country !== expectedCountry) {
        return {
          valid: false,
          reason: '意外的位置',
          riskScore: 7
        };
      }

      return {
        valid: true,
        location: geoData,
        riskScore: 1
      };
    } catch (error) {
      return {
        valid: false,
        reason: '位置验证失败',
        riskScore: 5
      };
    }
  }

  /**
   * 验证授权 - 你能访问什么?
   */
  async verifyAuthorization(userId, resource, action, context) {
    // 获取用户权限
    const user = await this.getUserPermissions(userId);

    // 检查直接权限
    if (this.hasPermission(user, resource, action)) {
      return { authorized: true, reason: '直接权限' };
    }

    // 检查基于角色的权限
    for (const role of user.roles) {
      if (this.hasRolePermission(role, resource, action)) {
        return { authorized: true, reason: `角色: ${role}` };
      }
    }

    // 检查基于属性的策略
    const abacResult = await this.evaluateABAC(user, resource, action, context);
    if (abacResult.allowed) {
      return { authorized: true, reason: 'ABAC策略' };
    }

    return {
      authorized: false,
      reason: '权限不足'
    };
  }

  /**
   * 计算风险评分
   */
  calculateRiskScore(factors) {
    let score = 0;

    // 身份因素
    if (!factors.mfaUsed) score += 3;
    if (factors.newDevice) score += 2;

    // 位置因素
    if (factors.unusualLocation) score += 3;
    if (factors.vpnDetected) score += 1;

    // 行为因素
    if (factors.unusualTime) score += 2;
    if (factors.rapidRequests) score += 2;

    // 设备因素
    if (!factors.deviceCompliant) score += 4;
    if (factors.jailbroken) score += 5;

    return Math.min(score, 10);
  }

  /**
   * 持续验证中间件
   */
  middleware() {
    return async (req, res, next) => {
      const startTime = Date.now();

      try {
        // 提取认证令牌
        const token = req.headers.authorization?.replace('Bearer ', '');
        if (!token) {
          return res.status(401).json({
            error: 'unauthorized',
            message: '未提供认证令牌'
          });
        }

        // 第1步:验证身份
        const identity = await this.verifyIdentity(token);
        if (!identity.valid) {
          return res.status(401).json({
            error: 'unauthorized',
            message: '无效的身份'
          });
        }

        // 第2步:验证设备
        const deviceId = req.headers['x-device-id'];
        const deviceFingerprint = req.headers['x-device-fingerprint'];

        if (deviceId && deviceFingerprint) {
          const device = await this.verifyDevice(deviceId, deviceFingerprint);
          if (!device.trusted) {
            return res.status(403).json({
              error: 'forbidden',
              message: device.reason
            });
          }
        }

        // 第3步:验证位置
        const location = await this.verifyLocation(req.ip);
        if (!location.valid) {
          // 需要加强认证
          return res.status(403).json({
            error: 'forbidden',
            message: '需要额外认证',
            requiresStepUp: true
          });
        }

        // 第4步:计算风险评分
        const riskScore = this.calculateRiskScore({
          mfaUsed: identity.mfaUsed,
          newDevice: !deviceId,
          unusualLocation: location.riskScore > 5,
          deviceCompliant: true
        });

        // 第5步:验证授权
        const authorization = await this.verifyAuthorization(
          identity.userId,
          req.path,
          req.method,
          {
            ip: req.ip,
            riskScore,
            time: new Date()
          }
        );

        if (!authorization.authorized) {
          return res.status(403).json({
            error: 'forbidden',
            message: authorization.reason
          });
        }

        // 添加上下文到请求
        req.zeroTrust = {
          userId: identity.userId,
          roles: identity.roles,
          riskScore,
          verificationTime: Date.now() - startTime
        };

        // 记录访问
        this.logAccess(req, identity, riskScore);

        next();
      } catch (error) {
        console.error('零信任验证失败:', error);
        return res.status(500).json({
          error: 'internal_error',
          message: '安全验证失败'
        });
      }
    };
  }

  async checkTokenRevocation(jti) {
    // 检查吊销列表
    return false;
  }

  async checkDeviceCompliance(deviceId) {
    // 检查设备是否符合安全要求
    return {
      compliant: true,
      reason: '设备符合要求',
      riskScore: 1
    };
  }

  async getGeoLocation(ip) {
    // 从IP获取地理位置
    return {
      country: 'US',
      city: 'San Francisco',
      lat: 37.7749,
      lon: -122.4194
    };
  }

  getLastKnownLocation(ip) {
    return null;
  }

  detectImpossibleTravel(lastLocation, currentLocation, timeDiff) {
    // 计算旅行是否可能
    return false;
  }

  async getUserPermissions(userId) {
    // 获取用户权限
    return {
      roles: ['user'],
      permissions: []
    };
  }

  hasPermission(user, resource, action) {
    return false;
  }

  hasRolePermission(role, resource, action) {
    return false;
  }

  async evaluateABAC(user, resource, action, context) {
    return { allowed: false };
  }

  logAccess(req, identity, riskScore) {
    console.log({
      timestamp: new Date().toISOString(),
      userId: identity.userId,
      resource: req.path,
      method: req.method,
      riskScore,
      ip: req.ip
    });
  }
}

// Express设置
const express = require('express');
const app = express();

const ztGateway = new ZeroTrustGateway();

// 应用零信任中间件
app.use(ztGateway.middleware());

// 受保护的端点
app.get('/api/sensitive-data', (req, res) => {
  res.json({
    message: '访问授权',
    riskScore: req.zeroTrust.riskScore
  });
});

module.exports = ZeroTrustGateway;

2. 服务网格 - 微分割

# istio-zero-trust.yaml
# Istio配置零信任微分割

apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: production
spec:
  mtls:
    mode: STRICT  # 要求相互TLS

---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: deny-all
  namespace: production
spec:
  {} # 默认拒绝所有

---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: allow-frontend-to-backend
  namespace: production
spec:
  selector:
    matchLabels:
      app: backend
  action: ALLOW
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/production/sa/frontend"]
    to:
    - operation:
        methods: ["GET", "POST"]
        paths: ["/api/*"]

---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: allow-backend-to-database
  namespace: production
spec:
  selector:
    matchLabels:
      app: database
  action: ALLOW
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/production/sa/backend"]
    to:
    - operation:
        methods: ["*"]

---
# JWT认证
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
  name: jwt-auth
  namespace: production
spec:
  selector:
    matchLabels:
      app: backend
  jwtRules:
  - issuer: "https://auth.example.com"
    jwksUri: "https://auth.example.com/.well-known/jwks.json"
    audiences:
    - "api.example.com"

---
# 网络策略 - 额外的防御层
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: backend-network-policy
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: backend
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: frontend
    ports:
    - protocol: TCP
      port: 8080
  egress:
  - to:
    - podSelector:
        matchLabels:
          app: database
    ports:
    - protocol: TCP
      port: 5432

3. Python零信任策略引擎

# zero_trust_policy.py
from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import datetime
import jwt

@dataclass
class ZeroTrustContext:
    user_id: str
    device_id: str
    location: Dict[str, Any]
    risk_score: int
    timestamp: datetime

class ZeroTrustPolicy:
    def __init__(self):
        self.policies = self.load_policies()

    def load_policies(self) -> List[Dict]:
        """加载零信任策略"""
        return [
            {
                'name': 'high_risk_block',
                'condition': lambda ctx: ctx.risk_score >= 8,
                'action': 'deny',
                'reason': '高风险评分'
            },
            {
                'name': 'require_mfa',
                'condition': lambda ctx: ctx.risk_score >= 5,
                'action': 'step_up_auth',
                'reason': '提高风险需要MFA'
            },
            {
                'name': 'untrusted_device',
                'condition': lambda ctx: not self.is_device_trusted(ctx.device_id),
                'action': 'deny',
                'reason': '不信任的设备'
            },
            {
                'name': 'unusual_location',
                'condition': lambda ctx: self.is_unusual_location(ctx),
                'action': 'step_up_auth',
                'reason': '检测到不寻常的位置'
            }
        ]

    def evaluate(self, context: ZeroTrustContext, resource: str, action: str) -> Dict:
        """评估零信任策略"""
        for policy in self.policies:
            if policy['condition'](context):
                return {
                    'allowed': policy['action'] != 'deny',
                    'action': policy['action'],
                    'reason': policy['reason'],
                    'policy': policy['name']
                }

        # 检查资源特定权限
        if self.has_permission(context.user_id, resource, action):
            return {
                'allowed': True,
                'action': 'allow',
                'reason': '用户具有所需权限'
            }

        return {
            'allowed': False,
            'action': 'deny',
            'reason': '没有匹配的策略允许访问'
        }

    def is_device_trusted(self, device_id: str) -> bool:
        # 检查设备信任状态
        return True

    def is_unusual_location(self, context: ZeroTrustContext) -> bool:
        # 检查位置是否不寻常
        return False

    def has_permission(self, user_id: str, resource: str, action: str) -> bool:
        # 检查用户权限
        return False

# Flask集成
from flask import Flask, request, jsonify, g
from functools import wraps

app = Flask(__name__)
zt_policy = ZeroTrustPolicy()

def zero_trust_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # 构建零信任上下文
        context = ZeroTrustContext(
            user_id=g.user_id,
            device_id=request.headers.get('X-Device-ID', 'unknown'),
            location={
                'ip': request.remote_addr,
                'country': 'US'  # 来自GeoIP
            },
            risk_score=calculate_risk_score(request),
            timestamp=datetime.utcnow()
        )

        # 评估策略
        result = zt_policy.evaluate(
            context,
            request.path,
            request.method
        )

        if not result['allowed']:
            return jsonify({
                'error': 'forbidden',
                'reason': result['reason'],
                'action_required': result['action']
            }), 403

        return f(*args, **kwargs)

    return decorated_function

def calculate_risk_score(request) -> int:
    """基于请求上下文计算风险评分"""
    score = 0

    # 检查可疑模式
    if not request.headers.get('X-Device-ID'):
        score += 2

    # 添加更多风险因素
    return min(score, 10)

@app.route('/api/sensitive', methods=['GET'])
@zero_trust_required
def get_sensitive_data():
    return jsonify({'data': '敏感信息'})

零信任原则

  1. 明确验证:始终认证和授权
  2. 最小权限:所需的最小访问权限
  3. 假设泄露:限制破坏范围
  4. 微分割:网络分割
  5. 持续监控:实时安全
  6. 设备信任:验证设备合规性
  7. 数据保护:加密一切

实施层

  • 身份层:强认证(MFA,SSO)
  • 设备层:设备合规性,证书
  • 网络层:微分割,mTLS
  • 应用层:API网关,授权
  • 数据层:加密,DLP

零信任成熟度模型

  1. 传统:基于边界的安全
  2. 高级:一些分割
  3. 最优:完整的零信任
  4. 进步:持续改进

最佳实践

✅ DO

  • 验证每个请求
  • 到处实施MFA
  • 使用微分割
  • 持续监控
  • 加密所有通信
  • 实施最小权限
  • 记录所有访问
  • 定期审计

❌ DON’T

  • 信任网络位置
  • 使用隐式信任
  • 跳过设备验证
  • 允许侧向移动
  • 使用静态凭据

资源