深圳市罗湖区住房和建设局网站wordpress前台修改
news/
2025/9/24 5:45:43/
文章来源:
深圳市罗湖区住房和建设局网站,wordpress前台修改,渭南免费做网站,做网站公司哪里好动态资源分配#xff0c;主要是spark在运行中可以相对合理的分配资源。
初始申请的资源远超实际需要#xff0c;减少executor初始申请的资源比实际需要少很多#xff0c;增多executorSpark运行多个job#xff0c;这些job所需资源有的多有的少#xff0c;动态调整executor…动态资源分配主要是spark在运行中可以相对合理的分配资源。
初始申请的资源远超实际需要减少executor初始申请的资源比实际需要少很多增多executorSpark运行多个job这些job所需资源有的多有的少动态调整executor数量
相关参数
spark.dynamicAllocation.enabled默认false设置为true则启用动态资源分配允许 Spark 根据任务需求自动调整执行器的数量。 spark.shuffle.service.enabled默认为false禁用独立的 Shuffle 服务。如果使用动态资源分配需要设置为true将Shuffle与Executor分开。 spark.dynamicAllocation.initialExecutors默认0初始执行器的数量。 spark.dynamicAllocation.minExecutors默认0执行器的最小数量。 spark.dynamicAllocation.maxExecutors默认Int最大值执行器的最大数量。 spark.dynamicAllocation.executorAllocationRatio默认1.0用于执行器分配的比例表示给每个应用程序分配的资源相对于集群中所有可用资源的比例。 spark.dynamicAllocation.schedulerBacklogTimeout默认1s作业调度队列中作业等待的超时时间。 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout默认1s作业调度队列中连续等待的时间阈值。 spark.dynamicAllocation.executorIdleTimeout默认60s没有缓存的执行器空闲时自动释放的超时时间。 spark.dynamicAllocation.cachedExecutorIdleTimeout默认Int最大值有缓存的空闲执行器的超时时间。
ExecutorAllocationManager
ExecutorAllocationManager是在SparkContext初始化的时候创建的创建后调用它的start方法。 initializing变量标记ExecutorAllocationManager是否可以进行动态调整。 addTime变量是添加新的executor的时间点
start
在start方法首先注册了两个listener
ExecutorAllocationListener通知给定的分配管理器何时添加和删除执行器。ExecutorMonitor执行器活动的监视器用于检测空闲执行器。
定时调度每100ms执行一次schedule方法。 最后向更新集群发送所需executor的信息。
numExecutors向集群申请的executor数量。集群不一定为了达到这个数量就启动或者杀死executorlocalityAwareTasksstage中具有局部首选项的任务数。这包括正在运行、挂起和已完成的任务。有些task是有指定在哪里运行或者哪里不运行的。hostToLocalTaskCounthost和希望在host上运行的task数量。包括正在运行、挂起和已完成的任务。 schedule
调用executorMonitor的timedOutExecutors获取超时的executor。 如有超时的executor表明executor首次部署成功过将initializing置为false标志可以进行动态调整executor数量。 调用updateAndSyncNumExecutorsTarget方法向集群同步executor调度的相关信息集群收到新的信息后会判断是否满足需求不满足的话会添加executor。这里集群只可能增加executor来满足目标数量不会进行kill executor。 最后调用removeExecutors移除超时的executor集合。
updateAndSyncNumExecutorsTarget
首先是调用maxNumExecutorsNeeded方法获取所需executor的最大数量。
initializing为true表明executor首次还没有部署完成不能动态调整maxNeeded numExecutorsTarget此次所需的最大数量比上次申请的executor数量少此时就要向集群更新executor目标数量让集群可以停止还没有完成部署的executor的申请addTime ! NOT_SET now addTime到达添加时间可以申请添加executor其他情况没有达到添加时间 maxNumExecutorsNeeded
计算当前任务所需要的最大executor数量。
addExecutors
计算新的executor目标值每次新增都是加上numExecutorsToAdd变量值。再经过校验调整到合理的值。 如果跟上一次的目标值一致表示新增executor过程完成了重置numExecutorsToAdd为1。 向集群发送executor目标值让集群根据情况调整。 最后调整numExecutorsToAdd方便下一次扩容。 executor新增的速度是 1 2 4 8…这样做是因为新增速度为固定值会造成目标1.executor数量小增长速度大申请了过多的executor2.目标executor数量大增长速度小executor扩容慢。
removeExecutors
移除executor不能直接将超时的executor都移除了存活的executor数量还要大于等于executor最小数量、executor目标数量。 executorIdsToBeRemoved是实际需要移除的executor 向集群发送kill executor的命令更新executor目标数量到集群。最后修改executorMonitor中对应executor状态为待移除不再进行监控这些executor
onSchedulerBacklogged
当调度程序收到新的待处理任务时调用回调。有挤压任务添加addTime
stage完成提交等待task调度推测task提交task执行失败需要重试执行 onSchedulerQueueEmpty
没有等待执行的task任务重置addTime
stage中task全部完成task开始pending的task数量为0 ExecutorAllocationListener
可以简单看一下相关变量只要是记录stage和task的关系task总量运行的task数量pending的task数量运行的推测task数量pending的推测task数量。。。 它是是一个listener主要是监听了stage和task相关事件
SparkListenerStageSubmittedSparkListenerStageCompletedSparkListenerTaskStartSparkListenerTaskEndSparkListenerSpeculativeTaskSubmitted 根据上面的变量获取running和pending任务量
onStageSubmitted
stage提交完成将initializing置为false。更新相关变量。
onStageCompleted
stage完成修改相关变量。如果这个stage是最后一个stage表明没有任务需要执行就调用onSchedulerQueueEmpty将addTime、numExecutorsToAdd重置。
onTaskStart
task开始执行更新相关变量。如果处于pending状态的task数量为0调用onSchedulerQueueEmpty重置executor新增相关变量。
onTaskEnd
task执行结束更新相关变量。
onSpeculativeTaskSubmitted
推测任务提交更新相关变量。实际task数量增加调用onSchedulerBacklogged进行新的调度。
ExecutorMonitor
ExecutorMonitor监听executor相关事件使用Tracker记录executor的信息可以返回超时的executor信息。 executorsexecutor信息的集合 nextTimeout下一次超时的时间 timedOutExecs超时的executor集合
timedOutExecutors
遍历executor的tracker获取超时的executor。最后更新下一次超时时间。 newNextTimeout下一次超时时间是所有executor中最近的超时时间
updateNextTimeout
更新nextTimeout
executorsKilled
是ExecutorAllocationManager在移除executor的时候调用这里是标记executor为待移除不是真的移除。真的移除是监听SparkListenerExecutorRemoved事件
监听相关的方法
基本都是更新相关的变量
Tracker
记录executor信息 主要变量 timeoutAt超时时间 idleStartexecutor空闲开始时间 cachedBlocks缓存的block
updateTimeout
获取timeout不含cache和shuffle的就是idleTimeoutNs有cacje和shuffle的时候还要计算cache和shuffle的超时时间。 调用ExecutorMonitor的updateNextTimeout更新下一次超时时间nextTimeout
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/914993.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!