幂等性处理 idempotency-handling

这个技能用于实现API操作的幂等性,通过使用幂等键、Redis缓存和数据库约束,确保在多次执行时产生相同结果,防止重复处理和竞争条件。适用于支付系统、webhook重试、邮件发送等场景,提高系统可靠性和数据一致性。关键词包括:幂等性、API、Redis、数据库、缓存、支付系统、webhook、重复处理、竞争条件、安全重试。

后端开发 0 次安装 0 次浏览 更新于 3/7/2026

name: idempotency-handling 描述:使用幂等键进行幂等 API 操作,Redis 缓存,数据库约束。适用于支付系统、webhook 重试、安全重试,或遇到重复处理、竞争条件、键过期错误。

幂等性处理

确保操作无论执行多少次都产生相同的结果。

幂等键模式

const redis = require('redis');
const client = redis.createClient();

async function idempotencyMiddleware(req, res, next) {
  const key = req.headers['idempotency-key'];
  if (!key) return next();

  const cached = await client.get(`idempotency:${key}`);
  if (cached) {
    const { status, body } = JSON.parse(cached);
    return res.status(status).json(body);
  }

  // 存储原始发送
  const originalSend = res.json.bind(res);
  res.json = async (body) => {
    await client.setEx(
      `idempotency:${key}`,
      86400, // 24 小时
      JSON.stringify({ status: res.statusCode, body })
    );
    return originalSend(body);
  };

  next();
}

数据库支持的幂等性

CREATE TABLE idempotency_keys (
  key VARCHAR(255) PRIMARY KEY,
  request_hash VARCHAR(64) NOT NULL,
  response JSONB,
  status VARCHAR(20) DEFAULT 'processing',
  created_at TIMESTAMP DEFAULT NOW(),
  expires_at TIMESTAMP DEFAULT NOW() + INTERVAL '24 hours'
);

CREATE INDEX idx_idempotency_expires ON idempotency_keys(expires_at);
async function processPayment(idempotencyKey, payload) {
  const requestHash = crypto.createHash('sha256')
    .update(JSON.stringify(payload)).digest('hex');

  // 尝试插入状态为 'processing' - 只有一个请求会成功
  const insertResult = await db.query(
    `INSERT INTO idempotency_keys (key, request_hash, status)
     VALUES ($1, $2, 'processing')
     ON CONFLICT (key) DO NOTHING
     RETURNING *`,
    [idempotencyKey, requestHash]
  );

  // 如果我们插入了行(rowCount === 1),我们负责处理
  if (insertResult.rowCount === 1) {
    try {
      // 执行支付
      const result = await executePayment(payload);

      // 更新为完成状态并存储响应
      await db.query(
        'UPDATE idempotency_keys SET status = $1, response = $2 WHERE key = $3',
        ['completed', JSON.stringify(result), idempotencyKey]
      );

      return result;
    } catch (error) {
      // 错误时标记为失败
      await db.query(
        'UPDATE idempotency_keys SET status = $1, response = $2 WHERE key = $3',
        ['failed', JSON.stringify({ error: error.message }), idempotencyKey]
      );
      throw error;
    }
  }

  // 另一个请求正在/曾经处理此键 - 检查状态
  const existing = await db.query(
    'SELECT * FROM idempotency_keys WHERE key = $1',
    [idempotencyKey]
  );

  const row = existing.rows[0];
  if (!row) {
    throw new Error('意外:幂等键已消失');
  }

  // 验证请求未更改
  if (row.request_hash !== requestHash) {
    throw new Error('幂等键被用于不同请求');
  }

  // 检查状态
  if (row.status === 'completed') {
    return JSON.parse(row.response);
  } else if (row.status === 'processing') {
    throw new Error('请求已在处理中 - 稍后重试');
  } else if (row.status === 'failed') {
    const failedResponse = JSON.parse(row.response);
    throw new Error(`先前尝试失败:${failedResponse.error}`);
  }

  throw new Error(`未知状态:${row.status}`);
}

何时应用

  • 支付处理
  • 订单创建
  • Webhook 处理
  • 邮件发送
  • 任何重复会导致问题的操作

最佳实践

  • 对变更操作要求使用幂等键
  • 验证请求体与存储的请求匹配
  • 设置合适的 TTL(通常 24 小时)
  • 使用原子数据库操作
  • 实施清理作业以防止表膨胀

TTL 清理策略

为防止表无限增长,实施定期清理过期键:

选项 1:计划数据库作业(PostgreSQL)

-- 通过 pg_cron 或外部调度器每小时运行
DELETE FROM idempotency_keys
WHERE expires_at < NOW()
LIMIT 1000; -- 批量删除以避免长时间锁

选项 2:应用清理作业(Node.js)

// 通过 cron 或作业调度器运行(例如,node-cron, Bull)
async function cleanupExpiredKeys() {
  try {
    const result = await db.query(
      'DELETE FROM idempotency_keys WHERE expires_at < NOW()'
    );
    console.log(`清理了 ${result.rowCount} 个过期幂等键`);
  } catch (error) {
    console.error('清理作业失败:', error);
  }
}

// 计划每小时运行一次
cron.schedule('0 * * * *', cleanupExpiredKeys);

选项 3:应用清理作业(Python)

import asyncio
from datetime import datetime

async def cleanup_expired_keys():
    """移除过期幂等键以防止表膨胀。"""
    try:
        result = await db.execute(
            "DELETE FROM idempotency_keys WHERE expires_at < $1",
            datetime.now()
        )
        print(f"清理了 {result} 个过期幂等键")
    except Exception as e:
        print(f"清理作业失败:{e}")

# 通过 APScheduler, Celery 或类似工具运行
# scheduler.add_job(cleanup_expired_keys, 'interval', hours=1)

清理最佳实践:

  • 在低流量期间运行清理以最小化锁竞争
  • 对于大表使用批量删除(LIMIT 1000
  • 监控清理作业执行和失败
  • 考虑按 created_at 分区表以方便清理
  • 设置警报以防表大小意外增长