Flink 系列之十五 - 高级概念 - 窗口

之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。

注意由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19
代码参考:https://github.com/forever1986/flink-study.git

目录

  • 1 窗口定义及分类
    • 1.1 定义
    • 1.2 窗口分类
    • 1.3 计算函数
  • 2 代码演示
    • 2.1 reduce、aggregate&process演示
      • 2.1.1 reduce方法演示
      • 2.1.2 aggregate方法演示
      • 2.1.3 process方法演示
      • 2.1.4 结合使用
    • 2.2 时间窗口演示
      • 2.2.3 时间滚动窗口
      • 2.2.2 时间滑动窗口
      • 2.2.3 时间会话窗口
    • 2.3 计数窗口演示
      • 2.3.1 计数滚动窗口
      • 2.3.2 计数滑动窗口

前面几章对Flink从数据输入到中间计算,最后到数据输出的整个流程讲了一遍。但是这些只不过是Flink最基本的内容。接下来需要更为深入的了解Flink的特性,这些特性就是体现Flink的优势。本章先来了解第一个高级概念:“窗口

1 窗口定义及分类

1.1 定义

根据《官方文档》的描述:窗口是Flink处理无界流的核心。窗口将流分成有限大小的“”,可以对桶内的数据进行定制化的计算。这么说可能会比较难理解,下面通过图解来说明一下窗口的概念:

在这里插入图片描述

  • 管道里的数据不断的流过来,Flink会根据规则生成一个桶来接住这些数据,桶的生成规则如下:
    1)基于时间规则:一定范围时间内的数据放到一个桶
    2)基于计数规则:一定数量的数据放到一个桶
  • 当一开始没有任何数据过来时,Flink是不会生成桶的,而是第一条数据过来才会生成一个桶。也就是事件触发类型的。
  • Flink也不是触发了就一下子生成很多桶,而是到来的那一条数据符合某些条件(时间或者数量条件)时,才会生成新的桶

1.2 窗口分类

通过前面的描述可以了解到关于窗口简单理解为一个按照一定规则接受数据的桶,可以根据不同规则、不同窗口分配器以及是否做KeyBy的维度进行分类:

1)根据是否KeyBy可分为:

  • 进行KeyBy:则会将数据的key相同的分配到同一个子任务,并在子任务进行窗口计算,可以是由多个子任务计算不同key的数据。
  • 不进行KeyBy:不会对数据进行分开,全部都由一个子任务进行计算。

2)根据不同规则可分为:

  • 时间窗口:根据设定的的时间跨度进行划分窗口,比如设定5秒钟,则会5秒钟一个桶。
  • 计数窗口:根据设定的计数总量进行划分窗口,比如设定100条数据,则会100条一个桶。

3)根据窗口分配器可分为:

  • 滚动窗口:将数据按照时间或者计数方式划分为不重叠的窗口。每个窗口都有固定时间或者计数。
    在这里插入图片描述

  • 滑动窗口:将数据按照设定的时间或者计数+步长的方式划分窗口,每个窗口都有固定时间或者计数,但是窗口跟窗口之间存在一定重叠数据。
    在这里插入图片描述

注意:这里需要注意2个点。
1)当输入前2条数据时,就已经会有一个窗口生成,因为前2条数据会与之前2条(假设的数据)构成一个窗口。那么也就意味着滑动窗口是会按照步长每2条数据创建一个窗口。
2)滑动的窗口与窗口之间是会重叠2条数据,也就是有2条数据会被共享。滚动窗口经常用于计算最近1个小时、1天的数据量这种场景
3)细心的朋友还发现,其实滚动窗口就是滑动窗口一个特例,当滑动窗口的步长等于窗口长度时,就是滚动窗口

  • 会话窗口:将数据按照规定多少时间内没有其它数据再进来,那么之前进来的数据归为一个窗口。每个窗口都没有固定时间,窗口与窗口之间数据不重复。但是这种方式只支持时间规则不支持计数规则

