多线程-定时任务线程池源码

定时任务线程池

ScheduledThreadPoolExecutor,可以执行定时任务的线程池。这里学习它的基本原理。

定时任务线程池,和普通线程池不同的地方在于,它使用一个延迟队列,延迟队列使用最小堆作为它的数据结构,它会按照任务的执行顺序,把最先执行的任务放到第一个,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞完成后,去执行任务。对于周期性执行的任务,执行完成后,会计算下一次启动时间,然后把任务重新提交到延迟队列。

源码分析

定时任务线程池的继承体系

定时任务线程池继承了ThreadPoolExecutor,同时实现了ScheduledExecutorService,这个接口定义了定时调度相关的功能

public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {

ScheduledExecutorService:定义了定时调度的功能

public interface ScheduledExecutorService extends ExecutorService {// 定时调度1次的任务public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);// 定时调度1次的任务,有返回值public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);// 以固定频率调度的任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);// 以固定延迟调度的任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
}

描述定时任务的类

描述定时任务的类:ScheduledThreadPoolExecutor的内部类ScheduledFutureTask

// ScheduledFutureTask:是定时任务线程池的内部类,封装了任务的启动时间、周期时间(隔多长时间执行一次),
// 任务在延迟队列中的索引、任务序号
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 任务的启动时间,单位是纳秒private long time;// 任务的执行周期,单位是纳秒private final long period;// 任务在队列中的索引int heapIndex;// 任务序号,通过原子类生成private final long sequenceNumber;// 持有自己的实例RunnableScheduledFuture<V> outerTask = this;// 构造方法,参数1 异步任务,参数2 结果,参数3 任务的启动时间,参数4 任务的周期时间ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}
}// ScheduledFutureTask实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {// 定时任务是否是周期性的boolean isPeriodic();
}// RunnableScheduledFuture继承了ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}// ScheduledFuture继承了Delayed,它封装了任务的延迟时间,表示任务延迟多久启动,继承
// Comparable接口,因为在循环队列中排序时需要用到
public interface Delayed extends Comparable<Delayed> {// 返回对象相关的延迟时长long getDelay(TimeUnit unit);
}

ScheduledFutureTask的继承体系上:

  • 继承了FutureTask,代表一个异步任务。 // FutureTask在之前学习Callable接口的时候已经接触到了。
  • 实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果,同时间接实现了Delayed接口,用于排序

定时任务主要有两个参数来描述任务的执行时间:

  • time:任务的启动时间,这是一个绝对时间,描述到了某个时间点,任务应该启动执行
  • period:任务的周期,描述两个任务之间间隔多长时间

延迟队列

定时任务线程池和普通线程池不一样的地方,在于它使用延迟队列,定时任务中封装好了任务的执行时间,任务的调度工作,是由延迟队列来执行的。

延迟队列的结构:

static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {// 队列内部使用的数组private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private int size = 0;// 等待队列中第一个任务的线程private Thread leader = null;
}

这里的结构很简单,主要是它的计算比较复杂,任务之间需要排序,组成一个最小堆,最先执行的任务放到前面,以及元素出队的方法、元素入队的方法。

工作机制

这里以scheduleAtFixedRate为例, 固定频率的定时任务,讲解定时任务的执行流程,其它类型的定时任务也类似。

提交定时任务

通过scheduleAtFixedRate方法,创建定时任务:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =// 构建ScheduledFutureTask实例new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),  // 任务的触发时间unit.toNanos(period));            // 任务的执行周期RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;
}

第一步:创建ScheduledFutureTask的实例

ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);  // 调用父类FutureTask的构造方法this.time = ns;  // 任务的触发时间,这是一个绝对时间this.period = period;  // 任务的执行周期,表示两个任务之间间隔多长时间this.sequenceNumber = sequencer.getAndIncrement();  // 当前任务的序列号,通过原子类生成
}// 父类FutureTask的构造方法
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable
}

第二步:添加任务到延迟队列,并且新建线程执行任务

private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);  // 任务添加到延迟队列if (isShutdown() &&   // 判断线程池是否关闭,如果关闭,移除任务!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart(); // 向线程池中添加线程,确保有线程执行任务}
}// 添加任务到延迟队列的方法
public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;  // 异步任务实例final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();size = i + 1;if (i == 0) {queue[0] = e;setIndex(e, 0);} else {siftUp(i, e);}if (queue[0] == e) {  // 如果当前任务是第一个任务,要唤醒在条件变量上阻塞的线程leader = null;available.signal();}} finally {lock.unlock();}return true;
}// 向线程池中添加线程
void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)// 注意,这里Worker实例的第一个参数 firstTask,值为null,表示Worker只可以从队列中获取任务addWorker(null, true);else if (wc == 0)addWorker(null, false);
}

从阻塞队列中获取任务

