Erlang并发编程Skill ErlangConcurrency

Erlang并发编程是一种编程技能,专注于使用Erlang语言的轻量级进程、消息传递、进程链接和监视器等特性,以构建高并发、可扩展和故障容忍的系统。适用于分布式系统、实时应用和大规模并发处理场景。关键词:Erlang、并发编程、轻量级进程、消息传递、BEAM虚拟机、故障容忍、分布式架构、并发模型。

后端开发 0 次安装 2 次浏览 更新于 3/25/2026

name: Erlang Concurrency user-invocable: false description: 使用Erlang的并发模型,包括轻量级进程、消息传递、进程链接和监视器、错误处理模式、选择性接收以及在BEAM虚拟机上构建大规模并发系统。 allowed-tools: []

Erlang 并发编程

介绍

Erlang的并发模型基于轻量级进程和消息传递,可用于构建大规模可扩展的系统。进程之间相互隔离,无共享内存,通过消息进行异步通信。这种模型消除了共享内存系统中常见的并发错误。

BEAM虚拟机高效地调度数百万个进程,每个进程都有其自己的堆和邮箱。进程创建快速且成本低廉,支持“每个实体一个进程”的设计。链接和监视器提供故障检测,而选择性接收则支持灵活的消息处理模式。

本技能涵盖进程创建和生成、消息传递模式、进程链接和监视器、选择性接收、错误传播、并发设计模式以及构建可扩展的并发系统。

进程创建和生成

创建轻量级进程以执行并发任务。

%% 基本进程生成
simple_spawn() ->
    Pid = spawn(fun() ->
        io:format("Hello from process ~p~n", [self()])
    end),
    Pid.

%% 带参数的生成
spawn_with_args(Message) ->
    spawn(fun() ->
        io:format("Message: ~p~n", [Message])
    end).

%% 生成并注册
spawn_registered() ->
    Pid = spawn(fun() -> loop() end),
    register(my_process, Pid),
    Pid.

loop() ->
    receive
        stop -> ok;
        Msg ->
            io:format("Received: ~p~n", [Msg]),
            loop()
    end.

%% 生成链接(链接进程)
spawn_linked() ->
    spawn_link(fun() ->
        timer:sleep(1000),
        io:format("Linked process done~n")
    end).

%% 生成监视器
spawn_monitored() ->
    {Pid, Ref} = spawn_monitor(fun() ->
        timer:sleep(500),
        exit(normal)
    end),
    {Pid, Ref}.

%% 进程池
create_pool(N) ->
    [spawn(fun() -> worker_loop() end) || _ <- lists:seq(1, N)].

worker_loop() ->
    receive
        {work, Data, From} ->
            Result = process_data(Data),
            From ! {result, Result},
            worker_loop();
        stop ->
            ok
    end.

process_data(Data) -> Data * 2.

%% 并行映射
pmap(F, List) ->
    Parent = self(),
    Pids = [spawn(fun() ->
        Parent ! {self(), F(X)}
    end) || X <- List],
    [receive {Pid, Result} -> Result end || Pid <- Pids].


%% 分叉-合并模式
fork_join(Tasks) ->
    Self = self(),
    Pids = [spawn(fun() ->
        Result = Task(),
        Self ! {self(), Result}
    end) || Task <- Tasks],
    [receive {Pid, Result} -> Result end || Pid <- Pids].

轻量级进程支持大规模并发,开销最小。

消息传递模式

进程通过无共享内存的异步消息传递进行通信。

%% 发送和接收
send_message() ->
    Pid = spawn(fun() ->
        receive
            {From, Msg} ->
                io:format("Received: ~p~n", [Msg]),
                From ! {reply, "Acknowledged"}
        end
    end),
    Pid ! {self(), "Hello"},
    receive
        {reply, Response} ->
            io:format("Response: ~p~n", [Response])
    after 5000 ->
        io:format("Timeout~n")
    end.

%% 请求-响应模式
request(Pid, Request) ->
    Ref = make_ref(),
    Pid ! {self(), Ref, Request},
    receive
        {Ref, Response} -> {ok, Response}
    after 5000 ->
        {error, timeout}
    end.

