使用Vert.x进行响应式开发

最近,似乎我们正在听到有关Java的最新和最好的框架的消息。 忍者 , SparkJava和Play等工具; 但是每个人都固执己见,使您感到您需要重新设计整个应用程序以利用它们的出色功能。 这就是为什么当我发现Vert.x时令我感到宽慰的原因。 Vert.x不是一个框架,它是一个工具包,它不受质疑,而且正在解放。 Vert.x不想让您重新设计整个应用程序以使用它,它只是想让您的生活更轻松。 您可以在Vert.x中编写整个应用程序吗? 当然! 您可以在现有的Spring / Guice / CDI应用程序中添加Vert.x功能吗? 是的 您可以在现有JavaEE应用程序中使用Vert.x吗? 绝对! 这就是让它变得惊人的原因。

背景

Vert.x诞生于Tim Fox决定他喜欢NodeJS生态系统中正在开发的许多东西,但他不喜欢在V8中进行权衡取舍:单线程,有限的库支持以及JavaScript本身。 Tim着手编写一个工具包,该工具包在使用方式和位置方面不受质疑,因此他决定在JVM上实现它的最佳位置。 因此,Tim和社区开始着手创建一个事件驱动的,非阻塞的,反应性的工具箱,该工具箱在许多方面都反映了NodeJS可以完成的工作,而且还利用了JVM内部的强大功能。 Node.x诞生了,后来发展成为Vert.x。

总览

Vert.x旨在实现事件总线,该事件总线使应用程序的不同部分可以以非阻塞/线程安全的方式进行通信。 它的一部分是根据Eralng和Akka展示的Actor方法建模的。 它还旨在充分利用当今的多核处理器和高度并发的编程需求。 因此,默认情况下,所有Vert.x VERTICLES默认都实现为单线程。 与NodeJS不同,Vert.x可以在许多线程中运行许多顶点。 此外,您可以指定某些顶点为“工作”顶点,并且可以是多线程的。 为了给蛋糕锦上添花,Vert.x通过使用Hazelcast对事件总线的多节点群集提供了底层支持。 它继续包含许多其他令人惊奇的功能,这些功能在这里没有列出太多,但是您可以在Vert.x官方文档中内容。

关于Vert.x,您需要了解的第一件事是,与NodeJS相似,永远不要阻塞当前线程。 默认情况下,Vert.x中的所有内容均已设置为使用回调/未来/承诺。 Vert.x提供了异步方法来执行大多数I / O和处理器密集型操作,这些操作可能会阻塞当前线程,而不是执行同步操作。 现在,使用回调可能很丑陋且很痛苦,因此Vert.x可以选择提供基于RxJava的API,该API使用Observer模式实现相同的功能。 最后,Vert.x通过在许多异步API上提供executeBlocking(Function f)方法,可以轻松使用现有的类和方法。 这意味着您可以选择喜欢使用Vert.x的方式,而不是由工具包指示必须如何使用它。

了解Vert.x的第二件事是它由顶点,模块和节点组成。 顶点是Vert.x中最小的逻辑单元,通常由单个类表示。 遵循UNIX Philosophy的原则,顶点应该是简单且具有单一用途的。 一组顶点可以放到一个模块中,该模块通常打包为单个JAR文件。 一个模块代表了一组相关的功能,当一起使用时,它们可以代表一个完整的应用程序或一个较大的分布式应用程序的一部分。 最后,节点是运行一个或多个模块/垂直模块的JVM的单个实例。 由于Vert.x具有从头开始内置的群集功能,因此Vert.x应用程序可以跨越一台计算机或跨多个地理位置的多台计算机跨越节点(尽管延迟可能会掩盖性能)。

示例项目

现在,我最近去过一些聚会和会议,在谈论反应式编程时,它们向您展示的第一件事就是构建一个聊天室应用程序。 一切都很好,但这并不能真正帮助您完全理解响应式开发的力量。 聊天室应用程序既简单又简单。 我们可以做得更好。 在本教程中,我们将使用旧版Spring应用程序并将其转换为利用Vert.x的优势。 这具有多个目的:表明该工具包易于与现有Java项目集成,它使我们能够利用可能是生态系统中根深蒂固的现有工具的优势,并且使我们遵循DRY原则 ,因为我们不不必重写大量代码即可获得Vert.x的好处。

我们的旧版Spring应用程序是使用Spring Boot,Spring Data JPA和Spring REST的REST API的简单示例。 源代码可以在“主”分支中找到该处 。 我们还将使用其他分支来演示进行中的进度,因此,只要对gitJava 8有一点经验的人都可以遵循。 让我们从检查常规Spring应用程序的Spring Configuration类开始。

