XXL-JOB从入门到进阶——架构架构、核心原理

news/2025/11/14 17:36:15/文章来源:https://www.cnblogs.com/tlnshuju/p/19222850

XXL-JOB从入门到进阶——架构架构、核心原理

2025-11-14 17:29  tlnshuju  阅读(0)  评论(0)    收藏  举报

目录

1 XXL-JOB系统架构

1.1 XXL-JOB系统架构

1.1.1 调度中心(Admin)

1.1.2 执行器(Executor)

2 XXL-JOB设计思想

2.1 定时任务执行流程图

2.2 细节分析

2.2.1 执行器初始化

2.2.2 内嵌服务器

2.2.3 执行器注册

2.2.4 任务重复执行问题

2.2.5 快慢执行线程池

2.2.6 任务阻塞队列

2.2.7 回调


前言:

        有关XXL-JOB的使用可以查看上一篇文章,这篇文章主要深入学习一些XXL-JOB的底层原理。

XXL-JOB从入门到进阶——特性、部署、快速集成https://blog.csdn.net/sniper_fandc/article/details/153210307?fromshare=blogdetail&sharetype=blogdetail&sharerId=153210307&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link

1 XXL-JOB系统架构

1.1 XXL-JOB系统架构

        XXL-JOB将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。

        将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。

        因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

1.1.1 调度中心(Admin)

        作用:任务的大脑,负责管理所有任务的调度逻辑。

        功能:任务的创建、修改、删除;定时检查任务触发条件(如Cron表达式);将任务分发给执行器集群。

1.1.2 执行器(Executor

        作用:任务的执行者,嵌入在业务系统中。

        功能:接收调度中心的指令并执行任务;支持多种任务类型(Java方法、HTTP请求、Shell脚本等);将执行结果和日志反馈给调度中心。

2 XXL-JOB设计思想

2.1 定时任务执行流程图

        这张图展现了xxl-job调度中心和执行器关于任务调度和任务执行的核心流程(参考源码制作,省略了部分细节)。

2.2 细节分析

2.2.1 执行器初始化

        在XxlJobSpringExecutor类(该类继承父类XxlJobExecutor)中:

    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext != null) {String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);String[] var3 = beanDefinitionNames;int var4 = beanDefinitionNames.length;for(int var5 = 0; var5 < var4; ++var5) {String beanDefinitionName = var3[var5];Object bean = null;Lazy onBean = (Lazy)applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);if (onBean != null) {logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);} else {bean = applicationContext.getBean(beanDefinitionName);Map annotatedMethods = null;try {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new MethodIntrospector.MetadataLookup() {public XxlJob inspect(Method method) {return (XxlJob)AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable var14) {Throwable ex = var14;logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods != null && !annotatedMethods.isEmpty()) {Iterator var15 = annotatedMethods.entrySet().iterator();while(var15.hasNext()) {Map.Entry methodXxlJobEntry = (Map.Entry)var15.next();Method executeMethod = (Method)methodXxlJobEntry.getKey();XxlJob xxlJob = (XxlJob)methodXxlJobEntry.getValue();this.registJobHandler(xxlJob, bean, executeMethod);}}}}}}

        该方法就是执行器初始化最关键的方法,会依次扫描所有@XxlJob注解注释的方法,并通过this调用继承自父类的registJobHandler方法来将定时任务方法封装为MethodJobHandler对象:

    // 定时任务的IJobHandler对象存储结构private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap();// 将定时任务存储到jobHandlerRepository中public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return (IJobHandler)jobHandlerRepository.put(name, jobHandler);}// 封装定时任务为MethodJobHandler类型并调用上面的存储方法protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod) {if (xxlJob != null) {String name = xxlJob.value();Class clazz = bean.getClass();String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");} else if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");} else {executeMethod.setAccessible(true);Method initMethod = null;Method destroyMethod = null;if (xxlJob.init().trim().length() > 0) {try {initMethod = clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException var11) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException var10) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");}}registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}}}

2.2.2 内嵌服务器

(1)内嵌服务器配置与启动

public class EmbedServer {private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);private ExecutorBiz executorBiz;private Thread thread;public void start(final String address, final int port, final String appname, final String accessToken)

        执行器启动时,还会创建一个http内嵌服务器EmbedServer,基于Netty实现。在XxlJobExecutor类中完成EmbedServer的服务器的启动,设置address、port、appname、accessToken等值。

        对于端口号port,如果没有传入port参数,在XxlJobExecutor类就会设置默认值9999:

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {port = port > 0 ? port : NetUtil.findAvailablePort(9999);ip = ip != null && ip.trim().length() > 0 ? ip : IpUtil.getIp();if (address == null || address.trim().length() == 0) {String ip_port_address = IpUtil.getIpPort(ip, port);address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}if (accessToken == null || accessToken.trim().length() == 0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}this.embedServer = new EmbedServer();this.embedServer.start(address, port, appname, accessToken);}

        而XxlJobExecutor类的配置有自定义的配置类实现,比如使用如下配置类从配置文件读取配置信息实现XxlJobExecutor的配置:

@Configuration
@Slf4j
public class XxlJobConfig {@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {log.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAccessToken(accessToken);return xxlJobSpringExecutor;}
}

(2)内嵌服务器作用

        内嵌服务器主要作用就是接收调度中心的任务调度请求,其内部的静态类EmbedHttpServerHandler就是接收请求的,接收请求后交给ExecutorBiz接口处理。

        ExecutorBiz接口有两个实现:

        ExecutorBizImpl:这是执行器端内嵌服务器的请求处理类。

        ExecutorBizClient:这是调度系统的请求发送客户端。

2.2.3 执行器注册

    public void startRegistry(String appname, String address) {ExecutorRegistryThread.getInstance().start(appname, address);}

        在EmbedServer类中,会在启动内嵌服务器时同时通过执行器注册线程ExecutorRegistryThread(执行器启动时创建)向调度中心注册执行器,该执行器注册线程会将appname(执行器名称)和address(ip+port)注册到调度中心:

2.2.4 任务重复执行问题

        关于Springboot使用@Scheduled注解,在分布式环境下,由于存在多个服务实例,因此定时任务可能会被重复执行。在xxl-job中,使用了排他锁,即xxl_job_lock表的lock_name字段:

        调度中心使用如下sql:

select * from xxl_job_lock where lock_name = 'schedule_lock' for update

        这条语句为该字段添加行级排他锁,当一个调度中心实例在该字段加锁后,其它调度中心实例就会无法加锁,从而保证每时刻只有一个调度中心进行任务调度,避免多个调度中心进行重复的任务调度造成任务被重复执行。

2.2.5 快慢执行线程池

    private ThreadPoolExecutor fastTriggerPool = null;private ThreadPoolExecutor slowTriggerPool = null;public void start(){fastTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {logger.error(">>>>>>>>>>> xxl-job, admin JobTriggerPoolHelper-fastTriggerPool execute too fast, Runnable="+r.toString() );}});slowTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue(5000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {logger.error(">>>>>>>>>>> xxl-job, admin JobTriggerPoolHelper-slowTriggerPool execute too fast, Runnable="+r.toString() );}});}

        在JobTriggerPoolHelper类,实现了快/慢触发线程(流程图中快慢执行线程)(位于调度中心xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/thread/JobTriggerPoolHelper.java).

        如何区分任务应该进入快触发线程还是慢触发线程?xxl-job会记录任务的调度时间(也就是触发时间),这里的调度时间快慢是指调度中心根据任务调度请求的发送时间(不是任务执行时间)决定快慢,超过500ms就认为是慢。如果1分钟时间内某个任务超过10次都是慢,就会把任务交给慢触发线程进行调度。

        这样设计的原因是减少频繁调度的任务(如果调度时间长)对其它需要调度的任务阻塞时间。

2.2.6 任务阻塞队列

        在JobThread中维护了一个triggerQueue的阻塞队列,待执行任务会放入这个阻塞队列中,由JobThread线程从阻塞队列获取任务并执行。

        与直接把任务交给线程来做相比,把任务放到阻塞队列再由线程从阻塞队列取任务执行,这样做的好处是符合单机串行的阻塞策略(因为队列先进先出),并且符合生产者消费者模型,解耦任务执行过程。

2.2.7 回调

        JobThread执行任务结束后会将任务执行的结果发送到一个另一个阻塞队列callBackQueue中,该阻塞队列位于TriggerCallbackThread(触发回调线程):

        该线程TriggerCallbackThread会在执行器启动时就创建,循环监控callBackQueue队列获取任务执行结果,并将执行结果批量发送给调度中心。
        如果任务执行失败,或者存在子任务(任务依赖关系),调度中心会根据执行结果、重试策略等信息,再次进入任务的调度、执行流程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/965568.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue2 组件封装 el-input

vue2 组件封装 el-input customInputindex.vue. // 组件页面index.js. 全局注册使用index.vue<template><el-inputv-bind="$attrs"v-on="$listeners":value="value"@inpu…

tts sdk 安装使用

# 安装/验证 SDK 版本(3.0.1460) pip install tencentcloud-sdk-python-tts==3.0.1460 pip install websocket-client==0.48 requests tts_sdk# -*- coding: utf-8 -*- import os import json import time import re…

Docker版本太老了,不支持下载镜像的解决方案

生产环境的Docker版本太老了,已经不支持下载镜像了,为了解决这个问题,可以从其它的服务器下载之前的镜像,再加载上去。# 在新环境docker save -o openjdk8.tar openjdk:8-jdk-alpine # 传输 tar 到生产机# 在生产机…

2025 最新广州补习培训机构权威推荐榜:综合实力、提分效果与口碑测评,优质补习机构最新推荐广州课外补习/广州补课/广州提分/广州学习机构推荐

引言 随着教育竞争的日益激烈,优质课外辅导成为提升学习能力的重要助力,但市场机构良莠不齐,选择难题愈发凸显。本次榜单依托国际教育质量评估协会(IEQA)最新测评数据,结合教学资质、师资水平、课程适配性、提分…

POSTROUTING 数据包离开前,路由之后 SNAT(源地址转换),源地址转换出去前

链名处理阶段典型用途PREROUTING 数据包刚到达,路由之前 DNAT(目标地址转换)POSTROUTING 数据包离开前,路由之后 SNAT(源地址转换)INPUT 发往本机的数据包 过滤本地服务访问OUTPUT 本机发出的数据包 过滤本地进程…

C++ 进阶知识点详细教程 - 第1部分

