Spark RDD sortBy算子执行时进行数据 “采样”是什么意思?

一、sortBy 和 RangePartitioner

sortBy 在 Spark 中会在执行排序时采用 rangePartitioner 进行分区,这会影响数据的分区方式,并且这一步骤是通过对数据进行 “采样” 来计算分区的范围。不过,重要的是,sortBy 本身仍然是一个 transformation,它不会立即触发计算,但在执行过程中会涉及到对数据的排序、分区和最终计算。

1. sortBy 和 RangePartitioner

sortBy 会利用 RangePartitioner 来决定数据如何进行分区。RangePartitioner 会在排序之前,首先对数据进行采样,从而得出每个分区的范围,然后根据这些范围进行数据的分区。这是因为数据排序是一个全局操作,而 RangePartitioner 提供了一个合理的划分策略,使得 Spark 在执行排序时能够并行化。

  • 采样过程
    当调用 sortBy 时,Spark 会对数据进行 采样,通常使用的是 SampledRDD,这种采样会用来估计数据的分布范围,并为后续的分区计算提供依据。

  • RangePartitioner 的使用
    RangePartitioner 会根据数据的值划分成不同的范围。通常在分布式环境中,我们需要将数据按某种方式划分为多个分区,这个过程会使用一个范围来决定数据分布。

2. 是否会触发 runJob

sortBy 作为 transformation 不会立即触发作业执行。它返回一个新的 RDD,并仅在后续执行 action 操作时才会触发实际的计算。因此,sortBy 不会直接导致 runJob 的执行。只有在你执行类似 collect(), count(), saveAsTextFile() 等行动算子时,整个作业才会执行。

但是,sortBy 内部会涉及到 采样范围分区,这些过程是为了确保排序能够在多个分区上并行高效地完成,所有这些操作都在 Spark 内部的 task 中完成。runJob 会在行动算子执行时启动,但在执行过程中,rangePartitioner 的计算、数据的重新分区等步骤会被逐步执行。

3. 源码分析

我们可以通过查看 Spark 源码来更清楚地理解这些步骤。以下是关于 sortBy 和其内部处理的一些关键源码:

RDD.sortBy 源码
def sortBy[K: ClassTag, U: Ordering](f: T => K, ascending: Boolean = true, numPartitions: Int)(implicit ord: Ordering[K]): RDD[T] = {val partitioner = new RangePartitioner(numPartitions, this) // 使用 RangePartitionerval map = this.mapPartitionsWithIndex { (index, iter) =>// 计算分区内的排序val partitioned = iter.toArray.sortBy(f) partitioned.iterator}map
}

在这个方法中,RangePartitioner 被用来决定如何将数据分成多个分区。而在实际执行时,分区是通过 mapPartitionsWithIndex 来执行的。

RangePartitioner 源码
class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {def getPartition(key: Any): Int = {// 根据 key 的范围来决定在哪个分区val partitionIndex = rangePartition(key)partitionIndex}def rangePartition(key: Any): Int = {// 进行采样,并将数据按范围分到对应的分区}
}

4. 触发计算的条件

  • sortBy 是一个 transformation 操作,它会生成一个新的 RDD,并不会立即执行排序。
  • RangePartitioner 会在后台进行数据的分区计算和范围分割,但这一切都不会触发作业执行,直到 action 操作 被调用。

5. 总结

