GleamActor模型Skill GleamActorModel

这个技能是关于在Gleam编程语言中使用Actor Model来构建并发和容错的应用程序,涵盖进程管理、消息传递、GenServer模式、监控器和故障容忍技术。适用于后端开发、系统架构和分布式系统构建。关键词:Gleam, Actor Model, OTP, 并发编程, 容错系统, 消息传递, GenServer, Supervisor, 后端开发, 软件架构。

后端开发 0 次安装 0 次浏览 更新于 3/25/2026

名称: Gleam Actor 模型 用户可调用: false 描述: 当在Gleam中使用OTP actor模式时使用,包括进程、消息传递、GenServer实现、监控器、故障容忍、状态管理,以及在Erlang VM上构建并发、容错的应用程序。 允许工具: []

Gleam Actor 模型

引言

Gleam利用Erlang VM的actor模型,支持轻量级并发进程通过消息传递进行通信。该模型提供固有的故障容忍、隔离和可扩展性,非常适合构建分布式系统。

Gleam中的actor模型使用OTP(开放电信平台)模式,包括用于有状态进程的GenServer、用于故障恢复的监控器,以及用于进程间通信的消息传递。每个进程有自己的堆,并以异步方式通信,消除了共享内存的担忧。

本技能涵盖进程创建和消息传递、用于有状态actor的GenServer模式、监控器和故障容忍、进程链接和监控、选择性接收,以及构建稳健并发应用程序的模式。

进程基础和消息传递

进程是轻量级、隔离的执行单元,通过消息传递通信。

导入 gleam/erlang/process
导入 gleam/io

// 基本进程创建
公共 函数 简单进程() {
  process.spawn(函数() {
    io.println("来自进程的问候!")
  })
}

// 带消息传递的进程
公共 类型 消息 {
  请求
  响应
  停止
}

公共 函数 回音进程() {
  让 subject = process.new_subject()

  process.spawn(函数() {
    循环(subject)
  })

  subject
}

函数 循环(subject: process.Subject(消息)) {
  情况 process.receive(subject, 1000) {
    确认(请求) -> {
      io.println("收到请求")
      循环(subject)
    }
    确认(响应) -> {
      io.println("收到响应")
      循环(subject)
    }
    确认(停止) -> {
      io.println("停止")
      空
    }
    错误(_) -> {
      io.println("超时")
      循环(subject)
    }
  }
}

// 发送消息
公共 函数 发送消息(subject: process.Subject(消息)) {
  process.send(subject, 请求)
  process.send(subject, 响应)
  process.send(subject, 停止)
}

// 请求-响应模式
公共 类型 请求 {
  获取值(回复到: process.Subject(整数))
  设置值(值: 整数, 回复到: process.Subject(空))
}

公共 函数 状态进程(初始: 整数) {
  让 subject = process.new_subject()

  process.spawn(函数() {
    状态循环(subject, 初始)
  })

  subject
}

函数 状态循环(subject: process.Subject(请求), 状态: 整数) {
  情况 process.receive(subject, 5000) {
    确认(获取值(回复到)) -> {
      process.send(回复到, 状态)
      状态循环(subject, 状态)
    }
    确认(设置值(值, 回复到)) -> {
      process.send(回复到, 空)
      状态循环(subject, 值)
    }
    错误(_) -> 状态循环(subject, 状态)
  }
}

// 调用状态进程
公共 函数 使用状态进程() {
  让 proc = 状态进程(0)
  让 回复主题 = process.new_subject()

  // 设置值
  process.send(proc, 设置值(42, 回复主题))
  让 _确认 = process.receive(回复主题, 1000)

  // 获取值
  process.send(proc, 获取值(回复主题))
  情况 process.receive(回复主题, 1000) {
    确认(值) -> io.debug(值)
    错误(_) -> io.println("超时")
  }
}

// 多消息类型进程
公共 类型 服务器消息 {
  请求(id: 整数, 回复到: process.Subject(字符串))
  广播(消息: 字符串)
  关闭
}

公共 函数 多消息进程() {
  让 subject = process.new_subject()

  process.spawn(函数() {
    多循环(subject, [])
  })

  subject
}

