延迟任务的11种实现方式(下)!!

接上文:

Redisson的RDelayedQueue

Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能。

先来个demo

引入pom

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.1</version>
</dependency>

封装了一个RedissonDelayQueue类

@Component
@Slf4j
public class RedissonDelayQueue {private RedissonClient redissonClient;private RDelayedQueue<String> delayQueue;private RBlockingQueue<String> blockingQueue;@PostConstructpublic void init() {initDelayQueue();startDelayQueueConsumer();}private void initDelayQueue() {Config config = new Config();SingleServerConfig serverConfig = config.useSingleServer();serverConfig.setAddress("redis://localhost:6379");redissonClient = Redisson.create(config);blockingQueue = redissonClient.getBlockingQueue("SANYOU");delayQueue = redissonClient.getDelayedQueue(blockingQueue);}private void startDelayQueueConsumer() {new Thread(() -> {while (true) {try {String task = blockingQueue.take();log.info("接收到延迟任务:{}", task);} catch (Exception e) {e.printStackTrace();}}}, "SANYOU-Consumer").start();}public void offerTask(String task, long seconds) {log.info("添加延迟任务:{} 延迟时间:{}s", task, seconds);delayQueue.offer(task, seconds, TimeUnit.SECONDS);}}

这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象,传入的队列名字叫SANYOU,这个名字无所谓。

当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务。

添加任务的时候是通过RDelayedQueue的offer方法添加的。

controller类,通过接口添加任务,延迟时间为5s

@RestController
public class RedissonDelayQueueController {@Resourceprivate RedissonDelayQueue redissonDelayQueue;@GetMapping("/add")public void addTask(@RequestParam("task") String task) {redissonDelayQueue.offerTask(task, 5);}}

启动项目,在浏览器输入如下连接,添加任务

http://localhost:8080/add?task=sanyou

静静等待5s,成功获取到任务。

图片

实现原理

如下是Redisson延迟队列的实现原理

图片

SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀。

  • redisson_delay_queue_timeout:SANYOU,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要

  • redisson_delay_queue:SANYOU,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用。。

  • SANYOU,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的

  • redisson_delay_queue_channel:SANYOU,是一个channel,用来通知客户端开启一个延迟任务

任务提交的时候,Redisson会将任务放到redisson_delay_queue_timeout:SANYOU中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳

Redisson客户端内部通过监听redisson_delay_queue_channel:SANYOU这个channel来提交一个延迟任务,这个延迟任务能够保证将redisson_delay_queue_timeout:SANYOU中到了延迟时间的任务从redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU这个目标队列中。

于是消费者就可以从SANYOU这个目标队列获取到延迟任务了。

所以从这可以看出,Redisson的延迟任务的实现跟前面说的MQ的实现都是殊途同归,最开始任务放到中间的一个地方,叫做redisson_delay_queue_timeout:SANYOU,然后会开启一个类似于定时任务的一个东西,去判断这个中间地方的消息是否到了延迟时间,到了再放到最终的目标的队列供消费者消费。

Redisson的这种实现方式比监听Redis过期key的实现方式更加可靠,因为消息都存在list和sorted set数据类型中,所以消息很少丢。

上述说的两种Redis的方案更详细的介绍,可以查看我之前写的用Redis实现延迟队列,我研究了两种方案,发现并不简单这篇文章。

Netty的HashedWheelTimer

先来个demo
@Slf4j
public class NettyHashedWheelTimerDemo {public static void main(String[] args) {HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);timer.start();log.info("提交延迟任务");timer.newTimeout(timeout -> log.info("执行延迟任务"), 5, TimeUnit.SECONDS);}}

测试结果

图片

实现原理

图片

如图,时间轮会被分成很多格子(上述demo中的8就代表了8个格子),一个格子代表一段时间(上述demo中的100就代表一个格子是100ms),所以上述demo中,每800ms会走一圈。

当任务提交的之后,会根据任务的到期时间进行hash取模,计算出这个任务的执行时间所在具体的格子,然后添加到这个格子中,通过如果这个格子有多个任务,会用链表来保存。所以这个任务的添加有点像HashMap储存元素的原理。

HashedWheelTimer内部会开启一个线程,轮询每个格子,找到到了延迟时间的任务,然后执行。

由于HashedWheelTimer也是单线程来处理任务,所以跟Timer一样,长时间运行的任务会导致其他任务的延时处理。

前面Redisson中提到的客户端延迟任务就是基于Netty的HashedWheelTimer实现的。

Hutool的SystemTimer

Hutool工具类也提供了延迟任务的实现SystemTimer

demo
@Slf4j
public class SystemTimerDemo {public static void main(String[] args) {SystemTimer systemTimer = new SystemTimer();systemTimer.start();log.info("提交延迟任务");systemTimer.addTask(new TimerTask(() -> log.info("执行延迟任务"), 5000));}}

执行结果

图片

Hutool底层其实也用到了时间轮。

Qurtaz

Qurtaz是一款开源作业调度框架,基于Qurtaz提供的api也可以实现延迟任务的功能。

demo

依赖

<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version>
</dependency>

SanYouJob实现Job接口,当任务到达执行时间的时候会调用execute的实现,从context可以获取到任务的内容

@Slf4j
public class SanYouJob implements Job {@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {JobDetail jobDetail = context.getJobDetail();JobDataMap jobDataMap = jobDetail.getJobDataMap();log.info("获取到延迟任务:{}", jobDataMap.get("delayTask"));}
}

测试类

public class QuartzDemo {public static void main(String[] args) throws SchedulerException, InterruptedException {// 1.创建Scheduler的工厂SchedulerFactory sf = new StdSchedulerFactory();// 2.从工厂中获取调度器实例Scheduler scheduler = sf.getScheduler();// 6.启动 调度器scheduler.start();// 3.创建JobDetail,Job类型就是上面说的SanYouJobJobDetail jb = JobBuilder.newJob(SanYouJob.class).usingJobData("delayTask", "这是一个延迟任务").build();// 4.创建TriggerTrigger t = TriggerBuilder.newTrigger()//任务的触发时间就是延迟任务到的延迟时间.startAt(DateUtil.offsetSecond(new Date(), 5)).build();// 5.注册任务和定时器log.info("提交延迟任务");scheduler.scheduleJob(jb, t);}
}

执行结果:

图片

实现原理

核心组件

  • Job:表示一个任务,execute方法的实现是对任务的执行逻辑

  • JobDetail:任务的详情,可以设置任务需要的参数等信息

  • Trigger:触发器,是用来触发业务的执行,比如说指定5s后触发任务,那么任务就会在5s后触发

