Django中使用Celery
一、前言Celery是一个基于python开发的分布式任务队列,如果不了解请阅读笔者上一篇博文,而做python WEB开发最为流行的框架莫属Django,但是Django的请求处理过程都是同步的无法实现异步任务,若要实现异步任务处理需要通过其他方式(前端的一般解决方案是ajax操作),而后台Celery就是不错的选择。倘若一个用户在执行某些操作需要等待很久才返回,这大大降低了网站的吞吐量。下面将描述Django的请求处理大致流程(图片来源于网络): 请求过程简单说明:浏览器发起请求-->请求处理-->请求经过中间件-->路由映射-->视图处理业务逻辑-->响应请求(template或response) 二、配置使用celery很容易集成到Django框架中,当然如果想要实现定时任务的话还需要安装django-celery-beta插件,后面会说明。需要注意的是Celery4.0只支持Django版本>=1.8的,如果是小于1.8版本需要使用Celery3.1。 配置新建立项目taskproj,目录结构(每个app下多了个tasks文件,用于定义任务): 在项目目录taskproj/taskproj/目录下新建celery.py:
celery os.environ.setdefault(<span style="color: #800000;">'<span style="color: #800000;">DJANGO_SETTINGS_MODULE<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">taskproj.settings<span style="color: #800000;">') <span style="color: #008000;">#<span style="color: #008000;"> 设置django环境
<span style="color: #000000;"> app = Celery(<span style="color: #800000;">'<span style="color: #800000;">taskproj<span style="color: #800000;">'<span style="color: #000000;">) app.config_fromobject( <span style="color: #800000;">'<span style="color: #800000;">django.conf:settings<span style="color: #800000;">',namespace=<span style="color: #800000;">'<span style="color: #800000;">CELERY<span style="color: #800000;">') <span style="color: #008000;">#<span style="color: #008000;"> 使用CELERY 作为前缀,在settings中写配置<span style="color: #000000;"> app.autodiscover_tasks() <span style="color: #008000;">#<span style="color: #008000;"> 发现任务文件每个app下的task.py taskproj/taskproj/__init__.py: from .celery import app as celery_app = []
taskproj/taskproj/settings.py CELERY_BROKER_URL =
=
=
进入项目的taskproj目录启动worker: celery worker -A taskproj -l debug
定义与触发任务任务定义在每个tasks文件中,app01/tasks.py: celery @shared_task
<span style="color: #0000ff;">def<span style="color: #000000;"> add(x,y): <span style="color: #0000ff;">return x +<span style="color: #000000;"> y @shared_task <span style="color: #0000ff;">return x * y 视图中触发任务 django.http app01 <span style="color: #008000;">#<span style="color: #008000;"> Create your views here.
<span style="color: #0000ff;">def index(request,*args,**<span style="color: #000000;">kwargs):res=tasks.add.delay(1,3<span style="color: #000000;">) <span style="color: #008000;">#<span style="color: #008000;">任务逻辑 <span style="color: #0000ff;">return JsonResponse({<span style="color: #800000;">'<span style="color: #800000;">status<span style="color: #800000;">':<span style="color: #800000;">'<span style="color: #800000;">successful<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">task_id<span style="color: #800000;">':res.task_id}) 访问http://127.0.0.1:8000/index ?若想获取任务结果,可以通过task_id使用AsyncResult获取结果,还可以直接通过backend获取: 扩展除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作,下面介绍如何使用orm作为结果存储。 1.安装 pip install django-celery-results
2.配置settings.py,注册app INSTALLED_APPS =
4.修改backend配置,将redis改为django-db
=
5.修改数据库 python3 manage.py migrate django_celery_results
此时会看到数据库会多创建: task_id </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">task id</span><span style="color: #800000;">'</span>),max_length=255,unique=<span style="color: #000000;">True)
task_name </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">task name</span><span style="color: #800000;">'</span>),null=True,max_length=255<span style="color: #000000;">)
task_args </span>= models.TextField(_(<span style="color: #800000;">'</span><span style="color: #800000;">task arguments</span><span style="color: #800000;">'</span>),null=<span style="color: #000000;">True)
task_kwargs </span>= models.TextField(_(<span style="color: #800000;">'</span><span style="color: #800000;">task kwargs</span><span style="color: #800000;">'</span>),null=<span style="color: #000000;">True)
status </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">state</span><span style="color: #800000;">'</span>),max_length=50<span style="color: #000000;">,default</span>=<span style="color: #000000;">states.PENDING,choices</span>=<span style="color: #000000;">TASK_STATE_CHOICES
)
content_type </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">content type</span><span style="color: #800000;">'</span>),max_length=128<span style="color: #000000;">)
content_encoding </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">content encoding</span><span style="color: #800000;">'</span>),max_length=64<span style="color: #000000;">)
result </span>= models.TextField(null=True,default=None,editable=<span style="color: #000000;">False)
date_done </span>= models.DateTimeField(_(<span style="color: #800000;">'</span><span style="color: #800000;">done at</span><span style="color: #800000;">'</span>),auto_now=<span style="color: #000000;">True)
traceback </span>= models.TextField(_(<span style="color: #800000;">'</span><span style="color: #800000;">traceback</span><span style="color: #800000;">'</span>),blank=True,null=<span style="color: #000000;">True)
hidden </span>= models.BooleanField(editable=False,default=False,db_index=<span style="color: #000000;">True)
meta </span>= models.TextField(null=True,editable=<span style="color: #000000;">False)
objects </span>=<span style="color: #000000;"> managers.TaskResultManager()
</span><span style="color: #0000ff;">class</span><span style="color: #000000;"> Meta:
</span><span style="color: #800000;">"""</span><span style="color: #800000;">Table information.</span><span style="color: #800000;">"""</span><span style="color: #000000;">
ordering </span>= [<span style="color: #800000;">'</span><span style="color: #800000;">-date_done</span><span style="color: #800000;">'</span><span style="color: #000000;">]
verbose_name </span>= _(<span style="color: #800000;">'</span><span style="color: #800000;">task result</span><span style="color: #800000;">'</span><span style="color: #000000;">)
verbose_name_plural </span>= _(<span style="color: #800000;">'</span><span style="color: #800000;">task results</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> as_dict(self):
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> {
</span><span style="color: #800000;">'</span><span style="color: #800000;">task_id</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.task_id,</span><span style="color: #800000;">'</span><span style="color: #800000;">task_name</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.task_name,</span><span style="color: #800000;">'</span><span style="color: #800000;">task_args</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.task_args,</span><span style="color: #800000;">'</span><span style="color: #800000;">task_kwargs</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.task_kwargs,</span><span style="color: #800000;">'</span><span style="color: #800000;">status</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.status,</span><span style="color: #800000;">'</span><span style="color: #800000;">result</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.result,</span><span style="color: #800000;">'</span><span style="color: #800000;">date_done</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.date_done,</span><span style="color: #800000;">'</span><span style="color: #800000;">traceback</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.traceback,</span><span style="color: #800000;">'</span><span style="color: #800000;">meta</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.meta,}
</span><span style="color: #0000ff;">def</span> <span style="color: #800080;">__str__</span><span style="color: #000000;">(self):
</span><span style="color: #0000ff;">return</span> <span style="color: #800000;">'</span><span style="color: #800000;"><Task: {0.task_id} ({0.status})></span><span style="color: #800000;">'</span>.format(self)</pre>
三、Django中使用定时任务如果想要在django中使用定时任务功能同样是靠beat完成任务发送功能,当在Django中使用定时任务时,需要安装django-celery-beat插件。以下将介绍使用过程。 安装配置1.beat插件安装 pip3 install django-celery-beat
2.注册APP INSTALLED_APPS =
3.数据库变更 python3 manage.py migrate django_celery_beat
4.分别启动woker和beta celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
-A taskproj -l info
5.配置admin urls.py
django.conf.urls django.contrib urlpatterns =<span style="color: #000000;"> [
url(r<span style="color: #800000;">'<span style="color: #800000;">^admin/<span style="color: #800000;">'<span style="color: #000000;">,admin.site.urls),] 6.创建用户 python3 manage.py createsuperuser
7.登录admin进行管理(地址http://127.0.0.1:8000/admin)并且还可以看到我们上次使用orm作为结果存储的表。 http://127.0.0.1:8000/admin/login/?next=/admin/ ?使用示例: ?查看结果: 二次开发django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器,django-celery-beat插件已经内置了model,只需要进行导入便可进行orm操作,以下我用django reset api进行示例: settings.py INSTALLED_APPS =
urls.py urlpatterns =,views.TaskView.as_view({:
views.py django_celery_beat.models PeriodicTask
rest_framework rest_framework rest_framework.viewsets ==
<span style="color: #0000ff;">class <span style="color: #000000;"> Mypagination(pagination.PageNumberPagination):<span style="color: #800000;">"""<span style="color: #800000;">自定义分页<span style="color: #800000;">"""<span style="color: #000000;"> page_size=2<span style="color: #000000;"> page_query_param = <span style="color: #800000;">'<span style="color: #800000;">p<span style="color: #800000;">'<span style="color: #000000;"> page_size_query_param=<span style="color: #800000;">'<span style="color: #800000;">size<span style="color: #800000;">'<span style="color: #000000;"> max_page_size=4 <span style="color: #0000ff;">class <span style="color: #000000;"> TaskView(ModelViewSet):queryset =<span style="color: #000000;"> PeriodicTask.objects.all() serializer_class =<span style="color: #000000;"> Userserializer permission_classes =<span style="color: #000000;"> [] pagination_class = Mypagination 访问http://127.0.0.1:8000/tasks如下: (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |