Akka.NET最佳实践
使用此技能时
使用此技能时:
- 设计actor通信模式
- 在EventStream和DistributedPubSub之间做决策
- 在actors中实现错误处理
- 理解监管策略
- 在Props模式和DependencyResolver之间做选择
- 设计跨节点的工作分配
- 创建可测试的actor系统,可以运行有或没有集群基础设施
- 为本地测试场景抽象化Cluster Sharding
1.EventStream与DistributedPubSub
重要:EventStream仅限LOCAL
Context.System.EventStream是仅限于单个ActorSystem进程。它不跨集群节点工作。
// BAD: 这只在单个服务器上工作
// 当你添加第二个服务器时,服务器2上的订阅者不会收到来自服务器1的事件
Context.System.EventStream.Subscribe(Self, typeof(PostCreated));
Context.System.EventStream.Publish(new PostCreated(postId, authorId));
何时使用EventStream合适:
- 单个进程内的日志记录和诊断
- 真正单进程应用的本地事件总线
- 开发/测试场景
多节点使用DistributedPubSub
对于必须到达多个集群节点的actors的事件,请使用Akka.Cluster.Tools.PublishSubscribe:
using Akka.Cluster.Tools.PublishSubscribe;
public class TimelineUpdatePublisher : ReceiveActor
{
private readonly IActorRef _mediator;
public TimelineUpdatePublisher()
{
// 获取DistributedPubSub中介
_mediator = DistributedPubSub.Get(Context.System).Mediator;
Receive<PublishTimelineUpdate>(msg =>
{
// 发布到一个主题 - 覆盖所有节点的所有订阅者
_mediator.Tell(new Publish($"timeline:{msg.UserId}", msg.Update));
});
}
}
public class TimelineSubscriber : ReceiveActor
{
public TimelineSubscriber(UserId userId)
{
var mediator = DistributedPubSub.Get(Context.System).Mediator;
// 订阅用户特定主题
mediator.Tell(new Subscribe($"timeline:{userId}", Self));
Receive<TimelineUpdate>(update =>
{
// 处理更新 - 这在集群节点上工作
});
Receive<SubscribeAck>(ack =>
{
// 订阅确认
});
}
}
Akka.Hosting配置DistributedPubSub
builder.WithDistributedPubSub(role: null); // 在所有角色上可用,或指定一个角色
主题设计模式
| 模式 | 主题格式 | 用例 |
|---|---|---|
| 每个用户 | timeline:{userId} |
时间线更新,通知 |
| 每个实体 | post:{postId} |
发帖参与更新 |
| 广播 | system:announcements |
系统范围通知 |
| 基于角色 | workers:rss-poller |
工作分配 |
2.监管策略
关键澄清:监管是针对CHILDREN
在actor上定义的监管策略决定了该actor如何监管其子代,而不是actor本身如何被监管。
public class ParentActor : ReceiveActor
{
// 此策略适用于ParentActor的子代,而不是ParentActor本身
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: 10,
withinTimeRange: TimeSpan.FromSeconds(30),
decider: ex => ex switch
{
ArithmeticException => Directive.Resume,
NullReferenceException => Directive.Restart,
ArgumentException => Directive.Stop,
_ => Directive.Escalate
});
}
}
默认监管策略
默认的OneForOneStrategy已经包括了速率限制:
- 1秒内10次重启 = actor被永久停止
- 这防止了无限的重启循环
你很少需要自定义策略,除非你有特定的要求。
何时定义自定义监管
好的理由:
- Actor抛出异常表明状态损坏无法恢复 → 重启
- Actor抛出的异常不应该导致重启(预期的失败) → 恢复
- 子代失败应该影响兄弟 → 使用
AllForOneStrategy - 需要与默认不同的重试限制
坏的理由:
- “只是为了安全” - 默认已经安全
- 不了解actor的作用 - 先了解它
自定义监管何时有意义的例子
public class RssFeedCoordinator : ReceiveActor
{
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: -1, // 无限重试
withinTimeRange: TimeSpan.FromMinutes(1),
decider: ex => ex switch
{
// HTTP超时 - 暂时的,恢复并让actor通过自己的定时器重试
HttpRequestException => Directive.Resume,
// 订阅源URL永久无效 - 停止这个子代,不要永远重启
InvalidFeedUrlException => Directive.Stop,
// 未知错误 - 重启以清除可能损坏的状态
_ => Directive.Restart
});
}
}
3.错误处理:监管与Try-Catch
何时使用Try-Catch(大多数情况)
使用try-catch时:
- 失败是预期的(网络超时,无效输入,外部服务宕机)
- 你确切知道为什么异常发生
- 你可以优雅地处理它(重试,返回错误响应,记录并继续)
- 重启不会帮助(相同的错误会再次发生)
public class RssFeedPollerActor : ReceiveActor
{
public RssFeedPollerActor()
{
ReceiveAsync<PollFeed>(async msg =>
{
try
{
var feed = await _httpClient.GetStringAsync(msg.FeedUrl);
var items = ParseFeed(feed);
// 处理项目...
}
catch (HttpRequestException ex)
{
// 预期的失败 - 记录并安排重试
_log.Warning("Feed {Url} unavailable: {Error}", msg.FeedUrl, ex.Message);
Context.System.Scheduler.ScheduleTellOnce(
TimeSpan.FromMinutes(5),
Self,
msg,
Self);
}
catch (XmlException ex)
{
// 无效的订阅源格式 - 记录并标记为坏
_log.Error("Feed {Url} has invalid format: {Error}", msg.FeedUrl, ex.Message);
Sender.Tell(new FeedPollResult.InvalidFormat(msg.FeedUrl));
}
});
}
}
何时让监管处理它
让异常传播(触发监管)时:
- 你不知道为什么异常发生
- actor的状态可能已损坏
- 重启会有帮助(新状态,重新连接资源)
- 这是一个编程错误(NullReferenceException,来自错误逻辑的InvalidOperationException)
public class OrderActor : ReceiveActor
{
private OrderState _state;
public OrderActor()
{
Receive<ProcessPayment>(msg =>
{
// 如果这抛出,我们不知道为什么 - 让监管重启我们
// 重启将从持久性重新加载状态,可能会解决问题
var result = _state.ApplyPayment(msg.Amount);
Persist(new PaymentApplied(msg.Amount), evt =>
{
_state = _state.With(evt);
});
});
}
}
反模式:吞噬未知异常
// BAD: 吞噬异常隐藏问题
public class BadActor : ReceiveActor
{
public BadActor()
{
ReceiveAsync<DoWork>(async msg =>
{
try
{
await ProcessWork(msg);
}
catch (Exception ex)
{
// 这隐藏了所有错误 - 你永远不会知道某事是坏的
_log.Error(ex, "Error processing work");
// Actor继续使用可能损坏的状态
}
});
}
}
// GOOD: 处理已知异常,让未知的传播
public class GoodActor : ReceiveActor
{
public GoodActor()
{
ReceiveAsync<DoWork>(async msg =>
{
try
{
await ProcessWork(msg);
}
catch (HttpRequestException ex)
{
// 已知,预期的失败 - 优雅处理
_log.Warning("HTTP request failed: {Error}", ex.Message);
Sender.Tell(new WorkResult.TransientFailure());
}
// 未知异常传播到监管
});
}
}
4.Props与DependencyResolver
何时使用普通Props
使用Props.Create()时:
- Actor不需要
IServiceProvider或IRequiredActor<T> - 所有依赖项都可以通过构造函数传递
- Actor简单且自包含
// 没有DI需求的简单actor
public static Props Props(PostId postId, IPostWriteStore store)
=> Akka.Actor.Props.Create(() => new PostEngagementActor(postId, store));
// 使用
var actor = Context.ActorOf(PostEngagementActor.Props(postId, store), postId.ToString());
何时使用DependencyResolver
使用resolver.Props<T>()时:
- Actor需要
IServiceProvider创建范围服务 - Actor使用
IRequiredActor<T>获得对其他actors的引用 - Actor有许多已经存在于DI容器中的依赖项
// 需要范围数据库连接的Actor
public class OrderProcessorActor : ReceiveActor
{
public OrderProcessorActor(IServiceProvider serviceProvider)
{
ReceiveAsync<ProcessOrder>(async msg =>
{
// 为此操作创建一个范围
using var scope = serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
// 处理订单...
});
}
}
// 与DI注册
builder.WithActors((system, registry, resolver) =>
{
var actor = system.ActorOf(resolver.Props<OrderProcessorActor>(), "order-processor");
registry.Register<OrderProcessorActor>(actor);
});
远程部署考虑事项
你几乎不需要远程部署。 远程部署意味着在创建它的节点以外的节点上部署actor。
如果你不进行远程部署(你可能不会):
Props.Create(() => new Actor(...))带有闭包是可以的- "序列化问题"警告不适用
你会使用远程部署时:
- 将计算密集型工作分配到特定节点
- 在具有特定硬件的节点上运行actors(GPU等)
对于大多数应用,使用集群分片而不是远程部署 - 它自动处理分发。
5.工作分配模式
问题:雷霆兽群
当你有许多后台作业(RSS源,发送电子邮件等)时,不要一次处理全部:
// BAD: 在启动时同时轮询所有源
public class BadRssCoordinator : ReceiveActor
{
public BadRssCoordinator(IRssFeedRepository repo)
{
ReceiveAsync<StartPolling>(async _ =>
{
var feeds = await repo.GetAllFeedsAsync();
foreach (var feed in feeds) // 2000个源 = 2000个同时HTTP请求
{
Context.ActorOf(RssFeedPollerActor.Props(feed.Url));
}
});
}
}
模式1:数据库驱动的工作队列
使用数据库作为工作队列,使用FOR UPDATE SKIP LOCKED:
public class RssPollerWorker : ReceiveActor
{
public RssPollerWorker(IRssFeedRepository repo)
{
ReceiveAsync<PollBatch>(async _ =>
{
// 每个worker认领一批 - 自然分布在多个节点上
var feeds = await repo.ClaimFeedsForPollingAsync(
batchSize: 10,
staleAfter: TimeSpan.FromMinutes(10));
foreach (var feed in feeds)
{
try
{
await PollFeed(feed);
await repo.MarkPolledAsync(feed.Id, success: true);
}
catch (Exception ex)
{
await repo.MarkPolledAsync(feed.Id, success: false, error: ex.Message);
}
}
// 安排下一批
Context.System.Scheduler.ScheduleTellOnce(
TimeSpan.FromSeconds(5),
Self,
PollBatch.Instance,
Self);
});
}
}
-- ClaimFeedsForPollingAsync实现
UPDATE rss_feeds
SET status = 'processing',
processing_started_at = NOW()
WHERE id IN (
SELECT id FROM rss_feeds
WHERE status = 'pending'
AND (next_poll_at IS NULL OR next_poll_at <= NOW())
ORDER BY next_poll_at NULLS FIRST
LIMIT @batchSize
FOR UPDATE SKIP LOCKED
)
RETURNING *;
好处:
- 自然分布在多个服务器节点上
- 不需要协调 - 数据库处理锁定
- 易于监控(查询表)
- 能够在服务器重启后存活
模式2:Akka.Streams限流
使用Akka.Streams限制单个节点内的处理:
public class ThrottledRssProcessor : ReceiveActor
{
public ThrottledRssProcessor(IRssFeedRepository repo)
{
var materializer = Context.System.Materializer();
ReceiveAsync<StartProcessing>(async _ =>
{
await Source.From(await repo.GetPendingFeedsAsync())
.Throttle(10, TimeSpan.FromSeconds(1)) // 最多每秒10个
.SelectAsync(4, async feed => // 最多4个并发
{
await PollFeed(feed);
return feed;
})
.RunWith(Sink.Ignore<RssFeed>(), materializer);
});
}
}
模式3:持久队列(电子邮件出站模式)
对于必须可靠处理的工作,使用数据库支持的出站队列:
// 与业务操作一起事务性地入队工作
public async Task CreatePostAsync(Post post)
{
await using var transaction = await _db.BeginTransactionAsync();
await _postStore.CreateAsync(post);
// 在同一事务中入队通知电子邮件
foreach (var follower in await _followStore.GetFollowersAsync(post.AuthorId))
{
await _emailOutbox.EnqueueAsync(new EmailJob
{
To = follower.Email,
Template = "new-post",
Data = JsonSerializer.Serialize(new { PostId = post.Id })
});
}
await transaction.CommitAsync();
}
// Worker处理出站队列
public class EmailOutboxWorker : ReceiveActor
{
public EmailOutboxWorker(IEmailOutboxStore outbox, IEmailSender sender)
{
ReceiveAsync<ProcessBatch>(async _ =>
{
var batch = await outbox.ClaimBatchAsync(10);
foreach (var job in batch)
{
try
{
await sender.SendAsync(job);
await outbox.MarkSentAsync(job.Id);
}
catch (Exception ex)
{
await outbox.MarkFailedAsync(job.Id, ex.Message);
}
}
});
}
}
6.常见错误总结
| 错误 | 为什么错了 | 修复 |
|---|---|---|
| 使用EventStream进行跨节点pub/sub | EventStream仅限本地 | 使用DistributedPubSub |
| 定义监管以"保护"actor | 监管保护子代 | 理解层次结构 |
| 捕获所有异常 | 隐藏错误,损坏状态 | 只捕获预期错误 |
| 总是使用DependencyResolver | 增加不必要的复杂性 | 尽可能使用普通Props |
| 一次处理所有后台作业 | 雷霆兽群,资源耗尽 | 使用数据库队列+限流 |
| 为预期的失败抛出异常 | 触发不必要的重启 | 返回结果类型,使用消息传递 |
7.快速参考
通信模式决策树
需要在actors之间通信吗? ├── 仅限同一进程? → EventStream可以 ├── 跨集群节点? │ ├── 点对点? → 使用ActorSelection或已知IActorRef │ └── Pub/sub? → 使用DistributedPubSub └── 向外部系统火并忘记? → 考虑出站模式
### 错误处理决策树
actors中发生异常?
├── 预期的失败(HTTP超时,无效输入)?
│ └── 尝试捕获,优雅处理,继续
├── 状态可能已损坏?
│ └── 让监管重启
├── 未知原因?
│ └── 让监管重启
└── 编程错误(空引用,错误逻辑)?
└── 让监管重启,修复错误
Props决策树
创建actors Props? ├── Actor需要IServiceProvider? │ └── 使用resolver.Props<T>() ├── Actor需要IRequiredActor<T>? │ └── 使用resolver.Props<T>() ├── 简单的Actor带有构造函数参数? │ └── 使用Props.Create(() => new Actor(…)) └── 需要远程部署? └── 可能不需要 - 使用集群分片代替
---
## 8.集群/本地模式抽象
对于需要在集群生产环境和没有集群基础设施的本地/测试环境中运行的应用程序,使用抽象模式在实现之间切换。
### AkkaExecutionMode枚举
定义一个执行模式,控制使用哪些实现:
```csharp
/// <summary>
/// 确定如何配置Akka.NET基础设施功能。
/// </summary>
public enum AkkaExecutionMode
{
/// <summary>
/// 本地测试模式 - 没有集群基础设施。
/// 使用内存中实现pub/sub和本地父actors
/// 而不是集群分片。
/// </summary>
LocalTest,
/// <summary>
/// 完整的集群模式,具有分片,单例和分布式pub/sub。
/// </summary>
Clustered
}
GenericChildPerEntityParent - 本地分片替代品
当在本地测试时,你不能使用Cluster Sharding。这个actor通过使用与真实分片相同的IMessageExtractor接口,为每个实体ID创建和管理子actors,模仿分片行为:
/// <summary>
/// 本地父actor,模仿Cluster Sharding行为。
/// 使用与真实分片相同的IMessageExtractor为每个实体ID创建和管理子actors。
/// 允许在模式之间无缝切换。
/// </summary>
public sealed class GenericChildPerEntityParent : ReceiveActor
{
private readonly IMessageExtractor _extractor;
private readonly Func<string, Props> _propsFactory;
private readonly Dictionary<string, IActorRef> _children = new();
private readonly ILoggingAdapter _log = Context.GetLogger();
public GenericChildPerEntityParent(
IMessageExtractor extractor,
Func<string, Props> propsFactory)
{
_extractor = extractor;
_propsFactory = propsFactory;
ReceiveAny(msg =>
{
var entityId = _extractor.EntityId(msg);
if (string.IsNullOrEmpty(entityId))
{
_log.Warning("Could not extract entity ID from message {0}", msg.GetType().Name);
Unhandled(msg);
return;
}
var child = GetOrCreateChild(entityId);
// 如果它是ShardingEnvelope,则解包消息
var unwrapped = _extractor.EntityMessage(msg);
child.Forward(unwrapped);
});
}
private IActorRef GetOrCreateChild(string entityId)
{
if (_children.TryGetValue(entityId, out var existing))
return existing;
var props = _propsFactory(entityId);
var child = Context.ActorOf(props, entityId);
Context.Watch(child);
_children[entityId] = child;
_log.Debug("Created child actor for entity {0}", entityId);
return child;
}
protected override void PreRestart(Exception reason, object message)
{
// 在重启时不要停止子代
}
public static Props CreateProps(
IMessageExtractor extractor,
Func<string, Props> propsFactory)
{
return Props.Create(() => new GenericChildPerEntityParent(extractor, propsFactory));
}
}
IPubSubMediator - 抽象DistributedPubSub
创建一个接口来抽象pub/sub,以便测试可以使用本地实现:
/// <summary>
/// 抽象pub/sub消息传递,允许在
/// DistributedPubSub(集群)和本地实现(测试)之间切换。
/// </summary>
public interface IPubSubMediator
{
/// <summary>
/// 订阅actor到一个主题。
/// </summary>
void Subscribe(string topic, IActorRef subscriber);
/// <summary>
/// 从主题中注销actor。
/// </summary>
void Unsubscribe(string topic, IActorRef subscriber);
/// <summary>
/// 向所有订阅者发布消息到一个主题。
/// </summary>
void Publish(string topic, object message);
/// <summary>
/// 发送消息给一个订阅者的主题(负载均衡)。
/// </summary>
void Send(string topic, object message);
}
LocalPubSubMediator - 内存中实现
/// <summary>
/// 本地测试的内存中pub/sub实现,没有集群。
/// 内部使用EventStream以简化。
/// </summary>
public sealed class LocalPubSubMediator : IPubSubMediator
{
private readonly ActorSystem _system;
private readonly ConcurrentDictionary<string, HashSet<IActorRef>> _subscriptions = new();
private readonly object _lock = new();
public LocalPubSubMediator(ActorSystem system)
{
_system = system;
}
public void Subscribe(string topic, IActorRef subscriber)
{
lock (_lock)
{
var subs = _subscriptions.GetOrAdd(topic, _ => new HashSet<IActorRef>());
subs.Add(subscriber);
}
// 发送确认,就像真正的DistributedPubSub一样
subscriber.Tell(new SubscribeAck(new Subscribe(topic, subscriber)));
}
public void Unsubscribe(string topic, IActorRef subscriber)
{
lock (_lock)
{
if (_subscriptions.TryGetValue(topic, out var subs))
{
subs.Remove(subscriber);
}
}
subscriber.Tell(new UnsubscribeAck(new Unsubscribe(topic, subscriber)));
}
public void Publish(string topic, object message)
{
HashSet<IActorRef> subscribers;
lock (_lock)
{
if (!_subscriptions.TryGetValue(topic, out var subs))
return;
subscribers = new HashSet<IActorRef>(subs);
}
foreach (var subscriber in subscribers)
{
subscriber.Tell(message);
}
}
public void Send(string topic, object message)
{
IActorRef? target = null;
lock (_lock)
{
if (_subscriptions.TryGetValue(topic, out var subs) && subs.Count > 0)
{
// 简单的轮询 - 选择第一个可用的
target = subs.FirstOrDefault();
}
}
target?.Tell(message);
}
}
ClusterPubSubMediator - 生产实现
/// <summary>
/// 生产实现,包装Akka.Cluster.Tools.PublishSubscribe。
/// </summary>
public sealed class ClusterPubSubMediator : IPubSubMediator
{
private readonly IActorRef _mediator;
public ClusterPubSubMediator(ActorSystem system)
{
_mediator = DistributedPubSub.Get(system).Mediator;
}
public void Subscribe(string topic, IActorRef subscriber)
{
_mediator.Tell(new Subscribe(topic, subscriber));
}
public void Unsubscribe(string topic, IActorRef subscriber)
{
_mediator.Tell(new Unsubscribe(topic, subscriber));
}
public void Publish(string topic, object message)
{
_mediator.Tell(new Publish(topic, message));
}
public void Send(string topic, object message)
{
_mediator.Tell(new Send(topic, message, localAffinity: true));
}
}
将所有内容连接在一起
根据执行模式配置你的ActorSystem:
public static class AkkaHostingExtensions
{
public static AkkaConfigurationBuilder ConfigureActorSystem(
this AkkaConfigurationBuilder builder,
AkkaExecutionMode mode,
IServiceCollection services)
{
if (mode == AkkaExecutionMode.Clustered)
{
builder
.WithClustering()
.WithShardRegion<MyEntity>(
"my-entity",
(system, registry, resolver) => entityId =>
resolver.Props<MyEntityActor>(entityId),
new MyEntityMessageExtractor(),
new ShardOptions())
.WithDistributedPubSub();
// 注册集群pub/sub中介
services.AddSingleton<IPubSubMediator>(sp =>
{
var system = sp.GetRequiredService<ActorSystem>();
return new ClusterPubSubMediator(system);
});
}
else // LocalTest模式
{
// 注册本地pub/sub中介
services.AddSingleton<IPubSubMediator>(sp =>
{
var system = sp.GetRequiredService<ActorSystem>();
return new LocalPubSubMediator(system);
});
// 使用GenericChildPerEntityParent代替分片
builder.WithActors((system, registry, resolver) =>
{
var parent = system.ActorOf(
GenericChildPerEntityParent.CreateProps(
new MyEntityMessageExtractor(),
entityId => resolver.Props<MyEntityActor>(entityId)),
"my-entity");
registry.Register<MyEntityParent>(parent);
});
}
return builder;
}
}
应用程序代码中的使用
应用程序代码使用抽象,不需要知道哪个模式是活动的:
public class MyService
{
private readonly IPubSubMediator _pubSub;
private readonly IRequiredActor<MyEntityParent> _entityParent;
public MyService(
IPubSubMediator pubSub,
IRequiredActor<MyEntityParent> entityParent)
{
_pubSub = pubSub;
_entityParent = entityParent;
}
public async Task ProcessAsync(string entityId, MyCommand command)
{
// 在两种模式下工作相同
var parent = await _entityParent.GetAsync();
parent.Tell(new ShardingEnvelope(entityId, command));
// 发布事件 - 使用本地和分布式pub/sub
_pubSub.Publish($"entity:{entityId}", new EntityUpdated(entityId));
}
}
此模式的好处
| 好处 | 描述 |
|---|---|
| 快速单元测试 | 没有集群启动开销,测试在毫秒级运行 |
| 相同的消息流 | 相同的IMessageExtractor,相同的消息类型 |
| 易于调试 | 本地模式更简单,可以逐步进行 |
| 集成测试灵活性 | 每个测试场景选择模式 |
| 生产信心 | 抽象是真实实现的薄包装 |
何时使用每种模式
| 场景 | 推荐模式 |
|---|---|
| 单元测试 | LocalTest |
| 集成测试(单节点) | LocalTest |
| 集成测试(多节点) | Clustered |
| 本地开发 | LocalTest或Clustered(你的选择) |
| 生产 | Clustered |
9.Actor日志记录
使用ILoggingAdapter,而不是ILogger<T>
在actors中,使用Context.GetLogger()的ILoggingAdapter,而不是DI注入的ILogger<T>:
public class MyActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
public MyActor()
{
Receive<MyMessage>(msg =>
{
// ✅ Akka.NET ILoggingAdapter与语义日志记录(v1.5.57+)
_log.Info("Processing message for user {UserId}", msg.UserId);
_log.Error(ex, "Failed to process {MessageType}", msg.GetType().Name);
});
}
}
为什么使用ILoggingAdapter:
- 与Akka的日志记录管道和监管集成
- 支持语义/结构化日志记录,从v1.5.57开始
- 方法名:
Info(),Debug(),Warning(),Error()(而不是Log*变体) - 不需要DI - 直接从actor上下文获得
不要注入ILogger<T>:
// ❌ 不要将ILogger<T>注入actors
public class MyActor : ReceiveActor
{
private readonly ILogger<MyActor> _logger; // 错误!
public MyActor(ILogger<MyActor> logger)
{
_logger = logger;
}
}
语义日志记录(v1.5.57+)
从Akka.NET v1.5.57开始,ILoggingAdapter支持带有命名占位符的语义/结构化日志记录:
// 命名占位符,用于更好的日志聚合和查询
_log.Info("Order {OrderId} processed for customer {CustomerId}", order.Id, order.CustomerId);
// 优先使用命名占位符而不是位置
// ✅ 好的:{OrderId}, {CustomerId}
// ❌ 避免:{0}, {1}
10.使用CancellationToken管理异步操作
当actors通过PipeTo启动异步操作时,如果不正确管理,这些操作可能会比actor更长命。使用与actor生命周期绑定的CancellationToken。
每Actor作用域CancellationTokenSource
当actor停止时取消正在进行的异步工作:
public class DataSyncActor : ReceiveActor
{
private CancellationTokenSource? _operationCts;
public DataSyncActor()
{
ReceiveAsync<StartSync>(HandleStartSyncAsync);
}
protected override void PostStop()
{
// 当actor停止时取消任何正在进行的异步工作
_operationCts?.Cancel();
_operationCts?.Dispose();
_operationCts = null;
base.PostStop();
}
private Task HandleStartSyncAsync(StartSync cmd)
{
// 取消任何先前的操作,创建新的CTS
_operationCts?.Cancel();
_operationCts?.Dispose();
_operationCts = new CancellationTokenSource();
var ct = _operationCts.Token;
async Task<SyncResult> PerformSyncAsync()
{
try
{
ct.ThrowIfCancellationRequested();
// 将令牌传递给所有异步操作
var data = await _repository.GetDataAsync(ct);
await _service.ProcessAsync(data, ct);
return new SyncResult(Success: true);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Actor正在停止 - 优雅退出
return new SyncResult(Success: false, "Cancelled");
}
}
PerformSyncAsync().PipeTo(Self);
return Task.CompletedTask;
}
}
链接CTS用于每次操作超时
对于可能会挂起的外部API调用,请使用带有短超时的链接CTS:
private static readonly TimeSpan ApiTimeout = TimeSpan.FromSeconds(30);
async Task<SyncResult> PerformSyncAsync()
{
// 检查actor级取消
ct.ThrowIfCancellationRequested();
// 每次操作超时链接到actor的CTS
SomeResult result;
using (var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
{
opCts.CancelAfter(ApiTimeout);
result = await _externalApi.FetchDataAsync(opCts.Token);
}
// 处理结果...
}
链接CTS的工作方式:
- 从父级(actor停止→立即取消)继承取消
- 通过
CancelAfter添加自己的超时(挂起的API→超时后取消) - 哪个先触发哪个赢
- 每个操作后被丢弃(短命的)
优雅超时与关闭处理
区分actor关闭和操作超时:
try
{
using var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
opCts.CancelAfter(ApiTimeout);
await _api.CallAsync(opCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
// 超时(不是actor死亡) - 可以重试或优雅处理
_log.Warning("API call timed out, skipping item");
}
// 如果ct.IsCancellationRequested为true,则让它传播上去
关键点
| 实践 | 描述 |
|---|---|
| Actor CTS在PostStop中 | 总是在PostStop()中取消和处置 |
| 每次操作新CTS | 在开始新工作之前取消上一个 |
| 到处传递令牌 | EF Core查询,HTTP调用等都接受CancellationToken |
| 链接CTS用于超时 | 外部调用获得短超时以防止挂起 |
| 循环中检查 | 在迭代之间调用ct.ThrowIfCancellationRequested() |
| 优雅处理 | 在catch块中区分超时与关闭 |
何时使用
- 任何通过
PipeTo启动异步工作的actors - 长时间运行的操作(同步作业,批量处理)
- 可能会挂起的外部API调用
- 循环中的数据库操作