SecretsRotation secrets-rotation

自动化秘密轮换策略,用于API密钥、凭证、证书和加密密钥,确保零停机部署和全面的审计日志记录。

云安全 0 次安装 0 次浏览 更新于 3/4/2026

name: secrets-rotation
description: 实现自动化的秘密轮换策略,适用于API密钥、凭证、证书和加密密钥,并确保零停机部署和全面的审计日志记录。

秘密轮换

概览

实现自动化的秘密轮换策略,适用于凭证、API密钥、证书和加密密钥,并确保零停机部署和全面的审计日志记录。

何时使用

  • API密钥管理
  • 数据库凭证
  • TLS/SSL证书
  • 加密密钥轮换
  • 合规要求
  • 安全事件响应
  • 服务账户管理

实施示例

1. Node.js 秘密管理器与轮换

// secrets-manager.js
const AWS = require('aws-sdk');
const crypto = require('crypto');

/**
 * Generates secret values and rotates secrets stored in AWS Secrets Manager
 * (API keys, database credentials, TLS certificates).
 *
 * Uses the module-level `AWS` (aws-sdk, `.promise()` call style) and `crypto`
 * bindings of this file.
 */
class SecretsManager {
  constructor() {
    this.secretsManager = new AWS.SecretsManager({
      region: process.env.AWS_REGION
    });

    // Secret name -> handle of the currently armed local rotation timer.
    // The handle can be passed to clearTimeout()/clearInterval() to cancel.
    this.rotationSchedule = new Map();
  }

  /**
   * Generate a new random secret value.
   *
   * @param {string} [type='api_key'] - 'api_key' (hex), 'password'
   *   (characters from a fixed alphabet), 'jwt_secret' (64 random bytes,
   *   base64); any other value yields `length` random bytes as base64.
   * @param {number} [length=32] - Byte count (api_key/default) or character
   *   count (password). Ignored for 'jwt_secret'.
   * @returns {string} The generated secret.
   */
  generateSecret(type = 'api_key', length = 32) {
    switch (type) {
      case 'api_key':
        return crypto.randomBytes(length).toString('hex');

      case 'password': {
        // Braces give the lexical declarations their own scope instead of
        // leaking them into the other case clauses.
        const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*';
        let password = '';
        for (let i = 0; i < length; i++) {
          password += chars.charAt(crypto.randomInt(chars.length));
        }
        return password;
      }

      case 'jwt_secret':
        return crypto.randomBytes(64).toString('base64');

      default:
        return crypto.randomBytes(length).toString('base64');
    }
  }

  /**
   * Store a secret in AWS Secrets Manager; if the name already exists the
   * existing secret's value is updated instead.
   *
   * @param {string} name - Secret name.
   * @param {*} value - JSON-serializable secret value.
   * @param {string} [description=''] - Description used when creating.
   * @returns {Promise<object>} The create (or update) response.
   * @throws Any error other than `ResourceExistsException` is rethrown.
   */
  async createSecret(name, value, description = '') {
    const params = {
      Name: name,
      SecretString: JSON.stringify(value),
      Description: description
    };

    try {
      const result = await this.secretsManager.createSecret(params).promise();
      return result;
    } catch (error) {
      if (error.code === 'ResourceExistsException') {
        // The secret already exists: update its value instead.
        return this.updateSecret(name, value);
      }
      throw error;
    }
  }

  /**
   * Retrieve and decode a secret.
   *
   * @param {string} name - Secret id.
   * @returns {Promise<*>} The parsed JSON value for string secrets, or the
   *   binary payload decoded as an ASCII string.
   * @throws Logs and rethrows any retrieval or parse error.
   */
  async getSecret(name) {
    const params = { SecretId: name };

    try {
      const data = await this.secretsManager.getSecretValue(params).promise();

      if ('SecretString' in data) {
        return JSON.parse(data.SecretString);
      }

      // Binary secret.
      // NOTE(review): 'ascii' decoding drops the high bit of every byte, so
      // non-ASCII payloads are not round-tripped — confirm this is intended.
      const buff = Buffer.from(data.SecretBinary, 'base64');
      return buff.toString('ascii');
    } catch (error) {
      console.error(`Error retrieving secret ${name}:`, error);
      throw error;
    }
  }

  /**
   * Overwrite the value of an existing secret.
   *
   * @param {string} name - Secret id.
   * @param {*} value - JSON-serializable secret value.
   * @returns {Promise<object>} The update response.
   */
  async updateSecret(name, value) {
    const params = {
      SecretId: name,
      SecretString: JSON.stringify(value)
    };

    return this.secretsManager.updateSecret(params).promise();
  }

  /**
   * Rotate a secret while keeping the previous value available for a grace
   * period.
   *
   * Steps: generate a new value, store `{ current, previous, rotatedAt }`,
   * wait for consumers to pick it up, verify it, then (after the grace
   * period) drop `previous`. On failure the stored value read at the start
   * is handed to `rollbackRotation` and the original error is rethrown.
   *
   * @param {string} name - Secret id.
   * @param {string} [type='api_key'] - Secret type for `generateSecret`.
   * @param {object} [options]
   * @param {number} [options.propagationMs=5000] - Wait before verification.
   * @param {number} [options.gracePeriodMs=300000] - Delay before the
   *   previous value is removed (5 minutes by default).
   * @returns {Promise<{success: boolean, secretName: string, rotatedAt: string}>}
   * @throws {Error} 'Secret verification failed' or any storage error.
   */
  async rotateSecret(name, type = 'api_key', options = {}) {
    const { propagationMs = 5000, gracePeriodMs = 300000 } = options;

    console.log(`Starting rotation for secret: ${name}`);

    // Value as stored before this rotation touched it; stays undefined when
    // the initial read fails, in which case there is nothing to restore.
    let storedBeforeRotation;

    try {
      // Step 1: generate the new secret.
      const newValue = this.generateSecret(type);

      // Step 2: store the new version next to the old one.
      const currentSecret = await this.getSecret(name);
      storedBeforeRotation = currentSecret;

      // Keep the old value temporarily so consumers can transition smoothly.
      const secretWithRotation = {
        current: newValue,
        previous: currentSecret.current || currentSecret,
        rotatedAt: new Date().toISOString()
      };

      await this.updateSecret(name, secretWithRotation);

      console.log(`New secret version created for: ${name}`);

      // Step 3: give applications time to pick up the new secret.
      await this.waitForPropagation(propagationMs);

      // Step 4: verify that the new secret works.
      const verificationPassed = await this.verifySecret(name, newValue);

      if (!verificationPassed) {
        throw new Error('Secret verification failed');
      }

      // Step 5: remove the old value once the grace period has elapsed.
      // The cleanup runs detached from this call, so its failures are
      // handled here instead of surfacing as unhandled rejections.
      setTimeout(() => {
        this.finalizeRotation(name, newValue).catch((error) => {
          console.error(`Failed to finalize rotation for ${name}:`, error);
          this.sendAlert(name, error);
        });
      }, gracePeriodMs);

      return {
        success: true,
        secretName: name,
        rotatedAt: new Date().toISOString()
      };
    } catch (error) {
      console.error(`Rotation failed for ${name}:`, error);

      // Roll back on failure; a rollback error must not hide the root cause.
      try {
        await this.rollbackRotation(name, storedBeforeRotation);
      } catch (rollbackError) {
        console.error(`Rollback failed for ${name}:`, rollbackError);
      }

      throw error;
    }
  }

  /**
   * Drop the `previous` value of a rotated secret after its grace period.
   * Does nothing when the stored `current` value is no longer `newValue`,
   * so a newer rotation is never overwritten by a stale cleanup.
   *
   * @param {string} name - Secret id.
   * @param {string} newValue - The value installed by the rotation being finalized.
   * @returns {Promise<boolean>} True when the secret was rewritten.
   */
  async finalizeRotation(name, newValue) {
    const latest = await this.getSecret(name);

    if (!latest || latest.current !== newValue) {
      console.log(`Skipping cleanup for ${name}: secret changed during the grace period`);
      return false;
    }

    await this.updateSecret(name, {
      current: newValue,
      rotatedAt: new Date().toISOString()
    });
    console.log(`Rotation completed for: ${name}`);
    return true;
  }

  /**
   * Schedule recurring rotation: a local in-process timer plus AWS Secrets
   * Manager rotation rules (Lambda ARN read from `ROTATION_LAMBDA_ARN`).
   *
   * Scheduling a name again replaces its previous local timer.
   *
   * @param {string} name - Secret id.
   * @param {number} [intervalDays=90] - Days between rotations; must be > 0.
   * @returns {Promise<void>}
   * @throws {RangeError} When `intervalDays` is not a positive finite number.
   */
  async scheduleRotation(name, intervalDays = 90) {
    if (!Number.isFinite(intervalDays) || intervalDays <= 0) {
      throw new RangeError(`intervalDays must be a positive number, got: ${intervalDays}`);
    }

    const intervalMs = intervalDays * 24 * 60 * 60 * 1000;

    // Replace (rather than leak) a timer left over from an earlier call.
    const existingTimer = this.rotationSchedule.get(name);
    if (existingTimer !== undefined) {
      clearTimeout(existingTimer);
    }

    this.armRotationTimer(name, intervalMs, intervalMs);

    // AWS Secrets Manager automatic rotation.
    const params = {
      SecretId: name,
      RotationLambdaARN: process.env.ROTATION_LAMBDA_ARN,
      RotationRules: {
        AutomaticallyAfterDays: intervalDays
      }
    };

    await this.secretsManager.rotateSecret(params).promise();
  }

  /**
   * Arm one leg of the local rotation timer and record its handle in
   * `rotationSchedule`.
   *
   * Node timers cannot wait longer than 2^31-1 ms (about 24.8 days); a
   * larger delay is replaced by 1 ms. Long intervals are therefore split
   * into chained timeouts that each stay below the limit.
   *
   * @param {string} name - Secret id.
   * @param {number} remainingMs - Time left until the next rotation.
   * @param {number} intervalMs - Full interval between rotations.
   */
  armRotationTimer(name, remainingMs, intervalMs) {
    const MAX_TIMER_DELAY_MS = 2147483647;
    const delayMs = Math.min(remainingMs, MAX_TIMER_DELAY_MS);

    const timer = setTimeout(async () => {
      const stillRemainingMs = remainingMs - delayMs;

      if (stillRemainingMs > 0) {
        // Not due yet: wait out the rest of the interval.
        this.armRotationTimer(name, stillRemainingMs, intervalMs);
        return;
      }

      // Re-arm before rotating so the cadence does not drift by the
      // duration of the rotation itself.
      this.armRotationTimer(name, intervalMs, intervalMs);

      try {
        await this.rotateSecret(name);
        console.log(`Scheduled rotation completed for: ${name}`);
      } catch (error) {
        console.error(`Scheduled rotation failed for ${name}:`, error);
        // Alert the operations team.
        this.sendAlert(name, error);
      }
    }, delayMs);

    this.rotationSchedule.set(name, timer);
  }

  /**
   * Rotate database credentials: set a new password on the database user,
   * then store the new credentials under the same secret.
   *
   * @param {string} secretName - Secret holding `username`, `password`,
   *   `host` and `database`.
   * @returns {Promise<{success: boolean}>}
   * @throws {Error} When no connection is available, or on any query or
   *   storage failure. The connection is closed in every case.
   */
  async rotateDatabaseCredentials(secretName) {
    const credentials = await this.getSecret(secretName);

    // Generate the new password.
    const newPassword = this.generateSecret('password', 20);

    const connection = await this.connectToDatabase(credentials);
    if (!connection) {
      throw new Error(`No database connection available for secret: ${secretName}`);
    }

    try {
      // Update the database user's password.
      await connection.query(
        'ALTER USER ? IDENTIFIED BY ?',
        [credentials.username, newPassword]
      );

      // Store the new credentials.
      // NOTE(review): if this write fails the database already uses the new
      // password while the stored secret still holds the old one.
      await this.updateSecret(secretName, {
        username: credentials.username,
        password: newPassword,
        host: credentials.host,
        database: credentials.database,
        rotatedAt: new Date().toISOString()
      });
    } finally {
      // Always release the connection, including on failure.
      await connection.end();
    }

    return { success: true };
  }

  /**
   * Rotate the TLS certificate for a domain: request a new certificate,
   * store it as `tls-cert-<domain>`, then push it to the serving layer.
   *
   * @param {string} domain - Domain the certificate is issued for.
   * @returns {Promise<{success: boolean}>}
   * @throws Logs and rethrows any failure.
   */
  async rotateTLSCertificate(domain) {
    // Use Let's Encrypt or an internal CA.
    // NOTE(review): loaded lazily; confirm a `certbot` module exposing
    // `certonly` is available in the deployment.
    const certbot = require('certbot');

    try {
      // Request a new certificate.
      const newCert = await certbot.certonly({
        domains: [domain],
        email: process.env.ADMIN_EMAIL,
        agreeTos: true,
        renewByDefault: true
      });

      // Store it in the secrets manager.
      await this.createSecret(`tls-cert-${domain}`, {
        certificate: newCert.certificate,
        privateKey: newCert.privateKey,
        chain: newCert.chain,
        issuedAt: new Date().toISOString(),
        expiresAt: newCert.expiresAt
      });

      // Update the load balancer / web server.
      await this.updateServerCertificate(domain, newCert);

      console.log(`TLS certificate rotated for: ${domain}`);

      return { success: true };
    } catch (error) {
      console.error('Certificate rotation failed:', error);
      throw error;
    }
  }

  /**
   * Resolve after `ms` milliseconds.
   *
   * @param {number} ms - Delay in milliseconds.
   * @returns {Promise<void>}
   */
  async waitForPropagation(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Verification hook (placeholder: always reports success). Implement a
   * real check here, e.g. a test API call or database connection.
   *
   * @param {string} name - Secret id.
   * @param {string} value - The newly installed value.
   * @returns {Promise<boolean>}
   */
  async verifySecret(name, value) {
    return true;
  }

  /**
   * Restore the value a secret had before a failed rotation.
   *
   * @param {string} name - Secret id.
   * @param {*} [previousValue] - Stored value to write back; when omitted
   *   the rollback is only logged.
   * @returns {Promise<void>}
   */
  async rollbackRotation(name, previousValue) {
    console.log(`Rolling back rotation for: ${name}`);

    if (previousValue === undefined) {
      return;
    }

    await this.updateSecret(name, previousValue);
  }

  /**
   * Alert hook (placeholder: logs to stderr). Forward to the monitoring
   * system here.
   *
   * @param {string} secretName - Secret whose rotation failed.
   * @param {Error} error - The failure.
   * @returns {Promise<void>}
   */
  async sendAlert(secretName, error) {
    console.error(`ALERT: Rotation failed for ${secretName}`, error);
  }

  /**
   * Database connection hook (placeholder: returns null).
   *
   * @param {object} credentials - Credentials read from the secret.
   * @returns {Promise<object|null>} Expected to expose `query` and `end`.
   */
  async connectToDatabase(credentials) {
    return null;
  }

  /**
   * Server certificate deployment hook (placeholder: returns null).
   *
   * @param {string} domain - Domain being updated.
   * @param {object} cert - Certificate material to deploy.
   * @returns {Promise<null>}
   */
  async updateServerCertificate(domain, cert) {
    return null;
  }
}

// Usage
// Module-level SecretsManager instance; constructing it creates the AWS
// Secrets Manager client using the AWS_REGION environment variable.
const secretsManager = new SecretsManager();

/**
 * Rotate the API key stored as 'api-key-external-service'.
 *
 * @returns {Promise<void>}
 */
async function rotateAPIKey() {
  const secretName = 'api-key-external-service';
  const secretType = 'api_key';

  await secretsManager.rotateSecret(secretName, secretType);
}

/**
 * Register the recurring rotation schedules, one after the other:
 * database credentials every 90 days, API keys every 30 days.
 *
 * @returns {Promise<void>}
 */
async function setupRotationSchedule() {
  const schedules = [
    ['database-credentials', 90],
    ['api-keys', 30]
  ];

  // Sequential on purpose: the second schedule is only registered once the
  // first call has completed.
  for (const [secretName, intervalDays] of schedules) {
    await secretsManager.scheduleRotation(secretName, intervalDays);
  }
}

/**
 * Rotate the database credentials stored as 'rds-production'.
 *
 * @returns {Promise<void>}
 */
async function rotateDatabaseCreds() {
  const secretName = 'rds-production';

  await secretsManager.rotateDatabaseCredentials(secretName);
}

// Export the class itself (not the module-level instance created above).
module.exports = SecretsManager;

2. 使用 Vault 的 Python 秘密轮换

# secrets_rotation.py
import hvac
import secrets
import string
from datetime import datetime, timedelta
from typing import Dict, Any
import psycopg2
import boto3

class SecretsRotation:
    """Rotate secrets kept in a HashiCorp Vault KV v2 store.

    Also offers helpers for rotating a PostgreSQL password, enabling KMS key
    rotation, and listing the version history of a secret.
    """

