kafka基本概念及操作

kafka介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的 (replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、 Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

1.使用场景

  • 日志收集:可以用Kafka收集各种服务的log,通过kafka以统⼀接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

  • 消息系统:解耦和生产者和消费者、缓存消息等。

  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、 搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

2.kafka基本概念

image-20230811105243277

整个流程应该是:producer通过网络发送消息到Kafka集群,然后consumer 来进行消费,如下图:

image-20230811105555825

服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。

kafka基本使用

1.安装&关闭

以下所有操作全部基于kafka_2.13-3.0.1.tgz (3.0.1版本) 这个版本

配置文件server.properties(主要修改以下配置)

#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
listeners=PLAINTEXT://xx.xx.xx.xx(服务器内网IP地址):9092
advertised.listeners=PLAINTEXT://xx.xx.xx.xx(服务器对外IP地址):9092
#kafka的消息存储⽂件
log.dir=/usr/local/kafka/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181

启动

./kafka-server-start.sh -daemon ../config/server.properties

验证

# 查看端口是否占用
netstat -ntlp 

或者

进入到zk内查看是否有kafka的节点:/brokers/ids/0

./zkCli.sh

image-20230814134630858

关闭kafka

./kafka-server-stop.sh stop ../config/server.properties

2.创建topic

执行以下命令创建名为“test”的topic,这个topic只有⼀个partition,并且备份因子也设置为 1

./kafka-topics.sh --bootstrap-server kafkahost:9092 --create --topic test --partitions 1 --replication-factor 1-- 新版本的kafka,已经不需要依赖zookeeper来创建topic,新版的kafka创建topic指令如下:
./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic test --partitions 1 --replication-factor 1

3.查看kafka中所有的主题

./kafka-topics.sh --bootstrap-server kafkahost:9092 --list./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --list

4.发送消息

kafka自带了⼀个producer命令客户端,可以从本地文件中读取内容或者以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每⼀个行会被当做成⼀个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

把消息发送给broker中的某个topic,打开⼀个kafka发送消息的客户端,然后开始用客户端向kafka服务器发送消息

./kafka-console-producer.sh --bootstrap-server 124.222.253.33:9092 --topic test

5.消费消息

消费消息两种方式

对于consumer,kafka同样也携带了⼀个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。 使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息

  1. 从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费

    ./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --topic test
    
  2. 从当前主题中的第⼀条消息开始消费

    ./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --from-beginning --topic test
    

6.消息的细节

image-20230811094410650

  • 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时可以通过offset来描述当前要消费的那条消息的位置

7.单播&多播消息

单播还是多播消息取决于topic有多少消费组

1)单播

如果多个消费者在同⼀个消费组,那么只有⼀个消费者可以收到订阅的topic中的消息。(同⼀个消费组中只能有⼀个消费者收到订阅topic中的消息。)

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup --topic test

2)多播

不同的消费组订阅同⼀个topic,那么不同的消费组中只有⼀个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同⼀个消息。

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup2 --topic test

3)区别

image-20230811111854111

8.查看消费组详细信息

# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 124.222.253.33:9092 --list# 查看消费组中的具体信息:⽐如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 124.222.253.33:9092 --describe --group testGroup

image-20230811113410786

  • Currennt-offset:当前消费组的已消费偏移量(最后被消费的消息的偏移量)
  • Log-end-offset:主题对应分区消息的结束偏移量(HW) 【消息总量,最后一条消息偏移量】
  • Lag:当前消费组未消费的消息数(积压消息量)

Kafka中主题和分区的概念

1.主题

主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被订阅该topic的消费者消费。

但是有⼀个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的。为了解决单个文件过大的问题,kafka提出了Partition分区的概念。

2.分区

1)分区的概念

通过partition将⼀个topic中的消息分区来存储。

这样的好处有多个:

  • 分区存储,可以解决统⼀存储文件过大的问题
  • 提高了读写的吞吐量:读和写可以同时在多个分区中进行

image-20230813215613707

2)创建多分区的主题

./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic test1 --partitions 2 --replication-factor 1

3.kafka中消息日志文件中保存的内容

  • 00000.log: 这个文件中保存的就是消息

  • __consumer_offsets-49:

    kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区(可以通过offsets.topic.num.partitions设置)。

    • 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数
    • 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前offset的值
  • 文件中保存的消息,默认保存7天。七天到后消息会被删除,最后就保留最新的那条数据。

Kafka集群操作

1.搭建kafka集群(三个broker)

  • 创建三个server.properties文件
