Kafka入门:从初识到Spring Boot实战

news/2025/11/29 13:18:09/文章来源:https://www.cnblogs.com/sun-10387834/p/19286179

回顾完RabbitMQ,再跟我一起回顾下Kafka ~

一、Kafka介绍

1. 什么是Kafka?

Kafka是由Apache软件基金会开发的分布式流处理平台,最初由LinkedIn公司设计,现已成为大数据领域核心的消息中间件。它能处理实时数据流,支持高吞吐、低延迟、可扩展的消息传递,广泛用于日志收集、实时分析、事件驱动架构等场景。

2. 核心特点

  • 高吞吐:单机可支持百万级消息/秒,通过分区并行处理实现。
  • 持久化:消息持久化到磁盘,支持TB级数据存储(默认保留7天)。
  • 分布式:集群由多个Broker(服务器)组成,支持水平扩展。
  • 多订阅者:一个Topic的消息可被多个消费者组独立消费(广播/负载均衡)。

二、Kafka架构与核心组件

1. 核心组件

组件 作用
Broker Kafka服务器节点,存储Topic数据,每个Broker有唯一ID(broker.id)。
Topic 消息的逻辑分类(如order-topic),类似“消息频道”,包含多个Partition。
Partition Topic的物理分片(有序日志文件),分布式存储的基本单位,每个Partition有Leader和Follower副本。
Producer 发送消息到Topic的客户端(如订单服务)。
Consumer 从Topic订阅消息的客户端(如库存服务)。
Consumer Group 消费者组,组内多个消费者负载均衡消费Partition,组间独立消费(广播)。

2. 架构图(Mermaid)

graph TDsubgraph Kafka ClusterBroker1[Broker 1<br/>broker.id=0<br/>• TopicA-Partition0 Leader<br/>• TopicB-Partition1 Follower]Broker2[Broker 2<br/>broker.id=1<br/>• TopicA-Partition1 Leader<br/>• TopicB-Partition0 Leader]Broker3[Broker 3<br/>broker.id=2<br/>• TopicA-Partition0 Follower]endZK[ZooKeeper<br/>集群协调<br/>存储元数据]Producer[Producer<br/>发送消息到Topic]ConsumerGroup[Consumer Group<br/>组内负载均衡消费]TopicA[TopicA<br/>• Partition0<br/>• Partition1]Producer -->|发送消息| TopicATopicA -->|分区存储| Broker1TopicA -->|分区存储| Broker2Broker1 -->|同步数据| Broker3TopicA -->|负载均衡消费| ConsumerGroupZK -.->|协调| Broker1ZK -.->|协调| Broker2ZK -.->|协调| Broker3ZK -.->|管理消费者组| ConsumerGroup

三、消息流转完整路径(生产者→消费者)

1. 流转步骤

  1. 生产者发送消息:生产者指定Topic和Key(可选),通过分区器将消息分配到Partition(默认按Key哈希)。
  2. Broker存储消息:Leader副本接收消息并写入磁盘(Segment文件),Follower副本同步数据。
  3. 消费者组分配Partition:消费者组启动时,协调者(Coordinator)将Topic的Partition分配给组内消费者(一个Partition仅被一个消费者消费)。
  4. 消费者拉取消息:消费者定期拉取(Poll)分配到的Partition消息,处理后提交偏移量(Offset)。

2. 消息流转图示(Mermaid)

sequenceDiagramparticipant P as Producerparticipant B as Broker (Leader)participant F as Broker (Follower)participant C as Consumer (Group)Note over P,B: 1. 生产者发送消息P->>B: 发送消息到Topic-Partition0 (Key: order-1)B->>B: 写入本地日志 (LEO=100)B->>F: 同步消息 (LEO=100)F->>B: 确认同步 (LEO=100)B->>P: 返回ACK (消息提交成功)Note over B,C: 2. 消费者拉取消息C->>B: Poll请求 (获取Partition0消息)B->>C: 返回消息 (Offset=99, Value=订单数据)C->>C: 处理消息 (扣减库存)C->>B: 提交偏移量 (Offset=100)

四、Kafka安装(ZooKeeper传统模式,CentOS 7)

1. 环境准备

  • CentOS 7系统,关闭防火墙(或开放端口2181、9092):
    systemctl stop firewalld && systemctl disable firewalld
    
  • 安装JDK 8+:
    yum install java-1.8.0-openjdk-devel -y
    

2. 安装ZooKeeper(Kafka依赖)

步骤1:下载并解压

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/
mv /opt/apache-zookeeper-3.7.1-bin /opt/zookeeper

步骤2:配置ZooKeeper

cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg  # 修改以下配置
dataDir=/var/lib/zookeeper  # 数据存储目录
clientPort=2181             # 客户端端口

步骤3:启动ZooKeeper

mkdir -p /var/lib/zookeeper
/opt/zookeeper/bin/zkServer.sh start  # 启动
/opt/zookeeper/bin/zkServer.sh status  # 查看状态(显示Mode: standalone为成功)

3. 安装Kafka Broker

步骤1:下载并解压

wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -zxvf kafka_2.13-3.6.0.tgz -C /opt/
mv /opt/kafka_2.13-3.6.0 /opt/kafka

步骤2:配置Kafka

cd /opt/kafka/config
vim server.properties  # 修改以下配置
# 核心配置
broker.id=0                  # 当前Broker唯一ID(集群中不可重复)
listeners=PLAINTEXT://localhost:9092  # 监听地址(本地测试用localhost)
log.dirs=/var/lib/kafka/logs  # 消息存储目录
zookeeper.connect=localhost:2181/kafka  # 连接ZooKeeper(/kafka为根节点)

步骤3:启动Kafka

mkdir -p /var/lib/kafka/logs
/opt/kafka/bin/kafka-server-start.sh -daemon config/server.properties  # 后台启动
jps  # 查看进程(显示Kafka为成功)

4. 创建Topic(测试用)

/opt/kafka/bin/kafka-topics.sh --create \--topic order-topic \          # 主题名称--bootstrap-server localhost:9092 \  # Kafka地址--partitions 3 \               # 分区数(建议≥3)--replication-factor 1         # 副本数(单节点只能设1)

五、Spring Boot保姆级案例(生产者+消费者)

1. 项目结构

src/main/java/com/example/kafkademo/
├── KafkaDemoApplication.java  # 启动类
├── model/Order.java           # 订单实体类
├── producer/OrderProducer.java # 生产者服务
├── consumer/OrderConsumer.java # 消费者服务
└── controller/OrderController.java # 测试接口
src/main/resources/
└── application.yml            # 配置文件

2. pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version> <!-- Spring Boot 2.7.x稳定版 --><relativePath/></parent><groupId>com.example</groupId><artifactId>kafka-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-demo</name><dependencies><!-- Web依赖(提供HTTP接口) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Kafka依赖 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Lombok(简化实体类) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

3. application.yml配置

server:port: 8080  # 应用端口spring:application:name: kafka-demo  # 应用名称kafka:bootstrap-servers: localhost:9092  # Kafka集群地址(多个用逗号分隔)# 生产者配置producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # Key序列化器(字符串)value-serializer: org.springframework.kafka.support.serialization.JsonSerializer  # Value序列化器(JSON)acks: all  # 消息确认级别:all=所有ISR副本确认(最高可靠性)retries: 3  # 发送失败重试次数enable-idempotence: true  # 启用幂等性(防重复消息)# 消费者配置consumer:group-id: order-group  # 消费者组ID(同一组内负载均衡)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Key反序列化器value-deserializer: org.springframework.kafka.support.serialization.JsonDeserializer  # Value反序列化器auto-offset-reset: earliest  # 无偏移量时策略:earliest=从头消费enable-auto-commit: false  # 关闭自动提交偏移量(手动控制)properties:spring.json.trusted.packages: "com.example.kafkademo.model"  # 信任的实体类包(JSON反序列化用)# 监听器配置(消费者)listener:ack-mode: manual_immediate  # 手动立即提交偏移量(处理完一条提交一条)concurrency: 3  # 并发消费者数(建议=Topic分区数,此处3分区)

4. 实体类(Order.java)

package com.example.kafkademo.model;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;/*** 订单实体类(消息载体)*/
@Data  // Lombok注解:自动生成getter/setter/toString等
@NoArgsConstructor  // 无参构造
@AllArgsConstructor  // 全参构造
public class Order {private String orderId;  // 订单IDprivate String productName;  // 商品名称private BigDecimal amount;  // 订单金额private String status;  // 订单状态(CREATED/PAID/SHIPPED)
}

5. 生产者服务(OrderProducer.java)

package com.example.kafkademo.producer;import com.example.kafkademo.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.math.BigDecimal;
import java.util.UUID;/*** 订单生产者服务:发送订单消息到Kafka*/
@Service  // 标记为Spring服务组件
@Slf4j  // Lombok日志注解
public class OrderProducer {// 注入KafkaTemplate(Spring Boot自动配置,用于发送消息)@Autowiredprivate KafkaTemplate<String, Order> kafkaTemplate;private static final String TOPIC_NAME = "order-topic";  // 目标Topic名称(需与消费者一致)/*** 发送订单消息* @param order 订单对象(若为null则自动生成测试订单)*/public void sendOrder(Order order) {// 1. 若订单ID为空,生成UUID作为订单IDif (order == null) {order = new Order();order.setOrderId(UUID.randomUUID().toString());  // 随机生成订单IDorder.setProductName("测试商品");  // 测试商品名称order.setAmount(new BigDecimal("99.99"));  // 测试金额order.setStatus("CREATED");  // 初始状态:已创建}// 2. 发送消息到Kafka(Key=订单ID,确保同一订单进入同一Partition)ListenableFuture<SendResult<String, Order>> future = kafkaTemplate.send(TOPIC_NAME, order.getOrderId(), order);// 3. 异步回调:处理发送结果(成功/失败)future.addCallback(new ListenableFutureCallback<SendResult<String, Order>>() {@Overridepublic void onSuccess(SendResult<String, Order> result) {// 发送成功:打印消息元数据(Topic、分区、偏移量)log.info("订单发送成功:orderId={}, 分区={}, 偏移量={}",order.getOrderId(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {// 发送失败:打印错误信息log.error("订单发送失败:orderId={}, 原因={}", order.getOrderId(), ex.getMessage());}});}
}

6. 消费者服务(OrderConsumer.java)

package com.example.kafkademo.consumer;import com.example.kafkademo.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;/*** 订单消费者服务:从Kafka接收并处理订单消息*/
@Service  // 标记为Spring服务组件
@Slf4j  // Lombok日志注解
public class OrderConsumer {/*** 监听order-topic主题,消费订单消息* @param record 消息记录(包含Topic、分区、偏移量、消息体等)* @param ack 手动提交偏移量的工具类*/@KafkaListener(topics = "order-topic", groupId = "order-group")  // 监听的Topic和消费者组IDpublic void consumeOrder(ConsumerRecord<String, Order> record, Acknowledgment ack) {// 1. 解析消息体(Order对象)Order order = record.value();log.info("收到订单消息:orderId={}, productName={}, amount={}, status={}, 分区={}, 偏移量={}",order.getOrderId(),order.getProductName(),order.getAmount(),order.getStatus(),record.partition(),record.offset());try {// 2. 模拟业务处理(如扣减库存、更新订单状态)log.info("处理订单:orderId={},开始扣减库存...", order.getOrderId());order.setStatus("PAID");  // 更新状态为“已支付”Thread.sleep(500);  // 模拟处理耗时log.info("订单处理完成:orderId={},状态更新为{}", order.getOrderId(), order.getStatus());// 3. 手动提交偏移量(确认消息已处理,Kafka不再重复投递)ack.acknowledge();log.info("偏移量提交成功:orderId={},分区={},偏移量={}",order.getOrderId(), record.partition(), record.offset());} catch (Exception e) {log.error("订单处理失败:orderId={}, 原因={}", order.getOrderId(), e.getMessage());// 处理失败时抛出异常,触发重试(需配置重试策略,此处简化)}}
}

7. 测试控制器(OrderController.java)

package com.example.kafkademo.controller;import com.example.kafkademo.model.Order;
import com.example.kafkademo.producer.OrderProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;import java.math.BigDecimal;/*** 测试接口:通过HTTP发送订单消息*/
@RestController  // 标记为REST接口控制器
public class OrderController {@Autowired  // 注入生产者服务private OrderProducer orderProducer;/*** 创建订单接口(发送消息到Kafka)* @param order 订单对象(JSON格式)* @return 响应消息*/@PostMapping("/create-order")public String createOrder(@RequestBody(required = false) Order order) {// 调用生产者发送订单(若order为null,生产者自动生成测试订单)orderProducer.sendOrder(order);return "订单消息已发送,请查看控制台日志!";}
}

8. 启动类(KafkaDemoApplication.java)

package com.example.kafkademo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication  // Spring Boot启动类注解
public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);  // 启动应用}
}

六、测试步骤

  1. 启动Kafka集群:确保ZooKeeper和Kafka Broker已启动(参考“四、Kafka安装”)。
  2. 创建Topic:若未创建,执行命令:
    /opt/kafka/bin/kafka-topics.sh --create --topic order-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
    
  3. 运行Spring Boot应用:启动KafkaDemoApplication,访问http://localhost:8080/create-order(POST请求,可传JSON订单或用Postman测试)。
  4. 查看日志
    • 生产者日志:显示“订单发送成功”及分区、偏移量。
    • 消费者日志:显示“收到订单消息”“处理订单”“偏移量提交成功”。

总结

本文从Kafka架构、安装到Spring Boot实战,覆盖了入门级核心内容。通过“生产者发送订单→消费者消费订单”的简单案例,快速上手Kafka的基本使用。后续可深入学习分区策略、数据可靠性、事务等高级特性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/980747.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VFox版本管理工具使用教程

官网 https://vfox.dev/zh-hans/guides/intro.html Github:https://github.com/version-fox/vfox 使用教程: 简单安装使用(Java JDK使用为例) 安装在Releases下载最新版本的zip安装包 配置PATH环境变量,将vfox安装目…

unprofitable25,5

11https://matt.might.net/articles/how-to-blog-as-an-academic/

2025年浙江电子汽车衡年度排名:电子汽车衡制造商、诚信的防

在工业称重领域,电子汽车衡作为物流、化工、港口等场景的核心设备,其精度、稳定性与定制化能力直接影响企业运营效率与安全合规。面对市场上鱼龙混杂的供应商,如何选择兼具技术实力、诚信口碑与定制能力的合作伙伴?…

FileGDB代码示例

FileGDB代码示例 1. 读取FileGDB图层 1.1 实现思路graph TDA[注册OGR] --> B[设置中文路径支持]B --> C[打开数据驱动]C --> D[打开数据源]D --> G[获取图层]G --> H[读取图层信息]1.2 代码示例public …

2025年中国十大比较好的AI智能客服企业推荐:口碑好且资质

本榜单依托全维度市场调研与真实行业口碑,深度筛选出十家标杆企业,重点围绕技术实力、服务体验、资质合规、客户反馈四大核心维度,为企业选型提供客观依据,助力精准匹配适配的服务伙伴。 TOP1 推荐:广州市塔灯人工…

07-实战案例与最佳实践

第七章:实战案例与最佳实践 7.1 案例一:参数化建模工具 7.1.1 需求分析 参数化建模是现代CAD系统的核心功能之一。本案例将实现一个参数化齿轮建模工具,用户可以通过调整参数动态生成齿轮模型。 功能需求:支持设置…

06-二次开发进阶

第六章:二次开发进阶 6.1 自定义渲染器 6.1.1 渲染管线概述 Chili3D的渲染基于Three.js,理解其渲染管线对于自定义渲染至关重要: 场景图(Scene Graph)↓ 几何体处理(Geometry Processing)↓ 材质着色(Material Shad…

2025年哈尔滨精密轴承企业综合实力前十强排行榜

我们优先筛选了通过国际、国内双重认证的企业 —— 比如 ISO9001 质量管理体系(这是基础)、GJB9001A 军工质量管理体系(针对高端装备领域)、TS16949 汽车行业认证(汽车轴承的 “入场券”),还有 AS9100 航空航天…

05-二次开发入门

第五章:二次开发入门 5.1 开发环境配置 5.1.1 推荐开发工具 进行Chili3D二次开发,推荐使用以下开发工具: 代码编辑器:Visual Studio Code(推荐):免费、开源、功能强大 JetBrains WebStorm:专业的Web开发IDE推荐…

04-用户界面与交互系统

第四章:用户界面与交互系统 4.1 UI架构概述 4.1.1 组件化设计 Chili3D的用户界面采用组件化设计,将复杂的界面分解为可复用的小组件。这种设计使得代码更容易维护、测试和扩展。 核心UI包结构: packages/chili-ui/s…

2025年中国AI智能客服公司排名:高性价比的AI智能客服品

本榜单依托全维度市场调研与真实行业口碑,深度筛选出五家标杆企业,为企业选型提供客观依据,助力精准匹配适配的服务伙伴。 TOP1 推荐:广州市塔灯人工智能科技有限公司 推荐指数:★★★★★ 口碑评分:国内首推的…

【音视频】WebRTC连接建立流程详解 - 指南

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

玻璃反应釜生产厂TOP5权威推荐:专业选型、价格解析与低温适

化工、医药、科研领域的实验与生产中,玻璃反应釜是核心设备之一。2024年行业数据显示,国内玻璃反应釜市场规模突破60亿元,年增速达28%,但用户投诉中35%集中在专业度不足价格虚高低温性能不达标三大问题——某药企因…

2025年中国测评系统定制开发服务推荐:靠谱的测评系统定制开

本榜单依托全维度市场调研与真实行业口碑,深度筛选出五家在测评系统定制开发领域表现突出的标杆企业,为企业选型提供客观依据,助力精准匹配适配的服务伙伴。 TOP1 推荐:广州市塔灯人工智能科技有限公司 推荐指数:…

2025年十大广州AI数字员工推荐排行榜,专业测评精选AI智

为帮助企业高效锁定适配自身需求的AI数字员工合作伙伴,避免选型走弯路,我们从技术落地能力(如场景适配性、功能迭代支持)、成本优化效果(含降本幅度、效率提升数据)、全周期服务质量(覆盖部署培训到后期维护)及…

python中类似fhello, rhello 的用法还有哪些?

在Python中,字符串字面量可以通过前缀修饰以改变其处理方式或语义。除了常见的f"hello"(格式化字符串)和r"hello"(原始字符串),还有以下几种核心用法: 1. b"hello":字节字符串(…

声源定位与增强调研笔记

基于深度学习方法的声源定位研究综述 https://zhuanlan.zhihu.com/p/762696075 声源定位(SSL):基于记录的多声道声信号来估计一个或多个声源相对于某个任意参考位置的位置的问题,该位置通常是记录麦克风阵列的位置…

FreeRTOS 学习:(四)任务调度和任务状态 - 实践

FreeRTOS 学习:(四)任务调度和任务状态 - 实践2025-11-29 12:56 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; displa…

凸优化理论(一)

凸优化理论(一)组合 线性组合\(ax_1+bx_2\) , 图像理解: \(x_1,x_2\)与原点0构成一个平面仿射组合\(ax_1+bx_2\),且 \(a+b=1\), 图像理解:穿过\(x_1,x_2\)的一条直线凸组合\(ax_1+bx_2\),且 \(a+b=1\),且 \(…

2025年十大可靠水质分析仪品牌推荐,专业虹润水质分析仪

在工业生产、环境治理与水产养殖等领域,水质分析仪是把控水质安全的核心哨兵,其精度、稳定性与耐用性直接影响生产效率与生态安全。面对市场上鱼龙混杂的产品,如何选择可靠的品牌?以下依据技术实力、产品性能与用户…