之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。
注意:由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19。
代码参考:https://github.com/forever1986/flink-study.git
目录
- 1 窗口定义及分类
- 1.1 定义
- 1.2 窗口分类
- 1.3 计算函数
- 2 代码演示
- 2.1 reduce、aggregate&process演示
- 2.1.1 reduce方法演示
- 2.1.2 aggregate方法演示
- 2.1.3 process方法演示
- 2.1.4 结合使用
- 2.2 时间窗口演示
- 2.2.3 时间滚动窗口
- 2.2.2 时间滑动窗口
- 2.2.3 时间会话窗口
- 2.3 计数窗口演示
- 2.3.1 计数滚动窗口
- 2.3.2 计数滑动窗口
前面几章对Flink从数据输入到中间计算,最后到数据输出的整个流程讲了一遍。但是这些只不过是Flink最基本的内容。接下来需要更为深入的了解Flink的特性,这些特性就是体现Flink的优势。本章先来了解第一个高级概念:“窗口”
1 窗口定义及分类
1.1 定义
根据《官方文档》的描述:窗口是Flink处理无界流的核心。窗口将流分成有限大小的“桶”,可以对桶内的数据进行定制化的计算。这么说可能会比较难理解,下面通过图解来说明一下窗口的概念:
- 管道里的数据不断的流过来,Flink会根据规则生成一个桶来接住这些数据,桶的生成规则如下:
1)基于时间规则:一定范围时间内的数据放到一个桶
2)基于计数规则:一定数量的数据放到一个桶 - 当一开始没有任何数据过来时,Flink是不会生成桶的,而是第一条数据过来才会生成一个桶。也就是事件触发类型的。
- Flink也不是触发了就一下子生成很多桶,而是到来的那一条数据符合某些条件(时间或者数量条件)时,才会生成新的桶
1.2 窗口分类
通过前面的描述可以了解到关于窗口简单理解为一个按照一定规则接受数据的桶,可以根据不同规则、不同窗口分配器以及是否做KeyBy的维度进行分类:
1)根据是否KeyBy可分为:
- 进行KeyBy:则会将数据的key相同的分配到同一个子任务,并在子任务进行窗口计算,可以是由多个子任务计算不同key的数据。
- 不进行KeyBy:不会对数据进行分开,全部都由一个子任务进行计算。
2)根据不同规则可分为:
- 时间窗口:根据设定的的时间跨度进行划分窗口,比如设定5秒钟,则会5秒钟一个桶。
- 计数窗口:根据设定的计数总量进行划分窗口,比如设定100条数据,则会100条一个桶。
3)根据窗口分配器可分为:
-
滚动窗口:将数据按照时间或者计数方式划分为不重叠的窗口。每个窗口都有固定时间或者计数。
-
滑动窗口:将数据按照设定的时间或者计数+步长的方式划分窗口,每个窗口都有固定时间或者计数,但是窗口跟窗口之间存在一定重叠数据。
注意:这里需要注意2个点。
1)当输入前2条数据时,就已经会有一个窗口生成,因为前2条数据会与之前2条(假设的数据)构成一个窗口。那么也就意味着滑动窗口是会按照步长每2条数据创建一个窗口。
2)滑动的窗口与窗口之间是会重叠2条数据,也就是有2条数据会被共享。滚动窗口经常用于计算最近1个小时、1天的数据量这种场景
3)细心的朋友还发现,其实滚动窗口就是滑动窗口一个特例,当滑动窗口的步长等于窗口长度时,就是滚动窗口。
- 会话窗口:将数据按照规定多少时间内没有其它数据再进来,那么之前进来的数据归为一个窗口。每个窗口都没有固定时间,窗口与窗口之间数据不重复。但是这种方式只支持时间规则,不支持计数规则。
它们之间可以进行组合,并产生不同的作用。当然并非所有的都能组合,本来有223=12种,但是由于计数与会话窗口不能组合,因此只有下面10种组合结果:
类型 | 作用 | 子任务数 | 是否固定 | 窗口之间是否重叠数据 |
---|---|---|---|---|
不进行KeyBy+时间+滚动 | 按照设定的时间间隔,划分窗口 | 1个 | 时间固定,数量不固定 | 不重叠 |
不进行KeyBy+时间+滑动 | 按照设定的时间间隔+步长,划分窗口 | 1个 | 时间固定,数量不固定 | 重叠 |
不进行KeyBy+时间+会话 | 按照设定的时间间隔,超过该时间间隔没有数据进入, 则开启新的窗口 | 1个 | 时间不固定,数量不固定 | 不重叠 |
不进行KeyBy+计数+滚动 | 按照设定的计数,划分窗口 | 1个 | 数量固定,数据不固定 | 不重叠 |
不进行KeyBy+计数+滑动 | 按照设定的计数+步长,划分窗口 | 1个 | 固定计数 | 重叠 |
进行KeyBy+时间+滚动 | 按照设定的时间间隔,划分窗口 | 1或n个 | 时间固定,数量不固定 | 不重叠 |
进行KeyBy+时间+滑动 | 按照设定的时间间隔+步长,划分窗口 | 1或n个 | 时间固定,数量不固定 | 重叠 |
进行KeyBy+时间+会话 | 按照设定的时间间隔,超过该时间间隔没有数据进入, 则开启新的窗口 | 1或n个 | 时间不固定,数量不固定 | 不重叠 |
进行KeyBy+计数+滚动 | 按照设定的计数,划分窗口 | 1或n个 | 数量固定,数据不固定 | 不重叠 |
进行KeyBy+计数+滑动 | 按照设定的计数+步长,划分窗口 | 1或n个 | 固定计数 | 重叠 |
1.3 计算函数
如果要进行窗口计算,需要先通过windowAll或window方法进行开窗操作,开窗之后就可以使用其计算函数。通过拿AllWindowedStream类来举例子,可以看到有很多种计算方式,包括max、min、reduce、aggregate、process等方法。但其实只需要关注其中reduce、aggregate、process三个计算函数即可,因为其它方法都是基于这三个方法中的函数进行扩展的,下面说明一下这三个函数的内容:
1)reduce:ReduceFunction函数,输入、累加器以及输出的数据类型必须一致
reduce方法需要实现ReduceFunction函数:
/*** 输入、累加器以及输出的数据类型必须一致*/
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* @param value1 上一次累加的结果* @param value2 这一次进来的数据* @return 返回相同数据类型的数据*/T reduce(T value1, T value2) throws Exception;
}
2)aggregate:AggregateFunction函数,输入、累加器以及输出的数据类型可以不一致
aggregate:AggregateFunction函数:
/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* IN 输入的数据类型* ACC 累加器的数据类型* OUT 返回的数据类型*/
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {/*** 创建一个累加器,也就是初始化最初开始累积的值。比如你是一个计算总数的,你初始累加器就可以是0。*/ACC createAccumulator();/*** 进行累加操作,也就是计算操作。(不要被add的名称误导了,它就是写你自己的聚合逻辑)* @param value 这一次进来的数据* @param accumulator 上一次累加的结果* @return 返回累加的数据*/ACC add(IN value, ACC accumulator);/*** 输出的最终值给下游算子*/OUT getResult(ACC accumulator);/*** 一般在会话窗口中使用,用于合并窗口与窗口结果*/ACC merge(ACC a, ACC b);
}
3)process:ProcessWindowFunction和ProcessAllWindowFunction函数,这个是自定义计算函数
ProcessWindowFunction和ProcessAllWindowFunction函数之前在《系列之十一 - Data Stream API的中间算子的底层原理及其自定义》就有讲过,但是没有代码演示,这一章会进行代码演示。ProcessWindowFunction和ProcessAllWindowFunction函数其实就是自定义聚合方式,两者区别在于是否使用过keyBy操作。他们有一个process方法要实现,该方法只有窗口结束时,才会被执行。
ProcessWindowFunction函数:
/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* IN 输入的数据类型* OUT 返回的数据类型* KEY key的类型,如果是ProcessAllWindowFunction,则没有该泛型* W 开窗类型*/
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {/*** 处理聚合操作的方法* */public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
}
2 代码演示
前面讲了很多概念,可能通过字面还是很难理解,现在通过demo的方式来说明。可能就能够理解。由于做KeyBy和不做KeyBy唯一的区别就是有多少个子任务来处理开窗操作。所以这里只演示做KeyBy的示例:
以下所有代码参考lesson08子模块
2.1 reduce、aggregate&process演示
这里先对reduce、aggregate和process三个方法进行演示,本次演示大家要关注的是看看三个方法的差异:
方法 | 实现函数 | 关键函数方法 | 特性 | 执行频率 | 是否有rich函数 |
---|---|---|---|---|---|
reduce | ReduceFunction | reduce | 输入、输出和累加器的数据类型必须一致 | 每个窗口除了第一条数据之外,每来一条数据都会执行 | 有 |
aggregate | AggregateFunction | add | 输入、输出和累加器的数据类型可以不一致 | 每个窗口每来一条数据都会执行 | 有 |
process | ProcessWindowFunction或 者ProcessAllWindowFunction | process | 输入和输出的数据类型可以不一致 | 每个窗口结束时,执行一次 | 过期(非Rich函数已经能够获取上下文) |
2.1.1 reduce方法演示
示例说明:输入的cpu值在同一个窗口中累加起来。看看reduce如何被调用
ReduceDemo类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<ServerInfo>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1="+ value1 + "value2=" + value2);value1.setCpu(value1.getCpu()+value2.getCpu());return value1;}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前2条数据快速输入,第3条等一段时间后再输入
输出:
知识点:总共输入了3条数据,其中前2条数据快速输入,第3条等一段时间后再输入,结果是:reduce只打印1次,print打印2次。原因:
1)本示例中使用滚动窗口,设置时间间隔为10秒,因此第一条和第二条在第一个时间窗口,第三条在第二个时间窗口,因此print打印2次
2)reduce只输出1次,这个在《系列之八 - Data Stream API的中间算子:转换和聚合》中讲聚合类算子讲过,reduce第一条数据是不会被调用的。因此第一条数据进来时,不调用reduce;第二条数据进来时,调用reduce;第三条数据进来时,由于是新的窗口,因此它算是该窗口的第一条数据,因此也不调用reduce。
2.1.2 aggregate方法演示
示例说明:通过aggregate实现求同一个窗口下的平均cpu值
AggregateDemo类:
import com.demo.lesson08.model.ServerAvgInfo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示Aggregate方法*/
public class AggregateDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> reduce = windowStream.aggregate(new AggregateFunction<ServerInfo, ServerAvgInfo, String>() {@Overridepublic ServerAvgInfo createAccumulator() {// 初始值System.out.println("====createAccumulator====");return new ServerAvgInfo();}@Overridepublic ServerAvgInfo add(ServerInfo value, ServerAvgInfo accumulator) {System.out.println("====add====");// 累积cpu值以及条数accumulator.setServerId(value.getServerId());accumulator.setNum(accumulator.getNum()==null?1:accumulator.getNum()+1);accumulator.setCpuTotal(accumulator.getCpuTotal()==null?value.getCpu():accumulator.getCpuTotal()+ value.getCpu());return accumulator;}@Overridepublic String getResult(ServerAvgInfo accumulator) {System.out.println("====getResult====");// 平均cpu值return "平均cpu值: "+(accumulator.getCpuTotal()/accumulator.getNum());}@Overridepublic ServerAvgInfo merge(ServerAvgInfo a, ServerAvgInfo b) {System.out.println("====merge====");return null;}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前3条数据快速输入,第4条等一段时间(10秒)后再输入
输出:
知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间(10秒)后再输入。结果是:createAccumulator打印2次,add打印4次,getResult打印2次,merge打印0次,print打印2次。原因:
1)本示例使用滚动窗口,设置时间间隔为10秒,因此第一条、第二条和第三条在第一个时间窗口,第四条在第二个时间窗口,因此2个窗口print打印2次。createAccumulator和getResult也是打印2次,因此说明他们是在窗口创建和关闭时分别被调用
2)add输出4次,说明每一次数据输入,都会执行add方法。
3)支持输入、累加器以及输出都是不一样的数据类型
2.1.3 process方法演示
示例说明:通过process实现求同一个窗口下的平均cpu值
ProcessDemo类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;public class ProcessDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> reduce = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {System.out.println("该窗口的时间:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前3条数据快速输入,等待控制台输出process,再输入第4条数据
输出:
知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间后再输入。结果是:process打印2次,print打印2次,原因:
1)本示例使用滚动窗口,设置时间间隔为10秒,因此第一条、第二条和第三条在第一个时间窗口,第四条在第二个时间窗口,因此2个窗口print打印2次。
2)process打印2次,说明每个窗口在最后的时候process才会调用一次,并且将本次窗口的数据条数都一次性计算,并没有每条数据计算。
3)支持输入和输出都是不一样的数据类型
2.1.4 结合使用
细心的朋友还能看到reduce或者aggregate方法还有另外重载的方法,可以传入2个参数。比如reduce可以传入ReduceFunction和ProcessWindowFunction,aggregate可以传入AggregateFunction和ProcessWindowFunction。其实这是由于它们各自存在一定的缺点,为了弥补缺点,可以结合使用,各取有点。
注意:使用时是会将ReduceFunction或AggregateFunction最终的数据给ProcessWindowFunction,也就是每个窗口,ProcessWindowFunction的process方法的elements只会拿到ReduceFunction或AggregateFunction累加器最后计算得到的一条数据。
示例说明:在ReduceFunction中累积CPU,然后在ProcessWindowFunction计算平均值
ReduceAndProcessDemo 类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceAndProcessDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1=" + value1 + "value2=" + value2);value1.setCpu(value1.getCpu() + value2.getCpu());return value1;}}, new ProcessWindowFunction<>() {@Overridepublic void process(String key, ProcessWindowFunction<ServerInfo, ServerInfo, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<ServerInfo> out) throws Exception {// 平均cpu值long num = elements.spliterator().estimateSize();ServerInfo serverInfo = elements.iterator().next();System.out.println("服务器id=" + key + "在窗口的时间:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+ " 的条数=" + elements.spliterator().estimateSize()+ " 错误平均cpu值: " + (serverInfo.getCpu() / num));out.collect(serverInfo);}});// 7. 打印reduce.print("最终结果");// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:快速输入3条数据,保证3条数据在一个窗口
输出:
知识点:从上面输出结果可以看到,reduce被调用2次,这是因为总共输入3条数据。process被调用一次,而且可以看到平均值计算错误,在process中打印的结果显示获取到的条数是1条。由此可以看出process获取到的是reduce最后一条数据。
2.2 时间窗口演示
时间窗口的演示,只需要关注的是不同窗口的时间划分
2.2.3 时间滚动窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
TumblingTimeWindowsDemo 类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间滚动窗口示例*/
public class TumblingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前3条数据快速输入,等待控制台输出数据之后,再输入第4条
输出:
知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间后再输入。结果是:process打印2次,说明被分为2个窗口:
1)注意窗口的开始时间和结束时间,第一个窗口是从0秒-10秒,第二个窗口是从10秒-20秒
2)这说明滚动窗口,窗口之间是不会重叠的。
3)另外注意的点是,窗口时间范围是左闭右开的,也就是0秒是属于第一个窗口,但10秒不属于第一个窗口,而是属于第二个窗口
2.2.2 时间滑动窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
SlidingTimeWindowsDemo 类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间滑动窗口示例*/
public class SlidingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗 - 滑动窗口,间隔为10秒,步长为5秒WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(SlidingProcessingTimeWindows.of(Duration.ofSeconds(10),Duration.ofSeconds(5)));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:先快速输入前三条,等待控制台输出之后,再输入第4条数据
输出:
知识点:从控制台可以看到process被执行了2次,说明开了2个窗口。
1)注意其每个窗口的开始时间和结束时间,比如第一个窗口是从15秒-25秒,第二个窗口是从20秒-30秒,说明在20秒-25秒是两个窗口重叠之处,也就是代码中设置的步长5秒
2)前三条数据分别被第一个窗口和第二个窗口给计算了,因此说明前3条数据输入时间落在了20秒-25秒之间。因此说明两个窗口是会重叠的。
2.2.3 时间会话窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
SessionTimeWindowsDemo类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间会话窗口演示*/
public class SessionTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(// 固定间隔时间ProcessingTimeSessionWindows.withGap(Duration.ofSeconds(10))// 动态间隔时间
// ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ServerInfo>(){
// @Override
// public long extract(ServerInfo element) {
// return 0;
// }
// }));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前面2条数据在10秒钟之内输入,第2条数据输入后的10秒钟之后,再输入第3条数据
输出:
知识点:从控制台可以看到process被执行2次,因此它是开了2个窗口
1)前面两条数据被合并为一个窗口,因此平均值是2.5,第三条数据在第二个窗口,因此平均值2.8。
2)注意一下每个窗口的开始和结束时间,都不想之前滚动或者滑动窗口一样,都是固定的
3)会话窗口还支持动态规定间隔时间,你可以使用withDynamicGap方法,实现函数接口,在输入的数据中设置某个属性为间隔时间,实现动态的会话窗口。
2.3 计数窗口演示
2.3.1 计数滚动窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
TumblingCountWindowsDemo 类:
import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 计数滚动窗口示例*/
public class TumblingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗 - 活动窗口,间隔为3条数据WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(3);// 6. 计算 - 计算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {ServerInfo next = iterator.next();sum = sum + next.getCpu();}out.collect("cpu平均值=" + (sum / num) + " 条数=" + num);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:输入前3条,再输入后3条
输出:(平均值由于精度问题不用管)
知识点:示例中设置滚动窗口,每个窗口3条数据。输入了6条数据,process调用了2次
1)这样就知道滚动窗口是每3条会新建一个窗口,窗口之间不重叠,与代码示例中设置的一样
2.3.2 计数滑动窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
SlidingCountWindowsDemo 类:
import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 计数滑动窗口示例*/
public class SlidingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗 - 活动窗口,间隔为3条数据WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(4,2);// 6. 计算 - 计算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {sum = sum + iterator.next().getCpu();}out.collect("cpu平均值=" + (sum / num) + " 条数=" + num);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:先输入2条,等控制台输出后再输入第3条和第4条,等控制台输出后再输入第5条和第6条
输出:
知识点:总共输入6条数据,先输入2条,等一下再输入第3条和第4条,等一下再输入第5条和第6条。process调用了3次
1)可以看出它是会按照步长来调用process,这是因为第1条和第2条会是前一个窗口与当前窗口重叠
结语:本章说明了不同的窗口类型,还通过代码演示,更深刻了解窗口的工作原理。接下来一章将接触更底层的Flink如何实现不同窗口的。