使用Linux部署Kafka教程

目录

一、部署Zookeeper

1 拉取Zookeeper镜像

2 运行Zookeeper

二、部署Kafka

1 拉取Kafka镜像

2 运行Kafka

三、验证是否部署成功

1 进入到kafka容器中

2 创建topic 生产者

3 生产者发送消息

4 消费者消费消息

四、搭建kafka管理平台

五、SpringBoot整合Kafka 

1、导入依赖

2、修改配置

3、生产者

 4、消费者

5、测试发送消息

 6、测试收到消息


一、部署Zookeeper

1 拉取Zookeeper镜像

docker pull wurstmeister/zookeeper
  • 1

2 运行Zookeeper

docker run --restart=always --name zookeeper \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2  \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
-d wurstmeister/zookeeper

二、部署Kafka

1 拉取Kafka镜像

docker pull wurstmeister/kafka

2 运行Kafka

docker run --restart=always --name kafka \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \-p 9092:9092 \-e KAFKA_BROKER_ID=0 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.8.102:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.8.102:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-v /etc/localtime:/etc/localtime \-d wurstmeister/kafka

参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka 配置zookeeper管理kafka的路径172.16.0.13:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

三、验证是否部署成功

1 进入到kafka容器中

docker exec -it kafka /bin/sh

2 创建topic 生产者

cd opt/kafka_2.13-2.8.1bin/kafka-topics.sh --create --zookeeper 192.168.8.102:2181 --replication-factor 1 --partitions 1 --topic partopic

在这里插入图片描述

3 生产者发送消息

bin/kafka-console-producer.sh --broker-list 192.168.8.102:9092 --topic partopic

在这里插入图片描述

4 消费者消费消息

  • 新打开个ssh窗口
  • 跟前面步骤一样进入到容器
bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.102:9092 --topic partopic --from-beginning

在这里插入图片描述

四、搭建kafka管理平台

 docker search kafdrop

docker run -d --rm  -p 9000:9000 \-e JVM_OPTS="-Xms32M -Xmx64M" \-e KAFKA_BROKERCONNECT=<host:port,host:port> \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop<host:port,host:port> 为 外网集群地址 多个用逗号分隔 例如xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy.yyy:9092 尖角号不留上面的命令是百度的以下是我自己尝试的
docker run -d --name kafdrop -p 9001:9001 \-e JVM_OPTS="-Xms32M -Xmx64M -Dserver.port=9001" \-e KAFKA_BROKERCONNECT=192.168.58.130:9092 \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop因为我docker启动了其他东西占用了9001端口,而这个kafdrop其实就是一个springboot项目,以jar命令的形式启动

访问地址:Kafdrop: Broker List 

五、SpringBoot整合Kafka 

1、导入依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2、修改配置

spring:kafka:bootstrap-servers: 192.168.58.130:9092 #部署linux的kafka的ip地址和端口号producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

本次测试:linux地址:192.168.58.130

spring.kafka.bootstrap-servers=192.168.58.130:9092

advertised.listeners=192.168.58.130:9092

3、生产者

import com.alibaba.fastjson.JSON;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 事件的生产者*/
@Slf4j
@Component
public class KafkaProducer {@Autowiredpublic KafkaTemplate kafkaTemplate;/** 主题 */public static final String TOPIC_TEST = "Test";/** 消费者组 */public static final String TOPIC_GROUP = "test-consumer-group";public void send(Object obj){String obj2String = JSON.toJSONString(obj);log.info("准备发送消息为:{}",obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);//回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {//发送失败的处理log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {//成功的处理log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + result.toString());}});}}

 4、消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;/*** 事件消费者*/
@Component
public class KafkaConsumer {private Logger logger = LoggerFactory.getLogger(org.apache.kafka.clients.consumer.KafkaConsumer.class);@KafkaListener(topics = KafkaProducer.TOPIC_TEST,groupId = KafkaProducer.TOPIC_GROUP)public void topicTest(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<?> message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}
}

5、测试发送消息

@Testvoid kafkaTest(){kafkaProducer.send("Hello Kafka");}

 6、测试收到消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/54327.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT(C++)-QTreeview节点折叠与展开

文章目录 1、前言2、QTreeview全部展开与折叠3、QTreeview某个节点展开与折叠3.1 节点折叠与展开的信号与槽3.2 槽函数的实现3.3 某个节点展开与折叠 1、前言 最近要用QT开发项目&#xff0c;对QT不是很熟&#xff0c;就根据网上的查到的知识和自己的摸索&#xff0c;将一些经…

大彩串口屏使用记录

写在最前面 屏幕型号 DC10600M070 IDE VisualTFT&#xff08;官方&#xff09; VSCode&#xff08;lua编程&#xff09; 用之前看一下官方那个1小时的视频教程就大概懂控件怎么用了&#xff0c;用官方的软件VisualTFT很简单 本文只是简单记录遇到的一些坑 lua编辑器 VisualTF…

内嵌功能强大、低功耗STM32WB55CEU7、STM32WB55CGU7 射频微控制器 - MCU, 48-UFQFN

一、概述&#xff1a; STM32WB55xx多协议无线和超低功耗器件内嵌功能强大的超低功耗无线电模块&#xff08;符合蓝牙 低功耗SIG规范5.0和IEEE 802.15.4-2011标准&#xff09;。该器件内含专用的Arm Cortex -M0&#xff0c;用于执行所有的底层实时操作。这些器件基于高性能Arm …

铝合金表面处理方法调研总结

近期产品外壳的表面处理工艺造成的电磁兼容和应用领域受限的问题引起我们研发小组的注意&#xff0c;故而对铝合金外壳的表面处理工艺进行一次调研&#xff0c;通过调研学习帮助后续产品外壳加工工艺的选择和产品性能改进。 铝合金虽然不会生锈&#xff0c;但会在空气中与氧气发…

XLua框架使用

一、XLua集成第三方C库 1、XLua集成RapidJson与protobuf&#xff1a;跳转链接 2、XLua常用库集成&#xff08;lua-protobuf、LuaSocket、RapidJson、LPeg&#xff09;&#xff1a;跳转链接 3、集成第三方库常遇到的问题 A、mac上sh编译脚本运行一次后要关闭命令行窗口&#…

TensorFlow中slim包的具体用法

TensorFlow中slim包的具体用法 1、训练脚本文件&#xff08;该文件包含数据下载打包、模型训练&#xff0c;模型评估流程&#xff09;3、模型训练1、数据集相关模块&#xff1a;2、设置网络模型模块3、数据预处理模块4、定义损失loss5、定义优化器模块 本次使用的TensorFlow版本…

Redis五大数据类型

Redis五大数据类型 Redis-Key 官网&#xff1a;https://www.redis.net.cn/order/ 序号命令语法描述1DEL key该命令用于在 key 存在时删除 key2DUMP key序列化给定 key &#xff0c;并返回被序列化的值3EXISTS key检查给定 key 是否存在&#xff0c;存在返回1&#xff0c;否则返…

yolov8热力图可视化

安装pytorch_grad_cam pip install grad-cam自动化生成不同层的bash脚本 # 循环10次&#xff0c;将i的值从0到9 for i in $(seq 0 13) doecho "Running iteration $i";python yolov8_heatmap.py $i; done热力图生成python代码 import warnings warnings.filterwarn…

vscode流程图插件使用

vscode流程图插件使用 1.在vscode中点击左下角设置然后选择扩展。 2.在扩展中搜索Draw.io Integration&#xff0c;安装上面第一个插件。 3.安装插件后在工程中创建一个后缀为drawio的文件并且双击打开即可绘制流程图

2023-08-26 LeetCode每日一题(汇总区间)

2023-08-26每日一题 一、题目编号 228. 汇总区间二、题目链接 点击跳转到题目位置 三、题目描述 给定一个 无重复元素 的 有序 整数数组 nums 。 返回 恰好覆盖数组中所有数字 的 最小有序 区间范围列表 。也就是说&#xff0c;nums 的每个元素都恰好被某个区间范围所覆盖…

如何在地图上寻找最密集点的位置?

最近我在工作中遇到了一个小的需求点&#xff0c;大概是需要在地图上展示出一堆点中的点密度最密集的位置。最开始没想到好的方法&#xff0c;就使用了一个非常简单的策略——所有点的坐标求平均值&#xff0c;这个方法大部分的时候好用&#xff0c;因为大部分城市所有点位基本…

深度学习4. 循环神经网络 – Recurrent Neural Network | RNN

目录 循环神经网络 – Recurrent Neural Network | RNN 为什么需要 RNN &#xff1f;独特价值是什么&#xff1f; RNN 的基本原理 RNN 的优化算法 RNN 到 LSTM – 长短期记忆网络 从 LSTM 到 GRU RNN 的应用和使用场景 总结 百度百科维基百科 循环神经网络 – Recurre…

深度学习书籍

简单的图像分类 《深度学习与图像识别&#xff1a;原理与实践》 python基础 《python编程:从入门到实践》 numpy https://numpy.org/ 《NumPy攻略&#xff1a;Python科学计算与数据分析》 tensorflow 《TensorFlow深度学习——手把手教你掌握100个精彩案例》 深度学习 …

【手写promise——基本功能、链式调用、promise.all、promise.race】

文章目录 前言一、前置知识二、实现基本功能二、实现链式调用三、实现Promise.all四、实现Promise.race总结 前言 关于动机&#xff0c;无论是在工作还是面试中&#xff0c;都会遇到Promise的相关使用和原理&#xff0c;手写Promise也有助于学习设计模式以及代码设计。 本文主…

nethogs交叉编译

前文&#xff1a; 网上实在找不到交叉编译的案例&#xff0c;只能自己手动搞了 参考&#xff1a;iftop交叉编译(ncurses-5.9、pcap-1.8.1、iftop-1.0)_iftop 编译_sinat_25505501的博客-CSDN博客 参照上述链接&#xff0c;自己编译好ncurses和libpcap 我这边用的是 ncurses6…

leetcode分类刷题:矩阵顺时针模拟

1、这种题目是对代码熟练度考察&#xff0c;模拟顺时针建立或访问矩阵&#xff0c;需要注意矩阵是否为方阵 2、具体思路&#xff1a;以圈数为循环条件&#xff0c;每一圈都坚持左闭右开的区间规则&#xff1b;当小的行列值为奇数&#xff0c;最后一圈为一行或一列或一个数字的不…

WPF基础入门-Class5-WPF命令

WPF基础入门 Class5-WPF命令 1、xaml编写一个button&#xff0c;Command绑定一个命令 <Grid><ButtonWidth"100"Height"40" Command"{Binding ShowCommand}"></Button> </Grid>2、编写一个model.cs namespace WPF_Le…

我的编程学习过程

自信与经验 在毕业的时候就觉得繁体字很难。大陆都在使用简体字&#xff0c;戴季陶说这是在亡国&#xff0c;没有这么严 重。繁体字会意&#xff0c;简体字简单&#xff0c;中国文盲很多&#xff0c;为了加快经济建设的步伐&#xff0c;不得不牺牲很多 东西。为了解决温饱&…

【LeetCode-面试经典150题-day15】

目录 104.二叉树的最大深度 100.相同的树 226.翻转二叉树 101.对称二叉树 105.从前序与中序遍历序列构造二叉树 106.从中序与后序遍历序列构造二叉树 117.填充每个节点的下一个右侧节点指针Ⅱ 104.二叉树的最大深度 题意&#xff1a; 给定一个二叉树 root &#xff0c;返回其…

STM32F103 4G Cat.1模块EC200S使用

一、简介 EC200S-CN 是移远通信最近推出的 LTE Cat 1 无线通信模块&#xff0c;支持最大下行速率 10Mbps 和最大上行速率 5Mbps&#xff0c;具有超高的性价比&#xff1b;同时在封装上兼容移远通信多网络制式 LTE Standard EC2x&#xff08;EC25、EC21、EC20 R2.0、EC20 R2.1&a…