如何将自定义数据源集成到Apache Spark中

如今,流数据是一个热门话题,而Apache Spark是出色的流框架。 在此博客文章中,我将向您展示如何将自定义数据源集成到Spark中。

Spark Streaming使我们能够从各种来源进行流传输,同时使用相同的简洁API访问数据流,执行SQL查询或创建机器学习算法。 这些功能使Spark成为流式(或任何类型的工作流)应用程序的首选框架,因为我们可以使用框架的所有方面。

面临的挑战是弄清楚如何将自定义数据源集成到Spark中,以便我们能够利用其强大功能而无需更改为更多标准源。 更改似乎是合乎逻辑的,但是在某些情况下,这样做是不可能或不方便的。

流式自定义接收器

Spark提供了不同的扩展点,正如我们在此处扩展Data Source API以便将自定义数据存储集成到Spark SQL中所看到的那样。

在此示例中,我们将做同样的事情,但是我们还将扩展流API,以便我们可以从任何地方流。

为了实现我们的自定义接收器,我们需要扩展Receiver [A]类。 请注意,它具有类型注释,因此我们可以从流客户端的角度对DStream实施类型安全。

我们将使用此自定义接收器来流式传输我们的应用程序之一通过套接字发送的订单。

通过网络传输的数据的结构如下所示:

1 5
1 1 2
2 1 1
2 1 1
4 1 1
2 2
1 2 2

我们首先接收订单ID和订单总金额,然后接收订单的行项目。 第一个值是商品ID,第二个是订单ID(与订单ID值匹配),然后是商品成本。 在此示例中,我们有两个订单。 第一个只有四个项目,第二个只有一个项目。

这个想法是将所有这些隐藏在我们的Spark应用程序中,因此它在DStream上收到的是在流上定义的完整顺序,如下所示:

val orderStream: DStream[Order] = .....
val orderStream: DStream[Order] = .....

同时,我们还使用接收器来流式传输我们的自定义流式源。 即使它通过套接字发送数据,使用来自Spark的标准套接字流也将非常复杂,因为我们将无法控制数据的输入方式,并且会遇到在应用程序上遵循顺序的问题本身。 这可能非常复杂,因为一旦进入应用程序空间,我们便会并行运行,并且很难同步所有这些传入数据。 但是,在接收方空间中,很容易从原始输入文本创建订单。

让我们看看我们的初始实现是什么样的。

case class Order(id: Int, total: Int, items: List[Item] = null)
case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY)  {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = ....
}
case class Order(id: Int, total: Int, items: List[Item] = null)
case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY)  {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = ....
}

我们的OrderReceiver扩展了Receiver [Order],它使我们可以在Spark内部存储Order(带注释的类型)。 我们还需要实现onStart()和onStop()方法。 请注意,onStart()创建一个线程,因此它是非阻塞的,这对于正确的行为非常重要。

现在,让我们看一下接收方法,真正发生魔术的地方。

def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}
def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}

在这里,我们创建一个套接字并将其指向源,然后我们就可以简单地开始读取它,直到调度了stop命令,或者套接字上没有更多数据为止。 请注意,我们正在读取与之前定义的结构相同的结构(如何发送数据)。 完全阅读订单后,我们将调用store(…),以便将其保存到Spark中。

除了在我们的应用程序中使用我们的接收器外,这里别无所要做:

val config = new SparkConf().setAppName("streaming")
val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))
val config = new SparkConf().setAppName("streaming")
val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))

请注意我们是如何使用自定义OrderReceiver创建流的(仅为了清楚起见,对val流进行了注释,但这不是必需的)。 从现在开始,我们将流(DString [Order])用作我们在任何其他应用程序中使用的任何其他流。

stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id))              order.items.foreach(println)}}
stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id))              order.items.foreach(println)}}

摘要

当处理生成无尽数据的源时,Spark Streaming非常方便。 您可以使用与Spark SQL和系统中其他组件相同的API,但它也足够灵活,可以扩展以满足您的特定需求。

翻译自: https://www.javacodegeeks.com/2016/05/integrate-custom-data-sources-apache-spark.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/352826.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux用户管理练习题

转载自:http://blog.sina.com.cn/s/blog_6a8d2f120100qiyj.html 1)新建一个组group1,新建一个系统组group2 参考答案: groupadd group1 groupadd -r group2 cat /etc/group /etc/gshadow|grep group[1-2] 2)更改组group2的GID为103,更改组名为grouptest 参考答案:…

闪念-许久未来一切没变

好久好久没来逛社区了,前几天有空登录进来看看,好多以前的老朋友都还在,甚是欣慰。有种亲切的感觉,时间真的很快,差不多有3-4年没来玩社区了。经历了两家互联网行业级Top3的公司一度没有时间逛社区,玩社区交…

C语言麻将递归,C++数据结构与算法——麻将胡牌算法(二:完全胡牌算法)

虽然单花色胡牌算法面试时写出来了,但是完整的胡牌算法却没有写,既然遇到了,秉着不抛弃不放弃的精神,当然不能原谅懒惰的自己了。下面这篇为一个完整的胡牌算法。胡牌规则除了以下几点,其余与单花色胡牌规则一致&#…

第一行代码-第一章

模拟器和真机切换 点击app选择Configuration,Target选择USB设备或模拟器 真机不能获取debug日志 以360手机为例 1.拨号界面输入“*20121220#”,进入工程模式,点击日志输出等级。 2.修改以下选项 Log print enable 选 enable Java log level 选…

