(精华)转:RDD:创建的几种方式(scala和java)

转: https://blog.csdn.net/weixin_38750084/article/details/82769600 

这篇文章非常棒, 用代码实际演示了如何创建RDD; 本文主要转载了 java创建RDD的两种方式, 

 

【方式1】

下面开始初始化spark
spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息
比如下面的代码是运行在spark模式下

public class sparkTestCon {public static void main(String[] args) {SparkConf conf=new SparkConf();conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);System.out.println(sc);}}

下面是运行在本机,把上面的第6行代码改为如下

JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

RDD的创建有两种方式 
1.引用外部文件系统的数据集(HDFS) 
2.并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

第一种方式创建 
下面通过代码来理解RDD和怎么操作RDD

package com.tg.spark;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/*** 引用外部文件系统的数据集(HDFS)创建RDD*  匿名内部类定义函数传给spark* @author 汤高**/
public class RDDOps {//完成对所有行的长度求和public static void main(String[] args) {SparkConf conf=new SparkConf();conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);System.out.println(sc);//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");//定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths//第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {public Integer call(String s) { System.out.println("每行长度"+s.length());return s.length(); }});//运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,//并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {public Integer call(Integer a, Integer b) { return a + b; }});System.out.println(totalLength);//为了以后复用  持久化到内存...lineLengths.persist(StorageLevel.MEMORY_ONLY());}
}

如果觉得刚刚那种写法难以理解,可以看看第二种写法

package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/*** 引用外部文件系统的数据集(HDFS)创建RDD *  外部类定义函数传给spark* @author 汤高**/
public class RDDOps2 {// 完成对所有行的长度求和public static void main(String[] args) {SparkConf conf = new SparkConf();conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);System.out.println(sc);//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");//定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengthsJavaRDD<Integer> lineLengths = lines.map(new GetLength());//运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,//并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序int totalLength = lineLengths.reduce(new Sum());System.out.println("总长度"+totalLength);// 为了以后复用 持久化到内存...lineLengths.persist(StorageLevel.MEMORY_ONLY());}//定义map函数//第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型static class GetLength implements Function<String, Integer> {public Integer call(String s) {return s.length();}}//定义reduce函数 //第一个参数为内容,第三个参数为函数操作完后返回的结果类型static class Sum implements Function2<Integer, Integer, Integer> {public Integer call(Integer a, Integer b) {return a + b;}}
}

【方式2】 (java编程推荐)

并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

package com.tg.spark;import java.util.Arrays;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/*** 并行化一个已经存在于驱动程序中的集合创建RDD* @author 汤高**/
public class RDDOps3 {// 完成对所有数求和public static void main(String[] args) {SparkConf conf = new SparkConf();conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);System.out.println(sc);List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);//并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDDJavaRDD<Integer> distData = sc.parallelize(data);JavaRDD<Integer> lineLengths = distData.map(new GetLength());// 运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,// 并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序int totalLength = lineLengths.reduce(new Sum());System.out.println("总和" + totalLength);// 为了以后复用 持久化到内存...lineLengths.persist(StorageLevel.MEMORY_ONLY());}// 定义map函数static class GetLength implements Function<Integer, Integer> {@Overridepublic Integer call(Integer a) throws Exception {return a;}}// 定义reduce函数static class Sum implements Function2<Integer, Integer, Integer> {public Integer call(Integer a, Integer b) {return a + b;}}
}

注意:上面的写法是基于jdk1.7或者更低版本 
基于jdk1.8有更简单的写法 
下面是官方文档的说明

所以如果要完成上面第一种创建方式,在jdk1.8中可以简单的这么写

JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

主要不同就是在jdk1.7中我们要自己写一个函数传到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中写lambda表达式

参考原文:https://blog.csdn.net/tanggao1314/article/details/51570452/

 

扩展:

SparkContext的parallelize的参数

通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。

var data = [1, 2, 3, 4, 5]  
var distData = sc.parallelize(data)  

在一个Spark程序的开始部分,有好多是用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD,创建一个并行集合。

例如sc.parallelize(0 until numMappers, numMappers)

创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份。

在集群模式中,Spark将会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。

一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。当让,也可以手动的设置它,通过parallelize方法的第二个参数。(例如:sc.parallelize(data, 10)).

参考:https://blog.csdn.net/caoli98033/article/details/41777065

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/330334.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

POJ3278(BFS入门)

Problem Descrption Farmer John has been informed of the location of a fugitive cow and wants to catch her immediately. He starts at a point N (0 ≤ N ≤ 100,000) on a number line and the cow is at a point K (0 ≤ K ≤ 100,000) on the same number line. Far…

漫画:什么是架构师

转载自 什么是架构师&#xff1f; 架构师英文architect&#xff0c;这个词源于建筑学。软件工程当中的架构师和建筑工程当中建筑师有许多相通之处&#xff0c;都是负责“产品”宏观的架构设计。在一个团队里&#xff0c;架构师充当了技术Leader的角色&#xff0c;不仅要完成项目…

转:随机过程好书推荐

转自&#xff1a; https://blog.csdn.net/rastlos/article/details/8928510 随机过程实在太重要了,用当年林元烈上课给我们说的一句话,"随机数学充满了魅力与威力"来形容随机过程再合适不过了.当然,随机过程比较难,故有"随机过程随机过"一说.这里,我就把美…

漫画:什么是机器学习

转载自 漫画&#xff1a;什么是机器学习故事一&#xff1a;瑞雪兆丰年我们中国有一句关于农业生产的古老谚语&#xff1a;瑞雪兆丰年。 就是说&#xff0c;如果前一年冬天下雪很大很多&#xff0c;那么第二年庄稼丰收的可能性比较大。 这条谚语是怎么来的呢&#xff1f;我们可以…

转:高等数学、线性代数、概率论数理统计书籍推荐

转&#xff1a; https://blog.csdn.net/Yahuvi/article/details/54692504 导语 最近在学习机器学习&#xff0c;发现需要恶补数学知识。总感觉国内大学的教材只适合考试&#xff0c;对数学知识的理解和运用不到位。现整理一些评价很高的书&#xff0c;总的来说较适合工科类的…

二叉树:HDU1754

很多学校流行一种比较的习惯。老师们很喜欢询问&#xff0c;从某某到某某当中&#xff0c;分数最高的是多少。 这让很多学生很反感。 不管你喜不喜欢&#xff0c;现在需要你做的是&#xff0c;就是按照老师的要求&#xff0c;写一个程序&#xff0c;模拟老师的询问。当然&…

转:微服务设计、拆分原则

转自&#xff1a; https://www.cnblogs.com/guanghe/p/10978349.html 一、AKF拆分原则 业界对于可扩展系统架构设计有一个朴素的理念&#xff1a;通过加机器就可以解决容量和可用性问题。 这一理念在云计算概念疯狂流行的今天&#xff0c;得到了广泛的认可&#xff0c;对于一个…

漫画:什么是中间人攻击

转载自 漫画&#xff1a;什么是中间人攻击故事发生在上世纪40年代......在解放战争初期&#xff0c;东北牡丹江一带&#xff0c;奶头山有一个土匪副官叫栾平。他计划将包含重要信息的先遣图&#xff0c;献给威虎山的土匪头子座山雕。而后栾平被共军抓获&#xff0c;侦查员杨子荣…

微信小程序的setData

1.setData&#xff08;&#xff09;中修改为bool类型不要加" " 例&#xff1a;setData({"a":false}) 2.修改以变量作为索引值的数组时的语法为 this.setData({ [ judge[${current}] ]: true })

转:AKF 扩展立方体