server_loop() ->
    receive
        {From, Ref, {add, A, B}} ->
            From ! {Ref, A + B},
            server_loop();
        {From, Ref, {multiply, A, B}} ->
            From ! {Ref, A * B},
            server_loop();
        stop -> ok
    end.

%% 发布-订阅
start_pubsub() ->
    spawn(fun() -> pubsub_loop([]) end).

pubsub_loop(Subscribers) ->
    receive
        {subscribe, Pid} ->
            pubsub_loop([Pid | Subscribers]);
        {unsubscribe, Pid} ->
            pubsub_loop(lists:delete(Pid, Subscribers));
        {publish, Message} ->
            [Pid ! {message, Message} || Pid <- Subscribers],
            pubsub_loop(Subscribers)
    end.

%% 管道模式
pipeline(Data, Functions) ->
    lists:foldl(fun(F, Acc) -> F(Acc) end, Data, Functions).

concurrent_pipeline(Data, Stages) ->
    Self = self(),
    lists:foldl(fun(Stage, AccData) ->
        Pid = spawn(fun() ->
            Result = Stage(AccData),
            Self ! {result, Result}
        end),
        receive {result, R} -> R end
    end, Data, Stages).

消息传递支持安全的并发通信,无需锁。

链接和监视器

链接双向连接进程,而监视器提供单向观察。

%% 进程链接
link_example() ->
    process_flag(trap_exit, true),
    Pid = spawn_link(fun() ->
        timer:sleep(1000),
        exit(normal)
    end),
    receive
        {'EXIT', Pid, Reason} ->
            io:format("Process exited: ~p~n", [Reason])
    end.

%% 监视
monitor_example() ->
    Pid = spawn(fun() ->
        timer:sleep(500),
        exit(normal)
    end),
    Ref = monitor(process, Pid),
    receive
        {'DOWN', Ref, process, Pid, Reason} ->
            io:format("Process down: ~p~n", [Reason])
    end.

%% 监督者模式
supervisor() ->
    process_flag(trap_exit, true),
    Worker = spawn_link(fun() -> worker() end),
    supervisor_loop(Worker).

supervisor_loop(Worker) ->
    receive
        {'EXIT', Worker, _Reason} ->
            NewWorker = spawn_link(fun() -> worker() end),
            supervisor_loop(NewWorker)
    end.

worker() ->
    receive
        crash -> exit(crashed);
        work -> worker()
    end.

链接和监视器支持构建具有自动故障检测的容错系统。

最佳实践

  1. 自由创建进程,因为它们轻量级且生成成本低

  2. 专门使用消息传递进行进程间通信,无需共享状态

  3. 实现适当的超时在接收中,以防止无限阻塞

  4. 当不需要双向链接时使用监视器进行单向观察

  5. 保持进程状态最小化以减少每个进程的内存使用

  6. 谨慎使用注册名称,因为全局名称限制了可扩展性

  7. 实现适当的错误处理,使用链接和监视器实现容错

  8. 使用选择性接收处理特定消息,同时将其他消息留在队列中

  9. 避免消息累积通过在接收子句中处理所有消息模式

  10. 分析并发系统以识别瓶颈并优化热点路径

常见陷阱

  1. 创建进程过少,未能充分利用Erlang的并发模型

  2. 未在接收中使用超时,导致故障时无限阻塞

  3. 积累消息在邮箱中,导致内存泄漏和性能下降

  4. 使用共享ETS表作为互斥锁替代,破坏了隔离优势

  5. 未处理所有消息类型,导致未匹配消息的邮箱溢出

  6. 忘记在监督者中捕获退出,阻止了适当的错误处理

  7. 创建循环链接,导致没有适当监督的级联故障

  8. 使用进程进行细粒度并行,增加开销而无益

  9. 未监视生成的进程,丢失了对故障的跟踪

  10. 过度使用注册名称,创建单点故障和竞争

何时使用此技能

将进程用于需要隔离和独立状态的并发任务。

在分布式系统中使用消息传递进行所有进程间通信。

利用链接和监视器构建容错的监督层次结构。

创建进程池以进行并发请求处理和平行计算。

使用选择性接收处理复杂的消息处理协议。

资源