ElixirOTP模式Skill elixir-otp-patterns

这个技能专注于Elixir OTP模式,用于开发高并发、高容错的应用程序。涵盖GenServer、Supervisor、Agent、Task等核心组件,适用于后端系统、分布式架构和实时应用。关键词:Elixir, OTP, 并发编程, 容错系统, GenServer, Supervisor, Agent, Task, 后端开发。

后端开发 0 次安装 2 次浏览 更新于 3/25/2026

name: elixir-otp-patterns user-invocable: false description: 使用Elixir OTP模式,包括GenServer、Supervisor、Agent和Task。用于构建并发、容错的Elixir应用程序。 allowed-tools:

  • Bash
  • Read

Elixir OTP 模式

掌握OTP(开放电信平台)模式以构建并发、容错的Elixir应用程序。本技能涵盖GenServer、Supervisor、Agent、Task和其他OTP行为。

GenServer 基础

defmodule Counter do
  use GenServer

  # 客户端 API

  def start_link(initial_value \\ 0) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  def increment do
    GenServer.cast(__MODULE__, :increment)
  end

  def get_value do
    GenServer.call(__MODULE__, :get_value)
  end

  # 服务器回调

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:get_value, _from, state) do
    {:reply, state, state}
  end

  @impl true
  def handle_cast(:increment, state) do
    {:noreply, state + 1}
  end
end

# 使用
{:ok, _pid} = Counter.start_link(0)
Counter.increment()
Counter.get_value()  # => 1

GenServer 与状态管理

defmodule UserCache do
  use GenServer

  # 客户端 API

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def put(user_id, user_data) do
    GenServer.cast(__MODULE__, {:put, user_id, user_data})
  end

  def get(user_id) do
    GenServer.call(__MODULE__, {:get, user_id})
  end

  def delete(user_id) do
    GenServer.cast(__MODULE__, {:delete, user_id})
  end

  def all do
    GenServer.call(__MODULE__, :all)
  end

  # 服务器回调

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call({:get, user_id}, _from, state) do
    {:reply, Map.get(state, user_id), state}
  end

  @impl true
  def handle_call(:all, _from, state) do
    {:reply, state, state}
  end

  @impl true
  def handle_cast({:put, user_id, user_data}, state) do
    {:noreply, Map.put(state, user_id, user_data)}
  end

  @impl true
  def handle_cast({:delete, user_id}, state) do
    {:noreply, Map.delete(state, user_id)}
  end
end

监控器策略

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # One-for-one: 仅重启失败的子进程
      {Counter, 0},
      {UserCache, []},

      # One-for-all 监控器
      {Supervisor,
       strategy: :one_for_all,
       name: MyApp.CriticalSupervisor,
       children: [
         {Database, []},
         {Cache, []}
       ]},

      # Rest-for-one 监控器
      {Supervisor,
       strategy: :rest_for_one,
       name: MyApp.OrderedSupervisor,
       children: [
         {ConfigLoader, []},
         {DatabasePool, []},
         {WebServer, []}
       ]}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

动态监控器

defmodule TaskRunner do
  use GenServer

  def start_link(task_id) do
    GenServer.start_link(__MODULE__, task_id)
  end

  @impl true
  def init(task_id) do
    Process.send_after(self(), :run_task, 0)
    {:ok, task_id}
  end

  @impl true
  def handle_info(:run_task, task_id) do
    # 执行任务工作
    IO.puts("运行任务 #{task_id}")
    {:noreply, task_id}
  end
end

defmodule TaskSupervisor do
  use DynamicSupervisor

  def start_link(_opts) do
    DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def start_task(task_id) do
    spec = {TaskRunner, task_id}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def stop_task(pid) do
    DynamicSupervisor.terminate_child(__MODULE__, pid)
  end

  @impl true
  def init(:ok) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

# 使用
TaskSupervisor.start_link([])
{:ok, pid} = TaskSupervisor.start_task(1)
TaskSupervisor.stop_task(pid)

Agent 用于简单状态

defmodule SimpleCounter do
  use Agent

  def start_link(initial_value) do
    Agent.start_link(fn -> initial_value end, name: __MODULE__)
  end

  def increment do
    Agent.update(__MODULE__, &(&1 + 1))
  end

  def decrement do
    Agent.update(__MODULE__, &(&1 - 1))
  end

  def value do
    Agent.get(__MODULE__, & &1)
  end

  def reset do
    Agent.update(__MODULE__, fn _ -> 0 end)
  end
end

# 使用
{:ok, _pid} = SimpleCounter.start_link(0)
SimpleCounter.increment()
SimpleCounter.value()  # => 1

Task 用于异步操作

defmodule DataFetcher do
  def fetch_all do
    tasks = [
      Task.async(fn -> fetch_users() end),
      Task.async(fn -> fetch_posts() end),
      Task.async(fn -> fetch_comments() end)
    ]

    results = Task.await_many(tasks, 5000)

    %{
      users: Enum.at(results, 0),
      posts: Enum.at(results, 1),
      comments: Enum.at(results, 2)
    }
  end

  defp fetch_users do
    # 模拟 API 调用
    Process.sleep(100)
    ["user1", "user2", "user3"]
  end

  defp fetch_posts do
    Process.sleep(200)
    ["post1", "post2"]
  end

  defp fetch_comments do
    Process.sleep(150)
    ["comment1", "comment2", "comment3"]
  end
