数据库事务管理 DatabaseTransactions

数据库事务管理是确保数据完整性和一致性的关键技术,涉及到ACID属性、隔离级别、锁定机制等,对于任何需要数据事务处理的软件开发至关重要。

后端开发 0 次安装 0 次浏览 更新于 3/5/2026

数据库事务

概览

数据库事务是维护应用程序数据完整性和一致性的基础。它们提供了一种将多个操作组合成一个工作单元的方式,该工作单元要么完全成功,要么完全失败,遵循ACID原则。

前提条件

  • 理解SQL和数据库操作
  • 了解数据库系统(PostgreSQL、MySQL等)
  • 熟悉并发概念
  • 基本了解分布式系统

核心概念

数据库事务是什么?

数据库事务是:

  • 原子性:所有操作全部成功或全部不做
  • 一致性:数据库从有效状态转移到另一个有效状态
  • 隔离性:事务之间不互相干扰
  • 持久性:提交的更改持久保存

ACID属性

原子性

原子性确保事务中的所有操作被视为一个单一单元。要么所有操作成功,要么都不成功。

-- 示例:账户间转账
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;  -- 两个更新要么都成功,要么都失败

如果第二个UPDATE失败,则第一个UPDATE自动回滚。

一致性

一致性确保事务将数据库从一个有效状态带到另一个有效状态,维护所有数据库规则和约束。

-- 示例:确保业务规则
BEGIN TRANSACTION;

-- 检查是否有足够的余额
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE;
-- 如果余额 < 100,则ROLLBACK

-- 执行转账
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;

-- 验证没有负余额
SELECT COUNT(*) FROM accounts WHERE balance < 0;
-- 如果计数 > 0,则ROLLBACK

COMMIT;

隔离性

隔离性确保并发事务不会相互干扰。每个事务看到一个一致的数据库快照。

-- 事务1
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT balance FROM accounts WHERE id = 1;  -- 读取$1000

-- 事务2(并发)
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance + 500 WHERE id = 1;
COMMIT;  -- 更新到$1500

-- 事务1继续
SELECT balance FROM accounts WHERE id = 1;  -- 仍然读取$1000
COMMIT;

持久性

持久性确保一旦事务被提交,它就是永久的,即使在系统故障的情况下也是如此。

BEGIN TRANSACTION;
INSERT INTO orders (user_id, total) VALUES (1, 100.00);
COMMIT;  -- 数据被永久存储
-- 即使服务器在此之后崩溃,数据也被保留

实施指南

事务生命周期

BEGIN

开始一个新的事务。

BEGIN;
-- 或
BEGIN TRANSACTION;
-- 或
START TRANSACTION;

COMMIT

永久保存事务中所做的所有更改。

BEGIN;
UPDATE users SET email = 'new@example.com' WHERE id = 1;
COMMIT;  -- 更改现在是永久的

ROLLBACK

撤销事务中所做的所有更改。

BEGIN;
UPDATE users SET email = 'new@example.com' WHERE id = 1;
ROLLBACK;  -- 更改被丢弃

事务生命周期示例

-- 1. 开始事务
BEGIN;

-- 2. 执行操作
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 1;
INSERT INTO orders (product_id, quantity) VALUES (1, 1);

-- 3. 检查条件
IF (SELECT quantity FROM inventory WHERE product_id = 1) < 0 THEN
    ROLLBACK;  -- 库存不足
ELSE
    COMMIT;    -- 成功
END IF;

隔离级别

READ UNCOMMITTED

最低的隔离级别。事务可以从其他事务中读取未提交的更改。

SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
BEGIN;
SELECT * FROM accounts WHERE id = 1;  -- 可以读取未提交的数据
COMMIT;

特点:

  • 允许脏读
  • 允许不可重复读
  • 允许幻读
  • 并发性最高,一致性最低

何时使用:

  • 实际中很少使用
  • 仅当性能至关重要且可以接受脏读时

READ COMMITTED

PostgreSQL和SQL Server中的默认隔离级别。事务只能读取已提交的更改。

SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
BEGIN;
SELECT * FROM accounts WHERE id = 1;  -- 只读取已提交的数据
COMMIT;