它们之间可以进行组合,并产生不同的作用。当然并非所有的都能组合,本来有223=12种,但是由于计数与会话窗口不能组合,因此只有下面10种组合结果:

类型作用子任务数是否固定窗口之间是否重叠数据
不进行KeyBy+时间+滚动按照设定的时间间隔,划分窗口1个时间固定,数量不固定不重叠
不进行KeyBy+时间+滑动按照设定的时间间隔+步长,划分窗口1个时间固定,数量不固定重叠
不进行KeyBy+时间+会话按照设定的时间间隔,超过该时间间隔没有数据进入,
则开启新的窗口
1个时间不固定,数量不固定不重叠
不进行KeyBy+计数+滚动按照设定的计数,划分窗口1个数量固定,数据不固定不重叠
不进行KeyBy+计数+滑动按照设定的计数+步长,划分窗口1个固定计数重叠
进行KeyBy+时间+滚动按照设定的时间间隔,划分窗口1或n个时间固定,数量不固定不重叠
进行KeyBy+时间+滑动按照设定的时间间隔+步长,划分窗口1或n个时间固定,数量不固定重叠
进行KeyBy+时间+会话按照设定的时间间隔,超过该时间间隔没有数据进入,
则开启新的窗口
1或n个时间不固定,数量不固定不重叠
进行KeyBy+计数+滚动按照设定的计数,划分窗口1或n个数量固定,数据不固定不重叠
进行KeyBy+计数+滑动按照设定的计数+步长,划分窗口1或n个固定计数重叠

1.3 计算函数

如果要进行窗口计算,需要先通过windowAll或window方法进行开窗操作,开窗之后就可以使用其计算函数。通过拿AllWindowedStream类来举例子,可以看到有很多种计算方式,包括max、min、reduce、aggregate、process等方法。但其实只需要关注其中reduce、aggregate、process三个计算函数即可,因为其它方法都是基于这三个方法中的函数进行扩展的,下面说明一下这三个函数的内容:

1)reduceReduceFunction函数,输入、累加器以及输出的数据类型必须一致

reduce方法需要实现ReduceFunction函数:

/*** 输入、累加器以及输出的数据类型必须一致*/
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* @param value1  上一次累加的结果* @param value2  这一次进来的数据* @return 返回相同数据类型的数据*/T reduce(T value1, T value2) throws Exception;
}

2)aggregateAggregateFunction函数,输入、累加器以及输出的数据类型可以不一致

aggregate:AggregateFunction函数:

/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* IN  输入的数据类型* ACC  累加器的数据类型* OUT  返回的数据类型*/
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {/*** 创建一个累加器,也就是初始化最初开始累积的值。比如你是一个计算总数的,你初始累加器就可以是0。*/ACC createAccumulator();/*** 进行累加操作,也就是计算操作。(不要被add的名称误导了,它就是写你自己的聚合逻辑)* @param value 这一次进来的数据* @param accumulator 上一次累加的结果* @return 返回累加的数据*/ACC add(IN value, ACC accumulator);/*** 输出的最终值给下游算子*/OUT getResult(ACC accumulator);/*** 一般在会话窗口中使用,用于合并窗口与窗口结果*/ACC merge(ACC a, ACC b);
}

3)processProcessWindowFunctionProcessAllWindowFunction函数,这个是自定义计算函数

ProcessWindowFunctionProcessAllWindowFunction函数之前在《系列之十一 - Data Stream API的中间算子的底层原理及其自定义》就有讲过,但是没有代码演示,这一章会进行代码演示。ProcessWindowFunctionProcessAllWindowFunction函数其实就是自定义聚合方式,两者区别在于是否使用过keyBy操作。他们有一个process方法要实现,该方法只有窗口结束时,才会被执行。

ProcessWindowFunction函数:

/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* IN  输入的数据类型* OUT  返回的数据类型* KEY  key的类型,如果是ProcessAllWindowFunction,则没有该泛型* W  开窗类型*/
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {/*** 处理聚合操作的方法* */public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
}

