name: Erlang Concurrency user-invocable: false description: 使用Erlang的并发模型,包括轻量级进程、消息传递、进程链接和监视器、错误处理模式、选择性接收以及在BEAM虚拟机上构建大规模并发系统。 allowed-tools: []
Erlang 并发编程
介绍
Erlang的并发模型基于轻量级进程和消息传递,可用于构建大规模可扩展的系统。进程之间相互隔离,无共享内存,通过消息进行异步通信。这种模型消除了共享内存系统中常见的并发错误。
BEAM虚拟机高效地调度数百万个进程,每个进程都有其自己的堆和邮箱。进程创建快速且成本低廉,支持“每个实体一个进程”的设计。链接和监视器提供故障检测,而选择性接收则支持灵活的消息处理模式。
本技能涵盖进程创建和生成、消息传递模式、进程链接和监视器、选择性接收、错误传播、并发设计模式以及构建可扩展的并发系统。
进程创建和生成
创建轻量级进程以执行并发任务。
%% 基本进程生成
simple_spawn() ->
Pid = spawn(fun() ->
io:format("Hello from process ~p~n", [self()])
end),
Pid.
%% 带参数的生成
spawn_with_args(Message) ->
spawn(fun() ->
io:format("Message: ~p~n", [Message])
end).
%% 生成并注册
spawn_registered() ->
Pid = spawn(fun() -> loop() end),
register(my_process, Pid),
Pid.
loop() ->
receive
stop -> ok;
Msg ->
io:format("Received: ~p~n", [Msg]),
loop()
end.
%% 生成链接(链接进程)
spawn_linked() ->
spawn_link(fun() ->
timer:sleep(1000),
io:format("Linked process done~n")
end).
%% 生成监视器
spawn_monitored() ->
{Pid, Ref} = spawn_monitor(fun() ->
timer:sleep(500),
exit(normal)
end),
{Pid, Ref}.
%% 进程池
create_pool(N) ->
[spawn(fun() -> worker_loop() end) || _ <- lists:seq(1, N)].
worker_loop() ->
receive
{work, Data, From} ->
Result = process_data(Data),
From ! {result, Result},
worker_loop();
stop ->
ok
end.
process_data(Data) -> Data * 2.
%% 并行映射
pmap(F, List) ->
Parent = self(),
Pids = [spawn(fun() ->
Parent ! {self(), F(X)}
end) || X <- List],
[receive {Pid, Result} -> Result end || Pid <- Pids].
%% 分叉-合并模式
fork_join(Tasks) ->
Self = self(),
Pids = [spawn(fun() ->
Result = Task(),
Self ! {self(), Result}
end) || Task <- Tasks],
[receive {Pid, Result} -> Result end || Pid <- Pids].
轻量级进程支持大规模并发,开销最小。
消息传递模式
进程通过无共享内存的异步消息传递进行通信。
%% 发送和接收
send_message() ->
Pid = spawn(fun() ->
receive
{From, Msg} ->
io:format("Received: ~p~n", [Msg]),
From ! {reply, "Acknowledged"}
end
end),
Pid ! {self(), "Hello"},
receive
{reply, Response} ->
io:format("Response: ~p~n", [Response])
after 5000 ->
io:format("Timeout~n")
end.
%% 请求-响应模式
request(Pid, Request) ->
Ref = make_ref(),
Pid ! {self(), Ref, Request},
receive
{Ref, Response} -> {ok, Response}
after 5000 ->
{error, timeout}
end.
server_loop() ->
receive
{From, Ref, {add, A, B}} ->
From ! {Ref, A + B},
server_loop();
{From, Ref, {multiply, A, B}} ->
From ! {Ref, A * B},
server_loop();
stop -> ok
end.
%% 发布-订阅
start_pubsub() ->
spawn(fun() -> pubsub_loop([]) end).
pubsub_loop(Subscribers) ->
receive
{subscribe, Pid} ->
pubsub_loop([Pid | Subscribers]);
{unsubscribe, Pid} ->
pubsub_loop(lists:delete(Pid, Subscribers));
{publish, Message} ->
[Pid ! {message, Message} || Pid <- Subscribers],
pubsub_loop(Subscribers)
end.
%% 管道模式
pipeline(Data, Functions) ->
lists:foldl(fun(F, Acc) -> F(Acc) end, Data, Functions).
concurrent_pipeline(Data, Stages) ->
Self = self(),
lists:foldl(fun(Stage, AccData) ->
Pid = spawn(fun() ->
Result = Stage(AccData),
Self ! {result, Result}
end),
receive {result, R} -> R end
end, Data, Stages).
消息传递支持安全的并发通信,无需锁。
链接和监视器
链接双向连接进程,而监视器提供单向观察。
%% 进程链接
link_example() ->
process_flag(trap_exit, true),
Pid = spawn_link(fun() ->
timer:sleep(1000),
exit(normal)
end),
receive
{'EXIT', Pid, Reason} ->
io:format("Process exited: ~p~n", [Reason])
end.
%% 监视
monitor_example() ->
Pid = spawn(fun() ->
timer:sleep(500),
exit(normal)
end),
Ref = monitor(process, Pid),
receive
{'DOWN', Ref, process, Pid, Reason} ->
io:format("Process down: ~p~n", [Reason])
end.
%% 监督者模式
supervisor() ->
process_flag(trap_exit, true),
Worker = spawn_link(fun() -> worker() end),
supervisor_loop(Worker).
supervisor_loop(Worker) ->
receive
{'EXIT', Worker, _Reason} ->
NewWorker = spawn_link(fun() -> worker() end),
supervisor_loop(NewWorker)
end.
worker() ->
receive
crash -> exit(crashed);
work -> worker()
end.
链接和监视器支持构建具有自动故障检测的容错系统。
最佳实践
-
自由创建进程,因为它们轻量级且生成成本低
-
专门使用消息传递进行进程间通信,无需共享状态
-
实现适当的超时在接收中,以防止无限阻塞
-
当不需要双向链接时使用监视器进行单向观察
-
保持进程状态最小化以减少每个进程的内存使用
-
谨慎使用注册名称,因为全局名称限制了可扩展性
-
实现适当的错误处理,使用链接和监视器实现容错
-
使用选择性接收处理特定消息,同时将其他消息留在队列中
-
避免消息累积通过在接收子句中处理所有消息模式
-
分析并发系统以识别瓶颈并优化热点路径
常见陷阱
-
创建进程过少,未能充分利用Erlang的并发模型
-
未在接收中使用超时,导致故障时无限阻塞
-
积累消息在邮箱中,导致内存泄漏和性能下降
-
使用共享ETS表作为互斥锁替代,破坏了隔离优势
-
未处理所有消息类型,导致未匹配消息的邮箱溢出
-
忘记在监督者中捕获退出,阻止了适当的错误处理
-
创建循环链接,导致没有适当监督的级联故障
-
使用进程进行细粒度并行,增加开销而无益
-
未监视生成的进程,丢失了对故障的跟踪
-
过度使用注册名称,创建单点故障和竞争
何时使用此技能
将进程用于需要隔离和独立状态的并发任务。
在分布式系统中使用消息传递进行所有进程间通信。
利用链接和监视器构建容错的监督层次结构。
创建进程池以进行并发请求处理和平行计算。
使用选择性接收处理复杂的消息处理协议。