特点:

  • 防止脏读
  • 允许不可重复读
  • 允许幻读
  • 一致性和性能之间的良好平衡

何时使用:

  • 大多数应用程序
  • 当你需要看到其他事务的已提交更改时

REPEATABLE READ

MySQL中的默认隔离级别。事务从事务开始时看到一个一致的快照。

SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN;
SELECT * FROM accounts WHERE id = 1;  -- 一致的快照
-- 即使其他事务提交更改,你也不会看到它们
COMMIT;

特点:

  • 防止脏读
  • 防止不可重复读
  • 允许幻读(在MySQL中,防止幻读)
  • 适用于报告和分析

何时使用:

  • 当你需要在事务中一致地读取时
  • 报告查询
  • 当不可重复读会导致问题时

SERIALIZABLE

最高的隔离级别。事务完全隔离于彼此。

SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN;
SELECT * FROM accounts WHERE id = 1;  -- 完全隔离
COMMIT;

特点:

  • 防止脏读
  • 防止不可重复读
  • 防止幻读
  • 并发性最低,一致性最高
  • 可能会导致序列化失败

何时使用:

  • 重要的财务操作
  • 当绝对一致性需要时
  • 当幻读会导致问题时

锁定机制

悲观锁定

在访问资源之前锁定资源,假设会发生冲突。

-- SELECT FOR UPDATE - 锁定用于更新的行
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- 此行现在被锁定,其他事务必须等待

-- 进行一些处理
-- ...

UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;  -- 锁定被释放

SELECT FOR SHARE (PostgreSQL) / SELECT FOR SHARE (MySQL):

BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR SHARE;
-- 其他事务也可以使用FOR SHARE读取
-- 但在当前事务提交之前不能更新
COMMIT;

优点:

  • 保证数据一致性
  • 易于理解
  • 适用于高并发场景

缺点:

  • 可能导致死锁
  • 降低并发性
  • 可能会导致性能问题

乐观锁定

假设冲突很少,并在提交前检查冲突。

-- 添加版本列
ALTER TABLE accounts ADD COLUMN version INT DEFAULT 0;

-- 使用版本检查更新
UPDATE accounts
SET balance = balance - 100, version = version + 1
WHERE id = 1 AND version = 5;

-- 检查是否更新成功
IF ROW_COUNT() = 0 THEN
    -- 发生冲突,处理它
    ROLLBACK;
END IF;

使用时间戳:

-- 添加updated_at列
ALTER TABLE accounts ADD COLUMN updated_at TIMESTAMP;

-- 使用时间戳检查更新
UPDATE accounts
SET balance = balance - 100, updated_at = NOW()
WHERE id = 1 AND updated_at = '2024-01-01 10:00:00';

-- 检查是否更新成功
IF ROW_COUNT() = 0 THEN
    -- 发生冲突
    ROLLBACK;
END IF;

优点:

  • 没有锁定,更高的并发性
  • 在低并发场景下性能更好
  • 不会导致死锁

缺点:

  • 需要处理冲突
  • 需要额外的列
  • 更复杂的错误处理

行级与表级锁定

行级锁定

锁定单个行,允许同一表中其他行的并发访问。

BEGIN;
-- 锁定特定行
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;

-- 表中的其他行仍然可以访问
SELECT * FROM accounts WHERE id = 2;  -- 这个工作正常
COMMIT;

何时使用:

  • 大多数应用场景
  • 高并发要求
  • 当冲突局限于特定行时

表级锁定

锁定整个表,防止任何并发访问。

BEGIN;
-- 锁定整个表
LOCK TABLE accounts IN EXCLUSIVE MODE;

-- 没有其他事务可以访问这个表
SELECT * FROM accounts WHERE id = 1;  -- 只有这个事务
COMMIT;

锁定模式:

  • ACCESS SHARE: SELECT
  • ROW SHARE: SELECT FOR UPDATE
  • ROW EXCLUSIVE: INSERT, UPDATE, DELETE
  • SHARE UPDATE EXCLUSIVE: VACUUM, ANALYZE
  • SHARE: CREATE INDEX CONCURRENTLY
  • SHARE ROW EXCLUSIVE: LOCK TABLE
  • EXCLUSIVE: REFRESH MATERIALIZED VIEW CONCURRENTLY
  • ACCESS EXCLUSIVE: DROP TABLE, TRUNCATE, ALTER TABLE

何时使用:

  • 批量操作
  • 架构更改
  • 当你需要对所有数据进行独占访问时

保存点

保存点允许你在事务内回滚到一个特定点,而不需要回滚整个事务。

BEGIN;

-- 第一个操作
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
SAVEPOINT sp1;

-- 第二个操作
UPDATE accounts SET balance = balance + 100 WHERE id = 2;

-- 如果第二个操作失败,回滚到保存点
-- ROLLBACK TO sp1;

-- 继续其他操作
UPDATE accounts SET balance = balance + 50 WHERE id = 3;

COMMIT;

多个保存点:

BEGIN;

-- 操作1
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
SAVEPOINT sp1;

-- 操作2
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
SAVEPOINT sp2;

-- 如果操作2失败,回滚到sp1
-- ROLLBACK TO sp1;

-- 尝试替代操作2
UPDATE accounts SET balance = balance + 50 WHERE id = 2;

COMMIT;

释放保存点:

BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
SAVEPOINT sp1;

UPDATE accounts SET balance = balance + 100 WHERE id = 2;
RELEASE SAVEPOINT sp1;  -- 保存点被释放

-- 不能再回滚到sp1了
-- ROLLBACK TO sp1;  -- 错误!
COMMIT;

死锁检测和预防

什么是死锁?

当两个或多个事务等待对方的锁定时,就会发生死锁。

事务1:锁定行A,等待行B
事务2:锁定行B,等待行A

死锁示例

-- 事务1
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;  -- 锁定行1
-- 等待行2...
UPDATE accounts SET balance = balance + 100 WHERE id = 2;

-- 事务2(并发)
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 2;  -- 锁定行2
-- 等待行1...
UPDATE accounts SET balance = balance + 100 WHERE id = 1;

-- 死锁!两个事务都在等待对方

死锁预防

一致的锁定顺序:

-- 总是以相同的顺序锁定行(例如,按ID)
-- 事务1
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;  -- 较低ID优先
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;

-- 事务2
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;  -- 相同顺序
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;

锁定超时:

-- 设置锁定超时
SET lock_timeout = '5s';  -- PostgreSQL
-- 或
SET innodb_lock_wait_timeout = 5;  -- MySQL

BEGIN;
-- 如果5秒内无法获取锁定,则引发错误
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
COMMIT;

短事务:

-- 使事务尽可能短
BEGIN;
-- 仅必要的操作
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;

-- 在事务之外进行处理
-- ...

BEGIN;
-- 最终更新
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;

处理死锁

// 应用程序级别的重试逻辑
async function transferMoney(fromId, toId, amount) {
  const maxRetries = 3;
  let attempt = 0;

  while (attempt < maxRetries) {
    try {
      await db.transaction(async (trx) => {
        await trx('accounts')
          .where('id', fromId)
          .update('balance', db.raw('balance - ?', [amount]));

        await trx('accounts')
          .where('id', toId)
          .update('balance', db.raw('balance + ?', [amount]));
      });
      return;  // 成功
    } catch (error) {
      if (error.code === '40P01') {  // 死锁错误代码
        attempt++;
        await sleep(100 * attempt);  // 指数退避
      } else {
        throw error;  // 其他错误
      }
    }
  }

  throw new Error('Max retries exceeded');
}

事务超时和重试

设置超时

PostgreSQL:

-- 语句超时
SET statement_timeout = '30s';

-- 锁定超时
SET lock_timeout = '5s';

-- 事务空闲超时
SET idle_in_transaction_session_timeout = '10min';

MySQL:

-- 锁定等待超时
SET innodb_lock_wait_timeout = 5;

-- 事务只读超时
SET tx_read_only = 1;

重试逻辑

class TransactionRetry {
  constructor(options = {}) {
    this.maxRetries = options.maxRetries || 3;
    this.retryDelay = options.retryDelay || 100;
    this.backoffMultiplier = options.backoffMultiplier || 2;
  }

