SpringBoot整合Kafka (二)

📑前言

本文主要讲了SpringBoot整合Kafka文章,如果有什么需要改进的地方还请大佬指出⛺️
上文链接:SpringBoot整合Kafka (一)

🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点

在这里插入图片描述

目录

文章目录

  • 📑前言
  • **目录**
    • 一、介绍
    • 二、主要功能
    • 三、Kafka基本概念
    • 四、Spring Boot整合Kafka的demo
      • 1、构建项目
        • 1.1、引入依赖
        • 1.2、YML配置
        • 1.3、生产者简单生产
        • 1.4、消费者简单消费
      • 2、消费者
        • 2.1、Kafka应答机制
          • ACK应答级别
        • 2.2、Kafka消息消费确认机制
          • 自动提交
          • 手动提交
        • 2.3、指定消费
          • 监听一个主题,指定分区消费消息
  • 📑文章末尾


一、介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目

二、主要功能

1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种\nconsumer,例如hadoop、Hbase、Solr等。

三、Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。首先,让我们来看一下基础的消息(Message)相关术语:
Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的在这里插入图片描述

四、Spring Boot整合Kafka的demo

1、构建项目

1.1、引入依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
1.2、YML配置
spring:kafka:bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时,重试发送的次数consumer: # 消费端反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
1.3、生产者简单生产
@Autowired
private KafkaTemplate kafkaTemplate;@Test
void contextLoads() {ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");System.out.println("发送完成");
}
1.4、消费者简单消费
@Component
public class TopicConsumer {@KafkaListener(topics = "test01-topic")public void readMsg(String msg){System.out.println("msg = " + msg);}
}

2、消费者

2.1、Kafka应答机制

在生产者(producer)往Kafka发送数据的进程中,为了确保数据能够发送到指定的topic中,topic中的每一个partition在收到数据后,都需要向生产者发送 ack(ackacknowledgement)。

假设 producer 在必定的时间内收不到应对,那么producer会再次向Kafka发送此条数据。这就类似于写信,假定我们写一封信给或人,然后我们会在一段时间后收到一封回信,但假设超过了一个月我们还没有收到回信,就会猜想是不是信件丢掉了,会将这封信进行从头发送,直到收到回信中止。

ACK应答级别

一、0
介绍:生产者发送过来的数据,不需要等数据落盘应答
数据可靠性分析:容易丢数据
丢失数据原因:生产者发送完成后,Leader没有接收到数据,但是生产者认为已经发送成功了

二、1
介绍:生产者发送过来的数据,Leader收到数据后应答
数据可靠性分析:容易丢数据
丢失数据原因:应答完成后,还没开始同步副本,Leader挂了,新的Leader不会收到同步的消息,因为生产者已经认为发送成功了

三、-1(all)
介绍:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
数据可靠性分析:可靠

spring:kafka:bootstrap-servers: 192.168.***.***:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时,重试发送的次数properties:linger.ms: 0 # spring.kafka.producer.properties.linger.ms=0,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafkaacks: 1 # 修改ACK应答级别,默认是1
2.2、Kafka消息消费确认机制

Kafka消费消息确认机制分为两种:自动确认和手动确认。
(1)自动确认:在自动确认模式下,Kafka消费者消费一条消息后,会自动将消息偏移量提交到服务器端,不需要手动进行确认,从而确保消息被有效处理。此种确认机制的优点是操作简单,但是可能会导致消息重复消费,即当消费者处理消息的过程中出现异常,导致偏移量提交失败,下一次启动时就会重新消费之前已经处理过的消息。
(2)手动确认:在手动确认模式下,消费者需要显式地调用commit()方法,将消息的偏移量提交到服务器端,才会被标记为已处理。手动确认模式下,可以避免重复消费的问题,但是需要开发者自己实现确认逻辑,增加了一定的开发复杂度。
总的来说,自动确认适用于对消息的可靠性要求不高、实时性较高的场景;手动确认适用于对消息的可靠性要求较高、不要求实时性的场景

自动提交

这种提交方式有两个很重要的参数:
enable.auto.commit=true(是否开启自动提交,true or false)
auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)
每隔5秒,消费者会自动把从poll方法接收到的最大偏移量提交上去。自动提交是在轮询中进行,消费者每次轮询时都会检查是否提交该偏移量。可是这种情况会发生重复消费和丢失消息的情况。

server:port: 18082
spring:kafka:bootstrap-servers: 192.168.***.***:9092,192.168.***.***:9093,192.168.***.***:9094consumer: # consumer消费者group-id: consumergroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 120000auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

