郑州 互联网 公司网站那个网站做港货比较好
web/
2025/10/7 5:04:18/
文章来源:
郑州 互联网 公司网站,那个网站做港货比较好,深圳龙岗邮编,wordpress git项目背景
本文基于Spark 3.5.0 目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子#xff0c;这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并#xff0c;使得最后落盘的文件不会太大也不会太小#xff0c;从而达到小文件合并的作用#xff0c;…背景
本文基于Spark 3.5.0 目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并使得最后落盘的文件不会太大也不会太小从而达到小文件合并的作用这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则CoalesceShufflePartitions的作用主要是进行文件的合并是得文件不会太小OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。
结论
OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分使得罗盘的文件不会太大这个会有个问题如果我们在使用Rebalance(col)这种情况的时候如果col的值是固定的比如说值永远是20240320,那么这里就得注意一下关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置如果这些配置调整的不合适就会导致写文件的时候有可能只有一个Task在运行那么最终就只有一个文件。而且大大加长了整个任务的运行时间。
分析
直接到OptimizeSkewInRebalancePartitions中的代码中来: override def apply(plan: SparkPlan): SparkPlan {if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {return plan}plan transformUp {case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) tryOptimizeSkewedPartitions(stage)}}如果我们禁用掉对rebalance的倾斜处理也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false默认是true那么就不会应用此规则那么如果Col为固定值的情况下就只会有一个Task进行文件的写入操作也就只有一个文件因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的此时只有一个reduce task去拉取数据)如图 假如说hash(col)为0那实际上只有reduceTask0有数据其他的ReduceTask1等等都是没有数据的所以最终只有ReduceTask0写文件并且只有一个文件。
在看合并的计算公式该数据流如下 tryOptimizeSkewedPartitions||\/optimizeSkewedPartitions||\/ShufflePartitionsUtil.createSkewPartitionSpecs||\/ShufflePartitionsUtil.splitSizeListByTargetSizesplitSizeListByTargetSize方法中涉及到的参数解释如下
参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组举例 sizes [100,200,300,400] 表明该任务有4个maptask0表示maptask为0的所属reduce的大小1表示maptask为1的所属reduce的大小依次类推,图解如下: 比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.
参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值假如说是256MB参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值默认是0.2 这里有个计算公式 def tryMergePartitions() {// When we are going to start a new partition, its possible that the current partition or// the previous partition is very small and its better to merge the current partition into// the previous partition.val shouldMergePartitions lastPartitionSize -1 ((currentPartitionSize lastPartitionSize) targetSize * MERGED_PARTITION_FACTOR ||(currentPartitionSize targetSize * smallPartitionFactor ||lastPartitionSize targetSize * smallPartitionFactor))if (shouldMergePartitions) {// We decide to merge the current partition into the previous one, so the start index of// the current partition should be removed.partitionStartIndices.remove(partitionStartIndices.length - 1)lastPartitionSize currentPartitionSize} else {lastPartitionSize currentPartitionSize}}。。。while (i sizes.length) {// If including the next size in the current partition exceeds the target size, package the// current partition and start a new partition.if (i 0 currentPartitionSize sizes(i) targetSize) {tryMergePartitions()partitionStartIndices icurrentPartitionSize sizes(i)} else {currentPartitionSize sizes(i)}i 1}tryMergePartitions()partitionStartIndices.toArray这里的计算公式大致就是从每个maptask中的获取到属于同一个reduce的数值依次累加如果大于targetSize就尝试合并直至到最后一个maptask 可以看到tryMergePartitions有个计算公式:currentPartitionSize targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 51.2MB 的话也还是会合并到前一个分区中去如果smallPartitionFactor设置过大可能会导致所有的分区都会合并到一个分区中去最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor*shuffleNum) 比如说以下的测试案例 val targetSize 100val smallPartitionFactor2 0.5// merge last two partition if their size is not bigger than smallPartitionFactor * targetval sizeList5 Array[Long](50, 50, 40, 5)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize, smallPartitionFactor2).toSeq Seq(0))val sizeList6 Array[Long](40, 5, 50, 45)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList6, targetSize, smallPartitionFactor2).toSeq Seq(0))这种情况下就会只有一个reduce任务运行。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/88293.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!