    def __init__(self, vault_url: str, vault_token: str):
        """Create the Vault client and an AWS SSM client.

        Args:
            vault_url: Address of the Vault server.
            vault_token: Token used to authenticate against Vault.
        """
        self.vault_client = hvac.Client(url=vault_url, token=vault_token)
        self.ssm = boto3.client('ssm')

    def generate_secret(self, secret_type: str = 'api_key', length: int = 32) -> str:
        """Generate a new random secret value.

        Args:
            secret_type: 'api_key' (URL-safe token), 'password' (letters,
                digits and punctuation), 'jwt_secret' (URL-safe token from
                64 random bytes); anything else yields a hex string.
            length: Random byte count ('api_key' and the fallback) or
                character count ('password'). Ignored for 'jwt_secret'.

        Returns:
            The generated secret.
        """
        if secret_type == 'api_key':
            return secrets.token_urlsafe(length)

        elif secret_type == 'password':
            # Characters are drawn independently from the full alphabet; no
            # particular character class is guaranteed to appear.
            chars = string.ascii_letters + string.digits + string.punctuation
            return ''.join(secrets.choice(chars) for _ in range(length))

        elif secret_type == 'jwt_secret':
            return secrets.token_urlsafe(64)

        else:
            return secrets.token_bytes(length).hex()

    def rotate_secret(self, path: str, secret_type: str = 'api_key') -> Dict[str, Any]:
        """Rotate a secret, keeping the previous value next to the new one.

        The secret at `path` is rewritten as `current` / `previous` /
        `rotated_at`, where `previous` is the old `current` (or, failing
        that, the old `value`).

        Args:
            path: KV v2 path of the secret.
            secret_type: Type passed to `generate_secret`.

        Returns:
            A dict with `success`, `path` and `rotated_at`.

        Raises:
            Exception: Any Vault error is logged and re-raised.
        """
        print(f"Starting rotation for: {path}")

        try:
            # Read the current secret.
            current_secret = self.vault_client.secrets.kv.v2.read_secret(path=path)
            current_data = current_secret['data']['data']

            # Generate the new value.
            new_value = self.generate_secret(secret_type)

            # Store the new and the old value together.
            rotation_data = {
                'current': new_value,
                'previous': current_data.get('current', current_data.get('value')),
                'rotated_at': datetime.utcnow().isoformat()
            }

