Idempotency Handling (idempotency-handling)

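Techniques for ensuring that an operation produces the same result no matter how many times it is executed. Applies to payment systems, API retries, distributed transactions, and similar scenarios.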

Backend development · Updated 3/4/2026

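idempotency-handling: Implement idempotency keys and request handling so that operations can be retried safely without duplicate side effects. Use it when building payment systems, APIs with retries, or distributed transactions.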

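Idempotency Handling

Overview

Implement idempotency so that repeating an operation, for example because a client retries after a timeout, has the same effect and returns the same result as performing it once.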
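
To make the definition concrete, here is a minimal in-memory sketch. The function names `charge` and `charge_idempotent`, and the dictionary used as a key store, are illustrative assumptions and not part of any library; a real service would keep the keys in Redis or a database, as the examples further down do.

```python
import uuid
from typing import Dict

balance = {"alice": 100}
_results: Dict[str, dict] = {}  # idempotency key -> stored result


def charge(user: str, amount: int) -> dict:
    """Non-idempotent: every call deducts the amount again."""
    balance[user] -= amount
    return {"user": user, "charged": amount, "balance": balance[user]}


def charge_idempotent(key: str, user: str, amount: int) -> dict:
    """Idempotent: the first call does the work, repeats replay its result."""
    if key not in _results:
        _results[key] = charge(user, amount)
    return _results[key]


key = str(uuid.uuid4())
first = charge_idempotent(key, "alice", 30)
retry = charge_idempotent(key, "alice", 30)  # e.g. a client retry after a timeout
```

Both calls return the same dictionary and the balance is reduced only once. This sketch is not safe under concurrency; the atomic claim step in the examples below addresses that.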

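When to Use

  • Payment processing
  • API endpoints that clients retry
  • Webhooks and callbacks
  • Message queue consumers
  • Distributed transactions
  • Bank transfers
  • Order creation
  • Sending email
  • Resource creation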

Implementation Examples

1. Express Idempotency Middleware

import express from 'express';
import Redis from 'ioredis';
import crypto from 'crypto';

interface IdempotentRequest {
  key: string;
  status: 'processing' | 'completed' | 'failed';
  response?: any;
  error?: string;
  createdAt: number;
  completedAt?: number;
}

class IdempotencyService {
  private redis: Redis;
  private ttl = 86400; // 24 hours, in seconds

  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
  }

  async getRequest(key: string): Promise<IdempotentRequest | null> {
    const data = await this.redis.get('idempotency:' + key);
    return data ? JSON.parse(data) : null;
  }

  async setRequest(
    key: string,
    request: IdempotentRequest
  ): Promise<void> {
    await this.redis.setex(
      'idempotency:' + key,
      this.ttl,
      JSON.stringify(request)
    );
  }

  async startProcessing(key: string): Promise<boolean> {
    const request: IdempotentRequest = {
      key,
      status: 'processing',
      createdAt: Date.now()
    };

    // SET with NX ensures only one request can claim the key
    const result = await this.redis.set(
      'idempotency:' + key,
      JSON.stringify(request),
      'EX',
      this.ttl,
      'NX'
    );

    return result === 'OK';
  }

  async completeRequest(
    key: string,
    response: any
  ): Promise<void> {
    const request: IdempotentRequest = {
      key,
      status: 'completed',
      response,
      createdAt: Date.now(),
      completedAt: Date.now()
    };

    await this.setRequest(key, request);
  }

  async failRequest(
    key: string,
    error: string
  ): Promise<void> {
    const request: IdempotentRequest = {
      key,
      status: 'failed',
      error,
      createdAt: Date.now(),
      completedAt: Date.now()
    };

    await this.setRequest(key, request);
  }
}

function idempotencyMiddleware(idempotency: IdempotencyService) {
  return async (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ) => {
    // Only mutating methods need idempotency handling
    if (!['POST', 'PUT', 'PATCH', 'DELETE'].includes(req.method)) {
      return next();
    }

    // req.get() is case-insensitive and returns string | undefined
    const idempotencyKey = req.get('Idempotency-Key');

    if (!idempotencyKey) {
      return res.status(400).json({
        error: 'Idempotency-Key header is required'
      });
    }

    // Check for an existing request with this key
    const existing = await idempotency.getRequest(idempotencyKey);

    if (existing) {
      if (existing.status === 'processing') {
        return res.status(409).json({
          error: 'Request is already being processed',
          message: 'Please retry later'
        });
      }

      if (existing.status === 'completed') {
        // Replay the stored response instead of running the handler again
        return res.status(200).json(existing.response);
      }

      if (existing.status === 'failed') {
        return res.status(500).json({
          error: 'Previous request failed',
          message: existing.error
        });
      }
    }

    // Atomically claim the key; fails if a concurrent request claimed it first
    const canProcess = await idempotency.startProcessing(idempotencyKey);

    if (!canProcess) {
      return res.status(409).json({
        error: 'Request is already being processed'
      });
    }

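    // Capture the JSON body so later retries can replay it
    let responseBody: unknown;
    const originalJson = res.json.bind(res);
    res.json = (body: any) => {
      responseBody = body;
      return originalJson(body);
    };

    // Reassigning `next` here would not intercept errors raised by downstream
    // handlers, so record the outcome once the response has been sent.
    res.on('finish', () => {
      const outcome = res.statusCode >= 500
        ? idempotency.failRequest(idempotencyKey, `HTTP ${res.statusCode}`)
        : idempotency.completeRequest(idempotencyKey, responseBody);
      outcome.catch(console.error);
    });

    next();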
  };
}

// Usage
const app = express();
const idempotency = new IdempotencyService('redis://localhost:6379');

app.use(express.json());
app.use(idempotencyMiddleware(idempotency));

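app.post('/api/payments', async (req, res, next) => {
  try {
    const { amount, userId } = req.body;

    // Process the payment
    const payment = await processPayment(amount, userId);

    res.json(payment);
  } catch (err) {
    // Express 4 does not forward rejected promises to the error handler
    next(err);
  }
});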

async function processPayment(amount: number, userId: string) {
  // Payment processing logic (placeholder)
  return {
    id: crypto.randomUUID(),
    amount,
    userId,
    status: 'completed'
  };
}

app.listen(3000);
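
The middleware only helps if every retry of one logical operation carries the same `Idempotency-Key`. The sketch below shows that client-side pattern without any network access: `send_with_retries`, `TransientError`, and `fake_send` are hypothetical names, and `fake_send` imitates a server that performs the charge but loses its first response.

```python
import uuid
from typing import Callable, Dict, Optional


class TransientError(Exception):
    """Hypothetical stand-in for a timeout or dropped connection."""


def send_with_retries(
    send: Callable[[Dict[str, str], dict], dict],
    body: dict,
    max_attempts: int = 3,
) -> dict:
    # Generate the key once, outside the retry loop, so the server sees
    # every attempt as the same logical operation.
    headers = {"Idempotency-Key": str(uuid.uuid4())}
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            return send(headers, body)
        except TransientError as exc:
            last_error = exc
    raise last_error if last_error else RuntimeError("no attempts were made")


# Fake server state for the demonstration
charges = []
stored: Dict[str, dict] = {}
attempts = {"n": 0}


def fake_send(headers: Dict[str, str], body: dict) -> dict:
    attempts["n"] += 1
    key = headers["Idempotency-Key"]
    if key not in stored:
        charges.append(body["amount"])  # the side effect happens once per key
        stored[key] = {"status": "completed", "amount": body["amount"]}
    if attempts["n"] == 1:
        raise TransientError("response lost")  # work done, reply never arrived
    return stored[key]


result = send_with_retries(fake_send, {"amount": 100})
```

The first attempt charges and then fails from the client's point of view; the retry reuses the key and receives the stored result, so only one charge is recorded. Generating a fresh key inside the loop would defeat the mechanism.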

2. Database-Backed Idempotency

import { Pool } from 'pg';

interface IdempotencyRecord {
  key: string;
  request_body: any;
  response_body?: any;
  status: string;
  error_message?: string;
  created_at: Date;
  completed_at?: Date;
}

class DatabaseIdempotency {
  constructor(private db: Pool) {}

  // Call and await once at startup. A constructor cannot await the DDL,
  // and a rejected promise there would go unhandled.
  async createTable(): Promise<void> {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS idempotency_keys (
        key VARCHAR(255) PRIMARY KEY,
        request_body JSONB NOT NULL,
        response_body JSONB,
        status VARCHAR(50) NOT NULL,
        error_message TEXT,
        created_at TIMESTAMP DEFAULT NOW(),
        completed_at TIMESTAMP,
        expires_at TIMESTAMP NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_idempotency_expires
      ON idempotency_keys (expires_at);
    `);
  }

  async checkIdempotency(
    key: string,
    requestBody: any
  ): Promise<IdempotencyRecord | null> {
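    // Compare in SQL: JSONB does not preserve key order, so comparing
    // JSON.stringify output in Node can report false mismatches.
    const result = await this.db.query(
      `SELECT *, request_body = $2::jsonb AS body_matches
       FROM idempotency_keys WHERE key = $1`,
      [key, JSON.stringify(requestBody)]
    );

    if (result.rows.length === 0) {
      return null;
    }

    const { body_matches, ...record } = result.rows[0];

    // Reject reuse of a key with a different request body
    if (!body_matches) {
      throw new Error('Request body does not match the idempotency key');
    }

    return record as IdempotencyRecord;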
  }

  async startProcessing(
    key: string,
    requestBody: any
  ): Promise<boolean> {
    try {
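      const expiresAt = new Date(Date.now() + 86400 * 1000); // 24 hours

      // Serialize explicitly: pg would send a top-level JS array as a
      // Postgres array rather than as JSON.
      await this.db.query(`
        INSERT INTO idempotency_keys (key, request_body, status, expires_at)
        VALUES ($1, $2, 'processing', $3)
      `, [key, JSON.stringify(requestBody), expiresAt]);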

      return true;
    } catch (error: any) {
      if (error.code === '23505') { // unique_violation: the key already exists
        return false;
      }
      throw error;
    }
  }

  async completeRequest(
    key: string,
    responseBody: any
  ): Promise<void> {
    // Serialize explicitly so array responses are stored as JSON
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        response_body = $1,
        status = 'completed',
        completed_at = NOW()
      WHERE key = $2
    `, [JSON.stringify(responseBody), key]);
  }

  async failRequest(
    key: string,
    errorMessage: string
  ): Promise<void> {
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        error_message = $1,
        status = 'failed',
        completed_at = NOW()
      WHERE key = $2
    `, [errorMessage, key]);
  }

  async cleanup(): Promise<number> {
    const result = await this.db.query(`
      DELETE FROM idempotency_keys
      WHERE expires_at < NOW()
    `);

    return result.rowCount || 0;
  }
}

3. Stripe-Style Idempotency

import hashlib
import json
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Optional
import psycopg2

class IdempotencyManager:
    def __init__(self, db_connection):
        # db_connection: an open psycopg2 connection
        self.db = db_connection
        self.ttl_days = 1

    def process_request(
        self,
        idempotency_key: str,
        request_data: Dict[str, Any],
        process_fn: Callable[[Dict[str, Any]], Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Process a request with an idempotency guarantee.

        Args:
            idempotency_key: Unique key for this request
            request_data: Request payload
            process_fn: Function that processes the request

        Returns:
            Response data
        """
        # Check for an existing request
        existing = self.get_existing_request(
            idempotency_key,
            request_data
        )

        if existing:
            if existing['status'] == 'processing':
                raise ConflictError('Request is already being processed')

            if existing['status'] == 'completed':
                return existing['response']

            if existing['status'] == 'failed':
                raise ProcessingError(existing['error'])

        # Claim the key; fails if a concurrent request claimed it first
        if not self.start_processing(idempotency_key, request_data):
            raise ConflictError('Request is already being processed')

        try:
            # Process the request
            result = process_fn(request_data)

            # Store the result
            self.complete_request(idempotency_key, result)

            return result

        except Exception as e:
            # Store the error, then re-raise for the caller
            self.fail_request(idempotency_key, str(e))
            raise

    def get_existing_request(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Fetch an existing idempotent request, if any."""
        cursor = self.db.cursor()

        cursor.execute("""
            SELECT status, response, error, request_hash
            FROM idempotency_requests
            WHERE idempotency_key = %s
            AND created_at > %s
        """, (key, datetime.now() - timedelta(days=self.ttl_days)))

        row = cursor.fetchone()
        cursor.close()

        if not row:
            return None

        # Verify that the request data matches the stored hash
        request_hash = self.hash_request(request_data)
        if row[3] != request_hash:
            raise ValueError(
                'Request data does not match the idempotency key'
            )

        return {
            'status': row[0],
            'response': row[1],
            'error': row[2]
        }

    def start_processing(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> bool:
        """Mark the request as processing."""
        cursor = self.db.cursor()
        request_hash = self.hash_request(request_data)

        try:
            cursor.execute("""
                INSERT INTO idempotency_requests
                (idempotency_key, request_hash, status, created_at)
                VALUES (%s, %s, 'processing', NOW())
            """, (key, request_hash))

            self.db.commit()
            cursor.close()
            return True

        except psycopg2.IntegrityError:
            self.db.rollback()
            cursor.close()
            return False

    def complete_request(
        self,
        key: str,
        response: Dict[str, Any]
    ):
        """Mark the request as completed."""
        cursor = self.db.cursor()

        cursor.execute("""
            UPDATE idempotency_requests
            SET
                status = 'completed',
                response = %s,
                completed_at = NOW()
            WHERE idempotency_key = %s
        """, (json.dumps(response), key))

        self.db.commit()
        cursor.close()

    def fail_request(self, key: str, error: str):
        """Mark the request as failed."""
        cursor = self.db.cursor()

        cursor.execute("""
            UPDATE idempotency_requests
            SET
                status = 'failed',
                error = %s,
                completed_at = NOW()
            WHERE idempotency_key = %s
        """, (error, key))

        self.db.commit()
        cursor.close()

    def hash_request(self, data: Dict[str, Any]) -> str:
        """Hash the request data; sorted keys make the hash order-independent."""
        json_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(json_str.encode()).hexdigest()


class ConflictError(Exception):
    pass


class ProcessingError(Exception):
    pass


# Usage
def process_payment(data):
    # Payment processing logic (placeholder)
    return {
        'payment_id': 'pay_123',
        'amount': data['amount'],
        'status': 'completed'
    }

# In your API handler; db_connection is assumed to be an open psycopg2 connection
idempotency = IdempotencyManager(db_connection)

try:
    result = idempotency.process_request(
        idempotency_key='key_abc123',
        request_data={'amount': 100, 'currency': 'USD'},
        process_fn=process_payment
    )
    print(result)
except ConflictError as e:
    print(f"Conflict: {e}")
except ProcessingError as e:
    print(f"Processing error: {e}")

4. Message Queue Idempotency

import { Pool } from 'pg';

interface Message {
  id: string;
  data: any;
  timestamp: number;
}

class IdempotentMessageProcessor {
  private processedMessages = new Set<string>();
  private db: Pool;

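  constructor(db: Pool) {
    this.db = db;
  }

  // Warm the in-memory cache. Call and await once at startup; the
  // constructor cannot await it.
  async loadProcessedMessages(): Promise<void> {
    // Load IDs of messages completed in the last 24 hours. Rows still in
    // 'processing' or 'failed' are left out so they are not skipped.
    const result = await this.db.query(`
      SELECT message_id
      FROM processed_messages
      WHERE processed_at > NOW() - INTERVAL '24 hours'
        AND status = 'completed'
    `);

    result.rows.forEach(row => {
      this.processedMessages.add(row.message_id);
    });
  }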

  async processMessage(message: Message): Promise<void> {
    // Fast path: skip messages this process already knows are done
    if (this.processedMessages.has(message.id)) {
      console.log(`Message ${message.id} already processed, skipping`);
      return;
    }

    // Claim the message (atomic operation)
    const wasInserted = await this.markAsProcessing(message.id);

    if (!wasInserted) {
      console.log(`Message ${message.id} is already claimed or completed`);
      return;
    }

    try {
      // Handle the message
      await this.handleMessage(message);

      // Mark as completed
      await this.markAsCompleted(message.id);

      this.processedMessages.add(message.id);
    } catch (error) {
      console.error(`Failed to process message ${message.id}:`, error);
      await this.markAsFailed(message.id, (error as Error).message);
      throw error;
    }
  }

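  private async markAsProcessing(messageId: string): Promise<boolean> {
    // Atomically claim the message. A row left in 'failed' state is reclaimed
    // so that a redelivered message can be retried; relies on a unique
    // constraint on message_id.
    const result = await this.db.query(`
      INSERT INTO processed_messages (message_id, status, processed_at)
      VALUES ($1, 'processing', NOW())
      ON CONFLICT (message_id) DO UPDATE
        SET status = 'processing', processed_at = NOW()
        WHERE processed_messages.status = 'failed'
      RETURNING message_id
    `, [messageId]);

    // No row is returned when another worker holds or has completed the message
    return (result.rowCount ?? 0) > 0;
  }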

  private async markAsCompleted(messageId: string): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'completed', completed_at = NOW()
      WHERE message_id = $1
    `, [messageId]);
  }

  private async markAsFailed(
    messageId: string,
    error: string
  ): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'failed', error = $2, completed_at = NOW()
      WHERE message_id = $1
    `, [messageId, error]);
  }

  private async handleMessage(message: Message): Promise<void> {
    // Actual message handling logic goes here
    console.log('Processing message:', message);
  }
}

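Best Practices

✅ Do

  • Require idempotency keys for mutating operations
  • Store the request and the response together
  • Set an appropriate TTL on idempotency records
  • Verify that the request body matches the stored request
  • Handle concurrent requests gracefully
  • Return the same response for duplicate requests
  • Clean up old idempotency records
  • Use database constraints for atomicity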
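
Several of these practices can be sketched together in a few lines. The example below uses the standard-library `sqlite3` module with an in-memory database as a stand-in for PostgreSQL; the helpers `fingerprint`, `handle`, and `create_order` are hypothetical names chosen for illustration. It shows an atomic claim through the primary-key constraint, request body validation through a hash, and replay of the stored response. Failure handling and TTLs are left out to keep the sketch short.

```python
import hashlib
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE idempotency_keys ("
    " key TEXT PRIMARY KEY,"
    " request_hash TEXT NOT NULL,"
    " status TEXT NOT NULL,"
    " response_body TEXT)"
)


def fingerprint(body: dict) -> str:
    # Sorted keys make the hash independent of key order
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()


def handle(key: str, body: dict, process) -> dict:
    try:
        # The PRIMARY KEY constraint makes the claim atomic: only one
        # INSERT for a given key can succeed.
        with conn:
            conn.execute(
                "INSERT INTO idempotency_keys (key, request_hash, status)"
                " VALUES (?, ?, 'processing')",
                (key, fingerprint(body)),
            )
    except sqlite3.IntegrityError:
        row = conn.execute(
            "SELECT request_hash, status, response_body"
            " FROM idempotency_keys WHERE key = ?",
            (key,),
        ).fetchone()
        if row[0] != fingerprint(body):
            raise ValueError("request body does not match the idempotency key")
        if row[1] != "completed":
            raise RuntimeError("request is still " + row[1])
        return json.loads(row[2])  # replay the stored response

    response = process(body)
    with conn:
        conn.execute(
            "UPDATE idempotency_keys SET status = 'completed', response_body = ?"
            " WHERE key = ?",
            (json.dumps(response), key),
        )
    return response


calls = []


def create_order(body: dict) -> dict:
    calls.append(body)
    return {"order_id": len(calls), "item": body["item"]}


first = handle("key-1", {"item": "book", "qty": 1}, create_order)
again = handle("key-1", {"qty": 1, "item": "book"}, create_order)  # same body, new key order
```

The second call returns the stored response without invoking `create_order` again, while reusing `key-1` with a different body raises `ValueError`.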

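❌ Don't

  • Apply idempotency keys to GET requests
  • Store idempotency data forever
  • Skip request body validation
  • Use non-unique idempotency keys
  • Process the same request concurrently
  • Change the response for duplicate requests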

Schema Design

CREATE TABLE idempotency_keys (
  key VARCHAR(255) PRIMARY KEY,
  request_hash VARCHAR(64) NOT NULL,
  request_body JSONB NOT NULL,
  response_body JSONB,
  status VARCHAR(20) NOT NULL CHECK (status IN ('processing', 'completed', 'failed')),
  error_message TEXT,
  created_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  expires_at TIMESTAMP NOT NULL
);

CREATE INDEX idx_idempotency_expires ON idempotency_keys (expires_at);
CREATE INDEX idx_idempotency_status ON idempotency_keys (status);
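
The `expires_at` column and its index exist so that old records can be purged cheaply. The following sketch illustrates that cleanup step with the standard-library `sqlite3` module standing in for PostgreSQL. The reduced table, the Unix-timestamp representation of `expires_at`, and the helper names `remember` and `cleanup` are assumptions made for this example.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE idempotency_keys ("
    " key TEXT PRIMARY KEY,"
    " status TEXT NOT NULL,"
    " expires_at REAL NOT NULL)"  # Unix timestamp, an assumption of this sketch
)
conn.execute("CREATE INDEX idx_idempotency_expires ON idempotency_keys (expires_at)")

TTL_SECONDS = 86400  # 24 hours


def remember(key: str, now: float) -> None:
    """Store a completed key that expires TTL_SECONDS after `now`."""
    with conn:
        conn.execute(
            "INSERT INTO idempotency_keys (key, status, expires_at)"
            " VALUES (?, 'completed', ?)",
            (key, now + TTL_SECONDS),
        )


def cleanup(now: float) -> int:
    """Delete expired rows and return how many were removed."""
    with conn:
        cursor = conn.execute(
            "DELETE FROM idempotency_keys WHERE expires_at < ?", (now,)
        )
    return cursor.rowcount


start = time.time()
remember("old-key", start - 2 * TTL_SECONDS)  # expired one day ago
remember("new-key", start)                    # still valid
removed = cleanup(start)
remaining = [row[0] for row in conn.execute("SELECT key FROM idempotency_keys")]
```

In production a job like `cleanup` would typically run on a schedule; how often depends on write volume and is a deployment decision.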

Resources