加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python – 在芹菜中有错误队列

发布时间:2020-12-20 13:43:46 所属栏目:Python 来源:网络整理
导读:在芹菜中有没有办法,如果任务执行失败,我可以自动将它放入另一个队列. 例如,任务在队列x中运行,异常将其排入另一个名为error_x的队列 编辑: 目前我使用celery == 3.0.13以及django 1.4,Rabbitmq作为经纪人. 有时候任务失败了.在芹菜中有没有办法将消息添加
在芹菜中有没有办法,如果任务执行失败,我可以自动将它放入另一个队列.

例如,任务在队列x中运行,异常将其排入另一个名为error_x的队列

编辑:

目前我使用celery == 3.0.13以及django 1.4,Rabbitmq作为经纪人.

有时候任务失败了.在芹菜中有没有办法将消息添加到错误队列并稍后处理它.

celery任务失败时的问题是我无法访问消息队列名称.所以我不能使用self.retry重试将它放到不同的错误队列中.

解决方法

好吧,如果要将任务路由到另一个队列,则无法使用重试机制.来自文档:

retry() can be used to re-execute the task,for example in the event
of recoverable errors.

When you call retry it will send a new message,using the same
task-id,and it will take care to make sure the message is delivered
to the same queue as the originating task.

如果出现任何异常,您必须重新启动并手动将其路由到您想要的队列.这似乎是一个很好的工作error callbacks.

主要问题是我们需要在错误回调中获取任务名称才能启动它.此外,我们可能不希望每次启动任务时都添加回调.因此,装饰器将是自动添加正确回调的好方法.

from functools import partial,wraps

import celery


@celery.shared_task
def error_callback(task_id,task_name,retry_queue,retry_routing_key):
    # We must retrieve the task object itself.
    # `tasks` is a dict of 'task_name': celery_task_object
    task = celery.current_app.tasks[task_name]
    # Re launch the task in specified queue.
    task.apply_async(queue=retry_queue,routing_key=retry_routing_key)


def retrying_task(retry_queue,retry_routing_key):
    """Decorates function to automatically add error callbacks."""
    def retrying_decorator(func):
        @celery.shared_task
        @wraps(func)  # just to keep the original task name
        def wrapper(*args,**kwargs):
            return func(*args,**kwargs)
        # Monkey patch the apply_async method to add the callback.
        wrapper.apply_async = partial(
            wrapper.apply_async,link_error=error_callback.s(wrapper.name,retry_routing_key)
        )
        return wrapper
    return retrying_decorator


# Usage:
@retrying_task(retry_queue='another_queue',retry_routing_key='another_routing_key')
def failing_task():
    print 'Hi,I will fail!'
    raise Exception("I'm failing!")

failing_task.apply_async()

您可以调整装饰器以传递您需要的任何参数.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读