CQRS架构设计与查询优化技能Skill cqrs-architecture

本技能提供CQRS(命令查询职责分离)架构的完整设计与实现指南,涵盖从基础模式到高级事件驱动系统的查询优化、命令处理和同步策略。适用于构建可扩展、高性能的微服务和分布式系统。关键词:CQRS,软件架构,查询优化,事件驱动,命令处理,读模型,微服务,可扩展系统,MediatR实现。

架构设计 0 次安装 0 次浏览 更新于 3/11/2026

名称: cqrs-architecture 描述: CQRS模式实现和查询优化 允许工具: 读取, 全局, 搜索, 写入, 编辑

CQRS架构技能

设计和实现命令查询职责分离模式,用于可扩展系统。

强制:文档优先方法

在实现CQRS之前:

  1. 调用docs-management技能获取CQRS模式文档
  2. 通过MCP服务器验证模式(如perplexity, context7)
  3. 基于已建立的CQRS文献提供指导

CQRS基础

传统 vs CQRS:

传统(单一模型):
┌─────────────────────────────────┐
│         应用程序               │
├─────────────────────────────────┤
│      领域模型                 │
│  (读取 + 写入)              │
├─────────────────────────────────┤
│         数据库                │
└─────────────────────────────────┘

CQRS(分离模型):
┌───────────────┐    ┌───────────────┐
│  命令端      │    │   查询端      │
│ (写入模型)  │    │ (读取模型)   │
├───────────────┤    ├───────────────┤
│  领域逻辑    │    │  DTO/视图     │
│  聚合        │    │  投影         │
├───────────────┤    ├───────────────┤
│  写入数据库  │───►│  读取数据库    │
└───────────────┘    └───────────────┘

CQRS级别

级别1:逻辑分离

相同数据库,分离代码路径:

┌─────────────────────────────────────┐
│           应用程序                 │
├──────────────────┬──────────────────┤
│ 命令处理器      │ 查询处理器      │
│ - 验证          │ - 直接SQL       │
│ - 领域逻辑      │ - 投影          │
│ - 事件          │ - DTOs          │
├──────────────────┴──────────────────┤
│           单一数据库              │
└─────────────────────────────────────┘

好处:
✓ 代码清晰分离
✓ 简单部署
✓ 单一数据源
✓ 良好的起点

级别2:分离读取模型

相同写入数据库,分离读取数据库:

┌─────────────────┐    ┌─────────────────┐
│ 命令端          │    │  查询端         │
├─────────────────┤    ├─────────────────┤
│ 命令处理器      │    │ 查询处理器      │
│ 领域模型        │    │ DTOs            │
├─────────────────┤    ├─────────────────┤
│ 写入数据库      │───►│ 读取数据库      │
│ (规范化)      │同步│ (反规范化)    │
└─────────────────┘    └─────────────────┘

好处:
✓ 优化读取性能
✓ 独立扩展读取
✓ 不同存储技术
✓ 最终一致读取

级别3:事件源CQRS

事件存储作为写入模型,投影作为读取:

┌─────────────────┐    ┌─────────────────┐
│ 命令端          │    │  查询端         │
├─────────────────┤    ├─────────────────┤
│ 命令处理器      │    │ 查询处理器      │
│ 聚合            │    │ 读取模型        │
├─────────────────┤    ├─────────────────┤
│ 事件存储        │───►│ 多读取数据库    │
│ (仅追加)      │    │                 │
└─────────────────┘    └─────────────────┘

好处:
✓ 完整审计跟踪
✓ 时间查询
✓ 多投影
✓ 重建读取模型

命令端设计

命令结构

// 命令定义
public record PlaceOrderCommand(
    Guid CustomerId,
    List<OrderItemDto> Items,
    string ShippingAddress
) : ICommand<OrderId>;

