名称: cqrs-architecture 描述: CQRS模式实现和查询优化 允许工具: 读取, 全局, 搜索, 写入, 编辑
CQRS架构技能
设计和实现命令查询职责分离模式,用于可扩展系统。
强制:文档优先方法
在实现CQRS之前:
- 调用
docs-management技能获取CQRS模式文档 - 通过MCP服务器验证模式(如perplexity, context7)
- 基于已建立的CQRS文献提供指导
CQRS基础
传统 vs CQRS:
传统(单一模型):
┌─────────────────────────────────┐
│ 应用程序 │
├─────────────────────────────────┤
│ 领域模型 │
│ (读取 + 写入) │
├─────────────────────────────────┤
│ 数据库 │
└─────────────────────────────────┘
CQRS(分离模型):
┌───────────────┐ ┌───────────────┐
│ 命令端 │ │ 查询端 │
│ (写入模型) │ │ (读取模型) │
├───────────────┤ ├───────────────┤
│ 领域逻辑 │ │ DTO/视图 │
│ 聚合 │ │ 投影 │
├───────────────┤ ├───────────────┤
│ 写入数据库 │───►│ 读取数据库 │
└───────────────┘ └───────────────┘
CQRS级别
级别1:逻辑分离
相同数据库,分离代码路径:
┌─────────────────────────────────────┐
│ 应用程序 │
├──────────────────┬──────────────────┤
│ 命令处理器 │ 查询处理器 │
│ - 验证 │ - 直接SQL │
│ - 领域逻辑 │ - 投影 │
│ - 事件 │ - DTOs │
├──────────────────┴──────────────────┤
│ 单一数据库 │
└─────────────────────────────────────┘
好处:
✓ 代码清晰分离
✓ 简单部署
✓ 单一数据源
✓ 良好的起点
级别2:分离读取模型
相同写入数据库,分离读取数据库:
┌─────────────────┐ ┌─────────────────┐
│ 命令端 │ │ 查询端 │
├─────────────────┤ ├─────────────────┤
│ 命令处理器 │ │ 查询处理器 │
│ 领域模型 │ │ DTOs │
├─────────────────┤ ├─────────────────┤
│ 写入数据库 │───►│ 读取数据库 │
│ (规范化) │同步│ (反规范化) │
└─────────────────┘ └─────────────────┘
好处:
✓ 优化读取性能
✓ 独立扩展读取
✓ 不同存储技术
✓ 最终一致读取
级别3:事件源CQRS
事件存储作为写入模型,投影作为读取:
┌─────────────────┐ ┌─────────────────┐
│ 命令端 │ │ 查询端 │
├─────────────────┤ ├─────────────────┤
│ 命令处理器 │ │ 查询处理器 │
│ 聚合 │ │ 读取模型 │
├─────────────────┤ ├─────────────────┤
│ 事件存储 │───►│ 多读取数据库 │
│ (仅追加) │ │ │
└─────────────────┘ └─────────────────┘
好处:
✓ 完整审计跟踪
✓ 时间查询
✓ 多投影
✓ 重建读取模型
命令端设计
命令结构
// 命令定义
public record PlaceOrderCommand(
Guid CustomerId,
List<OrderItemDto> Items,
string ShippingAddress
) : ICommand<OrderId>;
// 命令处理器
public class PlaceOrderHandler : ICommandHandler<PlaceOrderCommand, OrderId>
{
private readonly IOrderRepository _repository;
private readonly IEventPublisher _events;
public async Task<OrderId> HandleAsync(
PlaceOrderCommand command,
CancellationToken ct)
{
// 验证
if (!command.Items.Any())
throw new ValidationException("订单必须有商品");
// 领域逻辑
var order = Order.Create(
command.CustomerId,
command.Items.Select(i => new OrderItem(i.ProductId, i.Quantity)));
// 持久化
await _repository.SaveAsync(order, ct);
// 发布事件
await _events.PublishAsync(order.GetDomainEvents(), ct);
return order.Id;
}
}
命令模式
命令最佳实践:
命名:
- 命令式:PlaceOrder, CancelOrder, UpdateAddress
- 包含上下文:不只是“Create”而是“CreateOrder”
结构:
- 不可变(记录)
- 仅操作所需数据
- 命令中无业务逻辑
验证:
- 处理器中输入验证
- 领域业务验证
- 返回有意义错误
幂等性:
- 包含幂等键
- 处理重复提交
- 重试返回相同结果
查询端设计
查询结构
// 查询定义
public record GetOrderByIdQuery(Guid OrderId) : IQuery<OrderDetailsDto>;
// 查询处理器
public class GetOrderByIdHandler : IQueryHandler<GetOrderByIdQuery, OrderDetailsDto>
{
private readonly IReadDbContext _db;
public async Task<OrderDetailsDto> HandleAsync(
GetOrderByIdQuery query,
CancellationToken ct)
{
var order = await _db.OrderDetails
.Where(o => o.OrderId == query.OrderId)
.Select(o => new OrderDetailsDto
{
OrderId = o.OrderId,
CustomerName = o.Customer.Name,
Items = o.Items.Select(i => new OrderItemDto
{
ProductName = i.ProductName,
Quantity = i.Quantity,
Price = i.Price
}).ToList(),
Status = o.Status,
TotalAmount = o.TotalAmount
})
.FirstOrDefaultAsync(ct);
return order ?? throw new NotFoundException("订单未找到");
}
}
读取模型优化
查询优化策略:
1. 反规范化
- 预连接数据
- 存储计算值
- 扁平化层次
2. 物化视图
- 数据库管理
- 自动更新
- 查询优化
3. 缓存
- 热数据内存缓存
- 共享分布式缓存
- 事件触发失效
4. 专用存储
- ElasticSearch用于搜索
- Redis用于实时
- ClickHouse用于分析
同步模式
事件驱动投影
// 事件驱动投影
public class OrderProjection : IEventHandler<OrderPlaced>, IEventHandler<OrderShipped>
{
private readonly IOrderViewRepository _views;
public async Task HandleAsync(OrderPlaced @event, CancellationToken ct)
{
var view = new OrderView
{
OrderId = @event.OrderId,
CustomerId = @event.CustomerId,
Status = "已放置",
PlacedAt = @event.Timestamp,
ItemCount = @event.Items.Count,
TotalAmount = @event.TotalAmount
};
await _views.InsertAsync(view, ct);
}
public async Task HandleAsync(OrderShipped @event, CancellationToken ct)
{
await _views.UpdateAsync(@event.OrderId, view =>
{
view.Status = "已发货";
view.ShippedAt = @event.Timestamp;
view.TrackingNumber = @event.TrackingNumber;
}, ct);
}
}
一致性模式
一致性选项:
强一致性(同一事务):
┌──────────┐ ┌──────────┐
│ 命令 │───►│ 读取 │
│ 数据库 │ │ 模型更新 │
│ │ │ │
└──────────┴────┴──────────┘
同一事务
最终一致性(异步):
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 命令 │───►│ 消息 │───►│ 读取 │
│ 数据库 │ │ 队列 │ │ 模型 │
└──────────┘ └──────────┘ └──────────┘
异步,最终一致
混合(读你写):
- 立即从命令端读取
- 其他最终一致
- 查询中版本检查
MediatR实现
MediatR设置
// 注册
services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly);
});
// 命令/查询接口
public interface ICommand<TResult> : IRequest<TResult> { }
public interface IQuery<TResult> : IRequest<TResult> { }
// 处理器接口
public interface ICommandHandler<TCommand, TResult>
: IRequestHandler<TCommand, TResult>
where TCommand : ICommand<TResult> { }
public interface IQueryHandler<TQuery, TResult>
: IRequestHandler<TQuery, TResult>
where TQuery : IQuery<TResult> { }
管道行为
// 验证行为
public class ValidationBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly IEnumerable<IValidator<TRequest>> _validators;
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
var failures = _validators
.Select(v => v.Validate(request))
.SelectMany(r => r.Errors)
.Where(f => f != null)
.ToList();
if (failures.Any())
throw new ValidationException(failures);
return await next();
}
}
// 日志行为
public class LoggingBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
_logger.LogInformation("处理 {RequestType}", typeof(TRequest).Name);
var response = await next();
_logger.LogInformation("已处理 {RequestType}", typeof(TRequest).Name);
return response;
}
}
CQRS的API设计
REST API模式
[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
private readonly IMediator _mediator;
// 命令使用POST/PUT/DELETE
[HttpPost]
public async Task<ActionResult<OrderId>> PlaceOrder(
[FromBody] PlaceOrderCommand command,
CancellationToken ct)
{
var orderId = await _mediator.Send(command, ct);
return CreatedAtAction(nameof(GetOrder), new { id = orderId }, orderId);
}
// 查询使用GET
[HttpGet("{id}")]
public async Task<ActionResult<OrderDetailsDto>> GetOrder(
Guid id,
CancellationToken ct)
{
var order = await _mediator.Send(new GetOrderByIdQuery(id), ct);
return Ok(order);
}
[HttpGet]
public async Task<ActionResult<PagedResult<OrderSummaryDto>>> ListOrders(
[FromQuery] ListOrdersQuery query,
CancellationToken ct)
{
var orders = await _mediator.Send(query, ct);
return Ok(orders);
}
}
何时使用CQRS
适用场景
CQRS适用场景:
✓ 复杂读写操作
- 不同优化需求
- 读写比例不平衡
✓ 数据多视图
- 不同查询模式
- 多UI需求
✓ 协作领域
- 多并发用户
- 复杂验证
✓ 事件驱动系统
- 微服务
- 异步处理
✓ 可扩展需求
- 独立读写扩展
- 性能优化
不适用场景
CQRS可能不适用:
✗ 简单CRUD应用
- 开销不合理
- 单一模型足够
✗ 小团队/项目
- 增加复杂性
- 维护负担
✗ 强一致需求
- 实时需求
- 金融交易
✗ 未知查询模式
- 临时报告
- BI需求
工作流
实现CQRS时:
- 评估适用性:CQRS是否适合此上下文?
- 选择级别:逻辑、物理或事件源?
- 设计命令:识别写入操作
- 设计查询:识别读取模式
- 计划同步:如何更新读取模型?
- 实现管道:验证、日志等
- 考虑一致性:需要什么保证?
- 测试两端:命令和查询测试
参考
详细指导:
最后更新: 2025-12-26