            self.vault_client.secrets.kv.v2.create_or_update_secret(
                path=path,
                secret=rotation_data
            )

            print(f"Secret rotated successfully: {path}")

            return {
                'success': True,
                'path': path,
                'rotated_at': rotation_data['rotated_at']
            }

        except Exception as e:
            print(f"Rotation failed for {path}: {e}")
            raise

    def rotate_database_password(self, secret_path: str) -> Dict[str, Any]:
        """Rotate the password of the PostgreSQL user described by a secret.

        Connects with the stored credentials, sets a new password for the
        stored username, commits, then writes the updated credentials back
        to Vault.

        Args:
            secret_path: KV v2 path of a secret with `host`, `database`,
                `username` and `password` keys.

        Returns:
            `{'success': True}` on success.

        Raises:
            Exception: Any database or Vault error; the cursor and the
                connection are closed in every case.
        """
        # Imported here because `import psycopg2` alone does not guarantee
        # that the `sql` submodule is loaded.
        from psycopg2 import sql

        # Fetch the current credentials.
        secret = self.vault_client.secrets.kv.v2.read_secret(path=secret_path)
        creds = secret['data']['data']

        # Generate the new password.
        new_password = self.generate_secret('password', 20)

        # Connect to the database.
        conn = psycopg2.connect(
            host=creds['host'],
            database=creds['database'],
            user=creds['username'],
            password=creds['password']
        )

        try:
            cursor = conn.cursor()
            try:
                # The role name is an identifier and cannot be passed as a
                # query parameter; quote it as an identifier instead of
                # splicing the raw string into the statement.
                cursor.execute(
                    sql.SQL("ALTER USER {} WITH PASSWORD %s").format(
                        sql.Identifier(creds['username'])
                    ),
                    (new_password,)
                )
                conn.commit()

                # Update the secret in Vault.
                # NOTE(review): if this write fails the database already
                # uses the new password while Vault still holds the old one.
                updated_creds = {
                    **creds,
                    'password': new_password,
                    'rotated_at': datetime.utcnow().isoformat()
                }

                self.vault_client.secrets.kv.v2.create_or_update_secret(
                    path=secret_path,
                    secret=updated_creds
                )

                print(f"Database credentials rotated: {secret_path}")

                return {'success': True}

            finally:
                cursor.close()
        finally:
            conn.close()

    def schedule_rotation(self, path: str, interval_days: int = 90) -> str:
        """Build the schedule expression for rotating `path` periodically.

        Nothing is registered here: the expression is meant to be attached
        to whatever triggers the rotation (for example a scheduled rule
        invoking a Lambda function, or a cron job).

        Args:
            path: Path of the secret to rotate.
            interval_days: Number of days between rotations.

        Returns:
            A rate expression such as `rate(30 days)`.
        """
        # Rate expressions use the singular unit for a value of 1.
        unit = 'day' if interval_days == 1 else 'days'
        schedule_expression = f"rate({interval_days} {unit})"

        print(f"Rotation scheduled for {path}: every {interval_days} days")

        return schedule_expression

    def rotate_encryption_keys(self, key_id: str):
        """Enable automatic key rotation for a KMS key.

        Args:
            key_id: Identifier of the KMS key.
        """
        kms = boto3.client('kms')

        # Enable automatic key rotation.
        kms.enable_key_rotation(KeyId=key_id)

        print(f"Automatic rotation enabled for KMS key: {key_id}")

    def audit_rotation_history(self, path: str) -> list:
        """Return the version history of a secret, newest first.

        Args:
            path: KV v2 path of the secret.

        Returns:
            A list of dicts with `version`, `created_time` and `deleted`,
            sorted by `created_time` in descending order.
        """
        versions = self.vault_client.secrets.kv.v2.read_secret_metadata(path=path)

