【README】
This article records how Flink reads from different data sources, including:
collections (lists of elements); files; Kafka; custom data sources.
Flink 1.14.4 is used; the Maven dependencies are as follows:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

【1】 Reading data from a collection
【1.1】 Code
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Description: Flink reads data from a collection
 * author xiao tang
 * version 1.0.0
 * createTime 2022-04-15
 */
public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read data from a collection
        DataStream<SensorReading> sensorStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 12341561L, 36.1),
                new SensorReading("sensor_2", 12341562L, 33.5),
                new SensorReading("sensor_3", 12341563L, 39.9),
                new SensorReading("sensor_4", 12341564L, 31.2)));
        // print the stream
        sensorStream.print("sensor");

        // read data from a list of elements
        DataStream<Integer> intStream = env.fromElements(1, 2, 3, 7, 8, 2, 100, 34, 3);
        intStream.print("intStream");

        // execute the job
        env.execute("sensorJob");
    }
}
/**
 * Description: a sensor temperature reading (POJO)
 * author xiao tang
 * version 1.0.0
 * createTime 2022-04-15
 */
public class SensorReading {
    private String id;
    private Long timestamp;
    private double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }
}

Printed results:

intStream:6> 8
intStream:5> 7
intStream:7> 2
sensor:8> SensorReading{id='sensor_2', timestamp=12341562, temperature=33.5}
intStream:1> 34
sensor:1> SensorReading{id='sensor_3', timestamp=12341563, temperature=39.9}
intStream:3> 2
intStream:4> 3
intStream:2> 1
intStream:2> 3
sensor:7> SensorReading{id='sensor_1', timestamp=12341561, temperature=36.1}
intStream:8> 100
sensor:2> SensorReading{id='sensor_4', timestamp=12341564, temperature=31.2}

【2】 Reading data from a file
【2.1】 Code
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Description: Flink reads data from a file
 * author xiao tang
 * version 1.0.0
 * createTime 2022-04-15
 */
public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // set global parallelism to 1

        // read data from a file
        DataStream<String> fileStream = env.readTextFile(
                "D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // print the stream
        fileStream.print("sensor");

        // execute the job
        env.execute("sensorJob");
    }
}
The sensor.txt file contains:
sensor_1,12341561,36.1
sensor_2,12341562,33.5
sensor_3,12341563,39.9
sensor_4,12341564,31.2
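readTextFile emits each line of the file as a raw String; in a real job the stream is usually then mapped into SensorReading objects. The following is a minimal, Flink-free sketch of that parsing step; the class and method names are illustrative and not part of the original code:

```java
// Illustrative parser for one line of sensor.txt, e.g. "sensor_1,12341561,36.1".
// In the Flink job this logic would typically live in a MapFunction<String, SensorReading>.
public class SensorLineParser {

    public static class SensorReading {
        public final String id;
        public final long timestamp;
        public final double temperature;

        public SensorReading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    public static SensorReading parse(String line) {
        // split the comma-separated fields: id, timestamp, temperature
        String[] fields = line.split(",");
        return new SensorReading(fields[0].trim(),
                Long.parseLong(fields[1].trim()),
                Double.parseDouble(fields[2].trim()));
    }

    public static void main(String[] args) {
        SensorReading r = parse("sensor_1,12341561,36.1");
        System.out.println(r.id + " " + r.timestamp + " " + r.temperature);
    }
}
```

With this helper, the file source above would become `env.readTextFile(...).map(SensorLineParser::parse)` (plus a Flink-serializable POJO instead of the nested class).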
Printed results:

sensor> sensor_1,12341561,36.1
sensor> sensor_2,12341562,33.5
sensor> sensor_3,12341563,39.9
sensor> sensor_4,12341564,31.2

【3】 Reading data from Kafka
(1) Add the Maven dependency for the Flink Kafka connector:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
(2) Connect Flink to Kafka as a consumer:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Description: Flink reads data from Kafka
 * author xiao tang
 * version 1.0.0
 * createTime 2022-04-15
 */
public class SourceTest3_kafka {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // set global parallelism to 1

        // create the Kafka source for Flink
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(KafkaConsumerProps._INS.getProps())
                .setTopics("hello0415")
                .setGroupId("flink")
                .build();
        DataStream<String> kafkaStream = env.fromSource(
                kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

        // print the stream
        kafkaStream.print("kafkaStream");

        // execute the job
        env.execute("kafkaStreamJob");
    }
}
public enum KafkaConsumerProps {
    _INS;

    /* 1. the kafka consumer configuration */
    private final Properties props = new Properties();

    private KafkaConsumerProps() {
        /* 2. the kafka cluster to connect to (broker-list) */
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    }

    public Properties getProps() {
        return props;
    }
}
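The enum-singleton pattern above guarantees the Properties object is built exactly once, at class-load time. Below is a Flink- and Kafka-free sketch of the same pattern; plain string literals replace the ConsumerConfig constants (they resolve to the same standard Kafka config keys) so the sketch compiles without the kafka-clients jar:

```java
import java.util.Properties;

// Enum singleton holding consumer configuration, mirroring KafkaConsumerProps above.
public enum ConsumerPropsSketch {
    _INS;

    private final Properties props = new Properties();

    ConsumerPropsSketch() {
        // broker addresses taken from the example above
        props.put("bootstrap.servers",
                "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "G1");
        props.put("auto.offset.reset", "latest");
    }

    public Properties getProps() {
        return props;
    }

    public static void main(String[] args) {
        // the JVM guarantees _INS is initialized once, so every caller sees the same Properties
        System.out.println(_INS.getProps().getProperty("bootstrap.servers"));
    }
}
```

Compared with a lazily initialized static singleton, the enum form is thread-safe without any locking, which is why it is a common choice for shared configuration objects.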
(3) Start a Kafka console producer:
kafka-console-producer.sh --broker-list centos201:9092,centos202:9092,centos203:9092 --topic hello0415

Note: for Kafka cluster setup, see my article:
Kafka cluster setup (PacosonSWJTU's blog, CSDN)

【4】 Custom data source
A custom data source is useful for self-testing Flink jobs.
public class SourceTest4_UDF {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // set global parallelism to 4

        // create a custom data source
        DataStream<SensorReading> udfStream = env.addSource(new SourceFunction<SensorReading>() {
            int i = 1;
            int mod = 1000;
            Random random = new Random();
            boolean runnable = true;

            @Override
            public void run(SourceContext<SensorReading> sourceContext) throws Exception {
                while (runnable) {
                    sourceContext.collect(new SensorReading(
                            String.valueOf(i++ % mod + 1),
                            System.currentTimeMillis(),
                            30 + random.nextGaussian()));
                    if (i % 5 == 0) TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                runnable = false;
            }
        });

        // print the stream
        udfStream.print("udfStream");

        // execute the job
        env.execute("udfStreamJob");
    }
}
Printed results:

udfStream:4> SensorReading{id='5', timestamp=1650030559865, temperature=31.015354380481117}
udfStream:1> SensorReading{id='2', timestamp=1650030559853, temperature=29.23797321841027}
udfStream:3> SensorReading{id='4', timestamp=1650030559865, temperature=31.148402161461384}
udfStream:2> SensorReading{id='3', timestamp=1650030559865, temperature=30.082462570224305}
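The value generation inside the custom source can be checked in isolation, without Flink. The following is a plain-Java sketch of that logic; the class and method names are illustrative, and it assumes the same cycling id scheme (`i % mod + 1`) and Gaussian temperature (`30 + nextGaussian()`) as the source above:

```java
import java.util.Random;

// Flink-free sketch of the values emitted by the custom SourceFunction:
// ids cycle through "2".."1001" and temperatures are Gaussian, centered on 30.
public class UdfSourceSketch {
    static final int MOD = 1000;

    public static String nextId(int i) {
        return String.valueOf(i % MOD + 1); // i=1 -> "2", i=2 -> "3", ...
    }

    public static double nextTemperature(Random random) {
        return 30 + random.nextGaussian(); // standard deviation 1, mean 30
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed for a reproducible run
        for (int i = 1; i <= 5; i++) {
            System.out.println(nextId(i) + " -> " + nextTemperature(random));
        }
    }
}
```

This matches the printed results above: ids start at "2" and the temperatures cluster around 30 degrees.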