spring boot3 kafka集群搭建到使用

首先自行安装docker,通过docker容器安装kafka
CentOS 系统 docker安装地址

 1.pom.xml和application.properties或者application.yml文件配置

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
spring:kafka:bootstrap-servers: [fafka地址1,fafka地址2,....]
#    producer序列化设置producer:#key序列化设置,设置成json对象
#      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
#    val序列化设置,设置成json对象value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

2.博主安装了kafka ui插件,就直接创建主题了

当前一个集群,因为博主只搭建了一台服务器,也可以称为一个节点

创建主题

没有安装kafka ui,就再main那里启动项目时创建

package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);TopicBuilder.name("my-new-topic")//主题.partitions(3)//分区.replicas(2)//副本.build();}}

副本就是备份,有几节点就可以创建几个副本,副本数量一般采取分区数量-1,只有一个节点就N分区1副本


 3.在main 加上这个注解@EnableKafka

package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);}}

4.生产者发送消息

package com.atguigu.boot3_08_kafka.controller;import com.atguigu.boot3_08_kafka.entity.Person;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/jjj") public String hello() {kafkaTemplate.send("tach", 0,"hello","急急急132");//send("主题", 分区号,"key","val")return "ok";}@GetMapping("/odj")public String odj() {kafkaTemplate.send("tach", 0,"hello",new Person(1L,"odj",19));//对象json需要序列化,可用配置文件配置,也可以在对象中序列化对象return "OK";}
}

5.消费者监听消息

package com.atguigu.boot3_08_kafka.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;@Component
public class MykafkaListener {/*** 默认的监听是从最后一个消息开始拿,也就是只会拿新消息,不会拿历史的* @KafkaListener(topics = "主题",groupId = "用户组")* ConsumerRecord 消费者从 Kafka 获取消息的各种元数据和实际的消息* @param record*/@KafkaListener(topics = "tach",groupId = "teach")public void listen(ConsumerRecord<?, ?> record) {Object key = record.key();Object val = record.value();System.out.println("收到值key:"+key+"收到值val:"+val);}/***  想要到历史的消息或者全部消息,只能设置偏移量*  @KafkaListener(groupId = "用户组" ,topicPartitions = {设置分区,设置偏移量})*  @TopicPartition(topic = "主题" ,partitionOffsets 设置偏移量)*  @PartitionOffset(partition = "哪个分区", initialOffset = "从第几个偏移量开始")** @param record*/@KafkaListener(groupId = "teach" ,topicPartitions = {@TopicPartition(topic = "tach" ,partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "0")})})public void listens(ConsumerRecord<?, ?> record) {Object key = record.key();Object val = record.value();System.out.println("收到值key:"+key+"收到值val:"+val);}
}

最后查看结果


最后补充一个小知识

groupId = "用户组"

组里的成员是竞争模式

用户组和用户组之间是发布/订阅模式

由zookeeper分配管理

好了可以和面试官吹牛逼了


课外话

如果是传对象json需要序列化,创建对象时序列化,不推荐太原始重要是很占资源

因为开始我们都配置好了,有对象就会自动序列化

