Effect-TS模式:资源管理Skill effect-patterns-resource-management

这个技能提供了 8 个精选的 Effect-TS 资源管理模式,用于 Effect-TS 应用中的资源管理。它涵盖了安全括号资源使用、池化资源、创建服务层等最佳实践,帮助开发者构建健壮、可测试和防泄漏的应用。适用于资源管理、Effect-TS 应用的最佳实践和真实世界模式。关键词:Effect-TS, 资源管理, TypeScript, 函数式编程, 异步, 并发, 防泄漏, 模式, 后端开发。

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

名称: effect-patterns-resource-management 描述: Effect-TS 资源管理模式。在 Effect-TS 应用中处理资源管理时使用。

Effect-TS 模式:资源管理

本技能提供了 8 个精选的 Effect-TS 资源管理模式。 在处理以下任务时使用此技能:

  • 资源管理
  • Effect-TS 应用中的最佳实践
  • 真实世界模式和解决方案

🟢 初级模式

使用 acquireRelease 安全括号资源使用

规则:acquirerelease 效果之间括号资源的使用。

好例子:

import { Effect, Console } from "effect";

// 一个需要管理的模拟资源
const getDbConnection = Effect.sync(() => ({ id: Math.random() })).pipe(
  Effect.tap(() => Effect.log("连接已获取"))
);

const closeDbConnection = (conn: {
  id: number;
}): Effect.Effect<void, never, never> =>
  Effect.log(`连接 ${conn.id} 已释放`);

// 使用资源的程序
const program = Effect.acquireRelease(
  getDbConnection, // 1. 获取
  (connection) => closeDbConnection(connection) // 2. 清理
).pipe(
  Effect.tap((connection) =>
    Effect.log(`使用连接 ${connection.id} 运行查询...`)
  )
);

Effect.runPromise(Effect.scoped(program));

/*
输出:
连接已获取
使用连接 0.12345... 运行查询...
连接 0.12345... 已释放
*/

解释: 通过使用 Effect.acquireReleasecloseDbConnection 逻辑保证在主要逻辑完成后运行。这创建了一个自包含、防泄漏的工作单元,可以安全地组合到更大的程序中。

反模式:

使用带有 async/await 的标准 try...finally 块。虽然它处理成功和失败情况,但不防中断。如果执行 Promise 的 fiber 被 Effect 的结构化并发中断,finally 块不保证运行,导致资源泄漏。

// 反模式:不防中断
async function getUser() {
  const connection = await getDbConnectionPromise(); // 获取
  try {
    return await useConnectionPromise(connection); // 使用
  } finally {
    // 如果 fiber 被中断,此块可能不会运行!
    await closeConnectionPromise(connection); // 释放
  }
}

原理:

将资源的获取、使用和释放包装在 Effect.acquireRelease 调用中。这确保资源的清理逻辑被执行,无论使用逻辑成功、失败还是被中断。

这个模式是 Effect 中资源安全的基础。它提供了一个可组合且防中断的替代标准 try...finally 块。release 效果保证执行,防止资源泄漏,这在复杂的异步应用中很常见,尤其是在涉及并发任务可取消的场景中。


🟡 中级模式

池化资源以重用

规则: 使用 Pool 来管理可跨操作重用的昂贵资源。

好例子:

import { Effect, Pool, Scope, Duration } from "effect"

// ============================================
// 1. 定义可池化的资源
// ============================================

interface DatabaseConnection {
  readonly id: number
  readonly query: (sql: string) => Effect.Effect<unknown[]>
  readonly close: () => Effect.Effect<void>
}

let connectionId = 0

const createConnection = Effect.gen(function* () {
  const id = ++connectionId
  yield* Effect.log(`创建连接 ${id}`)
  
  // 模拟连接设置时间
  yield* Effect.sleep("100 毫秒")
  
  const connection: DatabaseConnection = {
    id,
    query: (sql) => Effect.gen(function* () {
      yield* Effect.log(`[连接 ${id}] 执行: ${sql}`)
      return [{ result: "数据" }]
    }),
    close: () => Effect.gen(function* () {
      yield* Effect.log(`关闭连接 ${id}`)
    }),
  }
  
  return connection
})

