详解 Flink 的 window API

一、window 概述

​ Streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 Flink window 是一种将无限数据切割为有限块进行处理的手段。window 是无限数据流处理的核心, window 将一个无限的 stream 拆分成有限大小的 ”buckets” 桶,然后可以在这些桶上做计算操作

二、window 类型

1. Time Window

时间窗口,按照时间生成 Window

1.1 Tumbling Time Window

滚动时间窗口

在这里插入图片描述

  • 将数据依据固定的窗口长度(时间)对数据进行切片
  • 特点:时间对齐,窗口长度固定,没有重叠
  • 重要参数:窗口长度(时间值)
  • 适用场景:适合做 BI 统计等(做每个时间段的聚合计算)
1.2 Sliding Time Window

滑动时间窗口

在这里插入图片描述

  • 滑动时间窗口由固定的窗口长度和滑动间隔组成
  • 特点:时间对齐,窗口长度固定,可以有重叠,数据最大的重叠数 = 窗口长度/滑动间隔
  • 重要参数:窗口长度和滑动间隔(时间值)
  • 适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)
1.3 Session Window

会话时间窗口

在这里插入图片描述

  • 由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session,也就是一段时间没有接收到新数据就会生成新的窗口
  • 特点:时间无对齐
  • 重要参数:会话最小时间间隔

2. Count Window

计数窗口,按照指定的数据条数生成一个 Window,与时间无关

2.1 Tumbling Count Window

滚动计数窗口

  • 将数据依据固定的窗口长度(计数)对数据进行切片
  • 特点:计数对齐,窗口长度固定,没有重叠
  • 重要参数:窗口长度(计数值)
2.2 Sliding Count Window

滑动计数窗口

  • 滑动计数窗口由固定的窗口长度和滑动间隔组成
  • 特点:计数对齐,窗口长度固定,可以有重叠,数据最大的重叠数 = 窗口长度/滑动间隔
  • 重要参数:窗口长度和滑动间隔(计数值)

三、window API 操作

在这里插入图片描述

1. Window 创建

1.1 非按键分区流

原始的 DataStream 调用 windowAll() 方法创建的窗口只能在一个任务(task)上执行,相当于并行度变成了 1,生产上不建议使用

AllWindowedStream stream = dataStream.windowAll()
1.2 按键分区流

Window 的创建推荐是 DataStream 经过 KeyBy 之后调用 window() 方法

在这里插入图片描述

/**通用开窗方法:WindowedStream<T> window()参数:WindowAssignerFlink 提供的通用 WindowAssigner:1.滚动窗口(tumbling window)2.滑动窗口(sliding window)3.会话窗口(session window)4.全局窗口(global window)
*/
public class TestWindowCreate {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口//1.滚动时间窗口//1.1 使用通用 window() 方法dataStream.keyBy("id").window(TumblingProcessTimeWindows.of(Time.seconds(5)));//1.2 使用 timeWindow() 方法dataStream.keyBy("id").timeWindow(Time.seconds(5));//2.滑动时间窗口//2.1 使用通用 window() 方法dataStream.keyBy("id").window(SlidingProcessTimeWindows.of(Time.seconds(6), Time.seconds(2)));//2.2 使用 timeWindow() 方法dataStream.keyBy("id").timeWindow(Time.seconds(6), Time.seconds(2));//3.会话窗口dataStream.keyBy("id").window(EventTimeSessionWindows.withGap(Time.minutes(1)));//4.计数窗口//4.1 滚动计数窗口dataStream.keyBy("id").countWindow(10L);//4.2 滑动计数窗口dataStream.keyBy("id").countWindow(10L, 2L);env.execute();}
}

2. Window 函数

window function 定义了要对窗口中收集的数据做的计算操作

2.1 增量聚合函数

incremental aggregation functions,每条数据到来就进行计算,保持一个简单的状态,窗口结束时输出最终的状态。简单的 sum/max/maxBy/min/minBy 聚合函数都是增量聚合

