名称: webhook-integration-patterns
描述: 设计可靠的Webhook系统,包含适当的交付保证、重试逻辑、签名验证和幂等处理,适用于事件驱动集成。
许可证: MIT
Webhook集成模式
此技能提供设计和实现可靠Webhook系统的指导,适用于提供者(发送Webhook)和消费者(接收Webhook)。
核心能力
- 交付保证: 至少一次、恰好一次语义
- 安全性: 签名验证、密钥轮换
- 可靠性: 重试策略、死信处理
- 可扩展性: 基于队列的处理、速率限制
Webhook基础
Webhook解决的问题
轮询(低效):                      Webhook(高效):
┌────────┐     ┌────────┐        ┌────────┐     ┌────────┐
│ 客户端 │     │ 服务器 │        │ 客户端 │     │ 服务器 │
└───┬────┘     └───┬────┘        └───┬────┘     └───┬────┘
    │ 有新闻吗?    │                 │              │
    │─────────────▶│                 │              │
    │ 无           │                 │     事件!    │
    │◀─────────────│                 │◀─────────────│
    │ 有新闻吗?    │                 │  POST /hook  │
    │─────────────▶│                 │◀─────────────│
    │ 无           │                 │    200 OK    │
    │◀─────────────│                 │─────────────▶│
    │ 有新闻吗?    │
    │─────────────▶│             基于推送的通知
    │ 有!          │             代替轮询
    │◀─────────────│
Webhook结构
POST /webhooks/payment HTTP/1.1
Host: your-app.com
Content-Type: application/json
X-Webhook-Signature: sha256=abc123...
X-Webhook-ID: evt_12345
X-Webhook-Timestamp: 1706616000

{
  "id": "evt_12345",
  "type": "payment.completed",
  "created_at": "2024-01-30T12:00:00Z",
  "data": {
    "payment_id": "pay_abc",
    "amount": 1000,
    "currency": "USD"
  }
}
关键头部:
- 签名: 真实性的加密证明
- ID: 用于去重的唯一事件标识符
- 时间戳: 签名生成时间(Unix秒),接收方用它校验请求的新鲜度以防止重放攻击
Webhook提供者设计
事件模式设计
class WebhookEvent:
    """Standard envelope for an outgoing webhook event.

    Attributes:
        id: Unique event identifier, generated per instance.
        type: Event type string, e.g. ``"payment.completed"``.
        created_at: Creation time as a naive-UTC ISO 8601 string.
        api_version: Payload schema version stamped on the event.
        data: Event-specific body supplied by the caller.
        idempotency_key: Caller-supplied key, or the event id when omitted.
    """

    # Payload schema version stamped on every newly created event.
    API_VERSION = "2024-01-30"

    def __init__(self, event_type, data, idempotency_key=None):
        """Create a new event with a fresh id and creation timestamp.

        Args:
            event_type: Event type string.
            data: Event-specific payload body.
            idempotency_key: Optional key; falls back to the generated id.
        """
        self.id = self._generate_id()
        self.type = event_type
        # NOTE: utcnow() yields a naive datetime, so the string carries no
        # timezone suffix; consumers must treat it as UTC.
        self.created_at = datetime.utcnow().isoformat()
        self.api_version = self.API_VERSION
        self.data = data
        self.idempotency_key = idempotency_key or self.id

    @staticmethod
    def _generate_id():
        """Return a new unique event id of the form ``evt_<32 hex chars>``."""
        # Imported locally so this snippet stays self-contained.
        import uuid

        return f"evt_{uuid.uuid4().hex}"

    @classmethod
    def from_payload(cls, payload):
        """Rebuild an event from a dict produced by :meth:`to_payload`.

        The stored id and timestamp are kept rather than regenerated, so a
        re-delivered event carries the same identity as the original.

        Args:
            payload: Mapping with at least ``"id"`` and ``"type"``.

        Returns:
            A ``WebhookEvent`` whose ``to_payload()`` equals ``payload`` when
            ``payload`` contains all five envelope keys.

        Raises:
            KeyError: If ``"id"`` or ``"type"`` is missing.
        """
        # Bypass __init__ so no new id/timestamp is generated.
        event = cls.__new__(cls)
        event.id = payload["id"]
        event.type = payload["type"]
        event.created_at = payload.get("created_at")
        event.api_version = payload.get("api_version", cls.API_VERSION)
        event.data = payload.get("data")
        event.idempotency_key = payload.get("idempotency_key", event.id)
        return event

    def to_payload(self):
        """Return the JSON-serialisable envelope that is sent to subscribers."""
        return {
            "id": self.id,
            "type": self.type,
            "created_at": self.created_at,
            "api_version": self.api_version,
            "data": self.data
        }