// ============================================
// 2. 创建一个池
// ============================================

const makeConnectionPool = Pool.make({
  acquire: createConnection,
  size: 5,  // 最多 5 个连接
})

// ============================================
// 3. 使用池
// ============================================

const runQuery = (pool: Pool.Pool<DatabaseConnection>, sql: string) =>
  Effect.scoped(
    Effect.gen(function* () {
      // 从池中获取一个连接
      const connection = yield* pool.get
      
      // 使用它
      const results = yield* connection.query(sql)
      
      // 当作用域结束时,连接自动返回到池中
      return results
    })
  )

// ============================================
// 4. 并发运行多个查询
// ============================================

const program = Effect.scoped(
  Effect.gen(function* () {
    const pool = yield* makeConnectionPool
    
    yield* Effect.log("开始并发查询...")
    
    // 使用仅 5 个连接运行 10 个查询
    const queries = Array.from({ length: 10 }, (_, i) =>
      runQuery(pool, `SELECT * FROM users WHERE id = ${i}`)
    )
    
    const results = yield* Effect.all(queries, { concurrency: "unbounded" })
    
    yield* Effect.log(`完成 ${results.length} 个查询`)
    return results
  })
)

Effect.runPromise(program)

原理:

使用 Pool 来管理可重用资源的集合。池自动处理获取、释放和生命周期管理。


创建资源是昂贵的:

  1. 数据库连接 - TCP 握手、认证
  2. HTTP 客户端 - 连接设置、TLS 协商
  3. 工作线程 - 生成开销
  4. 文件句柄 - 系统调用

池化跨多个操作分摊此成本。



从受管理的资源创建服务层

规则: 使用 Layer.scoped 向应用上下文提供受管理的资源。

好例子:

import { Effect, Console } from "effect";

// 1. 定义服务接口
interface DatabaseService {
  readonly query: (sql: string) => Effect.Effect<string[], never, never>;
}

// 2. 使用作用域资源管理定义服务实现
class Database extends Effect.Service<DatabaseService>()("Database", {
  // scoped 属性管理资源生命周期
  scoped: Effect.gen(function* () {
    const id = Math.floor(Math.random() * 1000);

    // 获取连接
    yield* Effect.log(`[池 ${id}] 获取`);

    // 设置清理以在作用域关闭时运行
    yield* Effect.addFinalizer(() => Effect.log(`[池 ${id}] 释放`));

    // 返回服务实现
    return {
      query: (sql: string) =>
        Effect.sync(() => [`来自池 ${id} 的 '${sql}' 的结果`]),
    };
  }),
}) {}

// 3. 在程序中使用服务
const program = Effect.gen(function* () {
  const db = yield* Database;
  const users = yield* db.query("SELECT * FROM users");
  yield* Effect.log(`查询成功: ${users[0]}`);
});

// 4. 使用作用域资源管理运行程序
Effect.runPromise(
  Effect.scoped(program).pipe(Effect.provide(Database.Default))
);

/*
输出:
[池 458] 获取
查询成功:来自池 458 的 'SELECT * FROM users' 的结果
[池 458] 释放
*/

解释: Effect.Service 帮助器创建 Database 类,它既作为服务定义又作为其上下文键(Tag)。Database.Live 层将此服务连接到具体、生命周期管理的实现。当 program 请求 Database 服务时,Effect 运行时使用 Live 层运行一次 acquire 效果,缓存结果 DbPool,并注入它。release 效果在程序完成时自动运行。

反模式:

创建和导出资源的全局单例实例。这紧密耦合您的应用到特定实现,使测试困难,并且不保证优雅关闭。

// 反模式:全局单例
export const dbPool = makeDbPoolSync(); // 急切创建,难以测试/模拟

function someBusinessLogic() {
  // 此函数对全局 dbPool 有隐藏依赖
  return dbPool.query("SELECT * FROM products");
}

原理:

使用 class MyService extends Effect.Service(...) 定义一个服务。使用服务类的 scoped 属性实现服务。此属性应该是一个作用域 Effect(通常来自 Effect.acquireRelease),用于构建和释放底层资源。

