uwsgi部署多进程django apscheduler与问题排查

  • 💖 作者简介:大家好,我是Zeeland,开源建设者与全栈领域优质创作者。
  • 📝 CSDN主页:Zeeland🔥
  • 📣 我的博客:Zeeland
  • 📚 Github主页: Undertone0809 (Zeeland)
  • 🎉 支持我:点赞👍+收藏⭐️+留言📝
  • 💬介绍:The mixture of software dev+Iot+ml+anything🔥

简介

之前写django的apscheduler一直采用decorator的方式构建,因为业务的定时任务是定死的,没有产生什么其他的问题。最近定时任务需要做动态增减,进行定时任务的动态设置,因此传统的decorator写法行不通了,需要用scheduler.add_job()来添加job。

最开始以为只是一个简单的函数替换,没想到替换了一下出现了很多不一样的问题,故笔者打算记录一下其中发生的问题。

本文将介绍如何使用uwsgi部署django+apscheduler的系统,为了聚焦重点,本文不细说的uwsgi.ini等文件的配置过程,只介绍如何在django中编写构建一个apscheduler系统,并且部署在uwsgi服务器上。

背景介绍

上面已经简单介绍过了,下面介绍一下笔者原有的apscheduler构建方式。在项目根目录下构建了一个jobs/core_job.py的文件,在里面编写核心的任务调度,大致代码如下所示:

from typing import List, Dict, Anyfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore, register_jobfrom app import models
from commons.log_util import get_logger, enable_log
from services.db import db_service
from services.huasi import huasi_storagelogger = get_logger()
enable_log()# 创建后台调度器
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")@register_job(scheduler,"interval",seconds=60 * 120 * 1,id="store A device data",replace_existing=True,
)
def store_pingxiang_device_job():...logger.info("[store A device job] finish")@register_job(scheduler,"interval",seconds=60 * 60 * 1,id="store B device data",replace_existing=True,
)
def store_baiyun_device_job():...logger.info("[store B device job] finish")def start_scheduler():"""获取需要存储任务调度的项目,开启任务调度"""scheduler.start()logger.info("[django scheduler] start scheduler")def resume_scheduler():logger.info("[django scheduler] resume schedule")scheduler.resume()def stop_scheduler():logger.info("[django scheduler] pause schedule")scheduler.pause()

一开始,我把它放到子模块的apps.py ready()里面,如下所示:

from django.apps import AppConfigclass AppConfig(AppConfig):default_auto_field = "django.db.models.BigAutoField"name = "app"verbose_name = "xxx管理系统"def ready(self):from jobs.core_job import start_schedulerstart_scheduler()

后面我发现,这是一个很不明智的选择,因为在uwsgi中,如果你的processes不为1,即你要开启多进程模式时,ready()会被执行多次,而如果你的job是固定的,那么你就会出现下面这种问题,django apscheduler在初始化添加task到数据库时,发生pymysql.err.IntegrityError: (1062, "Duplicate entry 'store A device job' for key 'django_apscheduler_djangojob.PRIMARY'")这种问题。

根据提供的错误信息,问题是由于数据表 django_apscheduler_djangojob 中存在重复的条目导致的完整性错误(IntegrityError)。具体错误信息是:“Duplicate entry ‘store A device job’ for key ‘django_apscheduler_djangojob.PRIMARY’”,意思是在数据库中的 django_apscheduler_djangojob 表的主键上存在重复的条目,具体插入的条目是 ‘store A device job’。

要解决这个问题,你可以检查下你的代码中是否有重复插入相同数据的操作,或者手动清理数据库中的重复数据,确保每个条目在主键上是唯一的。你可以使用 SQL 命令来删除重复的条目,或者使用 Django ORM 提供的方法进行清理操作,具体取决于你的应用程序的需求和代码实现。

在数据库中,多次执行添加job会导致重复性问题,这也是为什么放到子模块的apps.py ready()不是一个明智的选择。

