name: 午夜索引器:事件订阅
description: 当需要订阅实时区块链事件、建立WebSocket连接、监控合约状态变化、构建实时仪表板或实现Midnight推送通知时使用。
事件订阅
通过WebSocket连接订阅午夜网络的实时区块链事件。
使用场景
- 监听影响某个地址的新交易
- 实时监控合约状态变化
- 使用区块链数据构建实时仪表板
- 为区块链事件实现推送通知
- 跟踪已提交交易的确认状态
- 构建事件驱动的后端服务
核心概念
WebSocket架构
午夜索引器通过WebSocket提供GraphQL订阅,用于实时事件传递。
| 组件 |
用途 |
| WebSocket连接 |
持久化双向通道 |
| GraphQL订阅 |
事件驱动查询 |
| 订阅过滤器 |
定位特定事件 |
事件类型
| 事件 |
描述 |
newTransaction |
交易在区块中确认 |
newBlock |
新区块添加到链上 |
contractStateChange |
合约状态更新 |
utxoCreated |
为地址创建新UTXO |
utxoSpent |
UTXO被交易花费 |
连接生命周期
- 连接 - 建立WebSocket连接
- 订阅 - 发送订阅查询
- 接收 - 处理传入事件
- 重连 - 优雅处理断开连接
- 取消订阅 - 完成后清理
背压处理
当事件到达速度超过应用程序处理能力时,实施背压策略:
- 缓冲 - 将事件排队处理
- 丢弃 - 过载时跳过事件
- 采样 - 仅处理每第N个事件
参考文档
示例
快速开始
1. 创建WebSocket客户端
import { createWebSocketClient } from '@midnight-ntwrk/midnight-js-indexer';
const wsClient = createWebSocketClient({
uri: 'wss://indexer.testnet.midnight.network/api/v1/graphql',
});
2. 订阅事件
const subscription = wsClient.subscribe({
query: `
subscription WatchTransactions($address: String!) {
newTransaction(address: $address) {
hash
blockNumber
timestamp
inputs { address amount }
outputs { address amount }
}
}
`,
variables: { address: 'addr_test1...' },
});
subscription.on('data', (event) => {
console.log('新交易:', event.data.newTransaction);
});
3. 处理事件
subscription.on('data', handleTransaction);
subscription.on('error', handleError);
subscription.on('complete', handleComplete);
4. 清理
// 当完成监听时
subscription.unsubscribe();
wsClient.close();
常见模式
事件处理器设置
interface SubscriptionHandlers<T> {
onData: (data: T) => void;
onError: (error: Error) => void;
onComplete?: () => void;
}
function createEventSubscription<T>(
client: WebSocketClient,
query: string,
variables: Record<string, unknown>,
handlers: SubscriptionHandlers<T>
): () => void {
const subscription = client.subscribe({ query, variables });
subscription.on('data', (result) => {
if (result.errors) {
handlers.onError(new Error(result.errors[0].message));
return;
}
handlers.onData(result.data);
});
subscription.on('error', handlers.onError);
if (handlers.onComplete) {
subscription.on('complete', handlers.onComplete);
}
// 返回取消订阅函数
return () => subscription.unsubscribe();
}
合约事件监听器
interface ContractEvent {
contractAddress: string;
key: string;
oldValue: string | null;
newValue: string;
txHash: string;
blockNumber: number;
}
function watchContractState(
client: WebSocketClient,
contractAddress: string,
onEvent: (event: ContractEvent) => void
): () => void {
return createEventSubscription(
client,
`
subscription WatchContract($address: String!) {
contractStateChange(address: $address) {
contractAddress
key
oldValue
newValue
txHash
blockNumber
}
}
`,
{ address: contractAddress },
{
onData: (data) => onEvent(data.contractStateChange),
onError: (error) => console.error('订阅错误:', error),
}
);
}
交易确认跟踪器
async function waitForConfirmation(
client: WebSocketClient,
txHash: string,
confirmations = 1,
timeout = 120000
): Promise<number> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
unsubscribe();
reject(new Error('确认超时'));
}, timeout);
const unsubscribe = createEventSubscription(
client,
`
subscription TrackTx($hash: String!) {
transactionConfirmation(hash: $hash) {
hash
blockNumber
confirmations
}
}
`,
{ hash: txHash },
{
onData: (data) => {
const { confirmations: current } = data.transactionConfirmation;
if (current >= confirmations) {
clearTimeout(timer);
unsubscribe();
resolve(data.transactionConfirmation.blockNumber);
}
},
onError: (error) => {
clearTimeout(timer);
reject(error);
},
}
);
});
}
多订阅管理器
class SubscriptionManager {
private subscriptions = new Map<string, () => void>();
private client: WebSocketClient;
constructor(wsUri: string) {
this.client = createWebSocketClient({ uri: wsUri });
}
subscribe(
id: string,
query: string,
variables: Record<string, unknown>,
onData: (data: unknown) => void
): void {
// 取消相同ID的现有订阅
this.unsubscribe(id);
const unsub = createEventSubscription(
this.client,
query,
variables,
{
onData,
onError: (err) => console.error(`订阅 ${id} 错误:`, err),
}
);
this.subscriptions.set(id, unsub);
}
unsubscribe(id: string): void {
const unsub = this.subscriptions.get(id);
if (unsub) {
unsub();
this.subscriptions.delete(id);
}
}
unsubscribeAll(): void {
for (const [id] of this.subscriptions) {
this.unsubscribe(id);
}
}
close(): void {
this.unsubscribeAll();
this.client.close();
}
}
事件缓冲
class EventBuffer<T> {
private buffer: T[] = [];
private processing = false;
private intervalHandle: ReturnType<typeof setInterval>;
constructor(
private processor: (events: T[]) => Promise<void>,
private maxSize = 100,
private flushInterval = 1000
) {
this.intervalHandle = setInterval(() => this.flush(), flushInterval);
}
destroy(): void {
clearInterval(this.intervalHandle);
}
push(event: T): void {
this.buffer.push(event);
if (this.buffer.length >= this.maxSize) {
this.flush();
}
}
async flush(): Promise<void> {
if (this.processing || this.buffer.length === 0) return;
this.processing = true;
const events = this.buffer.splice(0);
try {
await this.processor(events);
} catch (error) {
console.error('缓冲处理错误:', error);
// 将失败事件重新加入队列前端
this.buffer.unshift(...events);
} finally {
this.processing = false;
}
}
}
相关技能
indexer-service - 查询历史区块链数据
midnight-dapp:state-management - 同步前端状态与事件
midnight-dapp:transaction-flows - 提交交易并跟踪状态
相关命令
/midnight-tooling:check - 验证WebSocket连接性