这个模式是构建健壮、可测试和防泄漏 Effect 应用的关键。它将受管理的资源提升为可以在应用中任何地方使用的一流服务。Effect.Service 帮助器简化了定义服务接口和上下文键。这种方法将您的业务逻辑与具体实现解耦,因为逻辑仅依赖于抽象服务。Layer 声明性地处理资源的整个生命周期,确保它被延迟获取、安全共享和自动释放。


使用 Layer.merge 组合资源生命周期

规则: 使用 Layer.merge 或通过提供一个层给另一个来组合多个作用域层。

好例子:

import { Effect, Layer, Console } from "effect";

// --- 服务 1: 数据库 ---
interface DatabaseOps {
  query: (sql: string) => Effect.Effect<string, never, never>;
}

class Database extends Effect.Service<DatabaseOps>()("Database", {
  sync: () => ({
    query: (sql: string): Effect.Effect<string, never, never> =>
      Effect.sync(() => `数据库说: ${sql}`),
  }),
}) {}

// --- 服务 2: API 客户端 ---
interface ApiClientOps {
  fetch: (path: string) => Effect.Effect<string, never, never>;
}

class ApiClient extends Effect.Service<ApiClientOps>()("ApiClient", {
  sync: () => ({
    fetch: (path: string): Effect.Effect<string, never, never> =>
      Effect.sync(() => `API 说: ${path}`),
  }),
}) {}

// --- 应用层 ---
// 我们将两个独立层合并为一个。
const AppLayer = Layer.merge(Database.Default, ApiClient.Default);

// 此程序使用两个服务,不了解其实现细节。
const program = Effect.gen(function* () {
  const db = yield* Database;
  const api = yield* ApiClient;

  const dbResult = yield* db.query("SELECT *");
  const apiResult = yield* api.fetch("/users");

  yield* Effect.log(dbResult);
  yield* Effect.log(apiResult);
});

// 将组合层提供给程序。
Effect.runPromise(Effect.provide(program, AppLayer));

/*
输出(注意 LIFO 释放顺序):
数据库池已打开
API 客户端会话已开始
数据库说: SELECT *
API 说: /users
API 客户端会话已结束
数据库池已关闭
*/

解释: 我们定义了两个完全独立的服务,DatabaseApiClient,每个都有自己的资源生命周期。通过使用 Layer.merge 组合它们,我们创建了一个单一的 AppLayer。当 program 运行时,Effect 获取两个层的资源。当 program 完成时,Effect 关闭应用的作用域,以获取顺序的相反顺序释放资源(ApiClient 然后 Database),确保干净和可预测的关闭。

反模式:

手动、命令式的启动和关闭脚本。这种方法脆弱且容易出错。开发人员负责维护正确的初始化顺序,更重要的是,关闭的相反顺序。随着应用增长,这变得不可管理。

// 反模式:手动、脆弱且容易出错
async function main() {
  const db = await initDb(); // 获取 1
  const client = await initApiClient(); // 获取 2

  try {
    await doWork(db, client); // 使用
  } finally {
    // 此顺序容易出错!
    await client.close(); // 释放 2
    await db.close(); // 释放 1
  }
}

原理:

使用函数如 Layer.merge 将多个资源管理 Layer 组合成一个应用层。Effect 运行时将自动构建依赖图,以正确顺序获取资源,并以相反顺序释放它们。

这个模式是定义带 Layer 服务的最终回报。它允许真正的模块化。每个服务可以在自己的文件中定义,在其 Live 层中声明自己的资源需求,完全不了解其他服务。

当您组装最终应用层时,Effect 分析依赖:

  1. 获取顺序: 确保资源以正确顺序获取。例如,Logger 层可能在 Database 层之前初始化,后者使用它进行日志记录。
  2. 释放顺序: 保证资源以获取顺序的完全相反顺序释放。这对于防止关闭错误至关重要,例如 UserRepositoryLogger 已关闭后尝试记录最终消息。

这自动化了应用架构中最复杂和容易出错的部分之一。


