【kafka】基本命令

创建 Kafka Topic 的命令

以下是创建 Kafka Topic 的几种常用方法:

1. 使用 kafka-topics.sh 基础命令(Kafka 自带工具)

bin/kafka-topics.sh --create \--bootstrap-server <broker地址:端口> \--topic <topic名称> \--partitions <分区数> \--replication-factor <副本数>
bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--topic my_new_topic \--partitions 3 \--replication-factor 2

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test01_topic --partitions 2 --replication-factor 2

2. 带额外配置的创建命令

bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--topic my_config_topic \--partitions 5 \--replication-factor 1 \--config retention.ms=172800000 \--config segment.bytes=1073741824

3. 使用 ZooKeeper 的旧版命令(Kafka 2.2 之前版本)

bin/kafka-topics.sh --create \--zookeeper localhost:2181 \--topic legacy_topic \--partitions 2 \--replication-factor 1

4. 使用 Kafka API (Java) 创建 Topic

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);NewTopic newTopic = new NewTopic("my_api_topic", 3, (short) 1);
newTopic.configs(Map.of("retention.ms", "86400000"));CreateTopicsResult result = admin.createTopics(Collections.singletonList(newTopic));
result.all().get(); // 等待创建完成

5. 验证 Topic 是否创建成功

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic my_new_topic --bootstrap-server localhost:9092

常用配置参数

