RabbitMq--消息可靠性

12.消息可靠性

1.消息丢失的情况

  1. 生产者向消息代理传递消息的过程中,消息丢失了
  2. 消息代理( RabbitMQ )把消息弄丢了
  3. 消费者把消息弄丢了

image-20250310192901333

那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性

2.生产者的可靠性

1.生产者重连

由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制

application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)

spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /blogusername: CaiXuKunpassword: T1rhFXMGXIOYCoyiconnection-timeout: 1s # 连接超时时间template:retry:enabled: true # 开启连接超时重试机制initial-interval: 1000ms # 连接失败后的初始等待时间multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multipliermax-attempts: 3 # 最大重试次数

注意事项: 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也可以考虑使用异步线程来执行发送消息的代码

2.生产者确认

RabbitMQ 提供了 Publisher ConfirmPublisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况

  • 消息投递到了 MQ,但是路由失败(业务原因),此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功

  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功

  • 其它情况都会返回 NACK,告知生产者消息投递失败

image-20250310194409045

生产者确认机制有关的配置信息( application.yml 文件)

spring:rabbitmq:publisher-returns: truepublisher-confirm-type: correlated

publisher-confirm-type 有三种模式:

  1. none:关闭 confirm 机制
  2. simple:以同步阻塞等待的方式返回 MQ 的回执消息
  3. correlated:以异步回调方式的方式返回 MQ 的回执消息

每个 RabbitTemplate 只能配置一个 ReturnCallback

新增一个名为 RabbitMQConfig 的配置类,并让该类实现 ApplicationContextAware 接口

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回调rabbitTemplate.setReturnsCallback((returnedMessage) -> {System.out.println("收到消息的return callback, " +"exchange = " + returnedMessage.getExchange() + ", " +"routingKey = " + returnedMessage.getRoutingKey() + ", " +"replyCode = " + returnedMessage.getReplyCode() + ", " +"replyText = " + returnedMessage.getReplyText() + ", " +"message = " + returnedMessage.getMessage());});}}

添加一个测试类,测试 ReturnCallback 的效果

@Test
void testConfirmCallback() throws InterruptedException {CorrelationData correlationData = new CorrelationData();correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {if (confirm.isAck()) {// 消息发送成功System.out.println("消息发送成功,收到ack");} else {// 消息发送失败System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason());}if (throwable != null) {// 消息回调失败System.err.println("消息回调失败");}});rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);// 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果Thread.sleep(2000);
}

如何看待和处理生产者的确认信息

  • 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用
  • 如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题
  • 对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息

3.消息代理(RabbitMQ)的可靠性

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:

  • 一旦 RabbitMQ 宕机,内存中的消息会丢失
  • 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象

**MQ 阻塞:**当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息

1.数据持久化

RabbitMQ 实现数据持久化包括 3 个方面:

  1. 交换机持久化
  2. 队列持久化
  3. 消息持久化

注意事项:利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)

2. LazyQueue( 3.12 版本后所有队列都是 Lazy Queue 模式)

从 RabbitMQ 的 3.6.0 版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:

  1. 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认 2048条 )
  2. 消费者要处理消息时才会从磁盘中读取并加载到内存
  3. 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改

开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执

  • 在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

    image-20250310200323362

  • 在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

    //注解式创建
    @RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(name = "lazy.queue2",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazeQueue(String message) {System.out.println("消费者收到了 laze.queue2的消息: " + message);
    }
    //编程式创建@Bean
    public org.springframework.amqp.core.Queue lazeQueue() {return QueueBuilder.durable("lazy.queue1").lazy().build();
    }
    

4.消费者的可靠性

1. 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:

  1. ack:成功处理消息,RabbitMQ 从队列中删除该消息
  2. nack:消息处理失败,RabbitMQ 需要再次投递消息
  3. reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:

  • none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活

  • auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:

    • 如果是业务异常,会自动返回 nack
    • 如果是消息处理或校验异常,自动返回 reject

开启消息确认机制,需要在 application.yml 文件中编写相关的配置

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none
2.失败重试机制

当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力

我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队

在 application.yml 配置文件中开启失败重试机制

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: true # 开启消息消费失败重试机制initial-interval: 1000ms # 消息消费失败后的初始等待时间multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multipliermax-attempts: 3 # 最大重试次数stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false

在达到最大重试次数后,消息会丢失!!!怎样解决?

3. 失败消息的处理策略

开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

image-20250310203318152

我们来演示一下使用 RepublishMessageRecoverer 类的情况

  • 第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
    public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true, false, false);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}}
  • 第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
  • 在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列

    image-20250310203613628

  • 在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列

    image-20250310203633853

