name: event-sourcing-design description: 事件溯源模式和设计决策 allowed-tools: 读, 全局, 搜索, 写, 编辑
事件溯源设计技能
设计具有适当事件存储、投影和版本控制模式的事件溯源系统。
强制:文档优先方法
在设计事件溯源之前:
- 调用
docs-management技能 获取事件溯源模式 - 通过 MCP 服务器验证模式(如 perplexity, context7)
- 基于已建立的事件溯源文献提供指导
事件溯源基础
传统 vs 事件溯源:
传统(基于状态):
┌─────────────┐ ┌─────────────┐
│ 应用程序 │───►│ 数据库 │
│ │ │ (当前状态)│
└─────────────┘ └─────────────┘
事件溯源:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 应用程序 │───►│ 事件存储 │───►│ 投影 │
│ │ │ (所有事件)│ │ (读取视图)│
└─────────────┘ └─────────────┘ └─────────────┘
│
▼
[完整历史]
何时使用事件溯源
适用场景
事件溯源适合:
✓ 审计需求
- 需要完整历史
- 法规遵从
- 法律证据
✓ 复杂领域逻辑
- 业务规则演变
- 需要时间查询
- “假设”分析
✓ 高价值聚合
- 金融交易
- 医疗记录
- 法律文件
✓ 协作场景
- 冲突解决
- 合并能力
- 离线同步
✓ 事件驱动架构
- 微服务集成
- 异步处理
- 实时更新
不适用场景
事件溯源可能不适合:
✗ 简单 CRUD
- 基本数据录入
- 无审计需求
- 简单查询
✗ 频繁更新
- 高频小变化
- 实时流数据
- IoT 传感器数据
✗ 大型聚合
- 每个聚合许多事件
- 性能担忧
- 内存限制
✗ 临时查询
- 复杂报告
- 未知查询模式
- BI/分析重点
事件存储设计
事件结构
// C# 事件结构示例
public record DomainEvent
{
public required Guid EventId { get; init; }
public required string EventType { get; init; }
public required Guid AggregateId { get; init; }
public required string AggregateType { get; init; }
public required long Version { get; init; }
public required DateTimeOffset Timestamp { get; init; }
public required string Payload { get; init; } // JSON
public required string? Metadata { get; init; } // 关联, 因果
}
流组织
流策略:
按聚合(最常见):
流: "Order-{orderId}"
事件: OrderCreated, ItemAdded, OrderPaid, ...
按类别:
流: "$ce-Order"(类别投影)
所有订单的所有事件
按关联:
流: "Saga-{correlationId}"
一个工作流中跨聚合的事件
全局流:
流: "$all"
所有事件按顺序(用于投影)
事件存储模式
-- PostgreSQL 事件存储模式
CREATE TABLE events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
stream_position BIGINT NOT NULL,
global_position BIGSERIAL NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
metadata JSONB,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(stream_id, stream_position)
);
CREATE INDEX idx_events_stream ON events(stream_id, stream_position);
CREATE INDEX idx_events_global ON events(global_position);
CREATE INDEX idx_events_type ON events(event_type);
聚合设计
聚合结构
// C# 聚合示例
public abstract class Aggregate
{
public Guid Id { get; protected set; }
public long Version { get; protected set; } = -1;
private readonly List<object> _uncommittedEvents = new();
protected void Apply(object @event)
{
When(@event);
_uncommittedEvents.Add(@event);
}
protected abstract void When(object @event);
public void Load(IEnumerable<object> events)
{
foreach (var @event in events)
{
When(@event);
Version++;
}
}
public IReadOnlyList<object> GetUncommittedEvents()
=> _uncommittedEvents;
public void ClearUncommittedEvents()
=> _uncommittedEvents.Clear();
}
public class Order : Aggregate
{
private OrderStatus _status;
private List<OrderItem> _items = new();
public void Place(Guid customerId, List<OrderItem> items)
{
if (_status != OrderStatus.Draft)
throw new InvalidOperationException("订单已下");
Apply(new OrderPlaced(Id, customerId, items, DateTimeOffset.UtcNow));
}
protected override void When(object @event)
{
switch (@event)
{
case OrderPlaced e:
Id = e.OrderId;
_status = OrderStatus.Placed;
_items = e.Items.ToList();
break;
// 处理其他事件...
}
}
}
重水化模式
聚合重水化:
1. 加载流
从事件存储读取聚合的所有事件
2. 创建聚合
实例化空聚合
3. 应用事件
重放每个事件以重建状态
4. 执行命令
根据当前状态验证
生成新事件
5. 保存事件
将新事件附加到流
使用乐观并发
┌──────────┐ ┌─────────────┐ ┌──────────┐
│ 加载 │───►│ 重放 │───►│ 执行 │
│ 事件 │ │ 事件 │ │ 命令 │
└──────────┘ └─────────────┘ └─────┬────┘
│
┌───────────────────┘
▼
┌──────────────┐
│ 附加新事件 │
│ │
└──────────────┘
投影模式
投影类型
投影类别:
1. 实时投影
- 实时构建
- 订阅事件流
- 最终一致性
- 适合读取模型
2. 追赶投影
- 从历史重建
- 可随时运行
- 用于新读取模型
- 批处理
3. 快照投影
- 定期状态捕获
- 优化重水化
- 与事件结合
4. 内联投影
- 与写操作同一事务
- 强一致性
- 可扩展性有限
投影实现
// C# 投影示例
public class OrderSummaryProjection : IProjection
{
private readonly IOrderSummaryRepository _repository;
public async Task HandleAsync(OrderPlaced @event)
{
var summary = new OrderSummary
{
OrderId = @event.OrderId,
CustomerId = @event.CustomerId,
Status = "已下",
ItemCount = @event.Items.Count,
TotalAmount = @event.Items.Sum(i => i.Price * i.Quantity),
PlacedAt = @event.Timestamp
};
await _repository.InsertAsync(summary);
}
public async Task HandleAsync(OrderPaid @event)
{
await _repository.UpdateAsync(
@event.OrderId,
summary => summary.Status = "已支付");
}
}
快照
何时快照
快照决策:
快照当:
- 聚合有许多事件(100+)
- 重水化时间慢
- 读取性能重要
- 事件追加频繁
快照频率:
- 每 N 个事件(例如,100)
- 按时间间隔
- 在重大状态变化时
- 按需(懒)
不快照当:
- 聚合短期存在
- 每个聚合事件少
- 完整历史重放罕见
快照结构
// 快照记录
public record Snapshot
{
public required Guid AggregateId { get; init; }
public required string AggregateType { get; init; }
public required long Version { get; init; }
public required string State { get; init; } // 序列化
public required DateTimeOffset CreatedAt { get; init; }
}
// 加载带快照
public async Task<Order> LoadAsync(Guid orderId)
{
// 1. 尝试加载快照
var snapshot = await _snapshotStore.GetLatestAsync(orderId);
// 2. 从快照或空创建聚合
var order = snapshot != null
? Order.FromSnapshot(snapshot)
: new Order();
// 3. 加载快照版本后的事件
var events = await _eventStore.ReadAsync(
$"Order-{orderId}",
fromVersion: snapshot?.Version + 1 ?? 0);
// 4. 应用剩余事件
order.Load(events);
return order;
}
事件版本控制
版本控制策略
事件模式演进:
1. 弱模式(推荐)
- 添加新的可选字段
- 旧事件反序列化使用默认值
- 前向/后向兼容
2. 向上转换
- 将旧事件转换为新格式
- 在读取时,不在存储时
- 保持原始事件完整
3. 事件类型版本控制
- OrderPlacedV1, OrderPlacedV2
- 路由到适当处理器
- 更明确,更冗长
4. 复制和转换
- 迁移整个流
- 从旧事件创建新事件
- 一次性操作(有风险)
向上转换示例
// 向上转换模式
public interface IEventUpcaster
{
bool CanUpcast(string eventType, JsonDocument payload);
object Upcast(string eventType, JsonDocument payload);
}
public class OrderPlacedV1ToV2Upcaster : IEventUpcaster
{
public bool CanUpcast(string eventType, JsonDocument payload)
{
return eventType == "OrderPlaced"
&& !payload.RootElement.TryGetProperty("Currency", out _);
}
public object Upcast(string eventType, JsonDocument payload)
{
// 将 V1(无货币)转换为 V2(有货币)
return new OrderPlacedV2
{
OrderId = payload.GetProperty("OrderId").GetGuid(),
CustomerId = payload.GetProperty("CustomerId").GetGuid(),
Items = DeserializeItems(payload.GetProperty("Items")),
Currency = "USD", // V1 事件的默认值
Timestamp = payload.GetProperty("Timestamp").GetDateTimeOffset()
};
}
}
设计决策矩阵
| 因素 | 事件溯源 | 基于状态 |
|---|---|---|
| 审计追踪 | 内置,完整 | 需要单独日志 |
| 复杂性 | 初始较高 | 初始较低 |
| 查询灵活性 | 需要投影 | 直接查询 |
| 时间查询 | 原生支持 | 难以改造 |
| 存储 | 随事件增长 | 固定(当前状态) |
| 调试 | 事件重放 | 状态检查 |
| 团队经验 | 需要培训 | 熟悉模式 |
工作流程
设计事件溯源系统时:
- 评估适用性:事件溯源是否合适?
- 识别聚合:定义一致性边界
- 设计事件:命名、结构、版本控制策略
- 选择事件存储:EventStoreDB, Marten, 自定义
- 规划投影:所需读取模型
- 考虑快照:性能优化
- 版本策略:事件如何演进?
- 测试策略:基于事件的测试
参考
详细指导:
最后更新: 2025-12-26