MongoDB多文档事务管理Skill mongodb-transactions

这个技能专注于MongoDB数据库中的ACID事务实现,用于保证在多文档操作时的数据一致性和原子性。包括会话管理、事务配置、错误恢复和重试逻辑。适用于后端开发、金融系统、电商平台等需要高一致性保证的场景。关键词:MongoDB, ACID事务, 多文档操作, 数据一致性, 错误处理, 会话管理, 重试逻辑, 后端开发。

后端开发 0 次安装 0 次浏览 更新于 3/14/2026

名称: mongodb-transactions 版本: “2.1.0” 描述: 掌握MongoDB ACID事务以进行多文档操作。学习会话管理、事务机制、错误处理和生产模式。确保跨多个操作的数据一致性。 sasmp_version: “1.3.0” bonded_agent: 07-mongodb-application-development bond_type: PRIMARY_BOND

生产级技能配置

能力:

  • 会话管理
  • 多文档事务
  • 重试逻辑
  • 错误恢复
  • 一致性保证

输入验证: 必需上下文: - 操作列表 - 一致性要求 可选上下文: - 读关注 - 写关注 - 超时毫秒

输出格式: 事务代码: 字符串 会话处理: 字符串 错误处理: 字符串 重试逻辑: 字符串 回滚过程: 字符串

错误处理: 常见错误: - 代码: TXN001 条件: “暂时性事务错误” 恢复: “使用指数退避重试整个事务” - 代码: TXN002 条件: “未知事务提交结果” 恢复: “检查操作幂等性,重试前验证状态” - 代码: TXN003 条件: “事务超时” 恢复: “增加maxTransactionLockRequestTimeoutMillis或优化操作”

先决条件: mongodb版本: “4.0+ (副本集), 4.2+ (分片集群)” 必需知识: - CRUD操作 - 会话基础 部署要求: - “副本集或分片集群” - “不适用于独立部署或Atlas M0”

测试: 单元测试模板: | // 测试带回滚的事务 const session = client.startSession() try { await session.withTransaction(async () => { await collection1.insertOne(doc1, { session }) await collection2.insertOne(doc2, { session }) }) // 验证两者都已插入 } catch (error) { // 验证两者都未插入(回滚) } finally { await session.endSession() }

MongoDB多文档事务

通过ACID事务确保跨多个操作的一致性。

快速开始

基础事务

const session = client.startSession()

try {
  await session.withTransaction(async () => {
    // 这里的所有操作都是原子的
    await users.insertOne({ name: 'John' }, { session })
    await accounts.insertOne({ userId: 'xxx', balance: 100 }, { session })

    // 如果任何失败,所有操作回滚
  })
} catch (error) {
  console.error('事务失败:', error)
} finally {
  await session.endSession()
}

实际案例: 资金转账

async function transferMoney(fromId, toId, amount) {
  const session = client.startSession()

  try {
    await session.withTransaction(async () => {
      // 从账户A扣除
      await accounts.updateOne(
        { _id: fromId },
        { $inc: { balance: -amount } },
        { session }
      )

      // 添加到账户B
      await accounts.updateOne(
        { _id: toId },
        { $inc: { balance: amount } },
        { session }
      )

      // 两者都成功,或两者都回滚 - 没有部分转账!
    })
  } catch (error) {
    // 事务失败,资金未转移
    console.error('转账失败:', error)
  } finally {
    await session.endSession()
  }
}

事务要求

MongoDB版本

  • MongoDB 4.0+: 单文档事务(所有版本)
  • MongoDB 4.0: 多文档事务(副本集)
  • MongoDB 4.2+: 多文档事务(分片集群)

部署类型

  • 副本集: 事务必需
  • 分片集群: MongoDB 4.2+
  • 独立部署: 仅单文档事务
  • Atlas免费版: 无事务(共享集群)

会话管理

创建会话

const session = client.startSession()

// 配置会话
const session = client.startSession({
  defaultTransactionOptions: {
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' },
    readPreference: 'primary'
  }
})

会话生命周期

// 1. 开始会话
const session = client.startSession()

// 2. 开始事务
session.startTransaction()

// 3. 使用会话执行操作
await collection.insertOne(doc, { session })

// 4. 提交或中止
await session.commitTransaction()  // 成功
await session.abortTransaction()   // 回滚

// 5. 结束会话
await session.endSession()

事务选项

读关注

// snapshot: 查看事务开始时的已提交数据
// local: 查看最新数据(默认)
// majority: 查看多数确认的数据
// linearizable: 最新确认的数据

await session.withTransaction(async () => {
  // 操作
}, {
  readConcern: { level: 'snapshot' }
})

写关注

// w: 1 (默认): 主节点确认
// w: 'majority': 多数节点确认
// w: N: N个副本确认

