完整教程:zk管理kafkakafka-broker通信

news/2025/9/27 12:37:37/文章来源:https://www.cnblogs.com/ljbguanli/p/19114968

完整教程:zk管理kafka&kafka-broker通信

系列文章目录


一、作用和不足

作用

2.8版本以前,kafka使用zk管理并存储kafka的关键元数据,每个kafka集群都有broker作为控制器,是zk watcher 选举产生的。controller不仅像其他普通节点一样存储主体分区日志和处理消费生成数据请求,而且还维护着kafka集群元数据,比如broker id,主题分区,leader, isr信息。它将这些信息持久化到zk中。

0.9版本以前,controller通过单线程循环的方式,将更新后的元数据同步给其他broker节点,生产者和消费者都需要直接连接到zk集群进行发送和消费数据,现在这种方式已经被通过 bootstrap.servers连接的方式取代

不足

在一个健康的zk集群中,每时每刻只有一个健康的leader节点,zk必须保证半数以上的节点可用,才能保证zk集群可用。浪费资源。

zk和kafka是两个不同的系统,运维工作加倍。

controller节点存在单点故障。

controller宕机的话,新controller 需要从zk拉取所有分区的元数据信息,向每个broker节点发送leaderAndISR和updateMetaData请求,时间复杂度是O(num.partitions)。

broker宕机的话, 受控关机时间长,尤其是某个broker节点上分布的分区leader非常多时,需要重选leader,重新拉取元数据信息,分区越多重启时间越长,时间复杂度是O(partitions.leader)。

二、zk全部宕机后,kafka集群能正常收发消息吗?

能够正常收发消息。但是在kafka的server log里面,会报错:xxxx拒绝连接
原因是,kafka0.9以后,zk的功能被弱化,用来帮助kafka集群选举controller节点等基本操作,zk节点宕机后,kafka集群只是不能再发现新的broker节点加入了,但是不影响既有的节点收发日志。

三、controller功能和如何选举

  1. 分区 Leader 选举
    当某个 Broker 宕机或重新加入时,Controller 负责重新选举该 Broker 上的分区 Leader。
    确保每个分区都有一个 Leader 和多个 Follower(副本)。
  2. 分区分配与重平衡
    在集群扩容、Broker 故障恢复等情况下,Controller 会重新分配分区到不同的 Broker 上,以实现负载均衡。
    控制 Rebalance(再平衡)过程,确保所有副本同步。
  3. 元数据管理
    维护并更新集群的元数据信息,如:
    Topic 的分区数量
    分区的 Leader 和副本信息
    Broker 的状态(在线/离线)
    Controller 自己的状态(主/备)
  4. 监控 Broker 状态
    检测 Broker 是否正常运行。
    如果某个 Broker 失联,Controller 会触发相应的处理逻辑(如重新分配分区)。
  5. 控制副本同步
    确保副本之间的数据同步,防止数据丢失

集群的启动中,如何选出controller: 抢占机制

集群运行中,一些broker节点压力过大,导致controller反应慢,由于没有超时,zk无法判断是否下线,但是导致某些分区选主超时,可以删除controller临时节点,zk触发controller重新选主。健康的broker节点抢先注册成为controller, epoch任期号是2.
path: /controller path:/controller_epoch

value:{broker: 1} value:2

四、kafka发送数据到client

向broker节点传输数据是通过networkclient完成的。kafka客户端的networkclient类使用java nio 实现与broker节点通信,源代码可以看Selector类,

producer获取元数据信息,判断需要发送的topic分区位于哪个broker节点,selector和具体的broker节点创建一个socket 连接。
this.nioSelector = java.nio.channels.Selector.open();

