【RabbitMQ】延迟队列

1.概述

延迟队列其实就是队列里的消息是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延时队列的使用场景:

1.订单在十分钟之内未支付则自动取消

2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

3.用户注册成功后,如果三天内没有登陆则进行短信提醒。

4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。

5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

2.代码演示 

代码是用springboot整合的。

先导入依赖

<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency>
</dependencies>

配置文件

spring.rabbitmq.host=192.168.10.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=rabbit
spring.rabbitmq.virtual-host=/

启动器

@SpringBootApplicationpublic class App {public static void main(String[] args) {SpringApplication.run(App.class,args);}}

 需求如下:创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交 换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

定义配置类,描述上图的队列,交换机以及队列和交换机之间的关系

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;// 使用@Configuration注解表明这是一个配置类,Spring容器会扫描该类来获取Bean的定义信息
@Configuration
public class DelayedQueueConfig {// 定义直连类型(Direct)的交换机,名称为xExchange// @Bean注解用于将方法返回的对象注册为Spring容器中的一个Bean,"xExchange"是该Bean的名称@Bean("xExchange")public DirectExchange xExchange() {// 创建并返回一个名为"X"的DirectExchange实例,DirectExchange类型的交换机根据路由键直接转发消息return new DirectExchange("X");}// 声明另一个直连类型的交换机,名称为yExchange@Bean("yExchange")public DirectExchange yExchange() {// 创建并返回一个名为"Y"的DirectExchange实例return new DirectExchange("Y");}// 声明队列queueA@Bean("queueA")public Queue queueA() {// 创建一个HashMap用于存储队列的属性参数Map<String, Object> args = new HashMap<>();// 设置死信交换机(当消息在队列中过期或被否定确认等情况时,消息会被转发到这个交换机)为yExchangeargs.put("x-dead-letter-exchange", "Y");// 设置死信路由键,当消息进入死信交换机后,根据这个路由键来路由到对应的死信队列args.put("x-dead-letter-routing-key", "YD");// 设置消息在队列中的存活时间(即延时时间)为10000毫秒,也就是10秒,这使得queueA成为一个延时队列args.put("x-message-ttl", 10000);// 创建一个持久化的队列(QueueBuilder.durable方法),名称为"QA",并带上前面设置的属性参数return QueueBuilder.durable("QA").withArguments(args).build();}// 绑定交换机xExchange和队列queueA// @Qualifier注解用于根据指定的Bean名称来注入对应的Bean实例,这里分别指定了要注入的队列和交换机实例@Beanpublic Binding queueQABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {// 使用BindingBuilder将队列queueA绑定到交换机xExchange上,绑定的路由键为"XA"return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列queueB,设置其延时时间为40秒@Bean("queueB")public Queue queueB() {Map<String, Object> args = new HashMap<>();// 同样设置死信交换机为yExchangeargs.put("x-dead-letter-exchange", "Y");// 设置死信路由键为"YD"args.put("x-dead-letter-routing-key", "YD");// 设置消息在队列中的存活时间为40000毫秒,即40秒,使queueB成为延时队列args.put("x-message-ttl", 40000);// 创建一个持久化的队列,名称为"QB",并带上相关属性参数return QueueBuilder.durable("QB").withArguments(args).build();}// 绑定交换机xExchange和队列queueB@Beanpublic Binding queueQBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange) {// 将队列queueB绑定到交换机xExchange上,绑定的路由键为"XB"return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明死信队列queueD@Bean("queueD")public Queue queueD() {// 创建一个名称为"QD"的队列,用于接收从延时队列中过期转移过来的消息return new Queue("QD");}// 声明死信交换机yExchange和死信队列queueD的绑定关系@Beanpublic Binding deadQueueBindingQD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {// 使用BindingBuilder将死信队列queueD绑定到死信交换机yExchange上,绑定的路由键为"YD"return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

