学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?
真的好用吗?鲜有人提的 RabbitMQ-RPC模式



在这里插入图片描述
前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是延迟队列?

延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。

二、延迟队列的实现

延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。

在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。

1. x-delayed-message插件

x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件

首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下
在这里插入图片描述
然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可
然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");

不难发现,此时其实是交换机在做延迟,
在这里插入图片描述

当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.headers(new HashMap<String, Object>(){{put("x-delay", 5000);}});
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());

2. TTL + 死信队列

此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:
在这里插入图片描述

首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
arguments.put("x-message-ttl", 5000);channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");

在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:

channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

三、手写延时队列

当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列

1. 时间轮概念

在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中
在这里插入图片描述
时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务
在这里插入图片描述

2. JAVA演示

我们先使用JUC相关内容实现一个时间轮

import java.util.*;
import java.util.concurrent.*;class TimeWheel {private int size;private int currentIndex;private List<BlockingQueue<Task>> slots;private Executor executor;public TimeWheel(int size, Executor executor) {this.size = size;this.slots = new ArrayList<>(size);for (int i = 0; i < size; i++) {slots.add(new LinkedBlockingQueue<>());}this.executor = executor;}public void addTask(Task task) {int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;slots.get(expireIndex).add(task);}public void start() {new Thread(() -> {while (true) {currentIndex = (currentIndex + 1) % size;BlockingQueue<Task> currentSlot = slots.get(currentIndex);List<Task> tasks = new ArrayList<>();currentSlot.drainTo(tasks);for (Task task : tasks) {executor.execute(task);}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}class Task implements Runnable {private long delay; // 延迟时间,单位毫秒private Runnable task; // 任务public Task(long delay, Runnable task) {this.delay = delay;this.task = task;}public long getDelay() {return delay;}@Overridepublic void run() {task.run();}
}

我们可以使用main方法来尝试验证这个时间轮效果:

    public static void main(String[] args) {TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));// 添加任务,延迟5秒执行timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));// 添加任务,延迟10秒执行timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));// 启动时间轮timeWheel.start();}

在这里插入图片描述

当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。

四、应用场景与注意事项

1. 应用场景

  1. 红包预告
    在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包
    在这里插入图片描述

  2. 订单系统
    在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。

  3. 优惠券系统
    在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。

2. 注意事项

  1. 延迟队列不要使用太多
    使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。

  2. 延迟队列可能会导致消息丢失
    在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。

  3. 设置合适的延迟时间
    在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。

总结

RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28148.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++初阶——函数重载

前言&#xff1a;C中除了可以在不同的命名空间中使用同名函数&#xff0c;还有一种支持在同一个作用域中同名函数的方式——函数重载。 函数重载 一.什么是函数重载&#xff1f;二.函数重载的3种规则三.特殊情况 一.什么是函数重载&#xff1f; C允许同样同一作用域中声明几个功…

Ubuntu 20.04 安装 Stable Diffusionn

步骤 1&#xff1a;安装 wget、git、Python3 和 Python3虚拟环境&#xff08;如果已安装可忽略这步骤&#xff09; sudo apt install wget git python3 python3-venv步骤 2&#xff1a;克隆 SD 项目到本地 git clone https://github.com/AUTOMATIC1111/stable-diffusion-webu…

Vscode python调试和运行环境设置

Vscode python调试和运行环境设置 文章目录 Vscode python调试和运行环境设置前言一、是否为每次运行python程序都要选择环境烦恼二、是否为python程序调试不能进标准/第三方库而烦恼 前言 一、是否为每次运行python程序都要选择环境烦恼 在.vscode文件夹(没有就自己造一个)下…

Flink之SideOutput(数据分流)

Flink在早期版本有一个split算子用来做数据分流使用的,但是在flink-1.12开始这个API就已经被删除了,在1.12版本以后我们是通过process算子来做数据分流的,这里就介绍一下如何使用prodess进行数据分流. 代码 import org.apache.flink.api.common.typeinfo.TypeInformation; im…

【网络编程】实现一个简单多线程版本TCP服务器(附源码)

TCP多线程 &#x1f335;预备知识&#x1f384; Accept函数&#x1f332;字节序转换函数&#x1f333;listen函数 &#x1f334;代码&#x1f331;Log.hpp&#x1f33f;Makefile☘️TCPClient.cc&#x1f340;TCPServer.cc&#x1f38d; util.hpp &#x1f335;预备知识 &…

RabbitMQ - 简单案例

目录 0.引用 1.Hello world 2.轮训分发消息 2.1 抽取工具类 2.2 启动两个工作线程接受消息 2.4 结果展示 3.消息应答 3.1 自动应答 3.2 手动消息应答的方法 3.3 消息自动重新入队 3.4 消息手动应答代码 4.RabbitMQ 持久化 4.1 队列如何实现持久化 4.2 消息实现持久化 5.不…

对 Promise 的理解

Promise 是异步编程的一种解决方案&#xff0c;它是一个对象&#xff0c;可以获取异步 操作的消息&#xff0c;他的出现大大改善了异步编程的困境&#xff0c;避免了地狱回调&#xff0c; 它比传统的解决方案回调函数和事件更合理和更强大。 所谓 Promise&#xff0c;简单说就…

算法通关村——透彻理解二分查找

1. 循环法 public static int binarySearch(int[] arr, int low, int high, int target) {while (low < high) {// 这样写主要是避免溢出的情况&#xff0c;以及>>优先级小于&#xff0c;避免出现死循环int mid low ((high - low) >> 1);if (arr[mid] target…

7.1 动手实现AlexNet

AlexNet引入了dropput层 代码 import torch from torch import nn from d2l import torch as d2lnet nn.Sequential(# 样本数为1,通道数为96,11x11的卷积核,步幅为4&#xff0c;减少输出的高度和深度。 LeNet的通道数才6&#xff0c;此处96&#xff0c;为什么要增加这么多通…

MIT 6.830数据库系统 -- lab six

MIT 6.830数据库系统 -- lab six 项目拉取引言steal/no-force策略redo log与undo log日志格式和检查点 开始回滚练习1&#xff1a;LogFile.rollback() 恢复练习2&#xff1a;LogFile.recover() 测试结果疑问点分析 项目拉取 原项目使用ant进行项目构建&#xff0c;我已经更改为…

在python中使用nvidia的VPF库对RTSP流进行硬解码并使用opencv进行显示

解码并处理视频流的多线程应用 随着视频处理技术的不断发展&#xff0c;越来越多的应用需要对视频流进行解码和处理。在本文中&#xff0c;我们将介绍一个基于Python的多线程应用程序&#xff0c;该应用程序可以解码并处理多个RTSP视频流&#xff0c;同时利用GPU加速&#xff0…

use gnustep objective-c

first app #import <Foundation/Foundation.h>int main(int argc, const char * argv[]) {NSAutoreleasePool *pool [NSAutoreleasePool new];NSLog("first start");[pool drain];return 0; }tech 专注于概念&#xff0c;而不是迷失在语言技术细节中编程语言…

微服务技术栈(1.0)

微服务技术栈 认识微服务 单体架构 单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署 优点&#xff1a; 架构简单部署成本低 缺点&#xff1a; 耦合度高 分布式架构 分布式架构&#xff1a;根据业务功能对系统进行拆分&#xff0c…

npm四种下载方式的区别

npm install moduleName 命令 安装模块到项目node_modules目录下。 不会将模块依赖写入devDependencies或dependencies 节点。 运行 npm install 初始化项目时不会下载模块。npm install -g moduleName 命令 安装模块到全局&#xff0c;不会在项目node_modules目录中保存模块包…

如何在 Spring Boot 中集成日志框架 SLF4J、Log4j

文章目录 具体步骤附录 笔者的操作环境&#xff1a; Spring Cloud Alibaba&#xff1a;2022.0.0.0-RC2 Spring Cloud&#xff1a;2022.0.0 Spring Boot&#xff1a;3.0.2 Nacos 2.2.3 Maven 3.8.3 JDK 17.0.7 IntelliJ IDEA 2022.3.1 (Ultimate Edition) 具体步骤 因为 …

Java课题笔记~ 使用 Spring 的事务注解管理事务(掌握)

通过Transactional 注解方式&#xff0c;可将事务织入到相应 public 方法中&#xff0c;实现事务管理。 Transactional 的所有可选属性如下所示&#xff1a; propagation&#xff1a;用于设置事务传播属性。该属性类型为 Propagation 枚举&#xff0c; 默认值为 Propagation.R…

【学习日记】【FreeRTOS】链表结构体及函数详解

写在前面 本文主要是对于 FreeRTOS 中链表相关内容的详细解释&#xff0c;代码大部分参考了野火FreeRTOS教程配套源码&#xff0c;作了一小部分修改。 一、结构体定义 主要包含三种结构体&#xff1a; 普通节点结构体结尾节点&#xff08;mini节点&#xff09;结构体链表结…

awk经典实战、正则表达式

目录 1.筛选给定时间范围内的日志 2.统计独立IP 案列 需求 代码 运行结果 3.根据某字段去重 案例 运行结果 4.正则表达式 1&#xff09;认识正则 2&#xff09;匹配字符 3&#xff09;匹配次数 4&#xff09;位置锚定&#xff1a;定位出现的位置 5&#xff09;分组…

ESP32 Max30102 (3)修复心率误差

1. 运行效果 2. 新建修复心率误差.py 代码如下: from machine import sleep, SoftI2C, Pin, Timer from utime import ticks_diff, ticks_us from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM from hrcalc import calc_hr_and_spo2BEATS = 0 # 存储心率 FINGER_F…

如何识别手机是否有灵动岛(dynamic island)

如何识别手机是否有灵动岛&#xff08;dynamic island&#xff09; 灵动岛是苹果2022年9月推出的iPhone 14 Pro、iPhone 14 Pro Max首次出现&#xff0c;操作系统最低是iOS16.0。带灵动岛的手机在竖屏时顶部工具栏大于等于51像素。 #define isHaveDynamicIsland ({ BOOL isH…