适合java程序员的Kafka消息中间件实战

创作的初心:

我们在学习kafka时,都是基于大数据的开发而进行的讲解,这篇文章为java程序员为核心,助力大家掌握kafka实现。

什么是kafka:

历史:

  • 诞生与开源(2010 - 2011 年)
    • 2010 年,Kafka 由 LinkedIn 公司的工程师团队开发,用于处理公司内部的大规模实时数据,如用户活动、系统日志等。
    • 2011 年,Kafka 开源并成为 Apache 软件基金会的孵化项目,吸引了社区的广泛关注和参与。
  • 成为顶级项目(2012 - 2013 年)
    • 2012 年,Kafka 从 Apache 孵化项目毕业,成为 Apache 顶级项目,标志着它在开源社区中获得了广泛认可。
    • 这一时期,Kafka 的功能不断丰富,性能也得到了进一步提升,逐渐被越来越多的公司用于构建实时数据处理系统。
  • 蓬勃发展(2014 - 2017 年)
    • 2014 年,Confluent 公司成立,专注于 Kafka 的商业化推广和技术支持,为 Kafka 的发展提供了强大的商业推动力量。
    • 随着大数据和云计算技术的兴起,Kafka 作为高性能的消息队列和流处理平台,在数据处理领域的应用越来越广泛,许多公司开始将 Kafka 作为其数据基础设施的重要组成部分。
    • 2017 年,Kafka 发布了 0.11.0 版本,引入了幂等性和事务支持,进一步提升了 Kafka 在处理精确一次语义(Exactly - once Semantics)场景下的能力,使其更适合用于对数据一致性要求较高的业务场景。
  • 持续创新(2018 - 2022 年)
    • 2018 年,Kafka 发布了 2.0.0 版本,对 Kafka 的架构进行了一些重大改进,如引入了 Kafka Connect 用于数据集成,Kafka Streams 用于流处理等,使 Kafka 不仅仅是一个消息队列,还成为了一个功能强大的流计算平台。
    • 2020 年,Kafka 发布了 2.8.0 版本,开始引入 KIP - 500,即 Kafka Raft(KRaft),逐步实现不依赖 Zookeeper 的目标,开启了 Kafka 架构的重大变革。
    • 2022 年,Kafka 持续推进 KRaft 的发展,不断完善其功能和性能,为 Kafka 的未来发展奠定了坚实的基础。
  • 成熟与拓展(2023 年 - 至今)
    • 2023 年,Kafka 在 KRaft 模式下不断成熟,社区继续致力于提升 Kafka 的性能、稳定性和安全性,同时拓展其在更多领域的应用,如物联网、金融科技等。
    • 随着技术的不断发展,Kafka 将继续适应新的业务需求和技术趋势,不断演进和完善,保持其在分布式消息队列和流处理领域的领先地位。

总结:Kafka 是一个分布式的、高吞吐量的消息队列系统,由 Apache 软件基金会开发,最初是由 LinkedIn 公司开发并开源。

核心特点

  • 高吞吐量:Kafka 能够处理大量的消息,每秒可以处理数千甚至数百万条消息,这得益于其分布式架构和高效的存储机制。它采用了顺序读写磁盘、零拷贝等技术,大大提高了数据的读写速度。
  • 可扩展性:Kafka 集群可以很容易地扩展,通过添加新的节点(broker)可以线性地增加集群的处理能力和存储容量。同时,它支持自动的负载均衡,能够将数据和请求均匀地分布在各个节点上。
  • 持久性和可靠性:Kafka 将消息持久化到磁盘上,并通过副本机制来保证数据的可靠性。每个消息在多个节点上有副本,当某个节点出现故障时,其他副本可以继续提供服务,确保数据不会丢失。
  • 高并发:它能够支持大量的生产者和消费者同时并发地读写消息,通过分区和多副本机制,可以实现对消息的并行处理,提高系统的整体性能。

主要组件

  • 生产者(Producer):负责将消息发送到 Kafka 集群。生产者可以将消息发送到指定的主题(Topic),并可以指定消息的键(Key)和值(Value)。根据消息的键,Kafka 可以将消息分区,以便更好地进行数据的存储和处理。
  • 消费者(Consumer):从 Kafka 集群中读取消息进行消费。消费者属于一个消费者组(Consumer Group),每个消费者组可以有多个消费者实例。同一消费者组中的消费者会均衡地消费主题中的各个分区,不同消费者组之间相互独立,每个消费者组都会独立地从 Kafka 中获取消息。
  • 主题(Topic):是 Kafka 中消息的逻辑分类,类似于数据库中的表。每个主题可以分为多个分区(Partition),每个分区是一个有序的、不可变的消息序列。消息在分区中按照顺序进行存储,并且每个消息都有一个唯一的偏移量(Offset)来标识其在分区中的位置。
  • 代理(Broker):Kafka 集群中的服务器节点称为代理。每个代理负责处理一部分主题的分区,并将消息持久化到本地磁盘。代理之间通过网络进行通信,共同组成一个分布式的集群,实现数据的复制、备份和负载均衡等功能。

下载Kafka及命令行使用:

下载地址:

大家可以自行在官网下载:Apache Kafka

启动的方式:

kafka本身是不区分操作系统的,他的目录中我们可以发现它提供了Windows下的启动方式,

小编的版本是kafka_2.13-3.3.1

依赖 ZooKeeper(Kafka 2.8 之前)

1. 启动 ZooKeeper

ZooKeeper 是一个分布式协调服务,Kafka 依赖它来存储元数据。
在 Kafka 安装目录下,使用命令行工具执行以下命令启动 ZooKeeper:

# Windows 系统
.\bin\windows\zookeeper-server-start.bat ../../config/zookeeper.properties
# Linux 或 macOS 系统
bin/zookeeper-server-start.sh config/zookeeper.properties
2. 启动 Kafka 服务

在 ZooKeeper 成功启动后,你可以启动 Kafka 服务。同样在 Kafka 安装目录下,使用以下命令启动:

# Windows 系统
.\bin\windows\kafka-server-start.bat ../../config/server.properties
# Linux 或 macOS 系统
bin/kafka-server-start.sh config/server.properties

不依赖 ZooKeeper(Kafka 2.8 及之后)

在 Kafka 2.8 及更高版本中,可以不依赖 ZooKeeper 启动 Kafka,使用 KRaft 模式。步骤如下:

但是这种方式在windows上不好用,产生错误,没有官方的解决方式(但是有一位大神写了一个补丁版本的kafka,大家可以搜索着下载一下)。还是使用Linux操作系统吧。

1. 生成集群 ID
# Windows 系统
.\bin\windows\kafka-storage.bat random-uuid
# Linux 或 macOS 系统
./kafka-storage.sh random-uuid

执行上述命令后,会生成一个随机的 UUID,你需要记住这个 UUID,后续步骤会用到。

2. 格式化存储目录

使用上一步生成的 UUID 来格式化存储目录,在命令行中执行以下命令:

