如何确保RabbitMQ消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
1.生产者确认机制
- 对应配置:
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug # Debug Info Warn Error Fatal
spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /publisher-confirm-type: correlated #ConfirmCallback 生产者消费确认到交换机publisher-returns: true #ConfirmCallback ReturnCallback 到队列template:mandatory: true
- 启动配置类
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置
ApplicationContextAware ->bean工厂通知->拿到rabbitTemplate
@Slf4j
@Configuration
//生产者消息确认,确认信心到达队列
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//判断是否是延迟消息if(message.getMessageProperties().getReceivedDelay()>0){return;}//失败时才会回调//处理:记录日志log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message);//可以得到所有的错误信息,有需要的话,可以选择重发信息});}
}
- 消息发送
@Test//生产者消息确认,确认信息到达交换机public void testSendMessage2SimpleQueue1() throws InterruptedException {String routingKey = "red";// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(confirm -> {if (confirm.isAck()){//ASClog.debug("消息发送到交换机成功:ID:{}",correlationData.getId());}else {//nASClog.debug("消息发送到交换机失败:ID:{},原因:{}",correlationData.getId(),confirm.getReason());}}, throwable -> {log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),throwable.getMessage());});// 4.发送消息rabbitTemplate.convertAndSend("exchange.direct", routingKey, message,correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);}
2.消息持久化
- 交换机持久化
RabbitMQ中交换机默认是非持久化的,mq重启后就丢失
默认情况下,由SpringAMQP声明的交换机都是持久化的
@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}@RabbitListener
value = @Queue(name = "dl.ttl.queue", durable = "true"), 持久化exchange = @Exchange(name = "dl.ttl.direct",durable = "true"), //死信交换机
- 队列持久化
RabbitMQ中队列默认是非持久化的,mq重启后就丢失
默认情况下,由SpringAMQP声明的队列都是持久化的
@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}
- 消息持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定
3.1消费者确认机制
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
- RabbitMQ投递消息给消费者
- 消费者获取消息后,返回ACK给RabbitMQ
- RabbitMQ删除消息
- 消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
- none模式下,消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可
3.2消费失败重试机制
- 重试接收的交换机及队列配置类
@Configuration
public class ExchangeErrorQueueConfig {private final String ExchangeName ="error.direct";private final String QueueName ="error.queue";private final String RoutingKey ="error";@Bean//定义错误交换机public DirectExchange errorMessageExchange(){return new DirectExchange(ExchangeName);}//定义错误处理队列@Beanpublic Queue errorQueue(){return new Queue(QueueName);}//将交换机和队列绑定@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RoutingKey);}//定义一个RepublishMessageRecoverer,关联队列和交换机@Beanpublic RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,ExchangeName,RoutingKey);}
}
消费者两种模式配置
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug
spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1#acknowledge-mode: none # 关闭ack 消息处理抛异常时,消息依然被RabbitMQ删除acknowledge-mode: auto # ack 自动返回结果retry:enabled: true # 开启消费者失败重试 在消费者本地重试,不会返回队列initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false