Saga模式技能Skill saga-patterns

此技能专注于设计和实现分布式事务的Saga模式,支持编排和编排两种协调风格。它提供模式指导、最佳实践和实现示例,适用于微服务架构中的事务处理,确保最终一致性。关键词:Saga模式,分布式事务,微服务,编排,编排,补偿事务,错误处理。

架构设计 0 次安装 0 次浏览 更新于 3/11/2026

名称: saga-patterns 描述: 使用编排和编排的分布式事务模式 参数提示: <事务> [–style <编排|编排>] [–services <服务1,服务2>] 允许工具: Read, Glob, Grep, Write, Edit, Skill, Task, mcp__perplexity__搜索

Saga 模式技能

何时使用此技能

使用此技能当:

  • Saga 模式任务 - 处理使用编排和编排的分布式事务模式
  • 规划或设计 - 需要 Saga 模式的指导
  • 最佳实践 - 希望遵循已建立的模式和标准

概述

设计用于微服务的分布式事务模式,使用编排和编排。

强制:文档优先方法

在设计 sagas 之前:

  1. 调用 docs-management 技能 用于 saga 模式
  2. 验证模式 通过 MCP 服务器(perplexity, context7)
  3. 基于已建立的微服务模式提供指导

Saga 基础

为什么使用 Sagas?

问题:
跨服务的分布式事务很复杂。
传统的 2PC(两阶段提交)无法扩展。

解决方案:
Saga = 本地事务序列
每个步骤都有补偿操作
最终一致性替代 ACID

┌─────────┐    ┌─────────┐    ┌─────────┐
│ 步骤 1  │───►│ 步骤 2  │───►│ 步骤 3  │
│ 正向事务 + 补偿事务 │    │ 正向事务 + 补偿事务 │    │ 正向事务 + 补偿事务 │
└─────────┘    └─────────┘    └─────────┘
     │              │              │
     ▼              ▼              ▼
   本地事务       本地事务       本地事务

正向事务 = 正向事务
补偿事务 = 补偿事务

Saga 协调风格

编排(事件驱动)

编排模式:

服务通过事件通信。
无中央协调器。
每个服务知道下一步做什么。

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   订单服务   │    │   支付服务   │    │   库存服务   │
└──────┬──────┘    └──────┬──────┘    └──────┬──────┘
       │                  │                  │
       │ 订单创建         │                  │
       │─────────────────►│                  │
       │                  │ 支付处理         │
       │                  │─────────────────►│
       │                  │                  │ 库存预留
       │◄─────────────────┼──────────────────│
       │ 订单确认         │                  │

特点:
✓ 松耦合
✓ 简单服务
✗ 难以跟踪
✗ 循环依赖风险

编排(协调器驱动)

编排模式:

中央协调器协调 saga。
服务暴露命令。
协调器管理状态。

                ┌─────────────────┐
                │     协调器      │
                │  (Saga 管理器)  │
                └────────┬────────┘
                         │
        ┌────────────────┼────────────────┐
        │                │                │
        ▼                ▼                ▼
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│   订单服务   │  │   支付服务   │  │   库存服务   │
└─────────────┘  └─────────────┘  └─────────────┘

特点:
✓ 清晰流程可见性
✓ 更易调试
✗ 单点故障
✗ 与协调器耦合

编排实现

事件驱动流

// 订单服务 - 启动 Saga
public class OrderService
{
    private readonly IEventPublisher _events;

    public async Task CreateOrderAsync(CreateOrderCommand cmd)
    {
        var order = new Order(cmd.CustomerId, cmd.Items);
        await _repository.SaveAsync(order);

        // 发布事件以启动 saga
        await _events.PublishAsync(new OrderCreated
        {
            OrderId = order.Id,
            CustomerId = cmd.CustomerId,
            TotalAmount = order.TotalAmount
        });
    }

    // 处理补偿
    public async Task HandleAsync(PaymentFailed @event)
    {
        var order = await _repository.GetAsync(@event.OrderId);
        order.Cancel("支付失败");
        await _repository.SaveAsync(order);

        await _events.PublishAsync(new OrderCancelled
        {
            OrderId = @event.OrderId,
            Reason = "支付失败"
        });
    }
}

