Celery 是一个基于 Python 的分布式任务队列框架,它允许你在不同的进程甚至不同的服务器上异步执行任务。
特点
- 简单:易于使用和配置,提供了简洁的 API。
- 高可用:支持任务的可靠交付,即使在出现故障时也能保证任务不丢失。
- 分布式:可以在多个节点上分布式执行任务,提高系统的处理能力和可扩展性。
- 多语言支持:虽然主要用 Python 编写,但也支持其他语言与它进行交互。
架构
- 任务队列:存储等待处理的任务,通常使用消息中间件(如
RabbitMQ
、Redis
等)来实现。 - 工作者(Worker):负责从任务队列中获取任务并执行。
- 应用(App):定义任务和配置 Celery 的相关设置。
1.安装配置
安装 Celery:pip install celery
配置消息中间件:如果使用 Redis 作为消息中间件,则需要在Celery 配置中指定 Redis 作为后端和消息代理。
定义任务:在 Python 代码中定义需要异步执行的任务函数,使用@app.task
装饰器将函数标记为 Celery 任务。
新建 celery_tasks.py
文件, 定义了两个任务
import time
from celery import Celery# 创建Celery应用
app = Celery('celery_app',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0',broker_transport_options={'global_keyprefix': 'celery_tasks:'},result_backend_transport_options={'global_keyprefix': 'celery_tasks:'})# 定义发送消息任务
@app.task
def send_msg(message):print(f"开始发送消息: {message}")time.sleep(3)print(f"发送消息完成: {message}")return message# 定义发送邮箱任务
@app.task
def send_email(email):print(f"开始发送邮箱: {email}")time.sleep(3)print(f"发送邮箱完成: {email}")return email
'celery_app'
:这是应用的名称broker
:指的是消息代理,其作用是在任务生产者(调用任务的代码)和任务消费者(Celery 工作进程)之间传递任务消息。backend
:是结果后端,用来存储任务的执行结果。当任务执行完成后,结果会被存储到结果后端中,之后可以通过任务的 ID 来获取这些结果
2.启动工作者
启动工作者:在命令行中启动 Celery 工作者,监听任务队列并执行任务。
celery -A celery_tasks worker --loglevel=info -P eventlet
-A celery_tasks
: -A 参数指定了包含 Celery 应用实例的模块名。
Celery 会去加载celery_tasks.py
文件中的app
实例,并启动一个工作进程来监听消息队列。当有新的send_email
任务或者send_email
任务被发送到消息队列时,工作进程就会执行该任务。worker
: 表示你想要启动一个 worker 进程来执行任务队列中的任务。--loglevel=info
: 设置日志级别为 info-P eventlet
: 指定使用 eventlet 作为并发池的实现。默认情况下,Celery 使用的是 prefork(多进程)模型,但在某些环境下,比如 Windows 上,或者当你需要更高效的 I/O 操作时,可以使用 eventlet 或 gevent 这样的协程库来代替。Eventlet 提供了基于 green thread 的并发模型,对于 I/O 密集型的任务来说可能更加高效,并且解决了在 Windows 上由于缺少 fork 支持而可能出现的问题。安装:pip install eventlet
启动之后redis数据库会生成这几个
3.调用任务
调用任务:在其他地方的代码中,可以通过调用任务函数的delay
或apply_async
方法来将任务发送到消息队列中。
from celery_tasks import send_msg, send_emailresult_msg = send_msg.delay('Hello, World!')
result_email = send_email.delay('123456@qq.com')print(f"Message task ID: {result_msg.id}")
print(f"Email task ID: {result_email.id}")
Message task ID: ab6ecfcc-ee26-4265-8097-5087786e4659
Email task ID: 1cf3f8f8-4e1a-4d8c-88a5-f2a32356f763
运行完成,获取到了ID
可以看到Celery worker 是能够并行处理这两个任务的
于此同时,redis数据库多了两个,我们就可以根据ID取获取数据