name: aws-aurora
description: AWS Aurora Serverless v2, RDS Proxy, Data API, connection pooling
AWS Aurora Skill
Load with: base.md + [typescript.md | python.md]
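Amazon Aurora is a MySQL- and PostgreSQL-compatible relational database with serverless scaling, high availability, and enterprise features.
References: Aurora docs | Serverless v2 | RDS Proxy
Core Principle
For serverless architectures (Lambda), always handle connection management through RDS Proxy or the Data API. Never open raw connections directly from a Lambda function: each concurrent invocation would open its own connection, which can exhaust the database's connection limit.
Aurora excels at ACID-compliant workloads.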
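Aurora Options
| Option | Best for |
|---|---|
| Aurora Serverless v2 | Variable workloads, automatic scaling (0.5-128 ACUs) |
| Aurora Provisioned | Predictable workloads, maximum performance |
| Aurora Global | Multi-Region, disaster recovery |
| Data API | Serverless without a VPC, simple HTTP access |
| RDS Proxy | Connection pooling for Lambda, high concurrency |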
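Connection Strategies
Strategy 1: RDS Proxy (recommended for Lambda)
Lambda → RDS Proxy → Aurora
(pool)
- Connection pooling and reuse
- Automatic failover handling
- IAM authentication support
- Compatible with existing SQL clients
Strategy 2: Data API (simplest for serverless)
Lambda → Data API (HTTP) → Aurora
- No VPC required
- No connection management required
- Higher latency per query
- Requires a cluster with the Data API (HTTP endpoint) enabled; originally limited to Aurora Serverless
Strategy 3: Direct connection (not suitable for Lambda)
App Server → Aurora
(persistent connection)
- Only for long-running servers (ECS, EC2)
- You manage the connection pool yourself
- Not suitable for serverless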
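As a rough sketch of how the three strategies above map onto a deployment, the hypothetical helper below picks one from two inputs. The function name, its parameters, and the returned labels are illustrative assumptions and not part of any AWS API.

```python
def choose_connection_strategy(runtime: str, in_vpc: bool) -> str:
    """Pick a connection strategy following the guidance above.

    `runtime` is "lambda" for serverless functions; anything else is treated
    as a long-running server (for example ECS or EC2).
    """
    if runtime.lower() == "lambda":
        # A Lambda function inside the VPC can reach RDS Proxy; otherwise
        # fall back to the HTTP-based Data API, which needs no VPC access.
        return "rds-proxy" if in_vpc else "data-api"
    # Long-running servers keep persistent connections and pool them locally.
    return "direct"
```

Real decisions also depend on latency budgets and on whether the cluster supports the Data API, which this sketch leaves out.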
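RDS Proxy Setup
Create a Proxy (AWS Console/CDK)
// CDK example (assumes `cluster`, `vpc`, and `proxySecurityGroup` are defined elsewhere in the stack)
import * as cdk from 'aws-cdk-lib';
import * as rds from 'aws-cdk-lib/aws-rds';
const proxy = new rds.DatabaseProxy(this, 'Proxy', {
  proxyTarget: rds.ProxyTarget.fromCluster(cluster),
  secrets: [cluster.secret!],
  vpc,
  securityGroups: [proxySecurityGroup],
  requireTLS: true,
  iamAuth: true, // needed when clients log in to the proxy with IAM tokens
  idleClientTimeout: cdk.Duration.minutes(30),
  maxConnectionsPercent: 90,
  maxIdleConnectionsPercent: 10,
  borrowTimeout: cdk.Duration.seconds(30)
});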
Connect Through the Proxy (TypeScript/Node.js)
// lib/db.ts
import { Pool } from 'pg';
import { Signer } from '@aws-sdk/rds-signer';
const signer = new Signer({
  hostname: process.env.RDS_PROXY_ENDPOINT!,
  port: 5432,
  username: process.env.DB_USER!,
  region: process.env.AWS_REGION!
});
// IAM authentication. Tokens expire after 15 minutes, so the password is a
// function: pg calls it whenever it opens a new connection, which keeps a
// pool that outlives one token working.
function getPool(): Pool {
  return new Pool({
    host: process.env.RDS_PROXY_ENDPOINT,
    port: 5432,
    database: process.env.DB_NAME,
    user: process.env.DB_USER,
    password: () => signer.getAuthToken(),
    ssl: { rejectUnauthorized: true },
    max: 1, // one connection per Lambda execution environment
    idleTimeoutMillis: 120000,
    connectionTimeoutMillis: 10000
  });
}
// Usage in Lambda: create the pool once and reuse it across invocations
let pool: Pool | null = null;
export async function handler(event: any) {
  if (!pool) {
    pool = getPool();
  }
  const result = await pool.query('SELECT * FROM users WHERE id = $1', [event.userId]);
  return result.rows[0];
}
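Proxy Configuration Best Practices
# Key settings for Lambda workloads
MaxConnectionsPercent: 90       # use most of the database's connections
MaxIdleConnectionsPercent: 10   # keep some idle connections ready for bursts
ConnectionBorrowTimeout: 30s    # how long a client waits for an available connection
IdleClientTimeout: 30min        # close idle client connections to the proxy
# Monitor these CloudWatch metrics:
# - DatabaseConnectionsCurrentlyBorrowed
# - DatabaseConnectionsCurrentlySessionPinned
# - QueryDatabaseResponseLatency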
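To make the percentages above concrete, here is a small worked calculation. It assumes both percentages are taken relative to the database's `max_connections` setting and that fractions round down; the function name and the returned keys are hypothetical.

```python
def proxy_connection_budget(db_max_connections: int,
                            max_connections_percent: int = 90,
                            max_idle_connections_percent: int = 10) -> dict:
    """Translate the proxy percentages into absolute connection counts."""
    for pct in (max_connections_percent, max_idle_connections_percent):
        if not 0 <= pct <= 100:
            raise ValueError("percentages must be between 0 and 100")
    # Integer arithmetic avoids floating-point surprises; rounding down is
    # an assumption of this sketch.
    usable = db_max_connections * max_connections_percent // 100
    idle = db_max_connections * max_idle_connections_percent // 100
    return {
        "usable_by_proxy": usable,
        "kept_idle": min(idle, usable),
        "left_for_other_clients": db_max_connections - usable,
    }
```

With `max_connections` at 1000 and the settings listed above, the proxy may open up to 900 connections, keeps up to 100 idle, and leaves 100 for clients that bypass the proxy, such as migrations or admin sessions.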
Data API (HTTP-based)
Enable the Data API
# Requires a cluster that supports the Data API (originally Aurora Serverless only)
aws rds modify-db-cluster \
  --db-cluster-identifier my-cluster \
  --enable-http-endpoint
TypeScript with Data API Client v2
npm install data-api-client
// lib/db.ts
import DataAPIClient from 'data-api-client';
const db = DataAPIClient({
  secretArn: process.env.DB_SECRET_ARN!,
  resourceArn: process.env.DB_CLUSTER_ARN!,
  database: process.env.DB_NAME!,
  region: process.env.AWS_REGION!
});
// Simple query
const users = await db.query('SELECT * FROM users WHERE active = :active', {
  active: true
});
// Insert and return the new row
const result = await db.query(
  'INSERT INTO users (email, name) VALUES (:email, :name) RETURNING *',
  { email: 'user@test.com', name: 'Test User' }
);
// Transaction: data-api-client chains the queries and runs them on commit();
// if a query fails, the library rolls the transaction back and calls the
// optional rollback() callback
const results = await db
  .transaction()
  .query('UPDATE accounts SET balance = balance - :amount WHERE id = :from', {
    amount: 100, from: 1
  })
  .query('UPDATE accounts SET balance = balance + :amount WHERE id = :to', {
    amount: 100, to: 2
  })
  .rollback((error: Error) => {
    console.error('Transaction rolled back:', error);
  })
  .commit();
Python with boto3
# requirements.txt
boto3>=1.34.0
# db.py
import os
from typing import Optional

import boto3

rds_data = boto3.client('rds-data')
CLUSTER_ARN = os.environ['DB_CLUSTER_ARN']
SECRET_ARN = os.environ['DB_SECRET_ARN']
DATABASE = os.environ['DB_NAME']


def execute_sql(sql: str, parameters: Optional[list] = None,
                transaction_id: Optional[str] = None):
    """Execute SQL through the Data API, optionally inside a transaction."""
    params = {
        'resourceArn': CLUSTER_ARN,
        'secretArn': SECRET_ARN,
        'database': DATABASE,
        'sql': sql
    }
    if parameters:
        params['parameters'] = parameters
    if transaction_id:
        params['transactionId'] = transaction_id
    return rds_data.execute_statement(**params)


def get_user(user_id: int):
    result = execute_sql(
        'SELECT * FROM users WHERE id = :id',
        [{'name': 'id', 'value': {'longValue': user_id}}]
    )
    return result.get('records', [])


def create_user(email: str, name: str):
    result = execute_sql(
        'INSERT INTO users (email, name) VALUES (:email, :name) RETURNING *',
        [
            {'name': 'email', 'value': {'stringValue': email}},
            {'name': 'name', 'value': {'stringValue': name}}
        ]
    )
    # Rows from RETURNING arrive in 'records'; Aurora PostgreSQL does not
    # populate 'generatedFields'
    return result.get('records', [])


# Transaction
def transfer_funds(from_id: int, to_id: int, amount: float):
    transaction = rds_data.begin_transaction(
        resourceArn=CLUSTER_ARN,
        secretArn=SECRET_ARN,
        database=DATABASE
    )
    transaction_id = transaction['transactionId']
    try:
        # Pass the transaction ID so both updates run inside the transaction
        execute_sql(
            'UPDATE accounts SET balance = balance - :amount WHERE id = :id',
            [
                {'name': 'amount', 'value': {'doubleValue': amount}},
                {'name': 'id', 'value': {'longValue': from_id}}
            ],
            transaction_id=transaction_id
        )
        execute_sql(
            'UPDATE accounts SET balance = balance + :amount WHERE id = :id',
            [
                {'name': 'amount', 'value': {'doubleValue': amount}},
                {'name': 'id', 'value': {'longValue': to_id}}
            ],
            transaction_id=transaction_id
        )
        rds_data.commit_transaction(
            resourceArn=CLUSTER_ARN,
            secretArn=SECRET_ARN,
            transactionId=transaction_id
        )
    except Exception:
        rds_data.rollback_transaction(
            resourceArn=CLUSTER_ARN,
            secretArn=SECRET_ARN,
            transactionId=transaction_id
        )
        raise
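Writing the typed parameter dictionaries by hand gets repetitive, and `records` come back as positional lists of typed fields. The sketch below shows one possible pair of helpers; `to_param`, `to_params`, and `rows_to_dicts` are hypothetical names, and `rows_to_dicts` assumes the statement was executed with `includeResultMetadata=True` so that `columnMetadata` is present in the response.

```python
from typing import Any


def to_param(name: str, value: Any) -> dict:
    """Build one Data API parameter from a plain Python value."""
    if value is None:
        field = {"isNull": True}
    elif isinstance(value, bool):  # check bool first: bool is a subclass of int
        field = {"booleanValue": value}
    elif isinstance(value, int):
        field = {"longValue": value}
    elif isinstance(value, float):
        field = {"doubleValue": value}
    elif isinstance(value, bytes):
        field = {"blobValue": value}
    else:
        field = {"stringValue": str(value)}
    return {"name": name, "value": field}


def to_params(**values: Any) -> list:
    """Build the full parameter list from keyword arguments."""
    return [to_param(name, value) for name, value in values.items()]


def rows_to_dicts(response: dict) -> list:
    """Convert a response into a list of {column_name: value} dictionaries."""
    columns = [col["name"] for col in response.get("columnMetadata", [])]
    rows = []
    for record in response.get("records", []):
        # Each field is a one-key dict such as {"longValue": 1} or {"isNull": True}
        values = [None if field.get("isNull") else next(iter(field.values()))
                  for field in record]
        rows.append(dict(zip(columns, values)))
    return rows
```

A call such as `execute_sql('SELECT * FROM users WHERE id = :id', to_params(id=user_id))` then replaces the hand-written parameter list. Values that need a type hint (dates, decimals, UUIDs) are outside the scope of this sketch.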
Prisma with Aurora
Setup (inside a VPC, through RDS Proxy)
npm install prisma @prisma/client
npx prisma init
// prisma/schema.prisma
generator client {
  provider = "prisma-client-js"
}
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}
model User {
  id        Int      @id @default(autoincrement())
  email     String   @unique
  name      String
  posts     Post[]
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
}
model Post {
  id        Int      @id @default(autoincrement())
  title     String
  content   String?
  published Boolean  @default(false)
  author    User     @relation(fields: [authorId], references: [id])
  authorId  Int
  createdAt DateTime @default(now())
}
Environment
# Use the RDS Proxy endpoint
DATABASE_URL="postgresql://user:password@proxy-endpoint.proxy-xxx.region.rds.amazonaws.com:5432/mydb?schema=public&connection_limit=1"
Lambda Handler with Prisma
// handlers/users.ts
import { PrismaClient } from '@prisma/client';
// Reuse the client across invocations
let prisma: PrismaClient | null = null;
function getPrisma(): PrismaClient {
  if (!prisma) {
    prisma = new PrismaClient({
      datasources: {
        db: { url: process.env.DATABASE_URL }
      }
    });
  }
  return prisma;
}
export async function handler(event: any) {
  const db = getPrisma();
  const users = await db.user.findMany({
    include: { posts: true },
    take: 10
  });
  return {
    statusCode: 200,
    body: JSON.stringify(users)
  };
}
Aurora Serverless v2
Capacity Configuration
// CDK (assumes `vpc` is defined elsewhere in the stack)
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as rds from 'aws-cdk-lib/aws-rds';
const cluster = new rds.DatabaseCluster(this, 'Cluster', {
  engine: rds.DatabaseClusterEngine.auroraPostgres({
    version: rds.AuroraPostgresEngineVersion.VER_15_4
  }),
  serverlessV2MinCapacity: 0.5, // minimum ACUs
  serverlessV2MaxCapacity: 16, // maximum ACUs
  writer: rds.ClusterInstance.serverlessV2('writer'),
  readers: [
    rds.ClusterInstance.serverlessV2('reader', { scaleWithWriter: true })
  ],
  vpc,
  vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS }
});
Capacity Guidelines
| Workload | Min ACUs | Max ACUs |
|---|---|---|
| Dev/Test | 0.5 | 2 |
| Small production | 2 | 8 |
| Medium production | 4 | 32 |
| Large production | 8 | 128 |
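To relate the ACU ranges above to memory and cost, the sketch below uses the rule of thumb that one ACU corresponds to roughly 2 GiB of memory. The price per ACU-hour and the average ACU usage are inputs you must supply yourself; the function name and the 730 hours-per-month constant are assumptions of this example.

```python
GIB_PER_ACU = 2          # one ACU corresponds to roughly 2 GiB of memory
HOURS_PER_MONTH = 730    # assumed average month length for the estimate


def capacity_summary(min_acus: float, max_acus: float,
                     avg_acus: float, price_per_acu_hour: float) -> dict:
    """Summarize memory bounds and a rough monthly cost for an ACU range."""
    if not 0 <= min_acus <= avg_acus <= max_acus:
        raise ValueError("expected 0 <= min_acus <= avg_acus <= max_acus")
    monthly_cost = avg_acus * price_per_acu_hour * HOURS_PER_MONTH
    return {
        "min_memory_gib": min_acus * GIB_PER_ACU,
        "max_memory_gib": max_acus * GIB_PER_ACU,
        "estimated_monthly_cost": round(monthly_cost, 2),
    }
```

For the Dev/Test row (0.5 to 2 ACUs) this gives a memory range of about 1 to 4 GiB. The cost figure covers compute only and ignores storage and I/O charges.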
Handling Wake-Up from Scale to Zero
// Data API Client v2 handles this automatically
// For direct connections, implement retry logic:
import { Pool } from 'pg';
async function queryWithRetry(
  pool: Pool,
  sql: string,
  params: any[],
  maxRetries = 3
): Promise<any> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await pool.query(sql, params);
    } catch (error: any) {
      // Aurora Serverless is still waking up
      if (error.code === 'ETIMEDOUT' || error.message?.includes('Communications link failure')) {
        if (attempt === maxRetries) throw error;
        // Exponential backoff: 2s, 4s, 8s, ...
        await new Promise(resolve => setTimeout(resolve, Math.pow(2, attempt) * 1000));
        continue;
      }
      throw error;
    }
  }
}
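The same retry idea can be sketched in Python. This version differs from the TypeScript above in one respect: it adds random jitter to the backoff so that many clients waking the database at once do not retry in lockstep. The function name and the `is_transient` and `sleep` parameters are assumptions made for this example; `sleep` is injectable only so the logic can be tested without waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(operation: Callable[[], T],
                       is_transient: Callable[[Exception], bool],
                       max_retries: int = 3,
                       base_delay: float = 1.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Run `operation`, retrying transient failures with jittered backoff."""
    for attempt in range(1, max_retries + 1):
        try:
            return operation()
        except Exception as error:
            # Give up on non-transient errors and on the final attempt
            if not is_transient(error) or attempt == max_retries:
                raise
            # Full jitter: wait a random time up to base_delay * 2^attempt
            sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise RuntimeError("max_retries must be at least 1")
```

Which exceptions count as transient depends on the driver, so the caller supplies that check, for example `lambda e: isinstance(e, TimeoutError)`.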
Migrations
Using Prisma Migrate
# Development (create a migration)
npx prisma migrate dev --name add_users_table
# Production (apply migrations)
npx prisma migrate deploy
# Generate the client
npx prisma generate
CI/CD Migration Script
# .github/workflows/deploy.yml
- name: Run migrations
  run: |
    # Connect through a bastion host or use a migration Lambda
    npx prisma migrate deploy
  env:
    DATABASE_URL: ${{ secrets.DATABASE_URL }}
Migration Lambda
// lambdas/migrate.ts
import { execSync } from 'child_process';
export async function handler() {
  try {
    execSync('npx prisma migrate deploy', {
      env: {
        ...process.env,
        DATABASE_URL: process.env.DATABASE_URL
      },
      stdio: 'inherit'
    });
    return { statusCode: 200, body: 'Migrations applied' };
  } catch (error) {
    console.error('Migration failed:', error);
    throw error;
  }
}
Connection Pooling (non-Lambda)
PgBouncer Sidecar (ECS/EKS)
# docker-compose.yml
services:
  app:
    build: .
    environment:
      DATABASE_URL: postgresql://user:pass@pgbouncer:6432/mydb
  pgbouncer:
    image: edoburu/pgbouncer
    environment:
      DATABASE_URL: postgresql://user:pass@aurora-endpoint:5432/mydb
      POOL_MODE: transaction
      MAX_CLIENT_CONN: 1000
      DEFAULT_POOL_SIZE: 20
Application-Level Pooling
// For long-running servers (not Lambda)
import { Pool } from 'pg';
const pool = new Pool({
  host: process.env.DB_HOST,
  port: 5432,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  max: 20, // maximum number of connections
  idleTimeoutMillis: 30000, // close idle connections after 30 seconds
  connectionTimeoutMillis: 10000
});
// Run every query through the pool
export async function query(sql: string, params?: any[]) {
  const client = await pool.connect();
  try {
    return await client.query(sql, params);
  } finally {
    client.release();
  }
}
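When several server instances each run their own pool, the pool sizes add up against the database's connection limit. The arithmetic can be sketched as follows; the function name and the default of 10 reserved connections (for migrations, admin sessions, and monitoring) are assumptions of this example.

```python
def max_pool_size_per_instance(db_max_connections: int,
                               app_instances: int,
                               reserved_connections: int = 10) -> int:
    """Largest per-instance pool size that keeps the total under the limit."""
    if app_instances < 1:
        raise ValueError("app_instances must be at least 1")
    available = db_max_connections - reserved_connections
    if available < app_instances:
        raise ValueError("not enough connections for one per instance")
    # Round down so instances * pool_size never exceeds what is available
    return available // app_instances
```

For example, with a limit of 500 connections and 20 instances, each pool can hold at most 24 connections, so the `max: 20` setting above would fit. Remember to recompute when autoscaling raises the instance count.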
Monitoring
Key CloudWatch Metrics
# Aurora
- CPUUtilization
- DatabaseConnections
- FreeableMemory
- ServerlessDatabaseCapacity (ACUs)
- AuroraReplicaLag
# RDS Proxy
- DatabaseConnectionsCurrentlyBorrowed
- DatabaseConnectionsCurrentlySessionPinned
- QueryDatabaseResponseLatency
- ClientConnectionsReceived
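As one way to act on these metrics, the sketch below builds the arguments for a CloudWatch alarm on `DatabaseConnections`. It only constructs a dictionary, so it runs without AWS access. The function name, alarm name, 80% threshold, and evaluation settings are illustrative assumptions rather than recommended values.

```python
def connection_alarm_params(cluster_id: str,
                            db_max_connections: int,
                            threshold_fraction: float = 0.8) -> dict:
    """Build keyword arguments for CloudWatch's put_metric_alarm call."""
    if not 0 < threshold_fraction <= 1:
        raise ValueError("threshold_fraction must be in (0, 1]")
    return {
        "AlarmName": f"{cluster_id}-high-connections",
        "Namespace": "AWS/RDS",
        "MetricName": "DatabaseConnections",
        "Dimensions": [{"Name": "DBClusterIdentifier", "Value": cluster_id}],
        "Statistic": "Maximum",
        "Period": 60,               # seconds per datapoint
        "EvaluationPeriods": 3,     # alarm after three consecutive breaches
        "Threshold": db_max_connections * threshold_fraction,
        "ComparisonOperator": "GreaterThanThreshold",
    }
```

The result can be passed to boto3 as `boto3.client("cloudwatch").put_metric_alarm(**params)`; add `AlarmActions` if the alarm should notify an SNS topic.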
Performance Insights
# Enable via the console or CLI
aws rds modify-db-cluster \
--db-cluster-identifier my-cluster \
--enable-performance-insights \
--performance-insights-retention-period 7
Security
IAM Database Authentication
import { Pool } from 'pg';
import { Signer } from '@aws-sdk/rds-signer';
const signer = new Signer({
  hostname: process.env.DB_HOST!,
  port: 5432,
  username: 'iam_user',
  region: 'us-east-1'
});
const token = await signer.getAuthToken();
// Use the token as the password (valid for 15 minutes)
const pool = new Pool({
  host: process.env.DB_HOST,
  user: 'iam_user',
  password: token,
  ssl: true
});
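Because the token is only valid for 15 minutes, a long-lived process needs a fresh one whenever it opens a new connection after that window. A minimal caching sketch, assuming a caller-supplied `fetch_token` function (with boto3 this could wrap the RDS client's `generate_db_auth_token`); the class name, the 60-second refresh margin, and the injectable `clock` are choices made for this example.

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Cache a short-lived auth token and refresh it shortly before expiry."""

    def __init__(self, fetch_token: Callable[[], str],
                 ttl_seconds: float = 15 * 60,
                 refresh_margin_seconds: float = 60,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch_token = fetch_token
        self._ttl = ttl_seconds
        self._margin = refresh_margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return a cached token, fetching a new one when close to expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            self._token = self._fetch_token()
            self._expires_at = now + self._ttl
        return self._token
```

Call `cache.get()` each time a new database connection is opened rather than storing the token in a long-lived configuration object.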
Secrets Manager Rotation
import { SecretsManagerClient, GetSecretValueCommand } from '@aws-sdk/client-secrets-manager';
const client = new SecretsManagerClient({ region: 'us-east-1' });
async function getDbCredentials() {
  const response = await client.send(
    new GetSecretValueCommand({ SecretId: process.env.DB_SECRET_ARN })
  );
  return JSON.parse(response.SecretString!);
}
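Once the secret is fetched, it usually has to be turned into a connection string. The sketch below assumes the secret JSON contains `username`, `password`, `host`, `port`, and `dbname` keys, which is a common layout for rotated database secrets but should be checked against your own secret. The function name and the `host_override` parameter (for pointing at an RDS Proxy endpoint instead of the cluster) are hypothetical.

```python
import json
from typing import Optional
from urllib.parse import quote


def database_url_from_secret(secret_string: str,
                             host_override: Optional[str] = None,
                             connection_limit: Optional[int] = None) -> str:
    """Build a PostgreSQL connection URL from a Secrets Manager JSON string."""
    secret = json.loads(secret_string)
    host = host_override or secret["host"]
    # Percent-encode credentials: rotated passwords often contain
    # characters such as '@' or '/' that would otherwise break the URL
    user = quote(secret["username"], safe="")
    password = quote(secret["password"], safe="")
    port = secret.get("port", 5432)
    url = f"postgresql://{user}:{password}@{host}:{port}/{secret['dbname']}"
    if connection_limit is not None:
        url += f"?connection_limit={connection_limit}"
    return url
```

Building the URL at startup from the current secret, instead of storing a static `DATABASE_URL`, means a rotated password is picked up the next time the process starts.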
CLI Quick Reference
# Cluster operations
aws rds describe-db-clusters
aws rds create-db-cluster --engine aurora-postgresql --db-cluster-identifier my-cluster
aws rds delete-db-cluster --db-cluster-identifier my-cluster --skip-final-snapshot
# Serverless v2
aws rds modify-db-cluster \
--db-cluster-identifier my-cluster \
--serverless-v2-scaling-configuration MinCapacity=0.5,MaxCapacity=16
# Data API
aws rds-data execute-statement \
--resource-arn $CLUSTER_ARN \
--secret-arn $SECRET_ARN \
--database mydb \
--sql "SELECT * FROM users"
# Proxy
aws rds describe-db-proxies
aws rds create-db-proxy --db-proxy-name my-proxy --engine-family POSTGRESQL ...
# Snapshots
aws rds create-db-cluster-snapshot --db-cluster-identifier my-cluster --db-cluster-snapshot-identifier backup-1
# --engine is required when restoring and must match the snapshot's engine
aws rds restore-db-cluster-from-snapshot --db-cluster-identifier restored --snapshot-identifier backup-1 --engine aurora-postgresql
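Anti-Patterns
- Direct Lambda→Aurora connections - always use RDS Proxy or the Data API
- No connection limits - set max: 1 for Lambda; use pooling for servers
- Ignoring cold starts - Serverless v2 takes time to scale up; keep a minimum ACU floor for production
- No read replicas - offload reads to replicas under heavy load
- Missing IAM authentication - prefer IAM over static passwords where possible
- No retry logic - handle transient errors during scaling and failover
- Over-provisioned capacity - use Serverless v2 for variable workloads
- Skipping Secrets Manager - never hard-code credentials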