docker安装kafka,并通过springboot快速集成kafka

目录

一、docker安装和配置Kafka

1.拉取 Zookeeper 的 Docker 镜像

2.运行 Zookeeper 容器

3.拉取 Kafka 的 Docker 镜像

4.运行 Kafka 容器

5.下载 Kafdrop

6.运行 Kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下载很慢,可以找一台网络比较好的机器,输入这两个命令进行下载,下载后使用docker save -o保存为tar文件,然后将tar文件传输到目标机器后,使用docker load -i加载tar文件为docker镜像文件

8.使用 Kafka 自带的工具来创建一个名为 users 的主题

9.验证 Kafka,可以使用 Kafka 自带的工具来验证 Kafka 是否正常工作。例如,启动一个 Kafka 消费者来监听 users 主题:

二、在Spring Boot项目中集成和使用Kafka

1. 添加依赖

2. 配置Kafka

3. 创建消息对象

4. 创建生产者

5. 创建消费者

6. 测试

三、web访问Kafdrop


一、docker安装和配置Kafka

1.拉取 Zookeeper 的 Docker 镜像

docker pull wurstmeister/zookeeper

2.运行 Zookeeper 容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

3.拉取 Kafka 的 Docker 镜像

docker pull wurstmeister/kafka

4.运行 Kafka 容器

docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.7.46:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper:zookeeper wurstmeister/kafka

5.下载 Kafdrop

docker pull obsidiandynamics/kafdrop

6.运行 Kafdrop

docker run -p 9000:9000 -d --name kafdrop -e KAFKA_BROKERCONNECT=192.168.7.46:9092 -e JVM_OPTS="-Xms16M -Xmx48M" obsidiandynamics/kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下载很慢,可以找一台网络比较好的机器,输入这两个命令进行下载,下载后使用docker save -o保存为tar文件,然后将tar文件传输到目标机器后,使用docker load -i加载tar文件为docker镜像文件

下载:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
(kafdrop是一个kafka的web图形管理界面)
docker pull obsidiandynamics/kafdrop
打包:
docker save -o ./zookeeper.tar wurstmeister/zookeeper
docker save -o ./kafka.tar wurstmeister/kafka
docker save -o ./kafdrop.tar obsidiandynamics/kafdrop
传输:
scp kafka.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可
scp zookeeper.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可
scp kafdrop.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可

目标机加载成docker镜像
docker load -i /usr/root/kafka/kafka.tar
docker load -i /usr/root/kafka/zookeeper.tar
docker load -i /usr/root/kafka/kafdrop.tar
查看镜像列表
docker images

8.使用 Kafka 自带的工具来创建一个名为 users 的主题

docker exec -it kafka kafka-topics.sh --create --topic users --partitions 1 --replication-factor 1 --bootstrap-server 192.168.7.46:9092

9.验证 Kafka,可以使用 Kafka 自带的工具来验证 Kafka 是否正常工作。例如,启动一个 Kafka 消费者来监听 users 主题:

docker exec -it kafka kafka-console-consumer.sh --topic users --from-beginning --bootstrap-server 192.168.7.46:9092

这个命令,会启动一个额外的 Kafka 消费者来监听 users 主题。这个消费者是通过 Kafka 自带的 kafka-console-consumer.sh 工具启动的,主要用于测试和验证目的。它会持续监听并打印出发送到 users 主题的所有消息。

二、在Spring Boot项目中集成和使用Kafka

1. 添加依赖

首先,在你的pom.xml文件中添加Kafka的依赖:

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

2. 配置Kafka

在application.properties或application.yml文件中配置Kafka的相关属性。这里以application.properties为例:

# Kafka broker地址

spring.kafka.bootstrap-servers=localhost:9092

# 生产者配置

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# 消费者配置

spring.kafka.consumer.group-id=my-group

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.trusted.packages=*

3. 创建消息对象

假设我们要发送和接收一个简单的KafkaMsgs 对象:

public class KafkaMsgs {

    private String id;

    private String msg;

    private Long date;

    // 构造函数、getter和setter省略

}

4. 创建生产者

创建一个生产者类来发送消息:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

@Service

public class KafkaProducer {

    @Autowired

    private KafkaTemplate<String, KafkaMsgs> kafkaTemplate;

    public void sendMessage(String topic, KafkaMsgs kafkaMsgs) {

        kafkaTemplate.send(topic, kafkaMsgs);

    }

}

5. 创建消费者

创建一个消费者类来接收消息:

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Service;

@Service

public class KafkaConsumer {

    @KafkaListener(topics = "users", groupId = "my-group")

