数据库事务
概览
数据库事务是维护应用程序数据完整性和一致性的基础。它们提供了一种将多个操作组合成一个工作单元的方式,该工作单元要么完全成功,要么完全失败,遵循ACID原则。
前提条件
- 理解SQL和数据库操作
- 了解数据库系统(PostgreSQL、MySQL等)
- 熟悉并发概念
- 基本了解分布式系统
核心概念
数据库事务是什么?
数据库事务是:
- 原子性:所有操作全部成功或全部不做
- 一致性:数据库从有效状态转移到另一个有效状态
- 隔离性:事务之间不互相干扰
- 持久性:提交的更改持久保存
ACID属性
原子性
原子性确保事务中的所有操作被视为一个单一单元。要么所有操作成功,要么都不成功。
-- 示例:账户间转账
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT; -- 两个更新要么都成功,要么都失败
如果第二个UPDATE失败,则第一个UPDATE自动回滚。
一致性
一致性确保事务将数据库从一个有效状态带到另一个有效状态,维护所有数据库规则和约束。
-- 示例:确保业务规则
BEGIN TRANSACTION;
-- 检查是否有足够的余额
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE;
-- 如果余额 < 100,则ROLLBACK
-- 执行转账
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- 验证没有负余额
SELECT COUNT(*) FROM accounts WHERE balance < 0;
-- 如果计数 > 0,则ROLLBACK
COMMIT;
隔离性
隔离性确保并发事务不会相互干扰。每个事务看到一个一致的数据库快照。
-- 事务1
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT balance FROM accounts WHERE id = 1; -- 读取$1000
-- 事务2(并发)
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance + 500 WHERE id = 1;
COMMIT; -- 更新到$1500
-- 事务1继续
SELECT balance FROM accounts WHERE id = 1; -- 仍然读取$1000
COMMIT;
持久性
持久性确保一旦事务被提交,它就是永久的,即使在系统故障的情况下也是如此。
BEGIN TRANSACTION;
INSERT INTO orders (user_id, total) VALUES (1, 100.00);
COMMIT; -- 数据被永久存储
-- 即使服务器在此之后崩溃,数据也被保留
实施指南
事务生命周期
BEGIN
开始一个新的事务。
BEGIN;
-- 或
BEGIN TRANSACTION;
-- 或
START TRANSACTION;
COMMIT
永久保存事务中所做的所有更改。
BEGIN;
UPDATE users SET email = 'new@example.com' WHERE id = 1;
COMMIT; -- 更改现在是永久的
ROLLBACK
撤销事务中所做的所有更改。
BEGIN;
UPDATE users SET email = 'new@example.com' WHERE id = 1;
ROLLBACK; -- 更改被丢弃
事务生命周期示例
-- 1. 开始事务
BEGIN;
-- 2. 执行操作
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 1;
INSERT INTO orders (product_id, quantity) VALUES (1, 1);
-- 3. 检查条件
IF (SELECT quantity FROM inventory WHERE product_id = 1) < 0 THEN
ROLLBACK; -- 库存不足
ELSE
COMMIT; -- 成功
END IF;
隔离级别
READ UNCOMMITTED
最低的隔离级别。事务可以从其他事务中读取未提交的更改。
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
BEGIN;
SELECT * FROM accounts WHERE id = 1; -- 可以读取未提交的数据
COMMIT;
特点:
- 允许脏读
- 允许不可重复读
- 允许幻读
- 并发性最高,一致性最低
何时使用:
- 实际中很少使用
- 仅当性能至关重要且可以接受脏读时
READ COMMITTED
PostgreSQL和SQL Server中的默认隔离级别。事务只能读取已提交的更改。
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
BEGIN;
SELECT * FROM accounts WHERE id = 1; -- 只读取已提交的数据
COMMIT;
特点:
- 防止脏读
- 允许不可重复读
- 允许幻读
- 一致性和性能之间的良好平衡
何时使用:
- 大多数应用程序
- 当你需要看到其他事务的已提交更改时
REPEATABLE READ
MySQL中的默认隔离级别。事务从事务开始时看到一个一致的快照。
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN;
SELECT * FROM accounts WHERE id = 1; -- 一致的快照
-- 即使其他事务提交更改,你也不会看到它们
COMMIT;
特点:
- 防止脏读
- 防止不可重复读
- 允许幻读(在MySQL中,防止幻读)
- 适用于报告和分析
何时使用:
- 当你需要在事务中一致地读取时
- 报告查询
- 当不可重复读会导致问题时
SERIALIZABLE
最高的隔离级别。事务完全隔离于彼此。
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN;
SELECT * FROM accounts WHERE id = 1; -- 完全隔离
COMMIT;
特点:
- 防止脏读
- 防止不可重复读
- 防止幻读
- 并发性最低,一致性最高
- 可能会导致序列化失败
何时使用:
- 重要的财务操作
- 当绝对一致性需要时
- 当幻读会导致问题时
锁定机制
悲观锁定
在访问资源之前锁定资源,假设会发生冲突。
-- SELECT FOR UPDATE - 锁定用于更新的行
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- 此行现在被锁定,其他事务必须等待
-- 进行一些处理
-- ...
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT; -- 锁定被释放
SELECT FOR SHARE (PostgreSQL) / SELECT FOR SHARE (MySQL):
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR SHARE;
-- 其他事务也可以使用FOR SHARE读取
-- 但在当前事务提交之前不能更新
COMMIT;
优点:
- 保证数据一致性
- 易于理解
- 适用于高并发场景
缺点:
- 可能导致死锁
- 降低并发性
- 可能会导致性能问题
乐观锁定
假设冲突很少,并在提交前检查冲突。
-- 添加版本列
ALTER TABLE accounts ADD COLUMN version INT DEFAULT 0;
-- 使用版本检查更新
UPDATE accounts
SET balance = balance - 100, version = version + 1
WHERE id = 1 AND version = 5;
-- 检查是否更新成功
IF ROW_COUNT() = 0 THEN
-- 发生冲突,处理它
ROLLBACK;
END IF;
使用时间戳:
-- 添加updated_at列
ALTER TABLE accounts ADD COLUMN updated_at TIMESTAMP;
-- 使用时间戳检查更新
UPDATE accounts
SET balance = balance - 100, updated_at = NOW()
WHERE id = 1 AND updated_at = '2024-01-01 10:00:00';
-- 检查是否更新成功
IF ROW_COUNT() = 0 THEN
-- 发生冲突
ROLLBACK;
END IF;
优点:
- 没有锁定,更高的并发性
- 在低并发场景下性能更好
- 不会导致死锁
缺点:
- 需要处理冲突
- 需要额外的列
- 更复杂的错误处理
行级与表级锁定
行级锁定
锁定单个行,允许同一表中其他行的并发访问。
BEGIN;
-- 锁定特定行
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- 表中的其他行仍然可以访问
SELECT * FROM accounts WHERE id = 2; -- 这个工作正常
COMMIT;
何时使用:
- 大多数应用场景
- 高并发要求
- 当冲突局限于特定行时
表级锁定
锁定整个表,防止任何并发访问。
BEGIN;
-- 锁定整个表
LOCK TABLE accounts IN EXCLUSIVE MODE;
-- 没有其他事务可以访问这个表
SELECT * FROM accounts WHERE id = 1; -- 只有这个事务
COMMIT;
锁定模式:
ACCESS SHARE: SELECTROW SHARE: SELECT FOR UPDATEROW EXCLUSIVE: INSERT, UPDATE, DELETESHARE UPDATE EXCLUSIVE: VACUUM, ANALYZESHARE: CREATE INDEX CONCURRENTLYSHARE ROW EXCLUSIVE: LOCK TABLEEXCLUSIVE: REFRESH MATERIALIZED VIEW CONCURRENTLYACCESS EXCLUSIVE: DROP TABLE, TRUNCATE, ALTER TABLE
何时使用:
- 批量操作
- 架构更改
- 当你需要对所有数据进行独占访问时
保存点
保存点允许你在事务内回滚到一个特定点,而不需要回滚整个事务。
BEGIN;
-- 第一个操作
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
SAVEPOINT sp1;
-- 第二个操作
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- 如果第二个操作失败,回滚到保存点
-- ROLLBACK TO sp1;
-- 继续其他操作
UPDATE accounts SET balance = balance + 50 WHERE id = 3;
COMMIT;
多个保存点:
BEGIN;
-- 操作1
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
SAVEPOINT sp1;
-- 操作2
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
SAVEPOINT sp2;
-- 如果操作2失败,回滚到sp1
-- ROLLBACK TO sp1;
-- 尝试替代操作2
UPDATE accounts SET balance = balance + 50 WHERE id = 2;
COMMIT;
释放保存点:
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
SAVEPOINT sp1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
RELEASE SAVEPOINT sp1; -- 保存点被释放
-- 不能再回滚到sp1了
-- ROLLBACK TO sp1; -- 错误!
COMMIT;
死锁检测和预防
什么是死锁?
当两个或多个事务等待对方的锁定时,就会发生死锁。
事务1:锁定行A,等待行B
事务2:锁定行B,等待行A
死锁示例
-- 事务1
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1; -- 锁定行1
-- 等待行2...
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- 事务2(并发)
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 2; -- 锁定行2
-- 等待行1...
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
-- 死锁!两个事务都在等待对方
死锁预防
一致的锁定顺序:
-- 总是以相同的顺序锁定行(例如,按ID)
-- 事务1
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1; -- 较低ID优先
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
-- 事务2
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1; -- 相同顺序
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
锁定超时:
-- 设置锁定超时
SET lock_timeout = '5s'; -- PostgreSQL
-- 或
SET innodb_lock_wait_timeout = 5; -- MySQL
BEGIN;
-- 如果5秒内无法获取锁定,则引发错误
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
COMMIT;
短事务:
-- 使事务尽可能短
BEGIN;
-- 仅必要的操作
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
-- 在事务之外进行处理
-- ...
BEGIN;
-- 最终更新
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
处理死锁
// 应用程序级别的重试逻辑
async function transferMoney(fromId, toId, amount) {
const maxRetries = 3;
let attempt = 0;
while (attempt < maxRetries) {
try {
await db.transaction(async (trx) => {
await trx('accounts')
.where('id', fromId)
.update('balance', db.raw('balance - ?', [amount]));
await trx('accounts')
.where('id', toId)
.update('balance', db.raw('balance + ?', [amount]));
});
return; // 成功
} catch (error) {
if (error.code === '40P01') { // 死锁错误代码
attempt++;
await sleep(100 * attempt); // 指数退避
} else {
throw error; // 其他错误
}
}
}
throw new Error('Max retries exceeded');
}
事务超时和重试
设置超时
PostgreSQL:
-- 语句超时
SET statement_timeout = '30s';
-- 锁定超时
SET lock_timeout = '5s';
-- 事务空闲超时
SET idle_in_transaction_session_timeout = '10min';
MySQL:
-- 锁定等待超时
SET innodb_lock_wait_timeout = 5;
-- 事务只读超时
SET tx_read_only = 1;
重试逻辑
class TransactionRetry {
constructor(options = {}) {
this.maxRetries = options.maxRetries || 3;
this.retryDelay = options.retryDelay || 100;
this.backoffMultiplier = options.backoffMultiplier || 2;
}
async execute(fn) {
let attempt = 0;
let lastError;
while (attempt < this.maxRetries) {
try {
return await fn();
} catch (error) {
lastError = error;
if (this.isRetryableError(error)) {
attempt++;
const delay = this.retryDelay * Math.pow(this.backoffMultiplier, attempt - 1);
await this.sleep(delay);
} else {
throw error;
}
}
}
throw lastError;
}
isRetryableError(error) {
const retryableCodes = [
'40P01', // 死锁
'40001', // 序列化失败
'08006', // 连接失败
];
return retryableCodes.includes(error.code);
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 使用
const retry = new TransactionRetry({ maxRetries: 3 });
await retry.execute(async () => {
await db.transaction(async (trx) => {
// 事务操作
});
});
ORM中的事务模式
Prisma
import { PrismaClient } from '@prisma/client';
const prisma = new PrismaClient();
// 简单事务
await prisma.$transaction(async (tx) => {
await tx.account.update({
where: { id: 1 },
data: { balance: { decrement: 100 } },
});
await tx.account.update({
where: { id: 2 },
data: { balance: { increment: 100 } },
});
});
// 交互式事务与重试
await prisma.$transaction(
async (tx) => {
const account = await tx.account.findUnique({
where: { id: 1 },
});
if (account.balance < 100) {
throw new Error('Insufficient balance');
}
await tx.account.update({
where: { id: 1 },
data: { balance: { decrement: 100 } },
});
},
{
maxWait: 5000, // 等待事务的最大时间
timeout: 10000, // 事务运行的最大时间
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
}
);
TypeORM
import { DataSource, IsolationLevel } from 'typeorm';
const dataSource = new DataSource({ /* ... */ });
// 简单事务
await dataSource.transaction(async (manager) => {
await manager.update(Account, 1, {
balance: () => `balance - 100`,
});
await manager.update(Account, 2, {
balance: () => `balance + 100`,
});
});
// 带选项的事务
await dataSource.transaction({
isolationLevel: IsolationLevel.REPEATABLE_READ,
}, async (manager) => {
// 事务操作
});
// 手动事务控制
const queryRunner = dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
await queryRunner.manager.update(Account, 1, {
balance: () => `balance - 100`,
});
await queryRunner.manager.update(Account, 2, {
balance: () => `balance + 100`,
});
await queryRunner.commitTransaction();
} catch (error) {
await queryRunner.rollbackTransaction();
throw error;
} finally {
await queryRunner.release();
}
SQLAlchemy (Python)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
engine = create_engine('postgresql://user:pass@localhost/db')
Session = sessionmaker(bind=engine)
# 简单事务
with Session() as session:
try:
session.execute(
text("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
)
session.execute(
text("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
)
session.commit()
except Exception as e:
session.rollback()
raise
# 带隔离级别的事务
with Session() as session:
session.execute(
text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
)
session.begin()
try:
# 事务操作
session.commit()
except Exception as e:
session.rollback()
raise
# 嵌套事务(保存点)
with Session() as session:
session.begin_nested()
try:
# 嵌套操作
session.commit()
except Exception as e:
session.rollback()
raise
最佳实践
-
事务设计
- 使事务尽可能短
- 只包括必要的操作
- 避免在事务中进行用户交互
- 使用适当的隔离级别
-
错误处理
- 总是处理事务错误
- 为瞬态失败实现重试逻辑
- 记录事务失败以进行调试
- 出错时回滚
-
性能
- 尽可能使用行级锁
- 避免长时间运行的事务
- 监控锁定争用
- 使用连接池
-
测试
- 测试事务回滚场景
- 测试并发访问
- 测试死锁处理
- 测试隔离级别
-
分布式事务
- 考虑为微服务使用Saga模式
- 仅在必要时使用2PC
- 了解性能影响
- 准备补偿逻辑