csharp-concurrency-patternsSkill csharp-concurrency-patterns

.NET并发:选择合适的抽象 - 从I/O的async/await到生产者/消费者模式的Channels,再到Akka.NET的状态实体管理。避免锁和手动同步,除非绝对必要。

后端开发 0 次安装 0 次浏览 更新于 2/26/2026

.NET并发:选择正确的工具

何时使用此技能

当:

  • 决定如何处理.NET中的并发操作
  • 评估是否使用async/await、Channels、Akka.NET或其他抽象
  • 倾向于使用锁、信号量或其他同步原语
  • 需要处理具有背压、批处理或去抖的数据流
  • 在多个并发实体间管理状态

理念

从简单开始,仅在需要时升级。

大多数并发问题都可以用async/await解决。只有当你有特定需求,async/await不能清晰解决时,才使用更复杂的工具。

**尽量避免共享可变状态。**处理并发的最佳方式是通过设计来避免它。不可变数据、消息传递和隔离状态(如演员)消除了整个类别的错误。

**锁应该是例外,而不是规则。**当你无法避免共享可变状态时,偶尔使用锁并不是世界末日。但如果你发现自己经常需要lockSemaphoreSlim或其他同步原语,请退后一步,重新考虑你的设计。

当你真正需要共享可变状态时:

  1. **首选:**重新设计以避免它(不可变性、消息传递、演员隔离)
  2. **次选:**使用System.Collections.Concurrent(ConcurrentDictionary、ConcurrentQueue等)
  3. **第三选择:**使用Channel<T>通过消息传递序列化访问
  4. **最后手段:**对于简单、短暂的临界区使用lock

锁适用于构建低级基础设施或并发数据结构。但对于业务逻辑,几乎总是有更好的抽象。

决策树

你想要做什么?
│
├─► 等待I/O(HTTP、数据库、文件)?
│   └─► 使用async/await
│
├─► 并行处理集合(CPU密集型)?
│   └─► 使用Parallel.ForEachAsync
│
├─► 生产者/消费者模式(工作队列)?
│   └─► 使用System.Threading.Channels
│
├─► UI事件处理(去抖、节流、合并)?
│   └─► 使用响应式扩展(Rx)
│
├─► 服务器端流处理(背压、批处理)?
│   └─► 使用Akka.NET Streams
│
├─► 具有复杂转换的状态机?
│   └─► 使用Akka.NET Actors(Become模式)
│
├─► 管理许多独立实体的状态?
│   └─► 使用Akka.NET Actors(每个实体一个演员)
│
├─► 协调多个异步操作?
│   └─► 使用Task.WhenAll / Task.WhenAny
│
└─► 以上都不符合?
    └─► 问问自己:"我真的需要共享可变状态吗?"
        ├─► 是 → 考虑重新设计以避免它
        └─► 真正无法避免 → 使用Channels或Actors序列化访问

一级:async/await(默认选择)

**用于:**I/O密集型操作、非阻塞等待、大多数日常并发。

// 简单的异步I/O
public async Task<Order> GetOrderAsync(string orderId, CancellationToken ct)
{
    var order = await _database.GetAsync(orderId, ct);
    var customer = await _customerService.GetAsync(order.CustomerId, ct);
    return order with { Customer = customer };
}

// 并行异步操作(当独立时)
public async Task<Dashboard> LoadDashboardAsync(string userId, CancellationToken ct)
{
    var ordersTask = _orderService.GetRecentOrdersAsync(userId, ct);
    var notificationsTask = _notificationService.GetUnreadAsync(userId, ct);
    var statsTask = _statsService.GetUserStatsAsync(userId, ct);

    await Task.WhenAll(ordersTask, notificationsTask, statsTask);

    return new Dashboard(
        Orders: await ordersTask,
        Notifications: await notificationsTask,
        Stats: await statsTask);
}

