003 SpringBoot集成Kafka操作

4.SpringBoot集成Kafka

文章目录

  • 4.SpringBoot集成Kafka
      • 1.入门示例
      • 2.yml完整配置
      • 3.关键配置注释说明
        • 1. 生产者优化参数
        • 2. 消费者可靠性配置
        • 3. 监听器高级特性
        • 4. 安全认证配置
      • 4.配置验证方法
      • 5.不同场景配置模板
        • 场景1:高吞吐日志收集
        • 场景2:金融级事务消息
        • 场景3:跨数据中心同步
    • 5.高级配置
      • 1.事务支持
      • 2.消息重试与死信队列

来源参考的deepseek,如有侵权联系立删

1.入门示例

1.pom依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2.KafkaProducer消息生产者配置

@Component
@Slf4j
public class KafkaProducer {private HashMap map=new HashMap<>();@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(String topic,String msg){log.info("开始发送消息,topic:{};message:{}",topic,msg);ListenableFuture<SendResult<Integer,String>> send=kafkaTemplate.send(topic, msg);//消息确认机制send.addCallback(new ListenableFutureCallback<SendResult<Integer,String>>(){@Overridepublic void onSuccess(SendResult<Integer, String> result) {log.info("消息发送成功,topic:{};message:{}",topic,msg);}@Overridepublic void onFailure(Throwable ex) {//落库操作map.put(topic,msg);}});}
}

springboot3.x写法

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;@Service
@RequiredArgsConstructor
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;// 同步发送(阻塞等待确认)public void sendMessageSync(String topic, String key, String value) {kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {if (ex == null) {System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());} else {System.err.println("发送失败:" + ex.getMessage());}});}// 异步发送(默认方式)public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message);}
}
  • Spring Boot 2.xsend() 返回 ListenableFuture<SendResult>,支持 addCallback() 回调。
  • Spring Boot 3.xsend() 返回 CompletableFuture<SendResult>,弃用 ListenableFuture,因此需要使用 CompletableFuture 的 API(如 whenComplete)。

3.KafkaConsumer消息消费

@Component
@Slf4j
public class KafkaConsumer {private List<String> exist=new ArrayList<>();@KafkaListener(topics = {"lx"},groupId = "lx")public void consumer(ConsumerRecord<Integer,String> record){if (exist.contains(record.value())){log.error("不满足幂等校验!!!");}log.info("消息消费成功,topic:{},message:{}", record.topic(), record.value());exist.add(record.value());}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerService {// 单个消息消费(手动提交偏移量)@KafkaListener(topics = "test-topic", groupId = "spring-group")public void listenMessage(String message, Acknowledgment ack) {System.out.println("收到消息:" + message);ack.acknowledge(); // 手动提交}// 批量消费(需配置 listener.type=batch)@KafkaListener(topics = "batch-topic", groupId = "spring-group")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));ack.acknowledge();}
}

4.yml配置文件

生产者配置

#kafka配置
spring:kafka:#kafka集群地址# bootstraps-server: 192.168.25.100:9092bootstrap-servers: 47.122.26.22:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1# 禁止生产者触发 Topic 创建请求allow.auto.create.topics: false#代表集群中从节点都持久化后才认为发送成功acks: -1

消费者配置

spring:kafka:#kafka集群地址bootstraps-server: 192.168.25.100:9092consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000# 禁用生产者触发 Topic 元数据请求时自动创建allow.auto.create.topics: falsegroup-id: testauto-offset-reset: earliestlistener:ack-mode: manual_immediate # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch

5.实体类

@Data
public class KafkaRequest {/*** 主题*/private String topic;/*** 消息*/private String message;
}

6.消息发送

@RestController
@Slf4j
public class KafkaController {private final String topic="lx";private int temp=1;@Autowiredprivate KafkaProducer producer;/*** 下单** @param kafkaRequest* @return null*/@RequestMapping("/test01")public void test01(KafkaRequest kafkaRequest){log.info("test01测试成功!topic:{};message:{}",kafkaRequest.getTopic(), kafkaRequest.getMessage());producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());}@RequestMapping("/test02")public void test02(KafkaRequest kafkaRequest){log.info("test02测试成功!topic:{};message:{}",topic, temp);producer.send(topic, String.valueOf(temp));temp++;}
}

kafka启动方式

./kafka-server-start.sh  ../config/server.properties

2.yml完整配置

spring:kafka:# 基础配置(必填项)bootstrap-servers: localhost:9092  # Kafka集群地址,多节点用逗号分隔 client-id: spring-boot-app         # 客户端标识(日志追踪用)# 生产者配置 producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer   # 键序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer  # 值序列化器acks: all                         # 消息确认机制:all表示所有副本确认(最高可靠性)retries: 5                        # 发送失败重试次数(需配合幂等性使用)batch-size: 16384                 # 批量发送缓冲区大小(单位:字节)linger-ms: 50                     # 发送延迟等待时间(毫秒,提高吞吐量)buffer-memory: 33554432           # 生产者内存缓冲区大小(默认32MB)compression-type: snappy          # 消息压缩算法(可选gzip/lz4/zstd)transaction-id-prefix: tx-        # 开启事务时需配置前缀(需配合@Transactional)# 消费者配置 consumer:group-id: app-consumer-group      # 消费者组ID(同一组共享分区)auto-offset-reset: earliest       # 无Offset时策略:earliest(从头)/latest(最新)enable-auto-commit: false         # 关闭自动提交Offset(推荐手动提交)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500             # 单次poll最大消息数(避免OOM)fetch-max-wait-ms: 500            # 消费者等待broker返回数据的最长时间isolation-level: read_committed   # 事务消息隔离级别(read_committed/read_uncommitted)# 监听器配置(高级优化)listener:type: single                      # 监听器类型:single(单条)/batch(批量)ack-mode: manual                  # Offset提交模式:manual(手动)/batch(批量提交)concurrency: 3                    # 消费者线程数(建议等于分区数)poll-timeout: 3000                # poll方法超时时间(毫秒)# 消息重试与死信队列(容错机制)retry:topic:attempts: 3                     # 最大重试次数initial-interval: 1000          # 初始重试间隔(毫秒)multiplier: 2.0                 # 重试间隔倍数(指数退避)dead-letter-topic: dlq-${topic}   # 死信队列命名规则(自动创建)# 安全协议(企业级场景)properties:security.protocol: SASL_PLAINTEXT  # 安全协议(如PLAINTEXT/SASL_SSL)sasl.mechanism: PLAIN             # SASL认证机制ssl.truststore.location: /path/to/truststore.jks# 自定义业务配置(非Kafka标准参数)app:kafka:topics:input-topic: user-events        # 业务输入Topicoutput-topic: processed-events  # 业务输出Topic

3.关键配置注释说明

1. 生产者优化参数
参数说明推荐值
acks=all确保所有ISR副本写入成功,防止数据丢失高可靠性场景必选
compression-type=snappy减少网络带宽占用,提升吞吐量消息体>1KB时启用
transaction-id-prefix支持跨分区原子性写入(需配合@Transactional注解)金融交易类业务必配
2. 消费者可靠性配置
参数说明注意事项
enable-auto-commit=false避免消息处理失败但Offset已提交导致数据丢失需手动调用ack.acknowledge()
isolation-level=read_committed只消费已提交的事务消息需与生产者事务配置联动
3. 监听器高级特性
参数使用场景示例
type=batch批量消费(提升吞吐量)适用于日志处理等实时性要求低的场景
concurrency=3并发消费者数需与Topic分区数一致,避免资源浪费
4. 安全认证配置
spring:kafka:properties:security.protocol: SASL_SSLsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
  • 企业级必配:生产环境需启用SSL加密+SASL认证

4.配置验证方法

  1. 启动检查:添加@ConfigurationProperties(prefix = "spring.kafka")绑定配置到Bean,通过单元测试验证注入值
  2. 日志监控:开启DEBUG日志观察生产者/消费者连接状态
   logging:level:org.springframework.kafka: DEBUG
  1. AdminClient 工具:通过编程方式检查Topic元数据
@Autowired
private KafkaAdminClient adminClient;public void checkTopic() {Map<String, TopicDescription> topics = adminClient.describeTopics("user-events");topics.values().forEach(topic -> System.out.println(topic));
}

5.不同场景配置模板

场景1:高吞吐日志收集
producer:compression-type: lz4batch-size: 65536linger-ms: 100
consumer:auto-offset-reset: latestenable-auto-commit: true  # 允许少量数据丢失以换取性能
场景2:金融级事务消息
producer:acks: allretries: 10transaction-id-prefix: fin-tx-
consumer:isolation-level: read_committedenable-auto-commit: false
场景3:跨数据中心同步
spring:kafka:bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092properties:client.dns.lookup: use_all_dns_ips  # 支持多IP解析reconnect.backoff.ms: 1000          # 断线重连策略

5.高级配置

1.事务支持

// 配置事务管理器
@Bean
public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);
}// 使用事务发送
@Transactional
public void sendWithTransaction() {kafkaTemplate.send("topic1", "msg1");kafkaTemplate.send("topic2", "msg2");
}

2.消息重试与死信队列

spring:kafka:listener:retry:max-attempts: 3backoff:initial-interval: 1000multiplier: 2.0dead-letter-topic: my-dlt-topic # 死信队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/70862.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

将VsCode变得顺手好用(1

目录 设置中文 配置调试功能 提效和增强相关插件 主题和图标相关插件 创建js文件 设置中文 打开【拓展】 输入【Chinese】 下载完成后重启Vs即可变为中文 配置调试功能 在随便一个位置新建一个文件夹&#xff0c;用于放置调试文件以及你未来写的代码&#xff0c;随便命名但…

1.1部署es:9200

安装es&#xff1a;root用户&#xff1a; 1.布署java环境 - 所有节点 wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm yum localinstall jdk-8u341-linux-x64.rpm -y java -version 2.下载安装elasticsearch - 所有节点 wget ftp://10.3.148.254/Note/Elk/…

java后端开发day20--面向对象进阶(一)--static继承

&#xff08;以下内容全部来自上述课程&#xff09; 1.static–静态–共享 static表示静态&#xff0c;是java中的一个修饰符&#xff0c;可以修饰成员方法&#xff0c;成员变量。 1.静态变量 被static修饰的成员变量&#xff0c;叫做静态变量。 特点&#xff1a; 被该类…

DeepSeek本地部署+自主开发对话Web应用

文章目录 引言前端部分核心页面DeepSeek.vueMyModal.vue 后端部分WebSocketConfig 配置类AbstractDeepSeekToolDeepSeekWebSocketHandler 数据库设计总结 引言 最近DeepSeep横空出世&#xff0c;在全球内掀起一股热潮&#xff0c;到处都是满血大模型接入的应用&#xff0c;但这…

【算法】798. 差分矩阵

题目 798. 差分矩阵 思路 实质是二维差分&#xff0c;构造数组b&#xff0c;a为b的前缀和&#xff0c;也要用到前缀和的内容&#xff0c;求出数组b之后用b表示a&#xff0c;和一维差分思路类似&#xff0c;不同之处是在加减c时二维要复杂一些。 代码 #include<iostream…

MySQL企业开发中高频使用语句

以下是企业级MySQL开发中高频使用的语句分类及示例&#xff0c;结合典型业务场景说明&#xff1a; 一、数据定义&#xff08;DDL&#xff09; 表结构管理 -- 创建用户表&#xff08;含索引优化&#xff09; CREATE TABLE user (id INT PRIMARY KEY AUTO_INCREMENT,name VARCHAR…

使用DeepSeek/chatgpt等AI工具辅助网络协议流量数据包分析

随着deepseek,chatgpt等大模型的能力越来越强大&#xff0c;本文将介绍一下deepseek等LLM在分数流量数据包这方面的能力。为需要借助LLM等大模型辅助分析流量数据包的同学提供参考&#xff0c;也了解一下目前是否有必要继续学习wireshark工具以及复杂的协议知识。 pcap格式 目…

DeepSeek-R1:通过强化学习激发大语言模型的推理能力

注&#xff1a;此文章内容均节选自充电了么创始人&#xff0c;CEO兼CTO陈敬雷老师的新书《自然语言处理原理与实战》&#xff08;人工智能科学与技术丛书&#xff09;【陈敬雷编著】【清华大学出版社】 文章目录 DeepSeek大模型技术系列三DeepSeek大模型技术系列三》DeepSeek-…

基于YOLO11深度学习的医学X光骨折检测与语音提示系统【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

数据结构:二叉树的数组结构以及堆的实现详解

目录 一.树与二叉树 1.树的概念与相关术语&#xff1a; 2.二叉树&#xff1a; &#xff08;1&#xff09;定义&#xff1a; &#xff08;2&#xff09;特殊的二叉树&#xff1a; &#xff08;3&#xff09;完全二叉树 &#xff08;4&#xff09;二叉树的存储结构&#x…

Python 函数式编程-偏函数

目录 偏函数 小结 偏函数 Python的functools模块提供了很多有用的功能&#xff0c;其中一个就是偏函数&#xff08;Partial function&#xff09;。要注意&#xff0c;这里的偏函数和数学意义上的偏函数不一样。 在介绍函数参数的时候&#xff0c;我们讲到&#xff0c;通过…

鸿蒙中连接手机可能遇到的问题

连接权限问题&#xff1a;手机开启了严格的权限管理机制&#xff0c;若未授予鸿蒙设备连接所需的权限&#xff0c;如蓝牙连接时未开启蓝牙权限&#xff0c;或者 USB 连接时未允许设备进行调试、文件传输等操作&#xff0c;就会导致连接失败。例如&#xff0c;当使用鸿蒙平板通过…

VMware各个软件的作用

VMware作为全球领先的虚拟化与云计算解决方案提供商&#xff0c;其软件产品覆盖了从桌面级虚拟化到企业级云平台的全场景需求。以下结合其核心产品体系&#xff0c;详细解析各软件的功能定位与业务价值&#xff1a; 一、基础虚拟化平台 VMware vSphere 作为企业级服务器虚拟化…

第16届蓝桥杯模拟赛3 python组个人题解

第16届蓝桥杯模拟赛3 python组 思路和答案不保证正确 1.填空 如果一个数 p 是个质数&#xff0c;同时又是整数 a 的约数&#xff0c;则 p 称为 a 的一个质因数。 请问&#xff0c; 2024 的最大的质因数是多少&#xff1f; 因为是填空题&#xff0c;所以直接枚举2023~2 &am…

SQL笔记#复杂查询

一、视图 1、视图和表 使用试图时会执行SELECT语句并创建一张临时表。视图中保存的是SELECT语句;表中保存的是实际数据。 2、创建视图的方法 CREATE VIEW 视图名称(<视图列名1>,<视图列名2>,……) AS <SELECT语句> CREATE VIEW ProductSum (prod…

深度求索(DeepSeek)的AI革命:NLP、CV与智能应用的技术跃迁

Deepseek官网&#xff1a;DeepSeek 引言&#xff1a;AI技术浪潮中的深度求索 近年来&#xff0c;人工智能技术以指数级速度重塑全球产业格局。在这场技术革命中&#xff0c;深度求索&#xff08;DeepSeek&#xff09;凭借其前沿的算法研究、高效的工程化能力以及对垂直场景的…

203、【数组】NLP分词实现(Python)

题目描述 给定一个词典&#xff0c;比如[“杭州”,“西湖”,“博物馆”,“杭州西湖博物馆”,“我”]​ 对于输入的文本进分词&#xff1a;我在杭州的杭州西湖博物馆玩了一天​ 分词结果处理为如下形式的字符串: 我\W 在 杭州\W 的 杭州西湖博物馆\W 玩了一天​ 对于输入的文本…

在 Vue 3 中,如何缓存和复用动态组件

在 Vue 3 中&#xff0c;如何缓存和复用动态组件&#xff0c;这有助于提高应用的性能&#xff0c;避免组件重复创建和销毁带来的开销。下面详细介绍其使用方法和相关配置。 1. 使用 <KeepAlive> 组件缓存动态组件 基本使用 <KeepAlive> 是 Vue 3 内置的一个组件…

Nginx面试宝典【刷题系列】

文章目录 1、nginx是如何实现高并发的&#xff1f;2、Nginx如何处理HTTP请求&#xff1f;3、使用“反向代理服务器”的优点是什么?4、列举Nginx服务器的最佳用途。5、Nginx服务器上的Master和Worker进程分别是什么?6、什么是C10K问题?7、请陈述stub_status和sub_filter指令的…

excel单、双字节字符转换函数(中英文输入法符号转换)

在Excel中通常使用函数WIDECHAR和ASC来实现单、双字节字符之间的转换。其中 WIDECHAR函数将所有的字符转换为双字节&#xff0c;ASC函数将所有的字符转换为单字节 首先来解释一下单双字节的含义。单字节一般对应英文输入法的输入&#xff0c;如英文字母&#xff0c;英文输入法…