Effect-TS流模式Skill effect-patterns-streams

此技能提供了Effect-TS中处理数据流的8个精选模式,涵盖转换、合并、背压控制、状态操作、分组窗口、资源管理、错误处理和高级变换。适用于构建高效、可维护的数据管道,用于流处理、异步编程和函数式编程场景。关键词:Effect-TS、流处理、数据管道、异步编程、函数式编程、TypeScript、背压、错误处理、资源管理。

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

name: effect-patterns-streams description: Effect-TS流模式。在Effect-TS应用程序中处理流时使用。

Effect-TS流模式

此技能提供了8个精选的Effect-TS流模式。 在处理以下相关任务时使用此技能:

  • Effect-TS应用程序中的最佳实践
  • 真实世界的模式和解决方案

🟢 初级模式

流模式1:使用Map和Filter转换流

规则: 使用map和filter组合器声明性地转换流元素,创建管道以重塑数据而无需物化中间结果。

良好示例:

此示例演示通过多个阶段转换原始数据流。

import { Stream, Effect, Chunk } from "effect";

interface RawLogEntry {
  readonly timestamp: string;
  readonly level: string;
  readonly message: string;
}

interface ProcessedLog {
  readonly date: Date;
  readonly severity: "low" | "medium" | "high";
  readonly normalizedMessage: string;
}

// 创建原始日志条目流
const createLogStream = (): Stream.Stream<RawLogEntry> =>
  Stream.fromIterable([
    { timestamp: "2025-12-17T09:00:00Z", level: "DEBUG", message: "App starting" },
    { timestamp: "2025-12-17T09:01:00Z", level: "INFO", message: "Connected to DB" },
    { timestamp: "2025-12-17T09:02:00Z", level: "ERROR", message: "Query timeout" },
    { timestamp: "2025-12-17T09:03:00Z", level: "DEBUG", message: "Retry initiated" },
    { timestamp: "2025-12-17T09:04:00Z", level: "WARN", message: "Connection degraded" },
    { timestamp: "2025-12-17T09:05:00Z", level: "INFO", message: "Recovered" },
  ]);

// 转换:解析时间戳
const parseTimestamp = (entry: RawLogEntry): RawLogEntry => ({
  ...entry,
  timestamp: entry.timestamp, // 已经是ISO格式,但可以在这里解析
});

// 转换:将日志级别映射到严重性
const mapSeverity = (level: string): "low" | "medium" | "high" => {
  if (level === "DEBUG" || level === "INFO") return "low";
  if (level === "WARN") return "medium";
  return "high";
};

// 转换:规范化消息
const normalizeMessage = (message: string): string =>
  message.toLowerCase().trim();

// 过滤:仅保留重要日志
const isImportant = (entry: RawLogEntry): boolean => {
  return entry.level !== "DEBUG";
};

// 主管道
const program = Effect.gen(function* () {
  console.log(`
[流] 使用map/filter处理日志流
`);

  // 创建并转换流
  const transformedStream = createLogStream().pipe(
    // 过滤:仅保留非调试日志
    Stream.filter((entry) => {
      const important = isImportant(entry);
      console.log(
        `[过滤] ${entry.level} → ${important ? "✓ 保留" : "✗ 过滤掉"}`
      );
      return important;
    }),

    // 映射:提取日期
    Stream.map((entry) => {
      const date = new Date(entry.timestamp);
      console.log(`[映射-1] 解析日期: ${date.toISOString()}`);
      return { ...entry, parsedDate: date };
    }),

    // 映射:规范化和映射严重性
    Stream.map((entry) => {
      const processed: ProcessedLog = {
        date: entry.parsedDate,
        severity: mapSeverity(entry.level),
        normalizedMessage: normalizeMessage(entry.message),
      };
      console.log(
        `[映射-2] 转换: ${entry.level} → ${processed.severity}`
      );
      return processed;
    })
  );

  // 收集所有转换后的日志
  const results = yield* transformedStream.pipe(
    Stream.runCollect
  );

  console.log(`
[结果]`);
  console.log(`  总日志数: ${results.length}`);

  Chunk.forEach(results, (log) => {
    console.log(
      `  - [${log.severity.toUpperCase()}] ${log.date.toISOString()}: ${log.normalizedMessage}`
    );
  });
});

Effect.runPromise(program);

输出显示惰性评估和过滤:

[流] 使用map/filter处理日志流

[过滤] DEBUG → ✗ 过滤掉
[过滤] INFO → ✓ 保留
[映射-1] 解析日期: 2025-12-17T09:01:00.000Z
[映射-2] 转换: INFO → low
[过滤] ERROR → ✓ 保留
[映射-1] 解析日期: 2025-12-17T09:02:00.000Z
[映射-2] 转换: ERROR → high
...

[结果]
  总日志数: 5
  - [LOW] 2025-12-17T09:01:00.000Z: connected to db
  - [HIGH] 2025-12-17T09:02:00.000Z: query timeout
  ...

原理:

使用Stream.mapStream.filter转换流:

  • map:转换每个元素(1→1)
  • filter:保留匹配谓词的元素(N→N,丢弃一些)
  • 链式:组合多个操作
  • 惰性:按需转换元素(无缓冲)

模式:stream.pipe(Stream.map(...), Stream.filter(...))


没有map/filter的流数据转换会带来问题:

  • 缓冲:必须在转换前收集所有数据
  • 代码冗长:为每个转换手动循环
  • 内存使用:大型中间数组
  • 可组合性:难以链式操作

Map/filter支持:

  • 惰性评估:按需转换
  • 可组合:自然链式操作
  • 内存高效:无中间集合
  • 表达性强:清晰声明意图

真实世界示例:处理日志

  • 没有map/filter:收集日志,按级别过滤,映射到对象,转换字段
  • 使用map/filterlogStream.pipe(Stream.filter(...), Stream.map(...))


🟡 中级模式

流模式2:合并和组合多个流

规则: 使用merge和concat组合器组合多个流,支持从多个独立源聚合数据。

良好示例:

此示例演示将多个事件流合并为统一流。

import { Stream, Effect, Chunk } from "effect";

interface Event {
  readonly source: string;
  readonly type: string;
  readonly data: string;
  readonly timestamp: Date;
}

// 从不同源创建独立事件流
const createUserEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "user-service", type: "login", data: "user-123", timestamp: new Date(Date.now() + 0) },
    { source: "user-service", type: "logout", data: "user-123", timestamp: new Date(Date.now() + 500) },
  ]).pipe(
    Stream.tap(() => Effect.sleep("500 millis"))
  );

const createPaymentEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "payment-service", type: "payment-started", data: "order-456", timestamp: new Date(Date.now() + 200) },
    { source: "payment-service", type: "payment-completed", data: "order-456", timestamp: new Date(Date.now() + 800) },
  ]).pipe(
    Stream.tap(() => Effect.sleep("600 millis"))
  );

const createAuditEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "audit-log", type: "access-granted", data: "resource-789", timestamp: new Date(Date.now() + 100) },
    { source: "audit-log", type: "access-revoked", data: "resource-789", timestamp: new Date(Date.now() + 900) },
  ]).pipe(
    Stream.tap(() => Effect.sleep("800 millis"))
  );

// 合并流(交错,无序)
const mergedEventStream = (): Stream.Stream<Event> => {
  const userStream = createUserEventStream();
  const paymentStream = createPaymentEventStream();
  const auditStream = createAuditEventStream();

  return Stream.merge(userStream, paymentStream, auditStream);
};

// 连接流(顺序,有序)
const concatenatedEventStream = (): Stream.Stream<Event> => {
  return createUserEventStream().pipe(
    Stream.concat(createPaymentEventStream()),
    Stream.concat(createAuditEventStream())
  );
};

// 主程序:比较merge与concat
const program = Effect.gen(function* () {
  console.log(`
[合并] 从多个源交错事件:
`);

  // 收集合并流
  const mergedEvents = yield* mergedEventStream().pipe(
    Stream.runCollect
  );

  Chunk.forEach(mergedEvents, (event, idx) => {
    console.log(
      `  ${idx + 1}. [${event.source}] ${event.type}: ${event.data}`
    );
  });

  console.log(`
[连接] 顺序事件(用户 → 支付 → 审计):
`);

  // 收集连接流
  const concatEvents = yield* concatenatedEventStream().pipe(
    Stream.runCollect
  );

  Chunk.forEach(concatEvents, (event, idx) => {
    console.log(
      `  ${idx + 1}. [${event.source}] ${event.type}: ${event.data}`
    );
  });
});

Effect.runPromise(program);

输出显示合并交错与连接顺序:

[合并] 从多个源交错事件:

  1. [audit-log] access-granted: resource-789
  2. [user-service] login: user-123
  3. [payment-service] payment-started: order-456
  4. [user-service] logout: user-123
  5. [payment-service] payment-completed: order-456
  6. [audit-log] access-revoked: resource-789

[连接] 顺序事件(用户 → 支付 → 审计):

  1. [user-service] login: user-123
  2. [user-service] logout: user-123
  3. [payment-service] payment-started: order-456
  4. [payment-service] payment-completed: order-456
  5. [audit-log] access-granted: resource-789
  6. [audit-log] access-revoked: resource-789

原理:

使用以下方式组合多个流:

  • merge:从多个流交错元素(无序)
  • concat:顺序链式流(有序,等待第一个完成)
  • mergeAll:合并流集合
  • zip:组合多个流的对应元素

模式:Stream.merge(stream1, stream2)stream1.pipe(Stream.concat(stream2))


没有merge/concat的多源数据处理会带来问题:

  • 复杂协调:手动循环多个源
  • 难以聚合:从不同源收集冗长
  • 顺序混淆:顺序与并行不明确
  • 资源管理:多个独立消费者

Merge/concat支持:

  • 简单组合:自然组合流
  • 语义清晰:Merge = 并行,concat = 顺序
  • 聚合:单一消费者处理多源
  • 可扩展性:添加源无需重构

真实世界示例:聚合用户事件

  • 没有merge:轮询用户服务、事件日志、通知分开处理
  • 使用mergeStream.merge(userStream, eventStream, notificationStream)


流模式3:控制流中的背压

规则: 使用背压控制来管理快速生产者和慢速消费者之间的流,防止内存耗尽和资源溢出。

良好示例:

此示例演示以不同速率消费事件时的背压管理。

import { Stream, Effect, Chunk } from "effect";

interface DataPoint {
  readonly id: number;
  readonly value: number;
}

// 快速生产者:每秒生成100个项目
const fastProducer = (): Stream.Stream<DataPoint> =>
  Stream.fromIterable(Array.from({ length: 100 }, (_, i) => ({ id: i, value: Math.random() }))).pipe(
    Stream.tap(() => Effect.sleep("10 millis")) // 每个项目10毫秒 = 100/秒
  );

// 慢速消费者:每秒处理10个项目
const slowConsumer = (item: DataPoint): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.sleep("100 millis"); // 每个项目100毫秒 = 10/秒
  });

// 无背压(危险 - 队列无界增长)
const unbufferedStream = (): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    Stream.tap((item) =>
      Effect.log(`[无缓冲] 产生项目 ${item.id}`)
    )
  );

// 有界缓冲(背压生效)
const bufferedStream = (bufferSize: number): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    // 最多缓冲10个项目;如果满,生产者等待
    Stream.buffer(bufferSize),
    Stream.tap((item) =>
      Effect.log(`[缓冲] 消费项目 ${item.id}`)
    )
  );

// 节流(限速发射)
const throttledStream = (): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    // 最多每秒20个项目(每50毫秒1个)
    Stream.throttle(1, "50 millis"),
    Stream.tap((item) =>
      Effect.log(`[节流] 项目 ${item.id}`)
    )
  );

// 主程序:比较方法
const program = Effect.gen(function* () {
  console.log(`
[开始] 演示背压管理
`);

  // 测试缓冲方法
  console.log(`[测试1] 缓冲流(缓冲区大小5):
`);

  const startBuffer = Date.now();

  yield* bufferedStream(5).pipe(
    Stream.take(20), // 仅取20个项目
    Stream.runForEach(slowConsumer)
  );

  const bufferTime = Date.now() - startBuffer;
  console.log(`
[结果] 缓冲方法耗时 ${bufferTime}毫秒
`);

  // 测试节流方法
  console.log(`[测试2] 节流流(每50毫秒1个项目):
`);

  const startThrottle = Date.now();

  yield* throttledStream().pipe(
    Stream.take(20),
    Stream.runForEach(slowConsumer)
  );

  const throttleTime = Date.now() - startThrottle;
  console.log(`
[结果] 节流方法耗时 ${throttleTime}毫秒
`);

  // 总结
  console.log(`[总结]`);
  console.log(`  无背压控制:`);
  console.log(`    - 队列会增长到100个项目(内存风险)`);
  console.log(`    - 生产者/消费者独立运行`);
  console.log(`  有缓冲:`);
  console.log(`    - 队列限制为5个项目(安全)`);
  console.log(`    - 缓冲区满时生产者等待`);
  console.log(`  有节流:`);
  console.log(`    - 生产速率限制为每秒20个`);
  console.log(`    - 平滑可控流`);
});

Effect.runPromise(program);

原理:

背压是流控制:慢速消费者告诉快速生产者减速。

技术:

  • 缓冲:临时存储项目(有限队列)
  • 节流:限速项目发射
  • 分块:以固定大小批次处理
  • 防抖:跳过快速重复项

模式:stream.pipe(Stream.throttle(...), Stream.buffer(...))


没有背压管理,生产者和消费者速度不匹配会导致:

  • 内存耗尽:生产者快于消费者 → 队列无界增长
  • 垃圾收集暂停:大缓冲区导致GC压力
  • 资源泄漏:打开连接/文件句柄累积
  • 级联故障:一个慢消费者阻塞整个管道

