【5】flink窗口与水位线watermark例子

【README】

0)本文编写了多个flink水位线watermark的代码例子,加深对watermark的理解 ;

1)时间分类

  • Event Time: 事件创建的时间(事件发生时间);
  • Ingestion Time:数据进入flink的实际;
  • Processing Time:执行算子的本地机器时间 ;

我们主要讨论的是 事件时间

2)flink窗口分为 滚动窗口,滑动窗口, 本文使用了 滚动窗口

  1. 滚动窗口: 只有1个参数,窗口长度与窗口步长(窗口创建频率)相等;
  2. 滑动窗口:有2个参数,即窗口长度,窗口步长;可以手动设置,可以相等也可以不等;

3)本文结合代码示例讲了 水位线, 窗口,窗口属性 lateness 延迟属性, 窗口流的 siteOutputLateData 侧输出流(旁路输出),及其它们的作用;


【1】水位线

1)定义(本文自定义总结,非官方):水位线 watermark,指的是 flink底层在数据流中添加的带有时间戳的数据,当这些水位线数据到达算子时(如窗口算子),算子会认为 小于水位线的业务数据都来了;(数据可以理解为 一条日志,或温度传感器采集的温度信息)

2)作用: 水位线可以用来处理无序数据流;(下文代码例子会给出);

3)如何产生水位线?

  • 指定水位线的时间戳如何获取? 可以指定 水位线时间戳从业务数据(抽象为javabean)的某个属性获取;
  • 指定水位线可以延迟多长时间,即允许无序数据最多可以晚来多长时间;(超过这个时间会被丢弃)

【1.1】事件迟到被丢弃

1)建立一个 10s 滚动窗口算子(每10s新开一个长度为10s的窗口),水位线取温度bean的时间戳,且延迟 0,如下:

其中 窗口用于收集id号码,即属于同一个窗口的元素的id会被收集到一起;

public class WindowTest3_EventTimeWatermarkWindow3 {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket读取数据,数据格式参见 sensorTimeWatermarkWindow.txt
//        DataStream<String> textStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensorTimeWatermarkWindow.txt");// nc -lk 7777 DataStream<String> textStream = env.socketTextStream("192.168.163.201", 7778);// 转换为 SensorReader pojo类型DataStream<SensorReadingTimeWatermarkWindow> sensorStream = textStream.map(x -> {String[] arr = x.split(",");return new SensorReadingTimeWatermarkWindow(arr[0], arr[1], arr[2], new BigDecimal(arr[3]));});// 设置抽取时间戳,水位线延迟2秒(如当前时间戳为 20:00:10 ,水位线的时间是 20:00:08),窗口是看水位线时间,而不是时间时间SingleOutputStreamOperator<SensorReadingTimeWatermarkWindow> streamWithWatermark = sensorStream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReadingTimeWatermarkWindow>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((event, timestamp) -> event.getTimestamp().getTime()));// 开窗聚合SingleOutputStreamOperator<String> aggForWindowStream =streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).aggregate(new AggregateFunction<SensorReadingTimeWatermarkWindow, String, String>() {@Overridepublic String createAccumulator() {return "";}@Overridepublic String add(SensorReadingTimeWatermarkWindow sensorReadingTimeWatermarkWindow, String s) {return s + ", " + sensorReadingTimeWatermarkWindow.getId();}@Overridepublic String getResult(String s) {return s;}@Overridepublic String merge(String s, String acc1) {return s + ", " + acc1;}});// 打印aggForWindowStream.print("aggForWindowStream");// 执行env.execute("aggForWindowStream");}
}

上述代码中的水位线的延迟时间为0s,即水位线时间戳等于事件时间戳; 

元素抽象为 传感器信息bean,如下:

public class SensorReadingTimeWatermarkWindow {private String id;private String type;private Timestamp timestamp;private BigDecimal temperature;public SensorReadingTimeWatermarkWindow() {}public SensorReadingTimeWatermarkWindow(String id, String type, String timeStr, BigDecimal temperature) {this.id = id;this.type = type;this.temperature = temperature;this.parseTimestamp(timeStr);}private void parseTimestamp(String timeStr)  {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {this.timestamp = new Timestamp(simpleDateFormat.parse(timeStr).getTime());} catch (ParseException e) {this.timestamp = new Timestamp(System.currentTimeMillis());}}
}

接收的是 nc 客户端的socket文本流,窗口算子计算结果如下:

详情如下:

1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9  -> 1, 7, 8, 11 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:16,36.9 16,sensor1,2022-04-17 22:07:23,36.9 -> 12,15 

【结果分析】

  • 发现1:当事件12(id=12)出现时,因水位线延迟时间为0,所以水位线时间戳等于事件12的时间戳=22:07:11,这个时间戳大于窗口结束时间(22:07:10),第1个窗口被关闭并输出计算结果为【1,7,8,11】;
  • 发现2: 当事件16(id=16)出现时,因水位线延迟时间为0,所以水位线时间戳等于事件16的时间戳=22:07:23,这个时间戳大于窗口结束时间(22:07:20),第2个窗口被关闭并输出计算结果为【12,15】;
  • 发现3:事件13没有更新水位线,因为水位线必须单调递增(事件12发生时的水位线是22:07:11,事件13的时间戳是22:07:09,所以事件13发生时不会更新水位线);

问题来了: 事件13去哪里了? 被 flink 丢弃了,因为事件13迟到了

  • 如何理解事件迟到了: 因为事件12 的时间戳为 22:07:11,又水位线延迟0s,所以水位线的 时间戳也是 22:07:11,这大于窗口结束时间,所以窗口关闭并计算结果,窗口关闭后,事件13才来,因此被丢弃

【补充】窗口范围是左闭右开;如上图,第1个窗口的范围是 [0,10),第2个窗口是 [10,20)


【1.2】 事件迟到但被正常处理

1)修改上述水位线代码, 设置延迟时间为5s,重新录入上述数据,结果如下:

// 设置抽取时间戳,水位线延迟2秒(如当前时间戳为 20:00:10 ,水位线的时间是 20:00:08),窗口是看水位线时间,而不是事件时间
SingleOutputStreamOperator<SensorReadingTimeWatermarkWindow> streamWithWatermark = sensorStream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReadingTimeWatermarkWindow>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 水位线延迟时间修改为 5s.withTimestampAssigner((event, timestamp) -> event.getTimestamp().getTime()));

 

1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9  13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:16,36.9 -> 1, 7, 8, 11, 13 16,sensor1,2022-04-17 22:07:23,36.9 21,sensor1,2022-04-17 22:07:20,36.922,sensor1,2022-04-17 22:07:25,36.9 -> 12, 15

