利用tornado使請求實現異步非阻塞

基本IO模型

網上搜了不少關於同步異步,阻塞非阻塞的說法,理解仍是不能很透徹,有必要買書看下。
參考:使用異步 I/O 大大提升應用程序的性能
怎樣理解阻塞非阻塞與同步異步的區別?javascript

  1. 同步和異步:主要關注消息通訊機制(重點在B?)。
    同步:A調用B,B處理直到得到結果,才返回給A。
    異步:A調用B,B直接返回。無需等待結果,B經過狀態,通知等來通知A或回調函數來處理。css

  2. 阻塞和非阻塞:主要關注程序等待(重點在A?)。
    阻塞:A調用B,A被掛起直到B返回結果給A,A繼續執行。
    非阻塞:A調用B,A不會被掛起,A能夠執行其餘操做(但可能A須要輪詢檢查B是否返回)。html

  3. 同步阻塞:A調用B,A掛起,B處理直到得到結果,返回給A,A繼續執行。java

  4. 同步非阻塞:A調用B,A繼續執行,B處理直到得到結果,處理的同時A輪詢檢查B是否返回結果。python

  5. 異步阻塞:異步阻塞 I/O 模型的典型流程 (select)。linux

  6. 異步非阻塞:A調用B,B當即返回,A繼續執行,B獲得結果後經過狀態,通知等通知A或回調函數處理。git

tornado實現異步非阻塞

參考:github

  1. 使用tornado讓你的請求異步非阻塞
  2. 官方文檔
利用異步方法(回調)和@tornado.web.asynchronous

@tornado.web.asynchronous 並不能將一個同步方法變成異步,因此修飾在同步方法上是無效的,只是告訴框架,這個方法是異步的,且只能適用於HTTP verb方法(get、post、delete、put等)。@tornado.web.asynchronous 裝飾器適用於callback-style的異步方法,若是是協程則能夠用@tornado.gen.coroutine來修飾。對於用@tornado.web.asynchronous 修飾的異步方法,須要主動self.finish()來結束該請求,普通的方法(get()等)會自動結束請求在方法返回的時候。
最基本的使用callback-style的例子,直接使用異步方法,並定義callback方法。web

# sync blocking class SleepHandler(BaseHandler): # no effective @tornado.web.asynchronous def get(self): time.sleep(5) self.write('sleep for 5s') class SleepHandler(BaseHandler): @tornado.web.asynchronous def get(self): tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, callback=self.on_response) def on_response(self): self.write('sleep for 5s') self.finish() # call back class MyRequestHandler(BaseHandler): @tornado.web.asynchronous def get(self): http = httpclient.AsyncHTTPClient() http.fetch('http://www.baidu.com', self._on_download) def _on_download(self, response): self.write(response.body) self.finish() 

利用ThreadPoolExecutor

利用ThreadPoolExecutor的submit和future對象的add_done_callback 方法,將一個同步方法變成異步。以下例所示,若沒有無參能夠不用partial()ruby

class SleepHandler(BaseHandler): @tornado.web.asynchronous def get(self): sleep_time = 5 def callback(future): self.write(future.result()) self.finish() EXECUTOR.submit(partial(self.get_sleep, sleep_time)).add_done_callback( lambda future: tornado.ioloop.IOLoop.instance().add_callback( partial(callback, future))) def get_sleep(self, sleep_time): time.sleep(sleep_time) return "Awake! %s" % time.time() 

分解一下:

  1. future = EXECUTOR.submit(partial(self.get_sleep, sleep_time))返回了一個future對象。
  2. future.add_done_callback()future添加一個完成回調函數callback_func
  3. 實際上這個回調函數是
callback_func = lambda future: tornado.ioloop.IOLoop.instance().add_callback(partial(callback, future))``` 4. 結合起來就是: 

class SleepHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
sleep_time = 5

def final_callback(future_param): self.write(future_param.result()) self.finish() def executor_callback(future_param): tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future_param)) future = EXECUTOR.submit(partial(self.get_sleep, sleep_time)) future.add_done_callback(executor_callback) def get_sleep(self, sleep_time): time.sleep(sleep_time) return "Awake! %s" % time.time() 
5. 沒看文檔,源碼前本認爲爲何要畫蛇添足進行兩次callback,直接

tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future))

不就ok了嗎,爲什麼還要再加上一層

future.add_done_callback(executor_callback)

但發現直接這樣是會阻塞IOLoop的。查閱相關文檔後發現,``` tornado.ioloop.IOLoop.instance().add_callback ``` 這個方法會將控制權從其餘線程轉到IOLoop線程上,直接用 

tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future))
時,final_callback()中獲取future_param.result()仍然會阻塞,因此須要future.add_done_callback()```,在該線程完成獲取結果後在回callback給IOLoop。