@SpringBootApplication
@EnableJpaRepositories
@EnableTransactionManagement
@Slf4j
public class Application {public static void main(String[] args) {ApplicationContext ctx = SpringApplication.run(Application.class, args);System.out.println("Let's inspect the beans provided by Spring Boot:");String[] beanNames = ctx.getBeanDefinitionNames();Arrays.sort(beanNames);for (String beanName : beanNames) {System.out.println(beanName);}}@Beanpublic DataSource dataSource() {EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();return builder.setType(EmbeddedDatabaseType.HSQL).build();}@Beanpublic EntityManagerFactory entityManagerFactory() {HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();vendorAdapter.setGenerateDdl(true);LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();factory.setJpaVendorAdapter(vendorAdapter);factory.setPackagesToScan("com.zanclus.data.entities");factory.setDataSource(dataSource());factory.afterPropertiesSet();return factory.getObject();}@Beanpublic PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {final JpaTransactionManager txManager = new JpaTransactionManager();txManager.setEntityManagerFactory(emf);return txManager;}
}

正如您在课程顶部看到的那样,我们有一些非常标准的Spring Boot注释。 你还会看到@ SLF4J批注这是一部分Lombok库,旨在帮助降低锅炉板代码。 我们还有@Bean批注的方法,用于提供对JPA EntityManager,TransactionManager和DataSource的访问。 这些项目中的每一个都提供可注入的对象,供其他类使用。 项目中的其余类也非常简单。 有一个客户 POJO,它是服务中使用的实体类型。 通过Spring Data创建了一个CustomerDAO 。 最后,有一个CustomerEndpoints类,它是JAX-RS注释的REST控制器。

如前所述,这是Spring Boot应用程序中的所有标准票价。 该应用程序的问题在于,在大多数情况下,它的可伸缩性有限。 您可以在Servlet容器中运行此应用程序,也可以在Jetty或Undertow之类的嵌入式服务器中运行该应用程序。 无论哪种方式,每个请求都占用一个线程,因此在等待I / O操作时浪费了资源。

切换到Convert-To-Vert.x-Web分支,我们可以看到Application类发生了一些变化。 现在,我们有了一些新的@Bean批注方法来注入Vertx实例本身,以及ObjectMapper实例(Jackson JSON库的一部分)。 我们还用新的CustomerVerticle替换了CustomerEnpoints类。 几乎所有其他内容都是相同的。

CustomerVerticle类带有@Component注释,这意味着Spring将在启动时实例化该类。 它还具有用@PostConstruct注释的start方法,以便在启动时启动Verticle。 查看代码的实际内容,我们看到Vert.x代码的第一部分: Router

Router类是vertx-web库的一部分,它使我们能够使用流畅的API来定义HTTP URL,方法和标头过滤器以进行请求处理。 将BodyHandler实例添加到默认路由可以处理POST / PUT正文并将其转换为JSON对象,然后Vert.x可以将其作为RoutingContext的一部分进行处理。 Vert.x中的路由顺序可能很重要。 如果您定义的路由具有某种形式的全局匹配(*或regex),则除非实现chaining ,否则它会吞下对其后定义的路由的请求。 我们的示例最初显示了3条路线。

@PostConstructpublic void start() throws Exception {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());router.get("/v1/customer/:id").produces("application/json").blockingHandler(this::getCustomerById);router.put("/v1/customer").consumes("application/json").produces("application/json").blockingHandler(this::addCustomer);router.get("/v1/customer").produces("application/json").blockingHandler(this::getAllCustomers);vertx.createHttpServer().requestHandler(router::accept).listen(8080);}

请注意,定义了HTTP方法,定义了“ Accept”标头(通过消耗),定义了“ Content-Type”标头(通过生产)。 我们还看到我们正在通过对blockingHandler方法的调用传递对请求的处理。 Vert.x路由的阻塞处理程序接受RoutingContext对象,因为它是唯一的参数。 RoutingContext包含Vert.x请求对象,响应对象以及任何参数/ POST主体数据(例如“:id”)。 您还将看到,我使用了方法引用而不是lambda来将逻辑插入blockingHandler(我发现它更具可读性)。 这3条请求路由的每个处理程序都在该类中一个单独的方法中定义。 这些方法基本上只是调用DAO上的方法,根据需要进行序列化或反序列化,设置一些响应头,并通过发送响应来结束请求。 总体而言,非常简单明了。

