Effect-TS调度模式Skill effect-patterns-scheduling

这个技能提供了Effect-TS中处理调度的3个模式,包括重试失败操作、控制效果运行频率和定时等,适用于函数式编程环境中的网络故障处理、轮询、速率限制等场景。关键词:Effect-TS,调度,重试,重复,函数式编程,TypeScript,定时控制。

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

名称: effect-patterns-scheduling 描述: 用于Effect-TS应用中的调度模式。在Effect-TS应用程序中处理调度任务时使用此技能。

Effect-TS模式:调度

本技能提供了3个精选的Effect-TS调度模式。 在以下任务中使用此技能:

  • 调度
  • Effect-TS应用中的最佳实践
  • 现实世界的模式和解决方案

🟢 初学者模式

重试失败的操作

规则: 使用Effect.retry配合Schedule来优雅处理临时性故障。

好例子:

import { Effect, Schedule, Data } from "effect"

// ============================================
// 1. 定义错误类型
// ============================================

class NetworkError extends Data.TaggedError("NetworkError")<{
  readonly message: string
}> {}

class RateLimitError extends Data.TaggedError("RateLimitError")<{
  readonly retryAfter: number
}> {}

class NotFoundError extends Data.TaggedError("NotFoundError")<{
  readonly resource: string
}> {}

// ============================================
// 2. 模拟一个不稳定的API调用
// ============================================

let callCount = 0
const fetchData = Effect.gen(function* () {
  callCount++
  yield* Effect.log(`API调用尝试 ${callCount}`)

  // 模拟间歇性故障
  if (callCount < 3) {
    return yield* Effect.fail(new NetworkError({ message: "连接超时" }))
  }

  return { data: "成功!", attempts: callCount }
})

// ============================================
// 3. 基本重试 - 固定尝试次数
// ============================================

const withBasicRetry = fetchData.pipe(
  Effect.retry(Schedule.recurs(5))  // 最多重试5次
)

// ============================================
// 4. 带延迟的重试
// ============================================

const withDelayedRetry = fetchData.pipe(
  Effect.retry(
    Schedule.spaced("500毫秒").pipe(
      Schedule.intersect(Schedule.recurs(5))
    )
  )
)

// ============================================
// 5. 仅重试特定错误
// ============================================

const fetchWithErrors = (shouldFail: boolean) =>
  Effect.gen(function* () {
    if (shouldFail) {
      // 随机失败,生成不同错误
      const random = Math.random()
      if (random < 0.5) {
        return yield* Effect.fail(new NetworkError({ message: "超时" }))
      } else if (random < 0.8) {
        return yield* Effect.fail(new RateLimitError({ retryAfter: 1000 }))
      } else {
        return yield* Effect.fail(new NotFoundError({ resource: "用户:123" }))
      }
    }
    return "数据获取成功!"
  })

// 仅重试网络和速率限制错误,非NotFoundError
const retryTransientOnly = fetchWithErrors(true).pipe(
  Effect.retry({
    schedule: Schedule.recurs(3),
    while: (error) =>
      error._tag === "NetworkError" || error._tag === "RateLimitError",
  })
)

// ============================================
// 6. 带指数退避的重试
// ============================================

const withExponentialBackoff = fetchData.pipe(
  Effect.retry(
    Schedule.exponential("100毫秒", 2).pipe(  // 100ms, 200ms, 400ms...
      Schedule.intersect(Schedule.recurs(5))      // 最大5次重试
    )
  )
)

// ============================================
// 7. 运行并观察
// ============================================

const program = Effect.gen(function* () {
  yield* Effect.log("开始重试演示...")
  
  // 重置计数器
  callCount = 0
  
  const result = yield* withBasicRetry
  yield* Effect.log(`最终结果: ${JSON.stringify(result)}`)
})

Effect.runPromise(program)

原理:

使用 Effect.retry 自动重试因临时错误(如网络超时)而失败的操作。


许多故障是暂时的:

  1. 网络问题 - 连接断开、超时
  2. 速率限制 - 请求过多
  3. 资源争用 - 数据库锁
  4. 服务重启 - 短暂不可用

自动重试无需手动干预即可处理这些情况。



你的第一个调度器

规则: 使用Schedule来控制效果运行的时间和频率。

好例子:

import { Effect, Schedule } from "effect"

// ============================================
// 1. 重试失败的操作
// ============================================

let attempts = 0
const flakyOperation = Effect.gen(function* () {
  attempts++
  if (attempts < 3) {
    yield* Effect.log(`尝试 ${attempts} 失败`)
    return yield* Effect.fail(new Error("临时故障"))
  }
  return `第 ${attempts} 次尝试成功`
})

// 最多重试5次
const withRetry = flakyOperation.pipe(
  Effect.retry(Schedule.recurs(5))
)

// ============================================
// 2. 重复成功操作
// ============================================

const logTime = Effect.gen(function* () {
  const now = new Date().toISOString()
  yield* Effect.log(`当前时间: ${now}`)
  return now
})

// 重复3次
const repeated = logTime.pipe(
  Effect.repeat(Schedule.recurs(3))
)

// ============================================
// 3. 在操作间添加延迟
// ============================================

// 每秒重复一次,共5次
const polling = logTime.pipe(
  Effect.repeat(
    Schedule.spaced("1秒").pipe(
      Schedule.intersect(Schedule.recurs(5))
    )
  )
)

// ============================================
// 4. 常见调度模式
// ============================================

// 尝试间固定延迟
const fixedDelay = Schedule.spaced("500毫秒")

// 递增延迟(1秒, 2秒, 4秒, 8秒...)
const exponentialBackoff = Schedule.exponential("1秒")

// 最大尝试次数
const limitedAttempts = Schedule.recurs(3)

// 组合:指数退避,最大5次尝试
const retryPolicy = Schedule.exponential("100毫秒").pipe(
  Schedule.intersect(Schedule.recurs(5))
)

// ============================================
// 5. 运行示例
// ============================================

