利用网上菜谱做网站打开wordpress
利用网上菜谱做网站,打开wordpress,php网站开发周期多长,做合约交易哪个网站好文章目录 前言MQ如何保证消息不丢失RabbitMQRocketMQKafkaMQ MQ如何保证顺序消息RabbitMQRocketMQKafka MQ刷盘机制/集群同步RabbitMQRocketMQKafka 广播消息集群消息RabbitMQRocketMQ MQ集群架构RabbitMQRocketMQKafka 消息重试RabbitMQRockeMqKafka 死信队列RocketMQKaf… 文章目录 前言MQ如何保证消息不丢失RabbitMQRocketMQKafkaMQ MQ如何保证顺序消息RabbitMQRocketMQKafka MQ刷盘机制/集群同步RabbitMQRocketMQKafka 广播消息集群消息RabbitMQRocketMQ MQ集群架构RabbitMQRocketMQKafka 消息重试RabbitMQRockeMqKafka 死信队列RocketMQKafka 消息去重RocketMQKafka 事务消息RabbitMQRocketMQKafka 消息积压RabbitMQ 设计MQ思路博客记录 前言
消息丢失的三种情况
消息在传入服务过程中丢失MQ收到消息暂存内存中还没消费自己挂掉了内存中的数据搞丢消费者消费到了这个消息但还没来得及处理就挂了MQ以为消息已经被处理
也就是生产者丢失消息、消息列表丢失消息、消费者丢失消息
MQ如何保证消息不丢失
RabbitMQ
一、生产者
开启RabbitMQ事务 生产者发送数据之前开启 RabbitMQ 事务channel.txSelect然后发送消息如果消息没有成功被 RabbitMQ 接收到那么生产者会收到异常报错此时就可以回滚事务channel.txRollback然后重试发送消息如果收到了消息那么可以提交事务channel.txCommit。
// 开启事务
channel.txSelect
try {// 这里发送消息
} catch (Exception e) {channel.txRollback// 这里再次重发这条消息}// 提交事务
channel.txCommit设置Confirm模式
同步确认
//开启发布确认
channel.confirmSelect();
String message i ;
channel.basicPublish(,queueName,null,message.getBytes());
//服务端返回 false 或超时时间内未返回生产者可以消息重发
boolean flag channel.waitForConfirms();
if(flag){System.out.println(消息发送成功);
}异步确认
略
服务端
消息持久化必须满足以下三个条件缺一不可。 Exchange 设置持久化 Queue 设置持久化 Message持久化发送发送消息设置发送模式deliveryMode2代表持久化消息
发送消息时设置delivery_mode属性为2使消息被持久化保存到磁盘即使RabbitMQ服务器宕机也能保证消息不丢失。同时创建队列时设置durable属性为True以确保队列也被持久化保存。
// 声明队列并将队列设置为持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message Hello, RabbitMQ!;
// 发送消息时将消息设置为持久化
channel.basicPublish(, QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());AMQP.BasicProperties properties new AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish(, myQueue, properties, Hello, RabbitMQ.getBytes());设置备份交换机
MapString, Object arguments new HashMap();
arguments.put(alternate-exchange, myAlternateExchange);
channel.exchangeDeclare(myExchange, BuiltinExchangeType.DIRECT, true, false, arguments);
channel.exchangeDeclare(myAlternateExchange, BuiltinExchangeType.FANOUT, true, false, null);二 服务端设置集群镜像模式 单节点模式 最简单的情况非集群模式节点挂了消息就不能用了。业务可能瘫痪只能等待。 普通模式 消息只会存在与当前节点中并不会同步到其他节点当前节点宕机有影响的业务会瘫痪只能等待节点恢复重启可用必须持久化消息情况下。 镜像模式 消息会同步到其他节点上可以设置同步的节点个数但吞吐量会下降。属于RabbitMQ的HA方案
消费方开启消息确认机制
// 开启消息确认机制
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, UTF-8);System.out.println(Received message: message);// 手动发送消息确认channel.basicAck(envelope.getDeliveryTag(), false);}
});手动确认
codechannel.basicConsume(myQueue, false, (consumerTag, delivery) - {// 处理消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag - {});RocketMQ
一、生产者提供SYNC的发送消息方式等待broker处理结果。
RocketMQ生产者提供了3种发送消息方式分别是
同步发送Producer 向 broker 发送消息阻塞当前线程等待 broker 响应发送结果。
Message msg new Message(TopicTest,TagA,OrderID188,Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步传递消息消息会发给集群中的⼀个Broker节点。
SendResult sendResult producer.send(msg);异步发送Producer 首先构建一个向 broker 发送消息的任务把该任务提交给线程池等执行完该任务时回调用户自定义的回调函数执行处理结果。
Message msg new Message(TopicTest,TagA,OrderID188,Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf(%-10d OK %s %n, index, sendResult.getMsgId());}Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf(%-10d Exception %s %n, index, e);e.printStackTrace();}
});Oneway发送Oneway 方式只负责发送请求不等待应答Producer只负责把请求发出去而不处理响应结果。
Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
//核⼼发送消息。没有返回值发完消息就不管了不知道有没有发送消息成功
producer.sendOneway(msg);SendResult定义说明(来自RocketMQ官方)
SEND_OK 消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息还应启用同步Master服务器或同步刷盘即SYNC_MASTER或SYNC_FLUSH。FLUSH_DISK_TIMEOUT 消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列内存只有服务器宕机消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度如果Broker服务器设置了刷盘方式为同步刷盘即FlushDiskTypeSYNC_FLUSH默认为异步刷盘方式当Broker服务器未在同步刷盘时间内默认为5s完成刷盘则将返回该状态——刷盘超时。FLUSH_SLAVE_TIMEOUT 消息发送成功但是服务器同步到Slave时超时。此时消息已经进入服务器队列只有服务器宕机消息才会丢失。如果Broker服务器的角色是同步Master即SYNC_MASTER默认是异步Master即ASYNC_MASTER并且从Broker服务器未在同步刷盘时间默认为5秒内完成与主服务器的同步则将返回该状态——数据同步到Slave服务器超时。SLAVE_NOT_AVAILABLE 消息发送成功但是此时Slave不可用。如果Broker服务器的角色是同步Master即- SYNC_MASTER默认是异步Master服务器即ASYNC_MASTER但没有配置slave Broker服务器则将返回该状态——无Slave服务器可用。
我们在调用producer.send方法时不指定回调方法则默认采用同步发送消息的方式这也是丢失几率最小的一种发送方式(但是效率比较低)。
二、Borker 方面 设置成同步刷盘及同步复制开启集群模式集群同步
1异步刷盘
默认。消息写入 CommitLog 时并不会直接写入磁盘而是先写入 PageCache 缓存后返回成功然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量但是可能会有消息丢失的情况比如断点导致机器停机PageCache 中没来得及刷盘的消息就会丢失。
2同步刷盘
消息写入内存后立刻请求刷盘线程进行刷盘如果消息未在约定的时间内(默认 5 s)刷盘成功就返回 FLUSH_DISK_TIMEOUTProducer 收到这个响应后可以进行重试。同步刷盘策略保证了消息的可靠性同时降低了吞吐量增加了延迟。要开启同步刷盘需要增加下面配置
flushDiskTypeSYNC_FLUSH同步复制后消息复制流程如下 slave 初始化后跟 master 建立连接并向 master 发送自己的 offset master 收到 slave 发送的 offset 后将 offset 后面的消息批量发送给 slave slave 把收到的消息写入 commitLog 文件并给 master 发送新的 offset master 收到新的 offset 后如果 offset producer 发送消息后的 offset给 Producer 返回 SEND_OK。
三、消费者 RocketMQ消费失败后的消费重试机制 手动提交消息偏移量
consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});public enum ConsumeConcurrentlyStatus {//业务方消费成功CONSUME_SUCCESS,//业务方消费失败之后进行重新尝试消费RECONSUME_LATER;
}RECONSUME_LATER “%RETRY%ConsumeGroupName”—重试队列的主题
KafkaMQ
解决方案 1、生产者调用异步回调消息。伪代码如下: producer.send(msgcallback) 2、生产者增加消息确认机制设置生产者参数:acks all。partition的leader副本接收到消息等待所有的follower副本都同步到了消息之后才认为本次生产者发送消息成功了 3、生产者设置重试次数。比如:retries3增加重试次数以保证消息的不丢失 4、定义本地消息日志表定时任务扫描这个表自动补偿做好监控告警。 5、后台提供一个补偿消息的工具可以手工补偿。 生产者设置同步发送 // 异步发送 默认kafkaProducer.send(new ProducerRecord(first,kafka i));// 同步发送RecordMetadata first kafkaProducer.send(new ProducerRecord(first, kafka i)).get(); 生产者设置发送ack
// 1. 创建 kafka 生产者的配置对象
Properties properties new Properties();// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, all);// 3. 创建 kafka 生产者对象
KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);kafkaProducer.send(new ProducerRecord(first,atguigu i));生产者设置重试次数。比如:retries3增加重试次数以保证消息的不丢失
三、消费者
通过在Consumer端设置“enable.auto.commit”属性为false后 在代码中手动调用KafkaConsumer实例的commitSync()方法提交, 这里指的是同步阻塞commit消费的偏移量等待Broker端的返回响应需要注意Broker端在对commit请求做出响应之前消费端会处于阻塞状态从而限制消息的处理性能和整体吞吐量以确保消息能够正常被消费。 如果在消费过程中消费端突然Crash这时候消费偏移量没有commit等正常恢复后依然还会处理刚刚未commit的消息。
生产者acks参数指定了必须要有多少个分区副本收到消息生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。
1ack0生产者在成功写入悄息之前不会等待任何来自服务器的响应。
2ack1只要集群的首领副本收到消息生产者就会收到一个来自服务器的成功响应。
3ackall只有当所有同步副本全部收到消息时生产者才会收到一个来自 服务器的成功响应。
MQ如何保证顺序消息
严格顺序消费的注意事项
生产者不能异步发送异步发送在发送失败的情况下就没办法保证消息顺序。
比如你连续发了123。 过了一会返回结果1失败2, 3成功。你把1再重新发送1遍这个时候顺序就乱掉了。
应用中应确保业务中添加事务锁防止并发处理同一对象。
比如修改业务员的手机号操作员A和操作员B同时修改业务员张三的手机号如两人的填入手机号相同无影响如不同操作员A输入正确操作员B输入错误可能造成消费顺序乱掉手机号修改错误。
对于消费端不能并行消费生产者顺序发送消费端必须顺序消费。
RabbitMQ
消息单个消费者单线程消费。
RocketMQ
RocketMQ 提供了两种顺序消息模式全局顺序消息和分区顺序消息。
全局顺序消息适用于性能要求不高的场景所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。全局顺序消息实际上是一种特殊的分区顺序消息即Topic中只有一个分区因此全局顺序和分区顺序的实现原理相同。由于分区顺序消息有多个分区所以分区顺序消息比全局顺序消息的并发度和性能更高。分区顺序消息适用于性能要求高的场景所有消息根据Sharding Key进行区块分区同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序不同分区之间的消息顺序不做要求。对于指定的一个Topic所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。分区顺序消息比全局顺序消息的并发度和性能更高。
发送方使用MessageQueueSelector选择队列
Message msg new Message(OrderTopicTest, order_orderId,KEY orderId,(order_orderId step j).getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息队列的选择器
SendResult sendResult producer.send(msg, new MessageQueueSelector() {//第一个参数所有的消息第二个参数发送的消息第三个参数根据什么发送这里面传的是orderIdOverridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {Integer id (Integer) arg;int index id % mqs.size();return mqs.get(index);}//同一个订单id可以放到同一个队列里面去
}, orderId);消费方使用MessageQueueSelector选择队列
consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {//自动提交context.setAutoCommit(true);for(MessageExt msg:msgs){System.out.println(收到消息内容 new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}
});Kafka
Kafka是分布式多partition的它会将一个topic中的消息尽可能均匀的分发到每个partition上。那么问题就来了这样怎么保证同一个topic消息的顺序呢
kafka可以通过partitionKey将某类消息写入同一个partition一个partition只能对应一个消费线程以保证数据有序。 也就是说生产者在写消息的时候可以指定一个 key比如说我们指定了某个订单 id 作为 key那么这个订单相关的数据一定会被分发到同一个 partition 中去而且这个 partition 中的数据一定是有顺序的。
先后两条消息发送时前一条消息发送失败后一条消息发送成功然后失败的消息重试后发送成功造成乱序。为了解决重试机制引起的消息乱序为实现Producer的幂等性Kafka引入了Producer ID即PID和Sequence Number。
对于每个PID该Producer发送消息的每个Topic, Partition都对应一个单调递增的Sequence Number。 同样Broker端也会为每个PID, Topic, Partition维护一个序号并且每Commit一条消息时将其对应序号递增。 对于接收的每条消息如果其序号比Broker维护的序号大一则Broker会接受它否则将其丢弃 如果消息序号比Broker维护的序号差值比一大说明中间有数据尚未写入即乱序此时Broker拒绝该消息 如果消息序号小于等于Broker维护的序号说明该消息已被保存即为重复消息Broker直接丢弃该消息 发送失败后会重试这样可以保证每个消息都被发送到broker
消费者从 partition 中取出来数据的时候也一定是有顺序的。到这里顺序还是 ok 的没有错乱。
指定发送partition的分区
//没有指明 partition 值但有 key 的情况下将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
// 依次指定 key 值为 a,b,f 数据 key 的 hash 值与 3 个分区求余
//kafkaProducer.send(new ProducerRecord(first,a,atguigu i), new Callback() {}
kafkaProducer.send(new ProducerRecord(first, 0, , atguigu i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e null) {System.out.println( 主题 metadata.topic() - 分区 metadata.partition());} else {e.printStackTrace();}}});MQ刷盘机制/集群同步
RabbitMQ
RocketMQ
同步刷盘、异步刷盘 RocketMQ的消息是存储到磁盘上的这样既能保证断电后恢复又可以让存储的消息量超出内存的限制。
RocketMQ为了提高性能会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候有两种写磁盘方式
1异步刷盘方式在返回写成功状态时消息可能只是被写入了内存的PAGECACHE立刻返回发送端发送成功有单独的线程执行刷盘写操作的返回快吞吐量大当内存里的消息量积累到一定程度时统一触发写磁盘操作快速写入。
2同步刷盘方式在返回写成功状态时消息已经被写入磁盘。同步调用MappedByteBuffer的force()方法同步等待刷盘结果进行刷盘结果返回告知发送端。具体流程是消息写入内存的PAGECACHE后立刻通知刷盘线程刷盘然后等待刷盘完成刷盘线程执行完成后唤醒等待的线程返回消息写成功的状态。 同步刷盘还是异步刷盘是通过Broker配置文件里的flushDiskType参数设置的这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的一个 消息存储时先将消息存储到内存再根据不同的刷盘策略进行刷盘
同步刷盘
异步刷盘
刷盘源码
同步复制、异步复制 如果一个broker组有Master和Slave消息需要从Master复制到Slave上有同步和异步两种复制方式。
同步复制是等Master和Slave均写成功后才反馈给客户端写成功状态异步复制方式是只要Master写成功即可反馈给客户端写成功状态
这两种复制方式各有优劣:
在异步复制方式下系统拥有较低的延迟和较高的吞吐量但是如果Master出了故障有些数据因为没有被写入Slave有可能会丢失在同步复制方式下如果Master出故障Slave上有全部的备份数据容易恢复但是同步复制会增大数据写入延迟降低系统吞吐量。
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。
实际应用中要结合业务场景合理设置刷盘方式和主从复制方式尤其是SYNC_FLUSH方式由于频繁的触发写磁盘动作会明显降低性能。
通常情况下应该把Master和Slave设置成ASYNC_FLUSH的刷盘方式
主从之间配置成SYNC_MASTER的复制方式这样即使有一台机器出故障仍然可以保证数据不丢。
Kafka
Broker针对每个分区会创建一个分区目录分区目录下面存放的是日志文件.log和索引文件.index
Kafka的刷盘策略主要有两种同步刷盘sync flush和异步刷盘async flush。
同步异步刷盘的区别在于消息存储在内存memory中以后是否会等待执行完刷盘动作再返回即是否会等待将消息中的消息写入磁盘中。kafka可以通过配置flush.message和flush.ms来设置刷盘策略如果flush.message设置为5表示每5条消息进行一次刷盘如果flush.message设置为1表示每一条消息都进行一次刷盘。如果flush.ms设置为1000表示每过1000ms进行一次刷盘如果flush.ms设置为5000表示每过5000ms进行一次刷盘。
同步刷盘每条消息被写入磁盘前必须等待操作系统完成该消息的磁盘写入操作。这种方式可以确保数据不丢失但由于每次消息都要等待磁盘I/O完成因此会影响性能。在Kafka中默认使用的是异步刷盘策略因为它结合了多副本和基于日志的存储机制通过复制和重放来保障数据的高可用性。异步刷盘的目的是为了提高吞吐量和适应高性能应用场景。不过这种方法增加了数据丢失的风险尤其是在系统发生故障的情况下。异步刷盘这是一种更轻量级的刷盘方式它允许消息先被写入内存中的页缓存然后在空闲时由操作系统异步地刷入磁盘。这样可以减少对性能的影响尤其是当处理大量消息时。然而由于没有立即将数据刷入磁盘所以存在一定的数据丢失风险。Kafka提供了配置项log.flush.interval.messages和log.flush.interval.ms来控制何时触发强制的刷盘操作。如果没有设置这些参数那么Kafka会根据log.flush.scheduler.interval.ms默认值为3000毫秒的时间间隔进行检查以确定是否需要刷新所有日志到磁盘。需要注意的是尽管可以通过这些参数来实现一定程度的控制但是官方并不推荐依赖它们来强制刷盘而是强调通过副本机制来保证数据的一致性和可靠性。
广播消息集群消息
RabbitMQ
RocketMQ
RocketMQ主要提供了两种消费模式集群消费以及广播消费。我们只需要在定义消费者的时候通过setMessageModel(MessageModel.XXX)方法就可以指定是集群还是广播式消费默认是集群消费模式即每个Consumer Group中的Consumer均摊所有的消息。
public class MQProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {// 创建DefaultMQProducer类并设定生产者名称DefaultMQProducer mqProducer new DefaultMQProducer(producer-group-test);// 设置NameServer地址如果是集群的话使用分号;分隔开mqProducer.setNamesrvAddr(10.0.91.71:9876);// 消息最大长度 默认4MmqProducer.setMaxMessageSize(4096);// 发送消息超时时间默认3000mqProducer.setSendMsgTimeout(3000);// 发送消息失败重试次数默认2mqProducer.setRetryTimesWhenSendAsyncFailed(2);// 启动消息生产者mqProducer.start();// 循环十次发送十条消息for (int i 1; i 10; i) {String msg hello, 这是第 i 条同步消息;// 创建消息并指定Topic(主题)Tag(标签)和消息内容Message message new Message(CLUSTERING_TOPIC, , msg.getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送同步消息到一个Broker可以通过sendResult返回消息是否成功送达SendResult sendResult mqProducer.send(message);System.out.println(sendResult);}// 如果不再发送消息关闭Producer实例mqProducer.shutdown();}
}public class MQConsumerB {public static void main(String[] args) throws MQClientException {// 创建DefaultMQPushConsumer类并设定消费者名称DefaultMQPushConsumer mqPushConsumer new DefaultMQPushConsumer(consumer-group-test);// 设置NameServer地址如果是集群的话使用分号;分隔开mqPushConsumer.setNamesrvAddr(10.0.91.71:9876);// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费// 如果不是第一次启动那么按照上次消费的位置继续消费mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 设置消费模型集群还是广播默认为集群mqPushConsumer.setMessageModel(MessageModel.CLUSTERING);// 消费者最小线程量mqPushConsumer.setConsumeThreadMin(5);// 消费者最大线程量mqPushConsumer.setConsumeThreadMax(10);// 设置一次消费消息的条数默认是1mqPushConsumer.setConsumeMessageBatchMaxSize(1);// 订阅一个或者多个Topic以及Tag来过滤需要消费的消息如果订阅该主题下的所有tag则使用*mqPushConsumer.subscribe(CLUSTERING_TOPIC, *);// 注册回调实现类来处理从broker拉取回来的消息mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {// 监听类实现MessageListenerConcurrently接口即可重写consumeMessage方法接收数据Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt msgList.get(0);String body new String(messageExt.getBody(), StandardCharsets.UTF_8);System.out.println(消费者接收到消息: messageExt.toString() ---消息内容为 body);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例mqPushConsumer.start();}
}setMessageModel(MessageModel.CLUSTERING);//设置集群消息
setMessageModel(MessageModel.BROADCASTING); //设置广播消息1、在Rocket集群消费模式下订阅同一个主题Topic下的消息对于不同的消费者组是一种“广播形式”即每个消费者组的都会消费消息。
2、在Rocket集群消费模式下订阅同一个主题Topic下的消息对于相同的消费者组的消费者而言是一种集群模式即同一个消费者组内的所有消费者均分消息并消费。
案例博客
MQ集群架构
RabbitMQ
普通集群 镜像集群
RocketMQ
单Master模式
这种方式风险较大一旦Broker重启或者宕机时会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
多Master模式
一个集群无Slave全是Master例如2个Master或者3个Master这种模式的优缺点如下
优点配置简单单个Master宕机或重启维护对应用无影响在磁盘配置为RAID10时即使机器宕机不可恢复情况下由于RAID10磁盘非常可靠消息也不会丢异步刷盘丢失少量消息同步刷盘一条不丢性能最高缺点单台机器宕机期间这台机器上未被消费的消息在机器恢复之前不可订阅消息实时性会受到影响。
多Master多Slave模式-异步复制
每个Master配置一个Slave有多对Master-SlaveHA采用异步复制方式主备有短暂消息延迟毫秒级这种模式的优缺点如下
优点即使磁盘损坏消息丢失的非常少且消息实时性不会受影响同时Master宕机后消费者仍然可以从Slave消费而且此过程对应用透明不需要人工干预性能同多Master模式几乎一样缺点Master宕机磁盘损坏情况下会丢失少量消息(非同步刷盘的情况下)
多Master多Slave模式-同步双写
每个Master配置一个Slave有多对Master-SlaveHA采用同步双写方式即只有主备都写成功才向应用返回成功这种模式的优缺点如下
优点数据与服务都无单点故障Master宕机情况下消息无延迟服务可用性与数据可用性都非常高缺点性能比异步复制模式略低大约低10%左右发送单个消息的RT会略高且目前版本在主节点宕机后备机不能自动切换为主机 如果是想不存在消息丢失的情况那么在多Master的情况下要配置消息同步刷盘而在 多Master多Slave模式-同步双写 的情况下配置同步刷盘。 RocketMQ有一个集群模式叫做Dleger模式。当主节点失活时能够自动重新触发选举。 DLedger集群节点状态leader、candidate、follower leader接受客户端请求本地写入日志数据并将数据复制给follower定期发送心跳数据给follower维护leader状态 candidatemaster故障后节点的中间状态只有处于candidate状态的节点才会发送投票选举请求master选举完成后节点状态为leader或者follower follower负责同步leader的日志数据接受leader心跳数据重置倒计时器保持follower状态并将心跳响应返回给leader
集群架构原博客
集群搭建
DLedger集群
Kafka
消息重试
重试带来的副作用 不停的重试看起来很美好但也是有副作用的主要包括两方面消息重复服务端压力增大
远程调用的不确定性因请求超时触发消息发送重试流程此时客户端无法感知服务端的处理结果客户端进行的消息发送重试可能会导致消费方重复消费应该按照用户ID、业务主键等信息幂等处理消息。较多的重试次数也会增大服务端的处理压力。
RabbitMQ
消费者默认是自动提交如果消费时出现了RuntimException会导致消息直接重新入队再次投递进入队首进入死循环继而导致后面的消息被阻塞。
消息阻塞带来的后果是后边的消息无法被消费RabbitMQ服务端继续接收消息占内存和磁盘越来越多。
重试机制有2种情况
消息是自动确认时如果抛出了异常导致多次重试都失败消息被自动确认消息就丢失了消息是手动确认时如果抛出了异常导致多次重试都失败消息没被确认也无法nack就一直是unacked状态导致消息积压。
消息重试了5次之后会抛出ListenerExecutionFailedException的异常。后面附带着Retry Policy Exhausted提示我们重试次数已经用尽了。
消息重试次数用尽后消息就会被抛弃。
重试机制
RockeMq
Producer端重试 消息发送时默认情况下会进行2次重试。如果重试次数达到上限消息将不会被再次发送。
重试配置
producer.setRetryTimesWhenSendFailed(x)同步参数默认值是2
producer.setRetryTimesWhenSendAsyncFailed()异步参数默认值是2Consumer端重试
当 Consumer 端遇到异常时消息通常会重复重试16次重试的时间间隔包括10秒、30秒、1分钟、2分钟、3分钟等。如果 Consumer 端没有返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 或 ConsumeConcurrentlyStatus.RECONSUME_LATER且消息没有消费成功MQ 会无限制地发送给消费端直到达到最大重试次数。在集群模式下如果消费业务逻辑代码返回 Action.ReconsumerLater、NULL 或抛出异常消息最多会重试16次。如果重试16次后消息仍然失败则会被丢弃。消息队列 RocketMQ 默认允许每条消息最多重试16次每次重试的间隔时间根据配置的间隔而变化。如果消息在16次重试后仍然失败则不再投递该消息。理论上如果消息持续失败最长可能需要4小时46分钟内完成这16次重试。
Properties properties new Properties();
// 配置对应 Group ID的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes, 20);
Consumer consumer ONSFactory.createConsumer(properties);1重试队列如果Consumer端因为各种类型异常导致本次消费失败为防止该消息丢失而需要将其重新回发给Broker端保存保存这种因为异常无法正常消费而回发给MQ的消息队列称之为重试队列。 考虑到异常恢复需要一些时间RocketMQ会为重试队列设置多个重试级别每个重试级别都有与之对应的重新投递延时重试次数越多投递延时就越大。 RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%consumerGroup”的重试队列这里需要注意的是这个Topic的重试队列是针对消费组而不是针对每个Topic设置的用于暂时保存因为各种异常而导致Consumer端无法消费的消息。 考虑到异常恢复起来需要一些时间会为重试队列设置多个重试级别每个重试级别都有与之对应的重新投递延时重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%consumerGroup”的重试队列中具体细节后面会详细阐述。
2死信队列由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息为了确保消息不会被无故的丢弃那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中。在RocketMQ中SubscriptionGroupConfig配置常量默认地设置了两个参数一个是retryQueueNums为1重试队列数量为1个另外一个是retryMaxTimes为16最大重试消费的次数为16次。Broker端通过校验判断如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%consumerGroup的死信队列。一般在实际应用中移入至死信队列的消息需要人工干预处理
重试队列死信队列
Consumer 消费失败这里有 3 种情况 返回 RECONSUME_LATER 返回 null 抛出异常
public class MessageListenerImpl implements MessageListener {Overridepublic Action consume(Message message, ConsumeContext context) {//处理消息doConsumeMessage(message);//方式1返回 Action.ReconsumeLater消息将重试return Action.ReconsumeLater;//方式2返回 null消息将重试return null;//方式3直接抛出异常 消息将重试throw new RuntimeException(Consumer Message exceotion);}
}如果希望消费失败后不重试可以直接返回Action.CommitMessage。 一条消息无论重试多少次这些重试消息的Message ID都不会改变。 消息重试只针对集群消费模式生效广播消费模式不提供失败重试特性即消费失败后失败消息不再重试继续消费新的消息。
官方文档
重试配置
Kafka
死信队列
RocketMQ
当一条消息初次消费失败消息队列 RocketMQ 会自动进行消息重试达到最大重试次数后若消费依然失败则表明消费者在正常情况下无法正确地消费该消息此时消息队列 RocketMQ 不会立刻将消息丢弃而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中这种正常情况下无法被消费的消息称为死信消息Dead-Letter Message存储死信消息的特殊队列称为死信队列Dead-Letter Queue。
死信消息具有以下特性
不会再被消费者正常消费。有效期与正常消息相同均为 3 天3 天后会被自动删除。因此请在死信消息产生后的 3 天内及时处理。(commitLog文件的过期时间)
死信队列:
一个死信队列对应一个 Group ID 而不是对应单个消费者实例。如果一个 Group ID 未产生死信消息消息队列 RocketMQ 不会为其创建相应的死信队列。一个死信队列包含了对应 Group ID 产生的所有死信消息不论该消息属于哪个 Topic。死信队列是一个特殊的Topic名称为%DLQ%consumerGroupconsumerGroup
Kafka
消息去重 生产时消息重复 由于生产者发送消息给MQ在MQ确认的时候出现了网络波动生产者没有收到确认实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。 生产者中如果消息未被确认或确认失败我们可以使用定时任务redis/db来进行消息重试 消费时消息重复 消费者消费成功后再给MQ确认的时候出现了网络波动MQ没有接收到确认为了保证消息被消费MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。 由于重复消息是由于网络原因造成的因此不可避免重复消息。但是我们需要保证消息的幂等性。
需要从生产者和消费端同时保证 生产者生产消息判断是否已经发送过。 消费端保证消息幂等性即多次处理相同消息的效果与处理一次相同。
RabbitMQ 中消息重复消费的问题可以通过以下几种方式解决
使用消息去重在生产者发送消息时为每条消息生成一个唯一标识符并将其存储到数据库或缓存中。消费者在接收到消息后先查询该标识符是否存在如果存在则说明该消息已被处理过直接跳过否则进行业务处理并将该标识符存储到数据库或缓存中。
使用幂等性操作即使同一条消息被消费多次也不会对业务造成影响。例如在更新操作时使用乐观锁或悲观锁机制来避免并发更新问题。
使用 TTLTime To Live特性设置每条消息的生命周期超过指定时间后自动删除。如果消费者在该时间内未能成功处理该消息则可以认为该消息已经丢失。
使用 RabbitMQ 提供的 ACK 机制当消费者成功处理一条消息时发送 ACK 确认信号给 RabbitMQ 服务器。服务器收到 ACK 后才会将该条消息从队列中删除。如果消费者处理失败则发送 NACK 信号给 RabbitMQ 服务器并设置重新入队requeue参数为 true在下次重新投递时再次尝试消费。
在生产者端设置 IDempotent幂等属性确保相同 ID 的请求只执行一次。这样就可以避免重复发送消息从而避免了消息的重复消费。
RocketMQ
生产者生产时候设置一个唯一keyid
public void testRepeatProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();// 我们可以使用自定义key当做唯一标识String keyId UUID.randomUUID().toString();System.out.println(keyId);Message msg new Message(TopicTest, tagA, keyId, 我是一个测试消息.getBytes());SendResult send producer.send(msg);System.out.println(send);// 关闭实例producer.shutdown();
}/*** 在boot项目中可以使用Bean在整个容器中放置一个单利对象*/
public static BitMapBloomFilter bloomFilter new BitMapBloomFilter(100); // m数组长度Test
public void testRepeatConsumer() throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(repeat-consumer-group);consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);consumer.subscribe(repeatTestTopic, *);// 注册一个消费监听 MessageListenerConcurrently是并发消费consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {// 拿到消息的keyMessageExt messageExt msgs.get(0);String keys messageExt.getKeys();// 判断是否存在布隆过滤器中if (bloomFilter.contains(keys)) {// 直接返回了 不往下处理业务return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 这个处理业务然后放入过滤器中// do sth...bloomFilter.add(keys);System.out.println(keys: keys);System.out.println(new String(messageExt.getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}Kafka
Producer 的幂等性指的是当发送同一条消息时数据在 Server 端只会被持久化一次数据不丟不重Kafka为了实现幂等性底层设计架构中引入了ProducerID和SequenceNumbe。
当Producer发送消息(x2,y2)给Broker时Broker接收到消息并将其追加到消息流中。此时Broker返回Ack信号给Producer时发生异常导致Producer接收Ack信号失败。对于Producer来说会触发重试机制将消息(x2,y2)再次发送但是由于引入了幂等性在每条消息中附带了PIDProducerID和SequenceNumber。相同的PID和SequenceNumber发送给Broker而之前Broker缓存过之前发送的相同的消息那么在消息流中的消息就只有一条(x2,y2)不会出现重复发送的情况。
缺点Kafka 的 Exactly Once 幂等性只能保证单次会话内的精准一次性不能解决跨会话和跨分区的问题
事务消息
RabbitMQ
RabbitMQ中与事务机制有关的方法有三个txSelect(), txCommit()以及txRollback(); txSelect用于将当前channel设置成transaction模式txCommit用于提交事务txRollback用于回滚事务在通过txSelect开启事务之后我们便可以发布消息给broker代理服务器了。 如果txCommit提交成功了则消息一定到达了broker了如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常这个时候我们便可以捕获异常通过txRollback回滚事务了。
// 开启事务
try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.txSelect();try {// 发送消息channel.basicPublish(, QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());System.out.println(消息发送成功);// 提交事务channel.txCommit();} catch (IOException e) {// 回滚事务channel.txRollback();System.out.println(消息发送失败事务回滚);}} catch (IOException | TimeoutException e) {e.printStackTrace();}RocketMQ
RocketMQ支持事务消息整体流程如下图 1、Producer向broker发送半消息。 2、Producer端收到响应消息发送成功此时消息是半消息标记为“不可投递”状态Consumer消费不了。 3、Producer端执行本地事务。 4、正常情况本地事务执行完成Producer向Broker发送Commit/Rollback如果是CommitBroker端将半消息标记为正常消息Consumer可以消费如果是RollbackBroker丢弃此消息。 5、异常情况Broker端迟迟等不到二次确认。在一定时间后会查询所有的半消息然后到Producer端查询半消息的执行情况。 6、Producer端查询本地事务的状态。 7、根据事务的状态提交commit/rollback到broker端。567是消息回查
事务的三种状态: TransactionStatus.CommitTransaction提交事务消息消费者可以消费此消息 TransactionStatus.RollbackTransaction回滚事务它代表该消息将被删除不允许被消费。 TransactionStatus.Unknown 中间状态它代表需要检查消息队列来确定状态。
/*** 事务消息生产者*/
public class TransactionMessageProducer {/*** 事务消息监听实现*/private final static TransactionListener transactionListenerImpl new TransactionListener() {/*** 在发送消息成功时执行本地事务* param msg* param arg producer.sendMessageInTransaction的第二个参数* return 返回事务状态* LocalTransactionState.COMMIT_MESSAGE提交事务提交后broker才允许消费者使用* LocalTransactionState.RollbackTransaction回滚事务回滚后消息将被删除并且不允许别消费* LocalTransactionState.Unknown中间状态表示MQ需要核对以确定状态*/Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// TODO 开启本地事务实际就是我们的jdbc操作// TODO 执行业务代码插入订单数据库表// int i orderDatabaseService.insert(....)// TODO 提交或回滚本地事务(如果用spring事务注解这些都不需要我们手工去操作)// 模拟一个处理结果int index 8;/*** 模拟返回事务状态*/switch (index) {case 3:System.out.printf(本地事务回滚回滚消息id:%s%n, msg.getKeys());return LocalTransactionState.ROLLBACK_MESSAGE;case 5:case 8:return LocalTransactionState.UNKNOW;default:System.out.println(事务提交消息正常处理);return LocalTransactionState.COMMIT_MESSAGE;}}/*** Broker端对未确定状态的消息发起回查将消息发送到对应的Producer端同一个Group的Producer* 由Producer根据消息来检查本地事务的状态进而执行Commit或者Rollback* param msg* return 返回事务状态*/Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 根据业务正确处理 订单场景只要数据库有了这条记录消息应该被commitString transactionId msg.getTransactionId();String key msg.getKeys();System.out.printf(回查事务状态 key:%-5s msgId:%-10s transactionId:%-10s %n, key, msg.getMsgId(), transactionId);if (id_5.equals(key)) { // 刚刚测试的10条消息 把id_5这条消息提交其他的全部回滚。System.out.printf(回查到本地事务已提交提交消息id:%s%n, msg.getKeys());return LocalTransactionState.COMMIT_MESSAGE;} else {System.out.printf(未查到本地事务状态回滚消息id:%s%n, msg.getKeys());return LocalTransactionState.ROLLBACK_MESSAGE;}}};public static void main(String[] args) throws MQClientException, IOException {// 1. 创建事务生产者对象// 和普通消息生产者有所区别这里使用的是TransactionMQProducerTransactionMQProducer producer new TransactionMQProducer(GROUP_TEST);// 2. 设置NameServer的地址如果设置了环境变量NAMESRV_ADDR可以省略此步producer.setNamesrvAddr(192.168.100.242:9876);// 3. 设置事务监听器producer.setTransactionListener(transactionListenerImpl);// 4. 启动生产者producer.start();for (int i 0; i 10; i) {String content Hello transaction message i;Message message new Message(TopicTest, TagA, id_ i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 发送消息(发送一条新订单生成的通知)SendResult result producer.sendMessageInTransaction(message, i);System.out.printf(发送结果%s%n, result);}System.in.read();// 6. 停止生产者producer.shutdown();}
}rocketmq相关
Kafka
在使用Kafka事务前需要开启幂等特性将 enable.idempotence 设置为 true
Kafka 0.11.0 版本开始引入了事务性功能。实现事务性消息的过程涉及到生产者Producer和消费者Consumer两个方面
一、生产者事务 生产者可以通过事务将一批消息发送到 Kafka并保证这批消息要么全部发送成功要么全部发送失败实现消息的原子性。
1在使用事务之前生产者需要先初始化一个事务即调用 initTransactions() 方法。这样会将生产者切换到事务模式。
2然后生产者开始事务将待发送的消息放入事务中而不是直接发送到 Kafka。
3在事务中可以将多条消息添加到一个或多个主题的不同分区中。在事务中如果发送消息成功则会将消息暂存于事务缓冲区不会立即发送到 Kafka。
4在所有消息都添加到事务中后可以调用 commitTransaction() 方法提交事务。如果所有消息提交成功则整个事务提交成功所有消息会被一起发送到 Kafka。
5如果在事务过程中出现错误或者某条消息发送失败可以调用 abortTransaction() 方法回滚事务之前添加到事务中的消息不会发送到 Kafka。
Properties properties new Properties();
properties.put(org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);KafkaProducerString, String producer new KafkaProducerString, String(properties);// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();try {// 处理业务逻辑ProducerRecordString, String record1 new ProducerRecordString, String(topic, msg1);producer.send(record1);ProducerRecordString, String record2 new ProducerRecordString, String(topic, msg2);producer.send(record2);ProducerRecordString, String record3 new ProducerRecordString, String(topic, msg3);producer.send(record3);// 处理其他业务逻辑// 提交事务producer.commitTransaction();
} catch (ProducerFencedException e) {// 中止事务类似于事务回滚producer.abortTransaction();
}
producer.close();
事务手动提交 在一个事务中如果需要手动提交消息需要先将 enable.auto.commit 参数设置为 false然后调用 sendOffsetsToTransaction(MapTopicPartition, OffsetAndMetadata offsets, String consumerGroupId) 方法进行手动提交该方式特别适用于 消费-转换-生产模式的状况
producer.initTransactions();while (true){org.apache.kafka.clients.consumer.ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));if (!records.isEmpty()){MapTopicPartition, OffsetAndMetadata offsets new HashMap();producer.beginTransaction();try {for (TopicPartition partition: records.partitions()){ListConsumerRecordString, String partitionRecords records.records(partition);for (ConsumerRecordString, String record : partitionRecords) {ProducerRecordString, String producerRecord new ProducerRecord(topic-sink, record.key(), record.value());producer.send(producerRecord);}long lastConsumedOffset partitionRecords.get(partitionRecords.size() - 1).offset();offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset 1));}// 手动提交事务producer.sendOffsetsToTransaction(offsets, groupId);producer.commitTransaction();}catch (ProducerFencedException e){// log the exceptionproducer.abortTransaction();}}}二、消费者事务 消费者可以通过事务来保证消息的读取和消息处理的原子性。
1消费者可以将消息的偏移量Offset保存在事务中并在处理完消息后将偏移量提交到事务中。
2当事务成功提交后偏移量会被记录到消费者组中表示这批消息已经被成功消费。
3如果事务失败或回滚偏移量不会被提交下次消费者启动时会从上次提交的偏移量处继续消费。
通过使用事务生产者和消费者都可以实现对消息的原子性处理保证消息的可靠传输和处理。这对于一些需要强一致性的应用场景非常重要例如金融交易系统或者订单处理系统等。但需要注意使用事务会增加一定的系统开销因此在实现事务时需要权衡性能和可靠性。
为了实现事务Kafka引入了事务协调器TransactionCoodinator负责事务的处理所有的事务逻辑包括分派PID等都是由TransactionCoodinator 负责实施的。
broker节点有一个专门管理事务的内部主题 __transaction_stateTransactionCoodinator 会将事务状态持久化到该主题中。
事务消息流程:
查找 TransactionCoordinator生产者会先向某个broker发送 FindCoordinator 请求找到 TransactionCoordinator 所在的 broker节点.获取PID生产者会向 TransactionCoordinator 申请获取 PIDTransactionCoordinator 收到请求后会把 transactionalId 和对应的 PID 以消息的形式保存到主题 __transaction_state 中保证 transaction_IdPID的对应关系被持久化即使宕机该对应关系也不会丢失开启事务调用 beginTransaction()后生产者本地会标记开启了一个新事务发送消息生产者向用户主题发送消息过程跟普通消息相同但第一次发送请求前会先发送请求给TransactionCoordinator 将 transactionalId 和 TopicPartition 的对应关系存储在 __transaction_state 中提交或中止事务Kafka除了普通消息还有专门的控制消息ControlBatch来标志一个事务的结束控制消息有两种类型分别用来表征事务的提交和中止 该阶段本质就是一个两阶段提交过程 将 PREPARE_COMMIT 或 PREPARE_ABORT 消息写入主题 __transaction_state将COMMIT 或 ABORT 信息写入用户所使用的普通主题和 __consumer_offsets将 COMPLETE_COMMIT 或 COMPLETE_COMMIT_ABORT 消息写入主题 __transaction_state
学习博客
消息积压
消息积压处理办法临时紧急扩容。先修复 consumer 的问题确保其恢复消费速度然后将现有 cnosumer 都停掉。新建一个 topicpartition 是原来的 10 倍临时建立好原先 10 倍的 queue 数量。然后写一个临时的分发数据的 consumer 程序这个程序部署上去消费积压的数据消费之后不做耗时的处理直接均匀轮询写入临时建立好的 10 倍数量的 queue。接着临时征用 10 倍的机器来部署 consumer每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍以正常的 10 倍速度来消费数据。等快速消费完积压数据之后得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。MQ中消息失效假设你用的是 RabbitMQRabbtiMQ 是可以设置过期时间的也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里而是大量的数据会直接搞丢。我们可以采取一个方案就是批量重导这个我们之前线上也有类似的场景干过。就是大量积压的时候我们当时就直接丢弃数据了然后等过了高峰期以后比如大家一起喝咖啡熬夜到晚上12点以后用户都睡觉了。这个时候我们就开始写程序将丢失的那批数据写个临时程序一点一点的查出来然后重新灌入 mq 里面去把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面没有处理其中 1000 个订单都丢了你只能手动写程序把那 1000 个订单给查出来手动发到 mq 里去再补一次。mq消息队列块满了如果消息积压在 mq 里你很长时间都没有处理掉此时导致 mq 都快写满了咋办这个还有别的办法吗没有谁让你第一个方案执行的太慢了你临时写程序接入数据来消费消费一个丢弃一个都不要了快速消费掉所有的消息。然后走第二个方案到了晚上再补数据吧。
RabbitMQ
设计MQ思路
博客记录
mq中常见问题
rocketmq 相关问题
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/90443.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!