CircuitBreakerPatternSkill circuit-breaker-pattern

电路断路器模式是一种容错模式,用于在依赖服务失败时防止系统级联故障,提供优雅的降级策略,并实现自动故障检测和回退机制。

架构设计 0 次安装 0 次浏览 更新于 3/3/2026

以下是电路断路器模式的中文翻译:


name: 电路断路器模式 description: 实现电路断路器模式以增强容错能力,自动故障检测和回退机制。在调用外部服务、处理级联故障或实现弹性模式时使用。

电路断路器模式

概览

实现电路断路器模式以防止级联故障,并在依赖项失败时提供优雅的降级。

使用场景

  • 外部API调用
  • 微服务通信
  • 数据库连接
  • 第三方服务集成
  • 防止级联故障
  • 实现回退机制
  • 限流保护
  • 超时处理

电路状态

┌──────────┐
│  CLOSED  │  ←─ 正常操作
└────┬─────┘
     │ 超过故障阈值
     ▼
┌──────────┐
│   OPEN   │  ←─ 拒绝请求
└────┬─────┘
     │ 超时到期
     ▼
┌──────────┐
│HALF-OPEN │  ←─ 测试恢复
└────┬─────┘
     │ 成功/失败
     ▼
返回CLOSED或OPEN

实现示例

1. TypeScript 电路断路器

enum CircuitState {
  CLOSED = 'CLOSED',
  OPEN = 'OPEN',
  HALF_OPEN = 'HALF_OPEN'
}

interface CircuitBreakerConfig {
  failureThreshold: number;
  successThreshold: number;
  timeout: number;
  resetTimeout: number;
}

interface CircuitBreakerStats {
  failures: number;
  successes: number;
  consecutiveFailures: number;
  consecutiveSuccesses: number;
  lastFailureTime?: number;
}

class CircuitBreaker {
  private state: CircuitState = CircuitState.CLOSED;
  private stats: CircuitBreakerStats = {
    failures: 0,
    successes: 0,
    consecutiveFailures: 0,
    consecutiveSuccesses: 0
  };
  private nextAttempt: number = Date.now();

  constructor(private config: CircuitBreakerConfig) {}

  async execute<T>(
    operation: () => Promise<T>,
    fallback?: () => T | Promise<T>
  ): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (Date.now() < this.nextAttempt) {
        console.log('Circuit breaker OPEN, using fallback');

        if (fallback) {
          return await fallback();
        }

        throw new Error('Circuit breaker is OPEN');
      }

      // 尝试恢复
      this.state = CircuitState.HALF_OPEN;
      console.log('Circuit breaker entering HALF_OPEN state');
    }

    try {
      const result = await this.executeWithTimeout(operation);
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();

      if (fallback) {
        return await fallback();
      }

      throw error;
    }
  }

  private async executeWithTimeout<T>(
    operation: () => Promise<T>
  ): Promise<T> {
    return Promise.race([
      operation(),
      new Promise<T>((_, reject) =>
        setTimeout(
          () => reject(new Error('Operation timeout')),
          this.config.timeout
        )
      )
    ]);
  }

  private onSuccess(): void {
    this.stats.successes++;
    this.stats.consecutiveSuccesses++;
    this.stats.consecutiveFailures = 0;

    if (this.state === CircuitState.HALF_OPEN) {
      if (
        this.stats.consecutiveSuccesses >= this.config.successThreshold
      ) {
        console.log('Circuit breaker CLOSED after recovery');
        this.state = CircuitState.CLOSED;
        this.resetStats();
      }
    }
  }

  private onFailure(): void {
    this.stats.failures++;
    this.stats.consecutiveFailures++;
    this.stats.consecutiveSuccesses = 0;
    this.stats.lastFailureTime = Date.now();

    if (this.state === CircuitState.HALF_OPEN) {
      console.log('Circuit breaker OPEN after failed recovery');
      this.trip();
      return;
    }

    if (
      this.state === CircuitState.CLOSED &&
      this.stats.consecutiveFailures >= this.config.failureThreshold
    ) {
      console.log('Circuit breaker OPEN after threshold reached');
      this.trip();
    }
  }

  private trip(): void {
    this.state = CircuitState.OPEN;
    this.nextAttempt = Date.now() + this.config.resetTimeout;
  }

  private resetStats(): void {
    this.stats = {
      failures: 0,
      successes: 0,
      consecutiveFailures: 0,
      consecutiveSuccesses: 0
    };
  }

  getState(): CircuitState {
    return this.state;
  }

  getStats(): CircuitBreakerStats {
    return { ...this.stats };
  }

  reset(): void {
    this.state = CircuitState.CLOSED;
    this.resetStats();
  }
}

