使用 Java 实现一个简单且高效的任务调度框架

目录

一、任务调度系统概述

(一)任务调度的目标

(二)任务调度框架的关键组成

二、任务状态设计

(一)任务状态流转设计

(二)任务表设计(SQL)

三、单机任务调度实现

(一)获取待处理任务

(二)执行任务

代码实现(单线程版本)

(三)多线程提高吞吐量

四、使用阻塞队列解耦生产者-消费者

五、分布式任务调度

(一)分片ID(取模分片)

(二)中心化调度(使用 Redis)

六、结论


干货分享,感谢您的阅读!

在实际业务中,任务调度系统负责从任务队列中获取任务并执行。为了满足高吞吐、高可用、轻量级及可扩展性等需求,任务调度系统的设计必须具备灵活性、可伸缩性和容错性。

本文将展示如何使用 Java 实现一个简单且高效的任务调度框架,并深入探讨每个设计要点,包括任务状态管理、任务并发执行、分布式处理等内容。

一、任务调度系统概述

任务调度系统广泛应用于各种业务场景中,任务往往是异步执行的,需要管理任务的生命周期、处理任务的优先级、失败重试、任务超时等问题。

(一)任务调度的目标

  • 高吞吐量:任务处理速度需要尽可能快。

  • 高可用性:任务调度系统在遇到故障时能够恢复并继续处理任务。

  • 低延迟:任务提交后能迅速被处理。

  • 易于扩展:可以轻松应对任务量的增加,适应分布式环境。

(二)任务调度框架的关键组成

  • 任务状态管理:追踪任务的执行状态。

  • 任务执行策略:决定如何执行任务,包括单机和分布式执行策略。

  • 任务失败与重试机制:处理任务失败后如何重试。

  • 系统监控与报警:对任务执行情况进行实时监控,发现异常时报警。

二、任务状态设计

(一)任务状态流转设计

任务的状态管理是调度系统的核心。任务需要在生命周期内从一个状态流转到另一个状态。常见的任务状态有:

  • INIT:任务初始状态,表示任务已创建但尚未处理。

  • PROCESSING:任务正在处理中,标识任务已被调度但尚未完成。

  • SUCCESS:任务执行成功。

  • FAILED:任务执行失败。

  • RETRY:任务执行失败后需要重试。

任务状态流转的主要问题是如何避免任务的重复执行、如何保证任务在失败时的容错性和可靠性。

(二)任务表设计(SQL)

任务表记录了任务的当前状态及其他元数据,如任务类型、优先级、执行时间等。

CREATE TABLE task (task_id BIGINT AUTO_INCREMENT PRIMARY KEY,task_type VARCHAR(255) NOT NULL,status ENUM('INIT', 'PROCESSING', 'SUCCESS', 'FAILED', 'RETRY') DEFAULT 'INIT',priority INT DEFAULT 0,retry_count INT DEFAULT 0,create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,execute_time TIMESTAMP NULL
);

三、单机任务调度实现

(一)获取待处理任务

首先,我们需要从数据库中获取状态为 INIT 的任务,并按优先级和创建时间排序,优先处理高优先级和较早创建的任务。

SELECT * FROM task WHERE status = 'INIT' 
ORDER BY priority DESC, create_time ASC LIMIT 10;

(二)执行任务

当获取到待处理任务时,系统将执行这些任务,并在任务执行结束后更新任务的状态。对于任务失败的情况,我们可以设置重试机制,最多重试一定次数。

代码实现(单线程版本)
public class TaskScheduler {private static final int SLEEP_INTERVAL = 5000; // 每5秒检查一次任务private TaskRepository taskRepository; // 任务存储库(假设是数据库)public TaskScheduler(TaskRepository taskRepository) {this.taskRepository = taskRepository;}public void start() {while (true) {Task task = getPendingTask();if (task != null) {try {executeTask(task);updateTaskStatus(task.getTaskId(), "SUCCESS");} catch (Exception e) {updateTaskStatus(task.getTaskId(), "FAILED");}} else {try {Thread.sleep(SLEEP_INTERVAL); // 无任务,休眠} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}private Task getPendingTask() {return taskRepository.findFirstByStatusOrderByPriorityDescCreateTimeAsc("INIT");}private void executeTask(Task task) {// 任务执行逻辑,具体实现根据任务类型而定System.out.println("Executing task: " + task.getTaskId());}private void updateTaskStatus(Long taskId, String status) {taskRepository.updateStatus(taskId, status);}
}

