(处理 Kafka 消息积压) - 高吞吐 + 零丢失的阻塞队列实战方案

一、分布式日志消费场景与挑战

在分布式日志系统中,Kafka 通常作为消息队列中间件,负责从日志生产者接收日志,并将其分发给日志消费者进行处理。为了平衡 Kafka 消费速度与日志处理速度,BlockingQueue 常被用作缓冲区,连接 Kafka 消费线程和多线程日志处理器。

典型架构:
1、Kafka 消费线程
从 Kafka 中持续拉取日志,放入 BlockingQueue 中。
2、多线程日志处理器
从 BlockingQueue 中取出日志,进行解析、存储或其他业务逻辑处理。
3、缓冲区(BlockingQueue)
作为生产者(Kafka 消费线程)和消费者(日志处理线程)之间的桥梁,平衡两者的速度差异。

主要挑战:
1、高吞吐需求
需要设计高效的线程模型,最大化日志处理吞吐量。
2、消息积压问题
当日志处理速度跟不上 Kafka 消费速度时,BlockingQueue 可能被填满,导致 Kafka 消费线程阻塞,甚至丢失消息。

二、基于 BlockingQueue 的高吞吐消费者线程模型

在分布式日志消费中,实现高吞吐的关键在于多线程并发处理和合理的线程模型设计。以下是一个典型的高吞吐消费者线程模型。

1、 消费者线程模型设计
为了高效消费和处理日志,我们可以将任务分为以下两部分:

  • Kafka 消费线程

负责从 Kafka 持续拉取日志,并将其放入 BlockingQueue。

  • 日志处理线程池

从 BlockingQueue 中取出日志,执行并发处理。

代码示例:

// 定义阻塞队列,作为缓冲区  
BlockingQueue<String> logQueue = new LinkedBlockingQueue<>(10000);  // Kafka 消费线程  
Thread kafkaConsumerThread = new Thread(() -> {  while (true) {  try {  // 从 Kafka 拉取日志  String log = kafkaConsumer.poll(Duration.ofMillis(100));  if (log != null) {  logQueue.put(log); // 放入阻塞队列  }  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  }  }  
});  // 日志处理线程池  
ExecutorService logProcessorPool = Executors.newFixedThreadPool(10);  
for (int i = 0; i < 10; i++) {  logProcessorPool.submit(() -> {  while (true) {  try {  // 从阻塞队列中取日志  String log = logQueue.take();  processLog(log); // 处理日志  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  }  }  });  
}  // 启动 Kafka 消费线程  
kafkaConsumerThread.start();

2、设计要点分析
阻塞队列的大小:
队列大小需要根据系统的内存限制和吞吐量需求进行合理配置。过小的队列可能导致 Kafka 消费线程频繁阻塞,过大的队列则可能占用过多内存,影响系统性能。

线程池的大小:
日志处理线程池的线程数需要根据业务逻辑的复杂度和 CPU 核心数调整。一般情况下,线程数可以设置为 CPU 核心数的 2 倍(I/O 密集型任务)或相等(CPU 密集型任务)。

Kafka 消费速率:
使用 Kafka 的 poll 方法可以批量拉取日志,适当调整批量大小可以提高消费效率。建议设置批量大小与队列容量相匹配,避免一次性拉取过多数据。

三、如何避免队列满了导致消息丢失?

在高并发场景下,如果日志处理速度跟不上 Kafka 消费速度,BlockingQueue 很可能被填满,导致 Kafka 消费线程阻塞,甚至引发消息丢失问题。以下是几种常见的解决方案:

1、流控机制:动态调整 Kafka 消费速率
流控机制的核心思想是根据队列的剩余容量动态调整消费速率,确保生产和消费的平衡。具体实现方法如下:

暂停 Kafka 消费线程
当队列接近满时,暂停 Kafka 消费线程;当队列有足够空间时,恢复消费。
实现示例:

Thread kafkaConsumerThread = new Thread(() -> {  while (true) {  try {  // 如果队列已满,暂停消费  if (logQueue.remainingCapacity() == 0) {  Thread.sleep(100); // 暂停 100ms  continue;  }  // 拉取日志并放入队列  String log = kafkaConsumer.poll(Duration.ofMillis(100));  if (log != null) {  logQueue.put(log);  }  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  }  }  
});

动态调整批量大小
根据队列的剩余容量,动态调整 Kafka 拉取日志的批量大小,避免一次性拉取过多数据导致队列溢出。

2、自定义阻塞队列:持久化溢出日志
默认的 BlockingQueue 会在队列满时阻塞生产线程,但我们可以通过自定义队列,在队列满时将溢出的日志持久化到磁盘,避免数据丢失。

自定义队列实现示例:

class DiskBackedQueue extends LinkedBlockingQueue<String> {  private final File backupFile = new File("backup.log");  @Override  public boolean offer(String log) {  boolean success = super.offer(log);  if (!success) {  // 队列满时,将日志写入磁盘  try (FileWriter writer = new FileWriter(backupFile, true)) {  writer.write(log + System.lineSeparator());  } catch (IOException e) {  e.printStackTrace();  }  }  return success;  }  
}

通过这种方式,即使队列满了,日志也不会丢失,而是被安全地存储到磁盘中。

3、消息回写 Kafka:使用死信队列
当队列满时,可以将日志重新写入 Kafka 的另一个主题(通常称为“死信队列”),以便后续重新消费。

实现步骤:

1、当 BlockingQueue 满时,捕获 offer 方法的失败状态。
2、使用 Kafka Producer 将日志写入死信队列。
实现代码:

if (!logQueue.offer(log)) {  kafkaProducer.send(new ProducerRecord<>("dead_letter_topic", log)); // 写入死信队列  
}

这种方式可以保证即使队列溢出,日志也不会丢失,而是被转移到另一个 Kafka 主题中等待后续处理。

4、提升队列处理能力
如果队列溢出频繁发生,可以通过以下方式提升处理能力:

  • 增加日志处理线程数

扩展线程池规模,以提高日志的处理速度。

  • 优化日志处理逻辑

减少单条日志的处理耗时,例如使用批量处理或异步存储。

  • 多队列分流

根据日志的类型或来源,将日志分配到多个队列,每个队列独立消费。

四、总结与最佳实践

在分布式日志系统中,BlockingQueue 是实现高吞吐和缓冲的重要工具,但在高并发场景下,消息积压和队列溢出可能导致数据丢失。以下是本文总结的最佳实践:

1、高吞吐消费者线程模型:

  • 使用 Kafka 消费线程与日志处理线程池分工协作。
  • 根据吞吐量需求调整队列大小和线程池规模。

2、流控机制避免队列溢出:

  • 动态调整 Kafka 消费速率,确保生产与消费平衡。
  • 暂停或限制 Kafka 消费线程的拉取操作。

3、自定义队列或持久化机制:

  • 自定义队列将溢出日志存储到磁盘或回写 Kafka。
  • 使用死信队列保存无法及时处理的日志。

4、提升处理能力:

  • 增加线程池规模或优化日志处理逻辑。
  • 使用多队列分流,将日志按类型分配到不同的队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/68319.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Unity】unity3D 调用LoadSceneAsync 场景切换后比较暗 部门材质丢失

解决方法&#xff1a;两个场景使用同样灯光 现象 直接进入第二个场景是可以正常显示 调用LoadSceneAsync来切换后&#xff0c;第二个场景出现比较暗的情况 解决方法&#xff1a;两个场景使用同样灯光&#xff0c;在loading 的场景中加入灯光。 Light—Directional Light 如果…

红日-VulnStack靶场一

http://vulnstack.qiyuanxuetang.net/vuln/ 一、环境部署 win7(被攻击机/关火墙) web服务器 1张外网网卡(桥接192.168.1.105)&#xff0c;一张内网网卡192.168.52.143/255.255.255.0/192.168.52.2 DNS 192.168.52.138 winser2008 域控服务器 1张…

【单片机通过蜂鸣器模拟警号 救护车 警车 等声音 】

单片机通过蜂鸣器模拟警号 救护车 警车 等声音 模拟原理实现代码 模拟原理 该函数利用定时器中断&#xff0c;通过改变 u16Compare 的值&#xff0c;并使用 Adt_SetPeriodBuf 和 Adt_SetCompareValue 函数调整定时器的周期和比较值&#xff0c;产生不同类型的声音。 SoundType…

实现linux硬盘smart检测

一、下载交叉编译libatasmart库 下载链接&#xff1a;https://www.linuxfromscratch.org/blfs/view/svn/general/libatasmart.html libatasmart库编译依赖libudev库&#xff0c;交叉编译器前先准备依赖的libudev: 设置libudev的环境变量&#xff0c;并通过configure编译文件生…

蓝桥杯算法|基础笔记(1)

**时间复杂度** 一、概念理解 时间复杂度是用来衡量算法运行时间随输入规模增长而增长的量级。它主要关注的是当输入规模趋向于无穷大时&#xff0c;算法执行基本操作的次数的增长趋势&#xff0c;而不是精确的运行时间。 二、分析代码中的基本操作 确定关键操作 在一段代码…

Uniapp判断设备是安卓还是 iOS,并调用不同的方法

在 UniApp 中&#xff0c;可以通过 uni.getSystemInfoSync() 方法来获取设备信息&#xff0c;然后根据系统类型判断当前设备是安卓还是 iOS&#xff0c;并调用不同的方法。 示例代码 export default {onLoad() {this.checkPlatform();},methods: {checkPlatform() {// 获取系…

K8S 节点选择器

今天我们来实验 pod 调度的 nodeName 与 nodeSelector。官网描述如下&#xff1a; 假设有如下三个节点的 K8S 集群&#xff1a; k8s31master 是控制节点 k8s31node1、k8s31node2 是工作节点 容器运行时是 containerd 一、镜像准备 1.1、镜像拉取 docker pull tomcat:8.5-jre8…

【Go】:深入解析 Go 1.24:新特性、改进与最佳实践

前言 Go 1.24 尚未发布。这些是正在进行中的发布说明。Go 1.24 预计将于 2025 年 2 月发布。本文将深入探讨 Go 1.24 中引入的各项更新&#xff0c;并通过具体示例展示这些变化如何影响日常开发工作&#xff0c;确保为读者提供详尽而有价值的参考。 新特性及改进综述 HTTP/2 …

macos arm 本地/docker/本地k8s 安装jupyterhub 并登陆

概述 很多文章写的启动官方docker镜像后,新建linux用户即可直接登录,不知道是否版本原因,总之目前最新版我亲测不可以,踩坑两天,这里记录下解决过程,以及各种细节在文档中的位置.以及为什么官方镜像不能直接使用的原因. part1 本地安装jupyterhub https://jupyterhub.readth…

Multi-Agent如何设计

文章小结 研究背景和目的 在单一大语言模型长期主导人工智能领域的背景下&#xff0c;多智能体系统在对话任务解决中逐渐崭露头角。 虽然先前的研究已经展示了多智能体系统在推理任务和创造性工作中的潜力&#xff0c;但对于其在对话范式方面的局限性以及单个智能体的影响&am…

Web端实时播放RTSP视频流(监控)

一、安装ffmpeg: 1、官网下载FFmpeg: Download FFmpeg 2、点击Windows图标,选第一个:Windows builds from gyan.dev 3、跳转到下载页面: 4、下载后放到合适的位置,不用安装,解压即可: 5、配置path 复制解压后的\bin路径,配置环境变量如图: <

Linux 机器学习

Linux 机器学习是指在 Linux 操作系统环境下进行机器学习相关的开发、训练和应用。 具体步骤 环境搭建&#xff1a; 选择合适的 Linux 发行版&#xff1a;如 Ubuntu、Fedora、Arch Linux 等。Ubuntu 因其易用性和丰富的软件包管理系统&#xff0c;适合初学者&#xff1b;Fed…

keepalived双机热备(LVS+keepalived)实验笔记

目录 前提准备&#xff1a; keepalived1&#xff1a; keepalived2&#xff1a; web1&#xff1a; web2&#xff1a; keepalived介绍 功能特点 工作原理 应用场景 前提准备&#xff1a; 准备4台centos&#xff0c;其中两台为keepalived&#xff0c;两台为webkeepalive…

CentOS 7 下 Nginx 的详细安装与配置

1、安装方式 1.1、通过编译方式安装 下载Nginx1.16.1的安装包 https://nginx.org/download/nginx-1.16.1.tar.gz 下载后上传至/home目录下。 1.2、通过yum方式安装 这种方式安装更简单。 2、通过编译源码包安装Nginx 2.1、安装必要依赖 sudo yum -y install gcc gcc-c sudo…

八股学习 Redis

八股学习 Redis 常见场景常见问题问题1、2示例场景缓存穿透解决方案一解决方案二 问题3示例场景缓存击穿解决方案 问题4示例场景缓存雪崩解决方案 问题5示例场景双写一致性强一致方案允许延时一致方案 问题6RDB方式AOF方式两种方式对比 问题7数据过期策略惰性删除定期删除 问题…

Python自学 - 标准库介绍

<< 返回目录 1 Python自学 - 标准库介绍 标准库是安装Python时自带的一些模块集合&#xff0c;集成了丰富的功能&#xff0c;避免用户反复造轮子&#xff0c;这极大的提高了生产效率&#xff01; 1.1 几种常用的标准库 1.1.1 os 模块 提供了与操作系统交互的接口&…

Flutter中Get.snackbar和Get.dialog关闭冲突问题记录

背景&#xff1a; 在使用GetX框架时&#xff0c;同时使用了Get.snackbar提示框和Get.dialog加载框&#xff0c;当这两个widget同时存在时&#xff0c;Get.dialog加载框调用Get.back()无法正常关闭。 冲突解释&#xff1a; 之所以会产生冲突&#xff0c;是因为Get.snackbar在关…

C++ 在线编译软件介绍、杭电OJ、北大OJ、力扣OJ

在线编译软件的话&#xff0c;可见下&#xff1a; https://www.jyshare.com/compile/12/ 杭州电子科技大学开发的一个免费的写代码地址 &#xff0c;杭电OJ https://bestcoder.hdu.edu.cn/ 北大OJ http://poj.org/ 力扣OJ 力扣 (LeetCode) 全球极客挚爱的技术成长平台

【全套】基于机器学习的印度森林火灾发生概率的分析与预测

【私信送源码文档】基于机器学习的印度森林火灾发生概率的分析与预测 对应的ppt 摘 要 随着全球气候变化的不断加剧&#xff0c;火灾的频发和规模逐渐增大&#xff0c;成为备受关注的问题。本文旨在提高对火灾发生概率的准确性&#xff0c;为火灾的预防和管理提供科学支持。在…

mysql性能压测

软件安装 安装sysbench yum install epel-release -y yum install sysbench sysbench --version 创建测试库 CREATE DATABASE demobench CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; 创建测试表&#xff08;不需要&#xff0c;执行准备阶段的命令即可&#xff09;…