Error Handling Skill: error-handling

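The error-handling skill covers error management in software systems: designing error types, implementing recovery strategies (such as retries and circuit breakers), and providing user-friendly error messages, with the goal of improving reliability and security. Keywords: error handling, exception handling, recovery strategies, retry logic, fallback mechanisms, graceful degradation, circuit breakers, API error responses, data pipeline error handling.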

Architecture Design · Updated 3/24/2026

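name: error-handling
description: Comprehensive error-handling patterns and strategies, including Rust's Result/Option, API error responses, data pipeline error handling, and security-aware error handling. Use when implementing exception handling, error recovery, retry logic, circuit breakers, fallback mechanisms, or graceful degradation, or when designing error hierarchies. Trigger words: error, exception, try, catch, throw, raise, Result, Option, panic, recover, retry, fallback, graceful degradation, circuit breaker, error boundary, 500, 4xx, 5xx, thiserror, anyhow, RFC 7807, error propagation, error messages, stack trace.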

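Error Handling

Overview

Error handling is a key part of robust software development. This skill covers error types and hierarchies, recovery strategies, propagation patterns, user-friendly messages, contextual logging, and language-specific implementations (Rust, Python, TypeScript).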

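Agent Delegation

  • senior-software-engineer (Opus) - error architecture design, choosing an error strategy
  • software-engineer (Sonnet) - implementing error-handling patterns
  • senior-software-engineer (Opus) - secure error handling (no information leakage, sanitization)
  • senior-software-engineer (Opus) - infrastructure error handling (retries, circuit breakers)

Instructions

1. Design an Error Hierarchy

Create structured error types that provide clear categorization.

Rust: thiserror and anyhow

// Use thiserror for library errors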
use thiserror::Error;

#[derive(Error, Debug)]
pub enum AppError {
    #[error("validation failed: {0}")]
    Validation(String),

    #[error("resource not found: {resource_type} with id {id}")]
    NotFound {
        resource_type: String,
        id: String,
    },

    #[error("database error")]
    Database(#[from] sqlx::Error),

    #[error("IO error")]
    Io(#[from] std::io::Error),

    #[error("external service error: {service}")]
    ExternalService {
        service: String,
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}

// Use anyhow for application errors
use anyhow::{Context, Result};

fn process_order(order_id: &str) -> Result<Order> {
    let order = fetch_order(order_id)
        .context("Failed to fetch order from database")?;

    validate_order(&order)
        // with_context builds the message lazily, only when an error occurs
        .with_context(|| format!("Order {} validation failed", order_id))?;

    Ok(order)
}

Python

# Python example
class AppError(Exception):
    """Base application error."""
    def __init__(self, message: str, code: str, details: dict | None = None):
        self.message = message
        self.code = code
        self.details = details or {}
        super().__init__(message)

class ValidationError(AppError):
    """Input validation error."""

class NotFoundError(AppError):
    """Resource not found error."""

class ServiceError(AppError):
    """External service error."""

TypeScript

class AppError extends Error {
  constructor(
    message: string,
    public code: string,
    public details?: Record<string, unknown>,
  ) {
    super(message);
    this.name = this.constructor.name;
  }
}

class ValidationError extends AppError {}
class NotFoundError extends AppError {}
class ServiceError extends AppError {}

2. Implement Recovery Strategies

Retry with Exponential Backoff

Rust

use std::time::Duration;
use tokio::time::sleep;
use rand::Rng;

pub async fn retry_with_backoff<T, E, F, Fut>(
    mut operation: F,
    max_retries: u32,
    base_delay_ms: u64,
    max_delay_ms: u64,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let mut attempts = 0;

    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) if attempts >= max_retries => return Err(e),
            Err(e) => {
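                // Saturating arithmetic keeps large attempt counts from overflowing.
                let exp = 2_u64.checked_pow(attempts).unwrap_or(u64::MAX);
                let delay = base_delay_ms.saturating_mul(exp).min(max_delay_ms);
                // Inclusive range: `0..delay / 10` is empty (and panics) when delay < 10.
                let jitter = rand::thread_rng().gen_range(0..=delay / 10);
                let total_delay = delay.saturating_add(jitter);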

                tracing::warn!(
                    "Attempt {}/{} failed: {}. Retrying in {}ms",
                    attempts + 1,
                    max_retries,
                    e,
                    total_delay
                );

                sleep(Duration::from_millis(total_delay)).await;
                attempts += 1;
            }
        }
    }
}

// Usage example
let result = retry_with_backoff(
    || async { fetch_from_api().await },
    3,
    1000,
    30000,
).await?;

Python

import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_exceptions: tuple[type[Exception], ...] = (ServiceError,)
) -> T:
    """Retry an async operation with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except retryable_exceptions:
            if attempt == max_retries:
                raise  # Out of retries: surface the last error
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            await asyncio.sleep(delay + jitter)
    # Only reached when max_retries is negative and the loop never runs
    raise ValueError("max_retries must be >= 0")
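
To make the backoff arithmetic concrete, here is a minimal sketch that computes the pre-jitter delay schedule using the same formula as the retry helper, min(base_delay * 2^attempt, max_delay). The names backoff_schedule and jitter_bounds are illustrative helpers, not part of the skill itself.

```python
def backoff_schedule(max_retries: int, base_delay: float, max_delay: float) -> list:
    """Return the pre-jitter delay (in seconds) before each retry."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_retries)]

def jitter_bounds(delay: float, fraction: float = 0.1) -> tuple:
    """Return the (min, max) sleep once up to `fraction` of jitter is added."""
    return delay, delay + delay * fraction

schedule = backoff_schedule(max_retries=7, base_delay=1.0, max_delay=60.0)
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
print(jitter_bounds(schedule[-1]))  # (60.0, 66.0)
```

The seventh delay would be 64 seconds without the cap, so max_delay clamps it to 60. With the default of three retries, only the first three entries (1, 2, and 4 seconds) are ever used.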

Circuit Breaker

import time
from enum import Enum
from dataclasses import dataclass

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3

    def __post_init__(self):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.half_open_calls = 0

    def call(self, operation):
        if self.state == CircuitState.OPEN:
            # Monotonic clock: unaffected by wall-clock adjustments
            if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitOpenError("Circuit is open")

        try:
            result = operation()
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                self.state = CircuitState.CLOSED
        self.failure_count = 0

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # Any failure while probing (half-open) reopens the circuit immediately
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN

Fallback Pattern

async function withFallback<T>(
  primary: () => Promise<T>,
  fallback: () => Promise<T>,
  shouldFallback: (error: Error) => boolean = () => true,
): Promise<T> {
  try {
    return await primary();
  } catch (error) {
    if (shouldFallback(error as Error)) {
      return await fallback();
    }
    throw error;
  }
}

// Usage example
const data = await withFallback(
  () => fetchFromPrimaryAPI(),
  () => fetchFromCache(),
  (error) => error instanceof ServiceError,
);

3. Error Propagation Patterns

Wrapping and Enriching Errors

def process_order(order_id: str) -> Order:
    try:
        order = fetch_order(order_id)
        validate_order(order)
        return process(order)
    except DatabaseError as e:
        raise ServiceError(
            message="Failed to process order",
            code="ORDER_PROCESSING_FAILED",
            details={"order_id": order_id, "original_error": str(e)}
        ) from e

Result Type (Rust Style)

from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar('T')
E = TypeVar('E')

@dataclass
class Ok(Generic[T]):
    value: T

@dataclass
class Err(Generic[E]):
    error: E

Result = Union[Ok[T], Err[E]]

def divide(a: float, b: float) -> Result[float, str]:
    if b == 0:
        return Err("Division by zero")
    return Ok(a / b)

# Usage example (structural pattern matching requires Python 3.10+)
result = divide(10, 0)
match result:
    case Ok(value):
        print(f"Result: {value}")
    case Err(error):
        print(f"Error: {error}")

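4. User-Friendly Error Messages

ERROR_MESSAGES = {
    "VALIDATION_FAILED": "Please check your input and try again.",
    "NOT_FOUND": "The requested item was not found.",
    "SERVICE_UNAVAILABLE": "The service is temporarily unavailable. Please try again later.",
    "UNAUTHORIZED": "Please log in to continue.",
    "FORBIDDEN": "You do not have permission to perform this action.",
}

def get_user_message(error: AppError) -> str:
    """Translate an internal error into a user-friendly message."""
    return ERROR_MESSAGES.get(error.code, "An unexpected error occurred. Please try again.")

def format_error_response(error: AppError, include_details: bool = False) -> dict:
    """Format an error for an API response."""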
    response = {
        "error": {
            "code": error.code,
            "message": get_user_message(error)
        }
    }
    if include_details and error.details:
        response["error"]["details"] = error.details
    return response

5. Error Logging with Context

import logging
import traceback
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar('request_id', default='unknown')

def log_error(error: Exception, context: dict | None = None):
    """Log an error together with its full context."""
    logger = logging.getLogger(__name__)

    error_context = {
        "request_id": request_id.get(),
        "error_type": type(error).__name__,
        "error_message": str(error),
        # Format from the exception itself so this also works outside an except block
        "stack_trace": "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        ),
        **(context or {})
    }

    if isinstance(error, AppError):
        error_context["error_code"] = error.code
        error_context["error_details"] = error.details

    logger.error(
        f"Error occurred: {error}",
        extra={"structured_data": error_context}
    )

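Best Practices

  1. Fail fast: validate input early and raise errors immediately rather than continuing with invalid data.

  2. Be specific: create specific error types instead of using generic exceptions. This makes handling and debugging easier.

  3. Preserve context: when wrapping errors, always keep the original error chain, using "raise ... from e" in Python or the cause mechanism in other languages.

  4. Don't ignore errors: avoid empty catch blocks. At minimum, log the error.

  5. Distinguish recoverable from unrecoverable: design the error hierarchy so it clearly indicates which errors can be retried.

  6. Use appropriate recovery strategies:

    • Retry: for transient failures (network timeouts, rate limits)
    • Fallback: when an alternative exists (cache, default values)
    • Circuit breaker: to prevent cascading failures

  7. Sanitize user-facing messages: never expose internal error details, stack traces, or sensitive information to users.

  8. Log at boundaries: log errors when they cross system boundaries (API endpoints, service calls).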
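
One way to encode the recoverable/unrecoverable distinction from practice 5 is to put it on the error types themselves. The sketch below is illustrative only: the retryable class attribute and the run_with_retries helper are assumptions introduced here, and backoff is left out to keep the example short.

```python
class AppError(Exception):
    """Base error; `retryable` defaults to False so new types must opt in."""
    retryable = False  # Hypothetical attribute, not part of the hierarchy above

class ValidationError(AppError):
    """Bad input: retrying the same request cannot succeed."""

class ServiceError(AppError):
    """Transient upstream failure: safe to retry."""
    retryable = True

def is_retryable(error: Exception) -> bool:
    """Treat unknown exception types as non-retryable."""
    return isinstance(error, AppError) and error.retryable

def run_with_retries(operation, max_attempts: int = 3):
    """Call `operation` until it succeeds, retrying only retryable errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as error:
            # Re-raise at once for unrecoverable errors or when out of attempts
            if not is_retryable(error) or attempt == max_attempts:
                raise

calls = {"count": 0}

def flaky():
    """Fail twice with a transient error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ServiceError("upstream timeout")
    return "ok"

print(run_with_retries(flaky))  # ok
```

Defaulting to non-retryable means a new error type is never retried by accident; each type has to opt in explicitly.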

Examples

Complete Error Handling in an API Endpoint

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.exception_handler(AppError)
async def app_error_handler(request: Request, error: AppError):
    log_error(error, {"path": request.url.path, "method": request.method})

    status_codes = {
        ValidationError: 400,
        NotFoundError: 404,
        ServiceError: 503,
    }

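    # Match with isinstance so subclasses of these errors map correctly
    status_code = next(
        (code for cls, code in status_codes.items() if isinstance(error, cls)),
        500,
    )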
    return JSONResponse(
        status_code=status_code,
        content=format_error_response(error)
    )

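@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    # Assumes get_circuit_breaker returns an async-aware breaker whose call()
    # awaits the operation. A synchronous call() would record success as soon
    # as the coroutine object is created, before the request actually runs.
    circuit_breaker = get_circuit_breaker("order_service")

    async def fetch():
        return await order_service.get(order_id)

    try:
        return await retry_with_backoff(
            lambda: circuit_breaker.call(fetch),
            max_retries=3,
            retryable_exceptions=(ServiceError,)
        )
    except CircuitOpenError:
        # Fall back to the cache
        cached = await cache.get(f"order:{order_id}")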
        if cached:
            return cached
        raise ServiceError(
            message="Order service unavailable",
            code="SERVICE_UNAVAILABLE"
        )

Error Boundaries in React

import React, { Component, ErrorInfo, ReactNode } from "react";

interface Props {
  children: ReactNode;
  fallback: ReactNode;
}

interface State {
  hasError: boolean;
  error?: Error;
}

class ErrorBoundary extends Component<Props, State> {
  state: State = { hasError: false };

  static getDerivedStateFromError(error: Error): State {
    return { hasError: true, error };
  }

  componentDidCatch(error: Error, errorInfo: ErrorInfo) {
    console.error("Error boundary caught:", error, errorInfo);
    // Send to an error-tracking service
    errorTracker.captureException(error, { extra: errorInfo });
  }

  render() {
    if (this.state.hasError) {
      return this.props.fallback;
    }
    return this.props.children;
  }
}

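API Error Responses (RFC 7807)

RFC 7807 defines a standard "problem details" format for HTTP API error responses. It has since been obsoleted by RFC 9457, which keeps the same core members (type, title, status, detail, instance).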
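
As a rough illustration of the wire format, the sketch below builds a problem-details object with only the standard library. The problem_details helper and the sample values are hypothetical; only the member names and the "about:blank" default for type come from the RFC.

```python
import json
from typing import Optional

def problem_details(status: int, title: str, detail: Optional[str] = None,
                    instance: Optional[str] = None,
                    type_uri: str = "about:blank") -> dict:
    """Build a problem-details object, omitting members that are not set."""
    body = {"type": type_uri, "title": title, "status": status,
            "detail": detail, "instance": instance}
    return {key: value for key, value in body.items() if value is not None}

# Sample values are made up for illustration
payload = problem_details(
    status=404,
    title="Resource Not Found",
    detail="order with id 42 not found",
    instance="/orders/42",
)
print(json.dumps(payload, indent=2))
```

Responses in this format are served with the media type application/problem+json, and the status member should match the HTTP status code of the response.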

Rust Implementation

use serde::{Deserialize, Serialize};
use axum::{
    http::StatusCode,
    response::{IntoResponse, Response},
    Json,
};

#[derive(Serialize, Deserialize, Debug)]
pub struct ProblemDetails {
    #[serde(rename = "type")]
    pub type_uri: String,
    pub title: String,
    pub status: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub instance: Option<String>,
}

impl ProblemDetails {
    pub fn new(status: StatusCode, title: impl Into<String>) -> Self {
        Self {
            type_uri: "about:blank".to_string(),
            title: title.into(),
            status: status.as_u16(),
            detail: None,
            instance: None,
        }
    }

    pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
        self.detail = Some(detail.into());
        self
    }
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, title, detail) = match self {
            AppError::Validation(msg) => (
                StatusCode::BAD_REQUEST,
                "Validation Failed",
                Some(msg),
            ),
            AppError::NotFound { resource_type, id } => (
                StatusCode::NOT_FOUND,
                "Resource Not Found",
                Some(format!("{} with id {} not found", resource_type, id)),
            ),
            AppError::Database(_) | AppError::Io(_) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal Server Error",
                None, // Never expose internal errors
            ),
            AppError::ExternalService { service, .. } => (
                StatusCode::SERVICE_UNAVAILABLE,
                "Service Unavailable",
                Some(format!("{} is temporarily unavailable", service)),
            ),
        };

        let mut problem = ProblemDetails::new(status, title);
        if let Some(d) = detail {
            problem = problem.with_detail(d);
        }

        (status, Json(problem)).into_response()
    }
}

Python (FastAPI)

from pydantic import BaseModel
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

class ProblemDetails(BaseModel):
    type: str = "about:blank"
    title: str
    status: int
    detail: str | None = None
    instance: str | None = None

@app.exception_handler(AppError)
async def app_error_handler(request: Request, error: AppError):
    status_map = {
        ValidationError: 400,
        NotFoundError: 404,
        ServiceError: 503,
    }

    status = status_map.get(type(error), 500)

    problem = ProblemDetails(
        title=error.__class__.__name__,
        status=status,
        detail=str(error) if status < 500 else None,  # Hide internal details
        instance=str(request.url.path),
    )

    return JSONResponse(
        status_code=status,
        content=problem.model_dump(exclude_none=True),
        media_type="application/problem+json",
    )

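Data Pipeline Error Handling

Data pipelines need specialized error handling to cope with partial failures and data-quality problems.

Rust Pipeline with Error Collection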

#[derive(Debug)]
pub struct ProcessingResult<T> {
    pub successful: Vec<T>,
    pub failed: Vec<FailedItem>,
}

#[derive(Debug)]
pub struct FailedItem {
    pub index: usize,
    pub error: String,
    pub record: serde_json::Value,
}

pub async fn process_batch<T, F, Fut>(
    items: Vec<serde_json::Value>,
    processor: F,
) -> ProcessingResult<T>
where
    F: Fn(serde_json::Value) -> Fut,
    Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
{
    let mut successful = Vec::new();
    let mut failed = Vec::new();

    for (index, item) in items.into_iter().enumerate() {
        match processor(item.clone()).await {
            Ok(result) => successful.push(result),
            Err(e) => {
                tracing::error!("Failed to process item {}: {}", index, e);
                failed.push(FailedItem {
                    index,
                    error: e.to_string(),
                    record: item,
                });
            }
        }
    }

    ProcessingResult { successful, failed }
}

// Route failed records to a dead-letter queue
let result = process_batch(records, |record| async {
    validate_and_transform(record).await
}).await;

if !result.failed.is_empty() {
    dead_letter_queue.send(result.failed).await?;
}

Python ETL Error Handling

from dataclasses import dataclass
from typing import TypeVar, Callable, Generic
import logging

T = TypeVar('T')

@dataclass
class ProcessingResult(Generic[T]):
    successful: list[T]
    failed: list[dict]
    error_summary: dict[str, int]

def process_with_error_tracking(
    items: list[dict],
    processor: Callable[[dict], T],
    continue_on_error: bool = True,
) -> ProcessingResult[T]:
    successful = []
    failed = []
    error_counts = {}

    for index, item in enumerate(items):
        try:
            result = processor(item)
            successful.append(result)
        except Exception as e:
            error_type = type(e).__name__
            error_counts[error_type] = error_counts.get(error_type, 0) + 1

            logging.error(f"Failed to process item {index}: {e}")
            failed.append({
                "index": index,
                "item": item,
                "error": str(e),
                "error_type": error_type,
            })

            if not continue_on_error:
                raise

    return ProcessingResult(
        successful=successful,
        failed=failed,
        error_summary=error_counts,
    )
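
The failed list collected above can be routed to a dead-letter store for later inspection or replay, much like the dead-letter queue in the Rust example. The following is a minimal sketch that assumes a local JSON Lines file stands in for a real queue; write_dead_letters, read_dead_letters, and the sample record are all hypothetical.

```python
import json
import tempfile
from pathlib import Path

def write_dead_letters(failed: list, path: Path) -> int:
    """Append failed records to a JSON Lines file; return how many were written."""
    with path.open("a", encoding="utf-8") as handle:
        for entry in failed:
            # default=str keeps non-JSON values (dates, Decimals) from aborting the write
            handle.write(json.dumps(entry, default=str) + "\n")
    return len(failed)

def read_dead_letters(path: Path) -> list:
    """Load dead-lettered records back for inspection or replay."""
    with path.open(encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]

# Sample failure in the same shape the ETL helper above produces
failed = [
    {"index": 2, "item": {"id": "a-17", "amount": "n/a"},
     "error": "could not convert string to float: 'n/a'", "error_type": "ValueError"},
]
dead_letter_path = Path(tempfile.mkdtemp()) / "dead_letters.jsonl"
written = write_dead_letters(failed, dead_letter_path)
print(written, read_dead_letters(dead_letter_path)[0]["error_type"])  # 1 ValueError
```

Keeping the original item next to the error lets a later job replay only the failed records once the underlying problem is fixed.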

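Security-Aware Error Handling

Prevent information leakage through error messages and stack traces.

Production vs. Development Error Details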

use std::env;

pub struct ErrorResponse {
    pub message: String,
    pub details: Option<serde_json::Value>,
}

impl From<AppError> for ErrorResponse {
    fn from(error: AppError) -> Self {
        let is_production = env::var("ENVIRONMENT")
            .unwrap_or_default()
            .to_lowercase() == "production";

        let message = match &error {
            AppError::Validation(msg) => msg.clone(),
            AppError::NotFound { .. } => "Resource not found".to_string(),
            _ => "An error occurred".to_string(),
        };

        let details = if is_production {
            None // Never expose stack traces or internal details
        } else {
            Some(serde_json::json!({
                "error_type": format!("{:?}", error),
                "backtrace": std::backtrace::Backtrace::capture().to_string(),
            }))
        };

        ErrorResponse { message, details }
    }
}

Sanitizing Errors Before Logging

import logging
import os
import re

logger = logging.getLogger(__name__)

SENSITIVE_PATTERNS = [
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email addresses
    r'\b\d{3}-\d{2}-\d{4}\b',  # US Social Security numbers
    r'\b(?:\d{4}[-\s]?){3}\d{4}\b',  # Credit card numbers
    r'password["\']?\s*[:=]\s*["\']?[\w!@#$%^&*]+',  # Passwords
]

def sanitize_error_message(message: str) -> str:
    """Remove sensitive data from an error message before logging."""
    sanitized = message
    for pattern in SENSITIVE_PATTERNS:
        sanitized = re.sub(pattern, '[REDACTED]', sanitized, flags=re.IGNORECASE)
    return sanitized

def log_error_safely(error: Exception, context: dict | None = None):
    """Log an error using the sanitized message."""
    sanitized_message = sanitize_error_message(str(error))

    logger.error(
        sanitized_message,
        extra={
            "error_type": type(error).__name__,
            "context": context or {},
            "include_stacktrace": os.getenv("ENVIRONMENT") != "production",
        }
    )
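
Calling a sanitizer by hand is easy to forget, so one option is to attach the redaction to the logging handler itself. The sketch below uses the standard library's logging.Filter for that; the RedactingFilter class, the shortened pattern list, and the logger name are assumptions made for illustration.

```python
import io
import logging
import re

# Hypothetical, deliberately small pattern list for the sketch
REDACTION_PATTERNS = [
    re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),  # Email
    re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),  # US Social Security number
]

class RedactingFilter(logging.Filter):
    """Scrub sensitive values from every record that passes through a handler."""

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()  # Merge args first so they get scrubbed too
        for pattern in REDACTION_PATTERNS:
            message = pattern.sub("[REDACTED]", message)
        record.msg, record.args = message, ()
        return True  # Never drop the record, only rewrite it

stream = io.StringIO()  # In-memory stream so the output can be inspected
handler = logging.StreamHandler(stream)
handler.addFilter(RedactingFilter())

demo_logger = logging.getLogger("redaction_demo")
demo_logger.addHandler(handler)
demo_logger.propagate = False  # Keep the unredacted record away from root handlers

demo_logger.error("Lookup failed for %s (ssn %s)", "jane@example.com", "123-45-6789")
print(stream.getvalue().strip())  # Lookup failed for [REDACTED] (ssn [REDACTED])
```

This filter rewrites only the message text. Tracebacks attached through exc_info are formatted separately and would need their own scrubbing, for example in a custom Formatter.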

Rate-Limited Error Logging

use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

pub struct RateLimitedErrorLogger {
    last_logged: Mutex<HashMap<String, Instant>>,
    min_interval: Duration,
}

impl RateLimitedErrorLogger {
    pub fn new(min_interval: Duration) -> Self {
        Self {
            last_logged: Mutex::new(HashMap::new()),
            min_interval,
        }
    }

    pub async fn log_if_allowed(&self, error_key: &str, error: &dyn std::error::Error) {
        let mut last_logged = self.last_logged.lock().await;
        let now = Instant::now();

        if let Some(last_time) = last_logged.get(error_key) {
            if now.duration_since(*last_time) < self.min_interval {
                return; // Skip logging to prevent log flooding
            }
        }

        tracing::error!("Error occurred: {}", error);
        last_logged.insert(error_key.to_string(), now);
    }
}