信号处理pika / python
发布时间:2020-12-16 21:30:38 所属栏目:Python 来源:网络整理
导读:我在消费者中使用pika.BlockingConnection,为每条消息执行一些任务.我还添加了信号处理功能,以便消费者在完成所有任务后正常死亡. 在处理消息并接收到信号时,我只是从函数中获得“信号接收”,但代码不会退出.所以,我决定检查在回调函数结束时收到的信号.问题
我在消费者中使用pika.BlockingConnection,为每条消息执行一些任务.我还添加了信号处理功能,以便消费者在完成所有任务后正常死亡.
在处理消息并接收到信号时,我只是从函数中获得“信号接收”,但代码不会退出.所以,我决定检查在回调函数结束时收到的信号.问题是,我检查信号的次数是多少,因为此代码中还有更多的功能.是否有更好的处理信号的方法而不会过度使用东西? import signal import sys import pika from time import sleep received_signal = False all_over = False def signal_handler(signal,frame): global received_signal print "signal received" received_signal = True signal.signal(signal.SIGINT,signal_handler) signal.signal(signal.SIGTERM,signal_handler) mq_connection = pika.BlockingConnection(pika.ConnectionParameters(my_mq_server,virtual_host='test')) mq_channel = mq_connection.channel() def callback(ch,method,properties,body): if received_signal: print "Exiting,as a kill signal is already received" exit(0) print body sleep(50) mq_channel.basic_ack(delivery_tag=method.delivery_tag) print "Message consumption complete" if received_signal: print "Exiting,as a kill signal is already received" exit(0) try: print ' [*] Waiting for messages. To exit press CTRL+C' mq_channel.basic_consume(callback,queue='test') mq_channel.start_consuming() except Exception: mq_channel.close() exit() 这是我的第一个问题,如果需要更多细节,请告诉我. 解决方法
我认为这可以满足您的需求:
#!/usr/bin/python import signal import sys import pika from contextlib import contextmanager received_signal = False processing_callback = False def signal_handler(signal,frame): global received_signal print "signal received" received_signal = True if not processing_callback: sys.exit() signal.signal(signal.SIGINT,signal_handler) @contextmanager def block_signals(): global processing_callback processing_callback = True try: yield finally: processing_callback = False if received_signal: sys.exit() def callback(ch,body): with block_signals: print body sum(xrange(0,200050000)) # sleep gets interrupted by signals,this doesn't. mq_channel.basic_ack(delivery_tag=method.delivery_tag) print "Message consumption complete" if __name__ == "__main__": try: mq_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) mq_channel = mq_connection.channel() print ' [*] Waiting for messages. To exit press CTRL+C' mq_channel.basic_consume(callback,queue='test') mq_channel.start_consuming() except Exception as e: mq_channel.close() sys.exit() 我使用了一个上下文管理器来处理阻塞信号,因此所有逻辑都隐藏在回调本身之外.这也应该使重用代码更容易.只是为了澄清它是如何工作的,它等同于: def callback(ch,body): global processing_callback processing_callback = True try: print body sum(xrange(0,200050000)) mq_channel.basic_ack(delivery_tag=method.delivery_tag) print "Message consumption complete" finally: processing_callback = False if received_signal: sys.exit() (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |