SpringBoot 多组 Kafka 配置

SpringBoot 多组 Kafka 配置

单组 Kafka 配置

时隔多日,冒个泡吧。

场景 是 我在日常的开发过程中需要监听 kafka 的消息进行回调处理,但是呢,不同的三方服务他们用了不同的 kafka 集群,那么默认的 Spring 自动读取的 kafka 配置就不行了,它默认只支持一组,那么就需要单独进行多组配置。

先说单组配置的场景,只需要在你的 yml 里增加配置

spring:kafka:bootstrap-servers: 192.168.25.11:9092,192.168.25.22:9092properties:security.protocol: SASL_PLAINTEXTsasl.mechanism: SCRAM-SHA-256sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";topic: your-topic

里面 security 和 sasl 用于鉴权

然后你就可以直接写个 consumer 来接受消息了

@Component
@Slf4j
public class Consumer {@KafkaListener(topics = "${spring.kafka.topic}", groupId = "your-group")public void consumeMsgLog(ConsumerRecord<?, ?> record) {// do everything}}

在上述配置和代码示例中,groupId 是 Kafka 消费者组的标识符,它在 Kafka 架构中起到了关键的角色。让我解释一下 groupId 在 Kafka 架构设计中的作用:

  1. Kafka 消费者组:Kafka 消费者组是一组 Kafka 消费者的逻辑集合,它们共同订阅一个或多个 Kafka 主题。消费者组中的每个消费者可以独立处理主题中的消息,而消费者组协调消息的分配和处理。
  2. 消息分发:Kafka主题中的每个分区中的消息可以被同一个消费者组的一个消费者处理。groupId 用于将消费者组中的消费者分配到分区,以确保消息被均匀地分发。这意味着每个分区的消息只能被消费者组中的一个消费者处理。
  3. Offset 管理groupId 还用于管理消息偏移量(offsets)。每个分区的消息都有一个偏移量,用于跟踪已处理的消息。Kafka维护每个消费者组的每个分区的偏移量,以确保消息不会被重复处理。这使得每个消费者组可以在不同时间点开始处理消息,并且不会丢失已处理的消息。
  4. 水平伸缩groupId 允许消费者组进行水平伸缩。您可以添加或删除消费者,而不会破坏分配的消息负载均衡。Kafka会根据消费者组的大小自动重新分配分区。

也就是说,一个主题中的消息,可以被多个消费者组消费,但是不能被同一个消费者组的多个消费者消费

在Kafka中,消费者组不需要显式地创建。当您的消费者开始订阅特定的主题时,如果指定了相同的 groupId,Kafka 会自动将这些消费者视为同一个消费者组。这意味着,只要您在消费者配置中指定了相同的 groupId,Kafka 就会自动将它们分配到同一个消费者组。

如果指定了不同的 groupId,Kafka 将把它们视为不同的消费者组,并且这些消费者组会独立地消费相同或不同的主题中的消息。

做了一些小小的铺垫,让我们进入正题

多组 Kafka 配置

Spring Kafka 提供了 ConcurrentKafkaListenerContainerFactory 以支持同时监听多个不同的 Kafka 集群或主题。可以为每个不同的 Kafka 集群或主题配置不同的 ConcurrentKafkaListenerContainerFactory 实例,以满足多组消费者需求。

所以就是我们自己定义加载配置而不是使用 Spring Boot 默认的预留配置。

那比如我有两组 Kafka 集群,为了省事,第一组我就用默认的,而另一组单独设置一组,然后进行ConcurrentKafkaListenerContainerFactory 的定制化注入

@Slf4j
@Configuration
public class KafkaConfiguration {@Value("${kafka.sec-kafka.consumer.bootstrap-servers:192.168.25.22:9092}")private String servers;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic ConsumerFactory<String, String> secKafkaConsumerFactory() {Map<String, Object> consumerProps = new HashMap<>();consumerProps.put("bootstrap.servers", servers);consumerProps.put("group.id", "your-group");consumerProps.put("enable.auto.commit", "true");consumerProps.put("auto.commit.interval.ms", "2000");consumerProps.put("key.deserializer", StringDeserializer.class);consumerProps.put("value.deserializer", StringDeserializer.class);consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 鉴权相关配置consumerProps.put("security.protocol", "SASL_PLAINTEXT");consumerProps.put("sasl.mechanism", "SCRAM-SHA-256");consumerProps.put("sasl.jaas.config", jaasConfig);return new DefaultKafkaConsumerFactory<>(consumerProps);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> secKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(secKafkaConsumerFactory());return factory;}}

解释一下配置

  1. bootstrap.servers:指定了 Kafka 服务器的地址和端口,这是连接到 Kafka 集群的入口点。