【结果分析】

  • 发现1:事件13,事件21 不会更新水位线时间戳,原因上文已经解释过了;
  • 发现2:当事件15(id=15)出现时,因水位线延迟时间为5s,所以水位线等于事件15的时间戳减去5s = 22:07:11,这个时间戳大于窗口结束时间(22:07:10),第1个窗口被关闭并输出计算结果为【1,7,8,11,13】;
  • 发现3:事件13没有被丢弃,因为水位线延迟了5s,窗口在事件15发生时才关闭,所以可以探测到事件13,这也阐述了为啥 flink水位线可以处理无序数据的原理,flink的设计者的水位线idea真的很棒(对比来看,【1.1】中的例子事件13被丢弃);
  • 发现4:当事件22(id=22)出现时,因水位线延迟时间为5s,所以水位线等于事件22的时间戳减去5s = 22:07:20,这个时间戳大于等于窗口结束时间(22:07:20),第2个窗口被关闭并输出计算结果为【12,15】;(大于等于窗口结束时间,窗口就被关闭,因为窗口范围是左开右闭

【2】窗口的 lateness 延迟属性

此外,窗口还有 lateness 属性,表示延迟多长时间关闭窗口;

如下面代码每10s 创建一个长度为12s的窗口; (如果没有 lateness参数或其为0的话, 就是 每10s 创建一个长度为10s的窗口)

代码修改如下:

 SingleOutputStreamOperator<String> aggForWindowStream =streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许窗口延迟 2 秒后关闭窗口

窗口算子计算结果如下:

详情如下:

1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9  13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:15,36.9 -> 1, 7, 8, 11, 1316,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16 17,sensor1,2022-04-17 22:07:16,36.9 18,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16, 18 19,sensor1,2022-04-17 22:07:17,36.9 窗口关闭 20,sensor1,2022-04-17 22:07:09,36.9  被丢弃 21,sensor1,2022-04-17 22:07:20,36.922,sensor1,2022-04-17 22:07:25,36.9 -> 12, 15, 17, 19 

【结果分析】

  1. 事件15发生时:因水位线延迟5s,所以水位线时间戳=22:07:15-5s=22:07:10,等于第1个窗口的结束时间,故第1个窗口计算,结果为 【1, 7, 8, 11, 13】,但窗口没有关闭,因为lateness为2s,延迟2秒关闭,即当水位线大于等于 22:07:12 时,窗口关闭
  2. 事件16发生时:第1个窗口因为 lateness=2s 没有关闭,又事件16时间戳=22:07:09,所以还是参与窗口1的计算,输出结果【1, 7, 8, 11, 13, 16】;
  3. 事件17发生时:时间戳=22:07:16,水位线时间戳=22:07:11,这小于带lateness=2s的窗口1的关闭时间 22:07:12,所以窗口1还是不会关闭;
  4. 事件18发生时:时间戳=22:07:09, 因水位线单调递增,故不变,还是22:07:11;事件18参与窗口1的计算,结果为 【1, 7, 8, 11, 13, 16, 18】
  5. 事件19发生时:时间戳=22:07:17,水位线=22:07:12,等于带lateness=2s的窗口1的关闭时间,窗口1关闭;
  6. 事件20发生时:时间戳=22:07:09,落入了窗口1的范围(22:07:00~22:07:10),但因窗口1已经关闭,所以事件20被丢弃

通过以上示例,本文应该是把窗口的lateness属性 讲清楚了;

【问题】 事件20被丢弃的话, 不满足业务场景对数据一致性的要求;

  • 因为服务1发送了10条数据,到达服务2的时候却只有9条数据,这不满足业务需求,是开发团队不愿意看到的事情;那如何找回这些被丢弃的事件呢通过旁路输出

【3】如何收集迟到数据

 从旁路输出(side output)获取迟到数据;
通过 Flink 的 旁路输出 功能,可以获得迟到数据的数据流。
首先,需要在开窗后的 stream 上使用 sideOutputLateData(OutputTag) 表明需要把迟到数据存入 旁输出流。

代码修改如下:添加旁路输出流(侧输出流)

// 侧输出流,对于延迟的且没有进入窗口的数据,放到侧输出流(旁路输出流)OutputTag<SensorReadingTimeWatermarkWindow> lateOutputTag = new OutputTag<SensorReadingTimeWatermarkWindow>("late") {};// 开窗聚合SingleOutputStreamOperator<String> aggForWindowStream =streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许延迟 2 秒后关闭窗口.sideOutputLateData(lateOutputTag) // 无法进入窗口,则进入侧输出流.aggregate(new AggregateFunction<SensorReadingTimeWatermarkWindow, String, String>() {@Overridepublic String createAccumulator() {return "";}@Overridepublic String add(SensorReadingTimeWatermarkWindow sensorReadingTimeWatermarkWindow, String s) {return s + ", " + sensorReadingTimeWatermarkWindow.getId();}@Overridepublic String getResult(String s) {return s;}@Overridepublic String merge(String s, String acc1) {return s + ", " + acc1;}});// 打印窗口算子结果aggForWindowStream.print("aggForWindowStream");// 打印旁输出流aggForWindowStream.getSideOutput(lateOutputTag).print("lateOutputTag");// 执行env.execute("aggForWindowStream");

事件发生详情如下:

1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9  13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:15,36.9 -> 1, 7, 8, 11, 1316,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16 17,sensor1,2022-04-17 22:07:16,36.9 18,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16, 18 19,sensor1,2022-04-17 22:07:17,36.9 窗口关闭 20,sensor1,2022-04-17 22:07:09,36.9  -> lateOutputTag> SensorReadingTimeWindow{id='20', type='sensor1', timestamp=2022-04-17 22:07:09.0, temperature=36.9} 

结果分析:

  • 相比于【2】中代码示例, 事件20被丢弃了;而【3】中代码,当事件20出现时,由于窗口已经关闭,但存在侧输出流(旁路输出),所以事件20 存入侧输出流解决了乱序数据迟到事件过长导致数据不一致的问题);相反如果没有侧输出流,则事件20会被丢弃;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329548.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

