AWSDynamoDB单表设计 aws-dynamodb

掌握 AWS DynamoDB 的单表设计技巧,包括全局次索引(GSI)模式、SDK v3 的 TypeScript/Python 使用方法,以及最佳实践。

AWS 0 次安装 0 次浏览 更新于 3/4/2026

AWS DynamoDB 技能

加载方式:base.md + [typescript.md | python.md]

DynamoDB 是一个完全托管的 NoSQL 数据库,设计用于实现任何规模下的个位数毫秒级性能。掌握单表设计和访问模式建模。

资料来源: DynamoDB 文档 | SDK v3 | 最佳实践


核心原则

为访问模式设计,而不是实体。首先考虑访问模式。

DynamoDB 要求在设计模式之前了解您的查询。围绕您将如何访问数据进行建模,而不是数据如何关联。单表设计使用通用键属性将多个实体类型存储在一个表中。


核心概念

概念 描述
分区键 (PK) 主键属性 - 决定数据分布
排序键 (SK) 可选的次键,用于分区内的范围查询
GSI 全局次索引 - 替代分区/排序键
LSI 本地次索引 - 相同的分区,不同的排序
项目 单个记录(最大 400 KB)
属性 项目内的字段

单表设计

为什么单表?

  • 在单个查询中获取相关数据
  • 减少往返次数和成本
  • 跨实体类型启用事务
  • 简化操作(备份、恢复、IAM)

通用键模式

// 而不是特定于实体的键:
// userId, orderId, productId

// 使用适用于所有实体的通用键:
interface BaseItem {
  PK: string;   // 分区键
  SK: string;   // 排序键
  GSI1PK?: string;  // 第一个 GSI 分区键
  GSI1SK?: string;  // 第一个 GSI 排序键
  EntityType: string;
  // ... 实体特定的属性
}

示例:电子商务模式

// 用户
{ PK: 'USER#123', SK: 'PROFILE', EntityType: 'User', name: 'John', email: 'john@test.com' }
{ PK: 'USER#123', SK: 'ADDRESS#1', EntityType: 'Address', street: '123 Main', city: 'NYC' }

// 用户的订单 (1:N 关系)
{ PK: 'USER#123', SK: 'ORDER#2024-001', EntityType: 'Order', total: 99.99, status: 'shipped' }
{ PK: 'USER#123', SK: 'ORDER#2024-002', EntityType: 'Order', total: 49.99, status: 'pending' }

// 通过 GSI 按订单 ID 查询订单详情
{ PK: 'USER#123', SK: 'ORDER#2024-001', GSI1PK: 'ORDER#2024-001', GSI1SK: 'ORDER', ... }
{ PK: 'ORDER#2024-001', SK: 'ITEM#1', GSI1PK: 'ORDER#2024-001', GSI1SK: 'ITEM#1', productId: 'PROD#456', qty: 2 }

// 产品
{ PK: 'PROD#456', SK: 'PRODUCT', EntityType: 'Product', name: 'Widget', price: 29.99 }

覆盖的访问模式

1. 获取用户配置文件 → 查询 PK='USER#123', SK='PROFILE'
2. 获取带有地址的用户 → 查询 PK='USER#123', SK begins_with 'ADDRESS'
3. 获取所有用户订单 → 查询 PK='USER#123', SK begins_with 'ORDER'
4. 通过 ID 获取订单 → 查询 GSI1, PK='ORDER#2024-001'
5. 获取包含项目的订单 → 查询 GSI1, PK='ORDER#2024-001'
6. 获取产品详情 → 查询 PK='PROD#456', SK='PRODUCT'

SDK v3 设置 (TypeScript)

安装依赖项

npm install @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb

客户端配置

// lib/dynamodb.ts
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';

const client = new DynamoDBClient({
  region: process.env.AWS_REGION || 'us-east-1',
  // 对于 DynamoDB 本地开发
  ...(process.env.DYNAMODB_LOCAL && {
    endpoint: 'http://localhost:8000',
    credentials: { accessKeyId: 'local', secretAccessKey: 'local' }
  })
});

// 文档客户端简化操作
export const docClient = DynamoDBDocumentClient.from(client, {
  marshallOptions: {
    removeUndefinedValues: true,  // 重要:匹配 v2 行为
    convertClassInstanceToMap: true
  },
  unmarshallOptions: {
    wrapNumbers: false
  }
});

export const TABLE_NAME = process.env.DYNAMODB_TABLE || 'MyTable';

类型定义

// types/dynamodb.ts
export interface BaseItem {
  PK: string;
  SK: string;
  GSI1PK?: string;
  GSI1SK?: string;
  EntityType: string;
  createdAt: string;
  updatedAt: string;
}

export interface User extends BaseItem {
  EntityType: 'User';
  userId: string;
  email: string;
  name: string;
}

export interface Order extends BaseItem {
  EntityType: 'Order';
  orderId: string;
  userId: string;
  total: number;
  status: 'pending' | 'paid' | 'shipped' | 'delivered';
}

// 键构建器
export const keys = {
  user: (userId: string) => ({
    PK: `USER#${userId}`,
    SK: 'PROFILE'
  }),
  userOrders: (userId: string) => ({
    PK: `USER#${userId}`,
    SKPrefix: 'ORDER#'
  }),
  order: (userId: string, orderId: string) => ({
    PK: `USER#${userId}`,
    SK: `ORDER#${orderId}`,
    GSI1PK: `ORDER#${orderId}`,
    GSI1SK: 'ORDER'
  })
};

CRUD 操作

放置项目(创建/更新)

import { PutCommand } from '@aws-sdk/lib-dynamodb';
import { docClient, TABLE_NAME } from './dynamodb';
import { User, keys } from './types';

async function createUser(userId: string, data: { email: string; name: string }): Promise<User> {
  const now = new Date().toISOString();
  const item: User = {
    ...keys.user(userId),
    EntityType: 'User',
    userId,
    email: data.email,
    name: data.name,
    createdAt: now,
    updatedAt: now
  };

  await docClient.send(new PutCommand({
    TableName: TABLE_NAME,
    Item: item,
    ConditionExpression: 'attribute_not_exists(PK)'  // 防止覆盖
  }));

  return item;
}

获取项目(读取)

import { GetCommand } from '@aws-sdk/lib-dynamodb';

async function getUser(userId: string): Promise<User | null> {
  const result = await docClient.send(new GetCommand({
    TableName: TABLE_NAME,
    Key: keys.user(userId)
  }));

  return (result.Item as User) || null;
}

查询(列表/搜索)

import { QueryCommand } from '@aws-sdk/lib-dynamodb';

// 获取用户的所有订单
async function getUserOrders(userId: string): Promise<Order[]> {
  const result = await docClient.send(new QueryCommand({
    TableName: TABLE_NAME,
    KeyConditionExpression: 'PK = :pk AND begins_with(SK, :sk)',
    ExpressionAttributeValues: {
      ':pk': `USER#${userId}`,
      ':sk': 'ORDER#'
    },
    ScanIndexForward: false  // 最新的在前
  }));

  return (result.Items as Order[]) || [];
}

// 通过订单 ID 查询 GSI
async function getOrderById(orderId: string): Promise<Order | null> {
  const result = await docClient.send(new QueryCommand({
    TableName: TABLE_NAME,
    IndexName: 'GSI1',
    KeyConditionExpression: 'GSI1PK = :pk',
    ExpressionAttributeValues: {
      ':pk': `ORDER#${orderId}`
    }
  }));

  return (result.Items?.[0] as Order) || null;
}

// 分页查询
async function getUserOrdersPaginated(
  userId: string,
  pageSize: number = 20,
  lastKey?: Record<string, any>
): Promise<{ items: Order[]; lastKey?: Record<string, any> }> {
  const result = await docClient.send(new QueryCommand({
    TableName: TABLE_NAME,
    KeyConditionExpression: 'PK = :pk AND begins_with(SK, :sk)',
    ExpressionAttributeValues: {
      ':pk': `USER#${userId}`,
      ':sk': 'ORDER#'
    },
    Limit: pageSize,
    ExclusiveStartKey: lastKey
  }));

  return {
    items: (result.Items as Order[]) || [],
    lastKey: result.LastEvaluatedKey
  };
}

更新项目

import { UpdateCommand } from '@aws-sdk/lib-dynamodb';

async function updateUser(userId: string, updates: Partial<Pick<User, 'name' | 'email'>>): Promise<User> {
  // 动态构建更新表达式
  const updateParts: string[] = ['#updatedAt = :updatedAt'];
  const names: Record<string, string> = { '#updatedAt': 'updatedAt' };
  const values: Record<string, any> = { ':updatedAt': new Date().toISOString() };

  if (updates.name !== undefined) {
    updateParts.push('#name = :name');
    names['#name'] = 'name';
    values[':name'] = updates.name;
  }

  if (updates.email !== undefined) {
    updateParts.push('#email = :email');
    names['#email'] = 'email';
    values[':email'] = updates.email;
  }

  const result = await docClient.send(new UpdateCommand({
    TableName: TABLE_NAME,
    Key: keys.user(userId),
    UpdateExpression: `SET ${updateParts.join(', ')}`,
    ExpressionAttributeNames: names,
    ExpressionAttributeValues: values,
    ReturnValues: 'ALL_NEW',
    ConditionExpression: 'attribute_exists(PK)'  // 必须存在
  }));

  return result.Attributes as User;
}

// 原子计数器增量
async function incrementOrderCount(userId: string): Promise<void> {
  await docClient.send(new UpdateCommand({
    TableName: TABLE_NAME,
    Key: keys.user(userId),
    UpdateExpression: 'SET orderCount = if_not_exists(orderCount, :zero) + :inc',
    ExpressionAttributeValues: {
      ':zero': 0,
      ':inc': 1
    }
  }));
}

删除项目

import { DeleteCommand } from '@aws-sdk/lib-dynamodb';

async function deleteUser(userId: string): Promise<void> {
  await docClient.send(new DeleteCommand({
    TableName: TABLE_NAME,
    Key: keys.user(userId),
    ConditionExpression: 'attribute_exists(PK)'
  }));
}

批量操作

批量写入(最多 25 项)

import { BatchWriteCommand } from '@aws-sdk/lib-dynamodb';

async function batchCreateItems(items: BaseItem[]): Promise<void> {
  // DynamoDB 每批最多允许 25 项
  const chunks = [];
  for (let i = 0; i < items.length; i += 25) {
    chunks.push(items.slice(i, i + 25));
  }

  for (const chunk of chunks) {
    await docClient.send(new BatchWriteCommand({
      RequestItems: {
        [TABLE_NAME]: chunk.map(item => ({
          PutRequest: { Item: item }
        }))
      }
    }));
  }
}

批量获取(最多 100 项)

import { BatchGetCommand } from '@aws-sdk/lib-dynamodb';

async function batchGetUsers(userIds: string[]): Promise<User[]> {
  const result = await docClient.send(new BatchGetCommand({
    RequestItems: {
      [TABLE_NAME]: {
        Keys: userIds.map(id => keys.user(id))
      }
    }
  }));

  return (result.Responses?.[TABLE_NAME] as User[]) || [];
}

事务

TransactWrite(原子多项目)

import { TransactWriteCommand } from '@aws-sdk/lib-dynamodb';

async function createOrderWithItems(
  userId: string,
  orderId: string,
  orderData: { total: number },
  items: { productId: string; quantity: number }[]
): Promise<void> {
  const now = new Date().toISOString();

  const transactItems = [
    // 创建订单
    {
      Put: {
        TableName: TABLE_NAME,
        Item: {
          ...keys.order(userId, orderId),
          EntityType: 'Order',
          orderId,
          userId,
          total: orderData.total,
          status: 'pending',
          createdAt: now,
          updatedAt: now
        },
        ConditionExpression: 'attribute_not_exists(PK)'
      }
    },
    // 更新用户的订单计数
    {
      Update: {
        TableName: TABLE_NAME,
        Key: keys.user(userId),
        UpdateExpression: 'SET orderCount = if_not_exists(orderCount, :zero) + :inc',
        ExpressionAttributeValues: { ':zero': 0, ':inc': 1 }
      }
    },
    // 添加订单项目
    ...items.map((item, index) => ({
      Put: {
        TableName: TABLE_NAME,
        Item: {
          PK: `ORDER#${orderId}`,
          SK: `ITEM#${index}`,
          GSI1PK: `ORDER#${orderId}`,
          GSI1SK: `ITEM#${index}`,
          EntityType: 'OrderItem',
          productId: item.productId,
          quantity: item.quantity,
          createdAt: now
        }
      }
    }))
  ];

  await docClient.send(new TransactWriteCommand({
    TransactItems: transactItems
  }));
}

GSI 模式

稀疏索引

// 只有具有 GSI1PK 属性的项目才会出现在索引中
// 适用于 "特色" 或 "标记" 项目

// 特色产品(只有一些产品有 GSI1PK)
{ PK: 'PROD#1', SK: 'PRODUCT', GSI1PK: 'FEATURED', GSI1SK: 'PROD#1', ... }  // 在索引中
{ PK: 'PROD#2', SK: 'PRODUCT', ... }  // 不在索引中(没有 GSI1PK)

// 查询特色产品
const featured = await docClient.send(new QueryCommand({
  TableName: TABLE_NAME,
  IndexName: 'GSI1',
  KeyConditionExpression: 'GSI1PK = :pk',
  ExpressionAttributeValues: { ':pk': 'FEATURED' }
}));

反转索引 (GSI)

// 主表:用户 -> 订单 (PK=USER#, SK=ORDER#)
// GSI:按状态的订单 (GSI1PK=STATUS#, GSI1SK=ORDER#)

{ PK: 'USER#123', SK: 'ORDER#001', GSI1PK: 'STATUS#pending', GSI1SK: 'ORDER#001', ... }
{ PK: 'USER#456', SK: 'ORDER#002', GSI1PK: 'STATUS#shipped', GSI1SK: 'ORDER#002', ... }

// 获取所有用户的待处理订单
const pending = await docClient.send(new QueryCommand({
  TableName: TABLE_NAME,
  IndexName: 'GSI1',
  KeyConditionExpression: 'GSI1PK = :pk',
  ExpressionAttributeValues: { ':pk': 'STATUS#pending' }
}));

多属性复合键 (2025年11月+)

// 新功能:每个分区/排序键最多4个属性
// 不再需要合成键,如 "TOURNAMENT#WINTER2024#REGION#NA-EAST"

// 表定义 (IaC)
const table = {
  AttributeDefinitions: [
    { AttributeName: 'tournament', AttributeType: 'S' },
    { AttributeName: 'region', AttributeType: 'S' },
    { AttributeName: 'score', AttributeType: 'N' }
  ],
  GlobalSecondaryIndexes: [{
    IndexName: 'TournamentRegionIndex',
    KeySchema: [
      { AttributeName: 'tournament', KeyType: 'HASH' },  // 复合 PK 第一部分
      { AttributeName: 'region', KeyType: 'HASH' },      // 复合 PK 第二部分
      { AttributeName: 'score', KeyType: 'RANGE' }
    ]
  }]
};

Python (boto3)

设置

# requirements.txt
boto3>=1.34.0

# db.py
import boto3
from boto3.dynamodb.conditions import Key, Attr
import os

dynamodb = boto3.resource(
    'dynamodb',
    region_name=os.getenv('AWS_REGION', 'us-east-1'),
    endpoint_url=os.getenv('DYNAMODB_LOCAL_ENDPOINT')  # 用于本地开发
)

table = dynamodb.Table(os.getenv('DYNAMODB_TABLE', 'MyTable'))

操作

from datetime import datetime
from typing import Optional, List
from decimal import Decimal

def create_user(user_id: str, email: str, name: str) -> dict:
    now = datetime.utcnow().isoformat()
    item = {
        'PK': f'USER#{user_id}',
        'SK': 'PROFILE',
        'EntityType': 'User',
        'userId': user_id,
        'email': email,
        'name': name,
        'createdAt': now,
        'updatedAt': now
    }

    table.put_item(
        Item=item,
        ConditionExpression='attribute_not_exists(PK)'
    )
    return item


