压力测试
概览
压力测试将系统推向超出正常运行能力的负载水平,以识别故障点、故障模式和恢复行为。它用于验证系统在极端条件下的稳定性,并帮助确定系统在性能下降或发生故障之前所能承受的最大容量。
何时使用
- 寻找系统容量限制
- 识别故障点
- 测试自动伸缩行为
- 在负载下验证错误处理
- 测试故障后的恢复
- 规划容量需求
- 验证优雅降级
- 测试高峰流量处理
测试类型
- 压力测试:逐渐增加负载直至失败
- 高峰(尖峰)测试:在极短时间内突然大幅增加负载
- 浸泡测试:在长时间内持续高负载
- 容量测试:寻找最大可持续负载
- 数据量测试:使用大量数据进行测试
- 可伸缩性测试:在不同规模下的性能
指令
1. k6 压力测试
// stress-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';
const errorRate = new Rate('errors');
// Build the "ramp up over 2 minutes, then hold for 5 minutes" stage pair
// used for every load level of the stress profile.
function rampThenHold(target) {
  return [
    { duration: '2m', target: target },
    { duration: '5m', target: target },
  ];
}

export const options = {
  // Stress profile: step the load up level by level, then ramp down.
  stages: []
    .concat(rampThenHold(100)) // normal load, then hold
    .concat(rampThenHold(200)) // above normal, then hold
    .concat(rampThenHold(300)) // approaching the breaking point, then hold
    .concat(rampThenHold(400)) // beyond capacity, system under stress
    .concat([{ duration: '5m', target: 0 }]), // gradual recovery
  thresholds: {
    // 99% of requests stay under 1s while under stress
    http_req_duration: ['p(99)<1000'],
    // up to 5% failed requests tolerated under stress
    http_req_failed: ['rate<0.05'],
    errors: ['rate<0.1'],
  },
};
const BASE_URL = __ENV.BASE_URL || 'http://localhost:3000';
export function setup() {
  // Prepare test data: log in once and hand the token to every VU.
  const res = http.post(`${BASE_URL}/api/auth/login`, {
    email: 'stress-test@example.com',
    password: 'test123',
  });

  // Fail fast when the login did not succeed; otherwise every iteration
  // would run with "Bearer undefined" and the results would be meaningless.
  const loginOk = res.status >= 200 && res.status < 300;
  const token = loginOk ? res.json('token') : null;
  if (!token) {
    throw new Error(`Login failed in setup(): status ${res.status}`);
  }

  return { token };
}
export default function (data) {
  // One VU iteration: a heavy read followed by a write, each recorded in
  // the custom `errors` rate when its checks fail.
  const headers = {
    Authorization: `Bearer ${data.token}`,
    'Content-Type': 'application/json',
  };

  // Heavy database query
  const productsRes = http.get(
    `${BASE_URL}/api/products?page=1&limit=100`,
    { headers }
  );

  const productsCheck = check(productsRes, {
    'products loaded': (r) => r.status === 200,
    'has products': (r) => {
      // Under stress the server may answer with an error page or an empty
      // body; parsing that as JSON throws and would abort the iteration
      // before the failure is recorded, so treat it as a failed check.
      if (r.status !== 200) {
        return false;
      }
      try {
        const products = r.json('products');
        return Array.isArray(products) && products.length > 0;
      } catch (e) {
        return false;
      }
    },
  });

  if (!productsCheck) {
    errorRate.add(1);
    console.error(`Products failed: ${productsRes.status} ${productsRes.body}`);
  }

  sleep(1);

  // Write operation - stresses the database
  const orderPayload = JSON.stringify({
    items: [
      { productId: Math.floor(Math.random() * 100), quantity: 2 },
    ],
  });

  const orderRes = http.post(`${BASE_URL}/api/orders`, orderPayload, {
    headers,
  });

  // A 503 counts as a pass here: the check accepts an explicit
  // "at capacity" answer as well as a created order.
  const orderCheck = check(orderRes, {
    'order created': (r) => r.status === 201 || r.status === 503,
    'response within 5s': (r) => r.timings.duration < 5000,
  });

  if (!orderCheck) {
    errorRate.add(1);
  }

  // Watch for degradation
  if (orderRes.status === 503) {
    console.log('Service unavailable - system at capacity');
  }

  sleep(1);
}
export function teardown(data) {
  // Runs once after the test; `data` is not used here.
  // Log the final marker.
  const completionMessage = 'Stress test completed';
  console.log(completionMessage);
}
2. 高峰测试
// spike-test.js
import http from 'k6/http';
import { check } from 'k6';
// Virtual-user levels for the spike profile (the spike is 100x the baseline).
const BASELINE_VUS = 10;
const SPIKE_VUS = 1000;

