kafka使用心得(一)

kafka入门

一种分布式的、基于发布/订阅的消息系统,scala编写,具备快速、可扩展、可持久化的特点。

基本概念

topic
主题

partition
分区,一个topic下可以有多个partition,消息是分散到多个partition里存储的,partition支持水平扩展。
一个partition内的消息是有序的,partition间的消息则是无序的
每个partition会有若干副本。

broker
一个kafka节点

consumer
消费者,从topic里取得消息。
每个consumer维护自己的offset。
consumer数量小于分区数,会有一个消费者处理多个分区;反之,会有空闲的消费者,造成浪费。

producer
生产者,负责将消息写入topic

特点

基于硬盘的消息保存,避免在producer上累积消息或者消息丢失。
同一个消息可以由多个consumer消费。

可扩展性:随着数据的增加,可扩展为数十台,上百台规模的大集群。扩展可以在集群正常运行的时候进行,对于整个系统的运作没有影响;这也就意味着,对于很多台broker 的集群,如果一台broker 有故障,不影响为client 提供服务.集群如果要同时容忍更多的故障的话, 可以配置更高的replication
factors。

高性能:上面的这些特性使得Apache Kafka 成为一个能够在高负载的情况下表现出优越性能的发布-订阅消息系统。Producer, consumer 和broker 都能在大数据流的情况下轻松的扩展.

kafka的版本

比如kafka_2.10-0.10.2.0
这里,2.10指的是编译kafka的scala版本,真正的kafka版本号是后面的:0.10.2.0

配置kafka

config/server.properties文件里的配置项说明:
broker.id
每个kafka 的broker 都需要有一个整型的唯一标识,这个标识通过broker.id 来设置。默认的情况下,这个数字是0,但是它可以设置成任何值。需要注意的
是,需要保证集群中这个id 是唯一的。这个值是可以任意填写的,并且可以在必要的时候从broker 集群中删除。比较好的做法是使用主机名相关的标识来做
为id,比如,你的主机名当中有数字相关的信息,如hosts1.example.com,host2.example.com,那么这个数字就可以用来作为broker.id 的值。

port
默认启动kafka 时,监听的是TCP 的9092 端口,端口号可以被任意修改。如果端口号设置为小于1024,那么kafka 需要以root 身份启动。但是并不推荐以root 身份启动。

zookeeper.connect
这个参数指定了Zookeeper 所在的地址,它存储了broker 的元信息。默认是运行在本机的2181 端口上,因此这个值被设置成
localhost:2181。这个值可以通过分号设置多个值,每个值的格式都是hostname:port/path

log.dirs
这个参数用于配置Kafka 保存数据的位置,Kafka 中所有的消息都会存在这个目录下。可以通过逗号来指定多个目录,kafka 会根据最少被使用的原则选择目录分配新的partition。注意kafka 在分配partition 的时候选择的规则不是按照磁盘的空间大小来定的,而是分配的parition 的个数大小。

num.recovery.thread.per.data.dir
kafka 可以配置一个线程池,线程池的使用场景如下:

  • 当正常启动的时候,开启每个parition 的文档块
  • 当失败后重启时,检查parition 的文档块
  • 当关闭kafka 的时候,清除关闭文档块
    默认,每个目录只有一个线程。最好是设置多个线程数,这样在服务器启动或者关闭的时候,都可以并行的进行操作。尤其是当非正常停机后,重启时,如果有大量的分区数,那么启动broker 将会花费大量的时间。
    【注意】这个参数是针对每个目录的。比如,num.recovery.threads.per.data.dir 设置为8,如果有3个log.dirs 路径,那么一共会有24 个线程。

num.partitions
这个参数用于配置新创建的topic 有多少个分区,默认是1 个。注意partition 的个数只可以被增加,不能被减少。这就意味着如果想要减少主题的分区数,那
么就需要重新创建topic

