基于 Apache Dolphinscheduler3.1.9中的Task 处理流程解析

实现一个调度任务,可能很简单。但是如何让工作流下的任务跑得更好、更快、更稳定、更具有扩展性,同时可视化,是值得我们去思考得问题。

Apache DolphinScheduler是一个分布式和可扩展的开源工作流协调平台,具有强大的DAG可视化界面,广泛应用于数据集成、数据分析和大规模数据迁移。

Master 整体启动流程

    @PostConstructpublic void run() throws SchedulerException {// init rpc serverthis.masterRPCServer.start();// install task pluginthis.taskPluginManager.loadPlugin();// self tolerantthis.masterRegistryClient.start();this.masterRegistryClient.setRegistryStoppable(this);this.masterSchedulerBootstrap.init();// 处理任务的核心 重点是处理任务this.masterSchedulerBootstrap.start();// 事件执行启动this.eventExecuteService.start();this.failoverExecuteThread.start();this.schedulerApi.start();Runtime.getRuntime().addShutdownHook(new Thread(() -> {if (!ServerLifeCycleManager.isStopped()) {close("MasterServer shutdownHook");}}));}

上面的代码主要做的事情:

初始化rpc的服务端,也即netty的服务端,为处理请求做好铺垫
安装Task插件,task插件主要为业务需要集成的SPI信息
master注册客户端启动
初始化master定时引导
master定时引导启动
事件执行启动
失败执行线程启动
定时任务api启动

下面我们重点看这两段代码:

this.masterSchedulerBootstrap.start();
this.eventExecuteService.start();

this.masterSchedulerBootstrap.start()

任务的来源在t_ds_command表里面,因此需要先取出指令信息,然后将指令转处理实例,遍历处理实例,创建新的工作流线程。

(1) 将处理实例id和处理线程放入到processInstanceExecCacheManager中。

(2) 添加工作流运行状态和实例id放入到workflowEventQueue队列中。

因此可以看到消费的poolEvent,可以看到workflowEventQueue.poolEvent() 本质就是workflowEventLooper.start()启动消费。

此时我们可以看到workflowEventLooper.start(),处理放入到workflowEventQueue中的事件event,也即workflowEventQueue.take(),获取工作流处理器,处理事件。

Run中的核心处理:

刘亚洲

此时先执行工作流的流程,核心方法:workflowExecuteRunnable::call

workflowEventLooper.start()启动做的事情:

将任务放入到优先任务队列之后,就可以进行消费队列中的任务了。

ProcessInstance任务放入的核心

ProcessInstance 启动入口点为:org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#call

根据工作流线程状态可分为:

1)创建过程中的逻辑:

2)初始化DAG的过程:

3)初始化队列的过程:

4)工作流状态成功:

生成者消费者模型的产生是优先任务队列的放入和优先任务队列的消费。

因此可以看到两者在转换的过程之后基于Netty做了任务的转发操作,从而在Netty中做指令处理,从而完成消费,最终流转到具体的Task。

消费任务:TaskPriorityQueueConsumer

核心方法:this.batchDispatch(fetchTaskNum)

netty服务端消费任务消息

实质是放入任务线程,也即:

workerManager.offer(workerTaskExecuteRunnable)

处理消费在run方法里面:

waitSubmitQueue.take()

处理任务的核心:此时会具体流转到具体的任务,执行处理

此时完成任务的适配业务任务的处理,最终实现任务的处理。

this.eventExecuteService.start()

针对任务处理的状态进行处理。

stateEventHandler.handleStateEvent(this, stateEvent)

其主要过程是添加状态事件和消费状态事件,重点看队列的生产和消费。

分析的思路和上面的队列模式差不多,这里不展开了。

总结

从上面我们可以看到生产者消费者模型、线程模型在DS3.1.9版本中使用非常的多,也是我们去了解处理的思路的点。

同时对应任务的处理,为了保证任务的高效处理,使用了Netty来处理任务。

总体来说,代码写得还是很不错的,值得我们去学习。同时还使用了很多设计模式,比如SPI、工厂模式、状态模式等等。

参考:

dolphinscheduler文档:https://dolphinscheduler.apache.org/zh-cn github地址:https://github.com/apache/dolphinscheduler

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62141.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥杯2117砍竹子(简单易懂 包看包会版)

问题描述 这天, 小明在砍竹子, 他面前有 n 棵竹子排成一排, 一开始第 i 棵竹子的 高度为 hi​. 他觉得一棵一棵砍太慢了, 决定使用魔法来砍竹子。魔法可以对连续的一 段相同高度的竹子使用, 假设这一段竹子的高度为 H, 那么 用一次魔法可以 把这一段竹子的高度都变为 ⌊H2⌋…

如何进行 JavaScript 性能优化?

要进行 JavaScript 性能优化,我们可以从多个角度进行思考,主要包括减少页面渲染时间、减少内存占用、优化代码执行效率等。以下是优化的一些方法,并结合实际项目代码示例讲解。 目录结构 减少 DOM 操作 缓存 DOM 元素批量更新 DOM 优化 Jav…

CTF-PWN: 全保护下格式化字符串利用 [第一届“吾杯”网络安全技能大赛 如果能重来] 赛后学习(不会)

通过网盘分享的文件:如果能重来.zip 链接: https://pan.baidu.com/s/1XKIJx32nWVcSpKiWFQGpYA?pwd1111 提取码: 1111 --来自百度网盘超级会员v2的分享漏洞分析 格式化字符串漏洞,在printf(format); __int64 sub_13D7() {char format[56]; // [rsp10h] [rbp-40h]…

selenium-常见问题解决方案汇总

selenium-常见问题解决方案 selenium版本selenium代理本地浏览器页面Selenium之多窗口句柄的切换 selenium版本 selenium版本为: 3.141.0 注:selenium4x跟selenium3x会有不同的使用方法, selenium代理本地浏览器页面 利用 Selenium 库实现对 Google C…

Flask使用长连接

Flask使用flask_socketio实现websocket Python中的单例模式 在HTTP通信中,连接复用(Connection Reuse)是一个重要的概念,它允许客户端和服务器在同一个TCP连接上发送和接收多个HTTP请求/响应,而不是为每个新的请求/响…

雨晨 26100.2454 Windows 11 24H2 专业工作站 极简纯净版

文件: 雨晨 26100.2454 Windows 11 24H2 专业工作站极简 install.esd 大小: 1947043502 字节 修改时间: 2024年12月6日, 星期五, 16:38:37 MD5: 339B7FDCA0130D432A0E98957738A9DD SHA1: 2978AE0CEAF02E52EC4135200D4BDBC861E07BE8 CRC32: 8C329C89 简述: 由YCDIS…

[docker中首次配置git环境与时间同步问题]

11月没写东西,12月初赶紧水一篇。 刚开始搭建docker服务器时,网上找一堆指令配置好git后,再次新建容器后忘记怎么配了,,这次记录下。 一、git ssh指令法,该方法不用每次提交时输入密码 前期准备&#xff0…

MongoDB性能监控工具

mongostat mongostat是MongoDB自带的监控工具,其可以提供数据库节点或者整个集群当前的状态视图。该功能的设计非常类似于Linux系统中的vmstat命令,可以呈现出实时的状态变化。不同的是,mongostat所监视的对象是数据库进程。mongostat常用于…

linux下的python打包

linux下的python打包 一、pyinstaller 优点:打包简单,将整个运行环境进行打包 缺点:打包文件大、臃肿、启动慢 安装pyinstaller包 pip install pyinstaller 打包一个文件 pyinstaller -D app.py会在当前路径中生成build、dist文件夹还有…

Python模块之random、hashlib、json、time等内置模块语法学习

