网站代理协议汕头网页怎么制作
web/
2025/10/6 14:04:59/
文章来源:
网站代理协议,汕头网页怎么制作,做网站哪个公司,网站建设需要什么东西SpringBoot教程#xff08;十五#xff09; | SpringBoot集成RabbitMq#xff08;消息丢失、消息重复、消息顺序、消息顺序#xff09; RabbitMQ常见问题解决方案问题一#xff1a;消息丢失的解决方案#xff08;1#xff09;生成者丢失消息丢失的情景解决方案1#xf… SpringBoot教程十五 | SpringBoot集成RabbitMq消息丢失、消息重复、消息顺序、消息顺序 RabbitMQ常见问题解决方案问题一消息丢失的解决方案1生成者丢失消息丢失的情景解决方案1发送方确认机制推荐最常用解决方案2事务不推荐因为性能差 2MQ丢失消息丢失的情景解决方案开启RabbitMQ的持久化开启镜像队列 3消费者丢失消息丢失的情景 1解决方案无需解决丢失的情景 2扩展重试机制解决方案消费者方确认机制推荐最常用 问题二消息重复的解决方案什么时候会重复消费如何解决 问题三保证消息顺序的解决方案单一队列和单一消费者模式RabbitMQ 问题四消息堆积的解决方案消息堆积原因预防措施已出事故的解决措施 RabbitMQ常见问题解决方案
问题一消息丢失的解决方案 首先明确一条消息的传送流程生产者-MQ-消费者 所以这三个节点都可能丢失数据 1生成者丢失消息
丢失的情景 发送消息过程中出现网络问题生产者以为发送成功但RabbitMQ server没有收到 解决方案1发送方确认机制推荐最常用 发送方确认机制最大的好处在于它是异步的等信道返回ark确认的同时继续发送下一条消息不会堵塞其他消息的发送 一修改application.properties配置
# 确认消息已发送到交换机(Exchange)
spring.rabbitmq.publisher-confirmstrue #旧版本
spring.rabbitmq.publisher-confirm-typecorrelated #新版本
# 确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returnstruespringBoot 2.2.0.RELEASE版本之前 是使用 spring.rabbitmq.publisher-confirmstrue 在2.2.0及之后 使用spring.rabbitmq.publisher-confirm-typecorrelated 属性配置代替 二新建配置文件RabbitTemplate 对于 发送确认 写法有多种方式以下的是其中的一种方式 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitTemplateConfig {Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//setMandatory设置表示消息在没有被队列接收时是否应该被退回给生产者true:退回;false:丢弃。//通常与yml配置文件中的publisher-returns配合一起使用若不配置该项setReutrnCallback将不会有消息返回rabbitTemplate.setMandatory(true);//帮助生产者判断 确认消息是否成功发送到RabbitMQ//ack 为true表示已发送成功 false表示发送失败rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {System.out.println(ConfirmCallback: 相关数据correlationData);System.out.println(ConfirmCallback: 确认情况ack);System.out.println(ConfirmCallback: 原因cause);});//当消息无法 放到队列里面时 返回的提醒rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {System.out.println(ReturnCallback: 消息message);System.out.println(ReturnCallback: 回应码replyCode);System.out.println(ReturnCallback: 回应信息replyText);System.out.println(ReturnCallback: 交换机exchange);System.out.println(ReturnCallback: 路由键routingKey);});return rabbitTemplate;}
}解决方案2事务不推荐因为性能差
RabbitMQ提供的事务功能在生产者发送数据之前开启RabbitMQ事务
2MQ丢失消息
丢失的情景 RabbitMQ服务端接收到消息后由于服务器宕机或重启等原因消息默认存在内存中导致消息丢失 解决方案开启RabbitMQ的持久化开启镜像队列 RabbitMQ的持久化分为三个部分交换器的持久化、队列的持久化、消息的持久化 三者 都 持久化 才能保证 RabbitMQ服务重启之后消息才能存在且能发出去 交换机持久化 交换机持久化描述的是当这个交换机上没有注册队列时这个交换机是否删除。 如果要打开持久化的话也很简单 上面列子都是有体现的
//定义直接交换机
Bean
public DirectExchange directExchange() {//第一个参数:定义交换机的名称第二个参数:是否持久化第三个参数:是否自动删除return new DirectExchange(directExchange, true, false);
}队列持久化 队列持久化描述的是当这个队列没有消费者在监听时是否进行删除。 持久化做法
//定义队列
Bean
public Queue directQueue() {//第一个参数:队列的名称第二个参数:是否持久化return new Queue(directQueue, true);
}消息持久化 关键配置 持久化MessageDeliveryMode.PERSISTENT Test
public void testDurableMessage() {// 1.准备消息Message message MessageBuilder.withBody(hello, rabbitmq.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 2.消息ID封装到CorrelationData中CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.发送消息rabbitTemplate.convertAndSend(simple.queue, message, correlationData);log.info(发送消息成功);
}
3消费者丢失消息
丢失的情景 1 RabbitMQ服务端向消费者发送完消息之后网络断了消息并没有到达消费者 解决方案无需解决 无需解决。因为此情景下服务端收不到确认消息会再次发送的。 丢失的情景 2 启用了重试机制重试指定次数之后还没成功但消息被确认。 扩展重试机制
重试机制的三大前提 重试模式已启用通过配置 spring.rabbitmq.listener.simple.retry.enabledtrue 来启用重试模式。抛出了异常在 RabbitListener 标注的方法中抛出了异常通常是 RuntimeException 或 Error。 Spring AMQP 会捕获这些异常并根据配置的重试策略来重试消息。未达到最大重试次数消息的重试次数尚未达到配置的最大值spring.rabbitmq.listener.simple.retry.maxAttempts。 配置以下即可实现重试操作
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabledtrue
# 重试次数默认3次
spring.rabbitmq.listener.simple.retry.max-attempts5解决方案消费者方确认机制推荐最常用 改成手动后就 可以实现 “先操作业务逻辑数据库操作后再手动从队列上删除这个消息” 的动作 其中“从队列上删除这个消息“这个动作体现就是 使用 channel.basicAck 去完成的。 切记改成手动后这个channel.basicAck方法一定要写。 一修改application.properties配置
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-modemanual二修改Service接收信息项 当消息在进入 emailProcess、smsProcess被RabbitListener注解 方法时就已经被视为“接收到了”但是需要 你 执行 channel.basicAck(手动确认)才能让这个消息从队列上删除。 import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import java.io.IOException;Service
public class DirectReceiver {RabbitHandlerRabbitListener(queues emailQueue) //监听的队列名称public void emailProcess(Channel channel, Message message) throws IOException {try{System.out.println(new String(message.getBody(),UTF-8));//TODO 具体业务.......//你使用手动消息确认模式时basicAck 一定要执行不然会导致会保留在队列中无法被消费//第1个参数表示消息投递序号//第2个参数false只确认当前一个消息收到大多数情况下都设置为falsetrue确认所有consumer获得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {//若是消息没有成功接收第二个参数设置为true的话代表重新放回队列中false则为丢弃在此也可以做成放置死信队列的操作channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}确认和拒绝消息 basicAck: 这个方法用于确认消息已被成功处理。 第一个参数是消息的delivery tag用于标识消息 第二个参数指定是否批量确认false表示只确认当前消息。basicReject: 这个方法用于拒绝消息。 第一个参数同样是delivery tag 第二个参数指定是否将消息重新放回队列false表示不重新放回即丢弃消息。 方法解释 emailProcess: 这个方法监听emailQueue队列。 当队列中有消息时它会打印出消息的内容并尝试确认消息。 如果处理过程中发生异常它会拒绝消息但不会重新放回队列第二个参数为false。 问题二消息重复的解决方案
什么时候会重复消费
1.自动提交模式时
消费者收到消息后要自动提交但提交后网络出故障RabbitMQ服务器没收到提交消息那么此消息会被重新放入队列会再次发给消费者。
2.手动提交模式时
情景1网络故障问题同上。 情景2接收到消息并处理结束了此时消费者挂了没有手动提交消息。
总体来说就是网络不可达、消费端宕机。
如何解决
消费端处理消息的业务逻辑保持幂等性
比如你拿个数据要写库先根据主键查一下如果这数据有了就别插入了update 一下。 比如你是写 Redis那没问题了反正每次都是 set天然幂等性。
问题三保证消息顺序的解决方案
单一队列和单一消费者模式RabbitMQ 在RabbitMQ中可以确保一个队列只被一个消费者消费这样可以保证消息按照发送的顺序被处理。 因为队列本身就是一个先进先出的结构。
适用场景RabbitMQ用户且对消息顺序有严格要求的场景。 优点实现简单易于管理。 缺点可能成为性能瓶颈在处理大量消息时需要考虑扩展性。
问题四消息堆积的解决方案
消息堆积原因
消息堆积即消息没及时被消费是生产者生产消息速度快于消费者消费的速度导致的。 消费者消费慢可能是因为本身逻辑耗费时间较长、阻塞了。
预防措施
生产者 1.减少发布频率 3.考虑使用队列最大长度限制 消费者 1.优化代码
已出事故的解决措施
情况1堆积的消息还需要使用
方案1简单修复 修复 消费者consumer的问题让他恢复消费速度然后等待几个小时消费完毕 方案2复杂修复 单队列消费转变为多队列并行消费 也是需要先 修复 消费者consumer的问题再进行下面的步骤
步骤 1: 队列和路由设置 1.创建新队列在RabbitMQ中创建10个新队列每个队列分配一个独特的名称。 2. 设置交换机定义一个直连型Direct交换机。 3. 绑定路由键将每个新队列通过唯一的路由键绑定到直连型交换机上。
伪代码例子
// 假设这是配置类的一部分
Bean
Queue queue1() { return new Queue(queue1, false);
}
Bean
Queue queue2() { return new Queue(queue2, false);
}
// 以此类推为其他9个队列创建Bean
.........
Bean
DirectExchange exchange() { return new DirectExchange(myExchange);
}
Bean
Binding binding1(Queue queue1, DirectExchange exchange) { return BindingBuilder.bind(queue1).to(exchange).with(routingKey1);
}
Bean
Binding binding2(Queue queue2, DirectExchange exchange) { return BindingBuilder.bind(queue2).to(exchange).with(routingKey2);
}
// 以此类推为其他队列和路由键创建绑定
......步骤 2: 消息分发 1.接收堆积数据现有消费者或分发者接收从发送者处堆积的数据。 2.分发到新队列实现分发逻辑将接收到的消息根据路由键分发到相应的10个新队列中。
伪代码例子
RabbitListener(queues oldQueue)
public void emailProcess(Message message, Channel channel) throws IOException { try { // 生成1-10之间的顺序数 SequentialRandom sequentialRandom new SequentialRandom()String key sequentialRandom.getNextSequentialRandom();// 重新发送消息到交换机交换机将根据routingKey将消息路由到正确的队列 rabbitTemplate.convertAndSend(myExchange, routingKeykey, new String(message.getBody(),UTF-8)); // 确认原始队列中的消息如果您想要的话 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 处理异常可能包括记录日志、发送警报等 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); }
} public class SequentialRandom { private int currentIndex 1; // 初始索引为1 /** * 获取下一个顺序数* return 下一个数字从1到10循环 */ public int getNextSequentialRandom() { int next currentIndex; currentIndex (currentIndex % 10) 1; // 使用模运算实现循环并更新索引 return next; }
}步骤 3: 并行消费 1.开发新消费端编写新的消费端程序该程序能够监并处理来自10个新队列的消息。 2. 部署并启动将新消费端程序部署到服务器并启动它以开始并行消费。
伪代码例子
Component
public class ParallelConsumer { RabbitListener(queues {queue1}) public void receiveMessage1(Message message) { // 处理消息 } RabbitListener(queues {queue2}) public void receiveMessage2(Message message) { // 处理消息 } // ... RabbitListener(queues {queue10}) public void receiveMessage3(Message message) { // 处理消息 }
}情况2堆积的消息不需要使用
删除消息即可。可以在RabbitMQ控制台删除或者使用命令。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/87952.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!