探索cqrs和事件源_实践中的事件源和CQRS

探索cqrs和事件源

任何尝试实施完全符合ACID的系统的人都知道,您需要做很多事情。 您需要确保可以自由创建,修改和删除数据库实体而不会出错,并且在大多数情况下,解决方案将以性能为代价。 可以用来解决此问题的一种方法是根据一系列事件而不是可变状态来设计系统。 这通常称为事件源。

在本文中,我将展示一个演示应用程序,该应用程序使用开源工具包Speedment快速启动并运行可扩展的基于事件的数据库应用程序。 示例的完整源代码在此处 。

什么是事件源?

在典型的关系数据库系统中,您将实体的状态存储为数据库中的一行。 状态改变时,应用程序使用UPDATE或DELETE语句修改行。 这种方法的问题在于,当要确保没有更改任何行以致使系统处于非法状态时,它将对数据库增加很多要求。 您不希望任何人提取比他们帐户中更多的钱或对已经结束的拍卖出价。

在事件源系统中,我们对此采取了不同的方法。 无需将实体的状态存储在数据库中,而是存储导致该状态的一系列更改 。 事件一旦创建便是不可变的,这意味着您仅需实现两个操作CREATE和READ。 如果实体被更新或删除,则可以通过创建“更新”或“删除”事件来实现。

事件源系统可以轻松扩展规模以提高性能,因为任何节点都可以简单地下载事件日志并重播当前状态。 由于写入和查询由不同的机器处理,因此您还可以获得更好的性能。 这称为CQRS(命令查询职责隔离)。 正如您将在示例中看到的,使用Speedment工具包,我们可以在极短的时间内获得最终一致的实例化视图并开始运行。

可预订的桑拿

为了展示构建事件源系统的工作流程,我们将创建一个小型应用程序来处理住宅区中共享桑拿的预订。 我们有多个租户有兴趣预订桑拿房,但我们需要确保害羞的租户永远不会意外预订它。 我们还希望在同一系统中支持多个桑拿浴室。

为了简化与数据库的通信,我们将使用Speedment工具箱 。 Speedment是一个Java工具,它使我们能够从数据库生成完整的域模型,并且还可以使用优化的Java 8流轻松查询数据库。 在Apache 2-license下可以使用Speedment ,在Github页面上有很多很好的例子说明了不同的用法。

步骤1:定义数据库架构

第一步是定义我们的(MySQL)数据库。 我们只是有一张称为“预订”的表,用于存储与预订桑拿有关的事件。 请注意,预订是事件而不是实体。 如果我们要取消预订或对其进行更改,则必须将具有更改的其他事件发布为新行。 我们不允许修改或删除已发布的行。

CREATE DATABASE `sauna`;CREATE TABLE `sauna`.`booking` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,`booking_id` BIGINT NOT NULL,`event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL,`tenant` INT NULL,`sauna` INT NULL,`booked_from` DATE NULL,`booked_to` DATE NULL,PRIMARY KEY (`id`)
);

“ id”列是一个递增的整数,每次将新事件发布到日志时都会自动分配。 “ booking_id”告诉我们我们指的是哪个预订。 如果两个事件共享相同的预订ID,则它们引用相同的实体。 我们还有一个名为“ event_type”的枚举,它描述了我们试图执行的操作。 之后是属于预订的信息。 如果列为NULL,则与任何先前值相比,我们将其视为未修改的。

步骤2:使用加速生成代码

下一步是使用Speedment为项目生成代码。 只需创建一个新的maven项目并将以下代码添加到pom.xml文件即可。

pom.xml

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><speedment.version>3.0.0-EA2</speedment.version><mysql.version>5.1.39</mysql.version>
</properties><build><plugins><plugin><groupId>com.speedment</groupId><artifactId>speedment-maven-plugin</artifactId><version>${speedment.version}</version><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency></dependencies></plugin></plugins>
</build><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.speedment</groupId><artifactId>runtime</artifactId><version>${speedment.version}</version><type>pom</type></dependency>
</dependencies>

如果生成项目,则IDE 中将出现一个新的maven目标,称为speedment:tool 。 运行它以启动Speedment用户界面。 在其中,连接到Sauna数据库并使用默认设置生成代码。 现在应在项目中填充源文件。

提示:如果对数据库进行了更改,则可以使用speedment:reload -goal下载新配置,并使用speedment:generate 重新生成源。 无需重新启动该工具!

步骤3:创建物化视图

物化视图是一个组件,该组件定期轮询数据库以查看是否已添加任何新行,如果有,则以正确的顺序下载并将它们合并到视图中。 由于轮询有时会花费很多时间,因此我们希望此过程在单独的线程中运行。 我们可以使用Java Timer和TimerTask来实现。

轮询数据库? 真? 好吧,要考虑的重要一点是,只有服务器才能轮询数据库,而不是客户端。 这给我们提供了很好的可伸缩性,因为我们可以让少数服务器轮询数据库,从而服务于成千上万的租户。 将此与常规系统进行比较,在常规系统中,每个客户端都会从服务器请求资源,然后服务器又与数据库进行联系。

BookingView.java

public final class BookingView {...public static BookingView create(BookingManager mgr) {final AtomicBoolean working = new AtomicBoolean(false);final AtomicLong last  = new AtomicLong();final AtomicLong total = new AtomicLong();final String table = mgr.getTableIdentifier().getTableName();final String field = Booking.ID.identifier().getColumnName();final Timer timer = new Timer();final BookingView view = new BookingView(timer);final TimerTask task = ...;timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY);return view;}
}

计时器任务是匿名定义的,这就是轮询逻辑所在的位置。

final TimerTask task = new TimerTask() {@Overridepublic void run() {boolean first = true;// Make sure no previous task is already inside this block.if (working.compareAndSet(false, true)) {try {// Loop until no events was merged // (the database is up to date).while (true) {// Get a list of up to 25 events that has not yet // been merged into the materialized object view.final List added = unmodifiableList(mgr.stream().filter(Booking.ID.greaterThan(last.get())).sorted(Booking.ID.comparator()).limit(MAX_BATCH_SIZE).collect(toList()));if (added.isEmpty()) {if (!first) {System.out.format("%s: View is up to date. A total of " + "%d rows have been loaded.%n",System.identityHashCode(last),total.get());}break;} else {final Booking lastEntity = added.get(added.size() - 1);last.set(lastEntity.getId());added.forEach(view::accept);total.addAndGet(added.size());System.out.format("%s: Downloaded %d row(s) from %s. " + "Latest %s: %d.%n", System.identityHashCode(last),added.size(),table,field,Long.parseLong("" + last.get()));}first = false;}// Release this resource once we exit this block.} finally {working.set(false);}}}
};

有时,合并任务所花费的时间可能会超过计时器的时间间隔。 为了避免这引起问题,我们使用AtomicBoolean进行检查并确保只能同时执行一个任务。 这类似于信号量,不同之处在于我们希望删除没有时间的任务而不是排队,因为我们实际上不需要执行所有任务,因此只需一秒钟即可完成一个新任务。

构造函数和基本成员方法相当容易实现。 我们将传递给类的计时器作为参数存储在构造函数中,以便在需要停止时可以取消该计时器。 我们还存储了一张地图,将所有预订的当前视图保存在内存中。

private final static int MAX_BATCH_SIZE = 25;
private final static int UPDATE_EVERY   = 1_000; // Millisecondsprivate final Timer timer;
private final Map<Long, Booking> bookings;private BookingView(Timer timer) {this.timer    = requireNonNull(timer);this.bookings = new ConcurrentHashMap<>();
}public Stream<Booking> stream() {return bookings.values().stream();
}public void stop() {timer.cancel();
}

BookingView类的最后一个缺失部分是合并过程中上面使用的accept()方法。 在这里考虑新事件并将其合并到视图中。

private boolean accept(Booking ev) {final String type = ev.getEventType();// If this was a creation eventswitch (type) {case "CREATE" :// Creation events must contain all information.if (!ev.getSauna().isPresent()||  !ev.getTenant().isPresent()||  !ev.getBookedFrom().isPresent()||  !ev.getBookedTo().isPresent()||  !checkIfAllowed(ev)) {return false;}// If something is already mapped to that key, refuse the // event.return bookings.putIfAbsent(ev.getBookingId(), ev) == null;case "UPDATE" :// Create a copy of the current statefinal Booking existing = bookings.get(ev.getBookingId());// If the specified key did not exist, refuse the event.if (existing != null) {final Booking proposed = new BookingImpl();proposed.setId(existing.getId());// Update non-null valuesproposed.setSauna(ev.getSauna().orElse(unwrap(existing.getSauna())));proposed.setTenant(ev.getTenant().orElse(unwrap(existing.getTenant())));proposed.setBookedFrom(ev.getBookedFrom().orElse(unwrap(existing.getBookedFrom())));proposed.setBookedTo(ev.getBookedTo().orElse(unwrap(existing.getBookedTo())));// Make sure these changes are allowed.if (checkIfAllowed(proposed)) {bookings.put(ev.getBookingId(), proposed);return true;}}return false;case "DELETE" :// Remove the event if it exists, else refuse the event.return bookings.remove(ev.getBookingId()) != null;default :System.out.format("Unexpected type '%s' was refused.%n", type);return false;}
}

在事件源系统中,规则在收到事件时不执行,但在实现时才执行。 基本上,任何人都可以在表的末尾插入新事件到系统中。 在这种方法中,我们选择丢弃不遵循规则设置的事件。

步骤4:用法示例

在此示例中,我们将使用标准的Speedment API将三个新的预订插入到数据库中,其中两个有效,而第三个与先前的一个相交。 然后,我们将等待视图更新并打印出所有预订。

public static void main(String... params) {final SaunaApplication app = new SaunaApplicationBuilder().withPassword("password").build();final BookingManager bookings = app.getOrThrow(BookingManager.class);final SecureRandom rand = new SecureRandom();rand.setSeed(System.currentTimeMillis());// Insert three new bookings into the system.bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(1).setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(2).setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(3).setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS))));final BookingView view = BookingView.create(bookings);// Wait until the view is up-to-date.try { Thread.sleep(5_000); }catch (final InterruptedException ex) {throw new RuntimeException(ex);}System.out.println("Current Bookings for Sauna 1:");final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd");final Date now = Date.valueOf(LocalDate.now());view.stream().filter(Booking.SAUNA.equal(1)).filter(Booking.BOOKED_TO.greaterOrEqual(now)).sorted(Booking.BOOKED_FROM.comparator()).map(b -> String.format("Booked from %s to %s by Tenant %d.", dt.format(b.getBookedFrom().get()),dt.format(b.getBookedTo().get()),b.getTenant().getAsInt())).forEachOrdered(System.out::println);System.out.println("No more bookings!");view.stop();
}

如果运行它,将得到以下输出:

677772350: Downloaded 3 row(s) from booking. Latest id: 3.
677772350: View is up to date. A total of 3 rows have been loaded.
Current Bookings for Sauna 1:
Booked from 2016-10-11 to 2016-10-12 by Tenant 2.
Booked from 2016-10-13 to 2016-10-15 by Tenant 1.
No more bookings!

我的GitHub页面上提供了此演示应用程序的完整源代码。 在这里您还可以找到许多其他示例,这些示例说明了如何在各种情况下使用Speedment快速开发数据库应用程序。

摘要

在本文中,我们在数据库表上开发了一个物化视图,该视图可评估物化而不是插入时的事件。 这样就可以启动应用程序的多个实例,而不必担心对其进行同步,因为它们最终将保持一致。 然后,我们通过展示如何使用Speedment API查询实例化视图以生成当前预订列表来结束。

感谢您的阅读,请在Github页面上查看更多Speedment示例 !

翻译自: https://www.javacodegeeks.com/2016/10/event-sourcing-cqrs-practise.html

探索cqrs和事件源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335552.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows功能_你的Windows杀毒软件有这个功能吗?

安全软件首推-火绒&#xff0c;良心之作。比起360安全卫士、腾讯电脑管家&#xff0c;它无广告&#xff0c;无捆绑&#xff0c;无劫持&#xff0c;无弹窗&#xff0c;还免费。我认为最喜人最贴心的功能属它的‘弹窗拦截’了&#xff0c;可以拦截程序推送烦人的弹窗。五大浓缩亮…

base64 二进制流java_读取和base64编码二进制文件

我m trying to read a binary file from the filesystem and then base64 encode it in JavaScript. I使用FileReader API读取数据并找到base64编码器here .我的代码似乎接近工作&#xff0c;问题是生成的base64数据是错误的 . 这是我到目前为止所得到的&#xff1a;function s…

drools dmn_使用Drools的DMN运行时示例

drools dmn正如去年宣布的那样 &#xff0c;Drools 7.0将在合规级别3对DMN模型提供全面的运行时支持。 在撰写本文时&#xff0c;运行时实现已完成&#xff0c;并且该团队现在正在努力进行改进&#xff0c;以进行错误修复和用户友好。 不幸的是&#xff0c;对于7.0版本&#…

笔记本电脑关机后指示灯还亮_汽车仪表常见指示符号之清洗液指示灯,灯亮了怎么办?...

清洗液指示灯就是玻璃水指示灯&#xff0c;用来显示玻璃水的储存量的&#xff0c;平时为熄灭状态&#xff0c;当玻璃水不足时就会点亮提醒驾驶员该添加了。添加后清洗液指示灯还亮的说明出现故障&#xff0c;检查玻璃水电机&#xff0c;相关线路保险丝等&#xff0c;行车中此灯…

java image 设置大小_如何在Java中调整BufferedImage的大小

单步缩放的主要问题是它们通常不会产生高质量的输出&#xff0c;因为它们专注于将原始图像压缩到更小的空间中&#xff0c;通常通过剔除大量像素信息(不同的算法完成不同的事情&#xff0c;所以我归纳有效)威尔drawGraphics规模上下&#xff0c;是的&#xff0c;它会向它或产生…

xxx钻石商城功能开发需求

文章目录1. 买家小程序端1.1. 首页1.2. 店铺1.2.1. 搜索门店1.2.2. 门店信息1.2.3. 预约试戴1.3. 购物袋1.3.1. 加入购物车1.3.2. 编辑购物车1.4. 个人中心1.4.1. 个人信息1.4.2. 实名认证1.4.3. 我的等级1.4.4. 查看收益1.4.4.1 门店收益1.4.4.1.1查看结算单1.4.4.2 分享收益1…

drools6.5_Drools 6.5.0.Final可用

drools6.5最新和最出色的Drools 6.5.0.Final版本现已可供下载。 这是我们以前的版本的增量版本&#xff0c;重点是一些重要的改进以完善6.x系列。 您可以在此处找到更多详细信息&#xff0c;下载和文档&#xff1a; Drools网站 资料下载 文献资料 发行说明 请阅读下面的一…

c++ vector 一部分_C++ vector 使用注意事项

1、初始化c 11以后新增了大括号{}的初始化方式&#xff0c;需要注意与()的区别,如&#xff1a;std::vector<int> vecTest1(5); //初始化5个元素&#xff0c;每个都是0std::vector<int> vecTest2{ 5 }; //初始化1个元素&#xff0c;值是52、添加元素&#xff1a;pus…

java list 取前3个_用java 截取字符串,每三个一组

展开全部以下是将一长串数字分组用逗号隔开,每三个一组,转化为字符串.例如:2345678--->2,345,67823--->23private static String convert(long space) {String str String.valueOf(space);StringBuilder builder new StringBuilder(str);str builder.reverse().toStri…

react打包后图片丢失_React中型项目的优化实践

本文可能涉及的内容--项目介绍整个项目大概有60个页面&#xff0c;用到的组件大概150&#xff0c;package里面的依赖大概有70个&#xff0c;应该勉强算得上是一个中型的React的项目了。下面给大家看看我们现在build一次项目的结果--打包时间约150s&#xff0c;打包完之后的资源…

HH SaaS电商系统的商品入仓功能需求说明

文章目录租户添加入仓商品商城添加入仓商品总店添加入库商品分店添加入库商品供应商添加入库商品租户添加入仓商品 点击“选择商品入仓”可以看到全部所属供应商及租户自供的商品&#xff0c;具体数据在“SKU供应信息表”查询&#xff0c;所属租户ID匹配&#xff0c;且所属平台…

cuba.platform_CUBA Platform 6.3的新增功能

cuba.platform我们很自豪地宣布新版本的CUBA平台和Studio全面上市&#xff01; 也许这是有史以来功能最丰富的平台版本之一–在各个级别上都有重要的变化&#xff1a;体系结构&#xff0c;可扩展性&#xff0c;API可用性和性能。 本文介绍了该平台的主要增强功能。 发行说明中…

java annotation应用_Java Annotation高级应用

前言&#xff1a;在此行文《java annotation高级应用》&#xff0c;具体实例化解释annotation和annotation processing tool(APT)的使用。望能对各位的有所帮助。一、摘要&#xff1a;《java annotation高级应用》具体实例化解释annotation和annotation processing tool(APT)的…

如何通过命令终端访问本地/局域网/远程的MySQL数据库_访问数据库_连接数据库_登录数据库

文章目录Windows系统下访问本地MySQL数据库访问远程主机的MySQL数据库本地安装了MySQL数据库本地没有安装MySQLLinux系统下退出数据库登录Windows系统下 访问本地MySQL数据库 使用命令终端访问本地MySQL数据库&#xff0c;打开终端直接输入以下命令语句&#xff1a; mysql …

科大讯飞 jason word_2019科大讯飞全球1024开发者节开幕啦

2019年10月24日&#xff0c;属于开发者的共同节日2019科大讯飞全球1024开发者节于合肥滨湖国际会展中心盛大启幕。以“A.I.前进&#xff0c;拼世界”为主题的本届大会&#xff0c;将集结120位行业大咖主题演讲、20场行业分论坛详解全域A.I.&#xff0c;科大讯飞的1024计划3.0、…

多线程线程池的实现java_如何在Java中实现线程池

多线程线程池的实现java线程是独立程序的执行路径。 在java中&#xff0c;每个线程都扩展java.lang.Thread类或实现java.lang.Runnable。 多线程是指在一个任务中同时执行两个或多个线程。在多线程中&#xff0c;每个任务可以有多个线程&#xff0c;并且这些线程可以异步或同步…

java timestamp是什么类型_JAVA比较2个Timestamp类型的时间大小-由此引发的思考

今天忽然要对2个Timestamp变量的类型进行比较。没怎么用过&#xff0c;百度发现居然很多都是转换类型的。后面发现Timestamp自己都有方法进行比较。但是百度一堆都是那些要转换类型的。我就想简单的知道2个Timestamp的时间哪个早哪个晚嘛。经过自己的百度的验证&#xff0c;终于…

Windows 使用命令执行 sql 脚本文件

文章目录MySQL 数据库方法一&#xff1a;使用 mysql 命令方法二&#xff1a;使用 source 命令Oracle 数据库MySQL 数据库 方法一&#xff1a;使用 mysql 命令 未配置 MySQL 的环境变量&#xff0c;则需要进入 MySQL 的 bin 目录下才能执行 mysql 命令已配置 MySQL 的环境变量…

windows系统连接同局域网下的其他电脑mysql等服务

一、首先设置被连接的mysql服务的 root 用户及访问权限&#xff08;必须确保连接用户拥有不限ip访问权限&#xff09; 参考&#xff1a;https://blog.csdn.net/hkl_Forever/article/details/127543546 二、配置被连接电脑的防火墙、网络共享、设置出入端口规则等 1、打开防火墙…