【RabbitMQ】延迟队列 事务 消息分发

文章目录

  • 一、延迟队列
    • 一、概念 && 应用场景
    • 二、TTL+死信队列实现
      • 该实现方式存在的问题🐔
    • 三、延迟队列插件
      • ① 安装延迟队列插件
      • ② 基于插件延迟队列实现
    • 四、两种实现方式的区别
  • 二、事务
    • 一、配置事务管理器
    • 二、声明队列
    • 三、发送消息时打开事务
  • 三、消息分发
    • 一、概念
    • 二、应用场景
      • ① 限流
      • ② 负载均衡

一、延迟队列

一、概念 && 应用场景

延迟队列(Delayed Queue)即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

延迟队列的使用场景有很多,比如:

  1. 智能家居:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

  2. 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议。

  3. 用户注册成功后,7天后发送短信,提高用户活跃度等。

RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过TTL+死信队列的组合模拟出延迟队列的功能,所以死信队列章节展示的也是延迟队列的使用。

假设一个应用中需要将每条消息都设置为 10 秒的延迟,生产者通过normal_exchange这个交换器将发送的消息存储在normal_queue这个队列中。消费者订阅的并非是normal_queue这个队列,而是dl_queue死信队列。当消息从normal_queue这个队列中过期之后被存入dl_queue这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息。

二、TTL+死信队列实现

