做京东网站的摘要广西安策企业管理咨询有限公司
news/
2025/10/3 22:39:06/
文章来源:
做京东网站的摘要,广西安策企业管理咨询有限公司,2024近期新闻,做网站的出路目录 Spark简介
1 什么是Spark
2 Spark特点
3 Spark分布式环境安装
3.1 Spark HA的环境安装
3.2 动态增删一个worker节点到集群
4 Spark核心概念
5 Spark案例
5.2 Master URL
5.3 spark日志的管理
5.4 WordCount案例程序的执行过程
6 Spark作业运行架构图#xff…目录 Spark简介
1 什么是Spark
2 Spark特点
3 Spark分布式环境安装
3.1 Spark HA的环境安装
3.2 动态增删一个worker节点到集群
4 Spark核心概念
5 Spark案例
5.2 Master URL
5.3 spark日志的管理
5.4 WordCount案例程序的执行过程
6 Spark作业运行架构图standalone模式
7 RDD操作
7.1 RDD初始化
7.2 RDD操作
7.3 transformation转换算子
7.3 action行动算子
8 高级排序
8.1 普通的排序
8.2 二次排序
8.3 分组TopN
8.4 优化分组TopN
9 持久化操作
9.1 为什要持久化
9.2 如何进行持久化
9.3 持久化策略
9.4 如何选择持久化策略
10 共享变量
10.1 概述
10.2 broadcast广播变量
10.3 accumulator累加器
10.4 自定义累加器 SparkCore基础
1 什么是Spark
Spark是一个通用的可扩展的处理海量数据集的计算引擎。
Spark集成离线计算实时计算SQL查询机器学习图计算为一体的通用的计算框架。
2 Spark特点
1快相比给予MR官方表明基于内存计算spark要快mr100倍基于磁盘计算spark要快mr10倍
快的原因①基于内存计算②计算和数据的分离 ③基于DAGScheduler的计算划分 ④只有一次的Shuffle输出操作
2易用Spark提供超过80多个高阶算子函数来支持对数据集的各种各样的计算使用的时候可以使用java、scala、python、R非常灵活易用。
3通用在一个项目中既可以使用离线计算也可以使用其他比如SQL查询机器学习图计算等等而这时Spark最强大的优势
4到处运行
3 Spark分布式环境安装
1下载解压添加环境变量
2修改配置文件
spark的配置文件在$SPARK_HOME/conf目录下
①拷贝slaves和spark-env.sh文件 cp slaves.template slaves和cp spark-env.sh.template spark-env.sh
②修改slaves配置配置spark的从节点的主机名spark中的从节点叫做worker主节点叫做Master。vim slaves
bigdata02
bigdata03
③修改spark-env.sh文件添加如下内容
export JAVA_HOME/opt/jdk
export SCALA_HOME/home/refuel/opt/mouldle/scala
export SPARK_MASTER_IPbigdata01
export SPARK_MASTER_PORT7077 ##rpc通信端口类似hdfs的9000端口不是50070
export SPARK_WORKER_CORES2
export SPARK_WORKER_INSTANCES1
export SPARK_WORKER_MEMORY1g
export HADOOP_CONF_DIR/home/refuel/opt/mouldle/hadoop/etc/hadoop
3同步spark到其它节点中
3.1 Spark HA的环境安装
有两种方式解决单点故障一种基于文件系统FileSystem(生产中不用)还有一种基于Zookeeper(使用)。 配置基于Zookeeper的一个ha是非常简单的只需要在spark-env.sh中添加一句话即可。
export SPARK_DAEMON_JAVA_OPTS-Dspark.deploy.recoveryModeZOOKEEPER -Dspark.deploy.zookeeper.urlbigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir/spark
spark.deploy.recoveryMode设置成 ZOOKEEPER spark.deploy.zookeeper.urlZooKeeper URL spark.deploy.zookeeper.dir ZooKeeper 保存恢复状态的目录缺省为 /spark。因为ha不确定master在bigdata01上面启动所以将export SPARK_MASTER_IPbigdata01和export SPARK_MASTER_PORT7077注释掉
3.2 动态增删一个worker节点到集群
1上线一个节点不需要在现有集群的配置上做任何修改只需要准备一台worker机器即可可和之前的worker的配置相同。
2下线一个节点kill或者stop-slave.sh都可以
4 Spark核心概念
ClusterManager在Standalone(依托于spark集群本身)模式中即为Master主节点控制整个集群监控Worker。在YARN模式中为资源管理器ResourceManager。
Worker从节点负责控制计算节点启动Executor。在YARN模式中为NodeManager负责计算节点的控制启动的进程叫Container。
Driver运行Application的main()函数并创建SparkContext(是spark中最重要的一个概念是spark编程的入口作用相当于mr中的Job)。
Executor执行器在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。
SparkContext整个应用的上下文控制应用的生命周期是spark编程的入口。
RDD弹性式分布式数据集。Spark的基本计算单元一组RDD可形成执行的有向无环图RDD Graph。
DAGScheduler实现将Spark作业分解成一到多个Stage每个Stage根据RDD的Partition个数决定Task的个数然后生成相应的Task set放到TaskScheduler中。 DAGScheduler就是Spark的大脑中枢神经
TaskScheduler将任务Task分发给Executor执行。
Stage一个Spark作业一般包含一到多个Stage。
Task 一个Stage包含一到多个Task通过多个Task实现并行运行的功能。 task的个数由rdd的partition分区决定
Transformations转换(Transformations) (如map, filter, groupBy, join等)Transformations操作是Lazy的也就是说从一个RDD转换生成另一个RDD的操作不是马上执行Spark在遇到Transformations操作时只会记录需要这样的操作并不会去执行需要等到有Actions操作的时候才会真正启动计算过程进行计算。
Actions操作/行动(Actions)算子 (如count, collect, foreach等)Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
SparkEnv线程级别的上下文存储运行时的重要组件的引用。SparkEnv内创建并包含如下一些重要组件的引用。
MapOutPutTracker负责Shuffle元信息的存储
BroadcastManager负责广播变量的控制与元信息的存储。
BlockManager负责存储管理、创建和查找块。
MetricsSystem监控运行时性能指标信息。
SparkConf负责存储配置信息。作用相当于hadoop中的Configuration。
5 Spark案例
pom文件的依赖配置如下
dependenciesdependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.12/version/dependency!-- scala去除因为spark-core包里有了scala的依赖dependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion2.11.8/version/dependency --!-- sparkcore --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.11/artifactIdversion2.2.2/version/dependency!-- sparksql --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.11/artifactIdversion2.2.2/version/dependency!-- sparkstreaming --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming_2.11/artifactIdversion2.2.2/version/dependency/dependencies注意入口类为SparkContextjava版本的是JavaSparkContextscala的版本就是SparkContextSparkSQL的入口有SQLContext、HiveContextSparkStreaming的入口又是StreamingContext。
java版本
public class JavaSparkWordCountOps {public static void main(String[] args) {//step 1、创建编程入口类SparkConf conf new SparkConf();conf.setMaster(local[*]);conf.setAppName(JavaSparkWordCountOps.class.getSimpleName());JavaSparkContext jsc new JavaSparkContext(conf);//step 2、加载外部数据 形成spark中的计算的编程模型RDDJavaRDDString linesRDD jsc.textFile(E:/hello.txt);// step 3、对加载的数据进行各种业务逻辑操作---转换操作transformationJavaRDDString wordsRDD linesRDD.flatMap(new FlatMapFunctionString, String() {public IteratorString call(String line) throws Exception {return Arrays.asList(line.split(\\s)).iterator();}});//JavaRDDString wordsRDD linesRDD.flatMap(line - Arrays.asList(line.split(\\s)).iterator());System.out.println(-----经过拆分之后的rdd数据----);wordsRDD.foreach(new VoidFunctionString() {public void call(String s) throws Exception {System.out.println(s);}});System.out.println(-----word拼装成键值对----);JavaPairRDDString, Integer pairsRDD wordsRDD.mapToPair(new PairFunctionString, String, Integer() {public Tuple2String, Integer call(String word) throws Exception {return new Tuple2String, Integer(word, 1);}});//JavaPairRDDString, Integer pairsRDD wordsRDD.mapToPair(word - new Tuple2String, Integer(word, 1));pairsRDD.foreach(new VoidFunctionTuple2String, Integer() {public void call(Tuple2String, Integer t) throws Exception {System.out.println(t._1 --- t._2);}});System.out.println(------按照相同的key统计value--------------);JavaPairRDDString, Integer retRDD pairsRDD.reduceByKey(new Function2Integer, Integer, Integer() {public Integer call(Integer v1, Integer v2) throws Exception {int i 1 / 0; //印证出这些转换的transformation算子是懒加载的需要action的触发return v1 v2;}});//JavaPairRDDString, Integer retRDD pairsRDD.reduceByKey((v1, v2) - v1 v2);retRDD.foreach(new VoidFunctionTuple2String, Integer() {public void call(Tuple2String, Integer t) throws Exception {System.out.println(t._1 --- t._2);}});}
}
scala版本
object SparkWordCountOps {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(SparkWordCount)val sc new SparkContext(conf)//load data from fileval linesRDD:RDD[String] sc.textFile(E:/hello.txt)val wordsRDD:RDD[String] linesRDD.flatMap(line line.split(\\s))val pairsRDD:RDD[(String, Int)] wordsRDD.map(word (word, 1))val ret pairsRDD.reduceByKey((v1, v2) v1 v2)ret.foreach(t println(t._1 --- t._2))sc.stop()}
}
5.2 Master URL
master-url通过sparkConf.setMaster来完成。代表的是spark作业的执行方式或者指定的spark程序的cluster-manager的类型。
master含义local程序在本地运行同时为本地程序提供一个线程来处理local[M]程序在本地运行同时为本地程序分配M个工作线程来处理local[*]程序在本地运行同时为本地程序分配机器可用的CPU core的个数工作线程来处理local[M, N]程序在本地运行同时为本地程序分配M个工作线程来处理,如果提交程序失败会进行最多N次的重试spark://ip:port基于standalone的模式运行提交撑到ip对应的master上运行spark://ip1:port1,ip2:port2基于standalone的ha模式运行提交撑到ip对应的master上运行yarn/启动脚本中的deploy-mode配置为cluster基于yarn模式的cluster方式运行,SparkContext的创建在NodeManager上面在yarn集群中yarn/启动脚本中的deploy-mode配置为client基于yarn模式的client方式运行,SparkContext的创建在提交程序的那台机器上面不在yarn集群中
5.3 spark日志的管理
1全局管理项目classpath下面引入log4j.properties配置文件进行管理
# 基本日志输出级别为INFO输出目的地为console
log4j.rootCategoryINFO, consolelog4j.appender.consoleorg.apache.log4j.ConsoleAppender
log4j.appender.console.targetSystem.err
log4j.appender.console.layoutorg.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# 输出配置的是spark提供的日志级别
log4j.logger.org.spark_project.jettyINFO
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycleERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyperINFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreterINFO
log4j.logger.org.apache.parquetERROR
log4j.logger.parquetERROR
2局部管理 就是在当前类中进行日志的管理。
import org.apache.log4j.{Level, Logger}
Logger.getLogger(org.apache.spark).setLevel(Level.WARN)
Logger.getLogger(org.apache.hadoop).setLevel(Level.WARN)
Logger.getLogger(org.spark_project).setLevel(Level.WARN)
5.4 WordCount案例程序的执行过程 当deploy-mode为client模式的时候driver就在我们提交作业的本机而spark的作业对应的executor在spark集群中运行。
在上图中可以发现相邻两个rdd之间有依赖关系依赖分为宽依赖和窄依赖。
窄依赖rdd中的partition中的数据只依赖于父rdd中的一个partition或者常数个partition。常见的窄依赖操作有flatMapmapfiltercoalesce等
宽依赖rdd中的partition中的数据只依赖于父rdd中的所有partition。常见的宽依赖操作有reduceByKeygroupByKeyjoinsortByKeyrepartition等
rdd和rdd之间的依赖关系构成了一个链条这个链条称之为lineage血缘
6 Spark作业运行架构图standalone模式 ①启动spark集群通过spark的start-all.sh脚本启动spark集群启动了对应的Master进程和Worker进程
②Worker启动之后向Master进程发送注册信息
③Worker向Master注册成功之后worker要不断的向master发送心跳包去监听主节点是否存在
④Driver向Spark集群提交作业就是向Master提交作业申请运行资源
⑤Master收到Driver的提交请求向Worker节点指派相应的作业任务就是在对应的Worker节点上启动对应的executor进程
⑥Worker节点接收到Master节点启动executor任务之后就启动对应的executor进程向master汇报成功启动可以接收任务
⑦executor进程启动之后就像Driver进程进行反向注册告诉Driver谁可以执行spark任务
⑧Driver接收到注册之后就知道向谁发送spark作业那么这样在spark集群中就有一组独立的executor进程为该Driver服务
⑨DAGScheduler根据编写的spark作业逻辑将spark作业分成若干个阶段Stage基于Spark的transformation里是否有shuffle Dependency然后为每一个阶段组装一批task组成taskSettask里面包含了序列化之后的我们编写的spark transformation然后将这些DAGScheduler组装好的taskSet交给taskScheduler由taskScheduler将这些任务发给对应的executor
⑩executor进程接收到了Driver发送过来的taskSet之后进行反序列化然后将这些task封装进一个叫tasksunner的线程中然后放到本地线程池中调度我们的作业的执行。
7 RDD操作
7.1 RDD初始化
RDD的初始化原生api提供的2中创建方式
①是读取文件textFile
②加载一个scala集合parallelize。
当然也可以通过transformation算子来创建的RDD。
7.2 RDD操作
RDD操作算子的分类基本上分为两类transformation和action当然更加细致的分可以分为输入算子转换算子缓存算子行动算子。
输入在Spark程序运行中数据从外部数据空间如分布式存储textFile读取HDFS等parallelize方法输入Scala集合或数据输入Spark数据进入Spark运行时数据空间转化为Spark中的数据块通过BlockManager进行管理。
运行在Spark数据输入形成RDD后便可以通过变换算子如filter等对数据进行操作并将RDD转化为新的RDD通过Action算子触发Spark提交作业。 如果数据需要复用可以通过Cache算子将数据缓存到内存。
输出程序运行结束数据会输出Spark运行时空间存储到分布式存储中如saveAsTextFile输出到HDFS或Scala数据或集合中collect输出到Scala集合count返回Scala int型数据。
7.3 transformation转换算子
1map
rdd.map(func):RDD,对rdd集合中的每一个元素都作用一次该func函数之后返回值为生成元素构成的一个新的RDD。
2flatMap
rdd.flatMap(func):RDD rdd集合中的每一个元素都要作用func函数返回0到多个新的元素这些新的元素共同构成一个新的RDD。
map操作是一个一到一的操作flatMap操作是一个1到多的操作
3filter
rdd.filter(func):RDD 对rdd中的每一个元素操作func函数该函数的返回值为Boolean类型保留返回值为true的元素共同构成一个新的RDD过滤掉哪些返回值为false的元素。
4sample
rdd.sample(withReplacement:Boolean, fraction:Double [, seed:Long]):RDD 抽样sample抽样不是一个精确的抽样。一个非常重要的作用就是来看rdd中数据的分布情况根据数据分布的情况进行各种调优与优化防止数据倾斜。
withReplacement:抽样的方式true有放回抽样, false为无返回抽样
fraction: 抽样比例取值范围就是0~1
seed: 抽样的随机数种子有默认值通常也不需要传值
5union
rdd1.union(rdd2),联合rdd1和rdd2中的数据形成一个新的rdd其作用相当于sql中的union all。
6join
join就是sql中的inner join。
注意要想两个RDD进行连接那么这两个rdd的数据格式必须是k-v键值对的其中的k就是关联的条件也就是sql中的on连接条件。
RDD1的类型[K, V], RDD2的类型[K, W]
内连接 val joinedRDD:RDD[(K, (V, W))] rdd1.join(rdd2)
左外连接 val leftJoinedRDD:RDD[(K, (V, Option[W]))] rdd1.leftOuterJoin(rdd2)
右外连接 val rightJoinedRDD:RDD[(K, (Option[V], W))] rdd1.rightOuterJoin(rdd2)
全连接 val fullJoinedRDD:RDD[(K, (Option[V], Option[W]))] rdd1.fullOuterJoin(rdd2)
7groupByKey
rdd.groupByKey(),按照key进行分组如果原始rdd的类型时[(K, V)] 那必然其结果就肯定[(K, Iterable[V])]是一个shuffle dependency宽依赖shuffle操作但是这个groupByKey不建议在工作过程中使用除非非要用因为groupByKey没有本地预聚合性能较差一般我们能用下面的reduceByKey或者combineByKey或者aggregateByKey代替就尽量代替。
8reduceByKey
rdd.reduceByKey(func:(V, V) V):RDD[(K, V)] 在scala集合中学习过一个reduce(func:(W, W) W)操作是一个聚合操作这里的reduceByKey按照就理解为在groupByKey(按照key进行分组[(K, Iterable[V])])的基础上对每一个key对应的Iterable[V]执行reduce操作。
同时reduceByKey操作会有一个本地预聚合的操作所以是一个shuffle dependency宽依赖shuffle操作。
9sortByKey
按照key进行排序
10combineByKey
这是spark最底层的聚合算子之一按照key进行各种各样的聚合操作spark提供的很多高阶算子都是基于该算子实现的。
def combineByKey[C](createCombiner: V C,mergeValue: (C, V) C,mergeCombiners: (C, C) C): RDD[(K, C)] {...
}
createCombiner: V C, 相同的Key在分区中会调用一次该函数用于创建聚合之后的类型为了和后续Key相同的数据进行聚合mergeValue: (C, V) C, 在相同分区中基于上述createCombiner基础之上的局部聚合mergeCombiners: (C, C) C) 将每个分区中相同key聚合的结果在分区间进行全局聚合
11aggregateByKey
aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) U, combOp: (U, U) U): RDD[(K, U)]和combineByKey都是一个相对底层的聚合算子可以完成系统没有提供的其它操作相当于自定义算子。aggregateByKey底层使用combineByKeyWithClassTag来实现所以本质上二者没啥区别区别就在于使用时的选择而已。
aggregateByKey更为简单但是如果聚合前后数据类型不一致建议使用combineByKey同时如果初始化操作较为复杂也建议使用combineByKey。
7.3 action行动算子
这些算子都是在rdd上的分区partition上面执行的不是在driver本地执行。
1foreach
用于遍历RDD,将函数f应用于每一个元素无返回值(action算子)
2count
统计该rdd中元素的个数
3take(n)
返回该rdd中的前N个元素如果该rdd的数据是有序的那么take(n)就是Top N
4first
take(n)中比较特殊的一个take(1)(0)
5collect
将分布在集群中的各个partition中的数据拉回到driver中进行统一的处理但是这个算子有很大的风险存在第一driver内存压力很大第二数据在网络中大规模的传输效率很低所以一般不建议使用如果非要用请先执行filter。
6reduce
reduce是一个action操作reduceByKey是一个transformation。reduce对一个rdd执行聚合操作并返回结果结果是一个值。
7countByKey
统计key出现的次数
8saveAsTextFile
保存到文件本质上是saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]
9saveAsObjectFile和saveAsSequenceFile
saveAsObjectFile本质上是saveAsSequenceFile
10saveAsHadoopFile和saveAsNewAPIHadoopFile
这二者的主要区别就是OutputFormat的区别接口org.apache.hadoop.mapred.OutputFormat
抽象类org.apache.hadoop.mapreduce.OutputFormat 所以saveAshadoopFile使用的是接口OutputFormatsaveAsNewAPIHadoopFile使用的抽象类OutputFormat建议使用后者。
8 高级排序
8.1 普通的排序
1sortByKey
object SortByKeyOps {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(SortByKeyOps).setMaster(local[2])val sc new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val stuRDD:RDD[Student] sc.parallelize(List(Student(1, refuel01, 19, 168),Student(2, refuel02, 25, 175),Student(3, refuel03, 25, 176),Student(4, refuel04, 16, 180),Student(5, refuel05, 18, 168.5)))//按照学生身高进行降序排序val height2Stu stuRDD.map(stu (stu.height, stu))//注意sortByKey是局部排序不是全局排序如果要进行全局排序// 必须将所有的数据都拉取到一台机器上面才可以val sorted height2Stu.sortByKey(ascending false, numPartitions 1)sorted.foreach{case (height, stu) println(stu)}sc.stop()}
}case class Student(id:Int, name:String, age:Int, height:Double)
2sortBy
这个sortBy其实使用sortByKey来实现但是比sortByKey更加灵活因为sortByKey只能应用在k-v数据格式上而这个sortBy可以应在非k-v键值对的数据格式上面。
val sortedBy stuRDD.sortBy(stu stu.height,ascending true,numPartitions 1)(new Ordering[Double](){override def compare(x: Double, y: Double) y.compareTo(x)},ClassTag.Double.asInstanceOf[ClassTag[Double]])
sortedBy.foreach(println)
sortedBy的操作除了正常的升序分区个数以外还需需要传递一个将原始数据类型提取其中用于排序的字段并且提供用于比较的方式以及在运行时的数据类型ClassTag标记型trait。
3takeOrdered
takeOrdered也是对rdd进行排序但是和上述的sortByKey和sortBy相比较takeOrdered是一个action操作返回值为一个集合而前两者为transformation返回值为rdd。如果我们想在driver中获取排序之后的结果那么建议使用takeOrdered因为该操作边排序边返回。其实是take和sortBy的一个结合体。
takeOrdered(n)获取排序之后的n条记录
//先按照身高降序排序身高相对按照年龄升序排 --- 二次排序
stuRDD.takeOrdered(3)(new Ordering[Student](){override def compare(x: Student, y: Student) {var ret y.height.compareTo(x.height)if(ret 0) {ret x.age.compareTo(y.age)}ret}
}).foreach(println)
8.2 二次排序
所谓二次排序指的是排序字段不唯一有多个共同排序
object SortByKeyOps {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(SortByKeyOps).setMaster(local[2])val sc new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val personRDD:RDD[Student] sc.parallelize(List(Student(1, refuel01, 19, 168),Student(2, refuel02, 25, 175),Student(3, refuel03, 25, 176),Student(4, refuel04, 16, 180),Student(5, refuel05, 18, 168.5)))personRDD.map(stu (stu, null)).sortByKey(true, 1).foreach(p println(p._1))sc.stop()}
}case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//对学生的身高和年龄依次排序override def compare(that: Person) {var ret this.height.compareTo(that.height)if(ret 0) {ret this.age.compareTo(that.age)}ret}
}
8.3 分组TopN
在分组的情况之下获取每个组内的TopN数据
object GroupSortTopN {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(GroupSortTopN).setMaster(local[2])val sc new SparkContext(conf)val lines sc.textFile(file:/E:/data/topn.txt)//按照科目进行排序val course2Info:RDD[(String, String)] lines.map(line {val spaceIndex line.indexOf( )val course line.substring(0, spaceIndex)val info line.substring(spaceIndex 1)(course, info)})//按照科目排序指的是科目内排序不是科目间的排序所以需要把每个科目的信息汇总val course2Infos:RDD[(String, Iterable[String])] course2Info.groupByKey()//按照key进行分组//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] course2Infos.map{case (course, infos) {val topN mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) {val xScore x.split(\\s)(1)val yScore y.split(\\s)(1)yScore.compareTo(xScore)}})for(info - infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}
}
8.4 优化分组TopN
上述在编码过程当中使用groupByKey我们说着这个算子的性能很差因为没有本地预聚合所以应该在开发过程当中尽量避免使用能用其它代替就代替。
1使用combineByKey优化1
object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(GroupSortByCombineByKeyTopN).setMaster(local[2])val sc new SparkContext(conf)val lines sc.textFile(file:/E:/data/topn.txt)//按照科目进行排序val course2Info:RDD[(String, String)] lines.map(line {val spaceIndex line.indexOf( )val course line.substring(0, spaceIndex)val info line.substring(spaceIndex 1)(course, info)})//按照科目排序指的是科目内排序不是科目间的排序所以需要把每个科目的信息汇总val course2Infos course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] course2Infos.map{case (course, infos) {val topN mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) {val xScore x.split(\\s)(1)val yScore y.split(\\s)(1)yScore.compareTo(xScore)}})for(info - infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}def createCombiner(info:String): ArrayBuffer[String] {val ab new ArrayBuffer[String]()ab.append(info)ab}def mergeValue(ab:ArrayBuffer[String], info:String): ArrayBuffer[String] {ab.append(info)ab}def mergeCombiners(ab:ArrayBuffer[String], ab1: ArrayBuffer[String]): ArrayBuffer[String] {ab.:(ab1)}
}
此时这种写法和上面的groupByKey性能一模一样没有任何的优化。
2使用combineByKey的优化2
object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(GroupSortByCombineByKeyTopN).setMaster(local[2])val sc new SparkContext(conf)val lines sc.textFile(file:/E:/data/spark/topn.txt)//按照科目进行排序val course2Info:RDD[(String, String)] lines.map(line {val spaceIndex line.indexOf( )val course line.substring(0, spaceIndex)val info line.substring(spaceIndex 1)(course, info)})//按照科目排序指的是科目内排序不是科目间的排序所以需要把每个科目的信息汇总val sorted course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] {val ts new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) {val xScore x.split(\\s)(1)val yScore y.split(\\s)(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] {ab.add(info)if(ab.size 3) {ab.take(3)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] {for (info - ab1) {ab.add(info)}if(ab.size 3) {ab.take(3)} else {ab}}
} 9 持久化操作
9.1 为什要持久化
一个RDD如果被多次操作为了提交后续的执行效率我们建议对该RDD进行持久化操作。
9.2 如何进行持久化
rdd.persist()/cache()就完成了rdd的持久化操作我们可以将该rdd的数据持久化到内存磁盘等等。
如果我们已经不再对该rdd进行一个操作而此时程序并没有终止可以卸载已经持久化的该rdd数据rdd.unPersist()。
9.3 持久化策略
可以通过persist(StoreageLevle的对象)来指定持久化策略,eg:StorageLevel.MEMORY_ONLY。
持久化策略含义MEMORY_ONLY(默认)rdd中的数据以未经序列化的java对象格式存储在内存中。如果内存不足剩余的部分不持久化使用的时候没有持久化的那一部分数据重新加载。这种效率是最高但是是对内存要求最高的。MEMORY_ONLY_SER就比MEMORY_ONLY多了一个SER序列化保存在内存中的数据是经过序列化之后的字节数组同时每一个partition此时就是一个比较大的字节数组。MEMORY_AND_DISK和MEMORY_ONLY相比就多了一个内存存不下的数据存储在磁盘中。MEMEORY_AND_DISK_SER比MEMORY_AND_DISK多了个序列化。DISK_ONLY就是MEMORY_ONLY对应都保存在磁盘效率太差一般不用。xxx_2就是上述多个策略后面加了一个_2,比如MEMORY_ONLY_2MEMORY_AND_DISK_SER_2等等就多了一个replicate而已备份所以性能会下降但是容错或者高可用加强了。所以需要在二者直接做权衡。如果说要求数据具备高可用同时容错的时间花费比从新计算花费时间少此时便可以使用否则一般不用。HEAP_OFF(experimental)使用非Spark的内存也即堆外内存比如TachyonHBase、Redis等等内存来补充spark数据的缓存。
9.4 如何选择持久化策略
1如果要持久化的数据是可以在内存中进行保存那么毫无疑问选择MEMEORY_ONLY因为这种方式的效率是最高的但是在生成中往往要进行缓存的数据量还是蛮大的而且因为数据都是未经序列化的java对象所以很容易引起频繁的gc。
2如果上述满足不了就退而求其次MEMORY_ONLY_SER这种方式增加的额外的性能开销就是序列化和反序列化经过反序列化之后的对象就是纯java对象因此性能还是蛮高的。
3如果还是扛不住再退而求其次MEMOEY_AND_DISK_SER因为到这一步的话那说明对象体积确实很多为了提交执行效率应该尽可能的将数据保存在内存所以就对数据进行序列化其次在序列化到磁盘。
4一般情况下DISK_ONLYDISK_SER不用效率太低有时候真的不容从源头计算一遍。
5一般情况下我们都不用XXX_2,代备份的种种持久化策略除非程序对数据的安全性要求非常高或者说备份的对性能的消耗低于从头再算一遍我们可以使用这种xxx_2以外基本不用。
10 共享变量
10.1 概述
如果transformation使用到Driver中的变量在executor中执行的时候就需要通过网络传输到对应的executor如果该变量很大那么网络传输一定会成为性能的瓶颈。Spark就提供了两种有限类型的共享变量累加器和广播变量
10.2 broadcast广播变量
广播变量为每个task都拷贝一份变量将变量包装成为一个广播变量broadcast只需要在executor中拷贝一份在task运行的时候直接从executor调用即可相当于局部变量变成成员变量性能就得到了提升。
val num:Any xxxval numBC:Broadcast[Any] sc.broadcast(num)调用val n numBC.value注意该num需要进行序列化。
10.3 accumulator累加器
累加器的一个好处是不需要修改程序的业务逻辑来完成数据累加同时也不需要额外的触发一个action job来完成累加反之必须要添加新的业务逻辑必须要触发一个新的action job来完成显然这个accumulator的操作性能更佳
构建一个累加器val accu sc.longAccumuator()累加的操作accu.add(参数)获取累加器的结果val ret accu.value
val conf new SparkConf()
.setAppName(AccumulatorOps)
.setMaster(local[*])val sc new SparkContext(conf)val lines sc.textFile(file:/data.txt)
val words lines.flatMap(_.split(\\s))//统计每个单词出现的次数
val accumulator sc.longAccumulatorval rbk words.map(word {if(word is)accumulator.add(1)(word, 1)
}).reduceByKey(__)
rbk.foreach(println)
println(使用累加器)
println(is accumulator.value)Thread.sleep(1000000)
sc.stop()
注意累加器的调用在action之后被调用也就是说累加器必须在action触发之后多次使用同一个累加器应该尽量做到用完即重置尽量给累加器指定name方便我们在web-ui上面进行查看
10.4 自定义累加器
自定义一个类继承AccumulatorV2重写方法
/*自定义累加器IN 指的是accmulator.add(sth.)中sth的数据类型OUT 指的是accmulator.value返回值的数据类型*/
class MyAccumulator extends AccumulatorV2[String, Map[String, Long]] {private var map mutable.Map[String, Long]()/*** 当前累加器是否有初始化值* 如果为一个long的值0就是初始化值如果为listNil就是初始化值是mapMap()就是初始化值*/override def isZero: Boolean trueoverride def copy(): AccumulatorV2[String, Map[String, Long]] {val accu new MyAccumulatoraccu.map this.mapaccu}override def reset(): Unit map.clear()//分区内的数据累加 is: 5, of:4override def add(word: String): Unit {if(map.contains(word)) {val newCount map(word) 1map.put(word, newCount)} else {map.put(word, 1)}
// map.put(word, map.getOrElse(word, 0) 1)}//多个分区间的数据累加override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit {other.value.foreach{case (word, count) {if(map.contains(word)) {val newCount map(word) countmap.put(word, newCount)} else {map.put(word, count)}
// map.put(word, map.getOrElse(word, 0) count)}}}override def value: Map[String, Long] map.toMap
}
注册使用
object _08AccumulatorOps {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName($AccumulatorOps).setMaster(local[*])val sc new SparkContext(conf)val lines sc.textFile(file:/E:/data.txt)val words lines.flatMap(_.split(\\s))//注册val myAccu new MyAccumulator()sc.register(myAccu, myAccu)//统计每个单词出现的次数val pairs words.map(word {if(word is || word of || word a)myAccu.add(word)(word, 1)})val rbk pairs.reduceByKey(__)rbk.foreach(println)println(累加器)myAccu.value.foreach(println)Thread.sleep(10000000)sc.stop()}
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/926401.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!