// 使用
const breaker = new CircuitBreaker({
  failureThreshold: 5,
  successThreshold: 2,
  timeout: 3000,
  resetTimeout: 60000
});

async function callExternalAPI() {
  return breaker.execute(
    async () => {
      const response = await fetch('https://api.example.com/data');
      if (!response.ok) throw new Error('API error');
      return response.json();
    },
    () => {
      // 回退:返回缓存数据
      return { data: 'cached' };
    }
  );
}

2. 带监控的电路断路器

interface CircuitBreakerMetrics {
  state: CircuitState;
  totalRequests: number;
  successfulRequests: number;
  failedRequests: number;
  rejectedRequests: number;
  averageResponseTime: number;
  lastStateChange: number;
}

class MonitoredCircuitBreaker extends CircuitBreaker {
  private metrics: CircuitBreakerMetrics = {
    state: CircuitState.CLOSED,
    totalRequests: 0,
    successfulRequests: 0,
    failedRequests: 0,
    rejectedRequests: 0,
    averageResponseTime: 0,
    lastStateChange: Date.now()
  };

  private responseTimes: number[] = [];

  async execute<T>(
    operation: () => Promise<T>,
    fallback?: () => T | Promise<T>
  ): Promise<T> {
    this.metrics.totalRequests++;

    if (this.getState() === CircuitState.OPEN) {
      this.metrics.rejectedRequests++;
    }

    const startTime = Date.now();

    try {
      const result = await super.execute(operation, fallback);

      this.metrics.successfulRequests++;
      this.recordResponseTime(Date.now() - startTime);

      return result;
    } catch (error) {
      this.metrics.failedRequests++;
      throw error;
    }
  }

  private recordResponseTime(time: number): void {
    this.responseTimes.push(time);

    // 只保留最后100个响应时间
    if (this.responseTimes.length > 100) {
      this.responseTimes.shift();
    }

    this.metrics.averageResponseTime =
      this.responseTimes.reduce((a, b) => a + b, 0) /
      this.responseTimes.length;
  }

  getMetrics(): CircuitBreakerMetrics {
    return {
      ...this.metrics,
      state: this.getState()
    };
  }
}

3. Opossum风格的电路断路器(Node.js)

import CircuitBreaker from 'opossum';

// 创建电路断路器
const options = {
  timeout: 3000, // 3秒
  errorThresholdPercentage: 50,
  resetTimeout: 30000, // 30秒
  rollingCountTimeout: 10000,
  rollingCountBuckets: 10,
  name: 'api-breaker'
};

const breaker = new CircuitBreaker(callExternalAPI, options);

// 事件处理器
breaker.on('open', () => {
  console.log('Circuit breaker opened');
});

breaker.on('halfOpen', () => {
  console.log('Circuit breaker half-opened');
});

breaker.on('close', () => {
  console.log('Circuit breaker closed');
});

breaker.on('success', (result) => {
  console.log('Request succeeded:', result);
});

breaker.on('failure', (error) => {
  console.error('Request failed:', error);
});

breaker.on('timeout', () => {
  console.error('Request timed out');
});

breaker.on('reject', () => {
  console.warn('Request rejected by circuit breaker');
});

// 回退
breaker.fallback(() => {
  return { data: 'fallback data' };
});

// 使用电路断路器
async function callExternalAPI() {
  const response = await fetch('https://api.example.com/data');
  if (!response.ok) throw new Error('API error');
  return response.json();
}

// 使用电路断路器执行
breaker.fire()
  .then(data => console.log(data))
  .catch(err => console.error(err));

4. Python 电路断路器

from enum import Enum
from typing import Callable, Optional, TypeVar, Generic
import time
import threading

T = TypeVar('T')