// Small factory so each stage reads as "duration -> target".
function stage(duration, target) {
  return { duration: duration, target: target };
}

export const options = {
  stages: [
    stage('30s', BASELINE_VUS), // normal baseline
    stage('1m', BASELINE_VUS), // stable baseline
    stage('10s', SPIKE_VUS), // spike! 100x increase
    stage('3m', SPIKE_VUS), // hold the spike
    stage('10s', BASELINE_VUS), // drop back
    stage('3m', BASELINE_VUS), // recovery period
  ],
  thresholds: {
    // degraded performance is tolerated during the spike
    http_req_duration: ['p(95)<5000'],
    // up to 10% errors are tolerated during the spike
    http_req_failed: ['rate<0.1'],
  },
};
export default function () {
  // Target host can be overridden with `-e BASE_URL=...`; the default is
  // the host this script used originally.
  const baseUrl = __ENV.BASE_URL || 'http://api.example.com';
  const res = http.get(`${baseUrl}/health`);

  check(res, {
    // 429 (rate limited) is accepted as a responsive answer during a spike.
    'system responsive': (r) => r.status === 200 || r.status === 429,
    // `body` is null when the request itself failed (e.g. timeout or
    // connection refused); guard so the check fails instead of throwing.
    'response received': (r) => !!r.body && r.body.length > 0,
  });
}
3. 浸泡/耐久性测试
# soak_test.py
import asyncio
import aiohttp
import time
from datetime import datetime, timedelta
import psutil
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SoakTest:
    """Run a sustained load test to detect memory growth and slowdowns.

    A fixed pool of async workers issues GET requests against ``url`` at a
    combined target rate, while a monitor loop samples resource usage and
    logs progress once per minute.

    NOTE: ``psutil.Process()`` without a PID inspects the current process,
    i.e. this load generator. Memory of the system under test has to be
    observed on the server side.

    Args:
        url: Endpoint to request.
        duration_hours: How long to keep the load running.
        requests_per_second: Combined target request rate of all workers.
        concurrency: Number of concurrent workers sharing that rate.

    Raises:
        ValueError: If ``requests_per_second`` or ``concurrency`` is not
            positive.
    """

    def __init__(self, url, duration_hours=4, requests_per_second=50,
                 concurrency=10):
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        if concurrency <= 0:
            raise ValueError("concurrency must be positive")
        self.url = url
        self.duration = timedelta(hours=duration_hours)
        self.rps = requests_per_second
        self.concurrency = concurrency
        # Defined up front so worker() can be used before run() is called.
        self.running = False
        # Created lazily and reused: cpu_percent() compares against the
        # previous call on the same Process object.
        self._process = None
        self.metrics = {
            'requests': 0,
            'errors': 0,
            'response_times': [],
            'memory_usage': [],
        }

    @staticmethod
    def _recent_average(samples, window=1000):
        """Return the mean of the last ``window`` samples, or 0 if empty."""
        recent = samples[-window:]
        return sum(recent) / len(recent) if recent else 0

    async def make_request(self, session):
        """Issue one request and record its outcome in ``self.metrics``."""
        start = time.monotonic()
        # Count every attempt, including ones that raise, so that
        # errors / requests is a real ratio that cannot exceed 100%.
        self.metrics['requests'] += 1
        try:
            async with session.get(self.url) as response:
                await response.read()
                duration = time.monotonic() - start
                self.metrics['response_times'].append(duration)
                if response.status >= 400:
                    self.metrics['errors'] += 1
                    logger.warning(f"Error: {response.status}")
        except Exception as e:
            # Boundary of a long-running worker: log and keep going.
            self.metrics['errors'] += 1
            logger.error(f"Request failed: {e}")

    async def worker(self, session):
        """Issue requests at this worker's share of the target rate."""
        # All workers together should reach self.rps, so each one paces
        # itself at rps / concurrency requests per second.
        interval = self.concurrency / self.rps
        while self.running:
            started = time.monotonic()
            await self.make_request(session)
            # Subtract the time the request took from the pause.
            remaining = interval - (time.monotonic() - started)
            await asyncio.sleep(max(remaining, 0))

    def monitor_resources(self):
        """Sample memory and CPU usage of the current process."""
        if self._process is None:
            self._process = psutil.Process()
        process = self._process
        return {
            'memory_mb': process.memory_info().rss / 1024 / 1024,
            'cpu_percent': process.cpu_percent(),
            'timestamp': datetime.now(),
        }

    def _record_progress(self, start_time):
        """Take one resource sample, log progress and check memory growth."""
        resources = self.monitor_resources()
        self.metrics['memory_usage'].append(resources)

        elapsed = (datetime.now() - start_time).total_seconds()
        error_rate = self.metrics['errors'] / max(self.metrics['requests'], 1)
        avg_response = self._recent_average(self.metrics['response_times'])
        logger.info(
            f"Elapsed: {elapsed:.0f}s | "
            f"Requests: {self.metrics['requests']} | "
            f"Error Rate: {error_rate:.2%} | "
            f"Avg Response: {avg_response:.3f}s | "
            f"Memory: {resources['memory_mb']:.1f}MB"
        )

        # Only compare against the first sample once enough samples exist.
        if len(self.metrics['memory_usage']) > 10:
            initial_mem = self.metrics['memory_usage'][0]['memory_mb']
            growth = resources['memory_mb'] - initial_mem
            if growth > 500:  # 500MB growth
                logger.warning(f"Possible memory leak: +{growth:.1f}MB")

    async def run(self):
        """Execute the soak test for the configured duration, then report."""
        start_time = datetime.now()
        end_time = start_time + self.duration
        self.running = True

        logger.info(f"Starting soak test for {self.duration}")
        logger.info(f"Target: {self.rps} req/s to {self.url}")

        async with aiohttp.ClientSession() as session:
            workers = [
                asyncio.create_task(self.worker(session))
                for _ in range(self.concurrency)
            ]
            try:
                # Sample once per minute, but never sleep past the end time.
                while True:
                    remaining = (end_time - datetime.now()).total_seconds()
                    if remaining <= 0:
                        break
                    await asyncio.sleep(min(60, remaining))
                    self._record_progress(start_time)
            finally:
                # Stop the workers and let in-flight requests finish while
                # the session is still open.
                self.running = False
                await asyncio.gather(*workers, return_exceptions=True)

        self.report()

    def report(self):
        """Print a summary of requests, response times and memory usage."""
        total_requests = self.metrics['requests']
        error_rate = (
            self.metrics['errors'] / total_requests if total_requests > 0 else 0
        )
        response_times = self.metrics['response_times']

        print("\n" + "="*60)
        print("SOAK TEST RESULTS")
        print("="*60)
        print(f"Total Requests: {total_requests:,}")
        print(f"Total Errors: {self.metrics['errors']:,}")
        print(f"Error Rate: {error_rate:.2%}")
        print(f"\nResponse Times:")
        if response_times:
            ordered = sorted(response_times)
            print(f" Min: {ordered[0]:.3f}s")
            print(f" Max: {ordered[-1]:.3f}s")
            print(f" Mean: {sum(ordered)/len(ordered):.3f}s")
            print(f" P95: {ordered[int(len(ordered)*0.95)]:.3f}s")
        else:
            # min()/max() would raise on an empty sample set.
            print(" No responses recorded")

        # Memory analysis
        if self.metrics['memory_usage']:
            initial_mem = self.metrics['memory_usage'][0]['memory_mb']
            final_mem = self.metrics['memory_usage'][-1]['memory_mb']
            growth = final_mem - initial_mem
            growth_pct = growth / initial_mem * 100 if initial_mem else 0
            print(f"\nMemory Usage:")
            print(f" Initial: {initial_mem:.1f}MB")
            print(f" Final: {final_mem:.1f}MB")
            print(f" Growth: {growth:.1f}MB ({growth_pct:.1f}%)")
            if growth > 200:
                print(" ⚠️ Possible memory leak detected!")
        print("="*60)
