Effect-TSHTTP请求模式Skill effect-patterns-making-http-requests

这个技能提供了10个精心策划的Effect-TS模式,用于在TypeScript应用中安全、可靠地处理HTTP请求。包括响应验证、请求重试、日志记录、缓存、超时处理、依赖建模、测试客户端构建、速率限制处理和HTTP服务器构建。关键词:Effect-TS, HTTP请求, TypeScript, 模式, 后端开发, 软件架构。

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

名称: effect-patterns-making-http-requests 描述: Effect-TS HTTP请求模式。在Effect-TS应用中处理HTTP请求时使用。

Effect-TS 模式:HTTP请求

这个技能提供了10个精心策划的Effect-TS模式,用于进行HTTP请求。 在涉及以下任务时使用此技能:

  • 进行HTTP请求
  • Effect-TS应用中的最佳实践
  • 实际模式和解决方案

🟢 初学者模式

安全解析JSON响应

规则: 始终使用Schema验证HTTP响应,以捕获运行时的API变化。

好例子:

import { Effect, Console } from "effect"
import { Schema } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse } from "@effect/platform"
import { NodeHttpClient, NodeRuntime } from "@effect/platform-node"

// ============================================
// 1. 定义响应模式
// ============================================

const PostSchema = Schema.Struct({
  id: Schema.Number,
  title: Schema.String,
  body: Schema.String,
  userId: Schema.Number,
})

type Post = Schema.Schema.Type<typeof PostSchema>

const PostArraySchema = Schema.Array(PostSchema)

// ============================================
// 2. 获取并验证单个项目
// ============================================

const getPost = (id: number) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    const response = yield* client.get(
      `https://jsonplaceholder.typicode.com/posts/${id}`
    )
    const json = yield* HttpClientResponse.json(response)

    // 验证模式 - 如果数据不匹配则失败
    const post = yield* Schema.decodeUnknown(PostSchema)(json)

    return post
  })

// ============================================
// 3. 获取并验证数组
// ============================================

const getPosts = Effect.gen(function* () {
  const client = yield* HttpClient.HttpClient

  const response = yield* client.get(
    "https://jsonplaceholder.typicode.com/posts"
  )
  const json = yield* HttpClientResponse.json(response)

  // 验证帖子数组
  const posts = yield* Schema.decodeUnknown(PostArraySchema)(json)

  return posts
})

// ============================================
// 4. 处理验证错误
// ============================================

const safeGetPost = (id: number) =>
  getPost(id).pipe(
    Effect.catchTag("ParseError", (error) =>
      Effect.gen(function* () {
        yield* Console.error(`无效的响应格式:${error.message}`)
        // 返回默认值或以不同方式失败
        return yield* Effect.fail(new Error(`帖子${id}格式无效`))
      })
    )
  )

// ============================================
// 5. 具有可选字段的模式
// ============================================

const UserSchema = Schema.Struct({
  id: Schema.Number,
  name: Schema.String,
  email: Schema.String,
  phone: Schema.optional(Schema.String),        // 可能不存在
  website: Schema.optional(Schema.String),
  company: Schema.optional(
    Schema.Struct({
      name: Schema.String,
      catchPhrase: Schema.optional(Schema.String),
    })
  ),
})

const getUser = (id: number) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    const response = yield* client.get(
      `https://jsonplaceholder.typicode.com/users/${id}`
    )
    const json = yield* HttpClientResponse.json(response)

    return yield* Schema.decodeUnknown(UserSchema)(json)
  })

// ============================================
// 6. 运行示例
// ============================================

const program = Effect.gen(function* () {
  yield* Console.log("=== 已验证的单个帖子 ===")
  const post = yield* getPost(1)
  yield* Console.log(`标题:${post.title}`)

  yield* Console.log("
=== 已验证的帖子数组 ===")
  const posts = yield* getPosts
  yield* Console.log(`已获取${posts.length}个帖子`)

  yield* Console.log("
=== 具有可选字段的用户 ===")
  const user = yield* getUser(1)
  yield* Console.log(`用户:${user.name}`)
  yield* Console.log(`公司:${user.company?.name ?? "N/A"}`)
})

program.pipe(
  Effect.provide(NodeHttpClient.layer),
  NodeRuntime.runMain
)

原理:

使用Effect Schema验证HTTP JSON响应,确保数据在运行时匹配您的预期类型。


API可能无警告地更改:

  1. 字段消失 - 后端移除字段
  2. 类型更改 - 字符串变为数字
  3. 空值出现 - 必需字段变为可选
  4. 新字段 - 您未期望的额外数据

Schema验证立即捕获这些问题。



您的第一个HTTP请求

规则: 使用@effect/platform HttpClient进行类型安全的HTTP请求,具有自动错误处理。

好例子:

import { Effect, Console } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse } from "@effect/platform"
import { NodeHttpClient, NodeRuntime } from "@effect/platform-node"

// ============================================
// 1. 简单GET请求
// ============================================

const simpleGet = Effect.gen(function* () {
  const client = yield* HttpClient.HttpClient
  
  // 进行GET请求
  const response = yield* client.get("https://jsonplaceholder.typicode.com/posts/1")
  
  // 以JSON形式获取响应
  const json = yield* HttpClientResponse.json(response)
  
  return json
})

// ============================================
// 2. 具有类型化响应的GET
// ============================================

interface Post {
  id: number
  title: string
  body: string
  userId: number
}

const getPost = (id: number) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient
    const response = yield* client.get(
      `https://jsonplaceholder.typicode.com/posts/${id}`
    )
    const post = yield* HttpClientResponse.json(response) as Effect.Effect<Post>
    return post
  })

// ============================================
// 3. 带有正文的POST
// ============================================

