name: effect-patterns-streams description: Effect-TS stream patterns. Use when working with streams in Effect-TS applications.
Effect-TS Stream Patterns
This skill provides 8 curated Effect-TS stream patterns. Use it when working on tasks involving:
- Streams
- Best practices in Effect-TS applications
- Real-world patterns and solutions
🟢 Beginner Patterns
Stream Pattern 1: Transforming Streams with Map and Filter
Rule: Use the map and filter combinators to transform stream elements declaratively, building pipelines that reshape data without materializing intermediate results.
Good example:
This example demonstrates transforming a stream of raw data through multiple stages.
import { Stream, Effect, Chunk } from "effect";
interface RawLogEntry {
readonly timestamp: string;
readonly level: string;
readonly message: string;
}
interface ProcessedLog {
readonly date: Date;
readonly severity: "low" | "medium" | "high";
readonly normalizedMessage: string;
}
// Create a stream of raw log entries
const createLogStream = (): Stream.Stream<RawLogEntry> =>
Stream.fromIterable([
{ timestamp: "2025-12-17T09:00:00Z", level: "DEBUG", message: "App starting" },
{ timestamp: "2025-12-17T09:01:00Z", level: "INFO", message: "Connected to DB" },
{ timestamp: "2025-12-17T09:02:00Z", level: "ERROR", message: "Query timeout" },
{ timestamp: "2025-12-17T09:03:00Z", level: "DEBUG", message: "Retry initiated" },
{ timestamp: "2025-12-17T09:04:00Z", level: "WARN", message: "Connection degraded" },
{ timestamp: "2025-12-17T09:05:00Z", level: "INFO", message: "Recovered" },
]);
// Transform: parse the timestamp
const parseTimestamp = (entry: RawLogEntry): RawLogEntry => ({
...entry,
timestamp: entry.timestamp, // already in ISO format, but could be parsed here
});
// Transform: map the log level to a severity
const mapSeverity = (level: string): "low" | "medium" | "high" => {
if (level === "DEBUG" || level === "INFO") return "low";
if (level === "WARN") return "medium";
return "high";
};
// Transform: normalize the message
const normalizeMessage = (message: string): string =>
message.toLowerCase().trim();
// Filter: keep only important logs
const isImportant = (entry: RawLogEntry): boolean => {
return entry.level !== "DEBUG";
};
// Main pipeline
const program = Effect.gen(function* () {
console.log(`
[stream] Processing log stream with map/filter
`);
// Create and transform the stream
const transformedStream = createLogStream().pipe(
// Filter: keep only non-debug logs
Stream.filter((entry) => {
const important = isImportant(entry);
console.log(
`[filter] ${entry.level} → ${important ? "✓ kept" : "✗ dropped"}`
);
return important;
}),
// Map: extract the date
Stream.map((entry) => {
const date = new Date(entry.timestamp);
console.log(`[map-1] Parsed date: ${date.toISOString()}`);
return { ...entry, parsedDate: date };
}),
// Map: normalize the message and map the severity
Stream.map((entry) => {
const processed: ProcessedLog = {
date: entry.parsedDate,
severity: mapSeverity(entry.level),
normalizedMessage: normalizeMessage(entry.message),
};
console.log(
`[map-2] Transformed: ${entry.level} → ${processed.severity}`
);
return processed;
})
);
// Collect all transformed logs
const results = yield* transformedStream.pipe(
Stream.runCollect
);
console.log(`
[results]`);
console.log(` Total logs: ${results.length}`);
Chunk.forEach(results, (log) => {
console.log(
` - [${log.severity.toUpperCase()}] ${log.date.toISOString()}: ${log.normalizedMessage}`
);
});
});
Effect.runPromise(program);
The output shows lazy evaluation and filtering (4 of the 6 entries survive; the 2 DEBUG entries are dropped):
[stream] Processing log stream with map/filter
[filter] DEBUG → ✗ dropped
[filter] INFO → ✓ kept
[map-1] Parsed date: 2025-12-17T09:01:00.000Z
[map-2] Transformed: INFO → low
[filter] ERROR → ✓ kept
[map-1] Parsed date: 2025-12-17T09:02:00.000Z
[map-2] Transformed: ERROR → high
...
[results]
Total logs: 4
- [LOW] 2025-12-17T09:01:00.000Z: connected to db
- [HIGH] 2025-12-17T09:02:00.000Z: query timeout
...
Why:
Transform streams with Stream.map and Stream.filter:
- map: transforms each element (1→1)
- filter: keeps elements matching a predicate (N→M, dropping some)
- chaining: composes multiple operations
- laziness: elements are transformed on demand (no buffering)
Pattern: stream.pipe(Stream.map(...), Stream.filter(...))
Transforming stream data without map/filter causes problems:
- Buffering: all data must be collected before transforming
- Verbose code: a manual loop for every transformation
- Memory usage: large intermediate arrays
- Composability: operations are hard to chain
Map/filter provide:
- Lazy evaluation: transform on demand
- Composability: operations chain naturally
- Memory efficiency: no intermediate collections
- Expressiveness: intent is stated declaratively
Real-world example: processing logs
- Without map/filter: collect the logs, filter by level, map to objects, transform fields
- With map/filter:
logStream.pipe(Stream.filter(...), Stream.map(...))
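The lazy, per-element flow described above can be sketched without Effect at all, using plain generators. The helper names (`filterIter`, `mapIter`) are illustrative, not part of the Effect API.

```typescript
interface RawLog {
  level: string;
  message: string;
}

// Lazily filter an iterable: elements are pulled one at a time
function* filterIter<A>(it: Iterable<A>, pred: (a: A) => boolean): Generator<A> {
  for (const a of it) if (pred(a)) yield a;
}

// Lazily map an iterable: no intermediate array is materialized
function* mapIter<A, B>(it: Iterable<A>, f: (a: A) => B): Generator<B> {
  for (const a of it) yield f(a);
}

const logs: RawLog[] = [
  { level: "DEBUG", message: "App starting" },
  { level: "ERROR", message: "  Query Timeout " },
];

// Each element flows through filter, then map, before the next is pulled
const pipeline = mapIter(
  filterIter(logs, (l) => l.level !== "DEBUG"),
  (l) => l.message.toLowerCase().trim()
);
const result = [...pipeline];
console.log(result); // ["query timeout"]
```

Only when the pipeline is consumed does any element move; this is the same pull-based behavior the Stream.filter/Stream.map logs above demonstrate.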
🟡 Intermediate Patterns
Stream Pattern 2: Merging and Combining Multiple Streams
Rule: Combine multiple streams with the merge and concat combinators to aggregate data from several independent sources.
Good example:
This example demonstrates merging several event streams into one unified stream.
import { Stream, Effect, Chunk } from "effect";
interface Event {
readonly source: string;
readonly type: string;
readonly data: string;
readonly timestamp: Date;
}
// Create independent event streams from different sources
const createUserEventStream = (): Stream.Stream<Event> =>
Stream.fromIterable([
{ source: "user-service", type: "login", data: "user-123", timestamp: new Date(Date.now() + 0) },
{ source: "user-service", type: "logout", data: "user-123", timestamp: new Date(Date.now() + 500) },
]).pipe(
Stream.tap(() => Effect.sleep("500 millis"))
);
const createPaymentEventStream = (): Stream.Stream<Event> =>
Stream.fromIterable([
{ source: "payment-service", type: "payment-started", data: "order-456", timestamp: new Date(Date.now() + 200) },
{ source: "payment-service", type: "payment-completed", data: "order-456", timestamp: new Date(Date.now() + 800) },
]).pipe(
Stream.tap(() => Effect.sleep("600 millis"))
);
const createAuditEventStream = (): Stream.Stream<Event> =>
Stream.fromIterable([
{ source: "audit-log", type: "access-granted", data: "resource-789", timestamp: new Date(Date.now() + 100) },
{ source: "audit-log", type: "access-revoked", data: "resource-789", timestamp: new Date(Date.now() + 900) },
]).pipe(
Stream.tap(() => Effect.sleep("800 millis"))
);
// Merge streams (interleaved, unordered).
// Stream.merge combines two streams, so chain it for three or more
// (or use Stream.mergeAll on a collection).
const mergedEventStream = (): Stream.Stream<Event> => {
const userStream = createUserEventStream();
const paymentStream = createPaymentEventStream();
const auditStream = createAuditEventStream();
return userStream.pipe(
Stream.merge(paymentStream),
Stream.merge(auditStream)
);
};
// Concatenate streams (sequential, ordered)
const concatenatedEventStream = (): Stream.Stream<Event> => {
return createUserEventStream().pipe(
Stream.concat(createPaymentEventStream()),
Stream.concat(createAuditEventStream())
);
};
// Main program: compare merge vs concat
const program = Effect.gen(function* () {
console.log(`
[merge] Interleaving events from multiple sources:
`);
// Collect the merged stream
const mergedEvents = yield* mergedEventStream().pipe(
Stream.runCollect
);
Array.from(mergedEvents).forEach((event, idx) => {
console.log(
` ${idx + 1}. [${event.source}] ${event.type}: ${event.data}`
);
});
console.log(`
[concat] Sequential events (user → payment → audit):
`);
// Collect the concatenated stream
const concatEvents = yield* concatenatedEventStream().pipe(
Stream.runCollect
);
Array.from(concatEvents).forEach((event, idx) => {
console.log(
` ${idx + 1}. [${event.source}] ${event.type}: ${event.data}`
);
});
});
Effect.runPromise(program);
The output contrasts merge's interleaving with concat's strict sequencing:
[merge] Interleaving events from multiple sources:
1. [audit-log] access-granted: resource-789
2. [user-service] login: user-123
3. [payment-service] payment-started: order-456
4. [user-service] logout: user-123
5. [payment-service] payment-completed: order-456
6. [audit-log] access-revoked: resource-789
[concat] Sequential events (user → payment → audit):
1. [user-service] login: user-123
2. [user-service] logout: user-123
3. [payment-service] payment-started: order-456
4. [payment-service] payment-completed: order-456
5. [audit-log] access-granted: resource-789
6. [audit-log] access-revoked: resource-789
Why:
Combine multiple streams with:
- merge: interleaves elements from multiple streams (unordered)
- concat: chains streams sequentially (ordered; waits for the first to complete)
- mergeAll: merges a collection of streams
- zip: pairs up corresponding elements from multiple streams
Pattern: stream1.pipe(Stream.merge(stream2)) or stream1.pipe(Stream.concat(stream2))
Processing multiple sources without merge/concat causes problems:
- Complex coordination: manual loops over multiple sources
- Awkward aggregation: collecting from different sources is verbose
- Ordering confusion: sequential vs parallel stays implicit
- Resource management: multiple independent consumers
Merge/concat provide:
- Simple composition: streams combine naturally
- Clear semantics: merge = concurrent, concat = sequential
- Aggregation: a single consumer handles multiple sources
- Scalability: add sources without restructuring
Real-world example: aggregating user events
- Without merge: poll the user service, event log, and notifications separately
- With merge:
userStream.pipe(Stream.merge(eventStream), Stream.merge(notificationStream))
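For intuition, the ordering difference can be sketched without Effect: concat drains each source fully, in order, while a round-robin interleave approximates what merge does when both sources are ready. The helper names here (`concatIter`, `interleave`) are illustrative only.

```typescript
// concat: drain each source completely, in order
function* concatIter<A>(...sources: Iterable<A>[]): Generator<A> {
  for (const s of sources) yield* s;
}

// round-robin interleave: take one element from each live source per pass
function* interleave<A>(...sources: Iterable<A>[]): Generator<A> {
  const iters = sources.map((s) => s[Symbol.iterator]());
  let progressed = true;
  while (progressed) {
    progressed = false;
    for (const it of iters) {
      const r = it.next();
      if (!r.done) {
        progressed = true;
        yield r.value;
      }
    }
  }
}

const a = ["a1", "a2"];
const b = ["b1", "b2"];
const sequential = [...concatIter(a, b)];
const interleaved = [...interleave(a, b)];
console.log(sequential);  // ["a1", "a2", "b1", "b2"]
console.log(interleaved); // ["a1", "b1", "a2", "b2"]
```

Effect's real merge interleaves by readiness (whichever stream produces first), not strict round-robin, which is why the merged output above depends on the sleep timings.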
Stream Pattern 3: Controlling Backpressure in Streams
Rule: Use backpressure controls to manage flow between fast producers and slow consumers, preventing memory exhaustion and resource overload.
Good example:
This example demonstrates backpressure management when events are consumed at different rates.
import { Stream, Effect, Chunk } from "effect";
interface DataPoint {
readonly id: number;
readonly value: number;
}
// Fast producer: generates 100 items per second
const fastProducer = (): Stream.Stream<DataPoint> =>
Stream.fromIterable(Array.from({ length: 100 }, (_, i) => ({ id: i, value: Math.random() }))).pipe(
Stream.tap(() => Effect.sleep("10 millis")) // 10 ms per item = 100/sec
);
// Slow consumer: processes 10 items per second
const slowConsumer = (item: DataPoint): Effect.Effect<void> =>
Effect.gen(function* () {
yield* Effect.sleep("100 millis"); // 100 ms per item = 10/sec
});
// No backpressure (dangerous – the queue grows without bound)
const unbufferedStream = (): Stream.Stream<DataPoint> =>
fastProducer().pipe(
Stream.tap((item) =>
Effect.log(`[unbuffered] produced item ${item.id}`)
)
);
// Bounded buffer (backpressure kicks in)
const bufferedStream = (bufferSize: number): Stream.Stream<DataPoint> =>
fastProducer().pipe(
// Buffer at most `bufferSize` items; when full, the producer waits
Stream.buffer({ capacity: bufferSize }),
Stream.tap((item) =>
Effect.log(`[buffered] consumed item ${item.id}`)
)
);
// Throttling (rate-limited emission)
const throttledStream = (): Stream.Stream<DataPoint> =>
fastProducer().pipe(
// At most 20 items per second (1 per 50 ms)
Stream.throttle({ cost: () => 1, units: 1, duration: "50 millis" }),
Stream.tap((item) =>
Effect.log(`[throttled] item ${item.id}`)
)
);
// Main program: compare the approaches
const program = Effect.gen(function* () {
console.log(`
[start] Demonstrating backpressure management
`);
// Test the buffered approach
console.log(`[test 1] Buffered stream (buffer size 5):
`);
const startBuffer = Date.now();
yield* bufferedStream(5).pipe(
Stream.take(20), // take only 20 items
Stream.runForEach(slowConsumer)
);
const bufferTime = Date.now() - startBuffer;
console.log(`
[result] Buffered approach took ${bufferTime} ms
`);
// Test the throttled approach
console.log(`[test 2] Throttled stream (1 item per 50 ms):
`);
const startThrottle = Date.now();
yield* throttledStream().pipe(
Stream.take(20),
Stream.runForEach(slowConsumer)
);
const throttleTime = Date.now() - startThrottle;
console.log(`
[result] Throttled approach took ${throttleTime} ms
`);
// Summary
console.log(`[summary]`);
console.log(` Without backpressure control:`);
console.log(` - The queue can grow to 100 items (memory risk)`);
console.log(` - Producer and consumer run independently`);
console.log(` With buffering:`);
console.log(` - The queue is capped at 5 items (safe)`);
console.log(` - The producer waits when the buffer is full`);
console.log(` With throttling:`);
console.log(` - Production rate is capped at 20 per second`);
console.log(` - Smooth, controlled flow`);
});
Effect.runPromise(program);
Why:
Backpressure is flow control: a slow consumer tells a fast producer to slow down.
Techniques:
- Buffering: temporarily store items (bounded queue)
- Throttling: rate-limit item emission
- Chunking: process in fixed-size batches
- Debouncing: skip rapid repeats
Pattern: stream.pipe(Stream.throttle(...), Stream.buffer(...))
Without backpressure management, mismatched producer/consumer speeds cause:
- Memory exhaustion: the producer outpaces the consumer → the queue grows without bound
- Garbage-collection pauses: large buffers create GC pressure
- Resource leaks: open connections and file handles accumulate
- Cascading failures: one slow consumer stalls the whole pipeline
Backpressure provides:
- Memory safety: bounded buffers prevent overflow
- Resource efficiency: consumers naturally pace producers
- Performance: tune buffer sizes for throughput
- Observability: monitor backpressure as a health signal
Real-world example: reading a large file and writing to a database
- Without backpressure: read the whole file into memory, write slowly → memory exhaustion
- With backpressure: read 1000 rows, wait for the database, read the next batch
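Throttling itself is easy to model outside any framework. Below is a token-bucket sketch (all names hypothetical); Stream.throttle is built on the same idea, though its exact behavior (shaping vs dropping excess) is configurable.

```typescript
// A bucket holding up to `capacity` tokens, refilled at `refillPerMs` tokens/ms.
// An item passes only if a whole token is available when it arrives.
class TokenBucket {
  private tokens: number;
  constructor(
    private capacity: number,
    private refillPerMs: number,
    private now = 0
  ) {
    this.tokens = capacity;
  }
  tryTake(atMs: number): boolean {
    // Refill based on elapsed time, capped at capacity
    this.tokens = Math.min(
      this.capacity,
      this.tokens + (atMs - this.now) * this.refillPerMs
    );
    this.now = atMs;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

const bucket = new TokenBucket(1, 1 / 50); // one token per 50 ms
const arrivals = [0, 10, 60, 70, 120];     // item arrival times in ms
const passed = arrivals.filter((t) => bucket.tryTake(t));
console.log(passed); // [0, 60, 120]
```

Items arriving faster than one per 50 ms (at 10 ms and 70 ms) find the bucket empty and are rejected; a shaping throttle would instead delay them.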
Stream Pattern 4: Stateful Operations with Scan and Fold
Rule: Use scan for element-by-element stateful processing and fold for final aggregation, enabling rich stream analytics without buffering the whole stream.
Good example:
This example demonstrates maintaining running statistics over a stream of measurements.
import { Stream, Effect, Chunk } from "effect";
interface Measurement {
readonly id: number;
readonly value: number;
readonly timestamp: Date;
}
interface RunningStats {
readonly count: number;
readonly sum: number;
readonly min: number;
readonly max: number;
readonly average: number;
readonly variance: number;
readonly lastValue: number;
}
// Create a stream of measurements
const createMeasurementStream = (): Stream.Stream<Measurement> =>
Stream.fromIterable([
{ id: 1, value: 10, timestamp: new Date() },
{ id: 2, value: 20, timestamp: new Date() },
{ id: 3, value: 15, timestamp: new Date() },
{ id: 4, value: 25, timestamp: new Date() },
{ id: 5, value: 30, timestamp: new Date() },
{ id: 6, value: 22, timestamp: new Date() },
]);
// Initial statistics state
const initialStats: RunningStats = {
count: 0,
sum: 0,
min: Infinity,
max: -Infinity,
average: 0,
variance: 0,
lastValue: 0,
};
// Reducer: update the statistics for each measurement
const updateStats = (
stats: RunningStats,
measurement: Measurement
): RunningStats => {
const newCount = stats.count + 1;
const newSum = stats.sum + measurement.value;
const newAverage = newSum / newCount;
// Incremental (Welford-style) variance update. `stats.variance` stores the
// population variance, so recover the running sum of squared deviations
// (M2 = variance * count) before adding the new increment.
const delta = measurement.value - stats.average;
const delta2 = measurement.value - newAverage;
const newM2 = stats.variance * stats.count + delta * delta2;
return {
count: newCount,
sum: newSum,
min: Math.min(stats.min, measurement.value),
max: Math.max(stats.max, measurement.value),
average: newAverage,
variance: newM2 / newCount,
lastValue: measurement.value,
};
};
// Main program: demonstrate scan with statistics
const program = Effect.gen(function* () {
console.log(`
[scan] Running statistics over the stream:
`);
// Use scan to emit intermediate statistics
const statsStream = createMeasurementStream().pipe(
Stream.scan(initialStats, (stats, measurement) => {
const newStats = updateStats(stats, measurement);
console.log(
`[measurement ${measurement.id}] value: ${measurement.value}`
);
console.log(
` count: ${newStats.count}, avg: ${newStats.average.toFixed(2)}, ` +
`min: ${newStats.min}, max: ${newStats.max}, ` +
`variance: ${newStats.variance.toFixed(2)}`
);
return newStats;
})
);
// Collect all intermediate statistics
const allStats = yield* statsStream.pipe(Stream.runCollect);
// Final statistics
const finalStats = Chunk.last(allStats);
if (finalStats._tag === "Some") {
console.log(`
[final stats]`);
console.log(` Total measurements: ${finalStats.value.count}`);
console.log(` Average: ${finalStats.value.average.toFixed(2)}`);
console.log(` Min: ${finalStats.value.min}`);
console.log(` Max: ${finalStats.value.max}`);
console.log(
` Std dev: ${Math.sqrt(finalStats.value.variance).toFixed(2)}`
);
}
// Compare with a fold (produces only the final result)
console.log(`
[fold] Final statistics only:
`);
const finalResult = yield* createMeasurementStream().pipe(
Stream.runFold(initialStats, updateStats)
);
yield* Effect.log(
`Final: count=${finalResult.count}, avg=${finalResult.average.toFixed(2)}`
);
});
Effect.runPromise(program);
Why:
Stateful stream operations:
- scan: applies a function with an accumulator, emitting each intermediate state
- fold (Stream.runFold): applies a function with an accumulator, producing only the final result
- reduce: like fold but requires a non-empty stream
Pattern: stream.pipe(Stream.scan(initialState, reducer)) or stream.pipe(Stream.runFold(initialState, reducer))
Processing streams without scan/fold causes problems:
- Manual state tracking: a Ref or mutable variable outside the stream
- Lost context: intermediate values are hard to correlate
- Error-prone: easy to forget a state update
- Hard to test: state is scattered across the code
Scan/fold provide:
- Declarative state: state flows through the stream
- Intermediate values: state is emitted at each step (scan)
- Type safety: the accumulator type is guaranteed
- Composability: stateful operations chain
Real-world example: a running average for metrics
- Without scan: manually track count and sum, compute the average, emit
- With scan:
stream.pipe(Stream.scan(initialState, updateAverage))
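The running statistics that scan threads through the stream can be checked in isolation. This is Welford's online algorithm, sketched without Effect; the type and helper names are illustrative.

```typescript
interface Stats {
  count: number;
  mean: number;
  m2: number; // running sum of squared deviations from the mean
}

// One Welford update step: O(1) per element, no buffering
const step = (s: Stats, value: number): Stats => {
  const count = s.count + 1;
  const delta = value - s.mean;
  const mean = s.mean + delta / count;
  const m2 = s.m2 + delta * (value - mean);
  return { count, mean, m2 };
};

// Same measurement values as the stream example above
const values = [10, 20, 15, 25, 30, 22];
const summary = values.reduce(step, { count: 0, mean: 0, m2: 0 });
const variance = summary.m2 / summary.count; // population variance
console.log(summary.mean.toFixed(2), variance.toFixed(2)); // 20.33 42.22
```

`values.reduce(step, init)` plays the role of Stream.runFold; mapping `step` over a running accumulator and keeping each intermediate value plays the role of Stream.scan.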
🟠 Advanced Patterns
Stream Pattern 5: Grouping and Windowing Streams
Rule: Use groupBy to partition a stream by key, and tumbling/sliding windows to aggregate a stream over time windows.
Good example:
This example demonstrates windowing and grouping patterns.
import { Effect } from "effect";
interface Event {
readonly timestamp: Date;
readonly userId: string;
readonly action: string;
readonly duration: number; // milliseconds
}
// Simulated event stream
const generateEvents = (): Event[] => [
{ timestamp: new Date(Date.now() - 5000), userId: "user1", action: "click", duration: 100 },
{ timestamp: new Date(Date.now() - 4500), userId: "user2", action: "view", duration: 250 },
{ timestamp: new Date(Date.now() - 4000), userId: "user1", action: "scroll", duration: 150 },
{ timestamp: new Date(Date.now() - 3500), userId: "user3", action: "click", duration: 120 },
{ timestamp: new Date(Date.now() - 3000), userId: "user2", action: "click", duration: 180 },
{ timestamp: new Date(Date.now() - 2500), userId: "user1", action: "view", duration: 200 },
{ timestamp: new Date(Date.now() - 2000), userId: "user3", action: "view", duration: 300 },
{ timestamp: new Date(Date.now() - 1500), userId: "user1", action: "submit", duration: 500 },
{ timestamp: new Date(Date.now() - 1000), userId: "user2", action: "scroll", duration: 100 },
];
// Main program: windowing and grouping examples
const program = Effect.gen(function* () {
console.log(`
[Windowing & Grouping] Stream organization patterns
`);
const events = generateEvents();
// Example 1: tumbling windows (fixed-size batches)
console.log(`[1] Tumbling windows (batches of 2 events):
`);
const windowSize = 2;
let batchNumber = 1;
for (let i = 0; i < events.length; i += windowSize) {
const batch = events.slice(i, i + windowSize);
yield* Effect.log(`[window ${batchNumber}] (${batch.length} events)`);
let totalDuration = 0;
for (const event of batch) {
yield* Effect.log(
` - ${event.userId}: ${event.action} (${event.duration} ms)`
);
totalDuration += event.duration;
}
yield* Effect.log(`[window ${batchNumber}] total duration: ${totalDuration} ms
`);
batchNumber++;
}
// Example 2: sliding windows (overlapping)
console.log(`[2] Sliding windows (last 3 events, slide by 1):
`);
const windowSizeSlide = 3;
const slideBy = 1;
for (let i = 0; i <= events.length - windowSizeSlide; i += slideBy) {
const window = events.slice(i, i + windowSizeSlide);
const avgDuration =
window.reduce((sum, e) => sum + e.duration, 0) / window.length;
yield* Effect.log(
`[slide ${i / slideBy}] ${window.length} events, avg duration: ${avgDuration.toFixed(0)} ms`
);
}
// Example 3: grouping by key
console.log(`
[3] Grouping by user:
`);
const byUser = new Map<string, Event[]>();
for (const event of events) {
if (!byUser.has(event.userId)) {
byUser.set(event.userId, []);
}
byUser.get(event.userId)!.push(event);
}
for (const [userId, userEvents] of byUser) {
const totalActions = userEvents.length;
const totalTime = userEvents.reduce((sum, e) => sum + e.duration, 0);
const avgTime = totalTime / totalActions;
yield* Effect.log(
`[user ${userId}] ${totalActions} actions, ${totalTime} ms total, ${avgTime.toFixed(0)} ms avg`
);
}
// Example 4: grouping + windowing combined
console.log(`
[4] Grouped by user, windowed by action type:
`);
for (const [userId, userEvents] of byUser) {
const byAction = new Map<string, Event[]>();
for (const event of userEvents) {
if (!byAction.has(event.action)) {
byAction.set(event.action, []);
}
byAction.get(event.action)!.push(event);
}
yield* Effect.log(`[user ${userId}] action breakdown:`);
for (const [action, actionEvents] of byAction) {
const count = actionEvents.length;
const total = actionEvents.reduce((sum, e) => sum + e.duration, 0);
yield* Effect.log(` ${action}: ${count}x (${total} ms total)`);
}
}
// Example 5: session windows (based on an inactivity timeout)
console.log(`
[5] Session windows (gap > 1000 ms = new session):
`);
const sessionGapMs = 1000;
const sessions: Event[][] = [];
let currentSession: Event[] = [];
let lastTimestamp = events[0]?.timestamp.getTime() ?? 0;
for (const event of events) {
const currentTime = event.timestamp.getTime();
const timeSinceLastEvent = currentTime - lastTimestamp;
if (timeSinceLastEvent > sessionGapMs && currentSession.length > 0) {
sessions.push(currentSession);
yield* Effect.log(
`[session] closed (${currentSession.length} events, gap: ${timeSinceLastEvent} ms)`
);
currentSession = [];
}
currentSession.push(event);
lastTimestamp = currentTime;
}
if (currentSession.length > 0) {
sessions.push(currentSession);
yield* Effect.log(`[session] final (${currentSession.length} events)`);
}
// Example 6: top-K aggregation within a window
console.log(`
[6] Top 2 actions in the last window:
`);
const lastWindow = events.slice(-3);
const actionCounts = new Map<string, number>();
for (const event of lastWindow) {
actionCounts.set(
event.action,
(actionCounts.get(event.action) ?? 0) + 1
);
}
const topActions = Array.from(actionCounts.entries())
.sort((a, b) => b[1] - a[1])
.slice(0, 2);
yield* Effect.log(`[Top-K] in the last 3-event window:`);
for (const [action, count] of topActions) {
yield* Effect.log(` ${action}: ${count}x`);
}
});
Effect.runPromise(program);
Why:
Windows organize unbounded streams into bounded chunks:
- Tumbling windows: fixed-size, non-overlapping (e.g., 1-second windows)
- Sliding windows: overlapping (e.g., a 10-second window with a 5-second hop)
- Group by key: partition the stream by a field value
- Session windows: event-driven windows (e.g., idle timeout)
- Batch aggregation: process N items or wait T seconds
Pattern: Stream.groupBy(), or custom windows built with Ref and Schedule
Unbounded streams need boundaries:
Problem 1: memory exhaustion
- Processing 1M events with no boundary = everything held in memory
- Accumulated state grows without bound
- Eventually an OOM error
Problem 2: delayed aggregation
- You can't sum all events until the stream ends (which is never)
- You must decide: "sum the events within this 1-second window"
Problem 3: grouping complexity
- A stream of user events needs per-user aggregation
- Without groupBy: manual state tracking (error-prone)
Problem 4: temporal patterns
- "Top 10 searches in the last 5 minutes" needs windows
- "Average response time per endpoint per minute" needs grouping + windows
Solutions:
Tumbling windows:
- Split the stream into 1-second, 5-second, or 1-minute chunks
- Process each chunk independently
- Memory clears between windows
- Good for: metrics, batch processing, reporting
Sliding windows:
- Always hold the last 5 minutes of data
- Emit updated aggregates every second
- Detect patterns across overlapping periods
- Good for: anomaly detection, trends
Group by key:
- Separate the stream by key
- Independent state per key
- Emit grouped results
- Good for: per-user, per-endpoint, per-tenant
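Session windowing in particular has a precise, framework-free statement: a gap larger than the timeout between consecutive events closes the current session. A sketch (helper and type names illustrative):

```typescript
interface Ev {
  at: number;   // event time in ms
  user: string;
}

// Split a time-ordered event list into sessions separated by gaps > gapMs
const sessionize = (events: Ev[], gapMs: number): Ev[][] => {
  const sessions: Ev[][] = [];
  let current: Ev[] = [];
  let last = events[0]?.at ?? 0;
  for (const e of events) {
    if (e.at - last > gapMs && current.length > 0) {
      sessions.push(current); // gap exceeded: close the session
      current = [];
    }
    current.push(e);
    last = e.at;
  }
  if (current.length > 0) sessions.push(current);
  return sessions;
};

const evs: Ev[] = [
  { at: 0, user: "u1" },
  { at: 400, user: "u1" },
  { at: 2000, user: "u1" },
  { at: 2300, user: "u1" },
];
const sizes = sessionize(evs, 1000).map((s) => s.length);
console.log(sizes); // [2, 2]
```

The 1600 ms gap between the second and third events exceeds the 1000 ms timeout, so the stream splits into two sessions of two events each.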
Stream Pattern 6: Resource Management in Streams
Rule: Use acquire/release (the bracket pattern) or Effect scopes to guarantee resource cleanup, preventing leaks even when the stream fails or is interrupted.
Good example:
This example demonstrates resource acquisition, use, and guaranteed cleanup.
import { Effect, Stream, Ref, Schedule } from "effect";
interface FileHandle {
readonly path: string;
readonly fd: number;
}
interface Connection {
readonly id: string;
readonly isOpen: boolean;
}
// Simulated resource management
const program = Effect.gen(function* () {
console.log(`
[Resource Management] Stream resource lifecycle
`);
// Example 1: bracket pattern for file streams
console.log(`[1] Bracket pattern (acquire → use → release):
`);
let openHandles = 0;
let closedHandles = 0;
const openFile = (path: string) =>
Effect.gen(function* () {
openHandles++;
yield* Effect.log(`[open] file "${path}" (total open: ${openHandles})`);
return { path, fd: 1000 + openHandles };
});
const closeFile = (handle: FileHandle) =>
Effect.gen(function* () {
closedHandles++;
yield* Effect.log(`[close] file "${handle.path}" (total closed: ${closedHandles})`);
});
const readFileWithBracket = (path: string) =>
Effect.gen(function* () {
let handle: FileHandle | null = null;
try {
handle = yield* openFile(path);
yield* Effect.log(
`[use] reading from fd ${handle.fd} ("${handle.path}")`
);
// Simulate a read
return "file contents";
} finally {
// Guaranteed to run even if the code above fails
if (handle) {
yield* closeFile(handle);
}
}
});
// Test the success case
yield* Effect.log(`[test] success case:`);
const content = yield* readFileWithBracket("/data/file.txt");
yield* Effect.log(`[result] got: "${content}"
`);
// Test the failure case (simulated)
yield* Effect.log(`[test] error case:`);
const failCase = Effect.gen(function* () {
let handle: FileHandle | null = null;
try {
handle = yield* openFile("/data/missing.txt");
// Simulate an error mid-operation
yield* Effect.fail(new Error("read failed"));
} finally {
if (handle) {
yield* closeFile(handle);
}
}
}).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[error] caught: ${error.message}`);
yield* Effect.log(`[check] closed handles: ${closedHandles} (verifying cleanup)
`);
})
)
);
yield* failCase;
// Example 2: connection pool management
console.log(`[2] Connection pool management:
`);
interface ConnectionPool {
acquire: () => Effect.Effect<Connection, Error>;
release: (conn: Connection) => Effect.Effect<void>;
}
const createConnectionPool = (maxSize: number): Effect.Effect<ConnectionPool> =>
Effect.gen(function* () {
const available = yield* Ref.make<Connection[]>([]);
const inUse = yield* Ref.make<Set<string>>(new Set());
let idCounter = 0;
return {
acquire: () =>
Effect.gen(function* () {
const avail = yield* Ref.get(available);
if (avail.length > 0) {
yield* Effect.log(`[pool] reusing connection from the pool`);
const conn = avail[avail.length - 1];
// Remove it from the available list and mark it as in use
yield* Ref.update(available, (a) => a.slice(0, -1));
yield* Ref.update(inUse, (set) => new Set(set).add(conn.id));
return conn;
}
const inUseCount = (yield* Ref.get(inUse)).size;
if (inUseCount >= maxSize) {
return yield* Effect.fail(new Error("pool exhausted"));
}
const connId = `conn-${++idCounter}`;
yield* Effect.log(`[pool] creating new connection: ${connId}`);
const conn = { id: connId, isOpen: true };
yield* Ref.update(inUse, (set) => new Set(set).add(connId));
return conn;
}),
release: (conn: Connection) =>
Effect.gen(function* () {
yield* Ref.update(inUse, (set) => {
const updated = new Set(set);
updated.delete(conn.id);
return updated;
});
yield* Ref.update(available, (avail) => [...avail, conn]);
yield* Effect.log(`[pool] returned connection: ${conn.id}`);
}),
};
});
const pool = yield* createConnectionPool(3);
// Acquire and release connections
const conn1 = yield* pool.acquire();
const conn2 = yield* pool.acquire();
yield* pool.release(conn1);
const conn3 = yield* pool.acquire(); // reuses conn1
yield* Effect.log(`
`);
// Example 3: scope-based resource safety
console.log(`[3] Scoped resources (hierarchical cleanup):
`);
let scopedCount = 0;
const withScoped = <R,>(create: () => Effect.Effect<R>) =>
Effect.gen(function* () {
scopedCount++;
const id = scopedCount;
yield* Effect.log(`[scope] entering scope ${id}`);
const resource = yield* create();
yield* Effect.log(`[scope] using resource in scope ${id}`);
yield* Effect.log(`[scope] exiting scope ${id}`);
return resource;
}).pipe(
// Runs when the scope exits, even on failure
Effect.ensuring(
Effect.log(`[scope] guaranteed cleanup`)
)
);
// Nested scopes
const result = yield* withScoped(() =>
Effect.gen(function* () {
const inner = yield* withScoped(() => Effect.succeed("inner data"));
return { level: 1, data: inner as string | null };
})
).pipe(
Effect.catchAll(() => Effect.succeed({ level: 0, data: null as string | null }))
);
yield* Effect.log(`[scope] cleanup order: inner → outer
`);
// Example 4: stream resource management
console.log(`[4] Stream with resource cleanup:
`);
let streamResourceCount = 0;
// Simulated stream that acquires one resource per element
const streamWithResources = Stream.fromIterable([1, 2, 3]).pipe(
Stream.tap(() =>
Effect.gen(function* () {
streamResourceCount++;
yield* Effect.log(`[stream-resource] acquired resource ${streamResourceCount}`);
})
),
// Cleanup when the stream ends (deferred so the count is read at run time)
Stream.ensuring(
Effect.sync(() =>
console.log(`[stream-resource] cleaning up all ${streamResourceCount} resources`)
)
)
);
yield* Stream.runDrain(streamWithResources);
// Example 5: error propagation with cleanup
console.log(`
[5] Error safety with cleanup:
`);
const safeRead = (retryCount: number) =>
Effect.gen(function* () {
let handle: FileHandle | null = null;
try {
handle = yield* openFile(`/data/file-${retryCount}.txt`);
if (retryCount < 2) {
yield* Effect.log(`[read] attempt ${retryCount}: failing intentionally`);
yield* Effect.fail(new Error(`attempt ${retryCount} failed`));
}
yield* Effect.log(`[read] attempt ${retryCount} succeeded`);
return "success";
} finally {
if (handle) {
yield* closeFile(handle);
}
}
});
// Retry with guaranteed cleanup: up to 2 retries, 10 ms apart
const result2 = yield* safeRead(1).pipe(
Effect.retry(
Schedule.intersect(Schedule.recurs(2), Schedule.fixed("10 millis"))
),
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[final] all retries failed: ${error.message}`);
return "fallback";
})
)
);
yield* Effect.log(`
[final] result: ${result2}`);
});
Effect.runPromise(program);
Why:
Streams must clean up resources deterministically:
- Acquire/release: obtain a resource, use it, give it back
- Bracket pattern: ensures cleanup on success or failure
- Scope safety: cleanup is guaranteed even on defects
- Connection pooling: reuse connections, prevent exhaustion
- Concurrent cleanup: handle teardown under concurrency
Pattern: Stream.acquireRelease(), Effect.acquireRelease(), and Scope for resource safety
Streams without resource management cause problems:
Problem 1: resource exhaustion
- File streams opened but never closed → file-descriptor limit exceeded
- Connections taken from a pool but never returned → connection starvation
- The system becomes unresponsive
Problem 2: memory leaks
- The stream emits large objects → memory grows
- Without cleanup → garbage lingers
- The GC cannot reclaim it
Problem 3: data corruption
- Writes to a file without flushing → partial writes on crash
- Reading a connection while another thread writes → data races
- Unpredictable results
Problem 4: silent failures
- Resource cleanup fails → the error is lost
- The application carries on as if it succeeded
- Hidden bugs become hard-to-trace crashes later
Solutions:
Bracket pattern:
- Acquire the resource
- Use the resource (even through errors)
- Always release the resource
- Track errors separately
Resource scopes:
- Nested resource management
- Parent cleanup waits for children
- Hierarchical resource graphs
- Type-safe guarantees
Connection pooling:
- Reuse connections
- Track available vs in-use
- Prevent exhaustion
- Support graceful shutdown
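At its core, the bracket guarantee is just try/finally. A minimal framework-free sketch (synchronous for clarity; `bracket` and its callbacks are illustrative names):

```typescript
// Release always runs, whether `use` returns or throws
const bracket = <R, A>(
  acquire: () => R,
  use: (r: R) => A,
  release: (r: R) => void
): A => {
  const resource = acquire();
  try {
    return use(resource);
  } finally {
    release(resource); // guaranteed cleanup
  }
};

const log: string[] = [];
try {
  bracket(
    () => { log.push("open"); return "handle"; },
    () => { log.push("use"); throw new Error("boom"); },
    () => { log.push("close"); }
  );
} catch {
  log.push("caught");
}
console.log(log); // ["open", "use", "close", "caught"]
```

Note the ordering: `close` runs before the caller ever observes the error, exactly the property the failing-read example above verifies by counting closed handles.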
Stream Pattern 7: Error Handling in Streams
Rule: Use stream error handlers to recover from failures, retry operations, and preserve stream integrity when individual elements fail.
Good example:
This example demonstrates stream error-handling patterns.
import { Effect, Stream, Ref, Schedule, Chunk } from "effect";
interface DataRecord {
id: string;
value: number;
}
interface ProcessingResult {
successful: DataRecord[];
failed: Array<{ id: string; error: string }>;
retried: number;
}
const program = Effect.gen(function* () {
console.log(`
[Stream Error Handling] Resilient stream processing
`);
// Example 1: continue past errors (skip failures, process the rest)
console.log(`[1] Continuing despite errors:
`);
const processElement = (record: DataRecord): Effect.Effect<string, Error> =>
Effect.gen(function* () {
if (record.value < 0) {
yield* Effect.fail(new Error(`invalid value: ${record.value}`));
}
return `processed-${record.id}`;
});
const records = [
{ id: "rec1", value: 10 },
{ id: "rec2", value: -5 }, // will fail
{ id: "rec3", value: 20 },
{ id: "rec4", value: -1 }, // will fail
{ id: "rec5", value: 30 },
];
const successfulProcessing = yield* Stream.fromIterable(records).pipe(
Stream.mapEffect((record) =>
processElement(record).pipe(
Effect.map((result) => ({ success: true, result })),
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[error] record ${record.id} failed`);
return { success: false, error };
})
)
)
),
Stream.runCollect
);
yield* Effect.log(
`[results] ${Array.from(successfulProcessing).filter((r) => r.success).length}/${records.length} succeeded
`
);
// Example 2: recover with fallback values
console.log(`[2] Providing fallback values on error:
`);
const getData = (id: string): Effect.Effect<number, Error> =>
id.includes("fail") ? Effect.fail(new Error("data error")) : Effect.succeed(42);
const recovered = yield* Stream.fromIterable(["ok1", "fail1", "ok2"]).pipe(
Stream.mapEffect((id) =>
getData(id).pipe(
Effect.catchAll(() =>
Effect.gen(function* () {
yield* Effect.log(`[fallback] using default value for ${id}`);
return -1; // fallback value
})
)
)
),
Stream.runCollect
);
yield* Effect.log(`[values] ${Array.from(recovered).join(", ")}
`);
// Example 3: collect errors alongside successes
console.log(`[3] Collecting errors and successes:
`);
const results = yield* Ref.make<ProcessingResult>({
successful: [],
failed: [],
retried: 0,
});
yield* Stream.fromIterable(records).pipe(
Stream.mapEffect((record) =>
processElement(record).pipe(
Effect.tap(() =>
Ref.update(results, (r) => ({
...r,
successful: [...r.successful, record],
}))
),
Effect.catchAll((error) =>
Ref.update(results, (r) => ({
...r,
failed: [
...r.failed,
{ id: record.id, error: error.message },
],
}))
)
)
),
Stream.runDrain
);
const finalResults = yield* Ref.get(results);
yield* Effect.log(
`[aggregate] ${finalResults.successful.length} succeeded, ${finalResults.failed.length} failed`
);
for (const failure of finalResults.failed) {
yield* Effect.log(` - ${failure.id}: ${failure.error}`);
}
// Example 4: retry errors with backoff
console.log(`
[4] Retry with exponential backoff:
`);
let attemptCount = 0;
const unreliableOperation = (id: string): Effect.Effect<string, Error> =>
Effect.gen(function* () {
attemptCount++;
if (attemptCount <= 2) {
yield* Effect.log(`[attempt ${attemptCount}] failing for ${id}`);
yield* Effect.fail(new Error("transient failure"));
}
yield* Effect.log(`[success] attempt ${attemptCount} succeeded`);
return `result-${id}`;
});
const retried = unreliableOperation("test").pipe(
Effect.retry(
Schedule.exponential("10 millis").pipe(
Schedule.upTo("100 millis"),
Schedule.intersect(Schedule.recurs(3))
)
),
Effect.catchAll(() =>
Effect.gen(function* () {
yield* Effect.log(`[exhausted] all retries failed`);
return "fallback";
})
)
);
yield* retried;
// Example 5: error context in streams
console.log(`
[5] Propagating error context:
`);
interface StreamContext {
batchId: string;
timestamp: Date;
}
const processWithContext = (context: StreamContext) =>
Stream.fromIterable([1, 2, -3, 4]).pipe(
Stream.mapEffect((value) =>
Effect.gen(function* () {
if (value < 0) {
yield* Effect.fail(
new Error(
`negative value in batch ${context.batchId} at ${context.timestamp.toISOString()}`
)
);
}
return value * 2;
})
),
// The handler must return a Stream; log the error and emit nothing
Stream.catchAll((error) =>
Stream.fromEffect(Effect.log(`[context error] ${error.message}`)).pipe(
Stream.drain
)
)
);
const context: StreamContext = {
batchId: "batch-001",
timestamp: new Date(),
};
yield* processWithContext(context).pipe(Stream.runDrain);
// Example 6: partial recovery (keep good data, log bad data)
console.log(`
[6] Partial recovery strategy:
`);
const mixedQuality = [
{ id: "1", data: "good" },
{ id: "2", data: "bad" },
{ id: "3", data: "good" },
{ id: "4", data: "bad" },
{ id: "5", data: "good" },
];
const processQuality = (record: { id: string; data: string }) =>
record.data === "good"
? Effect.succeed(`valid-${record.id}`)
: Effect.fail(new Error(`invalid data for ${record.id}`));
const partialResults = yield* Stream.fromIterable(mixedQuality).pipe(
Stream.mapEffect((record) =>
processQuality(record).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[logged] ${error.message}`);
return null; // skip this record
})
)
)
),
Stream.filter((result): result is string => result !== null),
Stream.runCollect
);
yield* Effect.log(
`[partial] kept ${partialResults.length}/${mixedQuality.length} valid records
`
);
// Example 7: per-element timeout handling
console.log(`[7] Per-element timeout handling:
`);
const slowOperation = (id: string): Effect.Effect<string> =>
Effect.gen(function* () {
// Simulate a slow operation
if (id === "slow") {
yield* Effect.sleep("200 millis");
} else {
yield* Effect.sleep("50 millis");
}
return `done-${id}`;
});
const withTimeout = yield* Stream.fromIterable(["fast1", "slow", "fast2"]).pipe(
Stream.mapEffect((id) =>
slowOperation(id).pipe(
Effect.timeout("100 millis"),
Effect.catchAll(() =>
Effect.gen(function* () {
yield* Effect.log(`[timeout] operation ${id} timed out`);
return "timeout-fallback";
})
)
)
),
Stream.runCollect
);
yield* Effect.log(`[results] ${Array.from(withTimeout).join(", ")}
`);
// Example 8: terminate the stream on critical errors
console.log(`[8] Terminating the stream on critical errors:
`);
const isCritical = (error: Error): boolean =>
error.message.includes("critical");
const terminateOnCritical = Stream.fromIterable([1, 2, 3]).pipe(
Stream.mapEffect((value) =>
value === 2
? Effect.fail(new Error("critical: system failure"))
: Effect.succeed(value)
),
Stream.catchAll((error) => {
if (isCritical(error)) {
// Critical: log, then re-fail so the stream terminates
return Stream.fromEffect(Effect.log(`[critical] terminating stream`)).pipe(
Stream.drain,
Stream.concat(Stream.fail(error))
);
}
// Non-critical: log and continue with an empty stream
return Stream.fromEffect(Effect.log(`[warning] continuing despite error`)).pipe(
Stream.drain
);
})
);
yield* terminateOnCritical.pipe(
Stream.runCollect,
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[stopped] stream halted: ${error.message}`);
return Chunk.empty<number>();
})
)
);
});
Effect.runPromise(program);
Why:
Stream error handling enables resilience:
- Continue past errors: skip failing elements, process the rest
- Recover: provide fallback values
- Retry: attempt failing elements again
- Aggregate: collect errors alongside successes
- Graceful termination: controlled shutdown
- Propagation: let errors flow downstream
Patterns: Stream.catchAll(), Stream.retry(), Stream.orElse(), Stream.runCollect()
Errors in streams cause cascading failures:
Problem 1: stream death
- Processing 10,000 records
- Record 5,000 has bad data
- The stream crashes
- The remaining 5,000 records go unprocessed
- A manual re-run is needed
Problem 2: silent data loss
- The stream hits an error
- Processing stops
- The caller doesn't notice
- Missing data goes undetected
- Reports show wrong numbers
Problem 3: no recovery visibility
- An error occurs
- Was it retried? How many times?
- Did it recover?
- You're left guessing
Problem 4: downstream impact
- A stream error affects every subscriber
- Failures cascade
- The system becomes unavailable
- Everything downstream blocks
Solutions:
Continue past errors:
- Skip failing elements
- Process the rest of the stream
- Collect errors for later handling
- Partial success is acceptable
Retry with backoff:
- Transient error? Retry
- Exponential backoff
- Eventually give up
- Move to the next element
Error aggregation:
- Collect all errors
- Collect all successes
- Report both
- Analyze and debug
Graceful termination:
- Signal end-of-stream on error
- Allow cleanup
- Prevent resource leaks
- Controlled shutdown
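The delay sequence an exponential-backoff schedule produces can be computed directly. A small sketch (assuming delays double from a base and are capped, as in the Schedule.exponential + cap example above; the helper name is illustrative):

```typescript
// Delays for maxRetries attempts: base * 2^attempt, capped at capMs
const backoffDelays = (
  baseMs: number,
  capMs: number,
  maxRetries: number
): number[] =>
  Array.from({ length: maxRetries }, (_, attempt) =>
    Math.min(capMs, baseMs * 2 ** attempt)
  );

const delays = backoffDelays(10, 100, 5);
console.log(delays); // [10, 20, 40, 80, 100]
```

Production schedules usually add jitter (a random factor on each delay) so many failing clients don't retry in lockstep.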
Stream Pattern 8: Advanced Stream Transformations
Rule: Use advanced stream operators to build complex data pipelines that compose elegantly and stay performant at scale.
Good example:
This example demonstrates advanced stream transformations.
import { Effect, Stream, Ref, Chunk } from "effect";
interface LogEntry {
timestamp: Date;
level: "info" | "warn" | "error";
message: string;
context?: Record<string, unknown>;
}
interface Metric {
name: string;
value: number;
tags: Record<string, string>;
}
const program = Effect.gen(function* () {
console.log(`
[Advanced Stream Transformations] Complex data flows
`);
// Example 1: a custom filter operator
// (Effect also ships Stream.filterEffect, which covers this case directly)
console.log(`[1] Custom filter with effectful logic:
`);
const filterByEffect = <A,>(
predicate: (a: A) => Effect.Effect<boolean>
) =>
(stream: Stream.Stream<A>): Stream.Stream<A> =>
stream.pipe(
Stream.mapEffect((value) =>
predicate(value).pipe(
Effect.map((keep) => (keep ? value : null))
)
),
Stream.filter((value): value is A => value !== null)
);
const isValid = (num: number): Effect.Effect<boolean> =>
Effect.gen(function* () {
// Simulate a validation effect (e.g., an API call)
return num > 0 && num < 100;
});
const numbers = [50, 150, 25, -10, 75];
const validNumbers = yield* Stream.fromIterable(numbers).pipe(
filterByEffect(isValid),
Stream.runCollect
);
yield* Effect.log(`[valid] ${Array.from(validNumbers).join(", ")}
`);
// Example 2: enrichment transformation
console.log(`[2] Enriching records with extra data:
`);
interface RawRecord {
id: string;
value: number;
}
interface EnrichedRecord {
id: string;
value: number;
validated: boolean;
processed: Date;
metadata: Record<string, unknown>;
}
const enrich = (record: RawRecord): Effect.Effect<EnrichedRecord> =>
Effect.gen(function* () {
// Simulate a lookup/validation
const validated = record.value > 0;
return {
id: record.id,
value: record.value,
validated,
processed: new Date(),
metadata: { source: "stream" },
};
});
const rawData = [
{ id: "r1", value: 10 },
{ id: "r2", value: -5 },
{ id: "r3", value: 20 },
];
const enriched = yield* Stream.fromIterable(rawData).pipe(
Stream.mapEffect((record) => enrich(record)),
Stream.runCollect
);
yield* Effect.log(`[enriched] ${enriched.length} records enriched
`);
// Example 3: demultiplexing (splitting one stream into several)
console.log(`[3] Demultiplex by category:
`);
interface Event {
id: string;
type: "click" | "view" | "purchase";
data: unknown;
}
const events: Event[] = [
{ id: "e1", type: "click", data: { x: 100, y: 200 } },
{ id: "e2", type: "view", data: { url: "/" } },
{ id: "e3", type: "purchase", data: { amount: 99.99 } },
{ id: "e4", type: "click", data: { x: 50, y: 100 } },
];
const clicks = yield* Stream.fromIterable(events).pipe(
Stream.filter((e) => e.type === "click"),
Stream.runCollect
);
const views = yield* Stream.fromIterable(events).pipe(
Stream.filter((e) => e.type === "view"),
Stream.runCollect
);
const purchases = yield* Stream.fromIterable(events).pipe(
Stream.filter((e) => e.type === "purchase"),
Stream.runCollect
);
yield* Effect.log(
`[demux] clicks: ${clicks.length}, views: ${views.length}, purchases: ${purchases.length}
`
);
// Example 4: chunked processing (batch transformations)
console.log(`[4] Chunked processing (batches of N):
`);
const processChunk = (chunk: ReadonlyArray<{ id: string; value: number }>) =>
Effect.gen(function* () {
const sum = chunk.reduce((s, r) => s + r.value, 0);
const avg = sum / chunk.length;
yield* Effect.log(
`[chunk] ${chunk.length} items, average: ${avg.toFixed(2)}`
);
return { size: chunk.length, sum, avg };
});
const data = Array.from({ length: 10 }, (_, i) => ({
id: `d${i}`,
value: i + 1,
}));
// Stream.grouped batches the stream into chunks of the given size
const chunkResults = yield* Stream.fromIterable(data).pipe(
Stream.grouped(3),
Stream.mapEffect((chunk) => processChunk(Chunk.toReadonlyArray(chunk))),
Stream.runCollect
);
yield* Effect.log(
`[chunks] processed ${chunkResults.length} batches
`
);
// Example 5: multi-stage transformation pipeline
console.log(`[5] Multi-stage pipeline (parse → validate → transform):
`);
const rawStrings = ["10", "twenty", "30", "-5", "50"];
// Stage 1: parse
const parsed = yield* Stream.fromIterable(rawStrings).pipe(
Stream.mapEffect((s) =>
Effect.gen(function* () {
// parseInt never throws; it returns NaN on failure, so check explicitly
const n = parseInt(s, 10);
if (Number.isNaN(n)) {
return yield* Effect.fail(new Error(`Parse failed: ${s}`));
}
return n;
}).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[parse error] ${error.message}`);
return null;
})
)
)
),
Stream.filter((n): n is number => n !== null),
Stream.runCollect
);
const parsedValues = Chunk.toReadonlyArray(parsed);
yield* Effect.log(`[stage 1] parsed: ${parsedValues.join(", ")}`);
// Stage 2: validate
const validated = parsedValues.filter((n) => n > 0);
yield* Effect.log(`[stage 2] validated: ${validated.join(", ")}`);
// Stage 3: transform
const transformed = validated.map((n) => n * 2);
yield* Effect.log(`[stage 3] transformed: ${transformed.join(", ")}
`);
// Example 6: composing custom operators
console.log(`[6] Composable transformation pipeline:
`);
// Define custom operators
const withLogging = <A,>(label: string) =>
(stream: Stream.Stream<A>) =>
stream.pipe(
Stream.tap((value) =>
Effect.log(`[${label}] processing: ${JSON.stringify(value)}`)
)
);
const filterPositive = (stream: Stream.Stream<number>) =>
stream.pipe(
Stream.filter((n) => n > 0),
Stream.tap(() => Effect.log(`[filter] keeping positive number`))
);
const scaleUp = (factor: number) =>
(stream: Stream.Stream<number>) =>
stream.pipe(
Stream.map((n) => n * factor),
Stream.tap((n) =>
Effect.log(`[scale] scaled to ${n}`)
)
);
const testData = [10, -5, 20, -3, 30];
const pipeline = yield* Stream.fromIterable(testData).pipe(
withLogging("input"),
filterPositive,
scaleUp(10),
Stream.runCollect
);
yield* Effect.log(`[result] final: ${Chunk.toReadonlyArray(pipeline).join(", ")}
`);
// Example 7: stateful transformation
console.log(`[7] Stateful transformation (running total):
`);
const runningTotal = yield* Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.scan(0, (acc, value) => acc + value),
Stream.runCollect
);
yield* Effect.log(`[totals] ${Chunk.toReadonlyArray(runningTotal).join(", ")}
`);
// Example 8: conditional transformation
console.log(`[8] Conditional transformation (different paths):
`);
interface Item {
id: string;
priority: "high" | "normal" | "low";
}
const transformByPriority = (item: Item): Effect.Effect<{
id: string;
processed: string;
}> =>
Effect.gen(function* () {
switch (item.priority) {
case "high":
yield* Effect.log(`[high] expedited handling for ${item.id}`);
return { id: item.id, processed: "urgent" };
case "normal":
yield* Effect.log(
`[normal] standard handling for ${item.id}`
);
return { id: item.id, processed: "standard" };
case "low":
yield* Effect.log(`[low] deferred handling for ${item.id}`);
return { id: item.id, processed: "deferred" };
}
});
const items: Item[] = [
{ id: "i1", priority: "normal" },
{ id: "i2", priority: "high" },
{ id: "i3", priority: "low" },
];
const processed = yield* Stream.fromIterable(items).pipe(
Stream.mapEffect((item) => transformByPriority(item)),
Stream.runCollect
);
yield* Effect.log(
`[conditional] processed ${processed.length} items
`
);
// Example 9: performance-optimized transformation
console.log(`[9] Performance optimization:
`);
const largeDataset = Array.from({ length: 1000 }, (_, i) => i);
const startTime = Date.now();
// Use efficient operators that fuse into a single pass
const result = yield* Stream.fromIterable(largeDataset).pipe(
Stream.filter((n) => n % 2 === 0), // keep even numbers
Stream.take(100), // stop after the first 100 matches
Stream.map((n) => n * 2), // transform
Stream.runCollect
);
const elapsed = Date.now() - startTime;
yield* Effect.log(
`[perf] streamed a 1000-item source in ${elapsed}ms, kept ${result.length} items (laziness stops pulling once take(100) is satisfied)`
);
});
Effect.runPromise(program);
Rationale:
Advanced transformations enable complex data flows:
- Custom operators: build reusable transformations
- Effect-based: transformations that perform side effects
- Lazy evaluation: compute only what is needed
- Fusion: combined operations are optimized into a single pass
- Layering: multiple transformation stages
- Composition: operators combine cleanly
Patterns: Stream.mapEffect(), Stream.map(), pipe composition
Naive transformations do not scale:
Problem 1: performance degradation
- Each stage allocates an intermediate collection
- 10 transformations = 10 allocations per pass
- Processing 1M items = 10M allocations
- GC pressure, memory exhaustion
Problem 2: complex logic gets scattered
- Validation here, enrichment there, filtering elsewhere
- Hard to maintain
- Changes break other parts
- No clear data flow
Problem 3: effect handling
- Transformations need side effects
- Network calls, database queries
- Naive approach: load everything, transform sequentially
- Slow and inefficient
Problem 4: reusability
- Custom transformations get written for one use
- Rewritten from scratch next time
- Code duplication
- Bugs get copied along
Solutions:
Custom operators:
- Encapsulate transformation logic
- Reusable across projects
- Testable in isolation
- Composable
Lazy evaluation:
- Compute element by element as the stream flows
- No intermediate collections
- Constant memory
- Compute only what is consumed
Fusion:
- Combine multiple map/filter stages
- Single pass over the data
- No intermediate collections
- Optimized by the compiler/library
Effect composition:
- Chain effects naturally
- Errors propagate automatically
- Resource cleanup is guaranteed
- Readable code