Asynchronous Tasks and AsyncHTTPClient in Python's Tornado Framework
Tornado, a high-performance server

Unlike Django and Flask, Tornado can act either as a WSGI application or as a WSGI server. Still, the main reason to pick Tornado is its single-process, single-threaded asynchronous network I/O model. High performance is attractive, yet plenty of people ask after trying it: Tornado claims to be fast, so why can't I feel it in practice?

The performance comes from Tornado's asynchronous network I/O built on epoll (kqueue on BSD-style Unix systems such as macOS). Because Tornado is single-threaded, it is easy to write code that blocks the whole service by accident. When that happens performance does not improve; it drops sharply. That is why it is worth exploring how to use Tornado asynchronously.

Asynchronous usage in Tornado

Server-side async

The snippets below target Python 2 and Tornado 4.3 and assume these imports:

    import datetime
    import functools
    import json
    import os
    import urllib

    import requests
    import tornado.concurrent
    import tornado.gen
    import tornado.httpclient
    import tornado.ioloop
    import tornado.web

Start with a synchronous handler as the baseline:

    class SyncHandler(tornado.web.RequestHandler):

        def get(self, *args, **kwargs):
            # time-consuming, blocking code
            os.system("ping -c 2 www.google.com")
            self.finish('It works')

Test it with ab:

    ab -c 5 -n 5 http://127.0.0.1:5000/sync

    Server Software:        TornadoServer/4.3
    Server Hostname:        127.0.0.1
    Server Port:            5000
    Document Path:          /sync
    Document Length:        5 bytes
    Concurrency Level:      5
    Time taken for tests:   5.076 seconds
    Complete requests:      5
    Failed requests:        0
    Total transferred:      985 bytes
    HTML transferred:       25 bytes
    Requests per second:    0.99 [#/sec] (mean)
    Time per request:       5076.015 [ms] (mean)
    Time per request:       1015.203 [ms] (mean, across all concurrent requests)
    Transfer rate:          0.19 [Kbytes/sec] received

QPS is a pitiful 0.99; call it one request per second.

Now bring out the async approach:

    class AsyncHandler(tornado.web.RequestHandler):

        @tornado.web.asynchronous
        @tornado.gen.coroutine
        def get(self, **kwargs):
            # Schedule ping one second from now. A bare number would be read as
            # an absolute timestamp, so a timedelta is used for a relative delay.
            tornado.ioloop.IOLoop.instance().add_timeout(
                datetime.timedelta(seconds=1),
                functools.partial(self.ping, 'www.google.com'))
            # do something else
            self.finish('It works')

        @tornado.gen.coroutine
        def ping(self, url):
            os.system("ping -c 2 {}".format(url))
            return 'after'

Although the background task is scheduled with a 1-second timeout, the handler itself returns very quickly. The ab run looks like this:

    Document Path:          /async
    Document Length:        5 bytes
    Concurrency Level:      5
    Time taken for tests:   0.009 seconds
    Complete requests:      5
    Failed requests:        0
    Total transferred:      985 bytes
    HTML transferred:       25 bytes
    Requests per second:    556.92 [#/sec] (mean)
    Time per request:       8.978 [ms] (mean)
    Time per request:       1.796 [ms] (mean, across all concurrent requests)
    Transfer rate:          107.14 [Kbytes/sec] received

With this approach the time-consuming task is handed to Tornado's IOLoop to run later, and the request can go on with other work. Keep in mind that os.system is still a blocking call: the request only looks fast because the response has already been sent by the time the callback fires. Often, though, we need the result of the task once it finishes, and then this approach no longer works. There is always a way out: just switch to a different async style. The following rewrites the handler as a coroutine:

    class AsyncTaskHandler(tornado.web.RequestHandler):

        @tornado.web.asynchronous
        @tornado.gen.coroutine
        def get(self, **kwargs):
            # yield the result
            response = yield tornado.gen.Task(self.ping, 'www.google.com')
            print 'response', response
            self.finish('hello')

        @tornado.gen.coroutine
        def ping(self, url):
            os.system("ping -c 2 {}".format(url))
            return 'after'

The task is handled asynchronously, and its return value comes back as well.

    Server Software:        TornadoServer/4.3
    Server Hostname:        127.0.0.1
    Server Port:            5000
    Document Path:          /async/task
    Document Length:        5 bytes
    Concurrency Level:      5
    Time taken for tests:   0.049 seconds
    Complete requests:      5
    Failed requests:        0
    Total transferred:      985 bytes
    HTML transferred:       25 bytes
    Requests per second:    101.39 [#/sec] (mean)
    Time per request:       49.314 [ms] (mean)
    Time per request:       9.863 [ms] (mean, across all concurrent requests)
    Transfer rate:          19.51 [Kbytes/sec] received

The QPS gain is still obvious. Coroutines are not always faster than synchronous code, though. Under very low concurrency the I/O itself does not open much of a gap, and a coroutine may perform about the same as the synchronous version. You would certainly lose a 100-meter race against Usain Bolt, but over 2 meters the outcome is anyone's guess.

yield suspends the coroutine. It does not block the main thread, but because the return value has to be handled, an individual request still waits between being suspended and sending its response. Another way to combine async and coroutines is to use a thread pool outside the main thread. The thread pool depends on the futures package, which has to be installed separately on Python 2.

Here is the handler changed to use a thread pool:

    from concurrent.futures import ThreadPoolExecutor

    class FutureHandler(tornado.web.RequestHandler):
        executor = ThreadPoolExecutor(10)

        @tornado.web.asynchronous
        @tornado.gen.coroutine
        def get(self, **kwargs):
            url = 'www.google.com'
            tornado.ioloop.IOLoop.instance().add_callback(functools.partial(self.ping, url))
            self.finish('It works')

        @tornado.concurrent.run_on_executor
        def ping(self, url):
            os.system("ping -c 2 {}".format(url))

Run ab again:

    Document Path:          /future
    Document Length:        5 bytes
    Concurrency Level:      5
    Time taken for tests:   0.003 seconds
    Complete requests:      5
    Failed requests:        0
    Total transferred:      995 bytes
    HTML transferred:       25 bytes
    Requests per second:    1912.78 [#/sec] (mean)
    Time per request:       2.614 [ms] (mean)
    Time per request:       0.523 [ms] (mean, across all concurrent requests)
    Transfer rate:          371.72 [Kbytes/sec] received

QPS jumps to 1912.78. Meanwhile the server log keeps printing ping output, so the work is still going on in the background.

A variant that waits for the result of the pooled task:

    class Executor(ThreadPoolExecutor):
        _instance = None

        def __new__(cls, **kwargs):
            # share one thread pool across all handlers
            if not getattr(cls, '_instance', None):
                cls._instance = ThreadPoolExecutor(max_workers=10)
            return cls._instance

    class FutureResponseHandler(tornado.web.RequestHandler):
        executor = Executor()

        @tornado.web.asynchronous
        @tornado.gen.coroutine
        def get(self, **kwargs):
            future = Executor().submit(self.ping, 'www.google.com')
            # wait at most 10 seconds (timedelta(10) would mean 10 days)
            response = yield tornado.gen.with_timeout(
                datetime.timedelta(seconds=10), future,
                quiet_exceptions=tornado.gen.TimeoutError)
            if response:
                print 'response', response

        # plain method: it is submitted to the pool explicitly in get()
        def ping(self, url):
            os.system("ping -c 1 {}".format(url))
            return 'after'

Thread-pool tasks can also be suspended with Tornado's yield, which gives coroutine-style code. You get the result of the time-consuming task without blocking the main thread.

    Concurrency Level:      5
    Time taken for tests:   0.043 seconds
    Complete requests:      5
    Failed requests:        0
    Total transferred:      960 bytes
    HTML transferred:       0 bytes
    Requests per second:    116.38 [#/sec] (mean)
    Time per request:       42.961 [ms] (mean)
    Time per request:       8.592 [ms] (mean, across all concurrent requests)
    Transfer rate:          21.82 [Kbytes/sec] received

QPS is 116. With the yield-based coroutine it is well under a tenth of the version that does not wait for a response (116 versus 1912). That looks like a big loss, and the main reason is that the coroutine has to wait for the task to finish before it can return the result.

Think of fishing. The first approach casts the net and walks away without a second look, so of course it is fast. The second approach casts the net and then has to haul it in, and hauling takes time. Either way it is still far faster than the synchronous version, since casting a net beats catching fish one at a time.

Which approach to use depends mostly on the business logic. Fire-and-forget tasks usually end up needing callbacks, and too many callbacks get confusing; if you find yourself nesting many of them, the business or product logic is the first thing to optimize. The yield style is elegant and lets you write async logic as if it were synchronous. It feels great, at the cost of some performance.

More flavors of async

Tornado also has client-side async support, mainly through AsyncHTTPClient. The typical scenario is a Tornado service that needs to make requests to some other I/O endpoint and process the result. Incidentally, calling ping in the examples above also counts as a kind of in-service I/O. Next we look at AsyncHTTPClient, in particular uploading files and forwarding requests with it.

Async client

When God closes a door, He often opens a window. Tornado ships an asynchronous HTTP client built on the framework itself (there is a synchronous one too): AsyncHTTPClient.

AsyncHTTPClient basic usage

Calling a third-party service synchronously kills performance just the same.

    class SyncHandler(tornado.web.RequestHandler):

        def get(self, **kwargs):
            url = 'https://api.github.com/'
            resp = requests.get(url)
            print resp.status_code
            self.finish('It works')

The ab result is roughly:

    Document Path:          /sync
    Document Length:        5 bytes
    Concurrency Level:      5
    Time taken for tests:   10.255 seconds
    Complete requests:      5
    Failed requests:        0
    Total transferred:      985 bytes
    HTML transferred:       25 bytes
    Requests per second:    0.49 [#/sec] (mean)
    Time per request:       10255.051 [ms] (mean)
    Time per request:       2051.010 [ms] (mean, across all concurrent requests)
    Transfer rate:          0.09 [Kbytes/sec] received

That is very slow. Switch to AsyncHTTPClient and test again:

    class AsyncHandler(tornado.web.RequestHandler):

        @tornado.web.asynchronous
        def get(self, **kwargs):
            url = 'https://api.github.com/'
            http_client = tornado.httpclient.AsyncHTTPClient()
            http_client.fetch(url, self.on_response)
            self.finish('It works')

        @tornado.gen.coroutine
        def on_response(self, response):
            print response.code

QPS improves a lot:

    Document Path:          /async
    Document Length:        5 bytes
    Concurrency Level:      5
    Time taken for tests:   0.162 seconds
    Complete requests:      5
    Failed requests:        0
    Total transferred:      985 bytes
    HTML transferred:       25 bytes
    Requests per second:    30.92 [#/sec] (mean)
    Time per request:       161.714 [ms] (mean)
    Time per request:       32.343 [ms] (mean, across all concurrent requests)
    Transfer rate:          5.95 [Kbytes/sec] received

As before, to get hold of the response you only need to yield the call.

    class AsyncResponseHandler(tornado.web.RequestHandler):

        @tornado.web.asynchronous
        @tornado.gen.coroutine
        def get(self, **kwargs):
            url = 'https://api.github.com/'
            http_client = tornado.httpclient.AsyncHTTPClient()
            response = yield tornado.gen.Task(http_client.fetch, url)
            print response.code
            print response.body

Forwarding with AsyncHTTPClient

Here is a POST example that yields the result. As a rule, a handler that uses yield has to be decorated with tornado.gen.coroutine.

    headers = self.request.headers
    body = json.dumps({'name': 'rsj217'})
    http_client = tornado.httpclient.AsyncHTTPClient()

    resp = yield tornado.gen.Task(
        http_client.fetch, url,
        method="POST", headers=headers, body=body, validate_cert=False)

Building a request for AsyncHTTPClient

    body = urllib.urlencode(params)
    # pass the encoded body into the request, otherwise the POST is sent empty
    req = tornado.httpclient.HTTPRequest(
        url=url, method='POST', body=body, validate_cert=False)

    http_client.fetch(req, self.handler_response)

    def handler_response(self, response):
        print response.code
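The form-encoded body used when building a request can be tried on its own, without a running server. What follows is a minimal sketch under stated assumptions: it targets Python 3 (where urlencode lives in urllib.parse rather than in Python 2's urllib), the helper name build_form_request and the example URL are hypothetical, and the returned dict is only meant to be passed on as tornado.httpclient.HTTPRequest(**kwargs).

```python
from urllib.parse import urlencode, parse_qs


def build_form_request(url, params):
    """Return keyword arguments for a form-encoded POST request.

    Hypothetical helper, not part of Tornado. The keys match keyword
    arguments accepted by tornado.httpclient.HTTPRequest.
    """
    body = urlencode(params)  # e.g. {"a": "1"} becomes "a=1"
    return {
        "url": url,
        "method": "POST",
        "headers": {"Content-Type": "application/x-www-form-urlencoded"},
        "body": body,
    }


# The URL and field values are placeholders for illustration only.
kwargs = build_form_request(
    "https://api.example.com/login", {"name": "rsj217", "lang": "zh"})
print(kwargs["body"])
```

Keeping the encoding in one small function makes it easy to check that the body really ends up in the request, which is the step that is easiest to forget when the request object is built by hand.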
The usage is fairly simple. The first argument of AsyncHTTPClient's fetch method can be either a URL string or an HTTPRequest instance, so HTTP-related parameters such as method and body can be set by constructing an HTTPRequest first and then handing it to fetch. When forwarding to another service with validate_cert left enabled, you may get errors such as a 599 timeout. It is reported as a warning, and the maintainers consider the behavior reasonable.

Uploading an image with AsyncHTTPClient

    @router.Route('/api/v2/account/upload')
    class ApiAccountUploadHandler(helper.BaseHandler):

        @tornado.gen.coroutine
        @helper.token_require
        def post(self, **kwargs):
            upload_type = self.get_argument('type', None)
            # first uploaded part of the "file" form field
            file_ = self.request.files['file'][0]

            new_file = 'upload/new_pic.jpg'
            new_file_name = 'new_pic.jpg'

            # write the upload to disk in binary mode
            with open(new_file, 'wb') as w:
                w.write(file_['body'])

            # user_id is not defined in this snippet; it presumably comes from
            # the project's token check
            logging.info('user {} upload {}'.format(user_id, new_file_name))

            # async request: upload the image
            with open(new_file, 'rb') as f:
                files = [('image', new_file_name, f.read())]

            fields = (('api_key', KEY), ('api_secret', SECRET))

            content_type, body = encode_multipart_formdata(fields, files)

            headers = {"Content-Type": content_type, 'content-length': str(len(body))}
            # method, headers and body must be passed, or a bare GET is sent
            request = tornado.httpclient.HTTPRequest(
                config.OCR_HOST, method="POST", headers=headers, body=body,
                validate_cert=False)

            response = yield tornado.httpclient.AsyncHTTPClient().fetch(request)

    def encode_multipart_formdata(fields, files):
        """
        fields is a sequence of (name, value) elements for regular form fields.
        files is a sequence of (name, filename, value) elements for data to be
        uploaded as files.
        Return (content_type, body) ready for httplib.HTTP instance
        """
        boundary = '----------ThIs_Is_tHe_bouNdaRY_$'
        crlf = '\r\n'
        l = []
        for (key, value) in fields:
            l.append('--' + boundary)
            l.append('Content-Disposition: form-data; name="%s"' % key)
            l.append('')
            l.append(value)
        for (key, filename, value) in files:
            filename = filename.encode("utf8")
            l.append('--' + boundary)
            l.append(
                'Content-Disposition: form-data; name="%s"; filename="%s"' % (
                    key, filename
                )
            )
            l.append('Content-Type: %s' % get_content_type(filename))
            l.append('')
            l.append(value)
        l.append('--' + boundary + '--')
        l.append('')
        body = crlf.join(l)
        content_type = 'multipart/form-data; boundary=%s' % boundary
        return content_type, body

    def get_content_type(filename):
        import mimetypes
        return mimetypes.guess_type(filename)[0] or 'application/octet-stream'

Compared with the earlier examples, uploading an image only adds one step: encoding the image. The binary image data is encoded as multipart, and the accompanying form fields have to be encoded along with it. By contrast, doing the same thing with requests is very simple:

    files = {}
    f = open('/Users/ghost/Desktop/id.jpg', 'rb')
    files['image'] = f
    data = dict(api_key='KEY', api_secret='SECRET')
    resp = requests.post(url, data=data, files=files)
    f.close()
    print resp.status_code

Summary

That is roughly how it is used.

(Editor: 李大同)
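A footnote to the upload example: the multipart encoder there is written for Python 2, where str is a byte string. Under Python 3 the body has to be assembled as bytes. Below is a minimal sketch of the same encoding, assuming Python 3; the function name encode_multipart_bytes, the optional boundary argument and the sample values are illustrative and do not come from the original project.

```python
import mimetypes
import uuid


def encode_multipart_bytes(fields, files, boundary=None):
    """Encode form fields and files as a multipart/form-data body.

    fields: iterable of (name, value) pairs, both str.
    files: iterable of (name, filename, content), content being bytes.
    Returns (content_type, body) where body is bytes.
    """
    boundary = boundary or uuid.uuid4().hex
    delimiter = b"--" + boundary.encode("ascii")
    parts = []
    for name, value in fields:
        parts += [
            delimiter,
            ('Content-Disposition: form-data; name="%s"' % name).encode("utf-8"),
            b"",  # blank line separates part headers from the value
            value.encode("utf-8"),
        ]
    for name, filename, content in files:
        mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"
        parts += [
            delimiter,
            ('Content-Disposition: form-data; name="%s"; filename="%s"'
             % (name, filename)).encode("utf-8"),
            ("Content-Type: %s" % mime).encode("ascii"),
            b"",
            content,  # raw file bytes, inserted unchanged
        ]
    # closing delimiter, followed by a final CRLF
    parts += [delimiter + b"--", b""]
    body = b"\r\n".join(parts)
    return "multipart/form-data; boundary=%s" % boundary, body


# Sample values are placeholders; a fixed boundary keeps the output predictable.
content_type, body = encode_multipart_bytes(
    [("api_key", "KEY")],
    [("image", "new_pic.jpg", b"\xff\xd8\xff")],
    boundary="demo-boundary",
)
print(content_type)
```

The resulting content_type and body would go into the Content-Type header and the body argument of the request, in the same way as in the upload handler.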