  • Scheduler:调度器,内部可以注册多个任务和对应任务的触发器,之后会调度任务的执行

图片

启动的时候会开启一个QuartzSchedulerThread调度线程,这个线程会去判断任务是否到了执行时间,到的话就将任务交给任务线程池去执行。

无限轮询延迟任务

无限轮询的意思就是开启一个线程不停的去轮询任务,当这些任务到达了延迟时间,那么就执行任务。

demo
@Slf4j
public class PollingTaskDemo {private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();public static void main(String[] args) {new Thread(() -> {while (true) {try {for (DelayTask delayTask : DELAY_TASK_LIST) {if (delayTask.triggerTime <= System.currentTimeMillis()) {log.info("处理延迟任务:{}", delayTask.taskContent);DELAY_TASK_LIST.remove(delayTask);}}TimeUnit.MILLISECONDS.sleep(100);} catch (Exception e) {}}}).start();log.info("提交延迟任务");DELAY_TASK_LIST.add(new DelayTask("三友的java日记", 5L));}@Getter@Setterpublic static class DelayTask {private final String taskContent;private final Long triggerTime;public DelayTask(String taskContent, Long delayTime) {this.taskContent = taskContent;this.triggerTime = System.currentTimeMillis() + delayTime * 1000;}}}

任务可以存在数据库又或者是内存,看具体的需求,这里我为了简单就放在内存里了。

执行结果:

图片

这种操作简单,但是就是效率低下,每次都得遍历所有的任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/70397.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BS5852英国家具防火安全条款主要包括哪几个方面呢?

什么是BS5852检测&#xff1f; BS5852是英国针对家用家具的强制性安全要求&#xff0c;主要测试家具在受到燃烧香烟和火柴等火源时的可燃性。这个标准通常分为四个部分进行测试&#xff0c;但实际应用中主要测试第一部分和第二部分&#xff0c;包括烟头测试和利用乙炔火焰模拟…

如何使用Spark SQL进行复杂的数据查询和分析

使用Spark SQL进行复杂的数据查询和分析是一个涉及多个步骤和技术的过程。以下是如何使用Spark SQL进行复杂数据查询和分析的详细指南&#xff1a; 一、准备阶段 环境搭建&#xff1a; 确保已经安装并配置好了Apache Spark环境。准备好数据源&#xff0c;可以是CSV文件、JSON…

iOS事件传递和响应

背景 对于身处中小公司且业务不怎么复杂的程序员来说&#xff0c;很多技术不常用&#xff0c;你可能看过很多遍也都大致了解&#xff0c;但是实际让你讲&#xff0c;不一定讲的清楚。你可能说&#xff0c;我以独当一面&#xff0c;应对自如了&#xff0c;但是技术的知识甚多&a…

FFmpeg 源码编译安装

参考&#xff1a; https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu Linux (Ubuntu) 下载 FFmpeg 源码&#xff0c;并将其解压&#xff0c;这里我将它放在 ~/ffmpeg_source 目录下&#xff1b; cd ~/ffmpeg_sources wget -O ffmpeg-snapshot.tar.bz2 https://ffmpeg.org…

【pytest】编写自动化测试用例命名规范README

API_autoTest 项目介绍 1. pytest命名规范 测试文件&#xff1a; 文件名需要以 test_ 开头或者以 _test.py 结尾。例如&#xff0c;test_login.py、user_management_test.py 这样的命名方式&#xff0c;pytest 能够自动识别并将其作为测试文件来执行其中的测试用例。 测试类…

Windows桌面系统管理5:Windows 10操作系统注册表

Windows桌面系统管理0&#xff1a;总目录-CSDN博客 Windows桌面系统管理1&#xff1a;计算机硬件组成及组装-CSDN博客 Windows桌面系统管理2&#xff1a;VMware Workstation使用和管理-CSDN博客 Windows桌面系统管理3&#xff1a;Windows 10操作系统部署与使用-CSDN博客 Wi…

llama.cpp将sensor格式的大模型转化为gguf格式

前言 ollama本地只能导入gguf格式的大模型文件&#xff0c;将safetensors 文件转化为gguf格式。需要使用 llama.cpp 这个开源工具。以下是使用 llama.cpp 转换 .safetensors 格式模型到 .gguf 格式的详细步骤: 1. 首先克隆并编译 llama.cpp: 克隆项目 git clone https://gi…

【运维】源码编译安装cmake

背景&#xff1a; 已经在本地源码编译安装gcc/g&#xff0c;现在源码安装cmake 下载源码 下载地址&#xff1a;CMake - Upgrade Your Software Build System 安装步骤&#xff1a; ./bootstrap --prefix/usr/local/cmake make make install 错误处理 1、提示找不到libmpc.…

如何通过AI优化敏捷开发中的任务管理与分配?

用ChatGPT做软件测试 在现代软件开发中&#xff0c;敏捷开发&#xff08;Agile&#xff09;已成为一种广泛采用的开发方法论&#xff0c;其核心思想是强调快速响应变化、与客户的持续沟通以及团队协作的高效性。然而&#xff0c;随着项目规模的不断扩大&#xff0c;敏捷开发面临…

petalinux高版本设置自动登录和开机自启动配置

petalinux-config -c rootfs 依次选择 Image Features -> serial-autologin-root 这是配置 进来就是root权限 创建并安装名为 myapp-init 的新建应用程序 petalinux-create -t apps --template install -n myapp-init --enable 编辑 project-spec/meta-user/recipes-…

STM32 USB 设备的描述信息作用

在使用 STM32 USB 功能时 usbd_desc.c 文件中定义了一段宏&#xff0c;以下解每段宏的用途。 #define USBD_VID 1155 #define USBD_LANGID_STRING 1033 #define USBD_MANUFACTURER_STRING "STMicroelectronics" #define US…

React通用登录/注销功能实现方案(基于shadcn/ui)

React通用登录/注销功能实现方案&#xff08;基于shadcn/ui&#xff09; 一、功能需求分析二、通用功能封装1. 通用登录表单组件2. 认证Hook封装 三、功能使用示例1. 登录页面实现2. 用户菜单实现 四、路由保护实现五、方案优势 一、功能需求分析 需要实现以下核心功能&#x…

jEasyUI 创建学校课程表

jEasyUI 创建学校课程表 引言 随着信息技术的飞速发展,教育行业也迎来了数字化转型的浪潮。学校课程表的创建和管理作为教育信息化的重要组成部分,其效率和准确性直接影响到学校的教学秩序。jEasyUI,作为一款优秀的开源UI框架,凭借其易用性、灵活性和丰富的组件,成为了许…

Linux 内核中的 container_of 宏:以 ipoib_rx_poll_rss 函数为例

在 Linux 内核编程中,container_of 是一个非常实用的宏,主要用于通过结构体的成员指针来获取包含该成员的整个结构体的指针。rx_ring = container_of(napi, struct ipoib_recv_ring, napi); 在代码中就是利用了这个宏,下面我们详细分析它的作用和工作原理。 背景知识 在内…

【论文学习】RVS-FDSC:一种基于四方向条带卷积的视网膜血管分割方法以增强特征提取

写在前面&#xff1a;本博客仅作记录学习之用&#xff0c;部分图片来自网络&#xff0c;如需引用请注明出处&#xff0c;同时如有侵犯您的权益&#xff0c;请联系删除&#xff01; 文章目录 前言论文论文内容RSC模块MSPF2 模块RPDA模块 实验效果 总结互动致谢参考往期回顾 前言…

蓝桥杯篇---IAP15F2K61S2矩阵键盘

文章目录 前言简介矩阵键盘的工作原理1.行扫描2.检测列状态3.按键识别 硬件连接1.行线2.列线 矩阵键盘使用步骤1.初始化IO口2.扫描键盘3.消抖处理4.按键识别 示例代码&#xff1a;4x4矩阵键盘扫描示例代码&#xff1a;优化后的矩阵键盘扫描注意事项1.消抖处理2.扫描频率3.IO口配…

【ISO 14229-1:2023 UDS诊断(ECU复位0x11服务)测试用例CAPL代码全解析⑲】

ISO 14229-1:2023 UDS诊断【ECU复位0x11服务】_TestCase19 作者&#xff1a;车端域控测试工程师 更新日期&#xff1a;2025年02月19日 关键词&#xff1a;UDS诊断协议、ECU复位服务、0x11服务、ISO 14229-1:2023 TC11-019测试用例 用例ID测试场景验证要点参考条款预期结果TC…

Vue 3 30天精进之旅:Day 29 - 项目实战

在学习了近一个月的Vue 3知识后&#xff0c;今天是我们学习旅程的第29天。在这一天&#xff0c;我们将专注于实践&#xff0c;通过一个小型项目来巩固之前的学习成果&#xff0c;并为之后的展示做好准备。 一、项目目标 我们将构建一个简单的个人博客应用&#xff0c;具备以下…

Windows Docker运行Implicit-SVSDF-Planner

Windows Docker运行GitHub - ZJU-FAST-Lab/Implicit-SVSDF-Planner: [SIGGRAPH 2024 & TOG] 1. 设置环境 我将项目git clone在D:/Github目录中。 下载ubuntu20.04 noetic镜像 docker pull osrf/ros:noetic-desktop-full-focal 启动容器&#xff0c;挂载主机的D:/Github文…

PHP 安全与加密:守护 Web 应用的基石

PHP 学习资料 PHP 学习资料 PHP 学习资料 在当今数字化时代&#xff0c;Web 应用无处不在&#xff0c;而 PHP 作为一种广泛使用的服务器端脚本语言&#xff0c;承载着无数网站和应用的核心逻辑。然而&#xff0c;随着网络攻击手段日益复杂&#xff0c;PHP 应用面临着诸多安全…