2 代码演示

前面讲了很多概念,可能通过字面还是很难理解,现在通过demo的方式来说明。可能就能够理解。由于做KeyBy和不做KeyBy唯一的区别就是有多少个子任务来处理开窗操作。所以这里只演示做KeyBy的示例:

以下所有代码参考lesson08子模块

2.1 reduce、aggregate&process演示

这里先对reduce、aggregate和process三个方法进行演示,本次演示大家要关注的是看看三个方法的差异

方法实现函数关键函数方法特性执行频率是否有rich函数
reduceReduceFunctionreduce输入、输出和累加器的数据类型必须一致每个窗口除了第一条数据之外,每来一条数据都会执行
aggregateAggregateFunctionadd输入、输出和累加器的数据类型可以不一致每个窗口每来一条数据都会执行
processProcessWindowFunction或
者ProcessAllWindowFunction
process输入和输出的数据类型可以不一致每个窗口结束时,执行一次过期(非Rich函数已经能够获取上下文)

2.1.1 reduce方法演示

示例说明:输入的cpu值在同一个窗口中累加起来。看看reduce如何被调用

ReduceDemo类:

import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<ServerInfo>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1="+ value1 + "value2=" + value2);value1.setCpu(value1.getCpu()+value2.getCpu());return value1;}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:前2条数据快速输入,第3条等一段时间后再输入
在这里插入图片描述

输出:
在这里插入图片描述

知识点:总共输入了3条数据,其中前2条数据快速输入,第3条等一段时间后再输入,结果是:reduce只打印1次,print打印2次。原因:
1)本示例中使用滚动窗口,设置时间间隔为10秒,因此第一条和第二条在第一个时间窗口,第三条在第二个时间窗口,因此print打印2次
2)reduce只输出1次,这个在《系列之八 - Data Stream API的中间算子:转换和聚合》中讲聚合类算子讲过,reduce第一条数据是不会被调用的。因此第一条数据进来时,不调用reduce;第二条数据进来时,调用reduce;第三条数据进来时,由于是新的窗口,因此它算是该窗口的第一条数据,因此也不调用reduce。

2.1.2 aggregate方法演示

示例说明:通过aggregate实现求同一个窗口下的平均cpu值

AggregateDemo类:

import com.demo.lesson08.model.ServerAvgInfo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示Aggregate方法*/
public class AggregateDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> reduce = windowStream.aggregate(new AggregateFunction<ServerInfo, ServerAvgInfo, String>() {@Overridepublic ServerAvgInfo createAccumulator() {// 初始值System.out.println("====createAccumulator====");return new ServerAvgInfo();}@Overridepublic ServerAvgInfo add(ServerInfo value, ServerAvgInfo accumulator) {System.out.println("====add====");// 累积cpu值以及条数accumulator.setServerId(value.getServerId());accumulator.setNum(accumulator.getNum()==null?1:accumulator.getNum()+1);accumulator.setCpuTotal(accumulator.getCpuTotal()==null?value.getCpu():accumulator.getCpuTotal()+ value.getCpu());return accumulator;}@Overridepublic String getResult(ServerAvgInfo accumulator) {System.out.println("====getResult====");// 平均cpu值return "平均cpu值: "+(accumulator.getCpuTotal()/accumulator.getNum());}@Overridepublic ServerAvgInfo merge(ServerAvgInfo a, ServerAvgInfo b) {System.out.println("====merge====");return null;}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:前3条数据快速输入,第4条等一段时间(10秒)后再输入
在这里插入图片描述

输出:
在这里插入图片描述

知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间(10秒)后再输入。结果是:createAccumulator打印2次,add打印4次,getResult打印2次,merge打印0次,print打印2次。原因:
1)本示例使用滚动窗口,设置时间间隔为10秒,因此第一条、第二条和第三条在第一个时间窗口,第四条在第二个时间窗口,因此2个窗口print打印2次。createAccumulator和getResult也是打印2次,因此说明他们是在窗口创建和关闭时分别被调用
2)add输出4次,说明每一次数据输入,都会执行add方法
3)支持输入、累加器以及输出都是不一样的数据类型

