利用 Celery 构建 Web 服务的后台任务调度模块

来源:http://www.tuicool.com/articles/Enaeymm


任务队列在 Web 服务里的应用

在 Web2.0 后的时代,社交网站、搜索引擎的的迅猛发展对 Web 服务的后台管理系统提出了更高的需求。考虑几个常见的使用场景:

  1. 社交网站的用户在其主页发布了一组新的照片,这条新鲜事需要适时地推送至该用户的所有好友。该网站的活跃用户有千万级别,在同一时刻会有非常多的“新鲜事推送”任务需要处理,并且每个用户的好友数会达到 1000+的级别。出于用户体验的考虑,用户发布照片的这个操作需要在较短时间内得到反馈。
  2. 在文献搜索系统的主页,用户可以查到当前一小时内最热门的十大文献,并且能够直接访问该文献。该文献管理系统所管理的文献数量非常多,达到 PB 的级别。处于用户体验的考虑,用户获得十大热门文献这个动作需要在较短时间内获得反馈。

考虑对于高并发大用户量的 Web 服务系统,对于场景一和场景二中的需求,如果在请求处理周期内完成这些任务,然后再返回结果,这种传统的做法会导致用户等待的时间过长。同时 Web 服务管理后台对任务处理能力也缺乏扩展性。

在这种场景下,任务队列是有效的解决方案。在一个任务队列系统中,“将新鲜事推送至用户 A 的所有好友”或者“查询当前最热门的十大文献”这种查询或者计算工作可以被当成一个“任务”。在任务队列系统中,一般有任务生产者、任务处理中间方以及任务消费者三方。其中任务生产者负责生产任务,比如“将新鲜事推送至用户 A 的所有好友”这一任务的发起方就可以称作任务生产者。任务处理中间方负责接收任务生产者的任务处理请求,对任务进行调度,最后将任务分发给任务消费者来进行处理。任务消费者就是执行任务的一方,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。在生产方、消费者和任务处理中间方之间一般使用消息传递的方式来进行通信。

在任务队列系统框架中,任务消费者可以跨越不同的服务节点,可以动态地增加节点来增加系统的任务处理能力,非常适合高并发、需要横向扩展的 Web 服务后台。

回页首

Celery: 基于 Python 的开源分布式任务调度模块

Celery 是一个用 Python 编写的分布式的任务调度模块,它有着简明的 API,并且有丰富的扩展性,适合用于构建分布式的 Web 服务。

图 1. Celery 的模块架构


Celery 的模块架构较为简洁,但是提供了较为完整的功能:

任务生产者 (task producer)

任务生产者 (task producer) 负责产生计算任务,交给任务队列去处理。在 Celery 里,一段独立的 Python 代码、一段嵌入在 Django Web 服务里的一段请求处理逻辑,只要是调用了 Celery 提供的 API,产生任务并交给任务队列处理的,我们都可以称之为任务生产者。

任务调度器 (celery beat)

Celery beat 是一个任务调度器,它以独立进程的形式存在。Celery beat 进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。Celery beat 是 Celery 系统自带的任务生产者。系统管理员可以选择关闭或者开启 Celery beat。同时在一个 Celery 系统中,只能存在一个 Celery beat 调度器。

任务代理 (broker)

任务代理方负责接受任务生产者发送过来的任务处理消息,存进队列之后再进行调度,分发给任务消费方 (celery worker)。因为任务处理是基于 message(消息) 的,所以我们一般选择 RabbitMQ、Redis 等消息队列或者数据库作为 Celery 的 message broker。

任务消费方 (celery worker)

Celery worker 就是执行任务的一方,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。Celery worker 对应的就是操作系统中的一个进程。Celery 支持分布式部署和横向扩展,我们可以在多个节点增加 Celery worker 的数量来增加系统的高可用性。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。

结果保存

Celery 支持任务处理完后将状态信息和结果的保存,以供查询。Celery 内置支持 rpc, Django ORM,Redis,RabbitMQ 等方式来保存任务处理后的状态信息。

回页首

构建第一个 Celery 程序