C++ 进阶知识点详细教程 - 第1部分 1. do while 循环 1.1 基本语法 do {// 循环体 } while (条件);关键特点:至少执行一次循环体,因为条件判断在循环体执行之后。 1.2 与while循环的区别 // while循环:先判断条件,…

2025年苗木批发基地实力排行:这些批发商值得信赖,青叶复叶槭/金森女贞/白蜡/金叶女贞/红叶李/苗木/紫薇/栾树/金叶复叶槭供应商哪个好

行业背景分析 随着城市绿化建设持续推进和生态修复需求增长,苗木批发行业呈现蓬勃发展态势。据最新行业数据显示,2024年全国苗木市场规模已突破2000亿元,年均增长率保持在8%以上。在这一背景下,优质苗木批发基地的…

使用ollama本地部署Embedding模型bge-large-zh-v1.5 - yi

使用ollama本地部署Embedding模型bge-large-zh-v1.5下载模型sudo ollama pull modelscope.cn/Embedding-GGUF/bge-large-zh-v1.5:latest 测试curl -X POST http://localhost:11434/api/embeddings \ -H "Content…

2025年CHRO战略指南发布,头部厂商易路提供“三位一体”数智化落地路径

一、2025 年HR领导力愿景:CHRO的三大战略核心命题 在数字化转型纵深发展、全球化竞争加剧、人才价值重构的时代背景下,人力资源管理正从传统的事务支撑职能,向驱动企业战略落地、构建核心竞争力的关键引擎转变。《2…

LLM应用剖析: 舆情分析多智能体-微舆BettaFish

本文主要讲解了微舆的整体架构,并通过研读并调试源码,整理了多个Agent各自的执行流程。1. 背景 近两周github一直霸榜的国产项目-微舆,引起了广泛的关注,11月3日start数3.4K,截止今天11月14日,start数26.6K,火…

详细介绍:kafka 4.x docker启动kafka4.0.0 docker-compose启动最新版kafka 如何使用docker容器启动最新版kafka

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

HIPCXX

https://rocm.docs.amd.com/projects/radeon-ryzen/en/latest/docs/install/installryz/native_linux/install-ryzen.html HIPCXX="$(hipconfig -l)/clang" HIP_PATH="$(hipconfig -R)" cmake…

Salesforce AI能理解业务、写代码,程序员还能做什么?

过去几年,Salesforce 一直在说“AI赋能开发”,但今年的 Dreamforce ’25,让这句话第一次真正落地。 这一届大会上,Salesforce 把 AI 从“助手”变成了参与者——能理解上下文、能协作、能写、能测、还能优化。 开发…

AI元人文:岐金兰的回应

AI元人文:岐金兰的回应 岐金兰 2025年11月14日 面对诸多对"AI元人文构想"的质疑,我需要作出如下澄清。这些质疑源于传统思维范式,而我们的构想恰恰是要超越这种范式。 关于价值降解的可行性 有人说价值不…

化工产线再升级,稳定互联profinet转devicenet网关连接技术研究

本案例适用于化工生产过程控制系统,西门子S7-1200PLC通过Profinet总线连接DeviceNet主站网关,网关下联DeviceNet从站型传感器(压力、温度传感器)和执行器(电磁阀),实现现场设备数据的采集与控制信号的下发。核心…

2025 11 14

CF2119D 计数,dp考虑当 \(p_i\) 固定的时候选 a 数组方案数是确定的,即为所有 \(p_i > 0\) 的乘积 考虑 \(p_i <= i\) 这个条件,可以看成带系数的类似于括号匹配的东西 很明显这并不是我想到的,我一直在想怎么…

用户头像文件存储机制是如何实现的?

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025年行星减速机十大优质品牌排行榜,RV减速机/伺服减速机/传动减速机/传统减速电机/朕轴器/vgm减速机/精密行星减速机企业有哪些

行业背景与评选标准 随着工业自动化程度不断提升,行星减速机作为精密传动领域的核心部件,其市场需求持续增长。本次排行榜基于企业技术实力、产品性能、市场口碑及服务体系等多个维度,对行业内优质供应商进行综合评…

2025年真空管道软管厂家权威推荐榜单:给排水管道软管/由令波纹软管/快接波纹软管源头厂家精选

在工业自动化与管道输送技术持续发展的背景下,真空管道软管作为连接系统中的关键部件,其性能直接关系到整个系统的密封性能与运行效率。 据行业数据显示,2025年全球工业软管市场规模预计将保持稳定增长,其中耐真空…

2025年家具定制厂家权威推荐榜单:智能全屋定制家居/全屋定制/全屋定制家具源头厂家精选

在消费升级与个性化需求双重驱动下,我国家具定制行业正迎来新一轮发展机遇。据行业数据显示,2025年定制家居市场规模预计将突破6000亿元,消费者对环保、设计、服务的需求持续升级。 本次榜单基于技术实力、生产能力…