Effect-TS流模式入门Skill effect-patterns-streams-getting-started

这个技能提供了Effect-TS中流处理的入门模式,帮助开发者高效处理数据序列、异步操作和大数据集,适用于函数式编程和TypeScript开发。关键词:Effect-TS, 流处理, TypeScript, 函数式编程, 异步数据流, 数据管道。

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

名称: effect-patterns-streams-getting-started 描述: Effect-TS 模式用于流入门。在Effect-TS应用程序中处理流入门时使用。

Effect-TS 模式:流入门

此技能提供了4个精选的Effect-TS模式,用于流入门。 在处理以下相关任务时使用此技能:

  • 流入门
  • Effect-TS应用程序中的最佳实践
  • 实际模式和解决方案

🟢 初学者模式

您的第一个流

规则: 使用流来懒惰且高效地处理数据序列。

好例子:

import { Effect, Stream } from "effect"

// 从显式值创建流
const numbers = Stream.make(1, 2, 3, 4, 5)

// 从数组创建流
const fromArray = Stream.fromIterable([10, 20, 30])

// 创建单值流
const single = Stream.succeed("hello")

// 转换并运行流
const program = numbers.pipe(
  Stream.map((n) => n * 2),           // 将每个数字加倍
  Stream.filter((n) => n > 4),        // 只保留大于4的
  Stream.runCollect                    // 收集结果
)

Effect.runPromise(program).then((chunk) => {
  console.log([...chunk])  // [6, 8, 10]
})

反模式:

当需要懒惰处理或异步操作时,不要使用常规数组:

// 反模式:急切处理,所有内容都在内存中
const numbers = [1, 2, 3, 4, 5]
const doubled = numbers.map((n) => n * 2)
const filtered = doubled.filter((n) => n > 4)

这会立即将所有内容加载到内存中。使用流的时机:

  • 数据量大或可能是无限的
  • 数据异步到达
  • 需要背压或资源管理

原理:

流是一个懒惰的值序列,可以一次处理一个值。使用 Stream.makeStream.fromIterableStream.succeed 创建流。


流是Effect处理数据序列的答案。与一次性将所有值保存在内存中的数组不同,流按需生成值。这使得它们适用于:

  1. 大数据集 - 处理数百万条记录,无需将所有内容加载到内存中
  2. 异步数据 - 处理随时间到达的数据(文件、API、事件)
  3. 可组合管道 - 链式转换,逐元素工作


流 vs 效果 - 何时使用哪个

规则: 使用效果处理单个值,使用流处理值序列。

好例子:

import { Effect, Stream } from "effect"

// ============================================
// 效果:单结果操作
// ============================================

// 获取一个用户 - 返回 Effect<User>
const fetchUser = (id: string) =>
  Effect.tryPromise(() =>
    fetch(`/api/users/${id}`).then((r) => r.json())
  )

// 读取整个配置 - 返回 Effect<Config>
const loadConfig = Effect.tryPromise(() =>
  fetch("/config.json").then((r) => r.json())
)

// ============================================
// 流:多值操作
// ============================================

// 逐行处理文件 - 返回 Stream<string>
const fileLines = Stream.fromIterable([
  "第一行",
  "第二行",
  "第三行",
])

// 随时间生成事件 - 返回 Stream<Event>
const events = Stream.make(
  { type: "点击", x: 10 },
  { type: "点击", x: 20 },
  { type: "滚动", y: 100 },
)

// ============================================
// 在它们之间转换
// ============================================

// 效果 → 流(单个值变为单元素流)
const effectToStream = Stream.fromEffect(fetchUser("123"))

// 流 → 效果(收集所有值到数组)
const streamToEffect = Stream.runCollect(fileLines)

// 流 → 效果(为副作用处理每个值)
const processAll = fileLines.pipe(
  Stream.runForEach((line) => Effect.log(`处理中: ${line}`))
)

// ============================================
// 决策指南
// ============================================

// 使用效果当:
// - 获取单个资源
// - 计算单个结果
// - 执行一个操作

// 使用流当:
// - 逐行读取文件
// - 处理分页API结果
// - 处理实时事件
// - 处理大数据集
// - 构建数据管道

原理:

当您的操作产生单个结果时,使用 效果。当您的操作随时间产生多个值时,使用


效果和流都是懒惰且可组合的,但它们服务于不同目的:

方面 效果
产生 一个值 零个或多个值
内存 保存一个结果 增量处理
用例 API调用、数据库查询 文件行、事件、批次


运行和收集流结果

规则: 根据您从结果中需要的选择正确的 Stream.run* 方法。

好例子:

import { Effect, Stream, Option } from "effect"

const numbers = Stream.make(1, 2, 3, 4, 5)

// ============================================
// runCollect - 将所有结果作为 Chunk 获取
// ============================================

const collectAll = numbers.pipe(
  Stream.map((n) => n * 10),
  Stream.runCollect
)

Effect.runPromise(collectAll).then((chunk) => {
  console.log([...chunk])  // [10, 20, 30, 40, 50]
})

// ============================================
// runForEach - 处理每个项目
// ============================================

