kafka 重新分配节点_Kafka控制器-分区重分配

分区重分配指的是将分区的副本重新分配到不同的代理节点上。如果ZK节点中分区的副本的新副本集合和当前分区副本集合相同,这个分区就不需要重新分配了。

分区重分配是通过监听ZK的 /admin/reassign_partitions 节点触发的,Kafka也提供了相应的脚本工具进行分区重分配,使用方法如下:

./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file XXX.json --execute

其中 XXX.json是分区重分配的JSON文件,格式如下:

{

"version":1,

"partitions":[

{"topic":"product", "partition":0, "replicas":[4,5,6]},

{"topic":"product", "partition":1, "replicas":[1,2,3]},

{"topic":"product", "partition":4, "replicas":[4,5,6]}

]}

假设主题 product 的分区数只有 {P0, P1},当执行上面的脚本时。此时会发现P4的分区对于 product 主题根本就不存在,此时就会忽略掉P4的副本迁移。对于P0和P1的副本重分配,可以简单的理解为下面的过程。

分区重分配命令接收

当使用脚本提交分区重分配时,接收命令的是 kafka.admin.ReassignPartitionsCommand#executeAssignment():

def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {

val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)

val adminZkClient = new AdminZkClient(zkClient)

val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)

// 如果当前有正在执行中的分区重分配,则终止当前提交

if (zkClient.reassignPartitionsInProgress()) {

println("There is an existing assignment running.")

reassignPartitionsCommand.maybeLimit(throttle)

} else {

printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))

if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)

println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))

/** 更新重分配数据至ZK */

if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {

println("Successfully started reassignment of partitions.")

} else

println("Failed to reassign partitions %s".format(partitionAssignment))

}

}

提交命令时,如果分区重分配还在进行,那么本次无法提交,意味着当前只能有一个执行的分区重分配。

重分配监听执行整体流程

当 /admin/reassign_partitions 被修改后,监听器会触发 PartitionReassignment 事件,其代码执行链如下所示:

下面我们看一下代码执行流程的展开。

分区重分配流程

控制器事件模型中 PartitionReassignment 事件,会触发调用processPartitionReassignment()。此时会注册监听ZK节点 /admin/reassign_partitions 变化,当重分配策略更新到ZK上时,该监听器就会被触发,然后执行分区重分配逻辑。

case PartitionReassignment =>

processPartitionReassignment()

private def processPartitionReassignment(): Unit = {

if (!isActive) return

/** 注册 /admin/reassign_partitions 节点变化监听 */

if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {

val partitionReassignment = zkClient.getPartitionReassignment

partitionReassignment.foreach { case (tp, newReplicas) =>

val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, tp)

/** 记录正在迁移的分区副本 */

controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))

}

maybeTriggerPartitionReassignment(partitionReassignment.keySet)

}

}

前置判断

对于是否需要分区重分配,在 maybeTriggerPartitionReassignment() 中做了一些判断取舍,其代码实现如下:

/**

* 如有下面情况发生,则不进行分区重分配:

* 1. Topic设置了删除标识;

* 2. 新副本与已经存在的副本相同;

* 3. 分区所有新分配的副本都不存活;

* 上面的情况发生时, 会输出一条日志, 并从ZK移除该分区副本的重分配记录

*/

private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {

val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]

topicPartitions.foreach { tp =>

if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {

/** 如果topic已经设置了删除,不进行重分配(从需要副本迁移的集合中移除) */

partitionsToBeRemovedFromReassignment.add(tp)

} else {

val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {

throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +

s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")

}

val newReplicas = reassignedPartitionContext.newReplicas

val topic = tp.topic

val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)

if (assignedReplicas.nonEmpty) {

if (assignedReplicas == newReplicas) {

/** 新副本与已经存在的副本相同,不进行重分配 */

partitionsToBeRemovedFromReassignment.add(tp)

} else {

try {

/** 注册ISR变化监听 */

reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)

/** 设置正在迁移的副本不能删除 */

topicDeletionManager.markTopicIneligibleForDeletion(Set(topic), reason = "topic reassignment in progress")

/** 执行重分配 */

onPartitionReassignment(tp, reassignedPartitionContext)

} catch {

}

}

} else {

/** 分区副本都不存活,不进行重分配 */

partitionsToBeRemovedFromReassignment.add(tp)

}

}

}

removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)

}

对于前置校验的流程如下:

1、如果topic已经设置了删除,不进行重分配(从需要副本迁移的集合中移除);

2、如果分区副本都不存活,不进行重分配;

3、如果新副本与已经存在的副本相同,不进行重分配;

4、注册ISR变化监听;

5、设置将要迁移的副本为不能删除;

6、调用 onPartitionReassignment() 执行重分配。

执行分区重分配

分区重分配的执行是在 onPartitionReassignment() 中实现的,下面说明一下官方给出的几个技术名词:

RAR:新分配的副本列表;

OAR:原先的分区副本列表;

AR:当前副本列表,随着分配过程不断变化;

RAR-OAR:RAR与OAR的差集,即需要创建、数据迁移的新副本;

OAR-RAR:OAR与RAR的差集,即迁移后需要下线的副本。

重分配的具体代码实现如下所示:

/**

* 当需要进行分区重分配时, 会在[/admin/reassign_partitions]下创建一个节点来触发操作

* RAR: 重新分配的副本, OAR: 分区原副本列表, AR: 当前的分配的副本

*/

private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {

val reassignedReplicas = reassignedPartitionContext.newReplicas

if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {

/** 新分配的并没有全在ISR中 */

/** RAR-OAR */

val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet

/** RAR+OAR */

val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet

/** 1.将AR更新为OAR + RAR */

updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)

/** 2.向上面AR(OAR+RAR)中的所有副本发送LeaderAndIsr请求 */

updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition), newAndOldReplicas.toSeq)

/** 3.新分配的副本状态更新为NewReplica(第2步中发送LeaderAndIsr时, 新副本会开始创建并且同步数据)*/

startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)

} else {

/** 4.等待所有的RAR都在ISR中 */

/** OAR - RAR */

val oldReplicas = controllerContext.partitionReplicaAssignment(topicPartition).toSet -- reassignedReplicas.toSet

/** 5.将副本状态设置为OnlineReplica */

reassignedReplicas.foreach { replica =>

replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), OnlineReplica)

}

/** 6.将上下文中的AR设置为RAR */

/** 7.新加入的副本已经同步完成, LeaderAndIsr都更新到最新的结果 */

moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext)

/** 8-9.将旧的副本下线 */

stopOldReplicasOfReassignedPartition(topicPartition, reassignedPartitionContext, oldReplicas)

/** 10.将ZK中的AR设置为RAR */

updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)

/** 11.分区重分配完成, 从ZK /admin/reassign_partitions 节点删除迁移报文 */

removePartitionsFromReassignedPartitions(Set(topicPartition))

/** 12.发送metadata更新请求给所有存活的broker */

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))

/** 如果topic标记了删除, 此时唤醒删除线程*/

topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))

}

}

上面代码执行的过程描述如下:

1.将AR更新为OAR+RAR;

2.向上面AR(OAR+RAR)中的所有副本发送LeaderAndIsr请求;

3.新分配的副本状态更新为NewReplica(第2步中发送LeaderAndIsr时, 新副本会开始创建并且同步数据);

4.等待所有的RAR都在ISR中;

5.将副本状态设置为OnlineReplica;

6.将上下文中的AR设置为RAR;

7.新加入的副本已经同步完成, LeaderAndIsr都更新到最新的结果;

8-9.将旧的副本下线;

10.将ZK中的AR设置为RAR;

11.分区重分配完成, 从ZK /admin/reassign_partitions 节点删除迁移报文;

12.发送metadata更新请求给所有存活的broker;

