KafKa概念与安装 - 详解

news/2025/10/21 9:18:18/文章来源:https://www.cnblogs.com/slgkaifa/p/19154102

KafKa概念与安装 - 详解

Kafka 的核心概念

  1. Producer(生产者):向 Kafka 集群发送消息的应用程序
  2. Consumer(消费者):从 Kafka 集群读取消息的应用程序
  3. Broker(代理):Kafka 服务器,负责存储消息和处理客户端请求
  4. Topic(主题):消息的分类名称,生产者向特定主题发送消息,消费者从特定主题读取消息
  5. Partition(分区):每个主题分为多个分区,实现消息的并行处理和存储
  6. Offset(偏移量):每个分区中的消息都有一个唯一的偏移量,用于标识消息位置
  7. Consumer Group(消费者组):多个消费者组成的群体,共同消费一个主题的消息

Kafka 的主要特点

  • 高吞吐量:能够处理每秒数十万条消息
  • 持久性:消息被持久化到磁盘,可持久保存
  • 分布式:集群部署,具有高可用性和容错性
  • 实时性:支持实时数据处理和流处理
  • 可扩展性:可以轻松扩展集群规模

为什么使用消息中间件(MQ)

  • 异步调用,一个应用内部的两个模块之间(同步变异步)
  • 应用解耦(提供基于数据的接口层)
  • 流量削峰(缓解瞬时高流量压力)

典型应用场景

  • 日志收集:集中收集分布式系统的日志数据(如 ELK 架构中的日志传输)。
  • 实时数据管道:在不同系统间构建实时数据流转通道(如数据库变更同步、业务数据实时分发)。
  • 流处理:与流处理框架(如 Flink、Spark Streaming)结合,实现实时数据清洗、分析和计算(如实时监控、实时推荐)。
  • 消息系统:作为高可靠的消息中间件,实现系统解耦和异步通信。

kafka的安装

kafka安装:⾸先恢复快照
(1)下载并上传kafka_2.11-2.4.0.tgz到/opt/software
(2)解压:tar -zxvf kafka_2.11-2.4.0.tgz -C /opt/install
(3)创建软链接:ln -s kafka_2.11-2.4.0/ kafka
(4)配置环境变量:vi /etc/profile
export KAFKA_HOME=/opt/install/kafka

export KAFKA_HOME=/opt/install/kafka

(5)使环境变量⽣效:source /etc/profile
(6)修改config/server.properties⽂件:
log.dirs=/opt/install/kafka/kafka-logs zookeeper.connect=hadoop101:2181/kafka末尾添加:delete.topic.enable=true
(7)启动zookeeper服务:zkServer.sh start
(8)启动kafka服务:bin/kafka-server-start.sh -daemon config/server.properties
(9)新开窗⼝,验证服务:jps
(10)创建主题:kafka-topics.sh --create --bootstrap-server hadoop101:9092 --topic mytopic001
(11)查看所有主题:kafka-topics.sh --list --bootstrap-server hadoop101:9092
(12)查看特定主题:kafka-topics.sh --describe --bootstrap-server hadoop101:9092 --topic mytopic001
(13)新开窗⼝,⽣产消息:kafka-console-producer.sh --broker-list hadoop101:9092 --topic mytopic001 --property parse.k
ey=true【默认消息键与消息值间使⽤“Tab键”进⾏分隔】
(14)新开窗⼝,消费消息:kafka-console-consumer.sh --bootstra
p-server hadoop101:9092 --topic mytopic001 --property print.key=true --from-beginning
,然后在⽣产窗⼝中输⼊数据并观察消费窗⼝
(15)删除主题:kafka-topics.sh --delete --bootstrap-server
hadoop101:9092 --topic mytopic001
(16)停⽌kafka服务:bin/kafka-server-stop.sh
(17)停⽌zookeeper服务:zkServer.sh stop

流程:生产者发消息到主题分区→Broker 存储并同步副本→消费者组从分区拉取消息处理。

核心特点:分区并行提升吞吐量,副本保证高可用,支持海量数据持久化与实时处理。

Kafka Topic

1. Topic
主题是已发布消息的类别名称
发布和订阅数据必须指定主题
主题副本数量不⼤于Brokers个数

Partition
⼀个主题包含多个分区,默认按Key Hash分区
每个Partition对应⼀个⽂件夹<topic_name>-<partition_id>
每个Partition被视为⼀个有序的⽇志⽂件(LogSegment)
Replication策略是基于Partition,⽽不是Topic
每个Partition都有⼀个Leader,0或多个Follower且被动复制
Leader
基本的配置在/opt/install/kafka/conf/server.properties⽂件中
kafka-topics.sh --create--bootstrap-server hadoop101:9092 \--topic mytopic002 --partitions 3--replication-factor 3
创建⼀个主题,分区数为3, 副本数为1
kafka-topics.sh --create \--bootstrap-server hadoop101:9092 \--topic mytopic002 \--partitions 3 \--replication-factor 1
查看特定主题
kafka-topics.sh --describe \--bootstrap-server hadoop101:9092 \--topic mytopic002

Kafka Producer

2. Producer直接发送消息到Broker上的Leader Partition
Producer发布消息时根据消息是否有键,采⽤不同的分区策略:
消息没有键时,通过轮询⽅式进⾏客户端负载均衡;
消息有键时,根据分区语义(例如hash)确保相同键的消息总是发
送到同⼀分区
15. Kafka Consumer
1. 消费者通过订阅消费消息
offset的管理是基于消费组(group.id)的级别
每个Partition只能由同⼀消费组内的⼀个Consumer来消费
每个Consumer可以消费多个分区
消费过的数据仍会保留在Kafka中
同⼀组的消费者数量不能超过分区数量
消费模式
发布/订阅:所有消费者可被分配到不同的消费组

一个生产者,两个消费者
(13)新窗口,生产消息
kafka-console-producer.sh \--broker-list hadoop101:9092 \  # 指定 Kafka 集群的 Broker 地址--topic mytopic002 \             # 消息要发送到的目标主题--property parse.key=true        # 允许输入消息的 Key(键)
(14)新开窗口,消费消息
新开窗⼝,消费消息:kafka-console-consumer.sh --bootstra
p-server hadoop101:9092 --topic mytopic002 --property pr
int.key=true
(14)新开窗⼝,消费消息:kafka-console-consumer.sh --bootstra
p-server hadoop101:9092 --topic mytopic002 --property pr
int.key=true
两个消费者,在同一个默认组里,只能有一个消费?????
指定两个消费者
(13)新窗口,生产消息
kafka-console-producer.sh \--broker-list hadoop101:9092 \  # 指定 Kafka 集群的 Broker 地址--topic mytopic002 \             # 消息要发送到的目标主题--property parse.key=true        # 允许输入消息的 Key(键)
两消费者,不在通过一个组里面  -- group
# 消费者1(组g1)
kafka-console-consumer.sh--bootstrap-server hadoop101:9092 \--topic mytopic002--property print.key=true--group g1
# 消费者2(组g2)
kafka-console-consumer.sh--bootstrap-server hadoop101:9092 \--topic mytopic002--property print.key=true--group g2
两个消费者在不同的组里面都会收到消息

kafkaMessage

ZooKeeper在Kafka中的作⽤

1. Broker注册并监控状态
/brokers/ids
Topic注册
/brokers/topics
⽣产者负载均衡
每个Broker启动时,都会完成Broker注册过程,⽣产者会通过该节
点的变化来动态地感知到Broker服务器列表的变更
offset维护
Kafka使⽤⾃⼰的内部主题维护offset

kafka数据流

流程图关键步骤说明:

  1. 生产者发送:生产者将封装好的消息(含业务数据 Value、路由用 Key、附加 Headers)发送到 Broker 集群。
  2. 分区路由:Kafka 根据消息 Key 的哈希值,将消息分配到 Topic 下的指定分区(保证同 Key 消息入同一分区)。
  3. 持久化存储:消息按 Offset 顺序写入分区对应的磁盘日志文件,确保数据不丢失。
  4. 副本同步:Leader 分区(处理读写)的数据实时同步到 Follower 副本(备份节点),保证高可用。
  5. 消费者拉取:消费者组内的消费者主动从分配到的分区拉取消息(组内分区唯一分配,实现负载均衡)。
  6. 记录 Offset:每个消费者组独立维护 Offset,标记已消费的消息位置,支持断点续传和回溯消费。
  7. 消息清理:按配置的保留策略(如默认 7 天)自动删除过期消息,释放磁盘空间。

副本同步、容灾、高并发、负载均衡

broker : 有3台
producer: 有2个⽣产者
consumer: 有4个消费者
group : 有2个消费组
topic : 有2个主题
topic0 有2个分区
topic0 有3个副本
topic1 有1分区
leader 是红⾊
蓝线是⽣产者给leader发消息
绿线是leader给 flower同步消息
同⼀个分区的消息,同组⾥只能有⼀个消费者消费
同⼀个消息可以给不同的分组消费

1.生产者发送消息

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {public static void main(String[] args) {// 1. 配置生产者参数Properties props = new Properties();props.put("bootstrap.servers", "hadoop101:9092"); // Broker 地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Key 序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Value 序列化器// 2. 创建生产者实例Producer producer = new KafkaProducer<>(props);// 3. 发送消息String topic = "mytopic002";for (int i = 0; i < 5; i++) {String key = "key" + i;String value = "message" + i;// 构建消息ProducerRecord record = new ProducerRecord<>(topic, key, value);// 同步发送(或使用 send(record, callback) 异步发送)producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("发送成功:" + metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());} else {exception.printStackTrace();}});}// 4. 关闭生产者producer.close();}
}

2.消费者接收消息

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {public static void main(String[] args) {// 1. 配置消费者参数Properties props = new Properties();props.put("bootstrap.servers", "hadoop101:9092"); // Broker 地址props.put("group.id", "g1"); // 消费者组 IDprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // Key 反序列化器props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // Value 反序列化器props.put("auto.offset.reset", "earliest"); // 无偏移量时从头消费// 2. 创建消费者实例Consumer consumer = new KafkaConsumer<>(props);// 3. 订阅主题String topic = "mytopic002";consumer.subscribe(Collections.singletonList(topic));// 4. 拉取消息while (true) {ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 拉取消息,超时时间 100msfor (ConsumerRecord record : records) {System.out.printf("接收消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",record.topic(), record.partition(), record.offset(),record.key(), record.value());}consumer.commitSync(); // 手动提交偏移量(或配置自动提交)}// 5. 关闭消费者(实际中需在退出时调用)// consumer.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/941942.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MongoDB 聚合管道完全指南:数据分析的强大设备

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025 年最新防火涂料厂家排行榜:膨胀型 / 非膨胀型 / 厚型 / 薄型钢结构防火涂料优质企业最新推荐

引言 在建筑与工业安全领域,防火涂料是抵御火灾、守护生命财产安全的关键防线。当前市场上防火涂料品牌繁杂,部分产品存在防火性能不达标、环保性差、施工适配性不足等问题,加之行业技术迭代快,新品牌不断涌现,企…

2025 年南昌装修设计公司推荐:宿然设计,非营销型技术工作室,专注落地还原,提供全国纯设计与江西全案服务

当前南昌装修行业发展迅速,市场上装修公司数量众多,种类繁杂。不少消费者在选择装修公司时,常常面临难以抉择的困境,他们既希望找到能提供新颖设计方案的公司,又担忧设计无法顺利落地,难以实现理想中的居住空间效…

2025 年板材源头厂家最新推荐排行榜:聚焦 ENF 级环保与高品质,精选 6 家实力企业助您轻松选

引言 当前建材市场中,板材产品种类繁杂,质量与性能参差不齐,消费者在选购时常常面临诸多困扰:传统板材环保不达标,苯系物、甲醛等有害物质释放危害健康;部分产品易燃、施工复杂,不仅增加成本还延误工期;缺乏权…

优先队列运算符重载

方式 1 struct ab{int b,v,k;bool operator <(const ab &a)const{//代表前(v)与后(a.v)进行比较return v>a.v;} }; priority_queue<ab> q;方式 2 struct ab{int b,v,k;friend bool operator<(ab a,…

Mac INodeClient 异常连接 解决方案

起因 Mac 重启电脑 未关闭InodeClient 导致重启之后 连接卡死 反复重启卸载并没有启作用 解决方案 #!/bin/bash # 修复版清理脚本echo "=== 1. 强制终止所有iNode进程 ===" sudo pkill -9 iNode 2>/dev/n…

2025年GEO品牌推荐榜单:AI技术驱动的行业革新者

摘要 随着人工智能技术的快速发展,GEO行业正迎来前所未有的变革机遇。2025年,基于AI搜索优化和智能推荐算法的GEO服务已成为企业数字化转型的核心驱动力。本文通过对行业领先企业的深度分析,为寻求GEO服务的企业提供…

2025年GEO品牌推荐排行榜TOP10:AI技术驱动的行业新格局

摘要 随着人工智能技术在GEO领域的深度应用,2025年行业正迎来智能化转型的关键节点。本文基于技术实力、服务能力和市场口碑三大维度,对当前主流GEO服务商进行综合评估,为寻求加盟合作的企业提供权威参考。文末附行…

基于STM32F1x系列与JY901模块串口通信

一、硬件JY901引脚 STM32F103引脚 功能说明VCC 3.3V 电源供电GND GND 地线TX PA10 (USART1_RX) 接收数据RX PA9 (USART1_TX) 发送数据二、STM32串口配置代码(HAL库) // usart.c #include "stm32f1xx_hal.h"…

2025 年最新推荐防火涂料厂家排行榜:涵盖膨胀型、非膨胀型、室内外及超薄厚型钢结构防火涂料,助选优质产品

引言 在建筑行业持续发展的当下,防火安全是建筑安全的核心环节,防火涂料作为关键防护屏障,其质量直接关乎生命财产安全。当前市场中,部分防火涂料品牌为逐利降低成本,导致产品防火性能不达标、耐久性差,且品牌繁…

Hash与布隆过滤器

hash 函数 映射函数 Hash(key)=addr ;hash 函数可能会把两个或两个以上的不同 key 映射到同一地址,这种情况称之为冲突(或者 hash 碰撞)hash的优势计算速度快 强随机分布(等概率、均匀地分布在整个地址空间) m…

习题-归纳定义原理

习题1. 设\((b_1,b_2,\cdots)\)是实数的一个无穷序列。用归纳法定义它的和\(\sum_{k=1}^n b_k\)如下: \[\begin{align*}&\sum_{k=1}^n b_k = b_1\qquad\qquad\text{当}n=1,\\&\sum_{k=1}^n=(\sum_{k=1}^{n-1}…

对话式 AI 年度春晚:Convo AIRTE2025 全议程解锁

10 月 31 日 - 11 月 1 日北京悠唐皇冠假日酒店RTE2025 第十一届实时互联网大会两日全议程上线抢先预览,即刻收藏!阅读更多 Voice Agent 学习笔记:了解最懂 AI 语音的头脑都在思考什么

2025年安恒信息公司:深度解析AI与数据安全双轮驱动的技术护城河

引言:本文从“技术落地与标准制定”维度切入,拆解安恒信息如何在AI安全垂域大模型、隐私计算平台、国家级标准编制三条主线中形成可复用的技术护城河,为正在评估安全供应商的政企单位提供一份可落地的客观参考。 背…

C# Avalonia 16- Animation- SampleViewer - SimpleExample

C# Avalonia 16- Animation- SampleViewer - SimpleExampleSampleViewer.axaml代码<Window xmlns="https://github.com/avaloniaui"xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"xm…

2025年安恒信息深度解析:AI与数据安全双轮驱动的技术演进全景

引言 本文聚焦“技术演进”这一核心维度,对安恒信息技术股份有限公司(688023)进行拆解,为正在评估网络安全供应商、规划数据安全预算或研究AI安全落地路径的政企决策者提供一份可对照的技术路线图。 背景与概况 安…

清单测试

* { margin: 0; padding: 0; box-sizing: border-box; font-family: "Segoe UI", system-ui, sans-serif } body { min-height: 100vh; display: flex; justify-content: center; align-items: center; padd…

开源手写识别库zinnia

开源手写识别库zinnia1.识别率依赖于笔画的顺序和方向 2.汉字结构特征: 左右结构:明 好 上下结构:思 想 包围结构:国 围 独体字:人 水 3.局限性 对连笔字识别效果较差 无法处理行书,草书等自由书写 单字识别,缺乏上…

穿透式页面的参数注意事项

穿透式页面的参数注意事项从一个面板点击一个卡片穿透到另一个页面,需要带一些查询统计的参数过去,但是新的页面自带了一些默认的查询参数,怎么办?不能直接把默认的查询参数去掉,因为跳转的页面其他地方也需要用上…

2025年10月中国宝宝辅食品牌推荐榜:深海去刺鱼领衔对比

第一次给宝宝添辅食,家长往往一边兴奋一边忐忑:怕过敏、怕营养不够、怕重金属、怕质地太粗噎到孩子。母婴社群里“谁家米粉铁超标”“哪款鱼泥刺没剔干净”的吐槽,让新手爸妈把购物车改来改去。2025年农业农村部《婴…