数据库锁定策略
概述
数据库锁定是用于管理共享数据并发访问的机制。它通过阻止多个事务同时以可能造成不一致的方式修改相同数据来确保数据的完整性。
前置条件
- 理解数据库事务和ACID属性
- 了解SQL和数据库操作
- 熟悉并发编程概念
- 基本理解隔离级别
关键概念
为什么需要锁定(并发控制)
问题
没有锁定,事务并发可能导致:
- 丢失更新:两个事务读取并更新同一个值,一个更新丢失
- 脏读:读取另一个事务中未提交的更改
- 不可重复读:同一事务内相同查询返回不同结果
- 幻读:后续查询中出现新行
示例:丢失更新
-- 事务1
BEGIN;
SELECT balance FROM accounts WHERE id = 1; -- 读取1000
-- ...处理...
UPDATE accounts SET balance = 900 WHERE id = 1; -- 1000 - 100
-- 事务2(并发)
BEGIN;
SELECT balance FROM accounts WHERE id = 1; -- 读取1000
UPDATE accounts SET balance = 1100 WHERE id = 1; -- 1000 + 100
COMMIT;
-- 事务1继续
COMMIT; -- 最终余额是1100,丢失了-100的更新!
解决方案:锁定
-- 事务1
BEGIN;
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE; -- 锁定行
-- ...处理...
UPDATE accounts SET balance = 900 WHERE id = 1;
COMMIT; -- 释放锁
-- 事务2等待锁定
BEGIN;
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE; -- 等待...
-- 事务1提交,现在可以继续
SELECT balance FROM accounts WHERE id = 1; -- 读取900
UPDATE accounts SET balance = 1000 WHERE id = 1;
COMMIT; -- 最终余额是1000,正确!
锁定类型
共享锁定(读锁定)
允许多个事务读取数据但阻止写入。
-- 事务1
BEGIN;
SELECT * FROM products WHERE id = 1 FOR SHARE; -- 共享锁定
-- 其他事务也可以使用FOR SHARE读取
-- 但在此次事务提交之前不能更新
COMMIT;
特性:
- 多个读取者可以持有共享锁定
- 写入者被阻塞
- 用于稍后可能更新的读取数据
独占锁定(写锁定)
阻止任何其他事务读取或写入。
-- 事务1
BEGIN;
SELECT * FROM products WHERE id = 1 FOR UPDATE; -- 独占锁定
-- 其他事务不能读取或更新此行
COMMIT;
特性:
- 只有一个事务可以持有独占锁定
- 阻塞所有其他事务
- 用于更新数据时
意向锁定
表明更细粒度上获取锁定的意图。
-- 事务1
BEGIN;
-- 意向锁定表
LOCK TABLE products IN SHARE MODE;
-- 然后锁定特定行
SELECT * FROM products WHERE id = 1 FOR UPDATE;
COMMIT;
类型:
- 意向共享(IS):意图在行上获取共享锁定
- 意向独占(IX):意图在行上获取独占锁定
- 共享(S):整个表的共享锁定
- 独占(X):整个表的独占锁定
锁定兼容性矩阵:
| 锁定类型 | IS | IX | S | X |
|---|---|---|---|---|
| IS | ✓ | ✓ | ✓ | ✗ |
| IX | ✓ | ✓ | ✗ | ✗ |
| S | ✓ | ✗ | ✓ | ✗ |
| X | ✗ | ✗ | ✗ | ✗ |
更新锁定
一种特殊锁定,允许其他共享锁定但在更新时转换为独占。
-- PostgreSQL自动使用更新锁定
BEGIN;
SELECT * FROM products WHERE id = 1 FOR UPDATE;
-- 在读取时,允许其他FOR SHARE
-- 当更新时,转换为独占
UPDATE products SET stock = stock - 1 WHERE id = 1;
COMMIT;
锁定粒度
行级锁定
锁定单个行,允许其他行并发访问。
-- 事务1
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE; -- 仅锁定行1
-- 其他事务可以访问其他行
SELECT * FROM accounts WHERE id = 2 FOR UPDATE; -- 这个有效
COMMIT;
优点:
- 高并发
- 细粒度控制
- 适用于OLTP工作负载
缺点:
- 更多锁定管理开销
- 可能导致死锁
- 对批量操作可能不高效
页面级锁定
锁定数据库页面(通常是8KB-16KB)。
-- 一些数据库自动使用页面级锁定
-- 当你锁定一行时,包含它的整个页面被锁定
-- 这会影响同一页面上的多行
优点:
- 比行级锁定开销小
- 适用于顺序访问模式
缺点:
- 并发性低于行级
- 可能会锁定不需要的行
表级锁定
锁定整个表。
-- 事务1
BEGIN;
LOCK TABLE accounts IN EXCLUSIVE MODE; -- 锁定整个表
-- 没有其他事务可以访问此表
COMMIT;
优点:
- 易于理解
- 适用于批量操作
- 最小锁定管理开销
缺点:
- 并发性非常低
- 阻止对表的所有访问
锁定模式:
ACCESS SHARE: SELECTROW SHARE: SELECT FOR UPDATEROW EXCLUSIVE: INSERT, UPDATE, DELETESHARE UPDATE EXCLUSIVE: VACUUM, ANALYZESHARE: CREATE INDEX CONCURRENTLYSHARE ROW EXCLUSIVE: LOCK TABLEEXCLUSIVE: REFRESH MATERIALIZED VIEWACCESS EXCLUSIVE: DROP TABLE, TRUNCATE
数据库级锁定
锁定整个数据库。
-- PostgreSQL
BEGIN;
LOCK DATABASE mydb IN ACCESS EXCLUSIVE MODE;
-- 没有其他会话可以连接到这个数据库
COMMIT;
用例:
- 数据库维护
- 模式更改
- 备份操作
实施指南
悲观锁定
SELECT FOR UPDATE
锁定行以更新。
// Node.js与PostgreSQL
async function transferMoney(fromId, toId, amount) {
const client = await pool.connect();
try {
await client.query('BEGIN');
// 锁定两个账户
const [fromAccount] = await client.query(
'SELECT * FROM accounts WHERE id = $1 FOR UPDATE',
[fromId]
);
const [toAccount] = await client.query(
'SELECT * FROM accounts WHERE id = $1 FOR UPDATE',
[toId]
);
// 检查余额
if (fromAccount.rows[0].balance < amount) {
throw new Error('余额不足');
}
// 执行转账
await client.query(
'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
[amount, fromId]
);
await client.query(
'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
[amount, toId]
);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
SELECT FOR SHARE
允许多个读取者但阻止写入者。
async function getProductStock(productId) {
const client = await pool.connect();
try {
await client.query('BEGIN');
// 锁定共享 - 多个读取者可以
const result = await client.query(
'SELECT * FROM products WHERE id = $1 FOR SHARE',
[productId]
);
const product = result.rows[0];
// 检查库存
if (product.stock < 1) {
await client.query('ROLLBACK');
throw new Error('库存不足');
}
// 更新库存
await client.query(
'UPDATE products SET stock = stock - 1 WHERE id = $1',
[productId]
);
await client.query('COMMIT');
return product;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
锁定超时
async function transferWithTimeout(fromId, toId, amount) {
const client = await pool.connect();
try {
// 设置锁定超时为5秒
await client.query('SET lock_timeout = 5000');
await client.query('BEGIN');
try {
const [fromAccount] = await client.query(
'SELECT * FROM accounts WHERE id = $1 FOR UPDATE',
[fromId]
);
const [toAccount] = await client.query(
'SELECT * FROM accounts WHERE id = $1 FOR UPDATE',
[toId]
);
// ...转账逻辑...
await client.query('COMMIT');
} catch (error) {
if (error.code === '55P03') { // 锁定不可用
throw new Error('无法获取锁定,请重试');
}
throw error;
}
} finally {
client.release();
}
}
乐观锁定
版本列
添加版本列以跟踪更改。
CREATE TABLE accounts (
id SERIAL PRIMARY KEY,
balance DECIMAL(10,2),
version INT DEFAULT 0
);
async function updateAccount(accountId, newBalance) {
let retries = 0;
const maxRetries = 3;
while (retries < maxRetries) {
// 读取当前版本
const [result] = await pool.query(
'SELECT * FROM accounts WHERE id = $1',
[accountId]
);
const account = result.rows[0];
// 尝试使用版本检查更新
const updateResult = await pool.query(
'UPDATE accounts SET balance = $1, version = version + 1 WHERE id = $2 AND version = $3',
[newBalance, accountId, account.version]
);
if (updateResult.rowCount > 0) {
// 成功
return updateResult.rows[0];
}
// 版本不匹配 - 重试
retries++;
await sleep(100 * retries); // 指数退避
}
throw new Error('超出最大重试次数');
}
时间戳列
使用时间戳进行乐观锁定。
CREATE TABLE accounts (
id SERIAL PRIMARY KEY,
balance DECIMAL(10,2),
updated_at TIMESTAMP DEFAULT NOW()
);
async function updateAccount(accountId, newBalance) {
// 读取当前时间戳
const [result] = await pool.query(
'SELECT * FROM accounts WHERE id = $1',
[accountId]
);
const account = result.rows[0];
// 尝试使用时间戳检查更新
const updateResult = await pool.query(
'UPDATE accounts SET balance = $1, updated_at = NOW() WHERE id = $2 AND updated_at = $3',
[newBalance, accountId, account.updated_at]
);
if (updateResult.rowCount === 0) {
throw new Error('账户被另一个事务修改');
}
return updateResult.rows[0];
}
条件更新
使用WHERE子句检查当前状态。
async function decrementStock(productId, quantity) {
const result = await pool.query(
'UPDATE products SET stock = stock - $1 WHERE id = $2 AND stock >= $1',
[quantity, productId]
);
if (result.rowCount === 0) {
throw new Error('库存不足');
}
return result.rows[0];
}
死锁
检测
数据库自动检测死锁并选择一个受害者进行回滚。
-- 事务1
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1; -- 锁定行1
-- 等待行2...
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- 事务2(并发)
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 2; -- 锁定行2
-- 等待行1...
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
-- 死锁!数据库检测并回滚一个事务
预防
一致锁定顺序:
// 总是以相同的顺序锁定行(例如,按ID)
async function transferMoney(fromId, toId, amount) {
const client = await pool.connect();
try {
await client.query('BEGIN');
// 总是先锁定较低的ID
const [firstId, secondId] = fromId < toId
? [fromId, toId]
: [toId, fromId];
await client.query(
'SELECT * FROM accounts WHERE id = $1 FOR UPDATE',
[firstId]
);
await client.query(
'SELECT * FROM accounts WHERE id = $1 FOR UPDATE',
[secondId]
);
// ...转账逻辑...
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
短事务:
// 保持事务短以减少死锁窗口
async function transferMoney(fromId, toId, amount) {
// 验证前事务
const fromAccount = await pool.query(
'SELECT * FROM accounts WHERE id = $1',
[fromId]
);
if (fromAccount.rows[0].balance < amount) {
throw new Error('余额不足');
}
// 短事务用于实际转账
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query(
'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
[amount, fromId]
);
await client.query(
'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
[amount, toId]
);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
解决方案
async function executeWithRetry(fn, maxRetries = 3) {
let attempt = 0;
while (attempt < maxRetries) {
try {
return await fn();
} catch (error) {
if (error.code === '40P01') { // 检测到死锁
attempt++;
const delay = 100 * Math.pow(2, attempt); // 指数退避
await sleep(delay);
} else {
throw error;
}
}
}
throw new Error('由于死锁超出最大重试次数');
}
// 使用
await executeWithRetry(async () => {
await transferMoney(1, 2, 100);
});
建议锁定
PostgreSQL建议锁定
应用程序级锁定,不与特定行绑定。
-- 获取建议锁定
SELECT pg_advisory_lock(12345); -- 如果获取成功返回真
-- 检查锁定是否被持有
SELECT pg_advisory_lock_shared(12345);
-- 释放锁定
SELECT pg_advisory_unlock(12345);
// 使用建议锁定进行分布式任务
async function processTask(taskId) {
const lockId = hashTaskId(taskId);
// 尝试获取锁定
const [result] = await pool.query(
'SELECT pg_try_advisory_lock($1) AS acquired',
[lockId]
);
if (!result.rows[0].acquired) {
throw new Error('任务正在被处理');
}
try {
// 处理任务
await processTaskLogic(taskId);
} finally {
// 释放锁定
await pool.query('SELECT pg_advisory_unlock($1)', [lockId]);
}
}
MySQL GET_LOCK
-- 获取命名锁定
SELECT GET_LOCK('my_lock', 10); -- 10秒超时
-- 检查锁定是否被持有
SELECT IS_FREE_LOCK('my_lock');
-- 释放锁定
SELECT RELEASE_LOCK('my_lock');
// 使用命名锁定进行分布式协调
async function processJob(jobId) {
const lockName = `job_${jobId}`;
// 尝试获取10秒超时的锁定
const [result] = await pool.query(
'SELECT GET_LOCK(?, 10) AS acquired',
[lockName]
);
if (result[0].acquired === 0) {
throw new Error('工作正在被处理');
}
try {
// 处理工作
await processJobLogic(jobId);
} finally {
// 释放锁定
await pool.query('SELECT RELEASE_LOCK(?)', [lockName]);
}
}
分布式锁定
Redis分布式锁定
const Redis = require('ioredis');
const crypto = require('crypto');
class DistributedLock {
constructor(redis, key, options = {}) {
this.redis = redis;
this.key = `lock:${key}`;
this.ttl = options.ttl || 30000; // 30秒
this.value = crypto.randomBytes(16).toString('hex');
}
async acquire() {
// 尝试使用SET NX(仅如果不存在)获取锁定
const acquired = await this.redis.set(
this.key,
this.value,
'PX', this.ttl, // TTL后过期
'NX' // 仅如果不存在则设置
);
return acquired === 'OK';
}
async release() {
// 仅如果我们拥有锁定
const script = `
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
`;
await this.redis.eval(script, 1, this.key, this.value);
}
async extend(newTtl) {
// 延长锁定TTL
const script = `
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('PEXPIRE', KEYS[1], ARGV[2])
else
return 0
end
`;
const result = await this.redis.eval(
script,
1,
this.key,
this.value,
newTtl
);
return result === 1;
}
}
// 使用
const redis = new Redis();
const lock = new DistributedLock(redis, 'resource_123', { ttl: 30000 });
if (await lock.acquire()) {
try {
// 临界区
await processResource(123);
} finally {
await lock.release();
}
} else {
throw new Error('无法获取锁定');
}
ZooKeeper分布式锁定
const ZooKeeper = require('zookeeper');
class ZKLock {
constructor(zk, path, options = {}) {
this.zk = zk;
this.path = `/locks/${path}`;
this.timeout = options.timeout || 30000;
}
async acquire() {
return new Promise((resolve, reject) => {
// 创建临时节点
this.zk.create(
this.path,
Buffer.from(''),
ZooKeeper.CreateMode.EPHEMERAL,
(error, path) => {
if (error) {
reject(error);
} else {
this.lockPath = path;
resolve(true);
}
}
);
// 超时
setTimeout(() => {
reject(new Error('锁定超时'));
}, this.timeout);
});
}
async release() {
return new Promise((resolve, reject) => {
this.zk.delete(
this.lockPath,
(error) => {
if (error) {
reject(error);
} else {
resolve();
}
}
);
});
}
}
MVCC(多版本并发控制)
MVCC工作原理
MVCC允许读者不阻塞写入者,反之亦然,通过维护每一行的多个版本。
行1(id=1, name="John", version=1)
↓ 事务A更新
行1(id=1, name="Jane", version=2)
↓ 事务B更新
行1(id=1, name="Jane", version=3)
事务A看到version=1
事务B看到version=2
新事务看到version=3
PostgreSQL MVCC
-- 事务1
BEGIN;
SELECT * FROM users WHERE id = 1; -- 看到事务开始时的版本
-- 即使其他事务提交更改,这个也看到旧版本
-- 事务2(并发)
BEGIN;
UPDATE users SET name = 'Jane' WHERE id = 1;
COMMIT; -- 创建新版本
-- 事务1继续
SELECT * FROM users WHERE id = 1; -- 仍然看到旧版本
COMMIT;
MySQL MVCC(InnoDB)
-- 事务1
START TRANSACTION;
SELECT * FROM users WHERE id = 1 FOR UPDATE; -- 锁定行
-- 如果行被另一个事务锁定,则等待
-- 事务2(并发)
START TRANSACTION;
UPDATE users SET name = 'Jane' WHERE id = 1; -- 等待锁定
-- 事务1提交后,这个继续
COMMIT;
PostgreSQL与MySQL锁定差异
锁定模式比较
| 特性 | PostgreSQL | MySQL (InnoDB) |
|---|---|---|
| 行锁定 | FOR UPDATE, FOR SHARE | FOR UPDATE |
| 锁定超时 | lock_timeout参数 | innodb_lock_wait_timeout |
| 建议锁定 | pg_advisory_lock | GET_LOCK |
| 锁定升级 | 否 | 是(自动) |
| 死锁检测 | 自动 | 自动 |
| MVCC | 是(快照隔离) | 是(undo log) |
PostgreSQL特定
-- SELECT FOR UPDATE SKIP LOCKED - 跳过锁定行
SELECT * FROM jobs
WHERE status = 'pending'
FOR UPDATE SKIP LOCKED
LIMIT 10;
-- SELECT FOR UPDATE NOWAIT - 如果锁定则立即失败
SELECT * FROM accounts WHERE id = 1 FOR UPDATE NOWAIT;
MySQL特定
-- SELECT ... LOCK IN SHARE MODE - 共享锁定
SELECT * FROM products WHERE id = 1 LOCK IN SHARE MODE;
-- SELECT ... FOR UPDATE - 独占锁定
SELECT * FROM products WHERE id = 1 FOR UPDATE;
-- 低优先级更新
UPDATE LOW_PRIORITY accounts SET balance = balance - 100 WHERE id = 1;
锁定监控
PostgreSQL
-- 查看当前锁定
SELECT
pid,
relation::regclass AS table,
mode,
granted,
query
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.granted = false; -- 等待锁定
-- 查看锁定统计
SELECT * FROM pg_stat_user_tables;
MySQL
-- 查看当前锁定
SELECT * FROM information_schema.INNODB_LOCKS;
-- 查看锁定等待
SELECT * FROM information_schema.INNODB_LOCK_WAITS;
-- 查看锁定等待图
SELECT
r.trx_id AS waiting_trx,
r.trx_mysql_thread_id AS waiting_thread,
b.trx_id AS blocking_trx,
b.trx_mysql_thread_id AS blocking_thread
FROM information_schema.INNODB_LOCK_WAITS w
JOIN information_schema.INNODB_TRX r ON w.requesting_trx_id = r.trx_id
JOIN information_schema.INNODB_TRX b ON w.blocking_trx_id = b.trx_id;
锁定争用问题解决
识别争用
-- PostgreSQL:查找锁定最多的表
SELECT
relation::regclass AS table,
COUNT(*) AS lock_count
FROM pg_locks
WHERE relation IS NOT NULL
GROUP BY relation
ORDER BY lock_count DESC
LIMIT 10;
-- MySQL:查找锁定最多的表
SELECT
table_name,
COUNT(*) AS lock_count
FROM information_schema.INNODB_LOCKS
GROUP BY table_name
ORDER BY lock_count DESC
LIMIT 10;
解决方案
-
减少锁定持续时间
- 保持事务简短
- 避免长时间运行的查询
- 批处理
-
减少锁定范围
- 使用行级锁定而不是表级
- 仅锁定必要行
- 使用适当的隔离级别
-
使用乐观锁定
- 适用于低争用场景
- 实施重试逻辑
- 使用版本/时间戳列
-
改进索引
- 适当的索引减少锁定范围
- 避免全表扫描
- 使用覆盖索引
最佳实践
-
选择正确的锁定策略
- 高争用使用悲观锁定
- 低争用使用乐观锁定
- 尽可能使用行级锁定
- 考虑应用程序协调的建议锁定
-
处理死锁
- 实施一致的锁定顺序
- 保持事务简短
- 添加重试逻辑
- 记录死锁以供分析
-
设置适当的超时
- 配置锁定超时
- 优雅地处理超时错误
- 向用户提供反馈
-
监控锁定
- 跟踪锁定等待时间
- 监控死锁频率
- 识别争用热点
- 设置警报
-
彻底测试
- 测试并发访问
- 模拟死锁场景
- 验证隔离级别
- 在负载下测试