RabbitMQ ④-持久化 || 死信队列 || 延迟队列 || 事务

在这里插入图片描述

消息确认机制

简单介绍

RabbitMQ Broker 发送消息给消费者后,消费者处理该消息时可能会发生异常,导致消费失败。

如果 Broker 在发送消息后就直接删了,就会导致消息的丢失。

为了保证消息可靠到达消费者并且成功处理了该消息,RabbitMQ 提供了消息确认机制。

消费者在订阅队列时,可以指定 autoAck 参数,该参数指定是否自动确认消息。

  • autoAck=true:消费者接收到消息后,自动确认消息,RabbitMQ Broker 立即删除该消息。
  • autoAck=false:消费者接收到消息后,不自动确认消息,需要消费者调用 channel.basicAck() 方法确认消息。如果消费者处理消息时发生异常,则可以调用 channel.basicNack() 方法,表示不确认该消息的接收。

Spring AMQP 提供了三种模式的消息确认

  • AcknowledgeMode.NONE:消息一经发送,就不管它了,不管消费者是否处理成功,都直接确认消息。
  • AcknowledgeMode.AUTO(默认):自动确认,消息接收后,消费者处理成功时自动确认该消息,如果处理时发送异常,则不会确认消息。
  • AcknowledgeMode.MANUAL:手动确认,消息接收后,消费者处理成功时,需要调用 channel.basicAck() 方法确认消息,如果处理时发送异常,则需要调用 channel.basicNack() 方法,表示不确认该消息的接收。

代码示例

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: manual
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("ackExchange")public DirectExchange ackExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}
//    @Bean("binding")
//    public Binding binding(Exchange exchange, Queue queue) {
//        return BindingBuilder.bind(queue)
//                .to(exchange)
//                .with("ack")
//                .noargs();
//    }@Bean("binding1")public Binding binding1(@Qualifier("ackExchange") DirectExchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消费者消息确认喵~");return "发送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模拟处理业务逻辑");int a = 3 / 0;System.out.println("模拟处理业务完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

持久性机制

简单介绍

前面讲了消费端处理消息时,消息如何不丢失,但是如何保证 RabbitMQ 服务停掉以后,生产者发送的消息不丢失呢。默认情况下, RabbitMQ 退出或者由于某种原因崩溃时,会忽视队列和消息。

为了保证消息持久化,RabbitMQ 提供了持久化机制,分别是:交换机持久化、队列持久化和消息持久化。

  • 交换机持久化:使用 ExchangeBuilder.durable(true) 方法创建的交换机,RabbitMQ 会将交换机信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。
  • 队列持久化:使用 QueueBuilder.durable(true) 方法创建的队列,RabbitMQ 会将队列信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。
  • 消息持久化:消息持久化可以保证消息不丢失,即使 RabbitMQ 重启或者崩溃,消息也不会丢失。

将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能,这是因为写入磁盘的速度相比于写入内存的速度还是很慢的,对于可靠性不是那么高的消息,可以不采用持久化处理以提高整体的吞吐量。

在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

尽管设置了持久化,也不能保证就一定可以持久化。这是因为在将这些持久化信息写入磁盘时也是需要时间的,如果 RabbitMQ 在这段时间内崩溃,那么这些信息也会丢失。

代码示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("persistQueue")public Queue persistQueue() {return QueueBuilder.nonDurable(Constants.PERSIST_QUEUE).build();}@Bean("persistExchange")public DirectExchange persistExchange() {return ExchangeBuilder.directExchange(Constants.PERSIST_EXCHANGE).durable(false).build();}@Bean("binding2")public Binding binding2(@Qualifier("persistExchange") Exchange exchange, @Qualifier("persistQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("persist").noargs();}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/persist")public String persist() {Message message = new Message("消费者消息确认喵~".getBytes(StandardCharsets.UTF_8), new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PERSIST_EXCHANGE, "persist", message);return "发送成功";}
}

发送方确认机制

简单介绍

在发送方将消息发送至 RabbitMQ Broker 时,也有可能出现消息丢失的情况。

为了保证消息可靠到达 Broker,RabbitMQ 提供了发送方确认机制。

发送方确认机制是指,在消息发送到 Broker 后,发送方会等待 Broker 回应,如果发送方收到消息,则发送方认为消息发送成功,如果发送方未收到消息,则发送方认为消息发送失败,可以重新发送。

