Django 自定义celery-beat调度器,查询自定义表的Cron表达式进行任务调度

学习目标:

通过自定义的CronScheduler调度器在兼容标准的调度器的情况下,查询自定义任务表去生成调度任务并分配给celery worker进行执行

不了解Celery框架的小伙伴可以先看一下我的上一篇文章:Celery框架组件分析及使用


学习内容:

  1. 创建自定义的Scheduler,设置自定义的Scheduler实现对原有配置的定时任务的兼容
  2. 如何启动自定义Scheduler对任务进行调度
  3. 创建自定义Scheduler遇到的一些问题

如何创建自定义的Scheduler:

在创建自定义Scheduler之前,我们先了解一下Scheduler类,这个类是Celery Beat的核心类,维护了任务的创建逻辑,调度逻辑。入下图是整个celery-beat启动执行流程图

     +---------------------+| Celery Beat 启动    |+----------+----------+|v+----------+----------+| Service.start()     |  --> 初始化 Scheduler(此时调用 setup_schedule)+----------+----------+|v+----------+----------+| scheduler.tick()    | 每隔 interval 触发一次,把当前任务按最近执行时间放入 _heap(调度堆)+----------+----------+|v+--------------------------+| schedule.get_schedule()  || 返回所有任务 ScheduleEntry | 此处会获取所有的可执行的调度任务,需要重写+--------------------------+|v+-----------------------------+| entry.is_due()              || 判断任务是否需要执行		   |+-----------------------------+|如果是 True           否则等待下次 tick|v+-----------------------------+| apply_async(entry)          || 发出任务执行                  |+-----------------------------+

要定义一个DjangoCronScheduler继承Scheduler,需要封装自己的数据库Cron表达式数据加载逻辑

