Spark的Transformations算子(理解+实例)

把每个Transformations算子都敲着练习几遍会理解的更深刻

Transformations算子之后要写action算子才会进行计算。

1. map(func)

描述:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr = Array(1,2,3,4,5,6)val numRDD = sc.parallelize(arr)val resultRDD = numRDD.map(x => x * x)resultRDD.foreach(println)}
结果:
1
4
9
16
25
36

2. filter(func)

描述:返回一个新的RDD,该RDD经过func函数计算后返回值为true的输入元素组成

 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr = Array(1,2,3,4,5,6)//parallelize()创建个rddval numRDD = sc.parallelize(arr)val resultRDD = numRDD.map(_%2 == 0)resultRDD.foreach(println)resultRDD.take(100).foreach(println)resultRDD.collect()}
结果:
false
true
false
true
false
true

3.flatMap(func)

描述:类似map,到每个输入元素可以被映射为0个或者多个输入元素(所以func返回一个序列,而不是一个元素)

 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chun").setMaster("local")val sc = new SparkContext(conf)val words = Array("hello python","hello hadoop","hello spark")val wordRDD = sc.parallelize(words)wordRDD.flatMap(_.split(" ")).collect.foreach(println)}
结果:
hello
python
hello
hadoop
hello
spark

4.mapPartitions(func)

描述:类似map,但独立在RDD的每个分区上运行,因此在类型为T的RDD上运行时,,func函数的类型必须是Iterator => Iterator

 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chun").setMaster("local")val sc = new SparkContext(conf)val array = Array(1,2,1,2,2,3,4,5,6,7,8,9)val arrayRDD = sc.parallelize(array)arrayRDD.mapPartitions(elements =>{val result = new ArrayBuffer[Int]()elements.foreach(e =>{result +=e})result.iterator}).foreach(println)}
结果:
121223456789

5.mapPartitionsWithIndex(func)

描述:类似于mapPartitions,但func带有一个整形参数表示分片的索引值,因此在类型为T的RDD上运行时func函数的类型必须(int,Iterator)=> Iterator

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2)  //2表示分区数arrayRDD.mapPartitionsWithIndex((index,elements) =>{println("partition index:" + index)val result = new ArrayBuffer[Int]()elements.foreach(e =>{result += e})result.iterator}).foreach(println)}
运行结果:
partition index:0
1
2
3
4partition index:1
5
6
7
8
9

6.sample(WithReplacement,fraction,seed)

描述:根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子

 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arrayRDD = sc.parallelize(1 to 10000)val sampleRDD = arrayRDD.sample(true,0.001)    //true表示抽样之后放回println(sampleRDD.count())}结果:10
 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arrayRDD = sc.parallelize(1 to 10000)val sampleRDD = arrayRDD.sample(false,0.001)  //false表示抽样之后不放回println(sampleRDD.count())结果:9}

7.union(otherDataset)

描述:对源RDD和参数RDD求并集后并返回一个新的RDD

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(11 to 20)val resultRDD = rdd1.union(rdd2)resultRDD.foreach(print)}
结果:
11121314151617181920

8.intersection(otherDataset)

描述:对源RDD和参数RDD求交集后并返回一个新的RDD

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(Array(1,3,5,7,8))val rdd2 = sc.parallelize(Array(3,5,7))val resultRDD = rdd1.intersection(rdd2)resultRDD.foreach(println)}
结果:
3
7
5

9.distinct([numTasks])

描述:对源RDD进行去重,返回一个新的RDD

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr = Array(Tuple3("max","math",90),("max","englist",85),("mike","math",100))val scoreRDD = sc.parallelize(arr)val studentNumber = scoreRDD.map(_._1).distinct().collect()println(studentNumber.mkString(","))}
结果:
max,mike

10.groupByKey([numTasks])

描述:在一个(k,v)形式的RDD上调用,返回一个(k,Iterator[V])的RDD

 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)var x =0val arr = Array("chun1 chun2 chun3 chun1 chun1 chun2", "chun1")val arrRDD = sc.parallelize(arr)val resultRDD = arrRDD.flatMap(_.split(" ")).map((_,1)).groupByKey()//resultRDD.foreach(println)resultRDD.foreach(element => {println(element._1+" "+element._2.size)})}
chun1 4
chun3 1
chun2 2

11.reduceByKey(func,[numTasks])

描述:在一个(k,v)形式的RDD上调用,返回一个(k,v)的RDD,使用指定的reduce函数,将相同key的值聚集到一起,与groupBy类似,reudce任务的个数可以通过第二个参数来设置

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr =Array("chun1 chun2 chun3 chun1 chun1 chun2","chun1")val arrRDD=sc.parallelize(arr)val resultRDD = a.flatMap(_.split(" ")).map(x=>((x,1))).reduceByKey(_+_).collect.foreach(println)}
结果:
(chun1,4)
(chun3,1)
(chun2,2)

12.aggregateByKey(zeroValue)(seqOp,combOP,[numTasks])

描述:当调用(k,v)对的数据集时,返回(K,U)数据集,其中每个key的值使用给定的聚合函数和中性‘零’进行聚合,与groupyKey类似,reduce任务的数量可以通过可选的第二个参数进行配置

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val data = List((1,3),(1,4),(2,3),(3,6),(1,2),(3,8))val rdd =sc.parallelize(data)rdd.aggregateByKey(0)(math.max(_,_),_+_).collect(.foreach(println()))}
结果:(1,4)
(3,8)
(2,3)

13.sortByKey([ascending],[numTasks])

描述:在一个(k,v)形式的RDD上调用,k必须实现Ordered接口,返回一个按照key进行排序的(k,v)的RDD

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val scores = Array(Tuple2("mike",80),("max",90),("bob",100))val scoresRDD = sc.parallelize(scores)val sortByKeyRDD = scoresRDD.map(x => (x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1)         //把元组k,v换位值进行排序后,再换回来)sortByKeyRDD.collect.foreach(println)}
(bob,100)
(max,90)
(mike,80)

14.join(otherDataset,[numTasks])

描述:当调用(k,v)和(k,w)类型的数据集时,返回一个(k,(v,w))形式的数据集,支持left outer join、right outer join 和full outer join

 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)//学生信息val students = Array(Tuple2(1,"max"),Tuple2(2,"mike"),Tuple2(3,"bob"))//分数val scores = Array(Tuple2(1,90),Tuple2(2,120),Tuple2(3,80))val stuRDD = sc.parallelize(students)val scoresRDD = sc.parallelize(scores)//两组kv对join,返回的是(k,(v,w))val resultRDD = stuRDD.join(scoresRDD).sortByKey()resultRDD.foreach(x => {println("id:" +x._1 +" name:"+x._2._1 + " score:"+x._2._2)println("=========================")})}
结果:id:1 name:max score:90
=========================
id:2 name:mike score:120
=========================
id:3 name:bob score:80
=========================

15.cogroup(otherDataset,[numTasks])

描述:当调用(k,v)和(k,w)类型的数据集时,返回(k,(Iterator,Iterator))元组的数据集

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun1").setMaster("local")val sc = new SparkContext(conf)//学生信息val students = Array(("class1","max"),("class1","mike"),("class2","bob"))//分数val scores = Array(("class1",90),("class1",120),("class2",80))val stuRDD = sc.parallelize(students)val scoresRDD = sc.parallelize(scores)val resultRDD = stuRDD.cogroup(scoresRDD).sortByKey()resultRDD.foreach(x =>{println("class:"+x._1)x._2._1.foreach(println)x._2._2.foreach(println)  //可以去掉只显示名字println("===========")})}
结果:class:class1
max
mike
90
120
===========
class:class2
bob
80
===========

16.cartesian(otherDataset)

描述:当调用T和U类型的数据集时,返回一个(T,U)类型的数据集

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr1 = sc.parallelize(Array(1,3,5))val arr2 = sc.parallelize(Array(2,4,6))arr1.cartesian(arr2).collect().foreach(println)}
(1,2)
(1,4)
(1,6)
(3,2)
(3,4)
(3,6)
(5,2)
(5,4)
(5,6)

17.pipe(command,[envVars])

描述:通过shell命令(例如perl或bash脚本)对RDD的每个分区进行管道连接。RDD元素写入进程的stdin,输出到其stdout的行作为字符串的RDD返回

18.coalesce(numpartitions)

描述:将RDD中的分区数减少到numpartitions,在过滤大型数据集后,可以更高效地运行操作

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(1 to 20,10)println(rdd1.partitions.length) //10var rdd2 = rdd1.coalesce(15,true)println(rdd2.partitions.length) //15var rdd3 = rdd1.repartition(15)println(rdd3.partitions.length) //15var rdd4 = rdd1.coalesce(15,false) //这种是不可以重新分区的println(rdd4.partitions.length) //10var rdd5 = rdd1.coalesce(2,false)println(rdd5.partitions.length) //2rdd5.foreach(print) //第一个区:12345678910 第二个区:11121314151617181920var rdd6 = rdd1.coalesce(2,true)println(rdd6.partitions.length) //2rdd6.foreach(print) //第一个区:135791113151719 第二个区:2468101214161820}

