名称: effect-patterns-scheduling 描述: 用于Effect-TS应用中的调度模式。在Effect-TS应用程序中处理调度任务时使用此技能。
Effect-TS模式:调度
本技能提供了3个精选的Effect-TS调度模式。 在以下任务中使用此技能:
- 调度
- Effect-TS应用中的最佳实践
- 现实世界的模式和解决方案
🟢 初学者模式
重试失败的操作
规则: 使用Effect.retry配合Schedule来优雅处理临时性故障。
好例子:
import { Effect, Schedule, Data } from "effect"
// ============================================
// 1. 定义错误类型
// ============================================
class NetworkError extends Data.TaggedError("NetworkError")<{
readonly message: string
}> {}
class RateLimitError extends Data.TaggedError("RateLimitError")<{
readonly retryAfter: number
}> {}
class NotFoundError extends Data.TaggedError("NotFoundError")<{
readonly resource: string
}> {}
// ============================================
// 2. 模拟一个不稳定的API调用
// ============================================
let callCount = 0
const fetchData = Effect.gen(function* () {
callCount++
yield* Effect.log(`API调用尝试 ${callCount}`)
// 模拟间歇性故障
if (callCount < 3) {
return yield* Effect.fail(new NetworkError({ message: "连接超时" }))
}
return { data: "成功!", attempts: callCount }
})
// ============================================
// 3. 基本重试 - 固定尝试次数
// ============================================
const withBasicRetry = fetchData.pipe(
Effect.retry(Schedule.recurs(5)) // 最多重试5次
)
// ============================================
// 4. 带延迟的重试
// ============================================
const withDelayedRetry = fetchData.pipe(
Effect.retry(
Schedule.spaced("500毫秒").pipe(
Schedule.intersect(Schedule.recurs(5))
)
)
)
// ============================================
// 5. 仅重试特定错误
// ============================================
const fetchWithErrors = (shouldFail: boolean) =>
Effect.gen(function* () {
if (shouldFail) {
// 随机失败,生成不同错误
const random = Math.random()
if (random < 0.5) {
return yield* Effect.fail(new NetworkError({ message: "超时" }))
} else if (random < 0.8) {
return yield* Effect.fail(new RateLimitError({ retryAfter: 1000 }))
} else {
return yield* Effect.fail(new NotFoundError({ resource: "用户:123" }))
}
}
return "数据获取成功!"
})
// 仅重试网络和速率限制错误,非NotFoundError
const retryTransientOnly = fetchWithErrors(true).pipe(
Effect.retry({
schedule: Schedule.recurs(3),
while: (error) =>
error._tag === "NetworkError" || error._tag === "RateLimitError",
})
)
// ============================================
// 6. 带指数退避的重试
// ============================================
const withExponentialBackoff = fetchData.pipe(
Effect.retry(
Schedule.exponential("100毫秒", 2).pipe( // 100ms, 200ms, 400ms...
Schedule.intersect(Schedule.recurs(5)) // 最大5次重试
)
)
)
// ============================================
// 7. 运行并观察
// ============================================
const program = Effect.gen(function* () {
yield* Effect.log("开始重试演示...")
// 重置计数器
callCount = 0
const result = yield* withBasicRetry
yield* Effect.log(`最终结果: ${JSON.stringify(result)}`)
})
Effect.runPromise(program)
原理:
使用 Effect.retry 自动重试因临时错误(如网络超时)而失败的操作。
许多故障是暂时的:
- 网络问题 - 连接断开、超时
- 速率限制 - 请求过多
- 资源争用 - 数据库锁
- 服务重启 - 短暂不可用
自动重试无需手动干预即可处理这些情况。
你的第一个调度器
规则: 使用Schedule来控制效果运行的时间和频率。
好例子:
import { Effect, Schedule } from "effect"
// ============================================
// 1. 重试失败的操作
// ============================================
let attempts = 0
const flakyOperation = Effect.gen(function* () {
attempts++
if (attempts < 3) {
yield* Effect.log(`尝试 ${attempts} 失败`)
return yield* Effect.fail(new Error("临时故障"))
}
return `第 ${attempts} 次尝试成功`
})
// 最多重试5次
const withRetry = flakyOperation.pipe(
Effect.retry(Schedule.recurs(5))
)
// ============================================
// 2. 重复成功操作
// ============================================
const logTime = Effect.gen(function* () {
const now = new Date().toISOString()
yield* Effect.log(`当前时间: ${now}`)
return now
})
// 重复3次
const repeated = logTime.pipe(
Effect.repeat(Schedule.recurs(3))
)
// ============================================
// 3. 在操作间添加延迟
// ============================================
// 每秒重复一次,共5次
const polling = logTime.pipe(
Effect.repeat(
Schedule.spaced("1秒").pipe(
Schedule.intersect(Schedule.recurs(5))
)
)
)
// ============================================
// 4. 常见调度模式
// ============================================
// 尝试间固定延迟
const fixedDelay = Schedule.spaced("500毫秒")
// 递增延迟(1秒, 2秒, 4秒, 8秒...)
const exponentialBackoff = Schedule.exponential("1秒")
// 最大尝试次数
const limitedAttempts = Schedule.recurs(3)
// 组合:指数退避,最大5次尝试
const retryPolicy = Schedule.exponential("100毫秒").pipe(
Schedule.intersect(Schedule.recurs(5))
)
// ============================================
// 5. 运行示例
// ============================================
const program = Effect.gen(function* () {
yield* Effect.log("--- 重试示例 ---")
const result = yield* withRetry
yield* Effect.log(`结果: ${result}`)
yield* Effect.log("
--- 重复示例 ---")
yield* repeated
})
Effect.runPromise(program)
原理:
使用 Schedule 控制Effect程序中的定时——重试失败操作、重复成功操作或添加延迟。
调度器解决常见定时问题:
- 重试 - 失败后重试
- 轮询 - 定期检查更新
- 速率限制 - 控制运行速度
- 退避 - 增加尝试间的延迟
🟡 中级模式
调度模式1:在固定间隔重复一个效果
规则: 使用Schedule.fixed以固定间隔重复效果,用于稳态操作和后台任务。
好例子:
此示例演示了一个健康检查服务,每30秒轮询多个服务端点并报告其状态。
import { Effect, Schedule, Duration } from "effect";
interface ServiceStatus {
readonly service: string;
readonly url: string;
readonly isHealthy: boolean;
readonly responseTime: number;
readonly lastChecked: number;
}
// 模拟健康检查,调用端点
const checkServiceHealth = (
url: string,
service: string
): Effect.Effect<ServiceStatus> =>
Effect.gen(function* () {
const startTime = Date.now();
// 模拟带偶尔失败的HTTP调用
const isHealthy = Math.random() > 0.1; // 90% 成功率
const responseTime = Math.random() * 500; // 0-500ms
yield* Effect.sleep(Duration.millis(Math.round(responseTime)));
if (!isHealthy) {
yield* Effect.fail(new Error(`${service} 不健康`));
}
return {
service,
url,
isHealthy: true,
responseTime: Math.round(Date.now() - startTime),
lastChecked: Date.now(),
};
});
// 多服务健康检查
interface HealthCheckConfig {
readonly services: Array<{
readonly name: string;
readonly url: string;
}>;
readonly intervalSeconds: number;
}
// 跟踪服务状态
const serviceStatuses = new Map<string, ServiceStatus>();
// 检查所有服务并报告状态
const checkAllServices = (
config: HealthCheckConfig
): Effect.Effect<void> =>
Effect.gen(function* () {
for (const service of config.services) {
const status = yield* checkServiceHealth(service.url, service.name).pipe(
Effect.either
);
if (status._tag === "Right") {
serviceStatuses.set(service.name, status.right);
console.log(
`✓ ${service.name}: 成功 (${status.right.responseTime}毫秒)`
);
} else {
console.log(`✗ ${service.name}: 失败`);
// 如果可用,保持上次已知状态
}
}
});
// 创建重复的健康检查
const createHealthCheckScheduler = (
config: HealthCheckConfig
): Effect.Effect<void> =>
checkAllServices(config).pipe(
// 用固定间隔调度(固定 = 忽略执行时间)
Effect.repeat(
Schedule.fixed(Duration.seconds(config.intervalSeconds))
)
);
// 报告当前状态
const reportStatus = (): Effect.Effect<void> =>
Effect.sync(() => {
if (serviceStatuses.size === 0) {
console.log("
[状态] 尚未检查任何服务");
return;
}
console.log("
[状态报告]");
for (const [service, status] of serviceStatuses) {
const ago = Math.round((Date.now() - status.lastChecked) / 1000);
console.log(
` ${service}: ${status.isHealthy ? "✓" : "✗"} (${ago} 秒前检查)`
);
}
});
// 在后台运行健康检查器并定期检查状态
const program = Effect.gen(function* () {
const config: HealthCheckConfig = {
services: [
{ name: "API", url: "https://api.example.com/health" },
{ name: "Database", url: "https://db.example.com/health" },
{ name: "Cache", url: "https://cache.example.com/health" },
],
intervalSeconds: 5, // 每5秒检查一次
};
// 分叉健康检查器以在后台运行
const checker = yield* createHealthCheckScheduler(config).pipe(
Effect.fork
);
// 每15秒检查和报告状态,持续60秒
yield* reportStatus().pipe(
Effect.repeat(
Schedule.addDelay(
Schedule.recurs(3), // 3次重复 = 总共4次(初始 + 3次)
() => Duration.seconds(15)
)
)
);
// 中断后台检查器
yield* checker.interrupt();
});
Effect.runPromise(program);
此模式:
- 定义服务健康检查,可能失败
- 使用Schedule.fixed 每5秒重复一次
- 优雅处理故障(保持上次已知状态)
- 在后台运行,同时主逻辑继续
- 定期报告当前状态
原理:
当需要以固定间隔重复运行效果时(例如,每5秒,每30分钟),使用 Schedule.fixed 指定间隔。这创建一个调度器,重复效果直到条件停止,执行之间有精确定时。
许多生产系统需要定期操作:
- 健康检查:每30秒轮询服务可用性
- 缓存刷新:每5分钟更新缓存
- 指标收集:每10秒收集系统指标
- 数据同步:定期与远程服务同步数据
- 清理任务:每晚移除陈旧数据
没有适当调度的缺点:
- 使用
while循环手动轮询浪费CPU(忙等待) - Thread.sleep 阻塞线程,阻止其他工作
- 失败时无自动重启
- 难以确定性测试
使用 Schedule.fixed 的优势:
- 高效、非阻塞的重复
- 自动故障处理和重试
- 使用TestClock可测试
- 干净、声明式语法