name: 错误处理模式 description: 掌握跨语言的错误处理模式,包括异常、Result类型、错误传播和优雅降级,以构建弹性应用程序。在实现错误处理、设计API或提高应用可靠性时使用。
错误处理模式
构建具有强大错误处理策略的弹性应用程序,优雅地处理故障并提供优秀的调试体验。
何时使用此技能
- 在新功能中实现错误处理
- 设计错误弹性的API
- 调试生产问题
- 提高应用可靠性
- 为用户和开发者创建更好的错误消息
- 实现重试和断路器模式
- 处理异步/并发错误
- 构建故障容忍的分布式系统
核心概念
1. 错误处理哲学
异常 vs Result类型:
- 异常: 传统的try-catch,中断控制流
- Result类型: 显式成功/失败,函数式方法
- 错误代码: C风格,需要纪律
- Option/Maybe类型: 用于可空值
何时使用每种:
- 异常:意外错误,异常条件
- Result类型:预期错误,验证失败
- Panics/Crashes:不可恢复错误,编程错误
2. 错误类别
可恢复错误:
- 网络超时
- 缺失文件
- 无效用户输入
- API速率限制
不可恢复错误:
- 内存不足
- 栈溢出
- 编程错误(空指针等)
语言特定模式
Python错误处理
自定义异常层次结构:
class ApplicationError(Exception):
"""所有应用错误的基异常。"""
def __init__(self, message: str, code: str = None, details: dict = None):
super().__init__(message)
self.code = code
self.details = details or {}
self.timestamp = datetime.utcnow()
class ValidationError(ApplicationError):
"""验证失败时引发。"""
pass
class NotFoundError(ApplicationError):
"""资源未找到时引发。"""
pass
class ExternalServiceError(ApplicationError):
"""外部服务失败时引发。"""
def __init__(self, message: str, service: str, **kwargs):
super().__init__(message, **kwargs)
self.service = service
# 使用
def get_user(user_id: str) -> User:
user = db.query(User).filter_by(id=user_id).first()
if not user:
raise NotFoundError(
f"用户未找到",
code="USER_NOT_FOUND",
details={"user_id": user_id}
)
return user
上下文管理器用于清理:
from contextlib import contextmanager
@contextmanager
def database_transaction(session):
"""确保事务提交或回滚。"""
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise
finally:
session.close()
# 使用
with database_transaction(db.session) as session:
user = User(name="Alice")
session.add(user)
# 自动提交或回滚
带指数退避的重试:
import time
from functools import wraps
from typing import TypeVar, Callable
T = TypeVar('T')
def retry(
max_attempts: int = 3,
backoff_factor: float = 2.0,
exceptions: tuple = (Exception,)
):
"""带指数退避的重试装饰器。"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_attempts - 1:
sleep_time = backoff_factor ** attempt
time.sleep(sleep_time)
continue
raise
raise last_exception
return wrapper
return decorator
# 使用
@retry(max_attempts=3, exceptions=(NetworkError,))
def fetch_data(url: str) -> dict:
response = requests.get(url, timeout=5)
response.raise_for_status()
return response.json()
TypeScript/JavaScript错误处理
自定义错误类:
// 自定义错误类
class ApplicationError extends Error {
constructor(
message: string,
public code: string,
public statusCode: number = 500,
public details?: Record<string, any>,
) {
super(message);
this.name = this.constructor.name;
Error.captureStackTrace(this, this.constructor);
}
}
class ValidationError extends ApplicationError {
constructor(message: string, details?: Record<string, any>) {
super(message, "VALIDATION_ERROR", 400, details);
}
}
class NotFoundError extends ApplicationError {
constructor(resource: string, id: string) {
super(`${resource} not found`, "NOT_FOUND", 404, { resource, id });
}
}
// 使用
function getUser(id: string): User {
const user = users.find((u) => u.id === id);
if (!user) {
throw new NotFoundError("User", id);
}
return user;
}
Result类型模式:
// 用于显式错误处理的Result类型
type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
// 辅助函数
function Ok<T>(value: T): Result<T, never> {
return { ok: true, value };
}
function Err<E>(error: E): Result<never, E> {
return { ok: false, error };
}
// 使用
function parseJSON<T>(json: string): Result<T, SyntaxError> {
try {
const value = JSON.parse(json) as T;
return Ok(value);
} catch (error) {
return Err(error as SyntaxError);
}
}
// 消费Result
const result = parseJSON<User>(userJson);
if (result.ok) {
console.log(result.value.name);
} else {
console.error("解析失败:", result.error.message);
}
// 链式Result
function chain<T, U, E>(
result: Result<T, E>,
fn: (value: T) => Result<U, E>,
): Result<U, E> {
return result.ok ? fn(result.value) : result;
}
异步错误处理:
// 使用async/await进行适当的错误处理
async function fetchUserOrders(userId: string): Promise<Order[]> {
try {
const user = await getUser(userId);
const orders = await getOrders(user.id);
return orders;
} catch (error) {
if (error instanceof NotFoundError) {
return []; // 未找到时返回空数组
}
if (error instanceof NetworkError) {
// 重试逻辑
return retryFetchOrders(userId);
}
// 重新抛出意外错误
throw error;
}
}
// Promise错误处理
function fetchData(url: string): Promise<Data> {
return fetch(url)
.then((response) => {
if (!response.ok) {
throw new NetworkError(`HTTP ${response.status}`);
}
return response.json();
})
.catch((error) => {
console.error("获取失败:", error);
throw error;
});
}
Rust错误处理
Result和Option类型:
use std::fs::File;
use std::io::{self, Read};
// 用于可能失败的操作的Result类型
fn read_file(path: &str) -> Result<String, io::Error> {
let mut file = File::open(path)?; // ? 操作符传播错误
let mut contents = String::new();
file.read_to_string(&mut contents)?;
Ok(contents)
}
// 自定义错误类型
#[derive(Debug)]
enum AppError {
Io(io::Error),
Parse(std::num::ParseIntError),
NotFound(String),
Validation(String),
}
impl From<io::Error> for AppError {
fn from(error: io::Error) -> Self {
AppError::Io(error)
}
}
// 使用自定义错误类型
fn read_number_from_file(path: &str) -> Result<i32, AppError> {
let contents = read_file(path)?; // 自动转换io::Error
let number = contents.trim().parse()
.map_err(AppError::Parse)?; // 显式转换ParseIntError
Ok(number)
}
// Option用于可空值
fn find_user(id: &str) -> Option<User> {
users.iter().find(|u| u.id == id).cloned()
}
// 结合Option和Result
fn get_user_age(id: &str) -> Result<u32, AppError> {
find_user(id)
.ok_or_else(|| AppError::NotFound(id.to_string()))
.map(|user| user.age)
}
Go错误处理
显式错误返回:
// 基本错误处理
func getUser(id string) (*User, error) {
user, err := db.QueryUser(id)
if err != nil {
return nil, fmt.Errorf("查询用户失败: %w", err)
}
if user == nil {
return nil, errors.New("用户未找到")
}
return user, nil
}
// 自定义错误类型
type ValidationError struct {
Field string
Message string
}
func (e *ValidationError) Error() string {
return fmt.Sprintf("验证失败 for %s: %s", e.Field, e.Message)
}
// 用于比较的哨兵错误
var (
ErrNotFound = errors.New("not found")
ErrUnauthorized = errors.New("unauthorized")
ErrInvalidInput = errors.New("invalid input")
)
// 错误检查
user, err := getUser("123")
if err != nil {
if errors.Is(err, ErrNotFound) {
// 处理未找到
} else {
// 处理其他错误
}
}
// 错误包装和解包
func processUser(id string) error {
user, err := getUser(id)
if err != nil {
return fmt.Errorf("处理用户失败: %w", err)
}
// 处理用户
return nil
}
// 解包错误
err := processUser("123")
if err != nil {
var valErr *ValidationError
if errors.As(err, &valErr) {
fmt.Printf("验证错误: %s
", valErr.Field)
}
}
通用模式
模式1: 断路器
防止分布式系统中的级联故障。
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, TypeVar
T = TypeVar('T')
class CircuitState(Enum):
CLOSED = "closed" # 正常操作
OPEN = "open" # 失败,拒绝请求
HALF_OPEN = "half_open" # 测试是否恢复
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
timeout: timedelta = timedelta(seconds=60),
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.last_failure_time = None
def call(self, func: Callable[[], T]) -> T:
if self.state == CircuitState.OPEN:
if datetime.now() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func()
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def on_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# 使用
circuit_breaker = CircuitBreaker()
def fetch_data():
return circuit_breaker.call(lambda: external_api.get_data())
模式2: 错误聚合
收集多个错误而不是在第一个错误时失败。
class ErrorCollector {
private errors: Error[] = [];
add(error: Error): void {
this.errors.push(error);
}
hasErrors(): boolean {
return this.errors.length > 0;
}
getErrors(): Error[] {
return [...this.errors];
}
throw(): never {
if (this.errors.length === 1) {
throw this.errors[0];
}
throw new AggregateError(
this.errors,
`${this.errors.length} errors occurred`,
);
}
}
// 使用: 验证多个字段
function validateUser(data: any): User {
const errors = new ErrorCollector();
if (!data.email) {
errors.add(new ValidationError("Email is required"));
} else if (!isValidEmail(data.email)) {
errors.add(new ValidationError("Email is invalid"));
}
if (!data.name || data.name.length < 2) {
errors.add(new ValidationError("Name must be at least 2 characters"));
}
if (!data.age || data.age < 18) {
errors.add(new ValidationError("Age must be 18 or older"));
}
if (errors.hasErrors()) {
errors.throw();
}
return data as User;
}
模式3: 优雅降级
在错误发生时提供回退功能。
from typing import Optional, Callable, TypeVar
T = TypeVar('T')
def with_fallback(
primary: Callable[[], T],
fallback: Callable[[], T],
log_error: bool = True
) -> T:
"""尝试主函数,错误时回退到备用函数。"""
try:
return primary()
except Exception as e:
if log_error:
logger.error(f"主函数失败: {e}")
return fallback()
# 使用
def get_user_profile(user_id: str) -> UserProfile:
return with_fallback(
primary=lambda: fetch_from_cache(user_id),
fallback=lambda: fetch_from_database(user_id)
)
# 多个回退
def get_exchange_rate(currency: str) -> float:
return (
try_function(lambda: api_provider_1.get_rate(currency))
or try_function(lambda: api_provider_2.get_rate(currency))
or try_function(lambda: cache.get_rate(currency))
or DEFAULT_RATE
)
def try_function(func: Callable[[], Optional[T]]) -> Optional[T]:
try:
return func()
except Exception:
return None
最佳实践
- 快速失败: 早期验证输入,快速失败
- 保留上下文: 包括堆栈跟踪、元数据、时间戳
- 有意义的消息: 解释发生了什么以及如何修复
- 适当日志: 错误=日志,预期失败=不要垃圾日志
- 在正确级别处理: 在可以有意义处理的地方捕获
- 清理资源: 使用try-finally、上下文管理器、defer
- 不要吞没错误: 日志或重新抛出,不要默默忽略
- 类型安全错误: 尽可能使用类型化错误
# 好的错误处理示例
def process_order(order_id: str) -> Order:
"""处理订单,具有全面的错误处理。"""
try:
# 验证输入
if not order_id:
raise ValidationError("订单ID是必需的")
# 获取订单
order = db.get_order(order_id)
if not order:
raise NotFoundError("订单", order_id)
# 处理支付
try:
payment_result = payment_service.charge(order.total)
except PaymentServiceError as e:
# 日志并包装外部服务错误
logger.error(f"订单 {order_id} 支付失败: {e}")
raise ExternalServiceError(
f"支付处理失败",
service="payment_service",
details={"order_id": order_id, "amount": order.total}
) from e
# 更新订单
order.status = "completed"
order.payment_id = payment_result.id
db.save(order)
return order
except ApplicationError:
# 重新抛出已知应用错误
raise
except Exception as e:
# 日志意外错误
logger.exception(f"处理订单 {order_id} 时发生意外错误")
raise ApplicationError(
"订单处理失败",
code="INTERNAL_ERROR"
) from e
常见陷阱
- 捕获太广泛:
except Exception隐藏错误 - 空捕获块: 默默吞没错误
- 日志和重新抛出: 创建重复日志条目
- 不清理: 忘记关闭文件、连接
- 错误消息差: “发生错误” 没有帮助
- 返回错误代码: 使用异常或Result类型
- 忽略异步错误: 未处理的Promise拒绝
资源
- references/exception-hierarchy-design.md: 设计错误类层次结构
- references/error-recovery-strategies.md: 不同场景的恢复模式
- references/async-error-handling.md: 处理并发代码中的错误
- assets/error-handling-checklist.md: 错误处理审查清单
- assets/error-message-guide.md: 编写有帮助的错误消息
- scripts/error-analyzer.py: 分析日志中的错误模式