kafka消费者接收分区测试

【README】

  • 本文演示了当有新消费者加入组后,其他消费者接收分区情况;
  • 本文还模拟了 broker 宕机的情况;
  • 本文使用的是最新的 kafka3.0.0 ;
  • 本文测试案例,来源于 消费者接收分区的5种模型,建议先看模型,refer2   https://blog.csdn.net/PacosonSWJTU/article/details/121853461icon-default.png?t=LA92https://blog.csdn.net/PacosonSWJTU/article/details/121853461

【1】kafka测试环境准备

1)kafka集群 

  • 3个broker,分别为 centos201, centos202, centos203 ,id分别为 1,2,3 ;
  • topic, 3个分区,2个副本;

 2)生产者代码;

public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");/*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超时时间props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 缓冲区大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化类 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 设置压缩算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");/** 设置拦截器 */
//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));/** 设置阻塞超时时间 */props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);/* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);/* 10.发送数据 */int order = 1;for (int i = 0; i < 100000; i++) {for (int j = 0; j < 3; j++) {Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10",j, "", String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5)));try {System.out.println("[生产者] 分区【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");} catch (Exception e) {}}try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}/* 11.关闭资源 */producer.close();System.out.println("kafka生产者写入数据完成");}
}

生产者,会向每个分区发送1条消息,发送完成后,睡眠500ms; 共计循环 10w次; 共计5w秒;计划耗时 10+小时;(这里其他同学可以自行设置为其他值)

3)4个消费者;编号为1,2,3,4

public class MyConsumer1 {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092,centos202:9092,centos203:9092");/*2.2开启自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自动提交的间隔时间*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello10G1"); // group.id/*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 默认值是 lastest/*2.7 关闭自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("hello10"));/* 指定消费者的每个分区从偏移量1开始读取,下面的poll方法就会从位置1开始消费消息  */
//		for (TopicPartition partition : consumer.assignment()) {
//			consumer.seek(partition, 1);
//		}// 消费消息try {// 死循环while(!Thread.interrupted()) {try {System.out.println(DateUtils.getNowTimestamp() + " 消费者1-等待消费消息");TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 消费消息-获取数据ConsumerRecords<String, String> consumerRds  = consumer.poll(100);// 遍历 ConsumerRecordsfor(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("消费者1-分区【" + rd.partition() + "】offset【" + rd.offset() + "】 -> " + DateUtils.getNowTimestamp() + rd.key() + "--" + rd.value());}consumer.commitSync(); // 同步提交}} finally {// 记得关闭消费者consumer.close();}}
}

这样的消费者有4个,分别编号为 消费者 1,2,3,4 ;我的意思是4个不同的消费者类,以便打印日志标识;

我的消费者消费的是 lastest 最新产生的消费,这里可以自行设置为其他值,如 earlies;

4)添加日志配置,不打印 debug日志(因为kafka消费者debug日志很多)

新建 logback.xml ,设置仅打印info以上级别日志;

<?xml version="1.0" encoding="UTF-8"?>
<configuration><logger name="org.apache.kafka.clients" level="info" />
</configuration>

5)为了直观展示消费详情,我会用命令行启动4个不同消费者,而用idea启动生产者;但编译都通过maven;


【2】kafka测试

【2.1】测试1:当有新消费者加入后,整个消费者组成员接收分区情况; 

写在前面: 文末会po出命令行启动消费者的命令及参数;

消费者接收分区消息模型,参见

step0)启动 生产者,发送消息到kafka;

step2)命令行启动消费者1,消息消费日志如下:

消费者1接收了3个分区消息; 

 step2)命令行启动消费者2,群组消费日志如下:

消费者1接收了个分区2消息;

消费者2接收了分区0和分区2的消息;

  step3)命令行继续启动消费者3,群组消费日志如下:

消费者1接收了个分区2消息;
消费者2接收了分区0的消息;
消费者3接收了分区1的消息;

 step4)命令行继续启动消费者4, 日志如下:

消费者1接收了个分区2消息;
消费者2接收了分区0的消息;
消费者3接收了分区1的消息;

