Python的Tornado框架的異步任務與AsyncHTTPClient 

轉載自http://www.php.cn/python-tutorials-284773.htmlphp

 

高性能服務器Tornado
Python的web框架名目繁多,各有千秋。正如光榮屬於希臘,偉大屬於羅馬。Python的優雅結合WSGI的設計,讓web框架接口實現千秋一統。WSGI 把應用(Application)和服務器(Server)結合起來。Django 和 Flask 均可以結合 gunicon 搭建部署應用。html

 

與 django 和 flask 不同,tornado 既能夠是 wsgi 應用,也能夠是 wsgi 服務。固然,選擇tornado更多的考量源於其單進程單線程異步IO的網絡模式。高性能每每吸引人,但是有很多朋友使用以後會提出疑問,tornado號稱高性能,實際使用的時候卻怎麼感覺不到呢?python

實際上,高性能源於Tornado基於Epoll(unix爲kqueue)的異步網絡IO。由於tornado的單線程機制,一不當心就容易寫出阻塞服務(block)的代碼。不但沒有性能提升,反而會讓性能急劇降低。所以,探索tornado的異步使用方式頗有必要。git

Tornado 異步使用方式
簡而言之,Tornado的異步包括兩個方面,異步服務端和異步客戶端。不管服務端和客戶端,具體的異步模型又能夠分爲回調(callback)和協程(coroutine)。具體應用場景,也沒有很明確的界限。每每一個請求服務裏還包含對別的服務的客戶端異步請求。github

服務端異步方式
服務端異步,能夠理解爲一個tornado請求以內,須要作一個耗時的任務。直接寫在業務邏輯裏可能會block整個服務。所以能夠把這個任務放到異步處理,實現異步的方式就有兩種,一種是yield掛起函數,另一種就是使用類線程池的方式。請看一個同步例子:web

class SyncHandler(tornado.web.RequestHandler):

  def get(self, *args, **kwargs):
    # 耗時的代碼
    os.system("ping -c 2 www.google.com")
    self.finish('It works')

 

使用ab測試一下:redis

ab -c 5 -n 5 http://127.0.0.1:5000/sync

 

Server Software:    TornadoServer/4.3
Server Hostname:    127.0.0.1
Server Port:      5000

Document Path:     /sync
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  5.076 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  0.99 [#/sec] (mean)
Time per request:    5076.015 [ms] (mean)
Time per request:    1015.203 [ms] (mean, across all concurrent requests)
Transfer rate:     0.19 [Kbytes/sec] received

 

qps 僅有可憐的 0.99,姑且當成每秒處理一個請求吧。django

下面祭出異步大法:json

class AsyncHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    tornado.ioloop.IOLoop.instance().add_timeout(1, callback=functools.partial(self.ping, 'www.google.com'))

    # do something others

    self.finish('It works')

  @tornado.gen.coroutine
  def ping(self, url):
    os.system("ping -c 2 {}".format(url))
    return 'after'

 

儘管在執行異步任務的時候選擇了timeout 1秒,主線程的返回仍是很快的。ab壓測以下:flask

Document Path:     /async
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.009 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  556.92 [#/sec] (mean)
Time per request:    8.978 [ms] (mean)
Time per request:    1.796 [ms] (mean, across all concurrent requests)
Transfer rate:     107.14 [Kbytes/sec] received

 

上述的使用方式,經過tornado的IO循環,把能夠把耗時的任務放到後臺異步計算,請求能夠接着作別的計算。但是,常常有一些耗時的任務完成以後,咱們須要其計算的結果。此時這種方式就不行了。車道山前必有路,只須要切換一異步方式便可。下面使用協程來改寫:

class AsyncTaskHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):
    # yield 結果
    response = yield tornado.gen.Task(self.ping, ' www.google.com')
    print 'response', response
    self.finish('hello')

  @tornado.gen.coroutine
  def ping(self, url):
    os.system("ping -c 2 {}".format(url))
    return 'after'

 

能夠看到異步在處理,而結果值也被返回了。

Server Software:    TornadoServer/4.3
Server Hostname:    127.0.0.1
Server Port:      5000

Document Path:     /async/task
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.049 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  101.39 [#/sec] (mean)
Time per request:    49.314 [ms] (mean)
Time per request:    9.863 [ms] (mean, across all concurrent requests)
Transfer rate:     19.51 [Kbytes/sec] received

 

qps提高仍是很明顯的。有時候這種協程處理,未必就比同步快。在併發量很小的狀況下,IO自己拉開的差距並不大。甚至協程和同步性能差很少。例如你跟博爾特跑100米確定輸給他,但是若是跟他跑2米,鹿死誰手還未定呢。

yield掛起函數協程,儘管沒有block主線程,由於須要處理返回值,掛起到響應執行仍是有時間等待,相對於單個請求而言。另一種使用異步和協程的方式就是在主線程以外,使用線程池,線程池依賴於futures。Python2須要額外安裝。

下面使用線程池的方式修改成異步處理:

from concurrent.futures import ThreadPoolExecutor

class FutureHandler(tornado.web.RequestHandler):
  executor = ThreadPoolExecutor(10)

  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    url = 'www.google.com'
    tornado.ioloop.IOLoop.instance().add_callback(functools.partial(self.ping, url))
    self.finish('It works')

  @tornado.concurrent.run_on_executor
  def ping(self, url):
    os.system("ping -c 2 {}".format(url))

 

再運行ab測試:

Document Path:     /future
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.003 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   995 bytes
HTML transferred:    25 bytes
Requests per second:  1912.78 [#/sec] (mean)
Time per request:    2.614 [ms] (mean)
Time per request:    0.523 [ms] (mean, across all concurrent requests)
Transfer rate:     371.72 [Kbytes/sec] received

 

qps瞬間達到了1912.78。同時,能夠看到服務器的log還在不停的輸出ping的結果。
想要返回值也很容易。再切換一下使用方式接口。使用tornado的gen模塊下的with_timeout功能(這個功能必須在tornado>3.2的版本)。

class Executor(ThreadPoolExecutor):
  _instance = None

  def __new__(cls, *args, **kwargs):
    if not getattr(cls, '_instance', None):
      cls._instance = ThreadPoolExecutor(max_workers=10)
    return cls._instance


class FutureResponseHandler(tornado.web.RequestHandler):
  executor = Executor()

  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    future = Executor().submit(self.ping, 'www.google.com')

    response = yield tornado.gen.with_timeout(datetime.timedelta(10), future,
                         quiet_exceptions=tornado.gen.TimeoutError)

    if response:
      print 'response', response.result()

  @tornado.concurrent.run_on_executor
  def ping(self, url):
    os.system("ping -c 1 {}".format(url))
    return 'after'

 

線程池的方式也能夠經過使用tornado的yield把函數掛起,實現了協程處理。能夠得出耗時任務的result,同時不會block住主線程。

Concurrency Level:   5
Time taken for tests:  0.043 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   960 bytes
HTML transferred:    0 bytes
Requests per second:  116.38 [#/sec] (mean)
Time per request:    42.961 [ms] (mean)
Time per request:    8.592 [ms] (mean, across all concurrent requests)
Transfer rate:     21.82 [Kbytes/sec] received

 

qps爲116,使用yield協程的方式,僅爲非reponse的十分之一左右。看起來性能損失了不少,主要緣由這個協程返回結果須要等執行完畢任務。

比如打魚,前一種方式是撒網,而後就完事,漠不關心,時間固然快,後一種方式則撒網以後,還得收網,等待收網也是一段時間。固然,相比同步的方式仍是快了千百倍,畢竟撒網仍是比一隻只釣比較快。

具體使用何種方式,更多的依賴業務,不須要返回值的每每須要處理callback,回調太多容易暈菜,固然若是須要不少回調嵌套,首先優化的應該是業務或產品邏輯。yield的方式很優雅,寫法能夠異步邏輯同步寫,爽是爽了,固然也會損失必定的性能。

異步多樣化
Tornado異步服務的處理大抵如此。如今異步處理的框架和庫也不少,藉助redis或者celery等,也能夠把tonrado中一些業務異步化,放到後臺執行。

此外,Tornado還有客戶端異步功能。該特性主要是在於 AsyncHTTPClient的使用。此時的應用場景每每是tornado服務內,須要針對另外的IO進行請求和處理。順便說起,上述的例子中,調用ping其實也算是一種服務內的IO處理。接下來,將會探索一下AsyncHTTPClient的使用,尤爲是使用AsyncHTTPClient上傳文件與轉發請求。

異步客戶端
前面瞭解Tornado的異步任務的經常使用作法,姑且歸結爲異步服務。一般在咱們的服務內,還須要異步的請求第三方服務。針對HTTP請求,Python的庫Requests是最好用的庫,沒有之一。官網宣稱:HTTP for Human。然而,在tornado中直接使用requests將會是一場惡夢。requests的請求會block整個服務進程。

上帝關上門的時候,每每回打開一扇窗。Tornado提供了一個基於框架自己的異步HTTP客戶端(固然也有同步的客戶端)--- AsyncHTTPClient。

AsyncHTTPClient 基本用法
AsyncHTTPClient是 tornado.httpclinet 提供的一個異步http客戶端。使用也比較簡單。與服務進程同樣,AsyncHTTPClient也能夠callback和yield兩種使用方式。前者不會返回結果,後者則會返回response。

若是請求第三方服務是同步方式,一樣會殺死性能。

class SyncHandler(tornado.web.RequestHandler):
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    resp = requests.get(url)
    print resp.status_code

    self.finish('It works')

 

使用ab測試大概以下:

Document Path:     /sync
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  10.255 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  0.49 [#/sec] (mean)
Time per request:    10255.051 [ms] (mean)
Time per request:    2051.010 [ms] (mean, across all concurrent requests)
Transfer rate:     0.09 [Kbytes/sec] received

 

性能至關慢了,換成AsyncHTTPClient再測:

class AsyncHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    http_client = tornado.httpclient.AsyncHTTPClient()
    http_client.fetch(url, self.on_response)
    self.finish('It works')

  @tornado.gen.coroutine
  def on_response(self, response):
    print response.code

 

qps 提升了不少

Document Path:     /async
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.162 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  30.92 [#/sec] (mean)
Time per request:    161.714 [ms] (mean)
Time per request:    32.343 [ms] (mean, across all concurrent requests)
Transfer rate:     5.95 [Kbytes/sec] received

 

一樣,爲了獲取response的結果,只須要yield函數。

class AsyncResponseHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    http_client = tornado.httpclient.AsyncHTTPClient()
    response = yield tornado.gen.Task(http_client.fetch, url)
    print response.code
    print response.body

 

AsyncHTTPClient 轉發
使用Tornado常常須要作一些轉發服務,須要藉助AsyncHTTPClient。既然是轉發,就不可能只有get方法,post,put,delete等方法也會有。此時涉及到一些 headers和body,甚至還有https的waring。

下面請看一個post的例子, yield結果,一般,使用yield的時候,handler是須要 tornado.gen.coroutine。

headers = self.request.headers
body = json.dumps({'name': 'rsj217'})
http_client = tornado.httpclient.AsyncHTTPClient()

resp = yield tornado.gen.Task(
  self.http_client.fetch, 
  url,
  method="POST", 
  headers=headers,
  body=body, 
  validate_cert=False)

 

AsyncHTTPClient 構造請求
若是業務處理並非在handlers寫的,而是在別的地方,當沒法直接使用tornado.gen.coroutine的時候,能夠構造請求,使用callback的方式。

body = urllib.urlencode(params)
req = tornado.httpclient.HTTPRequest(
 url=url, 
 method='POST', 
 body=body, 
 validate_cert=False) 

http_client.fetch(req, self.handler_response)

def handler_response(self, response):

  print response.code

 

用法也比較簡單,AsyncHTTPClient中的fetch方法,第一個參數實際上是一個HTTPRequest實例對象,所以對於一些和http請求有關的參數,例如method和body,可使用HTTPRequest先構造一個請求,再扔給fetch方法。一般在轉發服務的時候,若是開起了validate_cert,有可能會返回599timeout之類,這是一個warning,官方卻認爲是合理的。

AsyncHTTPClient 上傳圖片
AsyncHTTPClient 更高級的用法就是上傳圖片。例如服務有一個功能就是請求第三方服務的圖片OCR服務。須要把用戶上傳的圖片,再轉發給第三方服務。

@router.Route('/api/v2/account/upload')
class ApiAccountUploadHandler(helper.BaseHandler):
  @tornado.gen.coroutine
  @helper.token_require
  def post(self, *args, **kwargs):
    upload_type = self.get_argument('type', None)

    files_body = self.request.files['file']

    new_file = 'upload/new_pic.jpg'
    new_file_name = 'new_pic.jpg'

    # 寫入文件
    with open(new_file, 'w') as w:
      w.write(file_['body'])

    logging.info('user {} upload {}'.format(user_id, new_file_name))

    # 異步請求 上傳圖片
    with open(new_file, 'rb') as f:
      files = [('image', new_file_name, f.read())]

    fields = (('api_key', KEY), ('api_secret', SECRET))

    content_type, body = encode_multipart_formdata(fields, files)

    headers = {"Content-Type": content_type, 'content-length': str(len(body))}
    request = tornado.httpclient.HTTPRequest(config.OCR_HOST,
                         method="POST", headers=headers, body=body, validate_cert=False)

    response = yield tornado.httpclient.AsyncHTTPClient().fetch(request)

def encode_multipart_formdata(fields, files):
  """
  fields is a sequence of (name, value) elements for regular form fields.
  files is a sequence of (name, filename, value) elements for data to be
  uploaded as files.
  Return (content_type, body) ready for httplib.HTTP instance
  """
  boundary = '----------ThIs_Is_tHe_bouNdaRY_$'
  crlf = '\r\n'
  l = []
  for (key, value) in fields:
    l.append('--' + boundary)
    l.append('Content-Disposition: form-data; name="%s"' % key)
    l.append('')
    l.append(value)
  for (key, filename, value) in files:
    filename = filename.encode("utf8")
    l.append('--' + boundary)
    l.append(
        'Content-Disposition: form-data; name="%s"; filename="%s"' % (
          key, filename
        )
    )
    l.append('Content-Type: %s' % get_content_type(filename))
    l.append('')
    l.append(value)
  l.append('--' + boundary + '--')
  l.append('')
  body = crlf.join(l)
  content_type = 'multipart/form-data; boundary=%s' % boundary
  return content_type, body


def get_content_type(filename):
  import mimetypes

  return mimetypes.guess_type(filename)[0] or 'application/octet-stream'

 

對比上述的用法,上傳圖片僅僅是多了一個圖片的編碼。將圖片的二進制數據按照multipart 方式編碼。編碼的同時,還須要把傳遞的相關的字段處理好。相比之下,使用requests 的方式則很是簡單:

files = {}
f = open('/Users/ghost/Desktop/id.jpg')
files['image'] = f
data = dict(api_key='KEY', api_secret='SECRET')
resp = requests.post(url, data=data, files=files)
f.close()
print resp.status_Code

 

總結
經過AsyncHTTPClient的使用方式,能夠輕鬆的實現handler對第三方服務的請求。結合前面關於tornado異步的使用方式。無非仍是兩個key。是否須要返回結果,來肯定使用callback的方式仍是yield的方式。固然,若是不一樣的函數都yield,yield也能夠一直傳遞。這個特性,tornado的中的tornado.auth 裏面對oauth的認證。

大體就是這樣的用法。

相關文章
相關標籤/搜索