大网站建设个人网站图片
news/
2025/9/29 3:38:29/
文章来源:
大网站建设,个人网站图片,深圳企业网站制作公司查询,网站开发硬件RabbitMQ通过生产者、消费者以及MQ Broker达到了解耦的特点#xff0c;实现了异步通讯等一些优点#xff0c;但是在消息的传递中引入了MQ Broker必然会带来一些其他问题#xff0c;比如如何保证消息在传输过程中可靠性#xff08;即不让数据丢失#xff0c;发送一次消息就… RabbitMQ通过生产者、消费者以及MQ Broker达到了解耦的特点实现了异步通讯等一些优点但是在消息的传递中引入了MQ Broker必然会带来一些其他问题比如如何保证消息在传输过程中可靠性即不让数据丢失发送一次消息就会被消费一次这篇博客将详细从生产者MQ Broker以及消费者的角度讲解如何保证消息的可靠性 1消息丢失的情况
1.1 消息传递流程图如下 Producer - exchange -queue - Consumer其中exchange和queue属于MQ Broker的组件
1.2 消息可能丢失的情况
生产者给交换机exchange的过程中发生数据丢失交换机exchange路由给队列queue的过程中发生数据丢失消息到达MQ的一瞬间MQ发生了宕机的情况造成数据丢失消费者从队列queue中取出消息进行消费的一瞬间消费者宕机了造成数据丢失。
2生产者确认机制 生产者确认机制主要是站在生产者的角度来保证消息的可靠性针对的是生产者给交换机发送消息以及交换机给队列发送消息的过程中数据丢失的情况 2.1 书写配置信息
# 配置日志信息
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debugspring:rabbitmq:host: 123.207.72.43 # rabbitMQ的ip地址port: 5672 # 端口username: adminpassword: 123virtual-host: /publisher-confirm-type: correlatedpublisher-returns: true#消息发送失败时执行returnCallback回调函数template:mandatory: true
publisher-confirm-type表示开启publisher-confirm这个参数有两种类型分别是correlated和simplecorrelated代表异步等待回调类似于js中发送的ajax请求的回调函数MQ返回结果时会执行定义的confirmCallback函数simple代表同步等待confirm结果直到超时publisher-returns表示开启publish-return功能同样是基于callback机制不过是定义returnCallbacktemplate.mandatory定义消息路由失败时的策略。true则调用ReturnCallbackfalse则直接丢弃消息。
2.2 定义return回调机制
我们使用的是SpringBoot来整合的RabbitMQ所以不论是return回调还是confim回调都是用rabbittemplate对象进行定义的。
Slf4j
Configuration
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取获取RabbitTemplateRabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {//记录日志log.error(消息发送队列失败响应码{}失败原因{}交换机{}路由key{}消息{},replyCode,replyText,exchange,routingKey,message.toString());//如果需要的话进行消息的重发});}
}注意
一个RabbitTemplate只能配置一个ReturnCallback所以需要在项目启动的时候进行定义这样rabbitTemplate就是全局唯一的了也可以采用PostConstruct注解中的init方法进行定义ApplicationContextAware是Spring创建完Bean工厂之后的通知方法当Spring创建完Bean工厂之后就可以在Spring容器中拿到RabbitTemplate对象了配置ReturnCallback时可以采用匿名内部类的方法简化代码如果消息发送失败可以根据需要进行消息重发操作。
2.3 定义confirm回调机制
ConfirmCallback可以在发送消息时指定因为每个业务处理confirm成功或失败的逻辑不一定相同可以通过测试方法进行定义。 Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage() throws InterruptedException {//1.准备消息String message hello spring amqp;//2.准备CorrelationData//2.1 消息IDCorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());//2.2 准备准备ConfirmCallbackcorrelationData.getFuture().addCallback(confirm - {if (confirm.isAck()) {log.debug(消息成功投递到交换机消息ID{}, correlationData.getId());} else {log.error(消息投递到交换机上失败消息ID{}, correlationData.getId());//重发消息}}, throwable - {//记录日志log.error(发送消息失败,throwable);//重发消息});//3.发送消息rabbitTemplate.convertAndSend(amq.topic,a.simple.hello,message,correlationData);//加上休眠时间 避免mq连接直接关闭Thread.sleep(1000);}注意
生产者给交换机发送的消息数据很多的为了区分每个消息的归属每个消息都要附属上一个ID信息可以采用UUID的方式生成唯一身份标识在发送消息的时候需要增加一个correlation变量这个变量记录了两个东西1.每个消息的ID 2.定义的cinfirm回调机制加上线程休眠的操作是为了避免消息发送到交换机之后mq的连接直接关闭这样会导致返回ack的错误。
3消息持久化 消息持久化是站在MQ Broker的角度来保证消息的可靠性的将交换机、队列以及消息设置成持久化的从而避免MQ宕机造成消息的丢失 3.1 交换机持久化
Beanpublic DirectExchange simpleDirect(){return new DirectExchange(simple.direct,true,false);}
第二个参数设置成true就是让就交换机是可持久化的第三个参数是是否自动删除一般设为false
3.2 队列持久化
Beanpublic Queue simpleQueue(){return QueueBuilder.durable(simple.queue).build();}
durable的意思就是可持久化的传入队列名称然后进行build操作这样创建的队列就是一个可持久化的队列
3.3 消息持久化
将交换机和队列设置为持久化的之后重启MQ服务器之后消息依然会丢失因为发送的消息不是可持久化的所以也需要将消息设置成可持久化的
4消费者消息确认 消费者消息确认是站在消费者的角度来保证消息可靠性的消息者处理完一条消息之后需要给MQ Broker返回一条ACK表示消息处理完成 4.1 三种确认模式
RabbitMQ支持消费者确认机制即消费者处理消息后可以向MQ发送ack回执MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式
manual手动ack需要在业务代码结束后调用api发送ackauto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nacknone关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除。
4.2 none模式的演示
1.修改消费者工程中的配置文件
spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack
2.监听一个队列在监听的方法中模拟一个异常情况观察消息是否会被删除
RabbitListener(queues simple.queue)public void listenSimpleQueue(String msg) {log.debug(消费者接收到simple.queue的消息【 msg 】);//这里模拟一个异常System.out.println(1 / 0);log.info(消费者处理消息成功);}
3.在rabbitmq控制台模拟发送一条消息观察抛出异常之后消息是否会重发 抛出异常消费者并没有处理消息成功再观察控制台是否将消息删除 队列中已经没有消息了说明消息被删除了 消费者确认机制为none的时候只要消费者拿到消息之后MQ就会把消息删除不关心消费者是否将消息成功处理 4.3 auto模式的演示
1.修改消费者工程中的配置文件
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack 2.监听一个队列在监听的方法中模拟一个异常情况观察消息是否会被删除
RabbitListener(queues simple.queue)public void listenSimpleQueue(String msg) {log.debug(消费者接收到simple.queue的消息【 msg 】);//这里模拟一个异常System.out.println(1 / 0);log.info(消费者处理消息成功);}
3.在rabbitmq控制台模拟发送一条消息观察抛出异常之后消息是否会重发 消费者确认机制为auto的时候消费者拿到消息之后MQ并不会立刻删除队列中的消息只有消费者成功处理完消息之后给队列返回一个ack的时候队列才会删除消息 5 消费者失败重试机制 我们发现当消费者确认机制为auto时如果代码中出现了异常消息会进行重复入队列requeue的操作重复入队的操作对于MQ来说开销会非常大消息处理飙升所以引入了失败重试机制当代码中出现了异常的时候消费者内部会进行重发的操作可以控制重发的时间和次数如果超过设置的重发次数消费者还未成功处理消息默认将消息丢弃 5.1 本地重试
Spring的retry机制在消费者出现异常时利用本地重试而不是无限制的requeue到mq队列可以在消费者工程的yml文件中添加如下配置
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 3 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 4 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false 4次重发之后消息还未成功处理spring抛出了AmqpRejectAndDontRequeueException异常这是失败之后的默认处理方式默认消费者给队列返回了ack此时队列会将消息从队列中删除
5.2 失败策略
失败达到最大重试次数后消息会被丢弃这是由Spring内部机制决定的。在开启重试模式后重试次数耗尽如果消息依然失败则需要有MessageRecovery接口来处理它包含三种不同的实现
RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息默认就是这种方式ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机。
如果消息这个消息比较重要达到最大重试次数之后这个消息不能被丢弃该怎么办此时就可以使用RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列后续由人工集中处理。
Configuration
public class ErrorMessageConfig {//定义失败之后处理的交换机和队列Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}//将交换机和队列进行绑定Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}//定义一个RepublishMessageRecoverer替换spring默认的处理机制Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}
流程图如下 6 如何保证RabbitMQ消息的可靠性
开启生产者确认机制确保生产者的消息能到达队列开启持久化功能确保消息未消费前在队列中不会丢失开启消费者确认机制为auto由spring确认消息处理成功后完成ack开启消费者失败重试机制并设置MessageRecoverer多次重试失败后将消息投递到异常交换机交由人工处理。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/921401.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!