2.1.3 process方法演示

示例说明:通过process实现求同一个窗口下的平均cpu值

ProcessDemo类:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;public class ProcessDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> reduce = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {System.out.println("该窗口的时间:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:前3条数据快速输入,等待控制台输出process,再输入第4条数据
在这里插入图片描述

输出:
在这里插入图片描述

知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间后再输入。结果是:process打印2次,print打印2次,原因:
1)本示例使用滚动窗口,设置时间间隔为10秒,因此第一条、第二条和第三条在第一个时间窗口,第四条在第二个时间窗口,因此2个窗口print打印2次。
2)process打印2次,说明每个窗口在最后的时候process才会调用一次,并且将本次窗口的数据条数都一次性计算,并没有每条数据计算。
3)支持输入和输出都是不一样的数据类型

2.1.4 结合使用

细心的朋友还能看到reduce或者aggregate方法还有另外重载的方法,可以传入2个参数。比如reduce可以传入ReduceFunctionProcessWindowFunction,aggregate可以传入AggregateFunctionProcessWindowFunction。其实这是由于它们各自存在一定的缺点,为了弥补缺点,可以结合使用,各取有点。

注意:使用时是会将ReduceFunctionAggregateFunction最终的数据给ProcessWindowFunction,也就是每个窗口,ProcessWindowFunctionprocess方法的elements只会拿到ReduceFunctionAggregateFunction累加器最后计算得到的一条数据

示例说明:在ReduceFunction中累积CPU,然后在ProcessWindowFunction计算平均值

ReduceAndProcessDemo 类:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceAndProcessDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1=" + value1 + "value2=" + value2);value1.setCpu(value1.getCpu() + value2.getCpu());return value1;}}, new ProcessWindowFunction<>() {@Overridepublic void process(String key, ProcessWindowFunction<ServerInfo, ServerInfo, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<ServerInfo> out) throws Exception {// 平均cpu值long num = elements.spliterator().estimateSize();ServerInfo serverInfo = elements.iterator().next();System.out.println("服务器id=" + key + "在窗口的时间:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+ " 的条数=" + elements.spliterator().estimateSize()+ " 错误平均cpu值: " + (serverInfo.getCpu() / num));out.collect(serverInfo);}});// 7. 打印reduce.print("最终结果");// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:快速输入3条数据,保证3条数据在一个窗口
在这里插入图片描述

输出:
在这里插入图片描述

知识点:从上面输出结果可以看到,reduce被调用2次,这是因为总共输入3条数据。process被调用一次,而且可以看到平均值计算错误,在process中打印的结果显示获取到的条数是1条。由此可以看出process获取到的是reduce最后一条数据。

2.2 时间窗口演示

时间窗口的演示,只需要关注的是不同窗口的时间划分

2.2.3 时间滚动窗口

示例说明:通过process实现求同一个窗口下的平均cpu值

TumblingTimeWindowsDemo 类:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间滚动窗口示例*/
public class TumblingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:前3条数据快速输入,等待控制台输出数据之后,再输入第4条
在这里插入图片描述

输出:
在这里插入图片描述

知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间后再输入。结果是:process打印2次,说明被分为2个窗口:
1)注意窗口的开始时间和结束时间,第一个窗口是从0秒-10秒,第二个窗口是从10秒-20秒
2)这说明滚动窗口,窗口之间是不会重叠的。
3)另外注意的点是,窗口时间范围是左闭右开的,也就是0秒是属于第一个窗口,但10秒不属于第一个窗口,而是属于第二个窗口

2.2.2 时间滑动窗口

示例说明:通过process实现求同一个窗口下的平均cpu值