延迟队列,就是希望等待特定的时间之后,消费者才能拿到这个消息。TTL 刚好可以让消息延迟一段时间成为死信,成为死信的消息会被投递到死信队列里,这样消费者一直消费死信队列里的消息就可以了。

  1. 声明以及配置队列:(沿用前面死信队列的配置,只不过略做修改)

    @Bean("normalQueue")publicQueuenormalQueue(){returnQueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE)// 绑定死信交换机.deadLetterRoutingKey("dlk")// 绑定死信路由键.build();}
  2. 发送消息:发送两条消息,一条消息 10s 后过期,第二条 20s 后过期

    @RequestMapping("/delay")publicStringdelay(){// 发送两条单独带TTL的消息rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 10s..."+newDate(),message->{message.getMessageProperties().setExpiration("10000");// 延迟10s到达死信队列returnmessage;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 20s..."+newDate(),message->{message.getMessageProperties().setExpiration("20000");// 延迟20s到达死信队列returnmessage;});return"发送成功!";}
  3. 消费者:监听死信队列,打印信息,观察现象

    // 监听死信队列@RabbitListener(queues=Constants.DL_QUEUE)publicvoiddlQueue(Messagemessage){System.out.printf("%tc 死信队列接收到消息: %s\n",newDate(),newString(message.getBody()));}

该实现方式存在的问题🐔

把生产消息的顺序修改一下:先发送 20s 过期数据,再发送 10s 过期数据:

@RequestMapping("/delay")publicStringdelay(){// 发送两条单独带TTL的消息rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 20s..."+newDate(),message->{message.getMessageProperties().setExpiration("20000");// 延迟20s到达死信队列returnmessage;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 10s..."+newDate(),message->{message.getMessageProperties().setExpiration("10000");// 延迟10s到达死信队列returnmessage;});return"发送成功!";}

这时会发现:10s 过期的消息在 20s 后才进入到死信队列??

这是因为消息过期之后,不一定会被马上丢弃。因为 RabbitMQ 只会检查队首消息是否过期,如果过期则丢到死信队列,此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。

所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

三、延迟队列插件

RabbitMQ 官方提供了一个延迟的插件来实现延迟的功能

参考:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq

① 安装延迟队列插件

  1. 下载并上传插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

根据自己的 RabbitMQ 版本选择相应版本的延迟插件,下载后上传到服务器或者放到本地的 RabbitMQ 的 plugins 目录中,可以参考下图解释:

  1. 启动插件(下面是 linux 系统指令,其它系统指令直接问 gpt 即可)

    # 查看插件列表rabbitmq-plugins list# 启动插件rabbitmq-pluginsenablerabbitmq_delayed_message_exchange# 重启服务servicerabbitmq-server restart
  2. 验证插件

在 RabbitMQ 管理平台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了。

② 基于插件延迟队列实现

  1. 声明与绑定交换机、队列

    // 常量publicstaticfinalStringDELAY_EXCHANGE="delay_exchange";publicstaticfinalStringDELAY_QUEUE="delay_queue";// 延迟队列@Bean("delayQueue")publicQueuedelayQueue(){returnQueueBuilder.durable(Constants.DELAY_QUEUE).build();// 队列正常设置}@Bean("delayExchange")publicDirectExchangedelayExchange(){returnExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")publicBindingdelayBinding(@Qualifier("delayQueue")Queuequeue,@Qualifier("delayExchange")Exchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("delay").noargs();}
  2. 生产者发送两条消息,并设置延迟时间

    @RequestMapping("/delay")publicStringdelay(){// 发送两条单独带TTL的消息rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 10s..."+newDate(),message->{message.getMessageProperties().setDelayLong(20000L);// 延迟20s到达死信队列returnmessage;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 20s..."+newDate(),message->{message.getMessageProperties().setDelayLong(10000L);// 延迟10s到达死信队列returnmessage;});return"发送成功!";}
  3. 消费者监听延迟队列,打印并观察消息

    @ComponentpublicclassDelayListener{@RabbitListener(queues=Constants.DELAY_QUEUE)publicvoiddelayQueue(Messagemessage){System.out.printf("%tc 延迟队列接收到消息: %s\n",newDate(),newString(message.getBody()));}}

从结果可以看出,使用延迟队列,可以保证消息按照延迟时间到达消费者。

四、两种实现方式的区别

实现方式优点缺点
TTL+死信① 灵活,不依赖额外插件② 适用于任何标准 RabbitMQ 环境① 存在消息顺序问题(先到期的消息可能被后到期的阻塞)② 需要额外逻辑处理死信消息,系统复杂度提高
插件① 插件可直接创建延迟队列,实现简单② 避免 DLX 的时序问题,顺序更可靠① 依赖特定插件(需安装维护)② 只支持部分 RabbitMQ 版本,兼容性有限

二、事务

RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此 RabbitMQ 也支持事务机制。Spring AMQP 也提供了对事务相关的操作。

RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败

要使用 RabbitMQ 事务,需要同时完成下面三步操作

一、配置事务管理器

因为需要配置事务管理器,所以通常单独配置RabbitTemplate,然后配置时候调用rabbitTemplate.setChannelTransacted(true)打开事务管理器,并且需要配置一下事务管理器RabbitTransactionManager,如下所示:

@ConfigurationpublicclassTransactionConfig{@BeanpublicRabbitTransactionManagertransactionManager(ConnectionFactoryconnectionFactory){returnnewRabbitTransactionManager(connectionFactory);}@Bean("transRabbitTemplate")publicRabbitTemplatetransRabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);// 开启事务returnrabbitTemplate;}}

二、声明队列

声明队列就和普通队列一样,不需要什么特殊设置:

// 事务@Bean("transQueue")publicQueuetransQueue(){returnQueueBuilder.durable("transQueue").build();}

三、发送消息时打开事务

@Resource(name="transRabbitTemplate")privateRabbitTemplatetransRabbitTemplate;// 注入transRabbitTemplate@Transactional@RequestMapping("/trans")publicStringtrans(){transRabbitTemplate.convertAndSend("","transQueue","test trans 1...");inta=5/0;// 模拟出现异常transRabbitTemplate.convertAndSend("","transQueue","test trans 2...");return"发送成功!";}

如果三个步骤中没做其中的任何一个,都没办法保证事务机制的启动!(自行测试)

三、消息分发

一、概念

当 RabbitMQ 队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表里的一个消费者(普通队列的点对点消费)。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ 是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式是不太合理的,试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

如何解决❓❓❓

可以使用channel.basicQos(int prefetchCount)限制当前信道上的消费者所能保持的最大未确认消息的数量

其中参数prefetchCount设置为 0 时表示没有上限。

比如:消费端调用了channel.basicQos(5),RabbitMQ 会为该消费者计数,发送一条消息计数+1,消费一条消息计数-1,当达到了设定的上限,RabbitMQ 就不会再向它发送消息了,直到消费者确认了某条消息。类似 TCP/IP 中的 “滑动窗口”。

💥注意事项:basicQos()对拉模式的消费无效

二、应用场景

消息分发的常见应用场景有如下:

  1. 限流
  2. 非公平分发

① 限流

如下场景:

订单系统每秒最多处理 5000 个请求,正常情况下,订单系统可以正常满足需求。

但是在秒杀时间点,请求瞬间增多,每秒 1w 个请求,如果这些请求全部通过 MQ 发送到订单系统,无疑会把订单系统压垮。

所以 RabbitMQ 提供了限流机制,可以控制消费端一次只拉取 N 个请求,保证消费端的正常运行。

操作:设置prefetchCount参数,同时设置消息确认机制为手动应答manual

  1. 配置prefetch参数,设置应答方式为手动应答

    spring:rabbitmq:addresses:amqp://liren:123123@127.0.0.1/lirendadalistener:simple:acknowledge-mode:manual# 手动确认prefetch:5
  2. 配置交换机,队列

    // 常量publicstaticfinalStringQOS_EXCHANGE="qos_exchange";publicstaticfinalStringQOS_QUEUE="qos_queue";// 消息分发@Bean("qosQueue")publicQueueqosQueue(){returnQueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")publicDirectExchangeqosExchange(){returnExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBinding")publicBindingqosBinding(@Qualifier("qosQueue")Queuequeue,@Qualifier("qosExchange")Exchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("qos").noargs();}
  3. 发送消息,一次发送20条消息

    @RequestMapping("/qos")publicStringqos(){for(inti=0;i<20;++i){rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","test qos..."+i);}return"发送成功!";}
  4. 消费者监听,进行手动确认

    @ConfigurationpublicclassQosListener{@RabbitListener(queues=Constants.QOS_QUEUE)publicvoidqosQueue(Messagemessage,Channelchannel)throwsIOException{longdeliveryTag=message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d%n",newString(message.getBody()),deliveryTag);// channel.basicAck(deliveryTag, true); // 注释掉,不进行确认,观察现象}}

发送消息时,需要先把手动确认注释掉,不然会直接消费掉

prefetch注释掉,然后重新启动程序观察现象:

可以看到消息一次性都被消费者拿到了,就没有限流效果了!

② 负载均衡

如下图所示,在有两个消费者的情况下,一个消费者处理任务非常快,另一个非常慢,就会造成一个消费者会一直很忙,而另一个消费者很闲。这是因为 RabbitMQ 只是在消息进入队列时分派消息,它不考虑消费者未确认消息的数量。

我们可以使用设置prefetch=1的方式,告诉 RabbitMQ 一次只给一个消费者一条消息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。此时,它会将它分派给下一个不忙的消费者。

  1. 配置prefetch参数,设置应答方式为手动应答

    spring:rabbitmq:addresses:amqp://liren:123123@127.0.0.1/lirendadalistener:simple:acknowledge-mode:manual# 手动确认消息prefetch:1
  2. 启动两个消费者(用休眠模拟业务处理耗时的不同)

    @ConfigurationpublicclassQosListener{@RabbitListener(queues=Constants.QOS_QUEUE)publicvoidqosQueue1(Messagemessage,Channelchannel)throwsIOException,InterruptedException{longdeliveryTag=message.getMessageProperties().getDeliveryTag();System.out.printf("qosQueue1 接收到消息:%s,deliveryTag:%d%n",newString(message.getBody()),deliveryTag);Thread.sleep(1000);// 模拟快业务处理,1schannel.basicAck(deliveryTag,true);}@RabbitListener(queues=Constants.QOS_QUEUE)publicvoidqosQueue2(Messagemessage,Channelchannel)throwsIOException,InterruptedException{longdeliveryTag=message.getMessageProperties().getDeliveryTag();System.out.printf("qosQueue2 接收到消息:%s,deliveryTag:%d%n",newString(message.getBody()),deliveryTag);Thread.sleep(2000);// 模拟慢业务处理,2schannel.basicAck(deliveryTag,true);}}

💥注意:deliveryTag有重复是因为两个消费者使用的是不同的Channel,每个Channel上的deliveryTag是独立计数的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1168649.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Python的农产品价格数据分析与可视化 系统设计与实现

一、系统开发背景与核心目标 当前农产品市场存在“价格波动难追踪、数据价值难挖掘”的问题&#xff1a;农户难以实时掌握不同地区、不同季节的农产品价格走势&#xff0c;导致种植与销售决策滞后&#xff1b;采购商需在多个渠道查询价格信息&#xff0c;对比成本效率低&#x…

【python】python有必要像go或者nodejs那样做多版本切换吗?

Python 多版本切换&#xff1a;必要性分析 Python 生态的核心设计思路&#xff08;对比Go/Node.js&#xff09; 你的这个问题非常核心且专业&#xff0c;问到了 Python 生态和 Go/Node.js 最本质的设计差异&#xff1a;Python 并非「不需要」多版本切换&#xff0c;而是「必要…

基于Python的高校毕业生招聘信息推荐系统设计与实现

一、系统开发背景与核心目标 高校毕业生在求职过程中常面临“信息过载与精准匹配缺失”的双重困境&#xff1a;招聘信息分散于各类平台&#xff0c;毕业生需耗费大量时间筛选有效内容&#xff1b;传统推荐多依赖简单关键词匹配&#xff0c;难以结合专业背景、技能特长、职业规划…

2026年转行网安到底行不行,网络安全有没有发展前途,零基础转行难不难?_网络安全失业人越来越多了

网络安全行业前景分析&#xff1a;后疫情时代的黄金赛道 疫情后的职场两极分化 在新冠疫情常态化的今天&#xff0c;职场呈现出明显的两极分化现象&#xff1a; 一些行业如在线教育、远程办公、网络安全等实现井喷式增长&#xff0c;从业者薪资翻倍、愈加繁忙一些传统行业则…

网络安全检测实战指南:核心方法、工具选型与在安全运维中的关键应用

一&#xff0c;网络安全漏洞 安全威胁是指所有能够对计算机网络信息系统的网络服务和网络信息的机密性&#xff0c;可用性和完整性产生阻碍&#xff0c;破坏或中断的各种因素。安全威胁可分为人为安全威胁和非人为安全威胁两大类。 1&#xff0c;网络安全漏洞威胁 漏洞分析的…

首届“国家绿色算力设施”推广交流活动开幕在即

首届“国家绿色算力设施”推广交流活动将于2026年1月21日在上海市召开&#xff0c;活动将邀请行业主管部门领导、相关领域专家以及中国移动通信集团甘肃有限公司、临港算力(上海)科技有限公司等国家绿色算力设施入选单位、国家信息化领域节能降碳技术提供企业、行业组织、中央企…

【收藏】2026年AI大模型学习避坑指南+实操干货,小白/程序员快速入门

最近不少朋友私下问我&#xff1a;“2026年想提升技术&#xff0c;学什么方向最有前景&#xff1f;” 我的答案始终坚定——AI大模型。其实我当初入门时&#xff0c;也是个跨行零基础的纯小白&#xff0c;全靠行业前辈的经验分享&#xff0c;才避开了无数弯路和坑。今天就把这份…

【技术教程】前端UI组件库Shadcn/ui

shadcn/ui 详解与实战案例 shadcn/ui 是近年来备受前端开发者青睐的 UI 组件库&#xff0c;与传统 UI 库&#xff08;如 Ant Design、MUI&#xff09;有本质区别。它不是一个通过 npm 安装的第三方依赖包&#xff0c;而是一套可直接复制到项目中的高质量组件源代码&#xff0c…

一文读懂计算机网络安全:核心要义、防护体系与实战资源清单

一、网络安全原理 网络安全包含两大部分内容&#xff1a;一是网络系统安全&#xff0c;二是网络上的信息安全。它涉及网络系统的可靠性、稳定性&#xff0c;以及网络上信息的保密性、完整性、可用性、真实性和可控性等。 网络系统安全&#xff1a;指保证信息处理和传输系统的…

基于FPGA的DS18B20温度监控与管理系统:实时显示温度,按键查看历史数据并存储温度点,超...

基于FPGA的 DS18B20多功能温度显示 实现功能&#xff1a; 1.实时温度显示在数码管上&#xff0c;更新速率1-2s一次 2.按下按键显示最近30s内的最高温和最低温 不包含板子&#xff0c;3.按下按键可以存储当前温度&#xff0c;最多存5个 4.超过温度报警最近在搞一个挺有意思的FPG…

奇灵·第八届AI短剧产业合作大会于1月9日深圳圆满落幕

1月9日&#xff0c;由传商、短剧右先生、短剧新势力联合主办&#xff0c;短剧探访承办&#xff0c;欧美片场协办&#xff0c;上海东方智媒城、Midjourney、马栏山音视频实验室、阿里云快快网络、漫屋、竖店短剧基地、橙调文化等机构共同支持的“奇灵第八届AI短剧产业合作大会”…

采购系统值不值得上?先看它能不能接住这几件日常工作

干采购的兄弟们&#xff0c;是不是经常被供应商催单到怀疑人生&#xff1f;合同漏签了&#xff0c;客户投诉上门库存对不上&#xff0c;月底盘点直接手忙脚乱审批流程卡在领导手机上&#xff0c;等得花儿都谢了我见过太多采购同事&#xff0c;半夜还在群里问谁有空批个单子&…

现代攻防下的网络安全防护:关键技术演进与核心场景实战解析

1&#xff1a;网络基础知识 Internet通过TCP/IP协议将遍布在全世界各地的计算机互联&#xff0c;从而形成超大的计算机网络。 2&#xff1a; 3&#xff1a;网络协议层模型 4&#xff1a;通信网络地址的发出点为源地址&#xff0c;接收点为目的地址&#xff1b; 在通信网络中&…

救命神器10个AI论文写作软件,助本科生搞定毕业论文!

救命神器10个AI论文写作软件&#xff0c;助本科生搞定毕业论文&#xff01; 论文写作的救星&#xff0c;AI 工具正在改变你的学习方式 在当今学术写作日益智能化的背景下&#xff0c;AI 工具正逐步成为学生群体中不可或缺的得力助手。尤其对于本科生而言&#xff0c;面对繁重的…

Minimax大模型算法岗面试全攻略:从基础到进阶,收藏这份超详细面经

本文分享了minimax大模型算法岗面试的详细经历&#xff0c;涵盖一面和二面的各类问题&#xff0c;包括大模型基础知识、项目经验、算法实现等。面试涉及位置编码、预训练推理、超长上下文处理、PPO与DPO算法对比等内容&#xff0c;还包含编程题。一面 自我介绍和项目介绍介绍一…

前后端分离专辑鉴赏网站系统|SpringBoot+Vue+MyBatis+MySQL完整源码+部署教程

摘要 随着互联网技术的快速发展&#xff0c;数字音乐产业迎来了前所未有的增长&#xff0c;用户对音乐资源的获取和鉴赏需求日益多样化。传统的音乐网站通常采用前后端耦合的架构&#xff0c;导致系统扩展性差、维护成本高&#xff0c;难以满足现代用户对高性能、高交互性的需求…

基于SpringBoot与微信小程序的文化旅游小程序系统设计与实现

一、系统开发背景与意义 当前文化旅游产业蓬勃发展&#xff0c;但游客在出行中常面临信息分散、体验单一等问题。传统旅游信息获取依赖旅游APP或线下攻略&#xff0c;存在下载门槛高、信息更新不及时等弊端&#xff0c;且难以深度融合文化元素&#xff0c;无法满足游客对文化体…

AI大模型工程架构全解析:从零基础入门到实战精通,一篇就够了!

文章揭示了AI智能体落地的真相&#xff1a;90%工作是工程架构设计&#xff0c;仅10%涉及大模型本身。详细拆解了智能体落地的14个层次&#xff0c;包括算力、基础设施、数据库、ETL、基础模型等&#xff0c;强调企业需全链路打磨而非单点突破。大模型虽是点睛之笔&#xff0c;但…

云边协同架构下:分布式 IDC 集群温湿度远程运维监控方案

原标题&#xff1a;机房智能化温湿度监控系统整体解决方案机房温湿度监控一、方案概述1. 建设目标• 实现机房温湿度 724 小时不间断监测&#xff0c;温度精度 0.2℃、湿度精度 2% RH&#xff0c;满足 GB 50174-2017 标准&#xff08;温度 18~27℃、湿度 40%~60% RH&#xff09…

收藏!程序员必看:5步转型AI大模型开发者,未来5年最值钱的技术路线

兄弟们&#xff01;现在用Cursor写代码确实爽&#xff0c;但你知道这玩意其实是慢性毒药吗&#xff1f; 当编程变得和用Word一样简单&#xff0c;老板还需要花钱雇你写CRUD吗&#xff1f; 未来5年真正值钱的程序员&#xff0c;都是懂大模型原理的程序员&#xff01; 随着模型…