idempotency-handling 实现幂等性键和处理,以确保操作可以安全地重试而不会产生重复效果。当构建支付系统、具有重试的API或分布式事务时使用。
幂等性处理
概述
实现幂等性以确保操作无论执行多少次都产生相同的结果。
何时使用
- 支付处理
- 具有重试的API端点
- Webhooks和回调
- 消息队列消费者
- 分布式事务
- 银行转账
- 订单创建
- 发送电子邮件
- 资源创建
实现示例
1. Express 幂等性中间件
import express from 'express';
import Redis from 'ioredis';
import crypto from 'crypto';
/**
 * Shape of the record kept for one idempotency key, describing the state of
 * the request associated with that key.
 */
interface IdempotentRequest {
  /** The idempotency key this record belongs to. */
  key: string;
  /** Lifecycle state of the request. */
  status: 'processing' | 'completed' | 'failed';
  /** Response body captured for the request; optional. */
  response?: any;
  /** Error message describing a failure; optional. */
  error?: string;
  /** Creation time as a numeric timestamp. */
  createdAt: number;
  /** Completion time as a numeric timestamp; optional. */
  completedAt?: number;
}
/**
 * Redis-backed store for idempotency records.
 *
 * Every record is serialized as JSON under the Redis key
 * `idempotency:<key>` and written with an expiry of `ttl` seconds.
 */
class IdempotencyService {
  private redis: Redis;
  private ttl = 86400; // 24 hours, in seconds

  /**
   * @param redisUrl Connection URL handed to the Redis client.
   */
  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
  }

  /**
   * Reads the record stored for `key`.
   *
   * @returns The parsed record, or `null` when nothing is stored.
   */
  async getRequest(key: string): Promise<IdempotentRequest | null> {
    const data = await this.redis.get('idempotency:' + key);
    return data ? JSON.parse(data) : null;
  }

  /**
   * Unconditionally writes `request` for `key`, (re)starting the TTL.
   */
  async setRequest(
    key: string,
    request: IdempotentRequest
  ): Promise<void> {
    await this.redis.setex(
      'idempotency:' + key,
      this.ttl,
      JSON.stringify(request)
    );
  }

  /**
   * Atomically claims `key` by writing a `processing` record.
   *
   * @returns `true` when this caller created the record, `false` when a
   *   record for the key already existed.
   */
  async startProcessing(key: string): Promise<boolean> {
    const request: IdempotentRequest = {
      key,
      status: 'processing',
      createdAt: Date.now()
    };
    // SET ... NX only succeeds when the key is absent, so exactly one
    // concurrent caller gets 'OK'.
    const result = await this.redis.set(
      'idempotency:' + key,
      JSON.stringify(request),
      'EX',
      this.ttl,
      'NX'
    );
    return result === 'OK';
  }

  /**
   * Replaces the record for `key` with a `completed` record holding
   * `response`. The original `createdAt` is preserved when a record exists.
   */
  async completeRequest(
    key: string,
    response: any
  ): Promise<void> {
    const completedAt = Date.now();
    const request: IdempotentRequest = {
      key,
      status: 'completed',
      response,
      createdAt: await this.resolveCreatedAt(key, completedAt),
      completedAt
    };
    await this.setRequest(key, request);
  }

  /**
   * Replaces the record for `key` with a `failed` record holding `error`.
   * The original `createdAt` is preserved when a record exists.
   */
  async failRequest(
    key: string,
    error: string
  ): Promise<void> {
    const completedAt = Date.now();
    const request: IdempotentRequest = {
      key,
      status: 'failed',
      error,
      createdAt: await this.resolveCreatedAt(key, completedAt),
      completedAt
    };
    await this.setRequest(key, request);
  }

  /**
   * Returns the `createdAt` of the record currently stored for `key`, or
   * `fallback` when there is no usable record.
   */
  private async resolveCreatedAt(key: string, fallback: number): Promise<number> {
    try {
      const existing = await this.getRequest(key);
      if (existing && typeof existing.createdAt === 'number') {
        return existing.createdAt;
      }
    } catch {
      // Best effort only: a failed or unparsable lookup must not prevent the
      // final state from being written, so fall through to the fallback.
    }
    return fallback;
  }
}
/**
 * Builds Express middleware that enforces idempotency for mutating requests.
 *
 * Behaviour per request:
 * - Methods other than POST/PUT/PATCH/DELETE pass straight through.
 * - A missing `Idempotency-Key` header is answered with 400.
 * - A key that is already known is answered from its stored record
 *   (409 while processing, 200 with the stored body when completed,
 *   500 when the earlier attempt failed).
 * - Otherwise the key is claimed, the JSON body sent through `res.json` is
 *   stored for later replays, and an error response that finishes without a
 *   captured JSON body marks the key as failed.
 *
 * @param idempotency Store used to look up, claim and finalize keys.
 * @returns An async Express request handler.
 */
function idempotencyMiddleware(idempotency: IdempotencyService) {
  return async (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ) => {
    // Only applies to POST, PUT, PATCH, DELETE.
    if (!['POST', 'PUT', 'PATCH', 'DELETE'].includes(req.method)) {
      return next();
    }

    // Header values are typed string | string[] | undefined; accept only a
    // non-empty string instead of asserting the type.
    const rawKey = req.headers['idempotency-key'];
    const idempotencyKey = typeof rawKey === 'string' ? rawKey : undefined;
    if (!idempotencyKey) {
      return res.status(400).json({
        error: '需要Idempotency-Key头部'
      });
    }

    try {
      // Check for an existing request with this key.
      const existing = await idempotency.getRequest(idempotencyKey);
      if (existing) {
        if (existing.status === 'processing') {
          return res.status(409).json({
            error: '请求正在处理中',
            message: '请稍后重试'
          });
        }
        if (existing.status === 'completed') {
          // NOTE(review): only the body is stored, so a replay always
          // answers 200 regardless of the original status code.
          return res.status(200).json(existing.response);
        }
        if (existing.status === 'failed') {
          return res.status(500).json({
            error: '先前的请求失败',
            message: existing.error
          });
        }
      }

      // Claim the key; losing the race means another request owns it.
      const canProcess = await idempotency.startProcessing(idempotencyKey);
      if (!canProcess) {
        return res.status(409).json({
          error: '请求正在处理中'
        });
      }
    } catch (err) {
      // A rejected promise from async middleware is not observed by
      // Express 4, so store failures are forwarded explicitly.
      return next(err);
    }

    // Capture the JSON response so later requests can replay it.
    let captured = false;
    const originalJson = res.json.bind(res);
    res.json = (body: any) => {
      captured = true;
      idempotency.completeRequest(idempotencyKey, body).catch(console.error);
      return originalJson(body);
    };

    // Errors raised by downstream handlers never pass through this
    // middleware's `next`, so failures are detected from the finished
    // response instead: an error status with no captured JSON body.
    res.on('finish', () => {
      if (!captured && res.statusCode >= 400) {
        idempotency
          .failRequest(idempotencyKey, `请求失败,状态码 ${res.statusCode}`)
          .catch(console.error);
      }
    });

    next();
  };
}
// Usage
const app = express();
// The service opens its own Redis connection from the URL, so no separate
// client is created here.
const idempotency = new IdempotencyService('redis://localhost:6379');

app.use(express.json());
app.use(idempotencyMiddleware(idempotency));

app.post('/api/payments', async (req, res, next) => {
  try {
    const { amount, userId } = req.body;
    // Process the payment
    const payment = await processPayment(amount, userId);
    res.json(payment);
  } catch (err) {
    // Express 4 does not observe rejected promises from async handlers;
    // forward the failure so the error pipeline produces a response.
    next(err);
  }
});
/**
 * Placeholder payment processor used by the example route.
 *
 * @param amount Amount to charge; echoed back in the result.
 * @param userId Identifier of the paying user; echoed back in the result.
 * @returns A payment object with a freshly generated UUID and the status
 *   `'completed'`.
 */
async function processPayment(amount: number, userId: string) {
  // Payment processing logic goes here.
  const id = crypto.randomUUID();
  const status = 'completed';
  return { id, amount, userId, status };
}
// Start the HTTP server on the example port.
const port = 3000;
app.listen(port);
2. 基于数据库的幂等性
import { Pool } from 'pg';
/**
 * Row shape returned when reading an idempotency record from the database.
 * Field names are snake_case.
 */
