kafka Interceptors and Listeners

Interceptors

ProducerInterceptor

https://www.cnblogs.com/huxi2b/p/7072447.html

Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain),按照指定顺序调用它们.

API

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {//该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区**前**调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);//该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率void onAcknowledgement(RecordMetadata metadata, Exception exception);//关闭interceptor,主要用于执行一些资源清理工作void close();
}

demo

    public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> props = new HashMap();props.put("bootstrap.servers", "localhost:9092");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RawSerializer.class);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, RawSerializer.class);List<String> interceptors = new ArrayList<>();interceptors.add("cn.jhs.kakfa.p.interceptor.TimeStampInterceptor"); // interceptor 1interceptors.add("cn.jhs.kakfa.p.interceptor.CounterInterceptor"); // interceptor 2props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);String topic = "test-topic";Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message:" + i);producer.send(record).get();}// 一定要关闭producer,这样才会调用interceptor的close方法producer.close();}
}

ConsumerInterceptor

https://blog.csdn.net/warybee/article/details/121980296

消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。

  • ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
  • 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
  • ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
  • 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
  • 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。

API

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {/**该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。*/ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);/**当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。调用者将忽略此方法抛出的任何异常。*/void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);/***  关闭Interceptor之前调用*/void close();
}

配置

//如果有多个拦截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");

Listeners

ProducerListener

https://blog.csdn.net/u014494148/article/details/125344184

Kafka提供了生产者监听器 ProducerListener,他的作用类似于带回调的KafkaTemplate#send(callback) ; 可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:

API

public interface ProducerListener<K, V> {/*** Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).*/default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {}/*** Invoked after an attempt to send a message has failed.*/default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,Exception exception) {}}

自定义Listener

public class MyProducerListener<K, V> implements ProducerListener<K, V> {private FallbackHandler<K, V> fallbackHandler;@Overridepublic void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {//fallbackHandler.process.//write error metrics...}@Overridepublic void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {//write success metrics...}
}

demo(KafkaTemplate.setProducerListener())