背压支持:

  • 内存安全:有界缓冲区防止溢出
  • 资源效率:消费者自然调整生产者节奏
  • 性能:调整缓冲区大小提高吞吐量
  • 可观察性:监控背压作为健康指标

真实世界示例:读取大文件与写入数据库

  • 无背压:将整个文件读入内存,慢速写入 → 内存耗尽
  • 有背压:读取1000行,等待数据库,读取下一批


流模式4:使用Scan和Fold进行状态操作

规则: 使用scan进行逐元素状态处理,使用fold进行最终聚合,支持无需缓冲整个流的复杂流分析。

良好示例:

此示例演示维护测量流的统计信息。

import { Stream, Effect, Chunk } from "effect";

interface Measurement {
  readonly id: number;
  readonly value: number;
  readonly timestamp: Date;
}

interface RunningStats {
  readonly count: number;
  readonly sum: number;
  readonly min: number;
  readonly max: number;
  readonly average: number;
  readonly variance: number;
  readonly lastValue: number;
}

// 创建测量流
const createMeasurementStream = (): Stream.Stream<Measurement> =>
  Stream.fromIterable([
    { id: 1, value: 10, timestamp: new Date() },
    { id: 2, value: 20, timestamp: new Date() },
    { id: 3, value: 15, timestamp: new Date() },
    { id: 4, value: 25, timestamp: new Date() },
    { id: 5, value: 30, timestamp: new Date() },
    { id: 6, value: 22, timestamp: new Date() },
  ]);

// 初始统计状态
const initialStats: RunningStats = {
  count: 0,
  sum: 0,
  min: Infinity,
  max: -Infinity,
  average: 0,
  variance: 0,
  lastValue: 0,
};

// 归约器:为每个测量更新统计
const updateStats = (
  stats: RunningStats,
  measurement: Measurement
): RunningStats => {
  const newCount = stats.count + 1;
  const newSum = stats.sum + measurement.value;
  const newAverage = newSum / newCount;

  // 增量计算方差
  const delta = measurement.value - stats.average;
  const delta2 = measurement.value - newAverage;
  const newVariance = stats.variance + delta * delta2;

  return {
    count: newCount,
    sum: newSum,
    min: Math.min(stats.min, measurement.value),
    max: Math.max(stats.max, measurement.value),
    average: newAverage,
    variance: newVariance / newCount,
    lastValue: measurement.value,
  };
};

// 主程序:演示scan与统计
const program = Effect.gen(function* () {
  console.log(`
[scan] 运行统计流:
`);

  // 使用scan发射中间统计
  const statsStream = createMeasurementStream().pipe(
    Stream.scan(initialStats, (stats, measurement) => {
      const newStats = updateStats(stats, measurement);

      console.log(
        `[测量 ${measurement.id}] 值: ${measurement.value}`
      );
      console.log(
        `  计数: ${newStats.count}, 平均: ${newStats.average.toFixed(2)}, ` +
        `最小: ${newStats.min}, 最大: ${newStats.max}, ` +
        `方差: ${newStats.variance.toFixed(2)}`
      );

      return newStats;
    })
  );

  // 收集所有中间统计
  const allStats = yield* statsStream.pipe(Stream.runCollect);

  // 最终统计
  const finalStats = Chunk.last(allStats);

  if (finalStats._tag === "Some") {
    console.log(`
[最终统计]`);
    console.log(`  总测量数: ${finalStats.value.count}`);
    console.log(`  平均值: ${finalStats.value.average.toFixed(2)}`);
    console.log(`  最小值: ${finalStats.value.min}`);
    console.log(`  最大值: ${finalStats.value.max}`);
    console.log(
      `  标准差: ${Math.sqrt(finalStats.value.variance).toFixed(2)}`
    );
  }

  // 与fold比较(仅发射最终结果)
  console.log(`
[fold] 仅最终统计:
`);

  const finalResult = yield* createMeasurementStream().pipe(
    Stream.fold(initialStats, updateStats),
    Stream.tap((stats) =>
      Effect.log(`最终: 计数=${stats.count}, 平均=${stats.average.toFixed(2)}`)
    )
  );
});

Effect.runPromise(program);

原理:

状态流操作:

  • scan:使用累加器应用函数,发射中间状态
  • fold:使用累加器应用函数,仅发射最终结果
  • reduce:类似fold但需要非空流

模式:stream.pipe(Stream.scan(initialState, reducer))Stream.fold(initialState, reducer)


没有scan/fold处理流会带来问题:

  • 手动状态跟踪:流外部的Ref或可变变量
  • 丢失上下文:难以关联中间值
  • 易错:容易忘记状态更新
  • 测试困难:状态散布在代码中

Scan/fold支持:

  • 声明式状态:状态通过流传递
  • 中间值:每步发射状态(scan)
  • 类型安全:累加器类型保证
  • 可组合:链式状态操作

真实世界示例:指标运行平均

  • 没有scan:手动跟踪计数和总和,计算平均,发射
  • 使用scanstream.pipe(Stream.scan(initialState, updateAverage))


🟠 高级模式

流模式5:流的分组和窗口

规则: 使用groupBy按键分区流,使用滚动/滑动窗口在时间窗口上聚合流。

良好示例:

此示例演示窗口和分组模式。

import { Effect, Stream, Ref, Duration, Schedule } from "effect";

interface Event {
  readonly timestamp: Date;
  readonly userId: string;
  readonly action: string;
  readonly duration: number; // 毫秒
}

// 模拟事件流
const generateEvents = (): Event[] => [
  { timestamp: new Date(Date.now() - 5000), userId: "user1", action: "click", duration: 100 },
  { timestamp: new Date(Date.now() - 4500), userId: "user2", action: "view", duration: 250 },
  { timestamp: new Date(Date.now() - 4000), userId: "user1", action: "scroll", duration: 150 },
  { timestamp: new Date(Date.now() - 3500), userId: "user3", action: "click", duration: 120 },
  { timestamp: new Date(Date.now() - 3000), userId: "user2", action: "click", duration: 180 },
  { timestamp: new Date(Date.now() - 2500), userId: "user1", action: "view", duration: 200 },
  { timestamp: new Date(Date.now() - 2000), userId: "user3", action: "view", duration: 300 },
  { timestamp: new Date(Date.now() - 1500), userId: "user1", action: "submit", duration: 500 },
  { timestamp: new Date(Date.now() - 1000), userId: "user2", action: "scroll", duration: 100 },
];

