名称: effect-patterns-streams-getting-started 描述: Effect-TS 模式用于流入门。在Effect-TS应用程序中处理流入门时使用。
Effect-TS 模式:流入门
此技能提供了4个精选的Effect-TS模式,用于流入门。 在处理以下相关任务时使用此技能:
- 流入门
- Effect-TS应用程序中的最佳实践
- 实际模式和解决方案
🟢 初学者模式
您的第一个流
规则: 使用流来懒惰且高效地处理数据序列。
好例子:
import { Effect, Stream } from "effect"
// 从显式值创建流
const numbers = Stream.make(1, 2, 3, 4, 5)
// 从数组创建流
const fromArray = Stream.fromIterable([10, 20, 30])
// 创建单值流
const single = Stream.succeed("hello")
// 转换并运行流
const program = numbers.pipe(
Stream.map((n) => n * 2), // 将每个数字加倍
Stream.filter((n) => n > 4), // 只保留大于4的
Stream.runCollect // 收集结果
)
Effect.runPromise(program).then((chunk) => {
console.log([...chunk]) // [6, 8, 10]
})
反模式:
当需要懒惰处理或异步操作时,不要使用常规数组:
// 反模式:急切处理,所有内容都在内存中
const numbers = [1, 2, 3, 4, 5]
const doubled = numbers.map((n) => n * 2)
const filtered = doubled.filter((n) => n > 4)
这会立即将所有内容加载到内存中。使用流的时机:
- 数据量大或可能是无限的
- 数据异步到达
- 需要背压或资源管理
原理:
流是一个懒惰的值序列,可以一次处理一个值。使用 Stream.make、Stream.fromIterable 或 Stream.succeed 创建流。
流是Effect处理数据序列的答案。与一次性将所有值保存在内存中的数组不同,流按需生成值。这使得它们适用于:
- 大数据集 - 处理数百万条记录,无需将所有内容加载到内存中
- 异步数据 - 处理随时间到达的数据(文件、API、事件)
- 可组合管道 - 链式转换,逐元素工作
流 vs 效果 - 何时使用哪个
规则: 使用效果处理单个值,使用流处理值序列。
好例子:
import { Effect, Stream } from "effect"
// ============================================
// 效果:单结果操作
// ============================================
// 获取一个用户 - 返回 Effect<User>
const fetchUser = (id: string) =>
Effect.tryPromise(() =>
fetch(`/api/users/${id}`).then((r) => r.json())
)
// 读取整个配置 - 返回 Effect<Config>
const loadConfig = Effect.tryPromise(() =>
fetch("/config.json").then((r) => r.json())
)
// ============================================
// 流:多值操作
// ============================================
// 逐行处理文件 - 返回 Stream<string>
const fileLines = Stream.fromIterable([
"第一行",
"第二行",
"第三行",
])
// 随时间生成事件 - 返回 Stream<Event>
const events = Stream.make(
{ type: "点击", x: 10 },
{ type: "点击", x: 20 },
{ type: "滚动", y: 100 },
)
// ============================================
// 在它们之间转换
// ============================================
// 效果 → 流(单个值变为单元素流)
const effectToStream = Stream.fromEffect(fetchUser("123"))
// 流 → 效果(收集所有值到数组)
const streamToEffect = Stream.runCollect(fileLines)
// 流 → 效果(为副作用处理每个值)
const processAll = fileLines.pipe(
Stream.runForEach((line) => Effect.log(`处理中: ${line}`))
)
// ============================================
// 决策指南
// ============================================
// 使用效果当:
// - 获取单个资源
// - 计算单个结果
// - 执行一个操作
// 使用流当:
// - 逐行读取文件
// - 处理分页API结果
// - 处理实时事件
// - 处理大数据集
// - 构建数据管道
原理:
当您的操作产生单个结果时,使用 效果。当您的操作随时间产生多个值时,使用 流。
效果和流都是懒惰且可组合的,但它们服务于不同目的:
| 方面 | 效果 | 流 |
|---|---|---|
| 产生 | 一个值 | 零个或多个值 |
| 内存 | 保存一个结果 | 增量处理 |
| 用例 | API调用、数据库查询 | 文件行、事件、批次 |
运行和收集流结果
规则: 根据您从结果中需要的选择正确的 Stream.run* 方法。
好例子:
import { Effect, Stream, Option } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
// ============================================
// runCollect - 将所有结果作为 Chunk 获取
// ============================================
const collectAll = numbers.pipe(
Stream.map((n) => n * 10),
Stream.runCollect
)
Effect.runPromise(collectAll).then((chunk) => {
console.log([...chunk]) // [10, 20, 30, 40, 50]
})
// ============================================
// runForEach - 处理每个项目
// ============================================
const processEach = numbers.pipe(
Stream.runForEach((n) =>
Effect.log(`处理中: ${n}`)
)
)
Effect.runPromise(processEach)
// 日志: 处理中: 1, 处理中: 2, 等等。
// ============================================
// runDrain - 仅运行为副作用
// ============================================
const withSideEffects = numbers.pipe(
Stream.tap((n) => Effect.log(`看到: ${n}`)),
Stream.runDrain // 丢弃值,仅运行
)
// ============================================
// runHead - 仅获取第一个值
// ============================================
const getFirst = numbers.pipe(
Stream.runHead
)
Effect.runPromise(getFirst).then((option) => {
if (Option.isSome(option)) {
console.log(`第一个: ${option.value}`) // 第一个: 1
}
})
// ============================================
// runLast - 仅获取最后一个值
// ============================================
const getLast = numbers.pipe(
Stream.runLast
)
Effect.runPromise(getLast).then((option) => {
if (Option.isSome(option)) {
console.log(`最后一个: ${option.value}`) // 最后一个: 5
}
})
// ============================================
// runFold - 累积为单个结果
// ============================================
const sum = numbers.pipe(
Stream.runFold(0, (acc, n) => acc + n)
)
Effect.runPromise(sum).then((total) => {
console.log(`总和: ${total}`) // 总和: 15
})
// ============================================
// runCount - 计数元素
// ============================================
const count = numbers.pipe(Stream.runCount)
Effect.runPromise(count).then((n) => {
console.log(`计数: ${n}`) // 计数: 5
})
原理:
流是懒惰的 - 直到您运行它们才会发生任何事。根据您需要的选择运行方法:所有结果、每个项目的效果,或仅完成。
效果提供了几种消费流的方式,每种都针对不同的用例进行了优化:
| 方法 | 返回 | 使用时 |
|---|---|---|
| runCollect | Chunk<A> |
需要内存中的所有结果 |
| runForEach | void |
为副作用处理每个项目 |
| runDrain | void |
运行为副作用,忽略值 |
| runHead | Option<A> |
仅需要第一个值 |
| runLast | Option<A> |
仅需要最后一个值 |
| runFold | S |
累积为单个结果 |
获取和丢弃流元素
规则: 使用 take/drop 控制流大小,使用 takeWhile/dropWhile 进行条件限制。
好例子:
import { Effect, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// ============================================
// take - 获取前 N 个元素
// ============================================
const firstThree = numbers.pipe(
Stream.take(3),
Stream.runCollect
)
Effect.runPromise(firstThree).then((chunk) => {
console.log([...chunk]) // [1, 2, 3]
})
// ============================================
// drop - 跳过前 N 个元素
// ============================================
const skipThree = numbers.pipe(
Stream.drop(3),
Stream.runCollect
)
Effect.runPromise(skipThree).then((chunk) => {
console.log([...chunk]) // [4, 5, 6, 7, 8, 9, 10]
})
// ============================================
// 组合用于分页(skip + limit)
// ============================================
const page2 = numbers.pipe(
Stream.drop(3), // 跳过第一页
Stream.take(3), // 获取第二页
Stream.runCollect
)
Effect.runPromise(page2).then((chunk) => {
console.log([...chunk]) // [4, 5, 6]
})
// ============================================
// takeWhile - 获取当条件为真时
// ============================================
const untilFive = numbers.pipe(
Stream.takeWhile((n) => n < 5),
Stream.runCollect
)
Effect.runPromise(untilFive).then((chunk) => {
console.log([...chunk]) // [1, 2, 3, 4]
})
// ============================================
// dropWhile - 跳过当条件为真时
// ============================================
const afterFive = numbers.pipe(
Stream.dropWhile((n) => n < 5),
Stream.runCollect
)
Effect.runPromise(afterFive).then((chunk) => {
console.log([...chunk]) // [5, 6, 7, 8, 9, 10]
})
// ============================================
// takeUntil - 获取直到条件变为真
// ============================================
const untilSix = numbers.pipe(
Stream.takeUntil((n) => n === 6),
Stream.runCollect
)
Effect.runPromise(untilSix).then((chunk) => {
console.log([...chunk]) // [1, 2, 3, 4, 5, 6]
})
// ============================================
// 实际:处理带标题的文件
// ============================================
const fileLines = Stream.make(
"# 标题",
"# 注释",
"数据1",
"数据2",
"数据3"
)
const dataOnly = fileLines.pipe(
Stream.dropWhile((line) => line.startsWith("#")),
Stream.runCollect
)
Effect.runPromise(dataOnly).then((chunk) => {
console.log([...chunk]) // ["数据1", "数据2", "数据3"]
})
原理:
使用 take 限制要处理的元素数量。使用 drop 跳过元素。添加 While 变体进行基于条件的限制。
流可以是无限的或非常大。这些操作符让您:
- 限制处理 - 仅获取您需要的
- 跳过标题 - 丢弃前 N 个元素
- 条件限制 - 基于谓词获取/丢弃
- 分页 - 实现跳过/限制模式