名称: Gleam Actor 模型 用户可调用: false 描述: 当在Gleam中使用OTP actor模式时使用,包括进程、消息传递、GenServer实现、监控器、故障容忍、状态管理,以及在Erlang VM上构建并发、容错的应用程序。 允许工具: []
Gleam Actor 模型
引言
Gleam利用Erlang VM的actor模型,支持轻量级并发进程通过消息传递进行通信。该模型提供固有的故障容忍、隔离和可扩展性,非常适合构建分布式系统。
Gleam中的actor模型使用OTP(开放电信平台)模式,包括用于有状态进程的GenServer、用于故障恢复的监控器,以及用于进程间通信的消息传递。每个进程有自己的堆,并以异步方式通信,消除了共享内存的担忧。
本技能涵盖进程创建和消息传递、用于有状态actor的GenServer模式、监控器和故障容忍、进程链接和监控、选择性接收,以及构建稳健并发应用程序的模式。
进程基础和消息传递
进程是轻量级、隔离的执行单元,通过消息传递通信。
导入 gleam/erlang/process
导入 gleam/io
// 基本进程创建
公共 函数 简单进程() {
process.spawn(函数() {
io.println("来自进程的问候!")
})
}
// 带消息传递的进程
公共 类型 消息 {
请求
响应
停止
}
公共 函数 回音进程() {
让 subject = process.new_subject()
process.spawn(函数() {
循环(subject)
})
subject
}
函数 循环(subject: process.Subject(消息)) {
情况 process.receive(subject, 1000) {
确认(请求) -> {
io.println("收到请求")
循环(subject)
}
确认(响应) -> {
io.println("收到响应")
循环(subject)
}
确认(停止) -> {
io.println("停止")
空
}
错误(_) -> {
io.println("超时")
循环(subject)
}
}
}
// 发送消息
公共 函数 发送消息(subject: process.Subject(消息)) {
process.send(subject, 请求)
process.send(subject, 响应)
process.send(subject, 停止)
}
// 请求-响应模式
公共 类型 请求 {
获取值(回复到: process.Subject(整数))
设置值(值: 整数, 回复到: process.Subject(空))
}
公共 函数 状态进程(初始: 整数) {
让 subject = process.new_subject()
process.spawn(函数() {
状态循环(subject, 初始)
})
subject
}
函数 状态循环(subject: process.Subject(请求), 状态: 整数) {
情况 process.receive(subject, 5000) {
确认(获取值(回复到)) -> {
process.send(回复到, 状态)
状态循环(subject, 状态)
}
确认(设置值(值, 回复到)) -> {
process.send(回复到, 空)
状态循环(subject, 值)
}
错误(_) -> 状态循环(subject, 状态)
}
}
// 调用状态进程
公共 函数 使用状态进程() {
让 proc = 状态进程(0)
让 回复主题 = process.new_subject()
// 设置值
process.send(proc, 设置值(42, 回复主题))
让 _确认 = process.receive(回复主题, 1000)
// 获取值
process.send(proc, 获取值(回复主题))
情况 process.receive(回复主题, 1000) {
确认(值) -> io.debug(值)
错误(_) -> io.println("超时")
}
}
// 多消息类型进程
公共 类型 服务器消息 {
请求(id: 整数, 回复到: process.Subject(字符串))
广播(消息: 字符串)
关闭
}
公共 函数 多消息进程() {
让 subject = process.new_subject()
process.spawn(函数() {
多循环(subject, [])
})
subject
}
函数 多循环(
subject: process.Subject(服务器消息),
客户端列表: 列表(process.Subject(字符串)),
) {
情况 process.receive(subject, 1000) {
确认(请求(id, 回复到)) -> {
让 响应 = "响应为 " <> 整数.到字符串(id)
process.send(回复到, 响应)
多循环(subject, [回复到, ..客户端列表])
}
确认(广播(消息)) -> {
列表.每个(客户端列表, 函数(客户端) {
process.send(客户端, 消息)
})
多循环(subject, 客户端列表)
}
确认(关闭) -> 空
错误(_) -> 多循环(subject, 客户端列表)
}
}
// 进程池
公共 函数 工作池(大小: 整数) -> 列表(process.Subject(消息)) {
列表.范围(1, 大小)
|> 列表.映射(函数(_) { 回音进程() })
}
公共 函数 分发工作(池: 列表(process.Subject(消息)),
工作: 列表(消息)) {
列表.压缩(工作, 列表.循环(池))
|> 列表.每个(函数(对) {
让 #(消息, 工作者) = 对
process.send(工作者, 消息)
})
}
轻量级进程与消息传递使并发应用程序无需共享内存复杂性。
GenServer 模式
GenServer为有状态进程提供标准模式,支持同步和异步操作。
导入 gleam/otp/actor
导入 gleam/erlang/process
// 状态类型
公共 类型 计数器 {
计数器(值: 整数)
}
// 消息类型
公共 类型 计数器消息 {
递增
递减
获取值(回复到: process.Subject(整数))
重置(回复到: process.Subject(空))
}
// GenServer实现
公共 函数 启动计数器() -> 结果(process.Subject(计数器消息),
actor.启动错误) {
actor.start(计数器(值: 0), 处理消息)
}
函数 处理消息(
消息: 计数器消息,
状态: 计数器,
) -> actor.下一个(计数器消息, 计数器) {
情况 消息 {
递增 -> {
actor.continue(计数器(值: 状态.值 + 1))
}
递减 -> {
actor.continue(计数器(值: 状态.值 - 1))
}
获取值(回复到) -> {
process.send(回复到, 状态.值)
actor.continue(状态)
}
重置(回复到) -> {
process.send(回复到, 空)
actor.continue(计数器(值: 0))
}
}
}
// 使用GenServer
公共 函数 使用计数器() {
情况 启动计数器() {
确认(计数器) -> {
// 递增
process.send(计数器, 递增)
process.send(计数器, 递增)
// 获取值
让 回复 = process.new_subject()
process.send(计数器, 获取值(回复))
情况 process.receive(回复, 1000) {
确认(值) -> io.debug(值) // 2
错误(_) -> io.println("超时")
}
}
错误(_) -> io.println("启动计数器失败")
}
}
// 带复杂状态的GenServer
公共 类型 缓存状态 {
缓存状态(条目: 字典(字符串, 字符串), 最大大小: 整数)
}
公共 类型 缓存消息 {
获取(键: 字符串, 回复到: process.Subject(选项(字符串)))
放置(键: 字符串, 值: 字符串, 回复到: process.Subject(布尔))
删除(键: 字符串, 回复到: process.Subject(布尔))
大小(回复到: process.Subject(整数))
}
公共 函数 启动缓存(最大大小: 整数) -> 结果(process.Subject(缓存消息),
actor.启动错误) {
actor.start(
缓存状态(条目: dict.new(), 最大大小: 最大大小),
处理缓存消息,
)
}
函数 处理缓存消息(
消息: 缓存消息,
状态: 缓存状态,
) -> actor.下一个(缓存消息, 缓存状态) {
情况 消息 {
获取(键, 回复到) -> {
让 值 = dict.get(状态.条目, 键)
process.send(回复到, 值)
actor.continue(状态)
}
放置(键, 值, 回复到) -> {
让 当前大小 = dict.size(状态.条目)
情况 当前大小 < 状态.最大大小 {
真 -> {
让 新条目 = dict.insert(状态.条目, 键, 值)
process.send(回复到, 真)
actor.continue(缓存状态(..状态, 条目: 新条目))
}
假 -> {
process.send(回复到, 假)
actor.continue(状态)
}
}
}
删除(键, 回复到) -> {
让 新条目 = dict.delete(状态.条目, 键)
process.send(回复到, 真)
actor.continue(缓存状态(..状态, 条目: 新条目))
}
大小(回复到) -> {
process.send(回复到, dict.size(状态.条目))
actor.continue(状态)
}
}
}
// 带初始化的GenServer
公共 类型 连接状态 {
连接状态(网址: 字符串, 已连接: 布尔)
}
公共 类型 连接消息 {
连接(回复到: process.Subject(结果(空, 字符串)))
断开连接
状态(回复到: process.Subject(布尔))
}
公共 函数 启动连接(网址: 字符串) ->
结果(process.Subject(连接消息), actor.启动错误) {
actor.start_spec(actor.规格(
初始化: 函数() {
// 初始化逻辑
让 状态 = 连接状态(网址: 网址, 已连接: 假)
actor.Ready(状态, process.new_selector())
},
初始化超时: 5000,
循环: 处理连接消息,
))
}
函数 处理连接消息(
消息: 连接消息,
状态: 连接状态,
) -> actor.下一个(连接消息, 连接状态) {
情况 消息 {
连接(回复到) -> {
情况 状态.已连接 {
真 -> {
process.send(回复到, 错误("已连接"))
actor.continue(状态)
}
假 -> {
// 模拟连接
process.send(回复到, 确认(空))
actor.continue(连接状态(..状态, 已连接: 真))
}
}
}
断开连接 -> {
actor.continue(连接状态(..状态, 已连接: 假))
}
状态(回复到) -> {
process.send(回复到, 状态.已连接)
actor.continue(状态)
}
}
}
// 带超时的GenServer
公共 类型 超时消息 {
心跳
数据(字符串)
超时
}
公共 函数 超时actor() -> 结果(process.Subject(超时消息), actor.启动错误) {
actor.start(0, 函数(消息, 状态) {
情况 消息 {
心跳 -> {
io.println("收到心跳")
actor.continue(状态)
}
数据(字符串) -> {
io.println("数据: " <> 字符串)
actor.continue(状态)
}
超时 -> {
io.println("发生超时")
actor.Stop(process.Normal)
}
}
})
}
GenServer模式为有状态并发进程提供结构化消息处理。
监控器和故障容忍
监控器监控子进程并在失败时重启,实现故障容忍系统。
导入 gleam/otp/supervisor
导入 gleam/erlang/process
// 简单工作者
公共 函数 工作者() -> 结果(process.Subject(消息), actor.启动错误) {
actor.start(0, 函数(消息, 状态) {
情况 消息 {
请求 -> {
io.println("工作者存活")
actor.continue(状态)
}
停止 -> actor.Stop(process.Normal)
_ -> actor.continue(状态)
}
})
}
// 监控器规范
公共 函数 启动监控器() ->
结果(process.Subject(supervisor.消息), supervisor.启动错误) {
supervisor.start(函数(子进程) {
子进程
|> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
|> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
})
}
// 带命名工作者的监控器
公共 类型 工作者名称 {
计数器工作者
缓存工作者
数据库工作者
}
公共 函数 启动命名监控器() ->
结果(process.Subject(supervisor.消息), supervisor.启动错误) {
supervisor.start(函数(子进程) {
子进程
|> supervisor.add(supervisor.worker_spec(
启动: 函数(_) { 启动计数器() },
重启: supervisor.RestartForever,
))
|> supervisor.add(supervisor.worker_spec(
启动: 函数(_) { 启动缓存(100) },
重启: supervisor.RestartForever,
))
})
}
// 监控器树
公共 函数 启动应用() ->
结果(process.Subject(supervisor.消息), supervisor.启动错误) {
supervisor.start(函数(子进程) {
子进程
// 工作者
|> supervisor.add(supervisor.worker(函数(_) { 启动计数器() }))
|> supervisor.add(supervisor.worker(函数(_) { 启动缓存(100) }))
// 子监控器
|> supervisor.add(supervisor.supervisor(函数(子进程) {
子进程
|> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
|> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
}))
})
}
// 自定义重启策略
公共 函数 启动自定义监控器() ->
结果(process.Subject(supervisor.消息), supervisor.启动错误) {
supervisor.start_spec(supervisor.规格(
参数: 空,
最大频率: 5,
频率周期: 60,
初始化: 函数(子进程) {
子进程
|> supervisor.add(supervisor.worker_spec(
启动: 函数(_) { 工作者() },
重启: supervisor.RestartTransient, // 仅异常退出时重启
))
},
))
}
// 一对一 vs 一对多
公共 函数 一对一监控器() {
// 每个子进程独立重启
supervisor.start(函数(子进程) {
子进程
|> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
|> supervisor.add(supervisor.worker(函数(_) { 工作者() }))
})
}
// 动态监控器(运行时添加子进程)
公共 类型 动态消息 {
添加工作者(回复到: process.Subject(结果(process.Pid, 字符串)))
移除工作者(pid: process.Pid)
}
监控器通过进程监控和重启提供自动故障恢复和系统弹性。
进程链接和监控
链接和监控使进程能对相关进程的失败做出反应。
导入 gleam/erlang/process
// 进程链接
公共 函数 链接进程() {
让 parent = process.self()
让 child = process.spawn_link(函数() {
io.println("子进程启动")
process.sleep(1000)
io.println("子进程退出")
})
// 父进程链接到子进程 - 将接收退出信号
io.println("父进程等待...")
process.sleep(2000)
}
// 进程监控
公共 函数 监控进程() {
让 monitored = process.spawn(函数() {
io.println("监控进程启动")
process.sleep(1000)
})
让 monitor = process.monitor_process(monitored)
// 等待关闭消息
让 selector = process.new_selector()
|> process.selecting_process_down(monitor, 函数(down) { down })
情况 process.select(selector, 2000) {
确认(down) -> io.println("进程退出")
错误(_) -> io.println("仍在运行")
}
}
// 捕获退出用于监控
公共 函数 陷阱退出() {
process.trap_exits(真)
让 child = process.spawn_link(函数() {
io.println("子进程启动")
恐慌为 "模拟错误"
})
让 selector = process.new_selector()
|> process.selecting_trapped_exits(函数(exit) { exit })
情况 process.select(selector, 2000) {
确认(exit) -> {
io.println("捕获子进程退出")
// 可在此重启子进程
}
错误(_) -> io.println("未收到退出")
}
}
// 监控多个进程
公共 函数 监控池(工作者列表: 列表(process.Pid)) {
让 monitors = 列表.映射(工作者列表, process.monitor_process)
// 处理任何工作者失败
让 selector = 列表.折叠(monitors, process.new_selector(), 函数(sel, mon) {
process.selecting_process_down(sel, mon, 函数(down) { down })
})
情况 process.select(selector, 10000) {
确认(down) -> {
io.println("工作者失败")
// 重启逻辑在此
}
错误(_) -> io.println("所有工作者健康")
}
}
链接和监控使构建故障容忍系统具备适当失败处理。
最佳实践
-
对有状态进程使用GenServer 以利用OTP模式和标准行为
-
将GenServer包装在监控器树中 以启用自动故障恢复
-
保持进程状态最小 以减少内存使用和简化状态管理
-
使用带回复字段的消息类型 用于同步请求-响应模式
-
在接收操作上设置适当超时 以防止无限阻塞
-
监控外部进程而非链接 当你不想一起崩溃时
-
使用描述性消息类型 而非通用元组
-
处理所有消息类型 在循环中以防止意外消息积累
-
为失败设计 假设进程会崩溃并使用监控器
-
保持进程层次简单 有清晰的父子关系
常见陷阱
-
未处理超时情况 在接收操作中导致进程无限挂起
-
忘记回复 在请求-响应模式中导致客户端超时
-
创建过多进程 无理由增加开销无益
-
不使用监控器 失去actor模型的故障容忍优势
-
在消息处理器中阻塞 阻止处理其他消息导致死锁
-
积累未消费消息 在邮箱中导致内存泄漏
-
错误链接进程 导致意外崩溃传播
-
未设置init_timeout 在actor上导致启动延迟崩溃系统
-
使用共享可变状态 破坏actor模型的隔离优势
-
忽略退出信号 当捕获退出时阻止适当清理
何时使用此技能
应用actor于需要隔离状态和基于消息通信的并发操作。
使用GenServer当实现有状态服务如缓存、连接或工作者时。
利用监控器用于任何应在失败时自动重启的进程。
应用进程监控当一个进程需要对另一个的终止做出反应时。
使用进程池用于跨多个并发工作者分发工作。
构建监控器树用于结构化具有多个组件的复杂应用。