private void addCustomer(RoutingContext rc) {try {String body = rc.getBodyAsString();Customer customer = mapper.readValue(body, Customer.class);Customer saved = dao.save(customer);if (saved!=null) {rc.response().setStatusMessage("Accepted").setStatusCode(202).end(mapper.writeValueAsString(saved));} else {rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");}} catch (IOException e) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", e);}}private void getCustomerById(RoutingContext rc) {log.info("Request for single customer");Long id = Long.parseLong(rc.request().getParam("id"));try {Customer customer = dao.findOne(id);if (customer==null) {rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");} else {rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id)));}} catch (JsonProcessingException jpe) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", jpe);}}private void getAllCustomers(RoutingContext rc) {log.info("Request for all customers");List customers = StreamSupport.stream(dao.findAll().spliterator(), false).collect(Collectors.toList());try {rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customers));} catch (JsonProcessingException jpe) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", jpe);}}

您可能会说:“但是,这比我的Spring注释和类还要更多的代码和混乱”。 这可能是正确的,但实际上取决于您如何实现代码。 这只是一个介绍性的示例,因此我使代码非常简单易懂。 我可以使用Vert.x的注释库以类似于JAX-RS的方式实现端点。 此外,我们还获得了可扩展性的巨大改进。 在幕后,Vert.x Web使用Netty进行低级异步I / O操作,从而使我们能够处理更多并发请求(受数据库连接池的大小限制)。

通过使用Vert.x Web库,我们已经对该应用程序的可伸缩性和并发性进行了一些改进,但是我们可以通过实现Vert.x EventBus来进行一些改进。 通过将数据库操作分为Worker Verticles而不是使用blockingHandler,我们可以更有效地处理请求处理。 这显示在“ 转换为工作人员垂直”分支中。 应用程序类保持不变,但是我们更改了CustomerEndpoints类,并添加了一个名为CustomerWorker的新类。 此外,我们添加了一个名为Spring Vert.x Extension的新库,该库为Vert.x Verticles提供了Spring Dependency Injections支持。 首先查看新的CustomerEndpoints类。

@PostConstructpublic void start() throws Exception {log.info("Successfully create CustomerVerticle");DeploymentOptions deployOpts = new DeploymentOptions().setWorker(true).setMultiThreaded(true).setInstances(4);vertx.deployVerticle("java-spring:com.zanclus.verticles.CustomerWorker", deployOpts, res -> {if (res.succeeded()) {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());final DeliveryOptions opts = new DeliveryOptions().setSendTimeout(2000);router.get("/v1/customer/:id").produces("application/json").handler(rc -> {opts.addHeader("method", "getCustomer").addHeader("id", rc.request().getParam("id"));vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));});router.put("/v1/customer").consumes("application/json").produces("application/json").handler(rc -> {opts.addHeader("method", "addCustomer");vertx.eventBus().send("com.zanclus.customer", rc.getBodyAsJson(), opts, reply -> handleReply(reply, rc));});router.get("/v1/customer").produces("application/json").handler(rc -> {opts.addHeader("method", "getAllCustomers");vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));});vertx.createHttpServer().requestHandler(router::accept).listen(8080);} else {log.error("Failed to deploy worker verticles.", res.cause());}});}

路由相同,但实现代码不同。 现在,我们不再使用对blockingHandler的调用,而是实现了适当的异步处理程序,该处理程序在事件总线上发送事件。 此Verticle中不再进行任何数据库处理。 我们已将数据库处理移至一个工作线程,该线程具有多个实例,以线程安全的方式并行处理多个请求。 我们还为这些事件的回复时间注册了一个回调,以便我们可以向发出请求的客户端发送适当的响应。 现在,在CustomerWorker Verticle中,我们已经实现了数据库逻辑和错误处理。

@Override
public void start() throws Exception {vertx.eventBus().consumer("com.zanclus.customer").handler(this::handleDatabaseRequest);
}public void handleDatabaseRequest(Message<Object> msg) {String method = msg.headers().get("method");DeliveryOptions opts = new DeliveryOptions();try {String retVal;switch (method) {case "getAllCustomers":retVal = mapper.writeValueAsString(dao.findAll());msg.reply(retVal, opts);break;case "getCustomer":Long id = Long.parseLong(msg.headers().get("id"));retVal = mapper.writeValueAsString(dao.findOne(id));msg.reply(retVal);break;case "addCustomer":retVal = mapper.writeValueAsString(dao.save(mapper.readValue(((JsonObject)msg.body()).encode(), Customer.class)));msg.reply(retVal);break;default:log.error("Invalid method '" + method + "'");opts.addHeader("error", "Invalid method '" + method + "'");msg.fail(1, "Invalid method");}} catch (IOException | NullPointerException e) {log.error("Problem parsing JSON data.", e);msg.fail(2, e.getLocalizedMessage());}
}