  async execute(fn) {
    let attempt = 0;
    let lastError;

    while (attempt < this.maxRetries) {
      try {
        return await fn();
      } catch (error) {
        lastError = error;

        if (this.isRetryableError(error)) {
          attempt++;
          const delay = this.retryDelay * Math.pow(this.backoffMultiplier, attempt - 1);
          await this.sleep(delay);
        } else {
          throw error;
        }
      }
    }

    throw lastError;
  }

  isRetryableError(error) {
    const retryableCodes = [
      '40P01',  // 死锁
      '40001',  // 序列化失败
      '08006',  // 连接失败
    ];
    return retryableCodes.includes(error.code);
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 使用
const retry = new TransactionRetry({ maxRetries: 3 });

await retry.execute(async () => {
  await db.transaction(async (trx) => {
    // 事务操作
  });
});

ORM中的事务模式

Prisma

import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

// 简单事务
await prisma.$transaction(async (tx) => {
  await tx.account.update({
    where: { id: 1 },
    data: { balance: { decrement: 100 } },
  });

  await tx.account.update({
    where: { id: 2 },
    data: { balance: { increment: 100 } },
  });
});

// 交互式事务与重试
await prisma.$transaction(
  async (tx) => {
    const account = await tx.account.findUnique({
      where: { id: 1 },
    });

    if (account.balance < 100) {
      throw new Error('Insufficient balance');
    }

    await tx.account.update({
      where: { id: 1 },
      data: { balance: { decrement: 100 } },
    });
  },
  {
    maxWait: 5000,    // 等待事务的最大时间
    timeout: 10000,   // 事务运行的最大时间
    isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
  }
);

TypeORM

import { DataSource, IsolationLevel } from 'typeorm';

const dataSource = new DataSource({ /* ... */ });

// 简单事务
await dataSource.transaction(async (manager) => {
  await manager.update(Account, 1, {
    balance: () => `balance - 100`,
  });

  await manager.update(Account, 2, {
    balance: () => `balance + 100`,
  });
});

// 带选项的事务
await dataSource.transaction({
  isolationLevel: IsolationLevel.REPEATABLE_READ,
}, async (manager) => {
  // 事务操作
});

// 手动事务控制
const queryRunner = dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();

try {
  await queryRunner.manager.update(Account, 1, {
    balance: () => `balance - 100`,
  });

  await queryRunner.manager.update(Account, 2, {
    balance: () => `balance + 100`,
  });

  await queryRunner.commitTransaction();
} catch (error) {
  await queryRunner.rollbackTransaction();
  throw error;
} finally {
  await queryRunner.release();
}

SQLAlchemy (Python)

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session

engine = create_engine('postgresql://user:pass@localhost/db')
Session = sessionmaker(bind=engine)

# 简单事务
with Session() as session:
    try:
        session.execute(
            text("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        )
        session.execute(
            text("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
        )
        session.commit()
    except Exception as e:
        session.rollback()
        raise

# 带隔离级别的事务
with Session() as session:
    session.execute(
        text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
    )
    session.begin()
    try:
        # 事务操作
        session.commit()
    except Exception as e:
        session.rollback()
        raise

# 嵌套事务(保存点)
with Session() as session:
    session.begin_nested()
    try:
        # 嵌套操作
        session.commit()
    except Exception as e:
        session.rollback()
        raise

最佳实践

  1. 事务设计

    • 使事务尽可能短
    • 只包括必要的操作
    • 避免在事务中进行用户交互
    • 使用适当的隔离级别
  2. 错误处理

    • 总是处理事务错误
    • 为瞬态失败实现重试逻辑
    • 记录事务失败以进行调试
    • 出错时回滚
  3. 性能

    • 尽可能使用行级锁
    • 避免长时间运行的事务
    • 监控锁定争用
    • 使用连接池
  4. 测试

    • 测试事务回滚场景
    • 测试并发访问
    • 测试死锁处理
    • 测试隔离级别
  5. 分布式事务

    • 考虑为微服务使用Saga模式
    • 仅在必要时使用2PC
    • 了解性能影响
    • 准备补偿逻辑

相关技能