Java常用任务调度

JAVA 任务调度技术

前言

在日常开发过程中,我们经常会遇到周期性执行某段代码的场景。比如定期同步订单,定期更新商品信息,定期发送消息等。这些重复执行的代码可以抽象为一个任务(Task)。 一个Task的特点如下:

  1. 包含需要执行的业务逻辑。
  2. 能够在指定的时间重复(一次或者多次)执行。

围绕Task的特点,开发者们开发了不同的调度框架或者中间件,满足日常开发中的使用。

以下表格列出了部分实现。

技术来源使用场景依赖第三方
TimerJDK自带目前比较少使用
ScheduledExecutorServiceJDK自带基于线程池技术,常用于中间件中
Spring TaskSpring-contextSpring 项目,常用于单体应用开发
XXL-JOB国产开源中间件可用于分布式项目调度依赖mysql
QuartzOpenSymphony 开源组织一些中间件常常基于Quartz开发分布式需要依赖数据库
Elastic-Job当当⽹开源可用于分布式项目调度需要依赖ZooKeeper + Mesos
Apache DolphinScheduler易观开源大数据任务调度

本文对部分技术的实现进行了介绍,从简单到复杂,从具体到抽象,希望可以让大家在面对一个任务调度框架时可以快速抓住要点,不再陌生。

1. Timer

java.util.Timer位于JDK的rt.jar包下,始于jdk1.3,是JDK自带的任务调度器,虽然目前基本不再使用Timer来进行任务调度,但是Timer设计简单,理解起来比较容易。而且后续ScheduledExecutorService的基本原理和Timer基本类似,因此需要对Timer进行一个详细的了解。

Timer的核心类比较少,只需要以下4个类即可。

功能说明
Timer入口类,整个调度器的组织者,相当于其他框架的.定义了多个提交task的方法
TimerThread任务调度器后台线程执行一个轮询方法,核心方法为mainLoop()。
TimerTask抽象类,其实现类包装业务逻辑核心属性:nextExecutionTime下一个执行时间点。
TaskQueue任务队列优先队列,头部节点为最早执行的Task

以上类都处于java.util包下。

1.1 存储任务的数据结构-平衡二叉堆

一个任务框架,需要可以容纳在不同时间执行的任务,因此必须要有一个容器来缓存或者持久化提交的任务。 那么在多任务的场景下,我们如何挑选出需要执行的任务呢?以下对一些场景进行分析:

方案1. 对所有的任务进行遍历,对于下次执行时间小于当前时间的任务,执行业务逻辑。

时间复杂度为O(n),一些时间没到的任务也被遍历到了。性能不好。

方案2. 先对所有的任务,按照下次执行时间的大小进行排序,每次只取头部任务。

对2进行分析可以发现,只要保证队列头部为最早执行的元素即可,对于其他任务,因为还不需要执行,是否有序并不重要。

方案3. 采用优先队列,头部为权值最小,每次取权值即可。

从以上分析可以看出,一个任务框架,可以采用优先队列来容纳提交的任务。Timer正是如此,它的基本数据结构为平衡二叉堆(balanced binary heap)。想要理解Timer,需要对平衡二叉堆进行了解。
详细可以参考 【Java基础】JAVA中优先队列详解 。 摘抄如下:

1.1.1 基本结构

Java平衡二叉堆的定义为:

任意一个非叶子节点的权值,都不大于其左右子节点的权值

可以使用下面的在线数据模拟器进行

平衡二叉堆数据结构模拟器

结构示例如下:

在这里插入图片描述

从图中可以看出,可以通过数组来实现平衡二叉堆。每一个节点的编号,可以使用数组的下标来表示。

  1. 数组的第一个元素为二叉树的根节点,在所有节点中权值最小。
  2. 父子节点之间的关系可以用以下算法表示。这个算法很重要。在新增元素或者删除元素的时候,都需要使用到该算法。

leftNo = parentNo * 2+1
rightNo = parentNo * 2+2
parentNo = (nodeNo-1)/2

在优先队列中,一般只使用到新增元素和删除根节点元素,因此只对这两个算法进行介绍。

1.1.2 新增元素

在这里插入图片描述

步骤如下:

1.先在队尾新增一个元素。如果数组长度不够就先扩容。
2.如果有父节点,则与父节点进行对比。如果权值比父节点小,则与父节点交换位置。
3.重复步骤2,直到没有父节点或者比父节点小则完成新增。2~3步一般称作siftUp。

//siftUp()
private void siftUp(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;//parentNo = (nodeNo-1)/2Object e = queue[parent];if (comparator.compare(x, (E) e) >= 0)//调用比较器的比较方法break;queue[k] = e;k = parent;}queue[k] = x;
}

通过以上步骤。可以保证所有的父节点权值都小于子节点的权值。

1.1.3 删除队首元素

在这里插入图片描述

步骤如下:

1.删除数组的第一个元素。
2.将队尾的元素放置到头部位置,记为一个父节点。
3.通过比较获取子节点中较小的一个,并与父节点比较,如果父节点大于子节点,则交换位子。
4.重复步骤3,直到父节点小于等于子节点或者已经没有子节点,则结束比较。这个过程一般称作siftDown。