处理资源超时

规则: 始终在资源获取上设置超时,以防止无限等待。

好例子:

import { Effect, Duration, Scope } from "effect"

// ============================================
// 1. 定义获取缓慢的资源
// ============================================

interface Connection {
  readonly id: string
  readonly query: (sql: string) => Effect.Effect<unknown>
}

const acquireConnection = Effect.gen(function* () {
  yield* Effect.log("尝试连接...")
  
  // 模拟慢连接
  yield* Effect.sleep("2 秒")
  
  const connection: Connection = {
    id: crypto.randomUUID(),
    query: (sql) => Effect.succeed({ rows: [] }),
  }
  
  yield* Effect.log(`已连接: ${connection.id}`)
  return connection
})

const releaseConnection = (conn: Connection) =>
  Effect.log(`已释放: ${conn.id}`)

// ============================================
// 2. 获取超时
// ============================================

const acquireWithTimeout = acquireConnection.pipe(
  Effect.timeout("1 秒"),
  Effect.catchTag("TimeoutException", () =>
    Effect.fail(new Error("连接超时 - 数据库不可达"))
  )
)

// ============================================
// 3. 使用超时
// ============================================

const queryWithTimeout = (conn: Connection, sql: string) =>
  conn.query(sql).pipe(
    Effect.timeout("5 秒"),
    Effect.catchTag("TimeoutException", () =>
      Effect.fail(new Error(`查询超时: ${sql}`))
    )
  )

// ============================================
// 4. 带有超时的完整资源生命周期
// ============================================

const useConnectionWithTimeouts = Effect.acquireRelease(
  acquireWithTimeout,
  releaseConnection
).pipe(
  Effect.flatMap((conn) =>
    Effect.gen(function* () {
      yield* Effect.log("运行查询...")
      
      // 每个查询有自己的超时
      const result1 = yield* queryWithTimeout(conn, "SELECT 1")
      const result2 = yield* queryWithTimeout(conn, "SELECT 2")
      
      return [result1, result2]
    })
  ),
  Effect.scoped
)

// ============================================
// 5. 整个操作超时
// ============================================

const entireOperationWithTimeout = useConnectionWithTimeouts.pipe(
  Effect.timeout("10 秒"),
  Effect.catchTag("TimeoutException", () =>
    Effect.fail(new Error("整个操作超时"))
  )
)

// ============================================
// 6. 用不同场景运行
// ============================================

const program = Effect.gen(function* () {
  yield* Effect.log("=== 测试超时 ===")
  
  const result = yield* entireOperationWithTimeout.pipe(
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.logError(`失败: ${error.message}`)
        return []
      })
    )
  )
  
  yield* Effect.log(`结果: ${JSON.stringify(result)}`)
})

Effect.runPromise(program)

原理:

在资源获取和使用上设置超时,以确保您的应用不会因等待不可用资源而挂起。


资源可能变得不可用:

  1. 网络分区 - 无法到达数据库
  2. 池耗尽 - 所有连接都在使用中
  3. 死锁 - 资源被无限期持有
  4. 慢操作 - 查询时间过长

超时提供安全网。



🟠 高级模式

使用 Scope 手动管理生命周期

规则: 使用 Effect.scopeScope.addFinalizer 对资源清理进行细粒度控制。

好例子:

import { Effect, Console } from "effect";

// 模拟复杂文件操作
const openFile = (path: string) =>
  Effect.succeed({ path, handle: Math.random() }).pipe(
    Effect.tap((f) => Effect.log(`已打开 ${f.path}`))
  );
const createTempFile = (path: string) =>
  Effect.succeed({ path: `${path}.tmp`, handle: Math.random() }).pipe(
    Effect.tap((f) => Effect.log(`已创建临时文件 ${f.path}`))
  );
const closeFile = (file: { path: string }) =>
  Effect.sync(() => Effect.log(`已关闭 ${file.path}`));
const deleteFile = (file: { path: string }) =>
  Effect.sync(() => Effect.log(`已删除 ${file.path}`));

