KAFKA高级应用

kafka高级应用

一些kafka的基础使用以及说明请参考上一篇文章kafka的基础入门。这篇文章主要是写kafka的一些高级特性、存储结构以及原理。

1.kafka副本同步机制

高可用是很多分布式系统中必备的特征之一,Kafka的高可用是通过基于 leader-follower的多副本同步实现的

1.副本同步机制中的一些概念

kafka中topic的每个partition有一个预写式日志文件,每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中,partition中的每个消息都有一个连续的序列号叫做offset,确定他在partition中的唯一位置。

1.LEO(last end offset)

日志末端位移,记录了该副本对象底层日志文件中下一条消息的位移值,副本写入消息的时候,会自动更新 LEO 值。leader分区和follower分区的LEO值是不一定相同的

2.HW (hign watermark)

高水位值,小于 HW 值的消息被认为是“已提交”或“已备份”的消息,并对消费者可见,在HW值以内的消息是绝对安全的。HW以下的消息才会对消费者可见。HW取的是LEO和remote LEO的最小值,统计的是ISR队列中的follower副本节点,follower副本由于自身原因到了OSR队列之前的remote LEO会删除。

3.remote LEO

leader分区副本会记录自己的LEO以及远程的follower分区副本的LEO值也就是(remote LEO)。HW的值取的就是这两个LEO的最小值。remote LEO和HW在follower向leader拉取消息的时候更新。

2.副本同步过的过程

生产者发送了一条消息到leader副本分区,写入该条消息后 leader 更新为 LEO = 1,follower副本在发送 fetch 请求同步leader数据时携带当前最新的 offset = 0,leader 处理 fetch 请求时,更新 remoteLEO = 0,对比 LEO 值最小为 0,所以 HW = 0。follower写入后自己的LEO变为1 发送第二轮 fetch 请求,携带当前最新的 offset = 1,leader 处理 fetch 请求时,更新remote LEO = 1,对比 LEO 值最小为 1,所以 HW = 1,

3.副本同步存在的问题

leader 中保存的 remote LEO 值的更新总是需要额外一轮 fetch RPC 请求才能完成,这意味着在 leader 切换过程中,会存在数据丢失以及数据不一致的问题。

1.数据丢失

第一轮follower节点发送fetch请求的时候,follower节点提交的offset值为1,更新的主节点的hw为1 leo为2 remote leo为1,这时候follower节点的hw为1 leo为2, 在follower节点第二轮发送fetch请求的时候,更新的主节点的hw为2 leo为2 remote leo为2,在第二轮请求响应的时候主节点宕机了,这时候重新选取主节点这个follower节点被选取为主节点这时候本应该是hw为2 leo为2 remote leo为2的主节点变成了hw为1 leo为2 hw本应该是2却变成了1,这时候为了保证消息一致性会截断日志将主节点变成hw为1leo为1,数据也就丢失了。

2.数据不一致

leader节点有3条消息,m1 m2 m3,leader节点已经完成了HW更新但是却没有同步到follower节点,follower节点的hw值仍然是2。这时候机房整体宕机。follower节点先恢复的,这时候follower节点先恢复启动,follower节点被选取为主节点,因为它的hw值是2为了保证数据的一致性会截断数据,导致主节点的消息变为m1 m2 这时候之前的leader节点恢复了它的消息是m1 m2 m3 这时候又来了一个消息 m4 主节点变为m1 m2 m4 它跟follower节点一比对它们的hw值都一样不做数据同步 这时候主节点的第三个offset存的值是m3 从节点的第三个offset存的是m4出现了消息数据不一致的情况。

3.问题处理
1.配置ack = all && 副本数 >= 2 && 最小同步副本数 >= 2
2.leader epoch

就是添加一个纪元值和leo值,follower epoch会发起一个leaderepochRequest请求判断leader 和 follower的epoch是否一致,如果不一致通过leader-epoch-checkpoint处理这个问题。

