Python韧性编程模式Skill python-resilience

Python 韧性编程模式技能用于构建健壮的应用程序,通过自动重试、指数退避、超时和容错装饰器处理瞬态故障、网络问题和服务中断。适用于外部服务调用、微服务架构、故障恢复场景和系统设计,关键词包括Python、韧性、重试、超时、容错、故障处理、指数退避、微服务、网络可靠性。

后端开发 0 次安装 2 次浏览 更新于 3/22/2026

名称: python-韧性 描述: Python韧性模式包括自动重试、指数退避、超时和容错装饰器。用于添加重试逻辑、实现超时、构建容错服务或处理瞬态故障。

Python 韧性模式

构建容错的Python应用程序,优雅地处理瞬态故障、网络问题和服务中断。韧性模式确保系统在依赖不可靠时保持运行。

何时使用此技能

  • 为外部服务调用添加重试逻辑
  • 实现网络操作的超时
  • 构建容错微服务
  • 处理速率限制和反压
  • 创建基础设施装饰器
  • 设计断路器

核心概念

1. 瞬态故障与永久性故障

重试瞬态错误(如网络超时、临时服务问题)。不重试永久性错误(如无效凭证、错误请求)。

2. 指数退避

增加重试之间的等待时间,避免压垮正在恢复的服务。

3. 抖动

为退避添加随机性,防止多个客户端同时重试导致的惊群效应。

4. 有界重试

限制尝试次数和总持续时间,防止无限重试循环。

快速开始

from tenacity import retry, stop_after_attempt, wait_exponential_jitter

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def call_external_service(request: dict) -> dict:
    return httpx.post("https://api.example.com", json=request).json()

基本模式

模式 1: 使用 Tenacity 的基本重试

使用 tenacity 库实现生产级重试逻辑。对于简单情况,可考虑内置重试功能或轻量级自定义实现。

from tenacity import (
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_exponential_jitter,
    retry_if_exception_type,
)

TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)

@retry(
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    stop=stop_after_attempt(5) | stop_after_delay(60),
    wait=wait_exponential_jitter(initial=1, max=30),
)
def fetch_data(url: str) -> dict:
    """在瞬态故障时自动重试获取数据。"""
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    return response.json()

模式 2: 仅重试适当错误

白名单特定的瞬态异常。从不重试:

  • ValueError, TypeError - 这些是错误,非瞬态问题
  • AuthenticationError - 无效凭证不会变得有效
  • HTTP 4xx 错误(除 429 外) - 客户端错误是永久性的
from tenacity import retry, retry_if_exception_type
import httpx

# 定义可重试的异常
RETRYABLE_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    httpx.ConnectTimeout,
    httpx.ReadTimeout,
)

@retry(
    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def resilient_api_call(endpoint: str) -> dict:
    """在网络问题时进行API调用并重试。"""
    return httpx.get(endpoint, timeout=10).json()

模式 3: HTTP 状态码重试

重试指示瞬态问题的特定HTTP状态码。

from tenacity import retry, retry_if_result, stop_after_attempt
import httpx

RETRY_STATUS_CODES = {429, 502, 503, 504}

def should_retry_response(response: httpx.Response) -> bool:
    """检查响应是否指示可重试错误。"""
    return response.status_code in RETRY_STATUS_CODES

@retry(
    retry=retry_if_result(should_retry_response),
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def http_request(method: str, url: str, **kwargs) -> httpx.Response:
    """在瞬态状态码时进行HTTP请求并重试。"""
    return httpx.request(method, url, timeout=30, **kwargs)

模式 4: 结合异常和状态重试

处理网络异常和HTTP状态码。

from tenacity import (
    retry,
    retry_if_exception_type,
    retry_if_result,
    stop_after_attempt,
    wait_exponential_jitter,
    before_sleep_log,
)
import logging
import httpx

logger = logging.getLogger(__name__)

TRANSIENT_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    httpx.ConnectError,
    httpx.ReadTimeout,
)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}

def is_retryable_response(response: httpx.Response) -> bool:
    return response.status_code in RETRY_STATUS_CODES

