Spring,Reactor和ElasticSearch:从回调到反应流

Spring 5(以及Boot 2,在数周之内到货)是一次革命。 不是“ XML上的注释 ”或“ Java上的注释类 ”的革命。 这是一个真正的革命性框架,可以编写全新的应用程序类别。 近年来,我对此框架感到有些恐惧。 “ Spring Cloud是简化了Spring Boot使用的框架,Spring简化了Spring使用的框架,是简化了企业开发的框架。” start.spring.io (也称为“ 开始…点弹簧…点I…O ”)列出了120个可以添加到服务中的不同模块(!)。 这些天的春天变成了一个庞大的伞式项目,我可以想象为什么有些人(仍然!)偏爱Java EE(或这些天叫什么)。

但是Spring 5带来了革命性的革命。 它不再只是阻止servlet API和各种Web框架的包装器。 在Project Reactor之上的Spring 5允许编写高性能,超快速和可伸缩的服务器,完全避免了servlet堆栈。 该死的,CLASSPATH上没有Jetty甚至servlet API! 在Spring 5 Web-flux的核心,我们将找到Netty ,这是一个用于编写异步客户端和服务器的低级框架。 最终,Spring成为反应框架家族的一等公民。 Java开发人员可以实现快速服务,而不必离开自己的舒适区,也可以使用https://doc.akka.io/docs/akka-http/current/或https://www.playframework.com/ 。 Spring 5是用于构建高度可扩展且具有弹性的应用程序的完全被动的现代工具。 尽管如此,诸如控制器,Bean,依赖注入之类的基本原理都是相同的。 而且,升级路径很顺利,我们可以逐步添加功能,而不是学习全新的外来框架。 足够多的谈话,让我们写一些代码。

在本文中,我们将编写一个简单的无头应用程序,该应用程序可以在ElasticSearch中大量索引文档。 我们的目标是即使服务器速度变慢,也只需要几个线程即可实现数千个并发连接。 但是,与Spring Data MongoDB不同, Spring Data ElasticSearch本身不支持非阻塞存储库。 好吧,后者似乎不再维护了,当前版本已经3年了。 许多文章定位Spring 5 +的MongoDB其仓库返回无阻塞流( FluxFlowable的RxJava)。 这一点会更高级。

ElasticSearch 6 Java API使用RESTful接口,并使用非阻塞HTTP客户端实现。 不幸的是,它使用回调而不是像CompletableFuture这样的理智的东西。 因此,让我们自己构建客户端适配器。

使用Fluxes和Monos的ElasticSearch客户端

本文的源代码可在react reactive-elastic-search分支上的github.com/nurkiewicz/elastic-flux上找到。

我们想通过返回FluxMono来构建一个支持Project Reactor的ElasticSearch Java客户端。 当然,如果基础流是完全异步的并且不消耗线程,则将获得最大的好处。 幸运的是,Java API就是这样。 首先,让我们将ElasticSearch的客户端设置为Spring Bean:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;@Bean
RestHighLevelClient restHighLevelClient() {return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200)).setRequestConfigCallback(config -> config.setConnectTimeout(5_000).setConnectionRequestTimeout(5_000).setSocketTimeout(5_000)).setMaxRetryTimeoutMillis(5_000));
}

在现实生活中,我们显然会参数化大多数这些东西。 我们将为简单的JSON文档建立索引,目前它们的内容并不重要:

@Value
class Doc {private final String username;private final String json;
}

我们将编写的代码包装RestHighLevelClient并通过返回Mono<IndexResponse>使它更高级Mono非常类似于CompletableFuture但有两个例外:

  • 这很懒–只要您不订阅,就不会开始计算
  • CompletableFuture不同, Mono可以正常完成而不会发出任何值

第二个区别总是对我有些误导。 在RxJava 2.x中,有两种不同的类型: Single (总是带有值或错误来完成)和Maybe (类似于Mono )。 太糟糕的Reactor并没有做到这一点。 没关系,适配器层是什么样的? 普通的Elastic API如下所示:

client.indexAsync(indexRequest, new ActionListener() {@Overridepublic void onResponse(IndexResponse indexResponse) {//got response}@Overridepublic void onFailure(Exception e) {//got error}
});

您可以看到前进的方向: callback hell 。 与其将自定义ActionListener公开为该逻辑的参数,不如将其包装在Mono

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;private Mono<IndexResponse> indexDoc(Doc doc) {return Mono.create(sink -> {IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());indexRequest.source(doc.getJson(), XContentType.JSON);client.indexAsync(indexRequest, new ActionListener<IndexResponse>() {@Overridepublic void onResponse(IndexResponse indexResponse) {sink.success(indexResponse);}@Overridepublic void onFailure(Exception e) {sink.error(e);}});});
}

