tornado框架的異步非阻塞

原文: http://www.liangxiansen.cn/2018/04/11/tornado/  做者: 梁先森
稍有改動css

Tornado默認是單進程單線程。實時的web特性一般須要爲每一個用戶一個大部分時間都處於空閒的長鏈接. 在傳統的同步web服務器中,這意味着須要給每一個用戶分配一個專用的線程,這樣的開銷是十分巨大的.html

爲了減少對於併發鏈接須要的開銷,Tornado使用了一種單線程事件循環的方式. 這意味着全部應用程序代碼都應該是異步和非阻塞的,由於在同一時刻只有一個操做是有效的.python

Tornado 中推薦用 協程 來編寫異步代碼. 協程使用 Python 中的關鍵字 yield 來替代鏈式回調來實現掛起和繼續程序的執行(像在 gevent 中使用的輕量級線程合做的方法有時也稱做協程, 可是在 Tornado 中全部協程使用異步函數來實現的明確的上下文切換).nginx

同步阻塞(Blocking)

一個函數一般在它等待返回值的時候被 阻塞 .一個函數被阻塞可能因爲不少緣由: 網絡I/O,磁盤I/O,互斥鎖等等.事實上, 每個 函數都會被阻塞,只是時間會比較短而已, 當一個函數運行時而且佔用CPU(舉一個極端的例子來講明爲何CPU阻塞的時間必須考慮在內, 考慮如下密碼散列函數像bcrypt, 這個函數須要佔據幾百毫秒的CPU時間, 遠遠超過了一般對於網絡和磁盤請求的時間). 一個函數能夠在某些方面阻塞而在其餘方面不阻塞.舉例來講, tornado.httpclient 在默認設置下將阻塞與DNS解析,可是在其它網絡請求時不會阻塞 (爲了減輕這種影響,能夠用 ThreadedResolver 或經過正確配置 libcurl 使用 tornado.curl_httpclient ). 在Tornado的上下文中咱們一般討論網絡I/O上下文阻塞, 雖然各類阻塞已經被最小化了.git

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Lian
# Python 3.5
import time
import tornado.web
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
def doing():
    time.sleep(10)
    return 'Blocking'
class BlockingHandler(tornado.web.RequestHandler):
    def get(self):
        result = doing()
        self.write(result)
