java客户端作为kafka消费者测试

【README】

本文主要对 java客户端作为kafka 消费者进行测试, 生产者由 kafka客户端扮演; 

 

【1】普通消费者

设置消费者组;

重置消费者的offset, 即每次都从最头开始消费(默认仅保持7天内数据) ;

类似于 命令行 --from-beginning

kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning

小结:从头开始消费,必须满足2个条件;

条件1: 必须重新换组, 如本文中的消费者组 从 sichuan 更新为 sichuan1 ;
条件2: 需要设置offset, 修改为 earliest, 默认值是 lastest;

/*** 普通消费者*/
public class MyConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2开启自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("first", "second"));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] " + rd.key() + "--" + rd.value()); }} /* 关闭消费者 */ 
//		consumer.close(); }
}

 从官网可以找到以上配置值; https://kafka.apache.org/0110/documentation.html#configuration

 

【2】kafka消费者-手动提交offset 

手动提交offset有3种方式:

  • 方式1:同步手动提交;
  • 方式2:异步手动提交; 
  • 方式3:自定义手动提交策略;

0)为啥需要手动提交?

kafka自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 , 所以这时我们就需要进行手动提交kafka的offset下标。这里顺便说下offset具体是什么。
offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。

1)关闭自动提交(默认为true)

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

第一次启动 consumer 从 90 开始消费;
第2次启动相同 consumer ,还是从90开始消费;

2) 如何使用手动提交?

kafka提供了手动提交offset的api;
方法1:commitSync 同步提交:  ;
方法2:commitAsync 异步提交;
两者相同点:都会将本次 poll  的一批数据最高的偏移量提交; 
不同点是, commitSync 阻塞当前线程,一直到提交成功, 并且会自动失败重试;
而 commitAsync 没有失败重试机制, 可能提交失败; 

3)同步手动提交offset

/*** 手动同步提交offset */
public class ManSyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("first", "second"));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */ consumer.commitSync();} /* 关闭消费者 */ 
//		consumer.close(); }
}

4)异步手动提交offset 

/*** 异步手动提交offset  */
public class ManASyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("first", "second"));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【异步提交】 当前线程会阻塞直到 offset提交成功 */  consumer.commitAsync(new OffsetCommitCallback() {@Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception !=null) {System.out.println("异步提交失败");} else {System.out.println("异步提交成功"); }}}); } /* 关闭消费者 */ 
//		consumer.close(); }
}

5)自定义手动提交offset策略

5.0)为啥需要自定义?

因为异步提交有一些问题,如下:
先消费数据,后提交offset, 可能导致数据重复消费; 
先提交offset, 后走业务逻辑,可能会丢数据; 

5.1)应用场景:

把 offset 存储到本地库 和 消息消费逻辑 在同一个数据库事务里面;

5.2)如何实现?需要实现 ConsumerRebalanceListener 来实现。

/*** 自定义手动提交offset策略  */
public class DiyCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在 rebalance方法【前】调用}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在 rebalance方法【后】调用  /* 分区分配方法 */for (TopicPartition partition :  partitions) { /*定位到某个 offset*/consumer.seek(partition, 1); // TODO: 这里需要输入1  }}  });/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */ consumer.commitSync();} /* 关闭消费者 */ 
//		consumer.close(); }
}

补充: 消费者rebalance 是什么?

消费者 rebalance, 什么时候触发 rebalance?  如 同一个消费者组下的 某个消费者机器宕机,或新增一个消费者机器,都会触发 rebalance,即重新分配  kafka分区数据与 消费者的对应关系; 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/330211.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring bean初始化及销毁你必须要掌握的回调方法。

转载自 spring bean初始化及销毁你必须要掌握的回调方法。 spring bean在初始化和销毁的时候我们可以触发一些自定义的回调操作。 初始化的时候实现的方法 1、通过java提供的PostConstruct注解&#xff1b; 2、通过实现spring提供的InitializingBean接口&#xff0c;并重写其a…

