kafka 动态扩容现有 topic 的分区数和副本数

文章目录

    • @[toc]
      • 创建一个演示 topic
      • 生产一些数据
      • 使用消费者组消费数据
      • 增加分区
        • 无新数据产生,有旧数据未消费
        • 有新数据产生,有旧数据未消费
      • 增加副本
        • 创建 json 文件
        • 使用指定的 json 文件增加 topic 的副本数
        • 使用指定的 json 文件查看 topic 的副本数增加的进度
        • 查看 topic 情况

  • 文档内出现的 ${KAFKA_BROKERS} 表示 kafka 的连接地址,${ZOOKEEPER_CONNECT} 表示 zk 的连接地址,需要替换成自己的实际 ip 地址

创建一个演示 topic

kafka-topics.sh --create --zookeeper ${ZOOKEEPER_CONNECT} --replication-factor 1 --partitions 3 --topic test-topic-update

查看 topic 详情

kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update

总共是六个 kafka 节点,三分区一副本,分散在三个不同的 kafka 节点

Topic:test-topic-update PartitionCount:3        ReplicationFactor:1     Configs:segment.bytes=1073741824Topic: test-topic-update        Partition: 0    Leader: 5       Replicas: 5     Isr: 5Topic: test-topic-update        Partition: 1    Leader: 1       Replicas: 1     Isr: 1Topic: test-topic-update        Partition: 2    Leader: 0       Replicas: 0     Isr: 0
  • 关于输出内容的概念
    • 分区(Partition)
      • 主题(Topic)在 Kafka 中的数据被分成一个或多个分区。每个分区是一个有序且持久化的消息日志。
      • 分区允许 Kafka 集群进行水平扩展,使多个消费者能够并行地处理主题的消息。
      • 消费者组中的每个消费者负责处理一个或多个分区的消息。
    • 领导者(Leader)
      • 每个分区都有一个领导者,领导者负责处理该分区的所有读写请求。
      • 生产者向领导者发送消息,消费者从领导者读取消息。
      • 领导者也负责维护分区的复制和同步。
    • 副本(Replicas)
      • 为了提高数据的冗余和可用性,每个分区可以有多个副本,包括一个领导者副本和零个或多个追随者副本。
      • 领导者副本处理写请求,追随者副本用于数据冗余和读请求。
    • 同步副本集(In-Sync Replicas,ISR)
      • 同步副本集是指在分区的所有副本中,与领导者副本保持同步的副本。
      • 领导者和同步副本集中的副本是可用于读取的,其他追随者副本可能会有一些延迟。

生产一些数据

  • 手动生产 300 条数据
kafka-verifiable-producer.sh --broker-list ${KAFKA_BROKERS} --topic test-topic-update --max-messages 300

使用消费者组消费数据

  • 消费者组不存在的情况下,没有返回被消费的数据,过两三秒之后,可以中断这个命令,然后使用下面的 --describe 来验证
kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group

查看消费组内的 topic 消费情况

kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group

目前三百条都被消费了,使用上面的生产数据的命令,再生产300条,模拟 topic 有数据的场景

GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 2          100             100             0               -               -               -
test-topic-update-group test-topic-update 0          100             100             0               -               -               -
test-topic-update-group test-topic-update 1          100             100             0               -               -               -

生产完数据后,再次查看,返回结果如下

GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 2          100             200             100             -               -               -
test-topic-update-group test-topic-update 0          100             200             100             -               -               -
test-topic-update-group test-topic-update 1          100             200             100             -               -               -

增加分区

  • 在增加分区的场景下比较方便,直接使用 --alter 就能实现,这里将原来的 3 分区改成 12 分区
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --alter --topic test-topic-update --partitions 12

无新数据产生,有旧数据未消费

查看 topic 情况

kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update

可以看到,分区已经更新成 12 个了,也可以看出,kafka 在动态增加分区的时候,是均分的,都会按照类似下面的 5-1-0-3-2-4 这样的顺序去均分(当然,前提是分区数和节点数是倍数关系)

