网站UI怎么做常德网站建设公司
网站UI怎么做,常德网站建设公司,注册会计师报名,seo与sem的关系Spark笔记——技术点汇总 目录 概况 手工搭建集群 引言 安装Scala 配置文件 启动与测试 应用部署 部署架构 应用程序部署 核心原理 RDD概念 RDD核心组成 RDD依赖关系 DAG图 RDD故障恢复机制 Standalone模式的Spark架构 YARN模式的Spark架构 应用程序资源构建… Spark笔记——技术点汇总 目录 · 概况 · 手工搭建集群 · 引言 · 安装Scala · 配置文件 · 启动与测试 · 应用部署 · 部署架构 · 应用程序部署 · 核心原理 · RDD概念 · RDD核心组成 · RDD依赖关系 · DAG图 · RDD故障恢复机制 · Standalone模式的Spark架构 · YARN模式的Spark架构 · 应用程序资源构建 · API · WordCount示例 · RDD构建 · RDD缓存与持久化 · RDD分区数 · 共享变量 · RDD Operation · RDD Operation隐式转换 · RDD[T]分区Operation · RDD[T]常用聚合Operation · RDD间操作Operation · DoubleRDDFunctions常用Operation · PairRDDFunctions聚合Operation · PairRDDFunctions间操作Operation · OrderedRDDFunctions常用Operation · 案例移动终端上网数据分析 · 数据准备 · 加载预处理 · 统计App访问次数 · 统计DAU · 统计MAU · 统计App上下流量 概况 1. Spark相对MapReduce的优势 a) 支持迭代计算 b) 中间结果存储在内存而不是硬盘降低延迟。 2. Spark已成为轻量级大数据快速处理统一平台“One stack to rule them all”一个平台完成即席查询ad-hoc queries、批处理batch processing、流式处理stream processing。 3. Spark集群搭建方式 a) 集成部署工具如Cloudera Manager b) 手工搭建。 4. Spark源码编译方式 a) SBT编译 b) Maven编译。 手工搭建集群 引言 1. 环境 Role Host name Master centos1 Slave centos2 centos3 2. Standalone模式需在Master和Slave节点部署YARN模式仅需在命令提交机器部署。 3. 假设已成功安装JDK、Hadoop集群。 安装Scala 1. [MasterStandalone模式或命令提交机器YARN模式]安装Scala到/opt/app目录下。 tar zxvf scala-2.10.6.tgz -C /opt/app 2. [MasterStandalone模式或命令提交机器YARN模式]配置环境变量。 vi /etc/profile export SCALA_HOME/opt/app/scala-2.10.6
export PATH$SCALA_HOME/bin:$PATH source /etc/profile # 生效
env | grep SCALA_HOME # 验证 配置文件 3. [MasterStandalone模式或命令提交机器YARN模式] tar zxvf spark-1.6.3-bin-hadoop2.6.tgz -C /opt/app
cd /opt/app/spark-1.6.3-bin-hadoop2.6/conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh export JAVA_HOME/opt/app/jdk1.8.0_121
export SCALA_HOME/opt/app/scala-2.10.6
export HADOOP_HOME/opt/app/hadoop-2.6.5
export HADOOP_CONF_DIR${HADOOP_HOME}/etc/hadoop
export YARN_CONF_DIR${HADOOP_HOME}/etc/hadoop
# For standalone mode
export SPARK_WORKER_CORES1
export SPARK_DAEMON_MEMORY512m cp spark-defaults.conf.template spark-defaults.conf
hadoop fs -mkdir /spark.eventLog.dir
vi spark-defaults.conf spark.driver.extraClassPath /opt/app/apache-hive-1.2.2-bin/lib/mysql-connector-java-5.1.22-bin.jar
spark.eventLog.enabled true
spark.eventLog.dir hdfs://centos1:9000/spark.eventLog.dir cp slaves.template slaves
vi slaves centos2
centos3 ln -s /opt/app/apache-hive-1.2.2-bin/conf/hive-site.xml . 4. [MasterStandalone模式]从Master复制Spark目录到各Slave。注意仅Standalone集群需要执行本步骤。 scp -r /opt/app/spark-1.6.3-bin-hadoop2.6 hadoopcentos2:/opt/app
scp -r /opt/app/spark-1.6.3-bin-hadoop2.6 hadoopcentos3:/opt/app 启动与测试 5. [MasterStandalone模式或命令提交机器YARN模式]配置Spark环境变量。 export SPARK_HOME/opt/app/spark-1.6.3-bin-hadoop2.6
export PATH$PATH:$SPARK_HOME/binexport PYSPARK_PYTHON/usr/bin/python3 # 可选export PYSPARK_DRIVER_PYTHON/usr/bin/python3 # 可选 6. [MasterStandalone模式]启动Spark测试。 sbin/start-all.sh
jps Master # Master机器进程
Worker # Slave机器进程 7. [MasterStandalone模式或命令提交机器YARN模式]测试。 bin/spark-submit --master spark://centos1:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Standalone Client模式运行
bin/spark-submit --master spark://centos1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Standalone Cluster模式运行
bin/spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Yarn Client模式运行
bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Yarn Custer模式运行bin/spark-submit --master yarn-client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 examples/src/main/python/pi.py # Yarn Client模式运行Python bin/yarn application -list # 查看YARN运行的应用
bin/yarn application -kill ApplicationID # 杀死YARN运行的应用 bin/spark-shell --master spark://centos1:7077 --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 # Standalone Client模式运行
bin/spark-shell --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 # Yarn Client模式运行 8. 监控页面。 http://centos1:8080 Spark监控 http://centos1:8088 YARN监控 应用部署 部署架构 1. ApplicationSpark应用程序包括一个Driver Program和集群中多个WorkNode中的Executor其中每个WorkNode为每个Application仅提供一个Executor。 2. Driver Program运行Application的main函数。通常也用SparkContext表示。负责DAG构建、Stage划分、Task管理及调度、生成SchedulerBackend用于Akka通信主要组件有DAGScheduler、TaskScheduler、SchedulerBackend。 3. Cluster Manager集群管理器可封装如Spark Standalone、YARN等不同集群管理器。Driver Program通过Cluster Manager分配资源并将任务发送到多个Work Node执行。 4. WorkNode集群节点。应用程序在运行时的Task在WorkNode的Executor中执行。 5. ExecutorWorkNode为Application启动的一个进程负责执行Task。 6. Stage一个Applicatoin一般包含一到多个Stage。 7. Task被Driver Program发送到Executor的计算单元通常一个Task处理一个split即一个分区每个split一般是一个Block大小。一个Stage包含一到多个Task通过多个Task实现并行计算。 8. DAGScheduler将Application分解成一到多个Stage每个Stage根据RDD分区数决定Task个数然后生成相应TaskSet放到TaskScheduler中。 9. DeployModeDriver进程部署模式有cluster和client两种。 10. 注意 a) Driver Program必须与Spark集群处于同一网络环境。因为SparkContext要发送任务给不同WorkNode的Executor并接受Executor的执行结果。 b) 生产环境中Driver Program所在机器性能配置尤其CPU较好。 应用程序部署 1. 分类 a) spark-shell交互式用于开发调试。已创建好“val sc: SparkContext”和“val sqlContext: SQLContext”实例。 b) spark-submit应用提交式用于生产部署。 2. spark-shell参数 bin/spark-shell --help Usage: ./bin/spark-shell [options]Options:--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.--deploy-mode DEPLOY_MODE Whether to launch the driver program locally (client) oron one of the worker machines inside the cluster (cluster)(Default: client).--class CLASS_NAME Your applications main class (for Java / Scala apps).--name NAME A name of your application.--jars JARS Comma-separated list of local jars to include on the driverand executor classpaths.--packages Comma-separated list of maven coordinates of jars to includeon the driver and executor classpaths. Will search the localmaven repo, then maven central and any additional remoterepositories given by --repositories. The format for thecoordinates should be groupId:artifactId:version.--exclude-packages Comma-separated list of groupId:artifactId, to exclude whileresolving the dependencies provided in --packages to avoiddependency conflicts.--repositories Comma-separated list of additional remote repositories tosearch for the maven coordinates given with --packages.--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to placeon the PYTHONPATH for Python apps.--files FILES Comma-separated list of files to be placed in the workingdirectory of each executor.--conf PROPVALUE Arbitrary Spark configuration property.--properties-file FILE Path to a file from which to load extra properties. If notspecified, this will look for conf/spark-defaults.conf.--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).--driver-java-options Extra Java options to pass to the driver.--driver-library-path Extra library path entries to pass to the driver.--driver-class-path Extra class path entries to pass to the driver. Note thatjars added with --jars are automatically included in theclasspath.--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).--proxy-user NAME User to impersonate when submitting the application.--help, -h Show this help message and exit--verbose, -v Print additional debug output--version, Print the version of current SparkSpark standalone with cluster deploy mode only:--driver-cores NUM Cores for driver (Default: 1).Spark standalone or Mesos with cluster deploy mode only:--supervise If given, restarts the driver on failure.--kill SUBMISSION_ID If given, kills the driver specified.--status SUBMISSION_ID If given, requests the status of the driver specified.Spark standalone and Mesos only:--total-executor-cores NUM Total cores for all executors.Spark standalone and YARN only:--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,or all available cores on the worker in standalone mode)YARN-only:--driver-cores NUM Number of cores used by the driver, only in cluster mode(Default: 1).--queue QUEUE_NAME The YARN queue to submit to (Default: default).--num-executors NUM Number of executors to launch (Default: 2).--archives ARCHIVES Comma separated list of archives to be extracted into theworking directory of each executor.--principal PRINCIPAL Principal to be used to login to KDC, while running onsecure HDFS.--keytab KEYTAB The full path to the file that contains the keytab for theprincipal specified above. This keytab will be copied tothe node running the Application Master via the SecureDistributed Cache, for renewing the login tickets and thedelegation tokens periodically. 3. spark-submit参数除Usage外其他参数与spark-shell一样 bin/spark-submit --help Usage: spark-submit [options] app jar | python file [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]Options:--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.--deploy-mode DEPLOY_MODE Whether to launch the driver program locally (client) oron one of the worker machines inside the cluster (cluster)(Default: client).--class CLASS_NAME Your applications main class (for Java / Scala apps).--name NAME A name of your application.--jars JARS Comma-separated list of local jars to include on the driverand executor classpaths.--packages Comma-separated list of maven coordinates of jars to includeon the driver and executor classpaths. Will search the localmaven repo, then maven central and any additional remoterepositories given by --repositories. The format for thecoordinates should be groupId:artifactId:version.--exclude-packages Comma-separated list of groupId:artifactId, to exclude whileresolving the dependencies provided in --packages to avoiddependency conflicts.--repositories Comma-separated list of additional remote repositories tosearch for the maven coordinates given with --packages.--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to placeon the PYTHONPATH for Python apps.--files FILES Comma-separated list of files to be placed in the workingdirectory of each executor.--conf PROPVALUE Arbitrary Spark configuration property.--properties-file FILE Path to a file from which to load extra properties. If notspecified, this will look for conf/spark-defaults.conf.--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).--driver-java-options Extra Java options to pass to the driver.--driver-library-path Extra library path entries to pass to the driver.--driver-class-path Extra class path entries to pass to the driver. Note thatjars added with --jars are automatically included in theclasspath.--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).--proxy-user NAME User to impersonate when submitting the application.--help, -h Show this help message and exit--verbose, -v Print additional debug output--version, Print the version of current SparkSpark standalone with cluster deploy mode only:--driver-cores NUM Cores for driver (Default: 1).Spark standalone or Mesos with cluster deploy mode only:--supervise If given, restarts the driver on failure.--kill SUBMISSION_ID If given, kills the driver specified.--status SUBMISSION_ID If given, requests the status of the driver specified.Spark standalone and Mesos only:--total-executor-cores NUM Total cores for all executors.Spark standalone and YARN only:--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,or all available cores on the worker in standalone mode)YARN-only:--driver-cores NUM Number of cores used by the driver, only in cluster mode(Default: 1).--queue QUEUE_NAME The YARN queue to submit to (Default: default).--num-executors NUM Number of executors to launch (Default: 2).--archives ARCHIVES Comma separated list of archives to be extracted into theworking directory of each executor.--principal PRINCIPAL Principal to be used to login to KDC, while running onsecure HDFS.--keytab KEYTAB The full path to the file that contains the keytab for theprincipal specified above. This keytab will be copied tothe node running the Application Master via the SecureDistributed Cache, for renewing the login tickets and thedelegation tokens periodically. 4. 默认参数 a) 默认应用程序参数配置文件conf/spark-defaults.conf b) 默认JVM参数配置文件conf/spark-env.sh c) 常用的jar文件可通过“--jar”参数配置。 5. 参数优先级由高到低 a) SparkConf显示配置参数 b) spark-submit指定参数 c) conf/spark-defaults.conf配置文件参数。 6. MASTER_URL格式 MASTER_URL 说明 local 以单线程在本地运行完全无并行 local[K] 在本地以K个Worker线程运行K设置为CPU核数较理想 local[*] KCPU核数 spark://HOST:PORT 连接Standalone集群的Master即Spark监控页面的URL端口默认为7077不支持省略 yarn-client 以client模式连接到YARN集群通过HADOOP_CONF_DIR环境变量查找集群 yarn-cluster 以cluster模式连接到YARN集群通过HADOOP_CONF_DIR环境变量查找集群 7. 注意 a) spark-shell默认使用4040端口当4040端口被占用时程序打印日志警告WARN并尝试递增端口4041、4042……直到找到可用端口为止。 b) Executor节点上每个Driver Program的jar包和文件会被复制到工作目录下可能占用大量空间。YARN集群会自动清除Standalone集群需配置“spark.worker.cleanup.appDataTtl”开启自动清除。 8. 应用程序模板 import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContextobject Test {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(Test)val sc new SparkContext(conf)// ...}
} 9. 提交示例 bin/spark-submit --master spark://ubuntu1:7077 --class org.apache.spark.examples.SparkPi lib/spark-examples-1.6.3-hadoop2.6.0.jar 核心原理 RDD概念 1. RDDResilient Distributed Dataset弹性分布式数据集。 2. 意义Spark最核心的抽象概念具有容错性基于内存的集群计算方法。 RDD核心组成 1. 5个核心方法。 a) getPartitions分区列表数据块列表 b) compute计算各分区数据的函数。 c) getDependencies对父RDD的依赖列表。 d) partitionerkey-value RDD的分区器。 e) getPreferredLocations每个分区的预定义地址列表如HDFS上的数据块地址。 2. 按用途分类以上5个方法 a) 前3个描述RDD间的血统关系Lineage必须有的方法 b) 后2个用于优化执行。 3. RDD的实例RDD[T]T为泛型即实例。 4. 分区 a) 分区概念将大数据量T实例集合split成多个小数据量的T实例子集合。 b) 分区源码实际上是Iterator[T]。 c) 分区存储例如以Block方式存在HDFS。 5. 依赖 a) 依赖列表一个RDD可有多个父依赖所以是父RDD依赖列表。 b) 与分区关系依赖是通过RDD分区间的依赖体现的通过依赖列表和getPartitions方法可知RDD各分区是如何依赖一组父RDD分区的。 6. compute方法 a) 延时lazy特性当触发Action时才真正执行compute方法 b) 计算粒度是分区而不是T元素。 7. partitioner方法T实例为key-value对类型的RDD。 8. RDD抽象类源码节选自v1.6.3 1 package org.apache.spark.rdd2 3 // …4 5 /**6 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,7 * partitioned collection of elements that can be operated on in parallel. This class contains the8 * basic operations available on all RDDs, such as map, filter, and persist. In addition,9 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
10 * pairs, such as groupByKey and join;
11 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
12 * Doubles; and
13 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
14 * can be saved as SequenceFiles.
15 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
16 * through implicit.
17 *
18 * Internally, each RDD is characterized by five main properties:
19 *
20 * - A list of partitions
21 * - A function for computing each split
22 * - A list of dependencies on other RDDs
23 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
24 * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
25 * an HDFS file)
26 *
27 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
28 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
29 * reading data from a new storage system) by overriding these functions. Please refer to the
30 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
31 * on RDD internals.
32 */
33 abstract class RDD[T: ClassTag](
34 transient private var _sc: SparkContext,
35 transient private var deps: Seq[Dependency[_]]
36 ) extends Serializable with Logging {
37 // ...
38
39 //
40 // Methods that should be implemented by subclasses of RDD
41 //
42
43 /**
44 * :: DeveloperApi ::
45 * Implemented by subclasses to compute a given partition.
46 */
47 DeveloperApi
48 def compute(split: Partition, context: TaskContext): Iterator[T]
49
50 /**
51 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
52 * be called once, so it is safe to implement a time-consuming computation in it.
53 */
54 protected def getPartitions: Array[Partition]
55
56 /**
57 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
58 * be called once, so it is safe to implement a time-consuming computation in it.
59 */
60 protected def getDependencies: Seq[Dependency[_]] deps
61
62 /**
63 * Optionally overridden by subclasses to specify placement preferences.
64 */
65 protected def getPreferredLocations(split: Partition): Seq[String] Nil
66
67 /** Optionally overridden by subclasses to specify how they are partitioned. */
68 transient val partitioner: Option[Partitioner] None
69
70 // ...
71 } RDD依赖关系 1. 窄依赖与宽依赖外框表示RDD内框表示分区 2. 窄依赖父RDD每个分区最多被一个子RDD分区所用。 3. 宽依赖子RDD每个分区都依赖所有分区或多个分区。 4. 特性 a) pipeline操作窄依赖可pipeline操作即允许在单个集群节点上流水线式执行该节点可计算所有父分区。 b) RDD故障恢复窄依赖只需在故障集群节点上重新计算丢失的父分区并且可在不同节点上并行重新计算对于宽依赖失败节点可能导致一个RDD所有父RDD分区丢失都需重新计算。 5. WordCount依赖图 a) ShuffledRDD为宽依赖将DAG划分成两个Stage第1个Stage从HadoopRDD到MapPartitionsRDD生成ShuffleMapTask第2个Stage从ShuffledRDD到MapPartitionsRDD生成ResultTask。 b) 第一个Stage由3个ShuffleMapTask通过pipeline方式并行执行直至3个Task均执行结束至MapPartitionsRDD处。 DAG图 1. DAG在图论中如果一个有向图无法从任一顶点出发经过若干条边回到该点则这个图是一个有向无环图Directed Acyclic Graph。 2. Spark DAGSpark将数据在分布式环境下分区再将作业Job转化为DAG并分阶段进行DAG调度和任务分布式并行处理。 3. StageDAG调度时会根据Shuffle将Job划分Stage。如图RDD A到RDD B间、RDD F到RDD G间都需要Shuffle所以有3个StageRDD A、RDD C到RDD F、RDD B和RDD F到RDD G。 4. 流水线pipeline a) Spark采用贪心算法划分Stage即如果RDD的分区到父RDD分区是窄依赖则实施经典的Fusion融合优化把对应的Operation划分到一个Stage。 b) 如果连续RDD序列都是窄依赖则把多个Operation并到一个Stage直到遇到宽依赖。 c) pipeline好处减少大量的全局屏障barrier并无须物化很多中间结果RDD极大地提升性能。 RDD故障恢复机制 1. 假设一个RDD故障根据依赖关系和分区仅需要再执行一遍父RDD的相应分区。 2. 跨宽依赖的再执行涉及多个父RDD为避免故障RDD的大量父RDD再执行Spark保持Map阶段中间数据输出的持久再执行可获取相应分区的中间数据。 3. Spark提供数据checkpoint和记录日志持久化中间RDD。checkpoint直接将RDD持久化到磁盘或HDFS等存储与cache/persist方法不同checkpoint的RDD不会因作业结束而被消除一直存在并被后续作业直接读取加载。 Standalone模式的Spark架构 1. Standalone模式两种运行方式--deploy-mode参数控制 a) cluster方式Driver运行在Worker节点。 b) client方式Driver运行在客户端。 2. 作业执行流程cluster方式 a) 客户端提交Application给MasterMaster让一个Worker启动Driver即SchedulerBackendWorker创建一个DriverRunner线程DriverRunner启动SchedulerBackend进程。 b) Master会让其余Worker启动Executor即ExecutorBackendWorker创建一个ExecutorRunner线程ExecutorRunner会启动ExecutorBackend进程。 c) ExecutorBackend启动后向Driver的SchedulerBackend注册。 d) SchedulerBackend进程中包含DAGScheduler它根据用户程序生成执行计划并调度执行。对于每个Stage的Task都被存放到TaskScheduler中ExecutorBackend向SchedulerBackend汇报时把TaskScheduler中的Task调度到ExecutorBackend执行。 e) 所有Stage都完成后Application结束。 3. 故障恢复 a) 如果Worker发生故障Worker退出前将该Worker上的Executor杀掉Worker通过定时心跳让Master感知Worker故障而后汇报给Driver并将该Worker移除Driver可知该Worker上的Executor已被杀死。 b) 如果Executor发生故障ExecutorRunner汇报给Master由于Executor所在Worker正常Master则发送LaunchExecutor指令给该Worker让其再次启动一个Executor。 c) 如果Master发生故障通过ZooKeeper搭建的Master HA一个Active其他Standby切换Master。 YARN模式的Spark架构 1. YARN模式两种运行方式--deploy-mode参数控制 a) cluster方式Driver运行在NodeManager节点。 b) client方式Driver运行在客户端。 i. SparkAppMaster相当于Standalone模式的SchedulerBackend。SparkAppMaster包括DAGScheduler和YARNClusterScheduler。 ii. Executor相当于Standalone模式的ExecutorBackend。 2. 作业执行流程cluster方式 a) 客户端提交Application给ResourceManagerResourceManager在某一NodeManager汇报时把SparkAppMaster分配给NodeManagerNodeManager启动SparkAppMaster。 b) SparkAppMaster启动后初始化Application然后向ResourceManager申请资源申请后通过RPC让相应SparkAppMaster相当于Standalone模式的SchedulerBackend。SparkAppMaster包括DAGScheduler和YARNClusterScheduler。 c) Executor相当于Standalone模式的ExecutorBackend。的NodeManager启动SparkExecutor。 d) SparkExecutor向SparkAppMaster汇报并完成Task。 e) 此外SparkClient通过SparkAppMaster获取Application运行状态。 应用程序资源构建 1. 两种资源构建方式 a) 粗粒度应用程序提交后运行前根据应用程序资源需求一次性凑齐资源整个运行时不再申请资源。 b) 细粒度应用程序提交后动态向Cluster Manager申请资源只要等到资源满足一个Task的运行便开始运行该Task而不必等到所有资源全部到位。 2. Spark on YARN仅支持粗粒度构建方式。 API WordCount示例 1. 准备数据。 hadoop fs -mkdir -p /test/wordcount
hadoop fs -put README.md /test/wordcount 2. 执行程序。 spark-shell --master spark://centos1:7077 1 import org.apache.log4j.{Logger,Level}
2 Logger.getLogger(org.apache.spark).setLevel(Level.WARN)
3 Logger.getLogger(org.apache.spark.sql).setLevel(Level.WARN)
4 val textFile sc.textFile(/test/wordcount/README.md)
5 val wordCounts textFile.flatMap(line line.split( )).map(word (word, 1)).reduceByKey((count1, count2) count1 count2)
6 wordCounts.saveAsTextFile(/test/wordcount/result)
7 wordCounts.collect a) “flatMap(line line.split( ))”将文本每一行按空格拆分成单词RDD。 b) “map(word (word, 1))”将每个单词转换为单词单词数的二元组RDD。 c) “reduceByKey((count1, count2) count1 count2)”按key分组即按单词分组后每组内单词数求和。 d) “collect”Action操作将RDD全部元素转换为Scala Array返回给Driver Program。如果数据量过大会导致Driver Program内存不足。 3. 查看结果。 hadoop fs -cat /test/wordcount/WordCounts RDD构建 1. 加载外部存储系统的文件构建RDD a) 方法定义 def textFile(path: String, minPartitions: Int defaultMinPartitions): RDD[String] b) “sc.textFile(/test/directory)”加载指定目录下所有文件。 c) “sc.textFile(/test/directory/*.txt)”加载指定目录下所有txt格式的文件。 d) “sc.textFile(/test/directory/*.gz)”加载指定目录下所有gz格式的文件Hadoop内置支持.gz格式但不支持split。其他压缩格式参考文档。 e) “sc.textFile(/test/directory/**/*)”加载指定目录下所有文件包含子目录。 f) “sc.sequenceFile(/test/directory)”以序列文件方式加载指定目录下所有文件。 g) textFile方法和sequenceFile方法底层均调用hadoopFile方法只是参数不同均使用HadoopInputFormat的子类TextInputFormat和SequenceFileInputFormat。 2. 从Scala数据集构建RDD a) 方法定义 def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int defaultParallelism): RDD[T] b) 示例 1 val testRdd sc.parallelize(List(A, B, C, D), 2)
2 testRdd.partitions.size // 分区数2
3 testRdd.toDebugString // 查看Lineage RDD缓存与持久化 1. 主动持久化的目的RDD重用将计算结果RDD存储以供后续Operation使用。 2. persist方法将任意RDD缓存到内存、磁盘和Tachyon文件系统。 def persist(newLevel: StorageLevel): this.type 3. cache方法persist方法使用MEMORY_ONLY存储级别的快捷方式。 def cache() 4. 存储级别。 存储级别Storage Level 含义 MEMORY_ONLY 将RDD以反序列化deserializedJava对象存储到JVM。如果RDD不能被内存装下一些分区将不会被缓存并且在需要的时候被重新计算。这是默认级别。 MEMORY_AND_DISK 将RDD以反序列化Java对象存储到JVM。如果RDD不能被内存装下超出的分区被保存到硬盘上并且在需要是被读取。 MEMORY_ONLY_SER 将RDD以序列化serializedJava对象存储每一分区占用一个字节数组。通常比对象反序列化的空间利用率更高尤其当使用快速序列化器fast serializer但在读取时比较消耗CPU。 MEMORY_AND_DISK_SER 类似于MEMORY_ONLY_SER但把超出内存的分区存储在硬盘上而不是在每次需要时重新计算。 DISK_ONLY 只将RDD分区存储在硬盘上。 MEMORY_ONLY_2 MEMORY_AND_DISK_2 与上述存储级别一样但将每个分区都复制到两个集群节点上。 OFF_HEAP (experimental) 以序列化的格式将RDD存储到Tachyon…… 5. 数据移除 a) 自动集群内存不足时Spark根据LRULeast Recently Uesed最近最少使用算法删除数据分区。 b) 手动unpersit方法立即生效。 6. 演示效果。 a) 可在Spark监控页面Storage查看缓存生效情况 b) 内存不足时打印警告日志“Not enough space to cache rdd_ ... in memory ...”。 1 val file sc.textFile(/test/wordcount/README.md) // 可分别尝试小文件和超大文件视内存
2 file.cache // 缓存到内存lazy操作
3 file.count // Action操作触发lazy
4 file.unpersit // 释放缓存eager操作 RDD分区数 1. 加载文件创建RDD的分区数 a) “sc.defaultParallelism”默认并行数是加载文件创建RDD的分区数最小值参考实际的分区数由加载文件时的split数决定即文件的HDFS block数也可以由加载时的API参数制定分区数。 b) “sc.defaultParallelism”取配置项“spark.default.parallelism”的值集群模式缺省为8、本地模式缺省为总内核数。 c) 示例。 1 val textFile sc.textFile(/test/README.md)
2 textFile.toDebugString // 查看Lineage
3 textFile.partitions.size // 分区数4
4 sc.defaultParallelism // 默认并行数4 2. key-value RDD的分区数 a) partitioner方法是针对key-value RDD的分区器默认使用HashPartitioner。 b) 通过源码可知没有设置分区数时会使用“spark.default.parallelism”配置项的值作为默认分区数。 1 // …2 package org.apache.spark3 // …4 object Partitioner {5 /**6 * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.7 *8 * If any of the RDDs already has a partitioner, choose that one.9 *
10 * Otherwise, we use a default HashPartitioner. For the number of partitions, if
11 * spark.default.parallelism is set, then well use the value from SparkContext
12 * defaultParallelism, otherwise well use the max number of upstream partitions.
13 *
14 * Unless spark.default.parallelism is set, the number of partitions will be the
15 * same as the number of partitions in the largest upstream RDD, as this should
16 * be least likely to cause out-of-memory errors.
17 *
18 * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
19 */
20 def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner {
21 val bySize (Seq(rdd) others).sortBy(_.partitions.size).reverse
22 for (r - bySize if r.partitioner.isDefined r.partitioner.get.numPartitions 0) {
23 return r.partitioner.get
24 }
25 if (rdd.context.conf.contains(spark.default.parallelism)) {
26 new HashPartitioner(rdd.context.defaultParallelism)
27 } else {
28 new HashPartitioner(bySize.head.partitions.size)
29 }
30 }
31 }
32 // … 共享变量 1. 普通变量的问题远程机器上对变量的修改无法传回Driver程序。当Spark以多个Task在不同Worker上并执行一个函数时它传递每一个变量的副本并缓存在Worker上。 2. 分类广播变量、累加器。 3. 广播变量将只读变量缓存在每台Worker节点的cache而不是每个Task发送一份副本。 1 val broadcastVar sc.broadcast(Array(1, 2, 3)) // 初始化
2 broadcastVar.value // 获取 4. 累加器能够进行“加”操作的变量Spark原生支持Int和Double类型可自定义新类型支持。 1 val accumulator sc.accumulator(0) // 初始化
2 accumulator 100 // 累加
3 accumulator.value // 获取 RDD Operation 1. RDD Operation分为Transformation和Action a) Transformation从已存在的RDD上创建一个新的RDD是RDD的逻辑操作并无真正计算lazy而是形成DAG。 b) Action提交一个与前一个Action之间的所有Transformation组成的Job进行计算。 2. 常用Operation罗列后面会详细展开各Operation的用法 a) 通用RDD Transformation 名称 说明 map(func) 数据集中的每条元素经过func函数转换后形成一个新的分布式数据集 filter(func) 过滤作用选取数据集中让函数func返回值为true的元素形成一个新的数据集 flatMap(func) 类似map但每个输入项可以被映射到0个或更多的输出项所以func应该返回一个Seq而不是一个单独项 mapPartitions(func) 类似map但单独运行在RDD每个分区块因此运行类型为Type TRDD上时func类型必须是IteratorT IteratorU mapPartitionsWithIndex(func) 与mapPartitions相似但也要提供func与一个代表分区的的索引整数项因此所运行的RDD为Type T时func类型必须是(Int, IteratorT) IteratorU sample(withReplacement, fraction) 根据给定的随机种子seed随机抽样出数量为fraction的数据可以选择有无替代replacement union(otherDataset) 返回一个由原数据集和参数联合而成的新的数据集 intersection(otherDataset) 返回一个包含数据集交集元素的新的RDD和参数 distinct([numTasks]) 返回一个数据集去重过后的新的数据集 cartesian(otherDataset) 当在数据类型为T和U的数据集上调用时返回由(T, U)对组成的一个数据集 pipe(command, [envVars]) 通过一个shell命令如Perl或bash脚本流水化各个分区的RDD。RDD元素被写入到进程的stdin输出到stdout的行将会以一个RDD字符串的形式返回 coalesce(numPartitions) 将RDD分区的数目合并为numPartitions repartition(numPartitions) 在RDD上随机重洗数据从而创造出更多或更少的分区以及它们之间的平衡。这个操作将重洗网络上所有的数据 b) key-value RDD Transformation 名称 说明 groupByKey([numTasks]) 当在一个由键值对(K, V)组成的数据集上调用时按照key进行分组返回一个(K, IterableV)对的数据集。 注意 1) 如果是为了按照key聚合数据如求和、平均值而进行分组使用reduceByKey或combineByKey方法性能更好 2) 默认情况下输出的并行程度取决于父RDD的分区数。可以通过传递一个可选的的numTasks参数设置不同的并行任务数 reduceByKey(func, [numTasks]) 当在一个键值对(K, V)组成的数据集上调用时按照key进行分组使用给定func聚合values值返回一个键值对(K, V)数据集其中func函数的类型必须是(V, V) V。类似于groupByKey并行任务数可通过可选的第二个参数配置 sortByKey([ascending], [numTasks]) 返回一个以key排序升序或降序的(K, V)键值对组成的数据集其中布尔型参数ascending决定升序还是降序而numTasks为并行任务数 join(otherDataset, [numTasks]) 根据key连接两个数据集将类型为(K, V)和(K, W)的数据集合并成一个(K, (V, W))类型的数据集。外连接通过leftouterjoin和rightouterjoin其中numTasks为并行任务数 cogroup(otherDataset, [numTasks]) 当在两个形如(K, V)和(K, W)的键值对数据集上调用时返回一个(K, IterableV, IterableW)形式的数据集 c) 通用RDD Action 名称 说明 reduce(func) 通过函数func聚集数据集中的所有元素func函数接收两个参数返回一个值这个函数必须满足交换律和结合律以确保可以被正确地并发执行 collect() 在Driver程序中以数组形式返回数据集的所有元素到Driver程序为防止Driver程序内存溢出一般要控制返回的数据集大小 count() 返回数据集的元素个数 first() 返回数据集的第一个元素 take(n) 以数组形式返回数据集上前n个元素 takeSample(withReplacement, num, seed) 返回一个数组由数据集中随机采样的num个元素组成可以选择是否用随机数替换不足的部分seed用于指定的随机数生成器种子 takeOrdered(n, [ordering]) 使用自然顺序或自定义比较器返回前n个RDD元素 foreach(func) 在数据集的每个元素上运行func。具有副作用如会更新累加器变量或与外部存储系统相互作用 d) key-value RDD Action 名称 说明 countByKey 返回形如(K, int)的hashmap对每个key的个数计数 RDD Operation隐式转换 1. 隐式转换函数为装载不同类型的RDD提供了相应的额外方法。 2. 隐式转换后的类包括以下几种 a) PairRDDFunctions输入的数据单元是2元元组分别为key和value。 b) DoubleRDDFunctions输入的数据单元可隐式转换为Scala的Double类型。 c) OrderedRDDFunctions输入的数据单元是2元元组并且key可排序。 d) SequenceFileRDDFunctions输入的数据单元是2元元组。 RDD[T]分区Operation 1. coalesce a) 定义 def coalesce(numPartitions: Int, shuffle: Boolean false)(implicit ord: Ordering[T] null): RDD[T] 参数 numPartitions 新分区数 shuffle 重新分区时是否shuffle b) 功能 返回分区数为numPartitions的新RDD。如果增加分区数shuffle必须为true否则重新分区无效。 c) 示例 1 val rdd sc.parallelize(List(A, B, C, D, E), 2)
2 rdd.partitions.size // 分区数2
3 rdd.coalesce(1).partitions.size // 分区数2→1
4 rdd.coalesce(4).partitions.size // 分区数2→4无效仍然是2
5 rdd.coalesce(4, true).partitions.size // 分区数2→4 d) 应用场景 i. 大数据RDD过滤后各分区数据量非常小可重新分区减小分区数把小数据量分区合并成一个分区。 ii. 小数据量RDD保存到HDFS前可重新分区减小分区数比如1保存成1个文件从而减少小文件个数也方便查看。 iii. 分区数过少导致CPU使用率过低时可重新分区增加分区数从而提高CPU使用率提升性能。 2. repartition a) 定义 def repartition(numPartitions: Int)(implicit ord: Ordering[T] null): RDD[T] 参数 numPartitions 新分区数 b) 功能 从源码可看出repartition是coalesce shuffle为true的版本故不在赘述。 1 /**2 * Return a new RDD that has exactly numPartitions partitions.3 *4 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses5 * a shuffle to redistribute data.6 *7 * If you are decreasing the number of partitions in this RDD, consider using coalesce,8 * which can avoid performing a shuffle.9 */
10 def repartition(numPartitions: Int)(implicit ord: Ordering[T] null): RDD[T] withScope {
11 coalesce(numPartitions, shuffle true)
12 } c) 示例 1 val rdd sc.parallelize(List(A, B, C, D, E), 2)
2 rdd.partitions.size // 分区数2
3 rdd.repartition(1).partitions.size // 分区数2→1
4 rdd.repartition(4).partitions.size // 分区数2→4 d) 应用场景 与coalesce一致。 RDD[T]常用聚合Operation 1. aggregate a) 定义 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) U, combOp: (U, U) U): U 参数 zeroValue 执行seqOp和combOp的初始值。源码注释the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation) seqOp 聚合分区的函数。源码注释an operator used to accumulate results within a partition combOp 聚合seqOp结果的函数。源码注释an associative operator used to combine results from different partitions b) 功能 先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型再使用combOp将之前每个分区聚合后的U类型聚合成U类型注意seqOp和combOp都会使用zeroValue作为初始值zeroValue的类型为U。 c) 示例 1 var rdd sc.makeRDD(1 to 10, 2)
2 rdd.aggregate(2)({(x: Int, y: Int) x y}, {(a: Int, b: Int) a * b})
3 // 分区12 1 2 3 4 5 17
4 // 分区22 6 7 8 9 10 42
5 // 最后2 * 17 * 42 1428 2. reduce a) 定义 def reduce(f: (T, T) T): T 参数 f 合并函数二变一 b) 功能 将RDD中元素两两传递给输入f函数同时产生一个新的值新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。 c) 示例 1 var rdd sc.makeRDD(1 to 10, 2)
2 rdd.reduce((x, y) if (x y) x else y) // 求最大值10
3 import java.lang.Math
4 rdd.reduce((x, y) Math.max(x, y)) // 求最大值与上面逻辑一致
5 rdd.reduce((x, y) x y) // 求和55 3. fold a) 定义 def fold(zeroValue: T)(op: (T, T) T): T 参数 zeroValue op函数的初始值 op 合并函数二变一 b) 功能 以zeroValue为初始值将RDD中元素两两传递给输入f函数同时产生一个新的值新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。 c) 示例 1 var rdd sc.makeRDD(1 to 10, 2)
2 rdd.fold(100)((x, y) if (x y) x else y) // 求最大值100
3 import java.lang.Math
4 rdd.reduce((x, y) Math.max(x, y)) // 求最大值与上面逻辑一致
5 rdd.reduce((x, y) x y) // 求和155 4. 三者关系 fold比reduce多一个初始值fold是aggregate seqOp和combOp函数相同时的简化版。 RDD间操作Operation 1. cartesian a) 定义 def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] 参数 other 另一个RDD b) 功能 笛卡尔积。 c) 示例 1 val rdd1 sc.parallelize(List[String](A, B, C, D, E), 2)
2 val rdd2 sc.parallelize(List[Int](1, 2, 3, 4, 5, 6), 2)
3 val rdd3 rdd1 cartesian rdd2
4 rdd3.collect
5 val rdd4 rdd2 cartesian rdd1
6 rdd4.collect 2. unioin a) 定义 def union(other: RDD[T]): RDD[T] 参数 other 另一个RDD b) 功能 联合两个RDD注意不会去重。 union实际是将父依赖RDD所有分区合并成各自分区最终的分区与父依赖RDD分区一一对应。 c) 示例 1 val rdd1 sc.parallelize(List[String](A, B, C), 2)
2 val rdd2 sc.parallelize(List[String](D, E), 1)
3 rdd1.partitions.size // 分区数2
4 rdd2.partitions.size // 分区数1
5 val rdd3 rdd1 union rdd2
6 rdd3.partitions.size // 分区数3
7 rdd3.collect // Array(A, B, C, C, D) 3. zip a) 定义 def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] 参数 other 另一个RDD b) 功能 拉链操作将两个RDD中第i个元素组成一个元祖形成key-value的二元组PairRDD。 注意两个RDD分区数必须一致否则报错“Can’t zip RDDs with unequal numbers of partitions”两个RDD元素个数必须一致否则报错“Can only zip RDDs with same number of elements”。 c) 示例 1 val rdd1 sc.parallelize(1 to 4, 2)
2 val rdd2 sc.parallelize(a b c d.split( ), 2)
3 val rdd3 rdd1 zip rdd2
4 rdd3.collect // Array((1, a), (2, b), (3, c), (4, d)) 4. zipPartitions a) 定义 def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) Iterator[V]): RDD[V] 参数 f 拉链操作函数 b) 功能 NN2, 3, 4个RDD的拉链操作具体操作由f函数指定。 c) 示例 val rdd1 sc.parallelize(1 to 4, 2)
val rdd2 sc.parallelize(a b c d.split( ), 2)
def zipFunc(aIter: Iterator[Int], bIter: Iterator[String]): Iterator[String] {var list List[String]()while (aIter.hasNext bIter.hasNext) {val str aIter.next bIter.nextlist :: str}list.iterator
}
val rdd3 rdd1.zipPartitions(rdd1, rdd2)(zipFunc)
rdd3.collect // Array(1a, 2b, 3c, 4d) 5. zipWithUniqueId a) 定义 def zipWithUniqueId(): RDD[(T, Long)] b) 功能 将当前RDD元素与索引i进行拉链操作。 c) 示例 1 val rdd sc.parallelize(a b c d.split( ), 2)
2 rdd.zipWithUniqueId.collect // Array((a, 0), (b, 1), (c, 2), (d, 3)) DoubleRDDFunctions常用Operation 1. histogram a) 定义 def histogram(buckets: Array[Double], evenBuckets: Boolean false): Array[Long]
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] 参数 buckets 分桶区间左闭右开区间“[)” evenBuckets 是否采用常亮时间内快速分桶方法 bucketCount 平均分桶每桶区间 b) 功能 生成柱状图的分桶。 c) 示例 1 val rdd sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
2 rdd.histogram(Array(0.0, 4.1, 9.0))
3 rdd.histogram(Array(0.0, 4.1, 9.0), true)
4 rdd.histogram(3) 2. mean/meanApprox a) 定义 def mean(): Double
def meanApprox(timeout: Long, confidence: Double 0.95): PartialResult[BoundedDouble] b) 功能 Mean计算平均值meanApprox计算近似平均值。 c) 示例 1 val rdd sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
2 rdd.mean 3. sampleStdev a) 定义 def sampleStdev(): Double b) 功能 计算样本标准偏差sample standard deviation。 c) 示例 1 val rdd sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
2 rdd.sampleStdev 4. sampleVariance a) 定义 def sampleVariance(): Double b) 功能 计算样本偏差sample variance。 c) 示例 val rdd sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
rdd.sampleVariance 5. stats a) 定义 def stats(): StatCounter b) 功能 返回org.apache.spark.util.StatCounter对象包括平均值、标准偏差、最大值、最小值等。 c) 示例 val rdd sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
rdd.stats 6. stdev a) 定义 def stdev(): Double b) 功能 计算标准偏差standard deviation。 c) 示例 1 val rdd sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
2 rdd.stdev 7. sum/sumApprox a) 定义 def sum(): Double
def sumApprox(timeout: Long, confidence: Double 0.95): PartialResult[BoundedDouble] b) 功能 sum计算总和sumApprox计算近似总和。 c) 示例 1 val rdd sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
2 rdd.sum
3 rdd.sumApprox 8. variance a) 定义 def variance(): Double b) 功能 计算方差variance。 c) 示例 1 val rdd sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
2 rdd.variance PairRDDFunctions聚合Operation 1. aggregateByKey a) 定义 def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) U, combOp: (U, U) U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) U, combOp: (U, U) U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) U, combOp: (U, U) U): RDD[(K, U)] 参数 zeroValue 参考aggregate seqOp 参考aggregate combOp 参考aggregate numPartitions 分区数使用new HashPartitioner(numPartitions)分区器 partitioner 指定自定义分区器 b) 功能 aggregateByKey与aggregate功能类似区别在于前者仅对key相同的聚合。 c) 示例 1 val rdd sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3))) // Array((1,9), (2,3))
2 import java.lang.Math
3 rdd.aggregateByKey(1)({(x: Int, y: Int) Math.max(x, y)}, {(a: Int, b: Int) a b}).collect 2. combineByKey a) 定义 def combineByKey[C](createCombiner: V C, mergeValue: (C, V) C, mergeCombiners: (C, C) C): RDD[(K, C)]
def combineByKey[C](createCombiner: V C, mergeValue: (C, V) C, mergeCombiners: (C, C) C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V C, mergeValue: (C, V) C, mergeCombiners: (C, C) C, partitioner: Partitioner, mapSideCombine: Boolean true, serializer: Serializer null): RDD[(K, C)] 参数 createCombiner 组合器函数用于将V类型转换成C类型输入参数为RDD[K,V]中的V,输出为C mergeValue 合并值函数将一个C类型和一个V类型值合并成一个C类型输入参数为(C,V)输出为C mergeCombiners 合并组合器函数用于将两个C类型值合并成一个C类型输入参数为(C,C)输出为C mapSideCombine 是否需要在Map端进行combine操作类似于MapReduce中的combine默认为true b) 功能 将RDD[(K,V)]combine为RDD[(K,C)]。非常重要aggregateByKey、foldByKey、reduceByKey等函数都基于它实现。 c) 示例 1 val rdd sc.parallelize(List((1, www), (1, iteblog), (1, com), (2, bbs), (2, iteblog), (2, com), (3, good)))
2 rdd.combineByKey(List(_), (x: List[String], y: String) y :: x, (x: List[String], y: List[String]) x ::: y).collect // Array((1, List(www, iteblog, com)), (2, List(bbs, iteblog, com)), (3, List(good))) 1 val rdd sc.parallelize(List((iteblog, 1), (bbs, 1), (iteblog, 3)))
2 rdd.combineByKey(x x, (x: Int, y: Int) x y, (x: Int, y: Int) x y).collect // Array((iteblog, 4), (bbs, 1)) d) 应用场景 combineByKey将大数据的处理转为对小数据量的分区级别处理然后合并各分区处理后再次进 行聚合提升了对大数据量的处理性能。 3. foldByKey a) 定义 def foldByKey(zeroValue: V)(func: (V, V) V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) V): RDD[(K, V)] 参数 zeroValue func函数的初始值 func 合并函数二变一 b) 功能 foldByKey与fold功能类似区别在于前者仅对key相同的聚合。 c) 示例 1 var rdd sc.makeRDD(Array((A,0),(A,2),(B,1),(B,2),(C,1)))
2 rdd.foldByKey(0)(__).collect // Array((A,2), (B,3), (C,1))
3 rdd.foldByKey(2)(__).collect // Array((A,6), (B,7), (C,3)) 4. reduceByKey a) 定义 def reduceByKey(func: (V, V) V): RDD[(K, V)]
def reduceByKey(func: (V, V) V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) V): RDD[(K, V)] 参数 func 合并函数二变一 b) 功能 reduceByKey与reduce功能类似区别在于前者仅对key相同的聚合。 c) 示例 1 var rdd sc.makeRDD(Array((A,0),(A,2),(B,1),(B,2),(C,1)))
2 rdd.reduceByKey(__).collect // Array((A,2), (B,3), (C,1)) PairRDDFunctions间操作Operation 1. join族 a) 定义 def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))] 参数 other 另一个RDD b) 功能 两个RDD连接操作。 c) 示例 1 val rdd1 sc.parallelize(List((Tom, 21), (Jerry, 31), (Mary, 23)))
2 val rdd2 sc.parallelize(List((Tom, m), (Mary, f), (Henry, m)))
3 val rdd3 rdd1 join rdd2
4 rdd3.collect // Array((Mary, (23, f)), (Tom, (21, m))) OrderedRDDFunctions常用Operation 1. sortByKey a) 定义 def sortByKey(ascending: Boolean true, numPartitions: Int self.partitions.length): RDD[(K, V)] 参数 ascending 是否正序 numPartitions 新分区数默认为原分区数 b) 功能 返回按key排序的新RDD。 c) 示例 1 val rdd1 sc.parallelize(List((3, a), (7, b), (5, c), (3, b), (6, c), (9, d)), 3)
2 val rdd2 rdd1.sortByKey
3 rdd2.collect // Array((3, a), (3, b), (5, c), (6, c), (7, b), (9, d))
4 rdd2.partitions.size // 分区数3
5 val rdd3 rdd1.sortByKey(true, 2)
6 rdd3.collect
7 rdd3.partitions.size // 分区数2 2. repartitionAndSortWithinPartitions a) 定义 def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] 参数 partitioner 分区器 b) 功能 返回使用分区器partitioner重新分区并且对各分区按key排序的新RDD。 c) 示例 1 val rdd1 sc.parallelize(List((3, a), (7, b), (5, c), (3, b), (6, c), (9, d)), 3)
2 import org.apache.spark.HashPartitioner
3 val rdd2 rdd1.repartitionAndSortWithinPartitions(new HashPartitioner(2)) // HashPartitioner(2)以key对分区数取模分区所以奇数、偶数分到两个分区。
4 rdd2.collect // Array((6, c), (3, a), (3, b), (5, c), (7, b), (9, d))
5 rdd1.partitions.size // 分区数3
6 rdd2.partitions.size // 分区数2 案例移动终端上网数据分析 数据准备 1. 数据结构移动终端上网访问记录 字段 说明 nodeid 基站ID ci 小区标识cell identity imei 国际移动设备标识码IMEI app 应用名称 time 访问时间 uplinkbytes 上行字节数 downlinkbytes 下行字节数 2. 测试数据 1,1,460028714280218,360,2015-05-01,7,1116
1,2,460028714280219,qq,2015-05-02,8,121
1,3,460028714280220,yy,2015-05-03,9,122
1,4,460028714280221,360,2015-05-04,10,119
2,1,460028714280222,yy,2015-05-05,5,1119
2,2,460028714280223,360,2015-05-01,12,121
2,3,460028714280224,qq,2015-05-02,13,122
3,1,460028714280225,qq,2015-05-03,1,1117
3,2,460028714280226,qq,2015-05-04,9,1118
3,3,460028714280227,qq,2015-05-05,10,120
1,1,460028714280218,360,2015-06-01,7,1118
1,2,460028714280219,qq,2015-06-02,8,1119
1,3,460028714280220,yy,2015-06-03,9,1120
1,4,460028714280221,360,2015-06-04,10,119
2,1,460028714280222,yy,2015-06-05,11,1118
2,2,460028714280223,360,2015-06-02,4,121
2,3,460028714280224,qq,2015-06-03,17,1119
3,1,460028714280225,qq,2015-06-04,18,119
3,2,460028714280226,qq,2015-06-05,19,1119
3,3,460028714280227,qq,2015-06-10,120,121 3. 上传数据文件至HDFS hadoop fs -put mobile.csv /test/ 加载预处理 预处理如无效数据过滤等。 1 val fields sc.broadcast(List(nodeid, ci, imei, app, time, uplinkbytes, downlinkbytes))
2 val mobile sc.textFile(/test/mobile.csv).map(_.split(,)).filter(line line.length ! fields.value.length) 统计App访问次数 1 mobile.map(line (line(fields.value.indexOf(app)), 1)).reduceByKey(__).map(appCount (appCount._2, appCount._1)).sortByKey(false).map(appCount (appCount._1, appCount._2)).repartition(1).saveAsTextFile(/text/result.csv) // Array((qq, 10), (360, 6), (yy, 4)) 统计DAU 1 mobile.map(line line(fields.value.indexOf(imei)) : line(fields.value.indexOf(time))).distinct().map(imeiTime (imeiTime.split(:)(1), 1)).reduceByKey(__).sortByKey().collect // Array((2015-05-01, 2), (2015-05-02, 2), (2015-05-03, 2), (2015-05-04, 2), (2015-05-05, 2), (2015-06-01, 2), (2015-06-03, 2), (2015-06-04, 2), (2015-06-05, 2), (2015-06-10, 1)) 统计MAU 1 mobile.map { line
2 val time line(fields.value.indexOf(time))
3 val month time.substring(0, time.lastIndexOf(-))
4 line(fields.value.indexOf(imei)) : month
5 }.distinct.map { imeiMonth (imeiMonth.split(:)(1), 1) }.reduceByKey(__).sortByKey().collect // Array((2015-05, 10), (2015-06, 10)) 统计App上下流量 1 mobile.map { line
2 val uplinkbytes line(fields.value.indexOf(uplinkbytes))
3 val downlinkbytes line(fields.value.indexOf(downlinkbytes))
4 (line(fields.value.indexOf(app)), (uplinkbytes, downlinkbytes))
5 }.reduceByKey((updownlinkbytes1, updownlinkbytes2) (updownlinkbytes1._1 updownlinkbytes2._1, updownlinkbytes1._2 updownlinkbytes2._2)).collect // Array((yy, (34.0, 3479.0)), (qq, (117.0, 6195.0)), (360, (54.0, 2714.0))) 作者netoxi出处http://www.cnblogs.com/netoxi本文版权归作者和博客园共有欢迎转载未经同意须保留此段声明且在文章页面明显位置给出原文连接。欢迎指正与交流。 posted on 2017-08-02 08:59 netoxi 阅读(...) 评论(...) 编辑 收藏 转载于:https://www.cnblogs.com/netoxi/p/7223412.html
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/89044.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!