java生产者实现kafka拦截器

【RAEDME】 本文中&#xff0c; java客户端作为生产者&#xff0c; centos中consumer线程作为消费者&#xff1b; 【1】拦截器简述 1&#xff09;拦截器是什么&#xff1f; 很明显&#xff0c;为了实现面向切面编码&#xff0c;即在 具体逻辑的上下文 添加一些逻辑&#xff1…

Redis两种客户端:lettuce和Jedis的区别

spring boot 2的spring-boot-starter-data-redis中&#xff0c;默认使用的是lettuce作为redis客户端&#xff0c;它与jedis的主要区别如下&#xff1a; Jedis是同步的&#xff0c;不支持异步&#xff0c;Jedis客户端实例不是线程安全的&#xff0c;需要每个线程一个Jedis实例&…

mysql语句性能开销检测profiling详解

转载自 mysql语句性能开销检测profiling详解 之前我介绍过msyql查询优化explain检查命令的使用&#xff0c;explain主要是检查sql语句的基本性能&#xff0c;sql是否优秀&#xff0c;但不能查看具体的涉及硬件资源的开销&#xff0c;今天要介绍的这个profiling工具可以更细节的…

数据库主从和主备部署介绍

转自&#xff1a; https://www.cnblogs.com/fengzheng/p/13401783.html 数据库层的几种模式 在系统架构中&#xff0c;数据库层主要由如下几种模式&#xff0c;分别是单点模式、主备模式、主从模式。 单点模式 单点模式是最简单的模式&#xff0c;只有一台数据库服务器&…

lv官网编码查询_图文讲解,如何免费自主查询商标近似情况

每一个商业主体&#xff0c;都会给自己的企业、产品、服务注册一个好听的商标&#xff0c;但一个商标名并不是我们喜欢就可以&#xff0c;还要能够得到核准注册。为此&#xff0c;如何能判断出一个商标名是否能够被注册呢&#xff1f;这便需要利用商标的近似判断了(查询该商标名…

hashCode到底有什么用?

转载自 hashCode到底有什么用&#xff1f; hashCode概念 hashCode是jdk根据对象的地址算出来的一个int数字&#xff0c;即对象的哈希码值&#xff0c;代表了该对象在内存中的存储位置。 我们都知道hashCode()方法是顶级类Object类的提供的一个方法&#xff0c;所有的类都可以进…

MySQL、MongoDB、列数据库的区别及应用场景

目录什么是行存储和列存储&#xff1f;什么是MongoDB&#xff08;NoSQL&#xff09;?OLTP和OLAP什么是CAP定理&#xff1f;使用场景行存储的适用场景&#xff1a;列存储的适用场景&#xff1a;MongoDB相对于MySQL的优点更适用MySQL的场景更适用MongoDB的场景个人理解扩展参考什…

转-Redis AOF 持久化详解

转自&#xff1a; https://juejin.cn/post/6844903902991630349 Redis AOF 持久化详解 Redis 是一种内存数据库&#xff0c;将数据保存在内存中&#xff0c;读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是一旦进程退出&#xff0c;Redis 的数据就会丢失。 为了…

设计模式之动态代理模式实战

转载自 设计模式之动态代理模式实战 昨天分享了静态代理的概念及存在的缺点&#xff0c;所以今天讲的动态代理模式十分重要。动态代理在我们工作当中应用相当广泛&#xff0c;如Srping AOP就是动态代理的在开源框架的比较出名的应用。 动态代理有两种试&#xff0c;一是通过JD…

南理工计算机博士 年薪_计算机专业的女博士毕业后,进入211大学当讲师,年薪曝光...

现如今&#xff0c;对于很多年轻人来说&#xff0c;高学历就意味着高收入&#xff0c;大家在进入职场以后&#xff0c;通过高学历找到高薪的工作&#xff0c;能够在职场中少走很多弯路&#xff0c;他们在大公司能够跟很多的同事在一起勤快的合作&#xff0c;能够决定他们在以后…

