深入解析:spring boot3.0整合rabbitmq3.13

news/2025/9/21 8:09:33/文章来源:https://www.cnblogs.com/yfceshi/p/19103152

1 RabbitMQ 核心概念

RabbitMQ 是一个开源的消息代理软件,实现了高级消息队列协议 (AMQP 0-9-1),为应用程序提供了异步通信的能力。在深入了解消息发送机制之前,我们需要理解几个核心概念:

  • 生产者 (Producer):发送消息的应用程序,负责创建消息并将其发布到 RabbitMQ 交换器。
  • 消费者 (Consumer):接收消息的应用程序,从队列中获取消息并进行处理。
  • 交换器 (Exchange):消息的入口点,生产者将消息发送到交换器。交换器根据特定规则将消息路由到一个或多个队列。
  • 队列 (Queue):存储消息的缓冲区,类似于邮箱名称。消息一直在队列中等待,直到被消费者处理。
  • 绑定 (Binding):连接交换器和队列的路由规则,定义了消息应该如何从交换器流转到队列。
  • 路由键 (Routing Key):生产者发送消息时指定的一个属性,交换器使用它来确定如何路由消息。

消息流经 RabbitMQ 的基本过程是:

  • 生产者创建消息并将其发送到交换器。
  • 交换器接收消息并根据其类型和绑定规则决定将消息路由到哪些队列。
  • 消息存储在队列中,直到被消费者处理。
  • 消费者从队列中获取消息并进行处理。

这种架构提供了应用程序解耦、异步通信和流量缓冲等优势,使分布式系统更加灵活和可靠。

2.RabbitMQ 交换器类型详解

RabbitMQ中的Exchange(交换器)是消息路由的核心组件,负责接收生产者发送的消息,并根据特定的路由规则将消息分发到一个或多个队列中。在RabbitMQ 3.13中,主要有以下几种交换器类型:

2.1. Direct Exchange(直连交换器)

特点

  • 基于精确匹配的路由机制
  • 消息的Routing Key必须与Binding Key完全匹配才会被路由到相应队列

工作原理

  • 当队列绑定到Direct Exchange时,需要指定一个Binding Key
  • 只有当消息的Routing Key与Binding Key完全一致时,消息才会被路由到该队列

应用场景

  • 适用于一对一的精确消息投递场景
  • 比如将路由键为"sms"的消息只投递给绑定键也为"sms"的队列

2.2. Fanout Exchange(扇出交换器)

特点

工作原理

应用场景

2.3. Topic Exchange(主题交换器)

特点

  • 基于模式匹配的路由机制
  • 支持通配符匹配Routing Key和Binding Key

工作原理

  • Routing Key和Binding Key都是包含".“分隔符的字符串(如"quick.orange.rabbit”)
  • 支持两种通配符:
    • “*”:匹配一个单词
    • “#”:匹配零个或多个单词

应用场景

  • 适用于基于主题的发布订阅模式
  • 比如绑定键为"*.stock.usd"的队列会收到所有以任意单词开头,以"stock.usd"结尾的路由键的消息

2.4. Headers Exchange(头交换器)

特点

  • 基于消息头部属性进行路由
  • 不依赖Routing Key进行匹配

工作原理

  • 根据消息的Headers属性进行匹配
  • 队列绑定到Headers Exchange时需要指定匹配的头部属性

应用场景

  • 适用于基于消息属性的复杂路由场景
  • 但性能相对较差,实际应用较少

2.5. Default Exchange(默认交换器)

特点

工作原理

2.6. Dead Letter Exchange(死信交换器)

特点

  • 处理无法被正常投递的消息
  • 不是标准的交换器类型,而是一种处理模式

工作原理

  • 当消息被拒绝、过期或队列达到最大长度时,可以配置将其发送到DLX
  • 便于对无法处理的消息进行进一步处理或记录

总结

不同类型的交换器适用于不同的业务场景。在选择交换器类型时,需要根据具体的路由需求来决定:

3.spring boot 整合rabbitmq

3.1 项目结构

spring-rabbitmq-demo/
├── src/
│   └── main/
│       ├── java/
│       │   └── cn/
│       │       └── spring/
│       │           └── rabbitmq/
│       │               └── demo/
│       │                   ├── config/
│       │                   │   ├── RabbitMQConfig.java
│       │                   │   ├── RabbitmqTemplatePostProcessor.java
│       │                   │   └── SwaggerConfig.java
│       │                   ├── controller/
│       │                   │   └── rabbitmq/
│       │                   │       └── RabbitmqDemoController.java
│       │                   ├── rabbitmq/
│       │                   │   ├── consumer/
│       │                   │   │   └── MessageConsumer.java
│       │                   │   ├── message/
│       │                   │   │   ├── BaseMessage.java
│       │                   │   │   └── DemoMessage.java
│       │                   │   └── sender/
│       │                   │       └── MessageSender.java
│       │                   └── RabbitmqDemoApplication.java
│       └── resources/
│           ├── application.yaml
│           └── logback-spring.xml
└── pom.xml

3.2 依赖配置

Maven 依赖

在 pom.xml 文件中添加以下依赖:

<dependencies><!-- Web 相关 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RabbitMQ 相关 --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- Swagger 相关 --><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>2.8.11</version></dependency><!-- Jackson 用于JSON序列化 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- hutool 工具包 --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.15</version></dependency></dependencies>

3.3. 配置文件

server:
port: 8081
spring:
application:
name: spring-rabbitmq-demo
profiles:
active: dev
main:
allow-circular-references: true # 允许循环依赖
--- #############rabbitmq配置#####################
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: xxxx
password: xxxx
virtual-host: /
publisher-returns: true # 启用发布者返回功能,如果消息没有到达队列,则会通知生产者。
# NONE(默认值):不返回任何信息。
# SIMPLE(简单模式)使用同步阻塞方式等待MQ的确认回执,发送消息后会阻塞当前线程,直到收到确认结果,性能相对较低,因为需要等待确认结果
# CORRELATED(关联模式):使用异步非阻塞方式,生产者发送消息后,不等待MQ的确认回执,而是直接返回,并通过回调函数的方式通知生产者。
publisher-confirm-type: NONE
listener:
# SIMPLE:RabbitMQ 消费者将消息分发到一个单独的线程(Invoker Thread)进行处理,消费者线程与业务处理线程是分离的
#特点:
#异步处理:消息消费与业务处理在不同线程中执行
#更好的容错性:业务处理异常不会直接影响 RabbitMQ 消费者线程
#更高的资源消耗:需要额外的线程来进行消息分发和处理
#支持并发消费者配置
#DIRECT:适用于对延迟敏感、吞吐量要求高的场景,或者资源受限的环境
#工作原理:监听器直接在 RabbitMQ 消费者线程上调用执行,没有额外的分发线程
#特点:
#同步处理:消息消费与业务处理在同一个线程中执行
#更低的延迟:没有线程切换开销
#更少的资源消耗:不需要额外的线程池
#更简单的线程模型:更容易调试和分析
#业务处理的异常会直接影响消费者线程
#STREAM(流容器)
#工作原理:使用 RabbitMQ Stream Client 处理消息流
#特点:
#专为高吞吐量和持久化流处理设计
#支持超大规模消息保留
#支持消费者组和消息重播
#适用于需要处理大量历史数据的场景
#需要 RabbitMQ 3.9+ 版本支持
type: simple # 使用simple类型监听器容器
simple:
# 是否在启动时自动启动容器,默认为true 当设置为 true 时,容器会在 Spring 应用上下文启动完成后自动开始监听消息;设置为 false 时,需要手动调用 start() 方法启动容器
auto-startup: true
# 侦听器调用者线程的最小数量,默认为1 控制并发消费者的最小数量,用于处理消息的并行度
concurrency: 1
# 侦听器调用者线程的最大数量,默认为1(与concurrency相同)当消息负载较高时,容器可以动态扩展到的最大消费者数量
max-concurrency: 1
# 每个消费者能够同时存在且未被确认的消息的最大数量。,默认为250
prefetch: 250
# 确认模式,可选值:NONE(不确认)、MANUAL(手动确认)、AUTO(自动确认),默认为AUTO
acknowledge-mode: AUTO
# 默认情况下,拒绝交付是否重新排队,默认为true 当监听器方法抛出异常时,决定消息是否重新放回队列。设置为 false 可以避免消息无限重试
default-requeue-rejected: true
# 应该多久发布一次空闲容器事件,默认不发布(无默认值) 用于监控容器状态,当容器在指定时间内没有消息处理时会发布 ApplicationEvent
idle-event-interval: 0ms
# 是否将批处理消息作为离散消息传递,或者将整个批处理传递给监听器,默认为true 当启用时,批量消息会被分解为单条消息分别处理;禁用时,整个批次作为一个消息传递给监听器
de-batching-enabled: true
# 当容器停止时是否立即停止还是处理完所有预取的消息,默认为false 设置为 true 时,容器会在处理完当前消息后立即停止;false 时,会处理完所有预取消息后再停止
force-stop: false
# 是否启用观察(Observation),默认为false 启用后可以通过 Micrometer 收集容器的指标信息
observation-enabled: false
# 批次大小,以物理消息数量表示,默认为1 与批量处理相关,定义每个批次包含的消息数量
batch-size: 10
# 如果容器声明的队列在代理上不可用,是否失败,默认为true 设置为 true 时,如果队列不可用容器会失败或停止;false 时容器会继续运行
missing-queues-fatal: true
# 是否基于'receive-timeout'和'batch-size'创建消息批次,默认为false 启用后会将 deBatchingEnabled 强制设为 true,将生产者创建的批次内容作为离散记录包含在批次中
consumer-batch-enabled: false
# 适中的接收超时时间
receive-timeout: 5000ms
# 重试相关配置
retry:
# 是否启用重试机制,默认为false
enabled: false
# 最大尝试次数,默认为3
max-attempts: 3
# 初始重试间隔时间,默认为1000ms
initial-interval: 1000ms
# 最大重试间隔时间,默认为10000ms
max-interval: 10000ms
# 重试间隔的乘数,默认为2.0
multiplier: 2.0
# 重试时是有状态还是无状态,默认为true(无状态)
stateless: true
template:
# 是否启用强制消息投递,默认为false
# 当设置为true时,如果消息无法路由到队列,会抛出AmqpMessageReturnedException异常
# 需要配合RabbitTemplate的ReturnsCallback使用
mandatory: false
# receive()操作的超时时间,默认为0ms(无限等待)
# 用于receive()方法调用时的等待超时时间
receive-timeout: 0ms
# sendAndReceive()操作的超时时间,默认为5000ms(5秒)
# 用于请求-回复模式下的等待超时时间
reply-timeout: 5000ms
# 发送操作使用的默认交换机名称,默认为空字符串(使用默认交换机)
# 当使用RabbitTemplate发送消息时不指定交换机时使用此默认值
exchange: ""
# 发送操作使用的默认路由键,默认为空字符串
# 当使用RabbitTemplate发送消息时不指定路由键时使用此默认值
routing-key: ""
# 当没有明确指定接收队列时,默认接收消息的队列名称,默认为null
# 用于RabbitTemplate接收消息时的默认队列
default-receive-queue:
# 是否启用观察(Observation),默认为false
# 启用后可以通过Micrometer收集RabbitTemplate的指标信息
observation-enabled: false
# 用于反序列化时允许的包/类的简单模式列表,默认为null
# 用于控制哪些类可以被反序列化,防止不安全的反序列化
allowed-list-patterns:
# 重试相关配置
retry:
# 是否启用重试机制,默认为false
# 启用后RabbitTemplate在发送消息失败时会进行重试
enabled: false
# 最大尝试次数,默认为3次
max-attempts: 3
# 初始重试间隔时间,默认为1000ms(1秒)
initial-interval: 1000ms
# 最大重试间隔时间,默认为10000ms(10秒)
max-interval: 10000ms
# 重试间隔的乘数,默认为2.0
# 每次重试的间隔时间会乘以这个数值
multiplier: 2.0