const createPost = (title: string, body: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient
    
    const request = HttpClientRequest.post(
      "https://jsonplaceholder.typicode.com/posts"
    ).pipe(
      HttpClientRequest.jsonBody({ title, body, userId: 1 })
    )
    
    const response = yield* client.execute(yield* request)
    const created = yield* HttpClientResponse.json(response)
    
    return created
  })

// ============================================
// 4. 处理错误
// ============================================

const safeGetPost = (id: number) =>
  getPost(id).pipe(
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Console.error(`无法获取帖子${id}:${error}`)
        return { id, title: "不可用", body: "", userId: 0 }
      })
    )
  )

// ============================================
// 5. 运行程序
// ============================================

const program = Effect.gen(function* () {
  yield* Console.log("=== 简单GET ===")
  const data = yield* simpleGet
  yield* Console.log(JSON.stringify(data, null, 2))

  yield* Console.log("
=== 类型化GET ===")
  const post = yield* getPost(1)
  yield* Console.log(`帖子:${post.title}`)

  yield* Console.log("
=== POST请求 ===")
  const created = yield* createPost("我的新帖子", "这是正文")
  yield* Console.log(`已创建:${JSON.stringify(created)}`)
})

// 提供HTTP客户端实现
program.pipe(
  Effect.provide(NodeHttpClient.layer),
  NodeRuntime.runMain
)

原理:

使用Effect的HttpClient@effect/platform进行HTTP请求,具有内置错误处理、重试和类型安全性。


Effect的HttpClient比fetch更好:

  1. 类型安全错误 - 网络失败是类型化的,不是异常
  2. 自动JSON解析 - 无需手动.json()调用
  3. 可组合 - 链接请求,添加重试、超时
  4. 可测试 - 易于在测试中模拟


🟡 中级模式

使用退避重试HTTP请求

规则: 使用Schedule重试失败的HTTP请求,具有可配置的退避策略。

好例子:

import { Effect, Schedule, Duration, Data } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse, HttpClientError } from "@effect/platform"

// ============================================
// 1. 基本指数退避重试
// ============================================

const fetchWithRetry = (url: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    return yield* client.get(url).pipe(
      Effect.flatMap((response) => HttpClientResponse.json(response)),
      Effect.retry(
        Schedule.exponential("100 millis", 2).pipe(
          Schedule.intersect(Schedule.recurs(5)),     // 最多5次重试
          Schedule.jittered                            // 添加随机性
        )
      )
    )
  })

// ============================================
// 2. 仅重试特定状态码
// ============================================

class RetryableHttpError extends Data.TaggedError("RetryableHttpError")<{
  readonly status: number
  readonly message: string
}> {}

class NonRetryableHttpError extends Data.TaggedError("NonRetryableHttpError")<{
  readonly status: number
  readonly message: string
}> {}

const isRetryable = (status: number): boolean =>
  status === 429 ||    // 速率限制
  status === 503 ||    // 服务不可用
  status === 502 ||    // 坏网关
  status === 504 ||    // 网关超时
  status >= 500        // 服务器错误

const fetchWithSelectiveRetry = (url: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    const response = yield* client.get(url).pipe(
      Effect.flatMap((response) => {
        if (response.status >= 400) {
          if (isRetryable(response.status)) {
            return Effect.fail(new RetryableHttpError({
              status: response.status,
              message: `HTTP ${response.status}`,
            }))
          }
          return Effect.fail(new NonRetryableHttpError({
            status: response.status,
            message: `HTTP ${response.status}`,
          }))
        }
        return Effect.succeed(response)
      }),
      Effect.retry({
        schedule: Schedule.exponential("200 millis").pipe(
          Schedule.intersect(Schedule.recurs(3))
        ),
        while: (error) => error._tag === "RetryableHttpError",
      })
    )

    return yield* HttpClientResponse.json(response)
  })

// ============================================
// 3. 重试并记录日志
// ============================================

const fetchWithRetryLogging = (url: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    return yield* client.get(url).pipe(
      Effect.flatMap((r) => HttpClientResponse.json(r)),
      Effect.retry(
        Schedule.exponential("100 millis").pipe(
          Schedule.intersect(Schedule.recurs(3)),
          Schedule.tapOutput((_, output) =>
            Effect.log(`重试尝试,等待${Duration.toMillis(output)}ms`)
          )
        )
      ),
      Effect.tapError((error) => Effect.log(`请求失败:${error}`))
    )
  })

// ============================================
// 4. 自定义重试策略
// ============================================

const customRetryPolicy = Schedule.exponential("500 millis", 2).pipe(
  Schedule.intersect(Schedule.recurs(5)),
  Schedule.union(Schedule.spaced("30 seconds")),  // 30秒后也重试
  Schedule.whileOutput((duration) => Duration.lessThanOrEqualTo(duration, "2 minutes")),
  Schedule.jittered
)

// ============================================
// 5. 尊重Retry-After头的重试
// ============================================

const fetchWithRetryAfter = (url: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    const makeRequest = client.get(url).pipe(
      Effect.flatMap((response) => {
        if (response.status === 429) {
          const retryAfter = response.headers["retry-after"]
          const delay = retryAfter ? parseInt(retryAfter, 10) * 1000 : 1000

          return Effect.fail({
            _tag: "RateLimited" as const,
            delay,
          })
        }
        return Effect.succeed(response)
      })
    )

    return yield* makeRequest.pipe(
      Effect.retry(
        Schedule.recurWhile<{ _tag: "RateLimited"; delay: number }>(
          (error) => error._tag === "RateLimited"
        ).pipe(
          Schedule.intersect(Schedule.recurs(3)),
          Schedule.delayed((_, error) => Duration.millis(error.delay))
        )
      ),
      Effect.flatMap((r) => HttpClientResponse.json(r))
    )
  })

