一口气说出 6 种延时队列的实现方法,面试官满意的笑了

这是我的第 193 期分享

作者 | 程序员内点事

来源 | 程序员内点事(ID:chegnxy-nds) 

分享 | Java中文社群(ID:javacn666)

五一期间原计划是写两篇文章,看一本技术类书籍,结果这五天由于自律性过于差,禁不住各种诱惑,我连电脑都没打开过,计划完美宣告失败。所以在这能看出和大佬之间的差距,人家没白没夜的更文,比你优秀的人比你更努力,难以望其项背,真是让我自愧不如。

知耻而后勇,这不逼着自己又学起来了,个人比较喜欢一些实践类的东西,既学习到知识又能让技术落地,能搞出个demo最好,本来不知道该分享什么主题,好在最近项目紧急招人中,而我有幸做了回面试官,就给大家整理分享一道面试题:“如何实现延时队列?”。

下边会介绍多种实现延时队列的思路,文末提供有几种实现方式的 github地址。其实哪种方式都没有绝对的好与坏,只是看把它用在什么业务场景中,技术这东西没有最好的只有最合适的。

一、延时队列的应用

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

延时队列在项目中的应用还是比较多的,尤其像电商类平台:

1、订单成功后,在30分钟内没有支付,自动取消订单

2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。

3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存

4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

。。。。

上边的这些场景都可以应用延时队列解决。

二、延时队列的实现

我个人一直秉承的观点:工作上能用JDK自带API实现的功能,就不要轻易自己重复造轮子,或者引入三方中间件。一方面自己封装很容易出问题(大佬除外),再加上调试验证产生许多不必要的工作量;另一方面一旦接入三方的中间件就会让系统复杂度成倍的增加,维护成本也大大的增加。

1、DelayQueue 延时队列

JDK 中提供了一组实现延迟队列的API,位于Java.util.concurrent包下DelayQueue

DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆(不知道的自行了解哈)来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。

先简单实现一下看看效果,添加三个order入队DelayQueue,分别设置订单在当前时间的5秒10秒15秒后取消。

要实现DelayQueue延时队列,队中元素要implements Delayed 接口,这哥接口里只有一个getDelay方法,用于设置延期时间。Order类中compareTo方法负责对队列中的元素进行排序。

public class Order implements Delayed {/*** 延迟时间*/@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")private long time;String name;public Order(String name, long time, TimeUnit unit) {this.name = name;this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);}@Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}@Overridepublic int compareTo(Delayed o) {Order Order = (Order) o;long diff = this.time - Order.time;if (diff <= 0) {return -1;} else {return 1;}}
}

DelayQueueput方法是线程安全的,因为put方法内部使用了ReentrantLock锁进行线程同步。DelayQueue还提供了两种出队的方法 poll()take()poll() 为非阻塞获取,没有到期的元素直接返回null;take() 阻塞方式获取,没有到期的元素线程将会等待。

public class DelayQueueDemo {public static void main(String[] args) throws InterruptedException {Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);DelayQueue<Order> delayQueue = new DelayQueue<>();delayQueue.put(Order1);delayQueue.put(Order2);delayQueue.put(Order3);System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));while (delayQueue.size() != 0) {/*** 取队列头部元素是否过期*/Order task = delayQueue.poll();if (task != null) {System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}Thread.sleep(1000);}}
}

上边只是简单的实现入队与出队的操作,实际开发中会有专门的线程,负责消息的入队与消费。

执行后看到结果如下,Order1Order2Order3 分别在 5秒10秒15秒后被执行,至此就用DelayQueue实现了延时队列。

订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}
2、Quartz 定时任务

Quartz一款非常经典任务调度框架,在RedisRabbitMQ还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。定时任务它有一定的周期性,可能很多单子已经超时,但还没到达触发执行的时间点,那么就会造成订单处理的不够及时。

引入quartz框架依赖包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

在启动类中使用@EnableScheduling注解开启定时任务功能。

@EnableScheduling
@SpringBootApplication
public class DelayqueueApplication {public static void main(String[] args) {SpringApplication.run(DelayqueueApplication.class, args);}
}

编写一个定时任务,每个5秒执行一次。

@Component
public class QuartzDemo {//每隔五秒@Scheduled(cron = "0/5 * * * * ? ")public void process(){System.out.println("我是定时任务!");}
}
3、Redis sorted set

Redis的数据结构Zset,同样可以实现延迟队列的效果,主要利用它的score属性,redis通过score来为集合中的成员进行从小到大的排序。通过zadd命令向队列delayqueue 中添加元素,并设置score值表示元素过期的时间;向delayqueue 添加三个order1order2order3,分别是10秒20秒30秒后过期。

 zadd delayqueue 3 order3

消费端轮询队列delayqueue, 将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key

    /*** 消费消息*/public void pollOrderQueue() {while (true) {Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);String value = ((Tuple) set.toArray()[0]).getElement();int score = (int) ((Tuple) set.toArray()[0]).getScore();Calendar cal = Calendar.getInstance();int nowSecond = (int) (cal.getTimeInMillis() / 1000);if (nowSecond >= score) {jedis.zrem(DELAY_QUEUE, value);System.out.println(sdf.format(new Date()) + " removed key:" + value);}if (jedis.zcard(DELAY_QUEUE) <= 0) {System.out.println(sdf.format(new Date()) + " zset empty ");return;}Thread.sleep(1000);}}

我们看到执行结果符合预期

2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty 
4、Redis 过期回调

Rediskey过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。

修改redis.conf文件开启notify-keyspace-events Ex

notify-keyspace-events Ex

Redis监听配置,注入Bean RedisMessageListenerContainer

@Configuration
public class RedisListenerConfig {@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}

编写Redis过期回调监听方法,必须继承KeyExpirationEventMessageListener ,有点类似于MQ的消息监听。

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overridepublic void onMessage(Message message, byte[] pattern) {String expiredKey = message.toString();System.out.println("监听到key:" + expiredKey + "已过期");}
}

到这代码就编写完成,非常的简单,接下来测试一下效果,在redis-cli客户端添加一个key 并给定3s的过期时间。

 set xiaofu 123 ex 3

在控制台成功监听到了这个过期的key

监听到过期的key为:xiaofu
5、RabbitMQ 延时队列

利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTLDXL这两个属性间接实现的。

先来认识一下 TTLDXL两个概念:

Time To Live(TTL) :

TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ 可以从两种维度设置消息过期时间,分别是队列消息本身

  • 设置队列过期时间,那么队列中所有消息都具有相同的过期时间。

  • 设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。

如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。

Dead Letter ExchangesDLX

DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQQueue(队列)可以配置两个参数x-dead-letter-exchangex-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。

x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。

x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。

队列出现Dead Letter的情况有:

  • 消息或者队列的TTL过期

  • 队列达到最大长度

  • 消息被消费端拒绝(basic.reject or basic.nack)

下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息A0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟,当到达30分钟后订单消息A0001成为了Dead Letter(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。

发送消息时指定消息延迟的时间

public void send(String delayTimes) {amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {// 设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;});}
}

设置延迟队列出现死信后的转发规则

/*** 延时队列*/@Bean(name = "order.delay.queue")public Queue getMessageQueue() {return QueueBuilder.durable(RabbitConstant.DEAD_LETTER_QUEUE)// 配置到期后转发的交换.withArgument("x-dead-letter-exchange", "order.close.exchange")// 配置到期后转发的路由键.withArgument("x-dead-letter-routing-key", "order.close.queue").build();}
6、时间轮

前边几种延时队列的实现方法相对简单,比较容易理解,时间轮算法就稍微有点抽象了。kafkanetty都有基于时间轮算法实现延时队列,下边主要实践Netty的延时队列讲一下时间轮是什么原理。

先来看一张时间轮的原理图,解读一下时间轮的几个基本概念wheel :时间轮,图中的圆盘可以看作是钟表的刻度。比如一圈round 长度为24秒,刻度数为 8,那么每一个刻度表示 3秒。那么时间精度就是  3秒。时间长度 / 刻度数值越大,精度越大。

当添加一个定时、延时任务A,假如会延迟25秒后才会执行,可时间轮一圈round 的长度才24秒,那么此时会根据时间轮长度和刻度得到一个圈数 round和对应的指针位置 index,也是就任务A会绕一圈指向0格子上,此时时间轮会记录该任务的roundindex信息。当round=0,index=0 ,指针指向0格子  任务A并不会执行,因为 round=0不满足要求。

所以每一个格子代表的是一些时间,比如1秒25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap的数据有些类似。

Netty构建延时队列主要用HashedWheelTimerHashedWheelTimer底层数据结构依然是使用DelayedQueue,只是采用时间轮的算法来实现。

下面我们用Netty 简单实现延时队列,HashedWheelTimer构造函数比较多,解释一下各参数的含义。

  • ThreadFactory :表示用于生成工作线程,一般采用线程池;

  • tickDurationunit:每格的时间间隔,默认100ms;

  • ticksPerWheel:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化hash值的计算。

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {this(threadFactory, tickDuration, unit, ticksPerWheel, true);}
  • TimerTask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。

  • Timeout:一个定时任务提交到Timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。

  • Timer:是HashedWheelTimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。

public class NettyDelayQueue {public static void main(String[] args) {final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);//定时任务TimerTask task1 = new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order1  5s 后执行 ");timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册}};timer.newTimeout(task1, 5, TimeUnit.SECONDS);TimerTask task2 = new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order2  10s 后执行");timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册}};timer.newTimeout(task2, 10, TimeUnit.SECONDS);//延迟任务timer.newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order3  15s 后执行一次");}}, 15, TimeUnit.SECONDS);}
}

从执行的结果看,order3order3延时任务只执行了一次,而order2order1为定时任务,按照不同的周期重复执行。

order1  5s 后执行 
order2  10s 后执行
order3  15s 后执行一次
order1  5s 后执行 
order2  10s 后执行

总结

为了让大家更容易理解,上边的代码写的都比较简单粗糙,几种实现方式的demo已经都提交到github 地址:https://github.com/chengxy-nds/delayqueue,感兴趣的小伙伴可以下载跑一跑。

这篇文章肝了挺长时间,写作一点也不比上班干活轻松,查证资料反复验证demo的可行性,搭建各种RabbitMQRedis环境,只想说我太难了!

可能写的有不够完善的地方,如哪里有错误或者不明了的,欢迎大家踊跃指正!!!

最后

原创不易,码字不易,点个再看吧~

史上最全的延迟任务实现方式汇总!附代码(强烈推荐)

《大厂内部资料》Redis 性能优化的 13 条军规!全网首发

关注公众号发送”进群“,老王拉你进读者群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/546139.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一些c语言图形库

想编程绘制一些简单的图形&#xff0c;有不少的图形库可以选择&#xff1a; &#xff08;1&#xff09;BGI图形库&#xff1a;即turbo c所带的图形库。misaki 在vc&#xff08;vc6,vc2008,vc2010等)下重写了该库&#xff0c;名为EGE 。另一个类似的vc(vc6,vc2008,vc2010)下的库…

acl 服务器编程框架特点介绍

2019独角兽企业重金招聘Python工程师标准>>> acl 中服务器框架模块是一个非常重要的模块&#xff0c;使用该模块技术人员可以快速地写出稳定、安全、高效的网络服务应用&#xff0c;该模块主要来源于著名的邮件服务器程序 (Postfix) 中的 master 模块&#xff0c;为…

turbo c相关文档

无意中在网上找到的turbo c 2.0相关文档&#xff0c;有reference guide 和user guide.下载地址见&#xff08;镜像一 &#xff0c;镜像二 &#xff0c;镜像三 &#xff0c;镜像四 &#xff09;。这些网站还有很多其他各类软件相关文档&#xff0c;感兴趣的可以自己看看。另外&a…

一个类可以有一个接口,接口可以有一个Java类吗?

In the very first step, we will see can a class have an interface in Java? 在第一步中&#xff0c;我们将看到类可以在Java中具有接口吗&#xff1f; Yes, it is possible to define an interface inside the class. 是的&#xff0c;可以在类内部定义接口。 The interf…

人人都能看懂的 6 种限流实现方案!(纯干货)

这是我的第 195 期分享作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;为了上班方便&#xff0c;去年我把自己在北郊的房子租出去了&#xff0c;搬到了南郊&#xff0c;这样…

測试新浪微博@小冰 为代码机器人的一些方法

微软的微信小冰被腾讯封杀之后,如今移民到了新浪微博; 小冰 这里贴一些眼下有效的用来识别是这是"机器"而不是有正常人类智商的代码的方法: 1. 在正常的文字中夹杂其他符号,确保不存在有意义的连续的词汇,人眼能够分辨,机器不知所云而会露馅: 比方: ^^^小v冰^^^-…

nethack

nethack是一款开源游戏&#xff0c;不支持声音和华丽的图像&#xff0c;却被称为最好玩的游戏之一。 相关链接&#xff1a; nethack主页 nethack下载地址 nethack wiki nethack贴吧 beginners guide to nethack sources 其他一些有趣的链接&#xff1a; games in c with s…

秒建一个后台管理系统?用这5个开源免费的Java项目就够了

这是我的第 196 期分享作者 | Guide来源 | JavaGuide&#xff08;ID&#xff1a;JavaGuide&#xff09; 分享 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;大家好&#xff0c;我是 Guide 哥&#xff0c;一个三观比主角还正的技术人。今天推荐几个 Java 项目…

读《白帽子讲Web安全》之客户端脚本安全(一)

2019独角兽企业重金招聘Python工程师标准>>> 【第2章 浏览器安全】 1、同源策略&#xff08;Same Origin Policy&#xff09;是一种约定&#xff0c;它是浏览器最核心也最基本的安全功能。 浏览器的同源策略&#xff0c;限制了来自不同源的“document”或脚本&…

RocketMQ一行代码造成消息发送失败

这是我的第 198 期分享作者 | 丁威来源 | 中间件兴趣圈&#xff08;ID&#xff1a;dingwpmz_zjj&#xff09;分享 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;1、问题现象首先接到项目反馈使用 RocketMQ 会出现如下错误&#xff1a;错误信息关键点&#xf…

生命游戏(game of life)

生命游戏 &#xff08;game of life )是一款非常著名的游戏。它包括一个二维 矩形世界&#xff0c;这个世界中的每个方格居住着一个活着的或死了的细胞。一个细胞在下一个时刻生死取决于相邻八个方格中活着的或死了的细胞的数量。如果相邻方格活着的细胞数量过多&#xff0c;这…

setpriority_Java Thread类的最终void setPriority(int priority)方法(带示例)

setpriority线程类最终void setPriority(int priority) (Thread Class final void setPriority(int priority)) This method is available in package java.lang.Thread.setPriority(int priority). 软件包java.lang.Thread.setPriority(int priority)中提供了此方法。 This me…

汇编级UART串口初始化与打印

用于新PCB板调试开发&#xff0c;在系统最开始&#xff08;内存初始化之前&#xff09;&#xff0c;尽快打印字符&#xff0c;验证CPU是否正常启动。 以freescale QorIQ 处理器兼容的UART为例&#xff0c;符合16550串口标准&#xff1a; /*UART DEBUG*/ /*#define CCSBAR_RESET…

Java 中的 String 有没有长度限制?

这是我的第 199 期分享作者 | Hollis来源 | Hollis&#xff08;ID&#xff1a;hollischuang&#xff09; 分享 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;关于String有没有长度限制的问题&#xff0c;我之前单独写过一篇文章分析过&#xff0c;最近我又抽…

c语言编程输入a是输出为a_C ++编程基本输入,输出,数据类型,声明能力倾向问题和解答...

c语言编程输入a是输出为aThis section contains C programming Basic Input, Output, Data types, Declaration etc Aptitude Questions and Answers with explanations. 本节包含C 编程的基本输入&#xff0c;输出&#xff0c;数据类型&#xff0c;声明等&#xff0c;以及有关…

关联数组(associative array)

关联数组&#xff08;associative array )是一种常用的抽象数据类型。它有很多别名&#xff0c;例如associative container , map , mapping , dictionary , finite map , table,index 等。它的特点是由一个关键字和其他各种属性组成的集合。典型的操作包括插入&#xff0c;删除…

开源 免费 java CMS - FreeCMS2.1 菜单管理

2019独角兽企业重金招聘Python工程师标准>>> 项目地址&#xff1a;http://www.freeteam.cn/ 菜单管理 FreeCMS在设计时定位于面向二次开发友好&#xff0c;所以FreeCMS提供了菜单管理功能&#xff0c;二次开发人员可以自由增加新的功能菜单到FreeCMS。 为了让后台…

本来想用“{{”秀一波,结果却导致了内存溢出!

这是我的第 200 期分享作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;生活中的尴尬无处不在&#xff0c;有时候你只是想简单的装一把&#xff0c;但某些“老同志”总是在不…

在Ruby中使用&运算符(new_array- arr&old_Array)创建数组实例

In the last articles, we have gone through many methods through which we can create Array Instances but you all must know that those all were Public class methods and now in the upcoming articles, we will be learning about Public instance methods. 在上一篇…

Run Length Encoding

游程编码 (Run Length Encoding ) 是一种简单的编码方法&#xff0c;通常用于控制论中对二值图像编码。ACM有一道题目就是关于该编码。见tzu 1149 或poj 1782 。虽然是简单题&#xff0c;我却花了好大功夫才搞定&#xff0c;功力还是不足阿。 程序代码如下&#xff1a; #incl…