.NET并发:选择正确的工具
何时使用此技能
当:
- 决定如何处理.NET中的并发操作
- 评估是否使用async/await、Channels、Akka.NET或其他抽象
- 倾向于使用锁、信号量或其他同步原语
- 需要处理具有背压、批处理或去抖的数据流
- 在多个并发实体间管理状态
理念
从简单开始,仅在需要时升级。
大多数并发问题都可以用async/await解决。只有当你有特定需求,async/await不能清晰解决时,才使用更复杂的工具。
**尽量避免共享可变状态。**处理并发的最佳方式是通过设计来避免它。不可变数据、消息传递和隔离状态(如演员)消除了整个类别的错误。
**锁应该是例外,而不是规则。**当你无法避免共享可变状态时,偶尔使用锁并不是世界末日。但如果你发现自己经常需要lock、SemaphoreSlim或其他同步原语,请退后一步,重新考虑你的设计。
当你真正需要共享可变状态时:
- **首选:**重新设计以避免它(不可变性、消息传递、演员隔离)
- **次选:**使用
System.Collections.Concurrent(ConcurrentDictionary、ConcurrentQueue等) - **第三选择:**使用
Channel<T>通过消息传递序列化访问 - **最后手段:**对于简单、短暂的临界区使用
lock
锁适用于构建低级基础设施或并发数据结构。但对于业务逻辑,几乎总是有更好的抽象。
决策树
你想要做什么?
│
├─► 等待I/O(HTTP、数据库、文件)?
│ └─► 使用async/await
│
├─► 并行处理集合(CPU密集型)?
│ └─► 使用Parallel.ForEachAsync
│
├─► 生产者/消费者模式(工作队列)?
│ └─► 使用System.Threading.Channels
│
├─► UI事件处理(去抖、节流、合并)?
│ └─► 使用响应式扩展(Rx)
│
├─► 服务器端流处理(背压、批处理)?
│ └─► 使用Akka.NET Streams
│
├─► 具有复杂转换的状态机?
│ └─► 使用Akka.NET Actors(Become模式)
│
├─► 管理许多独立实体的状态?
│ └─► 使用Akka.NET Actors(每个实体一个演员)
│
├─► 协调多个异步操作?
│ └─► 使用Task.WhenAll / Task.WhenAny
│
└─► 以上都不符合?
└─► 问问自己:"我真的需要共享可变状态吗?"
├─► 是 → 考虑重新设计以避免它
└─► 真正无法避免 → 使用Channels或Actors序列化访问
一级:async/await(默认选择)
**用于:**I/O密集型操作、非阻塞等待、大多数日常并发。
// 简单的异步I/O
public async Task<Order> GetOrderAsync(string orderId, CancellationToken ct)
{
var order = await _database.GetAsync(orderId, ct);
var customer = await _customerService.GetAsync(order.CustomerId, ct);
return order with { Customer = customer };
}
// 并行异步操作(当独立时)
public async Task<Dashboard> LoadDashboardAsync(string userId, CancellationToken ct)
{
var ordersTask = _orderService.GetRecentOrdersAsync(userId, ct);
var notificationsTask = _notificationService.GetUnreadAsync(userId, ct);
var statsTask = _statsService.GetUserStatsAsync(userId, ct);
await Task.WhenAll(ordersTask, notificationsTask, statsTask);
return new Dashboard(
Orders: await ordersTask,
Notifications: await notificationsTask,
Stats: await statsTask);
}
关键原则:
- 总是接受
CancellationToken - 在库代码中使用
ConfigureAwait(false) - 不要在异步代码上阻塞(不要
.Result或.Wait())
二级:Parallel.ForEachAsync(CPU密集型并行性)
**用于:**当工作是CPU密集型或你需要控制并发时,处理集合并行。
// 用控制的并行性处理项目
public async Task ProcessOrdersAsync(
IEnumerable<Order> orders,
CancellationToken ct)
{
await Parallel.ForEachAsync(
orders,
new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = ct
},
async (order, token) =>
{
await ProcessOrderAsync(order, token);
});
}
// CPU密集型工作与I/O
public async Task<IReadOnlyList<ProcessedImage>> ProcessImagesAsync(
IEnumerable<string> imagePaths,
CancellationToken ct)
{
var results = new ConcurrentBag<ProcessedImage>();
await Parallel.ForEachAsync(
imagePaths,
new ParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = ct },
async (path, token) =>
{
var image = await File.ReadAllBytesAsync(path, token);
var processed = ProcessImage(image); // CPU密集型
results.Add(processed);
});
return results.ToList();
}
何时不使用:
- 纯I/O操作(async/await足够)
- 当顺序重要时(Parallel不保留顺序)
- 当你需要背压或流控制时
三级:System.Threading.Channels(生产者/消费者)
**用于:**工作队列、生产者/消费者模式、解耦生产者与消费者、简单的流式处理。
// 基本生产者/消费者
public class OrderProcessor
{
private readonly Channel<Order> _channel;
public OrderProcessor()
{
// 有界通道提供背压
_channel = Channel.CreateBounded<Order>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait
});
}
// 生产者
public async Task EnqueueOrderAsync(Order order, CancellationToken ct)
{
await _channel.Writer.WriteAsync(order, ct);
}
// 消费者(作为后台任务运行)
public async Task ProcessOrdersAsync(CancellationToken ct)
{
await foreach (var order in _channel.Reader.ReadAllAsync(ct))
{
await ProcessOrderAsync(order, ct);
}
}
// 信号不再有项目
public void Complete() => _channel.Writer.Complete();
}
// 多个消费者(工作窃取模式)
public class WorkerPool
{
private readonly Channel<WorkItem> _channel;
private readonly List<Task> _workers = new();
public WorkerPool(int workerCount)
{
_channel = Channel.CreateUnbounded<WorkItem>();
// 启动多个消费者
for (int i = 0; i < workerCount; i++)
{
_workers.Add(Task.Run(() => ConsumeAsync()));
}
}
private async Task ConsumeAsync()
{
await foreach (var item in _channel.Reader.ReadAllAsync())
{
await ProcessAsync(item);
}
}
public ValueTask EnqueueAsync(WorkItem item)
=> _channel.Writer.WriteAsync(item);
}
Channels适用于:
- 从生产者速度解耦消费者速度
- 带有背压的工作缓冲
- 简单的扇出到多个工作者
- 后台处理队列
Channels不适用于:
- 复杂的流操作(批处理、窗口、合并)
- 每个实体的状态处理
- 当你需要复杂的错误处理/监督时
四级:Akka.NET Streams(复杂流处理)
**用于:**背压、批处理、去抖、节流、合并流、复杂转换。
using Akka.Streams;
using Akka.Streams.Dsl;
// 带有超时的批处理
public Source<IReadOnlyList<Event>, NotUsed> BatchEvents(
Source<Event, NotUsed> events)
{
return events
.GroupedWithin(100, TimeSpan.FromSeconds(1)) // 批处理多达100个或1秒
.Select(batch => batch.ToList() as IReadOnlyList<Event>);
}
// 节流
public Source<Request, NotUsed> ThrottleRequests(
Source<Request, NotUsed> requests)
{
return requests
.Throttle(10, TimeSpan.FromSeconds(1), 5, ThrottleMode.Shaping);
}
// 并行处理与有序结果
public Source<ProcessedItem, NotUsed> ProcessWithParallelism(
Source<Item, NotUsed> items)
{
return items
.SelectAsync(4, async item => await ProcessAsync(item)); // 4并行
}
// 复杂管道
public IRunnableGraph<Task<Done>> CreatePipeline(
Source<RawEvent, NotUsed> events,
Sink<ProcessedEvent, Task<Done>> sink)
{
return events
.Where(e => e.IsValid)
.GroupedWithin(50, TimeSpan.FromMilliseconds(500))
.SelectAsync(4, batch => ProcessBatchAsync(batch))
.SelectMany(results => results)
.ToMaterialized(sink, Keep.Right);
}
Akka.NET Streams擅长:
- 大小和时间限制的批处理
- 节流和速率限制
- 通过整个管道传播的背压
- 合并/拆分流
- 并行处理与排序保证
- 带有监督的错误处理
四级B:响应式扩展(UI和事件组合)
**用于:**UI事件处理,组合事件流,客户端应用程序中基于时间的操作。
Rx在UI场景中表现出色,你需要对用户事件做出反应,如去抖、节流或组合多个事件源。
using System.Reactive.Linq;
// 带有去抖的搜索即你打字
public class SearchViewModel
{
public SearchViewModel(ISearchService searchService)
{
// 对文本更改做出反应,带有去抖
SearchResults = SearchText
.Throttle(TimeSpan.FromMilliseconds(300)) // 等待打字暂停
.DistinctUntilChanged() // 忽略相同的文本
.Where(text => text.Length >= 3) // 最小长度
.SelectMany(text => searchService.SearchAsync(text).ToObservable())
.ObserveOn(RxApp.MainThreadScheduler); // 返回到UI线程
}
public IObservable<string> SearchText { get; }
public IObservable<IList<SearchResult>> SearchResults { get; }
}
// 组合多个UI事件
public IObservable<bool> CanSubmit =>
Observable.CombineLatest(
UsernameValid,
PasswordValid,
EmailValid,
(user, pass, email) => user && pass && email);
// 双击检测
public IObservable<Point> DoubleClicks =>
MouseClicks
.Buffer(TimeSpan.FromMilliseconds(300))
.Where(clicks => clicks.Count >= 2)
.Select(clicks => clicks.Last());
// 带有去抖的自动保存
public IDisposable AutoSave =>
DocumentChanges
.Throttle(TimeSpan.FromSeconds(2))
.Subscribe(async doc => await SaveAsync(doc));
Rx适用于:
- UI事件组合(WPF、WinForms、MAUI、Blazor)
- 带有去抖的搜索即你打字
- 组合多个事件源
- UI中的时间窗口操作
- 拖放手势检测
- 实时数据可视化
Rx与Akka.NET Streams:
| 场景 | Rx | Akka.NET Streams |
|---|---|---|
| UI事件 | ✅ 最佳选择 | 过度杀伤 |
| 客户端组合 | ✅ 最佳选择 | 过度杀伤 |
| 服务器端管道 | 有限的工作但有限 | ✅ 更好的背压 |
| 分布式处理 | ❌ 未为此设计 | ✅ 为此而建 |
| 热 observables | ✅ 原生支持 | 需要更多设置 |
**经验法则:**对于UI/客户端,使用Rx;对于服务器端管道,使用Akka.NET Streams。
五级:Akka.NET Actors(有状态并发)
**用于:**管理多个实体的状态,状态机,基于推送的更新,复杂协调,监督和容错。
实体每演员模式
// 每个订单一个演员 - 每个订单都有隔离的状态
public class OrderActor : ReceiveActor
{
private OrderState _state;
public OrderActor(string orderId)
{
_state = new OrderState(orderId);
Receive<AddItem>(msg =>
{
_state = _state.AddItem(msg.Item);
Sender.Tell(new ItemAdded(msg.Item));
});
Receive<Checkout>(msg =>
{
if (_state.CanCheckout)
{
_state = _state.Checkout();
Sender.Tell(new CheckoutSucceeded(_state.Total));
}
else
{
Sender.Tell(new CheckoutFailed("购物车为空"));
}
});
Receive<GetState>(_ => Sender.Tell(_state));
}
}
带有Become的状态机
演员擅长使用Become()实现状态机,以切换消息处理程序:
public class PaymentActor : ReceiveActor
{
private PaymentData _payment;
public PaymentActor(string paymentId)
{
_payment = new PaymentData(paymentId);
// 从Pending状态开始
Pending();
}
private void Pending()
{
Receive<AuthorizePayment>(msg =>
{
_payment = _payment with { Amount = msg.Amount };
// 转换到Authorizing状态
Become(Authorizing);
Self.Tell(new ProcessAuthorization());
});
Receive<CancelPayment>(_ =>
{
Become(Cancelled);
Sender.Tell(new PaymentCancelled(_payment.Id));
});
}
private void Authorizing()
{
Receive<ProcessAuthorization>(async _ =>
{
var result = await _gateway.AuthorizeAsync(_payment);
if (result.Success)
{
_payment = _payment with { AuthCode = result.AuthCode };
Become(Authorized);
}
else
{
Become(Failed);
}
});
// 在授权期间不能取消 - 稍后堆叠或拒绝
Receive<CancelPayment>(_ =>
{
Sender.Tell(new PaymentError("不能在授权期间取消"));
});
}
private void Authorized()
{
Receive<CapturePayment>(_ =>
{
Become(Capturing);
Self.Tell(new ProcessCapture());
});
Receive<VoidPayment>(_ =>
{
Become(Voiding);
Self.Tell(new ProcessVoid());
});
}
private void Capturing() { /* ... */ }
private void Voiding() { /* ... */ }
private void Cancelled() { /* 只响应GetState */ }
private void Failed() { /* 只响应GetState, Retry */ }
}
分布式实体与集群分片
// 使用集群分片分布式实体
builder.WithShardRegion<OrderActor>(
typeName: "orders",
entityPropsFactory: (_, _, resolver) =>
orderId => Props.Create(() => new OrderActor(orderId)),
messageExtractor: new OrderMessageExtractor(),
shardOptions: new ShardOptions());
// 向任何订单发送消息 - 分片路由到正确的节点
var orderRegion = registry.Get<OrderActor>();
orderRegion.Tell(new ShardingEnvelope("order-123", new AddItem(item)));
何时使用Akka.NET
当你有以下情况时使用Akka.NET Actors:
| 场景 | 为什么使用演员? |
|---|---|
| 许多实体具有独立状态 | 每个实体都有自己的演员 - 没有锁,自然隔离 |
| 状态机 | Become()优雅地模拟状态转换 |
| 基于推送/反应的更新 | 演员自然支持告诉-不问 |
| 监督要求 | 父演员监督孩子,自动在失败时重新启动 |
| 分布式系统 | 集群分片在节点间分布实体 |
| 长期工作流 | 演员+持久性=持久工作流 |
| 实时系统 | 消息驱动,非阻塞设计 |
| IoT/设备管理 | 每个设备=一个演员,可扩展到数百万 |
不要使用Akka.NET的情况:
| 场景 | 更好的替代品 |
|---|---|
| 简单的工作队列 | Channel<T> |
| 请求/响应API | async/await |
| 批处理 | Parallel.ForEachAsync或Akka.NET Streams |
| UI事件处理 | 响应式扩展 |
| 共享状态(单个实例) | 使用Channel进行序列化的服务 |
| CRUD操作 | 标准异步服务 |
演员心态
当你的问题看起来像:
- “我有成千上万的[订单/用户/设备/会话]需要独立状态”
- “每个[实体]经历一个生命周期,在每个阶段有不同的行为”
- “我需要推送更新给感兴趣的各方,当事情变化时”
- “如果处理失败,我希望只重新启动那个实体,而不是整个系统”
- “这需要在多个服务器上工作”
如果这些都不适用,你可能不需要演员。
反模式:避免什么
❌ 业务逻辑中的锁
// 糟糕:使用锁保护共享状态
private readonly object _lock = new();
private Dictionary<string, Order> _orders = new();
public void UpdateOrder(string id, Action<Order> update)
{
lock (_lock)
{
if (_orders.TryGetValue(id, out var order))
{
update(order);
}
}
}
// 好:使用演员或Channel序列化访问
// 每个订单都有自己的演员 - 不需要锁
❌ 手动线程管理
// 糟糕:手动创建线程
var thread = new Thread(() => ProcessOrders());
thread.Start();
// 好:使用Task.Run或更好的抽象
_ = Task.Run(() => ProcessOrdersAsync(cancellationToken));
❌ 在异步代码中阻塞
// 糟糕:在异步上阻塞
var result = GetDataAsync().Result; // 死锁风险!
GetDataAsync().Wait(); // 也不好
// 好:全程异步
var result = await GetDataAsync();
❌ 未经保护的共享可变状态
// 糟糕:多个任务修改共享状态
var results = new List<Result>();
await Parallel.ForEachAsync(items, async (item, ct) =>
{
var result = await ProcessAsync(item, ct);
results.Add(result); // 竞态条件!
});
// 好:使用ConcurrentBag或以不同的方式收集结果
var results = new ConcurrentBag<Result>();
// 或更好:从lambda返回并收集
优先使用异步本地函数
使用异步本地函数,而不是Task.Run(async () => ...)或ContinueWith():
不要:匿名异步Lambda
private void HandleCommand(MyCommand cmd)
{
var self = Self;
_ = Task.Run(async () =>
{
// 这里有很多异步工作...
var result = await DoWorkAsync();
return new WorkCompleted(result);
}).PipeTo(self);
}
做:异步本地函数
private void HandleCommand(MyCommand cmd)
{
async Task<WorkCompleted> ExecuteAsync()
{
// 这里有很多异步工作...
var result = await DoWorkAsync();
return new WorkCompleted(result);
}
ExecuteAsync().PipeTo(Self);
}
避免ContinueWith进行序列化
不要:
someTask
.ContinueWith(t => ProcessResult(t.Result))
.ContinueWith(t => SendNotification(t.Result));
做:
async Task ProcessAndNotifyAsync()
{
var result = await someTask;
var processed = await ProcessResult(result);
await SendNotification(processed);
}
ProcessAndNotifyAsync();
为什么这很重要
| 好处 | 描述 |
|---|---|
| 可读性 | 命名函数自文档化;匿名lambda掩盖意图 |
| 调试 | 堆栈跟踪显示有意义的函数名而不是<>c__DisplayClass |
| 异常处理 | 清洁的try/catch结构,无需AggregateException解包 |
| 作用域清晰 | 本地函数使捕获的变量明确 |
| 可测试性 | 更容易提取和单元测试异步逻辑 |
Akka.NET示例
当在演员中使用PipeTo时,异步本地函数保持模式清晰:
private void HandleSync(StartSync cmd)
{
async Task<SyncResult> PerformSyncAsync()
{
await using var scope = _scopeFactory.CreateAsyncScope();
var service = scope.ServiceProvider.GetRequiredService<ISyncService>();
var count = await service.SyncAsync(cmd.EntityId);
return new SyncResult(cmd.EntityId, count);
}
PerformSyncAsync().PipeTo(Self);
}
这比包装一切在Task.Run(async () => ...)中更清晰。
快速参考:何时使用哪个工具?
| 需要 | 工具 | 示例 |
|---|---|---|
| 等待I/O | async/await |
HTTP调用,数据库查询 |
| 平行CPU工作 | Parallel.ForEachAsync |
图像处理,计算 |
| 工作队列 | Channel<T> |
后台作业处理 |
| UI事件去抖/节流 | 响应式扩展 | 搜索即你打字,自动保存 |
| 服务器端批处理/节流 | Akka.NET Streams | 事件聚合,速率限制 |
| 状态机 | Akka.NET Actors | 支付流程,订单生命周期 |
| 实体状态管理 | Akka.NET Actors | 订单管理,用户会话 |
| 触发多个异步操作 | Task.WhenAll |
加载仪表板数据 |
| 竞赛多个异步操作 | Task.WhenAny |
带有回退的超时 |
| 定期工作 | PeriodicTimer |
健康检查,轮询 |
升级路径
async/await(从这里开始)
│
├─► 需要并行性? → Parallel.ForEachAsync
│
├─► 需要生产者/消费者? → Channel<T>
│
├─► 需要UI事件组合? → 响应式扩展
│
├─► 需要服务器端流处理? → Akka.NET Streams
│
└─► 需要状态机或实体管理? → Akka.NET Actors
**只有当你有具体需求时才升级。**不要“只是以防万一”就使用演员或流 - 从async/await开始,只有当更简单的方法不适合时才向上移动。