在我们的第一个 Celery 程序中,我们尝试在 Celery 中构建一个“将新鲜事通知到朋友”的任务,并且尝试通过编写一个 Python 程序来启动这个任务。

安装 Celery

Pip install celery

选择合适的消息代理中间件

Celery 支持 RabbitMQ、Redis 甚至其他数据库系统作为其消息代理中间件,在本文中,我们选择 RabbitMQ 作为消息代理中间件。

sudo apt-get install rabbitmq-server

创建 Celery 对象

Celery 对象是所有 Celery 功能的入口,所以在开始其它工作之前,我们必须先定义我们自己的 Celery 对象。该对象定义了任务的具体内容、任务队列的服务地址、以及保存任务执行结果的地址等重要信息。

# notify_friends.py
from celery import Celery
import time
app = Celery('notify_friends', backend='rpc://', broker='amqp://localhost')@app.task
def notify_friends(userId, newsId):print 'Start to notify_friends task at {0}, userID:{1} newsID:{2}'.format(time.ctime(), userId, newsId)time.sleep(2)print 'Task notify_friends succeed at {0}'.format(time.ctime())return True

在本文中,为了模拟真实的应用场景,我们定义了 notify_friends 这个任务,它接受两个参数,并且在输出流中打印出一定的信息,

创建 Celery Worker 服务进程

在定义完 Celery 对象后,我们可以创建对应的任务消费者--Celery worker 进程,后续的任务处理请求都是由这个 Celery worker 进程来最终执行的。

celery -A celery_test worker --loglevel=info

在 Python 程序中调用 Celery Task

我们创建一个简单的 Python 程序,来触发 notify_friends 这个任务。

# call_notify_friends.pyfrom notify_friends import notify_friends
import timedef notify(userId, messageId):result = notify_friends.delay(userId, messageId)while not result.ready():time.sleep(1)print result.get(timeout=1)if __name__ == '__main__':notify('001', '001')

我们在 call_notify_friends.py 这个程序文件中,定义了 Notify 函数,它调用了我们之前定义的 notify_friends 这个 API,来发送任务处理请求到任务队列,并且不断地查询等待来获得任务处理的结果。

Celery worker 中的 log 信息:

[tasks]. celery_test.notify_friends[2015-11-16 15:02:31,113: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-11-16 15:02:31,122: INFO/MainProcess] mingle: searching for neighbors
[2015-11-16 15:02:32,142: INFO/MainProcess] mingle: all alone
[2015-11-16 15:02:32,179: WARNING/MainProcess] celery@yuwenhao-VirtualBox ready.
[2015-11-16 15:04:45,474: INFO/MainProcess] Received task: 
celery_test.notify_friends[3f090a76-7678-4f9c-a37b-ceda59600f9c]
[2015-11-16 15:04:45,475: WARNING/Worker-2] Start to notify_friends task at 
Mon Nov 16 15:04:45 2015, userID:001 newsID:001
[2015-11-16 15:04:47,477: WARNING/Worker-2] Task notify_friends succeed at Mon Nov 16 15:04:47 2015
[2015-11-16 15:04:47,511: INFO/MainProcess] Task celery_test.notify_friends[3f090a76-7678-4f9c-a37b-ceda59600f9c] succeeded in 2.035536565s: True

我们可以看到,Celery worker 收到了 Python 程序的 notify_friends 任务的处理请求,并且执行完毕。

回页首

利用调度器创建周期任务

在我们第二个 Celery 程序中,我们尝试构建一个周期性执行“查询当前一小时最热门文献”的任务,每隔 100 秒执行一次,并将结果保存起来。后续的搜索请求到来后可以直接返回已有的结果,极大优化了用户体验。

创建配置文件

Celery 的调度器的配置是在 CELERYBEAT_SCHEDULE 这个全局变量上配置的,我们可以将配置写在一个独立的 Python 模块,在定义 Celery 对象的时候加载这个模块。我们将 select_populate_book 这个任务定义为每 100 秒执行一次。

