Webhook Integration Patterns skill: webhook-integration-patterns

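This skill covers designing and implementing reliable webhook systems, both as a provider (sending webhooks) and as a consumer (receiving them). It covers delivery guarantees, security verification, retry logic, and idempotent processing for event-driven integrations. Keywords: webhook integration, reliable systems, event-driven, signature verification, retry strategies, security patterns, architecture design.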

Architecture Design · Updated 3/7/2026

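name: webhook-integration-patterns
description: Design reliable webhook systems with appropriate delivery guarantees, retry logic, signature verification, and idempotent processing for event-driven integrations.
license: MIT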

Webhook Integration Patterns

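This skill provides guidance for designing and implementing robust webhook systems, both as a provider (sending webhooks) and as a consumer (receiving webhooks).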

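Core Capabilities

  • Delivery guarantees: at-least-once delivery, with effectively-once processing achieved through idempotency
  • Security: signature verification, secret rotation
  • Reliability: retry strategies, dead-letter handling
  • Scalability: queue-based processing, rate limiting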

Webhook Fundamentals

The Problem Webhooks Solve

Polling (inefficient):          Webhooks (efficient):
┌────────┐     ┌────────┐       ┌────────┐     ┌────────┐
│ Client │     │ Server │       │ Client │     │ Server │
└───┬────┘     └───┬────┘       └───┬────┘     └───┬────┘
    │ Any news?    │                │              │
    │─────────────▶│                │              │
    │   No         │                │   Event!     │
    │◀─────────────│                │◀─────────────│
    │ Any news?    │                │ POST /hook   │
    │─────────────▶│                │◀─────────────│
    │   No         │                │   200 OK     │
    │◀─────────────│                │─────────────▶│
    │ Any news?    │
    │─────────────▶│       Push-based notification instead of polling
    │   Yes!       │
    │◀─────────────│

Webhook Anatomy

POST /webhooks/payment HTTP/1.1
Host: your-app.com
Content-Type: application/json
X-Webhook-Signature: sha256=abc123...
X-Webhook-ID: evt_12345
X-Webhook-Timestamp: 1706616000

{
  "id": "evt_12345",
  "type": "payment.completed",
  "created_at": "2024-01-30T12:00:00Z",
  "data": {
    "payment_id": "pay_abc",
    "amount": 1000,
    "currency": "USD"
  }
}

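Key headers:

  • Signature: cryptographic proof of authenticity (an HMAC over the timestamp and the body)
  • ID: unique event identifier, used for deduplication
  • Timestamp: when the request was signed, used to reject replayed requests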
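The sketch below shows how a consumer might pull these three headers out of a request before doing anything else. It assumes the header names from the example request (`X-Webhook-Signature`, `X-Webhook-ID`, `X-Webhook-Timestamp`); real providers use their own names, and `WebhookHeaders` and `parse_webhook_headers` are hypothetical helpers.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WebhookHeaders:
    signature: str
    event_id: str
    timestamp: int


def parse_webhook_headers(headers: Mapping[str, str]) -> WebhookHeaders:
    """Extract the key webhook headers, rejecting requests that lack any of them."""
    # HTTP header names are case-insensitive, so normalize before lookup
    normalized = {name.lower(): value for name, value in headers.items()}
    try:
        signature = normalized["x-webhook-signature"]
        event_id = normalized["x-webhook-id"]
        raw_ts = normalized["x-webhook-timestamp"]
    except KeyError as missing:
        raise ValueError(f"missing header: {missing.args[0]}") from None
    if not signature.startswith("sha256="):
        raise ValueError("unsupported signature scheme")
    # int() raises ValueError if the timestamp is not numeric
    return WebhookHeaders(signature=signature, event_id=event_id, timestamp=int(raw_ts))
```

Rejecting a request with missing headers up front keeps the later signature check simple: it never has to deal with `None` values.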

Webhook Provider Design

Event Schema Design

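import uuid
from datetime import datetime, timezone

class WebhookEvent:
    """Standard webhook event structure"""

    def __init__(self, event_type, data, idempotency_key=None):
        self.id = self._generate_id()
        self.type = event_type
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated as of Python 3.12)
        self.created_at = datetime.now(timezone.utc).isoformat()
        self.api_version = "2024-01-30"
        self.data = data
        self.idempotency_key = idempotency_key or self.id

    @staticmethod
    def _generate_id():
        # Unique, opaque event ID that consumers use for deduplication
        return f"evt_{uuid.uuid4().hex}"

    def to_payload(self):
        return {
            "id": self.id,
            "type": self.type,
            "created_at": self.created_at,
            "api_version": self.api_version,
            "data": self.data
        }

    @classmethod
    def from_payload(cls, payload):
        """Rebuild an event from a stored payload (e.g. when retrying from the DLQ), keeping its original ID"""
        event = cls.__new__(cls)
        event.id = payload["id"]
        event.type = payload["type"]
        event.created_at = payload["created_at"]
        event.api_version = payload["api_version"]
        event.data = payload["data"]
        event.idempotency_key = payload["id"]
        return event

# Event types follow the resource.action pattern
EVENT_TYPES = [
    "payment.created",
    "payment.completed",
    "payment.failed",
    "subscription.created",
    "subscription.updated",
    "subscription.cancelled",
    "customer.created",
    "customer.deleted"
]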
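The `resource.action` convention is easy to enforce at publish time. A minimal sketch, assuming event types are lowercase segments (letters and underscores) joined by exactly one dot; the regex, `KNOWN_EVENT_TYPES`, and the helpers are illustrative, not part of the original design.

```python
import re

# One lowercase resource segment, a dot, one lowercase action segment
EVENT_TYPE_PATTERN = re.compile(r"[a-z][a-z_]*\.[a-z][a-z_]*")

KNOWN_EVENT_TYPES = {
    "payment.created", "payment.completed", "payment.failed",
    "subscription.created", "subscription.updated", "subscription.cancelled",
    "customer.created", "customer.deleted",
}


def validate_event_type(event_type: str) -> str:
    """Reject malformed or unregistered event types before an event is emitted."""
    if not EVENT_TYPE_PATTERN.fullmatch(event_type):
        raise ValueError(f"event type must follow resource.action: {event_type!r}")
    if event_type not in KNOWN_EVENT_TYPES:
        raise ValueError(f"unregistered event type: {event_type!r}")
    return event_type


def resource_of(event_type: str) -> str:
    """Return the resource half of a resource.action event type."""
    return validate_event_type(event_type).split(".", 1)[0]
```

Validating against a registry as well as the pattern catches typos such as `payment.complete` that would otherwise silently reach no subscribers.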

Signature Generation

import hashlib
import hmac
import json
import time

class WebhookSigner:
    """Signs webhook payloads so that consumers can verify them"""

    def __init__(self, secret):  # allow-secret
        self.secret = secret.encode()  # allow-secret

    def sign(self, payload, timestamp=None):
        """Generate an HMAC-SHA256 signature"""
        timestamp = timestamp or int(time.time())
        # Compact serialization; the request body sent must be exactly this string
        payload_str = json.dumps(payload, separators=(',', ':'))

        # Include the timestamp in the signed string to prevent replay attacks
        signed_payload = f"{timestamp}.{payload_str}"

        signature = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256
        ).hexdigest()

        return {
            'signature': f"sha256={signature}",
            'timestamp': timestamp
        }

    def create_headers(self, payload):
        """Build all webhook headers"""
        sign_data = self.sign(payload)
        return {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': sign_data['signature'],
            'X-Webhook-Timestamp': str(sign_data['timestamp']),
            'X-Webhook-ID': payload['id']
        }
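The signature covers an exact byte string, so the bytes signed and the bytes sent must be identical. The sketch below reproduces the signing scheme above (`{timestamp}.{compact JSON}`, HMAC-SHA256) and shows that serializing the same payload with Python's default `json.dumps` separators yields a different signature. The secret and payload are made-up values.

```python
import hashlib
import hmac
import json


def sign_body(secret: bytes, timestamp: int, body: str) -> str:
    """HMAC-SHA256 over '{timestamp}.{body}', formatted like the X-Webhook-Signature header."""
    digest = hmac.new(secret, f"{timestamp}.{body}".encode(), hashlib.sha256).hexdigest()
    return f"sha256={digest}"


secret = b"example-secret"  # illustrative only
payload = {"id": "evt_12345", "type": "payment.completed", "data": {"amount": 1000}}
timestamp = 1706616000

compact = json.dumps(payload, separators=(",", ":"))  # what the signer signs
default = json.dumps(payload)                         # ', ' and ': ' separators

sig_compact = sign_body(secret, timestamp, compact)
sig_default = sign_body(secret, timestamp, default)

# Same data, different bytes, therefore a different signature
print(compact == default, sig_compact == sig_default)  # False False
```

This is why a provider should send the exact serialized string it signed, rather than handing the dict to an HTTP client that re-encodes it, and why a consumer verifies the raw request body instead of re-dumped JSON.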

Delivery System

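import asyncio
import json
from datetime import datetime, timedelta, timezone

class WebhookDeliverySystem:
    """Reliable webhook delivery with retries"""

    RETRY_SCHEDULE = [
        timedelta(seconds=10),
        timedelta(minutes=1),
        timedelta(minutes=5),
        timedelta(minutes=30),
        timedelta(hours=1),
        timedelta(hours=6),
        timedelta(hours=24)
    ]

    # 4xx responses still worth retrying; other 4xx codes are treated as permanent
    RETRYABLE_4XX = {408, 429}

    def __init__(self, signer, http_client):
        self.signer = signer
        # Assumed httpx.AsyncClient-style client: post(url, content=..., headers=..., timeout=...)
        self.http = http_client
        self.delivery_log = []

    async def deliver(self, endpoint, event, attempt=0):
        """Attempt a single webhook delivery"""
        payload = event.to_payload()
        headers = self.signer.create_headers(payload)
        # Send exactly the string that was signed (same compact serialization as WebhookSigner);
        # json=payload would let the client re-serialize it and break the signature
        body = json.dumps(payload, separators=(',', ':'))

        try:
            response = await self.http.post(
                endpoint.url,
                content=body,
                headers=headers,
                timeout=30
            )

            self._log_attempt(endpoint, event, attempt, response=response)

            if response.status_code in (200, 201, 202, 204):
                return {'status': 'delivered', 'attempts': attempt + 1}

            status = response.status_code
            if 400 <= status < 500 and status not in self.RETRYABLE_4XX:
                # Permanent client error: resending the same request will not help
                self._move_to_dead_letter(endpoint, event)
                return {'status': 'failed', 'attempts': attempt + 1}

            # Retryable failure (429, 5xx, other unexpected codes): schedule a retry
            return await self._schedule_retry(endpoint, event, attempt)

        except Exception as e:
            # Connection errors and timeouts are retryable
            self._log_attempt(endpoint, event, attempt, error=e)
            return await self._schedule_retry(endpoint, event, attempt)

    async def _schedule_retry(self, endpoint, event, attempt):
        """Schedule the next retry, or give up once the schedule is exhausted"""
        if attempt >= len(self.RETRY_SCHEDULE):
            self._move_to_dead_letter(endpoint, event)
            return {'status': 'failed', 'attempts': attempt + 1}

        delay = self.RETRY_SCHEDULE[attempt]
        # In production, use a job queue with delayed execution instead of sleeping in-process
        await asyncio.sleep(delay.total_seconds())
        return await self.deliver(endpoint, event, attempt + 1)

    def _log_attempt(self, endpoint, event, attempt, response=None, error=None):
        """Record every attempt, for debugging and for the provider's delivery dashboard"""
        self.delivery_log.append({
            'endpoint_url': endpoint.url,
            'event_id': event.id,
            'attempt': attempt + 1,
            'status_code': response.status_code if response is not None else None,
            'error': repr(error) if error is not None else None,
            'at': datetime.now(timezone.utc).isoformat()
        })

    def _move_to_dead_letter(self, endpoint, event):
        """Store the failed webhook for manual review"""
        # Write to a dead-letter queue/table
        pass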
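The fixed schedule above spans about 31.6 hours in total (10 s + 1 min + 5 min + 30 min + 1 h + 6 h + 24 h = 113,770 s) before an event is dead-lettered. Where a formula is preferred to a table, capped exponential backoff with full jitter is a common alternative: randomizing each delay keeps many failed deliveries from hitting a recovering endpoint at the same moment. A sketch, with `base` and `cap` as assumed parameters:

```python
import random
from datetime import timedelta
from typing import Optional

# Same fixed schedule as WebhookDeliverySystem.RETRY_SCHEDULE
RETRY_SCHEDULE = [
    timedelta(seconds=10), timedelta(minutes=1), timedelta(minutes=5),
    timedelta(minutes=30), timedelta(hours=1), timedelta(hours=6),
    timedelta(hours=24),
]


def total_retry_window(schedule) -> timedelta:
    """Sum of all retry delays, i.e. how long before an event is dead-lettered
    (ignoring the time each request itself takes)."""
    return sum(schedule, timedelta())


def backoff_delay(attempt: int, base: float = 10.0, cap: float = 86400.0,
                  rng: Optional[random.Random] = None) -> float:
    """Capped exponential backoff with full jitter, in seconds.

    The delay is drawn uniformly from [0, min(cap, base * 2**attempt)].
    """
    rng = rng or random.Random()
    # Clamp the exponent so very large attempt numbers cannot overflow the float conversion
    ceiling = min(cap, base * 2 ** min(attempt, 32))
    return rng.uniform(0, ceiling)


print(total_retry_window(RETRY_SCHEDULE))  # 1 day, 7:36:10
```

Full jitter can occasionally produce a very short delay; if a minimum gap between attempts matters, drawing from `[ceiling / 2, ceiling]` instead trades some spread for a guaranteed floor.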

Endpoint Management

import secrets
import uuid
from datetime import datetime, timezone

class WebhookEndpoint:
    """Subscriber endpoint configuration"""

    def __init__(self, url, events, secret=None):  # allow-secret
        self.id = uuid.uuid4().hex
        self.url = url
        self.events = events  # list of subscribed event types; '*' subscribes to all
        self.secret = secret or secrets.token_urlsafe(32)  # allow-secret
        self.status = 'active'
        self.created_at = datetime.now(timezone.utc)

        # Health tracking
        self.consecutive_failures = 0
        self.last_success = None
        self.last_failure = None

    def should_receive(self, event_type):
        """Check whether this endpoint subscribes to the event type"""
        if '*' in self.events:
            return True
        return event_type in self.events

    def record_success(self):
        self.consecutive_failures = 0
        self.last_success = datetime.now(timezone.utc)
        if self.status == 'disabled':
            self.status = 'active'

    def record_failure(self):
        self.consecutive_failures += 1
        self.last_failure = datetime.now(timezone.utc)

        # Auto-disable after too many consecutive failures
        if self.consecutive_failures >= 10:
            self.status = 'disabled'
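When an event is published, the provider fans it out to every active endpoint that subscribes to its type. A minimal sketch of that selection step, using a stripped-down stand-in for `WebhookEndpoint` (the `Endpoint` dataclass and `endpoints_for` helper are illustrative):

```python
from dataclasses import dataclass, field


@dataclass
class Endpoint:
    url: str
    events: list = field(default_factory=list)
    status: str = "active"

    def should_receive(self, event_type: str) -> bool:
        # Same rule as WebhookEndpoint.should_receive: '*' subscribes to everything
        return "*" in self.events or event_type in self.events


def endpoints_for(event_type: str, endpoints: list) -> list:
    """Active endpoints subscribed to event_type; disabled endpoints are skipped."""
    return [ep for ep in endpoints if ep.status == "active" and ep.should_receive(event_type)]
```

Each selected endpoint then gets its own delivery (and its own retry state), so one slow subscriber does not hold up the others.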

Webhook Consumer Design

Signature Verification

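import hashlib
import hmac
import os
import time

from flask import Flask, request


class WebhookVerificationError(Exception):
    """Raised when an incoming webhook fails authenticity checks"""


class WebhookVerifier:
    """Verify incoming webhook signatures"""

    TIMESTAMP_TOLERANCE = 300  # 5 minutes

    def __init__(self, secret):  # allow-secret
        self.secret = secret.encode()  # allow-secret

    def verify(self, payload, signature, timestamp):
        """Verify webhook authenticity; payload is the raw request body as text"""
        if not signature or not timestamp:
            raise WebhookVerificationError("Missing signature or timestamp header")

        # Check timestamp freshness to reject replayed requests
        try:
            ts = int(timestamp)
        except ValueError:
            raise WebhookVerificationError("Malformed timestamp") from None
        if abs(int(time.time()) - ts) > self.TIMESTAMP_TOLERANCE:
            raise WebhookVerificationError("Timestamp outside the tolerance window")

        # Compute the expected signature over the body exactly as received
        signed_payload = f"{timestamp}.{payload}"
        expected = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256
        ).hexdigest()

        expected_sig = f"sha256={expected}"

        # Constant-time comparison to prevent timing attacks; compare bytes because
        # compare_digest raises TypeError on str arguments containing non-ASCII characters
        if not hmac.compare_digest(signature.encode(), expected_sig.encode()):
            raise WebhookVerificationError("Invalid signature")

        return True


# Flask endpoint example
app = Flask(__name__)
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]  # allow-secret
verifier = WebhookVerifier(WEBHOOK_SECRET)

@app.route('/webhooks/provider', methods=['POST'])
def handle_webhook():
    try:
        verifier.verify(
            request.get_data(as_text=True),  # raw body, not re-serialized JSON
            request.headers.get('X-Webhook-Signature'),
            request.headers.get('X-Webhook-Timestamp')
        )
    except WebhookVerificationError:
        return 'Invalid signature', 401

    event = request.get_json()
    process_webhook(event)  # application-specific handler

    return 'OK', 200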
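A quick way to check that provider and consumer agree is a round trip: sign a body the way `WebhookSigner` does, then verify it the way `WebhookVerifier` does. The sketch below re-implements both sides as two small functions (the names are illustrative) and takes `now` as a parameter so the freshness check can be tested without waiting.

```python
import hashlib
import hmac
import time
from typing import Optional

TOLERANCE_SECONDS = 300


def sign(secret: bytes, body: str, timestamp: int) -> str:
    """Provider side: HMAC-SHA256 over '{timestamp}.{body}'."""
    mac = hmac.new(secret, f"{timestamp}.{body}".encode(), hashlib.sha256)
    return f"sha256={mac.hexdigest()}"


def verify(secret: bytes, body: str, signature: str, timestamp: str,
           now: Optional[int] = None) -> bool:
    """Consumer side: True only for a fresh, correctly signed body."""
    now = int(time.time()) if now is None else now
    if abs(now - int(timestamp)) > TOLERANCE_SECONDS:
        return False  # stale or future-dated: possible replay
    expected = sign(secret, body, int(timestamp))
    return hmac.compare_digest(signature.encode(), expected.encode())


secret = b"test-secret"  # illustrative only
body = '{"id":"evt_12345","type":"payment.completed"}'
ts = 1706616000
sig = sign(secret, body, ts)
```

A tampered body, a wrong secret, or a timestamp more than five minutes old should all fail this check, which makes it a useful unit test for any real verifier.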

Idempotent Processing

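import json

class IdempotentWebhookProcessor:
    """Process webhooks idempotently, so each event takes effect once (effectively-once)"""

    def __init__(self, storage):
        self.storage = storage  # e.g. a redis.asyncio.Redis client, or a database
        self.lock_ttl = 300  # 5-minute lock; should exceed the longest expected handler run

    async def process(self, event):
        """Process a webhook idempotently"""
        event_id = event['id']
        processed_key = f"webhook:processed:{event_id}"

        # Skip events that were already processed
        if await self.storage.exists(processed_key):
            return {'status': 'duplicate', 'event_id': event_id}

        # Acquire a lock (SET with NX and an expiry) to prevent concurrent processing
        lock_key = f"webhook:lock:{event_id}"
        if not await self.storage.set(lock_key, "1", ex=self.lock_ttl, nx=True):
            return {'status': 'processing', 'event_id': event_id}

        try:
            # Re-check: another worker may have finished between the first check and the lock
            if await self.storage.exists(processed_key):
                return {'status': 'duplicate', 'event_id': event_id}

            # Handle the event
            result = await self._handle_event(event)

            # Mark as processed; the TTL must outlast the provider's retry window
            await self.storage.set(
                processed_key,
                json.dumps(result),
                ex=86400 * 7  # keep for 7 days
            )

            return {'status': 'processed', 'event_id': event_id}

        finally:
            await self.storage.delete(lock_key)

    async def _handle_event(self, event):
        """Route the event to the appropriate handler"""
        handlers = {
            'payment.completed': self._handle_payment_completed,  # application-specific
            'subscription.cancelled': self._handle_subscription_cancelled,
            # ... more handlers
        }
        handler = handlers.get(event['type'])
        if handler:
            return await handler(event['data'])
        return {'skipped': True, 'reason': 'unknown event type'}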
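To see the deduplication behave without Redis, the sketch below pairs a condensed version of the processing flow with a tiny in-memory store that mimics the subset of `redis.asyncio` calls it uses (`exists`, `set` with `ex`/`nx`, `delete`). TTLs are accepted but ignored; `InMemoryStore` and `process_once` are illustrative names.

```python
import json


class InMemoryStore:
    """Single-process stand-in for the Redis calls used above (TTLs are ignored)."""

    def __init__(self):
        self.data = {}

    async def exists(self, key):
        return int(key in self.data)

    async def set(self, key, value, ex=None, nx=False):
        if nx and key in self.data:
            return None  # Redis returns None when NX prevents the write
        self.data[key] = value
        return True

    async def delete(self, key):
        self.data.pop(key, None)


async def process_once(store, event, handler):
    """Run handler at most once per event ID (condensed IdempotentWebhookProcessor.process)."""
    done_key = f"webhook:processed:{event['id']}"
    lock_key = f"webhook:lock:{event['id']}"
    if await store.exists(done_key):
        return "duplicate"
    if not await store.set(lock_key, "1", ex=300, nx=True):
        return "processing"
    try:
        if await store.exists(done_key):  # re-check after taking the lock
            return "duplicate"
        result = await handler(event)
        await store.set(done_key, json.dumps(result), ex=7 * 86400)
        return "processed"
    finally:
        await store.delete(lock_key)
```

Processing the same event twice returns `"processed"` and then `"duplicate"`, with the handler running only once. A dict only works within one process; across workers, the shared store (Redis itself) is what makes this safe.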

Queue-Based Processing

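import logging

class QueuedWebhookHandler:
    """Decouple receiving webhooks from processing them"""

    def __init__(self, queue):
        # Redis, SQS, RabbitMQ, etc.; the enqueue/dequeue/ack/nack interface is illustrative
        self.queue = queue

    async def receive(self, event):
        """Acknowledge quickly; process asynchronously"""
        # Validate the event structure immediately (the signature is already verified)
        self._validate_event_structure(event)

        # Enqueue for processing, deduplicating on the event ID
        await self.queue.enqueue(
            'webhook_processing',
            event,
            deduplication_id=event['id']
        )

        # Return 200 right away so the sender is not kept waiting
        return {'status': 'accepted'}

    async def process_queue(self):
        """Worker loop that processes queued webhooks (assumes dequeue blocks until a message arrives)"""
        while True:
            event = await self.queue.dequeue('webhook_processing')
            if event:
                try:
                    await self._process_event(event)
                    await self.queue.ack(event)
                except Exception:
                    logging.exception("Webhook processing failed")
                    # Requeue for another attempt; cap redeliveries (or route to a DLQ)
                    # so a message that always fails cannot loop forever
                    await self.queue.nack(event, requeue=True)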
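The receive/worker split can be tried locally with `asyncio.Queue` standing in for Redis, SQS, or RabbitMQ. The sketch assumes a single process; the `seen` set stands in for the broker-side deduplication that `deduplication_id` requests, and `LocalWebhookQueue` is an illustrative name.

```python
import asyncio


class LocalWebhookQueue:
    """In-process stand-in for a message broker: fast receive, background processing."""

    def __init__(self, handler):
        self.queue: asyncio.Queue = asyncio.Queue()
        self.seen: set = set()    # event IDs already enqueued (deduplication)
        self.failed: list = []    # (event_id, error) pairs from failed handler runs
        self.handler = handler

    async def receive(self, event: dict) -> dict:
        # Minimal structural check, then enqueue and return immediately
        if "id" not in event or "type" not in event:
            raise ValueError("event must have 'id' and 'type'")
        if event["id"] in self.seen:
            return {"status": "duplicate"}
        self.seen.add(event["id"])
        await self.queue.put(event)
        return {"status": "accepted"}

    async def worker(self):
        # Runs until cancelled; task_done() lets queue.join() wait for the backlog
        while True:
            event = await self.queue.get()
            try:
                await self.handler(event)
            except Exception as exc:
                # Keep the failure; a real worker would log it and retry (capped) or dead-letter it
                self.failed.append((event["id"], repr(exc)))
            finally:
                self.queue.task_done()
```

The HTTP handler would call `receive` and return 200 as soon as it resolves, while one or more `worker` tasks drain the queue in the background.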

Security Patterns

Secret Rotation

from datetime import datetime, timedelta, timezone

class SecretRotation:
    """Support rolling secret updates"""

    GRACE_PERIOD = timedelta(hours=24)

    def __init__(self):
        self.current_secret = None
        self.previous_secret = None
        self.rotation_timestamp = None

    def rotate(self, new_secret):
        """Rotate to a new secret while still accepting the old one"""
        self.previous_secret = self.current_secret
        self.current_secret = new_secret
        self.rotation_timestamp = datetime.now(timezone.utc)

    def get_verification_secrets(self):
        """Return the secrets currently accepted for verification"""
        secrets = [self.current_secret]

        # Accept the previous secret during the grace period
        if self.previous_secret and self.rotation_timestamp:
            if datetime.now(timezone.utc) - self.rotation_timestamp < self.GRACE_PERIOD:
                secrets.append(self.previous_secret)

        return secrets
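During the grace period a consumer should accept a signature made with any secret returned by `get_verification_secrets()`. A minimal sketch, assuming the `{timestamp}.{body}` HMAC-SHA256 scheme used throughout and string secrets; `expected_signature` and `verify_with_any` are illustrative helpers.

```python
import hashlib
import hmac


def expected_signature(secret: str, body: str, timestamp: str) -> str:
    mac = hmac.new(secret.encode(), f"{timestamp}.{body}".encode(), hashlib.sha256)
    return f"sha256={mac.hexdigest()}"


def verify_with_any(accepted_secrets, body: str, signature: str, timestamp: str) -> bool:
    """True if the signature matches any currently accepted secret (None entries are skipped)."""
    return any(
        hmac.compare_digest(signature.encode(),
                            expected_signature(s, body, timestamp).encode())
        for s in accepted_secrets
        if s is not None
    )
```

Once the grace period ends, the old secret drops out of the list and signatures made with it stop verifying, without any change to the consumer code.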

IP Allowlisting

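import ipaddress

# "..." marks entries elided here; providers publish these lists and change them over time
ALLOWED_IPS = {
    'stripe': ['3.18.12.63', '3.130.192.231', ...],
    'github': ['192.30.252.0/22', '185.199.108.0/22', ...],
    'twilio': ['54.172.60.0/23', '54.244.51.0/24', ...]
}

def ip_in_range(ip, allowed_range):
    """True if ip lies within allowed_range (a single address or a CIDR block)"""
    return ipaddress.ip_address(ip) in ipaddress.ip_network(allowed_range, strict=False)

def verify_source_ip(request, provider):
    """Verify that the webhook came from one of the provider's expected IPs"""
    client_ip = request.remote_addr

    allowed = ALLOWED_IPS.get(provider, [])
    for allowed_range in allowed:
        if ip_in_range(client_ip, allowed_range):
            return True

    return False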
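Behind a load balancer or reverse proxy, `request.remote_addr` is the proxy's address, so an allowlist check needs the original client IP. A sketch of one common approach, assuming you control the proxies and know their address ranges (`client_ip` and `trusted_proxies` are illustrative): walk `X-Forwarded-For` from the right and take the first address that is not one of your own proxies. Flask apps can alternatively rely on Werkzeug's `ProxyFix` middleware.

```python
import ipaddress
from typing import Iterable, Optional


def client_ip(remote_addr: str, forwarded_for: Optional[str],
              trusted_proxies: Iterable[str]) -> str:
    """Best-effort original client IP for a request that may have passed through proxies.

    X-Forwarded-For is honored only when the direct peer (remote_addr) is one of our
    trusted proxies; otherwise the header could be forged by the sender.
    Raises ValueError on malformed addresses.
    """
    networks = [ipaddress.ip_network(p, strict=False) for p in trusted_proxies]

    def is_trusted(ip: str) -> bool:
        addr = ipaddress.ip_address(ip)
        return any(addr in net for net in networks)

    if not forwarded_for or not is_trusted(remote_addr):
        return remote_addr

    hops = [hop.strip() for hop in forwarded_for.split(",") if hop.strip()]
    # Walk from the nearest hop outward; the first untrusted address is the real client
    for hop in reversed(hops):
        if not is_trusted(hop):
            return hop
    # Every hop was one of our proxies: fall back to the outermost recorded address
    return hops[0] if hops else remote_addr
```

The resolved address is what should be checked against `ALLOWED_IPS`; leftmost entries in `X-Forwarded-For` are supplied by whoever sent the request and should not be trusted on their own.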

Error Handling

Response Codes

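Code       Meaning               Provider action
200-204    Success               Mark as delivered
400        Bad request           Do not retry (resending the same payload will fail again)
401/403    Verification failed   Disable the endpoint
404        Not found             Disable the endpoint
429        Rate limited          Retry with backoff
5xx        Server error          Retry
Timeout    No response           Retry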
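The table can be encoded directly as a classification function on the provider side. A sketch that follows the table row by row; the `DeliveryAction` names are illustrative, and codes the table does not list (3xx redirects, other 4xx codes) are treated here as non-retryable, which is an assumption.

```python
from enum import Enum
from typing import Optional


class DeliveryAction(Enum):
    DELIVERED = "mark as delivered"
    NO_RETRY = "do not retry"
    DISABLE_ENDPOINT = "disable endpoint"
    RETRY_WITH_BACKOFF = "retry with backoff"
    RETRY = "retry"


def classify_response(status_code: Optional[int]) -> DeliveryAction:
    """Map an HTTP status (None means timeout / no response) to the provider's next step."""
    if status_code is None:
        return DeliveryAction.RETRY
    if 200 <= status_code <= 204:
        return DeliveryAction.DELIVERED
    if status_code in (401, 403, 404):
        return DeliveryAction.DISABLE_ENDPOINT
    if status_code == 429:
        return DeliveryAction.RETRY_WITH_BACKOFF
    if status_code >= 500:
        return DeliveryAction.RETRY
    # 400 per the table; unlisted codes (1xx, 3xx, remaining 4xx) assumed non-retryable
    return DeliveryAction.NO_RETRY
```

Because a single misconfigured deploy can briefly return 401 or 404, it may be safer to act on `DISABLE_ENDPOINT` only after repeated occurrences, in the same spirit as the consecutive-failure threshold in `WebhookEndpoint.record_failure`.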

Dead-Letter Queue

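import uuid
from datetime import datetime, timedelta, timezone

class DeadLetterQueue:
    """Store and manage failed webhooks"""

    def __init__(self, storage, endpoints, delivery_system):
        # Hypothetical collaborators, injected by the application
        self.storage = storage                  # DLQ store: add/get/remove/delete_before
        self.endpoints = endpoints              # endpoint repository with an async get(endpoint_id)
        self.delivery_system = delivery_system  # e.g. a WebhookDeliverySystem

    async def add(self, endpoint, event, failure_reason, attempts):
        """Add a failed webhook to the DLQ"""
        await self.storage.add({
            'id': uuid.uuid4().hex,
            'endpoint_id': endpoint.id,
            'endpoint_url': endpoint.url,
            'event': event.to_payload(),
            'failure_reason': str(failure_reason),
            'attempts': attempts,
            'added_at': datetime.now(timezone.utc).isoformat()
        })

    async def retry(self, dlq_item_id):
        """Manually retry a DLQ item"""
        item = await self.storage.get(dlq_item_id)
        endpoint = await self.endpoints.get(item['endpoint_id'])
        # The event keeps its original ID, so consumers can deduplicate it
        event = WebhookEvent.from_payload(item['event'])

        result = await self.delivery_system.deliver(endpoint, event)

        if result['status'] == 'delivered':
            await self.storage.remove(dlq_item_id)

        return result

    async def purge_old(self, days=30):
        """Delete DLQ items older than `days` days"""
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        await self.storage.delete_before(cutoff)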
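`DeadLetterQueue` assumes a storage object offering `add`, `get`, `remove`, and `delete_before`. A minimal in-memory version of that hypothetical interface, useful in tests, shows how the `purge_old` cutoff is applied; a real deployment would more likely use a database table indexed on `added_at`.

```python
from datetime import datetime


class InMemoryDLQStorage:
    """Hypothetical storage backing the DeadLetterQueue interface (single process only)."""

    def __init__(self):
        self.items = {}

    async def add(self, item):
        self.items[item["id"]] = item

    async def get(self, item_id):
        return self.items[item_id]

    async def remove(self, item_id):
        self.items.pop(item_id, None)

    async def delete_before(self, cutoff: datetime):
        # added_at is stored as an ISO-8601 string; parse it before comparing.
        # Stored timestamps and cutoff must both be timezone-aware (or both naive).
        stale = [item_id for item_id, item in self.items.items()
                 if datetime.fromisoformat(item["added_at"]) < cutoff]
        for item_id in stale:
            del self.items[item_id]
```

With a 30-day cutoff, an item added 45 days ago is purged while one added 2 days ago is kept.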

Best Practices

For Providers

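  1. Include an event ID so consumers can deduplicate
  2. Sign payloads with HMAC-SHA256 at minimum
  3. Include a timestamp to prevent replay attacks
  4. Retry with exponential backoff (at least 5 attempts)
  5. Expose webhook delivery logs in a dashboard
  6. Let subscribers filter events by type
  7. Version payloads to preserve backward compatibility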

For Consumers

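  1. Verify the signature before processing
  2. Respond quickly (< 5 seconds)
  3. Process asynchronously via a queue
  4. Use the event ID to make processing idempotent
  5. Log everything for debugging
  6. Monitor failures proactively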

References

  • references/webhook-security.md - Detailed security implementation
  • references/provider-examples.md - Stripe, GitHub, and Twilio patterns
  • references/testing-webhooks.md - Local development and testing