.NET实时通信 dotnet-realtime-communication

.NET 实时通信技能,专注于为 .NET 应用程序提供实时通信解决方案,包括 SignalR、SSE、JSON-RPC 2.0 和 gRPC 流等协议的比较、实现和选择指南,帮助开发者构建高效、可扩展的实时功能。关键词:.NET、实时通信、SignalR、SSE、gRPC、JSON-RPC、后端开发、WebSocket、服务器推送、双向通信、协议选择、扩展性、性能优化。

后端开发 0 次安装 0 次浏览 更新于 3/6/2026

名称: dotnet-realtime-communication 描述: “构建实时功能。SignalR 中心、SSE (.NET 10)、JSON-RPC 2.0、gRPC 流、扩展。” 用户可调用: false

dotnet-realtime-communication

.NET 应用程序的实时通信模式。比较 SignalR(全双工 over WebSocket 带自动回退)、Server-Sent Events(SSE,内置到 ASP.NET Core in .NET 10)、JSON-RPC 2.0(结构化请求-响应 over 任何传输)、和 gRPC 流(高性能二进制流)。提供基于需求选择正确协议的决策指导。

范围

  • SignalR 中心(WebSocket、自动回退、扩展)
  • Server-Sent Events(SSE,内置 .NET 10)
  • JSON-RPC 2.0 over 任何传输
  • gRPC 流用于高性能二进制
  • 协议比较和决策指导

超出范围

  • HTTP 客户端工厂和弹性管道 – 见 [skill:dotnet-http-client] 和 [skill:dotnet-resilience]
  • 原生 AOT 架构和修剪 – 见 [skill:dotnet-native-aot] 和 [skill:dotnet-trimming]
  • Blazor 特定的 SignalR 使用 – 见 [skill:dotnet-blazor-patterns]

交叉参考: [skill:dotnet-grpc] 用于 gRPC 流实现细节和所有四种流模式。见 [skill:dotnet-integration-testing] 用于测试实时通信端点。见 [skill:dotnet-blazor-patterns] 用于 Blazor 特定的 SignalR 电路管理和渲染模式交互。


协议比较

协议 方向 传输 格式 浏览器支持 最适合
SignalR 全双工 WebSocket、SSE、长轮询(自动协商) JSON 或 MessagePack 是(JS/TS 客户端) 交互式应用、聊天、仪表板、协作编辑
SSE (.NET 10) 仅服务器到客户端 HTTP/1.1+ 文本(通常为 JSON 行) 是(原生 EventSource API) 通知、实时订阅、状态更新
JSON-RPC 2.0 请求-响应 任何(HTTP、WebSocket、标准输入输出) JSON 取决于传输 工具协议(LSP)、结构化 RCP over 简单传输
gRPC 流 所有四种模式 HTTP/2 Protobuf(二进制) 有限(gRPC-Web) 服务到服务、高吞吐量、低延迟流

何时选择什么

  • SignalR:你需要与浏览器客户端的双向实时通信。SignalR 自动处理传输协商(首选 WebSocket,回退到 SSE,然后是长轮询)。当客户端需要同时发送和接收实时数据时使用。
  • SSE (.NET 10 内置):你只需要服务器到客户端的推送。当不需要双向通信时,比 SignalR 更简单。内置到 ASP.NET Core in .NET 10 – 无需额外包。与浏览器的原生 EventSource API 配合工作。
  • JSON-RPC 2.0:你需要结构化请求-响应语义 over 简单传输。由 Language Server Protocol (LSP) 和一些 .NET 工具使用。不是流协议 – 当你需要带类型参数的命名方法 over WebSocket 或标准输入输出时使用。
  • gRPC 流:服务到服务流,具有最大性能。支持所有四种流模式(单向、服务器流、客户端流、双向流)。当两个端点都是 .NET 服务或 gRPC 兼容时最佳。见 [skill:dotnet-grpc] 用于实现细节。

SignalR

SignalR 提供实时 web 功能,带自动连接管理和传输协商。

服务器设置

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSignalR(options =>
{
    options.EnableDetailedErrors = builder.Environment.IsDevelopment();
    options.MaximumReceiveMessageSize = 64 * 1024; // 64 KB
    options.KeepAliveInterval = TimeSpan.FromSeconds(15);
});

var app = builder.Build();

app.MapHub<NotificationHub>("/hubs/notifications");

中心实现

public sealed class NotificationHub(
    ILogger<NotificationHub> logger) : Hub
{
    public override async Task OnConnectedAsync()
    {
        var userId = Context.UserIdentifier;
        if (userId is not null)
        {
            await Groups.AddToGroupAsync(Context.ConnectionId, $"user:{userId}");
        }

        await base.OnConnectedAsync();
    }

    // 客户端到服务器方法
    public async Task SendMessage(string channel, string message)
    {
        // 广播到通道组中的所有客户端
        await Clients.Group(channel).SendAsync("ReceiveMessage",
            Context.UserIdentifier, message);
    }

    // 服务器到客户端流
    public async IAsyncEnumerable<StockPrice> StreamPrices(
        string symbol,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            yield return await GetLatestPrice(symbol, cancellationToken);
            await Task.Delay(1000, cancellationToken);
        }
    }
}

强类型中心

使用接口以获得客户端方法调用的编译时安全性:

public interface INotificationClient
{
    Task ReceiveMessage(string user, string message);
    Task OrderStatusChanged(int orderId, string status);
}

public sealed class NotificationHub(
    ILogger<NotificationHub> logger) : Hub<INotificationClient>
{
    public async Task SendMessage(string channel, string message)
    {
        // 编译时检查 -- 无魔法字符串
        await Clients.Group(channel).ReceiveMessage(
            Context.UserIdentifier!, message);
    }
}

从中心外部发送

注入 IHubContext 以从后台服务或控制器发送消息:

public sealed class OrderService(
    IHubContext<NotificationHub, INotificationClient> hubContext)
{
    public async Task UpdateOrderStatus(int orderId, string userId, string status)
    {
        // 发送到特定用户组
        await hubContext.Clients.Group($"user:{userId}")
            .OrderStatusChanged(orderId, status);
    }
}

传输协商

SignalR 自动协商最佳传输:

  1. WebSocket(首选) – 全双工,最低延迟
  2. Server-Sent Events – 仅服务器到客户端,当 WebSocket 不可用时回退
  3. 长轮询 – 通用回退,最高延迟

需要时强制特定传输:

// 服务器:禁用特定传输
app.MapHub<NotificationHub>("/hubs/notifications", options =>
{
    options.Transports = HttpTransportType.WebSockets |
                         HttpTransportType.ServerSentEvents;
    // 禁用长轮询
});

MessagePack 协议

使用 MessagePack 以获得更小的负载和更快的序列化:

// 服务器
builder.Services.AddSignalR()
    .AddMessagePackProtocol();

// 客户端(JavaScript)
// new signalR.HubConnectionBuilder()
//     .withUrl("/hubs/notifications")
//     .withHubProtocol(new signalR.protocols.msgpack.MessagePackHubProtocol())
//     .build();

连接生命周期

重写 OnConnectedAsyncOnDisconnectedAsync 以管理连接状态:

public sealed class NotificationHub(
    ILogger<NotificationHub> logger,
    IConnectionTracker tracker) : Hub<INotificationClient>
{
    public override async Task OnConnectedAsync()
    {
        var userId = Context.UserIdentifier;
        var connectionId = Context.ConnectionId;

        logger.LogInformation("客户端 {ConnectionId} 连接(用户: {UserId})",
            connectionId, userId);

        // 跟踪连接以实现存在功能
        if (userId is not null)
        {
            await tracker.AddConnectionAsync(userId, connectionId);
            await Groups.AddToGroupAsync(connectionId, $"user:{userId}");
        }

        await base.OnConnectedAsync();
    }

    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        var userId = Context.UserIdentifier;
        var connectionId = Context.ConnectionId;

        if (exception is not null)
        {
            logger.LogWarning(exception,
                "客户端 {ConnectionId} 断开连接并带错误", connectionId);
        }

        if (userId is not null)
        {
            await tracker.RemoveConnectionAsync(userId, connectionId);
        }

        await base.OnDisconnectedAsync(exception);
    }
}

组管理

组提供轻量级发布-订阅机制。连接可以属于多个组,组成员资格按连接管理:

public sealed class ChatHub : Hub<IChatClient>
{
    // 加入房间(由客户端调用)
    public async Task JoinRoom(string roomName)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, roomName);
        await Clients.Group(roomName).UserJoined(Context.UserIdentifier!, roomName);
    }

    // 离开房间
    public async Task LeaveRoom(string roomName)
    {
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, roomName);
        await Clients.Group(roomName).UserLeft(Context.UserIdentifier!, roomName);
    }

    // 发送到特定组
    public async Task SendToRoom(string roomName, string message)
    {
        await Clients.Group(roomName).ReceiveMessage(
            Context.UserIdentifier!, message);
    }

    // 发送到除调用者外的所有客户端
    public async Task BroadcastExceptSelf(string message)
    {
        await Clients.Others.ReceiveMessage(
            Context.UserIdentifier!, message);
    }
}

组不持久化 – 当连接断开时清除。如果需要(例如,从数据库或缓存),在 OnConnectedAsync 中重新添加连接到组。

客户端到服务器流

客户端可以使用 IAsyncEnumerable<T>ChannelReader<T> 流数据到中心:

public sealed class UploadHub : Hub
{
    // 从客户端接受流项目
    public async Task UploadData(
        IAsyncEnumerable<SensorReading> stream,
        CancellationToken cancellationToken)
    {
        await foreach (var reading in stream.WithCancellation(cancellationToken))
        {
            await ProcessReading(reading);
        }
    }
}

认证

SignalR 使用与 ASP.NET Core 主机相同的认证。对于 WebSocket 连接,访问令牌通过查询字符串发送,因为 WebSocket 不支持自定义头:

// 服务器:为 SignalR 配置 JWT
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer(options =>
    {
        options.Authority = "https://identity.example.com";
        options.Events = new JwtBearerEvents
        {
            OnMessageReceived = context =>
            {
                // 从查询字符串读取令牌以用于 WebSocket 请求
                var accessToken = context.Request.Query["access_token"];
                var path = context.HttpContext.Request.Path;
                if (!string.IsNullOrEmpty(accessToken) &&
                    path.StartsWithSegments("/hubs"))
                {
                    context.Token = accessToken;
                }
                return Task.CompletedTask;
            }
        };
    });

builder.Services.AddAuthorization();

var app = builder.Build();

app.UseAuthentication();
app.UseAuthorization();

app.MapHub<NotificationHub>("/hubs/notifications")
    .RequireAuthorization();

在中心中访问 Context.UserIdentifier 以识别认证用户。默认情况下,这映射到 ClaimTypes.NameIdentifier 声明。使用 IUserIdProvider 自定义:

public sealed class EmailUserIdProvider : IUserIdProvider
{
    public string? GetUserId(HubConnectionContext connection)
    {
        return connection.User?.FindFirst(ClaimTypes.Email)?.Value;
    }
}

// 注册
builder.Services.AddSingleton<IUserIdProvider, EmailUserIdProvider>();

使用背板扩展

对于多服务器部署,使用背板跨实例同步消息。没有背板,在一个服务器上发送的消息在其他服务器上的连接不可见。

Redis 背板:

builder.Services.AddSignalR()
    .AddStackExchangeRedis(builder.Configuration.GetConnectionString("Redis")!,
        options =>
        {
            options.Configuration.ChannelPrefix =
                RedisChannel.Literal("MyApp:");
        });

Azure SignalR 服务(托管背板):

builder.Services.AddSignalR()
    .AddAzureSignalR(builder.Configuration["Azure:SignalR:ConnectionString"]);

Azure SignalR 服务完全卸载连接管理 – ASP.NET Core 服务器处理中心逻辑,而 Azure 管理 WebSocket 连接、扩展和消息路由。


Server-Sent Events (SSE) – .NET 10

.NET 10 向 ASP.NET Core 添加了内置 SSE 支持,使得服务器到客户端流变得直接,无需额外包。

最小 API 端点

app.MapGet("/events/orders", async (
    OrderEventService eventService,
    CancellationToken cancellationToken) =>
{
    // TypedResults.ServerSentEvents 返回 SSE 响应
    return TypedResults.ServerSentEvents(
        eventService.GetOrderEventsAsync(cancellationToken));
});

事件源实现

public sealed class OrderEventService
{
    public async IAsyncEnumerable<SseItem<OrderEvent>> GetOrderEventsAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var evt = await WaitForNextEvent(cancellationToken);
            yield return new SseItem<OrderEvent>(evt, "order-update");
        }
    }
}

浏览器客户端

const source = new EventSource('/events/orders');

source.addEventListener('order-update', (event) => {
    const order = JSON.parse(event.data);
    updateDashboard(order);
});

source.onerror = () => {
    // EventSource 自动重新连接
    console.log('SSE 连接丢失,重新连接中...');
};

何时使用 SSE Over SignalR

  • 仅单向推送 – 当你不需要客户端到服务器消息时,SSE 更简单
  • 浏览器原生 – 无需 JavaScript 库(使用 EventSource API)
  • 自动重新连接 – 浏览器使用 Last-Event-ID 自动重新连接
  • HTTP/1.1 兼容 – 通过不支持 WebSocket 升级的代理工作

JSON-RPC 2.0

JSON-RPC 2.0 是一个无状态、传输无关的远程过程调用协议,编码在 JSON 中。它是 Language Server Protocol (LSP) 的基础,用于一些 .NET 工具场景。

协议结构

// 请求
{"jsonrpc": "2.0", "method": "textDocument/completion", "params": {...}, "id": 1}

// 响应
{"jsonrpc": "2.0", "result": {...}, "id": 1}

// 通知(不期望响应)
{"jsonrpc": "2.0", "method": "textDocument/didChange", "params": {...}}

// 错误
{"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": 1}

StreamJsonRpc (.NET 库)

StreamJsonRpc 是主要的 .NET 库用于 JSON-RPC 2.0:

<PackageReference Include="StreamJsonRpc" Version="2.*" />
// 服务器:通过 JSON-RPC over 流暴露方法
using StreamJsonRpc;

public sealed class CalculatorService
{
    public int Add(int a, int b) => a + b;
    public Task<double> DivideAsync(double a, double b) =>
        b == 0 ? throw new ArgumentException("除零错误")
               : Task.FromResult(a / b);
}

// 通过 WebSocket 连接 -- UseWebSockets() 是升级处理所必需的
app.UseWebSockets();
app.Map("/jsonrpc", async (HttpContext context) =>
{
    if (!context.WebSockets.IsWebSocketRequest)
    {
        context.Response.StatusCode = 400;
        return;
    }

    var ws = await context.WebSockets.AcceptWebSocketAsync();
    using var rpc = new JsonRpc(new WebSocketMessageHandler(ws));
    rpc.AddLocalRpcTarget(new CalculatorService());
    rpc.StartListening();
    await rpc.Completion;
});
// 客户端
using var ws = new ClientWebSocket();
await ws.ConnectAsync(new Uri("ws://localhost:5000/jsonrpc"),
    CancellationToken.None);

using var rpc = new JsonRpc(new WebSocketMessageHandler(ws));
rpc.StartListening();

var result = await rpc.InvokeAsync<int>("Add", 2, 3);
// result == 5

何时使用 JSON-RPC 2.0

  • 构建或集成 Language Server Protocol (LSP) 实现
  • 简单 RPC over WebSocket 或标准输入输出,其中 gRPC 太重量级
  • 与非 .NET 系统互操作,这些系统使用 JSON-RPC
  • 工具和编辑器集成

gRPC 流

见 [skill:dotnet-grpc] 用于完整的 gRPC 实现细节,包括所有四种流模式(单向、服务器流、客户端流、双向流)、认证、负载均衡和健康检查。

快速决策:gRPC 流 vs SignalR vs SSE

需求 选择
服务到服务,两者 .NET gRPC 流
浏览器客户端需要双向 SignalR
浏览器客户端仅需要服务器推送 SSE
最大吞吐量,二进制负载 gRPC 流
与浏览器客户端的自动重新连接 SSE(原生)或 SignalR(内置)
多个客户端平台(JS、移动、.NET) SignalR

关键原则

  • 默认使用 SignalR 用于浏览器面向的实时通信 – 它处理传输协商、重新连接和分组开箱即用
  • 使用 SSE 用于简单服务器推送 – .NET 10 内置支持使其成为单向通知的最轻量选项
  • 使用 gRPC 流用于服务到服务 – 最高性能、强类型契约、所有四种流模式
  • 使用 JSON-RPC 2.0 用于工具协议 – 当你需要结构化 RCP over 简单传输(WebSocket、标准输入输出)时
  • 使用强类型中心Hub<T> 在编译时捕获方法名称错误,而不是运行时
  • 使用背板扩展 SignalR – Redis 或 Azure SignalR 服务用于多服务器部署

见 [skill:dotnet-native-aot] 用于 AOT 编译管道和 [skill:dotnet-aot-architecture] 用于 AOT 兼容的实时通信模式。


代理陷阱

  1. 当 SSE 足够时不要使用 SignalR – 如果你只需要服务器到客户端推送而不需要双向通信,SSE 更简单、更轻量。
  2. 当客户端使用 MessagePack 时不要忘记服务器上的 AddMessagePackProtocol() – 不匹配的协议导致静默连接失败。
  3. 除非必需,否则不要使用长轮询传输与 SignalR – 与 WebSocket 相比,它有显著更高的延迟和服务器资源使用。
  4. 不要长期存储连接 ID – SignalR 连接 ID 在重新连接时改变。使用用户标识符或组进行寻址。
  5. 不要直接使用 gRPC 流到浏览器 – 浏览器不支持 HTTP/2 trailers 原生。使用 gRPC-Web 带代理或选择 SignalR/SSE 替代。
  6. 不要混淆 SSE 与 WebSocket – SSE 是单向的(仅服务器到客户端)。如果你需要客户端到服务器消息,直接使用 SignalR 或 WebSocket。
  7. 对于 JWT 与 SignalR 不要忘记 OnMessageReceived – WebSocket 连接在初始握手后不能发送自定义 HTTP 头。访问令牌必须在 JwtBearerEvents.OnMessageReceived 中从查询字符串读取。
  8. 不要假设组成员资格在重新连接时持久化 – 组与连接 ID 绑定,在重新连接时改变。在 OnConnectedAsync 中重新添加连接到组。
  9. 不要在没有背板的情况下部署多服务器 SignalR – 没有 Redis 或 Azure SignalR 服务,在一个服务器实例上发送的消息在其他实例上的连接不可见。

归因

改编自 [Aaronontheweb/dotnet-skills](MIT 许可证)。


参考