Webhook开发 webhook-development

构建可靠的 Webhook 系统,实现事件驱动集成、签名验证、重试逻辑和死信处理。适用于实时通知、事件驱动架构、第三方平台集成等场景。

后端开发 0 次安装 0 次浏览 更新于 3/4/2026

Webhook 开发

概览

构建可靠的 Webhook 系统,实现事件传递、签名验证、重试逻辑和死信处理,用于异步集成。

何时使用

  • 发送实时通知到外部系统
  • 实施事件驱动架构
  • 与第三方平台集成
  • 构建审计跟踪和日志系统
  • 触发自动化工作流
  • 发送支付或订单通知

指南

1. Webhook 事件模式

{
  "id": "evt_1234567890",
  "timestamp": "2025-01-15T10:30:00Z",
  "event": "order.created",
  "version": "1.0",
  "data": {
    "orderId": "ORD-123456",
    "customerId": "CUST-789",
    "amount": 99.99,
    "currency": "USD",
    "items": [
      {
        "productId": "PROD-001",
        "quantity": 2,
        "price": 49.99
      }
    ],
    "status": "pending"
  },
  "attempt": 1,
  "retryable": true
}

2. Node.js Webhook 服务

const express = require('express');
const crypto = require('crypto');
const axios = require('axios');
const Bull = require('bull');

// Express application; JSON request bodies are parsed into req.body.
const app = express();
app.use(express.json());

// Read from the environment; not referenced elsewhere in the visible code.
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET;

// Bull queue named "webhooks", connected to a Redis instance on localhost.
const redisConnection = { host: 'localhost', port: 6379 };
const webhookQueue = new Bull('webhooks', { redis: redisConnection });

// Register a webhook subscription.
//
// Body: { url, events, secret? }. `url` must be an absolute http(s) URL and
// `events` a non-empty list of event-type strings (a single string is accepted
// and stored as a one-element list). When no usable secret is supplied, a
// random 32-byte hex secret is generated.
// Responds 201 with { id, secret, message }, or 400 on invalid input.
app.post('/api/webhooks/subscribe', async (req, res) => {
  const { url, events, secret } = req.body ?? {};

  let target;
  try {
    target = new URL(url);
  } catch {
    return res.status(400).json({ error: '无效的 URL' });
  }
  // new URL() also accepts schemes such as file: or javascript:, which can
  // never be delivered to with an HTTP POST.
  if (target.protocol !== 'http:' && target.protocol !== 'https:') {
    return res.status(400).json({ error: '无效的 URL' });
  }

  const eventList = typeof events === 'string' ? [events] : events;
  const hasValidEvents =
    Array.isArray(eventList) &&
    eventList.length > 0 &&
    eventList.every((name) => typeof name === 'string' && name.length > 0);
  if (!hasValidEvents) {
    return res.status(400).json({ error: '无效的事件列表' });
  }

  // The secret is later used as an HMAC key, so only a non-empty string is usable.
  const hasUsableSecret = typeof secret === 'string' && secret.length > 0;

  const webhook = {
    id: crypto.randomBytes(16).toString('hex'),
    url,
    events: eventList,
    secret: hasUsableSecret ? secret : crypto.randomBytes(32).toString('hex'),
    active: true,
    createdAt: new Date(),
    failureCount: 0
  };

  // Persist the subscription.
  await WebhookSubscription.create(webhook);

  res.status(201).json({
    id: webhook.id,
    secret: webhook.secret,
    message: 'Webhook 注册成功'
  });
});

// Queue one webhook delivery per active subscription to `eventType`.
//
// @param {string} eventType - Event name, e.g. 'order.created'.
// @param {object} data - Event payload placed under `data` in the envelope.
// @returns {Promise<void>} Resolves once every delivery job has been queued.
const sendWebhookEvent = async (eventType, data) => {
  const webhooks = await WebhookSubscription.find({
    events: eventType,
    active: true
  });

  // One logical event gets exactly one ID and timestamp, shared by all
  // subscribers, so receivers can de-duplicate and deliveries can be correlated.
  // A random UUID is used because Date.now() repeats within a millisecond.
  const event = {
    id: `evt_${crypto.randomUUID()}`,
    timestamp: new Date().toISOString(),
    event: eventType,
    version: '1.0',
    data,
    attempt: 1,
    retryable: true
  };

  for (const webhook of webhooks) {
    // Up to 5 attempts with exponential backoff starting at 2 seconds.
    await webhookQueue.add(
      { webhook, event },
      {
        attempts: 5,
        backoff: {
          type: 'exponential',
          delay: 2000
        },
        removeOnComplete: true
      }
    );
  }
};