在第一章中介绍过,kafka 通过分区来对topic 进行扩展,因此需要使用分区的个数来做负载均衡,如果新增了broker,那么就会引发重新负载分配。这并不意味着所有的主题的分区数都需要大于broker 的数量,因为kafka 是支持多个主题的,其他的主题会使用其余的broker。需要注意的是,如果消息的吞吐量很高,那么可以通过设置一个比较大的分区数,来分摊压力。

log.retention.ms
这个参数用于配置kafka 中消息保存的时间,也可以使用log.retention.hours,默认这个参数是168 个小时,即一周。另外,还支持log.retention.minutes 和log.retention.ms。这三个参数都会控制删除过期数据的时间,推荐还是使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。

log.retention.bytes
这个参数也是用来配置消息过期的,它会应用到每个分区,比如,你有一个主题,有8 个分区,并且设置了log.retention.bytes 为1G,那么这个主题总共可以保留8G 的数据。注意,所有的过期配置都会应用到patition 粒度,而不是主题粒度。这也意味着,如果增加了主题的分区数,那么主题所能保留的数据也就随之增加了。
如果设置了log.retention.bytes 和log.retention.ms(或者其他过期时间的配置),只要满足其中一个条件,消息就会被删除。

log.segment.bytes
这个参数用来控制log 段文件的大小,而不是消息的大小。在kafka 中,所有的消息都会进入broker,然后以追加的方式追加到分区当前最新的segment 段文件中。一旦这个段文件到达log.segment.bytes 设置的大小,比如默认的1G,这个段文件就会被关闭,然后创建一个新的。一旦这个文件被关闭,就可以理解成这个文件已经过期了。这个参数设置的越小,那么关闭文件创建文件的操作就会越频繁,这样也会造成大量的磁盘读写的开销。
通过生产者发送过来的消息的情况可以判断这个值的大小。比如,主题每天接收100M 的消息,并且log.segment.bytes 为默认设置,那么10 天后,这个段文件才会被填满。由于段文件在没有关闭的时候,是不能删除的,log.retention.ms 又是默认的设置,那么这个消息将会在17 天后,才过期删除。因为10 天后,段文件才关闭。再过7 天,这个文件才算真正过期,才能被清除。

message.max.bytes
这个参数用于限制生产者消息的大小,默认是1000000,也就是1M。生产者在发送消息给broker 的时候,如果出错,会尝试重发;但是如果是因为大小的原因,那生产者是不会重发的。另外,broker上的消息可以进行压缩,这个参数是指压缩后的大小,这样能多存储很多消息。
需要注意的是,允许发送更大的消息会对性能有很大影响。更大的消息,就意味着broker 在处理网络连接的时候需要更长的时间,它也会增加磁盘的写操作压力,影响IO 吞吐量。

启动kafka

进入kafka/bin目录。

启动zk

nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &

测试zk是否启动

telnet localhost 2181
输入srvr,应该会返回:

[2017-12-01 15:59:18,829] INFO Processing srvr command from /0:0:0:0:0:0:0:1:40194 (org.apache.zookeeper.server.NIOServerCnxn)
Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0xdf
Mode: standalone
Node count: 127
[2017-12-01 15:59:18,833] INFO Closed socket connection for client /0:0:0:0:0:0:0:1:40194 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
Connection closed by foreign host.

不建议zookeeper运行在多于7个的节点上,因为集群性能会因一致性的特征而降低。

启动kafka

nohup  ./kafka-server-start.sh -daemon ../config/server.properties

以守护进程方式执行。

topic管理命令

创建具有指定数量分区(或复制因子)的topic
./kafka-topics.sh --create --zookeeper xx.xx.xx.xx:2181 --topic test1 --replication-factor 1 --partitions 6

查看topic的元信息
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1

查看所有topic
./kafka-topics.sh --zookeeper localhost:2181 --list

修改已有topic的分区