import logging
import refrom celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from django.db import close_old_connectionsfrom record.models import KeywordVideoSchedule, AccountVideoSchedulelogger = logging.getLogger(__name__)class DjangoCronScheduler(Scheduler):"""从Django模型动态加载cron任务的调度器"""def __init__(self, *args, **kwargs):self._schedule = {}self._cron_schedule = {}self._last_refresh = Noneself._refresh_interval = kwargs.pop('refresh_interval', 3600)  # 默认3600秒刷新一次super().__init__(*args, **kwargs)# 第一步,初始化所有的动态cron表达式的任务def setup_schedule(self):"""初始化调度"""super().setup_schedule()# 加载静态配置中的任务(即 app.conf.beat_schedule)for name, entry in self.app.conf.beat_schedule.items():if isinstance(entry, dict):self._schedule[name] = self.Entry(name=name,task=entry['task'],schedule=entry['schedule'],args=entry.get('args', []),kwargs=entry.get('kwargs', {}),options=entry.get('options', {}),app=self.app)else:# 已经是 ScheduleEntry 的实例self._schedule[name] = entryself._refresh_cron_tasks(force=True)def tick(self):"""重写tick方法,处理动态刷新"""if self.should_refresh():self._refresh_cron_tasks()return super().tick()def should_refresh(self):"""检查是否需要刷新任务"""if self._last_refresh is None:return Truenow = self.app.now()return (now - self._last_refresh).total_seconds() >= self._refresh_intervaldef _refresh_cron_tasks(self, force=False):"""从Django模型刷新cron任务"""logger.info(f'begin refresh_cron_tasks: force={force}')try:close_old_connections()  # 确保数据库连接有效# 获取所有启用的任务(关键词视频任务)db_tasks_keyword = {f'keyword_video_{task.id}': taskfor task in KeywordVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)}# 获取所有启用的任务(账号视频任务)db_tasks_account = {f'account_video_{task.id}': taskfor task in AccountVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)}# 合并两个字典db_tasks = {**db_tasks_keyword, **db_tasks_account}current_names = set(self._cron_schedule.keys())new_names = set(db_tasks.keys())# 删除不再存在的任务for name in current_names - new_names:del self._cron_schedule[name]logging.info(f"Removed cron task: {name}")# 添加或更新任务for name, db_task in db_tasks.items():existing = self._cron_schedule.get(name)# 判断任务类型,并赋值任务名if name.startswith('keyword_video_'):task_name = 'record.tasks.spider_keyword_video'elif name.startswith('account_video_'):task_name = 'record.tasks.spider_account_video'else:logger.error(f"Unknown cron task: {name}")continue# 如果是新任务或cron表达式有变化if not existing or getattr(existing, 'cron_expr', None) != db_task.collect_cron:try:crontab_schedule = crontab(*self._parse_cron_expr(db_task.collect_cron))entry = ScheduleEntry(name=name,task=task_name,schedule=crontab_schedule,args=[db_task.id],kwargs={},options={},total_run_count=0,last_run_at=existing.last_run_at if existing else None)self._cron_schedule[name] = entrylogging.info(f"{'Updated' if existing else 'Added'} cron task: {name} ({db_task.collect_cron})")except ValueError as e:logging.error(f"Invalid cron expression {db_task.collect_cron} for task {name}: {str(e)}")elif force:# 强制刷新时更新其他参数self._cron_schedule[name].update({'args': [db_task.id],'kwargs': {}})self._last_refresh = self.app.now()except Exception as e:logging.error(f"Failed to refresh cron tasks: {str(e)}")finally:close_old_connections()  # 清理数据库连接def _parse_cron_expr(self, cron_expr):"""解析cron表达式为celery crontab参数"""# 标准的cron表达式是包含秒的,但是python的crontab对象最小的单位为分,如果是6位含s的数据,需要舍弃前面的秒的处理cron_parts = cron_expr.strip().split()if len(cron_parts) == 6:# 忽略秒段(第 0 段)cron_parts = cron_parts[1:]if len(cron_parts) != 5:logger.error(f"无效的 Cron 表达式: {cron_expr} (需要5或6个字段)")raise ValueError("Cron expression must have 5 parts or 6 parts")logger.info(f'cron_parts : {cron_parts}')# 通用转换规则converted = []for part in cron_parts:# 规则1: 将 0/x 转换为 */x。celery不支持0/5这种格式if re.match(r'^0/\d+$', part):part = part.replace('0/', '*/')# 规则2: 将 1-5/x 转换为 1-5/x (保持不变)# 规则3: 保持 *、数字、, 等标准语法不变converted.append(part)return (converted[0],  # minuteconverted[1],  # hourconverted[2],  # day_of_monthconverted[3],  # month_of_yearconverted[4]  # day_of_week)@propertydef schedule(self):"""确保 Celery Beat 能识别自定义调度任务"""return self.schedule()def get_schedule(self):"""获取当前调度任务"""return {**self._schedule, **self._cron_schedule}
  1. 定义一个_schedule属性接收原有的通过代码编写的固定cron表达式的执行task,例如现有项目在celery_app.py当中配置的
app.conf.beat_schedule = {'resume_autoclip_task_5min': {'task': 'record.comsumers.resume_autoclip_task','schedule': crontab(minute='*/1')},'flash_all_template_view_and_like_1day': {'task': 'record.comsumers.flash_all_template_view_and_like','schedule': crontab(hour='1', minute='0')}
}
  1. 定义一个 _cron_schedule 属性用来接收数据库当中配置的动态cron表达式的数据,根据这个表达式生成ScheduleEntry对象,也就是每个动态需要执行task的对象.

  2. 定义一个_refresh_interval属性进行刷新,因为我们的任务数据是配置在数据,数据可能会删除,也可能会修改,暂时还没考虑实时去更新Scheduler的条件下,我们就需要考虑动态刷新的额情况

  3. 对设置的属性进行初始化赋值,我们需要关注的函数为setup_schedule和tick setup_schedule函数是sever启动的初始化入口,我们可以在项目启动时把我们需要初始化的数据都初始化进去,我们需要额外注意的是crontab对象接收的cron表达式和我们传统不一样,一般我们传统的cron表达式都是6位,crontab只接收5位参数,不携带秒的那一位,而且crontab不接收0/5这种格式,会报错。所以我在此处特殊处理了cron表达式的,将其通过_parse_cron_expr转换成crontab对象能够接收的参数。

  4. def schedule(self)函数的作用,tick() 方法将按照执行时间获取可执行的任务,会触发调用schedule(self)方法,该方法会将系统代码配置任务和cron表达式的任务合并返回给调度器。理论上在setup_schedule阶段也可以将所有的数据都加载到同一个_schedule属性下,但是博主测试之后发现不成功,只能使用该方式进行处理了。

