ErlangOTP行为模式Skill ErlangOTPBehaviors

本技能涉及使用Erlang OTP行为模式构建可靠、生产就绪的分布式应用程序。包括gen_server用于有状态客户端-服务器进程,gen_statem用于复杂状态机,supervisors用于容错系统,gen_event用于事件处理,以及应用行为用于打包。适用于后端开发、服务器端编程和高可用性系统。关键词:Erlang, OTP, 行为模式, gen_server, gen_statem, supervisor, 容错, 状态机, 事件处理, 应用程序开发。

后端开发 0 次安装 2 次浏览 更新于 3/25/2026

名称: Erlang OTP 行为模式 用户不可调用: false 描述: 使用OTP行为包括gen_server用于有状态进程、gen_statem用于状态机、supervisors用于容错、gen_event用于事件处理,以及使用经过验证的模式构建健壮、生产就绪的Erlang应用程序。 允许工具: []

Erlang OTP 行为模式

简介

OTP(开放电信平台)行为为Erlang系统中的常见进程类型提供可重用模式。这些抽象处理复杂的细节,如消息传递、错误处理和状态管理,允许开发人员专注于业务逻辑,同时保持系统可靠性。

行为定义进程必须实现的接口,OTP处理基础设施。Gen_server提供客户端-服务器进程,gen_statem实现状态机,supervisors管理进程生命周期,gen_event协调事件分发。理解这些模式对于生产Erlang系统至关重要。

本技能涵盖gen_server用于有状态进程、gen_statem用于复杂状态机、supervisor树用于容错、gen_event用于事件处理、应用行为用于打包,以及构建健壮OTP系统的模式。

Gen_Server 基础

Gen_server通过同步和异步通信实现客户端-服务器进程。

-module(counter_server).
-behaviour(gen_server).