        history = []
        for version, metadata in versions['data']['versions'].items():
            history.append({
                'version': version,
                'created_time': metadata['created_time'],
                # Vault reports an empty string (not null) as deletion_time
                # for versions that are still live, so test for a non-empty
                # value rather than for None.
                'deleted': bool(metadata.get('deletion_time'))
            })

        return sorted(history, key=lambda x: x['created_time'], reverse=True)

# Usage
if __name__ == '__main__':
    import os

    # Read the Vault address and token from the environment (the same
    # VAULT_ADDR / VAULT_TOKEN variables the Kubernetes CronJob injects)
    # instead of hard-coding them; the placeholders remain as fallbacks.
    rotation = SecretsRotation(
        vault_url=os.environ.get('VAULT_ADDR', 'http://localhost:8200'),
        vault_token=os.environ.get('VAULT_TOKEN', 'your-token')
    )

    # Rotate an API key.
    rotation.rotate_secret('api-keys/external-service', 'api_key')

    # Rotate database credentials.
    rotation.rotate_database_password('database/production')

    # Schedule rotations.
    rotation.schedule_rotation('api-keys/external-service', 30)
    rotation.schedule_rotation('database/production', 90)

    # Show the rotation history.
    history = rotation.audit_rotation_history('api-keys/external-service')
    print(f"Rotation history: {history}")

3. Kubernetes 秘密轮换

# secrets-rotation-cronjob.yaml
# CronJob that runs the rotation script on a weekly schedule under the
# secrets-rotator service account.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: secrets-rotation
  namespace: production
spec:
  schedule: "0 2 * * 0"  # Every Sunday at 02:00
  # Do not start a new run while the previous one is still in progress:
  # two rotations of the same secrets must never overlap.
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: secrets-rotator
          containers:
          - name: rotate
            image: secrets-rotator:latest
            env:
            - name: VAULT_ADDR
              value: "http://vault:8200"
            # The Vault token is read from the `vault-token` Secret rather
            # than being written into the manifest.
            - name: VAULT_TOKEN
              valueFrom:
                secretKeyRef:
                  name: vault-token
                  key: token
            command:
            - /bin/sh
            - -c
            - |
              # 轮换秘密
              python /app/rotate_secrets.py \
                --secret database-password \
                --secret api-keys \
                --secret tls-certificates

          restartPolicy: OnFailure
---
# Service account named `secrets-rotator` in the `production` namespace.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: secrets-rotator
  namespace: production
---
# Role granting read and modify access (no create/delete) to Secret
# objects in the `production` namespace.
# NOTE(review): the rule covers every Secret in the namespace; consider
# narrowing it if only specific secrets are rotated.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: secrets-rotator
  namespace: production
rules:
- apiGroups: [""]  # "" is the core API group
  resources: ["secrets"]
  verbs: ["get", "list", "update", "patch"]
---
# Binds the `secrets-rotator` Role to the `secrets-rotator` ServiceAccount
# in the `production` namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: secrets-rotator
  namespace: production
subjects:
# No namespace is given for the subject here.
- kind: ServiceAccount
  name: secrets-rotator
roleRef:
  kind: Role
  name: secrets-rotator
  apiGroup: rbac.authorization.k8s.io

最佳实践

✅ DO

  • 自动化轮换
  • 使用宽限期
  • 验证新秘密
  • 维护轮换审计跟踪
  • 实施回滚程序
  • 监控轮换失败
  • 使用托管服务(AWS Secrets Manager)
  • 测试轮换程序

❌ DON’T

  • 硬编码秘密
  • 共享秘密
  • 跳过验证
  • 无宽限期轮换
  • 忽略轮换失败
  • 在版本控制中存储秘密

轮换计划

  • API密钥:30-90天
  • 数据库密码:90天
  • TLS证书:到期前
  • 加密密钥:1年
  • 服务账户令牌:90天

零停机策略

  1. 生成新秘密
  2. 以版本化方式存储新秘密
  3. 宽限期(两个版本均有效)
  4. 验证
  5. 弃用旧版本
  6. 宽限期后移除

资源