建设银行官方网站下载低代码app开发平台
web/
2025/9/29 22:34:49/
文章来源:
建设银行官方网站下载,低代码app开发平台,提升学历正规渠道,wordpress文章自动生成标签目录 一、为什么需要带有 subscribe 的 group.id二、我们需要使用commitSync手动提交偏移量吗#xff1f;三、如果我想手动提交偏移量#xff0c;该怎么做#xff1f; 一、为什么需要带有 subscribe 的 group.id
消费概念#xff1a; Kafka 使用消费者组的概念来实现主题的… 目录 一、为什么需要带有 subscribe 的 group.id二、我们需要使用commitSync手动提交偏移量吗三、如果我想手动提交偏移量该怎么做 一、为什么需要带有 subscribe 的 group.id
消费概念 Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次无论该组中实际有多少个消费者。所以 group 参数是强制性的如果没有组Kafka 将不知道如何对待订阅同一主题的其他消费者。偏移量 每当我们启动一个消费者时它都会加入一个消费者组然后根据该消费者组中的其他消费者数量为其分配要读取的分区。对于这些分区它会检查列表读取偏移量是否已知如果找到它将从这一点开始读取消息。如果没有找到偏移量则参数 auto.offset.reset 控制是从分区中最早的消息还是从最新的消息开始读取。
二、我们需要使用commitSync手动提交偏移量吗 是否需要手动提交偏移 是否需要提交偏移量取决于作为参数 enable.auto.commit 选择的值。默认情况下此设置为 true这意味着消费者将定期自动提交其偏移量由auto.commit.interval.ms 决定提交的频率。如果将其设置为 false那么将需要自己提交偏移量。这种默认行为可能也是导致很多发现 kafka 总是从最新的开始消费的原因由于偏移量是自动提交的因此它将使用该偏移量。 有没有办法从头开始重播消息 如果想每次都从头开始读取可以调用seekToBeginning如果不带参数调用它将重置为所有订阅分区中的第一条消息或者仅重置您传入的那些分区。 seekToBeginning 查找每个给定分区的第一个偏移量。poll(long) 该函数延迟计算仅在调用或时才查找所有分区中的第一个偏移量position(TopicPartition)。如果未提供分区则查找所有当前分配的分区的第一个偏移量。 public class MyListener implements ConsumerSeekAware {...Overridepublic void onPartitionsAssigned(MapTopicPartition, Long assignments, ConsumerSeekCallback callback) {callback.seekToBeginning(assignments.keySet());}}有没有办法从最后开始重播消息 有的可以使用 seekToEnd() 查找所有分配的分区到最后。或者使用 seekToTimestamp(long time)- 查找所有分配的分区到该时间戳表示的偏移量。 public class MyListener extends AbstractConsumerSeekAware {KafkaListener(...)void listn(...) {...}
}public class SomeOtherBean {MyListener listener;...void someMethod() {this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);}}三、如果我想手动提交偏移量该怎么做 1、禁用自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);提交方法 对于手动提交KafkaConsumers提供了两种方法即 commitSync() 和 commitAsync()。commitSync()是一个阻塞调用在偏移量成功提交后返回commitAsync()则立即返回。如果想知道提交是否成功可以为回调处理程序 ( OffsetCommitCallback) 提供一个方法参数。请注意在两次提交调用中消费者都会提交最新poll()调用的偏移量。 举个例子假设一个分区主题有一个消费者并且最后一次调用poll()返回偏移量为 4、5、6 的消息。提交时偏移量 6 将被提交因为这是消费者客户端跟踪的最新偏移量。 同时commitSync() 和 commitAsync() 都允许更多地控制我们想要提交的偏移量如果你使用允许你指定的相应重载那么MapTopicPartition, OffsetAndMetadata消费者将仅提交指定的偏移量即映射可以包含分配的分区的任何子集 并且指定的偏移量可以为任意值。 同步提交 阻塞线程直到提交成功或遇到不可恢复的错误在这种情况下它被抛出给调用者 while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s, record.offset(), record.key(), record.value());consumer.commitSync();}
}对于 for 循环中的每次迭代只有在consumer.commitSync()成功返回或因抛出异常而中断后代码才会移至下一次迭代。 异步提交 是一种非阻塞方法。调用它不会阻塞线程。相反它将继续处理以下指令无论最终是成功还是失败。 while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s, record.offset(), record.key(), record.value());consumer.commitAsync(callback);}
}对于 for 循环中的每次迭代无论consumer.commitAsync()最终会发生什么代码都会移至下一次迭代。并且提交的结果将由定义的回调函数处理。 权衡延迟与数据一致性 1、如果必须确保数据一致性请选择commitSync()因为它将确保在执行任何进一步操作之前你将知道偏移量提交是成功还是失败。但由于它是同步和阻塞的你将花费更多的时间来等待提交完成这会导致高延迟。 2、如果可以接受某些数据不一致并希望具有低延迟请选择commitAsync()因为它不会等待完成。相反它只会发出提交请求并稍后处理来自 Kafka 的响应成功或失败同时代码将继续执行。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/84115.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!