flink1.16使用消费/生产kafka之DataStream

flink高级版本后,消费kafka数据一种是Datastream 一种之tableApi。

上官网 Kafka | Apache Flink

Kafka Source

引入依赖 flink和kafka的连接器,里面内置了kafka-client

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.16.2</version>
</dependency>

使用方法

KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

很简单一目了然。

topic和partition

多个topic
KafkaSource.builder().setTopics("topic-a", "topic-b");
正则匹配多个topic
KafkaSource.builder().setTopicPattern("topic.*");
指定分区
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);

 反序列化

import org.apache.kafka.common.serialization.StringDeserializer;KafkaSource.<String>builder().setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));

其实就是实现接口 DeserializationSchema 的deserialize()方法 把byte转为你想要的类型。

起始消费位点

KafkaSource.builder()// 从消费组提交的位点开始消费,不指定位点重置策略.setStartingOffsets(OffsetsInitializer.committedOffsets())// 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 从时间戳大于等于指定时间戳(毫秒)的数据开始消费.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))// 从最早位点开始消费.setStartingOffsets(OffsetsInitializer.earliest())// 从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest());

有界 / 无界模式 

Kafka Source 支持流式和批式两种运行模式。默认情况下,KafkaSource 设置为以流模式运行,因此作业永远不会停止,直到 Flink 作业失败或被取消。 可以使用 setBounded(OffsetsInitializer) 指定停止偏移量使 Kafka Source 以批处理模式运行。当所有分区都达到其停止偏移量时,Kafka Source 会退出运行。

流模式下运行通过使用 setUnbounded(OffsetsInitializer) 也可以指定停止消费位点,当所有分区达到其指定的停止偏移量时,Kafka Source 会退出运行。

估计99的人都用不到这个 。也就是设置一个结束的offset的点

其他属性

除了上述属性之外,您还可以使用 setProperties(Properties) 和 setProperty(String, String) 为 Kafka Source 和 Kafka Consumer 设置任意属性。KafkaSource 有以下配置项:

  • client.id.prefix,指定用于 Kafka Consumer 的客户端 ID 前缀
  • partition.discovery.interval.ms,定义 Kafka Source 检查新分区的时间间隔。 请参阅下面的动态分区检查一节
  • register.consumer.metrics 指定是否在 Flink 中注册 Kafka Consumer 的指标
  • commit.offsets.on.checkpoint 指定是否在进行 checkpoint 时将消费位点提交至 Kafka broker  这个还是有用的

Kafka consumer 的配置可以参考 Apache Kafka 文档。

请注意,即使指定了以下配置项,构建器也会将其覆盖:

  • key.deserializer 始终设置为 ByteArrayDeserializer
  • value.deserializer 始终设置为 ByteArrayDeserializer
  • auto.offset.reset.strategy 被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆盖
  • partition.discovery.interval.ms 会在批模式下被覆盖为 -1

动态分区检查

为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。要启用动态分区检查,请将 partition.discovery.interval.ms 设置为非负值:

KafkaSource.builder().setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区
分区检查功能默认 不开启。需要显式地设置分区检查间隔才能启用此功能。

这个是为了扩容的,因为kafka的消费能力和分区有关,消费能力不够的时候需要动态增加分区 

事件时间和水印

默认情况下,Kafka Source 使用 Kafka 消息中的时间戳作为事件时间。您可以定义自己的水印策略(Watermark Strategy) 以从消息中提取事件时间,并向下游发送水印:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");

消费位点提交 

Kafka source 在 checkpoint 完成时提交当前的消费位点 ,以保证 Flink 的 checkpoint 状态和 Kafka broker 上的提交位点一致。如果未开启 checkpoint,Kafka source 依赖于 Kafka consumer 内部的位点定时自动提交逻辑,自动提交功能由 enable.auto.commit 和 auto.commit.interval.ms 两个 Kafka consumer 配置项进行配置。

注意:Kafka source 不依赖于 broker 上提交的位点来恢复失败的作业。提交位点只是为了上报 Kafka consumer 和消费组的消费进度,以在 broker 端进行监控。

这里是说明消费的offset是由kafka管理还是flink管理。

举个例子

.setProperty("enable.auto.commit","true")
.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100")

