名称: mongodb-transactions 版本: “2.1.0” 描述: 掌握MongoDB ACID事务以进行多文档操作。学习会话管理、事务机制、错误处理和生产模式。确保跨多个操作的数据一致性。 sasmp_version: “1.3.0” bonded_agent: 07-mongodb-application-development bond_type: PRIMARY_BOND
生产级技能配置
能力:
- 会话管理
- 多文档事务
- 重试逻辑
- 错误恢复
- 一致性保证
输入验证: 必需上下文: - 操作列表 - 一致性要求 可选上下文: - 读关注 - 写关注 - 超时毫秒
输出格式: 事务代码: 字符串 会话处理: 字符串 错误处理: 字符串 重试逻辑: 字符串 回滚过程: 字符串
错误处理: 常见错误: - 代码: TXN001 条件: “暂时性事务错误” 恢复: “使用指数退避重试整个事务” - 代码: TXN002 条件: “未知事务提交结果” 恢复: “检查操作幂等性,重试前验证状态” - 代码: TXN003 条件: “事务超时” 恢复: “增加maxTransactionLockRequestTimeoutMillis或优化操作”
先决条件: mongodb版本: “4.0+ (副本集), 4.2+ (分片集群)” 必需知识: - CRUD操作 - 会话基础 部署要求: - “副本集或分片集群” - “不适用于独立部署或Atlas M0”
测试: 单元测试模板: | // 测试带回滚的事务 const session = client.startSession() try { await session.withTransaction(async () => { await collection1.insertOne(doc1, { session }) await collection2.insertOne(doc2, { session }) }) // 验证两者都已插入 } catch (error) { // 验证两者都未插入(回滚) } finally { await session.endSession() }
MongoDB多文档事务
通过ACID事务确保跨多个操作的一致性。
快速开始
基础事务
const session = client.startSession()
try {
await session.withTransaction(async () => {
// 这里的所有操作都是原子的
await users.insertOne({ name: 'John' }, { session })
await accounts.insertOne({ userId: 'xxx', balance: 100 }, { session })
// 如果任何失败,所有操作回滚
})
} catch (error) {
console.error('事务失败:', error)
} finally {
await session.endSession()
}
实际案例: 资金转账
async function transferMoney(fromId, toId, amount) {
const session = client.startSession()
try {
await session.withTransaction(async () => {
// 从账户A扣除
await accounts.updateOne(
{ _id: fromId },
{ $inc: { balance: -amount } },
{ session }
)
// 添加到账户B
await accounts.updateOne(
{ _id: toId },
{ $inc: { balance: amount } },
{ session }
)
// 两者都成功,或两者都回滚 - 没有部分转账!
})
} catch (error) {
// 事务失败,资金未转移
console.error('转账失败:', error)
} finally {
await session.endSession()
}
}
事务要求
MongoDB版本
- MongoDB 4.0+: 单文档事务(所有版本)
- MongoDB 4.0: 多文档事务(副本集)
- MongoDB 4.2+: 多文档事务(分片集群)
部署类型
- 副本集: 事务必需
- 分片集群: MongoDB 4.2+
- 独立部署: 仅单文档事务
- Atlas免费版: 无事务(共享集群)
会话管理
创建会话
const session = client.startSession()
// 配置会话
const session = client.startSession({
defaultTransactionOptions: {
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' },
readPreference: 'primary'
}
})
会话生命周期
// 1. 开始会话
const session = client.startSession()
// 2. 开始事务
session.startTransaction()
// 3. 使用会话执行操作
await collection.insertOne(doc, { session })
// 4. 提交或中止
await session.commitTransaction() // 成功
await session.abortTransaction() // 回滚
// 5. 结束会话
await session.endSession()
事务选项
读关注
// snapshot: 查看事务开始时的已提交数据
// local: 查看最新数据(默认)
// majority: 查看多数确认的数据
// linearizable: 最新确认的数据
await session.withTransaction(async () => {
// 操作
}, {
readConcern: { level: 'snapshot' }
})
写关注
// w: 1 (默认): 主节点确认
// w: 'majority': 多数节点确认
// w: N: N个副本确认
await session.withTransaction(async () => {
// 操作
}, {
writeConcern: { w: 'majority' }
})
读偏好
// primary: 仅从主节点读取
// primaryPreferred: 主节点,回退到从节点
// secondary: 从从节点读取
// secondaryPreferred: 从节点,回退到主节点
// nearest: 最近服务器
{
readPreference: 'primary'
}
错误处理
处理事务错误
async function robustTransaction() {
const session = client.startSession()
try {
await session.withTransaction(async () => {
// 操作
})
} catch (error) {
if (error.hasErrorLabel('TransientTransactionError')) {
// 临时错误 - 重试
return robustTransaction()
} else if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
// 提交结果未知 - 检查状态
console.log('提交结果未知')
} else {
// 致命错误
throw error
}
} finally {
await session.endSession()
}
}
重试逻辑
async function executeWithRetry(fn, maxRetries = 3) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
const session = client.startSession()
try {
await session.withTransaction(async () => {
await fn(session)
})
return // 成功
} catch (error) {
if (error.hasErrorLabel('TransientTransactionError') && attempt < maxRetries) {
continue // 重试
} else {
throw error
}
} finally {
await session.endSession()
}
}
}
实际案例
带库存的订单放置
async function placeOrder(userId, items) {
const session = client.startSession()
try {
await session.withTransaction(async () => {
// 1. 创建订单
const order = {
userId,
items,
status: 'pending',
createdAt: new Date()
}
const orderResult = await orders.insertOne(order, { session })
// 2. 更新库存
for (const item of items) {
const updated = await products.findOneAndUpdate(
{ _id: item.productId },
{ $inc: { stock: -item.quantity } },
{ session, returnDocument: 'after' }
)
// 检查库存是否变为负数
if (updated.value.stock < 0) {
throw new Error('库存不足')
}
}
// 3. 从用户账户扣除
const userUpdate = await users.findOneAndUpdate(
{ _id: userId },
{ $inc: { balance: -calculateTotal(items) } },
{ session, returnDocument: 'after' }
)
if (userUpdate.value.balance < 0) {
throw new Error('资金不足')
}
return orderResult.insertedId
})
} catch (error) {
// 如果任何失败,所有操作回滚
// 库存保持不变
// 用户余额未变
// 订单未创建
console.error('订单失败:', error.message)
throw error
} finally {
await session.endSession()
}
}
账户对账
async function reconcileAccounts(mainId, secondaryIds) {
const session = client.startSession()
try {
await session.withTransaction(async () => {
// 计算总余额
const secondary = await accounts.find(
{ _id: { $in: secondaryIds } },
{ session }
).toArray()
const totalBalance = secondary.reduce((sum, acc) => sum + acc.balance, 0)
// 更新主账户
await accounts.updateOne(
{ _id: mainId },
{ $set: { balance: totalBalance } },
{ session }
)
// 删除次要账户
await accounts.deleteMany(
{ _id: { $in: secondaryIds } },
{ session }
)
// 记录对账
await reconciliationLog.insertOne({
mainId,
secondaryIds,
totalBalance,
timestamp: new Date()
}, { session })
// 全部成功,或全部回滚
})
} finally {
await session.endSession()
}
}
限制与考虑
事务限制
- 最大16MB的写操作
- 不能创建/删除集合
- 不能创建/删除索引
- 不能修改集合
- 不能写入系统集合
性能影响
- 事务有开销
- 写操作更慢(更多协调)
- 锁增加锁争用
- 不适用于每个操作
最佳实践
✅ 事务最佳实践:
- 保持简短 - 最小化锁时间
- 重试暂时性错误 - 网络问题会发生
- 排序操作 - 防止死锁
- 使用适当的写关注 - ‘majority’ 用于安全
- 监控延迟 - 事务增加开销
✅ 何时使用:
- ✅ 资金转账
- ✅ 订单处理
- ✅ 库存管理
- ✅ 账户操作
- ✅ 任何多集合原子操作
❌ 何时不使用:
- ❌ 单文档操作(本身原子)
- ❌ 简单插入/更新
- ❌ 性能关键读取
- ❌ 批量操作(使用批量)
- ❌ 如果不在副本集上
下一步
- 学习会话基础 - StartSession, endSession
- 编写简单事务 - 插入和更新
- 添加错误处理 - Try-catch块
- 实现重试逻辑 - 处理暂时性错误
- 监控性能 - 测量事务时间
通过事务确保一致性! ✅