Name: effect-patterns-streams-sinks Description: Patterns for stream sinks in Effect-TS. Use for tasks that involve stream sinks in Effect-TS applications.
Effect-TS Patterns: Stream Sinks
This skill provides 6 curated Effect-TS stream sink patterns. Use this skill for the following tasks:
- Stream sinks
- Best practices in Effect-TS applications
- Real-world patterns and solutions
🟡 Intermediate Patterns
Sink Pattern 1: Batch Insert Stream Records into a Database
Rule: Batch stream records before database operations to improve throughput and reduce transaction overhead.
Good Example:
This example demonstrates streaming user records from a paginated API and inserting them into a database in batches for efficiency.
import { Effect, Stream, Sink, Chunk, Option } from "effect";

interface User {
  readonly id: number;
  readonly name: string;
  readonly email: string;
}

interface PaginatedResponse {
  readonly users: User[];
  readonly nextPage: number | null;
}

// Simulated API that returns paginated users
const fetchUserPage = (
  page: number
): Effect.Effect<PaginatedResponse> =>
  Effect.succeed(
    page < 10
      ? {
          users: Array.from({ length: 50 }, (_, i) => ({
            id: page * 50 + i,
            name: `User ${page * 50 + i}`,
            email: `user${page * 50 + i}@example.com`,
          })),
          nextPage: page + 1,
        }
      : { users: [], nextPage: null }
  ).pipe(Effect.delay("10 millis"));

// Simulated database insert that accepts a batch of users
const insertUserBatch = (
  users: readonly User[]
): Effect.Effect<number> =>
  Effect.sync(() => {
    console.log(`Inserting batch of ${users.length} users`);
    return users.length;
  }).pipe(Effect.delay("50 millis"));

// Create a stream of users from the paginated API
const userStream: Stream.Stream<User> = Stream.paginateChunkEffect(
  0,
  (page) =>
    fetchUserPage(page).pipe(
      Effect.map(
        (response) =>
          [
            Chunk.fromIterable(response.users),
            response.nextPage !== null
              ? Option.some(response.nextPage)
              : Option.none(),
          ] as const
      )
    )
);

// Sink that inserts each batch of users and counts the total inserted
const batchInsertSink: Sink.Sink<number, Chunk.Chunk<User>> =
  Sink.foldLeftEffect(0, (count: number, batch: Chunk.Chunk<User>) =>
    insertUserBatch(Chunk.toArray(batch)).pipe(
      Effect.map((inserted) => count + inserted)
    )
  );

// Run the stream through the batching sink
const program = Effect.gen(function* () {
  const totalInserted = yield* userStream.pipe(
    // Group the stream into batches of 100 users
    Stream.grouped(100),
    Stream.run(batchInsertSink)
  );
  console.log(`Total users inserted: ${totalInserted}`);
});

Effect.runPromise(program);
This pattern:
- Creates a stream that fetches users from a paginated API
- Groups users into batches of 100
- Inserts each batch into the database in a single operation
- Tracks the total number of records inserted
Batching happens automatically: elements are collected until the batch size is reached, then the sink processes the whole batch.
Rationale:
When consuming stream records for persistence to a database, use a Sink to collect them into batches before inserting. This reduces database round trips and transaction overhead, significantly improving overall throughput.
Inserting records one at a time is inefficient:
- Every insert is a separate database call (network latency, connection overhead)
- Every insert may be a separate transaction (ACID overhead)
- At scale, resource contention and connection pool exhaustion
Batching solves this by:
- Grouping N records into a single bulk insert operation
- Amortizing database overhead across many records
- Sustaining throughput even under heavy load
- Enabling efficient transaction semantics for the whole batch
For example, inserting 10,000 records one at a time might take 100 seconds. Batching them in groups of 100 might take only 2-3 seconds.
Sink Pattern 2: Write Stream Events to an Event Log
Rule: Append stream events to an event log with metadata to maintain a complete, ordered record of what happened.
Good Example:
This example demonstrates an event-sourcing pattern in which a stream of user account events is appended to an event log with metadata.
import { Effect, Stream, Sink, DateTime, Data } from "effect";

// Event types
type AccountEvent =
  | AccountCreated
  | MoneyDeposited
  | MoneyWithdrawn
  | AccountClosed;

class AccountCreated extends Data.TaggedClass("AccountCreated")<{
  readonly accountId: string;
  readonly owner: string;
  readonly initialBalance: number;
}> {}

class MoneyDeposited extends Data.TaggedClass("MoneyDeposited")<{
  readonly accountId: string;
  readonly amount: number;
}> {}

class MoneyWithdrawn extends Data.TaggedClass("MoneyWithdrawn")<{
  readonly accountId: string;
  readonly amount: number;
}> {}

class AccountClosed extends Data.TaggedClass("AccountClosed")<{
  readonly accountId: string;
}> {}

// Event envelope with metadata
interface StoredEvent {
  readonly eventId: string; // Unique identifier for each event
  readonly eventType: string; // Type of the event
  readonly aggregateId: string; // ID of the aggregate the event belongs to
  readonly aggregateType: string; // Kind of aggregate (e.g. Account)
  readonly data: any; // Event payload
  readonly metadata: {
    readonly timestamp: number;
    readonly version: number; // Position in the log
    readonly causationId?: string; // ID of whatever caused this event
  };
}

// Simulated event log that events are appended to
const eventLog: StoredEvent[] = [];
let eventVersion = 0;

const appendToEventLog = (
  event: AccountEvent,
  aggregateId: string
): Effect.Effect<StoredEvent> =>
  Effect.gen(function* () {
    const now = yield* DateTime.now;
    const storedEvent: StoredEvent = {
      eventId: `evt-${eventVersion}-${Date.now()}`,
      eventType: event._tag,
      aggregateId,
      aggregateType: "Account",
      data: event,
      metadata: {
        timestamp: DateTime.toEpochMillis(now),
        version: ++eventVersion,
      },
    };
    // Append to the log (simulated)
    eventLog.push(storedEvent);
    console.log(
      `[v${storedEvent.metadata.version}] ${storedEvent.eventType}: ${aggregateId}`
    );
    return storedEvent;
  });

// Simulated stream of events produced by various account operations
const events: Array<[string, AccountEvent]> = [
  [
    "acc-1",
    new AccountCreated({
      accountId: "acc-1",
      owner: "Alice",
      initialBalance: 1000,
    }),
  ],
  ["acc-1", new MoneyDeposited({ accountId: "acc-1", amount: 500 })],
  ["acc-1", new MoneyWithdrawn({ accountId: "acc-1", amount: 200 })],
  [
    "acc-2",
    new AccountCreated({
      accountId: "acc-2",
      owner: "Bob",
      initialBalance: 2000,
    }),
  ],
  ["acc-2", new MoneyDeposited({ accountId: "acc-2", amount: 1000 })],
  ["acc-1", new AccountClosed({ accountId: "acc-1" })],
];

const accountEvents: Stream.Stream<[string, AccountEvent]> =
  Stream.fromIterable(events);

// Sink: append each event to the log
const eventLogSink: Sink.Sink<number, [string, AccountEvent]> =
  Sink.foldLeftEffect(
    0,
    (count: number, [aggregateId, event]: [string, AccountEvent]) =>
      appendToEventLog(event, aggregateId).pipe(Effect.map(() => count + 1))
  );

// Run the stream and append every event
const program = Effect.gen(function* () {
  const totalEvents = yield* accountEvents.pipe(Stream.run(eventLogSink));
  console.log(`\nTotal events appended: ${totalEvents}`);
  console.log(`\nEvent log contents:`);
  eventLog.forEach((event) => {
    console.log(`  [v${event.metadata.version}] ${event.eventType}`);
  });
});

Effect.runPromise(program);
This pattern:
- Defines event types as tagged classes (AccountCreated, MoneyDeposited, and so on)
- Creates an event envelope with metadata (timestamp, version, causation)
- Streams events from various sources
- Appends to the log with proper versioning and ordering
- Maintains a historical record for rebuilding state and auditing
Rationale:
When consuming a stream of events that represent changes in a system, use a Sink to append each event to an event log. The event log provides an immutable, ordered record that supports event sourcing, audit trails, and temporal queries.
An event log is the foundation of many patterns:
- Event sourcing: instead of storing current state, store the sequence of events that led to it
- Audit trail: a complete, tamper-evident record of who did what, and when
- Temporal queries: reconstruct state at any point in time
- Consistency: a single source of truth for what happened
- Replay: rebuild state or test changes by replaying events
Unlike batch inserts, an event log is append-only. Each event is immutable once written. This simplicity enables:
- Fast appends (no updates, only sequential writes)
- Natural ordering (events in write order)
- Easy distribution (replicate the log)
- Strong consistency (events are facts; they never change)
Sink Pattern 3: Send Stream Records to a Message Queue
Rule: Stream records to a message queue with proper batching and acknowledgement for reliable distributed data flow.
Good Example:
This example demonstrates streaming sensor readings and publishing them to a message queue with topic-based partitioning.
import { Effect, Stream, Sink } from "effect";

interface SensorReading {
  readonly sensorId: string;
  readonly location: string;
  readonly temperature: number;
  readonly humidity: number;
  readonly timestamp: number;
}

// Simulated message queue publisher
interface QueuePublisher {
  readonly publish: (
    topic: string,
    partition: string,
    messages: readonly SensorReading[]
  ) => Effect.Effect<{ acknowledged: number; messageIds: string[] }>;
}

// Create a mock queue publisher
const createMockPublisher = (): QueuePublisher => {
  const publishedMessages: Record<string, SensorReading[]> = {};
  return {
    publish: (topic, partition, messages) =>
      Effect.gen(function* () {
        const key = `${topic}/${partition}`;
        publishedMessages[key] = [
          ...(publishedMessages[key] ?? []),
          ...messages,
        ];
        const messageIds = Array.from(
          { length: messages.length },
          (_, i) => `msg-${Date.now()}-${i}`
        );
        console.log(`Published ${messages.length} messages to ${key} (batch)`);
        return { acknowledged: messages.length, messageIds };
      }),
  };
};

// Determine the partition key from the sensor's location
const getPartitionKey = (reading: SensorReading): string =>
  reading.location; // Route by location for data locality

// Simulated stream of sensor readings
const sensorStream: Stream.Stream<SensorReading> = Stream.fromIterable([
  {
    sensorId: "temp-1",
    location: "warehouse-a",
    temperature: 22.5,
    humidity: 45,
    timestamp: Date.now(),
  },
  {
    sensorId: "temp-2",
    location: "warehouse-b",
    temperature: 21.0,
    humidity: 50,
    timestamp: Date.now() + 100,
  },
  {
    sensorId: "temp-3",
    location: "warehouse-a",
    temperature: 22.8,
    humidity: 46,
    timestamp: Date.now() + 200,
  },
  {
    sensorId: "temp-4",
    location: "warehouse-c",
    temperature: 20.5,
    humidity: 55,
    timestamp: Date.now() + 300,
  },
  {
    sensorId: "temp-5",
    location: "warehouse-b",
    temperature: 21.2,
    humidity: 51,
    timestamp: Date.now() + 400,
  },
  {
    sensorId: "temp-6",
    location: "warehouse-a",
    temperature: 23.0,
    humidity: 47,
    timestamp: Date.now() + 500,
  },
]);

// Create a sink that publishes to the message queue in batches
const createQueuePublishSink = (
  publisher: QueuePublisher,
  topic: string,
  batchSize: number = 100
): Sink.Sink<number, SensorReading> =>
  Sink.foldLeftEffect(
    { batches: new Map<string, SensorReading[]>(), totalPublished: 0 },
    (state, reading: SensorReading) =>
      Effect.gen(function* () {
        const partition = getPartitionKey(reading);
        const batch = [...(state.batches.get(partition) ?? []), reading];
        const batches = new Map(state.batches);
        if (batch.length >= batchSize) {
          // The batch is full: publish it and clear the partition
          const result = yield* publisher.publish(topic, partition, batch);
          batches.delete(partition);
          return {
            batches,
            totalPublished: state.totalPublished + result.acknowledged,
          };
        }
        // Otherwise buffer the reading and continue
        batches.set(partition, batch);
        return { ...state, batches };
      })
  ).pipe(
    // When the stream ends, flush any remaining partial batches
    Sink.mapEffect((state) =>
      Effect.gen(function* () {
        let finalCount = state.totalPublished;
        for (const [partition, batch] of state.batches) {
          if (batch.length > 0) {
            const result = yield* publisher.publish(topic, partition, batch);
            finalCount += result.acknowledged;
          }
        }
        return finalCount;
      })
    )
  );

// Run the stream and publish to the queue
const program = Effect.gen(function* () {
  const publisher = createMockPublisher();
  const topic = "sensor-readings";
  const published = yield* sensorStream.pipe(
    Stream.run(createQueuePublishSink(publisher, topic, 50)) // batch size 50
  );
  console.log(`\nTotal messages published to the queue: ${published}`);
});

Effect.runPromise(program);
This pattern:
- Groups readings by partition (location) for data locality
- Batches records before publishing (50 at a time)
- Publishes batches to the queue with a partition key
- Flushes partial batches when the stream ends
- Tracks queue acknowledgements
Rationale:
When consuming a stream of events that must be distributed to other systems, use a Sink to publish them to a message queue. Message queues provide reliable, scalable distribution with guarantees such as ordering and exactly-once semantics.
Message queues are the backbone of event-driven architectures:
- Decoupling: producers do not wait for consumers
- Scalability: multiple subscribers can consume independently
- Durability: messages persist even when subscribers are offline
- Ordering: event sequence is maintained (per partition/topic)
- Reliability: acknowledgements and retries ensure no message loss
Unlike direct writes, queue publishing is asynchronous and enables:
- High-throughput publishing (many records batched per operation)
- Backpressure handling (the queue manages flow)
- Multi-subscriber patterns (fan-out)
- Dead-letter queues for error handling
Sink Pattern 4: Fall Back to an Alternative Sink on Failure
Rule: Implement fallback sinks to handle failures gracefully and ensure data is persisted even when the primary destination is unavailable.
Good Example:
This example demonstrates a system that first tries to write order records to a fast in-memory cache, falls back to a database if the cache fails, and falls back to a dead-letter file if the database fails.
import { Effect, Stream, Sink, Either, Data } from "effect";

interface Order {
  readonly orderId: string;
  readonly customerId: string;
  readonly total: number;
  readonly timestamp: number;
}

class CacheSinkError extends Data.TaggedError("CacheSinkError")<{
  readonly reason: string;
}> {}

class DatabaseSinkError extends Data.TaggedError("DatabaseSinkError")<{
  readonly reason: string;
}> {}

// Simulated in-memory cache sink (fast but bounded)
const createCacheSink = (): Sink.Sink<number, Order, never, CacheSinkError> => {
  const cache: Order[] = [];
  const MAX_CACHE_SIZE = 1000;
  return Sink.foldLeftEffect(0, (count: number, order: Order) =>
    Effect.gen(function* () {
      if (cache.length >= MAX_CACHE_SIZE) {
        return yield* Effect.fail(
          new CacheSinkError({
            reason: `cache full (${cache.length}/${MAX_CACHE_SIZE})`,
          })
        );
      }
      cache.push(order);
      console.log(`[CACHE] Cached order ${order.orderId}`);
      return count + 1;
    })
  );
};

// Simulated database sink (slower but reliable)
const createDatabaseSink = (): Sink.Sink<
  number,
  Order,
  never,
  DatabaseSinkError
> => {
  const orders: Order[] = [];
  return Sink.foldLeftEffect(0, (count: number, order: Order) =>
    Effect.gen(function* () {
      // Simulate occasional database failures
      if (Math.random() < 0.1) {
        return yield* Effect.fail(
          new DatabaseSinkError({ reason: "connection timeout" })
        );
      }
      orders.push(order);
      console.log(`[DATABASE] Persisted order ${order.orderId}`);
      return count + 1;
    })
  );
};

// Simulated file sink (always works, but slow)
const createDeadLetterSink = (): Sink.Sink<number, Order> => {
  const deadLetters: Order[] = [];
  return Sink.foldLeftEffect(0, (count: number, order: Order) =>
    Effect.sync(() => {
      deadLetters.push(order);
      console.log(
        `[DEAD-LETTER] Wrote order ${order.orderId} to the dead-letter file`
      );
      return count + 1;
    })
  );
};

// Create the fallback sink: try cache -> database -> dead-letter file
const createFallbackSink = (): Sink.Sink<
  {
    readonly cached: number;
    readonly persisted: number;
    readonly deadLetters: number;
  },
  Order
> => {
  // Create each sink once so its backing store persists across orders
  const cacheSink = createCacheSink();
  const databaseSink = createDatabaseSink();
  const deadLetterSink = createDeadLetterSink();
  return Sink.foldLeftEffect(
    { cached: 0, persisted: 0, deadLetters: 0 },
    (state, order: Order) =>
      Effect.gen(function* () {
        // Try the cache first, running the order through it as a one-element stream
        const cacheResult = yield* Stream.make(order).pipe(
          Stream.run(cacheSink),
          Effect.either
        );
        if (Either.isRight(cacheResult)) {
          return { ...state, cached: state.cached + cacheResult.right };
        }
        console.log(
          `[FALLBACK] Cache failed (${cacheResult.left.reason}), trying database`
        );
        // The cache failed; try the database
        const dbResult = yield* Stream.make(order).pipe(
          Stream.run(databaseSink),
          Effect.either
        );
        if (Either.isRight(dbResult)) {
          return { ...state, persisted: state.persisted + dbResult.right };
        }
        console.log(
          `[FALLBACK] Database failed (${dbResult.left.reason}), falling back to dead letter`
        );
        // The database failed; use the dead-letter file
        const dlCount = yield* Stream.make(order).pipe(
          Stream.run(deadLetterSink)
        );
        return { ...state, deadLetters: state.deadLetters + dlCount };
      })
  ).pipe(
    // Report a summary of where the data went
    Sink.mapEffect((state) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  Cached: ${state.cached}`);
        console.log(`  Persisted: ${state.persisted}`);
        console.log(`  Dead letters: ${state.deadLetters}`);
        return state;
      })
    )
  );
};

// Simulated stream of orders
const orderStream: Stream.Stream<Order> = Stream.fromIterable([
  {
    orderId: "order-1",
    customerId: "cust-1",
    total: 99.99,
    timestamp: Date.now(),
  },
  {
    orderId: "order-2",
    customerId: "cust-2",
    total: 149.99,
    timestamp: Date.now() + 100,
  },
  {
    orderId: "order-3",
    customerId: "cust-1",
    total: 49.99,
    timestamp: Date.now() + 200,
  },
  {
    orderId: "order-4",
    customerId: "cust-3",
    total: 199.99,
    timestamp: Date.now() + 300,
  },
  {
    orderId: "order-5",
    customerId: "cust-2",
    total: 89.99,
    timestamp: Date.now() + 400,
  },
]);

// Run the stream with the fallback sink
const program = Effect.gen(function* () {
  const result = yield* orderStream.pipe(Stream.run(createFallbackSink()));
  console.log(
    `\nTotal orders processed: ${result.cached + result.persisted + result.deadLetters}`
  );
});

Effect.runPromise(program);
This pattern:
- Tries the cache first (fast, limited capacity)
- Falls back to the database if the cache is full
- Falls back to the dead-letter file if the database fails
- Tracks which sink was used for each record
- Reports a summary of where the data went
Rationale:
When consuming a stream into a primary destination that can fail, wrap it in a fallback pattern. If the primary sink fails, automatically redirect the stream to an alternative sink. This lets the system degrade gracefully instead of failing outright.
Production systems need resilience:
- Primary failures: database outages, network timeouts, exceeded quotas
- Graceful degradation: keep the system running, even at reduced capability
- No data loss: fallbacks ensure data is persisted somewhere
- Operational flexibility: choose the fallback based on the failure type
- Monitoring: track fallback usage to alert operators
Without a fallback pattern:
- The system fails when the primary destination fails
- Data is lost while the primary is unavailable
- There is no clear signal that degradation occurred
With a fallback sink:
- The stream keeps flowing even when the primary fails
- Data is safely persisted to an alternative sink
- A clear audit trail records which sink was used
Sink Pattern 5: Retry Failed Stream Operations
Rule: Implement retry strategies in sinks to handle transient failures and improve resilience without manual intervention.
Good Example:
This example demonstrates retrying database writes with exponential backoff, tracking attempt counts, and giving up on permanent failures.
import { Effect, Stream, Sink, Duration, Schedule, Either, Data } from "effect";

interface UserRecord {
  readonly userId: string;
  readonly name: string;
  readonly email: string;
}

class WriteError extends Data.TaggedError("WriteError")<{
  readonly message: string;
  readonly isTransient: boolean;
}> {}

// Simulated database that fails intermittently
const database = {
  failureRate: 0.3, // 30% transient failure rate
  permanentFailureRate: 0.05, // 5% permanent failure rate
  insertUser: (user: UserRecord): Effect.Effect<void, WriteError> =>
    Effect.gen(function* () {
      const rand = Math.random();
      // Permanent failure (e.g. a constraint violation)
      if (rand < database.permanentFailureRate) {
        return yield* Effect.fail(
          new WriteError({
            message: `permanent: user ${user.userId} already exists`,
            isTransient: false,
          })
        );
      }
      // Transient failure (e.g. a connection timeout)
      if (rand < database.permanentFailureRate + database.failureRate) {
        return yield* Effect.fail(
          new WriteError({
            message: `transient: connection timeout writing ${user.userId}`,
            isTransient: true,
          })
        );
      }
      // Success
      console.log(`✓ Wrote user ${user.userId}`);
    }),
};

// Retry configuration
interface RetryConfig {
  readonly maxAttempts: number;
  readonly initialDelayMs: number;
  readonly maxDelayMs: number;
  readonly backoffFactor: number;
}

const defaultRetryConfig: RetryConfig = {
  maxAttempts: 5,
  initialDelayMs: 100, // start at 100 ms
  maxDelayMs: 5000, // cap at 5 s
  backoffFactor: 2, // double each time
};

// Result tracking
interface OperationResult {
  readonly succeeded: number;
  readonly transientFailures: number;
  readonly permanentFailures: number;
  readonly detailedStats: Array<{
    readonly userId: string;
    readonly attempts: number;
    readonly status: "success" | "transient-failed" | "permanent-failed";
  }>;
}

// Capped exponential backoff with jitter, built from the retry config
const backoffPolicy = (config: RetryConfig) =>
  Schedule.exponential(
    Duration.millis(config.initialDelayMs),
    config.backoffFactor
  ).pipe(
    // Cap the delay: union recurs with the smaller of the two delays
    Schedule.union(Schedule.spaced(Duration.millis(config.maxDelayMs))),
    // Add jitter to avoid thundering-herd retries
    Schedule.jittered
  );

// Create a sink with retry logic
const createRetrySink = (
  config: RetryConfig
): Sink.Sink<OperationResult, UserRecord> =>
  Sink.foldLeftEffect(
    {
      succeeded: 0,
      transientFailures: 0,
      permanentFailures: 0,
      detailedStats: [],
    } as OperationResult,
    (state, user: UserRecord) =>
      Effect.gen(function* () {
        // Count attempts as they happen
        let attempts = 0;
        const insertOnce = Effect.suspend(() => {
          attempts += 1;
          return database.insertUser(user);
        });
        const result = yield* insertOnce.pipe(
          Effect.retry({
            schedule: backoffPolicy(config),
            while: (error) => error.isTransient, // never retry permanent failures
            times: config.maxAttempts - 1,
          }),
          Effect.either
        );
        // Record the outcome for this user
        const withStat = (
          status: "success" | "transient-failed" | "permanent-failed"
        ): OperationResult => ({
          ...state,
          detailedStats: [
            ...state.detailedStats,
            { userId: user.userId, attempts, status },
          ],
        });
        if (Either.isRight(result)) {
          console.log(
            `[${user.userId}] succeeded on attempt ${attempts}/${config.maxAttempts}`
          );
          return { ...withStat("success"), succeeded: state.succeeded + 1 };
        }
        if (!result.left.isTransient) {
          // Permanent failure: was not retried
          console.log(
            `[${user.userId}] permanent failure: ${result.left.message}`
          );
          return {
            ...withStat("permanent-failed"),
            permanentFailures: state.permanentFailures + 1,
          };
        }
        // All retries exhausted
        console.log(
          `[${user.userId}] failed after ${attempts} attempts: ${result.left.message}`
        );
        return {
          ...withStat("transient-failed"),
          transientFailures: state.transientFailures + 1,
        };
      })
  ).pipe(
    // Report a summary when the stream ends
    Sink.mapEffect((state) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  Succeeded: ${state.succeeded}`);
        console.log(`  Transient failures: ${state.transientFailures}`);
        console.log(`  Permanent failures: ${state.permanentFailures}`);
        console.log(`  Total: ${state.detailedStats.length}`);
        const failed = state.detailedStats.filter(
          (s) => s.status !== "success"
        );
        if (failed.length > 0) {
          console.log(`\n[FAILURES]`);
          failed.forEach((stat) => {
            console.log(
              `  ${stat.userId}: ${stat.attempts} attempt(s) (${stat.status})`
            );
          });
        }
        return state;
      })
    )
  );

// Simulated stream of users to insert
const userStream: Stream.Stream<UserRecord> = Stream.fromIterable([
  { userId: "user-1", name: "Alice", email: "alice@example.com" },
  { userId: "user-2", name: "Bob", email: "bob@example.com" },
  { userId: "user-3", name: "Charlie", email: "charlie@example.com" },
  { userId: "user-4", name: "Diana", email: "diana@example.com" },
  { userId: "user-5", name: "Eve", email: "eve@example.com" },
]);

// Run the stream with the retry sink
const program = Effect.gen(function* () {
  yield* userStream.pipe(Stream.run(createRetrySink(defaultRetryConfig)));
  console.log(`\nProcessing complete.`);
});

Effect.runPromise(program);
This pattern:
- Attempts the operation up to a maximum number of retries
- Distinguishes transient from permanent failures
- Uses exponential backoff to space out retries
- Adds jitter to prevent thundering herds
- Tracks detailed statistics for monitoring
- Reports a summary of results
Rationale:
When consuming a stream into a destination that can experience transient failures (network timeouts, rate limiting, temporary unavailability), wrap the sink operation in a retry strategy. Use exponential backoff to avoid overwhelming a recovering system while still recovering quickly.
Transient failures are common in distributed systems:
- Network timeouts: temporary connection problems resolve themselves
- Rate limiting: the service recovers once the rate-limit window resets
- Temporary unavailability: services restart or scale
- Circuit breakers tripping: services recover after a backoff period
Without retry logic:
- Every transient failure causes data loss or a broken stream
- Manual intervention is needed to restart
- The system appears less reliable than it really is
With intelligent retry logic:
- Automatic recovery from transient failures
- Exponential backoff prevents thundering herds
- Clear visibility into which operations failed permanently
- Data keeps flowing despite temporary problems
Sink Pattern 6: Write Stream Lines to a File
Rule: Write stream lines to a file efficiently using buffered output and proper resource management.
Good Example:
This example demonstrates streaming log entries and writing them to a file with buffering.
import { Effect, Stream, Sink } from "effect";
import { FileSystem } from "@effect/platform";
import { NodeContext } from "@effect/platform-node";

interface LogEntry {
  readonly level: "debug" | "info" | "warn" | "error";
  readonly message: string;
  readonly timestamp: number;
}

// Format a log entry as a single line
const formatLogLine = (entry: LogEntry): string => {
  const iso = new Date(entry.timestamp).toISOString();
  return `[${iso}] ${entry.level.toUpperCase()}: ${entry.message}`;
};

// Simulated stream of log entries
const entries: LogEntry[] = [
  { level: "info", message: "Server starting", timestamp: Date.now() },
  { level: "debug", message: "Loading configuration", timestamp: Date.now() + 100 },
  { level: "info", message: "Connected to database", timestamp: Date.now() + 200 },
  { level: "warn", message: "High memory usage detected", timestamp: Date.now() + 300 },
  { level: "info", message: "Processing requests", timestamp: Date.now() + 400 },
  { level: "error", message: "Connection timeout", timestamp: Date.now() + 500 },
  { level: "info", message: "Retrying connection", timestamp: Date.now() + 600 },
  { level: "info", message: "Connection restored", timestamp: Date.now() + 700 },
];

const logStream: Stream.Stream<LogEntry> = Stream.fromIterable(entries);

// Create a buffered file-writing sink
const createFileWriteSink = (filePath: string, bufferSize: number = 100) =>
  Sink.unwrapScoped(
    Effect.gen(function* () {
      // Open the file in append mode; the scope closes it when the sink is done
      const fs = yield* FileSystem.FileSystem;
      const handle = yield* fs.open(filePath, { flag: "a" });
      let buffer: string[] = [];
      // Flush the buffered lines to disk
      const flush = Effect.suspend(() => {
        if (buffer.length === 0) return Effect.void;
        const content = buffer.join("\n") + "\n";
        buffer = [];
        return handle.writeAll(new TextEncoder().encode(content));
      });
      // Return the sink
      return Sink.foldLeftEffect(0, (count: number, line: string) =>
        Effect.gen(function* () {
          buffer.push(line);
          // Flush when the buffer reaches its size limit
          if (buffer.length >= bufferSize) {
            yield* flush;
          }
          return count + 1;
        })
      ).pipe(
        // Flush any remaining lines before the scope closes the file
        Sink.mapEffect((count) => Effect.as(flush, count))
      );
    })
  );

// Process the log stream
const program = Effect.gen(function* () {
  const fs = yield* FileSystem.FileSystem;
  const filePath = "/tmp/app.log";
  // Clear the file first
  yield* fs.writeFileString(filePath, "");
  // Stream the logs, format them, and write them to the file
  const written = yield* logStream.pipe(
    Stream.map(formatLogLine),
    Stream.run(createFileWriteSink(filePath, 50)) // buffer 50 lines before flushing
  );
  console.log(`Wrote ${written} log lines to ${filePath}`);
  // Read the file back to verify
  const content = yield* fs.readFileString(filePath);
  console.log("\nFile contents:");
  console.log(content);
});

Effect.runPromise(program.pipe(Effect.provide(NodeContext.layer)));
This pattern:
- Opens the file for appending
- Buffers log lines in memory (50 lines before flushing)
- Flushes periodically, when the buffer fills or the stream ends
- Uses a scope to close the file safely
- Tracks the line count for confirmation
Rationale:
When consuming stream data for persistence as lines in a file, use a Sink around a file writer. Buffer the output for efficiency, and use Effect's scope management to guarantee proper resource cleanup.
Writing stream data to a file requires:
- Buffering: writing one line at a time is slow; buffer multiple lines before flushing to disk
- Efficiency: batched writes reduce system calls and I/O overhead
- Resource management: ensure the file handle is closed correctly, even on errors
- Ordering: preserve the order of lines in the stream
This pattern is essential for:
- Log files and audit trails
- CSV/JSON-lines exports
- Archiving stream data
- Data pipelines with files in the middle