关键原则:

  • 总是接受CancellationToken
  • 在库代码中使用ConfigureAwait(false)
  • 不要在异步代码上阻塞(不要.Result.Wait()

二级:Parallel.ForEachAsync(CPU密集型并行性)

**用于:**当工作是CPU密集型或你需要控制并发时,处理集合并行。

// 用控制的并行性处理项目
public async Task ProcessOrdersAsync(
    IEnumerable<Order> orders,
    CancellationToken ct)
{
    await Parallel.ForEachAsync(
        orders,
        new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            CancellationToken = ct
        },
        async (order, token) =>
        {
            await ProcessOrderAsync(order, token);
        });
}

// CPU密集型工作与I/O
public async Task<IReadOnlyList<ProcessedImage>> ProcessImagesAsync(
    IEnumerable<string> imagePaths,
    CancellationToken ct)
{
    var results = new ConcurrentBag<ProcessedImage>();

    await Parallel.ForEachAsync(
        imagePaths,
        new ParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = ct },
        async (path, token) =>
        {
            var image = await File.ReadAllBytesAsync(path, token);
            var processed = ProcessImage(image); // CPU密集型
            results.Add(processed);
        });

    return results.ToList();
}

何时不使用:

  • 纯I/O操作(async/await足够)
  • 当顺序重要时(Parallel不保留顺序)
  • 当你需要背压或流控制时

三级:System.Threading.Channels(生产者/消费者)

**用于:**工作队列、生产者/消费者模式、解耦生产者与消费者、简单的流式处理。

// 基本生产者/消费者
public class OrderProcessor
{
    private readonly Channel<Order> _channel;

