flink kafka的enableCommitOnCheckpoints 和 enable.auto.commit 参数

背景

每次使用flink消费kafka消息的时候我就被这两个参数enableCommitOnCheckpoints 和 enable.auto.commit困扰,本文就来从源码看看这两个参数的作用

enableCommitOnCheckpoints 和 enable.auto.commit参数

1.FlinkKafkaConsumerBase的open方法,查看offsetCommitMode的赋值

public void open(Configuration configuration) throws Exception {
// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());}

2.OffsetCommitModes.fromConfiguration方法

public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}

从这个代码可知,enableCommitOnCheckpoint 和 enableAutoCommit是不会同时存在的,也就是flink如果在checkpoint的时候提交偏移,他就肯定不会设置enableAutoCommit自动提交,反之亦然

enableCommitOnCheckpoint 提交偏移的关键代码

1.FlinkKafkaConsumerBase.snapshotState方法

public final void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
//  这里如果是checkpoint模式会在checkpoint的时候保存offset到状态中
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
}}

2.FlinkKafkaConsumerBase.notifyCheckpointComplete方法

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);

enable.auto.commit参数

1.KafkaConsumerThread.run线程

if (records == null) {
try {
records = consumer.poll(pollTimeout);
}
catch (WakeupException we) {
continue;
}
}

2.KafkaConsumer的poll方法

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}// poll for new data until the timeout expires
do {
client.maybeTriggerWakeup();
//  updateAssignmentMetadataIfNeeded方法是关键
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());return ConsumerRecords.empty();
} finally {
release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}

3.KafkaConsumer.updateAssignmentMetadataIfNeeded方法

boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
if (coordinator != null && !coordinator.poll(timer)) {
return false;
}return updateFetchPositions(timer);
}4.ConsumerCoordinator.poll方法public boolean poll(Timer timer) {
maybeUpdateSubscriptionMetadata();invokeCompletedOffsetCommitCallbacks();if (subscriptions.partitionsAutoAssigned()) {
if (protocol == null) {
throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
" to empty while trying to subscribe for group protocol to auto assign partitions");
}
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
pollHeartbeat(timer.currentTimeMs());
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}if (rejoinNeededOrPending()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription()) {
// For consumer group that uses pattern-based subscription, after a topic is created,
// any consumer that discovers the topic after metadata refresh can trigger rebalance
// across the entire consumer group. Multiple rebalances can be triggered after one topic
// creation if consumers refresh metadata at vastly different times. We can significantly
// reduce the number of rebalances caused by single topic creation by asking consumer to
// refresh metadata before re-joining the group as long as the refresh backoff time has
// passed.
if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
this.metadata.requestUpdate();
}if (!client.ensureFreshMetadata(timer)) {
return false;
}maybeUpdateSubscriptionMetadata();
}if (!ensureActiveGroup(timer)) {
return false;
}
}
} else {
// For manually assigned partitions, if there are no ready nodes, await metadata.
// If connections to all nodes fail, wakeups triggered while attempting to send fetch
// requests result in polls returning immediately, causing a tight loop of polls. Without
// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
// awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
// When group management is used, metadata wait is already performed for this scenario as
// coordinator is unknown, hence this check is not required.
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
}
}
//  这里是重点
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}

5.ConsumerCoordinatormaybeAutoCommitOffsetsAsync方法

public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}

看到没,这里就是判断autoCommitEnabled的地方,这里如果打开了自动提交功能的话,就会进行offset的提交

特别重要的两点

1.kafkaconsumer当开始进行消费时,即使不提交任何偏移量,也不影响它消费消息,他还是能正常消费kafka主题的消息,这里提交偏移的主要作用在于当kafkaconsumer断线然后需要重连kafka broker进行消费时,此时它一般会从它最后提交的offset位置开始消费(此时还依赖于没有设置startFromLatest,startFromEarliest,startFromTimeStamp的情况下),这才是consumer提交offset偏移的最大意义