./kafka-topics.sh --alter --zookeeper localhost:2181 --topic test1 --partitions 4./kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --alter --topic test1 --config flush.ms=5000./kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --alter --topic test1 --config flush.messages=20000

删除topic

./kafka-topics.sh --delete --zookeeper xx.xx.xx.xx:2181 --topic test1

删除topic并不是真的删除,只是打个标记,要真正删除,需要同时修改server.properties:

delete.topic.enable=true

并重启kafka才能生效。

向topic发布消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test1
输入:
msg1
msg2
按^D停止发送。

若kafka的server.properties配置了host.name,则localhost必须改成host.name的值,例如:
./kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic test1

查看topic里的消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
输出:
msg1
msg2
按^C停止接收。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/39156.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

剑指Offer48.最长不含重复字符的子字符串 C++

1、题目描述 请从字符串中找出一个最长的不包含重复字符的子字符串,计算该最长子字符串的长度。 示例 1: 输入: “abcabcbb” 输出: 3 解释: 因为无重复字符的最长子串是 “abc”,所以其长度为 3。 示例 2: 输入: “bbbbb” 输出: 1 解释: 因为无重复字…

图像处理技巧形态学滤波之膨胀操作

1. 引言 欢迎回来,我的图像处理爱好者们!今天,让我们继续研究图像处理领域中的形态学计算。在本篇中,我们将重点介绍腐蚀操作的反向效果膨胀操作。 闲话少说,我们直接开始吧! 2. 膨胀操作原理 膨胀操作…

macOS CLion 使用 bits/stdc++.h

macOS 下 CLion 使用 bits/stdc.h 头文件 terminal运行 brew install gccCLion里配置 -D CMAKE_CXX_COMPILER/usr/local/bin/g-11

Visual Studio 2022 中解决使用scanf报错的方法(一劳永逸)

目录 【前言】 一、scanf报错示例 二、解决使用scanf报错的方法 解决方法1(不推荐) 解决方法2(不推荐) 解决方法3(强烈推荐) 第一步 第二步 第三步 三、效果演示(方法三) …

根据一棵树的两种遍历构造二叉树

