事件广播技能Skill broadcast-event

这个技能设计WebSocket事件广播系统,用于AI开发者工作流的可观测性。它支持实时流式传输工作流事件到外部仪表板和监控系统,包括事件类型定义、消息格式、摘要策略、WebSocket服务器实现、客户端订阅和弹性模式。关键词:WebSocket、事件广播、AI工作流、可观测性、实时监控、仪表板。

后端开发 0 次安装 0 次浏览 更新于 3/11/2026

name: 广播事件 description: 为ADW可观测性设计WebSocket事件广播。用于将工作流事件流式传输到外部仪表板或监控系统。 argument-hint: <事件类型> [负载描述] allowed-tools: 读取, 写入, 全局搜索, 文本搜索

广播事件

为AI开发者工作流可观测性设计WebSocket事件广播。

参数

  • $ARGUMENTS: <事件类型> [负载描述]
    • 事件类型: 要广播的事件类型(例如,ToolUseBlock, StepComplete
    • 负载描述: 可选的负载结构描述

事件类型

事件类型 来源 负载
TextBlock 代理输出 文本内容
ToolUseBlock 工具调用 工具名称, 输入
ThinkingBlock 扩展思考 思考内容
StepStart 工作流步骤 步骤名称, 时间戳
StepEnd 工作流步骤 步骤名称, 状态
ADWComplete 工作流完成 最终状态, 指标

指令

步骤1: 定义消息格式

标准ADW事件结构:

{
  "type": "adw_event",
  "adw_id": "a1b2c3d4",
  "step": "build",
  "event_type": "ToolUseBlock",
  "timestamp": "2026-01-01T14:30:00Z",
  "summary": "Writing authentication middleware to src/auth.py",
  "payload": {
    "tool_name": "Write",
    "file_path": "src/auth.py",
    "content_preview": "class AuthMiddleware:..."
  }
}

步骤2: 设计摘要策略

使用Haiku进行快速、经济的摘要:

提示模板:

在15个词或更少内为开发者仪表板摘要此{事件类型}:

工具:{工具名称}
输入:{工具输入预览}

摘要:

要摘要的事件:

  • ToolUseBlock: 摘要工具操作
  • TextBlock: 摘要内容(如果较长)
  • ThinkingBlock: 摘要推理

直接传递的事件:

  • StepStart: 使用固定格式
  • StepEnd: 使用固定格式
  • ADWComplete: 使用固定格式

步骤3: 设计WebSocket服务器

服务器规范:

# adws/websocket_server.py
from fastapi import FastAPI, WebSocket
from typing import Dict, Set
import asyncio

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, adw_id: str):
        await websocket.accept()
        if adw_id not in self.active_connections:
            self.active_connections[adw_id] = set()
        self.active_connections[adw_id].add(websocket)

    async def broadcast(self, adw_id: str, message: dict):
        if adw_id in self.active_connections:
            for connection in self.active_connections[adw_id]:
                await connection.send_json(message)

manager = ConnectionManager()

@app.websocket("/ws/{adw_id}")
async def websocket_endpoint(websocket: WebSocket, adw_id: str):
    await manager.connect(websocket, adw_id)
    try:
        while True:
            await websocket.receive_text()  # 保持连接
    except:
        manager.active_connections[adw_id].discard(websocket)

步骤4: 设计客户端订阅

客户端订阅消息:

{
  "action": "subscribe",
  "filters": {
    "adw_id": "a1b2c3d4",
    "steps": ["build", "review"],
    "event_types": ["ToolUseBlock", "StepEnd"]
  }
}

步骤5: 设计弹性模式

重连策略:

class ResilientWebSocket {
  constructor(url) {
    this.url = url;
    this.maxReconnectDelay = 30000;
    this.reconnectAttempts = 0;
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onclose = () => {
      const delay = Math.min(
        1000 * Math.pow(2, this.reconnectAttempts),
        this.maxReconnectDelay
      );
      setTimeout(() => this.connect(), delay);
      this.reconnectAttempts++;
    };

    this.ws.onopen = () => {
      this.reconnectAttempts = 0;
    };
  }
}

心跳机制:

  • 间隔: 30秒
  • 超时: 90秒
  • 消息: {"type": "ping"}

输出

## 事件广播规范

**事件类型:** {事件类型}
**ADW上下文:** {adw_id}

### 消息格式

```json
{消息结构}

摘要

策略: {haiku/直接传递} 提示: {如果haiku}

服务器端点

URL: ws://localhost:8000/ws/{adw_id} 协议: WebSocket

客户端订阅

{订阅消息}

弹性

模式
重连策略 指数退避
最大延迟 30秒
最大尝试次数 10
心跳间隔 30秒

集成

钩子脚本通过HTTP POST广播到服务器:

import httpx

async def broadcast(event: dict):
    async with httpx.AsyncClient() as client:
        await client.post(
            f"http://localhost:8000/broadcast/{event['adw_id']}",
            json=event
        )

后续步骤

  1. 实现WebSocket服务器 (adws/websocket_server.py)
  2. 与钩子集成 (/configure-hooks)
  3. 构建泳道UI (swimlane-visualization 技能)
  4. 添加事件持久化(可选数据库日志记录)

## SDK注意

> **实现注意:** 完整的WebSocket集成需要生产后端。此命令提供规范;实现需要FastAPI/asyncio设置。

## 交叉引用

- @websocket-architecture.md - WebSocket模式
- @hook-event-patterns.md - 事件类型
- `event-broadcaster` 代理 - 广播设计
- `swimlane-visualization` 技能 - UI消费