Apache Flink和Kafka入门

介绍

Apache Flink是用于分布式流和批处理数据处理的开源平台。 Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序。

Flink应用程序通常使用Apache Kafka进行数据输入和输出。 本文将指导您逐步使用Apache Flink和Kafka。

弗林克·卡夫卡

先决条件

  • Apache Kafka 0.9.x
  • 吉特
  • Maven 3.x或更高版本

创建您的Flink流项目

第一步是创建Java应用程序,最简单的方法是使用flink-quickstart-java原型,该原型包含核心依赖关系和打包任务。 本文与Apache Flink快速入门示例相似,重点明确介绍了MapR Streams的数据输入和输出。

在此应用程序中,我们将创建两个作业:

  • WriteToKafka :生成随机字符串,然后使用Kafka Flink连接器及其Producer API将其发布到MapR Streams主题。
  • ReadFromKafka :读取相同的主题,并使用Kafka Flink连接器及其使用方在标准输出中显示消息。 API。

完整项目可在GitHub上找到:

  • Flink和Kakfa应用

让我们使用Apache Maven创建项目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=false

Maven将创建以下结构:

tree kafka-flink-101/
kafka-flink-101/
├── pom.xml
└── src└── main├── java│   └── com│       └── grallandco│           └── demos│               ├── BatchJob.java│               ├── SocketTextStreamWordCount.java│               ├── StreamingJob.java│               └── WordCount.java└── resources└── log4j.properties7 directories, 6 files

该项目被配置为创建一个Jar文件,该文件包含您的flink项目代码,还包括运行该文件所需的所有依赖项。

该项目包含其他一些示例工作,本文不需要它们,您可以将其用于教育目的,也可以将其从项目中删除。

添加Kafka连接器

打开pom.xml并将以下依赖项添加到您的项目中:

第一步,我们必须添加Flink Kafka连接器作为依赖项,以便我们可以使用Kafka接收器。 将此添加到“依赖项”部分的pom.xml文件中:

您现在必须添加Flink Kafka Connector依赖项才能使用Kafka接收器。 在<dependencies>元素中添加以下条目:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>

现在,Flink项目已准备就绪,可以通过Kafka连接器使用DataStream,因此您可以从Apache Kafka发送和接收消息。

安装并启动Kafka

下载Kafka,在终端中输入以下命令:

curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0

Kafka使用ZooKeeper,如果您没有运行Zookeeper,则可以使用以下命令启动它:

./bin/zookeeper-server-start.sh config/zookeeper.properties

通过在新终端中运行以下命令来启动Kafka代理:

./bin/kafka-server-start.sh config/server.properties

在另一个终端中,运行以下命令来创建一个名为flink-demo的Kafka主题:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo

使用Kafka工具将消息发布和使用到flink-demo主题。

制片人

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo

消费者

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning

在生产者窗口中,您可以发布一些消息,并在消费者窗口中查看它们。 我们将使用这些工具来跟踪Kafka和Flink之间的交互。

编写您的Flink应用程序

现在让我们使用Flink Kafka Connector将消息发送到Kafka并使用它们。

制片人

生产者使用SimpleStringGenerator()类生成消息,并将该字符串发送到flink-demo主题。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}

SimpleStringGenerator()方法代码在此处可用。

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 在应用程序环境中创建一个新的DataStream时, SimpleStringGenerator类将Flink中所有流数据源的Source接口实现SourceFunction 。
  • FlinkKafkaProducer09器添加到主题。

消费者

使用者只需从flink-demo主题中读取消息,然后将它们打印到控制台中即可。

public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 使用消费者信息创建一组属性,在此应用程序中,我们只能设置消费者group.id
  • 使用FlinkKafkaConsumer09从主题flink-demo获取消息

生成并运行应用程序

让我们直接从Maven(或从您最喜欢的IDE)运行应用程序。

1-建立专案:

$ mvn clean package

2-运行Flink生产者作业

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka

3-运行Flink消费者工作

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka

在终端中,您应该看到生产者生成的消息

现在,您可以在Flink群集上部署并执行此作业。

结论

