【1】flink-source读取数据

【README】

本文记录了flink读取不同数据源的编码方式,数据源包括;

  • 集合(元素列表);
  • 文件
  • kafka;
  • 自定义数据源;

本文使用的flink为 1.14.4 版本;maven依赖如下:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.4</version></dependency>

【1】从集合读取数据

【1.1】代码

/*** @Description flink从集合读取数据 * @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class SourceTest1_Collection {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从集合读取数据DataStream<SensorReading> sensorStream = env.fromCollection(Arrays.asList(new SensorReading("sensor_1", 12341561L, 36.1), new SensorReading("sensor_2", 12341562L, 33.5), new SensorReading("sensor_3", 12341563L, 39.9), new SensorReading("sensor_4", 12341564L, 31.2)));// 打印输出sensorStream.print("sensor");// 从元素列表读取数据DataStream<Integer> intStream = env.fromElements(1, 2, 3, 7, 8, 2, 100, 34, 3);intStream.print("intStream");// 执行env.execute("sensorJob");}
}
/*** @Description 传感器温度读数* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class SensorReading {private String id;private Long timestamp;private double temperature;public SensorReading() {}public SensorReading(String id, Long timestamp, double temperature) {this.id = id;this.timestamp = timestamp;this.temperature = temperature;}

打印结果:

intStream:6> 8
intStream:5> 7
intStream:7> 2
sensor:8> SensorReading{id='sensor_2', timestamp=12341562, temperature=33.5}
intStream:1> 34
sensor:1> SensorReading{id='sensor_3', timestamp=12341563, temperature=39.9}
intStream:3> 2
intStream:4> 3
intStream:2> 1
intStream:2> 3
sensor:7> SensorReading{id='sensor_1', timestamp=12341561, temperature=36.1}
intStream:8> 100
sensor:2> SensorReading{id='sensor_4', timestamp=12341564, temperature=31.2}


【2】 从文件读取数据

【2.1】代码

/*** @Description flink从文件读取数据* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class SourceTest2_File {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 设置全局并行度为1// 从文件读取数据DataStream<String> fileStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 打印输出fileStream.print("sensor");// 执行env.execute("sensorJob");}
}

sensor.txt 如下:

sensor_1,12341561,36.1
sensor_2,12341562,33.5
sensor_3,12341563,39.9
sensor_4,12341564,31.2

打印结果:

sensor> sensor_1,12341561,36.1
sensor> sensor_2,12341562,33.5
sensor> sensor_3,12341563,39.9
sensor> sensor_4,12341564,31.2


【3】从kafka读取数据

1)引入maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.4</version></dependency>

2)flink作为消费者连接到kafka

/*** @Description flink从kafka读取数据* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class SourceTest3_kafka {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 设置全局并行度为1// 创建flink连接kafkaKafkaSource kafkaSource = KafkaSource.<String>builder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics("hello0415").setGroupId("flink").build();DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");// 打印输出kafkaStream.print("kafkaStream");// 执行env.execute("kafkaStreamJob");}
}
public enum KafkaConsumerProps {_INS;/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();private KafkaConsumerProps() {/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");}public Properties getProps() {return props;}
}

3)打开 kafka生产者命令行:

kafka-console-producer.sh --broker-list centos201:9092,centos202:9092,centos203:9092 --topic hello0415

补充: 关于kafka集群,可以参见我的文章 :

kafka集群搭建_PacosonSWJTU的博客-CSDN博客


 【4】自定义数据源

自定义数据源,可以用于自测 flinkjob 的场景中;

public class SourceTest4_UDF {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 设置全局并行度为1// 创建自定义数据源DataStream<SensorReading> udfStream = env.addSource(new SourceFunction<SensorReading>() {int i = 1;int mod = 1000;Random random = new Random();boolean runnable = true;@Overridepublic void run(SourceContext<SensorReading> sourceContext) throws Exception {while (runnable) {sourceContext.collect(new SensorReading(String.valueOf(i++ % mod + 1), System.currentTimeMillis(), 30 + random.nextGaussian()));if (i % 5 == 0) TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {runnable = false;}});// 打印输出udfStream.print("udfStream");// 执行env.execute("udfStreamJob");}
}

打印结果:

udfStream:4> SensorReading{id='5', timestamp=1650030559865, temperature=31.015354380481117}
udfStream:1> SensorReading{id='2', timestamp=1650030559853, temperature=29.23797321841027}
udfStream:3> SensorReading{id='4', timestamp=1650030559865, temperature=31.148402161461384}
udfStream:2> SensorReading{id='3', timestamp=1650030559865, temperature=30.082462570224305}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329565.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Oracle入门(二)之服务启动bat

转载自 批处理&#xff08;bat文件&#xff09;自动启动/关闭oracle服务 批处理&#xff08;bat文件&#xff09; 自动启动/关闭oracle服务 判断oracle 服务状态如果服务处于启动状态&#xff0c;就关闭服务&#xff1b;如果服务处于关闭状态&#xff0c;就启动服务。 ECHO OFF…

【2】flink数据流转换算子

【README】 本文记录了flink对数据的转换操作&#xff0c;包括 基本转换&#xff0c;map&#xff0c;flatMap&#xff0c;filter&#xff1b;滚动聚合&#xff08;min minBy max maxBy sum&#xff09;&#xff1b;规约聚合-reduce&#xff1b;分流&#xff1b;connect连接流…

第三篇 Entity Framework Plus 之 Query Cache

离上一篇博客&#xff0c;快一周&#xff0c;工作太忙&#xff0c;只能利用休息日来写一些跟大家分享&#xff0c;Entity Framework Plus 组件系列文章&#xff0c;之前已经写过两篇 第一篇 Entity Framework Plus 之 Audit 第二篇 Entity Framework Plus 之 Query Future 计划…

wireshark tcp抓包分析_网络分析系列之八_使用Wireshark抓包

通过前面的部分&#xff0c;我们对Wireshark界面主体内容有了大致了解。这一节主要介绍如何抓包&#xff0c;抓包后的界面显示&#xff08;因为Wireshark打开数据包后又是另一副界面&#xff09;。如何保存或导出抓取的报文等内容。第一次抓包现在可以开始你的第一次数据包捕获…

Oracle入门(五C)之68个系统变量的key和默认value

Oracle的68个系统变量的key和默认valueappinfo 为 OFF 并且已设置为 "SQL*Plus" arraysize 15 autocommit OFF autoprint OFF autorecovery OFF autotrace OFF blockterminator "." (hex 2e) btitle OFF 为下一条 SELECT 语句的前几个字符 …

【3】flink sink

【README】 本文记录了flink sink操作&#xff0c;输出目的存储器&#xff08;中间件&#xff09;包括 kafka&#xff1b;es&#xff1b;db&#xff1b;等等有很多&#xff1b;本文只给出了 sink2kafka的代码&#xff1b; 本文使用的flink为 1.14.4 版本&#xff1b; 本文部…

【给中高级开发者】构建高性能ASP.NET应用的几点建议

如果你在构建一个面向公众的web站点&#xff0c;那么在项目结尾时你想要实现的就是web负载性能良好。这意味着&#xff0c;你要确保你的产品在高负载下&#xff08;50个并发用户或者每秒200个用户等等&#xff09;能够运行&#xff0c;即使你认为此时不会有那么大的负载。久而久…

Oracle入门(三B)之11G新特性 SYSASM 角色用来管理ASM

转载自 oracle 11G新特性--SYSASM 角色--用来管理ASM SYSASM 角色自动存储管理 (ASM) 是在 Oracle 数据库 10g 中引入的&#xff0c;它在某种程度上打破了 DBA 和系统管理员之间在存储分配功能上的界限。ASM 实例由 DBA 管理&#xff0c;正如常规的 DBA 工作需要以 SYSDBA 角色…

stm32编程入门_电子设计与单片机编程书籍资料推荐

有些同学让我推荐些入门书籍&#xff0c;尝试写写&#xff1a;注&#xff1a;请多看下目录与介绍&#xff0c;网络上也有一些读书笔记和推荐&#xff0c;自行判断是否符合现有层次和水平&#xff0c;再决定是否购买。针对零基础、非专业的电子爱好者&#xff0c;我的推荐学习步…

.NET Core 1.0 RC2 历险之旅

文章背景&#xff1a;对于.NET Core大家应该并不陌生, 从它被 宣布 到现在已经有1-2年的时间了&#xff0c;其比较重要的一个版本1.0 RC2 也即将发布。.Net Core从一个一个的测试版到现在的RC2&#xff0c;经历了很多个大大小小的变化。特别是在RC1到RC2的更新之中&#xff0c;…

【4.1】flink窗口算子的trigger触发器和Evictor清理器

【README】 本文记录了 窗口算子的触发器trigger和 evictor清理器&#xff1b; trigger触发器&#xff1a;决定了一个窗口&#xff08;由 window assigner 定义&#xff09;何时可以被 window function 处理&#xff1b;evictor清理器&#xff1a; evictor 可以在 trigger 触…

ue4 运行禁用鼠标_[UE4] VS code使用LuaPanda断点调试

luaPanda安装搜索luapanda 点击下载安装或者打开下面的地址&#xff0c;点击会提示打开vscode进行安装luaPanda下载地址&#xff1a;https://marketplace.visualstudio.com/items?itemNamestuartwang.luapanda&ssrfalse#review-details在gethbub中下载LuaPanda.lua文件&am…

Oracle入门(五B)之desc命令

翻译自 DESCRIBE describe命令 列出指定表的列定义&#xff0c;视图或同义词&#xff0c;或指定函数或存储过程的详述。 语法&#xff1a;desc[ribe] {[模式.]对象[链接串]} 模式 表示对象驻留的架构。如果省略架构&#xff0c;SQL*Plus假定拥有自己的对象。 对象 表示要描…

【4】flink window窗口算子

【README】 本文记录了 窗口算子操作&#xff1b;本文使用的flink为 1.14.4 版本&#xff1b;本文部分内容总结自 flink 官方文档&#xff1a; 窗口 | Apache Flink窗口 # 窗口&#xff08;Window&#xff09;是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中…

Microsoft将在UWP上支持React Native,同时为VS Code添加工具软件

Microsoft和Facebook日前宣布React Native的下一个目标平台是Universal Windows Platform&#xff08;UWP&#xff09;。 对于已经在多个设备平台上使用React Native的开发者来说&#xff0c;这是个好消息&#xff0c;因为这让他们可以在Windows上重用组件。对于UWP的开发者来说…

Xamarin Evolve 2016 Keynote回顾

编者语&#xff1a;距离上一次Xamarin Evolve 大会足足有两年时间了&#xff0c;这两年整个行业都在变化&#xff0c;Xamarin是整个.NET行业的表表者。两年过去Xamarin终于并入微软&#xff0c;免费了&#xff0c;也开源了。还有什么大招呢&#xff1f;刚结束的Xamarin Evolve给…

【5】flink窗口与水位线watermark例子

【README】 0&#xff09;本文编写了多个flink水位线watermark的代码例子&#xff0c;加深对watermark的理解 &#xff1b; 1&#xff09;时间分类 Event Time&#xff1a; 事件创建的时间&#xff08;事件发生时间&#xff09;&#xff1b;Ingestion Time&#xff1a;数据进…

技术人生,专家本色——采访张善友老师后的一点感受

距离Xamarin Evolve开幕还有不到三个小时。Miguel deIcaza和他的团队又将发布周年升级平台Xamarin 5。作为一个关注Mono/Xamarin多年的老粉丝&#xff0c;这个时候心情是蛮激动的。我想&#xff0c;刚刚作客.NET FM第五期“来者何人”专访系列的张善友老师&#xff0c;一定也是…

(翻)为什么要训练人工神经网络

【README】 本文翻译自 https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73 【1】介绍 今天我将从非常简短的神经网络介绍开始&#xff0c;这足以理解我将要讨论的…

使用VS Code开发 调试.NET Core 应用程序

使用VS Code开发 调试.NET Core RC2应用程序,由于.NET Core 目前还处于预览版。 本文使用微软提供的示例进行开发及调试。 https://github.com/aspnet/cli-samples .NET Core 介绍及说明&#xff1a; https://github.com/dotnet/cli http://dotnet.github.io/getting-started/ …