// 支付服务 - 响应 OrderCreated
public class PaymentService
{
    public async Task HandleAsync(OrderCreated @event)
    {
        try
        {
            var payment = await ProcessPaymentAsync(@event.OrderId, @event.TotalAmount);

            await _events.PublishAsync(new PaymentProcessed
            {
                OrderId = @event.OrderId,
                PaymentId = payment.Id
            });
        }
        catch (PaymentException ex)
        {
            await _events.PublishAsync(new PaymentFailed
            {
                OrderId = @event.OrderId,
                Reason = ex.Message
            });
        }
    }
}

// 库存服务 - 响应 PaymentProcessed
public class InventoryService
{
    public async Task HandleAsync(PaymentProcessed @event)
    {
        try
        {
            await ReserveInventoryAsync(@event.OrderId);

            await _events.PublishAsync(new InventoryReserved
            {
                OrderId = @event.OrderId
            });
        }
        catch (InsufficientInventoryException)
        {
            // 触发补偿
            await _events.PublishAsync(new InventoryReservationFailed
            {
                OrderId = @event.OrderId
            });
        }
    }

    // 补偿操作
    public async Task HandleAsync(OrderCancelled @event)
    {
        await ReleaseInventoryAsync(@event.OrderId);
    }
}

编排实现

Saga 协调器

// Saga 状态机
public class OrderSaga : Saga<OrderSagaData>,
    IAmStartedBy<OrderCreated>,
    IHandle<PaymentProcessed>,
    IHandle<PaymentFailed>,
    IHandle<InventoryReserved>,
    IHandle<InventoryReservationFailed>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(s => s.OrderId)
            .ToMessage<OrderCreated>(m => m.OrderId)
            .ToMessage<PaymentProcessed>(m => m.OrderId)
            .ToMessage<PaymentFailed>(m => m.OrderId)
            .ToMessage<InventoryReserved>(m => m.OrderId)
            .ToMessage<InventoryReservationFailed>(m => m.OrderId);
    }

    public async Task Handle(OrderCreated message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.CustomerId = message.CustomerId;
        Data.TotalAmount = message.TotalAmount;
        Data.Status = SagaStatus.Started;

        // 请求支付
        await context.Send(new ProcessPaymentCommand
        {
            OrderId = message.OrderId,
            Amount = message.TotalAmount
        });
    }

    public async Task Handle(PaymentProcessed message, IMessageHandlerContext context)
    {
        Data.PaymentId = message.PaymentId;
        Data.Status = SagaStatus.PaymentCompleted;

        // 请求库存预留
        await context.Send(new ReserveInventoryCommand
        {
            OrderId = message.OrderId
        });
    }

    public async Task Handle(PaymentFailed message, IMessageHandlerContext context)
    {
        Data.Status = SagaStatus.Failed;

        // 补偿:取消订单
        await context.Send(new CancelOrderCommand
        {
            OrderId = message.OrderId,
            Reason = "支付失败"
        });

        MarkAsComplete();
    }

    public async Task Handle(InventoryReserved message, IMessageHandlerContext context)
    {
        Data.Status = SagaStatus.Completed;

        // 完成 saga
        await context.Publish(new OrderCompleted
        {
            OrderId = Data.OrderId
        });

        MarkAsComplete();
    }

    public async Task Handle(InventoryReservationFailed message, IMessageHandlerContext context)
    {
        Data.Status = SagaStatus.Failed;

        // 补偿:退款
        await context.Send(new RefundPaymentCommand
        {
            OrderId = Data.OrderId,
            PaymentId = Data.PaymentId
        });

        // 补偿:取消订单
        await context.Send(new CancelOrderCommand
        {
            OrderId = Data.OrderId,
            Reason = "库存不足"
        });

        MarkAsComplete();
    }
}

public class OrderSagaData : ContainSagaData
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public Guid? PaymentId { get; set; }
    public SagaStatus Status { get; set; }
}

补偿事务

补偿设计

补偿原则:

1. 语义撤销
   不总是精确反转
   例如:取消订单 vs 撤销创建订单

2. 幂等性
   可以安全调用多次
   无论重试次数,结果相同

3. 永不失败
   补偿必须最终成功
   使用退避重试

4. 有序
   按反向顺序补偿
   最后步骤先补偿,第一步骤最后补偿

补偿流:
步骤 1 ─► 步骤 2 ─► 步骤 3 ─► 失败
   │         │         │         │
   │         │         │         ▼
   │         │         └───► 补偿 3
   │         │                   │
   │         └───────────────► 补偿 2
   │                             │
   └─────────────────────────► 补偿 1

补偿示例