end

Task.Supervisor 用于管理任务

defmodule MyApp.TaskSupervisor do
  use Task.Supervisor

  def start_link(_opts) do
    Task.Supervisor.start_link(name: __MODULE__)
  end

  def run_task(fun) do
    Task.Supervisor.async(__MODULE__, fun)
  end

  def run_task_nolink(fun) do
    Task.Supervisor.async_nolink(__MODULE__, fun)
  end
end

# 在 application.ex 中
children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

# 使用
task = Task.Supervisor.async(
  MyApp.TaskSupervisor,
  fn -> expensive_operation() end
)
result = Task.await(task)

GenServer 与超时

defmodule SessionManager do
  use GenServer

  @timeout 60_000  # 60 秒

  def start_link(session_id) do
    GenServer.start_link(__MODULE__, session_id)
  end

  def refresh(pid) do
    GenServer.cast(pid, :refresh)
  end

  @impl true
  def init(session_id) do
    {:ok, session_id, @timeout}
  end

  @impl true
  def handle_cast(:refresh, state) do
    {:noreply, state, @timeout}
  end

  @impl true
  def handle_info(:timeout, state) do
    IO.puts("会话 #{state} 超时")
    {:stop, :normal, state}
  end
end

Registry 用于进程查找

defmodule UserSession do
  use GenServer

  def start_link(user_id) do
    GenServer.start_link(
      __MODULE__,
      user_id,
      name: via_tuple(user_id)
    )
  end

  def via_tuple(user_id) do
    {:via, Registry, {MyApp.Registry, {:user_session, user_id}}}
  end

  def send_message(user_id, message) do
    case Registry.lookup(MyApp.Registry, {:user_session, user_id}) do
      [{pid, _}] ->
        GenServer.cast(pid, {:message, message})
      [] ->
        {:error, :not_found}
    end
  end

  @impl true
  def init(user_id) do
    {:ok, %{user_id: user_id, messages: []}}
  end

  @impl true
  def handle_cast({:message, message}, state) do
    {:noreply, %{state | messages: [message | state.messages]}}
  end
end

# 在 application.ex 中
children = [
  {Registry, keys: :unique, name: MyApp.Registry}
]

实现 GenServer 与状态清理

defmodule FileWatcher do
  use GenServer

  def start_link(file_path) do
    GenServer.start_link(__MODULE__, file_path)
  end

  @impl true
  def init(file_path) do
    case File.open(file_path, [:read]) do
      {:ok, file} ->
        schedule_check()
        {:ok, %{file: file, path: file_path, position: 0}}
      {:error, reason} ->
        {:stop, reason}
    end
  end

  @impl true
  def handle_info(:check, state) do
    # 从文件读取新行
    schedule_check()
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, %{file: file}) do
    File.close(file)
    :ok
  end

  defp schedule_check do
    Process.send_after(self(), :check, 1000)
  end
end

使用 ETS 与 GenServer

defmodule CacheServer do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def put(key, value) do
    GenServer.call(__MODULE__, {:put, key, value})
  end

  def get(key) do
    case :ets.lookup(__MODULE__, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :not_found
    end
  end

  @impl true
  def init(:ok) do
    :ets.new(__MODULE__, [:named_table, :set, :public])
    {:ok, %{}}
  end

  @impl true
  def handle_call({:put, key, value}, _from, state) do
    :ets.insert(__MODULE__, {key, value})
    {:reply, :ok, state}
  end
end

何时使用此技能

使用 elixir-otp-patterns 当您需要:

  • 使用隔离进程构建并发应用程序
  • 通过监控树实现容错系统
  • 管理进程生命周期中的应用状态
  • 为异步任务处理创建工作池
  • 构建多并发用户的实时系统
  • 实现发布/订阅或事件驱动架构
  • 创建具有进程通信的分布式系统
  • 处理长时间运行的背景作业
  • 构建可扩展的 Web 服务器和 API

最佳实践

  • 使用 GenServer 处理有复杂逻辑的有状态进程
  • 使用 Agent 处理不需要自定义逻辑的简单状态
  • 使用 Task 处理一次性异步操作
  • 始终定义适当的监控策略
  • 使用 Registry 进行动态进程查找
  • 实现适当的超时处理
  • 在 terminate/2 回调中清理资源
  • 使用 via 元组进行命名进程注册
  • 将客户端 API 与服务器回调分离
  • 保持 handle_* 函数专注和简单

常见陷阱

  • 未实现适当的监控策略
  • 用长时间运行的操作阻塞 GenServer 调用
  • 忘记处理 :timeout 消息
  • 未在 terminate/2 中清理资源
  • 当需要同步确认时使用 cast
  • 不必要地创建太多进程
  • 未正确处理进程退出
  • 在进程状态中存储大数据而不是 ETS
  • 未使用 Registry 进行动态进程管理
  • 忽略异步操作中的背压

资源