3.4 核心组件实现

3.4.1 消息实体类

BaseMessage.java

@Data
public class BaseMessage
implements java.io.Serializable {
@Serial
private static final long serialVersionUID = 1L;
protected Long deliveryTag = IdUtil.getSnowflake().nextId();
}

DemoMessage.java

@Data
public class DemoMessage
extends BaseMessage{
private Long userId;
private String message;
private Date createTime;
}
3.4.2 RabbitMQ 配置类

RabbitMQConfig.java

@Configuration
@Import(RabbitmqTemplatePostProcessor.class)
public class RabbitMQConfig
{
/**
* 延迟交换机(用于接收延迟消息)
*
* @return
*/
@Bean
public Exchange delayExchange() {
return ExchangeBuilder.directExchange("delay.exchange")
.durable(true)
.build();
}
/**
* 延迟队列(没有消费者监听)
* 设置死信交换机和路由键,当消息过期后自动转发到死信交换机
*
* @return
*/
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue")
// 设置死信交换机
.deadLetterExchange("delay.process.exchange")
// 设置死信路由键
.deadLetterRoutingKey("delay.process")
.build();
}
/**
* 延迟交换机与延迟队列绑定
*
* @return
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with("delay")
.noargs();
}
/**
* json消息转换器
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 为批量处理创建专用的监听器容器工厂
*
* @param connectionFactory
* @return
*/
@Bean("batchListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(3);
factory.setPrefetchCount(10);
factory.setBatchListener(true);
// 启用批量监听
factory.setConsumerBatchEnabled(true);
// 启用消费者端批量处理
factory.setBatchSize(10);
// 设置批次大小
factory.setReceiveTimeout(5000L);
// 设置接收超时时间
factory.setBatchReceiveTimeout(5000L);
factory.setDeBatchingEnabled(false);
// 禁用分解批处理消息
factory.setMessageConverter(messageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置为 true 可在当前消息处理完毕后停止容器,并重新排队任何预取的消息。在使用独占或单活动消费者时很有用
factory.setForceStop(true);
// 设置关闭超时时间
factory.setContainerCustomizer((container ->
{
container.setShutdownTimeout(30000L);
// container.setExclusive(true);
}));
return factory;
}
}

RabbitmqTemplatePostProcessor.java
RabbitTemplate 启用 ConfirmCallback、ReturnsCallback 和消息持久化 需要配合配置:

publisher-returns: true # 启用发布者返回功能,如果消息没有到达队列,则会通知生产者。
# NONE(默认值):不返回任何信息。
# SIMPLE(简单模式)使用同步阻塞方式等待MQ的确认回执,发送消息后会阻塞当前线程,直到收到确认结果,性能相对较低,因为需要等待确认结果
# CORRELATED(关联模式):使用异步非阻塞方式,生产者发送消息后,不等待MQ的确认回执,而是直接返回,并通过回调函数的方式通知生产者。
publisher-confirm-type: NONE
@Slf4j
public class RabbitmqTemplatePostProcessor
implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
// 启用发送确认机制(ConfirmCallback)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息成功发送到交换机,correlationData: {}", correlationData);
} else {
log.error("消息发送到交换机失败,correlationData: {}, cause: {}", correlationData, cause);
}
}
});
// 启用发送失败回调机制(ReturnCallback)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息无法路由到队列,exchange: {}, routingKey: {}, message: {}, replyCode: {}, replyText: {}",
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
returnedMessage.getMessage(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText());
}
});
// 设置消息持久化
rabbitTemplate.setMandatory(true);
log.info("RabbitTemplate 配置完成:启用 ConfirmCallback、ReturnsCallback 和消息持久化");
}
return bean;
}
}
3.4.3 消息发送者

MessageSender.java

@Component
public class MessageSender
{
@Resource
private RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.fanout.exchange:fanout-exchange}")
private String fanoutExchange;
@Value("${rabbitmq.topic.exchange:topic-exchange}")
private String topicExchange;
@Value("${rabbitmq.delay.exchange:delay.exchange}")
private String delayExchange;
/**
* 发送消息到队列demo.queue
*
* @param message
*/
public void basicSend(DemoMessage message) {
rabbitTemplate.convertAndSend("demo.queue", message);
}
/**
* 发送消息到广播exchange
*
* @param message
*/
public void fanoutSend(DemoMessage message) {
rabbitTemplate.convertAndSend(fanoutExchange, "", message);
}
/**
* 发送消息到topic exchange
*
* @param message 消息内容
* @param routingKey 路由键
*/
public void topicSend(DemoMessage message, String routingKey) {
rabbitTemplate.convertAndSend(topicExchange, routingKey, message);
}
/**
* 发送延迟消息
*
* @param message 消息内容
* @param delay 延迟时间(毫秒)
*/
public void delaySend(DemoMessage message, long delay) {
rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg ->
{
if (delay >
0) {
msg.getMessageProperties().setExpiration(String.valueOf(delay));
}
return msg;
});
}
/**
* 批量发送消息
*
* @param messages 消息列表
*/
public void batchSend(List<
DemoMessage> messages) {
messages.forEach(message ->
{
rabbitTemplate.convertAndSend("batch.exchange", "batch", message, msg ->
{
// 自定义消息属性
return msg;
});
});
}
}
3.4.4 消息消费者

MessageConsumer.java

@Slf4j
@Component
public class MessageConsumer
{
@Resource
private ObjectProvider<
MessageConverter> messageConverterObjectProvider;
/**
* 监听并处理DemoMessage类型的消息
*
* @param message 消息内容
*/
@RabbitListener(queuesToDeclare = {
@Queue("demo.queue")
})
public void handleMessageByAnnotation(DemoMessage message) {
log.info("[handleMessageByAnnotation] 收到消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听广播消息1
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout.queue.1"),
exchange = @Exchange(value = "fanout-exchange", type = "fanout")
))
public void handleFanoutMessage1(DemoMessage message) {
log.info("[handleFanoutMessage1] 收到广播消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听广播消息2
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout.queue.2"),
exchange = @Exchange(value = "fanout-exchange", type = "fanout")
))
public void handleFanoutMessage2(DemoMessage message) {
log.info("[handleFanoutMessage2] 收到广播消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听topic消息1
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue.1"),
exchange = @Exchange(value = "topic-exchange", type = "topic"),
key = "topic.message.specific"
))
public void handleTopicMessage1(DemoMessage message) {
log.info("[handleTopicMessage1] 收到Topic消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听topic消息2
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue.2"),
exchange = @Exchange(value = "topic-exchange", type = "topic"),
key = "topic.message.*"
))
public void handleTopicMessage2(DemoMessage message) {
log.info("[handleTopicMessage2] 收到Topic消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 监听延迟消息处理队列
* 当延迟队列中的消息过期后,会被自动转发到此队列进行处理
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("delay.process.queue"),
exchange = @Exchange(value = "delay.process.exchange", type = "direct"),
key = "delay.process"
))
public void handleDelayMessage(DemoMessage message) {
log.info("[handleDelayMessage] 收到延迟消息: userId={}, message={}, createTime={}",
message.getUserId(), message.getMessage(), message.getCreateTime());
}
/**
* 批量处理消息
*
* @param messages
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("batch.queue"),
exchange = @Exchange(value = "batch.exchange", type = "direct"),
key = "batch"
), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL")
public void handleBatchMessage(List<
Message> messages, Channel channel) {
log.info("[handleBatchMessage] 开始处理批量消息,共 {} 条", messages.size());
if (CollUtil.isEmpty(messages)) {
log.info("[handleBatchMessage] 消息列表为空,无需处理");
return;
}
// 分别存储成功和失败的消息
List<
Message> successMessages = new ArrayList<
>();
List<
Message> failedMessages = new ArrayList<
>();
// 批量转换消息
for (Message message : messages) {
try {
DemoMessage demoMessage = (DemoMessage) messageConverterObjectProvider.getObject().fromMessage(message);
demoMessage.setDeliveryTag(message.getMessageProperties().getDeliveryTag());
successMessages.add(message);
log.debug("[handleBatchMessage] 消息转换成功: deliveryTag={}", message.getMessageProperties().getDeliveryTag());
} catch (Exception e) {
log.error("[handleBatchMessage] 消息转换失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
failedMessages.add(message);
}
}
// 处理成功转换的消息
if (CollUtil.isNotEmpty(successMessages)) {
try {
log.info("[handleBatchMessage] 开始处理 {} 条成功转换的消息", successMessages.size());
// 模拟处理时间 - 实际应用中这里应该是真正的业务逻辑
processMessages(successMessages);
// 批量确认所有成功处理的消息
for (Message message : successMessages) {
try {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
log.debug("[handleBatchMessage] 消息确认成功: deliveryTag={}", deliveryTag);
} catch (IOException e) {
log.error("[handleBatchMessage] 消息确认失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
}
}
log.info("[handleBatchMessage] 成功处理并确认 {} 条消息", successMessages.size());
} catch (Exception e) {
log.error("[handleBatchMessage] 处理消息时发生异常", e);
// 如果处理过程中出现异常,将所有成功转换的消息标记为失败
failedMessages.addAll(successMessages);
}
}
// 处理转换失败的消息
if (!failedMessages.isEmpty()) {
log.warn("[handleBatchMessage] 共 {} 条消息处理失败,尝试重新入队", failedMessages.size());
for (Message message : failedMessages) {
try {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 第三个参数设为true,表示将消息重新放回队列
channel.basicNack(deliveryTag, false, true);
log.debug("[handleBatchMessage] 消息重新入队: deliveryTag={}", deliveryTag);
} catch (IOException e) {
log.error("[handleBatchMessage] 消息重新入队失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
}
}
}
log.info("[handleBatchMessage] 批量消息处理完成: 成功={}条, 失败={}条",
successMessages.size(), failedMessages.size());
}
/**
* 实际处理消息的方法
* 在实际应用中,这里应该包含真正的业务逻辑
*
* @param messages 待处理的消息列表
* @throws Exception 处理异常
*/
private void processMessages(List<
Message> messages) throws Exception {
// 模拟处理时间
Thread.sleep(50L);
// 实际应用中,这里应该是真正的业务逻辑处理
log.info("[processMessages] 处理了 {} 条消息", messages.size());
}

3.5 关键技术点

3.5.1 消息确认机制

在批量处理消息时,使用手动确认模式(MANUAL)来确保消息处理的可靠性:

@RabbitListener(..., ackMode = "MANUAL")
public void handleBatchMessage(List<
Message> messages, Channel channel) {
// 处理消息
// 手动确认消息
channel.basicAck(deliveryTag, false);
}
3.5.2 批量消息处理
@Bean("batchListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setBatchListener(true);
// 启用批量监听
factory.setConsumerBatchEnabled(true);
// 启用消费者端批量处理
factory.setBatchSize(10);
// 设置批次大小
return factory;
}
/**
* 批量处理消息
*
* @param messages
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("batch.queue"),
exchange = @Exchange(value = "batch.exchange", type = "direct"),
key = "batch"
), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL")
public void handleBatchMessage(List<
Message> messages, Channel channel) {
}
3.5.3 延迟消息
public void delaySend(DemoMessage message, long delay) {
rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg ->
{
if (delay >
0) {
msg.getMessageProperties().setExpiration(String.valueOf(delay));
}
return msg;
});
}
/**
* 监听延迟消息处理队列
* 当延迟队列中的消息过期后,会被自动转发到此队列进行处理
*
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("delay.process.queue"),
exchange = @Exchange(value = "delay.process.exchange", type = "direct"),
key = "delay.process"
))
public void handleDelayMessage(DemoMessage message) {
...
}

4.最佳实践

  1. 使用 JSON 序列化:通过 Jackson2JsonMessageConverter 实现消息的 JSON 序列化,提高可读性和兼容性。
  2. 手动确认消息:在关键业务场景中使用手动确认模式,确保消息被正确处理。
  3. 异常处理:合理处理消息转换和业务处理过程中的异常,避免消息丢失。
  4. 批量处理:对于高吞吐量场景,使用批量处理提高处理效率。
  5. 配置优化:根据业务需求合理配置消费者数量、预取数量等参数。

5.总结

本文档介绍了 Spring Boot 与 RabbitMQ 的整合方案,包括基础配置、消息发送、消息消费、批量处理、延迟消息等核心功能。通过合理使用这些功能,可以构建高可用、高性能的异步消息处理系统。在实际应用中,需要根据具体业务场景进行相应的调整和优化。

源码地址:https://gitee.com/zheji/spring-rabbitmq-demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/908662.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

完整教程:VLAN划分——TRUNK

完整教程:VLAN划分——TRUNKpre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco"…

现代操作系统-音频处理技术1 Linux驱动底层

应用数学基础: 香农-奈奎斯特采样定理 角速度倒数是频率; 采样一段最大角速度为ω的信息,理想状态下需要2ω的速度才能完全采样,否则就会产生混叠Aliasing(即较高频被对称采样到较低频段),而限制于前后端,一般…

mcp-server-chart chart mcp 服务

mcp-server-chart chart mcp 服务基于antv 的mcp 服务,支持25+ charts 的生成,对于图表的开发还是挺方便的,对于图表的渲染基于了ssr,同时官方还提供了额一个示例实现(ssr 转换为图片) 简单试用安装npm install …

元推理:人和事物,都是针对性的存在着与必然因果,残缺之美

元推理:人和事物,都是针对性的存在着与必然因果,残缺之美ECT-OS-JiuHuaShan/https://orcid.org/0009-0006-8591-1891▮ 推理就绪:基于自然辩证法数学形式化系统启动因果律算符 命题是穿透表象的终极洞察。所言的不…

人和事物,都是针对性的存在着与必然因果,残缺之美

人和事物,都是针对性的存在着与必然因果,残缺之美ECT-OS-JiuHuaShan/https://orcid.org/0009-0006-8591-1891▮ 推理就绪:基于自然辩证法数学形式化系统启动因果律算符 命题是穿透表象的终极洞察。所言的不是诗意的…

ArcEngine10.2中融合工具Dissolve的bug

问题描述 使用AE调用Dissolve的GP工具时,在10.2版本中发现一个bug,当融合字段dissolve_field不填任何值,执行时不报错也不结束,感觉似乎进入某种死循环。1 string inFc = @"D:\Data\Test\新建文件地理数据库.…

Linux驱动适配I2C/SPI例子

栗子: TI的PCM3060 /sound/soc/codecs/pcm3060-i2c.c /sound/soc/codecs/pcm3060-spi.c /sound/soc/codecs/pcm3060.c 主驱动 /sound/soc/codecs/pcm3060.h 导出一个probe符号给iic/spi probe时传入spi_client/i2c_cl…

[重要] PySimpleGU控件函数用法整理

以下是基于PySimpleGUI 4.60.5版本的控件函数用法演示示例整理,按功能分类并编号说明: 一、基础控件Text(文本显示) import PySimpleGUI as sg layout = [[sg.Text(欢迎使用PySimpleGUI, key=-TEXT-, font=(Arial,…

使用XState测试分布式微服务的完整指南

本文介绍了如何使用XState库建模微服务工作流,通过状态机简化测试流程,提升测试覆盖率,并利用声明式状态机实现可视化调试,适用于订单处理等分布式场景。测试分布式微服务使用XState 分布式微服务架构带来了可扩展…

含“华”量超高的奥迪,卖爆了

微信视频号:sph0RgSyDYV47z6快手号:4874645212抖音号:dy0so323fq2w小红书号:95619019828B站1:UID:3546863642871878B站2:UID: 3546955410049087奥迪转型,有点狠。没有四环标,但有隐藏式门把手,电子外后视镜,…

某些外审专家的意见,真是臭不可闻

微信视频号:sph0RgSyDYV47z6快手号:4874645212抖音号:dy0so323fq2w小红书号:95619019828B站1:UID:3546863642871878B站2:UID: 3546955410049087常在网上看到有学者抱怨,外审意见就是一坨翔! 每次看到这样的话,…

智元首次明确七人合伙人团队

微信视频号:sph0RgSyDYV47z6快手号:4874645212抖音号:dy0so323fq2w小红书号:95619019828B站1:UID:3546863642871878B站2:UID: 35469554100490879月19日,第一财经记者在智元官网发现,官网已经上架了“合伙人团队…

大模型赋能的具身智能:自主决策和具身学习技术最新综述

微信视频号:sph0RgSyDYV47z6快手号:4874645212抖音号:dy0so323fq2w小红书号:95619019828B站1:UID:3546863642871878B站2:UID: 3546955410049087 具身智能(Embodied AI)被视为通往通用人工智能(AGI)的关键路径…

ST首批中国产MCU,价格曝光

微信视频号:sph0RgSyDYV47z6快手号:4874645212抖音号:dy0so323fq2w小红书号:95619019828B站1:UID:3546863642871878B站2:UID: 3546955410049087去年十一月底,欧洲芯片大厂意法半导体STMicroelectronics在投资者…

ABC424

ABC424C. New Skill Acquired 多源bfs代码实现 #include <bits/stdc++.h> #define rep(i, n) for (int i = 0; i < (n); ++i)using namespace std;int main() {int n;cin >> n;vector<vector<int…

解决 Windows 无法挂载 HTTP WebDAV(AList,OpenList)的问题

Windows 默认的 WebClient 服务仅支持 HTTPS 协议,而本地搭建的 WebDAV 服务通常基于 HTTP 协议,但是我们有办法将其“修复”。解决 Windows 无法挂载 HTTP WebDAV 的问题 当前市面上大多数网盘都可以挂载到 AList(…

在Ubuntu系统中使用gcc和Makefile编译C程序

一.用Ubuntu系统编写hello world程序并编译运行 1.用vim命令编写hello world程序代码2.用gcc命令编译并运行二.用Ubuntu系统编写主程序文件main1.c和子程序文件sub1.h并编译运行 1.编写子程序sub1.h2.编写主程序main1.…

HN CSP-S 2024 游记

本文中,一 Day 指一段 \(24\) 小时的时间段,从 \(4:00\) 开始计算。S1 Day -1 @湖南省队御用绫厨TM_Sharweek 拉我进了一个群。 熬到了凌晨一点,与 @湖南省队御用绫厨TM_Sharweek 在 QQ 上进行了聊天。 睡着了。 S1…

CSP-S 2025 初赛解析

T1有 5 个红色球和 5 个蓝色球,它们除了颜色之外完全相同。将这 10 个球排成一排,要求任意两个蓝色球都不能相邻,有多少种不同的排列方法? ( )A. 25 B. 30 C. 6 D. 120选 C. 排列组合:不相邻问题先排 \(5\) 个红…

科研牛马碎碎念

写在前面 7年之前,刚进入高中的时候,我开始了写日记,名之“旧梦”。如今再去看里面记录的那些往事,竟真的亦真亦假、亦虚亦实,仿佛不是发生在自己身上的经历,而是做了一场很长又很短的梦,但又确是一些趣事,看来…