设置enable-auto-commit: true,开启自动提交,也就是偏移量不需要我们手动提交,程序会自己提交。
设置auto.commit.interval.ms=120000,也就是消费后,不会立即提交,会在2分钟后提交,只要在这期间服务异常终止,偏移量就无法提交到Broker,再次启动,会重复消费。

手动提交

手动提交模式可以有效确保消息不丢失以及不重复消费

MANUAL:poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。
我们可以先测试一下MANUAL模式,只需要需改配置application.yml即可:

spring:kafka:bootstrap-servers: 192.168.***.***:9092,192.***.***.130:9093,192.***.***.130:9094consumer: # consumer消费者group-id: consumergroup # 默认的消费组IDenable-auto-commit: false # 是否自动提交offsetauto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manual # 手动添加偏移量

消费者代码

@KafkaListener(topics = {"itmentu"},groupId = "itmentuGroup")
public void listener(ConsumerRecord<String,String> record, Acknowledgment ack){//获取消息String message = record.value();//消息偏移量long offset = record.offset();System.out.println("读取的消息:"+message+"\n当前偏移量:"+offset);//手动提交偏移量ack.acknowledge();
}
2.3、指定消费

属性解释:
id:消费者ID
groupId:消费组ID
topics:监听的topic,可监听多个
topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。

