name: webhooks description: Webhook实现和消费模式。用于实现webhook端点、webhook接收器、webhook发送器、HTTP回调、事件通知、推送通知或实时集成。涵盖签名验证(HMAC、加密)、重试策略(指数退避)、幂等性键、交付保证、webhook安全、负载设计、和监控。关键词:webhook、webhooks、回调、callbacks、HTTP回调、事件通知、推送通知、签名验证、HMAC、hmac、加密签名、重试、指数退避、幂等性、幂等、交付保证、至少一次交付、webhook接收器、webhook发送器、webhook安全、webhook认证、重放攻击、死信队列、webhook监控。
Webhooks
概述
Webhooks是HTTP回调,用于在事件发生时通知外部系统。它们使得服务之间能够实时通信,而无需轮询。本技能涵盖webhook设计模式、安全、可靠性和实现最佳实践。
关键概念
Webhook设计模式
事件驱动架构:
interface WebhookEvent {
id: string; // 唯一事件ID
type: string; // 事件类型(例如,'order.created')
created: number; // Unix时间戳
apiVersion: string; // API版本用于负载格式
data: {
object: Record<string, any>; // 触发事件的资源
previousAttributes?: Record<string, any>; // 用于更新事件
};
}
// 示例事件
const orderCreatedEvent: WebhookEvent = {
id: "evt_1234567890",
type: "order.created",
created: 1702987200,
apiVersion: "2024-01-01",
data: {
object: {
id: "ord_abc123",
status: "pending",
total: 9999,
currency: "usd",
customer: "cus_xyz789",
},
},
};
const orderUpdatedEvent: WebhookEvent = {
id: "evt_1234567891",
type: "order.updated",
created: 1702987260,
apiVersion: "2024-01-01",
data: {
object: {
id: "ord_abc123",
status: "shipped",
total: 9999,
currency: "usd",
},
previousAttributes: {
status: "pending",
},
},
};
Webhook订阅模型:
interface WebhookEndpoint {
id: string;
url: string;
secret: string;
events: string[]; // 要接收的事件类型
status: "active" | "disabled";
metadata?: Record<string, string>;
createdAt: Date;
updatedAt: Date;
}
interface WebhookDelivery {
id: string;
endpointId: string;
eventId: string;
url: string;
requestHeaders: Record<string, string>;
requestBody: string;
responseStatus?: number;
responseHeaders?: Record<string, string>;
responseBody?: string;
duration?: number;
attempts: number;
nextRetryAt?: Date;
status: "pending" | "success" | "failed" | "retrying";
createdAt: Date;
completedAt?: Date;
}
签名验证(HMAC)
生成签名:
import crypto from "crypto";
class WebhookSigner {
constructor(private secret: string) {}
sign(payload: string, timestamp: number): string {
const signedPayload = `${timestamp}.${payload}`;
return crypto
.createHmac("sha256", this.secret)
.update(signedPayload)
.digest("hex");
}
generateHeaders(payload: string): Record<string, string> {
const timestamp = Math.floor(Date.now() / 1000);
const signature = this.sign(payload, timestamp);
return {
"X-Webhook-Timestamp": timestamp.toString(),
"X-Webhook-Signature": `v1=${signature}`,
"Content-Type": "application/json",
};
}
}
验证签名:
class WebhookVerifier {
constructor(
private secret: string,
private tolerance: number = 300, // 5分钟
) {}
verify(payload: string, signature: string, timestamp: string): boolean {
// 检查时间戳以防止重放攻击
const ts = parseInt(timestamp, 10);
const now = Math.floor(Date.now() / 1000);
if (Math.abs(now - ts) > this.tolerance) {
throw new WebhookError(
"时间戳超出容差范围",
"TIMESTAMP_EXPIRED",
);
}
// 提取签名值
const sigParts = signature.split(",");
const v1Sig = sigParts
.find((part) => part.startsWith("v1="))
?.replace("v1=", "");
if (!v1Sig) {
throw new WebhookError("未找到有效签名", "INVALID_SIGNATURE");
}
// 计算预期签名
const signedPayload = `${timestamp}.${payload}`;
const expectedSig = crypto
.createHmac("sha256", this.secret)
.update(signedPayload)
.digest("hex");
// 常量时间比较
const isValid = crypto.timingSafeEqual(
Buffer.from(v1Sig),
Buffer.from(expectedSig),
);
if (!isValid) {
throw new WebhookError("签名不匹配", "INVALID_SIGNATURE");
}
return true;
}
}
class WebhookError extends Error {
constructor(
message: string,
public code: string,
) {
super(message);
this.name = "WebhookError";
}
}
Express中间件用于验证:
import express from "express";
function webhookVerificationMiddleware(secret: string) {
const verifier = new WebhookVerifier(secret);
return (
req: express.Request,
res: express.Response,
next: express.NextFunction,
) => {
const signature = req.headers["x-webhook-signature"] as string;
const timestamp = req.headers["x-webhook-timestamp"] as string;
if (!signature || !timestamp) {
return res.status(401).json({ error: "缺少签名头信息" });
}
// 需要原始体用于签名验证
let rawBody = "";
req.setEncoding("utf8");
req.on("data", (chunk) => {
rawBody += chunk;
});
req.on("end", () => {
try {
verifier.verify(rawBody, signature, timestamp);
req.body = JSON.parse(rawBody);
next();
} catch (error) {
if (error instanceof WebhookError) {
return res
.status(401)
.json({ error: error.message, code: error.code });
}
return res.status(400).json({ error: "无效请求" });
}
});
};
}
// 与原始体解析器一起使用
app.post(
"/webhooks",
express.raw({ type: "application/json" }),
(req, res, next) => {
const verifier = new WebhookVerifier(process.env.WEBHOOK_SECRET!);
try {
verifier.verify(
req.body.toString(),
req.headers["x-webhook-signature"] as string,
req.headers["x-webhook-timestamp"] as string,
);
req.body = JSON.parse(req.body.toString());
next();
} catch (error) {
res.status(401).json({ error: "无效签名" });
}
},
);
重试逻辑与指数退避
重试配置:
interface RetryConfig {
maxAttempts: number;
initialDelay: number; // 毫秒
maxDelay: number; // 毫秒
backoffMultiplier: number;
retryableStatuses: number[];
}
const defaultRetryConfig: RetryConfig = {
maxAttempts: 5,
initialDelay: 1000, // 1秒
maxDelay: 3600000, // 1小时
backoffMultiplier: 2,
retryableStatuses: [408, 429, 500, 502, 503, 504],
};
function calculateNextRetry(attempt: number, config: RetryConfig): number {
// 带抖动的指数退避
const delay = Math.min(
config.initialDelay * Math.pow(config.backoffMultiplier, attempt),
config.maxDelay,
);
// 添加随机抖动(0-25%的延迟)
const jitter = delay * Math.random() * 0.25;
return delay + jitter;
}
Webhook交付服务:
import fetch from "node-fetch";
class WebhookDeliveryService {
constructor(
private db: Database,
private retryConfig: RetryConfig = defaultRetryConfig,
) {}
async deliver(endpoint: WebhookEndpoint, event: WebhookEvent): Promise<void> {
const delivery = await this.createDelivery(endpoint, event);
await this.attemptDelivery(delivery);
}
private async createDelivery(
endpoint: WebhookEndpoint,
event: WebhookEvent,
): Promise<WebhookDelivery> {
const payload = JSON.stringify(event);
const signer = new WebhookSigner(endpoint.secret);
const headers = signer.generateHeaders(payload);
return this.db.deliveries.create({
id: generateId(),
endpointId: endpoint.id,
eventId: event.id,
url: endpoint.url,
requestHeaders: headers,
requestBody: payload,
attempts: 0,
status: "pending",
createdAt: new Date(),
});
}
async attemptDelivery(delivery: WebhookDelivery): Promise<void> {
delivery.attempts++;
const startTime = Date.now();
try {
const response = await fetch(delivery.url, {
method: "POST",
headers: delivery.requestHeaders,
body: delivery.requestBody,
timeout: 30000, // 30秒超时
});
delivery.responseStatus = response.status;
delivery.responseHeaders = Object.fromEntries(response.headers);
delivery.responseBody = await response.text();
delivery.duration = Date.now() - startTime;
if (response.ok) {
delivery.status = "success";
delivery.completedAt = new Date();
} else if (this.shouldRetry(delivery)) {
await this.scheduleRetry(delivery);
} else {
delivery.status = "failed";
delivery.completedAt = new Date();
}
} catch (error) {
delivery.duration = Date.now() - startTime;
if (this.shouldRetry(delivery)) {
await this.scheduleRetry(delivery);
} else {
delivery.status = "failed";
delivery.completedAt = new Date();
}
}
await this.db.deliveries.update(delivery);
}
private shouldRetry(delivery: WebhookDelivery): boolean {
if (delivery.attempts >= this.retryConfig.maxAttempts) {
return false;
}
// 在网络错误或可重试状态码时重试
if (!delivery.responseStatus) {
return true;
}
return this.retryConfig.retryableStatuses.includes(delivery.responseStatus);
}
private async scheduleRetry(delivery: WebhookDelivery): Promise<void> {
const delay = calculateNextRetry(delivery.attempts, this.retryConfig);
delivery.nextRetryAt = new Date(Date.now() + delay);
delivery.status = "retrying";
// 排队稍后处理
await this.queue.add(
"webhook-retry",
{
deliveryId: delivery.id,
},
{
delay,
},
);
}
}
幂等性键
幂等性实现:
class IdempotencyManager {
constructor(private redis: Redis) {}
async checkAndStore(
key: string,
ttl: number = 86400, // 24小时
): Promise<{ isNew: boolean; existingResult?: any }> {
const existing = await this.redis.get(`idempotency:${key}`);
if (existing) {
return {
isNew: false,
existingResult: JSON.parse(existing),
};
}
// 标记为处理中
const acquired = await this.redis.set(
`idempotency:${key}`,
JSON.stringify({ status: "processing" }),
"EX",
ttl,
"NX",
);
return { isNew: acquired === "OK" };
}
async storeResult(
key: string,
result: any,
ttl: number = 86400,
): Promise<void> {
await this.redis.set(
`idempotency:${key}`,
JSON.stringify({ status: "completed", result }),
"EX",
ttl,
);
}
async markFailed(key: string): Promise<void> {
await this.redis.del(`idempotency:${key}`);
}
}
带幂等性的Webhook处理器:
class WebhookHandler {
constructor(
private idempotency: IdempotencyManager,
private handlers: Map<string, (data: any) => Promise<any>>,
) {}
async handleEvent(event: WebhookEvent): Promise<any> {
// 使用事件ID作为幂等性键
const check = await this.idempotency.checkAndStore(event.id);
if (!check.isNew) {
console.log(`事件 ${event.id} 已处理`);
return check.existingResult?.result;
}
try {
const handler = this.handlers.get(event.type);
if (!handler) {
console.log(`无事件类型处理器: ${event.type}`);
return null;
}
const result = await handler(event.data);
await this.idempotency.storeResult(event.id, result);
return result;
} catch (error) {
await this.idempotency.markFailed(event.id);
throw error;
}
}
}
Webhook负载设计
负载结构最佳实践:
// 好:自包含负载,包含所有需要的数据
interface GoodWebhookPayload {
id: string;
type: "invoice.paid";
apiVersion: string;
created: number;
data: {
object: {
id: string;
customerId: string;
customerEmail: string;
amount: number;
currency: string;
status: string;
lineItems: Array<{
description: string;
amount: number;
quantity: number;
}>;
paidAt: string;
};
};
// 包含相关数据以避免额外API调用
relatedObjects?: {
customer: {
id: string;
name: string;
email: string;
};
};
}
// 坏:需要额外API调用
interface BadWebhookPayload {
type: "invoice.paid";
invoiceId: string; // 仅ID,无数据 - 接收者必须获取
}
版本策略:
class WebhookPayloadTransformer {
private transformers: Map<string, (data: any) => any> = new Map();
constructor() {
// 注册版本转换器
this.transformers.set("2023-01-01", this.transformV20230101);
this.transformers.set("2024-01-01", this.transformV20240101);
}
transform(event: WebhookEvent, targetVersion: string): WebhookEvent {
const transformer = this.transformers.get(targetVersion);
if (!transformer) {
throw new Error(`未知API版本: ${targetVersion}`);
}
return {
...event,
apiVersion: targetVersion,
data: {
...event.data,
object: transformer(event.data.object),
},
};
}
private transformV20230101(data: any): any {
// 旧格式
return {
...data,
amount_cents: data.amount, // 旧字段名
};
}
private transformV20240101(data: any): any {
// 当前格式
return data;
}
}
交付保证
至少一次交付:
class WebhookDispatcher {
private queue: Queue;
private deliveryService: WebhookDeliveryService;
async dispatch(
event: WebhookEvent,
endpoints: WebhookEndpoint[],
): Promise<void> {
// 先持久化事件
await this.db.events.create(event);
// 为每个端点排队交付
for (const endpoint of endpoints) {
if (endpoint.status !== "active") continue;
if (!this.matchesEventFilter(event.type, endpoint.events)) continue;
await this.queue.add(
"webhook-delivery",
{
eventId: event.id,
endpointId: endpoint.id,
},
{
attempts: 5,
backoff: {
type: "exponential",
delay: 1000,
},
removeOnComplete: true,
removeOnFail: false, // 保留失败作业以供检查
},
);
}
}
private matchesEventFilter(eventType: string, filters: string[]): boolean {
return filters.some((filter) => {
if (filter === "*") return true;
if (filter.endsWith(".*")) {
const prefix = filter.slice(0, -2);
return eventType.startsWith(prefix);
}
return eventType === filter;
});
}
}
死信队列:
class DeadLetterHandler {
constructor(
private db: Database,
private alertService: AlertService,
) {}
async handleFailedDelivery(delivery: WebhookDelivery): Promise<void> {
// 移动到死信队列
await this.db.deadLetterQueue.create({
id: generateId(),
deliveryId: delivery.id,
eventId: delivery.eventId,
endpointId: delivery.endpointId,
lastAttempt: new Date(),
totalAttempts: delivery.attempts,
lastError: delivery.responseBody,
lastStatus: delivery.responseStatus,
createdAt: new Date(),
});
// 对重复失败发出警报
const recentFailures = await this.db.deadLetterQueue.count({
endpointId: delivery.endpointId,
createdAt: { $gte: new Date(Date.now() - 3600000) }, // 最后一小时
});
if (recentFailures >= 10) {
await this.alertService.send({
severity: "warning",
title: "Webhook端点失败",
message: `端点 ${delivery.endpointId} 在过去一小时内有 ${recentFailures} 次失败`,
metadata: {
endpointId: delivery.endpointId,
url: delivery.url,
},
});
// 可选禁用端点
await this.disableEndpointIfNeeded(delivery.endpointId);
}
}
private async disableEndpointIfNeeded(endpointId: string): Promise<void> {
const failures24h = await this.db.deadLetterQueue.count({
endpointId,
createdAt: { $gte: new Date(Date.now() - 86400000) },
});
if (failures24h >= 100) {
await this.db.webhookEndpoints.update(endpointId, {
status: "disabled",
disabledReason: "太多连续失败",
});
}
}
}
Webhook监控和调试
交付仪表板数据:
interface WebhookMetrics {
endpointId: string;
period: "hour" | "day" | "week";
totalDeliveries: number;
successfulDeliveries: number;
failedDeliveries: number;
avgResponseTime: number;
p95ResponseTime: number;
successRate: number;
errorBreakdown: Record<number, number>; // 状态码 -> 计数
}
class WebhookMetricsService {
constructor(private db: Database) {}
async getMetrics(
endpointId: string,
period: "hour" | "day" | "week",
): Promise<WebhookMetrics> {
const since = this.getPeriodStart(period);
const deliveries = await this.db.deliveries.aggregate([
{
$match: {
endpointId,
createdAt: { $gte: since },
},
},
{
$group: {
_id: null,
total: { $sum: 1 },
successful: {
$sum: { $cond: [{ $eq: ["$status", "success"] }, 1, 0] },
},
failed: {
$sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] },
},
avgDuration: { $avg: "$duration" },
durations: { $push: "$duration" },
},
},
]);
const errorBreakdown = await this.db.deliveries.aggregate([
{
$match: {
endpointId,
createdAt: { $gte: since },
status: "failed",
},
},
{
$group: {
_id: "$responseStatus",
count: { $sum: 1 },
},
},
]);
const data = deliveries[0] || { total: 0, successful: 0, failed: 0 };
return {
endpointId,
period,
totalDeliveries: data.total,
successfulDeliveries: data.successful,
failedDeliveries: data.failed,
avgResponseTime: data.avgDuration || 0,
p95ResponseTime: this.calculateP95(data.durations || []),
successRate: data.total > 0 ? data.successful / data.total : 0,
errorBreakdown: Object.fromEntries(
errorBreakdown.map((e) => [e._id, e.count]),
),
};
}
private getPeriodStart(period: string): Date {
const now = new Date();
switch (period) {
case "hour":
return new Date(now.getTime() - 3600000);
case "day":
return new Date(now.getTime() - 86400000);
case "week":
return new Date(now.getTime() - 604800000);
default:
return now;
}
}
private calculateP95(values: number[]): number {
if (values.length === 0) return 0;
const sorted = values.sort((a, b) => a - b);
const index = Math.ceil(sorted.length * 0.95) - 1;
return sorted[index];
}
}
事件重放:
class WebhookReplayService {
constructor(
private db: Database,
private deliveryService: WebhookDeliveryService,
) {}
async replayEvent(eventId: string, endpointId?: string): Promise<void> {
const event = await this.db.events.findById(eventId);
if (!event) {
throw new Error(`事件未找到: ${eventId}`);
}
let endpoints: WebhookEndpoint[];
if (endpointId) {
const endpoint = await this.db.webhookEndpoints.findById(endpointId);
if (!endpoint) {
throw new Error(`端点未找到: ${endpointId}`);
}
endpoints = [endpoint];
} else {
endpoints = await this.db.webhookEndpoints.findByEventType(event.type);
}
for (const endpoint of endpoints) {
await this.deliveryService.deliver(endpoint, event);
}
}
async replayFailedDeliveries(
endpointId: string,
since: Date,
): Promise<number> {
const failedDeliveries = await this.db.deliveries.find({
endpointId,
status: "failed",
createdAt: { $gte: since },
});
for (const delivery of failedDeliveries) {
const event = await this.db.events.findById(delivery.eventId);
const endpoint = await this.db.webhookEndpoints.findById(endpointId);
if (event && endpoint) {
await this.deliveryService.deliver(endpoint, event);
}
}
return failedDeliveries.length;
}
}
最佳实践
安全(签名验证、认证)
核心原则:
- 始终使用HTTPS用于webhook URL
- 实现HMAC签名验证用于认证
- 在签名中包含时间戳以防止重放攻击
- 使用常量时间比较用于签名(时间安全)
- 定期轮换webhook密钥
- 在处理前验证负载结构
- 限制webhook端点速率以防止滥用
何时使用:
- 任何处理敏感数据的webhook接收器
- 需要源证明的系统
- 防止中间人攻击
- 确保消息完整性
可靠性(重试策略、交付保证)
核心原则:
- 实现带抖动的指数退避用于重试
- 使用幂等性键安全地处理重复
- 提供至少一次交付保证
- 异步排队webhook交付
- 实现死信队列用于持久失败
- 设置合理的超时限制(建议30秒)
- 跟踪交付尝试和最终状态
重试配置:
- 最大尝试:5(可配置)
- 初始延迟:1秒
- 最大延迟:1小时
- 可重试状态码:408, 429, 500, 502, 503, 504
- 添加0-25%随机抖动以防止雷霆万钧问题
何时使用:
- 需要保证事件交付的系统
- 具有网络不可靠性的分布式架构
- 面向客户的webhook集成
- 任何异步事件通知系统
幂等性(重复预防)
核心原则:
- 使用事件ID作为幂等性键
- 将处理状态存储在Redis/缓存中(24小时TTL)
- 为重复请求返回缓存结果
- 标记为处理中以防止竞态条件
- 清理失败处理尝试
何时使用:
- 支付处理webhooks
- 订单创建/更新
- 任何状态更改操作
- 具有重试逻辑的系统(防止双处理)
负载设计(事件结构)
核心原则:
- 在负载中包含所有必要数据(避免额外API调用)
- 版本化你的webhook负载(
apiVersion字段) - 保持负载大小合理(< 256KB)
- 使用一致的事件命名约定(
resource.action) - 包含事件ID用于去重
- 为更新事件提供
previousAttributes - 添加时间戳(Unix epoch)用于事件计时
事件命名模式:
order.created,order.updated,order.cancelledpayment.completed,payment.faileduser.registered,user.deleted- 支持通配符订阅:
order.*,*
何时使用:
- 设计新的webhook系统
- 改进webhook消费者体验
- 版本迁移用于破坏性更改
接收器实现(Webhook端点)
核心原则:
- 快速响应(< 5秒,理想< 1秒)
- 异步处理webhooks(先确认,后处理)
- 在处理前存储原始负载(用于重放/调试)
- 实现适当的错误处理
- 返回适当的状态码(200表示成功,4xx表示永久错误,5xx表示重试)
- 使用请求ID用于追踪
状态码指南:
- 200:成功接收并排队
- 400:错误请求(格式错误的负载,不会重试)
- 401:无效签名(认证失败,不会重试)
- 500:内部错误(发送者将重试)
- 503:服务暂时不可用(发送者将重试)
何时使用:
- 构建webhook接收器端点
- 与第三方webhooks集成(Stripe、GitHub等)
- 确保webhook端点可靠性
监控(可观察性、调试)
核心原则:
- 跟踪每个端点的交付成功率
- 对端点失败发出警报(超过阈值后禁用)
- 记录所有交付尝试及完整上下文
- 向客户提供webhook事件日志
- 实现重放功能用于失败事件
- 监控响应时间(平均、p95、p99)
- 显示交付指标的仪表板
关键指标:
- 总交付数(小时/天/周)
- 成功率百分比
- 平均响应时间
- P95响应时间
- 按状态码分解的错误
- 死信队列大小
何时使用:
- 操作生产webhook系统
- 调试客户集成问题
- SLA监控和警报
- 容量规划
示例
完整Webhook系统
// Webhook发送者服务
import express from "express";
import { Queue, Worker } from "bullmq";
import Redis from "ioredis";
const redis = new Redis(process.env.REDIS_URL);
const webhookQueue = new Queue("webhooks", { connection: redis });
// 事件发射器
async function emitEvent(type: string, data: any): Promise<void> {
const event: WebhookEvent = {
id: `evt_${generateId()}`,
type,
created: Math.floor(Date.now() / 1000),
apiVersion: "2024-01-01",
data: { object: data },
};
// 持久化事件
await db.events.create(event);
// 查找订阅的端点
const endpoints = await db.webhookEndpoints.find({
status: "active",
events: { $in: [type, "*", `${type.split(".")[0]}.*`] },
});
// 排队交付
for (const endpoint of endpoints) {
await webhookQueue.add("deliver", {
eventId: event.id,
endpointId: endpoint.id,
});
}
}
// 交付工作者
const worker = new Worker(
"webhooks",
async (job) => {
const { eventId, endpointId } = job.data;
const event = await db.events.findById(eventId);
const endpoint = await db.webhookEndpoints.findById(endpointId);
if (!event || !endpoint) return;
const payload = JSON.stringify(event);
const signer = new WebhookSigner(endpoint.secret);
const headers = signer.generateHeaders(payload);
const response = await fetch(endpoint.url, {
method: "POST",
headers,
body: payload,
timeout: 30000,
});
if (!response.ok) {
throw new Error(`Webhook交付失败: ${response.status}`);
}
},
{
connection: redis,
limiter: { max: 100, duration: 1000 },
},
);
// Webhook接收器
const app = express();
app.post("/webhooks", express.raw({ type: "application/json" }), (req, res) => {
const verifier = new WebhookVerifier(process.env.WEBHOOK_SECRET!);
try {
verifier.verify(
req.body.toString(),
req.headers["x-webhook-signature"] as string,
req.headers["x-webhook-timestamp"] as string,
);
} catch (error) {
return res.status(401).json({ error: "无效签名" });
}
const event = JSON.parse(req.body.toString()) as WebhookEvent;
// 快速确认
res.status(200).json({ received: true });
// 异步处理
processEventAsync(event).catch(console.error);
});
async function processEventAsync(event: WebhookEvent): Promise<void> {
// 检查幂等性
const processed = await redis.get(`processed:${event.id}`);
if (processed) return;
// 按类型处理事件
switch (event.type) {
case "order.created":
await handleOrderCreated(event.data.object);
break;
case "payment.completed":
await handlePaymentCompleted(event.data.object);
break;
}
// 标记为已处理
await redis.set(`processed:${event.id}`, "1", "EX", 86400);
}