函数 多循环(
  subject: process.Subject(服务器消息),
  客户端列表: 列表(process.Subject(字符串)),
) {
  情况 process.receive(subject, 1000) {
    确认(请求(id, 回复到)) -> {
      让 响应 = "响应为 " <> 整数.到字符串(id)
      process.send(回复到, 响应)
      多循环(subject, [回复到, ..客户端列表])
    }
    确认(广播(消息)) -> {
      列表.每个(客户端列表, 函数(客户端) {
        process.send(客户端, 消息)
      })
      多循环(subject, 客户端列表)
    }
    确认(关闭) -> 空
    错误(_) -> 多循环(subject, 客户端列表)
  }
}

// 进程池
公共 函数 工作池(大小: 整数) -> 列表(process.Subject(消息)) {
  列表.范围(1, 大小)
  |> 列表.映射(函数(_) { 回音进程() })
}

公共 函数 分发工作(池: 列表(process.Subject(消息)),
                      工作: 列表(消息)) {
  列表.压缩(工作, 列表.循环(池))
  |> 列表.每个(函数(对) {
    让 #(消息, 工作者) = 对
    process.send(工作者, 消息)
  })
}

轻量级进程与消息传递使并发应用程序无需共享内存复杂性。

GenServer 模式

GenServer为有状态进程提供标准模式,支持同步和异步操作。

导入 gleam/otp/actor
导入 gleam/erlang/process

// 状态类型
公共 类型 计数器 {
  计数器(值: 整数)
}

// 消息类型
公共 类型 计数器消息 {
  递增
  递减
  获取值(回复到: process.Subject(整数))
  重置(回复到: process.Subject(空))
}

// GenServer实现
公共 函数 启动计数器() -> 结果(process.Subject(计数器消息),
                                  actor.启动错误) {
  actor.start(计数器(值: 0), 处理消息)
}

函数 处理消息(
  消息: 计数器消息,
  状态: 计数器,
) -> actor.下一个(计数器消息, 计数器) {
  情况 消息 {
    递增 -> {
      actor.continue(计数器(值: 状态.值 + 1))
    }
    递减 -> {
      actor.continue(计数器(值: 状态.值 - 1))
    }
    获取值(回复到) -> {
      process.send(回复到, 状态.值)
      actor.continue(状态)
    }
    重置(回复到) -> {
      process.send(回复到, 空)
      actor.continue(计数器(值: 0))
    }
  }
}

// 使用GenServer
公共 函数 使用计数器() {
  情况 启动计数器() {
    确认(计数器) -> {
      // 递增
      process.send(计数器, 递增)
      process.send(计数器, 递增)

      // 获取值
      让 回复 = process.new_subject()
      process.send(计数器, 获取值(回复))
      情况 process.receive(回复, 1000) {
        确认(值) -> io.debug(值)  // 2
        错误(_) -> io.println("超时")
      }
    }
    错误(_) -> io.println("启动计数器失败")
  }
}

// 带复杂状态的GenServer
公共 类型 缓存状态 {
  缓存状态(条目: 字典(字符串, 字符串), 最大大小: 整数)
}

公共 类型 缓存消息 {
  获取(键: 字符串, 回复到: process.Subject(选项(字符串)))
  放置(键: 字符串, 值: 字符串, 回复到: process.Subject(布尔))
  删除(键: 字符串, 回复到: process.Subject(布尔))
  大小(回复到: process.Subject(整数))
}

公共 函数 启动缓存(最大大小: 整数) -> 结果(process.Subject(缓存消息),
                                           actor.启动错误) {
  actor.start(
    缓存状态(条目: dict.new(), 最大大小: 最大大小),
    处理缓存消息,
  )
}

函数 处理缓存消息(
  消息: 缓存消息,
  状态: 缓存状态,
) -> actor.下一个(缓存消息, 缓存状态) {
  情况 消息 {
    获取(键, 回复到) -> {
      让 值 = dict.get(状态.条目, 键)
      process.send(回复到, 值)
      actor.continue(状态)
    }
    放置(键, 值, 回复到) -> {
      让 当前大小 = dict.size(状态.条目)
      情况 当前大小 < 状态.最大大小 {
        真 -> {
          让 新条目 = dict.insert(状态.条目, 键, 值)
          process.send(回复到, 真)
          actor.continue(缓存状态(..状态, 条目: 新条目))
        }
        假 -> {
          process.send(回复到, 假)
          actor.continue(状态)
        }
      }
    }
    删除(键, 回复到) -> {
      让 新条目 = dict.delete(状态.条目, 键)
      process.send(回复到, 真)
      actor.continue(缓存状态(..状态, 条目: 新条目))
    }
    大小(回复到) -> {
      process.send(回复到, dict.size(状态.条目))
      actor.continue(状态)
    }
  }
}

