Linux Celery-一个会做异步任务,定时任务的芹菜
Celery 分布式任务队列同步与异步比如说你要去一个餐厅吃饭,你点完菜以后假设服务员告诉你,你点的菜,要两个小时才能做完,这个时候你可以有两个选择
? 所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。 ? 所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。 阻塞与非阻塞继续上面的例子
? 阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的 同步/异步与阻塞/非阻塞同步阻塞形式 效率最低。拿上面的例子来说,就是你专心的在餐馆等着,什么别的事都不做。 异步阻塞形式 在家里等待的过程中,你一直盯着手机,不去做其它的事情,那么很显然,你被阻塞在了这个等待的操作上面; 异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。 同步非阻塞形式 实际上是效率低下的。 想象一下你如果害怕服务员忘记给你打电话通知你,你过一会就要去餐厅看一下你的饭菜好了没有,没好,在回家等待,过一会再去看一眼,没好再回家等着,那么效率可想而知是低下的。 异步非阻塞形式 ? 比如说你回家以后就直接看电视了,把手机放在一边,等什么时候电话响了,你在去接电话.这就是异步非阻塞形式,大家想一下这样是不是效率是最高的 ? 那么同步一定是阻塞的吗?异步一定是非阻塞的吗? 生产者消费者模型在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。 单单抽象出生产者和消费者,还够不上是生产者消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示 ? 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过消息队列(缓冲区)来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给消息队列,消费者不找生产者要数据,而是直接从消息队列里取,消息队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个消息队列就是用来给生产者和消费者解耦的。------------->这里又有一个问题,什么叫做解耦? 解耦:假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。 因为太抽象,看过网上的说明之后,通过我的理解,我举了个例子:吃包子。 假如你非常喜欢吃包子(吃起来根本停不下来),今天,你妈妈(生产者)在蒸包子,厨房有张桌子(缓冲区),你妈妈将蒸熟的包子盛在盘子(消息)里,然后放到桌子上,你正在看巴西奥运会,看到蒸熟的包子放在厨房桌子上的盘子里,你就把盘子取走,一边吃包子一边看奥运。在这个过程中,你和你妈妈使用同一个桌子放置盘子和取走盘子,这里桌子就是一个共享对象。生产者添加食物,消费者取走食物。桌子的好处是,你妈妈不用直接把盘子给你,只是负责把包子装在盘子里放到桌子上,如果桌子满了,就不再放了,等待。而且生产者还有其他事情要做,消费者吃包子比较慢,生产者不能一直等消费者吃完包子把盘子放回去再去生产,因为吃包子的人有很多,如果这期间你好朋友来了,和你一起吃包子,生产者不用关注是哪个消费者去桌子上拿盘子,而消费者只去关注桌子上有没有放盘子,如果有,就端过来吃盘子中的包子,没有的话就等待。对应关系如下图: ? celery生产者消费者模型消费者from celery import Celery task=Celery(‘task‘,broker="redis://10.211.55.19:6379") #task可以是任何名称,后面跟的是队列的缓存者,celery中一般称为中间人,如果要是密码访问的话,需要是redis://:{pass}@IP地址:端口 @task.task def add(a,b): return a+b
启动?celery从4.0版本以后就不在支持windows了,如果想在windows环境下使用的话,需要安装eventlet这个包,启动的时候需要指定-P eventlet celery worker -A c -l info 生产者 from c import add for i in range(10): add.delay(1,2) 模拟两个消费者 在不同的位置在启动一个worker既可以了 celery worker -A c -l info 生产者消费者模型升级消费者from celery import Celery task=Celery(‘task‘,broker="redis://10.211.55.19:6379/0",backend="redis://10.211.55.19:6379/2")#broker和backend可以是不同的队列,这里使用redis不同的库来模拟不同的队列,当然也可以一样 @task.task def add(a,b): return a+b 启动过程跟上面一样 生产者from c import add for i in range(10): t=add.delay(i,2) print(t.get()) #获取结果 登录redis查看信息redis-cli
127.0.0.1:6379[1]> SELECT 2
127.0.0.1:6379[2]> KEYS *
127.0.0.1:6379[2]> get celery-task-meta-6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42
"{"status": "SUCCESS","result": 3,"traceback": null,"children": [],"task_id": "6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42"}"
## 解析数据 d="{"status": "SUCCESS","task_id": "6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42"}" import json print(json.loads(d))
获取执行状态 倘若任务抛出了一个异常, get() 会重新抛出异常, 但你可以指定 propagate 参数来覆盖这一行为: result.get(propagate=False)
如果任务抛出了一个异常,你也可以获取原始的回溯信息: result.traceback…
print(t) print(t.ready()) print(t.get()) print(t.ready())
定时任务apply_asynct=add.apply_async((1,2),countdown=5) #表示延迟5秒钟执行任务 print(t) print(t.get()) 问题:是延迟5秒发送还是立即发送,消费者延迟5秒在执行那? 支持的参数 :
周期任务from c import task task.conf.beat_schedule={ timezone=‘Asia/Shanghai‘,"each10s_task":{ "task":"c.add","schedule":3,# 每3秒钟执行一次 "args":(10,10) },} 其实celery也支持linux里面的crontab格式的书写的 from celery.schedules import crontab task.conf.beat_schedule={ timezone=‘Asia/Shanghai‘,"each3m_task":{ "task":"c.add","schedule":crontab(minute=3),#每小时的第3分钟执行 "args":(10,10) },"schedule":crontab(minute=*/3),}
后台启动worker: celery multi start worker1 -A c --pidfile="$HOME/run/celery/%n.pid" --logfile="$HOME/log/celery/%n%I.log" celery multi restart worker1 -A proj --logfile="$HOME/log/celery/%n%I.log" --pidfile="$HOME/run/celery/%n.pid celery multi stopwait worker1 --pidfile="$HOME/run/celery/%n.pid" beat: celery -A d beat --detach -l info -f beat.log 与django结合1.执行异步任务1.1 在生成的目录文件中添加celery文件,内容如下from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
# set the default Django settings module for the ‘celery‘ program. os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘,‘tests.settings‘) #与项目关联 app = Celery(‘tests‘,backend=‘redis://10.211.55.19/3‘,broker=‘redis://10.211.55.19/4‘) #创建celery对象 # Using a string here means the worker doesn‘t have to serialize # the configuration object to child processes. # - namespace=‘CELERY‘ means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object(‘django.conf:settings‘,namespace=‘CELERY‘) #在django中创建celery的命名空间 # Load task modules from all registered Django app configs. app.autodiscover_tasks() #自动加载任务
1.2编辑settings.py同级目录的init.pyfrom __future__ import absolute_import,unicode_literals from .celery import app as celery_app __all__ = [‘celery_app‘] 1.3 在项目中添加tasks文件,用来保存tasks的文件from celery import shared_task @shared_task def add(x,y): return x + y @shared_task def mul(x,y): return x * y @shared_task def xsum(numbers): return sum(numbers) 1.4添加views文件内容from .tasks import add def index(request): result = add.delay(2,3) return HttpResponse(‘返回数据{}‘.format(result.get())) 1.5 启动workercelery -A tests worker -l info 1.6添加url并调用2.执行周期性任务2.1需要安装一个django的组件来完成这个事情pip install django-celery-beat 2.2将django-celery-beat添加到INSTALLED_APPS里面INSTALLED_APPS = (
...,‘django_celery_beat‘,)
2.3刷新到数据库python3 manage.py makemigrations #不执行这个会有问题 python3 manage.py migrate 2.4 admin配置2.5启动beatcelery -A tests beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler 2.6 启动workercelery -A tests worker -l info (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |