跟网站开发公司签合同主要要点wordpress阅读次数自动增长
web/
2025/10/3 0:50:22/
文章来源:
跟网站开发公司签合同主要要点,wordpress阅读次数自动增长,个人资料展示网站,wordpress rss feedSpark算子 文章目录Spark算子一、转换算子coalesce函数repartition函数flatMap——flatMap变换sample——抽样zip——联结mapValues——对Value值进行变换二、行动Action算子数据运算类行动算子reduce——Reduce操作collect——收集元素countByKey——按Key值统计Key/Value型RD…Spark算子 文章目录Spark算子一、转换算子coalesce函数repartition函数flatMap——flatMap变换sample——抽样zip——联结mapValues——对Value值进行变换二、行动Action算子数据运算类行动算子reduce——Reduce操作collect——收集元素countByKey——按Key值统计Key/Value型RDD中的元素个数countByValue——统计RDD中元素值出现的次数foreach——逐个处理RDD元素lookup——查找元素take——获取前n个元素takeSample——提取n个元素takeOrdered——获取排序后的前n个元素存储型行动算子saveAsObjectFile——存储为二进制文件saveAsTextFile——存储为文本文件saveAsNewAPIHadoopFile——存储为Hadoop文件三、缓存算子cache——缓存RDDcheckpoint——建立RDD的检查点persist——持久化RDD一、转换算子
coalesce函数
返回一个经过简化到numPartitions个分区的新RDD。这会导致一个窄依赖例如你将1000个分区转换成100个分区这个过程不会发生shuffle相反如果10个分区转换成100个分区将会发生shuffle。然而如果你想大幅度合并分区例如合并成一个分区这会导致你的计算在少数几个集群节点上计算言外之意并行度不够。为了避免这种情况你可以将第二个shuffle参数传递一个true这样会在重新分区过程中多一步shuffle这意味着上游的分区可以并行运行。
注意第二个参数shuffletrue将会产生多于之前的分区数目例如你有一个个数较少的分区假如是100调用coalesce(1000, shuffle true)将会使用一个 HashPartitioner产生1000个分区分布在集群节点上。这个对于提高并行度是非常有用的。如果shuff为false时如果传入的参数大于现有的分区数目RDD的分区数不变也就是说不经过shuffle是无法将RDD的partition数变多的
repartition函数
返回一个恰好有numPartitions个分区的RDD可以增加或者减少此RDD的并行度。内部这将使用shuffle重新分布数据如果你减少分区数考虑使用coalesce这样可以避免执行shuffle
Repartition函数内部调用了coalesce函数 shuffle 为True
def repartition(numPartitions: Int)(implicit ord: Ordering[T] null): RDD[T] withScope {coalesce(numPartitions, shuffle true)}flatMap——flatMap变换
算子函数格式:
flatMap[U](f:FlatMapFunction[T,U]):JavaRDD[U]
在前面我们已经了解到map变换是对原RDD中的每个元素进行一对一变换生成
新RDD,而flatMap不同的地方在于,它是对原RDD中的每个元素用指定函数f进行一
对多(这也是lat前缀的由来)的变换,然后将转换后的结果汇聚生成新RDD.
示例:
flatMap示例代码
scalavalrddsc,parallelize(0 to 3,1)//生成由0-3序列构成的RDDrdd:org.apache,spark.rdd.RDD[Int]ParallelCollectionRDD[17] at parallelize at:21scalaval flatMappedRDDrdd.flatMap(x0tox)//使用flatMap将每个原始变换为一个序列flatMappedRDD:org.apache.spark.rdd.RDD[Int]MapPartitionsRDD[18] at flatMap at:23scalaflatMappedRDD.collect//显示新的RDDres0:Array[Int]Array(0,0,1,0,1,2,0,1,2,3)sample——抽样
算子函数格式:
sample(withReplacement:Boolean,fraction:Double,seed:Long):JavaRDD[T]
对原始RDD中的元素进行随机抽样,抽样后产生的元素集合构成新的RDD.
参数fraction 指定新集合中元素的数量占原始集合的比例.抽样时的随机数种子由seed指定.
参数withReplacement为false时,抽样方式为不放回抽样.
参数withReplacement为true时,抽样方式为放回抽样.
示例:
sample示例代码
1:scalavalrddsc.parallelize(0to9,1)//生成由0-9的序列构成的RDDrdd:org.apache.spark.rdd.RDD [Int1ParallelCollectionRDD[5]at parallelize at:212:scalardd.sample(false,0.5).collect//不放回抽样一半比例的元素生成新的RDDres4:Array[Int]Array(0,1,2,3,4,7)3:rdd.sample(false,0.5).collect//再次不放回抽样一半比例的元素生成新的RDDres7:Array [Int]Array(0,1,3,6,8)4:scalardd.sample(false,0.8).collect//不放回抽样80%比例的元素生成新的RDDres8:Array[Int]Array(0,1,2,5,6,8,9)5:scalardd.sample(true,0.5).co1lect//放回抽样一半比例的元素生成新的RDDres9:Array[Int]Array(0,2,3,4,4,6,7,9)zip——联结
算子函数格式:
zip[U](other:JavaRDDLike[U,_]):JavapairRDD(T,U]
输入参数为另一个RDD,zip变换生成由原始RDD的值为Key、输入参数RDD的值为Value依次配对构成的所有Key/Value对,并返回这些Key/Value对集合构成的新RDD.
示例:
zip示例代码
1:scalaval rdd1sc.parallelize(0 to 4,1)//构建原始RDDrdd_1:org.apache.spark.rdd.RDD[Int]ParallelcollectionRDD[19]at parallelize at :212:scalaval rdd2sc.parallelize(5 to 9,1)//构建输入参数RDDrdd_2:org.apache.spark.rdd.RDD[Int]ParallelcollectionRDD [20]at parallelize at :213:scalardd_1.zip(rdd_2).collect//对两个RDD进行联结res5:Array[(Int,Int)]Array((0,5),(1,6),(2,7),(3,8),(4,9})mapValues——对Value值进行变换
算子函数格式:
mapValues[u](f:Function[v,U]):JavapairRDD[K,U]
将Key/Value型RDD中的每个元素的Value值,使用输入参数函数f进行变换,生成新的RDD.
示例:
1:scalaval pairssc.parallelize(List(apple,banana,berry,cherry,cumquat,haw),1).keyBy(_.1ength)//构建原始RDDpairs:org.apache.spark.rdd.RDDI(Int,String)]MappedRDD[16]at keyBy at:122:scalapairs.mapvalues(vvV{0)).collect//生成将单词加单词首字母的RDDres0:Array[(Int,string)]Array{(5,apple a),(6,banana b),(5,berry b),(6,cherry c),7,cumquat c),(3,haw h))二、行动Action算子
数据运算类行动算子
reduce——Reduce操作
算子函数格式:
reduce(f:Function2[T,T,T]):T
对RDD中的每个元素依次使用指定的函数f进行运算,并输出最终的计算结果.
需要注意的是,Spark中的reduce操作与Hadoop中的reduce操作并不一样.在Hadoop中,reduce操作是将指定的函数作用在Key值相同的全部元素上.而Spark的reduce操作则是对所有元素依次进行相同的函数计算.
示例:
1:scalaval numssc.parallelize(0 to 9,5)//构建由数字0-9构成的RDDnums:org,apache.spark.rdd.RDD[Int]ParallelCollectionRDD[18]at parallelize at:122:scalanums.reduce(__)//计算RDD中所有数字的和collect——收集元素
算子函数格式:
co1lect():List[T]
collect的作用是以数组格式返回RDD内的所有元素.
示例:
1:scalaval datasc.parallelize(List(1,2,3,4,5,6,7,8,9,0},2)//构建原始RDDdata;org.apache.spark.rdd.RDD[Int]ParallelCo1lectionRDD[8]at parallelize at:122:scaladata.collect//显示原始RDD中的元素res0:Array[Int]Array(l,2,3,4,5,6,7,8,9,0)countByKey——按Key值统计Key/Value型RDD中的元素个数
算子函数格式:
countByKey():Map[K,Long]
计算Key/Value型RDD中每个Key值对应的元素个数,并以Map数据类型返回
统计结果.
示例:
1:scalaval pairRDDsc.parallelize(List((fruit,Apple),(fruit,Banana),{fruit,Cherry ),{vegetable,bean),(vegetable,cucumber),(vegetable,pepper)),2} //构建原始 RDDpairRDD:org.apache.spark.rdd.RDD [(String,String)]Paralle1 Collection RDD[3 Jat parallelize at :122:sca1apairRDD.countByKey //统计原始RDD中每个物品类型下的物品数量res0:scala.collection.Map[String,Long]Map(fruit-3,vegetable-3)countByValue——统计RDD中元素值出现的次数
算子函数格式:
countByValue():Map[T,Long]
计算RDD中每个元素的值出现的次数,并以Map数据类型返回统计结果.
countByValue示例代码
1:scalaval numsc.parallelize(List(1,1,1,2,2,3),2)//构建原始RDDnum:org.apache.spark.rdd.RDD [Int]ParallelcollectionRDD[4]atparallelize at:122:scalanum.countByValue//统计原始RDD中每个数字出现的次数res0:scala.collection.Map[Int,Long]Map(2-2,1-3,3-1)foreach——逐个处理RDD元素
算子函数格式:
foreach(f:VoidFunction[(K,V)]):Unit
对RDD中的每个元素,使用参数f指定的函数进行处理.
示例:
1:scalaval wordssc.parallelize(List(A,B,C,D),2)//构建原始 RDDwords;org.apache.spark.rdd,RDD[String]ParallelCollectionRDD[9] at parallelize at :212:scalawords.foreach(xprint1n(xis a letter.))/打印输出每个单词构造的一句话Cis a letter.Ais a letter.Dis a letter.Bis a letter.lookup——查找元素
算子函数格式:
lookup(key:K):List[V]
在Key/Value型的RDD中,查找与参数key相同Key值的元素,并得到这些元素
的Value值构成的序列.
示例:
1:scalaval pairssc.parallelize(List(apple,banana,berry,cherry,cumcquat,haw),1).keyBy (_.1ength)//构建原始RDDpairs:org.apache.spark.rdd.RDDt(Int,String)]MapPartitionsRDD[13] at keyBy at:212:scalapairs.collectres18:Array [(Int,String)]Array((5,apple),(6,banana),(5,berry),(6,cherry),(7,cumcuat),(3,haw))3:scalapairs.lookup(5)//查找长度为5的单词res19:Seq[string]WrappedArray (apple,berry)take——获取前n个元素
takeSample——提取n个元素
takeOrdered——获取排序后的前n个元素
存储型行动算子
saveAsObjectFile——存储为二进制文件
算子函数格式:
saveAsobjectPile(path:string):Unit
将RDD转换为序列号对象后,以Hadoop SequenceFile文件格式保存,保存路径由
参数path指定.
示例:
1:scalaval datasc.parallelize(0to9,1)//构建0-9组成的RDDdata:org.apache.spark.rdd.RDD[Int]Paralle1CollectionRDD[40]at parallelize at :122:scaladata.saveAsobjectFile(obj)//将RDD以SequenceFile文件格式保存,文件名为objsaveAsTextFile——存储为文本文件
saveAsNewAPIHadoopFile——存储为Hadoop文件
三、缓存算子
为了提高计算效率,Spark采用了两个重要机制:
①基于分布式内存数据集进行运算,也就是我们已经熟知的RDD;
②变换算子的惰性执行(Lazy Evaluation),即RDD的变换操作并不是在运行到该行代码时立即执行,而仅记录下转换操作的操作对象.只有当运行到一个行动算子代码时,变换操作的计算逻辑才真正执行.
这两个机制帮助Spark提高了运算效率,但正如’硬币都有两面’一样,在带来提升性能的好处的同时,这两个机制也留下了隐患.
例如:
①如果在计算过程中,需要反复使用某个RDD,而该RDD需要经过多次变换才能得到,则每次使用该RDD时都需要重复这些变换操作,这种运算效率是很低的;
②在计算过程中数据存放在内存中,如果出现参与计算的某个节点出现问题,则存放在该节点内存中的RDD数据会发生损坏.如果损坏的也是需要经过多次变换才能得到的RDD,此时虽然可以通过再次执行计算恢复该RDD,但仍然要付出很大的代价.因此,Spark提供了一类缓存算子,以帮助用户解决此类问题.
cache——缓存RDD
算子函数格式:
cache():JavaRDD[T]
cache将RDD的数据持久化存储在内存中,其实现方法是使用后面我们会介绍的persist算子.当需要反复使用某RDD时,使用cache缓存后,可以直接从内存中读出,不再需要执行该RDD的变换过程.需要注意的是,这种缓存方式虽然可以提高再次使用某个RDD的效率,但由于cache后的数据仅仅存储在内存中,因此不能解决RDD出错时需要再次恢复运算的问题.而且cache保存的数据在Driver关闭后会被清除,因此不能被在其他Driver中启动的Spark程序使用.
示例:
1:scalaval numsc.parallelize(0to9,1)//构建RDDnum:org,apache.spark.rdd.RDD[Int]ParallelCollectionRDD[7]at parallelize at:212:scalaval resultnum.map(xx*x)//对原始RDD进行map变换result:org.apache.spark.rdd.RDD[Int]MapPartitionsRDD [8]at map at:233:scalaresult.cache//对新RDD进行缓存res19:result.typeMapPartitionsRDD[8 Jat map at :234:scalaresult.count//统计新RDD中的元素个数res30:Long105:scalaresult.collect().mkstring(,)//再次使用新RDD,生成用逗号分隔的序列res31:String0,1,4,9,16,25,36,49,64,81checkpoint——建立RDD的检查点
算子函数格式:
checkpoint():Unit
对于需要很长时间才能计算出或者需要依赖很多其他RDD变化才能得到的RDD,如果在计算过程中出错,要从头恢复需要付出很大的代价.此时,可以利用checkpoint建立中间过程的检查点,Spark会将执行checkpoint操作的RDD持久化,以二进制文件的形式存放在指定的目录下.与cache不同的是,checkpoint保存的数据在Driver关闭后仍然以文件的形式存在,因此可以被其他Driver中的Spark程序使用.
示例:
1:scalaval rddsc.makeRDD(1to9,2)//构建原始RDDrdd:org.apache.spark.rdd.RDD[Int]ParallelCollectionRDD[0]at makeRDD at :212:scalaval flatMapRDDrdd.flatMap(xSeq(x,x))//对原始RDD做flatMap变换flatMapRDD:org.apache.spark.rdd.RDD [Int]MappartitionsRDD[1]at flatMap at:233:scalasc.setCheckpointDir(my_checkpoint)//指定checkpoint存放的目录4:scalaflatMapRDD.checkpoint()//建立 checkpoint5:scalaflatMapRDD.dependencies.head.rdd//显示变换后RDD的依赖res2:org.apache.spark.rdd.RDD(_]ParallelcollectionRDD[0]at makeRDD at:216:scalaflatMapRDD.collect()//显示变换后的RDDres3:Array[Int]Array(1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9)7:scalaflatMapRDD.dependencies.head.rdd//再次显示变换后RDD的依赖res4:org.apache.spark.rdd.RDD [_1CheckpointRDD[2]at collect at:26persist——持久化RDD
算子函数格式:
persist(newLeve1:storageLeve1):JavaRDD[T]
调用persist可对RDD进行持久化操作,利用参数newlevel可以指定不同的持久化方式,常用的持久化方式包括: MEMORY_ONLY仅在内存中持久化,且将RDD作为非序列化的Java对象存储在JVM中.这种方式比较轻量,是默认的持久化方式. MEMORY_ONLY_SER仅在内存中持久化,且将RDD作为序列化的Java对象存储(每个分区一个byte数组).这种方式比MEMORY_ONLY方式要更加节省空间,但会耗费更多的CPU资源进行序列化操作. MEMORY_ONLY_2仅在内存中持久化,且将数据复制到集群的两个节点中. MEMORY_AND_DISK同时在内存和磁盘中持久化,且将RDD作为非序列化的Java对象存储. MEMORY_AND_DISK_SER同时在内存和磁盘中持久化,且将RDD作为序列化的Java对象存储. MEMORY_AND_DISK_2同时在内存和磁盘中持久化,且将数据复制到集群的两个节点中.
persist示例代码
1:scalaval numsc.parallelize(0to9,1)//构建RDDnum:org.apache.spark.rdd.RDD[Int]ParallelCollectionRDD[0]at parallelize at :122:scalanum.getStorageLeve1//显示RDD当前的持久化状态res8:org.apache.spark.storage.StoragelevelStorageLevel{false,false,false,false,1)3:scalanum.persist()//使用persist进行默认的MEMORY_ONLY持久化res9:num.typeParallelCollectionRDD [5] at parallelize at:214:scalanum.getStorageLeve1//显示RDD新的持久化状态res10:org,apache,spark,storage.StorageLevelStorageLevel(false,true,false,true,1)
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/85918.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!