XXL-JOB从入门到进阶——架构架构、核心原理
2025-11-14 17:29 tlnshuju 阅读(0) 评论(0) 收藏 举报目录
1 XXL-JOB系统架构
1.1 XXL-JOB系统架构
1.1.1 调度中心(Admin)
1.1.2 执行器(Executor)
2 XXL-JOB设计思想
2.1 定时任务执行流程图
2.2 细节分析
2.2.1 执行器初始化
2.2.2 内嵌服务器
2.2.3 执行器注册
2.2.4 任务重复执行问题
2.2.5 快慢执行线程池
2.2.6 任务阻塞队列
2.2.7 回调
前言:
有关XXL-JOB的使用可以查看上一篇文章,这篇文章主要深入学习一些XXL-JOB的底层原理。
XXL-JOB从入门到进阶——特性、部署、快速集成
https://blog.csdn.net/sniper_fandc/article/details/153210307?fromshare=blogdetail&sharetype=blogdetail&sharerId=153210307&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link
1 XXL-JOB系统架构
1.1 XXL-JOB系统架构

XXL-JOB将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;
1.1.1 调度中心(Admin)
作用:任务的大脑,负责管理所有任务的调度逻辑。
功能:任务的创建、修改、删除;定时检查任务触发条件(如Cron表达式);将任务分发给执行器集群。
1.1.2 执行器(Executor)
作用:任务的执行者,嵌入在业务系统中。
功能:接收调度中心的指令并执行任务;支持多种任务类型(Java方法、HTTP请求、Shell脚本等);将执行结果和日志反馈给调度中心。
2 XXL-JOB设计思想
2.1 定时任务执行流程图

这张图展现了xxl-job调度中心和执行器关于任务调度和任务执行的核心流程(参考源码制作,省略了部分细节)。
2.2 细节分析
2.2.1 执行器初始化
在XxlJobSpringExecutor类(该类继承父类XxlJobExecutor)中:
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext != null) {String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);String[] var3 = beanDefinitionNames;int var4 = beanDefinitionNames.length;for(int var5 = 0; var5 < var4; ++var5) {String beanDefinitionName = var3[var5];Object bean = null;Lazy onBean = (Lazy)applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);if (onBean != null) {logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);} else {bean = applicationContext.getBean(beanDefinitionName);Map annotatedMethods = null;try {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new MethodIntrospector.MetadataLookup() {public XxlJob inspect(Method method) {return (XxlJob)AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable var14) {Throwable ex = var14;logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods != null && !annotatedMethods.isEmpty()) {Iterator var15 = annotatedMethods.entrySet().iterator();while(var15.hasNext()) {Map.Entry methodXxlJobEntry = (Map.Entry)var15.next();Method executeMethod = (Method)methodXxlJobEntry.getKey();XxlJob xxlJob = (XxlJob)methodXxlJobEntry.getValue();this.registJobHandler(xxlJob, bean, executeMethod);}}}}}}
该方法就是执行器初始化最关键的方法,会依次扫描所有@XxlJob注解注释的方法,并通过this调用继承自父类的registJobHandler方法来将定时任务方法封装为MethodJobHandler对象:
// 定时任务的IJobHandler对象存储结构private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap();// 将定时任务存储到jobHandlerRepository中public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return (IJobHandler)jobHandlerRepository.put(name, jobHandler);}// 封装定时任务为MethodJobHandler类型并调用上面的存储方法protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod) {if (xxlJob != null) {String name = xxlJob.value();Class clazz = bean.getClass();String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");} else if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");} else {executeMethod.setAccessible(true);Method initMethod = null;Method destroyMethod = null;if (xxlJob.init().trim().length() > 0) {try {initMethod = clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException var11) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException var10) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");}}registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}}}
2.2.2 内嵌服务器
(1)内嵌服务器配置与启动
public class EmbedServer {private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);private ExecutorBiz executorBiz;private Thread thread;public void start(final String address, final int port, final String appname, final String accessToken)
执行器启动时,还会创建一个http内嵌服务器EmbedServer,基于Netty实现。在XxlJobExecutor类中完成EmbedServer的服务器的启动,设置address、port、appname、accessToken等值。
对于端口号port,如果没有传入port参数,在XxlJobExecutor类就会设置默认值9999:
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {port = port > 0 ? port : NetUtil.findAvailablePort(9999);ip = ip != null && ip.trim().length() > 0 ? ip : IpUtil.getIp();if (address == null || address.trim().length() == 0) {String ip_port_address = IpUtil.getIpPort(ip, port);address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}if (accessToken == null || accessToken.trim().length() == 0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}this.embedServer = new EmbedServer();this.embedServer.start(address, port, appname, accessToken);}
而XxlJobExecutor类的配置有自定义的配置类实现,比如使用如下配置类从配置文件读取配置信息实现XxlJobExecutor的配置:
@Configuration
@Slf4j
public class XxlJobConfig {@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {log.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAccessToken(accessToken);return xxlJobSpringExecutor;}
}
(2)内嵌服务器作用
内嵌服务器主要作用就是接收调度中心的任务调度请求,其内部的静态类EmbedHttpServerHandler就是接收请求的,接收请求后交给ExecutorBiz接口处理。
ExecutorBiz接口有两个实现:

ExecutorBizImpl:这是执行器端内嵌服务器的请求处理类。
ExecutorBizClient:这是调度系统的请求发送客户端。
2.2.3 执行器注册
public void startRegistry(String appname, String address) {ExecutorRegistryThread.getInstance().start(appname, address);}
在EmbedServer类中,会在启动内嵌服务器时同时通过执行器注册线程ExecutorRegistryThread(执行器启动时创建)向调度中心注册执行器,该执行器注册线程会将appname(执行器名称)和address(ip+port)注册到调度中心:

2.2.4 任务重复执行问题
关于Springboot使用@Scheduled注解,在分布式环境下,由于存在多个服务实例,因此定时任务可能会被重复执行。在xxl-job中,使用了排他锁,即xxl_job_lock表的lock_name字段:

调度中心使用如下sql:
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
这条语句为该字段添加行级排他锁,当一个调度中心实例在该字段加锁后,其它调度中心实例就会无法加锁,从而保证每时刻只有一个调度中心进行任务调度,避免多个调度中心进行重复的任务调度造成任务被重复执行。
2.2.5 快慢执行线程池
private ThreadPoolExecutor fastTriggerPool = null;private ThreadPoolExecutor slowTriggerPool = null;public void start(){fastTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {logger.error(">>>>>>>>>>> xxl-job, admin JobTriggerPoolHelper-fastTriggerPool execute too fast, Runnable="+r.toString() );}});slowTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue(5000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {logger.error(">>>>>>>>>>> xxl-job, admin JobTriggerPoolHelper-slowTriggerPool execute too fast, Runnable="+r.toString() );}});}
在JobTriggerPoolHelper类,实现了快/慢触发线程(流程图中快慢执行线程)(位于调度中心xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/thread/JobTriggerPoolHelper.java).
如何区分任务应该进入快触发线程还是慢触发线程?xxl-job会记录任务的调度时间(也就是触发时间),这里的调度时间快慢是指调度中心根据任务调度请求的发送时间(不是任务执行时间)决定快慢,超过500ms就认为是慢。如果1分钟时间内某个任务超过10次都是慢,就会把任务交给慢触发线程进行调度。
这样设计的原因是减少频繁调度的任务(如果调度时间长)对其它需要调度的任务阻塞时间。
2.2.6 任务阻塞队列

在JobThread中维护了一个triggerQueue的阻塞队列,待执行任务会放入这个阻塞队列中,由JobThread线程从阻塞队列获取任务并执行。
与直接把任务交给线程来做相比,把任务放到阻塞队列再由线程从阻塞队列取任务执行,这样做的好处是符合单机串行的阻塞策略(因为队列先进先出),并且符合生产者消费者模型,解耦任务执行过程。
2.2.7 回调
JobThread执行任务结束后会将任务执行的结果发送到一个另一个阻塞队列callBackQueue中,该阻塞队列位于TriggerCallbackThread(触发回调线程):

该线程TriggerCallbackThread会在执行器启动时就创建,循环监控callBackQueue队列获取任务执行结果,并将执行结果批量发送给调度中心。
如果任务执行失败,或者存在子任务(任务依赖关系),调度中心会根据执行结果、重试策略等信息,再次进入任务的调度、执行流程。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/965568.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!