Spring Kafka—— KafkaListenerEndpointRegistry 隐式注册分析

由于我想在项目中实现基于 Spring kafka 动态连接 Kafka 服务,指定监听 Topic 并控制消费程序的启动和停止这样一个功能,所以就大概的了解了一下 Spring Kafka 的几个重要的类的概念,内容如下:

  1. ConsumerFactory
    • 作用:负责创建 Kafka 消费者的实例。ConsumerFactory 是一个简单的工厂类,用于封装消费者的配置(如bootstrap servers, key deserializer, value deserializer等)并生成Consumer实例。
    • 用法:通常在Spring配置类中定义,并通过依赖注入提供给KafkaListenerContainerFactory
  2. ConcurrentKafkaListenerContainerFactory
    • 作用:这个工厂类用于创建 ConcurrentMessageListenerContainer 实例,该容器管理多个Kafka MessageListenerContainer来提供并发消息消费。
    • 特点:可以设置并发消费的数量,即同时运行的MessageListenerContainer的数量。
      支持消息过滤、错误处理和事务管理。
    • 用法:在Spring配置类中定义,并设置其ConsumerFactory和其他相关配置。然后,可以通过@KafkaListener注解直接使用,Spring会自动使用这个工厂来创建监听器。
  3. KafkaListenerEndpointRegistry
    • 作用:这是一个管理类,用于管理应用中所有由@KafkaListener注解创建的消息监听器容器。
    • 特点:提供了启动和停止监听器的方法,可以在运行时控制监听器。
      可以用来查询当前所有注册的监听器的状态。
    • 用法:通常自动配置,可以通过自动注入到任何Spring管理的Bean中,用于运行时管理监听器。
  4. KafkaTemplate
    • 作用:这是一个高级抽象,用于生产消息到Kafka主题。
    • 特点:提供同步和异步发送消息的方法。
      支持事务消息发送。
    • 用法:定义在Spring配置类中,注入生产者工厂ProducerFactory,并用于应用中的消息发送。
  5. @KafkaListener
    作用:注解用于标记方法以作为Kafka消息的监听器,这些方法会自动被Spring容器管理,并在有新消息时触发。
    特点:
    可以指定主题、分区和消费组。
    支持并发消费。
    用法:放在组件的方法上,方法参数可以灵活地映射消息的key、value、headers等。

从上面的内容可以看到,KafkaListenerEndpointRegistry 这个类是管理消息监听容器的,并提供了启动和停止监听器的方法,于是我就想创建这个类来完成我的需求功能。当我直接写如下内容时:

@Component
public class KafkaConfig {@Autowiredprivate KafkaListenerEndpointRegistry registry;@PostConstructpublic void init() {System.out.println(registry);}
}

IDEA提示了 Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found. 但是我启动 SpringBoot 项目却没有报错 :
在这里插入图片描述
我在我的项目中是没有加 @EnableKafka 这样的注解的,代码如下:

@SpringBootApplication
public class SpringKafkaExampleApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaExampleApplication.class, args);}
}

引入的依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies>

于是我就比较好奇,项目启动的时候是在什么地方声明了 KafkaListenerEndpointRegistry 这个 bean 的。

KafkaListenerEndpointRegistry 隐式注册分析

