【2】flink数据流转换算子

【README】

本文记录了flink对数据的转换操作,包括

  1. 基本转换,map,flatMap,filter;
  2. 滚动聚合(min minBy max maxBy sum);
  3. 规约聚合-reduce;
  4. 分流;
  5. connect连接流;
  6. union合流;
  7. 富函数;
  8. 重分区;

本文使用的flink为 1.14.4 版本;maven依赖如下:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.4</version></dependency>

本文部分内容参考了 flink 官方文档:

概览 | Apache Flink算子 # 用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。这部分内容将描述 Flink DataStream API 中基本的数据转换 API,数据转换后各种数据分区方式,以及算子的链接策略。数据流转换 # Map # DataStream → DataStream # 输入一个元素同时输出一个元素。下面是将输入流中元素数值加倍的 map function:Java DataStream dataStream = //... dataStream.map(new MapFunction() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } }); Scala dataStream.map { x => x * 2 } Python data_stream = env.from_collection(collection=[1, 2, 3, 4, 5]) data_stream.map(lambda x: 2 * x, output_type=Types.,>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/operators/overview/


【1】基本转换算子

包括 map-转换, flatMap-打散,filter-过滤;

1)代码如下:

/*** @Description flink对数据流的基本转换* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class TransformTest1_Base {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从kafka读取数据DataStream<String> baseStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 1-map-转换或映射或函数; 把string转为长度输出
//        DataStream<Integer> mapStream =  baseStream.map(x->x.length());DataStream<Integer> mapStream =  baseStream.map(String::length);// 2-flatMap-打散-按照逗号分割字段DataStream<String> flatMapStream = baseStream.flatMap((String raw, Collector<String> collector)->{for (String rd : raw.split(",")) {collector.collect(rd);}}).returns(Types.STRING);// 3-filter-过滤-筛选 sensor_1 开头的结束DataStream<String> filterStream = baseStream.filter(x->x.startsWith("sensor_1"));// 打印输出mapStream.print("mapStream");flatMapStream.print("flatMapStream");filterStream.print("filterStream");// 执行env.execute("BaseTransformStreamJob");}
}

sensor 文本文件如下:

sensor_1,12341561,36.1
sensor_2,12341562,33.5
sensor_3,12341563,39.9
sensor_1,12341573,43.1

打印结果:

mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341561
flatMapStream> 36.1
filterStream> sensor_1,12341561,36.1
mapStream> 22
flatMapStream> sensor_2
flatMapStream> 12341562
flatMapStream> 33.5
mapStream> 22
flatMapStream> sensor_3
flatMapStream> 12341563
flatMapStream> 39.9
mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341573
flatMapStream> 43.1
filterStream> sensor_1,12341573,43.1


【2】滚动聚合算子

keyBy算子-根据key对数据流分组,因为聚合前必须前分组,类似于sql的group by;

keyBy算子的作用

  • 逻辑把一个数据流拆分为多个分区(但还是同一个流),每个分区包含相同key(相同hash)的元素,底层对key求hash来实现;
  • 在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。

keyBy可以形成 KeyedStream

然后滚动聚合算子可以对 KeyStream 进行操作,滚动聚合算子如下:

  • sum
  • min
  • max
  • minBy
  • maxBy

【2.1】代码示例

/*** @Description 滚动聚合算子* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class TransformTest2_RollingAgg {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从kafka读取数据DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 转换为 SensorReader pojo类型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// keyBy算子对数据流分组,并做滚动聚合(单字段分组)KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);// keyBy 多字段分组
//        KeyedStream<SensorReading, Tuple1<String>> keyedStream = sensorStream.keyBy(new KeySelector<SensorReading, Tuple1<String>>() {
//            @Override
//            public Tuple1<String> getKey(SensorReading sensorReading) throws Exception {
//                return Tuple1.of(sensorReading.getId());
//            }
//        });// max聚合DataStream<SensorReading> maxTempratureStream = keyedStream.max("temperature");// maxBy 聚合DataStream<SensorReading> maxbyTempratureStream = keyedStream.maxBy("temperature");// 打印输出maxTempratureStream.print("maxTempratureStream");// 打印输出maxbyTempratureStream.print("maxbyTempratureStream");// 执行env.execute("maxTempratureStreamJob");}
}

sensor文本内容:

sensor_1,11,36.1
sensor_2,21,33.1
sensor_1,12,36.2
sensor_1,13,36.3
sensor_2,22,33.2

max聚合打印结果:

max> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
max> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.2}
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.3}
max> SensorRd{id='sensor_2', timestamp=21, temperature=33.2}

maxBy聚合打印结果:

maxBy> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
maxBy> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
maxBy> SensorRd{id='sensor_1', timestamp=12, temperature=36.2}
maxBy> SensorRd{id='sensor_1', timestamp=13, temperature=36.3}
maxBy> SensorRd{id='sensor_2', timestamp=22, temperature=33.2}

小结,max与maxBy区别:

  • max:把聚合字段(最大温度值)取出来,其他字段和第一条记录保持一致;
  • maxBy:把聚合字段(最大温度值)取出来,且连同最大温度值所在记录的其他字段一并取出来;

同理 min与minby,本文不再演示;

补充: 聚合时要先分组,可以根据单字段分组,也可以根据多个字段分组

上述代码注释部分给出了多个字段分组的例子,一个组记录称为Tuple,元组

1个字段叫 Tuple1,2个字段叫Tuple2;....


【2.2】规约聚合-reduce

定义:

在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

场景:根据sensorid分组后,形成keyedStream,然后查询最大温度,且最新时间戳;即多个聚合算子;

代码

/*** @Description reduce规约聚合算子 * @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class TransformTest3_Reduce {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从kafka读取数据DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 转换为 SensorReader pojo类型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// keyBy算子对数据流分组,并做滚动聚合(单字段分组)KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);// reduce规约聚合-查询最大温度,且最新时间戳DataStream<SensorReading> reduceStream = keyedStream.reduce((a,b)->new SensorReading(a.getId(), Math.max(a.getTimestamp(),b.getTimestamp()), Math.max(a.getTemperature(),b.getTemperature())));// 打印输出reduceStream.print("reduceStream");// 执行env.execute("reduceStreamJob");}
}

sensor文本:

sensor_1,11,36.1
sensor_2,21,33.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,31.2

打印结果:

reduceStream> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
reduceStream> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
reduceStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
reduceStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
reduceStream> SensorRd{id='sensor_2', timestamp=22, temperature=33.1}


【3】分流(把一个流切分为多个流)

flink 1.14.4 移除了 split 算子,refer2  https://issues.apache.org/jira/browse/FLINK-19083

转而使用 side output 侧输出实现,refer2

Side Outputs | Apache Flink

【3.1】 切分流(flink移除了split方法,需要使用 side output 来实现流切分)

1)代码,启动大于30度算高温,否则低温;

通过实现  ProcessFunction 来实现;

public class TransformTest4_SplitStream {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从kafka读取数据DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 转换为 SensorReader pojo类型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// 1,分流,按照温度值是否大于30度,分为两条流-高温和低温OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") {};OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() {@Overridepublic void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception {// 把数据发送到侧输出context.output(record.getTemperature()>30? highTag : lowTag, record);// 把数据发送到常规输出collector.collect(record);}});// 2, 选择流打印输出splitStream.getSideOutput(highTag).print("high");splitStream.getSideOutput(lowTag).print("low");// 执行env.execute("reduceStreamJob");}
}

sensor文本:

sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

打印结果:

high> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
low> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
high> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
high> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
low> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}

以上分流代码refer2  Process function: a versatile tool in Flink datastream API | Develop Paper


【4】connect 连接流

1)定义: 把多个流连接为一个流,叫做连接流,连接流中的子流的各自元素类型可以不同

2)步骤:

  • 把2个流 connect 连接再一起形成 ConnectedStream;
  • 把连接流 通过 map 得到数据流;

代码:

/*** @Description connect-连接流* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/
public class TransformTest5_ConnectStream {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从kafka读取数据DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 转换为 SensorReader pojo类型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// 1,分流,按照温度值是否大于30度,分为两条流-高温和低温OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") {};OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() {@Overridepublic void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception {// 把数据发送到侧输出context.output(record.getTemperature() > 30 ? highTag : lowTag, record);// 把数据发送到常规输出collector.collect(record);}});// 得到高温和低温流DataStream<SensorReading> highStream = splitStream.getSideOutput(highTag);DataStream<SensorReading> lowStream = splitStream.getSideOutput(lowTag);// 2 把2个流连接为1个流(子流1的元素为3元组,子流2的元素为2元组)ConnectedStreams<SensorReading, SensorReading> connectedStreams = highStream.connect(lowStream);DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<SensorReading, SensorReading, Object>() {@Overridepublic Object map1(SensorReading rd) throws Exception {return new Tuple3<>(rd.getId(), rd.getTemperature(), "high"); // map1 作用于第1个流 highStream}@Overridepublic Object map2(SensorReading rd) throws Exception {return new Tuple2<>(rd.getId(), rd.getTemperature()); // map2 作用于第2个流 lowStream}});// 3 打印结果resultStream.print("connectedStream");// 执行env.execute("connectedStreamJob");}
}

sensor文本:

sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

打印结果:

connectedStream> (sensor_1,36.1,high)
connectedStream> (sensor_2,23.1)
connectedStream> (sensor_1,36.2,high)
connectedStream> (sensor_2,11.2)
connectedStream> (sensor_1,30.3,high)


【5】合流-union

上述connect,只能连接两条流,如果要合并多条流,connect需要多次连接,不太适合;

如果要合并多条流,需要用 union,前提是 多个流的元素数据类型需要相同;

1)代码

 // 2 把3个流合并为1个流DataStream<SensorReading> inputStream2 = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor2.txt").map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});DataStream<SensorReading> unionStream =  highStream.union(lowStream,inputStream2);// 3 打印结果unionStream.print("unionStream");// 执行env.execute("unionStreamJob");

打印结果:

unionStream> SensorRd{id='sensor2_1', timestamp=11, temperature=36.1}
unionStream> SensorRd{id='sensor2_2', timestamp=21, temperature=23.1}
unionStream> SensorRd{id='sensor2_1', timestamp=32, temperature=36.2}
unionStream> SensorRd{id='sensor2_1', timestamp=23, temperature=30.3}
unionStream> SensorRd{id='sensor2_2', timestamp=22, temperature=11.2}
unionStream> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
unionStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
unionStream> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
unionStream> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
unionStream> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}


【6】自定义函数 UDF user-defined function

flink 暴露了所有udf 函数的接口,如MapFunction, FilterFunction, ProcessFunction等;可以理解为 java8引入的 函数式接口;

可以参考官方的udf文档:

ck自定义函数 | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/

【6.1】富函数

1)复函数可以获取上下文信息,而普通函数则不行;

代码:

/*** @Description 富函数* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/
public class TransformTest7_RichFunction {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从文件读取数据DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 转换为 SensorReader pojo类型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// 自定义富函数DataStream<Tuple3<String,Integer,Integer>> richMapStream = sensorStream.map(new MyRichMapFunction());// 3 打印结果richMapStream.print("richMapStream");// 执行env.execute("richMapStreamJob");}// 富函数类static class MyRichMapFunction extends RichMapFunction<SensorReading, Tuple3<String, Integer, Integer>> {@Overridepublic Tuple3<String, Integer, Integer> map(SensorReading record) throws Exception {// 富函数可以获取运行时上下文的属性  getRuntimeContext() ,普通map函数则不行return new Tuple3<String, Integer, Integer>(record.getId(), record.getId().length(), getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic void open(Configuration parameters) throws Exception {// 初始化工作,一般是定义状态, 或者建立数据库连接System.out.println("open db conn");}@Overridepublic void close() throws Exception {// 关闭连接,清空状态System.out.println("close db conn");}}
}

sensor文本:

sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

 打印结果:

open db conn
open db conn
richMapStream:1> (sensor_1,8,0)
richMapStream:2> (sensor_1,8,1)
richMapStream:1> (sensor_2,8,0)
richMapStream:2> (sensor_2,8,1)
richMapStream:1> (sensor_1,8,0)
close db conn
close db conn

从打印结果可以看出,每个子任务(线程)都会执行 open close方法 ,tuple3中的第3个字段是 执行上下文的任务id(这是富函数才可以获得上下文);


【7】flink中的数据重分区

1)flink中的分区指的是: taskmanager中的槽,即线程

分区操作有:

  • shuffle-洗牌乱分区;
  • keyBy-按照key分区;
  • global 把数据转到第1个分区

2)代码 :

/*** @Description 重分区* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/
public class TransformTest8_Partition2 {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从文件读取数据DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 转换为 SensorReader pojo类型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// 1-shuffle 洗牌(乱分区)DataStream<SensorReading> shuffleStream = sensorStream.shuffle();shuffleStream.print("shuffleStream");// 2-keyby 按照key分区DataStream<SensorReading> keybyStream = sensorStream.keyBy(SensorReading::getId);
//        keybyStream.print("keybyStream");// 3-global 把数据转到第1个分区DataStream<SensorReading> globalStream = sensorStream.global();
//        globalStream.print("globalStream");// 执行env.execute("partitionJob");}
}

sensor文本:

sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

原生分区结果:(重分区前)

rawStream:1> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
rawStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
rawStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
rawStream:2> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
rawStream:1> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}

shuffle-洗牌乱分区结果:

shuffleStream:2> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
shuffleStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
shuffleStream:2> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
shuffleStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
shuffleStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}

keyby-按照key进行分区的结果:

keybyStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
keybyStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
keybyStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
keybyStream:2> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
keybyStream:2> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}

global-把数据转到第1个分区的打印结果:

globalStream:1> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
globalStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
globalStream:1> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
globalStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
globalStream:1> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329563.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第三篇 Entity Framework Plus 之 Query Cache

离上一篇博客&#xff0c;快一周&#xff0c;工作太忙&#xff0c;只能利用休息日来写一些跟大家分享&#xff0c;Entity Framework Plus 组件系列文章&#xff0c;之前已经写过两篇 第一篇 Entity Framework Plus 之 Audit 第二篇 Entity Framework Plus 之 Query Future 计划…

wireshark tcp抓包分析_网络分析系列之八_使用Wireshark抓包

通过前面的部分&#xff0c;我们对Wireshark界面主体内容有了大致了解。这一节主要介绍如何抓包&#xff0c;抓包后的界面显示&#xff08;因为Wireshark打开数据包后又是另一副界面&#xff09;。如何保存或导出抓取的报文等内容。第一次抓包现在可以开始你的第一次数据包捕获…

Oracle入门(五C)之68个系统变量的key和默认value

Oracle的68个系统变量的key和默认valueappinfo 为 OFF 并且已设置为 "SQL*Plus" arraysize 15 autocommit OFF autoprint OFF autorecovery OFF autotrace OFF blockterminator "." (hex 2e) btitle OFF 为下一条 SELECT 语句的前几个字符 …

【3】flink sink

【README】 本文记录了flink sink操作&#xff0c;输出目的存储器&#xff08;中间件&#xff09;包括 kafka&#xff1b;es&#xff1b;db&#xff1b;等等有很多&#xff1b;本文只给出了 sink2kafka的代码&#xff1b; 本文使用的flink为 1.14.4 版本&#xff1b; 本文部…

【给中高级开发者】构建高性能ASP.NET应用的几点建议

如果你在构建一个面向公众的web站点&#xff0c;那么在项目结尾时你想要实现的就是web负载性能良好。这意味着&#xff0c;你要确保你的产品在高负载下&#xff08;50个并发用户或者每秒200个用户等等&#xff09;能够运行&#xff0c;即使你认为此时不会有那么大的负载。久而久…

Oracle入门(三B)之11G新特性 SYSASM 角色用来管理ASM

转载自 oracle 11G新特性--SYSASM 角色--用来管理ASM SYSASM 角色自动存储管理 (ASM) 是在 Oracle 数据库 10g 中引入的&#xff0c;它在某种程度上打破了 DBA 和系统管理员之间在存储分配功能上的界限。ASM 实例由 DBA 管理&#xff0c;正如常规的 DBA 工作需要以 SYSDBA 角色…

stm32编程入门_电子设计与单片机编程书籍资料推荐

有些同学让我推荐些入门书籍&#xff0c;尝试写写&#xff1a;注&#xff1a;请多看下目录与介绍&#xff0c;网络上也有一些读书笔记和推荐&#xff0c;自行判断是否符合现有层次和水平&#xff0c;再决定是否购买。针对零基础、非专业的电子爱好者&#xff0c;我的推荐学习步…

.NET Core 1.0 RC2 历险之旅

文章背景&#xff1a;对于.NET Core大家应该并不陌生, 从它被 宣布 到现在已经有1-2年的时间了&#xff0c;其比较重要的一个版本1.0 RC2 也即将发布。.Net Core从一个一个的测试版到现在的RC2&#xff0c;经历了很多个大大小小的变化。特别是在RC1到RC2的更新之中&#xff0c;…

【4.1】flink窗口算子的trigger触发器和Evictor清理器

【README】 本文记录了 窗口算子的触发器trigger和 evictor清理器&#xff1b; trigger触发器&#xff1a;决定了一个窗口&#xff08;由 window assigner 定义&#xff09;何时可以被 window function 处理&#xff1b;evictor清理器&#xff1a; evictor 可以在 trigger 触…

ue4 运行禁用鼠标_[UE4] VS code使用LuaPanda断点调试

luaPanda安装搜索luapanda 点击下载安装或者打开下面的地址&#xff0c;点击会提示打开vscode进行安装luaPanda下载地址&#xff1a;https://marketplace.visualstudio.com/items?itemNamestuartwang.luapanda&ssrfalse#review-details在gethbub中下载LuaPanda.lua文件&am…

Oracle入门(五B)之desc命令

翻译自 DESCRIBE describe命令 列出指定表的列定义&#xff0c;视图或同义词&#xff0c;或指定函数或存储过程的详述。 语法&#xff1a;desc[ribe] {[模式.]对象[链接串]} 模式 表示对象驻留的架构。如果省略架构&#xff0c;SQL*Plus假定拥有自己的对象。 对象 表示要描…

【4】flink window窗口算子

【README】 本文记录了 窗口算子操作&#xff1b;本文使用的flink为 1.14.4 版本&#xff1b;本文部分内容总结自 flink 官方文档&#xff1a; 窗口 | Apache Flink窗口 # 窗口&#xff08;Window&#xff09;是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中…

Microsoft将在UWP上支持React Native,同时为VS Code添加工具软件

Microsoft和Facebook日前宣布React Native的下一个目标平台是Universal Windows Platform&#xff08;UWP&#xff09;。 对于已经在多个设备平台上使用React Native的开发者来说&#xff0c;这是个好消息&#xff0c;因为这让他们可以在Windows上重用组件。对于UWP的开发者来说…

Xamarin Evolve 2016 Keynote回顾

编者语&#xff1a;距离上一次Xamarin Evolve 大会足足有两年时间了&#xff0c;这两年整个行业都在变化&#xff0c;Xamarin是整个.NET行业的表表者。两年过去Xamarin终于并入微软&#xff0c;免费了&#xff0c;也开源了。还有什么大招呢&#xff1f;刚结束的Xamarin Evolve给…

【5】flink窗口与水位线watermark例子

【README】 0&#xff09;本文编写了多个flink水位线watermark的代码例子&#xff0c;加深对watermark的理解 &#xff1b; 1&#xff09;时间分类 Event Time&#xff1a; 事件创建的时间&#xff08;事件发生时间&#xff09;&#xff1b;Ingestion Time&#xff1a;数据进…

技术人生,专家本色——采访张善友老师后的一点感受

距离Xamarin Evolve开幕还有不到三个小时。Miguel deIcaza和他的团队又将发布周年升级平台Xamarin 5。作为一个关注Mono/Xamarin多年的老粉丝&#xff0c;这个时候心情是蛮激动的。我想&#xff0c;刚刚作客.NET FM第五期“来者何人”专访系列的张善友老师&#xff0c;一定也是…

(翻)为什么要训练人工神经网络

【README】 本文翻译自 https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73 【1】介绍 今天我将从非常简短的神经网络介绍开始&#xff0c;这足以理解我将要讨论的…

使用VS Code开发 调试.NET Core 应用程序

使用VS Code开发 调试.NET Core RC2应用程序,由于.NET Core 目前还处于预览版。 本文使用微软提供的示例进行开发及调试。 https://github.com/aspnet/cli-samples .NET Core 介绍及说明&#xff1a; https://github.com/dotnet/cli http://dotnet.github.io/getting-started/ …

(转)简单神经网络(权值阈值训练)

本文转自&#xff1a; 神经网络入门 - 阮一峰的网络日志https://www.ruanyifeng.com/blog/2017/07/neural-network.html 眼下最热门的技术&#xff0c;绝对是人工智能。 人工智能的底层模型是"神经网络"&#xff08;neural network&#xff09;。许多复杂的应用&am…

Oracle入门(五A)之conn命令

一、connect命令将给定的用户名连接到Oracle数据库。当你运行一个连接命令、站点配置文件、Galgn.SQL和用户配置文件&#xff0c;按顺序处理Login .SQL。连接不重发如果初始连接不成功&#xff0c;请使用用户名或密码。语法&#xff1a; conn[ect] [{登录串|/|代理串} [AS…