@retry(
    retry=(
        retry_if_exception_type(TRANSIENT_EXCEPTIONS) |
        retry_if_result(is_retryable_response)
    ),
    stop=stop_after_attempt(5),
    wait=wait_exponential_jitter(initial=1, max=30),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
def robust_http_call(
    method: str,
    url: str,
    **kwargs,
) -> httpx.Response:
    """具有全面重试处理的HTTP调用。"""
    return httpx.request(method, url, timeout=30, **kwargs)

高级模式

模式 5: 记录重试尝试

跟踪重试行为以进行调试和告警。

from tenacity import retry, stop_after_attempt, wait_exponential
import structlog

logger = structlog.get_logger()

def log_retry_attempt(retry_state):
    """记录详细的重试信息。"""
    exception = retry_state.outcome.exception()
    logger.warning(
        "重试操作",
        attempt=retry_state.attempt_number,
        exception_type=type(exception).__name__,
        exception_message=str(exception),
        next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None,
    )

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, max=10),
    before_sleep=log_retry_attempt,
)
def call_with_logging(request: dict) -> dict:
    """带有重试记录的外部调用。"""
    ...

模式 6: 超时装饰器

创建可重用的超时装饰器以实现一致的超时处理。

import asyncio
from functools import wraps
from typing import TypeVar, Callable

T = TypeVar("T")

def with_timeout(seconds: float):
    """为异步函数添加超时的装饰器。"""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            return await asyncio.wait_for(
                func(*args, **kwargs),
                timeout=seconds,
            )
        return wrapper
    return decorator

@with_timeout(30)
async def fetch_with_timeout(url: str) -> dict:
    """使用30秒超时获取URL。"""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

模式 7: 通过装饰器实现横切关注点

堆叠装饰器以将基础设施与业务逻辑分离。

from functools import wraps
from typing import TypeVar, Callable
import structlog

logger = structlog.get_logger()
T = TypeVar("T")

def traced(name: str | None = None):
    """为函数调用添加追踪。"""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        span_name = name or func.__name__

        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            logger.info("操作开始", operation=span_name)
            try:
                result = await func(*args, **kwargs)
                logger.info("操作完成", operation=span_name)
                return result
            except Exception as e:
                logger.error("操作失败", operation=span_name, error=str(e))
                raise
        return wrapper
    return decorator

# 堆叠多个关注点
@traced("fetch_user_data")
@with_timeout(30)
@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter())
async def fetch_user_data(user_id: str) -> dict:
    """使用追踪、超时和重试获取用户数据。"""
    ...

模式 8: 依赖注入以提高可测试性

通过构造函数传递基础设施组件以方便测试。

from dataclasses import dataclass
from typing import Protocol

class Logger(Protocol):
    def info(self, msg: str, **kwargs) -> None: ...
    def error(self, msg: str, **kwargs) -> None: ...

class MetricsClient(Protocol):
    def increment(self, metric: str, tags: dict | None = None) -> None: ...
    def timing(self, metric: str, value: float) -> None: ...

@dataclass
class UserService:
    """具有注入基础设施的服务。"""

    repository: UserRepository
    logger: Logger
    metrics: MetricsClient

    async def get_user(self, user_id: str) -> User:
        self.logger.info("获取用户", user_id=user_id)
        start = time.perf_counter()

        try:
            user = await self.repository.get(user_id)
            self.metrics.increment("user.fetch.success")
            return user
        except Exception as e:
            self.metrics.increment("user.fetch.error")
            self.logger.error("获取用户失败", user_id=user_id, error=str(e))
            raise
        finally:
            elapsed = time.perf_counter() - start
            self.metrics.timing("user.fetch.duration", elapsed)

# 使用假组件易于测试
service = UserService(
    repository=FakeRepository(),
    logger=FakeLogger(),
    metrics=FakeMetrics(),
)

模式 9: 故障安全默认值

当非关键操作失败时优雅降级。

from typing import TypeVar
from collections.abc import Callable

T = TypeVar("T")

def fail_safe(default: T, log_failure: bool = True):
    """在失败时返回默认值而不是引发异常。"""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if log_failure:
                    logger.warning(
                        "操作失败,使用默认值",
                        function=func.__name__,
                        error=str(e),
                    )
                return default
        return wrapper
    return decorator

@fail_safe(default=[])
async def get_recommendations(user_id: str) -> list[str]:
    """获取推荐,失败时返回空列表。"""
    ...

最佳实践总结

  1. 仅重试瞬态错误 - 不重试错误或认证失败
  2. 使用指数退避 - 给服务时间恢复
  3. 添加抖动 - 防止同步重试导致的惊群效应
  4. 限制总持续时间 - stop_after_attempt(5) | stop_after_delay(60)
  5. 记录每次重试 - 静默重试隐藏系统性问题
  6. 使用装饰器 - 保持重试逻辑与业务逻辑分离
  7. 注入依赖 - 使基础设施可测试
  8. 随处设置超时 - 每个网络调用都需要超时
  9. 优雅降级 - 对非关键路径返回缓存/默认值
  10. 监控重试率 - 高重试率指示潜在问题