  • sortBy 会利用 RangePartitioner 进行数据的分区和范围划分,这过程中会对数据进行采样以确定每个分区的范围。
  • 这个过程本身不会触发作业执行,只有当你执行一个 action 操作时(如 collect()saveAsTextFile()),Spark 才会触发计算,并启动实际的作业执行,进行排序和分区。

二、 RangePartitioner 的 采样过程

在 Spark 中,RangePartitioner采样过程 是其核心部分之一,它确保能够为数据分配适当的分区,并保证每个分区的数据范围在排序时能够合理地分布。这里我们将深入探讨 RangePartitioner 是如何通过采样来计算分区范围的。

1. RangePartitioner 概述

RangePartitioner 是 Spark 中的一个分区器,常用于按范围将数据进行分区。它通常用于类似 sortBy 这类需要全局排序的操作,目的是为了在分布式环境中进行高效的并行排序。

RangePartitioner 在执行分区时,会利用 采样 来估算每个分区的范围(即每个分区的边界)。这种采样过程通过从数据中提取一个小样本,帮助计算出数据在不同分区上的分布,从而保证数据能够均匀地分配到各个分区中。

2. RangePartitioner 采样过程

采样是 RangePartitioner 计算每个分区的范围的关键。这个过程涉及到以下步骤:

2.1 数据采样

RangePartitioner 会从数据中 随机采样 一部分元素,用来估算数据的分布和计算每个分区的边界。这个采样过程通常不会采用全部数据,而是通过一定比例的数据来进行推测。这是为了减少计算开销,同时确保分区的均衡性。

采样操作通常是在 分布式环境中并行执行 的,Spark 会在多个分区上并行地获取样本数据。

  • 采样的比例:采样比例通常是一个相对较小的数值,目的是减少计算量。Spark 内部会在每个分区中执行采样,以确保最终分区的边界能够反映整个数据集的分布。
2.2 计算分区边界

一旦采样完成,RangePartitioner 就会使用这些采样数据来计算每个分区的边界。这个过程基于采样数据的排序:

  • 排序样本数据:首先,对采样数据进行排序,确保数据可以按顺序进行分区。
  • 计算分割点:然后,RangePartitioner 会根据排序后的数据划分出多个边界点。这些边界点代表了每个分区的数据范围。例如,如果数据有 1000 个元素,并且要求将数据划分为 10 个分区,那么就会在排序后的数据中选取 9 个分割点。
2.3 创建分区

RangePartitioner 利用这些边界点来创建新的分区。数据根据其值所在的范围,决定落入哪个分区。具体来说,RangePartitioner 会为每个分区计算出一个边界值,然后将所有数据按这些边界值进行分配。

  • 分区计算:对于每个数据元素,RangePartitioner 会根据元素的值和这些边界值,决定该元素属于哪个分区。

3. 代码实现中的采样部分

在 Spark 的源码中,RangePartitioner 的采样过程是通过以下代码来实现的:

3.1 RangePartitioner 类中的采样
class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {// 进行数据的采样val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)val sortedSample = sample.map(_._1).sortBy(identity)// 计算每个分区的分割点val splits = sortedSample.zipWithIndex.map { case (key, index) =>if (index % (sampleCount / partitions) == 0) key else null}.filter(_ != null)def getPartition(key: Any): Int = {// 根据采样的分割点进行分区var low = 0var high = partitions - 1while (low < high) {val mid = (low + high) / 2if (key < splits(mid)) high = mid - 1 else low = mid + 1}low}
}

在上面的代码中,sample 操作会对 RDD 中的数据进行采样,并将其按值排序。然后,通过分割排序后的数据,计算出每个分区的边界点。这些边界点随后用于 getPartition 方法中来确定数据的分配。

3.2 采样与排序
  • rdd.sample(withReplacement = false, fraction = 0.1):从原始 RDD 中采样 10% 的数据(fraction = 0.1),并且不进行重复采样。
  • sortBy(identity):对采样的数据进行排序,确保采样数据的顺序正确,便于后续计算边界。

4. 触发计算

在执行 sortBy 操作时,Spark 会根据 RangePartitioner 对数据进行采样、排序和分区计算。这些操作会在你执行 action 操作(如 collect()saveAsTextFile())时触发,具体的分区计算会在计算过程中完成。直到行动算子触发,计算过程才会开始,RangePartitioner 会根据采样数据生成分区,并最终执行数据的排序。

5. 总结

  • 采样RangePartitioner 会从数据中随机采样一部分元素(通常是 10% 或其他比例),用来估算数据的分布。
  • 排序与计算分区边界:采样数据被排序,并根据排序后的数据计算出每个分区的边界。这样可以确保数据均匀分配到不同的分区。
  • 数据分区:根据采样和计算出的边界,RangePartitioner 会将数据分配到相应的分区中。

通过这种采样与分区机制,RangePartitioner 能够高效地支持 Spark 的排序操作,使得数据在分布式环境中能够有效地并行处理。


三、举例介绍RangePartitioner采样过程

理解 RangePartitioner 如何通过采样来获得数据分布、计算边界,并将数据分配到相应分区的过程,确实比较抽象。我会通过一个简单的例子来帮助你更直观地理解这个过程。

问题场景

假设你有一个数据集,包含了以下的 10 个整数:

[10, 23, 1, 9, 15, 37, 2, 16, 40, 3]

你想用 RangePartitioner 来将这些数据分为 3 个分区,并且根据它们的值进行排序。

1. 采样数据

首先,为了计算每个分区的边界,RangePartitioner 会对数据集进行采样。假设我们采样 30% 的数据(即随机选择 3 个数据点)。假设采样到的数据是:

[10, 23, 3]

2. 排序采样数据

然后,对采样的数据进行排序,确保它们按大小排列。对于这个例子,排序后的采样数据是:

[3, 10, 23]

3. 计算分区边界

通过对采样数据进行排序,RangePartitioner 可以计算出分区的边界。在我们的例子中,我们有 3 个分区,因此我们需要为数据计算 2 个边界(因为 n 个分区需要 n-1 个边界)。

根据排序后的采样数据 [3, 10, 23]RangePartitioner 可以选择分割点来确定边界:

  • 第一个边界:选择采样数据的第一个元素(3)。
  • 第二个边界:选择采样数据的最后一个元素(23)。

现在我们有了两个边界:

  • 分区 1:所有小于 10 的数据
  • 分区 2:所有大于等于 10 小于 23 的数据
  • 分区 3:所有大于等于 23 的数据

4. 分配数据到分区

接下来,RangePartitioner 会根据这些边界将数据分配到相应的分区中。具体的分区规则是:

  • 分区 1:所有小于 10 的元素 → [1, 2, 3, 9]
  • 分区 2:所有大于等于 10 且小于 23 的元素 → [10, 15, 16]
  • 分区 3:所有大于等于 23 的元素 → [23, 37, 40]

所以最终的分区结果是:

  • 分区 1[1, 2, 3, 9]
  • 分区 2[10, 15, 16]
  • 分区 3[23, 37, 40]

5. 总结过程

通过这个例子,我们可以看到 RangePartitioner 的整个过程:

  1. 采样数据:从整个数据集中随机抽取一部分数据(这里是 30%)。
  2. 排序采样数据:对采样数据进行排序,确保我们能根据数据的范围计算边界。
  3. 计算分区边界:根据排序后的采样数据,选择边界来划分数据(例如第一个和最后一个元素)。
  4. 分配数据到分区:根据边界将所有数据分配到相应的分区中。

6. 实际执行的情况

  • 采样比例:在实际的 Spark 中,采样比例并不一定是 30%,通常是根据数据的大小和分区数量进行调整的。采样可以确保 RangePartitioner 在计算边界时不会消耗过多资源。
  • 多个分区:如果数据集更大,分区数量更多,RangePartitioner 会选择更多的采样点来划分分区。边界点会根据排序后的采样数据来动态选择。

7. 关键源码中的采样部分

在实际 Spark 的源码中,采样是通过 sample 方法实现的:

val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)
val sortedSample = sample.map(_._1).sortBy(identity)

然后通过这些采样的排序数据,计算每个分区的边界。例如,当分区数量是 3 时,RangePartitioner 会选取采样数据的前几个元素作为边界,并用这些边界来确定每个分区的范围。


8. 进一步优化

在实际使用中,Spark 的 RangePartitioner 会通过自适应调整采样的比例和算法来优化性能,确保在处理大型数据集时依然高效。在某些情况下,Spark 会使用更智能的策略来决定采样的方式,以便在并行处理中避免过多的计算开销。

总结