2.对于flink来说,由于每次重启的时候,flink的consumer都会从checkpoint中把偏移取出来并设置,所以flink的consumer在消息消费过程中无论通过enableCommitOnCheckpoint 还是enableAutoCommit提交的偏移并没有意义,因为并没有使用到,它的意义只在于flink没有从checkpoint中启动时,此时flink的consumer才会从enableCommitOnCheckpoint 、enableAutoCommit提交的偏移开始消费消息(此时还依赖于没有设置startFromLatest,startFromEarliest,startFromTimeStamp的情况下)

参考文章:https://blog.csdn.net/qq_42009500/article/details/119875158

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/834580.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AlphaFold3: Google DeepMind的的新突破

AlphaFold 3的论文今天在Nature期刊发表啦!这可是AI在生物领域最厉害的突破的最新版本。AlphaFold-3的新招就是用扩散模型去"画出"分子的结构。它一开始先从一团模模糊糊的原子云下手,然后慢慢透过去噪把分子变得越来越清楚。 Alphafold3 我们活在一个从Llama和Sora那…

C# WinForm —— 12 ListBox绑定数据

ListBox加载大量数据时&#xff0c;避免窗体闪烁的方法&#xff1a; 在加载语句的前后分别加上 BeginUpdate()方法 和 EndUpdate()方法 指定一个集合为绑定的数据源 1. 首先&#xff0c;右键项目&#xff0c;添加类 2. 在新建的类文件中添加属性值信息 3. 构建初始化的对象…

跟TED演讲学英文:Teachers need real feedback by Bill Gates

Teachers need real feedback Link: https://www.ted.com/talks/bill_gates_teachers_need_real_feedback Speaker: Bill Gates Date: May 2013 文章目录 Teachers need real feedbackIntroductionVocabularyTranscriptSummary后记 Introduction Until recently, many teach…

MYSQL-8.调优

性能优化思维 整体思维 木桶效应&#xff1a;系统的性能符合木桶效应&#xff08;一个木桶能装多少水&#xff0c;取决于木桶中最短的那块木板&#xff09;&#xff0c;所以性能优化需要从多个方面去考虑&#xff0c;如架构优化、业务优化、前端优化、中间件调优、网关优化、…

Python_AI库 Pandas的loc和iloc的区别与使用实例

Python中Pandas的loc和iloc的区别与使用实例 在Pandas中&#xff0c;loc和iloc是两个常用的方法&#xff0c;用于基于标签&#xff08;label&#xff09;和整数位置&#xff08;integer location&#xff09;来选择数据。尽管两者在功能上有重叠&#xff0c;但它们在用法和性能…

去哪儿网机票服务请求体bella值逆向

作者声明&#xff1a;文章仅供学习交流与参考&#xff01;严禁用于任何商业与非法用途&#xff01;否则由此产生的一切后果均与作者无关&#xff01;如有侵权&#xff0c;请联系作者本人进行删除&#xff01; 一、加密定位 直接全局搜索bella&#xff0c;在可疑的地方下断&…

汇编--栈和寄存器

栈 栈是一种运算受限的线性表&#xff0c;其限定仅在表尾进行插入和删除操作的线性表&#xff0c;表尾也被叫做栈顶。简单概括就是我们对于元素的操作只能够在栈顶进行&#xff0c;也造就了其先进后出的结构特性。 栈 这种内存空间其实本质上有两种操作&#xff1a;将数据放入…

在Ubuntu安装RPM文件

Ubuntu软件源包含数千个deb软件包&#xff0c;可以从Ubuntu软件中心或使用apt命令行安装。 Deb是所有基于Debian的Linux发行版&#xff0c;例如包括Ubuntu&#xff0c;Linux mint等发行版使用的安装包格式。 如果某些软件在Ubuntu软件源中不可用&#xff0c;可以通过启用适当的…

物联网实战--平台篇之(五)账户界面

目录 一、界面框架 二、首页(未登录) 三、验证码登录 四、密码登录 五、帐号注册 六、忘记密码 本项目的交流QQ群:701889554 物联网实战--入门篇https://blog.csdn.net/ypp240124016/category_12609773.html 物联网实战--驱动篇https://blog.csdn.net/ypp240124016/cat…

物联网网关制造生产全流程揭秘!

如果您正有开发和定制物联网网关的计划&#xff0c;找一个专业的物联网设备厂商协助您制造生产物联网网关可以节省大量时间和成本&#xff0c;可以让您能专注于当前核心业务&#xff0c;而无需将精力过多地投入到自己不擅长的领域。 当然&#xff0c;了解物联网网关的测试和制…