// 主程序:窗口和分组示例
const program = Effect.gen(function* () {
  console.log(`
[窗口与分组] 流组织模式
`);

  const events = generateEvents();

  // 示例1:滚动窗口(固定大小批次)
  console.log(`[1] 滚动窗口(2事件批次):
`);

  const windowSize = 2;
  let batchNumber = 1;

  for (let i = 0; i < events.length; i += windowSize) {
    const batch = events.slice(i, i + windowSize);

    yield* Effect.log(`[窗口 ${batchNumber}] (${batch.length} 事件)`);

    let totalDuration = 0;

    for (const event of batch) {
      yield* Effect.log(
        `  - ${event.userId}: ${event.action} (${event.duration}毫秒)`
      );

      totalDuration += event.duration;
    }

    yield* Effect.log(`[窗口 ${batchNumber}] 总时长: ${totalDuration}毫秒
`);

    batchNumber++;
  }

  // 示例2:滑动窗口(重叠)
  console.log(`[2] 滑动窗口(最后3事件,每次滑动1):
`);

  const windowSizeSlide = 3;
  const slideBy = 1;

  for (let i = 0; i <= events.length - windowSizeSlide; i += slideBy) {
    const window = events.slice(i, i + windowSizeSlide);

    const avgDuration =
      window.reduce((sum, e) => sum + e.duration, 0) / window.length;

    yield* Effect.log(
      `[滑动 ${i / slideBy}] ${window.length} 事件, 平均时长: ${avgDuration.toFixed(0)}毫秒`
    );
  }

  // 示例3:按键分组
  console.log(`
[3] 按用户分组:
`);

  const byUser = new Map<string, Event[]>();

  for (const event of events) {
    if (!byUser.has(event.userId)) {
      byUser.set(event.userId, []);
    }

    byUser.get(event.userId)!.push(event);
  }

  for (const [userId, userEvents] of byUser) {
    const totalActions = userEvents.length;
    const totalTime = userEvents.reduce((sum, e) => sum + e.duration, 0);
    const avgTime = totalTime / totalActions;

    yield* Effect.log(
      `[用户 ${userId}] ${totalActions} 动作, ${totalTime}毫秒总计, ${avgTime.toFixed(0)}毫秒平均`
    );
  }

  // 示例4:分组 + 窗口组合
  console.log(`
[4] 按用户分组,按动作类型窗口:
`);

  for (const [userId, userEvents] of byUser) {
    const byAction = new Map<string, Event[]>();

    for (const event of userEvents) {
      if (!byAction.has(event.action)) {
        byAction.set(event.action, []);
      }

      byAction.get(event.action)!.push(event);
    }

    yield* Effect.log(`[用户 ${userId}] 动作细分:`);

    for (const [action, actionEvents] of byAction) {
      const count = actionEvents.length;
      const total = actionEvents.reduce((sum, e) => sum + e.duration, 0);

      yield* Effect.log(`  ${action}: ${count}次 (${total}毫秒总计)`);
    }
  }

  // 示例5:会话窗口(基于不活动超时)
  console.log(`
[5] 会话窗口(间隔 > 1000毫秒 = 新会话):
`);

  const sessionGapMs = 1000;
  const sessions: Event[][] = [];
  let currentSession: Event[] = [];
  let lastTimestamp = events[0]?.timestamp.getTime() ?? 0;

  for (const event of events) {
    const currentTime = event.timestamp.getTime();
    const timeSinceLastEvent = currentTime - lastTimestamp;

    if (timeSinceLastEvent > sessionGapMs && currentSession.length > 0) {
      sessions.push(currentSession);
      yield* Effect.log(
        `[会话] 关闭 (${currentSession.length} 事件, 间隔: ${timeSinceLastEvent}毫秒)`
      );

      currentSession = [];
    }

    currentSession.push(event);
    lastTimestamp = currentTime;
  }

  if (currentSession.length > 0) {
    sessions.push(currentSession);
    yield* Effect.log(`[会话] 最终 (${currentSession.length} 事件)`);
  }

  // 示例6:窗口中Top-K聚合
  console.log(`
[6] 最后窗口中前2动作:
`);

  const lastWindow = events.slice(-3);

  const actionCounts = new Map<string, number>();

  for (const event of lastWindow) {
    actionCounts.set(
      event.action,
      (actionCounts.get(event.action) ?? 0) + 1
    );
  }

  const topActions = Array.from(actionCounts.entries())
    .sort((a, b) => b[1] - a[1])
    .slice(0, 2);

  yield* Effect.log(`[Top-K] 在最后3事件窗口中:`);

  for (const [action, count] of topActions) {
    yield* Effect.log(`  ${action}: ${count}次`);
  }
});

Effect.runPromise(program);

原理:

窗口将无界流组织为有界块:

  • 滚动窗口:固定大小非重叠(例如,1秒窗口)
  • 滑动窗口:重叠窗口(例如,10秒窗口,5秒跳步)
  • 按键分组:按字段值分区流
  • 会话窗口:基于事件的窗口(例如,空闲超时)
  • 批处理聚合:处理N个项目或等待T秒

模式:Stream.groupBy(),使用RefSchedule自定义窗口


无界流需要边界:

问题1:内存耗尽

  • 处理1M事件无边界 = 全部保存在内存
  • 累积内存无界增长
  • 最终OOM错误

问题2:延迟聚合

  • 流结束前无法求和所有事件(永不)
  • 需要决定:“在此1秒窗口内求和事件”

问题3:分组复杂性

  • 用户事件流:需要按用户聚合
  • 没有groupBy:手动状态跟踪(易错)

问题4:时间模式

  • “最后5分钟前10搜索”需要窗口
  • “每分钟每个端点平均响应时间”需要分组 + 窗口

解决方案:

滚动窗口

  • 将流划分为1秒、5秒或1分钟块
  • 独立处理每个块
  • 窗口间清除内存
  • 适用于:指标、批处理、报告

滑动窗口

  • 始终保留最后5分钟数据
  • 每秒发射更新聚合
  • 检测重叠期间模式
  • 适用于:异常检测、趋势

按键分组

  • 按键分离流
  • 每个键有独立状态
  • 发射分组结果
  • 适用于:按用户、按端点、按租户


流模式6:流中的资源管理

规则: 使用Stream.bracket或effect作用域保证资源清理,即使流失败或中断也能防止泄漏。

良好示例:

此示例演示资源获取、使用和保证清理。

import { Effect, Stream, Resource, Scope, Ref } from "effect";

interface FileHandle {
  readonly path: string;
  readonly fd: number;
}

interface Connection {
  readonly id: string;
  readonly isOpen: boolean;
}