Topic:test-topic-update PartitionCount:12       ReplicationFactor:1     Configs:segment.bytes=1073741824Topic: test-topic-update        Partition: 0    Leader: 5       Replicas: 5     Isr: 5Topic: test-topic-update        Partition: 1    Leader: 1       Replicas: 1     Isr: 1Topic: test-topic-update        Partition: 2    Leader: 0       Replicas: 0     Isr: 0Topic: test-topic-update        Partition: 3    Leader: 3       Replicas: 3     Isr: 3Topic: test-topic-update        Partition: 4    Leader: 2       Replicas: 2     Isr: 2Topic: test-topic-update        Partition: 5    Leader: 4       Replicas: 4     Isr: 4Topic: test-topic-update        Partition: 6    Leader: 5       Replicas: 5     Isr: 5Topic: test-topic-update        Partition: 7    Leader: 1       Replicas: 1     Isr: 1Topic: test-topic-update        Partition: 8    Leader: 0       Replicas: 0     Isr: 0Topic: test-topic-update        Partition: 9    Leader: 3       Replicas: 3     Isr: 3Topic: test-topic-update        Partition: 10   Leader: 2       Replicas: 2     Isr: 2Topic: test-topic-update        Partition: 11   Leader: 4       Replicas: 4     Isr: 4

查看消费组内的分区情况

kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group

因为没有新数据进入,也没有消费旧数据,此时还是显示的原先的信息

GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 2          100             200             100             -               -               -
test-topic-update-group test-topic-update 0          100             200             100             -               -               -
test-topic-update-group test-topic-update 1          100             200             100             -               -               -

将未消费的 300 条数据进行消费

kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group --max-messages 300

消费完成后,再次查看消费组的情况

kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group

此时就变成正常的 12 分区了

GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 0          100             100             0               -               -               -
test-topic-update-group test-topic-update 7          0               0               0               -               -               -
test-topic-update-group test-topic-update 5          0               0               0               -               -               -
test-topic-update-group test-topic-update 1          100             100             0               -               -               -
test-topic-update-group test-topic-update 6          0               0               0               -               -               -
test-topic-update-group test-topic-update 2          100             100             0               -               -               -
test-topic-update-group test-topic-update 3          0               0               0               -               -               -
test-topic-update-group test-topic-update 10         0               0               0               -               -               -
test-topic-update-group test-topic-update 9          0               0               0               -               -               -
test-topic-update-group test-topic-update 8          0               0               0               -               -               -
test-topic-update-group test-topic-update 11         0               0               0               -               -               -
test-topic-update-group test-topic-update 4          0               0               0               -               -               -

这里为了方便验证,我把 topic 删了后重建了,下面这个删除 topic 的命令,大家别随意执行,会删除数据的

kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --delete --topic test-topic-update

有新数据产生,有旧数据未消费

  • 同样,先扩容分区
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --alter --topic test-topic-update --partitions 12

未生产新数据的时候,查看消费者组的信息同样是没有更新分区信息

kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group

此时,手动使用命令模拟新数据进来

kafka-verifiable-producer.sh --broker-list ${KAFKA_BROKERS} --topic test-topic-update --max-messages 100

通过命令查看消费者组的情况

kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group

此时显示的是老分区,而且只显示了 8+8+9=25 条数据

GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 2          100             108             8               -               -               -
test-topic-update-group test-topic-update 0          100             108             8               -               -               -
test-topic-update-group test-topic-update 1          100             109             9               -               -               -

手动消费一下数据试试

kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group --max-messages 100

发现返回的信息里面,只显示25条数据

kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group

但是观察消费者组的情况,显示的是都消费了,看起来,应该是和 topic 加入新消费者组的情况一样,不展示,但实际消费数据了(这块是个人的理解,具体的原理需要有兴趣的大佬深究一下,希望能赐教带我飞)

GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 0          108             108             0               -               -               -
test-topic-update-group test-topic-update 7          9               9               0               -               -               -
test-topic-update-group test-topic-update 5          8               8               0               -               -               -
test-topic-update-group test-topic-update 1          109             109             0               -               -               -
test-topic-update-group test-topic-update 6          9               9               0               -               -               -
test-topic-update-group test-topic-update 2          108             108             0               -               -               -
test-topic-update-group test-topic-update 3          8               8               0               -               -               -
test-topic-update-group test-topic-update 10         8               8               0               -               -               -
test-topic-update-group test-topic-update 9          8               8               0               -               -               -
test-topic-update-group test-topic-update 8          8               8               0               -               -               -
test-topic-update-group test-topic-update 11         8               8               0               -               -               -
test-topic-update-group test-topic-update 4          9               9               0               -               -               -

增加副本

创建 json 文件

  • kafka-reassign-partitions.sh 是 Kafka 提供的命令行工具,用于重新分配主题分区的副本。这个工具允许你重新定义主题分区副本的分布,以实现负载均衡、故障恢复或集群扩展等目的

之前的 topic 是 1 副本,12 分区,按照之前的 5-1-0-3-2-4 的顺序来分配第一个副本,然后按照 4-3-2-0-1-5 的顺序来分配第二个副本,我这里的 json 文件就命名为:add_rep_test_topic_update.json,大家可以以自己实际来命名

{"version":1, "partitions":[
{"topic":"test-topic-update","partition":0,"replicas":[5,4]},
{"topic":"test-topic-update","partition":1,"replicas":[1,3]},
{"topic":"test-topic-update","partition":2,"replicas":[0,2]},
{"topic":"test-topic-update","partition":3,"replicas":[3,0]},
{"topic":"test-topic-update","partition":4,"replicas":[2,1]},
{"topic":"test-topic-update","partition":5,"replicas":[4,5]},
{"topic":"test-topic-update","partition":6,"replicas":[5,4]},
{"topic":"test-topic-update","partition":7,"replicas":[1,3]},
{"topic":"test-topic-update","partition":8,"replicas":[0,2]},
{"topic":"test-topic-update","partition":9,"replicas":[3,0]},
{"topic":"test-topic-update","partition":10,"replicas":[2,1]},
{"topic":"test-topic-update","partition":11,"replicas":[4,5]}]
}

使用指定的 json 文件增加 topic 的副本数

kafka-reassign-partitions.sh --zookeeper ${ZOOKEEPER_CONNECT} --execute --reassignment-json-file add_rep_test_topic_update.json

使用指定的 json 文件查看 topic 的副本数增加的进度

kafka-reassign-partitions.sh --zookeeper ${ZOOKEEPER_CONNECT} --verify --reassignment-json-file add_rep_test_topic_update.json

通过命令返回的内容,可以看出都成功了

Reassignment of partition test-topic-update-0 completed successfully
Reassignment of partition test-topic-update-7 completed successfully
Reassignment of partition test-topic-update-5 completed successfully
Reassignment of partition test-topic-update-1 completed successfully
Reassignment of partition test-topic-update-6 completed successfully
Reassignment of partition test-topic-update-2 completed successfully
Reassignment of partition test-topic-update-3 completed successfully
Reassignment of partition test-topic-update-10 completed successfully
Reassignment of partition test-topic-update-9 completed successfully
Reassignment of partition test-topic-update-8 completed successfully
Reassignment of partition test-topic-update-11 completed successfully
Reassignment of partition test-topic-update-4 completed successfully

查看 topic 情况

kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update

现在的 topic 变成了 12 分区,2 副本的状态了