// ============================================
// 6. 用法
// ============================================

const program = Effect.gen(function* () {
  yield* Effect.log("使用重试获取中...")

  const data = yield* fetchWithRetry("https://api.example.com/data").pipe(
    Effect.catchAll((error) => {
      return Effect.succeed({ error: "所有重试用尽" })
    })
  )

  yield* Effect.log(`结果:${JSON.stringify(data)}`)
})

原理:

使用Effect的retrySchedule自动重试失败的HTTP请求,具有指数退避和抖动。


HTTP请求因暂时原因失败:

  1. 网络问题 - 临时连接问题
  2. 服务器过载 - 503服务不可用
  3. 速率限制 - 429请求过多
  4. 超时 - 慢响应

适当的重试逻辑优雅地处理这些。



记录HTTP请求和响应

规则: 使用Effect的日志记录跟踪HTTP请求以进行调试和监控。

好例子:

import { Effect, Duration } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse } from "@effect/platform"

// ============================================
// 1. 简单请求/响应日志记录
// ============================================

const withLogging = <A, E>(
  request: Effect.Effect<A, E, HttpClient.HttpClient>
): Effect.Effect<A, E, HttpClient.HttpClient> =>
  Effect.gen(function* () {
    const startTime = Date.now()
    yield* Effect.log("→ HTTP请求开始...")

    const result = yield* request

    const duration = Date.now() - startTime
    yield* Effect.log(`← HTTP响应接收(${duration}ms)`)

    return result
  })

// ============================================
// 2. 详细请求日志记录
// ============================================

interface RequestLog {
  method: string
  url: string
  headers: Record<string, string>
  body?: unknown
}

interface ResponseLog {
  status: number
  headers: Record<string, string>
  duration: number
  size?: number
}

const makeLoggingClient = Effect.gen(function* () {
  const baseClient = yield* HttpClient.HttpClient

  const logRequest = (method: string, url: string, headers: Record<string, string>) =>
    Effect.log("HTTP请求").pipe(
      Effect.annotateLogs({
        method,
        url,
        headers: JSON.stringify(headers),
      })
    )

  const logResponse = (status: number, duration: number, headers: Record<string, string>) =>
    Effect.log("HTTP响应").pipe(
      Effect.annotateLogs({
        status: String(status),
        duration: `${duration}ms`,
        headers: JSON.stringify(headers),
      })
    )

  return {
    get: <T>(url: string, options?: { headers?: Record<string, string> }) =>
      Effect.gen(function* () {
        const headers = options?.headers ?? {}
        yield* logRequest("GET", url, headers)
        const startTime = Date.now()

        const response = yield* baseClient.get(url)

        yield* logResponse(
          response.status,
          Date.now() - startTime,
          response.headers
        )

        return yield* HttpClientResponse.json(response) as Effect.Effect<T>
      }),

    post: <T>(url: string, body: unknown, options?: { headers?: Record<string, string> }) =>
      Effect.gen(function* () {
        const headers = options?.headers ?? {}
        yield* logRequest("POST", url, headers).pipe(
          Effect.annotateLogs("body", JSON.stringify(body).slice(0, 200))
        )
        const startTime = Date.now()

        const request = yield* HttpClientRequest.post(url).pipe(
          HttpClientRequest.jsonBody(body)
        )
        const response = yield* baseClient.execute(request)

        yield* logResponse(
          response.status,
          Date.now() - startTime,
          response.headers
        )

        return yield* HttpClientResponse.json(response) as Effect.Effect<T>
      }),
  }
})

// ============================================
// 3. 使用范围进行计时日志记录
// ============================================

const fetchWithSpan = (url: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    return yield* client.get(url).pipe(
      Effect.flatMap((r) => HttpClientResponse.json(r)),
      Effect.withLogSpan(`HTTP GET ${url}`)
    )
  })

// ============================================
// 4. 条件日志记录(调试模式)
// ============================================

const makeConditionalLoggingClient = (debug: boolean) =>
  Effect.gen(function* () {
    const baseClient = yield* HttpClient.HttpClient

    const maybeLog = (message: string, data?: Record<string, unknown>) =>
      debug
        ? Effect.log(message).pipe(
            data ? Effect.annotateLogs(data) : (e) => e
          )
        : Effect.void

    return {
      get: <T>(url: string) =>
        Effect.gen(function* () {
          yield* maybeLog("HTTP请求", { method: "GET", url })
          const startTime = Date.now()

          const response = yield* baseClient.get(url)

          yield* maybeLog("HTTP响应", {
            status: String(response.status),
            duration: `${Date.now() - startTime}ms`,
          })

          return yield* HttpClientResponse.json(response) as Effect.Effect<T>
        }),
    }
  })

// ============================================
// 5. 请求ID跟踪
// ============================================

const makeTrackedClient = Effect.gen(function* () {
  const baseClient = yield* HttpClient.HttpClient

  return {
    get: <T>(url: string) =>
      Effect.gen(function* () {
        const requestId = crypto.randomUUID().slice(0, 8)

        yield* Effect.log("HTTP请求").pipe(
          Effect.annotateLogs({
            requestId,
            method: "GET",
            url,
          })
        )

        const startTime = Date.now()
        const response = yield* baseClient.get(url)

        yield* Effect.log("HTTP响应").pipe(
          Effect.annotateLogs({
            requestId,
            status: String(response.status),
            duration: `${Date.now() - startTime}ms`,
          })
        )

        return yield* HttpClientResponse.json(response) as Effect.Effect<T>
      })
  }
})

// ============================================
// 6. 错误日志记录
// ============================================

