【4.1】flink窗口算子的trigger触发器和Evictor清理器

【README】

本文记录了 窗口算子的触发器trigger和 evictor清理器;

  • trigger触发器:决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理;
  • evictor清理器: evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素;

【1】触发器trigger

1)Trigger 接口提供了五个方法来响应不同的事件

  1. onElement() 方法在每个元素被加入窗口时调用。
  2. onEventTime() 方法在注册的 event-time timer 触发时调用。
  3. onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  4. onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  5. 最后,clear() 方法处理在对应窗口被移除时所需的逻辑。

2)有两点需要注意:

  • 前三个方法通过返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种:
    • CONTINUE: 什么也不做
    • FIRE: 触发计算
    • PURGE: 清空窗口内的元素
    • FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素
  • 上面的任意方法都可以用来注册 processing-time 或 event-time timer。

3)触发(Fire)与清除(Purge)
当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIRE 或 FIRE_AND_PURGE


【1.1】触发器代码

1)以滑动计数窗口为例

DataStream<Tuple2<String, BigDecimal>> windowAggStream = sensorStream.keyBy(SensorReadingWindow::getId)// 按照id 分组.countWindow(5, 2) // 滑动计数窗口

countWindow() 如下:

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));}
CountTrigger.of() 就是返回计数触发器;
public static <W extends Window> CountTrigger<W> of(long maxCount) {return new CountTrigger(maxCount);}

 其中 maxCount等于滑动步长2,CountTrigger计数触发器定义如下:

 对于 onElement方法,当 计数值大于 maxCount时,则触发生成新窗口;而maxCount等于滑动步长2 ;具体代码如下:

public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc);count.add(1L);if ((Long)count.get() >= this.maxCount) {count.clear();return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}

即,对于 7个元素的流,会在第2 , 4, 6 个元素生成新窗口;因为滑动步长为2;


 

【2】清理器 evictor 

1)定义:Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。

2)Evictor 接口提供了两个方法实现此功能:

evictBefore() 包含在调用窗口函数前的逻辑,而 evictAfter() 包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。

(图1 CountEvictor类定义)

3)清理器分类

Flink 内置有三个 evictor:

  1. CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除
  2. DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。
  3. TimeEvictor: 接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。

 

【2.1】代码示例

对于 countWindow 计数窗口方法,其使用了 Evictor,如下:

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));}

CountEvictor 定义见图1;其长度为 窗口大小;


【3】滑动计数窗口代码

1)滑动计数窗口参数

  • 窗口大小:5;
  • 滑动步长:2,即生成新窗口频率,每2条数据就会生成一个新窗口;

2)代码

/*** @Description 滑动计数窗口算子* @author xiao tang* @version 1.0.0* @createTime 2022年04月17日*/
public class WindowTest2_CountWindow {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket读取数据DataStream<String> fileStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensorTimeWindow.txt");// 转换为 SensorReader pojo类型DataStream<SensorReadingWindow> sensorStream = fileStream.map(x -> {String[] arr = x.split(",");return new SensorReadingWindow(arr[0], arr[1], new BigDecimal(arr[2]));});// 滑动计数窗口,进行增量聚合来计算温度均值(采用 Tuple2 )DataStream<Tuple2<String, BigDecimal>> windowAggStream = sensorStream.keyBy(SensorReadingWindow::getId)// 按照id 分组.countWindow(5, 2) // 滑动计数窗口.aggregate(new AggregateFunction<SensorReadingWindow, Tuple3<String, BigDecimal, Integer>, Tuple2<String, BigDecimal>>() {@Overridepublic Tuple3<String, BigDecimal, Integer> createAccumulator() {return new Tuple3<>("", BigDecimal.ZERO, 0); // 初始值}@Overridepublic Tuple3<String, BigDecimal, Integer> add(SensorReadingWindow sensorReading, Tuple3<String, BigDecimal, Integer> accumulator) {return new Tuple3<>(sensorReading.getId(), sensorReading.getTemperature().add(accumulator.f1).setScale(2,BigDecimal.ROUND_HALF_UP), accumulator.f2+1);}@Overridepublic Tuple2<String, BigDecimal> getResult(Tuple3<String, BigDecimal, Integer> accumulator) {return new Tuple2<>(accumulator.f0, accumulator.f1.divide(new BigDecimal(accumulator.f2)).setScale(2,BigDecimal.ROUND_HALF_UP));}@Overridepublic Tuple3<String, BigDecimal, Integer> merge(Tuple3<String, BigDecimal, Integer> a, Tuple3<String, BigDecimal, Integer> b) {return new Tuple3<String, BigDecimal, Integer>(a.f0, a.f1.add(b.f1), a.f2 + b.f2);}});// 打印windowAggStream.print("slideCountWindowAggStream");// 执行env.execute("slideCountWindowAggStreamJob");}
}

