Akka.NET 管理和服务发现
何时使用此技能
使用此技能时:
- 将 Akka.NET 集群部署到 Kubernetes 或云环境
- 用动态服务发现替换静态种子节点
- 配置集群引导以自动形成
- 为负载均衡器设置健康端点
- 与 Azure 表存储、Kubernetes API 或基于配置的发现集成
概览
Akka.Management 提供 HTTP 端点用于集群管理,并与 Akka.Cluster.Bootstrap 集成,以启用使用服务发现而不是静态种子节点的动态集群形成。
为什么使用 Akka.Management?
| 方法 | 优点 | 缺点 |
|---|---|---|
| 静态种子节点 | 简单,无依赖 | 不扩展,需要已知 IP |
| Akka.Management | 动态发现,可扩展到 N 个节点 | 更多配置,外部依赖 |
使用静态种子节点 用于:开发,单节点部署,固定基础设施。
使用 Akka.Management 用于:Kubernetes,自动扩展组,动态环境,生产集群。
架构
┌─────────────────────────────────────────────────────────────┐
│ 集群引导 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 节点 1 │ │ 节点 2 │ │ 节点 3 │ │
│ │ │ │ │ │ │ │
│ │ 管理 │◄──►│ 管理 │◄──►│ 管理 │ │
│ │ HTTP :8558 │ │ HTTP :8558 │ │ HTTP :8558 │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌───────▼───────┐ │
│ │ 发现 │ │
│ │ 提供者 │ │
│ └───────────────┘ │
│ │ │
└────────────────────────────┼────────────────────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
┌─────▼─────┐ ┌──────▼─────┐ ┌─────▼──────┐
│ Kubernetes│ │ Azure │ │ Config │
│ API │ │ 表存储 │ │ (HOCON) │
└───────────┘ └────────────┘ └────────────┘
必需的 NuGet 包
<ItemGroup>
<!-- 核心管理 -->
<PackageReference Include="Akka.Management" />
<PackageReference Include="Akka.Management.Cluster.Bootstrap" />
<!-- 选择一个发现提供者 -->
<PackageReference Include="Akka.Discovery.KubernetesApi" /> <!-- 用于 Kubernetes -->
<PackageReference Include="Akka.Discovery.Azure" /> <!-- 用于 Azure -->
<PackageReference Include="Akka.Discovery.Config.Hosting" /> <!-- 用于静态配置 -->
</ItemGroup>
配置模型
为所有管理选项创建强类型设置。有关验证模式,请参见 microsoft-extensions-configuration 技能。
AkkaManagementOptions
using System.Net;
public class AkkaManagementOptions
{
/// <summary>
/// 管理 HTTP 端点的主机名。
/// 其他节点使用此主机名来联系此节点的管理端点。
/// </summary>
public string HostName { get; set; } = Dns.GetHostName();
/// <summary>
/// 管理 HTTP 端点的端口。
/// 标准端口是 8558。
/// </summary>
public int Port { get; set; } = 8558;
}
ClusterBootstrapOptions
public class ClusterBootstrapOptions
{
/// <summary>
/// 启用/禁用 Akka.Management 集群引导。
/// 当禁用时,使用传统的种子节点。
/// </summary>
public bool Enabled { get; set; } = false;
/// <summary>
/// 用于发现的服务名称。
/// 同一集群中的所有节点必须使用相同的服务名称。
/// </summary>
public string ServiceName { get; set; } = "my-service";
/// <summary>
/// 用于管理 HTTP 端点的端口名称。
/// Kubernetes 发现使用它来找到正确的端口。
/// </summary>
public string PortName { get; set; } = "management";
/// <summary>
/// 形成集群所需的最小联系点数量。
/// 应与您的最小副本计数匹配。
/// </summary>
/// <remarks>
/// 设置为 1 用于开发,生产为 3+。
/// </remarks>
public int RequiredContactPointsNr { get; set; } = 3;
/// <summary>
/// 使用哪种发现机制。
/// </summary>
public DiscoveryMethod DiscoveryMethod { get; set; } = DiscoveryMethod.Config;
/// <summary>
/// 探测发现的联系点的频率。
/// </summary>
public TimeSpan ContactPointProbingInterval { get; set; } = TimeSpan.FromSeconds(1);
/// <summary>
/// 查询发现提供者的频率。
/// </summary>
public TimeSpan BootstrapperDiscoveryPingInterval { get; set; } = TimeSpan.FromSeconds(1);
/// <summary>
/// 等待稳定联系点之前形成集群的时间。
/// 对于较慢的环境,增加此值。
/// </summary>
public TimeSpan StableMargin { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// 是否联系所有发现的节点还是仅所需的数量。
/// 设置为 true 以提高集群形成的可靠性。
/// </summary>
public bool ContactWithAllContactPoints { get; set; } = true;
/// <summary>
/// 通过管理端口过滤联系点。
/// 对于 Kubernetes(固定端口)设置为 true,对于 Aspire(动态端口)设置为 false。
/// </summary>
public bool FilterOnFallbackPort { get; set; } = true;
// 发现特定选项
public string[]? ConfigServiceEndpoints { get; set; }
public AzureDiscoveryOptions? AzureDiscoveryOptions { get; set; }
public KubernetesDiscoveryOptions? KubernetesDiscoveryOptions { get; set; }
}
public enum DiscoveryMethod
{
/// <summary>
/// 静态配置 - 在 HOCON/appsettings 中定义的端点。
/// 适用于开发和固定基础设施。
/// </summary>
Config,
/// <summary>
/// Kubernetes API 发现 - 查询 K8s API 以获取 pod 端点。
/// 最适合 Kubernetes 部署。
/// </summary>
Kubernetes,
/// <summary>
/// Azure 表存储 - 节点在共享表中注册自己。
/// 适用于 Azure 部署和 Aspire 本地开发。
/// </summary>
AzureTableStorage
}
发现特定选项
public class AzureDiscoveryOptions
{
public string? ConnectionString { get; set; }
public string TableName { get; set; } = "AkkaDiscovery";
}
public class KubernetesDiscoveryOptions
{
/// <summary>
/// 要搜索 pod 的 Kubernetes 命名空间。
/// 如果为 null,则使用当前 pod 的命名空间。
/// </summary>
public string? PodNamespace { get; set; }
/// <summary>
/// 用于过滤 pod 的标签选择器(例如,"app=my-service")。
/// </summary>
public string? PodLabelSelector { get; set; }
/// <summary>
/// pod 规范中用于管理端点的端口名称。
/// </summary>
public string PodPortName { get; set; } = "management";
}
Akka.Hosting 配置
基本设置与模式选择
public static class AkkaConfiguration
{
public static IServiceCollection ConfigureAkka(
this IServiceCollection services,
Action<AkkaConfigurationBuilder, IServiceProvider>? additionalConfig = null)
{
// 绑定并验证设置(见 microsoft-extensions-configuration 技能)
services.AddOptions<AkkaSettings>()
.BindConfiguration("AkkaSettings")
.ValidateDataAnnotations()
.ValidateOnStart();
services.AddSingleton<IValidateOptions<AkkaSettings>, AkkaSettingsValidator>();
return services.AddAkka("MySystem", (builder, sp) =>
{
var settings = sp.GetRequiredService<IOptions<AkkaSettings>>().Value;
var configuration = sp.GetRequiredService<IConfiguration>();
ConfigureNetwork(builder, settings, configuration);
ConfigureHealthChecks(builder);
additionalConfig?.Invoke(builder, sp);
});
}
private static void ConfigureNetwork(
AkkaConfigurationBuilder builder,
AkkaSettings settings,
IConfiguration configuration)
{
// LocalTest 模式 = 无网络
if (settings.ExecutionMode == AkkaExecutionMode.LocalTest)
return;
// 配置远程处理
builder.WithRemoting(settings.RemoteOptions);
if (settings.ClusterBootstrapOptions.Enabled)
{
// 使用 Akka.Management 动态集群形成
ConfigureAkkaManagement(builder, settings, configuration);
}
else
{
// 传统的种子节点集群
builder.WithClustering(settings.ClusterOptions);
}
}
private static void ConfigureHealthChecks(AkkaConfigurationBuilder builder)
{
builder
.WithActorSystemLivenessCheck()
.WithAkkaClusterReadinessCheck();
}
}
Akka.Management 配置
private static void ConfigureAkkaManagement(
AkkaConfigurationBuilder builder,
AkkaSettings settings,
IConfiguration configuration)
{
var mgmtOptions = settings.AkkaManagementOptions;
var bootstrapOptions = settings.ClusterBootstrapOptions;
// 使用 Akka.Management 时清除种子节点
settings.ClusterOptions.SeedNodes = [];
builder
// 配置集群(无种子节点)
.WithClustering(settings.ClusterOptions)
// 配置 Akka.Management HTTP 端点
.WithAkkaManagement(setup =>
{
setup.Http.HostName = mgmtOptions.HostName;
setup.Http.Port = mgmtOptions.Port;
setup.Http.BindHostName = "0.0.0.0"; // 监听所有接口
setup.Http.BindPort = mgmtOptions.Port;
})
// 配置集群引导
.WithClusterBootstrap(options =>
{
options.ContactPointDiscovery.ServiceName = bootstrapOptions.ServiceName;
options.ContactPointDiscovery.PortName = bootstrapOptions.PortName;
options.ContactPointDiscovery.RequiredContactPointsNr = bootstrapOptions.RequiredContactPointsNr;
options.ContactPointDiscovery.Interval = bootstrapOptions.ContactPointProbingInterval;
options.ContactPointDiscovery.StableMargin = bootstrapOptions.StableMargin;
options.ContactPointDiscovery.ContactWithAllContactPoints = bootstrapOptions.ContactWithAllContactPoints;
options.ContactPoint.FilterOnFallbackPort = bootstrapOptions.FilterOnFallbackPort;
options.ContactPoint.ProbeInterval = bootstrapOptions.BootstrapperDiscoveryPingInterval;
});
// 配置发现提供者
ConfigureDiscovery(builder, settings, configuration);
}
发现提供者
1. 配置发现(开发/固定基础设施)
当提前知道端点时使用:
private static void ConfigureConfigDiscovery(
AkkaConfigurationBuilder builder,
ClusterBootstrapOptions options)
{
if (options.ConfigServiceEndpoints == null || options.ConfigServiceEndpoints.Length == 0)
throw new InvalidOperationException("ConfigServiceEndpoints required for Config discovery");
var endpoints = string.Join(", ", options.ConfigServiceEndpoints.Select(ep => $"\"{ep}\""));
var hocon = $@"
akka.discovery {{
method = config
config {{
services {{
{options.ServiceName} {{
endpoints = [{endpoints}]
}}
}}
}}
}}";
builder.AddHocon(hocon, HoconAddMode.Prepend);
}
appsettings.json:
{
"AkkaSettings": {
"ClusterBootstrapOptions": {
"Enabled": true,
"DiscoveryMethod": "Config",
"ServiceName": "my-service",
"ConfigServiceEndpoints": [
"node1.local:8558",
"node2.local:8558",
"node3.local:8558"
]
}
}
}
2. Kubernetes 发现(生产 K8s)
查询 Kubernetes API 以获取 pod 端点:
private static void ConfigureKubernetesDiscovery(
AkkaConfigurationBuilder builder,
KubernetesDiscoveryOptions? options)
{
if (options != null)
{
builder.WithKubernetesDiscovery(k8sOptions =>
{
if (!string.IsNullOrEmpty(options.PodNamespace))
k8sOptions.PodNamespace = options.PodNamespace;
if (!string.IsNullOrEmpty(options.PodLabelSelector))
k8sOptions.PodLabelSelector = options.PodLabelSelector;
});
}
else
{
// 使用默认值 - 自动检测命名空间并使用所有 pod
builder.WithKubernetesDiscovery();
}
}
Kubernetes 部署:
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-akka-service
spec:
replicas: 3
selector:
matchLabels:
app: my-akka-service
template:
metadata:
labels:
app: my-akka-service
spec:
containers:
- name: app
image: my-app:latest
ports:
- name: http
containerPort: 8080
- name: remote
containerPort: 8081
- name: management # 必须与配置中的 PortName 匹配
containerPort: 8558
env:
- name: AkkaSettings__ClusterBootstrapOptions__Enabled
value: "true"
- name: AkkaSettings__ClusterBootstrapOptions__DiscoveryMethod
value: "Kubernetes"
- name: AkkaSettings__ClusterBootstrapOptions__ServiceName
value: "my-akka-service"
- name: AkkaSettings__RemoteOptions__PublicHostName
valueFrom:
fieldRef:
fieldPath: status.podIP
---
apiVersion: v1
kind: Service
metadata:
name: my-akka-service
spec:
clusterIP: None # 无头服务,用于直接 pod 发现
selector:
app: my-akka-service
ports:
- name: management
port: 8558
必需的 RBAC:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: akka-discovery
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: akka-discovery
subjects:
- kind: ServiceAccount
name: default
roleRef:
kind: Role
name: akka-discovery
apiGroup: rbac.authorization.k8s.io
3. Azure 表存储发现(Azure/Aspire)
节点在共享 Azure 表中注册自己:
private static void ConfigureAzureDiscovery(
AkkaConfigurationBuilder builder,
ClusterBootstrapOptions bootstrapOptions,
AkkaManagementOptions mgmtOptions,
IConfiguration configuration)
{
var connectionString = configuration.GetConnectionString("AkkaManagementAzure");
if (string.IsNullOrEmpty(connectionString))
throw new InvalidOperationException("AkkaManagementAzure connection string required");
builder.WithAzureDiscovery(options =>
{
options.ServiceName = bootstrapOptions.ServiceName;
options.ConnectionString = connectionString;
options.HostName = mgmtOptions.HostName;
options.Port = mgmtOptions.Port;
});
}
appsettings.json:
{
"ConnectionStrings": {
"AkkaManagementAzure": "DefaultEndpointsProtocol=https;AccountName=...;AccountKey=..."
},
"AkkaSettings": {
"ClusterBootstrapOptions": {
"Enabled": true,
"DiscoveryMethod": "AzureTableStorage",
"ServiceName": "my-service",
"AzureDiscoveryOptions": {
"TableName": "AkkaDiscovery"
}
}
}
}
完整的发现配置
private static void ConfigureDiscovery(
AkkaConfigurationBuilder builder,
AkkaSettings settings,
IConfiguration configuration)
{
var bootstrapOptions = settings.ClusterBootstrapOptions;
var mgmtOptions = settings.AkkaManagementOptions;
switch (bootstrapOptions.DiscoveryMethod)
{
case DiscoveryMethod.Config:
ConfigureConfigDiscovery(builder, bootstrapOptions);
break;
case DiscoveryMethod.Kubernetes:
ConfigureKubernetesDiscovery(builder, bootstrapOptions.KubernetesDiscoveryOptions);
break;
case DiscoveryMethod.AzureTableStorage:
ConfigureAzureDiscovery(builder, bootstrapOptions, mgmtOptions, configuration);
break;
default:
throw new ArgumentOutOfRangeException(
nameof(bootstrapOptions.DiscoveryMethod),
$"Unknown discovery method: {bootstrapOptions.DiscoveryMethod}");
}
}
健康端点
Akka.Management 暴露健康端点,供负载均衡器和编排器使用:
| 端点 | 目的 | 返回 200 时 |
|---|---|---|
/alive |
存活 | ActorSystem 正在运行 |
/ready |
就绪 | 集群成员处于 Up 状态 |
/cluster/members |
调试 | 返回集群成员资格 |
ASP.NET Core 健康检查集成
// 注册 Akka 健康检查
builder.Services.AddHealthChecks();
// 在 Akka 配置中
builder
.WithActorSystemLivenessCheck() // 添加 "akka-liveness" 健康检查
.WithAkkaClusterReadinessCheck(); // 添加 "akka-cluster-readiness" 健康检查
// 映射端点
app.MapHealthChecks("/health/live", new HealthCheckOptions
{
Predicate = check => check.Tags.Contains("liveness")
});
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
Predicate = check => check.Tags.Contains("readiness")
});
故障排除
集群无法形成
症状: 节点保持为单独的单节点集群。
检查清单:
- 所有节点使用相同的
ServiceName RequiredContactPointsNr与实际副本计数匹配- 发现提供者配置正确
- 网络允许管理端口(8558)上的流量
- 对于 Kubernetes:设置了 RBAC 权限
调试:
// 启用详细日志
"AkkaSettings": {
"LogConfigOnStart": true
}
脑裂
症状: 形成多个集群而不是一个。
解决方案:
- 设置
ContactWithAllContactPoints = true - 增加
StableMargin以适应较慢的环境 - 对于 Aspire:设置
FilterOnFallbackPort = false(动态端口) - 对于 Kubernetes:设置
FilterOnFallbackPort = true(固定端口)
Azure 发现问题
症状: 节点无法通过 Azure 表找到彼此。
检查清单:
- 连接字符串有效
- 存储帐户允许表操作
- 所有节点使用相同的
ServiceName - 防火墙允许访问 Azure 存储
Aspire 集成
有关 Aspire 特定模式的详细信息,请参阅 akka-net-aspire-configuration 技能。
Aspire 的快速参考:
// 在 AppHost
appBuilder
.WithEndpoint(name: "remote", protocol: ProtocolType.Tcp,
env: "AkkaSettings__RemoteOptions__Port")
.WithEndpoint(name: "management", protocol: ProtocolType.Tcp,
env: "AkkaSettings__AkkaManagementOptions__Port")
.WithEnvironment("AkkaSettings__ClusterBootstrapOptions__Enabled", "true")
.WithEnvironment("AkkaSettings__ClusterBootstrapOptions__DiscoveryMethod", "AzureTableStorage")
.WithEnvironment("AkkaSettings__ClusterBootstrapOptions__FilterOnFallbackPort", "false");
总结:何时使用什么
| 场景 | 发现方法 | FilterOnFallbackPort |
|---|---|---|
| 本地开发(单节点) | 无(使用种子节点) | N/A |
| Aspire 多节点 | AzureTableStorage | false |
| Kubernetes | Kubernetes | true |
| Azure VM/VMSS | AzureTableStorage | true |
| 固定基础设施 | Config | true |
| AWS ECS/EC2 | AWS 发现插件 | true |