application = tornado.web.Application([
    (r"/index", IndexHandler),
    (r"/blocking", BlockingHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

瀏覽器訪問:http://127.0.0.1:8888/index
瀏覽器訪問:http://127.0.0.1:8888/blocking
你會發現blocking會一直在轉圈,處於一個堵塞狀態。
你再訪問index頁面,你發現index頁面也會堵塞住。github

異步非阻塞(Non Blocking)

一個 異步 函數在它結束前就已經返回了,並且一般會在程序中觸發一些動做而後在後臺執行一些任務. (和正常的 同步 函數相比, 同步函數在返回以前作完了全部的事). 這裏有幾種類型的異步接口:web

  • 回調函數(基本不用)
  • tornado協程+生成器
  • tornado協程+Future
  • 線程池進程池

tornado封裝的協程+生成器

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Xian Sen
# Python 3.5
import tornado.web
from tornado import gen
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
@gen.coroutine
def doing():
    """
    穿上@gen.coroutine 裝飾器以後,最終結果會返回一個能夠被yield 的生成器 Future 對象
    不同凡響的是這個函數的返回值須要以 raise gen.Return() 這種形式返回。
    :return: Future object
    """
    # time.sleep(10)     # time.sleep() 是blocking 的,不支持異步操做,我剛開始測試tornado的時候坑了
    yield gen.sleep(10)  # 使用這個方法代替上面的方法模擬 I/O 等待的狀況, 能夠點進去看下這個方法的介紹
    raise gen.Return('Non-Blocking')
class NonBlockingHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        result = yield doing()
        self.write(result)
application = tornado.web.Application([
    (r"/index", IndexHandler),
    (r"/nonblocking", NonBlockingHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

瀏覽器訪問:http://127.0.0.1:8888/nonblocking
瀏覽器訪問:http://127.0.0.1:8888/index
你會發現nonblocking會一直在轉圈,處於一個堵塞狀態。
你再訪問index頁面,你發現index頁面可以訪問不受影響。
包含了 yield 關鍵字的函數是一個 生成器(generator). 全部的生成器都是異步的; 當調用它們的時候,會返回一個生成器對象,而不是一個執行完的結果. @gen.coroutine 裝飾器經過 yield 表達式和生成器進行交流, 並且經過返回一個 Future 與協程的調用方進行交互. 協程通常不會拋出異常: 它們拋出的任何異常將被 Future 捕獲 直到它被獲得. 這意味着用正確的方式調用協程是重要的, 不然你可能有被 忽略的錯誤。@gen.coroutine 可讓你的函數以異步協程的形式運行,可是依賴第三方的異步庫,要求你的函數自己不是blocking的。例如上面的os.sleep() 方法是blocking 的,沒辦法實現異步非阻塞。django

tornado封裝的協程+Future

上面提到Future 究竟是什麼呢,原始的 Future 版本十分複雜, 可是 Futures 是 Tornado 中推薦使用的一種作法, 由於它有兩個主要的優點. 錯誤處理時經過 Future.result 函數能夠簡單的拋出一個異常 (不一樣於某些傳統的基於回調方式接口的 一對一的錯誤處理方式), 並且 Futures 對於攜程兼容的很好. 咱們這裏簡單使用一下future 寫一個異步函數。瀏覽器

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Xian Sen
# Python 3.5
import tornado.web
from tornado import gen
from tornado.concurrent import Future
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
def doing():
    future = Future()
    # here doing some things ...
    future.set_result('Non-Blocking')
    return future
class NonBlockingHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        result = yield doing()
        self.write(result)
application = tornado.web.Application([
    (r"/index", IndexHandler),
    (r"/nonblocking", NonBlockingHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Python 3.5: async and await

官方還介紹了在另外一種寫法, Python 3.5 引入了 async 和 await 關鍵字(使用這些關鍵字的 函數也被稱爲」原生協程」). 從Tornado 4.3, 你能夠用它們代替 yield 爲基礎的協程. 只須要簡單的使用 async def foo() 在函數定義的時候代替 @gen.coroutine 裝飾器, 用 await 代替yield. 本文檔的其餘部分會繼續使用 yield的風格來和舊版本的Python兼容, 可是若是 async 和 await 可用的話,它們運行起來會更快ruby

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Xian Sen
# Python 3.5
import tornado.web
from tornado import gen
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
async def doing():
    await gen.sleep(10)  # here are doing some things
    return 'Non-Blocking'
class NonBlockingHandler(tornado.web.RequestHandler):
    async def get(self):
        result = await doing()
        self.write(result)
application = tornado.web.Application([
    (r"/index", IndexHandler),
    (r"/nonblocking", NonBlockingHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

並行執行

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Xian Sen
# Python 3.5
import tornado.web
from tornado import gen
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
@gen.coroutine
def doing():
    yield gen.sleep(10)
    raise gen.Return('Non-Blocking')
class NonBlockingHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        result1, result2 = yield [doing(), doing()]
        self.write(result1)
application = tornado.web.Application([
    (r"/index", IndexHandler),
    (r"/nonblocking", NonBlockingHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

那async ,await 那種方式能並行執行嗎? 答案也是能夠的:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Xian Sen
# Python 3.5
# Date: 2017/12/13
import tornado.web
from tornado import gen
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
async def doing():
    await gen.sleep(10)
    return 'Non-Blocking'
class NonBlockingHandler(tornado.web.RequestHandler):
    async def get(self):
        result1, result2 = await gen.convert_yielded([doing(), doing()])
        self.write(result1)
application = tornado.web.Application([
    (r"/index", IndexHandler),
    (r"/nonblocking", NonBlockingHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

await 關鍵字比 yield 關鍵字功能要少一些. 例如,在一個使用 yield 的協程中, 你能夠獲得Futures 列表, 你也可使用 tornado.gen.convert_yielded 來把任何使用 yield 工做的代碼轉換成使用 await 的形式.

線程池

coroutine 是給Non-blocking 函數提供異步協程的方式運行, ThreadPoolExecutor 則能夠給blocking 的函數提供異步的方式運行,可是因爲是多線程的,Python 使用多線程對性能來講是須要謹慎的,大量的計算量的狀況可能會形成性能的降低。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Xian Sen
# Python 3.5
import time
import os
import tornado.web
from tornado import gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
        self.write('index')
        print('index')
class NonBlockingHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(4)
    @gen.coroutine
    def get(self):
        result = yield self.doing()
        self.write(result)
        print(result)
    # 使用tornado 線程池不須要加上下面的裝飾器到I/O函數
    @run_on_executor
    def doing(self):
        # time.sleep(10)
        # yield gen.sleep(10)
        os.system("ping -c 20 www.baidu.com")  # 模擬I/O 任務
        return 'Non-Blocking'
application = tornado.web.Application([
    (r"/index", IndexHandler),
    (r"/nonblocking", NonBlockingHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

設置超時時間

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Xian Sen
# Python 3.5
import time
import datetime
import os
import tornado.web
from tornado import gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
        print('index')
class NonBlockingHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(4)
    @gen.coroutine
    def get(self):
        try:
            start = time.time()
            # 並行執行
            result1, result2 = yield gen.with_timeout(datetime.timedelta(seconds=5), [self.doing(1), self.doing(2)], quiet_exceptions=tornado.gen.TimeoutError)
            self.write("NO Timeout")
            print(result1, result2)
            print(time.time() - start)
        except gen.TimeoutError:
            self.write("Timeout")
            print("Timeout")
            print(time.time() - start)
    # 使用tornado 線程池須要加上下面的裝飾器到I/O函數
    @run_on_executor
    def doing(self, num):
        time.sleep(10)
        return 'Non-Blocking%d' % num
application = tornado.web.Application([
    (r"/index", IndexHandler),
    (r"/nonblocking", NonBlockingHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

多進程運行

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Xian Sen
# Python 3.5
import tornado.web
from tornado import gen
from tornado.httpserver import HTTPServer
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('index')
@gen.coroutine
def doing():
    yield gen.sleep(10)
    raise gen.Return('Non-Blocking')
class NonBlockingHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        result = yield doing()
        self.write(result)
def make_app():
    return tornado.web.Application([
        (r"/index", IndexHandler),
        (r"/nonblocking", NonBlockingHandler),
    ])
def main():
    app = make_app()
    server = HTTPServer(app)
    server.bind(8888)
    server.start(2)  # 設置啓動多少個進程
    tornado.ioloop.IOLoop.current().start()
if __name__ == "__main__":
    main()
相關文章
相關標籤/搜索