HSA-42014和安泰ATA-4014C高压功率放大器对比

企业背景&#xff1a; Aigtek是一家来自中国的专业从事测量仪器研发、生产和销售的高科技企业。公司主要研发和生产功率放大器、功率放大器模块、功率信号源、计量校准源等产品。核心团队主要是来自西安交通大学及西北工业大学的专家教授等联合组成研发团队&#xff0c;目前拥有…

OPC UA(二)

一、配置PC Station 在TIA博途软件平台中配置PC Station&#xff0c;见&#xff08;一&#xff09; 二、使用OPC Scout V10测试通信结果 1. 添加OPC UA Server站点 1.1启动OPC Scout V10 1.2 打开OPC Scout V10&#xff0c;在 Server explorer窗口&#xff0c;查找UA serv…

Linux流量分析工具 | nethogs

在应急过程中&#xff0c;经常会遇到应用访问缓慢&#xff0c;网络阻塞的情况&#xff0c;分析原因可能会想到存在恶意程序把带宽占满的可能。通过这样一个小工具可以快速定位异常占用带宽程序的路径、PID、占用流量大小或是排除由带宽占满导致服务器缓慢的猜想。 一、简介 Ne…

Python学习——环境搭建

Python 介绍 Python&#xff08;英国发音&#xff1a;/ˈpaɪθən/ 美国发音&#xff1a;/ˈpaɪθɑːn/&#xff09;是一种广泛使用的解释型、高级编程、通用型编程语言&#xff0c;由吉多范罗苏姆创造&#xff0c;第一版发布于1991年。可以视之为一种改良&#xff08;加入…

DDD领域驱动模型设计

醍醐灌顶了朋友们 第一次写ddd还是 一路走来 丢失了东西 现在倒是也能找回来 只是有点可惜了 选择比努力更重要 独立功能 应用层&#xff1a;组织业务逻辑 领域&#xff1a;实体对象领域&#xff0c;业务核心 数据仓库&#xff1a; 不影响业务封装了数据操作&#xff0c;…

嵌入式开发九:STM32时钟系统

时钟对于单片机来说是非常重要的&#xff0c;它为单片机工作提供一个稳定的机器周期从而使系统能够正常运行。时钟系统犹如人的心脏&#xff0c;一旦有问题整个系统就崩溃。我们知道 STM32 属于高级单片机&#xff0c;其内部有很多的外设&#xff0c;但不是所有外设都使用同一时…

云计算安全扩展要求解析

云计算技术的信息系统&#xff0c;称为云计算平台&#xff0f;系统。 云计算平台&#xff0f;系统由设施、硬 件、资源抽象控制层、虚拟化计算资源、软件平台和应用软件等组成。 软件即服务(SaaS)、平台即服务 (PaaS) 、基础设施即服务ClaaS)是三种基本的云计算服务模式。在不同…

实验10配置 IPv4 和 IPv6 静态和 默认路由(课内实验)

上面这个是实验描述 下面是给的实验图 接下来我们跟着实验一步一步进行下去 第 1 部分&#xff1a;配置 IPv4 静态和 浮动静态默认路由配置ipv4静态路由&#xff1a;配置 IPv4静态和 浮动静态默认路由 步骤 1&#xff1a;配置一条 IPv4 静态 默认路由。在 Edge_Router 上&am…

ASP.NET校园新闻发布系统的设计与实现

摘 要 校园新闻发布系统是在学校区域内为学校教育提供资源共享、信息交流和协同工作的计算机网络信息系统。随着网络技术的发展和Internet应用的普及&#xff0c;互联网已成为人们获取信息的重要来源。由于现在各大学校的教师和学生对信息的需求越来越高&#xff0c;校园信息…

Linux-笔记 修改开发板默认时区

1. 时区文件 使用命令date -R查看当前的默认时区&#xff0c;date - R命令会自动解析/etc/localtime 文件&#xff0c;而该文件又是指向“ /usr/share/zoneinfo/$主时区/$次时区 ”&#xff0c;当需要更改到指定的时区只要将/etc/localtime 文件软链接到 ”/usr/share/zoneinf…