Tornado異步非阻塞

隨便問一個Python開發者,Tornado框架和Django/Flask之類的框架有什麼區別,十有八九會回答,Tornado支持異步非阻塞。Tornado和其餘兩個框架最大的區別在於,Tornado既是web框架同時又是異步網絡庫,也是web服務器。而不管Django仍是Flask,它們都只是web框架,不能單獨在生產環境中使用,必須搭配wsgi web服務器配套使用(如gunicorn, uWSGI),同時又由於WSGI協議的緣故,這兩個web框架都不支持異步非阻塞。而Tornado又是如何實現處理異步請求的又是一個話題。 這篇博客聚焦的是,如何在Tornado中正確的實現異步非阻塞操做。全部的示例代碼都是基於Tornado 4.0+ 和 Python 3.5+ 。 關於Tornado異步IO,文檔 已經寫得很清楚了。Tornado中主要有三種形式的異步實現方式:回調、協程和Future。不管何種形式實現的,都和Tornado核心 IOLoop有千絲萬縷的聯繫。html

基於回調

在Tornado中,用@tornado.web.asynchronous 裝飾器裝飾的請求方法就是基於回調的異步實現。最多見的例子就是AsyncHTTPClient:git

class CallbackFetchHandler(RequestHandler):

 @web.asynchronous
    def get(self):
        url = self.get_argument("url", "https://httpbin.org/")
        client = httpclient.AsyncHTTPClient()
        client.fetch(url, self._on_response)

    def _on_response(self, response):
        size = len(response.body)
        self.write("pageSize is %s" % size)
        self.finish()
複製代碼

這裏須要注意的是,@web.asynchronous 這個裝飾器只能在請求方法中使用,而在其餘方法使用回發生未知錯誤,同時要在回調函數中自行調用 self.finish() 來關閉請求,不然這個請求回一直pending。 這種異步實現須要有支持回調的異步庫或者函數能夠調用,若是沒有即便用了這個裝飾器也是白搭。例如AsyncHTTPClient.fetch 方法本事的實現就支持異步的,而HTTPClient.fetch只是作了一個同步的封裝。github

基於協程

相比回調,用協程實現的方式在代碼流程上更易於閱讀。Tornado用 @gen.coroutine 來配合函數實現異步的調用方式。 在Python3 以前,Python的協程都已生成器的方式實現的,因此Tornado本身的協程實現,不能使用普通的生成器,而Python3 以後有原生的協程實現,因此對於新項目,不考慮兼容比較老的Python,能夠考慮用原生的語法實現。在較早以前,用這個裝飾器的方式返回的是YieldPoint,Tornado 4.0以後便改爲了Future。 上面的例子能夠改爲,web

class GenAsyncHandler(RequestHandler):

 @gen.coroutine
    def get(self):
        url = self.get_argument("url", "https://httpbin.org/")
        client = httpclient.AsyncHTTPClient()
        response = yield client.fetch(url)
        self.write("pageSize is %s" % len(response.body))
複製代碼

並且基於協程的異步能夠同時運行幾個Future,相似於redis

@gen.coroutine
def get(self):
    http_client = AsyncHTTPClient()
    response1, response2 = yield [http_client.fetch(url1),
                                  http_client.fetch(url2)]
    response_dict = yield dict(
        url3=http_client.fetch(url3),
        url4=http_client.fetch(url4)
    )
    response3 = response_dict['url3']
    response4 = response_dict['url4']
複製代碼

這種對於在一個請求中要去請求多個外部服務的狀況下,仍是有很大的做用的。 可是不管是基於回調仍是基於協程,都必須有一個前提,那就是要有支持異步的庫才能實現異步,若是在一個異步的函數中調用了一個同步的方法,整個函數仍是同步的。Tornado的第三方裏面羅列了一些,可是不少都只是支持Python2,也有不少已經再也不更新了,對比於Django和Flask的生態確實差不少。也難怪,基本上全部異步庫都要和Tornado自己的IOLoop綁定在一塊兒。算法

基於Future

那麼假如要調用一些沒有實現異步的外部庫呢?Tornado也給出瞭解決方案,那就是基於線程池的形式,使用concurrent.futures中的ThreadPoolExecutor, 把可能回阻塞的操做都放到線程池中去實現,一樣以剛纔的例子,服務器

class ExecutorHandler(RequestHandler):
    executor = ThreadPoolExecutor()

 @gen.coroutine
    def get(self):
        url = self.get_argument("url", "https://httpbin.org/")
        response = yield self.fetch(url)
        self.write("pageSize is %s" % len(response))
        self.finish()

 @run_on_executor
    def fetch(self, url):
        with urlopen(url) as page:
            content = page.read().decode("utf-8")
        return content
複製代碼

可是因爲GIL的緣故,Python的多線程在一些計算密集型的任務重表現並非很好。但不少時候,很難去判斷說一個任務是計算密集型仍是IO密集型性,比如須要調用一個有鑑權的外部服務,而鑑權的加解密算法須要大量的計算,這就很難講是IO密集型了。 處理這些任務,也能夠藉助任務隊列來實現,好比Celery。在一個項目中,咱們就利用了Celery來處理和硬件用UDP協議來進行交互。在Python3中使用Tornado和Celery中,有一個 tornado-celery 庫,實際發現使用起來仍是同步阻塞的。因爲做者長時間不更新,並且也不支持新版的Tornado,因此不建議使用。 實際上稍微修改一下就能夠本身實現Celery的異步了,以下:網絡

class CeleryAsyncHandler(BaseHandler):

    def wait_for_result(self, task, callback):
        if task.ready():
            callback(task.result)
        else:
            tornado.ioloop.IOLoop.current().add_callback(
                partial(self.wait_for_result, task, callback)
            )

 @gen.coroutine
    def get(self):
        url = self.get_argument("url", "https://httpbin.org/")
        task = fetch_content.apply_async(args=(url, ))
        content = yield gen.Task(self.wait_for_result, task)
        self.write("pageSize is %s" % len(content))
複製代碼

而Celery中worker代碼以下;多線程

from urllib.request import urlopen

from celery import Celery


app = Celery('tasks')

app.conf.update(
    broker_url="amqp://guest:guest@localhost:5672",
    result_backend="redis://localhost:6379/2",
    timezone='Asia/Shanghai',
    enable_utc=True,
    TCELERY_RESULT_NOWAIT=False
)


@app.task
def fetch_content(url):
    with urlopen(url) as page:
        content = page.read().decode("utf-8")
    return content
複製代碼

實際上就是把可能的複雜計算或者是網絡IO所有移出tornado主線程。這個能實現的原理就是把Celery的一個任務當成一個網絡IO註冊在IOLoop中,Tornado會定時遍歷事件是否已經就緒,若是已經就緒則進行回調處理。app

參考資料

相關文章
相關標籤/搜索