使用Spring Reactor Core进行分散收集

我在使用Netflix Rx-Java库方面有良好的工作经验,并且以前曾写过关于使用Rx-Java和Java 8 CompletableFuture解决分散式问题的博客。 在这里,我想探索使用Spring Reactor Core库应用相同的模式。

tldr –如果您熟悉Netflix Rx-Java,您已经很熟悉Spring Reactor Core,API的地图,并且我很高兴看到Spring Reactor团队在Javadoc API中勤奋地使用了Marble图。

另一个快速点是, rx.Observable根据是否要发射许多项目或是否要发射一个项目来映射到Flux或Mono 。

有了这个,我可以直接进入示例–我执行了一个简单的任务(使用延迟模拟),该任务被生成了几次,我需要同时执行这些任务,然后收集结果,使用rx表示如下。可观察的代码:

@Test
public void testScatterGather() throws Exception {ExecutorService executors = Executors.newFixedThreadPool(5);List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());Observable<List<String>> merged = Observable.merge(obs).toList();List<String> result = merged.toBlocking().first();logger.info(result.toString());}private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);s.onNext( i + "-test");s.onCompleted();}).subscribeOn(Schedulers.from(executorService));
}

请注意,我纯粹是出于测试目的。 现在,使用Spring Reactor Core的类似代码可以转换为以下代码:

@Test
public void testScatterGather() {ExecutorService executors = Executors.newFixedThreadPool(5);List<Flux<String>> fluxList = IntStream.range(0, 10).boxed().map(i -> generateTask(executors, i)).collect(Collectors.toList());Mono<List<String>> merged = Flux.merge(fluxList).toList();List<String> list = merged.get();logger.info(list.toString());}public Flux<String> generateTask(ExecutorService executorService, int i) {return Flux.<String>create(s -> {Util.delay(2000);s.onNext(i + "-test");s.onComplete();}).subscribeOn(executorService);
}

它或多或少地一对一映射。 Mono类型的区别很小,我个人认为该类型是反应式库的不错的介绍,因为它可以很清楚地表明是否发出了多个项目,而只发出了一个。样品。 这些对我来说仍然是早期的探索,我希望对这个优秀的图书馆更加熟悉。

翻译自: https://www.javacodegeeks.com/2016/04/scatter-gather-using-spring-reactor-core.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/352795.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

添加操作审计记录

1.所有操作审计记录 在环境变量/etc/profile中加入如下字段&#xff0c;可记录所有用户登录系统的操作 #history bash USERwhoami USER_IPwho -u am i 2>/dev/null| awk {print $NF}|sed -e s/[()]//g if [ "$USER_IP" "" ]; then USER_IPhostname fi …

android sharesdk分享功能,Android ShareSDK快速实现分享功能

第一步 &#xff1a;获取ShareSDK为了集成ShareSDK&#xff0c;您首先需要到ShareSDK官方网站注册并且创建应用&#xff0c;获得ShareSDK的Appkey&#xff0c;然后到SDK的下载页面下载SDK的压缩包&#xff0c;解压以后可以得到如下图的目录结构&#xff1a;ShareSDK在“ShareSD…

shell编程-分支语句

目标&#xff1a;完成这一章&#xff0c;你将能够作以下事情&#xff1a;描述条件分支语句中返回值的作用。 使用test命令来分析一个命令的返回值。 在shell程序中使用if和case结构。 1.返回值shell变量“&#xff1f;”中保存上一个被执行命令的返回值&#xff1a;0&#xff1…

android自定义表盘部件,Android自定义view仿支付宝芝麻信用表盘

演示效果实现步骤&#xff1a;1.画不同宽度和半径的内外圆弧2.通过循环旋转canvas&#xff0c;在固定位置绘制短线刻度&#xff0c;长线刻度&#xff0c;刻度文字3.绘制view中心几个文本&#xff0c;并调整位置4.实时更新当前旋转角度刷新小圆点位置&#xff1b;5.判断分数应该…

记录的详细操作

拷贝表拷贝结构 与数据create table copy_table select *from customer ;仅拷贝结构create table copy_table select *from customer where 0 > 1;共同点&#xff1a; 索引 描述&#xff08;自增&#xff09; 不能以下语法中记录的详细操作[] 表示可选的{}表示必选的增ins…

linux查看文件有多少行

使用wc命令 具体通过wc --help 可以查看。 如&#xff1a;wc -l filename 就是查看文件里有多少行 wc -w filename 看文件里有多少个word。 wc -L filename 文件里最长的那一行是多少个字。 wc命令 wc命令的功能为统计指定文件中的字节数、字数、行数, 并将统计结果显示输出。 …

java 消息通知_用Java弹出创建新的消息通知

java 消息通知首先创建JFrame作为弹出窗口。 在其中添加一些JLabel以包含信息&#xff0c;并在适当的位置分配它们&#xff0c;使其看起来像一条通知消息。 下面给出了示例代码&#xff1a; String message You got a new notification message. Isnt it awesome to have suc…

vs android 压缩,Android Studio是否压缩classes.dex文件?

