Celery 基本使用
1. 认识 CeleryCelery 是一个 基于 Python 开发的分布式异步消息任务队列,可以实现任务异步处理,制定定时任务等。
Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用 优点:
Celery 构成 Celery 主要模块:
2. 异步任务实现异步任务步骤:
1、安装 pip3 install 'celery[redis]' pip3 install celery 2、创建 Celery 实例
# -*- coding: utf-8 -*- import time from celery import Celery broker = 'redis://127.0.0.1:6379' # 消息中间件 backend = 'redis://127.0.0.1:6379/0' # backend ,存储结果 app = Celery('my_task',broker=broker,backend=backend) # 创建实例 # 创建一个任务,5s 后执行 @app.task(name='tasks.add') def add(x,y): time.sleep(5) # 模拟耗时操作 return x + y 3、启动 Celery Worker 打开 Ubuntu 终端,输入: 参数:
4、调用任务 另起一个终端,进入 Python 环境,执行任务: # Celery 提供两种方法来调用任务,delay() 或 apply_async() 方法 python3 >>> from tasks import add >>> add.delay(6,8) # 调用任务,并返回一个任务 ID <AsyncResult: 194e99af-d0bd-481b-a500-433ec19117e4> 判断任务是否完成: >>> result = add.delay(6,8) >>> result.ready() # True 表示已完成 True 获取任务结果: >>> result.get() 14
参考博客:Received unregistered task of type ‘XXX’ Celery报错 3. 项目中使用 celerycelery 还可以配置成一个应用,放置在项目中使用,其结构为:
1、 from __future__ import absolute_import,unicode_literals # 将相对路径转换为绝对路径 from celery import Celery # 创建一个Celery的实例 app = Celery('tasks',# redis://:[email?protected]:port/db_number 有密码认证的连接 broker='redis://127.0.0.1:6379',# broker='redis://:密码@192.168.2.105:6379/0',backend='redis://127.0.0.1:6379/0',# 用于Celery的返回结果的接收 include=['proj.tasks'] # 用于声明Celery要执行的tasks任务的位置 ) # 配置结果超时时间 app.conf.update( result_expires=3600,# Celery结果存在中间件Redis的超时时间[仅针对当前的Celery的App] ) if __name__ == '__main__': app.start() 2、 from __future__ import absolute_import,unicode_literals from .celery import app # 从我的Celery中导入App import time @app.task(name='tasks.add') # 需要配置 name='tasks.add',否则报 Received unregistered task of type 'app.tasks.add'. def add(x,y): time.sleep(10) return x + y @app.task(name='tasks.mul') def mul(x,y): time.sleep(10) return x * y 3、启动 worker,分为前台和后台启动(无需关心起行为): # 前台 celery -A proj worker -l info 运行结果如下: 4、调用任务: # 在这里使用终端调用,也可以再项目中调用 >>> from proj.tasks import add,mul >>> result1 = add.delay(5,8) >>> result2 = mul.delay(5,8) >>> result1.get() # 取值 13 >>> result2.get() 40 将 # w1:worker celery multi start w1 -A proj -l info # 启动 worker celery multi restart w1 -A proj -l info # 重启 celery multi stop w1 -A proj -l info # 关闭 ps -ef | grep celery # 查看目前还有几个 worker 正在运行 参考文章
4. 定时任务celery 通过 4.1 小试牛刀1、新建一个 from celery import Celery from celery.schedules import crontab app = Celery() @app.on_after_configure.connect def setup_periodic_tasks(sender,**kwargs): # 每过 10 s,执行一次 hello sender.add_periodic_task(10.0,test.s('hello'),name='add every 10') # 每过 30 s,执行一次 world sender.add_periodic_task(30.0,test.s('world'),expires=10) # 每周一七点三十执行一次 Happy Mondays! sender.add_periodic_task( crontab(hour=7,minute=30,day_of_week=1),test.s('Happy Mondays!'),) @app.task def test(arg): print(arg) 也可以配置成下面这样,或许更好理解: # 可以配置多个 app.conf.beat_schedule = { 'add-every-30-seconds': { # 任务名字 'task': 'tasks.add',# 执行 tasks 中的 add 函数 'schedule': 30.0,# 时间,也可以用 timedelta(seconds=20),'args': (16,16) # 参数 },} app.conf.timezone = 'UTC' # 时区 2、启动 [email?protected]:~/桌面/c1$ celery -A task1 beat 3、启动 [email?protected]:~/桌面/c1$ celery -A task1 worker 从上图中可以看到,每过 10s,就会输出一个
# beat 运行时,会产生几个文件 [email?protected]:~/桌面/c1$ ls celerybeat.pid celerybeat-schedule __pycache__ task1.py # 指定文件位置 celery -A task1 beat -s /home/celery/var/run/celerybeat-schedule 4.2 使用 crontab 构建复杂定时任务如果你只是想每过多少秒输出一个 from celery.schedules import crontab from datetime import timedelta app.conf.beat_schedule = { # 任务一 'sum-task':{ # 任务名 'task':'tasks.add',# 执行 tasks.py 中的 add 函数 'schedule':timedelta(seconds=20),# 时间 'args':(5,6) # 参数 },# 任务二 'multi-task': { 'task': 'tasks.multi','schedule': crontab(hour=4,'args': (3,4) } } 更多关于
参考文章
5. Django 中使用 Celery5.1 构建简单的异步任务- project/ # 项目主目录 - app/ # app - urls.py - views.py - tasks.py # celery 任务,名字必须是 tasks.py - project/ # 项目文件 - __init__.py - settings.py - urls.py - celery.py # 创建 Celery 实例,加载 redis 配置文件 - manage.py 在 Django 中使用 Celery ,依赖 pip3 install django_celery_beat 并将其添加到 INSTALLED_APPS = [ 'django.contrib.admin','django.contrib.auth','django.contrib.contenttypes','django.contrib.sessions','django.contrib.messages','django.contrib.staticfiles','app','django_celery_beat',] ... # redis 连接 CELERY_BROKER_URL = 'redis://127.0.0.1:6397' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6397/0' 1、 from __future__ import absolute_import,unicode_literals import os from celery import Celery,platforms # 使用 Django 环境 os.environ.setdefault('DJANGO_SETTINGS_MODULE','Project.settings') app = Celery('celery_task') app.config_from_object('django.conf:settings',namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() # 运行 root 用户运行 celery platforms.C_FORCE_ROOT = True @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) 2、 from __future__ import absolute_import,unicode_literals # 确保导入应用,Django 启动就能使用 app from .celery import app as celery_app __all__ = ['celery_app'] 3、创建任务 from __future__ import absolute_import,unicode_literals from celery import shared_task import time @shared_task def add(x,y): time.sleep(10) return x + y @shared_task def multi(x,y): time.sleep(10) return x * y
4、视图中调用任务
from django.shortcuts import render,HttpResponse from celery.result import AsyncResult def celery_test(request): # 调用任务 task = add.delay(4,22) return HttpResponse(task.id) # 获取任务 id def celery_res(request): # 获取任务结果 task_id = 'b3fbe0da-57bb-4055-aea2-160afd6ae801' res = AsyncResult(id=task_id) return HttpResponse(res.get()) # 获取结果 5、路由配置 path('celery_test/',views.celery_test,name='celery_test'),path('celery_result/',views.celery_result,name='celery_result'), 6、打开终端启动 celery -A project worker -l info 访问 访问 因为这是异步处理的,所有再执行任务时,其他代码照样执行。 5.2 在 Django 中使用定时任务在 Django 也能设置定时任务,依赖于 1、在 from celery.schedules import crontab from datetime import timedelta app.conf.update( CELERYBEAT_SCHEDULE = { # 任务一 'sum-task':{ 'task':'app.tasks.add','schedule':timedelta(seconds=20),'args':(5,6) },# 任务二 'multi-task': { 'task': 'app.tasks.multi',4) } } ) 在上面添加了两个定时任务
启动 celery -A project beat -l info timedelta timedelta 是datetime 的一个对象,需要引入
crontab
总结
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |