名称: effect-patterns-making-http-requests 描述: Effect-TS HTTP请求模式。在Effect-TS应用中处理HTTP请求时使用。
Effect-TS 模式:HTTP请求
这个技能提供了10个精心策划的Effect-TS模式,用于进行HTTP请求。 在涉及以下任务时使用此技能:
- 进行HTTP请求
- Effect-TS应用中的最佳实践
- 实际模式和解决方案
🟢 初学者模式
安全解析JSON响应
规则: 始终使用Schema验证HTTP响应,以捕获运行时的API变化。
好例子:
import { Effect, Console } from "effect"
import { Schema } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse } from "@effect/platform"
import { NodeHttpClient, NodeRuntime } from "@effect/platform-node"
// ============================================
// 1. 定义响应模式
// ============================================
const PostSchema = Schema.Struct({
id: Schema.Number,
title: Schema.String,
body: Schema.String,
userId: Schema.Number,
})
type Post = Schema.Schema.Type<typeof PostSchema>
const PostArraySchema = Schema.Array(PostSchema)
// ============================================
// 2. 获取并验证单个项目
// ============================================
const getPost = (id: number) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
const response = yield* client.get(
`https://jsonplaceholder.typicode.com/posts/${id}`
)
const json = yield* HttpClientResponse.json(response)
// 验证模式 - 如果数据不匹配则失败
const post = yield* Schema.decodeUnknown(PostSchema)(json)
return post
})
// ============================================
// 3. 获取并验证数组
// ============================================
const getPosts = Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
const response = yield* client.get(
"https://jsonplaceholder.typicode.com/posts"
)
const json = yield* HttpClientResponse.json(response)
// 验证帖子数组
const posts = yield* Schema.decodeUnknown(PostArraySchema)(json)
return posts
})
// ============================================
// 4. 处理验证错误
// ============================================
const safeGetPost = (id: number) =>
getPost(id).pipe(
Effect.catchTag("ParseError", (error) =>
Effect.gen(function* () {
yield* Console.error(`无效的响应格式:${error.message}`)
// 返回默认值或以不同方式失败
return yield* Effect.fail(new Error(`帖子${id}格式无效`))
})
)
)
// ============================================
// 5. 具有可选字段的模式
// ============================================
const UserSchema = Schema.Struct({
id: Schema.Number,
name: Schema.String,
email: Schema.String,
phone: Schema.optional(Schema.String), // 可能不存在
website: Schema.optional(Schema.String),
company: Schema.optional(
Schema.Struct({
name: Schema.String,
catchPhrase: Schema.optional(Schema.String),
})
),
})
const getUser = (id: number) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
const response = yield* client.get(
`https://jsonplaceholder.typicode.com/users/${id}`
)
const json = yield* HttpClientResponse.json(response)
return yield* Schema.decodeUnknown(UserSchema)(json)
})
// ============================================
// 6. 运行示例
// ============================================
const program = Effect.gen(function* () {
yield* Console.log("=== 已验证的单个帖子 ===")
const post = yield* getPost(1)
yield* Console.log(`标题:${post.title}`)
yield* Console.log("
=== 已验证的帖子数组 ===")
const posts = yield* getPosts
yield* Console.log(`已获取${posts.length}个帖子`)
yield* Console.log("
=== 具有可选字段的用户 ===")
const user = yield* getUser(1)
yield* Console.log(`用户:${user.name}`)
yield* Console.log(`公司:${user.company?.name ?? "N/A"}`)
})
program.pipe(
Effect.provide(NodeHttpClient.layer),
NodeRuntime.runMain
)
原理:
使用Effect Schema验证HTTP JSON响应,确保数据在运行时匹配您的预期类型。
API可能无警告地更改:
- 字段消失 - 后端移除字段
- 类型更改 - 字符串变为数字
- 空值出现 - 必需字段变为可选
- 新字段 - 您未期望的额外数据
Schema验证立即捕获这些问题。
您的第一个HTTP请求
规则: 使用@effect/platform HttpClient进行类型安全的HTTP请求,具有自动错误处理。
好例子:
import { Effect, Console } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse } from "@effect/platform"
import { NodeHttpClient, NodeRuntime } from "@effect/platform-node"
// ============================================
// 1. 简单GET请求
// ============================================
const simpleGet = Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
// 进行GET请求
const response = yield* client.get("https://jsonplaceholder.typicode.com/posts/1")
// 以JSON形式获取响应
const json = yield* HttpClientResponse.json(response)
return json
})
// ============================================
// 2. 具有类型化响应的GET
// ============================================
interface Post {
id: number
title: string
body: string
userId: number
}
const getPost = (id: number) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
const response = yield* client.get(
`https://jsonplaceholder.typicode.com/posts/${id}`
)
const post = yield* HttpClientResponse.json(response) as Effect.Effect<Post>
return post
})
// ============================================
// 3. 带有正文的POST
// ============================================
const createPost = (title: string, body: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
const request = HttpClientRequest.post(
"https://jsonplaceholder.typicode.com/posts"
).pipe(
HttpClientRequest.jsonBody({ title, body, userId: 1 })
)
const response = yield* client.execute(yield* request)
const created = yield* HttpClientResponse.json(response)
return created
})
// ============================================
// 4. 处理错误
// ============================================
const safeGetPost = (id: number) =>
getPost(id).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Console.error(`无法获取帖子${id}:${error}`)
return { id, title: "不可用", body: "", userId: 0 }
})
)
)
// ============================================
// 5. 运行程序
// ============================================
const program = Effect.gen(function* () {
yield* Console.log("=== 简单GET ===")
const data = yield* simpleGet
yield* Console.log(JSON.stringify(data, null, 2))
yield* Console.log("
=== 类型化GET ===")
const post = yield* getPost(1)
yield* Console.log(`帖子:${post.title}`)
yield* Console.log("
=== POST请求 ===")
const created = yield* createPost("我的新帖子", "这是正文")
yield* Console.log(`已创建:${JSON.stringify(created)}`)
})
// 提供HTTP客户端实现
program.pipe(
Effect.provide(NodeHttpClient.layer),
NodeRuntime.runMain
)
原理:
使用Effect的HttpClient从@effect/platform进行HTTP请求,具有内置错误处理、重试和类型安全性。
Effect的HttpClient比fetch更好:
- 类型安全错误 - 网络失败是类型化的,不是异常
- 自动JSON解析 - 无需手动
.json()调用 - 可组合 - 链接请求,添加重试、超时
- 可测试 - 易于在测试中模拟
🟡 中级模式
使用退避重试HTTP请求
规则: 使用Schedule重试失败的HTTP请求,具有可配置的退避策略。
好例子:
import { Effect, Schedule, Duration, Data } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse, HttpClientError } from "@effect/platform"
// ============================================
// 1. 基本指数退避重试
// ============================================
const fetchWithRetry = (url: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
return yield* client.get(url).pipe(
Effect.flatMap((response) => HttpClientResponse.json(response)),
Effect.retry(
Schedule.exponential("100 millis", 2).pipe(
Schedule.intersect(Schedule.recurs(5)), // 最多5次重试
Schedule.jittered // 添加随机性
)
)
)
})
// ============================================
// 2. 仅重试特定状态码
// ============================================
class RetryableHttpError extends Data.TaggedError("RetryableHttpError")<{
readonly status: number
readonly message: string
}> {}
class NonRetryableHttpError extends Data.TaggedError("NonRetryableHttpError")<{
readonly status: number
readonly message: string
}> {}
const isRetryable = (status: number): boolean =>
status === 429 || // 速率限制
status === 503 || // 服务不可用
status === 502 || // 坏网关
status === 504 || // 网关超时
status >= 500 // 服务器错误
const fetchWithSelectiveRetry = (url: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
const response = yield* client.get(url).pipe(
Effect.flatMap((response) => {
if (response.status >= 400) {
if (isRetryable(response.status)) {
return Effect.fail(new RetryableHttpError({
status: response.status,
message: `HTTP ${response.status}`,
}))
}
return Effect.fail(new NonRetryableHttpError({
status: response.status,
message: `HTTP ${response.status}`,
}))
}
return Effect.succeed(response)
}),
Effect.retry({
schedule: Schedule.exponential("200 millis").pipe(
Schedule.intersect(Schedule.recurs(3))
),
while: (error) => error._tag === "RetryableHttpError",
})
)
return yield* HttpClientResponse.json(response)
})
// ============================================
// 3. 重试并记录日志
// ============================================
const fetchWithRetryLogging = (url: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
return yield* client.get(url).pipe(
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.retry(
Schedule.exponential("100 millis").pipe(
Schedule.intersect(Schedule.recurs(3)),
Schedule.tapOutput((_, output) =>
Effect.log(`重试尝试,等待${Duration.toMillis(output)}ms`)
)
)
),
Effect.tapError((error) => Effect.log(`请求失败:${error}`))
)
})
// ============================================
// 4. 自定义重试策略
// ============================================
const customRetryPolicy = Schedule.exponential("500 millis", 2).pipe(
Schedule.intersect(Schedule.recurs(5)),
Schedule.union(Schedule.spaced("30 seconds")), // 30秒后也重试
Schedule.whileOutput((duration) => Duration.lessThanOrEqualTo(duration, "2 minutes")),
Schedule.jittered
)
// ============================================
// 5. 尊重Retry-After头的重试
// ============================================
const fetchWithRetryAfter = (url: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
const makeRequest = client.get(url).pipe(
Effect.flatMap((response) => {
if (response.status === 429) {
const retryAfter = response.headers["retry-after"]
const delay = retryAfter ? parseInt(retryAfter, 10) * 1000 : 1000
return Effect.fail({
_tag: "RateLimited" as const,
delay,
})
}
return Effect.succeed(response)
})
)
return yield* makeRequest.pipe(
Effect.retry(
Schedule.recurWhile<{ _tag: "RateLimited"; delay: number }>(
(error) => error._tag === "RateLimited"
).pipe(
Schedule.intersect(Schedule.recurs(3)),
Schedule.delayed((_, error) => Duration.millis(error.delay))
)
),
Effect.flatMap((r) => HttpClientResponse.json(r))
)
})
// ============================================
// 6. 用法
// ============================================
const program = Effect.gen(function* () {
yield* Effect.log("使用重试获取中...")
const data = yield* fetchWithRetry("https://api.example.com/data").pipe(
Effect.catchAll((error) => {
return Effect.succeed({ error: "所有重试用尽" })
})
)
yield* Effect.log(`结果:${JSON.stringify(data)}`)
})
原理:
使用Effect的retry与Schedule自动重试失败的HTTP请求,具有指数退避和抖动。
HTTP请求因暂时原因失败:
- 网络问题 - 临时连接问题
- 服务器过载 - 503服务不可用
- 速率限制 - 429请求过多
- 超时 - 慢响应
适当的重试逻辑优雅地处理这些。
记录HTTP请求和响应
规则: 使用Effect的日志记录跟踪HTTP请求以进行调试和监控。
好例子:
import { Effect, Duration } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse } from "@effect/platform"
// ============================================
// 1. 简单请求/响应日志记录
// ============================================
const withLogging = <A, E>(
request: Effect.Effect<A, E, HttpClient.HttpClient>
): Effect.Effect<A, E, HttpClient.HttpClient> =>
Effect.gen(function* () {
const startTime = Date.now()
yield* Effect.log("→ HTTP请求开始...")
const result = yield* request
const duration = Date.now() - startTime
yield* Effect.log(`← HTTP响应接收(${duration}ms)`)
return result
})
// ============================================
// 2. 详细请求日志记录
// ============================================
interface RequestLog {
method: string
url: string
headers: Record<string, string>
body?: unknown
}
interface ResponseLog {
status: number
headers: Record<string, string>
duration: number
size?: number
}
const makeLoggingClient = Effect.gen(function* () {
const baseClient = yield* HttpClient.HttpClient
const logRequest = (method: string, url: string, headers: Record<string, string>) =>
Effect.log("HTTP请求").pipe(
Effect.annotateLogs({
method,
url,
headers: JSON.stringify(headers),
})
)
const logResponse = (status: number, duration: number, headers: Record<string, string>) =>
Effect.log("HTTP响应").pipe(
Effect.annotateLogs({
status: String(status),
duration: `${duration}ms`,
headers: JSON.stringify(headers),
})
)
return {
get: <T>(url: string, options?: { headers?: Record<string, string> }) =>
Effect.gen(function* () {
const headers = options?.headers ?? {}
yield* logRequest("GET", url, headers)
const startTime = Date.now()
const response = yield* baseClient.get(url)
yield* logResponse(
response.status,
Date.now() - startTime,
response.headers
)
return yield* HttpClientResponse.json(response) as Effect.Effect<T>
}),
post: <T>(url: string, body: unknown, options?: { headers?: Record<string, string> }) =>
Effect.gen(function* () {
const headers = options?.headers ?? {}
yield* logRequest("POST", url, headers).pipe(
Effect.annotateLogs("body", JSON.stringify(body).slice(0, 200))
)
const startTime = Date.now()
const request = yield* HttpClientRequest.post(url).pipe(
HttpClientRequest.jsonBody(body)
)
const response = yield* baseClient.execute(request)
yield* logResponse(
response.status,
Date.now() - startTime,
response.headers
)
return yield* HttpClientResponse.json(response) as Effect.Effect<T>
}),
}
})
// ============================================
// 3. 使用范围进行计时日志记录
// ============================================
const fetchWithSpan = (url: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
return yield* client.get(url).pipe(
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.withLogSpan(`HTTP GET ${url}`)
)
})
// ============================================
// 4. 条件日志记录(调试模式)
// ============================================
const makeConditionalLoggingClient = (debug: boolean) =>
Effect.gen(function* () {
const baseClient = yield* HttpClient.HttpClient
const maybeLog = (message: string, data?: Record<string, unknown>) =>
debug
? Effect.log(message).pipe(
data ? Effect.annotateLogs(data) : (e) => e
)
: Effect.void
return {
get: <T>(url: string) =>
Effect.gen(function* () {
yield* maybeLog("HTTP请求", { method: "GET", url })
const startTime = Date.now()
const response = yield* baseClient.get(url)
yield* maybeLog("HTTP响应", {
status: String(response.status),
duration: `${Date.now() - startTime}ms`,
})
return yield* HttpClientResponse.json(response) as Effect.Effect<T>
}),
}
})
// ============================================
// 5. 请求ID跟踪
// ============================================
const makeTrackedClient = Effect.gen(function* () {
const baseClient = yield* HttpClient.HttpClient
return {
get: <T>(url: string) =>
Effect.gen(function* () {
const requestId = crypto.randomUUID().slice(0, 8)
yield* Effect.log("HTTP请求").pipe(
Effect.annotateLogs({
requestId,
method: "GET",
url,
})
)
const startTime = Date.now()
const response = yield* baseClient.get(url)
yield* Effect.log("HTTP响应").pipe(
Effect.annotateLogs({
requestId,
status: String(response.status),
duration: `${Date.now() - startTime}ms`,
})
)
return yield* HttpClientResponse.json(response) as Effect.Effect<T>
})
}
})
// ============================================
// 6. 错误日志记录
// ============================================
const fetchWithErrorLogging = (url: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
return yield* client.get(url).pipe(
Effect.flatMap((response) => {
if (response.status >= 400) {
return Effect.gen(function* () {
yield* Effect.logError("HTTP错误").pipe(
Effect.annotateLogs({
url,
status: String(response.status),
})
)
return yield* Effect.fail(new Error(`HTTP ${response.status}`))
})
}
return Effect.succeed(response)
}),
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.tapError((error) =>
Effect.logError("请求失败").pipe(
Effect.annotateLogs({
url,
error: String(error),
})
)
)
)
})
// ============================================
// 7. 用法
// ============================================
const program = Effect.gen(function* () {
const client = yield* makeLoggingClient
yield* Effect.log("开始HTTP操作...")
const data = yield* client.get("https://api.example.com/users")
yield* Effect.log("操作完成")
})
原理:
用日志记录中间件包装HTTP客户端,以捕获请求详情、响应信息和计时,用于调试和可观察性。
HTTP日志记录有助于:
- 调试 - 查看发送/接收的内容
- 性能 - 跟踪慢请求
- 审计 - 记录API使用
- 故障排除 - 诊断生产问题
缓存HTTP响应
规则: 使用内存或持久缓存存储HTTP响应。
好例子:
import { Effect, Ref, HashMap, Option, Duration } from "effect"
import { HttpClient, HttpClientResponse } from "@effect/platform"
// ============================================
// 1. 简单内存缓存
// ============================================
interface CacheEntry<T> {
readonly data: T
readonly timestamp: number
readonly ttl: number
}
const makeCache = <T>() =>
Effect.gen(function* () {
const store = yield* Ref.make(HashMap.empty<string, CacheEntry<T>>())
const get = (key: string): Effect.Effect<Option.Option<T>> =>
Ref.get(store).pipe(
Effect.map((map) => {
const entry = HashMap.get(map, key)
if (entry._tag === "None") return Option.none()
const now = Date.now()
if (now > entry.value.timestamp + entry.value.ttl) {
return Option.none() // 已过期
}
return Option.some(entry.value.data)
})
)
const set = (key: string, data: T, ttl: number): Effect.Effect<void> =>
Ref.update(store, (map) =>
HashMap.set(map, key, {
data,
timestamp: Date.now(),
ttl,
})
)
const invalidate = (key: string): Effect.Effect<void> =>
Ref.update(store, (map) => HashMap.remove(map, key))
const clear = (): Effect.Effect<void> =>
Ref.set(store, HashMap.empty())
return { get, set, invalidate, clear }
})
// ============================================
// 2. 缓存HTTP客户端
// ============================================
interface CachedHttpClient {
readonly get: <T>(
url: string,
options?: { ttl?: Duration.DurationInput }
) => Effect.Effect<T, Error>
readonly invalidate: (url: string) => Effect.Effect<void>
}
const makeCachedHttpClient = Effect.gen(function* () {
const httpClient = yield* HttpClient.HttpClient
const cache = yield* makeCache<unknown>()
const client: CachedHttpClient = {
get: <T>(url: string, options?: { ttl?: Duration.DurationInput }) => {
const ttl = options?.ttl ? Duration.toMillis(Duration.decode(options.ttl)) : 60000
return Effect.gen(function* () {
// 先检查缓存
const cached = yield* cache.get(url)
if (Option.isSome(cached)) {
yield* Effect.log(`缓存命中:${url}`)
return cached.value as T
}
yield* Effect.log(`缓存未命中:${url}`)
// 从网络获取
const response = yield* httpClient.get(url)
const data = yield* HttpClientResponse.json(response) as Effect.Effect<T>
// 存储在缓存中
yield* cache.set(url, data, ttl)
return data
})
},
invalidate: (url) => cache.invalidate(url),
}
return client
})
// ============================================
// 3. 陈旧-同时重新验证模式
// ============================================
interface SWRCache<T> {
readonly data: T
readonly timestamp: number
readonly staleAfter: number
readonly expireAfter: number
}
const makeSWRClient = Effect.gen(function* () {
const httpClient = yield* HttpClient.HttpClient
const cache = yield* Ref.make(HashMap.empty<string, SWRCache<unknown>>())
return {
get: <T>(
url: string,
options: {
staleAfter: Duration.DurationInput
expireAfter: Duration.DurationInput
}
) =>
Effect.gen(function* () {
const now = Date.now()
const staleMs = Duration.toMillis(Duration.decode(options.staleAfter))
const expireMs = Duration.toMillis(Duration.decode(options.expireAfter))
const cached = yield* Ref.get(cache).pipe(
Effect.map((map) => HashMap.get(map, url))
)
if (cached._tag === "Some") {
const entry = cached.value
const age = now - entry.timestamp
if (age < staleMs) {
// 新鲜 - 立即返回
return entry.data as T
}
if (age < expireMs) {
// 陈旧 - 返回缓存,后台重新验证
yield* Effect.fork(
httpClient.get(url).pipe(
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.flatMap((data) =>
Ref.update(cache, (map) =>
HashMap.set(map, url, {
data,
timestamp: Date.now(),
staleAfter: staleMs,
expireAfter: expireMs,
})
)
),
Effect.catchAll(() => Effect.void) // 忽略错误
)
)
return entry.data as T
}
}
// 已过期或缺失 - 获取新鲜数据
const response = yield* httpClient.get(url)
const data = yield* HttpClientResponse.json(response) as Effect.Effect<T>
yield* Ref.update(cache, (map) =>
HashMap.set(map, url, {
data,
timestamp: now,
staleAfter: staleMs,
expireAfter: expireMs,
})
)
return data
}),
}
})
// ============================================
// 4. 具有请求去重化的缓存
// ============================================
const makeDeduplicatedClient = Effect.gen(function* () {
const httpClient = yield* HttpClient.HttpClient
const inFlight = yield* Ref.make(HashMap.empty<string, Effect.Effect<unknown>>())
const cache = yield* makeCache<unknown>()
return {
get: <T>(url: string, ttl: number = 60000) =>
Effect.gen(function* () {
// 检查缓存
const cached = yield* cache.get(url)
if (Option.isSome(cached)) {
return cached.value as T
}
// 检查是否已有请求在飞行中
const pending = yield* Ref.get(inFlight).pipe(
Effect.map((map) => HashMap.get(map, url))
)
if (pending._tag === "Some") {
yield* Effect.log(`去重请求:${url}`)
return (yield* pending.value) as T
}
// 进行请求
const request = httpClient.get(url).pipe(
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.tap((data) => cache.set(url, data, ttl)),
Effect.ensuring(
Ref.update(inFlight, (map) => HashMap.remove(map, url))
)
)
// 存储飞行中请求
yield* Ref.update(inFlight, (map) => HashMap.set(map, url, request))
return (yield* request) as T
}),
}
})
// ============================================
// 5. 用法
// ============================================
const program = Effect.gen(function* () {
const client = yield* makeCachedHttpClient
// 第一次调用 - 缓存未命中
yield* client.get("https://api.example.com/users/1", { ttl: "5 minutes" })
// 第二次调用 - 缓存命中
yield* client.get("https://api.example.com/users/1")
// 数据更改时使失效
yield* client.invalidate("https://api.example.com/users/1")
})
原理:
缓存HTTP响应以减少网络调用,提高延迟,并处理离线场景。
缓存提供:
- 性能 - 避免冗余网络调用
- 成本降低 - 更少的API调用
- 弹性 - 当API宕机时提供过时数据
- 速率限制安全 - 保持在配额内
为HTTP请求添加超时
规则: 始终在HTTP请求上设置超时,以确保您的应用程序不会挂起。
好例子:
import { Effect, Duration, Data } from "effect"
import { HttpClient, HttpClientRequest, HttpClientResponse } from "@effect/platform"
// ============================================
// 1. 基本请求超时
// ============================================
const fetchWithTimeout = (url: string, timeout: Duration.DurationInput) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
return yield* client.get(url).pipe(
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.timeout(timeout)
)
// 返回Option<A> - 如果超时则为None
})
// ============================================
// 2. 具有自定义错误的超时
// ============================================
class RequestTimeoutError extends Data.TaggedError("RequestTimeoutError")<{
readonly url: string
readonly timeout: Duration.Duration
}> {}
const fetchWithTimeoutError = (url: string, timeout: Duration.DurationInput) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
return yield* client.get(url).pipe(
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.timeoutFail({
duration: timeout,
onTimeout: () => new RequestTimeoutError({
url,
timeout: Duration.decode(timeout),
}),
})
)
})
// ============================================
// 3. 不同阶段的不同超时
// ============================================
const fetchWithPhasedTimeouts = (url: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
// 连接超时(初始)
const response = yield* client.get(url).pipe(
Effect.timeout("5 seconds"),
Effect.flatten,
Effect.mapError(() => new Error("连接超时"))
)
// 读取超时(正文)
const body = yield* HttpClientResponse.text(response).pipe(
Effect.timeout("30 seconds"),
Effect.flatten,
Effect.mapError(() => new Error("读取超时"))
)
return body
})
// ============================================
// 4. 具有回退的超时
// ============================================
interface ApiResponse {
data: unknown
cached: boolean
}
const fetchWithFallback = (url: string): Effect.Effect<ApiResponse> =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
return yield* client.get(url).pipe(
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.map((data) => ({ data, cached: false })),
Effect.timeout("5 seconds"),
Effect.flatMap((result) =>
result._tag === "Some"
? Effect.succeed(result.value)
: Effect.succeed({ data: null, cached: true }) // 回退
)
)
})
// ============================================
// 5. 具有中断的超时
// ============================================
const fetchWithInterrupt = (url: string) =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient
return yield* client.get(url).pipe(
Effect.flatMap((r) => HttpClientResponse.json(r)),
Effect.interruptible,
Effect.timeout("10 seconds")
)
// 如果超时,光纤被中断,释放资源
})
// ============================================
// 6. 可配置超时包装器
// ============================================
interface TimeoutConfig {
readonly connect: Duration.DurationInput
readonly read: Duration.DurationInput
readonly total: Duration.DurationInput
}
const defaultTimeouts: TimeoutConfig = {
connect: "5 seconds",
read: "30 seconds",
total: "60 seconds",
}
const createHttpClient = (config: TimeoutConfig = defaultTimeouts) =>
Effect.gen(function* () {
const baseClient = yield* HttpClient.HttpClient
return {
get: (url: string) =>
baseClient.get(url).pipe(
Effect.timeout(config.connect),
Effect.flatten,
Effect.flatMap((r) =>
HttpClientResponse.json(r).pipe(
Effect.timeout(config.read),
Effect.flatten
)
),
Effect.timeout(config.total),
Effect.flatten
),
}
})
// ============================================
// 7. 用法
// ============================================
const program = Effect.gen(function* () {
yield* Effect.log("使用超时获取中...")
const result = yield* fetchWithTimeoutError(
"https://api.example.com/slow",
"5 seconds"
).pipe(
Effect.catchTag("RequestTimeoutError", (error) =>
Effect.gen(function* () {
yield* Effect.log(`请求到${error.url}超时`)
return { error: "timeout" }
})
)
)
yield* Effect.log(`结果:${JSON.stringify(result)}`)
})
原理:
使用Effect的超时功能设置HTTP请求持续时间的限制,并具有适当的回退处理。
HTTP请求可以无限期挂起:
- 服务器问题 - 无响应服务器
- 网络问题 - 数据包丢失
- 慢响应 - 大负载
- 资源泄漏 - 连接从未关闭
超时防止这些阻塞您的应用程序。
将依赖项建模为服务
规则: 将依赖项建模为服务。
好例子:
import { Effect } from "effect";
// 定义Random服务,具有默认的生产实现
export class Random extends Effect.Service<Random>()("Random", {
// 默认生产实现
sync: () => ({
next: Effect.sync(() => Math.random()),
}),
}) {}
// 示例用法
const program = Effect.gen(function* () {
const random = yield* Random;
const value = yield* random.next;
return value;
});
// 使用默认实现运行
const programWithLogging = Effect.gen(function* () {
const value = yield* Effect.provide(program, Random.Default);
yield* Effect.log(`随机值:${value}`);
return value;
});
Effect.runPromise(programWithLogging);
解释:
通过将依赖项建模为服务,您可以轻松替换为模拟或确定性实现进行测试,从而实现更可靠和可预测的测试。
反模式:
直接在业务逻辑中调用外部API如fetch或使用不纯函数如Math.random()。这会将您的逻辑紧密耦合到特定实现,并使其难以测试。
原理:
表示任何外部依赖或不同能力——从数据库客户端到简单的UUID生成器——作为服务。
此模式是可测试性的关键。它允许您在生产中提供Live实现,在测试中提供Test实现(返回模拟数据),使您的代码解耦且可靠。
创建可测试的HTTP客户端服务
规则: 定义一个HttpClient服务,具有独立的Live和Test层,以实现可测试的API交互。
好例子:
1. 定义服务
import { Effect, Data, Layer } from "effect";
interface HttpErrorType {
readonly _tag: "HttpError";
readonly error: unknown;
}
const HttpError = Data.tagged<HttpErrorType>("HttpError");
interface HttpClientType {
readonly get: <T>(url: string) => Effect.Effect<T, HttpErrorType>;
}
class HttpClient extends Effect.Service<HttpClientType>()("HttpClient", {
sync: () => ({
get: <T>(url: string): Effect.Effect<T, HttpErrorType> =>
Effect.tryPromise<T>(() =>
fetch(url).then((res) => res.json() as T)
).pipe(Effect.catchAll((error) => Effect.fail(HttpError({ error })))),
}),
}) {}
// 测试实现
const TestLayer = Layer.succeed(
HttpClient,
HttpClient.of({
get: <T>(_url: string) => Effect.succeed({ title: "模拟数据" } as T),
})
);
// 示例用法
const program = Effect.gen(function* () {
const client = yield* HttpClient;
yield* Effect.logInfo("获取数据中...");
const data = yield* client.get<{ title: string }>(
"https://api.example.com/data"
);
yield* Effect.logInfo(`接收数据:${JSON.stringify(data)}`);
});
// 使用测试实现运行
Effect.runPromise(Effect.provide(program, TestLayer));
2. 创建Live实现
import { Effect, Data, Layer } from "effect";
interface HttpErrorType {
readonly _tag: "HttpError";
readonly error: unknown;
}
const HttpError = Data.tagged<HttpErrorType>("HttpError");
interface HttpClientType {
readonly get: <T>(url: string) => Effect.Effect<T, HttpErrorType>;
}
class HttpClient extends Effect.Service<HttpClientType>()("HttpClient", {
sync: () => ({
get: <T>(url: string): Effect.Effect<T, HttpErrorType> =>
Effect.tryPromise({
try: () => fetch(url).then((res) => res.json()),
catch: (error) => HttpError({ error }),
}),
}),
}) {}
// 测试实现
const TestLayer = Layer.succeed(
HttpClient,
HttpClient.of({
get: <T>(_url: string) => Effect.succeed({ title: "模拟数据" } as T),
})
);
// 示例用法
const program = Effect.gen(function* () {
const client = yield* HttpClient;
yield* Effect.logInfo("获取数据中...");
const data = yield* client.get<{ title: string }>(
"https://api.example.com/data"
);
yield* Effect.logInfo(`接收数据:${JSON.stringify(data)}`);
});
// 使用测试实现运行
Effect.runPromise(Effect.provide(program, TestLayer));
3. 创建Test实现
// src/services/HttpClientTest.ts
import { Effect, Layer } from "effect";
import { HttpClient } from "./HttpClient";
export const HttpClientTest = Layer.succeed(
HttpClient,
HttpClient.of({
get: (url) => Effect.succeed({ mock: "data", url }),
})
);
4. 在业务逻辑中使用
您的业务逻辑现在干净且仅依赖于抽象的HttpClient。
// src/features/User/UserService.ts
import { Effect } from "effect";
import { HttpClient } from "../../services/HttpClient";
export const getUserFromApi = (id: number) =>
Effect.gen(function* () {
const client = yield* HttpClient;
const data = yield* client.get(`https://api.example.com/users/${id}`);
// ... 逻辑以解析和返回用户
return data;
});
反模式:
直接从您的业务逻辑函数调用fetch。这在您的函数上创建了硬依赖到全局fetch API,使函数难以测试和重用。
import { Effect } from "effect";
// ❌ 错误:此函数不易测试。
export const getUserDirectly = (id: number) =>
Effect.tryPromise({
try: () =>
fetch(`https://api.example.com/users/${id}`).then((res) => res.json()),
catch: () => "ApiError" as const,
});
原理:
要与外部API交互,定义一个HttpClient服务。为此服务创建两个独立的Layer实现:
HttpClientLive:使用真实HTTP客户端(如fetch)进行网络请求的生产实现。HttpClientTest:返回模拟数据的测试实现,允许您测试业务逻辑而无需进行实际网络调用。
直接在业务逻辑中使用fetch使其几乎无法测试。您的测试将变得缓慢、不稳定(依赖于网络条件),并可能有意外副作用。
通过将HTTP客户端抽象为服务,您将应用程序的逻辑与HTTP请求的具体实现方式解耦。您的业务逻辑仅依赖于抽象的HttpClient接口。在生产中,您提供Live层。在测试中,您提供Test层。这使您的测试快速、确定且可靠。
处理速率限制响应
规则: 检测429响应,并在Retry-After期间后自动重试。
好例子:
import { Effect, Schedule, Duration, Data, Ref } from "effect"
import { HttpClient, HttpClientResponse } from "@effect/platform"
// ============================================
// 1. 速率限制错误类型
// ============================================
class RateLimitedError extends Data.TaggedError("RateLimitedError")<{
readonly retryAfter: number
readonly limit: number | undefined
readonly remaining: number | undefined
readonly reset: number | undefined
}> {}
// ============================================
// 2. 解析速率限制头
// ============================================
interface RateLimitInfo {
readonly retryAfter: number
readonly limit?: number
readonly remaining?: number
readonly reset?: number
}
const parseRateLimitHeaders = (headers: Record<string, string>): RateLimitInfo => {
// 解析Retry-After(秒或日期)
const retryAfterHeader = headers["retry-after"]
let retryAfter = 60 // 默认60秒
if (retryAfterHeader) {
const parsed = parseInt(retryAfterHeader, 10)
if (!isNaN(parsed)) {
retryAfter = parsed
} else {
// 尝试解析为日期
const date = Date.parse(retryAfterHeader)
if (!isNaN(date)) {
retryAfter = Math.max(0, Math.ceil((date - Date.now()) / 1000))
}
}
}
return {
retryAfter,
limit: headers["x-ratelimit-limit"] ? parseInt(headers["x-ratelimit-limit"], 10) : undefined,
remaining: headers["x-ratelimit-remaining"] ? parseInt(headers["x-ratelimit-remaining"], 10) : undefined,
reset: headers["x-ratelimit-reset"] ? parseInt(headers["x-ratelimit-reset"], 10) : undefined,
}
}
// ============================================
// 3. 具有速率限制处理的HTTP客户端
// ============================================
const makeRateLimitAwareClient = Effect.gen(function* () {
const httpClient = yield* HttpClient.HttpClient
return {
get: <T>(url: string) =>
Effect.gen(function* () {
const response = yield* httpClient.get(url)
if (response.status === 429) {
const rateLimitInfo = parseRateLimitHeaders(response.headers)
yield* Effect.log(
`速率限制。重试后${rateLimitInfo.retryAfter}s`
)
return yield* Effect.fail(new RateLimitedError({
retryAfter: rateLimitInfo.retryAfter,
limit: rateLimitInfo.limit,
remaining: rateLimitInfo.remaining,
reset: rateLimitInfo.reset,
}))
}
return yield* HttpClientResponse.json(response) as Effect.Effect<T>
}).pipe(
Effect.retry({
schedule: Schedule.recurWhile<RateLimitedError>(
(e) => e._tag === "RateLimitedError"
).pipe(
Schedule.intersect(Schedule.recurs(3)),
Schedule.delayed((_, error) =>
Duration.seconds(error.retryAfter + 1) // 添加1s缓冲区
)
),
while: (error) => error._tag === "RateLimitedError",
})
),
}
})
// ============================================
// 4. 主动速率限制(客户端)
// ============================================
interface RateLimiter {
readonly acquire: () => Effect.Effect<void>
readonly release: () => Effect.Effect<void>
}
const makeClientRateLimiter = (requestsPerSecond: number) =>
Effect.gen(function* () {
const tokens = yield* Ref.make(requestsPerSecond)
const interval = 1000 / requestsPerSecond
// 定期补充令牌
yield* Effect.fork(
Effect.forever(
Effect.gen(function* () {
yield* Effect.sleep(Duration.millis(interval))
yield* Ref.update(tokens, (n) => Math.min(n + 1, requestsPerSecond))
})
)
)
const limiter: RateLimiter = {
acquire: () =>
Effect.gen(function* () {
let acquired = false
while (!acquired) {
const current = yield* Ref.get(tokens)
if (current > 0) {
yield* Ref.update(tokens, (n) => n - 1)
acquired = true
} else {
yield* Effect.sleep(Duration.millis(interval))
}
}
}),
release: () => Ref.update(tokens, (n) => Math.min(n + 1, requestsPerSecond)),
}
return limiter
})
// ============================================
// 5. 组合客户端
// ============================================
const makeRobustHttpClient = (requestsPerSecond: number) =>
Effect.gen(function* () {
const httpClient = yield* HttpClient.HttpClient
const rateLimiter = yield* makeClientRateLimiter(requestsPerSecond)
return {
get: <T>(url: string) =>
Effect.gen(function* () {
// 等待速率限制器令牌
yield* rateLimiter.acquire()
const response = yield* httpClient.get(url)
if (response.status === 429) {
const info = parseRateLimitHeaders(response.headers)
yield* Effect.log(`服务器速率限制命中,等待${info.retryAfter}s`)
yield* Effect.sleep(Duration.seconds(info.retryAfter))
return yield* Effect.fail(new Error("速率限制"))
}
return yield* HttpClientResponse.json(response) as Effect.Effect<T>
}).pipe(
Effect.retry(
Schedule.exponential("1 second").pipe(
Schedule.intersect(Schedule.recurs(3))
)
)
),
}
})
// ============================================
// 6. 批量请求以保持在限制内
// ============================================
const batchRequests = <T>(
urls: string[],
requestsPerSecond: number
) =>
Effect.gen(function* () {
const httpClient = yield* HttpClient.HttpClient
const results: T[] = []
const interval = 1000 / requestsPerSecond
for (const url of urls) {
const response = yield* httpClient.get(url)
const data = yield* HttpClientResponse.json(response) as Effect.Effect<T>
results.push(data)
// 请求之间等待
if (urls.indexOf(url) < urls.length - 1) {
yield* Effect.sleep(Duration.millis(interval))
}
}
return results
})
// ============================================
// 7. 用法
// ============================================
const program = Effect.gen(function* () {
const client = yield* makeRateLimitAwareClient
yield* Effect.log("进行速率限制请求...")
const data = yield* client.get("https://api.example.com/data").pipe(
Effect.catchTag("RateLimitedError", (error) =>
Effect.gen(function* () {
yield* Effect.log(`在速率限制后放弃。限制:${error.limit}`)
return { error: "rate_limited" }
})
)
)
yield* Effect.log(`结果:${JSON.stringify(data)}`)
})
原理:
通过读取Retry-After头并在重试前等待来处理HTTP 429(请求过多)响应。
速率限制保护API:
- 公平使用 - 在客户端之间共享资源
- 稳定性 - 防止过载
- 配额 - 强制执行计费层级
尊重限制防止禁令并确保可靠访问。
🟠 高级模式
构建基本HTTP服务器
规则: 使用从Layer创建的管理运行时来处理Node.js HTTP服务器中的请求。
好例子:
此示例创建一个简单服务器,具有Greeter服务。服务器启动,创建一个包含Greeter的运行时,然后使用该运行时处理请求。
import { HttpServer, HttpServerResponse } from "@effect/platform";
import { NodeHttpServer } from "@effect/platform-node";
import { Duration, Effect, Fiber, Layer } from "effect";
import { createServer } from "node:http";
// 使用Node的内置HTTP服务器创建服务器层
const ServerLive = NodeHttpServer.layer(() => createServer(), { port: 3001 });
// 定义您的HTTP应用(这里对每个请求响应"Hello World")
const app = Effect.gen(function* () {
yield* Effect.logInfo("接收到HTTP请求");
return yield* HttpServerResponse.text("Hello World");
});
const serverLayer = HttpServer.serve(app).pipe(Layer.provide(ServerLive));
const program = Effect.gen(function* () {
yield* Effect.logInfo("服务器启动于 http://localhost:3001");
const fiber = yield* Layer.launch(serverLayer).pipe(Effect.fork);
yield* Effect.sleep(Duration.seconds(2));
yield* Fiber.interrupt(fiber);
yield* Effect.logInfo("服务器关闭完成");
});
Effect.runPromise(program as unknown as Effect.Effect<void, unknown, never>);
反模式:
为每个传入请求创建新运行时或重建层。这非常低效,并违背了Effect的Layer系统的目的。
import * as http from "http";
import { Effect, Layer } from "effect";
import { GreeterLive } from "./somewhere";
// ❌ 错误:这在每个请求上重建GreeterLive层。
const server = http.createServer((_req, res) => {
const requestEffect = Effect.succeed("Hello!").pipe(
Effect.provide(GreeterLive) // 在此处提供层是低效的
);
Effect.runPromise(requestEffect).then((msg) => res.end(msg));
});
原理:
要构建HTTP服务器,创建一个主AppLayer,提供所有您的应用程序服务。在启动时将此层编译为管理Runtime。使用此运行时执行每个传入HTTP请求的Effect,确保所有逻辑都在Effect系统内处理。
此模式演示了长运行Effect应用程序的完整生命周期。
- 设置阶段: 您将所有应用程序依赖项(数据库连接、客户端、配置)定义为
Layer并将其组合成单个AppLayer。 - 运行时创建: 使用
Layer.toRuntime(AppLayer)创建高度优化的Runtime对象。这在服务器启动时完成_一次_。 - 请求处理: 对于每个传入请求,您创建一个
Effect,描述要完成的工作(例如,解析请求、调用服务、创建响应)。 - 执行: 您使用在设置阶段创建的
Runtime,通过Runtime.runPromise执行请求处理Effect。
此架构确保您的请求处理逻辑完全可测试,受益于结构化并发,并与服务器的设置和基础设施完全解耦。