2.1.1 ReduceFunction
/**方法签名:reduce(ReduceFunction<T> reduce)注意:ReduceFunction 的类型 T 不能改变
*/
public class TestWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy("id").timeWindow(Time.seconds(5)).reduce(new ReduceFunction<SenesorReading>() {@Overridepublic SenesorReading reduce(SenesorReading value1, SenesorReading value2) throws Exception {return value2;}}).print();env.execute();}
}
2.1.2 AggregateFunction
/**方法签名:aggregate(AggregateFunction<IN, ACC, OUT> aggregate)AggregateFunction 的 3 个泛型:1.IN:输入数据类型2.ACC:中间累加器的数据类型3.OUT:输出数据类型AggregateFunction 接口中需要实现的 4 个方法:1.createAccumulator():创建一个累加器,即为聚合创建了一个初始状态,每个聚合任务只会调用一次2.add():将输入的元素添加到累加器中。基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value 和当前的累加器accumulator;返回一个新的累加器值,是对聚合状态进行更新。每条数据到来之后都会调用这个方法3.getResult():从累加器中提取聚合的输出结果。可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如计算平均值,可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用4.merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging  Window)的场景就是会话窗口(Session Windows)
*/
public class TestWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy("id").timeWindow(Time.seconds(15)).aggregate(new AggregateFunction<SenesorReading, Integer, Integer>() {@Overridepublic Integer createAccumulator() { return 0;}@Overridepublic Integer add(SenesorReading value, Integer accumulator) {return accumulator + 1;}@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return a + b;}}).print();env.execute();}
}
2.2 全窗口函数

full window functions,先收集窗口中的每一条数据,并在内部缓存起来,等到窗口要输出结果的时候再将所有数据进行计算并输出

2.2.1 WindowFunction
/**方法签名:apply(WindowFunction<IN, OUT, KEY, W extends Window> window)泛型:1.IN:输入数据类型2.OUT:输出数据类型3.KEY:分组 key 的类型4.W:窗口的类型需要实现的方法:void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out)1.key:分区的 key2.window:当前窗口信息3.input:窗口所有数据的可迭代集合4.out:数据收集器
*/
public class TestFullWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy("id").timeWindow(Time.seconds(15)).apply(new WindowFunction<SenesorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {@Overridepublic void apply(Tuple key, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception { String id = key.getField(0);Long windowEnd = window.getEnd();Integer count = IteratorUtils.toList(input.iterator()).size();out.collect(new Tuple3<>(id, windowEnd, count));}}).print();env.execute();}
}
2.2.2 ProcessWindowFunction
/**方法签名:process(ProcessWindowFunction<IN, OUT, KEY, W extends Window> window)泛型:1.IN:输入数据类型2.OUT:输出数据类型3.KEY:分组 key 的类型4.W:窗口的类型需要实现的方法:void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out)1.key:分区的 key2.context:上下文环境对象3.input:窗口所有数据的可迭代集合4.out:数据收集器
*/
public class TestFullWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy("id").timeWindow(Time.seconds(15)).process(new ProcessWindowFunction<SenesorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {@Overridepublic void process(Tuple key, Context context, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception { String id = key.getField(0);Long windowEnd = context.window().getEnd();Integer count = IteratorUtils.toList(input.iterator()).size();out.collect(new Tuple3<>(id, windowEnd, count));}}).print();env.execute();}
}

3. 其他可选 API

3.1 trigger

触发器主要是用来控制窗口什么时候触发计算,即执行窗口函数

/**参数:Trigger 抽象类内置实现类:EventTimeTrigger、ProcessingTimeTrigger 和 CountTrigger 等自定义实现类:继承 Trigger 抽象类并重写方法1.onElement():窗口中每到来一个元素,都会调用这个方法2.onEventTime():当注册的事件时间定时器触发时,将调用这个方法3.onProcessingTime():当注册的处理时间定时器触发时,将调用这个方法4.clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态
*/
trigger(Trigger<> trigger)
3.2 evictor

移除器主要用来定义移除某些数据的逻辑

/**参数:Evictor 接口实现方法:1.evictBefore():定义执行窗口函数之前的移除数据操作2.evictAfter():定义执行窗口函数之后的以处数据操作注意:默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的
*/
evictor(Evictor evictor)
3.3 allowedLateness

允许延迟的数据,设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算更新结果。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口

/**方法签名
*/
allowedLateness(Time time)
3.4 sideOutputLateData

