【RocketMQ系列十四】RocketMQ中消息堆积如何处理

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. 消息堆积
    • 2. 消息堆积出现的原因
    • 3. 如何解决消息堆积

1. 消息堆积

消息堆积顾名思义就是消息队列中堆积了大量未被处理的消息,主要发生在高并发的场景下,生产者发送消息的速率远大于消费者组消息的速度。在物联网的AIOT场景中比较常见。

在RocketMQ的Console上可以查看某个Topic上消息堆积的情况。

消息堆积

这里有个延迟就表示目前堆积的消息数。

2. 消息堆积出现的原因

消息堆积的本质原因还是消费者消费消息的速度赶不上生产者发送消息的速度。可能的情况有:

  1. 第一种情况: 新上线的消费者的消费逻辑存在Bug,导致消息不能被正常消费。这种场景主要存在于代码逻辑不严谨导致某些消息消费失败,或者消费超时,从而导致消息被大量堆积。

  2. 第二种情况:消费者实例宕机或者由于网络的原因不能连上Broker集群。这种情况主要是消费者实例可能是单节点或者机房网络不好的情况。

  3. 第三种情况:生产者短时间内大量发送消息到Broker端,消费者的消费能力不足。消费者消费消息往往是一些比较耗时的IO操作,比如操作数据库,调用其他服务。这导致消费者的消费速率远低于生产者发送速率。这种情况也是消息堆积的常见场景。

3. 如何解决消息堆积

  1. 解决第一种情况:对需要上线的消费者进行严格的测试,确保每种消息的场景都能覆盖到。另外,在上线的时候采用灰度发布,先灰度小范围的用户进行使用,确认没有问题了,在全量放开所有用户使用。

  2. 解决第二种情况:在上线消费者实例时需要,采用多实例,异地多活的方式,确保极端的情况下都能有消费者能够正常消费消息。

  3. 解决第三种情况:这种情况的解决本质上是如何提高消费者的消费速率。主要可以从如下方面解决:

    1. 同一个消费者组下,增加消费者实例。比如Topic中有8个队列,那么可以将消费者数量最多增加到8个。那么有同学会问为啥只增加到8个,我增加到9个,乃至10个行不行?答案是你可以增加10个消费者,但是多余的2个消费者是分不到Queue的。这是因为 在RocketMQ中某个topic下的某个队列只能被同一消费者组中的某个消费者消费。 如果消费者数量少于Queue的数量,那么有可能会出现消费不均的情况。

    2. 提高单个消费者的消费并行线程。RocketMQ 支持批量消费消息,可以通过修改DefaultMQPushConsumer 消费者类的consumeThreadMin(最少消费线程数),以及consumeThreadMax(最大消费线程数)来提高单个消费者的消费能力。

    3. 批量消费消息:

      某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量。建议使用5.x SDK的SimpleConsumer,每次接口调用设置批次大小,一次性拉取消费多条消息。

    下面就让我们来看个例子:

    生产者:使用的是DefaultMQProducer;

    	//4.创建消息StopWatch stopWatch = new StopWatch();stopWatch.start();for (int i = 0; i < 20000; i++) {// 创建消息,指定topic,以及消息体Message message = new Message("heap_topic", ("消息堆积测试" + i).getBytes());//5.发送消息SendResult send = defaultMQProducer.send(message);System.out.println(send);}stopWatch.stop();System.out.println("生产者发送2万条消息用时="+stopWatch.getTotalTimeSeconds()+"秒");
    

    消费者:使用的是DefaultMQPushConsumer;

    	// 4.创建一个回调函数consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.println("本批次收到的消息数="+msgs.size());// 5.处理消息for (MessageExt msg : msgs) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("当前时间="+System.currentTimeMillis()+" 收到的消息内容:" + new String(msg.getBody()));}// 返回消费成功的对象return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});
    

    生产者329秒内发送了2万条消息,平均60条,

    image-20231014152350841

    而消费者消费一条消息需要一秒,所以生产者发送完消息之后,两个消费者还在消费。

    image-20231014144541572

image-20231014152042570

这里消费者使用的是DefaultMQPushConsumer消费者 每批次Broker端会向消费者推送32条消息,通过pullBatchSize字段设置,而消费者,每次消费1条消息,通过consumeMessageBatchMaxSize字段设置。

image-20231014153721132

当然,官方推荐使用SimpleConsumer进行批量消费消息。

	//每批次拉取16条消息int maxMessageNum = 16;// Set message invisible duration after it is received.Duration invisibleDuration = Duration.ofSeconds(15);// Receive message, multi-threading is more recommended.do {final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);log.info("Received {} message(s)", messages.size());for (MessageView message : messages) {final MessageId messageId = message.getMessageId();try {consumer.ack(message);log.info("Message is acknowledged successfully, messageId={}", messageId);} catch (Throwable t) {log.error("Message is failed to be acknowledged, messageId={}", messageId, t);}}} while (true);

官方的代码示例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/117431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMQ原理(五):消费者的可靠性

文章目录 3.消费者的可靠性3.1.消费者确认机制3.2.失败重试机制3.3.失败处理策略3.4.业务幂等性3.4.1.唯一消息ID3.4.2.业务判断 3.5.兜底方案 3.消费者的可靠性 当RabbitMQ向消费者投递消息以后&#xff0c;需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定…

Unsupervised Medical Image Translation with Adversarial Diffusion Models

基于对抗扩散模型的无监督医学图像翻译 论文链接&#xff1a;https://arxiv.org/abs/2207.08208 项目链接&#xff1a;https://github.com/icon-lab/SynDiff Abstract 通过源-目标模态转换对缺失图像进行补全可以提高医学成像方案的多样性。利用生成对抗网络(GAN)进行一次映…

Leetcode—104.二叉树的最大深度【简单】

2023每日刷题&#xff08;六&#xff09; Leetcode—104.二叉树的最大深度 递归实现代码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*/int maxDepth(struct TreeNode* root){…

分类预测 | MATLAB实现SSA-CNN-GRU-Attention数据分类预测(SE注意力机制)

分类预测 | MATLAB实现SSA-CNN-GRU-Attention数据分类预测&#xff08;SE注意力机制&#xff09; 目录 分类预测 | MATLAB实现SSA-CNN-GRU-Attention数据分类预测&#xff08;SE注意力机制&#xff09;分类效果基本描述模型描述程序设计参考资料 分类效果 基本描述 1.MATLAB实现…

python 正则表达式

re.match 在起始位置开始匹配 # 正则表达式是一个特殊的字符序列&#xff0c;它能帮助你方便的检查一个字符串是否与某种模式匹配# re.match() 尝试从字符串的起始位置匹配一个模式&#xff0c;如果不是起始位置匹配成功的话import rehhre.match(我爱你,我爱你-我爱你) # 在起…

C#,数值计算——分类与推理Phylo_upgma的计算方法与源程序

1 文本格式 using System; using System.Collections.Generic; namespace Legalsoft.Truffer { public class Phylo_upgma : Phylagglom { public override void premin(double[,] d, int[] nextp) { } public override double dminfn(doubl…

structs2 重构成SpringBoot架构

# 目录 structs2 重构成SpringBoot架构 1.1 structs2架构&#xff1a; 1.2 springboot 架构 1.3 演化要点&#xff1a; 1.基于前端的展示层不需要修改 2.HttpServlet 将会有SpringBoot annotation 来处理 3.构建前置的Structs url 转发器&#xff0c;适配 4.ActionSupport将由…

第五节——Vue 中如何编写样式

一、内联样式 <template><div style"color:red">红色</div> </template>二、在style标签中编写 1、创建learn-style.css文件 .red { color: red; } 3、在文件中引入 <template><div class"red">红色</div>…

C#开发的OpenRA游戏之金钱系统(5)

C#开发的OpenRA游戏之金钱系统(5) 前面分析了采矿车到矿场采矿的过程,那么采矿车什么时候采满呢?采满之后又是怎么样运送到精炼工厂呢? 首先我们来分析采矿车是怎么样判断采满矿产的,毕竟采矿车不能无限装载矿产资源。所以我们再次回到采矿车类Harvester,来分析它怎么…

上海亚商投顾:沪指放量反弹 两市超4500股飘红

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 三大指数昨日集体反弹&#xff0c;深成指、创业板指盘中涨超1%&#xff0c;黄白二线大幅分化&#xff0c;题材…

Git 修改已提交的用户名和邮箱

Git 修改已提交的用户名和邮箱 修改上一次提交的邮箱和用户名 git commit --amend --author Name<email>批量修改多次提交的邮箱和用户名 新建一个 .sh 脚本在 git 根目录下.sh脚本内容如下 git filter-branch --env-filter an"$GIT_AUTHOR_NAME" am"…

shell_46.Linux使用 getopts 命令

使用 getopts 命令 getopt 与 getopts 的不同之处在于&#xff0c;前者在将命令行中选项和参数处理后只生成一个输出&#xff0c;而后者能够和已有的 shell 位置变量配合默契。 getopts 每次只处理一个检测到的命令行参数。在处理完所有的参数后&#xff0c;getopts 会退出并返…

【SwiftUI模块】0060、SwiftUI基于Firebase搭建一个类似InstagramApp 3/7部分-搭建TabBar

SwiftUI模块系列 - 已更新60篇 SwiftUI项目 - 已更新5个项目 往期Demo源码下载 技术:SwiftUI、SwiftUI4.0、Instagram、Firebase 运行环境: SwiftUI4.0 Xcode14 MacOS12.6 iPhone Simulator iPhone 14 Pro Max SwiftUI基于Firebase搭建一个类似InstagramApp 3/7部分-搭建Tab…

Mybatis-Plus CRUD

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Mybatis-Plus CRUD 通用 Service CRUD 封装 IService 接口&#xff0c;进一步封装 CRUD 采用 get 查询、remove 删除 、list 查询集合、page 分页的前缀命名方式区分 …

华为手机的钱包里没有门钥匙要怎样弄

缘起&#xff1a; 即废话&#xff0c;公司的门禁卡又丢了&#xff0c;而经常出入的门又需要门禁卡&#xff0c;指纹识别太慢&#xff0c;而且一到春秋&#xff0c;我的指纹就很浅&#xff0c;很难识别。 聪明 拿起华为手机&#xff0c;一个年老的nova8. 进入钱包&#xff0c…

AlDente Pro for Mac: 掌控电池充电的终极解决方案

你是否曾经为了保护你的MacBook的电池&#xff0c;而苦恼于无法控制它的充电速度&#xff1f;AlDente Pro for Mac 是一款专为Mac用户设计的电池管理工具&#xff0c;它能帮助你解决这个问题。 AlDente Pro for Mac 是一款电池最大充电限制软件&#xff0c;它能够让你自由地设…

系统安全分析与设计

系统安全分析与设计&#xff08;2分&#xff09; 内容提要 对称加密与非对称加密 加密技术与认证技术 加密技术&#xff08;只能防止第三方窃听&#xff09; 讲解地址&#xff1a;对称加密与非对称加密_哔哩哔哩_bilibili 认证技术 骚戴理解&#xff1a;数字签名是用私钥签名…

【Airflow】构建爬虫任务系统

爬虫脚本太多了需要进行管理一下&#xff0c;领导决定使用airflow 我了解了一下这个平台是用来做任务调度。 是一个ETL工具 ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程 这里是一个github的地址 https://github.com/apache/airflow 这里是官方文档 http…

css3 2d转换transform详细解析与代码实例transform

CSS3 Transform是CSS3的一个模块&#xff0c;其目的是为了通过对元素的变形、旋转、缩放、平移等操作&#xff0c;能够更加丰富的展示页面效果。下面是CSS3 Transform的详细解析与代码实例&#xff1a; transform属性 transform属性用于对元素进行变形操作&#xff0c;其属性…

[100天算法】-最长有效括号(day 38)

题目描述 给定一个只包含 ( 和 ) 的字符串&#xff0c;找出最长的包含有效括号的子串的长度。示例 1:输入: "(()" 输出: 2 解释: 最长有效括号子串为 "()" 示例 2:输入: ")()())" 输出: 4 解释: 最长有效括号子串为 "()()"来源&#…