监听一个主题,指定分区消费消息
    /*** 监听一个主题,且指定消费主题的哪些分区。* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})},concurrency = "2")public void consumeByPattern(ConsumerRecord<String, String> record) {System.out.println("consumeByPattern");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());

以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。

📑文章末尾

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/135286.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA 函数下边出现红色的波浪线,提示报错

Inferred annotations: Method makeOkResult: org.jetbrains.annotations.Contract("_, _, _, _ -> new") org.jetbrains.annotations.NotNull Parameter headers: org.jetbrains.annotations.NotNull 出现这个提示&#xff0c;我应该怎么处理这个函数&#xff1…

Lua中如何使用continue,goto continue(模拟C++ C#的continue)

Lua中模拟goto continue(模拟C C#的continue 介绍具体方法goto continuewhile模拟continue方法 总结 介绍 在C#或者C里面应该都见过continue&#xff0c;他的用法其实就是打断当前循环直接直接进入下次循环的&#xff0c;代码如下&#xff1a; for (int i 0; i < 10; i){i…

Angular项目升级最佳实践

1、查看第三方依赖库各版本的重大变化&#xff0c;确定待升级的Angular版本以及第三方库的版本。 2、在SourceTree中新建一个源码分支&#xff0c;用作项目升级。 3、安装新版node.js 4、打开https://update.angular.io/?v10.0-16.0&#xff0c;选择angular新旧版本&#x…

改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络

🔥🔥🔥 提升多尺度、不规则目标检测,创新提升 🔥🔥🔥 🔥🔥🔥 捕捉图像特征和处理复杂图像特征 🔥🔥🔥 👉👉👉: 本专栏包含大量的新设计的创新想法,包含详细的代码和说明,具备有效的创新组合,可以有效应用到改进创新当中 👉👉👉: �…

查看apk签名

cmd 命令&#xff1a; keytool -v -list -keystore "E:\xxx\release.jks"

kubernetes集群编排——k8s存储(configmap,secrets)

configmap 字面值创建 kubectl create configmap my-config --from-literalkey1config1 --from-literalkey2config2kubectl get cmkubectl describe cm my-config 通过文件创建 kubectl create configmap my-config-2 --from-file/etc/resolv.confkubectl describe cm my-confi…

深入剖析React Hooks中的 useCallback

前言 自 React 16.8 版本引入 Hooks 以来&#xff0c;useCallback 成为了前端开发者们越来越青睐的一个功能。useCallback 可以有效优化组件性能&#xff0c;尤其在处理函数式组件中的状态更新时。本文将详细介绍 useCallback 的用法及其注意事项。 1. useCallback 简介 use…

Unreal UnLua + Lua Protobuf

Unreal UnLua Lua Protobuf https://protobuf.dev/ protobuf wire format&#xff1a;pb 编译到底层的数据协议 https://github.com/starwing/lua-protobuf/blob/master/README.zh.md buffer 处理 lua string 可以当 buffer 用&#xff0c;# len 不会遇到 0 截断&#xf…

算法leetcode|85. 最大矩形(rust重拳出击)

文章目录 85. 最大矩形&#xff1a;样例 1&#xff1a;样例 2&#xff1a;样例 3&#xff1a;样例 4&#xff1a;样例 5&#xff1a;提示&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 85. 最…

Python算法例8 将整数A转换为B

1. 问题描述 给定整数A和B&#xff0c;求出将整数A转换为B&#xff0c;需要改变bit的位数。 2. 问题示例 把31转换为14&#xff0c;需要改变2个bit位&#xff0c;即&#xff1a;&#xff08;31&#xff09;10&#xff08;11111&#xff09;2&#xff0c;&#xff08;14&…

c语言的内存使用

#include <stdio.h> #include <stdlib.h> typedef struct info{int a;char b; }Info, *INFO;int main(){INFO ptr (INFO)malloc(sizeof(Info) *3);ptr[0].a 100;ptr[1].b c;printf("[%c]\n", ptr[1].b);free(ptr)return 0; } 定义一个结构体Info&…

CAN 协议常见面试题总结

0.讲一下CAN通讯的过程 第一段&#xff1a;需要发送的通讯设备&#xff0c;先发送一个显性电平0&#xff0c;告诉其他通讯设备&#xff0c;需要开始通讯。 第二段&#xff1a;就是发送仲裁段&#xff0c;其中包括ID帧和数据帧类型&#xff0c;告诉其他通讯设备&#xff0c;需…

alipay sofa-ark-1.1.5 各种类加载器 优先级

. 各种类加载器 & 优先级 /*** <pre>* 类加载器的使用优先级&#xff08;由高到底&#xff09;* 0、JDKDelegateClassLoader* 1、ContainerClassLoader* 2、hook级别类{前置}加载器* 3、PluginClassLoader* 4、B…

uniapp分包

以下是一个完整的 Uniapp 分包示例&#xff0c;代码分布在不同的文件夹中&#xff0c;其中包含了两个子包 sub1 和 sub2&#xff0c;以及一个主包 main。 在项目根目录下创建 pages 文件夹&#xff0c;并在其中创建各个页面的文件夹。 在每个页面文件夹中创建对应的 vue 文件和…

【SSD1306 OLED屏幕测试程序 (开源)orangepi zero2 全志H616 】.md updata: 23/11/07

orangepi zero2 H616 SSD1306 OLED屏幕测试程序 orangepi zero2 配置wiringpi 库后&#xff0c;突发奇想构建一个测试oled屏幕的程序&#xff0c;放一个蜗牛每次移动一个像素点&#xff0c;实时显示蜗牛的步数&#xff0c;后面要显示其他内容在此代码上修改即可&#xff0c;如…

flask和fastapi的区别以及demo实现

flask和fastapi的区别以及demo实现 flask和fastapi的区别fastapi简单demoFastAPI包括全局异常捕捉和参数验证的demoflask和fastapi的区别 Flask:Flask是一个轻量级的Web框架,它提供了最基本的工具,可以自由选择其他库和组件来构建应用。灵活性:Flask允许用户自由选择数据库、…

NLP之BM25:BM25算法的简介、相关库、案例应用之详细攻略

NLP之BM25:BM25算法的简介、相关库、案例应用之详细攻略 目录 相关文章 NLP之BM25:BM25算法的简介、相关库、案例应用之详细攻略 Py之rank_bm25:rank_bm25的简介、安装、使用方法 BM25算法的简介

智慧农业:农林牧数据可视化监控平台

数字农业是一种现代农业方式&#xff0c;它将信息作为农业生产的重要元素&#xff0c;并利用现代信息技术进行农业生产过程的实时可视化、数字化设计和信息化管理。能将信息技术与农业生产的各个环节有机融合&#xff0c;对于改造传统农业和改变农业生产方式具有重要意义。 图扑…

Android Studio(项目收获)

取消按钮默认背景色 像按钮默认背景色为深蓝色&#xff0c;即使使用了background属性指定颜色也不能生效。 参考如下的解决方法&#xff1a; 修改/res/values/themes.xml中的指定内容如下&#xff1a; <style name"Theme.TianziBarbecue" parent"Theme.Mater…

OSCP系列靶场-Esay-Dawn

总结 getwebshell → SMB共享无密码 → SMB存在上传功能 → 存在周期执行任务 → SMB上传反弹shell → 被执行获得webshell 提 权 思 路 → suid发现zsh → -p容器提权 准备工作 启动VPN 获取攻击机IP > 192.168.45.163 启动靶机 获取目标机器IP > 192.168.242.11 信…