一、消息队列介绍
二、基于List结构模拟消息队列
总结:
三、基于PubSub实现消息队列
(1)PubSub介绍
PubSub是publish与subscribe两个单词的缩写,见明知意,PubSub就是发布与订阅的意思。
可以到Redis官网查看通配符的书写规则:
(2)使用演示
- 当输入subscribe或psubscribe后就会进入阻塞状态,等待获取消息队列中的消息
- 使用publish发布消息后两个频道都能成功获取到消息
- 有时仅有符合通配符的消息才会被指定频道接收到
(3)总结
四、Stream的单消费模式
Stream是一种数据类型,用于做数据存储,所以是支持数据持久化的,对数据安全有保障。
(1)发送与读取消息
①发送消息
key指的是队列的名称,需要向哪个队列中去发送消息;
在Stream数据结构的底层会基于消息id去形成树状结构,方便将来去查询消息,若想要去自定指定id就必须要严格按照"时间戳-递增数字"的格式,假如在同一毫秒内插入多条消息,那么后面的数字就会自动递增,用于区分消息;
field-value就是实际存储到消息队列当中的消息体,一个消息id可以去存放多个消息体键值对。
②读取消息
使用示例:
在队列中存入一条消息后发现可以在多个频道中多次读取该条消息,若想读取还未读取过的最新消息则需要使用 $符,同样的也可以去阻塞等待最新消息
等到队列中存入一条新消息时,频道就可以立刻读取到
(2)java代码中的编写方式
(3)总结
五、Stream的消费者组模式
(1)命令介绍
①XGROUP类型命令
②XREADGROUP类型命令
③XACK及XPENDING
- XACK消息确认
- XPENDING读取pending-list中消息
空闲时间指的是在获取到消息后直至确认消息前的时间间隔,假设我们赋予一个值为5000,也就代表会取出空闲时间超过5000ms以上的消息。
假设现在再去读取一条消息v6,但是并没有对其进行确认,那么该条消息就会被存储到该消费者的pending-list中,此时使用 XPENDING命令就可以查询到未被确认的该条消息v6,此处id范围的 “- +” 代表所有消息。
也可以通过XREADGROUP命令来读取pending-list中的第一条消息
(2)java中的实现方式
(3)总结
三种消息队列的比较:
Stream消息队列的缺点:
虽然Stream支持消息持久化,但是这是依赖于Redis本身持久化的,这往往并不能保证万无一失,是有丢失风险的;而且目前这种消息确认机制只支持消费者确认机制,并不支持生产者确认机制,如果是生产者在发消息的过程中发生丢失,那就无法保证消息的安全性了。
若公司业务较为庞大,对于消息队列的要求更加严格,那么更建议去使用专业的MQ消息队列。
六、基于Stream消息队列实现异步秒杀
①创建消息队列
队列只需要在初始化时创建一次即可长久使用,所以不需要java代码来完成
②编写Lua脚本
③java业务逻辑改造
- 判断秒杀资格逻辑改造
- 改造线程任务执行流程
不再需要以前注入的阻塞队列
修改run方法
定义handlePendingList方法用于处理pending-list中的未被确认的消息