# 0 1 2
broker.id=2
# 9092 9093 9094
listeners=PLAINTEXT://xx.xx.xx.xx(服务器内网IP地址):9094
advertised.listeners=PLAINTEXT://xx.xx.xx.xx(服务器对外IP地址):9094
# kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2
  • 通过命令来启动三台broker
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
  • 校验是否启动成功

进入到zk中查看/brokers/ids中过是否有三个znode(0,1,2)

2.副本的概念

在创建主题时,除了指明了主题的分区数以外,还指明了副本数 replication-factor参数

如下主题,创建了两分区、三副本(副本对应集群中broker数量)

./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic my-replicated-topic --partitions 2 --replication-factor 3

副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有⼀个副本作为leader,其他是follower。

查看topic情况:

# 查看topic情况
./kafka-topics.sh --describe --bootstrap-server 124.222.253.33:9092 --topic my-replicated-topic

image-20230814105758165

image-20230814105921066

  • leader:

kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产⽣⼀个新的leader(follower通过poll的方式来同步数据)

  • follower:

接收leader的同步的数据,leader挂了,参与leader选举

  • replicas:

当前副本存在的broker节点

  • isr:

可以同步和已同步的broker节点会被存入到isr集合中。如果isr中的broker节点性能较差,会被踢出isr集合。

3.broker、主题、分区、副本

综上broker、主题、分区、副本概念已全部展示:

集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker⾥。

4.kafka集群消息的发送

./kafka-console-producer.sh --broker-list 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --topic my-replicated-topic

5.kafka集群消息的消费

1)普通消费

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --from-beginning --topic my-replicated-topic

2)指定消费组消费

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

6.分区分消费组的集群消费中的细节

image-20230814115045080

  • ⼀个partition只能被⼀个消费组中的⼀个消费者消费,目的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺序性呢?(Kafka只在partition的范围内保证消息消费的局部顺序性不能在同⼀个topic中的多个partition中保证总的消费顺序性。 ⼀个消费者可以消费多个partition。)

  • partition的数量决定了消费组中消费者的数量,建议同⼀个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息

  • 如果消费者挂了,那么会触发rebalance机制,会让其他消费者来消费该分区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/37386.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LeetCode】242 . 有效的字母异位词

242 . 有效的字母异位词(简单) 方法:哈希表 思路 首先判断两个字符串长度是否相等,不相等直接返回 false;接下来设置一个长度为26 的哈希表,分别对应26个小写字母;遍历两个字符串,…

Go语言工程实践之测试与Gin项目实践

Go 语言并发编程 及 进阶与依赖管理_软工菜鸡的博客-CSDN博客 03 测试 回归测试一般是QA(质量保证)同学手动通过终端回归一些固定的主流程场景 集成测试是对系统功能维度做测试验证,通过服务暴露的某个接口,进行自动化测试 而单元测试开发阶段,开发者对单独的函数…

day-21 代码随想录算法训练营(19)二叉树part07

530.二叉搜索树的最小绝对差 思路一:二叉搜索树的中序遍历必为升序数组,加入数组后计算相邻两个数差值,即可求出最小绝对差 思路二:同样的思路,中序遍历,直接使用指针记录上一个节点,同时更新…

KAFKA第二课之生产者(面试重点)

生产者学习 1.1 生产者消息发送流程 在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到K…

03-基础入门-搭建安全拓展

基础入门-搭建安全拓展 1、涉及的知识点2、常见的问题3、web权限的设置4、演示案例-环境搭建(1)PHPinfo(2)wordpress(3)win7虚拟机上使用iis搭建网站(4)Windows Server 2003配置WEB站…

C#应用处理传入参数 - 开源研究系列文章

今天介绍关于C#的程序传入参数的处理例子。 程序的传入参数应用比较普遍,特别是一个随操作系统启动的程序,需要设置程序启动的时候不显示主窗体,而是在后台运行,于是就有了传入参数问题,比如传入/h或者/min等等。所以此…

YOLO v8目标跟踪详细解读(二)

上一篇,结合代码,我们详细的介绍了YOLOV8目标跟踪的Pipeline。大家应该对跟踪的流程有了大致的了解,下面我们将对跟踪中出现的卡尔曼滤波进行解读。 1.卡尔曼滤波器介绍 卡尔曼滤波(kalman Filtering)是一种利用线性…

欧拉OS 使用 CentOS 7 yum repo

一、下载CentOS的repo的yum文件 任何基于CentOS的yum的repo 的url是这样的: 但欧拉OS输出这个变量为:openEuler 20.03 (LTS-SP3) 那明显欧拉想要使用这个yum的url找不到这个版本, 所以直接讲这个变量替换为 7, Centos 7的7 然后执行&…

