Java程序创建Kafka Topic,以及数据生产消费,常用的命令

转自: Java程序创建Kafka Topic,以及数据生产消费,常用的命令_Zyy_z_的博客-CSDN博客_java kafka创建topicKafka简介: Kafka是一个分布式发布——订阅消息传递系统。Kafka快速、可扩展且耐用。它保留主题中的消息源。生产者将数据写入主题,消费者从主题中读取数据。Kafka的特点: 1. 同时为分布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万条消息(50MB),每秒处理55万条消息...https://blog.csdn.net/Zyy_z_/article/details/101680138


【1】Kafka简介

Kafka是一个分布式发布——订阅消息传递系统。Kafka快速、可扩展且耐用。它保留主题中的消息源。生产者将数据写入主题,消费者从主题中读取数据。

1)Kafka的特点:

  • 1. 同时为分布和订阅提供高吞吐量。 据了解,Kafka每秒可以生产约25万条消息(50MB),每秒处理55万条消息(110MB)这里说条数,可能不上特别准确,因为消息的大小可能不一致;
  • 2. 可进行持久化操作,将消息持久化到到磁盘,以日志的形式存储,因此可用于批量消费,例如ETL,以及实时应用程序。 通过将数据持久化到硬盘以及replication防止数据丢失。
  • 3. 分布式系统,易于向外拓展。所有的Producer、broker和consumer都会有多个,均为分布式。无需停机即可拓展 机器。
  • 4. 消息被处理的状态是在consumer端维护,而不是由server端维护,当失败时能自动平衡。

2)Kafka名词解释:

  1. producer:消息的生成者
  2. consumer:消息的消费者
  3. topic:你把它理解为标签
  4. broker:Kafka处理资源的消息源(feeds of messages)的不同分类

3)Kafka常用命令:

  1. 创建主题(4个分区,2个副本):  kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
  2. 查询所有Topic:kafka-topics.sh --zookeeper localhost:2181 --list
  3. 查看指定得Topic:kafka-topics.sh --zookeeper localhost:2181 --describe --topic t_cdr
  4. 删除Topic:kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic t_cdr
  5. 生产者 :kafka-console-producer.sh --broker-list localhost:9092 --topic test
  6. 消费者 : kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
  7. 新生产者(支持0.9版本+):  kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
  8. 新消费者(支持0.9版本+):  kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

【2】kafka java api

【2.1】Java程序操作创建Topic:   

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); props.put("retries", 0);
props.put("batch.size", 16384); props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
AdminClient  create = KafkaAdminClient.create(props);//创建Topic
create.createTopics(Lists.newArrayList(new NewTopic("Topic名称"),1,(short)1));//一个分区
create.close();//关闭

其他创建Topic得方式Java API:

https://blog.csdn.net/meng984611383/article/details/80500761icon-default.png?t=LA46https://blog.csdn.net/meng984611383/article/details/80500761


【2.2】Kafka生产数据:

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++) //生产数据producer.send(new ProducerRecord<String, String>("Topic名称", Integer.toString(i), Integer.toString(i)));
producer.close(); //关闭

【2.3】消费数据

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。

send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。

  1. ack是判别请求是否为完整的条件(就是是判断是不是成功发送了)。我们指定了“all”将会阻塞消息,这种设置性能最低,但是是最可靠的。
  2. retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
  3. producer(生产者)缓存每个分区未发送的消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
  4. 默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
  5. buffer.memory 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
  6. key.serializer和value.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerializaer或StringSerializer处理简单的string或byte类型。

 
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329840.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++ 凸包 分治算法_三维凸包

缘起众所周知&#xff0c;二维凸包可以使用 Graham 扫描 内解决.所以本文来学习一下三维空间中凸包的一种直观算法——增量算法(increment algorithm)分析有一条叫 Willy 的苹果虫一直快乐的居住在一个苹果中&#xff0c;直到有一天有一只仓鼠想吃这个苹果&#xff0c;Willy 自…

深度分析Java的ClassLoader机制(源码级别)

转载自 深度分析Java的ClassLoader机制&#xff08;源码级别&#xff09;Java中的所有类&#xff0c;必须被装载到jvm中才能运行&#xff0c;这个装载工作是由jvm中的类装载器完成的&#xff0c;类装载器所做的工作实质是把类文件从硬盘读取到内存中&#xff0c;JVM在加载类的时…

.NET跨平台之Sake和KoreBuild

最近在了解Asp.net Core 1.0&#xff08;也可称为Asp.net5\Asp.net vNext)的跨平台&#xff0c;发现了两个新的新东西&#xff1a;Sake和KoreBuild&#xff08;或者已经出了很久&#xff09;。 通过国内某度查询资料大部分都是复制黏贴来的&#xff0c;几乎没有详细的介绍。 只…

Ubuntu系统安装准备

前言 最初学习程序开发时&#xff0c;大多数都会教linux系统的基础用法&#xff0c;但是很少以linux为操作系统开发程序&#xff0c;因为windows系统的简单便利为人的提供优秀的操作基础&#xff0c;成为首选系统。 突然心血来找想使用linux操作系统作基础进行程序代码开发。 …

(转)Kafka 消费者 Java 实现

转自&#xff1a; Kafka 消费者 Java 实现 - 简书应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息&#xff0c;首先理解 Kafka 中消费者&#xff08;consumer&#xff09;和消费者组&#xff08;consumer group...https://www.jianshu.com/p/1f9e18e926f6据原文作者&…

docker export_docker使用简介

一 docker服务端和客户端declare -x DOCKER_HOST"192.168.2.162export DOCKER_HOST192.168.2.162docker -H tcp://192.168.2.162:2375 images二 docker daemon配置cat /etc/sysconfig/dockerOPTIONS--selinux-enabled --log-driverjournald --signature-verificationfalse…