将迟到的数据放入侧输出流,可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据

/**参数:OutputTag 输出标签,用来标记分支的迟到数据流
*/
sideOutputLateData(OutputTag<T> outputTag)//实例化方式:
OutputTag<String> outputTag = new OutputTag<String>("late") {};//提取侧输出流方法:由执行完所有窗口函数后得到的 DataStream 调用
getSideOutput(OutputTag<T> outputTag)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/849907.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java 新特性系列】Java 8 Optional 类完全指南

Optional 是 Java8 提供的为了解决 null 安全问题的一个 API。善用 Optional 可以使我们代码中很多繁琐、丑陋的设计变得十分优雅。 文章目录 1、Optional 类概述1.1、Optional 类介绍1.2、使用 Optional 的前后对比1.2.1、不使用 Optional1.2.2、使用 Optional 2、Java 8 中 O…

信奥之路(五)——顺序结构

** 以顺序、选择和循环三种基本结构的组合来描述程序&#xff0c;是结构化程序设计方法的主要特征之一。每条语句按照自上而下的顺序依次运行一次&#xff0c;这种自上而下依次执行的程序称为顺序结构程序。 ** 1.题目描述 爸爸妈妈让小明去附近的超市买一些玉米回来。爸爸…

单片机原理及技术(三)—— AT89S51单片机(二)(C51编程)

一、AT89S51单片机的并行I/O端口 1.1 P0口 AT89S51的P0口是一个通用的I/O口&#xff0c;可以用于输入和输出。每个引脚都可以通过软件控制为输入或输出模式。 1.1.1 P0口的工作原理 P0口的工作原理是通过对P0寄存器的读写操作来控制P0口的引脚。 输出模式&#xff1a;当P0口…

python --监听鼠标事件

import pyautogui from pynput import mouse, keyboardpyautogui.FAILSAFE Falseclass MouseMonitor:def __init__(self):self.mouse mouse.Controller()self.lock Truedef on_move(self, x, y):鼠标位移时回调函数if self.lock:print(11)pyautogui.moveTo(500, 500)self.loc…

C++ 纯虚函数 virtual = 0

上代码&#xff0c;看一下下面类的封装&#xff1a; class BlockerBase {public:virtual ~BlockerBase() default;virtual void Reset() 0;virtual void ClearObserved() 0;virtual void ClearPublished() 0;virtual void Observe() 0;virtual bool IsObservedEmpty() co…

UI学习笔记(一)

UI学习 一&#xff1a;UIView基础frame属性隐藏视图对象&#xff1a;UIView的层级关系 二&#xff1a;UIWindow对象三&#xff1a;UIViewController基础UIViewController使用 四&#xff1a;定时器与视图移动五&#xff1a;UISwitch控件六&#xff1a;滑动条和进度条七&#xf…

VMware给没安装VMTools的系统封装ISO以送入文件

VMware给没安装VMTools的系统封装ISO以送入文件&#xff0c;其实不需要其它工具 VMware自带mkisofs 2.01&#xff0c;不过mkisofs 2.01已经停止更新&#xff0c;最大的问题是不支持中文&#xff0c;也不支持UEFI引导记录&#xff0c;但一般已经够用了&#xff0c;除此之外还可…

跨库数据同步 SYNC data

1.exp imp 由于DBV行不通 2. 直接使用pl/sql 的导入导出&#xff0c;导出insert脚本肯定不现实&#xff0c;导成专用pde文件&#xff0c;发现24小时只能导入1000多万表&#xff08;基本每两三分钟10000&#xff09;。 3.使用expdp impdp, 遇到的问题&#xff08;imperva会拦截…

2021年vue面试题整理(万字解析)

一、对MVVM的理解 MVVM分为Model、View、ViewModel。 Model 代表数据模型&#xff0c;数据和业务逻辑都在Model层中定义&#xff1b;泛指后端进行的各种业务逻辑处理和数据操控&#xff0c;对于前端来说就是后端提供的 api 接口。 View 代表UI视图&#xff0c;负责数据的展示…

默认launcher

目录 前提代码 前提 刷机后开机提示选择launcher应用&#xff0c;此时设备中有至少两个apk配置有属性&#xff0c;想要开机自动进入launcher,可以通过修改ResolverActivity,在开机时默认选择指定的launcher程序 代码 //frameworks/base/core/java/com/android/internal/app/…