MySQL优化(一):表结构优化

目录表优化数据类型的选择避免列的值为NULLVARCHAR和CHAR日期和时间类型选择标识符&#xff08;主键&#xff09;的类型错误的表结构一张表中有太多列太多的关联适当建立冗余数据混用范式和反范式建立缓存表和汇总表参考表优化 此文章用于记录《高性能MySQL》一书的知识点。 …

centos上安装redis

【README】 本文旨在阐述在linux上安装 redis 的步骤&#xff1b; 【1】具体步骤 step1&#xff09; 把 redis 压缩包 上传到 /opt/software, 并解压 step2&#xff09;打开 redis目录&#xff0c;执行 make&#xff0c; 若没有gcc编译器&#xff0c; 执行命令安装gcc&#…

设计模式之静态代理模式实战

转载自 设计模式之静态代理模式实战静态代理模式很简单&#xff0c;代理类和实现类都实现相同的接口&#xff0c;然后通过代理类来调用实现类的方法。如我们想保存用户信息之前打印用户信息&#xff0c;或者保存用户信息之后把这些信息缓存下来&#xff0c;即在运行方法前后插入…

pandas追加写入excel_[Excel]如果你爱Excel,请学好pandas

现在坐办公室的各种大小团体里&#xff0c;都会有一个“懂Excel的人”&#xff0c;可能那个人是团体里的一员&#xff0c;也可能是和这个团队关系比较好的一个热心人&#xff0c;但总之&#xff0c;你的身边会有这么一个人。如果你环顾四周也没发现&#xff0c;那可能你就是那个…

转:Redis 集群搭建详细指南

转自&#xff1a; https://www.cnblogs.com/mafly/p/redis_cluster.html 【README】 非常棒的一篇文章&#xff0c;感谢作者的分享&#xff1b; 先有鸡还是先有蛋&#xff1f; 最近有朋友问了一个问题&#xff0c;说毕业后去大城市还是小城市&#xff1f;去大公司还是小公…

快速排序算法思想及实现

简介 快速排序是对冒泡排序的一种改进&#xff0c;是一种分治算法&#xff0c;时间复杂度为O(nlogn) 基本思想 先从数列中取出一个数作为基准数。分区过程&#xff0c;将比这个数大的数全放到它的右边&#xff0c;小于或等于它的数全放到它的左边。再对左右区间重复第二步&a…

作为架构师,你必需要搞清楚的概念:POJO、PO、DTO、DAO、BO、VO

转载自 作为架构师&#xff0c;你必需要搞清楚的概念&#xff1a;POJO、PO、DTO、DAO、BO、VOPOJO、PO、DTO、DAO、BO、VO这些概念作为Java开发来说应该全部或者部分遇到过&#xff0c;作为架构师的你想必更是清楚这些概念在不同场景的应用。下面我逐一介绍一下&#xff0c;想必…

搭建redis集群

【README】 redis集群搭建有很多坑儿&#xff0c;注意&#xff1b; 【1】坑er 集合 坑儿1、 [ERR] Sorry, cant connect to node 192.168.163.203:6381 报错现场&#xff1a;执行如下命令新建集群&#xff0c;报错如下&#xff1b; /usr/local/redis-cluster/bin/redis-t…

怎么看cudnn的版本好_针对此次版本削弱,怎么用好嫦娥!

大招的费蓝 & 蓝量伤害比 &#xff08;先把嫦娥的冷门知识点放置顶&#xff0c;干货感比较重要&#xff09;嫦娥100%蓝时&#xff0c;所有造成的伤害均要2倍&#xff0c;蓝量每下降1%&#xff0c;伤害倍数就下降2%。即嫦娥在50%蓝的时候&#xff0c;伤害就不加倍了。嫦娥最…