JAVA(SpringBoot)集成Kafka实现消息发送和接收。

SpringBoot集成Kafka实现消息发送和接收。

  • 一、Kafka 简介
  • 二、Kafka 功能
  • 三、POM依赖
  • 四、配置文件
  • 五、生产者
  • 六、消费者

君子之学贵一,一则明,明则有功

一、Kafka 简介

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - 订阅消息系统,以可持久化、高吞吐、低延迟、高容错等特性而著称。
Kafka 主要由生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件构成。生产者负责将数据发送到 Kafka 集群,消费者从集群中读取数据。主题是一种逻辑上的分类,数据被发送到特定的主题。每个主题又可以划分为多个分区,以实现数据的并行处理和提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。

二、Kafka 功能

消息队列功能:Kafka 可以作为消息队列使用,在应用程序之间传递消息。生产者将消息发送到主题,不同的消费者可以从主题中订阅并消费消息,实现应用程序解耦。例如,在电商系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从该主题消费订单消息,各自独立处理,降低模块间的耦合度。
数据存储功能:Kafka 具有持久化存储能力,它将消息数据存储在磁盘上,并且通过多副本机制保证数据的可靠性。即使某个节点出现故障,数据也不会丢失。这种特性使得 Kafka 不仅可以作为消息队列,还能用于数据的长期存储和备份,例如用于存储系统的操作日志,方便后续的数据分析和故障排查。
流处理功能:Kafka 可以与流处理框架(如 Apache Flink、Spark Streaming 等)集成,对实时数据流进行处理。通过将实时数据发送到 Kafka 主题,流处理框架可以从主题中读取数据并进行实时计算、分析和转换。例如,在实时监控系统中,通过 Kafka 收集服务器的性能指标数据,然后使用流处理框架对这些数据进行实时分析,及时发现性能异常并发出警报。

三、POM依赖

    <!-- kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>

四、配置文件

spring:# Kafka 配置kafka:# Kafka 服务器地址和端口 代理地址,可以多个bootstrap-servers: IP:9092# 生产者配置producer:# 发送失败时的重试次数retries: 3# 每次批量发送消息的数量,调整为较小值batch-size: 1# 生产者缓冲区大小buffer-memory: 33554432# 消息 key 的序列化器,将 key 序列化为字节数组key-serializer: org.apache.kafka.common.serialization.StringSerializer# 消息 value 的序列化器,将消息体序列化为字节数组value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 当没有初始偏移量或当前偏移量不存在时,从最早的消息开始消费auto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: true# 自动提交偏移量的时间间隔(毫秒),延长自动提交时间间隔auto-commit-interval: 1000# 消息 key 的反序列化器,将字节数组反序列化为 keykey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消息 value 的反序列化器,将字节数组反序列化为消息体value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

五、生产者


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 生产者** @author chenlei*/
@Slf4j
@Component
public class KafkaProducer {/*** KafkaTemplate*/@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到指定的 Kafka 主题,并可指定分组信息** @param topic   消息要发送到的 Kafka 主题* @param message 要发送的消息内容*/public void sendMessage(String topic, String message) {// 使用 KafkaTemplate 发送消息,将消息发送到指定的主题ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {// 消息发送成功后的处理逻辑,可根据需要添加log.info("已发送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");}@Overridepublic void onFailure(Throwable ex) {// 消息发送失败后的处理逻辑,使用日志记录异常log.error("发送消息=[" + message + "] 失败", ex);}});}
}

六、消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author 消费者* chenlei*/
@Slf4j
@Component
public class KafkaConsumer {/*** 监听 Kafka 主题方法。** @param record 从 Kafka 接收到的 ConsumerRecord,包含消息的键值对*/@KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")public void listen(ConsumerRecord<?, ?> record) {// 打印接收到的消息的详细信息log.info("接收到 Kafka 消息: 主题 = {}, 分区 = {}, 偏移量 = {}, 键 = {}, 值 = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/68332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot 无缝集成SpringAI的函数调用模块

这是一个 完整的 Spring AI 函数调用实例&#xff0c;涵盖从函数定义、注册到实际调用的全流程&#xff0c;以「天气查询」功能为例&#xff0c;结合代码详细说明&#xff1a; 1. 环境准备 1.1 添加依赖 <!-- Spring AI OpenAI --> <dependency><groupId>o…

媒体新闻发稿要求有哪些?什么类型的稿件更好通过?

为了保证推送信息的内容质量&#xff0c;大型新闻媒体的审稿要求一向较为严格。尤其在商业推广的过程中&#xff0c;不少企业的宣传稿很难发布在这些大型新闻媒体平台上。 媒体新闻发稿要求有哪些&#xff1f;就让我们来了解下哪几类稿件更容易过审。 一、媒体新闻发稿要求有哪…

ui-automator定位官网文档下载及使用

一、ui-automator定位官网文档简介及下载 AndroidUiAutomator&#xff1a;移动端特有的定位方式&#xff0c;uiautomator是java实现的&#xff0c;定位类型必须写成java类型 官方地址&#xff1a;https://developer.android.com/training/testing/ui-automator.html#ui-autom…

ThreadLocal概述、解决SimpleDateFormat出现的异常、内存泄漏、弱引用、remove方法

①. ThreadLocal简介 ①. ThreadLocal是什么 ①. ThreadLocal本地线程变量,线程自带的变量副本(实现了每一个线程副本都有一个专属的本地变量,主要解决的就是让每一个线程绑定自己的值,自己用自己的,不跟别人争抢。通过使用get()和set()方法,获取默认值或将其值更改为当前线程…

总结8..

#include <stdio.h> // 定义结构体表示二叉树节点&#xff0c;包含左右子节点编号 struct node { int l; int r; } tree[100000]; // 全局变量记录二叉树最大深度&#xff0c;初始为0 int ans 0; // 深度优先搜索函数 // pos: 当前节点在数组中的位置&#xff0c…

科普篇 | “机架、塔式、刀片”三类服务器对比

一、引言 在互联网的世界里&#xff0c;服务器就像是默默运转的超级大脑&#xff0c;支撑着我们日常使用的各种网络服务。今天&#xff0c;咱们来聊聊服务器家族中的三位 “明星成员”&#xff1a;机架式服务器、塔式服务器和刀片式服务器。如果把互联网比作一座庞大的城市&…

动手学图神经网络(2):跆拳道俱乐部案例实战

动手学图神经网络(2):跆拳道俱乐部案例实战 在深度学习领域,图神经网络(GNNs)能将传统深度学习概念推广到不规则的图结构数据,使神经网络能够处理对象及其关系。将基于 PyTorch Geometric 库,一步步探索图神经网络的奥秘。 安装必要的包 首先, 安装所需的 Python 包…

【vue3组件】【大文件上传】【断点续传】支持文件分块上传,能够在上传过程中暂停、继续上传的组件

一、概述 本示例实现了一个基于 Vue3 和 TypeScript 的断点上传功能。该功能支持文件分块上传&#xff0c;能够在上传过程中暂停、继续上传&#xff0c;并且支持检测已经上传的分块&#xff0c;避免重复上传&#xff0c;提升上传效率。以下是关键的技术点与实现流程&#xff1…

OpenCV 版本不兼容导致的问题

问题和解决方案 今天运行如下代码&#xff0c;发生了意外的错误&#xff0c;代码如下&#xff0c;其中输入的 frame 来自于 OpenCV 开启数据流的读取 """ cap cv2.VideoCapture(RTSP_URL) print("链接视频流完成") while True:ret, frame cap.rea…

Day25-【13003】短文,什么是算法?如何衡量时间复杂度?什么是最优,平均时间复杂度?

文章目录 第二节概览什么是算法&#xff1f;算法的5个特性&#xff1f; 算法如何评估&#xff1f;时间指标如何衡量&#xff1f;算法的复杂度如何度量&#xff1f;算法开销上限和下限如何表示&#xff1f;什么是常数复杂度&#xff1f;线性操作&#xff1f;对数复杂度-线性对数…

python基础语法(3) -------- 学习笔记分享

目录: 1. 函数 1.1 语法格式 1.2 函数参数 1.3 函数返回值 1.4 变量的作用域 1.5 函数的执行过程 1.6 函数的链式调用 1.7 函数的嵌套调用 1.8 函数递归 1.9 参数默认值 1.10 函数的关键字传参 2. 列表和元组 2.1 列表和元组是啥 2.2 创建列表 2.3 访问下标 2.…

磐维数据库PanWeiDB2.0日常维护

磐维数据库简介 “中国移动磐维数据库”&#xff08;ChinaMobileDB&#xff09;&#xff0c;简称“磐维数据库”&#xff08;PanWeiDB&#xff09;。是中国移动信息技术中心首个基于中国本土开源数据库打造的面向ICT基础设施的自研数据库产品。 其产品内核能力基于华为 OpenG…

Linux:文件与fd(未被打开的文件)

hello&#xff0c;各位小伙伴&#xff0c;本篇文章跟大家一起学习《Linux&#xff1a;文件与fd&#xff08;未被打开的文件&#xff09;》&#xff0c;感谢大家对我上一篇的支持&#xff0c;如有什么问题&#xff0c;还请多多指教 &#xff01; 如果本篇文章对你有帮助&#xf…

传输层协议TCP与UDP:深入解析与对比

传输层协议TCP与UDP&#xff1a;深入解析与对比 目录 传输层协议TCP与UDP&#xff1a;深入解析与对比引言1. 传输层协议概述2. TCP协议详解2.1 TCP的特点2.2 TCP的三次握手与四次挥手三次握手四次挥手 2.3 TCP的流量控制与拥塞控制2.4 TCP的可靠性机制 3. UDP协议详解3.1 UDP的…

自动驾驶中的多传感器时间同步

目录 前言 1.多传感器时间特点 2.统一时钟源 2.1 时钟源 2.2 PPSGPRMC 2.3 PTP 2.4 全域架构时间同步方案 3.时间戳误差 3.1 硬件同步 3.2 软件同步 3.2.3 其他方式 ① ROS 中的 message_filters 包 ② 双端队列 std::deque 参考&#xff1a; 前言 对多传感器数据…

U-Net - U型网络:用于图像分割的卷积神经网络

U-Net是一种专为图像分割任务设计的卷积神经网络&#xff08;CNN&#xff09;&#xff0c;最初由Olaf Ronneberger等人于2015年提出。它被广泛应用于医学影像分析、遥感图像分割、自动驾驶和其他许多需要对图像进行像素级分类的任务中。U-Net具有强大的特征提取和恢复能力&…

c++小知识点

抽象类包含至少一个纯虚函数&#xff0c;不能实例化对象。派生类必须实现基类的所有纯虚函数才能成为非抽象类&#xff0c;从而可以实例化对象。可以使用抽象类的指针或引用指向派生类对象&#xff0c;实现多态性调用。抽象类虽然不能直接实例化&#xff0c;但可以拥有构造函数…

关于使用PHP时WordPress排错——“这意味着您在wp-config.php文件中指定的用户名和密码信息不正确”的解决办法

本来是看到一位好友的自己建站&#xff0c;所以突发奇想&#xff0c;在本地装个WordPress玩玩吧&#xff0c;就尝试着装了一下&#xff0c;因为之前电脑上就有MySQL&#xff0c;所以在自己使用PHP建立MySQL时报错了。 最开始是我的php启动mysql时有问题&#xff0c;也就是启动过…

写一个存储“网站”的网站前的分析

要创建一个能够存储自己网站内容的“网站”,通常意味着你希望有一个可以存储网站数据、文件、内容等信息的系统。为了实现这一目标,可以考虑构建一个内容管理系统(CMS),这个系统能够帮助你存储和管理网站上的内容。 图片仅供参考 以下是如何实现一个可以存储自己网站内容…

[STM32 标准库]定时器输出PWM配置流程 PWM模式解析

前言&#xff1a; 本文内容基本来自江协&#xff0c;整理起来方便日后开发使用。MCU&#xff1a;STM32F103C8T6。 一、配置流程 1、开启GPIO&#xff0c;TIM的时钟 /*开启时钟*/RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2, ENABLE); //开启TIM2的时钟RCC_APB2PeriphClockC…