Typing out and practicing each of the Transformation operators a few times gives a much deeper understanding.
Transformation operators are lazy: nothing is actually computed until an action operator is called afterwards (as the sketch below illustrates).
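A minimal sketch of this laziness, reusing the local setup from the examples below (the println inside map is illustrative only):

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val numRDD = sc.parallelize(Array(1, 2, 3))
  // The transformation only records the lineage; the function does not run yet
  val squaredRDD = numRDD.map { x => println(s"computing $x"); x * x }
  println("no 'computing ...' line has been printed so far")
  // collect() is an action, so only now does the map function actually execute
  squaredRDD.collect().foreach(println)
}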
1. map(func)
Description: returns a new RDD formed by passing each element of the source RDD through the function func.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array(1, 2, 3, 4, 5, 6)
  val numRDD = sc.parallelize(arr)
  val resultRDD = numRDD.map(x => x * x)
  resultRDD.foreach(println)
}
Result:
1
4
9
16
25
36
2. filter(func)
Description: returns a new RDD formed by selecting the elements of the source RDD for which func returns true.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array(1, 2, 3, 4, 5, 6)
  // parallelize() creates an RDD
  val numRDD = sc.parallelize(arr)
  // keep only the elements for which the predicate returns true
  val resultRDD = numRDD.filter(_ % 2 == 0)
  resultRDD.foreach(println)
  // alternative ways to inspect the result:
  // resultRDD.take(100).foreach(println)
  // resultRDD.collect()
}
Result:
2
4
6
3. flatMap(func)
Description: similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chun").setMaster("local")
  val sc = new SparkContext(conf)
  val words = Array("hello python", "hello hadoop", "hello spark")
  val wordRDD = sc.parallelize(words)
  wordRDD.flatMap(_.split(" ")).collect.foreach(println)
}
Result:
hello
python
hello
hadoop
hello
spark
4. mapPartitions(func)
Description: similar to map, but runs independently on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U].
import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chun").setMaster("local")
  val sc = new SparkContext(conf)
  val array = Array(1, 2, 1, 2, 2, 3, 4, 5, 6, 7, 8, 9)
  val arrayRDD = sc.parallelize(array)
  arrayRDD.mapPartitions(elements => {
    // collect the partition's elements into a buffer and return its iterator
    val result = new ArrayBuffer[Int]()
    elements.foreach(e => result += e)
    result.iterator
  }).foreach(println)
}
Result:
1
2
1
2
2
3
4
5
6
7
8
9
5. mapPartitionsWithIndex(func)
Description: similar to mapPartitions, but func takes an extra integer argument representing the index of the partition, so when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2) // 2 is the number of partitions
  arrayRDD.mapPartitionsWithIndex((index, elements) => {
    println("partition index:" + index)
    val result = new ArrayBuffer[Int]()
    elements.foreach(e => result += e)
    result.iterator
  }).foreach(println)
}
Result:
partition index:0
1
2
3
4
partition index:1
5
6
7
8
9
6. sample(withReplacement, fraction, seed)
Description: samples the data at the ratio given by fraction, with or without replacement as selected by withReplacement; seed specifies the seed of the random number generator.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(1 to 10000)
  val sampleRDD = arrayRDD.sample(true, 0.001) // true: sample with replacement
  println(sampleRDD.count())
}
Result: 10 (approximate; the sampled count varies from run to run)
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(1 to 10000)
  val sampleRDD = arrayRDD.sample(false, 0.001) // false: sample without replacement
  println(sampleRDD.count())
}
Result: 9 (approximate; varies from run to run)
7. union(otherDataset)
Description: returns a new RDD that is the union of the source RDD and the argument RDD.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd1 = sc.parallelize(1 to 10)
  val rdd2 = sc.parallelize(11 to 20)
  val resultRDD = rdd1.union(rdd2)
  resultRDD.foreach(print)
}
Result:
1234567891011121314151617181920
8. intersection(otherDataset)
Description: returns a new RDD that is the intersection of the source RDD and the argument RDD.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd1 = sc.parallelize(Array(1, 3, 5, 7, 8))
  val rdd2 = sc.parallelize(Array(3, 5, 7))
  val resultRDD = rdd1.intersection(rdd2)
  resultRDD.foreach(println)
}
Result:
3
7
5
9. distinct([numTasks])
Description: de-duplicates the source RDD and returns a new RDD containing its distinct elements.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array(("max", "math", 90), ("max", "english", 85), ("mike", "math", 100))
  val scoreRDD = sc.parallelize(arr)
  // take the student name from each tuple and de-duplicate
  val studentNumber = scoreRDD.map(_._1).distinct().collect()
  println(studentNumber.mkString(","))
}
Result:
max,mike
10. groupByKey([numTasks])
Description: called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]) pairs.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array("chun1 chun2 chun3 chun1 chun1 chun2", "chun1")
  val arrRDD = sc.parallelize(arr)
  val resultRDD = arrRDD.flatMap(_.split(" ")).map((_, 1)).groupByKey()
  // resultRDD.foreach(println)
  resultRDD.foreach(element => {
    println(element._1 + " " + element._2.size)
  })
}
Result:
chun1 4
chun3 1
chun2 2
11. reduceByKey(func, [numTasks])
Description: called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values for each key are aggregated with the given reduce function. As with groupByKey, the number of reduce tasks can be configured through an optional second argument.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array("chun1 chun2 chun3 chun1 chun1 chun2", "chun1")
  val arrRDD = sc.parallelize(arr)
  arrRDD.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).collect.foreach(println)
}
Result:
(chun1,4)
(chun3,1)
(chun2,2)
12. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
Description: called on an RDD of (K, V) pairs, returns an RDD of (K, U) pairs in which the values for each key are aggregated with the given combine functions and a neutral "zero value". As with groupByKey, the number of reduce tasks can be configured through an optional second argument.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val data = List((1, 3), (1, 4), (2, 3), (3, 6), (1, 2), (3, 8))
  val rdd = sc.parallelize(data)
  // seqOp takes the max of a key's values within a partition; combOp sums the per-partition results
  rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)
}
Result:
(1,4)
(3,8)
(2,3)
13. sortByKey([ascending], [numTasks])
Description: called on an RDD of (K, V) pairs in which K implements the Ordered trait, returns an RDD of (K, V) pairs sorted by key.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val scores = Array(("mike", 80), ("max", 90), ("bob", 100))
  val scoresRDD = sc.parallelize(scores)
  val sortByKeyRDD = scoresRDD
    .map(x => (x._2, x._1)) // swap so the score becomes the key
    .sortByKey(false)       // sort by score in descending order
    .map(x => (x._2, x._1)) // swap back after sorting
  sortByKeyRDD.collect.foreach(println)
}
Result:
(bob,100)
(max,90)
(mike,80)
14. join(otherDataset, [numTasks])
Description: called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs; left outer join, right outer join, and full outer join are also supported.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  // student info: (id, name)
  val students = Array((1, "max"), (2, "mike"), (3, "bob"))
  // scores: (id, score)
  val scores = Array((1, 90), (2, 120), (3, 80))
  val stuRDD = sc.parallelize(students)
  val scoresRDD = sc.parallelize(scores)
  // joining two pair RDDs returns (k, (v, w))
  val resultRDD = stuRDD.join(scoresRDD).sortByKey()
  resultRDD.foreach(x => {
    println("id:" + x._1 + " name:" + x._2._1 + " score:" + x._2._2)
    println("=========================")
  })
}
Result:
id:1 name:max score:90
=========================
id:2 name:mike score:120
=========================
id:3 name:bob score:80
=========================
15. cogroup(otherDataset, [numTasks])
Description: called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable[V], Iterable[W])) tuples.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun1").setMaster("local")
  val sc = new SparkContext(conf)
  // student info: (class, name)
  val students = Array(("class1", "max"), ("class1", "mike"), ("class2", "bob"))
  // scores: (class, score)
  val scores = Array(("class1", 90), ("class1", 120), ("class2", 80))
  val stuRDD = sc.parallelize(students)
  val scoresRDD = sc.parallelize(scores)
  val resultRDD = stuRDD.cogroup(scoresRDD).sortByKey()
  resultRDD.foreach(x => {
    println("class:" + x._1)
    x._2._1.foreach(println)
    x._2._2.foreach(println) // remove this line to show only the names
    println("===========")
  })
}
Result:
class:class1
max
mike
90
120
===========
class:class2
bob
80
===========
16. cartesian(otherDataset)
Description: called on datasets of types T and U, returns a dataset of (T, U) pairs (the Cartesian product).
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr1 = sc.parallelize(Array(1, 3, 5))
  val arr2 = sc.parallelize(Array(2, 4, 6))
  arr1.cartesian(arr2).collect().foreach(println)
}
Result:
(1,2)
(1,4)
(1,6)
(3,2)
(3,4)
(3,6)
(5,2)
(5,4)
(5,6)
17. pipe(command, [envVars])
Description: pipes each partition of the RDD through a shell command such as a Perl or Bash script. RDD elements are written to the process's stdin, and lines written to its stdout are returned as an RDD of strings.
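The original section gives no code for pipe, so here is a minimal sketch, assuming a Unix-like environment where the grep command is on the PATH:

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val linesRDD = sc.parallelize(Array("hello spark", "hello hadoop", "hi flink"))
  // each element is written to grep's stdin; the matching lines come back as an RDD of strings
  val pipedRDD = linesRDD.pipe("grep hello")
  pipedRDD.collect().foreach(println) // expected: hello spark, hello hadoop
}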
18. coalesce(numPartitions)
Description: decreases the number of partitions in the RDD to numPartitions; useful for running operations more efficiently after filtering down a large dataset.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd1 = sc.parallelize(1 to 20, 10)
  println(rdd1.partitions.length)     // 10

  val rdd2 = rdd1.coalesce(15, true)
  println(rdd2.partitions.length)     // 15

  val rdd3 = rdd1.repartition(15)
  println(rdd3.partitions.length)     // 15

  val rdd4 = rdd1.coalesce(15, false) // without a shuffle the partition count cannot be increased
  println(rdd4.partitions.length)     // 10

  val rdd5 = rdd1.coalesce(2, false)
  println(rdd5.partitions.length)     // 2
  rdd5.foreach(print)                 // partition 0: 1..10, partition 1: 11..20

  val rdd6 = rdd1.coalesce(2, true)
  println(rdd6.partitions.length)     // 2
  rdd6.foreach(print)                 // partition 0: odd numbers 1..19, partition 1: even numbers 2..20
}
19. repartition(numPartitions)
Description: reshuffles the data in the RDD randomly to create either more or fewer partitions and balance the data across them; this always performs a shuffle.
repartition vs. coalesce
Both operators repartition an RDD; repartition is simply coalesce with shuffle = true. (Suppose the RDD has N partitions and needs to be repartitioned into M.)
1) N < M: the N partitions usually hold unevenly distributed data, and a HashPartitioner is used to redistribute it into M partitions, so shuffle must be set to true.
2) N > M and N and M are of a similar order (say N = 1000 and M = 100): several of the N partitions can be merged into one new partition until M partitions remain, so shuffle can be set to false. With shuffle = false, coalesce has no effect when M > N: no shuffle takes place and the parent and child RDDs have a narrow dependency.
3) N > M and the two differ dramatically: if shuffle is set to false, the parent and child RDDs have a narrow dependency and sit in the same stage, which may leave the Spark job with too little parallelism and hurt performance. When M is 1, shuffle can be set to true so that the operations before the coalesce keep better parallelism.
In short: with shuffle = false, passing in a number larger than the current partition count leaves the partition count unchanged; without a shuffle, the number of partitions of an RDD cannot be increased. The sketch after this list shows how to check the resulting dependency with toDebugString.
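A minimal sketch of that check using toDebugString (the exact lineage text differs between Spark versions, so the comments only describe what to look for):

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd1 = sc.parallelize(1 to 20, 10)
  // shuffle = false keeps a narrow dependency: no ShuffledRDD appears in the lineage
  println(rdd1.coalesce(2, false).toDebugString)
  // shuffle = true (what repartition does) inserts a shuffle stage into the lineage
  println(rdd1.coalesce(2, true).toDebugString)
}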
20. repartitionAndSortWithinPartitions(partitioner)
Description: repartitions the RDD according to the given partitioner and, within each resulting partition, sorts the records by their keys. This is more efficient than calling repartition and then sorting within each partition, because it can push the sort down into the shuffle machinery.
The example below only shows how the source data is laid out across 3 partitions (it reuses mapPartitionsWithIndex rather than calling repartitionAndSortWithinPartitions itself); a sketch that actually applies the operator follows after the output.
import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) // 3 is the number of partitions
  arrayRDD.mapPartitionsWithIndex((index, elements) => { // index: partition index, elements: the partition's data
    println("partition index:" + index)
    val result = new ArrayBuffer[Int]()
    elements.foreach(e => result += e)
    result.iterator
  }).foreach(println)
}
Result:
partition index:0
1
2
3
partition index:1
4
5
6
partition index:2
7
8
9
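As promised above, a minimal sketch that actually calls repartitionAndSortWithinPartitions, assuming a HashPartitioner with 2 partitions over a small pair RDD:

import org.apache.spark.HashPartitioner

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val pairs = sc.parallelize(Array((3, "c"), (1, "a"), (2, "b"), (4, "d"), (1, "aa")))
  // repartition by key hash into 2 partitions and sort by key inside each partition
  val resultRDD = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
  resultRDD.mapPartitionsWithIndex((index, elements) =>
    elements.map(e => "partition " + index + " -> " + e)
  ).collect().foreach(println)
  // expected: partition 0 holds keys 2 and 4, partition 1 holds keys 1, 1 and 3, each sorted by key
}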