def get_user(user_id: str) -> Optional[dict]:
    response = table.get_item(
        Key={'PK': f'USER#{user_id}', 'SK': 'PROFILE'}
    )
    return response.get('Item')


def get_user_orders(user_id: str) -> List[dict]:
    response = table.query(
        KeyConditionExpression=Key('PK').eq(f'USER#{user_id}') & Key('SK').begins_with('ORDER#'),
        ScanIndexForward=False
    )
    return response.get('Items', [])


def update_user(user_id: str, **updates) -> dict:
    update_parts = ['#updatedAt = :updatedAt']
    names = {'#updatedAt': 'updatedAt'}
    values = {':updatedAt': datetime.utcnow().isoformat()}

    for key, value in updates.items():
        update_parts.append(f'#{key} = :{key}')
        names[f'#{key}'] = key
        values[f':{key}'] = value

    response = table.update_item(
        Key={'PK': f'USER#{user_id}', 'SK': 'PROFILE'},
        UpdateExpression=f'SET {", ".join(update_parts)}',
        ExpressionAttributeNames=names,
        ExpressionAttributeValues=values,
        ReturnValues='ALL_NEW'
    )
    return response['Attributes']


def delete_user(user_id: string) -> None:
    table.delete_item(
        Key={'PK': f'USER#{user_id}', 'SK': 'PROFILE'}
    )

本地开发

DynamoDB 本地

# Docker
docker run -d -p 8000:8000 amazon/dynamodb-local

# 本地创建表
aws dynamodb create-table \
  --endpoint-url http://localhost:8000 \
  --table-name MyTable \
  --attribute-definitions \
    AttributeName=PK,AttributeType=S \
    AttributeName=SK,AttributeType=S \
    AttributeName=GSI1PK,AttributeType=S \
    AttributeName=GSI1SK,AttributeType=S \
  --key-schema \
    AttributeName=PK,KeyType=HASH \
    AttributeName=SK,KeyType=RANGE \
  --global-secondary-indexes \
    'IndexName=GSI1,KeySchema=[{AttributeName=GSI1PK,KeyType=HASH},{AttributeName=GSI1SK,KeyType=RANGE}],Projection={ProjectionType=ALL}' \
  --billing-mode PAY_PER_REQUEST

NoSQL 工作台

AWS 提供了 NoSQL 工作台 用于可视化数据建模和查询。


CLI 快速参考

# 表操作
aws dynamodb create-table --cli-input-json file://table.json
aws dynamodb describe-table --table-name MyTable
aws dynamodb delete-table --table-name MyTable

# 项目操作
aws dynamodb put-item --table-name MyTable --item '{"PK":{"S":"USER#1"},"SK":{"S":"PROFILE"}}'
aws dynamodb get-item --table-name MyTable --key '{"PK":{"S":"USER#1"},"SK":{"S":"PROFILE"}}'
aws dynamodb delete-item --table-name MyTable --key '{"PK":{"S":"USER#1"},"SK":{"S":"PROFILE"}}'

# 查询
aws dynamodb query --table-name MyTable \
  --key-condition-expression "PK = :pk" \
  --expression-attribute-values '{":pk":{"S":"USER#1"}}'

# 扫描(生产中避免)
aws dynamodb scan --table-name MyTable --limit 10

反模式

  • 扫描操作 - 始终使用具有适当键条件的查询
  • 热分区 - 使用高基数分区键分布写入
  • 大项目 - 保持项目在 400KB 以下;使用 S3 存储大数据
  • 太多的 GSIs - 每个 GSI 复制数据;仔细设计
  • 忽略容量 - 监控消耗的容量,对于变化的负载使用按需付费
  • 没有条件表达式 - 始终使用 ConditionExpression 验证
  • 获取所有属性 - 使用 ProjectionExpression 限制数据
  • 无理由的多表设计 - 单表优先,除非访问模式不重叠