通过采样、排序和计算边界,RangePartitioner 确保了数据可以均匀地分配到不同的分区中,从而为排序等操作提供并行化的支持。这一过程使得 Spark 在处理大规模数据时能够有效地进行全局排序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入探讨 Puppeteer 如何使用 X 和 Y 坐标实现鼠标移动

背景介绍 现代爬虫技术中&#xff0c;模拟人类行为已成为绕过反爬虫系统的关键策略之一。无论是模拟用户点击、滚动&#xff0c;还是鼠标的轨迹移动&#xff0c;都可以为爬虫脚本带来更高的“伪装性”。在众多的自动化工具中&#xff0c;Puppeteer作为一个无头浏览器控制库&am…

【学术论文投稿】JavaScript 前端开发:从入门到精通的奇幻之旅

【中文核刊&普刊投稿通道】2024年体育科技与运动表现分析国际学术会议(ICSTPA 2024)_艾思科蓝_学术一站式服务平台 更多学术会议论文投稿请看&#xff1a;https://ais.cn/u/nuyAF3 目录 一、引言 二、JavaScript 基础 &#xff08;一&#xff09;变量与数据类型 &am…

java实现枚举

介绍 枚举算法也叫暴力算法&#xff0c;是一种简单直接的问题求解方法&#xff0c;它通过逐一列举问题的所有可能解&#xff0c;并检查每个可能解是否符合问题的条件&#xff0c;直到找到正确的解或者遍历完所有可能的情况。 对于一个问题&#xff0c;首先确定解的范围&#x…

Leetcode 最长回文子串

目录 解法1&#xff1a;递归算法 解法2&#xff1a;Map取同字母位置法 解法3&#xff1a;中心扩展法 解法4&#xff1a;动态规划法 解法5: Manacher算法 示例 1&#xff1a; 输入&#xff1a;s "babad" 输出&#xff1a;"bab" 解释&#xff1a;&quo…

B树的简单实现

template<class K, size_t M> struct BTreeNode {K _keys[M]; // 用于存储关键字的数组&#xff0c;最多容纳 M 个关键字&#xff08;超额一个&#xff0c;为分裂提供空间&#xff09;。BTreeNode<K, M>* _subs[M 1]; // 存储子节点的指针数组&#xff0c;最多 M1…

2020 年 9 月青少年软编等考 C 语言三级真题解析

目录 T1. 因子问题思路分析T2. 质数的和与积思路分析T3. 括号匹配问题思路分析T4. 吃糖果 2思路分析T5. 铺砖思路分析T1. 因子问题 任给两个正整数 n n n、 m m m,求一个最小的正整数 a a a,使得 a a a 和 ( m − a ) (m-a) (m−a) 都是 n n n 的因子。 时间限制:1 s…

SpringBoot(8)-任务

目录 一、异步任务 二、定时任务 三、邮件任务 一、异步任务 使用场景&#xff1a;后端发送邮件需要时间&#xff0c;前端若响应不动会导致体验感不佳&#xff0c;一般会采用多线程的方式去处理这些任务&#xff0c;但每次都需要自己去手动编写多线程来实现 1、编写servic…

React的诞生与发展

React诞生于2013年&#xff0c;由Facebook&#xff08;现Meta&#xff09;的工程师Jordan Walke开发。那时的前端开发还处在jQuery的天下&#xff0c;组件化的概念尚未形成。React的出现犹如一阵春风&#xff0c;为前端开发带来了全新的开发理念和方法论。 React最核心的设计理…

WebStorm 2022.3.2/IntelliJ IDEA 2024.3出现elementUI提示未知 HTML 标记、组件引用爆红等问题处理

WebStorm 2022.3.2/IntelliJ IDEA 2024.3出现elementUI提示未知 HTML 标记、组件引用爆红等问题处理 1. 标题识别elementUI组件爆红 这个原因是&#xff1a; 在官网说明里&#xff0c;才版本2024.1开始&#xff0c;默认启用的 Vue Language Server&#xff0c;但是在 Vue 2 项…

Odoo :免费且开源的农牧行业ERP管理系统

文 / 开源智造Odoo亚太金牌服务 引言 提供农牧企业数字化、智能化、无人化产品服务及全产业链高度协同的一体化解决方案&#xff0c;提升企业智慧种养、成本领先、产业互联的核心竞争力。 行业典型痛点 一、成本管理粗放&#xff0c;效率低、管控弱 产品研发过程缺少体系化…

解决Excel文件流读取数字为时间乱码问题

在将Excel文件流转换为Java中的List时&#xff0c;如果遇到文本被错误地识别为日期格式的问题&#xff0c;这通常是由于Apache POI库在处理单元格数据时默认的行为所导致的。Apache POI会尝试根据单元格的内容自动确定其类型&#xff0c;包括字符串、数字&#xff08;可能解释为…

【Unity踩坑】出现d3d11问题导致编辑器崩溃

升级到Unity 6&#xff0c;有时出现下面这种D3D11的问题&#xff0c;会导致编辑器崩溃。 有人总结了这个问题的解决方法&#xff0c;可以做为参考&#xff1a; Failed to present D3D11 swapchain due to device reset/removed. List of Solutions - Unity Engine - Unity Dis…

数据库基础(MySQL)

1. 数据库基础 1.1 什么是数据库 存储数据用文件就可以了&#xff0c;为什么还要弄个数据库? 文件保存数据有以下几个缺点&#xff1a; 文件的安全性问题文件不利于数据查询和管理文件不利于存储海量数据文件在程序中控制不方便 数据库存储介质&#xff1a; 磁盘内存 为…

JMeter监听器与压测监控之Grafana

Grafana 是一个开源的度量分析和可视化套件&#xff0c;通常用于监控和观察系统和应用的性能。本文将指导你如何在 Kali Linux 上使用 Docker 来部署 Grafana 性能监控平台。 前提条件 Kali Linux&#xff1a;确保你已经安装了 Kali Linux。Docker&#xff1a;确保你的系统已…

集群聊天服务器(13)redis环境安装和发布订阅命令

目录 环境安装订阅redis发布-订阅的客户端编程环境配置客户端编程 功能测试 环境安装 sudo apt-get install redis-server 先启动redis服务 /etc/init.d/redis-server start默认在6379端口上 redis是存键值对的&#xff0c;还可以存链表、数组等等复杂数据结构 而且数据是在…

linux常用指令总结(附Vim编辑器学习总结)

本文是博主对Linux中经常用到的一些指令进行的总结&#xff0c;文章也附带了Linux中经常用到的Vim编辑器的一些基本知识和使用指令&#xff0c;觉得有帮助的朋友可以点赞收藏&#xff01; 本文会持续进行更新 linux常用指令总结 $ pwd # 查看当前终端所在…

w046基于web的古典舞在线交流平台的设计与实现

&#x1f64a;作者简介&#xff1a;多年一线开发工作经验&#xff0c;原创团队&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339;赠送计算机毕业设计600个选题excel文…

阿里云轻量应用服务器可以用在哪些场景呢

在数字化转型的浪潮中&#xff0c;中小企业面临着如何快速、高效地上云的挑战。阿里云轻量应用服务器&#xff08;SWAS&#xff09;作为一款专为中小企业设计的云服务产品&#xff0c;提供了简单易用、经济实惠的解决方案&#xff0c;助力企业轻松实现云端部署&#xff0c;赋能…

植物明星大乱斗15

能帮到你的话&#xff0c;就给个赞吧 &#x1f618; 文章目录 player.hplayer.cppparticle.hparticle.cpp player.h #pragma once #include <graphics.h> #include "vector2.h" #include "animation.h" #include "playerID.h" #include &…

爬虫开发工具与环境搭建——使用Postman和浏览器开发者工具

第三节&#xff1a;使用Postman和浏览器开发者工具 在网络爬虫开发过程中&#xff0c;我们经常需要对HTTP请求进行测试、分析和调试。Postman和浏览器开发者工具&#xff08;特别是Network面板和Console面板&#xff09;是两种最常用的工具&#xff0c;能够帮助开发者有效地捕…