名称: effect-patterns-resource-management 描述: Effect-TS 资源管理模式。在 Effect-TS 应用中处理资源管理时使用。
Effect-TS 模式:资源管理
本技能提供了 8 个精选的 Effect-TS 资源管理模式。 在处理以下任务时使用此技能:
- 资源管理
- Effect-TS 应用中的最佳实践
- 真实世界模式和解决方案
🟢 初级模式
使用 acquireRelease 安全括号资源使用
规则: 在 acquire 和 release 效果之间括号资源的使用。
好例子:
import { Effect, Console } from "effect";
// 一个需要管理的模拟资源
const getDbConnection = Effect.sync(() => ({ id: Math.random() })).pipe(
Effect.tap(() => Effect.log("连接已获取"))
);
const closeDbConnection = (conn: {
id: number;
}): Effect.Effect<void, never, never> =>
Effect.log(`连接 ${conn.id} 已释放`);
// 使用资源的程序
const program = Effect.acquireRelease(
getDbConnection, // 1. 获取
(connection) => closeDbConnection(connection) // 2. 清理
).pipe(
Effect.tap((connection) =>
Effect.log(`使用连接 ${connection.id} 运行查询...`)
)
);
Effect.runPromise(Effect.scoped(program));
/*
输出:
连接已获取
使用连接 0.12345... 运行查询...
连接 0.12345... 已释放
*/
解释:
通过使用 Effect.acquireRelease,closeDbConnection 逻辑保证在主要逻辑完成后运行。这创建了一个自包含、防泄漏的工作单元,可以安全地组合到更大的程序中。
反模式:
使用带有 async/await 的标准 try...finally 块。虽然它处理成功和失败情况,但不防中断。如果执行 Promise 的 fiber 被 Effect 的结构化并发中断,finally 块不保证运行,导致资源泄漏。
// 反模式:不防中断
async function getUser() {
const connection = await getDbConnectionPromise(); // 获取
try {
return await useConnectionPromise(connection); // 使用
} finally {
// 如果 fiber 被中断,此块可能不会运行!
await closeConnectionPromise(connection); // 释放
}
}
原理:
将资源的获取、使用和释放包装在 Effect.acquireRelease 调用中。这确保资源的清理逻辑被执行,无论使用逻辑成功、失败还是被中断。
这个模式是 Effect 中资源安全的基础。它提供了一个可组合且防中断的替代标准 try...finally 块。release 效果保证执行,防止资源泄漏,这在复杂的异步应用中很常见,尤其是在涉及并发任务可取消的场景中。
🟡 中级模式
池化资源以重用
规则: 使用 Pool 来管理可跨操作重用的昂贵资源。
好例子:
import { Effect, Pool, Scope, Duration } from "effect"
// ============================================
// 1. 定义可池化的资源
// ============================================
interface DatabaseConnection {
readonly id: number
readonly query: (sql: string) => Effect.Effect<unknown[]>
readonly close: () => Effect.Effect<void>
}
let connectionId = 0
const createConnection = Effect.gen(function* () {
const id = ++connectionId
yield* Effect.log(`创建连接 ${id}`)
// 模拟连接设置时间
yield* Effect.sleep("100 毫秒")
const connection: DatabaseConnection = {
id,
query: (sql) => Effect.gen(function* () {
yield* Effect.log(`[连接 ${id}] 执行: ${sql}`)
return [{ result: "数据" }]
}),
close: () => Effect.gen(function* () {
yield* Effect.log(`关闭连接 ${id}`)
}),
}
return connection
})
// ============================================
// 2. 创建一个池
// ============================================
const makeConnectionPool = Pool.make({
acquire: createConnection,
size: 5, // 最多 5 个连接
})
// ============================================
// 3. 使用池
// ============================================
const runQuery = (pool: Pool.Pool<DatabaseConnection>, sql: string) =>
Effect.scoped(
Effect.gen(function* () {
// 从池中获取一个连接
const connection = yield* pool.get
// 使用它
const results = yield* connection.query(sql)
// 当作用域结束时,连接自动返回到池中
return results
})
)
// ============================================
// 4. 并发运行多个查询
// ============================================
const program = Effect.scoped(
Effect.gen(function* () {
const pool = yield* makeConnectionPool
yield* Effect.log("开始并发查询...")
// 使用仅 5 个连接运行 10 个查询
const queries = Array.from({ length: 10 }, (_, i) =>
runQuery(pool, `SELECT * FROM users WHERE id = ${i}`)
)
const results = yield* Effect.all(queries, { concurrency: "unbounded" })
yield* Effect.log(`完成 ${results.length} 个查询`)
return results
})
)
Effect.runPromise(program)
原理:
使用 Pool 来管理可重用资源的集合。池自动处理获取、释放和生命周期管理。
创建资源是昂贵的:
- 数据库连接 - TCP 握手、认证
- HTTP 客户端 - 连接设置、TLS 协商
- 工作线程 - 生成开销
- 文件句柄 - 系统调用
池化跨多个操作分摊此成本。
从受管理的资源创建服务层
规则: 使用 Layer.scoped 向应用上下文提供受管理的资源。
好例子:
import { Effect, Console } from "effect";
// 1. 定义服务接口
interface DatabaseService {
readonly query: (sql: string) => Effect.Effect<string[], never, never>;
}
// 2. 使用作用域资源管理定义服务实现
class Database extends Effect.Service<DatabaseService>()("Database", {
// scoped 属性管理资源生命周期
scoped: Effect.gen(function* () {
const id = Math.floor(Math.random() * 1000);
// 获取连接
yield* Effect.log(`[池 ${id}] 获取`);
// 设置清理以在作用域关闭时运行
yield* Effect.addFinalizer(() => Effect.log(`[池 ${id}] 释放`));
// 返回服务实现
return {
query: (sql: string) =>
Effect.sync(() => [`来自池 ${id} 的 '${sql}' 的结果`]),
};
}),
}) {}
// 3. 在程序中使用服务
const program = Effect.gen(function* () {
const db = yield* Database;
const users = yield* db.query("SELECT * FROM users");
yield* Effect.log(`查询成功: ${users[0]}`);
});
// 4. 使用作用域资源管理运行程序
Effect.runPromise(
Effect.scoped(program).pipe(Effect.provide(Database.Default))
);
/*
输出:
[池 458] 获取
查询成功:来自池 458 的 'SELECT * FROM users' 的结果
[池 458] 释放
*/
解释:
Effect.Service 帮助器创建 Database 类,它既作为服务定义又作为其上下文键(Tag)。Database.Live 层将此服务连接到具体、生命周期管理的实现。当 program 请求 Database 服务时,Effect 运行时使用 Live 层运行一次 acquire 效果,缓存结果 DbPool,并注入它。release 效果在程序完成时自动运行。
反模式:
创建和导出资源的全局单例实例。这紧密耦合您的应用到特定实现,使测试困难,并且不保证优雅关闭。
// 反模式:全局单例
export const dbPool = makeDbPoolSync(); // 急切创建,难以测试/模拟
function someBusinessLogic() {
// 此函数对全局 dbPool 有隐藏依赖
return dbPool.query("SELECT * FROM products");
}
原理:
使用 class MyService extends Effect.Service(...) 定义一个服务。使用服务类的 scoped 属性实现服务。此属性应该是一个作用域 Effect(通常来自 Effect.acquireRelease),用于构建和释放底层资源。
这个模式是构建健壮、可测试和防泄漏 Effect 应用的关键。它将受管理的资源提升为可以在应用中任何地方使用的一流服务。Effect.Service 帮助器简化了定义服务接口和上下文键。这种方法将您的业务逻辑与具体实现解耦,因为逻辑仅依赖于抽象服务。Layer 声明性地处理资源的整个生命周期,确保它被延迟获取、安全共享和自动释放。
使用 Layer.merge 组合资源生命周期
规则: 使用 Layer.merge 或通过提供一个层给另一个来组合多个作用域层。
好例子:
import { Effect, Layer, Console } from "effect";
// --- 服务 1: 数据库 ---
interface DatabaseOps {
query: (sql: string) => Effect.Effect<string, never, never>;
}
class Database extends Effect.Service<DatabaseOps>()("Database", {
sync: () => ({
query: (sql: string): Effect.Effect<string, never, never> =>
Effect.sync(() => `数据库说: ${sql}`),
}),
}) {}
// --- 服务 2: API 客户端 ---
interface ApiClientOps {
fetch: (path: string) => Effect.Effect<string, never, never>;
}
class ApiClient extends Effect.Service<ApiClientOps>()("ApiClient", {
sync: () => ({
fetch: (path: string): Effect.Effect<string, never, never> =>
Effect.sync(() => `API 说: ${path}`),
}),
}) {}
// --- 应用层 ---
// 我们将两个独立层合并为一个。
const AppLayer = Layer.merge(Database.Default, ApiClient.Default);
// 此程序使用两个服务,不了解其实现细节。
const program = Effect.gen(function* () {
const db = yield* Database;
const api = yield* ApiClient;
const dbResult = yield* db.query("SELECT *");
const apiResult = yield* api.fetch("/users");
yield* Effect.log(dbResult);
yield* Effect.log(apiResult);
});
// 将组合层提供给程序。
Effect.runPromise(Effect.provide(program, AppLayer));
/*
输出(注意 LIFO 释放顺序):
数据库池已打开
API 客户端会话已开始
数据库说: SELECT *
API 说: /users
API 客户端会话已结束
数据库池已关闭
*/
解释:
我们定义了两个完全独立的服务,Database 和 ApiClient,每个都有自己的资源生命周期。通过使用 Layer.merge 组合它们,我们创建了一个单一的 AppLayer。当 program 运行时,Effect 获取两个层的资源。当 program 完成时,Effect 关闭应用的作用域,以获取顺序的相反顺序释放资源(ApiClient 然后 Database),确保干净和可预测的关闭。
反模式:
手动、命令式的启动和关闭脚本。这种方法脆弱且容易出错。开发人员负责维护正确的初始化顺序,更重要的是,关闭的相反顺序。随着应用增长,这变得不可管理。
// 反模式:手动、脆弱且容易出错
async function main() {
const db = await initDb(); // 获取 1
const client = await initApiClient(); // 获取 2
try {
await doWork(db, client); // 使用
} finally {
// 此顺序容易出错!
await client.close(); // 释放 2
await db.close(); // 释放 1
}
}
原理:
使用函数如 Layer.merge 将多个资源管理 Layer 组合成一个应用层。Effect 运行时将自动构建依赖图,以正确顺序获取资源,并以相反顺序释放它们。
这个模式是定义带 Layer 服务的最终回报。它允许真正的模块化。每个服务可以在自己的文件中定义,在其 Live 层中声明自己的资源需求,完全不了解其他服务。
当您组装最终应用层时,Effect 分析依赖:
- 获取顺序: 确保资源以正确顺序获取。例如,
Logger层可能在Database层之前初始化,后者使用它进行日志记录。 - 释放顺序: 保证资源以获取顺序的完全相反顺序释放。这对于防止关闭错误至关重要,例如
UserRepository在Logger已关闭后尝试记录最终消息。
这自动化了应用架构中最复杂和容易出错的部分之一。
处理资源超时
规则: 始终在资源获取上设置超时,以防止无限等待。
好例子:
import { Effect, Duration, Scope } from "effect"
// ============================================
// 1. 定义获取缓慢的资源
// ============================================
interface Connection {
readonly id: string
readonly query: (sql: string) => Effect.Effect<unknown>
}
const acquireConnection = Effect.gen(function* () {
yield* Effect.log("尝试连接...")
// 模拟慢连接
yield* Effect.sleep("2 秒")
const connection: Connection = {
id: crypto.randomUUID(),
query: (sql) => Effect.succeed({ rows: [] }),
}
yield* Effect.log(`已连接: ${connection.id}`)
return connection
})
const releaseConnection = (conn: Connection) =>
Effect.log(`已释放: ${conn.id}`)
// ============================================
// 2. 获取超时
// ============================================
const acquireWithTimeout = acquireConnection.pipe(
Effect.timeout("1 秒"),
Effect.catchTag("TimeoutException", () =>
Effect.fail(new Error("连接超时 - 数据库不可达"))
)
)
// ============================================
// 3. 使用超时
// ============================================
const queryWithTimeout = (conn: Connection, sql: string) =>
conn.query(sql).pipe(
Effect.timeout("5 秒"),
Effect.catchTag("TimeoutException", () =>
Effect.fail(new Error(`查询超时: ${sql}`))
)
)
// ============================================
// 4. 带有超时的完整资源生命周期
// ============================================
const useConnectionWithTimeouts = Effect.acquireRelease(
acquireWithTimeout,
releaseConnection
).pipe(
Effect.flatMap((conn) =>
Effect.gen(function* () {
yield* Effect.log("运行查询...")
// 每个查询有自己的超时
const result1 = yield* queryWithTimeout(conn, "SELECT 1")
const result2 = yield* queryWithTimeout(conn, "SELECT 2")
return [result1, result2]
})
),
Effect.scoped
)
// ============================================
// 5. 整个操作超时
// ============================================
const entireOperationWithTimeout = useConnectionWithTimeouts.pipe(
Effect.timeout("10 秒"),
Effect.catchTag("TimeoutException", () =>
Effect.fail(new Error("整个操作超时"))
)
)
// ============================================
// 6. 用不同场景运行
// ============================================
const program = Effect.gen(function* () {
yield* Effect.log("=== 测试超时 ===")
const result = yield* entireOperationWithTimeout.pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.logError(`失败: ${error.message}`)
return []
})
)
)
yield* Effect.log(`结果: ${JSON.stringify(result)}`)
})
Effect.runPromise(program)
原理:
在资源获取和使用上设置超时,以确保您的应用不会因等待不可用资源而挂起。
资源可能变得不可用:
- 网络分区 - 无法到达数据库
- 池耗尽 - 所有连接都在使用中
- 死锁 - 资源被无限期持有
- 慢操作 - 查询时间过长
超时提供安全网。
🟠 高级模式
使用 Scope 手动管理生命周期
规则: 使用 Effect.scope 和 Scope.addFinalizer 对资源清理进行细粒度控制。
好例子:
import { Effect, Console } from "effect";
// 模拟复杂文件操作
const openFile = (path: string) =>
Effect.succeed({ path, handle: Math.random() }).pipe(
Effect.tap((f) => Effect.log(`已打开 ${f.path}`))
);
const createTempFile = (path: string) =>
Effect.succeed({ path: `${path}.tmp`, handle: Math.random() }).pipe(
Effect.tap((f) => Effect.log(`已创建临时文件 ${f.path}`))
);
const closeFile = (file: { path: string }) =>
Effect.sync(() => Effect.log(`已关闭 ${file.path}`));
const deleteFile = (file: { path: string }) =>
Effect.sync(() => Effect.log(`已删除 ${file.path}`));
// 此程序获取两个资源(一个文件和一个临时文件)
// 并使用 acquireRelease 确保两者都被正确清理。
const program = Effect.gen(function* () {
const file = yield* Effect.acquireRelease(openFile("data.csv"), (f) =>
closeFile(f)
);
const tempFile = yield* Effect.acquireRelease(
createTempFile("data.csv"),
(f) => deleteFile(f)
);
yield* Effect.log("...从临时文件写入数据到主文件...");
});
// 使用作用域运行程序
Effect.runPromise(Effect.scoped(program));
/*
输出(注意 LIFO 清理顺序):
已打开 data.csv
已创建临时文件 data.csv.tmp
...从临时文件写入数据到主文件...
已删除 data.csv.tmp
已关闭 data.csv
*/
解释:
Effect.scope 创建一个新的 Scope 并将其提供给 program。在 program 内部,我们访问此 Scope 并使用 addFinalizer 在获取每个资源后立即注册清理操作。当 Effect.scope 完成执行 program 时,它关闭作用域,进而以添加顺序的相反顺序执行所有注册的终结器。
反模式:
尝试使用嵌套的 try...finally 块管理多个相互依赖的资源清理。这导致“末日金字塔”,难以阅读,并且在面对中断时不安全。
// 反模式:嵌套、不安全且难以阅读
async function complexOperation() {
const file = await openFilePromise(); // 获取 1
try {
const tempFile = await createTempFilePromise(); // 获取 2
try {
await doWorkPromise(file, tempFile); // 使用
} finally {
// 中断时此块可能不会运行!
await deleteFilePromise(tempFile); // 释放 2
}
} finally {
// 中断时此块也可能不会运行!
await closeFilePromise(file); // 释放 1
}
}
原理:
对于复杂场景,其中一个资源的生命周期不适合简单的 acquireRelease 模式,使用 Effect.scope 为终结器创建一个边界。在此边界内,您可以访问 Scope 服务并使用 Scope.addFinalizer 手动注册清理操作。
虽然 Effect.acquireRelease 和 Layer.scoped 对大多数用例足够,但有时您需要更多控制。此模式在以下情况必不可少:
- 单个逻辑操作获取多个需要独立清理的资源。
- 您正在构建一个自定义、复杂的
Layer,协调几个依赖资源。 - 您需要理解驱动所有 Effect 资源管理的基本机制。
通过直接与 Scope 交互,您在 Effect 的声明性、功能性框架内获得了精确的、命令式风格的控制。添加到作用域的终结器保证在作用域关闭时以后进先出(LIFO)顺序运行。
管理分层资源
规则: 使用嵌套作用域管理具有父子依赖关系的资源。
好例子:
import { Effect, Scope, Exit } from "effect"
// ============================================
// 1. 定义分层资源
// ============================================
interface Database {
readonly name: string
readonly createConnection: () => Effect.Effect<Connection, never, Scope.Scope>
}
interface Connection {
readonly id: string
readonly database: string
readonly beginTransaction: () => Effect.Effect<Transaction, never, Scope.Scope>
}
interface Transaction {
readonly id: string
readonly connectionId: string
readonly execute: (sql: string) => Effect.Effect<void>
}
// ============================================
// 2. 使用适当的生命周期创建资源
// ============================================
const makeDatabase = (name: string): Effect.Effect<Database, never, Scope.Scope> =>
Effect.acquireRelease(
Effect.gen(function* () {
yield* Effect.log(`打开数据库: ${name}`)
const db: Database = {
name,
createConnection: () => makeConnection(name),
}
return db
}),
(db) => Effect.log(`关闭数据库: ${db.name}`)
)
const makeConnection = (dbName: string): Effect.Effect<Connection, never, Scope.Scope> =>
Effect.acquireRelease(
Effect.gen(function* () {
const id = `conn-${crypto.randomUUID().slice(0, 8)}`
yield* Effect.log(` 打开连接: ${id} 到 ${dbName}`)
const conn: Connection = {
id,
database: dbName,
beginTransaction: () => makeTransaction(id),
}
return conn
}),
(conn) => Effect.log(` 关闭连接: ${conn.id}`)
)
const makeTransaction = (connId: string): Effect.Effect<Transaction, never, Scope.Scope> =>
Effect.acquireRelease(
Effect.gen(function* () {
const id = `tx-${crypto.randomUUID().slice(0, 8)}`
yield* Effect.log(` 开始事务: ${id}`)
const tx: Transaction = {
id,
connectionId: connId,
execute: (sql) => Effect.log(` [${id}] ${sql}`),
}
return tx
}),
(tx) => Effect.log(` 提交事务: ${tx.id}`)
)
// ============================================
// 3. 使用分层资源
// ============================================
const program = Effect.scoped(
Effect.gen(function* () {
yield* Effect.log("=== 开始分层资源演示 ===
")
// 第 1 层:数据库
const db = yield* makeDatabase("myapp")
// 第 2 层:连接(数据库的子项)
const conn = yield* db.createConnection()
// 第 3 层:事务(连接的子项)
const tx = yield* conn.beginTransaction()
// 使用事务
yield* tx.execute("INSERT INTO users (name) VALUES ('Alice')")
yield* tx.execute("INSERT INTO users (name) VALUES ('Bob')")
yield* Effect.log("
=== 工作完成,释放资源 ===
")
// 资源以相反顺序释放:
// 1. 事务提交
// 2. 连接关闭
// 3. 数据库关闭
})
)
Effect.runPromise(program)
// ============================================
// 4. 同一级别的多个子项
// ============================================
const multipleConnections = Effect.scoped(
Effect.gen(function* () {
const db = yield* makeDatabase("myapp")
// 创建多个连接
const conn1 = yield* db.createConnection()
const conn2 = yield* db.createConnection()
// 每个连接可以有事务
const tx1 = yield* conn1.beginTransaction()
const tx2 = yield* conn2.beginTransaction()
// 使用两个事务
yield* Effect.all([
tx1.execute("UPDATE table1 SET x = 1"),
tx2.execute("UPDATE table2 SET y = 2"),
])
// 所有资源以适当顺序释放
})
)
原理:
使用嵌套 Scope 管理分层资源,其中子资源依赖于其父项并且必须首先释放。
资源通常有依赖关系:
- 数据库 → 连接 → 事务 - 事务需要连接
- 服务器 → 路由 → 处理器 - 处理器需要服务器上下文
- 文件 → 阅读器 → 解析器 - 解析器需要阅读器
释放顺序很重要:子项在父项之前。
为作用域资源创建受管理的运行时
规则: 为作用域资源创建受管理的运行时。
好例子:
import { Effect, Layer } from "effect";
class DatabasePool extends Effect.Service<DatabasePool>()("DbPool", {
effect: Effect.gen(function* () {
yield* Effect.log("获取池");
return {
query: () => Effect.succeed("结果"),
};
}),
}) {}
// 创建一个使用 DatabasePool 服务的程序
const program = Effect.gen(function* () {
const db = yield* DatabasePool;
yield* Effect.log("使用数据库");
yield* db.query();
});
// 使用服务实现运行程序
Effect.runPromise(
program.pipe(Effect.provide(DatabasePool.Default), Effect.scoped)
);
解释:
Layer.launch 确保资源被安全获取和释放,即使在错误或中断事件中。
反模式:
不要将 Layer.toRuntime 与包含作用域资源的层一起使用。这将获取资源,但运行时没有释放它的机制,导致资源泄漏。
原理:
对于管理需要显式清理的资源(例如,数据库连接)的服务,在 Layer 中使用 Layer.scoped 定义它们。然后,使用 Layer.launch 将此层提供给您的应用。
Layer.launch 设计用于资源安全。它获取所有资源,将它们提供给您的效果,并且——关键地——保证所有注册的终结器在完成或中断时执行。