使用Oracle Data Integrator(和Kafka / MapR流)完善Lambda体系结构

“ Lambda体系结构是一种数据处理体系结构,旨在通过利用批处理和流处理方法来处理大量数据。 这种体系结构方法试图通过使用批处理提供批处理数据的全面而准确的视图,同时使用实时流处理提供在线数据的视图来平衡延迟 , 吞吐量和容错能力 。 在演示之前,可以将两个视图输出合并。 lambda体系结构的兴起与大数据的增长,实时分析以及减轻地图缩减延迟的驱动力有关。” –维基百科

以前,我已经写了一些博客,涉及许多用例,这些用例是使用Oracle Data Integrator(ODI)在MapR分发之上进行批处理,以及使用Oracle GoldenGate(OGG)将事务数据流式传输到MapR Streams和其他Hadoop组件中。 最新的ODI(12.2.1.2.6)结合了这两种产品以完全适合lambda架构,同时具有许多新的强大功能,包括能够将Kafka流作为ODI本身的源和目标进行处理。 通过简化我们在一种产品下以相同逻辑设计处理和处理批处理和快速数据的方式,此功能对已经拥有或计划拥有lambda架构的任何人都具有巨大的优势。 现在,如果我们将OGG流传输功能和ODI批处理/流传输功能结合在一起,则可能性是无限的。

在本博客中,我将向您展示如何使用Spark Streaming在Oracle Data Integrator上配置MapR流(aka Kafka)以创建真正的lambda体系结构:补充批处理和服务层的快速层。

在本文中,我将跳过ODI的“赞扬和称赞”部分,但我只想强调一点:自从ODI首次发布以来,为该博客设计的映射,就像您将设计的所有其他映射一样,都是您可以直接在Hadoop / Spark集群上以100%的本机代码运行,而无需编写零行代码,也不必担心如何以及在何处编码。

我已经在MapR上完成了此操作,因此我可以制作“两只鸟一块石头”。 向您展示MapR Streams步骤和Kafka。 由于两者在概念或API实现上并没有太大差异,因此如果您使用的是Kafka,则可以轻松地应用相同的步骤。

如果您不熟悉MapR Streams和/或Kafka概念,建议您花一些时间来阅读它们。 以下内容假定您知道什么是MapR Streams和Kafka(当然还有ODI)。 否则,您仍然会对可能的功能有个好主意。

准备工作

MapR Streams(aka Kafka)相关的准备工作

显然,我们需要创建MapR Streams路径和主题。 与Kafka不同,MapR通过“ maprcli”命令行实用程序使用其自己的API来创建和定义主题。 因此,如果您使用商品Kafka,则此步骤将略有不同。 Web上有很多有关如何创建和配置Kafka主题和服务器的示例,因此您并不孤单。

为了进行此演示,我创建了一个路径和该路径下的两个主题。 我们将让ODI从其中一个主题(注册)进行消费,并生成另一个主题(registrations2)。 这样,您将看到它如何通过ODI起作用。

创建一个名为“ users-stream”的MapR Streams路径和一个名为“ registrations”的主题:

在我之前定义的相同路径上创建第二个主题“ registrations2”:

Hadoop相关准备

由于我使用的是已安装并正在运行MapR的个人预配置VM,因此此处没有很多准备工作。 但是,需要一些步骤才能成功完成ODI映射。 如果您想知道我如何使ODI可以用于MapR发行版,则可以参考此博客文章 。

  • Spark:我已经在Spark 1.6.1上进行了测试,您也应该这样做。 至少不要转到任何较低版本。 此外,您需要针对Spark构建具有特定的标签版本。 我从标签1605(这是MapR发布约定)开始测试,但是我的工作失败了。 究其原因,我发现PySpark库不是MapR Streams API的最新版本。 他们可以使用商品Kafka,但不能使用MapR。 这是我使用过的RPM的链接 。
  • Spark日志记录:在spark路径下,有一个“ config”文件夹,其中包含不同的配置文件。 如果需要的话,我们只对其中一项感兴趣。 文件名为“ log4j.properties”。 您需要确保将“ rootCategory”参数设置为INFO,否则,当您运行提交到Spark的任何ODI映射时,都会出现异常:

  • Hadoop凭证存储:在提交的任何作业中需要某些密码时,ODI都将引用Hadoop凭证存储。 这样,我们就不会在参数/属性文件或代码本身中包含任何明确的密码。 在此演示中,我们将在某个时候使用MySQL,因此我需要创建一个存储并为MySQL密码添加别名。 首先,您需要确保在core-site.xml中有一个用于凭证存储的条目,然后实际上为密码值创建一个别名:

上一张图片是我的“ site-core.xml”的摘要,向您显示了我添加的凭据存储。 下一步将是验证商店是否存在,然后为密码值创建别名:

更改之后,即使在编辑core-site.xml之后,也无需重新启动任何hadoop组件。

注意:如果您遇到“操作系统异常”(例如137),请确保您有足够的可用内存。

ODI相关准备

您将在ODI中进行的常规准备工作。 我将在此博客中显示相关内容。

Hadoop数据服务器

以下配置特定于MapR。 如果使用其他发行版,则需要输入相关的端口号和路径:

Spark-Python数据服务器

在此ODI版本12.2.1.2.6中,如果要使用Spark Streaming和常规Spark服务器/群集,则需要创建多个Spark数据服务器。 在此演示中,我仅创建了Spark Streaming服务器,并将其称为Spark-Async。

您需要将“主群集”值更改为实际使用的值:yarn-client或yarn-cluster,然后选择我们先前创建的Hadoop DataServer。

现在,这里配置的有趣部分是Spark-Async数据服务器的属性:

我已经强调了您需要注意的最重要的方面。 之所以使用ASYNC,是因为我们将使用Spark Streaming。 其余属性与性能有关。

Kafka数据服务器

在这里,我们将定义MapR Streams数据服务器:

元数据代理具有一个“虚拟”地址,仅符合Kafka API。 MapR Streams客户端将为您提供连接到MapR Streams所需的服务。 您可能无法在此处测试数据服务器,因为在MapR上没有运行这样的Kafka服务器。 因此,请安全地忽略此处的测试连接,因为它将失败(这样就可以了)。

对于属性,您需要定义以下内容:

您需要手动定义“ key.deserializer”和“ value.deserializer”。 MapR Streams都需要这两者,如果未定义作业,作业将失败。

ODI映射设计

我已经在这里进行了测试,涵盖了五个用例。 但是,我将只完整介绍一个,并突出显示其他内容,以免您阅读多余和常识性的步骤。

1)MapR Streams(Kafka)=> Spark Streaming => MapR Streams(Kafka):

在此映射中,我们将从先前创建的主题中读取流数据,应用一些功能(简单的功能),然后将结果生成到另一个主题。 这是映射的逻辑设计:

我通过复制已经为MySQL反向工程设计的模型之一(结构相同)定义了MapR_Streams_Registrations1模型,但是在这种情况下,当然选择的技术是Kafka。 您将能够选择流数据的格式:Avro,JSON,Parquet或Delimited:

物理设计如下所示:

  • SOURCE_GROUP:这是我们的MapR Streams主题“注册”
  • TRANS_GROUP:这是我们的Spark异步服务器
  • TARGET_GROUP:这是我们的MapR Streams主题“ registrations2”

物理实现的属性为:

您需要选择暂存位置作为Spark Async并启用“流式传输”。

要将主题注册中的流数据加载到Spark流中,我们需要选择合适的LKM,即LKM Kafka到Spark:

然后从Spark Streaming加载到MapR Stream目标主题registrations2,我们需要选择LKM Spark到Kafka:

2)MapR-FS(HDFS)=> Spark Streaming => MapR Streams(Kafka):

