【3】flink sink

【README】

本文记录了flink sink操作,输出目的存储器(中间件)包括

  • kafka;
  • es;
  • db;
  • 等等有很多;
  • 本文只给出了 sink2kafka的代码

本文使用的flink为 1.14.4 版本;

本文部分内容参考了 flink 官方文档,如下:

Kafka | Apache Flinkicon-default.png?t=M3C8https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/kafka/


【1】 flink sink2kafka

1)场景:

  • 消费上游topic hello0415的数据,并把数据流输出到下游kafka topic hell0416;
  • 如,我们在java框架中把数据库日志发送到 topic1 ,然后我想统计执行时间大于3秒的sql,则需要把筛选后的sql 发送到 下游 topic2, 就可以使用sink 来完成;

2)代码

/*** @Description flink流输出到kafka(下沉)* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/
public class SinkTest1_Kafka {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 设置全局并行度// 创建flink连接kafkaKafkaSource kafkaSource = KafkaSource.<String>builder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics("hello0415").setGroupId("flink").build();DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");// kafka生产者属性Properties kafkaProducerProps = new Properties();kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 3);kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 * KfkNumConst._1K);kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);// 把kafka数据流输出到(sink) topic hello0416KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(kafkaProducerProps).setBootstrapServers("192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("hello0416").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 添加到sinkkafkaStream.sinkTo(sink);// 打印streamkafkaStream.print("kafkaStream");// 执行env.execute("kafkaSinkJob");}
}

效果:


 【补充】

kafka 消费者属性

public enum KafkaConsumerProps {_INS;/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();private KafkaConsumerProps() {/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");}public Properties getProps() {return props;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329559.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【给中高级开发者】构建高性能ASP.NET应用的几点建议

如果你在构建一个面向公众的web站点&#xff0c;那么在项目结尾时你想要实现的就是web负载性能良好。这意味着&#xff0c;你要确保你的产品在高负载下&#xff08;50个并发用户或者每秒200个用户等等&#xff09;能够运行&#xff0c;即使你认为此时不会有那么大的负载。久而久…

Oracle入门(三B)之11G新特性 SYSASM 角色用来管理ASM

转载自 oracle 11G新特性--SYSASM 角色--用来管理ASM SYSASM 角色自动存储管理 (ASM) 是在 Oracle 数据库 10g 中引入的&#xff0c;它在某种程度上打破了 DBA 和系统管理员之间在存储分配功能上的界限。ASM 实例由 DBA 管理&#xff0c;正如常规的 DBA 工作需要以 SYSDBA 角色…

stm32编程入门_电子设计与单片机编程书籍资料推荐

有些同学让我推荐些入门书籍&#xff0c;尝试写写&#xff1a;注&#xff1a;请多看下目录与介绍&#xff0c;网络上也有一些读书笔记和推荐&#xff0c;自行判断是否符合现有层次和水平&#xff0c;再决定是否购买。针对零基础、非专业的电子爱好者&#xff0c;我的推荐学习步…

.NET Core 1.0 RC2 历险之旅

文章背景&#xff1a;对于.NET Core大家应该并不陌生, 从它被 宣布 到现在已经有1-2年的时间了&#xff0c;其比较重要的一个版本1.0 RC2 也即将发布。.Net Core从一个一个的测试版到现在的RC2&#xff0c;经历了很多个大大小小的变化。特别是在RC1到RC2的更新之中&#xff0c;…

【4.1】flink窗口算子的trigger触发器和Evictor清理器

【README】 本文记录了 窗口算子的触发器trigger和 evictor清理器&#xff1b; trigger触发器&#xff1a;决定了一个窗口&#xff08;由 window assigner 定义&#xff09;何时可以被 window function 处理&#xff1b;evictor清理器&#xff1a; evictor 可以在 trigger 触…

ue4 运行禁用鼠标_[UE4] VS code使用LuaPanda断点调试

luaPanda安装搜索luapanda 点击下载安装或者打开下面的地址&#xff0c;点击会提示打开vscode进行安装luaPanda下载地址&#xff1a;https://marketplace.visualstudio.com/items?itemNamestuartwang.luapanda&ssrfalse#review-details在gethbub中下载LuaPanda.lua文件&am…

Oracle入门(五B)之desc命令

翻译自 DESCRIBE describe命令 列出指定表的列定义&#xff0c;视图或同义词&#xff0c;或指定函数或存储过程的详述。 语法&#xff1a;desc[ribe] {[模式.]对象[链接串]} 模式 表示对象驻留的架构。如果省略架构&#xff0c;SQL*Plus假定拥有自己的对象。 对象 表示要描…

【4】flink window窗口算子

【README】 本文记录了 窗口算子操作&#xff1b;本文使用的flink为 1.14.4 版本&#xff1b;本文部分内容总结自 flink 官方文档&#xff1a; 窗口 | Apache Flink窗口 # 窗口&#xff08;Window&#xff09;是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中…

Microsoft将在UWP上支持React Native,同时为VS Code添加工具软件

Microsoft和Facebook日前宣布React Native的下一个目标平台是Universal Windows Platform&#xff08;UWP&#xff09;。 对于已经在多个设备平台上使用React Native的开发者来说&#xff0c;这是个好消息&#xff0c;因为这让他们可以在Windows上重用组件。对于UWP的开发者来说…

Xamarin Evolve 2016 Keynote回顾

编者语&#xff1a;距离上一次Xamarin Evolve 大会足足有两年时间了&#xff0c;这两年整个行业都在变化&#xff0c;Xamarin是整个.NET行业的表表者。两年过去Xamarin终于并入微软&#xff0c;免费了&#xff0c;也开源了。还有什么大招呢&#xff1f;刚结束的Xamarin Evolve给…

【5】flink窗口与水位线watermark例子

【README】 0&#xff09;本文编写了多个flink水位线watermark的代码例子&#xff0c;加深对watermark的理解 &#xff1b; 1&#xff09;时间分类 Event Time&#xff1a; 事件创建的时间&#xff08;事件发生时间&#xff09;&#xff1b;Ingestion Time&#xff1a;数据进…

技术人生,专家本色——采访张善友老师后的一点感受

距离Xamarin Evolve开幕还有不到三个小时。Miguel deIcaza和他的团队又将发布周年升级平台Xamarin 5。作为一个关注Mono/Xamarin多年的老粉丝&#xff0c;这个时候心情是蛮激动的。我想&#xff0c;刚刚作客.NET FM第五期“来者何人”专访系列的张善友老师&#xff0c;一定也是…

(翻)为什么要训练人工神经网络

【README】 本文翻译自 https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73https://towardsdatascience.com/how-do-we-train-neural-networks-edd985562b73 【1】介绍 今天我将从非常简短的神经网络介绍开始&#xff0c;这足以理解我将要讨论的…

使用VS Code开发 调试.NET Core 应用程序

使用VS Code开发 调试.NET Core RC2应用程序,由于.NET Core 目前还处于预览版。 本文使用微软提供的示例进行开发及调试。 https://github.com/aspnet/cli-samples .NET Core 介绍及说明&#xff1a; https://github.com/dotnet/cli http://dotnet.github.io/getting-started/ …

(转)简单神经网络(权值阈值训练)

本文转自&#xff1a; 神经网络入门 - 阮一峰的网络日志https://www.ruanyifeng.com/blog/2017/07/neural-network.html 眼下最热门的技术&#xff0c;绝对是人工智能。 人工智能的底层模型是"神经网络"&#xff08;neural network&#xff09;。许多复杂的应用&am…

Oracle入门(五A)之conn命令

一、connect命令将给定的用户名连接到Oracle数据库。当你运行一个连接命令、站点配置文件、Galgn.SQL和用户配置文件&#xff0c;按顺序处理Login .SQL。连接不重发如果初始连接不成功&#xff0c;请使用用户名或密码。语法&#xff1a; conn[ect] [{登录串|/|代理串} [AS…

2-操作系统启动

【README】 本文内容总结自 《操作系统-哈工大李治军老师》&#xff0c;内容非常棒&#xff0c;墙裂推荐&#xff1b; 【1】概述 1&#xff09;问题&#xff1a;操作系统启动时&#xff0c;它应该做什么事情&#xff1f; 计算机启动时&#xff0c;需要把存储在磁盘上的操作系…

Asp.net 面向接口可扩展框架之类型转化基础服务

新框架正在逐步完善,可喜可贺的是基础服务部分初具模样了,给大家分享一下 由于基础服务涉及面太广,也没开发完,这篇只介绍其中的类型转化部分,命名为类型转化基础服务,其实就是基础服务模块的类型转化子模块 说到类型转化必须要弄清楚.net的类型,类型都不清楚何来类型转化 1、P…

asset文件夹路径 unity_我们来捣鼓一下Unity的平台跳跃Microgame

作者&#xff1a;Truly大家好。Platformer Microgame是Unity官方的一个2D平台跳跃游戏项目模板&#xff0c;小伙伴们可以在这个模板的基础上修改或者制作自己的游戏。下边我们就来简单试玩下这个工程吧~一、资源导入写文章时&#xff0c;该资源暂时还不支持Unity 2019.x版本&am…

Oracle入门(五)之基本命令操作

一、show &#xff08;1&#xff09;查询Oracle系统变量值 语法&#xff1a;show 变量名 show all --查看所有68个系统变量值 show user --显示当前连接用户 注&#xff1a;Oracle的68个系统变量的key和默认value &#xff08;2&#xff09;查询oracle数据库当前的参数值 …