name: error-handling description: 全面的错误处理模式和策略,包括Rust的Result/Option、API错误响应、数据管道错误处理和安全性感知的错误处理。用于实现异常处理、错误恢复、重试逻辑、断路器、回退机制、优雅降级或设计错误层次结构。触发词:error, exception, try, catch, throw, raise, Result, Option, panic, recover, retry, fallback, graceful degradation, circuit breaker, error boundary, 500, 4xx, 5xx, thiserror, anyhow, RFC 7807, error propagation, error messages, stack trace。
错误处理
概述
错误处理是稳健软件开发的关键方面。本技能涵盖错误类型和层次结构、恢复策略、传播模式、用户友好消息、上下文日志记录和语言特定实现(Rust、Python、TypeScript)。
代理委托
- senior-software-engineer (Opus) - 错误架构设计、选择错误策略
- software-engineer (Sonnet) - 实现错误处理模式
- senior-software-engineer (Opus) - 安全错误处理(防止信息泄漏、敏感数据清理)
- senior-software-engineer (Opus) - 基础设施错误处理(重试、断路器)
指令
1. 设计错误层次结构
创建结构化的错误类型,提供清晰的分类。
Rust: thiserror 和 anyhow
// 使用 thiserror 用于库错误
use thiserror::Error;
/// Application-level error type; the `Display` text of each variant comes
/// from its `#[error(...)]` attribute.
#[derive(Error, Debug)]
pub enum AppError {
    /// Input validation failure; the payload is the validation message.
    #[error("validation failed: {0}")]
    Validation(String),
    /// A resource of `resource_type` identified by `id` could not be found.
    #[error("resource not found: {resource_type} with id {id}")]
    NotFound {
        resource_type: String,
        id: String,
    },
    /// Wraps a `sqlx::Error`; `#[from]` lets `?` convert it automatically.
    /// The display text deliberately omits the underlying error.
    #[error("database error")]
    Database(#[from] sqlx::Error),
    /// Wraps a `std::io::Error`; `#[from]` lets `?` convert it automatically.
    #[error("IO error")]
    Io(#[from] std::io::Error),
    /// Failure reported by the external service named `service`; the
    /// underlying cause is kept as the error source.
    #[error("external service error: {service}")]
    ExternalService {
        service: String,
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}
// 使用 anyhow 用于应用程序错误
use anyhow::{Context, Result};
/// Fetches the order identified by `order_id` and validates it.
///
/// # Errors
///
/// Returns the error from `fetch_order` or `validate_order`, each wrapped
/// with a human-readable context message.
fn process_order(order_id: &str) -> Result<Order> {
    let order = fetch_order(order_id).context("Failed to fetch order from database")?;

    // `with_context` builds the message lazily, so the `format!` allocation
    // only happens when validation actually fails.
    validate_order(&order)
        .with_context(|| format!("Order {} validation failed", order_id))?;

    Ok(order)
}
Python
# Python 示例
class AppError(Exception):
    """Base application error carrying a message, an error code and optional details."""

    def __init__(self, message: str, code: str, details: dict = None):
        super().__init__(message)
        self.message, self.code = message, code
        # A falsy `details` (None or an empty dict) is replaced by a fresh dict.
        self.details = details if details else {}


class ValidationError(AppError):
    """Input validation error."""


class NotFoundError(AppError):
    """Resource-not-found error."""


class ServiceError(AppError):
    """External service error."""
TypeScript
/** Base application error carrying an error code and optional details. */
class AppError extends Error {
  public code: string;
  public details?: Record<string, unknown>;

  constructor(
    message: string,
    code: string,
    details?: Record<string, unknown>,
  ) {
    super(message);
    this.code = code;
    this.details = details;
    // Use the concrete subclass name so logs show e.g. "ValidationError".
    this.name = new.target.name;
  }
}

/** Input validation error. */
class ValidationError extends AppError {}

/** Resource-not-found error. */
class NotFoundError extends AppError {}

/** External service error. */
class ServiceError extends AppError {}
2. 实施恢复策略
重试与指数退避
Rust
use std::time::Duration;
use tokio::time::sleep;
use rand::Rng;
/// Runs `operation` until it succeeds or `max_retries` retries have been
/// used, sleeping with exponential backoff plus jitter between attempts.
///
/// The delay before retry `n` (0-based) is `base_delay_ms * 2^n`, capped at
/// `max_delay_ms`, plus a random jitter in `0..delay / 10` milliseconds.
///
/// # Errors
///
/// Returns the error of the final attempt once `max_retries` retries have
/// failed (`max_retries + 1` calls in total).
pub async fn retry_with_backoff<T, E, F, Fut>(
    mut operation: F,
    max_retries: u32,
    base_delay_ms: u64,
    max_delay_ms: u64,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let mut attempts: u32 = 0;
    loop {
        let err = match operation().await {
            Ok(result) => return Ok(result),
            Err(e) if attempts >= max_retries => return Err(e),
            Err(e) => e,
        };

        // Saturate instead of overflowing: 2^attempts exceeds u64 at 64
        // attempts, and the product can overflow well before that.
        let factor = 2_u64.checked_pow(attempts).unwrap_or(u64::MAX);
        let delay = base_delay_ms.saturating_mul(factor).min(max_delay_ms);

        // `gen_range` panics on an empty range, which happens whenever the
        // delay is below 10 ms; use no jitter in that case.
        let jitter_cap = delay / 10;
        let jitter = if jitter_cap == 0 {
            0
        } else {
            rand::thread_rng().gen_range(0..jitter_cap)
        };
        let total_delay = delay.saturating_add(jitter);

        tracing::warn!(
            "Attempt {}/{} failed: {}. Retrying in {}ms",
            attempts + 1,
            max_retries,
            err,
            total_delay
        );
        sleep(Duration::from_millis(total_delay)).await;
        attempts += 1;
    }
}
// Usage example: retry `fetch_from_api` with max_retries = 3,
// base_delay_ms = 1000 and max_delay_ms = 30000.
let result = retry_with_backoff(
    || async { fetch_from_api().await },
    3,
    1000,
    30000,
).await?;
Python
import asyncio
from typing import TypeVar, Callable
import random
T = TypeVar('T')
async def retry_with_backoff(
    operation: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_exceptions: tuple = (ServiceError,)
) -> T:
    """Await `operation()` and retry it with exponential backoff and jitter.

    Only exceptions listed in `retryable_exceptions` trigger a retry; any
    other exception propagates immediately. After `max_retries` retries the
    last retryable exception is re-raised.
    """
    attempt = 0
    while attempt <= max_retries:
        try:
            return await operation()
        except retryable_exceptions:
            # Out of retries: let the current exception propagate unchanged.
            if attempt == max_retries:
                raise
            # Delay doubles each attempt, is capped at max_delay, and gets
            # up to 10% random jitter added on top.
            backoff = min(base_delay * (2 ** attempt), max_delay)
            await asyncio.sleep(backoff + random.uniform(0, backoff * 0.1))
        attempt += 1
断路器
import time
from enum import Enum
from dataclasses import dataclass
class CircuitOpenError(Exception):
    """Raised by `CircuitBreaker.call` when the circuit is open and the call is rejected."""


class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreaker:
    """Circuit breaker that stops calling an operation after repeated failures.

    failure_threshold: consecutive failures that open the circuit.
    recovery_timeout: seconds the circuit stays open before a trial call is allowed.
    half_open_max_calls: successful trial calls required to close the circuit again.
    """

    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3

    def __post_init__(self):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.half_open_calls = 0

    def call(self, operation):
        """Invoke `operation()` through the breaker and return its result.

        Raises CircuitOpenError without calling `operation` while the circuit
        is open and the recovery timeout has not elapsed. Any exception raised
        by `operation` is recorded as a failure and re-raised.
        """
        if self.state == CircuitState.OPEN:
            # Monotonic clock: wall-clock adjustments must not shorten or
            # extend the recovery window.
            if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitOpenError("Circuit is open")

        try:
            result = operation()
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Record a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        elif self.state == CircuitState.CLOSED:
            # Only consecutive failures should trip the breaker; without this
            # reset, unrelated failures spread over time would accumulate.
            self.failure_count = 0

    def _on_failure(self):
        """Record a failed call and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # A failed trial call re-opens the circuit immediately.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN
回退模式
/**
 * Runs `primary`; if it fails and `shouldFallback` accepts the error, runs
 * `fallback` instead. Errors rejected by `shouldFallback` are re-thrown.
 *
 * @param primary        preferred async operation
 * @param fallback       alternative async operation used on failure
 * @param shouldFallback predicate deciding whether an error qualifies for
 *                       the fallback (defaults to always)
 * @returns the result of `primary`, or of `fallback` when it was used
 */
async function withFallback<T>(
  primary: () => Promise<T>,
  fallback: () => Promise<T>,
  shouldFallback: (error: Error) => boolean = () => true,
): Promise<T> {
  let failure: unknown;
  try {
    return await primary();
  } catch (caught) {
    failure = caught;
  }

  if (!shouldFallback(failure as Error)) {
    throw failure;
  }
  return fallback();
}
// Usage example: fall back to the cache only when the primary API call
// fails with a ServiceError; any other error is re-thrown.
const data = await withFallback(
  () => fetchFromPrimaryAPI(),
  () => fetchFromCache(),
  (error) => error instanceof ServiceError,
);
3. 错误传播模式
包装和丰富错误
def process_order(order_id: str) -> Order:
    """Fetch, validate and process the order identified by `order_id`.

    A DatabaseError raised along the way is wrapped in a ServiceError that
    carries the order id and the original message, chained via `from`.
    """
    try:
        fetched = fetch_order(order_id)
        validate_order(fetched)
        return process(fetched)
    except DatabaseError as db_error:
        failure_details = {"order_id": order_id, "original_error": str(db_error)}
        raise ServiceError(
            message="Failed to process order",
            code="ORDER_PROCESSING_FAILED",
            details=failure_details,
        ) from db_error
结果类型(Rust 风格)
from dataclasses import dataclass
from typing import Generic, TypeVar, Union
T = TypeVar('T')
E = TypeVar('E')


@dataclass
class Ok(Generic[T]):
    # Success variant: wraps the computed value.
    value: T


@dataclass
class Err(Generic[E]):
    # Failure variant: wraps the error description.
    error: E


# Rust-style result: either an Ok carrying a value or an Err carrying an error.
Result = Union[Ok[T], Err[E]]


def divide(a: float, b: float) -> Result[float, str]:
    """Divide `a` by `b`, returning Err instead of raising when `b` is zero."""
    return Err("Division by zero") if b == 0 else Ok(a / b)
# Usage example: branch on the variant returned by divide().
result = divide(10, 0)
if isinstance(result, Ok):
    print(f"Result: {result.value}")
elif isinstance(result, Err):
    print(f"Error: {result.error}")
4. 用户友好错误消息
# Maps internal error codes to the text that is shown to end users.
ERROR_MESSAGES = {
    "VALIDATION_FAILED": "请检查您的输入并重试。",
    "NOT_FOUND": "请求的项目未找到。",
    "SERVICE_UNAVAILABLE": "服务暂时不可用,请稍后重试。",
    "UNAUTHORIZED": "请登录以继续。",
    "FORBIDDEN": "您没有权限执行此操作。",
}


def get_user_message(error: AppError) -> str:
    """Translate an internal error into a user-friendly message.

    Unknown error codes fall back to a generic message.
    """
    if error.code in ERROR_MESSAGES:
        return ERROR_MESSAGES[error.code]
    return "发生意外错误,请重试。"
def format_error_response(error: AppError, include_details: bool = False) -> dict:
    """Build the API response body for `error`.

    The body always contains the error code and the user-facing message;
    `error.details` is attached only when `include_details` is true and the
    details are non-empty.
    """
    payload = {"code": error.code, "message": get_user_message(error)}
    if include_details and error.details:
        payload["details"] = error.details
    return {"error": payload}
5. 带上下文的错误日志记录
import logging
import traceback
from contextvars import ContextVar
request_id: ContextVar[str] = ContextVar('request_id', default='unknown')
def log_error(error: Exception, context: dict | None = None):
    """Log `error` together with its full context as structured data.

    The structured payload contains the current request id, the error type
    and message, the formatted traceback, and any caller-supplied `context`
    entries. AppError instances additionally contribute their code and
    details.
    """
    logger = logging.getLogger(__name__)

    # Format the traceback from the exception object itself. format_exc()
    # only works while an exception is being handled and otherwise yields
    # "NoneType: None".
    stack_trace = "".join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )

    error_context = {
        "request_id": request_id.get(),
        "error_type": type(error).__name__,
        "error_message": str(error),
        "stack_trace": stack_trace,
        **(context or {})
    }

    if isinstance(error, AppError):
        error_context["error_code"] = error.code
        error_context["error_details"] = error.details

    logger.error(
        f"Error occurred: {error}",
        extra={"structured_data": error_context}
    )
最佳实践
-
快速失败:尽早验证输入并立即抛出错误,而不是继续使用无效数据。
-
具体明确:创建特定的错误类型,而非使用通用异常。这有助于更好的处理和调试。
-
保留上下文:包装错误时,始终使用 Python 中的 `from e` 或其他语言中的 cause 机制来保留原始错误链。
-
不要忽略错误:避免空的捕获块。至少记录错误。
-
区分可恢复与不可恢复:设计错误层次结构以清楚指示哪些错误可以重试。
-
使用适当的恢复策略:
- 重试:用于暂时性故障(网络超时、速率限制)
- 回退:当存在替代方案时(缓存、默认值)
- 断路器:防止级联故障
-
清理面向用户的消息:切勿向用户暴露内部错误详情、堆栈跟踪或敏感信息。
-
在边界处记录:当错误跨越系统边界(API端点、服务调用)时记录错误。
示例
API端点中的完整错误处理
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(AppError)
async def app_error_handler(request: Request, error: AppError):
    """Log an AppError and convert it into a JSON error response.

    The HTTP status is chosen by the exact error class; classes without an
    entry map to 500.
    """
    log_error(error, {"path": request.url.path, "method": request.method})

    status_by_type = {ValidationError: 400, NotFoundError: 404, ServiceError: 503}
    http_status = status_by_type.get(type(error), 500)
    body = format_error_response(error)

    return JSONResponse(status_code=http_status, content=body)
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    """Return an order, retrying through a circuit breaker with a cache fallback.

    When the breaker rejects the call, the cached order is returned if there
    is one; otherwise a ServiceError with code SERVICE_UNAVAILABLE is raised.
    """
    circuit_breaker = get_circuit_breaker("order_service")

    async def fetch():
        return await order_service.get(order_id)

    # NOTE(review): `fetch` is a coroutine function. If this breaker's
    # `call` is synchronous, it only creates the coroutine and the breaker
    # never observes failures raised while awaiting it - confirm that the
    # breaker returned here is async-aware.
    try:
        return await retry_with_backoff(
            lambda: circuit_breaker.call(fetch),
            max_retries=3,
            retryable_exceptions=(ServiceError,),
        )
    except CircuitOpenError:
        # Circuit is open: fall back to the cache.
        cached = await cache.get(f"order:{order_id}")
        if not cached:
            raise ServiceError(
                message="Order service unavailable",
                code="SERVICE_UNAVAILABLE"
            )
        return cached
React中的错误边界
import React, { Component, ErrorInfo, ReactNode } from "react";
/** Props of ErrorBoundary: the guarded subtree and the UI shown after a failure. */
interface Props {
  children: ReactNode;
  fallback: ReactNode;
}

/** State of ErrorBoundary: whether an error was caught, and which one. */
interface State {
  hasError: boolean;
  error?: Error;
}

/** Renders `fallback` instead of `children` once a descendant has thrown. */
class ErrorBoundary extends Component<Props, State> {
  state: State = { hasError: false };

  /** Switches the boundary into its error state and remembers the error. */
  static getDerivedStateFromError(error: Error): State {
    return { hasError: true, error };
  }

  /** Reports the caught error to the console and to the error tracker. */
  componentDidCatch(error: Error, errorInfo: ErrorInfo) {
    console.error("Error boundary caught:", error, errorInfo);
    // Forward to the error tracking service.
    errorTracker.captureException(error, { extra: errorInfo });
  }

  render() {
    const { hasError } = this.state;
    const { fallback, children } = this.props;
    return hasError ? fallback : children;
  }
}
API错误响应(RFC 7807)
RFC 7807(现已被 RFC 9457 取代)定义了 HTTP API 问题详情(Problem Details)的标准格式。
Rust实现
use serde::{Deserialize, Serialize};
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
/// Problem-details payload for HTTP API error responses.
#[derive(Serialize, Deserialize, Debug)]
pub struct ProblemDetails {
    /// Serialized under the key `type` (a reserved word in Rust).
    #[serde(rename = "type")]
    pub type_uri: String,
    pub title: String,
    /// HTTP status code as a plain number.
    pub status: u16,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub instance: Option<String>,
}
impl ProblemDetails {
    /// Creates a problem description with the given status and title.
    ///
    /// The type URI is set to `about:blank`; `detail` and `instance` start
    /// out unset.
    pub fn new(status: StatusCode, title: impl Into<String>) -> Self {
        Self {
            // Plain conversion; `format!` without arguments was redundant.
            type_uri: "about:blank".to_string(),
            title: title.into(),
            status: status.as_u16(),
            detail: None,
            instance: None,
        }
    }

    /// Returns `self` with `detail` set to the given text.
    pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
        self.detail = Some(detail.into());
        self
    }
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, title, detail) = match self {
AppError::Validation(msg) => (
StatusCode::BAD_REQUEST,
"Validation Failed",
Some(msg),
),
AppError::NotFound { resource_type, id } => (
StatusCode::NOT_FOUND,
"Resource Not Found",
Some(format!("{} with id {} not found", resource_type, id)),
),
AppError::Database(_) | AppError::Io(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
"Internal Server Error",
None, // 绝不暴露内部错误
),
AppError::ExternalService { service, .. } => (
StatusCode::SERVICE_UNAVAILABLE,
"Service Unavailable",
Some(format!("{} is temporarily unavailable", service)),
),
};
let mut problem = ProblemDetails::new(status, title);
if let Some(d) = detail {
problem = problem.with_detail(d);
}
(status, Json(problem)).into_response()
}
}
Python(FastAPI)
from pydantic import BaseModel
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
# Problem-details payload for HTTP API error responses.
class ProblemDetails(BaseModel):
    # Problem type URI; defaults to "about:blank".
    type: str = "about:blank"
    # Short summary of the problem.
    title: str
    # HTTP status code.
    status: int
    # Optional explanation of this specific occurrence.
    detail: str | None = None
    # Optional reference identifying this specific occurrence.
    instance: str | None = None
@app.exception_handler(AppError)
async def app_error_handler(request: Request, error: AppError):
    """Convert an AppError into an `application/problem+json` response.

    The status is chosen by the exact error class (500 when unmapped). The
    error text is only included for statuses below 500.
    """
    status = {
        ValidationError: 400,
        NotFoundError: 404,
        ServiceError: 503,
    }.get(type(error), 500)

    # Server-side failures (5xx) never echo the error text back to the client.
    visible_detail = None if status >= 500 else str(error)

    problem = ProblemDetails(
        title=type(error).__name__,
        status=status,
        detail=visible_detail,
        instance=str(request.url.path),
    )
    body = problem.model_dump(exclude_none=True)

    return JSONResponse(
        status_code=status,
        content=body,
        headers={"Content-Type": "application/problem+json"},
    )
数据管道错误处理
数据管道需要特殊错误处理,以处理部分失败和数据质量问题。
Rust管道与错误收集
use std::collections::HashMap;
/// Outcome of processing a batch: the successfully produced values and a
/// description of every item that failed.
#[derive(Debug)]
pub struct ProcessingResult<T> {
    pub successful: Vec<T>,
    pub failed: Vec<FailedItem>,
}
/// One batch item that could not be processed.
#[derive(Debug)]
pub struct FailedItem {
    /// Position of the item in the input batch.
    pub index: usize,
    /// Error message rendered as text.
    pub error: String,
    /// The original input record.
    pub record: serde_json::Value,
}
pub async fn process_batch<T, F, Fut>(
items: Vec<serde_json::Value>,
processor: F,
) -> ProcessingResult<T>
where
F: Fn(serde_json::Value) -> Fut,
Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
{
let mut successful = Vec::new();
let mut failed = Vec::new();
for (index, item) in items.into_iter().enumerate() {
match processor(item.clone()).await {
Ok(result) => successful.push(result),
Err(e) => {
tracing::error!("Failed to process item {}: {}", index, e);
failed.push(FailedItem {
index,
error: e.to_string(),
record: item,
});
}
}
}
ProcessingResult { successful, failed }
}
// Usage with a dead-letter queue: process the batch, then forward every
// failed item to the queue.
let result = process_batch(records, |record| async {
    validate_and_transform(record).await
}).await;
if !result.failed.is_empty() {
    dead_letter_queue.send(result.failed).await?;
}
Python ETL错误处理
from dataclasses import dataclass
from typing import TypeVar, Callable, Generic
import logging
T = TypeVar('T')


@dataclass
class ProcessingResult(Generic[T]):
    # Values returned by the processor for items that succeeded.
    successful: list[T]
    # One dict per failed item: index, item, error text and error type name.
    failed: list[dict]
    # Number of failures per exception class name.
    error_summary: dict[str, int]


def process_with_error_tracking(
    items: list[dict],
    processor: Callable[[dict], T],
    continue_on_error: bool = True,
) -> ProcessingResult[T]:
    """Apply `processor` to every item, tracking failures instead of aborting.

    Each failure is logged, counted by exception class name and recorded
    with its index and original item. When `continue_on_error` is false the
    first failure is re-raised after being recorded.
    """
    outcome = ProcessingResult(successful=[], failed=[], error_summary={})

    for position, record in enumerate(items):
        try:
            outcome.successful.append(processor(record))
        except Exception as exc:
            kind = type(exc).__name__
            outcome.error_summary[kind] = outcome.error_summary.get(kind, 0) + 1
            logging.error(f"Failed to process item {position}: {exc}")
            outcome.failed.append({
                "index": position,
                "item": record,
                "error": str(exc),
                "error_type": kind,
            })
            if not continue_on_error:
                raise

    return outcome
安全性感知的错误处理
防止通过错误消息和堆栈跟踪泄漏信息。
生产与开发错误详情
use std::env;
/// Error payload returned to clients: a message plus optional extra details.
pub struct ErrorResponse {
    pub message: String,
    pub details: Option<serde_json::Value>,
}
impl From<AppError> for ErrorResponse {
    /// Builds the client-facing response for `error`.
    ///
    /// Debug details are attached only when the `ENVIRONMENT` variable is
    /// not `production` (compared case-insensitively).
    fn from(error: AppError) -> Self {
        let environment = env::var("ENVIRONMENT").unwrap_or_default();
        let is_production = environment.to_lowercase() == "production";

        let message = match &error {
            AppError::Validation(msg) => msg.clone(),
            AppError::NotFound { .. } => String::from("Resource not found"),
            _ => String::from("An error occurred"),
        };

        // Never expose stack traces or internal details in production.
        let details = (!is_production).then(|| {
            serde_json::json!({
                "error_type": format!("{:?}", error),
                "backtrace": std::backtrace::Backtrace::capture().to_string(),
            })
        });

        ErrorResponse { message, details }
    }
}
记录前清理错误
import re
import os
SENSITIVE_PATTERNS = [
    # E-mail address. The TLD class is [A-Za-z]; a `|` inside a character
    # class is a literal pipe, not alternation.
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    r'\b\d{3}-\d{2}-\d{4}\b',  # Social security number
    r'\b(?:\d{4}[-\s]?){3}\d{4}\b',  # Credit card number
    r'password["\']?\s*[:=]\s*["\']?[\w!@#$%^&*]+',  # Password assignment
]

# Compiled once at import time instead of on every call.
_SENSITIVE_REGEXES = tuple(
    re.compile(pattern, re.IGNORECASE) for pattern in SENSITIVE_PATTERNS
)


def sanitize_error_message(message: str) -> str:
    """Return `message` with every sensitive match replaced by "[REDACTED]".

    The patterns are applied in the order they are listed in
    SENSITIVE_PATTERNS, case-insensitively.
    """
    sanitized = message
    for regex in _SENSITIVE_REGEXES:
        sanitized = regex.sub('[REDACTED]', sanitized)
    return sanitized
def log_error_safely(error: Exception, context: dict | None = None):
    """Log `error` using a message with sensitive data redacted.

    The record's `extra` carries the error type name, the caller-supplied
    context and a flag stating whether stack traces may be included (true
    unless ENVIRONMENT is "production").
    """
    # Local import and logger lookup: no `logger` is defined in this snippet.
    import logging

    logger = logging.getLogger(__name__)

    sanitized_message = sanitize_error_message(str(error))
    logger.error(
        sanitized_message,
        extra={
            "error_type": type(error).__name__,
            "context": context or {},
            "include_stacktrace": os.getenv("ENVIRONMENT") != "production",
        }
    )
速率限制错误日志
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
/// Error logger that emits at most one entry per error key within
/// `min_interval`.
pub struct RateLimitedErrorLogger {
    /// Time of the most recent logged entry for each error key.
    last_logged: Mutex<HashMap<String, Instant>>,
    /// Minimum time between two logged entries for the same key.
    min_interval: Duration,
}
impl RateLimitedErrorLogger {
    /// Creates a logger that allows one entry per key every `min_interval`.
    pub fn new(min_interval: Duration) -> Self {
        let last_logged = Mutex::new(HashMap::new());
        Self {
            last_logged,
            min_interval,
        }
    }

    /// Logs `error` unless an entry for `error_key` was already logged less
    /// than `min_interval` ago; a logged entry refreshes the key's timestamp.
    pub async fn log_if_allowed(&self, error_key: &str, error: &dyn std::error::Error) {
        let mut seen = self.last_logged.lock().await;
        let now = Instant::now();

        let throttled = seen
            .get(error_key)
            .map_or(false, |previous| now.duration_since(*previous) < self.min_interval);
        if throttled {
            // Skip the entry to avoid flooding the log.
            return;
        }

        tracing::error!("Error occurred: {}", error);
        seen.insert(error_key.to_string(), now);
    }
}