# Windows 系统
D:\soft_setup\kafka_2.13-3.3.1\bin\windows>kafka-storage.bat format -t ZlDD6NrNQk2fiMxF4-iB8w -c ../../config/kraft/server.properties
# Linux 或 macOS 系统
./kafka-storage.sh format -t svPXC5N-SIiymvhRKPwZ3g -c  ../config/kraft/server.properties

请将 <your-uuid> 替换为第一步中生成的实际 UUID。

3. 启动 Kafka 服务

格式化完成后,就可以启动 Kafka 服务了:

# Windows 系统
.\bin\windows\kafka-server-start.bat .\config\kraft\server.properties
# Linux 或 macOS 系统
./kafka-server-start.sh ../config/kraft/server.properties
4.关闭kafka
# Windows 系统
# Linux 或 macOS 系统./kafka-server-stop.sh ../config/kraft/server.properties

早期版本的kafka和zookeeper的关系:

  • Kafka 依赖 Zookeeper 进行元数据管理
    • Kafka 的集群信息、主题信息、分区信息以及消费者组的偏移量等元数据都存储在 Zookeeper 中。例如,当创建一个新的主题时,相关的主题配置和分区分配信息会被写入 Zookeeper。
    • Zookeeper 以树形结构存储这些元数据,使得 Kafka 能够方便地进行查询和更新操作,从而让 Kafka broker 可以快速获取到所需的元数据信息来处理客户端的请求。
  • Zookeeper 为 Kafka 提供集群管理功能
    • Kafka 集群中的 broker 节点会在 Zookeeper 上进行注册,通过 Zookeeper 的节点创建和观察机制,Kafka 可以实时感知到集群中 broker 节点的动态变化,如节点的加入或退出。
    • 当有新的 broker 节点加入集群时,它会向 Zookeeper 注册自己的信息,其他 broker 节点通过监听 Zookeeper 上的相关节点变化,就能及时发现新节点的加入,并进行相应的协调和数据分配操作。
  • Zookeeper 协助 Kafka 进行分区 leader 选举
    • 在 Kafka 中,每个分区都有一个 leader 副本和多个 follower 副本。当 leader 副本所在的 broker 节点出现故障时,需要选举出一个新的 leader。
    • Zookeeper 通过其选举机制,能够快速确定哪个 follower 副本可以成为新的 leader,确保分区的读写操作能够尽快恢复,保证了 Kafka 集群的高可用性和数据的一致性。
  • Zookeeper 帮助 Kafka 实现消费者组管理
    • 消费者组的成员信息、消费偏移量以及消费者组的协调等工作都依赖于 Zookeeper。消费者在启动时会向 Zookeeper 注册自己所属的消费者组和相关信息。
    • Zookeeper 会监控消费者组中各个消费者的状态,当有消费者加入或离开组时,会触发重新平衡操作,确保每个分区能够被合理地分配给消费者组中的消费者进行消费,从而实现了消费者组对主题分区的负载均衡消费

发展进程:

Kafka 从 2.8.0 版本开始引入 KIP-500,实现了 Raft 分布式一致性机制,开启了不依赖 Zookeeper 的进程1。但在 2.8.0 版本中,Zookeeper - less Kafka 还属于早期版本,并不完善1。

到 3.3 版本时,Kafka Raft(KRaft)被标记为生产就绪,具备了生产环境使用的条件3。3.4 版本提供了从 Zookeeper 模式到 KRaft 模式的早期访问迁移功能,3.5 版本中迁移脚本正式生产就绪,同时弃用了 Zookeeper 支持。

直至 4.0 版本,Zookeeper 被彻底移除,所有版本完全基于 KRaft 模式运行,Kafka 不再依赖 Zookeeper,这标志着 Kafka 在摆脱 Zookeeper 依赖方面的工作基本完成。

注意:下面小编的操作是基于Linux系统

kafka的主题Topic和事件Event

主题(Topic)

定义

主题是 Kafka 中消息的逻辑分类,类似于数据库中的表或者文件系统中的文件夹,用于对消息进行归类和管理。每个主题可以有多个生产者向其发送消息,也可以有多个消费者从其读取消息。

特点

可分区:一个主题可以被划分为多个分区(Partition),每个分区是一个有序的、不可变的消息序列。分区可以分布在不同的服务器上,从而实现数据的分布式存储和并行处理,提高系统的吞吐量和可扩展性。

多副本:为了保证数据的可靠性和高可用性,每个分区可以有多个副本(Replica),这些副本分布在不同的 Broker 上。其中一个副本被指定为领导者(Leader),负责处理读写请求,其他副本作为追随者(Follower),与领导者保持数据同步。

消息持久化:Kafka 将消息持久化到磁盘上,即使服务器重启,消息也不会丢失。消息会根据一定的保留策略在磁盘上保留一段时间,过期的消息将被自动删除,以释放磁盘空间。

创建主题:

通过kafka-topics.sh脚本语言创建主题,直接运行这个脚本就会告诉你如何使用这个脚本。

命令:

//使用脚本创建主题

 ./kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092

//-creat  创建一个主题

//--topic 后面的hello是我们的topic的名字

// bootstrap-server localhost:9092  这个是必须的后面指定的是我们当前节点的主机地址

查找主题:

./kafka-topics.sh --list --bootstrap-server localhost:9092

删除主题

 ./kafka-topics.sh --delete --topic hello --bootstrap-server localhost:9092

显示主题详细信息:

 ./kafka-topics.sh --describe --topic hello --bootstrap-server localhost:9092

修改分区数:

 ./kafka-topics.sh --alter --topic hello --partitions 5 --bootstrap-server localhost:9092

事件(Event)

  • 定义:在 Kafka 的语境中,事件通常指的是生产者发送到主题中的一条具体的消息(Message)。它是 Kafka 中数据传输和处理的基本单元,包含了消息的内容、键(Key)、时间戳等元数据。
  • 特点
    • 不可变性:一旦事件被发布到 Kafka 主题中,它就是不可变的,不能被修改或删除。这确保了消息的一致性和可追溯性。
    • 有序性:在同一个分区内,事件是按照它们被生产的顺序进行存储和消费的,保证了消息的顺序性。但不同分区之间的事件顺序是不确定的。
    • 灵活性:事件的内容可以是任何格式的数据,如 JSON、XML、二进制数据等,生产者和消费者可以根据自己的需求对消息进行编码和解码。

事件的发送和接收:

事件的发送:

时间发送的命令

./kafka-console-producer.sh --topic hello --bootstrap-server localhost:9092

在之后的每次换行就是一条消息

 事件读取:

从头开始读:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning

--from-beginning  添加这个参数就是在日志文件中的第一个消息开始读,如果不添加这个就是监听当前消息,之后生产者发送消息,消费者才会监听。 

小技巧:我们在学习时,总是依靠记忆是不对的,因为总会忘记的,最好的方式就是依靠我们的管官方文档和开发手册,对于kafka的命令,我们还是依靠他的帮助来实现。

kafka的远程连接:

下载kafka插件:

插件是:

免费插件

连接kafka:

解决无法远程连接问题:

课程地址:042 Docker容器Kafka配置文件修改_哔哩哔哩_bilibili 

视频中演示的是docker的配置,但是实际上那种方式都是这种修改方式