至此我们通过自定义的Scheduler就算编写完成了,下面我们来配置及启动测试。


如何启动自定义Scheduler对任务进行调度

  • 首先我们使用了celery beat组件,需要我们在项目当中引入celery beat,博主项目结合使用的是Django框架,所以在setting.py文件的属性INSTALLED_APPS 当中添加组件django_celery_beat
INSTALLED_APPS = ['django_celery_beat',  # 需要引入celery_beat进行任务生成调度
]
  • 启动celery beat使用自定义的Scheduler,有两种方式,一个是通过代码配置,另外一种是通过命令行参数启动
    • 通过代码配置,需要在celery_app.py文件中指定我们的beat_scheduler
      # 使用自定义的调度器,app为你的app名称,modules为定义的自定义Scheduler的文件名称
      app.conf.beat_scheduler = 'app.modules.DjangoCronScheduler'
      
    • 配置完成后我们通过标准的启动方式就能启动了
       celery -A you_projiect_name beat  -l info
      
    • 启动完成后即可看到,启动的提示信息
      在这里插入图片描述
  • 启动参数指定启动的Scheduler,此时我们就不需要配置celery_app.py当中的beat_scheduler,直接在启动命令行上去配置
    celery -A autoclip beat -S app.modules.DjangoCronScheduler -l info
    

创建自定义Scheduler遇到的一些问题

  • 在编写自定义Scheduler之前,博主也不是很了解Scheduler的执行原理,只是看到网上很多使用了Scheduler自带的cron配置页去配置的cron表达式任务,它是有自己两张固定的表结构的,如果想使用自己的cron表达式和表数据,还必须将自己的数据同步到django_celery_beat_periodictaskdjango_celery_beat_crontabschedule表当中。博主就想能不能使用自己的表去执行任务,统计结果,因为每次网上述的两个表同步数据感觉处理下来也不是很方便。
  • Scheduler数据的加载的时机和机制,如果不了解这个的话,确实是无从下手,就算初始化了数据也不清楚是怎么同步到任务堆里的,需要好好看一下上述的执行流程图。
  • crontab对象的参数和标准的cron表达式不一致导致的报错,包括长度的区别,对于0/4这种格式的处理区别。
  • 理论上还确少,数据动态变化时实时的变更schedule的方法,博主这边偷懒使用了定时刷新的机制,这个使用场景对于配置实时性感知要求不高的情况下是满足要求的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/81081.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥杯 1. 确定字符串是否包含唯一字符

确定字符串是否包含唯一字符 原题目链接 题目描述 实现一个算法来识别一个字符串的字符是否是唯一的(忽略字母大小写)。 若唯一,则输出 YES,否则输出 NO。 输入描述 输入一行字符串,长度不超过 100。 输出描述 输…

a-upload组件实现文件的上传——.pdf,.ppt,.pptx,.doc,.docx,.xls,.xlsx,.txt

实现下面的上传/下载/删除功能&#xff1a;要求支持&#xff1a;【.pdf,.ppt,.pptx,.doc,.docx,.xls,.xlsx,.txt】 分析上面的效果图&#xff0c;分为【上传】按钮和【文件列表】功能&#xff1a; 解决步骤1&#xff1a;上传按钮 直接上代码&#xff1a; <a-uploadmultip…

.NET Core 数据库ORM框架用法简述

.NET Core ORM框架用法简述 一、主流.NET Core ORM框架概述 在.NET Core生态系统中&#xff0c;主流的ORM(Object-Relational Mapping)框架包括&#xff1a; ​​Entity Framework Core (EF Core)​​ - 微软官方推出的ORM框架​​Dapper​​ - 轻量级微ORM​​Npgsql.Entit…

halcon打开图形窗口

1、dev_open_window 参数如下&#xff1a; 1&#xff09;Row(输入参数) y方向上&#xff0c;图形窗口距离左上角顶端的像素个数 2&#xff09;Column(输入参数) x方向上&#xff0c;距离左上角左边的像素个数 3&#xff09;Width(输入参数) 图形窗口宽度 4&#xff09;He…

2025东三省D题深圳杯D题数学建模挑战赛数模思路代码文章教学

完整内容请看文章最下面的推广群 一、问题一&#xff1a;混合STR图谱中贡献者人数判定 问题解析 给定混合STR图谱&#xff0c;识别其中的真实贡献者人数是后续基因型分离与个体识别的前提。图谱中每个位点最多应出现2n个峰&#xff08;n为人数&#xff09;&#xff0c;但由…

iView Table 组件跨页选择功能实现文档

iView Table 组件跨页选择功能实现文档 功能概述 实现基于 iView Table 组件的多选功能&#xff0c;支持以下特性&#xff1a; ✅ 跨页数据持久化选择✅ 当前页全选/取消全选✅ 自动同步选中状态显示✅ 分页切换状态保持✅ 高性能大数据量支持 实现方案 技术栈 iView UI 4…

家庭服务器IPV6搭建无限邮箱系统指南

qq邮箱操作 // 邮箱配置信息 // 注意&#xff1a;使用QQ邮箱需要先开启IMAP服务并获取授权码 // 设置方法&#xff1a;登录QQ邮箱 -> 设置 -> 账户 -> 开启IMAP/SMTP服务 -> 生成授权码 服务器操作 fetchmail 同步QQ邮箱 nginx搭建web显示本地同步过来的邮箱 ssh…

Tauri v1 与 v2 配置对比

本文档对比 Tauri v1 和 v2 版本的配置结构和内容差异&#xff0c;帮助开发者了解版本变更并进行迁移。 配置结构变化 v1 配置结构 {"package": { ... },"tauri": { "allowlist": { ... },"bundle": { ... },"security":…

对js的Date二次封装,继承了原Date的所有方法,增加了自己扩展的方法,可以实现任意时间往前往后推算多少小时、多少天、多少周、多少月;

封装js时间工具 概述 该方法继承了 js 中 Date的所有方法&#xff1b;同时扩展了一部分自用方法&#xff1a; 1、任意时间 往前推多少小时&#xff0c;天&#xff0c;月&#xff0c;周&#xff1b;参数1、2必填&#xff0c;参数3可选beforeDate(num,formatter,dateVal); befo…

TimeDistill:通过跨架构蒸馏的MLP高效长期时间序列预测

原文地址&#xff1a;https://arxiv.org/abs/2502.15016 发表会议&#xff1a;暂定&#xff08;但是Star很高&#xff09; 代码地址&#xff1a;无 作者&#xff1a;Juntong Ni &#xff08;倪浚桐&#xff09;, Zewen Liu &#xff08;刘泽文&#xff09;, Shiyu Wang&…

DeepSeek最新大模型发布-DeepSeek-Prover-V2-671B

2025 年 4 月 30 日&#xff0c;DeepSeek 开源了新模型 DeepSeek-Prover-V2-671B&#xff0c;该模型聚焦数学定理证明任务&#xff0c;基于混合专家架构&#xff0c;使用 Lean 4 框架进行形式化推理训练&#xff0c;参数规模达 6710 亿&#xff0c;结合强化学习与大规模合成数据…

如何用AI生成假期旅行照?

以下是2025年最新AI生成假期旅行照片的实用工具推荐及使用指南&#xff0c;结合工具特点、研发背景和适用场景进行综合解析&#xff1a; 一、主流AI旅行照片生成工具推荐与对比 1. 搜狐简单AI&#xff08;国内工具&#xff09; • 特点&#xff1a; • 一键优化与背景替换&…

ElaticSearch

ElaticSearch: 全文搜索 超级强&#xff0c;比如模糊查询、关键词高亮等 海量数据 高效查询&#xff0c;比传统关系数据库快得多&#xff08;尤其是搜索&#xff09; 灵活的数据结构&#xff08;Schema灵活&#xff0c;可以动态字段&#xff09; 分布式高可用&#xff0c;天…

Android开发,实现一个简约又好看的登录页

文章目录 1. 编写布局文件2.设计要点说明3. 效果图4. 关于作者其它项目视频教程介绍 1. 编写布局文件 编写activity.login.xml 布局文件 <?xml version"1.0" encoding"utf-8"?> <androidx.appcompat.widget.LinearLayoutCompat xmlns:android…

机器学习:【抛掷硬币的贝叶斯后验概率】

首先,抛硬币的问题通常涉及先验概率、似然函数和后验概率。假设用户可能想通过观察一系列的正面(H)和反面(T)来更新硬币的偏差概率。例如,先验可能假设硬币是均匀的,但随着观察到更多数据,用贝叶斯定理计算后验分布。 通常,硬币的偏差可以用Beta分布作为先验,因为它…

Echarts 问题:自定义的 legend 点击后消失,格式化 legend 的隐藏文本样式

文章目录 问题分析实现步骤代码解释问题 如下图所示,在自定义的 legend 点击后会消失 分析 我把隐藏的图例字体颜色设为灰色,可以借助 legend.formatter 和 legend.textStyle 结合 option.series 的 show 属性来达成。以下是具体的实现步骤和示例代码: <!DOCTYPE ht…

光谱相机如何提升目标检测与识别精度

光谱相机&#xff08;多光谱/高光谱&#xff09;通过捕捉目标在多个波段的光谱特征&#xff0c;能够揭示传统RGB相机无法感知的材质、化学成分及物理特性差异。以下是提升其目标检测与识别精度的核心方法&#xff1a; ‌1. 硬件优化&#xff1a;提升数据质量‌ ‌(1) 光谱分辨…

springboot项目配置nacos,指定使用环境

遇到这样一个问题&#xff0c;在开发、测试、生成环境之间切换的问题。 大多数的操作是通过修改spring.profiles.active来确定指向使用的环境配置文件&#xff0c;对应项目中需要增加对应的配置文件。 但是现在几乎所有公司都会有代码管理不管是SVN、git&#xff0c;这样就会涉…

AI代码审查的落地实施方案 - Java架构师面试实战

AI代码审查的落地实施方案 - Java架构师面试实战 本文通过模拟一位拥有十年Java研发经验的资深架构师马架构与面试官之间的对话&#xff0c;深入探讨了AI代码审查的落地实施方案。 第一轮提问 面试官&#xff1a; 马架构&#xff0c;请介绍一下您对AI代码审查的理解。 马架…

TDengine 订阅不到数据问题排查

简介 TDengine 在实际生产应用中&#xff0c;经常会遇到订阅程序订阅不到数据的问题&#xff0c;总结大部分都为使用不当或状态不正确等问题&#xff0c;需手工解决。 查看服务端状态 通过 sql 命令查看有问题的 topic 和consumer_group 组订阅是否正常。 select * from inf…