    public OrderProcessor()
    {
        // 有界通道提供背压
        _channel = Channel.CreateBounded<Order>(new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    // 生产者
    public async Task EnqueueOrderAsync(Order order, CancellationToken ct)
    {
        await _channel.Writer.WriteAsync(order, ct);
    }

    // 消费者(作为后台任务运行)
    public async Task ProcessOrdersAsync(CancellationToken ct)
    {
        await foreach (var order in _channel.Reader.ReadAllAsync(ct))
        {
            await ProcessOrderAsync(order, ct);
        }
    }

    // 信号不再有项目
    public void Complete() => _channel.Writer.Complete();
}
// 多个消费者(工作窃取模式)
public class WorkerPool
{
    private readonly Channel<WorkItem> _channel;
    private readonly List<Task> _workers = new();

    public WorkerPool(int workerCount)
    {
        _channel = Channel.CreateUnbounded<WorkItem>();

        // 启动多个消费者
        for (int i = 0; i < workerCount; i++)
        {
            _workers.Add(Task.Run(() => ConsumeAsync()));
        }
    }

    private async Task ConsumeAsync()
    {
        await foreach (var item in _channel.Reader.ReadAllAsync())
        {
            await ProcessAsync(item);
        }
    }

    public ValueTask EnqueueAsync(WorkItem item)
        => _channel.Writer.WriteAsync(item);
}

Channels适用于:

  • 从生产者速度解耦消费者速度
  • 带有背压的工作缓冲
  • 简单的扇出到多个工作者
  • 后台处理队列

Channels不适用于:

  • 复杂的流操作(批处理、窗口、合并)
  • 每个实体的状态处理
  • 当你需要复杂的错误处理/监督时

四级:Akka.NET Streams(复杂流处理)

**用于:**背压、批处理、去抖、节流、合并流、复杂转换。

using Akka.Streams;
using Akka.Streams.Dsl;

// 带有超时的批处理
public Source<IReadOnlyList<Event>, NotUsed> BatchEvents(
    Source<Event, NotUsed> events)
{
    return events
        .GroupedWithin(100, TimeSpan.FromSeconds(1)) // 批处理多达100个或1秒
        .Select(batch => batch.ToList() as IReadOnlyList<Event>);
}

// 节流
public Source<Request, NotUsed> ThrottleRequests(
    Source<Request, NotUsed> requests)
{
    return requests
        .Throttle(10, TimeSpan.FromSeconds(1), 5, ThrottleMode.Shaping);
}

// 并行处理与有序结果
public Source<ProcessedItem, NotUsed> ProcessWithParallelism(
    Source<Item, NotUsed> items)
{
    return items
        .SelectAsync(4, async item => await ProcessAsync(item)); // 4并行
}

// 复杂管道
public IRunnableGraph<Task<Done>> CreatePipeline(
    Source<RawEvent, NotUsed> events,
    Sink<ProcessedEvent, Task<Done>> sink)
{
    return events
        .Where(e => e.IsValid)
        .GroupedWithin(50, TimeSpan.FromMilliseconds(500))
        .SelectAsync(4, batch => ProcessBatchAsync(batch))
        .SelectMany(results => results)
        .ToMaterialized(sink, Keep.Right);
}

Akka.NET Streams擅长:

  • 大小和时间限制的批处理
  • 节流和速率限制
  • 通过整个管道传播的背压
  • 合并/拆分流
  • 并行处理与排序保证
  • 带有监督的错误处理

四级B:响应式扩展(UI和事件组合)

**用于:**UI事件处理,组合事件流,客户端应用程序中基于时间的操作。

Rx在UI场景中表现出色,你需要对用户事件做出反应,如去抖、节流或组合多个事件源。

using System.Reactive.Linq;

// 带有去抖的搜索即你打字
public class SearchViewModel
{
    public SearchViewModel(ISearchService searchService)
    {
        // 对文本更改做出反应,带有去抖
        SearchResults = SearchText
            .Throttle(TimeSpan.FromMilliseconds(300))  // 等待打字暂停
            .DistinctUntilChanged()                     // 忽略相同的文本
            .Where(text => text.Length >= 3)           // 最小长度
            .SelectMany(text => searchService.SearchAsync(text).ToObservable())
            .ObserveOn(RxApp.MainThreadScheduler);     // 返回到UI线程
    }

    public IObservable<string> SearchText { get; }
    public IObservable<IList<SearchResult>> SearchResults { get; }
}

// 组合多个UI事件
public IObservable<bool> CanSubmit =>
    Observable.CombineLatest(
        UsernameValid,
        PasswordValid,
        EmailValid,
        (user, pass, email) => user && pass && email);

// 双击检测
public IObservable<Point> DoubleClicks =>
    MouseClicks
        .Buffer(TimeSpan.FromMilliseconds(300))
        .Where(clicks => clicks.Count >= 2)
        .Select(clicks => clicks.Last());

// 带有去抖的自动保存
public IDisposable AutoSave =>
    DocumentChanges
        .Throttle(TimeSpan.FromSeconds(2))
        .Subscribe(async doc => await SaveAsync(doc));

Rx适用于:

  • UI事件组合(WPF、WinForms、MAUI、Blazor)
  • 带有去抖的搜索即你打字
  • 组合多个事件源
  • UI中的时间窗口操作
  • 拖放手势检测
  • 实时数据可视化

Rx与Akka.NET Streams:

场景 Rx Akka.NET Streams
UI事件 ✅ 最佳选择 过度杀伤
客户端组合 ✅ 最佳选择 过度杀伤
服务器端管道 有限的工作但有限 ✅ 更好的背压
分布式处理 ❌ 未为此设计 ✅ 为此而建
热 observables ✅ 原生支持 需要更多设置

**经验法则:**对于UI/客户端,使用Rx;对于服务器端管道,使用Akka.NET Streams。

五级:Akka.NET Actors(有状态并发)

**用于:**管理多个实体的状态,状态机,基于推送的更新,复杂协调,监督和容错。

实体每演员模式

// 每个订单一个演员 - 每个订单都有隔离的状态
public class OrderActor : ReceiveActor
{
    private OrderState _state;

    public OrderActor(string orderId)
    {
        _state = new OrderState(orderId);

        Receive<AddItem>(msg =>
        {
            _state = _state.AddItem(msg.Item);
            Sender.Tell(new ItemAdded(msg.Item));
        });

        Receive<Checkout>(msg =>
        {
            if (_state.CanCheckout)
            {
                _state = _state.Checkout();
                Sender.Tell(new CheckoutSucceeded(_state.Total));
            }
            else
            {
                Sender.Tell(new CheckoutFailed("购物车为空"));
            }
        });

        Receive<GetState>(_ => Sender.Tell(_state));
    }
}

带有Become的状态机

演员擅长使用Become()实现状态机,以切换消息处理程序:

public class PaymentActor : ReceiveActor
{
    private PaymentData _payment;

    public PaymentActor(string paymentId)
    {
        _payment = new PaymentData(paymentId);

        // 从Pending状态开始
        Pending();
    }

    private void Pending()
    {
        Receive<AuthorizePayment>(msg =>
        {
            _payment = _payment with { Amount = msg.Amount };
            // 转换到Authorizing状态
            Become(Authorizing);
            Self.Tell(new ProcessAuthorization());
        });

        Receive<CancelPayment>(_ =>
        {
            Become(Cancelled);
            Sender.Tell(new PaymentCancelled(_payment.Id));
        });
    }

    private void Authorizing()
    {
        Receive<ProcessAuthorization>(async _ =>
        {
            var result = await _gateway.AuthorizeAsync(_payment);
            if (result.Success)
            {
                _payment = _payment with { AuthCode = result.AuthCode };
                Become(Authorized);
            }
            else
            {
                Become(Failed);
            }
        });

        // 在授权期间不能取消 - 稍后堆叠或拒绝
        Receive<CancelPayment>(_ =>
        {
            Sender.Tell(new PaymentError("不能在授权期间取消"));
        });
    }

    private void Authorized()
    {
        Receive<CapturePayment>(_ =>
        {
            Become(Capturing);
            Self.Tell(new ProcessCapture());
        });

        Receive<VoidPayment>(_ =>
        {
            Become(Voiding);
            Self.Tell(new ProcessVoid());
        });
    }

    private void Capturing() { /* ... */ }
    private void Voiding() { /* ... */ }
    private void Cancelled() { /* 只响应GetState */ }
    private void Failed() { /* 只响应GetState, Retry */ }
}

分布式实体与集群分片

// 使用集群分片分布式实体
builder.WithShardRegion<OrderActor>(
    typeName: "orders",
    entityPropsFactory: (_, _, resolver) =>
        orderId => Props.Create(() => new OrderActor(orderId)),
    messageExtractor: new OrderMessageExtractor(),
    shardOptions: new ShardOptions());

// 向任何订单发送消息 - 分片路由到正确的节点
var orderRegion = registry.Get<OrderActor>();
orderRegion.Tell(new ShardingEnvelope("order-123", new AddItem(item)));

何时使用Akka.NET

当你有以下情况时使用Akka.NET Actors:

场景 为什么使用演员?
许多实体具有独立状态 每个实体都有自己的演员 - 没有锁,自然隔离
状态机 Become()优雅地模拟状态转换
基于推送/反应的更新 演员自然支持告诉-不问
监督要求 父演员监督孩子,自动在失败时重新启动
分布式系统 集群分片在节点间分布实体
长期工作流 演员+持久性=持久工作流
实时系统 消息驱动,非阻塞设计
IoT/设备管理 每个设备=一个演员,可扩展到数百万

不要使用Akka.NET的情况:

场景 更好的替代品
简单的工作队列 Channel<T>
请求/响应API async/await
批处理 Parallel.ForEachAsync或Akka.NET Streams
UI事件处理 响应式扩展
共享状态(单个实例) 使用Channel进行序列化的服务
CRUD操作 标准异步服务

演员心态

当你的问题看起来像:

  • “我有成千上万的[订单/用户/设备/会话]需要独立状态”
  • “每个[实体]经历一个生命周期,在每个阶段有不同的行为”
  • “我需要推送更新给感兴趣的各方,当事情变化时”
  • “如果处理失败,我希望只重新启动那个实体,而不是整个系统”
  • “这需要在多个服务器上工作”

如果这些都不适用,你可能不需要演员。

反模式:避免什么

❌ 业务逻辑中的锁

// 糟糕:使用锁保护共享状态
private readonly object _lock = new();
private Dictionary<string, Order> _orders = new();

public void UpdateOrder(string id, Action<Order> update)
{
    lock (_lock)
    {
        if (_orders.TryGetValue(id, out var order))
        {
            update(order);
        }
    }
}

// 好:使用演员或Channel序列化访问
// 每个订单都有自己的演员 - 不需要锁

❌ 手动线程管理

// 糟糕:手动创建线程
var thread = new Thread(() => ProcessOrders());
thread.Start();

// 好:使用Task.Run或更好的抽象
_ = Task.Run(() => ProcessOrdersAsync(cancellationToken));

❌ 在异步代码中阻塞

// 糟糕:在异步上阻塞
var result = GetDataAsync().Result; // 死锁风险!
GetDataAsync().Wait();              // 也不好

// 好:全程异步
var result = await GetDataAsync();

❌ 未经保护的共享可变状态

// 糟糕:多个任务修改共享状态
var results = new List<Result>();
await Parallel.ForEachAsync(items, async (item, ct) =>
{
    var result = await ProcessAsync(item, ct);
    results.Add(result); // 竞态条件!
});

// 好:使用ConcurrentBag或以不同的方式收集结果
var results = new ConcurrentBag<Result>();
// 或更好:从lambda返回并收集

优先使用异步本地函数

使用异步本地函数,而不是Task.Run(async () => ...)ContinueWith()

不要:匿名异步Lambda

private void HandleCommand(MyCommand cmd)
{
    var self = Self;

    _ = Task.Run(async () =>
    {
        // 这里有很多异步工作...
        var result = await DoWorkAsync();
        return new WorkCompleted(result);
    }).PipeTo(self);
}

做:异步本地函数

private void HandleCommand(MyCommand cmd)
{
    async Task<WorkCompleted> ExecuteAsync()
    {
        // 这里有很多异步工作...
        var result = await DoWorkAsync();
        return new WorkCompleted(result);
    }

    ExecuteAsync().PipeTo(Self);
}

避免ContinueWith进行序列化

不要:

someTask
    .ContinueWith(t => ProcessResult(t.Result))
    .ContinueWith(t => SendNotification(t.Result));

做:

async Task ProcessAndNotifyAsync()
{
    var result = await someTask;
    var processed = await ProcessResult(result);
    await SendNotification(processed);
}

ProcessAndNotifyAsync();

为什么这很重要

好处 描述
可读性 命名函数自文档化;匿名lambda掩盖意图
调试 堆栈跟踪显示有意义的函数名而不是<>c__DisplayClass
异常处理 清洁的try/catch结构,无需AggregateException解包
作用域清晰 本地函数使捕获的变量明确
可测试性 更容易提取和单元测试异步逻辑

Akka.NET示例

当在演员中使用PipeTo时,异步本地函数保持模式清晰:

private void HandleSync(StartSync cmd)
{
    async Task<SyncResult> PerformSyncAsync()
    {
        await using var scope = _scopeFactory.CreateAsyncScope();
        var service = scope.ServiceProvider.GetRequiredService<ISyncService>();

        var count = await service.SyncAsync(cmd.EntityId);
        return new SyncResult(cmd.EntityId, count);
    }

    PerformSyncAsync().PipeTo(Self);
}

这比包装一切在Task.Run(async () => ...)中更清晰。

快速参考:何时使用哪个工具?

需要 工具 示例
等待I/O async/await HTTP调用,数据库查询
平行CPU工作 Parallel.ForEachAsync 图像处理,计算
工作队列 Channel<T> 后台作业处理
UI事件去抖/节流 响应式扩展 搜索即你打字,自动保存
服务器端批处理/节流 Akka.NET Streams 事件聚合,速率限制
状态机 Akka.NET Actors 支付流程,订单生命周期
实体状态管理 Akka.NET Actors 订单管理,用户会话
触发多个异步操作 Task.WhenAll 加载仪表板数据
竞赛多个异步操作 Task.WhenAny 带有回退的超时
定期工作 PeriodicTimer 健康检查,轮询

升级路径

async/await(从这里开始)
    │
    ├─► 需要并行性? → Parallel.ForEachAsync
    │
    ├─► 需要生产者/消费者? → Channel<T>
    │
    ├─► 需要UI事件组合? → 响应式扩展
    │
    ├─► 需要服务器端流处理? → Akka.NET Streams
    │
    └─► 需要状态机或实体管理? → Akka.NET Actors

**只有当你有具体需求时才升级。**不要“只是以防万一”就使用演员或流 - 从async/await开始,只有当更简单的方法不适合时才向上移动。