集成测试与TestContainers
何时使用此技能
使用此技能时:
- 编写需要真实基础设施(数据库、缓存、消息队列)的集成测试
- 对实际数据库的数据访问层进行测试
- 验证消息队列集成
- 测试Redis缓存行为
- 避免对基础设施组件使用模拟
- 确保测试在类似生产的环境中工作
- 测试数据库迁移和架构变更
核心原则
- 真实基础设施优于模拟 - 使用容器中的实际数据库/服务,而不是模拟
- 测试隔离 - 每个测试获得新的容器或新数据
- 自动清理 - TestContainers处理容器生命周期和清理
- 快速启动 - 在同一个类中的测试之间重用容器,当适当时
- CI/CD兼容 - 在Docker支持的CI环境中无缝工作
- 端口随机化 - 容器使用随机端口以避免冲突
为什么选择TestContainers而不是模拟?
❌ 模拟基础设施的问题
// 糟糕:模拟数据库
public class OrderRepositoryTests
{
private readonly Mock<IDbConnection> _mockDb = new();
[Fact]
public async Task GetOrder_ReturnsOrder()
{
// 这不会测试实际的SQL行为、约束或性能
_mockDb.Setup(db => db.QueryAsync<Order>(It.IsAny<string>()))
.ReturnsAsync(new[] { new Order { Id = 1 } });
var repo = new OrderRepository(_mockDb.Object);
var order = await repo.GetOrderAsync(1);
Assert.NotNull(order);
}
}
问题:
- 不测试实际的SQL查询
- 错过数据库约束、索引和性能问题
- 可能给出错误的自信
- 不会捕获SQL语法错误或架构不匹配
✅ 更好:使用真实数据库的TestContainers
// 好:针对真实数据库进行测试
public class OrderRepositoryTests : IAsyncLifetime
{
private readonly TestcontainersContainer _dbContainer;
private IDbConnection _connection;
public OrderRepositoryTests()
{
_dbContainer = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("mcr.microsoft.com/mssql/server:2022-latest")
.WithEnvironment("ACCEPT_EULA", "Y")
.WithEnvironment("SA_PASSWORD", "Your_password123")
.WithPortBinding(1433, true)
.Build();
}
public async Task InitializeAsync()
{
await _dbContainer.StartAsync();
var port = _dbContainer.GetMappedPublicPort(1433);
var connectionString = $"Server=localhost,{port};Database=TestDb;User Id=sa;Password=Your_password123;TrustServerCertificate=true";
_connection = new SqlConnection(connectionString);
await _connection.OpenAsync();
// 运行迁移
await RunMigrationsAsync(_connection);
}
public async Task DisposeAsync()
{
await _connection.DisposeAsync();
await _dbContainer.DisposeAsync();
}
[Fact]
public async Task GetOrder_WithRealDatabase_ReturnsOrder()
{
// 安排:插入真实的测试数据
await _connection.ExecuteAsync(
"INSERT INTO Orders (Id, CustomerId, Total) VALUES (1, 'CUST1', 100.00)");
var repo = new OrderRepository(_connection);
// 行动:对真实数据库执行
var order = await repo.GetOrderAsync(1);
// 断言:验证实际数据库行为
Assert.NotNull(order);
Assert.Equal(1, order.Id);
Assert.Equal("CUST1", order.CustomerId);
Assert.Equal(100.00m, order.Total);
}
}
好处:
- 测试真实的SQL查询和数据库行为
- 捕获约束违规、索引问题和性能问题
- 验证迁移是否正确工作
- 对数据访问层给出真正的信心
必需的NuGet包
<ItemGroup>
<PackageReference Include="Testcontainers" Version="*" />
<PackageReference Include="xunit" Version="*" />
<PackageReference Include="xunit.runner.visualstudio" Version="*" />
<!-- 数据库特定包 -->
<PackageReference Include="Microsoft.Data.SqlClient" Version="*" />
<PackageReference Include="Npgsql" Version="*" /> <!-- 对于PostgreSQL -->
<PackageReference Include="MySqlConnector" Version="*" /> <!-- 对于MySQL -->
<!-- 其他基础设施 -->
<PackageReference Include="StackExchange.Redis" Version="*" /> <!-- 对于Redis -->
<PackageReference Include="RabbitMQ.Client" Version="*" /> <!-- 对于RabbitMQ -->
</ItemGroup>
模式1:SQL Server集成测试
using Testcontainers;
using Xunit;
public class SqlServerTests : IAsyncLifetime
{
private readonly TestcontainersContainer _dbContainer;
private IDbConnection _db;
public SqlServerTests()
{
_dbContainer = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("mcr.microsoft.com/mssql/server:2022-latest")
.WithEnvironment("ACCEPT_EULA", "Y")
.WithEnvironment("SA_PASSWORD", "Your_password123")
.WithPortBinding(1433, true)
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(1433))
.Build();
}
public async Task InitializeAsync()
{
await _dbContainer.StartAsync();
var port = _dbContainer.GetMappedPublicPort(1433);
var connectionString = $"Server=localhost,{port};Database=master;User Id=sa;Password=Your_password123;TrustServerCertificate=true";
_db = new SqlConnection(connectionString);
await _db.OpenAsync();
// 创建测试数据库
await _db.ExecuteAsync("CREATE DATABASE TestDb");
await _db.ExecuteAsync("USE TestDb");
// 运行架构迁移
await _db.ExecuteAsync(@"
CREATE TABLE Orders (
Id INT PRIMARY KEY,
CustomerId NVARCHAR(50) NOT NULL,
Total DECIMAL(18,2) NOT NULL,
CreatedAt DATETIME2 DEFAULT GETUTCDATE()
)");
}
public async Task DisposeAsync()
{
await _db.DisposeAsync();
await _dbContainer.DisposeAsync();
}
[Fact]
public async Task CanInsertAndRetrieveOrder()
{
// 安排
await _db.ExecuteAsync(@"
INSERT INTO Orders (Id, CustomerId, Total)
VALUES (1, 'CUST001', 99.99)");
// 行动
var order = await _db.QuerySingleAsync<Order>(
"SELECT * FROM Orders WHERE Id = @Id",
new { Id = 1 });
// 断言
Assert.Equal(1, order.Id);
Assert.Equal("CUST001", order.CustomerId);
Assert.Equal(99.99m, order.Total);
}
}
模式2:PostgreSQL集成测试
public class PostgreSqlTests : IAsyncLifetime
{
private readonly TestcontainersContainer _dbContainer;
private NpgsqlConnection _connection;
public PostgreSqlTests()
{
_dbContainer = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("postgres:latest")
.WithEnvironment("POSTGRES_PASSWORD", "postgres")
.WithEnvironment("POSTGRES_DB", "testdb")
.WithPortBinding(5432, true)
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(5432))
.Build();
}
public async Task InitializeAsync()
{
await _dbContainer.StartAsync();
var port = _dbContainer.GetMappedPublicPort(5432);
var connectionString = $"Host=localhost;Port={port};Database=testdb;Username=postgres;Password=postgres";
_connection = new NpgsqlConnection(connectionString);
await _connection.OpenAsync();
// 创建架构
await _connection.ExecuteAsync(@"
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL,
total NUMERIC(10,2) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
)");
}
public async Task DisposeAsync()
{
await _connection.DisposeAsync();
await _dbContainer.DisposeAsync();
}
[Fact]
public async Task PostgreSql_ShouldHandleTransactions()
{
using var transaction = await _connection.BeginTransactionAsync();
await _connection.ExecuteAsync(
"INSERT INTO orders (customer_id, total) VALUES (@CustomerId, @Total)",
new { CustomerId = "CUST1", Total = 100.00m },
transaction);
await transaction.RollbackAsync();
var count = await _connection.QuerySingleAsync<int>(
"SELECT COUNT(*) FROM orders");
Assert.Equal(0, count); // 回滚应防止插入
}
}
模式3:Redis集成测试
public class RedisTests : IAsyncLifetime
{
private readonly TestcontainersContainer _redisContainer;
private IConnectionMultiplexer _redis;
public RedisTests()
{
_redisContainer = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("redis:alpine")
.WithPortBinding(6379, true)
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(6379))
.Build();
}
public async Task InitializeAsync()
{
await _redisContainer.StartAsync();
var port = _redisContainer.GetMappedPublicPort(6379);
_redis = await ConnectionMultiplexer.ConnectAsync($"localhost:{port}");
}
public async Task DisposeAsync()
{
await _redis.DisposeAsync();
await _redisContainer.DisposeAsync();
}
[Fact]
public async Task Redis_ShouldCacheValues()
{
var db = _redis.GetDatabase();
// 设置值
await db.StringSetAsync("key1", "value1");
// 获取值
var value = await db.StringGetAsync("key1");
Assert.Equal("value1", value.ToString());
}
[Fact]
public async Task Redis_ShouldExpireKeys()
{
var db = _redis.GetDatabase();
await db.StringSetAsync("temp-key", "temp-value",
expiry: TimeSpan.FromSeconds(1));
// 密钥应该存在
Assert.True(await db.KeyExistsAsync("temp-key"));
// 等待过期
await Task.Delay(1100);
// 密钥应该消失
Assert.False(await db.KeyExistsAsync("temp-key"));
}
}
模式4:RabbitMQ集成测试
public class RabbitMqTests : IAsyncLifetime
{
private readonly TestcontainersContainer _rabbitContainer;
private IConnection _connection;
public RabbitMqTests()
{
_rabbitContainer = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("rabbitmq:management-alpine")
.WithPortBinding(5672, true) // AMQP
.WithPortBinding(15672, true) // 管理UI
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(5672))
.Build();
}
public async Task InitializeAsync()
{
await _rabbitContainer.StartAsync();
var port = _rabbitContainer.GetMappedPublicPort(5672);
var factory = new ConnectionFactory
{
HostName = "localhost",
Port = port,
UserName = "guest",
Password = "guest"
};
_connection = await factory.CreateConnectionAsync();
}
public async Task DisposeAsync()
{
await _connection.CloseAsync();
await _rabbitContainer.DisposeAsync();
}
[Fact]
public async Task RabbitMq_ShouldPublishAndConsumeMessage()
{
using var channel = await _connection.CreateChannelAsync();
var queueName = "test-queue";
await channel.QueueDeclareAsync(queueName, durable: false,
exclusive: false, autoDelete: true);
// 发布消息
var message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: "",
routingKey: queueName,
body: body);
// 消费消息
var consumer = new EventingBasicConsumer(channel);
var tcs = new TaskCompletionSource<string>();
consumer.Received += (model, ea) =>
{
var receivedMessage = Encoding.UTF8.GetString(ea.Body.ToArray());
tcs.SetResult(receivedMessage);
};
await channel.BasicConsumeAsync(queueName, autoAck: true,
consumer: consumer);
// 等待消息
var received = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.Equal(message, received);
}
}
模式5:多容器网络
当你需要多个容器通信时:
public class MultiContainerTests : IAsyncLifetime
{
private readonly INetwork _network;
private readonly TestcontainersContainer _dbContainer;
private readonly TestcontainersContainer _redisContainer;
public MultiContainerTests()
{
_network = new TestcontainersNetworkBuilder()
.Build();
_dbContainer = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("postgres:latest")
.WithNetwork(_network)
.WithNetworkAliases("db")
.WithEnvironment("POSTGRES_PASSWORD", "postgres")
.Build();
_redisContainer = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("redis:alpine")
.WithNetwork(_network)
.WithNetworkAliases("redis")
.Build();
}
public async Task InitializeAsync()
{
await _network.CreateAsync();
await Task.WhenAll(
_dbContainer.StartAsync(),
_redisContainer.StartAsync());
}
public async Task DisposeAsync()
{
await Task.WhenAll(
_dbContainer.DisposeAsync().AsTask(),
_redisContainer.DisposeAsync().AsTask());
await _network.DisposeAsync();
}
[Fact]
public async Task Containers_CanCommunicate()
{
// 两个容器都可以通过网络别名相互到达
// db -> redis://redis:6379
// redis -> postgres://db:5432
}
}
模式6:跨测试重用容器
为了更快的测试执行,跨测试在类中重用容器:
[Collection("Database collection")]
public class FastDatabaseTests
{
private readonly DatabaseFixture _fixture;
public FastDatabaseTests(DatabaseFixture fixture)
{
_fixture = fixture;
}
[Fact]
public async Task Test1()
{
// 使用 _fixture.Connection
// 如果需要,在测试后清理数据
}
[Fact]
public async Task Test2()
{
// 重用同一个容器
}
}
// 共享固定装置
public class DatabaseFixture : IAsyncLifetime
{
private readonly TestcontainersContainer _container;
public IDbConnection Connection { get; private set; }
public DatabaseFixture()
{
_container = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("mcr.microsoft.com/mssql/server:2022-latest")
.WithEnvironment("ACCEPT_EULA", "Y")
.WithEnvironment("SA_PASSWORD", "Your_password123")
.WithPortBinding(1433, true)
.Build();
}
public async Task InitializeAsync()
{
await _container.StartAsync();
// 设置连接
}
public async Task DisposeAsync()
{
await Connection.DisposeAsync();
await _container.DisposeAsync();
}
}
[CollectionDefinition("Database collection")]
public class DatabaseCollection : ICollectionFixture<DatabaseFixture> { }
模式7:使用真实数据库测试迁移
public class MigrationTests : IAsyncLifetime
{
private readonly TestcontainersContainer _container;
private string _connectionString;
public async Task InitializeAsync()
{
_container = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("mcr.microsoft.com/mssql/server:2022-latest")
.WithEnvironment("ACCEPT_EULA", "Y")
.WithEnvironment("SA_PASSWORD", "Your_password123")
.WithPortBinding(1433, true)
.Build();
await _container.StartAsync();
var port = _container.GetMappedPublicPort(1433);
_connectionString = $"Server=localhost,{port};Database=TestDb;User Id=sa;Password=Your_password123;TrustServerCertificate=true";
}
[Fact]
public async Task Migrations_ShouldRunSuccessfully()
{
// 运行Entity Framework迁移
var optionsBuilder = new DbContextOptionsBuilder<AppDbContext>();
optionsBuilder.UseSqlServer(_connectionString);
using var context = new AppDbContext(optionsBuilder.Options);
// 应用迁移
await context.Database.MigrateAsync();
// 验证架构
var canConnect = await context.Database.CanConnectAsync();
Assert.True(canConnect);
// 验证表存在
var tables = await context.Database.SqlQueryRaw<string>(
"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES").ToListAsync();
Assert.Contains("Orders", tables);
Assert.Contains("Customers", tables);
}
public async Task DisposeAsync()
{
await _container.DisposeAsync();
}
}
最佳实践
- 始终使用IAsyncLifetime - 适当的异步设置和拆除
- 等待端口可用性 - 使用
WaitStrategy确保容器已准备好 - 使用随机端口 - 让TestContainers自动分配端口
- 在测试之间清理数据 - 使用新的容器或截断表
- 尽可能重用容器 - 比创建新容器更快
- 测试真实查询 - 不要只测试模拟;验证实际的SQL行为
- 验证约束 - 测试外键、唯一约束、索引
- 测试事务 - 验证回滚和提交行为
- 使用现实数据 - 用类似生产的数据量进行测试
- 处理清理 - 总是在
DisposeAsync中处置容器
常见问题和解决方案
问题1:容器启动超时
问题: 容器启动时间过长
解决方案:
_container = new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("postgres:latest")
.WithWaitStrategy(Wait.ForUnixContainer()
.UntilPortIsAvailable(5432)
.WithTimeout(TimeSpan.FromMinutes(2)))
.Build();
问题2:端口已在使用中
问题: 测试因端口已绑定而失败
解决方案: 始终使用随机端口映射:
.WithPortBinding(5432, true) // true = 分配随机公共端口
问题3:容器未清理
问题: 容器在测试后仍在运行
解决方案: 确保正确处置:
public async Task DisposeAsync()
{
await _connection?.DisposeAsync();
await _container?.DisposeAsync();
}
问题4:CI中的测试失败但在本地通过
问题: CI环境没有Docker
解决方案: 确保CI支持Docker:
# GitHub Actions
runs-on: ubuntu-latest # 预安装Docker
services:
docker:
image: docker:dind
CI/CD集成
GitHub Actions
name: 集成测试
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest # 预安装Docker
steps:
- uses: actions/checkout@v3
- name: 设置.NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: 9.0.x
- name: 运行集成测试
run: |
dotnet test tests/YourApp.IntegrationTests \
--filter Category=Integration \
--logger trx
- name: 清理容器
if: always()
run: docker container prune -f
模式8:使用Respawn重置数据库
当跨测试重用容器时,使用Respawn在测试之间重置数据库状态,而不是重新创建容器:
<PackageReference Include="Respawn" Version="*" />
基本Respawn设置
using Respawn;
public class DatabaseFixture : IAsyncLifetime
{
private readonly TestcontainersContainer _container;
private Respawner _respawner = null!;
public NpgsqlConnection Connection { get; private set; } = null!;
public string ConnectionString { get; private set; } = null!;
public async Task InitializeAsync()
{
await _container.StartAsync();
var port = _container.GetMappedPublicPort(5432);
ConnectionString = $"Host=localhost;Port={port};Database=testdb;Username=postgres;Password=postgres";
Connection = new NpgsqlConnection(ConnectionString);
await Connection.OpenAsync();
// 首先运行迁移
await RunMigrationsAsync();
// 在架构存在后创建respawner
_respawner = await Respawner.CreateAsync(ConnectionString, new RespawnerOptions
{
TablesToIgnore = new Table[]
{
"__EFMigrationsHistory", // EF Core迁移表
"AspNetRoles", // 身份角色(种子数据)
"schema_version" // DbUp/Flyway版本表
},
DbAdapter = DbAdapter.Postgres
});
}
/// <summary>
/// 重置数据库为清洁状态。在测试设置或测试之间调用此方法。
/// </summary>
public async Task ResetDatabaseAsync()
{
await _respawner.ResetAsync(ConnectionString);
}
public async Task DisposeAsync()
{
await Connection.DisposeAsync();
await _container.DisposeAsync();
}
}
在测试中使用Respawn
[Collection("Database collection")]
public class OrderTests : IAsyncLifetime
{
private readonly DatabaseFixture _fixture;
public OrderTests(DatabaseFixture fixture)
{
_fixture = fixture;
}
public async Task InitializeAsync()
{
// 每个测试前重置数据库
await _fixture.ResetDatabaseAsync();
}
public Task DisposeAsync() => Task.CompletedTask;
[Fact]
public async Task CreateOrder_ShouldPersist()
{
// 数据库是干净的 - 没有其他测试留下的数据
await _fixture.Connection.ExecuteAsync(
"INSERT INTO orders (customer_id, total) VALUES (@CustomerId, @Total)",
new { CustomerId = "CUST1", Total = 100.00m });
var count = await _fixture.Connection.QuerySingleAsync<int>(
"SELECT COUNT(*) FROM orders");
Assert.Equal(1, count);
}
[Fact]
public async Task AnotherTest_StartsWithCleanDatabase()
{
// 这个测试也从空表开始
var count = await _fixture.Connection.QuerySingleAsync<int>(
"SELECT COUNT(*) FROM orders");
Assert.Equal(0, count); // 干净的石板!
}
}
Respawn选项
var respawner = await Respawner.CreateAsync(connectionString, new RespawnerOptions
{
// 保留的表(参考数据、迁移历史)
TablesToIgnore = new Table[]
{
"__EFMigrationsHistory",
new Table("public", "lookup_data"), // 架构限定
},
// 要清理的架构(默认:所有架构)
SchemasToInclude = new[] { "public", "app" },
// 或排除特定架构
SchemasToExclude = new[] { "audit", "logging" },
// 数据库适配器
DbAdapter = DbAdapter.Postgres, // 或SqlServer, MySql
// 处理循环外键
WithReseed = true // 重置身份列(SQL Server)
});
为什么选择Respawn而不是容器重建
| 方法 | 优点 | 缺点 |
|---|---|---|
| 每个测试新容器 | 完全隔离 | 慢(每个容器10-30秒) |
| Respawn | 快速(~50ms),保留架构/迁移 | 需要仔细排除表 |
| 事务回滚 | 最快 | 不能测试提交行为 |
何时使用Respawn:
- 测试通过xUnit集合固定装置共享容器
- 您需要测试实际提交(不仅仅是回滚)
- 容器启动时间是瓶颈
性能提示
- 重用容器 - 在集合中跨测试共享固定装置
- 使用Respawn - 无需重新创建容器即可重置数据
- 并行执行 - TestContainers自动处理端口冲突
- 使用轻量级镜像 - Alpine版本更小、更快
- 缓存镜像 - Docker会本地缓存已拉取的镜像
- 限制容器资源 - 如有需要,设置CPU/内存限制:
.WithResourceMapping(new CpuCount(2))
.WithResourceMapping(new MemoryLimit(512 * 1024 * 1024)) // 512MB