const program = Effect.gen(function* () {
  yield* Effect.log("--- 重试示例 ---")
  const result = yield* withRetry
  yield* Effect.log(`结果: ${result}`)

  yield* Effect.log("
--- 重复示例 ---")
  yield* repeated
})

Effect.runPromise(program)

原理:

使用 Schedule 控制Effect程序中的定时——重试失败操作、重复成功操作或添加延迟。


调度器解决常见定时问题:

  1. 重试 - 失败后重试
  2. 轮询 - 定期检查更新
  3. 速率限制 - 控制运行速度
  4. 退避 - 增加尝试间的延迟


🟡 中级模式

调度模式1:在固定间隔重复一个效果

规则: 使用Schedule.fixed以固定间隔重复效果,用于稳态操作和后台任务。

好例子:

此示例演示了一个健康检查服务,每30秒轮询多个服务端点并报告其状态。

import { Effect, Schedule, Duration } from "effect";

interface ServiceStatus {
  readonly service: string;
  readonly url: string;
  readonly isHealthy: boolean;
  readonly responseTime: number;
  readonly lastChecked: number;
}

// 模拟健康检查,调用端点
const checkServiceHealth = (
  url: string,
  service: string
): Effect.Effect<ServiceStatus> =>
  Effect.gen(function* () {
    const startTime = Date.now();

    // 模拟带偶尔失败的HTTP调用
    const isHealthy = Math.random() > 0.1; // 90% 成功率
    const responseTime = Math.random() * 500; // 0-500ms

    yield* Effect.sleep(Duration.millis(Math.round(responseTime)));

    if (!isHealthy) {
      yield* Effect.fail(new Error(`${service} 不健康`));
    }

    return {
      service,
      url,
      isHealthy: true,
      responseTime: Math.round(Date.now() - startTime),
      lastChecked: Date.now(),
    };
  });

// 多服务健康检查
interface HealthCheckConfig {
  readonly services: Array<{
    readonly name: string;
    readonly url: string;
  }>;
  readonly intervalSeconds: number;
}

// 跟踪服务状态
const serviceStatuses = new Map<string, ServiceStatus>();

// 检查所有服务并报告状态
const checkAllServices = (
  config: HealthCheckConfig
): Effect.Effect<void> =>
  Effect.gen(function* () {
    for (const service of config.services) {
      const status = yield* checkServiceHealth(service.url, service.name).pipe(
        Effect.either
      );

      if (status._tag === "Right") {
        serviceStatuses.set(service.name, status.right);
        console.log(
          `✓ ${service.name}: 成功 (${status.right.responseTime}毫秒)`
        );
      } else {
        console.log(`✗ ${service.name}: 失败`);
        // 如果可用,保持上次已知状态
      }
    }
  });

// 创建重复的健康检查
const createHealthCheckScheduler = (
  config: HealthCheckConfig
): Effect.Effect<void> =>
  checkAllServices(config).pipe(
    // 用固定间隔调度(固定 = 忽略执行时间)
    Effect.repeat(
      Schedule.fixed(Duration.seconds(config.intervalSeconds))
    )
  );

// 报告当前状态
const reportStatus = (): Effect.Effect<void> =>
  Effect.sync(() => {
    if (serviceStatuses.size === 0) {
      console.log("
[状态] 尚未检查任何服务");
      return;
    }

    console.log("
[状态报告]");
    for (const [service, status] of serviceStatuses) {
      const ago = Math.round((Date.now() - status.lastChecked) / 1000);
      console.log(
        `  ${service}: ${status.isHealthy ? "✓" : "✗"} (${ago} 秒前检查)`
      );
    }
  });

// 在后台运行健康检查器并定期检查状态
const program = Effect.gen(function* () {
  const config: HealthCheckConfig = {
    services: [
      { name: "API", url: "https://api.example.com/health" },
      { name: "Database", url: "https://db.example.com/health" },
      { name: "Cache", url: "https://cache.example.com/health" },
    ],
    intervalSeconds: 5, // 每5秒检查一次
  };

  // 分叉健康检查器以在后台运行
  const checker = yield* createHealthCheckScheduler(config).pipe(
    Effect.fork
  );

  // 每15秒检查和报告状态,持续60秒
  yield* reportStatus().pipe(
    Effect.repeat(
      Schedule.addDelay(
        Schedule.recurs(3), // 3次重复 = 总共4次(初始 + 3次)
        () => Duration.seconds(15)
      )
    )
  );

  // 中断后台检查器
  yield* checker.interrupt();
});

Effect.runPromise(program);

此模式:

  1. 定义服务健康检查,可能失败
  2. 使用Schedule.fixed 每5秒重复一次
  3. 优雅处理故障(保持上次已知状态)
  4. 在后台运行,同时主逻辑继续
  5. 定期报告当前状态

原理:

当需要以固定间隔重复运行效果时(例如,每5秒,每30分钟),使用 Schedule.fixed 指定间隔。这创建一个调度器,重复效果直到条件停止,执行之间有精确定时。


许多生产系统需要定期操作:

  • 健康检查:每30秒轮询服务可用性
  • 缓存刷新:每5分钟更新缓存
  • 指标收集:每10秒收集系统指标
  • 数据同步:定期与远程服务同步数据
  • 清理任务:每晚移除陈旧数据

没有适当调度的缺点:

  • 使用 while 循环手动轮询浪费CPU(忙等待)
  • Thread.sleep 阻塞线程,阻止其他工作
  • 失败时无自动重启
  • 难以确定性测试

使用 Schedule.fixed 的优势:

  • 高效、非阻塞的重复
  • 自动故障处理和重试
  • 使用TestClock可测试
  • 干净、声明式语法