// 此程序获取两个资源(一个文件和一个临时文件)
// 并使用 acquireRelease 确保两者都被正确清理。
const program = Effect.gen(function* () {
  const file = yield* Effect.acquireRelease(openFile("data.csv"), (f) =>
    closeFile(f)
  );

  const tempFile = yield* Effect.acquireRelease(
    createTempFile("data.csv"),
    (f) => deleteFile(f)
  );

  yield* Effect.log("...从临时文件写入数据到主文件...");
});

// 使用作用域运行程序
Effect.runPromise(Effect.scoped(program));

/*
输出(注意 LIFO 清理顺序):
已打开 data.csv
已创建临时文件 data.csv.tmp
...从临时文件写入数据到主文件...
已删除 data.csv.tmp
已关闭 data.csv
*/

解释: Effect.scope 创建一个新的 Scope 并将其提供给 program。在 program 内部,我们访问此 Scope 并使用 addFinalizer 在获取每个资源后立即注册清理操作。当 Effect.scope 完成执行 program 时,它关闭作用域,进而以添加顺序的相反顺序执行所有注册的终结器。

反模式:

尝试使用嵌套的 try...finally 块管理多个相互依赖的资源清理。这导致“末日金字塔”,难以阅读,并且在面对中断时不安全。

// 反模式:嵌套、不安全且难以阅读
async function complexOperation() {
  const file = await openFilePromise(); // 获取 1
  try {
    const tempFile = await createTempFilePromise(); // 获取 2
    try {
      await doWorkPromise(file, tempFile); // 使用
    } finally {
      // 中断时此块可能不会运行!
      await deleteFilePromise(tempFile); // 释放 2
    }
  } finally {
    // 中断时此块也可能不会运行!
    await closeFilePromise(file); // 释放 1
  }
}

原理:

对于复杂场景,其中一个资源的生命周期不适合简单的 acquireRelease 模式,使用 Effect.scope 为终结器创建一个边界。在此边界内,您可以访问 Scope 服务并使用 Scope.addFinalizer 手动注册清理操作。

虽然 Effect.acquireReleaseLayer.scoped 对大多数用例足够,但有时您需要更多控制。此模式在以下情况必不可少:

  1. 单个逻辑操作获取多个需要独立清理的资源。
  2. 您正在构建一个自定义、复杂的 Layer,协调几个依赖资源。
  3. 您需要理解驱动所有 Effect 资源管理的基本机制。

通过直接与 Scope 交互,您在 Effect 的声明性、功能性框架内获得了精确的、命令式风格的控制。添加到作用域的终结器保证在作用域关闭时以后进先出(LIFO)顺序运行。


管理分层资源

规则: 使用嵌套作用域管理具有父子依赖关系的资源。

好例子:

import { Effect, Scope, Exit } from "effect"

// ============================================
// 1. 定义分层资源
// ============================================

interface Database {
  readonly name: string
  readonly createConnection: () => Effect.Effect<Connection, never, Scope.Scope>
}

interface Connection {
  readonly id: string
  readonly database: string
  readonly beginTransaction: () => Effect.Effect<Transaction, never, Scope.Scope>
}

interface Transaction {
  readonly id: string
  readonly connectionId: string
  readonly execute: (sql: string) => Effect.Effect<void>
}

// ============================================
// 2. 使用适当的生命周期创建资源
// ============================================

const makeDatabase = (name: string): Effect.Effect<Database, never, Scope.Scope> =>
  Effect.acquireRelease(
    Effect.gen(function* () {
      yield* Effect.log(`打开数据库: ${name}`)
      
      const db: Database = {
        name,
        createConnection: () => makeConnection(name),
      }
      
      return db
    }),
    (db) => Effect.log(`关闭数据库: ${db.name}`)
  )

const makeConnection = (dbName: string): Effect.Effect<Connection, never, Scope.Scope> =>
  Effect.acquireRelease(
    Effect.gen(function* () {
      const id = `conn-${crypto.randomUUID().slice(0, 8)}`
      yield* Effect.log(`  打开连接: ${id} 到 ${dbName}`)
      
      const conn: Connection = {
        id,
        database: dbName,
        beginTransaction: () => makeTransaction(id),
      }
      
      return conn
    }),
    (conn) => Effect.log(`  关闭连接: ${conn.id}`)
  )