    public KafkaTemplate<Object, Object> buildKafkaTemplate(Map<String, Object> props) {ProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(props);KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(factory);MyProducerListener<Object, Object> listener1 = new MyProducerListener<>();listener1.setFallbackHandler(fallbackHandler);kafkaTemplate.setProducerListener(listener1);return kafkaTemplate;}

KafkaListenerErrorHandler

当@KafkaListener方法抛出异常时调用的错误处理程序.

API

@FunctionalInterface
public interface KafkaListenerErrorHandler {/*** Handle the error.*/Object handleError(Message<?> message, ListenerExecutionFailedException exception);
}

自定义CustomKafkaListenerErrorHandler(当异常过多时,暂停消费)

/*** 可以通过:* @org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")* 来引入该配置*/
@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {//记录了所有的 kafka MessageListenerContainerprivate final KafkaListenerEndpointRegistry endpointRegistry;public CustomKafkaListenerErrorHandler(KafkaListenerEndpointRegistry endpointRegistry) {this.endpointRegistry = endpointRegistry;}@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException exception) {// 处理异常// 暂停消费者String listenerId = exception.getGroupId();MessageListenerContainer listenerContainer = endpointRegistry.getListenerContainer(listenerId);listenerContainer.pause();//滑动窗口算法 ---// 休眠一段时间(例如 30秒)try {Thread.sleep(30000); // 暂停 30 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 恢复消费者listenerContainer.resume();return null;}
}

demo

@org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")

Callback

producer.Callback

public interface Callback {//processed befeore listener...void onCompletion(RecordMetadata metadata, Exception exception);
}

demo

producer.send(producerRecord, (recordMetadata, exception) -> {if (exception == null) {System.out.println("Record written to offset " +recordMetadata.offset() + " timestamp " +recordMetadata.timestamp());} else {System.err.println("An error occurred");exception.printStackTrace(System.err);}
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/732340.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java SE入门及基础(29)

第三节 访问修饰符 1. 概念 访问修饰符就是控制访问权限的修饰符号 2. 类的访问修饰符 类的访问修饰符只有两种&#xff1a;public 修饰符和默认修饰符&#xff08;不写修饰符就是默认&#xff09; public 修饰符修饰类表示类可以公开访问。默认修饰符修饰类表示该类只能…

融资项目——网关微服务

1. 网关的路由转发功能 在前后端分离的项目中&#xff0c;网关服务可以将前端的相关请求转发到相应的后端微服务中。 2. 网关微服务的配置 首先需要创建一个网关微服务&#xff0c;并添加依赖。 <!-- 网关 --><dependency><groupId>org.springframework.cl…

wpf中的Border和Background

在WPF&#xff08;Windows Presentation Foundation&#xff09;中&#xff0c;Border和Background是两个非常重要的属性&#xff0c;它们通常用于定义用户界面元素的外观样式。 Border&#xff1a; Border 是一个控件&#xff0c;它本身是一个装饰器&#xff0c;允许您为任何…

FreeRTOS学习笔记-基于stm32(3)中断管理

一、什么是中断 通俗点讲就是让CPU停止当前在做的事&#xff0c;转而去做更紧急的事。 二、中断优先级分组 这个紧急的事也有一个等级之分&#xff0c;优先级越高越先执行。stm32使用中断优先配置寄存器的高4位&#xff0c;共16级的中断优先等级。 stm32的中断优先等级可以分为…

让开源浏览器Chromium正常显示中文

什么是 Chromium &#xff1f; Chromium 是一个开源浏览器项目&#xff0c;旨在为所有用户构建一种更安全、更快、更稳定的网络体验方式。 和老苏之前介绍的 Firefox 的作用是一样的 文章传送门&#xff1a;给群晖安装firefox浏览器 因为是基于 vnc 的应用&#xff0c;感觉资源…

Elasticsearch 通过索引阻塞实现数据保护深入解析

Elasticsearch 是一种强大的搜索和分析引擎&#xff0c;被广泛用于各种应用中&#xff0c;以其强大的全文搜索能力而著称。 不过&#xff0c;在日常管理 Elasticsearch 时&#xff0c;我们经常需要对索引进行保护&#xff0c;以防止数据被意外修改或删除&#xff0c;特别是在进…

mysql笔记:1. 数据库创建与删除

可以使用show databases语句来查看当前所有存在的数据库。 mysql> show databases; Database ---------- information_schema mysql performance_schema sys其中&#xff0c;mysql用来描述用户访问权限。 创建数据库 创建数据库可以使用create database命令。 例如&#…

动态规划 第一期 背包问题

前言 动态规划难度较高&#xff0c;但是也十分重要&#xff0c;希望大家能够好好的理解&#xff01;&#xff01;&#xff01; 一、背包问题 思维导图&#xff1a; 背包问题(Knapsack problem)是一种组合优化的NP完全问题。问题可以描述为&#xff1a;给定一组物品&#xff…

c++ primer中文版第五版作业第十四章

见仓库 文章目录 14.114.214.314.414.514.614.714.814.914.1014.1114.1214.1314.1414.1514.1614.1714.1814.1914.2014.2114.2214.2314.2414.2514.2614.2714.2814.2914.3014.3114.3214.3314.3414.3514.3614.3714.3814.3914-4014.4114.4214.4314.4414.4514.4614.4714.4814.4914.5…

实现QT中qDebug()的日志重定向

背景&#xff1a; 在项目开发过程中&#xff0c;为了方便分析和排查问题&#xff0c;我们需要将原本输出到控制台的调试信息写入日志文件&#xff0c;进行持久化存储&#xff0c;还可以实现日志分级等。 日志输出格式&#xff1a; 我们需要的格式包括以下内容&#xff1a; 1.…

MySQL死锁详细介绍

首先死锁产生的原因&#xff1a;两个及以上事务争夺资源导致互相等待造成的 比如事务A先修改id为1的数据再去修改id为2的数据&#xff0c;事务B先修改id为2的数据再去修改id为1的数据&#xff0c;因为事务A先拿到id1的锁再去拿id2的锁&#xff0c;而事务B先拿到id2的锁又去拿id…

“成像光谱遥感技术中的AI革命:ChatGPT应用指

遥感技术主要通过卫星和飞机从远处观察和测量我们的环境&#xff0c;是理解和监测地球物理、化学和生物系统的基石。ChatGPT是由OpenAI开发的最先进的语言模型&#xff0c;在理解和生成人类语言方面表现出了非凡的能力。本文重点介绍ChatGPT在遥感中的应用&#xff0c;人工智能…

【Algorithms 4】算法(第4版)学习笔记 16 - 4.2 有向图

文章目录 前言参考目录学习笔记1&#xff1a;介绍1.1&#xff1a;有向图简介1.2&#xff1a;应用举例1.3&#xff1a;相关问题2&#xff1a;有向图 API2.1&#xff1a;有向图表示2.1.1&#xff1a;邻接表数组 Adjacency-list2.1.2&#xff1a;Java 实现&#xff1a;邻接表数组2…

2. gin中间件注意事项、路由拆分与注册技巧

文章目录 一、中间件二、Gin路由简介1、普通路由2、路由组 三、路由拆分与注册1、基本的路由注册2、路由拆分成单独文件或包3、路由拆分成多个文件4、路由拆分到不同的APP 一、中间件 在日常工作中&#xff0c;经常会有一些计算接口耗时和限流的操作&#xff0c;如果每写一个接…

Sftp服务器搭建(linux)

Sftp服务器搭建&#xff08;linux&#xff09; 一、基本工作原理 FTP的基本工作原理如下&#xff1a; 1&#xff09;建立连接&#xff1a;客户端与服务器之间通过TCP/IP建立连接。默认情况下&#xff0c;FTP使用端口号21作为控制连接的端口。​​​​​​​ 2&#xff09;身…

基于AI软件平台 HEGERLS智能托盘四向车机器人物流仓储解决方案持续升级

随着各大中小型企业对仓储需求的日趋复杂&#xff0c;柔性、离散的物流子系统也不断涌现&#xff0c;各种多类型的智能移动机器人、自动化仓储装备大量陆续的应用于物流行业中&#xff0c;但仅仅依靠传统的物流技术和单点的智能化设备&#xff0c;已经无法更有效的应对这些挑战…

Office 2007软件安装教程(附软件下载地址)

软件简介&#xff1a; 软件【下载地址】获取方式见文末。注&#xff1a;推荐使用&#xff0c;更贴合此安装方法&#xff01; 微软Office 2007是一款具有重大创新与革命性的办公软件套件。它引入了全新设计的用户界面&#xff0c;提供稳定安全的文件格式&#xff0c;并实现了无…

数据结构 - 堆(优先队列)+ 堆的应用 + 堆练习

文章目录 前言堆一、什么是堆二、堆又分为大根堆和小根堆三、由于堆的逻辑结构被看成完全二叉树&#xff0c;那么我们先来了解一下完全二叉树。四、堆使用数组还是链表储存数据呢&#xff1f;五、数组构建二叉树和父子节点之间的定位六、堆进行的操作七、实现小根堆1、堆的初始…

算法(数据结构)面试问题准备 二分法/DFS/BFS/快排

一、算法概念题 1. 二分法 总结链接几种查找情况的模板另一个好记的总结总结&#xff1a;搜索元素两端闭&#xff0c;while带等&#xff0c;mid1&#xff0c;结束返-1 搜索边界常常左闭右开&#xff0c;while小于&#xff0c;mid看边界开闭&#xff0c;闭开&#xff0c;结束i…

vue2【详解】生命周期(含父子组件的生命周期顺序)

1——beforeCreate&#xff1a;在内存中创建出vue实例&#xff0c;数据观测 (data observer) 和 event/watcher 事件配置还没调用&#xff08;data 和 methods 属性还没初始化&#xff09; 【执行数据观测 (data observer) 和 event/watcher 事件配置】 2——created&#xf…