新线程启动后,会执行Worker类的run方法 (参考ThreadPoolExecutor的执行原理),在run方法中,会从阻塞队列中获取异步任务,定时任务使用的阻塞队列是DelayedWorkQueue。

从阻塞队列中获取任务的方法:

// take方法没有指定超时时长,类似的,还有指定了超时时长的poll方法
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();  // 获取锁try {for (;;) {RunnableScheduledFuture<?> first = queue[0];  // 队列中的第一个元素,if (first == null)available.await();  // 如果队列为空,阻塞else {// 获取第一个任务的延迟时间,表示延迟指定时长后,开始执行任务long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);  // 如果延迟时长小于等于0,证明可以开始执行任务了first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;  // 设置当前线程为leadertry {available.awaitNanos(delay);  // 如果延迟时长大于0,那么线程进入阻塞状态并且指定时长} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}
}// 获取任务的延迟时间
public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);  // time是绝对时间,它减去now(),就是相对时间,也就是延迟时间
}

阻塞队列负责按照任务的执行时间,对任务进行排序,最先执行的任务放在队列的第一位,这里没有展示排序的逻辑,排序是按照最小堆的逻辑来排序的。线程从阻塞队列中获取任务,会计算第一个任务的延迟时长,然后等待指定时长,在执行任务,这就是定时任务可以在指定时长后启动的逻辑,如果延迟队列中没有任务,线程会一直等待,同时,向延迟队列中添加任务时,如果发现当前任务是第一个任务,会唤醒正在等待的线程。

执行定时任务

从延迟队列中获取到任务后,线程会执行ScheduledFutureTask的run方法,因为ScheduledFutureTask间接继承了Runnable接口

// ScheduledFutureTask的run方法
public void run() {boolean periodic = isPeriodic();  // 任务是否是周期性的if (!canRunInCurrentRunState(periodic))  // 判断线程池是否还在运行cancel(false);else if (!periodic)  // 如果不是周期性的任务,直接执行,这里执行的是FutureTask中的run方法ScheduledFutureTask.super.run();// 如果是周期性的任务,执行完之后计算下次执行时间,然后重新提交任务实例到阻塞队列else if (ScheduledFutureTask.super.runAndReset()) {  setNextRunTime();  // 计算下次任务的执行时间,这个方法会更新任务的time属性reExecutePeriodic(outerTask);  // 再次向线程池中提交任务实例}
}// 判断任务是否是周期性的
public boolean isPeriodic() {// 参考之前ScheduledFutureTask的实例的创建过程,period代表任务的执行周期,// 这个值不为0,证明是周期性的任务return period != 0;
}

1、为什么执行任务时会执行FutureTask中的run方法?因为在FutureTask的run方法中,会调用用户编写的run方法,也就是异步任务,ScheduledFutureTask中的run方法负责整体流程。

2、如果是周期性的任务,执行FutureTask中的runAndReset方法,它和run方法有什么不同?它执行完任务后,不会设置返回值,同时会把任务设置为初始状态,这个方法是为了执行多次的异步任务而设计的。

3、周期性的任务,执行完任务后,如何计算下次任务的执行时间?

// 计算任务下次执行时间的方法:
private void setNextRunTime() {long p = period;  // 任务的执行周期if (p > 0)time += p;    // 当前时间加上周期,固定频率(scheduleAtFixedRate)的定时任务走这段逻辑elsetime = triggerTime(-p);  // 更新任务的执行时间,固定延迟(scheduleWithFixedDelay)的定时任务走这段逻辑
}

这里需要解释一下,time属性是任务的执行时间,是一个绝对时间,表示到了某个点,例如 2020-01-01 00:00:00 这个固定的点,启动定时任务,period,是两个任务之间的间隔时长,例如,每隔10分钟,执行一次定时任务。对于固定频率的定时任务和固定延迟的定时任务,它们在创建任务实例的过程中稍有不同:

// 创建固定频率的定时任务
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),   // 计算time的值unit.toNanos(period));             // 计算period的值,注意,period是正数
// 创建固定延迟的定时任务
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),   // 计算time的值unit.toNanos(-delay));             // 计算period的值,注意,period是负数

一个执行周期是正数,一个执行周期时负数,所以在计算任务下次执行时间的方法中,它们会走向不同的链路,把该方法重新粘贴到下面,重新再看:

// 计算任务下次执行时间的方法:
private void setNextRunTime() {long p = period;  // 任务的执行周期if (p > 0)time += p;    // 固定频率,上次任务执行时间加上执行周期,就是下次执行时间elsetime = triggerTime(-p);  // 固定延迟,当前时间加上执行周期,就是下次执行时间, // triggerTime方法在下面
}long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

