Akka.HostingActorPatternsSkill akka-hosting-actor-patterns

Akka.Hosting Actor Patterns 是一个用于构建实体演员的技能,它支持在本地测试和集群生产模式下运行。它包括通用子实体父演员模式、消息提取器、集群分片抽象、akka-reminders和ITimeProvider。这个技能有助于创建可重用的演员配置模式,并确保代码在不同环境下的一致性和可测试性。

后端开发 0 次安装 0 次浏览 更新于 2/26/2026

Akka.Hosting Actor Patterns

使用此技能时

使用此技能时:

  • 构建代表领域对象的实体演员(用户、订单、发票等)
  • 需要在单元测试(无集群)和生产(集群分片)中工作的演员
  • 使用akka-reminders设置计划任务
  • 使用Akka.Hosting扩展方法注册演员
  • 创建可重用的演员配置模式

核心原则

  1. 执行模式抽象 - 相同的演员代码在本地(测试)或集群(生产)中运行
  2. Local的GenericChildPerEntityParent - 在不需要集群开销的情况下模仿分片语义
  3. 消息提取器用于路由 - 重用Akka.Cluster.Sharding的IMessageExtractor接口
  4. Akka.Hosting扩展方法 - 流畅配置,易于组合
  5. ITimeProvider用于可测试性 - 使用ActorSystem.Scheduler代替DateTime.Now

执行模式

定义一个枚举来控制演员行为:

/// <summary>
/// 确定Akka.NET应该如何配置
/// </summary>
public enum AkkaExecutionMode
{
    /// <summary>
    /// 纯本地演员系统 - 无远程处理,无集群。
    /// 使用GenericChildPerEntityParent代替ShardRegion。
    /// 适用于单元测试和简单场景。
    /// </summary>
    LocalTest,

    /// <summary>
    /// 完整的集群和ShardRegion。
    /// 用于集成测试和生产。
    /// </summary>
    Clustered
}

GenericChildPerEntityParent

一个轻量级的父演员,将消息路由到子实体,模仿集群分片语义,而不需要集群:

using Akka.Actor;
using Akka.Cluster.Sharding;

/// <summary>
/// 一个通用的“每个实体一个孩子”的父演员。
/// </summary>
/// <remarks>
/// 重用Akka.Cluster.Sharding的IMessageExtractor以实现一致的路由。
/// 适用于不需要集群开销的单元测试。
/// </remarks>
public sealed class GenericChildPerEntityParent : ReceiveActor
{
    public static Props CreateProps(
        IMessageExtractor extractor,
        Func<string, Props> propsFactory)
    {
        return Props.Create(() =>
            new GenericChildPerEntityParent(extractor, propsFactory));
    }

    private readonly IMessageExtractor _extractor;
    private readonly Func<string, Props> _propsFactory;

    public GenericChildPerEntityParent(
        IMessageExtractor extractor,
        Func<string, Props> propsFactory)
    {
        _extractor = extractor;
        _propsFactory = propsFactory;

        ReceiveAny(message =>
        {
            var entityId = _extractor.EntityId(message);
            if (entityId is null) return;

            // 获取现有子项或创建新子项
            Context.Child(entityId)
                .GetOrElse(() => Context.ActorOf(_propsFactory(entityId), entityId))
                .Forward(_extractor.EntityMessage(message));
        });
    }
}

消息提取器

创建实现IMessageExtractor的提取器,来自Akka.Cluster.Sharding:

using Akka.Cluster.Sharding;

/// <summary>
/// 根据强类型ID路由消息到实体演员。
/// </summary>
public sealed class OrderMessageExtractor : HashCodeMessageExtractor
{
    public const int DefaultShardCount = 40;

    public OrderMessageExtractor(int maxNumberOfShards = DefaultShardCount)
        : base(maxNumberOfShards)
    {
    }

    public override string? EntityId(object message)
    {
        return message switch
        {
            IWithOrderId msg => msg.OrderId.Value.ToString(),
            _ => null
        };
    }
}

// 为特定实体定义消息的接口
public interface IWithOrderId
{
    OrderId OrderId { get; }
}

// 使用强类型ID
public readonly record struct OrderId(Guid Value)
{
    public static OrderId New() => new(Guid.NewGuid());
    public override string ToString() => Value.ToString();
}

Akka.Hosting扩展方法

创建抽象执行模式的扩展方法:

using Akka.Cluster.Hosting;
using Akka.Cluster.Sharding;
using Akka.Hosting;

public static class OrderActorHostingExtensions
{
    /// <summary>
    /// 添加支持本地和集群模式的OrderActor。
    /// </summary>
    public static AkkaConfigurationBuilder WithOrderActor(
        this AkkaConfigurationBuilder builder,
        AkkaExecutionMode executionMode = AkkaExecutionMode.Clustered,
        string? clusterRole = null)
    {
        if (executionMode == AkkaExecutionMode.LocalTest)
        {
            // 非集群模式:使用GenericChildPerEntityParent
            builder.WithActors((system, registry, resolver) =>
            {
                var parent = system.ActorOf(
                    GenericChildPerEntityParent.CreateProps(
                        new OrderMessageExtractor(),
                        entityId => resolver.Props<OrderActor>(entityId)),
                    "orders");

                registry.Register<OrderActor>(parent);
            });
        }
        else
        {
            // 集群模式:使用ShardRegion
            builder.WithShardRegion<OrderActor>(
                "orders",
                (system, registry, resolver) =>
                    entityId => resolver.Props<OrderActor>(entityId),
                new OrderMessageExtractor(),
                new ShardOptions
                {
                    StateStoreMode = StateStoreMode.DData,
                    Role = clusterRole
                });
        }

        return builder;
    }
}

组合多个演员

创建一个方便的方法来注册所有领域演员:

public static class DomainActorHostingExtensions
{
    /// <summary>
    /// 添加所有订单领域演员,支持分片。
    /// </summary>
    public static AkkaConfigurationBuilder WithOrderDomainActors(
        this AkkaConfigurationBuilder builder,
        AkkaExecutionMode executionMode = AkkaExecutionMode.Clustered,
        string? clusterRole = null)
    {
        return builder
            .WithOrderActor(executionMode, clusterRole)
            .WithPaymentActor(executionMode, clusterRole)
            .WithShipmentActor(executionMode, clusterRole)
            .WithNotificationActor(); // 单例,不需要分片
    }
}

使用ITimeProvider进行调度

将ActorSystem的Scheduler注册为ITimeProvider,以实现可测试的时间逻辑:

public static class SharedAkkaHostingExtensions
{
    public static IServiceCollection AddAkkaWithTimeProvider(
        this IServiceCollection services,
        Action<AkkaConfigurationBuilder, IServiceProvider> configure)
    {
        // 使用ActorSystem的Scheduler注册ITimeProvider
        services.AddSingleton<ITimeProvider>(sp =>
            sp.GetRequiredService<ActorSystem>().Scheduler);

        return services.ConfigureAkka((builder, sp) =>
        {
            configure(builder, sp);
        });
    }
}

// 在你的演员中,注入ITimeProvider
public class SubscriptionActor : ReceiveActor
{
    private readonly ITimeProvider _timeProvider;

    public SubscriptionActor(ITimeProvider timeProvider)
    {
        _timeProvider = timeProvider;

        // 使用_timeProvider.GetUtcNow()代替DateTime.UtcNow
        // 这允许测试控制时间
    }
}

Akka.Reminders集成

对于需要在重启后生存的持久计划任务,使用akka-reminders:

using Akka.Reminders;
using Akka.Reminders.Sql;
using Akka.Reminders.Sql.Configuration;
using Akka.Reminders.Storage;

public static class ReminderHostingExtensions
{
    /// <summary>
    /// 使用PostgreSQL存储配置akka-reminders。
    /// </summary>
    public static AkkaConfigurationBuilder WithPostgresReminders(
        this AkkaConfigurationBuilder builder,
        string connectionString,
        string schemaName = "reminders",
        string tableName = "scheduled_reminders",
        bool autoInitialize = true)
    {
        return builder.WithLocalReminders(reminders => reminders
            .WithResolver(sys => new GenericChildPerEntityResolver(sys))
            .WithStorage(system =>
            {
                var settings = SqlReminderStorageSettings.CreatePostgreSql(
                    connectionString,
                    schemaName,
                    tableName,
                    autoInitialize);
                return new SqlReminderStorage(settings, system);
            })
            .WithSettings(new ReminderSettings
            {
                MaxSlippage = TimeSpan.FromSeconds(30),
                MaxDeliveryAttempts = 3,
                RetryBackoffBase = TimeSpan.FromSeconds(10)
            });
    }

    /// <summary>
    /// 为测试配置内存存储的akka-reminders。
    /// </summary>
    public static AkkaConfigurationBuilder WithInMemoryReminders(
        this AkkaConfigurationBuilder builder)
    {
        return builder.WithLocalReminders(reminders => reminders
            .WithResolver(sys => new GenericChildPerEntityResolver(sys))
            .WithStorage(system => new InMemoryReminderStorage())
            .WithSettings(new ReminderSettings
            {
                MaxSlippage = TimeSpan.FromSeconds(1),
                MaxDeliveryAttempts = 3,
                RetryBackoffBase = TimeSpan.FromMilliseconds(100)
            }));
    }
}

为Child-Per-Entity自定义提醒解析器

将提醒回调路由到GenericChildPerEntityParent演员:

using Akka.Actor;
using Akka.Hosting;
using Akka.Reminders;

/// <summary>
/// 解析提醒目标到GenericChildPerEntityParent演员。
/// </summary>
public sealed class GenericChildPerEntityResolver : IReminderActorResolver
{
    private readonly ActorSystem _system;

    public GenericChildPerEntityResolver(ActorSystem system)
    {
        _system = system;
    }

    public IActorRef ResolveActorRef(ReminderEntry entry)
    {
        var registry = ActorRegistry.For(_system);

        return entry.Key switch
        {
            var k when k.StartsWith("order-") =>
                registry.Get<OrderActor>(),
            var k when k.StartsWith("subscription-") =>
                registry.Get<SubscriptionActor>(),
            _ => throw new InvalidOperationException(
                $"Unknown reminder key format: {entry.Key}")
        };
    }
}

单例演员(不分片)

对于应该只有一个实例的演员:

public static AkkaConfigurationBuilder WithEmailSenderActor(
    this AkkaConfigurationBuilder builder)
{
    return builder.WithActors((system, registry, resolver) =>
    {
        var actor = system.ActorOf(
            resolver.Props<EmailSenderActor>(),
            "email-sender");
        registry.Register<EmailSenderActor>(actor);
    });
}

注册表中的标记类型

当你需要引用注册为父项的演员时:

/// <summary>
/// 用于ActorRegistry检索订单管理器的标记类型
/// (OrderActors的GenericChildPerEntityParent)。
/// </summary>
public sealed class OrderManagerActor;

// 在扩展方法中的使用
registry.Register<OrderManagerActor>(parent);

// 在控制器/服务中的使用
public class OrderService
{
    private readonly IActorRef _orderManager;

    public OrderService(IRequiredActor<OrderManagerActor> orderManager)
    {
        _orderManager = orderManager.ActorRef;
    }

    public async Task<OrderResponse> CreateOrder(CreateOrderCommand cmd)
    {
        return await _orderManager.Ask<OrderResponse>(cmd);
    }
}

演员中的DI范围管理

演员没有自动DI范围。 与ASP.NET控制器不同(每个HTTP请求创建一个范围),演员是长寿命的。如果你需要范围服务(如DbContext),请注入IServiceProvider并手动创建范围。

每条消息一个范围的模式

public sealed class OrderProcessingActor : ReceiveActor
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IActorRef _notificationActor;

    public OrderProcessingActor(
        IServiceProvider serviceProvider,
        IRequiredActor<NotificationActor> notificationActor)
    {
        _serviceProvider = serviceProvider;
        _notificationActor = notificationActor.ActorRef;

        ReceiveAsync<ProcessOrder>(HandleProcessOrder);
    }

    private async Task HandleProcessOrder(ProcessOrder msg)
    {
        // 为这条消息创建范围 - 处理后释放
        using var scope = _serviceProvider.CreateScope();

        // 在范围内解析范围服务
        var orderRepository = scope.ServiceProvider.GetRequiredService<IOrderRepository>();
        var paymentService = scope.ServiceProvider.GetRequiredService<IPaymentService>();
        var emailComposer = scope.ServiceProvider.GetRequiredService<IOrderEmailComposer>();

        // 使用范围内的服务进行工作
        var order = await orderRepository.GetByIdAsync(msg.OrderId);
        var payment = await paymentService.ProcessAsync(order);

        // 范围释放时提交DbContext更改
    }
}

为什么这种模式

好处 解释
每条消息一个新鲜的DbContext 消息之间没有过时的实体跟踪
适当处置 数据库连接在每条消息后释放
隔离 一条消息的错误不会破坏另一条消息的状态
可测试 可以在测试中注入模拟IServiceProvider

单例服务 - 直接注入

对于无状态、线程安全的服务,直接注入(不需要范围):

public sealed class NotificationActor : ReceiveActor
{
    private readonly IEmailLinkGenerator _linkGenerator;  // 单例 - 可以!
    private readonly IMjmlTemplateRenderer _renderer;     // 单例 - 可以!

    public NotificationActor(
        IEmailLinkGenerator linkGenerator,
        IMjmlTemplateRenderer renderer)
    {
        _linkGenerator = linkGenerator;
        _renderer = renderer;

        Receive<SendWelcomeEmail>(Handle);
    }
}

常见错误:直接注入范围服务

// BAD: 范围服务注入到长寿命的演员
public sealed class BadActor : ReceiveActor
{
    private readonly IOrderRepository _repo;  // 范围!DbContext永远存在!

    public BadActor(IOrderRepository repo)  // 在演员创建时捕获
    {
        _repo = repo;  // 这个DbContext将变得过时
    }
}

