【Kafka】主题Topic详解

目录

  • 主题的管理
    • 创建主题
    • 查看主题
    • 修改主题
    • 删除主题
  • 增加分区
  • 分区副本的分配
  • 必要参数配置
  • KafkaAdminClient应用
    • 功能
    • 操作示例

主题的管理

使用kafka-topics.sh脚本。

下面是使用脚本的一些选项

选项说明
–config <String: name=value>为创建的或修改的主题指定配置信息。
–create创建一个新主题
–delete删除一个主题
–delete-config <String: name>删除现有主题的一个主题配置条目。这些条目就 是在–config中给出的配置条目。
–alter更改主题的分区数量,副本分配和/或配置条目。
–describe列出给定主题的细节。
–disable-rack-aware禁用副本分配的机架感知。
–force抑制控制台提示信息
–help打印帮助信息
–replica-assignment
<String:broker_id_for_part1_replica1
:broker_id_for_part1_replica2
,broker_id_for_part2_replica1
:broker_id_for_part2_replica2 , …>
当创建或修改主题的时候手动指定partition-to-broker的分配关系。
–replication-factor <Integer:replication factor>要创建的主题分区副本数。1表示只有一个副本, 也就是Leader副本。
–topic <String: topic>要创建、修改或描述的主题名称。除了创建,修改和描述在这里还可以使用正则表达式。
–topics-with-overridesif set when describing topics, only show topics that have overridden configs
–unavailable-partitionsif set when describing topics, only show partitions whose leader is not available
–under-replicated-partitionsif set when describing topics, only show under replicated partitions
–zookeeper <String: urls>必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。

创建主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x - -partitions 1 --replication-factor 1 kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760

查看主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --list 
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x 
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides -- describe

修改主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576 kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760 kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01

删除主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x

删除主题时,只是给主题添加删除的标记,要过一段时间删除。

增加分区

通过命令行工具操作,主题的分区只能增加,不能减少。否则报错

ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.

通过–alter修改主题的分区数,增加分区。

kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 -- partitions 2

分区副本的分配

副本分配的三个目标:

  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。
  2. 其余副本通过增加偏移进行分配。

必要参数配置

使用kafka-topics.sh --config xx=xx --config yy=yy

配置给主题的参数如下:

属性默认值服务器默认属性说明
cleanup.policydeletelog.cleanup.policy要么是”delete“要么是”compact“; 这个字符串指明了当他们的回收时间或者尺寸限制到达时,针对旧日志部分的利用方式。默认 方式(“delete”)将会丢弃旧的部分;”compact“将会进行日志压缩。
compression.typenoneproducer用于压缩数据的压缩类 型。默认是无压缩。正确的选项 值是none、gzip、snappy、 lz4。压缩最好用于批量处理,批 量处理消息越多,压缩性能越 好。
max.message.bytes1000000max.message.byteskafka追加消息的最大字节数。注 意如果你增大这个字节数,也必 须增大consumer的fetch字节 数,这样consumer才能fetch到 这些最大字节数的消息。
min.cleanable.dirty.ratio0.5min.cleanable.dirty.ratio此项配置控制log压缩器试图进行 清除日志的频率。默认情况下, 将避免清除压缩率超过50%的日 志。这个比率避免了最大的空间 浪费
min.insync.replicas1min.insync.replicas当producer设置 request.required.acks为-1时, min.insync.replicas指定replicas 的最小数目(必须确认每一个 repica的写数据都是成功的), 如果这个数目没有达到, producer会产生异常。
retention.bytesNonelog.retention.bytes如果使用“delete”的retention 策 略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。默认情况下,没有尺寸限制而只有时间限制
retention.ms7 dayslog.retention.minutes如果使用“delete”的retention策 略,这项配置就是指删除日志前 日志保存的时间。
segment.bytes1GBlog.segment.byteskafka中log日志是分成一块块存储的,此配置是指log
segment.index.bytes10MBlog.index.size.max.bytes此配置是有关offsets和文件位置 之间映射的索引文件的大小;一 般不需要修改这个配置
segment.jitter.ms0log.roll.jitter.{ms,hours}The maximum jitter to subtract from logRollTimeMillis.
segment.ms7 dayslog.roll.hours即使log的分块文件没有达到需要删除、压缩的大小,一旦log 的时间达到这个上限,就会强制新建一个log分块文件
unclean.leader.election.enabletrue指明了是否能够使不在ISR中的replicas设置用来作为leader

KafkaAdminClient应用

除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient

功能

KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):

  1. 创建主题: createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)

  2. 删除主题:deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)

  3. 列出所有主题:listTopics(final ListTopicsOptions options)

  4. 查询主题:describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)

  5. 查询集群信息:describeCluster(DescribeClusterOptions options)

  6. 查询配置信息:describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)

  7. 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)

  8. 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)

  9. 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)

  10. 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas,DescribeReplicaLogDirsOptions options)

  11. 增加分区:createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。

用到的参数:

属性说明重要性
bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管是否指定了哪个服务器用作引导。这个列表仅影响用来发现集群所有服务器的初始主机。
字符串形式:host1:port1,host2:port2,…
由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这里。
一般最好两台,以防其中一台宕掉。
high
client.id生产者发送请求的时候传递给broker的id字符串。用于在broker的请求日志中追踪什么应用发送了什么消息。一般该id是跟业务有关的字符串。medium
connections.max.idle.ms当连接空闲时间达到这个值,就关闭连接。long型数据,默认:300000medium
request.timeout.ms客户端等待服务端响应的最大时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。int类型值,默认:120000medium
security.protocol跟broker通信的协议:PLAINTEXT, SSL,SASL_PLAINTEXT, SASL_SSL.string类型值,默认:PLAINTEXTmedium
reconnect.backoff.ms重新连接主机的等待时间。避免了重连的密集循环。该等待时间应用于该客户端到broker的所有连接。long型值,默认:50medium
retries重试的次数,达到此值,失败。int类型值,默认5。low
retry.backoff.ms在发生失败的时候如果需要重试,则该配置表示客户端等待多长时间再发起重试。该时间的存在避免了密集循环。long型值,默认值:100。low
receive.buffer.bytesTCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。int类型值,默认65536medium
send.buffer.bytes用于TCP发送数据时使用的缓冲大小(SO_SNDBUF),-1表示使用OS默认的缓冲区大小。int类型值,默认值:131072medium
reconnect.backoff.max.ms对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。在计算退避增量之后,添加20%的随机抖动以避免连接风暴。
long型值,默认1000
low

操作示例

主要操作步骤:

  1. 客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
  2. 客户端发送请求至Kafka Broker。
  3. Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
  4. 客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。

综上,如果要自定义实现一个功能,只需要三个步骤:

  1. 自定义XXXOptions;
  2. 自定义XXXResult返回值;
  3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。

