RabbitMQ惰性队列的工作原理、消息持久化机制、同步刷盘的概念、延迟插件的使用方法

惰性队列工作原理

惰性队列通过尽可能多地将消息存储到磁盘上来减少内存的使用。与传统队列相比,惰性队列不会主动将消息加载到内存中,而是尽量让消息停留在磁盘上,从而降低内存占用。尽管如此,它并不保证所有操作都是同步写入磁盘的。这意味着消息可能会先被缓存到操作系统的缓冲区中,然后由操作系统决定何时将其真正写入磁盘。

  • 优点:适合处理大量消息且对内存压力敏感的场景。
  • 缺点:由于频繁的磁盘I/O操作,性能可能不如传统队列。

同步刷盘的概念

同步刷盘意味着每次写入操作都会等待数据完全写入磁盘后才返回确认信息。虽然这种方式提供了更强的数据持久性保证,但它也显著增加了写入操作的延迟。对于RabbitMQ而言,可以通过设置消息为持久化来增加数据的安全性,但对于极端情况下的数据安全性要求,还需要结合其他策略如调整操作系统参数或使用文件系统级别的同步写入配置。

延迟插件的工作原理

RabbitMQ本身没有内置的延迟队列功能,但可以通过安装rabbitmq_delayed_message_exchange插件实现这一功能。该插件允许创建一个自定义交换机类型,该交换机能够根据消息头中的延迟时间属性来延迟消息的传递。

在Spring Boot中集成RabbitMQ惰性队列和延迟消息

1. 项目初始化

首先,确保你的Spring Boot项目中包含必要的依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
</dependencies>
2. 配置RabbitMQ连接

application.yml中配置RabbitMQ连接信息:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
3. 定义惰性队列

创建一个配置类来定义惰性队列:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfig {/*** 定义惰性模式的队列* @return 返回惰性队列实例*/@Beanpublic Queue lazyQueue() {Map<String, Object> args = new HashMap<>();// 设置队列为惰性模式args.put("x-queue-mode", "lazy");return new Queue("my_lazy_queue", true, false, false, args); // durable=true for queue durability}
}
4. 发送持久化消息

创建一个服务类用于发送消息,并确保消息是持久化的:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送一条持久化消息到惰性队列* @param message 要发送的消息内容*/public void sendMessage(String message) {rabbitTemplate.convertAndSend("my_lazy_queue", message);System.out.println(" [x] Sent '" + message + "'");}
}

确保消息持久化可以在application.yml中设置如下:

spring:rabbitmq:template:exchange: ''routing-key: 'my_lazy_queue'mandatory: truepublisher-confirms: truepublisher-returns: true
5. 接收消息

创建一个监听器来接收消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageReceiver {/*** 监听并接收来自惰性队列的消息* @param message 接收到的消息内容*/@RabbitListener(queues = "my_lazy_queue")public void receiveMessage(String message) {System.out.println(" [x] Received '" + message + "'");}
}
6. 使用延迟插件发送延迟消息

首先,在RabbitMqConfig中定义延迟交换机:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfig {/*** 定义延迟交换机* @return 返回延迟交换机实例*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);}/*** 绑定延迟队列到延迟交换机* @param delayedQueue 延迟队列* @param delayExchange 延迟交换机* @return 返回绑定实例*/@Beanpublic Binding binding(Queue delayedQueue, CustomExchange delayExchange) {return new Binding("delayed_queue", Binding.DestinationType.QUEUE, "delayed_exchange", "routing.key", Collections.emptyMap());}/*** 定义延迟队列* @return 返回延迟队列实例*/@Beanpublic Queue delayedQueue() {return new Queue("delayed_queue");}
}

然后,创建一个服务类来发送延迟消息:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class DelayedMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送带有延迟的消息* @param message 要发送的消息内容* @param delayTime 延迟时间(毫秒)*/public void sendDelayedMessage(String message, int delayTime) {MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setHeader("x-delay", delayTime);return message;};rabbitTemplate.convertAndSend("delayed_exchange", "routing.key", message, messagePostProcessor);System.out.println(" [x] Sent '" + message + "' with delay.");}
}

最后,创建一个监听器来接收延迟消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DelayedMessageReceiver {/*** 监听并接收来自延迟队列的消息* @param message 接收到的消息内容*/@RabbitListener(queues = "delayed_queue")public void receiveDelayedMessage(String message) {System.out.println(" [x] Received delayed message '" + message + "'");}
}

高级特性和最佳实践

  • 发布确认机制:为了提高可靠性,可以开启发布确认机制,以确保消息确实被RabbitMQ服务器接受。

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message acknowledged");} else {System.err.println("Message not acknowledged due to: " + cause);}
});
  • 预取计数(Prefetch Count):通过设置预取计数限制每个消费者同时处理的消息数量,有助于防止消费者被过多未处理的消息压垮。

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConnectionConfig {@Beanpublic CachingConnectionFactory cachingConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setChannelCacheSize(25);connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(200);return connectionFactory;}
}

可以在application.yml中设置:

spring:rabbitmq:listener:simple:prefetch: 10

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/76567.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spark Core(二)

Spark-Core编程&#xff08;二&#xff09; RDD转换算子 RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型 Value类型 1&#xff09;map 将处理的数据逐条进行映射转换&#xff0c;这里的转换可以是类型的转换&#xff0c;也可以是…

C#打开文件及目录脚本

如果每天开始工作前都要做一些准备工作&#xff0c;比如打开文件或文件夹&#xff0c;我们可以使用代码一键完成。 using System.Diagnostics; using System.IO;namespace OpenFile {internal class Program{static void Main(string[] args){Console.WriteLine("Hello, …

Python生成exe

其中的 -w 参数是 PyInstaller 用于窗口模式&#xff08;Windowed mode&#xff09;&#xff0c;它会关闭命令行窗口的输出&#xff0c;这通常用于 图形界面程序&#xff08;GUI&#xff09;&#xff0c;比如使用 PyQt6, Tkinter, PySide6 等。 所以&#xff1a; 如果你在没有…

【大模型微调】如何解决llamaFactory微调效果与vllm部署效果不一致如何解决

以下个人没整理太全 一、生成式语言模型的对话模板介绍 使用Qwen/Qwen1.5-0.5B-Chat训练 对话模板不一样。回答的内容就会不一样。 我们可以看到例如qwen模型的tokenizer_config.json文件&#xff0c;就可以看到对话模板&#xff0c;一般同系列的模型&#xff0c;模板基本都…

Linux网络编程——详解网络层IP协议、网段划分、路由

目录 一、前言 二、IP协议的认识 1、什么是IP协议&#xff1f; 2、IP协议报头 三、网段划分 1、初步认识IP与路由 2、IP地址 I、DHCP动态主机配置协议 3、IP地址的划分 I、CIDR设计 II、子网数目的计算 III、子网掩码的确定 四、特殊的IP地址 五、IP地址的数量限…

ansible+docker+docker-compose快速部署4节点高可用minio集群

目录 github项目地址 示例服务器列表 安装前 修改变量文件group_vars/all.yml 修改ansible主机清单 修改setup.sh安装脚本 用法演示 安装后验证 github项目地址 https://github.com/sulibao/ansible_minio_cluster.git 示例服务器列表 安装前 修改变量文件group_var…

MySql主从相关概念

想象一下&#xff0c;你的业务飞速增长&#xff0c;用户请求如潮水般涌来&#xff0c;突然数据库主库宕机&#xff0c;数据丢失&#xff0c;服务瘫痪——这简直是开发者的噩梦&#xff01;MySQL主从复制就像一张安全网&#xff0c;通过主库写、从库读的协作模式&#xff0c;不仅…

机械臂只有位置信息是否可以进行手眼标定?

平常我在做手眼标定时&#xff0c;一般都是通过OpenCV的cv::calibrateHandEye函数进行求解&#xff0c;需要输入多组不同的机械臂位姿。今天遇到了一款舵机机器人&#xff0c;只能获取位置&#xff0c;得不到姿态信息&#xff0c;想着那就把姿态都设为0&#xff0c;结果求不出来…

华为数字芯片机考2025合集2已校正

单选 1. 题目内容 关于亚稳态的描述错误的是&#xff08; &#xff09;。 1. 解题步骤 1.1 理解亚稳态&#xff08;Metastability&#xff09;的核心特性 亚稳态是指触发器无法在指定时间内稳定输出有效逻辑电平&#xff08;0或1&#xff09;的状态&#xff0c;其关键特点…