SpringBoot 对于 kafka 有如下的自动配置:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {private final KafkaProperties properties;private final RecordMessageConverter recordMessageConverter;private final RecordFilterStrategy<Object, Object> recordFilterStrategy;private final BatchMessageConverter batchMessageConverter;private final KafkaTemplate<Object, Object> kafkaTemplate;private final KafkaAwareTransactionManager<Object, Object> transactionManager;private final ConsumerAwareRebalanceListener rebalanceListener;private final CommonErrorHandler commonErrorHandler;private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;private final RecordInterceptor<Object, Object> recordInterceptor;KafkaAnnotationDrivenConfiguration(KafkaProperties properties,ObjectProvider<RecordMessageConverter> recordMessageConverter,ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,ObjectProvider<BatchMessageConverter> batchMessageConverter,ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,ObjectProvider<CommonErrorHandler> commonErrorHandler,ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {this.properties = properties;this.recordMessageConverter = recordMessageConverter.getIfUnique();this.recordFilterStrategy = recordFilterStrategy.getIfUnique();this.batchMessageConverter = batchMessageConverter.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));this.kafkaTemplate = kafkaTemplate.getIfUnique();this.transactionManager = kafkaTransactionManager.getIfUnique();this.rebalanceListener = rebalanceListener.getIfUnique();this.commonErrorHandler = commonErrorHandler.getIfUnique();this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();this.recordInterceptor = recordInterceptor.getIfUnique();}@Bean@ConditionalOnMissingBeanConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();configurer.setKafkaProperties(this.properties);configurer.setBatchMessageConverter(this.batchMessageConverter);configurer.setRecordMessageConverter(this.recordMessageConverter);configurer.setRecordFilterStrategy(this.recordFilterStrategy);configurer.setReplyTemplate(this.kafkaTemplate);configurer.setTransactionManager(this.transactionManager);configurer.setRebalanceListener(this.rebalanceListener);configurer.setCommonErrorHandler(this.commonErrorHandler);configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);configurer.setRecordInterceptor(this.recordInterceptor);return configurer;}@Bean@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));return factory;}@Configuration(proxyBeanMethods = false)@EnableKafka@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)static class EnableKafkaConfiguration {}
}

可以看到这个配置类里面有一个静态的内部类 EnableKafkaConfiguration 该类上声明了 @EnableKafka 注解,也就是说内部静态类EnableKafkaConfiguration使用了@EnableKafka注解,并且通过@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)确保如果Spring上下文中缺少相应的Bean,则自动激活@EnableKafka功能。这意味着,即便你没有在你的应用配置中显式添加@EnableKafka,这个内部类也可以根据条件自动注册所需的Bean,从而启用Kafka的支持。

@EnableKafka 定义如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}

这个注解的使用导致了KafkaListenerConfigurationSelector的激活,其源码如下:

@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {@Overridepublic String[] selectImports(AnnotationMetadata importingClassMetadata) {return new String[] { KafkaBootstrapConfiguration.class.getName() };}}

上面的代码中 DeferredImportSelector是Spring框架中一个特殊的接口,它继承自ImportSelector。它主要用于处理配置类的导入,允许更细致地控制配置类的加载顺序。这个接口特别适用于那些依赖于由Spring容器中其他Bean或配置动态决定的配置。
KafkaListenerConfigurationSelector 这个类实现了DeferredImportSelector并通过selectImports方法返回了一个配置类名称的数组。这个方法指定了当Spring处理到这个选择器时,它应该导入KafkaBootstrapConfiguration类。

KafkaBootstrapConfiguration 内容如下:

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));}if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,new RootBeanDefinition(KafkaListenerEndpointRegistry.class));}}}

KafkaBootstrapConfiguration 是一个实现了ImportBeanDefinitionRegistrar接口的类,主要用于程序化地注册Bean定义到Spring的ApplicationContext中。通过实现ImportBeanDefinitionRegistrar接口,这个类可以在Spring的配置阶段动态地添加Bean定义。

在这个特定的实现中,KafkaBootstrapConfiguration检查特定的Kafka相关Bean(如KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME和KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)是否已经注册。如果这些Bean尚未注册,它会使用RootBeanDefinition手动注册这些Bean到Spring容器中。

RootBeanDefinition 的功能

RootBeanDefinition是Spring框架中用于定义Bean的一个核心类。它是BeanDefinition接口的一个直接实现,提供了一种配置Spring管理的Bean的方式,包括Bean的类类型、生命周期回调、依赖信息等。

  • Bean配置的详细定义:RootBeanDefinition允许开发者详细定义Bean的创建细节,如构造函数参数、属性值、初始化方法、销毁方法等。
  • 高级功能:它还支持更复杂的配置,如懒加载、自动装配模式、作用域和其他高级特性。
  • 程序化Bean注册:通过使用RootBeanDefinition,开发者可以在运行时动态地注册Bean,这对于条件配置或需要响应不同配置环境的高级用途尤为重要。

KafkaBootstrapConfiguration类中,使用RootBeanDefinition来创建和注册KafkaListenerAnnotationBeanPostProcessorKafkaListenerEndpointRegistry类的实例,这些是设置和管理Kafka消息监听器所必需的。