package com.atguigu.boot3_08_kafka.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person implements Serializable {//不推荐implements Serializable private Long id;private String name;private Integer age;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/72323.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于 RWA 模型与 AI - Agent 协同的企业级 aPAAS 架构设计

一、引言 在企业数字化转型不断深化的当下&#xff0c;现实世界资产&#xff08;RWA&#xff09;模型与人工智能智能体&#xff08;AI - Agent&#xff09;的协同融合&#xff0c;为企业级应用平台即服务&#xff08;aPAAS&#xff09;架构的创新发展带来了新契机。这种架构旨在…

Flutter PopScope对于iOS设置canPop为false无效问题

这个问题应该出现很久了&#xff0c;之前的组件WillPopScope用的好好的&#xff0c;flutter做优化打算“软性”处理禁用返回手势&#xff0c;出了PopScope&#xff0c;这个组件也能处理在安卓设备上的左滑返回事件。但是iOS上面左滑返回手势禁用&#xff0c;一直无效。 当然之…

基尔霍夫定律课后学习日志

基尔霍夫定律课后日志 今天在学习基尔霍夫定律后&#xff0c;我对它在实际工程中的价值有了全新的认识。 基尔霍夫电流定律&#xff08;KCL&#xff09;和电压定律&#xff08;KVL&#xff09;是电路分析的基石。在电子电路设计领域&#xff0c;这两个定律发挥着关键作用。以…

矩阵交换行(信息学奥赛一本通-1119)

【题目描述】 给定一个55的矩阵(数学上&#xff0c;一个rc的矩阵是一个由r行c列元素排列成的矩形阵列)&#xff0c;将第n行和第m行交换&#xff0c;输出交换后的结果。 【输入】 输入共6行&#xff0c;前5行为矩阵的每一行元素,元素与元素之间以一个空格分开。 第6行包含两个整…

SWPU 2022 新生赛

webdog1__start if (isset($_GET[web])) {$first$_GET[web];if ($firstmd5($first)) md5 自等 web0e215962017 &#xff08;md5后也是 0e) 登入后得到提示&#xff0c;robots.txt 访问 f14g.php 返回包里发现 hint > if (isset($_GET[get])){$get$_GET[get];if(!strs…

什么是 Redis

Redis&#xff08;Remote Dictionary Server&#xff09;是一个开源的、基于内存的键值存储系统&#xff0c;常用作数据库、缓存和消息中间件。它支持多种数据结构&#xff0c;如字符串、哈希、列表、集合、有序集合等&#xff0c;并提供丰富的操作命令。 主要特点 高性能&am…

海数通-海员数字化管理平台(企业数字化)

产品介绍 海数通产品是一套海员数字化管理平台&#xff0c;为企业提供海员档案、海历、动态、证书、培训、晋升、薪资、社险等全流程多维度的智能化管理能力。薪资计算模型和社险计算模型能够自动并准确的完成复杂的工资、社险计算工作&#xff0c;极大的提高工作效率和准确性…

如何处理PHP中的日期和时间问题

如何处理PHP中的日期和时间问题 在PHP开发中&#xff0c;日期和时间的处理是一个常见且重要的任务。无论是记录用户操作时间、生成时间戳&#xff0c;还是进行日期计算&#xff0c;PHP提供了丰富的函数和类来帮助开发者高效处理这些需求。本文将详细介绍如何在PHP中处理日期和…

【SpringMVC】入门版

1.基本概念 1.1三层架构 三层架构也就是我们常说的b/s架构中的表现层&#xff0c;业务层和持久层,每层都各司其职&#xff0c;下面来分别讲解这三层的作用。 表现层&#xff1a; 也就是我们常说的web层。它负责接收客户端的请求&#xff0c;向客户端响应结果&#xff0c;通…

【Java进阶学习 第八篇】石头迷阵游戏

绘制页面 首先绘制指定宽和高的窗体 JFrame frame new JFrame();frame.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);frame.setSize(514,595);frame.setTitle("石头迷阵单机版v1.0");//想让游戏一直在屏幕的最上层frame.setAlwaysOnTop(true);//想让窗…

wireguard搭配udp2raw部署内网

前言 上一篇写了使用 wireguard 可以非常轻松的进行组网部署&#xff0c;但是如果服务器厂商屏蔽了 udp 端口&#xff0c;那就没法了 针对 udp 被服务器厂商屏蔽的情况&#xff0c;需要使用一款 udp2raw 或 socat 类似的工具&#xff0c;来将 udp 打包成 tcp 进行通信 这里以…

Node.js REPL 深入解析

Node.js REPL 深入解析 引言 Node.js 作为一种流行的 JavaScript 运行环境,在服务器端开发中扮演着重要角色。REPL(Read-Eval-Print Loop,读取-求值-打印循环)是 Node.js 的一个核心特性,它允许开发者在一个交互式环境中执行 JavaScript 代码。本文将深入探讨 Node.js R…

系统可观测性(5)OpenTelemetry基础使用

系统可观测性(5)OpenTelemetry基础概念 Author: Once Day Date: 2025年3月12日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 本文档翻译整理自《OpenTelemetry Docs》&a…

AVL树的平衡算法的简化问题

AVL树是一种紧凑的二叉查找树。它的每个结点&#xff0c;都有左右子树高度相等&#xff0c;或者只相差1这样的特性。文章https://blog.csdn.net/aaasssdddd96/article/details/106291144给出了一个例子。 为了便于讨论&#xff0c;这里对AVL树的结点平衡情况定义2个名称&#…

Jenkins 集成DingDing 推送

现状分析 开发频繁发布代码&#xff0c;和测试没有及时沟通&#xff0c;导致测试返工、bug漏测等 解决方案 Jenkins 集成DingDing机器人&#xff0c;在构建时触发推送 DingDing端机器人配置 1、在钉钉电脑端建立群聊 2、点击群右上角设置&#xff0c;点击【智能群助手】 …

【Quarkus】通过Quarkus集成后端服务示例

说明&#xff1a; REST资源接口&#xff08;AuthResource&#xff09;。REST资源实现类&#xff08;AuthResourceImpl&#xff09;。服务接口&#xff08;AuthService&#xff09;。服务实现类&#xff08;AuthServiceImpl&#xff09;。配置文件&#xff08;application.prop…

硬件驱动——51单片机:独立按键、中断、定时器/计数器

目录 一、独立按键 1.原理 2.封装函数 3.按键控制点灯 数码管 二、中断 1.原理 2.步骤 3.中断寄存器IE 4.控制寄存器TCON 5.打开外部中断0和1 三、定时器/计数器 1.原理 2.控制寄存器TCON 3.工作模式寄存器TMOD 4.按键控制频率的动态闪烁 一、独立按键 1…

基于PMU的14节点、30节点电力系统状态估计MATLAB程序

“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 程序简介&#xff1a; 程序采用三种方法对14节点和30节点电力系统状态进行评估&#xff1a; ①PMU同步向量测量单元结合加权最小二乘法&#xff08;WLS&#xff09;分析电力系统的电压幅值和相角状态&#xff1b; …

Apifox Helper 自动生成API接口文档

在我们开发过程中我们在编写请求地址和编写请求参数的时候特别花费时间耗费了我们很多时间&#xff0c;作为一个程序员&#xff0c;更应该把精力时间集中在开发上&#xff0c; Apifox Helper 是 Apifox 团队针对 IntelliJ IDEA 环境所推出的插件&#xff0c;可以在 IDEA 环境中…

Python 3.13实现数据未来预测功能(详细功能实现及环境搭建)

目录 摘要 1. 导入所需库 2. 加载和查看数据 3. 数据预处理 4. 拆分数据集 5. 模型训练 6. 模型评估 7. 进行预测 结论 摘要 本文将引导您使用Python 3.13实现数据预测功能。我们将使用常用的Python库, 如pandas、numpy和sklearn&#xff0c;来帮助读者快速搭建一个简…