RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)

RabbitMQ 核心功能

RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客

RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)-CSDN博客


前言

最近再看 RabbitMQ,看了看自己之前写的博客,诶,一言难尽,当时学的懵懵懂懂的。这里重新整理 RabbitMQ 的核心功能。

在分布式系统中,消息队列是实现异步通信、解耦服务的关键组件。RabbitMQ 作为一款功能强大的消息队列,其消息可靠性是确保系统稳定运行的重要因素。这里将深入探讨 RabbitMQ 的消息确认机制、持久化策略、发送方确认机制以及重试机制!!


一、消息确认机制

1.1 消息确认机制概述

生产者发送消息到消费端后,可能出现消息处理成功或异常的情况。如果 RabbitMQ 在发送消息后就将其删除,当消息处理异常时,就会造成消息丢失。为了确保消费端成功接收并正确处理消息,RabbitMQ 提供了消息确认机制(message acknowledgement)。

消费者在订阅队列时,可以指定autoAck参数,根据该参数设置,消息确认机制分为自动确认和手动确认两种:

  • 自动确认(autoAck=true):RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正消费到了这些消息。这种模式适合对消息可靠性要求不高的场景。
  • 手动确认(autoAck=false):RabbitMQ 会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息。这种模式适合对消息可靠性要求比较高的场景。

以下是basicConsume方法的定义:

/*** Start a non-nolocal, non-exclusive consumer, with* a server-generated consumerTag.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param callback an interface to the consumer object* @return the consumerTag generated by the server* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

1.2 手动确认方法

消费者在收到消息之后,可以选择确认、直接拒绝或者跳过,RabbitMQ 提供了三种不同的确认应答方式:

  • 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple):RabbitMQ 已知道该消息并且成功处理消息,可以将其丢弃。
    • deliveryTag:消息的唯一标识,是一个单调递增的 64 位长整型值,每个通道(Channel)独立维护,所以在每个通道上都是唯一的。当消费者确认(ack)一条消息时,必须使用对应的通道进行确认。
    • multiple:是否批量确认。值为true则会一次性 ack 所有小于或等于指定deliveryTag的消息;值为false,则只确认当前指定deliveryTag的消息。
  • 否定确认:Channel.basicReject(long deliveryTag, boolean requeue):消费者客户端可以调用channel.basicReject方法来告诉 RabbitMQ 拒绝这个消息。
    • deliveryTag:参考channel.basicAck
    • requeue:表示拒绝后,这条消息如何处理。如果requeue参数设置为true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue参数设置为false,则 RabbitMQ 会把消息从队列中移除,而不会把它发送给新的消费者。
  • 否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):如果想要批量拒绝消息,可以使用Basic.Nack命令。
    • 参数介绍参考前面两个方法。multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。

1.3 代码示例

我们基于 Spring Boot 来演示消息的确认机制,Spring - AMQP 对消息确认机制提供了三种策略:

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
1.3.1 AcknowledgeMode.NONE
  • 配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
  • 发送消息
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}@Configuration
public class RabbitmqConfig {@Bean("ackExchange")public Exchange ackExchange(){return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}
}@RestController
@RequestMapping("/producer")
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");return "发送成功!";}
}
  • 消费端逻辑
@Component
public class AckQueueListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");}
}

运行结果:消息处理失败,但消息已从 RabbitMQ 中移除,因为NONE模式下消息一旦投递就会被自动确认。

1.3.2 AcknowledgeMode.AUTO
  • 配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto

重新运行程序,当消费者出现异常时,RabbitMQ 会不断重发消息,由于异常多次重试还是失败,消息没被确认,也无法 nack,就一直是unacked状态,导致消息积压。

1.3.3 AcknowledgeMode.MANUAL
  • 配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual
  • 消费端手动确认逻辑
@Component
public class AckQueueListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//手动设置一个异常, 来测试异常拒绝机制// int num = 3/0;//3. 手动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送, 若为false, 则直接丢弃channel.basicNack(deliveryTag, true, true);}}
}

运行结果:消息正常处理时会被签收;异常时会不断重试。


二、持久性

2.1 交换机持久化

交换器的持久化是通过在声明交换机时将durable参数置为true实现的。这样当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建立交换机,交换机会自动建立。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失。

ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();

2.2 队列持久化

队列的持久化是通过在声明队列时将durable参数置为true实现的。如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,该队列就会被删掉,数据也会丢失。但队列持久化不能保证内部所存储的消息不丢失,要确保消息不丢失,需要将消息设置为持久化。

QueueBuilder.durable(Constant.ACK_QUEUE).build();

创建非持久化队列:

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build(); 

2.3 消息持久化

消息实现持久化,需要把消息的投递模式(MessageProperties中的deliveryMode)设置为 2,也就是MessageDeliveryMode.PERSISTENT

// 要发送的消息内容
String message = "This is a persistent message";
// 创建一个Message对象,设置为持久化
Message messageObject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使用RabbitTemplate发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);

需要注意的是,将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能,因为写入磁盘的速度比写入内存的速度慢很多。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

即使将交换器、队列、消息都设置了持久化,也不能百分之百保证数据不丢失。例如,消费者订阅队列时autoAck参数设置为true,消费者接收到消息后还没来得及处理就宕机,会导致数据丢失;持久化的消息存入 RabbitMQ 后,还需要一段时间才能存入磁盘,如果在这段时间内 RabbitMQ 服务节点发生异常,消息可能会丢失。可以通过引入 RabbitMQ 的仲裁队列或在发送端引入事务机制、发送方确认机制来提高可靠性。(后续都会讲到)


三、发送方确认

3.1 confirm 确认模式

Producer 在发送消息时,对发送端设置一个ConfirmCallback的监听,无论消息是否到达Exchange,这个监听都会被执行。如果Exchange成功收到消息,ACKtrue;如果没收到消息,ACKfalse

配置 RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确认

设置确认回调逻辑并发送消息

@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){System.out.printf("消息接收成功, id:%s \n", correlationData.getId());}else {System.out.printf("消息接收失败, id:%s, cause: %s", correlationData.getId(), cause);}}});return rabbitTemplate;}
}@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedException {CorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm", "confirm test...", correlationData1);return "确认成功";}
}
  • 测试
    运行程序,调用接口http://127.0.0.1:8080/product/confirm,观察控制台,消息确认成功。修改交换机名称,重新运行,会触发消息发送失败的结果。

3.2 return 退回模式

消息到达Exchange之后,会根据路由规则匹配,把消息放入Queue中。如果一条消息无法被任何队列消费,可以选择把消息退还给发送者。

配置 RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确认
  • 设置返回回调逻辑并发送消息
@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}});return rabbitTemplate;}
}@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/msgReturn")public String msgReturn(){CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm11", "message return test...", correlationData);return "消息发送成功";}
}

测试
运行程序,调用接口http://127.0.0.1:8080/product/msgReturn,观察控制台,消息被退回。


四、重试机制

在消息传递过程中,可能会遇到各种问题,如网络故障、服务不可用、资源不足等,这些问题可能导致消息处理失败。为了解决这些问题,RabbitMQ 提供了重试机制,允许消息在处理失败后重新发送。但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数。

4.1 重试配置

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto #消息接收确认retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长为5秒max-attempts: 5 # 最大重试次数(包括自身消费的一次)

4.2 配置交换机 & 队列

//重试机制
public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";//重试机制 发布订阅模式
//1. 交换机
@Bean("retryExchange")
public Exchange retryExchange() {return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
}
//2. 队列
@Bean("retryQueue")
public Queue retryQueue() {return QueueBuilder.durable(Constant.RETRY_QUEUE).build();

五:如何保证 RabbitMQ 消息的可靠传输?

消息可能丢失的场景以及解决方案如下:

生产者将消息发送到 RabbitMQ 失败

  • 可能原因是网络问题等,
  • 解决办法是参考发送方确认 - confirm确认模式

消息在交换机中无法路由到指定队列

  • 可能原因是代码或者配置层面错误,导致消息路由失败,
  • 解决办法是参考发送方确认 - return模式

消息队列自身数据丢失

  • 可能原因是消息到达 RabbitMQ 之后,RabbitMQ Server 宕机导致消息丢失,
  • 解决办法是参考持久化开启 RabbitMQ 持久化,也可以通过集群的方式提高可靠性。

消费者异常,导致消息丢失

  • 可能原因是消息到达消费者,还没来得及消费,消费者宕机或消费者逻辑有问题,
  • 解决办法是参考消息确认,开启手动确认,配置重试机制

以上就是四个RabbitMQ保证消息可靠性的四个机制,后续有更多核心机制的更新,感谢阅览!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/72856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用DeepSeek-R1-Distill-data-110k蒸馏中文数据集 微调Qwen2.5-7B-Instruct!

下载模型与数据 模型下载: huggingface: Qwen/Qwen2.5-7B-Instruct HF MirrorWe’re on a journey to advance and democratize artificial intelligence through open source and open science.https://hf-mirror.com/Qwen/Qwen2.5-7B-Instruct 魔搭&a…

在IDEA中进行git回滚操作:Reset current branch to here‌或Reset HEAD

问题描述 1)在本地修改好的代码,commit到本地仓库,突然发觉有问题不想push推到远程仓库了,但它一直在push的列表中存在,那该怎么去掉push列表中的内容呢? 2)合并别的分支到当前分支&#xff0…

六十天前端强化训练之第十一天之事件机制超详解析

欢迎来到编程星辰海的博客讲解 目录 一、事件模型演进史 1.1 原始事件模型(DOM Level 0) 1.2 DOM Level 2事件模型 1.3 DOM Level 3事件模型 二、事件流深度剖析 2.1 捕获与冒泡对比实验 2.2 事件终止方法对比 三、事件委托高级应用 3.1 动态元…

Qwen架构与Llama架构的核心区别

我们在讨论Deepseek不同版本之间的区别时了解到,DeepSeek-R1的蒸馏模型分为Qwen和Llama两个系列,包括Qwen系列的0.5B、1.5B、3B、7B、14B、32B、72B和Llama系列的8B、70B。Qwen系列以阿里通义千问(Qwen)为基础模型架构(具体是Qwen-2.5),Llama系列以Meta的Llama为基础模型…

匿名GitHub链接使用教程(Anonymous GitHub)2025

Anonymous GitHub 1. 引言2. 准备3. 进入Anonymous GitHub官网4. 用GitHub登录匿名GitHub并授权5. 进入个人中心,然后点击• Anonymize Repo实例化6. 输入你的GitHub链接7. 填写匿名链接的基础信息8. 提交9. 实例化对应匿名GitHub链接10. 进入个人中心管理项目11. 查…

工程化与框架系列(25)--低代码平台开发

低代码平台开发 🔧 引言 低代码开发平台是一种通过可视化配置和少量代码实现应用开发的技术方案。本文将深入探讨低代码平台的设计与实现,包括可视化编辑器、组件系统、数据流管理等关键主题,帮助开发者构建高效的低代码开发平台。 低代码…

Redis系列之慢查询分析与调优

Redis 慢查询分析与优化:提升性能的实战指南 Redis 作为一款高性能的内存数据库,因其快速的数据读写能力和灵活的数据结构,被广泛应用于缓存、消息队列、排行榜等多种业务场景。然而,随着业务规模的扩大和数据量的增加&#xff0…

Git系列之git tag和ReleaseMilestone

以下是关于 Git Tag、Release 和 Milestone 的深度融合内容,并补充了关于 Git Tag 的所有命令、详细解释和指令实例,条理清晰,结合实际使用场景和案例。 1. Git Tag 1.1 定义 • Tag 是 Git 中用于标记特定提交(commit&#xf…

开源项目介绍:Native-LLM-for-Android

项目地址:Native-LLM-for-Android 创作活动时间:2025年 支持在 Android 设备上运行大型语言模型 (LLM) ,具体支持的模型包括: DeepSeek-R1-Distill-Qwen: 1.5B Qwen2.5-Instruct: 0.5B, 1.5B Qwen2/2.5VL:…

深入理解 Java 虚拟机内存区域

Java 虚拟机(JVM)是 Java 程序运行的核心环境,它通过内存管理为程序提供高效的执行支持。JVM 在运行时将内存划分为多个区域,每个区域都有特定的作用和生命周期。本文将详细介绍 JVM 的运行时数据区域及其功能,并探讨与…

PDF转JPG(并去除多余的白边)

首先,手动下载一个软件(poppler for Windows),下载地址:https://github.com/oschwartz10612/poppler-windows/releases/tag/v24.08.0-0 否则会出现以下错误: PDFInfoNotInstalledError: Unable to get pag…

深入剖析MyBatis缓存机制:原理、源码与实战指南

引言 MyBatis作为一款优秀的ORM框架,其缓存机制能显著提升数据库查询性能。但许多开发者仅停留在“知道有缓存”的层面,对其实现原理和细节知之甚少。本文将结合可运行的代码示例和源码分析,手把手带您彻底掌握MyBatis缓存机制。 一、MyBatis缓存分类 MyBatis提供两级缓存…

Vue 使用 vue-router 时,多级嵌套路由缓存问题处理

Vue 使用 vue-router 时,多级嵌套路由缓存问题处理 对于三级菜单(或多级嵌套路由),vue 都是 通过 keep-alive 组件来实现路由组件的缓存。 有时候三级或者多级路由时,会出现失效情况。以下是三级菜单缓存的例子。 最…

QSplitter保存和读取

官方文档提供的方案 保存 connect(ui->splitter, &QSplitter::splitterMoved, [](){settings.setValue("splitterSizes", ui->splitter->saveState()); });读取 ui->splitter->restoreState(settings.value("splitterSizes").toByteA…

VanillaVueSvelteReactSolidAngularPreact前端框架/库的简要介绍及其优势

VanillaVueSvelteReactSolidAngularPreact前端框架/库的简要介绍及其优势。以下是这些前端框架/库的简要介绍及其优势: 1. Vanilla 定义:Vanilla 并不是一个框架,而是指 原生 JavaScript(即不使用任何框架或库)。优势…

Java多线程与高并发专题——关于CopyOnWrite 容器特点

引入 在 CopyOnWriteArrayList 出现之前,我们已经有了 ArrayList 和 LinkedList 作为 List 的数组和链表的实现,而且也有了线程安全的 Vector 和Collections.synchronizedList() 可以使用。 首先我们来看看Vector是如何实现线程安全的 ,还是…

Jmeter接口测试详解

今天笔者呢,想给大家聊聊Jmeter接口测试流程详解,废话不多说直接进入正题。 一、jmeter简介 Jmeter是由Apache公司开发的java开源项目,所以想要使用它必须基于java环境才可以; Jmeter采用多线程,允许通过多个线程并…

DeepSeek开启AI办公新模式,WPS/Office集成DeepSeek-R1本地大模型!

从央视到地方媒体,已有多家媒体机构推出AI主播,最近杭州文化广播电视集团的《杭州新闻联播》节目,使用AI主持人进行新闻播报,且做到了0失误率,可见AI正在逐渐取代部分行业和一些重复性的工作,这一现象引发很…

通过Golang的container/list实现LRU缓存算法

文章目录 力扣:146. LRU 缓存主要结构 List 和 Element常用方法1. 初始化链表2. 插入元素3. 删除元素4. 遍历链表5. 获取链表长度使用场景注意事项 源代码阅读 在 Go 语言中,container/list 包提供了一个双向链表的实现。链表是一种常见的数据结构&#…

【大学生体质】智能 AI 旅游推荐平台(Vue+SpringBoot3)-完整部署教程

智能 AI 旅游推荐平台开源文档 项目前端地址 ☀️项目介绍 智能 AI 旅游推荐平台(Intelligent AI Travel Recommendation Platform)是一个利用 AI 模型和数据分析为用户提供个性化旅游路线推荐、景点评分、旅游攻略分享等功能的综合性系统。该系统融合…