// Queue processor: deliver one event to one subscriber.
//
// job.data is { webhook, event } as queued by the producers in this file.
// Throwing asks Bull to retry the job; returning normally ends the job.
webhookQueue.process(async (job) => {
  const { webhook, event } = job.data;

  // Bull increments attemptsMade only after an attempt has failed, so while
  // the processor runs it holds the number of PREVIOUS failures.
  const attempt = job.attemptsMade + 1;
  // Jobs queued without an explicit `attempts` option get a single attempt.
  const maxAttempts = job.opts?.attempts ?? 1;
  // Report the real attempt number in the signed payload and in the header.
  const payload = { ...event, attempt };

  try {
    const signature = generateSignature(payload, webhook.secret);

    const response = await axios.post(webhook.url, payload, {
      headers: {
        'Content-Type': 'application/json',
        'X-Webhook-Signature': signature,
        'X-Webhook-ID': event.id,
        'X-Webhook-Attempt': String(attempt)
      },
      timeout: 10000
    });

    if (response.status >= 200 && response.status < 300) {
      // Delivered: record the successful delivery.
      await WebhookDelivery.create({
        webhookId: webhook.id,
        eventId: event.id,
        status: 'delivered',
        statusCode: response.status,
        deliveredAt: new Date()
      });
      return;
    }

    throw new Error(`HTTP ${response.status}`);
  } catch (error) {
    if (attempt < maxAttempts) {
      throw error; // attempts remain: let Bull schedule the retry
    }

    // Final attempt failed: park the event in the dead-letter store.
    await DeadLetterQueue.create({
      webhookId: webhook.id,
      eventId: event.id,
      event,
      error: error.message,
      failedAt: new Date()
    });

    // Record the failure so the 'failed' count in the metrics endpoint is populated.
    await WebhookDelivery.create({
      webhookId: webhook.id,
      eventId: event.id,
      status: 'failed',
      statusCode: error.response?.status
    });

    // job.data went through the queue as JSON, so `webhook` is a plain object
    // without save(); reload the stored subscription before updating it.
    const subscription = await WebhookSubscription.findOne({ id: webhook.id });
    if (subscription) {
      subscription.failureCount = (subscription.failureCount ?? 0) + 1;
      // Deactivate subscriptions that keep failing.
      if (subscription.failureCount >= 10) {
        subscription.active = false;
      }
      await subscription.save();
    }
  }
});