(三)多线程提高吞吐量

在单线程模型下,任务处理速度较慢,我们可以通过使用线程池来提高并发性。线程池会创建一定数量的线程,并行执行多个任务,从而提升系统的吞吐量。

public class TaskScheduler {private static final int THREAD_POOL_SIZE = 10; // 线程池大小private static final int SLEEP_INTERVAL = 5000;private TaskRepository taskRepository;private ExecutorService executorService;public TaskScheduler(TaskRepository taskRepository) {this.taskRepository = taskRepository;this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);}public void start() {while (true) {Task task = getPendingTask();if (task != null) {executorService.submit(() -> {try {executeTask(task);updateTaskStatus(task.getTaskId(), "SUCCESS");} catch (Exception e) {updateTaskStatus(task.getTaskId(), "FAILED");}});} else {try {Thread.sleep(SLEEP_INTERVAL); // 无任务,休眠} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}private Task getPendingTask() {return taskRepository.findFirstByStatusOrderByPriorityDescCreateTimeAsc("INIT");}private void executeTask(Task task) {// 任务执行逻辑System.out.println("Executing task: " + task.getTaskId());}private void updateTaskStatus(Long taskId, String status) {taskRepository.updateStatus(taskId, status);}
}

四、使用阻塞队列解耦生产者-消费者

为了更好地解耦任务生产者和消费者,我们可以使用阻塞队列。生产者从任务表中拉取任务并放入队列,消费者从队列中取任务并执行。这种方式能够有效地隔离任务生产和消费的压力。

import java.util.concurrent.*;public class TaskScheduler {private static final int THREAD_POOL_SIZE = 10;private static final int SLEEP_INTERVAL = 5000;private static final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>();private TaskRepository taskRepository;private ExecutorService executorService;public TaskScheduler(TaskRepository taskRepository) {this.taskRepository = taskRepository;this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);}public void start() {// 启动消费者线程池for (int i = 0; i < THREAD_POOL_SIZE; i++) {executorService.submit(this::consumeTasks);}// 生产者循环,持续从数据库拉取任务并放入队列while (true) {Task task = getPendingTask();if (task != null) {try {taskQueue.put(task); // 将任务放入队列updateTaskStatus(task.getTaskId(), "PROCESSING");} catch (InterruptedException e) {Thread.currentThread().interrupt();}} else {try {Thread.sleep(SLEEP_INTERVAL); // 无任务,休眠} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}private void consumeTasks() {while (true) {try {Task task = taskQueue.take(); // 从队列中取任务executeTask(task);updateTaskStatus(task.getTaskId(), "SUCCESS");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}private Task getPendingTask() {return taskRepository.findFirstByStatusOrderByPriorityDescCreateTimeAsc("INIT");}private void executeTask(Task task) {// 任务执行逻辑System.out.println("Executing task: " + task.getTaskId());}private void updateTaskStatus(Long taskId, String status) {taskRepository.updateStatus(taskId, status);}
}

五、分布式任务调度

随着任务量的增加,单机版的任务调度框架可能会遇到性能瓶颈。此时,我们可以考虑分布式任务调度框架。

(一)分片ID(取模分片)

将任务根据 task_id 进行分片处理,每个机器只负责一个特定范围的任务。

例如,通过 task_id % N 来决定任务属于哪个分片,这样可以使每台机器只处理一部分任务。

(二)中心化调度(使用 Redis)

我们可以使用 Redis 来实现分布式任务调度。任务生产者将任务推送到 Redis 队列,任务消费者从队列中获取任务并执行。