# config.py
from datetime import timedeltaCELERYBEAT_SCHEDULE = {'select_populate_book': {'task': 'favorite_book.select_populate_book','schedule': timedelta(seconds=100),},
}

创建 Celery 对象

在 Celery 对象的定义里,我们加载了之前定义的配置文件,并定义了 select_populate_book 这个任务。

#favorite_book.py
from celery import Celery
import timeapp = Celery('select_populate_book', backend='rpc://', broker='amqp://localhost')
app.config_from_object('config')@app.task
def select_populate_book():print 'Start to select_populate_book task at {0}'.format(time.ctime())time.sleep(2)print 'Task select_populate_book succeed at {0}'.format(time.ctime())return True

启动 Celery worker

celery -A favorite_book worker --loglevel=info

启动 Celery beat

启动 Celery beat 调度器,Celery beat 会周期性地执行在 CELERYBEAT_SCHEDULE 中定义的任务,即周期性地查询当前一小时最热门的书籍。

celery -A favorite_book beatyuwenhao@yuwenhao:~$ celery -A favorite_book beat
celery beat v3.1.15 (Cipater) is starting.
__ - ... __ - _
Configuration ->. broker -> amqp://guest:**@localhost:5672//. loader -> celery.loaders.app.AppLoader. scheduler -> celery.beat.PersistentScheduler. db -> celerybeat-schedule. logfile -> [stderr]@%INFO. maxinterval -> now (0s)
[2015-11-16 16:21:15,443: INFO/MainProcess] beat: Starting...
[2015-11-16 16:21:15,447: WARNING/MainProcess] Reset: 
Timezone changed from 'UTC' to None
[2015-11-16 16:21:25,448: INFO/MainProcess] Scheduler: 
Sending due task select_populate_book (favorite_book.select_populate_book)
[2015-11-16 16:21:35,485: INFO/MainProcess] Scheduler:
Sending due task select_populate_book (favorite_book.select_populate_book)
[2015-11-16 16:21:45,490: INFO/MainProcess] Scheduler:
Sending due task select_populate_book (favorite_book.select_populate_book)

我们可以看到,Celery beat 进程周期性地将任务执行请求 select_populate_book 发送至任务队列。

yuwenhao@yuwenhao:~$ celery -A favorite_book worker --loglevel=info
[2015-11-16 16:21:11,560: WARNING/MainProcess] 
/usr/local/lib/python2.7/dist-packages/celery/apps/worker.py:161: CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default.The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']You must only enable the serializers that you will actually use.warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))-------------- celery@yuwenhao-VirtualBox v3.1.15 (Cipater)
---- **** ----- 
--- * *** * -- Linux-3.5.0-23-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app: select_populate_book:0x1b219d0
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- 
--- ***** ----- [queues]-------------- .> celery exchange=celery(direct) key=celery[tasks]. favorite_book.select_populate_book[2015-11-16 16:21:11,579: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-11-16 16:21:11,590: INFO/MainProcess] mingle: searching for neighbors
[2015-11-16 16:21:12,607: INFO/MainProcess] mingle: all alone
[2015-11-16 16:21:12,631: WARNING/MainProcess] celery@yuwenhao-VirtualBox ready.
[2015-11-16 16:21:25,459: INFO/MainProcess] Received task: 
favorite_book.select_populate_book[515f7c55-7ff0-4fcf-bc40-8838f69805fd]
[2015-11-16 16:21:25,460: WARNING/Worker-2] 
Start to select_populate_book task at Mon Nov 16 16:21:25 2015
[2015-11-16 16:21:27,462: WARNING/Worker-2] 
Task select_populate_book succeed at Mon Nov 16 16:21:27 2015
[2015-11-16 16:21:27,475: INFO/MainProcess] Task favorite_book.select_populate_book
[515f7c55-7ff0-4fcf-bc40-8838f69805fd] succeeded in 2.015802141s: True
[2015-11-16 16:21:35,494: INFO/MainProcess] Received task: 
favorite_book.select_populate_book[277d718a-3435-4bca-a881-a8f958d64aa9]
[2015-11-16 16:21:35,498: WARNING/Worker-1]
Start to select_populate_book task at Mon Nov 16 16:21:35 2015
[2015-11-16 16:21:37,501: WARNING/Worker-1] 
Task select_populate_book succeed at Mon Nov 16 16:21:37 2015
[2015-11-16 16:21:37,511: INFO/MainProcess] 
Task favorite_book.select_populate_book
[277d718a-3435-4bca-a881-a8f958d64aa9] succeeded in 2.014368786s: True

