Effect-TS Stream Sink Patterns Skill: effect-patterns-streams-sinks

This skill provides six Effect-TS patterns for stream sinks, covering batching, event logs, message queues, fallbacks, and retry strategies. It helps developers achieve efficient, reliable data persistence and distribution when working with streaming data. Keywords: Effect-TS, stream sinks, stream processing, patterns, functional programming, backend development, data persistence, distributed systems

Backend development · 0 installs · 0 views · updated 3/8/2026

Name: effect-patterns-streams-sinks Description: Patterns for stream sinks in Effect-TS. Use this skill for tasks that involve stream sinks in Effect-TS applications.

Effect-TS Patterns: Stream Sinks

This skill provides 6 curated Effect-TS stream sink patterns. Use this skill for the following tasks:

  • Stream sinks
  • Best practices in Effect-TS applications
  • Real-world patterns and solutions

🟡 Intermediate Patterns

Sink Pattern 1: Batch Insert Stream Records into a Database

Rule: Batch stream records before database operations to improve throughput and reduce transaction overhead.

Good example:

This example demonstrates streaming user records from a paginated API and inserting them into a database in batches for efficiency.

import { Effect, Stream, Sink, Chunk, Option } from "effect";

interface User {
  readonly id: number;
  readonly name: string;
  readonly email: string;
}

interface PaginatedResponse {
  readonly users: User[];
  readonly nextPage: number | null;
}

// Simulated API that returns paginated users
const fetchUserPage = (
  page: number
): Effect.Effect<PaginatedResponse> =>
  Effect.succeed(
    page < 10
      ? {
          users: Array.from({ length: 50 }, (_, i) => ({
            id: page * 50 + i,
            name: `User ${page * 50 + i}`,
            email: `user${page * 50 + i}@example.com`,
          })),
          nextPage: page + 1,
        }
      : { users: [], nextPage: null }
  ).pipe(Effect.delay("10 millis"));

// Simulated database insert that accepts a batch of users
const insertUserBatch = (
  users: readonly User[]
): Effect.Effect<number> =>
  Effect.sync(() => {
    console.log(`Inserting a batch of ${users.length} users`);
    return users.length;
  }).pipe(Effect.delay("50 millis"));

// Create a stream of users from the paginated API
const userStream: Stream.Stream<User> = Stream.paginateChunkEffect(
  0,
  (page) =>
    fetchUserPage(page).pipe(
      Effect.map((response) => [
        Chunk.fromIterable(response.users),
        response.nextPage !== null
          ? Option.some(response.nextPage)
          : Option.none(),
      ] as const)
    )
);

// Sink that inserts each incoming batch and tracks the running total
const batchInsertSink: Sink.Sink<number, Chunk.Chunk<User>> = Sink.foldLeftEffect(
  0,
  (count, batch: Chunk.Chunk<User>) =>
    insertUserBatch(Chunk.toReadonlyArray(batch)).pipe(
      Effect.map((inserted) => count + inserted)
    )
);

// Run the stream: group records into batches of 100, then feed them to the sink
const program = Effect.gen(function* () {
  const totalInserted = yield* userStream.pipe(
    Stream.grouped(100),
    Stream.run(batchInsertSink)
  );
  console.log(`Total users inserted: ${totalInserted}`);
});

Effect.runPromise(program);

This pattern:

  1. Creates a stream that fetches users from a paginated API
  2. Batches users into groups of 100
  3. Inserts each batch into the database in a single operation
  4. Tracks the total number of inserted records

Batching happens automatically — elements are collected until the batch size is reached, then the whole batch is processed at once.


Rationale:

When consuming a stream of records for persistence to a database, use a Sink that collects them into batches before inserting. This reduces database round trips and transaction overhead, significantly improving overall throughput.


Inserting records one at a time is inefficient:

  • Each insert is a separate database call (network latency, connection overhead)
  • Each insert may be a separate transaction (ACID overhead)
  • At scale, resource contention and connection pool exhaustion