//siftDown()
private void siftDown(int k, E x) {int half = size >>> 1;while (k < half) {//首先找到左右孩子中较小的那个,记录到c里,并用child记录其下标int child = (k << 1) + 1;//leftNo = parentNo*2+1Object c = queue[child];int right = child + 1;if (right < size &&comparator.compare((E) c, (E) queue[right]) > 0)c = queue[child = right];if (comparator.compare(x, (E) c) <= 0)break;queue[k] = c;//然后用c取代原来的值k = child;}queue[k] = x;
}

1.2 Timer 核心执行逻辑

查看Timer类的结构,可以看到提交任务的方法有6个

在这里插入图片描述

代表了6种不同的场景

delay: 延迟毫秒数period: 时间间隔

1.延迟delay毫秒后,执行任务一次。
2.延迟delay毫秒后,周期性执行任务,两次任务之间间隔period毫秒。
3.延迟delay毫秒后,以固定频率执行任务,两次任务之间间隔period毫秒。

4.指定的时间Date开始,执行任务一次。
5.指定的时间Date开始,周期性执行任务,两次任务之间间隔period毫秒。
6.指定的时间Date开始,以固定频率执行任务,两次任务之间间隔period毫秒。

其中需要特别说明的是2和3,5和6之间的区别,也就是schedule和scheduleAtFixedRate的区别。具体看下表:

方法名下一个执行时间nextExecutionTime说明
schedulecurrentTime + delay当前序堵塞时,会影响到后续任务的下次计划时间,
下次任务会推迟执行,对于丢失的时间不会补上任务
scheduleAtFixedRatenextExecutionTime + delay当堵塞时,影响到后续任务的计划时间,
任务的次数不会丢失,快速补上调度次数

注: currentTime:当前时间,nextExecutionTime:下次执行时间,delay:时间间隔

1.2.1 Timer简单的例子:

Demo先行,先看一个简单的例子,有个初步的印象。

public class Application {public static void main(String[] args) {//初始化一个timer对象Timer timer = new Timer();//创建抽象类TimerTask的实例。TimerTask myTask = new TimerTask() {@Overridepublic void run() {System.out.println("执行run方法,time="+System.currentTimeMillis()/1000%60+"秒");}};//提交任务,延迟1秒执行,每两秒执行一次timer.schedule(myTask, 1000, 1000*2);}
}

结果:

执行run方法,time=25秒
执行run方法,time=27秒
执行run方法,time=29秒
执行run方法,time=31秒
执行run方法,time=33秒
执行run方法,time=35秒
执行run方法,time=37秒

原始代码分析如下,只挑选了核心代码展示。

1.2.2 Timer 类源码分析

Timer 是整个任务架构的组织者,也是入口,因此首先看Timer的代码。

public class Timer {// TaskQueue 实现了一个优先队列private final TaskQueue queue = new TaskQueue();// TimerThread继承了Thread。同时组合了TaskQueue。当Timer实例化时。会启动TimerThread实例的的start()方法。启动线程处理定时任务。private final TimerThread thread = new TimerThread(queue);//构造函数。做了一件事情,及时启动了TimerThread线程,处理队列数据。public Timer(String name, boolean isDaemon) {thread.setName(name);thread.setDaemon(isDaemon);thread.start();}//延迟delay毫秒后执行一次任务public void schedule(TimerTask task, long delay) {sched(task, System.currentTimeMillis()+delay, 0);}//延迟delay毫秒,以固定频率执行定时任务。下次的执行时间为当前系统时间(System.currentTimeMillis())+|period|//当发生阻塞时,有可能丢失调度次数public void schedule(TimerTask task, long delay, long period) {sched(task, System.currentTimeMillis()+delay, -period);}//延迟delay毫秒,以固定频率执行定时任务。与schedule不同。下次执行时间为当前本应执行时间(nextExecutionTime)+period//当发生阻塞时,不会丢失调度次数。public void scheduleAtFixedRate(TimerTask task, long delay, long period) {sched(task, System.currentTimeMillis()+delay, period);}/*** 核心新增定时任务的方法。* @param task 为实现了业务的任务类。* @param time 为下次执行任务的时间。  * @param period  为0时表示不会重复执行。当period !=0时表示会周期性执行*/private void sched(TimerTask task, long time, long period) {//以下代码省去了部分校验代码synchronized(queue) {synchronized(task.lock) {task.nextExecutionTime = time;task.period = period;task.state = TimerTask.SCHEDULED;}queue.add(task);if (queue.getMin() == task)//说明加入task之前。队列为空,处于wait状态,需要唤醒。或者还有一种情况,就是加入的task处于头部,需要立即处理。有可能此时线程处于等待状态,需要唤醒。queue.notify();}}//省略。。。}

1.2.3 TaskQueue 类源码分析

TaskQueue 本质是一个平衡二叉堆,1.1已经有所介绍。

class TaskQueue {/*** Priority queue represented as a balanced binary heap: the two children* of queue[n] are queue[2*n] and queue[2*n+1].  The priority queue is* ordered on the nextExecutionTime field: The TimerTask with the lowest* nextExecutionTime is in queue[1] (assuming the queue is nonempty).  For* each node n in the heap, and each descendant of n, d,* n.nextExecutionTime <= d.nextExecutionTime.* 注释已经讲明,就是一个平衡二叉堆。*/private TimerTask[] queue = new TimerTask[128];void add(TimerTask task) {// Grow backing store if necessaryif (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);//加到队尾queue[++size] = task;//向上排序fixUp(size);}//新增元素时向上排序private void fixUp(int k) {while (k > 1) {int j = k >> 1;if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;k = j;}}//刷新void rescheduleMin(long newTime) {queue[1].nextExecutionTime = newTime;fixDown(1);}//删除元素时向下排序private void fixDown(int k) {int j;while ((j = k << 1) <= size && j > 0) {if (j < size &&queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)j++; // j indexes smallest kidif (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)break;TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;k = j;}}//省略其他
}

1.2.3 TimerThread 类源码分析

从Timer代码可以看到,当实例化Timer时,将会启动一个TimerThread线程,具体作用是不断轮询队列的头部元素,然后执行业务代码。核心代码如下(省略部分代码):

class TimerThread extends Thread {//标识线程已经启用。当为false时,跳出循环。boolean newTasksMayBeScheduled = true;private TaskQueue queue;public void run() {try {mainLoop();} finally {//忽略。。。}}//执行轮询private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired;synchronized(queue) {// Wait for queue to become non-emptywhile (queue.isEmpty() && newTasksMayBeScheduled) {//while循环 让出锁,等待新任务加入queue.wait();}if (queue.isEmpty()){//此时newTasksMayBeScheduled队列已死。退出循环break; }long currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {//判断状态if (task.state == TimerTask.CANCELLED) {//检查已经取消的任务,移除。queue.removeMin();continue;  }currentTime = System.currentTimeMillis();//当前task计划的执行时间。executionTime = task.nextExecutionTime;if (taskFired = (executionTime<=currentTime)) {//当前任务的计划执行时间<=当前时间,则允许执行。if (task.period == 0) { // Non-repeating, remove//只执行一次的任务,移除。queue.removeMin();task.state = TimerTask.EXECUTED;} else { //修改队列中最小的一个task的时间为下一个执行时间,并且重新排序。queue.rescheduleMin(task.period<0 ? currentTime   - task.period: executionTime + task.period);}}}if (!taskFired) // Task hasn't yet fired; wait//最近时间执行的任务还未到时间,需要等待。让出锁。queue.wait(executionTime - currentTime);}if (taskFired)  //执行业务逻辑。这里可以看到是同步执行的。如果业务逻辑耗时较长,会影响后续任务的执行。task.run();} catch(InterruptedException e) {}}}
}

TimerThread 中的 mainLoop 方法+TaskQueue队列,看起来非常熟悉,在queue为空的时候,会调用queue.wait()方法。直到Timer在新增元素时,调用了queue.notify()。这些代码和BlockQueue原理非常像。

1.2.4 TimerTask 类源码分析

public abstract class TimerTask implements Runnable {//执行状态int state = VIRGIN;//下次执行时间,如果是重复的任务,在任务执行前会被更新成下次的执行时间。long nextExecutionTime;// 毫秒数,用于重复执行的时间间隔。证书标识以固定频率调度。负数标识以固定的时间延迟调度。0代表不会重复执行。long period = 0;//抽象犯法,用于实现业务public abstract void run();//省略部分代码。
}

1.3 Timer 调度示意图

通过对Timer的四个核心类,我们可以得出以下调度示意图。

在这里插入图片描述

1.4 Timer 总结

可以看到,Timer 是JDK自带的任务调度器。实现的逻辑如下

  • 实现一个优先队列。队列的头部为最先需要执行的任务。
  • 启动一个后台线程,不断从优先队列中获取待执行的任务。
  • 执行任务。

通过使用Timer,我们可以方便地在一个线程中执行多个计划任务。但是也有一定的局限性,主要是多个任务之间相互影响:

  • 所有的任务都在一个线程中执行,如果前面的任务耗时比较长,则会影响后续任务的执行。
  • 假设前序任务抛出了非InterruptedException的异常,则整个队列将会被清空,任务调度终止。

基于以上局限性,在实际应用中,使用Timer使用得并不多。常用的为 ScheduledExecutorService。ScheduledExecutorService与Timer 的最大区别是将任务提交给线程池处理。

2. ScheduledExecutorService

在前一章节可以了解到,在 Timer 类中所有的任务都是同步执行,如果前序任务发生了阻塞或者耗时比较长,那么后续任务就容易被阻塞到。

JDK在1.5之后J引入了 ThreadPoolExecutor 线程池技术。 线程池技术的逻辑机构图如下:

在这里插入图片描述

(参考聊聊Java进阶之并发基础技术—线程池剖析)

从上图可以看到,线程池也是将不同的任务加入到一个队列中(BlockingQueue),等待着多个线程的调用。与Timer的调度很相似,只是最大区别是线程池队列是被多个线程调用的。

因此JDK在1.5引入了ThreadPoolExecutor的同时,也重新编写了一套新任务调度器-ScheduledExecutorService,具体实现类为ScheduledThreadPoolExecutor,用于任务的调度。

其继承关系如下:
在这里插入图片描述

从继承关系中可以看出,ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,拥有线程池的所有功能。

ScheduledThreadPoolExecutor 在实现上与Timer是相似的,都是通过实现一个优先队列来管理任务,同时这个优先队列又是一个阻塞队列,在获取第一个任务后,只有到了执行时间才会返回任务。一个比较大的改进在于,获取任务后不是直接执行代码,而是交给线程池来调度。

2.1核心类

ScheduledExecutorService 的一些核心类如下:

功能说明
ScheduledExecutorService抽象类
Executors.DelegatedScheduledExecutorService包装类用于包装 ScheduledThreadPoolExecutor,
只暴露关键方法
ScheduledThreadPoolExecutor核心执行器实现类,真正执行调度逻辑的地方
ScheduledThreadPoolExecutor.DelayedWorkQueue延迟阻塞队列任务周期执行的核心方法在这个类中实现
ScheduledThreadPoolExecutor.ScheduledFutureTask队列中的对象是ScheduledThreadPoolExecutor的成员内部类

以上类处于java.util.concurrent包下

2.2 ScheduledExecutorService 的简单用法和介绍

public class Application {public static void main(String[] args) {ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();Runnable runnable = ()->{System.out.println("执行run方法,time="+System.currentTimeMillis()/1000%60+"秒");};scheduledExecutorService.scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);}
}

查看 ScheduledExecutorService 的结构,

在这里插入图片描述

提交任务的方法共有4个,与Timer不同的是,可以提交Callable类型的任务。

public interface ScheduledExecutorService extends ExecutorService {//delay时间后,执行一次任务public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//delay时间后,执行一次任务有返回值的任务public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//以固定频率执行任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//以固定延迟时间执行任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

2.3 ScheduledThreadPoolExecutor 类

ScheduledExecutorService只定义了相应的规范,还需要具体类进行实现。

通过查看 Executors.newSingleThreadScheduledExecutor(),具体实现如下

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}

DelegatedScheduledExecutorService 只是一个包装类,核心逻辑在 ScheduledThreadPoolExecutor。
其构造函数调用了父类的构造函数的时候,传入了 DelayedWorkQueue 延时阻塞队列。

public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {// 构造函数1public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}// 构造函数2public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);}// 构造函数3public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);}// 构造函数4public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);}//ScheduledThreadPoolExecutor 提供了4个构造函数,每个构造函数都调用了父类ThreadPoolExecutor的构造函数,这四个经典参数中,DelayedWorkQueue是不变的,说明它是实现任务队列的关键。//向队列提交任务。// 查看源码,所有提交任务的方法,经过包装后最终会调用delayedExecute,像队列中新增任务。并调用父类的ensurePrestart()方法确认线程池已经准备就绪。private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}}
}

到这时ScheduledExecutorService的秘密浮出水面,核心在于 DelayedWorkQueue。

2.4 DelayedWorkQueue 类实现

DelayedWorkQueue 类的定义如下。可以看到,DelayedWorkQueue的实现了BlockingQueue接口,可以传入JDK的线程池进行消费。

static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {
}

但是具体何如实现队列中的任务,在指定的时间被调度呢?

首先来看一下他的队列实现方式。


static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {/** A DelayedWorkQueue is based on a heap-based data structure* like those in DelayQueue and PriorityQueue, except that* every ScheduledFutureTask also records its index into the* heap array. * ...* * 从以上注释中可以知道 DelayedWorkQueue 本质上也是一个基于堆的数据结构。*///初始化了一个数组private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];//用于新增元素时向上移动元素private void siftUp(int k, RunnableScheduledFuture<?> key) {//省略代码}//去除元素时向下移动元素private void siftDown(int k, RunnableScheduledFuture<?> key) {//省略代码}private final ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition();//查看阻塞队列的take方法public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {//1、从队首获取待执行的任务。此任务在队列中最早执行。RunnableScheduledFuture<?> first = queue[0];if (first == null)// 2 队列中还没有元素,等待。available.await();else {//3 delay=下次执行时间-当前时间。当<=0说明需要被执行。long delay = first.getDelay(NANOSECONDS);if (delay <= 0) {// 4 删除队列的第一个个元素并重新向下排序。返回任务return finishPoll(first);}first = null; // don't retain ref while waiting  等待时去除引用if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {//5 当前线程等待,直到被唤醒或者等待时长结束available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}/*** 查看finishPoll方法。* * 使用最后一个元素替换掉当前元素,并且重新向下排序。* 注意,这时第一个元素已经从队列中去除,这一点与Timer的实现方式不同。* Timer是修改时间了之后,从上往下重新排序。只需要排序一次。* ScheduledExecutorService执行一个定时任务,需要进行两次排序。第一次是获取了task,第二次是真正执行task的时候。* * @param f* @return*/private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)//把队列头部换成x,向下排序siftDown(0, x);setIndex(f, -1);return f;}
}

具体流程如下:
在这里插入图片描述

2.5 ScheduledFutureTask 类

从上一节DelayedWorkQueue类中的take方法和finishPoll方法可知,在线程池获取task后,已经从队列中移走,那么对于重复执行的队列怎么办呢?那就是在线程池执行run方法前,重新将task加到队列中。

// 注意 ScheduledFutureTask 属于  ScheduledThreadPoolExecutor 的成员内部类,因此可以使用ScheduledThreadPoolExecutor方法和成员变量。
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {/** 下次执行时间 */private long time;/** 周期性执行任务的时间间隔 分为0、正、负值。*/private final long period;ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}/**Overrides FutureTask version so as to reset/requeue if periodic. * 重写了 FutureTask 的run方法,主要是增加了重置下个调度时间以及重新排序*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))//判断当前状态能不能运行cancel(false);else if (!periodic)//非周期性的任务,只执行一次ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {//设置下次执行时间setNextRunTime();//直接调用外部类的方法。重新将任务加入到队列中。reExecutePeriodic(outerTask);}}}//重新将任务加入到队列中void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {//往队列加任务super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}
}

2.6 总结

通过分析源码可以看出,ScheduledExecutorService 是通过实现一个优先队列来存储和调度任务的。从原理上来说是和Timer是类似的。可以认为是Timer 的升级版,新增了线程池执行任务的功能。

在这里插入图片描述

ScheduledExecutorService 和 Timer 比较

框架相同不同
Timer都使用堆作为数据结构1、同步执行任务

2、从队列头部获取任务之后,直接修改下次执行时间,直接排序
ScheduledExecutorService1、线程池异步执行任务
2、从头部获取任务后,移除当前任务 ,排序一次。在执行任务前修改时间后,再提交到队列。相当于排序两次

但是ScheduledExecutorService也有一定的局限性,那就是任务只能执行一次或者以固定的时间差周期性执行。不够灵活。

3 Spring Task

Spring Task处于spring-context项目的org.springframework.scheduling包下。可以通过注解的方式,将Spring bean中的某个方法变成一个task,非常方便。而且引入了cron表达式,使用更加灵活。

3.1 Spring Task 简单用法

新建一个maven项目,引入spring-context包

<dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.3.15</version>
</dependency>

新建一个启动类

@Configuration
@EnableScheduling
public class Application {public static void main(String[] args) {AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(Application.class);}@Scheduled(fixedRate = 1*1000)public void schedled(){System.out.println("执行定时任务,time="+System.currentTimeMillis()/1000%60+"秒");}
}

启动main方法后,执行结果如下:

执行定时任务,time=18秒
执行定时任务,time=19秒
执行定时任务,time=20秒
执行定时任务,time=21秒

从以上demo中可以看出,在spring中,只需要两个注解@EnableScheduling和@Scheduled,就可以启动一个定时任务了,非常方便。

3.2 Spring Task核心类

在这里插入图片描述

初始化基本流程
在这里插入图片描述

需要注意的是,Spring Task本质上只定义了提交task的抽象类,并没有提供具体的调度的实现。
为了实现开箱可用,Spring task将ScheduledExecutorService适配成了默认的实现,但是可以根据具体的需要,替换成其他实现。

3.3 Trigger接口及实现类

Trigger排在最前面介绍,是因为Timer和ScheduledExcecutorService是没有Trigger的概念的,都是再在各自的Task类中拥有一个下次执行时间的属性。Timer的TimerTask是private long nextExecutionTime。ScheduledFutureTask的是private long time。为了让任务的触发时间更加灵活,引入了Trigger的概念。
Trigger的本意为触发器,枪的扳机。因此相对的,当一个任务被触发时,常常用fire这个词。
Trigger的本质是封装了获取下一次执行时间的逻辑。

public interface Trigger {// 获取下一次执行时间@NullableDate nextExecutionTime(TriggerContext triggerContext);
}//上下文,用于存储一些时间变量
public interface TriggerContext {default Clock getClock() {return Clock.systemDefaultZone();}//上一次计划执行时间@NullableDate lastScheduledExecutionTime();// 上一次具体执行时间@NullableDate lastActualExecutionTime();//完成时间@NullableDate lastCompletionTime();}
//TriggerContext实现类很简单
public class SimpleTriggerContext implements TriggerContext {private final Clock clock;@Nullableprivate volatile Date lastScheduledExecutionTime;@Nullableprivate volatile Date lastActualExecutionTime;@Nullableprivate volatile Date lastCompletionTime;
}

spring task中提供了两个Trigger实现
在这里插入图片描述
周期性触发器-目前没有看到使用的地方

public class PeriodicTrigger implements Trigger {private final long period;private final TimeUnit timeUnit;private volatile long initialDelay;private volatile boolean fixedRate;@Overridepublic Date nextExecutionTime(TriggerContext triggerContext) {Date lastExecution = triggerContext.lastScheduledExecutionTime();Date lastCompletion = triggerContext.lastCompletionTime();if (lastExecution == null || lastCompletion == null) {return new Date(triggerContext.getClock().millis() + this.initialDelay);}if (this.fixedRate) {return new Date(lastExecution.getTime() + this.period);}return new Date(lastCompletion.getTime() + this.period);}}

CronTrigger-支持cron表达式

public class CronTrigger implements Trigger {private final CronExpression expression;private final ZoneId zoneId;//提供了多个构造函数 完成 expression 和 zoneId的复制public CronTrigger(String expression) {this(expression, ZoneId.systemDefault());}public CronTrigger(String expression, TimeZone timeZone) {this(expression, timeZone.toZoneId());}public CronTrigger(String expression, ZoneId zoneId) {Assert.hasLength(expression, "Expression must not be empty");Assert.notNull(zoneId, "ZoneId must not be null");this.expression = CronExpression.parse(expression);this.zoneId = zoneId;}@Overridepublic Date nextExecutionTime(TriggerContext triggerContext) {Date date = triggerContext.lastCompletionTime();if (date != null) {Date scheduled = triggerContext.lastScheduledExecutionTime();if (scheduled != null && date.before(scheduled)) {// Previous task apparently executed too early...// Let's simply use the last calculated execution time then,// in order to prevent accidental re-fires in the same second.date = scheduled;}}else {date = new Date(triggerContext.getClock().millis());}ZonedDateTime dateTime = ZonedDateTime.ofInstant(date.toInstant(), this.zoneId);ZonedDateTime next = this.expression.next(dateTime);return (next != null ? Date.from(next.toInstant()) : null);}
}

目前,在Spring task中,只有cron类型的任务才用到Trigger,其他的固定固定周期执行和固定频率执行,还是用传统的模式。

3.4 Task父类以及子类

Task是所有Spring task的所有父类,比较简单,就是定义了runnable变量,用来存储业务逻辑,而其他周期性执行,延迟执行等参数留给子类实现

public class Task {private final Runnable runnable;
}

其子类如下:
在这里插入图片描述

从图上可知,Task的子类分为两种:

  1. Trigger触发的Task。代表为CronTask
  2. 固定间歇执行的Task。代表为FixedDelayTask和FixedRateTask。其中FixedDelayTask和FixedRateTask本身没有任何实现,只是为了区分不同的调度方式(还记得Timer中的正负period吗),具体功能都是由父类IntervalTask承担。
public class IntervalTask extends Task {private final long interval;private final long initialDelay;public IntervalTask(Runnable runnable, long interval, long initialDelay) {super(runnable);this.interval = interval;this.initialDelay = initialDelay;}
}
public class FixedRateTask extends IntervalTask {public FixedRateTask(Runnable runnable, long interval, long initialDelay) {super(runnable, interval, initialDelay);}
}
public class FixedDelayTask extends IntervalTask {public FixedDelayTask(Runnable runnable, long interval, long initialDelay) {super(runnable, interval, initialDelay);}
}
public class TriggerTask extends Task {private final Trigger trigger;public TriggerTask(Runnable runnable, Trigger trigger) {super(runnable);this.trigger = trigger;}
}public class CronTask extends TriggerTask {private final String expression;public CronTask(Runnable runnable, String expression) {this(runnable, new CronTrigger(expression));}
}

3.3 TaskScheduler接口逻辑

TaskScheduler 是Spring task中的任务调度接口,定义了一系列提交任务的方法,与 ScheduledExecutorService 角色相当。
方法概览如下:

public interface TaskScheduler {default Clock getClock() {return Clock.systemDefaultZone();}@NullableScheduledFuture<?> schedule(Runnable task, Trigger trigger);ScheduledFuture<?> schedule(Runnable task, Date startTime);ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);//另外提供了一些default方法default ScheduledFuture<?> schedule(Runnable task, Instant startTime) {return schedule(task, Date.from(startTime));}default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis());}default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {return scheduleAtFixedRate(task, period.toMillis());}default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {return scheduleWithFixedDelay(task, Date.from(startTime), delay.toMillis());}default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {return scheduleWithFixedDelay(task, delay.toMillis());}}

与ScheduledExecutorService的接口很类似

public interface ScheduledExecutorService extends ExecutorService {//delay时间后,执行一次任务public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//delay时间后,执行一次任务有返回值的任务public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//以固定频率执行任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//以固定延迟时间执行任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

但需要注意的是新增了一个方法。

ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

这个方法比较特殊,也是实现cron表达式的关键,依靠 Trigger。Trigger在上一节已经有介绍。
看下TaskScheduler,spring提供了三个实现类。
在这里插入图片描述
实现类 ConcurrentTaskScheduler 注解上讲的很明白,就是一个将java.util.concurrent.ScheduledExecutorService 适配成 TaskScheduler 的适配器。
其构造器如下
在这里插入图片描述

ThreadPoolTaskScheduler 则是封装了ScheduledThreadPoolExecutor。
在这里插入图片描述

因此很明显,默认情况下,Spring-task的底层就是由ScheduledExecutorService来提供实际调度的。当然也可以自己实现一个TaskScheduler的实现类,但目前看来并没有理由再造一个这样的轮子。置于为什么没有直接使用ScheduledExecutorService,一是提供了一个新的方法提交Trigger。二是方便拓展,可以自己实现一个任务调度器。

public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler {@Nullableprivate static Class<?> managedScheduledExecutorServiceClass;static {try {// 需要单独引入 javax.enterprise.concurrent-api 包。默认是没有的。ManagedScheduledExecutorServicemanagedScheduledExecutorServiceClass = ClassUtils.forName("javax.enterprise.concurrent.ManagedScheduledExecutorService",ConcurrentTaskScheduler.class.getClassLoader());}catch (ClassNotFoundException ex) {// JSR-236 API not available...managedScheduledExecutorServiceClass = null;}}private ScheduledExecutorService scheduledExecutor;private boolean enterpriseConcurrentScheduler = false;@Nullableprivate ErrorHandler errorHandler;private Clock clock = Clock.systemDefaultZone();public ConcurrentTaskScheduler() {super();this.scheduledExecutor = initScheduledExecutor(null);}public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) {super(scheduledExecutor);this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);}public ConcurrentTaskScheduler(Executor concurrentExecutor, ScheduledExecutorService scheduledExecutor) {super(concurrentExecutor);this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);}private ScheduledExecutorService initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) {if (scheduledExecutor != null) {this.scheduledExecutor = scheduledExecutor;// 当前实现类为 ManagedScheduledExecutorService的子类this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null &&managedScheduledExecutorServiceClass.isInstance(scheduledExecutor));} else {this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();this.enterpriseConcurrentScheduler = false;}return this.scheduledExecutor;}@Override@Nullablepublic ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {try {if (this.enterpriseConcurrentScheduler) {return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);} else {//默认为走到这里ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();}} catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> schedule(Runnable task, Date startTime) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.schedule(decorateTask(task, false), initialDelay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {try {return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), 0, period, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {try {return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), 0, delay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}private Runnable decorateTask(Runnable task, boolean isRepeatingTask) {Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);if (this.enterpriseConcurrentScheduler) {result = ManagedTaskBuilder.buildManagedTask(result, task.toString());}return result;}/*** Delegate that adapts a Spring Trigger to a JSR-236 Trigger.* Separated into an inner class in order to avoid a hard dependency on the JSR-236 API.*/private class EnterpriseConcurrentTriggerScheduler {public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {@Override@Nullablepublic Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {return (trigger.nextExecutionTime(le != null ?new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :new SimpleTriggerContext()));}@Overridepublic boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {return false;}});}}}