而事实上,如果你在uwsgi.ini中设置了多进程模式,那么放在项目目录下的settings.py中,也会产生同样的问题,因为两个进程的变量是独立的,而同时执行两个程序则会出现数据库字段重复的问题,从而产生报错。

解决多进程多次启动apscheduler的问题

方案1

要实现在多个worker中只运行一次apscheduler程序,可以使用分布式锁的概念。在这种方案中,只有一个worker能够获得锁并执行apscheduler程序,其他的worker会检测到锁已经被获取,并根据需要等待或跳过执行。

以下是一种可能的解决方案:

  1. 使用一个共享的可靠存储系统,如数据库或分布式缓存(例如Redis)来存储锁的状态。

  2. 在你的Django应用中,使用一个装饰器或中间件来实现锁的逻辑。这个装饰器/中间件的作用是在apscheduler程序执行之前检查锁的状态,并根据情况决定是否执行程序。

下面是一个简单的实现示例,使用Redis作为存储系统:

import redis
from functools import wrapsdef apscheduler_lock(func):@wraps(func)def wrapper(*args, **kwargs):lock_key = "apscheduler_lock"lock_value = "locked"# 连接到Redisredis_client = redis.Redis(host='localhost', port=6379)# 尝试获取锁acquired_lock = redis_client.set(lock_key, lock_value, nx=True, ex=60)if acquired_lock:# 如果成功获得了锁,执行apscheduler程序result = func(*args, **kwargs)# 执行完毕后释放锁redis_client.delete(lock_key)return resultelse:# 没有获得锁,跳过执行apscheduler程序return None  # 可以根据需要返回其他信息return wrapper

在你的apscheduler任务函数上应用这个装饰器,例如:

from apscheduler.schedulers.background import BackgroundScheduler@schedudler_lock
def my_apscheduler_task():# 执行你的apscheduler任务pass# 创建一个定时任务
scheduler = BackgroundScheduler()
scheduler.add_job(my_apscheduler_task, 'interval', minutes=10)
scheduler.start()

这样,无论你启动多少个worker(也可以使用多个uwsgi进程),只有一个worker能够获取到锁并执行apscheduler程序,其他的worker会跳过执行。

请根据你的实际情况适配这个示例,并确保你的共享存储系统的配置和连接代码是正确的。

方案2

如果你觉得分布式锁比较麻烦,也可以参考django-apscheduler作者提供的方案,使用manage.py操作命令行的方式启动的apscheduler,这种方式比较自由,在多进程的django启动时候,再执行命令启动apscheduler,可以避免上述阐述的重复添加job的问题。

Add a custom Django management command to your project that schedules the APScheduler jobs and starts the scheduler:

# runapscheduler.py
import loggingfrom django.conf import settingsfrom apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import utillogger = logging.getLogger(__name__)def my_job():# Your job processing logic here...pass# The `close_old_connections` decorator ensures that database connections, that have become
# unusable or are obsolete, are closed before and after your job has run. You should use it
# to wrap any jobs that you schedule that access the Django database in any way. 
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):"""This job deletes APScheduler job execution entries older than `max_age` from the database.It helps to prevent the database from filling up with old historical records that are nolonger useful.:param max_age: The maximum length of time to retain historical job execution records.Defaults to 7 days."""DjangoJobExecution.objects.delete_old_job_executions(max_age)class Command(BaseCommand):help = "Runs APScheduler."def handle(self, *args, **options):scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)scheduler.add_jobstore(DjangoJobStore(), "default")scheduler.add_job(my_job,trigger=CronTrigger(second="*/10"),  # Every 10 secondsid="my_job",  # The `id` assigned to each job MUST be uniquemax_instances=1,replace_existing=True,)logger.info("Added job 'my_job'.")scheduler.add_job(delete_old_job_executions,trigger=CronTrigger(day_of_week="mon", hour="00", minute="00"),  # Midnight on Monday, before start of the next work week.id="delete_old_job_executions",max_instances=1,replace_existing=True,)logger.info("Added weekly job: 'delete_old_job_executions'.")try:logger.info("Starting scheduler...")scheduler.start()except KeyboardInterrupt:logger.info("Stopping scheduler...")scheduler.shutdown()logger.info("Scheduler shut down successfully!")

The management command defined above should be invoked via ./manage.py runapscheduler whenever the webserver serving your Django application is started. The details of how and where this should be done is implementation specific, and depends on which webserver you are using and how you are deploying your application to production. For most people this should involve configuring a supervisor process of sorts.

Register any APScheduler jobs as you would normally. Note that if you haven’t set DjangoJobStore as the ‘default’ job store, then you will need to include jobstore=‘djangojobstore’ in your scheduler.add_job() calls.

总结

本文总结了使用uwsgi部署多进程的django apscheduler可能会产生的问题,并提供了两种解决方案,如果你有更好的想法,欢迎在评论区一起讨论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/71848.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Simulink的用于电力系统动态分析

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

【正版软件】Air Explorer - 一个程序访问您的所有云服务

前言:Air Explorer支持最好的云服务。 功能特点: 直接管理云中的文件 设置同一服务上的多个帐户 您可以在任何云服务或计算机之间同步文件夹 云文件浏览器易于使用 通过加入您的所有云服务来增加存储空间 应用程序适用于Windows/Mac Air Explorer…

vue+antd——table组件实现动态列+表头下拉选择功能——技能提升

Table 表格 展示行列数据。 何时使用 当有大量结构化的数据需要展现时; 当需要对数据进行排序、搜索、分页、自定义操作等复杂行为时。 最近在写vueantd的框架,遇到一个需求:就是要实现table表格的动态列,并且相应的表头要实现下拉…

Scrapy的基本介绍、安装及工作流程

一.Scrapy介绍 Scrapy是什么? Scrapy 是用 Python 实现的一个为了爬取网站数据、提取结构性数据而编写的应用框架(异步爬虫框架) 通常我们可以很简单的通过 Scrapy 框架实现一个爬虫,抓取指定网站的内容或图片。 Scrapy使用了Twisted异步网络框架&…

攻防世界-WEB-NewsCenter

打开环境 有查询,猜测是sql注入 保存请求头到文件中 准备利用sqlmap 查找数据库 python sqlmap.py -r ./123.txt --dbs 查找表 python sqlmap.py -r ./123.txt --tables -D news 查找字段 python sqlmap.py -r ./123.txt --column -D news -T secret_table 显示字…

【Java Web】Servlet规范讲解

目录 一、前言 二、Servlet规范介绍 2.1 常见版本及新功能 2.2 Servlet的作用 2.3 Servlet的本质 三、Servlet接口和实现类 3.1 Servlet接口 3.2 Servlet接口实现类示例 3.3 Servlet接口实现类开发步骤 3.3.1 关键点 3.3.2 引入Servlet源码包 1、描述 Servlet接口…

Java-day13(IO流)

IO流 凡是与输入,输出相关的类,接口等都定义在java.io包下 1.File类的使用 File类可以有构造器创建其对象,此对象对应着一个文件(.txt,.avi,.doc,.mp3等)或文件目录 File类对象是与平台无关的 File中的方法仅涉及到如何创建,…

OpenCV(二十三):中值滤波

1.中值滤波的原理 中值滤波(Median Filter)是一种常用的非线性图像滤波方法,用于去除图像中的椒盐噪声等离群点。它的原理是基于邻域像素值的排序,并将中间值作为当前像素的新值。 2.中值滤波函数 medianBlur() void cv::medianBl…

dyld: Symbol not found: __ZNSt3__113basic_filebufIcNS_11char_traitsIcEEEC1Ev

问题描述 当我修改了几行代码,build了新的lib并集成到app以后,app 在mac11 的OS上运行良好,但是在 mac11 以及更多版本上,app持续crash,launch不起来。 使用terminal 打开app的时候,输出如下错误&#xff…

怎么把视频转换成mp4格式

怎么把视频转换成mp4格式?如今,随着科技的不断发展,我们在工作中接触到的多媒体视频格式也越来越多。其中,MP4作为一种广泛兼容的视频格式,在许多软件中都能轻松播放,并且成为了剪辑与裁剪视频时大家常用的…

2023年高教社杯数学建模国赛 赛题浅析

2023年国赛如期而至,为了方便大家尽快确定选题,这里将对赛题进行浅析,以分析赛题的主要难点、出题思路以及选择之后可能遇到的难点进行说明,方便大家尽快确定选题。 难度排序 B>A>C 选题人数 C>A>B (预估结果&…

机器学习笔记之最优化理论与方法(三)凸集的简单认识(下)

机器学习笔记之最优化理论与方法——凸集的简单认识[下] 引言回顾:基本定义——凸集关于保持集合凸性的运算仿射变换 凸集基本性质:投影定理点与凸集的分离支撑超平面定理 引言 继续凸集的简单认识(上)进行介绍,本节将介绍凸集的基本性质以及…

HTTPS加密协议详解:HTTPS性能与优化

1、HTTPS性能损耗 前文讨论了HTTPS原理与优势:身份验证、信息加密与完整性校验等,且未对TCP和HTTP协议做任何修改。但通过增加新协议以实现更安全的通信必然需要付出代价,HTTPS协议的性能损耗主要体现如下: (1).增加延时 分析前…

C++ primer plus第十二章编程练习答案

1.对于下面的声明: class Cow{ char name[20] ; char hobby; double weight ; public: Cow(); Cow(const char * nm,const char * ho,double wt); Cow(const Cow c&); ~Cow(); Cow operator(const Cow & c); void ShowCow() const; // display all cow da…

02-zookeeper分布式锁案例

1 Zookeeper分布式案例 1.1 Zookeeper分布式锁原理 核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。 当我们假设根节点/ 下有/locks节点时 1)客户端获取锁时,在locks节点下创建临时顺序…

go小知识2

Golang开发新手常犯的50个错误_gezhonglei2007的博客-CSDN博客 一些题目整理,附带大佬的解释 1.go中哪些值不能寻址& 常量(const常量,字面值3.14,字符串“xxx”,函数或方法, map的val值) golang中接…

Python、Rust中的协程

协程 协程在不同的堆栈上同时运行,但每次只有一个协程运行,而其调用者则等待: F启动G,但G并不会立即运行,F必须显式的恢复G,然后 G 开始运行。在任何时候,G 都可能转身并让步返回到 F。这会暂停 G 并继续…

OpenCV(二十五):边缘检测(一)

目录 1.边缘检测原理 2.Sobel算子边缘检测 3.Scharr算子边缘检测 4.两种算子的生成getDerivKernels() 1.边缘检测原理 其原理是基于图像中灰度值的变化来捕捉图像中的边界和轮廓。梯度则表示了图像中像素强度变化的强弱和方向。 所以沿梯度方向找到有最大梯度值的像素&…

Linux网络编程 网络基础知识

目录 1.网络的历史和协议的分成 2.网络互联促成了TCP/IP协议的产生 3.网络的体系结构 4.TCP/IP协议族体系 5.网络各层的协议解释 6.网络的封包和拆包 7.网络预备知识 1.网络的历史和协议的分成 Internet-"冷战"的产物 1957年十月和十一月,前苏…

C#__线程的优先级和状态控制

线程的优先级: 一个CPU同一时刻只能做一件事情,哪个线程优先级高哪个先运行,优先级相同看调度算法。 在Thread类中的Priority属性(Highest,Above,Normal,BelowNormal,Lowest)可以影响线程的优先级 关于…