【kafka系列】生产者

目录

发送流程

1. 流程逻辑分析

阶段一:主线程处理

阶段二:Sender 线程异步发送

核心设计思想

2. 流程

关键点总结

重要参数

一、核心必填参数

二、可靠性相关参数

三、性能优化参数

四、高级配置

五、安全性配置(可选)

六、错误处理与监控

典型配置示例

关键注意事项


发送流程

  • 序列化与分区:消息通过Partitioner选择目标分区(默认轮询或哈希),序列化后加入RecordAccumulator缓冲区。
  • 批次合并Sender线程将同一分区的消息合并为ProducerBatch,减少网络请求(源码见Sender.run()方法)。
  • 发送至Broker:通过NetworkClient异步发送,Broker的LogAppendTime处理写入请求。
  • ACK机制:根据acks配置(0/1/all)等待Broker确认,通过Metadata类更新分区元数据

1. 流程逻辑分析

Kafka 生产者发送消息的核心流程分为 主线程处理Sender 线程异步发送 两个阶段,具体步骤如下:


阶段一:主线程处理
  1. 创建 ProducerRecord
    • 用户调用 producer.send(ProducerRecord),指定 Topic、Key、Value 和可选的分区或时间戳。
  1. 选择分区(Partition)
    • 若未指定分区,根据以下规则选择:
      • 有 Key:对 Key 哈希取模(hash(key) % 分区数),确保相同 Key 的消息进入同一分区。
      • 无 Key:默认使用粘性分区策略(Sticky Partitioning,Kafka 2.4+),在批次填满或超时前发送到同一分区,提升性能。
  1. 序列化(Serialize)
    • 使用配置的 key.serializervalue.serializer 对 Key 和 Value 序列化(如 StringSerializerByteArraySerializer)。
  1. 追加到缓冲区(RecordAccumulator)
    • 将消息按 Topic-Partition 分组,存入 RecordAccumulator 的批次(Batch)中。
    • 批次策略
      • batch.size:批次大小阈值(默认 16KB),达到阈值立即发送。
      • linger.ms:批次等待时间(默认 0ms),超时后发送未满批次。

阶段二:Sender 线程异步发送
  1. Sender 线程拉取批次
    • Sender 线程定期检查缓冲区,将满足条件的批次(已满或超时)封装为 ProducerRequest
  1. 构建请求并发送到 Broker
    • 根据分区的 Leader 副本所在 Broker,将请求发送到对应的节点。
    • 关键配置
      • acks:控制消息持久化确认级别:
        • 0:不等待确认(可能丢失数据)。
        • 1:等待 Leader 确认(默认)。
        • all:等待所有 ISR 副本确认(最高可靠性)。
      • max.in.flight.requests.per.connection:控制单个 Broker 的未确认请求数(默认 5)。
  1. 处理 Broker 响应
    • 成功:触发用户设置的 Callback 回调,并释放批次内存。
    • 失败
      • 可重试错误(如网络抖动、Leader 切换):根据 retries(默认 0)和 retry.backoff.ms(默认 100ms)重试。
      • 不可重试错误(如消息过大):直接触发回调并抛出异常。

核心设计思想
  • 异步批处理:通过缓冲区合并小消息,减少网络 I/O 次数。
  • 零拷贝优化:使用 sendfile 系统调用提升网络传输效率。
  • 高可靠性:通过重试机制和 acks=all 确保消息不丢失。

2. 流程


关键点总结

  1. 分区选择:优先使用 Key 哈希或粘性分区策略,保证消息顺序性和吞吐量。
  2. 批次优化:通过 batch.sizelinger.ms 平衡延迟与吞吐。
  3. 可靠性保障:通过 acksretries 配置确保消息持久化。
  4. 异步处理:主线程与 Sender 线程解耦,避免阻塞用户逻辑。

重要参数

以下是 Kafka 生产者(Producer)在日常开发中的 常见配置参数 及其作用,按功能分类整理成表格:


一、核心必填参数

参数名

默认值

说明

bootstrap.servers

Kafka 集群地址列表(逗号分隔,如 host1:9092,host2:9092

)。

key.serializer

Key 的序列化类(如 org.apache.kafka.common.serialization.StringSerializer

)。

value.serializer

Value 的序列化类(同上)。


二、可靠性相关参数

参数名

默认值

说明

acks

1

消息持久化确认机制:

0:不等待确认(可能丢失数据)。 1:等待 Leader 确认(默认)。all:等待所有 ISR 副本确认(最高可靠性)。

retries

0

发送失败后的重试次数(建议设为 Integer.MAX_VALUE

配合 delivery.timeout.ms

)。

enable.idempotence

false

是否启用幂等性(true时保证消息不重复,需配合 acks=all

retries>0)。

max.in.flight.requests.per.connection

5

单个 Broker 的未确认请求数。若启用幂等性,建议设为 1

以保证顺序。


三、性能优化参数

参数名

默认值

说明

linger.ms

0

消息在缓冲区等待时间(毫秒),增大可提升吞吐量(但增加延迟)。

batch.size

16384

(16KB)

单个批次的大小阈值,达到阈值后立即发送。

buffer.memory

33554432

(32MB)

生产者缓冲区的总内存大小。

compression.type

none

消息压缩算法(gzip

snappy

lz4

zstd

),减少网络带宽占用。


四、高级配置

参数名

默认值

说明

request.timeout.ms

30000

(30秒)

生产者等待 Broker 响应的超时时间。

max.block.ms

60000

(60秒)

生产者缓冲区满或元数据不可用时的阻塞时间(超时抛异常)。

partitioner.class

默认轮询/哈希策略

自定义分区策略(实现 Partitioner

接口)。


五、安全性配置(可选)

参数名

默认值

说明

security.protocol

PLAINTEXT

安全协议(如 SSL

SASL_SSL

)。

ssl.keystore.location

SSL 证书路径(客户端认证时需配置)。

sasl.mechanism

SASL 认证机制(如 PLAIN

SCRAM-SHA-256

)。


六、错误处理与监控

参数名

默认值

说明

interceptor.classes

生产者拦截器(实现 ProducerInterceptor

接口),用于监控或修改消息。

metrics.sample.window.ms

30000

(30秒)

性能指标采样窗口时间。


典型配置示例

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 10);
props.put("linger.ms", 20);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
props.put("enable.idempotence", "true");

关键注意事项

  1. 可靠性 vs 性能
    • acks=allenable.idempotence=true 提高可靠性,但可能降低吞吐量。
    • 增大 batch.sizelinger.ms 可提升吞吐量,但增加延迟。
  1. 幂等性限制
    • 需 Kafka 0.11+ 版本支持,且 max.in.flight.requests=1(或 Kafka 2.0+ 允许 5)。
  1. 监控与调优
    • 通过 metrics 和拦截器监控生产者性能,动态调整参数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/70108.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker 入门与实战:从安装到容器管理的完整指南

🚀 Docker 入门与实战:从安装到容器管理的完整指南 🌟 📖 简介 在现代软件开发中,容器化技术已经成为不可或缺的一部分。而 Docker 作为容器化领域的领头羊,以其轻量级、高效和跨平台的特性,深…

MySQL 插入替换语句(replace into statement)

我们日常使用 insert into 语句向表中插入数据时,一定遇到过主键或唯一索引冲突的情况,MySQL的反应是报错并停止执行后续的语句,而replace into语句可以实现强制插入。 文章目录 一、replace into 语句简介1.1 基本用法1.2 使用set语句 二、注…

基于SpringBoot+Vue的智慧校园管理系统设计和实现(源码+文档+部署讲解)

🎬 秋野酱:《个人主页》 🔥 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 .🚀 技术架构技术栈全景 🎯 功能模块功能矩阵表📊 数据库设计核心ER关系图 💻 核心…

【Three.js】JS 3D library(一个月进化史)

#春节过完了,该继续投入学习了~ 作为一个平面开发者,想要增进更多的技能,掌握web3D开发# Day 1 了解熟悉Three.js,着重基础理论 学习资源: 前端可视化从0-1 Day 2 写一个简易demo 搭建环境-->安装包-->创建…

moveable 一个可实现前端海报编辑器的 js 库

目录 缘由-胡扯本文实验环境通用流程1.基础移动1.1 基础代码1.1.1 data-* 解释 1.2 操作元素创建1.3 css 修饰1.4 cdn 引入1.5 js 实现元素可移动1.6 图片拖拽2.缩放3.旋转4.裁剪 懒得改文案了,海报编辑器换方案了,如果后面用别的再更。 缘由-胡扯 导火…

Apollo 9.0 速度动态规划决策算法 – path time heuristic optimizer

文章目录 1. 动态规划2. 采样3. 代价函数3.1 障碍物代价3.2 距离终点代价3.3 速度代价3.4 加速度代价3.5 jerk代价 4. 回溯 这一章将来讲解速度决策算法,也就是SPEED_HEURISTIC_OPTIMIZER task里面的内容。Apollo 9.0使用动态规划算法进行速度决策,从类名…

【Day41 LeetCode】单调栈问题

一、单调栈问题 单调栈问题通常是在一维数组中寻找任一个元素的右边或者左边第一个比自己大或者小的元素的位置。 1、每日温度 739 这题的目的是对于当天,找到未来温度升高的那一天,也就是当前元素的右边第一个比自己大的元素。所以我们需要维护一个单…

Cherno C++ P55 宏

这篇文章我们讲一下C当中的宏。其实接触过大型项目的朋友可能都被诡异的宏折磨过。 宏是在预处理当中,通过文本替换的方式来实现一些操作,这样可以不用反复的输入代码,帮助我们实现自动化。至于预处理的过程,其实就是文本编辑&am…

web第三次作业

弹窗案例 1.首页代码 <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>综合案例</title><st…

深入解析LVS命令参数及DR模式下的ARP抑制原理

深入解析LVS命令参数及DR模式下的ARP抑制原理 一、LVS简介 Linux Virtual Server (LVS) 是基于Linux内核的高性能负载均衡解决方案&#xff0c;支持NAT、DR&#xff08;Direct Routing&#xff09;和TUN&#xff08;IP Tunneling&#xff09;三种模式。其中&#xff0c;ipvsad…

阿里云一键部署DeepSeek-V3、DeepSeek-R1模型

目录 支持的模型列表 模型部署 模型调用 WebUI使用 在线调试 API调用 关于成本 FAQ 点击部署后服务长时间等待 服务部署成功后&#xff0c;调用API返回404 请求太长导致EAS网关超时 部署完成后&#xff0c;如何在EAS的在线调试页面调试 模型部署之后没有“联网搜索…

Win10环境借助DockerDesktop部署大数据时序数据库Apache Druid

Win10环境借助DockerDesktop部署最新版大数据时序数据库Apache Druid32.0.0 前言 大数据分析中&#xff0c;有一种常见的场景&#xff0c;那就是时序数据&#xff0c;简言之&#xff0c;数据一旦产生绝对不会修改&#xff0c;随着时间流逝&#xff0c;每个时间点都会有个新的…

【第13章:自监督学习与少样本学习—13.1 自监督学习最新进展与实现方法】

凌晨三点的实验室,博士生小王盯着屏幕里正在"自娱自乐"的神经网络——这个没有吃过一张标注图片的模型,正在通过旋转、拼图、填色等游戏任务,悄悄掌握着理解世界的秘诀。这种魔法般的修炼方式,正是当今AI领域最炙手可热的技术:自监督学习。 一、打破数据枷锁:自…

数据库报错1045-Access denied for user ‘root‘@‘localhost‘ (using password: YES)解决方式

MySQL 报错 1045 表示用户root从localhost连接时被拒绝访问&#xff0c;通常是因为密码错误、权限问题或配置问题。以下是解决该问题的常见方法&#xff1a; 方法一&#xff1a;检查用户名和密码 • 确认用户名和密码是否正确&#xff1a; 确保输入的用户名和密码完全正确&am…

八大排序——简单选择排序

目录 1.1基本操作&#xff1a; 1.2动态图&#xff1a; 1.3代码&#xff1a; 代码解释 1. main 方法 2. selectSort 方法 示例运行过程 初始数组 每轮排序后的数组 最终排序结果 代码总结 1.1基本操作&#xff1a; 选择排序&#xff08;select sorting&#xff09;也…

与传统光伏相比 城电科技的光伏太阳花有什么优势?

相比于传统光伏&#xff0c;城电科技的光伏太阳花有以下优势&#xff1a; 一、发电效率方面 智能追踪技术&#xff1a;光伏太阳花通过内置的智能追踪系统&#xff0c;采用全球定位跟踪算法&#xff0c;能够实时调整花瓣&#xff08;即光伏板&#xff09;的角度&#xff0c;确…

FPGA的星辰大海

编者按 时下风头正盛的DeepSeek,正值喜好宏大叙事的米国大统领二次上岗就业,OpenAI、软银、甲骨文等宣布投资高达5000亿美元“星际之门”之际,对比尤为强烈。 某种程度上,,是低成本创新理念的直接落地。 包括来自开源社区的诸多赞誉是,并非体现技术有多“超越”,而是…

Elasticsearch:15 年来致力于索引一切,找到重要内容

作者&#xff1a;来自 Elastic Shay Banon 及 Philipp Krenn Elasticsearch 刚刚 15 岁了&#xff01;回顾过去 15 年的索引和搜索&#xff0c;并展望未来 15 年的相关内容。 Elasticsearch 刚刚成立 15 周年。一切始于 2010 年 2 月的一篇公告博客文章&#xff08;带有标志性的…

嵌入式软件、系统、RTOS(高软23)

系列文章目录 4.2嵌入式软件、系统、RTOS 文章目录 系列文章目录前言一、嵌入式软件二、嵌入式系统三、嵌入式系统分类四、真题总结 前言 本节讲明嵌入式相关知识&#xff0c;包括软件、系统。 一、嵌入式软件 二、嵌入式系统 三、嵌入式系统分类 四、真题 总结 就是高软笔记…

数据结构 day02

3. 线性表 3.1. 顺序表 3.1.3. 顺序表编程实现 操作&#xff1a;增删改查 .h 文件 #ifndef __SEQLIST_H__ #define __SEQLIST_H__ #define N 10 typedef struct seqlist {int data[N];int last; //代表数组中最后一个有效元素的下标 } seqlist_t;//1.创建一个空的顺序表 seq…