加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

Flask Celery update_state来自另一个函数

发布时间:2020-12-20 13:14:45 所属栏目:Python 来源:网络整理
导读:我想从另一个函数更新我的Celery任务的状态.这就是我现在拥有的: 路线 @app.route('/my-long-function',methods=['POST'])def my_long_function(): param1 = request.form['param1'] param2 = request.form['param2'] task = outside_function.delay(param1
我想从另一个函数更新我的Celery任务的状态.这就是我现在拥有的:

路线

@app.route('/my-long-function',methods=['POST'])
def my_long_function():

    param1 = request.form['param1']
    param2 = request.form['param2']

    task = outside_function.delay(param1,param2)

    return task.id

Celery Task – 在后台启动some_python_script.handle

@celery.task(name='outside_function')
def outside_function(param1,param2):
    with app.app_context():
        some_python_script.handle(param1,param2)

some_python_script.handle:

def handle(param1,param2):
    param1 + param2
    # many,many different things

理想情况下,我希望能够自我更新芹菜任务,以便我可以轻松地从我的应用程序请求其状态,如下所示:

some_python_script.handle(理想情况下):

def handle(param1,many different things
    self.outside_function.update_state('PROGRESS',meta = {'status':'progressing'})

检查进度(理想情况下):

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = outside_function.AsyncResult(task_id)
    response = {
    'state': task.state,'id': task.id,'status' : task.status,}

    return jsonify(response)

或类似的东西.非常感谢任何帮助,我对芹菜很新!

解决方法

您应该声明要调用的任务ID.
你可以查看 update_state.

以下代码应该有效.

# capture id of celery task
ID = self.request.id

def handle(param1,many different things
    # update the state of celery task with direct reference to it
    self.update_state(task_id=ID,state='PROGRESS',meta = {'status':'progressing'})

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读