Epoch:一个单调增加的版本号,每当副本领导权发生变更时,都会增加该版本号,小版本号的Leader 被认为是过期 Leader,不能再行使 Leader 权力

起始位移(Start Offset):Leader 副本在该 Epoch 值上写入的首条消息的位移(LEO。

不过实际上消息丢失和不一致的概率比较小可能1年都丢不了1条。看具体业务而定。不过kafka在设置了安全性参数以后性能会变差一些,如果真的考虑以安全性为主考虑用rocketmq吧,如果主打的是性能用kafka才是比较好的选择。

kafka在处理重要不可以丢失数据的时候要设置ack = all,分区副本数量和最小同步副本数量都要要大于等于2,并且要做消息的幂等性处理来保证消息不重复。kafka自身只提供生产者到broker的消息幂等处理kafka默认已经配置了这个参数不过这里的幂等也是单分区幂等性,消费者端是需要自己写代码控制的。

2.kafka数据有序

kafka只能保证单分区的数据有序

kafka1.0配置max.in.flight.requests.per.connection的值为1以内才能解决单分区数据有序的问题,1.0版本以后默认配置了消息发送的幂等性配置max.in.flight.requests.per.connection的值为1到5都可以。因为启用幂等后,Kafka服务器会在服务器缓存最近的五个请求的元数据,因而无论如何,kafka可以保证最近5个request的数据都是有序的。每次将Broker内存中的元数据落盘前,都会和缓存中的元数据对比,如果不是连续的 seqNumber 数据无法落盘,也就无法正确返回 ack,request 将无法得到响应而阻塞。同时,因为服务器只会缓存Producer发来的最近5个request的幂等性元数据,故而服务器最大能够处理排序的消息也是五个,sender线程保证顺序能发送到Broker中的单分区数据也必须小于五个,故而需要设置 max.in.flight.requests.per.connection=[1,5] 。

3.kafka消费消息

kafka作为业务上的单纯的mq使用功能不完善主要体现在消费信息的方面

1.消费者和消费者组

一个broker中的topic的partiition对于一个消费者组只能消费一次,如果一个topic所有的消费者都在一个消费者组就变成了队列模型类似rabbitmq的直连交换器,如果一个topic所有消费者都在不同的组那么就完全变成了发布订阅模式劣势rabbitmq的扇形交换器。

1.消费者组

kafka消费者组内部有很多消费者,这些消费者都公用一个id(group id),一个组内的所有消费者共同协作,完成对订阅的topic的所有partition消费,其中一个主题的一个分区只能由一个消费者消费。

1.一个消费者组可以有多个消费者
2.group id 是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组
3.每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费,消费者组之间不影响
2.消费者分配策略

一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费,Kafka提供了3种消费者分区分配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。Kafka默认采用RangeAssignor的分配算法

1.RangeAssigor

相邻的分区尽量分配给同一个消费者

分区总数/消费线程数,如果有余,则表明有的消费线程之间分配的分区不均匀,那么这个多出来的分区会给前几个消费线程处理。

比如7个分区,3个comsumer,则7/3=2,余1,这个表明如果3个消费线程均分7个分区还会多出1个分区,那么这个多出的额外分区就会给前面的消费线程处理,所以它会把第一个分区先给到contomer-1 消费线程消费

contomer-1 partition 0,partition 1,partition 2

contomer-2 partition 3,partition 4

contomer-3 partition 5,partition 6

2.RoundRobinAssignor

按照consumer的顺序以轮询的方式进行分配

contomer-1 partition 0,partition 3,partition 6

contomer-2 partition 1,partition 4

contomer-3 partition 2,partition 5

3.StickyAssignor

黏性分配策略

前两个分配策略比如再加一台消费者contomer-4或者由一台消费者宕机了,所有的分区和消费者关系要再均衡,根据前两个分配策略的算法消费者和topic分区的关系要发生很多变动。

黏性分配策略是尽可能的每一次分配变更相对上一次分配做最少的变动,尽可能保证分区分配均衡(即分配给consumers的分区数最大相差为1),当发生分区重分配再均衡时,尽可能多的保留现有的分配结果

2.kafka消费的安全问题

1.kafka消费的线程设计

kafka的java consumer新版本中是由用户主线程和心跳线程的双线程设计。用户主线程就是业务代码中消费者启动的那个线程,而心跳线程值服务定期发送心跳给broker保证自己是存活的。因为心跳线程只是保证自己的存活所以实际消息的处理还是主线程完成的也可以理解为是个单线程设计。

1.kafka的消费者单线程设计原因

兼容其他语言、设计简单、单线程+轮询的机制,这种设计能够较好的实现非阻塞式的消息获取

2.kafka消费单线程设计可能存在的问题

要注意catch吃掉异常,因为是单线程设计所以报错就终止了。

消费者里面写多线程是非线程安全的。

rocketmq rabbitmq 都是多线程的不需要额外处理。

kafka本身没有重试队列以及死信队列,要完成这些功能需要自己写业务代码完成比如自己手动创建一个队列或者引用一些第三方jar包kafka本身没有这样的功能。spring框架集成kafka有此类功能。

2.消息重复消费

比如消费者开启offset自动提交,consumer默认5s提交一次offset,不过2s的时候消费者了,宕机的这时候我们消费者实际消费了n条消息,不过broker的offset还没有更新,重启消费者后还是从broker的offset的指针处开始消费,这样就出现了消息重复的问题。每次增加消费者服务器或者消费者服务器宕机消费策略再均衡的时候也会出现重复消费的问题。

3.消息丢失

比如消费者开启offset手动提交,当offset被提交时数据还在内存中未处理,比如有8个消息消费一半就给提交了,broker的offset更新为8,不过这时候消费者宕机了就丢失了一半的消息。

4.消息堆积

消费者的消费速率小于生产者的生产速率。

消费能力不足的时候应该考虑增加topic的partition,同时增加消费者数量,因为一个partition只能被消费者组的一个消费者消费所以应该同时增加partition和消费者数量。

可以提高批次拉取消息的数量配置fetch.max.bytes、max.poll.records

3.消费者的位移管理

消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息,在kafka的broker中用offset标识,offset记录的是下一个消费位置。比如消费到9记录的是10。

1.分区位移

分区位移是发送到partition时,这个消息的偏移量offset是多少。

2.消费位移

消费位移是消费者消费到哪里,指向的是下一个消费消息位置的offset。

3.消费位移保存位置

Kafka0.9版本前这个offset是保存在zookeeper的,这个offset要频繁的写zk不太适合频繁的写更新,这样会拖慢zk集群的性能

kafka0.9后offset是保存在__consumer_offsets主题中。

4.位移提交方式

enable.auto.commit参数配置自动提交或者手动提交,kafka消费者端的设计是单线程的没有额外的线程来处理提交偏移量这个操作。

1.自动提交

默认每5s提交一次上次处理的偏移量。poll的时候会检测是否达到5s,如果达到5s会提交目前消费者处理的最大偏移量,比如消费了0到100直接提交的offset就是101,broker的__consumer_offsets收到101消费者指针就指向101。

1.日志压缩

即使当前位移主题没有消息可以消费了,位移主题中还是会不停地写入最新位移的消息使用日志压缩可以节省我们的磁盘空间。

生产环境中如果出现过位移主题无限膨胀占用过多磁盘空间的问题,建议去检查一下 Log Cleaner 线程的状态是不是挂掉了。

2.手动提交

需要设计enable.auto.commit = false来配置消费者手动提交。可以指定主题分区偏移量一个个提交

1.先手动提交offset后处理消息

提交之后消费者服务器宕机会消息丢失

2.先处理消息后提交offset

提交之前消费者服务器宕机会消息重复

3.同步位移提交

commitSync()方法会阻塞,tps不高,必须等待broker服务器响应。

4.异步位移提交

commitAsync()tps高,不需要等待服务器结果。不过如果第一次提交的offset是20 第二次提交的offset是100 ,第一次失败,第二次成功了了会从20还是重新消费会有重复消费的问题。

5.混合位移提交

先异步指定位移提交最后finally中同步位移提交

6.优雅退出

可以新建个jvm退出前的守护线程hook调用wake up方法使kafka消费者while(true) 的 poll 也结束在finally中调用同步位移提交方法提交偏移量。

4.kafka分区再均衡

消费者数量或者分区数量因为某些原因发生变化,消费者和分区的绑定关系会根据消费者分配策略进行再平衡重新分配就是kafka的分区再均衡。 partition被重新分配给一个消费者时,消费者当前的读取状态会丢失。

1.再均衡监听器

消费者订阅主题的时候传入再均衡监听器,再均衡监听器实现ConsumerRebalanceListener接口

1.在消费者停止消费消费后,在重平衡开始前调用。

public void onPartitionRevoked(Collection partitions)

可以在这个方法中代码实现同步提交偏移量

2.在分区分配给消费者后,在消费者开始读取消息前调用

public void onPartitionAssigned(Collection partitions)

5.kafka的存储结构

kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量,一个分片并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是 <topic_name>-<partition_id> ,例如 test-0

1.文件存储结构

topic – 多个partition – log – 多个segment – (.log、 .index 、 .timeindex)

1.存储路径

kafka数据存放位置是在config/server.properties中的log.dirs配置。

2.segment

为了方便管理partition的数据,将一个partition拆分成多个大小相等的数据段也就是segment,每个segment文件大小相等消息数量不一定相同

1.index

偏移量索引文件

2.timestamp

时间戳索引文件

3.log

日志文件,存储生产者生产的数据

4.snaphot

快照文件,记录事务信息

5.leader-epoch-checkpoint

保存每一个leader开始写入消息时候的offset,follwer被选为leader时会根据这个确认哪些消息可用

3.稀疏索引

为消息数据创建了2种索引,一种是方便时间查找的稀疏索引.timestamp,一种是方便offset查找的.稀疏索引.index

offset是逻辑偏移量,position是物理偏移量。segment标志了大体的偏移量比如1000.log 0 - 1000,.index标志了再具体一些的偏移量比如 100、200、300…,根据稀疏索引找到offset的位置然后找到磁盘对应的地址也就是position的值之后去顺序读。

4.日志清理

kafka提供了2种日志清理策略,日志删除和日志压缩

6.kafka优化手段

1.性能优化
1.批量发送消息

kafka采用了批量发送的方式,通过将多条消息按照分区进行分组,然后每次发送一个消息集合从而减少网络传输的开销。

2.消息压缩

数据量越大,压缩效果才越好

通常有gzip snappy lz4 等压缩方式

3.高效序列化

比如protobuf、avro来减少实际网络传输量以及磁盘存储量,提高吞吐量。

2.存储优化
1.磁盘顺序写

磁盘顺序写的性能远远高于磁盘随机写,甚至高于内存随机写。

2.零拷贝

Kafka 用到了零拷贝(Zero-Copy)技术来提升性能,所谓的零拷贝是指数据直接从磁盘文件复制到网卡设备,而无需经过应用程序,减少了内核和用户模式之间的上下文切换。

为了减少用户态和内核态的过多交互就有了DMA技术,他是零拷贝的基石,通过DMA硬件可以绕过CPU自己去直接访问系统主内存

1.Kafka数据传输的的过程:
  1. 操作系统将数据从磁盘中加载到内核空间的Read Buffer(页缓存区)中。
  2. 操作系统之间将数据从内核空间的Read Buffer(页缓存区)传输到网卡中,并通过网卡将数据发送给接收方。
  3. 操作系统将数据的描述符拷贝到Socket Buffer中,Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/613609.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt QComboBox组合框控件

文章目录 1 属性和方法1.1 文本1.2 图标1.3 插入和删除1.4 信号和槽 2 实例2.1 布局2.2 代码实现 Qt中的组合框是集按钮和下拉列表体的控件&#xff0c;&#xff0c;它占用的屏幕空间很小&#xff0c;对应的类是QComboBox 1 属性和方法 QComboBox有很多属性&#xff0c;完整的…

Java编程避坑指南之关键字专题

1、Java常见关键字 1&#xff09;、48个关键字&#xff1a; abstract、assert、boolean、break、byte、case、catch、char、class、continue、default、do、double、else、enum、extends、final、finally、float、for、if、implements、import、int、interface、instanceof、l…

C++入门【27-C++ 引用】

引用变量是一个别名&#xff0c;也就是说&#xff0c;它是某个已存在变量的另一个名字。一旦把引用初始化为某个变量&#xff0c;就可以使用该引用名称或变量名称来指向变量。 C 引用 vs 指针 引用很容易与指针混淆&#xff0c;它们之间有三个主要的不同&#xff1a; 不存在…

编程语言的未来:创新与发展

编程语言的未来&#xff1f; 编程语言是计算机软件的基础&#xff0c;它们为程序员提供了一种沟通和指导计算机的方式。随着软件需求的不断增长和技术的进步&#xff0c;编程语言也需要不断演化和创新&#xff0c;以满足不断变化的需求。未来的编程语言将更加强大、灵活、易用…

航空服务市场分析:预计2024年客运总量将达40亿人次

在政策的引导和市场发展下&#xff0c;支线航空发展机遇在于在一些具备需求的区域&#xff0c;持续推进"航空服务大众化"。此前&#xff0c;美国实行"普遍航空服务"计划&#xff0c;我国也需要加快推进"国家基本航空服务计划"政策体系。国民经济…

springboot第46集:Nginx,Sentinel,计算机硬件的介绍

image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png 什么是单点容错率低&#xff1a; 单点容错率低指的是系统中存在某个关键节点&#xff0c;一旦这个节点发生故障或崩…

IDEA中在Java项目中添加Web模块 与配置tomcat服务器

现有项目添加直接走第二步 生成普通新项目 给项目添加框架支持 勾选 Web Application 选项, 点击OK 得到项目目录结构 , 出现web目录结构, 且web目录文件夹出现小蓝点 web或webapp 没有出现小蓝点 说明web配置没有出现或是手动构建的目录结构 , 在IDE关闭或者迁移项目时会出…

【虚拟仪器Labview】习题T1-详解

目录 题目要求思路计时部分详解实现第二个部分&#xff1a;将X*3Y 的的结果 Z与100进行比较。全部完成 题目要求 从前面板输入两个浮点数:X,Y,计算 X*3Y 的的结果 Z&#xff0c;在前面板显示计算结果。并且判断 Z是否小于 100&#xff0c;如果 Z 小于 100&#xff0c;前面板中的…

Java快速排序希尔排序归并排序

快速排序算法 快速排序的原理&#xff1a;选择一个关键值作为基准值。比基准值小的都在左边序列&#xff08;一般是无序的&#xff09;&#xff0c;比基准值大的都在右边&#xff08;一般是无序的&#xff09;。一般选择序列的第一个元素。 一次循环&#xff1a;从后往前比较&…

实现复数计算器

复数计算器实现 摘要 本论文描述了一个复数计算器的设计和实现&#xff0c;旨在扩展传统计算器的功能&#xff0c;以支持复数的加法、减法、乘法和除法。通过使用Java编程语言和Swing图形用户界面库&#xff0c;我们创建了一个直观、易于使用的界面&#xff0c;允许用户输入复…

在qemu虚拟机环境下,使用kgdb调试kernel

enable kgdb的情况下&#xff0c;使用qemu启动kernel 1&#xff0c;需要先在内核配置中增加kgdb的支持 2&#xff0c;启动qemu虚拟机时&#xff0c;增加参数-s -S&#xff0c;这两个参数会使得kernel在启动之后遇到的第一个指令等待gdb连接 例子&#xff1a; /qemu-project…

C++入门【28-C++ 把引用作为参数】

我们已经讨论了如何使用指针来实现引用调用函数。下面的实例使用了引用来实现引用调用函数。 实例 #include <iostream>using namespace std;// 函数声明void swap(int& x, int& y);int main () {// 局部变量声明int a 100;int b 200;cout << "交…

[力扣 Hot100]Day2 字母异位词分组

题目描述 给你一个字符串数组&#xff0c;请你将 字母异位词 组合在一起。可以按任意顺序返回结果列表。 字母异位词 是由重新排列源单词的所有字母得到的一个新单词。 出处 思路 这题有点考阅读理解&#xff0c;意思就是把输入数组中的所含字母相同但顺序不同的单词放到同…

一、Sharding-JDBC系列01:整合SpringBoot实现分库分表,读写分离

目录 一、概述 二、案例演示-水平分表 (1)、创建springboot工程 (2)、创建数据库和数据表 (3)、application.yaml配置分片规则 (4)、测试数据插入、查询操作 4.1、插入-控制台SQL日志 4.2、查询-控制台SQL日志 三、案例演示-水平分库 (1)、创建数据库和数据表 (2…

阿里云和AWS之间的应用程序防火墙比较及选择建议!

对于大多数开发人员来说&#xff0c;托管在云中的 Web 应用程序或 REST API 是一种常见方案。但是&#xff0c;并非每个应用程序都具有相同的安全级别。将 Web 应用程序防火墙 &#xff08;WAF&#xff09; 添加到 Web 应用程序是提高安全性的有用方法。 在本文中&#xff0c;…

【MySQL】视图,15道常见面试题---含考核思路详细讲解

目录 一 视图 1.1视图是什么 1.2 创建视图 1.3 查看视图(两种) 1.4 修改视图(两种) 1.5 删除视图 二 外连接&内连接&子查询介绍 2.1 外连接 2.2 内连接 2.3 子查询 三 外连接&内连接&子查询案例 3.1 了解表结构与数据 3.2 15道常见面试题 四 思…

【MySQL】GROUP BY 后面直接使用数字的写法(简写)

力扣题 1、题目地址 1699. 两人之间的通话次数 2、模拟表 表&#xff1a;Calls Column NameTypefrom_idintto_idintdurationint 该表没有主键(具有唯一值的列)&#xff0c;它可能包含重复项。该表包含 from_id 与 to_id 间的一次电话的时长。from_id ! to_id 3、要求 编…

linux磁盘总结

什么是page_cache linux读写磁盘&#xff0c;如果都是采用directIO的话&#xff0c;效率太低&#xff0c;所以我们在读写磁盘上加了一层缓存&#xff0c;page_cache。读的话&#xff0c;如果page_cache有的话&#xff0c;就不用向磁盘发出请求。写的话&#xff0c;也直接写入的…

C# 使用多线程,关闭窗体时,退出所有线程

this.Close(); 只是关闭当前窗口&#xff0c;若不是主窗体的话&#xff0c;是无法退出程序的&#xff0c;另外若有托管线程&#xff08;非主线程&#xff09;&#xff0c;也无法干净地退出&#xff1b;Application.Exit(); 强制所有消息中止&#xff0c;退出所有的窗体&…

移动神器RAX3000M路由器不刷固件变身家庭云之六(高级应用):设置https

本系列文章&#xff1a; 移动神器RAX3000M路由器变身家庭云之一&#xff1a;开通SSH&#xff0c;安装新软件包 移动神器RAX3000M路由器变身家庭云之二&#xff1a;安装vsftpd 移动神器RAX3000M路由器变身家庭云之三&#xff1a;外网访问家庭云 移动神器RAX3000M路由器变身家庭云…