interface IdempotencyRecord {
  /** The idempotency key. */
  key: string;
  /** Request payload stored with the key. */
  request_body: any;
  /** Response payload; optional. */
  response_body?: any;
  /** Processing state of the request. */
  status: string;
  /** Error message; optional. */
  error_message?: string;
  /** When the record was created. */
  created_at: Date;
  /** When the request finished; optional. */
  completed_at?: Date;
}
/**
 * PostgreSQL-backed idempotency store.
 *
 * Records live in the `idempotency_keys` table, which is created on
 * construction if it does not exist. Every public method waits for that
 * bootstrap to finish before touching the table.
 */
class DatabaseIdempotency {
  /** Settles once table creation has finished; rejects if it failed. */
  private readonly ready: Promise<void>;

  /**
   * @param db Connection pool used for all queries.
   * @param ttlSeconds Lifetime of a new record in seconds (default 24 hours).
   */
  constructor(private db: Pool, private readonly ttlSeconds = 86400) {
    this.ready = this.createTable();
    // Mark the rejection as handled so a failed bootstrap does not surface
    // as an unhandled rejection; methods awaiting `ready` still see it.
    this.ready.catch(() => undefined);
  }

  /** Creates the table and its expiry index when they are missing. */
  private async createTable(): Promise<void> {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS idempotency_keys (
        key VARCHAR(255) PRIMARY KEY,
        request_body JSONB NOT NULL,
        response_body JSONB,
        status VARCHAR(50) NOT NULL,
        error_message TEXT,
        created_at TIMESTAMP DEFAULT NOW(),
        completed_at TIMESTAMP,
        expires_at TIMESTAMP NOT NULL
      );
      CREATE INDEX IF NOT EXISTS idx_idempotency_expires
      ON idempotency_keys (expires_at);
    `);
  }

  /**
   * Serializes a value to JSON with object keys sorted recursively, so two
   * payloads that differ only in key order produce the same string.
   */
  private static canonicalJson(value: unknown): string {
    const encoded = JSON.stringify(value);
    if (encoded === undefined) {
      // Not representable as JSON (e.g. undefined).
      return 'undefined';
    }
    const normalize = (node: unknown): unknown => {
      if (Array.isArray(node)) {
        return node.map(normalize);
      }
      if (node !== null && typeof node === 'object') {
        const source = node as Record<string, unknown>;
        const sorted: Record<string, unknown> = {};
        for (const name of Object.keys(source).sort()) {
          sorted[name] = normalize(source[name]);
        }
        return sorted;
      }
      return node;
    };
    // Round-trip first so toJSON() conversions are applied before sorting.
    return JSON.stringify(normalize(JSON.parse(encoded)));
  }

  /**
   * Looks up the record for `key` and verifies it was created for the same
   * request payload.
   *
   * @returns The stored record, or `null` when the key is unknown.
   * @throws Error when a record exists but its request body differs from
   *   `requestBody`.
   */
  async checkIdempotency(
    key: string,
    requestBody: any
  ): Promise<IdempotencyRecord | null> {
    await this.ready;
    const result = await this.db.query(
      'SELECT * FROM idempotency_keys WHERE key = $1',
      [key]
    );
    if (result.rows.length === 0) {
      return null;
    }
    const record = result.rows[0];
    // JSONB does not preserve object key order, so compare canonical forms
    // rather than raw JSON.stringify output.
    if (
      DatabaseIdempotency.canonicalJson(record.request_body) !==
      DatabaseIdempotency.canonicalJson(requestBody)
    ) {
      throw new Error('请求体不匹配幂等性键');
    }
    return record;
  }

  /**
   * Claims `key` by inserting a `processing` row.
   *
   * @returns `true` when the row was inserted, `false` when the key already
   *   exists (unique violation).
   * @throws Any other database error.
   */
  async startProcessing(
    key: string,
    requestBody: any
  ): Promise<boolean> {
    await this.ready;
    try {
      const expiresAt = new Date(Date.now() + this.ttlSeconds * 1000);
      // Serialize explicitly: the driver would otherwise turn a JS array
      // into a Postgres array literal, which is not valid JSONB input.
      await this.db.query(`
        INSERT INTO idempotency_keys (key, request_body, status, expires_at)
        VALUES ($1, $2, 'processing', $3)
      `, [key, JSON.stringify(requestBody), expiresAt]);
      return true;
    } catch (error: any) {
      if (error.code === '23505') { // unique violation
        return false;
      }
      throw error;
    }
  }

  /** Marks `key` as completed and stores `responseBody` for replays. */
  async completeRequest(
    key: string,
    responseBody: any
  ): Promise<void> {
    await this.ready;
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        response_body = $1,
        status = 'completed',
        completed_at = NOW()
      WHERE key = $2
    `, [JSON.stringify(responseBody), key]);
  }

  /** Marks `key` as failed and stores `errorMessage`. */
  async failRequest(
    key: string,
    errorMessage: string
  ): Promise<void> {
    await this.ready;
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        error_message = $1,
        status = 'failed',
        completed_at = NOW()
      WHERE key = $2
    `, [errorMessage, key]);
  }

  /**
   * Deletes records whose `expires_at` is in the past.
   *
   * @returns The number of rows removed (0 when the driver reports none).
   */
  async cleanup(): Promise<number> {
    await this.ready;
    const result = await this.db.query(`
      DELETE FROM idempotency_keys
      WHERE expires_at < NOW()
    `);
    return result.rowCount ?? 0;
  }
}
3. Stripe风格的幂等性
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import psycopg2
class IdempotencyManager:
    """Runs request handlers at most once per idempotency key.

    State is kept in the ``idempotency_requests`` table through the DB-API
    connection given at construction. Records older than ``ttl_days`` are
    treated as absent.
    """

    def __init__(self, db_connection):
        self.db = db_connection
        self.ttl_days = 1

    def process_request(
        self,
        idempotency_key: str,
        request_data: Dict[str, Any],
        process_fn: callable
    ) -> Dict[str, Any]:
        """Process a request with idempotency guarantees.

        Args:
            idempotency_key: Unique key for this request.
            request_data: Request payload.
            process_fn: Function that processes the request.

        Returns:
            The response data, either freshly computed or replayed from the
            stored record of an earlier completed attempt.

        Raises:
            ConflictError: The key is currently being processed.
            ProcessingError: An earlier attempt with this key failed.
            ValueError: The key was previously used with different data.
        """
        # Check for an existing request
        existing = self.get_existing_request(
            idempotency_key,
            request_data
        )
        if existing:
            if existing['status'] == 'processing':
                raise ConflictError('请求正在处理中')
            if existing['status'] == 'completed':
                return existing['response']
            if existing['status'] == 'failed':
                raise ProcessingError(existing['error'])

        # Claim the key
        if not self.start_processing(idempotency_key, request_data):
            raise ConflictError('请求正在处理中')

        try:
            # Process the request
            result = process_fn(request_data)
            # Store the result
            self.complete_request(idempotency_key, result)
            return result
        except Exception as e:
            # Store the error, then let the caller see the original exception
            self.fail_request(idempotency_key, str(e))
            raise

    def get_existing_request(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Fetch the unexpired record for ``key``, if any.

        Returns:
            A dict with ``status``, ``response`` and ``error``, or ``None``
            when no unexpired record exists.

        Raises:
            ValueError: The stored request hash differs from the hash of
                ``request_data``.
        """
        cursor = self.db.cursor()
        try:
            cursor.execute("""
                SELECT status, response, error, request_hash
                FROM idempotency_requests
                WHERE idempotency_key = %s
                AND created_at > %s
            """, (key, datetime.now() - timedelta(days=self.ttl_days)))
            row = cursor.fetchone()
        finally:
            cursor.close()

        if not row:
            return None

        # Verify the request data matches what the key was first used with
        request_hash = self.hash_request(request_data)
        if row[3] != request_hash:
            raise ValueError(
                '请求数据与幂等性键不匹配'
            )

        return {
            'status': row[0],
            'response': self._decode_response(row[1]),
            'error': row[2]
        }

    @staticmethod
    def _decode_response(stored):
        """Turn a stored response back into the object that was saved.

        ``complete_request`` writes the response with ``json.dumps``; when the
        driver hands it back as text it is decoded here so a replay returns
        the same structure as the first call. Values that are already
        decoded, or that are not valid JSON, are returned unchanged.
        """
        if isinstance(stored, (str, bytes, bytearray)):
            try:
                return json.loads(stored)
            except ValueError:
                return stored
        return stored

    def start_processing(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> bool:
        """Mark the request as processing.

        Returns:
            ``True`` when the key was claimed, ``False`` when an unexpired
            record for the key already exists.
        """
        cursor = self.db.cursor()
        request_hash = self.hash_request(request_data)
        try:
            # An expired record is invisible to get_existing_request but would
            # still block the INSERT below, so remove it first.
            cursor.execute("""
                DELETE FROM idempotency_requests
                WHERE idempotency_key = %s
                AND created_at <= %s
            """, (key, datetime.now() - timedelta(days=self.ttl_days)))
            cursor.execute("""
                INSERT INTO idempotency_requests
                (idempotency_key, request_hash, status, created_at)
                VALUES (%s, %s, 'processing', NOW())
            """, (key, request_hash))
            self.db.commit()
            return True
        except psycopg2.IntegrityError:
            self.db.rollback()
            return False
        except Exception:
            # Do not leave the connection in an aborted transaction.
            self.db.rollback()
            raise
        finally:
            cursor.close()

    def complete_request(
        self,
        key: str,
        response: Dict[str, Any]
    ):
        """Mark the request as completed and store its JSON-encoded response."""
        cursor = self.db.cursor()
        try:
            cursor.execute("""
                UPDATE idempotency_requests
                SET
                status = 'completed',
                response = %s,
                completed_at = NOW()
                WHERE idempotency_key = %s
            """, (json.dumps(response), key))
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise
        finally:
            cursor.close()

    def fail_request(self, key: str, error: str):
        """Mark the request as failed and store the error text."""
        cursor = self.db.cursor()
        try:
            cursor.execute("""
                UPDATE idempotency_requests
                SET
                status = 'failed',
                error = %s,
                completed_at = NOW()
                WHERE idempotency_key = %s
            """, (error, key))
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise
        finally:
            cursor.close()

    def hash_request(self, data: Dict[str, Any]) -> str:
        """Return the SHA-256 hex digest of the key-sorted JSON form of ``data``."""
        json_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(json_str.encode()).hexdigest()
class ConflictError(Exception):
    """Signals that a request with the same idempotency key is in progress."""
class ProcessingError(Exception):
    """Signals that an earlier attempt with the same idempotency key failed."""
# Usage
def process_payment(data):
    """Example handler: build a completed-payment result for ``data['amount']``."""
    # Payment processing logic goes here
    amount = data['amount']
    return dict(payment_id='pay_123', amount=amount, status='completed')
# In your API handler
idempotency = IdempotencyManager(db_connection)
payment_request = {'amount': 100, 'currency': 'USD'}
try:
    result = idempotency.process_request(
        'key_abc123',
        payment_request,
        process_payment,
    )
except ConflictError as e:
    print(f"冲突:{e}")
except ProcessingError as e:
    print(f"处理错误:{e}")
else:
    # Reached only when no exception was raised above.
    print(result)
4. 消息队列幂等性
/** A queue message handed to the message processor. */
interface Message {
  /** Message identifier. */
  id: string;
  /** Message payload. */
  data: any;
  /** Numeric timestamp carried by the message. */
  timestamp: number;
}
/**
 * Processes queue messages at most once per message ID.
 *
 * The `processed_messages` table is the source of truth; an in-memory set of
 * completed IDs is kept only as a shortcut to skip the database round trip.
 * A message whose previous attempt failed can be claimed again, so a
 * redelivery after a failure is actually retried.
 */
class IdempotentMessageProcessor {
  private processedMessages = new Set<string>();
  private db: Pool;

  /**
   * @param db Connection pool used for all queries.
   */
  constructor(db: Pool) {
    this.db = db;
    // The preload is an optimization only; a failure is logged rather than
    // left as an unhandled rejection.
    this.loadProcessedMessages().catch((error: unknown) => {
      console.error('加载已处理的消息ID失败:', error);
    });
  }

  /** Loads IDs of messages completed in the last 24 hours into the cache. */
  private async loadProcessedMessages(): Promise<void> {
    // Only completed messages may be skipped; failed or in-flight ones must
    // still go through the database claim.
    const result = await this.db.query(`
      SELECT message_id
      FROM processed_messages
      WHERE processed_at > NOW() - INTERVAL '24 hours'
      AND status = 'completed'
    `);
    result.rows.forEach(row => {
      this.processedMessages.add(row.message_id);
    });
  }

  /**
   * Handles `message` unless it was already completed or is currently
   * claimed by another attempt.
   *
   * @throws The handler's error, after the message has been marked failed.
   */
  async processMessage(message: Message): Promise<void> {
    // Check whether it was already processed
    if (this.processedMessages.has(message.id)) {
      console.log(`消息${message.id}已处理,跳过`);
      return;
    }

    // Mark as processing (atomic)
    const wasInserted = await this.markAsProcessing(message.id);
    if (!wasInserted) {
      console.log(`消息${message.id}正在处理中`);
      return;
    }

    try {
      // Handle the message
      await this.handleMessage(message);
      // Mark as completed
      await this.markAsCompleted(message.id);
      this.processedMessages.add(message.id);
    } catch (error) {
      console.error(`未能处理消息${message.id}:`, error);
      await this.markAsFailed(message.id, (error as Error).message);
      throw error;
    }
  }

  /**
   * Atomically claims `messageId`.
   *
   * Inserts a `processing` row, or re-claims an existing row whose status is
   * `failed`. Rows that are `processing` or `completed` are left untouched.
   *
   * @returns `true` when this call claimed the message, `false` otherwise.
   */
  private async markAsProcessing(messageId: string): Promise<boolean> {
    const result = await this.db.query(`
      INSERT INTO processed_messages (message_id, status, processed_at)
      VALUES ($1, 'processing', NOW())
      ON CONFLICT (message_id) DO UPDATE
      SET status = 'processing', processed_at = NOW(), error = NULL, completed_at = NULL
      WHERE processed_messages.status = 'failed'
      RETURNING message_id
    `, [messageId]);
    // A row is returned only when the insert or the re-claim took effect.
    return result.rows.length > 0;
  }

  /** Marks `messageId` as completed. */
  private async markAsCompleted(messageId: string): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'completed', completed_at = NOW()
      WHERE message_id = $1
    `, [messageId]);
  }

  /** Marks `messageId` as failed and stores the error text. */
  private async markAsFailed(
    messageId: string,
    error: string
  ): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'failed', error = $2, completed_at = NOW()
      WHERE message_id = $1
    `, [messageId, error]);
  }

  /** Actual message handling logic; this example only logs the message. */
  private async handleMessage(message: Message): Promise<void> {
    console.log('处理消息:', message);
  }
}
最佳实践
✅ 要做
- 要求变更操作的幂等性键
- 一起存储请求和响应
- 为幂等性记录设置适当的TTL
- 验证请求体与存储的请求匹配
- 优雅地处理并发请求
- 对重复请求返回相同的响应
- 清理旧的幂等性记录
- 使用数据库约束(如唯一键)保证原子性
❌ 不要做
- 对GET请求要求幂等性键(GET本身就是幂等的)
- 永久存储幂等性数据
- 跳过请求体的验证
- 使用非唯一的幂等性键
- 并发处理使用同一幂等性键的请求
- 为重复请求更改响应
模式设计
-- One row per idempotency key.
CREATE TABLE idempotency_keys (
  -- The idempotency key; the primary key makes duplicate inserts fail.
  key VARCHAR(255) PRIMARY KEY,
  -- Hash of the request (64 characters).
  request_hash VARCHAR(64) NOT NULL,
  -- Request payload.
  request_body JSONB NOT NULL,
  -- Response payload; nullable.
  response_body JSONB,
  -- Processing state, restricted to the three values below.
  status VARCHAR(20) NOT NULL CHECK (status IN ('processing', 'completed', 'failed')),
  -- Error text; nullable.
  error_message TEXT,
  created_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  -- Expiry time of the row; required.
  expires_at TIMESTAMP NOT NULL
);
-- Index on the expiry column.
CREATE INDEX idx_idempotency_expires ON idempotency_keys (expires_at);
-- Index on the status column.
CREATE INDEX idx_idempotency_status ON idempotency_keys (status);