const makeTransaction = (connId: string): Effect.Effect<Transaction, never, Scope.Scope> =>
  Effect.acquireRelease(
    Effect.gen(function* () {
      const id = `tx-${crypto.randomUUID().slice(0, 8)}`
      yield* Effect.log(`    开始事务: ${id}`)
      
      const tx: Transaction = {
        id,
        connectionId: connId,
        execute: (sql) => Effect.log(`      [${id}] ${sql}`),
      }
      
      return tx
    }),
    (tx) => Effect.log(`    提交事务: ${tx.id}`)
  )

// ============================================
// 3. 使用分层资源
// ============================================

const program = Effect.scoped(
  Effect.gen(function* () {
    yield* Effect.log("=== 开始分层资源演示 ===
")
    
    // 第 1 层:数据库
    const db = yield* makeDatabase("myapp")
    
    // 第 2 层:连接(数据库的子项)
    const conn = yield* db.createConnection()
    
    // 第 3 层:事务(连接的子项)
    const tx = yield* conn.beginTransaction()
    
    // 使用事务
    yield* tx.execute("INSERT INTO users (name) VALUES ('Alice')")
    yield* tx.execute("INSERT INTO users (name) VALUES ('Bob')")
    
    yield* Effect.log("
=== 工作完成,释放资源 ===
")
    
    // 资源以相反顺序释放:
    // 1. 事务提交
    // 2. 连接关闭
    // 3. 数据库关闭
  })
)

Effect.runPromise(program)

// ============================================
// 4. 同一级别的多个子项
// ============================================

const multipleConnections = Effect.scoped(
  Effect.gen(function* () {
    const db = yield* makeDatabase("myapp")
    
    // 创建多个连接
    const conn1 = yield* db.createConnection()
    const conn2 = yield* db.createConnection()
    
    // 每个连接可以有事务
    const tx1 = yield* conn1.beginTransaction()
    const tx2 = yield* conn2.beginTransaction()
    
    // 使用两个事务
    yield* Effect.all([
      tx1.execute("UPDATE table1 SET x = 1"),
      tx2.execute("UPDATE table2 SET y = 2"),
    ])
    
    // 所有资源以适当顺序释放
  })
)

原理:

使用嵌套 Scope 管理分层资源,其中子资源依赖于其父项并且必须首先释放。


资源通常有依赖关系:

  1. 数据库 → 连接 → 事务 - 事务需要连接
  2. 服务器 → 路由 → 处理器 - 处理器需要服务器上下文
  3. 文件 → 阅读器 → 解析器 - 解析器需要阅读器

释放顺序很重要:子项在父项之前。



为作用域资源创建受管理的运行时

规则: 为作用域资源创建受管理的运行时。

好例子:

import { Effect, Layer } from "effect";

class DatabasePool extends Effect.Service<DatabasePool>()("DbPool", {
  effect: Effect.gen(function* () {
    yield* Effect.log("获取池");
    return {
      query: () => Effect.succeed("结果"),
    };
  }),
}) {}

// 创建一个使用 DatabasePool 服务的程序
const program = Effect.gen(function* () {
  const db = yield* DatabasePool;
  yield* Effect.log("使用数据库");
  yield* db.query();
});

// 使用服务实现运行程序
Effect.runPromise(
  program.pipe(Effect.provide(DatabasePool.Default), Effect.scoped)
);

解释:
Layer.launch 确保资源被安全获取和释放,即使在错误或中断事件中。

反模式:

不要将 Layer.toRuntime 与包含作用域资源的层一起使用。这将获取资源,但运行时没有释放它的机制,导致资源泄漏。

原理:

对于管理需要显式清理的资源(例如,数据库连接)的服务,在 Layer 中使用 Layer.scoped 定义它们。然后,使用 Layer.launch 将此层提供给您的应用。

Layer.launch 设计用于资源安全。它获取所有资源,将它们提供给您的效果,并且——关键地——保证所有注册的终结器在完成或中断时执行。