Python内置模块语法学习 random、hashlib、json、time、datetime、os等内置模块语法学习 模块 简单理解为就是一个.py后缀的一个文件 分为三种: 内置模块:python自带,可调用第三方模块:别人设计的,可调用自定义模块…

从ctfwiki开始的pwn之旅 5.ret2csu

ret2csu 原理 在 64 位程序中,函数的前 6 个参数是通过寄存器传递的,但是大多数时候,我们很难找到每一个寄存器对应的 gadgets。 这时候,我们可以利用 x64 下的 __libc_csu_init 中的 gadgets。这个函数是用来对 libc 进行初始…

Ceph对象存储

Ceph对象存储1.概念对象存储(Object Storage)是一种用于存储大量非结构化数据的架构模型它使用简单的HTTP或HTTPS协议进行文件访问,而不是传统的文件系统API与传统的文件系统存储方式不同,对象存储不是将数据存储在目录或文件夹中…

嵌入式蓝桥杯学习拓展 LCD翻转显示

通过配置SS和GS两个标志位,实现扫描方向的切换。 将lcd.c的REG_932X_Init函数进行部分修改。 将LCD_WriteReg(R1, 0x0000);修改为LCD_WriteReg(R1,0x0100); 将LCD_WriteReg(R96, 0x2700); 修改为LCD_WriteReg(R96, 0xA700); void REG_932X_Init1(void) {LCD_Wr…

小程序 —— Day1

组件 — view和scroll-view view 类似于HTML中的div,是一个块级元素 案例:通过view组件实现页面的基础布局 scroll-view 可滚动的视图区域,用来实现滚动列表效果 案例:实现纵向滚动效果 scroll-x属性:允许横向滚动…

git pull error: cannot lock ref

Git: cannot lock ref ‘refs/remotes/origin/feature/xxx’: refs/remotes/origin/feature/xxx/car’ exists; cannot create refs/remotes/origin/feature/xxx git remote prune origin重新整理服务端和本地的关联关系即可

pubmed关键词搜索技能1:待更新

1,白话变为领域内学术词: 例如,我想要做蛋白质糖基化修饰以功能,这个领域课题,则 第一性原理,首先是拆分词汇:糖基化(一般比蛋白质、修饰、功能要在title中更常见,或者是…

iPhone手机清理软件:相册清理大师推荐

随着智能手机成为我们日常生活的必需品,手机中的数据日益膨胀,尤其是照片和视频这类容易积累的文件。对于iPhone用户来说,管理这些文件,特别是清理相册变得尤为重要。本文将介绍一款备受推崇的iPhone手机清理软件——CleanMyPhone…

SpringBoot 开源停车场管理收费系统

一、下载项目文件 下载源码项目文件口令: 【前端小程序地址】(3.0):伏脂火器白泽知洞座/~6f8d356LNL~:/【后台管理地址】(3.0):伏脂火器仇恨篆洞座/~0f4a356Ks2~:/【岗亭端地址】(3.0):动作火器智汇堂多好/~dd69356K6r~:/复制口令…

网络原理之 TCP 协议

目录 1. TCP 协议格式 2. TCP 原理 (1) 确认应答 (2) 超时重传 (3) 连接管理 a) 三次握手 b) 四次挥手 (4) 滑动窗口 (5) 流量控制 (6) 拥塞控制 (7) 延时应答 (8) 捎带应答 3. TCP 特性 4. 异常情况的处理 1) 进程崩溃 2) 主机关机 (正常流程) 3) 主机掉电 (…

STM32使用RCC(Reset Clock Contorl,复位时钟控制器)配置时钟以及时钟树

RCC主要作用 设置系统时钟SYSCLK(System Clock)频率;设置AHB、APB2、APB1以及各个外设分频因子,从而设置HCLK、PCLK2、PCLK1以及各个外设的时钟频率;控制AHB、APB2、APB1这三条总线时钟以及每个外设的时钟开启&#xf…