flink和kafka区别_Apache Flink和Kafka入门

flink和kafka区别

介绍

Apache Flink是用于分布式流和批处理数据处理的开源平台。 Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序。

Flink应用程序通常使用Apache Kafka进行数据输入和输出。 本文将指导您逐步使用Apache Flink和Kafka。

弗林克·卡夫卡

先决条件

  • Apache Kafka 0.9.x
  • 吉特
  • Maven 3.x或更高版本

创建您的Flink流项目

第一步是创建Java应用程序,最简单的方法是使用flink-quickstart-java原型,其中包含核心依赖关系和打包任务。 本文与Apache Flink快速入门示例相似,重点明确介绍了MapR Streams的数据输入和输出。

在此应用程序中,我们将创建两个作业:

  • WriteToKafka :生成随机字符串,然后使用Kafka Flink连接器及其Producer API将其发布到MapR Streams主题。
  • ReadFromKafka :读取相同的主题,并使用Kafka Flink连接器及其使用者在标准输出中打印消息。 API。

完整项目可在GitHub上找到:

  • Flink和Kakfa应用

让我们使用Apache Maven创建项目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=false

Maven将创建以下结构:

tree kafka-flink-101/
kafka-flink-101/
├── pom.xml
└── src└── main├── java│   └── com│       └── grallandco│           └── demos│               ├── BatchJob.java│               ├── SocketTextStreamWordCount.java│               ├── StreamingJob.java│               └── WordCount.java└── resources└── log4j.properties7 directories, 6 files

此项目配置为创建一个Jar文件,该文件包含您的flink项目代码,还包括运行它所需的所有依赖关系。

该项目包含其他一些示例工作,本文不需要它们,您可以将其用于教育目的,也可以将其从项目中删除。

添加Kafka连接器

打开pom.xml并将以下依赖项添加到您的项目中:

第一步,我们必须添加Flink Kafka连接器作为依赖项,以便我们可以使用Kafka接收器。 将此添加到“依赖项”部分的pom.xml文件中:

您现在必须添加Flink Kafka Connector依赖项才能使用Kafka接收器。 在<dependencies>元素中添加以下条目:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>

Flink项目现在准备通过Kafka连接器使用DataStream,因此您可以从Apache Kafka发送和接收消息。

安装并启动Kafka

下载Kafka,在终端中输入以下命令:

curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0

Kafka使用ZooKeeper,如果您没有运行Zookeeper,则可以使用以下命令启动它:

./bin/zookeeper-server-start.sh config/zookeeper.properties

通过在新终端中运行以下命令来启动Kafka代理:

./bin/kafka-server-start.sh config/server.properties

在另一个终端中,运行以下命令来创建一个名为flink-demo的Kafka主题:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo

使用Kafka工具将消息发布和使用到flink-demo主题。

制片人

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo

消费者

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning

在生产者窗口中,您可以发布一些消息,并在消费者窗口中查看它们。 我们将使用这些工具来跟踪Kafka和Flink之间的交互。

编写您的Flink应用程序

现在让我们使用Flink Kafka Connector将消息发送到Kafka并使用它们。

制片人

生产者使用SimpleStringGenerator()类生成消息,然后将字符串发送到flink-demo主题。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}

SimpleStringGenerator()方法代码在此处提供 。

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 在应用程序环境中创建一个新的DataStream时, SimpleStringGenerator类将Flink中所有流数据源的Source接口实现SourceFunction 。
  • FlinkKafkaProducer09器添加到主题。

消费者

使用者只需从flink-demo主题中读取消息,然后将它们打印到控制台中即可。

public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 使用消费者信息创建一组属性,在此应用程序中,我们只能设置消费者group.id
  • 使用FlinkKafkaConsumer09从主题flink-demo获取消息

生成并运行应用程序

让我们直接从Maven(或从您最喜欢的IDE)运行应用程序。

1-建立专案:

$ mvn clean package

2-运行Flink生产者作业

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka

3-运行Flink消费者工作

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka

在终端中,您应该看到生产者生成的消息

现在,您可以在Flink群集上部署并执行此作业。

