Java的Future机制详解

Java的Future机制详解

  • 一、为什么出现Future机制
  • 二、Future的相关类图
    • 2.1 Future 接口
    • 2.2 FutureTask 类
  • 三、FutureTask的使用方法
  • 四、FutureTask源码分析
    • 4.1 state字段
    • 4.2 其他变量
    • 4.4 构造函数
    • 4.5 run方法及其他

一、为什么出现Future机制

常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。

这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)

在这里插入图片描述

上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。

二、Future的相关类图

2.1 Future 接口

首先,我们需要清楚,Futrue是个接口。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

在这里插入图片描述

接口定义行为,我们通过上图可以看到实现Future接口的子类会具有哪些行为:

  • 我们可以取消这个执行逻辑,如果这个逻辑已经正在执行,提供可选的参数来控制是否取消已经正在执行的逻辑。
  • 我们可以判断执行逻辑是否已经被取消。
  • 我们可以判断执行逻辑是否已经执行完成。
  • 我们可以获取执行逻辑的执行结果。
  • 我们可以允许在一定时间内去等待获取执行结果,如果超过这个时间,抛TimeoutException。

2.2 FutureTask 类

类图如下:

在这里插入图片描述

FutureTask是Future的具体实现。FutureTask实现了RunnableFuture接口。RunnableFuture接口又同时继承了Runnable 和 Future 接口。所以FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

三、FutureTask的使用方法

举个例子,假设我们要执行一个算法,算法需要两个输入 input1 和 input2, 但是input1和input2需要经过一个非常耗时的运算才能得出。由于算法必须要两个输入都存在,才能给出输出,所以我们必须等待两个输入的产生。接下来就模仿一下这个过程。

package src;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class FutureTaskTest {public static void main(String[] args) throws InterruptedException, ExecutionException {long starttime = System.currentTimeMillis();//input2生成, 需要耗费3秒FutureTask<Integer> input2_futuretask = new FutureTask<>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {Thread.sleep(3000);return 5;}});new Thread(input2_futuretask).start();//input1生成,需要耗费2秒FutureTask<Integer> input1_futuretask = new FutureTask<>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {Thread.sleep(2000);return 3;}});new Thread(input1_futuretask).start();Integer integer2 = input2_futuretask.get();Integer integer1 = input1_futuretask.get();System.out.println(algorithm(integer1, integer2));long endtime = System.currentTimeMillis();System.out.println("用时:" + String.valueOf(endtime - starttime));}//这是我们要执行的算法public static int algorithm(int input, int input2) {return input + input2;}
}

输出结果:

在这里插入图片描述

我们可以看到用时3001毫秒,与最费时的input2生成时间差不多。
注意,我们在程序中生成input1时,也让线程休眠了2秒,但是结果不是3+2。说明FutureTask是被异步执行了。

四、FutureTask源码分析

4.1 state字段

volatile修饰的state字段;表示FutureTask当前所处的状态。可能的转换过程见注释。

	/*** Possible state transitions:* NEW -> COMPLETING -> NORMAL(创建到正常运行结束的状态变化轨迹)* NEW -> COMPLETING -> EXCEPTIONAL(创建到异常运行结束的状态变化轨迹)* NEW -> CANCELLED  (创建到取消的状态变化轨迹)* NEW -> INTERRUPTING -> INTERRUPTED(创建到中断结束的状态变化轨迹)*/private volatile int state;// NEW 新建状态,表示这个 FutureTask还没有开始运行private static final int NEW          = 0;// COMPLETING 完成状态, 表示 FutureTask 任务已经计算完毕了// 但是还有一些后续操作,例如唤醒等待线程操作,还没有完成。private static final int COMPLETING   = 1;// FutureTask 任务完结,正常完成,没有发生异常private static final int NORMAL       = 2;// FutureTask 任务完结,因为发生异常。private static final int EXCEPTIONAL  = 3;// FutureTask 任务完结,因为取消任务private static final int CANCELLED    = 4;// FutureTask 任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求private static final int INTERRUPTING = 5;// FutureTask 任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求private static final int INTERRUPTED  = 6;

4.2 其他变量

    /** 任务 */private Callable<V> callable;/** 储存结果*/private Object outcome; // non-volatile, protected by state reads/writes/** 执行任务的线程*/private volatile Thread runner;/** get方法阻塞的线程队列 */private volatile WaitNode waiters;//FutureTask的内部类,get方法的等待队列static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}

4.3 CAS工具初始化

    // Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}

这段代码是为了后面使用CAS而准备的。可以这么理解:

一个java对象可以看成是一段内存,各个字段都得按照一定的顺序放在这段内存里,同时考虑到对齐要求,可能这些字段不是连续放置的,用这个UNSAFE.objectFieldOffset()方法能准确地告诉你某个字段相对于对象的起始内存地址的字节偏移量,因为是相对偏移量,所以它其实跟某个具体对象又没什么太大关系,跟class的定义和虚拟机的内存模型的实现细节更相关。

4.4 构造函数

FutureTask有两个构造函数:

public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable
}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable
}

这两个构造函数区别在于,如果使用第一个构造函数最后获取线程实行结果就是callable的执行的返回结果;而如果使用第二个构造函数那么最后获取线程实行结果就是参数中的result,接下来让我们看一下FutureTask的run方法。

同时两个构造函数都把当前状态设置为NEW。

4.5 run方法及其他

构造完FutureTask后,会把它当做线程的参数传进去,然后线程就会运行它的run方法。所以我们先来看一下run方法:

public void run() {//如果状态不是new,或者runner旧值不为null(已经启动过了),就结束if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable; // 这里的callable是从构造方法里面传人的if (c != null && state == NEW) {V result;boolean ran;try {result = c.call(); //执行任务,并将结果保存在result字段里。ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex); // 保存call方法抛出的异常}if (ran)set(result); // 保存call方法的执行结果}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

其中,catch语句中的setException(ex)如下:

//发生异常时设置state和outcome
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); finishCompletion();// 唤醒get()方法阻塞的线程}}

而正常完成时,set(result);方法如下:

//正常完成时,设置state和outcome
protected void set(V v) {
//正常完成时,NEW->COMPLETING->NORMALif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); // 唤醒get方法阻塞的线程}}

这两个set方法中,都是用到了finishCompletion()去唤醒get方法阻塞的线程。下面来看看这个方法:

//移除并唤醒所有等待的线程,调用done,并清空callable
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t); //唤醒线程}//接下来的这几句代码是将当前节点剥离出队列,然后将q指向下一个等待节点。被剥离的节点由于thread和next都为null,所以会被GC回收。WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done(); //这个是空的方法,子类可以覆盖,实现回调的功能。callable = null;        // to reduce footprint}

好,到这里我们把运行以及设置结果的流程分析完了。那接下来看一下怎么获得执行结果把。也就是get方法。

get方法有两个,一个是有超时时间设置,另一个没有超时时间设置。

    public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {// get(timeout, unit) 也很简单, 主要还是在 awaitDone里面if(unit == null){throw new NullPointerException();}int s = state;// 判断state状态是否 <= Completing, 调用awaitDone进行自旋等待if(s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING){throw new TimeoutException();}// 根据state的值进行返回结果或抛出异常return report(s);}

两个get方法都用到了awaitDone()。这个方法的作用是: 等待任务执行完成、被中断或超时。看一下源码:

    //等待完成,可能是是中断、异常、正常完成,timed:true,考虑等待时长,false:不考虑等待时长private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L; //如果设置了超时时间WaitNode q = null;boolean queued = false;for (;;) {/***  有优先级顺序*  1、如果线程已中断,则直接将当前节点q从waiters中移出*  2、如果state已经是最终状态了,则直接返回state*  3、如果state是中间状态(COMPLETING),意味很快将变更过成最终状态,让出cpu时间片即可*  4、如果发现尚未有节点,则创建节点*  5、如果当前节点尚未入队,则将当前节点放到waiters中的首节点,并替换旧的waiters*  6、线程被阻塞指定时间后再唤醒*  7、线程一直被阻塞直到被其他线程唤醒**/if (Thread.interrupted()) {// 1removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {// 2if (q != null)q.thread = null;return s; }else if (s == COMPLETING) // 3Thread.yield();else if (q == null) // 4q = new WaitNode();else if (!queued) // 5queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);else if (timed) {// 6nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q); //从waiters中移出节点qreturn state; }LockSupport.parkNanos(this, nanos); }else // 7LockSupport.park(this);}}

接下来看下removeWaiter()移除等待节点的源码:

    private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null; // 将移除的节点的thread=null, 为移除做标示retry:for (;;) {          // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;//通过 thread 判断当前 q 是否是需要移除的 q节点,因为我们刚才标示过了if (q.thread != null) pred = q; //当不是我们要移除的节点,就往下走else if (pred != null) {//当p.thread==null时,到这里。下面这句话,相当于把q从队列移除。pred.next = s;//pred.thread == null 这种情况是在多线程进行并发 removeWaiter 时产生的//此时正好移除节点 node 和 pred, 所以loop跳到retry, 从新进行这个过程。想象一下,如果在并发的情况下,其他线程把pred的线程置为空了。那说明这个链表不应该包含pred了。所以我们需要跳到retry从新开始。if (pred.thread == null) // check for racecontinue retry;}//到这步说明p.thread==null 并且 pred==null。说明node是头结点。else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}}

最后在get方法中调用report(s),根据状态s的不同进行返回结果或抛出异常。

    private V report(int s) throws ExecutionException {Object x = outcome;  //之前我们set的时候,已经设置过这个值了。所以直接用。if (s == NORMAL)return (V)x;  //正常执行结束,返回结果if (s >= CANCELLED)throw new CancellationException(); //被取消或中断了,就抛异常。throw new ExecutionException((Throwable)x);}

以上就是FutureTask的源码分析。

最后总结一下:

FutureTask既可以当做Runnable也可以当做Future。线程通过执行FutureTask的run方法,将正常运行的结果放入FutureTask类的result变量中。使用get方法可以阻塞直到获得结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/819382.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python进阶编程 --- 2.MySQL、pymysql、PySpark

文章目录 第一章&#xff1a;SQL基础入门1.1 数据库数据库如何存储数据 1.2 数据库和SQL的关系1.3 MySQL版本1.4 命令提示符内使用MySQL1.5 SQL概述1.5.1 SQL语言分类1.5.2 SQL语言特性 1.6 DDL库管理表管理 1.7 DML - 数据操作1.8 DQL - 查询和计算数据1.8.1 基础数据查询1.8.…

HDFS Lease详解

本文主要介绍hdfs lease的设计以及实现。 写在前面 https://www.cnblogs.com/jhcelue/p/6783076.html https://blog.csdn.net/yexiguafu/article/details/118890014 https://www.jianshu.com/p/33e1a5a2b876 https://blog.csdn.net/breakout_alex/article/details/1014569…

行业模板|DataEase批发零售大屏模板推荐

DataEase开源数据可视化分析平台于2022年6月发布模板市场&#xff08;https://templates-de.fit2cloud.com&#xff09;&#xff0c;并于2024年1月新增适用于DataEase v2版本的模板分类。模板市场旨在为DataEase用户提供专业、美观、拿来即用的大屏模板&#xff0c;方便用户根据…

【Canvas与艺术】绘制斜置黄色三角biohazard标志

【关键点】 径向渐变色和文字按角度偏转。 【成果图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>使用Html5/Canvas绘制…

spring-cloud微服务gateway

核心部分&#xff1a;routes(路由)&#xff0c; predicates(断言)&#xff0c;filters(过滤器) id&#xff1a;可以理解为是这组配置的一个id值&#xff0c;请保证他的唯一的&#xff0c;可以设置为和服务名一致 uri&#xff1a;可以理解为是通过条件匹配之后需要路由到&…

2024 CKA 基础操作教程(十二)

题目内容 考点相关内容分析 Pods Pod 是可以在 Kubernetes 中创建和管理的、最小的可部署的计算单元。 Pod 是 Kubernetes 中的原子单元&#xff0c;用于封装应用程序的一个或多个容器、存储资源、唯一的网络 IP&#xff0c;以及有关如何运行容器的选项。Pod 提供了一个共享的…

一些实用的工具网站

200 css渐变底色 https://webgradients.com/ 200动画效果复制 https://css-loaders.com/classic/ 二次贝塞尔曲线 https://blogs.sitepointstatic.com/examples/tech/canvas-curves/bezier-curve.html 三次贝塞尔曲线 https://blogs.sitepointstatic.com/examples/tech/c…

Day92:系统攻防-WindowsLinux远程探针本地自检任意执行权限提升入口点

目录 操作系统-远程漏扫-Nessus&Nexpose&Goby Nessus Nexpose 知识点&#xff1a; 1、远程漏扫-Nessus&Nexpose&Goby 2、本地漏扫-Wesng&Tiquan&Suggester 3、利用场景-远程利用&本地利用&利用条件 操作系统-远程漏扫-Nessus&Nexpose&a…

Python——详细解析目标检测xml格式标注转换为txt格式

本文简述了目标检测xml格式标注的内容&#xff0c;以及yolo系列模型所需的txt格式标注的内容。并提供了一个简单的&#xff0c;可以将xml格式标注文件转换为txt格式标注文件的python脚本。 1. xml格式文件内容 <size>标签下为图片信息&#xff0c;包括 <width> …

​​​​​​​iOS配置隐私清单文件App Privacy Configuration

推送到TestFlight后邮件收到警告信息如下&#xff0c;主要关于新的隐私政策需要补充&#xff1a; Hello, We noticed one or more issues with a recent submission for TestFlight review for the following app: AABBCC Version 10.10.10 Build 10 Although submission for …

servlet的三个重要的类(httpServlet 、httpServletRequst、 httpServletResponse)

一、httpServlet 写一个servlet代码一般都是要继承httpServlet 这个类&#xff0c;然后重写里面的方法 但是它有一个特点&#xff0c;根据之前写的代码&#xff0c;我们发现好像没有写main方法也能正常执行。 原因是&#xff1a;这个代码不是直接运行的&#xff0c;而是放到…

文章解读与仿真程序复现思路——中国电机工程学报EI\CSCD\北大核心《应用图论建模输电网的电力现货市场出清模型》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

JavaSE图书管理系统实战

代码仓库地址&#xff1a;Java图书管理系统 1.前言 该项目将JavaSE的封装继承多态三大特性&#xff0c;使用了大量面向对象的操作&#xff0c;有利于巩固理解 &#xff08;1&#xff09;实现效果 2.实现步骤 第一步先把框架搭建起来&#xff0c;即创建出人&#xff1a;管理员和…

RocketMQ 02 功能大纲介绍

RocketMQ 02 主流的MQ有很多&#xff0c;比如ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等。 之前阿里巴巴也是使用ActiveMQ&#xff0c;随着业务发展&#xff0c;ActiveMQ IO 模块出现瓶颈&#xff0c;后来阿里巴巴 通过一系列优化但是还是不能很好的解决&#xff0c;之后…

MySQL底层架构

MySQL底层架构 连接器 验证客户端连接的用户名密码、校验权限、维持和管理连接。 客户端如果超过 wailt_timeout 没有动静&#xff0c;连接器会主动将它断开&#xff0c;此时客户端再次发送请求的话&#xff0c;就会收到错误&#xff1a;lost connection to MySQL server dur…

【Modelsim】保持波形格式重编译and波形的保存与查看

文章目录 保持原波形格式重编译波形的保持与查看保存波形打开工程查看波形 保持原波形格式重编译 Modelsim 仿真设置好波形格式后&#xff0c;若需要修改代码并保持原波形格式重新查看波形&#xff0c;只需将文件重新编译后仿真即可。 1.修改代码后Project页面的代码状态变成…

外网如何访问内网数据库?

在当今信息时代&#xff0c;随着互联网的快速发展&#xff0c;很多企业和个人都面临着外网访问内网数据库的需求。外网访问内网数据库可以实现远程操作&#xff0c;方便用户在任何地点使用移动设备进行数据管理和查询。本文将介绍一种名为【天联】的组网产品&#xff0c;它是一…

SkyWalking 为所有的API接口增加 tag

背景胡扯 线上接口报错&#xff0c;接着被 SkyWalking 抓到&#xff0c;然后 SkyWalking 触发告警&#xff0c;最后老板你&#xff0c;让你辛苦一下&#xff0c;在明早上班前把这个bug 改了&#xff0c;并告诉你你是全公司的希望。谁说不是呢&#xff1f;为公司业务保驾护航&a…

C语言 | 自定义类型:struct结构体(详解)

目录&#xff1a; --前言 1. 结构体类型的定义与基础结构 2. 结构体的使用 3. typedef相关 4. 结构体的自引用 5. 结构体内存对齐 6. 结构体传参 7. 结构体实现位段 --前言&#xff1a; c语言中内置类型&#xff0c;也有自定义的类型。 例如&#xff1a;内置类型 in…

windows应急响应基础知识

一、系统排查 1、系统详细信息 systeminfo2、网络链接 netstat -ano LISTENING 服务启动后首先处于侦听 ESTABLISHED 建立连接。表示两台机器正在通信。 CLOSE_WAIT 对方主动关闭连接或者网络异常导致连接中断&#xff0c;这时我方的状态会变成CLOSE_WAIT 此时我方要调用…