---
name: saga-orchestration
description: 实现 Saga 模式,用于分布式事务和跨聚合工作流。当需要协调多步骤业务流程、处理补偿事务或管理长时间运行的工作流时使用。
license: MIT
metadata:
  version: "1.0.0"
  domain: 架构
  triggers: saga模式、分布式事务、补偿事务、长运行工作流、编排
  role: 架构师
  scope: 系统设计
  output-format: 代码
  related-skills: 事件溯源架构、CQRS实现、微服务模式
---
Saga 编排
管理分布式事务和长运行业务流程的模式。
以下情况不要使用此技能
- 任务与saga编排无关
- 需要此范围外的不同领域或工具
指令
- 澄清目标、约束和所需输入。
- 应用相关最佳实践并验证结果。
- 提供可操作步骤和验证。
- 如果需要详细示例,请打开 `resources/implementation-playbook.md`。
以下情况使用此技能
- 协调多服务事务
- 实现补偿事务
- 管理长运行业务工作流
- 处理分布式系统中的故障
- 构建订单履行流程
- 实现审批工作流
核心概念
1. Saga 类型
```
编排式(Choreography)                协调式(Orchestration)
┌─────┐  ┌─────┐  ┌─────┐           ┌─────────────┐
│服务A│─►│服务B│─►│服务C│           │   协调器    │
└─────┘  └─────┘  └─────┘           └──────┬──────┘
   │        │        │                     │
   ▼        ▼        ▼               ┌─────┼─────┐
  事件     事件     事件             ▼     ▼     ▼
                                  ┌────┐┌────┐┌────┐
                                  │服务1││服务2││服务3│
                                  └────┘└────┘└────┘
```
2. Saga 执行状态
| 状态 | 描述 |
|---|---|
| 已开始 | Saga 已初始化 |
| 待处理 | 等待步骤完成 |
| 补偿中 | 由于失败而回滚 |
| 已完成 | 所有步骤成功 |
| 失败 | Saga 在补偿后失败 |
模板
模板 1: Saga 协调器基础
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid
class SagaState(Enum):
    """Lifecycle states of a saga instance."""

    # Assigned when the saga record is first created.
    STARTED = "started"
    # A step finished and the next one has been requested.
    PENDING = "pending"
    # A step failed; completed steps are being rolled back.
    COMPENSATING = "compensating"
    # Every step finished successfully.
    COMPLETED = "completed"
    # Terminal state reached once compensation has finished.
    FAILED = "failed"
@dataclass
class SagaStep:
    """One unit of work in a saga, paired with the command that undoes it.

    ``action`` and ``compensation`` are the message names published to run
    and to roll back the step. ``status`` is a plain string; the values
    written by the orchestrators in this file are ``"pending"``,
    ``"executing"``, ``"completed"``, ``"failed"``, ``"compensating"`` and
    ``"compensated"``.
    """

    name: str
    action: str
    compensation: str
    status: str = "pending"
    # Payload reported by the participant on success; handed back to the
    # compensation command as ``original_result``.
    result: Optional[Dict] = None
    error: Optional[str] = None
    executed_at: Optional[datetime] = None
    compensated_at: Optional[datetime] = None
    # Deadline written by timeout-aware orchestrators. Declared as a real
    # field (rather than an ad-hoc attribute) so it appears in repr/asdict
    # and survives field-based persistence. Kept last with a default so
    # existing positional construction is unaffected.
    timeout_at: Optional[datetime] = None
@dataclass
class Saga:
    """Persistent record of one saga run: its input data, steps and progress."""

    saga_id: str
    saga_type: str
    state: SagaState
    # Business payload supplied at start; merged into every published message.
    data: Dict[str, Any]
    steps: List[SagaStep]
    # Index into ``steps`` of the step currently being executed.
    current_step: int = 0
    # Naive UTC timestamps (``datetime.utcnow``), evaluated per instance.
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
class SagaOrchestrator(ABC):
    """Base class for orchestrated sagas.

    Subclasses provide ``saga_type`` and ``define_steps``. The orchestrator
    persists the saga through ``saga_store`` (awaited ``save(saga)`` and
    ``get(saga_id)``) and drives participants by awaiting
    ``event_publisher.publish(name, payload)``. Participants report back
    through the ``handle_*`` coroutines.

    The handlers are tolerant of duplicate or late messages: a completion or
    failure report is only acted on when it refers to the step that is
    currently in flight on a saga that is still running.
    """

    # Saga states in which a step may still report completion or failure.
    _RUNNING_STATES = (SagaState.STARTED, SagaState.PENDING)
    # Step statuses that require no further compensation work.
    _SETTLED_STATUSES = ("compensated", "pending", "failed")

    def __init__(self, saga_store, event_publisher):
        self.saga_store = saga_store
        self.event_publisher = event_publisher

    @abstractmethod
    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Return the ordered steps for a saga started with *data*."""

    @property
    @abstractmethod
    def saga_type(self) -> str:
        """Unique identifier for this kind of saga."""

    async def start(self, data: Dict) -> Saga:
        """Create, persist and kick off a new saga; return the saga record."""
        saga = Saga(
            saga_id=str(uuid.uuid4()),
            saga_type=self.saga_type,
            state=SagaState.STARTED,
            data=data,
            steps=self.define_steps(data),
        )
        await self.saga_store.save(saga)
        await self._execute_next_step(saga)
        return saga

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
        """Record a successful step and advance (or complete) the saga.

        Reports that do not match the in-flight step of a running saga are
        ignored, so redelivered or late messages cannot advance the saga twice.
        """
        saga = await self.saga_store.get(saga_id)
        step = self._active_step(saga, step_name)
        if step is None:
            return

        now = datetime.utcnow()
        step.status = "completed"
        step.result = result
        step.executed_at = now
        saga.current_step += 1
        saga.updated_at = now

        if saga.current_step >= len(saga.steps):
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
            return

        saga.state = SagaState.PENDING
        await self.saga_store.save(saga)
        await self._execute_next_step(saga)

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
        """Record a failed step and start compensating the completed ones.

        Ignored unless it refers to the in-flight step of a running saga, so
        a duplicate failure report cannot trigger compensation twice.
        """
        saga = await self.saga_store.get(saga_id)
        step = self._active_step(saga, step_name)
        if step is None:
            return

        step.status = "failed"
        step.error = error
        saga.state = SagaState.COMPENSATING
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)

        await self._compensate(saga)

    async def _execute_next_step(self, saga: Saga):
        """Mark the current step as executing and publish its action command."""
        if saga.current_step >= len(saga.steps):
            return
        step = saga.steps[saga.current_step]
        step.status = "executing"
        await self.saga_store.save(saga)
        await self.event_publisher.publish(
            step.action,
            self._message(saga, step_name=step.name),
        )

    async def _compensate(self, saga: Saga):
        """Publish compensation commands for completed steps, newest first.

        When no step had completed (for example the very first step failed)
        there is nothing to wait for, so the saga is failed right away;
        otherwise it would remain in COMPENSATING indefinitely because no
        compensation callback would ever arrive.
        """
        dispatched = False
        for step in reversed(saga.steps[:saga.current_step]):
            if step.status != "completed":
                continue
            dispatched = True
            step.status = "compensating"
            await self.saga_store.save(saga)
            await self.event_publisher.publish(
                step.compensation,
                self._message(
                    saga,
                    step_name=step.name,
                    original_result=step.result,
                ),
            )
        if not dispatched:
            await self._mark_failed(saga)

    async def handle_compensation_completed(self, saga_id: str, step_name: str):
        """Record a finished compensation; fail the saga once all are done."""
        saga = await self.saga_store.get(saga_id)
        if saga.state is not SagaState.COMPENSATING:
            # Already finalized (or never compensating): a redelivered
            # callback must not publish the failure event a second time.
            return

        now = datetime.utcnow()
        for step in saga.steps:
            if step.name == step_name:
                step.status = "compensated"
                step.compensated_at = now
                break
        saga.updated_at = now

        if any(s.status not in self._SETTLED_STATUSES for s in saga.steps):
            # Other compensations are still outstanding.
            await self.saga_store.save(saga)
            return
        await self._mark_failed(saga)

    async def _on_saga_completed(self, saga: Saga):
        """Publish the ``<saga_type>Completed`` event."""
        await self.event_publisher.publish(
            f"{self.saga_type}Completed",
            self._message(saga),
        )

    async def _on_saga_failed(self, saga: Saga):
        """Publish the ``<saga_type>Failed`` event after compensation."""
        await self.event_publisher.publish(
            f"{self.saga_type}Failed",
            self._message(saga, error="Saga失败"),
        )

    async def _mark_failed(self, saga: Saga):
        """Persist the terminal FAILED state, then announce it."""
        saga.state = SagaState.FAILED
        saga.updated_at = datetime.utcnow()
        # Save before publishing (same order as the completion path) so a
        # consumer reacting to the event never reads a stale saga record.
        await self.saga_store.save(saga)
        await self._on_saga_failed(saga)

    def _active_step(self, saga: Saga, step_name: str) -> Optional[SagaStep]:
        """Return the in-flight step if *step_name* names it, else ``None``."""
        if saga.state not in self._RUNNING_STATES:
            return None
        if saga.current_step >= len(saga.steps):
            return None
        step = saga.steps[saga.current_step]
        return step if step.name == step_name else None

    @staticmethod
    def _message(saga: Saga, **routing: Any) -> Dict[str, Any]:
        """Build an outgoing payload from the saga data plus routing fields.

        Business data is spread first so that ``saga_id`` and the routing
        fields always win if the data happens to contain the same keys.
        """
        return {**saga.data, "saga_id": saga.saga_id, **routing}
模板 2: 订单履行Saga
class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfillment across inventory, payment, shipping
    and notification services."""

    @property
    def saga_type(self) -> str:
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Return the four fulfillment steps in execution order."""
        # (step name, action command, compensation command)
        plan = (
            (
                "reserve_inventory",
                "InventoryService.ReserveItems",
                "InventoryService.ReleaseReservation",
            ),
            (
                "process_payment",
                "PaymentService.ProcessPayment",
                "PaymentService.RefundPayment",
            ),
            (
                "create_shipment",
                "ShippingService.CreateShipment",
                "ShippingService.CancelShipment",
            ),
            (
                "send_confirmation",
                "NotificationService.SendOrderConfirmation",
                "NotificationService.SendCancellationNotice",
            ),
        )
        return [
            SagaStep(name=step_name, action=action, compensation=undo)
            for step_name, action, undo in plan
        ]
# 使用
async def create_order(order_data: Dict):
    """Start an order-fulfillment saga from *order_data* and return it.

    Only the fields listed below are copied into the saga data; a missing
    one raises ``KeyError``.
    """
    # NOTE(review): saga_store and event_publisher are expected to exist at
    # module level; they are not defined in this snippet.
    orchestrator = OrderFulfillmentSaga(saga_store, event_publisher)
    wanted = (
        "order_id",
        "customer_id",
        "items",
        "payment_method",
        "shipping_address",
    )
    payload = {key: order_data[key] for key in wanted}
    return await orchestrator.start(payload)
# 每个服务中的事件处理程序
class InventoryService:
    """Saga participant that reserves inventory and releases it on rollback.

    Relies on ``self.event_publisher``, ``self.reserve`` and
    ``self.release_reservation``, which are provided outside this snippet.
    """

    async def handle_reserve_items(self, command: Dict):
        """Reserve the ordered items and report the outcome to the saga."""
        # Echo the step name the orchestrator sent so the reply still matches
        # if the saga definition renames the step; fall back to the
        # historical literal for commands that do not carry one.
        step_name = command.get("step_name", "reserve_inventory")
        try:
            # Only the reservation itself can signal insufficient stock, so
            # it is the only call kept inside the try block.
            reservation = await self.reserve(
                command["items"],
                command["order_id"],
            )
        except InsufficientInventoryError as exc:
            await self.event_publisher.publish(
                "SagaStepFailed",
                {
                    "saga_id": command["saga_id"],
                    "step_name": step_name,
                    "error": str(exc),
                },
            )
            return

        await self.event_publisher.publish(
            "SagaStepCompleted",
            {
                "saga_id": command["saga_id"],
                "step_name": step_name,
                "result": {"reservation_id": reservation.id},
            },
        )

    async def handle_release_reservation(self, command: Dict):
        """Compensation: release the reservation made by the original step."""
        await self.release_reservation(
            command["original_result"]["reservation_id"]
        )
        await self.event_publisher.publish(
            "SagaCompensationCompleted",
            {
                "saga_id": command["saga_id"],
                "step_name": command.get("step_name", "reserve_inventory"),
            },
        )
模板 3: 基于事件编排(Choreography)的Saga
from dataclasses import dataclass
from typing import Dict, Any
import asyncio
@dataclass
class SagaContext:
    """Context record meant to travel with choreography saga events.

    NOTE(review): none of the handlers visible in this file reference it.
    """

    saga_id: str
    # Numeric position within the saga.
    step: int
    data: Dict[str, Any]
    # Steps finished so far; required, no default.
    completed_steps: list
class OrderChoreographySaga:
    """Event-driven (choreography) order saga.

    Subscribes to order lifecycle events on ``event_bus`` and reacts to each
    one by publishing the next command, or compensating commands when a
    step reports failure. ``event_bus`` must offer ``subscribe(name,
    handler)`` and an awaitable ``publish(name, payload)``.
    """

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        """Subscribe each handler to the event that triggers it."""
        subscriptions = (
            ("OrderCreated", self._on_order_created),
            ("InventoryReserved", self._on_inventory_reserved),
            ("PaymentProcessed", self._on_payment_processed),
            ("ShipmentCreated", self._on_shipment_created),
            # Compensation triggers
            ("PaymentFailed", self._on_payment_failed),
            ("ShipmentFailed", self._on_shipment_failed),
        )
        for event_name, handler in subscriptions:
            self.event_bus.subscribe(event_name, handler)

    async def _on_order_created(self, event: Dict):
        """Step 1: order created, so request an inventory reservation."""
        # The order id doubles as the saga id for the rest of the flow.
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],
            "order_id": event["order_id"],
            "items": event["items"],
        })

    async def _on_inventory_reserved(self, event: Dict):
        """Step 2: inventory reserved, so request payment."""
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"],
        })

    async def _on_payment_processed(self, event: Dict):
        """Step 3: payment done, so request shipment creation."""
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
        })

    async def _on_shipment_created(self, event: Dict):
        """Step 4: shipment created, so announce the order as fulfilled."""
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"],
        })

    # Compensation handlers

    async def _on_payment_failed(self, event: Dict):
        """Payment failed: release the reservation and fail the order."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "支付失败",
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed: refund, release the reservation, fail the order."""
        # NOTE(review): the CreateShipment command published above carries no
        # reservation_id, so the ShipmentFailed event must obtain it some
        # other way; confirm the shipping service's event schema.
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"],
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        # Mirror the payment-failure path: without this terminal event a
        # shipment failure would roll back silently and the order would
        # never be reported as failed.
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "发货失败",
        })
模板 4: 带超时的Saga
class TimeoutSagaOrchestrator(SagaOrchestrator):
    """Saga orchestrator that fails a step if it does not finish in time.

    ``scheduler`` must offer an awaitable
    ``schedule(job_id, callback, payload, run_at=...)``. ``step_timeout`` is
    an optional ``datetime.timedelta`` applied to every step; it defaults to
    five minutes.
    """

    def __init__(self, saga_store, event_publisher, scheduler, step_timeout=None):
        # Imported locally because the file's import block only brings in
        # ``datetime`` from the datetime module.
        from datetime import timedelta

        super().__init__(saga_store, event_publisher)
        self.scheduler = scheduler
        self.step_timeout = (
            timedelta(minutes=5) if step_timeout is None else step_timeout
        )

    async def _execute_next_step(self, saga: Saga):
        """Start the current step and schedule a timeout check for it."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        step.timeout_at = datetime.utcnow() + self.step_timeout
        await self.saga_store.save(saga)

        # Schedule the check before publishing the command so a deadline is
        # already in place by the time the participant starts working.
        await self.scheduler.schedule(
            f"saga_timeout_{saga.saga_id}_{step.name}",
            self._check_timeout,
            {"saga_id": saga.saga_id, "step_name": step.name},
            run_at=step.timeout_at,
        )

        await self.event_publisher.publish(
            step.action,
            # Saga data is spread first so it cannot overwrite routing keys.
            {**saga.data, "saga_id": saga.saga_id, "step_name": step.name},
        )

    async def _check_timeout(self, data: Dict):
        """Fail the step named in *data* if it is still executing."""
        saga = await self.saga_store.get(data["saga_id"])
        # A default is required here: a bare next() would raise
        # StopIteration, which a coroutine turns into RuntimeError.
        step = next(
            (s for s in saga.steps if s.name == data["step_name"]),
            None,
        )
        if step is None or step.status != "executing":
            # Unknown step, or it already finished or failed: nothing to do.
            return
        await self.handle_step_failed(
            data["saga_id"],
            data["step_name"],
            "步骤超时",
        )
最佳实践
要做的事
- 使步骤幂等 - 安全可重试
- 仔细设计补偿 - 它们必须工作
- 使用关联ID - 用于跨服务跟踪
- 实现超时 - 不要无限等待
- 记录一切 - 用于调试故障
不要做的事
- 不要假设即时完成 - Saga需要时间
- 不要跳过补偿测试 - 最关键部分
- 不要耦合服务 - 使用异步消息
- 不要忽略部分失败 - 优雅处理