名称: webhook-tester 描述: 使用隧道、检查和调试工具在本地测试webhook集成。
Webhook测试技能
使用隧道、检查和调试工具在本地测试webhook集成。
说明
您是一个webhook测试专家。当调用时:
-
本地Webhook测试:
- 设置本地webhook接收器
- 使用隧道将本地主机暴露到互联网
- 捕获和检查webhook负载
- 验证webhook签名
- 测试重试机制
-
调试Webhook:
- 检查请求头和请求体
- 验证webhook签名
- 测试不同的负载格式
- 模拟webhook失败
- 记录和重放webhook
-
集成测试:
- 测试webhook交付
- 验证幂等性
- 测试重试逻辑
- 验证错误处理
- 性能测试
-
安全验证:
- 验证签名验证
- 测试HTTPS要求
- 验证来源检查
- 测试重放攻击预防
使用示例
@webhook-tester
@webhook-tester --setup-tunnel
@webhook-tester --inspect
@webhook-tester --verify-signature
@webhook-tester --replay
隧道工具
ngrok(最流行)
基本设置
# 安装ngrok
# 从 https://ngrok.com/download 下载
# 或使用包管理器
brew install ngrok/ngrok/ngrok # macOS
choco install ngrok # Windows
# 认证(从ngrok.com获取令牌)
ngrok config add-authtoken 您的令牌
# 启动到localhost:3000的隧道
ngrok http 3000
# 自定义子域名(需要付费计划)
ngrok http 3000 --subdomain=myapp
# 多个端口
ngrok http 3000 3001
# 使用特定区域
ngrok http 3000 --region=us
# 启用检查UI
ngrok http 3000 --inspect=true
ngrok配置文件
# ~/.ngrok2/ngrok.yml
版本: "2"
认证令牌: 您的令牌
隧道:
api:
地址: 3000
协议: http
子域名: myapi
webhooks:
地址: 4000
协议: http
子域名: webhooks
web:
地址: 8080
协议: http
绑定_tls: true
# 启动所有隧道
ngrok start --all
# 启动特定隧道
ngrok start api
ngrok API
// 以编程方式使用ngrok
const ngrok = require('ngrok');
async function startTunnel() {
const url = await ngrok.connect({
addr: 3000,
region: 'us',
onStatusChange: status => console.log('状态:', status)
});
console.log('隧道URL:', url);
// 使用此URL作为webhook端点
return url;
}
// 清理
async function stopTunnel() {
await ngrok.disconnect();
await ngrok.kill();
}
Cloudflare隧道(免费,无需账户)
# 安装
brew install cloudflare/cloudflare/cloudflared # macOS
# 或从cloudflare.com下载
# 快速隧道(无需认证)
cloudflared tunnel --url http://localhost:3000
# 输出将是: https://随机-单词.trycloudflare.com
localtunnel
# 安装
npm install -g localtunnel
# 启动隧道
lt --port 3000
# 自定义子域名(可能不可用)
lt --port 3000 --subdomain myapp
# 以编程方式使用localtunnel
const localtunnel = require('localtunnel');
const tunnel = await localtunnel({ port: 3000 });
console.log('隧道URL:', tunnel.url);
tunnel.on('close', () => {
console.log('隧道已关闭');
});
VS Code端口转发
# 在VS Code中,使用GitHub账户
# 1. 打开终端
# 2. 点击“端口”选项卡
# 3. 点击“转发端口”
# 4. 输入端口号(例如,3000)
# 5. 分享公共URL
Webhook接收器设置
Express.js Webhook端点
const express = require('express');
const crypto = require('crypto');
const app = express();
// 用于签名验证的原始请求体解析器
app.use(express.json({
verify: (req, res, buf) => {
req.rawBody = buf.toString();
}
}));
// Webhook端点
app.post('/webhooks/github', (req, res) => {
console.log('收到来自GitHub的webhook');
console.log('请求头:', req.headers);
console.log('请求体:', req.body);
// 验证签名
const signature = req.headers['x-hub-signature-256'];
const secret = process.env.WEBHOOK_SECRET;
if (!verifyGitHubSignature(req.rawBody, signature, secret)) {
console.error('无效签名');
return res.status(401).send('无效签名');
}
// 处理webhook
const event = req.headers['x-github-event'];
handleGitHubEvent(event, req.body);
// 总是快速响应(GitHub期望在10秒内响应)
res.status(200).send('OK');
});
function verifyGitHubSignature(payload, signature, secret) {
if (!signature) return false;
const hmac = crypto.createHmac('sha256', secret);
const digest = 'sha256=' + hmac.update(payload).digest('hex');
return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(digest)
);
}
function handleGitHubEvent(event, payload) {
switch (event) {
case 'push':
console.log('推送事件:', payload.ref);
break;
case 'pull_request':
console.log('PR事件:', payload.action);
break;
default:
console.log('未处理事件:', event);
}
}
// Stripe webhook
app.post('/webhooks/stripe', (req, res) => {
const stripe = require('stripe')(process.env.STRIPE_SECRET_KEY);
const sig = req.headers['stripe-signature'];
let event;
try {
event = stripe.webhooks.constructEvent(
req.rawBody,
sig,
process.env.STRIPE_WEBHOOK_SECRET
);
} catch (err) {
console.error('Webhook签名验证失败:', err.message);
return res.status(400).send(`Webhook错误: ${err.message}`);
}
// 处理事件
switch (event.type) {
case 'payment_intent.succeeded':
const paymentIntent = event.data.object;
console.log('PaymentIntent成功:', paymentIntent.id);
break;
case 'payment_intent.failed':
console.log('PaymentIntent失败');
break;
default:
console.log(`未处理事件类型 ${event.type}`);
}
res.json({ received: true });
});
// 通用webhook记录器
app.post('/webhooks/:service', (req, res) => {
const { service } = req.params;
console.log(`
${'='.repeat(50)}`);
console.log(`Webhook已收到: ${service}`);
console.log(`时间戳: ${new Date().toISOString()}`);
console.log(`${'='.repeat(50)}`);
console.log('
请求头:');
Object.entries(req.headers).forEach(([key, value]) => {
console.log(` ${key}: ${value}`);
});
console.log('
请求体:');
console.log(JSON.stringify(req.body, null, 2));
console.log(`${'='.repeat(50)}
`);
res.status(200).json({ received: true });
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Webhook接收器监听端口 ${PORT}`);
});
Python Flask Webhook接收器
from flask import Flask, request, jsonify
import hmac
import hashlib
import os
app = Flask(__name__)
@app.route('/webhooks/github', methods=['POST'])
def github_webhook():
# 验证签名
signature = request.headers.get('X-Hub-Signature-256')
secret = os.getenv('WEBHOOK_SECRET')
if not verify_github_signature(request.data, signature, secret):
return '无效签名', 401
event = request.headers.get('X-GitHub-Event')
payload = request.json
print(f'收到 {event} 事件')
print(f'负载: {payload}')
# 处理事件
handle_github_event(event, payload)
return 'OK', 200
def verify_github_signature(payload, signature, secret):
if not signature:
return False
mac = hmac.new(
secret.encode(),
msg=payload,
digestmod=hashlib.sha256
)
expected = 'sha256=' + mac.hexdigest()
return hmac.compare_digest(expected, signature)
def handle_github_event(event, payload):
if event == 'push':
print(f"推送到 {payload['ref']}")
elif event == 'pull_request':
print(f"PR {payload['action']}")
@app.route('/webhooks/<service>', methods=['POST'])
def generic_webhook(service):
print(f'
{"=" * 50}')
print(f'Webhook已收到: {service}')
print(f'{"=" * 50}')
print('
请求头:')
for key, value in request.headers:
print(f' {key}: {value}')
print('
请求体:')
print(request.get_data(as_text=True))
return jsonify({'received': True}), 200
if __name__ == '__main__':
app.run(port=3000)
Webhook测试工具
webhook.site(在线工具)
# 1. 访问 https://webhook.site
# 2. 获取唯一URL(例如,https://webhook.site/abc-123)
# 3. 使用此URL作为webhook端点
# 4. 实时查看所有传入请求
# 功能:
# - 每个会话唯一URL
# - 查看请求头和请求体
# - 自定义响应配置
# - 请求历史
# - 与团队分享URL
Postman
// 1. 在Postman中创建模拟服务器
// 2. 添加webhook端点
// 3. 配置响应
// 4. 使用模拟URL作为webhook端点
// 示例模拟服务器响应
{
"statusCode": 200,
"body": {
"received": true,
"timestamp": "{{$timestamp}}"
}
}
Webhook测试CLI
// webhook-cli.js
const express = require('express');
const chalk = require('chalk');
class WebhookTester {
constructor(port = 3000) {
this.app = express();
this.port = port;
this.requests = [];
this.setupMiddleware();
this.setupRoutes();
}
setupMiddleware() {
this.app.use(express.json({
verify: (req, res, buf) => {
req.rawBody = buf.toString();
}
}));
}
setupRoutes() {
// 捕获所有webhook请求
this.app.all('/webhooks/*', (req, res) => {
const webhook = {
timestamp: new Date().toISOString(),
method: req.method,
path: req.path,
headers: req.headers,
body: req.body,
query: req.query
};
this.requests.push(webhook);
this.logWebhook(webhook);
res.status(200).json({ received: true });
});
}
logWebhook(webhook) {
console.log(chalk.blue('
' + '='.repeat(60)));
console.log(chalk.green('Webhook已收到'));
console.log(chalk.blue('='.repeat(60)));
console.log(chalk.yellow('
时间戳:'), webhook.timestamp);
console.log(chalk.yellow('方法:'), webhook.method);
console.log(chalk.yellow('路径:'), webhook.path);
console.log(chalk.yellow('
请求头:'));
Object.entries(webhook.headers).forEach(([key, value]) => {
console.log(` ${chalk.gray(key)}: ${value}`);
});
if (Object.keys(webhook.query).length > 0) {
console.log(chalk.yellow('
查询:'));
console.log(JSON.stringify(webhook.query, null, 2));
}
console.log(chalk.yellow('
请求体:'));
console.log(JSON.stringify(webhook.body, null, 2));
console.log(chalk.blue('='.repeat(60) + '
'));
}
start() {
this.app.listen(this.port, () => {
console.log(chalk.green(`
Webhook测试器运行在 http://localhost:${this.port}`));
console.log(chalk.gray('等待webhook...
'));
});
}
getHistory() {
return this.requests;
}
clearHistory() {
this.requests = [];
console.log(chalk.yellow('历史已清除'));
}
}
// 使用
const tester = new WebhookTester(3000);
tester.start();
测试Webhook签名
GitHub Webhook签名
const crypto = require('crypto');
function verifyGitHubWebhook(payload, signature, secret) {
if (!signature || !signature.startsWith('sha256=')) {
return false;
}
const hmac = crypto.createHmac('sha256', secret);
const digest = 'sha256=' + hmac.update(payload).digest('hex');
return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(digest)
);
}
// 测试
const payload = JSON.stringify({ test: 'data' });
const secret = 'my-webhook-secret';
const signature = 'sha256=' + crypto
.createHmac('sha256', secret)
.update(payload)
.digest('hex');
console.log('有效:', verifyGitHubWebhook(payload, signature, secret));
Stripe Webhook签名
const stripe = require('stripe')('sk_test_...');
app.post('/webhooks/stripe', async (req, res) => {
const sig = req.headers['stripe-signature'];
const webhookSecret = process.env.STRIPE_WEBHOOK_SECRET;
let event;
try {
event = stripe.webhooks.constructEvent(
req.rawBody,
sig,
webhookSecret
);
} catch (err) {
console.error('Webhook签名验证失败:', err.message);
return res.status(400).send(`Webhook错误: ${err.message}`);
}
// 处理事件
console.log('事件:', event.type);
res.json({ received: true });
});
Shopify HMAC验证
const crypto = require('crypto');
function verifyShopifyWebhook(body, hmacHeader, secret) {
const hash = crypto
.createHmac('sha256', secret)
.update(body, 'utf8')
.digest('base64');
return crypto.timingSafeEqual(
Buffer.from(hash),
Buffer.from(hmacHeader)
);
}
app.post('/webhooks/shopify', (req, res) => {
const hmac = req.headers['x-shopify-hmac-sha256'];
const secret = process.env.SHOPIFY_SECRET;
if (!verifyShopifyWebhook(req.rawBody, hmac, secret)) {
return res.status(401).send('无效签名');
}
res.status(200).send('OK');
});
自动化Webhook测试
Jest测试
const request = require('supertest');
const app = require('./app');
const crypto = require('crypto');
describe('Webhook测试', () => {
const webhookSecret = 'test-secret';
function generateSignature(payload) {
return 'sha256=' + crypto
.createHmac('sha256', webhookSecret)
.update(JSON.stringify(payload))
.digest('hex');
}
describe('POST /webhooks/github', () => {
test('应该接受有效的webhook', async () => {
const payload = {
ref: 'refs/heads/main',
commits: []
};
const signature = generateSignature(payload);
const response = await request(app)
.post('/webhooks/github')
.set('X-Hub-Signature-256', signature)
.set('X-GitHub-Event', 'push')
.send(payload);
expect(response.status).toBe(200);
});
test('应该拒绝无效签名', async () => {
const payload = { test: 'data' };
const response = await request(app)
.post('/webhooks/github')
.set('X-Hub-Signature-256', 'invalid')
.set('X-GitHub-Event', 'push')
.send(payload);
expect(response.status).toBe(401);
});
test('应该拒绝缺少签名', async () => {
const payload = { test: 'data' };
const response = await request(app)
.post('/webhooks/github')
.set('X-GitHub-Event', 'push')
.send(payload);
expect(response.status).toBe(401);
});
});
});
Webhook重放和调试
请求存储
const fs = require('fs').promises;
const path = require('path');
class WebhookStorage {
constructor(storageDir = './webhooks') {
this.storageDir = storageDir;
}
async saveWebhook(webhook) {
const filename = `${Date.now()}-${webhook.path.replace(/\//g, '-')}.json`;
const filepath = path.join(this.storageDir, filename);
await fs.mkdir(this.storageDir, { recursive: true });
await fs.writeFile(filepath, JSON.stringify(webhook, null, 2));
console.log('Webhook已保存:', filepath);
}
async loadWebhook(filename) {
const filepath = path.join(this.storageDir, filename);
const content = await fs.readFile(filepath, 'utf8');
return JSON.parse(content);
}
async replayWebhook(filename) {
const webhook = await this.loadWebhook(filename);
const response = await fetch(`http://localhost:3000${webhook.path}`, {
method: webhook.method,
headers: webhook.headers,
body: JSON.stringify(webhook.body)
});
console.log('已重放webhook:', filename);
console.log('响应:', response.status);
}
}
Webhook重试测试
重试模拟
app.post('/webhooks/test-retry', async (req, res) => {
const attemptNumber = parseInt(req.headers['x-attempt'] || '1');
const maxAttempts = 3;
console.log(`尝试 ${attemptNumber}/${maxAttempts}`);
// 前两次尝试失败
if (attemptNumber < maxAttempts) {
console.log('模拟失败');
return res.status(500).send('临时错误');
}
console.log('最后一次尝试成功');
res.status(200).send('OK');
});
// 重试逻辑(发送端)
async function sendWebhookWithRetry(url, payload, maxRetries = 3) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Attempt': attempt.toString()
},
body: JSON.stringify(payload)
});
if (response.ok) {
console.log(`Webhook在第 ${attempt} 次尝试时交付`);
return response;
}
console.log(`尝试 ${attempt} 失败: ${response.status}`);
} catch (error) {
console.log(`尝试 ${attempt} 错误:`, error.message);
}
// 指数退避
if (attempt < maxRetries) {
const delay = Math.pow(2, attempt) * 1000;
console.log(`等待 ${delay}ms 后重试...`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw new Error('所有webhook交付尝试失败');
}
最佳实践
Webhook接收器
- 快速响应(在10秒内)
- 总是对有效请求返回200
- 异步处理webhook
- 实现幂等性
- 验证签名
- 记录所有webhook用于调试
安全
- 总是验证webhook签名
- 使用HTTPS端点
- 验证webhook来源
- 实现速率限制
- 安全存储秘密
- 检查重放攻击
错误处理
- 优雅地处理缺失/无效签名
- 记录所有错误
- 实现带指数退避的重试逻辑
- 对重复失败发出警报
- 监控webhook健康状态
测试
- 测试签名验证
- 模拟失败和重试
- 测试幂等性
- 验证错误处理
- 负载测试webhook端点
- 使用真实负载测试
常见Webhook提供商
GitHub
签名头: X-Hub-Signature-256
事件头: X-GitHub-Event
算法: HMAC SHA-256
Stripe
签名头: Stripe-Signature
算法: HMAC SHA-256(特殊格式)
测试模式: 使用Stripe CLI
Shopify
签名头: X-Shopify-Hmac-SHA256
算法: HMAC SHA-256(base64)
主题头: X-Shopify-Topic
Twilio
签名头: X-Twilio-Signature
算法: HMAC SHA-1
验证: 特殊URL + 参数
Slack
签名头: X-Slack-Signature
时间戳头: X-Slack-Request-Timestamp
算法: HMAC SHA-256
注释
- 使用隧道工具(ngrok、cloudflared)进行本地测试
- 在生产中总是验证webhook签名
- 快速响应webhook(< 10秒)
- 异步处理webhook
- 使用webhook ID实现幂等性
- 记录所有webhook用于调试
- 测试重试机制
- 监控webhook交付失败
- 在开发中使用webhook测试工具
- 安全存储webhook秘密
- 实现适当的错误处理
- 使用提供商真实负载测试