// 带初始化的GenServer
公共 类型 连接状态 {
  连接状态(网址: 字符串, 已连接: 布尔)
}

公共 类型 连接消息 {
  连接(回复到: process.Subject(结果(空, 字符串)))
  断开连接
  状态(回复到: process.Subject(布尔))
}

公共 函数 启动连接(网址: 字符串) ->
  结果(process.Subject(连接消息), actor.启动错误) {
  actor.start_spec(actor.规格(
    初始化: 函数() {
      // 初始化逻辑
      让 状态 = 连接状态(网址: 网址, 已连接: 假)
      actor.Ready(状态, process.new_selector())
    },
    初始化超时: 5000,
    循环: 处理连接消息,
  ))
}

函数 处理连接消息(
  消息: 连接消息,
  状态: 连接状态,
) -> actor.下一个(连接消息, 连接状态) {
  情况 消息 {
    连接(回复到) -> {
      情况 状态.已连接 {
        真 -> {
          process.send(回复到, 错误("已连接"))
          actor.continue(状态)
        }
        假 -> {
          // 模拟连接
          process.send(回复到, 确认(空))
          actor.continue(连接状态(..状态, 已连接: 真))
        }
      }
    }
    断开连接 -> {
      actor.continue(连接状态(..状态, 已连接: 假))
    }
    状态(回复到) -> {
      process.send(回复到, 状态.已连接)
      actor.continue(状态)
    }
  }
}

// 带超时的GenServer
公共 类型 超时消息 {
  心跳
  数据(字符串)
  超时
}

公共 函数 超时actor() -> 结果(process.Subject(超时消息), actor.启动错误) {
  actor.start(0, 函数(消息, 状态) {
    情况 消息 {
      心跳 -> {
        io.println("收到心跳")
        actor.continue(状态)
      }
      数据(字符串) -> {
        io.println("数据: " <> 字符串)
        actor.continue(状态)
      }
      超时 -> {
        io.println("发生超时")
        actor.Stop(process.Normal)
      }
    }
  })
}

GenServer模式为有状态并发进程提供结构化消息处理。

监控器和故障容忍

监控器监控子进程并在失败时重启,实现故障容忍系统。

导入 gleam/otp/supervisor
导入 gleam/erlang/process

// 简单工作者
公共 函数 工作者() -> 结果(process.Subject(消息), actor.启动错误) {
  actor.start(0, 函数(消息, 状态) {
    情况 消息 {
      请求 -> {
        io.println("工作者存活")
        actor.continue(状态)
      }
      停止 -> actor.Stop(process.Normal)
      _ -> actor.continue(状态)
    }
  })
}

// 监控器规范
公共 函数 启动监控器() ->
  结果(process.Subject(supervisor.消息), supervisor.启动错误) {
  supervisor.start(函数(子进程) {
    子进程
    |> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
    |> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
  })
}

// 带命名工作者的监控器
公共 类型 工作者名称 {
  计数器工作者
  缓存工作者
  数据库工作者
}

公共 函数 启动命名监控器() ->
  结果(process.Subject(supervisor.消息), supervisor.启动错误) {
  supervisor.start(函数(子进程) {
    子进程
    |> supervisor.add(supervisor.worker_spec(
      启动: 函数(_) { 启动计数器() },
      重启: supervisor.RestartForever,
    ))
    |> supervisor.add(supervisor.worker_spec(
      启动: 函数(_) { 启动缓存(100) },
      重启: supervisor.RestartForever,
    ))
  })
}

// 监控器树
公共 函数 启动应用() ->
  结果(process.Subject(supervisor.消息), supervisor.启动错误) {
  supervisor.start(函数(子进程) {
    子进程
    // 工作者
    |> supervisor.add(supervisor.worker(函数(_) { 启动计数器() }))
    |> supervisor.add(supervisor.worker(函数(_) { 启动缓存(100) }))
    // 子监控器
    |> supervisor.add(supervisor.supervisor(函数(子进程) {
      子进程
      |> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
      |> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
    }))
  })
}

// 自定义重启策略
公共 函数 启动自定义监控器() ->
  结果(process.Subject(supervisor.消息), supervisor.启动错误) {
  supervisor.start_spec(supervisor.规格(
    参数: 空,
    最大频率: 5,
    频率周期: 60,
    初始化: 函数(子进程) {
      子进程
      |> supervisor.add(supervisor.worker_spec(
        启动: 函数(_) { 工作者() },
        重启: supervisor.RestartTransient,  // 仅异常退出时重启
      ))
    },
  ))
}