RabbitMQ 提供了两种方式保证发送方发送的消息的可靠传输

  • confirm 确认模式:发送方在发送消息后,对发送方设置一个 ConfirmCallback 的监听,无论消息是否抵达 Exchange,这个监听都会被执行,如果消息抵达了 Exchange,则 ACKtrue,如果消息没有抵达 Exchange,则 ACKfalse
  • returns 退回模式:尽管确认消息发送至 Exchange 后,也依然不能完全保证消息的可靠传输。在 ExchangeQueue 会有一个 Routing Key(Binding Key) 的绑定关系,如果消息没有匹配到任何一个 Queue,则通过 returns 模式则会退回到发送方。

代码示例

confirm 确认模式

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息发送确认机制publisher-confirm-type: correlated
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("binding3")public Binding binding3(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("confirm");}
}
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 设置确认消息机制rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack) {System.out.printf("接收到消息,消息ID:%s\n",correlationData == null ? null : correlationData.getId());} else {System.out.printf("未接收到消息,消息ID:%s;原因:%s\n",correlationData == null ? null : correlationData.getId(), cause);}}});return rabbitTemplate;}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {// ! 直接使用 setConfirmCallback 会影响其他接口的调用// ! 且只能设置一个确认回调,多次发起请求会报错
//        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//                System.out.println("执行了confirm方法");
//                if (ack) {
//                    System.out.printf("接收到消息,消息ID:%s\n",
//                            correlationData == null ? null : correlationData.getId());
//                } else {
//                    System.out.printf("未接收到消息,消息ID:%s\n;原因:%s",
//                            correlationData == null ? null : correlationData.getId(), cause);
//                }
//            }
//        });CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + "1", "confirm", "confirm test...", correlationData);return "消息发送成功";}
}

returns 退回模式

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息发送退回机制publisher-returns: true
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 设置消息退回机制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回:" + returned);}});return rabbitTemplate;}
}

总结:如何确保消息的可靠性传输

  • 发送方 => 服务端:通过发送方确认机制confirm 确认模式returns 退回模式,确保消息可靠到达。
  • 服务端:通过持久化机制,保证消息不丢失。
  • 服务端 => 接收方:通过消息确认机制,确保消息被消费者正确消费。

重试机制

简单介绍

消息在处理失败后,重新发送,重新处理,这便是消息重试机制。

RabbitMQ 提供了消息重试机制,可以设置消息最大重试次数,超过最大重试次数还未成功消费,则消息会被丢弃。

代码示例

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:
#         消息接收确认机制# acknowledge-mode: manual # 手动确认时,重发机制无效acknowledge-mode: autoretry:enabled: true # 开启重试机制initial-interval: 5000ms # 重发时间间隔max-attempts: 5 # 最大重试次数
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange() {return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();}@Bean("binding4")public Binding binding4(@Qualifier("retryExchange") DirectExchange exchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("retry");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/retry")public String retry() {rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息发送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void process(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.RETRY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3 / 0;System.out.println("业务处理完成");}
}

TTL 机制

简单介绍

TTL(Time To Live)机制,可以设置消息的存活时间,超过存活时间还未消费,则消息会被丢弃。

RabbitMQ 提供了 TTL 机制,可以设置队列和消息的 TTL 值,超过 TTL 值还未消费,则消息会被丢弃。

两者区别:

  • 设置队列 TTL 值,一旦消息过期,就会从队列中删除。设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期扫描对头的消息是否过期即可。
  • 设置消息 TTL 值,即使消息过期,也不会马上删除,只有在发送至消费者时才会检测其是否已经过期,如果过期才会删除。设置消息过期时间,每个消息的过期时间都可能不尽相同,所以需要扫描整个队列的消息才可确定是否过期,为了确保性能,所以采取类似于 懒汉模式 的方式。

将队列 TTL 设置为 30s,第一个消息的 TTL 设置为 30s,第二个消息的 TTL 设置为 10s。

理论上说,在 10s 后,第二个消息应该被丢弃。但由于设置了队列 TTL 值的机制,只会扫描队头的消息是否过期,所以在第一个消息过期之前,第二个消息不会被删除。