我们可以看到,任务 select_populate_book 的 Celery worker 周期性地收到 Celery 调度器的任务的处理请求,并且运行该任务。

回页首

结束语

任务队列技术可以满足 Web 服务系统后台任务管理和调度的需求,适合构建分布式的 Web 服务系统后台。Celery 是一个基于 Python 的开源任务队列系统。它有着简明的 API 以及良好的扩展性。本文首先介绍了队列技术的基本原理,然后介绍了 Celery 的模块架构以及工作原理。最后,本文通过实例介绍了如何在 Python 程序中调用 Celery API 并通过 Celery 任务队列来执行任务,以及如何通过 Celery beat 在 Celery 任务队列中创建周期性执行的任务。希望本文可以对 Web 后台开发者、以及 Celery 的初学者有所帮助。



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/496479.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java元素符号是什么_Java 代码中 @ 符号是什么意思?

展开全部annotation。Annotation,是Java5的新特性,下面是Sun的Tutorial的描述,因为是英文,这里我翻译下,希望能够比较清晰32313133353236313431303231363533e58685e5aeb931333264633435的描述一下Annotation的语法以及…

风景这边独好的AI大年,百度亚马逊等巨头第一季度成绩单怎么样?

苹果春季新品发布会邀请函来源:遇见人工智能该来的还是来了,但是那又如何?这句话是送给苹果的。本以为这个季度不会再有新品发布的苹果,却用一张主打“同学们,来次课外活动吧”的神奇邀请函打消了所有人的怀疑。根据从…

Java加密与解密的艺术~数字证书~证书使用openssl

证书工具 /*** 2009-5-20*/ package org.zlex.chapter10_2;import java.io.FileInputStream; import java.security.KeyStore; import java.security.PrivateKey; import java.security.PublicKey; import java.security.Signature; import java.security.cert.Certificate; i…

Run-time system与虚拟机

Run-time system与虚拟机 一个Run-time系统,由一个执行引擎(Execution Engine)、一个机器级的调试器(Debugger)、一个汇编器(Assembler)和与之配套的开发工具(Tools)组成…

最近火了的自动驾驶全球产业链全景图

来源:传感器技术摘要:目前自动驾驶已经发展的很快,除了特斯拉和奥迪以外,很多豪车都引进了自动驾驶技术。下面来盘点一下自动驾驶产业链:首先看一下自动驾驶的等级标准:1、英特尔:Mobileye Alt…

java get set 注解_java技能提升,用Lombok甩掉get和set,让代码变得更简洁

前言前几天有个新来的同事(实习生)惊讶的对我说:我们的代码里好多错误,我的程序本地都启动不了。我一脸懵逼的质问他:目前线上的代码,怎么会有问题吗?他不服气的说:你来看嘛,就是有问题&#xf…

.pfx 证书和 .cer 证书

证书系列: 1:.pfx 证书和 .cer 证书 2:导入pfx证书 通常情况下,作为文件形式存在的证书一般有三种格式: 第一种:带有私钥的证书,由Public Key Cryptography Standards #12,PKCS#1…

HTML5、CSS、CSS3、SCSS (SASS) 相关教程

1、HTML5 教程 W3School HTML5 教程:http://www.w3school.com.cn/html5/index.asp 菜鸟网站 HTML5教程:http://www.runoob.com/html/html5-intro.html 知乎 零基础如何迅速学习HTML:https://www.zhihu.com/question/27018083 请问如何从头…

连接查询_左连接/右连接/全连接的区别

介绍表连接,更确切的说是inner joins內连接. 內连接仅选出两张表中互相匹配的记录.因此,这会导致有时我们需要的记录没有包含进来。 为更好的理解这个概念,我们介绍两个表作演示。苏格兰议会中的政党表(party)和议员表…

java getclass 相等_java使用反射比较两个bean对象属性值是否相等

import java.lang.reflect.Field;import java.lang.reflect.Method;import java.util.HashMap;import java.util.Map;import org.apache.log4j.Logger;public class DomainEquals {/*** 日志操作类*/private static Logger logger Logger.getLogger(DomainEquals.class);publi…

生命起源之谜:RNA世界假说将迎来终结?

○ 流行的理论认为,生命起源于物质丰富的化学汤,而 RNA 是化学汤中最初的自我复制单元。但是,多肽和RNA混合起来或许会更高效。 | 图片来源:Novikov Aleksey来源:科学出版社 撰文:Jordana Cepelewicz 翻…

PHP 学习路线

PHP 官网文档(中文):https://www.php.net/manual/zh/langref.php ThinkPhp (官方手册、入门教程):https://sites.thinkphp.cn/1556331 ​W3School PHP 教程:http://www.w3school.com.cn/php/index.asp w3cschool (在线教程&技术文档)&am…

jQuery 对话框 jQuery.plugin

jQuery 对话框 jQuery.plugin 强烈推荐对话框插件jquery.weebox.js,本站开源账务管理系统中使用的对话框组件,各种形式的对话框:确认、成功、警告、错误等 ………… 如下图的右下角: 账务管理系统(个人版)演示 图的右下角的框架就…

java 微信 sha1_【微信开发-JavaWeb】SHA1算法

微信开发-SHA1算法public static String getSha1(String str){if(str null || str.length()0){return null;}char hexDigits[]{0,1,2,3,4,5,6,7,8,9,a,b,c,d,e,f};try {MessageDigest mdTemp MessageDigest.getInstance("SHA1");mdTemp.update(str.getBytes("…

Java加密与解密的艺术~数字证书~证书管理openssl

OpenSSL功能远胜于KeyTool,可用于根证书,服务器证书和客户证书的管理 这里使用的是Win32OpenSSL_Light-1_0_1e.exe http://www.slproweb.com/products/Win32OpenSSL.html 1,构建根证书 构建根证书前,需要构建随机数文件&#xff0…

2018年聊天机器人状态报告

来源: 199IT互联网数据中心根据Drift、SurveyMonkey Audience、Salesforce和myclever的“2018年聊天机器人状态报告”,聊天机器人预计能够24小时为简单任务提供即时服务,但不是进行复杂查询的最佳渠道。聊天机器人尚未在消费者中找到广泛的吸…

PHP、MySQL 注入

Welcome to the NetSPI SQL Injection Wiki:https://sqlwiki.netspi.com/ 因为需要了解下 SQL 注入,就使用 PHP 自己写了一个只有一个网页的网站测试下,现在记录下过程。。。 直接使用的 KALI系统 (KALI官网:Kali Linux | Penetr…

在asp.net 2.0中使用SqlBulkCopy类迁移数据[转]

我们经常要在一个表中将数据迁移到另一个表,当然,用的方法十分多了。在.net 2.0中,提供了一个sqlbulkcopy类,也可以实现如下的操作,下面简单介绍下。比如一个表如下CREATE TABLE Person3(PersonID int IDENTITY(1,1) P…

jdk1.8 base64注意事项

由于jdk1.7和jdk1.8内置的Base64遵守的RFC协议不一致,jdk1.7按照照RFC1521实现的,jdk1.8是按照rfc4648和rfc2045两个协议来实现的。具体可以从类注释中查询到。由于协议的不同可能导致jdk1.8的解码jdk1.7编码的数据时抛出java.lang.IllegalArgumentExcep…

争自动驾驶领头羊还是确保技术安全?欧美选择不同

来源:发掘新视界摘要:对于那些未知或有潜在危险的技术,欧洲更倾向于保护民众,而非是引领创新与进步。自优步自动驾驶汽车致人死亡事件发生之后,欧洲与美国对于技术的态度差异再度凸显,欧洲更倾向于加强监管…