学习目标:
通过自定义的CronScheduler调度器在兼容标准的调度器的情况下,查询自定义任务表去生成调度任务并分配给celery worker进行执行
不了解Celery框架的小伙伴可以先看一下我的上一篇文章:Celery框架组件分析及使用
学习内容:
- 创建自定义的Scheduler,设置自定义的Scheduler实现对原有配置的定时任务的兼容
- 如何启动自定义Scheduler对任务进行调度
- 创建自定义Scheduler遇到的一些问题
如何创建自定义的Scheduler:
在创建自定义Scheduler之前,我们先了解一下Scheduler类,这个类是Celery Beat的核心类,维护了任务的创建逻辑,调度逻辑。入下图是整个celery-beat启动执行流程图
+---------------------+| Celery Beat 启动 |+----------+----------+|v+----------+----------+| Service.start() | --> 初始化 Scheduler(此时调用 setup_schedule)+----------+----------+|v+----------+----------+| scheduler.tick() | 每隔 interval 触发一次,把当前任务按最近执行时间放入 _heap(调度堆)+----------+----------+|v+--------------------------+| schedule.get_schedule() || 返回所有任务 ScheduleEntry | 此处会获取所有的可执行的调度任务,需要重写+--------------------------+|v+-----------------------------+| entry.is_due() || 判断任务是否需要执行 |+-----------------------------+|如果是 True 否则等待下次 tick|v+-----------------------------+| apply_async(entry) || 发出任务执行 |+-----------------------------+
要定义一个DjangoCronScheduler继承Scheduler,需要封装自己的数据库Cron表达式数据加载逻辑
import logging
import refrom celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from django.db import close_old_connectionsfrom record.models import KeywordVideoSchedule, AccountVideoSchedulelogger = logging.getLogger(__name__)class DjangoCronScheduler(Scheduler):"""从Django模型动态加载cron任务的调度器"""def __init__(self, *args, **kwargs):self._schedule = {}self._cron_schedule = {}self._last_refresh = Noneself._refresh_interval = kwargs.pop('refresh_interval', 3600) # 默认3600秒刷新一次super().__init__(*args, **kwargs)# 第一步,初始化所有的动态cron表达式的任务def setup_schedule(self):"""初始化调度"""super().setup_schedule()# 加载静态配置中的任务(即 app.conf.beat_schedule)for name, entry in self.app.conf.beat_schedule.items():if isinstance(entry, dict):self._schedule[name] = self.Entry(name=name,task=entry['task'],schedule=entry['schedule'],args=entry.get('args', []),kwargs=entry.get('kwargs', {}),options=entry.get('options', {}),app=self.app)else:# 已经是 ScheduleEntry 的实例self._schedule[name] = entryself._refresh_cron_tasks(force=True)def tick(self):"""重写tick方法,处理动态刷新"""if self.should_refresh():self._refresh_cron_tasks()return super().tick()def should_refresh(self):"""检查是否需要刷新任务"""if self._last_refresh is None:return Truenow = self.app.now()return (now - self._last_refresh).total_seconds() >= self._refresh_intervaldef _refresh_cron_tasks(self, force=False):"""从Django模型刷新cron任务"""logger.info(f'begin refresh_cron_tasks: force={force}')try:close_old_connections() # 确保数据库连接有效# 获取所有启用的任务(关键词视频任务)db_tasks_keyword = {f'keyword_video_{task.id}': taskfor task in KeywordVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)}# 获取所有启用的任务(账号视频任务)db_tasks_account = {f'account_video_{task.id}': taskfor task in AccountVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)}# 合并两个字典db_tasks = {**db_tasks_keyword, **db_tasks_account}current_names = set(self._cron_schedule.keys())new_names = set(db_tasks.keys())# 删除不再存在的任务for name in current_names - new_names:del self._cron_schedule[name]logging.info(f"Removed cron task: {name}")# 添加或更新任务for name, db_task in db_tasks.items():existing = self._cron_schedule.get(name)# 判断任务类型,并赋值任务名if name.startswith('keyword_video_'):task_name = 'record.tasks.spider_keyword_video'elif name.startswith('account_video_'):task_name = 'record.tasks.spider_account_video'else:logger.error(f"Unknown cron task: {name}")continue# 如果是新任务或cron表达式有变化if not existing or getattr(existing, 'cron_expr', None) != db_task.collect_cron:try:crontab_schedule = crontab(*self._parse_cron_expr(db_task.collect_cron))entry = ScheduleEntry(name=name,task=task_name,schedule=crontab_schedule,args=[db_task.id],kwargs={},options={},total_run_count=0,last_run_at=existing.last_run_at if existing else None)self._cron_schedule[name] = entrylogging.info(f"{'Updated' if existing else 'Added'} cron task: {name} ({db_task.collect_cron})")except ValueError as e:logging.error(f"Invalid cron expression {db_task.collect_cron} for task {name}: {str(e)}")elif force:# 强制刷新时更新其他参数self._cron_schedule[name].update({'args': [db_task.id],'kwargs': {}})self._last_refresh = self.app.now()except Exception as e:logging.error(f"Failed to refresh cron tasks: {str(e)}")finally:close_old_connections() # 清理数据库连接def _parse_cron_expr(self, cron_expr):"""解析cron表达式为celery crontab参数"""# 标准的cron表达式是包含秒的,但是python的crontab对象最小的单位为分,如果是6位含s的数据,需要舍弃前面的秒的处理cron_parts = cron_expr.strip().split()if len(cron_parts) == 6:# 忽略秒段(第 0 段)cron_parts = cron_parts[1:]if len(cron_parts) != 5:logger.error(f"无效的 Cron 表达式: {cron_expr} (需要5或6个字段)")raise ValueError("Cron expression must have 5 parts or 6 parts")logger.info(f'cron_parts : {cron_parts}')# 通用转换规则converted = []for part in cron_parts:# 规则1: 将 0/x 转换为 */x。celery不支持0/5这种格式if re.match(r'^0/\d+$', part):part = part.replace('0/', '*/')# 规则2: 将 1-5/x 转换为 1-5/x (保持不变)# 规则3: 保持 *、数字、, 等标准语法不变converted.append(part)return (converted[0], # minuteconverted[1], # hourconverted[2], # day_of_monthconverted[3], # month_of_yearconverted[4] # day_of_week)@propertydef schedule(self):"""确保 Celery Beat 能识别自定义调度任务"""return self.schedule()def get_schedule(self):"""获取当前调度任务"""return {**self._schedule, **self._cron_schedule}
- 定义一个
_schedule
属性接收原有的通过代码编写的固定cron表达式的执行task,例如现有项目在celery_app.py
当中配置的
app.conf.beat_schedule = {'resume_autoclip_task_5min': {'task': 'record.comsumers.resume_autoclip_task','schedule': crontab(minute='*/1')},'flash_all_template_view_and_like_1day': {'task': 'record.comsumers.flash_all_template_view_and_like','schedule': crontab(hour='1', minute='0')}
}
-
定义一个
_cron_schedule
属性用来接收数据库当中配置的动态cron表达式的数据,根据这个表达式生成ScheduleEntry对象,也就是每个动态需要执行task的对象. -
定义一个
_refresh_interval
属性进行刷新,因为我们的任务数据是配置在数据,数据可能会删除,也可能会修改,暂时还没考虑实时去更新Scheduler的条件下,我们就需要考虑动态刷新的额情况 -
对设置的属性进行初始化赋值,我们需要关注的函数为setup_schedule和tick setup_schedule函数是sever启动的初始化入口,我们可以在项目启动时把我们需要初始化的数据都初始化进去,我们需要额外注意的是
crontab
对象接收的cron表达式和我们传统不一样,一般我们传统的cron表达式都是6位,crontab只接收5位参数,不携带秒的那一位,而且crontab不接收0/5这种格式,会报错。所以我在此处特殊处理了cron表达式的,将其通过_parse_cron_expr
转换成crontab对象能够接收的参数。 -
def schedule(self)函数的作用,tick() 方法将按照执行时间获取可执行的任务,会触发调用schedule(self)方法,该方法会将系统代码配置任务和cron表达式的任务合并返回给调度器。理论上在
setup_schedule
阶段也可以将所有的数据都加载到同一个_schedule属性下,但是博主测试之后发现不成功,只能使用该方式进行处理了。
至此我们通过自定义的Scheduler就算编写完成了,下面我们来配置及启动测试。
如何启动自定义Scheduler对任务进行调度
- 首先我们使用了celery beat组件,需要我们在项目当中引入celery beat,博主项目结合使用的是Django框架,所以在setting.py文件的属性
INSTALLED_APPS
当中添加组件django_celery_beat
INSTALLED_APPS = ['django_celery_beat', # 需要引入celery_beat进行任务生成调度
]
- 启动celery beat使用自定义的Scheduler,有两种方式,一个是通过代码配置,另外一种是通过命令行参数启动
- 通过代码配置,需要在celery_app.py文件中指定我们的beat_scheduler
# 使用自定义的调度器,app为你的app名称,modules为定义的自定义Scheduler的文件名称 app.conf.beat_scheduler = 'app.modules.DjangoCronScheduler'
- 配置完成后我们通过标准的启动方式就能启动了
celery -A you_projiect_name beat -l info
- 启动完成后即可看到,启动的提示信息
- 通过代码配置,需要在celery_app.py文件中指定我们的beat_scheduler
- 启动参数指定启动的Scheduler,此时我们就不需要配置celery_app.py当中的beat_scheduler,直接在启动命令行上去配置
celery -A autoclip beat -S app.modules.DjangoCronScheduler -l info
创建自定义Scheduler遇到的一些问题
- 在编写自定义Scheduler之前,博主也不是很了解Scheduler的执行原理,只是看到网上很多使用了Scheduler自带的cron配置页去配置的cron表达式任务,它是有自己两张固定的表结构的,如果想使用自己的cron表达式和表数据,还必须将自己的数据同步到
django_celery_beat_periodictask
和django_celery_beat_crontabschedule
表当中。博主就想能不能使用自己的表去执行任务,统计结果,因为每次网上述的两个表同步数据感觉处理下来也不是很方便。 - Scheduler数据的加载的时机和机制,如果不了解这个的话,确实是无从下手,就算初始化了数据也不清楚是怎么同步到任务堆里的,需要好好看一下上述的执行流程图。
- crontab对象的参数和标准的cron表达式不一致导致的报错,包括长度的区别,对于0/4这种格式的处理区别。
- 理论上还确少,数据动态变化时实时的变更
schedule
的方法,博主这边偷懒使用了定时刷新的机制,这个使用场景对于配置实时性感知要求不高的情况下是满足要求的。