我们必须创建IndexRequest包装JSON文档,并通过RESTful API发送它。 但这不是重点。 我们正在使用Mono.create()方法,它有一些缺点,但稍后会介绍更多。 Mono是懒惰的,因此仅调用indexDoc()还不够,没有对ElasticSearch发出HTTP请求。 但是,每次有人订阅此单元素源时,都会执行create()内部的逻辑。 关键行是sink.success()sink.error() 。 它们将结果从ElasticSearch(来自后台异步线程)传播到流中。 在实践中如何使用这种方法? 非常简单!

Doc doc = //...
indexDoc(doc).subscribe(indexResponse -> log.info("Got response"));

当然,反应流处理的真正能力来自于组合多个流。 但是我们迈出了第一步:将基于回调的异步API转换为通用流。 如果您不愿意使用MongoDB,它会在存储库中内置支持诸如MonoFlux类的反应类型。 Cassandra和Redis也是如此 。 在下一篇文章中,我们将学习如何生成一些虚假数据并对其进行索引。

翻译自: https://www.javacodegeeks.com/2018/01/spring-reactor-elasticsearch-callbacks-reactive-streams.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/348535.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通信工程施工设计纲领文件

通信工程设计基本概念 通信系统的重要组成部分&#xff1a;交换机、传输设备、基站控制器、无线基站 通信建设工程设计的任务&#xff1a;将上述器材有机结合&#xff0c;让他们形成预期的高效的通信系统&#xff0c;在经济社会建设中发挥最大的作用 名称作用通信建设工程设计…

删除文件夹下所有的文件_VB删除文件和文件夹的方法

在VB编程中&#xff0c;我们如何删除一个指定的文件&#xff0c;或者文件夹呢&#xff1f;本文&#xff0c;介绍两种方法&#xff0c;详细的介绍如何实现删除文件和文件夹&#xff0c;并对两种方法的区别做一个说明。一、删除文件的方法方法1&#xff1a;使用kill方法删除文件语…

vue 无法进入response拦截器_vue拦截器的一次实践

起因最近在做一个项目前端框架使用的是vue&#xff0c;项目接近尾声的时候发现需要增加一个对所有的http请求过滤的功能&#xff0c;所有的请求需要加上token再发送给服务器&#xff0c;服务器根据token判断用户身份是否有效&#xff0c;响应也需要过滤&#xff0c;判断是否授权…

无线网络设计基础

移动电波的传播特点 受到地形影响因素大存在严重的多径衰落迹象存在固定通信中没有的阴影衰落存在相对运动引起的多普勒效应存在由时延散布引起的信号波形展宽 无线收发信设备、天馈线系统、无线电信道组成无线通信系统 自由空间传输损耗 理想条件下&#xff1a; Lbs32.4520…

lambdas for_Lambdas:来到您附近的Java 8!

lambdas for什么是Lambda&#xff1f; Lambda表达式是一种匿名函数&#xff0c;可以在方法中内联编写&#xff0c;并且可以在使用表达式的任何地方使用。 有时您可能会发现它们被称为闭包&#xff0c;尽管我在下面解释了对该参考的一些注意事项。 像普通的Java方法一样&#xf…

python反转一个整数、123变成321_python整数反转算法

题目描述&#xff1a;给出一个 32 位的有符号整数&#xff0c;你需要将这个整数中每位上的数字进行反转。示例 1:输入: 123输出: 321示例 2:输入: -123输出: -321示例 3:输入: 120输出: 21注意:假设我们的环境只能存储得下 32 位的有符号整数&#xff0c;则其数值范围为 [−231…

GSM网络与CDMA网络话务量、基站容量相关计算

GSM网络与CDMA网络的区别 网络类型GSMCDMA编码方式频分多址和时分多址相结合方式码分多址覆盖面积较小较大容量小大话音质量较低支持软切换&#xff0c;和较软切换&#xff0c;使得用户在基站边缘通话时信号更加稳定 GSM规范中推荐使用的频道配置规范 GSM的频道配置 每个小区…

python开源流程图软件_Dia

软件简介Dia是开放源码的流程图软件&#xff0c;是GNU计划的一部分&#xff0c;程式创立者 是Alexander Larsson。Dia使用single documentinterface (CSDI)模式&#xff0c;类似于GIMP。Dia将多种需求以模组化来设计&#xff0c;如流程图、网络图、电路图等。各模组之间的符号仍…

Spring,Reactor和ElasticSearch:使用伪造的测试数据进行标记

在上一篇文章中&#xff0c;我们创建了一个从ElasticSearch的API到Reactor的Mono的简单适配器&#xff0c;如下所示&#xff1a; import reactor.core.publisher.Mono;private Mono indexDoc(Doc doc) {//... }现在&#xff0c;我们希望以受控的并发级别运行此方法数百万次。 …