重分配简单描述

通过代码层面看起来不是很好理解,下面简单描述一下执行过程:

1、创建新的副本,开始同步数据,等所有新副本都加入了ISR后,在RAR中进行Leader选举;

2、下线不需要的副本(OAR-RAR),下线完成后将AR(即RAR)信息更新到ZK中;

3、发送LeaderAndIsr给存活broker。

假如初始情况下,分区副本在 {1,2,3} 三个 Broker 上;重分配之后在{4,5,6}上,此时变化过程如下图所示:

参考:《Kafka技术内幕》、《Apache Kafka 源码剖析》、Kafka源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/385997.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7天拿到阿里安卓岗位offer,统统给你解决!

开头 技术的发展产生了程序员这个职位,从这些年各大互联网公司曝光的一些员工收入水平来看,程序员的工资还是相对比较高的,可是我们在互联网上还听到了另外一种声音,很多程序员想转行,特别是大龄程序员,这…

python mysqldb 查询不到最新记录_python – MySQLdb是否缓存SELECT结果?

我正在循环中运行SELECT查询.偶尔,数据库表会更新(由另一个程序).第一个SELECT检索正确的数据,但循环中的其他调用返回第一个值.如何检索最新数据?到目前为止我找到的唯一解决方法是在每次迭代时重新连接到数据库!在我的例子中,取消注释#1#和#2#的注释.仅…

7天拿到阿里安卓岗位offer,附高频面试题合集

前言 众所周知,Android是一个基于Linux实现的操作系统。但对于Linux内核来说,Android也仅仅只是一个运行在内核之上的应用程序,与其他运行在内核之上的应用程序没有任何区别。 所以Android也需要运行环境,需要Linux内核在启动完…

找零钱问题

最近在做华为机试体验题,遇到一个“找零钱”的题目,如下 想起之前在牛客网上看到左程云老师讲过的动态规划问题,很像,题目如下: 有数组penny,penny中所有的值都为正数且不重复。每个值代表一种面值的货币&…

vga焊接线顺序_焊接工艺问答,不做焊接也要收藏起来

点 机械前沿”关注置顶引领机械前沿、机械视频,汽车、加工技术、3D打印、自动化、机器人、生产工艺、轴承、模具、机床、钣金等行业前沿在这里等你 焊接工艺问答1.什么叫焊接条件?它有哪些内容? 答:焊…

7年老Android一次操蛋的面试经历,挥泪整理面经

看到还有很多程序员连面试流程都没有彻底弄清楚,今天,我们以阿里为例,来聊聊互联网大厂的面试流程和过程! 本篇主要还是聊聊社招的面试过程!阿里以及其他的互联网大厂的技术类社招面试,通常情况是 4 个轮次…

gin context和官方context_Go Web 小技巧(一)简化Gin接口代码

不知道大家在使用 Gin 构建 API 服务时有没有这样的问题:参数绑定的环节可不可以自动处理?错误可不可以直接返回,不想写空 return, 漏写就是 bug本文通过简单地封装,利用 go 的接口特性,提供一个解决上述两个问题的思路一、解决过…

7年老Android一次操蛋的面试经历,深度好文

Java基础 Java Object类方法HashMap原理,Hash冲突,并发集合,线程安全集合及实现原理HashMap 和 HashTable 区别HashCode 作用,如何重载hashCode方法ArrayList与LinkList区别与联系GC机制Java反射机制,Java代理模式Jav…

Hadoop大数据应用生态圈中最主要的组件及其关系

Hadoop Common Hadoop Common是在Hadoop0.2版本之后分离出来的HDFS和MapReduce独立子项目的内容,是Hadoop的核心部分,能为其他模块提供一些常用工具集,如序列化机制、Hadoop抽象文件系统FileSystem、系统配置工具Configuration,并…

7年老Android一次操蛋的面试经历,系列教学

