name: Erlang分布式系统 user-invocable: false description: 用于Erlang分布式系统,包括节点连接性、分布式进程、全局名称注册、分布式监控、网络分区处理,以及在BEAM虚拟机上构建容错多节点应用。 allowed-tools: []
Erlang分布式系统
引言
Erlang内置的分布式功能使得能够在多个节点上构建集群化、容错的系统。不同节点上的进程通过本地使用的相同消息传递原语进行透明通信。这种位置透明性使分布式编程变得自然和直接。
分布式层自动处理网络通信、序列化和节点连接性。节点通过命名发现彼此,进程可以通过注册名称或进程ID引用全局寻址。理解分布式模式对于构建可扩展、弹性的系统至关重要。
本技能涵盖节点连接性和集群化、分布式消息传递、全局名称注册、分布式监控、处理网络分区、RPC模式,以及构建生产级分布式应用。
节点连接性
节点连接形成集群以实现分布式计算和容错。
%% 启动命名节点
%% erl -name node1@hostname -setcookie secret
%% erl -sname node2 -setcookie secret
%% 连接节点
connect_nodes() ->
Node1 = 'node1@host',
Node2 = 'node2@host',
net_kernel:connect_node(Node2).
%% 检查连接节点
list_nodes() ->
Nodes = [node() | nodes()],
io:format("连接节点: ~p~n", [Nodes]).
%% 监控节点连接
monitor_nodes() ->
net_kernel:monitor_nodes(true),
receive
{nodeup, Node} ->
io:format("节点上线: ~p~n", [Node]);
{nodedown, Node} ->
io:format("节点下线: ~p~n", [Node])
end.
%% 节点配置
start_distributed() ->
{ok, _} = net_kernel:start([mynode, shortnames]),
erlang:set_cookie(node(), secret_cookie).
%% 隐藏节点(用于监控)
connect_hidden(Node) ->
net_kernel:connect_node(Node),
erlang:disconnect_node(Node),
net_kernel:hidden_connect_node(Node).
%% 获取节点信息
node_info() ->
#{
name => node(),
cookie => erlang:get_cookie(),
nodes => nodes(),
alive => is_alive()
}.
节点连接性使得能够构建具有自动发现的分布式集群。
分布式消息传递
使用与本地消息传递相同的语法向远程节点上的进程发送消息。
%% 发送到远程节点上的注册进程
send_remote(Node, Name, Message) ->
{Name, Node} ! Message.
%% 在远程节点上生成进程
spawn_on_remote(Node, Fun) ->
spawn(Node, Fun).
spawn_on_remote(Node, Module, Function, Args) ->
spawn(Node, Module, Function, Args).
%% 分布式请求-响应
remote_call(Node, Module, Function, Args) ->
Pid = spawn(Node, fun() ->
Result = apply(Module, Function, Args),
receive
{From, Ref} -> From ! {Ref, Result}
end
end),
Ref = make_ref(),
Pid ! {self(), Ref},
receive
{Ref, Result} -> {ok, Result}
after 5000 ->
{error, timeout}
end.
%% 分布式工作分配
-module(work_dispatcher).
-export([start/0, dispatch/1]).
start() ->
register(?MODULE, spawn(fun() -> loop([]) end)).
dispatch(Work) ->
?MODULE ! {dispatch, Work}.
loop(Workers) ->
receive
{dispatch, Work} ->
Node = select_node(nodes()),
Pid = spawn(Node, fun() -> do_work(Work) end),
loop([{Pid, Node} | Workers])
end.
select_node(Nodes) ->
lists:nth(rand:uniform(length(Nodes)), Nodes).
do_work(Work) ->
Result = process_work(Work),
io:format("工作在 ~p 完成: ~p~n", [node(), Result]).
process_work(Work) -> Work * 2.
%% 远程组领导用于输出
remote_process_with_io(Node) ->
spawn(Node, fun() ->
group_leader(self(), self()),
io:format("从 ~p 输出~n", [node()])
end).
位置透明消息传递使得无缝分布式通信成为可能。
全局名称注册
在分布式集群中全局注册进程名称。
%% 全局注册
register_global(Name) ->
Pid = spawn(fun() -> global_loop() end),
global:register_name(Name, Pid),
Pid.
global_loop() ->
receive
{From, Message} ->
From ! {reply, Message},
global_loop();
stop -> ok
end.
%% 发送到全局注册进程
send_global(Name, Message) ->
case global:whereis_name(Name) of
undefined ->
{error, not_found};
Pid ->
Pid ! Message,
ok
end.
%% 带有冲突解决的全局名称
register_with_resolve(Name) ->
Pid = spawn(fun() -> server_loop() end),
ResolveFun = fun(Name, Pid1, Pid2) ->
%% 保留在名称较低节点上的进程
case node(Pid1) < node(Pid2) of
true -> Pid1;
false -> Pid2
end
end,
global:register_name(Name, Pid, ResolveFun).
server_loop() ->
receive
Message ->
io:format("在 ~p 收到: ~p~n", [node(), Message]),
server_loop()
end.
%% 全局同步
sync_global() ->
global:sync().
%% 列出全局注册名称
list_global_names() ->
global:registered_names().
%% 节点重连后重新注册
ensure_global_registration(Name, Fun) ->
case global:whereis_name(Name) of
undefined ->
Pid = spawn(Fun),
global:register_name(Name, Pid),
Pid;
Pid ->
Pid
end.
全局注册使得位置独立的进程发现成为可能。
分布式监控
在多个节点上监控进程,以实现集群范围的容错。
-module(distributed_supervisor).
-behaviour(supervisor).
-export([start_link/0, start_worker/1]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_worker(Node) ->
ChildSpec = #{
id => make_ref(),
start => {worker, start_link, [Node]},
restart => permanent,
type => worker
},
supervisor:start_child(?MODULE, ChildSpec).
init([]) ->
SupFlags = #{
strategy => one_for_one,
intensity => 5,
period => 60
},
{ok, {SupFlags, []}}.
%% 在特定节点上生成的工人模块
-module(worker).
-export([start_link/1, loop/0]).
start_link(Node) ->
Pid = spawn_link(Node, ?MODULE, loop, []),
{ok, Pid}.
loop() ->
receive
stop -> ok;
Msg ->
io:format("工人在 ~p: ~p~n", [node(), Msg]),
loop()
end.
%% 分布式进程组
-module(pg_example).
-export([start/0, join/1, broadcast/1]).
start() ->
pg:start_link().
join(Group) ->
pg:join(Group, self()).
broadcast(Group, Message) ->
Members = pg:get_members(Group),
[Pid ! Message || Pid <- Members].
分布式监控在节点故障时维持系统健康。
RPC和远程执行
使用各种调用模式在远程节点上执行函数调用。
%% 基本RPC
simple_rpc(Node, Module, Function, Args) ->
rpc:call(Node, Module, Function, Args).
%% 带超时的RPC
timed_rpc(Node, Module, Function, Args, Timeout) ->
rpc:call(Node, Module, Function, Args, Timeout).
%% 异步RPC
async_rpc(Node, Module, Function, Args) ->
Key = rpc:async_call(Node, Module, Function, Args),
%% 稍后获取结果
rpc:yield(Key).
%% 并行RPC到多个节点
parallel_rpc(Nodes, Module, Function, Args) ->
rpc:multicall(Nodes, Module, Function, Args).
%% 带结果的并行调用
parallel_rpc_results(Nodes, Module, Function, Args) ->
rpc:multicall(Nodes, Module, Function, Args, 5000).
%% 投递(发后即忘)
cast_rpc(Node, Module, Function, Args) ->
rpc:cast(Node, Module, Function, Args).
%% 广播到所有节点
broadcast_rpc(Module, Function, Args) ->
Nodes = [node() | nodes()],
rpc:multicall(Nodes, Module, Function, Args).
%% 在节点上并行映射
pmap_nodes(Fun, List) ->
Nodes = nodes(),
DistFun = fun(X) ->
Node = lists:nth((X rem length(Nodes)) + 1, Nodes),
rpc:call(Node, erlang, apply, [Fun, [X]])
end,
lists:map(DistFun, List).
RPC使得具有位置透明性的远程执行变得方便。
网络分区和CAP
处理网络分区并理解CAP定理的权衡。
%% 检测网络分区
detect_partition() ->
ExpectedNodes = [node1@host, node2@host, node3@host],
CurrentNodes = nodes(),
Missing = ExpectedNodes -- CurrentNodes,
case Missing of
[] -> ok;
Nodes -> {partition, Nodes}
end.
%% 分区愈合策略
-module(partition_handler).
-export([monitor_cluster/1]).
monitor_cluster(ExpectedNodes) ->
net_kernel:monitor_nodes(true),
monitor_loop(ExpectedNodes, nodes()).
monitor_loop(Expected, Current) ->
receive
{nodeup, Node} ->
NewCurrent = [Node | Current],
case length(NewCurrent) == length(Expected) of
true ->
io:format("集群完全连接~n"),
heal_partition();
false ->
ok
end,
monitor_loop(Expected, NewCurrent);
{nodedown, Node} ->
NewCurrent = lists:delete(Node, Current),
io:format("检测到分区: ~p~n", [Node]),
monitor_loop(Expected, NewCurrent)
end.
heal_partition() ->
%% 分区愈合后同步状态
global:sync(),
ok.
%% 基于多数票的共识
-module(consensus).
-export([propose/2, vote/3]).
propose(Nodes, Value) ->
Ref = make_ref(),
[Node ! {vote, self(), Ref, Value} || Node <- Nodes],
collect_votes(Ref, length(Nodes), 0).
collect_votes(_Ref, Total, Votes) when Votes > Total div 2 ->
{ok, majority};
collect_votes(_Ref, Total, Total) ->
{error, no_majority};
collect_votes(Ref, Total, Votes) ->
receive
{vote, Ref, accept} ->
collect_votes(Ref, Total, Votes + 1);
{vote, Ref, reject} ->
collect_votes(Ref, Total, Votes)
after 5000 ->
{error, timeout}
end.
vote(From, Ref, Value) ->
Decision = evaluate_proposal(Value),
From ! {vote, Ref, Decision}.
evaluate_proposal(_Value) -> accept.
分区处理策略在网络故障时维持系统可用性。
最佳实践
-
本地集群使用短名称,互联网范围分布使用长名称
-
在可信集群中设置相同cookie以提高安全性
-
监控节点连接以检测和处理网络分区
-
谨慎使用全局注册,因为它增加协调开销
-
实现分区检测和愈合策略以提高弹性
-
设计为最终一致性,在分布式系统中接受CAP限制
-
简单调用使用RPC,但复杂协议更倾向于消息传递
-
使用如toxiproxy或混沌工程工具测试网络故障
-
在分布式调用上实施适当超时以处理慢网络
-
使用分布式监控以在节点间维持容错
常见陷阱
-
未设置cookie阻止节点连接,导致静默失败
-
到处使用全局注册创建单点故障和瓶颈
-
未处理节点断开连接导致进程无限期挂起
-
假设网络可靠性导致在分区期间行为不正确
-
在RPC调用中使用长超时导致故障期间级联延迟
-
未测试网络分区错过关键故障模式
-
分区愈合后忘记同步全局注册
-
在多台机器上使用相同节点名导致冲突
-
未监控节点健康阻止检测集群状态退化
-
依赖严格一致性在分布式设置中违反CAP定理
何时使用此技能
当构建需要高可用性和容错的系统时应用分布式。
使用分布式监控用于需要自动故障转移的关键服务。
利用多个节点实现水平可扩展性,超越单机限制。
当地理分布提供延迟优势时,实施分布式系统。
使用集群化进行跨多台服务器的负载分配。
应用分布式模式构建弹性的微服务架构。