技术人生,专家本色——采访张善友老师后的一点感受

距离Xamarin Evolve开幕还有不到三个小时。Miguel deIcaza和他的团队又将发布周年升级平台Xamarin 5。作为一个关注Mono/Xamarin多年的老粉丝&#xff0c;这个时候心情是蛮激动的。我想&#xff0c;刚刚作客.NET FM第五期“来者何人”专访系列的张善友老师&#xff0c;一定也是…

(翻)为什么要训练人工神经网络

【README】 本文翻译自 https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73 【1】介绍 今天我将从非常简短的神经网络介绍开始&#xff0c;这足以理解我将要讨论的…

使用VS Code开发 调试.NET Core 应用程序

使用VS Code开发 调试.NET Core RC2应用程序,由于.NET Core 目前还处于预览版。 本文使用微软提供的示例进行开发及调试。 https://github.com/aspnet/cli-samples .NET Core 介绍及说明&#xff1a; https://github.com/dotnet/cli http://dotnet.github.io/getting-started/ …

(转)简单神经网络(权值阈值训练)

本文转自&#xff1a; 神经网络入门 - 阮一峰的网络日志https://www.ruanyifeng.com/blog/2017/07/neural-network.html 眼下最热门的技术&#xff0c;绝对是人工智能。 人工智能的底层模型是"神经网络"&#xff08;neural network&#xff09;。许多复杂的应用&am…

Oracle入门(五A)之conn命令

一、connect命令将给定的用户名连接到Oracle数据库。当你运行一个连接命令、站点配置文件、Galgn.SQL和用户配置文件&#xff0c;按顺序处理Login .SQL。连接不重发如果初始连接不成功&#xff0c;请使用用户名或密码。语法&#xff1a; conn[ect] [{登录串|/|代理串} [AS…

2-操作系统启动

【README】 本文内容总结自 《操作系统-哈工大李治军老师》&#xff0c;内容非常棒&#xff0c;墙裂推荐&#xff1b; 【1】概述 1&#xff09;问题&#xff1a;操作系统启动时&#xff0c;它应该做什么事情&#xff1f; 计算机启动时&#xff0c;需要把存储在磁盘上的操作系…

Asp.net 面向接口可扩展框架之类型转化基础服务

新框架正在逐步完善,可喜可贺的是基础服务部分初具模样了,给大家分享一下 由于基础服务涉及面太广,也没开发完,这篇只介绍其中的类型转化部分,命名为类型转化基础服务,其实就是基础服务模块的类型转化子模块 说到类型转化必须要弄清楚.net的类型,类型都不清楚何来类型转化 1、P…

asset文件夹路径 unity_我们来捣鼓一下Unity的平台跳跃Microgame

作者&#xff1a;Truly大家好。Platformer Microgame是Unity官方的一个2D平台跳跃游戏项目模板&#xff0c;小伙伴们可以在这个模板的基础上修改或者制作自己的游戏。下边我们就来简单试玩下这个工程吧~一、资源导入写文章时&#xff0c;该资源暂时还不支持Unity 2019.x版本&am…

Oracle入门(五)之基本命令操作

一、show &#xff08;1&#xff09;查询Oracle系统变量值 语法&#xff1a;show 变量名 show all --查看所有68个系统变量值 show user --显示当前连接用户 注&#xff1a;Oracle的68个系统变量的key和默认value &#xff08;2&#xff09;查询oracle数据库当前的参数值 …

Linux工程师新法宝:在Visual Studio上用C++ 写Linux

如今我们正在开发一个新的插件&#xff0c;一个能够让开发者在 Visual Studio (以下简称 VS ) 上建构能够在 Linux 上运行 C 程序的套件。开发者可以借由这个插件将 C 程序移转到 Linux 服务器、PC 以及移动设备上&#xff0c;也同时可以借由这个插件将这些机器连结至你的 VS 上…

