RabbitMQ消息

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

1.生产者确认机制

在这里插入图片描述

  • 对应配置:
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug # Debug Info Warn Error Fatal
spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /publisher-confirm-type: correlated #ConfirmCallback 生产者消费确认到交换机publisher-returns: true #ConfirmCallback ReturnCallback 到队列template:mandatory: true

在这里插入图片描述

  • 启动配置类
    每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置

ApplicationContextAware ->bean工厂通知->拿到rabbitTemplate

@Slf4j
@Configuration
//生产者消息确认,确认信心到达队列
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//判断是否是延迟消息if(message.getMessageProperties().getReceivedDelay()>0){return;}//失败时才会回调//处理:记录日志log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message);//可以得到所有的错误信息,有需要的话,可以选择重发信息});}
}
  • 消息发送
  @Test//生产者消息确认,确认信息到达交换机public void testSendMessage2SimpleQueue1() throws InterruptedException {String routingKey = "red";// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(confirm -> {if (confirm.isAck()){//ASClog.debug("消息发送到交换机成功:ID:{}",correlationData.getId());}else {//nASClog.debug("消息发送到交换机失败:ID:{},原因:{}",correlationData.getId(),confirm.getReason());}}, throwable -> {log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),throwable.getMessage());});// 4.发送消息rabbitTemplate.convertAndSend("exchange.direct", routingKey, message,correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);}

2.消息持久化

  • 交换机持久化
    RabbitMQ中交换机默认是非持久化的,mq重启后就丢失
    默认情况下,由SpringAMQP声明的交换机都是持久化的
@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}@RabbitListener
value = @Queue(name = "dl.ttl.queue", durable = "true"),  持久化exchange = @Exchange(name = "dl.ttl.direct",durable = "true"), //死信交换机
  • 队列持久化
    RabbitMQ中队列默认是非持久化的,mq重启后就丢失
    默认情况下,由SpringAMQP声明的队列都是持久化的
@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}
  • 消息持久化
    利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode
    默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定
    在这里插入图片描述

3.1消费者确认机制

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  1. RabbitMQ投递消息给消费者
  2. 消费者获取消息后,返回ACK给RabbitMQ
  3. RabbitMQ删除消息
  4. 消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可

3.2消费失败重试机制

  • 重试接收的交换机及队列配置类
@Configuration
public class ExchangeErrorQueueConfig {private final String ExchangeName ="error.direct";private final String QueueName ="error.queue";private final String RoutingKey ="error";@Bean//定义错误交换机public DirectExchange errorMessageExchange(){return new DirectExchange(ExchangeName);}//定义错误处理队列@Beanpublic Queue errorQueue(){return new Queue(QueueName);}//将交换机和队列绑定@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RoutingKey);}//定义一个RepublishMessageRecoverer,关联队列和交换机@Beanpublic RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,ExchangeName,RoutingKey);}
}

消费者两种模式配置

logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug
spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1#acknowledge-mode: none # 关闭ack 消息处理抛异常时,消息依然被RabbitMQ删除acknowledge-mode: auto # ack 自动返回结果retry:enabled: true # 开启消费者失败重试 在消费者本地重试,不会返回队列initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true  # true无状态;false有状态。如果业务中包含事务,这里改为false

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/330630.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里巴巴对Java编程【注释规约】的规约

转载自 阿里巴巴对Java编程【注释规约】的规约注释规约1. 【强制】类、类属性、类方法的注释必须使用 Javadoc 规范,使用/**内容*/格式,不得使用// xxx 方式。 说明:在 IDE 编辑窗口中, Javadoc 方式会提示相关注释,生…

动态创建标记+css_dom+js动态效果

【7】动态创建标记【7.1】一些传统方法【7.1.1】document.write方法&#xff0c;不推荐使用 &#xff08;1&#xff09;<!DOCTYPE html> <html lang"en"> <head><meta http-equiv"content-type" content"text/html; charsetutf-…

orcle安装及用户初始化

1.orcle资源 orlce安装包点击下载 2.首次安装 参照: https://jingyan.baidu.com/article/f79b7cb32095f79144023eae.html 3.卸载后安装 先卸载清除本地的orcle服务 参照: https://jingyan.baidu.com/article/6b18230943e9d7fb59e1590f.html再重新下载资源解压安装’’ 注意…

JDK8新特性之Lambda表达式

转载自 JDK8新特性之Lambda表达式 什么是Lambda表达式 Java 8的一个大亮点是引入Lambda表达式&#xff0c;使用它设计的代码会更加简洁。当开发者在编写Lambda表达式时&#xff0c;也会随之被编译成一个函数式接口。 Lambda语法 一行执行语句的写法&#xff1a; (paramete…

eclipse发布web项目到tomcat服务器

README: 使用eclipse发布web项目到tomcat有很多坑儿的。下面依依道来。 step1&#xff09;eclipse建立web 项目&#xff1a;step2&#xff09;在tomcat服务器上为该web项目配置的虚拟目录&#xff0c;即把该web项目发布到tomcat&#xff1a; tomcat的server.xml 增加如下语句&…

springboot设置默认端口访问界面

