Effect-TS模式:调度周期性任务Skill effect-patterns-scheduling-periodic-tasks

本技能提供Effect-TS中调度周期性任务的精选模式,包括防抖、节流、cron表达式和高级重试链与断路器,用于处理事件调度、实现最佳实践和解决实际应用问题。关键词:Effect-TS, 调度, 周期性任务, 防抖, 节流, cron, 重试, 断路器, 函数式编程

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

name: effect-patterns-scheduling-periodic-tasks description: Effect-TS模式用于调度周期性任务。在Effect-TS应用程序中处理调度周期性任务时使用。

Effect-TS模式:调度周期性任务

本技能提供了3个精选的Effect-TS模式用于调度周期性任务。 在以下任务相关时使用此技能:

  • 调度周期性任务
  • Effect-TS应用程序中的最佳实践
  • 实际世界的模式和解决方案

🟡 中级模式

调度模式4:防抖和节流执行

规则: 使用防抖等待静默后执行,使用节流限制执行频率,两者对处理快速事件至关重要。

好例子:

此示例演示了常见场景下的防抖和节流。

import { Effect, Schedule, Ref } from "effect";

interface SearchQuery {
  readonly query: string;
  readonly timestamp: Date;
}

// 模拟API搜索
const performSearch = (query: string): Effect.Effect<string[]> =>
  Effect.gen(function* () {
    yield* Effect.log(`[API] Searching for: "${query}"`);

    yield* Effect.sleep("100 millis"); // 模拟API延迟

    return [
      `Result 1 for ${query}`,
      `Result 2 for ${query}`,
      `Result 3 for ${query}`,
    ];
  });

// 主程序:演示防抖和节流
const program = Effect.gen(function* () {
  console.log(`
[DEBOUNCE/THROTTLE] Handling rapid events
`);

  // 示例1:防抖搜索输入
  console.log(`[1] Debounced search (wait for silence):
`);

  const searchQueries = ["h", "he", "hel", "hell", "hello"];

  const debouncedSearches = yield* Ref.make<Effect.Effect<string[]>[]>([]);

  for (const query of searchQueries) {
    yield* Effect.log(`[INPUT] User typed: "${query}"`);

    // 在实际应用中,这会被防抖
    yield* Effect.sleep("150 millis"); // 用户输入
  }

  // 用户停止后,执行搜索
  yield* Effect.log(`[DEBOUNCE] User silent for 200ms, executing search`);

  const searchResults = yield* performSearch("hello");

  yield* Effect.log(`[RESULTS] ${searchResults.length} results found
`);

  // 示例2:节流滚动事件
  console.log(`[2] Throttled scroll handler (max 10/sec):
`);

  const scrollEventCount = yield* Ref.make(0);
  const updateCount = yield* Ref.make(0);

  // 模拟100个快速滚动事件
  for (let i = 0; i < 100; i++) {
    yield* Ref.update(scrollEventCount, (c) => c + 1);

    // 在实际应用中,滚动处理程序会被节流
    if (i % 10 === 0) {
      // 模拟节流更新(每秒最多10次)
      yield* Ref.update(updateCount, (c) => c + 1);
    }
  }

  const events = yield* Ref.get(scrollEventCount);
  const updates = yield* Ref.get(updateCount);

  yield* Effect.log(
    `[THROTTLE] ${events} scroll events → ${updates} updates (${(updates / events * 100).toFixed(1)}% update rate)
`
  );

  // 示例3:去重
  console.log(`[3] Deduplicating rapid events:
`);

  const userClicks = ["click", "click", "click", "dblclick", "click"];

  const lastClick = yield* Ref.make<string | null>(null);
  const clickCount = yield* Ref.make(0);

  for (const click of userClicks) {
    const prev = yield* Ref.get(lastClick);

    if (click !== prev) {
      yield* Effect.log(`[CLICK] Processing: ${click}`);
      yield* Ref.update(clickCount, (c) => c + 1);
      yield* Ref.set(lastClick, click);
    } else {
      yield* Effect.log(`[CLICK] Duplicate: ${click} (skipped)`);
    }
  }

  const processed = yield* Ref.get(clickCount);

  yield* Effect.log(
    `
[DEDUPE] ${userClicks.length} clicks → ${processed} processed
`
  );

  // 示例4:错误时的指数退避重试
  console.log(`[4] Throttled retry on errors:
`);

  let retryCount = 0;

  const operation = Effect.gen(function* () {
    retryCount++;

    if (retryCount < 3) {
      yield* Effect.fail(new Error("Still failing"));
    }

    yield* Effect.log(`[SUCCESS] Succeeded on attempt ${retryCount}`);

    return "done";
  }).pipe(
    Effect.retry(
      Schedule.exponential("100 millis").pipe(
        Schedule.upTo("1 second"),
        Schedule.recurs(5)
      )
    )
  );

  yield* operation;
});

Effect.runPromise(program);

理由:

防抖和节流管理快速事件:

  • 防抖:等待静默(最后事件后的延迟),然后执行一次
  • 节流:每个间隔最多执行一次
  • 去重:跳过重复事件
  • 速率限制:限制每秒事件数

模式:Schedule.debounce(duration)Schedule.throttle(maxEvents, duration)


没有防抖/节流的快速事件会导致问题:

防抖示例:搜索输入

  • 用户逐个字符输入“hello”
  • 没有防抖:5个API调用(每个字符一次)
  • 有防抖:用户停止输入后1个API调用

节流示例:滚动事件

  • 滚动每秒触发100+次
  • 没有节流:更新滞后,GC压力
  • 有节流:每秒最多更新60次

实际问题:

  • API过载:搜索查询打击后端
  • 渲染滞后:太多DOM更新
  • 资源耗尽:事件处理程序永远赶不上

防抖/节流启用:

  • 效率:更少操作
  • 响应性:UI保持平滑
  • 资源安全:防止耗尽
  • 理智:可预测执行


调度模式3:使用Cron表达式调度任务

规则: 使用cron表达式在特定日历时间调度周期性任务,实现超越简单固定间隔的灵活调度。

好例子:

此示例演示了使用cron调度每日报告生成,支持时区。

import { Effect, Schedule, Console } from "effect";
import { DateTime } from "luxon"; // 用于时区处理

interface ReportConfig {
  readonly cronExpression: string;
  readonly timezone?: string;
  readonly jobName: string;
}

interface ScheduledReport {
  readonly timestamp: Date;
  readonly jobName: string;
  readonly result: string;
}

// 简单cron解析器(生产中,使用如cron-parser的库)
const parseCronExpression = (
  expression: string
): {
  minute: number[];
  hour: number[];
  dayOfMonth: number[];
  month: number[];
  dayOfWeek: number[];
} => {
  const parts = expression.split(" ");

  const parseField = (field: string, max: number): number[] => {
    if (field === "*") {
      return Array.from({ length: max + 1 }, (_, i) => i);
    }

    if (field.includes(",")) {
      return field.split(",").flatMap((part) => parseField(part, max));
    }

    if (field.includes("-")) {
      const [start, end] = field.split("-").map(Number);
      return Array.from({ length: end - start + 1 }, (_, i) => start + i);
    }

    return [Number(field)];
  };

  return {
    minute: parseField(parts[0], 59),
    hour: parseField(parts[1], 23),
    dayOfMonth: parseField(parts[2], 31),
    month: parseField(parts[3], 12),
    dayOfWeek: parseField(parts[4], 6),
  };
};

// 检查当前时间是否匹配cron表达式
const shouldRunNow = (parsed: ReturnType<typeof parseCronExpression>): boolean => {
  const now = new Date();

  return (
    parsed.minute.includes(now.getUTCMinutes()) &&
    parsed.hour.includes(now.getUTCHours()) &&
    parsed.dayOfMonth.includes(now.getUTCDate()) &&
    parsed.month.includes(now.getUTCMonth() + 1) &&
    parsed.dayOfWeek.includes(now.getUTCDay())
  );
};

// 生成报告
const generateReport = (jobName: string): Effect.Effect<ScheduledReport> =>
  Effect.gen(function* () {
    yield* Console.log(`[REPORT] Generating ${jobName}...`);

    // 模拟报告生成
    yield* Effect.sleep("100 millis");

    return {
      timestamp: new Date(),
      jobName,
      result: `Report generated at ${new Date().toISOString()}`,
    };
  });

// 使用cron表达式调度
const scheduleWithCron = (config: ReportConfig) =>
  Effect.gen(function* () {
    const parsed = parseCronExpression(config.cronExpression);

    yield* Console.log(
      `[SCHEDULER] Scheduling job: ${config.jobName}`
    );
    yield* Console.log(`[SCHEDULER] Cron: ${config.cronExpression}`);
    yield* Console.log(`[SCHEDULER] Timezone: ${config.timezone || "UTC"}
`);

    // 创建每分钟检查的调度
    const schedule = Schedule.fixed("1 minute").pipe(
      Schedule.untilInputEffect((report: ScheduledReport) =>
        Effect.gen(function* () {
          const isPastTime = shouldRunNow(parsed);

          if (isPastTime) {
            yield* Console.log(
              `[SCHEDULED] ✓ Running at ${report.timestamp.toISOString()}`
            );
            return true; // 停止调度
          }

          return false; // 继续调度
        })
      )
    );

    // 使用cron调度生成报告
    yield* generateReport(config.jobName).pipe(
      Effect.repeat(schedule)
    );
  });

// 演示多个cron调度
const program = Effect.gen(function* () {
  console.log(
    `
[START] Scheduling multiple jobs with cron expressions
`
  );

  // 调度示例(注意:在实际应用中,这些会在实际时间运行)
  const jobs = [
    {
      cronExpression: "0 9 * * 1-5", // 工作日9 AM
      jobName: "Daily Standup Report",
      timezone: "America/New_York",
    },
    {
      cronExpression: "0 0 * * *", // 每日午夜
      jobName: "Nightly Backup",
      timezone: "UTC",
    },
    {
      cronExpression: "0 0 1 * *", // 每月1日午夜
      jobName: "Monthly Summary",
      timezone: "Europe/London",
    },
  ];

  yield* Console.log("[JOBS] Scheduled:");
  jobs.forEach((job) => {
    console.log(
      `  - ${job.jobName}: ${job.cronExpression} (${job.timezone})`
    );
  });
});

Effect.runPromise(program);

理由:

使用cron表达式进行与业务日历对齐的调度:

  • 每小时备份0 * * * *(每小时整点)
  • 每日报告0 9 * * 1-5(工作日9 AM)
  • 月度清理0 0 1 * *(每月1日午夜)
  • 营业时间0 9-17 * * 1-5(周一至周五9 AM-5 PM)

格式:分钟 小时 日 月 星期


固定间隔不与业务需求对齐:

固定间隔(每24小时):

  • 如果任务需要2小时,下次运行是26小时后
  • 随时间漂移
  • 不与日历对齐
  • 在夏令时更改时失败

Cron表达式

  • 特定日历时间(例如,总是9 AM)
  • 独立于执行持续时间
  • 与营业时间对齐
  • 自然DST处理(时钟调整,cron重新同步)
  • 人类可读 vs 毫秒

实际示例:每日9 AM报告

  • 固定间隔:在9:00调度,需要1小时 → 下次在10:00 → 漂移至5 PM
  • Cron 0 9 * * *:无论持续时间或先前延迟,总是9:00运行


🟠 高级模式

调度模式5:高级重试链和断路器

规则: 使用带断路器的重试链处理复杂失败场景,早期检测级联失败,并防止资源耗尽。

好例子:

此示例演示了断路器模式和回退链模式。

import { Effect, Schedule, Ref, Data } from "effect";

// 错误分类
class RetryableError extends Data.TaggedError("RetryableError")<{
  message: string;
  code: string;
}> {}

class NonRetryableError extends Data.TaggedError("NonRetryableError")<{
  message: string;
  code: string;
}> {}

class CircuitBreakerOpenError extends Data.TaggedError("CircuitBreakerOpenError")<{
  message: string;
}> {}

// 断路器状态
interface CircuitBreakerState {
  status: "closed" | "open" | "half-open";
  failureCount: number;
  lastFailureTime: Date | null;
  successCount: number;
}

// 创建断路器
const createCircuitBreaker = (config: {
  failureThreshold: number;
  resetTimeoutMs: number;
  halfOpenRequests: number;
}) =>
  Effect.gen(function* () {
    const state = yield* Ref.make<CircuitBreakerState>({
      status: "closed",
      failureCount: 0,
      lastFailureTime: null,
      successCount: 0,
    });

    const recordSuccess = Effect.gen(function* () {
      yield* Ref.modify(state, (s) => {
        if (s.status === "half-open") {
          return [
            undefined,
            {
              ...s,
              successCount: s.successCount + 1,
              status: s.successCount + 1 >= config.halfOpenRequests
                ? "closed"
                : "half-open",
              failureCount: 0,
            },
          ];
        }
        return [undefined, s];
      });
    });

    const recordFailure = Effect.gen(function* () {
      yield* Ref.modify(state, (s) => {
        const newFailureCount = s.failureCount + 1;
        const newStatus = newFailureCount >= config.failureThreshold
          ? "open"
          : s.status;

        return [
          undefined,
          {
            ...s,
            failureCount: newFailureCount,
            lastFailureTime: new Date(),
            status: newStatus,
          },
        ];
      });
    });

    const canExecute = Effect.gen(function* () {
      const current = yield* Ref.get(state);

      if (current.status === "closed") {
        return true;
      }

      if (current.status === "open") {
        const timeSinceFailure = Date.now() - (current.lastFailureTime?.getTime() ?? 0);

        if (timeSinceFailure > config.resetTimeoutMs) {
          yield* Ref.modify(state, (s) => [
            undefined,
            {
              ...s,
              status: "half-open",
              failureCount: 0,
              successCount: 0,
            },
          ]);
          return true;
        }

        return false;
      }

      // 半开:允许有限请求
      return true;
    });

    return { recordSuccess, recordFailure, canExecute, state };
  });