3.4 Scheduled 注解

首先看一下Scheduled的代码,看提供了哪些功能呢。

public @interface Scheduled {//核心参数如下//1、cron表达式String cron() default "";//2、固定延迟long fixedDelay() default -1;//3、固定频率long fixedRate() default -1;//4、固定的延迟long initialDelay() default -1;//时间单位,默认是毫秒TimeUnit timeUnit() default TimeUnit.MILLISECONDS;//一下是String类型的配,方便接收配置化的数据,如${fixedDelay:10}String fixedDelayString() default "";String fixedRateString() default "";String initialDelayString() default "";}

Scheduled 中包含了任务调度的相关配置参数。 相比较ScheduledExecutorService,多了cron表达式。在任务的控制上更加灵活,不再局限于固定重复周期。

3.5 ScheduledAnnotationBeanPostProcessor 类

spring-task需要@EnableScheduling开启注解,查看其定义:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}

最重要的是@Import(SchedulingConfiguration.class),将SchedulingConfiguration注入到spring容器中

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}

SchedulingConfiguration只做了一件事情,那就是配置了ScheduledAnnotationBeanPostProcessor。
ScheduledAnnotationBeanPostProcessor实现了Spring的后置处理器,因此在Spring启动后,可以根据相应的配置或者注解,可以筛选出对应的方法,封装成为ScheduledTask,等待被调用。在Spring初始化完成后将会触发任务的调度。
首先来看下 ScheduledMethodRunnable。ScheduledAnnotationBeanPostProcessor一个重要的目标就是将注解的方法封装成为ScheduledMethodRunnable。