class CircuitState(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"

class CircuitBreaker(Generic[T]):
    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 3.0,
        reset_timeout: float = 60.0
    ):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.reset_timeout = reset_timeout

        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.last_failure_time = None
        self.next_attempt = time.time()
        self.lock = threading.Lock()

    def call(
        self,
        func: Callable[[], T],
        fallback: Optional[Callable[[], T]] = None
    ) -> T:
        """带有电路断路器保护执行函数。"""
        with self.lock:
            if self.state == CircuitState.OPEN:
                if time.time() < self.next_attempt:
                    print("Circuit breaker OPEN")
                    if fallback:
                        return fallback()
                    raise Exception("Circuit breaker is OPEN")

                # 尝试恢复
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker entering HALF_OPEN")

        try:
            result = func()
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            if fallback:
                return fallback()
            raise

    def _on_success(self):
        """处理成功请求。"""
        with self.lock:
            self.failures = 0
            self.successes += 1

            if self.state == CircuitState.HALF_OPEN:
                if self.successes >= self.success_threshold:
                    print("Circuit breaker CLOSED")
                    self.state = CircuitState.CLOSED
                    self.successes = 0

    def _on_failure(self):
        """处理失败请求。"""
        with self.lock:
            self.failures += 1
            self.successes = 0
            self.last_failure_time = time.time()

            if self.state == CircuitState.HALF_OPEN:
                print("Circuit breaker OPEN after failed recovery")
                self._trip()
            elif self.failures >= self.failure_threshold:
                print(f"Circuit breaker OPEN after {self.failures} failures")
                self._trip()

    def _trip(self):
        """打开电路。"""
        self.state = CircuitState.OPEN
        self.next_attempt = time.time() + self.reset_timeout

    def get_state(self) -> CircuitState:
        """获取当前电路状态。"""
        return self.state

    def reset(self):
        """手动重置电路断路器。"""
        with self.lock:
            self.state = CircuitState.CLOSED
            self.failures = 0
            self.successes = 0


# 使用
import requests

breaker = CircuitBreaker(
    failure_threshold=5,
    success_threshold=2,
    timeout=3.0,
    reset_timeout=60.0
)

def call_api():
    response = requests.get('https://api.example.com/data', timeout=3)
    response.raise_for_status()
    return response.json()

def fallback():
    return {"data": "cached or default"}

# 带有电路断路器执行
try:
    result = breaker.call(call_api, fallback)
    print(result)
except Exception as e:
    print(f"Error: {e}")

5. Resilience4j风格(Java)

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.vavr.control.Try;

import java.time.Duration;
import java.util.function.Supplier;

public class CircuitBreakerExample {

    public static void main(String[] args) {
        // 创建电路断路器配置
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofMillis(30000))
            .permittedNumberOfCallsInHalfOpenState(2)
            .slidingWindowSize(10)
            .recordExceptions(Exception.class)
            .build();

        // 创建注册表
        CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);

        // 获取或创建电路断路器
        CircuitBreaker breaker = registry.circuitBreaker("apiBreaker");

        // 事件处理器
        breaker.getEventPublisher()
            .onStateTransition(event ->
                System.out.println("State: " + event.getStateTransition())
            )
            .onError(event ->
                System.out.println("Error: " + event.getThrowable())
            )
            .onSuccess(event ->
                System.out.println("Success")
            );

        // 装饰供应商
        Supplier<String> decoratedSupplier = CircuitBreaker
            .decorateSupplier(breaker, this::callExternalService);

        // 使用电路断路器执行
        Try<String> result = Try.of(decoratedSupplier::get)
            .recover(throwable -> "fallback");

        System.out.println(result.get());
    }

    private String callExternalService() {
        // 外部服务调用
        return "data";
    }
}

最佳实践

✅ 执行

  • 使用适合您用例的适当阈值
  • 实现回退机制
  • 监控断路器状态
  • 设置合理的超时
  • 使用指数退避
  • 记录状态转换
  • 对频繁的断路器跳闸发出警报
  • 测试断路器行为
  • 使用每个依赖项的断路器
  • 实施健康检查

❌ 不要

  • 为所有依赖项使用相同的断路器
  • 设置不切实际的阈值
  • 跳过回退实现
  • 忽略打开的断路器
  • 使用过于激进的重置超时
  • 忘记监控

资源