转&#xff1a; https://www.cnblogs.com/fengyc/p/12286726.html AKF 扩展立方体&#xff08;AKF Scale Cube&#xff09;是一个描述从单体应用到可扩展架构的模型&#xff0c;见 (https://akfpartners.com/growth-blog/scale-cube)[https://akfpartners.com/growth-blog/scal…

漫画:三分钟了解敏捷开发

转载自 漫画&#xff1a;三分钟了解敏捷开发 什么是敏捷开发&#xff1f; 敏捷开发&#xff08;Agile&#xff09;是一种以人为核心、迭代、循序渐进的开发方法。 在敏捷开发中&#xff0c;软件项目的构建被切分成多个子项目&#xff0c;各个子项目的成果都经过测试&#xff0c…

微信小程序数组的坑

定义和用法 splice() 方法用于添加或删除数组中的元素。 注意&#xff1a;这种方法会改变原始数组。 返回值 如果仅删除一个元素&#xff0c;则返回一个元素的数组。 如果未删除任何元素&#xff0c;则返回空数组。 主要&#xff1a;array.splice(index,howmany&#xff09…

转: 理解RESTful架构

转自&#xff1a; http://www.ruanyifeng.com/blog/2011/09/restful.html 作者&#xff1a; 阮一峰 日期&#xff1a; 2011年9月12日 越来越多的人开始意识到&#xff0c;网站即软件&#xff0c;而且是一种新型的软件。 这种"互联网软件"采用客户端/服务器模式&a…

Git 12 岁了,送给你 12 个 Git 使用技巧

转载自 Git 12 岁了&#xff0c;送给你 12 个 Git 使用技巧Git&#xff0c;一个分布式版本控制系统&#xff0c;它已经成为了开源世界的源码控制默认工具&#xff0c;在4月7号12岁了。但是使用Git中更另人沮丧的是&#xff0c;你需要了解多少才能让你更有效的使用它。同时这也是…

转:Kafka、RabbitMQ、RocketMQ等消息中间件的介绍和对比

转自&#xff1a; https://blog.csdn.net/yunfeng482/article/details/72856762 前言 在分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦。现在开源的消息中间件有很多,前段时间产品 RocketMQ (MetaQ的内核) 也顺利开源,得到大家的关注。 概念 MQ简…

微信小程序定时器setInterval()的使用注意事项

setInterval&#xff08;function(){}, number 时间间隔/ms&#xff09; 注意在setInterval中定义的函数中使用 this 指向的是该计时器&#xff0c;若要用到页面数据应如下操作&#xff1a; let thatthis setInterval(function(){ that.data.a0; },number 时间间隔/ms) …

图解分布式架构的演进

转载自 图解分布式架构的演进一、什么是分布式架构分布式系统&#xff08;distributed system&#xff09; 是建立在网络之上的软件系统。内聚性&#xff1a;是指每一个数据库分布节点高度自治&#xff0c;有本地的数据库管理系统。透明性&#xff1a;是指每一个数据库分布节点…

转:微服务架构:BFF和网关是如何演化出来的?(这篇文章相当棒)

转自&#xff1a; https://juejin.im/entry/6844903806208049159 这篇文章写得非常棒&#xff0c;从服务&#xff0c;到bff&#xff0c; 到gateway 的一步步演化&#xff0c;描述的非常清晰易懂。 1、介绍 BFF(Backend for Frontend)和网关Gateway是微服务架构中的两个重要概…

微信小程序的坑

<input>组件后台接受到的是字符串类型&#xff0c;若要用数字类型应用Number()进行转化 微信小程序中许多API会是页面的 this 转向应注意用 let that this 来获取页面的指针&#xff01;

实现滚到div时淡入效果

首先实现淡入的动画 CSS代码如下&#xff1a; keyframes float { from { position: relative; margin-top:200px; opacity: 0; } to { position: relative; margin-top: 50px; opacity: 1; } } 接下来用JS判断当前滚动的位置并加入…