T-Box车载系统介绍及其应用

定义 T-Box汽车系统&#xff0c;全称为Telematics - BOX&#xff0c;也常简称为车载T - BOX&#xff0c;是汽车智能系统及车联网系统中的核心组成部分&#xff0c;是安装在车辆上的一种高科技远程信息处理器。 工作原理 T-Box的核心功能主要通过MPU和MCU实现。MPU负责应用程序功…

[redis进阶一]redis的持久化(1)RDB篇章

目录 一 认识持久化 (1)先看总结图 (2)什么是持久化? (3)redis是怎么进行持久化的呢 (4)简单分析一下RDB持久化和AOF持久化的不同 二 RDB持久化 (1)RDB的触发机制 (2)RDB的bgsave执行流程 (3)RDB文件的处理 (4)RDB的优缺点 (5)RDB效果演示板书 三 温习Linux文件…

uniapp日常总结--uniapp页面跳转方式

uniapp日常总结--uniapp页面跳转方式_uniapp 跳转-CSDN博客

《汽车电器与电子技术》实验报告

SRS系统结构原理与故障检测诊断 车辆上为什么要配安全气囊&#xff1f;——解析汽车被动安全的关键防线 一、安全气囊的核心作用&#xff1a;应对高速碰撞的“救命缓冲垫” 车辆在高速碰撞时&#xff08;如正面碰撞、侧面碰撞&#xff09;&#xff0c;人体会因惯性以极高速度…

ffmpeg编解码器相关函数

文章目录 &#x1f3af; 你需要理解的核心结构体&#xff1a;&#x1f4e6; 常用函数及使用顺序&#xff08;以解码为例&#xff09;1️⃣ avcodec_find_decoder() / avcodec_find_encoder()2️⃣ avcodec_alloc_context3()3️⃣ avcodec_parameters_to_context()4️⃣ avcodec…

尚硅谷2019版Java网络编程笔记

第14章 网络编程 网络编程概述 什么是网络编程&#xff1f; 网络编程是通过网络协议实现计算机之间的数据交换。Java提供了强大的网络编程支持&#xff0c;隐藏了底层细节&#xff0c;开发者可以轻松实现网络通信。 网络编程的核心问题 如何定位网络上的主机&#xff1a;通…

解决【远程主机可能不符合 glibc 和 libstdc++ Vs code 服务器的先决条件】

可能是因为vscode不支持远程操作系统的版本&#xff0c;要么升级操作系统要么回退vscode版本 vscode回退1.97版本下载地址&#xff1a; 1.97版本VSCODE

forms+windows添加激活水印

formswindows添加激活水印 多语言水印文本&#xff0c;根据系统语言自动切换。水印显示在每个屏幕的右下角&#xff0c;位置动态调整。半透明灰色文字&#xff0c;微软雅黑字体。窗口无边框、置顶、透明背景&#xff0c;不干扰用户操作。支持多显示器。高DPI适配。 效果图&am…

LeetCode --- 444 周赛

题目列表 3507. 移除最小数对使数组有序 I 3508. 设计路由器 3509. 最大化交错和为 K 的子序列乘积 3510. 移除最小数对使数组有序 II 一、移除最小数对使数组有序 I & II 由于数组是给定的&#xff0c;所以本题的操作步骤是固定的&#xff0c;我们只要能快速模拟操作的过…

限流、降级、熔断、隔离?

在微服务架构中&#xff0c;服务限流、降级、熔断和隔离是保障系统高可用性的核心手段&#xff0c;但它们解决的问题和应用场景不同。以下是它们的区别、解决方案和实际案例的详细说明&#xff1a; 一、服务限流&#xff08;Rate Limiting&#xff09; 定义&#xff1a;通过限…

Day22 -php开发01--留言板+知识点(超全局变量 文件包含 数据库操作 第三方插件)

环境要求&#xff1a;php7.0.9 小皮 navicat phpstorm24.1 知识点&#xff1a;会写&#xff08;留言板 留言板后台&#xff09; 超全局变量 三方插件的使用 文件包含 1、开启小皮并利用navicat新建一个数据库 注意&#xff1a;本地的服务mysql关闭后 才可打开小皮。属…