在本文中,您学习了如何将Flink与kafka结合使用来写入和读取数据流。

翻译自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/352355.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

matlab集群搭建问题

本文是在matlab 集群搭建中遇到一些问题的总结&#xff1a; 1、破解版的是否可以用&#xff0c;我已经搭建到集群还没使用目前看来破解版的是可以用的&#xff0c;不存在要用到“Licence Manager”。 2、同一个集群中的PC最好是安装同一版本的matlab 和mcde 以防出现不兼容的…

linux下不同服务器间数据传输(rcp,scp,rsync,ftp,sftp,lftp,wget,curl)

因为工作原因&#xff0c;需要经常在不同的服务器见进行文件传输&#xff0c;特别是大文件的传输&#xff0c;因此对linux下不同服务器间数据传输命令和工具进行了研究和总结。主要是rcp,scp,rsync,ftp,sftp,lftp,wget,curl。 rcp rcp不是一种安全的的传输文件的方式&#xff0…

bzoj3143: [Hnoi2013]游走

求经过边的期望次数&#xff0c;然后边的编号相当于给期望一个系数&#xff0c;期望大到小给编号就好 假如可以强行改边为点高斯消元的话是很方便的&#xff0c;然而并不资瓷 但是我们可以先把经过点的期望次数求出来&#xff1a;f(u)sigema((u,v)属于E且v!n)v f(v)/du(v)&…

pythonsqlite3模糊_Python编写通讯录通过数据库存储实现模糊查询功能

1.要求 数据库存储通讯录&#xff0c;要求按姓名/电话号码查询&#xff0c;查询条件只有一个输入入口&#xff0c;自动识别输入的是姓名还是号码&#xff0c;允许模糊查询。 2.实现功能 可通过输入指令进行操作。 &#xff08;1&#xff09;首先输入“add”&#xff0c;可以对通…

分布式Matlab计算集群建立方法与Demo

文章来源&#xff1a;http://hi.baidu.com/modelren/item/6a9d09ff178db405d99e7220 我的实验室有五台双核Pentium D 925计算机&#xff0c;这正适合用来做分布式或并行式计算。我打算只调用那些计算机中的一个核参与计算&#xff0c;留下一个核可以让其他人正常地使用该计算机…

用python实现远程复制 (scp + expect )

scp 功能很强大&#xff0c;但需要人工输入 password, 当然可以通过把 公钥保存在远程主机的 ~/.ssh 目录中&#xff0c;而后就不用输入password&#xff0c;但这需要配置. 用 sshpass 可能在命令输入 password, 但 需要用 “sudo apt-get install sshpass” 安装 如果不想用…

esp8266oled做时钟python_利用esp8266和鸿蒙带的OLED屏做了一个时钟

连接图&#xff1a; 先看原理图&#xff1a;然后接线接线原理如下&#xff1a; * 7pin SPI引脚&#xff0c;正面看&#xff0c;从左到右依次为GND、VCC、D0、D1、RES、DC、CS * ESP8266 --- OLED * 3V --- VCC * G --- GNDS * D7 --- D1 * D5 --- D0 * D2orD8--- CS * D1 --- D…

junit白盒测试 案例_JUnit通过失败测试案例

junit白盒测试 案例为什么要建立一种预期测试失败的机制&#xff1f; 有一段时间&#xff0c;人们会希望并期望JUnit Test案例失败。 尽管这种情况很少见&#xff0c;但确实发生了。 我需要检测JUnit测试何时失败&#xff0c;然后&#xff08;如果期望的话&#xff09;通过而不…

局域网共享问题全方位解决

声明&#xff1a;这不是共享组建教程&#xff0c;而是问题解决。如果你对共享一直搞不清&#xff0c;那么你可以花一至几小时的时间来看这篇文章&#xff0c;我相信以后共享问题你基本上都能解决。 看过了很多人写共享教程&#xff0c;看过了更多人写共享问题解决。可是&#x…

Oracle SQL中实现indexOf和lastIndexOf功能

Oracle SQL中实现indexOf和lastIndexOf功能 https://www.2cto.com/database/201305/210470.html转载于:https://www.cnblogs.com/diyunpeng/p/9884387.html

Linux静态库和动态库学习总结

一、废话 之前由于工作需要&#xff0c;要封装一个Linux加密解密转换的动态库&#xff0c;这个之前只做过Windows下面的&#xff0c;Linux下面还真没有做过&#xff0c;之后做了整一个晚上才算做好&#xff0c;不过其中也学到了不少东西&#xff0c;包括Linux下的动态库和静态库…

Java Date Nuances的痛苦提醒

这些天&#xff0c;我不再需要使用java.util.Date了&#xff0c;但是最近选择这样做&#xff0c;这让我想起了使用与Java Date关联的API的痛苦 。 在这篇文章中&#xff0c;我看了弃用的参数化Date构造函数的一些令人惊讶的API期望&#xff0c;该构造函数接受六个整数 。 在20…

python 与别的程序通信_《Python》进程之间的通信(IPC)、进程之间的数据共享、进程池...

一、进程间通信---队列和管道&#xff08;multiprocess.Queue、multiprocess.Pipe&#xff09; 进程间通信&#xff1a;IPC&#xff08;inter-Process Communication&#xff09; 1、队列 概念介绍&#xff1a; 创建共享的进程队列&#xff0c;Queue是多进程的安全的队列&#…

Matlab里evalin和assignin的用法

原文地址&#xff1a;Matlab里evalin和assignin的用法作者&#xff1a;了凡春秋assignin与evalin MATLAB通常的基本工作空间是base空间。MATLAB在程序运行过程中&#xff0c;将为每个函数分配它自己的函数工作空间&#xff08;从基本空间中分割出的一块&#xff0c; 以函数…

判断一个程序员水平高低的标准?

a.结果导向 胜者为王&#xff0c;败者为寇。也可以理解为做过什么NB的项目&#xff0c;带来了多少价值。b.竞赛比武 在同一平台下&#xff0c;大家八仙过海各显神通。c.广度和深度 在平时工作或交谈中&#xff0c;逼格最高者被大家膜拜。d.熟练度 …

git编辑器选哪个_[Git]Git创建和修改代码库

有了上一篇的简易Git使用指南&#xff0c;接下来我们就可以创建自己的代码库了创建代码库1.把文件夹变成git文件夹git init 这样子就这里面的内容就可以git了2.把要git的文件放入暂存区 git有三个空间: Working Directory工作区&#xff0c;Staging Area暂存区&#xff0c;Repo…

Q 语言初学者系列:(1)开门篇

声明&#xff1a;本系列文章全部参考自官方教程&#xff0c;由于缺乏中文资料而且本人E文实在太菜&#xff0c;对于E文较好的朋友可以直接通过下面的链接访问官方网站提供的教程&#xff0c; 欢迎大家一起学习讨论。 hhttps://code.kx.com/trac/wiki/QforMortals2/contents 用户…

Java 将数据写入磁盘并读取磁盘上的文件

package test; import java.io.BufferedReader;import java.io.FileReader;import java.io.FileWriter;import java.util.ArrayList;import java.util.List; public class test { public static void main(String[] args) { //创建集合 List<String> listnew ArrayList&l…

simulink怎么生成vxworks的执行程序_Matlab/Simulink基础了解(五):基础Simulink模型搭建及Embedded基础使用...

为了方便大家阅读&#xff0c;从这章开始在章节名中添加该章节要讲的主要的东西。本章给大家讲一个最简单的Simulink模型&#xff1a;在处理应用逻辑时常常会判断车速信号来得到车是否静止&#xff0c;如果车已经跑起来了&#xff0c;很多功能都会被禁用&#xff0c;比如诊断&a…

Q 语言初学者系列:(2)基本数据类型

明&#xff1a;本系列文章全部参考自官方教程&#xff0c;由于缺乏中文资料而且本人E文实在太菜&#xff0c;对于E文较好的朋友可以直接通过下面的链接访问官方网站提供的教程&#xff0c; 欢迎大家一起学习讨论。 hhttps://code.kx.com/trac/wiki/QforMortals2/contents 用户名…