Webhook集成 webhook-integration

Webhook 集成是一种实现事件驱动架构的技术,用于服务间和第三方集成的实时通信。关键词包括:事件驱动、实时通信、服务集成、第三方服务、自动化工作流、支付处理、用户活动跟踪、微服务通信。

后端开发 0 次安装 0 次浏览 更新于 3/4/2026

Webhook 集成

概述

实现健壮的 Webhook 系统,用于事件驱动架构,实现服务和第三方集成之间的实时通信。

何时使用

  • 第三方服务集成(Stripe, GitHub, Shopify)
  • 事件通知系统
  • 实时数据同步
  • 自动化工作流触发器
  • 支付处理回调
  • CI/CD 管道通知
  • 用户活动跟踪
  • 微服务通信

Webhook 架构

┌──────────┐         ┌──────────┐         ┌──────────┐
│  事件    │────────▶│ Webhook  │────────▶│ 消费者 │
│  源      │         │  发送者   │         │ 终端   │
└──────────┘         └──────────┘         └──────────┘
                           │
                           ▼
                    ┌──────────────┐
                    │ 重试队列   │
                    │ (失败)     │
                    └──────────────┘

实现示例

1. Webhook 发送者 (TypeScript)

import crypto from 'crypto';
import axios from 'axios';

interface WebhookEvent {
  id: string;
  type: string;
  timestamp: number;
  data: any;
}

interface WebhookEndpoint {
  url: string;
  secret: string;
  events: string[];
  active: boolean;
}

interface DeliveryAttempt {
  attemptNumber: number;
  timestamp: number;
  statusCode?: number;
  error?: string;
  duration: number;
}

class WebhookSender {
  private maxRetries = 3;
  private retryDelays = [1000, 5000, 30000]; // 指数退避
  private timeout = 10000; // 10 秒

  /**
   * 为 Webhook 负载生成 HMAC 签名
   */
  private generateSignature(
    payload: string,
    secret: string
  ): string {
    return crypto
      .createHmac('sha256', secret)
      .update(payload)
      .digest('hex');
  }

  /**
   * 发送 Webhook 到终端
   */
  async send(
    endpoint: WebhookEndpoint,
    event: WebhookEvent
  ): Promise<DeliveryAttempt[]> {
    if (!endpoint.active) {
      throw new Error('终端不活跃');
    }

    if (!endpoint.events.includes(event.type)) {
      throw new Error(`事件类型 ${event.type} 未订阅`);
    }

    const payload = JSON.stringify(event);
    const signature = this.generateSignature(payload, endpoint.secret);

    const attempts: DeliveryAttempt[] = [];

    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      const startTime = Date.now();

      try {
        const response = await axios.post(endpoint.url, payload, {
          headers: {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature,
            'X-Webhook-ID': event.id,
            'X-Webhook-Timestamp': event.timestamp.toString(),
            'User-Agent': 'WebhookService/1.0'
          },
          timeout: this.timeout,
          validateStatus: (status) => status >= 200 && status < 300
        });

        const duration = Date.now() - startTime;

        attempts.push({
          attemptNumber: attempt + 1,
          timestamp: Date.now(),
          statusCode: response.status,
          duration
        });

        console.log(
          `Webhook 成功投递到 ${endpoint.url} (尝试 ${attempt + 1})`
        );

        return attempts;
      } catch (error: any) {
        const duration = Date.now() - startTime;

        attempts.push({
          attemptNumber: attempt + 1,
          timestamp: Date.now(),
          statusCode: error.response?.status,
          error: error.message,
          duration
        });

        console.error(
          `Webhook 投递失败到 ${endpoint.url} (尝试 ${attempt + 1}):`,
          error.message
        );

        // 等待重试(最后尝试除外)
        if (attempt < this.maxRetries - 1) {
          await this.delay(this.retryDelays[attempt]);
        }
      }
    }

    throw new Error(
      `Webhook 投递失败,尝试 ${this.maxRetries} 次后`
    );
  }

  /**
   * 批量发送 Webhooks
   */
  async sendBatch(
    endpoints: WebhookEndpoint[],
    event: WebhookEvent
  ): Promise<Map<string, DeliveryAttempt[]>> {
    const results = new Map<string, DeliveryAttempt[]>();

    await Promise.allSettled(
      endpoints.map(async (endpoint) => {
        try {
          const attempts = await this.send(endpoint, event);
          results.set(endpoint.url, attempts);
        } catch (error) {
          console.error(`无法投递到 ${endpoint.url}:`, error);
        }
      })
    );

    return results;
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 使用
const sender = new WebhookSender();

const endpoint: WebhookEndpoint = {
  url: 'https://api.example.com/webhooks',
  secret: 'your-webhook-secret',
  events: ['user.created', 'user.updated'],
  active: true
};

const event: WebhookEvent = {
  id: crypto.randomUUID(),
  type: 'user.created',
  timestamp: Date.now(),
  data: {
    userId: '123',
    email: 'user@example.com'
  }
};

await sender.send(endpoint, event);

2. Webhook 接收器 (Express)

import express from 'express';
import crypto from 'crypto';
import { body, validationResult } from 'express-validator';

interface WebhookConfig {
  secret: string;
  signatureHeader: string;
  timestampTolerance: number; // 秒
}

class WebhookReceiver {
  constructor(private config: WebhookConfig) {}

  /**
   * 验证 Webhook 签名
   */
  verifySignature(
    payload: string,
    signature: string
  ): boolean {
    const expectedSignature = crypto
      .createHmac('sha256', this.config.secret)
      .update(payload)
      .digest('hex');

    return crypto.timingSafeEqual(
      Buffer.from(signature),
      Buffer.from(expectedSignature)
    );
  }

  /**
   * 验证时间戳以防止重放攻击
   */
  verifyTimestamp(timestamp: number): boolean {
    const now = Date.now();
    const diff = Math.abs(now - timestamp) / 1000;

    return diff <= this.config.timestampTolerance;
  }

  /**
   * Webhook 验证中间件
   */
  createMiddleware() {
    return async (
      req: express.Request,
      res: express.Response,
      next: express.NextFunction
    ) => {
      try {
        const signature = req.headers[this.config.signatureHeader] as string;
        const timestamp = parseInt(
          req.headers['x-webhook-timestamp'] as string
        );

        if (!signature) {
          return res.status(401).json({
            error: '缺少签名'
          });
        }

        // 验证时间戳
        if (!this.verifyTimestamp(timestamp)) {
          return res.status(401).json({
            error: '无效的时间戳'
          });
        }

        // 获取原始正文以验证签名
        const payload = JSON.stringify(req.body);

        // 验证签名
        if (!this.verifySignature(payload, signature)) {
          return res.status(401).json({
            error: '无效的签名'
          });
        }

        next();
      } catch (error) {
        console.error('Webhook 验证错误:', error);
        res.status(500).json({
          error: '验证失败'
        });
      }
    };
  }
}

// 设置 Express 应用
const app = express();

// 使用原始正文解析器进行签名验证
app.use(express.json({
  verify: (req: any, res, buf) => {
    req.rawBody = buf.toString();
  }
}));

const receiver = new WebhookReceiver({
  secret: process.env.WEBHOOK_SECRET!,
  signatureHeader: 'x-webhook-signature',
  timestampTolerance: 300 // 5 分钟
});

// Webhook 终端
app.post('/webhooks',
  receiver.createMiddleware(),
  [
    body('id').isString(),
    body('type').isString(),
    body('data').isObject()
  ],
  async (req, res) => {
    // 验证请求
    const errors = validationResult(req);
    if (!errors.isEmpty()) {
      return res.status(400).json({ errors: errors.array() });
    }

    const { id, type, data } = req.body;

    try {
      // 处理 Webhook 事件
      await processWebhookEvent(type, data);

      // 立即响应
      res.status(200).json({
        received: true,
        eventId: id
      });

      // 如果需要,异步处理
      processEventAsync(type, data).catch(console.error);
    } catch (error) {
      console.error('Webhook 处理错误:', error);
      res.status(500).json({
        error: '处理失败'
      });
    }
  }
);

async function processWebhookEvent(type: string, data: any): Promise<void> {
  switch (type) {
    case 'user.created':
      await handleUserCreated(data);
      break;
    case 'payment.success':
      await handlePaymentSuccess(data);
      break;
    default:
      console.log(`未知事件类型: ${type}`);
  }
}

async function processEventAsync(type: string, data: any): Promise<void> {
  // 重的不需要阻塞响应的处理
}

async function handleUserCreated(data: any): Promise<void> {
  console.log('用户创建:', data);
}

async function handlePaymentSuccess(data: any): Promise<void> {
  console.log('支付成功:', data);
}

app.listen(3000, () => {
  console.log('Webhook 接收器在端口 3000 上监听');
});

3. Bull 的 Webhook 队列

import Queue from 'bull';
import axios from 'axios';

interface WebhookJob {
  endpoint: WebhookEndpoint;
  event: WebhookEvent;
}

class WebhookQueue {
  private queue: Queue.Queue<WebhookJob>;

  constructor(redisUrl: string) {
    this.queue = new Queue('webhooks', redisUrl, {
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000
        },
        removeOnComplete: 100,
        removeOnFail: 1000
      }
    });

    this.setupProcessors();
    this.setupEventHandlers();
  }

  private setupProcessors(): void {
    // 处理 Webhook 投递
    this.queue.process('delivery', 5, async (job) => {
      const { endpoint, event } = job.data;

      job.log(`向 ${endpoint.url} 投递 Webhook`);

      const sender = new WebhookSender();
      const attempts = await sender.send(endpoint, event);

      return {
        endpoint: endpoint.url,
        attempts,
        success: true
      };
    });
  }

  private setupEventHandlers(): void {
    this.queue.on('completed', (job, result) => {
      console.log(`Webhook 投递: ${job.id}`, result);
    });

    this.queue.on('failed', (job, err) => {
      console.error(`Webhook 投递失败: ${job?.id}`, err);
    });

    this.queue.on('stalled', (job) => {
      console.warn(`Webhook 投递停滞: ${job.id}`);
    });
  }

  async enqueue(
    endpoint: WebhookEndpoint,
    event: WebhookEvent,
    options?: Queue.JobOptions
  ): Promise<Queue.Job<WebhookJob>> {
    return this.queue.add(
      'delivery',
      { endpoint, event },
      {
        jobId: `${event.id}-${endpoint.url}`,
        ...options
      }
    );
  }

  async enqueueBatch(
    endpoints: WebhookEndpoint[],
    event: WebhookEvent
  ): Promise<Queue.Job<WebhookJob>[]> {
    const jobs = endpoints.map(endpoint => ({
      name: 'delivery',
      data: { endpoint, event },
      opts: {
        jobId: `${event.id}-${endpoint.url}`
      }
    }));

    return this.queue.addBulk(jobs);
  }

  async getJobStatus(jobId: string): Promise<any> {
    const job = await this.queue.getJob(jobId);
    if (!job) return null;

    return {
      id: job.id,
      state: await job.getState(),
      progress: job.progress(),
      attempts: job.attemptsMade,
      failedReason: job.failedReason,
      finishedOn: job.finishedOn,
      processedOn: job.processedOn
    };
  }

  async retryFailed(jobId: string): Promise<void> {
    const job = await this.queue.getJob(jobId);
    if (!job) {
      throw new Error('工作未找到');
    }

    await job.retry();
  }

  async pause(): Promise<void> {
    await this.queue.pause();
  }

  async resume(): Promise<void> {
    await this.queue.resume();
  }

  async close(): Promise<void> {
    await this.queue.close();
  }
}