题目 给定两个整数数组 preorder 和 inorder ,其中 preorder 是二叉树的先序遍历, inorder 是同一棵树的中序遍历,请构造二叉树并返回其根节点。 示例 1: 输入: preorder [3,9,20,15,7], inorder [9,3,15,20,7] 输出: [3,9,20,null,null,…

Unity-Linux部署WebGL项目MIME类型添加

在以往的文章中有提到过使用IIS部署WebGL添加MIME类型使WebGL项目在浏览器中能够正常加载,那么如果咱们做的是商业项目,往往是需要部署在学校或者云服务器上面的,大部分情况下如果项目有接口或者后台管理系统,后台基本都会使用Lin…

机器学习笔记:李宏毅ChatGPT Finetune VS Prompt

1 两种大语言模型:GPT VS BERT 2 对于大语言模型的两种不同期待 2.1 “专才” 2.1.1 成为专才的好处 Is ChatGPT A Good Translator? A Preliminary Study 2023 Arxiv 箭头方向指的是从哪个方向往哪个方向翻译 表格里面的数值越大表示翻译的越好 可以发现专门做翻…

Ceph入门到精通-Linux下Ceph源码编译和GDB调试

Ceph版本:14.2.22 Linux版本:ubuntu-server 18.04 第一部分 下载Ceph源码 1.1 配置Ceph源码镜像源 Ceph源码是托管在Github上,由于某些原因,国内访问Github网站很慢,所以需要从其他途径加速获取源码。Github官方给出…

【ubuntu18.04】01-network-manager-all.yaml和interfaces和resolv.conf各有什么区别和联系

文章目录 01-network-manager-all.yaml、interfaces 和 resolv.conf 是与网络配置相关的文件,它们在网络设置中有着不同的作用和使用方式。 01-network-manager-all.yaml: 这是一个配置文件,通常在 Ubuntu 系统上使用 NetworkManager 进行网络管理时使用…

ChatGPT​保密吗?它有哪些潜在风险?如何规避?

自2022年11月公开发布以来,ChatGPT已成为许多企业和个人的必备工具,但随着该技术越来越多地融入我们的日常生活,人们很自然地想知道:ChatGPT是否是保密的。 问:ChatGPT保密吗? 答:否&#xff0…

C++11并发与多线程笔记(3)线程传参详解,detach()大坑,成员函数做线程函数

C11并发与多线程笔记(3)线程传参详解,detach 大坑,成员函数做线程函数 1、传递临时对象作为线程参数1.1 要避免的陷阱11.2 要避免的陷阱21.3 总结 2、临时对象作为线程参数2.1 线程id概念2.2 临时对象构造时机抓捕 3、传递类对象…

VR时代真的到来了?

业界对苹果的期待是,打造一台真正颠覆性的,给头显设备奠定发展逻辑底座的产品,而实际上,苹果只是发布了一台更强大的头显。 大众希望苹果回答的问题是“我为什么需要一台AR或者VR产品?”,但苹果回答的是“…

从零开始学习 Java:简单易懂的入门指南之MAth、System(十二)

常见API,MAth、System 1 Math类1.1 概述1.2 常见方法1.3 算法小题(质数)1.4 算法小题(自幂数) 2 System类2.1 概述2.2 常见方法 1 Math类 1.1 概述 tips:了解内容 查看API文档,我们可以看到API文档中关于Math类的定义如下: Math类…

每天一道leetcode:300. 最长递增子序列(动态规划中等)

今日份题目: 给你一个整数数组 nums ,找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列,删除(或不删除)数组中的元素而不改变其余元素的顺序。例如,[3,6,2,7] 是数组 [0,3,1,6,2,2,7] …

【JavaEE进阶】SpringBoot项目的创建

文章目录 一. SpringBoot简介1. 什么是SpringBoot?2. SpringBoot的优点 二. SpringBoot项目创建1. 使用IDEA创建2. 使用网页创建SpringBoot项目 三. 运行SpringBoot项目 一. SpringBoot简介 1. 什么是SpringBoot? Spring Boot 是一个用于快速构建基于 Spring 框架的应用程序…

Spring对象装配

在spring中,Bean的执行流程为启动spring容器,实例化bean,将bean注册到spring容器中,将bean装配到需要的类中。 既然我们需要将bea装配到需要的类中,那么如何实现呢?这篇文章,将来阐述一下如何实…

SOFABoot——基本使用(笔记)

文章目录 一、前言二、快速开始2.1 基本搭建2.2 测试是否成功2.3 其他部分日志测试异步启动 三、SOFABoot的模块化开发3.1 基于Spring上下文的隔离3.2 Root Application Context3.3 模块并行化启动3.4 JVM服务与RPC服务的发布与引用3.5 模块配置Module-NameRequire-ModuleSprin…

wsl2安装mysql环境

安装完mysql后通过如下命令启动mysql service mysql start 会显示如下错误: mysql: unrecognized service 实际上上面显示的错误是由于mysql没有启动成功造成的 我们要想办法成功启动mysql才可以 1.通过如下操作就可以跳过密码直接进入mysql环境 2.如果想找到my…

微服务与Nacos概述-5

引入OpenFeign 添加依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>com.alibaba.cloud</groupId>…

“记账”很麻烦,看这场竞赛中的队伍与合合信息是如何解决问题的

在我们日常生活中或多或少都会有记账的情况&#xff0c;以此来对自己的收支和消费习惯进行分析&#xff0c;来帮助自己减少不必要的开支&#xff0c;优化财务决策、合理分配资金&#xff0c;减少财务压力和不必要的浪费。 但记账这个动作本身就是一件比较麻烦的。虽然现阶段有…