Kafka 多线程开发消费者实例

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

Kafka Java Consumer 设计原理

在开始探究之前,我先简单阐述下 Kafka Java Consumer 为什么采用单线程的设计。了解了这一点,对我们后面制定多线程方案大有裨益。

谈到 Java Consumer API,最重要的当属它的入口类 KafkaConsumer 了。我们说 KafkaConsumer 是单线程的设计,严格来说这是不准确的。因为,从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程

所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

不过,虽然有心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成的。因此,在消费消息的这个层面上,我们依然可以安全地认为 KafkaConsumer 是单线程的设计。

其实,在社区推出 Java Consumer API 之前,Kafka 中存在着一组统称为 Scala Consumer 的 API。这组 API,或者说这个 Consumer,也被称为老版本 Consumer,目前在新版的 Kafka 代码中已经被完全移除了。

我之所以重提旧事,是想告诉你,老版本 Consumer 是多线程的架构,每个 Consumer 实例在内部为所有订阅的主题分区创建对应的消息获取线程,也称 Fetcher 线程。老版本 Consumer 同时也是阻塞式的(blocking),Consumer 实例启动后,内部会创建很多阻塞式的消息获取迭代器。但在很多场景下,Consumer 端是有非阻塞需求的,比如在流处理应用中执行过滤(filter)、连接(join)、分组(group by)等操作时就不能是阻塞式的。基于这个原因,社区为新版本 Consumer 设计了单线程 + 轮询的机制。这种设计能够较好地实现非阻塞式的消息获取。

除此之外,单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

另外,不论使用哪种编程语言,单线程的设计都比较容易实现。相反,并不是所有的编程语言都能够很好地支持多线程。从这一点上来说,单线程设计的 Consumer 更容易移植到其他语言上。毕竟,Kafka 社区想要打造上下游生态的话,肯定是希望出现越来越多的客户端的。

多线程方案

了解了单线程的设计原理之后,我们来具体分析一下 KafkaConsumer 这个类的使用方法,以及如何推演出对应的多线程方案。

首先,我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

当然了,这也不是绝对的。KafkaConsumer 中有个方法是例外的,它就是wakeup(),你可以在其他线程中安全地调用KafkaConsumer.wakeup()来唤醒 Consumer。

鉴于 KafkaConsumer 不是线程安全的事实,我们能够制定两套多线程方案。

  1. 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

总体来说,这两种方案都会创建多个线程,这些线程都会参与到消息的消费过程中,但各自的思路是不一样的。

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。

这两种方案孰优孰劣呢?应该说是各有千秋。我总结了一下这两种方案的优缺点,我们先来看看下面这张表格。


推荐阅读

结合案例深入理解DDD聚合与聚合根

技术架构:作为开发,你真的了解系统吗-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/899511.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zynq7000 + ucos3 + lwip202_v1_2调试过程

1 现在裸机应用上验证lwip 跑起来可能会报错,看下面的链接解决 zynq 网卡Phy setup error问题 zynq 网卡Phy setup error问题-CSDN博客 2 ping同以后,在zynq上添加ucos系统 链接如下: ZYNQ移植uCOSIII_zynq ucos-CSDN博客 3 移植lwip协议…

Android7 Input(二)Linux 驱动层输入事件管理

概述 在Linux系统中,将键盘,鼠标,触摸屏等这类交互设备交由Linux Input子系统进行管理,Linux Input驱动子系统由于具有良好的和用户空间交互的接口。因此Linux Input驱动子系统,不止于只管理输入类型的设备。也可以将其…

Java内存中的Heap(堆)的作用

Java内存中的Heap(堆)的作用 在 Java 的内存模型中,Heap(堆) 是 JVM(Java Virtual Machine)管理的运行时数据区域之一,主要用于存储程序运行过程中动态分配的对象和数据。它是 Java…

自行车模型与汽车模型的混合策略在自动驾驶中的多维度协同优化

基于动态架构与智能调度的自动驾驶系统设计 #mermaid-svg-1yvF1EzG07ktndY6 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-1yvF1EzG07ktndY6 .error-icon{fill:#552222;}#mermaid-svg-1yvF1EzG07ktndY6 .error-tex…

mysql.8.4.4--初始化报错--libnuma.so.1缺失

错误 mysqld: error while loading shared libraries: libnuma.so.1: cannot open shared object file: No such file or directory解决办法:下载相关依赖 sudo apt update sudo apt install numactl然后重新初始化 mysqld --initialize

【区块链安全 | 第三篇】主流公链以太坊运行机制

文章目录 1. 以太坊账户类型2. 以太坊网络架构2.1 节点类型2.2 交易流程 3. 共识机制4. Gas 机制4.1 Gas 计算方式4.2 以太坊 EIP-1559 交易机制 5. EVM(以太坊虚拟机)5.1 EVM 结构5.2 EVM 指令5.3 EVM 运行机制 6. 智能合约7. ERC 代币标准7.1 ERC-207.…

计算机三级信息安全部分英文缩写

eip,指令寄存器,用于存放指向下一条将执行指令的指针,即返回地址栈顶指针esp基址指针寄存器EBP,基地址数据执行保护DEP(Data Execute Prevention)技术可以设置内存堆栈区的代码为不可执行状态,从而防范溢出后代码的执行…

【Goalng】第九弹-----文件操作、JSON处理

🎁个人主页:星云爱编程 🔍所属专栏:【Go】 🎉欢迎大家点赞👍评论📝收藏⭐文章 长风破浪会有时,直挂云帆济沧海 目录 1.文件操作 1.1文件介绍 1.2.文件流 1.3.打开和关闭文件 1…

C#高级:启动、中止一个指定路径的exe程序

一、启动一个exe class Program {static void Main(string[] args){string exePath "D:\测试\Test.exe";// 修改为你要运行的exe路径StartProcess(exePath);}private static bool StartProcess(string exePath){// 创建一个 ProcessStartInfo 对象来配置进程启动参…

猜猜我用的是哪个大模型?我的世界游戏界面简单的模拟效果

我的罗里吧嗦的,根据小朋友的要求,边听边写边输入的提示词: 请生成一段完整的在网页中用html5和javascript代码模拟“我的世界”中游戏场景的互动画面,要求提供若干人物选项可以选择,请自行选择需要使用哪些库或框架来…

AI知识补全(八):多模态大模型是什么?

名人说:人生如逆旅,我亦是行人。 ——苏轼《临江仙送钱穆父》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 上一篇:AI知识补全(七):AI Agent 智能…

更新docker 容器时,提前换后端jar 包,为什么会存在异常

我们现场更新时,通常都是提前将后端jar 包替换了,然后到了更新的时间,只需要更新相关的前端文件和修改各种配置,就行了。 但是最近一次更新操作中,忽然发现,提前更新后端包,会存在依赖丢失问题…

LoRA 模型微调框架核心原理及实现步骤

LoRA(Low-Rank Adaptation)模型微调框架通过低秩矩阵分解原理,实现了对大型预训练模型的高效微调。其核心原理是:在冻结预训练模型权重的基础上,向特定层注入可训练的低秩矩阵,以极少量参数(通常…

XHR.readyState详解

XHR.readyState详解 引言 XHR.readyState是XMLHttpRequest对象的一个属性,它反映了当前请求的状态。在Ajax编程中,正确理解和使用XHR.readyState对于调试和确保异步请求的正确执行至关重要。本文将详细介绍XHR.readyState的属性值、含义以及在Ajax请求中的具体应用。 XHR.…

MySQL8.4 InnoDB Cluster高可用集群使用指南

简介 高可用方案 Orchestrator: 可视化 Web 界面管理 MySQL 拓扑结构,并且兼容多种复制架构(异步、半同步、GTID),提供自动和手动的故障转移。但是8.0.21后 MySQL 更新了主从复制相关命令,Orchestrator无…

扩散模型总结

目录 定义与原理 发展历程 正向扩散过程 反向扩散过程 噪声预测网络 离散时间模型 连续时间模型 条件扩散模型 生成质量 训练稳定性 采样灵活性 图像生成 音频合成 文本生成 计算效率 模型复杂度 定义与原理 扩散模型是一种新型的生成模型,其核心原理源于热力…

【Java】Java核心知识点与相应面试技巧(七)——类与对象(二)

Java 类与对象篇 1.上期面试题解析: 上文链接:https://blog.csdn.net/weixin_73492487/article/details/146607026 创建对象时的内存分配过程? ① 加载类 ② 堆内存分配空间 ③ 默认初始化 ④ 显式初始化 ⑤ 构造器执行 this和super能否同时…

笔记:遇见未来——6G协同创新技术研讨会

https://www.cww.net.cn/article?id564308 研讨会由中国移动研究院首席科学家易芝玲博士主持。来自清华大学-中国移动联合研究院、北京邮电大学-中国移动研究院联合创新中心、东南大学-中国移动研究院联合创新中心、中关村泛联移动通信技术创新应用研究院等合作载体的知名教授…

Python Cookbook-4.14 反转字典

任务 给定一个字典,此字典将不同的键映射到不同的值。而你想创建一个反转的字典,将各个值反映射到键。 解决方案 可以创建一个函数,此函数传递一个列表推导作为dict的参数以创建需要的字典。 def invert_dict(d):return dict([(v,k) for …

深度学习在测距模型中的应用

一、单目视觉测距和双目视觉测距简介 1、单目视觉测距 模型:深度估计(Depth Estimation) 原理:通过深度学习模型(如MonoDepth2、MiDaS)或传统的计算机视觉方法(如单目相机结合物体大小推断&am…