// 命令处理器
public class PlaceOrderHandler : ICommandHandler<PlaceOrderCommand, OrderId>
{
    private readonly IOrderRepository _repository;
    private readonly IEventPublisher _events;

    public async Task<OrderId> HandleAsync(
        PlaceOrderCommand command,
        CancellationToken ct)
    {
        // 验证
        if (!command.Items.Any())
            throw new ValidationException("订单必须有商品");

        // 领域逻辑
        var order = Order.Create(
            command.CustomerId,
            command.Items.Select(i => new OrderItem(i.ProductId, i.Quantity)));

        // 持久化
        await _repository.SaveAsync(order, ct);

        // 发布事件
        await _events.PublishAsync(order.GetDomainEvents(), ct);

        return order.Id;
    }
}

命令模式

命令最佳实践:

命名:
- 命令式:PlaceOrder, CancelOrder, UpdateAddress
- 包含上下文:不只是“Create”而是“CreateOrder”

结构:
- 不可变(记录)
- 仅操作所需数据
- 命令中无业务逻辑

验证:
- 处理器中输入验证
- 领域业务验证
- 返回有意义错误

幂等性:
- 包含幂等键
- 处理重复提交
- 重试返回相同结果

查询端设计

查询结构

// 查询定义
public record GetOrderByIdQuery(Guid OrderId) : IQuery<OrderDetailsDto>;

// 查询处理器
public class GetOrderByIdHandler : IQueryHandler<GetOrderByIdQuery, OrderDetailsDto>
{
    private readonly IReadDbContext _db;

    public async Task<OrderDetailsDto> HandleAsync(
        GetOrderByIdQuery query,
        CancellationToken ct)
    {
        var order = await _db.OrderDetails
            .Where(o => o.OrderId == query.OrderId)
            .Select(o => new OrderDetailsDto
            {
                OrderId = o.OrderId,
                CustomerName = o.Customer.Name,
                Items = o.Items.Select(i => new OrderItemDto
                {
                    ProductName = i.ProductName,
                    Quantity = i.Quantity,
                    Price = i.Price
                }).ToList(),
                Status = o.Status,
                TotalAmount = o.TotalAmount
            })
            .FirstOrDefaultAsync(ct);

        return order ?? throw new NotFoundException("订单未找到");
    }
}

读取模型优化

查询优化策略:

1. 反规范化
   - 预连接数据
   - 存储计算值
   - 扁平化层次

2. 物化视图
   - 数据库管理
   - 自动更新
   - 查询优化

3. 缓存
   - 热数据内存缓存
   - 共享分布式缓存
   - 事件触发失效

4. 专用存储
   - ElasticSearch用于搜索
   - Redis用于实时
   - ClickHouse用于分析

同步模式

事件驱动投影

// 事件驱动投影
public class OrderProjection : IEventHandler<OrderPlaced>, IEventHandler<OrderShipped>
{
    private readonly IOrderViewRepository _views;

    public async Task HandleAsync(OrderPlaced @event, CancellationToken ct)
    {
        var view = new OrderView
        {
            OrderId = @event.OrderId,
            CustomerId = @event.CustomerId,
            Status = "已放置",
            PlacedAt = @event.Timestamp,
            ItemCount = @event.Items.Count,
            TotalAmount = @event.TotalAmount
        };

        await _views.InsertAsync(view, ct);
    }

    public async Task HandleAsync(OrderShipped @event, CancellationToken ct)
    {
        await _views.UpdateAsync(@event.OrderId, view =>
        {
            view.Status = "已发货";
            view.ShippedAt = @event.Timestamp;
            view.TrackingNumber = @event.TrackingNumber;
        }, ct);
    }
}

一致性模式

一致性选项:

强一致性(同一事务):
┌──────────┐    ┌──────────┐
│ 命令     │───►│ 读取     │
│ 数据库   │    │ 模型更新 │
│          │    │          │
└──────────┴────┴──────────┘
     同一事务