# Run the soak test when this file is executed as a script
if __name__ == '__main__':
    soak = SoakTest(
        'http://api.example.com/products',
        duration_hours=4,
        requests_per_second=50,
    )
    asyncio.run(soak.run())
4. JMeter 压力测试
<!-- stress-test.jmx -->
<jmeterTestPlan>
  <!-- NOTE(review): schematic excerpt; a plan saved by JMeter carries more
       attributes and properties than shown here. Confirm against a real
       .jmx export before running. -->
  <ThreadGroup testname="Stress Test Thread Group">
    <!-- Ultimate Thread Group for advanced load patterns -->
    <elementProp name="ThreadGroup.main_controller">
      <!-- Stage 1: Ramp up to 100 users -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>100</stringProp> <!-- Users -->
        <stringProp>60</stringProp> <!-- Ramp-up (sec) -->
        <stringProp>300</stringProp> <!-- Duration (sec) -->
      </collectionProp>
      <!-- Stage 2: Ramp up to 500 users -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>500</stringProp>
        <stringProp>120</stringProp>
        <stringProp>600</stringProp>
      </collectionProp>
      <!-- Stage 3: Ramp up to 1000 users (stress) -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>1000</stringProp>
        <stringProp>180</stringProp>
        <stringProp>600</stringProp>
      </collectionProp>
    </elementProp>
    <HTTPSamplerProxy testname="Heavy Query">
      <stringProp name="HTTPSampler.domain">api.example.com</stringProp>
      <stringProp name="HTTPSampler.path">/api/search?q=stress</stringProp>
      <stringProp name="HTTPSampler.method">GET</stringProp>
    </HTTPSamplerProxy>
    <!-- Monitor for errors and degradation: both 200 and 503 are accepted -->
    <ResponseAssertion testname="Allow 503 During Stress">
      <stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
      <!-- test_type is a bit field: 1 = Matches (regex), 2 = Contains,
           8 = Equals, 16 = Substring. "200|503" is a regular expression,
           so it needs Matches; with Equals (8) the response code would be
           compared to the literal text "200|503" and never pass. -->
      <stringProp name="Assertion.test_type">1</stringProp>
      <stringProp>200|503</stringProp>
    </ResponseAssertion>
  </ThreadGroup>
