Akka.NETBestPracticesSkill akka-net-best-practices

Akka.NET最佳实践指南,涵盖事件流与分布式发布订阅、监管策略、错误处理、Props与依赖注入、工作分配模式、集群与本地模式抽象等多个关键领域,旨在帮助开发者构建高效、可测试且健壮的Akka.NET应用程序。

0 次安装 0 次浏览 更新于 2/26/2026

Akka.NET最佳实践

使用此技能时

使用此技能时:

  • 设计actor通信模式
  • 在EventStream和DistributedPubSub之间做决策
  • 在actors中实现错误处理
  • 理解监管策略
  • 在Props模式和DependencyResolver之间做选择
  • 设计跨节点的工作分配
  • 创建可测试的actor系统,可以运行有或没有集群基础设施
  • 为本地测试场景抽象化Cluster Sharding

1.EventStream与DistributedPubSub

重要:EventStream仅限LOCAL

Context.System.EventStream仅限于单个ActorSystem进程。它不跨集群节点工作。

// BAD: 这只在单个服务器上工作
// 当你添加第二个服务器时,服务器2上的订阅者不会收到来自服务器1的事件
Context.System.EventStream.Subscribe(Self, typeof(PostCreated));
Context.System.EventStream.Publish(new PostCreated(postId, authorId));

何时使用EventStream合适:

  • 单个进程内的日志记录和诊断
  • 真正单进程应用的本地事件总线
  • 开发/测试场景

多节点使用DistributedPubSub

对于必须到达多个集群节点的actors的事件,请使用Akka.Cluster.Tools.PublishSubscribe

using Akka.Cluster.Tools.PublishSubscribe;

public class TimelineUpdatePublisher : ReceiveActor
{
    private readonly IActorRef _mediator;

    public TimelineUpdatePublisher()
    {
        // 获取DistributedPubSub中介
        _mediator = DistributedPubSub.Get(Context.System).Mediator;

        Receive<PublishTimelineUpdate>(msg =>
        {
            // 发布到一个主题 - 覆盖所有节点的所有订阅者
            _mediator.Tell(new Publish($"timeline:{msg.UserId}", msg.Update));
        });
    }
}

public class TimelineSubscriber : ReceiveActor
{
    public TimelineSubscriber(UserId userId)
    {
        var mediator = DistributedPubSub.Get(Context.System).Mediator;

        // 订阅用户特定主题
        mediator.Tell(new Subscribe($"timeline:{userId}", Self));

        Receive<TimelineUpdate>(update =>
        {
            // 处理更新 - 这在集群节点上工作
        });

        Receive<SubscribeAck>(ack =>
        {
            // 订阅确认
        });
    }
}

Akka.Hosting配置DistributedPubSub

builder.WithDistributedPubSub(role: null); // 在所有角色上可用,或指定一个角色

主题设计模式

模式 主题格式 用例
每个用户 timeline:{userId} 时间线更新,通知
每个实体 post:{postId} 发帖参与更新
广播 system:announcements 系统范围通知
基于角色 workers:rss-poller 工作分配

2.监管策略

关键澄清:监管是针对CHILDREN

在actor上定义的监管策略决定了该actor如何监管其子代,而不是actor本身如何被监管。

public class ParentActor : ReceiveActor
{
    // 此策略适用于ParentActor的子代,而不是ParentActor本身
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: 10,
            withinTimeRange: TimeSpan.FromSeconds(30),
            decider: ex => ex switch
            {
                ArithmeticException => Directive.Resume,
                NullReferenceException => Directive.Restart,
                ArgumentException => Directive.Stop,
                _ => Directive.Escalate
            });
    }
}

默认监管策略

默认的OneForOneStrategy已经包括了速率限制:

  • 1秒内10次重启 = actor被永久停止
  • 这防止了无限的重启循环

你很少需要自定义策略,除非你有特定的要求。

何时定义自定义监管

好的理由:

  • Actor抛出异常表明状态损坏无法恢复 → 重启
  • Actor抛出的异常不应该导致重启(预期的失败) → 恢复
  • 子代失败应该影响兄弟 → 使用AllForOneStrategy
  • 需要与默认不同的重试限制

坏的理由:

  • “只是为了安全” - 默认已经安全
  • 不了解actor的作用 - 先了解它

自定义监管何时有意义的例子

public class RssFeedCoordinator : ReceiveActor
{
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1, // 无限重试
            withinTimeRange: TimeSpan.FromMinutes(1),
            decider: ex => ex switch
            {
                // HTTP超时 - 暂时的,恢复并让actor通过自己的定时器重试
                HttpRequestException => Directive.Resume,

                // 订阅源URL永久无效 - 停止这个子代,不要永远重启
                InvalidFeedUrlException => Directive.Stop,

                // 未知错误 - 重启以清除可能损坏的状态
                _ => Directive.Restart
            });
    }
}

3.错误处理:监管与Try-Catch

何时使用Try-Catch(大多数情况)

使用try-catch时:

  • 失败是预期的(网络超时,无效输入,外部服务宕机)
  • 你确切知道为什么异常发生
  • 你可以优雅地处理它(重试,返回错误响应,记录并继续)
  • 重启不会帮助(相同的错误会再次发生)
public class RssFeedPollerActor : ReceiveActor
{
    public RssFeedPollerActor()
    {
        ReceiveAsync<PollFeed>(async msg =>
        {
            try
            {
                var feed = await _httpClient.GetStringAsync(msg.FeedUrl);
                var items = ParseFeed(feed);
                // 处理项目...
            }
            catch (HttpRequestException ex)
            {
                // 预期的失败 - 记录并安排重试
                _log.Warning("Feed {Url} unavailable: {Error}", msg.FeedUrl, ex.Message);
                Context.System.Scheduler.ScheduleTellOnce(
                    TimeSpan.FromMinutes(5),
                    Self,
                    msg,
                    Self);
            }
            catch (XmlException ex)
            {
                // 无效的订阅源格式 - 记录并标记为坏
                _log.Error("Feed {Url} has invalid format: {Error}", msg.FeedUrl, ex.Message);
                Sender.Tell(new FeedPollResult.InvalidFormat(msg.FeedUrl));
            }
        });
    }
}

何时让监管处理它

让异常传播(触发监管)时:

  • 不知道为什么异常发生
  • actor的状态可能已损坏
  • 重启会有帮助(新状态,重新连接资源)
  • 这是一个编程错误(NullReferenceException,来自错误逻辑的InvalidOperationException)
public class OrderActor : ReceiveActor
{
    private OrderState _state;

    public OrderActor()
    {
        Receive<ProcessPayment>(msg =>
        {
            // 如果这抛出,我们不知道为什么 - 让监管重启我们
            // 重启将从持久性重新加载状态,可能会解决问题
            var result = _state.ApplyPayment(msg.Amount);
            Persist(new PaymentApplied(msg.Amount), evt =>
            {
                _state = _state.With(evt);
            });
        });
    }
}

反模式:吞噬未知异常

// BAD: 吞噬异常隐藏问题
public class BadActor : ReceiveActor
{
    public BadActor()
    {
        ReceiveAsync<DoWork>(async msg =>
        {
            try
            {
                await ProcessWork(msg);
            }
            catch (Exception ex)
            {
                // 这隐藏了所有错误 - 你永远不会知道某事是坏的
                _log.Error(ex, "Error processing work");
                // Actor继续使用可能损坏的状态
            }
        });
    }
}

// GOOD: 处理已知异常,让未知的传播
public class GoodActor : ReceiveActor
{
    public GoodActor()
    {
        ReceiveAsync<DoWork>(async msg =>
        {
            try
            {
                await ProcessWork(msg);
            }
            catch (HttpRequestException ex)
            {
                // 已知,预期的失败 - 优雅处理
                _log.Warning("HTTP request failed: {Error}", ex.Message);
                Sender.Tell(new WorkResult.TransientFailure());
            }
            // 未知异常传播到监管
        });
    }
}

4.Props与DependencyResolver

何时使用普通Props

使用Props.Create()时:

  • Actor不需要IServiceProviderIRequiredActor<T>
  • 所有依赖项都可以通过构造函数传递
  • Actor简单且自包含
// 没有DI需求的简单actor
public static Props Props(PostId postId, IPostWriteStore store)
    => Akka.Actor.Props.Create(() => new PostEngagementActor(postId, store));

// 使用
var actor = Context.ActorOf(PostEngagementActor.Props(postId, store), postId.ToString());

何时使用DependencyResolver

使用resolver.Props<T>()时:

  • Actor需要IServiceProvider创建范围服务
  • Actor使用IRequiredActor<T>获得对其他actors的引用
  • Actor有许多已经存在于DI容器中的依赖项
// 需要范围数据库连接的Actor
public class OrderProcessorActor : ReceiveActor
{
    public OrderProcessorActor(IServiceProvider serviceProvider)
    {
        ReceiveAsync<ProcessOrder>(async msg =>
        {
            // 为此操作创建一个范围
            using var scope = serviceProvider.CreateScope();
            var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
            // 处理订单...
        });
    }
}

// 与DI注册
builder.WithActors((system, registry, resolver) =>
{
    var actor = system.ActorOf(resolver.Props<OrderProcessorActor>(), "order-processor");
    registry.Register<OrderProcessorActor>(actor);
});

远程部署考虑事项

你几乎不需要远程部署。 远程部署意味着在创建它的节点以外的节点上部署actor。

如果你不进行远程部署(你可能不会):

  • Props.Create(() => new Actor(...))带有闭包是可以的
  • "序列化问题"警告不适用

你会使用远程部署时:

  • 将计算密集型工作分配到特定节点
  • 在具有特定硬件的节点上运行actors(GPU等)

对于大多数应用,使用集群分片而不是远程部署 - 它自动处理分发。


5.工作分配模式

问题:雷霆兽群

当你有许多后台作业(RSS源,发送电子邮件等)时,不要一次处理全部:

// BAD: 在启动时同时轮询所有源
public class BadRssCoordinator : ReceiveActor
{
    public BadRssCoordinator(IRssFeedRepository repo)
    {
        ReceiveAsync<StartPolling>(async _ =>
        {
            var feeds = await repo.GetAllFeedsAsync();
            foreach (var feed in feeds) // 2000个源 = 2000个同时HTTP请求
            {
                Context.ActorOf(RssFeedPollerActor.Props(feed.Url));
            }
        });
    }
}

模式1:数据库驱动的工作队列

使用数据库作为工作队列,使用FOR UPDATE SKIP LOCKED

public class RssPollerWorker : ReceiveActor
{
    public RssPollerWorker(IRssFeedRepository repo)
    {
        ReceiveAsync<PollBatch>(async _ =>
        {
            // 每个worker认领一批 - 自然分布在多个节点上
            var feeds = await repo.ClaimFeedsForPollingAsync(
                batchSize: 10,
                staleAfter: TimeSpan.FromMinutes(10));

            foreach (var feed in feeds)
            {
                try
                {
                    await PollFeed(feed);
                    await repo.MarkPolledAsync(feed.Id, success: true);
                }
                catch (Exception ex)
                {
                    await repo.MarkPolledAsync(feed.Id, success: false, error: ex.Message);
                }
            }

            // 安排下一批
            Context.System.Scheduler.ScheduleTellOnce(
                TimeSpan.FromSeconds(5),
                Self,
                PollBatch.Instance,
                Self);
        });
    }
}
-- ClaimFeedsForPollingAsync实现
UPDATE rss_feeds
SET status = 'processing',
    processing_started_at = NOW()
WHERE id IN (
    SELECT id FROM rss_feeds
    WHERE status = 'pending'
      AND (next_poll_at IS NULL OR next_poll_at <= NOW())
    ORDER BY next_poll_at NULLS FIRST
    LIMIT @batchSize
    FOR UPDATE SKIP LOCKED
)
RETURNING *;

好处:

  • 自然分布在多个服务器节点上
  • 不需要协调 - 数据库处理锁定
  • 易于监控(查询表)
  • 能够在服务器重启后存活

模式2:Akka.Streams限流

使用Akka.Streams限制单个节点内的处理:

public class ThrottledRssProcessor : ReceiveActor
{
    public ThrottledRssProcessor(IRssFeedRepository repo)
    {
        var materializer = Context.System.Materializer();

        ReceiveAsync<StartProcessing>(async _ =>
        {
            await Source.From(await repo.GetPendingFeedsAsync())
                .Throttle(10, TimeSpan.FromSeconds(1)) // 最多每秒10个
                .SelectAsync(4, async feed => // 最多4个并发
                {
                    await PollFeed(feed);
                    return feed;
                })
                .RunWith(Sink.Ignore<RssFeed>(), materializer);
        });
    }
}

模式3:持久队列(电子邮件出站模式)

对于必须可靠处理的工作,使用数据库支持的出站队列:

// 与业务操作一起事务性地入队工作
public async Task CreatePostAsync(Post post)
{
    await using var transaction = await _db.BeginTransactionAsync();

    await _postStore.CreateAsync(post);

    // 在同一事务中入队通知电子邮件
    foreach (var follower in await _followStore.GetFollowersAsync(post.AuthorId))
    {
        await _emailOutbox.EnqueueAsync(new EmailJob
        {
            To = follower.Email,
            Template = "new-post",
            Data = JsonSerializer.Serialize(new { PostId = post.Id })
        });
    }

    await transaction.CommitAsync();
}

// Worker处理出站队列
public class EmailOutboxWorker : ReceiveActor
{
    public EmailOutboxWorker(IEmailOutboxStore outbox, IEmailSender sender)
    {
        ReceiveAsync<ProcessBatch>(async _ =>
        {
            var batch = await outbox.ClaimBatchAsync(10);
            foreach (var job in batch)
            {
                try
                {
                    await sender.SendAsync(job);
                    await outbox.MarkSentAsync(job.Id);
                }
                catch (Exception ex)
                {
                    await outbox.MarkFailedAsync(job.Id, ex.Message);
                }
            }
        });
    }
}

6.常见错误总结

错误 为什么错了 修复
使用EventStream进行跨节点pub/sub EventStream仅限本地 使用DistributedPubSub
定义监管以"保护"actor 监管保护子代 理解层次结构
捕获所有异常 隐藏错误,损坏状态 只捕获预期错误
总是使用DependencyResolver 增加不必要的复杂性 尽可能使用普通Props
一次处理所有后台作业 雷霆兽群,资源耗尽 使用数据库队列+限流
为预期的失败抛出异常 触发不必要的重启 返回结果类型,使用消息传递

7.快速参考

通信模式决策树

需要在actors之间通信吗? ├── 仅限同一进程? → EventStream可以 ├── 跨集群节点? │ ├── 点对点? → 使用ActorSelection或已知IActorRef │ └── Pub/sub? → 使用DistributedPubSub └── 向外部系统火并忘记? → 考虑出站模式


### 错误处理决策树

actors中发生异常?
├── 预期的失败(HTTP超时,无效输入)?
│   └── 尝试捕获,优雅处理,继续
├── 状态可能已损坏?
│   └── 让监管重启
├── 未知原因?
│   └── 让监管重启
└── 编程错误(空引用,错误逻辑)?
    └── 让监管重启,修复错误

Props决策树

创建actors Props? ├── Actor需要IServiceProvider? │ └── 使用resolver.Props<T>() ├── Actor需要IRequiredActor<T>? │ └── 使用resolver.Props<T>() ├── 简单的Actor带有构造函数参数? │ └── 使用Props.Create(() => new Actor(…)) └── 需要远程部署? └── 可能不需要 - 使用集群分片代替


---

## 8.集群/本地模式抽象

对于需要在集群生产环境和没有集群基础设施的本地/测试环境中运行的应用程序,使用抽象模式在实现之间切换。

### AkkaExecutionMode枚举

定义一个执行模式,控制使用哪些实现:

```csharp
/// <summary>
/// 确定如何配置Akka.NET基础设施功能。
/// </summary>
public enum AkkaExecutionMode
{
    /// <summary>
    /// 本地测试模式 - 没有集群基础设施。
    /// 使用内存中实现pub/sub和本地父actors
    /// 而不是集群分片。
    /// </summary>
    LocalTest,

    /// <summary>
    /// 完整的集群模式,具有分片,单例和分布式pub/sub。
    /// </summary>
    Clustered
}

GenericChildPerEntityParent - 本地分片替代品

当在本地测试时,你不能使用Cluster Sharding。这个actor通过使用与真实分片相同的IMessageExtractor接口,为每个实体ID创建和管理子actors,模仿分片行为:

/// <summary>
/// 本地父actor,模仿Cluster Sharding行为。
/// 使用与真实分片相同的IMessageExtractor为每个实体ID创建和管理子actors。
/// 允许在模式之间无缝切换。
/// </summary>
public sealed class GenericChildPerEntityParent : ReceiveActor
{
    private readonly IMessageExtractor _extractor;
    private readonly Func<string, Props> _propsFactory;
    private readonly Dictionary<string, IActorRef> _children = new();
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public GenericChildPerEntityParent(
        IMessageExtractor extractor,
        Func<string, Props> propsFactory)
    {
        _extractor = extractor;
        _propsFactory = propsFactory;

        ReceiveAny(msg =>
        {
            var entityId = _extractor.EntityId(msg);
            if (string.IsNullOrEmpty(entityId))
            {
                _log.Warning("Could not extract entity ID from message {0}", msg.GetType().Name);
                Unhandled(msg);
                return;
            }

            var child = GetOrCreateChild(entityId);

            // 如果它是ShardingEnvelope,则解包消息
            var unwrapped = _extractor.EntityMessage(msg);
            child.Forward(unwrapped);
        });
    }

    private IActorRef GetOrCreateChild(string entityId)
    {
        if (_children.TryGetValue(entityId, out var existing))
            return existing;

        var props = _propsFactory(entityId);
        var child = Context.ActorOf(props, entityId);
        Context.Watch(child);
        _children[entityId] = child;

        _log.Debug("Created child actor for entity {0}", entityId);
        return child;
    }

    protected override void PreRestart(Exception reason, object message)
    {
        // 在重启时不要停止子代
    }

    public static Props CreateProps(
        IMessageExtractor extractor,
        Func<string, Props> propsFactory)
    {
        return Props.Create(() => new GenericChildPerEntityParent(extractor, propsFactory));
    }
}

IPubSubMediator - 抽象DistributedPubSub

创建一个接口来抽象pub/sub,以便测试可以使用本地实现:

/// <summary>
/// 抽象pub/sub消息传递,允许在
/// DistributedPubSub(集群)和本地实现(测试)之间切换。
/// </summary>
public interface IPubSubMediator
{
    /// <summary>
    /// 订阅actor到一个主题。
    /// </summary>
    void Subscribe(string topic, IActorRef subscriber);

    /// <summary>
    /// 从主题中注销actor。
    /// </summary>
    void Unsubscribe(string topic, IActorRef subscriber);

    /// <summary>
    /// 向所有订阅者发布消息到一个主题。
    /// </summary>
    void Publish(string topic, object message);

    /// <summary>
    /// 发送消息给一个订阅者的主题(负载均衡)。
    /// </summary>
    void Send(string topic, object message);
}

LocalPubSubMediator - 内存中实现

/// <summary>
/// 本地测试的内存中pub/sub实现,没有集群。
/// 内部使用EventStream以简化。
/// </summary>
public sealed class LocalPubSubMediator : IPubSubMediator
{
    private readonly ActorSystem _system;
    private readonly ConcurrentDictionary<string, HashSet<IActorRef>> _subscriptions = new();
    private readonly object _lock = new();

    public LocalPubSubMediator(ActorSystem system)
    {
        _system = system;
    }

    public void Subscribe(string topic, IActorRef subscriber)
    {
        lock (_lock)
        {
            var subs = _subscriptions.GetOrAdd(topic, _ => new HashSet<IActorRef>());
            subs.Add(subscriber);
        }

        // 发送确认,就像真正的DistributedPubSub一样
        subscriber.Tell(new SubscribeAck(new Subscribe(topic, subscriber)));
    }

    public void Unsubscribe(string topic, IActorRef subscriber)
    {
        lock (_lock)
        {
            if (_subscriptions.TryGetValue(topic, out var subs))
            {
                subs.Remove(subscriber);
            }
        }

        subscriber.Tell(new UnsubscribeAck(new Unsubscribe(topic, subscriber)));
    }

    public void Publish(string topic, object message)
    {
        HashSet<IActorRef> subscribers;
        lock (_lock)
        {
            if (!_subscriptions.TryGetValue(topic, out var subs))
                return;
            subscribers = new HashSet<IActorRef>(subs);
        }

        foreach (var subscriber in subscribers)
        {
            subscriber.Tell(message);
        }
    }

    public void Send(string topic, object message)
    {
        IActorRef? target = null;
        lock (_lock)
        {
            if (_subscriptions.TryGetValue(topic, out var subs) && subs.Count > 0)
            {
                // 简单的轮询 - 选择第一个可用的
                target = subs.FirstOrDefault();
            }
        }

        target?.Tell(message);
    }
}

ClusterPubSubMediator - 生产实现

/// <summary>
/// 生产实现,包装Akka.Cluster.Tools.PublishSubscribe。
/// </summary>
public sealed class ClusterPubSubMediator : IPubSubMediator
{
    private readonly IActorRef _mediator;

    public ClusterPubSubMediator(ActorSystem system)
    {
        _mediator = DistributedPubSub.Get(system).Mediator;
    }

    public void Subscribe(string topic, IActorRef subscriber)
    {
        _mediator.Tell(new Subscribe(topic, subscriber));
    }

    public void Unsubscribe(string topic, IActorRef subscriber)
    {
        _mediator.Tell(new Unsubscribe(topic, subscriber));
    }

    public void Publish(string topic, object message)
    {
        _mediator.Tell(new Publish(topic, message));
    }

    public void Send(string topic, object message)
    {
        _mediator.Tell(new Send(topic, message, localAffinity: true));
    }
}

将所有内容连接在一起

根据执行模式配置你的ActorSystem:

public static class AkkaHostingExtensions
{
    public static AkkaConfigurationBuilder ConfigureActorSystem(
        this AkkaConfigurationBuilder builder,
        AkkaExecutionMode mode,
        IServiceCollection services)
    {
        if (mode == AkkaExecutionMode.Clustered)
        {
            builder
                .WithClustering()
                .WithShardRegion<MyEntity>(
                    "my-entity",
                    (system, registry, resolver) => entityId =>
                        resolver.Props<MyEntityActor>(entityId),
                    new MyEntityMessageExtractor(),
                    new ShardOptions())
                .WithDistributedPubSub();

            // 注册集群pub/sub中介
            services.AddSingleton<IPubSubMediator>(sp =>
            {
                var system = sp.GetRequiredService<ActorSystem>();
                return new ClusterPubSubMediator(system);
            });
        }
        else // LocalTest模式
        {
            // 注册本地pub/sub中介
            services.AddSingleton<IPubSubMediator>(sp =>
            {
                var system = sp.GetRequiredService<ActorSystem>();
                return new LocalPubSubMediator(system);
            });

            // 使用GenericChildPerEntityParent代替分片
            builder.WithActors((system, registry, resolver) =>
            {
                var parent = system.ActorOf(
                    GenericChildPerEntityParent.CreateProps(
                        new MyEntityMessageExtractor(),
                        entityId => resolver.Props<MyEntityActor>(entityId)),
                    "my-entity");

                registry.Register<MyEntityParent>(parent);
            });
        }

        return builder;
    }
}

应用程序代码中的使用

应用程序代码使用抽象,不需要知道哪个模式是活动的:

public class MyService
{
    private readonly IPubSubMediator _pubSub;
    private readonly IRequiredActor<MyEntityParent> _entityParent;

    public MyService(
        IPubSubMediator pubSub,
        IRequiredActor<MyEntityParent> entityParent)
    {
        _pubSub = pubSub;
        _entityParent = entityParent;
    }

    public async Task ProcessAsync(string entityId, MyCommand command)
    {
        // 在两种模式下工作相同
        var parent = await _entityParent.GetAsync();
        parent.Tell(new ShardingEnvelope(entityId, command));

        // 发布事件 - 使用本地和分布式pub/sub
        _pubSub.Publish($"entity:{entityId}", new EntityUpdated(entityId));
    }
}

此模式的好处

好处 描述
快速单元测试 没有集群启动开销,测试在毫秒级运行
相同的消息流 相同的IMessageExtractor,相同的消息类型
易于调试 本地模式更简单,可以逐步进行
集成测试灵活性 每个测试场景选择模式
生产信心 抽象是真实实现的薄包装

何时使用每种模式

场景 推荐模式
单元测试 LocalTest
集成测试(单节点) LocalTest
集成测试(多节点) Clustered
本地开发 LocalTest或Clustered(你的选择)
生产 Clustered

9.Actor日志记录

使用ILoggingAdapter,而不是ILogger<T>

在actors中,使用Context.GetLogger()ILoggingAdapter,而不是DI注入的ILogger<T>

public class MyActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public MyActor()
    {
        Receive<MyMessage>(msg =>
        {
            // ✅ Akka.NET ILoggingAdapter与语义日志记录(v1.5.57+)
            _log.Info("Processing message for user {UserId}", msg.UserId);
            _log.Error(ex, "Failed to process {MessageType}", msg.GetType().Name);
        });
    }
}

为什么使用ILoggingAdapter:

  • 与Akka的日志记录管道和监管集成
  • 支持语义/结构化日志记录,从v1.5.57开始
  • 方法名:Info(), Debug(), Warning(), Error()(而不是Log*变体)
  • 不需要DI - 直接从actor上下文获得

不要注入ILogger<T>:

// ❌ 不要将ILogger<T>注入actors
public class MyActor : ReceiveActor
{
    private readonly ILogger<MyActor> _logger; // 错误!

    public MyActor(ILogger<MyActor> logger)
    {
        _logger = logger;
    }
}

语义日志记录(v1.5.57+)

从Akka.NET v1.5.57开始,ILoggingAdapter支持带有命名占位符的语义/结构化日志记录:

// 命名占位符,用于更好的日志聚合和查询
_log.Info("Order {OrderId} processed for customer {CustomerId}", order.Id, order.CustomerId);

// 优先使用命名占位符而不是位置
// ✅ 好的:{OrderId}, {CustomerId}
// ❌ 避免:{0}, {1}

10.使用CancellationToken管理异步操作

当actors通过PipeTo启动异步操作时,如果不正确管理,这些操作可能会比actor更长命。使用与actor生命周期绑定的CancellationToken

每Actor作用域CancellationTokenSource

当actor停止时取消正在进行的异步工作:

public class DataSyncActor : ReceiveActor
{
    private CancellationTokenSource? _operationCts;

    public DataSyncActor()
    {
        ReceiveAsync<StartSync>(HandleStartSyncAsync);
    }

    protected override void PostStop()
    {
        // 当actor停止时取消任何正在进行的异步工作
        _operationCts?.Cancel();
        _operationCts?.Dispose();
        _operationCts = null;
        base.PostStop();
    }

    private Task HandleStartSyncAsync(StartSync cmd)
    {
        // 取消任何先前的操作,创建新的CTS
        _operationCts?.Cancel();
        _operationCts?.Dispose();
        _operationCts = new CancellationTokenSource();
        var ct = _operationCts.Token;

        async Task<SyncResult> PerformSyncAsync()
        {
            try
            {
                ct.ThrowIfCancellationRequested();

                // 将令牌传递给所有异步操作
                var data = await _repository.GetDataAsync(ct);
                await _service.ProcessAsync(data, ct);

                return new SyncResult(Success: true);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Actor正在停止 - 优雅退出
                return new SyncResult(Success: false, "Cancelled");
            }
        }

        PerformSyncAsync().PipeTo(Self);
        return Task.CompletedTask;
    }
}

链接CTS用于每次操作超时

对于可能会挂起的外部API调用,请使用带有短超时的链接CTS:

private static readonly TimeSpan ApiTimeout = TimeSpan.FromSeconds(30);

async Task<SyncResult> PerformSyncAsync()
{
    // 检查actor级取消
    ct.ThrowIfCancellationRequested();

    // 每次操作超时链接到actor的CTS
    SomeResult result;
    using (var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
    {
        opCts.CancelAfter(ApiTimeout);
        result = await _externalApi.FetchDataAsync(opCts.Token);
    }

    // 处理结果...
}

链接CTS的工作方式:

  • 从父级(actor停止→立即取消)继承取消
  • 通过CancelAfter添加自己的超时(挂起的API→超时后取消)
  • 哪个先触发哪个赢
  • 每个操作后被丢弃(短命的)

优雅超时与关闭处理

区分actor关闭和操作超时:

try
{
    using var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    opCts.CancelAfter(ApiTimeout);
    await _api.CallAsync(opCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
    // 超时(不是actor死亡) - 可以重试或优雅处理
    _log.Warning("API call timed out, skipping item");
}
// 如果ct.IsCancellationRequested为true,则让它传播上去

关键点

实践 描述
Actor CTS在PostStop中 总是在PostStop()中取消和处置
每次操作新CTS 在开始新工作之前取消上一个
到处传递令牌 EF Core查询,HTTP调用等都接受CancellationToken
链接CTS用于超时 外部调用获得短超时以防止挂起
循环中检查 在迭代之间调用ct.ThrowIfCancellationRequested()
优雅处理 在catch块中区分超时与关闭

何时使用

  • 任何通过PipeTo启动异步工作的actors
  • 长时间运行的操作(同步作业,批量处理)
  • 可能会挂起的外部API调用
  • 循环中的数据库操作