名称: saga-patterns 描述: 使用编排和编排的分布式事务模式 参数提示: <事务> [–style <编排|编排>] [–services <服务1,服务2>] 允许工具: Read, Glob, Grep, Write, Edit, Skill, Task, mcp__perplexity__搜索
Saga 模式技能
何时使用此技能
使用此技能当:
- Saga 模式任务 - 处理使用编排和编排的分布式事务模式
- 规划或设计 - 需要 Saga 模式的指导
- 最佳实践 - 希望遵循已建立的模式和标准
概述
设计用于微服务的分布式事务模式,使用编排和编排。
强制:文档优先方法
在设计 sagas 之前:
- 调用
docs-management技能 用于 saga 模式 - 验证模式 通过 MCP 服务器(perplexity, context7)
- 基于已建立的微服务模式提供指导
Saga 基础
为什么使用 Sagas?
问题:
跨服务的分布式事务很复杂。
传统的 2PC(两阶段提交)无法扩展。
解决方案:
Saga = 本地事务序列
每个步骤都有补偿操作
最终一致性替代 ACID
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 步骤 1 │───►│ 步骤 2 │───►│ 步骤 3 │
│ 正向事务 + 补偿事务 │ │ 正向事务 + 补偿事务 │ │ 正向事务 + 补偿事务 │
└─────────┘ └─────────┘ └─────────┘
│ │ │
▼ ▼ ▼
本地事务 本地事务 本地事务
正向事务 = 正向事务
补偿事务 = 补偿事务
Saga 协调风格
编排(事件驱动)
编排模式:
服务通过事件通信。
无中央协调器。
每个服务知道下一步做什么。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 订单服务 │ │ 支付服务 │ │ 库存服务 │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ 订单创建 │ │
│─────────────────►│ │
│ │ 支付处理 │
│ │─────────────────►│
│ │ │ 库存预留
│◄─────────────────┼──────────────────│
│ 订单确认 │ │
特点:
✓ 松耦合
✓ 简单服务
✗ 难以跟踪
✗ 循环依赖风险
编排(协调器驱动)
编排模式:
中央协调器协调 saga。
服务暴露命令。
协调器管理状态。
┌─────────────────┐
│ 协调器 │
│ (Saga 管理器) │
└────────┬────────┘
│
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 订单服务 │ │ 支付服务 │ │ 库存服务 │
└─────────────┘ └─────────────┘ └─────────────┘
特点:
✓ 清晰流程可见性
✓ 更易调试
✗ 单点故障
✗ 与协调器耦合
编排实现
事件驱动流
// 订单服务 - 启动 Saga
public class OrderService
{
private readonly IEventPublisher _events;
public async Task CreateOrderAsync(CreateOrderCommand cmd)
{
var order = new Order(cmd.CustomerId, cmd.Items);
await _repository.SaveAsync(order);
// 发布事件以启动 saga
await _events.PublishAsync(new OrderCreated
{
OrderId = order.Id,
CustomerId = cmd.CustomerId,
TotalAmount = order.TotalAmount
});
}
// 处理补偿
public async Task HandleAsync(PaymentFailed @event)
{
var order = await _repository.GetAsync(@event.OrderId);
order.Cancel("支付失败");
await _repository.SaveAsync(order);
await _events.PublishAsync(new OrderCancelled
{
OrderId = @event.OrderId,
Reason = "支付失败"
});
}
}
// 支付服务 - 响应 OrderCreated
public class PaymentService
{
public async Task HandleAsync(OrderCreated @event)
{
try
{
var payment = await ProcessPaymentAsync(@event.OrderId, @event.TotalAmount);
await _events.PublishAsync(new PaymentProcessed
{
OrderId = @event.OrderId,
PaymentId = payment.Id
});
}
catch (PaymentException ex)
{
await _events.PublishAsync(new PaymentFailed
{
OrderId = @event.OrderId,
Reason = ex.Message
});
}
}
}
// 库存服务 - 响应 PaymentProcessed
public class InventoryService
{
public async Task HandleAsync(PaymentProcessed @event)
{
try
{
await ReserveInventoryAsync(@event.OrderId);
await _events.PublishAsync(new InventoryReserved
{
OrderId = @event.OrderId
});
}
catch (InsufficientInventoryException)
{
// 触发补偿
await _events.PublishAsync(new InventoryReservationFailed
{
OrderId = @event.OrderId
});
}
}
// 补偿操作
public async Task HandleAsync(OrderCancelled @event)
{
await ReleaseInventoryAsync(@event.OrderId);
}
}
编排实现
Saga 协调器
// Saga 状态机
public class OrderSaga : Saga<OrderSagaData>,
IAmStartedBy<OrderCreated>,
IHandle<PaymentProcessed>,
IHandle<PaymentFailed>,
IHandle<InventoryReserved>,
IHandle<InventoryReservationFailed>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.MapSaga(s => s.OrderId)
.ToMessage<OrderCreated>(m => m.OrderId)
.ToMessage<PaymentProcessed>(m => m.OrderId)
.ToMessage<PaymentFailed>(m => m.OrderId)
.ToMessage<InventoryReserved>(m => m.OrderId)
.ToMessage<InventoryReservationFailed>(m => m.OrderId);
}
public async Task Handle(OrderCreated message, IMessageHandlerContext context)
{
Data.OrderId = message.OrderId;
Data.CustomerId = message.CustomerId;
Data.TotalAmount = message.TotalAmount;
Data.Status = SagaStatus.Started;
// 请求支付
await context.Send(new ProcessPaymentCommand
{
OrderId = message.OrderId,
Amount = message.TotalAmount
});
}
public async Task Handle(PaymentProcessed message, IMessageHandlerContext context)
{
Data.PaymentId = message.PaymentId;
Data.Status = SagaStatus.PaymentCompleted;
// 请求库存预留
await context.Send(new ReserveInventoryCommand
{
OrderId = message.OrderId
});
}
public async Task Handle(PaymentFailed message, IMessageHandlerContext context)
{
Data.Status = SagaStatus.Failed;
// 补偿:取消订单
await context.Send(new CancelOrderCommand
{
OrderId = message.OrderId,
Reason = "支付失败"
});
MarkAsComplete();
}
public async Task Handle(InventoryReserved message, IMessageHandlerContext context)
{
Data.Status = SagaStatus.Completed;
// 完成 saga
await context.Publish(new OrderCompleted
{
OrderId = Data.OrderId
});
MarkAsComplete();
}
public async Task Handle(InventoryReservationFailed message, IMessageHandlerContext context)
{
Data.Status = SagaStatus.Failed;
// 补偿:退款
await context.Send(new RefundPaymentCommand
{
OrderId = Data.OrderId,
PaymentId = Data.PaymentId
});
// 补偿:取消订单
await context.Send(new CancelOrderCommand
{
OrderId = Data.OrderId,
Reason = "库存不足"
});
MarkAsComplete();
}
}
public class OrderSagaData : ContainSagaData
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal TotalAmount { get; set; }
public Guid? PaymentId { get; set; }
public SagaStatus Status { get; set; }
}
补偿事务
补偿设计
补偿原则:
1. 语义撤销
不总是精确反转
例如:取消订单 vs 撤销创建订单
2. 幂等性
可以安全调用多次
无论重试次数,结果相同
3. 永不失败
补偿必须最终成功
使用退避重试
4. 有序
按反向顺序补偿
最后步骤先补偿,第一步骤最后补偿
补偿流:
步骤 1 ─► 步骤 2 ─► 步骤 3 ─► 失败
│ │ │ │
│ │ │ ▼
│ │ └───► 补偿 3
│ │ │
│ └───────────────► 补偿 2
│ │
└─────────────────────────► 补偿 1
补偿示例
// 正向事务和补偿对
public class ReservationService
{
// 正向:预留库存
public async Task<ReservationId> ReserveAsync(OrderId orderId, List<Item> items)
{
var reservation = new Reservation(orderId, items);
foreach (var item in items)
{
await _inventory.DecrementAsync(item.ProductId, item.Quantity);
}
await _repository.SaveAsync(reservation);
return reservation.Id;
}
// 补偿:释放预留
public async Task ReleaseAsync(ReservationId reservationId)
{
var reservation = await _repository.GetAsync(reservationId);
if (reservation.Status == ReservationStatus.Released)
return; // 幂等
foreach (var item in reservation.Items)
{
await _inventory.IncrementAsync(item.ProductId, item.Quantity);
}
reservation.Release();
await _repository.SaveAsync(reservation);
}
}
错误处理
重试策略
重试模式:
1. 立即重试
用于临时失败
网络故障、超时
2. 指数退避
增加延迟
1秒 → 2秒 → 4秒 → 8秒
3. 断路器
在阈值后停止重试
允许恢复时间
4. 死信队列
捕获失败消息
手动干预
超时处理
// 带超时的 Saga
public class OrderSaga : Saga<OrderSagaData>
{
public async Task Handle(OrderCreated message, IMessageHandlerContext context)
{
// 为支付设置超时
await RequestTimeout<PaymentTimeout>(
context,
TimeSpan.FromMinutes(30));
await context.Send(new ProcessPaymentCommand { ... });
}
public async Task Timeout(PaymentTimeout timeout, IMessageHandlerContext context)
{
if (Data.Status == SagaStatus.AwaitingPayment)
{
// 支付未在时间内完成
await context.Send(new CancelOrderCommand
{
OrderId = Data.OrderId,
Reason = "支付超时"
});
Data.Status = SagaStatus.TimedOut;
MarkAsComplete();
}
}
}
Saga 设计模板
# Saga 设计: [流程名称]
## 概述
[此 saga 实现的功能]
## 触发器
[启动此 saga 的事件]
## 步骤
| 步骤 | 服务 | 动作 | 补偿动作 |
|------|---------|--------|---------------------|
| 1 | [服务] | [正向动作] | [补偿] |
| 2 | [服务] | [正向动作] | [补偿] |
| 3 | [服务] | [正向动作] | [补偿] |
## 流图
```text
[ASCII saga 流图]
失败场景
| 失败点 | 失败内容 | 补偿链 |
|---|---|---|
| 步骤 1 后 | [描述] | 补偿 1 |
| 步骤 2 后 | [描述] | 补偿 2 → 1 |
超时处理
- 步骤 1 超时: [发生情况]
- 步骤 2 超时: [发生情况]
幂等性
- [如何处理重复]
监控
- [监控内容]
- [告警阈值]
选择编排 vs 编排
| 因素 | 编排 | 编排 |
|---|---|---|
| 耦合 | 松 | 紧 |
| 可见性 | 分布式 | 集中化 |
| 复杂性 | 在事件中 | 在协调器中 |
| 调试 | 更难 | 更易 |
| 团队结构 | 独立团队 | 中心团队 |
| 失败处理 | 分布式 | 集中化 |
| 最佳适用 | 简单流 | 复杂流 |
工作流
设计 sagas 时:
- 识别边界:哪些服务参与?
- 定义步骤:正向动作按顺序
- 设计补偿:为每个步骤的反向动作
- 选择风格:编排或编排?
- 处理失败:超时、重试、死信
- 确保幂等性:所有动作可安全重复
- 计划监控:跟踪 saga 状态和失败
- 测试失败路径:验证补偿工作
用户界面
当用户直接调用时,此技能为分布式事务设计 saga 模式。
执行工作流
- 解析参数 - 提取事务描述、风格偏好(编排/编排/推荐)和参与服务列表。如果未提供事务,询问用户。
- 研究上下文 - 使用 MCP 服务器了解类似事务的 saga 模式。
- 分析事务 - 识别参与服务,定义正向事务步骤,设计补偿动作,计划错误处理。
- 推荐风格 - 如果未指定风格,基于流复杂性、团队结构、调试需求和耦合容忍度评估编排 vs 编排。
- 设计 Saga - 创建步骤定义、补偿、状态机(编排)或事件流(编排),以及失败场景。
- 生成输出 - 生成 saga 设计文档,包含流图、步骤详情表、C# 实现示例、错误处理策略和监控建议。
参考
详细指导:
最后更新: 2025-12-26