const fetchWithErrorLogging = (url: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    return yield* client.get(url).pipe(
      Effect.flatMap((response) => {
        if (response.status >= 400) {
          return Effect.gen(function* () {
            yield* Effect.logError("HTTP错误").pipe(
              Effect.annotateLogs({
                url,
                status: String(response.status),
              })
            )
            return yield* Effect.fail(new Error(`HTTP ${response.status}`))
          })
        }
        return Effect.succeed(response)
      }),
      Effect.flatMap((r) => HttpClientResponse.json(r)),
      Effect.tapError((error) =>
        Effect.logError("请求失败").pipe(
          Effect.annotateLogs({
            url,
            error: String(error),
          })
        )
      )
    )
  })

// ============================================
// 7. 用法
// ============================================

const program = Effect.gen(function* () {
  const client = yield* makeLoggingClient

  yield* Effect.log("开始HTTP操作...")

  const data = yield* client.get("https://api.example.com/users")

  yield* Effect.log("操作完成")
})

原理:

用日志记录中间件包装HTTP客户端,以捕获请求详情、响应信息和计时,用于调试和可观察性。


HTTP日志记录有助于:

  1. 调试 - 查看发送/接收的内容
  2. 性能 - 跟踪慢请求
  3. 审计 - 记录API使用
  4. 故障排除 - 诊断生产问题


缓存HTTP响应

规则: 使用内存或持久缓存存储HTTP响应。

好例子:

import { Effect, Ref, HashMap, Option, Duration } from "effect"
import { HttpClient, HttpClientResponse } from "@effect/platform"

// ============================================
// 1. 简单内存缓存
// ============================================

interface CacheEntry<T> {
  readonly data: T
  readonly timestamp: number
  readonly ttl: number
}

const makeCache = <T>() =>
  Effect.gen(function* () {
    const store = yield* Ref.make(HashMap.empty<string, CacheEntry<T>>())

    const get = (key: string): Effect.Effect<Option.Option<T>> =>
      Ref.get(store).pipe(
        Effect.map((map) => {
          const entry = HashMap.get(map, key)
          if (entry._tag === "None") return Option.none()

          const now = Date.now()
          if (now > entry.value.timestamp + entry.value.ttl) {
            return Option.none()  // 已过期
          }
          return Option.some(entry.value.data)
        })
      )

    const set = (key: string, data: T, ttl: number): Effect.Effect<void> =>
      Ref.update(store, (map) =>
        HashMap.set(map, key, {
          data,
          timestamp: Date.now(),
          ttl,
        })
      )

    const invalidate = (key: string): Effect.Effect<void> =>
      Ref.update(store, (map) => HashMap.remove(map, key))

    const clear = (): Effect.Effect<void> =>
      Ref.set(store, HashMap.empty())

    return { get, set, invalidate, clear }
  })

// ============================================
// 2. 缓存HTTP客户端
// ============================================

interface CachedHttpClient {
  readonly get: <T>(
    url: string,
    options?: { ttl?: Duration.DurationInput }
  ) => Effect.Effect<T, Error>
  readonly invalidate: (url: string) => Effect.Effect<void>
}

const makeCachedHttpClient = Effect.gen(function* () {
  const httpClient = yield* HttpClient.HttpClient
  const cache = yield* makeCache<unknown>()

  const client: CachedHttpClient = {
    get: <T>(url: string, options?: { ttl?: Duration.DurationInput }) => {
      const ttl = options?.ttl ? Duration.toMillis(Duration.decode(options.ttl)) : 60000

      return Effect.gen(function* () {
        // 先检查缓存
        const cached = yield* cache.get(url)
        if (Option.isSome(cached)) {
          yield* Effect.log(`缓存命中:${url}`)
          return cached.value as T
        }

        yield* Effect.log(`缓存未命中:${url}`)

        // 从网络获取
        const response = yield* httpClient.get(url)
        const data = yield* HttpClientResponse.json(response) as Effect.Effect<T>

        // 存储在缓存中
        yield* cache.set(url, data, ttl)

        return data
      })
    },

    invalidate: (url) => cache.invalidate(url),
  }

  return client
})

// ============================================
// 3. 陈旧-同时重新验证模式
// ============================================

interface SWRCache<T> {
  readonly data: T
  readonly timestamp: number
  readonly staleAfter: number
  readonly expireAfter: number
}

const makeSWRClient = Effect.gen(function* () {
  const httpClient = yield* HttpClient.HttpClient
  const cache = yield* Ref.make(HashMap.empty<string, SWRCache<unknown>>())

  return {
    get: <T>(
      url: string,
      options: {
        staleAfter: Duration.DurationInput
        expireAfter: Duration.DurationInput
      }
    ) =>
      Effect.gen(function* () {
        const now = Date.now()
        const staleMs = Duration.toMillis(Duration.decode(options.staleAfter))
        const expireMs = Duration.toMillis(Duration.decode(options.expireAfter))

        const cached = yield* Ref.get(cache).pipe(
          Effect.map((map) => HashMap.get(map, url))
        )

        if (cached._tag === "Some") {
          const entry = cached.value
          const age = now - entry.timestamp

          if (age < staleMs) {
            // 新鲜 - 立即返回
            return entry.data as T
          }

          if (age < expireMs) {
            // 陈旧 - 返回缓存,后台重新验证
            yield* Effect.fork(
              httpClient.get(url).pipe(
                Effect.flatMap((r) => HttpClientResponse.json(r)),
                Effect.flatMap((data) =>
                  Ref.update(cache, (map) =>
                    HashMap.set(map, url, {
                      data,
                      timestamp: Date.now(),
                      staleAfter: staleMs,
                      expireAfter: expireMs,
                    })
                  )
                ),
                Effect.catchAll(() => Effect.void)  // 忽略错误
              )
            )
            return entry.data as T
          }
        }

        // 已过期或缺失 - 获取新鲜数据
        const response = yield* httpClient.get(url)
        const data = yield* HttpClientResponse.json(response) as Effect.Effect<T>

        yield* Ref.update(cache, (map) =>
          HashMap.set(map, url, {
            data,
            timestamp: now,
            staleAfter: staleMs,
            expireAfter: expireMs,
          })
        )

        return data
      }),
  }
})

