name: effect-patterns-scheduling-periodic-tasks description: Effect-TS模式用于调度周期性任务。在Effect-TS应用程序中处理调度周期性任务时使用。
Effect-TS模式:调度周期性任务
本技能提供了3个精选的Effect-TS模式用于调度周期性任务。 在以下任务相关时使用此技能:
- 调度周期性任务
- Effect-TS应用程序中的最佳实践
- 实际世界的模式和解决方案
🟡 中级模式
调度模式4:防抖和节流执行
规则: 使用防抖等待静默后执行,使用节流限制执行频率,两者对处理快速事件至关重要。
好例子:
此示例演示了常见场景下的防抖和节流。
import { Effect, Schedule, Ref } from "effect";
interface SearchQuery {
readonly query: string;
readonly timestamp: Date;
}
// 模拟API搜索
const performSearch = (query: string): Effect.Effect<string[]> =>
Effect.gen(function* () {
yield* Effect.log(`[API] Searching for: "${query}"`);
yield* Effect.sleep("100 millis"); // 模拟API延迟
return [
`Result 1 for ${query}`,
`Result 2 for ${query}`,
`Result 3 for ${query}`,
];
});
// 主程序:演示防抖和节流
const program = Effect.gen(function* () {
console.log(`
[DEBOUNCE/THROTTLE] Handling rapid events
`);
// 示例1:防抖搜索输入
console.log(`[1] Debounced search (wait for silence):
`);
const searchQueries = ["h", "he", "hel", "hell", "hello"];
const debouncedSearches = yield* Ref.make<Effect.Effect<string[]>[]>([]);
for (const query of searchQueries) {
yield* Effect.log(`[INPUT] User typed: "${query}"`);
// 在实际应用中,这会被防抖
yield* Effect.sleep("150 millis"); // 用户输入
}
// 用户停止后,执行搜索
yield* Effect.log(`[DEBOUNCE] User silent for 200ms, executing search`);
const searchResults = yield* performSearch("hello");
yield* Effect.log(`[RESULTS] ${searchResults.length} results found
`);
// 示例2:节流滚动事件
console.log(`[2] Throttled scroll handler (max 10/sec):
`);
const scrollEventCount = yield* Ref.make(0);
const updateCount = yield* Ref.make(0);
// 模拟100个快速滚动事件
for (let i = 0; i < 100; i++) {
yield* Ref.update(scrollEventCount, (c) => c + 1);
// 在实际应用中,滚动处理程序会被节流
if (i % 10 === 0) {
// 模拟节流更新(每秒最多10次)
yield* Ref.update(updateCount, (c) => c + 1);
}
}
const events = yield* Ref.get(scrollEventCount);
const updates = yield* Ref.get(updateCount);
yield* Effect.log(
`[THROTTLE] ${events} scroll events → ${updates} updates (${(updates / events * 100).toFixed(1)}% update rate)
`
);
// 示例3:去重
console.log(`[3] Deduplicating rapid events:
`);
const userClicks = ["click", "click", "click", "dblclick", "click"];
const lastClick = yield* Ref.make<string | null>(null);
const clickCount = yield* Ref.make(0);
for (const click of userClicks) {
const prev = yield* Ref.get(lastClick);
if (click !== prev) {
yield* Effect.log(`[CLICK] Processing: ${click}`);
yield* Ref.update(clickCount, (c) => c + 1);
yield* Ref.set(lastClick, click);
} else {
yield* Effect.log(`[CLICK] Duplicate: ${click} (skipped)`);
}
}
const processed = yield* Ref.get(clickCount);
yield* Effect.log(
`
[DEDUPE] ${userClicks.length} clicks → ${processed} processed
`
);
// 示例4:错误时的指数退避重试
console.log(`[4] Throttled retry on errors:
`);
let retryCount = 0;
const operation = Effect.gen(function* () {
retryCount++;
if (retryCount < 3) {
yield* Effect.fail(new Error("Still failing"));
}
yield* Effect.log(`[SUCCESS] Succeeded on attempt ${retryCount}`);
return "done";
}).pipe(
Effect.retry(
Schedule.exponential("100 millis").pipe(
Schedule.upTo("1 second"),
Schedule.recurs(5)
)
)
);
yield* operation;
});
Effect.runPromise(program);
理由:
防抖和节流管理快速事件:
- 防抖:等待静默(最后事件后的延迟),然后执行一次
- 节流:每个间隔最多执行一次
- 去重:跳过重复事件
- 速率限制:限制每秒事件数
模式:Schedule.debounce(duration) 或 Schedule.throttle(maxEvents, duration)
没有防抖/节流的快速事件会导致问题:
防抖示例:搜索输入
- 用户逐个字符输入“hello”
- 没有防抖:5个API调用(每个字符一次)
- 有防抖:用户停止输入后1个API调用
节流示例:滚动事件
- 滚动每秒触发100+次
- 没有节流:更新滞后,GC压力
- 有节流:每秒最多更新60次
实际问题:
- API过载:搜索查询打击后端
- 渲染滞后:太多DOM更新
- 资源耗尽:事件处理程序永远赶不上
防抖/节流启用:
- 效率:更少操作
- 响应性:UI保持平滑
- 资源安全:防止耗尽
- 理智:可预测执行
调度模式3:使用Cron表达式调度任务
规则: 使用cron表达式在特定日历时间调度周期性任务,实现超越简单固定间隔的灵活调度。
好例子:
此示例演示了使用cron调度每日报告生成,支持时区。
import { Effect, Schedule, Console } from "effect";
import { DateTime } from "luxon"; // 用于时区处理
interface ReportConfig {
readonly cronExpression: string;
readonly timezone?: string;
readonly jobName: string;
}
interface ScheduledReport {
readonly timestamp: Date;
readonly jobName: string;
readonly result: string;
}
// 简单cron解析器(生产中,使用如cron-parser的库)
const parseCronExpression = (
expression: string
): {
minute: number[];
hour: number[];
dayOfMonth: number[];
month: number[];
dayOfWeek: number[];
} => {
const parts = expression.split(" ");
const parseField = (field: string, max: number): number[] => {
if (field === "*") {
return Array.from({ length: max + 1 }, (_, i) => i);
}
if (field.includes(",")) {
return field.split(",").flatMap((part) => parseField(part, max));
}
if (field.includes("-")) {
const [start, end] = field.split("-").map(Number);
return Array.from({ length: end - start + 1 }, (_, i) => start + i);
}
return [Number(field)];
};
return {
minute: parseField(parts[0], 59),
hour: parseField(parts[1], 23),
dayOfMonth: parseField(parts[2], 31),
month: parseField(parts[3], 12),
dayOfWeek: parseField(parts[4], 6),
};
};
// 检查当前时间是否匹配cron表达式
const shouldRunNow = (parsed: ReturnType<typeof parseCronExpression>): boolean => {
const now = new Date();
return (
parsed.minute.includes(now.getUTCMinutes()) &&
parsed.hour.includes(now.getUTCHours()) &&
parsed.dayOfMonth.includes(now.getUTCDate()) &&
parsed.month.includes(now.getUTCMonth() + 1) &&
parsed.dayOfWeek.includes(now.getUTCDay())
);
};
// 生成报告
const generateReport = (jobName: string): Effect.Effect<ScheduledReport> =>
Effect.gen(function* () {
yield* Console.log(`[REPORT] Generating ${jobName}...`);
// 模拟报告生成
yield* Effect.sleep("100 millis");
return {
timestamp: new Date(),
jobName,
result: `Report generated at ${new Date().toISOString()}`,
};
});
// 使用cron表达式调度
const scheduleWithCron = (config: ReportConfig) =>
Effect.gen(function* () {
const parsed = parseCronExpression(config.cronExpression);
yield* Console.log(
`[SCHEDULER] Scheduling job: ${config.jobName}`
);
yield* Console.log(`[SCHEDULER] Cron: ${config.cronExpression}`);
yield* Console.log(`[SCHEDULER] Timezone: ${config.timezone || "UTC"}
`);
// 创建每分钟检查的调度
const schedule = Schedule.fixed("1 minute").pipe(
Schedule.untilInputEffect((report: ScheduledReport) =>
Effect.gen(function* () {
const isPastTime = shouldRunNow(parsed);
if (isPastTime) {
yield* Console.log(
`[SCHEDULED] ✓ Running at ${report.timestamp.toISOString()}`
);
return true; // 停止调度
}
return false; // 继续调度
})
)
);
// 使用cron调度生成报告
yield* generateReport(config.jobName).pipe(
Effect.repeat(schedule)
);
});
// 演示多个cron调度
const program = Effect.gen(function* () {
console.log(
`
[START] Scheduling multiple jobs with cron expressions
`
);
// 调度示例(注意:在实际应用中,这些会在实际时间运行)
const jobs = [
{
cronExpression: "0 9 * * 1-5", // 工作日9 AM
jobName: "Daily Standup Report",
timezone: "America/New_York",
},
{
cronExpression: "0 0 * * *", // 每日午夜
jobName: "Nightly Backup",
timezone: "UTC",
},
{
cronExpression: "0 0 1 * *", // 每月1日午夜
jobName: "Monthly Summary",
timezone: "Europe/London",
},
];
yield* Console.log("[JOBS] Scheduled:");
jobs.forEach((job) => {
console.log(
` - ${job.jobName}: ${job.cronExpression} (${job.timezone})`
);
});
});
Effect.runPromise(program);
理由:
使用cron表达式进行与业务日历对齐的调度:
- 每小时备份:
0 * * * *(每小时整点) - 每日报告:
0 9 * * 1-5(工作日9 AM) - 月度清理:
0 0 1 * *(每月1日午夜) - 营业时间:
0 9-17 * * 1-5(周一至周五9 AM-5 PM)
格式:分钟 小时 日 月 星期
固定间隔不与业务需求对齐:
固定间隔(每24小时):
- 如果任务需要2小时,下次运行是26小时后
- 随时间漂移
- 不与日历对齐
- 在夏令时更改时失败
Cron表达式:
- 特定日历时间(例如,总是9 AM)
- 独立于执行持续时间
- 与营业时间对齐
- 自然DST处理(时钟调整,cron重新同步)
- 人类可读 vs 毫秒
实际示例:每日9 AM报告
- 固定间隔:在9:00调度,需要1小时 → 下次在10:00 → 漂移至5 PM
- Cron
0 9 * * *:无论持续时间或先前延迟,总是9:00运行
🟠 高级模式
调度模式5:高级重试链和断路器
规则: 使用带断路器的重试链处理复杂失败场景,早期检测级联失败,并防止资源耗尽。
好例子:
此示例演示了断路器模式和回退链模式。
import { Effect, Schedule, Ref, Data } from "effect";
// 错误分类
class RetryableError extends Data.TaggedError("RetryableError")<{
message: string;
code: string;
}> {}
class NonRetryableError extends Data.TaggedError("NonRetryableError")<{
message: string;
code: string;
}> {}
class CircuitBreakerOpenError extends Data.TaggedError("CircuitBreakerOpenError")<{
message: string;
}> {}
// 断路器状态
interface CircuitBreakerState {
status: "closed" | "open" | "half-open";
failureCount: number;
lastFailureTime: Date | null;
successCount: number;
}
// 创建断路器
const createCircuitBreaker = (config: {
failureThreshold: number;
resetTimeoutMs: number;
halfOpenRequests: number;
}) =>
Effect.gen(function* () {
const state = yield* Ref.make<CircuitBreakerState>({
status: "closed",
failureCount: 0,
lastFailureTime: null,
successCount: 0,
});
const recordSuccess = Effect.gen(function* () {
yield* Ref.modify(state, (s) => {
if (s.status === "half-open") {
return [
undefined,
{
...s,
successCount: s.successCount + 1,
status: s.successCount + 1 >= config.halfOpenRequests
? "closed"
: "half-open",
failureCount: 0,
},
];
}
return [undefined, s];
});
});
const recordFailure = Effect.gen(function* () {
yield* Ref.modify(state, (s) => {
const newFailureCount = s.failureCount + 1;
const newStatus = newFailureCount >= config.failureThreshold
? "open"
: s.status;
return [
undefined,
{
...s,
failureCount: newFailureCount,
lastFailureTime: new Date(),
status: newStatus,
},
];
});
});
const canExecute = Effect.gen(function* () {
const current = yield* Ref.get(state);
if (current.status === "closed") {
return true;
}
if (current.status === "open") {
const timeSinceFailure = Date.now() - (current.lastFailureTime?.getTime() ?? 0);
if (timeSinceFailure > config.resetTimeoutMs) {
yield* Ref.modify(state, (s) => [
undefined,
{
...s,
status: "half-open",
failureCount: 0,
successCount: 0,
},
]);
return true;
}
return false;
}
// 半开:允许有限请求
return true;
});
return { recordSuccess, recordFailure, canExecute, state };
});
// 主示例
const program = Effect.gen(function* () {
console.log(`
[ADVANCED RETRY] Circuit breaker and fallback chains
`);
// 创建断路器
const cb = yield* createCircuitBreaker({
failureThreshold: 3,
resetTimeoutMs: 1000,
halfOpenRequests: 2,
});
// 示例1:断路器在行动
console.log(`[1] Circuit breaker state transitions:
`);
let requestCount = 0;
const callWithCircuitBreaker = (shouldFail: boolean) =>
Effect.gen(function* () {
const canExecute = yield* cb.canExecute;
if (!canExecute) {
yield* Effect.fail(
new CircuitBreakerOpenError({
message: "Circuit breaker is open",
})
);
}
requestCount++;
if (shouldFail) {
yield* cb.recordFailure;
yield* Effect.log(
`[REQUEST ${requestCount}] FAILED (Circuit: ${(yield* Ref.get(cb.state)).status})`
);
yield* Effect.fail(
new RetryableError({
message: "Service error",
code: "500",
})
);
} else {
yield* cb.recordSuccess;
yield* Effect.log(
`[REQUEST ${requestCount}] SUCCESS (Circuit: ${(yield* Ref.get(cb.state)).status})`
);
return "success";
}
});
// 模拟失败然后恢复
const failSequence = [true, true, true, false, false, false];
for (const shouldFail of failSequence) {
yield* callWithCircuitBreaker(shouldFail).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
if (error._tag === "CircuitBreakerOpenError") {
yield* Effect.log(
`[REQUEST ${requestCount + 1}] REJECTED (Circuit open)`
);
} else {
yield* Effect.log(
`[REQUEST ${requestCount + 1}] ERROR caught`
);
}
})
)
);
// 请求间添加延迟
yield* Effect.sleep("100 millis");
}
// 示例2:回退链
console.log(`
[2] Fallback chain (primary → secondary → cache):
`);
const endpoints = {
primary: "https://api.primary.com/data",
secondary: "https://api.secondary.com/data",
cache: "cached-data",
};
const callEndpoint = (name: string, shouldFail: boolean) =>
Effect.gen(function* () {
yield* Effect.log(`[CALL] Trying ${name}`);
if (shouldFail) {
yield* Effect.sleep("50 millis");
yield* Effect.fail(
new RetryableError({
message: `${name} failed`,
code: "500",
})
);
}
yield* Effect.sleep("50 millis");
return `data-from-${name}`;
});
const fallbackChain = callEndpoint("primary", true).pipe(
Effect.orElse(() => callEndpoint("secondary", false)),
Effect.orElse(() => {
yield* Effect.log(`[FALLBACK] Using cached data`);
return Effect.succeed(endpoints.cache);
})
);
const result = yield* fallbackChain;
yield* Effect.log(`[RESULT] Got: ${result}
`);
// 示例3:错误特定重试策略
console.log(`[3] Error classification and adaptive retry:
`);
const classifyError = (code: string) => {
if (["502", "503", "504"].includes(code)) {
return "retryable-service-error";
}
if (["408", "429"].includes(code)) {
return "retryable-rate-limit";
}
if (["404", "401", "403"].includes(code)) {
return "non-retryable";
}
if (code === "timeout") {
return "retryable-network";
}
return "unknown";
};
const errorCodes = ["500", "404", "429", "503", "timeout"];
for (const code of errorCodes) {
const classification = classifyError(code);
const shouldRetry = !classification.startsWith("non-retryable");
yield* Effect.log(
`[ERROR ${code}] → ${classification} (Retry: ${shouldRetry})`
);
}
// 示例4:隔板模式
console.log(`
[4] Bulkhead isolation (limit concurrency per endpoint):
`);
const bulkheads = {
"primary-api": { maxConcurrent: 5, currentCount: 0 },
"secondary-api": { maxConcurrent: 3, currentCount: 0 },
};
const acquirePermit = (endpoint: string) =>
Effect.gen(function* () {
const bulkhead = bulkheads[endpoint as keyof typeof bulkheads];
if (!bulkhead) {
return false;
}
if (bulkhead.currentCount < bulkhead.maxConcurrent) {
bulkhead.currentCount++;
return true;
}
yield* Effect.log(
`[BULKHEAD] ${endpoint} at capacity (${bulkhead.currentCount}/${bulkhead.maxConcurrent})`
);
return false;
});
// 模拟请求
for (let i = 0; i < 10; i++) {
const endpoint = i < 6 ? "primary-api" : "secondary-api";
const acquired = yield* acquirePermit(endpoint);
if (acquired) {
yield* Effect.log(
`[REQUEST] Acquired permit for ${endpoint}`
);
}
}
});
Effect.runPromise(program);
理由:
高级重试策略处理多种失败类型:
- 断路器:当错误率高时停止重试
- 隔板:限制每个操作的并发性
- 回退链:按顺序尝试多种方法
- 自适应重试:基于失败模式调整策略
- 健康检查:在恢复前验证恢复
模式:结合 Schedule.retry、Ref 状态和错误分类
简单重试在生产中失败:
场景1:级联失败
- 服务A调用服务B(下线)
- 重试堆积,消耗资源
- A在尝试恢复B时过载
- 系统崩溃
场景2:混合失败
- 404(未找到) - 重试无济于事
- 500(服务器错误) - 重试可能有帮助
- 网络超时 - 重试可能有帮助
- 所有相同重试策略 = 低效
场景3:惊群效应
- 10,000个客户端同时重试
- 服务器恢复,再次被打击
- 需要协调回退 + 抖动
解决方案:
断路器:
- 监控错误率
- 高时停止请求
- 逐渐恢复
- 防止级联失败
回退链:
- 尝试主要端点
- 尝试次要端点
- 使用缓存
- 返回降级结果
自适应重试:
- 分类错误类型
- 使用适当策略
- 跳过不可重试错误
- 动态调整回退