name: secrets-rotation description: 实现自动化的秘密轮换策略,用于API密钥、凭证、证书和加密密钥,零停机部署和全面的审计日志记录。
秘密轮换
概览
实现自动化的秘密轮换策略,用于凭证、API密钥、证书和加密密钥,零停机部署和全面的审计日志记录。
何时使用
- API密钥管理
- 数据库凭证
- TLS/SSL证书
- 加密密钥轮换
- 合规要求
- 安全事件响应
- 服务账户管理
实施示例
1. Node.js 秘密管理器与轮换
// secrets-manager.js
const AWS = require('aws-sdk');
const crypto = require('crypto');
class SecretsManager {
constructor() {
this.secretsManager = new AWS.SecretsManager({
region: process.env.AWS_REGION
});
this.rotationSchedule = new Map();
}
/**
* 生成新的秘密值
*/
generateSecret(type = 'api_key', length = 32) {
switch (type) {
case 'api_key':
return crypto.randomBytes(length).toString('hex');
case 'password':
// 生成强密码
const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*';
let password = '';
for (let i = 0; i < length; i++) {
password += chars.charAt(crypto.randomInt(chars.length));
}
return password;
case 'jwt_secret':
return crypto.randomBytes(64).toString('base64');
default:
return crypto.randomBytes(length).toString('base64');
}
}
/**
* 在AWS Secrets Manager中存储秘密
*/
async createSecret(name, value, description = '') {
const params = {
Name: name,
SecretString: JSON.stringify(value),
Description: description
};
try {
const result = await this.secretsManager.createSecret(params).promise();
return result;
} catch (error) {
if (error.code === 'ResourceExistsException') {
// 更新现有秘密
return this.updateSecret(name, value);
}
throw error;
}
}
/**
* 检索秘密
*/
async getSecret(name) {
const params = { SecretId: name };
try {
const data = await this.secretsManager.getSecretValue(params).promise();
if ('SecretString' in data) {
return JSON.parse(data.SecretString);
}
// 二进制秘密
const buff = Buffer.from(data.SecretBinary, 'base64');
return buff.toString('ascii');
} catch (error) {
console.error(`Error retrieving secret ${name}:`, error);
throw error;
}
}
/**
* 更新秘密值
*/
async updateSecret(name, value) {
const params = {
SecretId: name,
SecretString: JSON.stringify(value)
};
return this.secretsManager.updateSecret(params).promise();
}
/**
* 零停机轮换秘密
*/
async rotateSecret(name, type = 'api_key') {
console.log(`Starting rotation for secret: ${name}`);
try {
// 第1步:生成新秘密
const newValue = this.generateSecret(type);
// 第2步:存储新版本
const currentSecret = await this.getSecret(name);
// 暂时保留旧值以实现平滑过渡
const secretWithRotation = {
current: newValue,
previous: currentSecret.current || currentSecret,
rotatedAt: new Date().toISOString()
};
await this.updateSecret(name, secretWithRotation);
console.log(`New secret version created for: ${name}`);
// 第3步:等待应用程序获取新秘密
await this.waitForPropagation(5000);
// 第4步:验证新秘密是否有效
const verificationPassed = await this.verifySecret(name, newValue);
if (!verificationPassed) {
throw new Error('Secret verification failed');
}
// 第5步:在宽限期后移除旧版本
setTimeout(async () => {
await this.updateSecret(name, {
current: newValue,
rotatedAt: new Date().toISOString()
});
console.log(`Rotation completed for: ${name}`);
}, 300000); // 5分钟宽限期
return {
success: true,
secretName: name,
rotatedAt: new Date().toISOString()
};
} catch (error) {
console.error(`Rotation failed for ${name}:`, error);
// 失败时回滚
await this.rollbackRotation(name);
throw error;
}
}
/**
* 计划自动轮换
*/
async scheduleRotation(name, intervalDays = 90) {
const intervalMs = intervalDays * 24 * 60 * 60 * 1000;
const rotationJob = setInterval(async () => {
try {
await this.rotateSecret(name);
console.log(`Scheduled rotation completed for: ${name}`);
} catch (error) {
console.error(`Scheduled rotation failed for ${name}:`, error);
// 警告运维团队
this.sendAlert(name, error);
}
}, intervalMs);
this.rotationSchedule.set(name, rotationJob);
// AWS Secrets Manager自动轮换
const params = {
SecretId: name,
RotationLambdaARN: process.env.ROTATION_LAMBDA_ARN,
RotationRules: {
AutomaticallyAfterDays: intervalDays
}
};
await this.secretsManager.rotateSecret(params).promise();
}
/**
* 轮换数据库凭证
*/
async rotateDatabaseCredentials(secretName) {
const credentials = await this.getSecret(secretName);
// 生成新密码
const newPassword = this.generateSecret('password', 20);
// 更新数据库用户密码
const connection = await this.connectToDatabase(credentials);
await connection.query(
'ALTER USER ? IDENTIFIED BY ?',
[credentials.username, newPassword]
);
// 更新秘密
await this.updateSecret(secretName, {
username: credentials.username,
password: newPassword,
host: credentials.host,
database: credentials.database,
rotatedAt: new Date().toISOString()
});
await connection.end();
return { success: true };
}
/**
* 轮换TLS证书
*/
async rotateTLSCertificate(domain) {
// 使用Let's Encrypt或内部CA
const certbot = require('certbot');
try {
// 请求新证书
const newCert = await certbot.certonly({
domains: [domain],
email: process.env.ADMIN_EMAIL,
agreeTos: true,
renewByDefault: true
});
// 存储在秘密管理器中
await this.createSecret(`tls-cert-${domain}`, {
certificate: newCert.certificate,
privateKey: newCert.privateKey,
chain: newCert.chain,
issuedAt: new Date().toISOString(),
expiresAt: newCert.expiresAt
});
// 更新负载均衡器/网络服务器
await this.updateServerCertificate(domain, newCert);
console.log(`TLS certificate rotated for: ${domain}`);
return { success: true };
} catch (error) {
console.error('Certificate rotation failed:', error);
throw error;
}
}
async waitForPropagation(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async verifySecret(name, value) {
// 实现验证逻辑
// 测试API调用,数据库连接等。
return true;
}
async rollbackRotation(name) {
// 恢复以前版本
console.log(`Rolling back rotation for: ${name}`);
}
async sendAlert(secretName, error) {
// 发送到监控系统
console.error(`ALERT: Rotation failed for ${secretName}`, error);
}
async connectToDatabase(credentials) {
// 数据库连接逻辑
return null;
}
async updateServerCertificate(domain, cert) {
// 更新服务器配置
return null;
}
}
// 使用方法
const secretsManager = new SecretsManager();
// 轮换API密钥
async function rotateAPIKey() {
await secretsManager.rotateSecret('api-key-external-service', 'api_key');
}
// 计划自动轮换
async function setupRotationSchedule() {
await secretsManager.scheduleRotation('database-credentials', 90);
await secretsManager.scheduleRotation('api-keys', 30);
}
// 轮换数据库凭证
async function rotateDatabaseCreds() {
await secretsManager.rotateDatabaseCredentials('rds-production');
}
module.exports = SecretsManager;
2. Python 秘密轮换与Vault
# secrets_rotation.py
import hvac
import secrets
import string
from datetime import datetime, timedelta
from typing import Dict, Any
import psycopg2
import boto3
class SecretsRotation:
def __init__(self, vault_url: str, vault_token: str):
self.vault_client = hvac.Client(url=vault_url, token=vault_token)
self.ssm = boto3.client('ssm')
def generate_secret(self, secret_type: str = 'api_key', length: int = 32) -> str:
"""生成新的秘密值"""
if secret_type == 'api_key':
return secrets.token_urlsafe(length)
elif secret_type == 'password':
# 强密码包含所有字符类型
chars = string.ascii_letters + string.digits + string.punctuation
return ''.join(secrets.choice(chars) for _ in range(length))
elif secret_type == 'jwt_secret':
return secrets.token_urlsafe(64)
else:
return secrets.token_bytes(length).hex()
def rotate_secret(self, path: str, secret_type: str = 'api_key') -> Dict[str, Any]:
"""零停机轮换秘密"""
print(f"Starting rotation for: {path}")
try:
# 读取当前秘密
current_secret = self.vault_client.secrets.kv.v2.read_secret(path=path)
current_data = current_secret['data']['data']
# 生成新值
new_value = self.generate_secret(secret_type)
# 存储新旧值
rotation_data = {
'current': new_value,
'previous': current_data.get('current', current_data.get('value')),
'rotated_at': datetime.utcnow().isoformat()
}
self.vault_client.secrets.kv.v2.create_or_update_secret(
path=path,
secret=rotation_data
)
print(f"Secret rotated successfully: {path}")
return {
'success': True,
'path': path,
'rotated_at': rotation_data['rotated_at']
}
except Exception as e:
print(f"Rotation failed for {path}: {e}")
raise
def rotate_database_password(self, secret_path: str) -> Dict[str, Any]:
"""轮换数据库凭证"""
# 获取当前凭证
secret = self.vault_client.secrets.kv.v2.read_secret(path=secret_path)
creds = secret['data']['data']
# 生成新密码
new_password = self.generate_secret('password', 20)
# 连接数据库
conn = psycopg2.connect(
host=creds['host'],
database=creds['database'],
user=creds['username'],
password=creds['password']
)
cursor = conn.cursor()
try:
# 在数据库中更新密码
cursor.execute(
f"ALTER USER {creds['username']} WITH PASSWORD %s",
(new_password,)
)
conn.commit()
# 在Vault中更新秘密
updated_creds = {
**creds,
'password': new_password,
'rotated_at': datetime.utcnow().isoformat()
}
self.vault_client.secrets.kv.v2.create_or_update_secret(
path=secret_path,
secret=updated_creds
)
print(f"Database credentials rotated: {secret_path}")
return {'success': True}
finally:
cursor.close()
conn.close()
def schedule_rotation(self, path: str, interval_days: int = 90):
"""使用AWS Lambda计划自动轮换"""
# 在AWS Secrets Manager中创建轮换计划
# 或使用cron作业
schedule_expression = f"rate({interval_days} days)"
# 这将触发一个Lambda函数
print(f"Rotation scheduled for {path}: every {interval_days} days")
def rotate_encryption_keys(self, key_id: str):
"""轮换加密密钥"""
kms = boto3.client('kms')
# 启用自动密钥轮换
kms.enable_key_rotation(KeyId=key_id)
print(f"Automatic rotation enabled for KMS key: {key_id}")
def audit_rotation_history(self, path: str) -> list:
"""获取轮换历史"""
versions = self.vault_client.secrets.kv.v2.read_secret_metadata(path=path)
history = []
for version, metadata in versions['data']['versions'].items():
history.append({
'version': version,
'created_time': metadata['created_time'],
'deleted': metadata.get('deletion_time') is not None
})
return sorted(history, key=lambda x: x['created_time'], reverse=True)
# 使用方法
if __name__ == '__main__':
rotation = SecretsRotation(
vault_url='http://localhost:8200',
vault_token='your-token'
)
# 轮换API密钥
rotation.rotate_secret('api-keys/external-service', 'api_key')
# 轮换数据库凭证
rotation.rotate_database_password('database/production')
# 计划轮换
rotation.schedule_rotation('api-keys/external-service', 30)
rotation.schedule_rotation('database/production', 90)
# 查看历史
history = rotation.audit_rotation_history('api-keys/external-service')
print(f"Rotation history: {history}")
3. Kubernetes 秘密轮换
# secrets-rotation-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: secrets-rotation
namespace: production
spec:
schedule: "0 2 * * 0" # 每周日凌晨2点
jobTemplate:
spec:
template:
spec:
serviceAccountName: secrets-rotator
containers:
- name: rotate
image: secrets-rotator:latest
env:
- name: VAULT_ADDR
value: "http://vault:8200"
- name: VAULT_TOKEN
valueFrom:
secretKeyRef:
name: vault-token
key: token
command:
- /bin/sh
- -c
- |
# 轮换秘密
python /app/rotate_secrets.py \
--secret database-password \
--secret api-keys \
--secret tls-certificates
restartPolicy: OnFailure
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: secrets-rotator
namespace: production
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: secrets-rotator
namespace: production
rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: secrets-rotator
namespace: production
subjects:
- kind: ServiceAccount
name: secrets-rotator
roleRef:
kind: Role
name: secrets-rotator
apiGroup: rbac.authorization.k8s.io
最佳实践
✅ DO
- 自动化轮换
- 使用宽限期
- 验证新秘密
- 维护轮换审计跟踪
- 实施回滚程序
- 监控轮换失败
- 使用托管服务(AWS Secrets Manager)
- 测试轮换程序
❌ DON’T
- 硬编码秘密
- 共享秘密
- 跳过验证
- 无宽限期轮换
- 忽略轮换失败
- 在版本控制中存储秘密
轮换计划
- API密钥:30-90天
- 数据库密码:90天
- TLS证书:到期前
- 加密密钥:1年
- 服务账户令牌:90天
零停机策略
- 生成新秘密
- 带版本控制存储
- 宽限期(两个版本均有效)
- 验证
- 弃用旧版本
- 宽限期后移除