  2. group.id:指定了消费者所属的消费者组的标识符。Kafka 使用消费者组来协调消息分发,确保消息被均匀分发给消费者。

  3. enable.auto.commit:指定是否启用自动提交偏移量。如果设置为 “true”,Kafka 消费者会自动定期提交偏移量,以记录已经处理的消息。如果设置为 “false”,您需要手动管理偏移量。

  4. auto.commit.interval.ms:如果启用了自动提交,这个参数指定了自动提交偏移量的时间间隔,以毫秒为单位。

  5. key.deserializervalue.deserializer:指定用于反序列化消息键和值的反序列化器类。在这种情况下,它们都设置为 StringDeserializer.class,表示消息键和值都被视为字符串。

  6. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG:指定了当消费者启动时或者偏移量丢失时如何处理消息的偏移量。“earliest” 表示从最早的可用消息开始处理,“latest” 表示从最新的消息开始处理。

  7. 鉴权相关配置(SASL):这些配置用于设置 Kafka 消费者与 Kafka 集群之间的安全通信和身份验证。这包括 security.protocolsasl.mechanismsasl.jaas.config。它们指定了使用 SASL 加密和身份验证的方式,以及相应的配置信息。jaasConfig 包含了 SASL 配置的详细信息。

这些属性是 Kafka 消费者连接和配置的关键部分,它们确保了消费者可以连接到 Kafka 集群并以安全的方式处理消息

而这个时候你的 Consumer,只需要在注解里多一个配置 containerFactory

@Component
@Slf4j
public class SecConsumer {@KafkaListener(topics = "${kafka.topic}", groupId = "your-group" containerFactory = "secKafkaListenerContainerFactory")public void consumeMsgLog(ConsumerRecord<?, ?> record) {// do everything}}

更多使用方法可以参考官方文档 Spring for Kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/131294.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[黑马程序员SpringBoot2]——运维实用篇

目录&#xff1a; 工程打包与运行打包插件Boot工程快速启动&#xff08;Linux版本&#xff09;临时属性配置文件4级分类自定义配置文件多环境开发(yaml版)多环境开发多文件版&#xff08;yaml版&#xff09;多环境开发多文件版&#xff08;properties版&#xff09;多环境分组…

vue如何实现视频全屏切换

最近项目开发中遇到一个视频窗口全屏切换功能&#xff0c;为此在这里做个记录。 具体的实现思路&#xff1a; <template><div class"content-box"><div class"container"><div id"screen" class"screen"><…

难题来了:分库分表后,查询太慢了,如何优化?

说在前面&#xff1a; 尼恩社群中&#xff0c;很多小伙伴反馈&#xff0c; Sharding-JDBC 分页查询的速度超级慢&#xff0c; 怎么处理&#xff1f; 反馈这个问题的小伙伴&#xff0c;很多很多。 而且这个问题&#xff0c;也是面试的核心难题。前段时间&#xff0c;有小伙伴…

MySQL数据库干货_13—— MySQL查询数据

MySQL查询数据 SELECT基本查询 SELECT语句的功能 SELECT 语句从数据库中返回信息。使用一个 SELECT 语句&#xff0c;可以做下面的事&#xff1a; 列选择&#xff1a;能够使用 SELECT 语句的列选择功能选择表中的列&#xff0c;这些列是想 要用查询返回的。当查询时&#xf…

vue-render函数的三个参数

第一个参数(必须) - {String | Object | Function} Vue.component(elem, {render: function(createElement) {return createElement(div);//一个HTML标签字符/*return createElement({template: <div></div>//组件选项对象});*//*var func function() {return {t…

使用electron ipcRenderer接收通信消息多次触发

使用electron ipcRenderer接收通信消息多次触发 在使用electron ipcRenderer.on接收ipcRenderer.send的返回值时&#xff0c;ipcRenderer.send发送一次信息&#xff0c; ipcRenderer.on会打印多个日志&#xff0c; renderer.once(get-file-path, (event: any, paths: any) &g…

商用车自动驾驶进入「拐点」时刻

对于自动驾驶的商业化落地来说&#xff0c;这个「性感」的赛道一直备受争议。在过去几年&#xff0c;包括港口、矿山等在内的封闭场景进入商业订单的收获期&#xff1b;但类似干线物流这样的半开放式场景&#xff0c;却喜忧参半。 今年初&#xff0c;作为全球自动驾驶领域的技…

利用MySQL玩转数据分析之基础篇

知识无底&#xff0c;学海无涯&#xff0c;到今天进入MySQL的学习4天了&#xff0c;知识点虽然简单&#xff0c;但是比较多&#xff0c;所以写一篇博客将MySQL的基础写出来&#xff0c;方便自己以后查找&#xff0c;还有就是分享给大家。 1、SQL简述 1&#xff09;SQL的概述 S…

windows 用vs创建cmake工程并编译opencv应用项目生成exe流程简述

目录 前言一、安装opencv&#xff08;1&#xff09;下载&#xff08;2&#xff09;双击安装&#xff08;3&#xff09;环境变量和system文件夹设置 二、打开vs创建项目三、编辑cpp&#xff0c;.h&#xff0c;cmakelist.txt文件&#xff08;1&#xff09;h文件&#xff08;2&…

【Python从入门到进阶】41、有关requests代理的使用

接上篇《40、requests的基本使用》 上一篇我们介绍了requests库的基本使用&#xff0c;本篇我们来学习requests的代理。 一、引言 在网络爬虫和数据抓取的过程中&#xff0c;我们经常需要发送HTTP请求来获取网页内容或与远程服务器进行通信。然而&#xff0c;在某些情况下&…

生成Linux系统下的一些文件

生成Linux系统下的一些文件 文章目录 生成Linux系统下的一些文件1. Initrd1.1 dracut命令1.2 mkinitramfs命令 2. GRUB2.1 Grub2.2 grub.cfg2.3 grub.efi 3. fstab3.1 自动更新3.2 手动更新 4. 生成用户 1. Initrd 通常, lib/modules/下的文件夹名称和内核的版本名是一致的, 所…

通过在Z平面放置零极点的来设计数字滤波器

文章来源地址&#xff1a;https://www.yii666.com/blog/393376.html 通过在Z平面放置零极点的来设计数字滤波器 要求&#xff1a;设计一款高通滤波器&#xff0c;用在音频信号处理过程中&#xff0c;滤掉100Hz以下的信号。 实现方法&#xff1a;通过在Z平面放置零极点的来设…

数据结构与算法【02】—线性表

CSDN系列专栏&#xff1a;数据结构与算法专栏 针对以前写的数据结构与算法系列重写(针对文字描述、图片、错误修复)&#xff0c;改动会比较大&#xff0c;一直到更新完为止 前言 通过前面数据结构与算法基础知识我们知道了数据结构的一些概念和重要性&#xff0c;那么本章总结…

华为政企光传输网络产品集

产品类型产品型号产品说明 maintainProductEA5800-X15 典型配置 上行160G 下行64口GPON 16口XGS PONEA5800系列多业务接入设备定位为面向NG-PON的下一代OLT&#xff0c;基于分布式架构&#xff0c;运用虚拟接入技术&#xff0c;为用户提供宽带、无线、视频回传等多业务统一承…

15、Nuxt.js代理转发解决跨域问题

nuxt.config.js export default {...// Modules: https://go.nuxtjs.dev/config-modulesmodules: ["nuxtjs/axios"],axios: {proxy: true, // 开启代理转发prefix: "/api"},// 代理转发proxy: {/api: {target: "https://mock.mengxuegu.com/mock/654…

【UE 材质】简单的闪闪发光材质

效果 节点 参考视频&#xff1a; https://www.bilibili.com/video/BV1uK411y737/?vd_source36a3e35639c44bb339f59760641390a8

MySQL(8):聚合函数

聚合函数介绍 聚合函数&#xff1a; 对一组数据进行汇总的函数&#xff0c;输入的是一组数据的集合&#xff0c;输出的是单个值。 聚合函数类型&#xff1a;AVG(),SUM(),MAX(),MIN(),COUNT() AVG / SUM 只适用于数值类型的字段&#xff08;或变量&#xff09; SELECT AVG(…

【LeetCode】每日一题 2023_11_4 数组中两个数的最大异或值

文章目录 刷题前唠嗑题目&#xff1a;数组中两个数的最大异或值题目描述代码与解题思路 结语 刷题前唠嗑 LeetCode? 启动&#xff01;&#xff01;&#xff01; 题目&#xff1a;数组中两个数的最大异或值 题目链接&#xff1a;421. 数组中两个数的最大异或值 题目描述 代…

python问题笔记2

70 列表嵌套元组,分别按字母和数字排序 您可以使用Python中的sorted()函数来对列表中的元组进行排序。首先,您需要定义一个自定义的排序函数,以便根据字母或数字进行排序。 以下是一个例子,展示如何按字母和数字分别对嵌套元组进行排序: def sort_by_letter(item):retu…

前端埋点方式

前言&#xff1a; 想要了解用户在系统中所做的操作&#xff0c;从而得出用户在本系统中最常用的模块、在系统中停留的时间。对于了解用户的行为、分析用户的需求有很大的帮助&#xff0c;想实现这种需求可以通过前端埋点的方式。 埋点方式&#xff1a; 1.什么是埋点&#xff1f…