Effect-TS可观测性模式Skill effect-patterns-observability

这个技能提供了Effect-TS中可观测性的13个精选模式,包括日志记录、指标监控、追踪等,帮助开发者在TypeScript应用程序中实现高效的可观测性、调试和监控。关键词:Effect-TS、可观测性、日志、指标、追踪、TypeScript、后端开发、调试、监控、模式。

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

name: effect-patterns-observability description: Effect-TS的可观测性模式。在Effect-TS应用程序中处理可观测性时使用。

Effect-TS模式:可观测性

此技能提供了13个精选的Effect-TS可观测性模式。 在以下任务中使用此技能:

  • 可观测性
  • Effect-TS应用程序中的最佳实践
  • 现实世界的模式和解决方案

🟢 初级模式

调试Effect程序

规则: 使用Effect.tap和日志记录来检查值,而不改变程序流程。

好例子:

import { Effect, pipe } from "effect"

// ============================================
// 1. 使用tap检查值
// ============================================

const fetchUser = (id: string) =>
  Effect.succeed({ id, name: "Alice", email: "alice@example.com" })

const processUser = (id: string) =>
  fetchUser(id).pipe(
    // tap运行为其副作用的效果,然后继续使用原始值
    Effect.tap((user) => Effect.log(`获取的用户:${user.name}`)),
    Effect.map((user) => ({ ...user, processed: true })),
    Effect.tap((user) => Effect.log(`处理后的:${JSON.stringify(user)}`))
  )

// ============================================
// 2. 调试流水线
// ============================================

const numbers = [1, 2, 3, 4, 5]

const pipeline = Effect.gen(function* () {
  yield* Effect.log("开始流水线")

  const step1 = numbers.filter((n) => n % 2 === 0)
  yield* Effect.log(`过滤后(偶数):${JSON.stringify(step1)}`)

  const step2 = step1.map((n) => n * 10)
  yield* Effect.log(`映射后(*10):${JSON.stringify(step2)}`)

  const step3 = step2.reduce((a, b) => a + b, 0)
  yield* Effect.log(`归约后(求和):${step3}`)

  return step3
})

// ============================================
// 3. 调试错误
// ============================================

const riskyOperation = (shouldFail: boolean) =>
  Effect.gen(function* () {
    yield* Effect.log("开始高风险操作")

    if (shouldFail) {
      yield* Effect.log("即将失败...")
      return yield* Effect.fail(new Error("出了点问题"))
    }

    yield* Effect.log("成功!")
    return "结果"
  })

const debugErrors = riskyOperation(true).pipe(
  // 记录操作失败时
  Effect.tapError((error) => Effect.log(`操作失败:${error.message}`)),

  // 提供后备方案
  Effect.catchAll((error) => {
    return Effect.succeed(`从错误中恢复:${error.message}`)
  })
)

// ============================================
// 4. 跟踪执行流程
// ============================================

const step = (name: string, value: number) =>
  Effect.gen(function* () {
    yield* Effect.log(`[${name}] 输入:${value}`)
    const result = value * 2
    yield* Effect.log(`[${name}] 输出:${result}`)
    return result
  })

const tracedWorkflow = Effect.gen(function* () {
  const a = yield* step("步骤 1", 5)
  const b = yield* step("步骤 2", a)
  const c = yield* step("步骤 3", b)
  yield* Effect.log(`最终结果:${c}`)
  return c
})

// ============================================
// 5. 使用控制台快速调试
// ============================================

// 有时只需要console.log
const quickDebug = Effect.gen(function* () {
  const value = yield* Effect.succeed(42)
  
  // Effect.sync包装副作用
  yield* Effect.sync(() => console.log("快速调试:", value))
  
  return value
})

// ============================================
// 6. 运行示例
// ============================================