并且env没有 env.enableCheckpoint()

此时只要消息进入到flink,过了100ms就会被认为已经消费过了offset会+1 不管你这个消息是否处理完,是否处理失败了。

但是如果你env.enableCheckpoint(),那么此时就是由checkpoint被提交的之前提交offset了。和checkpoint息息相关。checkpoint如果失败了 那么这个offset就不会被提交了。

kafka安全认证

KafkaSource.builder().setProperty("security.protocol", "SASL_PLAINTEXT").setProperty("sasl.mechanism", "PLAIN").setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

上面这个比较常用 

另一个更复杂的例子,使用 SASL_SSL 作为安全协议并使用 SCRAM-SHA-256 作为 SASL 机制:

KafkaSource.builder().setProperty("security.protocol", "SASL_SSL")// SSL 配置// 配置服务端提供的 truststore (CA 证书) 的路径.setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks").setProperty("ssl.truststore.password", "test1234")// 如果要求客户端认证,则需要配置 keystore (私钥) 的路径.setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks").setProperty("ssl.keystore.password", "test1234")// SASL 配置// 将 SASL 机制配置为 as SCRAM-SHA-256.setProperty("sasl.mechanism", "SCRAM-SHA-256")// 配置 JAAS.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

如果在作业 JAR 中 Kafka 客户端依赖的类路径被重置了(relocate class),登录模块(login module)的类路径可能会不同,因此请根据登录模块在 JAR 中实际的类路径来改写以上配置。

Kafka Sink

KafkaSink 可将数据流写入一个或多个 Kafka topic。

使用方法

DataStream<String> stream = ...;KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(brokers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("topic-name").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();stream.sinkTo(sink);

以下属性在构建 KafkaSink 时是必须指定的:

  • Bootstrap servers, setBootstrapServers(String)
  • 消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
  • 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String)

 序列化器

KafkaRecordSerializationSchema.builder().setTopicSelector((element) -> {<your-topic-selection-logic>}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build();

其中消息体(value)序列化方法和 topic 的选择方法是必须指定的,此外也可以通过 setKafkaKeySerializer(Serializer) 或 setKafkaValueSerializer(Serializer) 来使用 Kafka 提供而非 Flink 提供的序列化器。

容错

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必须启用。默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。 以下是对不同语义保证的解释:

一旦启用了基于 Kerberos 的 Flink 安全性后,只需在提供的属性配置中包含以下两个设置(通过传递给内部 Kafka 客户端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a进行身份验证:

  • DeliveryGuarantee.NONE 不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
  • DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
  • DeliveryGuarantee.EXACTLY_ONCE: 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。
  • 启用 Kerberos 身份验证

  • Flink 通过 Kafka 连接器提供了一流的支持,可以对 Kerberos 配置的 Kafka 安装进行身份验证。只需在 flink-conf.yaml 中配置 Flink。像这样为 Kafka 启用 Kerberos 身份验证:
    1.通过设置以下内容配置 Kerberos 票据

  • security.kerberos.login.use-ticket-cache:默认情况下,这个值是 true,Flink 将尝试在 kinit 管理的票据缓存中使用 Kerberos 票据。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 连接器时,使用票据缓存的 Kerberos 授权将不起作用。
  • security.kerberos.login.keytab 和 security.kerberos.login.principal:要使用 Kerberos keytabs,需为这两个属性设置值。
    2。将 KafkaClient 追加到 security.kerberos.login.contexts:这告诉 Flink 将配置的 Kerberos 票据提供给 Kafka 登录上下文以用于 Kafka 身份验证。
  • 将 security.protocol 设置为 SASL_PLAINTEXT(默认为 NONE):用于与 Kafka broker 进行通信的协议。使用独立 Flink 部署时,也可以使用 SASL_SSL;请在此处查看如何为 SSL 配置 Kafka 客户端。
  • 将 sasl.kerberos.service.name 设置为 kafka(默认为 kafka):此值应与用于 Kafka broker 配置的 sasl.kerberos.service.name 相匹配。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。

问题排查

数据丢失 

根据你的 Kafka 配置,即使在 Kafka 确认写入后,你仍然可能会遇到数据丢失。特别要记住在 Kafka 的配置中设置以下属性:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*

UnknownTopicOrPartitionException 

导致此错误的一个可能原因是正在进行新的 leader 选举,例如在重新启动 Kafka broker 之后或期间。这是一个可重试的异常,因此 Flink job 应该能够重启并恢复正常运行。也可以通过更改 producer 设置中的 retries 属性来规避。但是,这可能会导致重新排序消息,反过来可以通过将 max.in.flight.requests.per.connection 设置为 1 来避免不需要的消息。

ProducerFencedException 

这个错误是由于 FlinkKafkaProducer 所生成的 transactional.id 与其他应用所使用的的产生了冲突。多数情况下,由于 FlinkKafkaProducer 产生的 ID 都是以 taskName + "-" + operatorUid 为前缀的,这些产生冲突的应用也是使用了相同 Job Graph 的 Flink Job。 我们可以使用 setTransactionalIdPrefix() 方法来覆盖默认的行为,为每个不同的 Job 分配不同的 transactional.id 前缀来解决这个问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/8224.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++类与对象 - 2(构造函数和析构函数)(超详细)

构造函数和析构函数 - 超详细讲解 1. 构造函数1.1 概念1.2特性 2. 析构函数2.1 概念2.2特性 1. 构造函数 1.1 概念 对于以下Date类&#xff1a; class Date { public:void Init(int year, int month, int day){_year year;_month month;_day day;}void Print(){cout <&l…

C语言float类型学习

C语言的小数类型有两种&#xff0c;float和double&#xff1b; float 称为单精度浮点型&#xff0c;double 称为双精度浮点型&#xff1b; float 占用4个字节&#xff0c;double 占用8个字节&#xff1b; 下面看一下float&#xff1b; #include <stdio.h>int main() {…

电脑记事本在哪里?电脑桌面显示记事本要怎么设置?

绝大多数上班族在使用电脑办公时&#xff0c;都需要随手记录一些琐碎或重要的事情&#xff0c;例如工作注意事项、常用的文案、某项工作的具体要求、多个平台的账号和密码等。于是就有不少小伙伴想要使用电脑记事本软件来记录&#xff0c;那么电脑记事本在哪里呢&#xff1f;想…

JAVA基础-集合的工具类Collections

目录 引言 一&#xff0c;Collections工具类的操作方法方法 1&#xff0c;排序操作 2&#xff0c;替换 和 查找操作 二&#xff0c;Collections工具类的使用 2.1&#xff0c;排序操作 2.1.1&#xff0c;集合的逆序 2.1.2&#xff0c;集合的随机排序 2.1.3&#xff0c;集…

SkyWalking链路追踪中Trace概念以及Trace与span的关系

基本概念 在SkyWalking链路追踪中&#xff0c;Trace&#xff08;追踪&#xff09;是指一个请求或者一个操作从开始到结束的完整路径。它涵盖了分布式系统中所有相关组件的调用关系和性能信息。 具体来说&#xff0c;Trace包含了一系列的span&#xff08;跨度&#xff09;&…

【C#】.Net Framework框架使用JWT

2023年&#xff0c;第31周&#xff0c;第2篇文章。给自己一个目标&#xff0c;然后坚持总会有收货&#xff0c;不信你试试&#xff01; 本篇文章主要简单讲讲&#xff0c;.Net Framework框架下使用JWT的代码例子&#xff0c;以及他们的基本概念。 2002年微软发布了.net framewo…

GFLv2 论文学习

1. 解决了什么问题&#xff1f; 预测定位质量对于目标检测很重要&#xff0c;在 NMS 时它能提供准确的得分排序&#xff0c;提高模型的表现。现有方法都是通过分类或回归的卷积特征来预测定位质量得分。 2. 提出了什么方法&#xff1f; 受到 GFLv1 的 general distribution …

前端开发实习总结参考范文(合集)

▼前端开发实习总结篇一 今天就简单聊聊上面的StrutsSpringHibernate吧。 Struts 代表&#xff1a;表示层;Spring代表&#xff1a;业务逻辑层;Hibernate则代表持久层。他们是目前在Java Web编程开发中用得最多的框架&#xff0c;其实这样区分是为了适应软件开发过程中各个分工…

Shell输出帮助手册

代码&#xff1a; 帮助手册雏形 function help(){echo -e "Help manual":echo -e " -h. -- help View the help manual"echo -e " install Installation"echo -e " uninstall Uninstall" }case "$1&qu…

无涯教程-不是(选择器)

not(selector)方法从匹配的元素集中过滤掉所有与指定选择器匹配的元素。 not( selector ) - 语法 selector.not( selector ) 这是此方法使用的所有参数的描述- selector - 可能是一个逗号分隔的选择器列表&#xff0c;可一次应用多个过滤器(如not(".class1&#…

【stable diffusion】保姆级入门课程05-Stable diffusion(SD)图生图-涂鸦重绘的用法

1.什么是涂鸦重绘 涂鸦重绘又称手涂蒙版。 简单来说&#xff0c;局部重绘手涂蒙版 就是涂鸦局部重绘的结合体&#xff0c;这个功能的出现是为了解决用户不想改变整张图片的情况下&#xff0c;对多个元素进行修改。 功能支持&#xff1a; 1.支持蒙版功能 2.笔刷决定绘制的元素…

基于Java+SpringBoot+Vue前后端分离摄影分享网站平台系统

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

Git常用命令解释

目录 执行git status的信息解释Git基本操作Git仓库/分支相关远程跟踪分支的概念 本文记录学习git命令行的过程和命令的解释 执行git status的信息解释 Changes to be committed&#xff1a;只要在 Changes to be committed 这行下面的&#xff0c;就说明是已暂存状态。 如果此…

使用css 动画实现,水波纹的效果

每日鸡汤&#xff1a;每个你想要学习的瞬间都是未来的你向自己求救 需求&#xff0c;实现水波纹动画效果&#xff0c;要求中心一个圆点&#xff0c;然后有3个圈&#xff0c;一圈一圈的向里面缩小。 说实话我第一个想到了给3个圈设置不同的宽高&#xff0c;然后设置动画0-100%&a…

用Python实战,畅享音乐海洋,一键采集你喜爱的音乐!

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 环境使用: Python 3.8 解释器 Pycharm 编辑器 模块使用: requests >>> pip install requests re 第三方模块安装方法&#xff1a; win R 输入cmd 输入安装命令 pip install 模块名 (如果你觉得安装速…

GitHub发布Copilot Chat公测版,助力开发者编写代码

近日微软GitHub推出了Copilot Chat 的公开测试版&#xff0c;而这个版本不仅仅局限于“代码缺省补充”与“代码纠错”&#xff0c;还能直接基于上文&#xff0c;自动联想出后文。 据了解&#xff0c;该AI工具主要用来助力开发者编写代码&#xff0c;可直接集成到开发者的桌面 …

linux五 进程控制

2 进程相关概念 2.1 程序和进程 程序&#xff0c;是指编译好的二进制文件&#xff0c;在磁盘上&#xff0c;占用磁盘空间, 是一个静态的概念. 进程&#xff0c;一个启动的程序&#xff0c; 进程占用的是系统资源&#xff0c;如&#xff1a;物理内存&#xff0c;CPU&#xf…

Python基础知识-2

名片管理系统 def addHandler():print("添加名片")def deleteHandler():IsDelete input("(づ&#xffe3;3&#xffe3;)づ╭❤&#xff5e;亲~您确定要删除名片吗&#xff1f;Yes/No").upper()if (IsDelete YES):print("删除名片")else:prin…

GB/T 25000.51解读——软件产品的易用性怎么测?

GB/T 25000.51-2016《软件产品质量要求和测试细则》是申请软件检测CNAS认可一定会用到的一部国家标准。在前面的文章中&#xff0c;我们为大家整体介绍了GB/T 25000.51-2016《软件产品质量要求和测试细则》国家标准的结构和所涵盖的内容以及对软件产品的八大质量特性中的功能性…

安全初级:字符编码

字符编码 字符编码&#xff1a;是一种映射规则&#xff0c;根据映射规则将字符映射成其他形式的数据在计算机中存储和传输。 常用的编码 编码制定时间作用ASCII1967表示英语及西欧语言GB23121980国家简体中文字符集&#xff0c;兼容ASCIIUnicode1991国际标准化组织统一标准字…