可以在创建时通过 --config 指定:

  • retention.ms - 消息保留时间(毫秒)

  • `segment.bytes

查看 Kafka 所有 Topic 的命令

以下是几种查看 Kafka 中所有 Topic 的常用方法:

1. 使用 kafka-topics.sh 基础命令(推荐)

bin/kafka-topics.sh --list --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

2. 查看所有 Topic 的详细信息(包括分区、副本等)

bin/kafka-topics.sh --describe --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092

3. 使用 ZooKeeper 的旧版命令(Kafka 2.2 之前版本)

bin/kafka-topics.sh --list --zookeeper <zookeeper地址:端口>
bin/kafka-topics.sh --list --zookeeper localhost:2181

4. 使用 kafkacat 工具

kafkacat -L -b <broker地址:端口>
kafkacat -L -b localhost:9092

5. 使用 Kafka API (Java)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);ListTopicsResult topics = admin.listTopics();
Set<String> topicNames = topics.names().get();topicNames.forEach(System.out::println);

6. 查看包含内部 Topic 的所有 Topic(如 __consumer_offsets)

bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --exclude-internal

7. 使用正则表达式过滤 Topic

bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | grep "your_pattern"

注意事项

  1. 新版本 Kafka(2.2+)推荐使用 --bootstrap-server 而不是 --zookeeper

  2. 生产环境中可能需要添加认证参数,如:

    bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --command-config admin.properties
  3. 对于大型集群,列出所有 Topic 可能需要一些时间

查看 Kafka Topic 分区信息的命令

以下是几种查看 Kafka Topic 分区信息的常用方法:

1. 使用 kafka-topics.sh 工具(Kafka 自带)

bin/kafka-topics.sh --describe --topic <topic名称> --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092

2. 查看所有 topic 的分区信息

bin/kafka-topics.sh --list --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --describe --bootstrap-server <broker地址:端口>

bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092
 

3. 使用 kafkacat 工具

kafkacat -L -b <broker地址:端口> -t <topic名称>

4. 使用 Kafka API (Java)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);DescribeTopicsResult result = admin.describeTopics(Collections.singletonList("my_topic"));
TopicDescription description = result.all().get().get("my_topic");description.partitions().forEach(partition -> {System.out.println("Partition: " + partition.partition());System.out.println("Leader: " + partition.leader());System.out.println("Replicas: " + partition.replicas());System.out.println("ISR: " + partition.isr());
});

输出信息解释

Topic: my_topic	PartitionCount: 3	ReplicationFactor: 2	Configs:Topic: my_topic	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2Topic: my_topic	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0Topic: my_topic	Partition: 2	Leader: 0	Replicas: 0,1	Isr: 0,1

其中:

  • PartitionCount: 分区总数

  • ReplicationFactor: 副本因子数

  • Leader: 负责该分区读写的主 broker

  • Replicas: 该分区的所有副本所在的 broker

  • Isr: 同步中的副本(In-Sync Replicas)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/80641.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

编程速递:适用于 Delphi 12.3 的 FMX Linux 现已推出

Embarcadero非常高兴地宣布&#xff0c;用于使用Delphi构建Linux客户端应用程序的FMX Linux UI库再次在RAD Studio 12.3版本以及RAD Studio 12.2版本中提供支持&#xff0c;同时也适用于更早的版本。 作为RAD Studio的一个附加库&#xff0c;FMX Linux为开发面向Linux的图形用…

通过实例讲解螺旋模型

目录 一、螺旋模型的核心概念 二、螺旋模型在电子商城系统开发中的应用示例 第 1 次螺旋:项目启动与风险初探

vue3 vite 路由

如路由是这种格式 http://localhost:7058/admin/product/brand路由配置如下 import { createRouter, createWebHistory } from vue-router import HomeView from ../views/HomeView.vue import NProgress from nprogress; import nprogress/nprogress.css; import {errorRour…

【Redis】Hash 存储相比 String 存储的优势

在 Redis 中&#xff0c;Hash 存储相比 String 存储具有以下 优势&#xff0c;特别适用于某些特定场景&#xff1a; ✅ 1. 更节省内存&#xff08;尤其适合存储对象&#xff09; Hash 内部使用压缩列表&#xff08;ziplist&#xff09;或哈希表实现&#xff0c;在数据量较小时…

CSS详解:特性、选择器与优先级

CSS详解&#xff1a;特性、选择器与优先级 目录 CSS详解&#xff1a;特性、选择器与优先级一、CSS的核心特性1. 层叠性&#xff08;Cascading&#xff09;2. 继承性&#xff08;Inheritance&#xff09;3. 优先级&#xff08;Specificity&#xff09;4. 响应式设计5. 动画与过渡…

《算法导论(第4版)》阅读笔记:p86-p90

《算法导论(第4版)》学习第 19 天&#xff0c;p83-p85 总结&#xff0c;总计 3 页。 一、技术总结 无。 二、英语总结(生词&#xff1a;2) 1. inkling (1)inkling: inclen(“utter in an undertone&#xff0c;低声说话”) c. a hint(提示)&#xff1b;a slight knowledg…

nginx概念及使用

一、Nginx 核心概念 Nginx&#xff08;发音为 "engine-x"&#xff09;是一个高性能、开源的 Web 服务器和反向代理服务器&#xff0c;由俄罗斯工程师伊戈尔・赛索耶夫&#xff08;Igor Sysoev&#xff09;于 2004 年开发&#xff0c;最初用于解决当时高并发场景下 Ap…

2025蓝桥杯JAVA编程题练习Day8

1. 路径 题目描述 小蓝学习了最短路径之后特别高兴&#xff0c;他定义了一个特别的图&#xff0c;希望找到图 中的最短路径。 小蓝的图由 2021 个结点组成&#xff0c;依次编号 1 至 2021。 对于两个不同的结点 a, b&#xff0c;如果 a 和 b 的差的绝对值大于 21&#xff0…

【赵渝强老师】Memcached的路由算法

Memcached支持两种不同方式的客户端路由算法&#xff0c;即&#xff1a;求余数Hash算法和一致性Hash算法。下面分别进行介绍。 一、 求余数的路由算法 求余数Hash算法的客户端路由是对插入数据的键进行求余数&#xff0c;根据余数来决定存储到哪个Memcached实例。 视频讲解如…

NLP学习路线图(一): 线性代数(矩阵运算、特征值分解等)

引言&#xff1a;语言与矩阵的奇妙邂逅 在自然语言处理&#xff08;NLP&#xff09;的魔法世界里&#xff0c;每个词语都像被施了变形术的精灵&#xff0c;在数学的殿堂中翩翩起舞。当我们用"king - man woman queen"这样的向量魔法破解语义密码时&#xff0c;线性…

BUUCTF PWN刷题笔记(持续更新!!)

ciscn_2019_c_1 64位&#xff0c;没有开启保护。点进去没发现明显的漏洞函数&#xff0c;考虑泄露libc基地址的rop构造。先看看有多少gadget 估计也够用了。puts函数只接受一个参数&#xff0c;观看汇编看看用的哪个寄存器传输的参数。 用的是edi。但是我们怎么找到so的版本呢…

Java EE初阶——线程安全

1. 线程的状态 1. 线程状态分类&#xff08;Thread.State 枚举&#xff09; Java 定义了 6 种线程状态&#xff0c;这些状态均由 java.lang.Thread.State 枚举表示&#xff1a; NEW&#xff08;新建&#xff09; 线程对象已创建&#xff0c;但尚未调用 start() 方法。此时线程…

Vue 3.0中响应式依赖和更新

响应式依赖和更新是Vue 3.0中最重要的机制&#xff0c;其核心代码如下&#xff0c;本文将结合代码对这个设计机制作出一些解释。 // 全局依赖存储&#xff1a;WeakMap<target, Map<key, Set<effect>>> const targetMap new WeakMap();// 当前活动的副作用函…

一、内存调优

一、内存调优 什么是内存泄漏 监控Java内存的常用工具 内存泄露的常见场景 内存泄露的解决方案 内存泄露与内存溢出的区别 内存泄露&#xff1a;在Java中如果不再使用一个对象&#xff0c;但是该对象依然在GC ROOT的引用链上&#xff0c;这个对象就不会被垃圾回收器回收&…

Linux /etc/rc.d/init.d/

在传统的 SysV init 系统中&#xff0c;服务启动脚本通常位于 /etc/rc.d/init.d/ 目录下。这些脚本可以直接执行以启动、停止或重启服务&#xff0c;并且可以接受参数如 start, stop, status 等。 如果你想知道位于 /etc/rc.d/init.d/ 目录下的某个脚本文件实际上指向哪里,如果…

S7 200 smart连接Profinet转ModbusTCP网关与西门子1200PLC配置案例

控制要求&#xff1a;使用MODBUSTCP通信进行两台PLC之间的数据交换&#xff0c;由于改造现场不能改动程序&#xff0c;只留出了对应的IQ地址。于是客户决定使用网关进行通讯把数据传到plc。 1、读取服务器端40001~40005地址中的数据&#xff0c;放入到VW200~VW208中&#xff1…

打破传统仓库管理困局:WMS如何重构出入库全流程

引言 在制造业与零售业高速发展的今天&#xff0c;仓库管理仍普遍面临效率低、错发漏发频发、库存数据滞后等痛点。人工登记导致30%的错单率&#xff0c;货位混乱让拣货耗时增加50%&#xff0c;而账实不符引发的二次采购成本更吞噬着企业利润。如何突破传统管理桎梏&#xff1…

Text2SQL在Spark NLP中的实现与应用:将自然语言问题转换为SQL查询的技术解析

概述 SQL 仍然是当前行业中最受欢迎的技能之一 免责声明&#xff1a;Spark NLP 中的 Text2SQL 注释器在 v3.x&#xff08;2021 年 3 月&#xff09;中已被弃用&#xff0c;不再使用。如果您想测试该模块&#xff0c;请使用 Spark NLP for Healthcare 的早期版本。 自新千年伊…

微服务项目->在线oj系统(Java版 - 5)

相信自己,终会成功 微服务代码: lyyy-oj: 微服务 目录 C端代码 用户题目接口 修改后用户提交代码(应用版) 用户提交题目判题结果 代码沙箱 1. 代码沙箱的核心功能 2. 常见的代码沙箱实现方式 3. 代码沙箱的关键问题与解决方案 4. 你的代码如何与沙箱交互&#xff1f; …

Vue3 Element Plus 中el-table-column索引使用问题

在 Element Plus 的 el-table 组件中&#xff0c;使用 scope.index 是不准确的。正确的索引属性应该是 scope.$index。你的代码需要调整为&#xff1a; vue 复制 下载 <el-button type"primary" size"default" text click"onModifyClick(scope…