19.repartiton(numPartitions)

描述:随机重组RDD中的数据,以创建更多或更少的分区,并在分区之间进行平衡,总是会产生shuffle操作


repartition和coalesce

他们两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)
1)、N<M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。
2)如果N>M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false,在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。
3)如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以讲shuffle设置为true。
总之:如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDDde分区数变多的。

20.repartitionAndSortWithinPartitions(partitioner)

描述:根据给定的分区重新分区RDD,在每个结果分区中,根据它们的键对记录进行排序。这比调用重新分区更有效,然后在每个分区中进行排序,因为它可以将排序推入到洗牌机器中。

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3) //3表示分区数arrayRDD.mapPartitionsWithIndex((index,elements) =>{  //index为索引值,elements数据println("partition index:" + index)val result = new ArrayBuffer[Int]()elements.foreach(e =>{result += e})result.iterator}).foreach(println)}
结果:
partition index:0
1
2
3
partition index:1
4
5
6
partition index:2
7
8
9

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/437648.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【转】掀起Azure AD的盖头来——深入理解Microsoft Graph应用程序和服务权限声明

引子 这是一篇计划外的文章。我们都知道要进行Microsoft Graph的开发的话&#xff0c;需要进行应用程序注册。这个在此前我已经有专门的文章写过了。但这里存在一个小的问题&#xff1a;国内版的Office 365在申请好之后&#xff0c;并没有像国际版那样&#xff0c;有一个对应的…

Python 异步库 asyncio、aiohttp

asyncio 版本支持 asyncio 模块在 Python3.4 时发布。async 和 await 关键字最早在 Python3.5 中引入。Python3.3 之前不支持。 关键概念 event_loop 事件循环&#xff1a;程序开启一个无限的循环&#xff0c;程序员会把一些函数&#xff08;协程&#xff09;注册到事件循环…

Spark-自定义累加器-进行字符串拼接(代码及详细实现步骤)

看longAccumulator()方法源码里是val acc new LongAccumulator然后用register(acc)在Spark中注册了累加器&#xff0c;进入ctrl鼠标左键进入LongAccumulator&#xff0c;可以看到继承了AccumulatorV2[jl.Long, jl.Long],根据LongAccumulator来实现自定义累加器 实现类 //1.继…

Python3 学习系列 丨 博客目录索引

整个博客有关 Python 学习目录索引&#xff0c;方便快捷定位查询基础学习篇 Python3 基础学习笔记 C01【变量和简单数据类型】Python3 基础学习笔记 C02【列表】Python3 基础学习笔记 C03【操作列表】Python3 基础学习笔记 C04【if 语句】Python3 基础学习笔记 C05【字典】Pyt…

【转】日邮物流:实现智慧物流,这个云上对了!

和阳光、空气、水、网络一样&#xff0c;「物流」早已成为当代企业、个人赖以生存的必要条件。2020第一季度全球物流受疫情影响面临挑战&#xff0c;业内普遍预计全球物流及供应链将重新优化布局。借此时机&#xff0c;物流业纷纷将目光投向“数字化智慧物流”方向&#xff0c;…

Spark-三大数据结构之-广播变量

什么是广播变量 分布式只读共享变量 首先广播变量是一个调优策略(可以减少数据的传输&#xff0c;也就是数据从driver传输到executor) (每一个executor都要传list数据&#xff0c;如果数据太多就很慢&#xff0c;采用广播变量他是一个共享只读变量&#xff0c;可以减少数据传…

Python 实现十大经典排序算法

目录排序算法分类一、冒泡排序&#xff08;Bubble Sort&#xff09;1、原理2、步骤3、动画演示4、代码实现5、具体示例二、选择排序&#xff08;Selection Sort&#xff09;1、原理2、步骤3、动画演示4、代码实现5、具体示例三、插入排序&#xff08;Insertion Sort&#xff09…

【转】Microsoft Graph 桌面应用程序

桌面应用程序&#xff0c;在我这篇文章的语境中&#xff0c;我是特指在Windows桌面上面直接运行的.NET应用程序&#xff0c;包括Console Application&#xff0c;WPF Application&#xff0c;Windows Forms Application, UWP Application&#xff0c;并且限于篇幅&#xff0c;我…

【转】Microsoft Graph Web应用程序极致开发体验

前言 这篇文章最早写于2017年5月2日&#xff0c;当时的想法是从最简单的方式来写如何在一个ASP.NET MVC应用程序中集成Microsoft Graph&#xff0c;但实际上还真不是那么简单&#xff0c;至少我是不满意的&#xff0c;加上这一两周都比较忙&#xff0c;所以这一篇就一直搁置。…

Spark(idea)操作mysql进行查询和插入 (代码+理解)

首先在maven中加入配置 <!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency>然后在idea配置数据库 1&#xff09; 查询 //1.查询数…

【转】在无人值守程序(服务)中调用Microsoft Graph

什么是无人值守程序&#xff08;服务&#xff09; 我在此前用了几篇文章分别介绍了在桌面应用程序&#xff08;控制台&#xff09;&#xff0c;Web应用程序&#xff08;ASP.NET MVC&#xff09;&#xff0c;以及PowerSehll脚本中如何访问Microsoft Graph&#xff0c;今天这一篇…

MapReduce的shuffle阶段

Shuffle 为何需要shuffle • Reduce阶段的数据来源于不同的Map Shuffle由Map端和Reduce端组成 Shuffle的核心机制 • 数据分区排序 Map端 • 对Map输出结果进行spill Reduce端 • 拷贝Map端输出结果到本地 • 对拷贝的数据进行归并排序Shuffle Map端 Map端会源源不断的把数据输…

【转】使用PowerApps快速构建基于主题的轻业务应用 —— 入门篇

前言 在上一篇文章 基于Office 365的随需应变业务应用平台 中我提到&#xff0c;随着随需应变的业务需要&#xff0c;以及技术的发展&#xff0c;业务应用的开发的模式也有了深刻的变化。基于微软的平台&#xff0c;有服务于主干业务应用的Dynamic 365 业务应用平台&#xff0…

Spark内核源码学习(暂未学完)

1&#xff09; 回顾 1.1 Spark通用运行流程概述 在submit任务条件是需要指定executo个数&#xff0c;executor-CUP个数&#xff0c;可以提高并行度。 什么是并行&#xff0c;什么是并发&#xff1f; 并发&#xff1a;假如有多个任务task&#xff0c;并行是在一个cup中&#x…

【转】使用PowerApps快速构建基于主题的轻业务应用 —— 进阶篇

在上一篇 使用PowerApps快速构建基于主题的轻业务应用 —— 入门篇 中&#xff0c;我用了三个实际的例子演示了如何快速开始使用PowerApps构建轻业务应用&#xff0c;你可能已经发现&#xff0c;我都是使用默认生成的设置&#xff0c;没有做任何修改。当然&#xff0c;那样做出…

Spark一些组件的定义

Driver program: 运行应用程序的main函数并创建SparkContext的进程 除了RDD的最终执行所写的业务逻辑&#xff0c;剩下的都在Driver里生成&#xff0c;Driver端执行action算子才会到开始执行所创建的DAG-RDD图。 Cluster manager&#xff1a; 用于获取集群资源外部服务 Mas…

【转】D365 FO第三方集成(二)---访问认证(获取访问令牌)

D365 FO 在github上发布了第三方访问D365 FO的示例代码&#xff0c;里面包含了各种调用示例&#xff0c;代码很清晰。https://github.com/microsoft/Dynamics-AX-Integration 这篇blog简单分析一下代码中获取访问令牌的部分代码。 与获取访问令牌相关的代码有两个类ClientConfi…

SparkSQL练习+理解+详解

def main(args: Array[String]): Unit {//创建配置对象val conf new SparkConf().setAppName("SparkSQL01_demo").setMaster("local[*]")val session SparkSession.builder().config(conf).getOrCreate()//创建RDD (session里包含sparkContext&#xf…

【转】D365 FO第三方集成(三)---服务实现

D365 FO的Custom Service的实现比AX2012简单了很多。 AX2012服务方法要用属性SysEntryPointAttribute标记&#xff0c;添加到Services以后&#xff0c;还要发布服务并在系统管理入站端口添加操作&#xff0c;服务运行在CIL下&#xff0c;所以每次改动服务方法的代码都要增量生成…

SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总

AVG是求平均值&#xff0c;所以输出类型是Double类型 1&#xff09;创建弱类型聚合函数类extends UserDefinedAggregateFunction class MyAgeFunction extends UserDefinedAggregateFunction {//函数输入的数据结构&#xff0c;需要new一个具体的结构对象&#xff0c;然后添加…