所以,固定频率执行的任务,如果上一次的任务执行超时,直到下一次任务该启动时还没有执行完成,一旦上一次任务执行完成,下一次任务立刻启动,因为上一次任务执行完成后,计算下一次任务的执行时间,发现执行时间在当前时间之前,所以线程获取任务时不会阻塞,会立刻取出任务,然后执行。固定延迟的任务,是根据上次任务结束时间来计算下次任务开始时间的,所以它是固定延迟。

总结

定时任务的执行过程:

  • 第一步:向线程池提交定时任务(schedule方法)
  • 第二步:创建定时任务实例(ScheduleFutureTask实例)
  • 第三步:把定时任务添加到延迟队列,延迟队列会对任务进行排序,最先执行的定时任务放到开头
  • 第四步:新建线程,从延迟队列中获取定时任务,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞结束后,执行定时任务
  • 第五步:执行完成后,计算下一次任务的执行时间,然后重新向线程池中提交任务实例

Q&A

只执行一次的定时任务和周期性的定时任务,分别是如何执行的?

周期性的定时任务,在执行完一次后,会计算下次任务的启动时间,然后再次向阻塞队列中提交任务实例,只执行一次的定时任务则不会

线程是如何在指定时间启动定时任务的?

阻塞队列会把需要最先执行的定时任务放在队列的开头,线程会获取第一个任务的延迟时间,然后根据延迟时间休眠指定时长,休眠结束后,执行定时任务。

按照固定频率执行的定时任务和按照固定延迟执行的定时任务,分别是如何执行的?

按照固定频率执行的定时任务,下次任务的执行时间 = 上次任务的启动时间 + 周期

按照固定延迟执行的定时任务,下次任务的执行时间 = 上次任务的结束时间 + 周期

依据不同的计算方式,计算出下次任务的执行时间,然后提交任务实例到队列中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/71446.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

系统盘还原成正常U盘

选择格式化,等格式化完毕就完了 点击还原设备的默认值格式化就完了

Linux系统下安装配置 Nginx 超详细图文教程_linux安装nginx

#安装当前路径下所有安装包 rpm -Uvh *.rpm --nodeps --force2.2、安装Nginx 找到Nginx的安装包进行解压 #解压安装包 tar -zxvf nginx-1.24.0.tar.gz进入解压之后的nginx目录下&#xff1a; #进入nginx目录 cd /usr/local/nginx-1.24.0执行配置脚本&#xff0c;–prefix是指…

【玩转正则表达式】替换与正则表达式的结合

在文本处理和数据分析的领域中&#xff0c;正则表达式&#xff08;Regular Expressions&#xff0c;简称regex&#xff09;是一种强大而灵活的工具。它能够帮助用户匹配、搜索、替换和编辑字符串中的特定模式。而光能匹配可能在实际使用场景中还不是很足够&#xff0c;有时候我…

如何排查服务器内存泄漏问题

服务器内存泄漏是一种常见的问题&#xff0c;可能导致系统性能下降甚至系统崩溃。以下是一般情况下用于排查服务器内存泄漏问题的步骤&#xff1a; 排查服务器内存泄漏问题的步骤&#xff1a; 监控系统资源&#xff1a; 使用系统监控工具&#xff08;如top、htop、free&#x…

Linux使用笔记:Find Tree 命令

Tree 命令的使用 使用-I 参数&#xff0c;过滤掉不想展未的目录或文件使用-L参数&#xff0c;指定展示的目录层级个数 arsenaltxzq1899:~/Workspace/vue-application$ tree -I node_modules/ -I public/ -L 2 . ├── components.json ├── Dockerfile ├── ecosystem.c…

山东大学计算机科学与技术学院软件工程实验日志

--- Author: "Inori_333" Date: 2025-03-04 --- 实验一 团队建立、阅读开源软件 1.队伍创建与分工 队伍最终确定由5人组成&#xff0c;小组成员之间进行了高效的沟通&#xff0c;并确定了各自的负责的部分内容。 2.代码复现与分析 写在前面&#xff1a;由于“…

深入 Vue.js 组件开发:从基础到实践

深入 Vue.js 组件开发&#xff1a;从基础到实践 Vue.js 作为一款卓越的前端框架&#xff0c;其组件化开发模式为构建高效、可维护的用户界面提供了强大支持。在这篇博客中&#xff0c;我们将深入探讨 Vue.js 组件开发的各个方面&#xff0c;从基础概念到高级技巧&#xff0c;助…

历年杭州电子科技大学计算机考研复试上机真题

历年杭州电子科技大学计算机考研复试机试真题 在线评测&#xff1a;https://app2098.acapp.acwing.com.cn/ 最大公约数和最小公倍数 题目描述 输入两个正整数 m 和 n&#xff0c;求其最大公约数和最小公倍数。 输入格式 两个整数 输出格式 最大公约数&#xff0c;最小公…