Topic:test-topic-update PartitionCount:12       ReplicationFactor:2     Configs:segment.bytes=1073741824Topic: test-topic-update        Partition: 0    Leader: 5       Replicas: 5,4   Isr: 5,4Topic: test-topic-update        Partition: 1    Leader: 1       Replicas: 1,3   Isr: 3,1Topic: test-topic-update        Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2Topic: test-topic-update        Partition: 3    Leader: 3       Replicas: 3,0   Isr: 3,0Topic: test-topic-update        Partition: 4    Leader: 1       Replicas: 2,1   Isr: 1,2Topic: test-topic-update        Partition: 5    Leader: 4       Replicas: 4,5   Isr: 5,4Topic: test-topic-update        Partition: 6    Leader: 5       Replicas: 5,4   Isr: 4,5Topic: test-topic-update        Partition: 7    Leader: 1       Replicas: 1,3   Isr: 1,3Topic: test-topic-update        Partition: 8    Leader: 0       Replicas: 0,2   Isr: 0,2Topic: test-topic-update        Partition: 9    Leader: 3       Replicas: 3,0   Isr: 0,3Topic: test-topic-update        Partition: 10   Leader: 1       Replicas: 2,1   Isr: 1,2Topic: test-topic-update        Partition: 11   Leader: 4       Replicas: 4,5   Isr: 4,5

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/66395.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

日志文件之间关系和介绍及应用

1.常用日志框架代码举例 Log4j: Log4j是Java中广泛使用的日志框架之一。它提供了灵活的配置选项和丰富的功能,支持日志级别、日志输出目标等。Log4j有1.x版本和2.x版本,其中Log4j 2.x是对1.x的升级和扩展。 Logback: Logback是由Log4j创始人设计的Log4…

Java8新特性2——方法引用

Java8新特性2——方法引用 注:以下内容基于Java 8,所有代码都已在Java 8环境下测试通过 目录: Java8新特性1——函数式接口&lambda表达式方法引用Stream 1. 方法引用 方法引用提供了一种替代 lambda 表达式的语法,允许以更…

Docker 及 Docker Compose 安装指南

Docker 是一个开源的容器化平台,可以帮助我们快速构建、打包和运行应用程序。而 Docker Compose 则是用于管理多个容器应用的工具,可以轻松定义和管理多个容器之间的关系。现在,让我们开始安装过程吧! docker 安装 apt安装 sudo…

C++,多继承

多继承的基本概念 一个类由多个类共同派生被称为多继承。 多继承的格式&#xff1a; class 类名:继承方式1 类名1,继承方式2 类名2,.....,继承方式n 类名n {子类的拓展 }; 示例&#xff1a; #include <iostream> using namespace std;//封装 沙发的类 class Sofa { p…

202. 快乐数

202. 快乐数 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为&#xff1a; 对于一个正整数&#xff0c;每一次将该数替换为它每个位置上的数字的平方和。 然后重复这个过程直到这个数变为 1&#xff0c;也可能是 无限循环 但始终变不到 1。 如果这个过程 结果…

从传统到智能化:汽车内部通信的安全挑战与SecOC解决方案

01/需求背景 Demand background 在传统的汽车电子结构中&#xff0c;车内的电控单元&#xff08;ECU&#xff09;数量和复杂性受到限制&#xff0c;通信带宽也受到限制。因此&#xff0c;人们普遍认为车内各个ECU之间的通信是可靠的。只要ECU节点接收到相应的消息&#xff0c…