Batching solves this by:

  • Grouping N records into a single bulk insert operation
  • Amortizing database overhead across many records
  • Sustaining throughput even under high load
  • Enabling efficient transaction semantics for the whole batch

For example, inserting 10,000 records one at a time might take 100 seconds. Batching them in groups of 100 might take only 2-3 seconds.



Sink Pattern 2: Write Stream Events to an Event Log

Rule: Append stream events to an event log with metadata to maintain a complete, ordered record of what happened.

Good example:

This example demonstrates an event-sourcing pattern in which a stream of user account events is appended to an event log with metadata.

import { Effect, Stream, Sink, DateTime, Data } from "effect";

// Event types
type AccountEvent =
  | AccountCreated
  | MoneyDeposited
  | MoneyWithdrawn
  | AccountClosed;

class AccountCreated extends Data.TaggedClass("AccountCreated")<{
  readonly accountId: string;
  readonly owner: string;
  readonly initialBalance: number;
}> {}

class MoneyDeposited extends Data.TaggedClass("MoneyDeposited")<{
  readonly accountId: string;
  readonly amount: number;
}> {}

class MoneyWithdrawn extends Data.TaggedClass("MoneyWithdrawn")<{
  readonly accountId: string;
  readonly amount: number;
}> {}

class AccountClosed extends Data.TaggedClass("AccountClosed")<{
  readonly accountId: string;
}> {}

// Event envelope with metadata
interface StoredEvent {
  readonly eventId: string; // unique identifier for each event
  readonly eventType: string; // the event's type tag
  readonly aggregateId: string; // ID of the aggregate the event belongs to
  readonly aggregateType: string; // kind of aggregate (e.g. Account)
  readonly data: unknown; // the event payload
  readonly metadata: {
    readonly timestamp: number;
    readonly version: number; // position in the log
    readonly causationId?: string; // ID of whatever caused this event
  };
}

// Simulated event log that events are appended to
const eventLog: StoredEvent[] = [];
let eventVersion = 0;

const appendToEventLog = (
  event: AccountEvent,
  aggregateId: string
): Effect.Effect<StoredEvent> =>
  Effect.gen(function* () {
    const now = yield* DateTime.now;
    const storedEvent: StoredEvent = {
      eventId: `evt-${eventVersion}-${Date.now()}`,
      eventType: event._tag,
      aggregateId,
      aggregateType: "Account",
      data: event,
      metadata: {
        timestamp: DateTime.toEpochMillis(now),
        version: ++eventVersion,
      },
    };

    // Append to the log (simulated)
    eventLog.push(storedEvent);
    console.log(
      `[v${storedEvent.metadata.version}] ${storedEvent.eventType}: ${aggregateId}`
    );

    return storedEvent;
  });

// Simulated stream of events from various account operations
const accountEvents: Stream.Stream<[string, AccountEvent]> = Stream.fromIterable([
  [
    "acc-1",
    new AccountCreated({
      accountId: "acc-1",
      owner: "Alice",
      initialBalance: 1000,
    }),
  ],
  ["acc-1", new MoneyDeposited({ accountId: "acc-1", amount: 500 })],
  ["acc-1", new MoneyWithdrawn({ accountId: "acc-1", amount: 200 })],
  [
    "acc-2",
    new AccountCreated({
      accountId: "acc-2",
      owner: "Bob",
      initialBalance: 2000,
    }),
  ],
  ["acc-2", new MoneyDeposited({ accountId: "acc-2", amount: 1000 })],
  ["acc-1", new AccountClosed({ accountId: "acc-1" })],
]);

// Sink: append each event to the log
const eventLogSink: Sink.Sink<number, [string, AccountEvent]> = Sink.foldLeftEffect(
  0,
  (count, [aggregateId, event]: [string, AccountEvent]) =>
    appendToEventLog(event, aggregateId).pipe(
      Effect.map(() => count + 1)
    )
);

// Run the stream and append all events
const program = Effect.gen(function* () {
  const totalEvents = yield* accountEvents.pipe(Stream.run(eventLogSink));

  console.log(`\nTotal events appended: ${totalEvents}`);
  console.log(`\nEvent log contents:`);
  eventLog.forEach((event) => {
    console.log(`  [v${event.metadata.version}] ${event.eventType}`);
  });
});

Effect.runPromise(program);

This pattern:

  1. Defines event types as tagged event classes (AccountCreated, MoneyDeposited, and so on)
  2. Creates an event envelope with metadata (timestamp, version, causation)
  3. Streams events from various sources
  4. Appends them to the log with proper versioning and ordering
  5. Maintains a history for rebuilding state and auditing

Rationale:

When consuming a stream of events that represent changes in a system, use a Sink to append each event to an event log. The event log provides an immutable, ordered record that supports event sourcing, audit trails, and temporal queries.


Event logs are the foundation of many patterns:

  • Event sourcing: instead of storing current state, store the sequence of events that produced it
  • Audit trails: a complete, tamper-evident record of who did what, and when
  • Temporal queries: rebuild state as of any point in time
  • Consistency: a single source of truth for what happened
  • Replay: rebuild state or test changes by replaying events

Unlike bulk inserts, an event log is append-only. Each event is immutable once written. This simplicity enables:

  • Fast appends (no updates, only sequential writes)
  • Natural ordering (events in write order)
  • Easy distribution (replicate the log)
  • Strong consistency (events are facts that never change)


Sink Pattern 4: Send Stream Records to a Message Queue

Rule: Stream records to a message queue with proper batching and acknowledgments for reliable distributed data flow.

Good example:

This example demonstrates streaming sensor readings and publishing them to a message queue with topic-based partitioning.

import { Effect, Stream, Sink } from "effect";

interface SensorReading {
  readonly sensorId: string;
  readonly location: string;
  readonly temperature: number;
  readonly humidity: number;
  readonly timestamp: number;
}

// Simulated message queue publisher
interface QueuePublisher {
  readonly publish: (
    topic: string,
    partition: string,
    messages: readonly SensorReading[]
  ) => Effect.Effect<{ acknowledged: number; messageIds: string[] }>;
}

// Create a mock queue publisher
const createMockPublisher = (): QueuePublisher => {
  const publishedMessages: Record<string, SensorReading[]> = {};

  return {
    publish: (topic, partition, messages) =>
      Effect.gen(function* () {
        const key = `${topic}/${partition}`;
        publishedMessages[key] = [
          ...(publishedMessages[key] ?? []),
          ...messages,
        ];

        const messageIds = Array.from({ length: messages.length }, (_, i) =>
          `msg-${Date.now()}-${i}`
        );

        console.log(
          `Published a batch of ${messages.length} messages to ${key}`
        );

        return { acknowledged: messages.length, messageIds };
      }),
  };
};

// Determine the partition key from the sensor's location
const getPartitionKey = (reading: SensorReading): string =>
  reading.location; // route by location for data locality

// Simulated stream of sensor readings
const sensorStream: Stream.Stream<SensorReading> = Stream.fromIterable([
  {
    sensorId: "temp-1",
    location: "warehouse-a",
    temperature: 22.5,
    humidity: 45,
    timestamp: Date.now(),
  },
  {
    sensorId: "temp-2",
    location: "warehouse-b",
    temperature: 21.0,
    humidity: 50,
    timestamp: Date.now() + 100,
  },
  {
    sensorId: "temp-3",
    location: "warehouse-a",
    temperature: 22.8,
    humidity: 46,
    timestamp: Date.now() + 200,
  },
  {
    sensorId: "temp-4",
    location: "warehouse-c",
    temperature: 20.5,
    humidity: 55,
    timestamp: Date.now() + 300,
  },
  {
    sensorId: "temp-5",
    location: "warehouse-b",
    temperature: 21.2,
    humidity: 51,
    timestamp: Date.now() + 400,
  },
  {
    sensorId: "temp-6",
    location: "warehouse-a",
    temperature: 23.0,
    humidity: 47,
    timestamp: Date.now() + 500,
  },
]);

// Create a sink that publishes to the message queue in batches
const createQueuePublishSink = (
  publisher: QueuePublisher,
  topic: string,
  batchSize: number = 100
): Sink.Sink<number, SensorReading> =>
  Sink.foldLeftEffect(
    { batches: new Map<string, SensorReading[]>(), totalPublished: 0 },
    (state, reading: SensorReading) =>
      Effect.gen(function* () {
        const partition = getPartitionKey(reading);
        const batch = state.batches.get(partition) ?? [];
        const newBatch = [...batch, reading];

        if (newBatch.length >= batchSize) {
          // Batch is full — publish it
          const result = yield* publisher.publish(topic, partition, newBatch);
          const newState = new Map(state.batches);
          newState.delete(partition);

          return {
            batches: newState,
            totalPublished: state.totalPublished + result.acknowledged,
          };
        } else {
          // Add to the batch and keep going
          const newState = new Map(state.batches);
          newState.set(partition, newBatch);

          return { ...state, batches: newState };
        }
      })
  ).pipe(
    // When the stream ends, flush any remaining partial batches
    Sink.mapEffect((state) =>
      Effect.gen(function* () {
        let finalCount = state.totalPublished;

        for (const [partition, batch] of state.batches) {
          if (batch.length > 0) {
            const result = yield* publisher.publish(topic, partition, batch);
            finalCount += result.acknowledged;
          }
        }

        return finalCount;
      })
    )
  );

// Run the stream and publish to the queue
const program = Effect.gen(function* () {
  const publisher = createMockPublisher();
  const topic = "sensor-readings";

  const published = yield* sensorStream.pipe(
    Stream.run(createQueuePublishSink(publisher, topic, 50)) // batch size 50
  );

  console.log(`\nTotal messages published to the queue: ${published}`);
});

Effect.runPromise(program);

This pattern:

  1. Groups readings by partition (location) for data locality
  2. Batches records before publishing (50 at a time)
  3. Publishes batches to the queue with a partition key
  4. Flushes partial batches when the stream ends
  5. Tracks queue acknowledgments

Rationale:

When consuming a stream of events that need to be distributed to other systems, use a Sink that publishes them to a message queue. Message queues provide reliable, scalable distribution with guarantees such as ordering and exactly-once semantics.


Message queues are the backbone of event-driven architectures:

  • Decoupling: producers do not wait for consumers
  • Scalability: multiple subscribers can consume independently
  • Durability: messages persist even while subscribers are offline
  • Ordering: event sequence is maintained (per partition/topic)
  • Reliability: acknowledgments and retries ensure no message is lost

Unlike direct writes, queue publishing is asynchronous, which enables:

  • High-throughput publishing (many records batched per operation)
  • Backpressure handling (the queue manages flow)
  • Multi-subscriber patterns (fan-out)
  • Dead-letter queues for error handling


Sink Pattern 5: Fall Back to an Alternative Sink on Failure

Rule: Implement fallback sinks to handle failures gracefully and ensure data is persisted even when the primary destination is unavailable.

Good example:

This example demonstrates a system that first tries to write order records to a fast in-memory cache, falls back to a database if the cache fails, and falls back to a dead-letter file if the database fails.

import { Effect, Stream, Sink, Chunk, Either, Data } from "effect";

interface Order {
  readonly orderId: string;
  readonly customerId: string;
  readonly total: number;
  readonly timestamp: number;
}

class CacheSinkError extends Data.TaggedError("CacheSinkError")<{
  readonly reason: string;
}> {}

class DatabaseSinkError extends Data.TaggedError("DatabaseSinkError")<{
  readonly reason: string;
}> {}

// In-memory stores backing the three destinations
const cache: Order[] = [];
const MAX_CACHE_SIZE = 1000;
const dbOrders: Order[] = [];
const deadLetters: Order[] = [];

// Fast but capacity-limited in-memory cache
const writeToCache = (order: Order): Effect.Effect<void, CacheSinkError> =>
  Effect.gen(function* () {
    if (cache.length >= MAX_CACHE_SIZE) {
      return yield* Effect.fail(
        new CacheSinkError({
          reason: `cache full (${cache.length}/${MAX_CACHE_SIZE})`,
        })
      );
    }

    cache.push(order);
    console.log(`[CACHE] cached order ${order.orderId}`);
  });

// Slower but durable database (with simulated intermittent failures)
const writeToDatabase = (order: Order): Effect.Effect<void, DatabaseSinkError> =>
  Effect.gen(function* () {
    // Simulate occasional database failures
    if (Math.random() < 0.1) {
      return yield* Effect.fail(
        new DatabaseSinkError({ reason: "connection timeout" })
      );
    }

    dbOrders.push(order);
    console.log(`[DATABASE] persisted order ${order.orderId}`);
  });

// Dead-letter file: always works, but slow
const writeToDeadLetter = (order: Order): Effect.Effect<void> =>
  Effect.sync(() => {
    deadLetters.push(order);
    console.log(`[DEAD-LETTER] wrote order ${order.orderId} to the dead-letter file`);
  });

// Fallback sink: try cache -> database -> dead-letter file
interface FallbackState {
  readonly cached: number;
  readonly persisted: number;
  readonly deadLetters: number;
}

const createFallbackSink = (): Sink.Sink<FallbackState, Order> =>
  Sink.foldLeftEffect(
    { cached: 0, persisted: 0, deadLetters: 0 } as FallbackState,
    (state, order: Order) =>
      // Try the cache first
      writeToCache(order).pipe(
        Effect.as({ ...state, cached: state.cached + 1 }),
        Effect.catchTag("CacheSinkError", (cacheError) => {
          console.log(
            `[FALLBACK] cache failed (${cacheError.reason}), trying database`
          );
          // Cache failed — try the database
          return writeToDatabase(order).pipe(
            Effect.as({ ...state, persisted: state.persisted + 1 }),
            Effect.catchTag("DatabaseSinkError", (dbError) => {
              console.log(
                `[FALLBACK] database failed (${dbError.reason}), falling back to dead letter`
              );
              // Database failed — use the dead-letter file
              return writeToDeadLetter(order).pipe(
                Effect.as({ ...state, deadLetters: state.deadLetters + 1 })
              );
            })
          );
        })
      )
  ).pipe(
    // Print a summary once the stream completes
    Sink.mapEffect((state) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  cached:       ${state.cached}`);
        console.log(`  persisted:    ${state.persisted}`);
        console.log(`  dead letters: ${state.deadLetters}`);
        return state;
      })
    )
  );

// Simulated stream of orders
const orderStream: Stream.Stream<Order> = Stream.fromIterable([
  {
    orderId: "order-1",
    customerId: "cust-1",
    total: 99.99,
    timestamp: Date.now(),
  },
  {
    orderId: "order-2",
    customerId: "cust-2",
    total: 149.99,
    timestamp: Date.now() + 100,
  },
  {
    orderId: "order-3",
    customerId: "cust-1",
    total: 49.99,
    timestamp: Date.now() + 200,
  },
  {
    orderId: "order-4",
    customerId: "cust-3",
    total: 199.99,
    timestamp: Date.now() + 300,
  },
  {
    orderId: "order-5",
    customerId: "cust-2",
    total: 89.99,
    timestamp: Date.now() + 400,
  },
]);

// Run the stream with the fallback sink
const program = Effect.gen(function* () {
  const result = yield* orderStream.pipe(Stream.run(createFallbackSink()));
  console.log(
    `\nTotal orders processed: ${result.cached + result.persisted + result.deadLetters}`
  );
});

Effect.runPromise(program);

This pattern:

  1. Tries the cache first (fast, limited capacity)
  2. Falls back to the database if the cache is full
  3. Falls back to the dead-letter file if the database fails
  4. Tracks which sink handled each record
  5. Reports a summary of where the data went

Rationale:

When consuming a stream into a primary destination that can fail, wrap it with a fallback pattern. If the primary sink fails, automatically redirect the stream to an alternative sink. This lets the system degrade gracefully instead of failing outright.


Production systems need resilience:

  • Primary failures: database outages, network timeouts, exceeded quotas
  • Graceful degradation: keep the system running, even at reduced capacity
  • No data loss: fallbacks ensure the data is persisted somewhere
  • Operational flexibility: choose the fallback based on the failure type
  • Monitoring: track fallback usage to alert operators

Without a fallback pattern:

  • The system fails when the primary destination fails
  • Data is lost while the primary is unavailable
  • There is no clear signal that degradation occurred

With a fallback sink:

  • The stream continues even when the primary fails
  • Data is safely persisted to an alternative sink
  • A clear audit trail records which sink was used


Sink Pattern 6: Retry Failed Stream Operations

Rule: Implement retry policies in sinks to handle transient failures and improve resilience without manual intervention.

Good example:

This example demonstrates retrying database writes with exponential backoff, tracking attempt counts, and giving up on permanent failures.

import { Effect, Stream, Sink, Duration, Schedule } from "effect";

interface UserRecord {
  readonly userId: string;
  readonly name: string;
  readonly email: string;
}

class WriteError extends Error {
  readonly isTransient: boolean;

  constructor(message: string, isTransient: boolean = true) {
    super(message);
    this.name = "WriteError";
    this.isTransient = isTransient;
  }
}

// Simulated database that fails intermittently
const database = {
  failureRate: 0.3, // 30% transient failure rate
  permanentFailureRate: 0.05, // 5% permanent failure rate

  insertUser: (user: UserRecord): Effect.Effect<void, WriteError> =>
    Effect.gen(function* () {
      const rand = Math.random();

      // Permanent failure (e.g. a constraint violation)
      if (rand < database.permanentFailureRate) {
        return yield* Effect.fail(
          new WriteError(`Permanent: user ${user.userId} already exists`, false)
        );
      }

      // Transient failure (e.g. a connection timeout)
      if (rand < database.permanentFailureRate + database.failureRate) {
        return yield* Effect.fail(
          new WriteError(`Transient: connection timeout writing ${user.userId}`, true)
        );
      }

      // Success
      console.log(`✓ wrote user ${user.userId}`);
    }),
};

// Retry configuration
interface RetryConfig {
  readonly maxAttempts: number;
  readonly initialDelayMs: number;
  readonly maxDelayMs: number;
  readonly backoffFactor: number;
}

const defaultRetryConfig: RetryConfig = {
  maxAttempts: 5,
  initialDelayMs: 100, // start at 100 ms
  maxDelayMs: 5000, // cap at 5 seconds
  backoffFactor: 2, // double each time
};

// Result tracking
interface OperationResult {
  readonly succeeded: number;
  readonly transientFailures: number;
  readonly permanentFailures: number;
  readonly detailedStats: Array<{
    readonly userId: string;
    readonly attempts: number;
    readonly status: "success" | "transient-failed" | "permanent-failed";
  }>;
}

// Create a sink with retry logic driven by a Schedule
const createRetrySink = (
  config: RetryConfig
): Sink.Sink<OperationResult, UserRecord> => {
  // Exponential backoff with jitter, capped at maxDelayMs, retrying only
  // transient errors, for at most maxAttempts total attempts
  const retryPolicy = Schedule.exponential(
    Duration.millis(config.initialDelayMs),
    config.backoffFactor
  ).pipe(
    Schedule.union(Schedule.spaced(Duration.millis(config.maxDelayMs))), // cap the delay
    Schedule.jittered, // add jitter to avoid thundering herds
    Schedule.intersect(Schedule.recurs(config.maxAttempts - 1)), // limit the retries
    Schedule.whileInput((error: WriteError) => error.isTransient) // never retry permanent failures
  );

  // Insert one user, retrying per the policy and recording the outcome
  const insertWithRetry = (user: UserRecord) =>
    Effect.gen(function* () {
      let attempts = 0;
      const attempt = Effect.suspend(() => {
        attempts += 1;
        return database.insertUser(user);
      });

      const status = yield* attempt.pipe(
        Effect.retry(retryPolicy),
        Effect.as("success" as const),
        Effect.catchAll((error) =>
          Effect.succeed(
            error.isTransient
              ? ("transient-failed" as const)
              : ("permanent-failed" as const)
          )
        )
      );

      return { userId: user.userId, attempts, status };
    });

  return Sink.foldLeftEffect(
    {
      succeeded: 0,
      transientFailures: 0,
      permanentFailures: 0,
      detailedStats: [],
    } as OperationResult,
    (state, user: UserRecord) =>
      insertWithRetry(user).pipe(
        Effect.map((stat) => ({
          succeeded: state.succeeded + (stat.status === "success" ? 1 : 0),
          transientFailures:
            state.transientFailures + (stat.status === "transient-failed" ? 1 : 0),
          permanentFailures:
            state.permanentFailures + (stat.status === "permanent-failed" ? 1 : 0),
          detailedStats: [...state.detailedStats, stat],
        }))
      )
  ).pipe(
    // Print a summary once the stream completes
    Sink.mapEffect((state) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  succeeded:          ${state.succeeded}`);
        console.log(`  transient failures: ${state.transientFailures}`);
        console.log(`  permanent failures: ${state.permanentFailures}`);
        console.log(`  total:              ${state.detailedStats.length}`);

        // Show the detailed stats for failures
        const failed = state.detailedStats.filter((s) => s.status !== "success");
        if (failed.length > 0) {
          console.log(`\n[FAILURES]`);
          failed.forEach((stat) => {
            console.log(`  ${stat.userId}: ${stat.attempts} attempts (${stat.status})`);
          });
        }

        return state;
      })
    )
  );
};

// Simulated stream of users to insert
const userStream: Stream.Stream<UserRecord> = Stream.fromIterable([
  { userId: "user-1", name: "Alice", email: "alice@example.com" },
  { userId: "user-2", name: "Bob", email: "bob@example.com" },
  { userId: "user-3", name: "Charlie", email: "charlie@example.com" },
  { userId: "user-4", name: "Diana", email: "diana@example.com" },
  { userId: "user-5", name: "Eve", email: "eve@example.com" },
]);

// Run the stream with the retry sink
const program = Effect.gen(function* () {
  yield* userStream.pipe(Stream.run(createRetrySink(defaultRetryConfig)));
  console.log(`\nProcessing complete.`);
});

Effect.runPromise(program);

This pattern:

  1. Attempts the operation up to a maximum number of tries
  2. Distinguishes transient from permanent failures
  3. Uses exponential backoff to space out retries
  4. Adds jitter to prevent thundering herds
  5. Tracks detailed statistics for monitoring
  6. Reports a summary of the results

Rationale:

When consuming a stream into a destination that can experience transient failures (network timeouts, rate limiting, temporary unavailability), wrap the sink operation with a retry policy. Use exponential backoff to avoid overwhelming a recovering system while still recovering quickly.


Transient failures are common in distributed systems:

  • Network timeouts: temporary connection problems resolve themselves
  • Rate limiting: the service recovers once the rate-limit window resets
  • Temporary unavailability: services restart or scale
  • Circuit breaker trips: the service recovers after a backoff period

Without retry logic:

  • Every transient failure causes data loss or breaks the stream
  • Manual intervention is required to restart
  • The system appears less reliable than it actually is

With intelligent retry logic:

  • Automatic recovery from transient failures
  • Exponential backoff prevents thundering herds
  • Clear visibility into which operations failed permanently
  • Data keeps flowing despite temporary problems


Sink Pattern 3: Write Stream Lines to a File

Rule: Write stream lines to a file efficiently, with buffered output and proper resource management.

Good example:

This example demonstrates streaming log entries and writing them to a file with buffering.

import { Effect, Stream, Sink } from "effect";
import { FileSystem } from "@effect/platform";
import { NodeContext, NodeRuntime } from "@effect/platform-node";

interface LogEntry {
  readonly level: "debug" | "info" | "warn" | "error";
  readonly message: string;
  readonly timestamp: number;
}

// Format a log entry as a single line
const formatLogLine = (entry: LogEntry): string => {
  const iso = new Date(entry.timestamp).toISOString();
  return `[${iso}] ${entry.level.toUpperCase()}: ${entry.message}`;
};

// Simulated stream of log entries
const logStream: Stream.Stream<LogEntry> = Stream.fromIterable([
  { level: "info", message: "Server starting", timestamp: Date.now() },
  { level: "debug", message: "Loading configuration", timestamp: Date.now() + 100 },
  { level: "info", message: "Connected to database", timestamp: Date.now() + 200 },
  { level: "warn", message: "High memory usage detected", timestamp: Date.now() + 300 },
  { level: "info", message: "Processing requests", timestamp: Date.now() + 400 },
  { level: "error", message: "Connection timed out", timestamp: Date.now() + 500 },
  { level: "info", message: "Retrying connection", timestamp: Date.now() + 600 },
  { level: "info", message: "Connection restored", timestamp: Date.now() + 700 },
]);

// Create a buffered file-writing sink
const createFileWriteSink = (
  filePath: string,
  bufferSize: number = 100
) =>
  Sink.unwrapScoped(
    Effect.gen(function* () {
      // Open the file in append mode; the scope closes the handle
      // when the sink finishes, even on failure
      const fs = yield* FileSystem.FileSystem;
      const handle = yield* fs.open(filePath, { flag: "a" });

      let buffer: string[] = [];

      // Flush the buffered lines to disk
      const flush = Effect.suspend(() => {
        if (buffer.length === 0) return Effect.void;

        const content = buffer.join("\n") + "\n";
        buffer = [];
        return handle.writeAll(new TextEncoder().encode(content));
      });

      // The sink itself: buffer each line, flushing when full
      return Sink.foldLeftEffect(0, (count, line: string) =>
        Effect.gen(function* () {
          buffer.push(line);

          // Flush when the buffer reaches its size limit
          if (buffer.length >= bufferSize) {
            yield* flush;
          }

          return count + 1;
        })
      ).pipe(
        // Flush any remaining lines before the file is closed
        Sink.mapEffect((count) => flush.pipe(Effect.as(count)))
      );
    })
  );

// Process the log stream
const program = Effect.gen(function* () {
  const fs = yield* FileSystem.FileSystem;
  const filePath = "/tmp/app.log";

  // Truncate the file first
  yield* fs.writeFileString(filePath, "");

  // Stream the logs, format them, and write them to the file
  const written = yield* logStream.pipe(
    Stream.map(formatLogLine),
    Stream.run(createFileWriteSink(filePath, 50)) // buffer 50 lines before flushing
  );

  console.log(`Wrote ${written} log lines to ${filePath}`);

  // Read the file back to verify
  const content = yield* fs.readFileString(filePath);
  console.log("\nFile contents:");
  console.log(content);
});

// A platform layer supplies the FileSystem service
NodeRuntime.runMain(program.pipe(Effect.provide(NodeContext.layer)));

This pattern:

  1. Opens the file for appending
  2. Buffers log lines in memory (50 lines before flushing)
  3. Flushes periodically, when the buffer fills or the stream ends
  4. Closes the file safely using a scope
  5. Tracks the line count for confirmation

Rationale:

When consuming stream data to persist as lines in a file, use a Sink around a file writer. Buffer the output for efficiency, and use Effect's scope management to guarantee proper resource cleanup.


Writing stream data to files requires:

  • Buffering: writing one line at a time is slow; buffer multiple lines before flushing to disk
  • Efficiency: reduce system calls and I/O overhead through batched writes
  • Resource management: ensure file handles are closed correctly, even on error
  • Ordering: preserve the order of lines in the stream

This pattern is essential for:

  • Log files and audit trails
  • CSV/JSON-lines exports
  • Archiving stream data
  • Data pipelines with files as an intermediate stage