# Event types follow the "<resource>.<action>" pattern; the table below lists
# the actions available for each resource.
_RESOURCE_ACTIONS = {
    "payment": ("created", "completed", "failed"),
    "subscription": ("created", "updated", "cancelled"),
    "customer": ("created", "deleted"),
}

EVENT_TYPES = [
    f"{resource}.{action}"
    for resource, actions in _RESOURCE_ACTIONS.items()
    for action in actions
]
签名生成
import hashlib
import hmac
import json
import logging
import time
class WebhookSigner:
    """Sign webhook payloads with HMAC-SHA256 so receivers can verify them."""

    def __init__(self, secret):  # allow-secret
        """Store the shared secret (a ``str``) as bytes for use as the HMAC key."""
        self.secret = secret.encode()  # allow-secret

    @staticmethod
    def serialize(payload):
        """Return the compact JSON text that :meth:`sign` computes the MAC over.

        Senders should transmit exactly this text as the request body;
        re-serialising the payload elsewhere can produce different bytes and
        therefore a signature the receiver cannot reproduce.
        """
        return json.dumps(payload, separators=(',', ':'))

    def sign(self, payload, timestamp=None):
        """Compute the HMAC signature for ``payload``.

        Args:
            payload: JSON-serialisable object to sign.
            timestamp: Unix time to bind into the signature; defaults to now.

        Returns:
            Dict with ``'signature'`` (``"sha256=<hex digest>"``) and
            ``'timestamp'`` (the value that was signed).
        """
        # `is None` rather than `or`, so an explicit timestamp of 0 is honoured.
        if timestamp is None:
            timestamp = int(time.time())
        payload_str = self.serialize(payload)
        # The timestamp is part of the signed material to prevent replay.
        signed_payload = f"{timestamp}.{payload_str}"
        signature = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256
        ).hexdigest()
        return {
            'signature': f"sha256={signature}",
            'timestamp': timestamp
        }

    def create_headers(self, payload):
        """Build the full set of webhook request headers for ``payload``.

        Args:
            payload: Event payload dict; must contain an ``'id'`` key.

        Returns:
            Dict of HTTP headers: content type, signature, timestamp, event id.
        """
        sign_data = self.sign(payload)
        return {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': sign_data['signature'],
            'X-Webhook-Timestamp': str(sign_data['timestamp']),
            'X-Webhook-ID': payload['id']
        }