华为OD机试 - 最长的指定瑕疵度的元音子串 - 正则表达式(Java 2023 B卷 200分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…

9.1 校招 内推 面经

绿泡*泡&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 1、校招 | 理想汽车2024校园招聘正式启动&#xff08;内推&#xff09; 校招 | 理想汽车2024校园招聘正式启动&#xff08;内推&#xff09; 2、2023校招总结--自动驾驶&#xff08;软开/规控…

[libglog][FFmpeg] 如何把 ffmpeg 的库日志输出到 libglog里

ffmpeg 提供了自己的 log 模块 av_log&#xff0c;会默认把输出打印到 stderr 上&#xff0c;因此无法方便地跟踪日志。但是 ffmpeg 提供了一个接口 av_log_set_callback 以供外界自定义自己的日志输出。 libglog 提供的是c 形式的日志输出样式&#xff0c;因此需要将二者关联起…

代码随想录训练营二刷第十一天 | 20. 有效的括号 1047. 删除字符串中的所有相邻重复项 150. 逆波兰表达式求值

代码随想录训练营二刷第十一天 | 20. 有效的括号 1047. 删除字符串中的所有相邻重复项 150. 逆波兰表达式求值 一、20. 有效的括号 题目链接&#xff1a;https://leetcode.cn/problems/valid-parentheses/ 思路&#xff1a;思路遇到左括号把对应的右括号压入栈&#xff0c;节…

Unity 数据保存失败

问题 游戏数据突然保存不了了 没有任何报错 切后台保存也出问题 编辑器上和PC端没问题 移动端上保存不了 原因 我使用的存储方式是 Newtonsoft.Json 将对象转换成加密字符串并保存到本地 而不巧的是 我使用了 HashSet 这导致Newtonsoft.Json在转换的时候崩掉 且没有报错提示…

视频文件损坏无法播放如何修复?导致视频文件损坏的原因

如果我们遇到因视频文件损坏而无法正常播放&#xff0c;我们该怎么办&#xff1f;这种情况通常意味着视频文件已经损坏。我们不能访问、编辑或使用它们。那么应该用什么正确的工具和修复程序来修复视频呢&#xff1f; 视频文件损坏的原因 了解视频损坏如何修复之前&#xff0c…

任意文件读取和漏洞复现

任意文件读取 1. 概述 一些网站的需求&#xff0c;可能会提供文件查看与下载的功能。如果对用户查看或下载的文件没有限制或者限制绕过&#xff0c;就可以查看或下载任意文件。这些文件可以是漂代码文件&#xff0c;配置文件&#xff0c;敏感文件等等。 任意文件读取会造成&…

EasyExcel读模板生成excel文件注解Bean生成文件

文章目录 1、EasyExce依赖准备2、通过注解Bean的方式生成Excel2.1、注解Bean准备2.2、封装数据&#xff0c;生成Excel&#xff08;只需要几行代码&#xff09;2.3、生成结果展示 3、通过Excel模板生成数据3.1、准备编写Excel模板3.2、封装数据&#xff0c;生成excel3.3、模板导…

stm32之28.ADC

须看原理图&#xff08;引脚、电压值、ADC几号通道&#xff09;配置 。 若对比值0~4096 模拟电压/参考电压4096/x 假设模拟电压2.1V&#xff0c;参考电压3.3v&#xff0c;4096/x3.3/2.1 ->3.3x2.1x4096 ->x2,606.5 也可反推出模拟电压 ADC转换时间 ADC时钟来源于…

JavaScript -【第二周】

文章来源于网上收集和自己原创&#xff0c;若侵害到您的权利&#xff0c;请您及时联系并删除~~~ 理解什么是流程控制&#xff0c;知道条件控制的种类并掌握其对应的语法规则&#xff0c;具备利用循环编写简易ATM取款机程序能力 运算符语句综合案例 1. 运算符 算术运算符赋值运…

C#实现日期选择器、显示当地时间、跑马灯等功能

using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System

SAP_ABAP_接口技术_RFC远程函数实践总结

SAP ABAP顾问能力模型梳理_企业数字化建设者的博客-CSDN博客SAP Abap顾问能力模型&#xff0c;ALV/REPORT|SMARTFROM|SCREEN|OLE|BAPI|BDC|PI|IDOC|RFC|API|WEBSERVICE|Enhancement|UserExits|Badi|Debughttps://blog.csdn.net/java_zhong1990/article/details/132469977 SAP接…

zabbix自动发现linux系统挂载的nas盘,并实现读写故障的监控告警

一.准备好被监控机器上面执行脚本,以备服务端发现和监控 脚本的内容: ZABBI安装路径可执行文件及配置文件根据实际部署的路径更改 #!/bin/bash >/zabbixconfpath/zbx_nas.conf >/zabbixscriptspath/findnas.sh >/zabbixscriptspath/checknas.sh >/zabbixscripts…

docker安装jenkins

运行jenkins docker run -d \--name jenkins \ --hostname jenkins \-u root \-p 29090:8080 \--restart always \-v D:\springcloud\学习\jekins\jenkins\jks_home:/var/jenkins_home \ jenkins/jenkins获取root登录密码 密码在jekins_home/secrets/initalAdminPassword文件…