java客户端作为kafka生产者测试

【README】

1、本文主要对 java客户端作为kafka 生产者进行测试, 消费者由 centos的kafka命令行线程扮演; 

2、消息发送: kafka的生产者采用异步发送消息的方式,在消息发送过程中,涉及到2个线程——main线程和sender线程,以及一个线程共享变量 RecordAccumulator。main线程将消息发送给 RecordAccumulator,sender线程不断从 RecordAccumulator 中读取数据发送到 kafka broker;

step1)生产者中的main线程把数据经过 拦截器-》序列化器-》分区器 处理;然后再把数据写到 RecordAccumulator; 
step2)send 线程从 RecordAccumulator 中取出数据写入到kafka集群; 

3、开发环境

-- pom.xml<!-- 依赖 --> <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>compile</scope></dependency></dependencies>-- log4j.properties
log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n  

【0】 生产者同步发送消息

为啥需要同步发送? 因为 kafka可以保证单个分区内消息有序,但无法保证全局有序,即多个分区消息有序; 

存在一些业务场景,需要消息有序;

/*** 同步消息生产者*/
public class SyncProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");  /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);  /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);  /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);  /* 10.同步发送数据 */ for (int i = 0; i < 10; i++) { try {Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first100", "first100-20210101--D" + i));RecordMetadata rMetadata = future.get(); // 调用future的get方法,让main线程阻塞,就可以实现同步发送 } catch (Exception e) {e.printStackTrace();} }/* 11.关闭资源 */  producer.close();System.out.println("kafka生产者写入数据完成"); } 
}

 

下面都是异步发送

【1】普通生产者

1.1、生产者代码 

/*** 普通生产者 */
public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");  /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);  /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);  /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);  /* 10.发送数据 */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first100", "first100-20210101--D" + i));try {System.out.println(future.get().partition() + "-" + future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.关闭资源 */  producer.close();System.out.println("kafka生产者写入数据完成"); } 
}
-- 日志 
0-183203
0-183204
0-183205
0-183206
0-183207
0-183208
0-183209
0-183210
0-183211
0-183212
kafka生产者写入数据完成

1.2、消费者

[root@centos201 ~]# kafka-console-consumer.sh --topic first100 --bootstrap-server centos201:9092
first100-20210101--D0
first100-20210101--D1
first100-20210101--D2
first100-20210101--D3
first100-20210101--D4
first100-20210101--D5
first100-20210101--D6
first100-20210101--D7
first100-20210101--D8
first100-20210101--D9

【2】带回调的生产者

2.1、生产者

/*** 带回调的生产者 */for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first100", "first100-20210101--E" + i), (metadata, exception)-> {/* lambda 表达式  */System.out.println(metadata.partition() + " -- " + metadata.offset());});}

2.2、消费者

first100-20210101--E0
first100-20210101--E1
first100-20210101--E2
first100-20210101--E3
first100-20210101--E4
first100-20210101--E5
first100-20210101--E6
first100-20210101--E7
first100-20210101--E8
first100-20210101--E9

【3】创建分区策略的生产者 (指定分区)

0、查看topic, 4个分区,3个副本

[root@centos201 ~]# kafka-topics.sh --describe --topic aaa --zookeeper centos201:2181
Topic:aaa       PartitionCount:4        ReplicationFactor:3     Configs:Topic: aaa      Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 1,2,3Topic: aaa      Partition: 1    Leader: 3       Replicas: 3,2,1 Isr: 1,2,3Topic: aaa      Partition: 2    Leader: 1       Replicas: 1,3,2 Isr: 2,1,3Topic: aaa      Partition: 3    Leader: 2       Replicas: 2,3,1 Isr: 1,2,3

虽然集群只有3台机器, centos201, centos202, centos203 ; 

当我的分区数是4,即分区数可以大于broker数量; 但副本数必须小于等于 broker数量; 

3.1、生产者

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); // 设置分区器 /* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);  /* 10.发送数据 */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("aaa", "aaa-key", "aaa-20210101--B" + i), (metadata, exception)-> {/* lambda 表达式  */System.out.println(metadata.partition() + " -- " + metadata.offset());});}-- 日志
1 -- 112
1 -- 113
1 -- 114
1 -- 115
1 -- 116
1 -- 117
1 -- 118
1 -- 119
1 -- 120
1 -- 121
kafka生产者写入数据完成