// 使用
const webhookQueue = new WebhookQueue('redis://localhost:6379');

// 将单个 Webhook 入队
await webhookQueue.enqueue(endpoint, event, {
  delay: 1000, // 延迟 1 秒
  priority: 1
});

// 向多个终端入队
await webhookQueue.enqueueBatch(endpoints, event);

// 检查工作状态
const status = await webhookQueue.getJobStatus('job-id');
console.log('工作状态:', status);

4. Webhook 测试工具

import express from 'express';
import crypto from 'crypto';

class WebhookTester {
  private app: express.Application;
  private receivedEvents: WebhookEvent[] = [];

  constructor() {
    this.app = express();
    this.setupTestEndpoint();
  }

  private setupTestEndpoint(): void {
    this.app.use(express.json());

    this.app.post('/test-webhook', (req, res) => {
      const event = req.body;

      // 如果提供,验证签名
      const signature = req.headers['x-webhook-signature'] as string;
      if (signature) {
        // 在这里验证签名
      }

      // 存储接收到的事件
      this.receivedEvents.push(event);

      console.log('接收到 Webhook:', event);

      // 根据测试场景响应
      res.status(200).json({
        received: true,
        eventId: event.id
      });
    });

    // 模拟失败的终端
    this.app.post('/test-webhook/fail', (req, res) => {
      const failureType = req.query.type;

      switch (failureType) {
        case 'timeout':
          // 不响应(模拟超时)
          break;
        case 'server-error':
          res.status(500).json({ error: '内部服务器错误' });
          break;
        case 'unauthorized':
          res.status(401).json({ error: '未授权' });
          break;
        default:
          res.status(400).json({ error: '请求错误' });
      }
    });
  }