代码示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20_000) // ? 设置队列的 TTL 值.build();}@Bean("ttlExchange")public DirectExchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("binding5")public Binding binding5(@Qualifier("ttlExchange") DirectExchange exchange, @Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// ? 设置消息的 TTL 值message.getMessageProperties().setExpiration("10000");return message;}};rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...",messagePostProcessor);return "消息发送成功";}
}

死信队列

简单介绍

死信(Dead Letter),就是因为某种原因,导致消费者消费失败的消息,称之为死信。

死信队列,当消息在一个队列中变成死信时,它就被重新被发送到另一个交换机,该交换机就是死信交换机(Dead Letter Exchange)。

该死信交换机绑定死信队列,当消息被重新发送到死信交换机时,它就被重新投递到死信队列。

消息变成死信会有如下几种原因:

  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue 参数设置为 false。
  • 消息过期。
  • 队列达到最大长度。

代码示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DlConfig {@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) // ? 配置该队列的死信交换机.deadLetterRoutingKey("dl") // ? 死信交换机绑定死信队列的 Routing Key.ttl(10_000).maxLength(10L) // ? 设置队列最大长度.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") DirectExchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DlListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void processNormal(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.NORMAL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void processDl(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "normal test...");return "消息发送成功:" + new Date();}
}

延迟队列

简单介绍

延迟队列(Delay Queue),即消息被发送以后,并不想让消费者立刻消费该消息,而是等待一段时间后再消费。

延迟队列的使用场景有很多,比如:

  • 智能家居:智能设备产生的事件,如开关、温度变化等,可以先存放在延迟队列中,等待一段时间后再消费。
  • 日常管理:预定会议,需要在会议开始前 15 分钟通知参会人员。
  • 订单处理:订单创建后,需要 30 分钟后才会发货。

RabbitMQ 本身没有提供延迟队列的功能,但是基于消息过期后会变成死信的特性,可以通过设置 TTL 和死信队列来实现延迟队列的功能。

代码示例

@RequestMapping("/delay")
public String delay() {//发送带ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s过期return messagePostProcessor;});return "发送成功!";
}

由于 RabbitMQ 检查消息是否过期的机制,如果 20s 的消息先到队列,那么 10s 的消息只会在 20s 后才会被检查到过期。

延迟队列插件

RabbitMQ 官方提供了延迟队列插件,可以实现延迟队列的功能。

延迟队列插件

安装队列插件

延迟队列插件下载地址

下载插件后,需要将插件放到 RabbitMQ 的插件目录(/usr/lib/rabbitmq/plugins)下,然后重启 RabbitMQ 服务。

代码示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).durable(true).delayed() // ? 设置队列为延迟队列.build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void processDelay(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DELAY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(20_000L); // ? 设置消息的延迟发送时间return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(10_000L); // ? 设置消息的延迟发送时间return message;});return "消息发送成功:" + new Date();}
}

事务机制

简单介绍

RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制事务。

Spring AMQP 也提供了对事务相关的操作。RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。

代码示例

配置事务管理器:

@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;
}@Bean
public RabbitTransactionManager rabbitTransactionManager (ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}
@Bean("transQueue")
public Queue transQueue() {return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
@Transactional
@RequestMapping("/trans")
public String trans() {String msg = "trans test...";System.out.println("发送第一条消息:" + msg + 1);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);int a = 3 / 0;System.out.println("发送第二条消息:" + msg + 2);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);return "消息发送成功:";
}

消息分发

简单介绍

一个队列可以给多个消费者消费,默认情况下,RabbitMQ 是以轮询的方式将消息分发给这些消费者,不管该消费是否已经消费并且确认。

这种情况是不太合理的,如果每个消费者消费的能力都不同,有的消费者消费快,有的慢,这会极大降低整体系统的吞吐量和处理速度。

我们可以使用 channel.basicQos(int prefetchCount) 来限制当前信
道上的消费者所能保持的最大未确认消息的数量。

当该消费者达到最大的 prefetchCount 限制时,RabbitMQ 会停止向该消费者分发消息,直到该消费者的未确认消息数量小于 prefetchCount 时。

代码示例

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5 # 队列最大接收五条消息
@Bean("qosQueue")
public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public DirectExchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") DirectExchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("qos");
}
@RequestMapping("/qos")
public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test...");}return "消息发送成功:" + new Date();
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模拟处理业务逻辑");System.out.println("模拟处理业务完成");//            channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}
}

