rabbitmq 延时队列

要使用 RabbitMQ Delayed Message Plugin 实现延时队列,首先需要确保插件已安装并启用。以下是实现延时队列的步骤和代码示例。

1. 安装 RabbitMQ Delayed Message Plugin

首先,确保你的 RabbitMQ 安装了 rabbitmq-delayed-message-exchange 插件。你可以通过以下命令安装和启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2. 创建交换机和队列

你需要创建一个 延时交换机x-delayed-message)和一个普通队列。我们将在发送消息时指定延迟时间。

3. 发送延迟消息的代码示例

假设你已经在 RabbitMQ 中设置了延时交换机。以下是使用 Java 和 Spring AMQP 发送延迟消息的代码示例。

Maven 依赖

确保你的项目中已经添加了 Spring AMQP 相关依赖:

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.4.6</version>  <!-- 适配你使用的版本 -->
</dependency>

配置延时交换机和队列

你需要配置一个 延时交换机队列,并设置消息的延迟时间。

@Configuration
public class RabbitConfig {// 创建一个延时交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> arguments = new HashMap<>();// 设定交换机类型为延时交换机arguments.put("x-delayed-type", "direct");return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, arguments);}// 创建队列@Beanpublic Queue delayedQueue() {return new Queue("delayed-queue", true);}// 将队列绑定到延时交换机@Beanpublic Binding binding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.routing.key").noargs();}
}

发送延迟消息

在消息发送时,你需要通过设置消息的属性来指定延迟时间。可以使用 AMQP.BasicProperties 来设置消息的 x-delay 属性,这个值表示延迟的时间(单位:毫秒)。

@Service
public class MessageProducer {@Autowiredprivate AmqpTemplate amqpTemplate;public void sendDelayedMessage(String message, int delayMilliseconds) {// 创建消息属性,并设置延迟时间MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(delayMilliseconds);  // 设置延迟时间(毫秒)Message messageObj = new Message(message.getBytes(), messageProperties);// 发送消息到延时交换机amqpTemplate.send("delayed-exchange", "delayed.routing.key", messageObj);System.out.println("Sent delayed message: " + message + " with delay: " + delayMilliseconds + " ms");}
}

在上面的代码中,setDelay(delayMilliseconds) 方法设置了延迟时间。这个时间会告诉 RabbitMQ 延迟多久后将消息投递到队列中。

监听消息

最后,你需要设置消费者来监听这个延时队列,并处理接收到的消息:

@Service
public class MessageConsumer {@RabbitListener(queues = "delayed-queue")public void consume(String message) {System.out.println("Received delayed message: " + message);}
}

4. 测试发送延迟消息

现在,你可以在业务逻辑中调用 sendDelayedMessage 方法发送延时消息。例如,发送一条延迟 10 秒的消息:

@Autowired
private MessageProducer messageProducer;public void testDelay() {// 发送一条延迟10秒的消息messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}

5. 启动和测试

  1. 启动你的 Spring Boot 应用。
  2. 调用 testDelay 方法发送延迟消息。
  3. 你将看到消息在队列中延迟指定的时间(例如,10秒)后被消费。

关键点:

  • 通过 x-delayed-message 交换机,设置 x-delayed-typedirecttopic,根据需求选择交换机类型。
  • 使用 setDelay 方法设置延迟时间,单位是毫秒。
  • RabbitMQ 会在指定的时间到达后,将消息投递到目标队列。

总结

通过 RabbitMQ Delayed Message Plugin,你可以非常方便地实现延时队列。只需要创建一个支持延迟的交换机,并通过设置 x-delay 属性来指定消息的延迟时间。

配置并行消费

要启动多个消费者并并行处理 RabbitMQ 中的消息,通常可以通过 Spring AMQPRabbitListener 实现。这将帮助你加快消费速度,提升系统的吞吐量。下面是如何启动多个消费者进行并行消费的代码修改步骤:

1. 配置多个消费者

Spring AMQP 支持使用 @RabbitListener 注解启动多个消费者实例。通过配置 并行消费者,Spring 会为每个消费者实例分配一个独立的线程来处理消息。

2. 增加消费者并发处理能力

为了实现并发消费,我们可以通过以下几种方式:

  • 使用 @RabbitListener 启动多个消费者实例:每个 @RabbitListener 注解的消费者都会独立地消费队列中的消息。
  • 配置 SimpleMessageListenerContainer 的并发设置:通过配置 SimpleMessageListenerContainer,你可以设置多个消费者同时监听队列,从而提高并发消费能力。

3. 代码修改示例

1) 创建并发消费者

首先,创建一个通用的消息监听器,并将 @RabbitListener 注解应用于多个消费者实例上。你可以通过 @RabbitListener 注解中的 concurrency 属性来设置消费者的并发数量。

@Service
public class ConcurrentMessageConsumer {// 使用 @RabbitListener 注解配置多个并发消费者,默认启动2个消费者@RabbitListener(queues = "delayed-queue", concurrency = "3-5")  // 设置并发消费者数目 3-5 个消费者public void consume(String message) {System.out.println("Thread: " + Thread.currentThread().getName() + " - Received message: " + message);}
}

在上面的代码中,concurrency = "3-5" 表示 Spring 会启动 3 到 5 个消费者实例来并行处理队列中的消息。消费者数目是动态的,具体数量由 Spring 的消息监听容器控制。

