name: idempotency-handling 描述:使用幂等键进行幂等 API 操作,Redis 缓存,数据库约束。适用于支付系统、webhook 重试、安全重试,或遇到重复处理、竞争条件、键过期错误。
幂等性处理
确保操作无论执行多少次都产生相同的结果。
幂等键模式
const redis = require('redis');
const client = redis.createClient();
async function idempotencyMiddleware(req, res, next) {
const key = req.headers['idempotency-key'];
if (!key) return next();
const cached = await client.get(`idempotency:${key}`);
if (cached) {
const { status, body } = JSON.parse(cached);
return res.status(status).json(body);
}
// 存储原始发送
const originalSend = res.json.bind(res);
res.json = async (body) => {
await client.setEx(
`idempotency:${key}`,
86400, // 24 小时
JSON.stringify({ status: res.statusCode, body })
);
return originalSend(body);
};
next();
}
数据库支持的幂等性
CREATE TABLE idempotency_keys (
key VARCHAR(255) PRIMARY KEY,
request_hash VARCHAR(64) NOT NULL,
response JSONB,
status VARCHAR(20) DEFAULT 'processing',
created_at TIMESTAMP DEFAULT NOW(),
expires_at TIMESTAMP DEFAULT NOW() + INTERVAL '24 hours'
);
CREATE INDEX idx_idempotency_expires ON idempotency_keys(expires_at);
async function processPayment(idempotencyKey, payload) {
const requestHash = crypto.createHash('sha256')
.update(JSON.stringify(payload)).digest('hex');
// 尝试插入状态为 'processing' - 只有一个请求会成功
const insertResult = await db.query(
`INSERT INTO idempotency_keys (key, request_hash, status)
VALUES ($1, $2, 'processing')
ON CONFLICT (key) DO NOTHING
RETURNING *`,
[idempotencyKey, requestHash]
);
// 如果我们插入了行(rowCount === 1),我们负责处理
if (insertResult.rowCount === 1) {
try {
// 执行支付
const result = await executePayment(payload);
// 更新为完成状态并存储响应
await db.query(
'UPDATE idempotency_keys SET status = $1, response = $2 WHERE key = $3',
['completed', JSON.stringify(result), idempotencyKey]
);
return result;
} catch (error) {
// 错误时标记为失败
await db.query(
'UPDATE idempotency_keys SET status = $1, response = $2 WHERE key = $3',
['failed', JSON.stringify({ error: error.message }), idempotencyKey]
);
throw error;
}
}
// 另一个请求正在/曾经处理此键 - 检查状态
const existing = await db.query(
'SELECT * FROM idempotency_keys WHERE key = $1',
[idempotencyKey]
);
const row = existing.rows[0];
if (!row) {
throw new Error('意外:幂等键已消失');
}
// 验证请求未更改
if (row.request_hash !== requestHash) {
throw new Error('幂等键被用于不同请求');
}
// 检查状态
if (row.status === 'completed') {
return JSON.parse(row.response);
} else if (row.status === 'processing') {
throw new Error('请求已在处理中 - 稍后重试');
} else if (row.status === 'failed') {
const failedResponse = JSON.parse(row.response);
throw new Error(`先前尝试失败:${failedResponse.error}`);
}
throw new Error(`未知状态:${row.status}`);
}
何时应用
- 支付处理
- 订单创建
- Webhook 处理
- 邮件发送
- 任何重复会导致问题的操作
最佳实践
- 对变更操作要求使用幂等键
- 验证请求体与存储的请求匹配
- 设置合适的 TTL(通常 24 小时)
- 使用原子数据库操作
- 实施清理作业以防止表膨胀
TTL 清理策略
为防止表无限增长,实施定期清理过期键:
选项 1:计划数据库作业(PostgreSQL)
-- 通过 pg_cron 或外部调度器每小时运行
DELETE FROM idempotency_keys
WHERE expires_at < NOW()
LIMIT 1000; -- 批量删除以避免长时间锁
选项 2:应用清理作业(Node.js)
// 通过 cron 或作业调度器运行(例如,node-cron, Bull)
async function cleanupExpiredKeys() {
try {
const result = await db.query(
'DELETE FROM idempotency_keys WHERE expires_at < NOW()'
);
console.log(`清理了 ${result.rowCount} 个过期幂等键`);
} catch (error) {
console.error('清理作业失败:', error);
}
}
// 计划每小时运行一次
cron.schedule('0 * * * *', cleanupExpiredKeys);
选项 3:应用清理作业(Python)
import asyncio
from datetime import datetime
async def cleanup_expired_keys():
"""移除过期幂等键以防止表膨胀。"""
try:
result = await db.execute(
"DELETE FROM idempotency_keys WHERE expires_at < $1",
datetime.now()
)
print(f"清理了 {result} 个过期幂等键")
except Exception as e:
print(f"清理作业失败:{e}")
# 通过 APScheduler, Celery 或类似工具运行
# scheduler.add_job(cleanup_expired_keys, 'interval', hours=1)
清理最佳实践:
- 在低流量期间运行清理以最小化锁竞争
- 对于大表使用批量删除(
LIMIT 1000) - 监控清理作业执行和失败
- 考虑按 created_at 分区表以方便清理
- 设置警报以防表大小意外增长