加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python – 我想将gevent.evnet用于celery.task

发布时间:2020-12-16 22:00:00 所属栏目:Python 来源:网络整理
导读:我一直在研究长轮询系统.我用烧瓶mongokit芹菜gevent. 当进入芹菜任务时,完成gevent.event.set()不能正常工作.所以我想帮助解决这个问题.(我之所以与celery同时使用gevent,在Notification系统中有很大的处理过程) 这是我的示例代码. #server.py @celery.task

我一直在研究长轮询系统.我用烧瓶mongokit芹菜gevent.

当进入芹菜任务时,完成gevent.event.set()不能正常工作.所以我想帮助解决这个问题.(我之所以与celery同时使用gevent,在Notification系统中有很大的处理过程)

这是我的示例代码.

 #server.py
 @celery.task()
 def doing_task(uid,message):
     notification = Notification() # this is a notification Model
     notification.add(request.args.get('to'),some_notification)
     app.event.set()
     app.event.clear()

 @app.route('/main')
 def main():
     return render_template('main.html')

 @app.route('/set')
 def set():
     doing_task.delay(request.args.get('uid'),'Notify')
     return 'OK'

 @app.route('/poll')
 def poll():
     uid = request.args.get('uid')
     app.event.wait()
     if is_authorized(uid): #uid 1 is a authorized account
         return Notification().get(uid)

 #main.html
 
最佳答案
现在,在Flask 0.9中添加了Flask.app_context,使用Flask.app_context可以获得当前的上下文.

见Application Context.

例如,

from flask import Flask
from celery import Celery

app = Flask(__name__)
celery = Celery(__name__)

@celery.task
def hello():
    # Also,you are able to deal with current request as use test_request_context 
    with app.app_context():
        print current_app
    with app.test_request_context() as request:
        print('Hello {0!r}'.format(request))

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读