const processEach = numbers.pipe(
  Stream.runForEach((n) =>
    Effect.log(`处理中: ${n}`)
  )
)

Effect.runPromise(processEach)
// 日志: 处理中: 1, 处理中: 2, 等等。

// ============================================
// runDrain - 仅运行为副作用
// ============================================

const withSideEffects = numbers.pipe(
  Stream.tap((n) => Effect.log(`看到: ${n}`)),
  Stream.runDrain  // 丢弃值,仅运行
)

// ============================================
// runHead - 仅获取第一个值
// ============================================

const getFirst = numbers.pipe(
  Stream.runHead
)

Effect.runPromise(getFirst).then((option) => {
  if (Option.isSome(option)) {
    console.log(`第一个: ${option.value}`)  // 第一个: 1
  }
})

// ============================================
// runLast - 仅获取最后一个值
// ============================================

const getLast = numbers.pipe(
  Stream.runLast
)

Effect.runPromise(getLast).then((option) => {
  if (Option.isSome(option)) {
    console.log(`最后一个: ${option.value}`)  // 最后一个: 5
  }
})

// ============================================
// runFold - 累积为单个结果
// ============================================

const sum = numbers.pipe(
  Stream.runFold(0, (acc, n) => acc + n)
)

Effect.runPromise(sum).then((total) => {
  console.log(`总和: ${total}`)  // 总和: 15
})

// ============================================
// runCount - 计数元素
// ============================================

const count = numbers.pipe(Stream.runCount)

Effect.runPromise(count).then((n) => {
  console.log(`计数: ${n}`)  // 计数: 5
})

原理:

流是懒惰的 - 直到您运行它们才会发生任何事。根据您需要的选择运行方法:所有结果、每个项目的效果,或仅完成。


效果提供了几种消费流的方式,每种都针对不同的用例进行了优化:

方法 返回 使用时
runCollect Chunk<A> 需要内存中的所有结果
runForEach void 为副作用处理每个项目
runDrain void 运行为副作用,忽略值
runHead Option<A> 仅需要第一个值
runLast Option<A> 仅需要最后一个值
runFold S 累积为单个结果


获取和丢弃流元素

规则: 使用 take/drop 控制流大小,使用 takeWhile/dropWhile 进行条件限制。

好例子:

import { Effect, Stream } from "effect"

const numbers = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// ============================================
// take - 获取前 N 个元素
// ============================================

const firstThree = numbers.pipe(
  Stream.take(3),
  Stream.runCollect
)

Effect.runPromise(firstThree).then((chunk) => {
  console.log([...chunk])  // [1, 2, 3]
})

// ============================================
// drop - 跳过前 N 个元素
// ============================================

const skipThree = numbers.pipe(
  Stream.drop(3),
  Stream.runCollect
)

Effect.runPromise(skipThree).then((chunk) => {
  console.log([...chunk])  // [4, 5, 6, 7, 8, 9, 10]
})

// ============================================
// 组合用于分页(skip + limit)
// ============================================

const page2 = numbers.pipe(
  Stream.drop(3),   // 跳过第一页
  Stream.take(3),   // 获取第二页
  Stream.runCollect
)

Effect.runPromise(page2).then((chunk) => {
  console.log([...chunk])  // [4, 5, 6]
})

// ============================================
// takeWhile - 获取当条件为真时
// ============================================

const untilFive = numbers.pipe(
  Stream.takeWhile((n) => n < 5),
  Stream.runCollect
)

Effect.runPromise(untilFive).then((chunk) => {
  console.log([...chunk])  // [1, 2, 3, 4]
})

// ============================================
// dropWhile - 跳过当条件为真时
// ============================================

const afterFive = numbers.pipe(
  Stream.dropWhile((n) => n < 5),
  Stream.runCollect
)

Effect.runPromise(afterFive).then((chunk) => {
  console.log([...chunk])  // [5, 6, 7, 8, 9, 10]
})

// ============================================
// takeUntil - 获取直到条件变为真
// ============================================

const untilSix = numbers.pipe(
  Stream.takeUntil((n) => n === 6),
  Stream.runCollect
)

Effect.runPromise(untilSix).then((chunk) => {
  console.log([...chunk])  // [1, 2, 3, 4, 5, 6]
})

// ============================================
// 实际:处理带标题的文件
// ============================================

const fileLines = Stream.make(
  "# 标题",
  "# 注释",
  "数据1",
  "数据2",
  "数据3"
)

const dataOnly = fileLines.pipe(
  Stream.dropWhile((line) => line.startsWith("#")),
  Stream.runCollect
)

Effect.runPromise(dataOnly).then((chunk) => {
  console.log([...chunk])  // ["数据1", "数据2", "数据3"]
})

原理:

使用 take 限制要处理的元素数量。使用 drop 跳过元素。添加 While 变体进行基于条件的限制。


流可以是无限的或非常大。这些操作符让您:

  1. 限制处理 - 仅获取您需要的
  2. 跳过标题 - 丢弃前 N 个元素
  3. 条件限制 - 基于谓词获取/丢弃
  4. 分页 - 实现跳过/限制模式