const program = Effect.gen(function* () {
  yield* Effect.log("=== Tap示例 ===")
  yield* processUser("123")

  yield* Effect.log("
=== 流水线调试 ===")
  yield* pipeline

  yield* Effect.log("
=== 错误调试 ===")
  yield* debugErrors

  yield* Effect.log("
=== 跟踪工作流 ===")
  yield* tracedWorkflow
})

Effect.runPromise(program)

原理:

使用Effect.tap检查值和Effect.log跟踪执行,而不改变程序行为。


调试Effect代码与命令式代码不同:

  1. 无断点 - Effects是描述,不是执行
  2. 惰性求值 - 代码在调用runPromise时稍后运行
  3. 组合 - Effects链接在一起

tap和日志记录让你在不打破链的情况下查看内部。



你的第一个日志

规则: 使用Effect.log和相关函数进行结构化、上下文感知的日志记录。

好例子:

import { Effect, Logger, LogLevel } from "effect"

// ============================================
// 1. 基本日志记录
// ============================================

const basicLogging = Effect.gen(function* () {
  // 不同日志级别
  yield* Effect.logDebug("调试消息 - 用于开发")
  yield* Effect.logInfo("信息消息 - 正常操作")
  yield* Effect.log("默认日志 - 与logInfo相同")
  yield* Effect.logWarning("警告 - 某些不寻常")
  yield* Effect.logError("错误 - 出了问题")
})

// ============================================
// 2. 带上下文的日志记录
// ============================================

const withContext = Effect.gen(function* () {
  // 向日志添加结构化数据
  yield* Effect.log("用户登录").pipe(
    Effect.annotateLogs({
      userId: "user-123",
      action: "login",
      ipAddress: "192.168.1.1",
    })
  )

  // 添加单个注释
  yield* Effect.log("处理请求").pipe(
    Effect.annotateLogs("requestId", "req-456")
  )
})

// ============================================
// 3. 日志跨度用于计时
// ============================================

const withTiming = Effect.gen(function* () {
  yield* Effect.log("开始操作")

  // withLogSpan添加计时信息
  yield* Effect.sleep("100毫秒").pipe(
    Effect.withLogSpan("数据库查询")
  )

  yield* Effect.log("操作完成")
})

// ============================================
// 4. 实际例子
// ============================================

interface User {
  id: string
  email: string
}

const processOrder = (orderId: string, userId: string) =>
  Effect.gen(function* () {
    yield* Effect.logInfo("处理订单").pipe(
      Effect.annotateLogs({ orderId, userId })
    )

    // 模拟工作
    yield* Effect.sleep("50毫秒")

    yield* Effect.logInfo("订单处理成功").pipe(
      Effect.annotateLogs({ orderId, status: "已完成" })
    )

    return { orderId, status: "已完成" }
  }).pipe(
    Effect.withLogSpan("processOrder")
  )

// ============================================
// 5. 配置日志级别
// ============================================

const debugProgram = basicLogging.pipe(
  // 显示所有日志,包括调试
  Logger.withMinimumLogLevel(LogLevel.Debug)
)

const productionProgram = basicLogging.pipe(
  // 只显示警告和错误
  Logger.withMinimumLogLevel(LogLevel.Warning)
)

// ============================================
// 6. 运行
// ============================================

const program = Effect.gen(function* () {
  yield* Effect.log("=== 基本日志记录 ===")
  yield* basicLogging

  yield* Effect.log("
=== 带上下文 ===")
  yield* withContext

  yield* Effect.log("
=== 带计时 ===")
  yield* withTiming

  yield* Effect.log("
=== 处理订单 ===")
  yield* processOrder("order-789", "user-123")
})

Effect.runPromise(program)

原理:

使用Effect的内置日志函数进行结构化、上下文感知的日志记录,可与任何日志后端配合使用。


Effect的日志记录优于console.log

  1. 结构化 - 日志是数据,不仅仅是字符串
  2. 上下文感知 - 自动包含光纤信息、时间戳
  3. 可配置 - 更改日志级别、格式、目的地
  4. 类型安全 - Effect类型系统的一部分


🟡 中级模式

使用Effect.fn对函数调用进行仪器化和观察

规则: 使用Effect.fn以组合和类型安全的方式包装函数,添加效果式仪器化,如日志记录、指标或追踪。

好例子:

import { Effect } from "effect";

// 要仪器化的简单函数
function add(a: number, b: number): number {
  return a + b;
}

// 使用Effect.fn为函数添加可观测性仪器化
const addWithLogging = Effect.fn("add")(add).pipe(
  Effect.withSpan("add", { attributes: { "fn.name": "add" } })
);

// 在Effect工作流中使用仪器化函数
const program = Effect.gen(function* () {
  yield* Effect.logInfo("调用add函数");
  const sum = yield* addWithLogging(2, 3);
  yield* Effect.logInfo(`和为 ${sum}`);
  return sum;
});

// 运行程序
Effect.runPromise(program);

解释:

  • Effect.fn("name")(fn)包装函数,启用仪器化能力。
  • 你可以向函数边界添加追踪跨度、日志记录、指标和其他可观测性逻辑。
  • 保持仪器化与业务逻辑分离,完全可组合。
  • 包装的函数与Effect的可观测性和追踪基础设施无缝集成。

反模式:

将日志记录、指标或追踪逻辑直接散布在业务函数中,使代码难以测试、维护和组合。

原理:

使用Effect.fn包装函数调用,添加效果式逻辑,如日志记录、指标或追踪。 这使你能以组合、类型安全的方式观察、监控和调试函数边界。

仪器化函数调用对于可观测性至关重要,尤其是在复杂或关键代码路径中。 Effect.fn让你在函数调用之前、之后或周围添加效果式逻辑(日志记录、指标、追踪等),而不改变函数的核心逻辑。


利用Effect的内置结构化日志记录

规则: 使用Effect.log、Effect.logInfo和Effect.logError向Effect代码添加结构化、上下文感知的日志记录。

好例子:

import { Effect } from "effect";

// 记录简单消息
const program = Effect.gen(function* () {
  yield* Effect.log("启动应用程序");
});

// 在不同级别记录
const infoProgram = Effect.gen(function* () {
  yield* Effect.logInfo("用户登录");
});

const errorProgram = Effect.gen(function* () {
  yield* Effect.logError("连接数据库失败");
});

// 记录动态值
const userId = 42;
const logUserProgram = Effect.gen(function* () {
  yield* Effect.logInfo(`处理用户:${userId}`);
});

// 在工作流中使用日志记录
const workflow = Effect.gen(function* () {
  yield* Effect.log("开始工作流");
  // ... 做一些工作
  yield* Effect.logInfo("工作流步骤完成");
  // ... 处理错误
  yield* Effect.logError("出了点问题");
});

解释:

  • Effect.log以默认级别记录消息。
  • Effect.logInfoEffect.logError以特定级别记录。
  • 日志记录是上下文感知的,可以在Effect工作流的任何位置使用。

反模式:

使用console.log或临时日志记录散布在代码中,这些日志不结构化、不上下文感知,且在生产中难以管理。

原理:

使用Effect.logEffect.logInfoEffect.logError和相关函数向Effect代码添加结构化、上下文感知的日志记录。 这使你能以一致和可配置的方式捕获重要事件、错误和业务信息。

结构化日志记录使得在生产中搜索、过滤和分析日志更容易。 Effect的日志函数是上下文感知的,意味着它们自动包含相关元数据,并且可以全局配置。


向应用程序添加自定义指标

规则: 使用Metric.counter、Metric.gauge和Metric.histogram对代码进行仪器化以进行监控。

好例子:

此例子创建一个计数器来跟踪用户创建次数,以及一个直方图来跟踪数据库操作的持续时间。

import { Effect, Metric, Duration } from "effect"; // 我们不再需要MetricBoundaries

// 1. 定义你的指标
const userRegisteredCounter = Metric.counter("users_registered_total", {
  description: "已注册用户数量的计数器。",
});

const dbDurationTimer = Metric.timer(
  "db_operation_duration",
  "数据库操作持续时间的计时器"
);

// 2. 模拟数据库调用
const saveUserToDb = Effect.succeed("用户已保存").pipe(
  Effect.delay(Duration.millis(Math.random() * 100))
);

// 3. 仪器化业务逻辑
const createUser = Effect.gen(function* () {
  // 计时操作
  yield* saveUserToDb.pipe(Metric.trackDuration(dbDurationTimer));

  // 递增计数器
  yield* Metric.increment(userRegisteredCounter);

  return { status: "成功" };
});

// 运行Effect
const programWithLogging = Effect.gen(function* () {
  const result = yield* createUser;
  yield* Effect.log(`结果:${JSON.stringify(result)}`);
  return result;
});

Effect.runPromise(programWithLogging);

反模式:

不向应用程序添加任何指标。没有指标,你就是在盲目飞行。你没有应用程序健康、性能或业务KPI的高级概览。你无法构建仪表板,无法为异常行为设置警报(例如,“错误率太高”),并且被迫依赖挖掘日志来 理解系统状态。

原理:

要监控应用程序的健康和性能,使用Metric模块对代码进行仪器化。三种主要类型是:

  • Metric.counter("name"):计数事件发生次数(例如,users_registered_total)。只增不减。
  • Metric.gauge("name"):跟踪可以增减的值(例如,active_connections)。
  • Metric.histogram("name"):跟踪值的分布(例如,request_duration_seconds)。

日志用于事件,追踪用于请求,而指标用于聚合。它们提供系统健康的高级数值视图,非常适合构建仪表板和设置警报。

Effect的Metric模块提供了一种简单、声明式的方式来添加这种仪器化。通过预先定义指标,你可以使用Metric.incrementEffect.timed等操作符来更新它们。这与Effect的上下文系统完全集成,允许通过Layer提供不同的指标后端(如Prometheus或StatsD)。

这使你能回答如下问题:

  • “过去24小时我们的用户注册率是多少?”
  • “我们是否接近最大数据库连接数?”
  • “我们API请求的95百分位延迟是多少?”


向应用程序添加自定义指标

规则: 使用Effect的Metric模块定义和更新自定义指标,用于业务和性能监控。

好例子:

import { Effect, Metric } from "effect";

// 定义处理作业的计数器指标
const jobsProcessed = Metric.counter("jobs_processed");

// 处理作业时递增计数器
const processJob = Effect.gen(function* () {
  // ... 处理作业
  yield* Effect.log("作业已处理");
  yield* Metric.increment(jobsProcessed);
});

// 定义当前活跃用户的仪表
const activeUsers = Metric.gauge("active_users");

// 用户登录或退出时更新仪表
const userSignedIn = Metric.set(activeUsers, 1); // 设置为1(简化例子)
const userSignedOut = Metric.set(activeUsers, 0); // 设置为0(简化例子)

// 定义请求持续时间的直方图
const requestDuration = Metric.histogram("request_duration", [
  0.1, 0.5, 1, 2, 5,
] as any); // 边界,单位秒

// 记录请求持续时间
const recordDuration = (duration: number) =>
  Metric.update(requestDuration, duration);

解释:

  • Metric.counter跟踪事件计数。
  • Metric.gauge跟踪可以增减的值(例如,活跃用户)。
  • Metric.histogram跟踪分布(例如,请求持续时间)。
  • Effect.updateMetric在工作流中更新指标。

反模式:

仅依赖日志进行监控,或使用未与可观测性堆栈集成的临时计数器和变量。

原理:

使用Effect的Metric模块定义和更新自定义指标,如计数器、仪表和直方图。 这使你能以类型安全和组合的方式跟踪业务事件、性能指标和系统健康。

指标提供应用程序行为和性能的定量洞察。 通过对代码进行指标仪器化,你可以监控关键事件、检测异常并驱动业务决策。


使用跨度跨服务追踪操作

规则: 使用Effect.withSpan创建和注释追踪跨度以进行操作,启用分布式追踪和性能分析。

好例子:

import { Effect } from "effect";

// 使用自定义跨度追踪数据库查询
const fetchUser = Effect.sync(() => {
  // ...从数据库获取用户
  return { id: 1, name: "Alice" };
}).pipe(Effect.withSpan("db.fetchUser"));

// 追踪HTTP请求并添加额外属性
const fetchData = Effect.tryPromise({
  try: () => fetch("https://api.example.com/data").then((res) => res.json()),
  catch: (err) => `网络错误:${String(err)}`,
}).pipe(
  Effect.withSpan("http.fetchData", {
    attributes: { url: "https://api.example.com/data" },
  })
);

// 在工作流中使用跨度
const program = Effect.gen(function* () {
  yield* Effect.log("开始工作流").pipe(
    Effect.withSpan("workflow.start")
  );
  const user = yield* fetchUser;
  yield* Effect.log(`获取的用户:${user.name}`).pipe(
    Effect.withSpan("workflow.end")
  );
});

解释:

  • Effect.withSpan在操作周围创建追踪跨度。
  • 跨度可以命名并用属性注释以提供更丰富的上下文。
  • 追踪启用分布式可观测性和性能分析。

反模式:

仅依赖日志或指标进行性能分析,或缺乏对跨服务的请求和操作流的可见性。

原理:

使用Effect.withSpan在应用程序中的重要操作周围创建自定义追踪跨度。 这启用分布式追踪、性能分析和系统请求流的深入可见性。

追踪跨度帮助你理解操作的流和计时,特别是在分布式系统或复杂工作流中。 它们使你能精确定位瓶颈、可视化依赖关系,并将日志和指标与特定请求关联起来。


使用跨度跨服务追踪操作

规则: 使用Effect.withSpan为重要操作创建自定义追踪跨度。

好例子:

此例子显示一个多步操作。每一步和整个操作都用跨度包装。这在追踪中创建一个易于可视化的父子层次结构。

import { Effect, Duration } from "effect";

const validateInput = (input: unknown) =>
  Effect.gen(function* () {
    yield* Effect.logInfo("开始输入验证...");
    yield* Effect.sleep(Duration.millis(10));
    const result = { email: "paul@example.com" };
    yield* Effect.logInfo(`✅ 输入已验证:${result.email}`);
    return result;
  }).pipe(
    // 这创建一个子跨度
    Effect.withSpan("validateInput")
  );

const saveToDatabase = (user: { email: string }) =>
  Effect.gen(function* () {
    yield* Effect.logInfo(`保存用户到数据库:${user.email}`);
    yield* Effect.sleep(Duration.millis(50));
    const result = { id: 123, ...user };
    yield* Effect.logInfo(`✅ 用户已保存,ID:${result.id}`);
    return result;
  }).pipe(
    // 此跨度包含有用属性
    Effect.withSpan("saveToDatabase", {
      attributes: { "db.system": "postgresql", "db.user.email": user.email },
    })
  );

const createUser = (input: unknown) =>
  Effect.gen(function* () {
    yield* Effect.logInfo("=== 使用追踪创建用户 ===");
    yield* Effect.logInfo(
      "此示例展示跨度如何通过调用堆栈追踪操作"
    );

    const validated = yield* validateInput(input);
    const user = yield* saveToDatabase(validated);

    yield* Effect.logInfo(
      `✅ 用户创建完成:${JSON.stringify(user)}`
    );
    yield* Effect.logInfo(
      "注意:在生产环境中,跨度将发送到追踪系统,如Jaeger或Zipkin"
    );

    return user;
  }).pipe(
    // 这是整个操作的父跨度
    Effect.withSpan("createUserOperation")
  );

// 演示追踪功能
const program = Effect.gen(function* () {
  yield* Effect.logInfo("=== 使用跨度追踪操作演示 ===");

  // 创建多个用户以展示追踪作用
  const user1 = yield* createUser({ email: "user1@example.com" });

  yield* Effect.logInfo("
--- 创建第二个用户 ---");
  const user2 = yield* createUser({ email: "user2@example.com" });

  yield* Effect.logInfo("
=== 总结 ===");
  yield* Effect.logInfo("使用追踪跨度创建的用户:");
  yield* Effect.logInfo(`用户 1:ID ${user1.id}, 邮箱:${user1.email}`);
  yield* Effect.logInfo(`用户 2:ID ${user2.id}, 邮箱:${user2.email}`);
});

// 当使用追踪SDK运行时,这将产生带有根跨度
// "createUserOperation"和子跨度:"validateInput"和"saveToDatabase"的追踪。
Effect.runPromise(program);

反模式:

不向业务逻辑添加自定义跨度。 没有它们,你的追踪将只显示框架的高级信息(例如,“HTTP POST /users”)。 你将无法查看请求处理器内部各个步骤的性能,使得精确定位瓶颈非常困难。你的应用程序逻辑在追踪中仍然是一个“黑盒”。

原理:

要获得应用程序性能和流的可见性,使用Effect.withSpan("span-name")包装逻辑工作单元。你可以使用attributes选项向这些跨度添加上下文信息。


日志告诉你发生了什么,追踪告诉你为什么它很慢。在复杂应用程序中,单个用户请求可能触发对多个服务的调用(认证、数据库、外部API)。追踪让你将整个事件链可视化为单个、分层的“追踪”。

该追踪中的每个工作单元是一个跨度Effect.withSpan允许你创建自己的自定义跨度。这对于回答如下问题非常宝贵:

  • “对于此API请求,我们大部分时间花在数据库还是调用外部支付网关上?”
  • “我们用户创建逻辑的哪一部分是瓶颈?”

Effect的追踪基于行业标准OpenTelemetry构建,因此与Jaeger、Zipkin和Datadog等工具无缝集成。



🟠 高级模式

创建可观测性仪表板

规则: 创建专注于回答系统健康特定问题的仪表板。

原理:

设计仪表板以回答关于系统健康、性能和用户体验的特定问题。


好仪表板提供:

  1. 快速健康检查 - 一目了然地查看问题
  2. 趋势分析 - 发现逐渐退化
  3. 调试辅助 - 在事件期间关联指标
  4. 容量规划 - 预测资源需求


设置警报

规则: 基于SLO和症状创建警报,而不是原因。

好例子:

import { Effect, Metric, Schedule, Duration, Ref } from "effect"

// ============================================
// 1. 定义可警报条件
// ============================================

interface Alert {
  readonly name: string
  readonly severity: "critical" | "warning" | "info"
  readonly message: string
  readonly timestamp: Date
  readonly labels: Record<string, string>
}

interface AlertRule {
  readonly name: string
  readonly condition: Effect.Effect<boolean>
  readonly severity: "critical" | "warning" | "info"
  readonly message: string
  readonly labels: Record<string, string>
  readonly forDuration: Duration.DurationInput
}

// ============================================
// 2. 定义警报规则
// ============================================

const createAlertRules = (metrics: {
  errorRate: () => Effect.Effect<number>
  latencyP99: () => Effect.Effect<number>
  availability: () => Effect.Effect<number>
}): AlertRule[] => [
  {
    name: "HighErrorRate",
    condition: metrics.errorRate().pipe(Effect.map((rate) => rate > 0.01)),
    severity: "critical",
    message: "错误率超过1%",
    labels: { team: "backend", service: "api" },
    forDuration: "5分钟",
  },
  {
    name: "HighLatency",
    condition: metrics.latencyP99().pipe(Effect.map((p99) => p99 > 2)),
    severity: "warning",
    message: "P99延迟超过2秒",
    labels: { team: "backend", service: "api" },
    forDuration: "10分钟",
  },
  {
    name: "LowAvailability",
    condition: metrics.availability().pipe(Effect.map((avail) => avail < 99.9)),
    severity: "critical",
    message: "可用性低于99.9% SLO",
    labels: { team: "backend", service: "api" },
    forDuration: "5分钟",
  },
  {
    name: "ErrorBudgetLow",
    condition: Effect.succeed(false), // 基于错误预算计算实现
    severity: "warning",
    message: "错误预算低于25%",
    labels: { team: "backend", service: "api" },
    forDuration: "0秒",
  },
]

// ============================================
// 3. 警报管理器
// ============================================

interface AlertState {
  readonly firing: Map<string, { since: Date; alert: Alert }>
  readonly resolved: Alert[]
}

const makeAlertManager = Effect.gen(function* () {
  const state = yield* Ref.make<AlertState>({
    firing: new Map(),
    resolved: [],
  })

  const checkRule = (rule: AlertRule) =>
    Effect.gen(function* () {
      const isTriggered = yield* rule.condition

      yield* Ref.modify(state, (s) => {
        const firing = new Map(s.firing)
        const resolved = [...s.resolved]
        const key = rule.name

        if (isTriggered) {
          if (!firing.has(key)) {
            // 新警报
            firing.set(key, {
              since: new Date(),
              alert: {
                name: rule.name,
                severity: rule.severity,
                message: rule.message,
                timestamp: new Date(),
                labels: rule.labels,
              },
            })
          }
        } else {
          if (firing.has(key)) {
            // 警报已解决
            const prev = firing.get(key)!
            resolved.push({
              ...prev.alert,
              message: `[已解决] ${prev.alert.message}`,
              timestamp: new Date(),
            })
            firing.delete(key)
          }
        }

        return [undefined, { firing, resolved }]
      })
    })

  const getActiveAlerts = () =>
    Ref.get(state).pipe(
      Effect.map((s) => Array.from(s.firing.values()).map((f) => f.alert))
    )

  const getRecentResolved = () =>
    Ref.get(state).pipe(Effect.map((s) => s.resolved.slice(-10)))

  return {
    checkRule,
    getActiveAlerts,
    getRecentResolved,
  }
})

// ============================================
// 4. 警报通知
// ============================================

interface NotificationChannel {
  readonly send: (alert: Alert) => Effect.Effect<void>
}

const slackChannel: NotificationChannel = {
  send: (alert) =>
    Effect.gen(function* () {
      const emoji =
        alert.severity === "critical"
          ? "🔴"
          : alert.severity === "warning"
            ? "🟡"
            : "🔵"

      yield* Effect.log(`${emoji} [${alert.severity.toUpperCase()}] ${alert.name}`).pipe(
        Effect.annotateLogs({
          message: alert.message,
          labels: JSON.stringify(alert.labels),
        })
      )

      // 在实际实现中:调用Slack API
    }),
}

const pagerDutyChannel: NotificationChannel = {
  send: (alert) =>
    Effect.gen(function* () {
      if (alert.severity === "critical") {
        yield* Effect.log("PagerDuty:创建事件").pipe(
          Effect.annotateLogs({ alert: alert.name })
        )
        // 在实际实现中:调用PagerDuty API
      }
    }),
}

// ============================================
// 5. 警报评估循环
// ============================================

const runAlertEvaluation = (
  rules: AlertRule[],
  channels: NotificationChannel[],
  interval: Duration.DurationInput
) =>
  Effect.gen(function* () {
    const alertManager = yield* makeAlertManager
    const previousAlerts = yield* Ref.make(new Set<string>())

    yield* Effect.forever(
      Effect.gen(function* () {
        // 检查所有规则
        for (const rule of rules) {
          yield* alertManager.checkRule(rule)
        }

        // 获取当前活跃警报
        const active = yield* alertManager.getActiveAlerts()
        const current = new Set(active.map((a) => a.name))
        const previous = yield* Ref.get(previousAlerts)

        // 找到新触发的警报
        for (const alert of active) {
          if (!previous.has(alert.name)) {
            // 新警报 - 发送通知
            for (const channel of channels) {
              yield* channel.send(alert)
            }
          }
        }

        yield* Ref.set(previousAlerts, current)
        yield* Effect.sleep(interval)
      })
    )
  })

// ============================================
// 6. Prometheus警报规则(YAML)
// ============================================

const prometheusAlertRules = `
groups:
  - name: effect-app-alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(http_errors_total[5m]))
          /
          sum(rate(http_requests_total[5m]))
          > 0.01
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "检测到高错误率"
          description: "错误率为 {{ $value | humanizePercentage }}"

      - alert: HighLatency
        expr: |
          histogram_quantile(0.99,
            sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
          ) > 2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "高P99延迟"
          description: "P99延迟为 {{ $value }}s"

      - alert: SLOViolation
        expr: |
          sum(rate(http_requests_total{status!~"5.."}[30m]))
          /
          sum(rate(http_requests_total[30m]))
          < 0.999
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "SLO违规"
          description: "可用性为 {{ $value | humanizePercentage }}"
`

原理:

基于面向用户的症状(SLO违规)而不是系统指标(CPU使用率)设置警报。


好的警报:

  1. 捕捉真正问题 - 当用户受影响时警报
  2. 减少噪音 - 较少误报
  3. 启用响应 - 可操作信息
  4. 支持SLO - 跟踪服务级别目标


将指标导出到Prometheus

规则: 使用Effect指标并暴露一个/metrics端点用于Prometheus抓取。

好例子:

import { Effect, Metric, MetricLabel, Duration } from "effect"
import { HttpServerResponse } from "@effect/platform"

// ============================================
// 1. 定义应用程序指标
// ============================================

// 计数器 - 计数事件
const httpRequestsTotal = Metric.counter("http_requests_total", {
  description: "HTTP请求总数",
})

// 带标签的计数器
const httpRequestsByStatus = Metric.counter("http_requests_by_status", {
  description: "按状态码的HTTP请求",
})

// 仪表 - 当前值
const activeConnections = Metric.gauge("active_connections", {
  description: "活跃连接数",
})

// 直方图 - 值的分布
const requestDuration = Metric.histogram("http_request_duration_seconds", {
  description: "HTTP请求持续时间,单位秒",
  boundaries: [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
})

// 摘要 - 百分位数
const responseSizeBytes = Metric.summary("http_response_size_bytes", {
  description: "HTTP响应大小,单位字节",
  maxAge: Duration.minutes(5),
  maxSize: 100,
  quantiles: [0.5, 0.9, 0.99],
})

// ============================================
// 2. 用指标仪器化代码
// ============================================

const handleRequest = (path: string, status: number) =>
  Effect.gen(function* () {
    const startTime = Date.now()

    // 递增请求计数器
    yield* Metric.increment(httpRequestsTotal)

    // 带标签递增
    yield* Metric.increment(
      httpRequestsByStatus.pipe(
        Metric.tagged("status", String(status)),
        Metric.tagged("path", path)
      )
    )

    // 跟踪活跃连接
    yield* Metric.increment(activeConnections)

    // 模拟工作
    yield* Effect.sleep("100毫秒")

    // 记录持续时间
    const duration = (Date.now() - startTime) / 1000
    yield* Metric.update(requestDuration, duration)

    // 记录响应大小
    yield* Metric.update(responseSizeBytes, 1024)

    // 递减活跃连接
    yield* Metric.decrement(activeConnections)
  })

// ============================================
// 3. Prometheus文本格式导出器
// ============================================

interface MetricSnapshot {
  name: string
  type: "counter" | "gauge" | "histogram" | "summary"
  help: string
  values: Array<{
    labels: Record<string, string>
    value: number
  }>
  // 对于直方图
  buckets?: Array<{
    le: number
    count: number
    labels?: Record<string, string>
  }>
  sum?: number
  count?: number
}

const formatPrometheusMetrics = (metrics: MetricSnapshot[]): string => {
  const lines: string[] = []

  for (const metric of metrics) {
    // 帮助行
    lines.push(`# HELP ${metric.name} ${metric.help}`)
    lines.push(`# TYPE ${metric.name} ${metric.type}`)

    // 值
    for (const { labels, value } of metric.values) {
      const labelStr = Object.entries(labels)
        .map(([k, v]) => `${k}="${v}"`)
        .join(",")

      if (labelStr) {
        lines.push(`${metric.name}{${labelStr}} ${value}`)
      } else {
        lines.push(`${metric.name} ${value}`)
      }
    }

    // 直方图桶
    if (metric.buckets) {
      for (const bucket of metric.buckets) {
        const labelStr = Object.entries(bucket.labels || {})
          .map(([k, v]) => `${k}="${v}"`)
          .concat([`le="${bucket.le}"`])
          .join(",")
        lines.push(`${metric.name}_bucket{${labelStr}} ${bucket.count}`)
      }
      lines.push(`${metric.name}_sum ${metric.sum}`)
      lines.push(`${metric.name}_count ${metric.count}`)
    }

    lines.push("")
  }

  return lines.join("
")
}

// ============================================
// 4. /metrics端点处理器
// ============================================

const metricsHandler = Effect.gen(function* () {
  // 在实际实现中,从Effect的MetricRegistry读取
  const metrics: MetricSnapshot[] = [
    {
      name: "http_requests_total",
      type: "counter",
      help: "HTTP请求总数",
      values: [{ labels: {}, value: 1234 }],
    },
    {
      name: "http_requests_by_status",
      type: "counter",
      help: "按状态码的HTTP请求",
      values: [
        { labels: { status: "200", path: "/api/users" }, value: 1000 },
        { labels: { status: "404", path: "/api/users" }, value: 50 },
        { labels: { status: "500", path: "/api/users" }, value: 10 },
      ],
    },
    {
      name: "active_connections",
      type: "gauge",
      help: "活跃连接数",
      values: [{ labels: {}, value: 42 }],
    },
    {
      name: "http_request_duration_seconds",
      type: "histogram",
      help: "HTTP请求持续时间,单位秒",
      values: [],
      buckets: [
        { le: 0.01, count: 100 },
        { le: 0.05, count: 500 },
        { le: 0.1, count: 800 },
        { le: 0.25, count: 950 },
        { le: 0.5, count: 990 },
        { le: 1, count: 999 },
        { le: Infinity, count: 1000 },
      ],
      sum: 123.456,
      count: 1000,
    },
  ]

  const body = formatPrometheusMetrics(metrics)

  return HttpServerResponse.text(body, {
    headers: {
      "Content-Type": "text/plain; version=0.0.4; charset=utf-8",
    },
  })
})

// ============================================
// 5. 示例输出
// ============================================

/*
# HELP http_requests_total HTTP请求总数
# TYPE http_requests_total counter
http_requests_total 1234

# HELP http_requests_by_status 按状态码的HTTP请求
# TYPE http_requests_by_status counter
http_requests_by_status{status="200",path="/api/users"} 1000
http_requests_by_status{status="404",path="/api/users"} 50
http_requests_by_status{status="500",path="/api/users"} 10

# HELP active_connections 活跃连接数
# TYPE active_connections gauge
active_connections 42

# HELP http_request_duration_seconds HTTP请求持续时间,单位秒
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{le="0.01"} 100
http_request_duration_seconds_bucket{le="0.05"} 500
http_request_duration_seconds_bucket{le="0.1"} 800
http_request_duration_seconds_bucket{le="+Inf"} 1000
http_request_duration_seconds_sum 123.456
http_request_duration_seconds_count 1000
*/

原理:

使用Effect的Metric API创建指标,并通过HTTP端点以Prometheus文本格式暴露它们。


Prometheus指标启用:

  1. 实时监控 - 查看当前情况
  2. 历史分析 - 随时间跟踪趋势
  3. 警报 - 获取问题通知
  4. 仪表板 - 可视化系统健康


实现分布式追踪

规则: 跨服务边界传播追踪上下文以关联请求。

好例子:

import { Effect, Context, Layer } from "effect"
import { HttpClient, HttpClientRequest, HttpServerRequest, HttpServerResponse } from "@effect/platform"

// ============================================
// 1. 定义追踪上下文
// ============================================

interface TraceContext {
  readonly traceId: string
  readonly spanId: string
  readonly parentSpanId?: string
  readonly sampled: boolean
}

class CurrentTrace extends Context.Tag("CurrentTrace")<
  CurrentTrace,
  TraceContext
>() {}

// W3C Trace Context 头名称
const TRACEPARENT_HEADER = "traceparent"
const TRACESTATE_HEADER = "tracestate"

// ============================================
// 2. 生成追踪ID
// ============================================

const generateTraceId = (): string =>
  Array.from(crypto.getRandomValues(new Uint8Array(16)))
    .map((b) => b.toString(16).padStart(2, "0"))
    .join("")

const generateSpanId = (): string =>
  Array.from(crypto.getRandomValues(new Uint8Array(8)))
    .map((b) => b.toString(16).padStart(2, "0"))
    .join("")

// ============================================
// 3. 解析和格式化追踪上下文
// ============================================

const parseTraceparent = (header: string): TraceContext | null => {
  // 格式:00-traceId-spanId-flags
  const parts = header.split("-")
  if (parts.length !== 4) return null

  return {
    traceId: parts[1],
    spanId: generateSpanId(),  // 此服务的新跨度
    parentSpanId: parts[2],
    sampled: parts[3] === "01",
  }
}

const formatTraceparent = (ctx: TraceContext): string =>
  `00-${ctx.traceId}-${ctx.spanId}-${ctx.sampled ? "01" : "00"}`

// ============================================
// 4. 从传入请求提取追踪
// ============================================

const extractTraceContext = Effect.gen(function* () {
  const request = yield* HttpServerRequest.HttpServerRequest

  const traceparent = request.headers[TRACEPARENT_HEADER]

  if (traceparent) {
    const parsed = parseTraceparent(traceparent)
    if (parsed) {
      yield* Effect.log("提取的追踪上下文").pipe(
        Effect.annotateLogs({
          traceId: parsed.traceId,
          parentSpanId: parsed.parentSpanId,
        })
      )
      return parsed
    }
  }

  // 无传入追踪 - 启动新的
  const newTrace: TraceContext = {
    traceId: generateTraceId(),
    spanId: generateSpanId(),
    sampled: Math.random() < 0.1,  // 10%采样
  }

  yield* Effect.log("启动新追踪").pipe(
    Effect.annotateLogs({ traceId: newTrace.traceId })
  )

  return newTrace
})

// ============================================
// 5. 传播追踪到传出请求
// ============================================

const makeTracedHttpClient = Effect.gen(function* () {
  const baseClient = yield* HttpClient.HttpClient
  const trace = yield* CurrentTrace

  return {
    get: (url: string) =>
      Effect.gen(function* () {
        // 为传出请求创建子跨度
        const childSpan: TraceContext = {
          traceId: trace.traceId,
          spanId: generateSpanId(),
          parentSpanId: trace.spanId,
          sampled: trace.sampled,
        }

        yield* Effect.log("进行追踪的HTTP请求").pipe(
          Effect.annotateLogs({
            traceId: childSpan.traceId,
            spanId: childSpan.spanId,
            url,
          })
        )

        const request = HttpClientRequest.get(url).pipe(
          HttpClientRequest.setHeader(
            TRACEPARENT_HEADER,
            formatTraceparent(childSpan)
          )
        )

        return yield* baseClient.execute(request)
      }),
  }
})

// ============================================
// 6. HTTP服务器的追踪中间件
// ============================================

const withTracing = <A, E, R>(
  handler: Effect.Effect<A, E, R | CurrentTrace>
): Effect.Effect<A, E, R | HttpServerRequest.HttpServerRequest> =>
  Effect.gen(function* () {
    const traceContext = yield* extractTraceContext

    return yield* handler.pipe(
      Effect.provideService(CurrentTrace, traceContext),
      Effect.withLogSpan(`request-${traceContext.spanId}`),
      Effect.annotateLogs({
        "trace.id": traceContext.traceId,
        "span.id": traceContext.spanId,
        "parent.span.id": traceContext.parentSpanId ?? "none",
      })
    )
  })

// ============================================
// 7. 示例:服务A调用服务B
// ============================================

// 服务B处理器
const serviceBHandler = withTracing(
  Effect.gen(function* () {
    const trace = yield* CurrentTrace
    yield* Effect.log("服务B处理请求")

    // 模拟工作
    yield* Effect.sleep("50毫秒")

    return HttpServerResponse.json({
      message: "来自服务B的问候",
      traceId: trace.traceId,
    })
  })
)

// 服务A处理器(调用服务B)
const serviceAHandler = withTracing(
  Effect.gen(function* () {
    const trace = yield* CurrentTrace
    yield* Effect.log("服务A处理请求")

    // 用追踪传播调用服务B
    const tracedClient = yield* makeTracedHttpClient
    const response = yield* tracedClient.get("http://service-b/api/data")

    yield* Effect.log("服务A从B收到响应")

    return HttpServerResponse.json({
      message: "来自服务A的问候",
      traceId: trace.traceId,
    })
  })
)

// ============================================
// 8. 运行和观察
// ============================================

const program = Effect.gen(function* () {
  yield* Effect.log("=== 分布式追踪演示 ===")

  // 模拟带追踪的传入请求
  const incomingTrace: TraceContext = {
    traceId: generateTraceId(),
    spanId: generateSpanId(),
    sampled: true,
  }

  yield* Effect.log("处理追踪的请求").pipe(
    Effect.provideService(CurrentTrace, incomingTrace),
    Effect.annotateLogs({
      "trace.id": incomingTrace.traceId,
      "span.id": incomingTrace.spanId,
    })
  )
})

Effect.runPromise(program)

原理:

通过HTTP头传播追踪上下文,并在跨服务中使用一致的跨度命名来实现分布式追踪。


分布式追踪显示完整的请求旅程:

  1. 端到端可见性 - 查看整个请求流
  2. 延迟分析 - 找到慢服务
  3. 错误关联 - 跨服务链接错误
  4. 依赖映射 - 理解服务关系


集成Effect追踪与OpenTelemetry

规则: 将Effect.withSpan与OpenTelemetry集成,以导出追踪并可视化跨服务的请求流。

好例子:

import { Effect } from "effect";
// 伪代码:用实际OpenTelemetry集成替换为你的堆栈
import { trace, context, SpanStatusCode } from "@opentelemetry/api";

// 包装Effect.withSpan以导出到OpenTelemetry
function withOtelSpan<T>(
  name: string,
  effect: Effect.Effect<unknown, T, unknown>
) {
  return Effect.gen(function* () {
    const otelSpan = trace.getTracer("default").startSpan(name);
    try {
      const result = yield* effect;
      otelSpan.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (err) {
      otelSpan.setStatus({ code: SpanStatusCode.ERROR, message: String(err) });
      throw err;
    } finally {
      otelSpan.end();
    }
  });
}

// 使用
const program = withOtelSpan(
  "fetchUser",
  Effect.sync(() => {
    // ...获取用户逻辑
    return { id: 1, name: "Alice" };
  })
);

解释:

  • 进入效果式操作时启动OpenTelemetry跨度。
  • 根据需要设置状态和属性。
  • 操作完成或失败时结束跨度。
  • 这启用全分布式追踪和可观测性平台中的可视化。

反模式:

使用Effect.withSpan而不导出到OpenTelemetry,或缺乏分布式追踪,这会限制你诊断和可视化复杂请求流的能力。

原理:

将Effect的追踪跨度连接到OpenTelemetry,以启用分布式追踪、可视化以及跨整个堆栈的关联。

OpenTelemetry是分布式追踪的行业标准。 通过将Effect的跨度与OpenTelemetry集成,你可以深入了解请求流、性能瓶颈和依赖关系——跨所有服务和基础设施。