交付系统
import asyncio
from datetime import datetime, timedelta
class WebhookDeliverySystem:
    """Deliver webhooks over HTTP, retrying failed attempts on a fixed schedule."""

    # Delay before retry N (index N-1); once exhausted the event is
    # dead-lettered.
    RETRY_SCHEDULE = [
        timedelta(seconds=10),
        timedelta(minutes=1),
        timedelta(minutes=5),
        timedelta(minutes=30),
        timedelta(hours=1),
        timedelta(hours=6),
        timedelta(hours=24)
    ]

    # Responses that this guide's response-code table marks as not retryable
    # (bad request, auth failure, not found): repeating the same request
    # cannot succeed.
    NON_RETRYABLE_STATUSES = frozenset({400, 401, 403, 404})

    def __init__(self, signer, http_client):
        """
        Args:
            signer: Object exposing ``create_headers(payload)``.
            http_client: Async client exposing ``post(url, content=...,
                headers=..., timeout=...)`` whose response has ``status_code``.
                NOTE(review): this is the httpx.AsyncClient calling convention;
                confirm against the client actually injected.
        """
        self.signer = signer
        self.http = http_client
        # In-memory record of every attempt; grows without bound.
        self.delivery_log = []

    async def deliver(self, endpoint, event, attempt=0):
        """Attempt one delivery and schedule a retry on failure.

        Args:
            endpoint: Object with a ``url`` attribute.
            event: Object with ``to_payload()``.
            attempt: Zero-based index of this attempt.

        Returns:
            ``{'status': 'delivered' | 'failed', 'attempts': <count>}``.
        """
        payload = event.to_payload()
        # Headers are rebuilt on every attempt so the signed timestamp is fresh.
        headers = self.signer.create_headers(payload)
        # Send the exact text the signature covers. The signer in this guide
        # signs compact JSON; letting the HTTP client re-serialise the dict can
        # yield different bytes and make verification fail at the receiver.
        body = json.dumps(payload, separators=(',', ':'))
        try:
            response = await self.http.post(
                endpoint.url,
                content=body,
                headers=headers,
                timeout=30
            )
        except Exception as e:
            # Transport-level failure (timeout, connection error, ...): retry.
            self._log_attempt(endpoint, event, attempt, error=e)
            return await self._schedule_retry(endpoint, event, attempt)

        self._log_attempt(endpoint, event, attempt, response)
        status = response.status_code
        if status in (200, 201, 202, 204):
            return {'status': 'delivered', 'attempts': attempt + 1}
        if status in self.NON_RETRYABLE_STATUSES:
            self._move_to_dead_letter(endpoint, event)
            return {'status': 'failed', 'attempts': attempt + 1}
        # Any other status (429, 5xx, ...): schedule a retry.
        return await self._schedule_retry(endpoint, event, attempt)

    async def _schedule_retry(self, endpoint, event, attempt):
        """Wait for the next slot in the schedule and retry, or give up."""
        if attempt >= len(self.RETRY_SCHEDULE):
            self._move_to_dead_letter(endpoint, event)
            return {'status': 'failed', 'attempts': attempt + 1}
        delay = self.RETRY_SCHEDULE[attempt]
        # Production: use a job queue with delayed execution instead of
        # sleeping inside the calling task.
        await asyncio.sleep(delay.total_seconds())
        return await self.deliver(endpoint, event, attempt + 1)

    def _log_attempt(self, endpoint, event, attempt, response=None, error=None):
        """Append one delivery attempt (response or error) to ``delivery_log``."""
        self.delivery_log.append({
            'endpoint_url': endpoint.url,
            'event_id': getattr(event, 'id', None),
            'attempt': attempt + 1,
            'status_code': getattr(response, 'status_code', None),
            'error': None if error is None else str(error),
            'logged_at': datetime.utcnow().isoformat(),
        })

    def _move_to_dead_letter(self, endpoint, event):
        """Store a failed webhook for manual review (placeholder: does nothing)."""
        # Store in a dead-letter queue/table.
        pass
端点管理
class WebhookEndpoint:
    """Subscriber endpoint: target URL, subscribed events, secret and health."""

    def __init__(self, url, events, secret=None):  # allow-secret
        # generate_id / generate_secret are not defined in this snippet;
        # presumably project helpers — confirm.
        self.id = generate_id()
        self.url = url
        # Event types this endpoint subscribes to; '*' matches every type.
        self.events = events
        self.secret = secret if secret else generate_secret()  # allow-secret
        self.status = 'active'
        self.created_at = datetime.utcnow()
        # Health tracking.
        self.consecutive_failures = 0
        self.last_success = None
        self.last_failure = None

    def should_receive(self, event_type):
        """Return True when this endpoint is subscribed to ``event_type``."""
        return '*' in self.events or event_type in self.events

    def record_success(self):
        """Reset the failure streak and re-activate a disabled endpoint."""
        self.last_success = datetime.utcnow()
        self.consecutive_failures = 0
        if self.status == 'disabled':
            self.status = 'active'

    def record_failure(self):
        """Count a failure; the tenth consecutive one disables the endpoint."""
        self.last_failure = datetime.utcnow()
        streak = self.consecutive_failures + 1
        self.consecutive_failures = streak
        if streak >= 10:
            self.status = 'disabled'
Webhook消费者设计
签名验证
class WebhookVerificationError(Exception):
    """Raised when an incoming webhook fails an authenticity check."""


class WebhookVerifier:
    """Verify the HMAC-SHA256 signature and freshness of incoming webhooks."""

    # Maximum accepted distance, in seconds, between now and the signed
    # timestamp (5 minutes).
    TIMESTAMP_TOLERANCE = 300

    def __init__(self, secret):  # allow-secret
        """Store the shared secret (a ``str``) as bytes for use as the HMAC key."""
        self.secret = secret.encode()  # allow-secret

    def verify(self, payload, signature, timestamp):
        """Check that ``signature`` matches ``payload`` and is recent.

        Args:
            payload: Raw request body exactly as received (``str`` or ``bytes``).
            signature: Value of the signature header (``"sha256=<hex>"``).
            timestamp: Value of the timestamp header (Unix seconds).

        Returns:
            True when the signature is valid and the timestamp is in tolerance.

        Raises:
            WebhookVerificationError: On a missing or malformed header, a
                timestamp outside the tolerance window, or a signature
                mismatch. No other exception type is raised for bad input,
                so callers need only one ``except`` clause.
        """
        if not signature or timestamp is None:
            raise WebhookVerificationError("缺少签名或时间戳")
        try:
            sent_at = int(timestamp)
        except (TypeError, ValueError) as exc:
            raise WebhookVerificationError("时间戳格式无效") from exc

        # Freshness check: abs() rejects timestamps too far in either direction.
        current_time = int(time.time())
        if abs(current_time - sent_at) > self.TIMESTAMP_TOLERANCE:
            raise WebhookVerificationError("时间戳过旧")

        # Recompute the expected signature over "<timestamp>.<body>", working
        # in bytes so a bytes body is signed as-is rather than via its repr.
        body = payload.encode() if isinstance(payload, str) else bytes(payload)
        signed_payload = f"{timestamp}.".encode() + body
        expected = hmac.new(
            self.secret,
            signed_payload,
            hashlib.sha256
        ).hexdigest()
        expected_sig = f"sha256={expected}".encode()

        # Constant-time comparison to prevent timing attacks. Comparing bytes
        # also avoids the TypeError compare_digest raises for non-ASCII str.
        received = signature.encode() if isinstance(signature, str) else bytes(signature)
        if not hmac.compare_digest(received, expected_sig):
            raise WebhookVerificationError("无效签名")
        return True
# Flask endpoint example
@app.route('/webhooks/provider', methods=['POST'])
def handle_webhook():
    """Verify an incoming webhook and hand the event to ``process_webhook``.

    Returns:
        ``('OK', 200)`` on success, a 401 response when the signature headers
        are missing or verification fails, and a 400 response when the body is
        not valid JSON.
    """
    signature = request.headers.get('X-Webhook-Signature')
    timestamp = request.headers.get('X-Webhook-Timestamp')
    # Reject early instead of letting missing headers surface as a 500.
    if not signature or not timestamp:
        return '无效签名', 401

    verifier = WebhookVerifier(WEBHOOK_SECRET)
    try:
        verifier.verify(
            request.data.decode(),
            signature,
            timestamp
        )
    except (WebhookVerificationError, ValueError):
        # ValueError covers an undecodable body (UnicodeDecodeError) and a
        # non-numeric timestamp header.
        return '无效签名', 401

    # silent=True returns None instead of raising when the body is not JSON.
    event = request.get_json(silent=True)
    if event is None:
        return '无效负载', 400
    process_webhook(event)
    return 'OK', 200
