【README】
0)本文编写了多个flink水位线watermark的代码例子,加深对watermark的理解 ;
1)时间分类
- Event Time: 事件创建的时间(事件发生时间);
- Ingestion Time:数据进入flink的实际;
- Processing Time:执行算子的本地机器时间 ;
我们主要讨论的是 事件时间;
2)flink窗口分为 滚动窗口,滑动窗口, 本文使用了 滚动窗口;
- 滚动窗口: 只有1个参数,窗口长度与窗口步长(窗口创建频率)相等;
- 滑动窗口:有2个参数,即窗口长度,窗口步长;可以手动设置,可以相等也可以不等;
3)本文结合代码示例讲了 水位线, 窗口,窗口属性 lateness 延迟属性, 窗口流的 siteOutputLateData 侧输出流(旁路输出),及其它们的作用;
【1】水位线
1)定义(本文自定义总结,非官方):水位线 watermark,指的是 flink底层在数据流中添加的带有时间戳的数据,当这些水位线数据到达算子时(如窗口算子),算子会认为 小于水位线的业务数据都来了;(数据可以理解为 一条日志,或温度传感器采集的温度信息)
2)作用: 水位线可以用来处理无序数据流;(下文代码例子会给出);
3)如何产生水位线?
- 指定水位线的时间戳如何获取? 可以指定 水位线时间戳从业务数据(抽象为javabean)的某个属性获取;
- 指定水位线可以延迟多长时间,即允许无序数据最多可以晚来多长时间;(超过这个时间会被丢弃)
【1.1】事件迟到被丢弃
1)建立一个 10s 滚动窗口算子(每10s新开一个长度为10s的窗口),水位线取温度bean的时间戳,且延迟 0 秒,如下:
其中 窗口用于收集id号码,即属于同一个窗口的元素的id会被收集到一起;
public class WindowTest3_EventTimeWatermarkWindow3 {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket读取数据,数据格式参见 sensorTimeWatermarkWindow.txt
// DataStream<String> textStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensorTimeWatermarkWindow.txt");// nc -lk 7777 DataStream<String> textStream = env.socketTextStream("192.168.163.201", 7778);// 转换为 SensorReader pojo类型DataStream<SensorReadingTimeWatermarkWindow> sensorStream = textStream.map(x -> {String[] arr = x.split(",");return new SensorReadingTimeWatermarkWindow(arr[0], arr[1], arr[2], new BigDecimal(arr[3]));});// 设置抽取时间戳,水位线延迟2秒(如当前时间戳为 20:00:10 ,水位线的时间是 20:00:08),窗口是看水位线时间,而不是时间时间SingleOutputStreamOperator<SensorReadingTimeWatermarkWindow> streamWithWatermark = sensorStream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReadingTimeWatermarkWindow>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((event, timestamp) -> event.getTimestamp().getTime()));// 开窗聚合SingleOutputStreamOperator<String> aggForWindowStream =streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).aggregate(new AggregateFunction<SensorReadingTimeWatermarkWindow, String, String>() {@Overridepublic String createAccumulator() {return "";}@Overridepublic String add(SensorReadingTimeWatermarkWindow sensorReadingTimeWatermarkWindow, String s) {return s + ", " + sensorReadingTimeWatermarkWindow.getId();}@Overridepublic String getResult(String s) {return s;}@Overridepublic String merge(String s, String acc1) {return s + ", " + acc1;}});// 打印aggForWindowStream.print("aggForWindowStream");// 执行env.execute("aggForWindowStream");}
}
上述代码中的水位线的延迟时间为0s,即水位线时间戳等于事件时间戳;
元素抽象为 传感器信息bean,如下:
public class SensorReadingTimeWatermarkWindow {private String id;private String type;private Timestamp timestamp;private BigDecimal temperature;public SensorReadingTimeWatermarkWindow() {}public SensorReadingTimeWatermarkWindow(String id, String type, String timeStr, BigDecimal temperature) {this.id = id;this.type = type;this.temperature = temperature;this.parseTimestamp(timeStr);}private void parseTimestamp(String timeStr) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {this.timestamp = new Timestamp(simpleDateFormat.parse(timeStr).getTime());} catch (ParseException e) {this.timestamp = new Timestamp(System.currentTimeMillis());}}
}
接收的是 nc 客户端的socket文本流,窗口算子计算结果如下:
详情如下:
1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9 -> 1, 7, 8, 11 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:16,36.9 16,sensor1,2022-04-17 22:07:23,36.9 -> 12,15
【结果分析】
- 发现1:当事件12(id=12)出现时,因水位线延迟时间为0,所以水位线时间戳等于事件12的时间戳=22:07:11,这个时间戳大于窗口结束时间(22:07:10),第1个窗口被关闭并输出计算结果为【1,7,8,11】;
- 发现2: 当事件16(id=16)出现时,因水位线延迟时间为0,所以水位线时间戳等于事件16的时间戳=22:07:23,这个时间戳大于窗口结束时间(22:07:20),第2个窗口被关闭并输出计算结果为【12,15】;
- 发现3:事件13没有更新水位线,因为水位线必须单调递增(事件12发生时的水位线是22:07:11,事件13的时间戳是22:07:09,所以事件13发生时不会更新水位线);
问题来了: 事件13去哪里了? 被 flink 丢弃了,因为事件13迟到了;
- 如何理解事件迟到了: 因为事件12 的时间戳为 22:07:11,又水位线延迟0s,所以水位线的 时间戳也是 22:07:11,这大于窗口结束时间,所以窗口关闭并计算结果,窗口关闭后,事件13才来,因此被丢弃。
【补充】窗口范围是左闭右开;如上图,第1个窗口的范围是 [0,10),第2个窗口是 [10,20)
【1.2】 事件迟到但被正常处理
1)修改上述水位线代码, 设置延迟时间为5s,重新录入上述数据,结果如下:
// 设置抽取时间戳,水位线延迟2秒(如当前时间戳为 20:00:10 ,水位线的时间是 20:00:08),窗口是看水位线时间,而不是事件时间
SingleOutputStreamOperator<SensorReadingTimeWatermarkWindow> streamWithWatermark = sensorStream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReadingTimeWatermarkWindow>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 水位线延迟时间修改为 5s.withTimestampAssigner((event, timestamp) -> event.getTimestamp().getTime()));
1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:16,36.9 -> 1, 7, 8, 11, 13 16,sensor1,2022-04-17 22:07:23,36.9 21,sensor1,2022-04-17 22:07:20,36.922,sensor1,2022-04-17 22:07:25,36.9 -> 12, 15
【结果分析】
- 发现1:事件13,事件21 不会更新水位线时间戳,原因上文已经解释过了;
- 发现2:当事件15(id=15)出现时,因水位线延迟时间为5s,所以水位线等于事件15的时间戳减去5s = 22:07:11,这个时间戳大于窗口结束时间(22:07:10),第1个窗口被关闭并输出计算结果为【1,7,8,11,13】;
- 发现3:事件13没有被丢弃,因为水位线延迟了5s,窗口在事件15发生时才关闭,所以可以探测到事件13,这也阐述了为啥 flink水位线可以处理无序数据的原理,flink的设计者的水位线idea真的很棒(对比来看,【1.1】中的例子事件13被丢弃);
- 发现4:当事件22(id=22)出现时,因水位线延迟时间为5s,所以水位线等于事件22的时间戳减去5s = 22:07:20,这个时间戳大于等于窗口结束时间(22:07:20),第2个窗口被关闭并输出计算结果为【12,15】;(大于等于窗口结束时间,窗口就被关闭,因为窗口范围是左开右闭)
【2】窗口的 lateness 延迟属性
此外,窗口还有 lateness 属性,表示延迟多长时间关闭窗口;
如下面代码每10s 创建一个长度为12s的窗口; (如果没有 lateness参数或其为0的话, 就是 每10s 创建一个长度为10s的窗口)
代码修改如下:
SingleOutputStreamOperator<String> aggForWindowStream =streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许窗口延迟 2 秒后关闭窗口
窗口算子计算结果如下:
详情如下:
1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:15,36.9 -> 1, 7, 8, 11, 1316,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16 17,sensor1,2022-04-17 22:07:16,36.9 18,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16, 18 19,sensor1,2022-04-17 22:07:17,36.9 窗口关闭 20,sensor1,2022-04-17 22:07:09,36.9 被丢弃 21,sensor1,2022-04-17 22:07:20,36.922,sensor1,2022-04-17 22:07:25,36.9 -> 12, 15, 17, 19
【结果分析】
- 事件15发生时:因水位线延迟5s,所以水位线时间戳=22:07:15-5s=22:07:10,等于第1个窗口的结束时间,故第1个窗口计算,结果为 【1, 7, 8, 11, 13】,但窗口没有关闭,因为lateness为2s,延迟2秒关闭,即当水位线大于等于 22:07:12 时,窗口关闭;
- 事件16发生时:第1个窗口因为 lateness=2s 没有关闭,又事件16时间戳=22:07:09,所以还是参与窗口1的计算,输出结果【1, 7, 8, 11, 13, 16】;
- 事件17发生时:时间戳=22:07:16,水位线时间戳=22:07:11,这小于带lateness=2s的窗口1的关闭时间 22:07:12,所以窗口1还是不会关闭;
- 事件18发生时:时间戳=22:07:09, 因水位线单调递增,故不变,还是22:07:11;事件18参与窗口1的计算,结果为 【1, 7, 8, 11, 13, 16, 18】
- 事件19发生时:时间戳=22:07:17,水位线=22:07:12,等于带lateness=2s的窗口1的关闭时间,窗口1关闭;
- 事件20发生时:时间戳=22:07:09,落入了窗口1的范围(22:07:00~22:07:10),但因窗口1已经关闭,所以事件20被丢弃;
通过以上示例,本文应该是把窗口的lateness属性 讲清楚了;
【问题】 事件20被丢弃的话, 不满足业务场景对数据一致性的要求;
- 因为服务1发送了10条数据,到达服务2的时候却只有9条数据,这不满足业务需求,是开发团队不愿意看到的事情;那如何找回这些被丢弃的事件呢?通过旁路输出;
【3】如何收集迟到数据
从旁路输出(side output)获取迟到数据;
通过 Flink 的 旁路输出 功能,可以获得迟到数据的数据流。
首先,需要在开窗后的 stream 上使用 sideOutputLateData(OutputTag) 表明需要把迟到数据存入 旁输出流。
代码修改如下:添加旁路输出流(侧输出流)
// 侧输出流,对于延迟的且没有进入窗口的数据,放到侧输出流(旁路输出流)OutputTag<SensorReadingTimeWatermarkWindow> lateOutputTag = new OutputTag<SensorReadingTimeWatermarkWindow>("late") {};// 开窗聚合SingleOutputStreamOperator<String> aggForWindowStream =streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许延迟 2 秒后关闭窗口.sideOutputLateData(lateOutputTag) // 无法进入窗口,则进入侧输出流.aggregate(new AggregateFunction<SensorReadingTimeWatermarkWindow, String, String>() {@Overridepublic String createAccumulator() {return "";}@Overridepublic String add(SensorReadingTimeWatermarkWindow sensorReadingTimeWatermarkWindow, String s) {return s + ", " + sensorReadingTimeWatermarkWindow.getId();}@Overridepublic String getResult(String s) {return s;}@Overridepublic String merge(String s, String acc1) {return s + ", " + acc1;}});// 打印窗口算子结果aggForWindowStream.print("aggForWindowStream");// 打印旁输出流aggForWindowStream.getSideOutput(lateOutputTag).print("lateOutputTag");// 执行env.execute("aggForWindowStream");
事件发生详情如下:
1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:15,36.9 -> 1, 7, 8, 11, 1316,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16 17,sensor1,2022-04-17 22:07:16,36.9 18,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16, 18 19,sensor1,2022-04-17 22:07:17,36.9 窗口关闭 20,sensor1,2022-04-17 22:07:09,36.9 -> lateOutputTag> SensorReadingTimeWindow{id='20', type='sensor1', timestamp=2022-04-17 22:07:09.0, temperature=36.9}
结果分析:
- 相比于【2】中代码示例, 事件20被丢弃了;而【3】中代码,当事件20出现时,由于窗口已经关闭,但存在侧输出流(旁路输出),所以事件20 存入侧输出流(解决了乱序数据迟到事件过长导致数据不一致的问题);相反如果没有侧输出流,则事件20会被丢弃;