网站设计 视频业绩统计网站开发
news/
2025/10/9 12:02:08/
文章来源:
网站设计 视频,业绩统计网站开发,wordpress怎么把分类弄在左边,免费百度广告怎么投放Kafka的体系结构
Kafka是由Apache软件基金会管理的一个开源的分布式数据流处理平台。Kafka具有支持消息的发布/订阅模式、高吞吐量与低延迟、持久化、支持水平扩展、高可用性等特点。可以将Kafka应用于大数据实时处理、高性能数据管道、流分析、数据集成和关键任务应用等场景。…Kafka的体系结构
Kafka是由Apache软件基金会管理的一个开源的分布式数据流处理平台。Kafka具有支持消息的发布/订阅模式、高吞吐量与低延迟、持久化、支持水平扩展、高可用性等特点。可以将Kafka应用于大数据实时处理、高性能数据管道、流分析、数据集成和关键任务应用等场景。在使用Kafka时也要充分考虑Kafka在某些方面的不足如引入Kafka作为消息队列后系统将强依赖Kafka当Kafka不可用时系统也会受到影响、引入Kafka后会提升系统的复杂度、引入Kafka作为消息队列后会带来分布式系统的一致性问题等。Kafka整体架构的逻辑视图如下所示 一个典型的Kafka体系结构包括Producer(生产者)、Broker(服务代理节点)、Consumer(消费者)等三种角色。其中Producer负责将消息发送到BrokerBroker负责将收到的消息存储到磁盘中而Consumer负责从Broker订阅并消费消息。 在Kafka的早期版本Kafka使用ZooKeeper来管理Kafka集群如Broker注册与删除、Topic的注册与删除、根据当前的Partition数量及Consumer数量来实现动态负载均衡等等。但是在Kafka 2.8之后引入了基于Raft协议的KRaft模式不再依赖Zookeeper大大简化了Kafka的架构可以以一种轻量级的方式来使用Kafka。同时移除ZooKeeper可以进一步降低脑裂问题的出现概率提升系统可靠性。注意Kafka实现集群自治确实可以在一定程度上减少脑裂问题的风险但并不能完全解决脑裂问题。此外如果要使用KRaft模式的话建议选择较高版本的Kafka因为这个功能还在持续完善优化中。Kafka 3.3.1版本是第一个将KRaftKafka Raft共识协议标记为生产就绪的版本。而在Kafka 3.5版本ZooKeeper已经被标记为弃用。在后续的Kafka 4.0版本将不再支持ZooKeeper。 生产者是指Message(消息)的生成者即将消息发送到Kafka存储的Topic(主题)中的生成者。Topic是一个逻辑概念用于组织和存储消息。Topic通过分区的方式存储在Broker中。生产者可以通过特定的分区函数决定消息路由到Topic的某个分区。默认的分区函数是轮询策略也即将消息均匀地分布在Topic所有的Partition上。消息的生成者发送消息有两种模式分别为同步模式和异步模式。根据需要发送的消息数量可以将消息分为单条发送消息和批量发送消息。 Broker接收到消息后以一种有序、不可变、分段的消息存储结构将消息存储到磁盘。消息按分段的方式存储每个分段包含1GB或一周的数据以较小的判断标准为主。为了快速定位消息数据Kafka在消息日志之上构建了索引结构。索引可以帮助快速定位某个消息的物理偏移量从而加快消息的读取速度。为保证消息不丢失Kafka为分区引入了多副本(Replica)机制通过增加副本数量可以提升容灾能力。同一个分区的不同副本中保存的是相同的信息副本之间是一主多从的关系。其中Leader负责处理读写请求(Producer和Consumer只与Leader交互)Follower只负责与Leader的消息同步。当Leader出现故障时从Follower中重新选举新的Leader并对外提供服务。 消费者负责订阅Kafka中的Topic并从订阅的主题上拉取消息。在Kafka的消费理念中还有一层消费者组(Consumer Group)的概念。一个消费者组可以包含一个或多个消费者。同一消费组中的消费者不会重复消费消息同样的不同消费组中的消费者消息消息时互不影响。Kafka通过消费组来实现消息的点对点(Point-to-PointP2P)模式和广播模式。
Broker
Broker是Kafka Cluster中的一个核心组件负责消息的存储、消息的转发、消息的多副本管理和集群管理等功能。 Broker的主要职责之一就是存储生产者发送的消息。Broker接收到消息后会将其持久化到本地磁盘中。Kafka将消息组织成Topic而每个Topic又被划分为一个或多个Partition。Broker负责将消息按照Partition存储到本地磁盘上确保消息的持久化。Kafka采用顺序写入的方式来提高写入性能消息在磁盘上是按照顺序进行存储的这有助于减少磁盘的寻道时间。 除了存储消息外Broker还负责将消息转发给消费者。当消费者需要读取消息时它会与Broker建立连接并请求获取某个Topic的特定分区的消息。Broker会根据消费者的请求从相应的分区中读取消息并将其发送给消费者。这样Broker就充当了消息的生产者和消费者之间的桥梁。 为了保证消息的可靠性和高可用性Kafka采用了多副本机制。每个分区都有一个Leader副本和多个Follower副本。Leader副本负责处理读写请求而Follower副本则用于同步数据以确保数据的备份和容错。当Leader副本出现故障时Kafka会自动从Follower副本中选举出新的Leader副本以保证服务的连续性。 Broker之间通过分布式协调机制来维护集群的状态和一致性。Broker之间通过一种称为Controller的选举机制来选举出一个负责集群协调的Broker。这个Kafka Controller负责处理集群中的元数据更新、分区领导选举、集群中节点故障检测、元数据管理等集群管理操作。这使得Kafka集群能够自动处理节点故障、扩展和缩容等情况保持整个系统的状态一致性、稳定性和可用性。 此外Broker还负责处理与安全性相关的任务如消息的加密、解密以及客户端的身份验证等。Kafka支持多种认证和授权机制如SASL/SSL、OAuth等以确保数据传输的安全性和访问控制的有效性。
接收来自生产者发送过来的消息 SocketServer 向消费者提供消息 Topic管理 OffsetManager
消息存储
Broker接收到从生产者发送过来的消息后会将其持久化到本地磁盘中。Kafka将消息组织成Topic而每个Topic又被划分为一个或多个Partition。每个Partition可以有一个或多个副本每个副本对应一个日志文件每个日志文件对应一个或多个日志分段(LogSegment)。Kafka中消息存储的逻辑视图如下所示 Topic
Kafka中的Topic是消息的逻辑分类可以将其理解为一个逻辑上的消息容器。生产者将消息发布到特定的Topic而消费者可以从Topic中订阅并消费消息。 Topic的创建支持自动创建和手动创建。针对手动创建Topic的场景比较推荐的方式是通过kafka-topics.sh脚本来创建Topic对于云平台场景可以通过可视化界面创建Topic。针对自动创建场景首先需要确保Broker的配置参数auto.create.topics.enable设置为true(默认值就是true)那么当生产者向一个尚未创建的Topic发送消息时会自动创建一个Partition数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的Topic。此外当一个消费者开始从未知Topic中读取消息或当任意一个客户端向未知Topic发送元数据请求时都会创建一个相应的主题。需要说明的是这种自动创建Topic的行为是非预期的。除非有特殊需求否则不建议将Broker的配置参数auto.create.topics.enable设置为true这会增加Topic的管理和维护的难度。 Topic不支持重命名。可能会有Topic重命名的场景如基于业务规范的需求、Topic的合并或拆分需求。尽管上述需求看似合理但是Kafka的设计哲学更倾向于使用不可变的Topic名称。这是因为Topic名称在Kafka中扮演着重要的角色它们不仅是消息的标识符还用于在集群中定位和管理数据。允许重命名Topic可能会导致一系列复杂性和潜在的问题如消费者偏移量的管理、数据一致性的保证等。所以为实现和Topic重命名类似的功能可以通过创建一个新的Topic将旧Topic的数据迁移到新Topic上删除旧Topic的过程间接实现。 如果确定不再使用一个Topic可以删除这个Topic。注意必须将Broker的配置参数delete.topic.enable设置为true(默认值是true)才能够删除Topic。如果delete.topic.enable参数配置为false那么删除Topic的操作将会被忽略。删除Topic是一个不可逆操作。一旦删除之后与其相关的所有消息数据会被全部删除要谨慎操作。
Partition
每个Topic又可进一步细分为一个或多个Partition。这样Topic就可通过Partition实现水平扩展能力。在创建Topic时可以指定Partition的数量。在设计Topic和Partition时需要考虑主题的数据量、消息生产者和消费者的数量以及消息处理的延迟等因素。增加Partition数可以提高并发处理能力和系统的吞吐量但也会增加存储和网络开销因此需要根据实际需求和资源情况来选择合适的Partition数。 Kafka也支持在运行时动态调整Topic的Partition数量。但是目前Kafka只支持增加Partition数而不支持减少Partition数。可以通过kafka-topics.sh脚本来增加Partition对于云平台场景可以通过可视化界面增加Partition。需要注意的是动态增加Partition并不会自动迁移原有Partition的数据到新的Partition中。也就是说新增加的Partition一开始是不包含任何消息的。如果需要将原有数据分布到新的Partition中需要自行实现数据迁移的逻辑。这可以通过编写自定义的迁移脚本或使用Kafka提供的工具来完成。 Kafka之所以不支持减少Partition数主要是实现此功能需要考虑的因素比较多。如删除分区中的消息该如何处理。如果随着分区一起消息则会带来数据丢失的问题如果需要保留还需考虑如何消费的问题。此外还要考虑顺序性问题、性能问题等。如果需要实现该功能可以重新创建一个Partition数较少的Topic然后将现有Topic的数据迁移到新Topic上。 如何选择合适的Partition数是很多Kafka的使用者经常面临的问题。对于这个问题没有非常统一的答案只能从某些角度做具体分析最终还是根据实际的业务场景来做具体的考量。如性能与Partition数有着必然的联系在设定Partition数时一般也需要考虑性能的因素。如不同的任务类型可能需要不同数量的分区如果正在使用Kafka作为日志收集系统那么可能不需要太多的分区因为这种情况下通常只需要顺序写入一组日志即可。然而如果正在使用Kafka作为实时数据管道或流处理平台那么更多的分区可能有助于支持更高的并发性和实时性要求。Partition数并不是越大越好随着Partition数的增加响应的吞吐量也跟着上涨。一旦分区数超过了某个阈值整体的吞吐量不降反升。
Replica
每个Partition可以有一个或多个Replica。为保证消息不丢失Kafka为分区引入了多副本(Replica)机制通过增加副本数量可以提升容灾能力。同一个分区的不同副本中保存的是相同的信息副本之间是一主多从的关系。其中Leader负责处理读写请求(Producer和Consumer只与Leader交互)Follower只负责与Leader的消息同步。当Leader出现故障时从Follower中重新选举新的Leader并对外提供服务。更多Replica相关知识细节可以参考下文的消息的多副本管理一节。
Log和LogSegment
每个Replica对应一个Log文件。为防止Log文件过大每个Log文件又可以分为一个或多个LogSegment相当于一个巨型文件被平均分配为多个相对较小的文件。事实上Log和LogSegment不是纯粹物理意义上的概念Log在物理上只以文件夹的形式存储而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件以及可能的其他文件。Log和LogSegment的逻辑关系如下图所示 Broker接收到消息后以一种有序、不可变、分段的消息存储结构将消息存储到磁盘。向Log中追加消息是顺序写入的只有最后一个LogSegment才能执行写入操作其他的LogSegment均不能写入数据(可以理解为写满)。当最后一个LogSegment满足一定条件时(每个分段包含1GB或一周的数据以较小的判断标准为主)就需要创建新的LogSegment之后新追加的消息就写入这个LogSegment上。 为了便于消息的检索每个LogSegment的日志文件(以.log为文件后缀)都有对应的两个索引文件偏移量索引文件(以.index为文件后缀)和时间戳索引文件(以.timeindex为文件后缀)。每个LogSegment都有一个基本偏移量BaseOffset用来表示当前LogSegment中的第一条消息的Offset。偏移量是一个64位的长整数日志文件和两个索引文件都是根据基准偏移量命名的名称固定为20位整数没有用到位数用0填充。Offset是消息在分区中的唯一标识Kafka 通过它来保证消息在分区内的顺序性不过Offset并不跨越分区。也就是说Kafka保证的是分区有序性而不是Topic的有序性即局部有序。 注意每个LogSegment中不仅仅包含.log、“.index”、“.timeindex这3个文件还可能包含deleted”、“cleaned等临时文件以及可能的”.snapshot、“txnindex等文件。 为了减少消息占用的存储空间和传输带宽的消耗可以将消息进行压缩。在Kafka中消息压缩是在生产者端进行的。一般情况下生成者发送的压缩数据在Broker中也是保持压缩状态存储的。消费者从Broker获取的也是压缩的消息消费者在处理消息之前才会解压消息这样保持了端到端的消息压缩。 Kafka日志中使用哪种压缩方式是通过参数compression.type来配置的默认值为producer”表示保留生产者使用的压缩方式。如果参数compression.type配置为uncompressed则表示不压缩。 每个从生产者发出的消息的Offset都是从0开始的。对Offset的转换是在Broker中进行的。消费者在消费消息时会从Broker获取该压缩消息的Absolute Offset。需要注意的是Kafka的压缩机制是端到端的即生产者负责压缩消息消费者负责解压缩消息。因此在使用压缩功能时需要确保生产者和消费者都支持相同的压缩算法并且配置了正确的压缩参数。 Kafka将消息存储在磁盘中为了控制磁盘占用空间的不断增加需要对消息做清理操作。Kafka本身没有提供消息的TTL功能。但是可以设置Topic中的消息的默认保存时限(默认是7天)。这个默认保存时间可以通过server.properties文件中的log.retention.hours属性进行修改。Kafka提供了两种日志清理策略(1) 日志删除(Log Retention)按照一定的保留策略直接删除不符合条件的LogSegment(2) 日志压缩(Log Compaction)针对每个消息的Key进行整合对于有相同Key的不同Value只保留最后一个版本。
消息转发
除了存储消息外Broker还负责将消息转发给消费者。当消费者需要读取消息时它会与Broker建立连接并请求获取某个Topic的特定分区的消息。Broker会根据消费者的请求从相应的分区中读取消息并将其发送给消费者。这样Broker就充当了消息的生产者和消费者之间的桥梁。 为了加快消息的检索Broker在消息日志之上构建了索引结构。索引可以帮助快速定位某个消息的物理偏移量从而加快消息的读取速度。Broker提供了两种索引结构偏移量索引和时间戳索引。其中偏移量索引用来建立消息偏移量(Offset)到物理地址之间的映射关系方便快速定位消息所在的物理文件位置。时间戳索引则根据指定的时间戳(Timestamp)来查找对应的偏移量信息。 Kafka中的索引文件以稀疏索引的方式构造消息的索引它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量的消息时偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。 偏移量索引文件中的偏移量是单调递增的查询指定偏移量时使用二分查找法来快速定位偏移量的位置如果指定的偏移量不再索引文件中则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增查询指定时间戳时也根据二分查找法来查找不大于该时间戳的最大偏移量至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。
消息的多副本管理
为保证消息不丢失Kafka为Partition引入了多副本(Replica)机制通过增加副本数量来提升容灾能力。同一个Partition的不同副本中保存的是相同的信息副本之间是一主多从的关系。其中Leader负责处理读写请求(Producer和Consumer只与Leader交互)Follower只负责与Leader的消息同步。当Leader出现故障时从Follower中重新选举新的Leader并对外提供服务。在创建Topic的时候可以同步指定副本因子或使用默认的副本因子。具体来说如果是手动创建Topic通过设置replication.factor参数来指定副本数量如果是自动创建Topic且Broker的配置参数auto.create.topics.enable已经设置为true(默认值就是true)那么副本数量就是default.replication.factor(默认值为1)。以由4个Broker组成的Kafka Cluster为例副本数为3其副本分布如下图所示 Kafka会确保Partition的每个副本被放在不同的Broker上这样可以保证不会出现因一个Broker失效导致多个副本同时失效的问题的发生。如果Kafka的副本数量不能大于Broker的数量则至少有两个副本分布在同一个Broker上那么Kafka会无法成功分配所有副本从而导致Topic创建失败。因此在配置Kafka集群时需要确保Broker的数量足够以支持所需的副本数。如果Broker数量不足可能需要增加Broker的数量或者调整主题的副本数设置以确保Kafka集群能够正常运行并满足数据可靠性和可用性的需求。一般业务使用三副本即可对于金融系统等更推荐五副本。副本数的选择是一个多方面因素考虑的结果要根据具体的应用选择合适的数量。 对于Broker来说为了让Leader副本平均分布从而避免单个Broker分配过多Leader副本带来负载失衡的问题Kafka引入了优先副本原则。理想情况下Kafka可以确保所有Topic的优先副本在Broker中均匀分布。这样就能保证所有Partition的Leader的均衡分布。但是对于Broker来说仍然无法解决单个Leader负载过重带来的数据倾斜问题。单个Leader负载过重问题的性能优化需要从端到端的角度解决不在本文讨论的范围。常见的解决策略有优化Producer配置、调整分区数、调整负载均衡策略、数据压缩等等。
消息同步
在多副本管理时一个重要的过程就是Leader和Follower间的消息同步。Kafka使用了ISR机制来保证Leader和Follower的数据同步。这里先介绍下ISR机制相关的关键概念 Partition中的所有副本统称为AR(Assigned Replicas)。在AR中将所有与Leader保持一定程度同步的副本(包含Leader在内)称为ISR(In-Sync Replicas)将与Leader同步滞后的副本(一定不包括Leader)称为OSR(Out-of-Sync Replicas)。所以ARISROSR。Leader负责维护和跟踪ISR集合中所含有Follower的滞后状态当Follower落后太多或失效时Leader会把它从ISR集合中剔除。当然如果OSR集合中有Follower跟上Leader那么Leader会把它从OSR集合转移到ISR集合。 在Leader和Follower进行数据同步时(Leader推送消息给FollowerFollower从Leader拉取消息均有可能使用到)可以选择多种复制方案如同步、异步等这主要通过生产者的消息确认模式的设置acks方式实现 acks0生产者发送消息后不会等待任何来自Broker的响应即只要消息通过网络发送出去就认为消息已成功写入Kafka。这种情况下如果Broker没有收到消息或消息丢失生产者将无从得知因此可能会导致数据丢失。但由于不需要等待服务器响应这种设置可以以网络支持的最大速度发送消息达到较高的吞吐量。 acks1生产者发送消息后只要集群的Leader副本成功写入消息(并不一定要写入磁盘)就会收到一个成功响应的ack。一旦消息无法写入Leader分区副本如由于网络原因或Leader副本所在节点崩溃生产者会收到一个错误响应并可以选择重发消息以避免数据丢失。这是消息可靠性和吞吐量之间的一个折中方案。 acksall或acks-1生产者发送消息后会等待所有同步副本ISRIn-Sync Replicas都收到消息后才认为写入成功。这种设置提供了更高的数据可靠性但可能会降低吞吐量。从功能上来看acks-1还是acksall没有差异。在实际应用中选择acks-1还是acksall取决于个人的编码习惯。但是为了避免潜在的混淆或误解建议根据团队的编码规范或Kafka的最佳实践来选择一个明确的值这里推荐使用-1表示一种标识且与其他参数表现一致。需要说明的是ISR集合中同步副本的数量是由Broker中的min.insync.replicas参数(默认值是1)设置。min.insync.replicas参数表示ISR集合中的最少副本数。它的主要目的是确保在ISR中至少有一定数量的副本已经同步了消息从而增强数据的可靠性和持久性。这个参数只在acks设置为all或-1时才有效。这个参数作用于所有Topic在使用时要注意。 在Leader副本和Follower副本进行消息同步时为了避免消息同步对正常业务的影响需要有一个限流的机制。根据数据同步的方式(pull模式或push模式)还可针对Leader副本和Follower副本单独进行设置。在Kafka中可以为Broker设置复制速率以确保所有Topic的副本复制不超过这个速率也可对某个Topic设置复制速率。使用Kafka自带的命令行工具kafka-configs.sh来设置这个参数的示例如下
# 设置Broker的复制速率这里速率单位是字节/秒
kafka-configs.sh localhost:2181/kafka --entity-type brokers --entity-name 2 --alter --add-config follower.replication.throttled.rate10000,leader.replication.throttled.rate10000
# 设置某个Topic的复制速率
kafka-configs.sh localhost:2181/kafka --entity-type topics --entity-name my_topic --alter --add-config leader.replication.throttled.rate10000故障转移
在多副本管理时另一个重要的过程就是Leader副本所在节点出现故障后如何从Follower副本中选举出新的Leader副本从而保证服务的可用性。 在Kafka中基于KRaft算法从Broker集合中选出Kafka Controller后接下来就由Kafka Controller完成Leader副本的选举及通知。简单来说当Leader副本故障后Kafka Controller会收集所有可用Follower副本的信息并选择一个新的Leader副本。选举的过程通常基于一些优先级规则如ISRIn-Sync Replicas列表中的副本优先于非ISR列表中的副本具有较低副本ID的副本优先于具有较高副本ID的副本等。一旦选定了新的Leader副本Kafka Controller会向集群中的Broker发送更新集群元数据的请求。 在选举的过程中Kafka Controller会按照如下的步骤来进行选举 (1) 筛选出在线的ISR和在线的AR。 (2) 优先在在线的ISR中选择如果在线的ISR不为空则选择在线ISR列表中的第一个作为Leader副本然后结束选举。 (3) 如果在线的ISR为空则根据Kafka Broker中的unclean.leader.election参数(默认是true)来决定是否在在线的AR列表中选举。如果该参数设置为true则选择在线的AR列表中的第一个作为Leader副本然后选举结束否则选举失败选举也同样结束。注意启用此选项可能会导致数据丢失因为非ISR副本中的数据可能与已故障Leader的数据不一致。 以上选举策略仅适用于Partition中Leader故障的场景对应的选举策略是OfflinPartitionLeaderElectionStrategy实际的Leader选举更复杂。针对不同的场景使用的选择策略也不相同。如对于分区重分配场景现有Leader并为故障但仍需要重新选举Leader这时就需采用其他选举策略如PreferredReplicaPartitionLeaderSelector或ReassignedPartitionLeaderSelector。Leader选举不是本文的重点有兴趣的同学还请自行学习。
集群管理
Broker之间通过一种称为Controller的选举机制(基于KRaft算法)来选举出一个负责集群协调的Broker。这个Kafka Controller负责处理集群中的元数据更新、分区领导选举、集群中节点故障检测、元数据管理等集群管理操作。 在Kafka Controller的选举过程中仍然存在脑裂问题或羊群效应但发生频率较低且引入了一系列处理手段。如通过引入Controller Epoch来确保只有一个有效的Controller在运作避免应网络分区导致Controller重新选主而引发脑裂现象的发生。通过合理设置消费者的重试策略等方式避免因Leader副本失效后大量的消费者几乎同时尝试重新连接或请求其他可用的副本从而导致集群中的其他Broker负载急剧增加而引发羊群效应。 Kafka Controller负责维护Kafka集群的元数据包括Broker的存活状态、分区的分配情况以及副本的分配情况等。Kafka Controller监控集群中Broker的状态变化并更新集群的元数据。当Kafka Controller检测到某个分区的ISRIn-Sync Replicas集合发生变化时它会负责通知所有Broker更新其元数据信息。 当有新的topic创建或者有新的Broker加入集群时Kafka Controller会根据一定的策略来进行分区分配。它会根据集群的负载情况、副本的分布情况等因素将分区均匀地分配给各个Broker。 Kafka Controller维护集群中所有分区的状态以及每个分区的Leader副本信息。如当动态增加分区时Kafka会在后台创建新的分区文件并将它们分配到集群中的Broker上。当Leader副本故障后Kafka Controller会从Follower副本中按照选举策略选出新的Leader副本。
Producer
生产者是消息的生成者即将消息发送到Kafka存储的Topic中的生成者。生产者可以通过特定的分区函数决定消息路由到Topic的某个分区。默认的分区函数是轮询策略也即将消息均匀地分布在Topic所有的Partition上。
生产者参数配置
生产者发送消息前需要创建一个生产者实例。这里介绍下生产者实例创建所需的一些关键配置并给出示例代码。
public Properties createProducerConfig(String brokerList, String producerId) {Properties props new Properties();// 指定生产者连接Broker所需的地址清单内容格式为host1:port1,host2:port2// 注意这里并非需要所有的Broker地址而是要通过暴露的Broker获取集群元数据信息props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);// Broker接收的消息必须是字节数组所以还需对消息的key和value做相应的序列化操作props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acks为-1意味着Producer会等待分区副本确认写入后才认为消息发送成功 props.put(ProducerConfig.ACKS_CONFIG, -1); // 显式设置Producer Id命名时要有业务含义且遵循团队规范方便后面的消息查找props.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);return props;
}KafkaProducer是线程安全的可以在多个线程中共享单个KafkaProducer示例也可将KafkaProducer实例进行池化来供其他线程调用。一般情况下每个客户端使用一个KafkaProducer即可。
消息发送模式
消息的生成者发送消息有两种模式分别为同步模式和异步模式。在KafkaProducer中仅提供了异步方法send()但是可以通过阻塞等待消息返回达到同步的效果。示例代码如下
public void sendSync(ProducerString, String producer, ProducerRecordString, String record) {// 发送消息FutureRecordMetadata future producer.send(record);// 等待消息发送完成并获取元数据try {RecordMetadata metadata future.get();System.out.println(消息已发送到分区 metadata.partition() 偏移量 metadata.offset());} catch (Exception e) {e.printStackTrace();}
}public void sendAsync(ProducerString, String producer, ProducerRecordString, String record) {// 异步发送消息try {producer.send(record, (metadata, exception) - {if (exception ! null) {// 发送失败的处理逻辑exception.printStackTrace();System.out.println(发送消息失败: exception.getMessage());} else {// 发送成功的处理逻辑System.out.println(消息已发送到分区 metadata.partition() 偏移量 metadata.offset());}});} catch (Exception e) {e.printStackTrace();}
}单条发送和批量发送
根据需要发送的消息数量可以将消息分为单条发送消息和批量发送消息。Kafka Producer并没有提供批量发送消息的接口但是可以通过异步发送间接实现。在异步发送消息时可以设置延迟发送的时间来收集更多的消息达到批量发送的效果。此外为避免等待期间消息的数量多大还可以通过设置batch.size来控制批量发送的size。对应生产者配置如下
public Properties createProducerConfig(String brokerList, String producerId) {Properties props new Properties();// 指定生产者连接Broker所需的地址清单内容格式为host1:port1,host2:port2// 注意这里并非需要所有的Broker地址而是要通过暴露的Broker获取集群元数据信息props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);// Broker接收的消息必须是字节数组所以还需对消息的key和value做相应的序列化操作props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acks为-1意味着Producer会等待分区副本确认写入后才认为消息发送成功 props.put(ProducerConfig.ACKS_CONFIG, -1); // 显式设置Producer Id命名时要有业务含义且遵循团队规范方便后面的消息查找props.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);// 设置批量提交的字节数默认是16384字节即16KBprops.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 设置生产者发送请求前的等待时间以毫秒为单位默认是0表示无需等待props.put(ProducerConfig.LINGER_MS_CONFIG, 10);return props;
}这样再次使用异步发送代码发送消息时消息不会立即发送而是在linger.ms或batch.size的任一条件满足后才发送消息从而达到批量发送的效果。
消息返回确认
生产者将消息发送给Broker后接下来就是通过ACK(Acknowledgment)机制来确认消息是否返回。ACK机制确保了消息在生产者和Broker之间的可靠传递并在消息被认为已成功发送或处理之前防止其丢失。ACK机制的确认级别之前已有介绍这里重复介绍下 (1) acks0这是最快速的确认级别也是最不可靠的。生产者发送消息后不会等待任何确认直接将消息添加到分区的副本中并认为消息已成功发送。在这种模式下如果发生故障或错误生产者将不会知道也不会重试发送消息。这种模式通常用于不太关心消息可靠性的场景。 (2) acks1Leader确认这是默认的确认级别。生产者发送消息后会等待分区的领导者Leader副本确认消息已成功写入到其本地日志。一旦领导者确认生产者会认为消息已成功发送。然而如果领导者成功写入消息但在复制给其他副本时发生故障消息仍有可能会丢失。 (3)acks-1也称为all这是最可靠的确认模式。生产者发送消息后会等待Leader和ISR集合中的Follower确认消息已写入然后发送下一条消息。这种模式下只有在所有副本都成功写入消息后生产者才会收到确认确保了消息的可靠性但可能会导致更长的延迟。 生产者可以通过配置参数来控制其行为以适应不同的业务需求。在选择确认机制时需要权衡消息的可靠性和系统的吞吐量。对于需要高可靠性的场景建议使用acks-1而对于对延迟敏感或不太关心消息可靠性的场景可以使用acks0或acks1。
失败重试和错误处理
KafkaProducer中一般会发生两种类型的异常可重试的异常和不可重试的异常。比如网络异常这个有可能是网络抖动导致消息没有达到Broker这时可以通过重试解决。但是有时异常无法通过重试解决。如消息体过大的异常这种异常就无法通过重试解决只能对错误进行处理如记录日志重写现有代码保证消息体不会过大。 对于失败后可重试的异常其重试参数可以通过生产者的retries参数配置。KafkaProducer会自动捕捉可重试的失败无需编码。重试的次数默认值是10配置实例代码如下
props.put(ProducerConfig.RETRIES_CONFIG,10);对于消息发送无法处理的失败则需要将其作为错误处理。这种失败处理机制和Java的异常处理机制类似。在Java中会将错误分为Error和Exception两类。对于Error是无法捕获对于Exception则是可以捕获并处理。以下是一种常见的针对不可重试的异常的处理的示例代码
try {producer.send(record, (metadata, exception) - {if (exception ! null) {// Broker返回的消息发送失败的异常exception.printStackTrace();System.out.println(发送消息失败: exception.getMessage());} else {// 发送成功的处理逻辑System.out.println(消息已发送到分区 metadata.partition() 偏移量 metadata.offset());}});
} catch (Throwable e) {// 捕获执行send方法产生的异常e.printStackTrace();
}Consumer
消费者负责订阅Kafka中的Topic并从订阅的主题上拉取消息。在Kafka的消费理念中还有一层消费者组(Consumer Group)的概念。一个消费者组可以包含一个或多个消费者。同一消费组中的消费者不会重复消费消息同样的不同消费组中的消费者消息消息时互不影响。Kafka通过消费组来实现消息的点对点(Point-to-PointP2P)模式和广播模式。点对点模式是基于队列的消息生产者发送消息到队列消息消费者从队列中接收消息。广播模式定义了如何向一个内容结点发送和订阅消息这个内容节点称为Topic。Topic可以认为是消息传播的中介消息发布者将消息发布到某个Topic而消息订阅者从Topic中订阅消息。Topic使得消息的订阅者和发布者保持相对独立不需要进行接触即可保证消息的传递。 接下来举例说明Topic、Partition、消费者组、消费者之间的映射关系。某个Topic共有4个PartitionP1、P2、P3、P4。有两个消费者组A和B都订阅了该Topic消费者组A有4个消费者C1、C2、C3、C4消费者组B中有2个消费者C21、C22。则上述实例中Topic、Partition、消费者组、消费者之间的映射关系可用下图表示 一个Topic可以有一个或多个Partition。在创建Topic的时候就指定了Partition的数量。对于自动创建Topic其Partition数量从num.partitions(默认值为1)获取对于手动创建Topic则根据指定的Partition数决定。 每个Partition只能被一个消费者组中的一个消费者所消费。如果Partition数量等于消费者组中消费者的数量那么每个消费者只需要消费Topic的一个Partition。如果Partition数量大于消费者组中消费者的数量那么每个消费者至少消费Topic的一个Partition且至少有一个消费者消费两个Partition。如果Partition数量小于消费者组中消费者的数量那么每个消费者至多消费Topic的一个Partition且存在消费者不消费Partition的情况。 当在Kafka运行时动态的增加Partition数时会带来Partition-Consumer的再分配。需要注意的动态增加Partition并不会自动迁移原有Partition的数据到新的Partition中。也就是说新增加的Partition一开始是不包含任何消息的。如果需要将原有数据分布到新的Partition中需要自行实现数据迁移的逻辑。 消费者也支持动态的增加或减少且也会带来Partition-Consumer的再分配。由于Partition此时并不会改变所以只是更换了Partition对应的Consumer。注意当Consumer数量等于Partition数量时再次增加Consumer不会提高消费者端的消费能力因为这个新的Consumer没有对应的Partition可以消费。 这里之所以不讨论Partition动态减少的情况是因为目前Kafka并不支持该功能这里不拓展讲解具体的原因有兴趣的同学可以自行学习。
消费者参数配置
与生产者类似消费者在接收消息前需要创建一个消费者实例。这里介绍下消费者实例创建所需的一些关键配置并给出示例代码。
public Properties createConsumerConfig(String brokerList, String groupId) {Properties props new Properties();// 指定生产者连接Broker所需的地址清单内容格式为host1:port1,host2:port2// 注意这里并非需要所有的Broker地址而是要通过暴露的Broker获取集群元数据信息props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);// 每个消费者都对应一个消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);// 从Broker接收的消息都是经过生产者序列化之后的所以在消费的时候要进行反序列化处理props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 控制一次消息拉取的最大记录数。换句话说它限制了消费者每次从Kafka中拉取消息时能够获取的最大消息数量props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);// 是否自动提交消费者的Offset默认是trueprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 指定自动提交消费者的Offset的时间间隔单位是毫秒默认是5000ms也即5sprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);// 自动消费重置参数消费者提交的偏移量不在Broker上规范如何消费消息默认是lastest即如果分区中没有已提交的偏移量消费者将不会消费任何已存在的数据而是等待新数据的到来// 在消费者启动时该参数起着关键作用尤其是在消费者尝试读取一个分区时该分区没有初始的偏移量或当前偏移量不再存在例如由于数据被删除。该参数决定了消费者在这种情况下应该采取的行为。props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);return props;
}与KafkaProducer不同Kafka Consumer是非线程安全的。这意味着在多个线程中直接共享一个KafkaConsumer实例是不安全的可能会导致数据不一致等问题。然而这并不意味着Kafka Consumer只能在单线程中使用。为了在多线程环境中使用Kafka Consumer可以采取一些策略如每个线程维护一个Kafka Consumer实例或者使用单Kafka Consumer实例加上多worker线程的模式。 尽管Kafka Consumer本身不是线程安全的但Kafka提供了其他机制来确保数据的一致性和可靠性。如Kafka Consumer的提交偏移量操作是线程安全的这意味着即使在多线程环境中Kafka也能确保提交操作的原子性和顺序性从而避免数据冲突或丢失。
单条消费和批量消费
根据需要接收的消息数量可以将消息消费分为单条消费和批量消费。可以通过设置max.poll.records(默认值是500)为1来实现单条消费一般情况下不建议单条消费尽量批量拉取、单条处理。在Kafka Consumer API中接提供connsumer.poll(Duration.ofMillis(timeout))方法来拉取消息。poll()方法返回的是一个消息集合所以单条消费和多条消费可以使用相同的代码
public void consumeMessage(ConsumerString, String consumer) {while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {// 如果拉取的消息只有一条则为单条处理如果拉取的消息是多条则为批量处理System.out.println(Received message: record.value());}}
}手写代码可能会存在这样那样的问题对于使用Spring Boot开发的应用来说推荐使用Spring Kafka三方件提供的单条消费和批量消费的接口。
自动提交偏移量和手动提交偏移量
Kafka Consumer在消费消息时支持自动提交偏移量和手动提交偏移量。自动提交偏移量是Kafka Consumer的默认方式可以简化客户端代码对偏移量的感知手动提交偏移量方式需要客户端代码主动提交偏移量。对于自动提交偏移量方式如果在接收消息后出现错误自动提交可能仍在进行进而导致这部分数据丢失也即实现至多一次的语义保证。而手动提交偏移量方式可以由客户端代码决定何时提交偏移量可以在消息处理失败时不处理偏移量从而避免消息丢失也即实现至少一次或恰好一次的效果。 自动提交适合简单的用例其中消息的丢失或重复处理不是关键问题。手动提交适用于需要更高控制级别和更强语义保证的场景例如金融交易或关键业务逻辑。 自动提交偏移量方式是Kafka Consumer的默认实现具体代码可以参考之前的示例这里给出手动提交偏移量的示例代码
public void consumeMessageByManual(ConsumerString, String consumer) {try {while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {// 处理消息逻辑System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());// 假设消息处理成功则提交偏移量// 同步提交偏移量consumer.commitSync();}}} catch (Throwable e) {// 处理异常e.printStackTrace();} finally {consumer.close();}
}注意在编写手动提交偏移量的代码前还需配置Kafka Consumer为手动提交模式对应参数是enable.auto.commit对应的值为false。在Kafka Consumer手动提交模式下还支持同步提交偏移量和异步提交偏移量两种方式commitSync()方法会一直阻塞直到偏移量被成功提交或发生错误。如果希望提交操作是非阻塞的可以使用commitAsync()方法并为其提供一个回调来处理可能的异常。
可靠性保证
在将Kafka作为消息管理平台时为保证系业务系统的可靠性还需对消息处理的常见场景进行考量以期实现一个符合业务需要的系统。常见的问题有消息丢失问题、消息重复消费问题、消息的顺序性问题、消息积压问题、消息TTL问题、消息回溯问题。
消息丢失问题
在Kafka中消息从生产到存储到消费都可能存在消息丢失的问题。在使用Kafka的时候要根据业务需要进行合理的配置。
生产者防止消息丢失
生产者使用ACK机制来确保了消息在生产者和Broker之间的可靠传递并在消息被认为已成功发送或处理之前防止其丢失。其中 (1) acks0是最不可靠的这个策略下生产者发送消息后不会等待任何确认直接将消息添加到分区的副本中并认为消息已成功发送。在这种模式下如果发生故障或错误生产者将不会知道也不会重试发送消息。这种模式通常用于不太关心消息可靠性的场景 (2) acks1是默认的确认级别。这个策略下生产者发送消息后会等待分区的领导者Leader副本确认消息已成功写入到其本地日志。一旦领导者确认生产者会认为消息已成功发送。然而如果领导者成功写入消息但在复制给其他副本时发生故障消息仍有可能会丢失 (3) acks-1也称为all是最可靠的确认模式。这个策略下生产者发送消息后会等待Leader和ISR集合中的Follower确认消息已写入然后发送下一条消息。这种模式下只有在所有副本都成功写入消息后生产者才会收到确认确保了消息的可靠性但可能会导致更长的延迟。生产者可以通过配置参数来控制其行为以适应不同的业务需求。在选择确认机制时需要权衡消息的可靠性和系统的吞吐量。对于需要高可靠性的场景建议使用acks-1而对于对延迟敏感或不太关心消息可靠性的场景可以使用acks0或acks1。 Kafka Producer中一般会发生两种类型的异常可重试的异常和不可重试的异常。比如网络异常这个有可能是网络抖动导致消息没有达到Broker这时可以通过重试解决。但是有时异常无法通过重试解决。如消息体过大的异常这种异常就无法通过重试解决只能对错误进行处理如记录日志重写现有代码保证消息体不会过大。 对于失败后可重试的异常其重试参数可以通过生产者的retries参数配置。KafkaProducer会自动捕捉可重试的失败无需编码。重试的次数默认值是10。对于消息发送无法处理的失败则需要将其作为错误处理。这种失败处理机制和Java的异常处理机制类似。在Java中会将错误分为Error和Exception两类。对于Error是无法捕获对于Exception则是可以捕获并处理。
Broker防止消息丢失
为保证消息不丢失Kafka为Partition引入了多副本(Replica)机制通过增加副本数量来提升容灾能力。同一个Partition的不同副本中保存的是相同的信息副本之间是一主多从的关系。其中Leader负责处理读写请求(Producer和Consumer只与Leader交互)Follower只负责与Leader的消息同步。当Leader出现故障时从Follower中重新选举新的Leader并对外提供服务。 此外生产环境要给Broker设置监控和警报系统及时发现和处理消息丢失的问题。这包括监控Kafka集群的状态、生产者和消费者的性能以及消息的处理情况等。
消费者防止消息丢失
消费者消费消息的过程中无论是使用自动提交偏移量还是手动提交偏移量都有可能导致消息丢失。特别是自动提交偏移量如果消息处理错误但消息的偏移量已自动提交则这部分消息将丢失。如果不能容忍这类场景下的消息丢失则需使用手动提交偏移量方式实现至少一次或恰好一次的效果。但是因为手动提交偏移量需要开发者自行编写代码必须确保代码在充分验证的情况下保证不会出现不必要的消息丢失问题。
消息重复消费问题
Kafka出现消息重复消费问题主要是由于已经消费的消息的偏移量没有成功提交到Broker。出现以上问题的常见原因有 (1) 消费者宕机或重启当消费者宕机或重启时如果之前已经消费但尚未提交Offset的数据会在消费者恢复后再次被消费导致重复。 (2) 自动提交Offset与Rebalance冲突当消费者设置为自动提交Offset并且在新的消费者加入或移除导致Rebalance发生时尚未提交的Offset可能导致数据重复消费。 (3) 消息处理耗时过长如果消费者处理消息的时间过长超过了Kafka配置的最大轮询间隔max.poll.interval.msKafka会认为该消费者已经死亡触发Rebalance从而导致重复消费。 针对消息重复问题可以考虑以下方案 (1) 采用手动提交Offset手动提交Offset可以在消息被成功处理后再提交Offset。这样可以确保只有在消息真正被处理后才更新Offset减少重复消费的可能性。 (2) 优化消息处理逻辑减少消息处理的时间避免超过max.poll.interval.ms配置的时间。如果消息处理确实需要较长时间可以考虑增加该配置的值。 (3) 幂等性设计在设计应用时保证消息的消费逻辑时幂等的即无论同一个消息被消费多少次都不会产生副作用。 (4) 监控和警报设置监控和警报系统及时发现和处理消息重复消费的问题。 结合以上解决方案可以有效地减少Kafka消息重复消费的风险。同时应根据具体的业务场景和需求选择最适合的解决方案来应对消息重复消费问题。
消息的顺序性问题
所谓的消息顺序性主要是指消息可以按照写入的逻辑顺序进行消费。在Kafka中只能保证单个Partition的消息是有序的无法保证多个Partition间消息的有序性。如果期望实现Kafka消息的顺序性则必须保证这个Topic只有一个Partition且只有一个消费者去消费。 如果需要在多个Partition之间保证顺序消费可以采用一些额外策略如分区设计时根据一定的规则为业务标识分配一个唯一的标识符并将相同标识符的消息发送到同一个分区中。
消息积压问题
所谓的消息积压是指消息大量堆积消息的生产速率和消费速率无法匹配。常见的消息积压的原因有(1) 生产者速度过快当生产者发送消息的速度远超过消费者处理消息的速度时消息会在Kafka中积压(2) 消费者速度过慢消费者处理消息的速度较慢无法及时消费掉生产者发送的消息导致消息积压(3) 副本同步延迟Kafka的副本同步延迟较高导致副本之间的同步速度跟不上消息的写入速度从而引发消息积压(4) 网络故障Kafka集群所在的网络出现故障影响生产者和消费者的正常通信导致消息积压。 针对产生消息堆积的不同原因可以选择对应的解决方案 (1) 优化生产者发送速度如果生产者发送速度过快可以考虑调整其发送策略如增加消息发送的间隔或使用流控机制来限制发送速度。 (2) 调整消费者消费速度使用Kafka的消费者配置参数如max.poll.records来控制消费者每次拉取的最大记录数以适应生产者的速度。 (3) 增加消费者数量增加消费者的数量提高处理消息的速度减少单个消费者的负载压力。注意如果消费者的数量已经等于Partition的数量则增加消费者的数量无法提高消费速度。但是对于生产应用来说除了消息消费还承载其他业务增加消费者可以通过分担现网业务压力间接提交消息处理速度。 (4) 增加Kafka的分区数num.partitions提高吞吐量。注意分区数的增加会对现网业务产生影响应在充分验证的前提下谨慎操作。 (5) 监控和警报。设置监控和警报系统及时发现和处理消息积压问题。监控Kafka集群的状态、生产者和消费者的性能以及消息的处理情况等。对于现网业务来说针对Kafka消息积压的监控和告警是必不可少的。 (6) 提高消费者的硬件配置如增加内存或CPU资源以加快消息处理速度。可以临时太高消费者的硬件配置解决消息堆积问题。 解决Kafka消息积压问题需要综合考虑多个方面并根据实际情况调整和优化相关配置和策略。
消息TTL
所谓的消息TTL(Time To Live)就是消息的可存活时间。Kafka本身没有提供消息的TTL功能。但是可以设置Topic中的消息的默认保存时限(默认是7天)。这个默认保存时间可以通过server.properties文件中的log.retention.hours属性进行修改。
消息回溯
Kafka中的消息回溯是指用户可以从消息队列中重新消费之前的消息。Kafka支持两种主要的消息回溯方式 (1) 基于消息偏移量Offset的回溯在Kafka的每个分区中每条消息都有一个唯一的偏移量用来表示消息在分区中的位置。消费者每次消费消息后都会将消费的此条消息的偏移量提交到Broker用于记录消费到分区中的位置。因此基于消息偏移量回溯非常直接只需要重置偏移量消费者就会从该偏移量之后开始消费消息。 (2) 基于时间点的消息回溯Kafka存储消息是以日志的形式每个分区对应一个日志但日志实际上是由多个文件组成的。当需要基于时间点回溯消息时Kafka可以根据存储的日志文件和对应的时间戳来找到并重新消费之前的消息。 消息回溯可用于以下场景 (1) 数据丢失或错误处理当消费者处理消息时发生错误或数据丢失时回溯机制允许消费者重新读取之前的消息以便进行错误处理或重新处理数据。 (2) 版本升级在Kafka集群进行版本升级时回溯机制可以确保消费者能够与新版本的Kafka集群保持兼容避免兼容性问题。 需要注意的是虽然消息回溯提供了很大的灵活性但在实际使用中需要谨慎处理以避免对Kafka集群的性能和稳定性产生负面影响。同时也应考虑消息幂等性问题避免出现业务错误。
参考
《深入理解Kafka核心设计与实践原理》 朱忠华 著 《Kafka权威指南》 Neha Narkhede Gwen Shapira Todd Palino 著 薛命灯译 《Kafka源码解析与实战》 王亮 著 https://blog.csdn.net/qq_32828253/article/details/110732652 Kafka 设计架构原理详细解析超详细图解 https://kafka.apache.org/documentation/#zk 6.9 ZooKeeper https://iteritory.com/beginners-guide-apache-kafka-basic-architecture-components-concepts/ Apache Kafka Basic Architecture, Components, Concepts
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/932516.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!