应用场景

  • 限流:可以根据消费者的处理能力,设置 prefetchCount 限制每个消费者所能接收的消息数量,从而达到限流的目的。
  • 负载均衡:通过将 prefetchCount 设置为 1,通过设 prefetch = 1 的方式,告诉 RabbitMQ 一次只给一个消费者一条消息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/80683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python打卡训练营打卡记录day31

知识点回顾 规范的文件命名规范的文件夹管理机器学习项目的拆分编码格式和类型注解 作业&#xff1a;尝试针对之前的心脏病项目ipynb&#xff0c;将他按照今天的示例项目整理成规范的形式&#xff0c;思考下哪些部分可以未来复用。 心脏病项目目录 目录结构:heart/ ├── conf…

mac .zshrc:1: command not found: 0 解决方案

nano ~/.zshrc 使用自带的nano命令打开文件&#xff0c;修改后 Ctrl X 然后输入y 然后回车即可保存成功 一般情况下&#xff0c;不是常用这个命令&#xff0c;除非是遇到有问题的文件&#xff0c;才用&#xff0c; 例如 遇到下面的问题 /Users/xxli/.zshrc:1: command no…

uniapp生成的app,关于跟其他设备通信的支持和限制

以下内容通过AI生成&#xff0c;这里做一下记录。 蓝牙 移动应用&#xff08;App&#xff09;通过蓝牙与其他设备通信&#xff0c;是通过分层协作实现的。 一、通信架构分层 应用层&#xff08;App&#xff09; 调用操作系统提供的蓝牙API&#xff08;如Android的BluetoothA…

第50天-使用Python+Qt+DeepSeek开发AI运势测算

1. 环境准备 bash 复制 下载 pip install pyside6 requests python-dotenv 2. 获取DeepSeek API密钥 访问DeepSeek官网注册账号 进入控制台创建API密钥 在项目根目录创建.env文件: env 复制 下载 DEEPSEEK_API_KEY=your_api_key_here 3. 创建主应用框架 python 复制…

上位机与Hid设备通信

前置知识 什么是HID&#xff1f; HID&#xff08;Human Interface Device&#xff09;是‌直接与人交互的电子设备‌&#xff0c;通过标准化协议实现用户与计算机或其他设备的通信&#xff0c;典型代表包括键盘、鼠标、游戏手柄等。‌ 为什么HID要与qt进行通信&#xff1f; …

JVM 工具实战指南(jmap / jstack / Arthas / MAT)

&#x1f50d; 从诊断到定位&#xff1a;掌握生产级 JVM 排查工具链 &#x1f4d6; 前言&#xff1a;系统故障时&#xff0c;如何快速定位&#xff1f; 无论 JVM 理论多么扎实&#xff0c;当线上服务出现 CPU 飙高、响应超时、内存泄漏或频繁 Full GC 时&#xff0c;仅靠猜测…

mac上安装 Rust 开发环境

1.你可以按照提示在终端中执行以下命令&#xff08;安全、官方支持&#xff09;&#xff1a; curl --proto https --tlsv1.2 -sSf https://sh.rustup.rs | sh然后按提示继续安装即可。 注意&#xff1a;安装过程中建议选择默认配置&#xff08;按 1 即可&#xff09;。 如果遇…

C++(5)switch语句 循环while

这是一个电影评分的程序 default 就是 如果上述的都没有执行 就统一的执行default的内容。 然后记得break ___________________________________ 循环 &#xff08;while&#xff09; while的使用方式 输出 0-9的while循环

[Linux] Linux线程信号的原理与应用

Linux线程信号的原理与应用 文章目录 Linux线程信号的原理与应用**关键词****第一章 理论综述****第二章 研究方法**1. **实验设计**1.1 构建多线程测试环境1.2 信号掩码策略对比实验 2. **数据来源**2.1 内核源码分析2.2 用户态API调用日志与性能监控 **第三章 Linux信号的用法…

25.5.20学习总结

做题思路 数列分段 Section IIhttps://www.luogu.com.cn/problem/P1182正如题目所说&#xff0c;我们需要得到一个最小的最大段的值&#xff0c;可能有人将注意力放在分段上&#xff0c;事实上&#xff0c;我们更多的应该关注结果。这是一道二分答案的题&#xff0c;你可以先确…

Python爬虫-爬取百度指数之人群兴趣分布数据,进行数据分析