wget 详解

wget 详解 wget 详解基本用法:命令参数:递归下载:断点续传:限速下载:后台下载: 示例 wget 详解 wget(Web Get)是一个用于从网络上下载文件的命令行工具,常用于在 Linux …

从零实战SLAM-第七课(多视角几何)

在七月算法报的班,老师讲的蛮好。好记性不如烂笔头,关键内容还是记录一下吧,课程入口,感兴趣的同学可以学习一下。 --------------------------------------------------------------------------------------------------------…

整型int溢出引起的crash

线上系统发生了crash&#xff0c;后发现是整型溢出。 1、初始化函数的伪代码&#xff1a; init_mem(int count, int size){for(int i0; i<count; i)mem_list[i] i*size; # 溢出发生的地方} 2、问题分析&#xff1a; 原有的变量 i、size 为有符号的int类型&#xff0c;i…

设计模式--策略模式

目录 一.场景 1.1场景 2.2 何时使用 2.3个人理解 二. 业务场景练习 2.1业务: 2.2具体实现 2.3思路 三.总结 3.1策略模式的特点&#xff1a; 3.2策略模式优点 3.3策略模式缺点 一.场景 1.1场景 许多相关的类仅仅是行为有异&#xff0c;也就是说业务代码需要根据场景不…

Android数字价格变化的动画效果的简单实现

原理&#xff1a;使用ValueAnimator属性动画类实现&#xff0c;它通过值的改变手动设置对象的属性值来实现动画效果。直接贴代码&#xff1a; public static void doNumberAnim(TextView tvPrice, float startNumber, float endNumber) {ValueAnimator animator ValueAnimato…

C语言中的 RSA加密和解密算法: 深度探索与实现

C语言中的 RSA加密和解密算法: 深度探索与实现 RSA加密算法是一种非对称加密算法&#xff0c;即公开密钥加密&#xff0c;私有密钥解密。在公开密钥加密和私有密钥解密的过程中&#xff0c;密钥是不同的&#xff0c;这是与其他加密算法的主要区别。RSA算法的安全性依赖于大数分…

ssm+mybatis无法给带有下划线属性赋值问题

原因&#xff1a;mybaitis根据配置&#xff0c;将有下划线的字段名改为了驼峰格式。 具体见&#xff1a;ssmmybatis无法给带有下划线属性赋值问题&#xff0c;无法获取数据库带下划线的字段值 - 开发者博客 解决方式&#xff1a; 直接将实体类中的下划线去掉返回值使用resul…

归并排序 与 计数排序

目录 1.归并排序 1.1 递归实现归并排序&#xff1a; 1.2 非递归实现归并排序 1.3 归并排序的特性总结: 1.4 外部排序 2.计数排序 2.1 操作步骤: 2.2 计数排序的特性总结: 3. 7种常见比较排序比较 1.归并排序 基本思想: 归并排序(MERGE-SORT)是建立在归并操作上的一种…

代理技术在网络安全、爬虫和数据隐私中的多重应用

1. Socks5代理&#xff1a;灵活的数据中转 Socks5代理协议在网络通信中起着关键作用。与其他代理技术不同&#xff0c;Socks5代理不仅支持TCP连接&#xff0c;还能够处理UDP流量&#xff0c;使其在需要实时数据传输的场景中表现尤为出色。通过将请求和响应中转到代理服务器&am…

redis分布式集群-redis+keepalived+ haproxy

redis分布式集群架构&#xff08;RedisKeepalivedHaproxy&#xff09;至少需要3台服务器、6个节点&#xff0c;一台服务器2个节点。 redis分布式集群架构中的每台服务器都使用六个端口来实现多路复用&#xff0c;最终实现主从热备、负载均衡、秒级切换的目标。 redis分布式集…

使用Edge和chrom扩展工具(GoFullPage)实现整页面截图或生成PDF文件

插件GoFullPage下载&#xff1a;点击免费下载 如果在浏览网页时&#xff0c;有需要整个页面截图或导出PDF文件的需求&#xff0c;这里分享一个Edge浏览器的扩展插件&#xff1a;GoFullPage。 这个工具可以一键实现页面从上到下滚动并截取。 一、打开“管理扩展”&#xff08;…

网络设备(防火墙、路由器、交换机)日志分析监控

外围网络设备&#xff08;如防火墙、路由器、交换机等&#xff09;是关键组件&#xff0c;因为它们控制进出公司网络的流量。因此&#xff0c;监视这些设备的活动有助于 IT 管理员解决操作问题&#xff0c;并保护网络免受攻击者的攻击。通过收集和分析这些设备的日志来监控这些…