// ============================================
// 4. 具有请求去重化的缓存
// ============================================

const makeDeduplicatedClient = Effect.gen(function* () {
  const httpClient = yield* HttpClient.HttpClient
  const inFlight = yield* Ref.make(HashMap.empty<string, Effect.Effect<unknown>>())
  const cache = yield* makeCache<unknown>()

  return {
    get: <T>(url: string, ttl: number = 60000) =>
      Effect.gen(function* () {
        // 检查缓存
        const cached = yield* cache.get(url)
        if (Option.isSome(cached)) {
          return cached.value as T
        }

        // 检查是否已有请求在飞行中
        const pending = yield* Ref.get(inFlight).pipe(
          Effect.map((map) => HashMap.get(map, url))
        )

        if (pending._tag === "Some") {
          yield* Effect.log(`去重请求:${url}`)
          return (yield* pending.value) as T
        }

        // 进行请求
        const request = httpClient.get(url).pipe(
          Effect.flatMap((r) => HttpClientResponse.json(r)),
          Effect.tap((data) => cache.set(url, data, ttl)),
          Effect.ensuring(
            Ref.update(inFlight, (map) => HashMap.remove(map, url))
          )
        )

        // 存储飞行中请求
        yield* Ref.update(inFlight, (map) => HashMap.set(map, url, request))

        return (yield* request) as T
      }),
  }
})

// ============================================
// 5. 用法
// ============================================

const program = Effect.gen(function* () {
  const client = yield* makeCachedHttpClient

  // 第一次调用 - 缓存未命中
  yield* client.get("https://api.example.com/users/1", { ttl: "5 minutes" })

  // 第二次调用 - 缓存命中
  yield* client.get("https://api.example.com/users/1")

  // 数据更改时使失效
  yield* client.invalidate("https://api.example.com/users/1")
})

原理:

缓存HTTP响应以减少网络调用,提高延迟,并处理离线场景。


缓存提供:

  1. 性能 - 避免冗余网络调用
  2. 成本降低 - 更少的API调用
  3. 弹性 - 当API宕机时提供过时数据
  4. 速率限制安全 - 保持在配额内


为HTTP请求添加超时

规则: 始终在HTTP请求上设置超时,以确保您的应用程序不会挂起。

好例子:

import { Effect, Duration, Data } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse } from "@effect/platform"

// ============================================
// 1. 基本请求超时
// ============================================

const fetchWithTimeout = (url: string, timeout: Duration.DurationInput) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    return yield* client.get(url).pipe(
      Effect.flatMap((r) => HttpClientResponse.json(r)),
      Effect.timeout(timeout)
    )
    // 返回Option<A> - 如果超时则为None
  })

// ============================================
// 2. 具有自定义错误的超时
// ============================================

class RequestTimeoutError extends Data.TaggedError("RequestTimeoutError")<{
  readonly url: string
  readonly timeout: Duration.Duration
}> {}

const fetchWithTimeoutError = (url: string, timeout: Duration.DurationInput) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    return yield* client.get(url).pipe(
      Effect.flatMap((r) => HttpClientResponse.json(r)),
      Effect.timeoutFail({
        duration: timeout,
        onTimeout: () => new RequestTimeoutError({
          url,
          timeout: Duration.decode(timeout),
        }),
      })
    )
  })

// ============================================
// 3. 不同阶段的不同超时
// ============================================

const fetchWithPhasedTimeouts = (url: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    // 连接超时(初始)
    const response = yield* client.get(url).pipe(
      Effect.timeout("5 seconds"),
      Effect.flatten,
      Effect.mapError(() => new Error("连接超时"))
    )

    // 读取超时(正文)
    const body = yield* HttpClientResponse.text(response).pipe(
      Effect.timeout("30 seconds"),
      Effect.flatten,
      Effect.mapError(() => new Error("读取超时"))
    )

    return body
  })

// ============================================
// 4. 具有回退的超时
// ============================================

interface ApiResponse {
  data: unknown
  cached: boolean
}

const fetchWithFallback = (url: string): Effect.Effect<ApiResponse> =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    return yield* client.get(url).pipe(
      Effect.flatMap((r) => HttpClientResponse.json(r)),
      Effect.map((data) => ({ data, cached: false })),
      Effect.timeout("5 seconds"),
      Effect.flatMap((result) =>
        result._tag === "Some"
          ? Effect.succeed(result.value)
          : Effect.succeed({ data: null, cached: true })  // 回退
      )
    )
  })

// ============================================
// 5. 具有中断的超时
// ============================================

const fetchWithInterrupt = (url: string) =>
  Effect.gen(function* () {
    const client = yield* HttpClient.HttpClient

    return yield* client.get(url).pipe(
      Effect.flatMap((r) => HttpClientResponse.json(r)),
      Effect.interruptible,
      Effect.timeout("10 seconds")
    )
    // 如果超时,光纤被中断,释放资源
  })

// ============================================
// 6. 可配置超时包装器
// ============================================

interface TimeoutConfig {
  readonly connect: Duration.DurationInput
  readonly read: Duration.DurationInput
  readonly total: Duration.DurationInput
}

const defaultTimeouts: TimeoutConfig = {
  connect: "5 seconds",
  read: "30 seconds",
  total: "60 seconds",
}

