分布式任务队列Celery入门与进阶
一、简介Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。特点:
应用场景举例: 1.web应用:当用户在网站进行某个操作需要很长时间完成时,我们可以将这种操作交给Celery执行,直接返回给用户,等到Celery执行完成以后通知用户,大大提好网站的并发以及用户的体验感。 2.任务场景:比如在运维场景下需要批量在几百台机器执行某些命令或者任务,此时Celery可以轻松搞定。 3.定时任务:向定时导数据报表、定时发送通知类似场景,虽然Linux的计划任务可以帮我实现,但是非常不利于管理,而Celery可以提供管理接口和丰富的API。 二、架构&工作原理Celery由以下三部分构成:消息中间件(Broker)、任务执行单元Worker、结果存储(Backend),如下图:
工作原理:
消息中间件Broker消息中间件Broker官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ。 任务执行单元WorkerWorker是任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心。 结果存储BackendBackend结果存储官方也提供了诸多的存储方式支持:RabbitMQ、?Redis、Memcached,SQLAlchemy,Django ORM、Apache Cassandra、Elasticsearch。 三、安装使用这里我使用的redis作为消息中间件,redis安装可以参考https://www.cnblogs.com/wdliu/p/9360286.html。 Celery安装: pip3 install celery
简单使用目录结构: project/
各目录文件说明: __init__.py:初始化Celery以及加载配置文件
celery = Celery()
app.config_from_object()
config.py:? Celery相关配置文件,更多配置参考:http://docs.celeryproject.org/en/latest/userguide/configuration.html
=
=
=
= 60 * 60 * 24
=
= (
tasks.py :任务定义文件 <span style="color: #0000ff;">from project <span style="color: #0000ff;">import<span style="color: #000000;"> app@app.task <span style="color: #0000ff;">def<span style="color: #000000;"> show_name(name): <span style="color: #0000ff;">return name 启动Worker: celery worker -A project -l debug
各个参数含义: worker: 代表第启动的角色是work当然还有beat等其他角色; -A :项目路径,这里我的目录是project -l:启动的日志级别,更多参数使用celery --help查看 查看日志输出,会发现我们定义的任务,以及相关配置: 虽然启动了worker,但是我们还需要通过delay或apply_async来将任务添加到worker中,这里我们通过交互式方法添加任务,并返回AsyncResult对象,通过AsyncResult对象获取结果: AsyncResult除了get方法用于常用获取结果方法外还提以下常用方法或属性:
四、进阶使用对于普通的任务来说可能满足不了我们的任务需求,所以还需要了解一些进阶用法,Celery提供了诸多调度方式,例如任务编排、根据任务状态执行不同的操作、重试机制等,以下会对常用高阶用法进行讲述。 定时任务&计划任务Celery的提供的定时任务主要靠schedules来完成,通过beat组件周期性将任务发送给woker执行。在示例中,新建文件period_task.py,并添加任务到配置文件中: period_task.py:
project celery.schedules @app.on_after_configure.connect
<span style="color: #0000ff;">def setup_periodic_tasks(sender,**<span style="color: #000000;">kwargs): sender.add_periodic_task(10.0,add.s(1,3),name=<span style="color: #800000;">'<span style="color: #800000;">1+3=<span style="color: #800000;">') <span style="color: #008000;">#<span style="color: #008000;"> 每10秒执行add <span style="color: #000000;"> sender.add_periodic_task( crontab(hour=16,minute=56,day_of_week=1),<span style="color: #008000;">#<span style="color: #008000;">每周一下午四点五十六执行sayhai sayhi.s(<span style="color: #800000;">'<span style="color: #800000;">wd<span style="color: #800000;">'),name=<span style="color: #800000;">'<span style="color: #800000;">say_hi<span style="color: #800000;">'<span style="color: #000000;"> ) @app.task @app.task config.py
=
=
=
= 60 * 60 * 24
=
= ( ,
)
启动worker和beat: celery worker -A project -l debug
celery beat -A project.period_task -l debug
我们可以观察worker日志: 还可以通过配置文件方式指定定时和计划任务,此时的配置文件如下: <span style="color: #0000ff;">from project <span style="color: #0000ff;">import<span style="color: #000000;"> app<span style="color: #0000ff;">from celery.schedules <span style="color: #0000ff;">import<span style="color: #000000;"> crontab BROKER_URL = <span style="color: #800000;">'<span style="color: #800000;">redis://10.1.210.69:6379/0<span style="color: #800000;">' <span style="color: #008000;">#<span style="color: #008000;"> Broker配置,使用Redis作为消息中间件<span style="color: #000000;"> CELERY_RESULT_BACKEND = <span style="color: #800000;">'<span style="color: #800000;">redis://10.1.210.69:6379/0<span style="color: #800000;">' <span style="color: #008000;">#<span style="color: #008000;"> BACKEND配置,这里使用redis <span style="color: #000000;"> CELERY_RESULT_SERIALIZER = <span style="color: #800000;">'<span style="color: #800000;">json<span style="color: #800000;">' <span style="color: #008000;">#<span style="color: #008000;"> 结果序列化方案 <span style="color: #000000;"> CELERY_TASK_RESULT_EXPIRES = 60 60 24 <span style="color: #008000;">#<span style="color: #008000;"> 任务过期时间 <span style="color: #000000;"> CELERY_TIMEZONE=<span style="color: #800000;">'<span style="color: #800000;">Asia/Shanghai<span style="color: #800000;">' <span style="color: #008000;">#<span style="color: #008000;"> 时区配置 <span style="color: #000000;"> CELERY_IMPORTS = ( <span style="color: #008000;">#<span style="color: #008000;"> 指定导入的任务模块,<span style="color: #800000;">'<span style="color: #800000;">project.period_task<span style="color: #800000;">'<span style="color: #000000;">,) app.conf.beat_schedule =<span style="color: #000000;"> { 此时的period_task.py只需要注册到woker中就行了,如下:
project @app.task
<span style="color: #0000ff;">def<span style="color: #000000;"> add(x,y): <span style="color: #0000ff;">print(x+<span style="color: #000000;">y) <span style="color: #0000ff;">return x+<span style="color: #000000;">y @app.task <span style="color: #0000ff;">return <span style="color: #800000;">'<span style="color: #800000;">hello %s<span style="color: #800000;">' % name 同样启动worker和beat结果和第一种方式一样。更多详细的内容请参考:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules 任务绑定Celery可通过任务绑定到实例获取到任务的上下文,这样我们可以在任务运行时候获取到任务的状态,记录相关日志等。 修改任务中的period_task.py,如下:
project celery.utils.log = get_task_logger(=True)
)
=10]==1
self.retry(exc=e,countdown=5,max_retries=3)
x+
在以上代码中,通过bind参数将任务绑定,self指任务的上下文,通过self获取任务状态,同时在任务出错时进行任务重试,我们观察日志: 内置钩子函数Celery在执行任务时候,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行),在进行使用是我们只需要重写这些方法,完成相应的操作即可。 在以下示例中,我们继续修改period_task.py,分别定义三个任务来演示任务失败、重试、任务成功后执行的操作:
project celery.utils.log celery logger = get_task_logger(<span style="color: #800080;">name<span style="color: #000000;">)
<span style="color: #0000ff;">class<span style="color: #000000;"> demotask(Task):
@app.task(base =demotask,bind=<span style="color: #000000;">True)<span style="color: #0000ff;">def<span style="color: #000000;"> add(self,y): <span style="color: #0000ff;">try<span style="color: #000000;">: a=<span style="color: #000000;">[] a[10]==1 <span style="color: #0000ff;">except<span style="color: #000000;"> Exception as e: <span style="color: #0000ff;">raise self.retry(exc=e,max_retries=1) <span style="color: #008000;">#<span style="color: #008000;"> 出错每5秒尝试一次,总共尝试1次 <span style="color: #0000ff;">return x+<span style="color: #000000;">y @app.task(base=<span style="color: #000000;">demotask) @app.task(base=<span style="color: #000000;">demotask) 此时的配置文件config.py: <span style="color: #0000ff;">from project <span style="color: #0000ff;">import<span style="color: #000000;"> app<span style="color: #0000ff;">from celery.schedules <span style="color: #0000ff;">import<span style="color: #000000;"> crontab BROKER_URL = <span style="color: #800000;">'<span style="color: #800000;">redis://10.1.210.69:6379/0<span style="color: #800000;">' <span style="color: #008000;">#<span style="color: #008000;"> Broker配置,使用Redis作为消息中间件<span style="color: #000000;"> CELERY_RESULT_BACKEND = <span style="color: #800000;">'<span style="color: #800000;">redis://10.1.210.69:6379/0<span style="color: #800000;">' <span style="color: #008000;">#<span style="color: #008000;"> BACKEND配置,这里使用redis <span style="color: #000000;"> CELERY_RESULT_SERIALIZER = <span style="color: #800000;">'<span style="color: #800000;">json<span style="color: #800000;">' <span style="color: #008000;">#<span style="color: #008000;"> 结果序列化方案 <span style="color: #000000;"> CELERY_TASK_RESULT_EXPIRES = 60 60 24 <span style="color: #008000;">#<span style="color: #008000;"> 任务过期时间 <span style="color: #000000;"> CELERY_TIMEZONE=<span style="color: #800000;">'<span style="color: #800000;">Asia/Shanghai<span style="color: #800000;">' <span style="color: #008000;">#<span style="color: #008000;"> 时区配置 <span style="color: #000000;"> CELERY_IMPORTS = ( <span style="color: #008000;">#<span style="color: #008000;"> 指定导入的任务模块,) app.conf.beat_schedule =<span style="color: #000000;"> { 然后重启worker和beat,查看日志: 任务编排在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成,Celery同样也能实现这样的任务,完成这类型的任务通过以下模块完成:
修改tasks.py:
project @app.task
<span style="color: #0000ff;">def<span style="color: #000000;"> add(x,y): <span style="color: #0000ff;">return x+<span style="color: #000000;">y @app.task <span style="color: #0000ff;">return x*<span style="color: #000000;">y @app.task res=<span style="color: #000000;">0 <span style="color: #0000ff;">for i <span style="color: #0000ff;">in<span style="color: #000000;"> data_list: res+=<span style="color: #000000;">i <span style="color: #0000ff;">return res group: 组任务,组内每个任务并行执行 和project同级目录新建consumer.py如下: celery project.tasks = group(add.s(1,2),2))()
(
结果: chain:链式任务 链式任务中,默认上一个任务的返回结果作为参数传递给子任务 celery project.tasks = chain(add.s(1,add.s(3),mul.s(3))()
(
还可以使用|表示链式任务,上面任务也可以表示为: res = (add.s(1,2) | add.s(3) | (mul.s(3
chord:任务分割,分为header和body两部分,hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body celery project.tasks = chord(header=[add.s(1,mul.s(3,4)],body=sum.s())()
(
<span style="color: #008000;"># <span style="color: #008000;">结果:<span style="color: #008000;"> <span style="color: #008000;">res:15chunks:任务分组,按照任务的个数分组 project.tasks = add.chunks(zip(range(5),range(5)),4)()
(
结果: <h3 id="delay-和-apply-async">delay &apply_async 对于delay和apply_async都可以用来进行任务的调度,本质上是delay对apply_async进行了再一次封装(或者可以说是快捷方式),两者都返回AsyncResult对象,以下是两个方法源码。 delay(self,*args,**
Does not support the extra options enabled by :meth:`apply_async`.
Arguments:
*args (Any): Positional arguments passed on to the task.
**kwargs (Any): Keyword arguments passed on to the task.
Returns:
celery.result.AsyncResult: Future promise.
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">return</span> self.apply_async(args,kwargs)</pre>
apply_async(self,args=None,kwargs=None,task_id=None,producer==None,link_error=None,shadow=None,**
Arguments:
args (Tuple): The positional arguments to pass on to the task.
kwargs (Dict): The keyword arguments to pass on to the task.
countdown (float): Number of seconds into the future that the
task should execute. Defaults to immediate execution.
eta (~datetime.datetime): Absolute time and date of when the task
should be executed. May not be specified if `countdown`
is also supplied.
expires (float,~datetime.datetime): Datetime or
seconds in the future for the task should expire.
The task won't be executed after the expiration time.
shadow (str): Override task name used in logs/monitoring.
Default is retrieved from :meth:`shadow_name`.
connection (kombu.Connection): Re-use existing broker connection
instead of acquiring one from the connection pool.
retry (bool): If enabled sending of the task message will be
retried in the event of connection loss or failure.
Default is taken from the :setting:`task_publish_retry`
setting. Note that you need to handle the
producer/connection manually for this to work.
retry_policy (Mapping): Override the retry policy used.
See the :setting:`task_publish_retry_policy` setting.
queue (str,kombu.Queue): The queue to route the task to.
This must be a key present in :setting:`task_queues`,or
:setting:`task_create_missing_queues` must be
enabled. See :ref:`guide-routing` for more
information.
exchange (str,kombu.Exchange): Named custom exchange to send the
task to. Usually not used in combination with the ``queue``
argument.
routing_key (str): Custom routing key used to route the task to a
worker server. If in combination with a ``queue`` argument
only used to specify custom routing keys to topic exchanges.
priority (int): The task priority,a number between 0 and 9.
Defaults to the :attr:`priority` attribute.
serializer (str): Serialization method to use.
Can be `pickle`,`json`,`yaml`,`msgpack` or any custom
serialization method that's been registered
with :mod:`kombu.serialization.registry`.
Defaults to the :attr:`serializer` attribute.
compression (str): Optional compression method
to use. Can be one of ``zlib``,``bzip2``,or any custom compression methods registered with
:func:`kombu.compression.register`.
Defaults to the :setting:`task_compression` setting.
link (Signature): A single,or a list of tasks signatures
to apply if the task returns successfully.
link_error (Signature): A single,or a list of task signatures
to apply if an error occurs while executing the task.
producer (kombu.Producer): custom producer to use when publishing
the task.
add_to_parent (bool): If set to True (default) and the task
is applied while executing another task,then the result
will be appended to the parent tasks ``request.children``
attribute. Trailing can also be disabled by default using the
:attr:`trail` attribute
publisher (kombu.Producer): Deprecated alias to ``producer``.
headers (Dict): Message headers to be included in the message.
Returns:
celery.result.AsyncResult: Promise of future evaluation.
Raises:
TypeError: If not enough arguments are passed,or too many
arguments are passed. Note that signature checks may
be disabled by specifying ``@task(typing=False)``.
kombu.exceptions.OperationalError: If a connection to the
transport cannot be made,or if the connection is lost.
Note:
Also supports all keyword arguments supported by
:meth:`kombu.Producer.publish`.
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">if</span><span style="color: #000000;"> self.typing:
</span><span style="color: #0000ff;">try</span><span style="color: #000000;">:
check_arguments </span>= self.<span style="color: #800080;">__header__</span>
<span style="color: #0000ff;">except</span> AttributeError: <span style="color: #008000;">#</span><span style="color: #008000;"> pragma: no cover</span>
<span style="color: #0000ff;">pass</span>
<span style="color: #0000ff;">else</span><span style="color: #000000;">:
check_arguments(</span>*(args <span style="color: #0000ff;">or</span> ()),**(kwargs <span style="color: #0000ff;">or</span><span style="color: #000000;"> {}))
app </span>=<span style="color: #000000;"> self._get_app()
</span><span style="color: #0000ff;">if</span><span style="color: #000000;"> app.conf.task_always_eager:
with denied_join_result():
</span><span style="color: #0000ff;">return</span> self.apply(args,task_id=task_id <span style="color: #0000ff;">or</span><span style="color: #000000;"> uuid(),link</span>=link,link_error=link_error,**<span style="color: #000000;">options)
</span><span style="color: #0000ff;">if</span> self.<span style="color: #800080;">__v2_compat__</span><span style="color: #000000;">:
shadow </span>= shadow <span style="color: #0000ff;">or</span><span style="color: #000000;"> self.shadow_name(self(),options)
</span><span style="color: #0000ff;">else</span><span style="color: #000000;">:
shadow </span>= shadow <span style="color: #0000ff;">or</span><span style="color: #000000;"> self.shadow_name(args,options)
preopts </span>=<span style="color: #000000;"> self._get_exec_options()
options </span>= dict(preopts,**options) <span style="color: #0000ff;">if</span> options <span style="color: #0000ff;">else</span><span style="color: #000000;"> preopts
options.setdefault(</span><span style="color: #800000;">'</span><span style="color: #800000;">ignore_result</span><span style="color: #800000;">'</span><span style="color: #000000;">,self.ignore_result)
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> app.send_task(
self.name,task_id</span>=task_id,producer=<span style="color: #000000;">producer,result_cls=<span style="color: #000000;">self.AsyncResult,shadow</span>=shadow,task_type=<span style="color: #000000;">self,</span>**<span style="color: #000000;">options
)</span></pre>
对于其使用,apply_async支持常用参数:
interval_start:重试等待时间
interval_step:每次重试叠加时长,假设第一重试等待1s,第二次等待1+n秒 interval_max:最大等待时间 <span style="color: #008000;">#<span style="color: #008000;">###示例add.apply_async((1,retry=True,retry_policy=<span style="color: #000000;">{ <span style="color: #800000;">'<span style="color: #800000;">max_retries<span style="color: #800000;">': 1<span style="color: #000000;">,<span style="color: #800000;">'<span style="color: #800000;">interval_start<span style="color: #800000;">'<span style="color: #000000;">: 0,<span style="color: #800000;">'<span style="color: #800000;">interval_step<span style="color: #800000;">': 0.8<span style="color: #000000;">,<span style="color: #800000;">'<span style="color: #800000;">interval_max<span style="color: #800000;">': 5<span style="color: #000000;">,}) 更多参数参考:http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async
?五、管理与监控Celery管理和监控功能是通过flower组件实现的,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理。 安装使用pip3 install flower
启动 flower -A project --port=5555
访问http:ip:5555 api使用,例如获取woker信息: curl http://127.0.0.1:5555/api/workers
结果: 更多api参考:https://flower.readthedocs.io/en/latest/api.html
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |