SpringBoot基础Kafka示例

这里将生产者和消费者放在一个应用中

使用的Boot3.4.3

引入Kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml配置


spring:application:name: kafka-1#kafka连接地址kafka:bootstrap-servers: 127.0.0.1:9092#配置生产者producer:#消息发送失败重试次数retries: 0#一个批次可以使用内存的大小batch-size: 16384#一个批次消息数量buffer-memory: 33554432#键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer#值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allconsumer:#是否自动提交enable-auto-commit: false#自动提交的频率auto-commit-interval: 1000#earliest	从分区的最早偏移量开始消费	需要消费所有历史消息  latest	从分区的最新偏移量开始消费,忽略历史消息	只关心新消息#none	如果没有有效的偏移量,抛出异常	严格要求偏移量必须存在#exception spring-kafka不支持auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:#用于配置消费者如何处理消息的确认  ack配置方式  这里指定由消费者手动提交偏移量#Acknowledgment.acknowledge() 方法来提交偏移量ack-mode: MANUAL_IMMEDIATEconcurrency: 4
test-1: group-1
test-2: group-2
test-3: group-3server:port: 8099

生产者示例,一般可能是一个MQTT接收消息入口

package com.hrui.kafka1.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @author hrui* @date 2025/3/10 14:56*/
@RestController
public class EventProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/sendMessage")public String sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);return "Message sent to topic '" + topic + "': " + message;}@RequestMapping("/sendMessage2")public String sendMessage2() {//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();kafkaTemplate.send(message);return "Message sent to topic";}}

消费者示例

注意:如果配置了手动提交ack,那么

主要目的不仅仅是避免重复消费,而是为了确保消息的可靠处理和偏移量(offset)的正确提交。它可以避免重复消费,但更重要的是保证消息不会丢失,并且在消息处理失败时能够重新消费。

package com.hrui.kafka1.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @author hrui* @date 2025/3/10 15:57*/
@Component
public class EventConsumer {@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-1}'}")public void onMessage(ConsumerRecord<String,String> message){System.out.println("接收到消息1:"+message.value());}@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-2}'}")public void onMessage(String message){System.out.println("接收到消息2:"+message);}@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.GROUP_ID) String groupId) {try {System.out.println("接收到消息3:" + message + ", ack:" + ack + ", topic:" + topic + ", groupId:" + groupId);// 处理消息逻辑// ...} catch (Exception e) {// 处理异常,记录日志System.err.println("处理消息失败: " + e.getMessage());// 可以根据业务需求决定是否重新抛出异常}finally {// 手动提交偏移量ack.acknowledge();}}
}

生产者可选择异步或者同步发送消息

生产者发送消息有同步异步之说 那么消费者在消费消息时候 有没有同步异步之说呢???

在 Kafka 消费者中,消费消息的方式本质上是由 Kafka 的设计决定的,而不是由消费者代码显式控制的。Kafka 消费者在消费消息时,通常是以拉取(poll)的方式从 Kafka 服务器获取消息,然后处理这些消息。从这个角度来看,消费者的消费行为是同步的,因为消费者需要主动调用 poll 方法来获取消息。

然而,消费者的消息处理逻辑可以是同步异步的,具体取决于业务实现。以下是对消费者消费消息的同步和异步行为的详细分析:

 消费者的同步消费

在默认情况下,Kafka 消费者的消费行为是同步的,即:

  • 消费者通过 poll 方法从 Kafka 拉取一批消息。

  • 消费者逐条处理这些消息。

  • 每条消息处理完成后,消费者提交偏移量(offset)。

  • 消费者继续调用 poll 方法获取下一批消息。

特点:
  • 消息处理是顺序的,即一条消息处理完成后才会处理下一条消息。

  • 如果某条消息处理时间较长,会影响后续消息的处理速度。

  • 适合消息处理逻辑简单、处理时间较短的场景。

@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {try {System.out.println("接收到消息:" + message.value());// 同步处理消息逻辑processMessage(message);} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());} finally {ack.acknowledge(); // 手动提交偏移量}
}private void processMessage(ConsumerRecord<String, String> message) {// 模拟消息处理逻辑try {Thread.sleep(1000); // 假设处理一条消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

2. 消费者的异步消费

在某些场景下,消费者可能需要以异步的方式处理消息,即:

  • 消费者通过 poll 方法拉取一批消息。

  • 将每条消息提交到一个线程池或异步任务中处理。

  • 消费者继续调用 poll 方法获取下一批消息,而不等待上一条消息处理完成。

特点:
  • 消息处理是并发的,可以提高消息处理的吞吐量。

  • 需要额外的线程池或异步任务管理机制。

  • 适合消息处理逻辑复杂、处理时间较长的场景。

示例代码:

@Autowired
private ExecutorService executorService; // 注入线程池@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {if (!StringUtils.hasText(message.value())) {ack.acknowledge();return;}// 提交异步任务处理消息executorService.submit(() -> {try {System.out.println("接收到消息:" + message.value());processMessage(message); // 异步处理消息} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());} finally {ack.acknowledge(); // 手动提交偏移量}});
}private void processMessage(ConsumerRecord<String, String> message) {// 模拟消息处理逻辑try {Thread.sleep(1000); // 假设处理一条消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

同步代码示例

@RequestMapping("/sendMessage2")public String sendMessage2(){//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);try {//阻塞等待拿结果SendResult<String, String> sendResult = send.get();System.out.println("说明消息发送成功,如果不成功会抛出异常");} catch (Exception e) {throw new RuntimeException(e);}return "Message sent to topic";}

异步注册回调的方式

 @RequestMapping("/sendMessage2")public String sendMessage2(){//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);//非阻塞  异步 注册回调异步通知send.thenAccept(result -> {System.out.println("消息发送成功");}).exceptionally(e->{System.out.println("发送失败");e.printStackTrace();return null;});return "Message sent to topic";}

如果需要发送的不是String类型 

那么要发送的不是String类型

KafkaTemplate<String,Object> kafkaTemplate;

一般来说可以专成JSON字符串发送

在引入spring-kafka的时候     KafkaAutoConfiguration中  配置了KafkaTemplate

Kafka<Object,Object>

如果需要用KafkaTemplate发送对象的时候

默认用的String序列化   会报错   除非将对象转为JSON字符串(一般可以这么做)

如果用对象的话   改成JsonSerializer  这样自动转JSON字符串

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/71999.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

API调试工具的无解困境:白名单、动态IP与平台设计问题

引言 你是否曾经在开发中遇到过这样的尴尬情形&#xff1a;你打开了平台的API调试工具&#xff0c;准备一番操作&#xff0c;结果却发现根本无法连接到平台&#xff1f;别急&#xff0c;问题出在调试工具本身。今天我们要吐槽的就是那些神奇的开放平台API调试工具&#xff0c;…

多方安全计算(MPC)电子拍卖系统

目录 一、前言二、多方安全计算(MPC)与电子拍卖系统概述2.1 多方安全计算(MPC)的基本概念2.2 电子拍卖系统背景与需求三、MPC电子拍卖系统设计原理3.1 系统总体架构3.2 电子拍卖中的安全协议3.3 数学与算法证明四、数据加解密模块设计五、GPU加速与系统性能优化六、GUI设计与系…

【Linux篇】初识Linux指令(上篇)

Linux命令世界&#xff1a;从新手到高手的必备指南 一 Linux发展与历史1.1 Linux起源与发展1.2 Linux与Windows操作系统对比 二 Linux常用操作指令2.1 ls命令 - “List”&#xff08;列出文件)2.2 pwd指令- "打印当前工作目录"2.3 cd指令 - “Change Directory”&…

编程视界:C++命名空间

目录 命名空间 为什么要使用命名空间 什么是命名空间 命名空间的使用方式 关键点总结 命名空间的嵌套使用 匿名命名空间 跨模块调用问题 命名空间可以多次定义 总结 首先从C的hello,world程序入手&#xff0c;来认识一下C语言 #include <iostream> using name…

Redux 和 MobX 高频面试题

Redux 和 MobX 是 React 生态中的两大状态管理方案&#xff0c;在面试中常涉及 原理、使用方式、对比、最佳实践 等方面。以下是 高频面试题 详细答案&#xff0c;助你轻松应对面试&#xff01;&#x1f680; &#x1f525; Redux 部分 1. Redux 是什么&#xff1f;为什么需要…

Excel 保护工作簿:它能解决哪些问题?如何正确使用?

在日常办公中&#xff0c;Excel 表格常常涉及多人协作、重要数据保护&#xff0c;甚至是避免误操作的情况。这时候&#xff0c;“保护工作簿”功能就能派上用场。它能有效防止他人修改表结构、删除工作表&#xff0c;甚至可以设置密码&#xff0c;确保数据的完整性和安全性。今…

Android Retrofit 框架注解定义与解析模块深度剖析(一)

一、引言 在现代 Android 和 Java 开发中&#xff0c;网络请求是不可或缺的一部分。Retrofit 作为 Square 公司开源的一款强大的类型安全的 HTTP 客户端&#xff0c;凭借其简洁易用的 API 和高效的性能&#xff0c;在开发者社区中广受欢迎。Retrofit 的核心特性之一便是通过注…

C# Enumerable类 之 数据分组

总目录 前言 在 C# 中&#xff0c;System.Linq.Enumerable 类是 LINQ&#xff08;Language Integrated Query&#xff09;的核心组成部分&#xff0c;它提供了一系列静态方法&#xff0c;用于操作实现了 IEnumerable 接口的集合。通过这些方法&#xff0c;我们可以轻松地对集合…

推理模型对SQL理解能力的评测:DeepSeek r1、GPT-4o、Kimi k1.5和Claude 3.7 Sonnet

引言 随着大型语言模型&#xff08;LLMs&#xff09;在技术领域的应用日益广泛&#xff0c;评估这些模型在特定技术任务上的能力变得越来越重要。本研究聚焦于四款领先的推理模型——DeepSeek r1、GPT-4o、Kimi k1.5和Claude 3.7 Sonnet在SQL理解与分析方面的能力&#xff0c;…

IDEA接入阿里云百炼中免费的通义千问[2025版]

安装deepseek 上一篇文章IDEA安装deepseek最新教程2025中说明了怎么用idea安装codeGPT插件&#xff0c;并接入DeepSeek&#xff0c;无奈接入的官方api已经不能使用了&#xff0c;所以我们尝试从其他地方接入 阿里云百炼https://bailian.console.aliyun.com/ 阿里云百炼‌是阿…

实施一套先进的智能摄像头服务系统。

一、项目背景 随着物联网、人工智能和大数据技术的飞速发展&#xff0c;智能摄像头已成为家庭、企业以及公共安全领域的重要设备。其便捷、高效、智能的特点&#xff0c;使得市场需求日益增长。为了满足用户对智能监控的多样化需求&#xff0c;提供更加全面、可靠的监控服务&a…

linux自启动服务

在Linux环境中&#xff0c;systemd是一个系统和服务管理器&#xff0c;它为每个服务使用.service文件进行配置。systemctl是用于控制系统服务的主要工具。本文将详细介绍如何使用systemctl来管理vsftpd服务&#xff0c;以及如何设置服务自启动。 使用Systemd设置自启动服务 创…

010-Catch2

Catch2 一、框架简介 Catch2 是一个基于 C 的现代化单元测试框架&#xff0c;支持 TDD&#xff08;测试驱动开发&#xff09;和 BDD&#xff08;行为驱动开发&#xff09;模式。其核心优势在于&#xff1a; 单头文件设计&#xff1a;v2.x 版本仅需包含 catch.hpp 即可使用自然…

数字人分身开发指南:从概念到实战

一、什么是数字人分身&#xff1f; 想象一下&#xff0c;在电脑或手机屏幕里&#xff0c;一个能跟你聊天、回答问题&#xff0c;甚至还能做表情的虚拟角色。这就是数字人分身&#xff0c;它用上了人工智能技术&#xff0c;让机器也能像人一样交流。无论是在线客服、网络主播还…

Pixelmator Pro for Mac 专业图像处理软件【媲美PS的修图】

介绍 Pixelmator Pro&#xff0c;是一款非常强大、美观且易于使用的图像编辑器&#xff0c;专为 Mac 设计。采用单窗口界面、基于机器学习的智能图像编辑、自动水平检测&#xff0c;智能快速选择及更好的修复工具等功能优点。许多非破坏性的专业编辑工具可让您进行最佳的照片处…

LiveGBS流媒体平台GB/T28181常见问题-视频流安全控制HTTP接口鉴权勾选流地址鉴权后401Unauthorized如何播放调用接口流地址校验

LiveGBS流媒体平台GB/T28181常见问题频流安全控制HTTP接口鉴权勾选流地址鉴权后401Unauthorized如何播放调用接口流地址校验&#xff1f; 1、安全控制1.1、HTTP接口鉴权1.2、流地址鉴权 2、401 Unauthorized2.1、携带token调用接口2.1.1、获取鉴权token2.1.2、调用其它接口2.1.…

C++设计模式-抽象工厂模式:从原理、适用场景、使用方法,常见问题和解决方案深度解析

一、模式基本概念 1.1 定义与核心思想 抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;是创建型设计模式的集大成者&#xff0c;它通过提供统一的接口来创建多个相互关联或依赖的对象族&#xff0c;而无需指定具体类。其核心思想体现在两个维度&#xff1a; …

【prompt实战】知乎问题解答专家

本文原创作者&#xff1a;姚瑞南 AI-agent 大模型运营专家&#xff0c;先后任职于美团、猎聘等中大厂AI训练专家和智能运营专家岗&#xff1b;多年人工智能行业智能产品运营及大模型落地经验&#xff0c;拥有AI外呼方向国家专利与PMP项目管理证书。&#xff08;转载需经授权&am…

数据结构第八节:红黑树(初阶)

【本节要点】 红黑树概念红黑树性质红黑树结点定义红黑树结构红黑树插入操作的分析 一、红黑树的概念与性质 1.1 红黑树的概念 红黑树 &#xff0c;是一种 二叉搜索树 &#xff0c;但 在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是 Red和 Black 。 通过对 任何…

Spring Boot3.3.X整合Mybatis-Plus

前提说明&#xff1a; 项目的springboot版本为&#xff1a;3.3.2 需要整合的mybatis-plus版本&#xff1a;3.5.7 废话不多说&#xff0c;开始造吧 1.准备好数据库和表 2.配置全局文件application.properties或者是application.yml&#xff08;配置mapper的映射文件路径&am…