public class ScheduledMethodRunnable implements Runnable {private final Object target;private final Method method;public ScheduledMethodRunnable(Object target, Method method) {this.target = target;this.method = method;}@Overridepublic void run() {try {ReflectionUtils.makeAccessible(this.method);this.method.invoke(this.target);} catch (InvocationTargetException ex) {}}
}
//只保留核心代码
public class ScheduledAnnotationBeanPostProcessorimplements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {//用来缓存task和TaskScheduler调度器private final ScheduledTaskRegistrar registrar;public ScheduledAnnotationBeanPostProcessor() {this.registrar = new ScheduledTaskRegistrar();}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) {// 忽略...// 查找Scheduled注解的MethodMap<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);});annotatedMethods.forEach((method, scheduledAnnotations) ->scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));//忽略。。}//处理Spring beanprotected void processScheduled(Scheduled scheduled, Method method, Object bean) {try {//将Spring bean 封装成为一个Runnable 。在执行Runnable方法时,使用反射技术 method.invoke(this.target),执行原本逻辑即可。Runnable runnable = createRunnable(bean, method);// 处理延迟时间long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());String initialDelayString = scheduled.initialDelayString();if (StringUtils.hasText(initialDelayString)) {Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");if (this.embeddedValueResolver != null) {initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);}if (StringUtils.hasLength(initialDelayString)) {try {initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");}}}// 1、处理cron表达式String cron = scheduled.cron();if (StringUtils.hasText(cron)) {//处理一下时区的问题String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}if (StringUtils.hasLength(cron)) {Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");processedSchedule = true;if (!Scheduled.CRON_DISABLED.equals(cron)) {TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));}}}// At this point we don't need to differentiate between initial delay set or not anymoreif (initialDelay < 0) {initialDelay = 0;}// 2、处理// Check fixed delaylong fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());if (fixedDelay >= 0) {processedSchedule = true;tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}// 3、处理字符串形式的Scheduled ,比如配置化${time:1000}String fixedDelayString = scheduled.fixedDelayString();//延迟if (StringUtils.hasText(fixedDelayString)) {if (this.embeddedValueResolver != null) {fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);}if (StringUtils.hasLength(fixedDelayString)) {processedSchedule = true;try {fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}}// 3 固定频率的任务long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());if (fixedRate >= 0) {processedSchedule = true;tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}String fixedRateString = scheduled.fixedRateString();if (StringUtils.hasText(fixedRateString)) {if (this.embeddedValueResolver != null) {fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);}if (StringUtils.hasLength(fixedRateString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}}//忽略其他情况}catch (Exception e){}}protected Runnable createRunnable(Object target, Method method) {Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());return new ScheduledMethodRunnable(target, invocableMethod);}//监听容器刷新事件public void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext() == this.applicationContext) {finishRegistration();}}//这部分主要是从Spring中获取配置的 scheduler 只保留核心代码private void finishRegistration() {if (this.scheduler != null) {this.registrar.setScheduler(this.scheduler);}//检查是否做了配置SchedulingConfigurerif (this.beanFactory instanceof ListableBeanFactory) {Map<String, SchedulingConfigurer> beans =((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());AnnotationAwareOrderComparator.sort(configurers);for (SchedulingConfigurer configurer : configurers) {configurer.configureTasks(this.registrar);}}/*** 获取调度器的核心代码*/if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {try {//1、根据类型获取TaskScheduler实现类this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));}catch (NoUniqueBeanDefinitionException ex) {try {//2、实现类不唯一,尝试使用默认名称taskScheduler查找this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));}catch (NoSuchBeanDefinitionException ex2) {}}catch (NoSuchBeanDefinitionException ex) {try {//3、根据类型获取 ScheduledExecutorService 实现类this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));}catch (NoUniqueBeanDefinitionException ex2) {try {//4、实现类不唯一,尝试使用默认名称taskScheduler查找this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));}catch (NoSuchBeanDefinitionException ex3) {}}catch (NoSuchBeanDefinitionException ex2) {// Giving up -> falling back to default scheduler within the registrar...}}}this.registrar.afterPropertiesSet();}
}

从 ScheduledAnnotationBeanPostProcessor 源码可以看出,经过处理后,所有的任务和执行器都存放于ScheduledTaskRegistrar中。通过调用afterPropertiesSet()来启动任务。总结来说,做了以下3件事情

1、将被注解的方法封装成为 Task。
2、从容器中查找合适的 TaskScheduler。
3、将1和2都存到ScheduledTaskRegistrar。

其中查找 TaskScheduler 的来源分为4个。

1、调用ScheduledAnnotationBeanPostProcessor实例的set方法。
2、配置SchedulingConfigurer实现类到spring容器中。
3、配置 TaskScheduler 实现类到spring容器中。
4、配置ScheduledExecutorService实现类到Spring 容器中。
5、在1-4都没有做的情况下ScheduledTaskRegistrar会直接调用 Executors.newSingleThreadScheduledExecutor()获取一个ScheduledExecutorService。

其中1-4在 ScheduledAnnotationBeanPostProcessor 中实现, 5在ScheduledTaskRegistrar中实现。

3.6 ScheduledTaskRegistrar 类

ScheduledTaskRegistrar是一个核心类,也是一个容器类,保存了所有的task的定义(这地方和xxl-job的类似)。同时也是真正将Task提交给调度器的地方。具体看以下代码。

