压力测试 stress-testing

压力测试是一种软件测试方法,用于验证系统在极端负载条件下的稳定性和性能,帮助确定系统的最大处理能力,并识别可能导致性能退化或故障的瓶颈。

测试 0 次安装 0 次浏览 更新于 3/4/2026

压力测试

概览

压力测试将系统推向超出正常运行能力以识别故障点、故障模式和恢复行为。它验证了系统在极端条件下的稳定性,并帮助确定在性能退化或故障前的最大容量。

何时使用

  • 寻找系统容量限制
  • 识别故障点
  • 测试自动伸缩行为
  • 在负载下验证错误处理
  • 测试故障后的恢复
  • 规划容量需求
  • 验证优雅降级
  • 测试高峰流量处理

测试类型

  • 压力测试:逐渐增加负载直至失败
  • 高峰测试:突然大幅度增加负载
  • 浸泡测试:在长时间内持续高负载
  • 容量测试:寻找最大可持续负载
  • 体积测试:大量数据卷
  • 可伸缩性测试:在不同规模下的性能

指令

1. k6 压力测试

// stress-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';

const errorRate = new Rate('errors');

export const options = {
  stages: [
    // 压力测试:逐步增加负载
    { duration: '2m', target: 100 },    // 正常负载
    { duration: '5m', target: 100 },    // 维持正常
    { duration: '2m', target: 200 },    // 超过正常
    { duration: '5m', target: 200 },    // 维持超过正常
    { duration: '2m', target: 300 },    // 接近故障点
    { duration: '5m', target: 300 },    // 维持高负载
    { duration: '2m', target: 400 },    // 超出容量
    { duration: '5m', target: 400 },    // 系统处于压力下
    { duration: '5m', target: 0 },      // 逐渐恢复
  ],
  thresholds: {
    http_req_duration: ['p(99)<1000'],  // 压力期间99%低于1s
    http_req_failed: ['rate<0.05'],     // 压力下允许5%的错误率
    errors: ['rate<0.1'],
  },
};

const BASE_URL = __ENV.BASE_URL || 'http://localhost:3000';

export function setup() {
  // 准备测试数据
  const res = http.post(`${BASE_URL}/api/auth/login`, {
    email: 'stress-test@example.com',
    password: 'test123',
  });

  return { token: res.json('token') };
}

export default function (data) {
  const headers = {
    Authorization: `Bearer ${data.token}`,
    'Content-Type': 'application/json',
  };

  // 重型数据库查询
  const productsRes = http.get(
    `${BASE_URL}/api/products?page=1&limit=100`,
    { headers }
  );

  const productsCheck = check(productsRes, {
    'products loaded': (r) => r.status === 200,
    'has products': (r) => r.json('products').length > 0,
  });

  if (!productsCheck) {
    errorRate.add(1);
    console.error(`Products failed: ${productsRes.status} ${productsRes.body}`);
  }

  sleep(1);

  // 写操作 - 压力数据库
  const orderPayload = JSON.stringify({
    items: [
      { productId: Math.floor(Math.random() * 100), quantity: 2 },
    ],
  });

  const orderRes = http.post(`${BASE_URL}/api/orders`, orderPayload, {
    headers,
  });

  const orderCheck = check(orderRes, {
    'order created': (r) => r.status === 201 || r.status === 503,
    'response within 5s': (r) => r.timings.duration < 5000,
  });

  if (!orderCheck) {
    errorRate.add(1);
  }

  // 监控性能退化
  if (orderRes.status === 503) {
    console.log('Service unavailable - system at capacity');
  }

  sleep(1);
}

export function teardown(data) {
  // 记录最终指标
  console.log('Stress test completed');
}

2. 高峰测试

// spike-test.js
import http from 'k6/http';
import { check } from 'k6';

export const options = {
  stages: [
    { duration: '30s', target: 10 },     // 正常基线
    { duration: '1m', target: 10 },      // 稳定基线
    { duration: '10s', target: 1000 },   // 峰值!增加100倍
    { duration: '3m', target: 1000 },    // 维持峰值
    { duration: '10s', target: 10 },     // 回落
    { duration: '3m', target: 10 },      // 恢复期
  ],
  thresholds: {
    http_req_duration: ['p(95)<5000'],   // 峰值期间允许性能退化
    http_req_failed: ['rate<0.1'],       // 峰值期间允许10%错误
  },
};

