celery独立部署接入数据库配置

目录结构:

config下配置:

__init__:

import os
import sys
sys.path.append(os.getcwd())
from celery import Celeryapp=Celery('celeryTester') # 创建一个Celery实例,名字自定义
app.config_from_object('config.celery_config') # 从celery_config获取配置

celery_config:

from .celery_signals import task_failure_handler,task_postrun_handler # 全局信号
broker_url = 'redis://xxx' # 作为任务队列,beat及手动调用往里添加任务,worker从这里取任务
result_backend = 'redis://xxx'    # 任务结果
broker_connection_retry_on_startup=True # 重连可以不配
# 序列化相关,默认都是json
task_serializer='json'
accept_content=['json']
result_serializer='json'
#时区设置
timezone='Asia/Shanghai'
enable_utc=True
# 从哪些模块获取任务,一般是从根目录启动worker及beat进程,目录以根目录起
include=['tasks.sample_tasks']
# 使用自定义调度器
beat_schedule= {} # 动态获取这里配置为空
beat_scheduler = 'my_scheduler.dynamic_scheduler.DynamicScheduler' # 使用自定义scheduler类
beat_scheduler_reload_interval = 300  # beat_scheduler多久重新加载,可覆盖默认值

celery_signals:

import datetimefrom celery.signals import task_postrun, task_failure
from utils.coon import DatabaseConnection@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):# 只记录有返回值的成功任务,暂时只更新periodic_task表里的状态if state == 'SUCCESS':duration = other.get('runtime', 0)last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)DatabaseConnection().update('periodic_task',{'last_run_at': last_run_at, 'status': True},# set值'task=%s',(task.name,))# where后condition值@task_failure.connect
def task_failure_handler(task_id, exception, traceback, task, args, kwargs, einfo, **other):# 任务失败走此信号机制duration = other.get('runtime', 0)last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)DatabaseConnection().update('periodic_task',{'last_run_at': last_run_at, 'status': False},'task=%s',(task.name,))

dynamic_scheduler:

import json
import time
from typing import Dictfrom celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from utils.coon import DatabaseConnectionclass DynamicScheduler(Scheduler):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self._schedule = {}self._last_reload = time.time()self.reload_interval = kwargs.get('reload_interval', 60*5)  # 默认300秒重载def setup_schedule(self):"""初始化加载调度配置,起beat进程时会调用"""self._schedule = self.load_schedule()def load_schedule(self) -> Dict[str, ScheduleEntry]:"""从配置源加载调度配置"""schedule = {}tasks =DatabaseConnection().fetch_all('select * from periodic_task where enabled=1')for item in tasks:name = item['name']if item['interval']:sche = item['interval']else:cron = item['crontab']minute, hour, day, month, week_day = cron.strip().split(' ')sche = crontab(minute=minute, hour=hour, day_of_week=week_day, day_of_month=day,month_of_year=month)item['args']=item['args'] and json.loads(item['args'])item['kwargs']=item['kwargs'] and json.loads(item['kwargs'])schedule[name] = ScheduleEntry(name=name,task=item['task'],schedule=sche,args=item.get('args' or ()),kwargs=item.get('kwargs', {}))return scheduledef tick(self, event_t=..., min=..., max=...):"""重载tick方法实现定期检查"""now = time.time()if now - self._last_reload > self.reload_interval:self._schedule = self.load_schedule()self._last_reload = nowself.logger.debug('Reloaded schedule')return super().tick()@propertydef schedule(self) -> Dict[str, ScheduleEntry]:"""返回当前调度配置"""return self._schedule

sample_tasks:

from config import app@app.task
def add(x, y):return x + y@app.task
def multiply(x, y):return x * y@app.task
def hello_world():return "Hello World!"

coon下面就是一个数据库连接类,自己随便找个就行这里就不贴出来了

启动命令:

windows机器下
worker进程:
celery -A app worker --loglevel=info --pool=solo
beat进程
celery -A app beat --loglevel=info

最后解释下原理,一般最简单的是在celery_config配置beat_schedule,我们是通过自定义Scheduler类,从数据库里面取值,类似于动态拿到这个beat_schedule值。好处就是可以直接通过配置修改或者添加定时任务,不用再去代码修改添加了,并在最开始和结束添加落表等操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/906446.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

攻防世界-题目名称-文件包含

进入环境 看到 include(),想到文件包含,用php伪协议 /?filenamephp://filter/readconvert.base64-encode/resourceflag.php do not hack!猜测可能是黑名单检测的敏感字符 输入单个字符串/?filenamebase64 还是显示do not hack! 构造payl…

MySQL高频面试八连问(附场景化解析)

文章目录 "为什么订单查询突然变慢了?"——从这个问题开始说起一、索引的生死时速(必考题!)二、事务的"套娃"艺术三、锁机制的相爱相杀四、存储引擎的抉择五、慢查询的破案技巧六、分页的深度优化七、高可用架…

Android 中 自定义生成的 APK/AAR 文件名称

在 Kotlin DSL 中,可以通过配置 build.gradle.kts 文件来自定义生成的 APK 或 AAR 文件名称。 1、自定义 APK 名称 在模块的 build.gradle.kts 中通过修改 applicationVariants.all 配置来实现。 android {......applicationVariants.all {outputs.all {val df …

《从零开始:Spring Cloud Eureka 配置与服务注册全流程》​

关于Eureka的学习,主要学习如何搭建Eureka,将order-service和product-service都注册到Eureka。 1.为什么使用Eureka? 我在实现一个查询订单功能时,希望可以根据订单中productId去获取对应商品的详细信息,但是产品服务和订单服…

鸿蒙开发进阶:深入解析ArkTS语言特性与高性能编程实践

一、前言 在鸿蒙生态蓬勃发展的当下,开发者对于高效、优质的应用开发语言需求愈发迫切。ArkTS 作为鸿蒙应用开发的核心语言,在继承 TypeScript 优势的基础上,进行了诸多优化与扩展,为开发者带来了全新的编程体验。本文将深入剖析…

ARM-Linux 完全入门

1.准备部分 1.1 虚拟机安装 准备VMware软件、ubuntu系统镜像安装过程 VMware安装 破解(自己百度破解码,多试几个网址,会有能用的)Ubuntu安装 配置联网 桥接 虚拟机Ubuntu系统必须能连接到外网,不然不能更新软件安装…

深度学习驱动下的目标检测技术:原理、算法与应用创新(三)

五、基于深度学习的目标检测代码实现 5.1 开发环境搭建 开发基于深度学习的目标检测项目,首先需要搭建合适的开发环境,确保所需的工具和库能够正常运行。以下将详细介绍 Python、PyTorch 等关键开发工具和库的安装与配置过程。 Python 是一种广泛应用于…

致敬经典 << KR C >> 之打印输入单词水平直方图和以每行一个单词打印输入 (练习1-12和练习1-13)

1. 前言 不知道有多少同学正在自学C/C, 无论你是一个在校学生, 还是已经是上班族. 如果你想从事或即将从事软件开发这个行业, C/C都是一个几乎必须要接触的系统级程序开发语言. 虽然现在有Rust更安全的系统级编程语言作为C/C的替代, 但作为入门, C应该还是要好好学的. C最早由B…

【Leetcode 每日一题】3355. 零数组变换 I

问题背景 给定一个长度为 n n n 的整数数组 n u m s nums nums 和一个二维数组 q u e r i e s queries queries,其中 q u e r i e s [ i ] [ l i , r i ] queries[i] [l_i, r_i] queries[i][li​,ri​]。 对于每个查询 q u e r i e s [ i ] queries[i] quer…

[java八股文][Java虚拟机面试篇]垃圾回收

什么是Java里的垃圾回收?如何触发垃圾回收? 垃圾回收(Garbage Collection, GC)是自动管理内存的一种机制,它负责自动释放不再被程序引用的对象所占用的内存,这种机制减少了内存泄漏和内存管理错误的可能性…

ubuntu服务器版启动卡在start job is running for wait for...to be Configured

目录 前言 一、原因分析 二、解决方法 总结 前言 当 Ubuntu 服务器启动时,系统会显示类似 “start job is running for wait for Network to be Configured” 或 “start job is running for wait for Plymouth Boot Screen Service” 等提示信息,并且…

Android 手写签名功能详解:从原理到实践

Android 手写签名功能详解 1. 引言2. 手写签名核心实现:SignatureView 类3. 交互层实现:MainActivity 类4. 布局与配置5. 性能优化与扩展方向 1. 引言 在电子政务、金融服务等移动应用场景中,手写签名功能已成为提升用户体验与业务合规性的关…

【nRF9160 常用prj.conf配置与AT指令介绍】

参考资料: 技术讨论:Q群:542294007 nRF91 NCS SDK安装工具与SDK安装包等常用软件下载地址 云盘下载:pan.olib.cn 一、nRF9160 常用prj.conf配置介绍 nRF9160通过prj.conf配置网络模式为:CAT-M模式 CONFIG_LTE_NETWOR…

小型化边缘计算设备

以下是关于小型化边缘计算设备的核心技术与应用特点的综合分析: 一、核心硬件平台与算力表现‌ NVIDIA Jetson Orin系列‌ Jetson Orin Nano‌:配备1024个CUDA核心和32个Tensor核心,支持高达100 TOPS的AI算力,适用于机器人、无…

css使用clip-path属性切割显示可见内容

1. 需求 想要实现一个渐变的箭头Dom&#xff0c;不想使用svg、canvas去画&#xff0c;可以考虑使用css的clip-path属性切割显示内容。 2. 实现 <div class"arrow">箭头 </div>.arrow{width: 200px;height: 60px;background-image: linear-gradient(45…

Kotlin与物联网(IoT):Android Things开发探索

在物联网&#xff08;IoT&#xff09;领域&#xff0c;Kotlin 凭借其简洁性、安全性和与 Java 生态的无缝兼容性&#xff0c;逐渐成为 Android Things 开发的有力工具。尽管 Google 已于 2022 年宣布停止对 Android Things 的官方支持&#xff0c;但其技术思想仍值得探索&#…

2025年AI搜索引擎发展洞察:技术革新与市场变革

引言&#xff1a;AI搜索的崛起与市场格局重塑 2024-2025年&#xff0c;AI搜索市场迎来了前所未有的变革期。随着DeepSeek-R1等先进大语言模型的推出&#xff0c;传统搜索引擎、AI原生搜索平台以及各类内容平台纷纷加速智能化转型&#xff0c;推动搜索技术从基础信息检索向深度…

基于 ESP32 与 AWS 全托管服务的 IoT 架构:MQTT + WebSocket 实现设备-云-APP 高效互联

目录 一、总体架构图 二、设备端(ESP32)低功耗设计(适配 AWS IoT) 1.MQTT 设置(ESP32 连接 AWS IoT Core) 2.低功耗策略总结(ESP32) 三、云端架构(基于 AWS Serverless + IoT Core) 1.AWS IoT Core 接入 2.云端 → APP:WebSocket 推送方案 流程: 3.数据存…

【LeetCode 热题 100】有效的括号 / 最小栈 / 字符串解码 / 柱状图中最大的矩形

⭐️个人主页&#xff1a;小羊 ⭐️所属专栏&#xff1a;LeetCode 热题 100 很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~ 目录 栈有效的括号最小栈字符串解码每日温度柱状图中最大的矩形 堆数组中的第K个最大元素 栈 有效的括号 有效的括号 cl…

Petalinux

Petalinux 命令 参考《UG 1157 PetaLinux Command Line Reference Guide》 //创建petalinux工程 petalinux-create -t project --template zynq -n <name> //配置工程 cd 上一步的工程 petalinux-config --get-hw-description ../xsa_folder///配置Linux内核 petalinux-…