单片机入门(一)

一、单片机的内部资源 Flash&#xff1a;程序存储空间 RAM&#xff1a; 数据存储空间 SFR: 特殊功能寄存器的简称。它存在于单片机的内部存储空间中&#xff0c;用于存储控制单片机各种硬件模块工作的数据。这些寄存器具有特定的功能&#xff0c;控制着单片机的各种操作&#…

将 MySQL 数据高效导入 Redis

目录 1. RESP 协议 &#xff08;1&#xff09;RESP 协议的优点 &#xff08;2&#xff09;RESP 支持的 5 种数据类型 &#xff08;3&#xff09;RESP 的用途 &#xff08;4&#xff09;RESP 协议示例 2. redis-cli 的 pipe 模式 &#xff08;1&#xff09;pipe 模式的作…

mybatis映射文件相关的知识点总结

mybatis映射文件相关的知识点总结 mybatis官网地址 英文版&#xff1a;https://mybatis.org/mybatis-3/index.html 中文版&#xff1a;https://mybatis.p2hp.com/ 搭建环境 /* SQLyog Ultimate v10.00 Beta1 MySQL - 8.0.30 : Database - mybatis-label *****************…

SQLAlchemy系列教程:SQLAlchemy快速入门示例项目

SQLAlchemy是与数据库交互的Python开发人员不可或缺的库。这个强大的ORM允许使用python结构进行简单的数据库操作。设置过程很简单&#xff0c;并且允许可扩展的数据库应用程序开发。本文通过入门项目完整介绍SQLAlchemy的应用过程&#xff0c;包括安装依赖包&#xff0c;创建连…

HTML + CSS 题目

1.说说你对盒子模型的理解? 一、是什么 对一个文档进行布局的时候&#xff0c;浏览器渲染引擎会根据标准之一的css基础盒模型&#xff0c;将所有元素表示为一个个矩形的盒子。 一个盒子由四个部分组成: content&#xff0c;padding&#xff0c;border&#xff0c;margin 下…

el-table 手动选择展示列

需求&#xff1a; 由于表格的列过多,用滚动条进行滚动对比数据不方便&#xff0c;所以提出&#xff0c;手动选择展示列 实现思路&#xff1a; 表格默认展示所有字段&#xff0c;每个字段通过 v-if 属性来进行判断是否显示&#xff1b;点击设置按钮图标(表格右上角&#xff0…

家政预约小程序用例图分析

在和客户进行需求沟通的时候&#xff0c;除了使用常规的问答的形式&#xff0c;我还使用图形化工具更深入的沟通。比如借助UML的用例图来开展系统分析&#xff0c;并且按照角色详细拆解了家政预约小程序的各个用例。在分析阶段思考的越多&#xff0c;沟通的越多&#xff0c;在系…

详解 scanf 和 printf(占位符、printf、scanf的返回值、printf的输出格式、scanf的输入格式)

一、printf 1.基本用法 •printf 的作⽤是将参数⽂本输出到屏幕 •printf print f &#xff0c;printf 代表输出打印&#xff0c;f代表 format &#xff08;格式化&#xff09;&#xff0c;format表⽰可以定制输出⽂本的格式,所以详细来说printf是将数据按照指定格式打印 …

爬蟲動態IP代理與數據採集穩定性

對於從事爬蟲開發的人來說&#xff0c;IP代理的使用直接影響了爬蟲的效率和穩定性。爬蟲的本質是模擬用戶訪問網站&#xff0c;通過抓取網頁內容來獲取所需數據。但大多數網站為了保護自己的數據或防止濫用&#xff0c;會設置諸如IP訪問頻率限制、登錄驗證甚至封禁等反爬蟲措施…

js之原型及原型链

如果js没有构造函数 首先不考虑构造函数这个鬼东西&#xff0c;当他不存在。 这个时候&#xff0c;创建对象的方式就是 <script type"text/javascript">var dog {name: hachi,age: 3}</script> 然后在浏览器上观察该对象&#xff0c;可以看到该对象包…

Xcode 运行真机失败

错误提示&#xff1a; iPhone xxx is not available because it is unpaired. Pair with the device in the Xcode Devices Window, and respond to any pairing prompts on the device. 处理方法&#xff1a; 把Xcode关闭&#xff0c;手机断开数据线&#xff0c;打开终端&…

BIO、NIO、AIO、Netty从简单理解到使用

Java编程中BIO、NIO、AIO是三种不同的I/O&#xff08;输入/输出&#xff09;模型&#xff0c;它们代表了不同的I/O处理方式。 Netty就是基于Java的NIO&#xff08;New Input/Output&#xff09;类库编写的一个高性能、异步事件驱动的网络应用程序框架&#xff0c;用于快速开发可…