sensor文本:

sensor1,2022-04-17 22:07:01,36.1
sensor2,2022-04-17 22:07:02,36.2
sensor1,2022-04-17 22:07:03,36.3
sensor2,2022-04-17 22:07:04,36.4
sensor1,2022-04-17 22:07:05,36.5
sensor1,2022-04-17 22:07:06,36.6
sensor1,2022-04-17 22:07:07,36.7

打印结果:

slideCountWindowAggStream> (sensor1,36.20)
slideCountWindowAggStream> (sensor2,36.30)
slideCountWindowAggStream> (sensor1,36.38)

3)分析:

sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7 ;

sensor2 的温度是 36.2、36.4 ;

所以 sensor2 的温度均值是 36.3 ,打印结果没有问题

对于 sensor1 的温度均值, 又每2个生成一个新窗口,则 第1个窗口的均值是 数据 36.1 和 36.3  的均值;

第2个窗口的均值是数据{36.1  36.3  36.5 36.6 } 4个数据的均值(四舍五入);


【4】 滑动计数窗口代码变体

把sensor文本修改为:(多了一条数据,第8条数据)

sensor1,2022-04-17 22:07:01,36.1
sensor2,2022-04-17 22:07:02,36.2
sensor1,2022-04-17 22:07:03,36.3
sensor2,2022-04-17 22:07:04,36.4
sensor1,2022-04-17 22:07:05,36.5
sensor1,2022-04-17 22:07:06,36.6
sensor1,2022-04-17 22:07:07,36.7
sensor1,2022-04-17 22:07:07,36.7

打印结果为:

slideCountWindowAggStream> (sensor1,36.20)
slideCountWindowAggStream> (sensor2,36.30)
slideCountWindowAggStream> (sensor1,36.38)
slideCountWindowAggStream> (sensor1,36.56)

分析 【4】与【3】的源代码相同,但输入数据sensor和打印结果不同,原因如下:

【4】有8条数据:按照key分组如下:

sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7 、36.7;
sensor2 的温度是 36.2、36.4 ;

sensor2的数据没变,下文只讨论 sensor1的数据;又每2个生成一个新窗口,则窗口列表如下:

  • 第1个窗口的均值是 数据 36.1 和 36.3  的均值 36.2;
  • 第2个窗口的均值是 数据 36.1 ,36.3,36.5, 36.6  的均值 36.375的四舍五入值 36.38;
  • 第3个窗口的均值是 数据  36.3,36.5, 36.6 , 36.7, 36.7  的均值 36.56  的四舍五入值 36.56;(注意,第3个窗口的数据项不是 36.1, 36.3,36.5, 36.6 , 36.7, 36.7,因为窗口大小为5,包不下6个数字

【小结】

经过以上分析,本文应该是把窗口计算规则讲明白了;

在进行窗口聚合时,需要关注2个参数,即 窗口大小,滑动步长

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329554.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ue4 运行禁用鼠标_[UE4] VS code使用LuaPanda断点调试

luaPanda安装搜索luapanda 点击下载安装或者打开下面的地址&#xff0c;点击会提示打开vscode进行安装luaPanda下载地址&#xff1a;https://marketplace.visualstudio.com/items?itemNamestuartwang.luapanda&ssrfalse#review-details在gethbub中下载LuaPanda.lua文件&am…

Oracle入门(五B)之desc命令

翻译自 DESCRIBE describe命令 列出指定表的列定义&#xff0c;视图或同义词&#xff0c;或指定函数或存储过程的详述。 语法&#xff1a;desc[ribe] {[模式.]对象[链接串]} 模式 表示对象驻留的架构。如果省略架构&#xff0c;SQL*Plus假定拥有自己的对象。 对象 表示要描…

【4】flink window窗口算子

【README】 本文记录了 窗口算子操作&#xff1b;本文使用的flink为 1.14.4 版本&#xff1b;本文部分内容总结自 flink 官方文档&#xff1a; 窗口 | Apache Flink窗口 # 窗口&#xff08;Window&#xff09;是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中…

Microsoft将在UWP上支持React Native,同时为VS Code添加工具软件

Microsoft和Facebook日前宣布React Native的下一个目标平台是Universal Windows Platform&#xff08;UWP&#xff09;。 对于已经在多个设备平台上使用React Native的开发者来说&#xff0c;这是个好消息&#xff0c;因为这让他们可以在Windows上重用组件。对于UWP的开发者来说…

Xamarin Evolve 2016 Keynote回顾

编者语&#xff1a;距离上一次Xamarin Evolve 大会足足有两年时间了&#xff0c;这两年整个行业都在变化&#xff0c;Xamarin是整个.NET行业的表表者。两年过去Xamarin终于并入微软&#xff0c;免费了&#xff0c;也开源了。还有什么大招呢&#xff1f;刚结束的Xamarin Evolve给…

【5】flink窗口与水位线watermark例子

【README】 0&#xff09;本文编写了多个flink水位线watermark的代码例子&#xff0c;加深对watermark的理解 &#xff1b; 1&#xff09;时间分类 Event Time&#xff1a; 事件创建的时间&#xff08;事件发生时间&#xff09;&#xff1b;Ingestion Time&#xff1a;数据进…

技术人生,专家本色——采访张善友老师后的一点感受

距离Xamarin Evolve开幕还有不到三个小时。Miguel deIcaza和他的团队又将发布周年升级平台Xamarin 5。作为一个关注Mono/Xamarin多年的老粉丝&#xff0c;这个时候心情是蛮激动的。我想&#xff0c;刚刚作客.NET FM第五期“来者何人”专访系列的张善友老师&#xff0c;一定也是…

(翻)为什么要训练人工神经网络

【README】 本文翻译自 https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73 【1】介绍 今天我将从非常简短的神经网络介绍开始&#xff0c;这足以理解我将要讨论的…

使用VS Code开发 调试.NET Core 应用程序

使用VS Code开发 调试.NET Core RC2应用程序,由于.NET Core 目前还处于预览版。 本文使用微软提供的示例进行开发及调试。 https://github.com/aspnet/cli-samples .NET Core 介绍及说明&#xff1a; https://github.com/dotnet/cli http://dotnet.github.io/getting-started/ …

(转)简单神经网络(权值阈值训练)

本文转自&#xff1a; 神经网络入门 - 阮一峰的网络日志https://www.ruanyifeng.com/blog/2017/07/neural-network.html 眼下最热门的技术&#xff0c;绝对是人工智能。 人工智能的底层模型是"神经网络"&#xff08;neural network&#xff09;。许多复杂的应用&am…

Oracle入门(五A)之conn命令

一、connect命令将给定的用户名连接到Oracle数据库。当你运行一个连接命令、站点配置文件、Galgn.SQL和用户配置文件&#xff0c;按顺序处理Login .SQL。连接不重发如果初始连接不成功&#xff0c;请使用用户名或密码。语法&#xff1a; conn[ect] [{登录串|/|代理串} [AS…

2-操作系统启动

【README】 本文内容总结自 《操作系统-哈工大李治军老师》&#xff0c;内容非常棒&#xff0c;墙裂推荐&#xff1b; 【1】概述 1&#xff09;问题&#xff1a;操作系统启动时&#xff0c;它应该做什么事情&#xff1f; 计算机启动时&#xff0c;需要把存储在磁盘上的操作系…

Asp.net 面向接口可扩展框架之类型转化基础服务

新框架正在逐步完善,可喜可贺的是基础服务部分初具模样了,给大家分享一下 由于基础服务涉及面太广,也没开发完,这篇只介绍其中的类型转化部分,命名为类型转化基础服务,其实就是基础服务模块的类型转化子模块 说到类型转化必须要弄清楚.net的类型,类型都不清楚何来类型转化 1、P…

asset文件夹路径 unity_我们来捣鼓一下Unity的平台跳跃Microgame

作者&#xff1a;Truly大家好。Platformer Microgame是Unity官方的一个2D平台跳跃游戏项目模板&#xff0c;小伙伴们可以在这个模板的基础上修改或者制作自己的游戏。下边我们就来简单试玩下这个工程吧~一、资源导入写文章时&#xff0c;该资源暂时还不支持Unity 2019.x版本&am…

Oracle入门(五)之基本命令操作

一、show &#xff08;1&#xff09;查询Oracle系统变量值 语法&#xff1a;show 变量名 show all --查看所有68个系统变量值 show user --显示当前连接用户 注&#xff1a;Oracle的68个系统变量的key和默认value &#xff08;2&#xff09;查询oracle数据库当前的参数值 …

Linux工程师新法宝:在Visual Studio上用C++ 写Linux

如今我们正在开发一个新的插件&#xff0c;一个能够让开发者在 Visual Studio (以下简称 VS ) 上建构能够在 Linux 上运行 C 程序的套件。开发者可以借由这个插件将 C 程序移转到 Linux 服务器、PC 以及移动设备上&#xff0c;也同时可以借由这个插件将这些机器连结至你的 VS 上…

互联网+大赛作品_“颂中国力量 绘美好梦想”全市中小学生互联网+书画大赛作品展示(五)...

主办平顶山市教育体育局承办平顶山市教育体育局关心下一代工作委员会平顶山教育电视台教育部中国书画等级考试平顶山市招生管理办公室协办平顶山市书法家协会平顶山市美术家协会一比赛宗旨作品以中国战胜疫情为背景&#xff0c;以书法绘画为表现形式&#xff0c;真实记录全市中…

(转)Spring Boot启动过程 和 Bean初始化过程中的拓展接口详解

转自&#xff1a; Spring Boot启动过程 和 Bean初始化过程中的拓展接口详解_guyue35的博客-CSDN博客Spring Boot启动过程和 Bean初始化过程中的拓展接口详解1.背景 Spring的核心思想就是容器&#xff0c;当容器refresh的时候&#xff0c;外部看上去风平浪静&#xff0c;其实内…

Oracle入门(七B)之表空间删除数据文件未删除

转载自 oracle表空间删除数据文件未删除清理老旧数据的时候,不太熟悉操作,直接把表空间删了,未删除数据文件, HIS20170927此表空间已删除但数据文件还在,这时新创建一个表空间直接指定到这个数据文件上. create tablespace HIS20170927 datafile /oracle/oradata/****/HIS20170…

玩玩Xamarin Evolve 2016带来的新特性(一)-iOS Simulator(for Windows)

编者语&#xff1a; Xamarin Evolve 2016给不少.NET社群的朋友带来了强心剂&#xff0c;做.NET还是有前途啊&#xff0c;特别在微软开源的策略上。我会在五一假期和大家谈谈Xamarin中大家比较关注的iOS Windows模拟器&#xff0c;Xamarin Forms Previewer,还有Xamarin WorkBook…