public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {//处理器,是Spring的类private TaskScheduler taskScheduler;//默认处理器 JDKprivate ScheduledExecutorService localExecutor;//根据不同的配置,缓存了不同的类型的task。名字与Scheduled中配置基本一样。@Nullableprivate List<TriggerTask> triggerTasks;@Nullableprivate List<CronTask> cronTasks;@Nullableprivate List<IntervalTask> fixedRateTasks;@Nullableprivate List<IntervalTask> fixedDelayTasks;private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<>(16);//已经加入执行器中的任务private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>(16);@Overridepublic void afterPropertiesSet() {scheduleTasks();}//具体将任务提交到执行器中的方法protected void scheduleTasks() {if (this.taskScheduler == null) {//当容器中没有找到执行器的时候,将会使用 ScheduledExecutorService。此时只有一个线程处理。this.localExecutor = Executors.newSingleThreadScheduledExecutor();//需要将ScheduledExecutorService 封装成为 TaskScheduler 才能够使用this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}}public ScheduledTask scheduleTriggerTask(TriggerTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addTriggerTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Nullablepublic ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Deprecated@Nullablepublic ScheduledTask scheduleFixedRateTask(IntervalTask task) {FixedRateTask taskToUse = (task instanceof FixedRateTask ? (FixedRateTask) task :new FixedRateTask(task.getRunnable(), task.getInterval(), task.getInitialDelay()));return scheduleFixedRateTask(taskToUse);}@Nullablepublic ScheduledTask scheduleFixedRateTask(FixedRateTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {if (task.getInitialDelay() > 0) {Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());}else {scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());}}else {addFixedRateTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Deprecated@Nullablepublic ScheduledTask scheduleFixedDelayTask(IntervalTask task) {FixedDelayTask taskToUse = (task instanceof FixedDelayTask ? (FixedDelayTask) task :new FixedDelayTask(task.getRunnable(), task.getInterval(), task.getInitialDelay()));return scheduleFixedDelayTask(taskToUse);}@Nullablepublic ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {if (task.getInitialDelay() > 0) {Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());scheduledTask.future = this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());}else {scheduledTask.future = this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());}}else {addFixedDelayTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}}

3.7 如何执行cron表达式的任务

从前面的代码中我们知道, Spring-task默认使用 ScheduledExecutorService 作为底层逻辑,但是ScheduledExecutorService并不支持cron表达式。不过可以通过将cron表达式的任务封装成ScheduledExecutorService支持的参数即可。基本思想是将任务当成一次延时任务即可,等执行完上一次任务之后,如果还有下次,则重新提交到调度器。也就是:

1、将task在封装一层成为 ReschedulingRunnable,
2、计算cron的下次执行时间与当前的时间差delay。
3、调用提交任务。让任务延迟delay执行一次,注意只执行一次。
4、执行业务 run 方法。
5、重复执行2-4即可。

这个思路与ScheduledExecutorService获取task后再提交到队列中的思路是一样的。具体代码参考 ReschedulingRunnable 类和 CronTask 类。

在这里插入图片描述

class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {// trigger 封装了cron和获取下次触发时间的类private final Trigger trigger;// 上下文private final SimpleTriggerContext triggerContext;// 执行器private final ScheduledExecutorService executor;@Nullableprivate ScheduledFuture<?> currentFuture;@Nullableprivate Date scheduledExecutionTime;private final Object triggerContextMonitor = new Object();//重新将task封装成为 ReschedulingRunnable public ReschedulingRunnable(Runnable delegate, Trigger trigger, Clock clock,ScheduledExecutorService executor, ErrorHandler errorHandler) {super(delegate, errorHandler);this.trigger = trigger;this.triggerContext = new SimpleTriggerContext(clock);this.executor = executor;}@Overridepublic void run() {//实际执行时间Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());//执行业务方法super.run();//完成时间Date completionTime = new Date(this.triggerContext.getClock().millis());synchronized (this.triggerContextMonitor) {//更新相关参数this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);//判断是否可以继续执行if (!obtainCurrentFuture().isCancelled()) {//继续提交任务schedule();}}}@Nullablepublic ScheduledFuture<?> schedule() {synchronized (this.triggerContextMonitor) {//获取下次执行时间this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);if (this.scheduledExecutionTime == null) {//没有下次时间,终止执行return null;}long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();//提交到线程池 ScheduledExecutorService。只执行一次。this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);return this;}}//省略其他代码。。。
}

3.8 Spring task使用注意事项

通过ScheduledAnnotationBeanPostProcessor查找TaskScheduler的过程,以及ScheduledTaskRegistrar调度过程,我们知道,如果我们没有配置TaskScheduler实例,默认情况下使用 Executors.newSingleThreadScheduledExecutor()新建了一个实例,这个实例只有一个线程处理任务,在任务耗时比较高的情况下会有可能发生阻塞。最好是配置一个ScheduledExecutorService实例交给Spring管理
如:

@Configuration
@EnableScheduling
public class Application2 {public static void main(String[] args) {AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(Application2.class);}@Scheduled(fixedRate = 100)public void schedled(){System.out.println("执行定时任务,time="+System.currentTimeMillis()/1000%60+"秒。threadName="+Thread.currentThread().getName());}@Beanpublic ScheduledExecutorService executorService(){ThreadFactory factory = new ThreadFactory() {private  int seq = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"定时任务线程池 seq="+seq++);}};ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, factory);return scheduledExecutorService;}
}

3.9 总结

spring task中任务处理器为TaskScheduler实现类,任务为Task的子类。
基本的思想还是与 ScheduledExecutorService 想类似的。在默认情况下也是使用ScheduledExecutorService作为任务的处理器。

4 XXL-JOB

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。由于是国产开发的,中文文档,而且很全面,具体使用方法可以直接看官方文档 《分布式任务调度平台XXL-JOB》-官方文档 。

fe最初XXL-JOB的底层是Quartz,后来发现Quartz比较复杂,不利于扩展和维护,因此自研了一个调度器,简单但是很实用,很值得学习。

4.1 为什么我们需要一个分布式的调度平台

前面介绍的Timer, ScheduledExcutorService 和 Spring Task,都是针对单个实例内的任务调度,在分布式部署的时候,可能会有一定的问题。比如假设有一个任务A,是给用户发送消息的,设置每一秒执行一次,如果部署了3个实例,那么就会变成每秒执行3次,调度频率随着实例的增多而增多,如果没有加全局锁,会出现重复发送的问题。此外在实际的业务中,我们还有可能需要随时JOB的调度周期,随时停止和启动一个任务等,这些操作都需要发版才能实现。因此在分布式系统中,有一个分布式调度器尤为重要。

4.1.1 分布式调度平台的设计思路

通过前面的几个任务调度工具的介绍,我们可以总结出来,一个任务调度系统,包含以下几个元素:

在这里插入图片描述

其中,Runnable(业务逻辑)可以是任意的可执行的业务逻辑,包括一个HTTP请求。如果是通过HTTP请求调度其他服务的任务,那么就实现了一个最基本的任务调度服务。

在这里插入图片描述

但是,如果我们还希望任务的调度更加灵活,比如可以随时修改调度的频率,随时停止和启动,那么就必须将触发器持久化,比如存储到Mysql等。

在这里插入图片描述

除此以外,我们还希望这个调度服务是高可用的,因此多实例部署,加一个锁就可以了

在这里插入图片描述

4.2 XXL-JOB 的模块

从github 上下载源码,可以看到XXL-JOB的核心模块分为2个,xxl-job-admin 和 xxl-job-core。另外的xxl-job-executor-samples是一个例子模块

在这里插入图片描述

核心模块的作用如下:

模块说明功能
xxl-job-admin服务端(调度中心)管理界面+任务调度
xxl-job-core客户端(执行器)在项目中引用,执行业务逻辑

从模块划分可以看出,xxl-job的任务调度和任务的执行是分开的,客户端只管执行任务,不用管任务单的调度。任务调度是由服务端执行,这样各司其职,进行了解耦,提高系统整体稳定性和扩展性

官方架构图
在这里插入图片描述

如果大家心中没有任务调度的概念,直接看官方架构图是有些吃力的,因此我做了简化,保留了核心部分,如下图:

在这里插入图片描述

从上图可以看出,XXL-JOB框架分为三个结构。

1、Mysql:存储相关信息
2、客户端:将自己注册到服务端,等待任务下发。
3、服务端:维护JOB的信息,将需要执行的JOB,通过一定的策略,找到对应的客户端地址,发送HTTP请求,客户端执行即可。

4.3 xxl-job-admin 解析

服务端核心类如下:

功能
JobScheduleHelper调度器,将需要执行的JOB提交到线程池
XxlJobInfo实体类,记录了一个任务的配置,持久化到mysql中
JobTriggerPoolHelper线程池,多个线程发送job到客户端
XxlJobTrigger触发器, 真正处理 XxlJobInfo并发送到制定的客户端
JobRegistryHelper注册中心 ,接收客户端的注册和心跳

以上类处于com.xxl.job.admin.core包

4.3.1 服务端初始化入口

admin的初始化入口在XxlJobAdminConfig,是InitializingBean的实现类,因此在spring配置文件初始化完成后就触发了XXL-JOB的初始化

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {private XxlJobScheduler xxlJobScheduler;@Overridepublic void afterPropertiesSet() throws Exception {adminConfig = this;xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();}//省略
}

从上边可以看到,真正执行初始化的类是XxlJobScheduler,初始化逻辑在init()方法中。

public class XxlJobScheduler  {//启动了一些列的线程或者线程池来来处理相关逻辑public void init() throws Exception {// init i18ninitI18n();// admin trigger pool start  触发器,触发http请求。接收JobScheduleHelper发送过来的triggerJobTriggerPoolHelper.toStart();// admin registry monitor run  处理注册上来的服务IP地址,并监控心跳等信息JobRegistryHelper.getInstance().start();// admin fail-monitor run  生成一些任务失败的告警等JobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper )  // 处理执行完成的任务,并监控丢失执行器的任务记录为失败。JobCompleteHelper.getInstance().start();// admin log report start  调度统计JobLogReportHelper.getInstance().start();// start-schedule  ( depend on JobTriggerPoolHelper )// 调度器JobScheduleHelper.getInstance().start();}//省略。。。
}

服务端初始化流程图如下:

在这里插入图片描述

XXL-JOB服务端在初始化过程中启动了多个后台线程或者线程池,用于异步处理多项任务。

4.3.2 JobScheduleHelper 类

从前面对任务调度的介绍可以看出,一个任务调度器,离不开

1.带有执行时间的任务列表
2.轮询执行任务的调度器

XXL-JOB 也不例外。其中JobScheduleHelper类就属于轮询执行任务的调度器,包含了任务调用的基本逻辑,属于必看的类

具体代码如下