我们在远程连接kafka是,按着默认的配置文件是无法远程连接的,但是我们可以通过修改配置文件的方式达到远程连接的要求。

配置文件修改的位置是:

我们需要将配置文件修改为:

 

修改之后我们可以远程连接kafka. 

显示我们的topic,证明连接成功。 

Spring-boot集成Kafka 

快速开始

导入的依赖:

//他的版本是2.8.10 原因是因为我的jdk版本是1.8,如果是3.X的kafka,集成的boot版本是3.X,JDK版本是17,小编不再升级JDK版本了,所以直接使用2.8.10
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

编写配置文件: 

对于我们所有的MQ产品,配置文件几乎都是三个部分构成,服务器连接配置,生产者配置,消费之配置。三个部分生成。

对于我们现在常见的三个中间件分别是:RabbitMQ,RocketMQ,kafka.如果大家想学习其他的中间件的话,可以看小编的其他文章。为大家带来了详细的消息中间件实战。

两小时拿下RocketMQ实战_rocketmq使用案例-CSDN博客

快速上手RabbitMQ_逸Y 仙X的博客-CSDN博客

spring:kafka:#kafka连接地址bootstrap-servers: 192.168.0.169:9092#配置生产者 (24个配置,我们在这个基础班的base中全部使用默认配置)#producer:#配置消费者,27个配置#consumer:

 编写生产者:

@Component
public class EventProduce {//加入spring-kafka依赖之后,自动配置好了kafkaTemplate@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;public void sentEvent(){ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("hello", "这是使用java发送的第一条消息");}}//使用test进行测试的代码
@SpringBootTest
public class EventProduceTest {@Autowiredprivate EventProduce  eventProduce;@Testpublic void produceEventTest(){eventProduce.sentEvent();}
}

编写消费者:

注意点:使用@KafkaListener注解监听时,必须雨哦的两个参数是 topics 和 groupId

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @Author HanDK* @CreateTime: 2025/4/29 20:36*/
@Component
public class EventConsumer {//监听的方式@KafkaListener(topics = "hello",groupId ="hello-group" )//必须有的两个参数是topics  和  groupIdpublic void eventConsumer(String event){//默认是知识监听最新的消息System.out.println("读取的事件是"+event);}
}

展示结果:

详细解释配置文件:

consumer中的配置:auto-offset-reset: earliest

当前配置是可以读取更早的信息,也就是读取以签的信息 ,但是如果当前的消费组id已经消费过的话,kafka会记住偏移量,配置就不会生效,Kafka只会在中不到偏移量时,使用配置,可以手动重置偏移量,或者是使用新的id

spring:kafka:#kafka连接地址bootstrap-servers: 192.168.0.169:9092#配置生产者 (24个配置,我们在这个基础班的base中全部使用默认配置)#producer:#配置消费者,27个配置consumer:#当前配置是可以读取更早的信息,也就是读取以签的信息#但是如果当前的消费组id已经消费过的话,kafka会记住偏移量,配置就不会生效,Kafka只会在中不到偏移量时,使用配置,可以手动重置偏移量,或者是使用新的idauto-offset-reset: earliest

使用 新的id让配置生效:

@Component
public class EventConsumer {//监听的方式@KafkaListener(topics = "hello",groupId ="hello-group-02" )//必须有的两个参数是topics  和  groupIdpublic void eventConsumer(String event){//默认是知识监听最新的消息System.out.println("读取的事件是"+event);}
}

结果展示:

方式二:重置偏移量:

 命令:

将偏移量设置为最早开始位置

./kafka-consumer-groups.sh --bootstrap-server localhost:9092  --topic hello --group hello-group-02  --reset-offsets --to-earliest --execute

//设置为最后的偏移量位置

 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092  --topic hello --group hello-group-02  --reset-offsets --to-latest --execute

发送消息:

发送Message消息:

 /*** 发送Message消息*/public void sentMessgeEvent(){//todo 使用这种方式创建时,将topic的名字放在header中,这个方式的来与时在KafkaOperations这个类中Message<String> message = MessageBuilder.withPayload("使用message发送消息到hello topic").setHeader(KafkaHeaders.TOPIC,"hello-Message").build();kafkaTemplate.send(message);System.out.println("消息发送成功");}

发送ProducerRecord消息 

 /*** 发送ProducerRecord对象*/public void sendProducerRecord(){//使用headers,传递数据,消费者可以获得我们传输的数据Headers header = new RecordHeaders();header.add("phone","11111111111".getBytes(StandardCharsets.UTF_8));//public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)ProducerRecord<String,String> producerRecord = new ProducerRecord<>("hello",0, Instant.now().toEpochMilli(),"key1" ,"kafka value1",header);ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(producerRecord);System.out.println("发送成功");}

sent的重载方式:

    /*** sent的重载方式*/public void sentLong(){ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("hello", 0, Instant.now().toEpochMilli(), "key2", "value2");}

sendDefault方式:

他是一种通过配置文件,省略我们在发送是指定topic的返送方式,对于我们每次只是发送到相同的topic中可以采用的方式

 /*** 测试sendDefault方式*/public void sentDefault(){//大家可以看一下,这里面是不是没有指定我们的topic,如果直接发送的话就会出现错误//需要配置配置文件ListenableFuture<SendResult<String, String>> sendResultListenableFuture = kafkaTemplate.sendDefault( 0, Instant.now().toEpochMilli(), "key2", "value2");System.out.println("消息发送成功");}

配置文件: 

spring:kafka:template:#配置模板的默认的主题,使用sendDefault时,直接发送到hello中default-topic: hello

发送对象:

通过序列化的方式发送对象:

spring:kafka:#kafka连接地址bootstrap-servers: 192.168.0.169:9092#配置生产者 (24个配置,我们在这个基础班的base中全部使用默认配置)#producer:#配置消费者,27个配置producer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializer#key-serializer: 键序列化,默认是StringSerializer
  /*** 发送对象* 直接序列化的时候出现异常,异常是StringSerializer,需要配置序列化方式*/public void sendObj(){//其实如果是发送对象的话,就是将对象进行序列化User user = User.builder().id(1).name("lihua").phone("13464299018").build();//如果分区是null,kafka自己选择放置到那个分区中template.send("hello",null,Instant.now().toEpochMilli(),"key3",user);}

发送状态接受:

同步方式:

    /*** 发送之后获取结果,阻塞的方式获取结果*/public void resultSent(){ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("hello", "这是使用java发送的第一条消息");try {//这个方法是阻塞的SendResult<String, String> sendResult = result.get();//RecordMetadata 如果是空就是没有接受到消息if(sendResult.getRecordMetadata()!=null){//kafka接受消息成功System.out.println("kafka接受消息成功");ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();String value = producerRecord.value();System.out.println("value = "+value);}} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}}

异步方式:

spring-kafka 2.X
    /*** 发送之后使用异步的方式获取结果* 使用回调函数在ListenableFuture(kafka2.X),* 使用thenAccept()   thenApply()   thenRun() 等方式来注册回调函数, CompletableFuture(kafka3.X)完成时执行*/public void sendAsynchronous(){ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("hello", "这是使用java发送的第一条消息");result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println(ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> result) {if(result.getRecordMetadata()!=null){//kafka接受消息成功System.out.println("kafka接受消息成功");ProducerRecord<String, String> producerRecord = result.getProducerRecord();String value = producerRecord.value();System.out.println("value = "+value);}}});}/*** 当前操作时将我们的返回值转化为CompletableFuture类型进行操作 结合了我们的3.X的方式实现* 发送之后使用异步的方式获取结果* 使用回调函数在ListenableFuture(kafka2.X),* 使用thenAccept()   thenApply()   thenRun() 等方式来注册回调函数, CompletableFuture(kafka3.X)完成时执行*/public void sendAsynchronous2(){ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("hello", "这是使用java发送的第一条消息");CompletableFuture<SendResult<String, String>> completable = result.completable();try{completable.thenAccept((sendResult)->{if(sendResult.getRecordMetadata()!=null){//kafka接受消息成功System.out.println("kafka接受消息成功");ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();String value = producerRecord.value();System.out.println("value = "+value);}}).exceptionally((ex)->{ex.printStackTrace();//如果失败,进行处理return null;});}catch (RuntimeException e){throw new RuntimeException();}}
spring-kafka 3.X

 下面是一个案例代码,我们的上面2.X中的代码第二种方式,删除转化的部分,3.X可以直接使用。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;import java.util.concurrent.CompletableFuture;public class KafkaSender3x {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaSender3x(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送之后使用异步的方式获取结果* 使用 CompletableFuture 的 thenAccept 方法来处理结果*/public void sendAsynchronous() {CompletableFuture<SendResult<String, String>> result = kafkaTemplate.send("hello", "这是使用java发送的第一条消息");result.thenAccept(sendResult -> {System.out.println("消息发送成功,分区: " + sendResult.getRecordMetadata().partition() +", 偏移量: " + sendResult.getRecordMetadata().offset());}).exceptionally(ex -> {System.err.println("消息发送失败: " + ex.getMessage());return null;});}
}    

Spring-boot创建主题指定分区和副本: 

分区(Partition)和副本(Replica)概念讲解:

  • 分区(Partition)
    • 定义:可以将其看作是一个主题(Topic)的物理细分。如果把主题类比为一个文件夹,那么分区就像是文件夹中的不同子文件夹。每个分区都是一个独立的、有序的消息序列,消息在分区内按照顺序进行存储和处理。
    • 作用:通过分区,可以实现数据的并行处理和存储,提高系统的吞吐量和可扩展性。不同的分区可以分布在不同的服务器上,这样多个消费者可以同时从不同的分区读取消息进行处理,从而加快消息处理的速度。例如,在一个处理大量订单消息的系统中,将订单主题分为多个分区,每个分区可以由不同的消费者组进行处理,从而提高订单处理的整体效率。
  • 副本(Replica)
    • 定义:是分区的一个拷贝,它包含了与原始分区相同的消息数据。副本可以存在于不同的服务器上,用于提供数据的冗余和容错能力。
    • 作用:当某个分区所在的服务器出现故障时,副本可以替代故障分区继续提供服务,保证数据的可用性和系统的稳定性。例如,在一个分布式消息队列系统中,如果一个分区的主副本所在服务器崩溃了,那么系统可以自动切换到该分区的其他副本所在服务器上,继续处理消息,而不会导致数据丢失或服务中断。同时,副本也可以用于负载均衡,多个副本可以分担读取请求的压力,提高系统的整体性能。
    • 细节点:replica:副本 他是为了放置我们partition数据不丢失,且kafka可以继续工作,kafka的每个节点可以有1个或者是多个副本 .副本分为Leader Replica 和 Follower Replica副本。 副本最少是1个,最多不能超果节点数(kafka服务器数),否则将不能创建Topic。 我们主副本可读可写,从副本只能读不能写 

    • 命令行方式创建副本:

    • ./kafka-topics.sh --create  --topic mytopic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

      我们当前是单节点的kafka,创建两个副本的话,就会出现错误,现实的错误信息是: only 1 broker(s) are registered.

 使用spring-kafka创建主题分区和副本

 如果我们直接使用kafkaTemplate的send(String topic,String event);这种方式的话,就是创建了以topic,但是他只有一个分区和一个副本(主副本,可读可写);

通过编写配置文件设置分区个数和副本个数:

package com.hdk.springbootkafkabase01.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaResourceFactory;
import org.springframework.kafka.core.KafkaTemplate;import java.util.Optional;/*** @Author HanDK* @CreateTime: 2025/5/1 09:18*/
@Configuration
public class KafkaConfig {/*** 我们再次启动时,当前的消息不会丢失,也不会将我们以签有的topic覆盖掉* 也就是如果存在的话就不会创建,只有不存在才会创建* @return*/@Beanpublic NewTopic newTopic(){
//          构造函数
//         public NewTopic(String name, int numPartitions, short replicationFactor) {
//            this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
//        }//创建一个topic  5个分区  1个副本NewTopic newTopic = new NewTopic("myTopic",5,(short)1 );return newTopic;}//但是现在将我们的myTopic的分区改为9个分区//在创建时如果有一摸一样的topic,不会创建。但是如果有改变的话,就会修改@Beanpublic NewTopic updateNewTopic(){
//          构造函数
//         public NewTopic(String name, int numPartitions, short replicationFactor) {
//            this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
//        }//创建一个topic  5个分区  1个副本NewTopic newTopic = new NewTopic("myTopic",9,(short)1 );return newTopic;}}

消息发送策略:

我们的一个topic中有很多的分区,但是我们在发送时使用的是什么策略呢?

默认的随机策略:

指定key:使用key生成hash值,之后在计算获取我们的partition的分区数值。如果我们的key值是不变的,他就会一直放置在一个分区中。

没有key:但是如果在发送消息时没有指定key值,他会随机发送到那个partition中。使用随机数算法获取随机的分区数。

轮询分配策略:

kafka的类:RoundRobinPartitioner implements Partitioner

如何使用轮询策略:(代码的方式获取)

我们发现直接使用配置文件的形式是不可以配置轮询策略的,使用代码的方式将策略设置为轮询策略。

编写配置文件:

    @Value("${spring.kafka.bootstrap-servers}")private String bootstrapServer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Beanpublic Map<String,Object> producerConfigs(){//使用map的形式填写配置文件Map<String,Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String,Object> producerFactory(){//创建生产者工厂return new DefaultKafkaProducerFactory<>(producerConfigs());}//@Beanpublic KafkaTemplate<String,Object> kafkaTemplate(){KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}

当前我们的每个分区都没有事件,现在我发送9条消息,展示是不是轮询策略。

我们可以看到他确实时进入了轮询的策略,大家可以使用这个代码去Debug 

 

我们对于partition方法调用两次,他的放置文件时就是间隔一份放置。

自定义分配策略:

按着上面的思路,我们可以知道,其实如果自定一分区策略的话,自己去实现我们的Partitioner接口,实现分区策略就可以了。

生产者消息发送的流程:

编写拦截器:

package com.hdk.springbootkafkabase01.config;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @Author H* @CreateTime: 2025/5/1 17:04*/
public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> {/*** 发送消息是,对于消息进行拦截* @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.* @return*/@Overridepublic ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {System.out.println("获取到内容是:"+record.value());return record;}/*** 消息确认机制* @param metadata The metadata for the record that was sent (i.e. the partition and offset).*                 If an error occurred, metadata will contain only valid topic and maybe*                 partition. If partition is not given in ProducerRecord and an error occurs*                 before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.*                 The metadata may be null if the client passed null record to*                 {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.* @param exception The exception thrown during processing of this record. Null if no error occurred.*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(metadata!=null){System.out.println("发送成功");}else{System.out.println("出现异常");exception.printStackTrace();}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

配置拦截器

直接在配置文件中还是不能直接获取到配置项,使用编码实现

package com.hdk.springbootkafkabase01.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaResourceFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;
import java.util.Optional;/*** @Author HanDK* @CreateTime: 2025/5/1 09:18*/
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Beanpublic Map<String,Object> producerConfigs(){//使用map的形式填写配置文件Map<String,Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);//配置拦截器,默认是没有拦截器的props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerProducerInterceptor.class.getName());return props;}@Beanpublic ProducerFactory<String,?> producerFactory(){//创建生产者工厂return new DefaultKafkaProducerFactory<>(producerConfigs());}//@Beanpublic KafkaTemplate<String,?> kafkaTemplate(){KafkaTemplate<String,?> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}/*** 我们再次启动时,当前的消息不会丢失,也不会将我们以签有的topic覆盖掉* 也就是如果存在的话就不会创建,只有不存在才会创建* @return*/@Beanpublic NewTopic newTopic(){
//          构造函数
//         public NewTopic(String name, int numPartitions, short replicationFactor) {
//            this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
//        }//创建一个topic  5个分区  1个副本NewTopic newTopic = new NewTopic("myTopic",5,(short)1 );return newTopic;}//但是现在将我们的myTopic的分区改为9个分区//在创建时如果有一摸一样的topic,不会创建。但是如果有改变的话,就会修改@Beanpublic NewTopic updateNewTopic(){
//          构造函数
//         public NewTopic(String name, int numPartitions, short replicationFactor) {
//            this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
//        }//创建一个topic  5个分区  1个副本NewTopic newTopic = new NewTopic("myTopic",9,(short)1 );return newTopic;}}

 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerProducerInterceptor.class.getName());//这里不可以直接.class  

ProducerConfig.INTERCEPTOR_CLASSES_CONFIG ,该配置项期望的值是一个包含拦截器类全限定名(Fully Qualified Class Name)的字符串或者字符串列表。这是因为 Kafka 在启动时需要根据这些类名,通过 Java 的反射机制来实例化对应的拦截器类。不能直接传输Class对象。

 消息消费细节:

@Payload注解

他修饰的变量是发送的内容

@Hearder注解

他标注请求头的信息。但是需要指明获取的是头信息中的那个键值信息

ConsumerRecord<String,String> record  使用他接受消息的全信息

package com.hdk.springbootkafkabase02.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;/*** @Author HanDK* @CreateTime: 2025/5/1 17:37*/
@Component
public class EventConsumer {//@Payload  这个注解证明他修饰的变量就是消息体的内容//@Header   这个注解接收请求头的信息@KafkaListener(topics = {"helloTopic"},groupId="group1")public void listenerEvent(@Payload String message, @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,ConsumerRecord<String,String> record){System.out.println("接收的信息是"+message);System.out.println("接收的topic是"+topic);System.out.println("record中获取value:"+record.value());System.out.println("record中获取偏移量:"+record.offset());System.out.println("打印record的所有信息"+record);}/*** ConsumerRecord<String,String> record  使用他接受消息*/
}

 接收的信息是hello topic
接收的topic是helloTopic
record中获取value:hello topic
record中获取偏移量:2
打印record的所有信息ConsumerRecord(topic = helloTopic, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1746152401289, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello topic)

消费对象信息:

在消费对象消息式,我们可以使用序列化和反序列化的方式,首先是想通过框架直接序列化和反序列化,但是出现了不信任的问题,所以我们将序列化和反序列化的工作交给我们程序员,编写Json工具类,手动实现

#producer:#value-serializer: org.springframework.kafka.support.serializer.JsonSerializer#consumer:#这是需要jackson依赖,导入我们的spring-boot-stater-json  依赖#错误点二:如果需要反序列化的话。当前报必须是可信赖的,需要将这个类设置为可信赖#The class 'com.hdk.springbootkafkabase02.entity.User' is not in the trusted packages: [java.util, java.lang].#If you believe this class is safe to deserialize, please provide its name.#If the serialization is only done by a trusted source, you can also enable trust all (*).#value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

 序列化工具类:

package com.hdk.springbootkafkabase02.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;/*** @CreateTime: 2025/5/2 11:25* 这里的序列化方式大家可以选择自己习惯的序列化方式*/
public class JsonUtils {//使用jackson方式序列化private static final ObjectMapper OBJECTJSON = new ObjectMapper();/*** 将自定义类型序列化* @param obj* @return*/public static String toJson(Object obj) {String ans = null;try {ans = OBJECTJSON.writeValueAsString(obj);} catch (JsonProcessingException e){e.printStackTrace();}return ans;}/*** json转对象* @param json* @param clazz* @return* @param <T>*/public static <T> T toBean(String json,Class<T> clazz){T obj = null;try {obj = OBJECTJSON.readValue(json, clazz);} catch (JsonProcessingException e) {throw new RuntimeException(e);}return obj;}
}

生产者代码:

 public void sendObj(){/***       #这是需要jackson依赖,导入我们的spring-boot-stater-json  依赖*       #错误点二:如果需要反序列化的话。当前报必须是可信赖的,需要将这个类设置为可信赖*       #The class 'com.hdk.springbootkafkabase02.entity.User' is not in the trusted packages: [java.util, java.lang].*       #If you believe this class is safe to deserialize, please provide its name.*       #If the serialization is only done by a trusted source, you can also enable trust all (*).*       因为上面的原因,将我们的User类型转化为Json字符串*/User user = new User("lihua", 15, "11155551111");/*** 这种方式不需要值的序列化,也不需要反序列化*/String userJson = JsonUtils.toJson(user);kafkaTemplate.send("helloTopic",userJson);}

 消费者代码:

//接收对象消息@KafkaListener(topics = {"helloTopic"},groupId="group1")public void listenerObj(@Payload String userJson){System.out.println(userJson);//再自行进行反序列化User user = JsonUtils.toBean(userJson, User.class);System.out.println(user);}

 将监听信息写道配置文件中:

上面的消费者的topics,groupId写死在我们的代码中,这种方式的编码修改不方便

可以采用${}的方式在配置文件中读取

配置文件:

#自定义配置文件
kafka:topic:name: helloTopicconsumer:groupId: group1

代码:

//接收对象消息@KafkaListener(topics = {"${kafka.topic.name}"},groupId="${kafka.consumer.groupId}")public void listenerObj2(@Payload String userJson){System.out.println(userJson);//再自行进行反序列化User user = JsonUtils.toBean(userJson, User.class);System.out.println(user);}

kafka消费手动确认:

kafka默认是自动确认,首先在配置文件中设置为手动确认

spring:kafka:bootstrap-servers: 192.168.0.168:9092#配置监听器listener:ack-mode: manual

之后在接受参数是,添加上参数`` ``,代码展示如下:

    //手动确认下的消费者代码@KafkaListener(topics = {"${kafka.topic.name}"},groupId="${kafka.consumer.groupId}")public void listenerManual(@Payload String userJson, Acknowledgment acknowledgment){System.out.println(userJson);//再自行进行反序列化User user = JsonUtils.toBean(userJson, User.class);System.out.println(user);acknowledgment.acknowledge();}

 但是如果我们没有手动进行确认的化,会发生什么呢:

如果没有确认消费的话,我们的偏移量不会更新,我们在重启时,还会再之前的偏移量的位置开始消费。

我们再业务中可以这样写代码:

 //手动确认下的消费者代码@KafkaListener(topics = {"${kafka.topic.name}"},groupId="${kafka.consumer.groupId}")public void listenerManual(@Payload String userJson, Acknowledgment acknowledgment){try {System.out.println(userJson);//再自行进行反序列化User user = JsonUtils.toBean(userJson, User.class);System.out.println(user);//没有问题时,直接确认acknowledgment.acknowledge();} catch (Exception e) {//出现问题,没有消费成功,抛出异常throw new RuntimeException(e);}}

细化消费:如何指定消费的分区,偏移量

  //细化消费,指定分区和偏移量@KafkaListener(groupId="${kafka.consumer.groupId}",topicPartitions = {@TopicPartition(topic ="${kafka.topic.name}",partitions = {"0","1","2"},  // 0 1 2 分区所有的数据都读partitionOffsets = {//分区3 4 只是读3之后的数据@PartitionOffset(partition = "3",initialOffset = "3"),@PartitionOffset(partition = "4",initialOffset = "1")})})public void consumerPartition(@Payload String jsonUser,Acknowledgment acknowledgment){System.out.println("获取到的数据是"+jsonUser);acknowledgment.acknowledge();}

偏移量细节:

1. Kafka 偏移量机制

Kafka 的偏移量是一个单调递增的数字,用来标记消息在分区中的位置。当你指定一个初始偏移量时,Kafka 会尝试从这个偏移量开始为你提供消息。要是指定的偏移量超过了分区中当前最大的偏移量,Kafka 会按照消费策略(例如从最早或者最新的消息开始消费)来处理。

2. 消费策略的影响

在 Kafka 里,当指定的偏移量超出了分区的范围,就会依据 auto.offset.reset 配置项来决定从哪里开始消费。这个配置项有两个常用的值:

  • earliest:从分区的最早消息开始消费。
  • latest:从分区的最新消息开始消费。

所以,当分区中的消息数小于你设定的初始偏移量时,Kafka 会依据 auto.offset.reset 的值来决定起始消费位置,而不是从你指定的偏移量开始。

批量消费消息:

配置文件:

spring:kafka:bootstrap-servers: 192.168.0.168:9092listener:# 默认是single(单一的),这是消费方式是批量type: batchconsumer:#为消费之设置消费数量max-poll-records: 20

消费者代码:

package com.hdk.springbootkafkabase03.consumer;import com.hdk.springbootkafkabase03.entity.User;
import com.hdk.springbootkafkabase03.utils.JsonUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.List;/*** @CreateTime: 2025/5/3 08:28*/
@Component
public class EventConsumer {/*** 对于批量的消费,接收时必须是集合形式接收* @param jsonUser* @param records*/@KafkaListener(groupId ="${kafka.consumer.group}",topics = "batchTopic")public void consumerBatchEvent(@Payload List<String> jsonUser, List<ConsumerRecord<String,String>> records){System.out.println("开始批量消费");System.out.println("获取的消息是"+jsonUser);}
}

运行截图:

 消息拦截:

在消息消费之前,我们可以设置拦截器,对消息进行一些符合业务的操作。例如记录日志,修改消息内容或者执行一些安全检查。

实现方式:

实现接口ConsumerIntercepter

package com.hdk.springbootkafkabase03.interceptor;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.time.LocalDateTime;
import java.util.Map;/*** 自定义消息拦截器* @CreateTime: 2025/5/3 09:12*/public class CustomConsumerInterceptor implements ConsumerInterceptor<String,String> {//在消息消费之前@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {//记录日志System.out.println("开始消费"+ LocalDateTime.now());return records;   //返回的数据继续执行}/*** 消费消费之后,提交offset之前的方法* @param offsets A map of offsets by partition with associated metadata*/@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {System.out.println("提交offset"+offsets);}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

将过滤器配置到项目中:

配置消费者工厂,配置监听器容器工厂

package com.hdk.springbootkafkabase03.config;import com.hdk.springbootkafkabase03.interceptor.CustomConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;/*** @CreateTime: 2025/5/3 08:37*/
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServer;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;public Map<String,Object> consumerConfig(){Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);//添加一个消费者拦截器props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());return props;}/*** 配置消费者工厂* @return*/@Beanpublic ConsumerFactory<String,String> consumerFactory(){DefaultKafkaConsumerFactory defaultKafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerConfig());return defaultKafkaConsumerFactory;}/*** 创建消费者监听期容器工厂*/@Beanpublic KafkaListenerContainerFactory kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory();concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());return concurrentKafkaListenerContainerFactory;}}

消费者代码:

    /*** 对于批量的消费,接收时必须是集合形式接收* @param jsonUser* @param records* containerFactory 注意这个配置,指定一下*/@KafkaListener(groupId ="${kafka.consumer.group}",topics = "batchTopic",containerFactory = "kafkaListenerContainerFactory")public void consumerBatchEvent(@Payload List<String> jsonUser, List<ConsumerRecord<String,String>> records){System.out.println("开始批量消费");System.out.println("获取的消息是"+jsonUser);}

消息转发:

情景模拟:

我们监听TopicA的消息,经过处理之后发送给TopicB,使用业务b监听TopicB的消息。实现了我们雄消息的转发。

package com.hdk.springbootkafkabase05.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;/*** 消息转发* @CreateTime: 2025/5/3 15:54*/
@Component
public class EventConsumer {/*** @param record  消息的信息* @return  将要转发到topicB的信息*/@KafkaListener(topics = "topicA",groupId = "group1")@SendTo("topicB")public String consumerAndSendMessage(ConsumerRecord<String,String> record){System.out.println("当前的消息信息是:"+record.value());return record.value()+"forward-message";}@KafkaListener(topics = "topicB",groupId = "group1")public void consumerTopicB(ConsumerRecord<String,String> record){System.out.println("当前的消息信息是:"+record.value());}}

发送者代码:

@Component
public class EventProducer {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;public void sendToTopicA(){kafkaTemplate.send("topicA","消息发送到kafkaA");}
}

消息消费时的分区策略:

默认消费策略rangeAssignor:

我们debug启动时,可以发现直接进入了我们的类:

package org.apache.kafka.clients.consumer 中的RangeAssignor

按着范围分区: 

假设我们的myTopic主题中有10个分区,一个消费组中有三个消费者consumer1 ,consumer2,consunmer3。

他的分配策略是:

1.计算每个消费者应得的分区数:

分区总数/消费者数=3.......1;

但是有一个余数是1.这时第一个消费者会获取到4个分区。consumer1的分区数是4;

2.具体分配是:

consumer1:0 1 2 3

consumer2:4 5 6

consumer3:7 8 9 

 使用代码测试一下:

消费者代码:

package com.hdk.springbootkafkabase06.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @Author HanDK* @CreateTime: 2025/5/5 18:26*/
@Component
public class EventConsumer {/*** 默认策略RangeAssignor的结果:* org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1 线程消费的消息内容是发送消息,结合分区策略* org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1 线程消费的消息内容是发送消息,结合分区策略* org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 线程消费的消息内容是发送消息,结合分区策略* 发现是三个线程名在消费* 使用这个concurrency,后面的配置证明他的一个消费组中,有几个消费者* 其实就是开启了那几个线程消费消息* 下面的代码表示一个消费组中有三个消费者* @param record*/@KafkaListener(topics = "myTopic",groupId = "myGroup",concurrency ="3" )public void listener(ConsumerRecord<String,String> record){String value = record.value();//借助线程名查看不同的消费者消费消息System.out.println(Thread.currentThread().getName()+"线程消费的消息内容是"+value);}
}

生产者代码:

package com.hdk.springbootkafkabase06.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** @Author HanDK* @CreateTime: 2025/5/5 18:29*/
@Component
public class EventProducer {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;public void sendString(){for(int i=0;i<100;i++){String index=String.valueOf(i);ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("myTopic", index,"发送消息,结合分区策略");send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("消息发送失败,开始写入数据库");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("写如数据库成功");}@Overridepublic void onSuccess(SendResult<String, String> result) {System.out.println("发送消息成功");}});}}
}

 创建主题和分区代码:

package com.hdk.springbootkafkabase06.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;/*** @Author HanDK* @CreateTime: 2025/5/5 19:58*/
@Configuration
public class KafkaConfig {/*** 创建一个主题,里面有10个分区* @return*/@Beanpublic NewTopic newTopic(){return new NewTopic("myTopic",10,(short) 1);}
}

轮询策略RoundRobinAssignor:

在配置文件中我们发现是无法直接通过配置文件的方式配置的,所以只能是代码的形式编写配置文件。

在轮询的策略下,我们的消费的具体分配是:

consumer1:0 3  6 9

consumer2:1 4 7 

consumer3:2 5 8

配置文件的编写:

package com.hdk.springbootkafkabase06.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;import static org.apache.kafka.clients.consumer.RoundRobinAssignor.ROUNDROBIN_ASSIGNOR_NAME;/*** @Author HanDK* @CreateTime: 2025/5/5 19:58*/
@Configuration
public class KafkaConfig {/*** 创建一个主题,里面有10个分区* @return*/@Beanpublic NewTopic newTopic(){return new NewTopic("myTopic",10,(short) 1);}@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServer;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;/*** 自定义配置,返回Map类型的配置文件* @return*/@Beanpublic Map<String,Object> consumerConfig(){Map<String,Object> props = new HashMap<>();//设置主机和端口号props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);//设置序列化方式props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);//设置消费策略props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);//设置消费分区策略为轮询策略props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName() );return props;}/*** 创建消费者工厂* @return*/@Beanpublic ConsumerFactory consumerFactory(){ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig());return consumerFactory;}/*** 创建监听器容器工厂* @return*/@Beanpublic KafkaListenerContainerFactory ourKafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory();listenerContainerFactory.setConsumerFactory(consumerFactory());return listenerContainerFactory;}
}

消费者代码:

 /*** 沦胥策略监听* containerFactory = "ourKafkaListenerContainerFactory"  使用这个注解表明我们使用的监听器容器工厂是哪个* 但是需要注意的是我们改变消费者的分区策略时,我们的消费组是不能有offset的* 我们将上面的myGroup改变为myGroup1* @param record*/@KafkaListener(topics = "myTopic",groupId = "myGroup1",concurrency ="3",containerFactory = "ourKafkaListenerContainerFactory")public void listenerRoundRobin(ConsumerRecord<String,String> record){System.out.println(Thread.currentThread().getName()+record);//借助线程名查看不同的消费者消费消息}

StickyAssignor:

尽可能的保持当前的消费者和分区的关系不变,即使我们的消费者的成员发生变话,也要减少不必要的分配。

仅仅只是对新的消费者或离开的消费者进行分区调整,大多数消费者还是继续保持他的消费分区不变。只是少数的消费者处理额外的分区。是一种粘性的分配

CooperativeStickyAssignor:

 与StickyAssignor类似,但增加了对协作式重新分配的支持,消费者在他离开消费者之前通知协调器,以便协调器可以预先计划分区迁移,而不是在消费者突然离开时进行分配。

Kafka事件(消息,数据)存储:

kafka的所有的事件消息都是以日志的形式保存的。他的配置方式是log.dir=****

kafka一般是海量的日志数据,避免日志文件过大,日志文件被放在多个目录下,日志文件的命名规则是<topic_name>--<partition_id>;

 

 

Kafka的__consumer_offsets的主题:

这个主题记录的每次消费完成之后,会保存当前消费到的最近的一个offset.,--consumer-offsets他保存了consumer_group某一时刻提交的offset信息。这个主题的默认有50个分区。

 

生产者的offset:

生产者发送一条消息到topic下的partition,kafka内部会为每条消息分配一个唯一的offset,该offset就是该消息在partition中的位置。

消费者的offset:

消费者的offset是消费者需要知道自己已经读取到的位置,接下来需要从哪个位置开始读取。

每个消费组中的消费者都会独立的维护自己的offset,当消费者从某个partition读取消息时,他会记录当前读到的offset,这样即使是消费者宕机或重启,也不会出现数据的丢失。(之后消息确认才会提交offset)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/79858.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PDF智能解析与知识挖掘:基于pdfminer.six的全栈实现

前言 在数字化信息爆炸的时代&#xff0c;PDF&#xff08;便携式文档格式&#xff09;作为一种通用的电子文档标准&#xff0c;承载着海量的结构化与非结构化知识。然而&#xff0c;PDF格式的设计初衷是用于展示而非数据提取&#xff0c;这使得从PDF中挖掘有价值的信息成为数据…

Python爬虫+代理IP+Header伪装:高效采集亚马逊数据

1. 引言 在当今大数据时代&#xff0c;电商平台&#xff08;如亚马逊&#xff09;的数据采集对于市场分析、竞品监控和价格追踪至关重要。然而&#xff0c;亚马逊具有严格的反爬虫机制&#xff0c;包括IP封禁、Header检测、验证码挑战等。 为了高效且稳定地采集亚马逊数据&am…

架构思维:探讨架构师的本质使命

文章目录 软件工程1. 软件工程的定义与核心目标2. 软件工程 vs. 软件项目管理3. 软件工程的两大特性4. 软件工程的关键活动与方法论5. 架构师在软件工程中的职责架构师的职责和思维架构师心性修炼三大核心能力架构设计的基本准则 团队共识“设计文档”的统一结构框架阅读他人代…

QT设计权限管理系统

Qt能够简单实现系统的权限设计 首先我们需要一个登陆界面 例如这样 然后一级权限&#xff0c;可以看到所有的内容&#xff0c;不设置菜单栏的隐藏。 然后其他权限&#xff0c;根据登陆者的身份进行菜单栏不同的展示。 菜单栏的隐藏代码如下&#xff1a; ui->actionuser-…

Debezium 架构详解与实战示例

Debezium 架构详解与实战示例 1. 整体架构图 #mermaid-svg-tkAquOxA2pylXzON {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-tkAquOxA2pylXzON .error-icon{fill:#552222;}#mermaid-svg-tkAquOxA2pylXzON .error-t…

Qt天气预报系统更新UI界面

Qt天气预报系统更新UI界面 1、创建各个小部分列表2、定义一个更新UI函数2.1 实现更新UI界面函数 1、创建各个小部分列表 QList<QLabel *> weekList; //星期 QList<QLabel *> dateList; //日期QList<QLabel *> weatherL…

AWS MCP Servers

文章目录 一、关于 AWS MCP Servers什么是模型上下文协议&#xff08;MCP&#xff09;以及它是如何与AWS MCP服务器协同工作的&#xff1f;为什么选择MCP服务器&#xff1f; 二、可用 Servers核心MCP服务器AWS 文档服务器亚马逊 Bedrock 知识库检索 MCP 服务器AWS CDK MCP 服务…

python如何把pdf转word

在Python中将PDF转换为Word文档&#xff08;.docx&#xff09;比反向转换&#xff08;Word转PDF&#xff09;更具挑战性&#xff0c;因为PDF是固定格式&#xff0c;而Word是可编辑格式。以下是几种可行的方法及详细步骤&#xff1a; 方法1&#xff1a;使用 pdf2docx 库 pdf2do…

NLP 和大模型技术路线

transformers快速入门 NLP 和大模型技术路线 在自然语言处理&#xff08;NLP&#xff09;和大模型领域&#xff0c;技术路线的学习应该从基础开始&#xff0c;逐步深入到更高阶的应用和优化技术。本文将详细介绍相关技术点的学习顺序&#xff0c;以及每个技术点的关键学习内容…

WordPress个人博客搭建(二):在 Ubuntu 22.04 x64 系统中使用1Panel 部署 WordPress

前言 在之前的安装1Panel面板的文章中&#xff0c;我们已经成功将1Panel面板安装到了2核4G配置的非凡云云服务器上。1Panel作为一款现代化的服务器管理面板&#xff0c;极大简化了网站部署流程。本文将详细介绍如何使用1Panel面板在云服务器上安装部署WordPress&#xff0c;帮…

面试高频算法:最长回文子串

题目&#xff1a;5. 最长回文子串 给你一个字符串 s&#xff0c;找到 s 中最长的回文子串。 回文&#xff1a;如果字符串向前和向后读都相同&#xff0c;则它满足回文性&#xff1b;子串&#xff1a;子字符串 是字符串中连续的非空字符序列。 示例 1&#xff1a; 输入&…

全文索引数据库Elasticsearch底层Lucene

Lucene 全文检索的心&#xff0c;天才的想法。 一个高效的&#xff0c;可扩展的&#xff0c;全文检索库。全部用 Java 实现&#xff0c;无须配置。仅支持纯文本文件的索引(Indexing)和搜索(Search)。不负责由其他格式的文件抽取纯文本文件&#xff0c;或从网络中抓取文件的过程…

JVM——Java内存模型

Java内存模型 在Java多线程编程中&#xff0c;Java内存模型&#xff08;Java Memory Model, JMM&#xff09;是理解程序执行行为和实现线程安全的关键。下面我们深入探讨Java内存模型的内容。 Java内存模型概述 Java内存模型定义了Java程序中变量的内存操作规则&#xff0c;…

nRF Connect SDK system off模式介绍

目录 概述 1. 软硬件环境 1.1 软件开发环境 1.2 硬件环境 2 System Off 模式 2.1 模式介绍 2.2 注意事项 3 功能实现 3.1 框架结构介绍 3.2 代码介绍 4 功能验证 4.1 编译和下载代码 4.2 测试 4.3 使能CONFIG_APP_USE_RETAINED_MEM的测试 5 main.c的源代码文件…

白杨SEO:如何查看百度、抖音、微信、微博、小红书、知乎、B站、视频号、快手等7天内最热门话题及流量关键词有哪些?使用方法和免费工具推荐以及注意事项【干货】

大家好&#xff0c;我是白杨SEO&#xff0c;专注SEO十年以上&#xff0c;全网SEO流量实战派&#xff0c;AI搜索优化研究者。 &#xff08;温馨提醒&#xff1a;本文有点长&#xff0c;看不完建议先收藏或星标&#xff0c;后面慢慢看哈&#xff09; 最近&#xff0c;不管是在白…

2025 Mac常用软件安装配置

1、homebrew 2、jdk 1、使用brew安装jdk&#xff1a; brew install adoptopenjdk/openjdk/adoptopenjdk8 jdk默认安装位置在 /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home 目录。 2、配置环境变量&#xff1a; vim ~/.zshrc# Jdk export JAVA_HOM…

Linux 内核学习(6) --- Linux 内核基础知识

目录 Linux 内核基础知识进程调度内存管理虚拟文件系统和网络接口进程间通信Linux 内核编译Makefile 和 Kconfig内核Makefile内核Kconfig 配置项标识的写法depend 关键字select 关键字表达式逻辑关系Kconfig 其他语法 配置文件的编译Linux 内核引导方法Booloader 定义Linux 内核…

常见汇编代码及其指令

1. 数据传输指令 1.1. mov 作用&#xff1a;将数据从源操作数复制到目标操作数。语法&#xff1a;mov dest, src mov eax, 10 ; 将立即数 10 存入 eax 寄存器 mov ebx, eax ; 将 eax 的值复制到 ebx mov [ecx], eax ; 将 eax 的值写入 ecx 指向的内存地址 1.2. …

STM32基础教程——软件SPI

目录 前言 技术实现 接线图 代码实现 技术要点 引脚操作 SPI初始化 SPI起始信号 SPI终止信号 SPI字节交换 宏替换命令 W25Q64写使能 忙等待 读取设备ID号和制造商ID 页写入 数据读取 实验结果 问题记录 前言 SPI&#xff08;Serial Peripheral Interf…

(B题|矿山数据处理问题)2025年第二十二届五一数学建模竞赛(五一杯/五一赛)解题思路|完整代码论文集合

我是Tina表姐&#xff0c;毕业于中国人民大学&#xff0c;对数学建模的热爱让我在这一领域深耕多年。我的建模思路已经帮助了百余位学习者和参赛者在数学建模的道路上取得了显著的进步和成就。现在&#xff0c;我将这份宝贵的经验和知识凝练成一份全面的解题思路与代码论文集合…