spring-kafka整合:KafkaTemplate-kafka模板类介绍

【README】

1,本文主要关注 KafkaTemplate的重点方法,并非全部方法;

2,KafkaTemplate  底层依赖于 DefaultKafkaProducerFactory , 关于 DefaultKafkaProducerFactory 的介绍,refer2 

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客【1】 类描述类描述:单例共享 Producer 实例的 ProducerFactory 实现。此实现将为每次 createProducer() 调用时提供的 Map 配置和可选的 Serializer 实现返回相同的 Producer 实例(如果未启用事务)。如果您使用的序列化器没有参数构造函数并且不需要设置,那么最简单的方法是在传递给 DefaultKafkaProducerFactory 构造函数的配置中针对 ProducerConfig.KEY_SERIALIZER_CLASS_Chttps://blog.csdn.net/PacosonSWJTU/article/details/121306370


【1】KafkaTemplate 类说明

用于执行高级操作的模板。 当与 DefaultKafkaProducerFactory 一起使用时,模板是线程安全的。 生产者工厂和 org.apache.kafka.clients.producer.KafkaProducer 确保这一点;

public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,ApplicationListener<ContextStoppedEvent>, DisposableBean {

 【1.1】构造方法

使用提供的生产者工厂和 autoFlush 设置创建一个实例。


如果您已将生产者的 linger.ms 配置为非默认值并希望立即在此模板上发送操作,无论该设置如何, 又或者您希望阻塞直到服务器根据acs属性确认已收到消息, 需要把autoFlush设置为true

如果 configOverrides 不为 null 或不为空,则将使用合并的生产者属性创建一个新的 DefaultKafkaProducerFactory,这些属性在提供的工厂属性之后进行覆盖。

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,@Nullable Map<String, Object> configOverrides) {Assert.notNull(producerFactory, "'producerFactory' cannot be null");this.autoFlush = autoFlush;this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;// 是否自定义生产者工厂 this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;if (this.customProducerFactory) {Map<String, Object> configs = new HashMap<>(producerFactory.getConfigurationProperties()); 
// 覆盖工厂属性 configs.putAll(configOverrides); // 创建新的 DefaultKafkaProducerFactoryDefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(configs, producerFactory.getKeySerializerSupplier(), producerFactory.getValueSerializerSupplier());
// 设置物理关闭生产者的超时时间  newFactory.setPhysicalCloseTimeout((int) producerFactory.getPhysicalCloseTimeout().getSeconds());
// 设置是否分区 newFactory.setProducerPerConsumerPartition(producerFactory.isProducerPerConsumerPartition());
// 设置是否 每个线程创建一个 生产者; newFactory.setProducerPerThread(producerFactory.isProducerPerThread());
// 新工厂赋值this.producerFactory = newFactory;} else {this.producerFactory = producerFactory;}
// 是否开启kafka事务 this.transactional = this.producerFactory.transactionCapable();
}

【1.2】发送消息方法(非常重要)

发送消息有很多方法,大致分为两类;

  • send();
  • doSend();

【1.2.1】send() 发送消息

有4个外观方法,使用的都是默认topic;

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {return send(this.defaultTopic, data);
}@Override
public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {return send(this.defaultTopic, key, data);
}@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {return send(this.defaultTopic, partition, key, data);
}@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) {return send(this.defaultTopic, partition, timestamp, key, data);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);return doSend(producerRecord);
}

可以看到,最后还是调用了 底层的 doSend() 方法;


【1.2.2】doSend() 方法

5个 doSend() 方法的外观方法 ,这5个方法对 topic ,分区, 消息key,时间戳,消息value  进行了重载

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);return doSend(producerRecord);
}@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);return doSend(producerRecord);
}@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);return doSend(producerRecord);
}@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,@Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);return doSend(producerRecord);
}@Override
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {Assert.notNull(record, "'record' cannot be null");return doSend(record);
}

底层 doSend() 定义如下:

protected ListenableFuture<SendResult<K, V>>

        doSend(final ProducerRecord<K, V> producerRecord)

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {// 获取生产者 final Producer<K, V> producer = getTheProducer(producerRecord.topic());this.logger.trace(() -> "Sending: " + producerRecord);final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();Object sample = null;if (this.micrometerEnabled && this.micrometerHolder == null) {this.micrometerHolder = obtainMicrometerHolder();}if (this.micrometerHolder != null) {sample = this.micrometerHolder.start();}// 发送消息 Future<RecordMetadata> sendFuture =producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));// May be an immediate failure (注意,这里可能马上失败,或有运行时异常抛出)if (sendFuture.isDone()) { try {sendFuture.get(); // 这里调用get会阻塞,如果发送没有完成的话 }catch (InterruptedException e) {Thread.currentThread().interrupt();throw new KafkaException("Interrupted", e);}catch (ExecutionException e) {throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace}}if (this.autoFlush) { // 自动刷新 flush();}this.logger.trace(() -> "Sent: " + producerRecord);return future;
}