</jmeterTestPlan>
5. 自动伸缩验证
// test-autoscaling.ts
import { test, expect } from '@playwright/test';
import axios from 'axios';
test.describe('Auto-scaling Stress Test', () => {
  test('system should scale up under load', async () => {
    // This test deliberately waits two minutes for scaling to happen, which
    // exceeds Playwright's default 30 s per-test timeout. Raise it first.
    test.setTimeout(5 * 60 * 1000);

    // NOTE(review): `AWS` and `getInstanceCount` are not imported or defined
    // in this snippet; they must be provided by the surrounding project.
    const baseUrl = 'http://api.example.com';
    const cloudwatch = new AWS.CloudWatch();

    // Initial instance count
    const initialInstances = await getInstanceCount();
    console.log(`Initial instances: ${initialInstances}`);

    // Generate high load; failures are captured rather than thrown so one
    // rejected request does not abort the whole batch.
    const requests = [];
    for (let i = 0; i < 1000; i++) {
      requests.push(
        axios.get(`${baseUrl}/api/heavy-operation`)
          .catch(err => ({ error: err.message }))
      );
    }

    // Wait for auto-scaling to trigger
    await Promise.all(requests);
    await new Promise(resolve => setTimeout(resolve, 120000)); // 2 min

    // Check that the system scaled out
    const scaledInstances = await getInstanceCount();
    console.log(`Scaled instances: ${scaledInstances}`);
    expect(scaledInstances).toBeGreaterThan(initialInstances);

    // Verify metrics
    const cpuMetrics = await cloudwatch.getMetricStatistics({
      Namespace: 'AWS/EC2',
      MetricName: 'CPUUtilization',
      // ... metric params
    }).promise();

    // Datapoints can be absent when no data matched the query; treat that
    // as "no datapoint above 70%" instead of throwing a TypeError.
    const datapoints = cpuMetrics.Datapoints ?? [];
    expect(datapoints.some(d => d.Average > 70)).toBe(true);
  });
});
6. 故障点分析
# find_breaking_point.py
import requests
import threading
import time
from collections import defaultdict
class BreakingPointTest:
    """Find a system's breaking point by probing increasing load levels.

    Each load level starts one thread per virtual user; every thread sends
    GET requests to ``url`` until the level's duration has elapsed.

    Args:
        url: Endpoint to request.
    """

    def __init__(self, url):
        self.url = url
        self.results = defaultdict(lambda: {'success': 0, 'errors': 0, 'times': []})
        self.running = True
        # Worker threads update shared counters; `+= 1` on a dict entry is
        # a read-modify-write, so updates are serialised through this lock.
        self._lock = threading.Lock()

    @staticmethod
    def _summarize(stats):
        """Return ``(total, error_rate, avg_time)`` for one level's stats."""
        total = stats['success'] + stats['errors']
        error_rate = stats['errors'] / total if total > 0 else 0
        avg_time = sum(stats['times']) / len(stats['times']) if stats['times'] else 0
        return total, error_rate, avg_time

    def worker(self, vusers):
        """Worker thread: send requests until ``self.running`` is cleared.

        Args:
            vusers: Load level this worker belongs to; used as the key into
                ``self.results``.
        """
        while self.running:
            start = time.time()
            try:
                response = requests.get(self.url, timeout=10)
            except Exception:
                # Any failure to get a response counts as an error; the
                # worker keeps running so the load level stays constant.
                with self._lock:
                    self.results[vusers]['errors'] += 1
            else:
                duration = time.time() - start
                with self._lock:
                    if response.status_code == 200:
                        self.results[vusers]['success'] += 1
                        self.results[vusers]['times'].append(duration)
                    else:
                        self.results[vusers]['errors'] += 1
            time.sleep(0.1)

    def test_load_level(self, vusers, duration=60):
        """Test the system with a specific number of virtual users.

        Args:
            vusers: Number of concurrent worker threads to start.
            duration: Seconds to keep the load running.

        Returns:
            True if the system handled the load, False if it is considered
            broken (error rate above 5% or average response above 5s).
        """
        print(f"\nTesting with {vusers} concurrent users...")

        # Start from a clean slate so a repeated level does not mix in
        # numbers from an earlier run, and so the entry exists before any
        # thread touches it.
        self.results[vusers] = {'success': 0, 'errors': 0, 'times': []}
        self.running = True

        threads = []
        try:
            for _ in range(vusers):
                t = threading.Thread(target=self.worker, args=(vusers,))
                t.start()
                threads.append(t)
            time.sleep(duration)
        finally:
            # Always stop and join the workers, even if interrupted.
            self.running = False
            for t in threads:
                t.join()
            self.running = True

        # Analyse the results
        stats = self.results[vusers]
        total, error_rate, avg_time = self._summarize(stats)
        print(f" Requests: {total}")
        print(f" Success: {stats['success']}")
        print(f" Errors: {stats['errors']}")
        print(f" Error Rate: {error_rate:.1%}")
        print(f" Avg Response: {avg_time:.3f}s")

        # The system is broken if error rate > 5% or avg response > 5s
        is_breaking = error_rate > 0.05 or avg_time > 5.0
        return not is_breaking

    def find_breaking_point(self, min_users=10, max_users=1000, step=10):
        """Search for the breaking point by bisecting the user range.

        Args:
            min_users: Lower bound of the search range.
            max_users: Upper bound of the search range.
            step: Margin by which a bound moves past the tested level.

        Returns:
            The lowest tested user count at which the system broke, or
            None if no tested level broke it.
        """
        breaking_point = None
        while min_users < max_users:
            mid = (min_users + max_users) // 2
            if self.test_load_level(mid):
                # The system handles this load; try a higher one
                min_users = mid + step
            else:
                # The system broke; this is the new upper bound
                breaking_point = mid
                max_users = mid - step

        print(f"\n{'='*60}")
        if breaking_point is None:
            print("No breaking point found within the tested range")
        else:
            print(f"Breaking point: ~{breaking_point} concurrent users")
        print(f"{'='*60}")
        return breaking_point