公司的需求 不同的公司,不同的需求现在的市场上,公司很多,大致上可以归纳为两个大类:大公司和小公司,他们招聘时对人才的需求也不一样。 小公司 小公司他们一般急需的是能够投入工作的人才,因为公司规模…

丁香园 武汉 神童_杭州、武汉、成都哪个城市更适合程序员发展

很多朋友讨论起房价和职业发展机会,都会提到这三个城市,有的人认为目前杭州房价太贵了,生活成本高,华中的武汉和西部崛起的成都都在鼓励高新技术发展并且有了一定成果,在选择职业发展和定居城市之间该如何取舍呢&#…

Windows 7 64位系统上搭建Hadoop伪分布式环境(很详细)

在开始配置前,我们先了解Hadoop的三种运行模式。 Hadoop的三种运行模式 独立(或本地)模式:无需运行任何守护进程,所有程序都在同一个JVM上执行。在独立模式下测试和调试MapReduce程序很方便,因此该模式在…

7年老Android一次操蛋的面试经历,讲的太透彻了

由于涉及到的面试题较多导致篇幅较长,我根据这些面试题所涉及到的常问范围总结了并做出了一份学习进阶路线图​​​​​​​及面试题答案免费分享给大家,文末有免费领取方式! View面试专题 View的滑动方式View的事件分发机制View的加载流程…

处理效应模型stata实例_stata︱政策处理效应模型sata基本命令汇总

本文来源经管之家论坛,由坛友cuifengbao归纳 Use ,文件名.dta,clear Ssc installpamatch2,replace 一、首先做一元回归 reg 结果变量 处理变量,r 二、直接引入协变量,再做多元回归 reg 结果变量 处理变量 协变量1 协变量2 协变量3……,r 三、接下来进行倾向得分匹配 1.将数…

80后程序员月薪30K+感慨中年危机,面试必问!

说说程序猿行业 现在社会上给IT行业贴上了几个标签:高薪、高危、高大上、秃顶(哈哈)。这些标签我相比大家都比较清楚,至于为什么是这些标签呢?而且这些标签是真实还是假象呢? 高薪 作为IT行业来说&#…

华为照片在哪个文件夹_原来华为手机还能这样清理垃圾,怪不得你的手机可以多用5年...

对于目前市场上的智能手机来说,大家的手机功能都是差不多的,除了一些外观上的差别之外,最大的区别就是手机的内存,但是很多朋友却表示手机内存很大,但是没用多久,手机就会出现卡顿或者是运行速度变慢的现象…

996页阿里Android面试真题解析火爆全网,全网首发!

在安卓系统中: 当系统内存不足时,Android系统将根据进程的优先级选择杀死一 些不太重要的进程,优先级低的先杀死。进程优先级从高到低如下。 前台进程 处于正在与用户交互的activity与前台activity绑定的service调用了startForeground&…

python不适合大型项目_在大型项目上,Python 是个烂语言吗? |

【洪强宁的回答(89票)】:太多硬伤和臆想,懒得批。只说“代码超过 10w 以后你就别想用 python 开发了”这一句,2012年4月豆瓣主站项目代码行数就近50万行了,可我们还在用 python 开发。【刘鑫的回答(42票)】:我写过几年Python,也写…

996页阿里Android面试真题解析火爆全网,分享面经!

导语 学历永远是横在我们进人大厂的一道门槛,好像无论怎么努力,总能被那些985,211 按在地上摩擦! 不仅要被“他们”看不起,在HR挑选简历,学历这块就直接被刷下去了,连证明自己的机会也没有,学…

access ole 对象 最大长度_Redis 数据结构和对象系统,有这 12 张图就够了!

作者 | 程序员历小冰责编 | 林瑟Redis 是一个开源的 key-value 存储系统,它使用六种底层数据结构构建了包含字符串对象、列表对象、哈希对象、集合对象和有序集合对象的对象系统。 今天我们就通过 12 张图来全面了解一下它的数据结构和对象系统的实现原理。01数据结…