名称: effect-patterns-testing 描述: Effect-TS 测试模式。在 Effect-TS 应用程序中进行测试时使用。
Effect-TS 模式:测试
本技能提供10个精选的 Effect-TS 测试模式。 在以下任务相关时使用此技能:
- 测试
- Effect-TS 应用程序中的最佳实践
- 真实世界的模式和解决方案
🟢 初级模式
您的第一个 Effect 测试
规则: 在测试中使用 Effect.runPromise 来运行和断言 Effect 结果。
好例子:
import { describe, it, expect } from "vitest"
import { Effect } from "effect"
// ============================================
// 待测试的代码
// ============================================
const add = (a: number, b: number): Effect.Effect<number> =>
Effect.succeed(a + b)
const divide = (a: number, b: number): Effect.Effect<number, Error> =>
b === 0
? Effect.fail(new Error("不能除以零"))
: Effect.succeed(a / b)
const fetchUser = (id: string): Effect.Effect<{ id: string; name: string }> =>
Effect.succeed({ id, name: `用户 ${id}` })
// ============================================
// 测试
// ============================================
describe("基本 Effect 测试", () => {
it("应该加两个数字", async () => {
const result = await Effect.runPromise(add(2, 3))
expect(result).toBe(5)
})
it("应该除数字", async () => {
const result = await Effect.runPromise(divide(10, 2))
expect(result).toBe(5)
})
it("应该在除以零时失败", async () => {
await expect(Effect.runPromise(divide(10, 0))).rejects.toThrow(
"不能除以零"
)
})
it("应该获取用户", async () => {
const user = await Effect.runPromise(fetchUser("123"))
expect(user).toEqual({
id: "123",
name: "用户 123",
})
})
})
// ============================================
// 测试 Effect.gen 程序
// ============================================
const calculateDiscount = (price: number, quantity: number) =>
Effect.gen(function* () {
if (price <= 0) {
return yield* Effect.fail(new Error("无效价格"))
}
const subtotal = price * quantity
const discount = quantity >= 10 ? 0.1 : 0
const total = subtotal * (1 - discount)
return { subtotal, discount, total }
})
describe("Effect.gen 测试", () => {
it("应该计算无折扣", async () => {
const result = await Effect.runPromise(calculateDiscount(10, 5))
expect(result.subtotal).toBe(50)
expect(result.discount).toBe(0)
expect(result.total).toBe(50)
})
it("应该应用批量折扣", async () => {
const result = await Effect.runPromise(calculateDiscount(10, 10))
expect(result.subtotal).toBe(100)
expect(result.discount).toBe(0.1)
expect(result.total).toBe(90)
})
it("应该在无效价格时失败", async () => {
await expect(
Effect.runPromise(calculateDiscount(-5, 10))
).rejects.toThrow("无效价格")
})
})
理由:
通过使用 Effect.runPromise 运行 Effect 程序并对结果使用标准测试断言来测试 Effect 程序。
测试 Effect 代码很简单:
- Effects 是值 - 在测试中像任何其他值一样构建它们
- 运行以获取结果 - 使用
Effect.runPromise执行 - 正常断言 - 标准断言适用于结果
测试带服务的 Effects
规则: 提供服务的测试实现以使 Effect 程序可测试。
好例子:
import { describe, it, expect } from "vitest"
import { Effect, Context } from "effect"
// ============================================
// 1. 定义一个服务
// ============================================
class UserRepository extends Context.Tag("UserRepository")<
UserRepository,
{
readonly findById: (id: string) => Effect.Effect<User | null>
readonly save: (user: User) => Effect.Effect<void>
}
>() {}
interface User {
id: string
name: string
email: string
}
// ============================================
// 2. 使用该服务的代码
// ============================================
const getUser = (id: string) =>
Effect.gen(function* () {
const repo = yield* UserRepository
const user = yield* repo.findById(id)
if (!user) {
return yield* Effect.fail(new Error(`用户 ${id} 未找到`))
}
return user
})
const createUser = (name: string, email: string) =>
Effect.gen(function* () {
const repo = yield* UserRepository
const user: User = {
id: crypto.randomUUID(),
name,
email,
}
yield* repo.save(user)
return user
})
// ============================================
// 3. 创建测试实现
// ============================================
const makeTestUserRepository = (initialUsers: User[] = []) => {
const users = new Map(initialUsers.map(u => [u.id, u]))
return UserRepository.of({
findById: (id) => Effect.succeed(users.get(id) ?? null),
save: (user) => Effect.sync(() => { users.set(user.id, user) }),
})
}
// ============================================
// 4. 编写测试
// ============================================
describe("用户服务测试", () => {
it("应该找到现有用户", async () => {
const testUser: User = {
id: "123",
name: "Alice",
email: "alice@example.com",
}
const testRepo = makeTestUserRepository([testUser])
const result = await Effect.runPromise(
getUser("123").pipe(
Effect.provideService(UserRepository, testRepo)
)
)
expect(result).toEqual(testUser)
})
it("应该在用户未找到时失败", async () => {
const testRepo = makeTestUserRepository([])
await expect(
Effect.runPromise(
getUser("999").pipe(
Effect.provideService(UserRepository, testRepo)
)
)
).rejects.toThrow("用户 999 未找到")
})
it("应该创建并保存用户", async () => {
const savedUsers: User[] = []
const trackingRepo = UserRepository.of({
findById: () => Effect.succeed(null),
save: (user) => Effect.sync(() => { savedUsers.push(user) }),
})
const result = await Effect.runPromise(
createUser("Bob", "bob@example.com").pipe(
Effect.provideService(UserRepository, trackingRepo)
)
)
expect(result.name).toBe("Bob")
expect(result.email).toBe("bob@example.com")
expect(savedUsers).toHaveLength(1)
expect(savedUsers[0].name).toBe("Bob")
})
})
理由:
当测试需要服务的 Effects 时,使用 Effect.provideService 或测试层提供测试实现。
Effect 的服务模式使测试变得容易:
- 声明依赖 - Effects 指定它们需要什么
- 注入测试替身 - 为测试提供假实现
- 无模拟库 - 只需提供不同的服务实现
- 类型安全 - 编译器确保您提供所有依赖
🟡 中级模式
使用 Clock 访问当前时间
规则: 使用 Clock 服务获取当前时间,使用 TestClock 实现确定性测试。
好例子:
这个例子展示了一个检查令牌是否过期的函数。其逻辑依赖于 Clock,使其完全可测试。
import { Effect, Clock, Duration } from "effect";
interface Token {
readonly value: string;
readonly expiresAt: number; // UTC 毫秒
}
// 这个函数是纯的且可测试的,因为它依赖于 Clock
const isTokenExpired = (
token: Token
): Effect.Effect<boolean, never, Clock.Clock> =>
Clock.currentTimeMillis.pipe(
Effect.map((now) => now > token.expiresAt),
Effect.tap((expired) =>
Clock.currentTimeMillis.pipe(
Effect.flatMap((currentTime) =>
Effect.log(
`令牌过期? ${expired} (当前时间: ${new Date(currentTime).toISOString()})`
)
)
)
)
);
// 创建一个推进时间的测试时钟服务
const makeTestClock = (timeMs: number): Clock.Clock => ({
currentTimeMillis: Effect.succeed(timeMs),
currentTimeNanos: Effect.succeed(BigInt(timeMs * 1_000_000)),
sleep: (duration: Duration.Duration) => Effect.succeed(void 0),
unsafeCurrentTimeMillis: () => timeMs,
unsafeCurrentTimeNanos: () => BigInt(timeMs * 1_000_000),
[Clock.ClockTypeId]: Clock.ClockTypeId,
});
// 创建一个在1秒后过期的令牌
const token = { value: "abc", expiresAt: Date.now() + 1000 };
// 使用不同的时钟检查令牌过期
const program = Effect.gen(function* () {
// 使用当前时间检查
yield* Effect.log("使用当前时间检查...");
yield* isTokenExpired(token);
// 使用过去时间检查
yield* Effect.log("
使用过去时间检查(1分钟前)...");
const pastClock = makeTestClock(Date.now() - 60_000);
yield* isTokenExpired(token).pipe(
Effect.provideService(Clock.Clock, pastClock)
);
// 使用未来时间检查
yield* Effect.log("
使用未来时间检查(1小时后)...");
const futureClock = makeTestClock(Date.now() + 3600_000);
yield* isTokenExpired(token).pipe(
Effect.provideService(Clock.Clock, futureClock)
);
});
// 使用默认时钟运行程序
Effect.runPromise(
program.pipe(Effect.provideService(Clock.Clock, makeTestClock(Date.now())))
);
反模式:
在业务逻辑中直接调用 Date.now()。这创建了一个不可靠测试的不纯函数。
import { Effect } from "effect";
interface Token {
readonly expiresAt: number;
}
// ❌ 错误:这个函数的行为每毫秒都在变化。
const isTokenExpiredUnsafely = (token: Token): Effect.Effect<boolean> =>
Effect.sync(() => Date.now() > token.expiresAt);
// 测试这个函数需要复杂地模拟全局 API 或将是不可确定的。
理由:
每当在 Effect 中需要获取当前时间时,不要直接调用 Date.now()。相反,依赖于 Clock 服务并使用其方法之一,例如 Clock.currentTimeMillis。
直接调用 Date.now() 使您的代码不纯并紧密耦合到系统时钟。这使得测试变得困难和不可靠,因为函数的输出每次运行都会变化。
Clock 服务是 Effect 解决这个问题的方法。它是“当前时间”的抽象。
- 在 生产 中,默认的
LiveClock实现使用真实的系统时间。 - 在 测试 中,您可以提供
TestClock层。这为您提供了一个虚拟时钟,您可以手动控制,允许您将时间设置为特定值或按特定持续时间推进。
这使得任何时间相关的逻辑都是纯的、确定性的,并且易于以完美精度测试。
编写适应应用程序代码的测试
规则: 编写适应应用程序代码的测试。
好例子:
import { Effect } from "effect";
// 定义我们的类型
interface User {
id: number;
name: string;
}
class NotFoundError extends Error {
readonly _tag = "NotFoundError";
constructor(readonly id: number) {
super(`用户 ${id} 未找到`);
}
}
// 定义数据库服务接口
interface DatabaseServiceApi {
getUserById: (id: number) => Effect.Effect<User, NotFoundError>;
}
// 使用模拟数据实现服务
class DatabaseService extends Effect.Service<DatabaseService>()(
"DatabaseService",
{
sync: () => ({
getUserById: (id: number) => {
// 模拟数据库查找
if (id === 404) {
return Effect.fail(new NotFoundError(id));
}
return Effect.succeed({ id, name: `用户 ${id}` });
},
}),
}
) {}
// 用于测试的服务实现
class TestDatabaseService extends Effect.Service<TestDatabaseService>()(
"TestDatabaseService",
{
sync: () => ({
getUserById: (id: number) => {
// 具有可预测响应的测试数据
const testUsers = [
{ id: 1, name: "测试用户 1" },
{ id: 2, name: "测试用户 2" },
{ id: 123, name: "用户 123" },
];
const user = testUsers.find((u) => u.id === id);
if (user) {
return Effect.succeed(user);
}
return Effect.fail(new NotFoundError(id));
},
}),
}
) {}
// 使用数据库服务的业务逻辑
const getUserWithFallback = (id: number) =>
Effect.gen(function* () {
const db = yield* DatabaseService;
return yield* Effect.gen(function* () {
const user = yield* db.getUserById(id);
return user;
}).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
if (error instanceof NotFoundError) {
yield* Effect.logInfo(`用户 ${id} 未找到,使用回退`);
return { id, name: `回退用户 ${id}` };
}
return yield* Effect.fail(error);
})
)
);
});
// 创建展示服务的程序
const program = Effect.gen(function* () {
yield* Effect.logInfo(
"=== 编写适应应用程序代码的测试演示 ==="
);
const db = yield* DatabaseService;
// 示例 1: 成功的用户查找
yield* Effect.logInfo("
1. 查找现有用户 123...");
const user = yield* Effect.gen(function* () {
try {
return yield* db.getUserById(123);
} catch (error) {
yield* Effect.logError(
`获取用户失败: ${error instanceof Error ? error.message : "未知错误"}`
);
return { id: -1, name: "错误" };
}
});
yield* Effect.logInfo(`找到用户: ${JSON.stringify(user)}`);
// 示例 2: 处理不存在的用户,具有适当的错误处理
yield* Effect.logInfo("
2. 查找不存在的用户 404...");
const notFoundUser = yield* Effect.gen(function* () {
try {
return yield* db.getUserById(404);
} catch (error) {
if (error instanceof NotFoundError) {
yield* Effect.logInfo(
`✅ 正确处理了 NotFoundError: ${error.message}`
);
return { id: 404, name: "未找到" };
}
yield* Effect.logError(
`意外错误: ${error instanceof Error ? error.message : "未知错误"}`
);
return { id: -1, name: "错误" };
}
});
yield* Effect.logInfo(`结果: ${JSON.stringify(notFoundUser)}`);
// 示例 3: 具有回退的业务逻辑
yield* Effect.logInfo("
3. 对缺失用户具有回退的业务逻辑:");
const userWithFallback = yield* getUserWithFallback(999);
yield* Effect.logInfo(
`具有回退的用户: ${JSON.stringify(userWithFallback)}`
);
// 示例 4: 使用不同服务实现进行测试
yield* Effect.logInfo("
4. 使用测试服务实现进行测试:");
yield* Effect.provide(
Effect.gen(function* () {
const testDb = yield* TestDatabaseService;
// 测试现有用户
const testUser1 = yield* Effect.gen(function* () {
try {
return yield* testDb.getUserById(1);
} catch (error) {
yield* Effect.logError(
`测试失败: ${error instanceof Error ? error.message : "未知错误"}`
);
return { id: -1, name: "测试错误" };
}
});
yield* Effect.logInfo(`测试用户 1: ${JSON.stringify(testUser1)}`);
// 测试不存在的用户
const testUser404 = yield* Effect.gen(function* () {
try {
return yield* testDb.getUserById(404);
} catch (error) {
yield* Effect.logInfo(
`✅ 测试服务正确抛出了 NotFoundError: ${error instanceof Error ? error.message : "未知错误"}`
);
return { id: 404, name: "测试未找到" };
}
});
yield* Effect.logInfo(`测试结果: ${JSON.stringify(testUser404)}`);
}),
TestDatabaseService.Default
);
yield* Effect.logInfo(
"
✅ 适应应用程序代码的测试演示完成!"
);
yield* Effect.logInfo(
"相同的业务逻辑适用于不同的服务实现!"
);
});
// 使用默认数据库服务运行程序
Effect.runPromise(
Effect.provide(program, DatabaseService.Default) as Effect.Effect<
void,
never,
never
>
);
解释:
测试应该反映代码的真实接口和行为,而不是强迫其更改。
反模式:
任何测试强制更改应用程序代码的行为。不要修改服务文件以添加方法仅仅因为测试需要它。如果测试失败,修复测试。
理由:
测试是次要的工件,用于验证应用程序。应用程序的代码和接口是真理的来源。当测试失败时,修复测试的逻辑或设置,而不是生产代码。
在测试期间将应用程序代码视为不可变的,防止引入错误和虚假的测试信心。测试的目的是验证真实世界的行为;更改该行为以适应测试会使其目的无效。
在测试中使用自动生成的 .Default 层
规则: 在测试中使用自动生成的 .Default 层。
好例子:
import { Effect } from "effect";
// 使用 Effect.Service 模式定义 MyService
class MyService extends Effect.Service<MyService>()("MyService", {
sync: () => ({
doSomething: () =>
Effect.succeed("完成").pipe(
Effect.tap(() => Effect.log("MyService 做了某事!"))
),
}),
}) {}
// 创建使用 MyService 的程序
const program = Effect.gen(function* () {
yield* Effect.log("获取 MyService...");
const service = yield* MyService;
yield* Effect.log("调用 doSomething()...");
const result = yield* service.doSomething();
yield* Effect.log(`结果: ${result}`);
});
// 使用默认服务实现运行程序
Effect.runPromise(Effect.provide(program, MyService.Default));
解释:
这种方法确保您的测试是惯用的、可维护的,并充分利用 Effect 的依赖注入系统。
反模式:
不要在测试中为您的服务创建手动层(Layer.succeed(...))或尝试直接提供服务类。这绕过了预期的依赖注入机制。
理由:
在您的测试中,使用 Effect.Service 自动附加到您的服务类的静态 .Default 属性来提供服务依赖。
.Default 层是在测试环境中提供服务的规范方式。它是自动创建的、正确范围的,并处理解析任何传递依赖,使测试更清洁和更健壮。
在测试中模拟依赖
规则: 通过测试特定的 Layer 提供模拟服务实现,以隔离被测单元。
好例子:
我们想要测试一个使用 EmailClient 发送电子邮件的 Notifier 服务。在我们的测试中,我们提供一个不实际发送电子邮件但只返回成功值的模拟 EmailClient。
import { Effect, Layer } from "effect";
// --- 服务 ---
interface EmailClientService {
send: (address: string, body: string) => Effect.Effect<void>;
}
class EmailClient extends Effect.Service<EmailClientService>()("EmailClient", {
sync: () => ({
send: (address: string, body: string) =>
Effect.sync(() => Effect.log(`发送电子邮件到 ${address}: ${body}`)),
}),
}) {}
interface NotifierService {
notifyUser: (userId: number, message: string) => Effect.Effect<void>;
}
class Notifier extends Effect.Service<NotifierService>()("Notifier", {
effect: Effect.gen(function* () {
const emailClient = yield* EmailClient;
return {
notifyUser: (userId: number, message: string) =>
emailClient.send(`user-${userId}@example.com`, message),
};
}),
dependencies: [EmailClient.Default],
}) {}
// 创建使用 Notifier 服务的程序
const program = Effect.gen(function* () {
yield* Effect.log("使用默认 EmailClient 实现...");
const notifier = yield* Notifier;
yield* notifier.notifyUser(123, "您的发票已准备就绪。");
// 创建记录不同的模拟 EmailClient
yield* Effect.log("
使用模拟 EmailClient 实现...");
const mockEmailClient = Layer.succeed(EmailClient, {
send: (address: string, body: string) =>
// 直接返回 Effect.log,而不嵌套在 Effect.sync 中
Effect.log(`模拟: 将发送到 ${address},内容: ${body}`),
} as EmailClientService);
// 使用模拟客户端运行相同的通知
yield* Effect.gen(function* () {
const notifier = yield* Notifier;
yield* notifier.notifyUser(123, "您的发票已准备就绪。");
}).pipe(Effect.provide(mockEmailClient));
});
// 运行程序
Effect.runPromise(Effect.provide(program, Notifier.Default));
反模式:
使用其依赖项的“实时”实现测试您的业务逻辑。这创建了集成测试,而不是单元测试。它将缓慢、不可靠,并可能具有真实世界的副作用(如实际发送电子邮件)。
import { Effect } from "effect";
import { NotifierLive } from "./somewhere";
import { EmailClientLive } from "./somewhere"; // 真实的电子邮件客户端
// ❌ 错误:这个测试将尝试发送真实的电子邮件。
it("发送真实电子邮件", () =>
Effect.gen(function* () {
const notifier = yield* Notifier;
yield* notifier.notifyUser(123, "这是一封测试电子邮件!");
}).pipe(
Effect.provide(NotifierLive),
Effect.provide(EmailClientLive), // 使用实时层使其成为集成测试
Effect.runPromise
));
理由:
要隔离测试一段代码,识别其服务依赖并使用测试特定的 Layer 提供模拟实现。创建模拟层的最常见方法是使用 Layer.succeed(ServiceTag, mockImplementation)。
单元测试的主要目标是验证单个代码单元的逻辑,独立于其外部依赖。Effect 的依赖注入系统旨在使这变得容易和类型安全。
通过在测试中提供模拟 Layer,您用返回可预测数据的假依赖替换真实依赖(如进行网络调用的 HttpClient)。这提供了几个关键好处:
- 确定性: 您的测试总是产生相同的结果,不受网络或数据库连接的脆弱性影响。
- 速度: 测试立即运行,无需等待缓慢的 I/O 操作。
- 类型安全: TypeScript 编译器确保您的模拟实现完美匹配真实服务的接口,防止您的测试变得过时。
- 明确性: 测试设置清楚地记录了代码运行所需的所有依赖。
🟠 高级模式
将层组织成可组合的模块
规则: 将服务组织成模块化的层,这些层分层组合以管理大型应用程序中的复杂性。
好例子:
这个例子展示了一个带有 Logger 的 BaseLayer,一个使用 Logger 的 UserModule,以及一个将它们连接起来的最终 AppLayer。
1. 基础基础设施层
// src/core/Logger.ts
import { Effect } from "effect";
export class Logger extends Effect.Service<Logger>()("App/Core/Logger", {
sync: () => ({
log: (msg: string) => Effect.log(`[日志] ${msg}`),
}),
}) {}
// src/features/User/UserRepository.ts
export class UserRepository extends Effect.Service<UserRepository>()(
"App/User/UserRepository",
{
// 定义使用 Logger 的实现
effect: Effect.gen(function* () {
const logger = yield* Logger;
return {
findById: (id: number) =>
Effect.gen(function* () {
yield* logger.log(`查找用户 ${id}`);
return { id, name: `用户 ${id}` };
}),
};
}),
// 声明 Logger 依赖
dependencies: [Logger.Default],
}
) {}
// 使用示例
const program = Effect.gen(function* () {
const repo = yield* UserRepository;
const user = yield* repo.findById(1);
return user;
});
// 使用默认实现运行
Effect.runPromise(Effect.provide(program, UserRepository.Default));
const programWithLogging = Effect.gen(function* () {
const result = yield* program;
yield* Effect.log(`程序结果: ${JSON.stringify(result)}`);
return result;
});
Effect.runPromise(Effect.provide(programWithLogging, UserRepository.Default));
2. 功能模块层
// src/core/Logger.ts
import { Effect } from "effect";
export class Logger extends Effect.Service<Logger>()("App/Core/Logger", {
sync: () => ({
log: (msg: string) => Effect.sync(() => console.log(`[日志] ${msg}`)),
}),
}) {}
// src/features/User/UserRepository.ts
export class UserRepository extends Effect.Service<UserRepository>()(
"App/User/UserRepository",
{
// 定义使用 Logger 的实现
effect: Effect.gen(function* () {
const logger = yield* Logger;
return {
findById: (id: number) =>
Effect.gen(function* () {
yield* logger.log(`查找用户 ${id}`);
return { id, name: `用户 ${id}` };
}),
};
}),
// 声明 Logger 依赖
dependencies: [Logger.Default],
}
) {}
// 使用示例
const program = Effect.gen(function* () {
const repo = yield* UserRepository;
const user = yield* repo.findById(1);
return user;
});
// 使用默认实现运行
Effect.runPromise(Effect.provide(program, UserRepository.Default)).then(
console.log
);
3. 最终应用程序组合
// src/layers.ts
import { Layer } from "effect";
import { BaseLayer } from "./core";
import { UserModuleLive } from "./features/User";
// import { ProductModuleLive } from "./features/Product";
const AllModules = Layer.mergeAll(UserModuleLive /*, ProductModuleLive */);
// 一次性向所有模块提供 BaseLayer,创建一个自包含的 AppLayer。
export const AppLayer = Layer.provide(AllModules, BaseLayer);
反模式:
大型应用程序的扁平组合策略。虽然起初简单,但很快变得难以管理。
// ❌ 这个文件在大型项目中变得庞大且难以导航。
const AppLayer = Layer.mergeAll(
LoggerLive,
ConfigLive,
DatabaseLive,
TracerLive,
UserServiceLive,
UserRepositoryLive,
ProductServiceLive,
ProductRepositoryLive,
BillingServiceLive
// ...以及 50 个其他服务
);
理由:
对于大型应用程序,避免单个扁平的服務列表。相反,通过创建分层结构您的应用程序:
BaseLayer: 提供应用程序范围的基础设施(Logger, Config, Database)。FeatureModule层: 提供特定业务域的服务(例如,UserModule,ProductModule)。这些依赖于BaseLayer。AppLayer: 通过向功能模块提供BaseLayer来组合它们的顶级层。
随着应用程序的增长,将所有服务合并到一个巨型层中的扁平组合策略变得笨拙且难以推理。可组合模块模式通过引入结构来解决这个问题。
这种方法创建了一个干净、可扩展和高度可测试的架构,其中复杂性包含在每个模块内。顶级组合成为应用程序架构的清晰、高级图表,并且功能模块可以通过向它们提供模拟的 BaseLayer 来隔离测试。
测试流式 Effects
规则: 使用 Stream.runCollect 和断言来验证流行为。
好例子:
import { describe, it, expect } from "vitest"
import { Effect, Stream, Chunk, Ref } from "effect"
describe("流测试", () => {
// ============================================
// 1. 测试基本流操作
// ============================================
it("应该转换流元素", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.map((n) => n * 2),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([2, 4, 6, 8, 10])
})
it("应该过滤流元素", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5, 6]).pipe(
Stream.filter((n) => n % 2 === 0),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([2, 4, 6])
})
// ============================================
// 2. 测试流聚合
// ============================================
it("应该将流折叠为单个值", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.runFold(0, (acc, n) => acc + n)
)
)
expect(result).toBe(15)
})
it("应该计数流元素", async () => {
const count = await Effect.runPromise(
Stream.fromIterable(["a", "b", "c", "d"]).pipe(
Stream.runCount
)
)
expect(count).toBe(4)
})
// ============================================
// 3. 测试流中的错误处理
// ============================================
it("应该捕获流中的错误", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3]).pipe(
Stream.mapEffect((n) =>
n === 2
? Effect.fail(new Error("在 2 上失败"))
: Effect.succeed(n * 10)
),
Stream.catchAll((error) =>
Stream.succeed(-1) // 用哨兵替换错误
),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([10, -1])
})
it("应该处理错误并使用 orElse 继续", async () => {
const failingStream = Stream.fail(new Error("主要失败"))
const fallbackStream = Stream.fromIterable([1, 2, 3])
const result = await Effect.runPromise(
failingStream.pipe(
Stream.orElse(() => fallbackStream),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([1, 2, 3])
})
// ============================================
// 4. 测试流分块
// ============================================
it("应该将流元素分块", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.grouped(2),
Stream.runCollect
)
)
const chunks = Chunk.toReadonlyArray(result).map(Chunk.toReadonlyArray)
expect(chunks).toEqual([[1, 2], [3, 4], [5]])
})
// ============================================
// 5. 测试带效果的流
// ============================================
it("应该为每个元素运行效果", async () => {
const processed: number[] = []
await Effect.runPromise(
Stream.fromIterable([1, 2, 3]).pipe(
Stream.tap((n) =>
Effect.sync(() => {
processed.push(n)
})
),
Stream.runDrain
)
)
expect(processed).toEqual([1, 2, 3])
})
// ============================================
// 6. 测试流资源管理
// ============================================
it("应该在完成时释放资源", async () => {
const acquired: string[] = []
const released: string[] = []
const managedStream = Stream.acquireRelease(
Effect.gen(function* () {
acquired.push("资源")
return "资源"
}),
() =>
Effect.sync(() => {
released.push("资源")
})
).pipe(
Stream.flatMap(() => Stream.fromIterable([1, 2, 3]))
)
await Effect.runPromise(Stream.runDrain(managedStream))
expect(acquired).toEqual(["资源"])
expect(released).toEqual(["资源"])
})
it("应该在错误时释放资源", async () => {
const released: string[] = []
const managedStream = Stream.acquireRelease(
Effect.succeed("资源"),
() => Effect.sync(() => { released.push("已释放") })
).pipe(
Stream.flatMap(() =>
Stream.fromEffect(Effect.fail(new Error("哎呀")))
)
)
await Effect.runPromise(
Stream.runDrain(managedStream).pipe(
Effect.catchAll(() => Effect.void)
)
)
expect(released).toEqual(["已释放"])
})
// ============================================
// 7. 测试流时序与 take/drop
// ============================================
it("应该获取前 N 个元素", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.take(3),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([1, 2, 3])
})
it("应该丢弃前 N 个元素", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.drop(2),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([3, 4, 5])
})
// ============================================
// 8. 测试流合并
// ============================================
it("应该合并流", async () => {
const stream1 = Stream.fromIterable([1, 3, 5])
const stream2 = Stream.fromIterable([2, 4, 6])
const result = await Effect.runPromise(
Stream.merge(stream1, stream2).pipe(
Stream.runCollect
)
)
const array = Chunk.toReadonlyArray(result)
expect(array).toHaveLength(6)
expect(array).toContain(1)
expect(array).toContain(6)
})
})
理由:
通过收集结果并验证转换、错误处理和资源管理来测试流。
流测试验证:
- 转换 - map, filter, flatMap 正常工作
- 错误处理 - 失败被捕获和处理
- 资源安全 - 资源被释放
- 背压 - 数据流被控制
测试并发代码
规则: 使用 TestClock 和受控并发性使并发测试具有确定性。
好例子:
import { describe, it, expect } from "vitest"
import { Effect, Fiber, Ref, TestClock, Duration, Deferred } from "effect"
describe("并发代码测试", () => {
// ============================================
// 1. 测试并行执行
// ============================================
it("应该并行运行效果", async () => {
const executionOrder: string[] = []
const task1 = Effect.gen(function* () {
yield* Effect.sleep("100 毫秒")
executionOrder.push("任务1")
return 1
})
const task2 = Effect.gen(function* () {
yield* Effect.sleep("50 毫秒")
executionOrder.push("任务2")
return 2
})
const program = Effect.all([task1, task2], { concurrency: 2 })
// 使用 TestClock 控制时间
const result = await Effect.runPromise(
Effect.gen(function* () {
const fiber = yield* Effect.fork(program)
// 推进时间以触发两个任务
yield* TestClock.adjust("100 毫秒")
return yield* Fiber.join(fiber)
}).pipe(Effect.provide(TestClock.live))
)
expect(result).toEqual([1, 2])
// 使用真实时间,任务2会先完成
expect(executionOrder).toContain("任务1")
expect(executionOrder).toContain("任务2")
})
// ============================================
// 2. 测试竞争条件
// ============================================
it("应该正确处理竞争条件", async () => {
const counter = await Effect.runPromise(
Effect.gen(function* () {
const ref = yield* Ref.make(0)
// 模拟并发增量
const increment = Ref.update(ref, (n) => n + 1)
// 运行 100 个并发增量
yield* Effect.all(
Array.from({ length: 100 }, () => increment),
{ concurrency: "unbounded" }
)
return yield* Ref.get(ref)
})
)
// Ref 是原子的,所以所有增量都应该被计数
expect(counter).toBe(100)
})
// ============================================
// 3. 测试受控光纤执行
// ============================================
it("应该测试光纤生命周期", async () => {
const events: string[] = []
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
Effect.gen(function* () {
events.push("已启动")
yield* Effect.sleep("1 秒")
events.push("已完成")
return "结果"
})
)
events.push("已分支")
// 中断光纤
yield* Fiber.interrupt(fiber)
events.push("已中断")
const exit = yield* Fiber.await(fiber)
return exit
})
await Effect.runPromise(program)
expect(events).toEqual(["已分支", "已启动", "已中断"])
expect(events).not.toContain("已完成")
})
// ============================================
// 4. 测试超时行为
// ============================================
it("应该对慢操作超时", async () => {
const slowOperation = Effect.gen(function* () {
yield* Effect.sleep("10 秒")
return "已完成"
})
const result = await Effect.runPromise(
Effect.gen(function* () {
const fiber = yield* Effect.fork(
slowOperation.pipe(Effect.timeout("1 秒"))
)
// 超过超时时间
yield* TestClock.adjust("2 秒")
return yield* Fiber.join(fiber)
}).pipe(Effect.provide(TestClock.live))
)
// 由于超时,结果是 Option.None
expect(result._tag).toBe("None")
})
// ============================================
// 5. 使用 Deferred 进行同步测试
// ============================================
it("应该正确同步光纤", async () => {
const result = await Effect.runPromise(
Effect.gen(function* () {
const deferred = yield* Deferred.make<string>()
const results: string[] = []
// 消费者等待生产者
const consumer = Effect.fork(
Effect.gen(function* () {
const value = yield* Deferred.await(deferred)
results.push(`已消费: ${value}`)
})
)
// 生产者完成 deferred
const producer = Effect.gen(function* () {
results.push("生产中")
yield* Deferred.succeed(deferred, "数据")
results.push("已生产")
})
yield* consumer
yield* producer
// 等待消费者处理
yield* Effect.sleep("10 毫秒")
return results
})
)
expect(result).toContain("生产中")
expect(result).toContain("已生产")
expect(result).toContain("已消费: 数据")
})
// ============================================
// 6. 测试无死锁
// ============================================
it("应该使用适当的资源顺序避免死锁", async () => {
const result = await Effect.runPromise(
Effect.gen(function* () {
const ref1 = yield* Ref.make(0)
const ref2 = yield* Ref.make(0)
// 两个光纤以相同顺序访问 refs(无死锁)
const fiber1 = yield* Effect.fork(
Effect.gen(function* () {
yield* Ref.update(ref1, (n) => n + 1)
yield* Ref.update(ref2, (n) => n + 1)
})
)
const fiber2 = yield* Effect.fork(
Effect.gen(function* () {
yield* Ref.update(ref1, (n) => n + 1)
yield* Ref.update(ref2, (n) => n + 1)
})
)
yield* Fiber.join(fiber1)
yield* Fiber.join(fiber2)
return [yield* Ref.get(ref1), yield* Ref.get(ref2)]
}).pipe(Effect.timeout("1 秒"))
)
expect(result._tag).toBe("Some")
expect(result.value).toEqual([2, 2])
})
})
理由:
使用 Effect 的 TestClock 和光纤控制使并发测试具有确定性和可重复性。
并发代码难以测试:
- 非确定性 - 不同运行,不同结果
- 竞争条件 - 时间相关的错误
- 死锁 - 难以重现
- 不稳定的测试 - 有时通过,有时失败
Effect 的测试实用程序提供对时序和并发性的控制。
使用 Effect 进行基于属性的测试
规则: 使用基于属性的测试来发现您的基于示例的测试遗漏的边缘情况。
好例子:
import { describe, it, expect } from "vitest"
import { Effect, Option, Either, Schema } from "effect"
import * as fc from "fast-check"
describe("使用 Effect 进行基于属性的测试", () => {
// ============================================
// 1. 测试纯函数属性
// ============================================
it("应该满足数组反转属性", () => {
fc.assert(
fc.property(fc.array(fc.integer()), (arr) => {
// 反转两次返回原始
const reversed = arr.slice().reverse()
const doubleReversed = reversed.slice().reverse()
return JSON.stringify(arr) === JSON.stringify(doubleReversed)
})
)
})
it("应该满足排序幂等性", () => {
fc.assert(
fc.property(fc.array(fc.integer()), (arr) => {
const sorted = arr.slice().sort((a, b) => a - b)
const sortedTwice = sorted.slice().sort((a, b) => a - b)
return JSON.stringify(sorted) === JSON.stringify(sortedTwice)
})
)
})
// ============================================
// 2. 测试 Effect 操作
// ============================================
it("应该先 map 然后 flatMap 等于 flatMap 内部映射", async () => {
await fc.assert(
fc.asyncProperty(fc.integer(), async (n) => {
const f = (x: number) => x * 2
const g = (x: number) => Effect.succeed(x + 1)
// map 然后 flatMap
const result1 = await Effect.runPromise(
Effect.succeed(n).pipe(
Effect.map(f),
Effect.flatMap(g)
)
)
// flatMap 内部映射
const result2 = await Effect.runPromise(
Effect.succeed(n).pipe(
Effect.flatMap((x) => g(f(x)))
)
)
return result1 === result2
})
)
})
// ============================================
// 3. 测试 Option 属性
// ============================================
it("应该满足 Option map 恒等性", () => {
fc.assert(
fc.property(fc.option(fc.integer(), { nil: undefined }), (maybeN) => {
const option = maybeN === undefined ? Option.none() : Option.some(maybeN)
// 映射恒等函数返回相同的 Option
const mapped = Option.map(option, (x) => x)
return Option.getOrElse(option, () => -1) ===
Option.getOrElse(mapped, () => -1)
})
)
})
// ============================================
// 4. 测试 Schema 编码/解码往返
// ============================================
it("应该通过 Schema 往返", async () => {
const UserSchema = Schema.Struct({
name: Schema.String,
age: Schema.Number.pipe(Schema.int(), Schema.positive()),
})
const userArbitrary = fc.record({
name: fc.string({ minLength: 1 }),
age: fc.integer({ min: 1, max: 120 }),
})
await fc.assert(
fc.asyncProperty(userArbitrary, async (user) => {
const encode = Schema.encode(UserSchema)
const decode = Schema.decode(UserSchema)
// 编码然后解码应该返回等效值
const encoded = await Effect.runPromise(encode(user))
const decoded = await Effect.runPromise(decode(encoded))
return decoded.name === user.name && decoded.age === user.age
})
)
})
// ============================================
// 5. 测试错误处理属性
// ============================================
it("应该从任何错误恢复", async () => {
await fc.assert(
fc.asyncProperty(
fc.string(),
fc.string(),
async (errorMsg, fallback) => {
const failing = Effect.fail(new Error(errorMsg))
const result = await Effect.runPromise(
failing.pipe(
Effect.catchAll(() => Effect.succeed(fallback))
)
)
return result === fallback
}
)
)
})
// ============================================
// 6. 域类型的自定义生成器
// ============================================
interface Email {
readonly _tag: "Email"
readonly value: string
}
const emailArbitrary = fc.emailAddress().map((value): Email => ({
_tag: "Email",
value,
}))
interface UserId {
readonly _tag: "UserId"
readonly value: string
}
const userIdArbitrary = fc.uuid().map((value): UserId => ({
_tag: "UserId",
value,
}))
it("应该正确处理域类型", () => {
fc.assert(
fc.property(emailArbitrary, userIdArbitrary, (email, userId) => {
// 使用生成的域类型测试您的域函数
return email.value.includes("@") && userId.value.length > 0
})
)
})
// ============================================
// 7. 测试代数属性
// ============================================
it("应该满足字符串连接的幺半群属性", () => {
const empty = ""
const concat = (a: string, b: string) => a + b
fc.assert(
fc.property(fc.string(), fc.string(), fc.string(), (a, b, c) => {
// 恒等性: empty + a = a = a + empty
const leftIdentity = concat(empty, a) === a
const rightIdentity = concat(a, empty) === a
// 结合性: (a + b) + c = a + (b + c)
const associative = concat(concat(a, b), c) === concat(a, concat(b, c))
return leftIdentity && rightIdentity && associative
})
)
})
// ============================================
// 8. 测试带约束
// ============================================
it("应该处理正数", () => {
fc.assert(
fc.property(
fc.integer({ min: 1, max: 1000000 }),
fc.integer({ min: 1, max: 1000000 }),
(a, b) => {
// 正数的除法是正数
const result = a / b
return result > 0
}
)
)
})
})
理由:
使用 fast-check 进行基于属性的测试以测试不变量并自动发现边缘情况。
基于属性的测试发现基于示例的测试遗漏的错误:
- 边缘情况 - 空数组、负数、Unicode
- 不变量 - 应始终持有的属性
- 收缩 - 最小的失败示例
- 覆盖 - 从一个测试中获得许多输入