区块链事件订阅Skill midnight-indexer:event-subscriptions

午夜网络区块链实时事件订阅技术,通过WebSocket连接实现实时监控交易、合约状态变化、UTXO创建与消费等区块链事件。支持GraphQL订阅查询、事件过滤、背压处理、自动重连和事件缓冲,适用于构建实时仪表板、推送通知系统、事件驱动后端服务和交易状态跟踪应用。关键词:区块链实时订阅、WebSocket事件、合约状态监控、交易确认跟踪、GraphQL订阅、事件驱动架构、午夜网络、DApp开发、实时数据流、智能合约事件。

DApp开发 0 次安装 0 次浏览 更新于 2/26/2026

name: 午夜索引器:事件订阅 description: 当需要订阅实时区块链事件、建立WebSocket连接、监控合约状态变化、构建实时仪表板或实现Midnight推送通知时使用。

事件订阅

通过WebSocket连接订阅午夜网络的实时区块链事件。

使用场景

  • 监听影响某个地址的新交易
  • 实时监控合约状态变化
  • 使用区块链数据构建实时仪表板
  • 为区块链事件实现推送通知
  • 跟踪已提交交易的确认状态
  • 构建事件驱动的后端服务

核心概念

WebSocket架构

午夜索引器通过WebSocket提供GraphQL订阅,用于实时事件传递。

组件 用途
WebSocket连接 持久化双向通道
GraphQL订阅 事件驱动查询
订阅过滤器 定位特定事件

事件类型

事件 描述
newTransaction 交易在区块中确认
newBlock 新区块添加到链上
contractStateChange 合约状态更新
utxoCreated 为地址创建新UTXO
utxoSpent UTXO被交易花费

连接生命周期

  1. 连接 - 建立WebSocket连接
  2. 订阅 - 发送订阅查询
  3. 接收 - 处理传入事件
  4. 重连 - 优雅处理断开连接
  5. 取消订阅 - 完成后清理

背压处理

当事件到达速度超过应用程序处理能力时,实施背压策略:

  • 缓冲 - 将事件排队处理
  • 丢弃 - 过载时跳过事件
  • 采样 - 仅处理每第N个事件

参考文档

文档 描述
websocket-setup.md 连接配置和协议
reconnection-patterns.md 处理断开连接和重放

示例

示例 描述
contract-events/ 订阅合约状态变化
reconnect-handler/ 具有事件重放的健壮重连机制

快速开始

1. 创建WebSocket客户端

import { createWebSocketClient } from '@midnight-ntwrk/midnight-js-indexer';

const wsClient = createWebSocketClient({
  uri: 'wss://indexer.testnet.midnight.network/api/v1/graphql',
});

2. 订阅事件

const subscription = wsClient.subscribe({
  query: `
    subscription WatchTransactions($address: String!) {
      newTransaction(address: $address) {
        hash
        blockNumber
        timestamp
        inputs { address amount }
        outputs { address amount }
      }
    }
  `,
  variables: { address: 'addr_test1...' },
});

subscription.on('data', (event) => {
  console.log('新交易:', event.data.newTransaction);
});

3. 处理事件

subscription.on('data', handleTransaction);
subscription.on('error', handleError);
subscription.on('complete', handleComplete);

4. 清理

// 当完成监听时
subscription.unsubscribe();
wsClient.close();

常见模式

事件处理器设置

interface SubscriptionHandlers<T> {
  onData: (data: T) => void;
  onError: (error: Error) => void;
  onComplete?: () => void;
}

function createEventSubscription<T>(
  client: WebSocketClient,
  query: string,
  variables: Record<string, unknown>,
  handlers: SubscriptionHandlers<T>
): () => void {
  const subscription = client.subscribe({ query, variables });

  subscription.on('data', (result) => {
    if (result.errors) {
      handlers.onError(new Error(result.errors[0].message));
      return;
    }
    handlers.onData(result.data);
  });

  subscription.on('error', handlers.onError);

  if (handlers.onComplete) {
    subscription.on('complete', handlers.onComplete);
  }

  // 返回取消订阅函数
  return () => subscription.unsubscribe();
}