// 模拟资源管理
const program = Effect.gen(function* () {
  console.log(`
[资源管理] 流资源生命周期
`);

  // 示例1:文件流的Bracket模式
  console.log(`[1] Bracket模式(获取 → 使用 → 释放):
`);

  let openHandles = 0;
  let closedHandles = 0;

  const openFile = (path: string) =>
    Effect.gen(function* () {
      openHandles++;
      yield* Effect.log(`[打开] 文件 "${path}" (总计打开: ${openHandles})`);

      return { path, fd: 1000 + openHandles };
    });

  const closeFile = (handle: FileHandle) =>
    Effect.gen(function* () {
      closedHandles++;
      yield* Effect.log(`[关闭] 文件 "${handle.path}" (总计关闭: ${closedHandles})`);
    });

  const readFileWithBracket = (path: string) =>
    Effect.gen(function* () {
      let handle: FileHandle | null = null;

      try {
        handle = yield* openFile(path);

        yield* Effect.log(
          `[使用] 从fd ${handle.fd}读取 ("${handle.path}")`
        );

        // 模拟读取
        return "文件内容";
      } finally {
        // 即使上方出错也保证运行
        if (handle) {
          yield* closeFile(handle);
        }
      }
    });

  // 测试成功情况
  yield* Effect.log(`[测试] 成功案例:`);

  const content = yield* readFileWithBracket("/data/file.txt");

  yield* Effect.log(`[结果] 获取: "${content}"
`);

  // 测试失败情况(模拟)
  yield* Effect.log(`[测试] 错误案例:`);

  const failCase = Effect.gen(function* () {
    let handle: FileHandle | null = null;

    try {
      handle = yield* openFile("/data/missing.txt");

      // 模拟操作中错误
      yield* Effect.fail(new Error("读取失败"));
    } finally {
      if (handle) {
        yield* closeFile(handle);
      }
    }
  }).pipe(
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[错误] 捕获: ${error.message}`);
        yield* Effect.log(`[检查] 关闭句柄: ${closedHandles} (验证清理)
`);
      })
    )
  );

  yield* failCase;

  // 示例2:连接池管理
  console.log(`[2] 连接池管理:
`);

  interface ConnectionPool {
    acquire: () => Effect.Effect<Connection>;
    release: (conn: Connection) => Effect.Effect<void>;
  }

  const createConnectionPool = (maxSize: number): Effect.Effect<ConnectionPool> =>
    Effect.gen(function* () {
      const available = yield* Ref.make<Connection[]>([]);
      const inUse = yield* Ref.make<Set<string>>(new Set());
      let idCounter = 0;

      return {
        acquire: Effect.gen(function* () {
          const avail = yield* Ref.get(available);

          if (avail.length > 0) {
            yield* Effect.log(`[池] 从池中重用连接`);

            const conn = avail.pop()!;

            yield* Ref.modify(inUse, (set) => [
              undefined,
              new Set(set).add(conn.id),
            ]);

            return conn;
          }

          const inUseCount = (yield* Ref.get(inUse)).size;

          if (inUseCount >= maxSize) {
            yield* Effect.fail(new Error("池耗尽"));
          }

          const connId = `conn-${++idCounter}`;

          yield* Effect.log(`[池] 创建新连接: ${connId}`);

          const conn = { id: connId, isOpen: true };

          yield* Ref.modify(inUse, (set) => [
            undefined,
            new Set(set).add(connId),
          ]);

          return conn;
        }),

        release: (conn: Connection) =>
          Effect.gen(function* () {
            yield* Ref.modify(inUse, (set) => {
              const updated = new Set(set);
              updated.delete(conn.id);
              return [undefined, updated];
            });

            yield* Ref.modify(available, (avail) => [
              undefined,
              [...avail, conn],
            ]);

            yield* Effect.log(`[池] 返回连接: ${conn.id}`);
          }),
      };
    });

  const pool = yield* createConnectionPool(3);

  // 获取和释放连接
  const conn1 = yield* pool.acquire();
  const conn2 = yield* pool.acquire();

  yield* pool.release(conn1);

  const conn3 = yield* pool.acquire(); // 重用conn1

  yield* Effect.log(`
`);

  // 示例3:基于作用域的资源安全
  console.log(`[3] 作用域资源(分层清理):
`);

  let scopedCount = 0;

  const withScoped = <R,>(create: () => Effect.Effect<R>) =>
    Effect.gen(function* () {
      scopedCount++;
      const id = scopedCount;

      yield* Effect.log(`[作用域] 进入作用域 ${id}`);

      const resource = yield* create();

      yield* Effect.log(`[作用域] 在作用域 ${id} 中使用资源`);

      yield* Effect.sync(() => {
        // 作用域退出时清理发生
        yield* Effect.log(`[作用域] 退出作用域 ${id}`);
      }).pipe(
        Effect.ensuring(
          Effect.log(`[作用域] 保证清理作用域 ${id}`)
        )
      );

      return resource;
    });

  // 嵌套作用域
  const result = yield* withScoped(() =>
    Effect.succeed({
      level: 1,
      data: yield* withScoped(() => Effect.succeed("内部数据")),
    })
  ).pipe(
    Effect.catchAll(() => Effect.succeed({ level: 0, data: null }))
  );

  yield* Effect.log(`[作用域] 清理顺序: 内部 → 外部
`);

  // 示例4:流资源管理
  console.log(`[4] 带资源清理的流:
`);

  let streamResourceCount = 0;

  // 模拟获取资源的流
  const streamWithResources = Stream.empty.pipe(
    Stream.tap(() =>
      Effect.gen(function* () {
        streamResourceCount++;
        yield* Effect.log(`[流-资源] 获取资源 ${streamResourceCount}`);
      })
    ),
    // 流结束时清理
    Stream.ensuring(
      Effect.log(`[流-资源] 清理所有 ${streamResourceCount} 资源`)
    )
  );

  yield* Stream.runDrain(streamWithResources);

  // 示例5:带清理的错误传播
  console.log(`
[5] 带清理的错误安全:
`);

  const safeRead = (retryCount: number) =>
    Effect.gen(function* () {
      let handle: FileHandle | null = null;

      try {
        handle = yield* openFile(`/data/file-${retryCount}.txt`);

        if (retryCount < 2) {
          yield* Effect.log(`[读取] 尝试 ${retryCount}: 故意失败`);
          yield* Effect.fail(new Error(`尝试 ${retryCount} 失败`));
        }

        yield* Effect.log(`[读取] 尝试 ${retryCount} 成功`);

        return "成功";
      } finally {
        if (handle) {
          yield* closeFile(handle);
        }
      }
    });

  // 带保证清理的重试
  const result2 = yield* safeRead(1).pipe(
    Effect.retry(
      Schedule.recurs(2).pipe(
        Schedule.compose(Schedule.fixed("10 millis"))
      )
    ),
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[最终] 所有重试失败: ${error.message}`);
        return "回退";
      })
    )
  );

  yield* Effect.log(`
[最终] 结果: ${result2}`);
});

Effect.runPromise(program);

原理:

流必须确定性地清理资源:

  • 获取/释放:获取资源,使用,返回资源
  • Bracket模式:确保成功或失败时清理
  • 作用域安全:即使异常也保证清理
  • 连接池:重用连接,防止耗尽
  • 并发清理:处理并发下的清理

模式:Stream.bracket()Resource.make()Scope用于资源安全


没有资源管理的流会导致问题:

问题1:资源耗尽

  • 打开文件流不关闭 → 文件描述符限制超限
  • 从池获取连接永不返回 → 连接饥饿
  • 系统无响应

问题2:内存泄漏

  • 流发射大对象 → 内存增长
  • 无清理 → 垃圾持久
  • GC无法回收