前言 本文是该专栏的第56篇,后面会持续分享python爬虫干货知识,记得关注。 在本专栏之前的文章《Python爬虫-爬取百度指数之需求图谱近一年数据》中,笔者有详细介绍过爬取需求图谱的数据教程。 而本文,笔者将再以百度指数为例子,基于Python爬虫获取指定关键词的人群“兴…

【工具使用】STM32CubeMX-USB配置-实现U盘功能

一、概述 无论是新手还是大佬&#xff0c;基于STM32单片机的开发&#xff0c;使用STM32CubeMX都是可以极大提升开发效率的&#xff0c;并且其界面化的开发&#xff0c;也大大降低了新手对STM32单片机的开发门槛。     本文主要讲述STM32芯片USB功能的配置及其相关知识。 二…

从ISO17025合规到信创适配 解密质检lims系统实验室的 AI 质检全链路实践

在北京某国家级质检中心的 CMA 复评审现场&#xff0c;审核专家通过系统后台调取近半年的检测记录&#xff0c;从样品登记时的电子签名到报告签发的 CA 签章&#xff0c;178 项合规指标全部自动校验通过 —— 这是白码质检 LIMS 系统创造的合规奇迹。 一、智能合规引擎&#xf…

【操作系统】进程同步问题——生产者-消费者问题

问题描述 生产者进程负责生产产品&#xff0c;并将产品存入缓冲池&#xff0c;消费者进程则从缓冲池中取出产品进行消费。为实现生产者和消费者的并发执行&#xff0c;系统在两者之间设置了一个包含n个缓冲区的缓冲池。生产者将产品放入缓冲区&#xff0c;消费者则从缓冲区中取…

SpringBoot-6-在IDEA中配置SpringBoot的Web开发测试环境

文章目录 1 环境配置1.1 JDK1.2 Maven安装配置1.2.1 安装1.2.2 配置1.3 Tomcat1.4 IDEA项目配置1.4.1 配置maven1.4.2 配置File Encodings1.4.3 配置Java Compiler1.4.4 配置Tomcat插件2 Web开发环境2.1 项目的POM文件2.2 项目的主启动类2.3 打包为jar或war2.4 访问测试3 附录3…

Vue3 父子组件传值, 跨组件传值,传函数

目录 1.父组件向子组件传值 1.1 步骤 1.2 格式 2. 子组件向父组件传值 1.1 步骤 1.2 格式 3. 跨组件传值 运行 4. 跨组件传函数 ​5. 总结 1. 父传子 2. 子传父 3. 跨组件传值(函数) 1.父组件向子组件传值 1.1 步骤 在父组件中引入子组件 在子组件标签中自定义属…

嵌入式学习笔记 - STM32 U(S)ART 模块HAL 库函数总结

一 串口发送方式&#xff1a; ①轮训方式发送&#xff0c;也就是主动发送&#xff0c;这个容易理解&#xff0c;使用如下函数&#xff1a; HAL_UART_Transmit(UART_HandleTypeDef *huart, const uint8_t *pData, uint16_t Size, uint32_t Timeout); ②中断方式发送&#xff…

AI无法解决的Bug系列(一)跨时区日期过滤问题

跨时区开发中&#xff0c;React Native如何处理新西兰的日期过滤问题 有些Bug&#xff0c;不是你写错代码&#xff0c;而是现实太魔幻。 比如我最近给新西兰客户开发一个React Native应用&#xff0c;功能非常朴素&#xff1a;用户选一个日期范围&#xff0c;系统返回该范围内…

基于天猫 API 的高效商品详情页实时数据接入方法解析

一、引言 在电商大数据分析、竞品监控及智能选品等场景中&#xff0c;实时获取天猫商品详情页数据是关键需求。本文将详细解析通过天猫开放平台 API 高效接入商品详情数据的技术方案&#xff0c;涵盖接口申请、数据获取逻辑及代码实现&#xff0c;帮助开发者快速构建实时数据采…

系分论文《论遗产系统演化》

系统分析师论文范文系列 摘要 2022年6月,某金融机构启动核心业务系统的技术升级项目,旨在对其运行超过十年的遗留系统进行演化改造。该系统承担着账户管理、支付结算等关键业务功能,但其技术架构陈旧、扩展性不足,难以适应数字化转型与业务快速增长的需求。作为系统分析师,…