通信工程施工图案例分析

分析入口 给你一个城市&#xff0c;要如何开始分析&#xff0c;如下图&#xff1a; 分析一下城市的情况&#xff1a; 主要有&#xff1a; 行政区域地理位置&#xff0c;周围的地形经济发展情况X事发展的情况交通发展的情况其他&#xff1a;还包括农林、旅游、气象、地质、水…

参数整定临界比例度实验_PID理解起来很难?系统讲解PID控制及参数调节,理论加实际才好!...

在实际工程中&#xff0c;应用最为广泛的调节器控制规律为比例、积分、微分控制&#xff0c;简称PID控制&#xff0c;又称PID调节。PID控制器问世至今以其结构简单、稳定性好、工作可靠、调整方便而成为工业控制的主要技术之一。PID调节控制是一个传统控制方法&#xff0c;它适…

工程计价里面的表以及相关税额的计算

序号表名表一工程预算总表表二建筑安装工程费用预算表表三&#xff08;甲&#xff09;建筑安装工程量算表表三&#xff08;乙&#xff09;建筑安装工程施工机械使用费算表表三&#xff08;丙&#xff09;建筑安装工程仪器仪表使用费预算表表四&#xff08;甲&#xff09;国内器…

apache jmeter_Apache Server和JMeter调试

apache jmeter我一直在使用JMeter为生产服务器生成负载以测试我的应用程序。 该测试计划具有13个以上的HTTP采样器以发出不同的请求&#xff0c;并具有一个正则表达式提取器以从响应中提取一些值。 此值在连续的HTTP Sampler中使用。 这个测试用例简单而直接。 最初&#xff0c…

CDMA系统的三种码

CDMA系统中用到的三种码&#xff1a;PN码、Wash码、长码 种类作用PN码伪随机码&#xff0c;主要是把发送的序列转化为伪随机序列Wash码消除或者抑制多址干扰&#xff0c;如果多址信号是正交的&#xff0c;那么多址干扰可以减小到0&#xff1b;区分前向信道长码区分用户的业务信…

4计算准确率_孩子计算总出错?4个好方法帮助低年级学生提高计算准确率!

低年级孩子&#xff0c;由于活泼好动&#xff0c;注意力不容易集中&#xff0c;思维容易被分散。表现在学业上&#xff0c;就会出现学习水平参差不齐的情况。而最主要的表现&#xff0c;就是计算能力的差异。据资深数学老师观察&#xff1a;成绩好的孩子&#xff0c;一般不只掌…

使用Speedion 3.0.17或更高版本轻松从事务中返回值

交易次数 在我以前的文章中&#xff0c;我写了关于如何使用Speedment轻松使用事务的方法&#xff0c;其中我们原子地更新了两个银行帐户。 众所周知&#xff0c;事务是一种将多个数据库操作组合到一个原子执行的单个操作中的方法。 但是事务不仅与更新数据库有关&#xff0c;而…

无线业务需求的线路设计以及拓扑图实现

MSC →\rightarrow→BSC →\rightarrow→ BTS ADM-1 →\rightarrow→ ADM-14 →\rightarrow→ ADM-16 边缘层 →\rightarrow→ 汇聚层 →\rightarrow→ 核心层 设计光缆链路时要注意的问题&#xff1a; 链路要适当长一些&#xff0c;减少中继设备接续造成的损耗尽可能多的使…

双代号网络图节点时间参数_管理和实务都考!快速学会单代号与双代号参数计算...

工程网络计划是二级建造师《建设工程施工管理》科目每年均会进行考核的高频考点&#xff0c;重点在双代号、单代号网络计划的概念及应用。主要题型为通过网络图或文字描述计算相关网络参数或确定关键线路&#xff0c;本篇就双代号、单代号网络计划相关参数的计算&#xff0c;以…

idea中maven执行install报错_IntelliJ IDEA Maven编译install时报错,无效的发行版:1.8

1.首先看java环境是否配置正确JAVA_HOME : C:\Program Files\Java\jdk1.8.0_92 //安装的Jdk路径PATH: %JAVA_HOME%\bin;按下"window"R 输入cmd 按下"ENTER" 输入java -version&#xff0c;如果出现了版本号等信息说明配置成功2.确认maven配…

gc可视化分析_GC内存可视化器教程–第一部分

gc可视化分析正如您从过去的文章中可能已经读到的那样&#xff0c;要获得的Java程序员的一项关键技能就是理解和评估JVM的运行状况的能力&#xff0c;例如Java堆内存占用量以及垃圾回收过程。 为了实现上述目标&#xff0c;所有JVM供应商&#xff08;Oracle&#xff0c;IBM等&…