MQ常见问题
- 消息丢失
- 消息会在哪些环节丢失
- 应对机制
- 消息的顺序性
- 消息幂等
- 消息积压的处理
消息丢失
消息会在哪些环节丢失
- 网络传输环节:生产者发送消息到broker,broker中master同步消息给slave,consumer消费消息,这3个环节都是跨网络传输,可能造成消息丢失。
- 缓存信息刷盘环节:MQ存盘时都会先写⼊操作系统的缓存pagecache中,然后再由操作系统异步的将消息写⼊硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写⼊硬盘的消息就会丢失。
应对机制
- 生产者发送消息环节:消息确认机制。使用同步发送机制,可以收到broker端的发送成功确认。该方法消息安全但是效率低。
- 消息同步环节:普通master-slave集群下,master挂掉后slave不会主动切换未master,等master再次启动后,消息不会丢失,但这种方式牺牲了可用性。高可用Dledger集群,消息要写入半数以上节点才会通知客户端写成功,消息安全性比较高,可以认为不会丢失消息。但是在脑裂情况下,也有可能会丢失消息。
- 刷盘环节:RocketMQ的Broker提供了⼀个很明确的配置项flushDiskType,可以选择刷盘模式。有两个可选项,SYNC_FLUSH同步刷盘和ASYNC_FLUSH异步刷盘。所谓同步刷盘,是指broker每往⽇志⽂件中写⼊⼀条消息,就调⽤⼀次刷盘操作,其实也是以10毫秒的间隔去调⽤刷盘操作。⽽异步刷盘,则是指broker每隔⼀个固定的时间,才去调⽤⼀次刷盘操作。异步刷盘性能更稳定,但是会有丢消息的可能。⽽同步刷盘的消息安全性就更⾼,但是操作系统的IO压⼒就会⾮常⼤。从理论上来说,也还是会有⾮正常断电造成消息丢失的可能,甚⾄严格意义上来说,任何应⽤程序都不可能完全保证断电消息不丢失。
- 消费环节:消费状态确认机制。也就是消费者处理完消息后,需要给Broker⼀个响应,
表示消息被正常处理了。如果Broker端没有拿到这个响应,不管是因为Consumer没有拿到消息,还是Consumer处理完消息后没有给出相应,Broker都会认为消息没有处理成功。之后,Broker就会向Consumer重复投递这些没有处理成功的消息。RocketMQ是把消费失败的消息方式重试队列里面重新推送。如果Consumer给Broker返回了消费成功,用异步的方式去处理消息,这种情况可能会丢失消息。 - 如果MQ服务全挂掉了,想继续保持业务运行,又想不丢失消息,通常的做法是设计⼀个降级缓存。Producer往MQ发消息失败了,就往降级缓存中写,然后,依然正常去进⾏后续的业务。此时,再启动⼀个线程,不断尝试将降级缓存中的数据往MQ中发送。这样,⾄少当MQ服务恢复过来后,这些消息可以尽快进⼊到MQ中,继续往下游Conusmer推送,⽽不⾄于造成消息丢失。
消息的顺序性
通常讨论MQ的消息顺序性,其实是在强调局部有序,⽽不是全局有序。比如某个业务流程的消息有序。
RocketMQ的顺序消费机制:Producer通过设置MessageQueueSelector将⼀组有序的消息写⼊到同⼀个MessageQueue中。Consumer使用Orderly的消费方式每个线程集中从⼀个MessageQueue中拿取消息。
消息幂等
- 消息的重复发送:Producer发送消息时,如果采⽤发送者确认的机制,那么Producer发送消息会等待Broker的响应。如果没有收到Broker的响应,Producer就会发起重试。但是,Producer没有收到Broker的响应,也有可能是Broker已经正常处理完了消息,只不过发给Producer的响应请求丢失了。这时候Producer再次发起消息重试,就有可能造成消息重复。RocketMQ的处理⽅式,是会在发送消息时,给每条消息分配⼀个唯⼀的ID。重试时broker可以根据msgId判断是否已经处理。
- 消息的重复消费:RocketMQ是通过消费者的响应机制来推进offset的,如果consumer从broker上获取了消息,正常处理之后,他要往broker返回⼀个响应,但是如果⽹络出现波动,consumer从broker上拿取到了消息,但是等到他向broker发响应时,发⽣⽹络波动,这个响应丢失了,那么就会造成消息的重复消费。因为broker没有收到响应,就会向这个Consumer所在的Group重复投递消息。Comsumer端可以使用msgId进行唯一性控制,或者更严格的可以使⽤message的key属性写入业务的唯一属性来控制。
消息积压的处理
产⽣消息积压的根本原因还是Consumer处理消息的效率低于消息产生的速度。所以处理消息积压第一个可以优化Consumer处理消息的效率,第二个可以增加Consumer实例的个数。但是增加Consumer实例个数是有上限的。RocketMQ中一个Topic下的MessageQueue只能由一个消费者绑定,因此如果消费者实例数最多增加到等于MessageQueue的个数。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有MessageQueue去消费的,因此也就没有⽤了。
如果要快速处理积压的消息,可以创建⼀个新的Topic,配置⾜够多的MessageQueue。
然后把Consumer实例的Topic转向新的Topic,并紧急上线⼀组新的消费者,只负责消费旧Topic中的消息,并转存到新的Topic中。这个速度明显会⽐普通Consumer处理业务逻辑要快很多。然后在新的Topic上,就可以通过添加消费者个数来提⾼消费速度了。之后再根据情况考虑是否要恢复成正常情况。