消费者4空闲;

 


【2】 模拟kafka broker 宕机

写在前面,模拟宕机前先查看 topic 详情

(图1)

step1) 停止掉 201 broker的服务

情况1:topic的分区没有受影响,但leader 副本选举为3,比较本图和图1,看差别; 

 情况2:所有消费者全部阻塞,直到超时全部抛出异常;

等待 kafka集群的控制器,首领副本选择完成后,又可以接收消费者请求; 

  • 补充1: 这里有一小段时间延时,即当有broker宕机后,需要重新选举控制器,首领副本等;而且会发生分区再均衡

 step2)重启 201;消费日志:如下:

消费者1接收了个分区1消息;
消费者2接收了分区2的消息;
消费者3空闲;
消费者4接收了分区0的消息;

 之所以 消费者3空闲,消费者4忙碌,是因为 broker 动态上下线,导致了分区再均衡使得分区所有权从消费者A转到消费者B(201宕机前,是消费者3忙碌,消费者4空闲);

【小结】

1,要保证kafka消息可靠性,需要 生产者,broker,消费者3方的全力配合;

2,本文这里仅记录了一部分 kafka集群异常的情况;


【附录】

命令行启动消费者命令及参数;仅供参考;因为路径肯定不一样;

其实,这是拷贝idea的执行日志里的命令,如下:

 

java -classpath D:\Java\jdk1.8.0_172\jre\lib\charsets.jar;D:\Java\jdk1.8.0_172\jre\lib\deploy.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_172\jre\lib\javaws.jar;D:\Java\jdk1.8.0_172\jre\lib\jce.jar;D:\Java\jdk1.8.0_172\jre\lib\jfr.jar;D:\Java\jdk1.8.0_172\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_172\jre\lib\jsse.jar;D:\Java\jdk1.8.0_172\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_172\jre\lib\plugin.jar;D:\Java\jdk1.8.0_172\jre\lib\resources.jar;D:\Java\jdk1.8.0_172\jre\lib\rt.jar;D:\workbench_idea\study4vw\vwstudy22\target\classes;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.5.4\spring-boot-starter-web-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter\2.5.4\spring-boot-starter-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot\2.5.4\spring-boot-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.5.4\spring-boot-autoconfigure-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.5.4\spring-boot-starter-logging-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\ch\qos\logback\logback-classic\1.2.5\logback-classic-1.2.5.jar;D:\software_cluster\mvn_repo\.m2\repository\ch\qos\logback\logback-core\1.2.5\logback-core-1.2.5.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.14.1\log4j-to-slf4j-2.14.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\logging\log4j\log4j-api\2.14.1\log4j-api-2.14.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\jul-to-slf4j\1.7.32\jul-to-slf4j-1.7.32.jar;D:\software_cluster\mvn_repo\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-core\5.3.9\spring-core-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-jcl\5.3.9\spring-jcl-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\yaml\snakeyaml\1.28\snakeyaml-1.28.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.5.4\spring-boot-starter-json-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.12.4\jackson-databind-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.12.4\jackson-annotations-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.12.4\jackson-core-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.12.4\jackson-datatype-jdk8-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.12.4\jackson-datatype-jsr310-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.12.4\jackson-module-parameter-names-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.5.4\spring-boot-starter-tomcat-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.52\tomcat-embed-core-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.52\tomcat-embed-el-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.52\tomcat-embed-websocket-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-web\5.3.9\spring-web-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-beans\5.3.9\spring-beans-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-webmvc\5.3.9\spring-webmvc-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-aop\5.3.9\spring-aop-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-context\5.3.9\spring-context-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-expression\5.3.9\spring-expression-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\kafka\kafka-clients\3.0.0\kafka-clients-3.0.0.jar;D:\software_cluster\mvn_repo\.m2\repository\com\github\luben\zstd-jni\1.5.0-2\zstd-jni-1.5.0-2.jar;D:\software_cluster\mvn_repo\.m2\repository\org\lz4\lz4-java\1.7.1\lz4-java-1.7.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.1\snappy-java-1.1.8.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\slf4j-simple\1.7.25\slf4j-simple-1.7.25.jar kafka.consumer.MyConsumer2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329738.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python数据分析架构_Python数据分析

