[Java实战]Spring Boot整合Kafka:高吞吐量消息系统实战(二十七)

[Java实战]Spring Boot整合Kafka:高吞吐量消息系统实战(二十七)

一、引言

Apache Kafka作为一款高吞吐量、低延迟的分布式消息队列系统,广泛应用于实时数据处理、日志收集和事件驱动架构。结合Spring Boot的自动化配置能力,可以快速搭建高性能消息系统。本文将从环境搭建、代码实现、原理分析到测试优化,全面解析Spring Boot与Kafka的整合实践。

二、环境准备

1. Kafka安装与启动

  1. 下载Kafka:从Apache Kafka官网下载最新版本(推荐3.x+)。
  2. 启动Zookeeper(Kafka依赖):
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  3. 启动Kafka服务
    bin/kafka-server-start.sh config/server.properties
    

2. 创建Topic

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

说明:手动创建Topic可指定分区数(如3),提升并发处理能力。

三、环境准备(docker)

1. 使用Docker快速启动Kafka

通过Docker可以快速部署Kafka服务,无需手动安装依赖,步骤如下:

  1. 创建docker-compose.yml文件
    在项目根目录下新建文件,内容如下:
    version: '3'
    services:zookeeper:image: docker.1ms.run/confluentinc/cp-zookeeper:7.4.0ports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: docker.1ms.run/confluentinc/cp-kafka:7.4.0ports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.231.132:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"  # 禁止自动创建Topicdepends_on:- zookeeper
    

    关键配置说明

    • KAFKA_ADVERTISED_LISTENERS: 确保客户端能通过localhost:9092访问Kafka。
  • KAFKA_AUTO_CREATE_TOPICS_ENABLE: 设为false避免自动创建Topic,推荐手动控制。
  1. 启动Kafka服务
    执行以下命令启动服务:
    docker-compose up -d#停掉
    #docker-compose down
    

2. 创建Topic

通过Docker执行命令创建Topic:

docker exec -it kafka-kafka-1 kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

注意

  • kafka-kafka-1为容器名称(根据实际名称调整)。
  • --partitions 3指定分区数,提升并发处理能力。

3.安装成功截图

在这里插入图片描述

四、Spring Boot项目搭建

1. 添加依赖

pom.xml中引入Spring Kafka:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2. 配置文件

application.yml配置Kafka连接及序列化方式:

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

关键参数auto-offset-reset: earliest确保消费者从最早消息开始消费。

五、代码实现

1. 生产者配置

@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;// 发送消息(支持回调)public void sendMessage(String topic, String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(result -> {System.out.println("发送成功: " + result.getRecordMetadata().offset());}, ex -> {System.out.println("发送失败: " + ex.getMessage());});}
}

高级特性:回调机制可监控消息发送状态。

2. 消费者配置

@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void consume(String message) {System.out.println("接收到消息: " + message);// 业务处理逻辑}
}

批量消费:通过设置spring.kafka.consumer.max-poll-records可支持批量处理。

3.测试结果

KafkaController编写:

@RestController
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducer;@PostMapping("/send")public ResponseEntity<String> sendMs(@RequestBody String request) {kafkaProducer.sendMessage("my-topic","你好");return ResponseEntity.ok("ok");}
}

测试结果:

在这里插入图片描述

六、原理分析

1. Spring Kafka核心组件

  • KafkaTemplate:封装生产者操作,支持异步发送和事务管理。
  • @KafkaListener:基于监听器模式,自动创建消费者并订阅Topic。
  • ConsumerFactory/ProducerFactory:工厂类管理Kafka客户端配置。

2. 高吞吐量优化

  • 生产者端:调整batch.size(批次大小)和linger.ms(等待时间)提升批量发送效率。
  • 消费者端:增加分区数、配置多线程消费(ConcurrentKafkaListenerContainerFactory)。

七、高级特性

1. 自定义分区策略

实现Partitioner接口,指定消息路由规则:

public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区逻辑(如按Key哈希)return key.hashCode() % cluster.partitionCountForTopic(topic);}
}

配置文件中指定分区器:

spring:kafka:producer:properties:partitioner.class: com.example.CustomPartitioner

2. 事务支持

通过KafkaTransactionManager实现事务消息:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;public void sendInTransaction() {kafkaTemplate.executeInTransaction(operations -> {operations.send("topic1", "Message1");operations.send("topic2", "Message2");return null;});
}

八、测试步骤

1. 单元测试(使用嵌入式Kafka)

添加测试依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope>
</dependency>

编写测试类:

@SpringBootTest
@EmbeddedKafka(topics = "test-topic")
public class KafkaTest {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Testpublic void testSendAndReceive() {kafkaTemplate.send("test-topic", "Hello Kafka");// 通过监听器验证消息接收}
}

说明:嵌入式Kafka无需外部服务,适合CI/CD环境。

九、总结

本文从环境搭建到代码实现,结合Spring Boot与Kafka的高吞吐量特性,实现了消息系统的快速开发。通过自定义分区、事务支持和批量消费等高级功能,可进一步优化系统性能。实际应用中需根据业务场景调整参数,并借助监控工具(如Kafka Manager)持续优化。

参考文档

  • Spring Kafka官方文档
  • Apache Kafka架构解析

希望本教程对您有帮助,请点赞❤️收藏⭐关注支持!欢迎在评论区留言交流技术细节!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/906397.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kotlin Multiplatform--04:经验总结(持续更新)

Kotlin Multiplatform--04&#xff1a;经验总结&#xff08;持续更新&#xff09; 引言 引言 本章用来记载笔者开发过程中的一些经验总结 一、Ktor设置Header 在官方文档中&#xff0c;想要设置Header的示例代码如下&#xff1a; client.get("https://ktor.io&qu…

在 Ubuntu 系统中,将 JAR 包安装为服务

在 Ubuntu 系统中&#xff0c;将 JAR 包安装为服务可以通过 systemd 来实现。以下是详细的操作步骤&#xff1a; 准备工作 确保 JAR 文件路径和 Java 运行时环境已准备好。验证 Java 是否可用&#xff1a; java -version创建 systemd 服务文件 systemd 的服务文件通常位于 …

电商项目-商品微服务-品牌管理微服务开发

一、功能分析 品牌管理微服务包括&#xff1a; &#xff08;1&#xff09;查询全部列表数据 &#xff08;2&#xff09;根据ID查询实体数据 &#xff08;3&#xff09;增加 &#xff08;4&#xff09;修改 &#xff08;5&#xff09;删除 &#xff08;6&#xff09;分页…

Spring Boot开发—— 整合Lucene构建轻量级毫秒级响应的全文检索引擎

文章目录 一、为什么选择 Lucene?轻量级搜索的底层密码二、核心原理:Lucene 的倒排索引2.1 倒排索引:速度之源2.2 段合并优化策略三、Spring Boot集成Lucene实战3.1 依赖配置3.2 实体与索引设计3.3 核心索引服务(含异常处理)3.4 使用示例(测试类)四、高级优化技巧4.1 索…

SpringBootDay1|面试题

目录 一、springboot框架 1、什么是springboot 2、Spring Boot的主要优点 3、springboot核心注解 4、定义banner&#xff08;springboot的logo&#xff09; 5、springboot配置文件 6、springboot 整合 jdbc 二、面试题 1&#xff09;springmvc的作用 ​编辑 2&#x…

jQuery Ajax中dataType 和 content-type 参数的作用详解

jQuery Ajax中dataType与contentType参数解析 一、核心概念对比 参数作用对象数据类型默认值dataType响应数据预期接收的数据格式jQuery自动判断&#xff08;根据响应头MIME类型&#xff09;contentType请求数据发送数据的编码格式application/x-www-form-urlencoded 二、da…

几款常用的虚拟串口模拟器

几款常用的虚拟串口模拟器&#xff08;Virtual Serial Port Emulator&#xff09;&#xff0c;适用于 Windows 系统&#xff0c;可用于开发和调试串口通信应用&#xff1a; 1. com0com (开源免费) 特点&#xff1a; 完全开源免费&#xff0c;无功能限制。 可创建多个虚拟串口…

LLM笔记(六)线性代数

公式速查表 1. 向量与矩阵&#xff1a;表示、转换与知识存储的基础 向量表示 (Vectors): 语义的载体 在LLM中&#xff0c;向量 x ∈ R d \mathbf{x}\in\mathbb{R}^d x∈Rd 是信息的基本单元&#xff0c;承载着丰富的语义信息&#xff1a; 词嵌入向量 (Word Embeddings)&am…

[特殊字符] Word2Vec:将词映射到高维空间,它到底能解决什么问题?

一、在 Word2Vec 之前,我们怎么处理语言? 在 Word2Vec 出现之前,自然语言处理更多是“工程方法”,例如字符串匹配、关键词提取、正则规则...。但这些表示通常缺乏语义,词与词之间看不出任何联系以及非常浅显。当然,技术没有好坏,只有适合的场景。例如: 关键词匹配非常…

栈和队列的模拟实现

栈和队列的模拟实现 容器适配器priority_queue(优先级队列&#xff09;priority_queue的使用priority_queue的模拟实现&#xff1a; 仿函数什么叫仿函数&#xff1f;需要自己实现仿函数的情况&#xff1a; 栈的模拟实现队列的模拟实现deque&#xff08;vector和list的缝合怪&am…

idea本地debug断点小技巧

idea本地debug断点小技巧 简单的设置断点条件 断点后&#xff0c;右键这个断点&#xff0c;可以在 condition 中填写能得出布尔的表达式 a 1 你如果写如下&#xff0c;表示先给他赋值&#xff0c;然后断住 a 2; true 断点后设置某个变量的值 在 debug 区域可以设置变量…

Oracle中如何解决FREE BUFFER WAITS

基于性能上的考虑&#xff0c;服务器进程在扫描LRU主列的同时&#xff0c;会将脏块移至LRU-W列&#xff0c;如果发现没有足够可用&#xff08;可替换&#xff09;的BUFFER CACHE&#xff0c;进程并不会无止尽地扫描整条LRU主列&#xff0c;而是在扫描到某个阀值&#xff08;该阀…

Git命令使用全攻略:从创建分支到合并的完整流程

Git命令使用全攻略&#xff1a;从创建分支到合并的完整流程 引言一、初始化项目与基础配置1.1 克隆远程仓库1.2 查看当前分支状态 二、创建与管理分支2.1 从main分支创建新功能分支2.2 查看分支列表2.3 提交代码到新分支2.4 推送分支到GitHub 三、版本发布与标签管理3.1 创建轻…

MATLAB跳动的爱心

520&#xff0c;一个会动的心~~~ function particleHeart2 % author : slandarer% 所需匿名函数 col1Func(n) repmat([255,158,196]./255,[n,1])repmat([-39,-81,-56]./255,[n,1]).*rand([n,1]); col2Func(n) repmat([118,156,216]./255,[n,1])repmat([137,99,39].*.1./255,[n,…

Go的单测gomock及覆盖率命令

安装gomock&#xff1a; go get github.com/golang/mock/gomockgo get github.com/golang/mock/mockgen 使用 mockgen 生成 mock 代码: 参考 mockgen -sourceservice/user.go -destinationservice /mocks/mock_user_service.go -packagemocks go test -coverprofilecoverage.ou…

vue添加loading后修复页面渲染问题

问题&#xff1a;想要通过选择流程&#xff08;1&#xff09;后加载出角色信息&#xff08;2&#xff09; 选择后无法展示经过排查&#xff0c;再调用接口给角色数组赋值后&#xff0c;页面在接口调用完之前就已经渲染完成。接口是采用的异步加载解决&#xff1a;loadingRoles…

Python入门手册:Python简介,什么是Python

在当今数字化时代&#xff0c;编程语言犹如一把把神奇的钥匙&#xff0c;能够开启通往技术世界的大门。而Python&#xff0c;无疑是其中最闪耀的一颗明星。今天&#xff0c;就让我们一起走进Python的世界&#xff0c;从它的起源、应用领域以及优缺点三个方面&#xff0c;来全面…

用PyTorch在超大规模下训练深度学习模型:并行策略全解析

我猜咱们每个人肯定都累坏了&#xff0c;天天追着 LLM 研究社区跑&#xff0c;感觉每天都冒出个新的最牛模型&#xff0c;把之前的基准都给打破了呢。要是你好奇为啥创新速度能这么快&#xff0c;那主要就是研究人员能够在超大规模下训练和验证模型啦&#xff0c;这全靠并行计算…

提示工程(Prompt Engineering)应用技巧

Prompt&#xff08;提示&#xff09;就是用户与大模型交互输入的代称。即我们给大模型的输入称为 Prompt&#xff0c;而大模型返回的输出一般称为 Completion。 Prompt 需要清晰明确地表达需求&#xff0c;提供充足上下文&#xff0c;使语言模型能够准确理解我们的意图。更长、…

[原创](现代Delphi 12指南):[macOS 64bit App开发]: 如何获取目录大小?

[作者] 常用网名: 猪头三 出生日期: 1981.XX.XX 企鹅交流: 643439947 个人网站: 80x86汇编小站 编程生涯: 2001年~至今[共24年] 职业生涯: 22年 开发语言: C/C++、80x86ASM、Object Pascal、Objective-C、C#、R、Python、PHP、Perl、 开发工具: Visual Studio、Delphi、XCode、…