除了使用的知识模块之外,我在这里不会向您展示太多。 要将MapR-FS(HDFS)加载到Spark Streaming,我使用了LKM File来Spark:

为了从Spark Streaming加载到MapR Streams,我像以前的映射一样使用LKM Spark到Kafka。

注意:LKM File to Spark将充当一个流,一个文件流(显然)。 ODI将仅接收任何更新/新文件,而不是静态文件。

3)MapR Streams(Kafka)=> Spark Streaming => MySQL:

要将MapR Streams(Kafka)加载到Spark Streaming,就像在第一个映射中一样,我使用了LKM Kafka到Spark。 然后从Spark Streaming加载到MySQL,我使用了LKM Spark到SQL:

4)MapR流(Kafka)=> Spark流=> MapR-FS(HDFS)

为了从MapR流加载到Spark流,我像以前一样使用LKM Kafka到Spark,然后从Spark Stream加载到MapR-FS(HDFS),我已经使用LKM Spark到File:

5)MapR Streams(Kafka)和Oracle DB => Spark Streaming => MySQL

这是另一个有趣的用例,您实际上可以在现场将Kafka流与SQL源一起加入。 这仅(当前)适用于查找组件:

请注意,驱动程序源必须是Kafka(在我们的示例中为MapR流),而查找源必须是SQL数据库。 我使用了与以前的映射几乎相同的LKM:从LKM SQL到Spark,从LKM Kafka到Spark和从LKM Spark到SQL。

行刑

我将仅向您展示第一个用例的执行步骤,即MapR Streams(Kafka)=> Spark Streaming => MapR Streams(Kafka)。 为了模拟这种情况,我创建了一个Kafka生产者控制台和另一个Kafka消费者控制台,以便可以监视结果。 查看下面的生产者,我粘贴了一些记录:

我已经突出显示了其中一个URL,以确保您注意到它是小写的。 等待几秒钟,Spark将处理这些消息并将其发送到目标MapR Streams主题:

请注意,所有URL均大写。 成功!

通过映射,结果与预期的一样。 因为它们很简单,所以我不会为它们显示测试步骤。 这里的想法是向您展示如何使用MapR Streams(Kafka)配置ODI。

最后的话

值得一提的是,在执行任何映射时,您都可以钻取日志并查看正在发生的事情(生成的代码等)。 此外,您将获得指向工作历史URL的链接以在Spark UI上访问它:

打开链接将带我们到Spark UI:

如果要控制流作业可以生存多长时间,则需要增加Spark-Async数据服务器的“ spark.streaming.timeout”属性或从映射配置本身覆盖它。 您可能还需要创建一个ODI程序包,该程序包具有一个循环和其他有用的组件来满足您的业务需求。

结论

ODI可以处理lambda架构中的两个层:批处理层和快速层。 这不仅是ODI在其非常长的综合功能列表中添加的一项重要功能,而且还将提高从一个统一,易于使用的界面设计数据管道的生产率和效率。 显然,ODI可以像使用商品Kafka一样轻松地与MapR Streams一起使用,这要感谢MapR的二进制文件与Kafka API兼容,以及ODI不需要依赖于一个框架。 这可以确保您ODI是真正的开放式模块化E-LT工具,与其他工具不同。

其他一些相关职位:

  • Oracle Data Integrator和MapR融合数据平台:请检查!
  • 使用Oracle GoldenGate将事务数据流式传输到MapR流中
  • 使用Oracle GoldenGate进行MapR-FS实时事务数据提取
  • 带有ODI的逆向工程师MapR-DB

免责声明

这里表达的思想,实践和观点仅是作者的观点,不一定反映Oracle的观点。

翻译自: https://www.javacodegeeks.com/2017/02/perfecting-lambda-architecture-oracle-data-integrator-kafka-mapr-streams.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/350391.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

matlab regionprops区域属性信息

stats regionprops(L,properties) 返回的stats为一个结构体struct 测量标注矩阵L中每一个标注区域的一系列属性。L中不同的正整数元素对应不同的区域,例如:L中等于整数1的元素对应区域1;L中等于整数2的元素对应区域2;以此类推。…

python3虚拟环境使用教程_python虚拟环境完美部署教程

一、前言预处理建议仔细看完本文章之后在进行操作,避免失误,本环境可以用于生产环境,有利于生产环境python之间的环境隔离,互相不会产生环境冲突;pyenv和pyenv-virtualenv可以完美结合使用,具体使用情况看项…

yandexbot ip列表整理做俄罗斯市场的站长可以关注一下

这段时间ytkah在负责一个客户的网站,主要做俄罗斯市场,当然是要研究Yandex了,首先是要知道yandexbot的ip有哪些,本文通过分析这个站从2018.12.02到2019.05.21这段时间产生的网站log日志得出的结果,log日志文件1.3G&…

为什么闲鱼不能搜索python_Python 分析后告诉你闲鱼上哪些商品抢手?

前言文的文字及图片来源于网络,仅供学习、交流使用,不具有任何商业用途,版权归原作者所有,如有问题请及时联系我们以作处理。作者:【Airpython】PS:如有需要Python学习资料的小伙伴可以加点击下方链接自行获取http://t.cn/A6Zvjdun准 备 工 作在编写代码…

matlab imfill孔洞填充

BW2 imfill(BW) 作用填充二值图像BW中的空洞 clear all; clc; close all; img imread(test1.png); if ndims(img)3 img rgb2gray(img); end img_bw im2bw(img); img_fill imfill(img_bw, holes); figure; subplot(1,2,1),imshow(img_bw), title(有空洞的图像); s…

将Gatling集成到Gradle构建中–了解SourceSet和配置

我最近在一个项目中工作,我们不得不将出色的负载测试工具Gatling集成到基于Gradle的版本中。 有可用的gradle插件使此操作变得容易,其中两个是this和this ,但是对于大多数需求而言,只需简单执行命令行工具本身就足够了&#xff0c…

matlab bwlabel标记连通区域

[L,num] bwlabel(BW,n) 这里num返回的就是BW中连通区域的个数。返回一个和BW大小相同的L矩阵,包含了标记了BW中每个连通区域的类别标签,这些标签的值为1、2、num(连通区域的个数)。n的值为4或8,表示是按4连通寻找区域…

mysql遇到时区问题的坑(Java解决方案)

最近项目遇到一个坑&#xff0c;就是server和db之间存在时区问题&#xff0c;本人的db是utc时间&#xff0c; 可以使用代码设置时区来解决&#xff0c;本人这里使用joda三方包&#xff0c;joda蛮好用的&#xff0c;具体用法这里不做详细描述。 先引入pom <dependency><…

vba 不等于_EXCEL表格VBA中的运算符

就像函数公式里面需要运算符一样&#xff0c;VBA中同样也需要运算符&#xff0c;今天我们就来了解下VBA中的运算符是怎么表达的。在学习运算符前&#xff0c;我们先来学习一段简单的VBA代码&#xff0c;Msgbox的表达。在代码窗口中输入&#xff1a;sub test ()tset表示测试的意…

C++常用的容器(vector、set、list、map)

C STL中最基本以及最常用的类或容器无非就是以下几个&#xff1a; stringvectorsetlistmap 下面就依次介绍它们&#xff0c;并给出一些最常见的最实用的使用方法&#xff0c;做到快速入门。 string 首先看看我们C语言一般怎么使用字符串的 char* s1 "Hello SYSU!&quo…

阿里 前端 规范_技术方案多、复用难?且看阿里文娱的前端工程管理实践

近两年&#xff0c;前端复杂度持续攀升&#xff0c;从框架到开发模式都衍生出了无数的技术方案。单点的小规模尝试&#xff0c;导致团队内部技术栈以及实现方案出现分化&#xff0c;间接造成了知识库之间的隔离、项目之间模块复用率下降&#xff0c;人员在不同项目中的学习成本…

Codeforces 671C Ultimate Weirdness of an Array 线段树 (看题解)

Ultimate Weirdness of an Array 写不出来&#xff0c; 日常好菜啊。。 考虑枚举GCD&#xff0c; 算出一共有多少个对 f(l, r) < GCD&#xff0c; 我们用fuc[ i ] 表示的是在 l i 这个位置开始, 最小的合法的 R&#xff0c; 可以发现这个函数随 i 单调不下降&#xff0c; …

安卓开发环境_我的安卓开发环境

大家好&#xff0c;今天想跟大家分享一下我的安卓开发环境&#xff0c;分别是硬件环境和软件环境。那么在开始之前先交代下我的背景&#xff0c;我从事安卓开发1年8个月&#xff0c;安卓教学9个月&#xff0c;大项目1个&#xff0c;小项目100。先说说硬件环境吧&#xff0c;直接…

构造函数,拷贝构造函数,赋值函数

C中一般创建对象&#xff0c;拷贝或赋值的方式有构造函数&#xff0c;拷贝构造函数&#xff0c;赋值函数这三种方法。下面就详细比较下三者之间的区别以及它们的具体实现 1.构造函数 构造函数是一种特殊的类成员函数&#xff0c;是当创建一个类的对象时&#xff0c;它被调用来…

性能优化系列

常见性能优化策略的总结&#xff08;转&#xff09; https://www.cnblogs.com/ajianbeyourself/p/6132546.html 网站前端和后台性能优化的34条宝贵经验和方法 https://www.imooc.com/article/41237 一次性能优化实战经历 https://www.cnblogs.com/SameZhao/p/6238997.html …

qotd服务_QOTD:Java线程与Java堆空间

qotd服务以下问题很常见&#xff0c;并且与OutOfMemoryError有关&#xff1a;在JVM线程创建过程和JVM线程容量期间无法创建新的本机线程问题。 这也是我向新技术候选人&#xff08;高级职位&#xff09;提出的典型面试问题。 我建议您在查看答案之前尝试提供自己的答复。 题&a…

ecs服务器数据迁移_某国际物流集团的云迁移解决方案

新钛云服已为您服务905天云计算流行当下&#xff0c;使用新技术较早的互联网企业&#xff0c;已基本实现云计算实施。传统企业也开始接触云计算并进行使用&#xff0c;但由于与互联网轻实体资产的模式不同&#xff0c;其往往面临着业务迁移上云的难题。云迁移过程非常复杂&…

红黑树的红黑标志有什么用

红黑树使用红黑二色进行“着色”&#xff0c;目的是利用颜色值作为二叉树的平衡对称性的检查&#xff0c;只要插入的节点“着色”满足红黑二色的规定&#xff0c;最短路径与最长路径不会相差的太远&#xff0c;红黑树的节点分布就能大体上达至均衡。 演示动画网站&#xff1a;h…

内存泄露与内存溢出

内存泄露是指你的应用使用资源之后没有及时释放&#xff0c;导致应用内存中持有了不需要的资源&#xff0c;这是一种状态描述。 内存溢出是指你的应用的内存已经不能满足正常使用了&#xff0c;堆栈已经达到系统设置的最大值&#xff0c;进而导致崩溃&#xff0c;这事一种结果描…

琥珀项目:较小的,面向生产力的Java语言功能

Brian Goetz最近的消息欢迎来到琥珀&#xff01; 介绍Project Amber &#xff08; OpenJDK的一部分&#xff0c; 最初于1月提出 &#xff09;。 Goetz通过介绍“欢迎使用Amber项目&#xff0c;这是我们面向特定生产力的Java语言JEP的孵化场”的介绍打开了这一消息。 Goetz重申&…