名称: Erlang OTP 行为模式 用户不可调用: false 描述: 使用OTP行为包括gen_server用于有状态进程、gen_statem用于状态机、supervisors用于容错、gen_event用于事件处理,以及使用经过验证的模式构建健壮、生产就绪的Erlang应用程序。 允许工具: []
Erlang OTP 行为模式
简介
OTP(开放电信平台)行为为Erlang系统中的常见进程类型提供可重用模式。这些抽象处理复杂的细节,如消息传递、错误处理和状态管理,允许开发人员专注于业务逻辑,同时保持系统可靠性。
行为定义进程必须实现的接口,OTP处理基础设施。Gen_server提供客户端-服务器进程,gen_statem实现状态机,supervisors管理进程生命周期,gen_event协调事件分发。理解这些模式对于生产Erlang系统至关重要。
本技能涵盖gen_server用于有状态进程、gen_statem用于复杂状态机、supervisor树用于容错、gen_event用于事件处理、应用行为用于打包,以及构建健壮OTP系统的模式。
Gen_Server 基础
Gen_server通过同步和异步通信实现客户端-服务器进程。
%% @doc Locally registered gen_server that keeps a single integer counter.
%% Increment/decrement are fire-and-forget casts; reading and resetting
%% the counter are synchronous calls.
-module(counter_server).
-behaviour(gen_server).

%% Public API
-export([start_link/0, increment/0, decrement/0, get_value/0, reset/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).

%% Server state: the current counter value.
-record(state, {count = 0}).

%%%===================================================================
%%% Public API
%%%===================================================================

%% Starts the server and registers it locally under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% Asynchronously adds one to the counter.
-spec increment() -> ok.
increment() ->
    gen_server:cast(?SERVER, increment).

%% Asynchronously subtracts one from the counter.
-spec decrement() -> ok.
decrement() ->
    gen_server:cast(?SERVER, decrement).

%% Synchronously reads the current counter value.
-spec get_value() -> integer().
get_value() ->
    gen_server:call(?SERVER, get_value).

%% Synchronously sets the counter back to zero.
-spec reset() -> ok.
reset() ->
    gen_server:call(?SERVER, reset).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% The counter starts at the record default (0).
init([]) ->
    {ok, #state{}}.

%% Synchronous requests: every clause replies to the caller.
handle_call(get_value, _From, #state{count = Current} = State) ->
    {reply, Current, State};
handle_call(reset, _From, State) ->
    {reply, ok, State#state{count = 0}};
handle_call(_Request, _From, State) ->
    %% Unknown requests are answered with `ignored' rather than crashing.
    {reply, ignored, State}.

%% Asynchronous requests: no reply is sent.
handle_cast(increment, State) ->
    {noreply, shift(State, 1)};
handle_cast(decrement, State) ->
    {noreply, shift(State, -1)};
handle_cast(_Msg, State) ->
    {noreply, State}.

%% Any other message is dropped without changing the state.
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Returns the state with Delta added to the counter.
shift(#state{count = Current} = State, Delta) ->
    State#state{count = Current + Delta}.
%%%===================================================================
%%% Complex gen_server example: Cache
%%%===================================================================
%% @doc Locally registered gen_server implementing a bounded key/value
%% cache backed by a map, with hit/miss counters that are printed when
%% the server terminates.
-module(cache_server).
-behaviour(gen_server).

%% Public API
-export([start_link/1, put/2, get/1, delete/1, clear/0, size/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {
    cache = #{},        % stored entries
    max_size = 1000,    % maximum number of distinct keys
    hits = 0,           % successful lookups
    misses = 0          % failed lookups
}).

%%%===================================================================
%%% Public API
%%%===================================================================

%% Starts the cache with room for MaxSize distinct keys.
-spec start_link(non_neg_integer()) -> {ok, pid()} | ignore | {error, term()}.
start_link(MaxSize) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [MaxSize], []).

%% Stores Value under Key. Returns `{error, cache_full}' only when Key is
%% new and the cache already holds the maximum number of keys.
-spec put(term(), term()) -> ok | {error, cache_full}.
put(Key, Value) ->
    gen_server:call(?MODULE, {put, Key, Value}).

%% Looks up Key, updating the hit/miss counters.
-spec get(term()) -> {ok, term()} | not_found.
get(Key) ->
    gen_server:call(?MODULE, {get, Key}).

%% Asynchronously removes Key (no-op if it is absent).
-spec delete(term()) -> ok.
delete(Key) ->
    gen_server:cast(?MODULE, {delete, Key}).

%% Asynchronously removes every entry; the counters are kept.
-spec clear() -> ok.
clear() ->
    gen_server:cast(?MODULE, clear).

%% Returns the number of stored keys.
-spec size() -> non_neg_integer().
size() ->
    gen_server:call(?MODULE, size).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

init([MaxSize]) ->
    %% Trap exits so terminate/2 runs (and prints the stats) when the
    %% supervisor shuts this process down.
    process_flag(trap_exit, true),
    {ok, #state{max_size = MaxSize}}.

handle_call({put, Key, Value}, _From, #state{cache = Cache, max_size = MaxSize} = State) ->
    %% Overwriting an existing key does not grow the map, so only inserts
    %% of new keys are rejected when the cache is at capacity.
    case maps:is_key(Key, Cache) orelse maps:size(Cache) < MaxSize of
        true ->
            {reply, ok, State#state{cache = Cache#{Key => Value}}};
        false ->
            {reply, {error, cache_full}, State}
    end;
handle_call({get, Key}, _From, #state{cache = Cache} = State) ->
    case maps:find(Key, Cache) of
        {ok, Value} ->
            {reply, {ok, Value}, State#state{hits = State#state.hits + 1}};
        error ->
            {reply, not_found, State#state{misses = State#state.misses + 1}}
    end;
handle_call(size, _From, State) ->
    {reply, maps:size(State#state.cache), State};
handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.

handle_cast({delete, Key}, State) ->
    {noreply, State#state{cache = maps:remove(Key, State#state.cache)}};
handle_cast(clear, State) ->
    {noreply, State#state{cache = #{}}};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

%% Prints the termination reason and the accumulated hit/miss counters.
terminate(Reason, State) ->
    io:format("Cache terminating: ~p~n", [Reason]),
    io:format("Stats - Hits: ~p, Misses: ~p~n", [State#state.hits, State#state.misses]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%%%===================================================================
%%% gen_server with timeouts
%%%===================================================================
%% @doc Locally registered gen_server modelling a session that stops
%% itself after a period without any incoming message. Every callback
%% return re-arms the gen_server idle timeout.
-module(session_server).
-behaviour(gen_server).

%% Public API
-export([start_link/0, touch/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(TIMEOUT, 30000). % idle timeout in milliseconds (30 seconds)

-record(state, {
    last_activity,  % timestamp of the last touch, in milliseconds
    data = #{}
}).

%% Starts the session process, registered under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Asynchronously records activity on the session.
-spec touch() -> ok.
touch() ->
    gen_server:cast(?MODULE, touch).

init([]) ->
    {ok, stamp(#state{}), ?TIMEOUT}.

%% Every call is answered with `ok' and restarts the idle timer.
handle_call(_Request, _From, State) ->
    {reply, ok, State, ?TIMEOUT}.

handle_cast(touch, State) ->
    {noreply, stamp(State), ?TIMEOUT};
handle_cast(_Msg, State) ->
    {noreply, State, ?TIMEOUT}.

%% `timeout' is delivered by gen_server when no message arrived within
%% ?TIMEOUT milliseconds of the previous callback return.
handle_info(timeout, State) ->
    io:format("Session timed out~n"),
    {stop, normal, State};
handle_info(_Info, State) ->
    {noreply, State, ?TIMEOUT}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Returns State with last_activity set to the current system time.
stamp(State) ->
    State#state{last_activity = erlang:system_time(millisecond)}.
Gen_server为有状态进程提供客户端-服务器模式的结构。
Gen_Statem 用于状态机
Gen_statem通过显式状态转换实现有限状态机。
%% @doc gen_statem modelling a door with three states:
%% `locked' -> `unlocked' -> `open'. All API functions are synchronous
%% calls; each state function replies to the caller via a
%% `{reply, From, Reply}' action.
-module(door_fsm).
-behaviour(gen_statem).

%% Public API
-export([start_link/0, open/0, close/0, lock/0, unlock/1]).

%% gen_statem callbacks
-export([init/1, callback_mode/0, terminate/3, code_change/4]).

%% State functions
-export([locked/3, unlocked/3, open/3]).

-define(CODE, "1234").

%%%===================================================================
%%% Public API
%%%===================================================================

-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Opens the door; only succeeds from the `unlocked' state.
-spec open() -> ok | {error, door_locked | door_open}.
open() ->
    gen_statem:call(?MODULE, open).

%% Closes an open door (a no-op `ok' when already closed and unlocked).
-spec close() -> ok | {error, door_locked}.
close() ->
    gen_statem:call(?MODULE, close).

%% Locks the door; only succeeds from the `unlocked' state.
-spec lock() -> ok | {error, door_locked | door_open}.
lock() ->
    gen_statem:call(?MODULE, lock).

%% Attempts to unlock a locked door with Code.
-spec unlock(string()) -> ok | {error, wrong_code | door_open}.
unlock(Code) ->
    gen_statem:call(?MODULE, {unlock, Code}).

%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================

%% The door starts out locked; no data is carried between states.
init([]) ->
    {ok, locked, #{}}.

callback_mode() ->
    state_functions.

%% NOTE: gen_statem:call/2 events arrive with event type {call, From}
%% (not the bare atom `call'), and a reply action must name the caller:
%% {reply, From, Reply}.

%% Locked state: only the correct code gets out of it.
locked({call, From}, {unlock, Code}, Data) when Code =:= ?CODE ->
    {next_state, unlocked, Data, [{reply, From, ok}]};
locked({call, From}, {unlock, _WrongCode}, Data) ->
    {keep_state, Data, [{reply, From, {error, wrong_code}}]};
locked({call, From}, _Event, Data) ->
    {keep_state, Data, [{reply, From, {error, door_locked}}]}.

%% Unlocked state: the door can be locked again or opened.
unlocked({call, From}, lock, Data) ->
    {next_state, locked, Data, [{reply, From, ok}]};
unlocked({call, From}, open, Data) ->
    {next_state, open, Data, [{reply, From, ok}]};
unlocked({call, From}, _Event, Data) ->
    %% close / unlock are already satisfied in this state.
    {keep_state, Data, [{reply, From, ok}]}.

%% Open state: the door must be closed before anything else.
open({call, From}, close, Data) ->
    {next_state, unlocked, Data, [{reply, From, ok}]};
open({call, From}, _Event, Data) ->
    {keep_state, Data, [{reply, From, {error, door_open}}]}.

terminate(_Reason, _State, _Data) ->
    ok.

code_change(_OldVsn, State, Data, _Extra) ->
    {ok, State, Data}.
%%%===================================================================
%%% Connection state machine
%%%===================================================================
%% @doc gen_statem managing a connection with three states:
%% `disconnected', `connecting' (automatic retries) and `connected'.
%% Uses state functions with state enter calls. All API functions are
%% synchronous calls that are always answered, whatever the state.
-module(connection_fsm).
-behaviour(gen_statem).

%% Public API
-export([start_link/0, connect/0, disconnect/0, send/1]).

%% gen_statem callbacks
-export([init/1, callback_mode/0, terminate/3, code_change/4]).

%% State functions
-export([disconnected/3, connecting/3, connected/3]).

-define(MAX_RETRIES, 3).       % failed attempts before giving up
-define(RETRY_INTERVAL, 1000). % milliseconds between automatic retries

-record(data, {
    socket = undefined,
    buffer = <<>>,
    retry_count = 0
}).

%%%===================================================================
%%% Public API
%%%===================================================================

-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Tries to connect. `{error, retrying}' means the first attempt failed
%% and the FSM keeps retrying in the background.
-spec connect() -> ok | {error, retrying | max_retries | connecting | already_connected}.
connect() ->
    gen_statem:call(?MODULE, connect).

-spec disconnect() -> ok | {error, not_connected | connecting}.
disconnect() ->
    gen_statem:call(?MODULE, disconnect).

-spec send(term()) -> ok | {error, send_failed | not_connected | connecting}.
send(Data) ->
    gen_statem:call(?MODULE, {send, Data}).

%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================

init([]) ->
    {ok, disconnected, #data{}}.

callback_mode() ->
    [state_functions, state_enter].

%% NOTE: gen_statem:call/2 events arrive with event type {call, From}
%% (not the bare atom `call'), and a reply action must name the caller:
%% {reply, From, Reply}.

%% Disconnected state
disconnected(enter, _OldState, _Data) ->
    io:format("Entered disconnected state~n"),
    keep_state_and_data;
disconnected({call, From}, connect, Data) ->
    case connect_to_server() of
        {ok, Socket} ->
            {next_state, connected, Data#data{socket = Socket, retry_count = 0},
             [{reply, From, ok}]};
        error ->
            NewData = count_failure(Data),
            case may_retry(NewData) of
                true ->
                    {next_state, connecting, NewData, [{reply, From, {error, retrying}}]};
                false ->
                    {keep_state, NewData, [{reply, From, {error, max_retries}}]}
            end
    end;
disconnected({call, From}, _Request, _Data) ->
    %% send/disconnect make no sense here; answer instead of crashing.
    {keep_state_and_data, [{reply, From, {error, not_connected}}]}.

%% Connecting state
connecting(enter, _OldState, _Data) ->
    %% A state timeout is cancelled automatically when the state changes,
    %% so no stale retry message can arrive in another state.
    {keep_state_and_data, [{state_timeout, ?RETRY_INTERVAL, retry_connect}]};
connecting(state_timeout, retry_connect, Data) ->
    case connect_to_server() of
        {ok, Socket} ->
            {next_state, connected, Data#data{socket = Socket, retry_count = 0}};
        error ->
            NewData = count_failure(Data),
            case may_retry(NewData) of
                true ->
                    %% keep_state does not re-run the enter call, so the
                    %% timer has to be re-armed explicitly for the next try.
                    {keep_state, NewData,
                     [{state_timeout, ?RETRY_INTERVAL, retry_connect}]};
                false ->
                    {next_state, disconnected, NewData}
            end
    end;
connecting({call, From}, _Request, _Data) ->
    {keep_state_and_data, [{reply, From, {error, connecting}}]}.

%% Connected state
connected(enter, _OldState, _Data) ->
    io:format("Connection established~n"),
    keep_state_and_data;
connected({call, From}, {send, Payload}, #data{socket = Socket} = StateData) ->
    case send_data(Socket, Payload) of
        ok ->
            {keep_state_and_data, [{reply, From, ok}]};
        error ->
            %% Release the failed socket so it is neither leaked nor
            %% closed a second time in terminate/3.
            close_connection(Socket),
            {next_state, disconnected, StateData#data{socket = undefined},
             [{reply, From, {error, send_failed}}]}
    end;
connected({call, From}, disconnect, #data{socket = Socket} = StateData) ->
    close_connection(Socket),
    {next_state, disconnected, StateData#data{socket = undefined}, [{reply, From, ok}]};
connected({call, From}, _Request, _StateData) ->
    {keep_state_and_data, [{reply, From, {error, already_connected}}]}.

%% Closes the socket, if one is still held, when the FSM stops.
terminate(_Reason, _State, #data{socket = undefined}) ->
    ok;
terminate(_Reason, _State, #data{socket = Socket}) ->
    close_connection(Socket).

code_change(_OldVsn, State, Data, _Extra) ->
    {ok, State, Data}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Records one more failed connection attempt.
count_failure(#data{retry_count = Count} = Data) ->
    Data#data{retry_count = Count + 1}.

%% True while the number of failed attempts is below the limit.
may_retry(#data{retry_count = Count}) ->
    Count < ?MAX_RETRIES.

%% Placeholder transport functions for the example; a real implementation
%% would return `{ok, Socket} | error' and `ok | error' respectively.
connect_to_server() ->
    {ok, socket}.

send_data(_Socket, _Data) ->
    ok.

close_connection(_Socket) ->
    ok.
Gen_statem提供具有显式转换的结构化状态机实现。
Supervisor 树
Supervisors监控子进程并在失败时重启它们,以实现容错。
%% @doc Top-level supervisor for the example servers. Supervises
%% counter_server and cache_server as permanent workers with a
%% one_for_one strategy.
-module(my_supervisor).
-behaviour(supervisor).

%% Public API
-export([start_link/0]).

%% supervisor callback
-export([init/1]).

%% Starts the supervisor, registered locally under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Allows at most 5 restarts within 60 seconds before giving up.
init([]) ->
    Flags = #{strategy => one_for_one, intensity => 5, period => 60},
    Children = [
        permanent_worker(counter_server, []),
        permanent_worker(cache_server, [1000])
    ],
    {ok, {Flags, Children}}.

%% Child spec for a permanent worker started via Module:start_link/N
%% and given 5000 ms to shut down.
permanent_worker(Module, StartArgs) ->
    #{
        id => Module,
        start => {Module, start_link, StartArgs},
        restart => permanent,
        shutdown => 5000,
        type => worker,
        modules => [Module]
    }.
%%%===================================================================
%%% Supervisor strategies
%%%===================================================================
%% one_for_one: when a child fails, only that child is restarted.
init_one_for_one([]) ->
    Workers = [worker_spec(Name) || Name <- [worker1, worker2]],
    {ok, {#{strategy => one_for_one}, Workers}}.
%% one_for_all: when any child fails, every child is restarted.
init_one_for_all([]) ->
    Workers = [worker_spec(Name) || Name <- [worker1, worker2]],
    {ok, {#{strategy => one_for_all}, Workers}}.
%% rest_for_one: when a child fails, it and every child started after it
%% are restarted, so children are listed in dependency order.
init_rest_for_one([]) ->
    %% cache depends on database; api depends on cache.
    StartOrder = [database, cache, api],
    Workers = lists:map(fun worker_spec/1, StartOrder),
    {ok, {#{strategy => rest_for_one}, Workers}}.
%% Builds the child spec for a permanent worker whose module is Name,
%% started with Name:start_link() and given 5000 ms to shut down.
worker_spec(Name) ->
    StartMFA = {Name, start_link, []},
    #{id => Name, start => StartMFA, restart => permanent, shutdown => 5000, type => worker}.
%%%===================================================================
%%% Nested supervisors (supervision tree)
%%%===================================================================
%% @doc Root of a supervision tree: supervises three subsystem
%% supervisors (database, API and workers) with a one_for_one strategy.
-module(app_supervisor).
-behaviour(supervisor).

-export([start_link/0, init/1]).

%% Starts the root supervisor, registered locally under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    Subsystems = [
        {database_sup, database_supervisor},
        {api_sup, api_supervisor},
        {worker_sup, worker_supervisor}
    ],
    ChildSpecs = [supervisor_spec(Id, Module) || {Id, Module} <- Subsystems],
    {ok, {#{strategy => one_for_one}, ChildSpecs}}.

%% Child spec for a permanent child supervisor started via
%% Module:start_link().
supervisor_spec(Id, Module) ->
    #{
        id => Id,
        start => {Module, start_link, []},
        restart => permanent,
        type => supervisor
    }.
%%%===================================================================
%%% Dynamic supervision
%%%===================================================================
%% @doc simple_one_for_one supervisor whose children are all instances
%% of the `worker' module, started and stopped on demand at runtime.
-module(dynamic_sup).
-behaviour(supervisor).

%% Public API
-export([start_link/0, start_child/1, stop_child/1]).

%% supervisor callback
-export([init/1]).

%% Starts the supervisor, registered locally under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Starts a new child. For simple_one_for_one the given list is appended
%% to the start arguments of the template, i.e. worker:start_link(Args).
start_child(Args) ->
    ExtraArgs = [Args],
    supervisor:start_child(?MODULE, ExtraArgs).

%% Stops the child identified by its pid.
stop_child(Pid) ->
    supervisor:terminate_child(?MODULE, Pid).

%% A single template child spec; children are temporary, so they are
%% never restarted automatically.
init([]) ->
    Template = #{
        id => worker,
        start => {worker, start_link, []},
        restart => temporary,
        shutdown => 5000,
        type => worker
    },
    Flags = #{strategy => simple_one_for_one, intensity => 5, period => 60},
    {ok, {Flags, [Template]}}.
Supervisor树提供自动故障恢复和系统弹性。
最佳实践
- 对有状态进程使用 gen_server,以利用 OTP 的基础设施和错误处理。
- 实现所有回调函数,即使它们只返回默认值,以确保完整性。
- 保持状态记录简单,以降低复杂性并提高可维护性。
- 对无需响应的即发即弃操作使用 handle_cast。
- 在 terminate/2 中实现适当的终止逻辑,以清理资源。
- 设置适当的超时值,以防止调用无限期阻塞。
- 对具有许多状态和转换的复杂状态机使用 gen_statem。
- 设计与应用程序组件依赖关系相匹配的 supervisor 层次结构。
- 根据子进程之间的关系选择适当的重启策略。
- 通过有意使子进程崩溃来测试 supervisor 行为,以验证恢复。
常见陷阱
- 在 handle_call 中阻塞会阻止处理其他消息,可能导致死锁。
- 未匹配所有消息模式会导致未处理的消息不断堆积。
- 在 handle_call 中忘记回复会使调用者一直等待,直到调用超时。
- 使用错误的 supervision 策略会导致不必要的进程重启。
- 未设置 process_flag(trap_exit, true) 会妨碍优雅终止处理。
- 在 supervisor 树中创建循环依赖会导致启动失败。
- 对关键进程使用 temporary 重启类型会使其在故障后不再被重启。
- 未实现 code_change 会妨碍热代码升级。
- 在 gen_server 中存储过大的状态会导致内存问题。
- 状态机中未处理超时可能导致无限期阻塞。
何时使用此技能
应用gen_server用于任何需要客户端-服务器交互的有状态进程。
使用gen_statem当实现具有显式状态转换的协议或系统时。
利用supervisors用于所有需要容错和自动恢复的应用程序。
构建supervisor树以结构化具有多个组件的复杂应用程序。
使用OTP行为用于需要可靠性和可维护性的生产系统。