事件溯源设计Skill event-sourcing-design

该技能用于设计事件溯源系统,包括事件存储、投影和版本控制模式,适用于需要完整审计追踪、复杂业务逻辑和事件驱动架构的软件开发场景。关键词:事件溯源、软件架构、微服务、审计追踪、事件驱动、数据持久化。

架构设计 0 次安装 0 次浏览 更新于 3/11/2026

name: event-sourcing-design description: 事件溯源模式和设计决策 allowed-tools: 读, 全局, 搜索, 写, 编辑

事件溯源设计技能

设计具有适当事件存储、投影和版本控制模式的事件溯源系统。

强制:文档优先方法

在设计事件溯源之前:

  1. 调用 docs-management 技能 获取事件溯源模式
  2. 通过 MCP 服务器验证模式(如 perplexity, context7)
  3. 基于已建立的事件溯源文献提供指导

事件溯源基础

传统 vs 事件溯源:

传统(基于状态):
┌─────────────┐    ┌─────────────┐
│ 应用程序   │───►│ 数据库     │
│             │    │ (当前状态)│
└─────────────┘    └─────────────┘

事件溯源:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ 应用程序   │───►│ 事件存储   │───►│ 投影       │
│             │    │ (所有事件)│    │ (读取视图)│
└─────────────┘    └─────────────┘    └─────────────┘
                         │
                         ▼
                   [完整历史]

何时使用事件溯源

适用场景

事件溯源适合:

✓ 审计需求
  - 需要完整历史
  - 法规遵从
  - 法律证据

✓ 复杂领域逻辑
  - 业务规则演变
  - 需要时间查询
  - “假设”分析

✓ 高价值聚合
  - 金融交易
  - 医疗记录
  - 法律文件

✓ 协作场景
  - 冲突解决
  - 合并能力
  - 离线同步

✓ 事件驱动架构
  - 微服务集成
  - 异步处理
  - 实时更新

不适用场景

事件溯源可能不适合:

✗ 简单 CRUD
  - 基本数据录入
  - 无审计需求
  - 简单查询

✗ 频繁更新
  - 高频小变化
  - 实时流数据
  - IoT 传感器数据

✗ 大型聚合
  - 每个聚合许多事件
  - 性能担忧
  - 内存限制

✗ 临时查询
  - 复杂报告
  - 未知查询模式
  - BI/分析重点

事件存储设计

事件结构

// C# 事件结构示例
public record DomainEvent
{
    public required Guid EventId { get; init; }
    public required string EventType { get; init; }
    public required Guid AggregateId { get; init; }
    public required string AggregateType { get; init; }
    public required long Version { get; init; }
    public required DateTimeOffset Timestamp { get; init; }
    public required string Payload { get; init; }  // JSON
    public required string? Metadata { get; init; } // 关联, 因果
}

流组织

流策略:

按聚合(最常见):
流: "Order-{orderId}"
事件: OrderCreated, ItemAdded, OrderPaid, ...

按类别:
流: "$ce-Order"(类别投影)
所有订单的所有事件

按关联:
流: "Saga-{correlationId}"
一个工作流中跨聚合的事件

全局流:
流: "$all"
所有事件按顺序(用于投影)

事件存储模式

-- PostgreSQL 事件存储模式
CREATE TABLE events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_position BIGINT NOT NULL,
    global_position BIGSERIAL NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE(stream_id, stream_position)
);

CREATE INDEX idx_events_stream ON events(stream_id, stream_position);
CREATE INDEX idx_events_global ON events(global_position);
CREATE INDEX idx_events_type ON events(event_type);

聚合设计

聚合结构

// C# 聚合示例
public abstract class Aggregate
{
    public Guid Id { get; protected set; }
    public long Version { get; protected set; } = -1;

    private readonly List<object> _uncommittedEvents = new();

    protected void Apply(object @event)
    {
        When(@event);
        _uncommittedEvents.Add(@event);
    }

    protected abstract void When(object @event);

    public void Load(IEnumerable<object> events)
    {
        foreach (var @event in events)
        {
            When(@event);
            Version++;
        }
    }

    public IReadOnlyList<object> GetUncommittedEvents()
        => _uncommittedEvents;

    public void ClearUncommittedEvents()
        => _uncommittedEvents.Clear();
}

public class Order : Aggregate
{
    private OrderStatus _status;
    private List<OrderItem> _items = new();

    public void Place(Guid customerId, List<OrderItem> items)
    {
        if (_status != OrderStatus.Draft)
            throw new InvalidOperationException("订单已下");

        Apply(new OrderPlaced(Id, customerId, items, DateTimeOffset.UtcNow));
    }

    protected override void When(object @event)
    {
        switch (@event)
        {
            case OrderPlaced e:
                Id = e.OrderId;
                _status = OrderStatus.Placed;
                _items = e.Items.ToList();
                break;
            // 处理其他事件...
        }
    }
}

重水化模式

聚合重水化:

1. 加载流
   从事件存储读取聚合的所有事件

2. 创建聚合
   实例化空聚合

3. 应用事件
   重放每个事件以重建状态

4. 执行命令
   根据当前状态验证
   生成新事件

5. 保存事件
   将新事件附加到流
   使用乐观并发