const createHttpClient = (config: TimeoutConfig = defaultTimeouts) =>
  Effect.gen(function* () {
    const baseClient = yield* HttpClient.HttpClient

    return {
      get: (url: string) =>
        baseClient.get(url).pipe(
          Effect.timeout(config.connect),
          Effect.flatten,
          Effect.flatMap((r) =>
            HttpClientResponse.json(r).pipe(
              Effect.timeout(config.read),
              Effect.flatten
            )
          ),
          Effect.timeout(config.total),
          Effect.flatten
        ),
    }
  })

// ============================================
// 7. 用法
// ============================================

const program = Effect.gen(function* () {
  yield* Effect.log("使用超时获取中...")

  const result = yield* fetchWithTimeoutError(
    "https://api.example.com/slow",
    "5 seconds"
  ).pipe(
    Effect.catchTag("RequestTimeoutError", (error) =>
      Effect.gen(function* () {
        yield* Effect.log(`请求到${error.url}超时`)
        return { error: "timeout" }
      })
    )
  )

  yield* Effect.log(`结果:${JSON.stringify(result)}`)
})

原理:

使用Effect的超时功能设置HTTP请求持续时间的限制,并具有适当的回退处理。


HTTP请求可以无限期挂起:

  1. 服务器问题 - 无响应服务器
  2. 网络问题 - 数据包丢失
  3. 慢响应 - 大负载
  4. 资源泄漏 - 连接从未关闭

超时防止这些阻塞您的应用程序。



将依赖项建模为服务

规则: 将依赖项建模为服务。

好例子:

import { Effect } from "effect";

// 定义Random服务,具有默认的生产实现
export class Random extends Effect.Service<Random>()("Random", {
  // 默认生产实现
  sync: () => ({
    next: Effect.sync(() => Math.random()),
  }),
}) {}

// 示例用法
const program = Effect.gen(function* () {
  const random = yield* Random;
  const value = yield* random.next;
  return value;
});

// 使用默认实现运行
const programWithLogging = Effect.gen(function* () {
  const value = yield* Effect.provide(program, Random.Default);
  yield* Effect.log(`随机值:${value}`);
  return value;
});

Effect.runPromise(programWithLogging);

解释:
通过将依赖项建模为服务,您可以轻松替换为模拟或确定性实现进行测试,从而实现更可靠和可预测的测试。

反模式:

直接在业务逻辑中调用外部API如fetch或使用不纯函数如Math.random()。这会将您的逻辑紧密耦合到特定实现,并使其难以测试。

原理:

表示任何外部依赖或不同能力——从数据库客户端到简单的UUID生成器——作为服务。

此模式是可测试性的关键。它允许您在生产中提供Live实现,在测试中提供Test实现(返回模拟数据),使您的代码解耦且可靠。


创建可测试的HTTP客户端服务

规则: 定义一个HttpClient服务,具有独立的Live和Test层,以实现可测试的API交互。

好例子:

1. 定义服务

import { Effect, Data, Layer } from "effect";

interface HttpErrorType {
  readonly _tag: "HttpError";
  readonly error: unknown;
}

const HttpError = Data.tagged<HttpErrorType>("HttpError");

interface HttpClientType {
  readonly get: <T>(url: string) => Effect.Effect<T, HttpErrorType>;
}

class HttpClient extends Effect.Service<HttpClientType>()("HttpClient", {
  sync: () => ({
    get: <T>(url: string): Effect.Effect<T, HttpErrorType> =>
      Effect.tryPromise<T>(() =>
        fetch(url).then((res) => res.json() as T)
      ).pipe(Effect.catchAll((error) => Effect.fail(HttpError({ error })))),
  }),
}) {}

// 测试实现
const TestLayer = Layer.succeed(
  HttpClient,
  HttpClient.of({
    get: <T>(_url: string) => Effect.succeed({ title: "模拟数据" } as T),
  })
);

// 示例用法
const program = Effect.gen(function* () {
  const client = yield* HttpClient;
  yield* Effect.logInfo("获取数据中...");
  const data = yield* client.get<{ title: string }>(
    "https://api.example.com/data"
  );
  yield* Effect.logInfo(`接收数据:${JSON.stringify(data)}`);
});

// 使用测试实现运行
Effect.runPromise(Effect.provide(program, TestLayer));

2. 创建Live实现

import { Effect, Data, Layer } from "effect";

interface HttpErrorType {
  readonly _tag: "HttpError";
  readonly error: unknown;
}

const HttpError = Data.tagged<HttpErrorType>("HttpError");

interface HttpClientType {
  readonly get: <T>(url: string) => Effect.Effect<T, HttpErrorType>;
}

class HttpClient extends Effect.Service<HttpClientType>()("HttpClient", {
  sync: () => ({
    get: <T>(url: string): Effect.Effect<T, HttpErrorType> =>
      Effect.tryPromise({
        try: () => fetch(url).then((res) => res.json()),
        catch: (error) => HttpError({ error }),
      }),
  }),
}) {}

// 测试实现
const TestLayer = Layer.succeed(
  HttpClient,
  HttpClient.of({
    get: <T>(_url: string) => Effect.succeed({ title: "模拟数据" } as T),
  })
);

// 示例用法
const program = Effect.gen(function* () {
  const client = yield* HttpClient;
  yield* Effect.logInfo("获取数据中...");
  const data = yield* client.get<{ title: string }>(
    "https://api.example.com/data"
  );
  yield* Effect.logInfo(`接收数据:${JSON.stringify(data)}`);
});

// 使用测试实现运行
Effect.runPromise(Effect.provide(program, TestLayer));

3. 创建Test实现

// src/services/HttpClientTest.ts
import { Effect, Layer } from "effect";
import { HttpClient } from "./HttpClient";

export const HttpClientTest = Layer.succeed(
  HttpClient,
  HttpClient.of({
    get: (url) => Effect.succeed({ mock: "data", url }),
  })
);

4. 在业务逻辑中使用

您的业务逻辑现在干净且仅依赖于抽象的HttpClient