具体是如何连接的呢?看源码

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
//1.检查是否和Node重复建立连接,确保每个channel当下只能有一个Node连接
ensureNotRegistered(id);
//2.创建SocketChannel对象
SocketChannel socketChannel = SocketChannel.open();
SelectionKey key = null;
try {
/**
* 3.配置SocketChannel对象
* 将SocketChannel设置为非阻塞模式,keepAlive设置为true,TCP_NODELAY设置为true
* 指定发送和接收缓冲区的大小(如果不是默认值)
* */
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
/**
* 4.确认连接是否真正建立
* 因为是非阻塞模式,scoketChannel.connect()只是尝试发起一个连接请求,并不保证连接一定建立成功
* connect()返回true表示连接已经建立成功,返回false表示连接请求已经发出但还没有建立成功
* 因此boolean connected会立即返回连接状态,但是并一定是true
* */
boolean connected = doConnect(socketChannel, address);
/**
* 5.注册SocketChannel对象到Selector
* 注册感兴趣的事件为OP_CONNECT,表示当连接建立成功时,Selector会收到通知
* */
key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", id);
//6.1 将key加入immediatelyConnectedKeys集合,表示该连接已经建立成功
immediatelyConnectedKeys.add(key);
//6.2 将key的感兴趣事件清空,因为连接已经建立成功,不需要再监听OP_CONNECT事件
key.interestOps(0);
}
} catch (IOException | RuntimeException e) {
if (key != null)
immediatelyConnectedKeys.remove(key);
channels.remove(id);
socketChannel.close();
throw e;
}
}

Selector类中还有一个poll()方法,获取准备就绪的SelectionKey,代表的是当前SocketChannel上已经就绪的时间,然后就可以 遍历这些SelectionKey来执行在interestSets中的兴趣操作
在KafkaProducer和KafkaConsumer中,负责与Broker通信的线程会重复调用NetworkClient的poll(), 在poll()中,会执行selector的select()方法,并以IO多路复用的方式处理与Broker之间的通信