互联网+大赛作品_“颂中国力量 绘美好梦想”全市中小学生互联网+书画大赛作品展示(五)...

主办平顶山市教育体育局承办平顶山市教育体育局关心下一代工作委员会平顶山教育电视台教育部中国书画等级考试平顶山市招生管理办公室协办平顶山市书法家协会平顶山市美术家协会一比赛宗旨作品以中国战胜疫情为背景&#xff0c;以书法绘画为表现形式&#xff0c;真实记录全市中…

(转)Spring Boot启动过程 和 Bean初始化过程中的拓展接口详解

转自&#xff1a; Spring Boot启动过程 和 Bean初始化过程中的拓展接口详解_guyue35的博客-CSDN博客Spring Boot启动过程和 Bean初始化过程中的拓展接口详解1.背景 Spring的核心思想就是容器&#xff0c;当容器refresh的时候&#xff0c;外部看上去风平浪静&#xff0c;其实内…

Oracle入门(七B)之表空间删除数据文件未删除

转载自 oracle表空间删除数据文件未删除清理老旧数据的时候,不太熟悉操作,直接把表空间删了,未删除数据文件, HIS20170927此表空间已删除但数据文件还在,这时新创建一个表空间直接指定到这个数据文件上. create tablespace HIS20170927 datafile /oracle/oradata/****/HIS20170…

玩玩Xamarin Evolve 2016带来的新特性(一)-iOS Simulator(for Windows)

编者语&#xff1a; Xamarin Evolve 2016给不少.NET社群的朋友带来了强心剂&#xff0c;做.NET还是有前途啊&#xff0c;特别在微软开源的策略上。我会在五一假期和大家谈谈Xamarin中大家比较关注的iOS Windows模拟器&#xff0c;Xamarin Forms Previewer,还有Xamarin WorkBook…

python实现倒n字形排列_Leetcode问题库——Z形变换(Python),补充X形变换,字形

Z字形变换题目将一个给定字符串根据给定的行数&#xff0c;以从上往下、从左到右进行 Z 字形排列。比如输入字符串为 “LEETCODEISHIRING” 行数为 3 时&#xff0c;排列如下&#xff1a;之后&#xff0c;你的输出需要从左往右逐行读取&#xff0c;产生出一个新的字符串&#x…

蓝绿发布,灰度发布及滚动发布

【README】 本文转自&#xff1a;理解蓝绿发布、灰度发布和滚动发布_Jitwxs的博客-CSDN博客_蓝绿发布和灰度发布的区别目前绝大多数公司的业务系统都是集群化部署&#xff0c;那么在新版本上线时&#xff0c;保证平滑稳定&#xff0c;尽量减少对线上用户的影响&#xff0c;就显…

Oracle入门(七)之表空间

表空间表空间是数据库的逻辑划分&#xff0c;一个表空间只能属于一个数据库。所有的数据库对象都存放在指定的表空间中。但主要存放的是表&#xff0c; 所以称作表空间。 Oracle数据库中至少存在一个表空间&#xff0c;即SYSTEM的表空间。 &#xff08;1&#xff09;创建表空…

让我们Core在一起:ASP.NET Core amp; .NET Core

Microsoft .NET 自 2002 年发行 v1.0 以来&#xff0c;已经过了近 14 个年头&#xff0c;在这 14 年里面&#xff0c;.NET 日渐成熟并成为 Microsoft 的重要开发平台之一&#xff0c;只要是在 Windows 平台上的相关应用&#xff0c;几乎都可以使用 .NET 以及所属的 C# 及 VB 语…

python菱形画法解释_用Python画棱形

定义一个函数画棱形def diamond(height):"""Return a string resembling a diamond of specified height (measured in lines).height must be an even integer."""L[]lines/bs\\for i in range(1,int(height/21)):aline.rjust(i,s).ljust(i*2,b…

3.操作系统接口与系统调用

【README】 本文内容总结自 《操作系统-哈工大李治军老师》&#xff0c;内容非常棒&#xff0c;墙裂推荐&#xff1b; 【1】操作系统接口 0&#xff09;用户使用计算机3种方式&#xff1a; 命令行&#xff1b; 命令行执行 hello world图形界面&#xff1b;如计算机磁盘浏览器…