【代码解说】

step1, 调用了 getTheProducer() 获取生产者 ;

关于 DefaultKafkaProducerFactory.createProducer() 可以参见 以下博文,因篇幅,本文不再赘述;

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客

step2,producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)) ; 调用了 buildCallback(...) 构建回调对象;

 

非事务模式,则关闭生产者

由 DefaultKafkaProducerFactory 可知, 生产者是  CloseSafeProducer, 其包裹了 原生 kafka生产者; 所以 调用了 CloseSafeProducer.close() 方法;

 

 step3,自动刷新缓存 flush(); 

如果 ProducerFactory 提供单例生产者(例如 DefaultKafkaProducerFactory),则调用此方法才有意义。

public void flush() {Producer<K, V> producer = getTheProducer();try {producer.flush();}finally {closeProducer(producer, inTransaction());}
}protected void closeProducer(Producer<K, V> producer, boolean inTx) {if (!inTx) { // 非事务才关闭 producer.close(this.closeTimeout);}
}


【2】KafkaTemplate 发送消息与生产者复用 

 我们再次 follow了 DefaultKafkaProducerFactory的 doCreateProducer() 方法;

第1次因为发送消息 新建了 producer;

第2次再发送消息时,因为producer 不为null;所以直接取走;

同时 synchronized同步块可以避免并发问题;

发送消息后,是否关闭生产者,可以参考 【小结】


【小结】

通过分析 KafkaTemplate.doSend() 消息发送分发, 我们可以看到,

每发送一条消息,如果抛出异常的话,则会关闭kafka生产者,否则不会关闭生产者;原因参见  

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客https://blog.csdn.net/PacosonSWJTU/article/details/121306370中的章节 【4.5.1】;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329812.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安卓 on a null object reference_详解Object.prototype.__proto__

Object.prototype 的 __proto__ 属性是一个访问器属性(一个getter函数和一个setter函数), 暴露了通过它访问的对象的内部[[Prototype]] (一个对象或 null)。使用__proto__是有争议的&#xff0c;也不鼓励使用它。因为它从来没有被包括在EcmaScript语言规范中&#xff0c;但是现…

【SpringBoot】服务器JVM远程调试

目的 当系统部署到测试环境服务器时&#xff0c;难免会遇到bug。这个时候如果能远程调试&#xff0c;那么能够大大提高我们的生产效率&#xff0c;快速完成服务调试&#xff0c;最快发布生产环境。&#xff08;领导好评不就到手了&#xff09; 准备 Idea&#xff08;Java最好…

图说:为什么Java中的字符串被定义为不可变的

转载自 图说&#xff1a;为什么Java中的字符串被定义为不可变的字符串&#xff0c;想必大家最熟悉不过了&#xff0c;通常我们在代码中有几种方式可以创建字符串&#xff0c;比如&#xff1a;String s "Hollis";这时&#xff0c;其实会在堆内存中创建一个字符串对象…

值得推荐的微软技术公众号推荐

为开阔技术人眼界&#xff0c;促进技术人职业成长。小二在此诚意推荐最值得关注的微软技术公众号。平时关注推送的文章或多或少要么学到知识技能&#xff0c;要么收到一些启发&#xff0c;有利于个人成长。这些公众号为笔者个人积累&#xff0c;不一定都合大家口味&#xff0c;…

springboot:BeanPostProcessor示例及分析

【README】 1&#xff0c;本文主要分析 BeanPostProcessor 的作用&#xff0c; 开发方式&#xff1b; 2&#xff0c;BeanPostProcessor 是bean后置处理器&#xff0c; 简而言之就是bean被创建好了&#xff0c;之后如果需要对其属性进行修改&#xff0c;则 需要使用 BeanPost…

实现了某一个接口的匿名类的例子_java中的内部类内部接口详解,一文搞定

简介一般来说&#xff0c;我们创建类和接口的时候都是一个类一个文件&#xff0c;一个接口一个文件&#xff0c;但有时候为了方便或者某些特殊的原因&#xff0c;java并不介意在一个文件中写多个类和多个接口&#xff0c;这就有了我们今天要讲的内部类和内部接口。内部类先讲内…

Java 8 日期和时间解读

转载自 Java 8 日期和时间解读现在&#xff0c;一些应用程序仍然在使用java.util.Date和java.util.Calendar API和它们的类库&#xff0c;来使我们在生活中更加轻松的处理日期和时间&#xff0c;比如&#xff1a;JodaTime。然而&#xff0c;Java 8 引进的新的类库来处理日期和时…

云计算产值将超3000亿美元 亚马逊微软谷歌居三甲

腾讯科技讯 3月27日消息&#xff0c;据外电报道&#xff0c;云计算曾经主要是无法承担建造和维护基础设施的初创公司的解决方案&#xff0c;但对于管理数字业务的大型企业而言&#xff0c;云计算正快速成为省钱的管理数字业务的方式。市场调研公司IDC在上月的一份调查数据显示&…

oracle中join另一个表后会查询不出一些数据_面试必备 | 8个Hive数据仓工具面试题锦集!...

是新朋友吗&#xff1f;记得先点蓝字关注我哦&#xff5e;今日课程菜单Java全栈开发 | Web前端H5大数据开发 | 数据分析人工智能Python | 人工智能物联网进入数据时代&#xff0c;大数据技术成为互联网发展的核心要素之一。与此同时大数据开发工程师的薪资也成为行业内高薪的代…

springboot-Initializer例子及分析

【README】 1&#xff0c;本文主要编写了 初始化器例子并分析了其调用路径&#xff1b; 2&#xff0c;初始化器的执行顺序 先于 后置处理器&#xff1b; 后置处理器&#xff0c;refer2 springboot&#xff1a;BeanPostProcessor示例及分析_PacosonSWJTU的博客-CSDN博客【RE…

ASP.NET 开发人员不必担心 Node 的五大理由

哦别误会……我真的很喜欢 Node&#xff0c;而且我觉得它提出的概念和模式将在很长一段时间内&#xff0c;对服务端 Web 编程产生深远的影响。即使随着时间的推移 Node 过气了&#xff0c;我们肯定可以从下一个牛逼玩意身上或多或少的感觉到它的影响(不管好的和/或坏的)。而在这…

Spring Boot面试题

转载自 Spring Boot面试题 Spring Boot 是微服务中最好的 Java 框架. 我们建议你能够成为一名 Spring Boot 的专家。问题一 Spring Boot、Spring MVC 和 Spring 有什么区别&#xff1f; SpringFrameSpringFramework 最重要的特征是依赖注入。所有 SpringModules 不是依赖注入就…

synchronized原理_Java并发编程 -- synchronized保证线程安全的原理

线程安全是并发编程中的重要关注点&#xff0c;应该注意到的是&#xff0c;造成线程安全问题的主要诱因有两点&#xff0c;一是存在共享数据(也称临界资源)&#xff0c;二是存在多条线程共同操作共享数据。因此为了解决这个问题&#xff0c;我们可能需要这样一个方案&#xff0…

转:Java 7 种阻塞队列详解

转自&#xff1a; Java 7 种阻塞队列详解 - 云社区 - 腾讯云队列&#xff08;Queue&#xff09;是一种经常使用的集合。Queue 实际上是实现了一个先进先出&#xff08;FIFO&#xff1a;First In First Out&#xff09;的有序表。和 List、Set ...https://cloud.tencent.com/de…

微软Build 2016前瞻:让开发者编写能畅行所有设备的app

本周三&#xff0c;5000名软件开发者将齐聚旧金山莫斯康展览中心参加微软公司年度开发者大会&#xff08;Build 2016&#xff09;&#xff0c;和往年一样&#xff0c;微软在大会上发布了一系列新的技术支持。 据透露&#xff0c;微软将会让开发人员编写可以在任何Windows设备上…

XSS的那些事儿

转载自 XSS的那些事儿XSS是什么XSS&#xff0c;Cross-site scripting&#xff0c;跨站脚本攻击&#xff0c;为了区分与CSS&#xff0c;起名为XSS。黑客利用网站的漏洞&#xff0c;通过代码注入的方式将一些包含了恶意攻击脚本程序注入到网页中&#xff0c;企图在用户加载网页时…

js 时间戳转换成时间_JavaScript 时间戳转成日期格式

我们在开发中经常需要把时间戳转化成日期格式&#xff0c;但 JavaScript 本身自带的 Date 方法并不像 PHP date 的那么强大。因此&#xff0c;我们就需要自己动手写一个方法。首先我们要先了解下需要用到的 JavaScript 自带的 Date 对象的方法&#xff1a;getDate&#xff1a;获…

java阻塞队列小结

【README】 1&#xff0c;本文介绍了java的7个阻塞队列&#xff1b; 2&#xff0c;阻塞队列的作用 做缓冲作用&#xff0c;如缓冲kafka消息&#xff0c;而不是直接发送给kafka&#xff0c;减少kafka集群的压力&#xff1b;【1】阻塞队列 BlockingQueue 概述 1&#xff0c;队…

来自.NET FM的感谢信

掐指一算&#xff0c;我们的播客 .NET FM 已经上线一周了&#xff01;&#xff01;&#xff01;不过瞅下节节攀升的流量&#xff0c;二位主播一边感叹 .NET 中文社区的热情&#xff0c;一边摸了摸瘪下去的荷包&#xff1a; • 首日访问 > 2000人次 • 五日访问 > 5000人次…

并发场景下MySQL存在的问题及解决思路

转载自 并发场景下MySQL存在的问题及解决思路 目录1、背景2、表锁导致的慢查询的问题3、线上修改表结构有哪些风险&#xff1f;4、一个死锁问题的分析5、锁等待问题的分析6、小结 一、背景对于数据库系统来说在多用户并发条件下提高并发性的同时又要保证数据的一致性一直是数据…