幂等处理
class IdempotentWebhookProcessor:
    """Process each webhook event id once, skipping duplicates.

    Deduplication uses a "processed" marker in ``storage`` plus a short-lived
    lock that keeps two workers from handling the same event concurrently.
    """

    # How long a processed marker is kept for deduplication (7 days).
    PROCESSED_TTL = 86400 * 7

    def __init__(self, storage):
        """
        Args:
            storage: Async key-value store (Redis, database, ...) exposing
                ``exists(key)``, ``set_nx(key, value, ex=)``,
                ``set(key, value, ex=)`` and ``delete(key)``.
        """
        self.storage = storage
        self.lock_ttl = 300  # 5-minute lock

    async def process(self, event):
        """Handle ``event`` unless it was already processed or is in flight.

        Args:
            event: Dict with ``'id'``, ``'type'`` and ``'data'`` keys.

        Returns:
            ``{'status': 'duplicate' | 'processing' | 'processed',
            'event_id': <id>}``.
        """
        event_id = event['id']
        processed_key = f"webhook:processed:{event_id}"
        lock_key = f"webhook:lock:{event_id}"

        # Fast path: already processed.
        if await self.storage.exists(processed_key):
            return {'status': 'duplicate', 'event_id': event_id}

        # Take the lock to prevent concurrent processing.
        if not await self.storage.set_nx(lock_key, "1", ex=self.lock_ttl):
            return {'status': 'processing', 'event_id': event_id}

        try:
            # Re-check under the lock: another worker may have finished and
            # released its lock between the check above and our acquisition.
            if await self.storage.exists(processed_key):
                return {'status': 'duplicate', 'event_id': event_id}

            result = await self._handle_event(event)

            # Mark as processed with a long TTL for deduplication.
            await self.storage.set(
                processed_key,
                json.dumps(result),
                ex=self.PROCESSED_TTL
            )
            return {'status': 'processed', 'event_id': event_id}
        finally:
            # NOTE(review): unconditional delete; if handling outlives lock_ttl
            # this can remove a lock that another worker has since acquired.
            await self.storage.delete(lock_key)

    async def _handle_event(self, event):
        """Route the event to the handler registered for its type.

        The ``_handle_*`` methods referenced here are not defined in this
        class and must be supplied (for example by a subclass).
        """
        handlers = {
            'payment.completed': self._handle_payment_completed,
            'subscription.cancelled': self._handle_subscription_cancelled,
            # ... more handlers
        }
        handler = handlers.get(event['type'])
        if handler:
            return await handler(event['data'])
        return {'skipped': True, 'reason': 'unknown_event_type'}