export default function () {
  const res = http.get('http://api.example.com/health');

  check(res, {
    'system responsive': (r) => r.status === 200 || r.status === 429,
    'response received': (r) => r.body.length > 0,
  });
}

3. 浸泡/耐久性测试

# soak_test.py
import asyncio
import aiohttp
import time
from datetime import datetime, timedelta
import psutil
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SoakTest:
    """运行持续负载测试以检测内存泄漏和性能退化。"""

    def __init__(self, url, duration_hours=4, requests_per_second=50):
        self.url = url
        self.duration = timedelta(hours=duration_hours)
        self.rps = requests_per_second
        self.metrics = {
            'requests': 0,
            'errors': 0,
            'response_times': [],
            'memory_usage': [],
        }

    async def make_request(self, session):
        """单个请求并记录指标。"""
        start = time.time()
        try:
            async with session.get(self.url) as response:
                await response.read()
                duration = time.time() - start

                self.metrics['requests'] += 1
                self.metrics['response_times'].append(duration)

                if response.status >= 400:
                    self.metrics['errors'] += 1
                    logger.warning(f"Error: {response.status}")

        except Exception as e:
            self.metrics['errors'] += 1
            logger.error(f"Request failed: {e}")

    async def worker(self, session):
        """以目标速率发起请求的工作器。"""
        while self.running:
            await self.make_request(session)
            await asyncio.sleep(1 / self.rps)

    def monitor_resources(self):
        """监控系统资源。"""
        process = psutil.Process()
        return {
            'memory_mb': process.memory_info().rss / 1024 / 1024,
            'cpu_percent': process.cpu_percent(),
            'timestamp': datetime.now(),
        }

    async def run(self):
        """执行浸泡测试。"""
        start_time = datetime.now()
        end_time = start_time + self.duration
        self.running = True

        logger.info(f"Starting soak test for {self.duration}")
        logger.info(f"Target: {self.rps} req/s to {self.url}")

        async with aiohttp.ClientSession() as session:
            # 启动工作器
            workers = [
                asyncio.create_task(self.worker(session))
                for _ in range(10)  # 10个并发工作器
            ]

            # 定期监控资源
            while datetime.now() < end_time:
                await asyncio.sleep(60)  # 每分钟检查一次

                resources = self.monitor_resources()
                self.metrics['memory_usage'].append(resources)

                # 记录进度
                elapsed = (datetime.now() - start_time).total_seconds()
                error_rate = self.metrics['errors'] / max(self.metrics['requests'], 1)
                avg_response = sum(self.metrics['response_times'][-1000:]) / 1000

                logger.info(
                    f"Elapsed: {elapsed:.0f}s | "
                    f"Requests: {self.metrics['requests']} | "
                    f"Error Rate: {error_rate:.2%} | "
                    f"Avg Response: {avg_response:.3f}s | "
                    f"Memory: {resources['memory_mb']:.1f}MB"
                )

                # 检查内存泄漏
                if len(self.metrics['memory_usage']) > 10:
                    initial_mem = self.metrics['memory_usage'][0]['memory_mb']
                    current_mem = resources['memory_mb']
                    growth = current_mem - initial_mem

                    if growth > 500:  # 500MB增长
                        logger.warning(f"Possible memory leak: +{growth:.1f}MB")

            # 停止工作器
            self.running = False
            await asyncio.gather(*workers, return_exceptions=True)

        self.report()

    def report(self):
        """生成测试报告。"""
        total_requests = self.metrics['requests']
        error_rate = self.metrics['errors'] / total_requests if total_requests > 0 else 0
        response_times = self.metrics['response_times']

        print("
" + "="*60)
        print("SOAK TEST RESULTS")
        print("="*60)
        print(f"Total Requests: {total_requests:,}")
        print(f"Total Errors: {self.metrics['errors']:,}")
        print(f"Error Rate: {error_rate:.2%}")
        print(f"
Response Times:")
        print(f"  Min: {min(response_times):.3f}s")
        print(f"  Max: {max(response_times):.3f}s")
        print(f"  Mean: {sum(response_times)/len(response_times):.3f}s")
        print(f"  P95: {sorted(response_times)[int(len(response_times)*0.95)]:.3f}s")

        # 内存分析
        if self.metrics['memory_usage']:
            initial_mem = self.metrics['memory_usage'][0]['memory_mb']
            final_mem = self.metrics['memory_usage'][-1]['memory_mb']
            growth = final_mem - initial_mem

            print(f"
Memory Usage:")
            print(f"  Initial: {initial_mem:.1f}MB")
            print(f"  Final: {final_mem:.1f}MB")
            print(f"  Growth: {growth:.1f}MB ({growth/initial_mem*100:.1f}%)")

            if growth > 200:
                print("  ⚠️  Possible memory leak detected!")

        print("="*60)

# 运行浸泡测试
if __name__ == '__main__':
    test = SoakTest(
        url='http://api.example.com/products',
        duration_hours=4,
        requests_per_second=50
    )
    asyncio.run(test.run())

4. JMeter 压力测试

<!-- stress-test.jmx -->
<jmeterTestPlan>
  <ThreadGroup testname="Stress Test Thread Group">
    <!-- Ultimate Thread Group for advanced load patterns -->
    <elementProp name="ThreadGroup.main_controller">
      <!-- Stage 1: Ramp up to 100 users -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>100</stringProp>  <!-- Users -->
        <stringProp>60</stringProp>   <!-- Ramp-up (sec) -->
        <stringProp>300</stringProp>  <!-- Duration (sec) -->
      </collectionProp>

      <!-- Stage 2: Ramp up to 500 users -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>500</stringProp>
        <stringProp>120</stringProp>
        <stringProp>600</stringProp>
      </collectionProp>

      <!-- Stage 3: Ramp up to 1000 users (stress) -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>1000</stringProp>
        <stringProp>180</stringProp>
        <stringProp>600</stringProp>
      </collectionProp>
    </elementProp>

    <HTTPSamplerProxy testname="Heavy Query">
      <stringProp name="HTTPSampler.domain">api.example.com</stringProp>
      <stringProp name="HTTPSampler.path">/api/search?q=stress</stringProp>
      <stringProp name="HTTPSampler.method">GET</stringProp>
    </HTTPSamplerProxy>

    <!-- Monitor for errors and degradation -->
    <ResponseAssertion testname="Allow 503 During Stress">
      <stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
      <stringProp name="Assertion.test_type">8</stringProp>
      <stringProp>200|503</stringProp>
    </ResponseAssertion>
  </ThreadGroup>
</jmeterTestPlan>

5. 自动伸缩验证

// test-autoscaling.ts
import { test, expect } from '@playwright/test';
import axios from 'axios';

test.describe('Auto-scaling Stress Test', () => {
  test('system should scale up under load', async () => {
    const baseUrl = 'http://api.example.com';
    const cloudwatch = new AWS.CloudWatch();

    // 初始实例计数
    const initialInstances = await getInstanceCount();
    console.log(`Initial instances: ${initialInstances}`);

    // 产生高负载
    const requests = [];
    for (let i = 0; i < 1000; i++) {
      requests.push(
        axios.get(`${baseUrl}/api/heavy-operation`)
          .catch(err => ({ error: err.message }))
      );
    }

    // 等待自动伸缩触发
    await Promise.all(requests);
    await new Promise(resolve => setTimeout(resolve, 120000)); // 2 min

    // 检查是否扩展
    const scaledInstances = await getInstanceCount();
    console.log(`Scaled instances: ${scaledInstances}`);

    expect(scaledInstances).toBeGreaterThan(initialInstances);

    // 验证指标
    const cpuMetrics = await cloudwatch.getMetricStatistics({
      Namespace: 'AWS/EC2',
      MetricName: 'CPUUtilization',
      // ... metric params
    }).promise();

    expect(cpuMetrics.Datapoints.some(d => d.Average > 70)).toBe(true);
  });
});

6. 故障点分析

# find_breaking_point.py
import requests
import threading
import time
from collections import defaultdict

class BreakingPointTest:
    """通过逐渐增加负载找到系统的故障点。"""

    def __init__(self, url):
        self.url = url
        self.results = defaultdict(lambda: {'success': 0, 'errors': 0, 'times': []})
        self.running = True

    def worker(self, vusers):
        """发起请求的工作线程。"""
        while self.running:
            start = time.time()
            try:
                response = requests.get(self.url, timeout=10)
                duration = time.time() - start

                if response.status_code == 200:
                    self.results[vusers]['success'] += 1
                    self.results[vusers]['times'].append(duration)
                else:
                    self.results[vusers]['errors'] += 1

            except Exception as e:
                self.results[vusers]['errors'] += 1

            time.sleep(0.1)

    def test_load_level(self, vusers, duration=60):
        """用特定数量的虚拟用户测试系统。"""
        print(f"
Testing with {vusers} concurrent users...")

        threads = []
        for _ in range(vusers):
            t = threading.Thread(target=self.worker, args=(vusers,))
            t.start()
            threads.append(t)

        time.sleep(duration)

        self.running = False
        for t in threads:
            t.join()

        self.running = True

        # 分析结果
        stats = self.results[vusers]
        total = stats['success'] + stats['errors']
        error_rate = stats['errors'] / total if total > 0 else 0
        avg_time = sum(stats['times']) / len(stats['times']) if stats['times'] else 0

        print(f"  Requests: {total}")
        print(f"  Success: {stats['success']}")
        print(f"  Errors: {stats['errors']}")
        print(f"  Error Rate: {error_rate:.1%}")
        print(f"  Avg Response: {avg_time:.3f}s")

        # 如果错误率>5%或平均响应时间>5s,则系统故障
        is_breaking = error_rate > 0.05 or avg_time > 5.0

        return not is_breaking

    def find_breaking_point(self):
        """二分查找故障点。"""
        min_users = 10
        max_users = 1000
        breaking_point = None

        while min_users < max_users:
            mid = (min_users + max_users) // 2

            if self.test_load_level(mid):
                # 系统能处理这个负载,尝试更高的负载
                min_users = mid + 10
            else:
                # 系统故障,找到上限
                breaking_point = mid
                max_users = mid - 10

        print(f"
{'='*60}")
        print(f"Breaking point: ~{breaking_point} concurrent users")
        print(f"{'='*60}")

        return breaking_point

# 运行
test = BreakingPointTest('http://api.example.com/products')
test.find_breaking_point()

监控指标

应用指标

  • 响应时间(P50、P95、P99、最大值)
  • 错误率和类型
  • 吞吐量(请求/秒)
  • 队列深度
  • 断路器跳闸

系统指标

  • CPU利用率
  • 内存使用和泄漏
  • 磁盘I/O
  • 网络带宽
  • 线程/连接池

数据库指标

  • 查询执行时间
  • 连接池使用情况
  • 锁争用
  • 缓存命中率
  • 复制延迟

最佳实践

✅ DO

  • 在类似生产的环境中测试
  • 监控所有系统资源
  • 逐步增加负载以找到限制
  • 在压力后测试恢复
  • 记录故障点
  • 测试自动伸缩行为
  • 计划优雅降级
  • 监控内存泄漏

❌ DON’T

  • 没有保护措施的情况下在生产中测试
  • 跳过恢复测试
  • 忽略警告信号(CPU、内存)
  • 只测试成功场景
  • 假设线性可伸缩性
  • 忘记数据库容量
  • 跳过监控第三方依赖
  • 测试时没有适当的清理

工具

  • 负载生成:k6、JMeter、Gatling、Locust、Artillery
  • 监控:Prometheus、Grafana、DataDog、New Relic
  • 云指标:CloudWatch、Azure Monitor、GCP Monitoring
  • 分析:py-spy、async-profiler、clinic.js

示例

另见:性能测试、持续测试、API版本策略,以进行全面的系统测试。