问题3:数据损坏

  • 写入文件不刷新 → 崩溃时部分写入
  • 读取连接时另一线程写入 → 数据竞争
  • 结果不可预测

问题4:静默失败

  • 资源清理失败 → 错误丢失
  • 应用程序继续如成功
  • 隐藏bug变为难以追踪的后续崩溃

解决方案:

Bracket模式

  • 获取资源
  • 使用资源(即使错误)
  • 总是释放资源
  • 单独跟踪错误

资源作用域

  • 嵌套资源管理
  • 父清理等待子
  • 分层资源图
  • 类型安全保证

连接池

  • 重用连接
  • 跟踪可用/使用中
  • 防止耗尽
  • 支持优雅关闭


流模式7:流中的错误处理

规则: 使用流错误处理程序从故障中恢复、重试操作,并在个别元素失败时维护流完整性。

良好示例:

此示例演示流错误处理模式。

import { Effect, Stream, Ref } from "effect";

interface DataRecord {
  id: string;
  value: number;
}

interface ProcessingResult {
  successful: DataRecord[];
  failed: Array<{ id: string; error: string }>;
  retried: number;
}

const program = Effect.gen(function* () {
  console.log(`
[流错误处理] 弹性流处理
`);

  // 示例1:继续处理错误(跳过失败,处理其余)
  console.log(`[1] 尽管错误继续处理:
`);

  const processElement = (record: DataRecord): Effect.Effect<string> =>
    Effect.gen(function* () {
      if (record.value < 0) {
        yield* Effect.fail(new Error(`无效值: ${record.value}`));
      }

      return `processed-${record.id}`;
    });

  const records = [
    { id: "rec1", value: 10 },
    { id: "rec2", value: -5 }, // 将失败
    { id: "rec3", value: 20 },
    { id: "rec4", value: -1 }, // 将失败
    { id: "rec5", value: 30 },
  ];

  const successfulProcessing = yield* Stream.fromIterable(records).pipe(
    Stream.mapEffect((record) =>
      processElement(record).pipe(
        Effect.map((result) => ({ success: true, result })),
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[错误] 记录 ${record.id} 失败`);

            return { success: false, error };
          })
        )
      )
    ),
    Stream.runCollect
  );

  yield* Effect.log(
    `[结果] ${successfulProcessing.filter((r) => r.success).length}/${records.length} 成功
`
  );

  // 示例2:使用回退值恢复
  console.log(`[2] 错误时提供回退值:
`);

  const getData = (id: string): Effect.Effect<number> =>
    id.includes("fail") ? Effect.fail(new Error("数据错误")) : Effect.succeed(42);

  const recovered = yield* Stream.fromIterable(["ok1", "fail1", "ok2"]).pipe(
    Stream.mapEffect((id) =>
      getData(id).pipe(
        Effect.catchAll(() =>
          Effect.gen(function* () {
            yield* Effect.log(`[回退] 为 ${id} 使用默认值`);

            return -1; // 回退值
          })
        )
      )
    ),
    Stream.runCollect
  );

  yield* Effect.log(`[值] ${recovered.join(", ")}
`);

  // 示例3:收集错误与成功
  console.log(`[3] 收集错误和成功:
`);

  const results = yield* Ref.make<ProcessingResult>({
    successful: [],
    failed: [],
    retried: 0,
  });

  yield* Stream.fromIterable(records).pipe(
    Stream.mapEffect((record) =>
      processElement(record).pipe(
        Effect.tap((result) =>
          Ref.modify(results, (r) => [
            undefined,
            {
              ...r,
              successful: [...r.successful, record],
            },
          ])
        ),
        Effect.catchAll((error) =>
          Ref.modify(results, (r) => [
            undefined,
            {
              ...r,
              failed: [
                ...r.failed,
                { id: record.id, error: error.message },
              ],
            },
          ])
        )
      )
    ),
    Stream.runDrain
  );

  const finalResults = yield* Ref.get(results);

  yield* Effect.log(
    `[聚合] ${finalResults.successful.length} 成功, ${finalResults.failed.length} 失败`
  );

  for (const failure of finalResults.failed) {
    yield* Effect.log(`  - ${failure.id}: ${failure.error}`);
  }

  // 示例4:带退避重试错误
  console.log(`
[4] 带指数退避的重试:
`);

  let attemptCount = 0;

  const unreliableOperation = (id: string): Effect.Effect<string> =>
    Effect.gen(function* () {
      attemptCount++;

      if (attemptCount <= 2) {
        yield* Effect.log(`[尝试 ${attemptCount}] 为 ${id} 失败`);

        yield* Effect.fail(new Error("临时失败"));
      }

      yield* Effect.log(`[成功] 尝试 ${attemptCount} 成功`);

      return `result-${id}`;
    });

  const retried = unreliableOperation("test").pipe(
    Effect.retry(
      Schedule.exponential("10 millis").pipe(
        Schedule.upTo("100 millis"),
        Schedule.recurs(3)
      )
    ),
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[耗尽] 所有重试失败`);

        return "回退";
      })
    )
  );

  yield* retried;

  // 示例5:流中的错误上下文
  console.log(`
[5] 传播错误上下文:
`);

  interface StreamContext {
    batchId: string;
    timestamp: Date;
  }

  const processWithContext = (context: StreamContext) =>
    Stream.fromIterable([1, 2, -3, 4]).pipe(
      Stream.mapEffect((value) =>
        Effect.gen(function* () {
          if (value < 0) {
            yield* Effect.fail(
              new Error(
                `批次 ${context.batchId} 中在 ${context.timestamp.toISOString()} 的负值`
              )
            );
          }

          return value * 2;
        })
      ),
      Stream.catchAll((error) =>
        Effect.gen(function* () {
          yield* Effect.log(`[上下文错误] ${error.message}`);

          return Stream.empty;
        })
      )
    );

  const context: StreamContext = {
    batchId: "batch-001",
    timestamp: new Date(),
  };

  yield* processWithContext(context).pipe(Stream.runDrain);

  // 示例6:部分恢复(保留好数据,记录坏数据)
  console.log(`
[6] 部分恢复策略:
`);

  const mixedQuality = [
    { id: "1", data: "good" },
    { id: "2", data: "bad" },
    { id: "3", data: "good" },
    { id: "4", data: "bad" },
    { id: "5", data: "good" },
  ];

  const processQuality = (record: { id: string; data: string }) =>
    record.data === "good"
      ? Effect.succeed(`valid-${record.id}`)
      : Effect.fail(new Error(`${record.id} 的无效数据`));

  const partialResults = yield* Stream.fromIterable(mixedQuality).pipe(
    Stream.mapEffect((record) =>
      processQuality(record).pipe(
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[记录] ${error.message}`);

            return null; // 跳过此记录
          })
        )
      )
    ),
    Stream.filter((result) => result !== null),
    Stream.runCollect
  );

  yield* Effect.log(
    `[部分] 保留 ${partialResults.length}/${mixedQuality.length} 有效记录
`
  );

  // 示例7:流中的超时处理
  console.log(`[7] 每个元素的超时处理:
`);

  const slowOperation = (id: string): Effect.Effect<string> =>
    Effect.gen(function* () {
      // 模拟慢操作
      if (id === "slow") {
        yield* Effect.sleep("200 millis");
      } else {
        yield* Effect.sleep("50 millis");
      }

      return `done-${id}`;
    });

  const withTimeout = yield* Stream.fromIterable(["fast1", "slow", "fast2"]).pipe(
    Stream.mapEffect((id) =>
      slowOperation(id).pipe(
        Effect.timeout("100 millis"),
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[超时] 操作 ${id} 超时`);

            return "timeout-fallback";
          })
        )
      )
    ),
    Stream.runCollect
  );

  yield* Effect.log(`[结果] ${withTimeout.join(", ")}
`);

  // 示例8:关键错误时流终止
  console.log(`[8] 关键错误时终止流:
`);

  const isCritical = (error: Error): boolean =>
    error.message.includes("关键");

  const terminateOnCritical = Stream.fromIterable([1, 2, 3]).pipe(
    Stream.mapEffect((value) =>
      value === 2
        ? Effect.fail(new Error("关键: 系统故障"))
        : Effect.succeed(value)
    ),
    Stream.catchAll((error) =>
      Effect.gen(function* () {
        if (isCritical(error)) {
          yield* Effect.log(`[关键] 终止流`);

          return Stream.fail(error);
        }

        yield* Effect.log(`[警告] 尽管错误继续`);

        return Stream.empty;
      })
    )
  );

  yield* terminateOnCritical.pipe(
    Stream.runCollect,
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[停止] 流停止: ${error.message}`);

        return [];
      })
    )
  );
});

Effect.runPromise(program);

原理:

流错误处理支持弹性:

  • 继续处理错误:跳过失败元素,处理其余
  • 恢复:提供回退值
  • 重试:再次尝试失败元素
  • 聚合:收集错误与成功值
  • 优雅终止:受控关闭
  • 传播:让错误向上游流动

模式:Stream.catchAll()Stream.retry()Stream.recover()Stream.runCollect()


流中错误导致级联故障:

问题1:流死亡

  • 处理10,000条记录
  • 第5000条有坏数据
  • 流崩溃
  • 9,000条记录未处理
  • 需要手动重新运行

问题2:静默数据丢失

  • 流遇到错误
  • 停止处理
  • 调用者未注意
  • 缺失数据未被检测
  • 报告错误数字

问题3:无恢复可见性

  • 错误发生
  • 是否重试?多少次?
  • 恢复了吗?
  • 需要静默猜测

问题4:下游影响

  • 流错误影响所有订阅者
  • 级联故障
  • 系统不可用
  • 所有下游阻塞

解决方案:

继续处理错误

  • 跳过失败元素
  • 处理流的其余部分
  • 收集错误供后续处理
  • 部分成功可接受

带退避重试

  • 临时错误?重试
  • 指数退避
  • 最终放弃
  • 移至下一元素

错误聚合

  • 收集所有错误
  • 收集所有成功
  • 报告两者
  • 分析/调试

优雅终止

  • 错误时发出流结束信号
  • 允许清理
  • 防止资源泄漏
  • 受控关闭


流模式8:高级流转换

规则: 使用高级流运算符构建复杂数据管道,优雅组合并在规模上保持性能。

良好示例:

此示例演示高级流转换。

import { Effect, Stream, Ref, Chunk } from "effect";

interface LogEntry {
  timestamp: Date;
  level: "info" | "warn" | "error";
  message: string;
  context?: Record<string, unknown>;
}

interface Metric {
  name: string;
  value: number;
  tags: Record<string, string>;
}

const program = Effect.gen(function* () {
  console.log(`
[高级流转换] 复杂数据流
`);

  // 示例1:自定义过滤器运算符
  console.log(`[1] 带effect逻辑的自定义过滤器:
`);

  const filterByEffect = <A,>(
    predicate: (a: A) => Effect.Effect<boolean>
  ) =>
    (stream: Stream.Stream<A>) =>
      stream.pipe(
        Stream.mapEffect((value) =>
          predicate(value).pipe(
            Effect.map((keep) => (keep ? value : null))
          )
        ),
        Stream.filter((value) => value !== null)
      );

  const isValid = (num: number): Effect.Effect<boolean> =>
    Effect.gen(function* () {
      // 模拟验证effect(例如,API调用)
      return num > 0 && num < 100;
    });

  const numbers = [50, 150, 25, -10, 75];

  const validNumbers = yield* Stream.fromIterable(numbers).pipe(
    filterByEffect(isValid),
    Stream.runCollect
  );

  yield* Effect.log(`[有效] ${validNumbers.join(", ")}
`);

  // 示例2:丰富转换
  console.log(`[2] 使用额外数据丰富记录:
`);

  interface RawRecord {
    id: string;
    value: number;
  }

  interface EnrichedRecord {
    id: string;
    value: number;
    validated: boolean;
    processed: Date;
    metadata: Record<string, unknown>;
  }

  const enrich = (record: RawRecord): Effect.Effect<EnrichedRecord> =>
    Effect.gen(function* () {
      // 模拟查找/验证
      const validated = record.value > 0;

      return {
        id: record.id,
        value: record.value,
        validated,
        processed: new Date(),
        metadata: { source: "stream" },
      };
    });

  const rawData = [
    { id: "r1", value: 10 },
    { id: "r2", value: -5 },
    { id: "r3", value: 20 },
  ];

  const enriched = yield* Stream.fromIterable(rawData).pipe(
    Stream.mapEffect((record) => enrich(record)),
    Stream.runCollect
  );

  yield* Effect.log(`[丰富] ${enriched.length} 条记录已丰富
`);

  // 示例3:解复用(将一个流拆分为多个)
  console.log(`[3] 按类别解复用:
`);

  interface Event {
    id: string;
    type: "click" | "view" | "purchase";
    data: unknown;
  }

  const events: Event[] = [
    { id: "e1", type: "click", data: { x: 100, y: 200 } },
    { id: "e2", type: "view", data: { url: "/" } },
    { id: "e3", type: "purchase", data: { amount: 99.99 } },
    { id: "e4", type: "click", data: { x: 50, y: 100 } },
  ];

  const clicks = yield* Stream.fromIterable(events).pipe(
    Stream.filter((e) => e.type === "click"),
    Stream.runCollect
  );

  const views = yield* Stream.fromIterable(events).pipe(
    Stream.filter((e) => e.type === "view"),
    Stream.runCollect
  );

  const purchases = yield* Stream.fromIterable(events).pipe(
    Stream.filter((e) => e.type === "purchase"),
    Stream.runCollect
  );

  yield* Effect.log(
    `[解复用] 点击: ${clicks.length}, 查看: ${views.length}, 购买: ${purchases.length}
`
  );

  // 示例4:分块处理(批量转换)
  console.log(`[4] 分块处理(N个批次):
`);

  const processChunk = (chunk: Array<{ id: string; value: number }>) =>
    Effect.gen(function* () {
      const sum = chunk.reduce((s, r) => s + r.value, 0);
      const avg = sum / chunk.length;

      yield* Effect.log(
        `[块] ${chunk.length} 项, 平均: ${avg.toFixed(2)}`
      );

      return { size: chunk.length, sum, avg };
    });

  const data = Array.from({ length: 10 }, (_, i) => ({
    id: `d${i}`,
    value: i + 1,
  }));

  const chunkSize = 3;
  const chunks = [];

  for (let i = 0; i < data.length; i += chunkSize) {
    const chunk = data.slice(i, i + chunkSize);

    chunks.push(chunk);
  }

  const chunkResults = yield* Effect.all(
    chunks.map((chunk) => processChunk(chunk))
  );

  yield* Effect.log(
    `[块] 处理了 ${chunkResults.length} 个批次
`
  );

  // 示例5:多阶段转换管道
  console.log(`[5] 多阶段管道(解析 → 验证 → 转换):
`);

  const rawStrings = ["10", "twenty", "30", "-5", "50"];

  // 阶段1:解析
  const parsed = yield* Stream.fromIterable(rawStrings).pipe(
    Stream.mapEffect((s) =>
      Effect.gen(function* () {
        try {
          return parseInt(s);
        } catch (error) {
          yield* Effect.fail(
            new Error(`解析失败: ${s}`)
          );
        }
      }).pipe(
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[解析错误] ${error.message}`);

            return null;
          })
        )
      )
    ),
    Stream.filter((n) => n !== null),
    Stream.runCollect
  );

  yield* Effect.log(`[阶段1] 解析: ${parsed.join(", ")}`);

  // 阶段2:验证
  const validated = parsed.filter((n) => n > 0);

  yield* Effect.log(`[阶段2] 验证: ${validated.join(", ")}`);

  // 阶段3:转换
  const transformed = validated.map((n) => n * 2);

  yield* Effect.log(`[阶段3] 转换: ${transformed.join(", ")}
`);

  // 示例6:自定义运算符组合
  console.log(`[6] 可组合转换管道:
`);

  // 定义自定义运算符
  const withLogging = <A,>(label: string) =>
    (stream: Stream.Stream<A>) =>
      stream.pipe(
        Stream.tap((value) =>
          Effect.log(`[${label}] 处理: ${JSON.stringify(value)}`)
        )
      );

  const filterPositive = (stream: Stream.Stream<number>) =>
    stream.pipe(
      Stream.filter((n) => n > 0),
      Stream.tap(() => Effect.log(`[过滤] 保留正数`))
    );

  const scaleUp = (factor: number) =>
    (stream: Stream.Stream<number>) =>
      stream.pipe(
        Stream.map((n) => n * factor),
        Stream.tap((n) =>
          Effect.log(`[缩放] 缩放到 ${n}`)
        )
      );

  const testData = [10, -5, 20, -3, 30];

  const pipeline = yield* Stream.fromIterable(testData).pipe(
    withLogging("输入"),
    filterPositive,
    scaleUp(10),
    Stream.runCollect
  );

  yield* Effect.log(`[结果] 最终: ${pipeline.join(", ")}
`);

  // 示例7:状态转换
  console.log(`[7] 状态转换(运行总计):
`);

  const runningTotal = yield* Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
    Stream.scan(0, (acc, value) => acc + value),
    Stream.runCollect
  );

  yield* Effect.log(`[总计] ${runningTotal.join(", ")}
`);

  // 示例8:条件转换
  console.log(`[8] 条件转换(不同路径):
`);

  interface Item {
    id: string;
    priority: "high" | "normal" | "low";
  }

  const transformByPriority = (item: Item): Effect.Effect<{
    id: string;
    processed: string;
  }> =>
    Effect.gen(function* () {
      switch (item.priority) {
        case "high":
          yield* Effect.log(`[高] 为 ${item.id} 优先处理`);

          return { id: item.id, processed: "紧急" };

        case "normal":
          yield* Effect.log(
            `[正常] 为 ${item.id} 标准处理`
          );

          return { id: item.id, processed: "标准" };

        case "low":
          yield* Effect.log(`[低] 为 ${item.id} 延迟处理`);

          return { id: item.id, processed: "延迟" };
      }
    });

  const items: Item[] = [
    { id: "i1", priority: "normal" },
    { id: "i2", priority: "high" },
    { id: "i3", priority: "low" },
  ];

  const processed = yield* Stream.fromIterable(items).pipe(
    Stream.mapEffect((item) => transformByPriority(item)),
    Stream.runCollect
  );

  yield* Effect.log(
    `[条件] 处理了 ${processed.length} 项
`
  );

  // 示例9:性能优化转换
  console.log(`[9] 性能优化:
`);

  const largeDataset = Array.from({ length: 1000 }, (_, i) => i);

  const startTime = Date.now();

  // 使用高效运算符
  const result = yield* Stream.fromIterable(largeDataset).pipe(
    Stream.filter((n) => n % 2 === 0), // 保留偶数
    Stream.take(100), // 限制为前100
    Stream.map((n) => n * 2), // 转换
    Stream.runCollect
  );

  const elapsed = Date.now() - startTime;

  yield* Effect.log(
    `[性能] 在 ${elapsed}毫秒内处理了1000项, 保留了 ${result.length} 项`
  );
});