3.2、自定义分区器 

/*** 自定义分区器*/
public class MyPartitioner implements Partitioner {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {Integer integer = cluster.partitionCountForTopic(topic);return 1;}@Overridepublic void close() {}
}

3.3、消费者

[root@centos201 ~]# kafka-console-consumer.sh --topic aaa --bootstrap-server centos201:9092
aaa-20210101--C0
aaa-20210101--C1
aaa-20210101--C2
aaa-20210101--C3
aaa-20210101--C4
aaa-20210101--C5
aaa-20210101--C6
aaa-20210101--C7
aaa-20210101--C8
aaa-20210101--C9

小结: 可以查看,即便topic 有4个分区,但我在自定义分区器中指定写入到分区1, 所以生产者只把消息写到分区1; 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/330214.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis学习之缓存穿透、缓存击穿和缓存雪崩详解

目录缓存穿透解决方案缓存空对象布隆过滤器缓存击穿解决方案对访问数据库的操作加锁提前缓存热点数据&#xff0c;设置热点数据永不过期缓存雪崩解决方案Redis高可用限流降级数据预热设置合理的过期时间参考缓存穿透 指的是对某个一定不存在的数据进行请求&#xff0c;该请求将…

Redis高可用:主从复制及哨兵模式

目录主从复制作用复制原理使用的方式哨兵模式主从切换过程Redis Sentinel的配置文件参考主从复制 主从复制&#xff0c;是指将一台Redis服务器的数据&#xff0c;复制到其他的Redis服务器。前者称为主节点(master)&#xff0c;后者称为从节点(slave)&#xff1b;数据的复制是单…

java客户端作为kafka消费者测试

【README】 本文主要对 java客户端作为kafka 消费者进行测试&#xff0c; 生产者由 kafka客户端扮演&#xff1b; 【1】普通消费者 设置消费者组&#xff1b; 重置消费者的offset&#xff0c; 即每次都从最头开始消费&#xff08;默认仅保持7天内数据&#xff09; &#xf…

spring bean初始化及销毁你必须要掌握的回调方法。

转载自 spring bean初始化及销毁你必须要掌握的回调方法。 spring bean在初始化和销毁的时候我们可以触发一些自定义的回调操作。 初始化的时候实现的方法 1、通过java提供的PostConstruct注解&#xff1b; 2、通过实现spring提供的InitializingBean接口&#xff0c;并重写其a…

java生产者实现kafka拦截器

【RAEDME】 本文中&#xff0c; java客户端作为生产者&#xff0c; centos中consumer线程作为消费者&#xff1b; 【1】拦截器简述 1&#xff09;拦截器是什么&#xff1f; 很明显&#xff0c;为了实现面向切面编码&#xff0c;即在 具体逻辑的上下文 添加一些逻辑&#xff1…

Redis两种客户端:lettuce和Jedis的区别

spring boot 2的spring-boot-starter-data-redis中&#xff0c;默认使用的是lettuce作为redis客户端&#xff0c;它与jedis的主要区别如下&#xff1a; Jedis是同步的&#xff0c;不支持异步&#xff0c;Jedis客户端实例不是线程安全的&#xff0c;需要每个线程一个Jedis实例&…

mysql语句性能开销检测profiling详解

转载自 mysql语句性能开销检测profiling详解 之前我介绍过msyql查询优化explain检查命令的使用&#xff0c;explain主要是检查sql语句的基本性能&#xff0c;sql是否优秀&#xff0c;但不能查看具体的涉及硬件资源的开销&#xff0c;今天要介绍的这个profiling工具可以更细节的…

数据库主从和主备部署介绍

转自&#xff1a; https://www.cnblogs.com/fengzheng/p/13401783.html 数据库层的几种模式 在系统架构中&#xff0c;数据库层主要由如下几种模式&#xff0c;分别是单点模式、主备模式、主从模式。 单点模式 单点模式是最简单的模式&#xff0c;只有一台数据库服务器&…

lv官网编码查询_图文讲解,如何免费自主查询商标近似情况

每一个商业主体&#xff0c;都会给自己的企业、产品、服务注册一个好听的商标&#xff0c;但一个商标名并不是我们喜欢就可以&#xff0c;还要能够得到核准注册。为此&#xff0c;如何能判断出一个商标名是否能够被注册呢&#xff1f;这便需要利用商标的近似判断了(查询该商标名…