SlidingTimeWindowsDemo 类:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间滑动窗口示例*/
public class SlidingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗 - 滑动窗口,间隔为10秒,步长为5秒WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(SlidingProcessingTimeWindows.of(Duration.ofSeconds(10),Duration.ofSeconds(5)));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:先快速输入前三条,等待控制台输出之后,再输入第4条数据
在这里插入图片描述

输出:
在这里插入图片描述

知识点:从控制台可以看到process被执行了2次,说明开了2个窗口。
1)注意其每个窗口的开始时间和结束时间,比如第一个窗口是从15秒-25秒,第二个窗口是从20秒-30秒,说明在20秒-25秒是两个窗口重叠之处,也就是代码中设置的步长5秒
2)前三条数据分别被第一个窗口和第二个窗口给计算了,因此说明前3条数据输入时间落在了20秒-25秒之间。因此说明两个窗口是会重叠的。

2.2.3 时间会话窗口

示例说明:通过process实现求同一个窗口下的平均cpu值

SessionTimeWindowsDemo类:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间会话窗口演示*/
public class SessionTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(// 固定间隔时间ProcessingTimeSessionWindows.withGap(Duration.ofSeconds(10))// 动态间隔时间
//                ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ServerInfo>(){
//                    @Override
//                    public long extract(ServerInfo element) {
//                        return 0;
//                    }
//                }));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:前面2条数据在10秒钟之内输入,第2条数据输入后的10秒钟之后,再输入第3条数据
在这里插入图片描述

输出:
在这里插入图片描述

知识点:从控制台可以看到process被执行2次,因此它是开了2个窗口
1)前面两条数据被合并为一个窗口,因此平均值是2.5,第三条数据在第二个窗口,因此平均值2.8。
2)注意一下每个窗口的开始和结束时间,都不想之前滚动或者滑动窗口一样,都是固定的
3)会话窗口还支持动态规定间隔时间,你可以使用withDynamicGap方法,实现函数接口,在输入的数据中设置某个属性为间隔时间,实现动态的会话窗口。

2.3 计数窗口演示

2.3.1 计数滚动窗口

示例说明:通过process实现求同一个窗口下的平均cpu值

TumblingCountWindowsDemo 类:

import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 计数滚动窗口示例*/
public class TumblingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗 - 活动窗口,间隔为3条数据WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(3);// 6. 计算 - 计算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {ServerInfo next = iterator.next();sum = sum + next.getCpu();}out.collect("cpu平均值=" + (sum / num) + " 条数=" + num);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:输入前3条,再输入后3条
在这里插入图片描述

输出:(平均值由于精度问题不用管)
在这里插入图片描述

知识点:示例中设置滚动窗口,每个窗口3条数据。输入了6条数据,process调用了2次
1)这样就知道滚动窗口是每3条会新建一个窗口,窗口之间不重叠,与代码示例中设置的一样

2.3.2 计数滑动窗口

示例说明:通过process实现求同一个窗口下的平均cpu值

SlidingCountWindowsDemo 类:

import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 计数滑动窗口示例*/
public class SlidingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗 - 活动窗口,间隔为3条数据WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(4,2);// 6. 计算 - 计算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {sum = sum + iterator.next().getCpu();}out.collect("cpu平均值=" + (sum / num) + " 条数=" + num);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}

输入:先输入2条,等控制台输出后再输入第3条和第4条,等控制台输出后再输入第5条和第6条
在这里插入图片描述

输出:
在这里插入图片描述

知识点:总共输入6条数据,先输入2条,等一下再输入第3条和第4条,等一下再输入第5条和第6条。process调用了3次
1)可以看出它是会按照步长来调用process,这是因为第1条和第2条会是前一个窗口与当前窗口重叠

结语:本章说明了不同的窗口类型,还通过代码演示,更深刻了解窗口的工作原理。接下来一章将接触更底层的Flink如何实现不同窗口的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/80578.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大疆卓驭嵌入式面经及参考答案

FreeRTOS 有哪 5 种内存管理方式&#xff1f; heap_1.c&#xff1a;这种方式简单地在编译时分配一块固定大小的内存&#xff0c;在整个运行期间不会进行内存的动态分配和释放。它适用于那些对内存使用需求非常明确且固定&#xff0c;不需要动态分配内存的场景&#xff0c;优点是…

Java 线程池原理

Java 线程池是一种管理和复用线程的机制&#xff0c;其原理如下&#xff1a; 核心概念 线程池的初始化 &#xff1a;在创建线程池时&#xff0c;需要设置一些关键参数&#xff0c;如核心线程数&#xff08;corePoolSize&#xff09;、最大线程数&#xff08;maximumPoolSize&am…

大模型都有哪些超参数

大模型的超参数是影响其训练效果、性能和泛化能力的关键设置,可分为以下几大类别并结合实际应用进行详细说明: 一、训练过程相关超参数 学习率(Learning Rate) 作用:控制参数更新的步长,直接影响收敛速度和稳定性。过高会导致震荡或过拟合,过低则收敛缓慢。调整策略:初…

路由器断流排查终极指南:从Ping测试到Wireshark抓包5步定位法

测试路由器是否出现“断流”&#xff08;网络连接间歇性中断&#xff09;&#xff0c;需通过多维度排查硬件、软件及外部干扰因素。以下是详细步骤指南&#xff1a; 一、基础环境准备 设备连接 有线测试&#xff1a;用网线将电脑直接连接路由器LAN口&#xff0c;排除WiFi干扰。…

低代码开发:开启软件开发的新篇章

摘要 低代码开发作为一种新兴的软件开发方式&#xff0c;正在迅速改变传统软件开发的模式和效率。它通过可视化界面和预设的模板&#xff0c;使非专业开发者也能够快速构建应用程序&#xff0c;极大地降低了开发门槛和成本。本文将深入探讨低代码开发的定义、优势、应用场景以及…

基于Django汽车数据分析大屏可视化系统项目

基于Django汽车数据分析大屏可视化系统项目 一、项目概述 本项目是一个基于 Python 的汽车数据分析大屏可视化系统&#xff0c;旨在通过直观的可视化界面展示汽车相关数据&#xff0c;帮助用户更好地理解和分析汽车市场动态、车辆性能等信息。系统采用前后端分离的架构&#…

WebRTC通信原理与流程

1、服务器与协议相关 1.1 STUN服务器 图1.1.1 STUN服务器在通信中的位置图 1.1.1 STUN服务简介 STUN&#xff08;Session Traversal Utilities for NAT&#xff0c;NAT会话穿越应用程序&#xff09;是一种网络协议&#xff0c;它允许位于NAT&#xff08;或多重 NAT&#xff09;…

Beta分布--贝叶斯建模概率或比例常用分布

Beta分布是一种定义在区间 ([0, 1]) 上的连续概率分布&#xff0c;常用于描述比例或概率的不确定性。它的形状由两个正参数 (\alpha)&#xff08;alpha&#xff09;和 (\beta)&#xff08;beta&#xff09;控制&#xff0c;能够呈现多种形态&#xff08;如对称、偏态、U型等&am…

深度学习算法:开启智能时代的钥匙

引言 深度学习作为机器学习的一个分支&#xff0c;近年来在图像识别、自然语言处理、语音识别等多个领域取得了革命性的进展。它的核心在于构建多层的神经网络&#xff0c;通过模仿人脑处理信息的方式&#xff0c;让机器能够从数据中学习复杂的模式。 深度学习算法的基本原理…

深入了解linux系统—— 自定义shell

shell的原理 我们知道&#xff0c;我们程序启动时创建的进程&#xff0c;它的父进程都是bash也就是shell命令行解释器&#xff1b; 那bash都做了哪些工作呢&#xff1f; 根据已有的知识&#xff0c;我们可以简单理解为&#xff1a; 输出命令行提示符获取并解析我们输入的指令…

Redux和Vuex