引言&#xff1a;本文重点是用十分钟的时间帮读者建立Python数据分析的逻辑框架。其次&#xff0c;讲解“如何通过Python 函数或代码和统计学知识来实现数据分析”。本次介绍的建模框架图分为六大版块&#xff0c;依次为导入数据&#xff0c;数据探索&#xff0c;数据处理&…

JAVA面试常考系列七

转载自 JAVA面试常考系列七 题目一 Swing的方法中&#xff0c;有哪些是线程安全的&#xff1f; Swing的规则是&#xff1a;当Swing组件被具现化时&#xff0c;所有可能影响或依赖于组件状态的代码都应该在事件派发线程中执行。 因此有3个线程安全的方法&#xff1a; repaint()…

图片中的Build 2016

微软主办的Build 2016大会刚刚落幕&#xff0c;让我们通过下面的图片集锦来回顾大会的一些容易被人忽视的细节。 Xamarin加入微软大家庭 微软公司于二月底花大价钱买下了Xamarin这家移动开发平台提供商&#xff0c;终于补全了它Mobile First Cloud First战略的短板。 图片一&am…

diy实现spring依赖注入

【README】 本文diy代码实现了 spring 依赖注入&#xff0c;一定程度上揭示了依赖注入原理&#xff1b; 【1】控制反转-Inversion of Control 是一种编码思想&#xff0c;简而言之就是 应用程序A可以使用组件B&#xff0c;但A无法控制B的生命周期&#xff08;如创建&#xff…

html 中一个格子拆分成两个_一个效果惊人的数字游戏

安爸曾多次讲过数学推理能力对孩子成长的重要性&#xff0c;听到有位家长说自己用扔骰子的方法教孩子数学等式。步骤大致是扔骰子时&#xff0c;如果骰子是3&#xff0c;就在棋盘上从0出发走3步&#xff0c;并且写出033的加法等式。扔到负数就后退&#xff0c;写出减法等式。科…

JAVA面试常考系列八

转载自 JAVA面试常考系列八 题目一 JDBC是什么&#xff1f; JDBC&#xff08;Java DataBase Connectivity,java数据库连接&#xff09;是一种用于执行SQL语句的Java API&#xff0c;可以为多种关系数据库提供统一访问&#xff0c;由一组用Java语言编写的类和接口组成。JDBC提供…

【广州/深圳 活动】 MVP社区巡讲

紧跟当今的技术发展趋势还远远不够&#xff0c;我们要引领变革&#xff01;加入本地技术专家社区&#xff0c;获取真实案例、实况培训演示以及探讨新一代解决方案。在此活动中&#xff0c;您将&#xff1a; 了解如何运用开源&#xff08;OSS&#xff09;技术、Microsoft 技术及…

java socket实现简单即时通讯