    public void listen(KafkaMsgs kafkaMsgs) {

        System.out.println("Received message: " + kafkaMsgs);

    }

}

6. 测试

你可以创建一个简单的测试类来验证生产和消费是否正常工作:

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.esop.resurge.core.config.kafka.KafkaProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.airbubble.kingdom.army.reponse.FeedBack;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;


@Api(tags="kafka数据控制器")
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {
    @Autowired
    KafkaProducer kafkaProducer;

    @ApiOperation(value = "测试发送数据到kafka", httpMethod = "GET")
    @GetMapping(value = "/sendKafkaData")
    public FeedBack<String> sendKafkaData(
            @ApiParam(value = "topic", required = true) @RequestParam(required = true,value = "topic") String topic,
            @ApiParam(value = "msg", required = true) @RequestParam(required = true,value = "msg") String msg
    ) throws Exception {
        kafkaProducer.sendMessage(topic, new com.esop.resurge.core.config.kafka.KafkaMsgs(
                IdUtil.fastUUID(),
                msg,
                Long.valueOf(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT).replace(" ", "").replace(":", "").replace("-", ""))
        ));
        return FeedBack.getInstance("发送成功");
    }

}

三、web访问Kafdrop

 打开浏览器,访问 http://192.168.7.46:9000,你应该能够看到 Kafdrop 的 Web 界面

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/70155.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ 与 Java 的对比分析:除法运算中的错误处理

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: Java 文章目录 &#x1f4af;前言&#x1f4af;C中的除法错误处理&#x1f4af;Java中的除法错误处理&#x1f4af;C与Java错误处理的对比&#x1f4af;错误处理的优化和实践&#x1f4af;小结 &#x1f4af;前言 在…

LLM之循环神经网络(RNN)

在人工智能的领域中&#xff0c;神经网络是推动技术发展的核心力量。今天&#xff0c;让我们深入探讨循环神经网络&#xff08;RNN&#xff09; 一、神经网络基础 &#xff08;1&#xff09;什么是神经网络 神经网络&#xff0c;又称人工神经网络&#xff0c;其设计灵感源于人…

SQL sever数据导入导出实验

1.创建数据库TCP-H &#xff08;1&#xff09;右键“数据库”&#xff0c;点击“新建数据库”即可 &#xff08;2&#xff09;用sql语言创建&#xff0c;此处以创建数据库DB_test为例&#xff0c;代码如下&#xff1a; use master;go--检查在当前服务器系统中的所有数据里面…

让编程变成一种享受-明基RD320U显示器

引言 作为一名有着多年JAVA开发经验的从业者&#xff0c;在工作过程中&#xff0c;显示器的重要性不言而喻。它不仅是我们与代码交互的窗口&#xff0c;更是影响工作效率和体验的关键因素。在多年的编程生涯中&#xff0c;我遇到过各种各样的问题。比如&#xff0c;在进行代码…

计算机网络(涵盖OSI,TCP/IP,交换机,路由器,局域网)

一、网络通信基础 &#xff08;一&#xff09;网络通信的概念 网络通信是指终端设备之间通过计算机网络进行的信息传递与交流。它类似于现实生活中的物品传递过程&#xff1a;数据&#xff08;物品&#xff09;被封装成报文&#xff08;包裹&#xff09;&#xff0c;通过网络…

图像处理篇---基本OpenMV图像处理

文章目录 前言1. 灰度化&#xff08;Grayscale&#xff09;2. 二值化&#xff08;Thresholding&#xff09;3. 掩膜&#xff08;Mask&#xff09;4. 腐蚀&#xff08;Erosion&#xff09;5. 膨胀&#xff08;Dilation&#xff09;6. 缩放&#xff08;Scaling&#xff09;7. 旋转…

SpringMVC重定向接口,参数暴露在url中解决方案!RedirectAttributes

OK&#xff0c;首先描述下业务场景&#xff0c;终端数量限制登录 1.首先访问项目login的get接口 2.输入账号密码点击登录后&#xff0c;会请求login的POST接口 3.后台对终端数量逻辑处理不允许登录跳回到登录页面 4.因代码原因需在后台进行多次重定向接口&#xff0c;最后跳…

Spring Boot01(注解、)---java八股

Spring Boot中常用注解及其底层实现 1、SpringBootApplication注解&#xff1a; SpringBootApplication注解&#xff1a;这个注解标识了一个SpringBoot工程&#xff0c;它实际上是另外三个注解的组合&#xff0c;这三个注解是&#xff1a; aSpringBootConfiguration&#xff1a…

✨2.快速了解HTML5的标签类型

✨✨HTML5 的标签类型丰富多样&#xff0c;每种类型都有其独特的功能和用途&#xff0c;以下是一些常见的 HTML5 标签类型介绍&#xff1a; &#x1f98b;结构标签 &#x1faad;<html>&#xff1a;它是 HTML 文档的根标签&#xff0c;所有其他标签都包含在这个标签内&am…

eNSP防火墙综合实验

一、实验拓扑 二、ip和安全区域配置 1、防火墙ip和安全区域配置 新建两个安全区域 ip配置 Client1 Client2 电信DNS 百度web-1 联通DNS 百度web-2 R2 R1 三、DNS透明代理相关配置 1、导入运营商地址库 2、新建链路接口 3、配置真实DNS服务器 4、创建虚拟DNS服务器 5、配置D…

Linux 配置交换空间(Swap)解决内存不足

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall ︱vue3-element-admin︱youlai-boot︱vue-uniapp-template &#x1f33a; 仓库主页&#xff1a; GitCode︱ Gitee ︱ Github &#x1f496; 欢迎点赞 &#x1f44d; 收藏 ⭐评论 …

个人shell脚本分享

在周一到周五做增量备份&#xff0c;在周六周日做完全备份 #!/bin/bash定义变量 SRC“/path/to/source” # 源目录 BKUP“/backup” # 备份主目录 FUL“KaTeX parse error: Expected EOF, got # at position 22: …ull" #̲ 完全备份目录 INC"BKUP/inc” # 增量备份…

Django 5 实用指南(一)安装与配置

1.1 Django5的背景与发展 Django 自从2005年由Adrian Holovaty和Simon Willison在 Lawrence Journal-World 新闻网站上首次发布以来&#xff0c;Django 一直是 Web 开发领域最受欢迎的框架之一。Django 框架经历了多个版本的演进&#xff0c;每次版本更新都引入了新功能、改进了…

百度搜索融合 DeepSeek 满血版,开启智能搜索新篇

百度搜索融合 DeepSeek 满血版&#xff0c;开启智能搜索新篇 &#x1f680; &#x1f539; 一、百度搜索全量接入 DeepSeek &#x1f539; 百度搜索迎来重要升级&#xff0c;DeepSeek 满血版全面上线&#xff01;&#x1f389; 用户在百度 APP 搜索后&#xff0c;点击「AI」即…

RabbitMQ服务异步通信

消息队列在使用过程中&#xff0c;面临着很多实际问题需要思考&#xff1a; 1. 消息可靠性 消息从发送&#xff0c;到消费者接收&#xff0c;会经理多个过程&#xff1a; 其中的每一步都可能导致消息丢失&#xff0c;常见的丢失原因包括&#xff1a; 发送时丢失&#xff1a; 生…

【教程】MySQL数据库学习笔记(七)——多表操作(持续更新)

写在前面&#xff1a; 如果文章对你有帮助&#xff0c;记得点赞关注加收藏一波&#xff0c;利于以后需要的时候复习&#xff0c;多谢支持&#xff01; 【MySQL数据库学习】系列文章 第一章 《认识与环境搭建》 第二章 《数据类型》 第三章 《数据定义语言DDL》 第四章 《数据操…

胶囊网络动态路由算法:突破CNN空间局限性的数学原理与工程实践

一、CNN的空间局限性痛点解析 传统CNN的瓶颈&#xff1a; 池化操作导致空间信息丢失&#xff08;最大池化丢弃85%激活值&#xff09;无法建模层次空间关系&#xff08;旋转/平移等变换不敏感&#xff09;局部感受野限制全局特征整合 示例对比&#xff1a; # CNN最大池化示例…

#渗透测试#批量漏洞挖掘#Apache Log4j反序列化命令执行漏洞

免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章读。 目录 Apache Log4j反序列化命令执行漏洞 一、…

深入剖析Spring MVC

一、Spring MVC 概述 1. 什么是 Spring MVC&#xff1f; Spring MVC 是基于 Spring 框架的 Web 框架&#xff0c;它实现了 MVC 设计模式&#xff0c;将应用程序分为三个核心部分&#xff1a; Model&#xff1a;封装应用程序的数据和业务逻辑。 View&#xff1a;负责渲染数据…

机器学习入门-读书摘要

先看了《深度学习入门&#xff1a;基于python的理论和实践》这本电子书&#xff0c;早上因为入迷还坐过站了。。 因为里面的反向传播和链式法则特别难懂&#xff0c;又网上搜了相关内容进行进一步理解&#xff0c;参考的以下文章&#xff08;个人认为都讲的都非常好&#xff0…