Erlang分布式系统Skill ErlangDistribution

这个技能专注于使用Erlang编程语言构建分布式系统,涵盖节点连接、分布式消息传递、全局名称注册、分布式监控、网络分区处理和容错应用开发。关键词:Erlang、分布式系统、节点连接、容错、BEAM VM、分布式编程、集群、RPC、CAP定理。

架构设计 0 次安装 2 次浏览 更新于 3/25/2026

name: Erlang分布式系统 user-invocable: false description: 用于Erlang分布式系统,包括节点连接性、分布式进程、全局名称注册、分布式监控、网络分区处理,以及在BEAM虚拟机上构建容错多节点应用。 allowed-tools: []

Erlang分布式系统

引言

Erlang内置的分布式功能使得能够在多个节点上构建集群化、容错的系统。不同节点上的进程通过本地使用的相同消息传递原语进行透明通信。这种位置透明性使分布式编程变得自然和直接。

分布式层自动处理网络通信、序列化和节点连接性。节点通过命名发现彼此,进程可以通过注册名称或进程ID引用全局寻址。理解分布式模式对于构建可扩展、弹性的系统至关重要。

本技能涵盖节点连接性和集群化、分布式消息传递、全局名称注册、分布式监控、处理网络分区、RPC模式,以及构建生产级分布式应用。

节点连接性

节点连接形成集群以实现分布式计算和容错。

%% 启动命名节点
%% erl -name node1@hostname -setcookie secret
%% erl -sname node2 -setcookie secret

%% 连接节点
connect_nodes() ->
    Node1 = 'node1@host',
    Node2 = 'node2@host',
    net_kernel:connect_node(Node2).

%% 检查连接节点
list_nodes() ->
    Nodes = [node() | nodes()],
    io:format("连接节点: ~p~n", [Nodes]).

%% 监控节点连接
monitor_nodes() ->
    net_kernel:monitor_nodes(true),
    receive
        {nodeup, Node} ->
            io:format("节点上线: ~p~n", [Node]);
        {nodedown, Node} ->
            io:format("节点下线: ~p~n", [Node])
    end.

%% 节点配置
start_distributed() ->
    {ok, _} = net_kernel:start([mynode, shortnames]),
    erlang:set_cookie(node(), secret_cookie).

%% 隐藏节点(用于监控)
connect_hidden(Node) ->
    net_kernel:connect_node(Node),
    erlang:disconnect_node(Node),
    net_kernel:hidden_connect_node(Node).

%% 获取节点信息
node_info() ->
    #{
        name => node(),
        cookie => erlang:get_cookie(),
        nodes => nodes(),
        alive => is_alive()
    }.

节点连接性使得能够构建具有自动发现的分布式集群。

分布式消息传递

使用与本地消息传递相同的语法向远程节点上的进程发送消息。

%% 发送到远程节点上的注册进程
send_remote(Node, Name, Message) ->
    {Name, Node} ! Message.

%% 在远程节点上生成进程
spawn_on_remote(Node, Fun) ->
    spawn(Node, Fun).

spawn_on_remote(Node, Module, Function, Args) ->
    spawn(Node, Module, Function, Args).

%% 分布式请求-响应
remote_call(Node, Module, Function, Args) ->
    Pid = spawn(Node, fun() ->
        Result = apply(Module, Function, Args),
        receive
            {From, Ref} -> From ! {Ref, Result}
        end
    end),
    Ref = make_ref(),
    Pid ! {self(), Ref},
    receive
        {Ref, Result} -> {ok, Result}
    after 5000 ->
        {error, timeout}
    end.

%% 分布式工作分配
-module(work_dispatcher).
-export([start/0, dispatch/1]).

start() ->
    register(?MODULE, spawn(fun() -> loop([]) end)).

dispatch(Work) ->
    ?MODULE ! {dispatch, Work}.

loop(Workers) ->
    receive
        {dispatch, Work} ->
            Node = select_node(nodes()),
            Pid = spawn(Node, fun() -> do_work(Work) end),
            loop([{Pid, Node} | Workers])
    end.

select_node(Nodes) ->
    lists:nth(rand:uniform(length(Nodes)), Nodes).

do_work(Work) ->
    Result = process_work(Work),
    io:format("工作在 ~p 完成: ~p~n", [node(), Result]).

process_work(Work) -> Work * 2.

%% 远程组领导用于输出
remote_process_with_io(Node) ->
    spawn(Node, fun() ->
        group_leader(self(), self()),
        io:format("从 ~p 输出~n", [node()])
    end).

位置透明消息传递使得无缝分布式通信成为可能。

全局名称注册

在分布式集群中全局注册进程名称。

%% 全局注册
register_global(Name) ->
    Pid = spawn(fun() -> global_loop() end),
    global:register_name(Name, Pid),
    Pid.

global_loop() ->
    receive
        {From, Message} ->
            From ! {reply, Message},
            global_loop();
        stop -> ok
    end.

%% 发送到全局注册进程
send_global(Name, Message) ->
    case global:whereis_name(Name) of
        undefined ->
            {error, not_found};
        Pid ->
            Pid ! Message,
            ok
    end.

%% 带有冲突解决的全局名称
register_with_resolve(Name) ->
    Pid = spawn(fun() -> server_loop() end),
    ResolveFun = fun(Name, Pid1, Pid2) ->
        %% 保留在名称较低节点上的进程
        case node(Pid1) < node(Pid2) of
            true -> Pid1;
            false -> Pid2
        end
    end,
    global:register_name(Name, Pid, ResolveFun).

server_loop() ->
    receive
        Message ->
            io:format("在 ~p 收到: ~p~n", [node(), Message]),
            server_loop()
    end.

%% 全局同步
sync_global() ->
    global:sync().

%% 列出全局注册名称
list_global_names() ->
    global:registered_names().

%% 节点重连后重新注册
ensure_global_registration(Name, Fun) ->
    case global:whereis_name(Name) of
        undefined ->
            Pid = spawn(Fun),
            global:register_name(Name, Pid),
            Pid;
        Pid ->
            Pid
    end.

全局注册使得位置独立的进程发现成为可能。

分布式监控

在多个节点上监控进程,以实现集群范围的容错。

-module(distributed_supervisor).
-behaviour(supervisor).

-export([start_link/0, start_worker/1]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

start_worker(Node) ->
    ChildSpec = #{
        id => make_ref(),
        start => {worker, start_link, [Node]},
        restart => permanent,
        type => worker
    },
    supervisor:start_child(?MODULE, ChildSpec).

init([]) ->
    SupFlags = #{
        strategy => one_for_one,
        intensity => 5,
        period => 60
    },
    {ok, {SupFlags, []}}.

%% 在特定节点上生成的工人模块
-module(worker).
-export([start_link/1, loop/0]).

start_link(Node) ->
    Pid = spawn_link(Node, ?MODULE, loop, []),
    {ok, Pid}.

loop() ->
    receive
        stop -> ok;
        Msg ->
            io:format("工人在 ~p: ~p~n", [node(), Msg]),
            loop()
    end.

%% 分布式进程组
-module(pg_example).
-export([start/0, join/1, broadcast/1]).

start() ->
    pg:start_link().

join(Group) ->
    pg:join(Group, self()).

broadcast(Group, Message) ->
    Members = pg:get_members(Group),
    [Pid ! Message || Pid <- Members].

分布式监控在节点故障时维持系统健康。

RPC和远程执行

使用各种调用模式在远程节点上执行函数调用。

%% 基本RPC
simple_rpc(Node, Module, Function, Args) ->
    rpc:call(Node, Module, Function, Args).

%% 带超时的RPC
timed_rpc(Node, Module, Function, Args, Timeout) ->
    rpc:call(Node, Module, Function, Args, Timeout).

%% 异步RPC
async_rpc(Node, Module, Function, Args) ->
    Key = rpc:async_call(Node, Module, Function, Args),
    %% 稍后获取结果
    rpc:yield(Key).

%% 并行RPC到多个节点
parallel_rpc(Nodes, Module, Function, Args) ->
    rpc:multicall(Nodes, Module, Function, Args).

%% 带结果的并行调用
parallel_rpc_results(Nodes, Module, Function, Args) ->
    rpc:multicall(Nodes, Module, Function, Args, 5000).

%% 投递(发后即忘)
cast_rpc(Node, Module, Function, Args) ->
    rpc:cast(Node, Module, Function, Args).

%% 广播到所有节点
broadcast_rpc(Module, Function, Args) ->
    Nodes = [node() | nodes()],
    rpc:multicall(Nodes, Module, Function, Args).

%% 在节点上并行映射
pmap_nodes(Fun, List) ->
    Nodes = nodes(),
    DistFun = fun(X) ->
        Node = lists:nth((X rem length(Nodes)) + 1, Nodes),
        rpc:call(Node, erlang, apply, [Fun, [X]])
    end,
    lists:map(DistFun, List).

RPC使得具有位置透明性的远程执行变得方便。

网络分区和CAP

处理网络分区并理解CAP定理的权衡。

%% 检测网络分区
detect_partition() ->
    ExpectedNodes = [node1@host, node2@host, node3@host],
    CurrentNodes = nodes(),
    Missing = ExpectedNodes -- CurrentNodes,
    case Missing of
        [] -> ok;
        Nodes -> {partition, Nodes}
    end.

%% 分区愈合策略
-module(partition_handler).
-export([monitor_cluster/1]).

monitor_cluster(ExpectedNodes) ->
    net_kernel:monitor_nodes(true),
    monitor_loop(ExpectedNodes, nodes()).

monitor_loop(Expected, Current) ->
    receive
        {nodeup, Node} ->
            NewCurrent = [Node | Current],
            case length(NewCurrent) == length(Expected) of
                true ->
                    io:format("集群完全连接~n"),
                    heal_partition();
                false ->
                    ok
            end,
            monitor_loop(Expected, NewCurrent);

        {nodedown, Node} ->
            NewCurrent = lists:delete(Node, Current),
            io:format("检测到分区: ~p~n", [Node]),
            monitor_loop(Expected, NewCurrent)
    end.

heal_partition() ->
    %% 分区愈合后同步状态
    global:sync(),
    ok.

%% 基于多数票的共识
-module(consensus).
-export([propose/2, vote/3]).

propose(Nodes, Value) ->
    Ref = make_ref(),
    [Node ! {vote, self(), Ref, Value} || Node <- Nodes],
    collect_votes(Ref, length(Nodes), 0).

collect_votes(_Ref, Total, Votes) when Votes > Total div 2 ->
    {ok, majority};
collect_votes(_Ref, Total, Total) ->
    {error, no_majority};
collect_votes(Ref, Total, Votes) ->
    receive
        {vote, Ref, accept} ->
            collect_votes(Ref, Total, Votes + 1);
        {vote, Ref, reject} ->
            collect_votes(Ref, Total, Votes)
    after 5000 ->
        {error, timeout}
    end.

vote(From, Ref, Value) ->
    Decision = evaluate_proposal(Value),
    From ! {vote, Ref, Decision}.

evaluate_proposal(_Value) -> accept.

分区处理策略在网络故障时维持系统可用性。

最佳实践

  1. 本地集群使用短名称,互联网范围分布使用长名称

  2. 在可信集群中设置相同cookie以提高安全性

  3. 监控节点连接以检测和处理网络分区

  4. 谨慎使用全局注册,因为它增加协调开销

  5. 实现分区检测和愈合策略以提高弹性

  6. 设计为最终一致性,在分布式系统中接受CAP限制

  7. 简单调用使用RPC,但复杂协议更倾向于消息传递

  8. 使用如toxiproxy或混沌工程工具测试网络故障

  9. 在分布式调用上实施适当超时以处理慢网络

  10. 使用分布式监控以在节点间维持容错

常见陷阱

  1. 未设置cookie阻止节点连接,导致静默失败

  2. 到处使用全局注册创建单点故障和瓶颈

  3. 未处理节点断开连接导致进程无限期挂起

  4. 假设网络可靠性导致在分区期间行为不正确

  5. 在RPC调用中使用长超时导致故障期间级联延迟

  6. 未测试网络分区错过关键故障模式

  7. 分区愈合后忘记同步全局注册

  8. 在多台机器上使用相同节点名导致冲突

  9. 未监控节点健康阻止检测集群状态退化

  10. 依赖严格一致性在分布式设置中违反CAP定理

何时使用此技能

当构建需要高可用性和容错的系统时应用分布式。

使用分布式监控用于需要自动故障转移的关键服务。

利用多个节点实现水平可扩展性,超越单机限制。

当地理分布提供延迟优势时,实施分布式系统。

使用集群化进行跨多台服务器的负载分配。

应用分布式模式构建弹性的微服务架构。

资源