手拉手springboot整合kafka

前期准备安装kafka

启动Kafka本地环境需Java 8+以上

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。

Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

解压tar -xzf kafka_2.13-3.7.0.tgz

一、Zookeeper启动Kafka(kafka内置zookeeper)

Kafka依赖Zookeeper

1、启动Zookeeper 2、启动Kafka

使用kafka自带Zookeeper启动

./zookeeper-server-start.sh ../config/zookeeper.properties &

./zookeeper-server-stop.sh ../config/zookeeper.properties

./kafka-server-start.sh ../config/server.properties &

./kafka-server-stop.sh ../config/server.properties

二、Zookeeper服务器启动Kafka

Zookeeper服务器安装

https://zookeeper.apache.org/

https://dlcdn.apache.org/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz

tar zxvf apache-zookeeper-3.9.2-bin.tar.gz

配置Zookeeper服务器

cp zoo_sample.cfg zoo.cfg

启动Zookeeper服务器

./zkServer.sh start

修改Zookeeper端口

Zoo.cfg添加内容

admin.serverPort=8099

apache-zookeeper-3.9.2-bin/bin目录下重启Zookeeper

Zookeeper服务器启动kafka

/opt/kafka_2.13-3.7.0/bin目录下

./kafka-server-start.sh ../config/server.properties &

Kafka配置文件server.properties

三、使用KRaft启动Kafka

UUID通用唯一识别码(Universally Unique Identifier)

1、生成Cluster UUID(集群UUID):./kafka-storage.sh random-uuid

2.格式化kafka日志目录:./kafka-storage.sh format -t 3pMJGNJcT0uLIBsZhbucjQ -c ../config/kraft/server.properties

3.启动kafka:./kafka-server-start.sh ../config/kraft/server.properties &

springboot集成kafka

创建topic时,若不指定topic的分区(partition)数量使,则默认为1个分区(partition)

修改server.properties文件

vim server.properties

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://192.168.68.133:9092

springboot加入依赖kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

application.yml配置连接kafka

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

@Component
public class KafkaConsumer {

@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}

}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

@Component
public class KafkaConsumer {

@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}

}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

代码语言:java

spring:kafka:bootstrap-servers: 192.168.68.133:9092consumer:auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

脚本重置消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/832259.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP定时任务框架taskPHP3.0学习记录7宝塔面板手动可以执行自动无法执行问题排查及解决方案(sh脚本、删除超过特定天数的日志文件、kill -9)

PHP定时任务框架taskPHP3.0学习记录 PHP定时任务框架taskPHP3.0学习记录1&#xff08;TaskPHP、执行任务类的实操代码实例&#xff09;PHP定时任务框架taskPHP3.0学习记录2&#xff08;环境要求、配置Redis、crontab执行时间语法、命令操作以及Screen全屏窗口管理器&#xff0…

深入剖析Tomcat(六) Tomcat各组件的生命周期控制

Catalina中有很多组件&#xff0c;像上一章提到的四种容器&#xff0c;载入器&#xff0c;映射器等都是一种组件。每个组件在对外提供服务之前都需要有个启动过程&#xff1b;组件在销毁之前&#xff0c;也需要有个关闭过程&#xff1b;例如servlet容器关闭时&#xff0c;需要调…

字符串循环左移

#include <iostream> #include <string> using namespace std;int main() {string s1, s2;getline(cin, s1);int n;cin >> n;if(n>s1.size()){nn-s1.size();s2 s1.substr(0, n);s1.erase(0, n);cout << s1s2;}else{// 提取s1的前n个字符到s2中s2 …

MyBatis 多表映射及动态语句

三、MyBatis多表映射 3.1 多表映射概念 多表查询结果映射思路 前面说明中&#xff0c;我全面梳理了单表的mybatis操作&#xff01;但是开发中更多的是多表查询需求&#xff0c;这种情况我们如何让进行处理&#xff1f;MyBatis 思想是&#xff1a;数据库不可能永远是你所想或…

金融行业专题|信托超融合架构转型与场景探索合集

文章包含 15 信托用户基于超融合实现私有云建设、平台云下迁、信创云转型、容器云探索等场景实践分享。下载《【核心业务篇】金融核心生产业务场景探索文章合集》、《【信创转型与架构升级篇】金融核心生产业务场景探索文章合集》、《【数据库与数据仓库篇】金融核心生产业务场…

编程入门(六)【Linux系统基础操作一】

读者大大们好呀&#xff01;&#xff01;!☀️☀️☀️ &#x1f525; 欢迎来到我的博客 &#x1f440;期待大大的关注哦❗️❗️❗️ &#x1f680;欢迎收看我的主页文章➡️寻至善的主页 文章目录 &#x1f525;前言&#x1f680;Linux操作系统介绍与环境准备Linux操作系统介…

Windows远程桌面实现之十四:实现AirPlay接收端,让苹果设备(iOS,iPad等)屏幕镜像到PC端

by fanxiushu 2024-05-04 转载或引用请注明原始作者。 这个课题已经持续了好几年&#xff0c;已经可以说是很长时间了。 实现的程序是 xdisp_virt&#xff0c; 可以去github下载使用:GitHub - fanxiushu/xdisp_virt: xfsredir file system 一开始是基于测试镜像驱动的目的随便开…

Vue前端环境准备

vue-cli Vue-cli是Vue官方提供的脚手架&#xff0c;用于快速生成一个Vue项目模板 提供功能&#xff1a; 统一的目录结构 本地调试 热部署 单元测试 集成打包上线 依赖环境&#xff1a;NodeJs 安装NodeJs与Vue-Cli 1、安装nodejs&#xff08;已经安装就不用了&#xff09; node-…

linux文本三剑客之grep

目录 1、三剑客特点和应用场景 2、三件客之grep 1) -v 参数使用示例&#xff1a; 1、三剑客特点和应用场景 命令特点场景grep过滤grep命令过滤速度最快sed替换&#xff0c;修改文件内容&#xff0c;取行 如果要进替换/修改文件内容 取出某个范围的内容&#xff08;从中午12.到…

【stomp 实战】spring websocket用户消息发送源码分析

这一节&#xff0c;我们学习用户消息是如何发送的。 消息的分类 spring websocket将消息分为两种&#xff0c;一种是给指定的用户发送&#xff08;用户消息&#xff09;&#xff0c;一种是广播消息&#xff0c;即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了…

我们说的数据分析,到底要分析些什么?

作者 Gam 本文为CDA志愿者投稿作品 “我们说数据分析&#xff0c;到底要分析些什么&#xff1f;” 数据分析这个话题自从进入人们的视线以来&#xff0c;这个话题就成为人们茶余饭后的谈资&#xff0c;但是一千个人眼中就有一千个哈姆雷特&#xff0c;就意味着每个人对数据分…

使用Photoshop压缩图片大小的4种方法

使用Photoshop压缩图片大小&#xff0c;一般可采用下面4种方法&#xff1a; 1.调整图片分辨率&#xff1a; 打开需要压缩的图片文件。 依次点击菜单栏中的“图像”>“图像大小”。 在弹出的对话框中&#xff0c;通过调整分辨率参数来减小文件大小。 2.降低图片品质&#…

什么是水经微图注册码?

水经微图&#xff08;以下简称“微图”&#xff09;注册码&#xff0c;是微图的一种授权方式。 什么是微图注册码&#xff1f; 注册码仅可授权一台电脑&#xff0c;绑定CPU和网卡&#xff0c;激活后不可更换电脑使用。 如果CPU或网卡被更换&#xff0c;以及电脑损坏无法开机…

数据库中索引的底层原理和SQL优化

文章目录 关于索引B 树的特点MySQL 为什么使用 B 树&#xff1f; 索引分类聚簇索引 和 非聚簇索引覆盖索引索引的最左匹配原则索引与NULL索引的代价大表结构修改 SQL优化EXPLAIN命令选择索引列其它细节 关于索引 索引是一种用来加快查找效率的数据结构&#xff0c;可以简单粗暴…

卸载、安装、配置快捷mysql

卸载mysql 1、筛选过滤出mysql相关组件 rpm -qa | grep mysql2、关闭MySQL服务 systemctl stop mysql.service 3、卸载对应组件命令如下&#xff1a; rpm -ev --nodeps [显示的组件名称] 4、查找MySQL对应的所有文件夹 find / -name mysql rm -rf [显示的文件夹路径] 检查…

基于若依框架搭建网站的开发日志(一):若依框架搭建、启动、部署

RuoYi&#xff08;基于SpringBoot开发的轻量级Java快速开发框架&#xff09; 链接&#xff1a;开源地址 若依是一款开源的基于VueSpringCloud的微服务后台管理系统&#xff08;也有SpringBoot版本&#xff09;&#xff0c;集成了用户管理、权限管理、定时任务、前端表单生成等…

linux的基础入门(2)

环境变量 在Shell中&#xff0c;正确的赋值语法是没有空格的&#xff0c;即变量名数值。所以&#xff0c;正确的方式是&#xff1a; tmpshy 这样就将变量tmp赋值为"shy"了。 注意&#xff1a;并不是任何形式的变量名都是可用的&#xff0c;变量名只能是英文字母、…

【neteq】tgcall的调用、neteq的创建及接收侧ReceiveStatisticsImpl统计

G:\CDN\P2P-DEV\Libraries\tg_owt\src\call\call.cc基本是按照原生webrtc的来的:G:\CDN\P2P-DEV\tdesktop-offical\Telegram\ThirdParty\tgcalls\tgcalls\group\GroupInstanceCustomImpl.cpptg对neteq的使用 worker 线程创建call Call的config需要neteqfactory Call::CreateAu…

Java中使用RediSearch进行高效数据检索

RediSearch是一款构建在Redis上的搜索引擎&#xff0c;它为Redis数据库提供了全文搜索、排序、过滤和聚合等高级查询功能。通过RediSearch&#xff0c;开发者能够在Redis中实现复杂的数据搜索需求&#xff0c;而无需依赖外部搜索引擎。本文将介绍如何在Java应用中集成并使用Red…