示例:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;public class MyAdminClient {private KafkaAdminClient client;@Beforepublic void before() {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "node1:9092");configs.put("client.id", "admin_001");client = (KafkaAdminClient) KafkaAdminClient.create(configs);}@Afterpublic void after() {// 关闭admin客户端client.close();}@Testpublic void testListTopics() throws ExecutionException, InterruptedException {// 列出主题
//        final ListTopicsResult listTopicsResult = client.listTopics();ListTopicsOptions options = new ListTopicsOptions();// 列出内部主题options.listInternal(true);// 设置请求超时时间,单位是毫秒options.timeoutMs(500);final ListTopicsResult listTopicsResult = client.listTopics(options);//        final Set<String> strings = listTopicsResult.names().get();
//
//        strings.forEach(name -> {
//            System.out.println(name);
//        });// 将请求变成同步的请求,直接获取结果final Collection<TopicListing> topicListings = listTopicsResult.listings().get();topicListings.forEach(new Consumer<TopicListing>() {@Overridepublic void accept(TopicListing topicListing) {// 该主题是否是内部主题final boolean internal = topicListing.isInternal();// 该主题的名字final String name = topicListing.name();System.out.println("主题是否是内部主题:" + internal);System.out.println("主题的名字:" + name);System.out.println(topicListing);System.out.println("=====================================");}});}@Testpublic void testDescribeLogDirs() throws ExecutionException, InterruptedException {final DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0));final Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap= describeLogDirsResult.all().get();integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {@Overridepublic void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {System.out.println("broker.id = " + integer);
//                log.dirs可以设置多个目录stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {@Overridepublic void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {System.out.println("logdir = " + s);final Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfos = logDirInfo.replicaInfos;replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {@Overridepublic void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {System.out.println("主题分区:" + topicPartition.partition());System.out.println("主题:" + topicPartition.topic());
//                                final boolean isFuture = replicaInfo.isFuture;
//                                final long offsetLag = replicaInfo.offsetLag;
//                                final long size = replicaInfo.size;}});}});}});}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/652512.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mybatis-Plus入门

Mybatis-Plus入门 MyBatis-Plus 官网&#xff1a;https://mp.baomidou.com/ 1、简介 MyBatis-Plus (简称 MP) 是一个 MyBatis 的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff0c;为简化开发、 提高效率而生。 https://github.com/baomidou/mybatis-p…

【Java】Spring注解开发

一、Spring注解开发 1 注解开发定义Bean对象【重点】 目的&#xff1a;xml配置Bean对象有些繁琐&#xff0c;使用注解简化Bean对象的定义 问题导入 问题1&#xff1a;使用什么标签进行Spring注解包扫描&#xff1f; 问题2&#xff1a;Component注解和Controller、Service、R…

Blender教程(基础)-初始用户界面-01

开始第一天的Blender学习、也是业余学习。希望记录下这一份学习的过程、并且分享给大家。今天带大家认识Blender这一款软件&#xff0c;先说说我为什么选择了Blender&#xff0c;我在软件市场找了好久&#xff0c;市场上其他雷同软件都是要么收费要么不好用&#xff0c;最终决定…

【Vitest】 Vitest测试框架的简单使用

简言 在了解vue源码的时候接触到了vitest测试框架,它的官网语言有中文&#xff0c;所以本篇只作简单的使用介绍。 Vitest官网 Vitest 旨在将自己定位为 Vite 项目的首选测试框架&#xff0c;即使对于不使用 Vite 的项目也是一个可靠的替代方案。它本身也兼容一些Jest的API用法…

1.28学习总结

队列&#xff1a; 1.求区间所有后缀最大值的位置&#xff08;单调队列&#xff09; 搜索&#xff1a; 1.天下第一&#xff08;记忆化&#xff09; 2.拯救oibh总部&#xff08;DFS连通性问题&#xff09; 3.国王的魔镜&#xff08;递归&#xff09; 4.回家&#xff08;BFS三维的…

AngularJS基础入门文档

引言&#xff1a; AngularJS是一个开源的JavaScript框架&#xff0c;用于构建动态Web应用程序。它提供了一套强大的工具和功能&#xff0c;使开发人员能够更轻松地构建交互性强、响应式的网页应用。本文将为您介绍AngularJS的基本概念和使用方法&#xff0c;帮助您快速入门并掌…

寒假思维训练计划day16 A. Did We Get Everything Covered?

今天更新一道1月27号晚上div2的C题作为素材&#xff0c;感觉用到了我的构造题总结模型&#xff0c;我总结了一系列的模型和例题。 摘要&#xff1a; Part1 定义"边界贪心法" Part2 题意 Part3 题解 Part4 代码 Part5 思维构造题模型和例题 Part1 边界贪心…

【服务器Midjourney】创建部署Midjourney网站

目录 🌺【前言】 🌺【准备】 🌺【宝塔搭建MJ】 🌼1. 给服务器添加端口 🌼2. 使用Xshell连接服务器 🌼3. 安装docker 🌼4. 安装Midjourney程序 🌼5. 绑定域名+申请SSL证书 🌼6. 更新网站

全球唯一使用Python生成双色球和大乐透

生成双色球和大乐透代码&#xff1a; import randomdef gen_union_lotto(nums: int):"""随机生成N个双色球:param nums::return:"""union_list []for i in range(0, nums):data []for data_ in range(0, 6):random_num random.randint(1, 33…

电脑 文件夹内 显示是 文件在一起 ,实际存储硬盘的不同地方?

是的&#xff0c;在电脑上&#xff0c;文件夹内显示在一起的文件可能实际上存储在硬盘的不同物理位置。这是因为现代操作系统使用的是文件系统来管理磁盘上的数据&#xff0c;常见的如NTFS&#xff08;Windows&#xff09;、HFS&#xff08;旧版Mac&#xff09;或APFS&#xff…

【GitHub项目推荐--如何构建项目】【转载】

这是一个 138K Star 的开源项目&#xff0c;这个仓库汇集了诸多优质资源&#xff0c;教你如何构建一些属于自己的东西&#xff0c;内容主要分为增强现实、区块链、机器人、编辑器、命令行工具、神经网络、操作系统等几大类别。 开源地址&#xff1a;https://github.com/danist…

【贪吃蛇:C语言实现】

文章目录 前言1.了解Win32API相关知识1.1什么是Win32API1.2设置控制台的大小、名称1.3控制台上的光标1.4 GetStdHandle&#xff08;获得控制台信息&#xff09;1.5 SetConsoleCursorPosition&#xff08;设置光标位置&#xff09;1.6 GetConsoleCursorInfo&#xff08;获得光标…

滴水逆向三期笔记与作业——02C语言——10 Switch语句反汇编

滴水逆向三期笔记与作业——02C语言——10 Switch语句反汇编 一、Switch语句1、switch语句 是if语句的简写2、break加与不加有什么特点?default语句可以省略吗&#xff1f;3、游戏中的switch语句&#xff08;示例&#xff09;4、添加case后面的值&#xff0c;一个一个增加&…

深兰科技入选亿欧《“制”敬不凡先锋榜·智能机器人Top10》榜单

日前&#xff0c;由亿欧协办的2023工博会工业智能化发展高峰论坛于上海成功举办&#xff0c;会上发布了《2023智能制造&#xff1a;“制”敬不凡先锋者》系列名单。深兰科技凭借在智能机器人开发中的技术创新和模式应用&#xff0c;入选《“制”敬不凡先锋榜——智能机器人Top1…

前端大厂面试题探索编辑部——第二期

目录 题目 单选题1 题解 关于TCP 关于UDP 单选题2 题解 A选项的HTTP是否是无状态协议 B选项的HTTP支持的方法 C选项的关于HTTP的状态码 D选项HTTP协议的传输格式 题目 单选题1 1.以下哪个描述是关于 TCP 和 UDP 的区别&#xff08;&#xff09; A. TCP 是无连接的…

行测-资料:3. 比重、平均数

1、比重 1.1 现期比重★★★ C A&#xff0c;16.63%≈1/6 B C&#xff0c;拆成 50% 和 6.6% ≈ 1/15。 C D 1.2 基期比重★ 数学推导&#xff0c;A&#xff0c;B&#xff0c;A/(1 a)&#xff0c;B / (1 b) A&#xff0c;4 / 9&#xff0c;12 / 27 x 1.14 / 1.18&#xff0c;看…

【大数据】Flink 架构(四):状态管理

Flink 架构&#xff08;四&#xff09;&#xff1a;状态管理 1.算子状态2.键值分区状态3.状态后端4.有状态算子的扩缩容4.1 带有键值分区状态的算子4.2 带有算子列表状态的算子4.3 带有算子联合列表状态的算子4.4 带有算子广播状态的算子 在前面的博客中我们指出&#xff0c;大…

【Java万花筒】数字信号魔法:Java库的魅力解析

从傅立叶到矩阵&#xff1a;数字信号Java库全景剖析 前言 随着数字信号处理在科学、工程和数据分析领域的广泛应用&#xff0c;开发者对高效、灵活的工具的需求日益增长。本文旨在探讨几个与数字信号处理相关的Java库&#xff0c;通过介绍其特点、用途以及与已有库的关系&…

WinRAR压缩包高级技巧:永久设置压缩包单个或批量单独压缩成包并且不内嵌文件夹,解压保留原始时间设置

目录点击跳转&#xff1a;WinRAR压缩包高级技巧&#xff1a;永久设置压缩包单个或批量单独压缩成包并且不内嵌文件夹&#xff0c;解压保留原始时间设置 解压永久设置1 解压保存原始时间 压缩永久设置1 默认压缩成zip手机电脑都通用的格式2 默认压缩文件不多额外嵌套一层文件夹&…

【新书推荐】3.1节 布尔运算

本节内容&#xff1a;布尔运算&#xff0c;又称为逻辑运算或位运算。 ■布尔代数&#xff1a;and与、or或、not非、xor异或&#xff0c;按位运算。 3.1.1 布尔代数 ■布尔代数与二进制的关系 乔治布尔是一位英国小学数学老师&#xff0c;19世纪最重要的数学家之一。出版了《…