名称: testng-parallel 用户可调用: false 描述: 用于配置TestNG并行测试执行,包括线程池、套件配置和同步。 允许工具: [读取, 写入, 编辑, Bash, Glob, Grep]
TestNG并行执行
掌握TestNG并行测试执行,包括线程池配置、套件级并行、方法级并行和线程安全模式。此技能涵盖最大化测试吞吐量同时保持测试可靠性的技术。
概述
TestNG支持多个级别的并行执行:套件、测试、类和方法。正确配置并行可以显著减少测试执行时间,但需要仔细考虑线程安全和资源管理。
并行执行模式
套件级并行
并行运行多个<test>标签:
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="并行套件" parallel="tests" thread-count="3">
<test name="Chrome测试">
<classes>
<class name="com.example.tests.BrowserTest"/>
</classes>
</test>
<test name="Firefox测试">
<classes>
<class name="com.example.tests.BrowserTest"/>
</classes>
</test>
<test name="Safari测试">
<classes>
<class name="com.example.tests.BrowserTest"/>
</classes>
</test>
</suite>
类级并行
并行运行测试类:
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="并行类" parallel="classes" thread-count="4">
<test name="所有测试">
<classes>
<class name="com.example.tests.UserServiceTest"/>
<class name="com.example.tests.ProductServiceTest"/>
<class name="com.example.tests.OrderServiceTest"/>
<class name="com.example.tests.PaymentServiceTest"/>
</classes>
</test>
</suite>
方法级并行
并行运行测试方法:
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="并行方法" parallel="methods" thread-count="5">
<test name="服务测试">
<classes>
<class name="com.example.tests.IndependentMethodsTest"/>
</classes>
</test>
</suite>
实例级并行
并行运行测试实例(与Factory结合使用):
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="并行实例" parallel="instances" thread-count="3">
<test name="工厂测试">
<classes>
<class name="com.example.tests.FactoryGeneratedTest"/>
</classes>
</test>
</suite>
线程池配置
基础线程配置
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="线程池套件" parallel="methods" thread-count="10">
<!-- 全局线程池配置 -->
<test name="测试组1" thread-count="5">
<!-- 为此特定测试覆盖 -->
<classes>
<class name="com.example.tests.Test1"/>
</classes>
</test>
<test name="测试组2">
<!-- 使用套件级线程数 -->
<classes>
<class name="com.example.tests.Test2"/>
</classes>
</test>
</suite>
数据提供器并行执行
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="数据提供器套件" data-provider-thread-count="20">
<test name="数据驱动测试">
<classes>
<class name="com.example.tests.ParallelDataProviderTest"/>
</classes>
</test>
</suite>
public class ParallelDataProviderTest {
@DataProvider(name = "largeDataSet", parallel = true)
public Object[][] provideLargeDataSet() {
Object[][] data = new Object[100][2];
for (int i = 0; i < 100; i++) {
data[i] = new Object[]{"User" + i, "user" + i + "@example.com"};
}
return data;
}
@Test(dataProvider = "largeDataSet")
public void testWithParallelData(String name, String email) {
System.out.println(Thread.currentThread().getName() + " - 测试: " + name);
// 每个数据行并行运行
}
}
线程安全模式
线程本地存储
import org.testng.annotations.*;
public class ThreadLocalTest {
// 线程本地存储用于测试特定资源
private static ThreadLocal<WebDriver> driverThread = new ThreadLocal<>();
private static ThreadLocal<String> sessionThread = new ThreadLocal<>();
@BeforeMethod
public void setUp() {
// 初始化线程本地资源
driverThread.set(createWebDriver());
sessionThread.set(generateSessionId());
}
@AfterMethod
public void tearDown() {
// 清理线程本地资源
WebDriver driver = driverThread.get();
if (driver != null) {
driver.quit();
}
driverThread.remove();
sessionThread.remove();
}
@Test
public void testParallelBrowser() {
WebDriver driver = driverThread.get();
String session = sessionThread.get();
System.out.println("线程: " + Thread.currentThread().getName() +
" 会话: " + session);
// 使用线程本地驱动程序
}
private WebDriver createWebDriver() {
// 创建浏览器实例
return new ChromeDriver();
}
private String generateSessionId() {
return UUID.randomUUID().toString();
}
}
同步共享资源
import org.testng.annotations.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
public class SynchronizedResourceTest {
// 线程安全计数器
private static AtomicInteger testCounter = new AtomicInteger(0);
// 线程安全集合
private static ConcurrentHashMap<String, String> sharedCache = new ConcurrentHashMap<>();
// 临界区锁对象
private static final Object lock = new Object();
@Test(threadPoolSize = 5, invocationCount = 100)
public void testAtomicOperations() {
int count = testCounter.incrementAndGet();
System.out.println("测试计数: " + count);
}
@Test(threadPoolSize = 3, invocationCount = 50)
public void testConcurrentMap() {
String threadName = Thread.currentThread().getName();
sharedCache.put(threadName, String.valueOf(System.currentTimeMillis()));
// 无需显式同步的线程安全
}
@Test(threadPoolSize = 2, invocationCount = 10)
public void testSynchronizedBlock() {
synchronized (lock) {
// 临界区 - 一次只一个线程
performCriticalOperation();
}
}
private void performCriticalOperation() {
// 需要独占访问的操作
}
}
不可变测试数据
import java.util.Collections;
import java.util.List;
import java.util.Arrays;
public class ImmutableDataTest {
// 不可变测试数据 - 天生线程安全
private static final List<String> TEST_USERS = Collections.unmodifiableList(
Arrays.asList("user1", "user2", "user3", "user4", "user5")
);
private static final Map<String, String> CONFIG = Collections.unmodifiableMap(
Map.of(
"url", "https://api.example.com",
"timeout", "30000",
"retries", "3"
)
);
@Test(threadPoolSize = 5, invocationCount = 20)
public void testWithImmutableData() {
// 从多个线程安全读取
int userIndex = ThreadLocalRandom.current().nextInt(TEST_USERS.size());
String user = TEST_USERS.get(userIndex);
String url = CONFIG.get("url");
System.out.println(Thread.currentThread().getName() +
" - 用户: " + user + ", URL: " + url);
}
}
测试隔离模式
独立测试方法
public class IndependentTestsExample {
// 每个测试方法完全独立
@Test
public void testFeatureA() {
// 创建自己的资源
UserService service = new UserService();
User user = service.createUser("testA");
assertNotNull(user);
// 清理
service.deleteUser(user.getId());
}
@Test
public void testFeatureB() {
// 与testFeatureA完全分离
ProductService service = new ProductService();
Product product = service.createProduct("testB");
assertNotNull(product);
service.deleteProduct(product.getId());
}
@Test
public void testFeatureC() {
// 不与其他测试共享状态
OrderService service = new OrderService();
Order order = service.createOrder();
assertNotNull(order);
service.cancelOrder(order.getId());
}
}
隔离数据库测试
import org.testng.annotations.*;
public class IsolatedDatabaseTest {
private Connection connection;
private String testSchema;
@BeforeMethod
public void setUp() throws SQLException {
// 为每个测试创建隔离模式
testSchema = "test_" + Thread.currentThread().getId() + "_" + System.currentTimeMillis();
connection = DriverManager.getConnection(DB_URL, USER, PASSWORD);
connection.createStatement().execute("CREATE SCHEMA " + testSchema);
connection.setCatalog(testSchema);
initializeTestData();
}
@AfterMethod
public void tearDown() throws SQLException {
// 删除隔离模式
connection.createStatement().execute("DROP SCHEMA " + testSchema + " CASCADE");
connection.close();
}
@Test
public void testDatabaseOperation1() throws SQLException {
// 在隔离模式中操作
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO users (name) VALUES (?)"
);
ps.setString(1, "TestUser1");
ps.executeUpdate();
ResultSet rs = connection.createStatement().executeQuery(
"SELECT COUNT(*) FROM users"
);
rs.next();
assertEquals(rs.getInt(1), 1);
}
@Test
public void testDatabaseOperation2() throws SQLException {
// 与testDatabaseOperation1完全隔离
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO products (name) VALUES (?)"
);
ps.setString(1, "TestProduct");
ps.executeUpdate();
}
private void initializeTestData() throws SQLException {
// 在隔离模式中创建表
}
}
并行执行与依赖
在组内保持顺序
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="有序并行" parallel="classes" thread-count="3">
<test name="有序测试" preserve-order="true">
<classes>
<!-- 类并行运行,方法按顺序 -->
<class name="com.example.tests.OrderedTest1">
<methods>
<include name="step1"/>
<include name="step2"/>
<include name="step3"/>
</methods>
</class>
<class name="com.example.tests.OrderedTest2"/>
</classes>
</test>
</suite>
分组线程
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="分组线程" parallel="methods" thread-count="4" group-by-instances="true">
<test name="实例分组">
<classes>
<class name="com.example.tests.InstanceGroupTest"/>
</classes>
</test>
</suite>
public class InstanceGroupTest {
private String instanceId;
@Factory
public Object[] createInstances() {
return new Object[] {
new InstanceGroupTest("instance1"),
new InstanceGroupTest("instance2"),
new InstanceGroupTest("instance3")
};
}
public InstanceGroupTest() {}
public InstanceGroupTest(String instanceId) {
this.instanceId = instanceId;
}
@Test
public void step1() {
System.out.println(instanceId + " - 步骤1");
}
@Test(dependsOnMethods = "step1")
public void step2() {
System.out.println(instanceId + " - 步骤2");
}
@Test(dependsOnMethods = "step2")
public void step3() {
System.out.println(instanceId + " - 步骤3");
}
}
性能优化
最优线程数
public class ThreadCountOptimization {
// 根据可用资源确定最优线程数
public static int getOptimalThreadCount() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
// 对于CPU密集型测试
int cpuBoundThreads = availableProcessors;
// 对于I/O密集型测试(网络、文件、数据库)
int ioBoundThreads = availableProcessors * 2;
// 对于混合工作负载
int mixedThreads = (int) (availableProcessors * 1.5);
return mixedThreads;
}
}
资源池模式
import java.util.concurrent.*;
public class ResourcePoolTest {
// 并行测试的连接池
private static BlockingQueue<Connection> connectionPool;
@BeforeSuite
public void setUpSuite() {
int poolSize = 10;
connectionPool = new ArrayBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
connectionPool.offer(createConnection());
}
}
@AfterSuite
public void tearDownSuite() {
Connection conn;
while ((conn = connectionPool.poll()) != null) {
closeConnection(conn);
}
}
@Test(threadPoolSize = 5, invocationCount = 50)
public void testWithPooledConnection() throws InterruptedException {
Connection conn = connectionPool.take(); // 借用
try {
// 使用连接
performDatabaseOperation(conn);
} finally {
connectionPool.offer(conn); // 归还
}
}
private Connection createConnection() {
// 创建数据库连接
return null;
}
private void closeConnection(Connection conn) {
// 关闭连接
}
private void performDatabaseOperation(Connection conn) {
// 数据库操作
}
}
并行测试报告
自定义并行执行报告器
import org.testng.*;
import java.util.concurrent.ConcurrentHashMap;
public class ParallelTestReporter implements ITestListener {
private static ConcurrentHashMap<Long, List<String>> threadTestMap =
new ConcurrentHashMap<>();
@Override
public void onTestStart(ITestResult result) {
long threadId = Thread.currentThread().getId();
threadTestMap.computeIfAbsent(threadId, k -> new CopyOnWriteArrayList<>())
.add(result.getName());
}
@Override
public void onFinish(ITestContext context) {
System.out.println("
=== 线程分布报告 ===");
threadTestMap.forEach((threadId, tests) -> {
System.out.println("线程 " + threadId + ": " + tests.size() + " 测试");
tests.forEach(test -> System.out.println(" - " + test));
});
System.out.println("使用的总线程数: " + threadTestMap.size());
}
}
超时配置
public class TimeoutTest {
@Test(timeOut = 5000)
public void testWithTimeout() {
// 如果超过5秒则失败
}
@Test(timeOut = 10000, threadPoolSize = 3, invocationCount = 10)
public void testParallelWithTimeout() {
// 每次调用有10秒超时
}
}
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="超时套件" time-out="60000">
<!-- 套件级超时:总60秒 -->
<test name="快速测试" time-out="10000">
<!-- 测试级超时:此组所有测试10秒 -->
<classes>
<class name="com.example.tests.QuickTest"/>
</classes>
</test>
</suite>
最佳实践
- 设计独立性 - 测试不应依赖共享可变状态
- 使用ThreadLocal用于每线程资源 - 驱动程序、会话、连接
- 首选不可变数据 - 天生线程安全
- 设置适当的线程数 - 基于资源可用性
- 实现适当清理 - 防止并行执行中的资源泄漏
- 使用线程安全集合 - ConcurrentHashMap, CopyOnWriteArrayList
- 配置超时 - 防止挂起测试阻塞线程
- 监控线程分布 - 确保工作负载均衡
- 首先本地测试 - 验证线程安全性再进入CI/CD
- 记录线程安全要求 - 为测试作者提供清晰期望
常见陷阱
- 共享可变状态 - 导致竞态条件和脆弱测试
- 无同步的静态字段 - 非线程安全
- 资源争用 - 过多线程竞争有限资源
- 顺序依赖 - 假设执行顺序的测试
- 缺少清理 - ThreadLocal资源未移除
- 隔离不足 - 数据库测试相互影响
- 过多线程 - 开销超过好处
- 忽略超时 - 挂起测试阻塞执行
- 非确定性失败 - 难以重现并行问题
- 不适当的连接池 - 连接泄漏或耗尽
何时使用此技能
- 减少测试套件执行时间
- 配置CI/CD并行测试执行
- 实现线程安全测试基础设施
- 设计并行友好测试架构
- 故障排除并行测试失败
- 优化测试中的资源利用
- 构建可扩展测试框架
- 实现跨浏览器并行测试