Python的Twisted框架中使用Deferred对象来管理回调函数
首先抛出我们在讨论使用回调编程时的一些观点:
Deferred def getDummyData(inputData): """ This function is a dummy which simulates a delayed result and returns a Deferred which will fire with that result. Don't try too hard to understand this. """ print('getDummyData called') deferred = defer.Deferred() # simulate a delayed result by asking the reactor to fire the # Deferred in 2 seconds time with the result inputData * 3 reactor.callLater(2,deferred.callback,inputData * 3) return deferred def cbPrintData(result): """ Data handling function to be added as a callback: handles the data by printing the result """ print('Result received: {}'.format(result)) deferred = getDummyData(3) deferred.addCallback(cbPrintData) # manually set up the end of the process by asking the reactor to # stop itself in 4 seconds time reactor.callLater(4,reactor.stop) # start up the Twisted reactor (event loop handler) manually print('Starting the reactor') reactor.run() 多个回调函数 from twisted.internet import reactor,defer class Getter: def gotResults(self,x): """ The Deferred mechanism provides a mechanism to signal error conditions. In this case,odd numbers are bad. This function demonstrates a more complex way of starting the callback chain by checking for expected results and choosing whether to fire the callback or errback chain """ if self.d is None: print("Nowhere to put results") return d = self.d self.d = None if x % 2 == 0: d.callback(x * 3) else: d.errback(ValueError("You used an odd number!")) def _toHTML(self,r): """ This function converts r to HTML. It is added to the callback chain by getDummyData in order to demonstrate how a callback passes its own result to the next callback """ return "Result: %s" % r def getDummyData(self,x): """ The Deferred mechanism allows for chained callbacks. In this example,the output of gotResults is first passed through _toHTML on its way to printData. Again this function is a dummy,simulating a delayed result using callLater,rather than using a real asynchronous setup. """ self.d = defer.Deferred() # simulate a delayed result by asking the reactor to schedule # gotResults in 2 seconds time reactor.callLater(2,self.gotResults,x) self.d.addCallback(self._toHTML) return self.d def cbPrintData(result): print(result) def ebPrintError(failure): import sys sys.stderr.write(str(failure)) # this series of callbacks and errbacks will print an error message g = Getter() d = g.getDummyData(3) d.addCallback(cbPrintData) d.addErrback(ebPrintError) # this series of callbacks and errbacks will print "Result: 12" g = Getter() d = g.getDummyData(4) d.addCallback(cbPrintData) d.addErrback(ebPrintError) reactor.callLater(4,reactor.stop) reactor.run() 需要注意的一点是,在方法gotResults中处理self.d的方式。在Deferred对象被结果或者错误激活之前,这个属性被设置成了None,这样Getter实例就不会再持有将要激活的Deferred对象的引用。这样做有几个好处,首先,这样可以避免Getter.gotResults有时会重复激活相同的Deferred对象的可能性(这样会导致出现AlreadyCalledError异常)。其次,这样做可以使得该Deferred对象上可以添加一个调用了Getter.getDummyData函数的回调函数,而不会产生问题。还有,这样使得Python垃圾收集器更容易通过引用循环来检测出一个对象是否需要回收。 1.请求方法请求数据到Data Sink,得到返回的Deferred对象。 2.请求方法把回调函数关联到Deferred对象上。 1.当结果已经准备好后,把它传递给Deferred对象。如果操作成功就调用Deferred对象的.callback(result)方法,如果操作失败就调用Deferred对象的.errback(faliure)方法。注意failure是twisted.python.failure.Failure类的一个实例。 try: # code that may throw an exception cookSpamAndEggs() except (SpamException,EggException): # Handle SpamExceptions and EggExceptions ... 可以写成: def errorHandler(failure): failure.trap(SpamException,EggException) # Handle SpamExceptions and EggExceptions d.addCallback(cookSpamAndEggs) d.addErrback(errorHandler) 如果传递给faliure.trap的参数没有能和Faliure中的错误匹配的,那它会重新抛出这个错误。 # Case 1 d = getDeferredFromSomewhere() d.addCallback(callback1) # A d.addErrback(errback1) # B d.addCallback(callback2) d.addErrback(errback2) # Case 2 d = getDeferredFromSomewhere() d.addCallbacks(callback1,errback1) # C d.addCallbacks(callback2,errback2) 对于Case 1来说,如果在callback1里面发生了错误,那么errback1就会被调用。而对于Case 2来说,被调用的却是是errback2。 # Make sure errors get logged from twisted.python import log d.addErrback(log.err) 处理同步和异步结果 def authenticateUser(isValidUser,user): if isValidUser(user): print("User is authenticated") else: print("User is not authenticated") 这个函数假定isValidUser是立即返回的,然而实际上isValidUser可能是异步认证用户的并且返回的是一个Deferred对象。把这个函数调整为既能接收同步的isValidUser又能接收异步的isValidUser是有可能的。同时把同步的函数改成返回值为Deferred对象也是可以的。 def synchronousIsValidUser(user): ''' Return true if user is a valid user,false otherwise ''' return user in ["Alice","Angus","Agnes"] 这是一个异步的用户认证方法,返回一个Deferred对象: from twisted.internet import reactor,defer def asynchronousIsValidUser(user): d = defer.Deferred() reactor.callLater(2,d.callback,user in ["Alice","Agnes"]) return d 我们最初对authenticateUser的实现希望isValidUser是同步的,但是现在需要把它改成既能处理同步又能处理异步的isValidUser实现。对此,可以使用maybeDeferred函数来调用isValidUser,这个函数可以保证isValidUser函数的返回值是一个Deferred对象,即使isValidUser是一个同步的函数: from twisted.internet import defer def printResult(result): if result: print("User is authenticated") else: print("User is not authenticated") def authenticateUser(isValidUser,user): d = defer.maybeDeferred(isValidUser,user) d.addCallback(printResult) 现在isValidUser无论是同步的还是异步的都可以了。 def startConnecting(someEndpoint): def connected(it): "Do something useful when connected." return someEndpoint.connect(myFactory).addCallback(connected) # ... connectionAttempt = startConnecting(endpoint) def cancelClicked(): connectionAttempt.cancel() 显然,startConnecting被一些UI元素用来让用户选择连接哪个机器。然后是一个取消按钮陪着到cancelClicked函数上。
即使这个取消操作已经表达了让底层的操作停止的需求,但是底层的操作不大可能马上就对此作出反应。甚至在这个简单的例子中就有一个不会被中断的操作:域名解析,因此需要在在一个线程中执行;这个应用中的连接操作如果在等待域名解析的时候就不能被取消。所以你要取消的Deferred对象可能不会立即调用回调函数或错误处理回调函数。 一个Deferred对象可能会在执行它的回调函数链的任何一点时等待另一个Deferred对象的完成。没有方法可以在回调函数链的特定一个点知道是否每件事都已经准备好了。由于有可能一个回调函数链的很多层次上的函数都会希望取消同一个Deferred对象,那么链上任何层次的函数在任意时刻都有可能调用.cancel()函数。.cancel()函数从不抛出任何异常或者返回任何值。你可以重复调用它,即使这个Deferred对象已经被激活了,它已经没有剩余的回调函数了。 operation = Deferred() def x(result): print("Hooray,a result:" + repr(x)) operation.addCallback(x) # ... def operationDone(): operation.callback("completed") 如果需要取消operation这个Deferred对象,而operation没有一个canceller的取消函数,就会产生下面两种之一的结果: class HTTPClient(Protocol): def request(self,method,path): self.resultDeferred = Deferred( lambda ignore: self.transport.abortConnection()) request = b"%s %s HTTP/1.0rnrn" % (method,path) self.transport.write(request) return self.resultDeferred def dataReceived(self,data): # ... parse HTTP response ... # ... eventually call self.resultDeferred.callback() ... 现在如果在HTTPClient.request()返回的Deferred对象上调用了cancel()函数,这个HTTP请求就会取消(如果没有太晚的话)。要注意的是,还要在一个已经被取消的、带有canceller的Deferred对象上调用callback()。 现在就可以把这个DeferredList当成一个普通的Deferred来看待了,例如你也可以调用addCallbacks等等。这个DeferredList会在所有的Deferred对象都完成之后才调用它的回调函数。这个回调函数的参数是这个DeferredList对象中包含的所有Deferred对象返回结果的列表,例如: # A callback that unpacks and prints the results of a DeferredList def printResult(result): for (success,value) in result: if success: print('Success:',value) else: print('Failure:',value.getErrorMessage()) # Create three deferreds. deferred1 = defer.Deferred() deferred2 = defer.Deferred() deferred3 = defer.Deferred() # Pack them into a DeferredList dl = defer.DeferredList([deferred1,deferred3],consumeErrors=True) # Add our callback dl.addCallback(printResult) # Fire our three deferreds with various values. deferred1.callback('one') deferred2.errback(Exception('bang!')) deferred3.callback('three') # At this point,dl will fire its callback,printing: # Success: one # Failure: bang! # Success: three # (note that defer.SUCCESS == True,and defer.FAILURE == False) 正常情况下DeferredList不会调用errback,但是除非把cousumeErrors设置成True,否则在Deferred对象中产生的错误仍然会激活每个Deferred对象各自的errback。 def printResult(result): print(result) def addTen(result): return result + " ten" # Deferred gets callback before DeferredList is created deferred1 = defer.Deferred() deferred2 = defer.Deferred() deferred1.addCallback(addTen) dl = defer.DeferredList([deferred1,deferred2]) dl.addCallback(printResult) deferred1.callback("one") # fires addTen,checks DeferredList,stores "one ten" deferred2.callback("two") # At this point,printing: # [(True,'one ten'),(True,'two')] # Deferred gets callback after DeferredList is created deferred1 = defer.Deferred() deferred2 = defer.Deferred() dl = defer.DeferredList([deferred1,deferred2]) deferred1.addCallback(addTen) # will run *after* DeferredList gets its value dl.addCallback(printResult) deferred1.callback("one") # checks DeferredList,stores "one",fires addTen deferred2.callback("two") # At this point,'one),'two')] DeferredList接受三个关键字参数来定制它的行为:fireOnOneCallback、fireOnOneErrback和cousumeErrors。如果设置了fireOnOneCallback,那么只要有一个Deferred对象调用了它的回调函数,DeferredList就会立即调用它的回调函数。相似的,如果设置了fireOnOneErrback,那么只要有一个Deferred调用了errback,DeferredList就会调用它的errback。注意,DeferredList只是一次性的,所以在一次callback或者errback调用之后,它就会什么也不做(它会忽略它的Deferred传递给它的所有结果)。 from twisted.internet import defer d1 = defer.Deferred() d2 = defer.Deferred() d = defer.gatherResults([d1,d2],consumeErrors=True) def cbPrintResult(result): print(result) d.addCallback(cbPrintResult) d1.callback("one") # nothing is printed yet; d is still awaiting completion of d2 d2.callback("two") # printResult prints ["one","two"] 链式的Deferred chainDeferred(otherDeferred) 总结 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |