RocketMQ消息积压

news/2025/12/3 22:29:57/文章来源:https://www.cnblogs.com/ciel717/p/19282029

一、概述

在分布式系统架构中,RocketMQ作为主流消息中间件,承担着业务解耦、流量削峰、异步通信的核心职责。但消息积压是其运行过程中高频出现的问题,一旦发生,轻则导致业务响应延迟(如订单状态同步慢、通知推送超时),重则引发Broker磁盘溢出、核心服务中断,直接影响用户体验与系统稳定性。

二、线上典型案例

2.1 突发流量

背景:某电商平台活动期间,订单提交业务的RocketMQ Topicorder-submit-topic)突然出现消息积压,1小时内堆积量从0飙升至80万条,订单状态更新延迟超30分钟,用户频繁投诉“下单后显示未支付”。
现象:监控显示生产TPS从平时的2000暴涨至15000,消费TPS始终稳定在3000,且所有队列积压均匀,Broker CPUIO使用率正常。
排查与解决

  • 根因:大促前未根据预估流量(20000TPS)扩容消费端,默认20个消费线程+8Topic队列,并行处理能力不足;
  • 应急:10分钟内新增6台消费节点,将消费线程数从20调至80(匹配队列数8,实际并行8线程,新增节点通过负载均衡分摊队列),30分钟后积压清零;
  • 长期:大促前压测验证消费能力,将Topic队列数扩容至32,支持32个并行线程,同时配置生产端限流。

2.2 消费慢查询

背景:某物流平台的“运单状态同步”Topic(waybill-status-topic)出现持续性积压,每天新增积压10万条,消费端服务器CPU使用率仅30%,无明显异常日志。
现象:单条消息处理耗时从正常的50ms增至800ms,消费线程无报错但大量处于TIMED_WAITING状态。
排查与解决

  • 根因:消费代码中同步调用DBwaybill_status表查询,该表因索引失效导致查询耗时从10ms增至700ms,消费线程被阻塞;
  • 应急:临时新增DB索引(idx_waybill_id),单条消息处理耗时恢复至60ms,2小时后积压清零;
  • 长期:将同步DB操作改为“异步线程池提交+重试机制”,避免消费主线程阻塞。

2.3 磁盘IO瓶颈

背景:某支付平台的“交易日志同步”Topic(trade-log-topic)出现Broker级积压,所有队列的消息写入耗时从1ms增至80ms,生产端发送超时率达15%
现象Broker服务器磁盘IO使用率长期维持在98%store.log中频繁出现“Flush disk slow”日志。
排查与解决

  • 根因:Broker使用机械硬盘(HDD),峰值时段日志写入量达500MB/分钟HDD IO能力不足(仅100MB/分钟),导致消息刷盘延迟;
  • 应急:临时将Broker的刷盘策略从SYNC_FLUSH改为ASYNC_FLUSHIO使用率降至60%,写入耗时恢复至5ms
  • 长期:将Broker磁盘替换为SSD,同时扩容Broker集群,分担写入压力。

三、积压核心原因

消息积压的本质是“生产-Broker-消费”全链路处理速度不匹配,具体可从三个核心环节拆解:

3.1 生产端

核心问题是流量超出系统承载上限。常见场景包括电商大促、秒杀等突发流量未做控制,生产TPS短时间内暴涨数倍;部分业务未配置限流熔断机制,当下游Broker或消费端故障时,仍持续发送消息;批量发送参数设置不合理,单次发送1000条以上消息,导致Broker单批处理耗时超1秒,间接引发后续消息排队。

3.2 消费端

核心问题是处理能力不足。例如消费代码包含复杂业务逻辑,同步DB写入、多第三方接口调用导致单条消息处理耗时从几十毫秒增至数百毫秒,TPS大幅下降;消费线程配置与队列数不匹配,如Topic16个队列却仅启动10个线程,实际并行能力被限制为10个线程;失败消息重试机制滥用,未设置重试次数上限,部分因DB临时不可用的消息反复进入重试队列,占用正常消费资源;消费节点故障,如部分服务器宕机、网络断连,剩余节点无法承接全部队列的消费任务。

3.3 Broker端

核心问题是资源瓶颈阻塞消息流转。比如使用机械硬盘(HDD)处理高写入量业务,磁盘IO能力不足导致刷盘延迟;Topic队列数配置过少,并行处理能力上限低,无法支撑高并发消费;Broker内存不足,PageCache(页缓存)满后无法缓存消息,导致频繁磁盘读写,处理速度下降;延时队列配置不当,大量设置“1小时后发送”的延时消息,未及时从延时队列转发至目标队列,形成专属积压。

四、积压线上排查

线上排查消息积压需遵循“从监控到现场、逐步定位”的逻辑,避免盲目操作,具体步骤如下:

4.1 快速锁定问题环节

优先查看RocketMQ DashboardPrometheus监控面板,重点关注三类指标:

  • 消息积压量(消费位点与生产位点的差值,案例1中90万条即为此差值)、
  • 生产与消费TPS对比(若生产TPS持续高于消费TPS,说明消费能力不足)、
  • Broker核心指标(磁盘IO使用率超85%、队列写入耗时超50ms需警惕,案例3中IO使用率99%直接指向Broker瓶颈)。

4.2 日志深挖故障点

  • 消费端日志需重点搜索“TimeoutException”“SQLException”等异常关键字,同时关注标注“处理耗时xxx ms”的慢日志(案例2中通过慢日志发现DB查询耗时过长);
  • Broker日志则查看broker.log中的“Flush disk slow”“Thread pool is full”“Disk full”等信息,定位是否存在刷盘延迟、线程池满、磁盘溢出等问题。

4.3 现场诊断确认根因

  • 借助命令行工具分析实时状态:查看消费线程状态用jstack 消费进程ID | grep ConsumeMessageThread,若大量线程处于BLOCKEDTIMED_WAITING,可能是消费逻辑阻塞(案例2即通过该命令确认线程阻塞);
  • 分析GC情况用jstat -gcutil 消费进程ID 1000,若Full GC每分钟超过3次,会导致消费线程频繁暂停;
  • 查看消息内容用./mqadmin viewMessage -t Topic名称 -i 消息ID,排查是否存在单条消息过大(如超过10MB)的情况。

五、解决方案

5.1 临时方案

临时方案的核心目标是“快速缓解积压,避免业务影响扩大”,主要包括以下措施:

  • 临时扩容消费线程。通过调整消费端配置增加线程数,确保线程数与Topic队列数匹配(避免资源浪费),Spring Boot环境下的配置代码如下:
@Configuration
public class RocketMQConsumerConfig {// 临时调大消费线程数(案例1中从20调至80,与队列数匹配)@Value("${rocketmq.consumer.thread.min:80}")private int consumeThreadMin;@Value("${rocketmq.consumer.thread.max:80}")private int consumeThreadMax;@Beanpublic DefaultMQPushConsumer orderConsumer() throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");consumer.setNamesrvAddr("192.168.1.100:9876");// 设置消费线程数consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);// 订阅订单提交Topicconsumer.subscribe("order-submit-topic", "*");// 消费逻辑consumer.registerMessageListener((msgs, context) -> {for (MessageExt msg : msgs) {// 订单业务处理(简化示例)String orderJson = new String(msg.getBody(), StandardCharsets.UTF_8);OrderDTO order = JSON.parseObject(orderJson, OrderDTO.class);orderService.handleSubmit(order);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();return consumer;}
}
  • 限流非核心生产任务。对非核心业务(如用户行为日志、统计数据)进行限流,避免加剧积压,生产端使用Guava RateLimiter的限流代码如下:
@Service
public class OrderProducerService {// 限制非核心业务生产TPS为5000(匹配消费能力)private final RateLimiter logRateLimiter = RateLimiter.create(5000.0);@Autowiredprivate DefaultMQProducer producer;public void sendOrderMessage(OrderDTO order, MessageType type) throws Exception {// 非核心业务(日志类)触发限流if (type == MessageType.ORDER_LOG) {if (!logRateLimiter.tryAcquire()) {log.warn("非核心消息限流,orderId:{}", order.getOrderId());return;}}// 发送消息Message message = new Message("order-submit-topic","order-tag",JSON.toJSONBytes(order));producer.send(message);}
}
  • 跳过无效积压消息。若积压消息为可丢弃类型(如过期统计数据),可在消费端临时添加过滤逻辑,直接标记为消费成功,代码示例如下:
// 消费端临时过滤过期消息(简化示例)
consumer.registerMessageListener((msgs, context) -> {long now = System.currentTimeMillis();for (MessageExt msg : msgs) {// 消息存储时间超过24小时,直接跳过if (now - msg.getStoreTimestamp() > 24 * 60 * 60 * 1000) {log.info("跳过过期消息,msgId:{}", msg.getMsgId());continue;}// 正常业务处理handleMessage(msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

5.2 长期方案

长期方案需从根源解决问题,避免积压反复出现,主要包括以下措施:

  • 优化消费逻辑(异步化处理)。将同步DB操作、第三方接口调用等耗时任务异步化,避免阻塞消费主线程,代码示例如下:
@Service
public class WaybillConsumerService {// 异步处理线程池(核心线程16,最大32,队列容量1024)private final ExecutorService asyncHandlerPool = new ThreadPoolExecutor(16, 32, 60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1024),new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时回退到调用线程);@Autowiredprivate WaybillMapper waybillMapper;@Autowiredprivate LogisticsApi logisticsApi;public ConsumeConcurrentlyStatus consume(List<MessageExt> msgs) {for (MessageExt msg : msgs) {WaybillTrackDTO track = JSON.parseObject(msg.getBody(), WaybillTrackDTO.class);// 异步提交处理任务asyncHandlerPool.submit(() -> {try {// 异步更新DBwaybillMapper.updateTrack(track);// 异步调用第三方接口logisticsApi.notifyTrack(track);} catch (Exception e) {log.error("处理运单轨迹失败,waybillId:{}", track.getWaybillId(), e);// 重试3次后停止(避免无效重试)if (msg.getReconsumeTimes() < 3) {throw new RuntimeException("重试消费", e);}}});}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}
  • 优化Broker配置。调整刷盘策略、队列数、文件大小等参数,提升Broker处理能力,broker.conf核心配置如下:
# 刷盘策略:SSD环境下用异步刷盘(兼顾性能与可靠性)
flushDiskType=ASYNC_FLUSH# CommitLog文件大小:1GB(减少刷盘次数,降低IO压力)
mapedFileSizeCommitLog=1073741824# 消息过期时间:72小时(避免过期消息占用磁盘)
fileReservedTime=259200# Topic队列数调整:通过命令扩容至32(读写队列数一致)
# 执行命令:./mqadmin updateTopic -n 192.168.1.100:9876 -t trade-flow-topic -r 32 -w 32
  • 批量消费优化。通过批量拉取消息、批量写入DB提升消费效率,代码示例如下:
@Bean
public DefaultMQPushConsumer batchConsumer() throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("trade-flow-consumer-group");consumer.setNamesrvAddr("192.168.1.100:9876");// 批量拉取消息:一次拉取32条(根据业务调整,最大128)consumer.setPullBatchSize(32);consumer.subscribe("trade-flow-topic", "*");consumer.registerMessageListener((msgs, context) -> {if (CollectionUtils.isEmpty(msgs)) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 批量解析消息List<TradeFlowDTO> flowList = msgs.stream().map(msg -> JSON.parseObject(msg.getBody(), TradeFlowDTO.class)).collect(Collectors.toList());// 批量写入DB(MyBatis批量插入)tradeFlowMapper.batchInsert(flowList);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();return consumer;
}

六、预防方案

预防消息积压需从“监控、压测、架构”三个维度构建保障体系,具体措施如下:

6.1 完善监控告警

RocketMQ DashboardPrometheus中配置阈值告警:

  • 消息积压量超1万条、消费延迟超5分钟时触发告警;
  • Broker端设置磁盘IO85%、内存使用率超90%、刷盘耗时超500ms的告警;
  • 消费端配置单条消息处理耗时超500ms、消费失败率超1%的告警。

告警触发后通过短信、钉钉通知运维与开发人员,确保问题早发现。

6.2 大促前压测验证

新业务上线或大促活动前,通过mqadmin工具模拟峰值流量进行压测,命令示例如下:

# 模拟20000 TPS的消息发送,测试消费能力
./mqadmin sendMessage -n 192.168.1.100:9876 -t order-submit-topic -p '{"orderId":"test","amount":100}' -c 2000000  # 总消息数-s 20000     # 发送TPS

压测中若发现消费TPS不足、Broker资源瓶颈,需提前扩容节点、优化配置,确保正式环境能承载峰值流量。

6.3 架构层面削峰填谷

  • 高并发业务(如秒杀、大促订单)在生产端前增加Redis缓冲层:用户请求先写入Redis队列,再由定时任务按消费能力分批次(如1000TPS)向RocketMQ发送消息;
  • 核心业务与非核心业务拆分Topic(如“订单提交”与“订单日志”分为两个Topic),分别配置消费资源,避免非核心业务抢占核心资源;
  • 引入死信队列与重试管理平台,统一管理失败消息,定期排查失败原因,避免无效重试导致的资源浪费。

七、总结

RocketMQ消息积压的处理核心是“先定位根因,再分层解决”:

  • 突发流量用“临时扩容+限流”快速缓解,
  • 消费阻塞用“异步化+批量处理”优化能力,
  • Broker瓶颈用“资源升级+配置调整”突破限制。

而预防积压的关键在于“提前监控、压测验证、架构优化”,通过全链路保障体系,从源头降低积压风险。实际业务中需结合场景灵活调整方案:

  • 金融类业务需优先保障数据可靠性,可选择SYNC_FLUSH刷盘策略;
  • 非核心统计业务可允许短暂丢弃,优先保证处理效率。

若遇到死信队列积压、跨国Broker延迟等特殊场景,可进一步结合RocketMQ的重试机制、异地容灾方案优化,确保消息中间件稳定支撑业务运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/986103.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring的三级缓存及二三级缓存解决的问题 - 指南

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

个人学习匿名内部类转lambda表达式转方法引用运算符的一个记录 - 亚麻青

首先我先来讲一下匿名内部类。 匿名内部类是内部类的一种,他的特点是无名字,直接进行实例化操作,最常见的就是接口和/抽象类的实例化。 使用条件:1、必须继承一个类或者实现一个接口。2、只能实现单继承。3、必须创…

函数指针与函数对象

函数指针与函数对象一、函数指针是什么? 函数指针的定义:在 C++ 中就是指向函数的指针变量,类型为 返回值类型(*)(参数类型列表),它保存的是函数的地址。通过函数指针,你可以动态调用指针指向的函数,实现更灵活的…

敏捷冲刺日志 - Day 5

敏捷冲刺日志 - Day 5 站立会议 站立时会议改为线上进行。昨天已完成的工作:saveVideoToGallery 功能已实现并测试通过。 初步定位了“替换”闪退问题的原因是 SecurityException。今天计划完成的工作:核心任务:尝试修…

12月3日日记

1.今天学习java web以及数据结合的复习 2.明天体育课打比赛 3.如何配置 StringRedisTemplate 和 RedisTemplate 的序列化(避免乱码 + 支持对象存储)?

第五篇Scrum冲刺博客

每日Scrum报告 日期: 2025-11-30 会议时间: 09:00 1. 当日站立式会议记录 会议照片成员同步内容 成员:齐思贤昨天已完成的工作:开发个人资料查询接口(GET /api/v1/users/me),返回用户详情(含review_count/coll…

敏捷冲刺日志 - Day 6

敏捷冲刺日志 - Day 6 站立会议 站立时会议改为线上进行。昨天已完成的工作:编写了兼容 Android 10 和 Android 11+ 的文件删除逻辑。 测试发现 Android 10 的 RecoverableSecurityException 方案不稳定。今天计划完成…

深入解析:Spring Kafka消费者被踢出组?CommitFailedException异常全面解析与解决方案

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

OWASP Java HTML 清理库曝出 XSS 漏洞:noscript 与 style 标签组合成隐患

OWASP Java HTML Sanitizer 库在某些特定策略配置下(允许 noscript 和 style 标签并允许 style 标签内含文本),存在跨站脚本漏洞。攻击者可构造特殊载荷绕过清理,导致 XSS 攻击。CVE-2025-66021:OWASP Java HTML …

敏捷冲刺日志 - Day 4

敏捷冲刺日志 - Day 4 站立会议 站立时会议改为线上进行。昨天已完成的工作:实现了视频的多选和队列压缩功能。 UI 可以正确显示批量压缩的进度。今天计划完成的工作:新功能:实现“保存到相册”功能,确保用户压缩的视…

计算机视觉黄金时代的回顾与展望

文章记录了资深科学家梅迪奥尼分享其40多年计算机视觉研究历程,重点阐述了从理解客户问题出发、逆向发明技术的方法论,并介绍了其在无感支付等场景下的实际应用,认为当前是计算机视觉解决现实商业问题的黄金时代。1…

homebrew运行机制

🍺 Homebrew 的结构 = 一个“酿酒厂” 以下是核心概念的真实意义 + 隐喻意义:1. Formula(配方) 真实意义: Homebrew 安装“源码软件”的脚本,描述软件从哪里下载、如何编译、有哪些依赖等。 隐喻: 👉 配方:…

解码构造与析构

构造与析构基础概念 核心定义构造函数:对象被创建时自动调用的特殊成员函数,唯一作用是初始化对象的成员属性,确保对象创建后处于合法可用状态。 析构函数:对象被销毁前自动调用的特殊成员函数,用于释放对象占用的…

敏捷冲刺日志 - Day 2

敏捷冲刺日志 - Day 2 站立会议 站立时会议改为线上进行。昨天已完成的工作:熟悉了项目初始版本(v1.0)的代码结构和功能。 分析了用户提出的三个核心需求:修复权限异常、界面汉化、增加批量处理。今天计划完成的工作…

10.结构型 - 代理模式 (Proxy Pattern)

代理模式 (Proxy Pattern) 在软件开发中,由于一些原因,客户端不想或不能直接访问一个对象,此时可以通过一个称为"代理"的第三者来实现间接访问.该方案对应的设计模式被称为代理模式. 代理模式(Proxy Design …

敏捷冲刺日志 - Day 1

敏捷冲刺日志 - Day 1 各个成员在 Alpha 阶段认领的任务 本次七天冲刺即为项目的 Alpha 阶段,目标是交付一个具备核心功能、可运行、可演示的最小可行产品(MVP)。团队成员在本阶段的任务分配如下:刘瑞康 (开发):负…

2025年中国集成灶十大品牌综合实力榜:选购指南与权威解析

body { font-family: "Microsoft YaHei", sans-serif; line-height: 1.8; color: rgba(51, 51, 51, 1); max-width: 1000px; margin: 0 auto; padding: 20px; background-color: rgba(249, 249, 249, 1) } h…

朝花夕拾OI回忆录

朝花夕拾 OI 回忆录 序言 或许是因为喜欢追忆吧,也或许是临近AFO,内心有一些触动,又或者是为了给后续的OIer一些前者的失败经验吧……总之,2025年12月3日,我十六岁生日这天,我决定写这篇 OI 回忆录,以记录我对O…

细胞因子:细胞信使的分子世界与功能解析

在复杂的多细胞生物体内,细胞间的信息交流是维持生命活动的基础。其中,细胞因子 作为一类关键的信使分子,在免疫调节、细胞生长、分化、炎症反应和组织修复等过程中扮演着不可或缺的角色。本文将深入探讨细胞因子的…

NOIp 的 p 是 painting 的 p!

哇还有连续剧。 作者在 CSP 后推完魔宴正在推 WA2。 Day -6 发现惊天理论:Day -3 最后的 ZR 有点娱乐赛,T1 是哈希表广告题,赛时裸 umap 拿了 90pts,赛后拿 umap 和 gp 卡了一万年卡不过,严肃学习 Dzb 牌哈希表,…