// GOOD: 注入IServiceProvider,每条消息创建范围
public sealed class GoodActor : ReceiveActor
{
    private readonly IServiceProvider _sp;

    public GoodActor(IServiceProvider sp)
    {
        _sp = sp;
        ReceiveAsync<ProcessOrder>(async msg =>
        {
            using var scope = _sp.CreateScope();
            var repo = scope.ServiceProvider.GetRequiredService<IOrderRepository>();
            // 这条消息的新鲜DbContext
        });
    }
}

关于DI生命周期和范围管理的更多信息,请参见microsoft-extensions/dependency-injection技能。


集群分片配置

RememberEntities: 几乎总是False

RememberEntities控制着分片区域是否记得并自动重启所有曾经创建的实体。这几乎总是false

builder.WithShardRegion<OrderActor>(
    "orders",
    (system, registry, resolver) => entityId => resolver.Props<OrderActor>(entityId),
    new OrderMessageExtractor(),
    new ShardOptions
    {
        StateStoreMode = StateStoreMode.DData,
        RememberEntities = false,  // DEFAULT - 几乎总是正确的
        Role = clusterRole
    });

RememberEntities = true引起问题时:

问题 解释
无界内存增长 每个曾经创建的实体都被记住并永远重启
慢速集群启动 集群必须在启动时重启成千上万/数百万的实体
过时实体复活 过期的会话、发送的电子邮件、旧订单都会重新启动
没有被动化 空闲实体无限期地消耗内存(被动化被禁用)

何时使用每个设置

实体类型 RememberEntities 原因
UserSessionActor false 会话到期,登录时创建
DraftActor false 草稿被发送/丢弃,短暂
EmailSenderActor false 火并忘记操作
OrderActor false 订单完成,不断创建新订单
ShoppingCartActor false 购物车到期,常见的遗弃购物车
TenantActor 也许true 固定的租户集合,总是需要
AccountActor 也许true 有界的账户集合,长寿命

经验法则: 仅对以下情况使用RememberEntities = true

  1. 有界 实体集(已知上限)
  2. 长寿命 领域实体,应始终可用
  3. 实体的记住成本 < 懒创建成本

使用WithShardRegion<T>的标记类型

当使用WithShardRegion<T>时,泛型参数T用作ActorRegistry的标记类型。使用专用的标记类型(而不是演员类本身)进行一致的注册表访问:

/// <summary>
/// ActorRegistry的标记类型。使用此类型检索OrderActor分片区域。
/// </summary>
public sealed class OrderActorRegion;

// 注册 - 使用标记类型作为泛型参数
builder.WithShardRegion<OrderActorRegion>(
    "orders",
    (system, registry, resolver) => entityId => resolver.Props<OrderActor>(entityId),
    new OrderMessageExtractor(),
    new ShardOptions { StateStoreMode = StateStoreMode.DData });

// 检索 - 使用相同的标记类型
var orderRegion = ActorRegistry.Get<OrderActorRegion>();
orderRegion.Tell(new CreateOrder(orderId, amount));

为什么使用标记类型?

  • WithShardRegion<T>自动将分片区域注册在类型T
  • 直接使用演员类可能会导致混淆(注册表返回区域,而不是演员)
  • 标记类型使意图明确,并在LocalTest和Clustered模式下一致工作

避免冗余注册表调用

WithShardRegion<T>自动将分片区域注册在ActorRegistry中。不要再调用registry.Register<T>()

// BAD - 冗余注册
builder.WithShardRegion<OrderActorRegion>("orders", ...)
    .WithActors((system, registry, resolver) =>
    {
        var region = registry.Get<OrderActorRegion>();
        registry.Register<OrderActorRegion>(region);  // UNNECESSARY!
    });

// GOOD - WithShardRegion已经注册
builder.WithShardRegion<OrderActorRegion>("orders", ...);
// 就这样 - OrderActorRegion现在在注册表中

最佳实践

  1. 始终支持两种执行模式 - 使测试变得容易,无需更改代码
  2. 使用强类型ID - OrderId而不是stringGuid
  3. 基于接口的消息路由 - IWithOrderId用于类型安全的提取
  4. 注册父项,而不是子项 - 对于每个实体的孩子,将父项注册在ActorRegistry中
  5. 标记类型用于清晰度 - 使用空标记类进行注册表查找
  6. 组合优于继承 - 链式扩展方法,不要创建深层层次结构
  7. ITimeProvider用于调度 - 永远不要直接在演员中使用DateTime.Now
  8. akka-reminders用于持久性 - 用于必须在重启后生存的计划任务
  9. RememberEntities = false默认 - 仅对有界、长寿命的实体设置为true