【Python数据分析--Numpy库】Python数据分析Numpy库学习笔记,Python数据分析教程,Python数据分析学习笔记(小白入门)

一&#xff0c;Numpy教程 给大家推荐一个很不错的笔记&#xff0c;个人长期学习过程中整理的 Python超详细的学习笔记共21W字点我获取 1-1 安装 1-1-1 使用已有的发行版本 对于许多用户&#xff0c;尤其是在 Windows 上&#xff0c;最简单的方法是下载以下的 Python 发行版…

java 8 新特性CompletableFuture使用

准备工作&#xff1a;定义一个线程池 ExecutorService pool Executors.newFixedThreadPool(3,(Runnable r)->{Thread tnew Thread(r);t.setDaemon(true);return t;});一、执行方式 1、对于有返回值的 CompletableFuture<String> futureCompletableFuture.supplyAsync…

Swift 序列(Sequence)排序面面俱到 - 从过去到现在(一)

概览 在任何语言中对序列(或集合)元素的排序无疑是一种司空见惯的常规操作,在 Swift 语言里自然也不例外。序列排序看似简单,实则“暗藏玄机”。 要想真正掌握 Swift 语言中对排序的“各种姿势”,我们还得从长计议。不如就先从最简单的排序基本功开始聊起吧。 在本篇博…

【十大排序算法】插入排序

插入排序&#xff0c;如一位细心的整理者&#xff0c; 她从序列的左端开始&#xff0c; 挨个将元素归位。 每当她遇到一个无序的元素&#xff0c; 便将它插入已经有序的部分&#xff0c; 直至所有元素有序排列。 她不张扬&#xff0c;却有效率&#xff0c; 用自己的方式&…

探索HTML5新Input类型:重塑表单交互的未来

随着HTML5标准的演进&#xff0c;表单设计迎来了重大革新&#xff0c;其中最引人注目的莫过于一系列新的input类型。这些新类型不仅简化了前端开发&#xff0c;提升了用户体验&#xff0c;还增强了网页表单的数据验证能力。然而&#xff0c;值得注意的是&#xff0c;不同浏览器…

什么是umi

UMI&#xff08;Umi Next.js Inspired Middleware&#xff09;是一个基于 React 的企业级前端应用框架&#xff0c;由阿里巴巴团队开发和维护。UMI 框架结合了 React、Webpack、Babel、Dva&#xff08;一个基于 Redux 和 redux-saga 的数据流方案&#xff09;等主流前端技术&am…

pdf文件在线压缩网站,pdf文件在线压缩工具软件

在数字化时代的今天&#xff0c;PDF文件已经成为我们日常生活和工作中不可或缺的一部分。然而&#xff0c;随着PDF文件的广泛使用&#xff0c;其文件大小问题也日益凸显。过大的PDF文件不仅占用了大量的存储空间&#xff0c;而且在传输和共享过程中也往往面临诸多不便。因此&am…

SylixOS网卡多 IP 配置

概述 网卡多 IP 是指在同一个网络接口上配置和绑定多个 IP 地址。 引进网卡多 IP 的目的主要有以下几个&#xff1a; 提供服务高可用性。通过在同一接口绑定多个 IP 地址&#xff0c;然后在服务端使用这些 IP 地址启动多个服务实例。这样在任意一 IP 出现问题时&#xff0c;可…

Redis学习(十二)Redis的三种删除策略

目录 一、背景二、Redis 的三种删除策略2.1 定时删除&#xff08;用CPU换内存空间&#xff09;2.2 定期删除2.3 惰性删除&#xff08;用内存换CPU性能&#xff09; 三、总结 一、背景 我们都知道 Redis 是一种内存数据&#xff0c;所有的数据均存储在内存中&#xff0c;可以通…

Android 代码打印meminfo

旨在替代adb shell dumpsys meminfo packageName&#xff0c;在log打印meminfo&#xff0c;以便分析内存情况 ActivityManager.MemoryInfo memoryInfo new ActivityManager.MemoryInfo(); activityManager.getMemoryInfo(memoryInfo); long totalMemory Runtime.getRuntime(…