 生产者

// @RestController注解表明该类是一个RESTful风格的控制器,用于处理HTTP请求并返回JSON等格式的数据
@RestController 
// @RequestMapping("ttl")注解用于映射请求路径,所有以"/ttl"开头的请求会被该控制器处理
@RequestMapping("ttl") 
public class SendMessageController { // @Resource注解用于自动装配RabbitTemplate实例,RabbitTemplate是Spring AMQP提供的用于操作RabbitMQ的工具类@Resource RabbitTemplate rabbitTemplate; // 该方法用于处理"/sendMsg/{message}"路径的请求,是消息发送的逻辑所在// @RequestMapping("sendMsg/{message}")注解将该方法映射到指定的请求路径,其中{message}是一个路径变量@RequestMapping("sendMsg/{message}") public void sendMessage(@PathVariable("message") String message){ // 使用rabbitTemplate的convertAndSend方法向RabbitMQ发送消息// 第一个参数"X"指定交换机名称,对应前面配置的xExchange// 第二个参数"XA"指定路由键,用于将消息路由到绑定了该路由键的队列(这里是queueA)// 消息内容是拼接后的字符串,表明消息来自ttl为10秒钟的延时队列,并带上传入的参数messagerabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10秒钟的延时队列" + message); // 同理,这条消息发送到交换机"X",通过路由键"XB"路由到queueB// 消息内容表明来自ttl为40秒钟的延时队列rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40秒钟的延时队列" + message); }
}

消费者

// @Component注解将该类标记为一个Spring组件,使其能被Spring容器扫描并管理
@Component 
// @Slf4j注解是Lombok提供的,用于自动生成日志对象log,方便在类中记录日志
@Slf4j 
public class MessageConsumerListener { // @RabbitListener(queues = "QD")注解表明该方法是一个RabbitMQ消息监听器,监听名为"QD"的队列// 当队列"QD"(即前面配置的死信队列)中有消息时,该方法会被触发执行@RabbitListener(queues = "QD") public void getMessage(Message message, Channel channel) throws Exception{ // 获取消息体内容,将消息的字节数组转换为字符串String msg = new String(message.getBody()); // 使用日志对象log记录信息,输出当前时间以及从死信队列收到的消息内容log.info("当前时间是:{},收到死信队列的消息{}",new Date().toString(),msg); }
}

我们上面构建的延时队列太局限性了,因为我们直接写死了延时队列的时间,但我们实际的应用中很多情况都是根据客户端动态设置时间,比如腾讯会议我们要预定多久的会。

所以下面这个案例新增了一个队列QC,他不设置TTL,而是根据传送的数据来动态设定。

我们在配置类中加上QC和交换机x交换机y之间的绑定关系

@Configuration
public class DelayedQueueConfig {@Bean("queueC")public Queue queueC() {Map<String, Object> args = new HashMap<>();//设置绑定死信交换机的属性args.put("x-dead-letter-exchange", "Y");args.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable("QC").withArguments(args).build();}//绑定队列QC和交换机X之间的关系@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}