为什么React和Vue需要Redux和Vuex 状态管理需求的演变 #mermaid-svg-GaKl3pkZ82yc1m8E {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GaKl3pkZ82yc1m8E .error-icon{fill:#552222;}#mermaid-svg-GaKl3pkZ82yc1m8E…

Kubernetes排错(十三):Pod间偶发超时问题排查

在微服务架构中&#xff0c;Pod间偶发的通信超时是最令人头疼的问题之一。本文将通过生产环境中的真实案例&#xff0c;手把手教你定位这类"幽灵问题"。 一、快速定位问题方向&#xff08;5分钟缩小范围&#xff09; 1. 基础检查三板斧 # 检查Service与Endpoint映…

Nginx 源码安装成服务

一、环境准备 一台装有 CentOS 7.9 的虚拟机&#xff08;IP: 192.168.40.81&#xff09;nginx-1.21.6.tar.gz 安装包一个&#xff08;版本随意&#xff09; 二、安装 1&#xff09;解压 nginx-1.21.6.tar.gz tar -xzvf nginx-1.21.6.tar.gz -tar&#xff1a;这是一个在 Linu…

L51.【LeetCode题解】438. 找到字符串中所有字母异位词(四种方法)

目录 1.题目 2.分析 暴力解法 方法1:排序(超时) 方法2:哈希表(险过) ★判断两个哈希表是否相同算法(通用方法,必须掌握) 能相等的前提:两个哈希表的大小相等 哈希表有迭代器,可以使用范围for从头到尾遍历 提交结果 优化方法:定长滑动窗口 提交结果 使用哈希数组更快…

Qt模块化架构设计教程 -- 轻松上手插件开发

概述 在软件开发领域,随着项目的增长和需求的变化,保持代码的可维护性和扩展性变得尤为重要。一个有效的解决方案是采用模块化架构,尤其是利用插件系统来增强应用的功能性和灵活性。Qt框架提供了一套强大的插件机制,可以帮助开发者轻松实现这种架构。 模块化与插件系统 模…

深入理解 HashMap 的索引计算:右移与异或的作用

在 Java 中&#xff0c;HashMap 是一种高效的数据结构&#xff0c;它通过将键映射到数组中的索引位置来实现快速的插入和查找。但之前看源码总是理解到它要hash之后散列到数组中某一个位置&#xff0c;但却从未深究它究竟怎么散列的&#xff0c;如果不够散那就意味着hash冲突增…

overleaf较高级的细节指令

换行命令 原来代码是将三个矩阵表达式在同一行显示&#xff0c;使用aligned环境&#xff08;需引入amsmath宏包&#xff0c;一般文档导言区默认会引入&#xff09;&#xff0c;把三个矩阵的定义分别放在不同行&#xff0c;可通过\\换行。 对齐命令 &放在等号前&#xff0…

LiteLLM:统一API接口,让多种LLM模型调用如臂使指

在人工智能迅猛发展的今天,各种大语言模型(LLM)层出不穷。对开发者而言,如何高效集成和管理这些模型成为一个棘手问题。LiteLLM应运而生,它提供了一个统一的API接口,让开发者可以轻松调用包括OpenAI、Anthropic、Cohere等在内的多种LLM模型。本文将深入介绍LiteLLM的特性、…

Google语法整理

以下是从整理出的 Google 语法&#xff1a; site&#xff1a;指定域名&#xff0c;如 “apache site:bbs.xuegod.cn”&#xff0c;可查询网站的收录情况 。 inurl&#xff1a;限定在 url 中搜索&#xff0c;如 “inurl:qq.txt”&#xff0c;可搜索 url 中包含特定内容的页面&a…

python 写一个工作 简单 番茄钟

1、图 2、需求 番茄钟&#xff08;Pomodoro Technique&#xff09;是一种时间管理方法&#xff0c;由弗朗西斯科西里洛&#xff08;Francesco Cirillo&#xff09;在 20 世纪 80 年代创立。“Pomodoro”在意大利语中意为“番茄”&#xff0c;这个名字来源于西里洛最初使用的一个…