另外一種相似的寫法,將上例的方法寫成了一個裝飾器,見下例,但我的認爲利用這種方式異步時沒有必要單獨寫一個裝飾器吧,並且也不通用吧?

def unblock(f): @tornado.web.asynchronous @wraps(f) def wrapper(*args, **kwargs): self = args[0] def callback(future): self.write(future.result()) self.finish() EXECUTOR.submit( partial(f, *args, **kwargs) ).add_done_callback( lambda future: tornado.ioloop.IOLoop.instance().add_callback( partial(callback, future))) return wrapper class SleepHandler(BaseHandler): @unblock def get(self): time.sleep(5) return "Awake! %s" % time.time() 

使用tornado.concurrent.run_on_executor簡化

上述的兩例都至關於開啓了新的線程池?
除此以外還能夠利用@run_on_executor裝飾器將同步阻塞函數變成異步(或者說被tornado的裝飾器理解和識別)。首先閱讀一下@run_on_executor源碼:

def run_on_executor(*args, **kwargs): """Decorator to run a synchronous method asynchronously on an executor. The decorated method may be called with a ``callback`` keyword argument and returns a future. The `.IOLoop` and executor to be used are determined by the ``io_loop`` and ``executor`` attributes of ``self``. To use different attributes, pass keyword arguments to the decorator:: @run_on_executor(executor='_thread_pool') def foo(self): pass .. versionchanged:: 4.2 Added keyword arguments to use alternative attributes. """ def run_on_executor_decorator(fn): executor = kwargs.get("executor", "executor") io_loop = kwargs.get("io_loop", "io_loop")  @functools.wraps(fn) def wrapper(self, *args, **kwargs): callback = kwargs.pop("callback", None) future = getattr(self, executor).submit(fn, self, *args, **kwargs) if callback: getattr(self, io_loop).add_future( future, lambda future: callback(future.result())) return future return wrapper if args and kwargs: raise ValueError("cannot combine positional and keyword args") if len(args) == 1: return run_on_executor_decorator(args[0]) elif len(args) != 0: raise ValueError("expected 1 argument, got %d", len(args)) return run_on_executor_decorator 

能夠很快發如今不存在callback關鍵字參數時,該裝飾器返回了一個future對象,由此並結合上述兩例子能夠很快的利用@run_on_executor寫出,感受至關於簡化了上述兩例的代碼並使之變得通用:

class SleepHandler(BaseHandler): executor = ThreadPoolExecutor(2) @tornado.web.asynchronous def get(self): def callback(future_param): self.write('sleep %ss' % future_param.result()) self.finish() future = self.sleep() future.add_done_callback(lambda f: tornado.ioloop.IOLoop.instance().add_callback( partial(callback, f))) @run_on_executor def sleep(self): time.sleep(5) return 5 

固然也能夠利用callback參數:

class SleepHandler(BaseHandler): executor = ThreadPoolExecutor(2) io_loop = tornado.ioloop.IOLoop.instance() @tornado.web.asynchronous def get(self): def callback(res): self.write('sleep %ss' % res) self.finish() self.sleep(callback=callback) @run_on_executor def sleep(self): time.sleep(5) return 5 

協程方式:@tornado.gen.coroutineyield

除了上述的用利用@tornado.web.asynchronous和callback的方式,還能夠用@tornado.gen.coroutineyield,以下例子:

class GenRequestHandler(BaseHandler): @tornado.gen.coroutine def get(self): http = httpclient.AsyncHTTPClient() res = yield http.fetch('http://www.baidu.com') self.write(res.body) 

參考官方文檔

Most asynchronous functions in Tornado return a Future
; yielding this object returns its result
.

利用tornado.gen.Task修飾一個callback型的異步函數使其可以與yield使用。

gen.Task is now a function that returns a Future

看一下例子(對於sleep能夠直接用yield tornado.gen.sleep(5)):

# gen.Task class SleepHandler(BaseHandler): @tornado.gen.coroutine def get(self): yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 5) # yield tornado.gen.sleep(5) self.write('sleep for 5s') 

還能夠用結合celery使用(但並非最好的方案)。


Last

https://github.com/tornadoweb/tornado/wiki/Links tornado的wiki上有不少異步相關支持的庫。

做者:蔣狗 連接:https://www.jianshu.com/p/ef01c1386933 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。
相關文章
相關標籤/搜索