// src/features/User/UserService.ts
import { Effect } from "effect";
import { HttpClient } from "../../services/HttpClient";

export const getUserFromApi = (id: number) =>
  Effect.gen(function* () {
    const client = yield* HttpClient;
    const data = yield* client.get(`https://api.example.com/users/${id}`);
    // ... 逻辑以解析和返回用户
    return data;
  });

反模式:

直接从您的业务逻辑函数调用fetch。这在您的函数上创建了硬依赖到全局fetch API,使函数难以测试和重用。

import { Effect } from "effect";

// ❌ 错误:此函数不易测试。
export const getUserDirectly = (id: number) =>
  Effect.tryPromise({
    try: () =>
      fetch(`https://api.example.com/users/${id}`).then((res) => res.json()),
    catch: () => "ApiError" as const,
  });

原理:

要与外部API交互,定义一个HttpClient服务。为此服务创建两个独立的Layer实现:

  1. HttpClientLive:使用真实HTTP客户端(如fetch)进行网络请求的生产实现。
  2. HttpClientTest:返回模拟数据的测试实现,允许您测试业务逻辑而无需进行实际网络调用。

直接在业务逻辑中使用fetch使其几乎无法测试。您的测试将变得缓慢、不稳定(依赖于网络条件),并可能有意外副作用。

通过将HTTP客户端抽象为服务,您将应用程序的逻辑与HTTP请求的具体实现方式解耦。您的业务逻辑仅依赖于抽象的HttpClient接口。在生产中,您提供Live层。在测试中,您提供Test层。这使您的测试快速、确定且可靠。



处理速率限制响应

规则: 检测429响应,并在Retry-After期间后自动重试。

好例子:

import { Effect, Schedule, Duration, Data, Ref } from "effect"
import { HttpClient, HttpClientResponse } from "@effect/platform"

// ============================================
// 1. 速率限制错误类型
// ============================================

class RateLimitedError extends Data.TaggedError("RateLimitedError")<{
  readonly retryAfter: number
  readonly limit: number | undefined
  readonly remaining: number | undefined
  readonly reset: number | undefined
}> {}

// ============================================
// 2. 解析速率限制头
// ============================================

interface RateLimitInfo {
  readonly retryAfter: number
  readonly limit?: number
  readonly remaining?: number
  readonly reset?: number
}

const parseRateLimitHeaders = (headers: Record<string, string>): RateLimitInfo => {
  // 解析Retry-After(秒或日期)
  const retryAfterHeader = headers["retry-after"]
  let retryAfter = 60  // 默认60秒

  if (retryAfterHeader) {
    const parsed = parseInt(retryAfterHeader, 10)
    if (!isNaN(parsed)) {
      retryAfter = parsed
    } else {
      // 尝试解析为日期
      const date = Date.parse(retryAfterHeader)
      if (!isNaN(date)) {
        retryAfter = Math.max(0, Math.ceil((date - Date.now()) / 1000))
      }
    }
  }

  return {
    retryAfter,
    limit: headers["x-ratelimit-limit"] ? parseInt(headers["x-ratelimit-limit"], 10) : undefined,
    remaining: headers["x-ratelimit-remaining"] ? parseInt(headers["x-ratelimit-remaining"], 10) : undefined,
    reset: headers["x-ratelimit-reset"] ? parseInt(headers["x-ratelimit-reset"], 10) : undefined,
  }
}

// ============================================
// 3. 具有速率限制处理的HTTP客户端
// ============================================

const makeRateLimitAwareClient = Effect.gen(function* () {
  const httpClient = yield* HttpClient.HttpClient

  return {
    get: <T>(url: string) =>
      Effect.gen(function* () {
        const response = yield* httpClient.get(url)

        if (response.status === 429) {
          const rateLimitInfo = parseRateLimitHeaders(response.headers)

          yield* Effect.log(
            `速率限制。重试后${rateLimitInfo.retryAfter}s`
          )

          return yield* Effect.fail(new RateLimitedError({
            retryAfter: rateLimitInfo.retryAfter,
            limit: rateLimitInfo.limit,
            remaining: rateLimitInfo.remaining,
            reset: rateLimitInfo.reset,
          }))
        }

        return yield* HttpClientResponse.json(response) as Effect.Effect<T>
      }).pipe(
        Effect.retry({
          schedule: Schedule.recurWhile<RateLimitedError>(
            (e) => e._tag === "RateLimitedError"
          ).pipe(
            Schedule.intersect(Schedule.recurs(3)),
            Schedule.delayed((_, error) =>
              Duration.seconds(error.retryAfter + 1)  // 添加1s缓冲区
            )
          ),
          while: (error) => error._tag === "RateLimitedError",
        })
      ),
  }
})

// ============================================
// 4. 主动速率限制(客户端)
// ============================================

interface RateLimiter {
  readonly acquire: () => Effect.Effect<void>
  readonly release: () => Effect.Effect<void>
}

const makeClientRateLimiter = (requestsPerSecond: number) =>
  Effect.gen(function* () {
    const tokens = yield* Ref.make(requestsPerSecond)
    const interval = 1000 / requestsPerSecond

    // 定期补充令牌
    yield* Effect.fork(
      Effect.forever(
        Effect.gen(function* () {
          yield* Effect.sleep(Duration.millis(interval))
          yield* Ref.update(tokens, (n) => Math.min(n + 1, requestsPerSecond))
        })
      )
    )

    const limiter: RateLimiter = {
      acquire: () =>
        Effect.gen(function* () {
          let acquired = false
          while (!acquired) {
            const current = yield* Ref.get(tokens)
            if (current > 0) {
              yield* Ref.update(tokens, (n) => n - 1)
              acquired = true
            } else {
              yield* Effect.sleep(Duration.millis(interval))
            }
          }
        }),

      release: () => Ref.update(tokens, (n) => Math.min(n + 1, requestsPerSecond)),
    }

    return limiter
  })