hashCode到底有什么用?

转载自 hashCode到底有什么用&#xff1f; hashCode概念 hashCode是jdk根据对象的地址算出来的一个int数字&#xff0c;即对象的哈希码值&#xff0c;代表了该对象在内存中的存储位置。 我们都知道hashCode()方法是顶级类Object类的提供的一个方法&#xff0c;所有的类都可以进…

MySQL、MongoDB、列数据库的区别及应用场景

目录什么是行存储和列存储&#xff1f;什么是MongoDB&#xff08;NoSQL&#xff09;?OLTP和OLAP什么是CAP定理&#xff1f;使用场景行存储的适用场景&#xff1a;列存储的适用场景&#xff1a;MongoDB相对于MySQL的优点更适用MySQL的场景更适用MongoDB的场景个人理解扩展参考什…

转-Redis AOF 持久化详解

转自&#xff1a; https://juejin.cn/post/6844903902991630349 Redis AOF 持久化详解 Redis 是一种内存数据库&#xff0c;将数据保存在内存中&#xff0c;读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是一旦进程退出&#xff0c;Redis 的数据就会丢失。 为了…

设计模式之动态代理模式实战

转载自 设计模式之动态代理模式实战 昨天分享了静态代理的概念及存在的缺点&#xff0c;所以今天讲的动态代理模式十分重要。动态代理在我们工作当中应用相当广泛&#xff0c;如Srping AOP就是动态代理的在开源框架的比较出名的应用。 动态代理有两种试&#xff0c;一是通过JD…

南理工计算机博士 年薪_计算机专业的女博士毕业后,进入211大学当讲师,年薪曝光...

现如今&#xff0c;对于很多年轻人来说&#xff0c;高学历就意味着高收入&#xff0c;大家在进入职场以后&#xff0c;通过高学历找到高薪的工作&#xff0c;能够在职场中少走很多弯路&#xff0c;他们在大公司能够跟很多的同事在一起勤快的合作&#xff0c;能够决定他们在以后…

MySQL优化(一):表结构优化

目录表优化数据类型的选择避免列的值为NULLVARCHAR和CHAR日期和时间类型选择标识符&#xff08;主键&#xff09;的类型错误的表结构一张表中有太多列太多的关联适当建立冗余数据混用范式和反范式建立缓存表和汇总表参考表优化 此文章用于记录《高性能MySQL》一书的知识点。 …

centos上安装redis

【README】 本文旨在阐述在linux上安装 redis 的步骤&#xff1b; 【1】具体步骤 step1&#xff09; 把 redis 压缩包 上传到 /opt/software, 并解压 step2&#xff09;打开 redis目录&#xff0c;执行 make&#xff0c; 若没有gcc编译器&#xff0c; 执行命令安装gcc&#…

设计模式之静态代理模式实战

转载自 设计模式之静态代理模式实战静态代理模式很简单&#xff0c;代理类和实现类都实现相同的接口&#xff0c;然后通过代理类来调用实现类的方法。如我们想保存用户信息之前打印用户信息&#xff0c;或者保存用户信息之后把这些信息缓存下来&#xff0c;即在运行方法前后插入…

pandas追加写入excel_[Excel]如果你爱Excel,请学好pandas

现在坐办公室的各种大小团体里&#xff0c;都会有一个“懂Excel的人”&#xff0c;可能那个人是团体里的一员&#xff0c;也可能是和这个团队关系比较好的一个热心人&#xff0c;但总之&#xff0c;你的身边会有这么一个人。如果你环顾四周也没发现&#xff0c;那可能你就是那个…

转:Redis 集群搭建详细指南

转自&#xff1a; https://www.cnblogs.com/mafly/p/redis_cluster.html 【README】 非常棒的一篇文章&#xff0c;感谢作者的分享&#xff1b; 先有鸡还是先有蛋&#xff1f; 最近有朋友问了一个问题&#xff0c;说毕业后去大城市还是小城市&#xff1f;去大公司还是小公…

快速排序算法思想及实现

简介 快速排序是对冒泡排序的一种改进&#xff0c;是一种分治算法&#xff0c;时间复杂度为O(nlogn) 基本思想 先从数列中取出一个数作为基准数。分区过程&#xff0c;将比这个数大的数全放到它的右边&#xff0c;小于或等于它的数全放到它的左边。再对左右区间重复第二步&a…