// 正向事务和补偿对
public class ReservationService
{
    // 正向:预留库存
    public async Task<ReservationId> ReserveAsync(OrderId orderId, List<Item> items)
    {
        var reservation = new Reservation(orderId, items);
        foreach (var item in items)
        {
            await _inventory.DecrementAsync(item.ProductId, item.Quantity);
        }
        await _repository.SaveAsync(reservation);
        return reservation.Id;
    }

    // 补偿:释放预留
    public async Task ReleaseAsync(ReservationId reservationId)
    {
        var reservation = await _repository.GetAsync(reservationId);
        if (reservation.Status == ReservationStatus.Released)
            return; // 幂等

        foreach (var item in reservation.Items)
        {
            await _inventory.IncrementAsync(item.ProductId, item.Quantity);
        }

        reservation.Release();
        await _repository.SaveAsync(reservation);
    }
}

错误处理

重试策略

重试模式:

1. 立即重试
   用于临时失败
   网络故障、超时

2. 指数退避
   增加延迟
   1秒 → 2秒 → 4秒 → 8秒

3. 断路器
   在阈值后停止重试
   允许恢复时间

4. 死信队列
   捕获失败消息
   手动干预

超时处理

// 带超时的 Saga
public class OrderSaga : Saga<OrderSagaData>
{
    public async Task Handle(OrderCreated message, IMessageHandlerContext context)
    {
        // 为支付设置超时
        await RequestTimeout<PaymentTimeout>(
            context,
            TimeSpan.FromMinutes(30));

        await context.Send(new ProcessPaymentCommand { ... });
    }

    public async Task Timeout(PaymentTimeout timeout, IMessageHandlerContext context)
    {
        if (Data.Status == SagaStatus.AwaitingPayment)
        {
            // 支付未在时间内完成
            await context.Send(new CancelOrderCommand
            {
                OrderId = Data.OrderId,
                Reason = "支付超时"
            });

            Data.Status = SagaStatus.TimedOut;
            MarkAsComplete();
        }
    }
}

Saga 设计模板

# Saga 设计: [流程名称]

## 概述
[此 saga 实现的功能]

## 触发器
[启动此 saga 的事件]

## 步骤

| 步骤 | 服务 | 动作 | 补偿动作 |
|------|---------|--------|---------------------|
| 1 | [服务] | [正向动作] | [补偿] |
| 2 | [服务] | [正向动作] | [补偿] |
| 3 | [服务] | [正向动作] | [补偿] |

## 流图

```text
[ASCII saga 流图]

失败场景

失败点 失败内容 补偿链
步骤 1 后 [描述] 补偿 1
步骤 2 后 [描述] 补偿 2 → 1

超时处理

  • 步骤 1 超时: [发生情况]
  • 步骤 2 超时: [发生情况]

幂等性

  • [如何处理重复]

监控

  • [监控内容]
  • [告警阈值]

选择编排 vs 编排

因素 编排 编排
耦合
可见性 分布式 集中化
复杂性 在事件中 在协调器中
调试 更难 更易
团队结构 独立团队 中心团队
失败处理 分布式 集中化
最佳适用 简单流 复杂流

工作流

设计 sagas 时:

  1. 识别边界:哪些服务参与?
  2. 定义步骤:正向动作按顺序
  3. 设计补偿:为每个步骤的反向动作
  4. 选择风格:编排或编排?
  5. 处理失败:超时、重试、死信
  6. 确保幂等性:所有动作可安全重复
  7. 计划监控:跟踪 saga 状态和失败
  8. 测试失败路径:验证补偿工作

用户界面

当用户直接调用时,此技能为分布式事务设计 saga 模式。

执行工作流

  1. 解析参数 - 提取事务描述、风格偏好(编排/编排/推荐)和参与服务列表。如果未提供事务,询问用户。
  2. 研究上下文 - 使用 MCP 服务器了解类似事务的 saga 模式。
  3. 分析事务 - 识别参与服务,定义正向事务步骤,设计补偿动作,计划错误处理。
  4. 推荐风格 - 如果未指定风格,基于流复杂性、团队结构、调试需求和耦合容忍度评估编排 vs 编排。
  5. 设计 Saga - 创建步骤定义、补偿、状态机(编排)或事件流(编排),以及失败场景。
  6. 生成输出 - 生成 saga 设计文档,包含流图、步骤详情表、C# 实现示例、错误处理策略和监控建议。

参考

详细指导:


最后更新: 2025-12-26