实践中的事件源和CQRS

任何尝试实施完全符合ACID的系统的人都知道,您需要做很多事情。 您需要确保可以自由创建,修改和删除数据库实体而不会出错,在大多数情况下,解决方案将以性能为代价。 可以用来解决此问题的一种方法是根据一系列事件而不是可变状态来设计系统。 这通常称为事件来源。

在本文中,我将展示一个演示应用程序,该应用程序使用开源工具包Speedment快速启动并运行可扩展的基于事件的数据库应用程序。 示例的完整源代码在此处 。

什么是事件源?

在典型的关系数据库系统中,您将实体的状态存储为数据库中的一行。 状态更改时,应用程序使用UPDATE或DELETE语句修改行。 这种方法的问题在于,当要确保没有更改任何行以致使系统处于非法状态时,它将对数据库增加很多要求。 您不希望任何人提取比他们帐户中更多的钱,或者要竞标已经结束的拍卖。

在基于事件的系统中,我们对此采取了不同的方法。 无需将实体的状态存储在数据库中,而是存储导致该状态的一系列更改 。 事件一旦创建便是不可变的,这意味着您仅需实现两个操作CREATE和READ。 如果实体被更新或删除,则可以通过创建“更新”或“删除”事件来实现。

事件源系统可以轻松扩展以提高性能,因为任何节点都可以简单地下载事件日志并重播当前状态。 由于写入和查询由不同的机器处理,因此您还可以获得更好的性能。 这称为CQRS(命令查询职责隔离)。 正如您将在示例中看到的,使用Speedment工具包,我们可以在极短的时间内获得最终一致的实例化视图并开始运行。

可预订的桑拿

为了展示构建事件源系统的工作流程,我们将创建一个小型应用程序来处理住宅区中共享桑拿浴室的预订。 我们有多个租户有兴趣预订桑拿浴室,但我们需要确保害羞的租户永远不会意外预订它。 我们还希望在同一系统中支持多个桑拿浴室。

为了简化与数据库的通信,我们将使用Speedment工具箱 。 Speedment是一个Java工具,它使我们能够从数据库生成完整的域模型,并且还可以使用优化的Java 8流轻松查询数据库。 在Apache 2-license下可以使用Speedment ,在Github页面上有很多很好的例子说明了不同的用法。

步骤1:定义数据库架构

第一步是定义我们的(MySQL)数据库。 我们仅拥有一张称为“预订”的桌子,用于存储与预订桑拿浴室有关的事件。 请注意,预订是事件而不是实体。 如果我们要取消预订或对其进行更改,则必须将其他更改发布为新行。 我们不允许修改或删除已发布的行。