之后在AbstractBeanFactory会根据 beanName 获取到了 RootBeanDefinition 如下图所示:
在这里插入图片描述
然后在如下所示的位置:
在这里插入图片描述
程序创建了 beanName 为 org.springframework.kafka.config.internalKafkaListenerEndpointRegistry 的实例,具体创建实例的位置如下:
在这里插入图片描述
从调试中可以看到此处实例化了 KafkaListenerEndpointRegistry
所以当我们 springboot 项目引入了

 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

依赖后,即使我们不显示的声明 @EnableKafka 程序也会进行初始化相应的配置。

总结

当Spring Boot项目中引入Spring Kafka依赖后,即使我们没有显式声明@EnableKafka,系统仍会自动进行相应的配置。因此,在项目中尝试注入KafkaListenerEndpointRegistry时,尽管IDE可能会提示“Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found.”,项目依然能够正常启动。这是因为KafkaListenerEndpointRegistry在Spring Kafka的自动配置过程中已被隐式注册。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/827384.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Opencv_2_ 图像色彩空间转换

ColorInvert.h 内容如下&#xff1a; #pragma once #include <opencv.hpp> using namespace std; #include <opencv.hpp> using namespace cv; using namespace std; class ColorInvert{ public : void colorSpaceInvert(Mat&image); }; ColorInvert.cpp…

高效过滤器检漏方法选择指南及关键注意事项一览

在生物制药企业中&#xff0c;高效过滤器&#xff08;HEPA&#xff09;的检漏工作是确保洁净室能够达到并保持设计的洁净级别的关键步骤。这关系到产品的质量和安全&#xff0c;因此必须遵循相关法规标准和操作流程。 关于北京中邦兴业 北京中邦兴业科技有限公司是一家国家高新…

element中file-upload组件的提示‘按delete键可删除’,怎么去掉?

问题描述 element中file-upload组件会出现这种提示‘按delete键可删除’ 解决方案&#xff1a; 这是因为使用file-upload组件时自带的提示会盖住上传的文件名&#xff0c;修改一下自带的样式即可 ::v-deep .el-upload-list__item.is-success.focusing .el-icon-close-tip {d…

基于SpringBoot的宠物领养网站管理系统

基于SpringBootVue的宠物领养网站管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBootMyBatis工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 主页 宠物领养 宠物救助站 宠物论坛 登录界面 管理员界面 摘要 基于Spr…

1.C++入门

目录 1.C关键字 2.命名空间 作用域方面的优化 a.命名空间定义 b.命名空间使用 3.C 输入&输出 1.C关键字 C有63个关键字&#xff0c;C语言有32个关键字&#xff0c;存在重叠如荧光笔标出 2.命名空间 作用域方面的优化 如果变量&#xff0c;函数和类的名称都存在于全…

AI自动生成PPT文档 aippt的API介绍文档

官方链接直达&#xff01; 产品介绍​ 能力介绍​ AiPPT 是一款智能生成演示幻灯片的在线工具。专业设计团队打造海量模板资源&#xff0c;输入标题即可轻松生成完整的PPT。同时 AiPPT 支持导入多格式文档一键生成 PPT&#xff0c;让 PPT 创作更加高效。聚焦于内容&#xff0…

安装zabbix server

目录 1、实验环境 2、yum 安装zabbix server 2.1 解决权限问题和放行流量 2.2 安装zabbix-server 1、实验环境 操作系统rhel8zabbix6.0TLS数据库mysql8.0.30IP地址192.168.81.131时间配置NTP时间服务器同步 2、yum 安装zabbix server 如果通过yum源安装&#xff0c;操作系…

SysetmUI开机是否显示Keyguard的流程

KeyguardViewMediator的onSystemReady方法 没有启用keyguard时KeyguardViewMediator的log&#xff1a; onSystemReady 方法 doKeyguardLocked LockPatternUtils.isLockScreenDisabled 来判断是否启用 public final static String DISABLE_LOCKSCREEN_KEY "lockscreen.…

信息化工作人员必备常识4——ping命令详解【不间断发包自定义发包的大小自定义发包次数】

信息化工作人员必备常识4——ping命令详解【不间断发包&自定义发包的大小&自定义发包次数】 前言回顾pingtelnetnslookup命令 ping 命令详解帮助手册不间断向目标 IP 发送数据包 -t定义发送数据包的大小 -l-t&-l 验证网络承载能力自定义发送数据包的次数统计响应时…

[BT]BUUCTF刷题第20天(4.22)