CustomerWorker工人垂直服务器在事件总线上注册消费者以获取消息。 代表事件总线上地址的字符串是任意的,但建议使用反向tld样式的命名结构,以确保地址唯一(com.zanclus.customer)很简单。 每当有新消息发送到该地址时,它将被发送到一个,只有一个工作层。 然后,工作层将调用handleDatabaseRequest来完成数据库工作,JSON序列化和错误处理。

你有它。 您已经看到,可以将Vert.x集成到旧版应用程序中,以提高并发性和效率,而不必重写整个应用程序。 我们可以使用现有的Google Guice或JavaEE CDI应用程序完成类似的操作。 当我们在Vert.x中尝试添加响应功能时,所有业务逻辑都可能保持相对不变。 下一步由您决定。 接下来的一些想法包括Clustering , WebSockets和ReactiveX sugar的VertxRx 。

翻译自: https://www.javacodegeeks.com/2015/12/reactive-development-using-vert-x.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/356150.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用xtrabackup(innobackupex)实现MySQL的热备

mysql 的热备http://www.178linux.com/10139http://www.linuxidc.com/Linux/2014-04/99671.htmhttp://634871.blog.51cto.com/624871/1351049http://www.cnblogs.com/galengao/p/5755835.htmlhttp://heylinux.com/archives/3777.html 遇到问题&#xff0c;解决链接 http://blog…

mysql查看执行计划_MySql中如何使用 explain 查询 SQL 的执行计划

explain命令是查看查询优化器如何决定执行查询的主要方法。这个功能有局限性&#xff0c;并不总会说出真相&#xff0c;但它的输出是可以获取的最好信息&#xff0c;值得花时间去了解&#xff0c;因为可以学习到查询是如何执行的。1、什么是MySQL执行计划要对执行计划有个比较好…

vim的tab键设定

多在windows上编程的童鞋可能习惯于感受tab键为4个空格的长度&#xff0c;不过在linux系统中一般默认设定tab键为8个空格长度来显示。事实上tab也确实是8个空格的长度。不过由于习惯问题&#xff0c;某些童鞋还是希望在linux上也感受tab键为4个空格长度的显示&#xff0c;因为v…

依赖注入

依赖注入指的就是类A依赖于类B&#xff0c;通过外部注入的方式来实现&#xff0c;而不是通过自身去实现。 这样做的好处就是可以使得代码之间更加的解耦。 举个例子&#xff0c;船&#xff08;ship&#xff09;包含有船桨&#xff08;oar&#xff09;&#xff0c;以下为高耦合的…

idea内置junit5_JUnit的内置Hamcrest Core Matcher支持

idea内置junit5在通过JUnit和Hamcrest改善对assertEquals的文章中&#xff0c;我简要讨论了Hamcrest “ 核心 ”匹配器与JUnit的现代版本“结合”在一起的情况。 在那篇文章中&#xff0c;我特别关注了JUnit的assertThat&#xff08;T&#xff0c;Matcher&#xff09;静态方法与…

jenkins 发送邮件模板

jenkins 发送邮件模板 <!DOCTYPE html> <html> <head> <meta charset"UTF-8"> <title>${ENV, var"JOB_NAME"}-第${BUILD_NUMBER}次构建日志</title> </head><body leftmargin"8" marginwidth"…

centos lnmp源码安装mysql_CentOS 6.6 下源码编译安装MySQL 5.7.5

说明&#xff1a;CentOS 6.6 下源码编译安装MySQL 5.7.51. 安装相关工具# yum -y install gcc-c ncurses-devel cmake make perl \gcc autoconf automake zlib libxml libgcrypt libtool bison2. 清理环境检查boost版本&#xff1a;# rpm -qa boost*卸载boost-*等库&#xff1a…

Oracle Spring Clean JavaFX应该吗?

我们确实在Codename One上依赖JavaFX&#xff0c;我们的模拟器需要它。 我们的桌面版本使用它&#xff0c;而我们的设计器工具基于Swing。 我们希望它成功&#xff0c;这对我们的业务至关重要&#xff01; 即使您是Java EE开发人员并且不关心桌面编程&#xff0c;我们也不是一个…

laravel mysql 锁表_Laravel中MySQL的乐观锁与悲观锁

MySQL/InnoDB的加锁&#xff0c;是一个老生常谈的话题。在数据库高并发请求下&#xff0c;如何兼顾数据完整性与用户体验的敏捷性是一代又一代程序员一直在思考的问题。乐观锁乐观锁之所以叫乐观&#xff0c;是因为这个模式不会对数据加锁。而是对数据操作保持一种乐观的心态&a…

