java生产者实现kafka拦截器

【RAEDME】

本文中, java客户端作为生产者, centos中consumer线程作为消费者;

 

【1】拦截器简述

1)拦截器是什么? 很明显,为了实现面向切面编码,即在 具体逻辑的上下文 添加一些逻辑;如

逻辑1
具体逻辑
逻辑2

2)什么时候调用拦截器?这就要从 kafka生产者发送数据说起了;

kafka生产者使用了2个线程来发送数据: 
step1)生产者中的main线程把数据经过 拦截器-》序列化器-》分区器 处理;然后再把数据写到 RecordAccumulator; 
step2)send 线程从 RecordAccumulator 中取出数据写入到broker list;

【2】拦截器实现

/* 添加拦截器 */
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName()));

1)需求

第1个拦截器, 在消息发送前将时间戳加到消息value的 最前面; 
第2个拦截器,在消息发送后更新成功发送消息数或失败发送消息数; 

2)代码实现

-- 添加拦截器

/* 添加拦截器 */
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName()));

-- 带有拦截器的生产者

/*** 带有拦截器的生产者*/ 
public class InterceptorProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");  /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);  /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);  /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/* 添加拦截器 */props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName())); System.out.println(props); /* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);  /* 10.发送数据 */ for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("first", "key"+i, "value-first-20210101--J" + i));}/* 11.关闭资源 */  producer.close(); // 间接调用了拦截器的close 方法  System.out.println("kafka生产者写入数据完成"); } 
}
/*** 时间拦截器-在消息前添加时间戳 */
public class TimeInterceptor implements ProducerInterceptor<String, String>{@Overridepublic void configure(Map<String, ?> configs) {}/*生产者确保在 消息被序列化以及计算分区前调用该方法。*/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 获取消息值 String value = record.value();return new ProducerRecord<>(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "," + record.value()); }/*该方法会在消息从 RecordAccumulator 成功发送到Kafka Broker之后,或者在发送过程中失败时调用; 且通常在 生产者回调逻辑触发之前调用*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}
}
/*** 计数拦截器*/
public class CounterInterceptor implements ProducerInterceptor<String, String>{int sucCounter = 0;int errCounter = 0;@Overridepublic void configure(Map<String, ?> configs) {}/*生产者确保在 消息被序列化以及计算分区前调用该方法。*/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record; }/*该方法会在消息从 RecordAccumulator 成功发送到Kafka Broker之后,或者在发送过程中失败时调用; 且通常在 生产者回调逻辑触发之前调用*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(metadata !=null ) {sucCounter++;} else {errCounter++; }}@Overridepublic void close() {System.out.println("sucCounter =" + sucCounter + ", errCounter=" + errCounter);  }
}

-- java生产者打印日志 

sucCounter =10, errCounter=0
kafka生产者写入数据完成

3)centos消费者消费结果

[root@centos201 ~]# kafka-console-consumer.sh --topic first --zookeeper centos201:2181   
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
1609599630884,value-first-20210102--A0
1609599631203,value-first-20210102--A1
1609599631204,value-first-20210102--A6
1609599631203,value-first-20210102--A2
1609599631203,value-first-20210102--A3
1609599631204,value-first-20210102--A4
1609599631204,value-first-20210102--A5
1609599631204,value-first-20210102--A7
1609599631204,value-first-20210102--A8
1609599631204,value-first-20210102--A9

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/330209.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis两种客户端:lettuce和Jedis的区别

spring boot 2的spring-boot-starter-data-redis中&#xff0c;默认使用的是lettuce作为redis客户端&#xff0c;它与jedis的主要区别如下&#xff1a; Jedis是同步的&#xff0c;不支持异步&#xff0c;Jedis客户端实例不是线程安全的&#xff0c;需要每个线程一个Jedis实例&…

mysql语句性能开销检测profiling详解

转载自 mysql语句性能开销检测profiling详解 之前我介绍过msyql查询优化explain检查命令的使用&#xff0c;explain主要是检查sql语句的基本性能&#xff0c;sql是否优秀&#xff0c;但不能查看具体的涉及硬件资源的开销&#xff0c;今天要介绍的这个profiling工具可以更细节的…

数据库主从和主备部署介绍

转自&#xff1a; https://www.cnblogs.com/fengzheng/p/13401783.html 数据库层的几种模式 在系统架构中&#xff0c;数据库层主要由如下几种模式&#xff0c;分别是单点模式、主备模式、主从模式。 单点模式 单点模式是最简单的模式&#xff0c;只有一台数据库服务器&…

lv官网编码查询_图文讲解,如何免费自主查询商标近似情况

每一个商业主体&#xff0c;都会给自己的企业、产品、服务注册一个好听的商标&#xff0c;但一个商标名并不是我们喜欢就可以&#xff0c;还要能够得到核准注册。为此&#xff0c;如何能判断出一个商标名是否能够被注册呢&#xff1f;这便需要利用商标的近似判断了(查询该商标名…

hashCode到底有什么用?

转载自 hashCode到底有什么用&#xff1f; hashCode概念 hashCode是jdk根据对象的地址算出来的一个int数字&#xff0c;即对象的哈希码值&#xff0c;代表了该对象在内存中的存储位置。 我们都知道hashCode()方法是顶级类Object类的提供的一个方法&#xff0c;所有的类都可以进…

MySQL、MongoDB、列数据库的区别及应用场景

目录什么是行存储和列存储&#xff1f;什么是MongoDB&#xff08;NoSQL&#xff09;?OLTP和OLAP什么是CAP定理&#xff1f;使用场景行存储的适用场景&#xff1a;列存储的适用场景&#xff1a;MongoDB相对于MySQL的优点更适用MySQL的场景更适用MongoDB的场景个人理解扩展参考什…

转-Redis AOF 持久化详解

转自&#xff1a; https://juejin.cn/post/6844903902991630349 Redis AOF 持久化详解 Redis 是一种内存数据库&#xff0c;将数据保存在内存中&#xff0c;读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是一旦进程退出&#xff0c;Redis 的数据就会丢失。 为了…

设计模式之动态代理模式实战

转载自 设计模式之动态代理模式实战 昨天分享了静态代理的概念及存在的缺点&#xff0c;所以今天讲的动态代理模式十分重要。动态代理在我们工作当中应用相当广泛&#xff0c;如Srping AOP就是动态代理的在开源框架的比较出名的应用。 动态代理有两种试&#xff0c;一是通过JD…

南理工计算机博士 年薪_计算机专业的女博士毕业后,进入211大学当讲师,年薪曝光...

现如今&#xff0c;对于很多年轻人来说&#xff0c;高学历就意味着高收入&#xff0c;大家在进入职场以后&#xff0c;通过高学历找到高薪的工作&#xff0c;能够在职场中少走很多弯路&#xff0c;他们在大公司能够跟很多的同事在一起勤快的合作&#xff0c;能够决定他们在以后…

MySQL优化(一):表结构优化

目录表优化数据类型的选择避免列的值为NULLVARCHAR和CHAR日期和时间类型选择标识符&#xff08;主键&#xff09;的类型错误的表结构一张表中有太多列太多的关联适当建立冗余数据混用范式和反范式建立缓存表和汇总表参考表优化 此文章用于记录《高性能MySQL》一书的知识点。 …

centos上安装redis

【README】 本文旨在阐述在linux上安装 redis 的步骤&#xff1b; 【1】具体步骤 step1&#xff09; 把 redis 压缩包 上传到 /opt/software, 并解压 step2&#xff09;打开 redis目录&#xff0c;执行 make&#xff0c; 若没有gcc编译器&#xff0c; 执行命令安装gcc&#…

设计模式之静态代理模式实战

转载自 设计模式之静态代理模式实战静态代理模式很简单&#xff0c;代理类和实现类都实现相同的接口&#xff0c;然后通过代理类来调用实现类的方法。如我们想保存用户信息之前打印用户信息&#xff0c;或者保存用户信息之后把这些信息缓存下来&#xff0c;即在运行方法前后插入…

pandas追加写入excel_[Excel]如果你爱Excel,请学好pandas

现在坐办公室的各种大小团体里&#xff0c;都会有一个“懂Excel的人”&#xff0c;可能那个人是团体里的一员&#xff0c;也可能是和这个团队关系比较好的一个热心人&#xff0c;但总之&#xff0c;你的身边会有这么一个人。如果你环顾四周也没发现&#xff0c;那可能你就是那个…

转:Redis 集群搭建详细指南

转自&#xff1a; https://www.cnblogs.com/mafly/p/redis_cluster.html 【README】 非常棒的一篇文章&#xff0c;感谢作者的分享&#xff1b; 先有鸡还是先有蛋&#xff1f; 最近有朋友问了一个问题&#xff0c;说毕业后去大城市还是小城市&#xff1f;去大公司还是小公…

快速排序算法思想及实现

简介 快速排序是对冒泡排序的一种改进&#xff0c;是一种分治算法&#xff0c;时间复杂度为O(nlogn) 基本思想 先从数列中取出一个数作为基准数。分区过程&#xff0c;将比这个数大的数全放到它的右边&#xff0c;小于或等于它的数全放到它的左边。再对左右区间重复第二步&a…

作为架构师,你必需要搞清楚的概念:POJO、PO、DTO、DAO、BO、VO

转载自 作为架构师&#xff0c;你必需要搞清楚的概念&#xff1a;POJO、PO、DTO、DAO、BO、VOPOJO、PO、DTO、DAO、BO、VO这些概念作为Java开发来说应该全部或者部分遇到过&#xff0c;作为架构师的你想必更是清楚这些概念在不同场景的应用。下面我逐一介绍一下&#xff0c;想必…

搭建redis集群

【README】 redis集群搭建有很多坑儿&#xff0c;注意&#xff1b; 【1】坑er 集合 坑儿1、 [ERR] Sorry, cant connect to node 192.168.163.203:6381 报错现场&#xff1a;执行如下命令新建集群&#xff0c;报错如下&#xff1b; /usr/local/redis-cluster/bin/redis-t…

怎么看cudnn的版本好_针对此次版本削弱,怎么用好嫦娥!

大招的费蓝 & 蓝量伤害比 &#xff08;先把嫦娥的冷门知识点放置顶&#xff0c;干货感比较重要&#xff09;嫦娥100%蓝时&#xff0c;所有造成的伤害均要2倍&#xff0c;蓝量每下降1%&#xff0c;伤害倍数就下降2%。即嫦娥在50%蓝的时候&#xff0c;伤害就不加倍了。嫦娥最…

设计模式(一):工厂方法

目录概括目的主要解决何时使用使用场景总结三种工厂模式的对比代码示例概括 定义一个用于创建对象的接口&#xff0c;让子类决定实例化哪一个类。使用特殊的工厂方法代替对于对象构造函数的直接调用&#xff08;即使用 new运算符&#xff0c;工厂方法 使一个类的实例化延迟到其…

mysql查询优化explain命令详解

转载自 mysql查询优化explain命令详解mysql查询优化的方法有很多种&#xff0c;explain是工作当中用的比较多的一种检查方式。explain翻译即解释&#xff0c;就是看mysql语句的查询解释计划&#xff0c;从解释计划我们能很清楚的看到解释的语句有没有合理用到索引&#xff0c;扫…