名称: dotnet-realtime-communication 描述: “构建实时功能。SignalR 中心、SSE (.NET 10)、JSON-RPC 2.0、gRPC 流、扩展。” 用户可调用: false
dotnet-realtime-communication
.NET 应用程序的实时通信模式。比较 SignalR(全双工 over WebSocket 带自动回退)、Server-Sent Events(SSE,内置到 ASP.NET Core in .NET 10)、JSON-RPC 2.0(结构化请求-响应 over 任何传输)、和 gRPC 流(高性能二进制流)。提供基于需求选择正确协议的决策指导。
范围
- SignalR 中心(WebSocket、自动回退、扩展)
- Server-Sent Events(SSE,内置 .NET 10)
- JSON-RPC 2.0 over 任何传输
- gRPC 流用于高性能二进制
- 协议比较和决策指导
超出范围
- HTTP 客户端工厂和弹性管道 – 见 [skill:dotnet-http-client] 和 [skill:dotnet-resilience]
- 原生 AOT 架构和修剪 – 见 [skill:dotnet-native-aot] 和 [skill:dotnet-trimming]
- Blazor 特定的 SignalR 使用 – 见 [skill:dotnet-blazor-patterns]
交叉参考: [skill:dotnet-grpc] 用于 gRPC 流实现细节和所有四种流模式。见 [skill:dotnet-integration-testing] 用于测试实时通信端点。见 [skill:dotnet-blazor-patterns] 用于 Blazor 特定的 SignalR 电路管理和渲染模式交互。
协议比较
| 协议 | 方向 | 传输 | 格式 | 浏览器支持 | 最适合 |
|---|---|---|---|---|---|
| SignalR | 全双工 | WebSocket、SSE、长轮询(自动协商) | JSON 或 MessagePack | 是(JS/TS 客户端) | 交互式应用、聊天、仪表板、协作编辑 |
| SSE (.NET 10) | 仅服务器到客户端 | HTTP/1.1+ | 文本(通常为 JSON 行) | 是(原生 EventSource API) | 通知、实时订阅、状态更新 |
| JSON-RPC 2.0 | 请求-响应 | 任何(HTTP、WebSocket、标准输入输出) | JSON | 取决于传输 | 工具协议(LSP)、结构化 RCP over 简单传输 |
| gRPC 流 | 所有四种模式 | HTTP/2 | Protobuf(二进制) | 有限(gRPC-Web) | 服务到服务、高吞吐量、低延迟流 |
何时选择什么
- SignalR:你需要与浏览器客户端的双向实时通信。SignalR 自动处理传输协商(首选 WebSocket,回退到 SSE,然后是长轮询)。当客户端需要同时发送和接收实时数据时使用。
- SSE (.NET 10 内置):你只需要服务器到客户端的推送。当不需要双向通信时,比 SignalR 更简单。内置到 ASP.NET Core in .NET 10 – 无需额外包。与浏览器的原生
EventSourceAPI 配合工作。 - JSON-RPC 2.0:你需要结构化请求-响应语义 over 简单传输。由 Language Server Protocol (LSP) 和一些 .NET 工具使用。不是流协议 – 当你需要带类型参数的命名方法 over WebSocket 或标准输入输出时使用。
- gRPC 流:服务到服务流,具有最大性能。支持所有四种流模式(单向、服务器流、客户端流、双向流)。当两个端点都是 .NET 服务或 gRPC 兼容时最佳。见 [skill:dotnet-grpc] 用于实现细节。
SignalR
SignalR 提供实时 web 功能,带自动连接管理和传输协商。
服务器设置
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSignalR(options =>
{
options.EnableDetailedErrors = builder.Environment.IsDevelopment();
options.MaximumReceiveMessageSize = 64 * 1024; // 64 KB
options.KeepAliveInterval = TimeSpan.FromSeconds(15);
});
var app = builder.Build();
app.MapHub<NotificationHub>("/hubs/notifications");
中心实现
public sealed class NotificationHub(
ILogger<NotificationHub> logger) : Hub
{
public override async Task OnConnectedAsync()
{
var userId = Context.UserIdentifier;
if (userId is not null)
{
await Groups.AddToGroupAsync(Context.ConnectionId, $"user:{userId}");
}
await base.OnConnectedAsync();
}
// 客户端到服务器方法
public async Task SendMessage(string channel, string message)
{
// 广播到通道组中的所有客户端
await Clients.Group(channel).SendAsync("ReceiveMessage",
Context.UserIdentifier, message);
}
// 服务器到客户端流
public async IAsyncEnumerable<StockPrice> StreamPrices(
string symbol,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
yield return await GetLatestPrice(symbol, cancellationToken);
await Task.Delay(1000, cancellationToken);
}
}
}
强类型中心
使用接口以获得客户端方法调用的编译时安全性:
public interface INotificationClient
{
Task ReceiveMessage(string user, string message);
Task OrderStatusChanged(int orderId, string status);
}
public sealed class NotificationHub(
ILogger<NotificationHub> logger) : Hub<INotificationClient>
{
public async Task SendMessage(string channel, string message)
{
// 编译时检查 -- 无魔法字符串
await Clients.Group(channel).ReceiveMessage(
Context.UserIdentifier!, message);
}
}
从中心外部发送
注入 IHubContext 以从后台服务或控制器发送消息:
public sealed class OrderService(
IHubContext<NotificationHub, INotificationClient> hubContext)
{
public async Task UpdateOrderStatus(int orderId, string userId, string status)
{
// 发送到特定用户组
await hubContext.Clients.Group($"user:{userId}")
.OrderStatusChanged(orderId, status);
}
}
传输协商
SignalR 自动协商最佳传输:
- WebSocket(首选) – 全双工,最低延迟
- Server-Sent Events – 仅服务器到客户端,当 WebSocket 不可用时回退
- 长轮询 – 通用回退,最高延迟
需要时强制特定传输:
// 服务器:禁用特定传输
app.MapHub<NotificationHub>("/hubs/notifications", options =>
{
options.Transports = HttpTransportType.WebSockets |
HttpTransportType.ServerSentEvents;
// 禁用长轮询
});
MessagePack 协议
使用 MessagePack 以获得更小的负载和更快的序列化:
// 服务器
builder.Services.AddSignalR()
.AddMessagePackProtocol();
// 客户端(JavaScript)
// new signalR.HubConnectionBuilder()
// .withUrl("/hubs/notifications")
// .withHubProtocol(new signalR.protocols.msgpack.MessagePackHubProtocol())
// .build();
连接生命周期
重写 OnConnectedAsync 和 OnDisconnectedAsync 以管理连接状态:
public sealed class NotificationHub(
ILogger<NotificationHub> logger,
IConnectionTracker tracker) : Hub<INotificationClient>
{
public override async Task OnConnectedAsync()
{
var userId = Context.UserIdentifier;
var connectionId = Context.ConnectionId;
logger.LogInformation("客户端 {ConnectionId} 连接(用户: {UserId})",
connectionId, userId);
// 跟踪连接以实现存在功能
if (userId is not null)
{
await tracker.AddConnectionAsync(userId, connectionId);
await Groups.AddToGroupAsync(connectionId, $"user:{userId}");
}
await base.OnConnectedAsync();
}
public override async Task OnDisconnectedAsync(Exception? exception)
{
var userId = Context.UserIdentifier;
var connectionId = Context.ConnectionId;
if (exception is not null)
{
logger.LogWarning(exception,
"客户端 {ConnectionId} 断开连接并带错误", connectionId);
}
if (userId is not null)
{
await tracker.RemoveConnectionAsync(userId, connectionId);
}
await base.OnDisconnectedAsync(exception);
}
}
组管理
组提供轻量级发布-订阅机制。连接可以属于多个组,组成员资格按连接管理:
public sealed class ChatHub : Hub<IChatClient>
{
// 加入房间(由客户端调用)
public async Task JoinRoom(string roomName)
{
await Groups.AddToGroupAsync(Context.ConnectionId, roomName);
await Clients.Group(roomName).UserJoined(Context.UserIdentifier!, roomName);
}
// 离开房间
public async Task LeaveRoom(string roomName)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, roomName);
await Clients.Group(roomName).UserLeft(Context.UserIdentifier!, roomName);
}
// 发送到特定组
public async Task SendToRoom(string roomName, string message)
{
await Clients.Group(roomName).ReceiveMessage(
Context.UserIdentifier!, message);
}
// 发送到除调用者外的所有客户端
public async Task BroadcastExceptSelf(string message)
{
await Clients.Others.ReceiveMessage(
Context.UserIdentifier!, message);
}
}
组不持久化 – 当连接断开时清除。如果需要(例如,从数据库或缓存),在 OnConnectedAsync 中重新添加连接到组。
客户端到服务器流
客户端可以使用 IAsyncEnumerable<T> 或 ChannelReader<T> 流数据到中心:
public sealed class UploadHub : Hub
{
// 从客户端接受流项目
public async Task UploadData(
IAsyncEnumerable<SensorReading> stream,
CancellationToken cancellationToken)
{
await foreach (var reading in stream.WithCancellation(cancellationToken))
{
await ProcessReading(reading);
}
}
}
认证
SignalR 使用与 ASP.NET Core 主机相同的认证。对于 WebSocket 连接,访问令牌通过查询字符串发送,因为 WebSocket 不支持自定义头:
// 服务器:为 SignalR 配置 JWT
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
.AddJwtBearer(options =>
{
options.Authority = "https://identity.example.com";
options.Events = new JwtBearerEvents
{
OnMessageReceived = context =>
{
// 从查询字符串读取令牌以用于 WebSocket 请求
var accessToken = context.Request.Query["access_token"];
var path = context.HttpContext.Request.Path;
if (!string.IsNullOrEmpty(accessToken) &&
path.StartsWithSegments("/hubs"))
{
context.Token = accessToken;
}
return Task.CompletedTask;
}
};
});
builder.Services.AddAuthorization();
var app = builder.Build();
app.UseAuthentication();
app.UseAuthorization();
app.MapHub<NotificationHub>("/hubs/notifications")
.RequireAuthorization();
在中心中访问 Context.UserIdentifier 以识别认证用户。默认情况下,这映射到 ClaimTypes.NameIdentifier 声明。使用 IUserIdProvider 自定义:
public sealed class EmailUserIdProvider : IUserIdProvider
{
public string? GetUserId(HubConnectionContext connection)
{
return connection.User?.FindFirst(ClaimTypes.Email)?.Value;
}
}
// 注册
builder.Services.AddSingleton<IUserIdProvider, EmailUserIdProvider>();
使用背板扩展
对于多服务器部署,使用背板跨实例同步消息。没有背板,在一个服务器上发送的消息在其他服务器上的连接不可见。
Redis 背板:
builder.Services.AddSignalR()
.AddStackExchangeRedis(builder.Configuration.GetConnectionString("Redis")!,
options =>
{
options.Configuration.ChannelPrefix =
RedisChannel.Literal("MyApp:");
});
Azure SignalR 服务(托管背板):
builder.Services.AddSignalR()
.AddAzureSignalR(builder.Configuration["Azure:SignalR:ConnectionString"]);
Azure SignalR 服务完全卸载连接管理 – ASP.NET Core 服务器处理中心逻辑,而 Azure 管理 WebSocket 连接、扩展和消息路由。
Server-Sent Events (SSE) – .NET 10
.NET 10 向 ASP.NET Core 添加了内置 SSE 支持,使得服务器到客户端流变得直接,无需额外包。
最小 API 端点
app.MapGet("/events/orders", async (
OrderEventService eventService,
CancellationToken cancellationToken) =>
{
// TypedResults.ServerSentEvents 返回 SSE 响应
return TypedResults.ServerSentEvents(
eventService.GetOrderEventsAsync(cancellationToken));
});
事件源实现
public sealed class OrderEventService
{
public async IAsyncEnumerable<SseItem<OrderEvent>> GetOrderEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var evt = await WaitForNextEvent(cancellationToken);
yield return new SseItem<OrderEvent>(evt, "order-update");
}
}
}
浏览器客户端
const source = new EventSource('/events/orders');
source.addEventListener('order-update', (event) => {
const order = JSON.parse(event.data);
updateDashboard(order);
});
source.onerror = () => {
// EventSource 自动重新连接
console.log('SSE 连接丢失,重新连接中...');
};
何时使用 SSE Over SignalR
- 仅单向推送 – 当你不需要客户端到服务器消息时,SSE 更简单
- 浏览器原生 – 无需 JavaScript 库(使用
EventSourceAPI) - 自动重新连接 – 浏览器使用
Last-Event-ID自动重新连接 - HTTP/1.1 兼容 – 通过不支持 WebSocket 升级的代理工作
JSON-RPC 2.0
JSON-RPC 2.0 是一个无状态、传输无关的远程过程调用协议,编码在 JSON 中。它是 Language Server Protocol (LSP) 的基础,用于一些 .NET 工具场景。
协议结构
// 请求
{"jsonrpc": "2.0", "method": "textDocument/completion", "params": {...}, "id": 1}
// 响应
{"jsonrpc": "2.0", "result": {...}, "id": 1}
// 通知(不期望响应)
{"jsonrpc": "2.0", "method": "textDocument/didChange", "params": {...}}
// 错误
{"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": 1}
StreamJsonRpc (.NET 库)
StreamJsonRpc 是主要的 .NET 库用于 JSON-RPC 2.0:
<PackageReference Include="StreamJsonRpc" Version="2.*" />
// 服务器:通过 JSON-RPC over 流暴露方法
using StreamJsonRpc;
public sealed class CalculatorService
{
public int Add(int a, int b) => a + b;
public Task<double> DivideAsync(double a, double b) =>
b == 0 ? throw new ArgumentException("除零错误")
: Task.FromResult(a / b);
}
// 通过 WebSocket 连接 -- UseWebSockets() 是升级处理所必需的
app.UseWebSockets();
app.Map("/jsonrpc", async (HttpContext context) =>
{
if (!context.WebSockets.IsWebSocketRequest)
{
context.Response.StatusCode = 400;
return;
}
var ws = await context.WebSockets.AcceptWebSocketAsync();
using var rpc = new JsonRpc(new WebSocketMessageHandler(ws));
rpc.AddLocalRpcTarget(new CalculatorService());
rpc.StartListening();
await rpc.Completion;
});
// 客户端
using var ws = new ClientWebSocket();
await ws.ConnectAsync(new Uri("ws://localhost:5000/jsonrpc"),
CancellationToken.None);
using var rpc = new JsonRpc(new WebSocketMessageHandler(ws));
rpc.StartListening();
var result = await rpc.InvokeAsync<int>("Add", 2, 3);
// result == 5
何时使用 JSON-RPC 2.0
- 构建或集成 Language Server Protocol (LSP) 实现
- 简单 RPC over WebSocket 或标准输入输出,其中 gRPC 太重量级
- 与非 .NET 系统互操作,这些系统使用 JSON-RPC
- 工具和编辑器集成
gRPC 流
见 [skill:dotnet-grpc] 用于完整的 gRPC 实现细节,包括所有四种流模式(单向、服务器流、客户端流、双向流)、认证、负载均衡和健康检查。
快速决策:gRPC 流 vs SignalR vs SSE
| 需求 | 选择 |
|---|---|
| 服务到服务,两者 .NET | gRPC 流 |
| 浏览器客户端需要双向 | SignalR |
| 浏览器客户端仅需要服务器推送 | SSE |
| 最大吞吐量,二进制负载 | gRPC 流 |
| 与浏览器客户端的自动重新连接 | SSE(原生)或 SignalR(内置) |
| 多个客户端平台(JS、移动、.NET) | SignalR |
关键原则
- 默认使用 SignalR 用于浏览器面向的实时通信 – 它处理传输协商、重新连接和分组开箱即用
- 使用 SSE 用于简单服务器推送 – .NET 10 内置支持使其成为单向通知的最轻量选项
- 使用 gRPC 流用于服务到服务 – 最高性能、强类型契约、所有四种流模式
- 使用 JSON-RPC 2.0 用于工具协议 – 当你需要结构化 RCP over 简单传输(WebSocket、标准输入输出)时
- 使用强类型中心 –
Hub<T>在编译时捕获方法名称错误,而不是运行时 - 使用背板扩展 SignalR – Redis 或 Azure SignalR 服务用于多服务器部署
见 [skill:dotnet-native-aot] 用于 AOT 编译管道和 [skill:dotnet-aot-architecture] 用于 AOT 兼容的实时通信模式。
代理陷阱
- 当 SSE 足够时不要使用 SignalR – 如果你只需要服务器到客户端推送而不需要双向通信,SSE 更简单、更轻量。
- 当客户端使用 MessagePack 时不要忘记服务器上的
AddMessagePackProtocol()– 不匹配的协议导致静默连接失败。 - 除非必需,否则不要使用长轮询传输与 SignalR – 与 WebSocket 相比,它有显著更高的延迟和服务器资源使用。
- 不要长期存储连接 ID – SignalR 连接 ID 在重新连接时改变。使用用户标识符或组进行寻址。
- 不要直接使用 gRPC 流到浏览器 – 浏览器不支持 HTTP/2 trailers 原生。使用 gRPC-Web 带代理或选择 SignalR/SSE 替代。
- 不要混淆 SSE 与 WebSocket – SSE 是单向的(仅服务器到客户端)。如果你需要客户端到服务器消息,直接使用 SignalR 或 WebSocket。
- 对于 JWT 与 SignalR 不要忘记
OnMessageReceived– WebSocket 连接在初始握手后不能发送自定义 HTTP 头。访问令牌必须在JwtBearerEvents.OnMessageReceived中从查询字符串读取。 - 不要假设组成员资格在重新连接时持久化 – 组与连接 ID 绑定,在重新连接时改变。在
OnConnectedAsync中重新添加连接到组。 - 不要在没有背板的情况下部署多服务器 SignalR – 没有 Redis 或 Azure SignalR 服务,在一个服务器实例上发送的消息在其他实例上的连接不可见。
归因
改编自 [Aaronontheweb/dotnet-skills](MIT 许可证)。