python初心记录二

切片 L [1,2,3,4,5,6,7,8,9,0] L[0:3] L[-3] 迭代 如果给定一个list或tuple&#xff0c;我们可以通过for循环来遍历这个list或tuple&#xff0c;这种遍历我们称为迭代&#xff08;Iteration&#xff09;。 因为dict的存储不是按照list的方式顺序排列&#xff0c;所以&#xff0…

Spring对事物的实现

Spring对待事物的实现有一个标签可以使用Transactional标签来实现事务的管理&#xff0c;但是在使用的时候很多人不清楚实现的原理而使用了错误的使用方式&#xff0c;导致日志里的确是打印了&#xff0c;但是方法的确没有回滚。 我在遇到问题的时候发生了这样的问题&#xff0…

mysql 超长记录_谁记录了mysql error log中的超长信息(记pt-stalk一个bug的定位过程)...

【问题】最近查看MySQL的error log文件时&#xff0c;发现有很多服务器的文件中有大量的如下日志&#xff0c;内容很长(大小在200K左右)&#xff0c;从记录的内容看&#xff0c;并没有明显的异常信息。有一台测试服务器也有类似的问题&#xff0c;为什么会记录这些信息&#xf…

glassfish发布应用_Arquillian 1.0.0.Final正式发布! 准备使用GlassFish和WebLogic! 所有虫子死亡!...

glassfish发布应用红帽公司和JBoss社区今天宣布的1.0.0.Final发布的Arquillian &#xff0c;其屡获殊荣的建在Java虚拟机&#xff08;JVM&#xff09;运行测试平台。 Arquillian大大减少了编写和执行Java中间件集成和功能测试所需的工作。 它甚至使测试工程师能够解决以前认为无…

php中 ob_start()有什么作用

<?php ob_start(); //开启缓冲区 echo "这是第一次输出内容!\n"; $ff[1] ob_get_contents() ; //获取当前缓冲区内容 ob_flush();//缓冲器清除 echo "这是第二次输出内容!\n"; $ff[2] ob_get_contents() ; //获取当前缓冲区内容 echo "这是第三…

mysql mycat one_Mycat 整合 MySQL 8.x 踩坑实践

Mycat 目前还未全面支持MySQL 8以上的版本&#xff0c;可能会导致一些问题&#xff0c;例如Mycat连接MySQL 8时可能会报密码错误&#xff0c;因为新版的密码加密方式与旧版不一样。还有就是时区问题&#xff0c;新版的连接方式需要增加时区参数。除此之外&#xff0c;可能还会有…

使用ADF列表视图的主从数据

最近&#xff0c;从UI角度来看&#xff0c;ADF Faces 表组件不再被认为很酷。 对于显示数据集合&#xff0c; 列表视图今天应该很酷。 这并不意味着我们根本不应该使用af&#xff1a;table 。 在某些情况下&#xff08;经常是:)&#xff09;&#xff0c;表比列表视图更适合。 但…

mysql memory=off_MySQL内存调优

原文链接: MySQL Memory Allocation -- by Rick James原文日期: Created 2010; Refreshed Oct, 2012, Jan, 2014翻译人员: 铁锚翻译日期: 2014年5月28日MySQL 内存分配—— 高速设置方案假设仅使用MyISAM存储引擎,设置 key_buffer_size为可用内存的20%,(再加上设置 innodb_buff…

Seajs的用法

以前经常听到Seajs&#xff0c;但是没深入了解过&#xff0c;不清楚到底是用做哪个方面&#xff0c;后来调组到M站做开发&#xff0c;发现项目用到了Seajs&#xff0c;便去了解下 SeaJS是一个遵循CMD规范的JavaScript模块加载框架&#xff0c;可以实现JavaScript的模块化开发及…

java 调用私有方法_公开调用私有Java方法?

java 调用私有方法我们是Java开发人员&#xff0c;在Java中已知4种访问修饰符&#xff1a;私有&#xff0c;受保护&#xff0c;公共和包。 好吧&#xff0c;除了私有以外&#xff0c;最后三个可以通过继承&#xff0c;相同的包或实例从类外部调用。 现在&#xff0c;常见的问题…

mysql慢查询开启语句分析_linux下开启mysql慢查询,分析查询语句

mysql> show variables like "%long%"; //查看一下默认为慢查询的时间10秒----------------------------| Variable_name | Value |----------------------------| long_query_time | 10.000000 |----------------------------1 row in set (0.00 sec)mysql> s…