总结:消费者如何保证消息一定被消费?

  • 开启消费者确认机制为 auto ,由 Spring 帮我们确认,消息处理成功后返回 ack,异常时返回 nack
  • 开启消费者失败重试机制,并设置 MessageRecoverer ,多次重试失败后将消息投递到异常交换机,交由人工处理
4. 业务幂等性

在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的

image-20250310203936288

那么有什么方法能够确保业务的幂等性呢

方案一:为每条消息设置一个唯一的 id

给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:

  1. 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
  3. 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理

可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId

@Bean
public MessageConverter jacksonMessageConvertor() {Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

发送消息后,在 RabbitMQ 的控制台可以看到,消息的 properties 属性附带了 messageId 信息

image-20250310204104246

但这种方式对业务有一定的侵入性

方案二:结合业务判断 – 结合实例
兜底的解决方案

我们可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/71886.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows中在VSCode/Cursor上通过CMake或launch文件配置CUDA编程环境

前置步骤 安装符合GPU型号的CUDA Toolkit 配置好 nvcc 环境变量 安装 Visual Studio 参考https://blog.csdn.net/Cony_14/article/details/137510909 VSCode 安装插件 Nsight Visual Studio Code Edition 注意:不是vscode-cudacpp。若两个插件同时安装,…

Spark(8)配置Hadoop集群环境-使用脚本命令实现集群文件同步

一.hadoop的运行模式 二.scp命令————基本使用 三.scp命令———拓展使用 四.rsync远程同步 五.xsync脚本集群之间的同步 一.hadoop的运行模式 hadoop一共有如下三种运行方式: 1. 本地运行。数据存储在linux本地,测试偶尔用一下。我们上一节课使用…

聚焦两会:科技与发展并进,赛逸展2025成创新新舞台

在十四届全国人大三次会议和全国政协十四届三次会议期间,代表委员们围绕多个关键议题展开深入讨论,为国家未来发展谋篇布局。其中,技术竞争加剧与经济转型需求成为两会焦点,将在首都北京举办的2025第七届亚洲消费电子技术贸易展&a…

【音视频】ffmpeg命令提取像素格式

1、提取YUV数据 提取yuv数据,并保持分辨率与原视频一致 使用-pix_fmt或-pixel_format指定yuv格式提取数据,并保持原来的分辨率 ffmpeg -i music.mp4 -t "01:00" -pixel_format yuv420p music.yuv提取成功后,可以使用ffplay指定y…

【从零开始学习计算机科学】计算机体系结构(二)指令级并行(ILP)

【从零开始学习计算机科学】【从零开始学习计算机科学】计算机体系结构(二)指令级并行(ILP) ILP流水线(pipeline)流水线调度循环展开和循环流水循环展开。循环展开的具体步骤可以描述为,软件流水(循环流水)。我们可以通过流水线的思想处理循环的执行,即不需要这一次的…

android edittext 防止输入多个小数点或负号

有些英文系统的输入法,或者定制输入法。使用xml限制不了输入多个小数点和多个负号。所以代码来控制。 一、通过XML设置限制 <EditTextandroid:id="@+id/editTextNumber"android:layout_width="wrap_content"android:layout_height="wrap_conten…

2019年蓝桥杯第十届CC++大学B组真题及代码

目录 1A&#xff1a;组队&#xff08;填空5分_手算&#xff09; 2B&#xff1a;年号字符&#xff08;填空5分_进制&#xff09; 3C&#xff1a;数列求值&#xff08;填空10分_枚举&#xff09; 4D&#xff1a;数的分解&#xff08;填空10分&#xff09; 5E&#xff1a;迷宫…

从C#中的MemberwiseClone()浅拷贝说起

MemberwiseClone() 是 C# 中的一个方法&#xff0c;用于创建当前对象的浅拷贝&#xff08;shallow copy&#xff09;。它属于 System.Object 类&#xff0c;因此所有 C# 对象都可以调用该方法。 1. MemberwiseClone() 的含义 浅拷贝&#xff1a;MemberwiseClone() 会创建一个新…

笔记六:单链表链表介绍与模拟实现

在他一生中&#xff0c;从来没有人能够像你们这样&#xff0c;以他的视角看待这个世界。 ---------《寻找天堂》 目录 文章目录 一、什么是链表&#xff1f; 二、为什么要使用链表&#xff1f; 三、 单链表介绍与使用 3.1 单链表 3.1.1 创建单链表节点 3.1.2 单链表的头插、…

尚硅谷爬虫note15n

1. 多条管道 多条管道开启&#xff08;2步&#xff09;&#xff1a; (1)定义管道类 &#xff08;2&#xff09;在settings中开启管道 在pipelines中&#xff1a; import urllib.request # 多条管道开启 #(1)定义管道类 #&#xff08;2&#xff09;在setti…

oracle检查字段为空

在Oracle数据库中&#xff0c;检查字段是否为空通常涉及到使用IS NULL条件。如果你想查询某个表中的字段是否为空&#xff0c;你可以使用SELECT语句结合WHERE子句来实现。这里有一些基本示例来展示如何进行这样的查询。 示例1: 检查单个字段是否为空 假设你有一个表employees…

虚幻基础:动画层接口

文章目录 动画层&#xff1a;动画图表中的函数接口&#xff1a;名字&#xff0c;没有实现。动画层接口&#xff1a;由动画蓝图实现1.动画层可直接调用实现功能2.动画层接口必须安装3.动画层默认使用本身实现4.动画层也可使用其他动画蓝图实现&#xff0c;但必须在角色蓝图中关联…

HarmonyOS学习第18天:多媒体功能全解析

一、开篇引入 在当今数字化时代&#xff0c;多媒体已经深度融入我们的日常生活。无论是在工作中通过视频会议进行沟通协作&#xff0c;还是在学习时借助在线课程的音频讲解加深理解&#xff0c;亦或是在休闲时光用手机播放音乐放松身心、观看视频打发时间&#xff0c;多媒体功…

绪论数据结构基本概念(刷题笔记)

&#xff08;一&#xff09;单选题 1.与数据元素本身的形式、相对位置和个数无关的是&#xff08;B&#xff09;【广东工业大学2019年829数据结构】 A.数据存储结构 B.数据逻辑结构 C.算法 D.操作 2.在数据结构的讨论中把数据结构从逻辑上分为&#xff08;C&#xff09;【中国…

GPTQ - 生成式预训练 Transformer 的精确训练后压缩

GPTQ - 生成式预训练 Transformer 的精确训练后压缩 flyfish 曾经是 https://github.com/AutoGPTQ/AutoGPTQ 现在是https://github.com/ModelCloud/GPTQModel 对应论文是 《Accurate Post-Training Quantization for Generative Pre-trained Transformers》 生成式预训练Tr…

git的使用方法

文章目录 前言git简介GIT的基本操作克隆仓库 (Clone)获取最新代码 (Pull)提交代码到远程仓库查看当前分支查看提交代码的日志git config 配置用户信息 GIT的实操 前言 git是一种软件版本管理工具&#xff0c;在多人团队软件开发中地方非常重要。 类似与SVN&#xff0c;git工具…

php虚拟站点提示No input file specified时的问题及权限处理方法

访问站点&#xff0c;提示如下 No input file specified. 可能是文件权限有问题&#xff0c;也可能是“.user.ini”文件路径没有配置对&#xff0c;最简单的办法就是直接将它删除掉&#xff0c;还有就是将它设置正确 #配置成自己服务器上正确的路径 open_basedir/mnt/qiy/te…

使用Langflow和AstraDB构建AI助手:从架构设计到与NocoBase的集成

本文由 Leandro Martins 编写&#xff0c;最初发布于 Building an AI Assistant with Langflow and AstraDB: From Architecture to Integration with NocoBase。 引言 本文的目标是演示如何创建一个集成了 NocoBase、LangFlow 和 VectorDB 工具的 AI 助手。作为基础&#xf…

6.聊天室环境安装 - Ubuntu22.04 - elasticsearch(es)的安装和使用

目录 介绍安装安装kibana安装ES客户端使用 介绍 Elasticsearch&#xff0c; 简称 ES&#xff0c;它是个开源分布式搜索引擎&#xff0c;它的特点有&#xff1a;分布式&#xff0c;零配置&#xff0c;自动发现&#xff0c;索引自动分片&#xff0c;索引副本机制&#xff0c;res…

SSL VXN

SSL VPN是采用SSL&#xff08;Security Socket Layer&#xff09;/TLS&#xff08;Transport Layer Security&#xff09;协议来实现远程接入的一种轻量级VPN技术,其基于B/S架构&#xff0c;免于安装客户端&#xff0c;相较与IPSEC有更高的灵活度和管理性&#xff0c;当隧道建立…