AWS DynamoDB 技能
加载方式:base.md + [typescript.md | python.md]
DynamoDB 是一个完全托管的 NoSQL 数据库,设计用于实现任何规模下的个位数毫秒级性能。掌握单表设计和访问模式建模。
资料来源: DynamoDB 文档 | SDK v3 | 最佳实践
核心原则
为访问模式设计,而不是实体。首先考虑访问模式。
DynamoDB 要求在设计模式之前了解您的查询。围绕您将如何访问数据进行建模,而不是数据如何关联。单表设计使用通用键属性将多个实体类型存储在一个表中。
核心概念
| 概念 | 描述 |
|---|---|
| 分区键 (PK) | 主键属性 - 决定数据分布 |
| 排序键 (SK) | 可选的次键,用于分区内的范围查询 |
| GSI | 全局次索引 - 替代分区/排序键 |
| LSI | 本地次索引 - 相同的分区,不同的排序 |
| 项目 | 单个记录(最大 400 KB) |
| 属性 | 项目内的字段 |
单表设计
为什么单表?
- 在单个查询中获取相关数据
- 减少往返次数和成本
- 跨实体类型启用事务
- 简化操作(备份、恢复、IAM)
通用键模式
// 而不是特定于实体的键:
// userId, orderId, productId
// 使用适用于所有实体的通用键:
interface BaseItem {
PK: string; // 分区键
SK: string; // 排序键
GSI1PK?: string; // 第一个 GSI 分区键
GSI1SK?: string; // 第一个 GSI 排序键
EntityType: string;
// ... 实体特定的属性
}
示例:电子商务模式
// 用户
{ PK: 'USER#123', SK: 'PROFILE', EntityType: 'User', name: 'John', email: 'john@test.com' }
{ PK: 'USER#123', SK: 'ADDRESS#1', EntityType: 'Address', street: '123 Main', city: 'NYC' }
// 用户的订单 (1:N 关系)
{ PK: 'USER#123', SK: 'ORDER#2024-001', EntityType: 'Order', total: 99.99, status: 'shipped' }
{ PK: 'USER#123', SK: 'ORDER#2024-002', EntityType: 'Order', total: 49.99, status: 'pending' }
// 通过 GSI 按订单 ID 查询订单详情
{ PK: 'USER#123', SK: 'ORDER#2024-001', GSI1PK: 'ORDER#2024-001', GSI1SK: 'ORDER', ... }
{ PK: 'ORDER#2024-001', SK: 'ITEM#1', GSI1PK: 'ORDER#2024-001', GSI1SK: 'ITEM#1', productId: 'PROD#456', qty: 2 }
// 产品
{ PK: 'PROD#456', SK: 'PRODUCT', EntityType: 'Product', name: 'Widget', price: 29.99 }
覆盖的访问模式
1. 获取用户配置文件 → 查询 PK='USER#123', SK='PROFILE'
2. 获取带有地址的用户 → 查询 PK='USER#123', SK begins_with 'ADDRESS'
3. 获取所有用户订单 → 查询 PK='USER#123', SK begins_with 'ORDER'
4. 通过 ID 获取订单 → 查询 GSI1, PK='ORDER#2024-001'
5. 获取包含项目的订单 → 查询 GSI1, PK='ORDER#2024-001'
6. 获取产品详情 → 查询 PK='PROD#456', SK='PRODUCT'
SDK v3 设置 (TypeScript)
安装依赖项
npm install @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb
客户端配置
// lib/dynamodb.ts
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';
const client = new DynamoDBClient({
region: process.env.AWS_REGION || 'us-east-1',
// 对于 DynamoDB 本地开发
...(process.env.DYNAMODB_LOCAL && {
endpoint: 'http://localhost:8000',
credentials: { accessKeyId: 'local', secretAccessKey: 'local' }
})
});
// 文档客户端简化操作
export const docClient = DynamoDBDocumentClient.from(client, {
marshallOptions: {
removeUndefinedValues: true, // 重要:匹配 v2 行为
convertClassInstanceToMap: true
},
unmarshallOptions: {
wrapNumbers: false
}
});
export const TABLE_NAME = process.env.DYNAMODB_TABLE || 'MyTable';
类型定义
// types/dynamodb.ts
export interface BaseItem {
PK: string;
SK: string;
GSI1PK?: string;
GSI1SK?: string;
EntityType: string;
createdAt: string;
updatedAt: string;
}
export interface User extends BaseItem {
EntityType: 'User';
userId: string;
email: string;
name: string;
}
export interface Order extends BaseItem {
EntityType: 'Order';
orderId: string;
userId: string;
total: number;
status: 'pending' | 'paid' | 'shipped' | 'delivered';
}
// 键构建器
export const keys = {
user: (userId: string) => ({
PK: `USER#${userId}`,
SK: 'PROFILE'
}),
userOrders: (userId: string) => ({
PK: `USER#${userId}`,
SKPrefix: 'ORDER#'
}),
order: (userId: string, orderId: string) => ({
PK: `USER#${userId}`,
SK: `ORDER#${orderId}`,
GSI1PK: `ORDER#${orderId}`,
GSI1SK: 'ORDER'
})
};
CRUD 操作
放置项目(创建/更新)
import { PutCommand } from '@aws-sdk/lib-dynamodb';
import { docClient, TABLE_NAME } from './dynamodb';
import { User, keys } from './types';
async function createUser(userId: string, data: { email: string; name: string }): Promise<User> {
const now = new Date().toISOString();
const item: User = {
...keys.user(userId),
EntityType: 'User',
userId,
email: data.email,
name: data.name,
createdAt: now,
updatedAt: now
};
await docClient.send(new PutCommand({
TableName: TABLE_NAME,
Item: item,
ConditionExpression: 'attribute_not_exists(PK)' // 防止覆盖
}));
return item;
}
获取项目(读取)
import { GetCommand } from '@aws-sdk/lib-dynamodb';
async function getUser(userId: string): Promise<User | null> {
const result = await docClient.send(new GetCommand({
TableName: TABLE_NAME,
Key: keys.user(userId)
}));
return (result.Item as User) || null;
}
查询(列表/搜索)
import { QueryCommand } from '@aws-sdk/lib-dynamodb';
// 获取用户的所有订单
async function getUserOrders(userId: string): Promise<Order[]> {
const result = await docClient.send(new QueryCommand({
TableName: TABLE_NAME,
KeyConditionExpression: 'PK = :pk AND begins_with(SK, :sk)',
ExpressionAttributeValues: {
':pk': `USER#${userId}`,
':sk': 'ORDER#'
},
ScanIndexForward: false // 最新的在前
}));
return (result.Items as Order[]) || [];
}
// 通过订单 ID 查询 GSI
async function getOrderById(orderId: string): Promise<Order | null> {
const result = await docClient.send(new QueryCommand({
TableName: TABLE_NAME,
IndexName: 'GSI1',
KeyConditionExpression: 'GSI1PK = :pk',
ExpressionAttributeValues: {
':pk': `ORDER#${orderId}`
}
}));
return (result.Items?.[0] as Order) || null;
}
// 分页查询
async function getUserOrdersPaginated(
userId: string,
pageSize: number = 20,
lastKey?: Record<string, any>
): Promise<{ items: Order[]; lastKey?: Record<string, any> }> {
const result = await docClient.send(new QueryCommand({
TableName: TABLE_NAME,
KeyConditionExpression: 'PK = :pk AND begins_with(SK, :sk)',
ExpressionAttributeValues: {
':pk': `USER#${userId}`,
':sk': 'ORDER#'
},
Limit: pageSize,
ExclusiveStartKey: lastKey
}));
return {
items: (result.Items as Order[]) || [],
lastKey: result.LastEvaluatedKey
};
}
更新项目
import { UpdateCommand } from '@aws-sdk/lib-dynamodb';
async function updateUser(userId: string, updates: Partial<Pick<User, 'name' | 'email'>>): Promise<User> {
// 动态构建更新表达式
const updateParts: string[] = ['#updatedAt = :updatedAt'];
const names: Record<string, string> = { '#updatedAt': 'updatedAt' };
const values: Record<string, any> = { ':updatedAt': new Date().toISOString() };
if (updates.name !== undefined) {
updateParts.push('#name = :name');
names['#name'] = 'name';
values[':name'] = updates.name;
}
if (updates.email !== undefined) {
updateParts.push('#email = :email');
names['#email'] = 'email';
values[':email'] = updates.email;
}
const result = await docClient.send(new UpdateCommand({
TableName: TABLE_NAME,
Key: keys.user(userId),
UpdateExpression: `SET ${updateParts.join(', ')}`,
ExpressionAttributeNames: names,
ExpressionAttributeValues: values,
ReturnValues: 'ALL_NEW',
ConditionExpression: 'attribute_exists(PK)' // 必须存在
}));
return result.Attributes as User;
}
// 原子计数器增量
async function incrementOrderCount(userId: string): Promise<void> {
await docClient.send(new UpdateCommand({
TableName: TABLE_NAME,
Key: keys.user(userId),
UpdateExpression: 'SET orderCount = if_not_exists(orderCount, :zero) + :inc',
ExpressionAttributeValues: {
':zero': 0,
':inc': 1
}
}));
}
删除项目
import { DeleteCommand } from '@aws-sdk/lib-dynamodb';
async function deleteUser(userId: string): Promise<void> {
await docClient.send(new DeleteCommand({
TableName: TABLE_NAME,
Key: keys.user(userId),
ConditionExpression: 'attribute_exists(PK)'
}));
}
批量操作
批量写入(最多 25 项)
import { BatchWriteCommand } from '@aws-sdk/lib-dynamodb';
async function batchCreateItems(items: BaseItem[]): Promise<void> {
// DynamoDB 每批最多允许 25 项
const chunks = [];
for (let i = 0; i < items.length; i += 25) {
chunks.push(items.slice(i, i + 25));
}
for (const chunk of chunks) {
await docClient.send(new BatchWriteCommand({
RequestItems: {
[TABLE_NAME]: chunk.map(item => ({
PutRequest: { Item: item }
}))
}
}));
}
}
批量获取(最多 100 项)
import { BatchGetCommand } from '@aws-sdk/lib-dynamodb';
async function batchGetUsers(userIds: string[]): Promise<User[]> {
const result = await docClient.send(new BatchGetCommand({
RequestItems: {
[TABLE_NAME]: {
Keys: userIds.map(id => keys.user(id))
}
}
}));
return (result.Responses?.[TABLE_NAME] as User[]) || [];
}
事务
TransactWrite(原子多项目)
import { TransactWriteCommand } from '@aws-sdk/lib-dynamodb';
async function createOrderWithItems(
userId: string,
orderId: string,
orderData: { total: number },
items: { productId: string; quantity: number }[]
): Promise<void> {
const now = new Date().toISOString();
const transactItems = [
// 创建订单
{
Put: {
TableName: TABLE_NAME,
Item: {
...keys.order(userId, orderId),
EntityType: 'Order',
orderId,
userId,
total: orderData.total,
status: 'pending',
createdAt: now,
updatedAt: now
},
ConditionExpression: 'attribute_not_exists(PK)'
}
},
// 更新用户的订单计数
{
Update: {
TableName: TABLE_NAME,
Key: keys.user(userId),
UpdateExpression: 'SET orderCount = if_not_exists(orderCount, :zero) + :inc',
ExpressionAttributeValues: { ':zero': 0, ':inc': 1 }
}
},
// 添加订单项目
...items.map((item, index) => ({
Put: {
TableName: TABLE_NAME,
Item: {
PK: `ORDER#${orderId}`,
SK: `ITEM#${index}`,
GSI1PK: `ORDER#${orderId}`,
GSI1SK: `ITEM#${index}`,
EntityType: 'OrderItem',
productId: item.productId,
quantity: item.quantity,
createdAt: now
}
}
}))
];
await docClient.send(new TransactWriteCommand({
TransactItems: transactItems
}));
}
GSI 模式
稀疏索引
// 只有具有 GSI1PK 属性的项目才会出现在索引中
// 适用于 "特色" 或 "标记" 项目
// 特色产品(只有一些产品有 GSI1PK)
{ PK: 'PROD#1', SK: 'PRODUCT', GSI1PK: 'FEATURED', GSI1SK: 'PROD#1', ... } // 在索引中
{ PK: 'PROD#2', SK: 'PRODUCT', ... } // 不在索引中(没有 GSI1PK)
// 查询特色产品
const featured = await docClient.send(new QueryCommand({
TableName: TABLE_NAME,
IndexName: 'GSI1',
KeyConditionExpression: 'GSI1PK = :pk',
ExpressionAttributeValues: { ':pk': 'FEATURED' }
}));
反转索引 (GSI)
// 主表:用户 -> 订单 (PK=USER#, SK=ORDER#)
// GSI:按状态的订单 (GSI1PK=STATUS#, GSI1SK=ORDER#)
{ PK: 'USER#123', SK: 'ORDER#001', GSI1PK: 'STATUS#pending', GSI1SK: 'ORDER#001', ... }
{ PK: 'USER#456', SK: 'ORDER#002', GSI1PK: 'STATUS#shipped', GSI1SK: 'ORDER#002', ... }
// 获取所有用户的待处理订单
const pending = await docClient.send(new QueryCommand({
TableName: TABLE_NAME,
IndexName: 'GSI1',
KeyConditionExpression: 'GSI1PK = :pk',
ExpressionAttributeValues: { ':pk': 'STATUS#pending' }
}));
多属性复合键 (2025年11月+)
// 新功能:每个分区/排序键最多4个属性
// 不再需要合成键,如 "TOURNAMENT#WINTER2024#REGION#NA-EAST"
// 表定义 (IaC)
const table = {
AttributeDefinitions: [
{ AttributeName: 'tournament', AttributeType: 'S' },
{ AttributeName: 'region', AttributeType: 'S' },
{ AttributeName: 'score', AttributeType: 'N' }
],
GlobalSecondaryIndexes: [{
IndexName: 'TournamentRegionIndex',
KeySchema: [
{ AttributeName: 'tournament', KeyType: 'HASH' }, // 复合 PK 第一部分
{ AttributeName: 'region', KeyType: 'HASH' }, // 复合 PK 第二部分
{ AttributeName: 'score', KeyType: 'RANGE' }
]
}]
};
Python (boto3)
设置
# requirements.txt
boto3>=1.34.0
# db.py
import boto3
from boto3.dynamodb.conditions import Key, Attr
import os
dynamodb = boto3.resource(
'dynamodb',
region_name=os.getenv('AWS_REGION', 'us-east-1'),
endpoint_url=os.getenv('DYNAMODB_LOCAL_ENDPOINT') # 用于本地开发
)
table = dynamodb.Table(os.getenv('DYNAMODB_TABLE', 'MyTable'))
操作
from datetime import datetime
from typing import Optional, List
from decimal import Decimal
def create_user(user_id: str, email: str, name: str) -> dict:
now = datetime.utcnow().isoformat()
item = {
'PK': f'USER#{user_id}',
'SK': 'PROFILE',
'EntityType': 'User',
'userId': user_id,
'email': email,
'name': name,
'createdAt': now,
'updatedAt': now
}
table.put_item(
Item=item,
ConditionExpression='attribute_not_exists(PK)'
)
return item
def get_user(user_id: str) -> Optional[dict]:
response = table.get_item(
Key={'PK': f'USER#{user_id}', 'SK': 'PROFILE'}
)
return response.get('Item')
def get_user_orders(user_id: str) -> List[dict]:
response = table.query(
KeyConditionExpression=Key('PK').eq(f'USER#{user_id}') & Key('SK').begins_with('ORDER#'),
ScanIndexForward=False
)
return response.get('Items', [])
def update_user(user_id: str, **updates) -> dict:
update_parts = ['#updatedAt = :updatedAt']
names = {'#updatedAt': 'updatedAt'}
values = {':updatedAt': datetime.utcnow().isoformat()}
for key, value in updates.items():
update_parts.append(f'#{key} = :{key}')
names[f'#{key}'] = key
values[f':{key}'] = value
response = table.update_item(
Key={'PK': f'USER#{user_id}', 'SK': 'PROFILE'},
UpdateExpression=f'SET {", ".join(update_parts)}',
ExpressionAttributeNames=names,
ExpressionAttributeValues=values,
ReturnValues='ALL_NEW'
)
return response['Attributes']
def delete_user(user_id: string) -> None:
table.delete_item(
Key={'PK': f'USER#{user_id}', 'SK': 'PROFILE'}
)
本地开发
DynamoDB 本地
# Docker
docker run -d -p 8000:8000 amazon/dynamodb-local
# 本地创建表
aws dynamodb create-table \
--endpoint-url http://localhost:8000 \
--table-name MyTable \
--attribute-definitions \
AttributeName=PK,AttributeType=S \
AttributeName=SK,AttributeType=S \
AttributeName=GSI1PK,AttributeType=S \
AttributeName=GSI1SK,AttributeType=S \
--key-schema \
AttributeName=PK,KeyType=HASH \
AttributeName=SK,KeyType=RANGE \
--global-secondary-indexes \
'IndexName=GSI1,KeySchema=[{AttributeName=GSI1PK,KeyType=HASH},{AttributeName=GSI1SK,KeyType=RANGE}],Projection={ProjectionType=ALL}' \
--billing-mode PAY_PER_REQUEST
NoSQL 工作台
AWS 提供了 NoSQL 工作台 用于可视化数据建模和查询。
CLI 快速参考
# 表操作
aws dynamodb create-table --cli-input-json file://table.json
aws dynamodb describe-table --table-name MyTable
aws dynamodb delete-table --table-name MyTable
# 项目操作
aws dynamodb put-item --table-name MyTable --item '{"PK":{"S":"USER#1"},"SK":{"S":"PROFILE"}}'
aws dynamodb get-item --table-name MyTable --key '{"PK":{"S":"USER#1"},"SK":{"S":"PROFILE"}}'
aws dynamodb delete-item --table-name MyTable --key '{"PK":{"S":"USER#1"},"SK":{"S":"PROFILE"}}'
# 查询
aws dynamodb query --table-name MyTable \
--key-condition-expression "PK = :pk" \
--expression-attribute-values '{":pk":{"S":"USER#1"}}'
# 扫描(生产中避免)
aws dynamodb scan --table-name MyTable --limit 10
反模式
- 扫描操作 - 始终使用具有适当键条件的查询
- 热分区 - 使用高基数分区键分布写入
- 大项目 - 保持项目在 400KB 以下;使用 S3 存储大数据
- 太多的 GSIs - 每个 GSI 复制数据;仔细设计
- 忽略容量 - 监控消耗的容量,对于变化的负载使用按需付费
- 没有条件表达式 - 始终使用 ConditionExpression 验证
- 获取所有属性 - 使用 ProjectionExpression 限制数据
- 无理由的多表设计 - 单表优先,除非访问模式不重叠