// 主示例
const program = Effect.gen(function* () {
  console.log(`
[ADVANCED RETRY] Circuit breaker and fallback chains
`);

  // 创建断路器
  const cb = yield* createCircuitBreaker({
    failureThreshold: 3,
    resetTimeoutMs: 1000,
    halfOpenRequests: 2,
  });

  // 示例1:断路器在行动
  console.log(`[1] Circuit breaker state transitions:
`);

  let requestCount = 0;

  const callWithCircuitBreaker = (shouldFail: boolean) =>
    Effect.gen(function* () {
      const canExecute = yield* cb.canExecute;

      if (!canExecute) {
        yield* Effect.fail(
          new CircuitBreakerOpenError({
            message: "Circuit breaker is open",
          })
        );
      }

      requestCount++;

      if (shouldFail) {
        yield* cb.recordFailure;
        yield* Effect.log(
          `[REQUEST ${requestCount}] FAILED (Circuit: ${(yield* Ref.get(cb.state)).status})`
        );
        yield* Effect.fail(
          new RetryableError({
            message: "Service error",
            code: "500",
          })
        );
      } else {
        yield* cb.recordSuccess;
        yield* Effect.log(
          `[REQUEST ${requestCount}] SUCCESS (Circuit: ${(yield* Ref.get(cb.state)).status})`
        );
        return "success";
      }
    });

  // 模拟失败然后恢复
  const failSequence = [true, true, true, false, false, false];

  for (const shouldFail of failSequence) {
    yield* callWithCircuitBreaker(shouldFail).pipe(
      Effect.catchAll((error) =>
        Effect.gen(function* () {
          if (error._tag === "CircuitBreakerOpenError") {
            yield* Effect.log(
              `[REQUEST ${requestCount + 1}] REJECTED (Circuit open)`
            );
          } else {
            yield* Effect.log(
              `[REQUEST ${requestCount + 1}] ERROR caught`
            );
          }
        })
      )
    );

    // 请求间添加延迟
    yield* Effect.sleep("100 millis");
  }

  // 示例2:回退链
  console.log(`
[2] Fallback chain (primary → secondary → cache):
`);

  const endpoints = {
    primary: "https://api.primary.com/data",
    secondary: "https://api.secondary.com/data",
    cache: "cached-data",
  };

  const callEndpoint = (name: string, shouldFail: boolean) =>
    Effect.gen(function* () {
      yield* Effect.log(`[CALL] Trying ${name}`);

      if (shouldFail) {
        yield* Effect.sleep("50 millis");
        yield* Effect.fail(
          new RetryableError({
            message: `${name} failed`,
            code: "500",
          })
        );
      }

      yield* Effect.sleep("50 millis");
      return `data-from-${name}`;
    });

  const fallbackChain = callEndpoint("primary", true).pipe(
    Effect.orElse(() => callEndpoint("secondary", false)),
    Effect.orElse(() => {
      yield* Effect.log(`[FALLBACK] Using cached data`);
      return Effect.succeed(endpoints.cache);
    })
  );

  const result = yield* fallbackChain;

  yield* Effect.log(`[RESULT] Got: ${result}
`);

  // 示例3:错误特定重试策略
  console.log(`[3] Error classification and adaptive retry:
`);

  const classifyError = (code: string) => {
    if (["502", "503", "504"].includes(code)) {
      return "retryable-service-error";
    }
    if (["408", "429"].includes(code)) {
      return "retryable-rate-limit";
    }
    if (["404", "401", "403"].includes(code)) {
      return "non-retryable";
    }
    if (code === "timeout") {
      return "retryable-network";
    }
    return "unknown";
  };

  const errorCodes = ["500", "404", "429", "503", "timeout"];

  for (const code of errorCodes) {
    const classification = classifyError(code);
    const shouldRetry = !classification.startsWith("non-retryable");

    yield* Effect.log(
      `[ERROR ${code}] → ${classification} (Retry: ${shouldRetry})`
    );
  }

  // 示例4:隔板模式
  console.log(`
[4] Bulkhead isolation (limit concurrency per endpoint):
`);

  const bulkheads = {
    "primary-api": { maxConcurrent: 5, currentCount: 0 },
    "secondary-api": { maxConcurrent: 3, currentCount: 0 },
  };

  const acquirePermit = (endpoint: string) =>
    Effect.gen(function* () {
      const bulkhead = bulkheads[endpoint as keyof typeof bulkheads];

      if (!bulkhead) {
        return false;
      }

      if (bulkhead.currentCount < bulkhead.maxConcurrent) {
        bulkhead.currentCount++;
        return true;
      }

      yield* Effect.log(
        `[BULKHEAD] ${endpoint} at capacity (${bulkhead.currentCount}/${bulkhead.maxConcurrent})`
      );

      return false;
    });

  // 模拟请求
  for (let i = 0; i < 10; i++) {
    const endpoint = i < 6 ? "primary-api" : "secondary-api";
    const acquired = yield* acquirePermit(endpoint);

    if (acquired) {
      yield* Effect.log(
        `[REQUEST] Acquired permit for ${endpoint}`
      );
    }
  }
});

Effect.runPromise(program);

理由:

高级重试策略处理多种失败类型:

  • 断路器:当错误率高时停止重试
  • 隔板:限制每个操作的并发性
  • 回退链:按顺序尝试多种方法
  • 自适应重试:基于失败模式调整策略
  • 健康检查:在恢复前验证恢复

模式:结合 Schedule.retryRef 状态和错误分类


简单重试在生产中失败:

场景1:级联失败

  • 服务A调用服务B(下线)
  • 重试堆积,消耗资源
  • A在尝试恢复B时过载
  • 系统崩溃

场景2:混合失败

  • 404(未找到) - 重试无济于事
  • 500(服务器错误) - 重试可能有帮助
  • 网络超时 - 重试可能有帮助
  • 所有相同重试策略 = 低效

场景3:惊群效应

  • 10,000个客户端同时重试
  • 服务器恢复,再次被打击
  • 需要协调回退 + 抖动

解决方案:

断路器

  • 监控错误率
  • 高时停止请求
  • 逐渐恢复
  • 防止级联失败

回退链

  • 尝试主要端点
  • 尝试次要端点
  • 使用缓存
  • 返回降级结果

自适应重试

  • 分类错误类型
  • 使用适当策略
  • 跳过不可重试错误
  • 动态调整回退