fork join框架_Java中的Fork / Join框架的简要概述

fork join框架

Fork / Join框架是使用并发分治法解决问题的框架。 引入它们是为了补充现有的并发API。 在介绍它们之前,现有的ExecutorService实现是运行异步任务的流行选择,但是当任务同质且独立时,它们会发挥最佳作用。 运行依赖的任务并使用这些实现来组合其结果并不容易。 随着Fork / Join框架的引入,人们试图解决这一缺陷。 在本文中,我们将简要介绍API,并解决几个简单的问题以了解其工作原理。

解决非阻塞任务

让我们直接跳入代码。 让我们创建一个任务,该任务将返回List的所有元素的总和。 以下步骤以伪代码表示我们的算法:

01.查找列表的中间索引

02.在中间划分列表

03.递归创建一个新任务,该任务将计算左侧部分的总和

04.递归创建一个新任务,该任务将计算正确部分的总和

05.将左总和,中间元素和右总和的结果相加

这是代码–

@Slf4j
public class ListSummer extends RecursiveTask<Integer> {private final List<Integer> listToSum;ListSummer(List<Integer> listToSum) {this.listToSum = listToSum;}@Overrideprotected Integer compute() {if (listToSum.isEmpty()) {log.info("Found empty list, sum is 0");return 0;}int middleIndex = listToSum.size() / 2;log.info("List {}, middle Index: {}", listToSum, middleIndex);List<Integer> leftSublist = listToSum.subList(0, middleIndex);List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());ListSummer leftSummer = new ListSummer(leftSublist);ListSummer rightSummer = new ListSummer(rightSublist);leftSummer.fork();rightSummer.fork();Integer leftSum = leftSummer.join();Integer rightSum = rightSummer.join();int total = leftSum + listToSum.get(middleIndex) + rightSum;log.info("Left sum is {}, right sum is {}, total is {}", leftSum, rightSum, total);return total;}
}

首先,我们扩展了ForkJoinTask的RecursiveTask子类型。 这是我们期望并发任务返回结果时的扩展类型。 当任务不返回结果而仅执行效果时,我们扩展RecursiveAction子类型。 对于我们解决的大多数实际任务,这两个子类型就足够了。

其次,RecursiveTask和RecursiveAction都定义了一种抽象计算方法。 这是我们进行计算的地方。

第三,在我们的计算方法内部,我们检查通过构造函数传递的列表的大小。 如果为空,则我们已经知道总和的结果为零,然后我们立即返回。 否则,我们将列表分为两个子列表,并创建ListSummer类型的两个实例。 然后,我们在这两个实例上调用fork()方法(在ForkJoinTask中定义)–

leftSummer.fork();
rightSummer.fork();

导致将这些任务安排为异步执行的原因,稍后将在本文中解释用于此目的的确切机制。

之后,我们调用join()方法(也在ForkJoinTask中定义)以等待这两部分的结果

Integer leftSum = leftSummer.join();
Integer rightSum = rightSummer.join();

然后将其与列表的中间元素相加以获得最终结果。

添加了许多日志消息,以使示例更易于理解。 但是,当我们处理包含数千个条目的列表时,进行详细的日志记录(尤其是记录整个列表)可能不是一个好主意。

就是这样。 现在为测试运行创建一个测试类–

public class ListSummerTest {@Testpublic void shouldSumEmptyList() {ListSummer summer = new ListSummer(List.of());ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isZero();}@Testpublic void shouldSumListWithOneElement() {ListSummer summer = new ListSummer(List.of(5));ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isEqualTo(5);}@Testpublic void shouldSumListWithMultipleElements() {ListSummer summer = new ListSummer(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9));ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isEqualTo(45);}
}

在测试中,我们创建一个ForkJoinPool的实例。 ForkJoinPool是用于运行ForkJoinTasks的唯一ExecutorService实现。 它采用一种称为工作窃取算法的特殊算法。 与其他ExecutorService实现相反,在其他实现中,只有一个队列包含要执行的所有任务,在工作窃取实现中,每个工作线程都获得其工作队列。 每个线程都从其队列开始执行任务。

当我们检测到ForkJoinTask可以分解为多个较小的子任务时,便将它们分解为较小的任务,然后在这些任务上调用fork()方法。 该调用导致子任务被推入执行线程的队列中。 在执行过程中,当一个线程用尽队列/没有要执行的任务时,它可以从其他线程的队列中“窃取”任务(因此称为“工作窃取”)。 与使用任何其他ExecutorService实现相比,这种窃取行为可以带来更高的吞吐量。

之前,当我们在leftSummer和rightSummer任务实例上调用fork()时,它们被推入执行线程的工作队列中,之后它们被池中的其他活动线程“偷”(依此类推),因为它们确实那时没有其他事情要做。

很酷吧?

解决阻止任务

我们刚才解决的问题本质上是非阻塞的。 如果我们想解决一个阻塞操作的问题,那么为了获得更好的吞吐量,我们将需要改变策略。

让我们通过另一个示例进行研究。 假设我们要创建一个非常简单的网络搜寻器。 该搜寻器将接收HTTP链接列表,执行GET请求以获取响应主体,然后计算响应长度。 这是代码–

@Slf4j
public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {private final List<String> links;ResponseLengthCalculator(List<String> links) {this.links = links;}@Overrideprotected Map<String, Integer> compute() {if (links.isEmpty()) {log.info("No more links to fetch");return Collections.emptyMap();}int middle = links.size() / 2;log.info("Middle index: {}", links, middle);ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));log.info("Forking left partition");leftPartition.fork();log.info("Left partition forked, now forking right partition");rightPartition.fork();log.info("Right partition forked");String middleLink = links.get(middle);HttpRequester httpRequester = new HttpRequester(middleLink);String response;try {log.info("Calling managedBlock for {}", middleLink);ForkJoinPool.managedBlock(httpRequester);response = httpRequester.response;} catch (InterruptedException ex) {log.error("Error occurred while trying to implement blocking link fetcher", ex);response = "";}Map<String, Integer> responseMap = new HashMap<>(links.size());Map<String, Integer> leftLinks = leftPartition.join();responseMap.putAll(leftLinks);responseMap.put(middleLink, response.length());Map<String, Integer> rightLinks = rightPartition.join();responseMap.putAll(rightLinks);log.info("Left map {}, middle length {}, right map {}", leftLinks, response.length(), rightLinks);return responseMap;}private static class HttpRequester implements ForkJoinPool.ManagedBlocker {private final String link;private String response;private HttpRequester(String link) {this.link = link;}@Overridepublic boolean block() {HttpGet headRequest = new HttpGet(link);CloseableHttpClient client = HttpClientBuilder.create().disableRedirectHandling().build();try {log.info("Executing blocking request for {}", link);CloseableHttpResponse response = client.execute(headRequest);log.info("HTTP request for link {} has been executed", link);this.response = EntityUtils.toString(response.getEntity());} catch (IOException e) {log.error("Error while trying to fetch response from link {}: {}", link, e.getMessage());this.response = "";}return true;}@Overridepublic boolean isReleasable() {return false;}}
}

我们创建ForkJoinPool.ManagedBlocker的实现,在其中放置阻塞的HTTP调用。 该接口定义了两个方法– block()和isReleasable() 。 block()方法是我们进行阻塞调用的地方。 在完成阻塞操作之后,我们返回true,指示不再需要进一步的阻塞。 我们从isReleasable()实现返回false,以向fork-join工作线程指示block()方法实现实际上可能在阻塞。 isReleasable()实现将在调用block()方法之前先由fork-join工作线程调用。 最后,我们通过调用ForkJoinPool.managedBlock()静态方法将HttpRequester实例提交到池中。 之后,我们的阻止任务将开始执行。 当它阻塞HTTP请求时,ForkJoinPool.managedBlock()方法还将安排一个备用线程,以在必要时确保足够的并行性被激活。

接下来,让我们将此实现用于测试驱动! 这是代码–

public class ResponseLengthCalculatorTest {@Testpublic void shouldReturnEmptyMapForEmptyList() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).isEmpty();}@Testpublic void shouldHandle200Ok() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of("http://httpstat.us/200"));ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).hasSize(1).containsKeys("http://httpstat.us/200").containsValue(0);}@Testpublic void shouldFetchResponseForDifferentResponseStatus() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of("http://httpstat.us/200","http://httpstat.us/302","http://httpstat.us/404","http://httpstat.us/502"));ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).hasSize(4);}
}

今天就这样,伙计们! 与往常一样,任何反馈/改进建议/评论都将受到高度赞赏!

此处讨论的所有示例都可以在Github上找到( 特定提交 )。

大呼大叫的http://httpstat.us服务对开发简单的测试很有帮助。

翻译自: https://www.javacodegeeks.com/2019/01/brief-overview-fork-join-framework-java.html

fork join框架

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/332884.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3模型大小_Github推荐一个国内牛人开发的超轻量级通用人脸检测模型

Ultra-Light-Fast-Generic-Face-Detector-1MB1MB轻量级通用人脸检测模型作者表示该模型设计是为了边缘计算设备以及低功耗设备&#xff08;如arm&#xff09;设计的实时超轻量级通用人脸检测模型。它可以用于arm等低功耗计算设备&#xff0c;实现实时的通用场景人脸。检测推理同…

macOS如何使用命令启动服务/停止服务/查看服务

文章目录开启服务停止服务查看服务是否启动开启服务 使用命令开启 sshd 服务&#xff1a; $ sudo launchctl load -w /System/Library/LaunchDaemons/ssh.plist注&#xff1a;成功启动不会有任何输出 停止服务 $ sudo launchctl unload -w /System/Library/LaunchDaemons/s…

如何在用例之间传递值_如何从0搭建自己的自动化测试体系

大家好&#xff0c;我是爱吃面条&#xff0c;今天给大家讲讲如何从0搭建自己的自动化测试体系1. 需求和目标在我开展自动化测试之前&#xff0c;其实该项目以前的测试人员也已经写了很多的接口测试用例&#xff0c;但是大多数用例处于“半瘫痪”状态&#xff0c;在CI上无人维护…

envoy api 网关_为Envoy构建控制平面的指南-特定于域的配置API

envoy api 网关建立您的控制平面交互点和API表面 一旦考虑了哪些组件可能构成控制平面体系结构&#xff08;请参见上一章&#xff09;&#xff0c;您将要确切考虑用户将如何与控制平面进行交互&#xff0c;甚至更重要的是&#xff0c; 用户将是谁&#xff1f; 要回答这个问题&a…

异步非阻塞_细说同步异步、阻塞非阻塞

同步、异步同步、异步分别指的是一种通讯方式&#xff0c;当 cpu 不需要执行线程上下文切换就能完成任务&#xff0c;此时便认为这种通讯方式是同步的&#xff0c;相对的如果存在cpu 上下文切换&#xff0c;这种方式便是异步。这里通过一个去食堂打饭的示例来理解什么是同步、异…

如何将网页部署到maven_如何通过Rultor将Maven工件部署到CloudRepo

如何将网页部署到maven在我以前的文章中 &#xff0c;我描述了如何在Amazon S3中设置私有Maven存储库并通过Rultor进行部署。 如果您熟悉管理Amazon Web Services&#xff08;AWS&#xff09;&#xff0c; S3和AWS Identity and Access Management&#xff08;IAM&#xff09;的…

macOS下如何使用命令启动数据库/停止数据库/重启数据库(MySQL)

文章目录启动数据库手动启动数据库自启动数据库查看数据库服务是否启动重启数据库服务停止数据库服务注意使用 sudo 执行命令会报错&#xff0c;所以不要使用 sudo&#xff1a; sudo mysql.server start . ERROR! The server quit without updating PID file (/usr/local/var/…

table 多行 宽度不一致_layui table 中固定列的行高和table行高不一致

Pipe&lpar;点积叉积的应用POJ1039&rpar;Pipe Time Limit: 1000MS Memory Limit: 10000K Total Submissions: 9723 Accepted: 2964 Description ...cocos2d-x之CCMotionStreak类&mdash&semi;&mdash&semi;2013-08-25 16在游戏的实现过程中,有时会需要…

java rest框架_比较Java REST文档框架

java rest框架决定在记录REST API时选择哪种Java框架可能很麻烦。 在本博文中&#xff0c;我们将简要比较我们自己使用的REST Web服务的三个文档框架&#xff0c;以及它们如何与Spring框架&#xff08;这是Foreach中最常用的Java框架&#xff09;集成。 这些是RESTful API建模语…

数据库的事务学习

文章目录事务四大特征多事务并发存在的问题事务隔离级别查询数据库的隔离级别设置数据库隔离级别1.事务里面的语句出错时并不会主动回滚&#xff0c;需要用户主动执行回滚指令 2.事务开始后&#xff0c;如果没有主动执行回滚或者提交指令&#xff0c;事务始终在执行中 3.事务执…

注解动态赋值_Java注解是如何玩转的,面试官和我聊了半个小时

作者&#xff1a;wind瑞 来自&#xff1a;JavaQ面试官&#xff1a;自定义的Java注解是如何生效的&#xff1f; 小白&#xff1a;自定义注解后&#xff0c;需要定义这个注解的注解解析及处理器&#xff0c;在这个注解解析及处理器的内部&#xff0c;通过反射使用Class、Method、…

阿里云服务器如何创建快照备份数据

文章目录创建快照快照收费你申请了阿里云服务器&#xff0c;其实就是申请了一个云盘&#xff0c;这个云盘安装了操作系统&#xff0c;你可以远程访问这个系统并使用该系统。我们可以给云盘创建快照&#xff0c;从而达到备份服务器数据的目的。创建快照 快照收费 如果您计划不再…

shebang_Java 11:运行单文件程序和“ shebang”脚本

shebang在Java 11中&#xff0c;对java启动器进行了增强&#xff0c;可以直接运行单文件源代码程序&#xff0c;而不必先编译它们。 例如&#xff0c;考虑以下仅添加其参数的类&#xff1a; import java.util.*; public class Add {public static void main(String[] args) {…

iShot快捷键

快捷键说明Option A选择截图区域Option Z截图光标下的窗口Option X截图整个屏幕Option D延时截图Option W开始录屏/结束录屏

日期格式化为yyyymmdd_Excel小技巧——如何将多行日期快速转换为数字文本

Hello~大家好&#xff0c;本来计划这一期给大家介绍一下如何根据身份证号快速获得性别信息的小技巧&#xff0c;但是在上一节内容如何将数字快速转换成日期格式发布后&#xff0c;就有朋友私信问我&#xff0c;可不可以再介绍一下将日期快速转换成数字文本的方法呢&#xff1f;…

顽皮狗 多线程分享_谁去过顽皮,谁去过尼斯? 圣诞老人为您提供Java 11建议!...

顽皮狗 多线程分享有没有想过圣诞老人如何为世界各地的孩子们提供节日礼物&#xff1f; 有20亿个孩子&#xff0c;每个孩子都有自己的愿望清单&#xff0c;他会在24小时内完成。 这意味着平均每个孩子43微秒&#xff0c;他需要检查每个孩子是否顽皮或好。 您无需再怀疑了。 我…

2021新交规超速处罚规定

1.时速超过限定时速不到10%的&#xff0c;给予警告&#xff1b; 2.在限速为50公里以下的道路&#xff0c;时速超过限定时速10%以上不到20%的&#xff08;必须低于时速55公里&#xff09;&#xff0c;处50元罚款&#xff1b;超过限定时速20%以上不到50%的&#xff0c;处100元罚…

处理接口超时_架构设计|异步请求如何同步处理?

本文创意来自一次业务需求&#xff0c;这次需要接入一个第三方外部服务。由于这个服务只提供异步 API&#xff0c;为了不影响现有系统同步处理的方式&#xff0c;接入该外部服务时&#xff0c;应用对外屏蔽这种差异&#xff0c;内部实现异步请求同步。全文摘要&#xff1a;异步…

使用Spring Boot和MongoDB构建一个React式应用程序

“我喜欢编写身份验证和授权代码。” 〜从来没有Java开发人员。 厌倦了一次又一次地建立相同的登录屏幕&#xff1f; 尝试使用Okta API进行托管身份验证&#xff0c;授权和多因素身份验证。 如果您要处理大量流数据&#xff0c;React式应用程序可让您更好地扩展。 它们是非阻塞…

jax-ws和jax-rs_JAX-RS和OpenAPI对Hypermedia API的支持:任重而道远

jax-ws和jax-rs或早或晚&#xff0c;大多数积极使用REST&#xff08;ful&#xff09; Web服务和API的开发人员偶然发现了这种真正的外星事物&#xff0c;即HATEOAS &#xff1a; 超文本作为应用程序状态的引擎 。 对HATEOAS是什么及其与REST的关系的好奇最终将导致发现Richards…