CREATE DATABASE `sauna`;CREATE TABLE `sauna`.`booking` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,`booking_id` BIGINT NOT NULL,`event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL,`tenant` INT NULL,`sauna` INT NULL,`booked_from` DATE NULL,`booked_to` DATE NULL,PRIMARY KEY (`id`)
);

“ id”列是一个递增的整数,每次将新事件发布到日志时都会自动分配。 “ booking_id”告诉我们我们指的是哪个预订。 如果两个事件共享相同的预订ID,则它们引用相同的实体。 我们还有一个名为“ event_type”的枚举,它描述了我们试图执行的操作。 之后是属于预订的信息。 如果列为NULL,则与任何先前值相比,我们将其视为未修改。

步骤2:使用加速生成代码

下一步是使用Speedment为项目生成代码。 只需创建一个新的maven项目并将以下代码添加到pom.xml文件即可。

pom.xml

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><speedment.version>3.0.0-EA2</speedment.version><mysql.version>5.1.39</mysql.version>
</properties><build><plugins><plugin><groupId>com.speedment</groupId><artifactId>speedment-maven-plugin</artifactId><version>${speedment.version}</version><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency></dependencies></plugin></plugins>
</build><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.speedment</groupId><artifactId>runtime</artifactId><version>${speedment.version}</version><type>pom</type></dependency>
</dependencies>

如果生成项目,则IDE 中将出现一个名为speedment:tool的新maven目标。 运行它以启动Speedment用户界面。 在其中,连接到Sauna数据库并使用默认设置生成代码。 现在应在项目中填充源文件。

提示:如果对数据库进行了更改,则可以使用speedment:reload -goal下载新配置,并使用speedment:generate 重新生成源。 无需重新启动该工具!

步骤3:创建物化视图

物化视图是一个组件,该组件定期轮询数据库以查看是否已添加任何新行,如果已添加,则以正确的顺序下载并将它们合并到视图中。 由于轮询有时会花费很多时间,因此我们希望此过程在单独的线程中运行。 我们可以使用Java Timer和TimerTask来实现。

轮询数据库? 真? 好吧,需要考虑的重要一点是,只有服务器才能轮询数据库,而不会轮询客户端。 这给我们提供了很好的可伸缩性,因为我们可以让少数服务器轮询数据库,从而为成千上万的租户提供服务。 将此与常规系统进行比较,在常规系统中,每个客户端都会从服务器请求资源,然后服务器又会联系数据库。

BookingView.java

public final class BookingView {...public static BookingView create(BookingManager mgr) {final AtomicBoolean working = new AtomicBoolean(false);final AtomicLong last  = new AtomicLong();final AtomicLong total = new AtomicLong();final String table = mgr.getTableIdentifier().getTableName();final String field = Booking.ID.identifier().getColumnName();final Timer timer = new Timer();final BookingView view = new BookingView(timer);final TimerTask task = ...;timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY);return view;}
}

计时器任务是匿名定义的,这就是轮询逻辑所在的位置。

final TimerTask task = new TimerTask() {@Overridepublic void run() {boolean first = true;// Make sure no previous task is already inside this block.if (working.compareAndSet(false, true)) {try {// Loop until no events was merged // (the database is up to date).while (true) {// Get a list of up to 25 events that has not yet // been merged into the materialized object view.final List added = unmodifiableList(mgr.stream().filter(Booking.ID.greaterThan(last.get())).sorted(Booking.ID.comparator()).limit(MAX_BATCH_SIZE).collect(toList()));if (added.isEmpty()) {if (!first) {System.out.format("%s: View is up to date. A total of " + "%d rows have been loaded.%n",System.identityHashCode(last),total.get());}break;} else {final Booking lastEntity = added.get(added.size() - 1);last.set(lastEntity.getId());added.forEach(view::accept);total.addAndGet(added.size());System.out.format("%s: Downloaded %d row(s) from %s. " + "Latest %s: %d.%n", System.identityHashCode(last),added.size(),table,field,Long.parseLong("" + last.get()));}first = false;}// Release this resource once we exit this block.} finally {working.set(false);}}}
};

有时,合并任务可能需要花费比计时器间隔更多的时间。 为避免此问题,我们使用AtomicBoolean进行检查并确保只能同时执行一个任务。 这类似于信号量,不同之处在于我们想要删除没有时间的任务而不是排队,因为我们确实不需要执行所有任务,因此只需一秒钟即可完成一个新任务。

构造函数和基本成员方法很容易实现。 我们将传递给类的计时器作为参数存储在构造函数中,以便在需要停止时可以取消该计时器。 我们还会存储一张地图,以将所有预订的当前视图保存在内存中。

private final static int MAX_BATCH_SIZE = 25;
private final static int UPDATE_EVERY   = 1_000; // Millisecondsprivate final Timer timer;
private final Map<Long, Booking> bookings;private BookingView(Timer timer) {this.timer    = requireNonNull(timer);this.bookings = new ConcurrentHashMap<>();
}public Stream<Booking> stream() {return bookings.values().stream();
}public void stop() {timer.cancel();
}

BookingView类的最后一个缺少的部分是合并过程中上面使用的accept()方法。 在这里考虑新事件并将其合并到视图中。

private boolean accept(Booking ev) {final String type = ev.getEventType();// If this was a creation eventswitch (type) {case "CREATE" :// Creation events must contain all information.if (!ev.getSauna().isPresent()||  !ev.getTenant().isPresent()||  !ev.getBookedFrom().isPresent()||  !ev.getBookedTo().isPresent()||  !checkIfAllowed(ev)) {return false;}// If something is already mapped to that key, refuse the // event.return bookings.putIfAbsent(ev.getBookingId(), ev) == null;case "UPDATE" :// Create a copy of the current statefinal Booking existing = bookings.get(ev.getBookingId());// If the specified key did not exist, refuse the event.if (existing != null) {final Booking proposed = new BookingImpl();proposed.setId(existing.getId());// Update non-null valuesproposed.setSauna(ev.getSauna().orElse(unwrap(existing.getSauna())));proposed.setTenant(ev.getTenant().orElse(unwrap(existing.getTenant())));proposed.setBookedFrom(ev.getBookedFrom().orElse(unwrap(existing.getBookedFrom())));proposed.setBookedTo(ev.getBookedTo().orElse(unwrap(existing.getBookedTo())));// Make sure these changes are allowed.if (checkIfAllowed(proposed)) {bookings.put(ev.getBookingId(), proposed);return true;}}return false;case "DELETE" :// Remove the event if it exists, else refuse the event.return bookings.remove(ev.getBookingId()) != null;default :System.out.format("Unexpected type '%s' was refused.%n", type);return false;}
}

在事件源系统中,规则在收到事件时不执行,但在实现时才执行。 基本上,任何人都可以在表的末尾插入新事件到系统中。 在这种方法中,我们选择丢弃不遵循规则设置的事件。

步骤4:示例用法

在此示例中,我们将使用标准的Speedment API将三个新的预订插入到数据库中,其中两个有效,第三个与先前的一个相交。 然后,我们将等待视图更新并打印出所有预订。

public static void main(String... params) {final SaunaApplication app = new SaunaApplicationBuilder().withPassword("password").build();final BookingManager bookings = app.getOrThrow(BookingManager.class);final SecureRandom rand = new SecureRandom();rand.setSeed(System.currentTimeMillis());// Insert three new bookings into the system.bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(1).setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(2).setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(3).setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS))));final BookingView view = BookingView.create(bookings);// Wait until the view is up-to-date.try { Thread.sleep(5_000); }catch (final InterruptedException ex) {throw new RuntimeException(ex);}System.out.println("Current Bookings for Sauna 1:");final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd");final Date now = Date.valueOf(LocalDate.now());view.stream().filter(Booking.SAUNA.equal(1)).filter(Booking.BOOKED_TO.greaterOrEqual(now)).sorted(Booking.BOOKED_FROM.comparator()).map(b -> String.format("Booked from %s to %s by Tenant %d.", dt.format(b.getBookedFrom().get()),dt.format(b.getBookedTo().get()),b.getTenant().getAsInt())).forEachOrdered(System.out::println);System.out.println("No more bookings!");view.stop();
}

如果运行它,将得到以下输出:

677772350: Downloaded 3 row(s) from booking. Latest id: 3.
677772350: View is up to date. A total of 3 rows have been loaded.
Current Bookings for Sauna 1:
Booked from 2016-10-11 to 2016-10-12 by Tenant 2.
Booked from 2016-10-13 to 2016-10-15 by Tenant 1.
No more bookings!

我的GitHub页面上提供了此演示应用程序的完整源代码。 在这里您还可以找到许多其他示例,这些示例说明了如何在各种情况下使用Speedment快速开发数据库应用程序。

摘要

在本文中,我们在数据库表上开发了一个物化视图,该视图可评估物化而不是插入时的事件。 这样就可以启动应用程序的多个实例,而不必担心对其进行同步,因为它们最终将保持一致。 然后,我们通过展示如何使用Speedment API查询实例化视图来生成当前预订列表来结束。

感谢您的阅读,请在Github页面上查看更多Speedment示例 !

翻译自: https://www.javacodegeeks.com/2016/10/event-sourcing-cqrs-practise.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/352061.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Lintcode--1(463)--整数排序

题目&#xff1a;给一组整数&#xff0c;按照升序排序&#xff0c;使用选择排序&#xff0c;冒泡排序&#xff0c;插入排序或者任何 O(n2) 的排序算法1、冒泡排序原理&#xff1a;从第一个整数开始第一趟&#xff0c;比较相邻的两个元素&#xff0c;大的放在后面&#xff1b;一…

Preparing Cities for Robot Cars【城市准备迎接自动驾驶汽车】

Preparing Cities for Robot Cars The possibility of self-driving robot cars has often seemed like a futurists dream, years away from materializing in the read world. 自动驾驶机器人汽车的可能性貌似看起来常常是未来主义者的梦想&#xff0c;在真实世界里的实现还…

百度地图no result available_【整理之路二】百度地图的路径规划和调用本机地图导航...

推荐看完之后注意一下最后的东西一、细说百度地图的路径规划路径规划主要有这么几种1.公交路径规划1.1 市内公交规划&#xff08;暂时不在这里说&#xff09;1.2 跨市/省公交规划// 导入头文件#import <BaiduMapAPI_Search/BMKSearchComponent.h>#import <BaiduMapAPI…

最短路径—Dijkstra算法和Floyd算法

Dijkstra算法 1.定义概览 Dijkstra(迪杰斯特拉)算法是典型的单源最短路径算法&#xff0c;用于计算一个节点到其他所有节点的最短路径。主要特点是以起始点为中心向外层层扩展&#xff0c;直到扩展到终点为止。Dijkstra算法是很有代表性的最短路径算法&#xff0c;在很多专业课…

实现两个数的交换(异或,加减)

1、 通常我们通过设置临时变量来实现两个数的交换&#xff0c;如下&#xff1a; void swap(int *a,int *b){int temp;temp*a;*a*b;*btemp;} 2、还可以通过异或来实现两个不同整数的交换&#xff0c;如下&#xff1a; void swap(int &a,int &b){tempa^b; //设a为临…

url override and HttpSession implements session

背景 HttpSession默认使用Cookie存储Session ID&#xff0c;如果在用户禁用浏览器Cookie的功能后&#xff0c;仍打算运用HttpSession来进行会话管理&#xff0c;那么可以搭配URL重写来实现。 实现方法 使用HttpServletResponse的encodeURL()方法协助产生URL。  服务器端调用r…

怎么用python写名字_python中的__name__ 到底是个什么玩意?应该怎么用到它?

本文的文字及图片来源于网络,仅供学习、交流使用,不具有任何商业用途,版权归原作者所有,如有问题请及时联系我们以作处理以下文章来源于腾讯云 作者&#xff1a;Python进击者( 想要学习Python&#xff1f;Python学习交流群&#xff1a;1039649593&#xff0c;满足你的需求&…

用RAII技术管理资源及其泛型实现

前言 RAII的含义是“资源获取即初始化”。 一段看似安全的代码 首先看一段代码&#xff1a; try{int *p new int[100];// ... do somethingdelete[] p; }catch(exception &e){ // ..... } 这段代码中&#xff0c;我们先进行了动态内存分配&#xff0c;使…

使用Lambda在AWS云上使用Java

如今&#xff0c;Amazon Web Services越来越受欢迎。 Java是AWS的一等公民&#xff0c;它很容易上手。 部署应用程序有些不同&#xff0c;但是仍然很容易和方便。 AWS Lambda是一种计算服务&#xff0c;您可以在其中将代码上传到AWS Lambda&#xff0c;并且该服务可以使用AWS…

Lintcode--3(366)--斐波那契数列

题目&#xff1a;查找斐波纳契数列中第 N 个数。所谓的斐波纳契数列是指&#xff1a;前2个数是 0 和 1 。第 i 个数是第 i-1 个数和第i-2 个数的和。斐波纳契数列的前10个数字是&#xff1a;0,1,1,2,3,5,8,13,21... 程序&#xff1a; class Solution { public: /* * para…

nt6启动菜单自动修复工具_轻量级windows系统修复,清理工具——Dism++

收藏分享计划读完需要4分钟速读仅需 2 分钟Dism是一款操作简单&#xff0c;轻量级的系统维护工具。Dism 作为第三版清理工具更加深入系统底层&#xff0c;功能和清理效果都非常不错1 简介Dism 是由初雨团队采用微软内部 API 编写的一款开源免费的实用工具&#xff0c;最开始的名…

【日常小记】linux中强大且常用命令:find、grep

在linux下面工作&#xff0c;有些命令能够大大提高效率。本文就向大家介绍find、grep命令&#xff0c;他哥俩可以算是必会的linux命令&#xff0c;我几乎每天都要用到他们。本文结构如下&#xff1a; find命令 find命令的一般形式 find命令的常用选项及实例 find与xargs grep命…

Spring----最小化Spring配置

在Spring的配置文件中&#xff0c;我们可以使用<bean>元素定义Bean,以及使用<constructor-arg>或着<property>元素装配bean,这对于包含少量Bean的应用来说以经非常不错了&#xff0c;但是随着应用的发展&#xff0c;我们不得不编写越来越复杂的XML配置。为解…

Lintcode--2(56)--两数之和

题目&#xff1a;给一个整数数组&#xff0c;找到两个数使得他们的和等于一个给定的数 target。 你需要实现的函数twoSum需要返回这两个数的下标, 并且第一个下标小于第二个下标。注意这里下标的范围是 0 到 n-1。注意事项你可以假设只有一组答案。样例给出 numbers [2, 7, 11…

qml如何发布程序_首创PC端小程序直播发布会,360如何与手机厂商一起共振?

文 | Toby Lu全新的线上发布会形式&#xff0c;正在搅动着手机品牌营销江湖。疫情之下&#xff0c;线上发布会的形式成为手机品牌产品亮相的最佳形式&#xff0c;与传统的联合各家媒体做直播不同&#xff0c;聚焦于一个媒体平台&#xff0c;全场景、全链路的营销模式&#xff0…

CUBA Platform 6.3的新增功能

我们很自豪地宣布新版本的CUBA平台和Studio全面上市&#xff01; 也许这是有史以来功能最丰富的平台版本之一–在各个级别都有重要的变化&#xff1a;体系结构&#xff0c;可扩展性&#xff0c;API可用性和性能。 本文介绍了该平台的主要增强功能。 发行说明中提供了完整的更…

Python字符串的编码与解码(encode与decode)

首先要搞清楚&#xff0c;字符串在Python内部的表示是unicode编码&#xff0c;因此&#xff0c;在做编码转换时&#xff0c;通常需要以unicode作为中间编码&#xff0c;即先将其他编码的字符串解码&#xff08;decode&#xff09;成unicode&#xff0c;再从unicode编码&#xf…

Linux复习笔记

常用命令&#xff1a; pwd &#xff1b;cd&#xff1b; ls&#xff1b; cp&#xff1b; mv&#xff1b; rm &#xff1b;cat&#xff1b; stat&#xff1b; 01234567---r---w-rw---xr-x-wxrwx目录&#xff1a; root: 在root下: useradd mjq 创建mjq用户 passwd mjq 创建密…

Lintcode--4(1)--A+B

题目&#xff1a;给出两个整数a和b, 求他们的和, 但不能使用 等数学运算符。 说明&#xff1a; a和b都是 32位 整数么&#xff1f;是的我可以使用位运算符么&#xff1f;当然可以 样例&#xff1a;如果 a1 并且 b2&#xff0c;返回3显然你可以直接 return a b&#xff0c;但…

1w存银行一年多少利息_100万存银行一年利息多少?能赚多少钱?

100万存银行一年利息多少&#xff0c;是否可以辞掉工作什么都不做随着经济水平的提升&#xff0c;大家手上的存款也越来越多了&#xff0c;众所周知&#xff0c;将资金存放在银行是可以赚取利息收益的&#xff0c;那么如果我们有100万的存款资金后&#xff0c;一年可以获得多少…