最详细的CompletableFuture异步编程-进阶篇

1、异步任务的交互

异步任务交互指 将异步任务获取结果的速度相比较,按一定的规则( 先到先用 )进行下一步处理。

1.1 applyToEither

applyToEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步的操作。

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)

演示案例:使用最先完成的异步任务的结果

public class ApplyToEitherDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 开启异步任务1CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {int x = new Random().nextInt(3);CommonUtils.sleepSecond(x);CommonUtils.printThreadLog("任务1耗时:" + x + "秒");return x;});// 开启异步任务2CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {int y = new Random().nextInt(3);CommonUtils.sleepSecond(y);CommonUtils.printThreadLog("任务2耗时:" + y + "秒");return y;});
​// 哪些异步任务的结果先到达,就使用哪个异步任务的结果CompletableFuture<Integer> future = future1.applyToEither(future2, (result -> {CommonUtils.printThreadLog("最先到达的结果:" + result);return result;}));
​// 主线程休眠4秒,等待所有异步任务完成CommonUtils.sleepSecond(4);Integer ret = future.get();CommonUtils.printThreadLog("ret = " + ret);}
}
​

速记心法:任务1、任务2就像两辆公交,哪路公交先到,就乘坐(使用)哪路公交。

以下是applyToEither 和其对应的异步回调版本

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func,Executor executor)

1.2 acceptEither

acceptEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作 ( 消费使用 )。

CompletableFuture<Void> acceptEither(CompletableFuture<T> other, Consumer<T> action)
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action)  
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action,Executor executor)

演示案例:使用最先完成的异步任务的结果

public class AcceptEitherDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 异步任务交互CommonUtils.printThreadLog("main start");// 开启异步任务1CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {int x = new Random().nextInt(3);CommonUtils.sleepSecond(x);CommonUtils.printThreadLog("任务1耗时:" + x + "秒");return x;});
​// 开启异步任务2CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {int y = new Random().nextInt(3);CommonUtils.sleepSecond(y);CommonUtils.printThreadLog("任务2耗时:" + y + "秒");return y;});
​// 哪些异步任务的结果先到达,就使用哪个异步任务的结果future1.acceptEither(future2,result -> {CommonUtils.printThreadLog("最先到达的结果:" + result);});
​// 主线程休眠4秒,等待所有异步任务完成CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}

1.3 runAfterEither

如果不关心最先到达的结果,只想在有一个异步任务先完成时得到完成的通知,可以使用 runAfterEither() ,以下是它的相关方法:

CompletableFuture<Void> runAfterEither(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action, Executor executor)
提示
异步任务交互的三个方法和之前学习的异步的回调方法 thenApply、thenAccept、thenRun 有异曲同工之妙。

2、get() 和 join() 区别

get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。

那么该如何选用呢?请看如下案例:

public class GetOrJoinDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "hello";});
​String ret = null;// 抛出检查时异常,必须处理try {ret = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("ret = " + ret);
​// 抛出运行时异常,可以不处理ret = future.join();System.out.println("ret = " + ret);}
}

使用时,我们发现,get() 抛出检查时异常 ,需要程序必须处理;而join() 方法抛出运行时异常,程序可以不处理。所以,join() 更适合用在流式编程中。

3、ParallelStream VS CompletableFuture

CompletableFuture 虽然提高了任务并行处理的能力,如果它和 Stream API 结合使用,能否进一步多个任务的并行处理能力呢?

同时,对于 Stream API 本身就提供了并行流ParallelStream,它们有什么不同呢?

我们将通过一个耗时的任务来体现它们的不同,更重要地是,我们能进一步加强 CompletableFuture 和 Stream API 的结合使用,同时搞清楚CompletableFuture 在流式操作的优势

需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时

定义一个MyTask类,来模拟耗时的长任务

public class MyTask {private int duration;
​public MyTask(int duration) {this.duration = duration;}
​// 模拟耗时的长任务public int doWork() {CommonUtils.printThreadLog("doWork");CommonUtils.sleepSecond(duration);return duration;}
}

同时,我们创建10个任务,每个持续1秒。

IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);
}).collect(Collectors.toList());

3.1 并行流的局限

我们先使用串行执行,让所有的任务都在主线程 main 中执行。

public class SequenceDemo {public static void main(String[] args) {// 方案一:在主线程中使用串行执行// step 1: 创建10个MyTask对象,每个任务持续1s,存入list集合便于启动Stream操作IntStream intStream = IntStream.range(0, 10);List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);}).collect(Collectors.toList());// step 2: 执行tasks集合中的每个任务,统计总耗时long start = System.currentTimeMillis();List<Integer> result = tasks.stream().map(myTask -> {return myTask.doWork();}).collect(Collectors.toList());long end = System.currentTimeMillis();double costTime = (end - start) / 1000.0;System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);}
}

它花费了10秒, 因为每个任务在主线程一个接一个的执行。

因为涉及 Stream API,而且存在耗时的长任务,所以,我们可以使用 parallelStream()

public class ParallelDemo {public static void main(String[] args) {// 方案二:使用并行流// step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合IntStream intStream = IntStream.range(0, 10);List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);}).collect(Collectors.toList());// step 2: 执行10个MyTask,统计总耗时long start = System.currentTimeMillis();List<Integer> results = tasks.parallelStream().map(myTask -> {return myTask.doWork();}).collect(Collectors.toList());long end = System.currentTimeMillis();double costTime = (end - start) / 1000.0;System.out.printf("processed %d tasks %.2f second",tasks.size(),costTime);}
}

它花费了2秒多,因为此次并行执行使用了8个线程 (7个是ForkJoinPool线程池中的, 一个是 main 线程),需要注意是:运行结果由自己电脑CPU的核数决定。

3.2 CompletableFuture 在流式操作的优势

让我们看看使用CompletableFuture是否执行的更有效率

public class CompletableFutureDemo {public static void main(String[] args) {// 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时// 方案三:使用CompletableFuture// step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合IntStream intStream = IntStream.range(0, 10);List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);}).collect(Collectors.toList());// step 2: 根据MyTask对象构建10个耗时的异步任务long start = System.currentTimeMillis();List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {return CompletableFuture.supplyAsync(() -> {return myTask.doWork();});}).collect(Collectors.toList());// step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中List<Integer> results = futures.stream().map(future -> {return future.join();}).collect(Collectors.toList());long end = System.currentTimeMillis();double costTime = (end - start) / 1000.0;System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);}
}

运行发现,两者使用的时间大致一样。能否进一步优化呢?

CompletableFutures 比 ParallelStream 优点之一是你可以指定Executor去处理任务。你能选择更合适数量的线程。我们可以选择大于Runtime.getRuntime().availableProcessors() 数量的线程,如下所示:

public class CompletableFutureDemo2 {public static void main(String[] args) {// 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时// 方案三:使用CompletableFuture// step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合IntStream intStream = IntStream.range(0, 10);List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);}).collect(Collectors.toList());// 准备线程池final int N_CPU = Runtime.getRuntime().availableProcessors();// 设置线程池的数量最少是10个,最大是16个ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), N_CPU * 2));// step 2: 根据MyTask对象构建10个耗时的异步任务long start = System.currentTimeMillis();List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {return CompletableFuture.supplyAsync(() -> {return myTask.doWork();},executor);}).collect(Collectors.toList());// step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中List<Integer> results = futures.stream().map(future -> {return future.join();}).collect(Collectors.toList());long end = System.currentTimeMillis();double costTime = (end - start) / 1000.0;System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);// 关闭线程池executor.shutdown();}
}

测试代码时,电脑配置是4核8线程,而我们创建的线程池中线程数最少也是10个,所以,每个线程负责一个任务( 耗时1s ),总体来说,处理10个任务总共需要约1秒。

3.3 合理配置线程池中的线程数

正如我们看到的,CompletableFuture 可以更好地控制线程池中线程的数量,而 ParallelStream 不能

问题1:如何选用 CompletableFuture 和 ParallelStream ?

如果你的任务是IO密集型的,你应该使用CompletableFuture;

如果你的任务是CPU密集型的,使用比处理器更多的线程是没有意义的,所以选择ParallelStream ,因为它不需要创建线程池,更容易使用。

问题2:IO密集型任务和CPU密集型任务的区别?

CPU密集型也叫计算密集型,此时,系统运行时大部分的状况是CPU占用率近乎100%,I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU 使用率很高。比如说要计算1+2+3+…+ 10万亿、天文计算、圆周率后几十位等, 都是属于CPU密集型程序。

CPU密集型任务的特点:大量计算,CPU占用率一般都很高,I/O时间很短

IO密集型指大部分的状况是CPU在等I/O (硬盘/内存) 的读写操作,但CPU的使用率不高。

简单的说,就是需要大量的输入输出,例如读写文件、传输文件、网络请求。

IO密集型任务的特点:大量网络请求,文件操作,CPU运算少,很多时候CPU在等待资源才能进一步操作。

问题3:既然要控制线程池中线程的数量,多少合适呢?

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 Ncpu+1

如果是IO密集型任务,参考值可以设置为 2 * Ncpu,其中Ncpu 表示 核心数。

注意的是:以上给的是参考值,详细配置超出本次课程的范围,选不赘述。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/75419.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C/C++操作加密与不加密的zip文件

为了后续的方便操作zip文件&#xff0c; 将所有的操作封装成了一个动态库了。 /*** \description 从压缩包文件中解压出指定的文件到指定的目录.* \author sunsz* \date 2023/09/09**/ LIBZIP_API int UnpackFile(const char* password, char zipfilename[], char filename_…

如何使用ArcGIS去除卫星影像上的云

虽然目前发布的地图都是对云量进行过筛选&#xff08;一般低于20%&#xff09;&#xff0c;但是还是有可能会遇到有云的情况&#xff08;特别是下载历史影像的时候&#xff09;&#xff0c;那么这些云应该怎么去除呢&#xff0c;我们可以尝试使用ArcGIS进行处理。 识别像素 将…

pb:垃圾收集函数

PB系统函数大全 - 垃圾收集函数 垃圾收集函数让应用程序能够控制何时开始收集系统产生的垃圾。 1、GarbageCollect() 功 能:强制系统立即开始收集垃圾。 语 法:GarbageCollect ( ) 返回值:无。 用 法:该函数强制系统立即开始收集垃圾。PowerBuilder将查找并标识未…

Python一行命令搭建HTTP服务器并外网访问 - 内网穿透

文章目录 1.前言2.本地http服务器搭建2.1.Python的安装和设置2.2.Python服务器设置和测试 3.cpolar的安装和注册3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 Python作为热度比较高的编程语言&#xff0c;其语法简单且语句清晰&#xff0c;而且python有…

CC-Proxy配置实验室网络代理服务器

1.安装CCProxy 2.关闭自己电脑防火墙&#xff0c;打开CCProxy软件 3.使用MobaXterm远程工具来连接服务器&#xff0c; 输入&#xff1a; export http_proxy"自己电脑的ip地址:808" export https_proxy"自己电脑的ip地址:808"之后可以输入如下命令来检查…

TCP三次握手和四次挥手

目录 TCP连接建立 问题思考 1.为什么要三次握手&#xff1f; 2.三次握手一定要保证成功吗&#xff1f; TCP连接释放 问题思考 ​ 1.理解TIME-WAIT状态 2.理解CLOSE-WAIT状态 TCP连接建立 TCP建立连接的过程叫作握手&#xff0c;握手需要在客户和服务器之间交换三个TCP…

详解Typescript中的泛型

泛型 在 TypeScript 中&#xff0c;泛型&#xff08;Generics&#xff09;是一种在编写可重用、灵活的代码时使用的工具。它允许我们在定义函数、类或接口时使用类型参数&#xff0c;以便在使用时指定具体的类型。 通过使用泛型&#xff0c;我们可以编写更通用的代码&#xff…

tomcat的优化

TOMCAT的优化 tomcat的优化主要是从三个方面进行的&#xff0c;第一个是 tomcat配置的优化第二是对JVM虚拟机的优化第三是对Linux系统内核的优化&#xff0c;配置文件中的优化主要在tomcat中server.xml文件夹内 tomcat配置文件的优化 1、 maxThreads&#xff1a; Tomcat 使用…

调用微信公众号创建会员卡接口报错48001

调用文档&#xff1a;1.新版会员卡介绍 | 微信开放文档 接口地址&#xff1a; HTTP请求方式: POSTURL:https://api.weixin.qq.com/card/create?access_tokenACCESS_TOKEN 错误描述&#xff1a;48001 {"errcode":48001,"errmsg":"api unauthorized hi…

Fastjson反序列化漏洞

文章目录 一、概念二、Fastjson-历史漏洞三、漏洞原理四、Fastjson特征五、Fastjson1.2.47漏洞复现1.搭建环境2.漏洞验证&#xff08;利用 dnslog&#xff09;3.漏洞利用1)Fastjson反弹shell2)启动HTTP服务器3)启动LDAP服务4)启动shell反弹监听5)Burp发送反弹shell 一、概念 啥…

网站优化搜索引擎与关键词

网站优化搜索引擎与关键词 人们不应该高估搜索引擎的智商。这不利于seo的研究&#xff0c;事实上&#xff0c;搜索引擎是非常愚蠢的&#xff0c;让我们举一个非常简单的例子&#xff0c;你在搜索引擎中输入“教师”这个词&#xff0c;搜索引擎就会给出一个准确的搜索列表。我们…

U3D外包开发框架及特点

U3D&#xff08;Unity3D&#xff09;是一款流行的跨平台游戏开发引擎&#xff0c;用于创建2D和3D游戏以及交互性应用程序。U3D有许多常用的开发框架和库&#xff0c;这些框架和库可以扩展其功能&#xff0c;使开发人员更轻松地构建游戏和应用程序。以下是一些常用的U3D开发框架…

免费的敏捷项目管理,scrum开发软件

Scrum中非常强调公开、透明、直接有效的沟通&#xff0c;这也是“可视化的管理工具”在敏捷开发中如此重要的原因之一。通过“可视化的管理工具”让所有人直观的看到需求&#xff0c;故事&#xff0c;任务之间的流转状态&#xff0c;可以使团队成员更加快速适应敏捷开发流程。 …

3D打印,耗材注意事项

目前我的3D打印机只用到PETG和PLA俩种耗材材料 PLA 喷头温度:200或200热床温度40~50移动速度&#xff0c;最好100%挤出速度和移动速度最好一致 PETG PETG这个材料比PLA感觉上更软一点,商家说PLA比PETG温度更低 喷头温度:220~250℃热床温度最好80℃移动速度&#xff0c;最好5…

【计算机网络】 确认应答机制与超时重传

文章目录 ACK机制——确认应答机制超时重传 ACK机制——确认应答机制 当我们客户端发送了一个数据&#xff0c;seq是1100&#xff0c;那么服务端在收到时就会回一个ack101的ACK包&#xff0c;代表101之前的包我都收到了&#xff0c;下面请你从101继续发送。然后客户端就会发送1…

2.linux的组管理和权限管理

一、组管理 1.Linux组的介绍 在linux中每个用户必须属于一个组&#xff0c;不能独立于组外。在linux中每个文件有所有者&#xff0c;所有组&#xff0c;其他组的概念 ①所有者 ②所在组 ③其他组 ④改变用户所在组 2.文件/目录 所有者 哪个用户创建了文件&#xff0c;就…

【webrtc】时间戳reordered 重新排序、环绕的判断

inter_frame_delay_.CalculateDelay( ) 计算传输抖动值 webrtc源码分析(6)- jitter delay计算详解 大神对这块的使用,内涵外延,有深入细致的讲解。输入rtp时间戳、到达时间(当前系统时间?)-- 在rtp的时间戳的处理上,inter_frame_delay_.CalculateDelay( ) 计算传输抖动值…

《向量数据库指南》——向量数据库的底层原理是什么?

向量数据库的底层实现原理可以根据具体的数据库系统和索引方法而有所不同。不同的向量数据库可能使用不同的数据结构和算法来支持高效的向量存储和相似性搜索。以下是一些常见的底层实现原理和概念: 1、向量存储: 数据结构:向量数据库通常使用数据结构来存储向量数据。这些数…

Java测试(10)--- selenium

1.定位一组元素 &#xff08;1&#xff09;如何打开本地的HTML页面 拼成一个URL &#xff1a;file: /// 文件的绝对路径 import os os.path.abspath(文件的绝对路径&#xff09; &#xff08;2&#xff09;先定位出同一类元素&#xff08;tag name&#xff0c;name&…

DGA行为转变引发了对网络安全的担忧

Akamai的研究人员发现&#xff0c;在域名系统(DNS)流量数据中&#xff0c;动态种子域生成算法(DGA)家族的行为发生了令人担忧的变化。这一发现揭示了恶意行为者如何调整他们的策略来延长他们的指挥与控制(C2)通信通道的寿命&#xff0c;以保护他们的僵尸网络。 从技术角度来看…