合约事件监听器

interface ContractEvent {
  contractAddress: string;
  key: string;
  oldValue: string | null;
  newValue: string;
  txHash: string;
  blockNumber: number;
}

function watchContractState(
  client: WebSocketClient,
  contractAddress: string,
  onEvent: (event: ContractEvent) => void
): () => void {
  return createEventSubscription(
    client,
    `
      subscription WatchContract($address: String!) {
        contractStateChange(address: $address) {
          contractAddress
          key
          oldValue
          newValue
          txHash
          blockNumber
        }
      }
    `,
    { address: contractAddress },
    {
      onData: (data) => onEvent(data.contractStateChange),
      onError: (error) => console.error('订阅错误:', error),
    }
  );
}

交易确认跟踪器

async function waitForConfirmation(
  client: WebSocketClient,
  txHash: string,
  confirmations = 1,
  timeout = 120000
): Promise<number> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => {
      unsubscribe();
      reject(new Error('确认超时'));
    }, timeout);

    const unsubscribe = createEventSubscription(
      client,
      `
        subscription TrackTx($hash: String!) {
          transactionConfirmation(hash: $hash) {
            hash
            blockNumber
            confirmations
          }
        }
      `,
      { hash: txHash },
      {
        onData: (data) => {
          const { confirmations: current } = data.transactionConfirmation;
          if (current >= confirmations) {
            clearTimeout(timer);
            unsubscribe();
            resolve(data.transactionConfirmation.blockNumber);
          }
        },
        onError: (error) => {
          clearTimeout(timer);
          reject(error);
        },
      }
    );
  });
}

多订阅管理器

class SubscriptionManager {
  private subscriptions = new Map<string, () => void>();
  private client: WebSocketClient;

  constructor(wsUri: string) {
    this.client = createWebSocketClient({ uri: wsUri });
  }

  subscribe(
    id: string,
    query: string,
    variables: Record<string, unknown>,
    onData: (data: unknown) => void
  ): void {
    // 取消相同ID的现有订阅
    this.unsubscribe(id);

    const unsub = createEventSubscription(
      this.client,
      query,
      variables,
      {
        onData,
        onError: (err) => console.error(`订阅 ${id} 错误:`, err),
      }
    );

    this.subscriptions.set(id, unsub);
  }

  unsubscribe(id: string): void {
    const unsub = this.subscriptions.get(id);
    if (unsub) {
      unsub();
      this.subscriptions.delete(id);
    }
  }

  unsubscribeAll(): void {
    for (const [id] of this.subscriptions) {
      this.unsubscribe(id);
    }
  }

  close(): void {
    this.unsubscribeAll();
    this.client.close();
  }
}

事件缓冲

class EventBuffer<T> {
  private buffer: T[] = [];
  private processing = false;
  private intervalHandle: ReturnType<typeof setInterval>;

  constructor(
    private processor: (events: T[]) => Promise<void>,
    private maxSize = 100,
    private flushInterval = 1000
  ) {
    this.intervalHandle = setInterval(() => this.flush(), flushInterval);
  }

  destroy(): void {
    clearInterval(this.intervalHandle);
  }

  push(event: T): void {
    this.buffer.push(event);

    if (this.buffer.length >= this.maxSize) {
      this.flush();
    }
  }

  async flush(): Promise<void> {
    if (this.processing || this.buffer.length === 0) return;

    this.processing = true;
    const events = this.buffer.splice(0);

    try {
      await this.processor(events);
    } catch (error) {
      console.error('缓冲处理错误:', error);
      // 将失败事件重新加入队列前端
      this.buffer.unshift(...events);
    } finally {
      this.processing = false;
    }
  }
}

相关技能

  • indexer-service - 查询历史区块链数据
  • midnight-dapp:state-management - 同步前端状态与事件
  • midnight-dapp:transaction-flows - 提交交易并跟踪状态

相关命令

  • /midnight-tooling:check - 验证WebSocket连接性