  • "3-5" 表示最低启动 3 个消费者,最多启动 5 个消费者来并行处理消息。
  • 如果消息量很大,Spring 会动态调整消费者的数量,以适应系统的负载。

2) 配置并发消费者的线程池(可选)

为了更好地控制消费者的线程池和消息消费的并发度,你可以通过配置 SimpleMessageListenerContainer 来定义更具体的并发设置。例如,你可以在 Spring 配置类中手动定义消费者容器。

@Configuration
public class RabbitConfig {@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,MessageListener messageListener) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("delayed-queue");container.setMessageListener(messageListener);// 设置并发消费的最小值和最大值container.setConcurrentConsumers(3);  // 最小3个消费者container.setMaxConcurrentConsumers(10);  // 最大10个消费者return container;}
}
  • setConcurrentConsumers(3):设置最小消费者数量。
  • setMaxConcurrentConsumers(10):设置最大消费者数量,Spring 会根据消息的积压情况动态调整消费者的数量。

3) 控制消费者的负载和流量

如果你希望更精细地控制消息消费的负载,可以使用 @RabbitListener 注解中的 acknowledgeMode 设置来调整消息确认模式,确保消息被正确地处理和确认。例如,使用 MANUAL 手动确认消费:

@RabbitListener(queues = "delayed-queue", ackMode = "MANUAL")
public void consumeWithAck(Message message, Channel channel) throws IOException {try {// 消费消息System.out.println("Consumed message: " + new String(message.getBody()));// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理异常,手动拒绝消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}

通过手动确认,你可以更好地控制消息的确认和失败重试机制,防止在消费者挂掉的情况下丢失消息。

4. 测试并发消费

你可以通过调用 testDelay 方法或者其他方式,发送延时消息来验证并发消费是否生效。发送的消息会被多个消费者并行处理,输出的日志中会显示哪个线程消费了哪个消息,从而验证消费者的并发能力。

@Autowired
private MessageProducer messageProducer;public void testDelay() {// 发送一条延迟10秒的消息messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}

5. 总结

通过配置多个并发消费者来加速消息消费,有以下几个要点:

  • 使用 @RabbitListener(concurrency = "3-5") 注解来启动多个并发消费者。
  • 配置 SimpleMessageListenerContainer 来更灵活地管理消费者线程池。
  • 使用手动确认模式(ackMode = "MANUAL")可以更精细地控制消息确认和失败重试。

通过这些配置,你可以根据消息量的大小和系统负载动态调整消费者数量,以达到加快消费速度的目的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/70967.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在 Vue 单文件组件(SFC)中,标签的显式关闭与隐式关闭有着重要的区别

一、显式关闭标签 1、定义&#xff1a; 所有的 HTML 标签都必须有一个对应的结束标签。 自闭合标签也必须使用 / 来关闭。 <template> <div> <p>这是一个段落</p> <img src"image.png"…

第四届大数据、区块链与经济管理国际学术会议

重要信息 官网&#xff1a;www.icbbem.com 时间&#xff1a;2025年3月14-16日 地点&#xff1a;中国-武汉 &#xff08;线上召开&#xff09; 简介 第四届大数据、区块链与经济管理国际学术会议(ICBBEM 2025)&#xff0c;将于2025年3月14-16日在中国湖北省武汉市召开。…

每日十个计算机专有名词 (7)

Metasploit 词源&#xff1a;Meta&#xff08;超越&#xff0c;超出&#xff09; exploit&#xff08;漏洞利用&#xff09; Metasploit 是一个安全测试框架&#xff0c;用来帮助安全专家&#xff08;也叫渗透测试人员&#xff09;发现和利用计算机系统中的漏洞。你可以把它想…

使用Docker Compose部署 MySQL8

MySQL 8 是一个功能强大的关系型数据库管理系统,而 Docker 则是一个流行的容器化平台。结合使用它们可以极大地简化 MySQL 8 的部署过程,并且确保开发环境和生产环境的一致性。 安装 Docker 和 Docker Compose 首先,确保你的机器上已经安装了 Docker 和 Docker Compose。 …

mamba_ssm和causal-conv1d详细安装教程

1.前言 Mamba是近年来在深度学习领域出现的一种新型结构&#xff0c;特别是在处理长序列数据方面表现优异。在本文中&#xff0c;我将介绍如何在 Linux 系统上安装并配置 mamba_ssm 虚拟环境。由于官方指定mamba_ssm适用于 PyTorch 版本高于 1.12 且 CUDA 版本大于 11.6 的环境…

c++中初始化列表的使用

在 C 中&#xff0c;初始化列表是在构造函数的定义中&#xff0c;用于对类的成员变量进行初始化的一种方式。它紧跟在构造函数的参数列表之后&#xff0c;使用冒号 : 分隔&#xff0c;各成员变量的初始化用逗号 , 分隔。下面详细介绍初始化列表及其参数的含义。 基本语法 clas…

《Linux系统编程篇》System V信号量实现生产者与消费者问题(Linux 进程间通信(IPC))——基础篇(拓展思维)

文章目录 &#x1f4da; **生产者-消费者问题**&#x1f511; **问题分析**&#x1f6e0;️ **详细实现&#xff1a;生产者-消费者****步骤 1&#xff1a;定义信号量和缓冲区****步骤 2&#xff1a;创建信号量****步骤 3&#xff1a;生产者进程****步骤 4&#xff1a;消费者进程…

利用 Python 爬虫进行跨境电商数据采集

1 引言2 代理IP的优势3 获取代理IP账号4 爬取实战案例---&#xff08;某电商网站爬取&#xff09;4.1 网站分析4.2 编写代码4.3 优化代码 5 总结 1 引言 在数字化时代&#xff0c;数据作为核心资源蕴含重要价值&#xff0c;网络爬虫成为企业洞察市场趋势、学术研究探索未知领域…

HONOR荣耀MagicBook 15 2021款 独显(BOD-WXX9,BDR-WFH9HN)原厂Win10系统

适用型号&#xff1a;【BOD-WXX9】 MagicBook 15 2021款 i7 独显 MX450 16GB512GB (BDR-WFE9HN) MagicBook 15 2021款 i5 独显 MX450 16GB512GB (BDR-WFH9HN) MagicBook 15 2021款 i5 集显 16GB512GB (BDR-WFH9HN) 链接&#xff1a;https://pan.baidu.com/s/1S6L57ADS18fnJZ1…

c语言实现三子棋小游戏(涉及二维数组、函数、循环、常量、动态取地址等知识点)

使用C语言实现一个三子棋小游戏 涉及知识点&#xff1a;二维数组、自定义函数、自带函数库、循环、常量、动态取地址等等 一些细节点&#xff1a; 1、引入自定义头文件&#xff0c;需要用""双引号包裹文件名&#xff0c;目的是为了和官方头文件的<>区分开。…

C语言数据类型及其使用 (带示例)

目录 1. 基本数据类型 整型 浮点型 字符型 2. 构造数据类型 数组 结构体 联合体&#xff08;共用体&#xff09; 枚举类型 3. 指针类型 4. 空类型 在 C 语言中&#xff0c;数据类型是非常重要的概念&#xff0c;它决定了数据在内存中的存储方式、占用空间大小以及可…

Web自动化之Selenium添加网站Cookies实现免登录

在使用Selenium进行Web自动化时&#xff0c;添加网站Cookies是实现免登录的一种高效方法。通过模拟浏览器行为&#xff0c;我们可以将已登录状态的Cookies存储起来&#xff0c;并在下次自动化测试或爬虫任务中直接加载这些Cookies&#xff0c;从而跳过登录步骤。 Cookies简介 …

NAT 技术:网络中的 “地址魔术师”

目录 一、性能瓶颈&#xff1a;NAT 的 “阿喀琉斯之踵” &#xff08;一&#xff09;数据包处理延迟 &#xff08;二&#xff09;高并发下的性能损耗 二、应用兼容性&#xff1a;NAT 带来的 “适配难题” &#xff08;一&#xff09;端到端通信的困境 &#xff08;二&…

php序列化与反序列化

文章目录 基础知识魔术方法&#xff1a;在序列化和反序列化过程中自动调用的方法什么是 __destruct() 方法&#xff1f;何时触发 __destruct() 方法&#xff1f;用途&#xff1a;语法示例&#xff1a; 反序列化漏洞利用前提条件一些绕过策略绕过__wakeup函数绕过正则匹配绕过相…

docker 占用系统空间太大了,整体迁移到挂载的其他磁盘|【当前普通用户使用docker时,无法指定镜像、容器安装位置【无法指定】】

文章目录 前言【核心步骤皆为 大模型生成的方案】总结步骤应该是&#xff1a;详细步骤如下1. **停止 Docker 服务**2. **备份原数据&#xff08;防止迁移失败&#xff09;**3. **迁移数据到新磁盘**4. **修改 Docker 配置文件**5. **重启 Docker 服务**6. **验证容器和镜像**7.…

设计后端返回给前端的返回体

目录 1、为什么要设计返回体&#xff1f; 2、返回体包含哪些内容&#xff08;如何设计&#xff09;&#xff1f; 举例 3、总结 1、为什么要设计返回体&#xff1f; 在设计后端返回给前端的返回体时&#xff0c;通常需要遵循一定的规范&#xff0c;以确保前后端交互的清晰性…

Springboot 自动化装配的原理

Springboot 自动化装配的原理 SpringBoot 主要作用为&#xff1a;起步依赖、自动装配。而为了实现这种功能&#xff0c;SpringBoot 底层主要使用了 SpringBootApplication 注解。 首先&#xff0c;SpringBootApplication 是一个复合注解&#xff0c;它结合了 Configuration、…

基于vue框架的游戏博客网站设计iw282(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;用户,博客信息,资源共享,游戏视频,游戏照片 开题报告内容 基于FlaskVue框架的游戏博客网站设计开题报告 一、项目背景与意义 随着互联网技术的飞速发展和游戏产业的不断壮大&#xff0c;游戏玩家对游戏资讯、攻略、评测等内容的需求日…

算法-二叉树篇13-路径总和

路径总和 力扣题目链接 题目描述 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。判断该树中是否存在 根节点到叶子节点 的路径&#xff0c;这条路径上所有节点值相加等于目标和 targetSum 。如果存在&#xff0c;返回 true &#xff1b;否则&#xff0c;返回…

8. 示例:对32位数据总线实现位宽和值域覆盖

文章目录 前言示例一&#xff1a;示例二&#xff1a;示例三&#xff1a;仿真与覆盖率分析覆盖点详细说明覆盖率提升技巧常见错误排查 示例四&#xff1a;仿真步骤 前言 针对32位数据总线实现位宽和值域的覆盖&#xff0c;并且能够用xrun运行&#xff0c;查看日志和波形。cover…