Webhook 集成
概述
实现健壮的 Webhook 系统,用于事件驱动架构,实现服务和第三方集成之间的实时通信。
何时使用
- 第三方服务集成(Stripe, GitHub, Shopify)
- 事件通知系统
- 实时数据同步
- 自动化工作流触发器
- 支付处理回调
- CI/CD 管道通知
- 用户活动跟踪
- 微服务通信
Webhook 架构
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 事件 │────────▶│ Webhook │────────▶│ 消费者 │
│ 源 │ │ 发送者 │ │ 终端 │
└──────────┘ └──────────┘ └──────────┘
│
▼
┌──────────────┐
│ 重试队列 │
│ (失败) │
└──────────────┘
实现示例
1. Webhook 发送者 (TypeScript)
import crypto from 'crypto';
import axios from 'axios';
interface WebhookEvent {
id: string;
type: string;
timestamp: number;
data: any;
}
interface WebhookEndpoint {
url: string;
secret: string;
events: string[];
active: boolean;
}
interface DeliveryAttempt {
attemptNumber: number;
timestamp: number;
statusCode?: number;
error?: string;
duration: number;
}
class WebhookSender {
private maxRetries = 3;
private retryDelays = [1000, 5000, 30000]; // 指数退避
private timeout = 10000; // 10 秒
/**
* 为 Webhook 负载生成 HMAC 签名
*/
private generateSignature(
payload: string,
secret: string
): string {
return crypto
.createHmac('sha256', secret)
.update(payload)
.digest('hex');
}
/**
* 发送 Webhook 到终端
*/
async send(
endpoint: WebhookEndpoint,
event: WebhookEvent
): Promise<DeliveryAttempt[]> {
if (!endpoint.active) {
throw new Error('终端不活跃');
}
if (!endpoint.events.includes(event.type)) {
throw new Error(`事件类型 ${event.type} 未订阅`);
}
const payload = JSON.stringify(event);
const signature = this.generateSignature(payload, endpoint.secret);
const attempts: DeliveryAttempt[] = [];
for (let attempt = 0; attempt < this.maxRetries; attempt++) {
const startTime = Date.now();
try {
const response = await axios.post(endpoint.url, payload, {
headers: {
'Content-Type': 'application/json',
'X-Webhook-Signature': signature,
'X-Webhook-ID': event.id,
'X-Webhook-Timestamp': event.timestamp.toString(),
'User-Agent': 'WebhookService/1.0'
},
timeout: this.timeout,
validateStatus: (status) => status >= 200 && status < 300
});
const duration = Date.now() - startTime;
attempts.push({
attemptNumber: attempt + 1,
timestamp: Date.now(),
statusCode: response.status,
duration
});
console.log(
`Webhook 成功投递到 ${endpoint.url} (尝试 ${attempt + 1})`
);
return attempts;
} catch (error: any) {
const duration = Date.now() - startTime;
attempts.push({
attemptNumber: attempt + 1,
timestamp: Date.now(),
statusCode: error.response?.status,
error: error.message,
duration
});
console.error(
`Webhook 投递失败到 ${endpoint.url} (尝试 ${attempt + 1}):`,
error.message
);
// 等待重试(最后尝试除外)
if (attempt < this.maxRetries - 1) {
await this.delay(this.retryDelays[attempt]);
}
}
}
throw new Error(
`Webhook 投递失败,尝试 ${this.maxRetries} 次后`
);
}
/**
* 批量发送 Webhooks
*/
async sendBatch(
endpoints: WebhookEndpoint[],
event: WebhookEvent
): Promise<Map<string, DeliveryAttempt[]>> {
const results = new Map<string, DeliveryAttempt[]>();
await Promise.allSettled(
endpoints.map(async (endpoint) => {
try {
const attempts = await this.send(endpoint, event);
results.set(endpoint.url, attempts);
} catch (error) {
console.error(`无法投递到 ${endpoint.url}:`, error);
}
})
);
return results;
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 使用
const sender = new WebhookSender();
const endpoint: WebhookEndpoint = {
url: 'https://api.example.com/webhooks',
secret: 'your-webhook-secret',
events: ['user.created', 'user.updated'],
active: true
};
const event: WebhookEvent = {
id: crypto.randomUUID(),
type: 'user.created',
timestamp: Date.now(),
data: {
userId: '123',
email: 'user@example.com'
}
};
await sender.send(endpoint, event);
2. Webhook 接收器 (Express)
import express from 'express';
import crypto from 'crypto';
import { body, validationResult } from 'express-validator';
interface WebhookConfig {
secret: string;
signatureHeader: string;
timestampTolerance: number; // 秒
}
class WebhookReceiver {
constructor(private config: WebhookConfig) {}
/**
* 验证 Webhook 签名
*/
verifySignature(
payload: string,
signature: string
): boolean {
const expectedSignature = crypto
.createHmac('sha256', this.config.secret)
.update(payload)
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(expectedSignature)
);
}
/**
* 验证时间戳以防止重放攻击
*/
verifyTimestamp(timestamp: number): boolean {
const now = Date.now();
const diff = Math.abs(now - timestamp) / 1000;
return diff <= this.config.timestampTolerance;
}
/**
* Webhook 验证中间件
*/
createMiddleware() {
return async (
req: express.Request,
res: express.Response,
next: express.NextFunction
) => {
try {
const signature = req.headers[this.config.signatureHeader] as string;
const timestamp = parseInt(
req.headers['x-webhook-timestamp'] as string
);
if (!signature) {
return res.status(401).json({
error: '缺少签名'
});
}
// 验证时间戳
if (!this.verifyTimestamp(timestamp)) {
return res.status(401).json({
error: '无效的时间戳'
});
}
// 获取原始正文以验证签名
const payload = JSON.stringify(req.body);
// 验证签名
if (!this.verifySignature(payload, signature)) {
return res.status(401).json({
error: '无效的签名'
});
}
next();
} catch (error) {
console.error('Webhook 验证错误:', error);
res.status(500).json({
error: '验证失败'
});
}
};
}
}
// 设置 Express 应用
const app = express();
// 使用原始正文解析器进行签名验证
app.use(express.json({
verify: (req: any, res, buf) => {
req.rawBody = buf.toString();
}
}));
const receiver = new WebhookReceiver({
secret: process.env.WEBHOOK_SECRET!,
signatureHeader: 'x-webhook-signature',
timestampTolerance: 300 // 5 分钟
});
// Webhook 终端
app.post('/webhooks',
receiver.createMiddleware(),
[
body('id').isString(),
body('type').isString(),
body('data').isObject()
],
async (req, res) => {
// 验证请求
const errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({ errors: errors.array() });
}
const { id, type, data } = req.body;
try {
// 处理 Webhook 事件
await processWebhookEvent(type, data);
// 立即响应
res.status(200).json({
received: true,
eventId: id
});
// 如果需要,异步处理
processEventAsync(type, data).catch(console.error);
} catch (error) {
console.error('Webhook 处理错误:', error);
res.status(500).json({
error: '处理失败'
});
}
}
);
async function processWebhookEvent(type: string, data: any): Promise<void> {
switch (type) {
case 'user.created':
await handleUserCreated(data);
break;
case 'payment.success':
await handlePaymentSuccess(data);
break;
default:
console.log(`未知事件类型: ${type}`);
}
}
async function processEventAsync(type: string, data: any): Promise<void> {
// 重的不需要阻塞响应的处理
}
async function handleUserCreated(data: any): Promise<void> {
console.log('用户创建:', data);
}
async function handlePaymentSuccess(data: any): Promise<void> {
console.log('支付成功:', data);
}
app.listen(3000, () => {
console.log('Webhook 接收器在端口 3000 上监听');
});
3. Bull 的 Webhook 队列
import Queue from 'bull';
import axios from 'axios';
interface WebhookJob {
endpoint: WebhookEndpoint;
event: WebhookEvent;
}
class WebhookQueue {
private queue: Queue.Queue<WebhookJob>;
constructor(redisUrl: string) {
this.queue = new Queue('webhooks', redisUrl, {
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: 100,
removeOnFail: 1000
}
});
this.setupProcessors();
this.setupEventHandlers();
}
private setupProcessors(): void {
// 处理 Webhook 投递
this.queue.process('delivery', 5, async (job) => {
const { endpoint, event } = job.data;
job.log(`向 ${endpoint.url} 投递 Webhook`);
const sender = new WebhookSender();
const attempts = await sender.send(endpoint, event);
return {
endpoint: endpoint.url,
attempts,
success: true
};
});
}
private setupEventHandlers(): void {
this.queue.on('completed', (job, result) => {
console.log(`Webhook 投递: ${job.id}`, result);
});
this.queue.on('failed', (job, err) => {
console.error(`Webhook 投递失败: ${job?.id}`, err);
});
this.queue.on('stalled', (job) => {
console.warn(`Webhook 投递停滞: ${job.id}`);
});
}
async enqueue(
endpoint: WebhookEndpoint,
event: WebhookEvent,
options?: Queue.JobOptions
): Promise<Queue.Job<WebhookJob>> {
return this.queue.add(
'delivery',
{ endpoint, event },
{
jobId: `${event.id}-${endpoint.url}`,
...options
}
);
}
async enqueueBatch(
endpoints: WebhookEndpoint[],
event: WebhookEvent
): Promise<Queue.Job<WebhookJob>[]> {
const jobs = endpoints.map(endpoint => ({
name: 'delivery',
data: { endpoint, event },
opts: {
jobId: `${event.id}-${endpoint.url}`
}
}));
return this.queue.addBulk(jobs);
}
async getJobStatus(jobId: string): Promise<any> {
const job = await this.queue.getJob(jobId);
if (!job) return null;
return {
id: job.id,
state: await job.getState(),
progress: job.progress(),
attempts: job.attemptsMade,
failedReason: job.failedReason,
finishedOn: job.finishedOn,
processedOn: job.processedOn
};
}
async retryFailed(jobId: string): Promise<void> {
const job = await this.queue.getJob(jobId);
if (!job) {
throw new Error('工作未找到');
}
await job.retry();
}
async pause(): Promise<void> {
await this.queue.pause();
}
async resume(): Promise<void> {
await this.queue.resume();
}
async close(): Promise<void> {
await this.queue.close();
}
}
// 使用
const webhookQueue = new WebhookQueue('redis://localhost:6379');
// 将单个 Webhook 入队
await webhookQueue.enqueue(endpoint, event, {
delay: 1000, // 延迟 1 秒
priority: 1
});
// 向多个终端入队
await webhookQueue.enqueueBatch(endpoints, event);
// 检查工作状态
const status = await webhookQueue.getJobStatus('job-id');
console.log('工作状态:', status);
4. Webhook 测试工具
import express from 'express';
import crypto from 'crypto';
class WebhookTester {
private app: express.Application;
private receivedEvents: WebhookEvent[] = [];
constructor() {
this.app = express();
this.setupTestEndpoint();
}
private setupTestEndpoint(): void {
this.app.use(express.json());
this.app.post('/test-webhook', (req, res) => {
const event = req.body;
// 如果提供,验证签名
const signature = req.headers['x-webhook-signature'] as string;
if (signature) {
// 在这里验证签名
}
// 存储接收到的事件
this.receivedEvents.push(event);
console.log('接收到 Webhook:', event);
// 根据测试场景响应
res.status(200).json({
received: true,
eventId: event.id
});
});
// 模拟失败的终端
this.app.post('/test-webhook/fail', (req, res) => {
const failureType = req.query.type;
switch (failureType) {
case 'timeout':
// 不响应(模拟超时)
break;
case 'server-error':
res.status(500).json({ error: '内部服务器错误' });
break;
case 'unauthorized':
res.status(401).json({ error: '未授权' });
break;
default:
res.status(400).json({ error: '请求错误' });
}
});
}
start(port: number): void {
this.app.listen(port, () => {
console.log(`Webhook 测试服务器在端口 ${port} 上运行`);
});
}
getReceivedEvents(): WebhookEvent[] {
return this.receivedEvents;
}
clearEvents(): void {
this.receivedEvents = [];
}
/**
* 创建模拟 Webhook 事件
*/
static createMockEvent(type: string, data: any): WebhookEvent {
return {
id: crypto.randomUUID(),
type,
timestamp: Date.now(),
data
};
}
}
// 测试
const tester = new WebhookTester();
tester.start(3001);
// 发送测试 Webhook
const mockEvent = WebhookTester.createMockEvent('user.created', {
userId: '123',
email: 'test@example.com'
});
const sender = new WebhookSender();
await sender.send(
{
url: 'http://localhost:3001/test-webhook',
secret: 'test-secret',
events: ['user.created'],
active: true
},
mockEvent
);
// 验证接收到的
const received = tester.getReceivedEvents();
console.log('接收到的事件:', received);
最佳实践
✅ 要做
- 使用 HMAC 签名进行验证
- 通过事件 ID 实现幂等性
- 快速返回 200 OK,异步处理
- 实现重试的指数退避
- 包括时间戳以防止重放攻击
- 使用队列系统确保可靠投递
- 记录所有投递尝试
- 提供 Webhook 测试工具
- 文档化 Webhook 负载模式
- 实现 Webhook 管理 UI
- 允许按事件类型过滤
- 支持 Webhook 版本控制
❌ 不要做
- 在 Webhooks 中发送敏感数据
- 跳过签名验证
- 用重处理阻塞响应
- 无限期重试
- 暴露内部错误细节
- 在生产中发送到本地主机的 Webhooks
- 忘记超时处理
- 跳过速率限制
安全清单
- [ ] 使用 HMAC 验证签名
- [ ] 检查时间戳以防止重放攻击
- [ ] 验证 SSL 证书
- [ ] 仅使用 HTTPS
- [ ] 实施速率限制
- [ ] 验证 Webhook URL
- [ ] 定期轮换密钥
- [ ] 记录安全事件
- [ ] 实施 IP 白名单(可选)
- [ ] 清理错误消息