Akka.Hosting Actor Patterns
使用此技能时
使用此技能时:
- 构建代表领域对象的实体演员(用户、订单、发票等)
- 需要在单元测试(无集群)和生产(集群分片)中工作的演员
- 使用akka-reminders设置计划任务
- 使用Akka.Hosting扩展方法注册演员
- 创建可重用的演员配置模式
核心原则
- 执行模式抽象 - 相同的演员代码在本地(测试)或集群(生产)中运行
- Local的GenericChildPerEntityParent - 在不需要集群开销的情况下模仿分片语义
- 消息提取器用于路由 - 重用Akka.Cluster.Sharding的IMessageExtractor接口
- Akka.Hosting扩展方法 - 流畅配置,易于组合
- ITimeProvider用于可测试性 - 使用ActorSystem.Scheduler代替DateTime.Now
执行模式
定义一个枚举来控制演员行为:
/// <summary>
/// 确定Akka.NET应该如何配置
/// </summary>
public enum AkkaExecutionMode
{
/// <summary>
/// 纯本地演员系统 - 无远程处理,无集群。
/// 使用GenericChildPerEntityParent代替ShardRegion。
/// 适用于单元测试和简单场景。
/// </summary>
LocalTest,
/// <summary>
/// 完整的集群和ShardRegion。
/// 用于集成测试和生产。
/// </summary>
Clustered
}
GenericChildPerEntityParent
一个轻量级的父演员,将消息路由到子实体,模仿集群分片语义,而不需要集群:
using Akka.Actor;
using Akka.Cluster.Sharding;
/// <summary>
/// 一个通用的“每个实体一个孩子”的父演员。
/// </summary>
/// <remarks>
/// 重用Akka.Cluster.Sharding的IMessageExtractor以实现一致的路由。
/// 适用于不需要集群开销的单元测试。
/// </remarks>
public sealed class GenericChildPerEntityParent : ReceiveActor
{
public static Props CreateProps(
IMessageExtractor extractor,
Func<string, Props> propsFactory)
{
return Props.Create(() =>
new GenericChildPerEntityParent(extractor, propsFactory));
}
private readonly IMessageExtractor _extractor;
private readonly Func<string, Props> _propsFactory;
public GenericChildPerEntityParent(
IMessageExtractor extractor,
Func<string, Props> propsFactory)
{
_extractor = extractor;
_propsFactory = propsFactory;
ReceiveAny(message =>
{
var entityId = _extractor.EntityId(message);
if (entityId is null) return;
// 获取现有子项或创建新子项
Context.Child(entityId)
.GetOrElse(() => Context.ActorOf(_propsFactory(entityId), entityId))
.Forward(_extractor.EntityMessage(message));
});
}
}
消息提取器
创建实现IMessageExtractor的提取器,来自Akka.Cluster.Sharding:
using Akka.Cluster.Sharding;
/// <summary>
/// 根据强类型ID路由消息到实体演员。
/// </summary>
public sealed class OrderMessageExtractor : HashCodeMessageExtractor
{
public const int DefaultShardCount = 40;
public OrderMessageExtractor(int maxNumberOfShards = DefaultShardCount)
: base(maxNumberOfShards)
{
}
public override string? EntityId(object message)
{
return message switch
{
IWithOrderId msg => msg.OrderId.Value.ToString(),
_ => null
};
}
}
// 为特定实体定义消息的接口
public interface IWithOrderId
{
OrderId OrderId { get; }
}
// 使用强类型ID
public readonly record struct OrderId(Guid Value)
{
public static OrderId New() => new(Guid.NewGuid());
public override string ToString() => Value.ToString();
}
Akka.Hosting扩展方法
创建抽象执行模式的扩展方法:
using Akka.Cluster.Hosting;
using Akka.Cluster.Sharding;
using Akka.Hosting;
public static class OrderActorHostingExtensions
{
/// <summary>
/// 添加支持本地和集群模式的OrderActor。
/// </summary>
public static AkkaConfigurationBuilder WithOrderActor(
this AkkaConfigurationBuilder builder,
AkkaExecutionMode executionMode = AkkaExecutionMode.Clustered,
string? clusterRole = null)
{
if (executionMode == AkkaExecutionMode.LocalTest)
{
// 非集群模式:使用GenericChildPerEntityParent
builder.WithActors((system, registry, resolver) =>
{
var parent = system.ActorOf(
GenericChildPerEntityParent.CreateProps(
new OrderMessageExtractor(),
entityId => resolver.Props<OrderActor>(entityId)),
"orders");
registry.Register<OrderActor>(parent);
});
}
else
{
// 集群模式:使用ShardRegion
builder.WithShardRegion<OrderActor>(
"orders",
(system, registry, resolver) =>
entityId => resolver.Props<OrderActor>(entityId),
new OrderMessageExtractor(),
new ShardOptions
{
StateStoreMode = StateStoreMode.DData,
Role = clusterRole
});
}
return builder;
}
}
组合多个演员
创建一个方便的方法来注册所有领域演员:
public static class DomainActorHostingExtensions
{
/// <summary>
/// 添加所有订单领域演员,支持分片。
/// </summary>
public static AkkaConfigurationBuilder WithOrderDomainActors(
this AkkaConfigurationBuilder builder,
AkkaExecutionMode executionMode = AkkaExecutionMode.Clustered,
string? clusterRole = null)
{
return builder
.WithOrderActor(executionMode, clusterRole)
.WithPaymentActor(executionMode, clusterRole)
.WithShipmentActor(executionMode, clusterRole)
.WithNotificationActor(); // 单例,不需要分片
}
}
使用ITimeProvider进行调度
将ActorSystem的Scheduler注册为ITimeProvider,以实现可测试的时间逻辑:
public static class SharedAkkaHostingExtensions
{
public static IServiceCollection AddAkkaWithTimeProvider(
this IServiceCollection services,
Action<AkkaConfigurationBuilder, IServiceProvider> configure)
{
// 使用ActorSystem的Scheduler注册ITimeProvider
services.AddSingleton<ITimeProvider>(sp =>
sp.GetRequiredService<ActorSystem>().Scheduler);
return services.ConfigureAkka((builder, sp) =>
{
configure(builder, sp);
});
}
}
// 在你的演员中,注入ITimeProvider
public class SubscriptionActor : ReceiveActor
{
private readonly ITimeProvider _timeProvider;
public SubscriptionActor(ITimeProvider timeProvider)
{
_timeProvider = timeProvider;
// 使用_timeProvider.GetUtcNow()代替DateTime.UtcNow
// 这允许测试控制时间
}
}
Akka.Reminders集成
对于需要在重启后生存的持久计划任务,使用akka-reminders:
using Akka.Reminders;
using Akka.Reminders.Sql;
using Akka.Reminders.Sql.Configuration;
using Akka.Reminders.Storage;
public static class ReminderHostingExtensions
{
/// <summary>
/// 使用PostgreSQL存储配置akka-reminders。
/// </summary>
public static AkkaConfigurationBuilder WithPostgresReminders(
this AkkaConfigurationBuilder builder,
string connectionString,
string schemaName = "reminders",
string tableName = "scheduled_reminders",
bool autoInitialize = true)
{
return builder.WithLocalReminders(reminders => reminders
.WithResolver(sys => new GenericChildPerEntityResolver(sys))
.WithStorage(system =>
{
var settings = SqlReminderStorageSettings.CreatePostgreSql(
connectionString,
schemaName,
tableName,
autoInitialize);
return new SqlReminderStorage(settings, system);
})
.WithSettings(new ReminderSettings
{
MaxSlippage = TimeSpan.FromSeconds(30),
MaxDeliveryAttempts = 3,
RetryBackoffBase = TimeSpan.FromSeconds(10)
});
}
/// <summary>
/// 为测试配置内存存储的akka-reminders。
/// </summary>
public static AkkaConfigurationBuilder WithInMemoryReminders(
this AkkaConfigurationBuilder builder)
{
return builder.WithLocalReminders(reminders => reminders
.WithResolver(sys => new GenericChildPerEntityResolver(sys))
.WithStorage(system => new InMemoryReminderStorage())
.WithSettings(new ReminderSettings
{
MaxSlippage = TimeSpan.FromSeconds(1),
MaxDeliveryAttempts = 3,
RetryBackoffBase = TimeSpan.FromMilliseconds(100)
}));
}
}
为Child-Per-Entity自定义提醒解析器
将提醒回调路由到GenericChildPerEntityParent演员:
using Akka.Actor;
using Akka.Hosting;
using Akka.Reminders;
/// <summary>
/// 解析提醒目标到GenericChildPerEntityParent演员。
/// </summary>
public sealed class GenericChildPerEntityResolver : IReminderActorResolver
{
private readonly ActorSystem _system;
public GenericChildPerEntityResolver(ActorSystem system)
{
_system = system;
}
public IActorRef ResolveActorRef(ReminderEntry entry)
{
var registry = ActorRegistry.For(_system);
return entry.Key switch
{
var k when k.StartsWith("order-") =>
registry.Get<OrderActor>(),
var k when k.StartsWith("subscription-") =>
registry.Get<SubscriptionActor>(),
_ => throw new InvalidOperationException(
$"Unknown reminder key format: {entry.Key}")
};
}
}
单例演员(不分片)
对于应该只有一个实例的演员:
public static AkkaConfigurationBuilder WithEmailSenderActor(
this AkkaConfigurationBuilder builder)
{
return builder.WithActors((system, registry, resolver) =>
{
var actor = system.ActorOf(
resolver.Props<EmailSenderActor>(),
"email-sender");
registry.Register<EmailSenderActor>(actor);
});
}
注册表中的标记类型
当你需要引用注册为父项的演员时:
/// <summary>
/// 用于ActorRegistry检索订单管理器的标记类型
/// (OrderActors的GenericChildPerEntityParent)。
/// </summary>
public sealed class OrderManagerActor;
// 在扩展方法中的使用
registry.Register<OrderManagerActor>(parent);
// 在控制器/服务中的使用
public class OrderService
{
private readonly IActorRef _orderManager;
public OrderService(IRequiredActor<OrderManagerActor> orderManager)
{
_orderManager = orderManager.ActorRef;
}
public async Task<OrderResponse> CreateOrder(CreateOrderCommand cmd)
{
return await _orderManager.Ask<OrderResponse>(cmd);
}
}
演员中的DI范围管理
演员没有自动DI范围。 与ASP.NET控制器不同(每个HTTP请求创建一个范围),演员是长寿命的。如果你需要范围服务(如DbContext),请注入IServiceProvider并手动创建范围。
每条消息一个范围的模式
public sealed class OrderProcessingActor : ReceiveActor
{
private readonly IServiceProvider _serviceProvider;
private readonly IActorRef _notificationActor;
public OrderProcessingActor(
IServiceProvider serviceProvider,
IRequiredActor<NotificationActor> notificationActor)
{
_serviceProvider = serviceProvider;
_notificationActor = notificationActor.ActorRef;
ReceiveAsync<ProcessOrder>(HandleProcessOrder);
}
private async Task HandleProcessOrder(ProcessOrder msg)
{
// 为这条消息创建范围 - 处理后释放
using var scope = _serviceProvider.CreateScope();
// 在范围内解析范围服务
var orderRepository = scope.ServiceProvider.GetRequiredService<IOrderRepository>();
var paymentService = scope.ServiceProvider.GetRequiredService<IPaymentService>();
var emailComposer = scope.ServiceProvider.GetRequiredService<IOrderEmailComposer>();
// 使用范围内的服务进行工作
var order = await orderRepository.GetByIdAsync(msg.OrderId);
var payment = await paymentService.ProcessAsync(order);
// 范围释放时提交DbContext更改
}
}
为什么这种模式
| 好处 | 解释 |
|---|---|
| 每条消息一个新鲜的DbContext | 消息之间没有过时的实体跟踪 |
| 适当处置 | 数据库连接在每条消息后释放 |
| 隔离 | 一条消息的错误不会破坏另一条消息的状态 |
| 可测试 | 可以在测试中注入模拟IServiceProvider |
单例服务 - 直接注入
对于无状态、线程安全的服务,直接注入(不需要范围):
public sealed class NotificationActor : ReceiveActor
{
private readonly IEmailLinkGenerator _linkGenerator; // 单例 - 可以!
private readonly IMjmlTemplateRenderer _renderer; // 单例 - 可以!
public NotificationActor(
IEmailLinkGenerator linkGenerator,
IMjmlTemplateRenderer renderer)
{
_linkGenerator = linkGenerator;
_renderer = renderer;
Receive<SendWelcomeEmail>(Handle);
}
}
常见错误:直接注入范围服务
// BAD: 范围服务注入到长寿命的演员
public sealed class BadActor : ReceiveActor
{
private readonly IOrderRepository _repo; // 范围!DbContext永远存在!
public BadActor(IOrderRepository repo) // 在演员创建时捕获
{
_repo = repo; // 这个DbContext将变得过时
}
}
// GOOD: 注入IServiceProvider,每条消息创建范围
public sealed class GoodActor : ReceiveActor
{
private readonly IServiceProvider _sp;
public GoodActor(IServiceProvider sp)
{
_sp = sp;
ReceiveAsync<ProcessOrder>(async msg =>
{
using var scope = _sp.CreateScope();
var repo = scope.ServiceProvider.GetRequiredService<IOrderRepository>();
// 这条消息的新鲜DbContext
});
}
}
关于DI生命周期和范围管理的更多信息,请参见microsoft-extensions/dependency-injection技能。
集群分片配置
RememberEntities: 几乎总是False
RememberEntities控制着分片区域是否记得并自动重启所有曾经创建的实体。这几乎总是false。
builder.WithShardRegion<OrderActor>(
"orders",
(system, registry, resolver) => entityId => resolver.Props<OrderActor>(entityId),
new OrderMessageExtractor(),
new ShardOptions
{
StateStoreMode = StateStoreMode.DData,
RememberEntities = false, // DEFAULT - 几乎总是正确的
Role = clusterRole
});
当RememberEntities = true引起问题时:
| 问题 | 解释 |
|---|---|
| 无界内存增长 | 每个曾经创建的实体都被记住并永远重启 |
| 慢速集群启动 | 集群必须在启动时重启成千上万/数百万的实体 |
| 过时实体复活 | 过期的会话、发送的电子邮件、旧订单都会重新启动 |
| 没有被动化 | 空闲实体无限期地消耗内存(被动化被禁用) |
何时使用每个设置
| 实体类型 | RememberEntities | 原因 |
|---|---|---|
UserSessionActor |
false | 会话到期,登录时创建 |
DraftActor |
false | 草稿被发送/丢弃,短暂 |
EmailSenderActor |
false | 火并忘记操作 |
OrderActor |
false | 订单完成,不断创建新订单 |
ShoppingCartActor |
false | 购物车到期,常见的遗弃购物车 |
TenantActor |
也许true | 固定的租户集合,总是需要 |
AccountActor |
也许true | 有界的账户集合,长寿命 |
经验法则: 仅对以下情况使用RememberEntities = true:
- 有界 实体集(已知上限)
- 长寿命 领域实体,应始终可用
- 实体的记住成本 < 懒创建成本
使用WithShardRegion<T>的标记类型
当使用WithShardRegion<T>时,泛型参数T用作ActorRegistry的标记类型。使用专用的标记类型(而不是演员类本身)进行一致的注册表访问:
/// <summary>
/// ActorRegistry的标记类型。使用此类型检索OrderActor分片区域。
/// </summary>
public sealed class OrderActorRegion;
// 注册 - 使用标记类型作为泛型参数
builder.WithShardRegion<OrderActorRegion>(
"orders",
(system, registry, resolver) => entityId => resolver.Props<OrderActor>(entityId),
new OrderMessageExtractor(),
new ShardOptions { StateStoreMode = StateStoreMode.DData });
// 检索 - 使用相同的标记类型
var orderRegion = ActorRegistry.Get<OrderActorRegion>();
orderRegion.Tell(new CreateOrder(orderId, amount));
为什么使用标记类型?
WithShardRegion<T>自动将分片区域注册在类型T下- 直接使用演员类可能会导致混淆(注册表返回区域,而不是演员)
- 标记类型使意图明确,并在LocalTest和Clustered模式下一致工作
避免冗余注册表调用
WithShardRegion<T>自动将分片区域注册在ActorRegistry中。不要再调用registry.Register<T>():
// BAD - 冗余注册
builder.WithShardRegion<OrderActorRegion>("orders", ...)
.WithActors((system, registry, resolver) =>
{
var region = registry.Get<OrderActorRegion>();
registry.Register<OrderActorRegion>(region); // UNNECESSARY!
});
// GOOD - WithShardRegion已经注册
builder.WithShardRegion<OrderActorRegion>("orders", ...);
// 就这样 - OrderActorRegion现在在注册表中
最佳实践
- 始终支持两种执行模式 - 使测试变得容易,无需更改代码
- 使用强类型ID -
OrderId而不是string或Guid - 基于接口的消息路由 -
IWithOrderId用于类型安全的提取 - 注册父项,而不是子项 - 对于每个实体的孩子,将父项注册在ActorRegistry中
- 标记类型用于清晰度 - 使用空标记类进行注册表查找
- 组合优于继承 - 链式扩展方法,不要创建深层层次结构
- ITimeProvider用于调度 - 永远不要直接在演员中使用
DateTime.Now - akka-reminders用于持久性 - 用于必须在重启后生存的计划任务
- RememberEntities = false默认 - 仅对有界、长寿命的实体设置为true