大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD

28.RDD_为什么需要RDD

29.RDD_定义

30.RDD_五大特性总述

31.RDD_五大特性1

32.RDD_五大特性2

33.RDD_五大特性3

34.RDD_五大特性4

35.RDD_五大特性5

36.RDD_五大特性总结

37.RDD_创建概述

38.RDD_并行化创建

演示代码:
// 获取当前 RDD 的分区数
@Since ( "1.6.0" )
final def getNumPartitions : Int =
partitions . length
// 显示出 RDD 被分配到不同分区的信息
/**Return an RDD created by coalescing all
elements within each partition into an
array.*/
def glom (): RDD [ Array [ T ]]
1
2
3
4
5
6
package com . itbaizhan . rdd
//1. 导入 SparkConf 类、 SparkContext
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByParallelize {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象。并设置本地运行和程序的
名称
val conf = new
SparkConf (). setMaster ( "local[2]" ). setAppName
( "CreateRdd1" )
//3. 构建 SparkContext 对象
val sc = new SparkContext ( conf )
//4. 通过并行化创建 RDD 对象:将本地集合 -> 分布式的
RDD 对象
1
2
3
4
5
6
7
8
9
10
11
12
79    
//val rdd: RDD[Int] =
sc.parallelize[Int](List(1, 2, 3, 4, 5, 6,
7, 8))
val rdd : RDD [ Int ] =
sc . parallelize ( List ( 1 , 2 , 3 , 4 , 5 , 6 , 7 ,
8 ), 3 )
//5. 输出默认的分区数
//5.1
setMaster("local[*]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//8, 默认当前系统的
CPU
//5.2
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//2
//5.3
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8),3)
println ( " 默认分区
数: " + rdd . getNumPartitions ) //3
//6.collect 方法:将 rdd 对象中每个分区的数据,都
发送到 Driver ,形成一个 Array 对象
val array1 : Array [ Int ] = rdd . collect ()
println ( "rdd.collect()=" + array1 . mkString ( ",
" ))
//7. 显示出 rdd 对象中元素被分布到不同分区的数据信
13
14
15
16
17
18
19
20
21
22
23
24
25
80 运行结果:
实时效果反馈
1. 以下关于并行化创建 RDD 的描述错误的是:
A
通过并行化集合创建,将本地集合对象转分布式 RDD
B
parallelize() 方法必须传递两个参数。
C
parallelize 没有给定分区数 , 默认分区数等于执行程序的当前
服务器 CPU 核数。
答案:
val array2 : Array [ Array [ Int ]] =
rdd . glom (). collect ()
println ( "rdd.glom().collect() 的内容是 :" )
/*for(eleArr<- array2){
println(eleArr.mkString(","))
}*/
array2 . foreach ( eleArr => println ( eleArr . mkStr
ing ( "," )))
}
}
26
27
28
29
30
31
32
33
默认分区数: 3
rdd.collect()=1,2,3,4,5,6,7,8
rdd.glom().collect() 的内容是 :
1,2
3,4,5
6,7,8

39.RDD_读取文件创建RDD

40.RDD_读取小文件创建RDD

扩展 wholeTextFiles 适合读取一堆小文件:
//path 指定小文件的路径目录
//minPartitions 最小分区数 可选参数
def wholeTextFiles ( path :
String , minPartitions : Int =
defaultMinPartitions ): RDD [( String , String )]
1
2
3
85 代码演示:
package com . itbaizhan . rdd
//1. 导入类
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByWholeTextFiles {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "WholeTextFiles" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 读取指定目录下的小文件
val rdd : RDD [( String , String )] =
sc . wholeTextFiles ( "data/tiny_files" )
//(filePath1, " 内容 1"),(filePath2, " 内容
2"),...,(filePathN, " 内容 N")
val tuples : Array [( String , String )] =
rdd . collect ()
tuples . foreach ( ele => println ( ele . _1 , ele . _2 ))
//6. 获取小文件中的内容
val array : Array [ String ] =
rdd . map ( _ . _2 ). collect ()
println ( "---------------------------" )
println ( array . mkString ( "|" ))
//4. 关闭 sc 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
86 运行输出结果 :
RDD_ 算子概述
定义: 分布式集合 RDD 对象的方法被称为算子
算子分类:
Transformation 转换算子
1
Action 行动算子
2
sc . stop ()
}
}
22
23
24
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file1.txt,hello Linux
hello Zookeper
hello Maven
hello hive
hello spark)
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file2.txt,Spark Core
Spark RDD
Spark Sql)
----------------
hello Linux
hello Zookeper
hello Maven
hello hive
hello spark|Spark Core
Spark RDD
Spark Sql

41.RDD_算子概述

42.RDD_转换算子map

43.RDD_转换算子flatmap

44.RDD_转换算子reducebykey

45.RDD_转换算子filter

46.RDD_转换算子distinct

47.RDD_转换算子glom

48.RDD_转换算子groupby

object RddGroupBy {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "groupBy" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 创建 Rdd
val rdd : RDD [( Char , Int )] =
sc . parallelize ( Array (( 'a' , 1 ), ( 'a' , 2 ),
( 'b' , 1 ), ( 'b' , 2 ), ( 'a' , 3 ), ( 'a' , 4 )))
//6. 通过 groupBy 算子对 rdd 对象中的数据进行分组
//groupBy 插入的函数的用意是指定按照谁进行分组
// 分组后的结果是有二元组组成的 RDD
val gbRdd : RDD [( Char , Iterable [( Char ,
Int )])] = rdd . groupBy ( tupEle => tupEle . _1 )
// 收集到 Driver
val result1 : Array [( Char ,
Iterable [( Char , Int )])] = gbRdd . collect ()
//(a,CompactBuffer((a,1), (a,2), (a,3),
(a,4))),(b,CompactBuffer((b,1), (b,2)))
println ( result1 . mkString ( "," ))
//7. 使用 map 转换算子
//(a,List((a,1), (a,2), (a,3), (a,4))),
(b,List((b,1), (b,2)))
val result2 : Array [( Char , List [( Char ,
Int )])] = gbRdd . map ( tup => ( tup . _1 ,
tup . _2 . toList )). collect ()
println ( result2 . mkString ( "," ))
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
104 实时效果反馈
1. 以下关于
rdd.groupBy(tupEle => tupEle._1)
的描述错误的是:
A
groupBy 传入的函数的意思是 : 通过这个函数 , 确定按照谁来
分组。
B
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
数只能为 2
C
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
>=2
答案:
1=>B

49.RDD_转换算子groupbyKey

50.RDD_转换算子sortby

51.RDD_转换算子sortbyKey

52.RDD_转换算子union并集

53.RDD_转换算子交集和差集

54.RDD_转换算子关联算子

55.RDD_转换算子partitionBy

56.RDD_转换算子mapPatitions

57.RDD_转换算子sample

58.RDD_行动算子foreachPartition

59.RDD_行动算子foreach

60.RDD_行动算子saveAsTestFile

61.RDD_行动算子countByKey

62.RDD_行动算子reduce

63.RDD_行动算子fold

64.RDD_行动算子first_take_count

65.RDD_行动算子top_takeOrderd

66.RDD_行动算子takeSample

二.内核进阶

67.内核进阶_DAG概述

68.内核进阶_血缘关系

69.内核进阶_宽窄依赖关系

70.内核进阶_stage划分

71.内核进阶_任务调度概述

72.内核进阶_管道计算模式上

73.内核进阶_管道计算模式下

74.内核进阶_cache缓存

75.内核进阶_checkpoint检查点

76.内核进阶_cache和checkpoint区别

77.内核进阶_并行度

78.内核进阶_广播变量

79.内核进阶_累加器一

80.内核进阶_累加器二

81.内核进阶_累加器之重复计算

82.内核进阶_项目实战PVUV需求分析

83.内核进阶_项目实战PV分析

84.内核进阶_项目实战UV分析

85.内核进阶_二次排序实战

86.内核进阶_分组取topN实战

87.内核进阶_卡口统计项目需求分析

88.内核进阶_卡口统计项目统计正常的卡口

89.内核进阶_卡口统计项目TOP5

90.内核进阶_卡口统计项目统计不同区域同时出现的车辆

91.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹一

92.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹二

93.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹三

94.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹四

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/69194.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[创业之路-286]:《产品开发管理-方法.流程.工具 》-1- IPD两个跨职能团队的组织

IPD&#xff08;集成产品开发&#xff09;中的两个重要跨职能组织是IPMT&#xff08;集成产品管理团队&#xff09;和PDT&#xff08;产品开发团队&#xff09;。 在IPD&#xff08;集成产品开发&#xff09;体系中&#xff0c;IRB&#xff08;投资评审委员会&#xff09;、IPM…

maven详细讲解

学习目标 那什么是mavenmaven概念以及核心思想maven构建的生命周期、阶段以及目标maven仓库有哪些&#xff1f;maven依赖 那什么是maven&#xff1f;maven概念以及核心思想&#xff0c;maven构建的生命周期、阶段以及目标&#xff1f; 那什么是maven Maven是一个项目管理和构建…

DeepSeek 提示词之角色扮演的使用技巧

老六哥的小提示&#xff1a;我们可能不会被AI轻易淘汰&#xff0c;但是会被“会使用AI的人”淘汰。 在DeepSeek的官方提示库中&#xff0c;有“角色扮演&#xff08;自定义人设&#xff09;”的提示词案例。截图如下&#xff1a; 在“角色扮演”的提示词案例中&#xff0c;其实…

第二个Qt开发实例:在Qt中利用GPIO子系统和sysfs伪文件系统实现按钮(Push Button)点击控制GPIO口(效果为LED2灯的灭和亮)

引言 本文承接博文 https://blog.csdn.net/wenhao_ir/article/details/145420998 里的代码&#xff0c;在那里面代码的基础上添加上利用sysfs伪文件系统实现按钮(Push Button)点击控制GPIO口的代码&#xff0c;进而实现LED2灯的灭和亮。 最终的效果是点击下面的LED按钮实现LED…

登山第十七梯:矩形拟合——无惧噪声

文章目录 一 摘要 二 资源 三 内容 (文章末尾提供源代码) 一 摘要 目前,获取点集的矩形拟合结果的主要方法是计算其最小外包直立矩形或者旋转矩形。这些方法简单、易用,在数据质量良好的情况下能够较好的贴合矩形形状。然而,在数据缺失时,最小外包围盒方法将会…

57. Uboot图形化界面配置

一、Uboot图形化配置方法 1、通过终端配置。 2、进入到uboot的源码根目录下。 3、首先默认配置 make mx6ull_alientek_emmc_defconfig //默认配置 4、输入make menuconfig。打开图形化配置界面。 5、注意&#xff0c;新电脑需要安装ncurses库。sudo apt-get install libncurs…

Java面试场景题分享

假设你在做电商秒杀活动&#xff0c;秒杀开始时&#xff0c;成千上万的用户同时请求抢购商品。你会如何设计系统来处理这些请求&#xff0c;确保库存不超卖 你如何保证库存的准确性&#xff1f; 这个问题引导你思考如何在高并发下确保库存更新的原子性&#xff0c;最直接的方式…

kalman滤波器C++设计仿真实例第三篇

1. 仿真场景 水面上有条船在做匀速直线航行&#xff0c;航行过程中由于风和浪的影响&#xff0c;会有些随机的干扰&#xff0c;也就是会有些随机的加速度作用在船身上&#xff0c;这个随机加速度的均方差大约是0.1&#xff0c;也就是说方差是0.01。船上搭载GPS设备&#xff0c;…

(2025|ICLR,音频 LLM,蒸馏/ALLD,跨模态学习,语音质量评估,MOS)音频 LLM 可作为描述性语音质量评估器

Audio Large Language Models Can Be Descriptive Speech Quality Evaluators 目录 1. 概述 2. 研究背景与动机 3. 方法 3.1 语音质量评估数据集 3.2 ALLD 对齐策略 4. 实验结果分析 4.1 MOS 评分预测&#xff08;数值评估&#xff09; 4.2 迁移能力&#xff08;在不同…

stm32生成hex文件详解

1.产生的map文件干啥的&#xff1f; 2.组成情况&#xff1f;&#xff1f;&#xff1f; 废话少说&#xff0c;直接上代码具体内容况&#xff1a; Component: ARM Compiler 5.06 update 7 (build 960) Tool: armlink [4d3601]Section Cross Referencesstartup_stm32f103xe.o(S…

百度热力图数据获取,原理,处理及论文应用6

目录 0、数据简介0、示例数据1、百度热力图数据日期如何选择1.1、其他实验数据的时间1.2、看日历1.3、看天气 2、百度热力图几天够研究&#xff1f;部分文章统计3、数据原理3.1.1 ** 这个比较重要&#xff0c;后面还会再次出现。核密度的值怎么理解&#xff1f;**3.1.2 Csv->…

汽车自动驾驶AI

汽车自动驾驶AI是当前汽车技术领域的前沿方向&#xff0c;以下是关于汽车自动驾驶AI的详细介绍&#xff1a; 技术原理 感知系统&#xff1a;自动驾驶汽车通过多种传感器&#xff08;如激光雷达、摄像头、雷达、超声波传感器等&#xff09;收集周围环境的信息。AI算法对这些传感…

[转]Java面试近一个月的面试总结

本文是在学习中的总结&#xff0c;欢迎转载但请注明出处&#xff1a;http://blog.csdn.net/pistolove/article/details/46753275 前言 打算换个工作&#xff0c;近一个月面试了不少的公司&#xff0c;下面将一些面试经验和思考分享给大家。另外校招也快要开始了&#xff0c;为…

出租车特殊计费表算法解析与实现

目录 引言算法核心概念 特殊计费规则解析数据类型与输入输出算法数学原理 数字位判断与处理逻辑数值转换与累加计算算法框架图Python 实现 代码展示代码解析Python 实现的优势与局限C 语言实现 代码展示代码解析C 语言实现的性能特点性能分析与优化 性能分析 时间复杂度空间复杂…

学习threejs,tga格式图片文件贴图

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️TGA图片1.2 ☘️THREE.Mesh…

MSPFN 代码复现

1、环境配置 conda create -n MSPFN python3.9 conda activate MSPFN pip install opencv-python pip install tensorflow pip install tqdm pip install matplotlib2、train 2.1 创建数据集 2.1.1 数据集格式 |--rainysamples |--file1&#xff1a; |--file2:|--fi…

【创建模式-单例模式(Singleton Pattern)】

赐萧瑀 实现方案饿汉模式懒汉式&#xff08;非线程安全&#xff09;懒汉模式&#xff08;线程安全&#xff09;双重检查锁定静态内部类 攻击方式序列化攻击反射攻击 枚举(最佳实践)枚举是一种类 唐 李世民 疾风知劲草&#xff0c;板荡识诚臣。 勇夫安识义&#xff0c;智者必怀仁…

2025职业发展规划

2025职业发展规划 我是一名大公司的高级移动应用开发技术专家&#xff0c;目前参与了鸿蒙App开发&#xff0c;对鸿蒙的TS语言也有所了解。现在需要制定2025年的职业发展规划&#xff0c;包括学习内容和方向&#xff0c;并以思维导图的形式呈现。我需要梳理出合适的发展路径。首…

如何查看linux机器有几个cpu

在 Linux 机器上&#xff0c;你可以使用以下几种方法来查看 CPU 的数量&#xff08;物理 CPU 和逻辑 CPU&#xff09;&#xff1a; 方法 1&#xff1a;使用 lscpu 命令 lscpu输出示例&#xff1a; CPU(s): 8 Thread(s) per core: 2 Core(s) per socket: 4 Soc…

fputs的概念和使用案例

fputs 是 C 语言中用于向文件写入字符串的标准库函数。它与 puts 类似&#xff0c;但不会自动添加换行符&#xff0c;且支持向任意文件流&#xff08;如磁盘文件、标准输出等&#xff09;写入数据。 概念解析 函数原型&#xff1a;int fputs(const char *str, FILE *stream); …