 定义生产者

// @RestController注解表明该类是一个RESTful风格的控制器,用于处理HTTP请求并返回JSON等格式的数据
@RestController 
// @RequestMapping("ttl")注解用于映射请求路径,所有以"/ttl"开头的请求会被该控制器处理
@RequestMapping("ttl") 
public class SendMessageController { // @Resource注解用于自动装配RabbitTemplate实例,RabbitTemplate是Spring AMQP提供的用于操作RabbitMQ的工具类@Resource RabbitTemplate rabbitTemplate; // 该方法用于处理"/sendttlMessage/{message}/{ttl}"路径的请求,是消息发送的逻辑所在// @RequestMapping("sendttlMessage/{message}/{ttl}")注解将该方法映射到指定的请求路径,其中{message}和{ttl}是路径变量@RequestMapping("sendttlMessage/{message}/{ttl}") public void sendTtlMessage(@PathVariable("message") String message, @PathVariable("ttl") String ttl) { // 使用rabbitTemplate的convertAndSend方法向RabbitMQ发送消息// 第一个参数"X"指定交换机名称// 第二个参数"XC"指定路由键,用于将消息路由到绑定了该路由键的队列// 第三个参数message是消息内容// 第四个参数是一个Lambda表达式,用于在发送消息前设置消息的过期时间(通过setExpiration方法)// 设置的过期时间由路径变量ttl传入,单位为毫秒rabbitTemplate.convertAndSend("X", "XC", message, msg -> { msg.getMessageProperties().setExpiration(ttl); return msg; }); }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/76393.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux笔记之Ubuntu系统设置自动登录tty1界面

Ubuntu22.04系统 编辑getty配置文件 vim /etc/systemd/system/gettytty1.service.d/override.conf如果该目录或者文件不存在&#xff0c;进行创建。 在override.conf文件中进行编辑&#xff1a; [Service] ExecStart ExecStart-/sbin/agetty --autologin yourusername --no…

C++程序诗篇的灵动赋形:多态

文章目录 1.什么是多态&#xff1f;2.多态的语法实现2.1 虚函数2.2 多态的构成2.3 虚函数的重写2.3.1 协变2.3.2 析构函数的重写 2.4 override 和 final 3.抽象类4.多态原理4.1 虚函数表4.2 多态原理实现4.3 动态绑定与静态绑定 5.继承和多态常见的面试问题希望读者们多多三连支…

算法训练之动态规划(三)

♥♥♥~~~~~~欢迎光临知星小度博客空间~~~~~~♥♥♥ ♥♥♥零星地变得优秀~也能拼凑出星河~♥♥♥ ♥♥♥我们一起努力成为更好的自己~♥♥♥ ♥♥♥如果这一篇博客对你有帮助~别忘了点赞分享哦~♥♥♥ ♥♥♥如果有什么问题可以评论区留言或者私信我哦~♥♥♥ ✨✨✨✨✨✨ 个…

$_GET变量

$_GET 是一个超级全局变量&#xff0c;在 PHP 中用于收集通过 URL 查询字符串传递的参数。它是一个关联数组&#xff0c;包含了所有通过 HTTP GET 方法发送到当前脚本的变量。 预定义的 $_GET 变量用于收集来自 method"get" 的表单中的值。 从带有 GET 方法的表单发…

jQuery多库共存

在现代Web开发中&#xff0c;项目往往需要集成多种JavaScript库或框架来满足不同的功能需求。然而&#xff0c;当多个库同时使用时&#xff0c;可能会出现命名冲突、功能覆盖等问题。幸运的是&#xff0c;jQuery提供了一些机制来确保其可以与其他库和谐共存。本文将探讨如何实现…

MySQL 中的聚簇索引和非聚簇索引有什么区别?

MySQL 中的聚簇索引和非聚簇索引有什么区别&#xff1f; 1. 从不同存储引擎去考虑 在MySIAM存储引擎中&#xff0c;索引和数据是分开存储的&#xff0c;包括主键索引在内的所有索引都是“非聚簇”的&#xff0c;每个索引的叶子节点存储的是数据记录的物理地址&#xff08;指针…

Java从入门到“放弃”(精通)之旅——启航①

&#x1f31f;Java从入门到“放弃 ”精通之旅&#x1f680; 今天我将要带大家一起探索神奇的Java世界&#xff01;希望能帮助到同样初学Java的你~ (๑•̀ㅂ•́)و✧ &#x1f525; Java是什么&#xff1f;为什么这么火&#xff1f; Java不仅仅是一门编程语言&#xff0c;更…

三相电为什么没零线也能通电

要理解三相电为什么没零线也能通电&#xff0c;就要从发电的原理说起 1、弧形磁铁中加入电枢&#xff0c;旋转切割磁感线会产生电流 随着电枢旋转的角度变化&#xff0c;电枢垂直切割磁感线 电枢垂直切割磁感线&#xff0c;此时会产生最大电压 当转到与磁感线平行时&#xf…

文件上传做题记录

1&#xff0c;[SWPUCTF 2021 新生赛]easyupload2.0 直接上传php 再试一下phtml 用蚁剑连发现连不上 那就只要命令执行了 2&#xff0c;[SWPUCTF 2021 新生赛]easyupload1.0 当然&#xff0c;直接上传一个php是不行的 phtml也不行&#xff0c;看下是不是前端验证&#xff0c;…

【Pandas】pandas DataFrame head

Pandas2.2 DataFrame Indexing, iteration 方法描述DataFrame.head([n])用于返回 DataFrame 的前几行 pandas.DataFrame.head pandas.DataFrame.head 是一个方法&#xff0c;用于返回 DataFrame 的前几行。这个方法非常有用&#xff0c;特别是在需要快速查看 DataFrame 的前…

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(1):承上启下,继续上路

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(1):承上启下,继续上路 1、前言(1)情况说明(2)工程师的信仰2、知识点(1)普通形(ふつうけい)と思います(2)辞書形ことができます(3)Vたことがあります。(4)Vた とき & Vる とき3、单词(1)日语单词(2…

码率自适应(ABR)相关论文阅读简报

标题&#xff1a;Quality Enhanced Multimedia Content Delivery for Mobile Cloud with Deep Reinforcement Learning 作者&#xff1a;Muhammad Saleem , Yasir Saleem, H. M. Shahzad Asif, and M. Saleem Mian 单位: 巴基斯坦拉合尔54890工程技术大学计算机科学与工程系 …

汇编语言:指令详解

零、前置知识 1、数据类型修饰符 名称解释byte一个字节&#xff0c;8bitword单字&#xff0c;占2个字节&#xff0c;16bitdword双字&#xff0c;占4个字节&#xff0c;32bitqword四字&#xff0c;占8个字节&#xff0c;64bit 2、关键词解释 ptr&#xff1a;它代表 pointer&a…

蓝桥杯c ++笔记(含算法 贪心+动态规划+dp+进制转化+便利等)

蓝桥杯 #include <iostream> #include <vector> #include <algorithm> #include <string> using namespace std; //常使用的头文件动态规划 小蓝在黑板上连续写下从 11 到 20232023 之间所有的整数&#xff0c;得到了一个数字序列&#xff1a; S12345…

【C++算法】54.链表_合并 K 个升序链表

文章目录 题目链接&#xff1a;题目描述&#xff1a;解法C 算法代码&#xff1a; 题目链接&#xff1a; 23. 合并 K 个升序链表 题目描述&#xff1a; 解法 解法一&#xff1a;暴力解法 每个链表的平均长度为n&#xff0c;有k个链表&#xff0c;时间复杂度O(nk^2) 合并两个有序…

Java中的注解技术讲解

Java中的注解&#xff08;Annotation&#xff09;是一种在代码中嵌入元数据的机制&#xff0c;不直接参与业务逻辑&#xff0c;而是为编译器、开发工具以及运行时提供额外的信息和指导。下面我们将由浅入深地讲解Java注解的概念、实现原理、各种应用场景&#xff0c;并通过代码…

京东与喜茶关系破裂:切断所有合作 禁止进入办公场所

快科技4月10日消息&#xff0c;据报道&#xff0c;京东集团近日被曝出内部下发全员禁令&#xff0c;全面封杀喜茶产品进入办公区域。 据知情人士透露&#xff0c;京东人力行政部门发布的通知明确规定&#xff1a;全国各职场禁止与喜茶品牌开展任何形式的合作&#xff1b;员工不…

+++++背到厌倦。持续更新

Spring IoC 的工作流程: 读取 BeanDefinition: Spring 容器启动时&#xff0c;会读取 Bean 的配置信息 (例如 XML 配置文件、注解或 Java 代码)&#xff0c;并将这些配置信息转换为 BeanDefinition 对象。创建 Bean 实例: 根据 BeanDefinition 中的信息&#xff0c;Spring 容器…

如何在Git历史中抹掉中文信息并翻译成英文

如何在Git历史中抹掉中文信息并翻译成英文 在软件开发和版本控制领域&#xff0c;维护一个清晰、一致的代码历史记录是至关重要的。然而&#xff0c;有时我们可能会遇到需要修改历史提交的情况&#xff0c;比如删除敏感信息或修正错误。本文将详细探讨如何在Git历史中抹掉中文…

21 天 Python 计划:MySQL中DML与权限管理

文章目录 前言一、介绍二、MySQL数据操作&#xff1a;DML2.1 插入数据&#xff08;INSERT&#xff09;2.1.1 插入完整数据&#xff08;顺序插入&#xff09;2.1.2 指定字段插入数据2.1.3 插入多条记录2.1.4 插入查询结果 2.2 更新数据&#xff08;UPDATE&#xff09;2.3 删除数…