看起来输出文件夹中apk文件的classes.dex与已安装的应用程序不同.我正在使用classes.dex文件来解决一些安全问题,所以通常我解压缩最终的apk文件并从classes.dex文件中获取信息.但是当我在运行时读取classes.dex文件时文件大小是如此不同. (8MB vs 46KB)应用程序本身工作得很好…

自己写的py文件中调用django models

import os os.environ[DJANGO_SETTINGS_MODULE] 项目名.settingsimport djangodjango.setup()from blog import modelsentry models.Entry.objects.get(pk1)tech_blog models.Blog.objects.get(name科技)print(entry, tech_blog) 转载于:https://www.cnblogs.com/dangrui072…

shell脚本 -d 是目录文件,那么-e,-f分别是什么?还有! -e这又是什么意思呢?

shell脚本 -d 是目录文件&#xff0c;那么-e&#xff0c;-f分别是什么?还有"&#xff01; -e"这又是什么意思呢&#xff1f; -e filename 如果 filename存在&#xff0c;则为真 -d filename 如果 filename为目录&#xff0c;则为真 -f filename 如果 filename为常规…

将Java应用程序作为Windows服务安装

这听起来像是您不需要的东西&#xff0c;但是有时候&#xff0c;当您分发最终用户软件时&#xff0c;可能需要将Java程序安装为Windows服务。 我之所以必须这样做&#xff0c;是因为我开发了一种用于公务员的工具 &#xff0c;可以自动将其Excel文件转换并将其推入我国的openda…

怎样实现banner自动播放html,纯CSS3实现banner图片自动轮播效果方式总结

自动轮播&#xff1a;实现切换图片&#xff0c;图片循环播放&#xff1b;鼠标悬停某张图片&#xff0c; 则暂停切换。css方法一、opacity控制透明度实现轮播效果依照需求咱们选择用CSS3的animation动画进行实现&#xff1b;transition动画须要触发才能启动&#xff0c;html因此…

你好a+b(非入门)

题目传送门&#xff1a;https://www.nowcoder.com/acm/contest/165/A来源&#xff1a;牛客网 牛牛刚学习了输入输出&#xff0c;他遇到了一道这样的题目。 输入2个整数a和b保证输入的a和b在long long范围之内&#xff0c;即满足-9223372036854775808 < a, b < 9223372036…

/etc/sysconfig/i18n文件详解

编辑/etc/sysconfig/i18n这个文件&#xff0c; 不管你装的是中文版,还是英文版.删掉原来的设置,把下面的拷贝过去 LANG"zh_CN.GB18030" SUPPORTED"zh_CN.GB18030:zh_CN:zh:en_US.UTF-8:en_US:en" SYSFONT"latarcyrheb-sun16" 保存,重起.OK了 这时…

201771010112罗松《面向对象程序设计(java)》第三周学习总结

实验三 Java基本程序设计 201771010112 罗松 1、实验目的与要求 &#xff08;1&#xff09;进一步掌握Eclipse集成开发环境下java程序开发基本步骤&#xff1b; &#xff08;2&#xff09;熟悉PTA平台线上测试环境&#xff1b; &#xff08;3&#xff09;掌握Java语言构造基本…

构建openjdk镜像_在Windows上构建OpenJDK

构建openjdk镜像通过做一些实验&#xff0c;我发现手头提供JDK源代码来进行一些更改&#xff0c;使用它等等通常很有用。因此&#xff0c;我决定下载并编译该野兽。 显然&#xff0c;这花了我一些时间&#xff0c;尽管我最初的想法是&#xff0c;它应该和运行make命令一样简单:…

html图像特征提取,图像识别之图像特征提取

图像识别之图像特征提取HOG特征&#xff1a;方向梯度直方图(Histogram of Oriented Gradient, HOG)特征是一种在计算机视觉和图像处置中用来停止物体检测的特征描绘子。它经过计算和统计图像部分区域的梯度方向直方图来构成特征。Hog特征分离SVM分类器曾经被普遍应用于图像辨认…

190. 颠倒二进制位

题目 代码 class Solution { public:uint32_t reverseBits(uint32_t n) {n(n>>16)|(n<<16);n((n&0xff00ff00)>>8)|((n&0x00ff00ff)<<8);n((n&0xf0f0f0f0)>>4)|((n&0x0f0f0f0f)<<4);n((n&0xcccccccc)>>2)|((n&am…

Linux--date命令 date命令

Linux--date命令 date命令 date命令的功能是显示和设置系统日期和时间。 该命令的一般格式为&#xff1a; date [选项] 显示时间格式&#xff08;以开头&#xff0c;后面接格式&#xff09; date 设置时间格式 命令中各选项的含义分别为&#xff1a; -d datestr, --date datest…

为某人命名以重新连接到您的服务器

在进行测试自动化时&#xff0c;通常需要知道当前计算机的名称&#xff0c;以提示另一台计算机连接到它&#xff0c;特别是在并行运行测试的情况下。 本周&#xff0c;我试图对服务器进行测试&#xff0c;以使其连接回在从属测试计算机上运行的WireMock服务器。 堆栈溢出的标准…