基于队列的处理
class QueuedWebhookHandler:
    """Decouple webhook receipt from processing via a queue."""

    def __init__(self, queue, poll_interval=1.0):
        """
        Args:
            queue: Async queue client (Redis, SQS, RabbitMQ, ...) exposing
                ``enqueue``, ``dequeue``, ``ack`` and ``nack``.
            poll_interval: Seconds to wait before polling again when
                ``dequeue`` returns nothing.
        """
        self.queue = queue
        self.poll_interval = poll_interval

    async def receive(self, event):
        """Validate and enqueue ``event`` so the sender gets a fast response.

        Returns:
            ``{'status': 'accepted'}`` once the event is enqueued.

        Raises:
            ValueError: If the event fails structural validation.
        """
        # Validate immediately.
        self._validate_event_structure(event)
        # Enqueue for processing; the event id doubles as the dedup id.
        await self.queue.enqueue(
            'webhook_processing',
            event,
            deduplication_id=event['id']
        )
        # Return right away; do not keep the sender waiting.
        return {'status': 'accepted'}

    async def process_queue(self):
        """Worker loop: dequeue events forever, ack on success, nack on error."""
        while True:
            event = await self.queue.dequeue('webhook_processing')
            if not event:
                # Avoid a busy loop when dequeue() returns immediately with
                # nothing to do.
                await asyncio.sleep(self.poll_interval)
                continue
            try:
                await self._process_event(event)
                await self.queue.ack(event)
            except Exception as e:
                # Log first (with traceback) so the failure is recorded even
                # if nack() itself raises.
                logging.exception("Webhook处理失败: %s", e)
                # NOTE(review): requeue=True has no retry cap here; an event
                # that always fails is retried indefinitely.
                await self.queue.nack(event, requeue=True)

    def _validate_event_structure(self, event):
        """Require a dict carrying the ``'id'`` and ``'type'`` keys.

        Raises:
            ValueError: If ``event`` is not a dict or a required key is absent.
        """
        if not isinstance(event, dict):
            raise ValueError("webhook event must be a JSON object")
        missing = [key for key in ('id', 'type') if key not in event]
        if missing:
            raise ValueError(f"webhook event is missing required keys: {missing}")

    async def _process_event(self, event):
        """Application hook that handles one event; override in a subclass."""
        raise NotImplementedError
安全模式
密钥轮换
class SecretRotation:
    """Rolling secret update: keep accepting the previous secret for a while."""

    def __init__(self, grace_period=None):
        """
        Args:
            grace_period: ``timedelta`` during which the previous secret is
                still accepted after a rotation; defaults to 24 hours.
        """
        self.current_secret = None
        self.previous_secret = None
        self.rotation_timestamp = None
        self.grace_period = (
            grace_period if grace_period is not None else timedelta(hours=24)
        )

    def rotate(self, new_secret):
        """Make ``new_secret`` current and demote the old one to previous.

        Raises:
            ValueError: If ``new_secret`` is empty or None.
        """
        if not new_secret:
            raise ValueError("new_secret must be a non-empty value")
        self.previous_secret = self.current_secret
        self.current_secret = new_secret
        self.rotation_timestamp = datetime.utcnow()

    def get_verification_secrets(self):
        """Return the secrets to try when verifying, current one first.

        Returns:
            ``[]`` before any secret is set; otherwise the current secret,
            followed by the previous one while the grace period is running.
        """
        # Never hand out None as a candidate secret.
        candidates = [] if self.current_secret is None else [self.current_secret]
        # Accept the previous secret only inside the grace period.
        if self.previous_secret and self.rotation_timestamp:
            if datetime.utcnow() - self.rotation_timestamp < self.grace_period:
                candidates.append(self.previous_secret)
        return candidates
IP允许列表
# Illustrative entries only. Providers publish and change their source ranges;
# load the current list from each provider's documentation instead of
# hard-coding it. Entries are single addresses or CIDR ranges.
ALLOWED_IPS = {
    'stripe': ['3.18.12.63', '3.130.192.231'],  # ... more addresses
    'github': ['192.30.252.0/22', '185.199.108.0/22'],  # ... more ranges
    'twilio': ['54.172.60.0/23', '54.244.51.0/24'],  # ... more ranges
}


def verify_source_ip(request, provider):
    """Return True when the request's source address is allowed for ``provider``.

    Args:
        request: Object with a ``remote_addr`` attribute.
        provider: Key into ``ALLOWED_IPS``.

    Returns:
        True if ``request.remote_addr`` falls inside any allowed address or
        CIDR range; False for unknown providers, a missing or malformed
        client address, or no match.
    """
    # Imported locally so this snippet stays self-contained.
    import ipaddress

    # NOTE(review): behind a reverse proxy remote_addr may be the proxy's
    # address rather than the sender's — confirm the deployment topology.
    try:
        client_ip = ipaddress.ip_address(request.remote_addr)
    except ValueError:
        return False

    for allowed_range in ALLOWED_IPS.get(provider, []):
        try:
            # strict=False also accepts a bare address (treated as /32 or /128).
            network = ipaddress.ip_network(allowed_range, strict=False)
        except (TypeError, ValueError):
            # Skip malformed entries instead of failing the whole check.
            continue
        if client_ip in network:
            return True
    return False