┌──────────┐    ┌─────────────┐    ┌──────────┐
│ 加载     │───►│ 重放       │───►│ 执行     │
│ 事件     │    │ 事件       │    │ 命令     │
└──────────┘    └─────────────┘    └─────┬────┘
                                         │
                     ┌───────────────────┘
                     ▼
            ┌──────────────┐
            │ 附加新事件   │
            │              │
            └──────────────┘

投影模式

投影类型

投影类别:

1. 实时投影
   - 实时构建
   - 订阅事件流
   - 最终一致性
   - 适合读取模型

2. 追赶投影
   - 从历史重建
   - 可随时运行
   - 用于新读取模型
   - 批处理

3. 快照投影
   - 定期状态捕获
   - 优化重水化
   - 与事件结合

4. 内联投影
   - 与写操作同一事务
   - 强一致性
   - 可扩展性有限

投影实现

// C# 投影示例
public class OrderSummaryProjection : IProjection
{
    private readonly IOrderSummaryRepository _repository;

    public async Task HandleAsync(OrderPlaced @event)
    {
        var summary = new OrderSummary
        {
            OrderId = @event.OrderId,
            CustomerId = @event.CustomerId,
            Status = "已下",
            ItemCount = @event.Items.Count,
            TotalAmount = @event.Items.Sum(i => i.Price * i.Quantity),
            PlacedAt = @event.Timestamp
        };

        await _repository.InsertAsync(summary);
    }

    public async Task HandleAsync(OrderPaid @event)
    {
        await _repository.UpdateAsync(
            @event.OrderId,
            summary => summary.Status = "已支付");
    }
}

快照

何时快照

快照决策:

快照当:
- 聚合有许多事件(100+)
- 重水化时间慢
- 读取性能重要
- 事件追加频繁

快照频率:
- 每 N 个事件(例如,100)
- 按时间间隔
- 在重大状态变化时
- 按需(懒)

不快照当:
- 聚合短期存在
- 每个聚合事件少
- 完整历史重放罕见

快照结构

// 快照记录
public record Snapshot
{
    public required Guid AggregateId { get; init; }
    public required string AggregateType { get; init; }
    public required long Version { get; init; }
    public required string State { get; init; }  // 序列化
    public required DateTimeOffset CreatedAt { get; init; }
}

// 加载带快照
public async Task<Order> LoadAsync(Guid orderId)
{
    // 1. 尝试加载快照
    var snapshot = await _snapshotStore.GetLatestAsync(orderId);

    // 2. 从快照或空创建聚合
    var order = snapshot != null
        ? Order.FromSnapshot(snapshot)
        : new Order();

    // 3. 加载快照版本后的事件
    var events = await _eventStore.ReadAsync(
        $"Order-{orderId}",
        fromVersion: snapshot?.Version + 1 ?? 0);

    // 4. 应用剩余事件
    order.Load(events);

    return order;
}

事件版本控制

版本控制策略

事件模式演进:

1. 弱模式(推荐)
   - 添加新的可选字段
   - 旧事件反序列化使用默认值
   - 前向/后向兼容

2. 向上转换
   - 将旧事件转换为新格式
   - 在读取时,不在存储时
   - 保持原始事件完整

3. 事件类型版本控制
   - OrderPlacedV1, OrderPlacedV2
   - 路由到适当处理器
   - 更明确,更冗长

4. 复制和转换
   - 迁移整个流
   - 从旧事件创建新事件
   - 一次性操作(有风险)

向上转换示例

// 向上转换模式
public interface IEventUpcaster
{
    bool CanUpcast(string eventType, JsonDocument payload);
    object Upcast(string eventType, JsonDocument payload);
}

public class OrderPlacedV1ToV2Upcaster : IEventUpcaster
{
    public bool CanUpcast(string eventType, JsonDocument payload)
    {
        return eventType == "OrderPlaced"
            && !payload.RootElement.TryGetProperty("Currency", out _);
    }

    public object Upcast(string eventType, JsonDocument payload)
    {
        // 将 V1(无货币)转换为 V2(有货币)
        return new OrderPlacedV2
        {
            OrderId = payload.GetProperty("OrderId").GetGuid(),
            CustomerId = payload.GetProperty("CustomerId").GetGuid(),
            Items = DeserializeItems(payload.GetProperty("Items")),
            Currency = "USD",  // V1 事件的默认值
            Timestamp = payload.GetProperty("Timestamp").GetDateTimeOffset()
        };
    }
}

设计决策矩阵

因素 事件溯源 基于状态
审计追踪 内置,完整 需要单独日志
复杂性 初始较高 初始较低
查询灵活性 需要投影 直接查询
时间查询 原生支持 难以改造
存储 随事件增长 固定(当前状态)
调试 事件重放 状态检查
团队经验 需要培训 熟悉模式

工作流程

设计事件溯源系统时:

  1. 评估适用性:事件溯源是否合适?
  2. 识别聚合:定义一致性边界
  3. 设计事件:命名、结构、版本控制策略
  4. 选择事件存储:EventStoreDB, Marten, 自定义
  5. 规划投影:所需读取模型
  6. 考虑快照:性能优化
  7. 版本策略:事件如何演进?
  8. 测试策略:基于事件的测试

参考

详细指导:


最后更新: 2025-12-26