// Webhook receiving endpoint.
//
// Looks up the subscription named in the URL, recomputes the HMAC of the
// parsed JSON body with that subscription's secret, and compares it with the
// X-Webhook-Signature header.
// Responds 404 (unknown subscription), 401 (missing or wrong signature),
// 200 ({ received: true }) or 500 (unexpected error).
app.post('/webhooks/:id', async (req, res) => {
  const signature = req.headers['x-webhook-signature'];
  const webhookId = req.params.id;
  const event = req.body;

  try {
    const webhook = await WebhookSubscription.findOne({ id: webhookId });
    if (!webhook) {
      return res.status(404).json({ error: 'Webhook 未找到' });
    }

    // Verify the signature in constant time so response timing does not leak
    // how many leading characters of a guess were correct.
    const expectedSignature = generateSignature(event, webhook.secret);
    const provided = Buffer.from(typeof signature === 'string' ? signature : '', 'utf8');
    const expected = Buffer.from(expectedSignature, 'utf8');
    // timingSafeEqual throws on unequal lengths, so check the length first.
    const isValidSignature =
      provided.length === expected.length && crypto.timingSafeEqual(provided, expected);
    if (!isValidSignature) {
      return res.status(401).json({ error: '无效的签名' });
    }

    // Handle the event.
    console.log('接收到 Webhook 事件:', event);

    res.status(200).json({ received: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Compute the hex-encoded HMAC-SHA256 of JSON.stringify(payload), keyed with `secret`.
//
// @param {*} payload - Value to sign; serialized with JSON.stringify.
// @param {string} secret - HMAC key.
// @returns {string} Lowercase hex digest.
const generateSignature = (payload, secret) => {
  const hmac = crypto.createHmac('sha256', secret);
  hmac.update(JSON.stringify(payload));
  return hmac.digest('hex');
};

// List all webhook subscriptions.
app.get('/api/webhooks', async (req, res) => {
  // Passed as the second argument to find(); presumably a projection that
  // leaves the signing secret out of the response — confirm against the model.
  const withoutSecret = { secret: 0 };
  const subscriptions = await WebhookSubscription.find({}, withoutSecret);
  res.json(subscriptions);
});

// Queue a synthetic 'webhook.test' event for one subscription.
// Responds 404 when the subscription does not exist.
app.post('/api/webhooks/:id/test', async (req, res) => {
  const webhook = await WebhookSubscription.findOne({ id: req.params.id });
  // Without this guard a null subscription would be queued and the delivery
  // job would fail when it reads the subscription's secret.
  if (!webhook) {
    return res.status(404).json({ error: 'Webhook 未找到' });
  }

  const testEvent = {
    id: `evt_test_${Date.now()}`,
    timestamp: new Date().toISOString(),
    event: 'webhook.test',
    data: { message: '测试事件' }
  };

  await webhookQueue.add({ webhook, event: testEvent });

  res.json({ message: '测试事件排队' });
});

// Re-queue the event of an existing delivery record.
// Responds 404 when the delivery, its subscription, or its stored event is missing.
app.post('/api/webhooks/deliveries/:id/retry', async (req, res) => {
  const delivery = await WebhookDelivery.findOne({ _id: req.params.id });
  if (!delivery) {
    return res.status(404).json({ error: '传递未找到' });
  }

  // The two lookups are independent, so run them in parallel.
  const [webhook, event] = await Promise.all([
    WebhookSubscription.findOne({ id: delivery.webhookId }),
    Event.findOne({ id: delivery.eventId })
  ]);

  // Queuing a job with a missing subscription or event would only fail later
  // inside the queue processor; reject it here instead.
  if (!webhook) {
    return res.status(404).json({ error: 'Webhook 未找到' });
  }
  if (!event) {
    return res.status(404).json({ error: '事件未找到' });
  }

  await webhookQueue.add({ webhook, event });

  res.json({ message: '重试排队' });
});

// List delivery records for one subscription, capped at 100 entries.
app.get('/api/webhooks/:id/deliveries', async (req, res) => {
  const { id: webhookId } = req.params;
  const query = WebhookDelivery.find({ webhookId });
  const deliveries = await query.limit(100);

  res.json(deliveries);
});

// Example event trigger: create an order, then queue an 'order.created' webhook event.
// Responds 201 with the created order.
app.post('/api/orders', async (req, res) => {
  // NOTE(review): req.body is passed to Order.create unfiltered — confirm the
  // model restricts which fields a client may set.
  const order = await Order.create(req.body);

  // The order is already persisted at this point. A failure to queue
  // notifications must not fail the request, otherwise the client may retry
  // and create a duplicate order.
  try {
    await sendWebhookEvent('order.created', {
      orderId: order.id,
      customerId: order.customerId,
      amount: order.amount,
      status: order.status
    });
  } catch (error) {
    console.error('Webhook 事件入队失败:', error);
  }

  res.status(201).json(order);
});

// Start the HTTP server on port 3000; the callback logs a message once it is listening.
app.listen(3000, () => console.log('服务器在 3000 端口'));

3. Python Webhook 处理器

import hashlib
import hmac
import json
import os
import secrets
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse

import requests
from celery import Celery
from flask import Flask, request, jsonify
from sqlalchemy import Column, String, Boolean, DateTime, Integer

# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)
# Celery instance named after the Flask app, using a Redis URL on localhost as the broker.
celery = Celery(app.name, broker='redis://localhost:6379')

class WebhookSubscription:
    """Column definitions for a stored webhook subscription.

    NOTE(review): as written this is a plain class with no mapped base class,
    while other code in this file constructs it with keyword arguments and
    uses ``WebhookSubscription.query``. It presumably needs to derive from the
    project's ORM model base (not visible here) — confirm.
    """
    # Subscription identifier (primary key).
    id = Column(String(100), primary_key=True)
    # Target URL that events are POSTed to.
    url = Column(String(500))
    # Subscribed event types; the subscribe handler stores them comma-joined.
    events = Column(String(500))
    # Key used to compute the HMAC signature of each delivery.
    secret = Column(String(256))
    # Order creation only queues deliveries for subscriptions with active == True.
    active = Column(Boolean, default=True)
    # Failure counter; defaults to 0.
    failure_count = Column(Integer, default=0)
    # Creation time; defaults to datetime.utcnow (naive UTC).
    created_at = Column(DateTime, default=datetime.utcnow)

def generate_signature(payload, secret):
    """Return the hex HMAC-SHA256 of ``payload`` keyed with ``secret``.

    The payload is serialized with ``json.dumps(..., sort_keys=True)``, so the
    result does not depend on dictionary key order.

    Args:
        payload: JSON-serializable value to sign.
        secret: HMAC key as a string.

    Returns:
        The digest as a lowercase hexadecimal string.
    """
    canonical = json.dumps(payload, sort_keys=True).encode()
    mac = hmac.new(secret.encode(), canonical, hashlib.sha256)
    return mac.hexdigest()

@app.route('/api/webhooks/subscribe', methods=['POST'])
def subscribe_webhook():
    """Register a webhook subscription.

    Expects a JSON object with ``url`` (absolute http/https URL), ``events``
    (non-empty list of event names; a single string is also accepted) and an
    optional ``secret``. A random 32-byte hex secret is generated when no
    secret is supplied.

    Returns:
        201 with ``id``, ``secret`` and ``message``; 400 on invalid input.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '无效的请求体'}), 400

    url = data.get('url')
    try:
        parsed = urlparse(url) if isinstance(url, str) else None
    except ValueError:
        parsed = None
    # Only http(s) targets with a host can be delivered to.
    if parsed is None or parsed.scheme not in ('http', 'https') or not parsed.netloc:
        return jsonify({'error': '无效的 URL'}), 400

    events = data.get('events', [])
    if isinstance(events, str):
        events = [events]
    # Events are stored comma-joined, so a name containing ',' would be split
    # into different names when read back.
    events_valid = (
        isinstance(events, list)
        and len(events) > 0
        and all(isinstance(name, str) and name and ',' not in name for name in events)
    )
    if not events_valid:
        return jsonify({'error': '无效的事件列表'}), 400

    # `or` also covers an explicit null/empty secret, which dict.get's default does not.
    secret = data.get('secret') or secrets.token_hex(32)

    webhook = WebhookSubscription(
        id=f"wh_{secrets.token_hex(8)}",
        url=url,
        events=','.join(events),
        secret=secret,
        active=True
    )

    db.session.add(webhook)
    db.session.commit()

    return jsonify({
        'id': webhook.id,
        'secret': webhook.secret,
        'message': 'Webhook 注册'
    }), 201

@celery.task(bind=True, max_retries=5)
def deliver_webhook(self, webhook_id, event):
    """Deliver ``event`` to the subscription ``webhook_id`` via HTTP POST.

    Failed attempts are retried with exponential backoff (1s, 2s, 4s, ...)
    up to ``max_retries``. When retries are exhausted the subscription's
    failure counter is incremented (and the subscription deactivated after
    10 failures) before the error is re-raised.

    Args:
        webhook_id: Primary key of the target subscription.
        event: Event envelope as a JSON-serializable dict.
    """
    webhook = WebhookSubscription.query.get(webhook_id)
    if not webhook:
        return

    # self.request.retries counts previous retries, so the first run is attempt 1.
    attempt = self.request.retries + 1
    # Send (and sign) the real attempt number instead of a constant 1.
    payload = {**event, 'attempt': attempt}
    signature = generate_signature(payload, webhook.secret)

    try:
        response = requests.post(
            webhook.url,
            json=payload,
            headers={
                'Content-Type': 'application/json',
                'X-Webhook-Signature': signature,
                'X-Webhook-ID': event['id'],
                'X-Webhook-Attempt': str(attempt)
            },
            timeout=10
        )

        if not 200 <= response.status_code < 300:
            raise Exception(f"HTTP {response.status_code}")

    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Out of retries: record the failure on the subscription so that
            # endpoints which keep failing eventually stop receiving events.
            webhook.failure_count = (webhook.failure_count or 0) + 1
            if webhook.failure_count >= 10:
                webhook.active = False
            db.session.commit()
            raise
        retry_delay = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=retry_delay)

    # Recorded outside the try block: an error while saving the record must
    # not trigger a retry, which would POST the same event a second time.
    WebhookDelivery.create(
        webhook_id=webhook_id,
        event_id=event['id'],
        status='delivered',
        status_code=response.status_code
    )

@app.route('/webhooks/<webhook_id>', methods=['POST'])
def receive_webhook(webhook_id):
    """Receive a webhook and verify its HMAC signature.

    Returns:
        404 if the subscription is unknown, 401 if the ``X-Webhook-Signature``
        header is missing or does not match, otherwise 200 with
        ``{'received': True}``.
    """
    # A missing header becomes '' so it fails the comparison instead of raising.
    signature = request.headers.get('X-Webhook-Signature', '')
    event = request.get_json()

    webhook = WebhookSubscription.query.get(webhook_id)
    if not webhook:
        return jsonify({'error': '未找到'}), 404

    expected_signature = generate_signature(event, webhook.secret)
    # Constant-time comparison; bytes are compared because compare_digest
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(signature.encode(), expected_signature.encode()):
        return jsonify({'error': '无效签名'}), 401

    return jsonify({'received': True}), 200

@app.route('/api/orders', methods=['POST'])
def create_order():
    """Create an order and queue an ``order.created`` webhook for each subscriber.

    Returns:
        201 with the created order as JSON.
    """
    order = Order.create(request.get_json())
    order_data = order.to_dict()

    event_type = 'order.created'
    event = {
        # Random ID: a timestamp-based ID repeats for orders created at the same instant.
        'id': f"evt_{secrets.token_hex(16)}",
        # Timezone-aware UTC timestamp with a 'Z' suffix, as in the documented event schema.
        'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        'event': event_type,
        'data': order_data
    }

    candidates = WebhookSubscription.query.filter(
        WebhookSubscription.events.contains(event_type),
        WebhookSubscription.active == True  # noqa: E712 - builds a SQL expression
    ).all()

    for webhook in candidates:
        # contains() is a substring match on the comma-joined column and also
        # selects longer names that merely include this one; keep exact matches only.
        if event_type in (webhook.events or '').split(','):
            deliver_webhook.delay(webhook.id, event)

    return jsonify(order_data), 201

# Run Flask's built-in server on port 3000 (debug off) when this file is executed directly.
if __name__ == '__main__':
    app.run(debug=False, port=3000)

4. 最佳实践

✅ 应该:
- 使用 HMAC 签名所有 Webhook
- 实施指数退避重试
- 使用消息队列确保可靠传递
- 跟踪 Webhook 传递以便于调试
- 提供 Webhook 测试端点
- 为支持的事件类型编写文档
- 使用唯一事件 ID 以实现去重
- 设置适当的超时(10s)
- 实施死信队列
- 快速返回 2xx,异步处理

❌ 不应该:
- 不加密发送敏感数据
- 使用弱签名
- 在请求处理流程中同步发送 Webhook
- 忽略签名验证
- 公开 Webhook URL
- 无限重试
- 记录带有密钥的 Webhook 负载
- 跳过 Webhook 认证
- 忘记处理幂等性
- 发送重复事件

5. Webhook 事件

标准事件类型:
- order.created
- order.updated
- order.cancelled
- payment.succeeded
- payment.failed
- user.registered
- user.updated
- invoice.issued
- shipment.created
- refund.processed

监控

// Delivery metrics: total, delivered and failed counts, success rate (a
// percentage string with two decimals) and the average of the `latency` field.
app.get('/api/webhooks/metrics', async (req, res) => {
  // The four queries are independent, so run them in parallel.
  const [total, delivered, failed, latencyStats] = await Promise.all([
    WebhookDelivery.countDocuments(),
    WebhookDelivery.countDocuments({ status: 'delivered' }),
    WebhookDelivery.countDocuments({ status: 'failed' }),
    WebhookDelivery.aggregate([
      { $group: { _id: null, avg: { $avg: '$latency' } } }
    ])
  ]);

  // With no deliveries yet, delivered / total is 0 / 0, which would render as "NaN".
  const successRate = total > 0 ? (delivered / total * 100).toFixed(2) : '0.00';

  res.json({
    total,
    delivered,
    failed,
    successRate,
    averageLatency: latencyStats[0]?.avg || 0
  });
});