事务管理
概览
实现具有ACID合规性、并发控制和错误处理的健壮事务管理。涵盖隔离级别、锁定策略和死锁解决。
使用场景
- ACID事务实现
- 并发数据修改处理
- 隔离级别选择
- 死锁预防和解决
- 事务超时配置
- 分布式事务协调
- 金融交易安全
事务基础
PostgreSQL事务
简单事务:
-- 开始事务
BEGIN;
-- 多个语句
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- 提交更改
COMMIT;
-- 或回滚
ROLLBACK;
带错误处理的事务:
BEGIN;
-- 部分回滚的保存点
SAVEPOINT sp1;
UPDATE accounts SET balance = balance - 50 WHERE id = 1;
-- 如果检测到错误
IF (SELECT balance FROM accounts WHERE id = 1) < 0 THEN
ROLLBACK TO sp1;
-- 处理负余额
END IF;
COMMIT;
MySQL事务
MySQL事务:
-- 开始事务
START TRANSACTION;
-- 或
BEGIN;
-- 语句
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- 提交
COMMIT;
-- 或回滚
ROLLBACK;
MySQL保存点:
START TRANSACTION;
INSERT INTO orders (user_id, total) VALUES (123, 99.99);
SAVEPOINT after_insert;
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 456;
-- 如果库存检查失败
IF (SELECT quantity FROM inventory WHERE product_id = 456) < 0 THEN
ROLLBACK TO after_insert;
END IF;
COMMIT;
隔离级别
PostgreSQL隔离级别
读未提交(未完全实现):
-- PostgreSQL视为READ COMMITTED
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
BEGIN;
-- 可以从其他事务读取未提交的更改
SELECT COUNT(*) FROM orders WHERE user_id = 123;
COMMIT;
读已提交(默认):
-- 默认PostgreSQL隔离级别
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
BEGIN;
-- 只读取已提交的数据
-- 允许幻读和不可重复读
SELECT * FROM accounts WHERE id = 1;
-- 如果其他事务修改行,可能会看到不同的数据
SELECT * FROM accounts WHERE id = 1;
COMMIT;
可重复读:
-- 更高的隔离级别
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN;
-- 事务开始时的数据快照
SELECT COUNT(*) as count_1 FROM orders;
-- 其他事务插入订单
-- 仍然看到相同的计数
SELECT COUNT(*) as count_2 FROM orders;
COMMIT;
可串行化:
-- 最高的隔离级别
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN;
-- 事务像串行执行
-- 防止所有异常(可能会发生序列化失败)
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT; -- 可能会因序列化失败错误而失败
MySQL隔离级别
MySQL隔离级别配置:
-- 查看当前隔离级别
SHOW VARIABLES LIKE 'transaction_isolation';
-- 设置当前会话
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 设置所有新连接
SET GLOBAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 设置特定事务
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
START TRANSACTION;
-- 语句
COMMIT;
隔离级别比较:
-- READ UNCOMMITTED(可能发生脏读)
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
-- READ COMMITTED(重复读、幻读可能)
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- REPEATABLE READ(幻读可能,MySQL默认)
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- SERIALIZABLE(无异常)
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
锁定策略
PostgreSQL显式锁定
行级锁定:
-- FOR UPDATE:排他锁用于更新
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- 其他事务不能UPDATE/DELETE/SELECT FOR UPDATE此行
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
-- FOR SHARE:共享锁
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR SHARE;
-- 其他事务可以SELECT FOR SHARE但不能FOR UPDATE
COMMIT;
-- FOR UPDATE NOWAIT:如果锁定则出错而不是等待
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE NOWAIT;
EXCEPTION WHEN OTHERS THEN
-- 行被锁定
END;
COMMIT;
表级锁定:
-- 独占表锁
LOCK TABLE accounts IN EXCLUSIVE MODE;
-- 没有其他事务可以访问表
-- 共享锁
LOCK TABLE accounts IN SHARE MODE;
-- 其他事务可以读取但不能写入
-- 独占用于用户访问
LOCK TABLE accounts IN ACCESS EXCLUSIVE MODE;
MySQL锁定
行级锁定:
-- UPDATE/DELETE上的隐式锁定
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 行锁定直到事务结束
COMMIT;
-- SELECT FOR UPDATE:显式锁定
START TRANSACTION;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- 获得排他锁
UPDATE accounts SET balance = 100 WHERE id = 1;
COMMIT;
-- SELECT FOR SHARE:读锁
START TRANSACTION;
SELECT * FROM accounts WHERE id = 1 FOR SHARE;
-- 共享锁(阻止FOR UPDATE)
COMMIT;
间隙锁定(InnoDB):
-- InnoDB锁定行之间的间隙
START TRANSACTION;
-- 锁定id在1和100之间的行和间隙
SELECT * FROM products WHERE id BETWEEN 1 AND 100 FOR UPDATE;
-- 防止范围内的幻行
COMMIT;
并发控制
乐观并发
PostgreSQL与版本号:
-- 添加版本列
ALTER TABLE accounts ADD COLUMN version INT DEFAULT 1;
-- 带有版本检查的更新
UPDATE accounts
SET balance = balance - 100, version = version + 1
WHERE id = 1 AND version = 5;
-- 应用程序检查受影响的行
-- 如果0行更新,则版本不匹配(重试)
PostgreSQL与时间戳:
-- 添加最后修改时间戳
ALTER TABLE accounts ADD COLUMN updated_at TIMESTAMP DEFAULT NOW();
-- 带有时间戳验证的更新
UPDATE accounts
SET balance = balance - 100, updated_at = NOW()
WHERE id = 1 AND updated_at = '2024-01-15 10:00:00';
-- 如果没有行更新,则数据已被另一事务修改
悲观并发
PostgreSQL - 锁定并修改:
BEGIN;
-- 修改前的锁定行
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- 安全修改(其他事务不能更新)
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
死锁预防
PostgreSQL - 死锁检测:
-- PostgreSQL自动检测死锁
-- 杀死一个事务并引发错误
-- 示例死锁场景
-- 事务1:锁定A,然后尝试锁定B
-- 事务2:锁定B,然后尝试锁定A
-- 结果:一个事务回滚,出现死锁错误
-- 重试逻辑
DO $$
DECLARE
retry_count INT := 0;
BEGIN
LOOP
BEGIN
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
EXIT;
EXCEPTION WHEN deadlocked_table THEN
ROLLBACK;
retry_count := retry_count + 1;
IF retry_count > 3 THEN
RAISE;
END IF;
-- 重试前等待
PERFORM pg_sleep(0.1);
END;
END LOOP;
END $$;
MySQL - 死锁预防:
-- 通过一致的锁定顺序预防死锁
-- 总是先锁定表1 id=1,然后锁定表2 id=2
START TRANSACTION;
-- 总是先锁定账户1,然后锁定账户2
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
SELECT * FROM accounts WHERE id = 2 FOR UPDATE;
-- 安全顺序预防死锁
COMMIT;
死锁恢复处理:
// 应用程序级死锁重试(Node.js)
async function transferMoney(fromId, toId, amount, retries = 3) {
for (let i = 0; i < retries; i++) {
try {
await db.query('BEGIN');
await db.query(
'UPDATE accounts SET balance = balance - $1 WHERE id = $2 FOR UPDATE',
[amount, fromId]
);
await db.query(
'UPDATE accounts SET balance = balance + $1 WHERE id = $2 FOR UPDATE',
[amount, toId]
);
await db.query('COMMIT');
return { success: true };
} catch (error) {
if (error.code === '40P01') { // 检测到死锁
await db.query('ROLLBACK');
if (i === retries - 1) throw error;
// 指数退避
await new Promise(r => setTimeout(r, 100 * Math.pow(2, i)));
} else {
throw error;
}
}
}
}
分布式事务
两阶段提交模式:
-- 准备阶段:获取锁,验证
BEGIN;
SAVEPOINT prepare_phase;
-- 在两个数据库上准备写入
INSERT INTO account_shadow SELECT * FROM accounts WHERE id = 1;
-- 检查两个数据库是否准备就绪
-- 如果任何一个失败,ROLLBACK TO prepare_phase
-- 提交阶段:最终确定
RELEASE SAVEPOINT prepare_phase;
COMMIT;
最终一致性模式:
// 跨服务的异步事务
async function transferAcrossServices(fromId, toId, amount) {
// 1. 从第一服务借记(事务性)
await service1.debit(fromId, amount);
// 2. 队列第二服务的贷记(可靠的队列)
await queue.publish({
type: 'credit',
toId,
amount,
requestId: uuid()
});
// 3. 服务2异步处理
queue.subscribe('credit', async (msg) => {
try {
await service2.credit(msg.toId, msg.amount);
await queue.ack(msg.requestId);
} catch (error) {
// 重试机制
await queue.retry(msg.requestId);
}
});
}
事务监控
PostgreSQL - 活跃事务:
-- 查看活跃事务
SELECT
pid,
usename,
application_name,
state,
query,
query_start,
xact_start
FROM pg_stat_activity
WHERE state != 'idle'
ORDER BY query_start DESC;
-- 查看事务锁
SELECT
l.locktype,
l.relation::regclass,
l.mode,
l.granted,
a.usename,
a.query
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE NOT l.granted;
MySQL - 活跃事务:
-- 查看活跃事务
SELECT *
FROM INFORMATION_SCHEMA.INNODB_TRX
ORDER BY trx_started DESC;
-- 查看锁
SELECT *
FROM INFORMATION_SCHEMA.INNODB_LOCKS;
-- 终止长时间运行的事务
KILL QUERY process_id;
KILL CONNECTION process_id;
最佳实践
✅ 根据用例使用适当的隔离级别 ✅ 保持事务简短 ✅ 频繁提交 ✅ 处理事务错误 ✅ 使用一致的锁定顺序 ✅ 监控事务性能 ✅ 文档化事务需求
❌ 不要在用户输入期间持有事务 ❌ 不要在高并发系统中使用SERIALIZABLE ❌ 不要忽略死锁错误 ❌ 不要锁定太多行 ❌ 不要对关键数据使用READ UNCOMMITTED