import redis.clients.jedis.Jedis;public class DistributedTaskScheduler {private Jedis jedis;private ExecutorService executorService;public DistributedTaskScheduler(Jedis jedis) {this.jedis = jedis;this.executorService = Executors.newFixedThreadPool(10);}public void start() {while (true) {String taskJson = jedis.blpop(0, "taskQueue").get(1); // 阻塞式取任务Task task = deserializeTask(taskJson);executorService.submit(() -> {try {executeTask(task);updateTaskStatus(task.getTaskId(), "SUCCESS");} catch (Exception e) {updateTaskStatus(task.getTaskId(), "FAILED");}});}}private void executeTask(Task task) {// 任务执行逻辑System.out.println("Executing task: " + task.getTaskId());}private void updateTaskStatus(Long taskId, String status) {// 更新任务状态}private Task deserializeTask(String taskJson) {return new Gson().fromJson(taskJson, Task.class);}
}

六、结论

本文介绍了如何设计并实现一个简单的高吞吐、高可用的任务调度系统。我们从任务的状态管理、任务的并发执行、失败重试机制到分布式任务调度等方面进行了详细的探讨。通过合适的设计模式和技术栈,我们能够实现一个灵活且高效的任务调度系统,满足业务需求并具备良好的扩展性和容错性。

未来可以在此框架的基础上增加更多的功能,例如任务优先级、任务分片、动态调整线程池、任务监控与报警等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/81359.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于GPT 模板开发智能写作辅助应用

目录 项目说明 1. 项目背景 2. 项目目标 3. 功能需求 4. 技术选型 项目结构 详细代码实现 前端代码(client) client/src/main.js client/src/App.vue client/src/components/HistoryList.vue 后端代码(server) server/app.js server/routes/api.js server/mo…

linux 使用nginx部署next.js项目,并使用pm2守护进程

前言 本文基于&#xff1a;操作系统 CentOS Stream 8 使用工具&#xff1a;Xshell8、Xftp8 服务器基础环境&#xff1a; node - 请查看 linux安装node并全局可用pm2 - 请查看 linux安装pm2并全局可用nginx - 请查看 linux 使用nginx部署vue、react项目 所需服务器基础环境&…

使用huggingface_hub需要注意的事项

在安装huggingface_hub的时候要注意如果你的python是放在c盘下时记得用管理员模式命令行来安装huggingface_hub&#xff0c;否则安装过程会报错&#xff0c;之后也不会有huggingface-cli命令。 如果安装时因为没有用管理员权限安装而报错了&#xff0c;可以先卸载huggingface-…

Spring MVC @RequestHeader 注解怎么用?

我们来详细解释一下 Spring MVC 中的 RequestHeader 注解。 RequestHeader 注解的作用 RequestHeader 注解用于将 HTTP 请求中的**请求头&#xff08;Request Headers&#xff09;**的值绑定到 Controller 方法的参数上。 请求头是 HTTP 请求的一部分&#xff0c;包含了关于…

Rust 学习笔记:关于结构体的例题

Rust 学习笔记&#xff1a;关于结构体的例题 Rust 学习笔记&#xff1a;关于结构体的例题下面的程序能通过编译吗&#xff1f;下面的程序能通过编译吗&#xff1f;下面的程序能通过编译吗&#xff1f;哪种说法最能描述 Display 和 Debug 特质之间的区别&#xff1f;下面哪个选项…

STM32 SPI通信协议

1. SPI协议概述 1.1 什么是SPI&#xff1f; SPI&#xff08;Serial Peripheral Interface&#xff09;是由摩托罗拉公司于1980年代提出的同步串行通信协议&#xff0c;主要用于短距离高速芯片间通信。作为四线制全双工通信协议&#xff0c;它以简单的硬件实现和高效的传输速率…

92.一个简单的输入与显示示例 Maui例子 C#例子

一、关于项目命名的注意事项 在开发.NET MAUI项目时&#xff0c;项目命名是一个不可忽视的细节。如果你习惯了在C#控制台或WPF项目中使用中文项目名称&#xff0c;那么在.NET MAUI中&#xff0c;你可能会遇到一些问题。我之前就因为使用中文项目名称而导致项目无法直接运行&am…

Locate 3D:Meta出品自监督学习3D定位方法

标题&#xff1a; Locate 3D: Real-World Object Localization via Self-Supervised Learning in 3D 摘要&#xff1a; 我们提出了 Locate 3D&#xff0c;这是一种可根据指代表达&#xff08;如“沙发和灯之间的小咖啡桌”&#xff09;在三维场景中定位物体的模型。Locate 3…

FastAPI 与数据库交互示例

目录 安装必要的包完整代码示例运行应用使用说明API 端点说明代码解析 下面将创建一个简单的 FastAPI 应用程序&#xff0c;演示如何与 SQLite 数据库进行交互。这个例子包括创建、读取、更新和删除&#xff08;CRUD&#xff09;操作。 安装必要的包 首先&#xff0c;需要安装…

YOLO旋转目标检测之ONNX模型推理

YOLO旋转检测相较于目标检测而言&#xff0c;其只是最后的输出层网络发生了改变&#xff0c;一个最明显的区别便是&#xff1a;目标检测的检测框是xywh&#xff0c;而旋转检测则为xywha&#xff0c;其中&#xff0c;这个a代表angle&#xff0c;即旋转角度&#xff0c;其余的基本…

架构进阶:深入学习企业总体架构规划(Oracle 战略专家培训课件)【附全文阅读】

本文主要讨论了企业总体技术架构规划的重要性与实施建议。针对Oracle战略专家培训课件中的内容&#xff0c;文章强调了行业面临的挑战及现状分析、总体技术架构探讨、SOA集成解决方案讨论与问题解答等方面。文章指出&#xff0c;为了消除信息孤岛、强化应用系统&#xff0c;需要…

llamafactory-cli webui启动报错TypeError: argument of type ‘bool‘ is not iterable

一、问题 在阿里云NoteBook上启动llamafactory-cli webui报错TypeError: argument of type ‘bool’ is not iterable This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run gradio deploy from the terminal in the working directory t…

Gas 优化不足、升级机制缺陷问题

以下是针对智能合约中 Gas 优化不足 与 升级机制缺陷 的技术风险分析与解决方案: 一、Gas 优化不足 1. 核心问题 Gas 优化不足会导致合约执行成本过高,直接影响用户体验和协议可行性,尤其在交易高峰期可能引发链上拥堵或交易失败。 2. 常见风险点 冗余计算与存储操作 例如…

使用xlwings计算合并单元格的求和

有如下一个excel表 表内有合并单元格&#xff0c;现在需要求和&#xff0c;不能直接下拉填充公式怎么办&#xff1f; 通常的办法是先取消合并单元格&#xff0c;计算后&#xff0c;再次合并单元格&#xff0c;比较繁琐。 在此&#xff0c;尝试使用python和xlwings运行直接给出…

[创业之路-354]:农业文明到智能纪元:四次工业革命下的人类迁徙与价值重构

农业文明到智能纪元&#xff1a;四次工业革命下的人类迁徙与价值重构 从游牧到定居&#xff0c;从蒸汽轰鸣到算法洪流&#xff0c;人类文明的每一次跨越都伴随着生产关系的剧烈震荡。四次工业革命的浪潮不仅重塑了物质世界的生产方式&#xff0c;更将人类推向了身份认同与存在…

LeetCode 2302.统计得分小于 K 的子数组数目:滑动窗口(不需要前缀和)

【LetMeFly】2302.统计得分小于 K 的子数组数目&#xff1a;滑动窗口&#xff08;不需要前缀和&#xff09; 力扣题目链接&#xff1a;https://leetcode.cn/problems/count-subarrays-with-score-less-than-k/ 一个数组的 分数 定义为数组之和 乘以 数组的长度。 比方说&…

kafka学习笔记(四、生产者(客户端)深入研究(二)——消费者协调器与_consumer_offsets剖析)

1.消费者协调器和组协调器 如果消费者客户端中配置了多个分配策略&#xff0c;则多消费者的分区分配交由消费者协调器和组协调器来完成&#xff0c;他们之间使用一套组协调协议进行交互。 1.1.在均衡原理 将全部消费者分成多个子集&#xff0c;每个消费者组的子集在服务中对…

快速将FastAPI接口转为模型上下文协议(MCP)!

fastapi_mcp 是一个用于将 FastAPI 端点暴露为模型上下文协议&#xff08;Model Context Protocol, MCP&#xff09;工具的库&#xff0c;并且支持认证功能。 环境macbook&#xff0c;python3.13 pip install fastapi uvicorn fastapi-mcp 代码 from fastapi import FastAPI, …

实验数据的转换

最近做实验需要把x轴y轴z轴的数据处理一下&#xff0c;总结一下解决的方法&#xff1a; 源文件为两个txt文档&#xff0c;分别为x轴和y轴&#xff0c;如下&#xff1a; 最终需要达到的效果是如下&#xff1a; 就是需要把各个矩阵的数据整理好放在同一个txt文档里。 步骤① …

第Y3周:yolov5s.yaml文件解读

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 本次任务&#xff1a;将yolov5s网络模型中的第4层的C3x2修改为C3x1&#xff0c;第6层的C3x3修改为C3x2。 首先输出原来的网络结构&#xff1a; from n pa…