  start(port: number): void {
    this.app.listen(port, () => {
      console.log(`Webhook 测试服务器在端口 ${port} 上运行`);
    });
  }

  getReceivedEvents(): WebhookEvent[] {
    return this.receivedEvents;
  }

  clearEvents(): void {
    this.receivedEvents = [];
  }

  /**
   * 创建模拟 Webhook 事件
   */
  static createMockEvent(type: string, data: any): WebhookEvent {
    return {
      id: crypto.randomUUID(),
      type,
      timestamp: Date.now(),
      data
    };
  }
}

// 测试
const tester = new WebhookTester();
tester.start(3001);

// 发送测试 Webhook
const mockEvent = WebhookTester.createMockEvent('user.created', {
  userId: '123',
  email: 'test@example.com'
});

const sender = new WebhookSender();
await sender.send(
  {
    url: 'http://localhost:3001/test-webhook',
    secret: 'test-secret',
    events: ['user.created'],
    active: true
  },
  mockEvent
);

// 验证接收到的
const received = tester.getReceivedEvents();
console.log('接收到的事件:', received);

最佳实践

✅ 要做

  • 使用 HMAC 签名进行验证
  • 通过事件 ID 实现幂等性
  • 快速返回 200 OK,异步处理
  • 实现重试的指数退避
  • 包括时间戳以防止重放攻击
  • 使用队列系统确保可靠投递
  • 记录所有投递尝试
  • 提供 Webhook 测试工具
  • 文档化 Webhook 负载模式
  • 实现 Webhook 管理 UI
  • 允许按事件类型过滤
  • 支持 Webhook 版本控制

❌ 不要做

  • 在 Webhooks 中发送敏感数据
  • 跳过签名验证
  • 用重处理阻塞响应
  • 无限期重试
  • 暴露内部错误细节
  • 在生产中发送到本地主机的 Webhooks
  • 忘记超时处理
  • 跳过速率限制

安全清单

  • [ ] 使用 HMAC 验证签名
  • [ ] 检查时间戳以防止重放攻击
  • [ ] 验证 SSL 证书
  • [ ] 仅使用 HTTPS
  • [ ] 实施速率限制
  • [ ] 验证 Webhook URL
  • [ ] 定期轮换密钥
  • [ ] 记录安全事件
  • [ ] 实施 IP 白名单(可选)
  • [ ] 清理错误消息

资源