Apache Spark RDD和Java流

几个月前,我很幸运地参加了一些使用Apache Spark的PoC(概念验证)。 在这里,我有机会使用弹性分布式数据集(简称RDD ),转换和操作。

几天后,我意识到虽然Apache Spark和JDK是非常不同的平台,但RDD转换和操作以及流中间操作和终端操作之间存在相似之处。 我认为这些相似之处可以帮助初学者(如我* grin * )开始使用Apache Spark。

Java流 Apache Spark RDD
中级作业 转型
终端操作 行动

请注意,Apache Spark和JDK是
非常不同的平台。 Apache Spark是一个开放源代码集群计算框架,可帮助进行大数据处理和分析。 JDK(Java开发工具包)包括用于开发,调试和监视Java应用程序(而不仅仅是数据处理)的工具。

Java流

让我们从流开始。 Java 8于2014年某个时候发布。可以说,它带来的最重要的功能是Streams API(或简称为Streams)。

创建Stream ,它将提供许多操作,这些操作可以分为两类:

  • 中间,
  • 和终端。

中间操作返回上一个流。 这些中间操作可以连接在一起以形成管道。 另一方面, 终端操作关闭流管道,并返回结果。

这是一个例子。

Stream.of(1, 2, 3).peek(n -> System.out.println("Peeked at: " + n)).map(n -> n*n).forEach(System.out::println);

运行上面的示例时,它将生成以下输出:

Peeked at: 1
1
Peeked at: 2
4
Peeked at: 3
9

中间操作懒惰的。 直到遇到终端操作,才开始实际执行。 在这种情况下,终端操作为forEach() 。 因此,我们不到以下内容。

Peeked at: 1
Peeked at: 2
Peeked at: 3
1
4
9

取而代之的是,我们看到的是: peek()map()forEach()已结合在一起以形成管道。 在每遍中,static of()操作从指定的值返回一个元素。 然后调用管道: peek()打印字符串“ Peeked at:1”,后跟map() ,并终止于显示数字“ 1”的forEach() 。 然后以of()开始的另一遍返回指定值中的下一个元素,然后是peek()map() ,依此类推。

执行诸如peek()类的中间操作实际上并不会执行任何窥视,而是创建一个新的流,该新流在遍历时将包含初始流的相同元素,但还会执行所提供的操作。

Apache Spark RDD

现在,让我们转到Spark的RDD(弹性分布式数据集)。 Spark处理数据的核心抽象是弹性分布式数据集(RDD)。

RDD只是元素的分布式集合。 在Spark中,所有工作都表示为创建新的RDD或调用RDD上的操作以计算结果。 在后台,Spark会自动在整个群集中分布RDD中包含的数据,并并行化您对其执行的操作。

创建后,RDD将提供两种类型的操作:

  • 转变,
  • 和行动。

转换从上一个构造新的RDD。 另一方面, 动作基于RDD计算结果,然后将其返回到驱动程序或将其保存到外部存储系统(例如HDFS)。

这是一个使用Java Streams的大致示例。

SparkConf conf = new SparkConf().setAppName(...);
JavaSparkContext sc = new JavaSparkContext(conf);List<Integer> squares = sc.parallelize(Arrays.asList(1, 2, 3)).map(n -> n*n).collect();System.out.println(squares.toString());// Rough equivalent using Java Streams
List<Integer> squares2 = Stream.of(1, 2, 3).map(n -> n*n).collect(Collectors.toList());System.out.println(squares2.toString());

设置Spark上下文之后,我们调用parallelize() ,该方法从给定的元素列表中创建一个RDD。 map()是一个转换,而collect()是一个动作。 像Java中的中间流操作一样,转换会被延迟评估。 在此示例中,Spark在看到动作之前不会开始执行对map()的调用中提供的功能。 这种方法乍一看可能并不常见,但是在处理大量数据(换句话说就是大数据)时,这很有意义。 它允许Spark拆分工作并并行进行。

字数示例

让我们以字数统计为例。 在这里,我们有两种实现:一种使用Apache Spark,另一种使用Java Streams。

这是Java Stream版本。

public class WordCountJava {private static final String REGEX = "\\s+";public Map<String, Long> count(URI uri) throws IOException {return Files.lines(Paths.get(uri)).map(line -> line.split(REGEX)).flatMap(Arrays::stream).map(word -> word.toLowerCase()).collect(groupingBy(identity(), TreeMap::new, counting()));}}

在这里,我们逐行读取源文件,并按单词顺序转换每一行(通过map()中间操作)。 由于每行都有一个单词序列,并且有很多行,因此可以使用flatMap()将它们转换为单个单词序列。 最后,我们根据它们的identity()它们分组(即,字符串的身份就是字符串本身),然后对它们进行计数。

针对包含两行的文本文件进行测试时:

The quick brown fox jumps over the lazy dog
The quick brown fox jumps over the lazy dog

它输出以下地图:

{brown=2, dog=2, fox=2, jumps=2, lazy=2, over=2, quick=2, the=4}

现在,这是Spark版本。

public class WordCountSpark {private static final String REGEX = "\\s+";public List<Tuple2<String, Long>> count(URI uri, JavaSparkContext sc) throws IOException {JavaRDD<String> input = sc.textFile(Paths.get(uri).toString());return input.flatMap(line -> Arrays.asList(line.split(REGEX)).iterator()).map(word -> word.toLowerCase()).mapToPair(word -> new Tuple2<String, Long>(word, 1L)).reduceByKey((x, y) -> (Long) x + (Long) y).sortByKey().collect();}}

当对同一个两行文本文件运行时,它输出以下内容:

[(brown,2), (dog,2), (fox,2), (jumps,2), (lazy,2), (over,2), (quick,2), (the,4)]

为简洁起见,已排除了JavaSparkContext的初始配置。 我们从文本文件创建JavaRDD 。 值得一提的是,此初始RDD将在文本文件中逐行操作。 这就是为什么我们将每一行拆分为单词序列,然后将其flatMap()拆分的原因。 然后,我们将一个单词转换为一个计数为一(1)的键值元组,以进行增量计数。 完成此操作后,我们将单词( reduceByKey() )与上一个RDD中的键值元组进行分组,最后我们以自然顺序对其进行排序。

收盘时

如图所示,两种实现方式是相似的。 Spark实施需要更多的设置和配置,并且功能更强大。 了解中间流和终端流操作可以帮助Java开发人员开始了解Apache Spark。

感谢Krischelle, RB和Juno ,让我参与了使用Apache Spark的PoC。

翻译自: https://www.javacodegeeks.com/2017/04/apache-spark-rdd-java-streams.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/350672.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用脚本js把结果转化为固定小数位的形式

function roundTo(base,precision) {var mMath.pow(10,precision);var aMath.round(base * m) / m;return a; } 例如&#xff1a;给定数字n6.3241712&#xff0c;则用roundTo&#xff08;n,0&#xff09;得4&#xff0c;用roundTo(n,7)得到6.3241712转载于:https://www.cnblogs…

存储过程详解

什么是存储过程&#xff1a;存储过程可以说是一个记录集吧&#xff0c;它是由一些T-SQL语句组成的代码块&#xff0c;这些T-SQL语句代码像一个方法一样实现一些功能&#xff08;对单表或多表的增删改查&#xff09;&#xff0c;然后再给这个代码块取一个名字&#xff0c;在用到…

gpu版tensorflow测试

测试程序&#xff1a; import tensorflow as tfwith tf.Session(configtf.ConfigProto(allow_soft_placementTrue, log_device_placementFalse)) as sess:a tf.constant(1)b tf.constant(3)c a bprint(结果是&#xff1a;%d\n 值为&#xff1a;%d % (sess.run(c), sess.ru…

随机森林原理_机器学习(29):随机森林调参实战(信用卡欺诈预测)

点击“机器学习研习社”&#xff0c;“置顶”公众号重磅干货&#xff0c;第一时间送达回复【大礼包】送你机器学习资料与笔记回顾推荐收藏>机器学习文章集合&#xff1a;1-20机器学习(21): Tensorflow Keras手写数字识别机器学习(22): Tensorflow Keras识别猫狗机器学习(23)…

sudo 命令报错的解决方法

尝试着用终端打开Mac的安全权限&#xff08;sudo spctl --master-disable&#xff09;&#xff0c;却显示以下提示&#xff0c;望高手解答。sudo: /etc/sudoers is world writablesudo: no valid sudoers sources found, quittingsudo: unable to initialize policy plugin 解决…

BGR转RGB

原图&#xff1a; 源代码&#xff1a; #codingutf-8#OpenCV读进来的图像,通道顺序为BGR&#xff0c; 而matplotlib的顺序为RGB&#xff0c;因此需要转换 import cv2 import numpy as np from matplotlib import pyplot as pltimg cv2.imread(./test1.jpg) B, G, R cv2.split…

C++ set的一些用法

set也是STL中比较常见的容器。set集合容器实现了红黑树的平衡二叉检索树的数据结构&#xff0c;它会自动调整二叉树的排列&#xff0c;把元素放到适当的位置。set容器所包含的元素的值是唯一的&#xff0c;集合中的元素按一定的顺序排列。 我们构造set集合的目的是为了快速的检…

ide在控制台输入编译命令_快速编译调试 Redis

一&#xff1a;开篇Redis 它是个宝&#xff0c;男女老少都说好。秒杀限流分布式&#xff0c;什么需求都能搞。Redis 主要的用途是分布式缓存&#xff0c;其实不用我多介绍&#xff0c;相信大家都用过Redis。之前也看过不少Redis的书&#xff0c;其中就包括《Redis设计与实现》。…

Java增强枚举的用例

Brian Goetz在消息“ 增强枚举-用例 ”中写道&#xff1a;“我们希望就现在实现的功能[ 增强枚举 ]获得用户反馈。” 他陈述了他的信息的第一个目的&#xff1a;“开始工作&#xff0c;这是一些通用枚举可能有用的典型用例。” 所提供的两个示例中的第一个示例是重构com.sun.to…

图片上传获取名字

Override public ResultResponse<String> uploadImg(MultipartFile file) { String imgUrl null; try { //MultipartFile类中两个方法区别&#xff1a;//getName : 获取表单中文件组件的名字//getOriginalFilename : 获取上传文件的原名 String name file.getOriginalF…

tf.nn.softmax

通过Softmax回归&#xff0c;将logistic的预测二分类的概率的问题推广到了n分类的概率的问题。通过公式 可以看出当月分类的个数变为2时&#xff0c;Softmax回归又退化为logistic回归问题。 下面的几行代码说明一下用法 # -*- coding: utf-8 -*- import tensorflow as tfA […

python easygui_python简单图形界面GUI入门——easygui

首先是easygui包下载&#xff0c;两种方式&#xff1a;1)在命令行提示符环境下&#xff0c;用pip install easygui直接安装&#xff1a;2)从http://easygui.sourceforge.net下载。将下载得到的easygui.py文件&#xff0c;复制到Python安装路 径下的Lib文件夹中。等待安装完成即…

使用 Python ElementTree 生成 xml

Python 处理 xml 文档的方法有很多&#xff0c;除了经典的 sax 和 dom 之外&#xff0c;还有一个 ElementTree。 首先 import 之&#xff1a; 1from xml.etree import ElementTree as etree然后开始构建 xml 树&#xff1a; 1234567891011121314from xml.etree.ElementTree imp…

卷积核输出维度计算

1&#xff09;卷积层&#xff1a; 参数&#xff1a;W&#xff1a;宽&#xff1b; H&#xff1a;高&#xff1b; D&#xff1a;深度&#xff1b; K&#xff1a;卷积核的个数&#xff1b; F&#xff1a;卷积核的大小&#xff1b; S&#xff1a;步长&#xff1b; P&#xff1a;…

接受与返回json数据

转载于:https://www.cnblogs.com/classmethond/p/10801606.html

归一化方法列举

归一化方法&#xff1a;除以序列最大值的&#xff0c;叫峰归一化&#xff1b;除以序列之和的&#xff0c;叫面积归一化&#xff1b;除以序列的模&#xff0c;叫数值归一化&#xff0c;得到序列的方差为0&#xff0c;均值为1&#xff1b;(1) 线性函数转换&#xff0c;表达式如下…

定时器和promise_手写Promise核心原理,再也不怕面试官问我Promise原理

整体流程的介绍 整体流程的介绍1. 定义整体结构2. 实现Promise构造函数3. 实现then方法3.实现catch方法4. 实现Promise.resolve5.实现Promise.reject6.实现Promise.all7.实现Promise.race文章会配合例子来讲解为什么要这么实现&#xff0c;尽我所能讲得粗俗易懂。有什么不理解或…

在Java 9中使用sun.misc.Unsafe

Java 9 EA版本已经发布&#xff0c;现在我们可以看到如何使用sun.misc.Unsafe。 我领导了公开运动&#xff0c;以保留对Java 9的访问&#xff0c;该访问最终成功&#xff0c;从而导致对JEP 260的修订。 那么&#xff0c;事情如何结束&#xff1f; 设定 首先&#xff0c;您需要…

惊现神作!!!

发现穿越类小说 《穿越位面的狂人》 起点地址&#xff1a; https://book.qidian.com/info/1010641845 小说名《位面穿梭之宿舍电梯》改为《穿越位面的狂人》nx大学大一学生柳风&#xff0c;因经济拮据&#xff0c;不得不住在宿舍楼顶的一间破屋子里&#xff0c;半夜下楼上厕…

[HNOI2019]校园旅行

题目 过于神仙啊&#xff0c;抄题解.jpg 首先\(n\)并不是很大啊&#xff0c;我们可以直接用\(f_{i,j}\)表示\(i\)到\(j\)是否存在一个回文路径 对于一条回文路径&#xff0c;如果在两端分别添加一个相同的字符&#xff0c;那么仍然是一个回文路径&#xff0c;于是我们可以利用这…