# Run the search only when executed as a script, so importing this module
# does not start a load test as a side effect.
if __name__ == '__main__':
    test = BreakingPointTest('http://api.example.com/products')
    test.find_breaking_point()
监控指标
应用指标
- 响应时间(P50、P95、P99、最大值)
- 错误率和类型
- 吞吐量(请求/秒)
- 队列深度
- 断路器跳闸
系统指标
- CPU利用率
- 内存使用和泄漏
- 磁盘I/O
- 网络带宽
- 线程/连接池
数据库指标
- 查询执行时间
- 连接池使用情况
- 锁争用
- 缓存命中率
- 复制延迟
最佳实践
✅ DO
- 在类似生产的环境中测试
- 监控所有系统资源
- 逐步增加负载以找到限制
- 在压力后测试恢复
- 记录故障点
- 测试自动伸缩行为
- 计划优雅降级
- 监控内存泄漏
❌ DON’T
- 没有保护措施的情况下在生产中测试
- 跳过恢复测试
- 忽略警告信号(CPU、内存)
- 只测试成功场景
- 假设线性可伸缩性
- 忘记数据库容量
- 跳过监控第三方依赖
- 测试时没有适当的清理
工具
- 负载生成:k6、JMeter、Gatling、Locust、Artillery
- 监控:Prometheus、Grafana、DataDog、New Relic
- 云指标:CloudWatch、Azure Monitor、GCP Monitoring
- 分析:py-spy、async-profiler、clinic.js
示例
另见:性能测试、持续测试、API版本策略,以进行全面的系统测试。