概念
CQRS (https://learn.microsoft.com/zh-cn/azure/architecture/patterns/cqrs)是一种与领域驱动设计和事件溯源相关的架构模式, 它的全称是Command Query Responsibility Segregation, 又叫命令查询职责分离, Greg Young在2010年创造了这个术语, 它是基于Bertrand Meyer 的 CQS (Command-Query Separation 命令查询分离原则) 设计模式。
CQRS认为不论业务多复杂在最终实现的时候, 无非是读写操作, 因此建议将应用程序分为两个方面, 即Command(命令)和Query(查询)
命令端:
关注各种业务如何处理, 更新状态进行持久化
不返回任何结果 (void)
查询端:
查询, 并从不修改数据库
CQRS的三种实现
单一数据库的CQRS

命令与读取操作的是同一个数据库, 命令端通过ORM框架将实体保存到数据库中, 查询端通过数据访问层获取数据 (数据访问层通过ORM框架或者存储过程获取数据)
双数据库的CQRS

命令与读取操作的是不同的数据库, 命令端通过ORM框架将实体保存到 写库 (Write Db), 并将本地改动推送到 读库 (Read Db), 查询端通过数据访问层访问 读库 (Read Db), 使用这种模式可以带来以下好处:
查询更简单
读操作不需要任何的完整性校验, 也不需要外键约束, 可以减少锁争用, 我们可以针对查询端单独优化, 还可以使用刚好包含每个模板需要的数据的数据库视图,使得查询变得更快更简单
提升查询端的使用体验
由于这种架构将读写彻底分离,由于一般系统是读操作远远大于写操作, 这给我们的系统带来了巨大的性能提升, 极大的提升了客户的使用体验
关注点分离
读写分离的模型可以使得关注点分离, 使得读模型会变得相对简单
事件溯源 (Event Sourcing) CQRS

通过事件溯源实现的CQRS中会将应用程序的改变都以事件的方式存储起来, 使用这种模式可以带来以下好处:
事件存储中了完整的审计跟踪, 后续出现问题时方便跟踪
可以在任何的时间点重建实体的状态, 它将有助于排查问题并修复问题
提升查询端的使用体验
查询端与命令端可以是完全不同的数据源, 查询端可以针对查询条件做针对应的优化, 或者使用像
ES、Redis等用来存储数据, 提升查询效率
独立缩放
命令端与查询端可以被独立缩放, 减少锁争用
当然事情有利自然也有弊, CQRS的使用固然会带来很多好处, 但同样它也会给项目带来复杂度的提升, 并且双数据库模式、事件溯源模式 (https://microservices.io/patterns/data/event-sourcing.html) 的CQRS, 使用的是最终一致性, 这些都是我们在选择技术方案时必须要考虑的
设计
上述文章中我们了解到了CQRS其本质上是一种读写分离的设计思想, 它并不是强制性的规定必须要怎样去做, 这点与之前的IEvent (进程内事件)、IIntegrationEvent (跨进程事件)不同, 它并不是强制性的, 根据CQRS的设计模式我们将事件分成Command、Query
由于Query (查询) 是需要有返回值的, 因此我们在继承IEvent的同时, 还额外增加了一个Result属性用以存储结果, 我们希望将查询的结果保存到Result中, 但它不是强制性的, 我们并没有强制性要求必须要将结果保存到Result中。
由于Command (命令) 是没有返回值的, 因此我们并没有额外新增Result属性, 我们认为命令会更新数据, 那就需要用到工作单元, 因此Command除了继承IEvent之外, 还继承了ITransaction,这方便了我们在Handler中的可以通过@event.UnitOfWork来管理工作单元, 而不需要通过构造函数来获取
但MASA Framework
(https://github.com/masastack/MASA.Framework) 并没有要求必须使用 Event Sourcing 模式
(https://microservices.io/patterns/data/event-sourcing.html) 或者 双数据库模式 的CQRS, 具体使用哪种实现, 它取决于业务的决策者
下面就就来看看MASA Framework提供的CQRS是如何使用的
入门
安装.NET 6.0 (https://dotnet.microsoft.com/zh-cn/download/dotnet/6.0)
1.新建ASP.NET Core 空项目Assignment.CqrsDemo,并安装Masa.Contrib.Dispatcher.Events,Masa.Contrib.Dispatcher.IntegrationEvents,Masa.Contrib.Dispatcher.IntegrationEvents.Dapr,Masa.Contrib.ReadWriteSplitting.Cqrs,Masa.Contrib.Development.DaprStarter.AspNetCore
dotnet new web -o Assignment.CqrsDemo
cd Assignment.CqrsDemodotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.9 //使用进程内事件总线
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents --version 0.7.0-preview.9 //使用跨进程事件总线
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.9 //使用Dapr提供pubsub能力
dotnet add package Masa.Contrib.ReadWriteSplitting.Cqrs --version 0.7.0-preview.9 //使用CQRSdotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.9 //开发环境下协助 Dapr Sidecar, 用于通过Dapr发布集成事件 2.注册跨进程事件总线、进程内事件总线, 修改类Program.cs
示例中未真实使用DB, 不再使用发件箱模式, 只需要使用集成事件提供的PubSub (https://docs.dapr.io/zh-hans/developing-applications/building-blocks/pubsub/pubsub-overview/)能力即可
builder.Services.AddIntegrationEventBus(dispatcherOptions =>
{dispatcherOptions.UseDapr();//使用 Dapr 提供的PubSub能力dispatcherOptions.UseEventBus();//使用进程内事件总线
});3.注册Dapr Starter协助管理Dapr Sidecar (开发环境使用)
if (builder.Environment.IsDevelopment())builder.Services.AddDaprStarter();4.新增加添加商品方法, 修改类Program.cs
app.MapPost("/goods/add", async (AddGoodsCommand command, IEventBus eventBus) =>
{await eventBus.PublishAsync(command);
});/// <summary>
/// 添加商品参数, 用于接受商品参数
/// </summary>
public record AddGoodsCommand : Command
{public string Name { get; set; }public string Cover { get; set; }public decimal Price { get; set; }public int Count { get; set; }
}5.新增加查询商品的方法, 修改类Program.cs
app.MapGet("/goods/{id}", async (Guid id, IEventBus eventBus) =>
{var query = new GoodsItemQuery(id);await eventBus.PublishAsync(query);return query.Result;
});/// <summary>
/// 用于接收查询商品信息参数
/// </summary>
public record GoodsItemQuery : Query<GoodsItemDto>
{public Guid Id { get; set; } = default!;public override GoodsItemDto Result { get; set; }public GoodsItemQuery(Guid id){Id = id;}
}/// <summary>
/// 用于返回商品信息
/// </summary>
public class GoodsItemDto
{public Guid Id { get; set; }public string Name { get; set; }public string Cover { get; set; }public decimal Price { get; set; }public int Count { get; set; }public DateTime DateTime { get; set; }
}6.新增Command处理程序, 添加类CommandHandler.cs
public class CommandHandler
{/// <summary>/// 将商品添加到Db,并发送跨进程事件/// </summary>/// <param name="command"></param>/// <param name="integrationEventBus"></param>[EventHandler]public async Task AddGoods(AddGoodsCommand command, IIntegrationEventBus integrationEventBus){//todo: 模拟添加商品到db并发送添加商品集成事件var goodsId = Guid.NewGuid(); //模拟添加到db后并获取商品idawait integrationEventBus.PublishAsync(new AddGoodsIntegrationEvent(goodsId, command.Name, command.Cover, command.Price,command.Count));}
}/// <summary>
/// 跨进程事件, 发送添加商品事件
/// </summary>
/// <param name="Id"></param>
/// <param name="Name"></param>
/// <param name="Cover"></param>
/// <param name="Price"></param>
/// <param name="Count"></param>
public record AddGoodsIntegrationEvent(Guid Id, string Name, string Cover, decimal Price, int Count) : IntegrationEvent
{public Guid Id { get; set; } = Id;public string Name { get; set; } = Name;public string Cover { get; set; } = Cover;public decimal Price { get; set; } = Price;public int Count { get; set; } = Count;public override string Topic { get; set; } = nameof(AddGoodsIntegrationEvent);
}7.新增Query处理程序, 添加类QueryHandler.cs
public class QueryHandler
{/// <summary>/// 从缓存查询商品信息/// </summary>/// <param name="query"></param>/// <returns></returns>[EventHandler]public Task GetGoods(GoodsItemQuery query){//todo: 模拟从cache获取商品var goods = new GoodsItemDto();query.Result = goods;return Task.CompletedTask;}
}8.新增添加商品的跨进程事件的处理服务, 修改Program.cs
app.MapPost("/integration/goods/add",[Topic("pubsub", nameof(AddGoodsIntegrationEvent))](AddGoodsIntegrationEvent @event, ILogger<Program> logger) =>{//todo: 模拟添加商品到缓存logger.LogInformation("添加商品到缓存, {Event}", @event);});// 使用 dapr 来订阅跨进程事件
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoint =>
{endpoint.MapSubscribeHandler();
});流水账式的服务会使得
Program.cs变得十分臃肿, 可以通过MASA Framework提供的MinimalAPIs来简化Program.cs点击查看详情(https://blogs.masastack.com/2022/07/12/masa/framework/practice/14.%20%E6%9C%80%E5%B0%8F%20API%20-%20MinimalAPIs)
我们上面的例子是通过事件总线来完成解耦以及数据模型的同步, 使用的双数据库模式, 但读库使用的是 缓存数据库, 在Command端做商品的添加操作, 在Query端只做查询, 且两端分别使用各自的数据源, 两者业务互不影响, 并且由于缓存数据库性能更强, 它将最大限度的提升性能, 使得我们有更好的使用体验。
在Masa Framework中仅仅是通过ICommand、IQuery将读写分开, 但这并没有硬性要求, 事实上你使用IEvent也是可以的, CQRS只是一种设计模式, 这点我们要清楚, 它只是告诉我们要按照一个什么样的标准去做, 但具体怎么来做, 取决于业务的决策者, 除此之外, 后续Masa Framework还会增加对Event Sourcing(事件溯源 (https://docs.microsoft.com/zh-cn/azure/architecture/patterns/event-sourcing))的支持, 通过事件重放, 允许我们随时重建到对象的任何状态
本章源码
Assignment15
https://github.com/zhenlei520/MasaFramework.Practice
CQRS架构项目:https://github.com/masalabs/MASA.EShop/tree/main/src/Services/Masa.EShop.Services.Catalog
参考
CQRS 模式
(https://learn.microsoft.com/zh-cn/azure/architecture/patterns/cqrs)
在微服务中应用简化后的 CQRS 和 DDD 模式
(https://learn.microsoft.com/zh-cn/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/apply-simplified-microservice-cqrs-ddd-patterns)
.NET现代化应用开发 - CQRS&类目管理代码剖析
(https://www.bilibili.com/video/BV1D24y1R7jE)
开源地址
MASA.Framework:https://github.com/masastack/MASA.Framework
如果你对我们的 MASA Framework 感兴趣, 无论是代码贡献、使用、提 Issue, 欢迎联系我们


《MASA Framework实战课程》已开课
点击“阅读原文”查看课程安排