Saga编排Skill saga-orchestration

Saga编排技能用于实现和管理Saga模式,协调分布式事务和长运行业务流程。它涉及处理多服务事务、补偿事务、工作流编排,适用于微服务架构中的故障处理和业务协调。关键词:Saga模式、分布式事务、补偿事务、工作流编排、微服务架构、系统设计、长运行工作流。

架构设计 0 次安装 0 次浏览 更新于 3/22/2026

name: saga-orchestration
description: 实现saga模式,用于分布式事务和跨聚合工作流。当协调多步骤业务流程、处理补偿事务或管理长时间运行的工作流时使用。
license: MIT
metadata:
  version: "1.0.0"
  domain: 架构
  triggers: saga模式、分布式事务、补偿事务、长运行工作流、编排
  role: 架构师
  scope: 系统设计
  output-format: 代码
  related-skills: 事件溯源架构、CQRS实现、微服务模式

Saga 编排

用于管理分布式事务和长时间运行业务流程的模式。

何时不应使用此技能

  • 任务与saga编排无关
  • 任务需要此范围之外的其他领域知识或工具

指令

  • 澄清目标、约束和所需输入。
  • 应用相关最佳实践并验证结果。
  • 提供可操作步骤和验证。
  • 如果需要详细示例,请打开 resources/implementation-playbook.md

何时使用此技能

  • 协调多服务事务
  • 实现补偿事务
  • 管理长运行业务工作流
  • 处理分布式系统中的故障
  • 构建订单履行流程
  • 实现审批工作流

核心概念

1. Saga 类型

协同式 (Choreography)         编排式 (Orchestration)
┌─────┐  ┌─────┐  ┌─────┐     ┌─────────────┐
│服务A│─►│服务B│─►│服务C│     │ 协调器      │
└─────┘  └─────┘  └─────┘     └──────┬──────┘
   │        │        │               │
   ▼        ▼        ▼         ┌─────┼─────┐
 事件     事件     事件       ▼     ▼     ▼
                            ┌────┐┌────┐┌────┐
                            │服务1││服务2││服务3│
                            └────┘└────┘└────┘

2. Saga 执行状态

状态 描述
已开始 Saga 已初始化
待处理 等待步骤完成
补偿中 由于失败而回滚
已完成 所有步骤成功
失败 Saga 在补偿后失败

模板

模板 1: Saga 协调器基础

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid

class SagaState(Enum):
    """Lifecycle states a saga instance can be in."""
    # The saga has been initialised.
    STARTED = "started"
    # Waiting for a step to finish.
    PENDING = "pending"
    # Rolling back because a step failed.
    COMPENSATING = "compensating"
    # Every step succeeded.
    COMPLETED = "completed"
    # The saga failed after compensation.
    FAILED = "failed"


@dataclass
class SagaStep:
    """One step of a saga: a forward action plus the command that undoes it.

    ``action`` and ``compensation`` are the message names the orchestrator
    publishes to run, respectively roll back, the step. ``status`` is a free
    string; the orchestrator in this file writes "pending", "executing",
    "completed", "failed", "compensating" and "compensated".
    """

    name: str
    action: str
    compensation: str
    status: str = "pending"
    # Payload reported by the participant when the step completed.
    result: Optional[Dict] = None
    # Error text reported when the step failed.
    error: Optional[str] = None
    executed_at: Optional[datetime] = None
    compensated_at: Optional[datetime] = None
    # Deadline written by timeout-aware orchestrators. Declared as a real field
    # (instead of an ad-hoc attribute) so it takes part in repr/eq/asdict and
    # can be persisted with the rest of the step.
    timeout_at: Optional[datetime] = None


@dataclass
class Saga:
    """Persisted state of one saga instance: its input data, steps and progress."""
    # Unique identifier of this saga instance.
    saga_id: str
    # Type name of the saga definition this instance belongs to.
    saga_type: str
    # Current lifecycle state.
    state: SagaState
    # Business payload; merged into every command/event published for the saga.
    data: Dict[str, Any]
    # Steps in execution order.
    steps: List[SagaStep]
    # Index into ``steps`` of the step that runs next / is in flight.
    current_step: int = 0
    # Naive UTC timestamps (datetime.utcnow), evaluated per instance.
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)


class SagaOrchestrator(ABC):
    """Base class for orchestrated sagas.

    Subclasses declare the ordered steps (``define_steps``) and a type name
    (``saga_type``). The orchestrator publishes one command per step through
    ``event_publisher``, persists progress through ``saga_store`` and expects
    participants to report back through ``handle_step_completed``,
    ``handle_step_failed`` and ``handle_compensation_completed``.

    Reports that do not match the step currently in flight, or that arrive
    when the saga is no longer in the matching state, are ignored. This keeps
    duplicate or late messages from advancing or compensating a saga twice.

    NOTE(review): ``saga_store`` is assumed to offer awaitable ``save(saga)``
    and ``get(saga_id)`` and ``event_publisher`` an awaitable
    ``publish(name, payload)``; those are the only calls made here.
    Timestamps use ``datetime.utcnow()`` (naive UTC, deprecated since Python
    3.12) to stay comparable with the defaults of ``Saga``.
    """

    def __init__(self, saga_store, event_publisher):
        self.saga_store = saga_store
        self.event_publisher = event_publisher

    @abstractmethod
    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Return the saga's steps in execution order."""

    @property
    @abstractmethod
    def saga_type(self) -> str:
        """Unique identifier of this saga type."""

    async def start(self, data: Dict) -> Saga:
        """Create, persist and kick off a new saga.

        Args:
            data: Business payload stored on the saga and merged into every
                published message.

        Returns:
            The newly created saga (its first step has been dispatched).
        """
        saga = Saga(
            saga_id=str(uuid.uuid4()),
            saga_type=self.saga_type,
            state=SagaState.STARTED,
            data=data,
            steps=self.define_steps(data),
        )
        if not saga.steps:
            # Nothing to run: finish right away instead of leaving the saga
            # in STARTED with no event that could ever move it forward.
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
            return saga

        await self.saga_store.save(saga)
        await self._execute_next_step(saga)
        return saga

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
        """Record a successful step and move on to the next one.

        Ignored unless ``step_name`` is the step currently in flight of a saga
        that is still running forward.
        """
        saga = await self.saga_store.get(saga_id)
        step = self._in_flight_step(saga, step_name)
        if step is None:
            return

        now = datetime.utcnow()
        step.status = "completed"
        step.result = result
        step.executed_at = now

        saga.current_step += 1
        saga.updated_at = now

        if saga.current_step >= len(saga.steps):
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
        else:
            saga.state = SagaState.PENDING
            await self.saga_store.save(saga)
            await self._execute_next_step(saga)

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
        """Record a failed step and start compensating the completed ones.

        Ignored unless ``step_name`` is the step currently in flight of a saga
        that is still running forward, so a repeated failure report does not
        trigger compensation twice.
        """
        saga = await self.saga_store.get(saga_id)
        step = self._in_flight_step(saga, step_name)
        if step is None:
            return

        step.status = "failed"
        step.error = error

        saga.state = SagaState.COMPENSATING
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)

        await self._compensate(saga)

    async def _execute_next_step(self, saga: Saga):
        """Mark the current step as executing and publish its action command."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        await self.saga_store.save(saga)

        await self.event_publisher.publish(
            step.action,
            self._build_payload(saga, step_name=step.name),
        )

    async def _compensate(self, saga: Saga):
        """Publish compensation commands for completed steps, newest first.

        When no step needs compensating (for example the very first step
        failed) no completion report will ever arrive, so the saga is
        finalised as failed immediately.
        """
        dispatched = False
        for step in reversed(saga.steps[:saga.current_step]):
            if step.status != "completed":
                continue
            step.status = "compensating"
            await self.saga_store.save(saga)
            await self.event_publisher.publish(
                step.compensation,
                self._build_payload(
                    saga,
                    step_name=step.name,
                    original_result=step.result,
                ),
            )
            dispatched = True

        if not dispatched:
            await self._finish_failed(saga)

    async def handle_compensation_completed(self, saga_id: str, step_name: str):
        """Record a finished compensation; fail the saga once all are done.

        Ignored when the saga is not currently compensating, so a duplicate
        report cannot publish the failure event a second time.
        """
        saga = await self.saga_store.get(saga_id)
        if saga is None or saga.state != SagaState.COMPENSATING:
            return

        now = datetime.utcnow()
        for step in saga.steps:
            if step.name == step_name:
                step.status = "compensated"
                step.compensated_at = now
                break

        # Steps that never completed ("pending"/"failed") need no rollback.
        settled = ("compensated", "pending", "failed")
        if all(s.status in settled for s in saga.steps):
            await self._finish_failed(saga)
        else:
            saga.updated_at = now
            await self.saga_store.save(saga)

    async def _finish_failed(self, saga: Saga):
        """Persist the FAILED state, then announce it (same order as success)."""
        saga.state = SagaState.FAILED
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)
        await self._on_saga_failed(saga)

    async def _on_saga_completed(self, saga: Saga):
        """Publish ``<saga_type>Completed`` once every step has succeeded."""
        await self.event_publisher.publish(
            f"{self.saga_type}Completed",
            self._build_payload(saga),
        )

    async def _on_saga_failed(self, saga: Saga):
        """Publish ``<saga_type>Failed`` once compensation has finished."""
        await self.event_publisher.publish(
            f"{self.saga_type}Failed",
            self._build_payload(saga, error="Saga失败"),
        )

    @staticmethod
    def _in_flight_step(saga, step_name: str):
        """Return the current step if the saga is running forward and the name matches."""
        if saga is None:
            return None
        if saga.state not in (SagaState.STARTED, SagaState.PENDING):
            return None
        if saga.current_step >= len(saga.steps):
            return None
        step = saga.steps[saga.current_step]
        return step if step.name == step_name else None

    @staticmethod
    def _build_payload(saga, **control) -> Dict[str, Any]:
        """Merge saga data with control keys; control keys always win.

        The control keys are spread twice on purpose: the first spread fixes
        their position at the front of the dict, the last one makes sure a
        same-named key in ``saga.data`` cannot overwrite the correlation id.
        """
        reserved = {"saga_id": saga.saga_id, **control}
        return {**reserved, **saga.data, **reserved}

模板 2: 订单履行Saga

class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfilment across several services."""

    @property
    def saga_type(self) -> str:
        """Type name used for this saga and its completion/failure events."""
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Build the four fulfilment steps in execution order.

        ``data`` is accepted for interface compatibility; the step list does
        not depend on it.
        """
        # (step name, forward command, compensating command)
        plan = (
            (
                "reserve_inventory",
                "InventoryService.ReserveItems",
                "InventoryService.ReleaseReservation",
            ),
            (
                "process_payment",
                "PaymentService.ProcessPayment",
                "PaymentService.RefundPayment",
            ),
            (
                "create_shipment",
                "ShippingService.CreateShipment",
                "ShippingService.CancelShipment",
            ),
            (
                "send_confirmation",
                "NotificationService.SendOrderConfirmation",
                "NotificationService.SendCancellationNotice",
            ),
        )
        return [
            SagaStep(name=step_name, action=forward, compensation=undo)
            for step_name, forward, undo in plan
        ]


# Usage
async def create_order(order_data: Dict):
    """Start an order-fulfilment saga for ``order_data`` and return what ``start`` returns.

    Only the five keys listed below are forwarded; a missing one raises
    ``KeyError``. ``saga_store`` and ``event_publisher`` are expected to exist
    in the enclosing scope (they are not defined in this snippet).
    """
    orchestrator = OrderFulfillmentSaga(saga_store, event_publisher)
    forwarded_keys = (
        "order_id",
        "customer_id",
        "items",
        "payment_method",
        "shipping_address",
    )
    payload = {key: order_data[key] for key in forwarded_keys}
    return await orchestrator.start(payload)


# Event handlers living inside each participating service
class InventoryService:
    """Inventory-side saga participant: reserves stock and undoes the reservation.

    Both handlers report back under the ``step_name`` carried by the incoming
    command, falling back to ``"reserve_inventory"`` when the command has
    none, so the service does not have to mirror the orchestrator's naming.

    NOTE(review): ``reserve``, ``release_reservation``, ``event_publisher``
    and ``InsufficientInventoryError`` are not defined in this snippet; they
    are assumed to be supplied by the real service.
    """

    _DEFAULT_STEP_NAME = "reserve_inventory"

    def _step_name(self, command: Dict) -> str:
        """Return the step name to echo back for ``command``."""
        return command.get("step_name", self._DEFAULT_STEP_NAME)

    async def handle_reserve_items(self, command: Dict):
        """Reserve the ordered items and report success or failure to the saga.

        Publishes ``SagaStepCompleted`` with the reservation id, or
        ``SagaStepFailed`` when the reservation raises
        ``InsufficientInventoryError``. Other exceptions propagate.
        """
        step_name = self._step_name(command)
        try:
            # Only the reservation itself can signal insufficient stock.
            reservation = await self.reserve(
                command["items"],
                command["order_id"],
            )
        except InsufficientInventoryError as e:
            await self.event_publisher.publish(
                "SagaStepFailed",
                {
                    "saga_id": command["saga_id"],
                    "step_name": step_name,
                    "error": str(e),
                },
            )
        else:
            await self.event_publisher.publish(
                "SagaStepCompleted",
                {
                    "saga_id": command["saga_id"],
                    "step_name": step_name,
                    "result": {"reservation_id": reservation.id},
                },
            )

    async def handle_release_reservation(self, command: Dict):
        """Compensation: release the reservation made by the forward step.

        Reads the reservation id from ``command["original_result"]`` and then
        publishes ``SagaCompensationCompleted``.
        """
        await self.release_reservation(
            command["original_result"]["reservation_id"]
        )
        await self.event_publisher.publish(
            "SagaCompensationCompleted",
            {
                "saga_id": command["saga_id"],
                "step_name": self._step_name(command),
            },
        )

模板 3: 基于协同(Choreography)的Saga

from dataclasses import dataclass
from typing import Dict, Any
import asyncio

@dataclass
class SagaContext:
    """Context object meant to travel with choreographed saga events.

    NOTE(review): none of the handlers visible in this file construct or read
    it; the field meanings below are inferred from the names - confirm.
    """
    # Correlation id of the saga.
    saga_id: str
    # Presumably the position of the current step - TODO confirm.
    step: int
    # Business payload.
    data: Dict[str, Any]
    # Presumably the steps finished so far; element type is not specified.
    completed_steps: list


class OrderChoreographySaga:
    """Choreography-style order saga driven purely by events.

    Each handler reacts to one event on ``event_bus`` and publishes the
    command or event for the next stage; failure handlers publish the
    compensating commands.

    NOTE(review): ``event_bus`` is assumed to offer ``subscribe(name, handler)``
    and an awaitable ``publish(name, payload)``; those are the only calls
    made here.
    """

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        """Subscribe every forward and compensation handler on the bus."""
        # Forward path
        self.event_bus.subscribe("OrderCreated", self._on_order_created)
        self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)

        # Compensation path
        self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        """Step 1: order created -> request the inventory reservation.

        The order id doubles as the saga id for all following messages.
        """
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],
            "order_id": event["order_id"],
            "items": event["items"],
        })

    async def _on_inventory_reserved(self, event: Dict):
        """Step 2: inventory reserved -> request the payment."""
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"],
        })

    async def _on_payment_processed(self, event: Dict):
        """Step 3: payment done -> request the shipment."""
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
        })

    async def _on_shipment_created(self, event: Dict):
        """Step 4: shipment created -> announce the fulfilled order."""
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"],
        })

    # Compensation handlers
    async def _on_payment_failed(self, event: Dict):
        """Payment failed -> release the inventory and announce the failed order."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "支付失败",
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed -> refund the payment and release the inventory.

        NOTE(review): unlike the payment-failure path, no ``OrderFailed``
        event is published here; confirm whether that is intended.
        """
        # Both dict literals were previously closed with ")" instead of "})",
        # which made the whole class a SyntaxError.
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"],
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })

模板 4: 带超时的Saga

class TimeoutSagaOrchestrator(SagaOrchestrator):
    """Saga orchestrator that fails a step when it does not finish in time.

    Every dispatched step gets a deadline and a scheduled check; when the
    check fires and the step is still "executing", the step is reported as
    failed, which starts compensation.

    NOTE(review): ``scheduler`` is assumed to offer an awaitable
    ``schedule(job_id, callback, payload, run_at=...)``; that is the only call
    made on it here.
    """

    def __init__(self, saga_store, event_publisher, scheduler,
                 step_timeout_minutes: float = 5):
        """
        Args:
            saga_store: Persistence for saga state.
            event_publisher: Publisher for step commands.
            scheduler: Used to schedule the per-step timeout check.
            step_timeout_minutes: Minutes a step may stay in flight before it
                is treated as failed. Defaults to the previous fixed value, 5.
        """
        super().__init__(saga_store, event_publisher)
        self.scheduler = scheduler
        self.step_timeout_minutes = step_timeout_minutes

    async def _execute_next_step(self, saga: Saga):
        """Dispatch the current step and schedule its timeout check."""
        # Imported here because the file's import block only brings in
        # ``datetime``; without this the deadline computation is a NameError.
        from datetime import timedelta

        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        step.timeout_at = datetime.utcnow() + timedelta(
            minutes=self.step_timeout_minutes
        )
        await self.saga_store.save(saga)

        # Schedule the check before publishing so a lost command still times out.
        await self.scheduler.schedule(
            f"saga_timeout_{saga.saga_id}_{step.name}",
            self._check_timeout,
            {"saga_id": saga.saga_id, "step_name": step.name},
            run_at=step.timeout_at,
        )

        # Control keys are spread last as well so a same-named key in
        # ``saga.data`` cannot overwrite the correlation id or step name.
        control = {"saga_id": saga.saga_id, "step_name": step.name}
        await self.event_publisher.publish(
            step.action,
            {**control, **saga.data, **control},
        )

    async def _check_timeout(self, data: Dict):
        """Fail the step named in ``data`` if it is still executing.

        Does nothing when the saga or the step can no longer be found, or
        when the step already left the "executing" status.
        """
        saga = await self.saga_store.get(data["saga_id"])
        if saga is None:
            return

        # A default is required: a bare next() would raise StopIteration,
        # which surfaces as RuntimeError inside a coroutine.
        step = next(
            (s for s in saga.steps if s.name == data["step_name"]),
            None,
        )
        if step is None or step.status != "executing":
            return

        await self.handle_step_failed(
            data["saga_id"],
            data["step_name"],
            "步骤超时",
        )

最佳实践

要做的事

  • 使步骤幂等 - 安全可重试
  • 仔细设计补偿 - 补偿操作必须可靠生效
  • 使用关联ID - 用于跨服务跟踪
  • 实现超时 - 不要无限等待
  • 记录一切 - 用于调试故障

不要做的事

  • 不要假设即时完成 - Saga需要时间
  • 不要跳过补偿测试 - 最关键部分
  • 不要耦合服务 - 使用异步消息
  • 不要忽略部分失败 - 优雅处理

资源