public class JobScheduleHelper {//预读5秒public static final long PRE_READ_MS = 5000;    // pre read//时间轮刻度-任务ID映射表private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();public void start(){scheduleThread = new Thread(new Runnable() {@Overridepublic void run() {// 按照默认配置 (快200+慢100)* 20 = 6000int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();//数据库连接Connection conn = null;PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();conn.setAutoCommit(false);//加锁preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、预读 查询数据库,获取下次执行时间 <= 当前时间+5秒 的所有JOBlong nowTime = System.currentTimeMillis();//SELECT * FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time <= #{maxNextTime} ORDER BY id ASC LIMIT #{pagesize}List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、遍历处理JOB,看是直接提交给线程池还是先提交到 time-ring后再提交给线程池。for (XxlJobInfo jobInfo: scheduleList) {// time-ring jumpif (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、超时>5秒以上,当做错失触发时机// 1、超时触发策略MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);}// 2、更新时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、超时时间在5秒以内// 1、提交到线程池JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);// 2、刷新一次时间refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {//下次执行在5秒内,说明下次循环还有它,可以再预读一次,直接提交到时间轮,提高想能// 1、计算时间轮刻度int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、提交到时间轮pushTimeRing(ringSecond, jobInfo.getId());// 3、再更新一次时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、未到执行时间// 1、计算时间轮刻度(60秒时间轮)int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、提交到时间轮线程pushTimeRing(ringSecond, jobInfo.getId());// 3、刷新refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、将修改了下次执行时间的任务存到数据库中for (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}} finally {//省略处理数据库连接}long cost = System.currentTimeMillis()-start;//省略。。。}}});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();// ring threadringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {//时间轮代码,忽略}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();}//忽略其他代码
}

源码逻辑解析
在这里插入图片描述

从以上逻辑看,XXL-JOB的核心逻辑与JDK的 ScheduledExecutorService 是基本类似的。都是先从一个队列(xxl-job是使用mysql排序)中取出JOB,然后提交给线程池处理。

但有一点区别:XXL-JOB是从数据库读取数据,因此为了提高性能,做了一个预读5秒的变化。未到时间执行的job提交给时间轮,再由时间轮提交给线程池处理。

4.3.3 XxlJobTrigger 类

经过JobScheduleHelper调度,job的参数会被提交的线程池,线程池由JobTriggerPoolHelper实现,比较简单,不再描述,然后最终会使用 XxlJobTrigger 是触发执行job的地方。

public class XxlJobTrigger {public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {//1、 从数据库获取job配置XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {return;}if (executorParam != null) {//如果传参就覆盖jobInfo.setExecutorParam(executorParam);}int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();// 2 从数据库获取分组信息(本质就是获取接收job的地址)XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressListif (addressList!=null && addressList.trim().length()>0) {group.setAddressType(1);//传参的话就覆盖group.setAddressList(addressList.trim());}// sharding paramint[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {//分片for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {if (shardingParam == null) {//分片参数,这里意思是只发给一个地址。shardingParam = new int[]{0, 1};}processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}/*** @param group                     job group, registry list may be empty* @param jobInfo* @param finalFailRetryCount  纯粹是为了打日志* @param triggerType          为了打日志* @param index                     sharding index* @param total                     sharding index*/private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){//阻塞策略ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy//路由策略 就是如何发给客户端,如第一个,最后一个,一致性哈希ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy//分片参数 日志记录用String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-idXxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());//记录日志XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);// 2、组装参数TriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init addressString address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {//广播模式,获取指定下标的 url 。if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {//根据路由策略获取 url.routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executorReturnT<String> triggerResult = null;if (address != null) {triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger info//忽略一长串组装日志代码//保存日志XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);}//根据地址url,将参数发送到指定的客户端。public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;}}

总结以上XxlJobTrigger类的代码,做了几件事

1、根据jobId从数据库获取job参数
2、根据job参数获取groupId后,再获取分组信息,里面包含了分组中的客户端 ip:port地址。
3、根据路由策略,获取指定的address,将job参数通过http发送往客户端。
4、记录日志。

4.3.4 JobRegistryHelper 类

客户端会定时上报自身的ip+port,JobRegistryHelper就是专门处理这些信息的。
实现类比较简单,就补贴源码了,只讲一下逻辑。
1、定义了一个线程池,专门保存或者修改用来处理客户端上报的address。
2、定义了一个后台线程,周期性(30s)处理以下逻辑:清除过期的客户端注册信息(30*3s不上报),将最新的address更新到各自的任务组中。

4.3.4 路由策略

路由策略抽象类为 ExecutorRouter,在配置job的时候指定的路由策略,就有对应的ExecutorRouter子类去实现。

public abstract class ExecutorRouter {/*** route address* @param addressList* @return  ReturnT.content=address*/public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);}

1、第一个:直接取addressList第一个地址
2、最后一个:直接取 addressList最后地址
3、轮询: 对调度次数进行计数n,n%addressList.size获取地址下标。
4、随机: 随机取一个。
5、一致性hash: 使用java.util.TreeMap.tailMap()方法来实现。负载均衡之一致性哈希环算法
6、最不经常使用:LFU(Least Frequently Used):最不经常使用,频率/次数
7、最近最久未使用:LRU(Least Recently Used):最近最久未使用,时间
8、故障转移:对addressList进行循环http请求,第一个正常返回的地址作为调度地州。
9、忙碌转移:通过http请求客户端检查JobThread,第一个空闲的客户端作为调度客户端。
10、分片广播:发送给所有的客户端。

4.4 客户端逻辑

客户端核心类如下:

在这里插入图片描述

功能说明
XxlJobTask注解被标注的方法将会被处理成为 IJobHandler, 与@Scheduled注解功能相似 (19年底新增注解)。 每个IJobHandler有唯标识
EmbedServer客户端server启动一个netty,用于接收服务端的调度
ExecutorBizImpl处理服务端的请求EmbedServer接收请求后,实际交给ExecutorBizImpl进行处理,里面有处理阻塞策略
TriggerParam触发参数记录服务端发送过来的任务
JobThreadJob线程用 LinkedBlockingQueue 缓存服务端传递过来的 TriggerParam。轮询 LinkedBlockingQueue,顺序处理同一个job的任务
IJobHandlerTask抽象类被@XxlJob的注释的方法,或者通过服务端传递过来的代码,将会封装成为一个 IJobHandler 实现类
XxlJobContext上下文内置InheritableThreadLocal,在线程中存储变量,供给IJobHandler

从以上表格基本可以看出客户端的执行逻辑,其中比较重要的是 ExecutorBizImpl 和 JobThread,以及XxlJob注解的原理。 将会对这三个类进行介绍。

4.4.1 @XxlJob 注解原理

在客户端引入XXL-JOB的时候,一般需要进行如下配置

    @Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}

查看XxlJobSpringExecutor 具体代码如下

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {// start@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}}//通过Spring的ApplicationContext,获取到使用了@XxlJob注解的类,缓存起来。private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from methodString[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();// 父类核心方法registJobHandler(xxlJob, bean, executeMethod);}}}}

继续看父类XxlJobExecutor,可以看到使用一个ConcurrentMap缓存了包装过的业务方法。其中key为每个job的唯一标识,与服务端的key一一对应。

public class XxlJobExecutor {//使用一个ConcurrentMap缓存了包装过的业务方法。其中key为每个job的唯一标识,与服务端的key一一对应。private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob == null) {return;}String name = xxlJob.value();//make and simplify the variables since they'll be called several times laterClass<?> clazz = bean.getClass();String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");}if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}executeMethod.setAccessible(true);// init and destroyMethod initMethod = null;Method destroyMethod = null;//初始化代码if (xxlJob.init().trim().length() > 0) {try {initMethod = clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}//销毁代码if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");}}// registry jobhandlerregistJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}
}

最终包装成为了MethodJobHandler

public class MethodJobHandler extends IJobHandler {private final Object target;private final Method method;private Method initMethod;private Method destroyMethod;
}

以上流程总结如下

在spring启动,所有单例类都创建完成后,触发从ApplicationContext获取所有标注了 @XxlJob的bean和对应方法。最终封装成为MethodJobHandler,存储到了了一个ConcurrentMap中。key为JOB的唯一标识,与服务端一对一对应。等待服务端的调用。

4.4.2 JobThread

从@XxlJob的原理,可以看到,一个job最终会被封装成为MethodJobHandler,那么客户端如何处理服务端下发的调度任务呢。
J
obThread是真正客户端真正执行任务的地方。每一个JAVA类型的JOB都会对应一个JobThread。

public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;//标注了@XxlJob的方法或者从前端传过来的代码脚本。private IJobHandler handler;// 存储服务端穿过来的请求,如果前一个任务没有执行文,后续的会继续存在这里。private LinkedBlockingQueue<TriggerParam> triggerQueue;// 服务端每发送一次到客户端,会生成一个唯一的JOBid,可以用来做幂等,防止HTTP请求重试等造成重复调用。private Set<Long> triggerLogIdSet;private volatile boolean toStop = false;private String stopReason;private boolean running = false;    // if running jobprivate int idleTimes = 0;			// idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());// assign job thread namethis.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());}public IJobHandler getHandler() {return handler;}//存储服务端调度请求public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}//杀死调度任务public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}//启动线程@Overridepublic void run() {// inittry {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// 死循环知道停止while(!toStop){running = false;//统计空闲次数 超过30次就终止线程idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());// 创建日志文件,用于存储日志,日志异步上报。String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// 初始化上下文,使用InheritableThreadLocal保存XxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());if (triggerParam.getExecutorTimeout() > 0) {// 设定调度过期时间Thread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {//没有过期时间,直接执行handler.execute();}// 处理执行结果if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {if (idleTimes > 30) {if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler infoif (!toStop) {// 提交处理结果到队列中,等待上报TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// 提交处理结果到队列中,等待上报TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroytry {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());}
}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

4.4.3 ExecutorBizImpl

客户端在启动后,会在EmbedServer实例中启动一个netty专门接收从服务端发送来的请求。其中包括检查服务端心跳,job线程心跳,终止调度,读取日志以及JOB调度等,都是交给ExecutorBizImpl进行处理。这里主要介绍下任务调度的过程:

public class ExecutorBizImpl implements ExecutorBiz {@Overridepublic ReturnT<String> run(TriggerParam triggerParam) {// 根据id获取 JOB的执行线程JobThreadJobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());// 获取jobThread内部的 jobHandlerIJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;String removeOldReason = null;// valid:jobHandler + jobThreadGlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());if (GlueTypeEnum.BEAN == glueTypeEnum) {// new jobhandlerIJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThreadif (jobThread!=null && jobHandler != newJobHandler) {// change handler, need kill old threadremoveOldReason = "change jobhandler or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = newJobHandler;if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof GlueJobHandler&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {try {IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof ScriptJobHandler&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");}// executor block strategyif (jobThread != null) {ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// discard when runningif (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// kill running jobThreadif (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread = null;}} else {// just queue trigger}}// replace thread (new or exists invalid)if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queueReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);return pushResult;}
}

4.5 几个表格作用

xxl_job_group:任务分组。一个执行器算作一个组。每组下面会记录对应的实例地址。
在这里插入图片描述

xxl_job_info:具体的任务信息。
xxl_job_lock:分布式锁
xxl_job_log:日志
xxl_job_log_report:调度统计
xxl_job_logglue: 可以记录GLUE模式代码历史版本
xxl_job_registry:注册信息表,每一台机器注册上来,都会记录一条记录。

在这里插入图片描述

xxl_job_user:用户表

其中xxl_job_group,xxl_job_info,xxl_job_lock是调度器的关键,其他的是起到支撑辅助作用。

5 Quartz

5.1 什么是 Quartz

从 Quartz官网 简介可以知道,
Quartz 是一个开源的任务调度框架,可以用于单体应用,也可以用于大型的电子商务平台,支持成千上万的任务。

5.1 Quartz 简单demo

新建一个maven项目,引入依赖

<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.1</version>
</dependency>
public class Application {public static void main(String[] args) {try {Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();scheduler.start();//定义一个工作对象 设置工作名称与组名JobDetail job = JobBuilder.newJob(HelloJob.class).withIdentity("job41","group1").build();//定义一个触发器 简单Trigger 设置工作名称与组名 5秒触发一次Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1","group1").startNow().withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5)).build();//设置工作 与触发器scheduler.scheduleJob(job, trigger);
//            scheduler.shutdown();} catch (SchedulerException se) {se.printStackTrace();}}
}

Scheduler 是一个接口定义了一系列提交job的方法。
其实现类如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0GBKRmSg-1646875792192)(./assets/JAVA任务调度技术-1645528734979.png)]

查看 StdScheduler 本质上是一个代理类,代理了 QuartzScheduler 类的所有方法。

挑选一个方法来看:

public class QuartzScheduler implements RemotableQuartzScheduler {//存储jobprivate QuartzSchedulerResources resources;public Date scheduleJob(JobDetail jobDetail,Trigger trigger) throws SchedulerException {//省略校验OperableTrigger trig = (OperableTrigger) trigger;if (trigger.getJobKey() == null) {trig.setJobKey(jobDetail.getKey());} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {throw new SchedulerException("Trigger does not reference given job!");}trig.validate();Calendar cal = null;if (trigger.getCalendarName() != null) {cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());}Date ft = trig.computeFirstFireTime(cal);if (ft == null) {throw new SchedulerException("Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");}resources.getJobStore().storeJobAndTrigger(jobDetail, trig);notifySchedulerListenersJobAdded(jobDetail);notifySchedulerThread(trigger.getNextFireTime().getTime());notifySchedulerListenersSchduled(trigger);return ft;}//start方法public void start() throws SchedulerException {if (shuttingDown|| closed) {throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");}// QTZ-212 : calling new schedulerStarting() method on the listeners// right after entering start()notifySchedulerListenersStarting();if (initialStart == null) {initialStart = new Date();//启动了线程this.resources.getJobStore().schedulerStarted();startPlugins();} else {resources.getJobStore().schedulerResumed();}schedThread.togglePause(false);getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");notifySchedulerListenersStarted();}
}

可以看到存储和触发的代码

resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

其中resources.getJobStore() 为 JobStore 实例。用于存储job和triger提供给 QuartzScheduler 使用。

JOB Store

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OIbYPdrE-1646875792193)(./assets/JAVA任务调度技术-1645529673682.png)]

RAMJobStore 内存行存储,单机情况下默认。
JDBC JobStore 数据库。
查看 RAMJobStore 。

public class RAMJobStore implements JobStore {/** ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~* * Data members.* * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/protected HashMap<JobKey, JobWrapper> jobsByKey = new HashMap<JobKey, JobWrapper>(1000);protected HashMap<TriggerKey, TriggerWrapper> triggersByKey = new HashMap<TriggerKey, TriggerWrapper>(1000);protected HashMap<String, HashMap<JobKey, JobWrapper>> jobsByGroup = new HashMap<String, HashMap<JobKey, JobWrapper>>(25);protected HashMap<String, HashMap<TriggerKey, TriggerWrapper>> triggersByGroup = new HashMap<String, HashMap<TriggerKey, TriggerWrapper>>(25);protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator());protected HashMap<String, Calendar> calendarsByName = new HashMap<String, Calendar>(25);protected Map<JobKey, List<TriggerWrapper>> triggersByJob = new HashMap<JobKey, List<TriggerWrapper>>(1000);protected final Object lock = new Object();protected HashSet<String> pausedTriggerGroups = new HashSet<String>();protected HashSet<String> pausedJobGroups = new HashSet<String>();protected HashSet<JobKey> blockedJobs = new HashSet<JobKey>();protected long misfireThreshold = 5000l;protected SchedulerSignaler signaler;//获取下一个任务public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {synchronized (lock) {List<OperableTrigger> result = new ArrayList<OperableTrigger>();Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();long batchEnd = noLaterThan;// return empty list if store has no triggers.if (timeTriggers.size() == 0)return result;while (true) {TriggerWrapper tw;try {tw = timeTriggers.first();if (tw == null)break;timeTriggers.remove(tw);} catch (java.util.NoSuchElementException nsee) {break;}if (tw.trigger.getNextFireTime() == null) {continue;}if (applyMisfire(tw)) {if (tw.trigger.getNextFireTime() != null) {timeTriggers.add(tw);}continue;}if (tw.getTrigger().getNextFireTime().getTime() > batchEnd) {timeTriggers.add(tw);break;}// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then// put it back into the timeTriggers set and continue to search for next trigger.JobKey jobKey = tw.trigger.getJobKey();JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail;if (job.isConcurrentExectionDisallowed()) {if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {excludedTriggers.add(tw);continue; // go to next trigger in store.} else {acquiredJobKeysForNoConcurrentExec.add(jobKey);}}tw.state = TriggerWrapper.STATE_ACQUIRED;tw.trigger.setFireInstanceId(getFiredTriggerRecordId());OperableTrigger trig = (OperableTrigger) tw.trigger.clone();if (result.isEmpty()) {batchEnd = Math.max(tw.trigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;}result.add(trig);if (result.size() == maxCount)break;}// If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.if (excludedTriggers.size() > 0)timeTriggers.addAll(excludedTriggers);return result;}}
}

从以上源码可知 RAMJobStore 是存储job以及相关参数的地方。
其中特别注意的是timeTriggers属性,其使用TreeSet来保存,这个是实现类似Timer优先队列的作用。

QuartzSchedulerThread 的run方法,就是去获取任务的地方。

public class QuartzSchedulerThread extends Thread {/*** <p>* The main processing loop of the <code>QuartzSchedulerThread</code>.* </p>*/@Overridepublic void run() {int acquiresFailed = 0;while (!halted.get()) {try {// check if we're supposed to pause...synchronized (sigLock) {while (paused && !halted.get()) {try {// wait until togglePause(false) is called...sigLock.wait(1000L);} catch (InterruptedException ignore) {}// reset failure counter when paused, so that we don't// wait again after unpausingacquiresFailed = 0;}if (halted.get()) {break;}}// wait a bit, if reading from job store is consistently// failing (e.g. DB is down or restarting)..if (acquiresFailed > 1) {try {long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);Thread.sleep(delay);} catch (Exception ignore) {}}int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...List<OperableTrigger> triggers;long now = System.currentTimeMillis();clearSignaledSchedulingChange();try {triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());acquiresFailed = 0;if (log.isDebugEnabled())log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");} catch (JobPersistenceException jpe) {if (acquiresFailed == 0) {qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.",jpe);}if (acquiresFailed < Integer.MAX_VALUE)acquiresFailed++;continue;} catch (RuntimeException e) {if (acquiresFailed == 0) {getLog().error("quartzSchedulerThreadLoop: RuntimeException "+e.getMessage(), e);}if (acquiresFailed < Integer.MAX_VALUE)acquiresFailed++;continue;}if (triggers != null && !triggers.isEmpty()) {now = System.currentTimeMillis();long triggerTime = triggers.get(0).getNextFireTime().getTime();long timeUntilTrigger = triggerTime - now;while(timeUntilTrigger > 2) {synchronized (sigLock) {if (halted.get()) {break;}if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {try {// we could have blocked a long while// on 'synchronize', so we must recomputenow = System.currentTimeMillis();timeUntilTrigger = triggerTime - now;if(timeUntilTrigger >= 1)sigLock.wait(timeUntilTrigger);} catch (InterruptedException ignore) {}}}if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {break;}now = System.currentTimeMillis();timeUntilTrigger = triggerTime - now;}// this happens if releaseIfScheduleChangedSignificantly decided to release triggersif(triggers.isEmpty())continue;// set triggers to 'executing'List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();boolean goAhead = true;synchronized(sigLock) {goAhead = !halted.get();}if(goAhead) {try {List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);if(res != null)bndles = res;} catch (SchedulerException se) {qs.notifySchedulerListenersError("An error occurred while firing triggers '"+ triggers + "'", se);//QTZ-179 : a problem occurred interacting with the triggers from the db//we release them and loop againfor (int i = 0; i < triggers.size(); i++) {qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));}continue;}}for (int i = 0; i < bndles.size(); i++) {TriggerFiredResult result =  bndles.get(i);TriggerFiredBundle bndle =  result.getTriggerFiredBundle();Exception exception = result.getException();if (exception instanceof RuntimeException) {getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue;}// it's possible to get 'null' if the triggers was paused,// blocked, or other similar occurrences that prevent it being// fired at this time...  or if the scheduler was shutdown (halted)if (bndle == null) {qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue;}JobRunShell shell = null;try {shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}if (qsRsrcs.getThreadPool().runInThread(shell) == false) {// this case should never happen, as it is indicative of the// scheduler being shutdown or a bug in the thread pool or// a thread pool being used concurrently - which the docs// say not to do...getLog().error("ThreadPool.runInThread() return false!");qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);}}continue; // while (!halted)}} else { // if(availThreadCount > 0)// should never happen, if threadPool.blockForAvailableThreads() follows contractcontinue; // while (!halted)}long now = System.currentTimeMillis();long waitTime = now + getRandomizedIdleWaitTime();long timeUntilContinue = waitTime - now;synchronized(sigLock) {try {if(!halted.get()) {// QTZ-336 A job might have been completed in the mean time and we might have// missed the scheduled changed signal by not waiting for the notify() yet// Check that before waiting for too long in case this very job needs to be// scheduled very soonif (!isScheduleChanged()) {sigLock.wait(timeUntilContinue);}}} catch (InterruptedException ignore) {}}} catch(RuntimeException re) {getLog().error("Runtime error occurred in main trigger firing loop.", re);}} // while (!halted)// drop references to scheduler stuff to aid garbage collection...qs = null;qsRsrcs = null;}} 

5.2 几个概念

Trigger
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-seQJRg7e-1646875792194)(./assets/Java任务调度-Quartz-1645780446974.png)]

6 Elastic-Job

待续

7 Apache DolphinScheduler

待续

总结

请添加图片描述

参考
Java中常见的几种任务调度框架对比

引用

