pre1-flink理论-批处理与流处理+简单示例

【README】

1.本文包含了 批处理与流处理的代码示例;

  • 批处理:把数据 攒在一起(或攒一段时间或攒一定内存大小),然后再处理,这叫批处理;
  • 流处理:数据每来一个就处理一个;

2.特点:

数据处理方式特点
批处理1.高延时;
流处理1.低延时;

3.引入flink的maven依赖:

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.4</version></dependency></dependencies>

【1】flink批处理离线数据(数据有限)

【1.1】代码

1)数据源,我们保存在本地文本文件中,命名为  hello.txt

hello world
hello flink
how are you
thank you
hello zhangsan
hello lisi

2)批处理代码:

/*** @Description 批处理,word count程序(离线数据)* @author xiao tang* @version 1.0.0* @createTime 2022年04月09日*/
public class WordCount {public static void main(String[] args) throws Exception {// 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 从文件中读取数据String inputPath = "D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\hello.txt";DataSource<String> dataSource = env.readTextFile(inputPath);// 对数据集处理,按照空格分词展开,转为 (word,1) 二元组统计DataSet<Tuple2<String, Integer>> resultSet = dataSource.flatMap(new MyFlatMapper()).groupBy(0) // 按照第1个位置的word分组.sum(1); // 将第2个位置上的数据求和resultSet.print();}public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {// 按照空格分词String[] words = value.split(" ");// 遍历所有word,包装成word 输出Arrays.stream(words).forEach(x->{collector.collect(new Tuple2<>(x, 1));});}}
}

批处理打印结果:

(you,2)
(flink,1)
(world,1)
(hello,4)
(lisi,1)
(zhangsan,1)
(are,1)
(thank,1)
(how,1)

批处理的结果是最终结果


【2】flink流处理离线数据(数据有限)

/*** @Description 流数据(无限数据)* @author xiao tang* @version 1.0.0* @createTime 2022年04月09日*/
public class StreamWordCount {public static void main(String[] args) throws Exception {// 流处理执行环境StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(2); // 设置并行度// 从文件中读取数据String inputPath = "D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\hello.txt";DataStream<String> dataStream = streamEnv.readTextFile(inputPath);// 定义流操作DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new WordCount.MyFlatMapper()).keyBy(0).sum(1);// 打印结果resultStream.print();// 执行任务(流终止操作)streamEnv.execute();}
}

打印结果:

2> (world,1)
1> (thank,1)
2> (flink,1)
1> (hello,1)
2> (how,1)
2> (you,1)
1> (hello,2)
2> (you,2)
1> (hello,3)
2> (zhangsan,1)
1> (hello,4)
2> (lisi,1)
1> (are,1)

流处理的结果是一个动态变化的有状态的结果;

有状态的意思说白了就是:后面的处理结果依赖前面的处理结果,如对hello计数为3,它是在前面hello计数为2的基础上做的处理;


【3】flink流处理在线数据(数据无限)

我们引入了 netcat(nc),底层使用socket模拟向某端口写入数据;

然后 flink监控该端口的数据,并做处理;

【3.1】 flink处理类

处理类监听了 nc所在机器的的端口,即 192.168.163.201:7777;

/*** @Description socket文本流词计数* @author xiao tang* @version 1.0.0* @createTime 2022年04月09日*/
public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {// 流处理执行环境StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(2); // 设置并行度// 从 flinkjava parametertool 获取参数(或有)
//        ParameterTool parameterTool = ParameterTool.fromArgs(args);
//        String host = parameterTool.get("host");
//        int port = parameterTool.getInt("port");// 从socket文本流读取数据DataStream<String> inputDataStream = streamEnv.socketTextStream("192.168.163.201", 7777);// 定义流操作DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()).keyBy(0).sum(1);// 打印结果resultStream.print();// 执行任务(流终止操作)streamEnv.execute();}
}

演示效果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329582.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python表单提交的两种方式_Flask框架学习笔记之表单基础介绍与表单提交方式

本文实例讲述了Flask框架学习笔记之表单基础介绍与表单提交方式。分享给大家供大家参考&#xff0c;具体如下&#xff1a;表单介绍表单是HTML页面中负责数据采集功能的部件。由表单标签&#xff0c;表单域和表单按钮组成。通过表单&#xff0c;将用户输入的数据提交给服务器&am…

高级 | Java中获取类名的3种方法

转载自 高级 | Java中获取类名的3种方法获取类名的方法 Java 中获取类名的方式主要有以下三种。 getName() 返回的是虚拟机里面的class的类名表现形式。 getCanonicalName() 返回的是更容易理解的类名表示。 getSimpleName() 返回的是类的简称。 都有什么区别&#xff1f; 通过…

Asp.net 面向接口可扩展框架之核心容器

新框架的容器部分终于调通了&#xff01;容器实在太重要了,所以有用了一个名词叫“核心容器”。 容器为什么那么重要呢&#xff1f;这个有必要好好说道说道。 1、首先我们从框架名称面向接口编程说起,什么是面向接口编程&#xff1f;(这个度娘回答一下) 解读一下:类是个体的定义…

pre2-flink单机部署与job提交

【README】 本文记录了flink单机部署&#xff0c;以及flink job2种提交方式&#xff1b; 【1】flink 单机部署 step1&#xff09;下载flink 包&#xff1b; Apache Flink: Stateful Computations over Data Streamshttps://flink.apache.org/ step2&#xff09;解压 tar -z…

到底什么是跨域?附解决方案

转载自 到底什么是跨域&#xff1f;附解决方案什么是跨域 要了解跨域&#xff0c;先要说说同源策略。 同源策略是由 Netscape 公司提出的一个著名的安全策略&#xff0c;所有支持 JavaScript 的浏览器都会使用这个策略。 所谓同源是指&#xff0c;域名&#xff0c;协议&#xf…

vue 字典_【开源】基于Vue的前端组件库HeyUI

说道vue组件库&#xff0c;目前主流的基本就是iview和element。今天又发现一个很不错的。HeyUI。组件也很丰富&#xff0c;入门比较简单。反正开源框架我们有不嫌多&#xff0c;多多益善啊。感兴趣的可以看看。关于HeyUIHeyUI 是一套基于 Vue2.0 的开源 UI 组件库&#xff0c;主…

(译)java8-流定义

【README】 本文翻译自 Stream In Java - GeeksforGeeks &#xff0c; 主要介绍了java8流&#xff1b; 【1】流 1&#xff09;流定义&#xff1a;流是支持各种方法的对象序列&#xff08;一系列对象&#xff09;&#xff0c;这些方法可以流水线化调用以产生期望结果&#xff…

基于CefSharp构建基于Chromium的应用程序

chromium是google chrome浏览器所采用的内核&#xff0c;最开始由苹果的webkit发展而出&#xff0c;由于webkit在发展上存在分歧&#xff0c;而google希望在开发上有更大的自由 度&#xff0c;2013年google决定自己开发webcore的分支&#xff0c;叫做Blink引擎&#xff0c;而后…

最新后端架构师技术图谱

转载自 最新后端架构师技术图谱深呼吸&#xff0c;慢慢学&#xff0c;技术长路漫漫… 数据结构二叉树完全二叉树平衡二叉树二叉查找树&#xff08;BST&#xff09;红黑树B-&#xff0c;B&#xff0c;B*树LSM 树队列集合链表、数组字典、关联数组栈树BitSet常用算法KPM 算法选择…

ansible脚本-Playbook(一)

Playbook组成部分&#xff1a; task 任务&#xff1a;包含目标主机上执行的操作&#xff0c;使用模块定义这些操作&#xff0c;每个任务都是一个模块的调用Variables变量&#xff1a;存储和传递数据&#xff0c;变量可以自定义&#xff0c;可以在playbook当中定义为全局变量&a…

三级pc技术_第十九周PC、笔电、数码周边新品汇总:AMD英特尔激战正酣

【dogkeji-科技犬】各位网友周末好&#xff0c;又到了2020年第十九周的PC、笔电、数码周边新品发布汇总时刻&#xff08;2020年5月4日至2020年5月9日&#xff09;&#xff0c;那么本周有那些PC、笔电、数码周边新品发布呢&#xff1f;通过科技犬的汇总我们来一起回顾一下吧。AM…

【DDD/CQRS/微服务架构案例】在Ubuntu 14.04.4 LTS中运行WeText项目的服务端

在《WeText项目&#xff1a;一个基于.NET实现的DDD、CQRS与微服务架构的演示案例》文章中&#xff0c;我介绍了自己用Visual Studio 2015&#xff08;C# 6.0 with .NET Framework 4.6.1&#xff09;开发的DDD/CQRS/微服务架构的案例项目&#xff1a;WeText。文章发出后反响很好…

es6 dsl与sql对比

【README】 1.本文总结了 dsl 与 sql的对比写法&#xff1b; 2.es采用 7.2.1 版本&#xff1b; 【1】创建es索引 1&#xff09;新建一个数据库事务执行日志索引 put localhost:9200/txlog { "mappings" :{ "properties":{"APPNAME"…

echarts line 去掉最外围方框_干货 | 关于射频芯片最详细解读

传统来说&#xff0c;一部可支持打电话、发短信、网络服务、APP应用的手机&#xff0c;一般包含五个部分部分&#xff1a;射频部分、基带部分、电源管理、外设、软件。射频部分&#xff1a;一般是信息发送和接收的部分&#xff1b;基带部分&#xff1a;一般是信息处理的部分&am…

服务器性能指标(一)——负载(Load)分析及问题排查

转载自 服务器性能指标&#xff08;一&#xff09;——负载&#xff08;Load&#xff09;分析及问题排查平常的工作中&#xff0c;在衡量服务器的性能时&#xff0c;经常会涉及到几个指标&#xff0c;load、cpu、mem、qps、rt等。每个指标都有其独特的意义&#xff0c;很多时候…

HoloLens开发手记 - HoloLens shell概述 HoloLens shell overview

使用HoloLens时&#xff0c;shell是由你周围的世界和来自系统的全息图像构成。我们将这种空间成为混合世界&#xff08;mixed world&#xff09;。 shell包含了一个可以让你将全息图像和应用放置在世界中的开始菜单&#xff08;Start Menu&#xff09;。当一个应用已经被放置在…

【1】flink-source读取数据

【README】 本文记录了flink读取不同数据源的编码方式&#xff0c;数据源包括&#xff1b; 集合&#xff08;元素列表&#xff09;&#xff1b;文件kafka&#xff1b;自定义数据源&#xff1b; 本文使用的flink为 1.14.4 版本&#xff1b;maven依赖如下&#xff1a; <dep…

Oracle入门(二)之服务启动bat

转载自 批处理&#xff08;bat文件&#xff09;自动启动/关闭oracle服务 批处理&#xff08;bat文件&#xff09; 自动启动/关闭oracle服务 判断oracle 服务状态如果服务处于启动状态&#xff0c;就关闭服务&#xff1b;如果服务处于关闭状态&#xff0c;就启动服务。 ECHO OFF…

【2】flink数据流转换算子

【README】 本文记录了flink对数据的转换操作&#xff0c;包括 基本转换&#xff0c;map&#xff0c;flatMap&#xff0c;filter&#xff1b;滚动聚合&#xff08;min minBy max maxBy sum&#xff09;&#xff1b;规约聚合-reduce&#xff1b;分流&#xff1b;connect连接流…

第三篇 Entity Framework Plus 之 Query Cache

离上一篇博客&#xff0c;快一周&#xff0c;工作太忙&#xff0c;只能利用休息日来写一些跟大家分享&#xff0c;Entity Framework Plus 组件系列文章&#xff0c;之前已经写过两篇 第一篇 Entity Framework Plus 之 Audit 第二篇 Entity Framework Plus 之 Query Future 计划…