1 RabbitMQ 核心概念
RabbitMQ 是一个开源的消息代理软件,实现了高级消息队列协议 (AMQP 0-9-1),为应用程序提供了异步通信的能力。在深入了解消息发送机制之前,我们需要理解几个核心概念:
- 生产者 (Producer):发送消息的应用程序,负责创建消息并将其发布到 RabbitMQ 交换器。
- 消费者 (Consumer):接收消息的应用程序,从队列中获取消息并进行处理。
- 交换器 (Exchange):消息的入口点,生产者将消息发送到交换器。交换器根据特定规则将消息路由到一个或多个队列。
- 队列 (Queue):存储消息的缓冲区,类似于邮箱名称。消息一直在队列中等待,直到被消费者处理。
- 绑定 (Binding):连接交换器和队列的路由规则,定义了消息应该如何从交换器流转到队列。
- 路由键 (Routing Key):生产者发送消息时指定的一个属性,交换器使用它来确定如何路由消息。
消息流经 RabbitMQ 的基本过程是:
- 生产者创建消息并将其发送到交换器。
- 交换器接收消息并根据其类型和绑定规则决定将消息路由到哪些队列。
- 消息存储在队列中,直到被消费者处理。
- 消费者从队列中获取消息并进行处理。
这种架构提供了应用程序解耦、异步通信和流量缓冲等优势,使分布式系统更加灵活和可靠。
2.RabbitMQ 交换器类型详解
RabbitMQ中的Exchange(交换器)是消息路由的核心组件,负责接收生产者发送的消息,并根据特定的路由规则将消息分发到一个或多个队列中。在RabbitMQ 3.13中,主要有以下几种交换器类型:
2.1. Direct Exchange(直连交换器)
特点:
- 基于精确匹配的路由机制
- 消息的Routing Key必须与Binding Key完全匹配才会被路由到相应队列
工作原理:
- 当队列绑定到Direct Exchange时,需要指定一个Binding Key
- 只有当消息的Routing Key与Binding Key完全一致时,消息才会被路由到该队列
应用场景:
- 适用于一对一的精确消息投递场景
- 比如将路由键为"sms"的消息只投递给绑定键也为"sms"的队列
2.2. Fanout Exchange(扇出交换器)
特点:
- 广播模式,忽略Routing Key
- 将消息路由到所有绑定到该交换器的队列
工作原理:
- 不需要指定Binding Key
- 发送到Fanout Exchange的每条消息都会被转发到所有绑定的队列
应用场景:
- 适用于广播通知场景
- 需要将同一条消息发送给多个消费者的场景
2.3. Topic Exchange(主题交换器)
特点:
- 基于模式匹配的路由机制
- 支持通配符匹配Routing Key和Binding Key
工作原理:
- Routing Key和Binding Key都是包含".“分隔符的字符串(如"quick.orange.rabbit”)
- 支持两种通配符:
- “*”:匹配一个单词
- “#”:匹配零个或多个单词
应用场景:
- 适用于基于主题的发布订阅模式
- 比如绑定键为"*.stock.usd"的队列会收到所有以任意单词开头,以"stock.usd"结尾的路由键的消息
2.4. Headers Exchange(头交换器)
特点:
- 基于消息头部属性进行路由
- 不依赖Routing Key进行匹配
工作原理:
- 根据消息的Headers属性进行匹配
- 队列绑定到Headers Exchange时需要指定匹配的头部属性
应用场景:
- 适用于基于消息属性的复杂路由场景
- 但性能相对较差,实际应用较少
2.5. Default Exchange(默认交换器)
特点:
- 无名的Direct Exchange
- 自动存在于每个RabbitMQ实例中
工作原理:
- 当队列创建时,会自动绑定到默认交换器
- Routing Key就是目标队列的名称
2.6. Dead Letter Exchange(死信交换器)
特点:
- 处理无法被正常投递的消息
- 不是标准的交换器类型,而是一种处理模式
工作原理:
- 当消息被拒绝、过期或队列达到最大长度时,可以配置将其发送到DLX
- 便于对无法处理的消息进行进一步处理或记录
总结
不同类型的交换器适用于不同的业务场景。在选择交换器类型时,需要根据具体的路由需求来决定:
- 需要精确匹配时使用Direct Exchange
- 需要广播消息时使用Fanout Exchange
- 需要基于模式匹配时使用Topic Exchange
- 特殊情况下可以考虑Headers Exchange
3.spring boot 整合rabbitmq
3.1 项目结构
spring-rabbitmq-demo/
├── src/
│ └── main/
│ ├── java/
│ │ └── cn/
│ │ └── spring/
│ │ └── rabbitmq/
│ │ └── demo/
│ │ ├── config/
│ │ │ ├── RabbitMQConfig.java
│ │ │ ├── RabbitmqTemplatePostProcessor.java
│ │ │ └── SwaggerConfig.java
│ │ ├── controller/
│ │ │ └── rabbitmq/
│ │ │ └── RabbitmqDemoController.java
│ │ ├── rabbitmq/
│ │ │ ├── consumer/
│ │ │ │ └── MessageConsumer.java
│ │ │ ├── message/
│ │ │ │ ├── BaseMessage.java
│ │ │ │ └── DemoMessage.java
│ │ │ └── sender/
│ │ │ └── MessageSender.java
│ │ └── RabbitmqDemoApplication.java
│ └── resources/
│ ├── application.yaml
│ └── logback-spring.xml
└── pom.xml
3.2 依赖配置
Maven 依赖
在 pom.xml 文件中添加以下依赖:
<dependencies><!-- Web 相关 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RabbitMQ 相关 --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- Swagger 相关 --><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>2.8.11</version></dependency><!-- Jackson 用于JSON序列化 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- hutool 工具包 --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.15</version></dependency></dependencies>
3.3. 配置文件
server:
port: 8081
spring:
application:
name: spring-rabbitmq-demo
profiles:
active: dev
main:
allow-circular-references: true # 允许循环依赖
--- #############rabbitmq配置#####################
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: xxxx
password: xxxx
virtual-host: /
publisher-returns: true # 启用发布者返回功能,如果消息没有到达队列,则会通知生产者。
# NONE(默认值):不返回任何信息。
# SIMPLE(简单模式)使用同步阻塞方式等待MQ的确认回执,发送消息后会阻塞当前线程,直到收到确认结果,性能相对较低,因为需要等待确认结果
# CORRELATED(关联模式):使用异步非阻塞方式,生产者发送消息后,不等待MQ的确认回执,而是直接返回,并通过回调函数的方式通知生产者。
publisher-confirm-type: NONE
listener:
# SIMPLE:RabbitMQ 消费者将消息分发到一个单独的线程(Invoker Thread)进行处理,消费者线程与业务处理线程是分离的
#特点:
#异步处理:消息消费与业务处理在不同线程中执行
#更好的容错性:业务处理异常不会直接影响 RabbitMQ 消费者线程
#更高的资源消耗:需要额外的线程来进行消息分发和处理
#支持并发消费者配置
#DIRECT:适用于对延迟敏感、吞吐量要求高的场景,或者资源受限的环境
#工作原理:监听器直接在 RabbitMQ 消费者线程上调用执行,没有额外的分发线程
#特点:
#同步处理:消息消费与业务处理在同一个线程中执行
#更低的延迟:没有线程切换开销
#更少的资源消耗:不需要额外的线程池
#更简单的线程模型:更容易调试和分析
#业务处理的异常会直接影响消费者线程
#STREAM(流容器)
#工作原理:使用 RabbitMQ Stream Client 处理消息流
#特点:
#专为高吞吐量和持久化流处理设计
#支持超大规模消息保留
#支持消费者组和消息重播
#适用于需要处理大量历史数据的场景
#需要 RabbitMQ 3.9+ 版本支持
type: simple # 使用simple类型监听器容器
simple:
# 是否在启动时自动启动容器,默认为true 当设置为 true 时,容器会在 Spring 应用上下文启动完成后自动开始监听消息;设置为 false 时,需要手动调用 start() 方法启动容器
auto-startup: true
# 侦听器调用者线程的最小数量,默认为1 控制并发消费者的最小数量,用于处理消息的并行度
concurrency: 1
# 侦听器调用者线程的最大数量,默认为1(与concurrency相同)当消息负载较高时,容器可以动态扩展到的最大消费者数量
max-concurrency: 1
# 每个消费者能够同时存在且未被确认的消息的最大数量。,默认为250
prefetch: 250
# 确认模式,可选值:NONE(不确认)、MANUAL(手动确认)、AUTO(自动确认),默认为AUTO
acknowledge-mode: AUTO
# 默认情况下,拒绝交付是否重新排队,默认为true 当监听器方法抛出异常时,决定消息是否重新放回队列。设置为 false 可以避免消息无限重试
default-requeue-rejected: true
# 应该多久发布一次空闲容器事件,默认不发布(无默认值) 用于监控容器状态,当容器在指定时间内没有消息处理时会发布 ApplicationEvent
idle-event-interval: 0ms
# 是否将批处理消息作为离散消息传递,或者将整个批处理传递给监听器,默认为true 当启用时,批量消息会被分解为单条消息分别处理;禁用时,整个批次作为一个消息传递给监听器
de-batching-enabled: true
# 当容器停止时是否立即停止还是处理完所有预取的消息,默认为false 设置为 true 时,容器会在处理完当前消息后立即停止;false 时,会处理完所有预取消息后再停止
force-stop: false
# 是否启用观察(Observation),默认为false 启用后可以通过 Micrometer 收集容器的指标信息
observation-enabled: false
# 批次大小,以物理消息数量表示,默认为1 与批量处理相关,定义每个批次包含的消息数量
batch-size: 10
# 如果容器声明的队列在代理上不可用,是否失败,默认为true 设置为 true 时,如果队列不可用容器会失败或停止;false 时容器会继续运行
missing-queues-fatal: true
# 是否基于'receive-timeout'和'batch-size'创建消息批次,默认为false 启用后会将 deBatchingEnabled 强制设为 true,将生产者创建的批次内容作为离散记录包含在批次中
consumer-batch-enabled: false
# 适中的接收超时时间
receive-timeout: 5000ms
# 重试相关配置
retry:
# 是否启用重试机制,默认为false
enabled: false
# 最大尝试次数,默认为3
max-attempts: 3
# 初始重试间隔时间,默认为1000ms
initial-interval: 1000ms
# 最大重试间隔时间,默认为10000ms
max-interval: 10000ms
# 重试间隔的乘数,默认为2.0
multiplier: 2.0
# 重试时是有状态还是无状态,默认为true(无状态)
stateless: true
template:
# 是否启用强制消息投递,默认为false
# 当设置为true时,如果消息无法路由到队列,会抛出AmqpMessageReturnedException异常
# 需要配合RabbitTemplate的ReturnsCallback使用
mandatory: false
# receive()操作的超时时间,默认为0ms(无限等待)
# 用于receive()方法调用时的等待超时时间
receive-timeout: 0ms
# sendAndReceive()操作的超时时间,默认为5000ms(5秒)
# 用于请求-回复模式下的等待超时时间
reply-timeout: 5000ms
# 发送操作使用的默认交换机名称,默认为空字符串(使用默认交换机)
# 当使用RabbitTemplate发送消息时不指定交换机时使用此默认值
exchange: ""
# 发送操作使用的默认路由键,默认为空字符串
# 当使用RabbitTemplate发送消息时不指定路由键时使用此默认值
routing-key: ""
# 当没有明确指定接收队列时,默认接收消息的队列名称,默认为null
# 用于RabbitTemplate接收消息时的默认队列
default-receive-queue:
# 是否启用观察(Observation),默认为false
# 启用后可以通过Micrometer收集RabbitTemplate的指标信息
observation-enabled: false
# 用于反序列化时允许的包/类的简单模式列表,默认为null
# 用于控制哪些类可以被反序列化,防止不安全的反序列化
allowed-list-patterns:
# 重试相关配置
retry:
# 是否启用重试机制,默认为false
# 启用后RabbitTemplate在发送消息失败时会进行重试
enabled: false
# 最大尝试次数,默认为3次
max-attempts: 3
# 初始重试间隔时间,默认为1000ms(1秒)
initial-interval: 1000ms
# 最大重试间隔时间,默认为10000ms(10秒)
max-interval: 10000ms
# 重试间隔的乘数,默认为2.0
# 每次重试的间隔时间会乘以这个数值
multiplier: 2.0
3.4 核心组件实现
3.4.1 消息实体类
BaseMessage.java
@Data
public class BaseMessage
implements java.io.Serializable {
@Serial
private static final long serialVersionUID = 1L;
protected Long deliveryTag = IdUtil.getSnowflake().nextId();
}
DemoMessage.java
@Data
public class DemoMessage
extends BaseMessage{
private Long userId;
private String message;
private Date createTime;
}
3.4.2 RabbitMQ 配置类
RabbitMQConfig.java
@Configuration
@Import(RabbitmqTemplatePostProcessor.class)
public class RabbitMQConfig
{
/**
* 延迟交换机(用于接收延迟消息)
*
* @return
*/
@Bean
public Exchange delayExchange() {
return ExchangeBuilder.directExchange("delay.exchange")
.durable(true)
.build();
}
/**
* 延迟队列(没有消费者监听)
* 设置死信交换机和路由键,当消息过期后自动转发到死信交换机
*
* @return
*/
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue")
// 设置死信交换机
.deadLetterExchange("delay.process.exchange")
// 设置死信路由键
.deadLetterRoutingKey("delay.process")
.build();
}
/**
* 延迟交换机与延迟队列绑定
*
* @return
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with("delay")
.noargs();
}
/**
* json消息转换器
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 为批量处理创建专用的监听器容器工厂
*
* @param connectionFactory
* @return
*/
@Bean("batchListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(3);
factory.setPrefetchCount(10);
factory.setBatchListener(true);
// 启用批量监听
factory.setConsumerBatchEnabled(true);
// 启用消费者端批量处理
factory.setBatchSize(10);
// 设置批次大小
factory.setReceiveTimeout(5000L);
// 设置接收超时时间
factory.setBatchReceiveTimeout(5000L);
factory.setDeBatchingEnabled(false);
// 禁用分解批处理消息
factory.setMessageConverter(messageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置为 true 可在当前消息处理完毕后停止容器,并重新排队任何预取的消息。在使用独占或单活动消费者时很有用
factory.setForceStop(true);
// 设置关闭超时时间
factory.setContainerCustomizer((container ->
{
container.setShutdownTimeout(30000L);
// container.setExclusive(true);
}));
return factory;
}
}
RabbitmqTemplatePostProcessor.java
RabbitTemplate 启用 ConfirmCallback、ReturnsCallback 和消息持久化 需要配合配置:
publisher-returns: true # 启用发布者返回功能,如果消息没有到达队列,则会通知生产者。
# NONE(默认值):不返回任何信息。
# SIMPLE(简单模式)使用同步阻塞方式等待MQ的确认回执,发送消息后会阻塞当前线程,直到收到确认结果,性能相对较低,因为需要等待确认结果
# CORRELATED(关联模式):使用异步非阻塞方式,生产者发送消息后,不等待MQ的确认回执,而是直接返回,并通过回调函数的方式通知生产者。
publisher-confirm-type: NONE
@Slf4j
public class RabbitmqTemplatePostProcessor
implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
// 启用发送确认机制(ConfirmCallback)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息成功发送到交换机,correlationData: {}", correlationData);
} else {
log.error("消息发送到交换机失败,correlationData: {}, cause: {}", correlationData, cause);
}
}
});
// 启用发送失败回调机制(ReturnCallback)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息无法路由到队列,exchange: {}, routingKey: {}, message: {}, replyCode: {}, replyText: {}",
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
returnedMessage.getMessage(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText());
}
});
// 设置消息持久化
rabbitTemplate.setMandatory(true);
log.info("RabbitTemplate 配置完成:启用 ConfirmCallback、ReturnsCallback 和消息持久化");
}
return bean;
}
}
3.4.3 消息发送者
MessageSender.java
@Component
public class MessageSender
{
@Resource
private RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.fanout.exchange:fanout-exchange}")
private String fanoutExchange;
@Value("${rabbitmq.topic.exchange:topic-exchange}")
private String topicExchange;
@Value("${rabbitmq.delay.exchange:delay.exchange}")
private String delayExchange;
/**
* 发送消息到队列demo.queue
*
* @param message
*/
public void basicSend(DemoMessage message) {
rabbitTemplate.convertAndSend("demo.queue", message);
}
/**
* 发送消息到广播exchange
*
* @param message
*/
public void fanoutSend(DemoMessage message) {
rabbitTemplate.convertAndSend(fanoutExchange, "", message);
}
/**
* 发送消息到topic exchange
*
* @param message 消息内容
* @param routingKey 路由键
*/
public void topicSend(DemoMessage message, String routingKey) {
rabbitTemplate.convertAndSend(topicExchange, routingKey, message);
}
/**
* 发送延迟消息
*
* @param message 消息内容
* @param delay 延迟时间(毫秒)
*/
public void delaySend(DemoMessage message, long delay) {
rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg ->
{
if (delay >
0) {
msg.getMessageProperties().setExpiration(String.valueOf(delay));
}
return msg;
});
}
/**
* 批量发送消息
*
* @param messages 消息列表
*/
public void batchSend(List<
DemoMessage> messages) {
messages.forEach(message ->
{
rabbitTemplate.convertAndSend("batch.exchange", "batch", message, msg ->
{
// 自定义消息属性
return msg;
});
});
}
}
3.4.4 消息消费者
MessageConsumer.java
@Slf4j
@Component
public class MessageConsumer
{
@Resource
private ObjectProvider<
MessageConverter> messageConverterObjectProvider;
/**
* 监听并处理DemoMessage类型的消息
*
* @param message 消息内容
*/
@RabbitListener(queuesToDeclare = {
@Queue("demo.queue")
})
public void handleMessageByAnnotation(DemoMessage message) {
log.info("[handleMessageByAnnotation] 收到消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听广播消息1
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout.queue.1"),
exchange = @Exchange(value = "fanout-exchange", type = "fanout")
))
public void handleFanoutMessage1(DemoMessage message) {
log.info("[handleFanoutMessage1] 收到广播消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听广播消息2
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout.queue.2"),
exchange = @Exchange(value = "fanout-exchange", type = "fanout")
))
public void handleFanoutMessage2(DemoMessage message) {
log.info("[handleFanoutMessage2] 收到广播消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听topic消息1
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue.1"),
exchange = @Exchange(value = "topic-exchange", type = "topic"),
key = "topic.message.specific"
))
public void handleTopicMessage1(DemoMessage message) {
log.info("[handleTopicMessage1] 收到Topic消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听topic消息2
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue.2"),
exchange = @Exchange(value = "topic-exchange", type = "topic"),
key = "topic.message.*"
))
public void handleTopicMessage2(DemoMessage message) {
log.info("[handleTopicMessage2] 收到Topic消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听延迟消息处理队列
* 当延迟队列中的消息过期后,会被自动转发到此队列进行处理
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("delay.process.queue"),
exchange = @Exchange(value = "delay.process.exchange", type = "direct"),
key = "delay.process"
))
public void handleDelayMessage(DemoMessage message) {
log.info("[handleDelayMessage] 收到延迟消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 批量处理消息
*
* @param messages
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("batch.queue"),
exchange = @Exchange(value = "batch.exchange", type = "direct"),
key = "batch"
), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL")
public void handleBatchMessage(List<
Message> messages, Channel channel) {
log.info("[handleBatchMessage] 开始处理批量消息,共 {} 条", messages.size());
if (CollUtil.isEmpty(messages)) {
log.info("[handleBatchMessage] 消息列表为空,无需处理");
return;
}
// 分别存储成功和失败的消息
List<
Message> successMessages = new ArrayList<
>();
List<
Message> failedMessages = new ArrayList<
>();
// 批量转换消息
for (Message message : messages) {
try {
DemoMessage demoMessage = (DemoMessage) messageConverterObjectProvider.getObject().fromMessage(message);
demoMessage.setDeliveryTag(message.getMessageProperties().getDeliveryTag());
successMessages.add(message);
log.debug("[handleBatchMessage] 消息转换成功: deliveryTag={}", message.getMessageProperties().getDeliveryTag());
} catch (Exception e) {
log.error("[handleBatchMessage] 消息转换失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
failedMessages.add(message);
}
}
// 处理成功转换的消息
if (CollUtil.isNotEmpty(successMessages)) {
try {
log.info("[handleBatchMessage] 开始处理 {} 条成功转换的消息", successMessages.size());
// 模拟处理时间 - 实际应用中这里应该是真正的业务逻辑
processMessages(successMessages);
// 批量确认所有成功处理的消息
for (Message message : successMessages) {
try {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
log.debug("[handleBatchMessage] 消息确认成功: deliveryTag={}", deliveryTag);
} catch (IOException e) {
log.error("[handleBatchMessage] 消息确认失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
}
}
log.info("[handleBatchMessage] 成功处理并确认 {} 条消息", successMessages.size());
} catch (Exception e) {
log.error("[handleBatchMessage] 处理消息时发生异常", e);
// 如果处理过程中出现异常,将所有成功转换的消息标记为失败
failedMessages.addAll(successMessages);
}
}
// 处理转换失败的消息
if (!failedMessages.isEmpty()) {
log.warn("[handleBatchMessage] 共 {} 条消息处理失败,尝试重新入队", failedMessages.size());
for (Message message : failedMessages) {
try {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 第三个参数设为true,表示将消息重新放回队列
channel.basicNack(deliveryTag, false, true);
log.debug("[handleBatchMessage] 消息重新入队: deliveryTag={}", deliveryTag);
} catch (IOException e) {
log.error("[handleBatchMessage] 消息重新入队失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
}
}
}
log.info("[handleBatchMessage] 批量消息处理完成: 成功={}条, 失败={}条",
successMessages.size(), failedMessages.size());
}
/**
* 实际处理消息的方法
* 在实际应用中,这里应该包含真正的业务逻辑
*
* @param messages 待处理的消息列表
* @throws Exception 处理异常
*/
private void processMessages(List<
Message> messages) throws Exception {
// 模拟处理时间
Thread.sleep(50L);
// 实际应用中,这里应该是真正的业务逻辑处理
log.info("[processMessages] 处理了 {} 条消息", messages.size());
}
3.5 关键技术点
3.5.1 消息确认机制
在批量处理消息时,使用手动确认模式(MANUAL)来确保消息处理的可靠性:
@RabbitListener(..., ackMode = "MANUAL")
public void handleBatchMessage(List<
Message> messages, Channel channel) {
// 处理消息
// 手动确认消息
channel.basicAck(deliveryTag, false);
}
3.5.2 批量消息处理
@Bean("batchListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setBatchListener(true);
// 启用批量监听
factory.setConsumerBatchEnabled(true);
// 启用消费者端批量处理
factory.setBatchSize(10);
// 设置批次大小
return factory;
}
/**
* 批量处理消息
*
* @param messages
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("batch.queue"),
exchange = @Exchange(value = "batch.exchange", type = "direct"),
key = "batch"
), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL")
public void handleBatchMessage(List<
Message> messages, Channel channel) {
}
3.5.3 延迟消息
public void delaySend(DemoMessage message, long delay) {
rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg ->
{
if (delay >
0) {
msg.getMessageProperties().setExpiration(String.valueOf(delay));
}
return msg;
});
}
/**
* 监听延迟消息处理队列
* 当延迟队列中的消息过期后,会被自动转发到此队列进行处理
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("delay.process.queue"),
exchange = @Exchange(value = "delay.process.exchange", type = "direct"),
key = "delay.process"
))
public void handleDelayMessage(DemoMessage message) {
...
}
4.最佳实践
- 使用 JSON 序列化:通过 Jackson2JsonMessageConverter 实现消息的 JSON 序列化,提高可读性和兼容性。
- 手动确认消息:在关键业务场景中使用手动确认模式,确保消息被正确处理。
- 异常处理:合理处理消息转换和业务处理过程中的异常,避免消息丢失。
- 批量处理:对于高吞吐量场景,使用批量处理提高处理效率。
- 配置优化:根据业务需求合理配置消费者数量、预取数量等参数。
5.总结
本文档介绍了 Spring Boot 与 RabbitMQ 的整合方案,包括基础配置、消息发送、消息消费、批量处理、延迟消息等核心功能。通过合理使用这些功能,可以构建高可用、高性能的异步消息处理系统。在实际应用中,需要根据具体业务场景进行相应的调整和优化。
源码地址:https://gitee.com/zheji/spring-rabbitmq-demo