啊哈c语言推箱子小游戏,啊哈C入门版学完了,现发推箱子源代码~

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼该推箱子拥有对成绩的记录功能,因此,请挑战自己的最少步数吧!源代码如下:#include #include int main(){system("color 0a");char a[9][11]{" ###### "," # ###",…

AIDA64制作机箱副屏/性能监控屏, 刷新慢问题

AIDA64制作机箱副屏/性能监控屏, 刷新慢问题 手上有闲置的手机/平板, 非闲置也行, 毕竟也没人边刷手机边打游戏吧AIDA64可以监控大多数参数, 放到一些酷炫的LCD模板上, 通过手机/平板等的浏览器就可访问这个页面, 实现无线监控 在网上找一个AIDA64, 安装文件-设置-LCD找到Rem…

javaone_JavaOne 2012:Java策略主题演讲和IBM主题演讲

javaone与 JavaOne 2010 相似,我对JavaOne 2012有了一个粗略的起点。由于“计算机和打印机技术上的困难”,办理登机手续的人花了70分钟为我提供JavaOne徽章。 尽管我不是世界上最有耐心的人,但比等待更令人失望的是,我错过了参加“…

常用正则表达式(?i)忽略字母的大小写!

转载自:http://blog.csdn.net/iwanttoknow7/article/details/5773285 1。^/d$  //匹配非负整数(正整数 0) 2 。 ^ [ 0 - 9 ] * [ 1 - 9 ][ 0 - 9 ] * $   // 匹配正整数 3 。 ^ (( - /d) | ( 0))$   // 匹配非正整数(负…

内存刷新机制

red log buffer、data buffer、binlog cache。在O和M中,讲究日志先行策略,就是一条DML语句进入数据库之后,都会先写日志,再写数据文件。 1.red log, 重做日志文件,用于记录事务操作的变化,记录的…

android 字符串对齐,android – 使用Spanable String对齐ImageSpan

我知道有很多相同类型的问题可供使用,我尝试了很多解决方案,但所有这些问题都达不到我的要求.我的问题是我必须在包含Spanable字符串和Imagespan的文本之间添加动态行间距,但是当我添加行间距时,文本和图像的对齐会变形.我已经尝试了Stackoverflow上几乎所有可用的解决方案,如t…

如何将JBoss HR员工奖励项目放入云端

我们一直在讨论为什么应用程序开发人员在App Dev Cloud Stack系列中不能再忽略其堆栈了。 我们从头到尾讨论了各个层,但尚未为您提供除Red Hat Container Development Kit(CDK)之外的任何应用程序开发工具。 到目前为止,您所拥有…

用JIRA管理你的项目

https://blog.csdn.net/gaowenhui2008/article/details/70241657 (一) JIRA环境搭建 转载于:https://www.cnblogs.com/eustoma/p/9637509.html

ArcGIS编辑操作的常用快捷键一览表

在ArcMap中,快捷键与一些编辑工具和命令相关联。使用快捷键能使编辑工作更加快捷有效,总结如下: 一、公共快捷键(对所有编辑工具有效): Z:放大 X:缩小 C:漫游 V&#xff…

javaone_JavaOne 2012:使用HTML5和Java构建移动应用程序

javaone我返回了Parc 55 (任务会议室),观看Max Katz的( Exadel开发人员关系)“使用HTML5和Java构建移动应用程序” Bird-of-Feather(BoF)演示文稿。 具体来说,Katz在Tiggzi &#xf…

android recyclerview 横向item 宽度,RecyclerView的item宽度不能全屏显示及线性布局与grid布局切换混乱解决方法...

1.RecyclerView的Item宽度不能全屏显示出现这种的原因有两种:1.第一种是在adapter中加载inflater时parent传入为null2.如果第一种不行,第二种原因就是在RecyclerView外随意加一层布局即可充满解决方法,在onCreateViewHolder按照如下修改&…

剑指offer(一):二维数组中的查找

题目: 在一个二维数组中(每个一维数组的长度相同),每一行都按照从左到右递增的顺序排序,每一列都按照从上到下递增的顺序排序。请完成一个函数,输入这样的一个二维数组和一个整数,判断数组中是否…

sed以及awk的替换命令

转载自:http://blog.csdn.net/imzoer/article/details/8721590 先看sed 的。 比如说, 在上面使用sed替换每行第一个出现的nau为n_a_u。 如果加上g参数,那么对已一行,每次遇到一个nau就要替换一次。 -----------------------------…

图片下载中文传输转义问题

// 定义utf-8转义失败,中文是一串什么3E%什么的 URLEncoder.encode((productName".jpeg"), "UTF-8") // 改成如下代码 new String((productName ".jpeg").getBytes("utf-8"), "ISO8859-1") 转载于:https://www.…

NetBeans Java EE技巧8:持久性单元

任何好的IDE的基本目标都是简化和组织代码和开发工作流程。 NetBeans在实现这些目标方面做得非常出色,有时我们将基础知识视为理所当然。 例如,如果您曾经试图在纯文本编辑器中阅读XML蒙上了双眼,那么过去在配置和使用Persistence Units时可能…

c语言循环链表中设立尾链表,C语言实现双向非循环链表(带头结点尾结点)的节点插入...

对于双向链表,个人推荐使用带头结点尾结点的方式来处理会比较方便。我在《C语言实现双向非循环链表(不带头结点)的节点插入》中详细实现了在不带头结点的情况下的插入。这次我们将会来在使用头结点尾结点的情况下在任意的位置插入元素。代码上传至 https://github.c…