await session.withTransaction(async () => {
  // 操作
}, {
  writeConcern: { w: 'majority' }
})

读偏好

// primary: 仅从主节点读取
// primaryPreferred: 主节点,回退到从节点
// secondary: 从从节点读取
// secondaryPreferred: 从节点,回退到主节点
// nearest: 最近服务器

{
  readPreference: 'primary'
}

错误处理

处理事务错误

async function robustTransaction() {
  const session = client.startSession()

  try {
    await session.withTransaction(async () => {
      // 操作
    })
  } catch (error) {
    if (error.hasErrorLabel('TransientTransactionError')) {
      // 临时错误 - 重试
      return robustTransaction()
    } else if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
      // 提交结果未知 - 检查状态
      console.log('提交结果未知')
    } else {
      // 致命错误
      throw error
    }
  } finally {
    await session.endSession()
  }
}

重试逻辑

async function executeWithRetry(fn, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    const session = client.startSession()

    try {
      await session.withTransaction(async () => {
        await fn(session)
      })
      return  // 成功
    } catch (error) {
      if (error.hasErrorLabel('TransientTransactionError') && attempt < maxRetries) {
        continue  // 重试
      } else {
        throw error
      }
    } finally {
      await session.endSession()
    }
  }
}

实际案例

带库存的订单放置

async function placeOrder(userId, items) {
  const session = client.startSession()

  try {
    await session.withTransaction(async () => {
      // 1. 创建订单
      const order = {
        userId,
        items,
        status: 'pending',
        createdAt: new Date()
      }
      const orderResult = await orders.insertOne(order, { session })

      // 2. 更新库存
      for (const item of items) {
        const updated = await products.findOneAndUpdate(
          { _id: item.productId },
          { $inc: { stock: -item.quantity } },
          { session, returnDocument: 'after' }
        )

        // 检查库存是否变为负数
        if (updated.value.stock < 0) {
          throw new Error('库存不足')
        }
      }

      // 3. 从用户账户扣除
      const userUpdate = await users.findOneAndUpdate(
        { _id: userId },
        { $inc: { balance: -calculateTotal(items) } },
        { session, returnDocument: 'after' }
      )

      if (userUpdate.value.balance < 0) {
        throw new Error('资金不足')
      }

      return orderResult.insertedId
    })
  } catch (error) {
    // 如果任何失败,所有操作回滚
    // 库存保持不变
    // 用户余额未变
    // 订单未创建
    console.error('订单失败:', error.message)
    throw error
  } finally {
    await session.endSession()
  }
}

账户对账

async function reconcileAccounts(mainId, secondaryIds) {
  const session = client.startSession()

  try {
    await session.withTransaction(async () => {
      // 计算总余额
      const secondary = await accounts.find(
        { _id: { $in: secondaryIds } },
        { session }
      ).toArray()
      const totalBalance = secondary.reduce((sum, acc) => sum + acc.balance, 0)

      // 更新主账户
      await accounts.updateOne(
        { _id: mainId },
        { $set: { balance: totalBalance } },
        { session }
      )

      // 删除次要账户
      await accounts.deleteMany(
        { _id: { $in: secondaryIds } },
        { session }
      )

      // 记录对账
      await reconciliationLog.insertOne({
        mainId,
        secondaryIds,
        totalBalance,
        timestamp: new Date()
      }, { session })

      // 全部成功,或全部回滚
    })
  } finally {
    await session.endSession()
  }
}

限制与考虑

事务限制

- 最大16MB的写操作
- 不能创建/删除集合
- 不能创建/删除索引
- 不能修改集合
- 不能写入系统集合

性能影响

- 事务有开销
- 写操作更慢(更多协调)
- 锁增加锁争用
- 不适用于每个操作

最佳实践

事务最佳实践:

  1. 保持简短 - 最小化锁时间
  2. 重试暂时性错误 - 网络问题会发生
  3. 排序操作 - 防止死锁
  4. 使用适当的写关注 - ‘majority’ 用于安全
  5. 监控延迟 - 事务增加开销

何时使用:

  1. ✅ 资金转账
  2. ✅ 订单处理
  3. ✅ 库存管理
  4. ✅ 账户操作
  5. ✅ 任何多集合原子操作

何时不使用:

  1. ❌ 单文档操作(本身原子)
  2. ❌ 简单插入/更新
  3. ❌ 性能关键读取
  4. ❌ 批量操作(使用批量)
  5. ❌ 如果不在副本集上

下一步

  1. 学习会话基础 - StartSession, endSession
  2. 编写简单事务 - 插入和更新
  3. 添加错误处理 - Try-catch块
  4. 实现重试逻辑 - 处理暂时性错误
  5. 监控性能 - 测量事务时间

通过事务确保一致性!