【1】socket服务器 /*** Description 即时消息服务器* author xiao tang* version 1.0.0* createTime 2022年01月23日*/ public class IMSocketServer {private static int PORT 13;public static void main(String[] args) {ServerSocket server null;try {// 开启端口serv…

蝌蚪网课助手mac_疫情期间如何录网课?(干货教程)手把手教你录出高质量网课。...

鉴于国外疫情的持续爆发&#xff0c;中小学开学日期进一步延期&#xff0c;我们的网课教学也同样面临持续后延。我们的很多教师朋友&#xff0c;可能此时他们正需要这么一个教程来熟悉网课的录制方法。于是这篇文章就应运而生了&#xff0c;希望它能给各位老师带来些许帮助。​…

JAVA面试常考系列九

转载自 JAVA面试常考系列九 题目一 RMI架构层的结构是如何组成的&#xff1f; RMI体系结构由三层组成&#xff0c;分别是&#xff1a; 存根和骨架层&#xff08;Stub and Skeleton Layer&#xff09; 远程引用层&#xff08;Remote Reference Layer&#xff09; 传输层&#xf…

WebAPI前置知识:HTTP与RestfulAPI

对HTTP协议的基本了解是能理解并使用RestFul风格API的基础&#xff0c;在了解了这些基础之后&#xff0c;使用各种RestFul的开发框架才能得心应手。我一开始使用WebApi的时候就因为对这些知识缺乏了解&#xff0c;觉得用起来各种不顺手&#xff0c;直到熟悉了这些HTTP的知识后&…

Java三种代理模式-静态代理,动态代理和cglib代理

【README】 本文阐述了3种代理模式的定义&#xff0c;并编写了代码测试案例&#xff1b; 代理其实是一种设计模式&#xff0c;可以在访问目标对象的方法上下文添加处理逻辑&#xff08;扩展目标对象的功能&#xff09;&#xff0c;是 切面编程的基石&#xff1b; 【举个例子】…

python遗传算法工具箱的使用_遗传算法的python实现,非常值得看的一篇文章

遗传算法是一种智能优化算法&#xff0c;通常用于求解复杂的数学问题。相比于传统方法&#xff0c;遗传算法摒弃了盲目的穷举或完全随机的求解策略&#xff0c;借鉴了自然界优胜劣汰、自然进化的思想&#xff0c;快速逼近最优解。上文对遗传算法的基本内容进行了介绍&#xff0…

JAVA面试常考系列十一

转载自 JAVA面试常考系列十一 题目一 什么是JSP&#xff1f; JSP(Java Server Page)是一个文本文档&#xff0c;是一种将静态内容和动态生成内容混合在一起的技术。 JSP包含两种类型的文本&#xff1a;静态数据和JSP元素。静态数据可以用任何基于文本的格式表示&#xff0c;如H…

.NET跨平台实践:用C#开发Linux守护进程

Linux守护进程&#xff08;Daemon&#xff09;是Linux的后台服务进程&#xff0c;它脱离了与控制终端的关联&#xff0c;直接由Linux init进程管理其生命周期&#xff0c;即使你关闭了控制台&#xff0c;daemon也能在后台正常工作。 一句话&#xff0c;为Linux开发与控制台无关…

ThreadLocalRandom与Random区别

转自&#xff1a; 一文秒懂 Java ThreadLocalRandom - Java 一文秒懂 - 简单教程&#xff0c;简单编程随机数生成是一个非常常见的操作&#xff0c;而且 Java 也提供了 java.util.Random 类用于生成随机数&#xff0c;而且呢&#xff0c;这个类也是线程安全的&#xff0c;就是…

python自动配置文件_【python接口自动化】- ConfigParser配置文件的使用

前言&#xff1a;目前我们使用的绝大多数计算机程序&#xff0c;无论是办公软件&#xff0c;浏览器&#xff0c;甚至游戏、视频都是通过菜单界面系统配置的&#xff0c;它几乎成了我们使用机器的默认方式。而在python中&#xff0c;也有这样的一个配置模块可以把代码可配置化。…

JAVA面试常考系列十

转载自 JAVA面试常考系列十 题目一 Servlet是什么&#xff1f; Servlet&#xff08;Server Applet&#xff09;是Java Servlet的简称&#xff0c;称为小服务程序或服务连接器&#xff0c;是用Java编写的服务器端程序&#xff0c;主要的作用是处理客户端请求并生成动态Web内容。…

DotNet 资源大全

Awesome DotNet&#xff0c;这又是一个 Awesome XXX 系列的资源整理&#xff0c;由 quozd 发起和维护。内容包括&#xff1a;编译器、压缩、应用框架、应用模板、加密、数据库、反编译、IDE、日志、风格指南等。 伯乐在线已在 GitHub 上发起「DotNet 资源大全中文版」的整理。欢…

javabean与json转换(fastjson与jackson两个版本)

【README】 本文演示了 javabean与json转换的开发方式&#xff1b; 要想 javabean的属性名 与 json的字段名不一致&#xff0c;也是可以转换的&#xff1b; 之前需要引入 ali.fastjson <dependency><groupId>com.alibaba</groupId><artifactId>fas…