错误处理
响应代码
| 代码 | 含义 | 提供者操作 |
|---|---|---|
| 200-204 | 成功 | 标记为已交付 |
| 400 | 请求错误 | 不重试(你的错误) |
| 401/403 | 认证失败 | 禁用端点 |
| 404 | 未找到 | 禁用端点 |
| 429 | 速率限制 | 带退避重试 |
| 500+ | 服务器错误 | 重试 |
| 超时 | 无响应 | 重试 |
死信队列
class DeadLetterQueue:
    """Store and manage webhooks whose delivery ultimately failed."""

    def __init__(self, storage=None, delivery_system=None, get_endpoint=None):
        """
        All arguments are optional so that ``DeadLetterQueue()`` still works
        and subclasses can keep providing these members themselves.

        Args:
            storage: Async store exposing ``add``, ``get``, ``remove`` and
                ``delete_before``.
            delivery_system: Object exposing async ``deliver(endpoint, event)``.
            get_endpoint: Async callable mapping an endpoint id to an endpoint.
        """
        # Assign only what was supplied, so a class-level attribute or method
        # of the same name is not shadowed by None.
        if storage is not None:
            self.storage = storage
        if delivery_system is not None:
            self.delivery_system = delivery_system
        if get_endpoint is not None:
            self.get_endpoint = get_endpoint

    async def add(self, endpoint, event, failure_reason, attempts):
        """Record a failed webhook in the dead-letter store."""
        # generate_id is not defined in this snippet; presumably a project
        # helper — confirm.
        await self.storage.add({
            'id': generate_id(),
            'endpoint_id': endpoint.id,
            'endpoint_url': endpoint.url,
            'event': event.to_payload(),
            'failure_reason': str(failure_reason),
            'attempts': attempts,
            'added_at': datetime.utcnow().isoformat()
        })

    async def retry(self, dlq_item_id):
        """Manually re-deliver one dead-lettered item.

        Returns:
            The delivery result dict; the item is removed from the store only
            when its status is ``'delivered'``.

        Raises:
            LookupError: If no item with ``dlq_item_id`` exists.
        """
        item = await self.storage.get(dlq_item_id)
        if item is None:
            raise LookupError(f"DLQ item not found: {dlq_item_id}")
        endpoint = await self.get_endpoint(item['endpoint_id'])
        event = WebhookEvent.from_payload(item['event'])
        result = await self.delivery_system.deliver(endpoint, event)
        if result['status'] == 'delivered':
            await self.storage.remove(dlq_item_id)
        return result

    async def purge_old(self, days=30):
        """Delete dead-lettered items older than ``days`` days."""
        cutoff = datetime.utcnow() - timedelta(days=days)
        await self.storage.delete_before(cutoff)
最佳实践
对于提供者
- 包含事件ID 用于消费者去重
- 签署负载(至少使用HMAC-SHA256)
- 包含时间戳 以防止重放
- 使用指数退避重试(至少5次尝试)
- 在仪表板中提供Webhook日志
- 支持按类型过滤事件
- 版本化负载 以确保向后兼容
对于消费者
- 在处理前验证签名
- 快速响应(< 5秒)
- 通过队列异步处理
- 使用事件ID实现幂等性
- 记录一切 以进行调试
- 主动监控失败
参考资料
- references/webhook-security.md - 详细的安全实现
- references/provider-examples.md - Stripe、GitHub、Twilio模式
- references/testing-webhooks.md - 本地开发和测试