结论

在本文中,您学习了如何将Flink与kafka结合使用来写入和读取数据流。

翻译自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html

flink和kafka区别

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

solr cloud 更新 solrconfig 配置_Solr各版本新特性「4.x,5.x,6.x,7.x」

一.Solr4.x新特性1.近实时搜索Solr的近实时搜索【Near Real-Time&#xff0c;NRT】功能实现了文档添加到搜索的快速进行&#xff0c;以应对搜索快速变化的数据。2.原子更新与乐观并发原子更新功能允许客户端应用对已有文档上进行添加、更新、删除和对字段增值等操作&#xff0c…

电商系统的商品流水记录

商品销售流水 销售单变成“归档”之后&#xff0c;系统生成相关商品的销售流水记录&#xff0c;这里要考虑退货扣减的问题也可以不生成相应的流水记录&#xff0c;直接从销售子单取相关数据显示即可 商品出库流水 出库单变成“已出库”后&#xff0c;系统生成相关商品的出库…

junit数据驱动测试_使用Junit和Easytest进行数据驱动的测试

junit数据驱动测试在本文中&#xff0c;我们将看到如何使用Junit进行数据驱动的测试。 为此&#xff0c;我将使用一个名为EasyTest的库。 我们知道&#xff0c;对于TestNG&#xff0c;它已内置了数据提供程序。 通过简单的测试&#xff0c;我们可以使用Junit进行数据驱动的测试…

centos rpm安装mysql5.5_CentOS下以RPM方式安装MySQL5.5

首先去mySQL官网下载页面&#xff1a;http://dev.mysql.com/downloads/mysql/#downloadsSelect Platform 选择 Oracle & Red Hat Linux 4 & 5分别下载以下三个文件(由于我的机器是32位&#xff0c;下面是32位版本的包&#xff0c;如果你的机器是64位的请下载64位版本)…

HH SaaS电商系统的出库功能模块设计

文章目录出库单业务流程基本流程扩展流程找不到符合条件的仓库&#xff0c;要求部分退款&#xff08;未生成出库单时&#xff09;找不到符合条件的仓库&#xff0c;全部退款&#xff08;未生成出库单时&#xff09;找不到符合条件的仓库&#xff0c;等待库存补足&#xff08;未…

java 拼图_拼图项目的诅咒:为什么Java 9一遍又一遍地延迟?

java 拼图JDK 9发行日期推迟到2017年7月 距JDK 9发行不到200天&#xff0c;它又被推迟了 。 新的发布日期已更新为2017年7月&#xff0c;比之前推迟的日期晚了四个月。 推迟日期 9月13日&#xff0c;Oracle Java平台小组的首席架构师Mark Reinhold发表了他的建议&#xff0c;…

mysql数据库增删改查关键字_mysql数据库的增删改查

数据库基本操作&#xff1a;增删改查#DML语言/*数据操作语言&#xff1a;插入&#xff1a;insert修改&#xff1a;update删除&#xff1a;delete*/1.增插入语句的方式一表已经存在啦&#xff0c;我们需要往里面插入数据/*语法&#xff1a;insert into 表名(列名,…) values(值1…

HH SaaS电商系统的采购功能模块设计

文章目录如何生成采购单系统生成采购单的流程基本流程扩展流程找不到符合条件的供应商&#xff0c;要求部分退款&#xff08;初次生成采购单时&#xff09;找不到符合条件的供应商&#xff0c;要求全部退款&#xff08;初次生成采购单时&#xff09;指定供应商的库存不足&#…

本地缓存防止缓存击穿_防止缓存爆炸的快速提示

本地缓存防止缓存击穿在很多情况下&#xff0c;您都可以从应用程序中的常用对象缓存中受益&#xff0c;特别是在面向Web和微服务的环境中。 您可以在Java中执行的最简单的缓存类型可能是引入一个私有HashMap&#xff0c;在计算对象之前先查询该哈希表&#xff0c;以确保您不会重…

php mysql导出csv文件_详解PHP导入导出CSV文件

我们先准备mysql数据表&#xff0c;假设项目中有一张记录学生信息的表student&#xff0c;并有id&#xff0c;name&#xff0c;sex&#xff0c;age分别记录学生的姓名、性别、年龄等信息。CREATE TABLE student (id int(11) NOT NULL auto_increment,name varchar(50) NOT NULL…

HH SaaS电商系统的入库功能模块设计

文章目录创建入库单的场景创建入库单的业务流程商品直接入库内部仓退货入库&#xff08;内部仓&#xff09;换货入库&#xff08;内部仓&#xff09;退货入库&#xff08;外部仓&#xff09;换货入库&#xff08;外部仓&#xff09;备货入库&#xff08;内部仓&#xff09;备货…

aws ecr_在ECR上推送Spring Boot Docker映像

aws ecr在先前的博客中&#xff0c;我们将Spring Boot应用程序与EC2集成在一起。 它是您可以在Amazon Web Services上进行的最原始的部署形式之一。 在本教程中&#xff0c;我们将使用我们的应用程序创建一个docker映像&#xff0c;该映像将存储到Amazon EC2容器注册表中 。 …

HH SaaS电商系统的商品营销角标功能模块设计

1、角标只属于商城&#xff0c;自营店铺共享商城的角标&#xff0c;第三方店铺可以申请角标 2、角标跟着素材模板走&#xff0c;关联素材模板id&#xff0c;一对多关系

python为什么这么小_同样是 Python,怎么区别这么大

发现问题上周&#xff0c;我的测试同事告诉我&#xff0c;你的用户名怎么还允许中文啊&#xff1f;当时我心里就想&#xff0c;你们测试肯定又搞错接口了&#xff0c;我用的是正则\w过滤了参数&#xff0c;怎么可能出错&#xff0c;除非Python正则系统出错了&#xff0c;那是不…

dynamodb容器使用_使用DynamoDBMapper扫描DynamoDB项目

dynamodb容器使用之前&#xff0c;我们介绍了如何使用DynamoDBMapper或底层Java api查询DynamoDB数据库。 除了发出查询之外&#xff0c;DynamoDB还提供扫描功能。 扫描的目的是获取您在DynamoDB表上可能拥有的所有项目。 因此&#xff0c;扫描不需要任何基于我们的分区键或…

HH SaaS电商系统的商品营销标签功能模块设计

1、营销标签只属于商城&#xff0c;自营店铺共享商城的营销标签&#xff0c;第三方店铺可以申请营销标签 2、营销标签跟着商品走&#xff0c;关联spu_ext_id&#xff0c;多对多关系 3、一个商品最多添加5个营销标签

python加减法计算题 代码_关于《剑指offer》中不用加减乘除做加法的Python代码的问题...

题目如下&#xff1a;写一个函数&#xff0c;求两个整数之和&#xff0c;要求在函数体内不得使用、-、*、/四则运算符号。题目不难&#xff0c;可以采用位操作来实现&#xff0c;利用异或运算来计算不带进位的加法结果&#xff0c;利用与运算计算进位的标志&#xff0c;然后将这…

接口方法javadoc注释_继承Javadoc方法注释

接口方法javadoc注释尽管用于javadoc工具的JDK工具和实用程序页面通过实现和继承方法来描述Javadoc方法注释重用的规则&#xff0c;但是当实际上不需要使用{inheritDoc}时&#xff0c;很容易不必要地显式描述注释继承&#xff0c;因为会使用相同的注释隐式继承。 Java 8 javado…

HH SaaS电商系统的商品销售管理标签功能模块设计

1、商城和店铺有各自的销售管理标签 2、商品管理标签跟着商品走&#xff0c;关联spu_ext_id或者sku_ext_id&#xff0c;多对多关系

redis java 监听_从零手写实现redis(四)添加监听器

前言java从零手写实现redis&#xff08;一&#xff09;如何实现固定大小的缓存&#xff1f;java从零手写实现redis&#xff08;三&#xff09;redis expire 过期原理java从零手写实现redis&#xff08;三&#xff09;内存数据如何重启不丢失&#xff1f;本节&#xff0c;让我们…