  1. Spring Job?Quartz?XXL-Job?年轻人才做选择,艿艿全莽~
  2. Timer与TimerTask的真正原理&使用介绍
  3. 深入 DelayQueue 内部实现
  4. PriorityQueue详解
  5. Java优先级队列DelayedWorkQueue原理分析
  6. 【Java基础】JAVA中优先队列详解
  7. quartz (从原理到应用)详解篇
  8. 《分布式任务调度平台XXL-JOB》-官方文档
  9. 平衡二叉堆
  10. 聊聊Java进阶之并发基础技术—线程池剖析
  11. Java中常见的几种任务调度框架对比
  12. Quartz 源码解析(一) —— 基本介绍

(该文成于2022年公司内部主题分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/58331.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

将多个commit合并成一个commit并提交

0 Preface/foreword 1 压缩多个commit方法 1.1 git merge --squash 主分支&#xff1a;main 开发分支&#xff1a;test 当前在test分支提交了8个commits&#xff0c;功能已经开发完成&#xff0c;需要将test分支合并到main分支&#xff0c;但是不想在合并时候&#xff0c;看…

开源一套基于若依的wms仓库管理系统,支持lodop和网页打印入库单、出库单的源码

大家好&#xff0c;我是一颗甜苞谷&#xff0c;今天分享一款基于若依的wms仓库管理系统&#xff0c;支持lodop和网页打印入库单、出库单的源码。 前言 在当今快速发展的商业环境中&#xff0c;库存管理对于企业来说至关重要。然而&#xff0c;许多企业仍然依赖于传统的、手动…

【Rust】环境搭建

▒ 目录 ▒ &#x1f6eb; 导读需求 1️⃣ 安装Chocolatey安装依赖 2️⃣ 安装RustRover安装toolchain&#xff08;rustup、VS&#xff09;重启配置生效设置安装插件 &#x1f4d6; 参考资料 &#x1f6eb; 导读 需求 重装系统&#xff0c;记录下环境搭建遇到的问题。 1️⃣ …

安装Ubuntu系统

打开vmware&#xff0c;新建一个Ubuntu虚拟机&#xff0c;点击自定义&#xff0c;进入下一步 &#xff0c;选择Workstation 17.x后&#xff0c;点击下一步 选择稍后安装系统选项&#xff0c;进入选择客户机操作系统页面&#xff0c;客户机操作系统选择Linux&#xff0c;版本选…

rom定制系列------红米note8_miui14安卓13定制修改固件 带面具root权限 刷写以及界面预览

&#x1f49d;&#x1f49d;&#x1f49d;红米note8机型代码&#xff1a;ginkgo。高通芯片。此固件官方最终版为稳定版12.5.5安卓11的版本。目前很多工作室需要高安卓版本的固件来适应他们的软件。并且需要root权限。根据客户要求。修改固件为完全root。并且修改为可批量刷写的…

了解一下,RN中怎么加载 threejs的

在React Native&#xff08;RN&#xff09;中加载和使用Three.js&#xff0c;一个流行的3D图形库&#xff0c;通常需要一些额外的步骤&#xff0c;因为Three.js主要是为Web浏览器设计的&#xff0c;而React Native则使用原生的渲染引擎。不过&#xff0c;有一些方法可以在React…

电脑仅一个C盘如何重装系统?超简单教程分享!

当我们的电脑仅配备一个C盘时&#xff0c;重装系统的过程可能会显得尤为棘手。因为一旦格式化硬盘&#xff0c;安装系统的分区也可能被一并清除&#xff0c;导致安装过程中断。这时候我们完全可以通过对电脑进行分区来解决这一问题。分区不仅能够帮助我们更好地管理硬盘空间&am…

提升网站速度与性能优化的有效策略与实践

内容概要 在数字化快速发展的今天&#xff0c;网站速度与性能优化显得尤为重要&#xff0c;它直接影响用户的浏览体验。用户在访问网站时&#xff0c;往往希望能够迅速获取信息&#xff0c;若加载时间过长&#xff0c;轻易可能导致他们转向其他更为流畅的网站。因此&#xff0…

流媒体协议.之(RTP,RTCP,RTSP,RTMP,HTTP)(二)

继续上篇介绍&#xff0c;本篇介绍一下封装RTP的数据格式&#xff0c;如何将摄像头采集的码流&#xff0c;音频的码流&#xff0c;封装到rtp里&#xff0c;传输。 有自己私有协议例子&#xff0c;有rtp协议&#xff0c;参考代码。注意不是rtsp协议。 一、私有协议 玩过tcp协议…

构建灵活、高效的HTTP/1.1应用:探索h11库

文章目录 构建灵活、高效的HTTP/1.1应用&#xff1a;探索h11库背景这个库是什么&#xff1f;如何安装这个库&#xff1f;库函数使用方法使用场景常见的Bug及解决方案总结 构建灵活、高效的HTTP/1.1应用&#xff1a;探索h11库 背景 在现代网络应用中&#xff0c;HTTP协议是基础…

利用游戏引擎的优势

大家好&#xff0c;我是小蜗牛。 在当今快速发展的游戏产业中&#xff0c;选择合适的游戏引擎对开发者来说至关重要。Cocos Creator作为一款功能强大且灵活的游戏引擎&#xff0c;为开发者提供了丰富的工具和资源&#xff0c;使他们能够高效地开发出优秀的游戏。本文将探讨如何…

TensorFlow面试整理-分布式

在深度学习的训练过程中,随着数据量和模型的复杂性增加,单个 GPU 或 CPU 无法满足高效训练的需求。TensorFlow 提供了强大的 分布式训练 功能,通过并行处理加速训练过程。分布式训练可以在多个 GPU、多个机器甚至是 TPU 上运行。以下是分布式训练的关键概念及其使用方法。 1…

仓颉编程语言官网正式上线 !首个公测版本开放下载 !

今年6月21日&#xff0c;华为开发者大会&#xff08;HDC&#xff09;正式公开介绍了华为自研的通用编程语言&#xff1a;仓颉编程语言&#xff0c;并发布了HarmonyOS NEXT仓颉语言开发者预览版&#xff0c;开发者可以使用仓颉开发鸿蒙原生应用。4个月以来&#xff0c;仓颉编程语…

PHP爬虫的奇幻之旅:如何用代码“偷窥”京东商品的SKU信息

开篇&#xff1a;代码界的007 想象一下&#xff0c;你是一名代码界的007&#xff0c;你的任务是潜入京东的数据库&#xff0c;获取商品的SKU信息。不过别担心&#xff0c;我们不是真的去偷数据&#xff0c;而是用PHP编写一个爬虫&#xff0c;合法地获取公开的API数据。这不仅是…

C++初阶(七)--类和对象(4)

目录 ​编辑 一、再谈构造函数 1.构造函数体赋值 2.初始化列表 二、类型转换 1.隐式类型转换 2.explicit关键字 3.类类型之间的对象隐式转换 三、static成员函数 1.概念 2.特性 3.面试题&#xff1a; 四、友元函数 1.基本介绍 2.回顾&#xff1a; 3.友元类&am…

【问题记录】当机器人存在多个串口需要绑定时udevadm的作用

一、正常绑定 输入sudo udevadm info -a /dev/ttyUSBx | grep KERNELS 命令 会出现KERNELS的编号&#xff0c;记录编号。 修改规则文件/etc/udev/rules.d/99-usb.rules 添加以下命令 KERNEL"ttyUSB*", KERNELS"2-1.2:1.0", MODE:"0666", GROU…

kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?

大家好&#xff0c;我是锋哥。今天分享关于【kafka 分布式&#xff08;不是单机&#xff09;的情况下&#xff0c;如何保证消息的顺序消费?】面试题&#xff1f;希望对大家有帮助&#xff1b; kafka 分布式&#xff08;不是单机&#xff09;的情况下&#xff0c;如何保证消息的…

掌握闲鱼商品 API 接口对接流程

对接闲鱼商品 API 接口主要有以下步骤&#xff1a; 注册成为闲鱼平台合作伙伴&#xff1a; 访问闲鱼开放平台&#xff08;如添加 TNY264278 卫星号&#xff09;&#xff0c;按照平台要求进行账号注册。你需要提供个人或公司的相关信息&#xff0c;如联系方式、企业资质&#xf…

微信小程序时间弹窗——年月日时分

需求 1、默认当前时间2、选择时间弹窗限制最大值、最小值3、每次弹起更新最大值为当前时间&#xff0c;默认值为上次选中时间4、 minDate: new Date(2023, 10, 1).getTime(),也可以传入时间字符串new Date(2023-10-1 12:22).getTime() html <view class"flex bb ptb…

【UE5.3 Cesium for Unreal】编译GlobePawn

目录 前言 效果 步骤 一、下载所需文件 二、下载CesiumForUnreal插件 三、处理下载的文件 四、修改代码 “CesiumForUnreal.uplugin”部分 “CesiumEditor.cpp”部分 “CesiumEditor.h”部分 “CesiumPanel.cpp”部分 “IonQuickAddPanel.cpp”部分 “IonQuickAd…