第20天 Web [GWCTF 2019]我有一个数据库 打开网站发现乱码信息&#xff08;查看其他题解发现显示的是&#xff1a;我有一个数据库&#xff0c;但里面什么也没有~ 不信你找&#xff09; 但也不是明显信息&#xff0c;通过dirsearch扫描得到robots.txt&#xff0c;然后在里面得…

51单片机数字温度报警器_DS18B20可调上下限(仿真+程序+原理图)

数字温度报警器 1 **主要功能&#xff1a;*****\*资料下载链接&#xff08;可点击&#xff09;&#xff1a;\**** 2 **仿真图&#xff1a;**3 **原理图&#xff1a;**4 **设计报告&#xff1a;**5 **程序设计&#xff1a;**主函数外部中断函数DS18B20驱动 6 讲解视频7 **资料清…

上海亚商投顾:沪指震荡调整 油气等周期股集体下挫

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日震荡调整&#xff0c;深成指、创业板指小幅走弱。军工板块逆势拉升&#xff0c;中船应急、捷安高科、…

simlab python二次开发2-一键生成轴瓦并设定节点号

simlab python二次开发2-一键生成轴瓦并设定节点号 1、节点坐标计算并建立1.1、建坐标原点节点&#xff0c;并得到Model-1.gda1.2、轴瓦节点计算并建立 2、由节点建面2.1、由4个节点建面得到3个面单元Body2.2、得到Bodies名称2.3、根据Bodies名称选面特征&#xff08;放入Group…

AR爆发的前夜,Rokid站在了门口

文&#xff5c;刘俊宏 摆脱6寸的手机屏幕&#xff0c;栖居在300寸大屏的智慧生活是什么样子&#xff1f; 4月20日&#xff0c;Rokid在新品AR Lite空间计算套装的发布会上&#xff0c;“硬刚”了苹果的Vision Pro。 Rokid AR Lite空间计算套装 Rokid AR Lite与苹果Vision Pro…

el-upload组件如何上传blob格式的url地址视频

el-upload组件如何上传blob格式的url地址视频 一、存在问题二、直接上代码 需求&#xff1a;想把视频地址url:“blob:http://localhost:8083/65bd3c0f-52ec-4844-b85e-06fdb5095b7b”&#xff0c;通过el-upload组件上传 el-upload是Element UI中用于文件上传的组件&#xff0c;…

中文医疗大模型及中文底座大模型参考

参考&#xff1a;https://github.com/HqWu-HITCS/Awesome-Chinese-LLM 中文底座大模型 中文医疗大模型

c#学习入门1

一、环境配置 颜色主题 字体设置 行号设置 二、第一个应用程序 1. 在解决方案下创建一个新项目 第一种注释&#xff1a;两杠注释 第二种注释&#xff1a;星号注释 第三种注释&#xff1a;三杠注释(只有在花括号后面输出才会自动补全&#xff09; 2.控制台输入打印基础语句 输…

第⑯讲:Ceph集群Pool资源池管理以及PG的数据分布的核心技术要点

文章目录 1.Pool资源池的管理1.1.查看Pool资源池列表1.2.创建一个Pool资源池1.3.查看Pool资源池的参数信息1.4.修改Pool资源池的参数信息1.5.为Pool资源池设置应用模式1.6.重命名Pool资源池1.7.设置Pool资源池的限额1.8.删除Pool资源池1.9.查看Pool资源池的利用率 2.PG的数据分…

产品经理必会12个产品模型

很多运营经理&#xff0c;常常觉得产品成功的决定性因素是“产品做得好”。 而很多产品经理却认为&#xff0c;产品互抄太严重了&#xff0c;差异化的竞争要点是“产品运营得好”。 在商业高速发展时代&#xff0c;成功产品定义往往不是单点成功&#xff0c;而是由3大要素共同…

就业班 第三阶段(负载均衡) 2401--4.18 day2 nginx2 LVS-DR模式

3、LVS/DR 模式 实验说明&#xff1a; 1.网络使用NAT模式 2.DR模式要求Director DIP 和 所有RealServer RIP必须在同一个网段及广播域 3.所有节点网关均指定真实网关 主机名ip系统用途client172.16.147.1mac客户端lvs-server172.16.147.154centos7.5分发器real-server1172.16.…