// ============================================
// 5. 组合客户端
// ============================================

const makeRobustHttpClient = (requestsPerSecond: number) =>
  Effect.gen(function* () {
    const httpClient = yield* HttpClient.HttpClient
    const rateLimiter = yield* makeClientRateLimiter(requestsPerSecond)

    return {
      get: <T>(url: string) =>
        Effect.gen(function* () {
          // 等待速率限制器令牌
          yield* rateLimiter.acquire()

          const response = yield* httpClient.get(url)

          if (response.status === 429) {
            const info = parseRateLimitHeaders(response.headers)
            yield* Effect.log(`服务器速率限制命中,等待${info.retryAfter}s`)
            yield* Effect.sleep(Duration.seconds(info.retryAfter))
            return yield* Effect.fail(new Error("速率限制"))
          }

          return yield* HttpClientResponse.json(response) as Effect.Effect<T>
        }).pipe(
          Effect.retry(
            Schedule.exponential("1 second").pipe(
              Schedule.intersect(Schedule.recurs(3))
            )
          )
        ),
    }
  })

// ============================================
// 6. 批量请求以保持在限制内
// ============================================

const batchRequests = <T>(
  urls: string[],
  requestsPerSecond: number
) =>
  Effect.gen(function* () {
    const httpClient = yield* HttpClient.HttpClient
    const results: T[] = []
    const interval = 1000 / requestsPerSecond

    for (const url of urls) {
      const response = yield* httpClient.get(url)
      const data = yield* HttpClientResponse.json(response) as Effect.Effect<T>
      results.push(data)

      // 请求之间等待
      if (urls.indexOf(url) < urls.length - 1) {
        yield* Effect.sleep(Duration.millis(interval))
      }
    }

    return results
  })

// ============================================
// 7. 用法
// ============================================

const program = Effect.gen(function* () {
  const client = yield* makeRateLimitAwareClient

  yield* Effect.log("进行速率限制请求...")

  const data = yield* client.get("https://api.example.com/data").pipe(
    Effect.catchTag("RateLimitedError", (error) =>
      Effect.gen(function* () {
        yield* Effect.log(`在速率限制后放弃。限制:${error.limit}`)
        return { error: "rate_limited" }
      })
    )
  )

  yield* Effect.log(`结果:${JSON.stringify(data)}`)
})

原理:

通过读取Retry-After头并在重试前等待来处理HTTP 429(请求过多)响应。


速率限制保护API:

  1. 公平使用 - 在客户端之间共享资源
  2. 稳定性 - 防止过载
  3. 配额 - 强制执行计费层级

尊重限制防止禁令并确保可靠访问。



🟠 高级模式

构建基本HTTP服务器

规则: 使用从Layer创建的管理运行时来处理Node.js HTTP服务器中的请求。

好例子:

此示例创建一个简单服务器,具有Greeter服务。服务器启动,创建一个包含Greeter的运行时,然后使用该运行时处理请求。

import { HttpServer, HttpServerResponse } from "@effect/platform";
import { NodeHttpServer } from "@effect/platform-node";
import { Duration, Effect, Fiber, Layer } from "effect";
import { createServer } from "node:http";

// 使用Node的内置HTTP服务器创建服务器层
const ServerLive = NodeHttpServer.layer(() => createServer(), { port: 3001 });

// 定义您的HTTP应用(这里对每个请求响应"Hello World")
const app = Effect.gen(function* () {
  yield* Effect.logInfo("接收到HTTP请求");
  return yield* HttpServerResponse.text("Hello World");
});

const serverLayer = HttpServer.serve(app).pipe(Layer.provide(ServerLive));

const program = Effect.gen(function* () {
  yield* Effect.logInfo("服务器启动于 http://localhost:3001");
  const fiber = yield* Layer.launch(serverLayer).pipe(Effect.fork);
  yield* Effect.sleep(Duration.seconds(2));
  yield* Fiber.interrupt(fiber);
  yield* Effect.logInfo("服务器关闭完成");
});

Effect.runPromise(program as unknown as Effect.Effect<void, unknown, never>);

反模式:

为每个传入请求创建新运行时或重建层。这非常低效,并违背了Effect的Layer系统的目的。

import * as http from "http";
import { Effect, Layer } from "effect";
import { GreeterLive } from "./somewhere";

// ❌ 错误:这在每个请求上重建GreeterLive层。
const server = http.createServer((_req, res) => {
  const requestEffect = Effect.succeed("Hello!").pipe(
    Effect.provide(GreeterLive) // 在此处提供层是低效的
  );
  Effect.runPromise(requestEffect).then((msg) => res.end(msg));
});

原理:

要构建HTTP服务器,创建一个主AppLayer,提供所有您的应用程序服务。在启动时将此层编译为管理Runtime。使用此运行时执行每个传入HTTP请求的Effect,确保所有逻辑都在Effect系统内处理。


此模式演示了长运行Effect应用程序的完整生命周期。

  1. 设置阶段: 您将所有应用程序依赖项(数据库连接、客户端、配置)定义为Layer并将其组合成单个AppLayer
  2. 运行时创建: 使用Layer.toRuntime(AppLayer)创建高度优化的Runtime对象。这在服务器启动时完成_一次_。
  3. 请求处理: 对于每个传入请求,您创建一个Effect,描述要完成的工作(例如,解析请求、调用服务、创建响应)。
  4. 执行: 您使用在设置阶段创建的Runtime,通过Runtime.runPromise执行请求处理Effect

此架构确保您的请求处理逻辑完全可测试,受益于结构化并发,并与服务器的设置和基础设施完全解耦。