Effect.runPromise(program);

原理:

高级转换支持复杂数据流:

  • 自定义运算符:构建可重用转换
  • 基于effect:带副作用的转换
  • 惰性评估:仅计算所需内容
  • 融合:优化组合操作
  • 分层:多个转换层
  • 组合:清洁组合运算符

模式:Stream.mapEffect()Stream.map(),管道组合


简单转换不扩展:

问题1:性能退化

  • 每层创建中间集合
  • 10个转换 = 10个分配
  • 处理1M项 = 10M分配
  • GC压力,内存耗尽

问题2:复杂逻辑分散

  • 验证在这里,丰富在那里,过滤在别处
  • 难以维护
  • 更改破坏其他部分
  • 无清晰数据流

问题3:effect处理

  • 转换需要副作用
  • 网络调用,数据库查询
  • 朴素方法:全部加载,顺序转换
  • 慢,低效

问题4:可重用性

  • 自定义转换仅用一次
  • 下次从头重写
  • 代码重复
  • 错误复制

解决方案:

自定义运算符

  • 封装转换逻辑
  • 跨项目可重用
  • 可隔离测试
  • 可组合

惰性评估

  • 按元素流计算
  • 无中间集合
  • 恒定内存
  • 仅计算所用内容

融合

  • 组合多个map/filter
  • 单次数据通过
  • 无中间集合
  • 编译器/库优化

Effect组合

  • 自然链式effect
  • 自动错误传播
  • 保证资源清理
  • 可读代码