深度解析单例与序列化之间的爱恨情仇

转载自 深度解析单例与序列化之间的爱恨情仇本文将通过实例阅读Java源码的方式介绍序列化是如何破坏单例模式的&#xff0c;以及如何避免序列化对单例的破坏。单例模式&#xff0c;是设计模式中最简单的一种。通过单例模式可以保证系统中一个类只有一个实例而且该实例易于外界访…

Windows 10中国定制版完工!更专业

去年底&#xff0c;微软与中国电子科技网络信息安全有限公司成立了一个合资公司&#xff0c;该公司的主要任务是为中国政府和企业定制 Windows 10 系统。 现在&#xff0c;微软大中华区董事长兼CEO贺乐赋在接受财新网记者采访时表示&#xff0c;微软与中国电子科技集团公司&…

Spring [CVE-2022-22965]漏洞处理

问题描述 近期spring官方公布了漏洞 - [CVE-2022-22965] 参考地址&#xff1a; https://tanzu.vmware.com/security/cve-2022-22965 参考issues提到的问题答案开发人员回应&#xff1a; 可能是由于Springframework 3.x 早于 JDK9发布&#xff0c;甚至发布时还未完整的支持JDK…

kafak消费者从头开始消费(消费者组)

【README】 本文主要用于描述 kafka 消费者如何从头开始消费&#xff1b; 【1】从头开始消费 1&#xff09;从头开始消费&#xff0c;需要满足两个条件&#xff0c; 如下&#xff1a; 条件1&#xff0c; 使用一个全新的消费者组id&#xff1b;条件2&#xff0c;指定 auto.of…

jenkins安装与配置windows_Windows下Scoop安装、配置与使用

Scoop简介Scoop是Windows的命令行安装程序&#xff0c;是一个强大的包管理工具。可以在github上找到其项目的相关信息&#xff0c;项目网址。安装的起因&#xff1a;在平常生活中如果要安装像gcc、git等一些需要手动配置相关参数的工具&#xff0c;需要先去官网下载安装程序&am…

对于线程安全的集合类(例如Vector)的任何操作是不是都能保证线程安全

转载自 对于线程安全的集合类&#xff08;例如Vector&#xff09;的任何操作是不是都能保证线程安全之前在公众号中问了这个问题&#xff1a;对于线程安全的集合类&#xff08;例如Vector&#xff09;的任何操作是不是都能保证线程安全&#xff1f; 三天之内收到120回复&#x…

ASP.NET Core 1.0中的管道-中间件模式

ASP.NET Core 1.0借鉴了Katana项目的管道设计(Pipeline)。日志记录、用户认证、MVC等模块都以中间件(Middleware)的方式注册在管道中。显而易见这样的设计非常松耦合并且非常灵活&#xff0c;你可以自己定义任意功能的Middleware注册在管道中。这一设计非常适用于“请求-响应”…

怎么样安装Ubuntu系统,一文告诉你

前言 额滴神呐/(ㄒoㄒ)/~~&#xff0c;用惯了windows开发&#xff0c;初上手Linux桌面开发真的是举步维艰&#xff08;内心ps&#xff1a;谁让你立这个标题的&#xff0c;现在后悔了吧… 你自己想办法 怎么把这个标题栏目圆过去&#xff09; 经过跟内心戏反复的都在&#xff0…

(转) SpringBoot接入两套kafka集群

转自&#xff1a; SpringBoot接入两套kafka集群 - 风小雅 - 博客园引入依赖 compile org.springframework.kafka:spring-kafka 第一套kafka配置 package myapp.kafka; importhttps://www.cnblogs.com/ylty/p/13673357.html 引入依赖 compile org.springframework.kafka:spring…

idea tomcat部署web项目_项目开发之部署帆软到Tomcat服务一

书接上回上一篇文章介绍了两种图表取数的方式&#xff0c;新增数据库查询和通过存储过程取数&#xff0c;其他的内置数据集&#xff0c;文件数据集和关联数据集等方式暂时还没有用到&#xff0c;先暂时不介绍了&#xff0c;等之后用到了或者等小编有时间试过之后再来做个简单的…

C#工业物联网和集成系统解决方案的技术路线

前言 2000年以后&#xff0c;互联网在中国的大地上如火如荼的发展&#xff0c;在这个行业竞争中比的是加速度。我清晰的记得《世界是平的》中有这样一段话&#xff1a;在非洲&#xff0c;羚羊每天早上醒来时&#xff0c;它知道自己必须跑得比最快的狮子还快&#xff0c;否则就会…

转:Kafka事务使用和编程示例/实例

Kafka事务使用和编程示例/实例_JobShow裁员加班实况-微信小程序-CSDN博客一、概述​ Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中&#xff0c;或者说是一个原子操作&#xff0c;生产消息和提交偏移量同时成功或者失败。注意&#xff1a;kafk…

[初级]Java中的switch对整型、字符型、字符串的具体实现细节

转载自 [初级]Java中的switch对整型、字符型、字符串的具体实现细节Java 7中&#xff0c;switch的参数可以是String类型了&#xff0c;这对我们来说是一个很方便的改进。到目前为止switch支持这样几种数据类型&#xff1a;byteshort int char String 。但是&#xff0c;作为一个…

SpringBoot-Cache整合redis

前言 SpringBoot的众多Starter有两个很重要的缓存Starter&#xff0c;其中一个是我们经常用到的Redis&#xff08;spring-boot-starter-data-redis&#xff09;还有一个是 spring-boot-starter-cache。 今天主要是简单介绍一个如何整合这两个组件&#xff0c;达到相互合作的关系…