1.项目结构 2.配置方法 <1>配置类默认加载 Configuration public class WebConfigurer implements WebMvcConfigurer {Overridepublic void addViewControllers(ViewControllerRegistry registry) {//默认地址&#xff08;可以是页面或后台请求接口&#xff09;registr…

Java BigDecimal和double区别

转自&#xff1a; https://www.cnblogs.com/mingforyou/p/3344489.htmlBigDecimal类 对于不需要任何准确计算精度的数字可以直接使用float或double&#xff0c;但是如果需要精确计算的结果&#xff0c;则必须使用BigDecimal类&#xff0c;而且使用BigDecimal类也可以进行大数的…

JDK8新特性之接口默认方法与静态方法

转载自 JDK8新特性之接口默认方法与静态方法 接口默认方法与静态方法 有这样一些场景&#xff0c;如果一个接口要添加一个方法&#xff0c;那所有的接口实现类都要去实现&#xff0c;而某些实现类根本就不需要实现这个方法也要写一个空实现&#xff0c;所以接口默认方法就是为…

mybatis generator Unknown system variable 'query_cache_size' 的解决方法

出现这种错误&#xff0c;很显然是数据库驱动程序 与 数据库版本不对应&#xff1b;如 mybatis使用 mysql-5.1.10的驱动程序&#xff0c;而mybatis配置的数据源连接的是 mysql-8.0.11 &#xff0c;修改 pom文件即可&#xff0c;如下&#xff1a; <dependency><groupId…

Java NoSuchElementException: No value present 问题解决

1 问题描述 java.util.NoSuchElementException: No value 2 问题分析 2.1 Java 1、使用stream()流里面的max().get()、min().get()、findFirst().get()方法&#xff0c;由于max()、min()、findFirst()方法会返回Optional对象&#xff0c;如果Optional对象里面没有数据&#xf…

JDK8新特性之Optional

转载自 JDK8新特性之Optional Optional是什么 java.util.Optional Jdk8提供 Optional&#xff0c;一个可以包含null值的容器对象&#xff0c;可以用来代替xx ! null的判断。 Optional常用方法 of public static <T> Optional<T> of(T value) {return new Opti…

使用maven聚合安装多个maven工程到本地仓库报错的解决方法:child module pom.xml does not exist

转自&#xff1a; https://stackoverflow.com/questions/26021141/maven-child-module-does-not-exist 1&#xff09;在maven项目Parent中的 pom.xml 中 使用聚合安装多个 maven工程到本地仓库&#xff0c;pom配置如下&#xff1a; <project xmlns"http://maven.apache…

JDK8新特性之重复注解

转载自 JDK8新特性之重复注解 什么是重复注解 下面是JDK8中的重复注解&#xff08; java.lang.annotation.Repeatable&#xff09;定义的源码。 Documented Retention(RetentionPolicy.RUNTIME) Target(ElementType.ANNOTATION_TYPE) public interface Repeatable {Class<…

JDK8之新特性扩展篇

转载自 JDK8之新特性扩展篇 BASE64 base64编码解码已经被加入到了jdk8中了。 import java.nio.charset.StandardCharsets; import java.util.Base64;public class Base64Test {public static void main(String[] args) {String text "hello javastack";String en…

eclipse maven 项目发布到tomcat 报错 Failed to scan JAR [file:/C:/xxxxx.jar] from WEB-INF/lib

报错信息如下&#xff1a; 警告: Failed to scan JAR [file:/D:/Development/Tomcat/apache-tomcat-7.0.35-64bit/webapps/Monday2/WEB-INF/lib/com.springsource.net.sf.cglib-2.2.0.jar] from WEB-INF/lib java.util.zip.ZipException: error in opening zip fileat java.uti…

SpringCloud配置中心高可用搭建

转载自 SpringCloud配置中心高可用搭建 本文通过config server连接git仓库来实现配置中心&#xff0c;除了git还可以使用svn或者系统本地目录都行。 引入依赖 <dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId…

maven项目 报错 java.lang.ClassNotFoundException: org.springframework.web.filter.HiddenHttpMethodFilter

报错信息如下&#xff1a; 严重: Exception starting filter hiddenHttpMethodFilter java.lang.ClassNotFoundException: org.springframework.web.filter.HiddenHttpMethodFilterat org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1714)at …

SpringCloud配置中心客户端读取配置

转载自 SpringCloud配置中心客户端读取配置 微服务连接配置中心来实现外部配置的读取。 引入依赖 <dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-eureka</artifactId></d…

奇技淫巧:在spring官网上下载历史版本的spring插件,springsource-tool-suite

转自&#xff1a;https://blog.csdn.net/u010203767/article/details/69211072目前spring官网(http://spring.io/tools/sts/all)上可下载的spring插件只有&#xff1a;springsource-tool-suite-3.8.4(sts-3.8.4)。但这只针对指定的eclipse版本适用。 如何为自己的eclipse下载历…

Junit5新功能一览

转载自 Junit5新功能一览Java的JUnit测试框架已经来到了5这个版本&#xff0c;与以前的版本不同的是JUnit5具有来自多个子项目的模块&#xff0c;其中包括&#xff1a; 1、Platform&#xff0c;用于在JVM上启动测试框架&#xff0c;并通过命令行定义TestEngine API。 2、Jupite…