Webhook测试技能Skill webhook-tester

Webhook测试技能用于本地测试webhook集成,通过隧道工具将本地服务器暴露到互联网,捕获和检查webhook负载,验证签名,测试重试机制,并提供调试和集成测试功能。关键词:webhook测试、本地调试、隧道工具、签名验证、重试测试、集成测试、安全验证。

测试 0 次安装 0 次浏览 更新于 3/11/2026

名称: webhook-tester 描述: 使用隧道、检查和调试工具在本地测试webhook集成。

Webhook测试技能

使用隧道、检查和调试工具在本地测试webhook集成。

说明

您是一个webhook测试专家。当调用时:

  1. 本地Webhook测试

    • 设置本地webhook接收器
    • 使用隧道将本地主机暴露到互联网
    • 捕获和检查webhook负载
    • 验证webhook签名
    • 测试重试机制
  2. 调试Webhook

    • 检查请求头和请求体
    • 验证webhook签名
    • 测试不同的负载格式
    • 模拟webhook失败
    • 记录和重放webhook
  3. 集成测试

    • 测试webhook交付
    • 验证幂等性
    • 测试重试逻辑
    • 验证错误处理
    • 性能测试
  4. 安全验证

    • 验证签名验证
    • 测试HTTPS要求
    • 验证来源检查
    • 测试重放攻击预防

使用示例

@webhook-tester
@webhook-tester --setup-tunnel
@webhook-tester --inspect
@webhook-tester --verify-signature
@webhook-tester --replay

隧道工具

ngrok(最流行)

基本设置

# 安装ngrok
# 从 https://ngrok.com/download 下载
# 或使用包管理器
brew install ngrok/ngrok/ngrok  # macOS
choco install ngrok             # Windows

# 认证(从ngrok.com获取令牌)
ngrok config add-authtoken 您的令牌

# 启动到localhost:3000的隧道
ngrok http 3000

# 自定义子域名(需要付费计划)
ngrok http 3000 --subdomain=myapp

# 多个端口
ngrok http 3000 3001

# 使用特定区域
ngrok http 3000 --region=us

# 启用检查UI
ngrok http 3000 --inspect=true

ngrok配置文件

# ~/.ngrok2/ngrok.yml
版本: "2"
认证令牌: 您的令牌

隧道:
  api:
    地址: 3000
    协议: http
    子域名: myapi

  webhooks:
    地址: 4000
    协议: http
    子域名: webhooks

  web:
    地址: 8080
    协议: http
    绑定_tls: true

# 启动所有隧道
ngrok start --all

# 启动特定隧道
ngrok start api

ngrok API

// 以编程方式使用ngrok
const ngrok = require('ngrok');

async function startTunnel() {
  const url = await ngrok.connect({
    addr: 3000,
    region: 'us',
    onStatusChange: status => console.log('状态:', status)
  });

  console.log('隧道URL:', url);
  // 使用此URL作为webhook端点
  return url;
}

// 清理
async function stopTunnel() {
  await ngrok.disconnect();
  await ngrok.kill();
}

Cloudflare隧道(免费,无需账户)

# 安装
brew install cloudflare/cloudflare/cloudflared  # macOS
# 或从cloudflare.com下载

# 快速隧道(无需认证)
cloudflared tunnel --url http://localhost:3000

# 输出将是: https://随机-单词.trycloudflare.com

localtunnel

# 安装
npm install -g localtunnel

# 启动隧道
lt --port 3000

# 自定义子域名(可能不可用)
lt --port 3000 --subdomain myapp

# 以编程方式使用localtunnel
const localtunnel = require('localtunnel');

const tunnel = await localtunnel({ port: 3000 });
console.log('隧道URL:', tunnel.url);

tunnel.on('close', () => {
  console.log('隧道已关闭');
});

VS Code端口转发

# 在VS Code中,使用GitHub账户
# 1. 打开终端
# 2. 点击“端口”选项卡
# 3. 点击“转发端口”
# 4. 输入端口号(例如,3000)
# 5. 分享公共URL

Webhook接收器设置

Express.js Webhook端点

const express = require('express');
const crypto = require('crypto');

const app = express();

// 用于签名验证的原始请求体解析器
app.use(express.json({
  verify: (req, res, buf) => {
    req.rawBody = buf.toString();
  }
}));

// Webhook端点
app.post('/webhooks/github', (req, res) => {
  console.log('收到来自GitHub的webhook');
  console.log('请求头:', req.headers);
  console.log('请求体:', req.body);

  // 验证签名
  const signature = req.headers['x-hub-signature-256'];
  const secret = process.env.WEBHOOK_SECRET;

  if (!verifyGitHubSignature(req.rawBody, signature, secret)) {
    console.error('无效签名');
    return res.status(401).send('无效签名');
  }

  // 处理webhook
  const event = req.headers['x-github-event'];
  handleGitHubEvent(event, req.body);

  // 总是快速响应(GitHub期望在10秒内响应)
  res.status(200).send('OK');
});

function verifyGitHubSignature(payload, signature, secret) {
  if (!signature) return false;

  const hmac = crypto.createHmac('sha256', secret);
  const digest = 'sha256=' + hmac.update(payload).digest('hex');

  return crypto.timingSafeEqual(
    Buffer.from(signature),
    Buffer.from(digest)
  );
}

function handleGitHubEvent(event, payload) {
  switch (event) {
    case 'push':
      console.log('推送事件:', payload.ref);
      break;
    case 'pull_request':
      console.log('PR事件:', payload.action);
      break;
    default:
      console.log('未处理事件:', event);
  }
}

// Stripe webhook
app.post('/webhooks/stripe', (req, res) => {
  const stripe = require('stripe')(process.env.STRIPE_SECRET_KEY);
  const sig = req.headers['stripe-signature'];

  let event;
  try {
    event = stripe.webhooks.constructEvent(
      req.rawBody,
      sig,
      process.env.STRIPE_WEBHOOK_SECRET
    );
  } catch (err) {
    console.error('Webhook签名验证失败:', err.message);
    return res.status(400).send(`Webhook错误: ${err.message}`);
  }

  // 处理事件
  switch (event.type) {
    case 'payment_intent.succeeded':
      const paymentIntent = event.data.object;
      console.log('PaymentIntent成功:', paymentIntent.id);
      break;
    case 'payment_intent.failed':
      console.log('PaymentIntent失败');
      break;
    default:
      console.log(`未处理事件类型 ${event.type}`);
  }

  res.json({ received: true });
});

// 通用webhook记录器
app.post('/webhooks/:service', (req, res) => {
  const { service } = req.params;

  console.log(`
${'='.repeat(50)}`);
  console.log(`Webhook已收到: ${service}`);
  console.log(`时间戳: ${new Date().toISOString()}`);
  console.log(`${'='.repeat(50)}`);

  console.log('
请求头:');
  Object.entries(req.headers).forEach(([key, value]) => {
    console.log(`  ${key}: ${value}`);
  });

  console.log('
请求体:');
  console.log(JSON.stringify(req.body, null, 2));

  console.log(`${'='.repeat(50)}
`);

  res.status(200).json({ received: true });
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Webhook接收器监听端口 ${PORT}`);
});

Python Flask Webhook接收器

from flask import Flask, request, jsonify
import hmac
import hashlib
import os

app = Flask(__name__)

@app.route('/webhooks/github', methods=['POST'])
def github_webhook():
    # 验证签名
    signature = request.headers.get('X-Hub-Signature-256')
    secret = os.getenv('WEBHOOK_SECRET')

    if not verify_github_signature(request.data, signature, secret):
        return '无效签名', 401

    event = request.headers.get('X-GitHub-Event')
    payload = request.json

    print(f'收到 {event} 事件')
    print(f'负载: {payload}')

    # 处理事件
    handle_github_event(event, payload)

    return 'OK', 200

def verify_github_signature(payload, signature, secret):
    if not signature:
        return False

    mac = hmac.new(
        secret.encode(),
        msg=payload,
        digestmod=hashlib.sha256
    )
    expected = 'sha256=' + mac.hexdigest()

    return hmac.compare_digest(expected, signature)

def handle_github_event(event, payload):
    if event == 'push':
        print(f"推送到 {payload['ref']}")
    elif event == 'pull_request':
        print(f"PR {payload['action']}")

@app.route('/webhooks/<service>', methods=['POST'])
def generic_webhook(service):
    print(f'
{"=" * 50}')
    print(f'Webhook已收到: {service}')
    print(f'{"=" * 50}')

    print('
请求头:')
    for key, value in request.headers:
        print(f'  {key}: {value}')

    print('
请求体:')
    print(request.get_data(as_text=True))

    return jsonify({'received': True}), 200

if __name__ == '__main__':
    app.run(port=3000)

Webhook测试工具

webhook.site(在线工具)

# 1. 访问 https://webhook.site
# 2. 获取唯一URL(例如,https://webhook.site/abc-123)
# 3. 使用此URL作为webhook端点
# 4. 实时查看所有传入请求

# 功能:
# - 每个会话唯一URL
# - 查看请求头和请求体
# - 自定义响应配置
# - 请求历史
# - 与团队分享URL

Postman

// 1. 在Postman中创建模拟服务器
// 2. 添加webhook端点
// 3. 配置响应
// 4. 使用模拟URL作为webhook端点

// 示例模拟服务器响应
{
  "statusCode": 200,
  "body": {
    "received": true,
    "timestamp": "{{$timestamp}}"
  }
}

Webhook测试CLI

// webhook-cli.js
const express = require('express');
const chalk = require('chalk');

class WebhookTester {
  constructor(port = 3000) {
    this.app = express();
    this.port = port;
    this.requests = [];

    this.setupMiddleware();
    this.setupRoutes();
  }

  setupMiddleware() {
    this.app.use(express.json({
      verify: (req, res, buf) => {
        req.rawBody = buf.toString();
      }
    }));
  }

  setupRoutes() {
    // 捕获所有webhook请求
    this.app.all('/webhooks/*', (req, res) => {
      const webhook = {
        timestamp: new Date().toISOString(),
        method: req.method,
        path: req.path,
        headers: req.headers,
        body: req.body,
        query: req.query
      };

      this.requests.push(webhook);
      this.logWebhook(webhook);

      res.status(200).json({ received: true });
    });
  }

  logWebhook(webhook) {
    console.log(chalk.blue('
' + '='.repeat(60)));
    console.log(chalk.green('Webhook已收到'));
    console.log(chalk.blue('='.repeat(60)));

    console.log(chalk.yellow('
时间戳:'), webhook.timestamp);
    console.log(chalk.yellow('方法:'), webhook.method);
    console.log(chalk.yellow('路径:'), webhook.path);

    console.log(chalk.yellow('
请求头:'));
    Object.entries(webhook.headers).forEach(([key, value]) => {
      console.log(`  ${chalk.gray(key)}: ${value}`);
    });

    if (Object.keys(webhook.query).length > 0) {
      console.log(chalk.yellow('
查询:'));
      console.log(JSON.stringify(webhook.query, null, 2));
    }

    console.log(chalk.yellow('
请求体:'));
    console.log(JSON.stringify(webhook.body, null, 2));

    console.log(chalk.blue('='.repeat(60) + '
'));
  }

  start() {
    this.app.listen(this.port, () => {
      console.log(chalk.green(`
Webhook测试器运行在 http://localhost:${this.port}`));
      console.log(chalk.gray('等待webhook...
'));
    });
  }

  getHistory() {
    return this.requests;
  }

  clearHistory() {
    this.requests = [];
    console.log(chalk.yellow('历史已清除'));
  }
}

// 使用
const tester = new WebhookTester(3000);
tester.start();

测试Webhook签名

GitHub Webhook签名

const crypto = require('crypto');

function verifyGitHubWebhook(payload, signature, secret) {
  if (!signature || !signature.startsWith('sha256=')) {
    return false;
  }

  const hmac = crypto.createHmac('sha256', secret);
  const digest = 'sha256=' + hmac.update(payload).digest('hex');

  return crypto.timingSafeEqual(
    Buffer.from(signature),
    Buffer.from(digest)
  );
}

// 测试
const payload = JSON.stringify({ test: 'data' });
const secret = 'my-webhook-secret';
const signature = 'sha256=' + crypto
  .createHmac('sha256', secret)
  .update(payload)
  .digest('hex');

console.log('有效:', verifyGitHubWebhook(payload, signature, secret));

Stripe Webhook签名

const stripe = require('stripe')('sk_test_...');

app.post('/webhooks/stripe', async (req, res) => {
  const sig = req.headers['stripe-signature'];
  const webhookSecret = process.env.STRIPE_WEBHOOK_SECRET;

  let event;

  try {
    event = stripe.webhooks.constructEvent(
      req.rawBody,
      sig,
      webhookSecret
    );
  } catch (err) {
    console.error('Webhook签名验证失败:', err.message);
    return res.status(400).send(`Webhook错误: ${err.message}`);
  }

  // 处理事件
  console.log('事件:', event.type);
  res.json({ received: true });
});

Shopify HMAC验证

const crypto = require('crypto');

function verifyShopifyWebhook(body, hmacHeader, secret) {
  const hash = crypto
    .createHmac('sha256', secret)
    .update(body, 'utf8')
    .digest('base64');

  return crypto.timingSafeEqual(
    Buffer.from(hash),
    Buffer.from(hmacHeader)
  );
}

app.post('/webhooks/shopify', (req, res) => {
  const hmac = req.headers['x-shopify-hmac-sha256'];
  const secret = process.env.SHOPIFY_SECRET;

  if (!verifyShopifyWebhook(req.rawBody, hmac, secret)) {
    return res.status(401).send('无效签名');
  }

  res.status(200).send('OK');
});

自动化Webhook测试

Jest测试

const request = require('supertest');
const app = require('./app');
const crypto = require('crypto');

describe('Webhook测试', () => {
  const webhookSecret = 'test-secret';

  function generateSignature(payload) {
    return 'sha256=' + crypto
      .createHmac('sha256', webhookSecret)
      .update(JSON.stringify(payload))
      .digest('hex');
  }

  describe('POST /webhooks/github', () => {
    test('应该接受有效的webhook', async () => {
      const payload = {
        ref: 'refs/heads/main',
        commits: []
      };

      const signature = generateSignature(payload);

      const response = await request(app)
        .post('/webhooks/github')
        .set('X-Hub-Signature-256', signature)
        .set('X-GitHub-Event', 'push')
        .send(payload);

      expect(response.status).toBe(200);
    });

    test('应该拒绝无效签名', async () => {
      const payload = { test: 'data' };

      const response = await request(app)
        .post('/webhooks/github')
        .set('X-Hub-Signature-256', 'invalid')
        .set('X-GitHub-Event', 'push')
        .send(payload);

      expect(response.status).toBe(401);
    });

    test('应该拒绝缺少签名', async () => {
      const payload = { test: 'data' };

      const response = await request(app)
        .post('/webhooks/github')
        .set('X-GitHub-Event', 'push')
        .send(payload);

      expect(response.status).toBe(401);
    });
  });
});

Webhook重放和调试

请求存储

const fs = require('fs').promises;
const path = require('path');

class WebhookStorage {
  constructor(storageDir = './webhooks') {
    this.storageDir = storageDir;
  }

  async saveWebhook(webhook) {
    const filename = `${Date.now()}-${webhook.path.replace(/\//g, '-')}.json`;
    const filepath = path.join(this.storageDir, filename);

    await fs.mkdir(this.storageDir, { recursive: true });
    await fs.writeFile(filepath, JSON.stringify(webhook, null, 2));

    console.log('Webhook已保存:', filepath);
  }

  async loadWebhook(filename) {
    const filepath = path.join(this.storageDir, filename);
    const content = await fs.readFile(filepath, 'utf8');
    return JSON.parse(content);
  }

  async replayWebhook(filename) {
    const webhook = await this.loadWebhook(filename);

    const response = await fetch(`http://localhost:3000${webhook.path}`, {
      method: webhook.method,
      headers: webhook.headers,
      body: JSON.stringify(webhook.body)
    });

    console.log('已重放webhook:', filename);
    console.log('响应:', response.status);
  }
}

Webhook重试测试

重试模拟

app.post('/webhooks/test-retry', async (req, res) => {
  const attemptNumber = parseInt(req.headers['x-attempt'] || '1');
  const maxAttempts = 3;

  console.log(`尝试 ${attemptNumber}/${maxAttempts}`);

  // 前两次尝试失败
  if (attemptNumber < maxAttempts) {
    console.log('模拟失败');
    return res.status(500).send('临时错误');
  }

  console.log('最后一次尝试成功');
  res.status(200).send('OK');
});

// 重试逻辑(发送端)
async function sendWebhookWithRetry(url, payload, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-Attempt': attempt.toString()
        },
        body: JSON.stringify(payload)
      });

      if (response.ok) {
        console.log(`Webhook在第 ${attempt} 次尝试时交付`);
        return response;
      }

      console.log(`尝试 ${attempt} 失败: ${response.status}`);
    } catch (error) {
      console.log(`尝试 ${attempt} 错误:`, error.message);
    }

    // 指数退避
    if (attempt < maxRetries) {
      const delay = Math.pow(2, attempt) * 1000;
      console.log(`等待 ${delay}ms 后重试...`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }

  throw new Error('所有webhook交付尝试失败');
}

最佳实践

Webhook接收器

  • 快速响应(在10秒内)
  • 总是对有效请求返回200
  • 异步处理webhook
  • 实现幂等性
  • 验证签名
  • 记录所有webhook用于调试

安全

  • 总是验证webhook签名
  • 使用HTTPS端点
  • 验证webhook来源
  • 实现速率限制
  • 安全存储秘密
  • 检查重放攻击

错误处理

  • 优雅地处理缺失/无效签名
  • 记录所有错误
  • 实现带指数退避的重试逻辑
  • 对重复失败发出警报
  • 监控webhook健康状态

测试

  • 测试签名验证
  • 模拟失败和重试
  • 测试幂等性
  • 验证错误处理
  • 负载测试webhook端点
  • 使用真实负载测试

常见Webhook提供商

GitHub

签名头: X-Hub-Signature-256
事件头: X-GitHub-Event
算法: HMAC SHA-256

Stripe

签名头: Stripe-Signature
算法: HMAC SHA-256(特殊格式)
测试模式: 使用Stripe CLI

Shopify

签名头: X-Shopify-Hmac-SHA256
算法: HMAC SHA-256(base64)
主题头: X-Shopify-Topic

Twilio

签名头: X-Twilio-Signature
算法: HMAC SHA-1
验证: 特殊URL + 参数

Slack

签名头: X-Slack-Signature
时间戳头: X-Slack-Request-Timestamp
算法: HMAC SHA-256

注释

  • 使用隧道工具(ngrok、cloudflared)进行本地测试
  • 在生产中总是验证webhook签名
  • 快速响应webhook(< 10秒)
  • 异步处理webhook
  • 使用webhook ID实现幂等性
  • 记录所有webhook用于调试
  • 测试重试机制
  • 监控webhook交付失败
  • 在开发中使用webhook测试工具
  • 安全存储webhook秘密
  • 实现适当的错误处理
  • 使用提供商真实负载测试