最终一致性(异步):
┌──────────┐    ┌──────────┐    ┌──────────┐
│ 命令     │───►│ 消息     │───►│ 读取     │
│ 数据库   │    │ 队列     │    │ 模型     │
└──────────┘    └──────────┘    └──────────┘
     异步,最终一致

混合(读你写):
- 立即从命令端读取
- 其他最终一致
- 查询中版本检查

MediatR实现

MediatR设置

// 注册
services.AddMediatR(cfg =>
{
    cfg.RegisterServicesFromAssembly(typeof(Program).Assembly);
});

// 命令/查询接口
public interface ICommand<TResult> : IRequest<TResult> { }
public interface IQuery<TResult> : IRequest<TResult> { }

// 处理器接口
public interface ICommandHandler<TCommand, TResult>
    : IRequestHandler<TCommand, TResult>
    where TCommand : ICommand<TResult> { }

public interface IQueryHandler<TQuery, TResult>
    : IRequestHandler<TQuery, TResult>
    where TQuery : IQuery<TResult> { }

管道行为

// 验证行为
public class ValidationBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;

    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken ct)
    {
        var failures = _validators
            .Select(v => v.Validate(request))
            .SelectMany(r => r.Errors)
            .Where(f => f != null)
            .ToList();

        if (failures.Any())
            throw new ValidationException(failures);

        return await next();
    }
}

// 日志行为
public class LoggingBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;

    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken ct)
    {
        _logger.LogInformation("处理 {RequestType}", typeof(TRequest).Name);
        var response = await next();
        _logger.LogInformation("已处理 {RequestType}", typeof(TRequest).Name);
        return response;
    }
}

CQRS的API设计

REST API模式

[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
    private readonly IMediator _mediator;

    // 命令使用POST/PUT/DELETE
    [HttpPost]
    public async Task<ActionResult<OrderId>> PlaceOrder(
        [FromBody] PlaceOrderCommand command,
        CancellationToken ct)
    {
        var orderId = await _mediator.Send(command, ct);
        return CreatedAtAction(nameof(GetOrder), new { id = orderId }, orderId);
    }

    // 查询使用GET
    [HttpGet("{id}")]
    public async Task<ActionResult<OrderDetailsDto>> GetOrder(
        Guid id,
        CancellationToken ct)
    {
        var order = await _mediator.Send(new GetOrderByIdQuery(id), ct);
        return Ok(order);
    }

    [HttpGet]
    public async Task<ActionResult<PagedResult<OrderSummaryDto>>> ListOrders(
        [FromQuery] ListOrdersQuery query,
        CancellationToken ct)
    {
        var orders = await _mediator.Send(query, ct);
        return Ok(orders);
    }
}

何时使用CQRS

适用场景

CQRS适用场景:

✓ 复杂读写操作
  - 不同优化需求
  - 读写比例不平衡

✓ 数据多视图
  - 不同查询模式
  - 多UI需求

✓ 协作领域
  - 多并发用户
  - 复杂验证

✓ 事件驱动系统
  - 微服务
  - 异步处理

✓ 可扩展需求
  - 独立读写扩展
  - 性能优化

不适用场景

CQRS可能不适用:

✗ 简单CRUD应用
  - 开销不合理
  - 单一模型足够

✗ 小团队/项目
  - 增加复杂性
  - 维护负担

✗ 强一致需求
  - 实时需求
  - 金融交易

✗ 未知查询模式
  - 临时报告
  - BI需求

工作流

实现CQRS时:

  1. 评估适用性:CQRS是否适合此上下文?
  2. 选择级别:逻辑、物理或事件源?
  3. 设计命令:识别写入操作
  4. 设计查询:识别读取模式
  5. 计划同步:如何更新读取模型?
  6. 实现管道:验证、日志等
  7. 考虑一致性:需要什么保证?
  8. 测试两端:命令和查询测试

参考

详细指导:


最后更新: 2025-12-26