%% API
-export([start_link/0, increment/0, decrement/0, get_value/0, reset/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).

%% State record
-record(state, {count = 0}).

%%%===================================================================
%%% API
%%%===================================================================

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

increment() ->
    gen_server:cast(?SERVER, increment).

decrement() ->
    gen_server:cast(?SERVER, decrement).

get_value() ->
    gen_server:call(?SERVER, get_value).

reset() ->
    gen_server:call(?SERVER, reset).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

init([]) ->
    {ok, #state{}}.

%% Synchronous calls (with response)
handle_call(get_value, _From, State) ->
    {reply, State#state.count, State};

handle_call(reset, _From, State) ->
    {reply, ok, State#state{count = 0}};

handle_call(_Request, _From, State) ->
    {reply, ignored, State}.

%% Asynchronous casts (no response)
handle_cast(increment, State) ->
    NewCount = State#state.count + 1,
    {noreply, State#state{count = NewCount}};

handle_cast(decrement, State) ->
    NewCount = State#state.count - 1,
    {noreply, State#state{count = NewCount}};

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Handle other messages
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Complex gen_server example: Cache
%%%===================================================================

-module(cache_server).
-behaviour(gen_server).

-export([start_link/1, put/2, get/1, delete/1, clear/0, size/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {
    cache = #{},
    max_size = 1000,
    hits = 0,
    misses = 0
}).

start_link(MaxSize) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [MaxSize], []).

put(Key, Value) ->
    gen_server:call(?MODULE, {put, Key, Value}).

get(Key) ->
    gen_server:call(?MODULE, {get, Key}).

delete(Key) ->
    gen_server:cast(?MODULE, {delete, Key}).

clear() ->
    gen_server:cast(?MODULE, clear).

size() ->
    gen_server:call(?MODULE, size).

init([MaxSize]) ->
    process_flag(trap_exit, true),
    {ok, #state{max_size = MaxSize}}.

handle_call({put, Key, Value}, _From, State) ->
    Cache = State#state.cache,
    case maps:size(Cache) >= State#state.max_size of
        true ->
            {reply, {error, cache_full}, State};
        false ->
            NewCache = maps:put(Key, Value, Cache),
            {reply, ok, State#state{cache = NewCache}}
    end;

handle_call({get, Key}, _From, State) ->
    Cache = State#state.cache,
    case maps:find(Key, Cache) of
        {ok, Value} ->
            NewState = State#state{hits = State#state.hits + 1},
            {reply, {ok, Value}, NewState};
        error ->
            NewState = State#state{misses = State#state.misses + 1},
            {reply, not_found, NewState}
    end;

handle_call(size, _From, State) ->
    Size = maps:size(State#state.cache),
    {reply, Size, State};

handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.

handle_cast({delete, Key}, State) ->
    NewCache = maps:remove(Key, State#state.cache),
    {noreply, State#state{cache = NewCache}};

handle_cast(clear, State) ->
    {noreply, State#state{cache = #{}}};

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(Reason, State) ->
    io:format("Cache terminating: ~p~n", [Reason]),
    io:format("Stats - Hits: ~p, Misses: ~p~n", [State#state.hits, State#state.misses]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% gen_server with timeouts
%%%===================================================================

-module(session_server).
-behaviour(gen_server).

-export([start_link/0, touch/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(TIMEOUT, 30000). % 30 seconds

-record(state, {
    last_activity,
    data = #{}
}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

touch() ->
    gen_server:cast(?MODULE, touch).

init([]) ->
    {ok, #state{last_activity = erlang:system_time(millisecond)}, ?TIMEOUT}.

handle_call(_Request, _From, State) ->
    {reply, ok, State, ?TIMEOUT}.

handle_cast(touch, State) ->
    NewState = State#state{last_activity = erlang:system_time(millisecond)},
    {noreply, NewState, ?TIMEOUT};

handle_cast(_Msg, State) ->
    {noreply, State, ?TIMEOUT}.

handle_info(timeout, State) ->
    io:format("Session timed out~n"),
    {stop, normal, State};

handle_info(_Info, State) ->
    {noreply, State, ?TIMEOUT}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

Gen_server为有状态进程提供客户端-服务器模式的结构。

Gen_Statem 用于状态机

Gen_statem通过显式状态转换实现有限状态机。

-module(door_fsm).
-behaviour(gen_statem).

-export([start_link/0, open/0, close/0, lock/0, unlock/1]).
-export([init/1, callback_mode/0, terminate/3, code_change/4]).
-export([locked/3, unlocked/3, open/3]).

-define(CODE, "1234").

start_link() ->
    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).

open() ->
    gen_statem:call(?MODULE, open).

close() ->
    gen_statem:call(?MODULE, close).

lock() ->
    gen_statem:call(?MODULE, lock).

unlock(Code) ->
    gen_statem:call(?MODULE, {unlock, Code}).

init([]) ->
    {ok, locked, #{}}.

callback_mode() ->
    state_functions.

%% Locked state
locked(call, {unlock, Code}, Data) when Code =:= ?CODE ->
    {next_state, unlocked, Data, [{reply, ok}]};

locked(call, {unlock, _WrongCode}, Data) ->
    {keep_state, Data, [{reply, {error, wrong_code}}]};

locked(call, _Event, Data) ->
    {keep_state, Data, [{reply, {error, door_locked}}]}.

%% Unlocked state
unlocked(call, lock, Data) ->
    {next_state, locked, Data, [{reply, ok}]};

unlocked(call, open, Data) ->
    {next_state, open, Data, [{reply, ok}]};

unlocked(call, _Event, Data) ->
    {keep_state, Data, [{reply, ok}]}.

%% Open state
open(call, close, Data) ->
    {next_state, unlocked, Data, [{reply, ok}]};

open(call, _Event, Data) ->
    {keep_state, Data, [{reply, {error, door_open}}]}.

terminate(_Reason, _State, _Data) ->
    ok.

code_change(_OldVsn, State, Data, _Extra) ->
    {ok, State, Data}.

%%%===================================================================
%%% Connection state machine
%%%===================================================================

-module(connection_fsm).
-behaviour(gen_statem).

-export([start_link/0, connect/0, disconnect/0, send/1]).
-export([init/1, callback_mode/0, terminate/3, code_change/4]).
-export([disconnected/3, connecting/3, connected/3]).

-record(data, {
    socket = undefined,
    buffer = <<>>,
    retry_count = 0
}).

start_link() ->
    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).

connect() ->
    gen_statem:call(?MODULE, connect).

disconnect() ->
    gen_statem:call(?MODULE, disconnect).

send(Data) ->
    gen_statem:call(?MODULE, {send, Data}).

init([]) ->
    {ok, disconnected, #data{}}.

callback_mode() ->
    [state_functions, state_enter].

%% Disconnected state
disconnected(enter, _OldState, _Data) ->
    io:format("Entered disconnected state~n"),
    keep_state_and_data;

disconnected(call, connect, Data) ->
    case connect_to_server() of
        {ok, Socket} ->
            {next_state, connected, Data#data{socket = Socket, retry_count = 0},
             [{reply, ok}]};
        error ->
            NewData = Data#data{retry_count = Data#data.retry_count + 1},
            case NewData#data.retry_count < 3 of
                true ->
                    {next_state, connecting, NewData, [{reply, {error, retrying}}]};
                false ->
                    {keep_state, NewData, [{reply, {error, max_retries}}]}
            end
    end.

%% Connecting state
connecting(enter, _OldState, _Data) ->
    erlang:send_after(1000, self(), retry_connect),
    keep_state_and_data;

connecting(info, retry_connect, Data) ->
    case connect_to_server() of
        {ok, Socket} ->
            {next_state, connected, Data#data{socket = Socket, retry_count = 0}};
        error ->
            NewData = Data#data{retry_count = Data#data.retry_count + 1},
            case NewData#data.retry_count < 3 of
                true ->
                    {keep_state, NewData};
                false ->
                    {next_state, disconnected, NewData}
            end
    end.

%% Connected state
connected(enter, _OldState, _Data) ->
    io:format("Connection established~n"),
    keep_state_and_data;

connected(call, {send, Data}, StateData) ->
    case send_data(StateData#data.socket, Data) of
        ok ->
            {keep_state_and_data, [{reply, ok}]};
        error ->
            {next_state, disconnected, StateData, [{reply, {error, send_failed}}]}
    end;

connected(call, disconnect, StateData) ->
    close_connection(StateData#data.socket),
    {next_state, disconnected, StateData#data{socket = undefined}, [{reply, ok}]}.

terminate(_Reason, _State, Data) ->
    case Data#data.socket of
        undefined -> ok;
        Socket -> close_connection(Socket)
    end.

code_change(_OldVsn, State, Data, _Extra) ->
    {ok, State, Data}.

%% Helper functions
connect_to_server() ->
    {ok, socket}.

send_data(_Socket, _Data) ->
    ok.

close_connection(_Socket) ->
    ok.

Gen_statem提供具有显式转换的结构化状态机实现。

Supervisor 树

Supervisors监控子进程并在失败时重启它们,以实现容错。

-module(my_supervisor).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    SupFlags = #{
        strategy => one_for_one,
        intensity => 5,
        period => 60
    },

    ChildSpecs = [
        #{
            id => counter_server,
            start => {counter_server, start_link, []},
            restart => permanent,
            shutdown => 5000,
            type => worker,
            modules => [counter_server]
        },
        #{
            id => cache_server,
            start => {cache_server, start_link, [1000]},
            restart => permanent,
            shutdown => 5000,
            type => worker,
            modules => [cache_server]
        }
    ],

    {ok, {SupFlags, ChildSpecs}}.

%%%===================================================================
%%% Supervisor strategies
%%%===================================================================

%% one_for_one: Restart only failed child
init_one_for_one([]) ->
    SupFlags = #{strategy => one_for_one},
    Children = [worker_spec(worker1), worker_spec(worker2)],
    {ok, {SupFlags, Children}}.

%% one_for_all: Restart all children if any fails
init_one_for_all([]) ->
    SupFlags = #{strategy => one_for_all},
    Children = [worker_spec(worker1), worker_spec(worker2)],
    {ok, {SupFlags, Children}}.

%% rest_for_one: Restart failed child and all started after it
init_rest_for_one([]) ->
    SupFlags = #{strategy => rest_for_one},
    Children = [
        worker_spec(database),
        worker_spec(cache),  % Depends on database
        worker_spec(api)     % Depends on cache
    ],
    {ok, {SupFlags, Children}}.

worker_spec(Name) ->
    #{
        id => Name,
        start => {Name, start_link, []},
        restart => permanent,
        shutdown => 5000,
        type => worker
    }.

%%%===================================================================
%%% Nested supervisors (supervision tree)
%%%===================================================================

-module(app_supervisor).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    SupFlags = #{strategy => one_for_one},

    ChildSpecs = [
        #{
            id => database_sup,
            start => {database_supervisor, start_link, []},
            restart => permanent,
            type => supervisor
        },
        #{
            id => api_sup,
            start => {api_supervisor, start_link, []},
            restart => permanent,
            type => supervisor
        },
        #{
            id => worker_sup,
            start => {worker_supervisor, start_link, []},
            restart => permanent,
            type => supervisor
        }
    ],

    {ok, {SupFlags, ChildSpecs}}.

%%%===================================================================
%%% Dynamic supervision
%%%===================================================================

-module(dynamic_sup).
-behaviour(supervisor).

-export([start_link/0, start_child/1, stop_child/1]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

start_child(Args) ->
    supervisor:start_child(?MODULE, [Args]).

stop_child(Pid) ->
    supervisor:terminate_child(?MODULE, Pid).

init([]) ->
    SupFlags = #{
        strategy => simple_one_for_one,
        intensity => 5,
        period => 60
    },

    ChildSpec = #{
        id => worker,
        start => {worker, start_link, []},
        restart => temporary,
        shutdown => 5000,
        type => worker
    },

    {ok, {SupFlags, [ChildSpec]}}.

Supervisor树提供自动故障恢复和系统弹性。

最佳实践

  1. 使用gen_server用于有状态进程以利用OTP基础设施和错误处理

  2. 实现所有回调函数即使它们返回默认值以确保完整性

  3. 保持状态记录简单以减少复杂性并提高可维护性

  4. 使用handle_cast用于即发即弃操作无需响应要求

  5. 实现适当的终止在terminate/2中进行资源清理

  6. 设置适当的超时值以防止调用中无限阻塞

  7. 使用gen_statem用于复杂状态机具有许多状态和转换

  8. 设计supervisor层次结构匹配应用程序组件依赖关系

  9. 使用适当的重启策略基于子进程关系

  10. 测试supervisor行为通过有意使子进程崩溃以验证恢复

常见陷阱

  1. 在handle_call中阻塞阻止处理其他消息导致死锁

  2. 不匹配所有消息模式导致未处理消息积累

  3. 忘记回复在handle_call中使调用者无限等待

  4. 使用错误的supervision策略导致不必要的进程重启

  5. 不设置process_flag trap_exit阻止优雅终止处理

  6. 创建循环依赖在supervisor树中导致启动失败

  7. 使用temporary重启用于关键进程允许永久故障

  8. 不实现code_change阻止热代码升级

  9. 存储大状态在gen_server中导致内存问题

  10. 不处理超时在状态机中允许无限阻塞

何时使用此技能

应用gen_server用于任何需要客户端-服务器交互的有状态进程。

使用gen_statem当实现具有显式状态转换的协议或系统时。

利用supervisors用于所有需要容错和自动恢复的应用程序。

构建supervisor树以结构化具有多个组件的复杂应用程序。

使用OTP行为用于需要可靠性和可维护性的生产系统。

资源