if (numReadyKeys >
0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
//获取所有准备就绪的SelectionKey
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

sender子线程将封装好的发送请求缓存在InFlightRequests中,然后由NIO请求将消息发送到broker
在这里插入图片描述

InFlightRequests中的一个Deque中可以缓存多个sender创建好的ProduceRequest。“飞行中的请求”指的是请求发出去,尚未收到broker端的响应
其中Deque大小由参数max.in.flight.requests.per.connection决定,默认是5.如果将幂等性参数设置为true,broker会按照请求的先后顺序处理produceRequest;反之如果设置为false,只能将max.in.flight.requests.per.connection设置为1,才能保证顺序发送消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/919465.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

域泛化DomainBed的评价指标含义解释

DomainBed是域泛化领域的公认框架,其统一了输入输出以及相关细节处理,使得泛化性能比较更加公平公正,但是庞大的框架使其理解十分困难,今天首先介绍其评价指标,即Selection字段。结果展示 +------------+--------…

JUC: 线程锁

1 面试题复盘如何理解多线程,如何处理并发,线程池有哪些核心参数?Java加锁有哪几种锁?synchronized原理是什么?为什么可重入?如何获取对象的锁?JVM对原生锁做了哪些优化?什么是锁清除和锁粗化?乐观锁是什么?…

手机网站是怎么制作的wordpress好玩插件

1.新建Android应用&#xff0c;确定应用包名 2.注册高德开放平台&#xff0c;打开控制台页面&#xff0c;应用管理&#xff0c;我的应用&#xff0c;创建新应用 3.添加Key 4.获取SHA1码 找到Android Studio自带的keytool 将其拖到cmd中&#xff0c;输入命令 -v -list -keystor…

网站在线咨询模块东营市招投标信息网

&#x1f389;博主首页&#xff1a; 有趣的中国人 &#x1f389;专栏首页&#xff1a; Linux &#x1f389;其它专栏&#xff1a; C初阶 | C进阶 | 初阶数据结构 小伙伴们大家好&#xff0c;本片文章将会讲解Linux中项目自动化构建工具make/makefile的相关内容。 如果看到最后…

dede网站地图怎么做lamp网站开发 pdf

为什么80%的码农都做不了架构师&#xff1f;>>> 介绍 在本系列的第一篇文章中&#xff0c;安装了Node.js、Ignite的Node.js瘦客户端包&#xff0c;并且测试了一个示例应用。在本文中&#xff0c;可以看一下Ignite在处理其它数据源&#xff08;比如关系数据库&#…

InteractiveCommunication Problems

/偏向于前者。CSP 初赛塞了两个交互,有点慌。

JSON 框架混用避坑指南:FastJSON vs Jackson

`com.alibaba.fastjson.JSON.parseObject()` 方法无法识别 Jackson 的 `@JsonProperty` 注解,导致字段映射失败。 核心矛盾:FastJSON 无法识别 Jackson 的 @JsonProperty 注解目录一、问题定位二、框架对比表三、典…

实用指南:网络通信协议全解析:HTTP/UDP/TCP核心要点

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

企业级大数据技术栈:基于Hadoop+Spark的全球经济指标分析与可视化环境实践

企业级大数据技术栈:基于Hadoop+Spark的全球经济指标分析与可视化环境实践pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-famil…

网站制作的相关术语西安专业做网站建

连接MySQL数据库时常见故障问题的分析与解决 初学的mysql网友好象经常会碰到mysql无法连接的错误。特开贴收集这样问题的现象和原因。 先自己扔块砖头出来。 归纳如下&#xff1a; 故障现象 : 无法连接 mysql 错误信息1 &#xff1a;ERROR 1045 (28000): Access deni…

若邻接矩阵是三角矩阵,则存在拓扑序列;反之则不一定成立

目录1. 命题回顾2. 前半句:邻接矩阵是三角矩阵 ⇒ 存在拓扑序列2.1 邻接矩阵是上三角矩阵的情况2.2 邻接矩阵是下三角矩阵的情况3. 后半句:反之则不一定成立4. 最终判断1. 命题回顾若邻接矩阵是三角矩阵,则存在拓扑…

Gateway-断言 - 指南

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

macOS 多 Java 版本管理(jenv 方案)

目录一、目标二、查看已安装的 JDK三、使用 jenv 管理 Java 版本1. 安装 jenv2. 配置 Shell 环境3. 添加已安装的 JDK4. 查看可用版本5. 切换 Java 版本6. 验证版本四、常见问题1. 权限问题2. Shell 配置文件选择错误五…

龙口网站制作价格衡阳网站建设技术外包

操作&#xff1a; 是时机函数&#xff0c;在页面加载前&#xff0c;可以在这两个函数里面做一些事情&#xff0c; 比如发送异步请求。 类似过滤器&#xff0c;或者拦截器。1. axios安装 安装报错&#xff0c;多装几遍&#xff0c;或者用cnpm安装 npm install axios -s npm in…

怎么提高网站关键字排名网站怎么做360免费优化

在数字化浪潮席卷全球的今天&#xff0c;跨境电商业务蓬勃发展&#xff0c;成为推动国际贸易增长的重要引擎。亚马逊&#xff0c;作为全球最大的电商平台之一&#xff0c;以其独特的平台特点和全球化布局&#xff0c;为卖家和买家提供了便捷、高效的交易环境&#xff0c;成为众…

广州搜索seo网站优化建设银行网站字体

免责声明: 本文旨在提供有关特定漏洞的深入信息,帮助用户充分了解潜在的安全风险。发布此信息的目的在于提升网络安全意识和推动技术进步,未经授权访问系统、网络或应用程序,可能会导致法律责任或严重后果。因此,作者不对读者基于本文内容所采取的任何行为承担责任。读者在…

AI 落地教育智慧招生:从 “热线占线” 到 “724 小时精准应答” 的实践分享

AI 落地教育智慧招生:从 “热线占线” 到 “724 小时精准应答” 的实践分享在教育招生季,家长对 “报名时间”“学区范围”“学校特色” 的咨询需求集中爆发,而传统招生咨询模式往往陷入 “家长急、老师累、效率低”…

软件技术基础第一次课程

这个作业属于哪个课程 https://edu.cnblogs.com/campus/zjlg/25rjjc 这个作业的目标 初步学习博客的发文方法,进行自我评估,有初步的认知 姓名-学号 林靖迪- 2023329301118自我介绍与自我评估自我介绍 我叫林靖迪,是…