// 一对一 vs 一对多
公共 函数 一对一监控器() {
  // 每个子进程独立重启
  supervisor.start(函数(子进程) {
    子进程
    |> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
    |> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
  })
}

// 动态监控器(运行时添加子进程)
公共 类型 动态消息 {
  添加工作者(回复到: process.Subject(结果(process.Pid, 字符串)))
  移除工作者(pid: process.Pid)
}

监控器通过进程监控和重启提供自动故障恢复和系统弹性。

进程链接和监控

链接和监控使进程能对相关进程的失败做出反应。

导入 gleam/erlang/process

// 进程链接
公共 函数 链接进程() {
  让 parent = process.self()

  让 child = process.spawn_link(函数() {
    io.println("子进程启动")
    process.sleep(1000)
    io.println("子进程退出")
  })

  // 父进程链接到子进程 - 将接收退出信号
  io.println("父进程等待...")
  process.sleep(2000)
}

// 进程监控
公共 函数 监控进程() {
  让 monitored = process.spawn(函数() {
    io.println("监控进程启动")
    process.sleep(1000)
  })

  让 monitor = process.monitor_process(monitored)

  // 等待关闭消息
  让 selector = process.new_selector()
    |> process.selecting_process_down(monitor, 函数(down) { down })

  情况 process.select(selector, 2000) {
    确认(down) -> io.println("进程退出")
    错误(_) -> io.println("仍在运行")
  }
}

// 捕获退出用于监控
公共 函数 陷阱退出() {
  process.trap_exits(真)

  让 child = process.spawn_link(函数() {
    io.println("子进程启动")
    恐慌为 "模拟错误"
  })

  让 selector = process.new_selector()
    |> process.selecting_trapped_exits(函数(exit) { exit })

  情况 process.select(selector, 2000) {
    确认(exit) -> {
      io.println("捕获子进程退出")
      // 可在此重启子进程
    }
    错误(_) -> io.println("未收到退出")
  }
}

// 监控多个进程
公共 函数 监控池(工作者列表: 列表(process.Pid)) {
  让 monitors = 列表.映射(工作者列表, process.monitor_process)

  // 处理任何工作者失败
  让 selector = 列表.折叠(monitors, process.new_selector(), 函数(sel, mon) {
    process.selecting_process_down(sel, mon, 函数(down) { down })
  })

  情况 process.select(selector, 10000) {
    确认(down) -> {
      io.println("工作者失败")
      // 重启逻辑在此
    }
    错误(_) -> io.println("所有工作者健康")
  }
}

链接和监控使构建故障容忍系统具备适当失败处理。

最佳实践

  1. 对有状态进程使用GenServer 以利用OTP模式和标准行为

  2. 将GenServer包装在监控器树中 以启用自动故障恢复

  3. 保持进程状态最小 以减少内存使用和简化状态管理

  4. 使用带回复字段的消息类型 用于同步请求-响应模式

  5. 在接收操作上设置适当超时 以防止无限阻塞

  6. 监控外部进程而非链接 当你不想一起崩溃时

  7. 使用描述性消息类型 而非通用元组

  8. 处理所有消息类型 在循环中以防止意外消息积累

  9. 为失败设计 假设进程会崩溃并使用监控器

  10. 保持进程层次简单 有清晰的父子关系

常见陷阱

  1. 未处理超时情况 在接收操作中导致进程无限挂起

  2. 忘记回复 在请求-响应模式中导致客户端超时

  3. 创建过多进程 无理由增加开销无益

  4. 不使用监控器 失去actor模型的故障容忍优势

  5. 在消息处理器中阻塞 阻止处理其他消息导致死锁

  6. 积累未消费消息 在邮箱中导致内存泄漏

  7. 错误链接进程 导致意外崩溃传播

  8. 未设置init_timeout 在actor上导致启动延迟崩溃系统

  9. 使用共享可变状态 破坏actor模型的隔离优势

  10. 忽略退出信号 当捕获退出时阻止适当清理

何时使用此技能

应用actor于需要隔离状态和基于消息通信的并发操作。

使用GenServer当实现有状态服务如缓存、连接或工作者时。

利用监控器用于任何应在失败时自动重启的进程。

应用进程监控当一个进程需要对另一个的终止做出反应时。

使用进程池用于跨多个并发工作者分发工作。

构建监控器树用于结构化具有多个组件的复杂应用。

资源