name: 广播事件 description: 为ADW可观测性设计WebSocket事件广播。用于将工作流事件流式传输到外部仪表板或监控系统。 argument-hint: <事件类型> [负载描述] allowed-tools: 读取, 写入, 全局搜索, 文本搜索
广播事件
为AI开发者工作流可观测性设计WebSocket事件广播。
参数
$ARGUMENTS:<事件类型> [负载描述]事件类型: 要广播的事件类型(例如,ToolUseBlock,StepComplete)负载描述: 可选的负载结构描述
事件类型
| 事件类型 | 来源 | 负载 |
|---|---|---|
TextBlock |
代理输出 | 文本内容 |
ToolUseBlock |
工具调用 | 工具名称, 输入 |
ThinkingBlock |
扩展思考 | 思考内容 |
StepStart |
工作流步骤 | 步骤名称, 时间戳 |
StepEnd |
工作流步骤 | 步骤名称, 状态 |
ADWComplete |
工作流完成 | 最终状态, 指标 |
指令
步骤1: 定义消息格式
标准ADW事件结构:
{
"type": "adw_event",
"adw_id": "a1b2c3d4",
"step": "build",
"event_type": "ToolUseBlock",
"timestamp": "2026-01-01T14:30:00Z",
"summary": "Writing authentication middleware to src/auth.py",
"payload": {
"tool_name": "Write",
"file_path": "src/auth.py",
"content_preview": "class AuthMiddleware:..."
}
}
步骤2: 设计摘要策略
使用Haiku进行快速、经济的摘要:
提示模板:
在15个词或更少内为开发者仪表板摘要此{事件类型}:
工具:{工具名称}
输入:{工具输入预览}
摘要:
要摘要的事件:
ToolUseBlock: 摘要工具操作TextBlock: 摘要内容(如果较长)ThinkingBlock: 摘要推理
直接传递的事件:
StepStart: 使用固定格式StepEnd: 使用固定格式ADWComplete: 使用固定格式
步骤3: 设计WebSocket服务器
服务器规范:
# adws/websocket_server.py
from fastapi import FastAPI, WebSocket
from typing import Dict, Set
import asyncio
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, adw_id: str):
await websocket.accept()
if adw_id not in self.active_connections:
self.active_connections[adw_id] = set()
self.active_connections[adw_id].add(websocket)
async def broadcast(self, adw_id: str, message: dict):
if adw_id in self.active_connections:
for connection in self.active_connections[adw_id]:
await connection.send_json(message)
manager = ConnectionManager()
@app.websocket("/ws/{adw_id}")
async def websocket_endpoint(websocket: WebSocket, adw_id: str):
await manager.connect(websocket, adw_id)
try:
while True:
await websocket.receive_text() # 保持连接
except:
manager.active_connections[adw_id].discard(websocket)
步骤4: 设计客户端订阅
客户端订阅消息:
{
"action": "subscribe",
"filters": {
"adw_id": "a1b2c3d4",
"steps": ["build", "review"],
"event_types": ["ToolUseBlock", "StepEnd"]
}
}
步骤5: 设计弹性模式
重连策略:
class ResilientWebSocket {
constructor(url) {
this.url = url;
this.maxReconnectDelay = 30000;
this.reconnectAttempts = 0;
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onclose = () => {
const delay = Math.min(
1000 * Math.pow(2, this.reconnectAttempts),
this.maxReconnectDelay
);
setTimeout(() => this.connect(), delay);
this.reconnectAttempts++;
};
this.ws.onopen = () => {
this.reconnectAttempts = 0;
};
}
}
心跳机制:
- 间隔: 30秒
- 超时: 90秒
- 消息:
{"type": "ping"}
输出
## 事件广播规范
**事件类型:** {事件类型}
**ADW上下文:** {adw_id}
### 消息格式
```json
{消息结构}
摘要
策略: {haiku/直接传递} 提示: {如果haiku}
服务器端点
URL: ws://localhost:8000/ws/{adw_id}
协议: WebSocket
客户端订阅
{订阅消息}
弹性
| 模式 | 值 |
|---|---|
| 重连策略 | 指数退避 |
| 最大延迟 | 30秒 |
| 最大尝试次数 | 10 |
| 心跳间隔 | 30秒 |
集成
钩子脚本通过HTTP POST广播到服务器:
import httpx
async def broadcast(event: dict):
async with httpx.AsyncClient() as client:
await client.post(
f"http://localhost:8000/broadcast/{event['adw_id']}",
json=event
)
后续步骤
- 实现WebSocket服务器 (
adws/websocket_server.py) - 与钩子集成 (
/configure-hooks) - 构建泳道UI (
swimlane-visualization技能) - 添加事件持久化(可选数据库日志记录)
## SDK注意
> **实现注意:** 完整的WebSocket集成需要生产后端。此命令提供规范;实现需要FastAPI/asyncio设置。
## 交叉引用
- @websocket-architecture.md - WebSocket模式
- @hook-event-patterns.md - 事件类型
- `event-broadcaster` 代理 - 广播设计
- `swimlane-visualization` 技能 - UI消费